00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "PSXtcInput/DgramSourceFile.h"
00017
00018
00019
00020
00021 #include <algorithm>
00022
00023
00024
00025
00026 #include "MsgLogger/MsgLogger.h"
00027 #include "PSXtcInput/Exceptions.h"
00028 #include "XtcInput/DgramQueue.h"
00029 #include "XtcInput/DgramReader.h"
00030 #include "XtcInput/MergeMode.h"
00031 #include "XtcInput/XtcFilesPosition.h"
00032
00033
00034
00035
00036
00037 using namespace XtcInput;
00038
00039
00040
00041
00042
00043 namespace {
00044
// File-local constant with value 90. It is not referenced by any code in
// this translation unit as shown; fiducialSecondsMatch() compares against the
// configurable m_maxStreamClockDiffSec member instead.
// NOTE(review): presumably a clock-drift limit in seconds (going by the
// name) -- confirm whether it is still needed.
const unsigned int MAX_SEC_DRIFT_FOR_FIDUCIAL_MATCH = 90U;
00046
00047 bool isL1Accept(const XtcInput::Dgram &dg) {
00048 if (dg.empty()) return false;
00049 XtcInput::Dgram::ptr pDg = dg.dg();
00050 return pDg->seq.service() == Pds::TransitionId::L1Accept;
00051 }
00052
00053 bool transitionsMatch(const XtcInput::Dgram &dgA, const XtcInput::Dgram &dgB) {
00054 if (dgA.empty() or dgB.empty()) return false;
00055 XtcInput::Dgram::ptr pDgA = dgA.dg();
00056 XtcInput::Dgram::ptr pDgB = dgB.dg();
00057 return pDgA->seq.service() == pDgB->seq.service();
00058 }
00059
00060 bool isFiducialMatchStream(const XtcInput::Dgram &dg, const int firstControlStream) {
00061 if (dg.empty()) return false;
00062 const XtcFileName& fileName = dg.file();
00063 if (fileName.empty()) return false;
00064 return (int(fileName.stream()) >= firstControlStream);
00065 }
00066
/// Absolute difference |a - b| of two unsigned values, computed by always
/// subtracting the smaller from the larger so the subtraction never wraps.
unsigned absDiff(unsigned a, unsigned b) {
  return (a > b) ? (a - b) : (b - a);
}
00071
00072 bool clockTimesMatch(const XtcInput::Dgram &dgA, const XtcInput::Dgram &dgB) {
00073 if (dgA.empty() or dgB.empty()) return false;
00074 XtcInput::Dgram::ptr pDgA = dgA.dg();
00075 XtcInput::Dgram::ptr pDgB = dgB.dg();
00076 unsigned secA = pDgA->seq.clock().seconds();
00077 unsigned secB = pDgB->seq.clock().seconds();
00078 if (secA != secB) return false;
00079 unsigned nanoA = pDgA->seq.clock().nanoseconds();
00080 unsigned nanoB = pDgB->seq.clock().nanoseconds();
00081 return nanoA == nanoB;
00082 }
00083
00084 }
00085
00086 namespace PSXtcInput {
00087
00088
00089
00090
00091 DgramSourceFile::DgramSourceFile (const std::string& name)
00092 : IDatagramSource()
00093 , psana::Configurable(name)
00094 , m_dgQueue(new XtcInput::DgramQueue(10))
00095 , m_readerThread()
00096 , m_fileNames()
00097 , m_firstControlStream(80)
00098
00099 {
00100 m_fileNames = configList("files");
00101 if ( m_fileNames.empty() ) {
00102 throw EmptyFileList(ERR_LOC);
00103 }
00104 }
00105
00106
00107
00108
00109 DgramSourceFile::~DgramSourceFile ()
00110 {
00111 if (m_readerThread) {
00112
00113 m_readerThread->interrupt();
00114 MsgLog(name(), debug, "wait for reader thread to finish");
00115
00116 m_readerThread->join();
00117 MsgLog(name(), debug, "reader thread has finished");
00118 }
00119 }
00120
00121
00122 void
00123 DgramSourceFile::init()
00124 {
00125
00126 WithMsgLog(name(), debug, str) {
00127 str << "Input files: ";
00128 std::copy(m_fileNames.begin(), m_fileNames.end(), std::ostream_iterator<std::string>(str, " "));
00129 }
00130
00131
00132 std::string liveDbConn = configStr("liveDbConn", "");
00133 std::string liveTable = configStr("liveTable", "file");
00134 unsigned liveTimeout = config("liveTimeout", 120U);
00135 unsigned runLiveTimeout = config("runLiveTimeout", 0U);
00136 double l1offset = config("l1offset", 0.0);
00137 MergeMode merge = mergeMode(configStr("mergeMode", "FileName"));
00138 m_firstControlStream = config("first_control_stream",80);
00139 m_maxStreamClockDiffSec = config("max_stream_clock_diff",85);
00140
00141 std::list<off64_t> emptyOffsets;
00142 std::list<off64_t> offsets = configList("third_event_jump_offsets",emptyOffsets);
00143 std::list<std::string> emptyStrings;
00144 std::list<std::string> filenames = configList("third_event_jump_filenames",emptyStrings);
00145
00146 boost::shared_ptr<XtcFilesPosition> firstEventAfterConfigure;
00147 if ((offsets.size() > 0) or (filenames.size() > 0)) {
00148 firstEventAfterConfigure = boost::make_shared<XtcFilesPosition>(filenames,
00149 offsets);
00150 }
00151 m_readerThread.reset( new boost::thread( DgramReader ( m_fileNames.begin(),
00152 m_fileNames.end(),
00153 *m_dgQueue,
00154 m_liveAvail,
00155 merge, liveDbConn,
00156 liveTable, liveTimeout, runLiveTimeout,
00157 l1offset,
00158 m_firstControlStream,
00159 m_maxStreamClockDiffSec,
00160 firstEventAfterConfigure) ) );
00161 MsgLog(name(), debug, "config params: liveDbConn=" << liveDbConn << ", "
00162 << "liveTable=" << liveTable << ", "
00163 << "liveTimeout=" << liveTimeout << ", "
00164 << "runLiveTimeout=" << runLiveTimeout << ", "
00165 << "l1offset=" << l1offset << ", "
00166 << "mergeMode=" << merge << ", "
00167 << "first_control_stream=" << m_firstControlStream << ","
00168 << "max_stream_clock_diff=" << m_maxStreamClockDiffSec);
00169 }
00170
00171
00172
00173 bool
00174 DgramSourceFile::next(std::vector<XtcInput::Dgram>& eventDg, std::vector<XtcInput::Dgram>& nonEventDg)
00175 {
00176 XtcInput::Dgram dg = m_dgQueue->pop();
00177 if (not dg.empty()) {
00178 eventDg.push_back(dg);
00179 bool foundDgramForDifferentEvent = false;
00180 while (not foundDgramForDifferentEvent) {
00181 XtcInput::Dgram nextDg = m_dgQueue->front();
00182 if (sameEvent(dg, nextDg)) {
00183 nextDg = m_dgQueue->pop();
00184 eventDg.push_back(nextDg);
00185 } else {
00186 foundDgramForDifferentEvent = true;
00187 }
00188 }
00189 return true;
00190 } else {
00191 return false;
00192 }
00193 }
00194
00195 bool
00196 DgramSourceFile::liveAvail(int numEvents) {
00197 if (not m_liveAvail) return false;
00198 return m_liveAvail->availEventsIsAtLeast(numEvents);
00199 }
00200
00201 bool DgramSourceFile::sameEvent(const XtcInput::Dgram &eventDg, const XtcInput::Dgram &otherDg) const
00202 {
00203 if (::isL1Accept(otherDg) and ::isL1Accept(eventDg) and
00204 (::isFiducialMatchStream(otherDg, m_firstControlStream) or
00205 ::isFiducialMatchStream(eventDg, m_firstControlStream)) and
00206 fiducialSecondsMatch(eventDg, otherDg)) {
00207 return true;
00208 }
00209 if ((not ::isL1Accept(otherDg)) and
00210 (not ::isL1Accept(eventDg)) and
00211 ::transitionsMatch(otherDg, eventDg) and
00212 ::clockTimesMatch(otherDg, eventDg)) {
00213 return true;
00214 }
00215 return false;
00216 }
00217
00218 bool DgramSourceFile::fiducialSecondsMatch(const XtcInput::Dgram &dgA, const XtcInput::Dgram &dgB) const {
00219 if (dgA.empty() or dgB.empty()) return false;
00220 XtcInput::Dgram::ptr pDgA = dgA.dg();
00221 XtcInput::Dgram::ptr pDgB = dgB.dg();
00222 unsigned fidA = pDgA->seq.stamp().fiducials();
00223 unsigned fidB = pDgB->seq.stamp().fiducials();
00224 if (fidA != fidB) return false;
00225 unsigned secA = pDgA->seq.clock().seconds();
00226 unsigned secB = pDgB->seq.clock().seconds();
00227 unsigned drift = ::absDiff(secA,secB);
00228 return drift < m_maxStreamClockDiffSec;
00229 }
00230
00231
00232 }