00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 """Deals with batch jobs for correlation analysis
00011
00012 This software was developed for the LCLS project. If you use all or
00013 part of it, please give an appropriate acknowledgment.
00014
00015 @version $Id: BatchJobCorAna.py 9892 2015-04-10 00:57:40Z dubrovin@SLAC.STANFORD.EDU $
00016
00017 @author Mikhail S. Dubrovin
00018 """
00019
00020
00021
00022
00023 __version__ = "$Revision: 9892 $"
00024
00025
00026
00027
00028
00029
00030 from BatchJob import *
00031 from PyQt4 import QtGui, QtCore
00032
00033
00034
class BatchJobCorAna(BatchJob, QtCore.QObject):
    """Deals with batch jobs for correlation analysis.

    The class drives three kinds of batch jobs - split, proc (one job per
    part) and merge - and an auto-processing sequence which chains them.

    The names cp, fnm, gu, cfg, logger and os are not defined in this module;
    they are expected to be provided by "from BatchJob import *".
    """

    # Format passed to gu.get_local_time_str for all submission time strings.
    _TIME_FMT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        """Constructor. Takes no parameters.

        Initializes both base classes, resets job ids and submission times
        of the split and merge jobs and creates the list of proc records.
        """
        BatchJob.__init__(self)
        QtCore.QObject.__init__(self, None)

        self.job_id_cora_split = None
        self.time_sub_split = None

        self.job_id_cora_merge = None
        self.time_sub_merge = None

        # None guarantees that the first init_list_for_proc() call builds the list
        self.nparts = None
        self.init_list_for_proc()

    def init_list_for_proc(self):
        """Creates the list of [index, job_id, time_sub] records, one per proc part.

        Does nothing if cp.bat_img_nparts.value() is equal to the cached
        self.nparts. Otherwise self.nparts is updated and the list is re-created
        with job_id and time_sub set to None for all parts.
        """
        nparts = cp.bat_img_nparts.value()
        if nparts == self.nparts:
            return
        self.nparts = nparts
        self.list_for_proc = [[i, None, None] for i in range(nparts)]

    # --- Submission of batch jobs ---

    def make_psana_cfg_file_for_cora_split(self):
        """Delegates to cfg.make_psana_cfg_file_for_cora_split()."""
        cfg.make_psana_cfg_file_for_cora_split()

    def submit_batch_for_cora_split(self):
        """Submits the batch job for the split stage.

        Returns without submission if self.job_can_be_submitted(...) is False.
        The submission time is stored before the job is submitted; the job id
        returned by gu.batch_job_submit is stored in self.job_id_cora_split.
        """
        if not self.job_can_be_submitted(self.job_id_cora_split, self.time_sub_split, 'cor. ana. split'):
            return
        self.time_sub_split = gu.get_time_sec()

        self.make_psana_cfg_file_for_cora_split()

        command = 'psana -c ' + fnm.path_cora_split_psana_cfg() + ' ' + fnm.path_data_xtc_cond()
        queue = cp.bat_queue.value()
        log_file = fnm.path_cora_split_batch_log()

        self.job_id_cora_split, out, err = gu.batch_job_submit(command, queue, log_file)

    def submit_batch_for_cora_proc(self, ind):
        """Submits the batch job for the proc stage of the part with index ind.

        @param ind index of the part in self.list_for_proc and in the lists
                   of files returned by fnm
        """
        # Re-sync with the configured number of parts before indexing the list
        self.init_list_for_proc()

        i, job_id, time_sub = self.list_for_proc[ind]

        if not self.job_can_be_submitted(job_id, time_sub, 'cor. ana. proc'):
            return
        time_sub = gu.get_time_sec()

        fname = fnm.get_list_of_files_cora_split_work()[ind]
        tname = fnm.path_tau_list()
        log_file = fnm.get_list_of_files_cora_proc_work_log()[ind]

        command = 'corana -f ' + fname
        # The option -t is added only for the 'file' tau list type and an existing file
        if cp.ana_tau_list_type.value() == 'file' and os.path.exists(tname):
            command += ' -t ' + tname
        queue = cp.bat_queue.value()

        job_id, out, err = gu.batch_job_submit(command, queue, log_file)
        self.list_for_proc[ind] = [i, job_id, time_sub]

    def submit_batch_for_cora_merge(self):
        """Submits the batch job for the merge stage.

        Returns without submission if self.job_can_be_submitted(...) is False.
        The command gets the first file of fnm.get_list_of_files_cora_proc_work()
        as the -f argument and fnm.path_cora_merge_tau() as the -t argument.
        """
        if not self.job_can_be_submitted(self.job_id_cora_merge, self.time_sub_merge, 'cor. ana. merge'):
            return
        self.time_sub_merge = gu.get_time_sec()

        fname = fnm.get_list_of_files_cora_proc_work()[0]
        tname = fnm.path_cora_merge_tau()
        log_file = fnm.path_cora_merge_batch_log()

        command = 'corana_merge -f ' + fname + ' -t ' + tname
        queue = cp.bat_queue.value()

        self.job_id_cora_merge, out, err = gu.batch_job_submit(command, queue, log_file)

    # --- Split: check, kill, status ---

    def check_batch_job_for_cora_split(self):
        """Calls self.check_batch_job for the split job id."""
        self.check_batch_job(self.job_id_cora_split, 'split')

    def kill_batch_job_for_cora_split(self):
        """Calls self.kill_batch_job for the split job id."""
        self.kill_batch_job(self.job_id_cora_split, 'for split')

    def status_for_cora_split_files(self, comment='of split: '):
        """Returns the result of self.status_and_string_for_files for all split files."""
        return self.status_and_string_for_files(fnm.get_list_of_files_cora_split_all(), comment)

    def status_batch_job_for_cora_split(self):
        """Returns the result of self.get_batch_job_status_and_string for the split job."""
        return self.get_batch_job_status_and_string(self.job_id_cora_split, self.time_sub_split)

    # --- Proc: status, kill ---

    def status_for_cora_proc_files(self, comment='of proc: '):
        """Returns the result of self.status_and_string_for_files for the proc check files."""
        return self.status_and_string_for_files(fnm.get_list_of_files_cora_proc_check(), comment)

    def status_batch_job_for_cora_proc(self, ind):
        """Returns self.get_batch_job_status(job_id, '') for the part with index ind."""
        job_id = self.list_for_proc[ind][1]
        return self.get_batch_job_status(job_id, '')

    def kill_batch_job_for_cora_proc(self, ind):
        """Calls self.kill_batch_job for the part with index ind and returns its result."""
        job_id = self.list_for_proc[ind][1]
        return self.kill_batch_job(job_id, 'for proc')

    def status_batch_job_for_cora_proc_all(self):
        """Returns the status and string of the proc job.

        NOTE: only the record with index 0 of self.list_for_proc is inspected.
        """
        i, job_id, time_sub = self.list_for_proc[0]
        return self.get_batch_job_status_and_string(job_id, time_sub)

    # --- Merge: check, kill, status ---

    def check_batch_job_for_cora_merge(self):
        """Calls self.check_batch_job for the merge job id."""
        self.check_batch_job(self.job_id_cora_merge, 'merge')

    def kill_batch_job_for_cora_merge(self):
        """Calls self.kill_batch_job for the merge job id."""
        self.kill_batch_job(self.job_id_cora_merge, 'for merge')

    def status_for_cora_merge_files(self, comment='of merge: '):
        """Returns (status, string) for the merge files.

        Side effect: if the status is true, cp.res_fname is set to
        fnm.path_cora_merge_result().
        """
        fstatus, fstatus_str = self.status_and_string_for_files(fnm.get_list_of_files_cora_merge(), comment)
        if fstatus:
            cp.res_fname.setValue(fnm.path_cora_merge_result())
        return fstatus, fstatus_str

    def status_batch_job_for_cora_merge(self):
        """Returns the result of self.get_batch_job_status_and_string for the merge job."""
        return self.get_batch_job_status_and_string(self.job_id_cora_merge, self.time_sub_merge)

    # --- Check and removal of files ---

    def check_work_files_cora(self):
        """Calls self.check_files_for_list for the list of split files."""
        self.check_files_for_list(fnm.get_list_of_files_cora_split(), 'of correlation analysis:')

    def remove_files_cora_split(self):
        """Calls self.remove_files_for_list for all split files."""
        self.remove_files_for_list(fnm.get_list_of_files_cora_split_all(), 'of split:')

    def remove_files_cora_proc(self, ind=None):
        """Removes work and log files of the proc stage.

        @param ind index of the part; if None, the files of all parts and,
                   in addition, fnm.path_cora_proc_tau_out() are removed

        The list of files is kept in self.list_of_files_to_remove.
        """
        work_files = fnm.get_list_of_files_cora_proc_work()
        log_files = fnm.get_list_of_files_cora_proc_work_log()

        if ind is None:
            self.list_of_files_to_remove = work_files + log_files
            self.list_of_files_to_remove.append(fnm.path_cora_proc_tau_out())
        else:
            self.list_of_files_to_remove = [work_files[ind], log_files[ind]]

        self.remove_files_for_list(self.list_of_files_to_remove, 'of proc:')

    def remove_files_cora_merge(self):
        """Calls self.remove_files_for_list for the main merge files."""
        self.remove_files_for_list(fnm.get_list_of_files_cora_merge_main(), 'of merge:')

    def remove_files_cora_all(self):
        """Removes files of the split and merge stages and of each proc part."""
        logger.debug('remove_files_cora_all', __name__)
        self.remove_files_cora_split()
        self.remove_files_cora_merge()
        for i in range(self.nparts):
            self.remove_files_cora_proc(i)

    # --- Access to job ids and submission time strings ---

    def get_batch_job_id_cora_split(self):
        """Returns the id of the split job (None until it is set by submission)."""
        return self.job_id_cora_split

    def get_batch_job_cora_split_time_string(self):
        """Returns the split submission time formatted by gu.get_local_time_str.

        Unlike the proc variant, the time is not checked for None here.
        """
        return gu.get_local_time_str(self.time_sub_split, fmt=self._TIME_FMT)

    def get_batch_job_id_cora_proc(self, ind):
        """Returns the job id stored for the proc part with index ind."""
        return self.list_for_proc[ind][1]

    def get_batch_job_cora_proc_time_string(self, ind):
        """Returns the submission time string for the proc part with index ind,
        or 'Time N/A' if no submission time is stored.
        """
        time_sub_sec = self.list_for_proc[ind][2]
        if time_sub_sec is None:
            return 'Time N/A'
        return gu.get_local_time_str(time_sub_sec, fmt=self._TIME_FMT)

    def get_batch_job_id_cora_merge(self):
        """Returns the id of the merge job (None until it is set by submission)."""
        return self.job_id_cora_merge

    def get_batch_job_cora_merge_time_string(self):
        """Returns the merge submission time formatted by gu.get_local_time_str.

        Unlike the proc variant, the time is not checked for None here.
        """
        return gu.get_local_time_str(self.time_sub_merge, fmt=self._TIME_FMT)

    # --- Auto-processing ---

    def connectToThread1(self):
        """Connects the signal update(QString) of cp.thread1 to self.updateStatus.

        A failure is reported as a warning and is not propagated.
        """
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit pass through
        try:
            self.connect(cp.thread1, QtCore.SIGNAL('update(QString)'), self.updateStatus)
        except Exception:
            logger.warning('connectToThread1 IS FAILED !!!', __name__)

    def disconnectFromThread1(self):
        """Disconnects the signal update(QString) of cp.thread1 from self.updateStatus.

        A failure is reported as a warning and is not propagated.
        """
        try:
            self.disconnect(cp.thread1, QtCore.SIGNAL('update(QString)'), self.updateStatus)
        except Exception:
            logger.warning('disconnectFromThread1 IS FAILED !!!', __name__)

    def updateStatus(self, text):
        """Slot for the signal update(QString); the text is not used."""
        self.auto_processing_status()

    def stop_auto_processing(self):
        """Resets cp.autoRunStatus to 0, kills all batch jobs and disconnects from thread1."""
        cp.autoRunStatus = 0
        self.kill_all_batch_jobs()
        logger.info('Auto-processing IS STOPPED', __name__)
        self.disconnectFromThread1()

    def kill_all_batch_jobs(self):
        """Kills the split and merge jobs and the job of each proc part."""
        logger.debug('kill_all_batch_jobs', __name__)
        self.kill_batch_job_for_cora_split()
        self.kill_batch_job_for_cora_merge()
        for i in range(self.nparts):
            self.kill_batch_job_for_cora_proc(i)

    def start_auto_processing(self):
        """Starts the auto-processing sequence unless cp.autoRunStatus is non-zero.

        Connects to thread1, removes files of all stages and runs the split stage.
        """
        if cp.autoRunStatus != 0:
            logger.warning('Auto-processing procedure is already active in stage '+str(cp.autoRunStatus), __name__)
            return

        self.connectToThread1()
        self.remove_files_cora_all()
        self.onRunSplit()

    def auto_processing_status(self):
        """Calls self.updateRunState() if cp.autoRunStatus is non-zero."""
        if cp.autoRunStatus:
            self.updateRunState()

    def updateRunState(self):
        """Moves the auto-processing sequence to the next stage when the files
        of the current stage are available.

        cp.autoRunStatus: 1 - split, 2 - proc, 3 - merge; it is reset to 0
        when the merge stage is completed.
        """
        logger.info('Auto run stage '+str(cp.autoRunStatus), __name__)

        # File status is requested from this object (self), not from the module-level instance
        self.status_split, fstatus_str_split = self.status_for_cora_split_files(comment='')
        self.status_proc, fstatus_str_proc = self.status_for_cora_proc_files(comment='')
        self.status_merge, fstatus_str_merge = self.status_for_cora_merge_files(comment='')

        if cp.autoRunStatus == 1 and self.status_split:
            logger.info('updateRunState: Split is completed, begin processing', __name__)
            self.onRunProc()

        elif cp.autoRunStatus == 2 and self.status_proc:
            logger.info('updateRunState: Processing is completed, begin merging', __name__)
            self.onRunMerge()

        elif cp.autoRunStatus == 3 and self.status_merge:
            logger.info('updateRunState: Merging is completed, stop auto-run', __name__)
            cp.autoRunStatus = 0
            self.disconnectFromThread1()

    def onRunSplit(self):
        """Submits the split job and sets cp.autoRunStatus = 1 if the split can be started."""
        logger.debug('onRunSplit', __name__)
        if self.isReadyToStartRunSplit():
            self.submit_batch_for_cora_split()
            cp.autoRunStatus = 1

    def isReadyToStartRunSplit(self):
        """Returns True if the split can be started.

        Returns False with a warning if cp.bat_data_end has its default value
        or if cp.bat_data_start is not less than cp.bat_data_end.
        """
        msg1 = 'JOB IS NOT SUBMITTED !!!\nFirst, set the number of events for data.'

        if cp.bat_data_end.value() == cp.bat_data_end.value_def():
            logger.warning(msg1, __name__)
            return False

        if cp.bat_data_start.value() >= cp.bat_data_end.value():
            logger.warning(msg1, __name__)
            return False

        return True

    def onRunProc(self):
        """Submits the proc job for each part which is ready and sets cp.autoRunStatus = 2."""
        logger.debug('onRunProc', __name__)

        # submit_batch_for_cora_proc() re-syncs the list inside of the loop;
        # do it once here, so that the loop range can not become stale
        self.init_list_for_proc()

        for i in range(self.nparts):
            if self.isReadyToStartRunProc(i):
                self.submit_batch_for_cora_proc(i)
        cp.autoRunStatus = 2

    def isReadyToStartRunProc(self, ind):
        """Returns True if the split work file with index ind exists and is not empty.

        @param ind index in the list fnm.get_list_of_files_cora_split_work()
        """
        fname = fnm.get_list_of_files_cora_split_work()[ind]
        if not os.path.exists(fname):
            msg1 = 'JOB IS NOT SUBMITTED !!!\nThe file ' + str(fname) + ' does not exist'
            logger.warning(msg1, __name__)
            return False

        fsize = os.path.getsize(fname)
        if fsize < 1:
            msg2 = 'JOB IS NOT SUBMITTED !!!\nThe file ' + str(fname) + ' has wrong size(Byte): ' + str(fsize)
            logger.warning(msg2, __name__)
            return False

        msg3 = 'The file ' + str(fname) + ' exists and its size(Byte): ' + str(fsize)
        logger.info(msg3, __name__)
        return True

    def onRunMerge(self):
        """Submits the merge job and sets cp.autoRunStatus = 3 if the merge can be started."""
        logger.debug('onRunMerge', __name__)
        if self.isReadyToStartRunMerge():
            self.submit_batch_for_cora_merge()
            cp.autoRunStatus = 3

    def isReadyToStartRunMerge(self):
        """Returns True if the status of the proc files is true, False otherwise.

        The status string is logged as info in the first case and as a
        warning, with a prefix, in the second one.
        """
        fstatus, fstatus_str = self.status_for_cora_proc_files()
        if fstatus:
            logger.info(fstatus_str, __name__)
            return True

        msg = 'JOB IS NOT SUBMITTED !!!' + fstatus_str
        logger.warning(msg, __name__)
        return False
00402
00403
00404
00405
00406
00407
00408
# Module-level instance of BatchJobCorAna, created at import time.
bjcora = BatchJobCorAna()


if __name__ == "__main__":

    # Local import: the self-test does not rely on "sys" arriving through a star import.
    import sys

    # The class in this file defines submit_batch_for_cora_split/_proc/_merge,
    # but no submit_batch_for_cora; the self-test starts with the split stage.
    bjcora.submit_batch_for_cora_split()

    sys.exit('End of test for BatchJobCorAna')
00425
00426