CbmRoot
CbmMQTsaInfo.cxx
Go to the documentation of this file.
1 
9 #include "CbmMQTsaInfo.h"
10 #include "CbmMQDefs.h"
11 
12 #include "FairMQLogger.h"
13 #include "FairMQProgOptions.h" // device->fConfig
14 
15 #include "TimesliceInputArchive.hpp"
16 #include "TimesliceSubscriber.hpp"
17 
18 #include <boost/archive/binary_oarchive.hpp>
19 
20 #include <chrono>
21 #include <ctime>
22 #include <stdio.h>
23 #include <thread> // this_thread::sleep_for
24 
25 using namespace std;
26 
27 #include <stdexcept>
28 
/// Exception type raised while the device is being initialised.
///
/// It adds nothing to std::runtime_error except a distinct type, so that
/// initialisation failures can be caught separately from other runtime errors.
/// All std::runtime_error constructors are inherited unchanged.
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
32 
33 
35  : FairMQDevice()
36  , fMaxTimeslices(0)
37  , fFileName("")
38  , fInputFileList()
39  , fFileCounter(0)
40  , fHost("")
41  , fPort(0)
42  , fTSNumber(0)
43  , fTSCounter(0)
44  , fMessageCounter(0)
45  , fTime() {}
46 
48  // Get the values from the command line options (via fConfig)
49  fFileName = fConfig->GetValue<string>("filename");
50  fHost = fConfig->GetValue<string>("flib-host");
51  fPort = fConfig->GetValue<uint64_t>("flib-port");
52  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
53 
54 
55  LOG(info) << "Filename: " << fFileName;
56  LOG(info) << "Host: " << fHost;
57  LOG(info) << "Port: " << fPort;
58 
59  LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
60 
61  // Get the information about created channels from the device
62  // Check if the defined channels from the topology (by name)
63  // are in the list of channels which are possible/allowed
64  // for the device
65  // The idea is to check at initialization if the devices are
66  // properly connected. For the time being this is done with a
67  // naming convention. This does not prevent someone from sending other
68  // data on this channel.
69  int noChannel = fChannels.size();
70  LOG(info) << "Number of defined output channels: " << noChannel;
71  for (auto const& entry : fChannels) {
72  LOG(info) << "Channel name: " << entry.first;
73  if (!IsChannelNameAllowed(entry.first))
74  throw InitTaskError("Channel name does not match.");
75  }
76 
77  if (0 == fFileName.size() && 0 != fHost.size()) {
78  std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
79  LOG(info) << "Open TSPublisher at " << connector;
80  fSource = new fles::TimesliceSubscriber(connector);
81  if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
82  } else {
83  LOG(info) << "Open the Flib input file " << fFileName;
84  // Check if the input file exist
85  FILE* inputFile = fopen(fFileName.c_str(), "r");
86  if (!inputFile) { throw InitTaskError("Input file doesn't exist."); }
87  fclose(inputFile);
88  fSource = new fles::TimesliceInputArchive(fFileName);
89  if (!fSource) { throw InitTaskError("Could not open input file."); }
90  }
91  fTime = std::chrono::steady_clock::now();
92 } catch (InitTaskError& e) {
93  LOG(error) << e.what();
94  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
96 }
97 
98 bool CbmMQTsaInfo::IsChannelNameAllowed(std::string channelName) {
99  if (std::find(fAllowedChannels.begin(), fAllowedChannels.end(), channelName)
100  != fAllowedChannels.end()) {
101  LOG(info) << "Channel name " << channelName
102  << " found in list of allowed channel names.";
103  return true;
104  } else {
105  LOG(info) << "Channel name " << channelName
106  << " not found in list of allowed channel names.";
107  LOG(error) << "Stop device.";
108  return false;
109  }
110 }
111 
113 
114 
115  auto timeslice = fSource->get();
116  if (timeslice) {
117  fTSCounter++;
118  if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;
119 
120 
121  const fles::Timeslice& ts = *timeslice;
122  // auto tsIndex = ts.index();
123 
124 
125  LOG(info) << "Found " << ts.num_components()
126  << " different components in timeslice";
127 
128  CheckTimeslice(ts);
129 
130  if (fTSCounter < fMaxTimeslices) {
131  return true;
132  } else {
133  CalcRuntime();
134  return false;
135  }
136  } else {
137  CalcRuntime();
138  return false;
139  }
140 }
141 
142 
144 
146  std::chrono::duration<double> run_time =
147  std::chrono::steady_clock::now() - fTime;
148 
149  LOG(info) << "Runtime: " << run_time.count();
150  LOG(info) << "No more input data";
151 }
152 
153 
155  const fles::MicrosliceDescriptor& mdsc) {
156  LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
157  << std::dec;
158  LOG(info) << "Header version: Ox" << std::hex
159  << static_cast<int>(mdsc.hdr_ver) << std::dec;
160  LOG(info) << "Equipement ID: " << mdsc.eq_id;
161  LOG(info) << "Flags: " << mdsc.flags;
162  LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
163  << std::dec;
164  LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
165  << std::dec;
166  LOG(info) << "Microslice Idx: " << mdsc.idx;
167  LOG(info) << "Checksum: " << mdsc.crc;
168  LOG(info) << "Size: " << mdsc.size;
169  LOG(info) << "Offset: " << mdsc.offset;
170 }
171 
172 bool CbmMQTsaInfo::CheckTimeslice(const fles::Timeslice& ts) {
173  if (0 == ts.num_components()) {
174  LOG(error) << "No Component in TS " << ts.index();
175  return 1;
176  }
177  LOG(info) << "Found " << ts.num_components()
178  << " different components in timeslice";
179 
180  for (size_t c = 0; c < ts.num_components(); ++c) {
181  LOG(info) << "Found " << ts.num_microslices(c)
182  << " microslices in component " << c;
183  LOG(info) << "Component " << c << " has a size of " << ts.size_component(c)
184  << " bytes";
185  LOG(info) << "Component " << c << " has the system id 0x" << std::hex
186  << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
187 
188  /*
189  for (size_t m = 0; m < ts.num_microslices(c); ++m) {
190  PrintMicroSliceDescriptor(ts.descriptor(c,m));
191  }
192 */
193  }
194 
195  return true;
196 }
CbmMQTsaInfo::ConditionalRun
virtual bool ConditionalRun()
Definition: CbmMQTsaInfo.cxx:112
CbmMQTsaInfo::CbmMQTsaInfo
CbmMQTsaInfo()
Definition: CbmMQTsaInfo.cxx:34
CbmMQTsaInfo::fTSCounter
uint64_t fTSCounter
Definition: CbmMQTsaInfo.h:39
CbmMQTsaInfo::~CbmMQTsaInfo
virtual ~CbmMQTsaInfo()
Definition: CbmMQTsaInfo.cxx:143
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
CbmMQTsaInfo::PrintMicroSliceDescriptor
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
Definition: CbmMQTsaInfo.cxx:154
CbmMQTsaInfo::CalcRuntime
void CalcRuntime()
Definition: CbmMQTsaInfo.cxx:145
CbmMQTsaInfo::fMaxTimeslices
uint64_t fMaxTimeslices
Definition: CbmMQTsaInfo.h:30
cbm::mq::Transition::ErrorFound
@ ErrorFound
CbmMQTsaInfo::fTime
std::chrono::steady_clock::time_point fTime
Definition: CbmMQTsaInfo.h:55
CbmMQTsaInfo::fPort
uint64_t fPort
Definition: CbmMQTsaInfo.h:36
CbmMQTsaInfo::fHost
std::string fHost
Definition: CbmMQTsaInfo.h:35
CbmMQTsaInfo::CheckTimeslice
bool CheckTimeslice(const fles::Timeslice &ts)
Definition: CbmMQTsaInfo.cxx:172
CbmMQTsaInfo::fFileName
std::string fFileName
Definition: CbmMQTsaInfo.h:32
CbmMQTsaInfo.h
CbmMQTsaInfo::IsChannelNameAllowed
bool IsChannelNameAllowed(std::string)
Definition: CbmMQTsaInfo.cxx:98
CbmMQTsaInfo::InitTask
virtual void InitTask()
Definition: CbmMQTsaInfo.cxx:47
CbmMQTsaInfo::fSource
fles::TimesliceSource * fSource
Definition: CbmMQTsaInfo.h:54
CbmMQDefs.h
CbmMQTsaInfo::fAllowedChannels
std::vector< std::string > fAllowedChannels
Definition: CbmMQTsaInfo.h:57
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19