Source code for pyana.merger
#--------------------------------------------------------------------------
# File and Version Information:
# $Id: merger.py 1091 2010-07-07 22:45:39Z salnikov $
#
# Description:
# Module merger...
#
#------------------------------------------------------------------------
"""Code that merges results of processing from individual jobs
This software was developed for the LUSI project. If you use all or
part of it, please give an appropriate acknowledgment.
@see RelatedModule
@version $Id: merger.py 1091 2010-07-07 22:45:39Z salnikov $
@author Andrei Salnikov
"""
#------------------------------
# Module's version from CVS --
#------------------------------
__version__ = "$Revision: 1091 $"
# $Source$
#--------------------------------
# Imports of standard modules --
#--------------------------------
import sys
import os
import logging
import shutil
import numpy as np
#---------------------------------
# Imports of base class module --
#---------------------------------
#-----------------------------
# Imports for other modules --
#-----------------------------
#----------------------------------
# Local non-exported definitions --
#----------------------------------
_log = logging.getLogger('pyana.merger')
#------------------------
# Exported definitions --
#------------------------
#---------------------
# Class definition --
#---------------------
[docs]class merger ( object ) :
#--------------------
# Class variables --
#--------------------
#----------------
# Constructor --
#----------------
def __init__ ( self ) :
self._files = {}
self._histos = {}
#-------------------
# Public methods --
#-------------------
[docs] def merge( self, result ) :
_log.debug('merging data: %s', result)
if not result: return
files = result.get("files",{})
self.__mergeFiles( files )
histos = result.get("histos",[])
self.__mergeHistos( histos )
[docs] def finish(self, env):
# close all files
for file in self._files.itervalues() : file.close()
# store histograms
if self._histos :
env.hmgr().file().cd()
for h in self._histos.itervalues():
h.Write()
#--------------------------------
# Static/class public methods --
#--------------------------------
#--------------------
# Private methods --
#--------------------
def __mergeFiles(self, files):
for name, tmpname in files.iteritems() :
try :
_log.debug('merging file %s -> %s', tmpname, name)
# open output file if needed
file = self._files.get(name)
if not file :
file = open(name, 'wb')
self._files[name] = file
# open input file
tmpfile = open(tmpname)
# copy data in chunks of 1MB
shutil.copyfileobj(tmpfile, file, 1024*1024)
# delete the temporary file, but do not stop if it fails
try :
os.remove(tmpname)
except StandardError, exc:
_log.error('failed to remove file after merge: %s (%s)', tmpname, exc)
except Exception, exc:
_log.error('file merge failed for file %s (from %s)', name, tmpname)
def __mergeHistos(self, histos):
for h in histos :
name = h.GetName()
_log.debug('merging histogram %s', name)
histo = self._histos.get(name)
if not histo:
self._histos[name] = h
else :
histo.Add(h)
#
# In case someone decides to run this module
#
if __name__ == "__main__" :
# In principle we can try to run test suite for this module,
# have to think about it later. Right now just abort.
sys.exit ( "Module is not supposed to be run as main module" )