00001
00002 """
00003
00004 existing issue to remember:
00005
00006 > How do people access _dlist_master for applications like psmon?
00007
00008 > user forgets to call save, they get a small but empty HDF5
00009 -- atexit
00010 -- use a destructor: __del__ (prone to circular ref issues)
00011 -- use "with" statement + __exit__ (may affect interface)
00012
00013 > metadata
00014 -- docstrings for default values
00015 -- user interface for adding attributes/info/units (e.g. smld.attribute(a='a is great'))
00016 -- xarray compatibility?
00017
00018 > chunking for performance?
00019
00020
00021 >>> from Silke
00022 - storing extra "attributes" or datasets with stuff like ROI (like summary/config field)
00023 - think about cube problem
00024 - user-controlled mode for event distribution (e.g. based on delay-time)
00025 - always write detectors like phasecav
00026
00027 >>> xarray thoughts from Jason:
00028 - coordinates/attributes for everything, using h5netcdf
00029 - for analyzing the small-data: xarray makes it easy to select/filter,
00030 and will keep track of coordinates for you
00031 - can use pandas for plotting and packages like seaborn (stat plotting)
00032 with these coordinate arrays
00033 - the xarray coordinates will provide the time sorting
00034 - wildcard to merge multiple files
00035 - didn't support variable length data? (maybe can do this)
00036 - treats NaNs correctly
00037 - would merge fast and slow detectors (e.g. eventcode 40 and 42)
00038 - handles dropped data
00039 - not clear that xarray can write pieces while taking data?
00040 - probably supports hierarchy of hdf5 groups in h5netcdf, but might
00041 make it more difficult to cut/merge/sort in xarray
00042 """
00043
00044 import numpy as np
00045 import tables
00046 import collections
00047
00048 from _psana import EventId, Source, Bld
00049 from psana import Detector, DetNames
00050
00051 from mpi4py import MPI
00052 comm = MPI.COMM_WORLD
00053 rank = comm.Get_rank()
00054 size = comm.Get_size()
00055
# Sentinel fill values used to backfill events where a quantity was missing.
MISSING_INT = -99999
MISSING_FLOAT = np.nan

# Scalar types accepted as "numbers". Note: np.int / np.float were plain
# aliases of the python builtins and were removed in numpy >= 1.20, so we
# list only the builtins plus the sized numpy types.
INT_TYPES = [int, np.int8, np.int16, np.int32, np.int64,
             np.uint8, np.uint16, np.uint32, np.uint64, np.uint]
FLOAT_TYPES = [float, np.float16, np.float32, np.float64]
if hasattr(np, 'float128'):  # extended precision is not available on all platforms
    FLOAT_TYPES.append(np.float128)
00062
00063
00064 def _flatten_dictionary(d, parent_key='', sep='/'):
00065 """
00066 http://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
00067 """
00068 items = []
00069 for k, v in d.items():
00070 new_key = parent_key + sep + k if parent_key else k
00071 if isinstance(v, collections.MutableMapping):
00072 items.extend(_flatten_dictionary(v, new_key, sep=sep).items())
00073 else:
00074 items.append((new_key, v))
00075 return dict(items)
00076
00077
def remove_values(the_list, val):
    """Return a copy of `the_list` with every element equal to `val` removed."""
    kept = []
    for item in the_list:
        if item != val:
            kept.append(item)
    return kept
00080
00081
class SynchDict(dict):
    """
    Dictionary tracking the arrays that must be gathered (the "send_list").

    Before every gather, synchronize() merges in any keys that other MPI
    ranks have seen but this rank has not, so all ranks agree on what is
    being sent.
    """

    def synchronize(self):
        # collect every rank's copy, then adopt any keys first seen elsewhere
        for remote_send_list in comm.allgather(self):
            for key in remote_send_list:
                if key not in self.keys():
                    self[key] = remote_send_list[key]

    def keys(self):
        # sorted list (not a view) so every rank iterates in the same order
        return sorted(self)
00099
00100
class SmallData(object):
    """
    MPI-distributed collection of per-event scalar/array data, gathered to
    the master rank and written to an HDF5 file via pytables.

    On master,
      * numbers (int/float) are lists of arrays, representing workers (items
        in the list) and events (items in the array)
      * arrays are lists of lists (!) of arrays, representing workers (items
        in the outer list) and events (items in the inner list) and values
        of the original array (arrays)
    """

    def __init__(self, datasource_parent, filename=None, save_on_gather=False):
        """
        Parameters
        ----------
        datasource_parent : object
            Object providing the current psana event via its `_currevt`
            attribute.

        filename : str
            Path of the HDF5 file to write (opened on the MPI master rank
            only). If `None`, calling save() raises IOError.

        save_on_gather : bool
            Whether to save the results to disk periodically (after the
            results get gathered)
        """

        self._datasource_parent = datasource_parent
        self.save_on_gather = save_on_gather
        self._num_send_list = SynchDict()
        self._arr_send_list = SynchDict()

        self._initialize_default_detectors()

        self._dlist = {}
        if self.master:
            self._dlist_master = {}
            self._newkeys = []

        # always define file_handle: previously it was only assigned when a
        # filename was given on the master, so _save() and close() raised
        # AttributeError instead of the intended IOError / clean no-op
        self.file_handle = None
        if filename and self.master:
            self.file_handle = tables.File(filename, 'w')

        return

    @property
    def master(self):
        # rank 0 gathers all data and owns the HDF5 file
        return (rank == 0)

    @property
    def currevt(self):
        # the event currently being processed by the parent datasource
        return self._datasource_parent._currevt

    @staticmethod
    def _num_or_array(obj):
        """
        Classify `obj` as a scalar number ('num') or an np.ndarray ('array').

        Raises
        ------
        TypeError
            If `obj` is neither a number nor an ndarray.
        """
        data_type = type(obj)
        # np.float was removed in numpy >= 1.20; np.floating is the correct
        # abstract scalar type for issubdtype checks
        if ((data_type in [int, float]) or
             np.issubdtype(data_type, np.integer) or
             np.issubdtype(data_type, np.floating)):
            s = 'num'

        elif data_type is np.ndarray:
            s = 'array'

        else:
            raise TypeError('object is not number or array')

        return s

    def _sort(self, obj, sort_order):
        """
        Return `obj` (an array, or a list of arrays) reordered by
        `sort_order` along dim -1.
        """

        assert type(sort_order) == np.ndarray, 'sort_order must be array'
        t = self._num_or_array(obj[0])

        # note: these comparisons previously used `is`, which only works
        # because CPython interns short strings (SyntaxWarning on py>=3.8)
        if t == 'num':
            res = obj[sort_order]
        elif t == 'array':
            # sort on the order key only: letting sorted() compare the
            # (key, array) tuples would attempt ambiguous ndarray
            # comparisons if two keys were ever equal
            res = [x for (y, x) in sorted(zip(sort_order, obj),
                                          key=lambda pair: pair[0])]

        return res

    def missing(self, key):
        """
        Use the send_list to figure out the correct types and fill
        values for missing data.

        Returns
        -------
        missing_value : int, float, or np.ndarray
            MISSING_INT / MISSING_FLOAT (or an array filled with one of
            them) matching the type registered for `key`.
        """

        if key in self._num_send_list.keys():
            t = self._num_send_list[key]

            if t in INT_TYPES:
                missing_value = MISSING_INT
            elif t in FLOAT_TYPES:
                missing_value = MISSING_FLOAT
            else:
                raise ValueError('%s :: Invalid num type for missing data' % str(t))

        elif key in self._arr_send_list.keys():

            # registered as (python type, shape, dtype)
            t = self._arr_send_list[key][0]
            shape = self._arr_send_list[key][1]
            dtype = self._arr_send_list[key][2]

            missing_value = np.empty(shape, dtype=dtype)

            if dtype in INT_TYPES:
                missing_value.fill(MISSING_INT)
            elif dtype in FLOAT_TYPES:
                missing_value.fill(MISSING_FLOAT)
            else:
                raise ValueError('%s :: Invalid array type for missing data' % str(dtype))

        else:
            raise KeyError('key %s not found in array or number send_list' % key)

        return missing_value

    def _gather(self):
        """
        Gather arrays and numbers from all MPI ranks onto the master,
        backfilling missing events on the clients, then (on master)
        time-sorting the new data and backfilling newly-seen keys.
        """

        # all ranks must agree on the send lists before the collective calls
        self._arr_send_list.synchronize()
        self._num_send_list.synchronize()

        for k in self._arr_send_list.keys():
            if k not in self._dlist.keys(): self._dlist[k] = []
            self._backfill_client(self._nevents, self._dlist[k], k)
            self._gather_arrays(self._dlist[k], k)

        for k in self._num_send_list.keys():
            if k not in self._dlist.keys(): self._dlist[k] = []
            self._backfill_client(self._nevents, self._dlist[k], k)
            self._gather_numbers(self._dlist[k], k)

        self._dlist = {}  # clients forget everything just sent

        if self.master:

            if len(self._dlist_master.keys()) > 0:

                # sort the most recent gather chunk by event time
                sort_map = np.argsort(self._dlist_master['event_time'][-1])

                for k in self._dlist_master.keys():
                    self._dlist_master[k][-1] = [self._dlist_master[k][-1][i] for i in sort_map]

            # keys first seen this gather missed all earlier events: pad
            # them back to the start of the run with missing-data markers
            for k in self._newkeys:
                events_in_mem = sum([len(x) for x in self._dlist_master['fiducials']])
                target_events = self._nevents_on_disk + events_in_mem
                self._dlist_master[k] = self._backfill_master(target_events,
                                                              self._dlist_master[k],
                                                              self.missing(k))
            self._newkeys = []

            if self.save_on_gather:
                self._save()

        return

    def _gather_numbers(self, num_list, key):
        """Gatherv scalar data for `key` from all ranks onto the master."""

        lengths = np.array(comm.gather(len(num_list)))  # per-rank counts
        mysend = np.array(num_list, dtype=self._num_send_list[key])
        myrecv = None

        if self.master:
            myrecv = np.empty((sum(lengths)), mysend.dtype)  # recv buffer

        comm.Gatherv(sendbuf=mysend, recvbuf=[myrecv, lengths])

        if self.master:
            self._dlist_append(self._dlist_master, key, myrecv)

        return

    def _gather_arrays(self, array_list, key):
        """
        Gather arrays from all workers and update the master's dlist
        """

        # gather the shapes so master can reconstruct the flattened data
        worker_shps = comm.gather([ a.shape for a in array_list ])

        # flatten all local arrays into one contiguous send buffer
        if len(array_list) > 0:
            mysend = np.concatenate([ x.reshape(-1) for x in array_list ])
            mysend = np.ascontiguousarray(mysend)
        else:
            mysend = np.array([], dtype=self._arr_send_list[key][2])

        # master sizes the receive buffer from the gathered shapes
        # (np.prod: np.product is a deprecated/removed alias)
        if self.master:
            worker_lens = [[np.prod(shp) for shp in worker] for worker in worker_shps]
            worker_msg_sizes = [np.sum(lens) for lens in worker_lens]
            recv_buff_size = np.sum(worker_msg_sizes)
            myrecv = np.empty(recv_buff_size, mysend.dtype)
            recvbuf = [myrecv, worker_msg_sizes]
        else:
            recvbuf = None

        comm.Gatherv(sendbuf=mysend, recvbuf=recvbuf)

        # master slices the flat buffer back into individual arrays
        if self.master:
            start = 0
            reshaped_recv = []
            for worker in worker_shps:
                for shp in worker:
                    l = np.prod(shp)
                    reshaped_recv.append( myrecv[start:start+l].reshape(*shp) )
                    start += l

            self._dlist_append(self._dlist_master, key, reshaped_recv)

        return

    @property
    def _nevents(self):
        # events seen locally, counted via the always-present timestamp key
        if 'fiducials' in self._dlist:
            return len(self._dlist['fiducials'])
        else:
            return 0

    @property
    def _nevents_on_disk(self):
        # events already flushed to the HDF5 file (0 before the first save)
        try:
            self.file_handle.get_node('/', 'fiducials')
            return len(self.file_handle.root.fiducials)
        except tables.NoSuchNodeError:
            return 0

    def _backfill_client(self, target_events, dlist_element, key):
        # pad the END of a client's per-event list with missing markers so
        # it has one entry per event seen so far
        numfill = target_events - len(dlist_element)
        if numfill > 0:
            fill_value = self.missing(key)
            dlist_element.extend([fill_value]*numfill)
        return

    @staticmethod
    def _backfill_master(target_events, dlist_element, fill_value):
        # pad the BEGINNING of a master list-of-lists with missing markers
        # (a newly-appearing key missed all earlier events)
        numfill = target_events - sum([len(x) for x in dlist_element])
        if numfill > 0:
            dlist_element = [[fill_value]*numfill] + dlist_element
        return dlist_element

    def _dlist_append_client(self, key, value):
        """Record `value` for `key` in this (worker) rank's event list."""

        data_type = type(value)

        if data_type is np.ndarray:
            value = np.atleast_1d(value)  # 0-d arrays would not gather cleanly
            if key not in self._arr_send_list:
                self._arr_send_list[key] = (data_type, value.shape, value.dtype)

        else:
            if key not in self._num_send_list:
                self._num_send_list[key] = data_type

        if key not in self._dlist.keys():
            self._dlist[key] = []

        # pad any events this key missed before appending the new value
        self._backfill_client(self._nevents - 1,
                              self._dlist[key],
                              key)

        if data_type is np.ndarray:
            # copy: the detector interface may reuse the underlying buffer
            self._dlist[key].append(np.copy(value))
        else:
            self._dlist[key].append(value)

        return

    def _dlist_append(self, dlist, key, value):
        # append gathered data on the master, remembering brand-new keys so
        # that _gather() can backfill them
        if key not in dlist.keys():
            dlist[key] = [value]
            self._newkeys.append(key)
        else:
            dlist[key].append(value)

        return

    def event(self, *args, **kwargs):
        """
        Save some data from this event for later use.

        Parameters
        ----------
        *args : dictionaries
            Save HDF5 group hierarchies using nested dictionaries. Each level
            of the dictionary is a level in the HDF5 group hierarchy.

        **kwargs : datasetname, dataset
            Save an arbitrary piece of data from this run. The kwarg will
            give it an (HDF5 dataset) name that appears in the resulting
            HDF5 file.

        Examples
        --------
        >>> # save the status of a laser
        >>> smldata.event(laser_on=laser_on)

        >>> # save "data" at a special location "/base/next_group/data"
        >>> smldata.event({'base': {'next_group' : data}})
        """

        evt_id = self.currevt.get(EventId)
        if evt_id is None: return  # e.g. events that carry no EventId

        if ('event_time' in kwargs.keys()) or ('fiducials' in kwargs.keys()):
            raise KeyError('`event_time` and `fiducials` are special names'
                           ' reserved for timestamping -- choose a different '
                           'name')

        # merge the nested dict(s) and kwargs into one flat
        # {dataset_path : value} map
        event_data_dict = {}
        event_data_dict.update(kwargs)
        for d in args:
            event_data_dict.update( _flatten_dictionary(d) )

        # 64-bit timestamp: seconds in the high bits, nanoseconds low
        time = evt_id.time()[0] << 32 | evt_id.time()[1]
        fid = evt_id.fiducials()

        if ('fiducials' in self._dlist.keys()) and (self._dlist['fiducials'][-1] == fid):
            # repeat call for the same event: only new keys may be added;
            # adding the same key twice in one event is an error
            events_seen = len(self._dlist['fiducials'])
            for k in event_data_dict.keys():
                if len( self._dlist.get(k, []) ) == events_seen:
                    raise RuntimeError("Event data already added for "
                                       "key '%s' this event!" % k)

        else:
            # first call for this event: timestamp it and record defaults
            self._dlist_append_client('event_time', time)
            self._dlist_append_client('fiducials', fid)
            self._event_default()

        for k in event_data_dict.keys():
            self._dlist_append_client(k, event_data_dict[k])

        return

    def _initialize_default_detectors(self):
        """Instantiate the always-recorded machine detectors and all EVRs."""

        self._default_detectors = []

        for detector_name in ['EBeam', 'PhaseCavity', 'FEEGasDetEnergy']:
            try:
                det = Detector(detector_name)
                self._default_detectors.append(det)
            except KeyError:
                # detector not present in this experiment -- skip it
                pass

        # every EVR in the DAQ, plus the sorted union of configured codes
        self._evr_cfgcodes = []
        for n in DetNames():
            if ':Evr.' in n[0]:
                det = Detector(n[0])
                self._default_detectors.append(det)

                cfg = det._fetch_configs()
                assert len(cfg) == 1
                self._evr_cfgcodes += [e.code() for e in cfg[0].eventcodes()]

        self._evr_cfgcodes = sorted(set(self._evr_cfgcodes))

        return

    def _event_default(self):
        """
        Cherry-picked machine parameters we think will be useful
        for basically every experiment
        """

        default = {}

        for det in self._default_detectors:

            if det.name.dev == 'EBeam':
                ebeam_ddl = det.get(self.currevt)
                if ebeam_ddl is not None:
                    default['ebeam/charge'] = ebeam_ddl.ebeamCharge()
                    default['ebeam/dump_charge'] = ebeam_ddl.ebeamDumpCharge()
                    default['ebeam/L3_energy'] = ebeam_ddl.ebeamL3Energy()
                    default['ebeam/photon_energy'] = ebeam_ddl.ebeamPhotonEnergy()
                    default['ebeam/pk_curr_bc2'] = ebeam_ddl.ebeamPkCurrBC2()

            if det.name.dev == 'PhaseCavity':
                pc_ddl = det.get(self.currevt)
                if pc_ddl is not None:
                    default['phase_cav/charge1'] = pc_ddl.charge1()
                    default['phase_cav/charge2'] = pc_ddl.charge2()
                    default['phase_cav/fit_time_1'] = pc_ddl.fitTime1()
                    default['phase_cav/fit_time_2'] = pc_ddl.fitTime2()

            if det.name.dev == 'FEEGasDetEnergy':
                gdet_ddl = det.get(self.currevt)
                if gdet_ddl is not None:
                    default['gas_detector/f_11_ENRC'] = gdet_ddl.f_11_ENRC()
                    default['gas_detector/f_12_ENRC'] = gdet_ddl.f_12_ENRC()
                    default['gas_detector/f_21_ENRC'] = gdet_ddl.f_21_ENRC()
                    default['gas_detector/f_22_ENRC'] = gdet_ddl.f_22_ENRC()
                    default['gas_detector/f_63_ENRC'] = gdet_ddl.f_63_ENRC()
                    default['gas_detector/f_64_ENRC'] = gdet_ddl.f_64_ENRC()

            if det.name.dev == 'Evr':
                evr_codes = det.eventCodes(self.currevt, this_fiducial_only=True)
                if evr_codes is not None:
                    # one 0/1 flag per configured code for this event
                    for c in self._evr_cfgcodes:
                        if c in evr_codes:
                            default['evr/code_%d' % c] = 1
                        else:
                            default['evr/code_%d' % c] = 0

        self.event(default)

        return

    def sum(self, value):
        """MPI-reduce `value` (number or array) by summation; result on master."""
        return self._mpi_reduce(value, MPI.SUM)

    def max(self, value):
        """MPI-reduce `value` (number or array) by maximum; result on master."""
        return self._mpi_reduce(value, MPI.MAX)

    def min(self, value):
        """MPI-reduce `value` (number or array) by minimum; result on master."""
        return self._mpi_reduce(value, MPI.MIN)

    def _mpi_reduce(self, value, function):
        # the reduced result `s` is only meaningful on the master rank
        t = self._num_or_array(value)
        if t == 'num':
            s = comm.reduce(value, function)
        elif t == 'array':
            s = np.empty_like(value)
            comm.Reduce(value, s, function)
        return s

    def _get_node(self, k):
        """
        Retrieve or create (if necessary) the pytables node
        for a specific key.
        """

        try:
            node = self.file_handle.get_node('/'+k)

        except tables.NoSuchNodeError:
            # infer atom and shape from the first gathered example; the
            # earray grows along axis 0 (one entry per event)
            ex = self._dlist_master[k][0]
            if self._num_or_array(ex) == 'array':
                a = tables.Atom.from_dtype(ex.dtype)
                shp = tuple([0] + list(ex.shape))
            elif self._num_or_array(ex) == 'num':
                a = tables.Atom.from_dtype(np.array(ex).dtype)
                shp = (0,)

            path, _, name = k.rpartition('/')
            node = self.file_handle.create_earray(where='/'+path, name=name,
                                                  shape=shp, atom=a,
                                                  createparents=True)

        return node

    def save(self, *args, **kwargs):
        """
        Save registered data to an HDF5 file.

        There are 3 behaviors of the arguments to this function:

            1. Decide what 'event data' (declared by SmallData.event())
               should be saved
            2. Add summary (ie. any non-event) data using key-value
               pairs (similar to SmallData.event())
            3. Add summary (ie. any non-event) data organized in a
               hierarchy using nested dictionaries (also similar to
               SmallData.event())

        These data are then saved to the file specified in the SmallData
        constructor.

        Parameters
        ----------
        *args : strings
            A list of the event data names to save, if you want to save a
            subset. Otherwise save all data to disk. For example, imagine
            you had called SmallData.event(a=..., b=...). Then smldata.save('b')
            would not save data labelled 'a'. smldata.save() would save both
            'a' and 'b'.

        *args : dictionaries
            In direct analogy to the SmallData.event call, you can also pass
            HDF5 group hierarchies using nested dictionaries. Each level
            of the dictionary is a level in the HDF5 group hierarchy.

        **kwargs : datasetname, dataset
            Similar to SmallData.event, it is possible to save arbitrary
            singleton data (e.g. at the end of a run, to save an average over
            some quantity).

        event_data : bool
            Special kwarg. If `False`, do not save the event data, just
            the run summary data. If `True` (default), do.

        Examples
        --------
        >>> # save all event data
        >>> smldata.save()

        >>> # save all event data AND "array_containing_sum"
        >>> smldata.save(cspad_sum=array_containing_sum)

        >>> # save 'b' event data, and no other
        >>> smldata.save('b')

        >>> # save all event data AND "/base/next_group/data"
        >>> smldata.save({'base': {'next_group' : data}})
        """

        self._gather()

        if self.master:
            self._save(*args, **kwargs)

        return

    def _save(self, *args, **kwargs):
        """Master-side implementation of save(); see save() for arguments."""

        # pop the `event_data` control kwarg FIRST: previously it was popped
        # only after kwargs were copied into dict_to_save, so a passed
        # event_data=... flag was incorrectly written to the file as a
        # summary dataset
        save_event_data = kwargs.pop('event_data', True)

        evt_variables_to_save = [s for s in args if type(s) == str]
        dictionaries_to_save = [d for d in args if type(d) == dict]

        # summary (non-event) data from kwargs + nested dictionaries
        dict_to_save = {}
        dict_to_save.update(kwargs)
        for d in dictionaries_to_save:
            dict_to_save.update(_flatten_dictionary(d))

        if self.file_handle is None:
            raise IOError('no filename specified in SmallData constructor')

        if save_event_data:

            # if specific keys were requested, save only those (always
            # including the timestamps); otherwise save everything
            if len(evt_variables_to_save) > 0:
                keys_to_save = ['event_time', 'fiducials']
                for k in evt_variables_to_save:
                    if k in self._dlist_master.keys():
                        keys_to_save.append(k)
                    else:
                        print('Warning: event data key %s has no '
                              'associated event data and will not '
                              'be saved' % k)
            else:
                # list(): we assign into the dict while iterating below,
                # which a live keys() view would not allow in python 3
                keys_to_save = list(self._dlist_master.keys())

            for k in keys_to_save:
                if len(self._dlist_master[k]) > 0:
                    # flatten the per-gather lists into one array and
                    # append it to the extendable on-disk node
                    self._dlist_master[k] = remove_values(self._dlist_master[k], [])
                    self._dlist_master[k] = np.concatenate(self._dlist_master[k])

                    node = self._get_node(k)
                    node.append( self._dlist_master[k] )
                    self._dlist_master[k] = []
                else:
                    pass  # nothing new gathered for this key since last save

        # summary data: one fixed-size carray per entry
        for k, v in dict_to_save.items():

            if type(v) is not np.ndarray:
                v = np.array([v])  # carray needs at least 1-d data

            path, _, name = k.rpartition('/')
            node = self.file_handle.create_carray(where='/'+path, name=name,
                                                  obj=v,
                                                  createparents=True)

        return

    def close(self):
        """
        Close the HDF5 file used for writing.
        """
        # no file is open on worker ranks, or when no filename was given
        if self.master and (self.file_handle is not None):
            self.file_handle.close()
        return
00790