psana_test/src/psanaTestDataMover.py

Go to the documentation of this file.
00001 import os
00002 import sys
00003 import glob
00004 import time
00005 import argparse
00006 import pprint
00007 import multiprocessing
00008 import psana_test.liveModeSimLib as liveModeLib
00009 
# Help text for the command line parser: the description is shown above the
# option list, the epilog (example config and override syntax) below it.
programDescription = '''
Moves xtc and smalldata files from a source directory to a destination dir. 
Reads a Python config file with move parameters. 
'''

programDescriptionEpilog = '''Example Config file:

delay_xtc = 0.0
delay_smalldata = 0.0
additional_delay_xtc_stream_1 = 0.0
additional_delay_xtc_stream_5 = 0.0
additional_delay_xtc_stream_80 = 0.0
additional_delay_smalldata_stream_1 = 0.0
additional_delay_smalldata_stream_5 = 0.0
additional_delay_smalldata_stream_80 = 0.0

mb_read_xtc = 4.0
mb_read_smalldata = 0.0
additional_mb_read_xtc_stream_1 = 0.0
additional_mb_read_xtc_stream_5 = 0.0
additional_mb_read_xtc_stream_80 = 0.0
additional_mb_read_smalldata_stream_1 = 0.0
additional_mb_read_smalldata_stream_5 = 0.0
additional_mb_read_smalldata_stream_80 = 0.0

pause_between_writes_xtc = 0.0
pause_between_writes_smalldata = 0.0
additional_pause_between_writes_xtc_stream_1 = 0.0
additional_pause_between_writes_xtc_stream_5 = 0.0
additional_pause_between_writes_xtc_stream_80 = 0.0
additional_pause_between_writes_smalldata_stream_1 = 0.0
additional_pause_between_writes_smalldata_stream_5 = 0.0
additional_pause_between_writes_smalldata_stream_80 = 0.0

Each additional_<param>_stream_<N> value is added to <param> for stream N.
A mb_read_xtc of 0.0 is replaced by 4.0. A mb_read_smalldata of 0.0 is replaced
by mb_read_xtc scaled by the ratio of total smalldata size to total xtc size.

Note: all these parameters can be overridden through command line arguments. For example:

--delay_xtc 3.3                     this introduces a 3.3 second initial delay for each big xtc file copy
--delay_xtc 3.3:s0=.3:s1=-.4:s80=.4 for stream 0, delay is 3.6, stream 1, it is 2.9, and stream 80 3.7
                                    all other streams will be 3.3 seconds.
'''
00050 
class InvalidXtcName(Exception):
    """Signals a file name that is not of the form ...e<N>-r<N>-s<N>-c<N>.xtc.

    The message is the offending file name.
    """

    def __init__(self, msg):
        super(InvalidXtcName, self).__init__(msg)
00054 
def parseXtcFileName(xtcFile):
    """Split an xtc file name into its numeric (experiment, run, stream, chunk).

    The base name must end in ``.xtc`` and its last four ``-`` separated
    fields must be ``e<int>``, ``r<int>``, ``s<int>`` and ``c<int>`` in that
    order; any earlier fields are ignored.

    Raises:
        InvalidXtcName: if the name does not follow that pattern.
    """
    stem, extension = os.path.splitext(os.path.basename(xtcFile))
    if extension != '.xtc':
        raise InvalidXtcName(xtcFile)
    fields = stem.split('-')
    if len(fields) < 4:
        raise InvalidXtcName(xtcFile)
    # Pair each required prefix letter with its field: e=exp, r=run, s=stream, c=chunk.
    tagged = list(zip('ersc', fields[-4:]))
    if not all(field.startswith(tag) for tag, field in tagged):
        raise InvalidXtcName(xtcFile)
    try:
        return tuple(int(field[1:]) for _, field in tagged)
    except ValueError:
        raise InvalidXtcName(xtcFile)
00070 
00071 
def getSmallDataFile(xtcFile):
    """Return the smalldata companion of *xtcFile*, or '' when there is none.

    For ``<dir>/<name>.xtc`` the companion is ``<dir>/smalldata/<name>.smd.xtc``.
    """
    assert xtcFile.endswith('.xtc')
    parentDir, fileName = os.path.split(xtcFile)
    smallDataDir = os.path.join(parentDir, 'smalldata')
    candidate = os.path.join(smallDataDir, fileName[:-4] + '.smd.xtc')
    if os.path.exists(smallDataDir) and os.path.exists(candidate):
        return candidate
    return ''
00082 
def getXtcRunFiles(inputdir, run):
    """Return the well-named ``*.xtc`` files in *inputdir* that belong to *run*.

    Files whose names parseXtcFileName rejects are reported on stderr and
    skipped. If valid xtc files exist but none is for *run*, an error line is
    written to stderr.

    Raises:
        AssertionError: if no file for *run* is found.
    """
    numValidFiles = 0
    runFiles = []
    for xtcFile in glob.glob(os.path.join(inputdir, '*.xtc')):
        try:
            fileRun = parseXtcFileName(xtcFile)[1]
        except InvalidXtcName:
            sys.stderr.write("WARNING: bad xtcfile: %s\n" % xtcFile)
            continue
        numValidFiles += 1
        if fileRun == run:
            runFiles.append(xtcFile)
    # Only complain about the run when there were xtc files to choose from.
    if len(runFiles) == 0 and numValidFiles > 0:
        sys.stderr.write("ERROR: run %d doesn't exist among xtc files in %s\n" % (run, inputdir))
    assert len(runFiles) > 0, "no run files found for run=%d inputdir=%s" % (run, inputdir)
    return runFiles
00096 
def indexStreams(runFiles):
    """Group run files by stream, ordered by chunk number.

    Returns a dict mapping each stream number to
    ``{'xtc': [xtc files...], 'smalldata': [smalldata files...]}``. Both lists
    are sorted by chunk and aligned index-for-index. A smalldata entry is ''
    when getSmallDataFile finds no companion file.

    Raises:
        AssertionError: if only some of the run files have smalldata files.
    """
    stream2chunk2xtc = {}
    numberOfSmallData = 0
    for xtc in runFiles:
        exp, run, stream, chunk = parseXtcFileName(xtc)
        smallDataFile = getSmallDataFile(xtc)
        if smallDataFile != '':
            numberOfSmallData += 1
        stream2chunk2xtc.setdefault(stream, {})[chunk] = (xtc, smallDataFile)
    # Either every xtc file has a smalldata companion or none does.
    assert numberOfSmallData == 0 or numberOfSmallData == len(runFiles), "There are %d small data files for the %d run files" % \
        (numberOfSmallData, len(runFiles))
    stream2xtcs = {}
    for stream, chunk2xtc in stream2chunk2xtc.items():
        # sorted() instead of keys().sort(): dict views cannot be sorted in place on Python 3.
        orderedPairs = [chunk2xtc[chunk] for chunk in sorted(chunk2xtc)]
        stream2xtcs[stream] = {'xtc': [xtc for xtc, _ in orderedPairs],
                               'smalldata': [small for _, small in orderedPairs]}
    return stream2xtcs
00118 
def _parseCommandLineOverride(key, args):
    """Parse a ``--<key>`` override written as ``X`` or ``X:s<N>=Y:s<M>=Z...``.

    Returns ``(overall, streamVals)``: the float ``X`` and a dict mapping each
    stream number to its additional float value. Returns ``(None, {})`` when
    the option was not given.

    Raises:
        Exception: if the option string cannot be parsed.
    """
    rawValue = getattr(args, key)
    if rawValue is None:
        return None, {}
    argFields = rawValue.split(':')
    try:
        overall = float(argFields[0])
        streamVals = {}
        for fld in argFields[1:]:
            streamStr, valueStr = fld.split('=')
            if not streamStr.startswith('s'):
                raise ValueError(fld)
            streamVals[int(streamStr[1:])] = float(valueStr)
    except ValueError:
        raise Exception("Could not parse command line argument: %s -> %s\nExample Syntax is --%s 3.3 or --%s 3.3:s0=.4:s5=-1.2:s80=.33" % (key, rawValue, key, key))
    return overall, streamVals


def getMoverParams(args, streams):
    """Build the mover-parameter dict from the config file and command line.

    For every ``<param>_<ftype>`` key (param in delay / mb_read /
    pause_between_writes, ftype in xtc / smalldata) and every stream N in
    *streams*, sets ``moverParams['<param>_<ftype>']`` and
    ``moverParams['additional_<param>_<ftype>_stream_<N>']``. Values start from
    the config file (default 0.0) and are replaced by command line overrides
    when given. A ``mb_read_xtc`` of 0.0 becomes 4.0. Names left over in the
    config namespace are reported on stderr.

    Args:
        args: parsed options; reads ``config`` and one string attribute per
            ``<param>_<ftype>`` key.
        streams: iterable of stream numbers (iterated more than once).
    """
    globalName = '__%s__' % os.path.splitext(os.path.basename(__file__))[0]
    configGlobals = {'__name__': globalName}
    configLocals = {}
    if args.config is not None:
        # NOTE: the config file is executed as Python code; only use trusted files.
        with open(args.config) as configFile:
            configCode = compile(configFile.read(), args.config, 'exec')
        exec(configCode, configGlobals, configLocals)

    moverParams = {}
    for overallParam in ['delay', 'mb_read', 'pause_between_writes']:
        for ftype in ['xtc', 'smalldata']:
            overallKey = '%s_%s' % (overallParam, ftype)
            # Parse once; per-stream values are looked up by stream below, so
            # the requested stream is never confused with one being parsed.
            cmdOverall, cmdStreamVals = _parseCommandLineOverride(overallKey, args)
            moverParams[overallKey] = configLocals.pop(overallKey, 0.0)
            if cmdOverall is not None:
                moverParams[overallKey] = cmdOverall
            for stream in streams:
                streamKey = 'additional_%s_stream_%d' % (overallKey, stream)
                moverParams[streamKey] = configLocals.pop(streamKey, 0.0)
                if stream in cmdStreamVals:
                    moverParams[streamKey] = cmdStreamVals[stream]
    if moverParams['mb_read_xtc'] == 0.0:
        moverParams['mb_read_xtc'] = 4.0

    for key in sorted(configLocals):
        sys.stderr.write("WARNING: unknown key %s in config %s\n" % (key, args.config))

    return moverParams
00164 
def getStreamMoverParams(ftype, stream, moverParams):
    """Combine overall and per-stream settings for one file type and stream.

    Returns a dict with keys 'delay', 'mb_read' and 'pause_between_writes',
    each equal to ``moverParams['<key>_<ftype>']`` plus
    ``moverParams['additional_<key>_<ftype>_stream_<stream>']``.
    """
    def total(name):
        base = moverParams['%s_%s' % (name, ftype)]
        offset = moverParams['additional_%s_%s_stream_%d' % (name, ftype, stream)]
        return base + offset

    return dict((name, total(name)) for name in ('delay', 'mb_read', 'pause_between_writes'))
00172 
def sumFileSizes(filenames):
    """Return the total size in bytes of *filenames*.

    Returns 0 as soon as any entry is not an existing regular file.
    """
    sizes = []
    for name in filenames:
        if not os.path.isfile(name):
            return 0
        sizes.append(os.path.getsize(name))
    return sum(sizes)
00179 
def computeDefaultSmallDataRead(xtcRead, xtcFiles, smallFiles):
    """Scale the xtc ``mb_read`` setting to smalldata by relative file size.

    Returns ``xtcRead * smallTotal / xtcTotal``, where the totals come from
    sumFileSizes (which yields 0 if any listed file is missing). Returns 0.0
    when the xtc total is 0 instead of raising ZeroDivisionError.
    """
    xtcTotalSize = sumFileSizes(xtcFiles)
    if xtcTotalSize == 0:
        # Ratio undefined (empty or missing xtc files); 0.0 matches the result
        # already produced when there is no smalldata at all.
        return 0.0
    smallTotalSize = sumFileSizes(smallFiles)
    return xtcRead * (float(smallTotalSize) / float(xtcTotalSize))
00184     
def getAllFiles(stream2xtcFiles):
    """Flatten the per-stream file lists into (all xtc files, all smalldata files).

    *stream2xtcFiles* maps stream -> {'xtc': [...], 'smalldata': [...]}, as
    built by indexStreams; lists are concatenated in dict iteration order.
    """
    xtcFiles = []
    smallFiles = []
    # Only the per-stream lists are needed, so iterate values() (also Py3-safe).
    for xtcSmallFiles in stream2xtcFiles.values():
        xtcFiles.extend(xtcSmallFiles['xtc'])
        smallFiles.extend(xtcSmallFiles['smalldata'])
    return xtcFiles, smallFiles
00192 
def constructMoverProcess(srcFile, destFile, moveParams, timeout, verbose, lock, 
                          max_mb_write=0.0, overwrite=True):
    """Create, but do not start, a process that copies *srcFile* to *destFile*.

    The process target is liveModeLib.inProgressCopyWithThrottle, called with
    the delay / mb_read / pause_between_writes entries of *moveParams*, the
    '.inprogress' suffix string, and the remaining arguments in the order
    listed in the tuple below.
    """
    assert os.path.exists(srcFile), "constructMoverProcess received srcfile that doesn't exist: %s" % srcFile
    copyTarget = liveModeLib.inProgressCopyWithThrottle
    copyArgs = (srcFile, destFile,
                moveParams['delay'], moveParams['mb_read'],
                '.inprogress', max_mb_write,
                moveParams['pause_between_writes'],
                overwrite, verbose, lock, timeout)
    return multiprocessing.Process(target=copyTarget, args=copyArgs)
00209 
def _computeMoverParams(args, stream2xtcFiles):
    """Resolve mover parameters and fill in the default smalldata read setting.

    When ``mb_read_smalldata`` is 0.0 it is derived from ``mb_read_xtc`` and
    the sizes of the files currently listed in *stream2xtcFiles*.
    """
    moverParams = getMoverParams(args, list(stream2xtcFiles.keys()))
    if moverParams['mb_read_smalldata'] == 0.0:
        xtcFiles, smallFiles = getAllFiles(stream2xtcFiles)
        moverParams['mb_read_smalldata'] = computeDefaultSmallDataRead(
            moverParams['mb_read_xtc'], xtcFiles, smallFiles)
    return moverParams


def _overallTimeout(args):
    """Return ``args.timeout`` when it is a positive number, else 0.0.

    0.0 is the value this module hands to mover processes when no timeout
    applies (``--timeout`` defaults to None).
    """
    if args.timeout is not None and args.timeout > 0:
        return args.timeout
    return 0.0


def moveAllChunks(streamFtype2chunkCurrentProcess, stream2xtcFiles, args, lock,
                  moverParams=None):
    """Start the next chunk for each stream/file type as its mover finishes.

    Polls once per second. For every entry of *streamFtype2chunkCurrentProcess*
    (keys like ``'xtc_stream_3'``, values ``(t0, process)``) whose process is
    no longer alive, pops the next file from ``stream2xtcFiles[stream][ftype]``
    and starts a mover process for it. An entry is dropped when its list is
    empty, its next file is missing, or the timeout budget is used up. Returns
    once no process is alive and none was started in the last pass.

    Args:
        streamFtype2chunkCurrentProcess: mutated in place.
        stream2xtcFiles: stream -> {'xtc': [...], 'smalldata': [...]}; lists
            are consumed.
        args: parsed options; reads outputdir, timeout and verbose.
        lock: passed through to every mover process.
        moverParams: dict from getMoverParams. When omitted it is computed from
            *args*; the smalldata default then only reflects the files not yet
            handed to a mover.
    """
    if moverParams is None:
        moverParams = _computeMoverParams(args, stream2xtcFiles)
    outdir = {'xtc': args.outputdir,
              'smalldata': os.path.join(args.outputdir, 'smalldata')}
    timeout = _overallTimeout(args)
    while True:
        time.sleep(1.0)
        stillMoving = False
        doneKeys = []
        newProcesses = {}
        for key, (t0, process) in streamFtype2chunkCurrentProcess.items():
            if process.is_alive():
                stillMoving = True
                continue
            ftype = key.split('_')[0]
            stream = int(key.split('_')[-1])
            remaining = stream2xtcFiles[stream][ftype]
            if len(remaining) == 0:
                doneKeys.append(key)
                continue
            srcFile = remaining.pop(0)
            if not os.path.isfile(srcFile):
                doneKeys.append(key)
                continue
            if timeout > 0:
                # t0 is carried over from the first chunk of this stream/ftype,
                # so this is what is left of the overall --timeout budget.
                newtimeout = timeout - (time.time() - t0)
                if newtimeout <= 0:
                    doneKeys.append(key)
                    continue
            else:
                newtimeout = 0.0
            destFile = os.path.join(outdir[ftype], os.path.basename(srcFile))
            moveParams = getStreamMoverParams(ftype, stream, moverParams)
            newProcesses[key] = (t0, constructMoverProcess(srcFile, destFile, moveParams,
                                                           newtimeout, args.verbose, lock))
        if len(newProcesses) == 0 and not stillMoving:
            break
        for key in doneKeys:
            del streamFtype2chunkCurrentProcess[key]
        for key, (t0, newProcess) in newProcesses.items():
            streamFtype2chunkCurrentProcess[key] = (t0, newProcess)
            newProcess.start()


def dataMover(args):
    """Copy all chunks of ``args.run`` from ``args.inputdir`` to ``args.outputdir``.

    Starts one mover process per (file type, stream) for the first chunk,
    then lets moveAllChunks feed the remaining chunks. Smalldata files go to
    ``<outputdir>/smalldata``. On KeyboardInterrupt the current mover processes
    are terminated and the interrupt is re-raised.
    """
    runFiles = getXtcRunFiles(args.inputdir, args.run)
    stream2xtcFiles = indexStreams(runFiles)
    moverParams = _computeMoverParams(args, stream2xtcFiles)
    timeout = _overallTimeout(args)
    streamFtype2chunkCurrentProcess = {}
    lock = multiprocessing.Lock()
    outdirs = (('xtc', args.outputdir),
               ('smalldata', os.path.join(args.outputdir, 'smalldata')))
    for stream in stream2xtcFiles:
        for ftype, outdir in outdirs:
            if len(stream2xtcFiles[stream][ftype]) == 0:
                continue
            srcFile = stream2xtcFiles[stream][ftype].pop(0)
            if not os.path.isfile(srcFile):
                continue
            destFile = os.path.join(outdir, os.path.basename(srcFile))
            moveParams = getStreamMoverParams(ftype, stream, moverParams)
            key = '%s_stream_%d' % (ftype, stream)
            moverProcess = constructMoverProcess(srcFile, destFile,
                                                 moveParams, timeout, args.verbose, lock)
            streamFtype2chunkCurrentProcess[key] = (time.time(), moverProcess)

    for t0, process in streamFtype2chunkCurrentProcess.values():
        process.start()

    try:
        moveAllChunks(streamFtype2chunkCurrentProcess, stream2xtcFiles, args, lock,
                      moverParams)
    except KeyboardInterrupt:
        print("Killing current processes:")
        for t0, process in streamFtype2chunkCurrentProcess.values():
            process.terminate()
        raise
00285 
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=programDescription, 
                                     epilog=programDescriptionEpilog,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-i', '--inputdir', type=str, help="input directory", default=None)
    parser.add_argument('-r', '--run', type=int, help="which run to do from the input dir", default=None)
    parser.add_argument('-o', '--outputdir', type=str, help="output directory", default=None)
    parser.add_argument('-c', '--config', type=str, help="config file", default=None)
    parser.add_argument('-t', '--timeout', type=float, help="quit moving after this many seconds", default=None)
    parser.add_argument('-v', '--verbose', action='store_true', help="verbose output", default=False)

    parser.add_argument('-d', '--delay_xtc', type=str, help="xtc initial delay (override config)", default=None)
    parser.add_argument('-s', '--delay_smalldata', type=str, help="smalldata initial delay (override config)", default=None)
    parser.add_argument('-p', '--pause_between_writes_xtc', type=str, help="xtc pause between writes (override config)", default=None)
    parser.add_argument('-q', '--pause_between_writes_smalldata', type=str, help="smalldata pause between writes (override config)", default=None)
    parser.add_argument('-m', '--mb_read_xtc', type=str, help="mb xtcread (override config)", default=None)
    parser.add_argument('-n', '--mb_read_smalldata', type=str, help="mb smalldataread (override config)", default=None)
    args = parser.parse_args()

    # Validate with parser.error (usage message, exit status 2) rather than
    # assert, which is stripped when Python runs with -O.
    if args.inputdir is None:
        parser.error("You must supply input directory with -i")
    if args.outputdir is None:
        parser.error("You must supply output directory with -o")
    if args.run is None:
        parser.error("You must supply a run with -r")
    if not os.path.exists(args.inputdir):
        parser.error("Did not find input directory: %s" % args.inputdir)
    if not os.path.exists(args.outputdir):
        parser.error("Did not find output directory: %s" % args.outputdir)
    # Compare resolved paths so 'dir' vs 'dir/' (or a symlink) is caught too.
    if os.path.realpath(args.inputdir) == os.path.realpath(args.outputdir):
        parser.error("inputdir can't equal outputdir, this is a datamover")
    if args.config is not None and not os.path.exists(args.config):
        parser.error("config file not found: %s" % args.config)

    dataMover(args)

Generated on 19 Dec 2016 for PSDMSoftware by  doxygen 1.4.7