CbmRoot
CbmDeviceTriggerHandlerEtof.cxx
Go to the documentation of this file.
1 
9 #include "CbmMQDefs.h"
10 
11 #include "FairEventHeader.h"
12 #include "FairFileHeader.h"
13 #include "FairGeoParSet.h"
14 #include "FairMQLogger.h"
15 #include "FairMQProgOptions.h" // device->fConfig
16 #include "FairRootFileSink.h"
17 #include "FairRootManager.h"
18 #include "FairRunOnline.h"
19 #include "FairRuntimeDb.h"
20 
21 #include <boost/archive/binary_iarchive.hpp>
22 #include <boost/archive/binary_oarchive.hpp>
23 #include <boost/serialization/vector.hpp>
24 
25 #include <chrono>
26 #include <iomanip>
27 #include <string>
28 #include <thread> // this_thread::sleep_for
29 
30 #include <stdexcept>
31 struct InitTaskError : std::runtime_error {
32  using std::runtime_error::runtime_error;
33 };
34 
35 static std::chrono::steady_clock::time_point dctime =
36  std::chrono::steady_clock::now();
37 static double dSize = 0.;
38 
39 using namespace std;
40 
42  : fNumMessages(0)
43  , fiMsgCnt(0)
44  , fbMonitorMode(kFALSE)
45  , fbDebugMonitorMode(kFALSE)
46  , fbSandboxMode(kFALSE)
47  , fbEventDumpEna(kFALSE)
48  , fdEvent(0.) {}
49 
51 
53  // Get the information about created channels from the device
54  // Check if the defined channels from the topology (by name)
55  // are in the list of channels which are possible/allowed
56  // for the device
57  // The idea is to check at initilization if the devices are
58  // properly connected. For the time beeing this is done with a
59  // nameing convention. It is not avoided that someone sends other
60  // data on this channel.
61  int noChannel = fChannels.size();
62  LOG(info) << "Number of defined input channels: " << noChannel;
63  for (auto const& entry : fChannels) {
64  LOG(info) << "Channel name: " << entry.first;
65  if (!IsChannelNameAllowed(entry.first))
66  throw InitTaskError("Channel name does not match.");
67  if (entry.first != "syscmd")
68  OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
69  else
70  OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage);
71  }
72  InitWorkspace();
73 } catch (InitTaskError& e) {
74  LOG(error) << e.what();
76 }
77 
79  std::string channelName) {
80  for (auto const& entry : fAllowedChannels) {
81  std::size_t pos1 = channelName.find(entry);
82  if (pos1 != std::string::npos) {
83  const vector<std::string>::const_iterator pos =
84  std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
85  const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
86  LOG(info) << "Found " << entry << " in " << channelName;
87  LOG(info) << "Channel name " << channelName
88  << " found in list of allowed channel names at position "
89  << idx;
90  return true;
91  }
92  }
93  LOG(info) << "Channel name " << channelName
94  << " not found in list of allowed channel names.";
95  LOG(error) << "Stop device.";
96  return false;
97 }
98 
100  LOG(info) << "Init work space for CbmDeviceTriggerHandlerEtof.";
101 
102  // steering variables
103  fbSandboxMode = fConfig->GetValue<bool>("SandboxMode");
104 
105  return kTRUE;
106 }
107 
108 // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
109 //bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/)
111  int /*index*/) {
112  // Don't do anything with the data
113  // Maybe add an message counter which counts the incomming messages and add
114  // an output
115  fNumMessages++;
116  LOG(debug) << "Received message " << fNumMessages << " with " << parts.Size()
117  << " parts"
118  << ", size0: " << parts.At(0)->GetSize();
119 
120  uint TrigWord {0};
121  std::string msgStrE(static_cast<char*>(parts.At(0)->GetData()),
122  (parts.At(0))->GetSize());
123  std::istringstream issE(msgStrE);
124  boost::archive::binary_iarchive inputArchiveE(issE);
125  inputArchiveE >> TrigWord;
126 
127  char* pDataBuff = static_cast<char*>(parts.At(1)->GetData());
128  int iBuffSzByte = parts.At(1)->GetSize();
129 
130  // Send Subevent to STAR
131  LOG(debug) << "Send Data for event " << fdEvent << ", TrigWord " << TrigWord
132  << " with size " << iBuffSzByte << Form(" at %p ", pDataBuff);
133  if (kFALSE == fbSandboxMode) {
134  star_rhicf_write(TrigWord, pDataBuff, iBuffSzByte);
135  }
136  dSize += iBuffSzByte;
137  if (0 == (int) fdEvent % 10000) {
138  std::chrono::duration<double> deltatime =
139  std::chrono::steady_clock::now() - dctime;
140  LOG(info) << "Processed " << fdEvent
141  << " events, delta-time: " << deltatime.count()
142  << ", rate: " << dSize * 1.E-6 / deltatime.count() << "MB/s";
143  dctime = std::chrono::steady_clock::now();
144  dSize = 0.;
145  }
146  fdEvent++;
147 
148  return kTRUE;
149 }
150 
151 /************************************************************************************/
152 
154  int /*index*/) {
155  const char* cmd = (char*) (msg->GetData());
156  const char cmda[4] = {*cmd};
157  LOG(info) << "Handle message " << cmd << ", " << cmd[0];
158 
159  // only one implemented so far "Stop"
160  if (strcmp(cmda, "STOP")) {
162  cbm::mq::LogState(this);
164  cbm::mq::LogState(this);
166  cbm::mq::LogState(this);
168  cbm::mq::LogState(this);
169  // ChangeState(fair::mq::Transition(STOP));
170  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
171  }
172 
173  return true;
174 }
CbmDeviceTriggerHandlerEtof::HandleMessage
bool HandleMessage(FairMQMessagePtr &, int)
Definition: CbmDeviceTriggerHandlerEtof.cxx:153
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
cbm::mq::Transition::End
@ End
CbmDeviceTriggerHandlerEtof::fNumMessages
uint64_t fNumMessages
Definition: CbmDeviceTriggerHandlerEtof.h:57
CbmDeviceTriggerHandlerEtof::InitWorkspace
Bool_t InitWorkspace()
Definition: CbmDeviceTriggerHandlerEtof.cxx:99
cbm::mq::Transition::ErrorFound
@ ErrorFound
cbm::mq::LogState
void LogState(FairMQDevice *device)
Definition: CbmMQDefs.h:53
CbmDeviceTriggerHandlerEtof::InitTask
virtual void InitTask()
Definition: CbmDeviceTriggerHandlerEtof.cxx:52
dctime
static std::chrono::steady_clock::time_point dctime
Definition: CbmDeviceTriggerHandlerEtof.cxx:35
star_rhicf_write
int star_rhicf_write(unsigned int trg_word, void *dta, int bytes)
cbm::mq::Transition::Ready
@ Ready
CbmDeviceTriggerHandlerEtof::fdEvent
Double_t fdEvent
Switch ON the dumping of the events to a binary file.
Definition: CbmDeviceTriggerHandlerEtof.h:79
CbmDeviceTriggerHandlerEtof::IsChannelNameAllowed
Bool_t IsChannelNameAllowed(std::string channelName)
Definition: CbmDeviceTriggerHandlerEtof.cxx:78
CbmDeviceTriggerHandlerEtof::HandleData
bool HandleData(FairMQParts &, int)
Definition: CbmDeviceTriggerHandlerEtof.cxx:110
CbmDeviceTriggerHandlerEtof::fAllowedChannels
std::vector< std::string > fAllowedChannels
Definition: CbmDeviceTriggerHandlerEtof.h:58
CbmDeviceTriggerHandlerEtof::fbSandboxMode
Bool_t fbSandboxMode
Switch ON the filling of a additional set of histograms.
Definition: CbmDeviceTriggerHandlerEtof.h:75
cbm::mq::Transition::DeviceReady
@ DeviceReady
dSize
static double dSize
Definition: CbmDeviceTriggerHandlerEtof.cxx:37
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
CbmDeviceTriggerHandlerEtof::CbmDeviceTriggerHandlerEtof
CbmDeviceTriggerHandlerEtof()
Definition: CbmDeviceTriggerHandlerEtof.cxx:41
CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof
virtual ~CbmDeviceTriggerHandlerEtof()
Definition: CbmDeviceTriggerHandlerEtof.cxx:50
CbmMQDefs.h
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19
CbmDeviceTriggerHandlerEtof.h
cbm::mq::Transition::Idle
@ Idle