PSXtcInput/src/DgramSourceFile.cpp

Go to the documentation of this file.
00001 //--------------------------------------------------------------------------
00002 // File and Version Information:
00003 //     $Id: DgramSourceFile.cpp 10730 2015-09-23 22:06:21Z davidsch@SLAC.STANFORD.EDU $
00004 //
00005 // Description:
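00006 //     Class DgramSourceFile: datagram source that starts a DgramReader thread over the configured input files and groups the queued datagrams into events.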
00007 //
00008 // Author List:
00009 //     Andy Salnikov
00010 //
00011 //------------------------------------------------------------------------
00012 
00013 //-----------------------
00014 // This Class's Header --
00015 //-----------------------
00016 #include "PSXtcInput/DgramSourceFile.h"
00017 
00018 //-----------------
00019 // C/C++ Headers --
00020 //-----------------
00021 #include <algorithm>
00022 
00023 //-------------------------------
00024 // Collaborating Class Headers --
00025 //-------------------------------
00026 #include "MsgLogger/MsgLogger.h"
00027 #include "PSXtcInput/Exceptions.h"
00028 #include "XtcInput/DgramQueue.h"
00029 #include "XtcInput/DgramReader.h"
00030 #include "XtcInput/MergeMode.h"
00031 #include "XtcInput/XtcFilesPosition.h"
00032 
00033 //-----------------------------------------------------------------------
00034 // Local Macros, Typedefs, Structures, Unions and Forward Declarations --
00035 //-----------------------------------------------------------------------
00036 
00037 using namespace XtcInput;
00038 
00039 //             ----------------------------------------
00040 //             -- Public Function Member Definitions --
00041 //             ----------------------------------------
00042 
00043 namespace {
00044 
00045   const unsigned MAX_SEC_DRIFT_FOR_FIDUCIAL_MATCH = 90;
00046 
00047   bool isL1Accept(const XtcInput::Dgram &dg) {
00048     if (dg.empty()) return false;
00049     XtcInput::Dgram::ptr pDg = dg.dg();
00050     return pDg->seq.service() == Pds::TransitionId::L1Accept;
00051   }
00052 
00053   bool transitionsMatch(const XtcInput::Dgram &dgA, const XtcInput::Dgram &dgB) {
00054     if (dgA.empty() or dgB.empty()) return false;
00055     XtcInput::Dgram::ptr pDgA = dgA.dg();
00056     XtcInput::Dgram::ptr pDgB = dgB.dg();
00057     return pDgA->seq.service() == pDgB->seq.service();
00058   }
00059 
00060   bool isFiducialMatchStream(const XtcInput::Dgram &dg, const int firstControlStream) {
00061     if (dg.empty()) return false;
00062     const XtcFileName& fileName = dg.file();
00063     if (fileName.empty()) return false;
00064     return (int(fileName.stream()) >= firstControlStream);
00065   }
00066   
00067   unsigned absDiff(unsigned a, unsigned b) {
00068     if (b >= a) return b-a;
00069     return a-b;
00070   }
00071 
00072   bool clockTimesMatch(const XtcInput::Dgram &dgA, const XtcInput::Dgram &dgB) {
00073     if (dgA.empty() or dgB.empty()) return false;
00074     XtcInput::Dgram::ptr pDgA = dgA.dg();
00075     XtcInput::Dgram::ptr pDgB = dgB.dg();
00076     unsigned secA = pDgA->seq.clock().seconds();
00077     unsigned secB = pDgB->seq.clock().seconds();
00078     if (secA != secB) return false;
00079     unsigned nanoA = pDgA->seq.clock().nanoseconds();
00080     unsigned nanoB = pDgB->seq.clock().nanoseconds();
00081     return nanoA == nanoB;
00082   }
00083 
00084 }
00085 
00086 namespace PSXtcInput {
00087 
00088 //----------------
00089 // Constructors --
00090 //----------------
00091 DgramSourceFile::DgramSourceFile (const std::string& name)
00092   : IDatagramSource()
00093   , psana::Configurable(name)
00094   , m_dgQueue(new XtcInput::DgramQueue(10))
00095   , m_readerThread()
00096   , m_fileNames()
00097   , m_firstControlStream(80)
00098 
00099 {
00100   m_fileNames = configList("files");
00101   if ( m_fileNames.empty() ) {
00102     throw EmptyFileList(ERR_LOC);
00103   }
00104 }
00105 
00106 //--------------
00107 // Destructor --
00108 //--------------
00109 DgramSourceFile::~DgramSourceFile ()
00110 {
00111   if (m_readerThread) {
00112     // ask the thread to stop
00113     m_readerThread->interrupt();
00114     MsgLog(name(), debug, "wait for reader thread to finish");
00115     // wait until it does
00116     m_readerThread->join();
00117     MsgLog(name(), debug, "reader thread has finished");
00118   }
00119 }
00120 
00121 // Initialization method for datagram source
00122 void 
00123 DgramSourceFile::init()
00124 {
00125   // will throw if no files were defined in config
00126   WithMsgLog(name(), debug, str) {
00127     str << "Input files: ";
00128     std::copy(m_fileNames.begin(), m_fileNames.end(), std::ostream_iterator<std::string>(str, " "));
00129   }
00130   
00131   // start reader thread
00132   std::string liveDbConn = configStr("liveDbConn", "");
00133   std::string liveTable = configStr("liveTable", "file");
00134   unsigned liveTimeout = config("liveTimeout", 120U);
00135   unsigned runLiveTimeout = config("runLiveTimeout", 0U);
00136   double l1offset = config("l1offset", 0.0);
00137   MergeMode merge = mergeMode(configStr("mergeMode", "FileName"));
00138   m_firstControlStream = config("first_control_stream",80);
00139   m_maxStreamClockDiffSec = config("max_stream_clock_diff",85);
00140 
00141   std::list<off64_t> emptyOffsets;
00142   std::list<off64_t> offsets =  configList("third_event_jump_offsets",emptyOffsets);
00143   std::list<std::string> emptyStrings;
00144   std::list<std::string> filenames = configList("third_event_jump_filenames",emptyStrings);
00145 
00146   boost::shared_ptr<XtcFilesPosition> firstEventAfterConfigure;
00147   if ((offsets.size() > 0) or (filenames.size() > 0)) {
00148     firstEventAfterConfigure = boost::make_shared<XtcFilesPosition>(filenames,
00149                                                                     offsets);
00150   }
00151   m_readerThread.reset( new boost::thread( DgramReader ( m_fileNames.begin(), 
00152                                                          m_fileNames.end(),
00153                                                          *m_dgQueue, 
00154                                                          m_liveAvail,
00155                                                          merge, liveDbConn, 
00156                                                          liveTable, liveTimeout, runLiveTimeout,
00157                                                          l1offset, 
00158                                                          m_firstControlStream,
00159                                                          m_maxStreamClockDiffSec,
00160                                                          firstEventAfterConfigure) ) );
00161   MsgLog(name(), debug, "config params: liveDbConn=" << liveDbConn << ", " 
00162          << "liveTable=" << liveTable << ", "
00163          << "liveTimeout=" << liveTimeout << ", "
00164          << "runLiveTimeout=" << runLiveTimeout << ", "
00165          << "l1offset=" << l1offset << ", "
00166          << "mergeMode=" << merge << ", "
00167          << "first_control_stream=" << m_firstControlStream << ","
00168          << "max_stream_clock_diff=" << m_maxStreamClockDiffSec);
00169 }
00170 
00171 
00172 //  This method returns next datagram from the source
00173 bool
00174 DgramSourceFile::next(std::vector<XtcInput::Dgram>& eventDg, std::vector<XtcInput::Dgram>& nonEventDg)
00175 {
00176   XtcInput::Dgram dg = m_dgQueue->pop();
00177   if (not dg.empty()) {
00178     eventDg.push_back(dg);
00179     bool foundDgramForDifferentEvent = false;
00180     while (not foundDgramForDifferentEvent) {
00181       XtcInput::Dgram nextDg =  m_dgQueue->front();
00182       if (sameEvent(dg, nextDg)) {
00183         nextDg = m_dgQueue->pop();
00184         eventDg.push_back(nextDg);
00185       } else {
00186         foundDgramForDifferentEvent = true;
00187       }
00188     }
00189     return true;
00190   } else {
00191     return false;
00192   }
00193 }
00194 
00195 bool
00196 DgramSourceFile::liveAvail(int numEvents) {
00197   if (not m_liveAvail) return false;
00198   return m_liveAvail->availEventsIsAtLeast(numEvents);
00199 }
00200 
00201 bool DgramSourceFile::sameEvent(const XtcInput::Dgram &eventDg, const XtcInput::Dgram &otherDg) const
00202 {
00203   if (::isL1Accept(otherDg) and ::isL1Accept(eventDg) and 
00204       (::isFiducialMatchStream(otherDg, m_firstControlStream) or
00205        ::isFiducialMatchStream(eventDg, m_firstControlStream)) and
00206       fiducialSecondsMatch(eventDg, otherDg)) {
00207     return true;
00208   }
00209   if ((not ::isL1Accept(otherDg)) and
00210       (not ::isL1Accept(eventDg)) and 
00211       ::transitionsMatch(otherDg, eventDg) and
00212       ::clockTimesMatch(otherDg, eventDg)) {
00213     return true;
00214   }
00215   return false;
00216 }
00217 
00218 bool DgramSourceFile::fiducialSecondsMatch(const XtcInput::Dgram &dgA, const XtcInput::Dgram &dgB) const {
00219   if (dgA.empty() or dgB.empty()) return false;
00220   XtcInput::Dgram::ptr pDgA = dgA.dg();
00221   XtcInput::Dgram::ptr pDgB = dgB.dg();
00222   unsigned fidA = pDgA->seq.stamp().fiducials();
00223   unsigned fidB = pDgB->seq.stamp().fiducials();
00224   if (fidA != fidB) return false;
00225   unsigned secA = pDgA->seq.clock().seconds();
00226   unsigned secB = pDgB->seq.clock().seconds();
00227   unsigned drift = ::absDiff(secA,secB);
00228   return drift < m_maxStreamClockDiffSec;
00229 }
00230 
00231 
00232 } // namespace PSXtcInput

Generated on 19 Dec 2016 for PSANAclasses by  doxygen 1.4.7