CbmRoot
CbmDeviceMcbmMonitorPulser.cxx
Go to the documentation of this file.
1 
9 
10 #include "CbmMQDefs.h"
11 #include "TimesliceMetaData.h"
12 
13 //#include "CbmMcbm2018MonitorAlgoTof.h"
14 #include "CbmFlesCanvasTools.h"
15 
16 #include "StorableTimeslice.hpp"
17 
18 #include "BoostSerializer.h"
19 #include "FairMQLogger.h"
20 #include "FairMQProgOptions.h" // device->fConfig
21 #include "FairParGenericSet.h"
22 #include "RootSerializer.h"
23 
24 #include "TCanvas.h"
25 #include "TFile.h"
26 #include "TH1.h"
27 #include "TList.h"
28 #include "TNamed.h"
29 
30 #include <array>
31 #include <iomanip>
32 #include <string>
33 
34 #include <boost/archive/binary_iarchive.hpp>
35 #include <boost/serialization/utility.hpp>
36 
37 #include <stdexcept>
/// Exception type thrown during task initialisation (e.g. when a channel
/// name is rejected). It adds nothing to std::runtime_error except a distinct
/// type that can be caught separately; all base-class constructors are
/// inherited unchanged.
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
41 
42 using namespace std;
43 
44 //Bool_t bMcbm2018MonitorTaskTofResetHistos = kFALSE;
45 
47 // : fMonitorAlgo{ new CbmMcbm2018MonitorAlgoTof() }
48 {}
49 
52  LOG(info) << "Init options for CbmMqStarHistoServer.";
53 
54  fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni");
55  fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
56  fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
57  fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
58 
59  fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
60  fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
61  fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
62  fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
63  fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
64  fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
65  fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
66  fsAllowedChannels[0] = fsChannelNameDataInput;
67 
68  LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
69  LOG(info) << "Histograms publication min. interval in s: "
70  << fdMinPublishTime;
71  LOG(info) << "Histograms publication max. interval in s: "
72  << fdMaxPublishTime;
73 
75  // fMonitorAlgo->UseAbsoluteTime();
76 
77  // Get the information about created channels from the device
78  // Check if the defined channels from the topology (by name)
79  // are in the list of channels which are possible/allowed
80  // for the device
81 // The idea is to check at initialization if the devices are
82 // properly connected. For the time being this is done with a
83 // naming convention. Nothing prevents someone from sending other
84 // data on this channel.
85  //logger::SetLogLevel("INFO");
86 
87  int noChannel = fChannels.size();
88  LOG(info) << "Number of defined channels: " << noChannel;
89  for (auto const& entry : fChannels) {
90  LOG(info) << "Channel name: " << entry.first;
91  if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
92  if (!IsChannelNameAllowed(entry.first))
93  throw InitTaskError("Channel name does not match.");
94  OnData(entry.first, &CbmDeviceMcbmMonitorPulser::HandleData);
95  } // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
96  } // for( auto const &entry : fChannels )
97  InitContainers();
98 } catch (InitTaskError& e) {
99  LOG(error) << e.what();
100  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
102 }
103 
105  for (auto const& entry : fsAllowedChannels) {
106  std::size_t pos1 = channelName.find(entry);
107  if (pos1 != std::string::npos) {
108  const vector<std::string>::const_iterator pos =
109  std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
110  const vector<std::string>::size_type idx =
111  pos - fsAllowedChannels.begin();
112  LOG(info) << "Found " << entry << " in " << channelName;
113  LOG(info) << "Channel name " << channelName
114  << " found in list of allowed channel names at position "
115  << idx;
116  return true;
117  } // if (pos1!=std::string::npos)
118  } // for(auto const &entry : fsAllowedChannels)
119  LOG(info) << "Channel name " << channelName
120  << " not found in list of allowed channel names.";
121  LOG(error) << "Stop device.";
122  return false;
123 }
124 
126  LOG(info) << "Init parameter containers for CbmDeviceMcbmMonitorPulser.";
127  Bool_t initOK = kTRUE;
128  /*
129  fParCList = fMonitorAlgo->GetParList();
130 
131  for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) {
132  FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
133  fParCList->Remove( tempObj );
134  std::string paramName{ tempObj->GetName() };
135  // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
136  // Should only be used for small data because of the cost of an additional copy
137 
138 // Here must come the proper Runid
139  std::string message = paramName + ",111";
140  LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
141 
142  FairMQMessagePtr req( NewSimpleMessage(message) );
143  FairMQMessagePtr rep( NewMessage() );
144 
145  FairParGenericSet* newObj = nullptr;
146 
147  if ( Send(req, "parameters") > 0 ) {
148  if ( Receive( rep, "parameters" ) >= 0) {
149  if ( rep->GetSize() != 0 ) {
150  CbmMQTMessage tmsg( rep->GetData(), rep->GetSize() );
151  newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
152  LOG( info ) << "Received unpack parameter from the server:";
153  newObj->print();
154  } else {
155  LOG( error ) << "Received empty reply. Parameter not available";
156  } // if (rep->GetSize() != 0)
157  } // if (Receive(rep, "parameters") >= 0)
158  } // if (Send(req, "parameters") > 0)
159  fParCList->AddAt( newObj, iparC );
160  delete tempObj;
161  } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
162 
164  fMonitorAlgo->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
165  fMonitorAlgo->SetDebugMonitorMode( fbDebugMonitorMode );
166  fMonitorAlgo->SetIgnoreCriticalErrors( fbIgnoreCriticalErrors );
167  fMonitorAlgo->SetHistoryHistoSize( fuHistoryHistoSize );
168  fMonitorAlgo->SetPulserTotLimits( fuMinTotPulser, fuMaxTotPulser );
169 
170  Bool_t initOK = fMonitorAlgo->InitContainers();
171 */
172  // Bool_t initOK = fMonitorAlgo->ReInitContainers();
173 
174  // CreateHistos();
175  /*
178  initOK &= fMonitorAlgo->CreateHistograms();
179 
181  std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector();
183  std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector();
184 
189  for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
190  {
191 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
192 // << " in " << vHistos[ uHisto ].second.data()
193 // ;
194  fArrayHisto.Add( vHistos[ uHisto ].first );
195  std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
196  vHistos[ uHisto ].second );
197  fvpsHistosFolder.push_back( psHistoConfig );
198 
200  FairMQMessagePtr messageHist( NewMessage() );
201  Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
202 
204  if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
205  {
206  LOG(error) << "Problem sending histo config";
207  return false;
208  } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
209 
210  LOG(info) << "Config of hist " << psHistoConfig.first.data()
211  << " in folder " << psHistoConfig.second.data() ;
212  } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
213 
217  for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
218  {
219 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
220 // << " in " << vCanvases[ uCanv ].second.data();
221  std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
222  std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
223 
224  std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
225 
226  fvpsCanvasConfig.push_back( psCanvConfig );
227 
229  FairMQMessagePtr messageCan( NewMessage() );
230  Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
231 
233  if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
234  {
235  LOG(error) << "Problem sending canvas config";
236  return false;
237  } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
238 
239  LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
240  << " is " << psCanvConfig.second.data() ;
241  } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
242 */
243  return initOK;
244 }
245 
246 
247 // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
248 bool CbmDeviceMcbmMonitorPulser::HandleData(FairMQParts& parts, int /*index*/) {
249  fulNumMessages++;
250 
251  LOG(debug) << "Received message " << fulNumMessages << " with "
252  << parts.Size() << " parts"
253  << ", size0: " << parts.At(0)->GetSize();
254 
255  uint32_t uPartIdx = 0;
256 
258  /*
259  std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
260  ( parts.At( uPartIdx ) )->GetSize() );
261  std::istringstream issTsMeta(msgStrTsMeta);
262  boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
263  inputArchiveTsMeta >> (*fTsMetaData);
264  ++uPartIdx;
265 */
266  Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
267  ++uPartIdx;
268 
269  std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()),
270  (parts.At(uPartIdx))->GetSize());
271  std::istringstream issT0(msgStrT0);
272  boost::archive::binary_iarchive inputArchiveT0(issT0);
273  inputArchiveT0 >> fvDigiT0;
274  ++uPartIdx;
275 
276  std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()),
277  (parts.At(uPartIdx))->GetSize());
278  std::istringstream issSts(msgStrSts);
279  boost::archive::binary_iarchive inputArchiveSts(issSts);
280  inputArchiveSts >> fvDigiSts;
281  ++uPartIdx;
282 
283  std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()),
284  (parts.At(uPartIdx))->GetSize());
285  std::istringstream issMuch(msgStrMuch);
286  boost::archive::binary_iarchive inputArchiveMuch(issMuch);
287  inputArchiveMuch >> fvDigiMuch;
288  ++uPartIdx;
289 
290  std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()),
291  (parts.At(uPartIdx))->GetSize());
292  std::istringstream issTrd(msgStrTrd);
293  boost::archive::binary_iarchive inputArchiveTrd(issTrd);
294  inputArchiveTrd >> fvDigiTrd;
295  ++uPartIdx;
296 
297  std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()),
298  (parts.At(uPartIdx))->GetSize());
299  std::istringstream issTof(msgStrTof);
300  boost::archive::binary_iarchive inputArchiveTof(issTof);
301  inputArchiveTof >> fvDigiTof;
302  ++uPartIdx;
303 
304  std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()),
305  (parts.At(uPartIdx))->GetSize());
306  std::istringstream issRich(msgStrRich);
307  boost::archive::binary_iarchive inputArchiveRich(issRich);
308  inputArchiveRich >> fvDigiRich;
309  ++uPartIdx;
310 
311  std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()),
312  (parts.At(uPartIdx))->GetSize());
313  std::istringstream issPsd(msgStrPsd);
314  boost::archive::binary_iarchive inputArchivePsd(issPsd);
315  inputArchivePsd >> fvDigiPsd;
316  ++uPartIdx;
317 
319 
321  delete fTsMetaData;
322  fvDigiT0.clear();
323  fvDigiSts.clear();
324  fvDigiMuch.clear();
325  fvDigiTrd.clear();
326  fvDigiTof.clear();
327  fvDigiRich.clear();
328  fvDigiPsd.clear();
329 
330  /*
331  LOG(debug) << "Received message number "<< fulNumMessages
332  << " with size " << msg->GetSize();
333 
334  if( 0 == fulNumMessages % 10000 )
335  LOG(info) << "Received " << fulNumMessages << " messages";
336 
337  std::string msgStr( static_cast<char*>( msg->GetData() ), msg->GetSize() );
338  std::istringstream iss( msgStr );
339  boost::archive::binary_iarchive inputArchive( iss );
340 
342  fles::StorableTimeslice component{ 0 };
343  inputArchive >> component;
344 
346  DoUnpack(component, 0);
347 
351  std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
352  std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
353  if( ( fdMaxPublishTime < elapsedSeconds.count() ) ||
354  ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
355  {
356  SendHistograms();
357  fLastPublishTime = std::chrono::system_clock::now();
358  } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
359 */
360  return true;
361 }
362 
365  FairMQMessagePtr message(NewMessage());
366  Serialize<RootSerializer>(*message, &fArrayHisto);
367 
368  // test code to check if deserialization works
369  /*
370  TObject* tempObject = nullptr;
371  Deserialize<RootDeserializer>(*message, tempObject);
372 
373  if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
374  TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
375  LOG(info) << "Array contains " << arrayHisto->GetEntriesFast()
376  << " entries";
377  for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
378  TObject* obj = arrayHisto->At(i);
379  LOG(info) << obj->GetName();
380  TH1* histogram = static_cast<TH1*>(obj);
381  LOG(info) << histogram->GetNbinsX();
382  }
383  }
384 */
385 
387  if (Send(message, fsChannelNameHistosInput) < 0) {
388  LOG(error) << "Problem sending data";
389  return false;
390  } // if( Send( message, fsChannelNameHistosInput ) < 0 )
391 
393  // fMonitorAlgo->ResetHistograms( kFALSE );
394 
395  return true;
396 }
397 
398 
400 
CbmDeviceMcbmMonitorPulser::SendHistograms
bool SendHistograms()
Definition: CbmDeviceMcbmMonitorPulser.cxx:363
CbmDeviceMcbmMonitorPulser::Finish
void Finish()
Definition: CbmDeviceMcbmMonitorPulser.cxx:401
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
CbmFlesCanvasTools.h
CbmDeviceMcbmMonitorPulser::InitContainers
Bool_t InitContainers()
Definition: CbmDeviceMcbmMonitorPulser.cxx:125
CbmDeviceMcbmMonitorPulser::IsChannelNameAllowed
bool IsChannelNameAllowed(std::string channelName)
Definition: CbmDeviceMcbmMonitorPulser.cxx:104
cbm::mq::Transition::ErrorFound
@ ErrorFound
CbmDeviceMcbmMonitorPulser::CbmDeviceMcbmMonitorPulser
CbmDeviceMcbmMonitorPulser()
Definition: CbmDeviceMcbmMonitorPulser.cxx:46
CbmDeviceMcbmMonitorPulser::InitTask
virtual void InitTask()
Definition: CbmDeviceMcbmMonitorPulser.cxx:50
CbmDeviceMcbmMonitorPulser::~CbmDeviceMcbmMonitorPulser
virtual ~CbmDeviceMcbmMonitorPulser()
Definition: CbmDeviceMcbmMonitorPulser.cxx:399
CbmDeviceMcbmMonitorPulser::HandleData
bool HandleData(FairMQParts &, int)
Definition: CbmDeviceMcbmMonitorPulser.cxx:248
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
TimesliceMetaData.h
CbmDeviceMcbmMonitorPulser.h
CbmMQDefs.h
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19