CorAna/src/BatchJobCorAna.py

Go to the documentation of this file.
00001 #--------------------------------------------------------------------------
00002 # File and Version Information:
00003 #  $Id: BatchJobCorAna.py 9892 2015-04-10 00:57:40Z dubrovin@SLAC.STANFORD.EDU $
00004 #
00005 # Description:
00006 #  Module BatchJobCorAna...
00007 #
00008 #------------------------------------------------------------------------
00009 
00010 """Deals with batch jobs for correlation analysis
00011 
00012 This software was developed for the LCLS project.  If you use all or 
00013 part of it, please give an appropriate acknowledgment.
00014 
00015 @version $Id: BatchJobCorAna.py 9892 2015-04-10 00:57:40Z dubrovin@SLAC.STANFORD.EDU $
00016 
00017 @author Mikhail S. Dubrovin
00018 """
00019 
00020 #------------------------------
00021 #  Module's version from CVS --
00022 #------------------------------
00023 __version__ = "$Revision: 9892 $"
00024 # $Source$
00025 
00026 #--------------------------------
00027 #  Imports of standard modules --
00028 #--------------------------------
00029 
00030 from BatchJob import *
00031 from PyQt4 import QtGui, QtCore # need it in order to use QtCore.QObject for connect
00032 
00033 #-----------------------------
00034 
00035 class BatchJobCorAna( BatchJob, QtCore.QObject ) : # need in QtCore.QObject in order to connect to signals
00036     """Deals with batch jobs for correlation analysis.
00037     """
00038 
00039     def __init__ (self) :
00040         """Constructor.
00041         @param fname the file name for ...
00042         """
00043 
00044         BatchJob.__init__(self)
00045         QtCore.QObject.__init__(self, None)
00046 
00047         self.job_id_cora_split = None
00048         self.time_sub_split    = None
00049 
00050         self.job_id_cora_merge = None
00051         self.time_sub_merge    = None
00052 
00053         self.nparts            = None
00054         self.init_list_for_proc()
00055         
00056 #-----------------------------
00057 
00058     def init_list_for_proc(self) :
00059         """Creates the empty list for proc. containing ing, jobid and time for all processes"""
00060         if cp.bat_img_nparts.value() == self.nparts : return
00061         self.nparts = cp.bat_img_nparts.value()
00062         #print 'self.nparts:', self.nparts
00063 
00064         self.list_for_proc = []
00065         for i in range(self.nparts) :
00066             self.list_for_proc.append([i, None, None])
00067 
00068         #print 'self.list_for_proc =', self.list_for_proc
00069 
00070 #-----------------------------
00071 
00072     def     make_psana_cfg_file_for_cora_split(self) :
00073         cfg.make_psana_cfg_file_for_cora_split()
00074 
00075 #-----------------------------
00076 
00077     def submit_batch_for_cora_split(self) :
00078 
00079         if not self.job_can_be_submitted(self.job_id_cora_split, self.time_sub_split, 'cor. ana. split') : return
00080         self.time_sub_split = gu.get_time_sec()
00081 
00082         self.make_psana_cfg_file_for_cora_split()
00083 
00084         command  = 'psana -c ' + fnm.path_cora_split_psana_cfg() + ' ' + fnm.path_data_xtc_cond()
00085         queue    = cp.bat_queue.value()
00086         log_file = fnm.path_cora_split_batch_log()
00087 
00088         #print "!!!!!!!!! WARNING gu.batch_job_submit(...) IS COMMENTED in BatchJobCorAna.py !!!!!!!!! "
00089         self.job_id_cora_split, out, err = gu.batch_job_submit(command, queue, log_file)
00090 
00091 #-----------------------------
00092 
00093     def submit_batch_for_cora_proc(self, ind) :
00094 
00095         self.init_list_for_proc()
00096 
00097         i, job_id, time_sub = self.list_for_proc[ind] 
00098 
00099         if not self.job_can_be_submitted(job_id, time_sub, 'cor. ana. proc') : return
00100         time_sub = gu.get_time_sec()
00101 
00102         fname    = fnm.get_list_of_files_cora_split_work()[ind]
00103         #tname    = fnm.path_cora_proc_tau_in()
00104         tname    = fnm.path_tau_list()
00105         log_file = fnm.get_list_of_files_cora_proc_work_log()[ind]
00106 
00107         command  = 'corana -f ' + fname # + ' -l ' + log_file
00108         if cp.ana_tau_list_type.value() == 'file' and os.path.exists(tname) : command +=   ' -t ' + tname
00109         queue    = cp.bat_queue.value()
00110 
00111         #print 'command  =', command
00112         #print 'log_file =', log_file, '\n'
00113   
00114         job_id, out, err = gu.batch_job_submit(command, queue, log_file)
00115         self.list_for_proc[ind] = [i, job_id, time_sub]
00116 
00117 #-----------------------------
00118 
00119     def submit_batch_for_cora_merge(self) :
00120 
00121         if not self.job_can_be_submitted(self.job_id_cora_merge, self.time_sub_merge, 'cor. ana. merge') : return
00122         self.time_sub_merge = gu.get_time_sec()
00123 
00124         fname    = fnm.get_list_of_files_cora_proc_work()[0]
00125         tname    = fnm.path_cora_merge_tau()
00126         log_file = fnm.path_cora_merge_batch_log()
00127 
00128         command  = 'corana_merge -f ' + fname + ' -t ' + tname
00129         queue    = cp.bat_queue.value()
00130 
00131         #print 'command =', command
00132         self.job_id_cora_merge, out, err = gu.batch_job_submit(command, queue, log_file)
00133 
00134 #-----------------------------
00135 
00136     def check_batch_job_for_cora_split(self) :
00137         self.check_batch_job(self.job_id_cora_split, 'split')
00138 
00139     def kill_batch_job_for_cora_split(self) :
00140         self.kill_batch_job(self.job_id_cora_split, 'for split')
00141 
00142     def status_for_cora_split_files(self, comment='of split: ') :
00143         return self.status_and_string_for_files(fnm.get_list_of_files_cora_split_all(), comment )
00144 
00145     def status_batch_job_for_cora_split(self) :
00146         return self.get_batch_job_status_and_string(self.job_id_cora_split, self.time_sub_split)
00147 
00148 #-----------------------------
00149 
00150     def status_for_cora_proc_files(self, comment='of proc: ') :
00151         return self.status_and_string_for_files(fnm.get_list_of_files_cora_proc_check(), comment )
00152 
00153     def status_batch_job_for_cora_proc(self, ind) :
00154         i, job_id, time_sub =  self.list_for_proc[ind]
00155         return self.get_batch_job_status(job_id, '')
00156 
00157     def kill_batch_job_for_cora_proc(self, ind) :
00158         i, job_id, time_sub =  self.list_for_proc[ind]
00159         return self.kill_batch_job(job_id, 'for proc')
00160 
00161     def status_batch_job_for_cora_proc_all(self) :
00162         ind = 0
00163         i, job_id, time_sub =  self.list_for_proc[ind]
00164         return self.get_batch_job_status_and_string(job_id, time_sub)
00165 
00166 #-----------------------------
00167 
00168     def check_batch_job_for_cora_merge(self) :
00169         self.check_batch_job(self.job_id_cora_merge, 'merge')
00170 
00171     def kill_batch_job_for_cora_merge(self) :
00172         self.kill_batch_job(self.job_id_cora_merge, 'for merge')
00173 
00174     def status_for_cora_merge_files(self, comment='of merge: ' ) :
00175         fstatus, fstatus_str = self.status_and_string_for_files(fnm.get_list_of_files_cora_merge(), comment )
00176         if fstatus : cp.res_fname.setValue(fnm.path_cora_merge_result())
00177         return fstatus, fstatus_str
00178 
00179     def status_batch_job_for_cora_merge(self) :
00180         return self.get_batch_job_status_and_string(self.job_id_cora_merge, self.time_sub_merge)
00181 
00182 #-----------------------------
00183 
00184 #    def print_work_files_for_data_aver(self) :
00185 #        self.print_files_for_list(fnm.get_list_of_files_cora_split(),'of correlation analysis:')
00186 
00187 #-----------------------------
00188 
00189     def check_work_files_cora(self) :
00190         self.check_files_for_list(fnm.get_list_of_files_cora_split(),'of correlation analysis:')
00191 
00192 #-----------------------------
00193 
00194     def remove_files_cora_split(self) :
00195         self.remove_files_for_list(fnm.get_list_of_files_cora_split_all(),'of split:')
00196 
00197 #-----------------------------
00198 
00199     def remove_files_cora_proc(self, ind=None) :
00200 
00201         if ind is None :
00202             self.list_of_files_to_remove = fnm.get_list_of_files_cora_proc_work() + \
00203                                            fnm.get_list_of_files_cora_proc_work_log()
00204             self.list_of_files_to_remove.append(fnm.path_cora_proc_tau_out()) 
00205 
00206         else :
00207             self.list_of_files_to_remove = [fnm.get_list_of_files_cora_proc_work()[ind], \
00208                                            fnm.get_list_of_files_cora_proc_work_log()[ind]]
00209 
00210         #print 'self.list_of_files_to_remove =\n', self.list_of_files_to_remove
00211         self.remove_files_for_list(self.list_of_files_to_remove,'of proc:')
00212 
00213 #-----------------------------
00214 
00215     def remove_files_cora_merge(self) :
00216         self.remove_files_for_list(fnm.get_list_of_files_cora_merge_main(),'of merge:')
00217 
00218 #-----------------------------
00219 
00220     def get_batch_job_id_cora_split(self) :
00221         return self.job_id_cora_split
00222 
00223 #-----------------------------
00224 
00225     def get_batch_job_cora_split_time_string(self) :
00226         return gu.get_local_time_str(self.time_sub_split, fmt='%Y-%m-%d %H:%M:%S')
00227 
00228 #-----------------------------
00229 
00230     def get_batch_job_id_cora_proc(self, ind) :
00231         return self.list_for_proc[ind][1]
00232 
00233 #-----------------------------
00234 
00235     def get_batch_job_cora_proc_time_string(self, ind) :
00236         #print 'ind:', ind
00237         time_sub_sec = self.list_for_proc[ind][2]
00238         if time_sub_sec is None : return 'Time N/A'
00239         return gu.get_local_time_str(time_sub_sec, fmt='%Y-%m-%d %H:%M:%S')
00240 
00241 #-----------------------------
00242 
00243     def get_batch_job_id_cora_merge(self) :
00244         return self.job_id_cora_merge
00245 
00246 #-----------------------------
00247 
00248     def get_batch_job_cora_merge_time_string(self) :
00249         return gu.get_local_time_str(self.time_sub_merge, fmt='%Y-%m-%d %H:%M:%S')
00250 
00251 #-----------------------------
00252 
00253     def remove_files_cora_all(self):
00254         logger.debug('remove_files_cora_all', __name__)
00255         self.remove_files_cora_split()
00256         self.remove_files_cora_merge()
00257         for i in range(self.nparts) :
00258             self.remove_files_cora_proc(i)
00259 
00260 #-----------------------------
00261 #-----------------------------
00262 #----- AUTO-PROCESSING -------
00263 #-----------------------------
00264 #-----------------------------
00265 
00266     def connectToThread1(self):
00267         try : self.connect( cp.thread1, QtCore.SIGNAL('update(QString)'), self.updateStatus )
00268         except : logger.warning('connectToThread1 IS FAILED !!!', __name__)
00269 
00270 
00271     def disconnectFromThread1(self):
00272         try : self.disconnect( cp.thread1, QtCore.SIGNAL('update(QString)'), self.updateStatus )
00273         except : logger.warning('disconnectFromThread1 IS FAILED !!!', __name__)
00274 
00275 
00276     def updateStatus(self, text):
00277         #print 'BatchJobCorAna: Signal is recieved ' + str(text)
00278         self.auto_processing_status()
00279 
00280 #-----------------------------
00281 
00282     def stop_auto_processing(self) :
00283         cp.autoRunStatus = 0            
00284         self.kill_all_batch_jobs()
00285         logger.info('Auto-processing IS STOPPED', __name__)
00286         self.disconnectFromThread1()
00287 
00288 
00289     def kill_all_batch_jobs(self):
00290         logger.debug('kill_all_batch_jobs', __name__)
00291         self.kill_batch_job_for_cora_split()
00292         self.kill_batch_job_for_cora_merge()
00293         for i in range(self.nparts) :
00294             self.kill_batch_job_for_cora_proc(i)
00295 
00296 #-----------------------------
00297 
00298     def start_auto_processing(self) :
00299         if cp.autoRunStatus != 0 :            
00300             logger.warning('Auto-processing procedure is already active in stage '+str(cp.autoRunStatus), __name__)
00301         else :
00302             self.connectToThread1()
00303             self.remove_files_cora_all()
00304             self.onRunSplit()
00305 
00306 #-----------------------------
00307 
00308     def auto_processing_status(self):
00309         if cp.autoRunStatus : self.updateRunState()
00310 
00311 #-----------------------------
00312 
00313     def updateRunState(self):
00314         logger.info('Auto run stage '+str(cp.autoRunStatus), __name__)
00315 
00316         self.status_split, fstatus_str_split = bjcora.status_for_cora_split_files(comment='')
00317         self.status_proc,  fstatus_str_proc  = bjcora.status_for_cora_proc_files (comment='')
00318         self.status_merge, fstatus_str_merge = bjcora.status_for_cora_merge_files(comment='')
00319 
00320         if   cp.autoRunStatus == 1 and self.status_split :            
00321             logger.info('updateRunState: Split is completed, begin processing', __name__)
00322             self.onRunProc()
00323 
00324         elif cp.autoRunStatus == 2 and self.status_proc : 
00325             logger.info('updateRunState: Processing is completed, begin merging', __name__)
00326             self.onRunMerge()
00327 
00328         elif cp.autoRunStatus == 3 and self.status_merge : 
00329             logger.info('updateRunState: Merging is completed, stop auto-run', __name__)
00330             cp.autoRunStatus = 0            
00331             self.disconnectFromThread1()
00332         
00333 #-----------------------------
00334 
00335     def onRunSplit(self):
00336         logger.debug('onRunSplit', __name__)
00337         if self.isReadyToStartRunSplit() :
00338             self.submit_batch_for_cora_split()
00339             cp.autoRunStatus = 1
00340 
00341 
00342     def isReadyToStartRunSplit(self):
00343         msg1 = 'JOB IS NOT SUBMITTED !!!\nFirst, set the number of events for data.'
00344         if  (cp.bat_data_end.value() == cp.bat_data_end.value_def()) :
00345             logger.warning(msg1, __name__)
00346             return False
00347 
00348         elif(cp.bat_data_start.value() >= cp.bat_data_end.value()) :
00349             logger.warning(msg1, __name__)
00350             return False
00351 
00352         else :
00353             return True
00354 
00355 #-----------------------------
00356 
00357     def onRunProc(self):
00358         logger.debug('onRunProc', __name__)
00359 
00360         for i in range(self.nparts) :
00361             if self.isReadyToStartRunProc(i) :
00362                 self.submit_batch_for_cora_proc(i)
00363                 cp.autoRunStatus = 2
00364 
00365 
00366     def isReadyToStartRunProc(self, ind):
00367 
00368         fname = fnm.get_list_of_files_cora_split_work()[ind]
00369         if not os.path.exists(fname) :
00370             msg1 = 'JOB IS NOT SUBMITTED !!!\nThe file ' + str(fname) + ' does not exist'
00371             logger.warning(msg1, __name__)
00372             return False
00373 
00374         fsize = os.path.getsize(fname)
00375         if fsize < 1 :
00376             msg2 = 'JOB IS NOT SUBMITTED !!!\nThe file ' + str(fname) + ' has wrong size(Byte): ' + str(fsize) 
00377             logger.warning(msg2, __name__)
00378             return False
00379 
00380         msg3 = 'The file ' + str(fname) + ' exists and its size(Byte): ' + str(fsize) 
00381         logger.info(msg3, __name__)
00382         return True
00383 
00384 #-----------------------------
00385 
00386     def onRunMerge(self):
00387         logger.debug('onRunMerge', __name__)
00388         if self.isReadyToStartRunMerge() :
00389             self.submit_batch_for_cora_merge()
00390             cp.autoRunStatus = 3
00391 
00392 
00393     def isReadyToStartRunMerge(self):
00394         fstatus, fstatus_str = bjcora.status_for_cora_proc_files()
00395         if fstatus : 
00396             logger.info(fstatus_str, __name__)
00397             return True
00398         else :
00399             msg = 'JOB IS NOT SUBMITTED !!!' + fstatus_str
00400             logger.warning(msg, __name__)
00401             return False
00402 
00403 #-----------------------------
00404 #-----------------------------
00405 #-----------------------------
00406 #-----------------------------
00407 #-----------------------------
00408 
00409 bjcora = BatchJobCorAna ()
00410 
00411 #-----------------------------
00412 #
00413 #  In case someone decides to run this module
00414 #
00415 if __name__ == "__main__" :
00416 
00417     bjcora.submit_batch_for_cora()
00418     #gu.sleep_sec(5)
00419 
00420     #bjcora.submit_batch_for_data_scan_on_data_xtc()
00421     #bjcora.print_work_files_for_data_aver()
00422     #bjcora.check_work_files_for_data_aver()
00423 
00424     sys.exit ( 'End of test for BatchJobCorAna' )
00425 
00426 #-----------------------------

Generated on 19 Dec 2016 for PSDMSoftware by  doxygen 1.4.7