00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "psana/InputIter.h"
00017
00018
00019
00020
00021 #include <algorithm>
00022 #include <iostream>
00023 #include <iterator>
00024
00025
00026
00027
00028 #include "MsgLogger/MsgLogger.h"
00029 #include "psana/Exceptions.h"
00030 #include "psana/InputModule.h"
00031 #include "PSEvt/ProxyDict.h"
00032
00033
00034
00035
00036
namespace {

  // Name of the message logger used by every log statement in this file.
  // Both the characters and the pointer itself are constant so the name
  // cannot be re-pointed by accident.
  const char* const logger = "InputIter";

}
00042
00043
00044
00045
00046
00047 namespace psana {
00048
00049
00050
00051
00052 InputIter::InputIter (const boost::shared_ptr<InputModule>& inputModule,
00053 const boost::shared_ptr<PSEnv::Env>& env)
00054 : m_inputModule(inputModule)
00055 , m_env(env)
00056 , m_finished(false)
00057 , m_state(StateNone)
00058 , m_values()
00059 , m_aliasMap(env->aliasMap())
00060 {
00061 m_newStateEventType[StateNone] = None;
00062 m_newStateEventType[StateConfigured] = BeginJob;
00063 m_newStateEventType[StateRunning] = BeginRun;
00064 m_newStateEventType[StateScanning] = BeginCalibCycle;
00065
00066 m_closeStateEventType[StateNone] = None;
00067 m_closeStateEventType[StateConfigured] = EndJob;
00068 m_closeStateEventType[StateRunning] = EndRun;
00069 m_closeStateEventType[StateScanning] = EndCalibCycle;
00070
00071
00072 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00073 m_inputModule->beginJob(*evt, *m_env);
00074 newState(StateConfigured, evt);
00075 }
00076
00077
00078
00079
00080 InputIter::~InputIter ()
00081 {
00082
00083 if (m_state != StateNone) {
00084 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00085 m_inputModule->endJob(*evt, *m_env);
00086 }
00087 }
00088
00089
00090
00091
00092
00093 InputIter::value_type
00094 InputIter::next()
00095 {
00096 value_type result(None, boost::shared_ptr<PSEvt::Event>());
00097
00098
00099 if (not m_values.empty()) {
00100 result = m_values.front();
00101 m_values.pop_front();
00102 return result;
00103 }
00104
00105 if (m_finished) return result;
00106
00107 WithMsgLog(logger, debug, out) {
00108 out << "enter -- m_state: " << m_state;
00109 if (not m_values.empty()) {
00110 out << " m_values:";
00111 for (std::deque<value_type>::const_iterator it = m_values.begin(); it != m_values.end(); ++it) {
00112 out << " " << it->first;
00113 }
00114 }
00115 }
00116
00117
00118
00119 while (m_values.empty()) {
00120
00121
00122 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00123
00124
00125 InputModule::Status istat = m_inputModule->event(*evt, *m_env);
00126 MsgLog(logger, debug, "input.event() returned " << istat);
00127
00128
00129 if (istat == InputModule::Skip) continue;
00130 if (istat == InputModule::Stop) break;
00131 if (istat == InputModule::Abort) {
00132 MsgLog(logger, info, "Input module requested abort");
00133 throw ExceptionAbort(ERR_LOC, "Input module requested abort");
00134 }
00135
00136
00137 if (istat == InputModule::DoEvent) {
00138
00139
00140 this->newState(StateScanning, evt);
00141 m_values.push_back(value_type(Event, evt));
00142
00143 } else {
00144
00145 State unwindTo = StateNone;
00146 State newState = StateNone;
00147 if (istat == InputModule::BeginRun) {
00148 unwindTo = StateConfigured;
00149 newState = StateRunning;
00150 } else if (istat == InputModule::BeginCalibCycle) {
00151 unwindTo = StateRunning;
00152 newState = StateScanning;
00153 } else if (istat == InputModule::EndCalibCycle) {
00154 unwindTo = StateRunning;
00155 } else if (istat == InputModule::EndRun) {
00156 unwindTo = StateConfigured;
00157 }
00158
00159 unwind(unwindTo, evt);
00160 if (newState != StateNone) {
00161 this->newState(newState, evt);
00162 }
00163
00164 }
00165 }
00166
00167 if (m_values.empty()) {
00168
00169 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00170 m_inputModule->endJob(*evt, *m_env);
00171 unwind(StateNone, evt);
00172 m_finished = true;
00173 }
00174
00175
00176 if (not m_values.empty()) {
00177 result = m_values.front();
00178 m_values.pop_front();
00179 }
00180
00181 WithMsgLog(logger, debug, out) {
00182 out << "exit -- m_state: " << m_state;
00183 if (not m_values.empty()) {
00184 out << " m_values:";
00185 for (std::deque<value_type>::const_iterator it = m_values.begin(); it != m_values.end(); ++it) {
00186 out << " " << it->first;
00187 }
00188 }
00189 }
00190
00191 return result;
00192 }
00193
00194
00195 void
00196 InputIter::finish()
00197 {
00198
00199 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00200 m_inputModule->endJob(*evt, *m_env);
00201 unwind(StateNone, evt);
00202 m_finished = true;
00203 }
00204
00205
00206 void
00207 InputIter::newState(State state, const EventPtr& evt)
00208 {
00209 MsgLog(logger, trace, "newState " << state);
00210
00211
00212 if (int(m_state) < int(state-1)) {
00213
00214 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00215 newState(State(state-1), evt);
00216 }
00217
00218 if (int(m_state) < int(state)) {
00219
00220 m_state = state;
00221
00222
00223 if (m_newStateEventType[state] != None) {
00224 m_values.push_back(value_type(m_newStateEventType[state], evt));
00225 }
00226 }
00227 }
00228
00229
00230 void
00231 InputIter::closeState(const EventPtr& evt)
00232 {
00233 MsgLog(logger, trace, "closeState " << m_state);
00234
00235
00236 if (m_closeStateEventType[m_state] != None) {
00237 m_values.push_back(value_type(m_closeStateEventType[m_state], evt));
00238 }
00239
00240
00241 m_state = State(m_state-1);
00242 }
00243
00244
00245 void
00246 InputIter::unwind(State newState, const EventPtr& evt)
00247 {
00248 while (m_state > newState+1) {
00249
00250 EventPtr evt = boost::make_shared<PSEvt::Event>(boost::make_shared<PSEvt::ProxyDict>(m_aliasMap));
00251 closeState(evt);
00252 }
00253 if (m_state > newState) {
00254 closeState(evt);
00255 }
00256 }
00257
00258
00259 std::ostream&
00260 operator<<(std::ostream& out, InputIter::EventType type)
00261 {
00262 const char* str = "???";
00263 switch (type) {
00264 case InputIter::None:
00265 str = "None";
00266 break;
00267 case InputIter::BeginJob:
00268 str = "BeginJob";
00269 break;
00270 case InputIter::BeginRun:
00271 str = "BeginRun";
00272 break;
00273 case InputIter::BeginCalibCycle:
00274 str = "BeginCalibCycle";
00275 break;
00276 case InputIter::Event:
00277 str = "Event";
00278 break;
00279 case InputIter::EndCalibCycle:
00280 str = "EndCalibCycle";
00281 break;
00282 case InputIter::EndRun:
00283 str = "EndRun";
00284 break;
00285 case InputIter::EndJob:
00286 str = "EndJob";
00287 break;
00288 case InputIter::NumEventTypes:
00289 break;
00290 }
00291 return out << str;
00292 }
00293
00294
00295 std::ostream&
00296 operator<<(std::ostream& out, InputIter::State state)
00297 {
00298 const char* str = "???";
00299 switch (state) {
00300 case InputIter::StateNone:
00301 str = "StateNone";
00302 break;
00303 case InputIter::StateConfigured:
00304 str = "StateConfigured";
00305 break;
00306 case InputIter::StateRunning:
00307 str = "StateRunning";
00308 break;
00309 case InputIter::StateScanning:
00310 str = "StateScanning";
00311 break;
00312 case InputIter::NumStates:
00313 break;
00314 }
00315 return out << str;
00316 }
00317
00318 }