CbmRoot
CbmDeviceMcbmEventSink.cxx
Go to the documentation of this file.
1 
9 
10 
12 #include "CbmMQDefs.h"
13 
14 #include "CbmEvent.h"
15 #include "CbmFlesCanvasTools.h"
16 #include "TimesliceMetaData.h"
17 
19 #include "BoostSerializer.h"
20 #include "FairMQLogger.h"
21 #include "FairMQProgOptions.h" // device->fConfig
22 #include "FairParGenericSet.h"
23 #include "FairRootFileSink.h"
24 #include "FairRootManager.h"
25 #include "FairRunOnline.h"
26 #include "RootSerializer.h"
27 
29 #include "TCanvas.h"
30 #include "TFile.h"
31 #include "TH1.h"
32 #include "TList.h"
33 #include "TNamed.h"
34 #include <boost/archive/binary_iarchive.hpp>
35 #include <boost/serialization/utility.hpp>
36 
38 #include <array>
39 #include <iomanip>
40 #include <string>
41 #include <thread> // this_thread::sleep_for
42 
43 #include <stdexcept>
/// Exception type thrown by the initialisation code of this file when a
/// setup step fails; the reason text is available through what().
struct InitTaskError : std::runtime_error {
  /// Build the error from a std::string reason.
  explicit InitTaskError(const std::string& sReason)
    : std::runtime_error(sReason) {}

  /// Build the error from a C-string reason.
  explicit InitTaskError(const char* sReason) : std::runtime_error(sReason) {}
};
47 
48 using namespace std;
49 
50 //Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE;
51 
53 
56  LOG(info) << "Init options for CbmDeviceMcbmEventSink.";
57 
58  fsOutputFileName = fConfig->GetValue<std::string>("OutFileName");
59 
60  fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
61  fsAllowedChannels[0] = fsChannelNameDataInput;
62 
63  fbFillHistos = fConfig->GetValue<bool>("FillHistos");
64  fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
65  fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
66  fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
67  fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
68  fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
69  fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
70 
72  OnData(fsChannelNameMissedTs, &CbmDeviceMcbmEventSink::HandleMissTsData);
73 
75  OnData(fsChannelNameCommands, &CbmDeviceMcbmEventSink::HandleCommand);
76 
78  // Get the information about created channels from the device
79  // Check if the defined channels from the topology (by name)
80  // are in the list of channels which are possible/allowed
81  // for the device
82  // The idea is to check at initialization if the devices are
83  // properly connected. For the time being this is done with a
84  // naming convention. This does not prevent someone from sending other
85  // data on this channel.
86  //logger::SetLogLevel("INFO");
87  int noChannel = fChannels.size();
88  LOG(info) << "Number of defined channels: " << noChannel;
89  for (auto const& entry : fChannels) {
90  LOG(info) << "Channel name: " << entry.first;
91  if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
92  if (!IsChannelNameAllowed(entry.first))
93  throw InitTaskError("Channel name does not match.");
94  OnData(entry.first, &CbmDeviceMcbmEventSink::HandleData);
95  } // if( entry.first.find( "ts" )
96  } // for( auto const &entry : fChannels )
97 
98  // InitContainers();
99 
101  fvDigiT0 = new std::vector<CbmTofDigi>();
102  fvDigiSts = new std::vector<CbmStsDigi>();
103  fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
104  fvDigiTrd = new std::vector<CbmTrdDigi>();
105  fvDigiTof = new std::vector<CbmTofDigi>();
106  fvDigiRich = new std::vector<CbmRichDigi>();
107  fvDigiPsd = new std::vector<CbmPsdDigi>();
108 
111  fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
112  if (NULL == fTimeSliceMetaDataArray) {
113  throw InitTaskError("Failed creating the TS meta data TClonesarray ");
114  } // if( NULL == fTimeSliceMetaDataArray )
117  fEventsArray = new TClonesArray("CbmEvent", 500);
118  if (NULL == fEventsArray) {
119  throw InitTaskError("Failed creating the Events TClonesarray ");
120  } // if( NULL == fEventsArray )
121 
123  if ("" != fsOutputFileName) {
124  fpRun = new FairRunOnline();
125  fpFairRootMgr = FairRootManager::Instance();
126  fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName));
127  if (nullptr == fpFairRootMgr->GetOutFile()) {
128  throw InitTaskError("Could not open root file");
129  } // if( nullptr == fpFairRootMgr->GetOutFile() )
130  } // if( "" != fsOutputFileName )
131  else {
132  throw InitTaskError("Empty output filename!");
133  } // else of if( "" != fsOutputFileName )
134 
135  LOG(info) << "Init Root Output to " << fsOutputFileName;
136 
137  fpFairRootMgr->InitSink();
138  // fEvtHeader = new FairEventHeader();
139  // fEvtHeader->SetRunId(iRunId);
140  // rootMgr->Register("EventHeader.", "Event", fEvtHeader, kTRUE);
141  // rootMgr->FillEventHeader(fEvtHeader);
142 
145  fpFairRootMgr->Register(
146  "TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
148  fpFairRootMgr->RegisterAny("T0Digi", fvDigiT0, kTRUE);
149  fpFairRootMgr->RegisterAny("StsDigi", fvDigiSts, kTRUE);
150  fpFairRootMgr->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kTRUE);
151  fpFairRootMgr->RegisterAny("TrdDigi", fvDigiTrd, kTRUE);
152  fpFairRootMgr->RegisterAny("TofDigi", fvDigiTof, kTRUE);
153  fpFairRootMgr->RegisterAny("RichDigi", fvDigiRich, kTRUE);
154  fpFairRootMgr->RegisterAny("PsdDigi", fvDigiPsd, kTRUE);
156  fpFairRootMgr->Register("CbmEvent", "Cbm Event", fEventsArray, kTRUE);
157  /*
158  TTree* outTree =new TTree(FairRootManager::GetTreeName(), "/cbmout", 99);
159  LOG(info) << "define Tree " << outTree->GetName();
160 
161  fpFairRootMgr->GetSink()->SetOutTree(outTree);
162 */
163  fpFairRootMgr->WriteFolder();
164 
165  LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr;
166 
168  if (kTRUE == fbFillHistos) {
169  /*
171  std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector();
173  std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector();
174 
179  for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
180  {
181 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
182 // << " in " << vHistos[ uHisto ].second.data()
183 // ;
184  fArrayHisto.Add( vHistos[ uHisto ].first );
185  std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
186  vHistos[ uHisto ].second );
187  fvpsHistosFolder.push_back( psHistoConfig );
188 
190  FairMQMessagePtr messageHist( NewMessage() );
191  Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
192 
194  if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
195  {
196  throw InitTaskError( "Problem sending histo config" );
197  } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
198 
199  LOG(info) << "Config of hist " << psHistoConfig.first.data()
200  << " in folder " << psHistoConfig.second.data() ;
201  } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
202 
206  for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
207  {
208 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
209 // << " in " << vCanvases[ uCanv ].second.data();
210  std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
211  std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
212 
213  std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
214 
215  fvpsCanvasConfig.push_back( psCanvConfig );
216 
218  FairMQMessagePtr messageCan( NewMessage() );
219  Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
220 
222  if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
223  {
224  throw InitTaskError( "Problem sending canvas config" );
225  } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
226 
227  LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
228  << " is " << psCanvConfig.second.data() ;
229  } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
230 */
231  } // if( kTRUE == fbFillHistos )
232 
233 } catch (InitTaskError& e) {
234  LOG(error) << e.what();
235  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
237 }
238 
239 bool CbmDeviceMcbmEventSink::IsChannelNameAllowed(std::string channelName) {
240  for (auto const& entry : fsAllowedChannels) {
241  std::size_t pos1 = channelName.find(entry);
242  if (pos1 != std::string::npos) {
243  const vector<std::string>::const_iterator pos =
244  std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
245  const vector<std::string>::size_type idx =
246  pos - fsAllowedChannels.begin();
247  LOG(info) << "Found " << entry << " in " << channelName;
248  LOG(info) << "Channel name " << channelName
249  << " found in list of allowed channel names at position "
250  << idx;
251  return true;
252  } // if (pos1!=std::string::npos)
253  } // for(auto const &entry : fsAllowedChannels)
254  LOG(info) << "Channel name " << channelName
255  << " not found in list of allowed channel names.";
256  LOG(error) << "Stop device.";
257  return false;
258 }
259 /*
260 Bool_t CbmDeviceMcbmEventSink::InitContainers()
261 {
262  LOG(info) << "Init parameter containers for CbmDeviceMcbmEventSink.";
263 
264  if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
265  return kFALSE;
266 
268  fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
269 
270  Bool_t initOK = fpAlgo->InitContainers();
271 
272 // Bool_t initOK = fMonitorAlgo->ReInitContainers();
273 
274  return initOK;
275 }
276 
277 Bool_t CbmDeviceMcbmEventSink::InitParameters( TList* fParCList )
278 {
279  for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
280  {
281  FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
282  fParCList->Remove( tempObj );
283  std::string paramName{ tempObj->GetName() };
284  // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
285  // Should only be used for small data because of the cost of an additional copy
286 
287  // Her must come the proper Runid
288  std::string message = paramName + ",111";
289  LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
290 
291  FairMQMessagePtr req( NewSimpleMessage(message) );
292  FairMQMessagePtr rep( NewMessage() );
293 
294  FairParGenericSet* newObj = nullptr;
295 
296  if( Send(req, "parameters") > 0 )
297  {
298  if( Receive( rep, "parameters" ) >= 0)
299  {
300  if( 0 != rep->GetSize() )
301  {
302  CbmMQTMessage tmsg( rep->GetData(), rep->GetSize() );
303  newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
304  LOG( info ) << "Received unpack parameter from the server:";
305  newObj->print();
306  } // if( 0 != rep->GetSize() )
307  else
308  {
309  LOG( error ) << "Received empty reply. Parameter not available";
310  return kFALSE;
311  } // else of if( 0 != rep->GetSize() )
312  } // if( Receive( rep, "parameters" ) >= 0)
313  } // if( Send(req, "parameters") > 0 )
314  fParCList->AddAt( newObj, iparC );
315  delete tempObj;
316  } // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
317 
318  return kTRUE;
319 }
320 */
321 //--------------------------------------------------------------------//
322 // handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0)
323 bool CbmDeviceMcbmEventSink::HandleMissTsData(FairMQMessagePtr& msg,
324  int /*index*/) {
325  std::vector<uint64_t> vIndices;
326  std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize());
327  std::istringstream issMissTs(msgStrMissTs);
328  boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
329  inputArchiveMissTs >> vIndices;
330 
331  fvulMissedTsIndices.insert(
332  fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end());
333 
335  CheckTsQueues();
336 
337  return true;
338 }
339 //--------------------------------------------------------------------//
340 // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
341 bool CbmDeviceMcbmEventSink::HandleData(FairMQParts& parts, int /*index*/) {
342  fulNumMessages++;
343  LOG(debug) << "Received message number " << fulNumMessages << " with "
344  << parts.Size() << " parts"
345  << ", size0: " << parts.At(0)->GetSize();
346 
347  if (0 == fulNumMessages % 10000)
348  LOG(info) << "Received " << fulNumMessages << " messages";
349 
351  uint32_t uPartIdx = 0;
354  /*
355  std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
356  ( parts.At( uPartIdx ) )->GetSize() );
357  std::istringstream issTsMeta(msgStrTsMeta);
358  boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
359  inputArchiveTsMeta >> (*fTsMetaData);
360  ++uPartIdx;
361 */
362  Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
363  LOG(debug) << "TS metadata extracted";
364 
366  if (fuPrevTsIndex + 1 == fTsMetaData->GetIndex()
367  || (0 == fuPrevTsIndex && 0 == fulTsCounter
368  && 0 == fTsMetaData->GetIndex())) {
369  LOG(debug) << "TS direct to dump";
371  PrepareTreeEntry(parts);
373  DumpTreeEntry();
375  fuPrevTsIndex = fTsMetaData->GetIndex();
376  fulTsCounter++;
377  } // if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter ) )
378  else {
379  LOG(debug) << "TS direct to storage";
381  fmFullTsStorage.emplace_hint(
382  fmFullTsStorage.end(),
383  std::pair<uint64_t, CbmUnpackedTimeslice>(
384  fTsMetaData->GetIndex(), std::move(CbmUnpackedTimeslice(parts))));
385  } // else of if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex() )
386  LOG(debug) << "TS metadata checked";
387 
389  // delete fTsMetaData;
390 
392  CheckTsQueues();
393  LOG(debug) << "TS queues checked";
394 
396  if (kTRUE == fbFillHistos) {
400  std::chrono::system_clock::time_point currentTime =
401  std::chrono::system_clock::now();
402  std::chrono::duration<double_t> elapsedSeconds =
403  currentTime - fLastPublishTime;
404  if ((fdMaxPublishTime < elapsedSeconds.count())
405  || (0 == fulNumMessages % fuPublishFreqTs
406  && fdMinPublishTime < elapsedSeconds.count())) {
407  SendHistograms();
408  fLastPublishTime = std::chrono::system_clock::now();
409  } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
410  } // if( kTRUE == fbFillHistos )
411 
412  return true;
413 }
414 //--------------------------------------------------------------------//
415 bool CbmDeviceMcbmEventSink::HandleCommand(FairMQMessagePtr& msg,
416  int /*index*/) {
417  /*
418  std::string sCommand( static_cast< char * >( msg->GetData() ),
419  msg->GetSize() );
420 */
421  std::string sCommand;
422  std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
423  std::istringstream issCmd(msgStrCmd);
424  boost::archive::binary_iarchive inputArchiveCmd(issCmd);
425  inputArchiveCmd >> sCommand;
426 
427  std::string sCmdTag = sCommand;
428  size_t charPosDel = sCommand.find(' ');
429  if (std::string::npos != charPosDel) {
430  sCmdTag = sCommand.substr(0, charPosDel);
431  } // if( std::string::npos != charPosDel )
432 
433  if ("EOF" == sCmdTag) {
434  fbReceivedEof = true;
435 
437  if (std::string::npos == charPosDel) {
438  LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
439  << "Incomplete EOF command received: " << sCommand;
440  return false;
441  } // if( std::string::npos == charPosDel )
443  charPosDel++;
444  std::string sNext = sCommand.substr(charPosDel);
445  charPosDel = sNext.find(' ');
446 
447  if (std::string::npos == charPosDel) {
448  LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
449  << "Incomplete EOF command received: " << sCommand;
450  return false;
451  } // if( std::string::npos == charPosDel )
452  fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel));
454  charPosDel++;
455  fuTotalTsCount = std::stoul(sNext.substr(charPosDel));
456 
457  LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
458  << "Received EOF command with final TS index " << fuLastTsIndex
459  << " and total nb TS " << fuTotalTsCount;
461  if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
462  LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
463  << "Found final TS index " << fuLastTsIndex
464  << " and total nb TS " << fuTotalTsCount;
465  Finish();
466  } // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
467  } // if( "EOF" == sCmdTag )
468  else if ("STOP" == sCmdTag) {
471  Finish();
472  } // else if( "STOP" == sCmdTag )
473  else {
474  LOG(warning) << "Unknown command received: " << sCmdTag
475  << " => will be ignored!";
476  } // else if command not recognized
477 
478  return true;
479 }
480 //--------------------------------------------------------------------//
482  bool bHoleFoundInBothQueues = false;
483 
484  std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs =
485  fmFullTsStorage.begin();
486  std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();
487 
488  while (!bHoleFoundInBothQueues) {
490  if (fmFullTsStorage.end() != itFullTs
491  && fuPrevTsIndex + 1 == (*itFullTs).first) {
493  PrepareTreeEntry((*itFullTs).second);
495  DumpTreeEntry();
496 
498  fuPrevTsIndex = (*itFullTs).first;
499  fulTsCounter++;
500 
502  ++itFullTs;
503  continue;
504  } // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() )
506  if (fvulMissedTsIndices.end() != itMissTs
507  && fuPrevTsIndex + 1 == (*itMissTs)) {
509  new (
510  (*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
511  TimesliceMetaData(0, 0, 0, (*itMissTs));
512 
514  DumpTreeEntry();
515 
517  fuPrevTsIndex = (*itMissTs);
518  fulMissedTsCounter++;
519 
521  ++itMissTs;
522  continue;
523  } // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) )
524 
526  bHoleFoundInBothQueues = true;
527  } // while( !bHoleFoundInBothQueues )
528 
530  fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs);
531  fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs);
532 
534  if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex
535  && fulTsCounter == fuTotalTsCount) {
536  LOG(info) << "CbmDeviceMcbmEventSink::CheckTsQueues => "
537  << "Found final TS index " << fuLastTsIndex << " and total nb TS "
538  << fuTotalTsCount;
539  Finish();
540  } // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
541 }
542 //--------------------------------------------------------------------//
546 
548  new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
549  TimesliceMetaData(std::move(unpTs.fTsMetaData));
550 
551  /*
554  fvDigiT0->insert( fvDigiT0->end(), unpTs.fvDigiT0.begin(), unpTs.fvDigiT0.end() );
556  fvDigiSts->insert( fvDigiSts->end(), unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() );
558  fvDigiMuch->insert( fvDigiMuch->end(), unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() );
560  fvDigiTrd->insert( fvDigiTrd->end(), unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() );
562  fvDigiTof->insert( fvDigiTof->end(), unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() );
564  fvDigiRich->insert( fvDigiRich->end(), unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() );
566  fvDigiPsd->insert( fvDigiPsd->end(), unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() );
567 */
570  (*fvDigiT0) = std::move(unpTs.fvDigiT0);
572  (*fvDigiSts) = std::move(unpTs.fvDigiSts);
574  (*fvDigiMuch) = std::move(unpTs.fvDigiMuch);
576  (*fvDigiTrd) = std::move(unpTs.fvDigiTrd);
578  (*fvDigiTof) = std::move(unpTs.fvDigiTof);
580  (*fvDigiRich) = std::move(unpTs.fvDigiRich);
582  (*fvDigiPsd) = std::move(unpTs.fvDigiPsd);
583 
585  fEventsArray->AbsorbObjects(&(unpTs.fEventsArray));
586 }
588  // Unpacked digis + CbmEvent output to root file
589  /*
590  * NH style
591 // fpFairRootMgr->FillEventHeader(fEvtHeader);
592 // LOG(info) << "Fill WriteOutBuffer with FairRootManager at " << fpFairRootMgr;
593 // fpOutRootFile->cd();
594  fpFairRootMgr->Fill();
595  fpFairRootMgr->StoreWriteoutBufferData( fpFairRootMgr->GetEventTime() );
596  //fpFairRootMgr->StoreAllWriteoutBufferData();
597  fpFairRootMgr->DeleteOldWriteoutBufferData();
598 */
600  fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
601  fpFairRootMgr->Fill();
602  fpFairRootMgr->DeleteOldWriteoutBufferData();
603 
605  fTimeSliceMetaDataArray->Clear();
606 
608  fvDigiT0->clear();
609  fvDigiSts->clear();
610  fvDigiMuch->clear();
611  fvDigiTrd->clear();
612  fvDigiTof->clear();
613  fvDigiRich->clear();
614  fvDigiPsd->clear();
615 
617  // fEventsArray->Delete();
618  fEventsArray->Clear("C");
619  // fEventsArray->Clear();
620 }
621 
622 //--------------------------------------------------------------------//
625  FairMQMessagePtr message(NewMessage());
626  Serialize<RootSerializer>(*message, &fArrayHisto);
627 
629  if (Send(message, fsChannelNameHistosInput) < 0) {
630  LOG(error) << "Problem sending data";
631  return false;
632  } // if( Send( message, fsChannelNameHistosInput ) < 0 )
633 
635  // fpAlgo->ResetHistograms( kFALSE );
636 
637  return true;
638 }
639 
640 //--------------------------------------------------------------------//
643 
645  if (!fbFinishDone) Finish();
646 
648  fTimeSliceMetaDataArray->Clear();
649  delete fTimeSliceMetaDataArray;
650  delete fTsMetaData;
651 
653  fvDigiT0->clear();
654  fvDigiSts->clear();
655  fvDigiMuch->clear();
656  fvDigiTrd->clear();
657  fvDigiTof->clear();
658  fvDigiRich->clear();
659  fvDigiPsd->clear();
660 
662  fEventsArray->Clear();
663  delete fEventsArray;
664 
665  delete fpRun;
666 }
667 
669  // Clean closure of output to root file
670  fpFairRootMgr->Write();
671  // fpFairRootMgr->GetSource()->Close();
672  fpFairRootMgr->CloseSink();
673  LOG(info) << "File closed after saving "
674  << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
675  << " full ones and " << fulMissedTsCounter << " missed/empty ones)";
676 
677  if (kTRUE == fbFillHistos) {
678  SendHistograms();
679  fLastPublishTime = std::chrono::system_clock::now();
680  } // if( kTRUE == fbFillHistos )
681 
682  ChangeState(fair::mq::Transition::Stop);
683  std::this_thread::sleep_for(std::chrono::milliseconds(3000));
684  ChangeState(fair::mq::Transition::End);
685 
686  fbFinishDone = kTRUE;
687 }
688 
690  : fEventsArray("CbmEvent", 500) {
692  uint32_t uPartIdx = 0;
695  /*
696  std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
697  ( parts.At( uPartIdx ) )->GetSize() );
698  std::istringstream issTsMeta(msgStrTsMeta);
699  boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
700  inputArchiveTsMeta >> (*fTsMetaData);
701  ++uPartIdx;
702 */
703  TObject* tempObjectMeta = nullptr;
704  RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
705  ++uPartIdx;
706 
707  if (TString(tempObjectMeta->ClassName()).EqualTo("TimesliceMetaData")) {
708  fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectMeta));
709  } // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
710 
712  std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()),
713  (parts.At(uPartIdx))->GetSize());
714  std::istringstream issT0(msgStrT0);
715  boost::archive::binary_iarchive inputArchiveT0(issT0);
716  inputArchiveT0 >> fvDigiT0;
717  ++uPartIdx;
718 
720  std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()),
721  (parts.At(uPartIdx))->GetSize());
722  std::istringstream issSts(msgStrSts);
723  boost::archive::binary_iarchive inputArchiveSts(issSts);
724  inputArchiveSts >> fvDigiSts;
725  ++uPartIdx;
726 
728  std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()),
729  (parts.At(uPartIdx))->GetSize());
730  std::istringstream issMuch(msgStrMuch);
731  boost::archive::binary_iarchive inputArchiveMuch(issMuch);
732  inputArchiveMuch >> fvDigiMuch;
733  ++uPartIdx;
734 
736  std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()),
737  (parts.At(uPartIdx))->GetSize());
738  std::istringstream issTrd(msgStrTrd);
739  boost::archive::binary_iarchive inputArchiveTrd(issTrd);
740  inputArchiveTrd >> fvDigiTrd;
741  ++uPartIdx;
742 
744  std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()),
745  (parts.At(uPartIdx))->GetSize());
746  std::istringstream issTof(msgStrTof);
747  boost::archive::binary_iarchive inputArchiveTof(issTof);
748  inputArchiveTof >> fvDigiTof;
749  ++uPartIdx;
750 
752  std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()),
753  (parts.At(uPartIdx))->GetSize());
754  std::istringstream issRich(msgStrRich);
755  boost::archive::binary_iarchive inputArchiveRich(issRich);
756  inputArchiveRich >> fvDigiRich;
757  ++uPartIdx;
758 
760  std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()),
761  (parts.At(uPartIdx))->GetSize());
762  std::istringstream issPsd(msgStrPsd);
763  boost::archive::binary_iarchive inputArchivePsd(issPsd);
764  inputArchivePsd >> fvDigiPsd;
765  ++uPartIdx;
766 
768  TObject* tempObject = nullptr;
769  RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
770  ++uPartIdx;
771 
772  if (TString(tempObject->ClassName()).EqualTo("TClonesArray")) {
773  TClonesArray* arrayEventsIn = static_cast<TClonesArray*>(tempObject);
774 
776  fEventsArray.AbsorbObjects(arrayEventsIn);
777  } // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
778 }
CbmDeviceMcbmEventSink::DumpTreeEntry
void DumpTreeEntry()
Definition: CbmDeviceMcbmEventSink.cxx:587
CbmDeviceMcbmEventSink::HandleMissTsData
bool HandleMissTsData(FairMQMessagePtr &, int)
Definition: CbmDeviceMcbmEventSink.cxx:323
CbmUnpackedTimeslice
Definition: CbmDeviceMcbmEventSink.h:42
CbmDeviceMcbmEventSink::CheckTsQueues
void CheckTsQueues()
Definition: CbmDeviceMcbmEventSink.cxx:481
CbmDeviceMcbmEventSink.h
CbmDeviceMcbmEventSink::IsChannelNameAllowed
bool IsChannelNameAllowed(std::string channelName)
Internal methods.
Definition: CbmDeviceMcbmEventSink.cxx:239
CbmUnpackedTimeslice::fvDigiT0
std::vector< CbmTofDigi > fvDigiT0
Definition: CbmDeviceMcbmEventSink.h:48
InitTaskError
CBM headers.
Definition: CbmDeviceEventBuilderEtofStar2019.cxx:36
CbmFlesCanvasTools.h
TimesliceMetaData
Definition: TimesliceMetaData.h:11
CbmDeviceMcbmEventSink::CbmDeviceMcbmEventSink
CbmDeviceMcbmEventSink()
Definition: CbmDeviceMcbmEventSink.cxx:52
cbm::mq::Transition::ErrorFound
@ ErrorFound
CbmEvent.h
CbmDeviceMcbmEventSink::HandleData
bool HandleData(FairMQParts &, int)
Definition: CbmDeviceMcbmEventSink.cxx:341
CbmUnpackedTimeslice::CbmUnpackedTimeslice
CbmUnpackedTimeslice(FairMQParts &parts)
TODO: rename to CbmTsWithEvents.
Definition: CbmDeviceMcbmEventSink.cxx:689
CbmUnpackedTimeslice::fvDigiTof
std::vector< CbmTofDigi > fvDigiTof
Definition: CbmDeviceMcbmEventSink.h:52
CbmDeviceMcbmEventSink::PrepareTreeEntry
void PrepareTreeEntry(CbmUnpackedTimeslice unpTs)
Definition: CbmDeviceMcbmEventSink.cxx:543
CbmUnpackedTimeslice::fvDigiMuch
std::vector< CbmMuchBeamTimeDigi > fvDigiMuch
Definition: CbmDeviceMcbmEventSink.h:50
CbmDeviceMcbmEventSink::HandleCommand
bool HandleCommand(FairMQMessagePtr &, int)
Definition: CbmDeviceMcbmEventSink.cxx:415
CbmDeviceMcbmEventSink::~CbmDeviceMcbmEventSink
virtual ~CbmDeviceMcbmEventSink()
Definition: CbmDeviceMcbmEventSink.cxx:641
CbmUnpackedTimeslice::fvDigiTrd
std::vector< CbmTrdDigi > fvDigiTrd
Definition: CbmDeviceMcbmEventSink.h:51
CbmUnpackedTimeslice::fEventsArray
TClonesArray fEventsArray
Definition: CbmDeviceMcbmEventSink.h:55
CbmDeviceMcbmEventSink::Finish
void Finish()
Definition: CbmDeviceMcbmEventSink.cxx:668
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
CbmUnpackedTimeslice::fvDigiRich
std::vector< CbmRichDigi > fvDigiRich
Definition: CbmDeviceMcbmEventSink.h:53
CbmUnpackedTimeslice::fTsMetaData
TimesliceMetaData fTsMetaData
Definition: CbmDeviceMcbmEventSink.h:47
TimesliceMetaData.h
CbmMQDefs.h
CbmUnpackedTimeslice::fvDigiPsd
std::vector< CbmPsdDigi > fvDigiPsd
Definition: CbmDeviceMcbmEventSink.h:54
cbm::mq::ChangeState
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition: CbmMQDefs.h:19
CbmUnpackedTimeslice::fvDigiSts
std::vector< CbmStsDigi > fvDigiSts
Definition: CbmDeviceMcbmEventSink.h:49
CbmDeviceMcbmEventSink::InitTask
virtual void InitTask()
Definition: CbmDeviceMcbmEventSink.cxx:54
CbmDeviceMcbmEventSink::SendHistograms
bool SendHistograms()
Definition: CbmDeviceMcbmEventSink.cxx:623