00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "PSXtcInput/XtcInputModuleBase.h"
00017
00018
00019
00020
00021 #include <climits>
00022 #include <algorithm>
00023 #include <iterator>
00024 #include <vector>
00025 #include <boost/make_shared.hpp>
00026 #include <boost/foreach.hpp>
00027 #include <string>
00028
00029
00030
00031
00032 #include "MsgLogger/MsgLogger.h"
00033 #include "pdsdata/xtc/L1AcceptEnv.hh"
00034 #include "pdsdata/psddl/alias.ddl.h"
00035 #include "psddl_psana/epics.ddl.h"
00036 #include "PSTime/Time.h"
00037 #include "PSXtcInput/Exceptions.h"
00038 #include "PSXtcInput/XtcEventId.h"
00039 #include "XtcInput/XtcFileName.h"
00040 #include "XtcInput/XtcIterator.h"
00041 #include "XtcInput/MergeMode.h"
00042 #include "XtcInput/DgramList.h"
00043 #include "XtcInput/DgramUtil.h"
00044 #include "PSEvt/DamageMap.h"
00045 #include "IData/Dataset.h"
00046 #include "psddl_pds2psana/SmallDataProxy.h"
00047 #include "PSEvt/Exceptions.h"
00048
00049
00050
00051
00052
00053 using namespace XtcInput;
00054
00055 namespace {
00056
00057
00058
00059 bool epicsOnly(const std::vector<XtcInput::Dgram>& dgs)
00060 {
00061 BOOST_FOREACH(const XtcInput::Dgram& dg, dgs) {
00062 XtcInput::XtcIterator iter(&(dg.dg()->xtc));
00063 while (Pds::Xtc* x = iter.next()) {
00064 switch (x->contains.id()) {
00065 case Pds::TypeId::Id_Xtc:
00066 case Pds::TypeId::Id_Epics:
00067 continue;
00068 default:
00069 return false;
00070 }
00071 }
00072 }
00073 return true;
00074 }
00075
00076
/// Returns the successor of @a v within the non-negative range of long.
/// Negative inputs and LONG_MAX both map to 0, so the result is never
/// negative and the increment can never overflow.
long nextNonNegativeValue(const long v) {
  const bool canIncrement = (v >= 0) && (v != LONG_MAX);
  return canIncrement ? (v + 1) : 0;
}
00081
00082 bool isConfigOrBeginCalib(const XtcInput::Dgram &dg) {
00083 if (dg.empty()) return false;
00084 XtcInput::Dgram::ptr dgptr = dg.dg();
00085 if (not dgptr) return false;
00086 const Pds::Sequence& seq = dgptr->seq ;
00087 if ((seq.service() == Pds::TransitionId::Configure) or
00088 (seq.service() == Pds::TransitionId::BeginCalibCycle)) {
00089 return true;
00090 }
00091 return false;
00092 }
00093
00094
00095 inline bool checkForAndRecordSrcDamage(const Pds::TypeId& typeId, Pds::Xtc *xtc,
00096 const Pds::Damage &damage, boost::shared_ptr<PSEvt::DamageMap> &damageMap,
00097 const char *loggerName) {
00098 if (typeId.id() == Pds::TypeId::Any) {
00099 if (damage.value()) {
00100 damageMap->addSrcDamage(xtc->src,damage);
00101 } else {
00102 MsgLog(loggerName, warning, loggerName << ": unexpected - xtc type id is 'Any' but its damage=0");
00103 }
00104 return true;
00105 }
00106 return false;
00107 }
00108
00109 }
00110
00111
00112
00113
00114
00115
00116
00117 namespace PSXtcInput {
00118
00119
00120
00121
00122 XtcInputModuleBase::XtcInputModuleBase (const std::string& name,
00123 const boost::shared_ptr<IDatagramSource>& dgsource, bool noSkip)
00124 : InputModule(name)
00125 , m_dgsource(dgsource)
00126 , m_damagePolicy()
00127 , m_putBack()
00128 , m_cvt()
00129 , m_skipEvents(0)
00130 , m_maxEvents(0)
00131 , m_skipEpics(true)
00132 , m_l3tAcceptOnly(true)
00133 , m_firstControlStream(80)
00134 , m_l1Count(0)
00135 , m_eventTagEpicsStore(0)
00136 , m_simulateEOR(0)
00137 , m_run(-1)
00138 , m_liveMode(false)
00139 {
00140 std::fill_n(m_transitions, int(Pds::TransitionId::NumberOf), Pds::ClockTime(0, 0));
00141
00142 ConfigSvc::ConfigSvc cfg = configSvc();
00143 m_firstControlStream = cfg.get("psana", "first_control_stream", m_firstControlStream);
00144 if (not noSkip) {
00145
00146 m_skipEvents = cfg.get("psana", "skip-events", 0UL);
00147 m_maxEvents = cfg.get("psana", "events", 0UL);
00148 m_skipEpics = cfg.get("psana", "skip-epics", true);
00149 m_l3tAcceptOnly = cfg.get("psana", "l3t-accept-only", true);
00150 }
00151 try {
00152 std::list<std::string> files = configList("files");
00153
00154
00155
00156
00157
00158 for (std::list<std::string>::iterator file = files.begin(); file != files.end(); ++file) {
00159 IData::Dataset ds(*file);
00160 if (ds.exists("live")) {
00161 m_liveMode = true;
00162 }
00163 }
00164 } catch (ConfigSvc::Exception &) {
00165 MsgLog(name, error, " " << name << ": unable to read 'files' parameters, assuming non-live mode");
00166 }
00167 m_liveTimeOut = config("liveTimeout", 120U);
00168 }
00169
00170
00171
00172
00173 XtcInputModuleBase::~XtcInputModuleBase ()
00174 {
00175 }
00176
00177
00178 void
00179 XtcInputModuleBase::beginJob(Event& evt, Env& env)
00180 {
00181 MsgLog(name(), debug, name() << ": in beginJob()");
00182
00183 m_eventTagEpicsStore = nextNonNegativeValue(m_eventTagEpicsStore);
00184
00185
00186 m_dgsource->init();
00187
00188
00189
00190
00191 bool foundNonMap = false;
00192 for (int count = 0; not foundNonMap; ++ count) {
00193
00194 std::vector<XtcInput::Dgram> eventDg;
00195 std::vector<XtcInput::Dgram> nonEventDg;
00196 if (not m_dgsource->next(eventDg, nonEventDg)) {
00197 if (count == 0) {
00198
00199 throw EmptyInput(ERR_LOC);
00200 } else {
00201
00202 break;
00203 }
00204 }
00205
00206
00207
00208
00209
00210 sort(eventDg.begin(), eventDg.end(), LessStream());
00211 sort(nonEventDg.begin(), nonEventDg.end(), LessStream());
00212
00213
00214 fillEnv(nonEventDg, env);
00215
00216 MsgLog(name(),trace," beginJob datagrams: ");
00217 int idx=0;
00218 BOOST_FOREACH(const XtcInput::Dgram& dg, eventDg) {
00219 MsgLog(name(),debug," dg " << idx << ": " << Dgram::dumpStr(dg));
00220 ++idx;
00221 }
00222
00223 if (not allDgsHaveSameTransition(eventDg)) {
00224 MsgLog(name(), warning, name() << ": first datagrams do not have same transition.");
00225 }
00226
00227 Pds::TransitionId::Value transition = Pds::TransitionId::Map;
00228 if (not eventDg.empty()) {
00229 transition = eventDg.front().dg()->seq.service();
00230 }
00231
00232 MsgLog(name(), debug, name() << ": read first datagram(s), transition = "
00233 << Pds::TransitionId::name(transition));
00234
00235
00236 foundNonMap = transition != Pds::TransitionId::Map;
00237 if (not foundNonMap) continue;
00238
00239
00240 if (transition != Pds::TransitionId::Configure) {
00241
00242 MsgLog(name(), warning, ": Expected Configure transition for first datagram, received "
00243 << Pds::TransitionId::name(transition) );
00244 m_putBack = eventDg;
00245 break;
00246 }
00247
00248
00249
00250
00251 XtcInput::Dgram firstDg = eventDg.front();
00252
00253 m_transitions[firstDg.dg()->seq.service()] = firstDg.dg()->seq.clock();
00254
00255 fillEventDgList(eventDg, evt);
00256 fillEventId(firstDg, evt);
00257
00258 boost::shared_ptr<PSEvt::DamageMap> damageMap = boost::make_shared<PSEvt::DamageMap>();
00259 evt.put(damageMap);
00260
00261 fillEnv(eventDg, env);
00262
00263
00264 fillEvent(eventDg, evt, env);
00265 }
00266 }
00267
00268 InputModule::Status
00269 XtcInputModuleBase::event(Event& evt, Env& env)
00270 {
00271 MsgLog(name(), debug, name() << ": in event() - m_l1Count=" << m_l1Count
00272 << " m_maxEvents=" << m_maxEvents << " m_skipEvents=" << m_skipEvents);
00273
00274 m_eventTagEpicsStore = nextNonNegativeValue(m_eventTagEpicsStore);
00275
00276 if (m_simulateEOR > 0) {
00277
00278 MsgLog(name(), debug, name() << ": simulated EOR");
00279 if (m_putBack.size()) {
00280 fillEventId(m_putBack[0], evt);
00281 fillEventDgList(m_putBack, evt);
00282 }
00283
00284 m_simulateEOR = -1;
00285 return EndRun;
00286 } else if (m_simulateEOR < 0) {
00287
00288 MsgLog(name(), debug, name() << ": simulated EOF");
00289 return Stop;
00290 }
00291
00292 Status status = Skip;
00293 bool found = false;
00294 while (not found) {
00295
00296 std::vector<XtcInput::Dgram> eventDg;
00297 std::vector<XtcInput::Dgram> nonEventDg;
00298
00299
00300 if (not m_putBack.empty()) {
00301
00302 std::swap(eventDg, m_putBack);
00303 m_putBack.clear();
00304
00305 } else {
00306
00307 if (not m_dgsource->next(eventDg, nonEventDg)) {
00308
00309 MsgLog(name(), debug, ": EOF seen");
00310 return Stop;
00311 }
00312
00313
00314
00315 sort(eventDg.begin(), eventDg.end(), LessStream());
00316 sort(nonEventDg.begin(), nonEventDg.end(), LessStream());
00317
00318 }
00319
00320 fillEnv(nonEventDg, env);
00321
00322 if (eventDg.empty()) {
00323
00324 continue;
00325 }
00326
00327 if (not allDgsHaveSameTransition(eventDg)) {
00328 MsgLog(name(), warning, name()
00329 << ": eventDg's do not all have the same transition. Using first transition.");
00330
00331 int idx = 0;
00332 BOOST_FOREACH(const XtcInput::Dgram& dg, eventDg) {
00333 MsgLog(name(),info," dg " << idx <<": " << Dgram::dumpStr(dg));
00334 ++idx;
00335 }
00336 }
00337
00338
00339
00340 const Pds::Sequence& seq = eventDg.front().dg()->seq;
00341 const Pds::ClockTime& clock = seq.clock();
00342 Pds::TransitionId::Value trans = seq.service();
00343
00344 MsgLog(name(), debug, name() << ": found " << eventDg.size() << " new datagram(s), transition = "
00345 << Pds::TransitionId::name(trans));
00346
00347 switch (trans) {
00348
00349 case Pds::TransitionId::Configure:
00350 if (not (clock == m_transitions[trans])) {
00351 m_transitions[trans] = clock;
00352 fillEnv(eventDg, env);
00353 }
00354 break;
00355
00356 case Pds::TransitionId::Unconfigure:
00357 break;
00358
00359 case Pds::TransitionId::BeginRun:
00360
00361 BOOST_FOREACH(const XtcInput::Dgram& dg, eventDg) {
00362 if (dg.dg()->env.value() > 0) {
00363 m_run = dg.dg()->env.value();
00364 break;
00365 }
00366 }
00367
00368 if (not (clock == m_transitions[trans])) {
00369 fillEventId(eventDg.front(), evt);
00370 fillEventDgList(eventDg, evt);
00371 status = BeginRun;
00372 found = true;
00373 m_transitions[trans] = clock;
00374 }
00375 break;
00376
00377 case Pds::TransitionId::EndRun:
00378
00379 if (not (clock == m_transitions[trans])) {
00380 fillEventId(eventDg.front(), evt);
00381 fillEventDgList(eventDg, evt);
00382
00383 m_run = -1;
00384 status = EndRun;
00385 found = true;
00386 m_transitions[trans] = clock;
00387 }
00388 break;
00389
00390 case Pds::TransitionId::BeginCalibCycle:
00391
00392 if (not (clock == m_transitions[trans])) {
00393 fillEnv(eventDg, env);
00394 fillEventId(eventDg.front(), evt);
00395 fillEventDgList(eventDg, evt);
00396 status = BeginCalibCycle;
00397 found = true;
00398 m_transitions[trans] = clock;
00399 }
00400 break;
00401
00402 case Pds::TransitionId::EndCalibCycle:
00403
00404 if (not (clock == m_transitions[trans])) {
00405 fillEventId(eventDg.front(), evt);
00406 fillEventDgList(eventDg, evt);
00407 status = EndCalibCycle;
00408 found = true;
00409 m_transitions[trans] = clock;
00410 }
00411 break;
00412
00413 case Pds::TransitionId::L1Accept:
00414
00415
00416 if (m_l3tAcceptOnly and not l3tAcceptPass(eventDg, m_firstControlStream)) {
00417
00418
00419
00420 fillEnv(eventDg, env);
00421
00422 } else if (m_skipEpics and ::epicsOnly(eventDg)) {
00423
00424
00425
00426 fillEnv(eventDg, env);
00427
00428 } else if (m_maxEvents and m_l1Count >= m_skipEvents+m_maxEvents) {
00429
00430
00431 MsgLog(name(), debug, name() << ": event limit reached, simulated EndCalibCycle");
00432 fillEventId(eventDg.front(), evt);
00433 fillEventDgList(eventDg, evt);
00434 found = true;
00435 status = EndCalibCycle;
00436 m_simulateEOR = 1;
00437
00438 m_putBack = eventDg;
00439
00440 } else if (m_l1Count < m_skipEvents) {
00441
00442
00443 MsgLog(name(), debug, name() << ": skipping event");
00444 fillEnv(eventDg, env);
00445 found = true;
00446 status = Skip;
00447
00448 ++m_l1Count;
00449
00450 } else {
00451
00452 boost::shared_ptr<PSEvt::DamageMap> damageMap = boost::make_shared<PSEvt::DamageMap>();
00453 evt.put(damageMap);
00454 fillEnv(eventDg, env);
00455 fillEvent(eventDg, evt, env);
00456 fillEventId(eventDg.front(), evt);
00457 fillEventDgList(eventDg, evt);
00458 found = true;
00459 status = DoEvent;
00460
00461 ++m_l1Count;
00462 }
00463
00464 break;
00465
00466 case Pds::TransitionId::Unknown:
00467 case Pds::TransitionId::Reset:
00468 case Pds::TransitionId::Map:
00469 case Pds::TransitionId::Unmap:
00470 case Pds::TransitionId::Enable:
00471 case Pds::TransitionId::Disable:
00472 case Pds::TransitionId::NumberOf:
00473
00474 break;
00475 }
00476 }
00477
00478 return status ;
00479 }
00480
00481
00482 void
00483 XtcInputModuleBase::endJob(Event& evt, Env& env)
00484 {
00485 }
00486
00487
00488
00489
00490
00491 void
00492 XtcInputModuleBase::fillEvent(const std::vector<XtcInput::Dgram>& dgList, Event& evt, Env& env)
00493 {
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504 int numDaqStored = 0;
00505 int numControlStored = 0;
00506 bool storedDaq = false;
00507 BOOST_FOREACH(const XtcInput::Dgram& dg, dgList) {
00508 bool isDaq = int(dg.file().stream()) < m_firstControlStream;
00509 if (isDaq and storedDaq and ::isConfigOrBeginCalib(dg)) continue;
00510 fillEvent(dg, evt, env);
00511 if (isDaq) {
00512 ++numDaqStored;
00513 storedDaq = true;
00514 } else {
00515 ++numControlStored;
00516 }
00517 }
00518
00519 MsgLog(name(), debug, name() << ": in fillEvent() from dgList of "
00520 << dgList.size() << " dgrams. Stored "
00521 << numControlStored << " control dgrams and "
00522 << numDaqStored << " DAQ dgrams");
00523 }
00524
00525
00526 void
00527 XtcInputModuleBase::fillEvent(const XtcInput::Dgram& dg, Event& evt, Env& env)
00528 {
00529 MsgLog(name(), debug, name() << ": in fillEvent()");
00530
00531 boost::shared_ptr<psddl_pds2psana::SmallDataProxy> smallDataProxy = \
00532 psddl_pds2psana::SmallDataProxy::makeSmallDataProxy(dg.file(), m_liveMode, m_liveTimeOut, m_cvt, &evt, env);
00533
00534 Dgram::ptr dgptr = dg.dg();
00535
00536 boost::shared_ptr<PSEvt::DamageMap> damageMap = evt.get();
00537
00538 XtcInput::XtcIterator iter(&dgptr->xtc);
00539 while (Pds::Xtc* xtc = iter.next()) {
00540 const Pds::TypeId& typeId = xtc->contains;
00541 const Pds::Damage damage = xtc->damage;
00542 bool isXtcAny = checkForAndRecordSrcDamage(typeId, xtc, damage, damageMap, name().c_str());
00543 if (isXtcAny) continue;
00544 bool isSmallDataProxy = psddl_pds2psana::SmallDataProxy::isSmallDataProxy(typeId);
00545 std::vector<const std::type_info *> convertTypeInfoPtrs;
00546 bool storeObject;
00547
00548 if (isSmallDataProxy) {
00549 convertTypeInfoPtrs = psddl_pds2psana::SmallDataProxy::getSmallConvertTypeInfoPtrs(xtc, m_cvt);
00550 const Pds::TypeId proxiedTypeId = psddl_pds2psana::SmallDataProxy::getSmallDataProxiedType(xtc);
00551 if (proxiedTypeId.id() == Pds::TypeId::Id_Epics) {
00552 MsgLog(name(), error, " epics has been proxied in small data - skipping");
00553 continue;
00554 }
00555 storeObject = m_damagePolicy.eventDamagePolicy(damage, proxiedTypeId.id());
00556 } else {
00557 convertTypeInfoPtrs = m_cvt.getConvertTypeInfoPtrs(typeId);
00558 storeObject = m_damagePolicy.eventDamagePolicy(damage, typeId.id());
00559 }
00560
00561 for (unsigned idx=0; idx < convertTypeInfoPtrs.size(); ++idx) {
00562 (*damageMap)[PSEvt::EventKey( convertTypeInfoPtrs[idx], xtc->src, "") ] = damage;
00563 }
00564
00565 if (not storeObject) {
00566 MsgLog(name(),debug, name() << " damage = " << damage.value()
00567 << " src=" << xtc->src << " not storing in Event. SmallDataProxy=" << isSmallDataProxy);
00568 continue;
00569 }
00570
00571 boost::shared_ptr<Pds::Xtc> xptr(dgptr, xtc);
00572 if (isSmallDataProxy) {
00573 if (not smallDataProxy) {
00574 MsgLog(name(), warning, name() << " smallDataProxy typeid found but smallDataProxy is null. Skiping (is this a .smd.xtc file?).");
00575 } else {
00576 smallDataProxy->addEventProxy(xptr, convertTypeInfoPtrs);
00577 }
00578 } else {
00579 m_cvt.convert(xptr, evt, env.configStore());
00580 }
00581
00582 }
00583
00584 if (smallDataProxy) smallDataProxy->finalize();
00585
00586 }
00587
00588 void
00589 XtcInputModuleBase::fillEventId(const XtcInput::Dgram& dg, Event& evt)
00590 {
00591 MsgLog(name(), debug, name() << ": in fillEventId()");
00592
00593 Dgram::ptr dgptr = dg.dg();
00594
00595 const Pds::Sequence& seq = dgptr->seq ;
00596 const Pds::ClockTime& clock = seq.clock() ;
00597
00598
00599 PSTime::Time evtTime(clock.seconds(), clock.nanoseconds());
00600 unsigned run = m_run > 0 ? int(m_run) : dg.file().run();
00601 unsigned fiducials = seq.stamp().fiducials();
00602 unsigned ticks = seq.stamp().ticks();
00603 unsigned vect = seq.stamp().vector();
00604 unsigned control = seq.stamp().control();
00605 boost::shared_ptr<PSEvt::EventId> eventId = boost::make_shared<XtcEventId>(run, evtTime, fiducials, ticks, vect, control);
00606 evt.put(eventId);
00607 }
00608
00609 void
00610 XtcInputModuleBase::fillEventDgList(const std::vector<XtcInput::Dgram> & dgList, Event& evt)
00611 {
00612 MsgLog(name(), debug, name() << ": in fillEventDgList()");
00613
00614 boost::shared_ptr< XtcInput::DgramList > dgramList =
00615 boost::make_shared< XtcInput::DgramList >();
00616
00617 BOOST_FOREACH(const XtcInput::Dgram & dg, dgList) {
00618 dgramList->push_back(dg);
00619 }
00620
00621
00622 evt.put(dgramList);
00623 }
00624
00625
00626
00627
00628
00629 void
00630 XtcInputModuleBase::fillEnv(const std::vector<XtcInput::Dgram> & dgList, Env& env)
00631 {
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644 int numControlStored = 0;
00645 BOOST_REVERSE_FOREACH(const XtcInput::Dgram& dg, dgList) {
00646 bool isDaq = int(dg.file().stream()) < m_firstControlStream;
00647 if (isDaq) break;
00648 fillEnv(dg, env);
00649 ++numControlStored;
00650 }
00651
00652
00653 int numDaqStored = 0;
00654 bool storedDaq = false;
00655 BOOST_FOREACH(const XtcInput::Dgram& dg, dgList) {
00656 bool isDaq = int(dg.file().stream()) < m_firstControlStream;
00657 if (isDaq and storedDaq and ::isConfigOrBeginCalib(dg)) break;
00658 if (not isDaq) break;
00659 fillEnv(dg, env);
00660 ++numDaqStored;
00661 if (isDaq) storedDaq = true;
00662 }
00663 MsgLog(name(), debug, name() << ": in fillEnv() from dgList of "
00664 << dgList.size() << " dgrams. Stored "
00665 << numControlStored << " control dgrams and "
00666 << numDaqStored << " DAQ dgrams");
00667 }
00668
00669
00670
00671 void
00672 XtcInputModuleBase::fillEnv(const XtcInput::Dgram& dg, Env& env)
00673 {
00674 MsgLog(name(), debug, name() << ": in fillEnv()");
00675
00676
00677
00678
00679
00680 Dgram::ptr dgptr = dg.dg();
00681
00682 boost::shared_ptr<psddl_pds2psana::SmallDataProxy> smallDataProxy = \
00683 psddl_pds2psana::SmallDataProxy::makeSmallDataProxy(dg.file(), m_liveMode,
00684 m_liveTimeOut, m_cvt, 0, env);
00685
00686 if (::isConfigOrBeginCalib(dg)) {
00687
00688
00689
00690 boost::shared_ptr<PSEvt::AliasMap> amap = env.aliasMap();
00691 if (amap) {
00692 XtcInput::XtcIterator iter1(&dgptr->xtc);
00693 while (Pds::Xtc* xtc = iter1.next()) {
00694 if (xtc->src.level()==Pds::Level::Source) {
00695 const Pds::DetInfo& di = *(Pds::DetInfo*)(&xtc->src);
00696 amap->addsrc(di);
00697 }
00698 if (xtc->src.level()==Pds::Level::Reporter) {
00699 const Pds::BldInfo& bi = *(Pds::BldInfo*)(&xtc->src);
00700 amap->addsrc(bi);
00701 }
00702 if (xtc->contains.id() == Pds::TypeId::Id_AliasConfig) {
00703
00704 boost::shared_ptr<PSEvt::AliasMap> amap = env.aliasMap();
00705
00706 if (xtc->contains.version() == 1) {
00707 const Pds::Alias::ConfigV1* cfgV1 = (const Pds::Alias::ConfigV1*)xtc->payload();
00708 const ndarray<const Pds::Alias::SrcAlias, 1>& aliases = cfgV1->srcAlias();
00709 for (unsigned i = 0; i != aliases.shape()[0]; ++ i) {
00710 const Pds::Alias::SrcAlias& alias = aliases[i];
00711 amap->add(alias.aliasName(), alias.src());
00712 }
00713 } else {
00714 MsgLog(name(), warning, name() << ": failed to find Alias::ConfigV1 in config store");
00715 }
00716 }
00717 }
00718 }
00719
00720
00721
00722 XtcInput::XtcIterator iter(&dgptr->xtc);
00723 while (Pds::Xtc* xtc = iter.next()) {
00724 if (xtc->contains.id() != Pds::TypeId::Id_Epics) {
00725 boost::shared_ptr<Pds::Xtc> xptr(dgptr, xtc);
00726 if (psddl_pds2psana::SmallDataProxy::isSmallDataProxy(xtc->contains)) {
00727 if (smallDataProxy) {
00728
00729 smallDataProxy->addEnvProxy(xptr, psddl_pds2psana::SmallDataProxy::getSmallConvertTypeInfoPtrs(xtc, m_cvt));
00730 } else {
00731 MsgLog(name(), warning, name() << ": smallDataProxy typeid found but smallDataProxy is null. Skiping (is this a .smd.xtc file?).");
00732 }
00733 } else {
00734
00735 m_cvt.convertConfig(xptr, env.configStore());
00736 }
00737 }
00738
00739 if (xtc->contains.id() == Pds::TypeId::Id_EpicsConfig) {
00740
00741 boost::shared_ptr<Psana::Epics::ConfigV1> cfgV1 = env.configStore().get(xtc->src);
00742 if (cfgV1) {
00743 const ndarray<const Psana::Epics::PvConfigV1, 1>& pvs = cfgV1->getPvConfig();
00744 for (unsigned i = 0; i != pvs.shape()[0]; ++ i) {
00745 const Psana::Epics::PvConfigV1& pvcfg = pvs[i];
00746 env.epicsStore().storeAlias(xtc->src, pvcfg.pvId(), pvcfg.description());
00747 }
00748 }
00749 }
00750
00751 }
00752
00753 }
00754
00755
00756
00757 XtcInput::XtcIterator iter(&dgptr->xtc);
00758 while (Pds::Xtc* xtc = iter.next()) {
00759
00760 if (xtc->contains.id() == Pds::TypeId::Id_Epics) {
00761
00762 boost::shared_ptr<Pds::Xtc> xptr(dgptr, xtc);
00763 m_cvt.convertEpics(xptr, env.epicsStore(), m_eventTagEpicsStore);
00764 }
00765
00766 }
00767 if (smallDataProxy) smallDataProxy->finalize();
00768 }
00769
00770 bool XtcInputModuleBase::liveAvail(int numEvents) {
00771 return m_dgsource->liveAvail(numEvents);
00772 }
00773
00774 }