00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "XtcInput/XtcStreamMerger.h"
00017
00018
00019
00020
00021 #include <map>
00022 #include <iomanip>
00023 #include <sstream>
00024 #include <boost/make_shared.hpp>
00025
00026
00027
00028 #include "MsgLogger/MsgLogger.h"
00029 #include "XtcInput/ChunkFileIterList.h"
00030 #include "XtcInput/Exceptions.h"
00031 #include "pdsdata/xtc/TransitionId.hh"
00032
00033
00034
00035
00036
00037 #define DBGMSG debug
00038
00039 using namespace XtcInput;
00040
00041 namespace {
00042
00043 const char* logger = "XtcInput.XtcStreamMerger" ;
00044
00045 bool isDisable(const XtcInput::Dgram &dg) {
00046 if (dg.empty()) return false;
00047 Pds::TransitionId::Value nextService = dg.dg()->seq.service();
00048 return (nextService == Pds::TransitionId::Disable);
00049 }
00050
00051 boost::shared_ptr<XtcStreamDgIter::ThirdDatagram>
00052 checkForThirdDatagram(int stream, boost::shared_ptr<XtcFilesPosition> thirdEvent) {
00053
00054 if (not thirdEvent) {
00055 MsgLog(logger, DBGMSG, "XtcStreamMerger: no third event position");
00056 return boost::shared_ptr<XtcStreamDgIter::ThirdDatagram>();
00057 }
00058 if (not thirdEvent->hasStream(stream)) {
00059 std::stringstream msg;
00060 msg << stream;
00061 throw StreamNotInPosition(ERR_LOC, msg.str());
00062 }
00063 std::pair<XtcFileName, off64_t> thirdDatagramThisStream = thirdEvent->getChunkFileOffset(stream);
00064 boost::shared_ptr<XtcStreamDgIter::ThirdDatagram> thirdDgram =
00065 boost::make_shared<XtcStreamDgIter::ThirdDatagram>(thirdDatagramThisStream.first,
00066 thirdDatagramThisStream.second);
00067 return thirdDgram;
00068 }
00069
00070 }
00071
00072
00073
00074
00075
00076 namespace XtcInput {
00077
00078
00079
00080
00081 XtcStreamMerger::XtcStreamMerger(const boost::shared_ptr<StreamFileIterI>& streamIter,
00082 double l1OffsetSec, int firstControlStream,
00083 unsigned maxStreamClockDiffSec,
00084 boost::shared_ptr<XtcFilesPosition> thirdEvent)
00085 : m_streams()
00086 , m_priorTransBlock()
00087 , m_processingDAQ(false)
00088 , m_l1OffsetSec(int(l1OffsetSec))
00089 , m_l1OffsetNsec(int((l1OffsetSec-m_l1OffsetSec)*1e9))
00090 , m_firstControlStream(firstControlStream)
00091 , m_streamDgramGreater(maxStreamClockDiffSec)
00092 , m_thirdEvent(thirdEvent)
00093 , m_outputQueue(m_streamDgramGreater)
00094
00095 {
00096
00097
00098 int idxDAQ = 0;
00099 int idxCtrl = 0;
00100 while (true) {
00101 const boost::shared_ptr<ChunkFileIterI>& chunkFileIter = streamIter->next();
00102 if (not chunkFileIter) break;
00103
00104 bool controlStream = int(streamIter->stream()) >= m_firstControlStream;
00105
00106 boost::shared_ptr<XtcStreamDgIter::ThirdDatagram> thirdDatagram =
00107 checkForThirdDatagram(streamIter->stream(), m_thirdEvent);
00108
00109
00110 const boost::shared_ptr<XtcStreamDgIter>& stream =
00111 boost::make_shared<XtcStreamDgIter>(chunkFileIter, thirdDatagram, controlStream);
00112 if (controlStream) {
00113 StreamDgram dg(stream->next(), StreamDgram::controlUnderDAQ, 0, idxCtrl);
00114 StreamIndex streamIndex(StreamDgram::controlUnderDAQ, idxCtrl);
00115 ++idxCtrl;
00116 m_streams[streamIndex] = stream;
00117 m_priorTransBlock[streamIndex] = getInitialTransBlock(dg);
00118 if (not dg.empty()) updateDgramTime(*dg.dg());
00119 m_outputQueue.push(dg);
00120 MsgLog(logger, DBGMSG, "XtcStreamMerger initialization. Added "
00121 << StreamDgram::dumpStr(dg));
00122 } else {
00123
00124 StreamDgram dg(stream->next(), StreamDgram::DAQ, 0, idxDAQ);
00125 StreamIndex streamIndex(StreamDgram::DAQ, idxDAQ);
00126 ++idxDAQ;
00127 m_streams[streamIndex] = stream;
00128 m_priorTransBlock[streamIndex] = getInitialTransBlock(dg);
00129 if (not dg.empty()) updateDgramTime(*dg.dg());
00130 m_outputQueue.push(dg);
00131 MsgLog(logger, DBGMSG, "XtcStreamMerger initialization. Added "
00132 << StreamDgram::dumpStr(dg));
00133 }
00134 }
00135 if (idxDAQ > 0) {
00136 m_processingDAQ = true;
00137 }
00138 MsgLog(logger, DBGMSG, "XtcStreamMerger initialization: "
00139 << idxDAQ << " DAQ streams and " << idxCtrl << " control streams");
00140 }
00141
00142
00143
00144
00145 XtcStreamMerger::~XtcStreamMerger ()
00146 {
00147 }
00148
00149
00150
00151 Dgram
00152 XtcStreamMerger::next()
00153 {
00154 MutexLock protect(m_protect);
00155 if (m_outputQueue.empty()) return Dgram();
00156
00157 StreamDgram nextStreamDg = m_outputQueue.top();
00158 m_outputQueue.pop();
00159 int replaceStreamId = nextStreamDg.streamId();
00160 StreamIndex replaceStreamIndex(nextStreamDg.streamType(), replaceStreamId);
00161
00162 if (m_streams.find(replaceStreamIndex) == m_streams.end()) {
00163 throw psana::Exception(ERR_LOC, "XtcStreamMerger::next() replacement stream index not found in m_streams");
00164 }
00165 if (m_priorTransBlock.find(replaceStreamIndex) == m_priorTransBlock.end()) {
00166 throw psana::Exception(ERR_LOC, "XtcStreamMerger::next() replacement stream index not found in m_priorTransBlock");
00167 }
00168
00169 MsgLog(logger,DBGMSG,"next() returning: " << StreamDgram::dumpStr(nextStreamDg));
00170
00171 bool replaced = false;
00172 while (not replaced) {
00173 Dgram replaceDg = m_streams[replaceStreamIndex]->next();
00174 TransBlock lastTransBlock = m_priorTransBlock[replaceStreamIndex];
00175 uint64_t replaceBlock = getNextBlock(lastTransBlock, replaceDg);
00176 m_priorTransBlock[replaceStreamIndex] = makeTransBlock(replaceDg, replaceBlock);
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192 bool skip = false;
00193 if (not replaceDg.empty()) {
00194 if ( (replaceDg.dg()->seq.service() == Pds::TransitionId::Enable) or
00195 (replaceDg.dg()->seq.service() == Pds::TransitionId::Disable) ) {
00196 MsgLog(logger, DBGMSG, "next() skipping Enable or Disable in "
00197 << dumpStr(replaceStreamIndex));
00198 skip = true;
00199 } else if ( (replaceDg.dg()->seq.service() == Pds::TransitionId::L1Accept) and
00200 (replaceDg.dg()->seq.stamp().fiducials() >= Pds::TimeStamp::MaxFiducials) ) {
00201 MsgLog(logger, DBGMSG, "next() skipping L1Accept with fiducials >= "
00202 << Pds::TimeStamp::MaxFiducials << " in "
00203 << dumpStr(replaceStreamIndex));
00204 skip = true;
00205 }
00206 } else if (processingDAQ()) {
00207 if ((nextStreamDg.streamType() == StreamDgram::controlUnderDAQ) or
00208 (nextStreamDg.streamType() == StreamDgram::controlIndependent)) {
00209 if (not replaceDg.empty()) {
00210 Pds::TransitionId::Value replaceTrans = replaceDg.dg()->seq.service();
00211 if (replaceTrans == Pds::TransitionId::Configure) {
00212
00213
00214 MsgLog(logger, warning, "Discarding Configure transition found in "
00215 << replaceDg.file().path()
00216 << " expected if processing multiple runs, investigate further if not");
00217 }
00218 if (replaceTrans != Pds::TransitionId::L1Accept) {
00219 MsgLog(logger, DBGMSG, "next() skipping non L1Accept in "
00220 << dumpStr(replaceStreamIndex));
00221 skip = true;
00222 }
00223 }
00224 }
00225 }
00226 if (not skip) {
00227 StreamDgram replaceStreamDg(replaceDg, nextStreamDg.streamType(), replaceBlock, replaceStreamId);
00228 m_outputQueue.push(replaceStreamDg);
00229 replaced = true;
00230 }
00231 }
00232 return nextStreamDg;
00233 }
00234
00235
00236 void
00237 XtcStreamMerger::updateDgramTime(Pds::Dgram& dgram) const
00238 {
00239 if ( dgram.seq.service() != Pds::TransitionId::L1Accept ) {
00240
00241
00242 const Pds::ClockTime& time = dgram.seq.clock() ;
00243 int32_t sec = time.seconds() + m_l1OffsetSec;
00244 int32_t nsec = time.nanoseconds() + m_l1OffsetNsec;
00245 if (nsec < 0) {
00246 nsec += 1000000000;
00247 -- sec;
00248 } else if (nsec >= 1000000000) {
00249 nsec -= 1000000000;
00250 ++ sec;
00251 }
00252 Pds::ClockTime newTime(sec, nsec) ;
00253
00254
00255
00256 dgram.seq = Pds::Sequence(newTime, dgram.seq.stamp());
00257 }
00258 }
00259
00260 XtcStreamMerger::TransBlock XtcStreamMerger::makeTransBlock(const Dgram &dg, uint64_t block) {
00261 if (dg.empty()) {
00262 return TransBlock();
00263 }
00264 return TransBlock(dg.dg()->seq.service(), block, dg.file().run());
00265 }
00266
00267 XtcStreamMerger::TransBlock XtcStreamMerger::getInitialTransBlock(const Dgram &dg) {
00268 return makeTransBlock(dg, 0);
00269 }
00270
00271 uint64_t XtcStreamMerger::getNextBlock(const TransBlock & prevTransBlock, const Dgram &dg) {
00272 if (dg.empty()) {
00273 return prevTransBlock.block;
00274 }
00275 int nextRun = dg.file().run();
00276 if (nextRun != prevTransBlock.run) return 0;
00277 Pds::TransitionId::Value nextService = dg.dg()->seq.service();
00278
00279
00280
00281 if ((prevTransBlock.trans != Pds::TransitionId::EndCalibCycle) and
00282 (nextService == Pds::TransitionId::EndCalibCycle)) {
00283 return prevTransBlock.block + 1;
00284 }
00285 return prevTransBlock.block;
00286 }
00287
00288 std::string XtcStreamMerger::dumpStr(const StreamIndex &streamIndex) {
00289 std::ostringstream msg;
00290 msg << "streamType="
00291 << StreamDgram::streamType2str(streamIndex.first)
00292 << " streamId=" << std::setw(2) << streamIndex.second;
00293 return msg.str();
00294 }
00295
00296 unsigned
00297 XtcStreamMerger::countAvailDgramsStopAt(unsigned maxToCount) {
00298 MutexLock protect(m_protect);
00299 unsigned count = 0;
00300 typedef std::map<StreamIndex, boost::shared_ptr<XtcStreamDgIter> > StreamMap;
00301 for (StreamMap::iterator it = m_streams.begin(); it != m_streams.end(); ++it) {
00302 if (count >= maxToCount) break;
00303 StreamDgram::StreamType streamType = (it->first).first;
00304 if (streamType == StreamDgram::controlUnderDAQ) continue;
00305 boost::shared_ptr<XtcStreamDgIter> stream = it->second;
00306 boost::shared_ptr<DgHeader> latestStreamDgHeader = stream->latestDgHeaderInQueue();
00307 if (latestStreamDgHeader) {
00308 unsigned res = m_streamAvail.countUpTo(latestStreamDgHeader->path(),
00309 latestStreamDgHeader->offset(),
00310 maxToCount - count);
00311 MsgLog(logger, DBGMSG, " m_streamAvail.countUpTo(" << latestStreamDgHeader->path()
00312 << ", " << latestStreamDgHeader->offset() << ", " << maxToCount - count << ")="
00313 << res);
00314 count += res;
00315 }
00316 }
00317 MsgLog(logger, DBGMSG, "---XtcStreamMerger::countAvailDgramsStopAt(unsigned " << maxToCount << ")=" << count);
00318 return count;
00319 }
00320
00321 }