00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
#include "psana/PSAna.h"

// C system headers
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

// C++ standard headers
#include <map>
#include <string>
#include <vector>

// Other libraries
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/make_shared.hpp>

// Project headers
#include "ConfigSvc/ConfigSvc.h"
#include "ConfigSvc/ConfigSvcImplFile.h"
#include "IData/Dataset.h"
#include "MsgLogger/MsgLogger.h"
#include "psana/DynLoader.h"
#include "psana/Exceptions.h"
#include "psana/ExpNameFromConfig.h"
#include "psana/ExpNameFromDs.h"
#include "psana/MPWorkerId.h"
#include "PSEnv/Env.h"
00041
00042
00043
00044
00045
00046 namespace fs = boost::filesystem ;
00047
00048 namespace {
00049
00050 const char* logger = "PSAna";
00051
00052 enum FileType { Unknown=-1, Mixed=0, XTC, HDF5, SHMEM, IDX, SMALLDATA };
00053
00054 std::map<std::string, FileType> getDsetInputKeys() {
00055 std::map<std::string, FileType> dsetInputKeys;
00056 dsetInputKeys["xtc"]=XTC;
00057 dsetInputKeys["h5"]=HDF5;
00058 dsetInputKeys["shmem"]=SHMEM;
00059 dsetInputKeys["idx"]=IDX;
00060 dsetInputKeys["smd"]=SMALLDATA;
00061 return dsetInputKeys;
00062 }
00063
00064
00065 template <typename Iter>
00066 FileType guessType(Iter begin, Iter end) {
00067
00068 FileType type = Unknown;
00069
00070 std::map<std::string, FileType> dsetInputKeys = getDsetInputKeys();
00071
00072 for ( ; begin != end; ++ begin) {
00073
00074 IData::Dataset ds(*begin);
00075
00076 FileType ftype = Unknown;
00077 int numSpecifiersInDataset = 0;
00078
00079 for (std::map<std::string, FileType>::iterator dsKey = dsetInputKeys.begin();
00080 dsKey != dsetInputKeys.end(); ++dsKey) {
00081 const std::string inputKey = dsKey->first;
00082 FileType dsFtype = dsKey->second;
00083 if (ds.exists(inputKey)) {
00084 ftype = dsFtype;
00085 numSpecifiersInDataset += 1;
00086 }
00087 }
00088
00089 if (numSpecifiersInDataset > 1) {
00090 MsgLog(logger, fatal, "More than one input source specified in dataset");
00091 }
00092
00093 if (ftype == Unknown) return ftype;
00094 if (type == Unknown) {
00095 type = ftype;
00096 } else if (type == XTC or type == HDF5) {
00097 if (ftype != type) return Mixed;
00098 }
00099 }
00100
00101 return type;
00102 }
00103
00104 }
00105
00106
00107
00108
00109
00110
00111 namespace psana {
00112
00113
00114
00115
00116 PSAna::PSAna(const std::string& config, const std::map<std::string, std::string>& options)
00117 : m_context(Context::generate())
00118 , m_modules()
00119 {
00120 Context::set(m_context);
00121
00122
00123 boost::shared_ptr<ConfigSvc::ConfigSvcImplI> cfgImpl = boost::make_shared<ConfigSvc::ConfigSvcImplFile>(config);
00124 ConfigSvc::ConfigSvc::init(cfgImpl, m_context);
00125
00126
00127 if (not ConfigSvc::ConfigSvc::initialized()) {
00128 ConfigSvc::ConfigSvc::init(boost::make_shared<ConfigSvc::ConfigSvcImplFile>());
00129 }
00130
00131 ConfigSvc::ConfigSvc cfgsvc(m_context);
00132 ConfigSvc::ConfigSvc glbcfgsvc;
00133
00134
00135 for (std::map<std::string, std::string>::const_iterator it = options.begin(); it != options.end(); ++ it) {
00136 std::string section;
00137 std::string option = it->first;
00138 std::string::size_type p = option.rfind('.');
00139 if (p == std::string::npos) {
00140 section = "psana";
00141 } else {
00142 section = std::string(option, 0, p);
00143 option.erase(0, p+1);
00144 }
00145 cfgsvc.put(section, option, it->second);
00146
00147 glbcfgsvc.put(section, option, it->second);
00148 }
00149 }
00150
00151
00152
00153
00154 PSAna::~PSAna ()
00155 {
00156 }
00157
00158
00159
00160
00161 std::vector<std::string>
00162 PSAna::modules()
00163 {
00164 ConfigSvc::ConfigSvc cfgsvc(m_context);
00165 std::vector<std::string> moduleNames = cfgsvc.getList("psana", "modules", std::vector<std::string>());
00166 return moduleNames;
00167 }
00168
00169
00170
00171 DataSource
00172 PSAna::dataSource(const std::vector<std::string>& input)
00173 {
00174 Context::set(m_context);
00175
00176 ConfigSvc::ConfigSvc cfgsvc(m_context);
00177
00178 DataSource dataSrc;
00179
00180
00181 std::vector<std::string> inputList(input);
00182 if (inputList.empty()) {
00183 inputList = cfgsvc.getList("psana", "input", std::vector<std::string>());
00184 }
00185 if (inputList.empty()) {
00186 inputList = cfgsvc.getList("psana", "files", std::vector<std::string>());
00187 }
00188 if (inputList.empty()) {
00189 MsgLog(logger, error, "no input data specified");
00190 return dataSrc;
00191 }
00192
00193
00194 const char* datadir = getenv("SIT_PSDM_DATA");
00195 std::string calibDirRoot;
00196 if (datadir) {
00197 calibDirRoot = datadir;
00198 } else {
00199 calibDirRoot = "/reg/d/psdm";
00200 }
00201 boost::format fmt("%1%/%2%");
00202 fmt % calibDirRoot % "{instr}/{exp}/calib";
00203 std::string calibDirDefault = fmt.str();
00204 std::string calibDir = cfgsvc.getStr("psana", "calib-dir", calibDirDefault);
00205
00206
00207 std::string jobName = cfgsvc.getStr("psana", "job-name", "");
00208 if (jobName.empty() and not inputList.empty()) {
00209 boost::filesystem::path path = inputList.front();
00210 jobName = path.stem().string();
00211 }
00212 MsgLog(logger, debug, "job name = " << jobName);
00213
00214
00215 boost::shared_ptr<PSEnv::IExpNameProvider> expNameProvider;
00216 if(not cfgsvc.getStr("psana", "experiment", "").empty()) {
00217 const std::string& instr = cfgsvc.getStr("psana", "instrument", "");
00218 const std::string& exp = cfgsvc.getStr("psana", "experiment", "");
00219 expNameProvider = boost::make_shared<ExpNameFromConfig>(instr, exp);
00220 } else {
00221 expNameProvider = boost::make_shared<ExpNameFromDs>(inputList);
00222 }
00223
00224
00225 boost::shared_ptr<PSEvt::AliasMap> amap = boost::make_shared<AliasMap>();
00226
00227
00228 ::FileType ftype = ::guessType(inputList.begin(), inputList.end());
00229 MsgLog(logger, debug, "input data type: " << int(ftype));
00230 if (ftype == Mixed) {
00231 MsgLog(logger, error, "Mixed input file types");
00232 return dataSrc;
00233 }
00234
00235
00236 int nworkers = cfgsvc.get("psana", "parallel", 0);
00237 switch (ftype) {
00238 case IDX:
00239 if (nworkers > 0) {
00240 MsgLog(logger, warning, "Multi-process mode is not available for IDX data, switching to single-process");
00241 nworkers = 0;
00242 }
00243 break;
00244 case HDF5:
00245 if (nworkers > 0) {
00246 MsgLog(logger, warning, "Multi-process mode is not available for HDF5 data, switching to single-process");
00247 nworkers = 0;
00248 }
00249 break;
00250 case XTC:
00251 case SHMEM:
00252 case SMALLDATA:
00253
00254 break;
00255 case Unknown:
00256 ftype = XTC;
00257 break;
00258 case Mixed:
00259
00260 break;
00261 }
00262 if (nworkers > 255) {
00263 MsgLog(logger, warning, "Number of workers exceeds limit, reduced to 255");
00264 nworkers = 255;
00265 }
00266
00267
00268
00269 int workerId = -1;
00270 int readyPipe = -1;
00271 int dPipe = -1;
00272 boost::shared_ptr<std::vector<MPWorkerId> > workers;
00273 if (nworkers > 0) {
00274
00275 workers = boost::make_shared<std::vector<MPWorkerId> >();
00276
00277
00278 int rPipe[2];
00279 pipe(rPipe);
00280 readyPipe = rPipe[0];
00281
00282 for (int iworker = 0; iworker < nworkers; ++ iworker) {
00283
00284
00285 int dataPipe[2];
00286 pipe(dataPipe);
00287
00288 pid_t pid = fork();
00289 if (pid == -1) {
00290
00291
00292 throw ExceptionErrno(ERR_LOC, "fork failed");
00293
00294 } else if (pid == 0) {
00295
00296
00297
00298
00299 close(dataPipe[1]);
00300 close(rPipe[0]);
00301
00302 workerId = iworker;
00303 readyPipe = rPipe[1];
00304 dPipe = dataPipe[0];
00305
00306
00307 workers.reset();
00308
00309 MsgLog(logger, trace, "Forked worker #" << iworker << " dataPipeFd: " << dataPipe[0] << " readyPipe: " << readyPipe);
00310
00311 break;
00312
00313 } else {
00314
00315
00316
00317
00318 close(dataPipe[0]);
00319
00320
00321 workers->push_back(MPWorkerId(iworker, pid, dataPipe[1]));
00322 MsgLog(logger, trace, "Add worker #" << iworker << " pid " << pid << " dataPipeFd " << dataPipe[1]);
00323
00324 }
00325
00326 }
00327
00328 if (workerId < 0) {
00329
00330 ::close(rPipe[1]);
00331 }
00332 }
00333
00334
00335
00336 std::string iname;
00337 switch (ftype) {
00338 case XTC:
00339 if (nworkers <= 0) {
00340
00341 iname = "PSXtcInput.XtcInputModule";
00342 } else if (workerId < 0) {
00343
00344 iname = "PSXtcMPInput.XtcMPMasterInput";
00345 } else {
00346
00347 iname = "PSXtcMPInput.XtcMPWorkerInput";
00348 }
00349 break;
00350 case SMALLDATA:
00351 if (nworkers <= 0) {
00352
00353 iname = "PSXtcInput.XtcInputModule";
00354 } else if (workerId < 0) {
00355
00356 MsgLog(logger, fatal, "smldata not supported with parallel");
00357 } else {
00358
00359 MsgLog(logger, fatal, "smldata not supported with parallel");
00360 }
00361 break;
00362 case SHMEM:
00363 if (nworkers <= 0) {
00364
00365 iname = "PSShmemInput.ShmemInputModule";
00366 } else if (workerId < 0) {
00367
00368 iname = "PSXtcMPInput.ShmemMPMasterInput";
00369 } else {
00370
00371 iname = "PSXtcMPInput.XtcMPWorkerInput";
00372 }
00373 break;
00374 case HDF5:
00375 iname = "PSHdf5Input.Hdf5InputModule";
00376 break;
00377 case IDX:
00378 iname = "PSXtcInput.XtcIndexInputModule";
00379 break;
00380 case Unknown:
00381 case Mixed:
00382
00383 break;
00384 }
00385
00386
00387 std::string flist = boost::join(inputList, " ");
00388 cfgsvc.put(iname, "input", flist);
00389 cfgsvc.put(iname, "files", flist);
00390 if (readyPipe >= 0) {
00391 cfgsvc.put(iname, "fdReadyPipe", boost::lexical_cast<std::string>(readyPipe));
00392 }
00393 if (workerId >= 0) {
00394 cfgsvc.put(iname, "workerId", boost::lexical_cast<std::string>(workerId));
00395 }
00396 if (dPipe >= 0) {
00397 cfgsvc.put(iname, "fdDataPipe", boost::lexical_cast<std::string>(dPipe));
00398 }
00399
00400
00401 DynLoader loader;
00402 boost::shared_ptr<psana::InputModule> inputModule(loader.loadInputModule(iname));
00403 MsgLog(logger, trace, "Loaded input module " << iname);
00404
00405
00406 boost::shared_ptr<PSEnv::Env> env = boost::make_shared<PSEnv::Env>(jobName, expNameProvider, calibDir, amap, workerId);
00407 MsgLogRoot(debug, "instrument = " << env->instrument() << " experiment = " << env->experiment());
00408 MsgLogRoot(debug, "calibDir = " << env->calibDir());
00409
00410
00411 if (nworkers > 0 and workerId < 0) {
00412
00413
00414
00415
00416 env->configStore().put(workers, Pds::Src());
00417
00418
00419
00420 struct sigaction sa;
00421 memset(&sa, 0, sizeof(sa));
00422 sa.sa_handler = SIG_IGN;
00423 sa.sa_flags = SA_NOCLDWAIT;
00424 sigaction(SIGCHLD, &sa, NULL);
00425 sa.sa_flags = 0;
00426 sigaction(SIGPIPE, &sa, NULL);
00427
00428 } else {
00429
00430
00431
00432
00433 std::vector<std::string> moduleNames = cfgsvc.getList("psana", "modules", std::vector<std::string>());
00434
00435
00436 for ( std::vector<std::string>::const_iterator it = moduleNames.begin(); it != moduleNames.end() ; ++ it ) {
00437 m_modules.push_back(loader.loadModule(*it));
00438 MsgLog(logger, trace, "From psana modules, loaded module " << m_modules.back()->name());
00439 }
00440 if (moduleNames.size()==0) {
00441 MsgLog(logger, trace, "psana modules parameter is empty.");
00442 }
00443
00444 }
00445
00446
00447 dataSrc = DataSource(inputModule, m_modules, env);
00448
00449 return dataSrc;
00450 }
00451
00452 }