Source code for psana.smalldata

"""

existing issues to remember:

> How do people access _dlist_master for applications like psmon?

> user forgets to call save, they get a small but empty HDF5
-- atexit
-- use a destructor: __del__ (prone to circular ref issues)
-- use "with" statement + __exit__ (may affect interface)

> metadata
-- docstrings for default values
-- user interface for adding attributes/info/units (e.g. smld.attribute(a='a is great'))
-- xarray compatibility?

> chunking for performance?
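-- (note: pytables' create_earray already accepts `chunkshape` and
   `expectedrows` arguments that could be exposed here)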


>>> from Silke
- storing extra "attributes" or datasets with stuff like ROI (like summary/config field)
- think about cube problem
- user-controlled mode for event distribution (e.g. based on delay-time)
- always write detectors like phasecav

>>> xarray thoughts from Jason:
- coordinates/attributes for everything, using h5netcdf
- for analyzing the small-data: xarray makes it easy to select/filter,
  and will keep track of coordinates for you
- can use pandas for plotting and packages like seaborn (stat plotting)
  with these coordinate arrays
- the xarray coordinates will provide the time sorting
- wildcard to merge multiple files
- didn't support variable length data? (maybe can do this)
- treats NaNs correctly
- would merge fast and slow detectors (e.g. eventcode 40 and 42)
- handles dropped data
- not clear that xarray can write pieces while taking data?
- probably supports hierarchy of hdf5 groups in h5netcdf, but might
  make it more difficult to cut/merge/sort in xarray
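
  A rough analysis-side sketch of the above, assuming the files were written
  as netCDF-compatible HDF5 (e.g. via h5netcdf, which this module does not
  currently do); 'laser_on' is an illustrative dataset name:

      import xarray as xr
      # wildcard merge of per-run files; xarray aligns on coordinates
      ds = xr.open_mfdataset('run*.h5', engine='h5netcdf', combine='by_coords')
      # select/filter events, e.g. keep only laser-on shots
      laser_on = ds.where(ds['laser_on'] == 1, drop=True)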
"""

import numpy as np
import tables
import collections

from _psana import EventId, Source, Bld
from psana import Detector, DetNames

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
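
# note: this module is intended to run under MPI,
# e.g. `mpirun -n 16 python my_script.py` (illustrative launch command)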

MISSING_INT = -99999
MISSING_FLOAT = np.nan

INT_TYPES   = [int, np.int8, np.int16, np.int32, np.int64,
               np.int, np.uint8, np.uint16, np.uint32, np.uint64, np.uint]
FLOAT_TYPES = [float, np.float16, np.float32, np.float64, np.float128, np.float]


def _flatten_dictionary(d, parent_key='', sep='/'):
    """
    http://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
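
    For example, {'cspad': {'roi1': 5.0}, 'laser_on': 1} flattens to
    {'cspad/roi1': 5.0, 'laser_on': 1}, so nested dictionaries map directly
    onto HDF5 group paths.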
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.MutableMapping):
            items.extend(_flatten_dictionary(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


def remove_values(the_list, val):
    return [value for value in the_list if value != val]


class SynchDict(dict):
[docs] """ Class used to keep track of all arrays that need to be gathered (a.k.a. "send_list"). This dictionary is synched before every call to gather. """ def synchronize(self):
        tot_send_list = comm.allgather(self)
        for node_send_list in tot_send_list:
            for k in node_send_list:
                if k not in self.keys():
                    self[k] = node_send_list[k]

    def keys(self):
        # this helps ensure that we call "gather" in the right order
        # on all cores
        return sorted(self)


class SmallData(object):
[docs] """ On master, * numbers (int/float) are lists of arrays, representing workers (items in the list) and events (items in the array) * arrays are lists of lists (!) of arrays, representing workers (items in the outter list) and events (items in the inner list) and values of the original array (arrays) """ def __init__(self, datasource_parent, filename=None, save_on_gather=False): """ Parameters ---------- gather_interval : int The number of events a single process must see before the results get gathered for processing. If `None`, then only gather results when `save` is called. save_on_gather : bool Whether to save the results to disk periodically (after the results get gathered) """ self._datasource_parent = datasource_parent self.save_on_gather = save_on_gather self._num_send_list = SynchDict() self._arr_send_list = SynchDict() self._initialize_default_detectors() # storage space for event data self._dlist = {} if self.master: self._dlist_master = {} self._newkeys = [] if filename and self.master: self.file_handle = tables.File(filename, 'w') return @property def master(self): return (rank == 0) @property def currevt(self): return self._datasource_parent._currevt @staticmethod def _num_or_array(obj):
        data_type = type(obj)
        if ((data_type in [int, float]) or
             np.issubdtype(data_type, np.integer) or
             np.issubdtype(data_type, np.float)):
            s = 'num'
        elif data_type is np.ndarray:
            s = 'array'
        else:
            raise TypeError('object is not number or array')
        return s

    def _sort(self, obj, sort_order):
[docs] """ sort `obj`, an array or list, by `sort_order` in dim -1, IN PLACE """ assert type(sort_order) == np.ndarray, 'sort_order must be array' t = self._num_or_array(obj[0]) if t is 'num': res = obj[sort_order] elif t is 'array': res = [x for (y,x) in sorted( zip(sort_order, obj) ) ] return res def missing(self, key):
[docs] """ Use the send_list to figure out the correct types and fill values for missing data """ if key in self._num_send_list.keys(): t = self._num_send_list[key] if t in INT_TYPES: missing_value = MISSING_INT elif t in FLOAT_TYPES: missing_value = MISSING_FLOAT else: raise ValueError('%s :: Invalid num type for missing data' % str(t)) elif key in self._arr_send_list.keys(): t = self._arr_send_list[key][0] shape = self._arr_send_list[key][1] dtype = self._arr_send_list[key][2] missing_value = np.empty(shape, dtype=dtype) if dtype in INT_TYPES: missing_value.fill(MISSING_INT) elif dtype in FLOAT_TYPES: missing_value.fill(MISSING_FLOAT) else: raise ValueError('%s :: Invalid array type for missing data' % str(dtype)) else: raise KeyError('key %s not found in array or number send_list' % key) return missing_value def _gather(self):
[docs] """ Gather arrays and numbers from all MPI ranks. """ # "send lists" hold a catalogue of the data keys to expect # aggregated across all ranks self._arr_send_list.synchronize() self._num_send_list.synchronize() # for all data in our aggregated catalogue, gather for k in self._arr_send_list.keys(): if k not in self._dlist.keys(): self._dlist[k] = [] #if k == 'a': print self._dlist[k] self._backfill_client(self._nevents, self._dlist[k], k) self._gather_arrays(self._dlist[k], k) for k in self._num_send_list.keys(): if k not in self._dlist.keys(): self._dlist[k] = [] self._backfill_client(self._nevents, self._dlist[k], k) self._gather_numbers(self._dlist[k], k) self._dlist = {} # forget all the data we just sent # if we are the master: # (1) sort data by time # (2) backfill missing data [to beginning of time, memory + disk] # this must be after the sort, because there are no timestamps # to sort backfilled data # (3) save if requested # note that _dlist_master is different than the client _dlist's. it is a list of lists, # one list for each gather that has happened since the last save. the style of # the contents of _dlist_master is also changed by the "sort" call: individual numbers # (not arrays) in a gather are originally arrays, but the sort converts them into lists # of numbers. if self.master: # handle the case of zero events if len(self._dlist_master.keys()) > 0: # (1) sort data by time # get the order to sort in from the event times sort_map = np.argsort(self._dlist_master['event_time'][-1]) for k in self._dlist_master.keys(): # "-1" here says we are only sorting the result from the most recent gather self._dlist_master[k][-1] = [self._dlist_master[k][-1][i] for i in sort_map] # (2) backfill missing data for k in self._newkeys: events_in_mem = sum([len(x) for x in self._dlist_master['fiducials']]) target_events = self._nevents_on_disk + events_in_mem self._dlist_master[k] = self._backfill_master(target_events, self._dlist_master[k], self.missing(k)) self._newkeys=[] # (3) save if requested if self.save_on_gather: self._save() # save all event data on gather return def _gather_numbers(self, num_list, key):
        lengths = np.array(comm.gather(len(num_list)))  # get list of lengths

        mysend = np.array(num_list, dtype=self._num_send_list[key])
        myrecv = None

        if self.master:
            myrecv = np.empty((sum(lengths)), mysend.dtype)  # allocate receive buffer

        comm.Gatherv(sendbuf=mysend, recvbuf=[myrecv, lengths])

        if self.master:
            self._dlist_append(self._dlist_master, key, myrecv)

        return

    def _gather_arrays(self, array_list, key):
[docs] """ Gather arrays from all workers and update the master's dlist """ # send to master the shape of each array to expect # on rank 0 (master), `worker_shps` is a list of list of tuples: # first list: one item for each rank # second list: one item for each array # tuple: the shape of the array worker_shps = comm.gather([ a.shape for a in array_list ]) # workers flatten arrays and send those to master if len(array_list) > 0: mysend = np.concatenate([ x.reshape(-1) for x in array_list ]) mysend = np.ascontiguousarray(mysend) else: mysend = np.array([], dtype=self._arr_send_list[key][2]) # master computes how many array elements to expect, # recvs in linear array if self.master: worker_lens = [[np.product(shp) for shp in worker] for worker in worker_shps] # flattened size of each array worker_msg_sizes = [np.sum(lens) for lens in worker_lens] # size of msg from each rank recv_buff_size = np.sum(worker_msg_sizes) myrecv = np.empty(recv_buff_size, mysend.dtype) # allocate receive buffer recvbuf = [myrecv, worker_msg_sizes] else: recvbuf = None comm.Gatherv(sendbuf=mysend, recvbuf=recvbuf) # master re-shapes linear received message into a list of well-shaped arrays # the array shapes were communicated previously if self.master: start = 0 reshaped_recv = [] for worker in worker_shps: for shp in worker: l = np.product(shp) reshaped_recv.append( myrecv[start:start+l].reshape(*shp) ) start += l # finally, insert the new arrays into master's dlist self._dlist_append(self._dlist_master, key, reshaped_recv) return @property
    def _nevents(self):
        if 'fiducials' in self._dlist:
            return len(self._dlist['fiducials'])
        else:
            return 0

    @property
    def _nevents_on_disk(self):
        try:
            self.file_handle.get_node('/', 'fiducials')
            return len(self.file_handle.root.fiducials)
        except tables.NoSuchNodeError:
            return 0

    # missing data ideas:
    #
    # - do an allgather of the keys/types/shapes into "send_list"
    #
    # * client-side issues:
    #   - (case 1) when we append data to _dlist we will backfill numbers
    #     before adding the new data, roughly done.
    #   - (case 2) when we gather we check for keys that we didn't see at
    #     all OR keys that are "ragged" and backfill.
    #   - (case 3) no events at all on a client
    #   - need to construct the missing data for each of the above cases
    #
    # * master-side issues:
    #   - (case 4) master also needs to backfill on disk and memory

    def _backfill_client(self, target_events, dlist_element, key):
        numfill = target_events - len(dlist_element)
        if numfill > 0:
            fill_value = self.missing(key)
            dlist_element.extend([fill_value] * numfill)
        return

    @staticmethod
    def _backfill_master(target_events, dlist_element, fill_value):
        numfill = target_events - sum([len(x) for x in dlist_element])
        if numfill > 0:
            dlist_element = [[fill_value] * numfill] + dlist_element
        return dlist_element

    def _dlist_append_client(self, key, value):
        data_type = type(value)

        if data_type is np.ndarray:
            value = np.atleast_1d(value)
            if key not in self._arr_send_list:
                self._arr_send_list[key] = (data_type, value.shape, value.dtype)
        else:
            if key not in self._num_send_list:
                self._num_send_list[key] = data_type

        if key not in self._dlist.keys():
            self._dlist[key] = []

        # patch up _dlist with missing data before we add new values
        self._backfill_client(self._nevents - 1, self._dlist[key], key)

        if data_type is np.ndarray:
            # save a copy of the array (not a reference) in case the
            # user reuses the same array memory for the next event
            self._dlist[key].append(np.copy(value))
        else:
            self._dlist[key].append(value)

        return

    def _dlist_append(self, dlist, key, value):
        if key not in dlist.keys():
            dlist[key] = [value]
            self._newkeys.append(key)
        else:
            dlist[key].append(value)

        return

    def event(self, *args, **kwargs):
[docs] """ Save some data from this event for later use. Parameters ---------- *args : dictionaries Save HDF5 group heirarchies using nested dictionaries. Each level of the dictionary is a level in the HDF5 group heirarchy. **kwargs : datasetname, dataset Save an arbitrary piece of data from this run. The kwarg will give it an (HDF5 dataset) name that appears in the resulting HDF5 file. Examples -------- >>> # save the status of a laser >>> smldata.event(laser_on=laser_on) >>> # save "data' at a special location "/base/next_group/data" >>> smldata.event({'base': {'next_group' : data}}) """ # get timestamp data for most recently yielded evt evt_id = self.currevt.get(EventId) if evt_id is None: return # can't do anything without a timestamp if ('event_time' in kwargs.keys()) or ('fiducials' in kwargs.keys()): raise KeyError('`event_time` and `fiducials` are special names' ' reserved for timestamping -- choose a different ' 'name') # *args can be used to pass hdf5 hierarchies (groups/names) in a dict # flatten these and create a single dictionary of (name : value) pairs # when groups will be created in the hdf5, we will use a "/" event_data_dict = {} event_data_dict.update(kwargs) for d in args: event_data_dict.update( _flatten_dictionary(d) ) time = evt_id.time()[0] << 32 | evt_id.time()[1] # chris' craziness fid = evt_id.fiducials() # --> check to see if we already save the time for this event if ('fiducials' in self._dlist.keys()) and (self._dlist['fiducials'][-1] == fid): # check to see if we already added this field this event events_seen = len(self._dlist['fiducials']) for k in event_data_dict.keys(): # if the list is as long as the # evts seen, # user has tried to add key twice if len( self._dlist.get(k, []) ) == events_seen: # False if key not in dlist raise RuntimeError("Event data already added for " "key '%s' this event!" % k) # --> otherwise this is a new event else: self._dlist_append_client('event_time', time) self._dlist_append_client('fiducials', fid) self._event_default() for k in event_data_dict.keys(): self._dlist_append_client(k, event_data_dict[k]) return def _initialize_default_detectors(self):
        self._default_detectors = []
        for detector_name in ['EBeam', 'PhaseCavity', 'FEEGasDetEnergy']:
            try:
                det = Detector(detector_name)
                self._default_detectors.append(det)
            except KeyError as e:
                pass

        # add all Evrs (there may be more than one)
        self._evr_cfgcodes = []
        for n in DetNames():
            if ':Evr.' in n[0]:
                det = Detector(n[0])
                self._default_detectors.append(det)

                cfg = det._fetch_configs()
                assert len(cfg) == 1
                self._evr_cfgcodes += [e.code() for e in cfg[0].eventcodes()]

        self._evr_cfgcodes = set(self._evr_cfgcodes)
        self._evr_cfgcodes = list(self._evr_cfgcodes)
        self._evr_cfgcodes.sort()

        return

    def _event_default(self):
[docs] """ Cherry-picked machine parameters we think will be useful for basically every experiment """ default = {} for det in self._default_detectors: if det.name.dev == 'EBeam': ebeam_ddl = det.get(self.currevt) if ebeam_ddl is not None: default['ebeam/charge'] = ebeam_ddl.ebeamCharge() default['ebeam/dump_charge'] = ebeam_ddl.ebeamDumpCharge() default['ebeam/L3_energy'] = ebeam_ddl.ebeamL3Energy() default['ebeam/photon_energy'] = ebeam_ddl.ebeamPhotonEnergy() default['ebeam/pk_curr_bc2'] = ebeam_ddl.ebeamPkCurrBC2() if det.name.dev == 'PhaseCavity': pc_ddl = det.get(self.currevt) if pc_ddl is not None: default['phase_cav/charge1'] = pc_ddl.charge1() default['phase_cav/charge2'] = pc_ddl.charge2() default['phase_cav/fit_time_1'] = pc_ddl.fitTime1() default['phase_cav/fit_time_2'] = pc_ddl.fitTime2() if det.name.dev == 'FEEGasDetEnergy': gdet_ddl = det.get(self.currevt) if gdet_ddl is not None: default['gas_detector/f_11_ENRC'] = gdet_ddl.f_11_ENRC() default['gas_detector/f_12_ENRC'] = gdet_ddl.f_12_ENRC() default['gas_detector/f_21_ENRC'] = gdet_ddl.f_21_ENRC() default['gas_detector/f_22_ENRC'] = gdet_ddl.f_22_ENRC() default['gas_detector/f_63_ENRC'] = gdet_ddl.f_63_ENRC() default['gas_detector/f_64_ENRC'] = gdet_ddl.f_64_ENRC() if det.name.dev == 'Evr': evr_codes = det.eventCodes(self.currevt, this_fiducial_only=True) if evr_codes is not None: for c in self._evr_cfgcodes: if c in evr_codes: default['evr/code_%d' % c] = 1 else: default['evr/code_%d' % c] = 0 self.event(default) return def sum(self, value):
        return self._mpi_reduce(value, MPI.SUM)

    def max(self, value):
        return self._mpi_reduce(value, MPI.MAX)

    def min(self, value):
        return self._mpi_reduce(value, MPI.MIN)

    def _mpi_reduce(self, value, function):
        t = self._num_or_array(value)
        if t == 'num':
            s = comm.reduce(value, function)
        elif t == 'array':
            s = np.empty_like(value)  # recvbuf
            comm.Reduce(value, s, function)
        return s

    def _get_node(self, k):
[docs] """ Retrieve or create (if necessary) the pytables node for a specific key. """ try: #print 'trying to get node: %s' % k node = self.file_handle.get_node('/'+k) except tables.NoSuchNodeError as e: # --> create node ex = self._dlist_master[k][0] if self._num_or_array(ex) == 'array': a = tables.Atom.from_dtype(ex.dtype) shp = tuple([0] + list(ex.shape)) elif self._num_or_array(ex) == 'num': a = tables.Atom.from_dtype(np.array(ex).dtype) shp = (0,) path, _, name = k.rpartition('/') node = self.file_handle.create_earray(where='/'+path, name=name, shape=shp, atom=a, createparents=True) return node def save(self, *args, **kwargs):
[docs] """ Save registered data to an HDF5 file. There are 3 behaviors of the arguments to this function: 1. Decide what 'event data' (declared by SmallData.event()) should be saved 2. Add summary (ie. any non-event) data using key-value pairs (similar to SmallData.event()) 3. Add summary (ie. any non-event) data organized in a heirarchy using nested dictionaries (als similar to SmallData.event()) These data are then saved to the file specifed in the SmallData constructor. Parameters ---------- *args : strings A list of the event data names to save, if you want to save a subset. Otherwise save all data to disk. For example, imagine you had called SmallData.event(a=..., b=...). Then smldata.save('b') would not save data labelled 'a'. smldata.save() would save both 'a' and 'b'. *args : dictionaries In direct analogy to the SmallData.event call, you can also pass HDF5 group heirarchies using nested dictionaries. Each level of the dictionary is a level in the HDF5 group heirarchy. **kwargs : datasetname, dataset Similar to SmallData.event, it is possible to save arbitrary singleton data (e.g. at the end of a run, to save an average over some quanitity). event_data : bool Special kwarg. If `False`, do not save the event data, just the run summary data. If `True` (default), do. Examples -------- >>> # save all event data >>> smldata.save() >>> # save all event data AND "array_containing_sum" >>> smldata.save(cspad_sum=array_containing_sum) >>> # save 'b' event data, and no other >>> smldata.save('b') >>> # save all event data AND "/base/next_group/data" >>> smldata.save({'base': {'next_group' : data}}) """ self._gather() if self.master: self._save(*args, **kwargs) return def _save(self, *args, **kwargs):
        evt_variables_to_save = [s for s in args if type(s) == str]
        dictionaries_to_save = [d for d in args if type(d) == dict]

        dict_to_save = {}
        dict_to_save.update(kwargs)
        for d in dictionaries_to_save:
            dict_to_save.update(_flatten_dictionary(d))

        if self.file_handle is None:
            # we could accept a 'filename' argument here in the save method
            raise IOError('no filename specified in SmallData constructor')

        # if 'event_data' key is set, save event data (default True)
        if kwargs.pop('event_data', True):

            # if the user has specified which keys to save, just
            # save those; else, save all event data
            if len(evt_variables_to_save) > 0:
                keys_to_save = ['event_time', 'fiducials']
                for k in evt_variables_to_save:
                    if k in self._dlist_master.keys():
                        keys_to_save.append(k)
                    else:
                        print('Warning: event data key %s has no '
                              'associated event data and will not '
                              'be saved' % k)
            else:
                keys_to_save = self._dlist_master.keys()

            # for each item to save, write to disk
            for k in keys_to_save:
                if len(self._dlist_master[k]) > 0:

                    # concatenate the list of lists of arrays into a single np array
                    # note: remove "[]" entries left by gathers with no data (for any rank)
                    self._dlist_master[k] = remove_values(self._dlist_master[k], [])
                    self._dlist_master[k] = np.concatenate(self._dlist_master[k])

                    node = self._get_node(k)
                    node.append(self._dlist_master[k])
                    self._dlist_master[k] = []

                else:
                    # no data to save for this key
                    pass

        # save "accumulated" data (e.g. averages)
        for k, v in dict_to_save.items():
            if type(v) is not np.ndarray:
                v = np.array([v])

            path, _, name = k.rpartition('/')
            node = self.file_handle.create_carray(where='/' + path, name=name,
                                                  obj=v, createparents=True)

        return

    def close(self):
[docs] """ Close the HDF5 file used for writing. """ if self.master: self.file_handle.close()