12 #include "FairMQLogger.h"
13 #include "FairMQProgOptions.h"
15 #include "TimesliceInputArchive.hpp"
16 #include "TimesliceSubscriber.hpp"
18 #include <boost/algorithm/string.hpp>
19 #include <boost/archive/binary_oarchive.hpp>
20 #include <boost/filesystem.hpp>
21 #include <boost/regex.hpp>
23 namespace filesys = boost::filesystem;
37 using std::runtime_error::runtime_error;
58 fFileName = fConfig->GetValue<
string>(
"filename");
59 fDirName = fConfig->GetValue<
string>(
"dirname");
60 fHost = fConfig->GetValue<
string>(
"flib-host");
61 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
70 bool isGoodInputCombi {
false};
73 isGoodInputCombi =
true;
76 if (!filesys::is_regular_file(pathObj)) {
83 isGoodInputCombi =
true;
85 if (!filesys::is_directory(pathObj)) {
86 throw InitTaskError(
"Passed directory name is no valid directory");
88 if (
fFileName.find(
"*") == std::string::npos) {
91 if (!filesys::is_regular_file(pathObj)) {
97 std::vector<filesys::path>
v;
102 boost::replace_all(
fFileName,
".",
"\\.");
103 boost::replace_all(
fFileName,
"*",
".*");
109 for (
auto&&
x : filesys::directory_iterator(pathObj)) {
111 if (!boost::filesystem::is_regular_file(
x))
continue;
119 if (!boost::regex_match(
x.path().leaf().string(), what, my_filter))
122 v.push_back(
x.path());
127 std::sort(
v.begin(),
v.end());
132 LOG(info) <<
"The following files will be used in this order.";
134 LOG(info) <<
" " <<
x;
140 isGoodInputCombi =
true;
141 LOG(info) <<
"Host: " <<
fHost;
142 LOG(info) <<
"Port: " <<
fPort;
144 isGoodInputCombi =
false;
147 if (!isGoodInputCombi) {
149 "Wrong combination of inputs. Either file or wildcard file + directory "
150 "or host + port are allowed combination.");
164 int noChannel = fChannels.size();
165 LOG(info) <<
"Number of defined output channels: " << noChannel;
166 for (
auto const& entry : fChannels) {
167 LOG(info) <<
"Channel name: " << entry.first;
173 LOG(info) <<
"Value : " << value;
175 throw InitTaskError(
"Sending same data to more than one output channel "
176 "not implemented yet.");
181 std::string connector =
"tcp://" +
fHost +
":" + std::to_string(
fPort);
182 LOG(info) <<
"Open TSPublisher at " << connector;
183 fSource =
new fles::TimesliceSubscriber(connector);
188 "Could not open the first input file in the list, Doing nothing!");
192 fTime = std::chrono::steady_clock::now();
194 LOG(error) << e.what();
206 LOG(info) <<
"Open the Flib input file " <<
fFileName;
208 if (!filesys::is_regular_file(pathObj)) {
209 LOG(error) <<
"Input file " <<
fFileName <<
" doesn't exist.";
214 LOG(error) <<
"Could not open input file.";
218 LOG(info) <<
"End of files list reached.";
227 LOG(info) <<
"Inspect " << entry;
228 std::size_t pos1 = channelName.find(entry);
229 if (pos1 != std::string::npos) {
230 const vector<std::string>::const_iterator
pos =
233 LOG(info) <<
"Found " << entry <<
" in " << channelName;
234 LOG(info) <<
"Channel name " << channelName
235 <<
" found in list of allowed channel names at position "
244 LOG(info) <<
"Channel name " << channelName
245 <<
" not found in list of allowed channel names.";
246 LOG(error) <<
"Stop device.";
251 for (
auto const& entry : fChannels) {
252 LOG(info) <<
"Inspect " << entry.first;
253 std::size_t pos1 = channelName.find(entry.first);
254 if (pos1 != std::string::npos) {
255 LOG(info) <<
"Channel name " << channelName
256 <<
" found in list of defined channel names ";
260 LOG(info) <<
"Channel name " << channelName
261 <<
" not found in list of defined channel names.";
262 LOG(error) <<
"Stop device.";
268 auto timeslice =
fSource->get();
273 const fles::Timeslice& ts = *timeslice;
274 auto tsIndex = ts.index();
277 LOG(info) <<
"Sample TimeSlice " <<
fTSCounter <<
", Index " << tsIndex;
279 LOG(debug) <<
"Found " << ts.num_components()
280 <<
" different components in timeslice " <<
fTSCounter
281 <<
", index " << tsIndex;
291 std::vector<FairMQParts> parts;
292 std::vector<bool> bparts;
294 bparts.resize(parts.size());
295 for (uint
i = 0;
i < bparts.size();
i++)
297 LOG(debug) <<
"parts with size " << parts.size();
299 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
301 LOG(debug) <<
"nrComp " << nrComp <<
", SysID: "
302 <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
303 const vector<int>::const_iterator
pos =
306 static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
308 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
310 LOG(debug) <<
"Append timeslice component of link " << nrComp
311 <<
" to idx " << idx;
313 fles::StorableTimeslice component {
314 static_cast<uint32_t
>(ts.num_microslices(nrComp), ts.index())};
315 component.append_component(ts.num_microslices(0));
317 for (
size_t m = 0;
m < ts.num_microslices(nrComp); ++
m) {
318 component.append_microslice(
319 0,
m, ts.descriptor(nrComp,
m), ts.content(nrComp,
m));
327 std::stringstream oss;
328 boost::archive::binary_oarchive oa(oss);
330 std::string* strMsg =
new std::string(oss.str());
332 LOG(debug) <<
"AddParts to " << idx <<
": current size "
333 << parts[idx].Size();
335 parts[idx].AddPart(NewMessage(
336 const_cast<char*
>(strMsg->c_str()),
338 [](
void* ,
void*
object) {
339 delete static_cast<std::string*>(object);
348 for (uint idx = 0; idx < parts.size(); idx++)
350 LOG(debug) <<
"Send parts with size " << parts[idx].Size()
353 LOG(error) <<
"Problem sending data";
357 << parts[idx].Size();
364 LOG(info) <<
" Number of requested time slices reached, exiting ";
387 const fles::Timeslice& ,
421 const fles::StorableTimeslice& ,
465 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
466 const vector<int>::const_iterator
pos =
469 static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
471 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
473 LOG(debug) <<
"Create timeslice component for link " << nrComp;
475 fles::StorableTimeslice component {
476 static_cast<uint32_t
>(ts.num_microslices(nrComp), ts.index())};
477 component.append_component(ts.num_microslices(0));
479 for (
size_t m = 0;
m < ts.num_microslices(nrComp); ++
m) {
480 component.append_microslice(
481 0,
m, ts.descriptor(nrComp,
m), ts.content(nrComp,
m));
483 if (!
SendData(component, idx))
return false;
493 std::stringstream oss;
494 boost::archive::binary_oarchive oa(oss);
496 std::string* strMsg =
new std::string(oss.str());
498 FairMQMessagePtr msg(NewMessage(
499 const_cast<char*
>(strMsg->c_str()),
501 [](
void* ,
void*
object) {
502 delete static_cast<std::string*>(object);
517 LOG(error) <<
"Problem sending data";
532 std::chrono::duration<double> run_time =
533 std::chrono::steady_clock::now() -
fTime;
535 LOG(info) <<
"Runtime: " << run_time.count();
536 LOG(info) <<
"No more input data";
541 const fles::MicrosliceDescriptor& mdsc) {
542 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
544 LOG(info) <<
"Header version: Ox" << std::hex
545 <<
static_cast<int>(mdsc.hdr_ver) << std::dec;
546 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
547 LOG(info) <<
"Flags: " << mdsc.flags;
548 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
550 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
552 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
553 LOG(info) <<
"Checksum: " << mdsc.crc;
554 LOG(info) <<
"Size: " << mdsc.size;
555 LOG(info) <<
"Offset: " << mdsc.offset;
559 if (0 == ts.num_components()) {
560 LOG(error) <<
"No Component in TS " << ts.index();
563 LOG(debug) <<
"Found " << ts.num_components()
564 <<
" different components in timeslice";
566 for (
size_t c = 0; c < ts.num_components(); ++c) {
567 LOG(debug) <<
"Found " << ts.num_microslices(c)
568 <<
" microslices in component " << c;
569 LOG(debug) <<
"Component " << c <<
" has a size of " << ts.size_component(c)
571 LOG(debug) <<
"Component " << c <<
" has the system id 0x" << std::hex
572 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
589 LOG(info) <<
"stop subscribers in 100 sec";
590 std::this_thread::sleep_for(std::chrono::milliseconds(100000));
592 FairMQMessagePtr pub(NewSimpleMessage(
"STOP"));
593 if (Send(pub,
"syscmd") < 0) {
594 LOG(error) <<
"Sending STOP message failed";
597 LOG(info) <<
"task reset subscribers in 1 sec";
598 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
599 FairMQMessagePtr task_reset(NewSimpleMessage(
"TASK_RESET"));
601 if (Send(task_reset,
"syscmd") < 0) {
602 LOG(error) <<
"Sending Task_Reset message failed";