Source code for lightpath.path

"""
The :class:`.BeamPath` is the main abstraction for the lightpath module,
grouping together a set of devices using the :class:`.LightInterface` and
representing the path between them as single object. While the manipulation of
each of these object should be done at the device level, the
:meth:`.BeamPath.clear` does provide a powerful tool to quickly change the
status of the path.

The :class:`.BeamPath` object is also not meant to be a rigid representation,
:meth:`.BeamPath.split` and :meth:`.BeamPath.join` both allow for slicing and
combining of different areas of the LCLS beamline. However, keep in mind that
the path only knows the state of the devices it contains, so certain methods
might not return an accurate representation of reality if an upstream device is
affecting the beam.
"""
####################
# Standard Library #
####################
import math
import logging
from collections import Iterable

####################
#    Third Party   #
####################
from prettytable    import PrettyTable
from ophyd.ophydobj import OphydObject
from ophyd.status   import wait as status_wait

####################
#     Package      #
####################
from .errors import CoordinateError


logger = logging.getLogger(__name__)

[docs]class BeamPath(OphydObject): """ Represents a straight line of devices along the beamline The devices given must be a continuous set all along the same beamline, or, multiple beamlines with appropriate reflecting devices in between. Parameters ---------- devices : :class:`.LightDevice` Arguments are interpreted as LightDevices along a common beamline. name = str, optional Name of the BeamPath Raises ------ TypeError: If a non-LightDevice object is supplied CoordinateError: If a coordinate is not properly specified PathError: If multiple beamlines are present, with no reflecting device Attributes ---------- minimum_transmission : float Minimum amount of transmission considered for beam presence """ #Subscription Information SUB_PTH_CHNG = 'beampath_changed' _default_sub = SUB_PTH_CHNG #Transmission setting minimum_transmission = 0.1 def __init__(self, *devices, name=None): super().__init__(name=name) self.devices = devices self._has_subscribed = False logger.debug("Configuring path %s with %s devices", name, len(self.devices)) #Sort by position downstream to upstream try: #Check types and positions for dev in self.path: #Ensure positioning is physical if math.isnan(dev.md.z) or dev.md.z < 0.: raise CoordinateError('Device %r is reporting a ' 'non-existant beamline position, ' 'its coordinate was not properly ' 'initialized', dev) #Add as attribute setattr(self, dev.name.replace(' ','_'), dev) except AttributeError as e: raise TypeError('One of the devices does not meet the ' 'neccesary lightpath interface. Missing ' 'attribute {}'.format(e)) @property def branches(self): """ Branching devices along the path """ return [d for d in self.devices if getattr(d, 'branches', False)] @property def range(self): """ Starting z position of beamline """ return self.path[0].md.z, self.path[-1].md.z @property def path(self): """ List of devices ordered by coordinates """ return sorted(self.devices, key=lambda dev : dev.md.z) @property def blocking_devices(self): """ A list of devices that are currently inserted or are in unknown positions. This includes devices downstream of the first :attr:`.impediment` """ #Cache important prior devices prior = None last_branches = list() block = list() for device in self.path: #If we have switched beamlines if prior and device.md.beamline != prior.md.beamline: #Find improperly configured optics for optic in last_branches: if device.md.beamline not in optic.destination: block.append(optic) #Clear optics that have been evaluated last_branches.clear() #If our last device was an optic, make sure it wasn't required #to continue along this beampath elif (prior in last_branches and device.md.beamline in prior.branches and device.md.beamline not in prior.destination): block.append(last_branches.pop(-1)) #Find branching devices and store #They will be marked as blocking by downstream devices try: if device in self.branches: last_branches.append(device) #Find inserted devices elif device.inserted and (device.transmission < self.minimum_transmission): block.append(device) #Find unknown devices elif not device.removed and not device.inserted: block.append(device) except Exception as exc: logger.error('Unable to determine state of %s', device.name) logger.error(exc) block.append(device) finally: #Stache our prior device prior = device return block @property def incident_devices(self): """ A list of devices the beam is currently incident on. This includes the current :attr:`.impediment` and any upstream devices that may be inserted but have more transmission than :attr:`.minimum_transmission` """ #Find device information inserted = [d for d in self.path if d.inserted] impediment = self.impediment #No blocking devices, all inserted devices incident if not impediment: return inserted #Otherwise only return upstream of the impediment return [d for d in inserted if d.md.z <= impediment.md.z]
[docs] def show_devices(self, file=None): """ Print a table of the devices along the beamline Parameters ---------- file : file-like object File to writable """ #Initialize Table pt = PrettyTable(['Name', 'Prefix', 'Position', 'Beamline', 'Removed']) #Adjust Table settings pt.align = 'r' pt.align['Name'] = 'l' pt.align['Prefix'] = 'l' pt.float_format = '8.5' #Add info for d in self.path: pt.add_row([d.name, d.prefix, d.md.z, d.md.beamline, str(d.removed)]) #Show table print(pt, file=file)
@property def impediment(self): """ First blocking device along the path """ #Find device information blocks = self.blocking_devices if not blocks: return None else: return blocks[0] @property def cleared(self): """ Whether beamline is clear of any devices that are below the :attr:`.minimum_transmission` """ return not any(self.blocking_devices)
[docs] def clear(self, wait=False, timeout=None, passive=False, ignore=None): """ Clear the beampath of all obstructions Parameters ---------- wait : bool Wait for all devices to complete their motion timeout : float, optional Duration to wait for device movements ignore: device or iterable, optional Leave devices in their current state without removing them passive : bool, optional If False, devices that are inserted but don't attenuate the beam below :attr:`.minimum_threshold` are ignored Returns ------- statuses : Returns list of status objects returned by :meth:`.LightInterface.remove` """ logger.info('Clearing beampath %s ...', self) #Assemble device list target_devices, ignored = self._ignore(ignore, passive=passive) #Remove devices logger.info('Removing devices along the beampath ...') status = [device.remove(timeout=timeout) for device in target_devices if not device.removed and hasattr(device, 'remove')] #Wait parameters if wait: logger.info('Waiting for all devices to be '\ 'removed from the beampath %s ...', self) #Wait consecutively for statuses, this can be done by combining #statuses in the future for s in status: logger.debug('Waiting for %s to be done ...', s) status_wait(s, timeout=timeout) logger.info('Completed') return status
[docs] def join(self, *beampaths): """ Join multiple beampaths with the current one Parameters ---------- beampaths : arguments A list of beampaths to join into a complete path, order is irrelavant Returns ------- BeamPath : :class:`.BeamPath` A new object with all of the path devices Raises ------ TypeError: Raised if a non-BeamPath object is supplied """ return BeamPath.from_join(self, *beampaths, name=self.name)
[docs] def split(self, z=None, device=None): """ Split the beampath producing two new BeamPath objects either by a specific position or a devices location Parameters ---------- z : float Z position to split the paths device : LightDevice, name, or base PV The specified device will be the last device in the first :class:`.BeamPath` object Returns ------- BeamPath, BeamPath Two new beampath instances """ #Not enough information if not z and not device: raise ValueError("Must supply information where to split the path") #Grab the z if given a device if device: z = device.md.z #Look within range if z<self.range[0] or z>self.range[1]: raise ValueError("Split position {} is not within the range of " "the path.".format(z)) #Split the paths return (BeamPath(*[d for d in self.devices if d.md.z <= z]), BeamPath(*[d for d in self.devices if d.md.z > z]) )
[docs] @classmethod def from_join(cls, *beampaths, name=None): """ Join other beampaths with the current one Parameters ---------- beampaths : arguments A list of beampaths to join into a complete path, order is irrelavant name : str, optional New name for created beampath Returns ------- BeamPath : :class:`.BeamPath` A new object with all of the path devices Raises ------ TypeError: Raised if a non-BeamPath object is supplied """ #Catch invalid paths if not all(isinstance(bp,BeamPath) for bp in beampaths): raise TypeError('Can not join non-BeamPath object') #Flatten path lists devices = [device for path in beampaths for device in path.devices] #Create a new instance return BeamPath(*set(devices), name=name)
def _ignore(self, ignore_devices, passive=False): """ Assemble list of available devices with some exclusions Parameters ---------- ignore_devices : list Devices to ignore passive : bool If False, ignore passive devices Returns ------- (target, ignore) : tuple Tuple of two lists of devices """ ignore = list() #Add passive devices to ignored if not passive: logger.debug("Passive devices will be ignored ...") ignore.extend([d for d in self.devices if d.transmission > self.minimum_transmission]) #Add ignored devices if isinstance(ignore_devices, Iterable): ignore.extend(ignore_devices) elif ignore_devices: ignore.append(ignore_devices) #Grab target devices target_devices = [device for device in self.devices if device not in ignore] logger.debug("Targeting devices %s ...", target_devices) logger.debug('Ignoring devices %s ...', ignore) return target_devices, ignore def _device_moved(self, *args, obj=None, **kwargs): """ Run when a device changes state """ #Determine whether our path has been changed block = self.impediment if block: block = block.md.z else: block = math.inf #If device is upstream of impediment if obj and obj.md.z <= block: self._run_subs(sub_type = self.SUB_PTH_CHNG, device = obj)
[docs] def subscribe(self, cb, event_type=None, run=True): """ Subscribe to changes of the valve Parameters ---------- cb : callable Callback to be run event_type : str, optional Type of event to run callback on run : bool, optional Run the callback immediatelly """ if not self._has_subscribed: #Subscribe to all child devices for dev in self.devices: #Add callback here! try: dev.subscribe(self._device_moved, event_type=dev.SUB_STATE, run=False) except: logger.error("BeamPath is unable to subscribe to device %s", dev.name) self._has_subscribed = True super().subscribe(cb, event_type=event_type, run=run)
def _repr_info(self): yield('range', self.range) yield('devices', len(self.devices)) __hash = object.__hash__ def __eq__(self, *args, **kwargs): try: return self.devices == args[0].devices except AttributeError: return super().__eq__(*args, **kwargs)