# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the BEAM PST component manager."""
from __future__ import annotations
import functools
import json
import logging
import sys
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple
from ska_tango_base.base import check_communicating
from ska_tango_base.control_model import (
AdminMode,
CommunicationStatus,
HealthState,
LoggingLevel,
ObsState,
PowerState,
)
from ska_tango_base.executor import TaskStatus
from ska_pst_lmc.beam.beam_device_interface import PstBeamDeviceInterface
from ska_pst_lmc.component import as_device_attribute_name
from ska_pst_lmc.component.component_manager import PstComponentManager
from ska_pst_lmc.device_proxy import ChangeEventSubscription, DeviceProxyFactory, PstDeviceProxy
from ska_pst_lmc.job import DeviceCommandTask, LambdaTask, NoopTask, SequentialTask, Task, TaskExecutor
from ska_pst_lmc.util import TelescopeFacilityEnum
from ska_pst_lmc.util.callback import Callback, callback_safely
TaskResponse = Tuple[TaskStatus, str]
RemoteTaskResponse = Tuple[List[TaskStatus], List[str]]
__all__ = [
"PstBeamComponentManager",
]
ActionResponse = Tuple[List[str], List[str]]
RemoteDeviceAction = Callable[[PstDeviceProxy], ActionResponse]
class _RemoteJob:
def __init__(
self: _RemoteJob,
job: Task,
task_executor: TaskExecutor,
completion_callback: Callback,
logger: logging.Logger,
):
self._job = job
self._completion_callback = completion_callback
self._logger = logger
self._task_executor = task_executor
def __call__(
self: _RemoteJob,
*args: Any,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
**kwargs: Any,
) -> None:
def _completion_callback(*arg: Any, **kwargs: Any) -> None:
self._completion_callback(task_callback) # type: ignore
callback_safely(task_callback, status=TaskStatus.IN_PROGRESS)
try:
self._task_executor.submit_job(job=self._job, callback=_completion_callback)
except Exception as e:
self._logger.warning("Error in submitting long running commands to remote devices", exc_info=True)
if task_callback:
task_callback(status=TaskStatus.FAILED, result=str(e), exception=e)
[docs]class PstBeamComponentManager(PstComponentManager[PstBeamDeviceInterface]):
"""
Component manager for the BEAM component in PST.LMC.
Since the BEAM component is a logical device, this component
manager is used to orchestrate the process devices, such as
BEAM, RECV.
Commands that are executed on this component manager are
sent to instances of :py:class:`PstDeviceProxy` for each
device that the BEAM device manages.
This component manager only takes the fully-qualified device
name (FQDN) for the remote devices, but uses the
:py:class:`DeviceProxyFactory` to retrieve instances of the
device proxies that commands should be sent to.
"""
_smrb_device: PstDeviceProxy
_recv_device: PstDeviceProxy
_dsp_device: PstDeviceProxy
_stat_device: PstDeviceProxy
def __init__(
self: PstBeamComponentManager,
*,
device_interface: PstBeamDeviceInterface,
logger: logging.Logger,
**kwargs: Any,
) -> None:
"""
Initialise component manager.
:param smrb_fqdn: the fully qualified device name (FQDN) of the shared memory ring buffer (SMRB) TANGO
device.
:param recv_fqdn: the FQDN of the Receive TANGO device.
:param dsp_fqdn: the FQDN of the Digital Signal processing (DSP) TANGO device.
:param simulation_mode: enum to track if component should be in simulation mode or not.
:param logger: a logger for this object to use
:param communication_status_changed_callback: callback to be called when the status of the
communications channel between the component manager and its component changes
:param component_fault_callback: callback to be called when the component faults (or stops faulting)
"""
self._smrb_device = DeviceProxyFactory.get_device(device_interface.smrb_fqdn)
self._recv_device = DeviceProxyFactory.get_device(device_interface.recv_fqdn)
self._dsp_device = DeviceProxyFactory.get_device(device_interface.dsp_fqdn)
self._stat_device = DeviceProxyFactory.get_device(device_interface.stat_fqdn)
self._remote_devices = [self._smrb_device, self._recv_device, self._dsp_device, self._stat_device]
self._subscribed = False
self._pst_task_executor = TaskExecutor(logger=logger)
self._curr_scan_config: dict | None = None
self._pst_task_executor.start()
super().__init__(
device_interface=device_interface,
power=PowerState.UNKNOWN,
fault=None,
logger=logger,
**kwargs,
)
self._initialise_monitoring_properties()
self._change_event_subscriptions: List[ChangeEventSubscription] = []
def __del__(self: PstBeamComponentManager) -> None:
"""Handle shutdown of component manager."""
self._pst_task_executor.stop()
def _initialise_monitoring_properties(self: PstBeamComponentManager) -> None:
"""
Initialise all the monitored properties.
This method will set all the properties to there default values. This
calls the `_reset_monitoring_properties` method. Most properties can
be reset, but the `available_disk_space` property is not reset once
it has been updated by the system.
"""
import sys
self.available_disk_space = sys.maxsize
self._reset_monitoring_properties()
def _reset_monitoring_properties(self: PstBeamComponentManager) -> None:
"""
Reset monitored attributes.
This method resets the values to a sensible default when not in a configured state.
"""
from ska_pst_lmc.dsp.dsp_model import DEFAULT_RECORDING_TIME
# RECV properties
self.data_received = 0
self.data_receive_rate = 0.0
self.data_dropped = 0
self.data_drop_rate = 0.0
self.misordered_packets = 0
self.misordered_packet_rate = 0.0
self.malformed_packets = 0
self.malformed_packet_rate = 0.0
self.misdirected_packets = 0
self.misdirected_packet_rate = 0.0
self.checksum_failure_packets = 0
self.checksum_failure_packet_rate = 0.0
self.timestamp_sync_error_packets = 0
self.timestamp_sync_error_packet_rate = 0.0
self.seq_number_sync_error_packets = 0
self.seq_number_sync_error_packet_rate = 0.0
self.data_record_rate = 0.0
self.data_recorded = 0
self.available_recording_time = DEFAULT_RECORDING_TIME
self.ring_buffer_utilisation = 0.0
self.expected_data_record_rate = 0.0
self.channel_block_configuration = {}
self.config_id = ""
self.scan_id = 0
# expose disk
self.disk_capacity = sys.maxsize
self.disk_used_bytes = 0
self.disk_used_percentage = 0.0
@property
def channel_block_configuration(self: PstBeamComponentManager) -> Dict[str, Any]:
"""Get current channel block configuration."""
return self._channel_block_configuration
@channel_block_configuration.setter
def channel_block_configuration(self: PstBeamComponentManager, config: Dict[str, Any]) -> None:
"""Set channel black configuration."""
self._channel_block_configuration = config
self._property_callback("channel_block_configuration", json.dumps(config))
def _update_channel_block_configuration(
self: PstBeamComponentManager, subband_beam_configuration: str
) -> None:
"""
Update the channel block configuration.
This calculates the new channel block configuration and is only called
after a successful `ConfigureScan` request. It uses the SMRB util to work
determine the subband configuration and then maps that to what is need
by the client of the BEAM.MGMT.
.. code-block:: python
{
"num_channel_blocks": 2,
"channel_blocks": [
{
"destination_host": "10.10.0.1",
"destination_port": 20000,
"start_pst_channel": 0,
"num_pst_channels": 12,
},
{
"destination_host": "10.10.0.1",
"destination_port": 20001,
"start_pst_channel": 12,
"num_pst_channels": 10,
},
]
}
"""
subband_resources = json.loads(subband_beam_configuration)
if subband_resources:
self.channel_block_configuration = {
"num_channel_blocks": subband_resources["common"]["nsubband"],
"channel_blocks": [
{
"destination_host": subband["data_host"],
"destination_port": subband["data_port"],
"start_pst_channel": subband["start_channel"],
"num_pst_channels": subband["end_channel"] - subband["start_channel"],
}
for subband in subband_resources["subbands"].values()
],
}
else:
self.channel_block_configuration = {}
@property
def data_receive_rate(self: PstBeamComponentManager) -> float:
"""Get current received data rate in Gb/s."""
return self._data_receive_rate
@data_receive_rate.setter
def data_receive_rate(self: PstBeamComponentManager, data_receive_rate: float) -> None:
"""Set current received data rate in Gb/s."""
self._data_receive_rate = data_receive_rate
self._property_callback("data_receive_rate", data_receive_rate)
@property
def data_received(self: PstBeamComponentManager) -> int:
"""Get current received data in bytes."""
return self._data_received
@data_received.setter
def data_received(self: PstBeamComponentManager, data_received: int) -> None:
"""Set current received data in bytes."""
self._data_received = data_received
self._property_callback("data_received", data_received)
@property
def data_drop_rate(self: PstBeamComponentManager) -> float:
"""Get current dropped data rate in bytes per second."""
return self._data_drop_rate
@data_drop_rate.setter
def data_drop_rate(self: PstBeamComponentManager, data_drop_rate: float) -> None:
"""Set current dropped data rate in bytes per second."""
self._data_drop_rate = data_drop_rate
self._property_callback("data_drop_rate", data_drop_rate)
@property
def data_dropped(self: PstBeamComponentManager) -> int:
"""Get current dropped data in bytes."""
return self._data_dropped
@data_dropped.setter
def data_dropped(self: PstBeamComponentManager, data_dropped: int) -> None:
"""Set current dropped data in bytes."""
self._data_dropped = data_dropped
self._property_callback("data_dropped", data_dropped)
@property
def misordered_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of packets received out of order in the current scan."""
return self._misordered_packets
@misordered_packets.setter
def misordered_packets(self: PstBeamComponentManager, misordered_packets: int) -> None:
"""Set the total number of packets received out of order in the current scan."""
self._misordered_packets = misordered_packets
self._property_callback("misordered_packets", misordered_packets)
@property
def misordered_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of packets received out of order in packets/sec."""
return self._misordered_packet_rate
@misordered_packet_rate.setter
def misordered_packet_rate(self: PstBeamComponentManager, misordered_packet_rate: float) -> None:
"""Set the current rate of packets received out of order in packets/sec."""
self._misordered_packet_rate = misordered_packet_rate
self._property_callback("misordered_packet_rate", misordered_packet_rate)
@property
def malformed_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of malformed packets in the current scan."""
return self._malformed_packets
@malformed_packets.setter
def malformed_packets(self: PstBeamComponentManager, malformed_packets: int) -> None:
"""Set the total number of malformed packets in the current scan."""
self._malformed_packets = malformed_packets
self._property_callback("malformed_packets", malformed_packets)
@property
def malformed_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of malformed packets in packets/sec."""
return self._malformed_packet_rate
@malformed_packet_rate.setter
def malformed_packet_rate(self: PstBeamComponentManager, malformed_packet_rate: float) -> None:
"""Set the current rate of malformed packets in packets/sec."""
self._malformed_packet_rate = malformed_packet_rate
self._property_callback("malformed_packet_rate", malformed_packet_rate)
@property
def misdirected_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of misdirected packets in the current scan."""
return self._misdirected_packets
@misdirected_packets.setter
def misdirected_packets(self: PstBeamComponentManager, misdirected_packets: int) -> None:
"""Set the total number of misdirected packets in the current scan."""
self._misdirected_packets = misdirected_packets
self._property_callback("misdirected_packets", misdirected_packets)
@property
def misdirected_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of misdirected packets in packets/sec."""
return self._misdirected_packet_rate
@misdirected_packet_rate.setter
def misdirected_packet_rate(self: PstBeamComponentManager, misdirected_packet_rate: float) -> None:
"""Set the current rate of misdirected packets in packets/sec."""
self._misdirected_packet_rate = misdirected_packet_rate
self._property_callback("misdirected_packet_rate", misdirected_packet_rate)
@property
def checksum_failure_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of packets with checksum failures for the current scan."""
return self._checksum_failure_packets
@checksum_failure_packets.setter
def checksum_failure_packets(self: PstBeamComponentManager, checksum_failure_packets: int) -> None:
"""Set the total number of packets with checksum failures for the current scan."""
self._checksum_failure_packets = checksum_failure_packets
self._property_callback("checksum_failure_packets", checksum_failure_packets)
@property
def checksum_failure_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of packets with checksum failures in packets/sec."""
return self._checksum_failure_packet_rate
@checksum_failure_packet_rate.setter
def checksum_failure_packet_rate(
self: PstBeamComponentManager, checksum_failure_packet_rate: float
) -> None:
"""Set the current rate of packets with checksum failures in packets/sec."""
self._checksum_failure_packet_rate = checksum_failure_packet_rate
self._property_callback("checksum_failure_packet_rate", checksum_failure_packet_rate)
@property
def timestamp_sync_error_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of packets with timestamp sync errors for the current scan."""
return self._timestamp_sync_error_packets
@timestamp_sync_error_packets.setter
def timestamp_sync_error_packets(
self: PstBeamComponentManager, timestamp_sync_error_packets: int
) -> None:
"""Set the total number of packets with timestamp sync errors for the current scan."""
self._timestamp_sync_error_packets = timestamp_sync_error_packets
self._property_callback("timestamp_sync_error_packets", timestamp_sync_error_packets)
@property
def timestamp_sync_error_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of packets with timestamp sync errors in packets/sec."""
return self._timestamp_sync_error_packet_rate
@timestamp_sync_error_packet_rate.setter
def timestamp_sync_error_packet_rate(
self: PstBeamComponentManager, timestamp_sync_error_packet_rate: float
) -> None:
"""Set the current rate of packets with timestamp sync errors in packets/sec."""
self._timestamp_sync_error_packet_rate = timestamp_sync_error_packet_rate
self._property_callback("timestamp_sync_error_packet_rate", timestamp_sync_error_packet_rate)
@property
def seq_number_sync_error_packets(self: PstBeamComponentManager) -> int:
"""
Get the total number of packets with seq.
number sync error for the current scan.
"""
return self._seq_number_sync_error_packets
@seq_number_sync_error_packets.setter
def seq_number_sync_error_packets(
self: PstBeamComponentManager, seq_number_sync_error_packets: int
) -> None:
"""
Set the total number of packets with seq.
number sync error for the current scan.
"""
self._seq_number_sync_error_packets = seq_number_sync_error_packets
self._property_callback("seq_number_sync_error_packets", seq_number_sync_error_packets)
@property
def seq_number_sync_error_packet_rate(self: PstBeamComponentManager) -> float:
"""
Get the current rate of packets with seq.
number sync error in packets/sec.
"""
return self._seq_number_sync_error_packet_rate
@seq_number_sync_error_packet_rate.setter
def seq_number_sync_error_packet_rate(
self: PstBeamComponentManager, seq_number_sync_error_packet_rate: float
) -> None:
"""
Set the current rate of packets with seq.
number sync error in packets/sec.
"""
self._seq_number_sync_error_packet_rate = seq_number_sync_error_packet_rate
self._property_callback("seq_number_sync_error_packet_rate", seq_number_sync_error_packet_rate)
@property
def data_record_rate(self: PstBeamComponentManager) -> float:
"""Get current data write rate in bytes per second."""
return self._data_record_rate
@data_record_rate.setter
def data_record_rate(self: PstBeamComponentManager, data_record_rate: int) -> None:
"""Set current data write rate in bytes per second."""
self._data_record_rate = data_record_rate
self._property_callback("data_record_rate", data_record_rate)
@property
def data_recorded(self: PstBeamComponentManager) -> int:
"""Get current amount of bytes written to file."""
return self._data_recorded
@data_recorded.setter
def data_recorded(self: PstBeamComponentManager, data_recorded: int) -> None:
"""Set current amount of bytes written to file."""
self._data_recorded = data_recorded
self._property_callback("data_recorded", data_recorded)
@property
def disk_capacity(self: PstBeamComponentManager) -> int:
"""Get size, in bytes, for the disk used for recording scan data."""
return self._disk_capacity
@disk_capacity.setter
def disk_capacity(self: PstBeamComponentManager, disk_capacity: int) -> None:
"""Set size, in bytes, for the disk used for recording scan data."""
self._disk_capacity = disk_capacity
self._property_callback("disk_capacity", disk_capacity)
@property
def disk_used_bytes(self: PstBeamComponentManager) -> int:
"""Get the current amount, in bytes, of disk used used."""
return self._disk_used_bytes
@disk_used_bytes.setter
def disk_used_bytes(self: PstBeamComponentManager, disk_used_bytes: int) -> None:
"""Set the current amount, in bytes, of disk used."""
self._disk_used_bytes = disk_used_bytes
self._property_callback("disk_used_bytes", disk_used_bytes)
@property
def disk_used_percentage(self: PstBeamComponentManager) -> float:
"""Get the percentage of used disk space for recording of scan data."""
return self._disk_used_percentage
@disk_used_percentage.setter
def disk_used_percentage(self: PstBeamComponentManager, disk_used_percentage: float) -> None:
"""Set the percent of used disk space for recording of scan data."""
self._disk_used_percentage = disk_used_percentage
self._property_callback("disk_used_percentage", disk_used_percentage)
@property
def available_disk_space(self: PstBeamComponentManager) -> int:
"""Get available bytes for disk to be written to during scan."""
return self._available_disk_space
@available_disk_space.setter
def available_disk_space(self: PstBeamComponentManager, available_disk_space: int) -> None:
"""Set available bytes for disk to be written to during scan."""
self._available_disk_space = available_disk_space
self._property_callback("available_disk_space", available_disk_space)
@property
def available_recording_time(self: PstBeamComponentManager) -> float:
"""Get the available recording time, for the disk being written to during the scan, in seconds."""
return self._available_recording_time
@available_recording_time.setter
def available_recording_time(self: PstBeamComponentManager, available_recording_time: float) -> None:
"""Set the available recording time, for the disk being written to during the scan, in seconds."""
self._available_recording_time = available_recording_time
self._property_callback("available_recording_time", available_recording_time)
@property
def ring_buffer_utilisation(self: PstBeamComponentManager) -> float:
"""Get current utilisation of ring buffer for current scan configuration."""
return self._ring_buffer_utilisation
@ring_buffer_utilisation.setter
def ring_buffer_utilisation(self: PstBeamComponentManager, ring_buffer_utilisation: float) -> None:
"""Set current utilisation of ring buffer for current scan configuration."""
self._ring_buffer_utilisation = ring_buffer_utilisation
self._property_callback("ring_buffer_utilisation", ring_buffer_utilisation)
@property
def expected_data_record_rate(self: PstBeamComponentManager) -> float:
"""Get the expected data rate for DSP output for current scan configuration."""
return self._expected_data_record_rate
@expected_data_record_rate.setter
def expected_data_record_rate(self: PstBeamComponentManager, expected_data_record_rate: float) -> None:
"""Set the expected data rate for DSP output for current scan configuration."""
self._expected_data_record_rate = expected_data_record_rate
self._property_callback("expected_data_record_rate", expected_data_record_rate)
def _handle_subdevice_obs_state_event(
self: PstBeamComponentManager, device: PstDeviceProxy, obs_state: ObsState
) -> None:
"""
Handle a change in the a subdevice's obsState.
Currently this just handles that a subdevice goes into a FAULT state. However, this could be used for
knowning when the device has moved out of FAULT or when it has stopped scanning.
:param device: the device proxy for the subordinate device.
:type device: PstDeviceProxy
:param obs_state: the new obsState of a subordinated device.
:type obs_state: ObsState
"""
self.logger.debug(f"Recevied an update to {device._fqdn}.obsState. New value is {obs_state}")
if obs_state == ObsState.FAULT:
fault_msg: str = device.healthFailureMessage
self.logger.warning(f"Recevied a FAULT for {device.fqdn}. Fault msg = '{fault_msg}'")
self._device_interface.handle_subdevice_fault(device_fqdn=device.fqdn, fault_msg=fault_msg)
def _simulation_mode_changed(self: PstBeamComponentManager) -> None:
"""
Set simulation mode state.
:param simulation_mode: the new simulation mode value.
:type simulation_mode: :py:class:`SimulationMode`
"""
# ensure we set the subordinate devices into to the same simulation mode.
self._smrb_device.simulationMode = self.simulation_mode
self._recv_device.simulationMode = self.simulation_mode
self._dsp_device.simulationMode = self.simulation_mode
self._stat_device.simulationMode = self.simulation_mode
def _handle_communication_state_change(
self: PstBeamComponentManager, communication_state: CommunicationStatus
) -> None:
if communication_state == CommunicationStatus.NOT_ESTABLISHED:
# fake going through states to have the communication established.
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
self._update_communication_state(CommunicationStatus.ESTABLISHED)
self._push_component_state_update(fault=None, power=PowerState.OFF)
self._device_interface.update_health_state(health_state=HealthState.OK)
elif communication_state == CommunicationStatus.DISABLED:
self._push_component_state_update(fault=None, power=PowerState.UNKNOWN)
self._update_communication_state(CommunicationStatus.DISABLED)
self._device_interface.update_health_state(health_state=HealthState.UNKNOWN)
[docs] def update_admin_mode(self: PstBeamComponentManager, admin_mode: AdminMode) -> None:
"""
Update the admin mode of the remote devices.
The adminMode of the remote devices should only be managed through the BEAM
device, and this method is called from the :py:class:`PstBeam` device to
make sure that the component manager will update the remote devices.
"""
self._smrb_device.adminMode = admin_mode
self._recv_device.adminMode = admin_mode
self._dsp_device.adminMode = admin_mode
self._stat_device.adminMode = admin_mode
def _submit_remote_job(
self: PstBeamComponentManager,
job: Task,
task_callback: Callback,
completion_callback: Callback,
) -> TaskResponse:
remote_job = _RemoteJob(
job,
task_executor=self._pst_task_executor,
completion_callback=completion_callback,
logger=self.logger,
)
return self.submit_task(
remote_job,
task_callback=task_callback,
)
def _subscribe_change_events(self: PstBeamComponentManager) -> None:
"""Subscribe to monitoring attributes of remote devices."""
self.logger.debug(f"{self.device_name} subscribing to monitoring events")
subscriptions_config = {
self._recv_device: [
"data_receive_rate",
"data_received",
"data_dropped",
"data_drop_rate",
"misordered_packets",
"misordered_packet_rate",
"malformed_packets",
"malformed_packet_rate",
"misdirected_packets",
"misdirected_packet_rate",
"checksum_failure_packets",
"checksum_failure_packet_rate",
"timestamp_sync_error_packets",
"timestamp_sync_error_packet_rate",
"seq_number_sync_error_packets",
"seq_number_sync_error_packet_rate",
"subband_beam_configuration",
"obs_state",
],
self._dsp_device: [
"data_record_rate",
"data_recorded",
"available_disk_space",
"available_recording_time",
"disk_capacity",
"disk_used_bytes",
"disk_used_percentage",
"obs_state",
],
self._smrb_device: [
"ring_buffer_utilisation",
"obs_state",
],
self._stat_device: [
"obs_state",
],
}
def _set_attr(attribute: str, value: Any) -> None:
try:
setattr(self, attribute, value)
except Exception:
self.logger.exception(f"Error in trying to set value to attribute {attribute}", exc_info=True)
def _subscribe_change_event(device: PstDeviceProxy, attribute: str) -> ChangeEventSubscription:
try:
device_attribute = as_device_attribute_name(attribute)
if attribute == "subband_beam_configuration":
callback = self._update_channel_block_configuration
elif attribute == "obs_state":
callback = functools.partial(self._handle_subdevice_obs_state_event, device)
else:
callback = functools.partial(_set_attr, attribute)
return device.subscribe_change_event(
attribute_name=device_attribute,
callback=callback,
)
except Exception:
self.logger.exception(f"Error in subscribing to change event of {device}")
raise
self._change_event_subscriptions = [
_subscribe_change_event(device, property)
for device, property_names in subscriptions_config.items()
for property in property_names
]
def _unsubscribe_change_events(self: PstBeamComponentManager) -> None:
"""Unsubscribe from current monitoring attributes of remote devices."""
for s in self._change_event_subscriptions:
s.unsubscribe()
self._change_event_subscriptions = []
@check_communicating
def on(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""
Turn the component on.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
try:
self.logger.debug("All the 'On' commands have completed.")
self._subscribe_change_events()
self._push_component_state_update(power=PowerState.ON)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
except Exception as e:
self.logger.exception("Error occured when dealing with turning on BEAM", exc_info=True)
task_callback(status=TaskStatus.FAILED, exception=e)
return self._submit_remote_job(
job=DeviceCommandTask(
devices=self._remote_devices,
action=lambda d: d.On(),
command_name="On",
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
@check_communicating
def off(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""
Turn the component off.
:param task_callback: callback to be called when the status of the command changes
"""
from tango import DevState
def _completion_callback(task_callback: Callable) -> None:
try:
self.logger.debug("All the 'Off' commands have completed.")
self._push_component_state_update(power=PowerState.OFF)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
except Exception as e:
self.logger.exception("Error occured when dealing with turning off BEAM", exc_info=True)
task_callback(status=TaskStatus.FAILED, exception=e)
# need to unsubscribe from monitoring events.
self._unsubscribe_change_events()
devices = [d for d in self._remote_devices if d.state() != DevState.OFF]
return self._submit_remote_job(
job=DeviceCommandTask(
devices=devices,
action=lambda d: d.Off(),
command_name="Off",
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
@check_communicating
def standby(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""
Put the component is standby.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Standby' commands have completed.")
self._push_component_state_update(power=PowerState.STANDBY)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
return self._submit_remote_job(
job=DeviceCommandTask(
devices=self._remote_devices,
action=lambda d: d.Standby(),
command_name="Standby",
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
@check_communicating
def reset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""
Reset the component.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Reset' commands have completed.")
self._push_component_state_update(power=PowerState.OFF)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
return self._submit_remote_job(
job=DeviceCommandTask(
devices=self._remote_devices,
action=lambda d: d.Reset(),
command_name="Reset",
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _as_pst_configure_scan_request(self: PstBeamComponentManager, configuration: Dict[str, Any]) -> dict:
"""Convert configure scan request into a PST request string."""
common_configure = configuration["common"]
pst_configuration = configuration["pst"]["scan"]
if self._device_interface.facility == TelescopeFacilityEnum.Low:
# force using a low Frequency Band if the facility is SKALow
common_configure["frequency_band"] = "low"
return {
**common_configure,
**pst_configuration,
}
[docs] def start_scan(
self: PstBeamComponentManager, args: Dict[str, Any], task_callback: Callback = None
) -> TaskResponse:
"""Start scanning."""
scan_id = str(args["scan_id"])
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Scan' commands have completed.")
self._push_component_state_update(scanning=True)
self.scan_id = int(scan_id)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
LambdaTask(
action=lambda: self._write_scan_config_to_output_dir(scan_id),
name="write_scan_config_to_output_dir",
),
DeviceCommandTask(
devices=self._remote_devices,
action=lambda d: d.Scan(scan_id),
command_name="Scan",
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _write_scan_config_to_output_dir(self: PstBeamComponentManager, scan_id: str) -> None:
"""Write the scan configuration out as JSON."""
import pathlib
self.logger.debug(f"Writing scan configuration for scan {scan_id}")
# dump current scan configuration as JSON
params = {
"eb_id": self._curr_scan_config["common"]["eb_id"], # type: ignore
"subsystem_id": self._device_interface.subsystem_id,
"scan_id": scan_id,
}
output_dir_str = self._device_interface.scan_output_dir_pattern
for k, v in params.items():
output_dir_str = output_dir_str.replace(f"<{k}>", v)
try:
output_dir = pathlib.Path(output_dir_str)
output_dir.mkdir(parents=True, exist_ok=True)
scan_configuration_path = output_dir / "scan_configuration.json"
self.logger.info(f"Writing scan configuration for scan {scan_id} to {scan_configuration_path}")
with open(scan_configuration_path, "w") as f:
json.dump(self._curr_scan_config, f)
except Exception:
self.logger.exception("Error in writting output file.", exc_info=True)
raise
[docs] def stop_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Stop scanning."""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'EndScan' commands have completed.")
self._push_component_state_update(scanning=False)
self.scan_id = 0
task_callback(status=TaskStatus.COMPLETED, result="Completed")
# need to stop_scan on RECV before DSP and STAT, then SMRB
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
DeviceCommandTask(
devices=[self._recv_device],
action=lambda d: d.EndScan(),
command_name="EndScan",
),
DeviceCommandTask(
devices=[self._dsp_device, self._stat_device],
action=lambda d: d.EndScan(),
command_name="EndScan",
),
DeviceCommandTask(
devices=[self._smrb_device],
action=lambda d: d.EndScan(),
command_name="EndScan",
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _abort_task(self: PstBeamComponentManager, remote_devices: List[PstDeviceProxy]) -> Task:
# find devices that need to be put into an aborted state. These are any that
# are not in ABORTED, FAULT or EMPTY
devices_to_abort = [
d for d in remote_devices if d.obsState not in [ObsState.ABORTED, ObsState.FAULT, ObsState.EMPTY]
]
abort_subtask: Task = NoopTask()
if len(devices_to_abort) > 0:
abort_subtask = DeviceCommandTask(
devices=devices_to_abort,
action=lambda d: d.Abort(),
command_name="Abort",
)
return abort_subtask
[docs] def abort(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Tell the component to abort whatever it was doing."""
# return self.abort_commands(task_callback=task_callback)
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Abort' commands have completed.")
self._push_component_state_update(scanning=False)
self.abort_commands()
task_callback(status=TaskStatus.COMPLETED, result="Completed")
self._submit_remote_job(
job=SequentialTask(
subtasks=[
LambdaTask(
action=lambda: callback_safely(task_callback, status=TaskStatus.IN_PROGRESS),
name="abort_in_progress",
),
self._abort_task([self._recv_device]),
self._abort_task([self._dsp_device, self._stat_device]),
self._abort_task([self._smrb_device]),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
return TaskStatus.IN_PROGRESS, "Aborting"
[docs] def obsreset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Reset the component and put it into a READY state."""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'ObsReset' commands have completed.")
self._push_component_state_update(configured=False)
self._device_interface.update_health_state(health_state=HealthState.OK)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
abort_recv_subtask: Task = self._abort_task([self._recv_device])
abort_readers_subtask: Task = self._abort_task([self._dsp_device, self._stat_device])
abort_smrb_subtask: Task = self._abort_task([self._smrb_device])
# call ObsReset on devices that aren't in EMPTY state. This will move them
# into IDLE state.
#
# Cannot reset devices in parallel. This will reset the devices in the following order:
# DSP, RECV, and then SMRB
devices_to_reset = [d for d in reversed(self._remote_devices) if d.obsState != ObsState.EMPTY]
obsreset_subtasks: List[Task] = []
if len(devices_to_reset) > 0:
obsreset_subtasks = [
DeviceCommandTask(
devices=[d],
action=lambda d: d.ObsReset(),
command_name="ObsReset",
)
for d in devices_to_reset
]
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
abort_recv_subtask,
abort_readers_subtask,
abort_smrb_subtask,
*obsreset_subtasks,
],
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
[docs] def go_to_fault(
self: PstBeamComponentManager, fault_msg: str, task_callback: Callback = None
) -> TaskResponse:
"""Put all the sub-devices into a FAULT state."""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'GoToFault' commands have completed.")
self._push_component_state_update(obsfault=True)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
self._device_interface.handle_fault(fault_msg=fault_msg)
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
DeviceCommandTask(
devices=[self._recv_device],
action=lambda d: d.GoToFault(fault_msg),
command_name="GoToFault",
),
DeviceCommandTask(
devices=[self._dsp_device, self._stat_device],
action=lambda d: d.GoToFault(fault_msg),
command_name="GoToFault",
),
DeviceCommandTask(
devices=[self._smrb_device],
action=lambda d: d.GoToFault(fault_msg),
command_name="GoToFault",
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
[docs] def set_logging_level(self: PstBeamComponentManager, log_level: LoggingLevel) -> None:
"""
Set LoggingLevel of all the sub-devices.
:param log_level: The required TANGO LoggingLevel
:returns: None.
"""
for remote_device in self._remote_devices:
remote_device.loggingLevel = log_level
[docs] def set_monitoring_polling_rate(self: PstBeamComponentManager, monitor_polling_rate: int) -> None:
"""Set the monitoring polling rate on the subordinate devices."""
for d in self._remote_devices:
d.monitoringPollingRate = monitor_polling_rate