PSXtcInput/src/XtcInputModuleBase.cpp

Go to the documentation of this file.
00001 //--------------------------------------------------------------------------
00002 // File and Version Information:
00003 //     $Id: XtcInputModuleBase.cpp 11490 2016-03-16 04:25:57Z cpo@SLAC.STANFORD.EDU $
00004 //
00005 // Description:
00006 //     Class XtcInputModuleBase...
00007 //
00008 // Author List:
00009 //     Andrei Salnikov
00010 //
00011 //------------------------------------------------------------------------
00012 
00013 //-----------------------
00014 // This Class's Header --
00015 //-----------------------
00016 #include "PSXtcInput/XtcInputModuleBase.h"
00017 
00018 //-----------------
00019 // C/C++ Headers --
00020 //-----------------
00021 #include <climits>
00022 #include <algorithm>
00023 #include <iterator>
00024 #include <vector>
00025 #include <boost/make_shared.hpp>
00026 #include <boost/foreach.hpp>
00027 #include <string>
00028 
00029 //-------------------------------
00030 // Collaborating Class Headers --
00031 //-------------------------------
00032 #include "MsgLogger/MsgLogger.h"
00033 #include "pdsdata/xtc/L1AcceptEnv.hh"
00034 #include "pdsdata/psddl/alias.ddl.h"
00035 #include "psddl_psana/epics.ddl.h"
00036 #include "PSTime/Time.h"
00037 #include "PSXtcInput/Exceptions.h"
00038 #include "PSXtcInput/XtcEventId.h"
00039 #include "XtcInput/XtcFileName.h"
00040 #include "XtcInput/XtcIterator.h"
00041 #include "XtcInput/MergeMode.h"
00042 #include "XtcInput/DgramList.h"
00043 #include "XtcInput/DgramUtil.h"
00044 #include "PSEvt/DamageMap.h"
00045 #include "IData/Dataset.h"
00046 #include "psddl_pds2psana/SmallDataProxy.h"
00047 #include "PSEvt/Exceptions.h"
00048 
00049 //-----------------------------------------------------------------------
00050 // Local Macros, Typedefs, Structures, Unions and Forward Declarations --
00051 //-----------------------------------------------------------------------
00052 
00053 using namespace XtcInput;
00054 
00055 namespace {
00056 
00057 
00058   // return true if all datagrams contains EPICS data only
00059   bool epicsOnly(const std::vector<XtcInput::Dgram>& dgs)
00060   {
00061     BOOST_FOREACH(const XtcInput::Dgram& dg, dgs) {
00062       XtcInput::XtcIterator iter(&(dg.dg()->xtc));
00063       while (Pds::Xtc* x = iter.next()) {
00064         switch (x->contains.id()) {
00065         case Pds::TypeId::Id_Xtc:
00066         case Pds::TypeId::Id_Epics:
00067           continue;
00068         default:
00069           return false;
00070         }
00071       }
00072     }
00073     return true;
00074   }
00075 
00076 
00077   long nextNonNegativeValue(const long v) {
00078     if ((v >=0) and (v < LONG_MAX)) return (v+1);
00079     return 0;
00080   }
00081 
00082   bool isConfigOrBeginCalib(const XtcInput::Dgram &dg) {
00083     if (dg.empty()) return false;
00084     XtcInput::Dgram::ptr dgptr = dg.dg();
00085     if (not dgptr) return false;
00086     const Pds::Sequence& seq = dgptr->seq ;
00087     if ((seq.service() == Pds::TransitionId::Configure) or
00088         (seq.service() == Pds::TransitionId::BeginCalibCycle)) {
00089       return true;
00090     }
00091     return false;
00092   }
00093 
00094   // returns true if typeId is for Xtc Any
00095   inline bool checkForAndRecordSrcDamage(const Pds::TypeId& typeId, Pds::Xtc *xtc, 
00096                                          const Pds::Damage &damage, boost::shared_ptr<PSEvt::DamageMap> &damageMap, 
00097                                          const char *loggerName) {
00098     if (typeId.id() == Pds::TypeId::Any) {
00099       if (damage.value()) {
00100         damageMap->addSrcDamage(xtc->src,damage);
00101       } else {
00102         MsgLog(loggerName, warning, loggerName << ": unexpected - xtc type id is 'Any' but its damage=0");
00103       }
00104       return true;
00105     }
00106     return false;
00107   }
00108 
00109 }
00110 
00111 
00112 
00113 //             ----------------------------------------
00114 //             -- Public Function Member Definitions --
00115 //             ----------------------------------------
00116 
00117 namespace PSXtcInput {
00118 
00119 //----------------
00120 // Constructors --
00121 //----------------
00122 XtcInputModuleBase::XtcInputModuleBase (const std::string& name,
00123     const boost::shared_ptr<IDatagramSource>& dgsource, bool noSkip)
00124   : InputModule(name)
00125   , m_dgsource(dgsource)
00126   , m_damagePolicy()
00127   , m_putBack()
00128   , m_cvt()
00129   , m_skipEvents(0)
00130   , m_maxEvents(0)
00131   , m_skipEpics(true)
00132   , m_l3tAcceptOnly(true)
00133   , m_firstControlStream(80)
00134   , m_l1Count(0)
00135   , m_eventTagEpicsStore(0)
00136   , m_simulateEOR(0)
00137   , m_run(-1)
00138   , m_liveMode(false)
00139 {
00140   std::fill_n(m_transitions, int(Pds::TransitionId::NumberOf), Pds::ClockTime(0, 0));
00141 
00142   ConfigSvc::ConfigSvc cfg = configSvc();
00143   m_firstControlStream = cfg.get("psana", "first_control_stream", m_firstControlStream);
00144   if (not noSkip) {
00145     // get number of events to process/skip from psana configuration
00146     m_skipEvents = cfg.get("psana", "skip-events", 0UL);
00147     m_maxEvents = cfg.get("psana", "events", 0UL);
00148     m_skipEpics = cfg.get("psana", "skip-epics", true);
00149     m_l3tAcceptOnly = cfg.get("psana", "l3t-accept-only", true);
00150   }
00151   try {
00152     std::list<std::string> files = configList("files");
00153     
00154     // The DgramReader does a more thorough job of checking the files input, throwing errors
00155     // if live is mixed with dead mode, etc, here if we find "live" in one of the files, we assume
00156     // live mode 
00157 
00158     for (std::list<std::string>::iterator file = files.begin(); file != files.end(); ++file) {
00159       IData::Dataset ds(*file);
00160       if (ds.exists("live")) {
00161         m_liveMode = true;
00162       }
00163     }
00164   } catch (ConfigSvc::Exception &) {
00165     MsgLog(name, error, " " << name << ": unable to read 'files' parameters, assuming non-live mode");
00166   }
00167   m_liveTimeOut = config("liveTimeout", 120U);
00168 }
00169 
00170 //--------------
00171 // Destructor --
00172 //--------------
00173 XtcInputModuleBase::~XtcInputModuleBase ()
00174 {
00175 }
00176 
00177 /// Method which is called once at the beginning of the job
00178 void 
00179 XtcInputModuleBase::beginJob(Event& evt, Env& env)
00180 {
00181   MsgLog(name(), debug, name() << ": in beginJob()");
00182 
00183   m_eventTagEpicsStore = nextNonNegativeValue(m_eventTagEpicsStore);
00184 
00185   // call initialization method for external datagram source
00186   m_dgsource->init();
00187 
00188   // Read initial datagrams, skip all Map transitions, stop at first non-Map.
00189   // If first non-Map is Configure then update event/env, otherwise push
00190   // it into read_back buffer.
00191   bool foundNonMap = false;
00192   for (int count = 0; not foundNonMap; ++ count) {
00193 
00194     std::vector<XtcInput::Dgram> eventDg;
00195     std::vector<XtcInput::Dgram> nonEventDg;
00196     if (not m_dgsource->next(eventDg, nonEventDg)) {
00197       if (count == 0) {
00198         // Nothing there at all, this is unexpected
00199         throw EmptyInput(ERR_LOC);
00200       } else {
00201         // just stop here
00202         break;
00203       }
00204     }
00205 
00206     // fillEnv and fillEvent require the datagrams to have DAQ streams
00207     // first, and Control streams second. We want the EventId to come from
00208     // a DAQ stream if one is present. After sorting by stream number, we
00209     // can use the first datagram for the EventId.
00210     sort(eventDg.begin(), eventDg.end(), LessStream());
00211     sort(nonEventDg.begin(), nonEventDg.end(), LessStream());
00212 
00213     // push non-event stuff to environment. 
00214     fillEnv(nonEventDg, env);
00215 
00216     MsgLog(name(),trace," beginJob datagrams: ");
00217     int idx=0;
00218     BOOST_FOREACH(const XtcInput::Dgram& dg, eventDg) {
00219       MsgLog(name(),debug,"  dg " << idx << ": " << Dgram::dumpStr(dg));
00220       ++idx;
00221     }
00222 
00223     if (not allDgsHaveSameTransition(eventDg)) {
00224       MsgLog(name(), warning, name() << ": first datagrams do not have same transition.");
00225     }
00226 
00227     Pds::TransitionId::Value transition = Pds::TransitionId::Map;
00228     if (not eventDg.empty()) {
00229       transition = eventDg.front().dg()->seq.service();
00230     }
00231 
00232     MsgLog(name(), debug, name() << ": read first datagram(s), transition = "
00233           << Pds::TransitionId::name(transition));
00234 
00235     // skip Map transition
00236     foundNonMap = transition != Pds::TransitionId::Map;
00237     if (not foundNonMap) continue;
00238 
00239     // If this is not Map then we expect Configure here, anything else must be handled in event()
00240     if (transition != Pds::TransitionId::Configure) {
00241       // Something else than Configure, store if for event()
00242       MsgLog(name(), warning, ": Expected Configure transition for first datagram, received "
00243              << Pds::TransitionId::name(transition) );
00244       m_putBack = eventDg;
00245       break;
00246     }
00247 
00248     // get the transition clock time, event id, and event datagram from the first
00249     // event in the list. Given the prior sort by stream number, this will be a DAQ datagram 
00250     // if any DAQ streams are present in this event
00251     XtcInput::Dgram firstDg = eventDg.front();
00252 
00253     m_transitions[firstDg.dg()->seq.service()] = firstDg.dg()->seq.clock();
00254 
00255     fillEventDgList(eventDg, evt);
00256     fillEventId(firstDg, evt);
00257 
00258     boost::shared_ptr<PSEvt::DamageMap> damageMap = boost::make_shared<PSEvt::DamageMap>();
00259     evt.put(damageMap);
00260 
00261     fillEnv(eventDg, env);
00262 
00263     // there is BLD data in Configure which is event-like data
00264     fillEvent(eventDg, evt, env);
00265   }
00266 }
00267 
00268 InputModule::Status 
00269 XtcInputModuleBase::event(Event& evt, Env& env)
00270 {
00271   MsgLog(name(), debug, name() << ": in event() - m_l1Count=" << m_l1Count
00272       << " m_maxEvents=" << m_maxEvents << " m_skipEvents=" << m_skipEvents);
00273 
00274   m_eventTagEpicsStore = nextNonNegativeValue(m_eventTagEpicsStore);
00275   // are we in the simulated EOR/EOF
00276   if (m_simulateEOR > 0) {
00277     // fake EndRun, prepare to stop on next call
00278     MsgLog(name(), debug, name() << ": simulated EOR");
00279     if (m_putBack.size()) {
00280       fillEventId(m_putBack[0], evt);
00281       fillEventDgList(m_putBack, evt);
00282     }
00283     // negative means stop at next call
00284     m_simulateEOR = -1;
00285     return EndRun;
00286   } else if (m_simulateEOR < 0) {
00287     // fake EOF
00288     MsgLog(name(), debug, name() << ": simulated EOF");
00289     return Stop;
00290   }
00291 
00292   Status status = Skip;
00293   bool found = false;
00294   while (not found) {
00295 
00296     std::vector<XtcInput::Dgram> eventDg;
00297     std::vector<XtcInput::Dgram> nonEventDg;
00298 
00299     // get datagram either from saved event or queue
00300     if (not m_putBack.empty()) {
00301 
00302       std::swap(eventDg, m_putBack);
00303       m_putBack.clear();
00304 
00305     } else {
00306 
00307       if (not m_dgsource->next(eventDg, nonEventDg)) {
00308         // finita
00309         MsgLog(name(), debug, ": EOF seen");
00310         return Stop;
00311       }
00312       // sort by stream number to get DAQ streams in front.
00313       // This is for fillEnv and fillEvent, and obtaining the EventId from a 
00314       // DAQ datagram (if any are present for this event).
00315       sort(eventDg.begin(), eventDg.end(), LessStream());
00316       sort(nonEventDg.begin(), nonEventDg.end(), LessStream());
00317 
00318     }
00319 
00320     fillEnv(nonEventDg, env);
00321 
00322     if (eventDg.empty()) {
00323       // can't do anything, skip to next transition
00324       continue;
00325     }
00326 
00327     if (not allDgsHaveSameTransition(eventDg)) {
00328       MsgLog(name(), warning, name() 
00329              << ": eventDg's do not all have the same transition. Using first transition.");
00330       // print the dgram headers since they don't have the same transition
00331       int idx = 0;
00332       BOOST_FOREACH(const XtcInput::Dgram& dg, eventDg) {
00333         MsgLog(name(),info,"  dg " << idx <<": " << Dgram::dumpStr(dg));
00334         ++idx;
00335       }
00336     }
00337     // We use the first datagram in eventDg for the transition, and clock.
00338     // The clock will differ between DAQ and control streams. Given prior sort, this will give
00339     // us a DAQ stream if one is present in this event.
00340     const Pds::Sequence& seq = eventDg.front().dg()->seq;
00341     const Pds::ClockTime& clock = seq.clock();
00342     Pds::TransitionId::Value trans = seq.service();
00343     
00344     MsgLog(name(), debug, name() << ": found " << eventDg.size() << " new datagram(s), transition = "
00345            << Pds::TransitionId::name(trans));
00346     
00347     switch (trans) {
00348     
00349     case Pds::TransitionId::Configure:
00350       if (not (clock == m_transitions[trans])) {
00351         m_transitions[trans] = clock;
00352         fillEnv(eventDg, env);
00353       }
00354       break;
00355       
00356     case Pds::TransitionId::Unconfigure:
00357       break;
00358    
00359     case Pds::TransitionId::BeginRun:
00360       // take run number from transition env, in some streams env is not set (or set to 0), so we want to skip those
00361       BOOST_FOREACH(const XtcInput::Dgram& dg, eventDg) {
00362         if (dg.dg()->env.value() > 0) {
00363           m_run = dg.dg()->env.value();
00364           break;
00365         }
00366       }
00367       // signal new run, content is not relevant
00368       if (not (clock == m_transitions[trans])) {
00369         fillEventId(eventDg.front(), evt);
00370         fillEventDgList(eventDg, evt);
00371         status = BeginRun;
00372         found = true;
00373         m_transitions[trans] = clock;
00374       }
00375       break;
00376     
00377     case Pds::TransitionId::EndRun:
00378       // signal end of run, content is not relevant
00379       if (not (clock == m_transitions[trans])) {
00380         fillEventId(eventDg.front(), evt);
00381         fillEventDgList(eventDg, evt);
00382         // reset run number, so that if next BeginRun is missing we don't reuse this run
00383         m_run = -1;
00384         status = EndRun;
00385         found = true;
00386         m_transitions[trans] = clock;
00387       }
00388       break;
00389     
00390     case Pds::TransitionId::BeginCalibCycle:
00391       // copy config data and signal new calib cycle
00392       if (not (clock == m_transitions[trans])) {
00393         fillEnv(eventDg, env);
00394         fillEventId(eventDg.front(), evt);
00395         fillEventDgList(eventDg, evt);
00396         status = BeginCalibCycle;
00397         found = true;
00398         m_transitions[trans] = clock;
00399       }
00400       break;
00401     
00402     case Pds::TransitionId::EndCalibCycle:
00403       // stop calib cycle
00404       if (not (clock == m_transitions[trans])) {
00405         fillEventId(eventDg.front(), evt);
00406         fillEventDgList(eventDg, evt);
00407         status = EndCalibCycle;
00408         found = true;
00409         m_transitions[trans] = clock;
00410       }
00411       break;
00412     
00413     case Pds::TransitionId::L1Accept:
00414       // regular event
00415 
00416       if (m_l3tAcceptOnly and not l3tAcceptPass(eventDg, m_firstControlStream)) {
00417 
00418         // did not pass L3, its payload is usually empty but if there is Epics
00419         // data in the event it may be preserved, so try to save it
00420         fillEnv(eventDg, env);
00421 
00422       } else if (m_skipEpics and ::epicsOnly(eventDg)) {
00423 
00424         // datagram is likely filtered, has only epics data and users do not need to
00425         // see it. Do not count it as an event too, just save EPICS data and move on.
00426         fillEnv(eventDg, env);
00427 
00428       } else if (m_maxEvents and m_l1Count >= m_skipEvents+m_maxEvents) {
00429 
00430         // reached event limit, will go in simulated end-of-run
00431         MsgLog(name(), debug, name() << ": event limit reached, simulated EndCalibCycle");
00432         fillEventId(eventDg.front(), evt);
00433         fillEventDgList(eventDg, evt);
00434         found = true;
00435         status = EndCalibCycle;
00436         m_simulateEOR = 1;
00437         // remember datagram to be used in simulated EndRun
00438         m_putBack = eventDg;
00439 
00440       } else if (m_l1Count < m_skipEvents) {
00441 
00442         // skipping the events, note that things like environment and EPICS need to be updated
00443         MsgLog(name(), debug, name() << ": skipping event");
00444         fillEnv(eventDg, env);
00445         found = true;
00446         status = Skip;
00447 
00448         ++m_l1Count;
00449 
00450       } else {
00451 
00452         boost::shared_ptr<PSEvt::DamageMap> damageMap = boost::make_shared<PSEvt::DamageMap>();
00453         evt.put(damageMap);
00454         fillEnv(eventDg, env);
00455         fillEvent(eventDg, evt, env);
00456         fillEventId(eventDg.front(), evt);
00457         fillEventDgList(eventDg, evt);
00458         found = true;
00459         status = DoEvent;
00460 
00461         ++m_l1Count;
00462       }
00463 
00464       break;
00465     
00466     case Pds::TransitionId::Unknown:
00467     case Pds::TransitionId::Reset:
00468     case Pds::TransitionId::Map:
00469     case Pds::TransitionId::Unmap:
00470     case Pds::TransitionId::Enable:
00471     case Pds::TransitionId::Disable:
00472     case Pds::TransitionId::NumberOf:
00473       // Do not do anything for these transitions, just go to next
00474       break;
00475     }
00476   }
00477   
00478   return status ;
00479 }
00480 
00481 /// Method which is called once at the end of the job
00482 void 
00483 XtcInputModuleBase::endJob(Event& evt, Env& env)
00484 {
00485 }
00486 
00487 /// Fill event from list of datagrams 
00488 /// Datagrams should be sorted with DAQ streams first, Control streams last.
00489 /// If transition is Configure or BeginCalibCycle, uses first DAQ stream 
00490 /// (skips others) and all control streams.
00491 void 
00492 XtcInputModuleBase::fillEvent(const std::vector<XtcInput::Dgram>& dgList, Event& evt, Env& env)
00493 {
00494   // If the same EventKey is in both the DAQ and Control stream,
00495   // we want the DAQ stream to take precedence so that the DAQ data gets stored.
00496   // Below we go through all datagrams in order, assuming they have been sorted by 
00497   // stream number, we will get the DAQ streams first.
00498 
00499   // The Duplicate key exception is currently caught by the dispatch code.
00500 
00501   // if this is a Configure or BeginCalibCycle, then all DAQ dgrams are 
00502   // duplicates of one another. We need only store one of the DAQ dgrams. 
00503   // It seems cleaner and less error prone to just store one DAQ dgram in this case.
00504   int numDaqStored = 0;
00505   int numControlStored = 0;
00506   bool storedDaq = false;
00507   BOOST_FOREACH(const XtcInput::Dgram& dg, dgList) {
00508     bool isDaq = int(dg.file().stream()) < m_firstControlStream;
00509     if (isDaq and storedDaq and ::isConfigOrBeginCalib(dg)) continue;
00510     fillEvent(dg, evt, env);
00511     if (isDaq) {
00512       ++numDaqStored;
00513       storedDaq = true;
00514     } else {
00515       ++numControlStored;
00516     }
00517   }
00518 
00519   MsgLog(name(), debug, name() << ": in fillEvent() from dgList of " 
00520          << dgList.size() << " dgrams. Stored "
00521          << numControlStored << " control dgrams and "
00522          << numDaqStored << " DAQ dgrams");
00523 }
00524 
00525 // Fill event with datagram contents
00526 void 
00527 XtcInputModuleBase::fillEvent(const XtcInput::Dgram& dg, Event& evt, Env& env)
00528 {
00529   MsgLog(name(), debug, name() << ": in fillEvent()");
00530 
00531   boost::shared_ptr<psddl_pds2psana::SmallDataProxy> smallDataProxy = \
00532     psddl_pds2psana::SmallDataProxy::makeSmallDataProxy(dg.file(), m_liveMode, m_liveTimeOut, m_cvt,  &evt, env);
00533 
00534   Dgram::ptr dgptr = dg.dg();
00535 
00536   boost::shared_ptr<PSEvt::DamageMap> damageMap = evt.get();
00537   // Loop over all XTC contained in the datagram
00538   XtcInput::XtcIterator iter(&dgptr->xtc);
00539   while (Pds::Xtc* xtc = iter.next()) {
00540     const Pds::TypeId& typeId = xtc->contains;
00541     const Pds::Damage damage = xtc->damage;
00542     bool isXtcAny = checkForAndRecordSrcDamage(typeId, xtc, damage, damageMap, name().c_str());
00543     if (isXtcAny) continue;
00544     bool isSmallDataProxy = psddl_pds2psana::SmallDataProxy::isSmallDataProxy(typeId);
00545     std::vector<const std::type_info *> convertTypeInfoPtrs;
00546     bool storeObject;
00547 
00548     if (isSmallDataProxy) {
00549       convertTypeInfoPtrs = psddl_pds2psana::SmallDataProxy::getSmallConvertTypeInfoPtrs(xtc, m_cvt);
00550       const Pds::TypeId proxiedTypeId = psddl_pds2psana::SmallDataProxy::getSmallDataProxiedType(xtc);
00551       if (proxiedTypeId.id() == Pds::TypeId::Id_Epics) {
00552         MsgLog(name(), error, " epics has been proxied in small data - skipping");
00553         continue;
00554       }
00555       storeObject = m_damagePolicy.eventDamagePolicy(damage, proxiedTypeId.id());
00556     } else {
00557       convertTypeInfoPtrs = m_cvt.getConvertTypeInfoPtrs(typeId);
00558       storeObject = m_damagePolicy.eventDamagePolicy(damage, typeId.id());
00559     }        
00560 
00561     for (unsigned idx=0; idx < convertTypeInfoPtrs.size(); ++idx) {
00562       (*damageMap)[PSEvt::EventKey( convertTypeInfoPtrs[idx], xtc->src, "") ] = damage;
00563     }
00564 
00565     if (not storeObject) {
00566       MsgLog(name(),debug, name() << " damage = " << damage.value() 
00567              << " src=" << xtc->src << " not storing in Event. SmallDataProxy=" << isSmallDataProxy);
00568       continue;
00569     }
00570 
00571     boost::shared_ptr<Pds::Xtc> xptr(dgptr, xtc);   
00572     if (isSmallDataProxy) {
00573       if (not smallDataProxy) {
00574         MsgLog(name(), warning, name() << " smallDataProxy typeid found but smallDataProxy is null. Skiping (is this a .smd.xtc file?).");
00575       } else {
00576         smallDataProxy->addEventProxy(xptr, convertTypeInfoPtrs);
00577       }
00578     } else {
00579       m_cvt.convert(xptr, evt, env.configStore());
00580     }
00581     
00582   }
00583 
00584   if (smallDataProxy) smallDataProxy->finalize();
00585 
00586 }
00587 
00588 void
00589 XtcInputModuleBase::fillEventId(const XtcInput::Dgram& dg, Event& evt)
00590 {
00591   MsgLog(name(), debug, name() << ": in fillEventId()");
00592 
00593   Dgram::ptr dgptr = dg.dg();
00594 
00595   const Pds::Sequence& seq = dgptr->seq ;
00596   const Pds::ClockTime& clock = seq.clock() ;
00597 
00598   // Store event ID
00599   PSTime::Time evtTime(clock.seconds(), clock.nanoseconds());
00600   unsigned run = m_run > 0 ? int(m_run) : dg.file().run();
00601   unsigned fiducials = seq.stamp().fiducials();
00602   unsigned ticks = seq.stamp().ticks();
00603   unsigned vect = seq.stamp().vector();
00604   unsigned control = seq.stamp().control();
00605   boost::shared_ptr<PSEvt::EventId> eventId = boost::make_shared<XtcEventId>(run, evtTime, fiducials, ticks, vect, control);
00606   evt.put(eventId);
00607 }
00608 
00609 void
00610 XtcInputModuleBase::fillEventDgList(const std::vector<XtcInput::Dgram> & dgList, Event& evt)
00611 {
00612   MsgLog(name(), debug, name() << ": in fillEventDgList()");
00613 
00614   boost::shared_ptr< XtcInput::DgramList > dgramList = 
00615     boost::make_shared< XtcInput::DgramList >();
00616 
00617   BOOST_FOREACH(const XtcInput::Dgram & dg, dgList) {
00618     dgramList->push_back(dg);
00619   }
00620 
00621   // Store list of datagrams in the event
00622   evt.put(dgramList);
00623 }
00624 
00625 /// Fill environment from list of datagrams 
00626 /// Datagrams should be sorted with DAQ streams first, Control streams last.
00627 /// If transition is Configure or BeginCalibCycle, uses first DAQ stream 
00628 /// (skips others) and all control streams.
00629 void 
00630 XtcInputModuleBase::fillEnv(const std::vector<XtcInput::Dgram> & dgList, Env& env)
00631 {
00632   // If the same EventKey is in both the DAQ and Control stream,
00633   // we want the DAQ stream to take precedence.
00634   // we process the Control streams first, then the DAQ streams.
00635   // The DAQ entry will overwrite the Control entry. 
00636 
00637   // If this is a Configure or BeginCalibCycle, then all DAQ dgrams are *almost* entirely
00638   // duplicates of one another. The only difference observed is that some src's can include 
00639   // an IP address that will differ between the DAQ streams. These different src's will 
00640   // create different entries in the configStore (even though the data is the same).
00641   // It is cleaner and less error prone to only process one of the DAQ streams because of this case.
00642 
00643   // store control streams
00644   int numControlStored = 0;
00645   BOOST_REVERSE_FOREACH(const XtcInput::Dgram& dg, dgList) {
00646     bool isDaq = int(dg.file().stream()) < m_firstControlStream;
00647     if (isDaq) break;
00648     fillEnv(dg, env);
00649     ++numControlStored;
00650   }
00651 
00652   // store all Daq, or just first if config or beginCalibCycle
00653   int numDaqStored = 0;
00654   bool storedDaq = false;
00655   BOOST_FOREACH(const XtcInput::Dgram& dg, dgList) {
00656     bool isDaq = int(dg.file().stream()) < m_firstControlStream;
00657     if (isDaq and storedDaq and ::isConfigOrBeginCalib(dg)) break;
00658     if (not isDaq) break; // we already stored control streams
00659     fillEnv(dg, env);
00660     ++numDaqStored;
00661     if (isDaq) storedDaq = true;
00662   }
00663   MsgLog(name(), debug, name() << ": in fillEnv() from dgList of " 
00664          << dgList.size() << " dgrams. Stored "
00665          << numControlStored << " control dgrams and "
00666          << numDaqStored << " DAQ dgrams");
00667 }
00668 
00669 
00670 // Fill environment with datagram contents
00671 void 
00672 XtcInputModuleBase::fillEnv(const XtcInput::Dgram& dg, Env& env)
00673 {
00674   MsgLog(name(), debug, name() << ": in fillEnv()");
00675 
00676   // All objects in datagram in Configuration and BeginCalibCycle transitions
00677   // (except for EPICS data) are considered configuration data. Just store them
00678   // them in the ConfigStore part of the environment
00679 
00680   Dgram::ptr dgptr = dg.dg();
00681 
00682   boost::shared_ptr<psddl_pds2psana::SmallDataProxy> smallDataProxy = \
00683     psddl_pds2psana::SmallDataProxy::makeSmallDataProxy(dg.file(), m_liveMode, 
00684                                                         m_liveTimeOut, m_cvt, 0, env);
00685 
00686   if (::isConfigOrBeginCalib(dg)) {
00687 
00688     // before we start adding all config types we need to update alias map so
00689     // that when we add objects to proxy dict correct version of alias map is used
00690     boost::shared_ptr<PSEvt::AliasMap> amap = env.aliasMap();
00691     if (amap) {
00692       XtcInput::XtcIterator iter1(&dgptr->xtc);
00693       while (Pds::Xtc* xtc = iter1.next()) {
00694         if (xtc->src.level()==Pds::Level::Source) {
00695           const Pds::DetInfo& di = *(Pds::DetInfo*)(&xtc->src);
00696           amap->addsrc(di);
00697         }
00698         if (xtc->src.level()==Pds::Level::Reporter) {
00699           const Pds::BldInfo& bi = *(Pds::BldInfo*)(&xtc->src);
00700           amap->addsrc(bi);
00701         }
00702         if (xtc->contains.id() == Pds::TypeId::Id_AliasConfig) {
00703           
00704           boost::shared_ptr<PSEvt::AliasMap> amap = env.aliasMap();
00705           
00706           if (xtc->contains.version() == 1) {
00707             const Pds::Alias::ConfigV1* cfgV1 = (const Pds::Alias::ConfigV1*)xtc->payload();
00708             const ndarray<const Pds::Alias::SrcAlias, 1>& aliases = cfgV1->srcAlias();
00709             for (unsigned i = 0; i != aliases.shape()[0]; ++ i) {
00710               const Pds::Alias::SrcAlias& alias = aliases[i];
00711               amap->add(alias.aliasName(), alias.src());
00712             }
00713           } else {
00714             MsgLog(name(), warning, name() << ": failed to find Alias::ConfigV1 in config store");
00715           }
00716         }
00717       }
00718     }
00719     
00720  
00721     // Loop over all XTC contained in the datagram
00722     XtcInput::XtcIterator iter(&dgptr->xtc);
00723     while (Pds::Xtc* xtc = iter.next()) {
00724       if (xtc->contains.id() != Pds::TypeId::Id_Epics) {
00725         boost::shared_ptr<Pds::Xtc> xptr(dgptr, xtc);
00726         if (psddl_pds2psana::SmallDataProxy::isSmallDataProxy(xtc->contains)) {
00727           if (smallDataProxy) {
00728             // not checking if epics as already checked above
00729             smallDataProxy->addEnvProxy(xptr, psddl_pds2psana::SmallDataProxy::getSmallConvertTypeInfoPtrs(xtc, m_cvt));
00730           } else {
00731             MsgLog(name(), warning, name() << ": smallDataProxy typeid found but smallDataProxy is null. Skiping (is this a .smd.xtc file?).");
00732           }
00733         } else {
00734           // call the converter which will fill config store
00735           m_cvt.convertConfig(xptr, env.configStore());
00736         }
00737       }
00738       
00739       if (xtc->contains.id() == Pds::TypeId::Id_EpicsConfig) {
00740         // need to tell Epics store about aliases
00741         boost::shared_ptr<Psana::Epics::ConfigV1> cfgV1 = env.configStore().get(xtc->src);
00742         if (cfgV1) {
00743           const ndarray<const Psana::Epics::PvConfigV1, 1>& pvs = cfgV1->getPvConfig();
00744           for (unsigned i = 0; i != pvs.shape()[0]; ++ i) {
00745             const Psana::Epics::PvConfigV1& pvcfg = pvs[i];
00746             env.epicsStore().storeAlias(xtc->src, pvcfg.pvId(), pvcfg.description());
00747           }
00748         }
00749       }
00750 
00751     }
00752       
00753   }
00754 
00755   // Convert EPICS too and store it in EPICS store
00756   // Loop over all XTC contained in the datagram
00757   XtcInput::XtcIterator iter(&dgptr->xtc);
00758   while (Pds::Xtc* xtc = iter.next()) {
00759 
00760     if (xtc->contains.id() == Pds::TypeId::Id_Epics) {
00761       // call the converter which will fill config store
00762       boost::shared_ptr<Pds::Xtc> xptr(dgptr, xtc);
00763       m_cvt.convertEpics(xptr, env.epicsStore(), m_eventTagEpicsStore);
00764     }
00765     
00766   }
00767   if (smallDataProxy) smallDataProxy->finalize();
00768 }
00769 
00770 bool XtcInputModuleBase::liveAvail(int numEvents) {
00771   return m_dgsource->liveAvail(numEvents);
00772 }
00773 
00774 } // namespace PSXtcInput

Generated on 19 Dec 2016 for PSDMSoftware by  doxygen 1.4.7