CbmRoot
CbmMQTsaSampler.cxx
Go to the documentation of this file.
1 
9 #include "CbmMQTsaSampler.h"
10 #include "CbmMQDefs.h"
11 
12 #include "FairMQLogger.h"
13 #include "FairMQProgOptions.h" // device->fConfig
14 
15 #include "TimesliceInputArchive.hpp"
16 #include "TimesliceSubscriber.hpp"
17 
18 #include <boost/algorithm/string.hpp>
19 #include <boost/archive/binary_oarchive.hpp>
20 #include <boost/filesystem.hpp>
21 #include <boost/regex.hpp>
22 
23 namespace filesys = boost::filesystem;
24 
25 #include <algorithm>
26 #include <chrono>
27 #include <ctime>
28 #include <stdio.h>
29 #include <string>
30 #include <thread> // this_thread::sleep_for
31 
32 using namespace std;
33 
34 #include <stdexcept>
35 
36 struct InitTaskError : std::runtime_error {
37  using std::runtime_error::runtime_error;
38 };
39 
41  : FairMQDevice()
42  , fMaxTimeslices(0)
43  , fFileName("")
44  , fDirName("")
45  , fInputFileList()
46  , fFileCounter(0)
47  , fHost("")
48  , fPort(0)
49  , fTSNumber(0)
50  , fTSCounter(0)
51  , fMessageCounter(0)
52  , fSource(nullptr)
53  , fTime() {}
54 
56  // Get the values from the command line options (via fConfig)
57  fFileName = fConfig->GetValue<string>("filename");
58  fDirName = fConfig->GetValue<string>("dirname");
59  fHost = fConfig->GetValue<string>("flib-host");
60  fPort = fConfig->GetValue<uint64_t>("flib-port");
61  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
62 
63  // Check which input is defined
64  // Posibilities
65  // filename && ! dirname : single file
66  // filename with wildcards && diranme : all files with filename regex in the directory
67  // host && port : connect to the flim server
68 
69  bool isGoodInputCombi {false};
70  if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size()
71  && 0 == fPort) {
72  isGoodInputCombi = true;
73  // Create a Path object from given path string
74  filesys::path pathObj(fFileName);
75  if (!filesys::is_regular_file(pathObj)) {
76  throw InitTaskError("Passed file name is no valid file");
77  }
78  fInputFileList.push_back(fFileName);
79  LOG(info) << "Filename: " << fFileName;
80  } else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size()
81  && 0 == fPort) {
82  isGoodInputCombi = true;
83  filesys::path pathObj = fDirName;
84  if (!filesys::is_directory(pathObj)) {
85  throw InitTaskError("Passed directory name is no valid directory");
86  }
87  if (fFileName.find("*") == std::string::npos) {
88  // Normal file without wildcards
89  pathObj += fFileName;
90  if (!filesys::is_regular_file(pathObj)) {
91  throw InitTaskError("Passed file name is no valid file");
92  }
93  fInputFileList.push_back(pathObj.string());
94  LOG(info) << "Filename: " << fInputFileList[0];
95  } else {
96  std::vector<filesys::path> v;
97 
98  // escape "." which have a special meaning in regex
99  // change "*" to ".*" to find any number
100  // e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
101  boost::replace_all(fFileName, ".", "\\.");
102  boost::replace_all(fFileName, "*", ".*");
103 
104  // create regex
105  const boost::regex my_filter(fFileName);
106 
107  // loop over all files in input directory
108  for (auto&& x : filesys::directory_iterator(pathObj)) {
109  // Skip if not a file
110  if (!boost::filesystem::is_regular_file(x)) continue;
111 
112  // Skip if no match
113  // x.path().leaf().string() means get from directory iterator the
114  // current entry as filesys::path, from this extract the leaf
115  // filename or directory name and convert it to a string to be
116  // used in the regex:match
117  boost::smatch what;
118  if (!boost::regex_match(x.path().leaf().string(), what, my_filter))
119  continue;
120 
121  v.push_back(x.path());
122  }
123 
124  // sort the files which match the regex in increasing order
125  // (hopefully)
126  std::sort(v.begin(), v.end());
127 
128  for (auto&& x : v)
129  fInputFileList.push_back(x.string());
130 
131  LOG(info) << "The following files will be used in this order.";
132  for (auto&& x : v)
133  LOG(info) << " " << x;
134  }
135 
136  } else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size()
137  && 0 != fPort) {
138  isGoodInputCombi = true;
139  LOG(info) << "Host: " << fHost;
140  LOG(info) << "Port: " << fPort;
141  } else {
142  isGoodInputCombi = false;
143  }
144 
145  if (!isGoodInputCombi) {
146  throw InitTaskError(
147  "Wrong combination of inputs. Either file or wildcard file + directory "
148  "or host + port are allowed combination.");
149  }
150 
151 
152  LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
153 
154  // Get the information about created channels from the device
155  // Check if the defined channels from the topology (by name)
156  // are in the list of channels which are possible/allowed
157  // for the device
158  // The idea is to check at initilization if the devices are
159  // properly connected. For the time beeing this is done with a
160  // nameing convention. It is not avoided that someone sends other
161  // data on this channel.
162  int noChannel = fChannels.size();
163  LOG(info) << "Number of defined output channels: " << noChannel;
164  for (auto const& entry : fChannels) {
165  LOG(info) << "Channel name: " << entry.first;
166  if (!IsChannelNameAllowed(entry.first))
167  throw InitTaskError("Channel name does not match.");
168  }
169 
170  for (auto const& value : fComponentsToSend) {
171  LOG(info) << "Value : " << value;
172  if (value > 1) {
173  throw InitTaskError("Sending same data to more than one output channel "
174  "not implemented yet.");
175  }
176  }
177 
178  if (0 == fFileName.size() && 0 != fHost.size()) {
179  std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
180  LOG(info) << "Open TSPublisher at " << connector;
181  fSource = new fles::TimesliceSubscriber(connector);
182  if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
183  } else {
184  if (false == OpenNextFile()) {
185  throw InitTaskError(
186  "Could not open the first input file in the list, Doing nothing!");
187  }
188  }
189 
190  fTime = std::chrono::steady_clock::now();
191 } catch (InitTaskError& e) {
192  LOG(error) << e.what();
193  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
195 }
196 
198  // First Close and delete existing source
199  if (nullptr != fSource) delete fSource;
200 
201  if (fInputFileList.size() > 0) {
203  fInputFileList.erase(fInputFileList.begin());
204  LOG(info) << "Open the Flib input file " << fFileName;
205  filesys::path pathObj(fFileName);
206  if (!filesys::is_regular_file(pathObj)) {
207  LOG(error) << "Input file " << fFileName << " doesn't exist.";
208  return false;
209  }
210  fSource = new fles::TimesliceInputArchive(fFileName);
211  if (!fSource) {
212  LOG(error) << "Could not open input file.";
213  return false;
214  }
215  } else {
216  LOG(info) << "End of files list reached.";
217  return false;
218  }
219  return true;
220 }
221 
222 bool CbmMQTsaSampler::IsChannelNameAllowed(std::string channelName) {
223 
224  for (auto const& entry : fAllowedChannels) {
225  std::size_t pos1 = channelName.find(entry);
226  if (pos1 != std::string::npos) {
227  const vector<std::string>::const_iterator pos =
228  std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
229  const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
230  LOG(info) << "Found " << entry << " in " << channelName;
231  LOG(info) << "Channel name " << channelName
232  << " found in list of allowed channel names at position "
233  << idx;
234  fComponentsToSend[idx]++;
235  fChannelsToSend[idx].push_back(channelName);
236  return true;
237  }
238  }
239  LOG(info) << "Channel name " << channelName
240  << " not found in list of allowed channel names.";
241  LOG(error) << "Stop device.";
242  return false;
243 }
244 
246 
247 
248  auto timeslice = fSource->get();
249  if (timeslice) {
250  if (fTSCounter < fMaxTimeslices) {
251  fTSCounter++;
252  if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;
253 
254 
255  const fles::Timeslice& ts = *timeslice;
256  // auto tsIndex = ts.index();
257 
258 
259  LOG(info) << "Found " << ts.num_components()
260  << " different components in timeslice";
261 
262 
263  CheckTimeslice(ts);
264 
265  for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
266  CreateAndSendComponent(ts, nrComp);
267  }
268  return true;
269  } else {
270  if (false == OpenNextFile()) {
271  CalcRuntime();
272  return false;
273  } else {
274  return true;
275  }
276  }
277  } else {
278  if (false == OpenNextFile()) {
279  CalcRuntime();
280  return false;
281  } else {
282  return true;
283  }
284  }
285 }
286 
287 bool CbmMQTsaSampler::CreateAndSendComponent(const fles::Timeslice& ts,
288  int nrComp) {
289 
290  // Check if component has to be send. If the corresponding channel
291  // is connected create the new timeslice and send it to the
292  // correct channel
293 
294  LOG(info) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
295  const vector<int>::const_iterator pos =
296  std::find(fSysId.begin(),
297  fSysId.end(),
298  static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
299  if (pos != fSysId.end()) {
300  const vector<std::string>::size_type idx = pos - fSysId.begin();
301  if (fComponentsToSend[idx] > 0) {
302  LOG(info) << "Create timeslice component for link " << nrComp;
303 
304  fles::StorableTimeslice component {
305  static_cast<uint32_t>(ts.num_microslices(nrComp), ts.index())};
306  component.append_component(ts.num_microslices(0));
307 
308  for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
309  component.append_microslice(
310  0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
311  }
312  if (!SendData(component, idx)) return false;
313  return true;
314  }
315  }
316  return true;
317 }
318 
319 bool CbmMQTsaSampler::SendData(const fles::StorableTimeslice& component,
320  int idx) {
321  // serialize the timeslice and create the message
322  std::stringstream oss;
323  boost::archive::binary_oarchive oa(oss);
324  oa << component;
325  std::string* strMsg = new std::string(oss.str());
326 
327  FairMQMessagePtr msg(NewMessage(
328  const_cast<char*>(strMsg->c_str()), // data
329  strMsg->length(), // size
330  [](void* /*data*/, void* object) {
331  delete static_cast<std::string*>(object);
332  },
333  strMsg)); // object that manages the data
334 
335  // TODO: Implement sending same data to more than one channel
336  // Need to create new message (copy message??)
337  if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }
338 
339  // in case of error or transfer interruption,
340  // return false to go to IDLE state
341  // successfull transfer will return number of bytes
342  // transfered (can be 0 if sending an empty message).
343 
344  LOG(info) << "Send data to channel " << fChannelsToSend[idx][0];
345  if (Send(msg, fChannelsToSend[idx][0]) < 0) {
346  LOG(error) << "Problem sending data";
347  return false;
348  }
349 
350  fMessageCounter++;
351  LOG(info) << "Send message " << fMessageCounter << " with a size of "
352  << msg->GetSize();
353 
354  return true;
355 }
356 
357 
359 
361  std::chrono::duration<double> run_time =
362  std::chrono::steady_clock::now() - fTime;
363 
364  LOG(info) << "Runtime: " << run_time.count();
365  LOG(info) << "No more input data";
366 }
367 
368 
370  const fles::MicrosliceDescriptor& mdsc) {
371  LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
372  << std::dec;
373  LOG(info) << "Header version: Ox" << std::hex
374  << static_cast<int>(mdsc.hdr_ver) << std::dec;
375  LOG(info) << "Equipement ID: " << mdsc.eq_id;
376  LOG(info) << "Flags: " << mdsc.flags;
377  LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
378  << std::dec;
379  LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
380  << std::dec;
381  LOG(info) << "Microslice Idx: " << mdsc.idx;
382  LOG(info) << "Checksum: " << mdsc.crc;
383  LOG(info) << "Size: " << mdsc.size;
384  LOG(info) << "Offset: " << mdsc.offset;
385 }
386 
387 bool CbmMQTsaSampler::CheckTimeslice(const fles::Timeslice& ts) {
388  if (0 == ts.num_components()) {
389  LOG(error) << "No Component in TS " << ts.index();
390  return 1;
391  }
392  LOG(info) << "Found " << ts.num_components()
393  << " different components in timeslice";
394 
395  for (size_t c = 0; c < ts.num_components(); ++c) {
396  LOG(info) << "Found " << ts.num_microslices(c)
397  << " microslices in component " << c;
398  LOG(info) << "Component " << c << " has a size of " << ts.size_component(c)
399  << " bytes";
400  LOG(info) << "Component " << c << " has the system id 0x" << std::hex
401  << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
402  LOG(info) << "Component " << c << " has the system id 0x"
403  << static_cast<int>(ts.descriptor(c, 0).sys_id);
404 
405  /*
406  for (size_t m = 0; m < ts.num_microslices(c); ++m) {
407  PrintMicroSliceDescriptor(ts.descriptor(c,m));
408  }
409 */
410  }
411 
412  return true;
413 }
CbmMQTsaSampler::fTime
std::chrono::steady_clock::time_point fTime
Definition: CbmMQTsaSampler.h:61
CbmMQTsaSampler::fAllowedChannels
std::vector< std::string > fAllowedChannels
Definition: CbmMQTsaSampler.h:75
CbmMQTsaSampler::IsChannelNameAllowed
bool IsChannelNameAllowed(std::string)
Definition: CbmMQTsaSampler.cxx:222
CbmMQTsaSampler::fHost
std::string fHost
Definition: CbmMQTsaSampler.h:37
CbmMQTsaSampler::fSysId
std::vector< int > fSysId
Definition: CbmMQTsaSampler.h:78
CbmMQTsaSampler::SendData
bool SendData(const fles::StorableTimeslice &component)
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
CbmMQTsaSampler::fFileName
std::string fFileName
Definition: CbmMQTsaSampler.h:32
CbmMQTsaSampler::fComponentsToSend
std::vector< int > fComponentsToSend
Definition: CbmMQTsaSampler.h:81
CbmMQTsaSampler::InitTask
virtual void InitTask()
Definition: CbmMQTsaSampler.cxx:55
cbm::mq::Transition::ErrorFound
@ ErrorFound
CbmMQTsaSampler.h
CbmMQTsaSampler::fTSCounter
uint64_t fTSCounter
Definition: CbmMQTsaSampler.h:41
CbmMQTsaSampler::CheckTimeslice
bool CheckTimeslice(const fles::Timeslice &ts)
Definition: CbmMQTsaSampler.cxx:387
CbmMQTsaSampler::fMaxTimeslices
uint64_t fMaxTimeslices
Definition: CbmMQTsaSampler.h:30
CbmMQTsaSampler::PrintMicroSliceDescriptor
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
Definition: CbmMQTsaSampler.cxx:369
CbmMQTsaSampler::~CbmMQTsaSampler
virtual ~CbmMQTsaSampler()
Definition: CbmMQTsaSampler.cxx:358
CbmMQTsaSampler::fSource
fles::TimesliceSource * fSource
Definition: CbmMQTsaSampler.h:60
v
__m128 v
Definition: L1/vectors/P4_F32vec4.h:1
CbmMQTsaSampler::CbmMQTsaSampler
CbmMQTsaSampler()
Definition: CbmMQTsaSampler.cxx:40
CbmMQTsaSampler::fMessageCounter
uint64_t fMessageCounter
Definition: CbmMQTsaSampler.h:42
x
Double_t x
Definition: CbmMvdSensorDigiToHitTask.cxx:68
CbmMQTsaSampler::fInputFileList
std::vector< std::string > fInputFileList
List of input files.
Definition: CbmMQTsaSampler.h:35
m
__m128 m
Definition: L1/vectors/P4_F32vec4.h:26
CbmMQTsaSampler::CalcRuntime
void CalcRuntime()
Definition: CbmMQTsaSampler.cxx:360
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
CbmMQTsaSampler::ConditionalRun
virtual bool ConditionalRun()
Definition: CbmMQTsaSampler.cxx:245
CbmMQTsaSampler::OpenNextFile
bool OpenNextFile()
Definition: CbmMQTsaSampler.cxx:197
CbmMQTsaSampler::fChannelsToSend
std::vector< std::vector< std::string > > fChannelsToSend
Definition: CbmMQTsaSampler.h:82
CbmMQDefs.h
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19
CbmMQTsaSampler::CreateAndSendComponent
bool CreateAndSendComponent(const fles::Timeslice &, int)
Definition: CbmMQTsaSampler.cxx:287
CbmMQTsaSampler::fDirName
std::string fDirName
Definition: CbmMQTsaSampler.h:33
CbmMQTsaSampler::fPort
uint64_t fPort
Definition: CbmMQTsaSampler.h:38