12 #include "FairMQLogger.h"
13 #include "FairMQProgOptions.h"
15 #include "TimesliceInputArchive.hpp"
16 #include "TimesliceSubscriber.hpp"
18 #include <boost/algorithm/string.hpp>
19 #include <boost/archive/binary_oarchive.hpp>
20 #include <boost/filesystem.hpp>
21 #include <boost/regex.hpp>
23 namespace filesys = boost::filesystem;
37 using std::runtime_error::runtime_error;
57 fFileName = fConfig->GetValue<
string>(
"filename");
58 fDirName = fConfig->GetValue<
string>(
"dirname");
59 fHost = fConfig->GetValue<
string>(
"flib-host");
60 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
69 bool isGoodInputCombi {
false};
72 isGoodInputCombi =
true;
75 if (!filesys::is_regular_file(pathObj)) {
82 isGoodInputCombi =
true;
84 if (!filesys::is_directory(pathObj)) {
85 throw InitTaskError(
"Passed directory name is no valid directory");
87 if (
fFileName.find(
"*") == std::string::npos) {
90 if (!filesys::is_regular_file(pathObj)) {
96 std::vector<filesys::path>
v;
101 boost::replace_all(
fFileName,
".",
"\\.");
102 boost::replace_all(
fFileName,
"*",
".*");
108 for (
auto&&
x : filesys::directory_iterator(pathObj)) {
110 if (!boost::filesystem::is_regular_file(
x))
continue;
118 if (!boost::regex_match(
x.path().leaf().string(), what, my_filter))
121 v.push_back(
x.path());
126 std::sort(
v.begin(),
v.end());
131 LOG(info) <<
"The following files will be used in this order.";
133 LOG(info) <<
" " <<
x;
138 isGoodInputCombi =
true;
139 LOG(info) <<
"Host: " <<
fHost;
140 LOG(info) <<
"Port: " <<
fPort;
142 isGoodInputCombi =
false;
145 if (!isGoodInputCombi) {
147 "Wrong combination of inputs. Either file or wildcard file + directory "
148 "or host + port are allowed combination.");
162 int noChannel = fChannels.size();
163 LOG(info) <<
"Number of defined output channels: " << noChannel;
164 for (
auto const& entry : fChannels) {
165 LOG(info) <<
"Channel name: " << entry.first;
171 LOG(info) <<
"Value : " << value;
173 throw InitTaskError(
"Sending same data to more than one output channel "
174 "not implemented yet.");
179 std::string connector =
"tcp://" +
fHost +
":" + std::to_string(
fPort);
180 LOG(info) <<
"Open TSPublisher at " << connector;
181 fSource =
new fles::TimesliceSubscriber(connector);
186 "Could not open the first input file in the list, Doing nothing!");
190 fTime = std::chrono::steady_clock::now();
192 LOG(error) << e.what();
204 LOG(info) <<
"Open the Flib input file " <<
fFileName;
206 if (!filesys::is_regular_file(pathObj)) {
207 LOG(error) <<
"Input file " <<
fFileName <<
" doesn't exist.";
212 LOG(error) <<
"Could not open input file.";
216 LOG(info) <<
"End of files list reached.";
225 std::size_t pos1 = channelName.find(entry);
226 if (pos1 != std::string::npos) {
227 const vector<std::string>::const_iterator
pos =
230 LOG(info) <<
"Found " << entry <<
" in " << channelName;
231 LOG(info) <<
"Channel name " << channelName
232 <<
" found in list of allowed channel names at position "
239 LOG(info) <<
"Channel name " << channelName
240 <<
" not found in list of allowed channel names.";
241 LOG(error) <<
"Stop device.";
248 auto timeslice =
fSource->get();
255 const fles::Timeslice& ts = *timeslice;
259 LOG(info) <<
"Found " << ts.num_components()
260 <<
" different components in timeslice";
265 for (
unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
294 LOG(info) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
295 const vector<int>::const_iterator
pos =
298 static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
300 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
302 LOG(info) <<
"Create timeslice component for link " << nrComp;
304 fles::StorableTimeslice component {
305 static_cast<uint32_t
>(ts.num_microslices(nrComp), ts.index())};
306 component.append_component(ts.num_microslices(0));
308 for (
size_t m = 0;
m < ts.num_microslices(nrComp); ++
m) {
309 component.append_microslice(
310 0,
m, ts.descriptor(nrComp,
m), ts.content(nrComp,
m));
312 if (!
SendData(component, idx))
return false;
322 std::stringstream oss;
323 boost::archive::binary_oarchive oa(oss);
325 std::string* strMsg =
new std::string(oss.str());
327 FairMQMessagePtr msg(NewMessage(
328 const_cast<char*
>(strMsg->c_str()),
330 [](
void* ,
void*
object) {
331 delete static_cast<std::string*>(object);
346 LOG(error) <<
"Problem sending data";
361 std::chrono::duration<double> run_time =
362 std::chrono::steady_clock::now() -
fTime;
364 LOG(info) <<
"Runtime: " << run_time.count();
365 LOG(info) <<
"No more input data";
370 const fles::MicrosliceDescriptor& mdsc) {
371 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
373 LOG(info) <<
"Header version: Ox" << std::hex
374 <<
static_cast<int>(mdsc.hdr_ver) << std::dec;
375 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
376 LOG(info) <<
"Flags: " << mdsc.flags;
377 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
379 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
381 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
382 LOG(info) <<
"Checksum: " << mdsc.crc;
383 LOG(info) <<
"Size: " << mdsc.size;
384 LOG(info) <<
"Offset: " << mdsc.offset;
388 if (0 == ts.num_components()) {
389 LOG(error) <<
"No Component in TS " << ts.index();
392 LOG(info) <<
"Found " << ts.num_components()
393 <<
" different components in timeslice";
395 for (
size_t c = 0; c < ts.num_components(); ++c) {
396 LOG(info) <<
"Found " << ts.num_microslices(c)
397 <<
" microslices in component " << c;
398 LOG(info) <<
"Component " << c <<
" has a size of " << ts.size_component(c)
400 LOG(info) <<
"Component " << c <<
" has the system id 0x" << std::hex
401 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
402 LOG(info) <<
"Component " << c <<
" has the system id 0x"
403 <<
static_cast<int>(ts.descriptor(c, 0).sys_id);