psana/src/smalldata.py

00002 """
00003 
00004 existing issue to remember:
00005 
00006 > How do people access _dlist_master for applications like psmon?
00007 
00008 > user forgets to call save, they get a small but empty HDF5
00009 -- atexit
00010 -- use a destructor: __del__ (prone to circular ref issues)
00011 -- use "with" statement + __exit__ (may affect interface)
00012 
00013 > metadata
00014 -- docstrings for default values
00015 -- user interface for adding attributes/info/units (e.g. smld.attribute(a='a is great'))
00016 -- xarray compatability?
00017 
00018 > chunking for performance?
00019 
00020 
00021 >>> from Silke
00022 - storing extra "attributes" or datasets with stuff like ROI (like summary/config field)
00023 - think about cube problem
00024 - user-controlled mode for event distribution (e.g. based on delay-time)
00025 - always write detectors like phasecav
00026 
00027 >>> xarray thoughts from Jason:
00028 - coordinates/attributes for everything, using h5netcdf
00029 - for analyzing the small-data: xarray makes it easy to select/filter,
00030   and will keep track of coordinates for you
00031 - can use pandas for plotting and packages like seaborn (stat plotting)
00032   with these coordinate arrays
00033 - the xarray coordinates will provide the time sorting
00034 - wildcard to merge multiple files
00035 - didn't support variable length data? (maybe can do this)
00036 - treats NaNs correctly
00037 - would merge fast and slow detectors (e.g. eventcode 40 and 42)
00038 - handles dropped data
00039 - not clear that xarray can write pieces while taking data?
00040 - probably supports hierarchy of hdf5 groups in h5netcdf, but might
00041   make it more difficult to cut/merge/sort in xarray
00042 """

import numpy as np
import tables
import collections

from _psana import EventId, Source, Bld
from psana import Detector, DetNames

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

MISSING_INT = -99999
MISSING_FLOAT = np.nan

INT_TYPES   = [int, np.int8, np.int16, np.int32, np.int64,
               np.uint8, np.uint16, np.uint32, np.uint64, np.uint]
FLOAT_TYPES = [float, np.float16, np.float32, np.float64, np.float128]


def _flatten_dictionary(d, parent_key='', sep='/'):
    """
    Flatten a nested dictionary into a single dict whose keys encode
    the hierarchy, joined by `sep`.

    http://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.MutableMapping):
            items.extend(_flatten_dictionary(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
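
# Example (behavior sketch):
#   _flatten_dictionary({'base': {'next_group': 1}, 'x': 2})
#   returns {'base/next_group': 1, 'x': 2}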


def remove_values(the_list, val):
    return [value for value in the_list if value != val]


class SynchDict(dict):
    """
    Class used to keep track of all arrays that need to be gathered
    (a.k.a. "send_list").  This dictionary is synched before every call
    to gather.
    """
    def synchronize(self):
        tot_send_list = comm.allgather(self)
        for node_send_list in tot_send_list:
            for k in node_send_list:
                if k not in self.keys():
                    self[k] = node_send_list[k]

    def keys(self):
        # this helps ensure that we call "gather" in the right order
        # on all cores
        return sorted(self)
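
# Illustration of SynchDict.synchronize (a sketch): if rank 0 has seen
# {'laser_on': int} and rank 1 has seen {'delay': float}, after
# synchronize() both ranks hold {'delay': float, 'laser_on': int}, so
# every rank performs the same (sorted) sequence of gathers, even for
# keys it never recorded locally.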


class SmallData(object):
    """
    Collect per-event scalar and array data on each MPI rank, gather it
    to the master rank, and write it to an HDF5 file.

    On master,
      * numbers (int/float) are lists of arrays, representing workers (items
        in the list) and events (items in the array)
      * arrays are lists of lists (!) of arrays, representing workers (items
        in the outer list) and events (items in the inner list) and values
        of the original array (arrays)
    """
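
    # Typical usage, a sketch assuming the usual psana MPIDataSource
    # workflow (run under MPI, e.g. `mpirun -n 8 python script.py`):
    #
    #   from psana import MPIDataSource
    #   ds = MPIDataSource('exp=xpptut15:run=54:smd')
    #   smldata = ds.small_data('run54.h5')
    #   for evt in ds.events():
    #       smldata.event(laser_on=laser_on)  # laser_on computed per event
    #   smldata.save()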

    def __init__(self, datasource_parent, filename=None, save_on_gather=False):
        """

        Parameters
        ----------
        datasource_parent : DataSource
            The data source whose events are being processed; used to
            access the current event for timestamping.

        filename : str
            The HDF5 file to write results to (opened on the master rank
            only). If `None`, data can be gathered but not saved.

        save_on_gather : bool
            Whether to save the results to disk periodically (after the
            results get gathered)
        """

        self._datasource_parent = datasource_parent
        self.save_on_gather     = save_on_gather
        self._num_send_list     = SynchDict()
        self._arr_send_list     = SynchDict()

        self._initialize_default_detectors()

        # storage space for event data
        self._dlist = {}
        if self.master:
            self._dlist_master = {}
            self._newkeys = []

        # default to None so save() can detect a missing filename
        self.file_handle = None
        if filename and self.master:
            self.file_handle = tables.File(filename, 'w')

        return


    @property
    def master(self):
        return (rank == 0)


    @property
    def currevt(self):
        return self._datasource_parent._currevt


    @staticmethod
    def _num_or_array(obj):

        data_type = type(obj)
        if ((data_type in [int, float]) or
             np.issubdtype(data_type, np.integer) or
             np.issubdtype(data_type, np.floating)):
            s = 'num'

        elif data_type is np.ndarray:
            s = 'array'

        else:
            raise TypeError('object is not number or array')

        return s


    def _sort(self, obj, sort_order):
        """
        Sort `obj`, an array or list, by `sort_order` in dim -1,
        and return the sorted result.
        """

        assert type(sort_order) == np.ndarray, 'sort_order must be array'
        t = self._num_or_array(obj[0])

        if t == 'num':
            res = obj[sort_order]
        elif t == 'array':
            # sort on the sort_order values only, to avoid comparing
            # the array payloads themselves on ties
            res = [x for (y, x) in sorted(zip(sort_order, obj),
                                          key=lambda pair: pair[0])]

        return res
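
    # Example (sketch): for a numeric block, obj[sort_order] picks
    # element sort_order[i] as the i-th result; for a list of per-event
    # arrays, the (sort_order, obj) pairs are ordered by their
    # sort_order values instead.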

    def missing(self, key):
        """
        Use the send_list to figure out the correct types and fill
        values for missing data
        """

        if key in self._num_send_list.keys():
            t = self._num_send_list[key]

            if t in INT_TYPES:
                missing_value = MISSING_INT
            elif t in FLOAT_TYPES:
                missing_value = MISSING_FLOAT
            else:
                raise ValueError('%s :: Invalid num type for missing data' % str(t))

        elif key in self._arr_send_list.keys():

            t     = self._arr_send_list[key][0]
            shape = self._arr_send_list[key][1]
            dtype = self._arr_send_list[key][2]

            missing_value = np.empty(shape, dtype=dtype)

            if dtype in INT_TYPES:
                missing_value.fill(MISSING_INT)
            elif dtype in FLOAT_TYPES:
                missing_value.fill(MISSING_FLOAT)
            else:
                raise ValueError('%s :: Invalid array type for missing data' % str(dtype))

        else:
            raise KeyError('key %s not found in array or number send_list' % key)

        return missing_value
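
    # Example (sketch): if a client registered 'delay' as a float,
    # missing('delay') returns np.nan (MISSING_FLOAT); if 'roi' was
    # registered as a (10, 10) int32 array, missing('roi') returns a
    # (10, 10) array filled with MISSING_INT.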


    def _gather(self):
        """
        Gather arrays and numbers from all MPI ranks.
        """

        # "send lists" hold a catalogue of the data keys to expect,
        # aggregated across all ranks
        self._arr_send_list.synchronize()
        self._num_send_list.synchronize()

        # for all data in our aggregated catalogue, gather
        for k in self._arr_send_list.keys():
            if k not in self._dlist.keys(): self._dlist[k] = []
            self._backfill_client(self._nevents, self._dlist[k], k)
            self._gather_arrays(self._dlist[k], k)

        for k in self._num_send_list.keys():
            if k not in self._dlist.keys(): self._dlist[k] = []
            self._backfill_client(self._nevents, self._dlist[k], k)
            self._gather_numbers(self._dlist[k], k)

        self._dlist = {}  # forget all the data we just sent

        # if we are the master:
        #   (1) sort data by time
        #   (2) backfill missing data [to beginning of time, memory + disk]
        #       this must be after the sort, because there are no timestamps
        #       to sort backfilled data
        #   (3) save if requested

        # note that _dlist_master is different than the client _dlist's: it is
        # a list of lists, one list for each gather that has happened since
        # the last save.  the style of the contents of _dlist_master is also
        # changed by the "sort" call: individual numbers (not arrays) in a
        # gather are originally arrays, but the sort converts them into lists
        # of numbers.

        if self.master:
            # handle the case of zero events
            if len(self._dlist_master.keys()) > 0:

                # (1) sort data by time

                # get the order to sort in from the event times
                sort_map = np.argsort(self._dlist_master['event_time'][-1])

                for k in self._dlist_master.keys():
                    # "-1" here says we are only sorting the result from
                    # the most recent gather
                    self._dlist_master[k][-1] = [self._dlist_master[k][-1][i] for i in sort_map]

                # (2) backfill missing data
                for k in self._newkeys:
                    events_in_mem = sum([len(x) for x in self._dlist_master['fiducials']])
                    target_events = self._nevents_on_disk + events_in_mem
                    self._dlist_master[k] = self._backfill_master(target_events,
                                                                  self._dlist_master[k],
                                                                  self.missing(k))
                self._newkeys = []

                # (3) save if requested
                if self.save_on_gather:
                    self._save()  # save all event data on gather

        return


    def _gather_numbers(self, num_list, key):

        lengths = np.array(comm.gather(len(num_list)))  # get list of lengths
        mysend = np.array(num_list, dtype=self._num_send_list[key])
        myrecv = None

        if self.master:
            myrecv = np.empty((sum(lengths)), mysend.dtype)  # allocate receive buffer

        comm.Gatherv(sendbuf=mysend, recvbuf=[myrecv, lengths])

        if self.master:
            self._dlist_append(self._dlist_master, key, myrecv)

        return


    def _gather_arrays(self, array_list, key):
        """
        Gather arrays from all workers and update the master's dlist
        """

        # send to master the shape of each array to expect
        # on rank 0 (master), `worker_shps` is a list of lists of tuples:
        # first list:  one item for each rank
        # second list: one item for each array
        # tuple:       the shape of the array
        worker_shps = comm.gather([ a.shape for a in array_list ])

        # workers flatten arrays and send those to master
        if len(array_list) > 0:
            mysend = np.concatenate([ x.reshape(-1) for x in array_list ])
            mysend = np.ascontiguousarray(mysend)
        else:
            mysend = np.array([], dtype=self._arr_send_list[key][2])

        # master computes how many array elements to expect,
        # recvs in linear array

        if self.master:
            worker_lens = [[np.prod(shp) for shp in worker] for worker in worker_shps]  # flattened size of each array
            worker_msg_sizes = [np.sum(lens) for lens in worker_lens]                   # size of msg from each rank
            recv_buff_size = np.sum(worker_msg_sizes)
            myrecv = np.empty(recv_buff_size, mysend.dtype)                             # allocate receive buffer
            recvbuf = [myrecv, worker_msg_sizes]
        else:
            recvbuf = None

        comm.Gatherv(sendbuf=mysend, recvbuf=recvbuf)

        # master re-shapes the linear received message into a list of
        # well-shaped arrays; the array shapes were communicated previously
        if self.master:
            start = 0
            reshaped_recv = []
            for worker in worker_shps:
                for shp in worker:
                    l = np.prod(shp)
                    reshaped_recv.append( myrecv[start:start+l].reshape(*shp) )
                    start += l

            # finally, insert the new arrays into master's dlist
            self._dlist_append(self._dlist_master, key, reshaped_recv)

        return
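
    # Worked example for _gather_arrays (a sketch): if one rank recorded
    # two arrays for a key with shapes (2, 3) and (4,), its entry in
    # `worker_shps` is [(2, 3), (4,)], its flattened Gatherv message has
    # 6 + 4 = 10 elements, and the master slices its receive buffer back
    # into one (2, 3) and one (4,) array.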


    @property
    def _nevents(self):
        if 'fiducials' in self._dlist:
            return len(self._dlist['fiducials'])
        else:
            return 0


    @property
    def _nevents_on_disk(self):
        try:
            self.file_handle.get_node('/', 'fiducials')
            return len(self.file_handle.root.fiducials)
        except tables.NoSuchNodeError:
            return 0


    # missing data ideas:
    #
    # - do an allgather of the keys/types/shapes into "send_list"
    #
    # * client-side issues:
    # - (case 1) when we append data to _dlist we will backfill numbers before adding the new data, roughly done.
    # - (case 2) when we gather we check for keys that we didn't see at all OR keys that are "ragged" and backfill.
    # - (case 3) no events at all on a client
    # - need to construct the missing data for each of the above cases
    #
    # * master-side issues:
    # - (case 4) master also needs to backfill on disk and memory

    def _backfill_client(self, target_events, dlist_element, key):
        numfill = target_events - len(dlist_element)
        if numfill > 0:
            fill_value = self.missing(key)
            dlist_element.extend([fill_value]*numfill)
        return


    @staticmethod
    def _backfill_master(target_events, dlist_element, fill_value):
        numfill = target_events - sum([len(x) for x in dlist_element])
        if numfill > 0:
            dlist_element = [[fill_value]*numfill] + dlist_element
        return dlist_element
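
    # Example (sketch): with target_events=5 and
    # dlist_element=[[x1, x2], [x3]] (two gathers, 3 events total),
    # numfill is 2, giving [[fill, fill], [x1, x2], [x3]]; missing
    # events are prepended because backfilled data carries no
    # timestamps to sort by.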

    def _dlist_append_client(self, key, value):

        data_type = type(value)

        if data_type is np.ndarray:
            value = np.atleast_1d(value)
            if key not in self._arr_send_list:
                self._arr_send_list[key] = (data_type, value.shape, value.dtype)

        else:
            if key not in self._num_send_list:
                self._num_send_list[key] = data_type

        if key not in self._dlist.keys():
            self._dlist[key] = []

        # patch up _dlist with missing data before we add new values
        self._backfill_client(self._nevents - 1,
                              self._dlist[key],
                              key)

        if data_type is np.ndarray:
            # save a copy of the array (not a reference) in case the
            # user reuses the same array memory for the next event
            self._dlist[key].append(np.copy(value))
        else:
            self._dlist[key].append(value)

        return


    def _dlist_append(self, dlist, key, value):

        if key not in dlist.keys():
            dlist[key] = [value]
            self._newkeys.append(key)
        else:
            dlist[key].append(value)

        return


    def event(self, *args, **kwargs):
        """
        Save some data from this event for later use.

        Parameters
        ----------
        *args : dictionaries
            Save HDF5 group hierarchies using nested dictionaries. Each level
            of the dictionary is a level in the HDF5 group hierarchy.

        **kwargs : datasetname, dataset
            Save an arbitrary piece of data from this run. The kwarg will
            give it an (HDF5 dataset) name that appears in the resulting
            HDF5 file.

        Examples
        --------
        >>> # save the status of a laser
        >>> smldata.event(laser_on=laser_on)

        >>> # save "data" at a special location "/base/next_group/data"
        >>> smldata.event({'base': {'next_group' : data}})
        """

        # get timestamp data for most recently yielded evt
        evt_id = self.currevt.get(EventId)
        if evt_id is None: return  # can't do anything without a timestamp

        if ('event_time' in kwargs.keys()) or ('fiducials' in kwargs.keys()):
            raise KeyError('`event_time` and `fiducials` are special names'
                           ' reserved for timestamping -- choose a different '
                           'name')

        # *args can be used to pass hdf5 hierarchies (groups/names) in a dict
        # flatten these and create a single dictionary of (name : value) pairs
        # when groups are created in the hdf5, the "/" separator is used

        event_data_dict = {}
        event_data_dict.update(kwargs)
        for d in args:
            event_data_dict.update( _flatten_dictionary(d) )

        # pack the two 32-bit halves of the timestamp into one 64-bit int
        time = evt_id.time()[0] << 32 | evt_id.time()[1]
        fid  = evt_id.fiducials()

        # --> check to see if we already saved the time for this event
        if ('fiducials' in self._dlist.keys()) and (self._dlist['fiducials'][-1] == fid):

            # check to see if we already added this field this event
            events_seen = len(self._dlist['fiducials'])
            for k in event_data_dict.keys():

                # if the list is as long as the # evts seen,
                # user has tried to add the key twice
                if len( self._dlist.get(k, []) ) == events_seen:  # False if key not in dlist
                    raise RuntimeError("Event data already added for "
                                       "key '%s' this event!" % k)

        # --> otherwise this is a new event
        else:
            self._dlist_append_client('event_time', time)
            self._dlist_append_client('fiducials', fid)
            self._event_default()

        for k in event_data_dict.keys():
            self._dlist_append_client(k, event_data_dict[k])

        return


    def _initialize_default_detectors(self):

        self._default_detectors = []

        for detector_name in ['EBeam', 'PhaseCavity', 'FEEGasDetEnergy']:
            try:
                det = Detector(detector_name)
                self._default_detectors.append(det)
            except KeyError:
                pass

        # add all Evrs (there may be more than one)
        self._evr_cfgcodes = []
        for n in DetNames():
            if ':Evr.' in n[0]:
                det = Detector(n[0])
                self._default_detectors.append(det)

                cfg = det._fetch_configs()
                assert len(cfg) == 1
                self._evr_cfgcodes += [e.code() for e in cfg[0].eventcodes()]

        self._evr_cfgcodes = sorted(set(self._evr_cfgcodes))

        return


    def _event_default(self):
        """
        Cherry-picked machine parameters we think will be useful
        for basically every experiment
        """

        default = {}

        for det in self._default_detectors:

            if det.name.dev == 'EBeam':
                ebeam_ddl = det.get(self.currevt)
                if ebeam_ddl is not None:
                    default['ebeam/charge']        = ebeam_ddl.ebeamCharge()
                    default['ebeam/dump_charge']   = ebeam_ddl.ebeamDumpCharge()
                    default['ebeam/L3_energy']     = ebeam_ddl.ebeamL3Energy()
                    default['ebeam/photon_energy'] = ebeam_ddl.ebeamPhotonEnergy()
                    default['ebeam/pk_curr_bc2']   = ebeam_ddl.ebeamPkCurrBC2()

            if det.name.dev == 'PhaseCavity':
                pc_ddl = det.get(self.currevt)
                if pc_ddl is not None:
                    default['phase_cav/charge1']    = pc_ddl.charge1()
                    default['phase_cav/charge2']    = pc_ddl.charge2()
                    default['phase_cav/fit_time_1'] = pc_ddl.fitTime1()
                    default['phase_cav/fit_time_2'] = pc_ddl.fitTime2()

            if det.name.dev == 'FEEGasDetEnergy':
                gdet_ddl = det.get(self.currevt)
                if gdet_ddl is not None:
                    default['gas_detector/f_11_ENRC'] = gdet_ddl.f_11_ENRC()
                    default['gas_detector/f_12_ENRC'] = gdet_ddl.f_12_ENRC()
                    default['gas_detector/f_21_ENRC'] = gdet_ddl.f_21_ENRC()
                    default['gas_detector/f_22_ENRC'] = gdet_ddl.f_22_ENRC()
                    default['gas_detector/f_63_ENRC'] = gdet_ddl.f_63_ENRC()
                    default['gas_detector/f_64_ENRC'] = gdet_ddl.f_64_ENRC()

            if det.name.dev == 'Evr':
                evr_codes = det.eventCodes(self.currevt, this_fiducial_only=True)
                if evr_codes is not None:
                    for c in self._evr_cfgcodes:
                        if c in evr_codes:
                            default['evr/code_%d' % c] = 1
                        else:
                            default['evr/code_%d' % c] = 0

        self.event(default)

        return


    def sum(self, value):
        return self._mpi_reduce(value, MPI.SUM)


    def max(self, value):
        return self._mpi_reduce(value, MPI.MAX)


    def min(self, value):
        return self._mpi_reduce(value, MPI.MIN)


    def _mpi_reduce(self, value, function):
        t = self._num_or_array(value)
        if t == 'num':
            s = comm.reduce(value, function)
        elif t == 'array':
            s = np.empty_like(value)  # recvbuf, meaningful on master only
            comm.Reduce(value, s, function)
        return s
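
    # Usage example (a sketch): these reductions are collective, so
    # every rank must call them; the result is meaningful only on the
    # master rank:
    #
    #   run_sum = smldata.sum(my_partial_sum)
    #   if smldata.master:
    #       smldata.save(cspad_sum=run_sum)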


    def _get_node(self, k):
        """
        Retrieve or create (if necessary) the pytables node
        for a specific key.
        """

        try:
            node = self.file_handle.get_node('/'+k)

        except tables.NoSuchNodeError:  # --> create node
            ex = self._dlist_master[k][0]
            if self._num_or_array(ex) == 'array':
                a = tables.Atom.from_dtype(ex.dtype)
                shp = tuple([0] + list(ex.shape))
            elif self._num_or_array(ex) == 'num':
                a = tables.Atom.from_dtype(np.array(ex).dtype)
                shp = (0,)

            path, _, name = k.rpartition('/')
            node = self.file_handle.create_earray(where='/'+path, name=name,
                                                  shape=shp, atom=a,
                                                  createparents=True)

        return node
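
    # Example (sketch): for k = 'gas_detector/f_11_ENRC' the first call
    # creates an extendable EArray at /gas_detector/f_11_ENRC (building
    # the 'gas_detector' group via createparents); subsequent saves
    # fetch the same node and append to it.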


    def save(self, *args, **kwargs):
        """
        Save registered data to an HDF5 file.

        There are 3 behaviors of the arguments to this function:

            1. Decide what 'event data' (declared by SmallData.event())
               should be saved
            2. Add summary (i.e. any non-event) data using key-value
               pairs (similar to SmallData.event())
            3. Add summary (i.e. any non-event) data organized in a
               hierarchy using nested dictionaries (also similar to
               SmallData.event())

        These data are then saved to the file specified in the SmallData
        constructor.

        Parameters
        ----------
        *args : strings
            A list of the event data names to save, if you want to save a
            subset. Otherwise save all data to disk. For example, imagine
            you had called SmallData.event(a=..., b=...). Then smldata.save('b')
            would not save data labelled 'a'. smldata.save() would save both
            'a' and 'b'.

        *args : dictionaries
            In direct analogy to the SmallData.event call, you can also pass
            HDF5 group hierarchies using nested dictionaries. Each level
            of the dictionary is a level in the HDF5 group hierarchy.

        **kwargs : datasetname, dataset
            Similar to SmallData.event, it is possible to save arbitrary
            singleton data (e.g. at the end of a run, to save an average over
            some quantity).

        event_data : bool
            Special kwarg. If `False`, do not save the event data, just
            the run summary data. If `True` (default), do.

        Examples
        --------
        >>> # save all event data
        >>> smldata.save()

        >>> # save all event data AND "array_containing_sum"
        >>> smldata.save(cspad_sum=array_containing_sum)

        >>> # save 'b' event data, and no other
        >>> smldata.save('b')

        >>> # save all event data AND "/base/next_group/data"
        >>> smldata.save({'base': {'next_group' : data}})
        """

        self._gather()

        if self.master:
            self._save(*args, **kwargs)

        return


    def _save(self, *args, **kwargs):

        # pop the special 'event_data' kwarg first, so that it is not
        # mistaken for summary data and written to the file
        save_event_data = kwargs.pop('event_data', True)

        evt_variables_to_save = [s for s in args if type(s) == str]
        dictionaries_to_save  = [d for d in args if type(d) == dict]

        dict_to_save = {}
        dict_to_save.update(kwargs)
        for d in dictionaries_to_save:
            dict_to_save.update(_flatten_dictionary(d))

        if self.file_handle is None:
            # we could accept a 'filename' argument here in the save method
            raise IOError('no filename specified in SmallData constructor')

        # save event data unless event_data=False was passed
        if save_event_data:

            # if the user has specified which keys to save, just
            # save those; else, save all event data
            if len(evt_variables_to_save) > 0:
                keys_to_save = ['event_time', 'fiducials']
                for k in evt_variables_to_save:
                    if k in self._dlist_master.keys():
                        keys_to_save.append(k)
                    else:
                        print('Warning: event data key %s has no '
                              'associated event data and will not '
                              'be saved' % k)
            else:
                keys_to_save = self._dlist_master.keys()

            # for each item to save, write to disk
            for k in keys_to_save:
                if len(self._dlist_master[k]) > 0:

                    # make a list of lists of arrays a single np array
                    #     note "[]" due to gathers with no data (for any rank)
                    self._dlist_master[k] = remove_values(self._dlist_master[k], [])
                    self._dlist_master[k] = np.concatenate(self._dlist_master[k])

                    node = self._get_node(k)
                    node.append( self._dlist_master[k] )
                    self._dlist_master[k] = []
                else:
                    pass  # no data to save for this key

        # save "accumulated" data (e.g. averages)
        for k, v in dict_to_save.items():

            if type(v) is not np.ndarray:
                v = np.array([v])

            path, _, name = k.rpartition('/')
            node = self.file_handle.create_carray(where='/'+path, name=name,
                                                  obj=v,
                                                  createparents=True)

        return


    def close(self):
        """
        Close the HDF5 file used for writing.
        """
        if self.master and self.file_handle is not None:
            self.file_handle.close()