import artistlib.hardware from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck from rq_controller.common import PyProjection import ros2_numpy import numpy as np from pathlib import Path from sensor_msgs.msg import Image try: from rosct_nodes.detector.client import DetectorClient, TriggerMode, AcquisitionMode # type: ignore from rosct_nodes.xraysource.client import XraySourceClient # type: ignore from rosct_nodes.ct_sequence.ct_sequence_node import CtSequenceNode # type: ignore from PythonTools.imageio import ImageFormat # type: ignore from PythonTools.ezrt_header import AcquisitionGeometry # type: ignore import rclpy from threading import Event import rosct_nodes.utilities.sequence_helper as sh # type: ignore from trajectory_toolbox.sorter import turntable_path_planning # type: ignore from roboct_moveit_commander_utils import RobotGoalTarget_from_Matrix # type: ignore from geometry_msgs.msg import Point, Quaternion from roboct_interfaces.msg import PlannerParameter # type: ignore from rosct_roboct_interface.roboct_moveit_commander_client import RoboCtMoveItCommander # type: ignore from trajectory_toolbox.util.util_trajectory import TrajectoryPoseManager, TrajectoryBase # type: ignore from trajectory_toolbox.util.util_poses import Frame, ScanPoseList, ScanPoseTuple # type: ignore except ModuleNotFoundError: raise ModuleNotFoundError("Packages are spedific for the RoboCT of the Deggendorf Institute of Technology. \n" + "Please Contact: simon.wittl@th-deg.de") class TurntablePlannerParameter: golden_angle_degree: float = 0. angle_weight: float = 0.01 class ThdRoboCTHardwareInterface(BaseHardwareInterfaceService): interface_type: str = 'THD' def __init__(self, node_name: str = 'rq_thd_hardware_interface_service'): super().__init__(node_name) self.client_detector = DetectorClient() self.client_xraysource = XraySourceClient() self.ct_node = CtSequenceNode() self.client_move_it_commander = RoboCtMoveItCommander() # check_xray_hardware self.check_xray_hardware_connections() self.check_xray_voltage() # init robot planner self.configure_turntable_planner_parameter() self.configure_planner_parameter() def print_info(self, msg: str): self.get_logger().info(msg) def print_error(self, msg: str): self.get_logger().error(msg) @property def stop_event(self) -> Event: return sh.get_stop_event(self.ct_node) def check_xray_hardware_connections(self): self.print_info('CHECK: XRay source and detector connections.') if not self.client_xraysource.is_connected: future = self.client_xraysource.connect() self.client_xraysource.wait_for_future_ready(future) if not self.client_detector.is_connected: future = self.client_detector.connect() self.client_detector.wait_for_future_ready(future) def check_xray_voltage(self): if not self.client_xraysource.is_xray_on: xray_on_future = self.client_xraysource.xray_on() self.client_xraysource.wait_for_future_ready(xray_on_future, timeout_sec=180.0) if not self.client_xraysource.poll_until_target_voltage_reached(self.stop_event): self.print_error('Xray source voltage not reached!') def configure_detector(self, number_of_projections: str, installed_prefilter: str, image_format: ImageFormat): self.print_info('CONFIGURE: Detector.') self.client_detector.reset_image_metadata() # this sequences runs in software trigger mode, thus the detector settings are adjusted - # the old trigger mode is saved and later reset self.previous_trigger_mode = self.client_detector.trigger_mode self.client_detector.trigger_mode = TriggerMode.SOFTWARE measurement_name = self.measurement_name self.client_detector.set_image_metadata('measurement_name', measurement_name) self.client_detector.set_image_metadata('acquisition_geometry', int(AcquisitionGeometry.ARBITRARY)) self.client_detector.set_image_metadata('number_of_projections', number_of_projections) self.client_detector.set_image_metadata('voltage_in_kv', self.client_xraysource.voltage_in_kv) self.client_detector.set_image_metadata('current_in_ua', self.client_xraysource.current_in_ua) self.client_detector.set_image_metadata('prefilter', installed_prefilter) exposure_time_in_sec = self.client_detector.exposure_time_in_ms / 1000.0 self.image_acquisition_timeout = max((exposure_time_in_sec * self.client_detector.num_averages + exposure_time_in_sec * self.client_detector.num_skip_images) * 5.0, 5.0) # start continuous acquisition of image data start_acquisition_future = self.client_detector.start_acquisition(mode=AcquisitionMode.CONTINUOUS) self.client_detector.wait_for_future_ready(start_acquisition_future, timeout_sec=20.0) # initialize saving of an image series with the expected number of projections (saving is stopped after the stated # amount of images), the desired fileprefix, savepath and imageformat. If it is a continued CT adjust values # accordingly. The used target save path (e.g. with time prefix) for saving is returned for later use. _ = self.client_detector.save_image_series(number_of_projections, fileprefix=measurement_name, startindex=0, savepath=str(self.projection_tempfolder), create_prefix_dir=True, imageformat=image_format) # all following images are tagged as valid projections self.client_detector.set_image_metadata('tag', 'projection') def configure_planner_parameter(self, acceleration_scaling: float = .9, velocity_scaling: float = .9, number_of_parallel_plans: int = 12, planning_time: float = 25.): """Configuration for the path planning of the robot via the MoveIt2 plugin Args: acceleration_scaling (float, optional): Maximal allowed acceleration for all joints. Defaults to 1.. velocity_scaling (float, optional): Maximal allowed velocity for all joint. Defaults to 0.5. number_of_parallel_plans (int, optional): Defaults to 12. planning_time (float, optional): Maximal planning time before stop. Defaults to 5. """ self.print_info('CONFIGURE: Planner parameters.') self.parameter_planner = PlannerParameter() self.parameter_planner.acceleration_scaling = acceleration_scaling self.parameter_planner.velocity_scaling = velocity_scaling self.parameter_planner.number_of_parallel_plans = number_of_parallel_plans self.parameter_planner.planning_time = planning_time def configure_turntable_planner_parameter(self, golden_angle_degree: float = 0, angle_weight: float = 1e7) -> TurntablePlannerParameter: """Configuration for the use of the turn table to maximize the reachable scan poses. Args: golden_angle_degree (float, optional): All Primary beams are parallel to this z-angle. Defaults to 0. angle_weight (float, optional): Weight for the feature vector (Position, Turntable angle * w) for the minimizing of the robot path length of a full ct trajectory. Defaults to 0.01. Returns: TurntablePlannerParameter: Class for storing the turntalbe planning parameters """ self.parameter_turntable_planer = TurntablePlannerParameter() self.parameter_turntable_planer.golden_angle_degree = golden_angle_degree self.parameter_turntable_planer.angle_weight = angle_weight self.print_info(f'TURNTABLEPLANNING: {golden_angle_degree} {angle_weight}') def aquire_projection_callback(self, request: AquireProjection.Request, response: AquireProjection.Response): trajectory = TrajectoryBase('rq_move') scan_pose_list = ScanPoseList() planned_projection = PyProjection.from_message(request.scan_pose) source_frame = Frame.from_position_and_quaternion(planned_projection.focal_spot_mm, planned_projection.focal_spot_orientation_quad) detector_frame = Frame.from_position_and_quaternion(planned_projection.detector_postion_mm, planned_projection.detector_orientation_quad) scan_pose = ScanPoseTuple(source_frame, detector_frame) scan_pose_list = ScanPoseList() scan_pose_list.add_pose_tuple(scan_pose) trajectory._scan_pose_list = scan_pose_list tpm = TrajectoryPoseManager(trajectory) pass def check_reachability_callback(self, request: ReachabilityCheck.Request, response: ReachabilityCheck.Response): pass def main(args=None): rclpy.init(args=args) minimal_service = ThdRoboCTHardwareInterface() rclpy.spin(minimal_service) rclpy.shutdown() if __name__ == '__main__': main()