PSXtcInput/src/Index.cpp

Go to the documentation of this file.
00001 //--------------------------------------------------------------------------
00002 // File and Version Information:
00003 //      $Id: Index.cpp 7696 2014-02-27 00:40:59Z cpo@SLAC.STANFORD.EDU $
00004 //
00005 // Description:
00006 //      Class Index...
00007 //
00008 // Author List:
00009 //      Christopher O'Grady
00010 //
00011 //------------------------------------------------------------------------
00012 
00013 //-----------------------
00014 // This Class's Header --
00015 //-----------------------
00016 #include "PSXtcInput/Index.h"
00017 
00018 //-----------------
00019 // C/C++ Headers --
00020 //-----------------
00021 #include <algorithm>
00022 #include <vector>
00023 #include <queue>
00024 #include <map>
00025 #include <string>
00026 #include <iomanip>
00027 #include <fcntl.h>
00028 #include <stdlib.h>
00029 #include <sstream>
00030 
00031 //-------------------------------
00032 // Collaborating Class Headers --
00033 //-------------------------------
00034 #include "MsgLogger/MsgLogger.h"
00035 #include "PSXtcInput/Exceptions.h"
00036 #include "pdsdata/index/IndexFileStruct.hh"
00037 #include "pdsdata/index/IndexFileReader.hh"
00038 #include "pdsdata/index/IndexList.hh"
00039 #include "pdsdata/xtc/Sequence.hh"
00040 #include "IData/Dataset.h"
00041 #include "XtcInput/XtcFileName.h"
00042 #include "pdsdata/xtc/XtcIterator.hh"
00043 #include "pdsdata/psddl/epics.ddl.h"
00044 
00045 //-----------------------------------------------------------------------
00046 // Local Macros, Typedefs, Structures, Unions and Forward Declarations --
00047 //-----------------------------------------------------------------------
00048 
00049 using namespace XtcInput;
00050 using namespace std;
00051 
00052 //              ----------------------------------------
00053 //              -- Public Function Member Definitions --
00054 //              ----------------------------------------
00055 
namespace {
  // Name under which every message from this file is logged.
  const char* logger =
    "PSXtcInput::Index";
}
00059 
00060 namespace PSXtcInput {
00061 
00062 // class to take a list of xtc filenames and generate a map
00063 // where one can look up the files for a particular run.
00064 
00065 class RunMap {
00066 public:
00067   std::vector<XtcInput::XtcFileName> files;
00068   typedef std::map<unsigned, std::vector<XtcInput::XtcFileName> > map;
00069   std::vector<unsigned> runs;
00070   map runFiles;
00071 
00072   RunMap(std::vector<std::string> &m_fileNames) {
00073     // Input can be a mixture of files and datasets.
00074     // Live mode is not supported. "one-stream mode"
00075     // is only supported if the users provides a list of
00076     // timestamps from one stream.
00077 
00078     typedef std::vector<std::string> FileList;
00079 
00080     // guess whether we have datasets or pure file names (or mixture)
00081     for (FileList::const_iterator it = m_fileNames.begin(); it != m_fileNames.end(); ++ it) {
00082     
00083       IData::Dataset ds(*it);
00084       if (ds.exists("live")) MsgLog(logger, fatal, "Live mode not supported with xtc indexing");
00085 
00086       if (ds.isFile()) {
00087 
00088         // must be file name
00089         files.push_back(XtcInput::XtcFileName(*it));
00090         
00091       } else {
00092 
00093         // Find files on disk and add to the list
00094         const IData::Dataset::NameList& strfiles = ds.files();
00095         if (strfiles.empty()) MsgLog(logger, fatal, "Empty file list");
00096         for (IData::Dataset::NameList::const_iterator it = strfiles.begin(); it != strfiles.end(); ++ it) {
00097           XtcInput::XtcFileName file(*it);
00098           files.push_back(file);
00099         }
00100       }
00101       // sort files to make sure we get a chunk0 first
00102       sort(files.begin(),files.end());
00103       
00104       // sort all files according run
00105       for (std::vector<XtcInput::XtcFileName>::const_iterator it = files.begin(); it != files.end(); ++ it) {
00106         runFiles[it->run()].push_back(*it);
00107       }
00108       for (map::const_iterator it = runFiles.begin(); it != runFiles.end(); ++ it) {
00109         runs.push_back(it->first);
00110       }
00111     }
00112   }
00113 };
00114 
00115 // class which manages xtc files, including "jump" function to do random access
00116 
00117 class IndexXtcReader {
00118 public:
00119   IndexXtcReader() {}
00120 
00121   void add(const XtcFileName& xtcfile) {
00122     int fd = ::open(xtcfile.path().c_str(), O_RDONLY | O_LARGEFILE);
00123     if (fd==-1) MsgLog(logger, fatal,
00124                                "File " << xtcfile.path().c_str() << " not found");
00125     _xtcFileList.push_back(xtcfile);
00126     _fd.push_back(fd);
00127   }
00128 
00129   ~IndexXtcReader() {
00130     for  (vector<int>::const_iterator it = _fd.begin(); it!= _fd.end(); it++)
00131       ::close(*it);
00132   }
00133 
00134   const vector<XtcFileName>& files() {return _xtcFileList;}
00135 
00136   Pds::Dgram* jump(int file, int64_t offset) {
00137     int64_t found = lseek64(_fd[file],offset, SEEK_SET);
00138     if (found != offset) {
00139       stringstream ss;
00140       ss << "Jump to offset " << offset << " failed";
00141       MsgLog(logger, error, ss.str());
00142       throw IndexSeekFailed(ERR_LOC);
00143     }
00144     Pds::Dgram dghdr;
00145     if (::read(_fd[file], &dghdr, sizeof(dghdr))==0) {
00146       return 0;
00147     } else {
00148       if (dghdr.xtc.sizeofPayload()>MaxDgramSize)
00149         MsgLog(logger, fatal, "Datagram size exceeds sanity check. Size: " << dghdr.xtc.sizeofPayload() << " Limit: " << MaxDgramSize);
00150       Pds::Dgram* dg = (Pds::Dgram*)new char[sizeof(dghdr)+dghdr.xtc.sizeofPayload()];
00151       *dg = dghdr;
00152       ::read(_fd[file], dg->xtc.payload(), dg->xtc.sizeofPayload());
00153       return dg;
00154     }
00155   }
00156 
00157 private:
00158   enum {MaxDgramSize=0x2000000};
00159   vector<int> _fd;
00160   vector<XtcFileName> _xtcFileList;
00161 };
00162 
// Location of one epics datagram: the xtc file number that contains it and
// the offset to that data (the two arguments used by "jump").  One of these
// is kept per epics source for each event in the index table.

class EpicsInfo {
public:
  // both fields start at -1
  EpicsInfo() : offset(-1), file(-1) {}

  // equal only when both the offset and the file number match
  bool operator==(const EpicsInfo& other) const {
    if (offset != other.offset) return false;
    return file == other.file;
  }
  bool operator!=(const EpicsInfo& other) const {
    return !operator==(other);
  }

  int64_t offset;
  int file;
};
00175 
// One entry in an index table: the DAQ index node plus the number of the
// xtc file it came from.  Used mainly for L1Accepts, but also reused for the
// table of BeginCalibs (hence the template).  Ordering and equality look at
// entry.time() only.

template<typename T>
class IndexBase {
public:
  T entry;
  int file;

  // leaves `file` unset; only the two-argument constructor calls _init()
  IndexBase() {}

  IndexBase(uint32_t seconds, uint32_t nanoseconds) {
    _init();
    entry.uSeconds = seconds;
    entry.uNanoseconds = nanoseconds;
  }

  virtual ~IndexBase() {}

  // marks the entry as not belonging to any file
  void _init() { file = -1; }

  bool operator<(const IndexBase& other) const {
    return entry.time() < other.entry.time();
  }
  bool operator==(const IndexBase& other) const {
    return entry.time() == other.entry.time();
  }
  bool operator!=(const IndexBase& other) const {
    return !operator==(other);
  }
};
00204 
00205 typedef IndexBase<Pds::Index::CalibNode> IndexCalib;
00206 typedef IndexBase<Pds::Index::L1AcceptNode> IndexUnixTime;
00207 
00208 // used by the IOC recorders when we want to sort/search
00209 // using unix timestamp, but not the fiducial
00210 
00211 class IndexFiducial : public IndexUnixTime {
00212 public:
00213   IndexFiducial() {}
00214   IndexFiducial(uint32_t seconds, uint32_t nanoseconds, uint32_t fiducial) : IndexUnixTime(seconds,nanoseconds) {
00215     entry.uFiducial=fiducial;
00216   }
00217 };
00218 
00219 class IndexEvent : public IndexFiducial {
00220 public:
00221   enum {MaxEpicsSources=6};
00222   // the code would be neater if this was a vector, but I think it would
00223   // be less performant as well, hence the hardwired number. - cpo
00224   EpicsInfo einfo[MaxEpicsSources];
00225   virtual ~IndexEvent() {}
00226   IndexEvent() {}
00227   IndexEvent(uint32_t seconds, uint32_t nanoseconds, uint32_t fiducial) : IndexFiducial(seconds,nanoseconds,fiducial) {}
00228   bool operator<(const IndexEvent& other) const {
00229     if (entry.time()==other.entry.time())
00230       return entry.uFiducial<other.entry.uFiducial;
00231     else
00232       return entry.time()<other.entry.time();
00233   }
00234   bool operator==(const IndexEvent& other) const {
00235     return entry.time()==other.entry.time() && entry.uFiducial==other.entry.uFiducial;
00236   }
00237   bool operator!=(const IndexEvent& other) const {
00238     return !(*this==other);
00239   }
00240 };
00241 
00242 ostream& operator<<(ostream& os, const IndexEvent& idx) {
00243   os << "time " << std::hex << idx.entry.uSeconds << "/" << idx.entry.uNanoseconds << " fiducial " << idx.entry.uFiducial << ", filenum " << idx.file;
00244     return os;
00245 }
00246 
00247 ostream& operator<<(ostream& os, const IndexCalib& idx) {
00248   os << "time " << std::hex << idx.entry.uSeconds << "/" << idx.entry.uNanoseconds;
00249     return os;
00250 }
00251 
00252 // The index files do not support multiple archiving intervals
00253 
00254 class myLevelIter : public XtcIterator {
00255 public:
00256   enum {Stop, Continue};
00257   myLevelIter(Xtc* xtc, bool allowCorruptEpics) :
00258     XtcIterator(xtc), _allowCorruptEpics(allowCorruptEpics) {}
00259 
00260   void process(const Epics::ConfigV1& e) {
00261     int32_t numpv = e.numPv();
00262     ndarray<const Epics::PvConfigV1, 1> pv = e.getPvConfig();
00263     float interval = pv[0].interval();
00264     for (int i=1;i<numpv;i++) {
00265       float nextinterval = pv[i].interval();
00266       if (interval != nextinterval) {
00267         if (_allowCorruptEpics) {
00268           MsgLog(logger, warning,
00269                  "Index mode does not support multiple epics archiving "
00270                  "intervals."
00271                  << " Variable 0 interval: " << interval
00272                  << ", Variable " << i << " interval " << nextinterval);
00273         } else {
00274           MsgLog(logger, fatal,
00275                  "Index mode does not support multiple epics archiving intervals."
00276                  << " Variable 0 interval: " << interval
00277                  << ", Variable " << i << " interval " << nextinterval
00278                  << endl << " This error can be turned into a warning "
00279                  "by setting the parameter ""allow-corrupt-epics"" "
00280                  "in the [psana] section of the configuration file, "
00281                  "but slow EPICS data will be corrupt."
00282                  );
00283         }
00284         break;
00285       }
00286     }
00287   }
00288   int process(Xtc* xtc) {
00289     Level::Type   level     = xtc->src.level();
00290     if (level < 0 || level >= Level::NumberOfLevels )
00291     {
00292         MsgLog(logger, warning,
00293                "Unsupported level " << (int) level);
00294         return Continue;
00295     }    
00296     switch (xtc->contains.id()) {
00297     case (TypeId::Id_Xtc) : {
00298       myLevelIter iter(xtc,_allowCorruptEpics);
00299       iter.iterate();
00300       break;
00301     }
00302     case (TypeId::Id_EpicsConfig) :
00303       process(*(const Epics::ConfigV1*)(xtc->payload()));
00304       break;
00305     default:
00306       break;
00307     }
00308     return Continue;
00309   }
00310 private:
00311   bool _allowCorruptEpics;
00312 };
00313 
00314 // this is the implementation of the per-run indexing.  shouldn't be too
00315 // hard to make it work for per-calibcycle indexing as well.
00316 
00317 class IndexRun {
00318 private:
00319   // the index files do not support multiple recording intervals
00320   // for epics variables (happened in ~20% of experiments before
00321   // we started saving every variable in every shot (summer 2014)).
00322   void _checkEpicsInterval(Pds::Dgram* dg, bool allowCorruptEpics) {
00323     myLevelIter iter(&(dg->xtc),allowCorruptEpics);
00324     iter.iterate();
00325   }
00326 
00327   // read an index file corresponding to an xtc file
00328   bool _getidx(const XtcFileName &xtcfile, Pds::Index::IndexList& idxlist) {
00329     string idxname = xtcfile.path();
00330     string basename = xtcfile.basename();
00331     size_t pos = idxname.find(basename,0);
00332     idxname.insert(pos,"index/");
00333     idxname.append(".idx");
00334     int fd = open(idxname.c_str(), O_RDONLY | O_LARGEFILE);
00335     if (fd < 0) {
00336       MsgLog(logger, warning, "Unable to open xtc index file " << idxname);
00337       return 1;
00338     }
00339     idxlist.readFromFile(fd);
00340     ::close(fd);
00341     return 0;
00342   }
00343 
00344   // this is tricky.  the bit-list of DAQ "detector sources" can be different
00345   // for the various event-nodes ("streams").  returns a bitmask of the bits
00346   // in the DAQ index TSegmentToIdMap that correspond to Epics types, as
00347   // well as a map (bit2Src) from those bits to the Src.  This latter map essentially
00348   // "inverts" the direction of TSegmentToIdMap.
00349   unsigned _getEpicsBit2SrcMap(const Pds::Index::IndexList::TSegmentToIdMap& seg, std::map<int,Pds::Src> &bit2Src) {
00350     unsigned mask = 0;
00351     for (Pds::Index::IndexList::TSegmentToIdMap::const_iterator it=seg.begin(); it!=seg.end(); ++it) {
00352       const Pds::Index::L1SegmentId::TTypeList& type = it->second.typeList;
00353       const Pds::Index::L1SegmentId::TSrcList& src = it->second.srcList;
00354       Pds::Index::L1SegmentId::TSrcList::const_iterator itsrc=src.begin();
00355       for (Pds::Index::L1SegmentId::TTypeList::const_iterator ittype=type.begin(); ittype!=type.end(); ++ittype) {
00356         Pds::TypeId::Type type = (*ittype).id();
00357         int index = it->second.iIndex;
00358         Pds::Src src = *itsrc;
00359         if (type==Pds::TypeId::Id_Epics) {
00360           bit2Src[1<<index]=src;
00361           mask |= 1<<index;
00362         }
00363         ++itsrc;
00364       }
00365     }
00366     return mask;
00367   }
00368 
00369   // add a single DAQ index-file to the large IndexBase table (either IndexEvent
00370   // or IndexCalib)
00371   template <typename T1, typename T2>
00372   void _store(T1 &idx, const T2 &add, vector<string>::size_type ifile) {
00373     int numadd = add.size();
00374     int numtot = idx.size();
00375     idx.resize(numadd+numtot);
00376     for (int i=0; i<numadd; i++) {
00377       idx[numtot].entry=add[i];
00378       idx[numtot].file = ifile;
00379       numtot++;
00380     }
00381   }
00382 
00383   // create a vector of unique times that the user can use to jump to events.
00384   // also keep track of which times correspond to the beginning of a new
00385   // calibcycle.
00386   void _fillTimes() {
00387     IndexEvent last(0,0,0);
00388     
00389     _calibTimeIndex.clear();
00390     std::vector<IndexCalib>::const_iterator caliter = _idxcalib.begin();
00391     for (vector<IndexEvent>::iterator itev = _idx.begin(); itev != _idx.end(); ++ itev) {
00392       if (*itev!=last) {
00393         _times.push_back(psana::EventTime(itev->entry.time(),itev->entry.uFiducial));
00394         last = *itev;
00395         if (caliter != _idxcalib.end()) {
00396           if (itev->entry.time() > caliter->entry.time()) {
00397             // put the array-offset of the first event in this
00398             // calibcycle into the list.
00399             _calibTimeIndex.push_back(_times.size()-1);
00400             ++caliter;
00401           }
00402         }
00403       }
00404     }
00405   }
00406 
00407   // add a datagram with "event" data (versus nonEvent data, like epics)
00408   // to the vector of pieces (i.e. add another "piece")
00409   void _add(Pds::Dgram* dg, int file) {
00410     _pieces.eventDg.push_back(XtcInput::Dgram(XtcInput::Dgram::make_ptr(dg),_xtc.files()[file]));
00411   }
00412 
00413   // copy the event-pieces onto the queue where the DgramSourceIndex object
00414   // can pick them up.
00415   void _post() {
00416     _queue.push(_pieces);
00417   }
00418 
00419   // add only one "event" datagram and post
00420   void _post(Pds::Dgram* dg, int file) {
00421     _add(dg, file);
00422     _post();
00423   }
00424 
00425   // post only this dg
00426   void _postOneDg(Pds::Dgram* dg, int file) {
00427     _pieces.reset();
00428     if (dg) _post(dg, file);
00429   }
00430 
00431   void _addIocConfigure() {
00432     Pds::Dgram* dg=0;
00433     int file = 0;
00434     for (vector<XtcFileName>::const_iterator it = _xtc.files().begin();
00435          it != _xtc.files().end(); it++,file++) {
00436       if ((*it).stream()>=80 && (*it).chunk()==0) {
00437         dg = _xtc.jump(file, 0);
00438         if (dg->seq.service()!=Pds::TransitionId::Configure) {
00439           MsgLog(logger, fatal, "Configure transition not found at beginning of file" << (*it));
00440         }
00441         _add(dg,file);
00442       }
00443     }
00444   }
00445 
00446   // look for configure in first 2 datagrams from the first file.  this will fail
00447   // if we don't get a chunk0 first in the list of files.  we have previously
00448   // sorted the files in RunMap to ensure this is the case.
00449   void _configure(bool allowCorruptEpics) {
00450 
00451     _pieces.reset();
00452 
00453     int64_t offset = 0;
00454     for (int i=0; i<2; i++) {
00455       Pds::Dgram* dg = _xtc.jump(0, offset);
00456       if (dg->seq.service()==Pds::TransitionId::Configure) {
00457         _checkEpicsInterval(dg,allowCorruptEpics);
00458         _addIocConfigure();
00459         _post(dg,0);
00460         _beginrunOffset = dg->xtc.sizeofPayload()+sizeof(Pds::Dgram);
00461         return;
00462       }
00463       offset+=dg->xtc.sizeofPayload()+sizeof(Pds::Dgram);
00464     }
00465     MsgLog(logger, fatal, "Configure transition not found in first 2 datagrams");
00466   }
00467 
00468   // send beginrun from the first file
00469   void _beginrun() {
00470     Pds::Dgram* dg = _xtc.jump(0, _beginrunOffset);
00471     if (dg->seq.service()!=Pds::TransitionId::BeginRun)
00472       MsgLog(logger, fatal, "BeginRun transition not found after configure transition");
00473     _postOneDg(dg,0);
00474   }
00475 
00476   // check to see if we need to send a begincalib by looking
00477   // to see if the begincalib needed by this timestamp
00478   // is the same as the begincalib we sent previously
00479   void _maybePostCalib(uint32_t seconds, uint32_t nanoseconds) {
00480     IndexCalib request(seconds,nanoseconds);
00481     vector<IndexCalib>::iterator it;
00482     it = lower_bound(_idxcalib.begin(),_idxcalib.end(),request);
00483     if (it==_idxcalib.begin()) {
00484       MsgLog(logger, warning, "Calib cycle for event time " << seconds << "/" << nanoseconds << " earlier than first calib-cycle time " << _idxcalib.begin()->entry.uSeconds << "/" << _idxcalib.begin()->entry.uNanoseconds << ". Sending first calib cycle.  DAQ scan information may be incorrect.");
00485       it+=1;
00486     }
00487     vector<IndexCalib>::iterator calib;
00488     calib=it-1;
00489     if (*calib!=_lastcalib) {
00490       // it appears that psana takes care of sending endcalib for us
00491       // need to send begincalib
00492       _postOneDg(_xtc.jump((*calib).file, (*calib).entry.i64Offset),(*calib).file);
00493       _lastcalib=*calib;
00494     }
00495   }
00496 
00497   // look through the ioc index table to find datagrams
00498   // within a time window of the DAQ dg, where the fiducials
00499   // match precisely
00500   void _maybeAddIoc(uint32_t seconds, uint32_t fiducial) {
00501     const unsigned window = 5; // in seconds
00502     vector<IndexFiducial>::iterator it;
00503     it = lower_bound(_idxioc.begin(),_idxioc.end(),IndexFiducial(seconds-window,0,0));
00504     while (it!=_idxioc.end()) {
00505       if (it->entry.uSeconds>(seconds+window)) return; // out of the window
00506       if (it->entry.uFiducial==fiducial) {
00507         Pds::Dgram* dg=0;
00508         dg = _xtc.jump((*it).file, (*it).entry.i64OffsetXtc);
00509         _add(dg,(*it).file); // don't return: there can be a match from another ioc stream
00510       }
00511       it++;
00512     }
00513   }
00514 
00515   // loop over the _lastEpics list (one per epics source).
00516   // look in the big IndexEvent table and see if this L1 timestamp needs
00517   // a new "nonEvent" datagram of epics info.
00518   void _maybeAddEpics(const IndexEvent& evt, const vector<Pds::Src>& src, Pds::Dgram* dg) {
00519     int i=0;
00520     for (vector<EpicsInfo>::iterator last =_lastEpics.begin(); last!=_lastEpics.end(); last++) {
00521       EpicsInfo request = evt.einfo[i];
00522       // check if we need new epics info
00523       if (*last != request) {
00524         // don't send if already have the epics data inside this event
00525         if (request.file != evt.file || request.offset != evt.entry.i64OffsetXtc) {
00526           Pds::Dgram* epicsdg = _xtc.jump(request.file,request.offset);
00527           if (epicsdg) {
00528             _pieces.nonEventDg.push_back(XtcInput::Dgram(XtcInput::Dgram::make_ptr(epicsdg),_xtc.files()[request.file]));
00529           } else {
00530             MsgLog(logger, fatal, "Epics data not found at offset" << request.offset);
00531           }
00532           *last=request;
00533         }
00534       }
00535       i++;
00536     }
00537     // if there are multiple EPICS sources, sort them so that
00538     // the oldest is first in the list, so the newest data "wins"
00539     // when andy processes the non-event datagrams.
00540     sort(_pieces.nonEventDg.begin(),_pieces.nonEventDg.end());
00541   }
00542 
00543   typedef std::map<int,Pds::Src> epicsmap;
00544 
00545   // loop over XTC files, and store the corresponding DAQ index information
00546   // in the big IndexEvent table.  also take care of the tricky mapping
00547   // from epics "bitmask" to Src for each index file.
00548   void _storeIndex(const vector<XtcFileName> &xtclist, std::map<Pds::Src,int>& src2EpicsArray,
00549                    vector<epicsmap>& bit2SrcVec, vector<unsigned>& epicsmask) {
00550     bool ifirst = 1;
00551     int ifile = 0;
00552     int firststream = -1;
00553     for (std::vector<XtcFileName>::const_iterator it = xtclist.begin(); it!=xtclist.end(); ++it) {
00554       Pds::Index::IndexList idxlist;
00555       unsigned stream = (*it).stream();
00556       // get the DAQ index file, if it exists, otherwise ignore both idx/xtc files.
00557       if (_getidx(*it, idxlist)) continue;
00558       _xtc.add(*it);
00559       if (stream<80) {
00560         // store them in event table that includes DAQ data
00561         _store(_idx,idxlist.getL1(),ifile);
00562         // begincalibs are a little tricky, I believe.  in principle
00563         // a begincalib for an event could be in a previous chunk
00564         // so I think we need to put them in one big list for the
00565         // whole run and search (although we could also add them
00566         // to the one big IndexEvent table, like we do for epics data.
00567         // Only store the first stream's calibcycles, since all
00568         // streams are identical -cpo
00569         if (firststream==-1) {
00570           _store(_idxcalib,idxlist.getCalib(),ifile);
00571           firststream = stream;
00572         } else {
00573           if (stream==(unsigned)firststream)
00574             _store(_idxcalib,idxlist.getCalib(),ifile);
00575         }
00576 
00577         // epics is also tricky, because the ordering of the different
00578         // sources can change in the different DAQ index files.
00579         // store which array-offset we are using for this epics source.
00580         // also store the Pds::Src values in the same order, which
00581         // we will use to go lookup the epics data to attach to the requested event
00582         epicsmap bit2Src;
00583         epicsmask.push_back(_getEpicsBit2SrcMap(idxlist.getSeg(),bit2Src));
00584         if (ifirst) {
00585           ifirst = 0;
00586           int i=0;
00587           for (epicsmap::const_iterator it=bit2Src.begin(); it!=bit2Src.end(); ++it) {
00588             src2EpicsArray[it->second]=i;
00589             i++;
00590             _epicsSource.push_back(it->second);
00591           }
00592         }
00593         bit2SrcVec.push_back(bit2Src);
00594       } else {
00595         // store them in event table that includes ioc data
00596         _store(_idxioc,idxlist.getL1(),ifile);
00597       }
00598       ifile++;
00599     }
00600     _lastEpics.resize(_epicsSource.size());
00601 
00602     sort(_idx.begin(),_idx.end());
00603     sort(_idxcalib.begin(),_idxcalib.end());
00604     sort(_idxioc.begin(),_idxioc.end());
00605   }
00606 
00607   // go through the big IndexEvent table, and store nonEvent
00608   // datagram offsets to use for each epics source.
00609   void _updateEpics(std::map<Pds::Src,int>& src2EpicsArray,
00610                     vector<epicsmap>& bit2SrcVec, vector<unsigned>& epicsmask) {
00611     // put the epics offsets in the sorted index table
00612     vector<EpicsInfo> einfo(bit2SrcVec[0].size());
00613     for (vector<IndexEvent>::iterator itev = _idx.begin(); itev != _idx.end(); ++ itev) {
00614       unsigned detmask = ~itev->entry.uMaskDetData;
00615       int file = itev->file;
00616       if (detmask & epicsmask[file]) {
00617         // found an event with epics data, update the epics offsets
00618         for (epicsmap::const_iterator itsrc=bit2SrcVec[file].begin(); itsrc!=bit2SrcVec[file].end(); ++itsrc) {
00619           if (itsrc->first & detmask) {
00620             Pds::Src src = itsrc->second;
00621             einfo[src2EpicsArray[src]].offset = itev->entry.i64OffsetXtc;
00622             einfo[src2EpicsArray[src]].file = itev->file;
00623           }
00624         }
00625       }
00626       int i=0;
00627       // put the latest epics offsets in the official table
00628       for (vector<EpicsInfo>::iterator it = einfo.begin(); it != einfo.end();  ++it) {
00629         if (i>=IndexEvent::MaxEpicsSources)
00630           MsgLog(logger, fatal,"Too many epics sources: " << i+1 <<
00631                  ". Max allowed: " << IndexEvent::MaxEpicsSources);
00632         itev->einfo[i]=*it; // would be neater with vector, but less performant, I believe
00633         i++;
00634       }
00635     }
00636   }
00637 
00638 public:
00639 
00640   IndexRun(queue<DgramPieces>& queue, const vector<XtcFileName> &xtclist, bool allowCorruptEpics) :
00641     _xtc(), _beginrunOffset(0), _lastcalib(0,0), _queue(queue) {
00642 
00643     // store the index files in our table, and get some information about epics
00644     std::map<Pds::Src,int> src2EpicsArray;
00645     vector<epicsmap> bit2SrcVec;
00646     vector<unsigned> epicsmask;
00647     _storeIndex(xtclist,src2EpicsArray,bit2SrcVec,epicsmask);
00648     
00649     // update our table with the pointers to the appropriate epics event
00650     _updateEpics(src2EpicsArray, bit2SrcVec, epicsmask);
00651 
00652     // fill in the list of unique event times that the users will use to jump()
00653     _fillTimes();
00654     // send a configure transition
00655     _configure(allowCorruptEpics);
00656     // send a beginrun transition
00657     _beginrun();
00658   }
00659 
00660   ~IndexRun() {}
00661 
00662   // return vector of times that can be used for the "jump" method.
00663   void times(psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) const {
00664     begin = _times.begin();
00665     end = _times.end();
00666   }
00667 
00668   void times(unsigned step, psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) const {
00669     // if the last calibcycle has no events, the calibTimeIndex can have one fewer entries than the idxcalib
00670     if (_calibTimeIndex.size() < _idxcalib.size()-1 || _calibTimeIndex.size() > _idxcalib.size()) {
00671       MsgLog(logger, fatal, "Incorrect number of calibcycles: " << _calibTimeIndex.size() << " " << _idxcalib.size());
00672     }
00673     if (step>=_calibTimeIndex.size()) MsgLog(logger, fatal, "Requested step " << step << " not in range.  Number of steps in this run: " << _calibTimeIndex.size());
00674     begin = _times.begin()+_calibTimeIndex[step];
00675     if (step==_calibTimeIndex.size()-1) { // the last calibstep
00676       end = _times.end();
00677     } else {
00678       end = _times.begin()+_calibTimeIndex[step+1]; // the software "excludes" the last event
00679     }
00680   }
00681 
00682   unsigned nsteps() const {return _calibTimeIndex.size();}
00683 
00684   void endrun() {
00685     Pds::Dgram* dg = new Pds::Dgram;
00686     Pds::ClockTime ct;
00687     Pds::TimeStamp ts;
00688     Pds::Sequence seq(Pds::Sequence::Event,
00689                       Pds::TransitionId::EndRun,
00690                       ct, ts);
00691     dg->seq = seq;
00692     _postOneDg(dg,0);
00693   }
00694 
00695   // jump to an event
00696   // can't be a const method because it changes the "pieces" object
00697   int jump(uint64_t timestamp, uint32_t fiducial) {
00698     uint32_t seconds= (uint32_t)((timestamp&0xffffffff00000000)>>32);
00699     uint32_t nanoseconds= (uint32_t)(timestamp&0xffffffff);
00700     IndexEvent request(seconds,nanoseconds,fiducial);
00701     vector<IndexEvent>::iterator it;
00702     it = lower_bound(_idx.begin(),_idx.end(),request);
00703     Pds::Dgram* dg=0;
00704     if (*it==request) {
00705       _maybePostCalib(seconds,nanoseconds);
00706       _pieces.reset();
00707       // event-build split-events
00708       int ifirst = 1;
00709       while (it!=_idx.end() && *it==request) {
00710         dg = _xtc.jump((*it).file, (*it).entry.i64OffsetXtc);
00711         _add(dg,(*it).file);
00712         if (ifirst) {
00713           ifirst = 0;
00714           _maybeAddEpics(*it,_epicsSource,dg);
00715         }
00716         it++;
00717       }
00718       _maybeAddIoc(seconds,fiducial);
00719       _post();
00720       return 0;
00721     } else {
00722       return 1;
00723     }
00724   }
00725 
00726 private:
00727   IndexXtcReader           _xtc;
00728   vector<IndexEvent>       _idx;
00729   vector<IndexFiducial>    _idxioc;
00730   vector<Pds::Src>         _epicsSource;
00731   vector<IndexCalib>       _idxcalib;
00732   int64_t                  _beginrunOffset;
00733   IndexCalib               _lastcalib;
00734   vector<EpicsInfo>        _lastEpics;
00735   DgramPieces              _pieces;
00736   vector<psana::EventTime> _times;
00737   queue<DgramPieces>&      _queue;
00738   vector<unsigned>         _calibTimeIndex;
00739 };
00740 
00741 // above is the "private" implementation (class IndexRun), below this is the
00742 // "public" implementation (class Index)
00743 
00744 Index::Index(const std::string& name, std::queue<DgramPieces>& queue) : Configurable(name), _queue(queue),_idxrun(0),_run(-1) {
00745   _fileNames = configList("files");
00746   if ( _fileNames.empty() ) MsgLog(logger, fatal, "Empty file list");
00747   _rmap = new RunMap(_fileNames);
00748   _allowCorruptEpics = false;
00749 }
00750 
00751 void Index::allowCorruptEpics() {_allowCorruptEpics = true;}
00752 
00753 Index::~Index() {
00754   delete _idxrun;
00755   delete _rmap;
00756 }
00757 
00758 int Index::jump(psana::EventTime time) {
00759   return _idxrun->jump(time.time(),time.fiducial());
00760 }
00761 
00762 void Index::times(psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) {
00763   _idxrun->times(begin,end);
00764 }
00765 
00766 void Index::times(unsigned step, psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) {
00767   _idxrun->times(step, begin, end);
00768 }
00769 
00770 void Index::setrun(int run) {
00771   // we can be called twice for the same run, because
00772   // at beginJob we "prefetch" the first configure transition
00773   // and then we will get another setrun from the run iterator
00774   if (run==_run) return;
00775   _run=run;
00776   if (not _rmap->runFiles.count(run)) MsgLog(logger, fatal, "Run " << run << " not found");
00777   delete _idxrun;
00778   _idxrun = new IndexRun(_queue,_rmap->runFiles[run],_allowCorruptEpics);
00779 }
00780 
00781 unsigned Index::nsteps() {
00782   return _idxrun->nsteps();
00783 }
00784 
00785 void Index::end() {
00786   _idxrun->endrun();
00787 }
00788 
00789 const std::vector<unsigned>& Index::runs() {
00790   return _rmap->runs;
00791 }
00792 
00793 } // namespace PSXtcInput

Generated on 19 Dec 2016 for PSANAclasses by  doxygen 1.4.7