XtcInput/src/XtcStreamMerger.cpp

Go to the documentation of this file.
00001 //--------------------------------------------------------------------------
00002 // File and Version Information:
00003 //     $Id: XtcStreamMerger.cpp 11768 2016-04-23 13:28:02Z davidsch@SLAC.STANFORD.EDU $
00004 //
00005 // Description:
00006 //     Class XtcStreamMerger...
00007 //
00008 // Author List:
00009 //     Andrei Salnikov
00010 //
00011 //------------------------------------------------------------------------
00012 
00013 //-----------------------
00014 // This Class's Header --
00015 //-----------------------
00016 #include "XtcInput/XtcStreamMerger.h"
00017 
00018 //-----------------
00019 // C/C++ Headers --
00020 //-----------------
00021 #include <map>
00022 #include <iomanip>
00023 #include <sstream>
00024 #include <boost/make_shared.hpp>
00025 //-------------------------------
00026 // Collaborating Class Headers --
00027 //-------------------------------
00028 #include "MsgLogger/MsgLogger.h"
00029 #include "XtcInput/ChunkFileIterList.h"
00030 #include "XtcInput/Exceptions.h"
00031 #include "pdsdata/xtc/TransitionId.hh"
00032 
00033 //-----------------------------------------------------------------------
00034 // Local Macros, Typedefs, Structures, Unions and Forward Declarations --
00035 //-----------------------------------------------------------------------
00036 
00037 #define DBGMSG debug
00038 
00039 using namespace XtcInput;
00040 
00041 namespace {
00042 
// Logger name passed to MsgLog by everything in this file. Both the pointer and
// the characters are const so the name cannot be reseated by accident.
const char* const logger = "XtcInput.XtcStreamMerger" ;
00044 
00045 bool isDisable(const XtcInput::Dgram &dg) {
00046   if (dg.empty()) return false;
00047   Pds::TransitionId::Value nextService = dg.dg()->seq.service();
00048   return (nextService == Pds::TransitionId::Disable);
00049 }
00050 
00051 boost::shared_ptr<XtcStreamDgIter::ThirdDatagram> 
00052 checkForThirdDatagram(int stream, boost::shared_ptr<XtcFilesPosition> thirdEvent) {
00053 
00054   if (not thirdEvent) {
00055     MsgLog(logger, DBGMSG, "XtcStreamMerger: no third event position");
00056     return boost::shared_ptr<XtcStreamDgIter::ThirdDatagram>();
00057   }
00058   if (not thirdEvent->hasStream(stream)) {
00059     std::stringstream msg;
00060     msg << stream;
00061     throw StreamNotInPosition(ERR_LOC, msg.str());
00062   }
00063   std::pair<XtcFileName, off64_t> thirdDatagramThisStream = thirdEvent->getChunkFileOffset(stream);
00064   boost::shared_ptr<XtcStreamDgIter::ThirdDatagram> thirdDgram = 
00065     boost::make_shared<XtcStreamDgIter::ThirdDatagram>(thirdDatagramThisStream.first,
00066                                                         thirdDatagramThisStream.second);
00067   return thirdDgram;
00068 }
00069   
00070 } // local namespace
00071 
00072 //              ----------------------------------------
00073 //              -- Public Function Member Definitions --
00074 //              ----------------------------------------
00075 
00076 namespace XtcInput {
00077 
00078 //----------------
00079 // Constructors --
00080 //----------------
00081 XtcStreamMerger::XtcStreamMerger(const boost::shared_ptr<StreamFileIterI>& streamIter,
00082                                  double l1OffsetSec, int firstControlStream,
00083                                  unsigned maxStreamClockDiffSec,
00084                                  boost::shared_ptr<XtcFilesPosition> thirdEvent) 
00085   : m_streams()
00086   , m_priorTransBlock()
00087   , m_processingDAQ(false)
00088   , m_l1OffsetSec(int(l1OffsetSec))
00089   , m_l1OffsetNsec(int((l1OffsetSec-m_l1OffsetSec)*1e9))
00090   , m_firstControlStream(firstControlStream)
00091   , m_streamDgramGreater(maxStreamClockDiffSec)
00092   , m_thirdEvent(thirdEvent)
00093   , m_outputQueue(m_streamDgramGreater)
00094     
00095 {
00096 
00097   // create all streams
00098   int idxDAQ = 0;
00099   int idxCtrl = 0;
00100   while (true) {
00101     const boost::shared_ptr<ChunkFileIterI>& chunkFileIter = streamIter->next();
00102     if (not chunkFileIter) break;
00103 
00104     bool controlStream = int(streamIter->stream()) >= m_firstControlStream;
00105 
00106     boost::shared_ptr<XtcStreamDgIter::ThirdDatagram> thirdDatagram = 
00107       checkForThirdDatagram(streamIter->stream(), m_thirdEvent);
00108 
00109     // create new stream
00110     const boost::shared_ptr<XtcStreamDgIter>& stream = 
00111       boost::make_shared<XtcStreamDgIter>(chunkFileIter, thirdDatagram, controlStream);
00112     if (controlStream) {
00113       StreamDgram dg(stream->next(), StreamDgram::controlUnderDAQ, 0, idxCtrl);
00114       StreamIndex streamIndex(StreamDgram::controlUnderDAQ, idxCtrl);
00115       ++idxCtrl;
00116       m_streams[streamIndex] = stream;
00117       m_priorTransBlock[streamIndex] = getInitialTransBlock(dg);
00118       if (not dg.empty()) updateDgramTime(*dg.dg());
00119       m_outputQueue.push(dg);
00120       MsgLog(logger, DBGMSG, "XtcStreamMerger initialization. Added " 
00121              << StreamDgram::dumpStr(dg)); 
00122     } else {
00123       // this is a DAQ stream
00124       StreamDgram dg(stream->next(), StreamDgram::DAQ, 0, idxDAQ);
00125       StreamIndex streamIndex(StreamDgram::DAQ, idxDAQ);
00126       ++idxDAQ;
00127       m_streams[streamIndex] = stream;
00128       m_priorTransBlock[streamIndex] = getInitialTransBlock(dg);
00129       if (not dg.empty()) updateDgramTime(*dg.dg());
00130       m_outputQueue.push(dg);
00131       MsgLog(logger, DBGMSG, "XtcStreamMerger initialization. Added " 
00132              << StreamDgram::dumpStr(dg));
00133     }
00134   }
00135   if (idxDAQ > 0) {
00136     m_processingDAQ = true;
00137   }
00138   MsgLog(logger, DBGMSG, "XtcStreamMerger initialization: "
00139          << idxDAQ << " DAQ streams and " << idxCtrl << " control streams");
00140 }
00141 
00142 //--------------
00143 // Destructor --
00144 //--------------
00145 XtcStreamMerger::~XtcStreamMerger ()
00146 {
00147 }
00148 
00149 // read next datagram, return zero pointer after last file has been read,
00150 // throws exception for errors.
00151 Dgram
00152 XtcStreamMerger::next()
00153 {
00154   MutexLock protect(m_protect);
00155   if (m_outputQueue.empty()) return Dgram();
00156 
00157   StreamDgram nextStreamDg = m_outputQueue.top();
00158   m_outputQueue.pop();
00159   int replaceStreamId = nextStreamDg.streamId();
00160   StreamIndex replaceStreamIndex(nextStreamDg.streamType(), replaceStreamId);
00161     
00162   if (m_streams.find(replaceStreamIndex) == m_streams.end()) {
00163     throw psana::Exception(ERR_LOC, "XtcStreamMerger::next() replacement stream index not found in m_streams");
00164   }
00165   if (m_priorTransBlock.find(replaceStreamIndex) == m_priorTransBlock.end()) {
00166     throw psana::Exception(ERR_LOC, "XtcStreamMerger::next() replacement stream index not found in m_priorTransBlock");
00167   }
00168 
00169   MsgLog(logger,DBGMSG,"next() returning: " << StreamDgram::dumpStr(nextStreamDg));
00170 
00171   bool replaced = false;
00172   while (not replaced) {
00173     Dgram replaceDg = m_streams[replaceStreamIndex]->next();
00174     TransBlock lastTransBlock = m_priorTransBlock[replaceStreamIndex];
00175     uint64_t replaceBlock = getNextBlock(lastTransBlock, replaceDg);
00176     m_priorTransBlock[replaceStreamIndex] = makeTransBlock(replaceDg, replaceBlock);
00177 
00178     // skip over enable and disable transitions in all streams. We use EndCalibCycle for 
00179     // the L1Block count as its synchronization across the DAQ streams is more robust.
00180     // Sending the Enable/Disable pairs that exist in between Begin/End Calib cycle's 
00181     // through the prioriry queue is Ok, but it is cleaner to keep them out.
00182     // Downstream processing in PSXtcInput does not use Disable/Enable.
00183 
00184     // For control streams, if we are processing DAQ streams, skip over all transitions.
00185     // they should contain no event data. However if we are only processing control streams,
00186     // process the control transitions.
00187 
00188     // Skip over L1Accepts that have 0x1FFFF as the fiducial value. In older data, the control streams 
00189     // were producing these when they had trouble getting the DAQ timestamp. These control 
00190     // stream Dgrams cannot be merged and will trigger warnings from FiducialsCompare.
00191 
00192     bool skip = false;
00193     if (not replaceDg.empty()) {
00194       if ( (replaceDg.dg()->seq.service() == Pds::TransitionId::Enable) or
00195            (replaceDg.dg()->seq.service() == Pds::TransitionId::Disable) ) {
00196         MsgLog(logger, DBGMSG, "next() skipping Enable or Disable in " 
00197                << dumpStr(replaceStreamIndex));
00198         skip = true;
00199       } else if ( (replaceDg.dg()->seq.service() == Pds::TransitionId::L1Accept) and
00200                   (replaceDg.dg()->seq.stamp().fiducials() >= Pds::TimeStamp::MaxFiducials) ) {
00201         MsgLog(logger, DBGMSG, "next() skipping L1Accept with fiducials >= " 
00202                << Pds::TimeStamp::MaxFiducials << " in " 
00203                << dumpStr(replaceStreamIndex));
00204         skip = true;
00205       }
00206     } else if (processingDAQ()) {
00207       if ((nextStreamDg.streamType() == StreamDgram::controlUnderDAQ) or 
00208           (nextStreamDg.streamType() == StreamDgram::controlIndependent)) {
00209         if (not replaceDg.empty()) {
00210           Pds::TransitionId::Value replaceTrans = replaceDg.dg()->seq.service();
00211           if (replaceTrans == Pds::TransitionId::Configure) {
00212             // the first configure was put in the queue during initialization. Now we have a
00213             // configure in the midst of the stream.
00214             MsgLog(logger, warning, "Discarding Configure transition found in " 
00215                    << replaceDg.file().path()
00216                    << " expected if processing multiple runs, investigate further if not");
00217           }
00218           if (replaceTrans != Pds::TransitionId::L1Accept) {
00219             MsgLog(logger, DBGMSG, "next() skipping non L1Accept in " 
00220                    << dumpStr(replaceStreamIndex));
00221             skip = true;
00222           }
00223         }
00224       }
00225     }
00226     if (not skip) {
00227       StreamDgram replaceStreamDg(replaceDg, nextStreamDg.streamType(), replaceBlock, replaceStreamId);
00228       m_outputQueue.push(replaceStreamDg);
00229       replaced = true;
00230     }
00231   }
00232   return nextStreamDg;
00233 }
00234 
00235 // updates the time for non L1 Accepts
00236 void 
00237 XtcStreamMerger::updateDgramTime(Pds::Dgram& dgram) const
00238 {
00239   if ( dgram.seq.service() != Pds::TransitionId::L1Accept ) {
00240 
00241     // update clock values
00242     const Pds::ClockTime& time = dgram.seq.clock() ;
00243     int32_t sec = time.seconds() + m_l1OffsetSec;
00244     int32_t nsec = time.nanoseconds() + m_l1OffsetNsec;
00245     if (nsec < 0) {
00246         nsec += 1000000000;
00247         -- sec;
00248     } else if (nsec >= 1000000000) {
00249         nsec -= 1000000000;
00250         ++ sec;
00251     }      
00252     Pds::ClockTime newTime(sec, nsec) ;
00253 
00254     // there is no way to change clock field in datagram but there is 
00255     // an assignment operator
00256     dgram.seq = Pds::Sequence(newTime, dgram.seq.stamp());
00257   }
00258 }
00259 
00260 XtcStreamMerger::TransBlock XtcStreamMerger::makeTransBlock(const Dgram &dg, uint64_t block) {
00261   if (dg.empty()) {
00262     return TransBlock();
00263   }
00264   return TransBlock(dg.dg()->seq.service(), block, dg.file().run());
00265 }
00266 
00267 XtcStreamMerger::TransBlock XtcStreamMerger::getInitialTransBlock(const Dgram &dg) {
00268   return makeTransBlock(dg, 0);
00269 }
00270 
00271 uint64_t  XtcStreamMerger::getNextBlock(const TransBlock & prevTransBlock, const Dgram &dg) {
00272   if (dg.empty()) {
00273     return prevTransBlock.block;
00274   }
00275   int nextRun = dg.file().run();
00276   if (nextRun != prevTransBlock.run) return 0;
00277   Pds::TransitionId::Value nextService = dg.dg()->seq.service();
00278   // increment the block count if we see a EndCalibCycle, and the prior transition was not also a
00279   // EndCalibCycle. generally there should not be two EndCalibCycle's in a row, this protects against 
00280   // problems in the data
00281   if ((prevTransBlock.trans != Pds::TransitionId::EndCalibCycle) and 
00282       (nextService == Pds::TransitionId::EndCalibCycle)) {
00283     return prevTransBlock.block + 1;
00284   }
00285   return prevTransBlock.block;
00286 }
00287 
00288 std::string XtcStreamMerger::dumpStr(const StreamIndex &streamIndex) {
00289   std::ostringstream msg;
00290   msg << "streamType=" 
00291       << StreamDgram::streamType2str(streamIndex.first) 
00292       << " streamId=" << std::setw(2) << streamIndex.second;
00293   return msg.str();
00294 } 
00295 
00296 unsigned 
00297 XtcStreamMerger::countAvailDgramsStopAt(unsigned maxToCount) {
00298   MutexLock protect(m_protect);
00299   unsigned count = 0;
00300   typedef std::map<StreamIndex, boost::shared_ptr<XtcStreamDgIter> > StreamMap;
00301   for (StreamMap::iterator it = m_streams.begin(); it != m_streams.end(); ++it) {
00302     if (count >= maxToCount) break;
00303     StreamDgram::StreamType streamType = (it->first).first;
00304     if (streamType == StreamDgram::controlUnderDAQ) continue;
00305     boost::shared_ptr<XtcStreamDgIter> stream = it->second;
00306     boost::shared_ptr<DgHeader> latestStreamDgHeader = stream->latestDgHeaderInQueue();
00307     if (latestStreamDgHeader) {
00308       unsigned res = m_streamAvail.countUpTo(latestStreamDgHeader->path(),
00309                                        latestStreamDgHeader->offset(),
00310                                        maxToCount - count);
00311       MsgLog(logger, DBGMSG, "    m_streamAvail.countUpTo(" << latestStreamDgHeader->path()
00312              << ", " << latestStreamDgHeader->offset() << ", " << maxToCount - count << ")="
00313              << res);
00314       count += res;
00315     }
00316   }
00317   MsgLog(logger, DBGMSG, "---XtcStreamMerger::countAvailDgramsStopAt(unsigned " << maxToCount << ")=" << count);
00318   return count;
00319 }
00320 
00321 } // namespace XtcInput

Generated on 19 Dec 2016 for PSDMSoftware by  doxygen 1.4.7