15 #include "FairMQLogger.h"
16 #include "FairMQProgOptions.h"
19 #include "TimesliceInputArchive.hpp"
20 #include "TimesliceMultiInputArchive.hpp"
21 #include "TimesliceMultiSubscriber.hpp"
22 #include "TimesliceSubscriber.hpp"
24 #include <boost/algorithm/string.hpp>
25 #include <boost/archive/binary_oarchive.hpp>
26 #include <boost/filesystem.hpp>
27 #include <boost/regex.hpp>
29 namespace filesys = boost::filesystem;
45 using std::runtime_error::runtime_error;
65 fFileName = fConfig->GetValue<
string>(
"filename");
66 fDirName = fConfig->GetValue<
string>(
"dirname");
67 fHost = fConfig->GetValue<
string>(
"flib-host");
68 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
71 fbNoSplitTs = fConfig->GetValue<
bool>(
"no-split-ts");
87 LOG(warning) <<
"Both no-split-ts, send-ts-per-sysid and "
88 "send-ts-per-channel options used => "
89 <<
" second and third one will be ignored!!!!";
93 <<
"Both no-split-ts and send-ts-per-sysid options used => "
94 <<
" second one will be ignored!!!!";
98 <<
"Both no-split-ts and send-ts-per-channel options used => "
99 <<
" second one will be ignored!!!!";
104 <<
"Both send-ts-per-sysid and send-ts-per-channel options used => "
105 <<
" second one will be ignored!!!!";
109 std::vector<std::string> vSysIdChanPairs =
110 fConfig->GetValue<std::vector<std::string>>(
"sysid-chan");
111 for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
112 const size_t sep = vSysIdChanPairs[uPair].find(
':');
113 if (string::npos == sep || 0 == sep
114 || vSysIdChanPairs[uPair].size() == sep) {
115 LOG(info) << vSysIdChanPairs[uPair];
117 "Provided pair of SysId + Channel name is missing a : or an argument.");
121 std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
122 const size_t hexPos = sSysId.find(
"0x");
124 if (string::npos == hexPos)
125 iSysId = std::stoi(sSysId);
127 iSysId = std::stoi(sSysId.substr(hexPos + 2),
nullptr, 16);
130 std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
133 const vector<int>::const_iterator
pos =
137 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
146 LOG(info) << vSysIdChanPairs[uPair] <<
" " << iSysId <<
" " << sChannelName;
157 bool isGoodInputCombi {
false};
160 isGoodInputCombi =
true;
164 isGoodInputCombi =
true;
168 isGoodInputCombi =
true;
169 LOG(info) <<
"Host: " <<
fHost;
170 LOG(info) <<
"Port: " <<
fPort;
173 isGoodInputCombi =
true;
174 LOG(info) <<
"Host string: " <<
fHost;
176 isGoodInputCombi =
false;
180 if (!isGoodInputCombi) {
182 "Wrong combination of inputs. Either file or wildcard file + directory "
183 "or host + port are allowed combination.");
197 int noChannel = fChannels.size();
198 LOG(info) <<
"Number of defined output channels: " << noChannel;
199 for (
auto const& entry : fChannels) {
206 LOG(info) <<
"Channel name: " << entry.first;
212 LOG(info) <<
"Value : " << value;
214 throw InitTaskError(
"Sending same data to more than one output channel "
215 "not implemented yet.");
223 std::string connector =
fHost +
":" + std::to_string(
fPort);
224 LOG(info) <<
"Open TSPublisher at " << connector;
225 fSource =
new fles::TimesliceMultiSubscriber(connector);
227 std::string connector =
fHost;
228 LOG(info) <<
"Open TSPublisher with host string: " << connector;
232 std::string fileList {
""};
234 std::string fileName = obj;
235 fileList += fileName;
239 LOG(info) <<
"Input File String: " << fileList;
247 LOG(info) <<
"Sending TS copies in no-split mode";
250 LOG(info) <<
"Sending components in separate TS per SysId";
253 LOG(info) <<
"Sending components in separate TS per channel";
325 fTime = std::chrono::steady_clock::now();
327 LOG(error) << e.what();
340 bool bFoundMatch =
false;
344 LOG(info) <<
"Looking for name " << channelName <<
" in " << entry;
345 std::size_t pos1 = channelName.find(entry);
346 if (pos1 != std::string::npos) {
352 LOG(info) <<
"Found " << entry <<
" in " << channelName;
353 LOG(info) <<
"Channel name " << channelName
354 <<
" found in list of allowed channel names at position " << idx
355 <<
" (SysId 0x" << std::hex <<
fSysId[idx] << std::dec <<
")";
369 LOG(info) <<
"Channel name " << channelName
370 <<
" not found in list of allowed channel names.";
371 LOG(error) <<
"Stop device.";
378 auto timeslice =
fSource->get();
383 const fles::Timeslice& ts = *timeslice;
384 uint64_t uTsIndex = ts.index();
390 LOG(debug) <<
"Missed Timeslices. Old TS Index was " <<
fuPrevTsIndex
391 <<
" New TS Index is " << uTsIndex;
393 std::vector<uint64_t> vulMissedIndices;
394 for (uint64_t ulMiss =
fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
395 vulMissedIndices.emplace_back(ulMiss);
401 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
411 LOG(info) <<
"Received TS " <<
fTSCounter <<
" with index " << uTsIndex;
414 LOG(debug) <<
"Found " << ts.num_components()
415 <<
" different components in timeslice";
427 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
441 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
455 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
463 for (
unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
468 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
483 std::this_thread::sleep_for(std::chrono::seconds(10));
484 std::string sCmd =
"EOF ";
499 std::this_thread::sleep_for(std::chrono::seconds(10));
500 std::string sCmd =
"EOF ";
531 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
532 const vector<int>::const_iterator
pos =
535 static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
537 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
539 LOG(debug) <<
"Create timeslice component for link " << nrComp;
541 fles::StorableTimeslice component {
542 static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
543 component.append_component(ts.num_microslices(0));
545 for (
size_t m = 0;
m < ts.num_microslices(nrComp); ++
m) {
546 component.append_microslice(
547 0,
m, ts.descriptor(nrComp,
m), ts.content(nrComp,
m));
554 if (!
SendData(component, idx))
return false;
562 const fles::Timeslice& ts) {
565 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
566 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
568 const vector<int>::const_iterator
pos =
571 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
578 std::stringstream ss;
580 <<
" components for SysId 0x" << std::hex << std::setw(2)
581 <<
fSysId[uSysIdx] << std::dec <<
" :";
588 LOG(info) << ss.str();
597 LOG(debug) <<
"Create timeslice with components for SysId " << std::hex
598 <<
fSysId[uSysIdx] << std::dec;
601 fles::StorableTimeslice component {
602 static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
606 uint32_t uNumMsInComp =
608 component.append_component(uNumMsInComp);
610 LOG(debug) <<
"Add components to TS for SysId " << std::hex
611 <<
fSysId[uSysIdx] << std::dec <<
" TS " << ts.index()
614 for (
size_t m = 0;
m < uNumMsInComp; ++
m) {
615 component.append_microslice(
623 LOG(debug) <<
"Prepared timeslice for SysId " << std::hex
624 <<
fSysId[uSysIdx] << std::dec <<
" with "
625 << component.num_components() <<
" components";
627 if (!
SendData(component, uSysIdx))
return false;
636 const fles::Timeslice& ts) {
645 const vector<std::string>::const_iterator
pos =
662 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
663 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
665 const vector<int>::const_iterator
pos =
668 const vector<std::string>::size_type idxSys =
pos -
fSysId.begin();
673 const vector<std::string>::const_iterator posCh =
678 const vector<std::string>::size_type idxChan =
689 std::stringstream ss;
698 LOG(info) << ss.str();
708 for (uint32_t uChanIdx = 0; uChanIdx <
fvChannelsToSend.size(); ++uChanIdx) {
709 LOG(debug) <<
"Create timeslice with components for channel "
713 fles::StorableTimeslice component {
714 static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
718 uint32_t uNumMsInComp =
720 component.append_component(uNumMsInComp);
723 <<
"Add components to TS for SysId " << std::hex
724 <<
static_cast<uint16_t
>(
726 << std::dec <<
" TS " << ts.index() <<
" Comp "
729 for (
size_t m = 0;
m < uNumMsInComp; ++
m) {
730 component.append_microslice(
738 LOG(debug) <<
"Prepared timeslice for channel "
740 << component.num_components() <<
" components";
751 for (uint32_t uChanIdx = 0; uChanIdx <
fChannelsToSend.size(); ++uChanIdx) {
753 LOG(debug) <<
"Copy timeslice component for channel "
756 fles::StorableTimeslice fullTs {ts};
757 if (!
SendData(fullTs, uChanIdx))
return false;
766 std::stringstream oss;
767 boost::archive::binary_oarchive oa(oss);
769 std::string* strMsg =
new std::string(oss.str());
771 FairMQMessagePtr msg(NewMessage(
772 const_cast<char*
>(strMsg->c_str()),
774 [](
void* ,
void*
object) {
775 delete static_cast<std::string*>(object);
790 LOG(error) <<
"Problem sending data";
802 std::string sChannel) {
804 std::stringstream oss;
805 boost::archive::binary_oarchive oa(oss);
807 std::string* strMsg =
new std::string(oss.str());
809 FairMQMessagePtr msg(NewMessage(
810 const_cast<char*
>(strMsg->c_str()),
812 [](
void* ,
void*
object) {
813 delete static_cast<std::string*>(object);
821 LOG(debug) <<
"Send data to channel " << sChannel;
822 if (Send(msg, sChannel) < 0) {
823 LOG(error) <<
"Problem sending data";
835 std::stringstream oss;
836 boost::archive::binary_oarchive oa(oss);
838 std::string* strMsg =
new std::string(oss.str());
840 FairMQMessagePtr msg(NewMessage(
841 const_cast<char*
>(strMsg->c_str()),
843 [](
void* ,
void*
object) {
844 delete static_cast<std::string*>(object);
854 LOG(error) <<
"Problem sending missed TS indices to channel "
863 std::stringstream oss;
864 boost::archive::binary_oarchive oa(oss);
866 std::string* strMsg =
new std::string(oss.str());
868 FairMQMessagePtr msg(NewMessage(
869 const_cast<char*
>(strMsg->c_str()),
871 [](
void* ,
void*
object) {
872 delete static_cast<std::string*>(object);
887 LOG(error) <<
"Problem sending missed TS indices to channel "
919 std::chrono::duration<double> run_time =
920 std::chrono::steady_clock::now() -
fTime;
922 LOG(info) <<
"Runtime: " << run_time.count();
923 LOG(info) <<
"No more input data";
928 const fles::MicrosliceDescriptor& mdsc) {
929 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
931 LOG(info) <<
"Header version: Ox" << std::hex
932 <<
static_cast<int>(mdsc.hdr_ver) << std::dec;
933 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
934 LOG(info) <<
"Flags: " << mdsc.flags;
935 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
937 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
939 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
940 LOG(info) <<
"Checksum: " << mdsc.crc;
941 LOG(info) <<
"Size: " << mdsc.size;
942 LOG(info) <<
"Offset: " << mdsc.offset;
946 if (0 == ts.num_components()) {
947 LOG(error) <<
"No Component in TS " << ts.index();
950 LOG(info) <<
"Found " << ts.num_components()
951 <<
" different components in timeslice";
953 for (
size_t c = 0; c < ts.num_components(); ++c) {
954 LOG(info) <<
"Found " << ts.num_microslices(c)
955 <<
" microslices in component " << c;
956 LOG(info) <<
"Component " << c <<
" has a size of " << ts.size_component(c)
958 LOG(info) <<
"Component " << c <<
" has the system id 0x" << std::hex
959 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
960 LOG(info) <<
"Component " << c <<
" has the system id 0x"
961 <<
static_cast<int>(ts.descriptor(c, 0).sys_id);