14 #include "StorableTimeslice.hpp"
16 #include "BoostSerializer.h"
17 #include "FairMQLogger.h"
18 #include "FairMQProgOptions.h"
19 #include "FairParGenericSet.h"
20 #include "RootSerializer.h"
32 #include <boost/archive/binary_iarchive.hpp>
33 #include <boost/serialization/utility.hpp>
37 using std::runtime_error::runtime_error;
45 : fbIgnoreOverlapMs {
false}
46 , fsChannelNameDataInput {
"t0component"}
47 , fsChannelNameHistosInput {
"histogram-in"}
48 , fsChannelNameHistosConfig {
"histo-conf"}
49 , fsChannelNameCanvasConfig {
"canvas-conf"}
50 , fuHistoryHistoSize {3600}
51 , fuMinTotPulser {185}
52 , fuMaxTotPulser {195}
53 , fuOffSpillCountLimit {1000}
54 , fuPublishFreqTs {100}
55 , fdMinPublishTime {0.5}
56 , fdMaxPublishTime {5.0}
57 , fsAllowedChannels {fsChannelNameDataInput}
61 , fLastPublishTime {std::chrono::system_clock::now()}
65 , fvpsCanvasConfig {} {}
69 LOG(info) <<
"Init options for CbmMqStarHistoServer.";
84 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
85 LOG(info) <<
"Histograms publication min. interval in s: "
87 LOG(info) <<
"Histograms publication max. interval in s: "
100 int noChannel = fChannels.size();
101 LOG(info) <<
"Number of defined channels: " << noChannel;
102 for (
auto const& entry : fChannels) {
103 LOG(info) <<
"Channel name: " << entry.first;
112 LOG(error) << e.what();
119 std::size_t pos1 = channelName.find(entry);
120 if (pos1 != std::string::npos) {
121 const vector<std::string>::const_iterator
pos =
123 const vector<std::string>::size_type idx =
125 LOG(info) <<
"Found " << entry <<
" in " << channelName;
126 LOG(info) <<
"Channel name " << channelName
127 <<
" found in list of allowed channel names at position "
132 LOG(info) <<
"Channel name " << channelName
133 <<
" not found in list of allowed channel names.";
134 LOG(error) <<
"Stop device.";
139 LOG(info) <<
"Init parameter containers for CbmDeviceMonitorT0.";
143 for (
int iparC = 0; iparC <
fParCList->GetEntries(); iparC++) {
144 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
146 std::string paramName {tempObj->GetName()};
151 std::string message = paramName +
",111";
152 LOG(info) <<
"Requesting parameter container " << paramName
153 <<
", sending message: " << message;
155 FairMQMessagePtr req(NewSimpleMessage(message));
156 FairMQMessagePtr rep(NewMessage());
158 FairParGenericSet* newObj =
nullptr;
160 if (Send(req,
"parameters") > 0) {
161 if (Receive(rep,
"parameters") >= 0) {
162 if (rep->GetSize() != 0) {
165 static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
166 LOG(info) <<
"Received unpack parameter from the server:";
169 LOG(error) <<
"Received empty reply. Parameter not available";
194 std::vector<std::pair<TNamed*, std::string>> vHistos =
197 std::vector<std::pair<TCanvas*, std::string>> vCanvases =
204 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
209 std::pair<std::string, std::string> psHistoConfig(
210 vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
214 FairMQMessagePtr messageHist(NewMessage());
215 Serialize<BoostSerializer<std::pair<std::string, std::string>>>(
216 *messageHist, psHistoConfig);
220 LOG(error) <<
"Problem sending histo config";
224 LOG(info) <<
"Config of hist " << psHistoConfig.first.data()
225 <<
" in folder " << psHistoConfig.second.data();
231 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
234 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
237 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
242 FairMQMessagePtr messageCan(NewMessage());
243 Serialize<BoostSerializer<std::pair<std::string, std::string>>>(
244 *messageCan, psCanvConfig);
248 LOG(error) <<
"Problem sending canvas config";
252 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data()
253 <<
" is " << psCanvConfig.second.data();
263 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size "
269 std::string msgStr(
static_cast<char*
>(msg->GetData()), msg->GetSize());
270 std::istringstream iss(msgStr);
271 boost::archive::binary_iarchive inputArchive(iss);
274 fles::StorableTimeslice component {0};
275 inputArchive >> component;
283 std::chrono::system_clock::time_point currentTime =
284 std::chrono::system_clock::now();
285 std::chrono::duration<double_t> elapsedSeconds =
299 FairMQMessagePtr message(NewMessage());
322 LOG(error) <<
"Problem sending data";
341 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
342 if (
kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
350 LOG(info) <<
"Reset T0 Monitor histos ";
356 LOG(error) <<
"Failed processing TS " << ts.index()
357 <<
" in unpacker algorithm class";
365 LOG(info) <<
"Processed " <<
fulTsCounter <<
" time slices";