#-----------------------------------------------------------------------------
# File and Version Information:
# $Id: DCRange.py 12730 2016-10-11 21:11:56Z dubrovin@SLAC.STANFORD.EDU $
#-----------------------------------------------------------------------------
"""
:py:class:`PSCalib.DCRange` - class for the Detector Calibration (DC) project.
Usage::
# Import
from PSCalib.DCRange import DCRange
# Initialization
o = DCRange(begin, end=None, cmt=None)
# Methods
str_range = o.range() # (str) of the time stamp validity range
t_sec = o.begin() # (double) time stamp beginning validity range
t_sec = o.end() # (double) time stamp ending validity range or (str) 'end'
dico = o.versions() # (list of uint) versions of calibrations
v = o.vnum_def() # returns default version number
v = o.vnum_last() # returns last version number
vo = o.version(vnum=None) # returns version object for specified version
ts_in_range = o.tsec_in_range(tsec) # (bool) True/False if tsec is/not in the validity range
evt_in_range= o.evt_in_range(evt) # (bool) True/False if evt is/not in the validity range
o.set_begin(tsbegin) # set (int) time stamp beginning validity range
o.set_end(tsend) # set (int) time stamp ending validity range
o.add_version(vnum=None, tsec_prod=None, nda=None, cmt=None) # add object for new version of calibration data
o.set_vnum_def(vnum=None) # set default version number, if available. vnum=None - use last available.
vd = o.mark_version(vnum=None) # mark version for deletion, returns version number or None if nothing was deleted
o.mark_versions() # mark all registered versions for deletion
o.save(group) # saves object content under h5py.group in the hdf5 file.
o.load(group) # loads object content from the hdf5 file.
o.print_obj() # print info about this object and its children
@see project modules
* :py:class:`PSCalib.DCStore`
* :py:class:`PSCalib.DCType`
* :py:class:`PSCalib.DCRange`
* :py:class:`PSCalib.DCVersion`
* :py:class:`PSCalib.DCBase`
* :py:class:`PSCalib.DCInterface`
* :py:class:`PSCalib.DCUtils`
* :py:class:`PSCalib.DCDetectorId`
* :py:class:`PSCalib.DCConfigParameters`
* :py:class:`PSCalib.DCFileName`
* :py:class:`PSCalib.DCLogger`
* :py:class:`PSCalib.DCMethods`
* :py:class:`PSCalib.DCEmail`
This software was developed for the SIT project.
If you use all or part of it, please give an appropriate acknowledgment.
@version $Id: DCRange.py 12730 2016-10-11 21:11:56Z dubrovin@SLAC.STANFORD.EDU $
@author Mikhail S. Dubrovin
"""
#---------------------------------
__version__ = "$Revision: 12730 $"
#---------------------------------
import os
import sys
from math import floor, ceil
#import math
#import numpy as np
#from time import time
#from PSCalib.DCConfigParameters import cp
from PSCalib.DCInterface import DCRangeI
from PSCalib.DCLogger import log
from PSCalib.DCVersion import DCVersion, version_int_to_str
from PSCalib.DCUtils import sp, evt_time, get_subgroup, save_object_as_dset, delete_object
#------------------------------
[docs]def key(begin, end=None) :
""" Return range as a string,
ex.: 1471285222-1471285555 or 1471285222-end from double time like 1471285222.123456
"""
str_begin = ('%d' % floor(begin)) if begin is not None else 0
str_end = 'end' if (end is None or end=='end') else ('%d' % ceil(end))
return '%s-%s' % (str_begin, str_end)
#------------------------------
[docs]class DCRange(DCRangeI) :
"""Class for the Detector Calibration (DC) project
Parameters
begin : double - time in sec
end : double - time in sec or None meaning infinity
cmt : str - comment
"""
def __init__(self, begin, end=None, cmt=None) : # double, double/None
DCRangeI.__init__(self, begin, end, cmt)
self._name = self.__class__.__name__
self.set_begin(begin)
self.set_end(end)
self._dicvers = {}
self._vnum_def = 0 # 0 = use last
self._str_range = key(begin, end)
log.debug('In c-tor for range: %s' % self._str_range, self._name)
[docs] def range(self) : return self._str_range
[docs] def begin(self) : return self._begin
[docs] def end(self) : return self._end
[docs] def versions(self) : return self._dicvers
[docs] def vnum_def(self) :
#return self.vnum_last()
if self._vnum_def == 0 or self._vnum_def is None :
return self.vnum_last()
return self._vnum_def
[docs] def vnum_last(self) :
keys = self._dicvers.keys()
return keys[-1] if len(keys) else 0
[docs] def version(self, vnum=None) :
v = vnum if vnum is not None else self.vnum_def()
return self._dicvers.get(v, None) if v is not None else None
[docs] def set_begin(self, begin) : self._begin = begin
[docs] def set_end(self, end=None) : self._end = 'end' if end is None else end
[docs] def set_str_range(self, str_range) : self._str_range = str_range
[docs] def add_version(self, vnum=None, tsec_prod=None, nda=None, cmt=False) :
vn = self.vnum_last() + 1 if vnum is None else vnum
if vn in self._dicvers.keys() :
return self._dicvers[vn]
o = self._dicvers[vn] = DCVersion(vn, tsec_prod, nda)
rec = self.make_record('add version', str(vn), cmt)
if cmt is not False : self.add_history_record(rec)
log.info(rec, self.__class__.__name__)
return o
[docs] def set_vnum_def(self, vnum=None) :
if vnum is None or vnum == 0 :
self._vnum_def = 0 # will use last
elif vnum in self._dicvers.keys() :
self._vnum_def = vnum
self.add_history_record('WARNING: set_vnum_defdef sets default version %d' % vnum)
else :
msg = 'Attemt to set non-existent version %d as default' % vnum
log.warning(msg, self._name)
[docs] def mark_version(self, vnum=None, cmt=False) :
"""Marks child object for deletion in save()"""
vers = self.vnum_last() if vnum is None else vnum
if vers in self._dicvers.keys() :
self._lst_del_keys.append(vers)
rec = self.make_record('del version', str(vers), cmt)
if cmt is not False : self.add_history_record(rec)
log.info(rec, self.__class__.__name__)
return vers
else :
msg = 'Marking of non-existent version %s' % str(vers)
log.warning(msg, self._name)
return None
[docs] def mark_versions(self) :
"""Marks all child objects for deletion in save()"""
for vers in self._dicvers.keys() :
self.mark_version(vers)
#self._lst_del_keys.append(vers)
def __del__(self) :
for vers in self._dicvers.keys() :
del self._dicvers[vers]
[docs] def clear_versions(self) :
self._dicvers.clear()
[docs] def tsec_in_range(self, tsec) :
if tsec < self.begin() : return False
if self.end() == 'end' : return True
if tsec > self.end() : return False
return True
[docs] def evt_in_range(self, evt) :
return self.tsec_in_range(evt_time(evt))
def __cmp__(self, other) :
"""for comparison in sorted()
"""
if self.begin() < other.begin() : return -1
if self.begin() > other.begin() : return 1
if self.begin() == other.begin() :
if self.end() == other.end() : return 0
if self.end() == 'end' : return -1 # inverse comparison for end
if other.end() == 'end' : return 1
if self.end() > other.end() : return -1
if self.end() < other.end() : return 1
[docs] def save(self, group) :
grp = get_subgroup(group, self.range())
ds1 = save_object_as_dset(grp, 'begin', data=self.begin()) # dtype='double'
ds2 = save_object_as_dset(grp, 'end', data=self.end()) # dtype='double'
ds3 = save_object_as_dset(grp, 'range', data=self.range()) # dtype='str'
ds4 = save_object_as_dset(grp, 'versdef', data=self._vnum_def) # dtype='int'
msg = '=== save(), group %s object for %s' % (grp.name, self.range())
log.debug(msg, self._name)
#print 'ZZZ: self.versions()', self.versions()
# save/delete objects in/from hdf5 file
for k,v in self._dicvers.iteritems() :
if k in self._lst_del_keys : delete_object(grp, version_int_to_str(k))
else : v.save(grp)
# deletes items from dictionary
for k in self._lst_del_keys :
del self._dicvers[k]
self._lst_del_keys = []
self.save_base(grp)
[docs] def load(self, grp) :
msg = '=== load data from group %s and fill object %s' % (grp.name, self._name)
log.debug(msg, self._name)
for k,v in dict(grp).iteritems() :
#subgrp = v
#print ' ', k , v
if isinstance(v, sp.dataset_t) :
log.debug('load dataset "%s"' % k, self._name)
if k == 'begin' : self.set_begin(v[0])
elif k == 'end' : self.set_end(v[0])
elif k == 'range' : self.set_str_range(v[0])
elif k == 'versdef' : self.set_vnum_def(v[0]) # self._vnum_def = v[0]
else : log.warning('group "%s" has unrecognized dataset "%s"' % (grp.name, k), self._name)
elif isinstance(v, sp.group_t) :
if self.is_base_group(k,v) : continue
log.debug('load group "%s"' % k, self._name)
o = self.add_version(v['version'][0], cmt=False)
o.load(v)
[docs] def print_obj(self) :
offset = 3 * self._offspace
self.print_base(offset)
print '%s begin %s' % (offset, self.begin()),
print ': %s' % self.tsec_to_tstr(self.begin())
print '%s end %s' % (offset, self.end()),
print ' %s' % ('' if (self.end() in (None,'end')) else self.tsec_to_tstr(self.end()))
print '%s range %s' % (offset, self.range())
print '%s versdef %s' % (offset, self.vnum_def())
print '%s N vers %s' % (offset, len(self.versions()))
print '%s versions %s' % (offset, str(self.versions().keys()))
for k,v in self.versions().iteritems() :
v.print_obj()
#------------------------------
[docs]def test_DCRange() :
o = DCRange(None, None)
r = o.begin()
r = o.end()
r = o.versions()
r = o.vnum_def()
r = o.version(None)
o.set_begin(None)
o.set_end(None)
o.add_version()
o.set_vnum_def(None)
v = o.del_version(None)
o.del_versions()
o.clear_versions()
r = o.get(None, None, None)
#o.save(None)
o.load(None)
#------------------------------
[docs]def test() :
log.setPrintBits(0377)
if len(sys.argv)==1 : print 'For test(s) use command: python %s <test-number=1-4>' % sys.argv[0]
elif(sys.argv[1]=='1') : test_DCRange()
else : print 'Non-expected arguments: sys.argv = %s use 1,2,...' % sys.argv
#------------------------------
if __name__ == "__main__" :
test()
sys.exit( 'End of %s test.' % sys.argv[0])
#------------------------------