00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "PSShmemInput/DgramSourceShmem.h"
00017
00018
00019
00020
00021 #include <boost/lexical_cast.hpp>
00022 #include <boost/algorithm/string.hpp>
00023
00024
00025
00026
00027 #include "IData/Dataset.h"
00028 #include "MsgLogger/MsgLogger.h"
00029 #include "PSShmemInput/Exceptions.h"
00030 #include "PSShmemInput/ShmemMonitorClient.h"
00031 #include "XtcInput/DgramQueue.h"
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 namespace PSShmemInput {
00042
00043
00044
00045
00046 DgramSourceShmem::DgramSourceShmem (const std::string& name)
00047 : PSXtcInput::IDatagramSource()
00048 , psana::Configurable(name)
00049 , m_dgQueue(new XtcInput::DgramQueue(3))
00050 , m_readerThread()
00051 {
00052 }
00053
00054
00055
00056
00057 DgramSourceShmem::~DgramSourceShmem ()
00058 {
00059 if (m_readerThread) {
00060
00061 m_readerThread->interrupt();
00062 MsgLog(name(), debug, "wait for reader thread to finish");
00063
00064 m_readerThread->join();
00065 MsgLog(name(), debug, "reader thread has finished");
00066 }
00067 }
00068
00069
00070 void
00071 DgramSourceShmem::init()
00072 {
00073
00074 std::vector<std::string> inputs = configList("input");
00075 if (inputs.empty()) throw EmptyInputList(ERR_LOC);
00076 if (inputs.size() > 1) throw DatasetSpecError(ERR_LOC, "more than one dataset", configStr("input"));
00077
00078
00079 IData::Dataset ds(inputs[0]);
00080
00081
00082 if (not ds.exists("shmem")) throw DatasetSpecError(ERR_LOC, "'shmem' key is missing", inputs[0]);
00083
00084 std::string shmem = ds.value("shmem");
00085 std::string::size_type p = shmem.rfind('.');
00086 if (p == std::string::npos) throw DatasetSpecError(ERR_LOC, "comma missing in shmem value", inputs[0]);
00087
00088 std::string tag(shmem, 0, p);
00089 int index = 0;
00090 try {
00091 index = boost::lexical_cast<int>(shmem.substr(p+1));
00092 } catch (const boost::bad_lexical_cast& ex) {
00093 throw DatasetSpecError(ERR_LOC, "bad index in shmem value", inputs[0]);
00094 }
00095
00096
00097 Pds::TransitionId::Value stopTr = Pds::TransitionId::EndRun;
00098 if (ds.exists("stop")) {
00099 std::string stopTrStr = ds.value("stop");
00100 boost::algorithm::to_lower(stopTrStr);
00101
00102 if (stopTrStr == "unmap") {
00103 stopTr = Pds::TransitionId::Unmap;
00104 } else if (stopTrStr == "unconfigure") {
00105 stopTr = Pds::TransitionId::Unconfigure;
00106 } else if (stopTrStr == "endrun") {
00107 stopTr = Pds::TransitionId::EndRun;
00108 } else if (stopTrStr == "endcalibcycle") {
00109 stopTr = Pds::TransitionId::EndCalibCycle;
00110 } else if (stopTrStr == "none" or stopTrStr == "no") {
00111
00112 stopTr = Pds::TransitionId::NumberOf;
00113 } else {
00114 throw DatasetSpecError(ERR_LOC, "unexpected stop condition", inputs[0]);
00115 }
00116 }
00117
00118 m_readerThread.reset(new boost::thread(ShmemMonitorClient(tag, index, *m_dgQueue, stopTr)));
00119 }
00120
00121
00122 bool
00123 DgramSourceShmem::next(std::vector<XtcInput::Dgram>& eventDg, std::vector<XtcInput::Dgram>& nonEventDg)
00124 {
00125 XtcInput::Dgram dg = m_dgQueue->pop();
00126 if (not dg.empty()) {
00127 eventDg.push_back(dg);
00128 return true;
00129 } else {
00130 return false;
00131 }
00132 }
00133
00134 }