CbmRoot
CbmMQTsaMultiSampler.cxx
Go to the documentation of this file.
1 
9 #include "CbmMQTsaMultiSampler.h"
10 
11 #include "CbmMQDefs.h"
12 //#include "CbmFlesCanvasTools.h"
14 
15 #include "FairMQLogger.h"
16 #include "FairMQProgOptions.h" // device->fConfig
17 //#include "RootSerializer.h"
18 
19 #include "TimesliceInputArchive.hpp"
20 #include "TimesliceMultiInputArchive.hpp"
21 #include "TimesliceMultiSubscriber.hpp"
22 #include "TimesliceSubscriber.hpp"
23 
24 #include <boost/algorithm/string.hpp>
25 #include <boost/archive/binary_oarchive.hpp>
26 #include <boost/filesystem.hpp>
27 #include <boost/regex.hpp>
28 
29 namespace filesys = boost::filesystem;
30 
31 #include <algorithm>
32 #include <chrono>
33 #include <ctime>
34 #include <iomanip>
35 #include <sstream>
36 #include <stdio.h>
37 #include <string>
38 #include <thread> // this_thread::sleep_for
39 
40 using namespace std;
41 
42 #include <stdexcept>
43 
44 struct InitTaskError : std::runtime_error {
45  using std::runtime_error::runtime_error;
46 };
47 
49  : FairMQDevice()
50  , fMaxTimeslices(0)
51  , fFileName("")
52  , fDirName("")
53  , fInputFileList()
54  , fFileCounter(0)
55  , fHost("")
56  , fPort(0)
57  , fHighWaterMark(1)
58  , fTSCounter(0)
59  , fMessageCounter(0)
60  , fSource(nullptr)
61  , fTime() {}
62 
64  // Get the values from the command line options (via fConfig)
65  fFileName = fConfig->GetValue<string>("filename");
66  fDirName = fConfig->GetValue<string>("dirname");
67  fHost = fConfig->GetValue<string>("flib-host");
68  fPort = fConfig->GetValue<uint64_t>("flib-port");
69  fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
70  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
71  fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts");
72  fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid");
73  fbSendTsPerChannel = fConfig->GetValue<bool>("send-ts-per-channel");
74  fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs");
75  fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds");
76  /*
77  fuPublishFreqTs = fConfig->GetValue< uint32_t >( "PubFreqTs" );
78  fdMinPublishTime = fConfig->GetValue< double_t >( "PubTimeMin" );
79  fdMaxPublishTime = fConfig->GetValue< double_t >( "PubTimeMax" );
80  fsChannelNameHistosInput = fConfig->GetValue< std::string >( "ChNameIn" );
81  fsChannelNameHistosConfig = fConfig->GetValue< std::string >( "ChNameHistCfg" );
82  fsChannelNameCanvasConfig = fConfig->GetValue< std::string >( "ChNameCanvCfg" );
83 */
84  if (fbNoSplitTs) {
85  if (fbSendTsPerSysId) {
86  if (fbSendTsPerChannel) {
87  LOG(warning) << "Both no-split-ts, send-ts-per-sysid and "
88  "send-ts-per-channel options used => "
89  << " second and third one will be ignored!!!!";
90  } // if( fbSendTsPerSysId )
91  else
92  LOG(warning)
93  << "Both no-split-ts and send-ts-per-sysid options used => "
94  << " second one will be ignored!!!!";
95  } // if( fbSendTsPerSysId )
96  else if (fbSendTsPerChannel) {
97  LOG(warning)
98  << "Both no-split-ts and send-ts-per-channel options used => "
99  << " second one will be ignored!!!!";
100  } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId )
101  } // if( fbNoSplitTs )
102  else if (fbSendTsPerSysId && fbSendTsPerChannel) {
103  LOG(warning)
104  << "Both send-ts-per-sysid and send-ts-per-channel options used => "
105  << " second one will be ignored!!!!";
106  } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
107 
109  std::vector<std::string> vSysIdChanPairs =
110  fConfig->GetValue<std::vector<std::string>>("sysid-chan");
111  for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
112  const size_t sep = vSysIdChanPairs[uPair].find(':');
113  if (string::npos == sep || 0 == sep
114  || vSysIdChanPairs[uPair].size() == sep) {
115  LOG(info) << vSysIdChanPairs[uPair];
116  throw InitTaskError(
117  "Provided pair of SysId + Channel name is missing a : or an argument.");
118  } // if( string::npos == sep || 0 == sep || vSysIdChanPairs[ uPair ].size() == sep )
119 
121  std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
122  const size_t hexPos = sSysId.find("0x");
123  int iSysId;
124  if (string::npos == hexPos)
125  iSysId = std::stoi(sSysId);
126  else
127  iSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16);
128 
130  std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
131 
133  const vector<int>::const_iterator pos =
134  std::find(fSysId.begin(), fSysId.end(), iSysId);
135  if (fSysId.end() != pos) {
137  const vector<std::string>::size_type idx = pos - fSysId.begin();
138  fAllowedChannels[idx] = sChannelName;
139  } // if( fSysId.end() != pos )
140  else {
142  fSysId.push_back(iSysId);
143  fAllowedChannels.push_back(sChannelName);
144  } // else of if( fSysId.end() != pos )
145 
146  LOG(info) << vSysIdChanPairs[uPair] << " " << iSysId << " " << sChannelName;
147  } // for( uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair )
148 
149  if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
150 
151  // Check which input is defined
152  // Posibilities
153  // filename && ! dirname : single file
154  // filename with wildcards && diranme : all files with filename regex in the directory
155  // host && port : connect to the flim server
156 
157  bool isGoodInputCombi {false};
158  if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size()
159  && 0 == fPort) {
160  isGoodInputCombi = true;
161  fInputFileList.push_back(fFileName);
162  } else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size()
163  && 0 == fPort) {
164  isGoodInputCombi = true;
165  fInputFileList.push_back(fFileName);
166  } else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size()
167  && 0 != fPort) {
168  isGoodInputCombi = true;
169  LOG(info) << "Host: " << fHost;
170  LOG(info) << "Port: " << fPort;
171  } else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size()
172  && 0 == fPort) {
173  isGoodInputCombi = true;
174  LOG(info) << "Host string: " << fHost;
175  } else {
176  isGoodInputCombi = false;
177  }
178 
179 
180  if (!isGoodInputCombi) {
181  throw InitTaskError(
182  "Wrong combination of inputs. Either file or wildcard file + directory "
183  "or host + port are allowed combination.");
184  }
185 
186 
187  LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
188 
189  // Get the information about created channels from the device
190  // Check if the defined channels from the topology (by name)
191  // are in the list of channels which are possible/allowed
192  // for the device
193  // The idea is to check at initilization if the devices are
194  // properly connected. For the time beeing this is done with a
195  // nameing convention. It is not avoided that someone sends other
196  // data on this channel.
197  int noChannel = fChannels.size();
198  LOG(info) << "Number of defined output channels: " << noChannel;
199  for (auto const& entry : fChannels) {
201  if (entry.first == fsChannelNameMissedTs
202  || entry.first == fsChannelNameCommands) {
203  continue;
204  } // if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands )
205 
206  LOG(info) << "Channel name: " << entry.first;
207  if (!IsChannelNameAllowed(entry.first))
208  throw InitTaskError("Channel name does not match.");
209  }
210 
211  for (auto const& value : fComponentsToSend) {
212  LOG(info) << "Value : " << value;
213  if (value > 1) {
214  throw InitTaskError("Sending same data to more than one output channel "
215  "not implemented yet.");
216  }
217  }
218 
219 
220  if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) {
221  // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
222  //std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
223  std::string connector = fHost + ":" + std::to_string(fPort);
224  LOG(info) << "Open TSPublisher at " << connector;
225  fSource = new fles::TimesliceMultiSubscriber(connector);
226  } else if (0 == fFileName.size() && 0 != fHost.size()) {
227  std::string connector = fHost;
228  LOG(info) << "Open TSPublisher with host string: " << connector;
229  fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark);
230  } else {
231  // Create a ";" separated string with all file names
232  std::string fileList {""};
233  for (const auto obj : fInputFileList) {
234  std::string fileName = obj;
235  fileList += fileName;
236  fileList += ";";
237  }
238  fileList.pop_back(); // Remove the last ;
239  LOG(info) << "Input File String: " << fileList;
240  fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName);
241  if (!fSource) { throw InitTaskError("Could open files from file list."); }
242  }
243 
244  LOG(info) << "High-Water Mark: " << fHighWaterMark;
245  LOG(info) << "Max. Timeslices: " << fMaxTimeslices;
246  if (fbNoSplitTs) {
247  LOG(info) << "Sending TS copies in no-split mode";
248  } // if( fbNoSplitTs )
249  else if (fbSendTsPerSysId) {
250  LOG(info) << "Sending components in separate TS per SysId";
251  } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
252  else if (fbSendTsPerChannel) {
253  LOG(info) << "Sending components in separate TS per channel";
254  } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
255  /*
256  LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
257  LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
258  LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
259 */
260 
261  /*
264  initOK &= CreateHistograms();
265 
270  for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
271  {
272 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
273 // << " in " << vHistos[ uHisto ].second.data()
274 // ;
275  fArrayHisto.Add( vHistos[ uHisto ].first );
276  std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
277  vHistos[ uHisto ].second );
278  fvpsHistosFolder.push_back( psHistoConfig );
279 
281  FairMQMessagePtr messageHist( NewMessage() );
282  Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
283 
285  if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
286  {
287  LOG(error) << "Problem sending histo config";
288  return false;
289  } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
290 
291  LOG(info) << "Config of hist " << psHistoConfig.first.data()
292  << " in folder " << psHistoConfig.second.data() ;
293  } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
294 
298  for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
299  {
300 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
301 // << " in " << vCanvases[ uCanv ].second.data();
302  std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
303  std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
304 
305  std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
306 
307  fvpsCanvasConfig.push_back( psCanvConfig );
308 
310  FairMQMessagePtr messageCan( NewMessage() );
311  Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
312 
314  if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
315  {
316  LOG(error) << "Problem sending canvas config";
317  return false;
318  } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
319 
320  LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
321  << " is " << psCanvConfig.second.data() ;
322  } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
323 */
324 
325  fTime = std::chrono::steady_clock::now();
326 } catch (InitTaskError& e) {
327  LOG(error) << e.what();
328  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
330 }
331 
332 bool CbmMQTsaMultiSampler::IsChannelNameAllowed(std::string channelName) {
334  if (fbNoSplitTs) {
335  fComponentsToSend[0]++;
336  fChannelsToSend[0].push_back(channelName);
337  return true;
338  } // if( fbNoSplitTs )
339 
340  bool bFoundMatch = false;
341  // for(auto const &entry : fAllowedChannels) {
342  for (uint32_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
343  auto const& entry = fAllowedChannels[idx];
344  LOG(info) << "Looking for name " << channelName << " in " << entry;
345  std::size_t pos1 = channelName.find(entry);
346  if (pos1 != std::string::npos) {
347  /*
348  const vector<std::string>::const_iterator pos =
349  std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
350  const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
351 */
352  LOG(info) << "Found " << entry << " in " << channelName;
353  LOG(info) << "Channel name " << channelName
354  << " found in list of allowed channel names at position " << idx
355  << " (SysId 0x" << std::hex << fSysId[idx] << std::dec << ")";
356  fComponentsToSend[idx]++;
357  fChannelsToSend[idx].push_back(channelName);
358 
360  if (fbSendTsPerChannel)
361  bFoundMatch = true;
362  else
363  return true;
364  } // if (pos1!=std::string::npos)
365  }
367  if (fbSendTsPerChannel && bFoundMatch) return true;
368 
369  LOG(info) << "Channel name " << channelName
370  << " not found in list of allowed channel names.";
371  LOG(error) << "Stop device.";
372  return false;
373 }
374 
376 
377 
378  auto timeslice = fSource->get();
379  if (timeslice) {
380  if (fTSCounter < fMaxTimeslices) {
381  fTSCounter++;
382 
383  const fles::Timeslice& ts = *timeslice;
384  uint64_t uTsIndex = ts.index();
385 
387  if ((uTsIndex != (fuPrevTsIndex + 1))
388  && (0 != fuPrevTsIndex && 0 != uTsIndex)
389  && "" != fsChannelNameMissedTs) {
390  LOG(debug) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex
391  << " New TS Index is " << uTsIndex;
393  std::vector<uint64_t> vulMissedIndices;
394  for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
395  vulMissedIndices.emplace_back(ulMiss);
396  } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss )
397  if (!SendMissedTsIdx(vulMissedIndices)) {
399  if ("" != fsChannelNameCommands) {
401  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
402  SendCommand("STOP");
403  } // if( "" != fsChannelNameCommands )
404 
405  return false;
406  } // if( !SendMissedTsIdx( vulMissedIndices ) )
407  } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && ( 0 != fuPrevTsIndex && 0 != uTsIndex ) && "" != fsChannelNameMissedTs )
408  fuPrevTsIndex = uTsIndex;
409 
410  if (fTSCounter % 10000 == 0) {
411  LOG(info) << "Received TS " << fTSCounter << " with index " << uTsIndex;
412  }
413 
414  LOG(debug) << "Found " << ts.num_components()
415  << " different components in timeslice";
416 
417 
418  // CheckTimeslice(ts);
419 
420  if (fbNoSplitTs) {
423  if (!CreateAndSendFullTs(ts)) {
425  if ("" != fsChannelNameCommands) {
427  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
428  SendCommand("STOP");
429  } // if( "" != fsChannelNameCommands )
430 
431  return false;
432  } // if( !CreateAndSendFullTs( ts ) )
433  } // if( fbNoSplitTs )
434  else if (fbSendTsPerSysId) {
439  if ("" != fsChannelNameCommands) {
441  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
442  SendCommand("STOP");
443  } // if( "" != fsChannelNameCommands )
444 
445  return false;
446  } // if( !CreateAndCombineComponentsPerSysId( ts ) )
447  } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs )
448  else if (fbSendTsPerChannel) {
453  if ("" != fsChannelNameCommands) {
455  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
456  SendCommand("STOP");
457  } // if( "" != fsChannelNameCommands )
458 
459  return false;
460  } // if( !CreateAndCombineComponentsPerChannel( ts ) )
461  } // else if( fbSendTsPerChannel ) of if( fbSendTsPerSysId )
462  else {
463  for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
464  if (!CreateAndSendComponent(ts, nrComp)) {
466  if ("" != fsChannelNameCommands) {
468  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
469  SendCommand("STOP");
470  } // if( "" != fsChannelNameCommands )
471 
472  return false;
473  } // if( !CreateAndSendComponent(ts, nrComp) )
474  } // for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp)
475  } // else of if( fbSendTsPerSysId )
476  return true;
477  } else {
478  CalcRuntime();
479 
481  if ("" != fsChannelNameCommands) {
483  std::this_thread::sleep_for(std::chrono::seconds(10));
484  std::string sCmd = "EOF ";
486  sCmd += " ";
487  sCmd += FormatDecPrintout(fTSCounter);
488  SendCommand(sCmd);
489  } // if( "" != fsChannelNameCommands )
490 
491  return false;
492  }
493  } else {
494  CalcRuntime();
495 
497  if ("" != fsChannelNameCommands) {
499  std::this_thread::sleep_for(std::chrono::seconds(10));
500  std::string sCmd = "EOF ";
502  sCmd += " ";
503  sCmd += FormatDecPrintout(fTSCounter);
504  SendCommand(sCmd);
505  } // if( "" != fsChannelNameCommands )
506 
507  return false;
508  }
509  /*
513  std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
514  std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
515  if( ( fdMaxPublishTime < elapsedSeconds.count() ) ||
516  ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
517  {
518  SendHistograms();
519  fLastPublishTime = std::chrono::system_clock::now();
520  } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
521 */
522 }
523 
524 bool CbmMQTsaMultiSampler::CreateAndSendComponent(const fles::Timeslice& ts,
525  int nrComp) {
526 
527  // Check if component has to be send. If the corresponding channel
528  // is connected create the new timeslice and send it to the
529  // correct channel
530 
531  LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
532  const vector<int>::const_iterator pos =
533  std::find(fSysId.begin(),
534  fSysId.end(),
535  static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
536  if (pos != fSysId.end()) {
537  const vector<std::string>::size_type idx = pos - fSysId.begin();
538  if (fComponentsToSend[idx] > 0) {
539  LOG(debug) << "Create timeslice component for link " << nrComp;
540 
541  fles::StorableTimeslice component {
542  static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
543  component.append_component(ts.num_microslices(0));
544 
545  for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
546  component.append_microslice(
547  0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
548  }
549  /*
550  LOG(info) << "Number of core microslices before: " << ts.num_core_microslices();
551  LOG(info) << "Number of core microslices after : " << component.num_core_microslices();
552  LOG(info) << "Number of microslices: " << component.num_microslices(0);
553 */
554  if (!SendData(component, idx)) return false;
555  return true;
556  }
557  }
558  return true;
559 }
560 
562  const fles::Timeslice& ts) {
564  if (false == fbListCompPerSysIdReady) {
565  for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
566  uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
567 
568  const vector<int>::const_iterator pos =
569  std::find(fSysId.begin(), fSysId.end(), usSysId);
570  if (fSysId.end() != pos) {
571  const vector<std::string>::size_type idx = pos - fSysId.begin();
572 
573  fvvCompPerSysId[idx].push_back(uCompIdx);
574  } // if( fSysId.end() != pos )
575  } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp )
576 
577  for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
578  std::stringstream ss;
579  ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size()
580  << " components for SysId 0x" << std::hex << std::setw(2)
581  << fSysId[uSysIdx] << std::dec << " :";
582 
583  for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size();
584  ++uComp) {
585  ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp];
586  } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
587 
588  LOG(info) << ss.str();
589  } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId )
590 
592  } // if( false == fbListCompPerSysIdReady )
593 
595  for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
596  if (0 < fComponentsToSend[uSysIdx]) {
597  LOG(debug) << "Create timeslice with components for SysId " << std::hex
598  << fSysId[uSysIdx] << std::dec;
599 
600  if (0 < fvvCompPerSysId[uSysIdx].size()) {
601  fles::StorableTimeslice component {
602  static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
603 
604  for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size();
605  ++uComp) {
606  uint32_t uNumMsInComp =
607  ts.num_microslices(fvvCompPerSysId[uSysIdx][uComp]);
608  component.append_component(uNumMsInComp);
609 
610  LOG(debug) << "Add components to TS for SysId " << std::hex
611  << fSysId[uSysIdx] << std::dec << " TS " << ts.index()
612  << " Comp " << fvvCompPerSysId[uSysIdx][uComp];
613 
614  for (size_t m = 0; m < uNumMsInComp; ++m) {
615  component.append_microslice(
616  uComp,
617  m,
618  ts.descriptor(fvvCompPerSysId[uSysIdx][uComp], m),
619  ts.content(fvvCompPerSysId[uSysIdx][uComp], m));
620  } // for( size_t m = 0; m < uNumMsInComp; ++m )
621  } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
622 
623  LOG(debug) << "Prepared timeslice for SysId " << std::hex
624  << fSysId[uSysIdx] << std::dec << " with "
625  << component.num_components() << " components";
626 
627  if (!SendData(component, uSysIdx)) return false;
628  } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
629  } // if( 0 < fComponentsToSend[ uSysIdx ] )
630  } // for( uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )
631 
632  return true;
633 }
634 
636  const fles::Timeslice& ts) {
637 
639  if (false == fbListCompPerChannelReady) {
641  for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
642  if (0 < fComponentsToSend[uSysIdx]) {
643  for (uint32_t uChan = 0; uChan < fChannelsToSend[uSysIdx].size();
644  ++uChan) {
645  const vector<std::string>::const_iterator pos =
646  std::find(fvChannelsToSend.begin(),
647  fvChannelsToSend.end(),
648  fChannelsToSend[uSysIdx][uChan]);
649  if (fvChannelsToSend.end() == pos) {
650  fvChannelsToSend.push_back(fChannelsToSend[uSysIdx][uChan]);
651  fvvCompPerChannel.push_back(std::vector<uint32_t>());
652  }
653  } // for( uChan = 0; uChan < fChannelsToSend[ uSysIdx ].size(); ++ uChan )
654  } // if( 0 < fComponentsToSend[ uSysIdx ] )
655  } // for( uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )
656 
658  fvvCompPerChannel.resize(fvChannelsToSend.size());
659 
662  for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
663  uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
664 
665  const vector<int>::const_iterator pos =
666  std::find(fSysId.begin(), fSysId.end(), usSysId);
667  if (fSysId.end() != pos) {
668  const vector<std::string>::size_type idxSys = pos - fSysId.begin();
669 
670  if (0 < fComponentsToSend[idxSys]) {
671  for (uint32_t uChan = 0; uChan < fChannelsToSend[idxSys].size();
672  ++uChan) {
673  const vector<std::string>::const_iterator posCh =
674  std::find(fvChannelsToSend.begin(),
675  fvChannelsToSend.end(),
676  fChannelsToSend[idxSys][uChan]);
677  if (fvChannelsToSend.end() != posCh) {
678  const vector<std::string>::size_type idxChan =
679  posCh - fvChannelsToSend.begin();
680  fvvCompPerChannel[idxChan].push_back(uCompIdx);
681  } // if( fvChannelsToSend.end() != posCh )
682  } // for( uChan = 0; uChan < fChannelsToSend[ idxSys ].size(); ++ uChan )
683  } // if( 0 < fComponentsToSend[ uSysIdx ] )
684  } // if( fSysId.end() != pos )
685  } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp )
686 
687  for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size();
688  ++uChanIdx) {
689  std::stringstream ss;
690  ss << "Found " << std::setw(2) << fvvCompPerChannel[uChanIdx].size()
691  << " components for channel " << fvChannelsToSend[uChanIdx] << " :";
692 
693  for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size();
694  ++uComp) {
695  ss << " " << std::setw(3) << fvvCompPerChannel[uChanIdx][uComp];
696  } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
697 
698  LOG(info) << ss.str();
699  } // for( uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )
700 
702  } // if( false == fbListCompPerSysIdReady )
703 
706 
708  for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
709  LOG(debug) << "Create timeslice with components for channel "
710  << fvChannelsToSend[uChanIdx];
711 
712  if (0 < fvvCompPerChannel[uChanIdx].size()) {
713  fles::StorableTimeslice component {
714  static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
715 
716  for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size();
717  ++uComp) {
718  uint32_t uNumMsInComp =
719  ts.num_microslices(fvvCompPerChannel[uChanIdx][uComp]);
720  component.append_component(uNumMsInComp);
721 
722  LOG(debug)
723  << "Add components to TS for SysId " << std::hex
724  << static_cast<uint16_t>(
725  ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], 0).sys_id)
726  << std::dec << " TS " << ts.index() << " Comp "
727  << fvvCompPerChannel[uChanIdx][uComp];
728 
729  for (size_t m = 0; m < uNumMsInComp; ++m) {
730  component.append_microslice(
731  uComp,
732  m,
733  ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], m),
734  ts.content(fvvCompPerChannel[uChanIdx][uComp], m));
735  } // for( size_t m = 0; m < uNumMsInComp; ++m )
736  } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
737 
738  LOG(debug) << "Prepared timeslice for channel "
739  << fvChannelsToSend[uChanIdx] << " with "
740  << component.num_components() << " components";
741 
742  if (!SendData(component, fvChannelsToSend[uChanIdx])) return false;
743  } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
744  } // for( uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )
745 
746  return true;
747 }
748 
749 bool CbmMQTsaMultiSampler::CreateAndSendFullTs(const fles::Timeslice& ts) {
751  for (uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx) {
752  if (0 < fComponentsToSend[uChanIdx]) {
753  LOG(debug) << "Copy timeslice component for channel "
754  << fChannelsToSend[uChanIdx][0];
755 
756  fles::StorableTimeslice fullTs {ts};
757  if (!SendData(fullTs, uChanIdx)) return false;
758  } // if( 0 < fComponentsToSend[ uChanIdx ] )
759  } // for( uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx )
760  return true;
761 }
762 
763 bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component,
764  int idx) {
765  // serialize the timeslice and create the message
766  std::stringstream oss;
767  boost::archive::binary_oarchive oa(oss);
768  oa << component;
769  std::string* strMsg = new std::string(oss.str());
770 
771  FairMQMessagePtr msg(NewMessage(
772  const_cast<char*>(strMsg->c_str()), // data
773  strMsg->length(), // size
774  [](void* /*data*/, void* object) {
775  delete static_cast<std::string*>(object);
776  },
777  strMsg)); // object that manages the data
778 
779  // TODO: Implement sending same data to more than one channel
780  // Need to create new message (copy message??)
781  if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }
782 
783  // in case of error or transfer interruption,
784  // return false to go to IDLE state
785  // successfull transfer will return number of bytes
786  // transfered (can be 0 if sending an empty message).
787 
788  LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
789  if (Send(msg, fChannelsToSend[idx][0]) < 0) {
790  LOG(error) << "Problem sending data";
791  return false;
792  }
793 
794  fMessageCounter++;
795  LOG(debug) << "Send message " << fMessageCounter << " with a size of "
796  << msg->GetSize();
797 
798  return true;
799 }
800 
801 bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component,
802  std::string sChannel) {
803  // serialize the timeslice and create the message
804  std::stringstream oss;
805  boost::archive::binary_oarchive oa(oss);
806  oa << component;
807  std::string* strMsg = new std::string(oss.str());
808 
809  FairMQMessagePtr msg(NewMessage(
810  const_cast<char*>(strMsg->c_str()), // data
811  strMsg->length(), // size
812  [](void* /*data*/, void* object) {
813  delete static_cast<std::string*>(object);
814  },
815  strMsg)); // object that manages the data
816 
817  // in case of error or transfer interruption,
818  // return false to go to IDLE state
819  // successfull transfer will return number of bytes
820  // transfered (can be 0 if sending an empty message).
821  LOG(debug) << "Send data to channel " << sChannel;
822  if (Send(msg, sChannel) < 0) {
823  LOG(error) << "Problem sending data";
824  return false;
825  }
826 
827  fMessageCounter++;
828  LOG(debug) << "Send message " << fMessageCounter << " with a size of "
829  << msg->GetSize();
830 
831  return true;
832 }
833 bool CbmMQTsaMultiSampler::SendMissedTsIdx(std::vector<uint64_t> vIndices) {
834  // serialize the vector and create the message
835  std::stringstream oss;
836  boost::archive::binary_oarchive oa(oss);
837  oa << vIndices;
838  std::string* strMsg = new std::string(oss.str());
839 
840  FairMQMessagePtr msg(NewMessage(
841  const_cast<char*>(strMsg->c_str()), // data
842  strMsg->length(), // size
843  [](void* /*data*/, void* object) {
844  delete static_cast<std::string*>(object);
845  },
846  strMsg)); // object that manages the data
847 
848  // in case of error or transfer interruption,
849  // return false to go to IDLE state
850  // successfull transfer will return number of bytes
851  // transfered (can be 0 if sending an empty message).
852  LOG(debug) << "Send data to channel " << fsChannelNameMissedTs;
853  if (Send(msg, fsChannelNameMissedTs) < 0) {
854  LOG(error) << "Problem sending missed TS indices to channel "
856  return false;
857  } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
858 
859  return true;
860 }
861 bool CbmMQTsaMultiSampler::SendCommand(std::string sCommand) {
862  // serialize the vector and create the message
863  std::stringstream oss;
864  boost::archive::binary_oarchive oa(oss);
865  oa << sCommand;
866  std::string* strMsg = new std::string(oss.str());
867 
868  FairMQMessagePtr msg(NewMessage(
869  const_cast<char*>(strMsg->c_str()), // data
870  strMsg->length(), // size
871  [](void* /*data*/, void* object) {
872  delete static_cast<std::string*>(object);
873  },
874  strMsg)); // object that manages the data
875 
876  // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data
877  // sCommand.length(), // size
878  // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
879  // &sCommand ) ); // object that manages the data
880 
881  // in case of error or transfer interruption,
882  // return false to go to IDLE state
883  // successfull transfer will return number of bytes
884  // transfered (can be 0 if sending an empty message).
885  LOG(debug) << "Send data to channel " << fsChannelNameCommands;
886  if (Send(msg, fsChannelNameCommands) < 0) {
887  LOG(error) << "Problem sending missed TS indices to channel "
889  return false;
890  } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
891 
892  return true;
893 }
894 
895 /*
896 bool CbmMQTsaMultiSampler::SendHistograms()
897 {
899  FairMQMessagePtr message( NewMessage() );
900  Serialize<RootSerializer>( *message, &fArrayHisto );
901 
903  if( Send( message, fsChannelNameHistosInput ) < 0 )
904  {
905  LOG(error) << "Problem sending data";
906  return false;
907  } // if( Send( message, fsChannelNameHistosInput ) < 0 )
908 
910  ResetHistograms();
911 
912  return true;
913 }
914 */
915 
917 
919  std::chrono::duration<double> run_time =
920  std::chrono::steady_clock::now() - fTime;
921 
922  LOG(info) << "Runtime: " << run_time.count();
923  LOG(info) << "No more input data";
924 }
925 
926 
928  const fles::MicrosliceDescriptor& mdsc) {
929  LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
930  << std::dec;
931  LOG(info) << "Header version: Ox" << std::hex
932  << static_cast<int>(mdsc.hdr_ver) << std::dec;
933  LOG(info) << "Equipement ID: " << mdsc.eq_id;
934  LOG(info) << "Flags: " << mdsc.flags;
935  LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
936  << std::dec;
937  LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
938  << std::dec;
939  LOG(info) << "Microslice Idx: " << mdsc.idx;
940  LOG(info) << "Checksum: " << mdsc.crc;
941  LOG(info) << "Size: " << mdsc.size;
942  LOG(info) << "Offset: " << mdsc.offset;
943 }
944 
945 bool CbmMQTsaMultiSampler::CheckTimeslice(const fles::Timeslice& ts) {
946  if (0 == ts.num_components()) {
947  LOG(error) << "No Component in TS " << ts.index();
948  return 1;
949  }
950  LOG(info) << "Found " << ts.num_components()
951  << " different components in timeslice";
952 
953  for (size_t c = 0; c < ts.num_components(); ++c) {
954  LOG(info) << "Found " << ts.num_microslices(c)
955  << " microslices in component " << c;
956  LOG(info) << "Component " << c << " has a size of " << ts.size_component(c)
957  << " bytes";
958  LOG(info) << "Component " << c << " has the system id 0x" << std::hex
959  << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
960  LOG(info) << "Component " << c << " has the system id 0x"
961  << static_cast<int>(ts.descriptor(c, 0).sys_id);
962 
963  /*
964  for (size_t m = 0; m < ts.num_microslices(c); ++m) {
965  PrintMicroSliceDescriptor(ts.descriptor(c,m));
966  }
967 */
968  }
969 
970  return true;
971 }
CbmMQTsaMultiSampler::fMaxTimeslices
uint64_t fMaxTimeslices
Definition: CbmMQTsaMultiSampler.h:29
CbmMQTsaMultiSampler::CheckTimeslice
bool CheckTimeslice(const fles::Timeslice &ts)
Definition: CbmMQTsaMultiSampler.cxx:945
CbmMQTsaMultiSampler::fAllowedChannels
std::vector< std::string > fAllowedChannels
Definition: CbmMQTsaMultiSampler.h:95
CbmMQTsaMultiSampler::fbNoSplitTs
bool fbNoSplitTs
Definition: CbmMQTsaMultiSampler.h:40
CbmFormatDecHexPrintout.h
CbmMQTsaMultiSampler::~CbmMQTsaMultiSampler
virtual ~CbmMQTsaMultiSampler()
Definition: CbmMQTsaMultiSampler.cxx:916
CbmMQTsaMultiSampler::CreateAndCombineComponentsPerSysId
bool CreateAndCombineComponentsPerSysId(const fles::Timeslice &)
Definition: CbmMQTsaMultiSampler.cxx:561
CbmMQTsaMultiSampler::fChannelsToSend
std::vector< std::vector< std::string > > fChannelsToSend
Definition: CbmMQTsaMultiSampler.h:106
CbmMQTsaMultiSampler::CreateAndSendFullTs
bool CreateAndSendFullTs(const fles::Timeslice &)
Definition: CbmMQTsaMultiSampler.cxx:749
CbmMQTsaMultiSampler::fSource
fles::TimesliceSource * fSource
Definition: CbmMQTsaMultiSampler.h:80
CbmMQTsaMultiSampler::fComponentsToSend
std::vector< int > fComponentsToSend
Definition: CbmMQTsaMultiSampler.h:105
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
CbmMQTsaMultiSampler::SendData
bool SendData(const fles::StorableTimeslice &component)
CbmMQTsaMultiSampler::fTSCounter
uint64_t fTSCounter
Definition: CbmMQTsaMultiSampler.h:52
CbmMQTsaMultiSampler::SendCommand
bool SendCommand(std::string sCommand)
Definition: CbmMQTsaMultiSampler.cxx:861
cbm::mq::Transition::ErrorFound
@ ErrorFound
CbmMQTsaMultiSampler::PrintMicroSliceDescriptor
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
Definition: CbmMQTsaMultiSampler.cxx:927
CbmMQTsaMultiSampler::IsChannelNameAllowed
bool IsChannelNameAllowed(std::string)
Definition: CbmMQTsaMultiSampler.cxx:332
CbmMQTsaMultiSampler::SendMissedTsIdx
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
Definition: CbmMQTsaMultiSampler.cxx:833
CbmMQTsaMultiSampler::fHost
std::string fHost
Definition: CbmMQTsaMultiSampler.h:36
CbmMQTsaMultiSampler::fbSendTsPerChannel
bool fbSendTsPerChannel
Definition: CbmMQTsaMultiSampler.h:42
CbmMQTsaMultiSampler::fbSendTsPerSysId
bool fbSendTsPerSysId
Definition: CbmMQTsaMultiSampler.h:41
CbmMQTsaMultiSampler::fbListCompPerSysIdReady
bool fbListCompPerSysIdReady
Definition: CbmMQTsaMultiSampler.h:109
CbmMQTsaMultiSampler::CreateAndCombineComponentsPerChannel
bool CreateAndCombineComponentsPerChannel(const fles::Timeslice &)
Definition: CbmMQTsaMultiSampler.cxx:635
CbmMQTsaMultiSampler::fvvCompPerChannel
std::vector< std::vector< uint32_t > > fvvCompPerChannel
Definition: CbmMQTsaMultiSampler.h:114
CbmMQTsaMultiSampler::fDirName
std::string fDirName
Definition: CbmMQTsaMultiSampler.h:32
CbmMQTsaMultiSampler::fvvCompPerSysId
std::vector< std::vector< uint32_t > > fvvCompPerSysId
Definition: CbmMQTsaMultiSampler.h:110
CbmMQTsaMultiSampler::fPort
uint64_t fPort
Definition: CbmMQTsaMultiSampler.h:37
CbmMQTsaMultiSampler::CreateAndSendComponent
bool CreateAndSendComponent(const fles::Timeslice &, int)
Definition: CbmMQTsaMultiSampler.cxx:524
CbmMQTsaMultiSampler::InitTask
virtual void InitTask()
Definition: CbmMQTsaMultiSampler.cxx:63
CbmMQTsaMultiSampler::fvChannelsToSend
std::vector< std::string > fvChannelsToSend
Definition: CbmMQTsaMultiSampler.h:113
CbmMQTsaMultiSampler::fsChannelNameCommands
std::string fsChannelNameCommands
Definition: CbmMQTsaMultiSampler.h:117
FormatDecPrintout
std::string FormatDecPrintout(uint64_t ulVal, char cFill, uint uWidth)
Definition: CbmFormatDecHexPrintout.cxx:4
CbmMQTsaMultiSampler::fTime
std::chrono::steady_clock::time_point fTime
Definition: CbmMQTsaMultiSampler.h:81
CbmMQTsaMultiSampler.h
CbmMQTsaMultiSampler::fuPrevTsIndex
uint64_t fuPrevTsIndex
Definition: CbmMQTsaMultiSampler.h:51
CbmMQTsaMultiSampler::CbmMQTsaMultiSampler
CbmMQTsaMultiSampler()
Definition: CbmMQTsaMultiSampler.cxx:48
CbmMQTsaMultiSampler::ConditionalRun
virtual bool ConditionalRun()
Definition: CbmMQTsaMultiSampler.cxx:375
m
__m128 m
Definition: L1/vectors/P4_F32vec4.h:26
CbmMQTsaMultiSampler::fbListCompPerChannelReady
bool fbListCompPerChannelReady
Definition: CbmMQTsaMultiSampler.h:112
CbmMQTsaMultiSampler::fsChannelNameMissedTs
std::string fsChannelNameMissedTs
Definition: CbmMQTsaMultiSampler.h:116
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
CbmMQTsaMultiSampler::fFileName
std::string fFileName
Definition: CbmMQTsaMultiSampler.h:31
CbmMQTsaMultiSampler::fHighWaterMark
uint64_t fHighWaterMark
Definition: CbmMQTsaMultiSampler.h:38
CbmMQTsaMultiSampler::fInputFileList
std::vector< std::string > fInputFileList
List of input files.
Definition: CbmMQTsaMultiSampler.h:34
CbmMQDefs.h
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19
CbmMQTsaMultiSampler::fSysId
std::vector< int > fSysId
Definition: CbmMQTsaMultiSampler.h:102
CbmMQTsaMultiSampler::CalcRuntime
void CalcRuntime()
Definition: CbmMQTsaMultiSampler.cxx:918
CbmMQTsaMultiSampler::fMessageCounter
uint64_t fMessageCounter
Definition: CbmMQTsaMultiSampler.h:53