CbmRoot
ParameterMQServer.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "CbmMQDefs.h"
16 
17 #include "Rtypes.h"
18 #include "TMessage.h"
19 
20 #include "FairParAsciiFileIo.h"
21 #include "FairParGenericSet.h"
22 #include "FairParRootFileIo.h"
23 #include "FairRuntimeDb.h"
24 
25 #include "FairMQLogger.h"
26 #include "FairMQProgOptions.h"
27 #include "ParameterMQServer.h"
28 
29 #include "TGeoManager.h"
30 #include "TList.h"
31 #include "TObjString.h"
32 #include "TSystem.h"
33 
34 using namespace std;
35 
37  : fRtdb(FairRuntimeDb::instance())
38  , fFirstInputName("first_input.root")
39  , fFirstInputType("ROOT")
40  , fSecondInputName("")
41  , fSecondInputType("ROOT")
42  , fOutputName("")
43  , fOutputType("ROOT")
44  , fChannelName("data") {}
45 
47  string loadLibs = fConfig->GetValue<string>("libs-to-load");
48  if (loadLibs.length() > 0) {
49  LOG(info) << "There are libraries to load.";
50  if (loadLibs.find(";") != std::string::npos) {
51  LOG(info) << "There are several libraries to load";
52  istringstream f(loadLibs);
53  string s;
54  while (getline(f, s, ';')) {
55  LOG(info) << "Load library " << s;
56  gSystem->Load(s.c_str());
57  }
58  } else {
59  LOG(info) << "Load library " << loadLibs;
60  gSystem->Load(loadLibs.c_str());
61  }
62  } else {
63  LOG(info) << "There are no libraries to load.";
64  }
65 
66  fFirstInputName = fConfig->GetValue<string>("first-input-name");
67  fFirstInputType = fConfig->GetValue<string>("first-input-type");
68  fSecondInputName = fConfig->GetValue<string>("second-input-name");
69  fSecondInputType = fConfig->GetValue<string>("second-input-type");
70  fOutputName = fConfig->GetValue<string>("output-name");
71  fOutputType = fConfig->GetValue<string>("output-type");
72  fChannelName = fConfig->GetValue<string>("channel-name");
73 
74  if (fRtdb != 0) {
75  // Set first input
76  if (fFirstInputType == "ROOT") {
77  FairParRootFileIo* par1R = new FairParRootFileIo();
78  par1R->open(fFirstInputName.data(), "UPDATE");
79  fRtdb->setFirstInput(par1R);
80  } else if (fFirstInputType == "ASCII") {
81  FairParAsciiFileIo* par1A = new FairParAsciiFileIo();
82  if (fFirstInputName.find(";") != std::string::npos) {
83  LOG(info) << "File list found!";
84  TList* parFileList = new TList();
85  TObjString* parFile(NULL);
86  istringstream f(fFirstInputName);
87  string s;
88  while (getline(f, s, ';')) {
89  LOG(info) << "File: " << s;
90  parFile = new TObjString(s.c_str());
91  parFileList->Add(parFile);
92  par1A->open(parFileList, "in");
93  }
94  } else {
95  LOG(info) << "Single input file found!";
96  par1A->open(fFirstInputName.data(), "in");
97  }
98  fRtdb->setFirstInput(par1A);
99  }
100 
101  // Set second input
102  if (fSecondInputName != "") {
103  if (fSecondInputType == "ROOT") {
104  FairParRootFileIo* par2R = new FairParRootFileIo();
105  par2R->open(fSecondInputName.data(), "UPDATE");
106  fRtdb->setSecondInput(par2R);
107  } else if (fSecondInputType == "ASCII") {
108  FairParAsciiFileIo* par2A = new FairParAsciiFileIo();
109  if (fSecondInputName.find(";") != std::string::npos) {
110  LOG(info) << "File list found!";
111  TList* parFileList = new TList();
112  TObjString* parFile(NULL);
113  istringstream f(fSecondInputName);
114  string s;
115  while (getline(f, s, ';')) {
116  LOG(info) << "File: " << s;
117  parFile = new TObjString(s.c_str());
118  parFileList->Add(parFile);
119  par2A->open(parFileList, "in");
120  }
121  } else {
122  LOG(info) << "Single input file found!";
123  par2A->open(fFirstInputName.data(), "in");
124  }
125  fRtdb->setSecondInput(par2A);
126  }
127  }
128 
129  // Set output
130  if (fOutputName != "") {
131  if (fOutputType == "ROOT") {
132  FairParRootFileIo* parOut = new FairParRootFileIo(kTRUE);
133  parOut->open(fOutputName.data());
134  fRtdb->setOutput(parOut);
135  }
136 
137  fRtdb->saveOutput();
138  }
139  }
140  fRtdb->print();
141 }
142 
144  string parameterName = "";
145  FairParGenericSet* par = nullptr;
146 
148  FairMQMessagePtr req(NewMessage());
149 
150  if (Receive(req, fChannelName, 0) > 0) {
151  string reqStr(static_cast<char*>(req->GetData()), req->GetSize());
152  LOG(info) << "Received parameter request from client: \"" << reqStr
153  << "\"";
154 
155  size_t pos = reqStr.rfind(",");
156  string newParameterName = reqStr.substr(0, pos);
157  int runId = stoi(reqStr.substr(pos + 1));
158  LOG(info) << "Parameter name: " << newParameterName;
159  LOG(info) << "Run ID: " << runId;
160 
161  LOG(info) << "Retrieving parameter...";
162  // Check if the parameter name has changed to avoid getting same container repeatedly
163  if (newParameterName != parameterName) {
164  parameterName = newParameterName;
165  par = static_cast<FairParGenericSet*>(
166  fRtdb->getContainer(parameterName.c_str()));
167  }
168  LOG(info) << "Retrieving parameter...Done";
169 
170  if (-1 != runId) { fRtdb->initContainers(runId); }
171 
172  LOG(info) << "Sending following parameter to the client:";
173  if (par) {
174  par->print();
175 
176  TMessage* tmsg = new TMessage(kMESS_OBJECT);
177  tmsg->WriteObject(par);
178 
179  FairMQMessagePtr rep(NewMessage(
180  tmsg->Buffer(),
181  tmsg->BufferSize(),
182  [](void* /*data*/, void* object) {
183  delete static_cast<TMessage*>(object);
184  },
185  tmsg));
186 
187  if (Send(rep, fChannelName, 0) < 0) {
188  LOG(error) << "failed sending reply";
189  break;
190  }
191  } else {
192  LOG(error) << "Parameter uninitialized!";
193  // Send an empty message back to keep the REQ/REP cycle
194  FairMQMessagePtr rep(NewMessage());
195  if (Send(rep, fChannelName, 0) < 0) {
196  LOG(error) << "failed sending reply";
197  break;
198  }
199  }
200  }
201  }
202 }
203 
205  if (gGeoManager) {
206  gGeoManager->GetListOfVolumes()->Delete();
207  gGeoManager->GetListOfShapes()->Delete();
208  }
209  delete fRtdb;
210 }
f
float f
Definition: L1/vectors/P4_F32vec4.h:24
ParameterMQServer::fSecondInputName
std::string fSecondInputName
Definition: ParameterMQServer.h:71
cbm::mq::CheckCurrentState
bool CheckCurrentState(FairMQDevice *device, cbm::mq::State state)
Definition: CbmMQDefs.h:62
ParameterMQServer::Run
virtual void Run()
Definition: ParameterMQServer.cxx:143
ParameterMQServer::fRtdb
FairRuntimeDb * fRtdb
Definition: ParameterMQServer.h:67
ParameterMQServer::fChannelName
std::string fChannelName
Definition: ParameterMQServer.h:76
ParameterMQServer::fSecondInputType
std::string fSecondInputType
Definition: ParameterMQServer.h:72
ParameterMQServer::~ParameterMQServer
virtual ~ParameterMQServer()
Definition: ParameterMQServer.cxx:204
ParameterMQServer::fFirstInputType
std::string fFirstInputType
Definition: ParameterMQServer.h:70
ParameterMQServer::InitTask
virtual void InitTask()
Definition: ParameterMQServer.cxx:46
cbm::mq::State::Running
@ Running
ParameterMQServer::fFirstInputName
std::string fFirstInputName
Definition: ParameterMQServer.h:69
ParameterMQServer::fOutputName
std::string fOutputName
Definition: ParameterMQServer.h:73
pos
TVector3 pos
Definition: CbmMvdSensorDigiToHitTask.cxx:60
ParameterMQServer.h
ParameterMQServer::ParameterMQServer
ParameterMQServer()
Definition: ParameterMQServer.cxx:36
CbmMQDefs.h
ParameterMQServer::fOutputType
std::string fOutputType
Definition: ParameterMQServer.h:74