19 #include "BoostSerializer.h"
20 #include "FairMQLogger.h"
21 #include "FairMQProgOptions.h"
22 #include "FairParGenericSet.h"
23 #include "FairRootFileSink.h"
24 #include "FairRootManager.h"
25 #include "FairRunOnline.h"
26 #include "RootSerializer.h"
34 #include <boost/archive/binary_iarchive.hpp>
35 #include <boost/serialization/utility.hpp>
// Inheriting-constructor declaration: makes the constructors of
// std::runtime_error available in the enclosing class (the class header is
// not part of this excerpt).
45 using std::runtime_error::runtime_error;
// Announce option parsing, then copy the device configuration values held by
// fConfig into the corresponding members.
56 LOG(info) <<
"Init options for CbmDeviceMcbmEventSink.";
// Output file name, read from the "OutFileName" option.
58 fsOutputFileName = fConfig->GetValue<std::string>(
"OutFileName");
// Name of the data input channel ("EvtNameIn"); it is also stored as the
// first entry of the allowed-channel list.
60 fsChannelNameDataInput = fConfig->GetValue<std::string>(
"EvtNameIn");
61 fsAllowedChannels[0] = fsChannelNameDataInput;
// Histogram options: enable flag, the three channel names, and the publish
// settings (frequency in messages plus min/max publish times).
63 fbFillHistos = fConfig->GetValue<
bool>(
"FillHistos");
64 fsChannelNameHistosInput = fConfig->GetValue<std::string>(
"ChNameIn");
65 fsChannelNameHistosConfig = fConfig->GetValue<std::string>(
"ChNameHistCfg");
66 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>(
"ChNameCanvCfg");
67 fuPublishFreqTs = fConfig->GetValue<uint32_t>(
"PubFreqTs");
68 fdMinPublishTime = fConfig->GetValue<double_t>(
"PubTimeMin");
69 fdMaxPublishTime = fConfig->GetValue<double_t>(
"PubTimeMax");
// Report how many channels are defined, then log the name of each one.
// NOTE(review): fChannels.size() is narrowed to int here.
87 int noChannel = fChannels.size();
88 LOG(info) <<
"Number of defined channels: " << noChannel;
89 for (
auto const& entry : fChannels) {
90 LOG(info) <<
"Channel name: " << entry.first;
// Channels whose name contains the data input channel name are validated
// with IsChannelNameAllowed(); what happens on rejection is not visible in
// this excerpt.
91 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
92 if (!IsChannelNameAllowed(entry.first))
// Allocate one digi vector per detector branch (T0, STS, MUCH, TRD, TOF,
// RICH, PSD). fvDigiT0 and fvDigiTof both hold CbmTofDigi objects.
101 fvDigiT0 =
new std::vector<CbmTofDigi>();
102 fvDigiSts =
new std::vector<CbmStsDigi>();
103 fvDigiMuch =
new std::vector<CbmMuchBeamTimeDigi>();
104 fvDigiTrd =
new std::vector<CbmTrdDigi>();
105 fvDigiTof =
new std::vector<CbmTofDigi>();
106 fvDigiRich =
new std::vector<CbmRichDigi>();
107 fvDigiPsd =
new std::vector<CbmPsdDigi>();
// TClonesArray of "TimesliceMetaData" objects, created with size argument 1;
// a null pointer is reported by throwing InitTaskError.
111 fTimeSliceMetaDataArray =
new TClonesArray(
"TimesliceMetaData", 1);
112 if (NULL == fTimeSliceMetaDataArray) {
113 throw InitTaskError(
"Failed creating the TS meta data TClonesarray ");
// TClonesArray of "CbmEvent" objects, created with size argument 500; same
// null check as above.
117 fEventsArray =
new TClonesArray(
"CbmEvent", 500);
118 if (NULL == fEventsArray) {
119 throw InitTaskError(
"Failed creating the Events TClonesarray ");
// ROOT output is configured only when an output file name was given.
123 if (
"" != fsOutputFileName) {
// Create the FairRunOnline, fetch the FairRootManager singleton and attach a
// FairRootFileSink for the configured file.
124 fpRun =
new FairRunOnline();
125 fpFairRootMgr = FairRootManager::Instance();
126 fpFairRootMgr->SetSink(
new FairRootFileSink(fsOutputFileName));
// A null output file is tested for here; the handling of that case is not
// visible in this excerpt.
127 if (
nullptr == fpFairRootMgr->GetOutFile()) {
135 LOG(info) <<
"Init Root Output to " << fsOutputFileName;
137 fpFairRootMgr->InitSink();
// Register the output branches: the TS metadata array, one vector branch per
// detector, and the event array.
// NOTE(review): the trailing kTRUE is presumably the "write to file" flag —
// confirm against the FairRootManager API.
145 fpFairRootMgr->Register(
146 "TimesliceMetaData",
"TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
148 fpFairRootMgr->RegisterAny(
"T0Digi", fvDigiT0, kTRUE);
149 fpFairRootMgr->RegisterAny(
"StsDigi", fvDigiSts, kTRUE);
150 fpFairRootMgr->RegisterAny(
"MuchBeamTimeDigi", fvDigiMuch, kTRUE);
151 fpFairRootMgr->RegisterAny(
"TrdDigi", fvDigiTrd, kTRUE);
152 fpFairRootMgr->RegisterAny(
"TofDigi", fvDigiTof, kTRUE);
153 fpFairRootMgr->RegisterAny(
"RichDigi", fvDigiRich, kTRUE);
154 fpFairRootMgr->RegisterAny(
"PsdDigi", fvDigiPsd, kTRUE);
156 fpFairRootMgr->Register(
"CbmEvent",
"Cbm Event", fEventsArray, kTRUE);
163 fpFairRootMgr->WriteFolder();
165 LOG(info) <<
"Initialized outTree with rootMgr at " << fpFairRootMgr;
// Histogram-related initialisation is guarded by the FillHistos option; its
// body is not visible in this excerpt.
168 if (kTRUE == fbFillHistos) {
// Log the message carried by `e` (presumably a caught exception; the catch
// clause itself is not visible here).
234 LOG(error) << e.what();
// Test each allowed channel name as a substring of channelName.
240 for (
auto const& entry : fsAllowedChannels) {
241 std::size_t pos1 = channelName.find(entry);
242 if (pos1 != std::string::npos) {
// On a match, look the entry up again in fsAllowedChannels and derive its
// index from the iterator distance to begin().
243 const vector<std::string>::const_iterator
pos =
244 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
245 const vector<std::string>::size_type idx =
246 pos - fsAllowedChannels.begin();
247 LOG(info) <<
"Found " << entry <<
" in " << channelName;
248 LOG(info) <<
"Channel name " << channelName
249 <<
" found in list of allowed channel names at position "
// Messages emitted for a channel name that is not in the allowed list: an
// info line with the name, followed by an error line announcing the stop.
254 LOG(info) <<
"Channel name " << channelName
255 <<
" not found in list of allowed channel names.";
256 LOG(error) <<
"Stop device.";
// Decode a list of TS indices from the message payload: the raw bytes are
// copied into a std::string, wrapped in an istringstream and read back through
// a Boost binary input archive.
325 std::vector<uint64_t> vIndices;
326 std::string msgStrMissTs(
static_cast<char*
>(msg->GetData()), msg->GetSize());
327 std::istringstream issMissTs(msgStrMissTs);
328 boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
329 inputArchiveMissTs >> vIndices;
// Append the decoded indices to the end of fvulMissedTsIndices.
331 fvulMissedTsIndices.insert(
332 fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end());
// Debug trace for each multipart message: running message count, number of
// parts and size of the first part.
343 LOG(debug) <<
"Received message number " << fulNumMessages <<
" with "
344 << parts.Size() <<
" parts"
345 <<
", size0: " << parts.At(0)->GetSize();
// Progress line at info level once every 10000 messages.
347 if (0 == fulNumMessages % 10000)
348 LOG(info) <<
"Received " << fulNumMessages <<
" messages";
// Index of the message part currently being read.
351 uint32_t uPartIdx = 0;
// Extract the timeslice metadata from that part with the ROOT serializer.
362 Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
363 LOG(debug) <<
"TS metadata extracted";
// The TS is handled immediately when its index directly follows the previous
// one, or when it is the very first TS and carries index 0 (previous index 0,
// TS counter 0, current index 0).
366 if (fuPrevTsIndex + 1 == fTsMetaData->GetIndex()
367 || (0 == fuPrevTsIndex && 0 == fulTsCounter
368 && 0 == fTsMetaData->GetIndex())) {
369 LOG(debug) <<
"TS direct to dump";
371 PrepareTreeEntry(parts);
375 fuPrevTsIndex = fTsMetaData->GetIndex();
// Storage path: the TS is inserted into fmFullTsStorage, with end() as the
// insertion hint. Presumably this is the alternative branch of the test
// above — the branch keyword is not visible in this excerpt.
379 LOG(debug) <<
"TS direct to storage";
381 fmFullTsStorage.emplace_hint(
382 fmFullTsStorage.end(),
383 std::pair<uint64_t, CbmUnpackedTimeslice>(
386 LOG(debug) <<
"TS metadata checked";
393 LOG(debug) <<
"TS queues checked";
// Histogram publication, only when FillHistos is enabled.
396 if (kTRUE == fbFillHistos) {
// Seconds elapsed since the last publication.
400 std::chrono::system_clock::time_point currentTime =
401 std::chrono::system_clock::now();
402 std::chrono::duration<double_t> elapsedSeconds =
403 currentTime - fLastPublishTime;
// Publish when the maximum interval is exceeded, or when the message count is
// a multiple of fuPublishFreqTs and the minimum interval has passed.
// NOTE(review): the modulo is undefined if fuPublishFreqTs is 0; no guard is
// visible in this excerpt — confirm the option is validated elsewhere.
404 if ((fdMaxPublishTime < elapsedSeconds.count())
405 || (0 == fulNumMessages % fuPublishFreqTs
406 && fdMinPublishTime < elapsedSeconds.count())) {
408 fLastPublishTime = std::chrono::system_clock::now();
// Decode the command string from the message payload through a Boost binary
// input archive.
421 std::string sCommand;
422 std::string msgStrCmd(
static_cast<char*
>(msg->GetData()), msg->GetSize());
423 std::istringstream issCmd(msgStrCmd);
424 boost::archive::binary_iarchive inputArchiveCmd(issCmd);
425 inputArchiveCmd >> sCommand;
// The command tag is the text before the first space, or the whole command if
// there is no space.
427 std::string sCmdTag = sCommand;
428 size_t charPosDel = sCommand.find(
' ');
429 if (std::string::npos != charPosDel) {
430 sCmdTag = sCommand.substr(0, charPosDel);
// "EOF" command: remember that the end of the stream was announced.
433 if (
"EOF" == sCmdTag) {
434 fbReceivedEof =
true;
// An EOF command without any space carries no arguments and is reported as
// incomplete.
437 if (std::string::npos == charPosDel) {
438 LOG(fatal) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
439 <<
"Incomplete EOF command received: " << sCommand;
// Take the remainder of the command and look for the next space in it.
// NOTE(review): whether charPosDel is advanced past the delimiter before this
// substr is not visible in this excerpt — confirm, otherwise sNext would start
// with the space itself.
444 std::string sNext = sCommand.substr(charPosDel);
445 charPosDel = sNext.find(
' ');
447 if (std::string::npos == charPosDel) {
448 LOG(fatal) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
449 <<
"Incomplete EOF command received: " << sCommand;
// First argument: index of the last TS.
// NOTE(review): std::stoul throws std::invalid_argument / std::out_of_range on
// malformed input; no handler is visible here.
452 fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel));
// Second argument: total number of TS.
455 fuTotalTsCount = std::stoul(sNext.substr(charPosDel));
457 LOG(info) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
458 <<
"Received EOF command with final TS index " << fuLastTsIndex
459 <<
" and total nb TS " << fuTotalTsCount;
// The final TS has already been processed if both the last index and the TS
// count match the announced values.
461 if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
462 LOG(info) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
463 <<
"Found final TS index " << fuLastTsIndex
464 <<
" and total nb TS " << fuTotalTsCount;
// "STOP" command; its handling is not visible in this excerpt.
468 else if (
"STOP" == sCmdTag) {
// Any other tag is reported and ignored.
474 LOG(warning) <<
"Unknown command received: " << sCmdTag
475 <<
" => will be ignored!";
// Walk the stored full TS and the missed-TS indices in parallel, starting at
// the beginning of each container, until the flag below is set.
482 bool bHoleFoundInBothQueues =
false;
484 std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs =
485 fmFullTsStorage.begin();
486 std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();
488 while (!bHoleFoundInBothQueues) {
// Next stored full TS directly follows the last processed index: prepare its
// tree entry and advance fuPrevTsIndex.
490 if (fmFullTsStorage.end() != itFullTs
491 && fuPrevTsIndex + 1 == (*itFullTs).first) {
493 PrepareTreeEntry((*itFullTs).second);
498 fuPrevTsIndex = (*itFullTs).first;
// Next missed index directly follows the last processed index: the next free
// slot of the metadata array is addressed, fuPrevTsIndex advances and the
// missed-TS counter is incremented.
506 if (fvulMissedTsIndices.end() != itMissTs
507 && fuPrevTsIndex + 1 == (*itMissTs)) {
510 (*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
517 fuPrevTsIndex = (*itMissTs);
518 fulMissedTsCounter++;
// Terminates the loop; the condition guarding this assignment is not visible
// in this excerpt.
526 bHoleFoundInBothQueues =
true;
// Drop everything from the start of each container up to the iterators.
530 fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs);
531 fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs);
// After an EOF command, report when the announced final index and total TS
// count have both been reached.
534 if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex
535 && fulTsCounter == fuTotalTsCount) {
536 LOG(info) <<
"CbmDeviceMcbmEventSink::CheckTsQueues => "
537 <<
"Found final TS index " << fuLastTsIndex <<
" and total nb TS "
// Placement-new into the next free slot of the TS metadata array (the slot
// index is the current GetEntriesFast() value); the constructed object is not
// visible in this excerpt.
548 new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
// Move the digi vectors of the unpacked timeslice into the output vectors.
// Only the T0, STS, TRD, TOF and PSD assignments are visible in this excerpt.
570 (*fvDigiT0) = std::move(unpTs.
fvDigiT0);
572 (*fvDigiSts) = std::move(unpTs.
fvDigiSts);
576 (*fvDigiTrd) = std::move(unpTs.
fvDigiTrd);
578 (*fvDigiTof) = std::move(unpTs.
fvDigiTof);
582 (*fvDigiPsd) = std::move(unpTs.
fvDigiPsd);
// Write one entry through the FairRootManager: store the write-out buffer
// data for the current event time, fill, then delete the old buffer data.
600 fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
601 fpFairRootMgr->Fill();
602 fpFairRootMgr->DeleteOldWriteoutBufferData();
// Empty the output arrays. The event array is cleared with option "C".
// NOTE(review): in ROOT, "C" makes TClonesArray::Clear call Clear() on each
// element as well — confirm this is what CbmEvent needs.
605 fTimeSliceMetaDataArray->Clear();
618 fEventsArray->Clear(
"C");
// Serialize fArrayHisto into a new message with the ROOT serializer and send
// it on the histogram input channel; a negative Send() result is logged as an
// error.
625 FairMQMessagePtr message(NewMessage());
626 Serialize<RootSerializer>(*message, &fArrayHisto);
629 if (Send(message, fsChannelNameHistosInput) < 0) {
630 LOG(error) <<
"Problem sending data";
// Run Finish() if it has not been done yet.
645 if (!fbFinishDone) Finish();
// Clear and delete the TS metadata array, then clear the event array.
648 fTimeSliceMetaDataArray->Clear();
649 delete fTimeSliceMetaDataArray;
662 fEventsArray->Clear();
// Write the output through the FairRootManager and close the sink.
670 fpFairRootMgr->Write();
672 fpFairRootMgr->CloseSink();
// Summary: total number of TS saved, split into full and missed/empty ones.
673 LOG(info) <<
"File closed after saving "
674 << (fulTsCounter + fulMissedTsCounter) <<
" TS (" << fulTsCounter
675 <<
" full ones and " << fulMissedTsCounter <<
" missed/empty ones)";
// With histograms enabled, the last-publish timestamp is refreshed; the rest
// of this branch is not visible in this excerpt.
677 if (kTRUE == fbFillHistos) {
679 fLastPublishTime = std::chrono::system_clock::now();
// Wait 3 seconds.
// NOTE(review): presumably leaves time for pending messages to be delivered —
// confirm.
683 std::this_thread::sleep_for(std::chrono::milliseconds(3000));
// Mark Finish() as done so that it is not run a second time.
686 fbFinishDone = kTRUE;
// Member initializer: fEventsArray is constructed with ("CbmEvent", 500).
690 : fEventsArray(
"CbmEvent", 500) {
// Index of the message part currently being read.
692 uint32_t uPartIdx = 0;
// Deserialize the current part as a generic TObject and test whether its
// class name is "TimesliceMetaData".
// NOTE(review): tempObjectMeta is dereferenced; no null check is visible in
// this excerpt.
703 TObject* tempObjectMeta =
nullptr;
704 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
707 if (TString(tempObjectMeta->ClassName()).EqualTo(
"TimesliceMetaData")) {
// For each detector (T0, STS, MUCH, TRD, TOF, RICH, PSD) the same three steps
// are repeated: copy the payload of the current part into a std::string, wrap
// it in an istringstream, and open a Boost binary input archive on it. The
// extraction from each archive and any change of uPartIdx between the
// sections are not visible in this excerpt.
712 std::string msgStrT0(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
713 (parts.At(uPartIdx))->GetSize());
714 std::istringstream issT0(msgStrT0);
715 boost::archive::binary_iarchive inputArchiveT0(issT0);
720 std::string msgStrSts(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
721 (parts.At(uPartIdx))->GetSize());
722 std::istringstream issSts(msgStrSts);
723 boost::archive::binary_iarchive inputArchiveSts(issSts);
728 std::string msgStrMuch(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
729 (parts.At(uPartIdx))->GetSize());
730 std::istringstream issMuch(msgStrMuch);
731 boost::archive::binary_iarchive inputArchiveMuch(issMuch);
736 std::string msgStrTrd(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
737 (parts.At(uPartIdx))->GetSize());
738 std::istringstream issTrd(msgStrTrd);
739 boost::archive::binary_iarchive inputArchiveTrd(issTrd);
744 std::string msgStrTof(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
745 (parts.At(uPartIdx))->GetSize());
746 std::istringstream issTof(msgStrTof);
747 boost::archive::binary_iarchive inputArchiveTof(issTof);
752 std::string msgStrRich(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
753 (parts.At(uPartIdx))->GetSize());
754 std::istringstream issRich(msgStrRich);
755 boost::archive::binary_iarchive inputArchiveRich(issRich);
760 std::string msgStrPsd(
static_cast<char*
>(parts.At(uPartIdx)->GetData()),
761 (parts.At(uPartIdx))->GetSize());
762 std::istringstream issPsd(msgStrPsd);
763 boost::archive::binary_iarchive inputArchivePsd(issPsd);
// Deserialize the current part as a TObject; it is cast to TClonesArray only
// after its class name has been checked.
// NOTE(review): tempObject is dereferenced; no null check is visible in this
// excerpt.
768 TObject* tempObject =
nullptr;
769 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
772 if (TString(tempObject->ClassName()).EqualTo(
"TClonesArray")) {
773 TClonesArray* arrayEventsIn =
static_cast<TClonesArray*
>(tempObject);