PSShmemInput/src/DgramSourceShmem.cpp

Go to the documentation of this file.
00001 //--------------------------------------------------------------------------
00002 // File and Version Information:
00003 //      $Id: DgramSourceShmem.cpp 9165 2014-10-27 19:33:20Z cpo@SLAC.STANFORD.EDU $
00004 //
00005 // Description:
00006 //      Class DgramSourceShmem...
00007 //
00008 // Author List:
00009 //      Andy Salnikov
00010 //
00011 //------------------------------------------------------------------------
00012 
00013 //-----------------------
00014 // This Class's Header --
00015 //-----------------------
00016 #include "PSShmemInput/DgramSourceShmem.h"
00017 
00018 //-----------------
00019 // C/C++ Headers --
00020 //-----------------
00021 #include <boost/lexical_cast.hpp>
00022 #include <boost/algorithm/string.hpp>
00023 
00024 //-------------------------------
00025 // Collaborating Class Headers --
00026 //-------------------------------
00027 #include "IData/Dataset.h"
00028 #include "MsgLogger/MsgLogger.h"
00029 #include "PSShmemInput/Exceptions.h"
00030 #include "PSShmemInput/ShmemMonitorClient.h"
00031 #include "XtcInput/DgramQueue.h"
00032 
00033 //-----------------------------------------------------------------------
00034 // Local Macros, Typedefs, Structures, Unions and Forward Declarations --
00035 //-----------------------------------------------------------------------
00036 
00037 //              ----------------------------------------
00038 //              -- Public Function Member Definitions --
00039 //              ----------------------------------------
00040 
00041 namespace PSShmemInput {
00042 
00043 //----------------
00044 // Constructors --
00045 //----------------
00046 DgramSourceShmem::DgramSourceShmem (const std::string& name)
00047   : PSXtcInput::IDatagramSource()
00048   , psana::Configurable(name)
00049   , m_dgQueue(new XtcInput::DgramQueue(3))
00050   , m_readerThread()
00051 {
00052 }
00053 
00054 //--------------
00055 // Destructor --
00056 //--------------
00057 DgramSourceShmem::~DgramSourceShmem ()
00058 {
00059   if (m_readerThread) {
00060     // ask the thread to stop
00061     m_readerThread->interrupt();
00062     MsgLog(name(), debug, "wait for reader thread to finish");
00063     // wait until it does
00064     m_readerThread->join();
00065     MsgLog(name(), debug, "reader thread has finished");
00066   }
00067 }
00068 
00069 // Initialization method for datagram source
00070 void 
00071 DgramSourceShmem::init()
00072 {
00073   // get input spec
00074   std::vector<std::string> inputs = configList("input");
00075   if (inputs.empty()) throw EmptyInputList(ERR_LOC);
00076   if (inputs.size() > 1) throw DatasetSpecError(ERR_LOC, "more than one dataset", configStr("input"));
00077 
00078   // make dataset out of it
00079   IData::Dataset ds(inputs[0]);
00080 
00081   // need "shmem=name.N" spec
00082   if (not ds.exists("shmem")) throw DatasetSpecError(ERR_LOC, "'shmem' key is missing", inputs[0]);
00083 
00084   std::string shmem = ds.value("shmem");
00085   std::string::size_type p = shmem.rfind('.');
00086   if (p == std::string::npos) throw DatasetSpecError(ERR_LOC, "comma missing in shmem value", inputs[0]);
00087 
00088   std::string tag(shmem, 0, p);
00089   int index = 0;
00090   try {
00091     index = boost::lexical_cast<int>(shmem.substr(p+1));
00092   } catch (const boost::bad_lexical_cast& ex) {
00093     throw DatasetSpecError(ERR_LOC, "bad index in shmem value", inputs[0]);
00094   }
00095 
00096   // stop transition
00097   Pds::TransitionId::Value stopTr = Pds::TransitionId::EndRun;
00098   if (ds.exists("stop")) {
00099     std::string stopTrStr = ds.value("stop");
00100     boost::algorithm::to_lower(stopTrStr);
00101 
00102     if (stopTrStr == "unmap") {
00103       stopTr = Pds::TransitionId::Unmap;
00104     } else if (stopTrStr == "unconfigure") {
00105       stopTr = Pds::TransitionId::Unconfigure;
00106     } else if (stopTrStr == "endrun") {
00107       stopTr = Pds::TransitionId::EndRun;
00108     } else if (stopTrStr == "endcalibcycle") {
00109       stopTr = Pds::TransitionId::EndCalibCycle;
00110     } else if (stopTrStr == "none" or stopTrStr == "no") {
00111       // this transition cannot happen, means non-stop running
00112       stopTr = Pds::TransitionId::NumberOf;
00113     } else {
00114       throw DatasetSpecError(ERR_LOC, "unexpected stop condition", inputs[0]);
00115     }
00116   }
00117 
00118   m_readerThread.reset(new boost::thread(ShmemMonitorClient(tag, index, *m_dgQueue, stopTr)));
00119 }
00120 
00121 //  This method returns next datagram from the source
00122 bool
00123 DgramSourceShmem::next(std::vector<XtcInput::Dgram>& eventDg, std::vector<XtcInput::Dgram>& nonEventDg)
00124 {
00125   XtcInput::Dgram dg = m_dgQueue->pop();
00126   if (not dg.empty()) {
00127     eventDg.push_back(dg);
00128     return true;
00129   } else {
00130     return false;
00131   }
00132 }
00133 
00134 } // namespace PSShmemInput

Generated on 19 Dec 2016 for PSANAclasses by  doxygen 1.4.7