00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "PSHdf5Input/Hdf5InputModule.h"
00017
00018
00019
00020
00021 #include <list>
00022
00023
00024
00025
00026 #include "IData/Dataset.h"
00027 #include "MsgLogger/MsgLogger.h"
00028 #include "psddl_psana/alias.ddl.h"
00029 #include "PSHdf5Input/Exceptions.h"
00030 #include "PSHdf5Input/Hdf5EventId.h"
00031 #include "PSHdf5Input/Hdf5FileListIter.h"
00032
00033
00034
00035
00036
00037 using namespace PSHdf5Input;
00038 PSANA_INPUT_MODULE_FACTORY(Hdf5InputModule)
00039
00040 namespace {
00041
00042
/// Tells whether an HDF5 group name belongs to EPICS PV data.
///
/// @param group  full name of the HDF5 group
/// @return true when the name contains the "/Epics::EpicsPv/" path component
bool isEpics(const std::string& group)
{
  const char* const epicsMarker = "/Epics::EpicsPv/";
  const std::string::size_type where = group.find(epicsMarker);
  return where != std::string::npos;
}
00047
00048
/// Tells whether an HDF5 group name belongs to L3T data of any version.
///
/// @param group  full name of the HDF5 group
/// @return true when the name contains "/L3T::DataV" (the version number that
///         follows is not inspected)
bool isL3TData(const std::string& group)
{
  const char* const l3tMarker = "/L3T::DataV";
  const std::string::size_type where = group.find(l3tMarker);
  return where != std::string::npos;
}
00053
00054
00055 bool l3accept(const Hdf5IterData& data)
00056 {
00057
00058
00059
00060
00061
00062
00063
00064 const Hdf5IterData::seq_type& seq = data.data();
00065 for (Hdf5IterData::seq_type::const_iterator it = seq.begin(); it != seq.end(); ++ it) {
00066 const std::string& grpname = it->group.name();
00067 if (not isEpics(grpname) and not isL3TData(grpname)) return true;
00068 }
00069 return false;
00070 }
00071
00072 }
00073
00074
00075
00076
00077
00078 namespace PSHdf5Input {
00079
00080
00081
00082
00083 Hdf5InputModule::Hdf5InputModule (const std::string& name)
00084 : psana::InputModule(name)
00085 , m_datasets()
00086 , m_iter()
00087 , m_cvt()
00088 , m_skipEvents(0)
00089 , m_maxEvents(0)
00090 , m_l3tAcceptOnly(true)
00091 , m_l1Count(0)
00092 , m_simulateEOR(0)
00093 , m_evtId()
00094 {
00095
00096 ConfigSvc::ConfigSvc cfg = configSvc();
00097 m_skipEvents = cfg.get("psana", "skip-events", 0UL);
00098 m_maxEvents = cfg.get("psana", "events", 0UL);
00099 m_datasets = configList("files");
00100 if ( m_datasets.empty() ) {
00101 throw EmptyFileList(ERR_LOC);
00102 }
00103 m_l3tAcceptOnly = cfg.get("psana", "l3t-accept-only", true);
00104
00105 WithMsgLog(this->name(), debug, str) {
00106 str << "Input datasets: ";
00107 std::copy(m_datasets.begin(), m_datasets.end(), std::ostream_iterator<std::string>(str, " "));
00108 }
00109 MsgLog(this->name(), debug, "skip-events: " << m_skipEvents);
00110 MsgLog(this->name(), debug, "events: " << m_maxEvents);
00111 }
00112
00113
00114
00115
00116 Hdf5InputModule::~Hdf5InputModule ()
00117 {
00118 }
00119
00120
00121 void
00122 Hdf5InputModule::beginJob(Event& evt, Env& env)
00123 {
00124 MsgLog(name(), debug, name() << ": in beginJob()");
00125
00126
00127 std::list<std::string> files;
00128 for (std::vector<std::string>::const_iterator dsiter = m_datasets.begin(); dsiter != m_datasets.end(); ++ dsiter) {
00129
00130 IData::Dataset ds(*dsiter);
00131
00132
00133 if (not ds.exists("h5")) {
00134 throw NotHdf5Dataset(ERR_LOC, *dsiter);
00135 }
00136
00137 const IData::Dataset::NameList& strfiles = ds.files();
00138 if (strfiles.empty()) {
00139 throw NoFilesInDataset(ERR_LOC, *dsiter);
00140 }
00141 for (IData::Dataset::NameList::const_iterator it = strfiles.begin(); it != strfiles.end(); ++ it) {
00142 MsgLog(name(), debug, "Hdf5InputModule::beginJob -- add file: " << *it);
00143 files.push_back(*it);
00144 }
00145
00146 }
00147
00148
00149 m_iter.reset(new Hdf5FileListIter(files));
00150
00151
00152
00153 Hdf5IterData data = m_iter->next();
00154 MsgLog(name(), debug, "First data item: " << data);
00155
00156 if (data.type() != Hdf5IterData::Configure) {
00157 throw FileStructure(ERR_LOC, "Non-configure data at the beginning of file");
00158 }
00159
00160
00161 fillEventEnv(data, evt, env);
00162 fillEventId(data, evt);
00163 fillEpics(data, env);
00164 }
00165
00166
00167 InputModule::Status
00168 Hdf5InputModule::event(Event& evt, Env& env)
00169 {
00170
00171 if (m_simulateEOR > 0) {
00172
00173 MsgLog(name(), debug, name() << ": simulated EOR");
00174 evt.put(m_evtId);
00175
00176 m_simulateEOR = -1;
00177 return EndRun;
00178 } else if (m_simulateEOR < 0) {
00179
00180 MsgLog(name(), debug, name() << ": simulated EOF");
00181 return Stop;
00182 }
00183
00184 Hdf5IterData data = m_iter->next();
00185 MsgLog(name(), debug, "Hdf5InputModule::event -- data: " << data);
00186
00187 InputModule::Status ret = InputModule::Abort;
00188 switch(data.type()) {
00189 case Hdf5IterData::Configure:
00190 fillEventEnv(data, evt, env);
00191 fillEventId(data, evt);
00192 fillEpics(data, env);
00193 ret = InputModule::Skip;
00194 break;
00195 case Hdf5IterData::BeginRun:
00196 fillEventId(data, evt);
00197 ret = InputModule::BeginRun;
00198 break;
00199 case Hdf5IterData::BeginCalibCycle:
00200 fillEventEnv(data, evt, env);
00201 fillEventId(data, evt);
00202 ret = InputModule::BeginCalibCycle;
00203 break;
00204 case Hdf5IterData::Event:
00205 if (m_l3tAcceptOnly and not ::l3accept(data)) {
00206
00207
00208 fillEventEnv(data, evt, env);
00209 fillEpics(data, env);
00210 ret = InputModule::Skip;
00211 } else if (m_maxEvents and m_l1Count >= m_skipEvents+m_maxEvents) {
00212 m_evtId = data.eventId();
00213 evt.put(m_evtId);
00214 ret = InputModule::EndCalibCycle;
00215 m_simulateEOR = 1;
00216 } else if (m_l1Count < m_skipEvents) {
00217 ret = InputModule::Skip;
00218 } else {
00219 fillEventEnv(data, evt, env);
00220 fillEventId(data, evt);
00221 fillEpics(data, env);
00222 ret = InputModule::DoEvent;
00223 }
00224 ++m_l1Count;
00225 break;
00226 case Hdf5IterData::EndCalibCycle:
00227 fillEventId(data, evt);
00228 m_cvt.resetCache();
00229 ret = InputModule::EndCalibCycle;
00230 break;
00231 case Hdf5IterData::EndRun:
00232 fillEventId(data, evt);
00233 m_cvt.resetCache();
00234 ret = InputModule::EndRun;
00235 break;
00236 case Hdf5IterData::UnConfigure:
00237 fillEventId(data, evt);
00238 m_cvt.resetCache();
00239 ret = InputModule::Skip;
00240 break;
00241 case Hdf5IterData::Stop:
00242 m_cvt.resetCache();
00243 ret = InputModule::Stop;
00244 break;
00245 }
00246 return ret;
00247 }
00248
00249
00250 void
00251 Hdf5InputModule::endJob(Event& evt, Env& env)
00252 {
00253 }
00254
00255
00256 void
00257 Hdf5InputModule::fillEpics(const Hdf5IterData& data, Env& env)
00258 {
00259 MsgLog(name(), debug, name() << ": in fillEpics()");
00260
00261
00262 const Hdf5IterData::seq_type& pieces = data.data();
00263 for (Hdf5IterData::const_iterator it = pieces.begin(); it != pieces.end(); ++ it) {
00264 if (it->mask) m_cvt.convertEpics(it->group, it->index, env.epicsStore());
00265 }
00266 }
00267
00268
00269 void
00270 Hdf5InputModule::fillEventId(const Hdf5IterData& data, Event& evt)
00271 {
00272 MsgLog(name(), debug, name() << ": in fillEventId()");
00273
00274
00275 evt.put(data.eventId());
00276 }
00277
00278
00279 void
00280 Hdf5InputModule::fillEventEnv(const Hdf5IterData& data, Event& evt, Env& env)
00281 {
00282 MsgLog(name(), debug, name() << ": in fillEvent()");
00283
00284
00285 if (data.type() == Hdf5IterData::Configure and env.aliasMap()) {
00286
00287 boost::shared_ptr<PSEvt::AliasMap> amap = env.aliasMap();
00288
00289
00290
00291 bool reset = true;
00292 const Hdf5IterData::seq_type& pieces = data.data();
00293 for (Hdf5IterData::const_iterator it = pieces.begin(); it != pieces.end(); ++ it) {
00294
00295 const std::string grpname = it->group.name();
00296 if (grpname.find("/Alias::ConfigV1/") != std::string::npos) {
00297
00298
00299 if (reset) {
00300 amap->clear();
00301 reset = false;
00302 }
00303
00304
00305 m_cvt.convert(it->group, it->index, evt, env);
00306
00307 boost::shared_ptr<Psana::Alias::ConfigV1> cfgV1 = env.configStore().get(Source());
00308 if (cfgV1) {
00309 const ndarray<const Psana::Alias::SrcAlias, 1>& aliases = cfgV1->srcAlias();
00310 for (unsigned i = 0; i != aliases.shape()[0]; ++ i) {
00311 const Psana::Alias::SrcAlias& alias = aliases[i];
00312 amap->add(alias.aliasName(), alias.src());
00313 }
00314 } else {
00315 MsgLog(name(), warning, name() << "failed to find Alias::ConfigV1 in config store");
00316 }
00317 }
00318 }
00319 }
00320
00321
00322 const Hdf5IterData::seq_type& pieces = data.data();
00323 for (Hdf5IterData::const_iterator it = pieces.begin(); it != pieces.end(); ++ it) {
00324 if (it->mask) m_cvt.convert(it->group, it->index, evt, env);
00325 }
00326 }
00327
00328 }