Source code for PSCalib.DCUtils

#--------------------------------------------------------------------------
# File and Version Information:
#  $Id: DCUtils.py 12777 2016-10-19 21:37:38Z dubrovin@SLAC.STANFORD.EDU $
#
# Description:
#------------------------------------------------------------------------

"""
:py:class:`PSCalib.DCUtils` - contains a set of utilities

Usage::

    # Import
    import PSCalib.DCUtils as gu

    # Methods
    # Get string with time stamp, ex: 2016-01-26T10:40:53
    ts    = gu.str_tstamp(fmt='%Y-%m-%dT%H:%M:%S', time_sec=None)

    usr   = gu.get_enviroment(env='USER')
    usr   = gu.get_login()
    host  = gu.get_hostname()
    cwd   = gu.get_cwd()
    gu.create_directory(dir, mode=0775)
    gu.create_path(path, depth=2, mode=0775)
    gu.save_string_as_dset(grp, name, s)
    src   = gu.source_full_name(env, src)
    dtype = gu.dettype_from_str_source(src)

    src   = gu.string_from_source(source) # source is psana.Source object or string like
                                          # 'CxiDs2.0:Cspad.0' from 'DetInfo(CxiDs2.0:Cspad.0)'
    dname  = gu.detector_full_name(env, src)
    source = gu.psana_source(env, srcpar)
    t_sec  = gu.evt_time(evt)
    fid    = gu.evt_fiducials(evt)

    # methods for HDF5 
    sg = gu.get_subgroup(grp, subgr_name)
    gu.delete_object(grp, oname)
    gu.save_object_as_dset(grp, name, shape=None, dtype=None, data=0)


@see project modules
    * :py:class:`PSCalib.DCStore`
    * :py:class:`PSCalib.DCType`
    * :py:class:`PSCalib.DCRange`
    * :py:class:`PSCalib.DCVersion`
    * :py:class:`PSCalib.DCBase`
    * :py:class:`PSCalib.DCInterface`
    * :py:class:`PSCalib.DCUtils`
    * :py:class:`PSCalib.DCDetectorId`
    * :py:class:`PSCalib.DCConfigParameters`
    * :py:class:`PSCalib.DCFileName`
    * :py:class:`PSCalib.DCLogger`
    * :py:class:`PSCalib.DCMethods`
    * :py:class:`PSCalib.DCEmail`

This software was developed for the SIT project.
If you use all or part of it, please give an appropriate acknowledgment.

@version $Id: 2013-03-08$

@author Mikhail S. Dubrovin
"""

#--------------------------------
__version__ = "$Revision: 12777 $"
#--------------------------------

import sys
import os
import getpass
import socket
import numpy as np
from time import localtime, strftime, time
import psana
from PSCalib.DCLogger import log
import PSCalib.GlobalUtils as gu

#------------------------------

class h5py_proxy :
    def __init__(self) :
        self.h5py = None
        self.Dataset = None
        self.Group = None
        self.File = None

    def fetch_h5py(self) :
        if self.h5py is None :
            import h5py
            self.h5py = h5py

#------------------------------
#h5py = h5py_proxy()
import h5py
#------------------------------

class Storage :
    def __init__(self) :
        self.dataset_t = h5py.Dataset
        self.group_t   = h5py.Group
        self.File      = h5py.File

#------------------------------

sp = Storage()

#------------------------------

[docs]def str_tstamp(fmt='%Y-%m-%dT%H:%M:%S', time_sec=None) : """Returns string timestamp for specified format and time in sec or current time by default """ return strftime(fmt, localtime(time_sec)) #------------------------------
[docs]def get_enviroment(env='USER') : """Returns the value of specified by string name environment variable """ return os.environ[env] #------------------------------
[docs]def get_login() : """Returns login name """ #return os.getlogin() return getpass.getuser() #------------------------------
[docs]def get_hostname() : """Returns login name """ #return os.uname()[1] return socket.gethostname() #------------------------------
[docs]def get_cwd() : """Returns current working directory """ return os.getcwd() #------------------------------
[docs]def create_directory_v0(dir, verb=False) : if os.path.exists(dir) : if verb : print 'Directory exists: %s' % dir else : os.makedirs(dir) if verb : print 'Directory created: %s' % dir #------------------------------
[docs]def create_directory(dir, mode=0775) : #print 'create_directory: %s' % dir if os.path.exists(dir) : log.debug('Directory exists: %s' % dir, __name__) else : os.makedirs(dir, mode) #os.chmod(dir, mode) #os.system(cmd) log.info('Directory created: %s' % dir, __name__) #------------------------------
[docs]def create_path(path, depth=2, mode=0775) : # Creates missing path for /reg/d/psdm/<INS>/<EXP>/calib/<dtype> beginning from calib subdirs = path.rstrip('/').rsplit('/', depth) log.debug('subdirs: %s' % str(subdirs), __name__) cpath = subdirs[0] length = len(subdirs) for i,sd in enumerate(subdirs[1:]) : cpath += '/%s'% sd #if i<length-depth : continue create_directory(cpath, mode) #print 'create_path: %s' % cpath return os.path.exists(cpath) #------------------------------
[docs]def save_string_as_dset(grp, name, s) : """Creates and returns the h5py dataset object with name for single string s """ if s is None : return None #size = len(s) #create_dataset(name, shape=None, dtype=None, data=None, **kwds) dset = grp.create_dataset(name, shape=(1,), dtype='S%d'%len(s)) #, data=s) dset[0] = s return dset #------------------------------
[docs]def source_full_name(env, src) : """Returns full name like 'DetInfo(XppGon.0:Cspad2x2.0)' of the brief source or its alias using env.configStore().keys() """ str_src = str(src) for k in env.configStore().keys() : if str_src in str(k.src())\ or str_src == str(k.alias()) : return k.src() return None #------------------------------
[docs]def dettype_from_str_source(src) : """Returns the detector type from full psana source name (Ex.: Cspad2x2 from DetInfo(XppGon.0:Cspad2x2.0) """ str_src = str(src) str_split = str_src.rsplit(':',1) detname = str_split[1].split('.',1) if len(str_split)>1 else None return detname[0] if len(detname)>1 else None #------------------------------
[docs]def string_from_source(source) : """Returns string like 'CxiDs2.0:Cspad.0' from 'DetInfo(CxiDs2.0:Cspad.0)' or 'DsaCsPad' from 'Source('DsaCsPad')' form input string or psana.Source object """ str_src = str(source) if '"' in str_src : return str_src.split('"')[1] # case of psana.String object str_split = str_src.rsplit('(',1) return str_split[1].split(')',1)[0] if len(str_split)>1 else str_src #------------------------------
[docs]def detector_full_name(env, src) : """Returns full detector name like 'XppGon.0:Cspad2x2.0' for short src, alias src, or psana.Source. """ str_src = str(src) str_src = string_from_source(str_src) if str_src is None : return None str_src = source_full_name(env, str_src) if str_src is None : return None return string_from_source(str_src) ##------------------------------
[docs]def psana_source(env, srcpar) : """returns psana.Source(src) from other psana.Source brief src or alias. Parameters srcpar : str - regular source or its alias, ex.: 'XppEndstation.0:Rayonix.0' or 'rayonix' set_sub : bool - default=True - propagates source parameter to low level package """ #print 'type of srcpar: ', type(srcpar) src = srcpar if isinstance(srcpar, psana.Source) else psana.Source(srcpar) str_src = string_from_source(src) amap = env.aliasMap() psasrc = amap.src(str_src) source = src if amap.alias(psasrc) == '' else amap.src(str_src) if not isinstance(source, psana.Source) : source = psana.Source(source) return source #------------------------------
[docs]def get_subgroup(grp, subgr_name) : """For hdf5: returns subgroup of the group if it exists or creates and returns new subgroup """ #print 'YYY grp.name:', grp.name, ' subgr_name:', subgr_name if subgr_name in grp : return grp[subgr_name] return grp.create_group(subgr_name) #------------------------------
[docs]def delete_object(grp, oname) : """For hdf5: removes object from group. """ #print 'TTT grp.name: %s delete object with name: %s' % (grp.name, oname) #t0_sec = time() if oname in grp : del grp[oname] #print 'TTT %s: time (sec) = %.6f' % (sys._getframe().f_code.co_name, time()-t0_sec) #------------------------------
[docs]def save_object_as_dset(grp, name, shape=None, dtype=None, data=0) : """Saves object as h5py dataset Currently supports scalar int, double, string and numpy.array """ #print 'XXX: save_object_as_dset ' #print 'XXX grp.keys():', grp.keys() #print 'XXX %s in grp.keys(): ' % name, name in grp.keys() if name in grp.keys() : return if isinstance(data, np.ndarray) : return grp.create_dataset(name, data=data) sh = (1,) if shape is None else shape if dtype is not None : return grp.create_dataset(name, shape=sh, dtype=dtype, data=data) if isinstance(data, str) : return save_string_as_dset(grp, name, data) if isinstance(data, int) : return grp.create_dataset(name, shape=sh, dtype='int', data=data) if isinstance(data, float) : return grp.create_dataset(name, shape=sh, dtype='double', data=data) log.warning("Can't save parameter: %s of %s in the h5py group: %s" % (name, str(dtype), grp.name), 'DCUtils.save_object_as_dset') #------------------------------
[docs]def evt_time(evt) : """Returns event (double) time. """ evid = evt.get(psana.EventId) ttuple = evid.time() #print 'XXX time:', ttuple return float(ttuple[0]) + float(ttuple[1])*1e-9 #------------------------------
[docs]def evt_fiducials(evt) : """Returns event fiducials. """ evid = evt.get(psana.EventId) return evid.fiducials() #------------------------------ #------------------------------ #------------------------------ #------------------------------ #------------------------------
[docs]def test_source_full_name() : ds = psana.DataSource('/reg/g/psdm/detector/data_test/types/0007-NoDetector.0-Epix100a.0.xtc') env=ds.env() print 20*'_', '\n%s:' % sys._getframe().f_code.co_name print 'src="Epix" :', source_full_name(env, 'Epix') print 'src="Cspad." :', source_full_name(env, 'Cspad.') print 'src="Cspad" :', source_full_name(env, 'Cspad') print 'src="cs140_0" :', source_full_name(env, 'cs140_0') #------------------------------
[docs]def test_string_from_source() : ds = psana.DataSource('/reg/g/psdm/detector/data_test/types/0007-NoDetector.0-Epix100a.0.xtc') env=ds.env() source = psana_source(env, 'cs140_0') print 20*'_', '\n%s:' % sys._getframe().f_code.co_name print 'source', source print 'string_from_source', string_from_source(source) #------------------------------
[docs]def test_psana_source() : ds = psana.DataSource('/reg/g/psdm/detector/data_test/types/0007-NoDetector.0-Epix100a.0.xtc') env=ds.env() print 20*'_', '\n%s:' % sys._getframe().f_code.co_name print 'psana_source(env, "Epix") :', psana_source(env, 'Epix') print 'psana_source(env, "Cspad.") :', psana_source(env, 'Cspad.') print 'psana_source(env, "Cspad") :', psana_source(env, 'Cspad') print 'psana_source(env, "cs140_0") :', psana_source(env, 'cs140_0') #------------------------------
[docs]def test_detector_full_name() : ds = psana.DataSource('/reg/g/psdm/detector/data_test/types/0007-NoDetector.0-Epix100a.0.xtc') env=ds.env() print 20*'_', '\n%s:' % sys._getframe().f_code.co_name print 'src="Epix" :', detector_full_name(env, "Epix") print 'src=psana.Source("Epix")) :', detector_full_name(env, psana.Source('Epix')) print 'src="DetInfo(NoDetector.0:Epix100a.0)":', detector_full_name(env, 'DetInfo(NoDetector.0:Epix100a.0)') print 'for alias src="cs140_0" :', detector_full_name(env, 'cs140_0') #------------------------------
[docs]def test_evt_time() : ds = psana.DataSource('/reg/g/psdm/detector/data_test/types/0007-NoDetector.0-Epix100a.0.xtc') evt=ds.events().next() print 20*'_', '\n%s:' % sys._getframe().f_code.co_name t=evt_time(evt) print 'evt_time(evt) : %.9f' % t #------------------------------
[docs]def test_misc() : print 20*'_', '\n%s:' % sys._getframe().f_code.co_name print 'get_enviroment(USER) : %s' % get_enviroment() print 'get_login() : %s' % get_login() print 'get_hostname() : %s' % get_hostname() print 'get_cwd() : %s' % get_cwd() #------------------------------
[docs]def do_test() : import sys; global sys tname = sys.argv[1] if len(sys.argv) > 1 else '0' print 50*'_', '\nTest %s:' % tname if tname == '0' : test_misc(); test_source_full_name(); test_string_from_source();\ test_psana_source(); test_detector_full_name() elif tname == '1' : test_source_full_name() elif tname == '2' : test_string_from_source() elif tname == '3' : test_psana_source() elif tname == '4' : test_detector_full_name() elif tname == '5' : test_evt_time() else : print 'Not-recognized test name: %s' % tname sys.exit('End of test %s' % tname) #------------------------------
if __name__ == "__main__" : do_test() #------------------------------