diff --git a/src/dt4acc/core/accelerators/element_proxies.py b/src/dt4acc/core/accelerators/element_proxies.py index 3d5972f..b734a85 100644 --- a/src/dt4acc/core/accelerators/element_proxies.py +++ b/src/dt4acc/core/accelerators/element_proxies.py @@ -41,8 +41,10 @@ def estimate_shift(element, eps=1e-8): return shift -def manipulate_kick(kick_angles: Tuple[float, float], kick_x = None, kick_y = None) -> Tuple[float, float]: - kick_angles = kick_angles.copy() +def manipulate_kick( + kick_angles: Tuple[float, float], kick_x=None, kick_y=None +) -> Tuple[float, float]: + kick_angles = kick_angles.copy() if kick_x is not None: kick_angles[0] = kick_x if kick_y is not None: @@ -90,7 +92,7 @@ async def update_shift(self, *, dx=None, dy=None): """ assert dx is not None or dy is not None, "Either dx or dy must be provided" - element, = self._obj + (element,) = self._obj shift = estimate_shift(element) dx = dx if dx is not None else shift[0] @@ -100,7 +102,7 @@ async def update_shift(self, *, dx=None, dy=None): shift_elem(element, dx, dy) # look what really happened - element, = self._obj + (element,) = self._obj dxr, _, dyr, _, _, _ = estimate_shift(element) pass @@ -119,7 +121,7 @@ async def update(self, property_id: str, value, element_data): if value is not None: assert np.isfinite(value), "Value must be finite" - element, = self._obj + (element,) = self._obj method_name = f"set_{property_id}" if method_name == "set_x": @@ -129,12 +131,19 @@ async def update(self, property_id: str, value, element_data): elif method_name == "set_roll": await self.update_roll(roll=value) elif method_name == "set_im": - val = value * element_data.hw2phys - element_type = str(element).split('\n')[0] - if 'Sextupole' in element_type: - element.update(H=val) - elif 'Quadrupole' in element_type: - element.update(K=val) + raise AssertionError("should not end up here") + # val = value * element_data.hw2phys + # element_type = str(element).split('\n')[0] + elif method_name == "set_main_strength": + element_type = element.__class__.__name__ + if "Sextupole" in element_type: + element.update(H=value) + elif "Quadrupole" in element_type: + element.update(K=value) + else: + raise NotImplementedError( + f"Don't know how to set main strength for element {element_type}" + ) elif method_name == "set_freq": element.update(Frequency=value * 1000) elif method_name in ["set_rdbk", "set_K"]: @@ -144,8 +153,9 @@ async def update(self, property_id: str, value, element_data): elif method_name == "set_y_kick": element.update(KickAngle=manipulate_kick(element.KickAngle, kick_y=value)) elif method_name == "set_frequency": - raise AssertionError("Cavity control not yet declared as functional, have a look to the line below") + # should be ok for AT element.update(Frequency=value) + # raise AssertionError("Cavity control not yet declared as functional, have a look to the line below") else: method = getattr(element, method_name) await method(value) @@ -153,15 +163,47 @@ async def update(self, property_id: str, value, element_data): await self.on_update_finished.trigger(None) def peek(self, property_id: str) -> float: - element, = self._obj + if property_id in ["K", "H", "main_strength"]: + return self.peek_main_strength(property_id) + elif property_id in ["x_kick", "y_kick"]: + return self.peek_kick(property_id) + elif property_id in ["frequency"]: + return self.peek_frequency() + else: + raise NotImplementedError( + f"handling property {property_id} not (yet) implemented" + ) + + def peek_frequency(self): + (element,) = self._obj + return element.Frequency + + def peek_main_strength(self, property_id: str): + (element,) = self._obj element_type = element.__class__.__name__ - assert property_id == "main_strength" if element_type == "Quadrupole": + assert property_id in ["K", "main_strength"] return element.K elif element_type == "Sextupole": + if property_id not in ["H", "main_strength"]: + raise AssertionError( + f"Not handling {property_id} for element {element_type}" + ) return element.H else: - raise NotImplementedError + raise NotImplementedError( + f"main strength not implemented for element {element_type}" + ) + + def peek_kick(self, property_id: str): + (element,) = self._obj + lut = dict(x_kick=0, y_kick=1) + try: + idx = lut[property_id] + except KeyError as ke: + raise AssertionError(f"Did not expect kick {property_id}") + return element.KickAngle[idx] + class AddOnElementProxy(ElementProxy): """ @@ -191,7 +233,10 @@ class KickAngleCorrectorProxy(AddOnElementProxy): """ def __init__(self, obj, *, correction_plane, **kwargs): - assert correction_plane in ["horizontal", "vertical"], "Invalid correction plane" + assert correction_plane in [ + "horizontal", + "vertical", + ], "Invalid correction plane" self.correction_planes = correction_plane super().__init__(*obj, **kwargs) @@ -206,12 +251,14 @@ async def update_kick(self, *, kick_x=None, kick_y=None, element_data): Todo: review if this code is still neede """ - element, = self._obj + (element,) = self._obj if kick_x is not None: kick_x = kick_x * element_data.hw2phys if kick_y is not None: kick_y = kick_y * element_data.hw2phys - element.update(KickAngle=manipulate_kick(self._obj.KickAngle, kick_x=kick_x, kick_y=kick_y)) + element.update( + KickAngle=manipulate_kick(self._obj.KickAngle, kick_x=kick_x, kick_y=kick_y) + ) async def update(self, property_id: str, value, element_data): """ diff --git a/src/dt4acc/core/command.py b/src/dt4acc/core/command.py index 5b97a43..fad263e 100644 --- a/src/dt4acc/core/command.py +++ b/src/dt4acc/core/command.py @@ -1,8 +1,18 @@ +from typing import Sequence + from bact_twin_architecture.data_model.command import Command, BehaviourOnError +from bact_twin_architecture.data_model.identifiers import ( + LatticeElementPropertyID, + DevicePropertyID, + ConversionID, +) from bact_twin_architecture.interfaces.command_rewritter import CommandRewriterBase +from bact_twin_architecture.interfaces.liaison_manager import LiaisonManagerBase +from bact_twin_architecture.interfaces.translator_service import TranslatorServiceBase from .accelerators.accelerator_manager import AcceleratorManager from .update_context_manager import UpdateContext +from ..custom_epics.ioc.liasion_translation_manager import TranslatorService class UpdateManager: @@ -10,17 +20,59 @@ class UpdateManager: Treats the incoming requests as commands to be rewritten and delivered to the machine + + Supports peeking into the engine """ - def __init__(self, command_rewritter: CommandRewriterBase, acc_mgr: AcceleratorManager): + + def __init__( + self, + *, + command_rewritter: CommandRewriterBase, + liaison_manager: LiaisonManagerBase, + translator_service: TranslatorServiceBase, + acc_mgr: AcceleratorManager, + ): self.command_rewritter = command_rewritter + self.liaison_manager = liaison_manager + self.translator_service = translator_service self.acc_mgr = acc_mgr - def peek(self, lat_elem, property_name) -> object: + def device_value_from_peeking_engine( + self, dev_prop: DevicePropertyID + ) -> Sequence[float]: + """ + Todo: + review interface + place it in correct layer + + How to handle that many values can be returned """ - Todo: remove layer violation + if dev_prop.device_name[:3].upper() == "CAV": + pass + lat_props = self.liaison_manager.inverse(dev_prop) + if lat_props is None: + raise AssertionError( + f"{self.liaison_manager.__class__.__name__} does not know {dev_prop}" + ) + + def convert(lat_prop): + translator = self.translator_service.get( + ConversionID(lattice_property_id=lat_prop, device_property_id=dev_prop) + ) + val = self.peek_engine(lat_prop) + return translator.forward(val) + + values = [convert(lat_prop) for lat_prop in lat_props] + return values + + def peek_engine(self, lat_elem_prop: LatticeElementPropertyID) -> object: + """peek into underlaying engine to get value + + Todo: + resolve layring violation """ - proxy = self.acc_mgr.accelerator.proxy_factory.get(lat_elem) - val = proxy.peek(property_id=property_name) + proxy = self.acc_mgr.accelerator.proxy_factory.get(lat_elem_prop.element_name) + val = proxy.peek(property_id=lat_elem_prop.property) return val async def update(self, *, device_id, property_name, value=None, element=None): @@ -34,11 +86,21 @@ async def update(self, *, device_id, property_name, value=None, element=None): # this argument shall be removed assert element is None # update context manager: currently here as the async io comm stops at first exception - with UpdateContext(element_id=device_id, property_name=property_name, value=value, element=element, - kwargs=dict()): + with UpdateContext( + element_id=device_id, + property_name=property_name, + value=value, + element=element, + kwargs=dict(), + ): cmds = self.command_rewritter.inverse( - Command(id=device_id, property=property_name, value=value, behaviour_on_error=BehaviourOnError.stop - )) + Command( + id=device_id, + property=property_name, + value=value, + behaviour_on_error=BehaviourOnError.stop, + ) + ) cmds for cmd in cmds: # Todo: simplify the code down here ... diff --git a/src/dt4acc/core/interfaces/accelerator_interface.py b/src/dt4acc/core/interfaces/accelerator_interface.py index c5ec6e5..cf955b0 100644 --- a/src/dt4acc/core/interfaces/accelerator_interface.py +++ b/src/dt4acc/core/interfaces/accelerator_interface.py @@ -9,11 +9,14 @@ class AcceleratorInterface(metaclass=ABCMeta): Todo: Derive from a list interface """ + @abstractmethod def get_element(self, element_id) -> ElementInterface: + """ + Review if derived classes use async implementations + """ pass @abstractmethod def __init__(self, *args, **kwargs): pass - diff --git a/src/dt4acc/custom_epics/ioc/handlers.py b/src/dt4acc/custom_epics/ioc/handlers.py index 792ed75..8667748 100644 --- a/src/dt4acc/custom_epics/ioc/handlers.py +++ b/src/dt4acc/custom_epics/ioc/handlers.py @@ -20,6 +20,8 @@ liasion_manager=lm, translation_service=tm ), + liaison_manager=lm, + translator_service=tm, acc_mgr=setup_accelerator() ) diff --git a/src/dt4acc/custom_epics/ioc/liasion_translation_manager.py b/src/dt4acc/custom_epics/ioc/liasion_translation_manager.py index 0d0bad3..b522f13 100644 --- a/src/dt4acc/custom_epics/ioc/liasion_translation_manager.py +++ b/src/dt4acc/custom_epics/ioc/liasion_translation_manager.py @@ -1,16 +1,30 @@ from typing import Dict, Sequence, Mapping import logging -from bact_twin_architecture.data_model.identifiers import LatticeElementPropertyID, DevicePropertyID, ConversionID +from bact_twin_architecture.data_model.identifiers import ( + LatticeElementPropertyID, + DevicePropertyID, + ConversionID, +) from bact_twin_architecture.interfaces.liaison_manager import LiaisonManagerBase from bact_twin_architecture.interfaces.state_conversion import StateConversion from bact_twin_architecture.interfaces.translator_service import TranslatorServiceBase -from bact_twin_architecture.utils.unit_conversion import LinearUnitConversion, EnergyIndependentLinearUnitConversion -from bact_twin_bessyii_impl.bl.bessyii_nomen_clature import name_matches_horizontal_steerer_name, \ - name_matches_vertical_steerer_name, name_matches_steerer_name +from bact_twin_architecture.utils.unit_conversion import ( + LinearUnitConversion, + EnergyIndependentLinearUnitConversion, +) +from bact_twin_bessyii_impl.bl.bessyii_nomen_clature import ( + name_matches_horizontal_steerer_name, + name_matches_vertical_steerer_name, + name_matches_steerer_name, + name_matches_quadrupole_name, + name_matches_sextupole_name, + YellowPages, + bessyii_yellow_pages, +) from ..data.querries import get_magnets -from ..data.constants import ring_parameters +from ..data.constants import ring_parameters, cavity_names from ...core.model.elementmodel import MagnetElementSetup logger = logging.getLogger("dt4acc") @@ -32,7 +46,9 @@ def inverse(self, id_: DevicePropertyID) -> Sequence[LatticeElementPropertyID]: try: return self.inverse_lut[id_] except KeyError as ke: - logger.error(f"{self.__class__.__name__} I did not find id {id_} in lookup table: {ke}") + logger.error( + f"{self.__class__.__name__} I did not find id {id_} in lookup table: {ke}" + ) class TranslatorService(TranslatorServiceBase): @@ -43,9 +59,33 @@ def get(self, id_: ConversionID) -> StateConversion: try: return self.lut[id_] except KeyError as ke: - logger.error(f"{self.__class__.__name__}: I did not find id {id_} in lookup table: {ke}") + logger.error( + f"{self.__class__.__name__}: I did not find id {id_} in lookup table: {ke}" + ) + od = self.objects_for_device(id_) + logger.warning(f"{self.__class__.__name__}: For the device I know {od}") + em = self.objects_for_lat_elem(id_) + logger.warning( + f"{self.__class__.__name__}: For the lattice element I know {em}" + ) raise ke + def objects_for_lat_elem(self, id_: ConversionID): + return { + key: to + for key, to in self.lut.items() + if id_.lattice_property_id.element_name + == key.lattice_property_id.element_name + } + + def objects_for_device(self, id_: ConversionID): + return { + key: to + for key, to in self.lut.items() + if id_.device_property_id.device_name == key.device_property_id.device_name + } + + def remove_id(d: Dict) -> Dict: nd = d.copy() del d @@ -57,8 +97,39 @@ def magnet_infos_from_db() -> Sequence[MagnetElementSetup]: return [MagnetElementSetup(**remove_id(info)) for info in get_magnets().to_list()] -def build_managers() -> (LiaisonManagerBase, TranslatorServiceBase): - """A first poor mans implementation of the database +def element_method(element_name): + if name_matches_horizontal_steerer_name(element_name): + return "x_kick" + elif name_matches_vertical_steerer_name(element_name): + return "y_kick" + elif name_matches_quadrupole_name(element_name): + return "K" + elif name_matches_sextupole_name(element_name): + return "H" + else: + raise AssertionError(f"Don't know how to handle {element_name}") + + +def extract_host_element_name(element_name: str) -> str: + if name_matches_steerer_name(element_name): + return element_name[1:] + return element_name + + +def construct_energy_independent_linear_conversion( + slope: float, +) -> EnergyIndependentLinearUnitConversion: + if slope is None: + raise AssertionError("Refusing creating linear unit conversion without slope") + return EnergyIndependentLinearUnitConversion( + slope=1.0 / slope, intercept=0.0, brho=ring_parameters.brho + ) + + +def build_managers( + yp: YellowPages = bessyii_yellow_pages(), +) -> (LiaisonManagerBase, TranslatorServiceBase): + """A first poor mans implementation of liasion manager and Translation service for BessyII Todo: Which info is already in database and better obtained from database? @@ -69,47 +140,90 @@ def build_managers() -> (LiaisonManagerBase, TranslatorServiceBase): # Make sure that names are unique ... everything down the list depends on it magnet_names = set([info.name for info in infos]) if len(list(magnet_names)) != len(infos): - raise AssertionError("Magnet names seem not to be unique, but is assumption of all further processing") + raise AssertionError( + "Magnet names seem not to be unique, but is assumption of all further processing" + ) power_converter_names = set([info.pc for info in infos]) power_converter_feeds = { - pc_name: [info.name for info in infos if info.pc == pc_name] for pc_name in power_converter_names + pc_name: [info.name for info in infos if info.pc == pc_name] + for pc_name in power_converter_names } magnet_lut = {info.name: info for info in infos} # todo: check if property must be different for the different magnets ... - - # first for steerers : for AT these are angles applied to the host magnet # I use that I know one pc goes to one steerer inverse_lut = { - DevicePropertyID(device_name=info.pc, property="set_current"): - (LatticeElementPropertyID(element_name=info.name[1:], property="x_kick"),) - for info in infos if name_matches_horizontal_steerer_name(info.name) - + DevicePropertyID(device_name=info.pc, property="set_current"): ( + LatticeElementPropertyID(element_name=info.name[1:], property="x_kick"), + ) + for info in infos + if info.name in yp.horizontal_steerer_names() } - inverse_lut.update({ - DevicePropertyID(device_name=info.pc, property="set_current"): - (LatticeElementPropertyID(element_name=info.name[1:], property="y_kick"),) - for info in infos if name_matches_vertical_steerer_name(info.name) - - }) + inverse_lut.update( + { + DevicePropertyID(device_name=info.pc, property="set_current"): ( + LatticeElementPropertyID(element_name=info.name[1:], property="y_kick"), + ) + for info in infos + if info.name in yp.vertical_steerer_names() + } + ) # Test that steerer power converters only feed one before going to the next step for key in inverse_lut: corr_pc = key.device_name magnet_names = power_converter_feeds[corr_pc] if len(magnet_names) != 1: - raise AssertionError(f"Found {magnet_names} magnets on assumed corrector power supply {corr_pc}") + raise AssertionError( + f"Found {magnet_names} magnets on assumed corrector power supply {corr_pc}" + ) # quadrupoles and sextupoles steerer_pc_names = [key.device_name for key in inverse_lut] - inverse_lut.update({ - DevicePropertyID(device_name=pc_name, property="set_current"): - tuple([LatticeElementPropertyID(element_name=magnet_name, property="K") for magnet_name in magnet_names]) - for pc_name, magnet_names in power_converter_feeds.items() if pc_name not in steerer_pc_names - }) + inverse_lut.update( + { + DevicePropertyID(device_name=pc_name, property="set_current"): tuple( + [ + LatticeElementPropertyID(element_name=magnet_name, property="K") + for magnet_name in magnet_names + if magnet_name in yp.quadrupole_names() + ] + ) + for pc_name, magnet_names in power_converter_feeds.items() + if pc_name not in steerer_pc_names + } + ) + inverse_lut.update( + { + DevicePropertyID(device_name=pc_name, property="set_current"): tuple( + [ + LatticeElementPropertyID(element_name=magnet_name, property="H") + for magnet_name in magnet_names + if magnet_name in yp.sextupole_names() + ] + ) + for pc_name, magnet_names in power_converter_feeds.items() + if pc_name not in steerer_pc_names + } + ) + + inverse_lut.update( + { + DevicePropertyID(device_name=pc_name, property="set_current"): tuple( + [ + LatticeElementPropertyID( + element_name=magnet_name, property="main_strength" + ) + for magnet_name in magnet_names + ] + ) + for pc_name, magnet_names in power_converter_feeds.items() + if pc_name not in steerer_pc_names + } + ) # Add lut for quadrupoles and sextupole # Furthermore to feed through the K value ... @@ -117,68 +231,160 @@ def build_managers() -> (LiaisonManagerBase, TranslatorServiceBase): # is that appropriate ? # Should one rather use a handler for lattice elements quad_updates = dict() - for axis_name in "x", "y", "K": - quad_updates.update({ - DevicePropertyID(device_name=info.name, property=axis_name): - (LatticeElementPropertyID(element_name=info.name, property=axis_name),) - for info in infos if info.type in ["Sextupole", "Quadrupole"] - }) + for axis_name in "x", "y", "K": + quad_updates.update( + { + DevicePropertyID(device_name=info.name, property=axis_name): ( + LatticeElementPropertyID( + element_name=info.name, property=axis_name + ), + ) + for info in infos + if info.type in ["Sextupole", "Quadrupole"] + } + ) inverse_lut.update(quad_updates) + # Cavities and master clock + inverse_lut.update( + { + DevicePropertyID(device_name=name, property="frequency"): ( + LatticeElementPropertyID(element_name=name, property="frequency"), + ) + for name in cavity_names + } + ) + inverse_lut.update( + { + DevicePropertyID( + device_name="master_clock", property="reference_frequency" + ): tuple( + [ + LatticeElementPropertyID(element_name=name, property="frequency") + for name in cavity_names + ] + ) + } + ) forward_lut = None lm = LiaisonManager(forward_lut=forward_lut, inverse_lut=inverse_lut) - def element_method(element_name): - device_name = "HS4M2D1R" - if element_name == device_name: - pass - if name_matches_horizontal_steerer_name(element_name): - return "x_kick" - elif name_matches_vertical_steerer_name(element_name): - return "y_kick" - else: - return "K" - - def extract_host_element_name(element_name: str) -> str: - if name_matches_steerer_name(element_name): - return element_name[1:] - return element_name - - def construct_energy_independent_linear_conversion(slope: float) -> EnergyIndependentLinearUnitConversion: - if slope is None: - raise AssertionError("Refusing creating linear unit conversion without slope") - return EnergyIndependentLinearUnitConversion(slope=1.0/slope, intercept=0.0, brho=ring_parameters.brho) - # start to build it for the magnets ... power converter feed translator_lut = { ConversionID( LatticeElementPropertyID( element_name=extract_host_element_name(info.name), - property=element_method(info.name) + property=element_method(info.name), ), - DevicePropertyID(device_name=info.pc, property="set_current") + DevicePropertyID(device_name=info.pc, property="set_current"), ): - # todo: check for the correct conversion - construct_energy_independent_linear_conversion(slope=info.magnetic_strength) + # todo: check for the correct conversion + construct_energy_independent_linear_conversion(slope=info.magnetic_strength) for info in infos } + # add look up for value using main strength for quadrupoles and sextupoles + translator_lut.update( + { + ConversionID( + LatticeElementPropertyID( + element_name=extract_host_element_name(info.name), + property="main_strength", + ), + DevicePropertyID(device_name=info.pc, property="set_current"), + ): + # todo: check for the correct conversion + construct_energy_independent_linear_conversion(slope=info.magnetic_strength) + for info in infos + if info.type in ["Sextupole", "Quadrupole"] + } + ) axis_updates = dict() - for axis_name in "x", "y", "K": - axis_updates.update({ - ConversionID( - lattice_property_id=LatticeElementPropertyID(element_name=info.name, property=axis_name), - device_property_id=DevicePropertyID(device_name=info.name, property=axis_name) - ) - : LinearUnitConversion(slope=1.0, intercept=0.0) - for info in infos if info.type in ["Sextupole", "Quadrupole"] - }) - translator_lut.update(axis_updates) # start to build it for quadrupoles and sextupoles axes + for axis_name in "x", "y": + axis_updates.update( + { + ConversionID( + lattice_property_id=LatticeElementPropertyID( + element_name=info.name, property=axis_name + ), + device_property_id=DevicePropertyID( + device_name=info.name, property=axis_name + ), + ): LinearUnitConversion(slope=1.0, intercept=0.0) + for info in infos + if info.type in ["Sextupole", "Quadrupole"] + } + ) + translator_lut.update(axis_updates) + + # K and H as requested from the device world ... should it be there? + translator_lut.update( + { + ConversionID( + lattice_property_id=LatticeElementPropertyID( + element_name=info.name, property="K" + ), + device_property_id=DevicePropertyID( + device_name=info.name, property="K" + ), + ): LinearUnitConversion(slope=1.0, intercept=0.0) + for info in infos + if info.type in ["Quadrupole"] + } + ) + # Todo: questionable if that should be handled here ... + # view should know what to delegate to the lattice + translator_lut.update( + { + ConversionID( + lattice_property_id=LatticeElementPropertyID( + element_name=info.name, property="H" + ), + device_property_id=DevicePropertyID( + device_name=info.name, property="H" + ), + ): LinearUnitConversion(slope=1.0, intercept=0.0) + for info in infos + if info.type in ["Sextupole"] + } + ) + + # cavities + translator_lut.update( + { + ConversionID( + lattice_property_id=LatticeElementPropertyID( + element_name=name, property="frequency" + ), + device_property_id=DevicePropertyID( + device_name=name, property="frequency" + ), + ): LinearUnitConversion( + slope=1e-3, intercept=0.0 + ) # BESSY II uses kHz for the cavities clock + for name in cavity_names + } + ) + + translator_lut.update( + { + ConversionID( + LatticeElementPropertyID(element_name=name, property="frequency"), + DevicePropertyID( + device_name="master_clock", property="reference_frequency" + ), + ): LinearUnitConversion( + slope=1e-3, intercept=0.0 + ) # BESSY II uses kHz for the master clock + for name in cavity_names + } + ) tm = TranslatorService(translator_lut) return lm, tm + if __name__ == "__main__": - build_managers() \ No newline at end of file + build_managers() diff --git a/src/dt4acc/custom_epics/ioc/pv_setup.py b/src/dt4acc/custom_epics/ioc/pv_setup.py index 2d5ff3c..76a6510 100644 --- a/src/dt4acc/custom_epics/ioc/pv_setup.py +++ b/src/dt4acc/custom_epics/ioc/pv_setup.py @@ -1,14 +1,21 @@ -import logging - import numpy as np +from bact_twin_architecture.data_model.identifiers import ( + LatticeElementPropertyID, + DevicePropertyID, +) +from p4p.asLib.yacc import start from .handlers import handle_device_update, update_manager from ..data.constants import config, special_pvs, cavity_names -from ..data.querries import get_unique_power_converters, get_magnets_per_power_converters +from ..data.querries import ( + get_unique_power_converters, + get_magnets_per_power_converters, +) from ...core.utils.logger import get_logger logger = get_logger() + def flag_not_handling(pv_name: str, val: object): logger.warning("Not handling update of pv %s to %s", pv_name, val) @@ -27,22 +34,37 @@ def initialize_magnet_pvs(builder, magnet): - `:x:set`: Horizontal position setpoint - `:y:set`: Vertical position setpoint """ - magnet_name = magnet['name'] + magnet_name = magnet["name"] # Create an element representing the magnet # Create PVs and link to update logic - val = update_manager.peek(magnet_name, "main_strength") - builder.aOut(f"{magnet_name}:Cm:set", initial_value=magnet["k"] or 0, - on_update=lambda val: handle_device_update(magnet_name, "K", val)) + val = update_manager.peek_engine( + LatticeElementPropertyID(element_name=magnet_name, property="main_strength") + ) + builder.aOut( + f"{magnet_name}:Cm:set", + initial_value=magnet["k"] or 0, + on_update=lambda val: handle_device_update(magnet_name, "K", val), + ) builder.aIn(f"{magnet_name}:Cm:rdbk", initial_value=val) - builder.aOut(f"{magnet_name}:im:I", initial_value=0.0, - # Todo: what to do if current is set, should be rather read only - # on_update=lambda val: handle_device_update(f"{magnet_name}:im:I", val) - on_update=lambda val: handle_device_update(magnet_name, "powersupply_current", val) - ) - builder.aOut(f"{magnet_name}:x:set", initial_value=0.0, - on_update=lambda val: handle_device_update(magnet_name, "x", val)) - builder.aOut(f"{magnet_name}:y:set", initial_value=0.0, - on_update=lambda val: handle_device_update(magnet_name, "y", val)) + builder.aOut( + f"{magnet_name}:im:I", + initial_value=0.0, + # Todo: what to do if current is set, should be rather read only + # on_update=lambda val: handle_device_update(f"{magnet_name}:im:I", val) + on_update=lambda val: handle_device_update( + magnet_name, "powersupply_current", val + ), + ) + builder.aOut( + f"{magnet_name}:x:set", + initial_value=0.0, + on_update=lambda val: handle_device_update(magnet_name, "x", val), + ) + builder.aOut( + f"{magnet_name}:y:set", + initial_value=0.0, + on_update=lambda val: handle_device_update(magnet_name, "y", val), + ) def initialize_power_converter_pvs(builder, prefix): @@ -67,7 +89,7 @@ def add_pc_pvs(builder, pc_name, prefix): prefix (str): Prefix for PVs. """ magnets = get_magnets_per_power_converters(pc_name) - element = {'magnets': [item['name'] for item in magnets]} + element = {"magnets": [item["name"] for item in magnets]} element_cache = {} # Store magnet information for reference element_cache[pc_name] = element @@ -77,10 +99,17 @@ def add_pc_pvs(builder, pc_name, prefix): # Create power converter setpoint and readback PVs # Todo: put it to power converters directly - builder.aOut(f"{pc_name}:set", initial_value=0.0, - on_update=lambda val: handle_device_update(pc_name, "set_current", val) + vals = update_manager.device_value_from_peeking_engine( + DevicePropertyID(device_name=pc_name, property="set_current") + ) + start_val = np.asarray(vals).mean() + builder.aOut( + f"{pc_name}:set", + initial_value=start_val, + on_update=lambda val: handle_device_update(pc_name, "set_current", val), ) - builder.aOut(f"{pc_name}:rdbk", initial_value=0.0) + #: todo ensur that readback is updated + builder.aOut(f"{pc_name}:rdbk", initial_value=start_val) def initialize_orbit_pvs(builder): @@ -93,7 +122,9 @@ def initialize_orbit_pvs(builder): builder.WaveformOut(f"beam:orbit:x", initial_value=[0.0], length=config.n_elements) builder.WaveformOut(f"beam:orbit:y", initial_value=[0.0], length=config.n_elements) builder.WaveformOut(f"beam:orbit:x0", initial_value=[0.0], length=config.n_elements) - builder.WaveformOut(f"beam:orbit:names", initial_value=[""], length=config.n_elements) + builder.WaveformOut( + f"beam:orbit:names", initial_value=[""], length=config.n_elements + ) builder.aOut(f"beam:orbit:found", initial_value=0) @@ -104,23 +135,61 @@ def initialize_twiss_pvs(builder): Args: builder: The SoftIOC PV builder instance. """ - for axis in ['x', 'y']: - builder.WaveformOut(f"beam:twiss:{axis}:alpha", initial_value=[0.0], length=config.n_elements) - builder.WaveformOut(f"beam:twiss:{axis}:beta", initial_value=[0.0], length=config.n_elements) - builder.WaveformOut(f"beam:twiss:{axis}:nu", initial_value=[0.0], length=config.n_elements) - builder.WaveformOut(f"beam:twiss:names", initial_value=[""], length=config.n_elements) + for axis in ["x", "y"]: + builder.WaveformOut( + f"beam:twiss:{axis}:alpha", initial_value=[0.0], length=config.n_elements + ) + builder.WaveformOut( + f"beam:twiss:{axis}:beta", initial_value=[0.0], length=config.n_elements + ) + builder.WaveformOut( + f"beam:twiss:{axis}:nu", initial_value=[0.0], length=config.n_elements + ) + builder.WaveformOut( + f"beam:twiss:names", initial_value=[""], length=config.n_elements + ) -def initialize_other_pvs(builder, prefix): +def initialize_master_clock_pvs(builder): + """initalise master clock pv + + Warning: + note for running the twin as a shadow it will + require precise frequency tuning + + Todo: + Foresee dedicated variables for allowing only a difference shift + Provide the frequency the code starts with """ - Initializes miscellaneous PVs including master clock and dummy values. + vals = update_manager.device_value_from_peeking_engine( + DevicePropertyID(device_name="master_clock", property="reference_frequency") + ) + start_val = np.asarray(vals).mean() + builder.aOut( + f"{special_pvs['master_clock']}:freq", + initial_value=start_val, + always_update=True, + EGU="kHz", + PREC=3, + on_update=lambda val: handle_device_update( + device_id="master_clock", property_id="reference_frequency", value=val + ), + ) + builder.aIn("lattice_info:ref_freq", initial_value=start_val, EGU="kHz", PREC=1) + builder.longIn( + "lattice_info:ref_freq:khz:up", initial_value=int(start_val), EGU="kHz" + ) + frac = (start_val % 1) * 1e6 + builder.longIn("lattice_info:ref_freq:khz:frac", initial_value=int(frac), EGU="mHz") + + +def initialize_other_pvs(builder, prefix): + """Initializes miscellaneous PVs (dummy values). Args: builder: The SoftIOC PV builder instance. prefix (str): Prefix for PV naming. """ - builder.aOut(f"{special_pvs['master_clock']}:freq", initial_value=0, - on_update=lambda val: handle_device_update(device_id="master_clock", property_id="reference_frequency", value=val)) builder.aOut(f"dummy:x", initial_value=0) builder.aOut(f"dummy:y", initial_value=0) builder.aOut(f"{special_pvs['current']}:current", initial_value=0) @@ -128,9 +197,11 @@ def initialize_other_pvs(builder, prefix): def initialize_bpm_pvs(builder): tmp = np.empty([2048], np.int16) - tmp.fill(-2 ** 15 + 1) + tmp.fill(-(2 ** 15) + 1) # bpm pv names from default - builder.WaveformOut(f"{special_pvs['bpm_pv']}:bdata", initial_value=tmp, length=len(tmp)) + builder.WaveformOut( + f"{special_pvs['bpm_pv']}:bdata", initial_value=tmp, length=len(tmp) + ) builder.longOut(f"{special_pvs['bpm_pv']}:count", initial_value=0) @@ -140,6 +211,14 @@ def initialize_cavity_pvs(builder): Args: builder: The SoftIOC PV builder instance. + + Todo: + check that these are updated if the master clock changes """ + for cavity_name in cavity_names: - builder.aOut(f"{cavity_name}:freq", initial_value=0.0) + vals = update_manager.device_value_from_peeking_engine( + DevicePropertyID(device_name=cavity_name, property="frequency") + ) + start_val = np.asarray(vals).mean() + builder.aOut(f"{cavity_name}:freq", initial_value=start_val, EGU="kHz", PREC=3) diff --git a/src/dt4acc/custom_epics/ioc/server.py b/src/dt4acc/custom_epics/ioc/server.py index 7e95ca3..3d0d9cd 100644 --- a/src/dt4acc/custom_epics/ioc/server.py +++ b/src/dt4acc/custom_epics/ioc/server.py @@ -6,11 +6,12 @@ from .tasks import monitor_heartbeat from .pv_setup import ( initialize_power_converter_pvs, + initialize_cavity_pvs, + initialize_master_clock_pvs, initialize_orbit_pvs, initialize_bpm_pvs, initialize_twiss_pvs, initialize_other_pvs, - initialize_cavity_pvs ) # Create an asyncio dispatcher to handle asynchronous PV updates @@ -25,13 +26,16 @@ def main(): prefix = os.environ.get("DT4ACC_PREFIX", "Anonym") builder.SetDeviceName(prefix) + + initialize_cavity_pvs(builder) # Initialize cavity-related PVs + initialize_master_clock_pvs(builder) # Initialize additional PVs such as master clock, dummy data + initialize_other_pvs(builder, prefix) # Initialize additional PVs such as master clock, dummy data + # Initialize PVs for various accelerator components initialize_power_converter_pvs(builder, prefix) # Initialize power converters and linked magnets initialize_orbit_pvs(builder) # Initialize orbit-related PVs initialize_bpm_pvs(builder) # Initialize Beam Position Monitor PVs initialize_twiss_pvs(builder) # Initialize Twiss parameter PVs - initialize_cavity_pvs(builder) # Initialize cavity-related PVs - initialize_other_pvs(builder, prefix) # Initialize additional PVs such as master clock, dummy data # Load the database of PVs defined above into the SoftIOC server builder.LoadDatabase()