00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "PSXtcInput/Index.h"
00017
00018
00019
00020
00021 #include <algorithm>
00022 #include <vector>
00023 #include <queue>
00024 #include <map>
00025 #include <string>
00026 #include <iomanip>
00027 #include <fcntl.h>
00028 #include <stdlib.h>
00029 #include <sstream>
00030
00031
00032
00033
00034 #include "MsgLogger/MsgLogger.h"
00035 #include "PSXtcInput/Exceptions.h"
00036 #include "pdsdata/index/IndexFileStruct.hh"
00037 #include "pdsdata/index/IndexFileReader.hh"
00038 #include "pdsdata/index/IndexList.hh"
00039 #include "pdsdata/xtc/Sequence.hh"
00040 #include "IData/Dataset.h"
00041 #include "XtcInput/XtcFileName.h"
00042 #include "pdsdata/xtc/XtcIterator.hh"
00043 #include "pdsdata/psddl/epics.ddl.h"
00044
00045
00046
00047
00048
00049 using namespace XtcInput;
00050 using namespace std;
00051
00052
00053
00054
00055
namespace {
// Logger name handed to every MsgLog() call in this file.
const char* logger = "PSXtcInput::Index";
}
00059
00060 namespace PSXtcInput {
00061
00062
00063
00064
00065 class RunMap {
00066 public:
00067 std::vector<XtcInput::XtcFileName> files;
00068 typedef std::map<unsigned, std::vector<XtcInput::XtcFileName> > map;
00069 std::vector<unsigned> runs;
00070 map runFiles;
00071
00072 RunMap(std::vector<std::string> &m_fileNames) {
00073
00074
00075
00076
00077
00078 typedef std::vector<std::string> FileList;
00079
00080
00081 for (FileList::const_iterator it = m_fileNames.begin(); it != m_fileNames.end(); ++ it) {
00082
00083 IData::Dataset ds(*it);
00084 if (ds.exists("live")) MsgLog(logger, fatal, "Live mode not supported with xtc indexing");
00085
00086 if (ds.isFile()) {
00087
00088
00089 files.push_back(XtcInput::XtcFileName(*it));
00090
00091 } else {
00092
00093
00094 const IData::Dataset::NameList& strfiles = ds.files();
00095 if (strfiles.empty()) MsgLog(logger, fatal, "Empty file list");
00096 for (IData::Dataset::NameList::const_iterator it = strfiles.begin(); it != strfiles.end(); ++ it) {
00097 XtcInput::XtcFileName file(*it);
00098 files.push_back(file);
00099 }
00100 }
00101
00102 sort(files.begin(),files.end());
00103
00104
00105 for (std::vector<XtcInput::XtcFileName>::const_iterator it = files.begin(); it != files.end(); ++ it) {
00106 runFiles[it->run()].push_back(*it);
00107 }
00108 for (map::const_iterator it = runFiles.begin(); it != runFiles.end(); ++ it) {
00109 runs.push_back(it->first);
00110 }
00111 }
00112 }
00113 };
00114
00115
00116
00117 class IndexXtcReader {
00118 public:
00119 IndexXtcReader() {}
00120
00121 void add(const XtcFileName& xtcfile) {
00122 int fd = ::open(xtcfile.path().c_str(), O_RDONLY | O_LARGEFILE);
00123 if (fd==-1) MsgLog(logger, fatal,
00124 "File " << xtcfile.path().c_str() << " not found");
00125 _xtcFileList.push_back(xtcfile);
00126 _fd.push_back(fd);
00127 }
00128
00129 ~IndexXtcReader() {
00130 for (vector<int>::const_iterator it = _fd.begin(); it!= _fd.end(); it++)
00131 ::close(*it);
00132 }
00133
00134 const vector<XtcFileName>& files() {return _xtcFileList;}
00135
00136 Pds::Dgram* jump(int file, int64_t offset) {
00137 int64_t found = lseek64(_fd[file],offset, SEEK_SET);
00138 if (found != offset) {
00139 stringstream ss;
00140 ss << "Jump to offset " << offset << " failed";
00141 MsgLog(logger, error, ss.str());
00142 throw IndexSeekFailed(ERR_LOC);
00143 }
00144 Pds::Dgram dghdr;
00145 if (::read(_fd[file], &dghdr, sizeof(dghdr))==0) {
00146 return 0;
00147 } else {
00148 if (dghdr.xtc.sizeofPayload()>MaxDgramSize)
00149 MsgLog(logger, fatal, "Datagram size exceeds sanity check. Size: " << dghdr.xtc.sizeofPayload() << " Limit: " << MaxDgramSize);
00150 Pds::Dgram* dg = (Pds::Dgram*)new char[sizeof(dghdr)+dghdr.xtc.sizeofPayload()];
00151 *dg = dghdr;
00152 ::read(_fd[file], dg->xtc.payload(), dg->xtc.sizeofPayload());
00153 return dg;
00154 }
00155 }
00156
00157 private:
00158 enum {MaxDgramSize=0x2000000};
00159 vector<int> _fd;
00160 vector<XtcFileName> _xtcFileList;
00161 };
00162
00163
00164
00165
00166
/// Location of a datagram: a byte offset within a numbered file.
/// A default-constructed object has both fields set to -1.
class EpicsInfo {
public:
  EpicsInfo() : offset(-1), file(-1) {}

  int64_t offset;
  int file;

  /// Two locations are equal when both offset and file number agree.
  bool operator==(const EpicsInfo& rhs) const {
    if (offset != rhs.offset) return false;
    return file == rhs.file;
  }

  bool operator!=(const EpicsInfo& rhs) const {
    return offset != rhs.offset || file != rhs.file;
  }
};
00175
00176
00177
00178
/**
 *  An index-file entry (entry) together with the number of the file it
 *  came from (file). Ordering and equality look only at entry.time().
 *
 *  T must provide uSeconds, uNanoseconds and time().
 */
template<typename T>
class IndexBase {
public:
  T entry;
  int file;

  /// Marks the entry as not yet belonging to any file.
  void _init() {
    file=-1;
  }

  virtual ~IndexBase() {}

  /// file starts at -1 here as well, so a default-constructed object never
  /// carries an indeterminate file number.
  IndexBase() : file(-1) {}

  IndexBase(uint32_t seconds, uint32_t nanoseconds) : file(-1) {
    entry.uSeconds=seconds;
    entry.uNanoseconds=nanoseconds;
  }

  bool operator<(const IndexBase& other) const {
    return entry.time()<other.entry.time();
  }
  bool operator==(const IndexBase& other) const {
    return entry.time()==other.entry.time();
  }
  bool operator!=(const IndexBase& other) const {
    return !(*this==other);
  }
};
00204
// Index entries built from the calib and L1Accept node types of the
// index file, each paired with a file number by IndexBase.
typedef IndexBase<Pds::Index::CalibNode> IndexCalib;
typedef IndexBase<Pds::Index::L1AcceptNode> IndexUnixTime;
00207
00208
00209
00210
00211 class IndexFiducial : public IndexUnixTime {
00212 public:
00213 IndexFiducial() {}
00214 IndexFiducial(uint32_t seconds, uint32_t nanoseconds, uint32_t fiducial) : IndexUnixTime(seconds,nanoseconds) {
00215 entry.uFiducial=fiducial;
00216 }
00217 };
00218
00219 class IndexEvent : public IndexFiducial {
00220 public:
00221 enum {MaxEpicsSources=6};
00222
00223
00224 EpicsInfo einfo[MaxEpicsSources];
00225 virtual ~IndexEvent() {}
00226 IndexEvent() {}
00227 IndexEvent(uint32_t seconds, uint32_t nanoseconds, uint32_t fiducial) : IndexFiducial(seconds,nanoseconds,fiducial) {}
00228 bool operator<(const IndexEvent& other) const {
00229 if (entry.time()==other.entry.time())
00230 return entry.uFiducial<other.entry.uFiducial;
00231 else
00232 return entry.time()<other.entry.time();
00233 }
00234 bool operator==(const IndexEvent& other) const {
00235 return entry.time()==other.entry.time() && entry.uFiducial==other.entry.uFiducial;
00236 }
00237 bool operator!=(const IndexEvent& other) const {
00238 return !(*this==other);
00239 }
00240 };
00241
00242 ostream& operator<<(ostream& os, const IndexEvent& idx) {
00243 os << "time " << std::hex << idx.entry.uSeconds << "/" << idx.entry.uNanoseconds << " fiducial " << idx.entry.uFiducial << ", filenum " << idx.file;
00244 return os;
00245 }
00246
00247 ostream& operator<<(ostream& os, const IndexCalib& idx) {
00248 os << "time " << std::hex << idx.entry.uSeconds << "/" << idx.entry.uNanoseconds;
00249 return os;
00250 }
00251
00252
00253
00254 class myLevelIter : public XtcIterator {
00255 public:
00256 enum {Stop, Continue};
00257 myLevelIter(Xtc* xtc, bool allowCorruptEpics) :
00258 XtcIterator(xtc), _allowCorruptEpics(allowCorruptEpics) {}
00259
00260 void process(const Epics::ConfigV1& e) {
00261 int32_t numpv = e.numPv();
00262 ndarray<const Epics::PvConfigV1, 1> pv = e.getPvConfig();
00263 float interval = pv[0].interval();
00264 for (int i=1;i<numpv;i++) {
00265 float nextinterval = pv[i].interval();
00266 if (interval != nextinterval) {
00267 if (_allowCorruptEpics) {
00268 MsgLog(logger, warning,
00269 "Index mode does not support multiple epics archiving "
00270 "intervals."
00271 << " Variable 0 interval: " << interval
00272 << ", Variable " << i << " interval " << nextinterval);
00273 } else {
00274 MsgLog(logger, fatal,
00275 "Index mode does not support multiple epics archiving intervals."
00276 << " Variable 0 interval: " << interval
00277 << ", Variable " << i << " interval " << nextinterval
00278 << endl << " This error can be turned into a warning "
00279 "by setting the parameter ""allow-corrupt-epics"" "
00280 "in the [psana] section of the configuration file, "
00281 "but slow EPICS data will be corrupt."
00282 );
00283 }
00284 break;
00285 }
00286 }
00287 }
00288 int process(Xtc* xtc) {
00289 Level::Type level = xtc->src.level();
00290 if (level < 0 || level >= Level::NumberOfLevels )
00291 {
00292 MsgLog(logger, warning,
00293 "Unsupported level " << (int) level);
00294 return Continue;
00295 }
00296 switch (xtc->contains.id()) {
00297 case (TypeId::Id_Xtc) : {
00298 myLevelIter iter(xtc,_allowCorruptEpics);
00299 iter.iterate();
00300 break;
00301 }
00302 case (TypeId::Id_EpicsConfig) :
00303 process(*(const Epics::ConfigV1*)(xtc->payload()));
00304 break;
00305 default:
00306 break;
00307 }
00308 return Continue;
00309 }
00310 private:
00311 bool _allowCorruptEpics;
00312 };
00313
00314
00315
00316
00317 class IndexRun {
00318 private:
00319
00320
00321
00322 void _checkEpicsInterval(Pds::Dgram* dg, bool allowCorruptEpics) {
00323 myLevelIter iter(&(dg->xtc),allowCorruptEpics);
00324 iter.iterate();
00325 }
00326
00327
00328 bool _getidx(const XtcFileName &xtcfile, Pds::Index::IndexList& idxlist) {
00329 string idxname = xtcfile.path();
00330 string basename = xtcfile.basename();
00331 size_t pos = idxname.find(basename,0);
00332 idxname.insert(pos,"index/");
00333 idxname.append(".idx");
00334 int fd = open(idxname.c_str(), O_RDONLY | O_LARGEFILE);
00335 if (fd < 0) {
00336 MsgLog(logger, warning, "Unable to open xtc index file " << idxname);
00337 return 1;
00338 }
00339 idxlist.readFromFile(fd);
00340 ::close(fd);
00341 return 0;
00342 }
00343
00344
00345
00346
00347
00348
00349 unsigned _getEpicsBit2SrcMap(const Pds::Index::IndexList::TSegmentToIdMap& seg, std::map<int,Pds::Src> &bit2Src) {
00350 unsigned mask = 0;
00351 for (Pds::Index::IndexList::TSegmentToIdMap::const_iterator it=seg.begin(); it!=seg.end(); ++it) {
00352 const Pds::Index::L1SegmentId::TTypeList& type = it->second.typeList;
00353 const Pds::Index::L1SegmentId::TSrcList& src = it->second.srcList;
00354 Pds::Index::L1SegmentId::TSrcList::const_iterator itsrc=src.begin();
00355 for (Pds::Index::L1SegmentId::TTypeList::const_iterator ittype=type.begin(); ittype!=type.end(); ++ittype) {
00356 Pds::TypeId::Type type = (*ittype).id();
00357 int index = it->second.iIndex;
00358 Pds::Src src = *itsrc;
00359 if (type==Pds::TypeId::Id_Epics) {
00360 bit2Src[1<<index]=src;
00361 mask |= 1<<index;
00362 }
00363 ++itsrc;
00364 }
00365 }
00366 return mask;
00367 }
00368
00369
00370
// Appends one element to idx for every element of add: the new element's
// entry is a copy of the added value and its file is ifile.
//
// T1 needs size(), resize() and operator[] yielding objects with entry and
// file members; T2 needs size() and operator[].
// Sizes and positions are kept as size_t so large lists are not truncated
// to int.
template <typename T1, typename T2>
void _store(T1 &idx, const T2 &add, std::vector<std::string>::size_type ifile) {
  const std::size_t numadd = add.size();
  const std::size_t base = idx.size();
  idx.resize(base + numadd);
  for (std::size_t i = 0; i < numadd; ++i) {
    idx[base + i].entry = add[i];
    idx[base + i].file = static_cast<int>(ifile);
  }
}
00382
00383
00384
00385
00386 void _fillTimes() {
00387 IndexEvent last(0,0,0);
00388
00389 _calibTimeIndex.clear();
00390 std::vector<IndexCalib>::const_iterator caliter = _idxcalib.begin();
00391 for (vector<IndexEvent>::iterator itev = _idx.begin(); itev != _idx.end(); ++ itev) {
00392 if (*itev!=last) {
00393 _times.push_back(psana::EventTime(itev->entry.time(),itev->entry.uFiducial));
00394 last = *itev;
00395 if (caliter != _idxcalib.end()) {
00396 if (itev->entry.time() > caliter->entry.time()) {
00397
00398
00399 _calibTimeIndex.push_back(_times.size()-1);
00400 ++caliter;
00401 }
00402 }
00403 }
00404 }
00405 }
00406
00407
00408
00409 void _add(Pds::Dgram* dg, int file) {
00410 _pieces.eventDg.push_back(XtcInput::Dgram(XtcInput::Dgram::make_ptr(dg),_xtc.files()[file]));
00411 }
00412
00413
00414
// Pushes a copy of the currently assembled pieces onto the output queue.
void _post() {
  _queue.push(_pieces);
}
00418
00419
00420 void _post(Pds::Dgram* dg, int file) {
00421 _add(dg, file);
00422 _post();
00423 }
00424
00425
00426 void _postOneDg(Pds::Dgram* dg, int file) {
00427 _pieces.reset();
00428 if (dg) _post(dg, file);
00429 }
00430
00431 void _addIocConfigure() {
00432 Pds::Dgram* dg=0;
00433 int file = 0;
00434 for (vector<XtcFileName>::const_iterator it = _xtc.files().begin();
00435 it != _xtc.files().end(); it++,file++) {
00436 if ((*it).stream()>=80 && (*it).chunk()==0) {
00437 dg = _xtc.jump(file, 0);
00438 if (dg->seq.service()!=Pds::TransitionId::Configure) {
00439 MsgLog(logger, fatal, "Configure transition not found at beginning of file" << (*it));
00440 }
00441 _add(dg,file);
00442 }
00443 }
00444 }
00445
00446
00447
00448
00449 void _configure(bool allowCorruptEpics) {
00450
00451 _pieces.reset();
00452
00453 int64_t offset = 0;
00454 for (int i=0; i<2; i++) {
00455 Pds::Dgram* dg = _xtc.jump(0, offset);
00456 if (dg->seq.service()==Pds::TransitionId::Configure) {
00457 _checkEpicsInterval(dg,allowCorruptEpics);
00458 _addIocConfigure();
00459 _post(dg,0);
00460 _beginrunOffset = dg->xtc.sizeofPayload()+sizeof(Pds::Dgram);
00461 return;
00462 }
00463 offset+=dg->xtc.sizeofPayload()+sizeof(Pds::Dgram);
00464 }
00465 MsgLog(logger, fatal, "Configure transition not found in first 2 datagrams");
00466 }
00467
00468
00469 void _beginrun() {
00470 Pds::Dgram* dg = _xtc.jump(0, _beginrunOffset);
00471 if (dg->seq.service()!=Pds::TransitionId::BeginRun)
00472 MsgLog(logger, fatal, "BeginRun transition not found after configure transition");
00473 _postOneDg(dg,0);
00474 }
00475
00476
00477
00478
00479 void _maybePostCalib(uint32_t seconds, uint32_t nanoseconds) {
00480 IndexCalib request(seconds,nanoseconds);
00481 vector<IndexCalib>::iterator it;
00482 it = lower_bound(_idxcalib.begin(),_idxcalib.end(),request);
00483 if (it==_idxcalib.begin()) {
00484 MsgLog(logger, warning, "Calib cycle for event time " << seconds << "/" << nanoseconds << " earlier than first calib-cycle time " << _idxcalib.begin()->entry.uSeconds << "/" << _idxcalib.begin()->entry.uNanoseconds << ". Sending first calib cycle. DAQ scan information may be incorrect.");
00485 it+=1;
00486 }
00487 vector<IndexCalib>::iterator calib;
00488 calib=it-1;
00489 if (*calib!=_lastcalib) {
00490
00491
00492 _postOneDg(_xtc.jump((*calib).file, (*calib).entry.i64Offset),(*calib).file);
00493 _lastcalib=*calib;
00494 }
00495 }
00496
00497
00498
00499
00500 void _maybeAddIoc(uint32_t seconds, uint32_t fiducial) {
00501 const unsigned window = 5;
00502 vector<IndexFiducial>::iterator it;
00503 it = lower_bound(_idxioc.begin(),_idxioc.end(),IndexFiducial(seconds-window,0,0));
00504 while (it!=_idxioc.end()) {
00505 if (it->entry.uSeconds>(seconds+window)) return;
00506 if (it->entry.uFiducial==fiducial) {
00507 Pds::Dgram* dg=0;
00508 dg = _xtc.jump((*it).file, (*it).entry.i64OffsetXtc);
00509 _add(dg,(*it).file);
00510 }
00511 it++;
00512 }
00513 }
00514
00515
00516
00517
00518 void _maybeAddEpics(const IndexEvent& evt, const vector<Pds::Src>& src, Pds::Dgram* dg) {
00519 int i=0;
00520 for (vector<EpicsInfo>::iterator last =_lastEpics.begin(); last!=_lastEpics.end(); last++) {
00521 EpicsInfo request = evt.einfo[i];
00522
00523 if (*last != request) {
00524
00525 if (request.file != evt.file || request.offset != evt.entry.i64OffsetXtc) {
00526 Pds::Dgram* epicsdg = _xtc.jump(request.file,request.offset);
00527 if (epicsdg) {
00528 _pieces.nonEventDg.push_back(XtcInput::Dgram(XtcInput::Dgram::make_ptr(epicsdg),_xtc.files()[request.file]));
00529 } else {
00530 MsgLog(logger, fatal, "Epics data not found at offset" << request.offset);
00531 }
00532 *last=request;
00533 }
00534 }
00535 i++;
00536 }
00537
00538
00539
00540 sort(_pieces.nonEventDg.begin(),_pieces.nonEventDg.end());
00541 }
00542
00543 typedef std::map<int,Pds::Src> epicsmap;
00544
00545
00546
00547
00548 void _storeIndex(const vector<XtcFileName> &xtclist, std::map<Pds::Src,int>& src2EpicsArray,
00549 vector<epicsmap>& bit2SrcVec, vector<unsigned>& epicsmask) {
00550 bool ifirst = 1;
00551 int ifile = 0;
00552 int firststream = -1;
00553 for (std::vector<XtcFileName>::const_iterator it = xtclist.begin(); it!=xtclist.end(); ++it) {
00554 Pds::Index::IndexList idxlist;
00555 unsigned stream = (*it).stream();
00556
00557 if (_getidx(*it, idxlist)) continue;
00558 _xtc.add(*it);
00559 if (stream<80) {
00560
00561 _store(_idx,idxlist.getL1(),ifile);
00562
00563
00564
00565
00566
00567
00568
00569 if (firststream==-1) {
00570 _store(_idxcalib,idxlist.getCalib(),ifile);
00571 firststream = stream;
00572 } else {
00573 if (stream==(unsigned)firststream)
00574 _store(_idxcalib,idxlist.getCalib(),ifile);
00575 }
00576
00577
00578
00579
00580
00581
00582 epicsmap bit2Src;
00583 epicsmask.push_back(_getEpicsBit2SrcMap(idxlist.getSeg(),bit2Src));
00584 if (ifirst) {
00585 ifirst = 0;
00586 int i=0;
00587 for (epicsmap::const_iterator it=bit2Src.begin(); it!=bit2Src.end(); ++it) {
00588 src2EpicsArray[it->second]=i;
00589 i++;
00590 _epicsSource.push_back(it->second);
00591 }
00592 }
00593 bit2SrcVec.push_back(bit2Src);
00594 } else {
00595
00596 _store(_idxioc,idxlist.getL1(),ifile);
00597 }
00598 ifile++;
00599 }
00600 _lastEpics.resize(_epicsSource.size());
00601
00602 sort(_idx.begin(),_idx.end());
00603 sort(_idxcalib.begin(),_idxcalib.end());
00604 sort(_idxioc.begin(),_idxioc.end());
00605 }
00606
00607
00608
00609 void _updateEpics(std::map<Pds::Src,int>& src2EpicsArray,
00610 vector<epicsmap>& bit2SrcVec, vector<unsigned>& epicsmask) {
00611
00612 vector<EpicsInfo> einfo(bit2SrcVec[0].size());
00613 for (vector<IndexEvent>::iterator itev = _idx.begin(); itev != _idx.end(); ++ itev) {
00614 unsigned detmask = ~itev->entry.uMaskDetData;
00615 int file = itev->file;
00616 if (detmask & epicsmask[file]) {
00617
00618 for (epicsmap::const_iterator itsrc=bit2SrcVec[file].begin(); itsrc!=bit2SrcVec[file].end(); ++itsrc) {
00619 if (itsrc->first & detmask) {
00620 Pds::Src src = itsrc->second;
00621 einfo[src2EpicsArray[src]].offset = itev->entry.i64OffsetXtc;
00622 einfo[src2EpicsArray[src]].file = itev->file;
00623 }
00624 }
00625 }
00626 int i=0;
00627
00628 for (vector<EpicsInfo>::iterator it = einfo.begin(); it != einfo.end(); ++it) {
00629 if (i>=IndexEvent::MaxEpicsSources)
00630 MsgLog(logger, fatal,"Too many epics sources: " << i+1 <<
00631 ". Max allowed: " << IndexEvent::MaxEpicsSources);
00632 itev->einfo[i]=*it;
00633 i++;
00634 }
00635 }
00636 }
00637
00638 public:
00639
// Builds the index for one run and queues its first two transitions.
//
// queue              output queue; a reference to it is kept
// xtclist            xtc files of the run
// allowCorruptEpics  forwarded to the EPICS interval check in _configure()
IndexRun(queue<DgramPieces>& queue, const vector<XtcFileName> &xtclist, bool allowCorruptEpics) :
  _xtc(), _beginrunOffset(0), _lastcalib(0,0), _queue(queue) {

  // read the index files and open the xtc files
  std::map<Pds::Src,int> src2EpicsArray;
  vector<epicsmap> bit2SrcVec;
  vector<unsigned> epicsmask;
  _storeIndex(xtclist,src2EpicsArray,bit2SrcVec,epicsmask);

  // record per event where its EPICS data is located
  _updateEpics(src2EpicsArray, bit2SrcVec, epicsmask);

  // build the list of event times and the calib-step boundaries
  _fillTimes();

  // queue the Configure transition(s)
  _configure(allowCorruptEpics);

  // queue the BeginRun transition
  _beginrun();
}

~IndexRun() {}
00661
00662
// Sets begin/end to the range covering all event times of the run.
void times(psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) const {
  begin = _times.begin();
  end = _times.end();
}
00667
00668 void times(unsigned step, psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) const {
00669
00670 if (_calibTimeIndex.size() < _idxcalib.size()-1 || _calibTimeIndex.size() > _idxcalib.size()) {
00671 MsgLog(logger, fatal, "Incorrect number of calibcycles: " << _calibTimeIndex.size() << " " << _idxcalib.size());
00672 }
00673 if (step>=_calibTimeIndex.size()) MsgLog(logger, fatal, "Requested step " << step << " not in range. Number of steps in this run: " << _calibTimeIndex.size());
00674 begin = _times.begin()+_calibTimeIndex[step];
00675 if (step==_calibTimeIndex.size()-1) {
00676 end = _times.end();
00677 } else {
00678 end = _times.begin()+_calibTimeIndex[step+1];
00679 }
00680 }
00681
// Number of calib-step boundaries recorded by _fillTimes().
unsigned nsteps() const {return _calibTimeIndex.size();}
00683
00684 void endrun() {
00685 Pds::Dgram* dg = new Pds::Dgram;
00686 Pds::ClockTime ct;
00687 Pds::TimeStamp ts;
00688 Pds::Sequence seq(Pds::Sequence::Event,
00689 Pds::TransitionId::EndRun,
00690 ct, ts);
00691 dg->seq = seq;
00692 _postOneDg(dg,0);
00693 }
00694
00695
00696
00697 int jump(uint64_t timestamp, uint32_t fiducial) {
00698 uint32_t seconds= (uint32_t)((timestamp&0xffffffff00000000)>>32);
00699 uint32_t nanoseconds= (uint32_t)(timestamp&0xffffffff);
00700 IndexEvent request(seconds,nanoseconds,fiducial);
00701 vector<IndexEvent>::iterator it;
00702 it = lower_bound(_idx.begin(),_idx.end(),request);
00703 Pds::Dgram* dg=0;
00704 if (*it==request) {
00705 _maybePostCalib(seconds,nanoseconds);
00706 _pieces.reset();
00707
00708 int ifirst = 1;
00709 while (it!=_idx.end() && *it==request) {
00710 dg = _xtc.jump((*it).file, (*it).entry.i64OffsetXtc);
00711 _add(dg,(*it).file);
00712 if (ifirst) {
00713 ifirst = 0;
00714 _maybeAddEpics(*it,_epicsSource,dg);
00715 }
00716 it++;
00717 }
00718 _maybeAddIoc(seconds,fiducial);
00719 _post();
00720 return 0;
00721 } else {
00722 return 1;
00723 }
00724 }
00725
00726 private:
00727 IndexXtcReader _xtc;
00728 vector<IndexEvent> _idx;
00729 vector<IndexFiducial> _idxioc;
00730 vector<Pds::Src> _epicsSource;
00731 vector<IndexCalib> _idxcalib;
00732 int64_t _beginrunOffset;
00733 IndexCalib _lastcalib;
00734 vector<EpicsInfo> _lastEpics;
00735 DgramPieces _pieces;
00736 vector<psana::EventTime> _times;
00737 queue<DgramPieces>& _queue;
00738 vector<unsigned> _calibTimeIndex;
00739 };
00740
00741
00742
00743
// Reads the "files" configuration list and groups the files by run.
// No run is selected yet: _idxrun is null and _run is -1 until setrun().
Index::Index(const std::string& name, std::queue<DgramPieces>& queue) : Configurable(name), _queue(queue),_idxrun(0),_run(-1) {
  _fileNames = configList("files");
  if ( _fileNames.empty() ) MsgLog(logger, fatal, "Empty file list");
  _rmap = new RunMap(_fileNames);
  _allowCorruptEpics = false;
}
00750
// Sets the flag that is handed to IndexRun objects created by later
// setrun() calls.
void Index::allowCorruptEpics() {_allowCorruptEpics = true;}
00752
// Releases the per-run index (may be null) and the run map.
Index::~Index() {
  delete _idxrun;
  delete _rmap;
}
00757
00758 int Index::jump(psana::EventTime time) {
00759 return _idxrun->jump(time.time(),time.fiducial());
00760 }
00761
00762 void Index::times(psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) {
00763 _idxrun->times(begin,end);
00764 }
00765
00766 void Index::times(unsigned step, psana::Index::EventTimeIter& begin, psana::Index::EventTimeIter& end) {
00767 _idxrun->times(step, begin, end);
00768 }
00769
00770 void Index::setrun(int run) {
00771
00772
00773
00774 if (run==_run) return;
00775 _run=run;
00776 if (not _rmap->runFiles.count(run)) MsgLog(logger, fatal, "Run " << run << " not found");
00777 delete _idxrun;
00778 _idxrun = new IndexRun(_queue,_rmap->runFiles[run],_allowCorruptEpics);
00779 }
00780
00781 unsigned Index::nsteps() {
00782 return _idxrun->nsteps();
00783 }
00784
00785 void Index::end() {
00786 _idxrun->endrun();
00787 }
00788
// Run numbers found among the configured input files.
const std::vector<unsigned>& Index::runs() {
  return _rmap->runs;
}
00792
00793 }