00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "psana/EventLoop.h"
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "MsgLogger/MsgLogger.h"
00026 #include "psana/Exceptions.h"
00027 #include "psana/InputIter.h"
00028 #include "psana/InputModule.h"
00029 #include "PSEvt/ProxyDict.h"
00030
00031
00032
00033
00034
00035 namespace {
00036
00037 const char* logger = "EventLoop";
00038
00039
00040 EventLoop::EventType eventType(InputIter::EventType type)
00041 {
00042
00043 return EventLoop::EventType(type);
00044 }
00045
00046 }
00047
00048
00049
00050
00051
00052
00053 namespace psana {
00054
00055
00056
00057
00058 EventLoop::EventLoop (const boost::shared_ptr<InputModule>& inputModule,
00059 const std::vector<boost::shared_ptr<Module> >& modules,
00060 const boost::shared_ptr<PSEnv::Env>& env)
00061 : m_inputIter(boost::make_shared<InputIter>(inputModule, env))
00062 , m_modules(modules)
00063 , m_values()
00064 , m_inputModule(inputModule)
00065 {
00066 m_eventMethods[BeginJob] = &Module::beginJob;
00067 m_eventMethods[EndJob] = &Module::endJob;
00068 m_eventMethods[BeginRun] = &Module::beginRun;
00069 m_eventMethods[EndRun] = &Module::endRun;
00070 m_eventMethods[BeginCalibCycle] = &Module::beginCalibCycle;
00071 m_eventMethods[EndCalibCycle] = &Module::endCalibCycle;
00072 m_eventMethods[Event] = &Module::event;
00073 m_eventMethods[None] = 0;
00074 }
00075
00076
00077
00078
00079 EventLoop::~EventLoop ()
00080 {
00081 }
00082
00083
00084 PSEnv::Env&
00085 EventLoop::env() const
00086 {
00087 return m_inputIter->env();
00088 }
00089
00090
00091
00092
00093
00094 EventLoop::value_type
00095 EventLoop::next()
00096 {
00097 value_type result(None, boost::shared_ptr<PSEvt::Event>());
00098
00099
00100 if (not m_values.empty()) {
00101 result = m_values.front();
00102 m_values.pop_front();
00103 return result;
00104 }
00105
00106 while (true) {
00107
00108
00109 InputIter::value_type evt = m_inputIter->next();
00110 EventType evtType = ::eventType(evt.first);
00111 if (evtType == None) break;
00112
00113
00114 Module::Status stat = callModuleMethod(m_eventMethods[evtType], *evt.second, m_inputIter->env(), evtType != Event);
00115 if (stat == Module::Abort) {
00116
00117 throw ExceptionAbort(ERR_LOC, "User module requested abort");
00118 } else if (stat == Module::Stop) {
00119
00120 m_inputIter->finish();
00121 } else {
00122
00123 result = value_type(evtType, evt.second);
00124 break;
00125 }
00126
00127 }
00128
00129 return result;
00130
00131 }
00132
00133
00134
00135
00136
00137 Module::Status
00138 EventLoop::callModuleMethod(ModuleMethod method, PSEvt::Event& evt, PSEnv::Env& env, bool ignoreSkip)
00139 {
00140 Module::Status stat = Module::OK;
00141
00142 if (ignoreSkip) {
00143
00144
00145
00146 for (std::vector<boost::shared_ptr<Module> >::const_iterator it = m_modules.begin() ; it != m_modules.end() ; ++it) {
00147 boost::shared_ptr<Module> mod = *it;
00148
00149
00150 mod->reset();
00151
00152
00153 ((*mod).*method)(evt, env);
00154
00155
00156 if (mod->status() == Module::Skip) {
00157
00158 } else if (mod->status() == Module::Stop) {
00159
00160 MsgLog(logger, info, "module " << mod->name() << " requested stop");
00161 stat = Module::Stop;
00162 } else if (mod->status() == Module::Abort) {
00163
00164 MsgLog(logger, info, "module " << mod->name() << " requested abort");
00165 stat = Module::Abort;
00166 break;
00167 }
00168 }
00169
00170 } else {
00171
00172
00173
00174 for (std::vector<boost::shared_ptr<Module> >::const_iterator it = m_modules.begin() ; it != m_modules.end() ; ++it) {
00175 boost::shared_ptr<Module> mod = *it;
00176
00177
00178 mod->reset();
00179
00180
00181
00182 if (stat == Module::OK or mod->observeAllEvents()) {
00183 ((*mod).*method)(evt, env);
00184 }
00185
00186
00187 if (mod->status() == Module::Skip) {
00188
00189
00190 MsgLog(logger, trace, "module " << mod->name() << " requested skip");
00191 if (stat == Module::OK) stat = Module::Skip;
00192
00193
00194 if (not evt.exists<int>("__psana_skip_event__")) {
00195 evt.put(boost::make_shared<int>(1), "__psana_skip_event__");
00196 }
00197
00198 } else if (mod->status() == Module::Stop) {
00199
00200 MsgLog(logger, info, "module " << mod->name() << " requested stop");
00201 stat = Module::Stop;
00202 break;
00203 } else if (mod->status() == Module::Abort) {
00204
00205 MsgLog(logger, info, "module " << mod->name() << " requested abort");
00206 stat = Module::Abort;
00207 break;
00208 }
00209 }
00210
00211 }
00212
00213 return stat;
00214 }
00215
00216 Index& EventLoop::index()
00217 {
00218 return m_inputModule->index();
00219 }
00220
00221 bool EventLoop::liveAvail(int numEvents) {
00222 return m_inputModule->liveAvail(numEvents);
00223 }
00224
00225 }