#!/usr/bin/env python
##############################################################################
##
## This file is part of Sardana
##
## http://www.tango-controls.org/static/sardana/latest/doc/html/index.html
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
## Sardana is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Sardana is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool libray. It defines the class for an
acquisition"""
__all__ = [ "AcquisitionState", "AcquisitionMap", "PoolCTAcquisition",
"Pool0DAcquisition", "Channel", "PoolIORAcquisition" ]
__docformat__ = 'restructuredtext'
import time
from taurus.core.util import Enumeration, DebugIt
from sardana import State, ElementType, TYPE_TIMERABLE_ELEMENTS
from sardana.sardanathreadpool import get_thread_pool
from poolaction import ActionContext, PoolActionItem, PoolAction
#: enumeration representing possible motion states
AcquisitionState = Enumeration("AcquisitionState", ( \
"Stopped",
# "StoppedOnError",
# "StoppedOnAbort",
"Acquiring",
"Invalid") )
AS = AcquisitionState
AcquiringStates = AS.Acquiring,
StoppedStates = AS.Stopped, #MS.StoppedOnError, MS.StoppedOnAbort
AcquisitionMap = {
#AS.Stopped : State.On,
AS.Acquiring : State.Moving,
AS.Invalid : State.Invalid,
}
class PoolAcquisition(PoolAction):
def __init__(self, main_element, name="Acquisition"):
PoolAction.__init__(self, main_element, name)
ctname = name + ".CTAcquisition"
zerodname = name + ".0DAcquisition"
self._0d_acq = zd_acq = Pool0DAcquisition(main_element, name=zerodname)
self._ct_acq = PoolCTAcquisition(main_element, name=ctname, slaves=(zd_acq,))
def run(self, *args, **kwargs):
n = kwargs.get('multiple', 1)
if n == 1:
return self._run_single(*args, **kwargs)
return self._run_multiple(*args, **kwargs)
def _run_multiple(self, *args, **kwargs):
n = kwargs['multiple']
synch = kwargs.get("synch", False)
if synch:
for _ in range(n):
self._run_single(self, *args, **kwargs)
else:
kwargs["synch"] = True
get_thread_pool().add(self._run_multiple, None, *args, **kwargs)
def _run_single(self, *args, **kwargs):
"""Runs this action"""
synch = kwargs.get("synch", False)
ct_acq = self._ct_acq
zd_acq = self._0d_acq
if synch:
ct_acq.run(*args, **kwargs)
else:
ct_acq.run(*args, **kwargs)
zd_acq.run(*args, **kwargs)
def _get_acq_for_element(self, element):
elem_type = element.get_type()
if elem_type in TYPE_TIMERABLE_ELEMENTS:
return self._ct_acq
elif elem_type == ElementType.ZeroDExpChannel:
return self._0d_acq
def clear_elements(self):
"""Clears all elements from this action"""
def add_element(self, element):
"""Adds a new element to this action.
:param element: the new element to be added
:type element: sardana.pool.poolelement.PoolElement"""
return self._get_acq_for_element(element).add_element(element)
def remove_element(self, element):
"""Removes an element from this action. If the element is not part of
this action, a ValueError is raised.
:param element: the new element to be removed
:type element: sardana.pool.poolelement.PoolElement
:raises: ValueError"""
return self._get_acq_for_element(element).add_element(element)
def get_elements(self, copy_of=False):
"""Returns a sequence of all elements involved in this action.
:param copy_of: If False (default) the internal container of elements is
returned. If True, a copy of the internal container is
returned instead
:type copy_of: bool
:return: a sequence of all elements involved in this action.
:rtype: seq<sardana.pool.poolelement.PoolElement>"""
return self._ct_acq.get_elements() + self._0d_acq.get_elements()
def get_pool_controller_list(self):
"""Returns a list of all controller elements involved in this action.
:return: a list of all controller elements involved in this action.
:rtype: list<sardana.pool.poolelement.PoolController>"""
return self._pool_ctrl_list
def get_pool_controllers(self):
"""Returns a dict of all controller elements involved in this action.
:return: a dict of all controller elements involved in this action.
:rtype: dict<sardana.pool.poolelement.PoolController, seq<sardana.pool.poolelement.PoolElement>>"""
ret = {}
ret.update(self._ct_acq.get_pool_controllers())
ret.update(self._0d_acq.get_pool_controllers())
return ret
def read_value(self, ret=None, serial=False):
"""Reads value information of all elements involved in this action
:param ret: output map parameter that should be filled with value
information. If None is given (default), a new map is
created an returned
:type ret: dict
:param serial: If False (default) perform controller HW value requests
in parallel. If True, access is serialized.
:type serial: bool
:return: a map containing value information per element
:rtype: dict<:class:~`sardana.pool.poolelement.PoolElement`,
:class:~`sardana.sardanavalue.SardanaValue`>"""
ret = self._ct_acq.read_value(ret=ret, serial=serial)
ret.update(self._0d_acq.read_value(ret=ret, serial=serial))
return ret
class Channel(PoolActionItem):
def __init__(self, acquirable, info=None):
PoolActionItem.__init__(self, acquirable)
if info:
self.__dict__.update(info)
def __getattr__(self, name):
return getattr(self.element, name)
[docs]class PoolCTAcquisition(PoolAction):
def __init__(self, main_element, name="CTAcquisition", slaves=None):
self._channels = None
if slaves is None:
slaves = ()
self._slaves = slaves
PoolAction.__init__(self, main_element, name)
[docs] def get_read_value_loop_ctrls(self):
return self._pool_ctrl_dict_loop
[docs] def start_action(self, *args, **kwargs):
"""Prepares everything for acquisition and starts it.
:param: config"""
pool = self.pool
# prepare data structures
self._aborted = False
self._stopped = False
self._acq_sleep_time = kwargs.pop("acq_sleep_time",
pool.acq_loop_sleep_time)
self._nb_states_per_value = \
kwargs.pop("nb_states_per_value",
pool.acq_loop_states_per_value)
self._integ_time = integ_time = kwargs.get("integ_time")
self._mon_count = mon_count = kwargs.get("monitor_count")
if integ_time is None and mon_count is None:
raise Exception("must give integration time or monitor counts")
if integ_time is not None and mon_count is not None:
raise Exception("must give either integration time or monitor counts (not both)")
_ = kwargs.get("items", self.get_elements())
cfg = kwargs['config']
# determine which is the controller which holds the master channel
if integ_time is not None:
master_key = 'timer'
master_value = integ_time
if mon_count is not None:
master_key = 'monitor'
master_value = - mon_count
master = cfg[master_key]
master_ctrl = master.controller
pool_ctrls_dict = dict(cfg['controllers'])
pool_ctrls_dict.pop('__tango__', None)
pool_ctrls = []
self._pool_ctrl_dict_loop = _pool_ctrl_dict_loop = {}
for ctrl, v in pool_ctrls_dict.items():
if ctrl.is_timerable():
pool_ctrls.append(ctrl)
if ElementType.CTExpChannel in ctrl.get_ctrl_types():
_pool_ctrl_dict_loop[ctrl] = v
# make sure the controller which has the master channel is the last to
# be called
pool_ctrls.remove(master_ctrl)
pool_ctrls.append(master_ctrl)
# Determine which channels are active
self._channels = channels = {}
for pool_ctrl in pool_ctrls:
ctrl = pool_ctrl.ctrl
pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
main_unit_data = pool_ctrl_data['units']['0']
elements = main_unit_data['channels']
for element, element_info in elements.items():
axis = element.axis
channel = Channel(element, info=element_info)
channels[element] = channel
#for channel in channels:
# channel.prepare_to_acquire(self)
with ActionContext(self):
# PreLoadAll, PreLoadOne, LoadOne and LoadAll
for pool_ctrl in pool_ctrls:
ctrl = pool_ctrl.ctrl
pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
main_unit_data = pool_ctrl_data['units']['0']
ctrl.PreLoadAll()
master = main_unit_data[master_key]
axis = master.axis
res = ctrl.PreLoadOne(axis, master_value)
if not res:
raise Exception("%s.PreLoadOne(%d) returns False" % (pool_ctrl.name, axis,))
ctrl.LoadOne(axis, master_value)
ctrl.LoadAll()
# PreStartAll on all controllers
for pool_ctrl in pool_ctrls:
pool_ctrl.ctrl.PreStartAll()
# PreStartOne & StartOne on all elements
for pool_ctrl in pool_ctrls:
ctrl = pool_ctrl.ctrl
pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
main_unit_data = pool_ctrl_data['units']['0']
elements = main_unit_data['channels']
for element in elements:
axis = element.axis
channel = channels[element]
if channel.enabled:
ret = ctrl.PreStartOne(axis, master_value)
if not ret:
raise Exception("%s.PreStartOne(%d) returns False" \
% (pool_ctrl.name, axis))
ctrl.StartOne(axis, master_value)
# set the state of all elements to and inform their listeners
for channel in channels:
channel.set_state(State.Moving, propagate=2)
# StartAll on all controllers
for pool_ctrl in pool_ctrls:
pool_ctrl.ctrl.StartAll()
[docs] def in_acquisition(self, states):
"""Determines if we are in acquisition or if the acquisition has ended
based on the current unit trigger modes and states returned by the
controller(s)
:param states: a map containing state information as returned by
read_state_info
:type states: dict<PoolElement, State>
:return: returns True if in acquisition or False otherwise
:rtype: bool"""
for elem in states:
s = states[elem][0][0]
if self._is_in_action(s):
return True
[docs] def action_loop(self):
i = 0
states, values = {}, {}
for element in self._channels:
states[element] = None
#values[element] = None
nap = self._acq_sleep_time
nb_states_per_value = self._nb_states_per_value
# read values to send a first event when starting to acquire
with ActionContext(self):
self.raw_read_value_loop(ret=values)
for acquirable, value in values.items():
acquirable.put_value(value, propagate=2)
while True:
self.read_state_info(ret=states)
if not self.in_acquisition(states):
break
# read value every n times
if not i % nb_states_per_value:
self.read_value_loop(ret=values)
for acquirable, value in values.items():
acquirable.put_value(value)
time.sleep(nap)
i += 1
for slave in self._slaves:
try:
slave.stop_action()
except:
self.warning("Unable to stop slave acquisition %s",
slave.getLogName())
self.debug("Details", exc_info=1)
with ActionContext(self):
self.raw_read_state_info(ret=states)
self.raw_read_value_loop(ret=values)
for acquirable, state_info in states.items():
# first update the element state so that value calculation
# that is done after takes the updated state into account
acquirable.set_state_info(state_info, propagate=0)
if acquirable in values:
value = values[acquirable]
acquirable.put_value(value, propagate=2)
with acquirable:
acquirable.clear_operation()
state_info = acquirable._from_ctrl_state_info(state_info)
acquirable.set_state_info(state_info, propagate=2)
class Pool0DAcquisition(PoolAction):
def __init__(self, main_element, name="0DAcquisition"):
self._channels = None
PoolAction.__init__(self, main_element, name)
def start_action(self, *args, **kwargs):
"""Prepares everything for acquisition and starts it.
:param: config"""
pool = self.pool
# prepare data structures
self._aborted = False
self._stopped = False
self._acq_sleep_time = kwargs.pop("acq_sleep_time",
pool.acq_loop_sleep_time)
self._nb_states_per_value = \
kwargs.pop("nb_states_per_value",
pool.acq_loop_states_per_value)
integ_time = kwargs.get("integ_time")
mon_count = kwargs.get("monitor_count")
if integ_time is None and mon_count is None:
raise Exception("must give integration time or monitor counts")
if integ_time is not None and mon_count is not None:
raise Exception("must give either integration time or monitor counts (not both)")
items = kwargs.get("items")
if items is None:
items = self.get_elements()
cfg = kwargs['config']
pool_ctrls_dict = dict(cfg['controllers'])
pool_ctrls_dict.pop('__tango__', None)
pool_ctrls = []
for ctrl in pool_ctrls_dict:
if ElementType.ZeroDExpChannel in ctrl.get_ctrl_types():
pool_ctrls.append(ctrl)
# Determine which channels are active
self._channels = channels = {}
for pool_ctrl in pool_ctrls:
ctrl = pool_ctrl.ctrl
pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
main_unit_data = pool_ctrl_data['units']['0']
elements = main_unit_data['channels']
for element, element_info in elements.items():
channel = Channel(element, info=element_info)
channels[element] = channel
with ActionContext(self):
# set the state of all elements to and inform their listeners
for channel in channels:
channel.clear_buffer()
channel.set_state(State.Moving, propagate=2)
def in_acquisition(self, states):
"""Determines if we are in acquisition or if the acquisition has ended
based on the current unit trigger modes and states returned by the
controller(s)
:param states: a map containing state information as returned by
read_state_info
:type states: dict<PoolElement, State>
:return: returns True if in acquisition or False otherwise
:rtype: bool"""
for state in states:
s = states[state][0]
if self._is_in_action(s):
return True
def action_loop(self):
i = 0
states, values = {}, {}
for element in self._channels:
states[element] = None
values[element] = None
nap = self._acq_sleep_time
while not (self._stopped or self._aborted):
self.read_value(ret=values)
for acquirable, value in values.items():
acquirable.put_value(value)
i += 1
time.sleep(nap)
with ActionContext(self):
self.raw_read_state_info(ret=states)
for acquirable, state_info in states.items():
# first update the element state so that value calculation
# that is done after takes the updated state into account
state_info = acquirable._from_ctrl_state_info(state_info)
acquirable.set_state_info(state_info, propagate=0)
with acquirable:
acquirable.clear_operation()
acquirable.set_state_info(state_info, propagate=2)
def stop_action(self, *args, **kwargs):
"""Stop procedure for this action."""
self._stopped = True
def abort_action(self, *args, **kwargs):
"""Aborts procedure for this action"""
self._aborted = True
class PoolIORAcquisition(PoolAction):
def __init__(self, pool, name="IORAcquisition"):
self._channels = None
PoolAction.__init__(self, pool, name)
def start_action(self, *args, **kwargs):
pass
def in_acquisition(self, states):
return True
pass
@DebugIt()
def action_loop(self):
i = 0
states, values = {}, {}
for element in self._channels:
states[element] = None
values[element] = None
# read values to send a first event when starting to acquire
self.read_value(ret=values)
for acquirable, value in values.items():
acquirable.put_value(value, propagate=2)
while True:
self.read_state_info(ret=states)
if not self.in_acquisition(states):
break
# read value every n times
if not i % 5:
self.read_value(ret=values)
for acquirable, value in values.items():
acquirable.put_value(value)
i += 1
time.sleep(0.01)
self.read_state_info(ret=states)
# first update the element state so that value calculation
# that is done after takes the updated state into account
for acquirable, state_info in states.items():
acquirable.set_state_info(state_info, propagate=0)
# Do NOT send events before we exit the OperationContext, otherwise
# we may be asked to start another action before we leave the context
# of the current action. Instead, send the events in the finish hook
# which is executed outside the OperationContext
def finish_hook(*args, **kwargs):
# read values and propagate the change to all listeners
self.read_value(ret=values)
for acquirable, value in values.items():
acquirable.put_value(value, propagate=2)
# finally set the state and propagate to all listeners
for acquirable, state_info in states.items():
acquirable.set_state_info(state_info, propagate=2)
self.set_finish_hook(finish_hook)