CbmRoot
CbmTsaComponentSink.cxx
Go to the documentation of this file.
1 
8 #include "CbmTsaComponentSink.h"
9 #include "CbmMQDefs.h"
10 
11 #include "StorableTimeslice.hpp"
12 
13 #include "FairMQLogger.h"
14 #include "FairMQProgOptions.h" // device->fConfig
15 
16 #include <boost/archive/binary_iarchive.hpp>
17 
18 #include <string>
19 
20 #include <stdexcept>
21 struct InitTaskError : std::runtime_error {
22  using std::runtime_error::runtime_error;
23 };
24 
25 using namespace std;
26 
28 
30  // Get the information about created channels from the device
31  // Check if the defined channels from the topology (by name)
32  // are in the list of channels which are possible/allowed
33  // for the device
34  // The idea is to check at initialization if the devices are
35  // properly connected. For the time being this is done with a
36  // naming convention. This does not prevent someone from sending
37  // other data on this channel.
38  int noChannel = fChannels.size();
39  LOG(info) << "Number of defined input channels: " << noChannel;
40  for (auto const& entry : fChannels) {
41  LOG(info) << "Channel name: " << entry.first;
42  if (!IsChannelNameAllowed(entry.first))
43  throw InitTaskError("Channel name does not match.");
44  OnData(entry.first, &CbmTsaComponentSink::HandleData);
45  }
46 } catch (InitTaskError& e) {
47  LOG(error) << e.what();
48  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
50 }
51 
52 bool CbmTsaComponentSink::IsChannelNameAllowed(std::string channelName) {
53 
54  for (auto const& entry : fAllowedChannels) {
55  std::size_t pos1 = channelName.find(entry);
56  if (pos1 != std::string::npos) {
57  const vector<std::string>::const_iterator pos =
58  std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
59  const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
60  LOG(info) << "Found " << entry << " in " << channelName;
61  LOG(info) << "Channel name " << channelName
62  << " found in list of allowed channel names at position "
63  << idx;
64  return true;
65  }
66  }
67  LOG(info) << "Channel name " << channelName
68  << " not found in list of allowed channel names.";
69  LOG(error) << "Stop device.";
70  return false;
71 }
72 
73 
74 // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
75 bool CbmTsaComponentSink::HandleData(FairMQMessagePtr& msg, int /*index*/) {
76  // Don't do anything with the data
77  // Maybe add an message counter which counts the incomming messages and add
78  // an output
79  fNumMessages++;
80  LOG(info) << "Received message number " << fNumMessages << " with size "
81  << msg->GetSize();
82 
83  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
84  std::istringstream iss(msgStr);
85  boost::archive::binary_iarchive inputArchive(iss);
86 
87  fles::StorableTimeslice component {0};
88  inputArchive >> component;
89 
90  CheckTimeslice(component);
91 
92  return true;
93 }
94 
96 
98  const fles::MicrosliceDescriptor& mdsc) {
99  LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
100  << std::dec;
101  LOG(info) << "Header version: Ox" << std::hex
102  << static_cast<int>(mdsc.hdr_ver) << std::dec;
103  LOG(info) << "Equipement ID: " << mdsc.eq_id;
104  LOG(info) << "Flags: " << mdsc.flags;
105  LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
106  << std::dec;
107  LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
108  << std::dec;
109  LOG(info) << "Microslice Idx: " << mdsc.idx;
110  LOG(info) << "Checksum: " << mdsc.crc;
111  LOG(info) << "Size: " << mdsc.size;
112  LOG(info) << "Offset: " << mdsc.offset;
113 }
114 
115 bool CbmTsaComponentSink::CheckTimeslice(const fles::Timeslice& ts) {
116  if (0 == ts.num_components()) {
117  LOG(error) << "No Component in TS " << ts.index();
118  return 1;
119  }
120  LOG(info) << "Found " << ts.num_components()
121  << " different components in timeslice";
122 
123  for (size_t c = 0; c < ts.num_components(); ++c) {
124  LOG(info) << "Found " << ts.num_microslices(c)
125  << " microslices in component " << c;
126  LOG(info) << "Component " << c << " has a size of " << ts.size_component(c)
127  << " bytes";
128  LOG(info) << "Sys ID: Ox" << std::hex
129  << static_cast<int>(ts.descriptor(0, 0).sys_id) << std::dec;
130 
131  /*
132  for (size_t m = 0; m < ts.num_microslices(c); ++m) {
133  PrintMicroSliceDescriptor(ts.descriptor(c,m));
134  }
135 */
136  }
137 
138  return true;
139 }
CbmTsaComponentSink::CheckTimeslice
bool CheckTimeslice(const fles::Timeslice &ts)
Definition: CbmTsaComponentSink.cxx:115
CbmTsaComponentSink.h
CbmTsaComponentSink::IsChannelNameAllowed
bool IsChannelNameAllowed(std::string channelName)
Definition: CbmTsaComponentSink.cxx:52
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
CbmTsaComponentSink::fNumMessages
uint64_t fNumMessages
Definition: CbmTsaComponentSink.h:26
cbm::mq::Transition::ErrorFound
@ ErrorFound
CbmTsaComponentSink::fAllowedChannels
std::vector< std::string > fAllowedChannels
Definition: CbmTsaComponentSink.h:28
CbmTsaComponentSink::~CbmTsaComponentSink
virtual ~CbmTsaComponentSink()
Definition: CbmTsaComponentSink.cxx:95
CbmTsaComponentSink::PrintMicroSliceDescriptor
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
Definition: CbmTsaComponentSink.cxx:97
CbmTsaComponentSink::CbmTsaComponentSink
CbmTsaComponentSink()
Definition: CbmTsaComponentSink.cxx:27
CbmTsaComponentSink::HandleData
bool HandleData(FairMQMessagePtr &, int)
Definition: CbmTsaComponentSink.cxx:75
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
CbmMQDefs.h
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19
CbmTsaComponentSink::InitTask
virtual void InitTask()
Definition: CbmTsaComponentSink.cxx:29