12 #include "FairMQLogger.h"
13 #include "FairMQProgOptions.h"
15 #include "StorableTimeslice.hpp"
16 #include "TimesliceInputArchive.hpp"
17 #include "TimesliceMultiInputArchive.hpp"
18 #include "TimesliceMultiSubscriber.hpp"
19 #include "TimesliceSubscriber.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/archive/binary_oarchive.hpp>
23 #include <boost/filesystem.hpp>
24 #include <boost/regex.hpp>
26 namespace filesys = boost::filesystem;
42 using std::runtime_error::runtime_error;
63 fFileName = fConfig->GetValue<
string>(
"filename");
64 fDirName = fConfig->GetValue<
string>(
"dirname");
65 fHost = fConfig->GetValue<
string>(
"flib-host");
66 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
80 bool isGoodInputCombi {
false};
83 isGoodInputCombi =
true;
87 isGoodInputCombi =
true;
91 isGoodInputCombi =
true;
92 LOG(info) <<
"Host: " <<
fHost;
93 LOG(info) <<
"Port: " <<
fPort;
96 isGoodInputCombi =
true;
97 LOG(info) <<
"Host string: " <<
fHost;
100 isGoodInputCombi =
true;
101 LOG(INFO) <<
"Host string: " <<
fHost;
103 isGoodInputCombi =
false;
107 if (!isGoodInputCombi) {
109 "Wrong combination of inputs. Either file or wildcard file + directory "
110 "or host + port are allowed combination.");
124 int noChannel = fChannels.size();
125 LOG(info) <<
"Number of defined output channels: " << noChannel;
126 for (
auto const& entry : fChannels) {
127 LOG(info) <<
"Channel name: " << entry.first;
133 LOG(info) <<
"Value : " << value;
135 throw InitTaskError(
"Sending same data to more than one output channel "
136 "not implemented yet.");
144 std::string connector =
fHost +
":" + std::to_string(
fPort);
145 LOG(info) <<
"Open TSPublisher at " << connector;
146 fSource =
new fles::TimesliceMultiSubscriber(connector);
148 std::string connector =
fHost;
149 LOG(info) <<
"Open TSPublisher with host string: " << connector;
153 std::string fileList {
""};
155 std::string fileName = obj;
156 fileList += fileName;
160 LOG(info) <<
"Input File String: " << fileList;
165 fTime = std::chrono::steady_clock::now();
167 LOG(error) << e.what();
175 LOG(info) <<
"Looking for name " << channelName <<
" in " << entry;
176 std::size_t pos1 = channelName.find(entry);
177 if (pos1 != std::string::npos) {
178 const vector<std::string>::const_iterator
pos =
181 LOG(info) <<
"Found " << entry <<
" in " << channelName;
182 LOG(info) <<
"Channel name " << channelName
183 <<
" found in list of allowed channel names at position "
192 LOG(info) <<
"Channel name " << channelName
193 <<
" not found in list of allowed channel names.";
194 LOG(error) <<
"Stop device.";
199 for (
auto const& entry : fChannels) {
200 LOG(info) <<
"Inspect " << entry.first;
201 std::size_t pos1 = channelName.find(entry.first);
202 if (pos1 != std::string::npos) {
203 LOG(info) <<
"Channel name " << channelName
204 <<
" found in list of defined channel names ";
208 LOG(info) <<
"Channel name " << channelName
209 <<
" not found in list of defined channel names.";
210 LOG(error) <<
"Stop device.";
215 auto timeslice =
fSource->get();
220 const fles::Timeslice& ts = *timeslice;
221 auto tsIndex = ts.index();
224 LOG(info) <<
"Sample TimeSlice " <<
fTSCounter <<
", Index " << tsIndex;
226 LOG(debug) <<
"Found " << ts.num_components()
227 <<
" different components in timeslice " <<
fTSCounter
228 <<
", index " << tsIndex;
238 std::vector<FairMQParts> parts;
239 std::vector<bool> bparts;
241 bparts.resize(parts.size());
242 for (uint
i = 0;
i < bparts.size();
i++)
248 const vector<int>::const_iterator
pos =
251 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
253 fles::StorableTimeslice tss = fles::StorableTimeslice(ts);
256 std::stringstream oss;
257 boost::archive::binary_oarchive oa(oss);
259 std::string* strMsg =
new std::string(oss.str());
260 LOG(debug) <<
"AddPart " << idx <<
" with length "
263 parts[idx].AddPart(NewMessage(
264 const_cast<char*
>(strMsg->c_str()),
266 [](
void* ,
void*
object) {
267 delete static_cast<std::string*>(object);
270 LOG(debug) <<
"AddParts to " << idx <<
": current size "
271 << parts[idx].Size();
278 LOG(debug) <<
"parts with size " << parts.size()
279 <<
", #components: " << ts.num_components();
281 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
283 LOG(debug) <<
"nrComp " << nrComp <<
", SysID: "
284 <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
285 int iSysId =
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
286 if (iSysId == 0x90 || iSysId == 0x91)
288 const vector<int>::const_iterator
pos =
291 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
293 LOG(debug) <<
"Append timeslice component of link " << nrComp
294 <<
" to idx " << idx;
296 fles::StorableTimeslice component {
static_cast<uint32_t
>(
297 ts.num_microslices(nrComp), ts.index())};
298 component.append_component(ts.num_microslices(0));
300 for (
size_t m = 0;
m < ts.num_microslices(nrComp); ++
m) {
301 component.append_microslice(
302 0,
m, ts.descriptor(nrComp,
m), ts.content(nrComp,
m));
310 std::stringstream oss;
311 boost::archive::binary_oarchive oa(oss);
313 std::string* strMsg =
new std::string(oss.str());
315 LOG(debug) <<
"AddParts to " << idx <<
": current size "
316 << parts[idx].Size();
318 parts[idx].AddPart(NewMessage(
319 const_cast<char*
>(strMsg->c_str()),
321 [](
void* ,
void*
object) {
322 delete static_cast<std::string*>(object);
335 for (uint idx = 0; idx < parts.size(); idx++)
337 LOG(debug) <<
"Send parts with size " << parts[idx].Size()
340 LOG(error) <<
"Problem sending data";
344 << parts[idx].Size();
351 LOG(info) <<
" Number of requested time slices reached, exiting ";
356 LOG(info) <<
" No more data, exiting ";
363 const fles::Timeslice& ,
397 const fles::StorableTimeslice& ,
441 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
442 const vector<int>::const_iterator
pos =
445 static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
447 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
449 LOG(debug) <<
"Create timeslice component for link " << nrComp;
451 fles::StorableTimeslice component {
452 static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
453 component.append_component(ts.num_microslices(nrComp));
455 for (
size_t m = 0;
m < ts.num_microslices(nrComp); ++
m) {
456 component.append_microslice(
457 0,
m, ts.descriptor(nrComp,
m), ts.content(nrComp,
m));
459 if (!
SendData(component, idx))
return false;
469 std::stringstream oss;
470 boost::archive::binary_oarchive oa(oss);
472 std::string* strMsg =
new std::string(oss.str());
474 FairMQMessagePtr msg(NewMessage(
475 const_cast<char*
>(strMsg->c_str()),
477 [](
void* ,
void*
object) {
478 delete static_cast<std::string*>(object);
493 LOG(error) <<
"Problem sending data";
508 std::chrono::duration<double> run_time =
509 std::chrono::steady_clock::now() -
fTime;
511 LOG(info) <<
"Runtime: " << run_time.count();
512 LOG(info) <<
"No more input data";
517 const fles::MicrosliceDescriptor& mdsc) {
518 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
520 LOG(info) <<
"Header version: Ox" << std::hex
521 <<
static_cast<int>(mdsc.hdr_ver) << std::dec;
522 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
523 LOG(info) <<
"Flags: " << mdsc.flags;
524 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
526 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
528 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
529 LOG(info) <<
"Checksum: " << mdsc.crc;
530 LOG(info) <<
"Size: " << mdsc.size;
531 LOG(info) <<
"Offset: " << mdsc.offset;
535 if (0 == ts.num_components()) {
536 LOG(error) <<
"No Component in TS " << ts.index();
539 LOG(debug) <<
"Found " << ts.num_components()
540 <<
" different components in timeslice";
542 for (
size_t c = 0; c < ts.num_components(); ++c) {
543 LOG(debug) <<
"Found " << ts.num_microslices(c)
544 <<
" microslices in component " << c;
545 LOG(debug) <<
"Component " << c <<
" has a size of " << ts.size_component(c)
547 LOG(debug) <<
"Component " << c <<
" has the system id 0x" << std::hex
548 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
570 LOG(info) <<
"stop subscribers in 10 sec";
571 std::this_thread::sleep_for(std::chrono::milliseconds(10000));
573 FairMQMessagePtr pub(NewSimpleMessage(
"STOP"));
574 if (Send(pub,
"syscmd") < 0) {
575 LOG(error) <<
"Sending STOP message failed";
578 LOG(info) <<
"task reset subscribers in 1 sec";
579 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
580 FairMQMessagePtr task_reset(NewSimpleMessage(
"TASK_RESET"));
582 if (Send(task_reset,
"syscmd") < 0) {
583 LOG(error) <<
"Sending Task_Reset message failed";