20 #include "StorableTimeslice.hpp"
22 #include "BoostSerializer.h"
23 #include "FairMQLogger.h"
24 #include "FairMQProgOptions.h"
25 #include "FairParGenericSet.h"
26 #include "RootSerializer.h"
38 #include <boost/archive/binary_iarchive.hpp>
39 #include <boost/serialization/utility.hpp>
// Inheriting-constructor declaration: makes the constructors of std::runtime_error
// available in the enclosing class.
// NOTE(review): presumably sits inside an exception type derived from std::runtime_error — confirm.
43 using std::runtime_error::runtime_error;
// Announce the start of option handling for this device.
61 LOG(info) <<
"Init options for CbmDeviceMcbmUnpack.";
// Read the configuration values from fConfig into the corresponding members.
62 fbIgnoreOverlapMs = fConfig->GetValue<
bool>(
"IgnOverMs");
// NOTE(review): the time-offset strings (fvsSetTimeOffs) are read from the key "SetTrigWin" —
// confirm that this key name is the intended one.
63 fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>(
"SetTrigWin");
64 fsChannelNameDataInput = fConfig->GetValue<std::string>(
"TsNameIn");
65 fsChannelNameDataOutput = fConfig->GetValue<std::string>(
"TsNameOut");
// Store the configured input channel name as entry 0 of the allowed-channel list.
// assumes fsAllowedChannels already holds at least one element — TODO confirm
67 fsAllowedChannels[0] = fsChannelNameDataInput;
// Report how many channels are defined, then walk over the entries of fChannels.
79 int noChannel = fChannels.size();
80 LOG(info) <<
"Number of defined channels: " << noChannel;
81 for (
auto const& entry : fChannels) {
82 LOG(info) <<
"Channel name: " << entry.first;
// Only channels whose name contains the input channel name are checked against the allowed list.
83 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
84 if (!IsChannelNameAllowed(entry.first))
// Log the message carried by the exception object e.
91 LOG(error) << e.what();
// Search for an allowed channel name that occurs as a substring of channelName.
97 for (
auto const& entry : fsAllowedChannels) {
98 std::size_t pos1 = channelName.find(entry);
99 if (pos1 != std::string::npos) {
// Recover the index of the matching entry inside fsAllowedChannels
// (iterator returned by std::find minus begin()).
100 const vector<std::string>::const_iterator
pos =
101 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
102 const vector<std::string>::size_type idx =
103 pos - fsAllowedChannels.begin();
// Report the match.
104 LOG(info) <<
"Found " << entry <<
" in " << channelName;
105 LOG(info) <<
"Channel name " << channelName
106 <<
" found in list of allowed channel names at position "
// Report that the channel name is not in the allowed list and announce that the device stops.
111 LOG(info) <<
"Channel name " << channelName
112 <<
" not found in list of allowed channel names.";
113 LOG(error) <<
"Stop device.";
// Announce the start of the parameter container initialisation.
118 LOG(info) <<
"Init parameter containers for CbmDeviceMcbmUnpack.";
// Run InitParameters on the parameter list of each unpacker algorithm;
// return kFALSE as soon as one of them fails.
120 if (kFALSE == InitParameters(fUnpAlgoSts->GetParList()))
return kFALSE;
121 if (kFALSE == InitParameters(fUnpAlgoMuch->GetParList()))
return kFALSE;
122 if (kFALSE == InitParameters(fUnpAlgoTrd->GetParList()))
return kFALSE;
123 if (kFALSE == InitParameters(fUnpAlgoTof->GetParList()))
return kFALSE;
124 if (kFALSE == InitParameters(fUnpAlgoRich->GetParList()))
return kFALSE;
125 if (kFALSE == InitParameters(fUnpAlgoPsd->GetParList()))
return kFALSE;
// Hand the configured overlap flag to every unpacker algorithm.
128 fUnpAlgoSts->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
129 fUnpAlgoMuch->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
130 fUnpAlgoTrd->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
131 fUnpAlgoTof->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
132 fUnpAlgoRich->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
133 fUnpAlgoPsd->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
// Walk over the time-offset option strings; each one is split at its first ','.
// An option without ',' is reported through the log message below.
// NOTE(review): the message text speaks of a trigger window and a three-field pattern, while the
// code further down parses a detector tag and a single offset — confirm the wording.
136 for (std::vector<std::string>::iterator itStrOffs = fvsSetTimeOffs.begin();
137 itStrOffs != fvsSetTimeOffs.end();
139 size_t charPosDel = (*itStrOffs).find(
',');
140 if (std::string::npos == charPosDel) {
142 <<
"CbmDeviceMcbmUnpack::InitContainers => "
143 <<
"Trying to set trigger window with invalid option pattern, ignored! "
144 <<
" (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
145 << (*itStrOffs) <<
" )";
// Detector tag: the text in front of the delimiter.
149 std::string sSelDet = (*itStrOffs).substr(0, charPosDel);
// Offset value: the substring starting at charPosDel converted with std::stod.
// NOTE(review): if charPosDel still points at the ',' here, std::stod throws std::invalid_argument —
// verify that the position is advanced past the delimiter before this line.
152 Double_t dOffset = std::stod((*itStrOffs).substr(charPosDel));
// Forward the offset (SetTimeOffsetNs) to the unpacker algorithm selected by the tag.
154 if (
"kSTS" == sSelDet) {
155 fUnpAlgoSts->SetTimeOffsetNs(dOffset);
157 else if (
"kMUCH" == sSelDet) {
158 fUnpAlgoMuch->SetTimeOffsetNs(dOffset);
160 else if (
"kTRD" == sSelDet) {
161 fUnpAlgoTrd->SetTimeOffsetNs(dOffset);
163 else if (
"kTOF" == sSelDet) {
164 fUnpAlgoTof->SetTimeOffsetNs(dOffset);
166 else if (
"kRICH" == sSelDet) {
167 fUnpAlgoRich->SetTimeOffsetNs(dOffset);
169 else if (
"kPSD" == sSelDet) {
170 fUnpAlgoPsd->SetTimeOffsetNs(dOffset);
// Message for a tag that is none of the ones handled above.
173 LOG(info) <<
"CbmDeviceMcbmUnpack::InitContainers => Trying to set time "
174 "offset for unsupported detector, ignored! "
// Enable the binning firmware flag for the STS and MUCH algorithms only.
182 fUnpAlgoSts->SetBinningFwFlag(kTRUE);
184 fUnpAlgoMuch->SetBinningFwFlag(kTRUE);
// Initialise the containers of every algorithm; initOK accumulates the results with &=,
// so it ends up true only if every call returned true.
186 Bool_t initOK = fUnpAlgoSts->InitContainers();
187 initOK &= fUnpAlgoMuch->InitContainers();
188 initOK &= fUnpAlgoTrd->InitContainers();
189 initOK &= fUnpAlgoTof->InitContainers();
190 initOK &= fUnpAlgoRich->InitContainers();
191 initOK &= fUnpAlgoPsd->InitContainers();
// The TRD algorithm is given the address of its own vector (GetVector) as digi output pointer.
195 initOK &= fUnpAlgoTrd->SetDigiOutputPointer(&(fUnpAlgoTrd->GetVector()));
// Walk over the entries of fParCList. Each entry is taken out of the list (Remove) and
// an object is inserted again at the same index further down (AddAt).
203 for (
int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
204 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
205 fParCList->Remove(tempObj);
206 std::string paramName {tempObj->GetName()};
// Request text: the container name followed by ",111".
// NOTE(review): the meaning of the "111" suffix is not visible here — presumably a run identifier; confirm.
211 std::string message = paramName +
",111";
212 LOG(info) <<
"Requesting parameter container " << paramName
213 <<
", sending message: " << message;
// Wrap the request text in a message and create an empty message for the reply.
215 FairMQMessagePtr req(NewSimpleMessage(message));
216 FairMQMessagePtr rep(NewMessage());
218 FairParGenericSet* newObj =
nullptr;
// Send the request on the "parameters" channel; only if Send returned a positive value,
// wait for the reply on the same channel.
220 if (Send(req,
"parameters") > 0) {
221 if (Receive(rep,
"parameters") >= 0) {
// A reply of size zero is treated differently from a reply that carries data.
222 if (0 != rep->GetSize()) {
225 static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
226 LOG(info) <<
"Received unpack parameter from the server:";
230 LOG(error) <<
"Received empty reply. Parameter not available";
// NOTE(review): newObj is initialised to nullptr above — confirm it cannot still be null
// when it is inserted into the list here.
235 fParCList->AddAt(newObj, iparC);
// Debug trace with the running message counter.
245 LOG(debug) <<
"Received message number " << fulNumMessages <<
" with size "
// Progress message whenever the message counter is a multiple of 10000.
248 if (0 == fulNumMessages % 10000)
249 LOG(info) <<
"Received " << fulNumMessages <<
" messages";
// Copy the message payload (GetData / GetSize) into a std::string and open a
// Boost binary input archive on a stream over that copy.
251 std::string msgStr(
static_cast<char*
>(msg->GetData()), msg->GetSize());
252 std::istringstream iss(msgStr);
253 boost::archive::binary_iarchive inputArchive(iss);
// Timeslice object constructed with argument 0.
// presumably filled from inputArchive afterwards — TODO confirm
256 fles::StorableTimeslice ts {0};
// The timeslice parameters are computed while fdTsCoreSizeInNs still equals -1.0
// (presumably its "not yet set" value — confirm against the member initialisation).
260 if (-1.0 == fdTsCoreSizeInNs) {
// Core count is taken from the timeslice; overlap count is num_microslices(0) minus the core count.
261 fuNbCoreMsPerTs = ts.num_core_microslices();
262 fuNbOverMsPerTs = ts.num_microslices(0) - ts.num_core_microslices();
// Durations: fdMsSizeInNs times the respective count; full size = core + overlap.
263 fdTsCoreSizeInNs = fdMsSizeInNs * (fuNbCoreMsPerTs);
264 fdTsOverSizeInNs = fdMsSizeInNs * (fuNbOverMsPerTs);
265 fdTsFullSizeInNs = fdTsCoreSizeInNs + fdTsOverSizeInNs;
266 LOG(info) <<
"Timeslice parameters: each TS has " << fuNbCoreMsPerTs
267 <<
" Core MS and " << fuNbOverMsPerTs
268 <<
" Overlap MS, for a core duration of " << fdTsCoreSizeInNs
269 <<
" ns and a full duration of " << fdTsFullSizeInNs <<
" ns";
// Argument list: idx field of descriptor (0, 0), core and overlap durations, timeslice index.
// NOTE(review): the callee these arguments belong to is not shown next to them — confirm.
273 ts.descriptor(0, 0).idx, fdTsCoreSizeInNs, fdTsOverSizeInNs, ts.index());
// Give up with false if sending the unpacked data failed.
279 if (!SendUnpData())
return false;
// Empty the data vector of every unpacker algorithm.
284 fUnpAlgoSts->ClearVector();
285 fUnpAlgoMuch->ClearVector();
286 fUnpAlgoTrd->ClearVector();
287 fUnpAlgoTof->ClearVector();
288 fUnpAlgoRich->ClearVector();
289 fUnpAlgoPsd->ClearVector();
// Empty the error vector of every unpacker algorithm.
292 fUnpAlgoSts->ClearErrorVector();
293 fUnpAlgoMuch->ClearErrorVector();
294 fUnpAlgoTrd->ClearErrorVector();
295 fUnpAlgoTof->ClearErrorVector();
296 fUnpAlgoRich->ClearErrorVector();
297 fUnpAlgoPsd->ClearErrorVector();
// Create an empty message and serialize fTsMetaData into it with RootSerializer.
311 FairMQMessagePtr messTsMeta(NewMessage());
312 Serialize<RootSerializer>(*messTsMeta, fTsMetaData);
// STS, MUCH, TRD: write the algorithm's vector into a Boost binary archive on a stringstream,
// then copy the stream contents into a heap-allocated std::string.
// The raw `new` is paired with the callbacks passed to NewMessage further down, which delete a std::string.
314 std::stringstream ossSts;
315 boost::archive::binary_oarchive oaSts(ossSts);
316 oaSts << fUnpAlgoSts->GetVector();
317 std::string* strMsgSts =
new std::string(ossSts.str());
319 std::stringstream ossMuch;
320 boost::archive::binary_oarchive oaMuch(ossMuch);
321 oaMuch << fUnpAlgoMuch->GetVector();
322 std::string* strMsgMuch =
new std::string(ossMuch.str());
324 std::stringstream ossTrd;
325 boost::archive::binary_oarchive oaTrd(ossTrd);
326 oaTrd << fUnpAlgoTrd->GetVector();
327 std::string* strMsgTrd =
new std::string(ossTrd.str());
// The TOF algorithm's vector is split into two local vectors.
330 std::vector<CbmTofDigi>& vDigiTofT0 = fUnpAlgoTof->GetVector();
331 std::vector<CbmTofDigi> vDigiTof = {};
332 std::vector<CbmTofDigi> vDigiT0 = {};
// Digis whose address masked with fuDigiMaskId equals fuDigiMaskedIdT0 are appended to vDigiT0;
// vDigiTof presumably receives the others (the alternative branch is not spelled out here — confirm).
// Note: the loop variable is taken by value, so every digi is copied once per iteration
// in addition to the copy made by emplace_back.
334 for (
auto digi : vDigiTofT0) {
335 if (fuDigiMaskedIdT0 == (digi.GetAddress() & fuDigiMaskId)) {
337 vDigiT0.emplace_back(digi);
341 vDigiTof.emplace_back(digi);
// Archive streams and heap strings for the TOF and T0 parts.
345 std::stringstream ossTof;
346 boost::archive::binary_oarchive oaTof(ossTof);
348 std::string* strMsgTof =
new std::string(ossTof.str());
350 std::stringstream ossT0;
351 boost::archive::binary_oarchive oaT0(ossT0);
353 std::string* strMsgT0 =
new std::string(ossT0.str());
// RICH and PSD follow the same pattern as STS/MUCH/TRD above.
355 std::stringstream ossRich;
356 boost::archive::binary_oarchive oaRich(ossRich);
357 oaRich << fUnpAlgoRich->GetVector();
358 std::string* strMsgRich =
new std::string(ossRich.str());
360 std::stringstream ossPsd;
361 boost::archive::binary_oarchive oaPsd(ossPsd);
362 oaPsd << fUnpAlgoPsd->GetVector();
363 std::string* strMsgPsd =
new std::string(ossPsd.str());
// The timeslice metadata message is moved into the multi-part container first.
367 parts.AddPart(std::move(messTsMeta));
// The data parts are appended in this order: T0, STS, MUCH, TRD, TOF, RICH, PSD.
// Each message is built on the character buffer of the corresponding heap string (c_str());
// the callback handed to NewMessage deletes the std::string it receives as its second argument.
// NOTE(review): presumably that second argument is the same strMsg* pointer — confirm.
377 parts.AddPart(NewMessage(
378 const_cast<char*
>(strMsgT0->c_str()),
380 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
383 parts.AddPart(NewMessage(
384 const_cast<char*
>(strMsgSts->c_str()),
386 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
389 parts.AddPart(NewMessage(
390 const_cast<char*
>(strMsgMuch->c_str()),
391 strMsgMuch->length(),
392 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
395 parts.AddPart(NewMessage(
396 const_cast<char*
>(strMsgTrd->c_str()),
398 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
401 parts.AddPart(NewMessage(
402 const_cast<char*
>(strMsgTof->c_str()),
404 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
407 parts.AddPart(NewMessage(
408 const_cast<char*
>(strMsgRich->c_str()),
409 strMsgRich->length(),
410 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
413 parts.AddPart(NewMessage(
414 const_cast<char*
>(strMsgPsd->c_str()),
416 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
// Send all parts on the output channel; a negative return value is reported as an error.
419 if (Send(parts, fsChannelNameDataOutput) < 0) {
420 LOG(error) <<
"Problem sending data to " << fsChannelNameDataOutput;
// Delete the six unpacker algorithm objects.
// The explicit nullptr comparisons are not required for correctness:
// applying `delete` to a null pointer has no effect.
429 if (
nullptr != fUnpAlgoSts)
delete fUnpAlgoSts;
430 if (
nullptr != fUnpAlgoMuch)
delete fUnpAlgoMuch;
431 if (
nullptr != fUnpAlgoTrd)
delete fUnpAlgoTrd;
432 if (
nullptr != fUnpAlgoTof)
delete fUnpAlgoTof;
433 if (
nullptr != fUnpAlgoRich)
delete fUnpAlgoRich;
434 if (
nullptr != fUnpAlgoPsd)
delete fUnpAlgoPsd;
// While fbComponentsAddedToList is still kFALSE, every component of the timeslice is registered
// with an unpacker algorithm, selected by the sys_id of the component's descriptor at index 0.
442 if (kFALSE == fbComponentsAddedToList) {
443 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
444 switch (ts.descriptor(uCompIdx, 0).sys_id) {
446 fUnpAlgoSts->AddMsComponentToList(uCompIdx, kusSysIdSts);
450 fUnpAlgoMuch->AddMsComponentToList(uCompIdx, kusSysIdMuch);
454 fUnpAlgoTrd->AddMsComponentToList(uCompIdx, kusSysIdTrd);
// Both the TOF and the T0 system identifiers are registered with the TOF algorithm.
458 fUnpAlgoTof->AddMsComponentToList(uCompIdx, kusSysIdTof);
462 fUnpAlgoTof->AddMsComponentToList(uCompIdx, kusSysIdT0);
466 fUnpAlgoRich->AddMsComponentToList(uCompIdx, kusSysIdRich);
470 fUnpAlgoPsd->AddMsComponentToList(uCompIdx, kusSysIdPsd);
// Set the flag that the guard above tests
// (presumably so the registration is done only once — confirm).
476 fbComponentsAddedToList = kTRUE;
// Run every unpacker algorithm on the timeslice; a kFALSE result is logged together with
// the timeslice index and the name of the detector.
479 if (kFALSE == fUnpAlgoSts->ProcessTs(ts)) {
480 LOG(error) <<
"Failed processing TS " << ts.index()
481 <<
" in STS unpacker algorithm class";
485 if (kFALSE == fUnpAlgoMuch->ProcessTs(ts)) {
486 LOG(error) <<
"Failed processing TS " << ts.index()
487 <<
" in MUCH unpacker algorithm class";
491 if (kFALSE == fUnpAlgoTrd->ProcessTs(ts)) {
492 LOG(error) <<
"Failed processing TS " << ts.index()
493 <<
" in TRD unpacker algorithm class";
497 if (kFALSE == fUnpAlgoTof->ProcessTs(ts)) {
498 LOG(error) <<
"Failed processing TS " << ts.index()
499 <<
" in TOF unpacker algorithm class";
503 if (kFALSE == fUnpAlgoRich->ProcessTs(ts)) {
504 LOG(error) <<
"Failed processing TS " << ts.index()
505 <<
" in RICH unpacker algorithm class";
509 if (kFALSE == fUnpAlgoPsd->ProcessTs(ts)) {
510 LOG(error) <<
"Failed processing TS " << ts.index()
511 <<
" in PSD unpacker algorithm class";
// Progress message whenever the timeslice counter is a multiple of 10000.
516 if (0 == fulTsCounter % 10000)
517 LOG(info) <<
"Processed " << fulTsCounter <<
" time slices";