psana/src/PSAna.cpp

Go to the documentation of this file.
00001 //--------------------------------------------------------------------------
00002 // File and Version Information:
00003 //      $Id: PSAna.cpp 9886 2015-04-09 16:17:19Z cpo@SLAC.STANFORD.EDU $
00004 //
00005 // Description:
00006 //      Implementation of class PSAna: sets up the job configuration and creates DataSource instances.
00007 //
00008 // Author List:
00009 //      Andy Salnikov
00010 //
00011 //------------------------------------------------------------------------
00012 
00013 //-----------------------
00014 // This Class's Header --
00015 //-----------------------
00016 #include "psana/PSAna.h"
00017 
00018 //-----------------
00019 // C/C++ Headers --
00020 //-----------------
00021 #include <signal.h>
00022 #include <map>
00023 #include <boost/algorithm/string.hpp>
00024 #include <boost/filesystem.hpp>
00025 #include <boost/make_shared.hpp>
00026 #include <boost/format.hpp>
00027 
00028 //-------------------------------
00029 // Collaborating Class Headers --
00030 //-------------------------------
00031 #include "ConfigSvc/ConfigSvc.h"
00032 #include "ConfigSvc/ConfigSvcImplFile.h"
00033 #include "IData/Dataset.h"
00034 #include "MsgLogger/MsgLogger.h"
00035 #include "psana/DynLoader.h"
00036 #include "psana/Exceptions.h"
00037 #include "psana/ExpNameFromConfig.h"
00038 #include "psana/ExpNameFromDs.h"
00039 #include "psana/MPWorkerId.h"
00040 #include "PSEnv/Env.h"
00041 
00042 //-----------------------------------------------------------------------
00043 // Local Macros, Typedefs, Structures, Unions and Forward Declarations --
00044 //-----------------------------------------------------------------------
00045 
00046 namespace fs = boost::filesystem ;
00047 
00048 namespace {
00049 
00050   const char* logger = "PSAna";
00051 
00052   enum FileType { Unknown=-1, Mixed=0, XTC, HDF5, SHMEM, IDX, SMALLDATA };
00053 
00054   std::map<std::string, FileType> getDsetInputKeys() {
00055     std::map<std::string, FileType> dsetInputKeys;
00056     dsetInputKeys["xtc"]=XTC;
00057     dsetInputKeys["h5"]=HDF5;
00058     dsetInputKeys["shmem"]=SHMEM;
00059     dsetInputKeys["idx"]=IDX;
00060     dsetInputKeys["smd"]=SMALLDATA;
00061     return dsetInputKeys;
00062   }
00063 
00064   // Function which tries to guess input data type from file name extensions
00065   template <typename Iter>
00066   FileType guessType(Iter begin, Iter end) {
00067 
00068     FileType type = Unknown;
00069 
00070     std::map<std::string, FileType> dsetInputKeys = getDsetInputKeys();
00071 
00072     for ( ; begin != end; ++ begin) {
00073 
00074       IData::Dataset ds(*begin);
00075 
00076       FileType ftype = Unknown;
00077       int numSpecifiersInDataset = 0;
00078       
00079       for (std::map<std::string, FileType>::iterator dsKey = dsetInputKeys.begin();
00080            dsKey != dsetInputKeys.end(); ++dsKey) {
00081         const std::string inputKey = dsKey->first;
00082         FileType dsFtype = dsKey->second;
00083         if (ds.exists(inputKey)) {
00084           ftype = dsFtype;
00085           numSpecifiersInDataset += 1;
00086         } 
00087       }
00088       
00089       if (numSpecifiersInDataset > 1) {
00090         MsgLog(logger, fatal, "More than one input source specified in dataset");
00091       }
00092 
00093       if (ftype == Unknown) return ftype;
00094       if (type == Unknown) {
00095         type = ftype;
00096       } else if (type == XTC or type == HDF5) {
00097         if (ftype != type) return Mixed;
00098       }
00099     }
00100 
00101     return type;
00102   }
00103 
00104 }
00105 
00106 
00107 //              ----------------------------------------
00108 //              -- Public Function Member Definitions --
00109 //              ----------------------------------------
00110 
00111 namespace psana {
00112 
00113 //----------------
00114 // Constructors --
00115 //----------------
00116 PSAna::PSAna(const std::string& config, const std::map<std::string, std::string>& options)
00117   : m_context(Context::generate())
00118   , m_modules()
00119 {
00120   Context::set(m_context);
00121 
00122   // initialize configuration service, this can only be done once
00123   boost::shared_ptr<ConfigSvc::ConfigSvcImplI> cfgImpl = boost::make_shared<ConfigSvc::ConfigSvcImplFile>(config);
00124   ConfigSvc::ConfigSvc::init(cfgImpl, m_context);
00125 
00126   // for backward compaibility also initialize config service in global context
00127   if (not ConfigSvc::ConfigSvc::initialized()) {
00128     ConfigSvc::ConfigSvc::init(boost::make_shared<ConfigSvc::ConfigSvcImplFile>());
00129   }
00130 
00131   ConfigSvc::ConfigSvc cfgsvc(m_context);
00132   ConfigSvc::ConfigSvc glbcfgsvc;
00133 
00134   // copy all options
00135   for (std::map<std::string, std::string>::const_iterator it = options.begin(); it != options.end(); ++ it) {
00136     std::string section;
00137     std::string option = it->first;
00138     std::string::size_type p = option.rfind('.');
00139     if (p == std::string::npos) {
00140       section = "psana";
00141     } else {
00142       section = std::string(option, 0, p);
00143       option.erase(0, p+1);
00144     }
00145     cfgsvc.put(section, option, it->second);
00146     // and update global config as well
00147     glbcfgsvc.put(section, option, it->second);
00148   }
00149 }
00150 
00151 //--------------
00152 // Destructor --
00153 //--------------
00154 PSAna::~PSAna ()
00155 {
00156 }
00157 
00158 /**
00159  *  @brief Get the list of modules.
00160  */
00161 std::vector<std::string>
00162 PSAna::modules()
00163 {
00164   ConfigSvc::ConfigSvc cfgsvc(m_context);
00165   std::vector<std::string> moduleNames = cfgsvc.getList("psana", "modules", std::vector<std::string>());
00166   return moduleNames;
00167 }
00168 
00169 
00170 // Create data source instance for the set of input files/datasets.
00171 DataSource
00172 PSAna::dataSource(const std::vector<std::string>& input)
00173 {
00174   Context::set(m_context);
00175 
00176   ConfigSvc::ConfigSvc cfgsvc(m_context);
00177 
00178   DataSource dataSrc;
00179 
00180   // if input is empty try to use input from config file
00181   std::vector<std::string> inputList(input);
00182   if (inputList.empty()) {
00183     inputList = cfgsvc.getList("psana", "input", std::vector<std::string>());
00184   }
00185   if (inputList.empty()) {
00186     inputList = cfgsvc.getList("psana", "files", std::vector<std::string>());
00187   }
00188   if (inputList.empty()) {
00189     MsgLog(logger, error, "no input data specified");
00190     return dataSrc;
00191   }
00192 
00193   // get calib directory name
00194   const char* datadir = getenv("SIT_PSDM_DATA");
00195   std::string calibDirRoot;
00196   if (datadir) {
00197     calibDirRoot = datadir;
00198   } else {
00199     calibDirRoot = "/reg/d/psdm";
00200   } 
00201   boost::format fmt("%1%/%2%");
00202   fmt % calibDirRoot % "{instr}/{exp}/calib";
00203   std::string calibDirDefault = fmt.str();
00204   std::string calibDir = cfgsvc.getStr("psana", "calib-dir", calibDirDefault);
00205 
00206   // get/build job name
00207   std::string jobName = cfgsvc.getStr("psana", "job-name", "");
00208   if (jobName.empty() and not inputList.empty()) {
00209     boost::filesystem::path path = inputList.front();
00210     jobName = path.stem().string();
00211   }
00212   MsgLog(logger, debug, "job name = " << jobName);
00213 
00214   // instantiate experiment name provider
00215   boost::shared_ptr<PSEnv::IExpNameProvider> expNameProvider;
00216   if(not cfgsvc.getStr("psana", "experiment", "").empty()) {
00217     const std::string& instr = cfgsvc.getStr("psana", "instrument", "");
00218     const std::string& exp = cfgsvc.getStr("psana", "experiment", "");
00219     expNameProvider = boost::make_shared<ExpNameFromConfig>(instr, exp);
00220   } else {
00221     expNameProvider = boost::make_shared<ExpNameFromDs>(inputList);
00222   }
00223 
00224   // make AliasMap instance
00225   boost::shared_ptr<PSEvt::AliasMap> amap = boost::make_shared<AliasMap>();
00226 
00227   // Guess input data type
00228   ::FileType ftype = ::guessType(inputList.begin(), inputList.end());
00229   MsgLog(logger, debug, "input data type: " << int(ftype));
00230   if (ftype == Mixed) {
00231     MsgLog(logger, error, "Mixed input file types");
00232     return dataSrc;
00233   }
00234 
00235   // check if requested multi-process mode and it's compatible with input data
00236   int nworkers = cfgsvc.get("psana", "parallel", 0);
00237   switch (ftype) {
00238   case IDX:
00239     if (nworkers > 0) {
00240       MsgLog(logger, warning, "Multi-process mode is not available for IDX data, switching to single-process");
00241       nworkers = 0;
00242     }
00243     break;
00244   case HDF5:
00245     if (nworkers > 0) {
00246       MsgLog(logger, warning, "Multi-process mode is not available for HDF5 data, switching to single-process");
00247       nworkers = 0;
00248     }
00249     break;
00250   case XTC:
00251   case SHMEM:
00252   case SMALLDATA:
00253     // OK
00254     break;
00255   case Unknown:
00256     ftype = XTC;
00257     break;
00258   case Mixed:
00259     // should not happen
00260     break;
00261   }
00262   if (nworkers > 255) {
00263     MsgLog(logger, warning, "Number of workers exceeds limit, reduced to 255");
00264     nworkers = 255;
00265   }
00266 
00267   // in parallel mode start spawning workers, workerId will be -1 in master
00268   // and non-negative number in workers
00269   int workerId = -1;
00270   int readyPipe = -1;   // fd for ready pipe
00271   int dPipe = -1;   // fd for data pipe
00272   boost::shared_ptr<std::vector<MPWorkerId> > workers;
00273   if (nworkers > 0) {
00274 
00275     workers = boost::make_shared<std::vector<MPWorkerId> >();
00276 
00277     // make a pipe for ready queue
00278     int rPipe[2];
00279     pipe(rPipe);
00280     readyPipe = rPipe[0]; // to be used by master
00281 
00282     for (int iworker = 0; iworker < nworkers; ++ iworker) {
00283 
00284       // make a pipe for communication with worker
00285       int dataPipe[2];
00286       pipe(dataPipe);
00287 
00288       pid_t pid = fork();
00289       if (pid == -1) {
00290 
00291         // error happened, this is fatal
00292         throw ExceptionErrno(ERR_LOC, "fork failed");
00293 
00294       } else if (pid == 0) {
00295 
00296         // we are in the child (worker) process
00297 
00298         // close pipe ends that we don't use
00299         close(dataPipe[1]);
00300         close(rPipe[0]);
00301 
00302         workerId = iworker;
00303         readyPipe = rPipe[1];
00304         dPipe = dataPipe[0];
00305 
00306         // can cleanup some space
00307         workers.reset();
00308 
00309         MsgLog(logger, trace, "Forked worker #" << iworker << " dataPipeFd: " << dataPipe[0] << " readyPipe: " << readyPipe);
00310 
00311         break;
00312 
00313       } else {
00314 
00315         // we are still in parent process
00316 
00317         // close pipe ends that we don't use
00318         close(dataPipe[0]);
00319 
00320         // save worker info
00321         workers->push_back(MPWorkerId(iworker, pid, dataPipe[1]));
00322         MsgLog(logger, trace, "Add worker #" << iworker << " pid " << pid << " dataPipeFd " << dataPipe[1]);
00323 
00324       }
00325 
00326     }
00327 
00328     if (workerId < 0) {
00329       // close unused end of ready pipe in master
00330       ::close(rPipe[1]);
00331     }
00332   }
00333 
00334 
00335   // Guess input module name
00336   std::string iname;
00337   switch (ftype) {
00338   case XTC:
00339     if (nworkers <= 0) {
00340       // single-process input for XTC
00341       iname = "PSXtcInput.XtcInputModule";
00342     } else if (workerId < 0) {
00343       // master process in multi-process mode
00344       iname = "PSXtcMPInput.XtcMPMasterInput";
00345     } else {
00346       // worker process in multi-process mode
00347       iname = "PSXtcMPInput.XtcMPWorkerInput";
00348     }
00349     break;
00350   case SMALLDATA:
00351     if (nworkers <= 0) {
00352       // single-process input for SMALLDATA
00353       iname = "PSXtcInput.XtcInputModule"; 
00354     } else if (workerId < 0) {
00355       // master process in multi-process mode
00356       MsgLog(logger, fatal, "smldata not supported with parallel");
00357     } else {
00358       // worker process in multi-process mode
00359       MsgLog(logger, fatal, "smldata not supported with parallel");
00360     }
00361     break;
00362   case SHMEM:
00363     if (nworkers <= 0) {
00364       // single-process input for shmem XTC
00365       iname = "PSShmemInput.ShmemInputModule";
00366     } else if (workerId < 0) {
00367       // master process in multi-process mode
00368       iname = "PSXtcMPInput.ShmemMPMasterInput";
00369     } else {
00370       // worker process in multi-process mode
00371       iname = "PSXtcMPInput.XtcMPWorkerInput";
00372     }
00373     break;
00374   case HDF5:
00375     iname = "PSHdf5Input.Hdf5InputModule";
00376     break;
00377   case IDX:
00378     iname = "PSXtcInput.XtcIndexInputModule";
00379     break;
00380   case Unknown:
00381   case Mixed:
00382     // should not happen
00383     break;
00384   }
00385 
00386   // pass datasets/file names to the configuration so that input module can find them
00387   std::string flist = boost::join(inputList, " ");
00388   cfgsvc.put(iname, "input", flist);
00389   cfgsvc.put(iname, "files", flist);
00390   if (readyPipe >= 0) {
00391     cfgsvc.put(iname, "fdReadyPipe", boost::lexical_cast<std::string>(readyPipe));
00392   }
00393   if (workerId >= 0) {
00394     cfgsvc.put(iname, "workerId", boost::lexical_cast<std::string>(workerId));
00395   }
00396   if (dPipe >= 0) {
00397     cfgsvc.put(iname, "fdDataPipe", boost::lexical_cast<std::string>(dPipe));
00398   }
00399 
00400   // Load input module
00401   DynLoader loader;
00402   boost::shared_ptr<psana::InputModule> inputModule(loader.loadInputModule(iname));
00403   MsgLog(logger, trace, "Loaded input module " << iname);
00404 
00405   // Setup environment
00406   boost::shared_ptr<PSEnv::Env> env = boost::make_shared<PSEnv::Env>(jobName, expNameProvider, calibDir, amap, workerId);
00407   MsgLogRoot(debug, "instrument = " << env->instrument() << " experiment = " << env->experiment());
00408   MsgLogRoot(debug, "calibDir = " << env->calibDir());
00409 
00410   // instantiate all user modules
00411   if (nworkers > 0 and workerId < 0) {
00412 
00413     // master process in multi-process mode does not need any user modules
00414 
00415     // put workers info into environment so that it can be seen by master module
00416     env->configStore().put(workers, Pds::Src());
00417 
00418     // install special signal handler so that dying children do not turn into zombies
00419     // and writing to a pipe directed to dead worker does not cause crash
00420     struct sigaction sa;
00421     memset(&sa, 0, sizeof(sa));
00422     sa.sa_handler = SIG_IGN;
00423     sa.sa_flags = SA_NOCLDWAIT;
00424     sigaction(SIGCHLD, &sa, NULL);
00425     sa.sa_flags = 0;
00426     sigaction(SIGPIPE, &sa, NULL);
00427 
00428   } else {
00429 
00430     // single process mode or worker process in multi-process mode
00431 
00432     // get list of modules to load
00433     std::vector<std::string> moduleNames = cfgsvc.getList("psana", "modules", std::vector<std::string>());
00434 
00435     // instantiate all user modules
00436     for ( std::vector<std::string>::const_iterator it = moduleNames.begin(); it != moduleNames.end() ; ++ it ) {
00437       m_modules.push_back(loader.loadModule(*it));
00438       MsgLog(logger, trace, "From psana modules, loaded module " << m_modules.back()->name());
00439     }
00440     if (moduleNames.size()==0) {
00441       MsgLog(logger, trace, "psana modules parameter is empty.");
00442     }
00443 
00444   }
00445 
00446   // make new instance
00447   dataSrc = DataSource(inputModule, m_modules, env);
00448 
00449   return dataSrc;
00450 }
00451 
00452 } // namespace psana

Generated on 19 Dec 2016 for PSANAclasses by  doxygen 1.4.7