# Copies the xtc files of one run (and their smalldata/*.smd.xtc companions)
# from an input directory to an output directory, one mover process per
# (file type, stream), throttled by parameters from a Python config file
# and/or command line overrides.  See programDescriptionEpilog for the syntax.
import os
import sys
import glob
import time
import argparse
import pprint
import multiprocessing
import psana_test.liveModeSimLib as liveModeLib

programDescription = '''
Moves xtc and smalldata files from a source directory to a destination dir.
Reads a Python config file with move parameters.
'''

programDescriptionEpilog = '''Example Config file:

delay_xtc = 0.0
delay_smalldata = 0.0
additional_delay_xtc_stream_1 = 0.0
additional_delay_xtc_stream_5 = 0.0
additional_delay_xtc_stream_80 = 0.0
additional_delay_smalldata_stream_1 = 0.0
additional_delay_smalldata_stream_5 = 0.0
additional_delay_smalldata_stream_80 = 0.0

mb_read_xtc = 4.0
mb_read_smalldata = 0.0
additional_mb_read_xtc_stream_1 = 0.0
additional_mb_read_xtc_stream_5 = 0.0
additional_mb_read_xtc_stream_80 = 0.0
additional_mb_read_smalldata_stream_1 = 0.0
additional_mb_read_smalldata_stream_5 = 0.0
additional_mb_read_smalldata_stream_80 = 0.0

pause_between_writes_xtc = 0.0
pause_between_writes_smalldata = 0.0
additional_pause_between_writes_xtc_stream_1 = 0.0
additional_pause_between_writes_xtc_stream_5 = 0.0
additional_pause_between_writes_xtc_stream_80 = 0.0
additional_pause_between_writes_smalldata_stream_1 = 0.0
additional_pause_between_writes_smalldata_stream_5 = 0.0
additional_pause_between_writes_smalldata_stream_80 = 0.0

Note all these parameters can be overridden through command line arguments. For example:

--delay_xtc 3.3 this introduces a 3.3 second delay between writes of the big xtc files
--delay_xtc 3.3:s0=.3:s1=-.4:s80=.4 for stream 0, delay is 3.6, stream 1, it is 2.9, and stream 80 3.7
all other streams will be 3.3 seconds.
'''


class InvalidXtcName(Exception):
    """Raised when a path does not follow the e<exp>-r<run>-s<stream>-c<chunk>.xtc naming."""

    def __init__(self, msg):
        Exception.__init__(self, msg)


def parseXtcFileName(xtcFile):
    """Parse an xtc file name into integers.

    The base name must end in '.xtc' and its last four '-' separated fields
    must be e<exp>, r<run>, s<stream>, c<chunk> with integer payloads.

    Returns:
        (exp, run, stream, chunk) as ints.
    Raises:
        InvalidXtcName: if the name does not match that pattern.
    """
    baseName = os.path.basename(xtcFile)
    baseStem, ext = os.path.splitext(baseName)
    if ext != '.xtc':
        raise InvalidXtcName(xtcFile)
    baseStemFlds = baseStem.split('-')
    if len(baseStemFlds) < 4:
        raise InvalidXtcName(xtcFile)
    expStr, runStr, streamStr, chunkStr = baseStemFlds[-4:]
    for field, prefix in ((expStr, 'e'), (runStr, 'r'), (streamStr, 's'), (chunkStr, 'c')):
        if not field.startswith(prefix):
            raise InvalidXtcName(xtcFile)
    try:
        return (int(expStr[1:]), int(runStr[1:]), int(streamStr[1:]), int(chunkStr[1:]))
    except ValueError:
        raise InvalidXtcName(xtcFile)


def getSmallDataFile(xtcFile):
    """Return the smalldata companion of xtcFile, or '' if it does not exist.

    For <dir>/<stem>.xtc the companion is <dir>/smalldata/<stem>.smd.xtc.
    """
    assert xtcFile.endswith('.xtc')
    basedir, basename = os.path.split(xtcFile)
    smallDataDir = os.path.join(basedir, 'smalldata')
    if os.path.exists(smallDataDir):
        smallDataXtcBase = basename[0:-4] + '.smd.xtc'
        smallDataXtc = os.path.join(smallDataDir, smallDataXtcBase)
        if os.path.exists(smallDataXtc):
            return smallDataXtc
    return ''


def getXtcRunFiles(inputdir, run):
    """Return the well-named *.xtc files in inputdir that belong to the given run.

    Badly named xtc files are reported on stderr and skipped.  An
    AssertionError is raised if no file for the run is found.
    """
    parsedFiles = []
    for xtcFile in glob.glob(os.path.join(inputdir, '*.xtc')):
        try:
            parsed = parseXtcFileName(xtcFile)
        except InvalidXtcName:
            sys.stderr.write("WARNING: bad xtcfile: %s\n" % xtcFile)
            continue
        parsedFiles.append((xtcFile, parsed))
    runFiles = [xtcFile for xtcFile, parsed in parsedFiles if parsed[1] == run]
    # Only complain specifically about the run when there were valid xtc files
    # at all (the original tested the leaked loop variable here).
    if not runFiles and parsedFiles:
        sys.stderr.write("ERROR: run %d doesn't exist among xtc files in %s\n" % (run, inputdir))
    assert len(runFiles) > 0, "no run files found for run=%d inputdir=%s" % (run, inputdir)
    return runFiles


def indexStreams(runFiles):
    """Group run files by stream, chunk-ordered.

    Returns:
        {stream: {'xtc': [xtc paths sorted by chunk],
                  'smalldata': [matching smalldata paths, '' where absent]}}
    Raises:
        AssertionError: if only some of the xtc files have smalldata companions.
    """
    stream2chunk2xtc = {}
    numberOfSmallData = 0
    for xtc in runFiles:
        exp, run, stream, chunk = parseXtcFileName(xtc)
        smallDataFile = getSmallDataFile(xtc)
        if smallDataFile != '':
            numberOfSmallData += 1
        stream2chunk2xtc.setdefault(stream, {})[chunk] = (xtc, smallDataFile)
    assert numberOfSmallData == 0 or numberOfSmallData == len(runFiles), "There are %d small data files for the %d run files" % \
        (numberOfSmallData, len(runFiles))
    stream2xtcs = {}
    for stream, chunk2xtc in stream2chunk2xtc.items():
        xtcSmallList = [chunk2xtc[chunk] for chunk in sorted(chunk2xtc)]
        stream2xtcs[stream] = {'xtc': [el[0] for el in xtcSmallList],
                               'smalldata': [el[1] for el in xtcSmallList]}
    return stream2xtcs


def _commandLineOverride(key, args, stream=None):
    """Parse the command line value for option `key`.

    Syntax is '<overall>' or '<overall>:s<N>=<offset>:...'.  Returns the
    overall float when stream is None, otherwise the offset for that stream
    (None if not given).  Returns None when the option was not supplied.
    """
    rawValue = getattr(args, key)
    if rawValue is None:
        return None
    argFields = rawValue.split(':')
    try:
        overall = float(argFields[0])
        streamVals = {}
        for fld in argFields[1:]:
            streamStr, streamVal = fld.split('=')
            # Distinct name on purpose: reusing `stream` here would clobber the
            # caller's argument and return the wrong value below.
            parsedStream = int(streamStr[1:])
            streamVals[parsedStream] = float(streamVal)
    except ValueError:
        raise Exception("Could not parse command line argument: %s -> %s\nExample Syntax is --%s 3.3 or --%s 3.3:s0=.4:s5=-1.2:s80=.33" % (key, getattr(args, key), key, key))
    if stream is None:
        return overall
    return streamVals.get(stream, None)


def getMoverParams(args, streams):
    """Build the mover parameter dictionary for the given streams.

    For each of delay / mb_read / pause_between_writes and each file type
    (xtc, smalldata) this sets '<param>_<ftype>' and, per stream,
    'additional_<param>_<ftype>_stream_<N>'.  Values come from the config
    file (default 0.0) and are replaced by command line overrides.  An
    mb_read_xtc of 0.0 is replaced by 4.0.  Unrecognized config names are
    reported on stderr.
    """
    globalName = '__%s__' % os.path.splitext(os.path.basename(__file__))[0]
    configGlobals = {'__name__': globalName}
    configLocals = {}
    if args.config is not None:
        # The config file is executed as Python: it must come from a trusted operator.
        with open(args.config) as configFile:
            configCode = compile(configFile.read(), args.config, 'exec')
        exec(configCode, configGlobals, configLocals)

    moverParams = {}
    for overallParam in ['delay', 'mb_read', 'pause_between_writes']:
        for ftype in ['xtc', 'smalldata']:
            overallKey = '%s_%s' % (overallParam, ftype)
            moverParams[overallKey] = configLocals.pop(overallKey, 0.0)
            cmdLineVal = _commandLineOverride(overallKey, args)
            if cmdLineVal is not None:
                moverParams[overallKey] = cmdLineVal
            for stream in streams:
                streamKey = 'additional_%s_stream_%d' % (overallKey, stream)
                moverParams[streamKey] = configLocals.pop(streamKey, 0.0)
                cmdLineVal = _commandLineOverride(overallKey, args, stream)
                if cmdLineVal is not None:
                    moverParams[streamKey] = cmdLineVal
    if moverParams['mb_read_xtc'] == 0.0:
        moverParams['mb_read_xtc'] = 4.0

    # Whatever was not popped above is not a recognized parameter.
    for key in sorted(configLocals):
        sys.stderr.write("WARNING: unknown key %s in config %s\n" % (key, args.config))

    return moverParams


def getStreamMoverParams(ftype, stream, moverParams):
    """Return {'delay', 'mb_read', 'pause_between_writes'} for one stream/ftype.

    Each value is the overall parameter plus the stream's additional offset.
    """
    params = {}
    for key in ['delay', 'mb_read', 'pause_between_writes']:
        baseKey = '%s_%s' % (key, ftype)
        offsetKey = 'additional_%s_%s_stream_%d' % (key, ftype, stream)
        params[key] = moverParams[baseKey] + moverParams[offsetKey]
    return params


def sumFileSizes(filenames):
    """Sum the sizes in bytes of filenames; returns 0 if any is not a regular file."""
    total = 0
    for fname in filenames:
        if not os.path.isfile(fname):
            return 0
        total += os.stat(fname).st_size
    return total


def computeDefaultSmallDataRead(xtcRead, xtcFiles, smallFiles):
    """Scale xtcRead by the smalldata/xtc total size ratio.

    Returns 0.0 when the xtc total is 0 (e.g. a file is missing), instead of
    dividing by zero.
    """
    xtcTotalSize = sumFileSizes(xtcFiles)
    if xtcTotalSize == 0:
        return 0.0
    smallTotalSize = sumFileSizes(smallFiles)
    return xtcRead * (float(smallTotalSize) / float(xtcTotalSize))


def getAllFiles(stream2xtcFiles):
    """Flatten an indexStreams() result into (xtcFiles, smallFiles) lists."""
    xtcFiles = []
    smallFiles = []
    for xtcSmallFiles in stream2xtcFiles.values():
        xtcFiles.extend(xtcSmallFiles['xtc'])
        smallFiles.extend(xtcSmallFiles['smalldata'])
    return xtcFiles, smallFiles


def constructMoverProcess(srcFile, destFile, moveParams, timeout, verbose, lock,
                          max_mb_write=0.0, overwrite=True):
    """Create (but do not start) a Process running liveModeLib.inProgressCopyWithThrottle.

    moveParams supplies 'delay', 'mb_read' and 'pause_between_writes'; the
    '.inprogress' suffix and the remaining arguments are passed through.
    """
    assert os.path.exists(srcFile), "constructMoverProcess received srcfile that doesn't exist: %s" % srcFile
    process = multiprocessing.Process(target=liveModeLib.inProgressCopyWithThrottle,
                                      args=(srcFile,
                                            destFile,
                                            moveParams['delay'],
                                            moveParams['mb_read'],
                                            '.inprogress',
                                            max_mb_write,
                                            moveParams['pause_between_writes'],
                                            overwrite,
                                            verbose,
                                            lock,
                                            timeout))
    return process


def moveAllChunks(streamFtype2chunkCurrentProcess, stream2xtcFiles, args, lock, moverParams=None):
    """Poll running movers once a second and start each stream's next chunk.

    streamFtype2chunkCurrentProcess maps '<ftype>_stream_<N>' to
    (startTime, process) and is updated in place; files are popped from
    stream2xtcFiles as they are scheduled.  Returns when no process is
    running and nothing new was started.

    moverParams: result of getMoverParams (with mb_read_smalldata filled in).
    It used to be read from an undefined global, so it is now passed in;
    a module-level `moverParams` is still honored when omitted.
    """
    if moverParams is None:
        moverParams = globals().get('moverParams')
        if moverParams is None:
            raise ValueError("moveAllChunks requires moverParams")
    outdir = {'xtc': args.outputdir, 'smalldata': os.path.join(args.outputdir, 'smalldata')}
    useTimeout = args.timeout is not None and args.timeout > 0
    while True:
        time.sleep(1.0)
        stillMoving = False
        doneKeys = []
        newProcesses = {}
        for key, (t0, process) in list(streamFtype2chunkCurrentProcess.items()):
            if process.is_alive():
                stillMoving = True
                continue
            process.join()  # reap the finished child; returns immediately
            stream = int(key.split('_')[-1])
            ftype = key.split('_')[0]
            pending = stream2xtcFiles[stream][ftype]
            if len(pending) == 0:
                doneKeys.append(key)
                continue
            srcFile = pending.pop(0)
            if not os.path.isfile(srcFile):
                doneKeys.append(key)
                continue
            if useTimeout:
                # t0 is the stream's original start time, so this is the time left overall.
                newtimeout = max(0.0, args.timeout - (time.time() - t0))
                if newtimeout <= 0:
                    # Out of time: stop this stream rather than restart it with
                    # 0.0, which the no-timeout branch below uses for "no timeout".
                    doneKeys.append(key)
                    continue
            else:
                newtimeout = 0.0
            destFile = os.path.join(outdir[ftype], os.path.basename(srcFile))
            moveParams = getStreamMoverParams(ftype, stream, moverParams)
            newProcesses[key] = (t0, constructMoverProcess(srcFile, destFile, moveParams,
                                                           newtimeout, args.verbose, lock))
        if len(newProcesses) == 0 and (not stillMoving):
            break
        for key in doneKeys:
            del streamFtype2chunkCurrentProcess[key]
        for key, startTimeProcess in newProcesses.items():
            streamFtype2chunkCurrentProcess[key] = startTimeProcess
            startTimeProcess[1].start()


def dataMover(args):
    """Move all xtc/smalldata chunks of args.run from args.inputdir to args.outputdir.

    One mover process per (ftype, stream) runs at a time.  On
    KeyboardInterrupt the current processes are terminated and the interrupt
    is re-raised.
    """
    runFiles = getXtcRunFiles(args.inputdir, args.run)
    stream2xtcFiles = indexStreams(runFiles)
    moverParams = getMoverParams(args, list(stream2xtcFiles.keys()))
    if moverParams['mb_read_smalldata'] == 0.0:
        xtcFiles, smallFiles = getAllFiles(stream2xtcFiles)
        moverParams['mb_read_smalldata'] = computeDefaultSmallDataRead(moverParams['mb_read_xtc'], xtcFiles, smallFiles)
    streamFtype2chunkCurrentProcess = {}
    lock = multiprocessing.Lock()
    xtcOutDir = args.outputdir
    smalldataOutDir = os.path.join(args.outputdir, 'smalldata')
    for stream in list(stream2xtcFiles.keys()):
        for ftype, outdir in zip(['xtc', 'smalldata'], [xtcOutDir, smalldataOutDir]):
            if len(stream2xtcFiles[stream][ftype]) == 0:
                continue
            srcFile = stream2xtcFiles[stream][ftype].pop(0)
            # '' (no smalldata companion) and vanished files are skipped.
            if not os.path.isfile(srcFile):
                continue
            destFile = os.path.join(outdir, os.path.basename(srcFile))
            moveParams = getStreamMoverParams(ftype, stream, moverParams)
            key = '%s_stream_%d' % (ftype, stream)
            moverProcess = constructMoverProcess(srcFile, destFile,
                                                 moveParams, args.timeout, args.verbose, lock)
            streamFtype2chunkCurrentProcess[key] = (time.time(), moverProcess)

    for t0, process in streamFtype2chunkCurrentProcess.values():
        process.start()

    try:
        moveAllChunks(streamFtype2chunkCurrentProcess, stream2xtcFiles, args, lock, moverParams)
    except KeyboardInterrupt:
        print("Killing current processes:")
        for t0, process in streamFtype2chunkCurrentProcess.values():
            process.terminate()
        raise


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=programDescription,
                                     epilog=programDescriptionEpilog,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-i', '--inputdir', type=str, help="input directory", default=None)
    parser.add_argument('-r', '--run', type=int, help="which run to do from the input dir", default=None)
    parser.add_argument('-o', '--outputdir', type=str, help="output directory", default=None)
    parser.add_argument('-c', '--config', type=str, help="config file", default=None)
    parser.add_argument('-t', '--timeout', type=float, help="quit moving after this many seconds", default=None)
    parser.add_argument('-v', '--verbose', action='store_true', help="verbose output", default=False)

    parser.add_argument('-d', '--delay_xtc', type=str, help="xtc initial delay (override config)", default=None)
    parser.add_argument('-s', '--delay_smalldata', type=str, help="smalldata initial delay (override config)", default=None)
    parser.add_argument('-p', '--pause_between_writes_xtc', type=str, help="xtc pause between writes (override config)", default=None)
    parser.add_argument('-q', '--pause_between_writes_smalldata', type=str, help="smalldata pause between writes (override config)", default=None)
    parser.add_argument('-m', '--mb_read_xtc', type=str, help="mb xtcread (override config)", default=None)
    parser.add_argument('-n', '--mb_read_smalldata', type=str, help="mb smalldataread (override config)", default=None)
    args = parser.parse_args()

    # parser.error instead of assert: asserts are stripped under `python -O`.
    if args.inputdir is None:
        parser.error("You must supply input directory with -i")
    if args.outputdir is None:
        parser.error("You must supply output directory with -o")
    # Compare resolved paths so 'dir' and 'dir/' are recognized as the same.
    if os.path.realpath(args.inputdir) == os.path.realpath(args.outputdir):
        parser.error("inputdir can't equal outputdir, this is a datamover")
    if args.run is None:
        parser.error("You must supply a run with -r")
    if not os.path.exists(args.inputdir):
        parser.error("Did not find input directory: %s" % args.inputdir)
    if not os.path.exists(args.outputdir):
        parser.error("Did not find output directory: %s" % args.outputdir)
    if args.config is not None and not os.path.exists(args.config):
        parser.error("config file not found: %s" % args.config)

    dataMover(args)