-
Simon Wittl authoredSimon Wittl authored
thd_roboct_service.py 9.50 KiB
import artistlib.hardware
from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck
from rq_controller.common import PyProjection
import ros2_numpy
import numpy as np
from pathlib import Path
from sensor_msgs.msg import Image
try:
from rosct_nodes.detector.client import DetectorClient, TriggerMode, AcquisitionMode # type: ignore
from rosct_nodes.xraysource.client import XraySourceClient # type: ignore
from rosct_nodes.ct_sequence.ct_sequence_node import CtSequenceNode # type: ignore
from PythonTools.imageio import ImageFormat # type: ignore
from PythonTools.ezrt_header import AcquisitionGeometry # type: ignore
import rclpy
from threading import Event
import rosct_nodes.utilities.sequence_helper as sh # type: ignore
from trajectory_toolbox.sorter import turntable_path_planning # type: ignore
from roboct_moveit_commander_utils import RobotGoalTarget_from_Matrix # type: ignore
from geometry_msgs.msg import Point, Quaternion
from roboct_interfaces.msg import PlannerParameter # type: ignore
from rosct_roboct_interface.roboct_moveit_commander_client import RoboCtMoveItCommander # type: ignore
from trajectory_toolbox.util.util_trajectory import TrajectoryPoseManager, TrajectoryBase # type: ignore
from trajectory_toolbox.util.util_poses import Frame, ScanPoseList, ScanPoseTuple # type: ignore
except ModuleNotFoundError:
import warnings
warnings.warn("Packages are spedific for the RoboCT of the Deggendorf Institute of Technology. \n" +
"Please Contact: simon.wittl@th-deg.de")
class TurntablePlannerParameter:
golden_angle_degree: float = 0.
angle_weight: float = 0.01
class ThdRoboCTHardwareInterface(BaseHardwareInterfaceService):
interface_type: str = 'THD'
def __init__(self, node_name: str = 'rq_thd_hardware_interface_service'):
super().__init__(node_name)
self.client_detector = DetectorClient()
self.client_xraysource = XraySourceClient()
self.ct_node = CtSequenceNode()
self.client_move_it_commander = RoboCtMoveItCommander()
# check_xray_hardware
self.check_xray_hardware_connections()
self.check_xray_voltage()
# init robot planner
self.configure_turntable_planner_parameter()
self.configure_planner_parameter()
def print_info(self, msg: str):
self.get_logger().info(msg)
def print_error(self, msg: str):
self.get_logger().error(msg)
@property
def stop_event(self) -> Event:
return sh.get_stop_event(self.ct_node)
def check_xray_hardware_connections(self):
self.print_info('CHECK: XRay source and detector connections.')
if not self.client_xraysource.is_connected:
future = self.client_xraysource.connect()
self.client_xraysource.wait_for_future_ready(future)
if not self.client_detector.is_connected:
future = self.client_detector.connect()
self.client_detector.wait_for_future_ready(future)
def check_xray_voltage(self):
if not self.client_xraysource.is_xray_on:
xray_on_future = self.client_xraysource.xray_on()
self.client_xraysource.wait_for_future_ready(xray_on_future, timeout_sec=180.0)
if not self.client_xraysource.poll_until_target_voltage_reached(self.stop_event):
self.print_error('Xray source voltage not reached!')
def configure_detector(self,
number_of_projections: str,
installed_prefilter: str,
image_format: ImageFormat):
self.print_info('CONFIGURE: Detector.')
self.client_detector.reset_image_metadata()
# this sequences runs in software trigger mode, thus the detector settings are adjusted -
# the old trigger mode is saved and later reset
self.previous_trigger_mode = self.client_detector.trigger_mode
self.client_detector.trigger_mode = TriggerMode.SOFTWARE
measurement_name = self.measurement_name
self.client_detector.set_image_metadata('measurement_name', measurement_name)
self.client_detector.set_image_metadata('acquisition_geometry', int(AcquisitionGeometry.ARBITRARY))
self.client_detector.set_image_metadata('number_of_projections', number_of_projections)
self.client_detector.set_image_metadata('voltage_in_kv', self.client_xraysource.voltage_in_kv)
self.client_detector.set_image_metadata('current_in_ua', self.client_xraysource.current_in_ua)
self.client_detector.set_image_metadata('prefilter', installed_prefilter)
exposure_time_in_sec = self.client_detector.exposure_time_in_ms / 1000.0
self.image_acquisition_timeout = max((exposure_time_in_sec * self.client_detector.num_averages
+ exposure_time_in_sec * self.client_detector.num_skip_images) * 5.0, 5.0)
# start continuous acquisition of image data
start_acquisition_future = self.client_detector.start_acquisition(mode=AcquisitionMode.CONTINUOUS)
self.client_detector.wait_for_future_ready(start_acquisition_future, timeout_sec=20.0)
# initialize saving of an image series with the expected number of projections (saving is stopped after the stated
# amount of images), the desired fileprefix, savepath and imageformat. If it is a continued CT adjust values
# accordingly. The used target save path (e.g. with time prefix) for saving is returned for later use.
_ = self.client_detector.save_image_series(number_of_projections, fileprefix=measurement_name,
startindex=0,
savepath=str(self.projection_tempfolder),
create_prefix_dir=True,
imageformat=image_format)
# all following images are tagged as valid projections
self.client_detector.set_image_metadata('tag', 'projection')
def configure_planner_parameter(self,
acceleration_scaling: float = .9,
velocity_scaling: float = .9,
number_of_parallel_plans: int = 12,
planning_time: float = 25.):
"""Configuration for the path planning of the robot via the MoveIt2 plugin
Args:
acceleration_scaling (float, optional): Maximal allowed acceleration for all joints. Defaults to 1..
velocity_scaling (float, optional): Maximal allowed velocity for all joint. Defaults to 0.5.
number_of_parallel_plans (int, optional): Defaults to 12.
planning_time (float, optional): Maximal planning time before stop. Defaults to 5.
"""
self.print_info('CONFIGURE: Planner parameters.')
self.parameter_planner = PlannerParameter()
self.parameter_planner.acceleration_scaling = acceleration_scaling
self.parameter_planner.velocity_scaling = velocity_scaling
self.parameter_planner.number_of_parallel_plans = number_of_parallel_plans
self.parameter_planner.planning_time = planning_time
def configure_turntable_planner_parameter(self,
golden_angle_degree: float = 0,
angle_weight: float = 1e7) -> TurntablePlannerParameter:
"""Configuration for the use of the turn table to maximize the reachable scan poses.
Args:
golden_angle_degree (float, optional): All Primary beams are parallel to this z-angle. Defaults to 0.
angle_weight (float, optional): Weight for the feature vector (Position, Turntable angle * w) for the minimizing of the robot path length of a full ct trajectory. Defaults to 0.01.
Returns:
TurntablePlannerParameter: Class for storing the turntalbe planning parameters
"""
self.parameter_turntable_planer = TurntablePlannerParameter()
self.parameter_turntable_planer.golden_angle_degree = golden_angle_degree
self.parameter_turntable_planer.angle_weight = angle_weight
self.print_info(f'TURNTABLEPLANNING: {golden_angle_degree} {angle_weight}')
def aquire_projection_callback(self,
request: AquireProjection.Request,
response: AquireProjection.Response):
trajectory = TrajectoryBase('rq_move')
scan_pose_list = ScanPoseList()
planned_projection = PyProjection.from_message(request.scan_pose)
source_frame = Frame.from_position_and_quaternion(planned_projection.focal_spot_mm,
planned_projection.focal_spot_orientation_quad)
detector_frame = Frame.from_position_and_quaternion(planned_projection.detector_postion_mm,
planned_projection.detector_orientation_quad)
scan_pose = ScanPoseTuple(source_frame, detector_frame)
scan_pose_list = ScanPoseList()
scan_pose_list.add_pose_tuple(scan_pose)
trajectory._scan_pose_list = scan_pose_list
tpm = TrajectoryPoseManager(trajectory)
pass
def check_reachability_callback(self,
request: ReachabilityCheck.Request,
response: ReachabilityCheck.Response):
pass
def main(args=None):
rclpy.init(args=args)
minimal_service = ThdRoboCTHardwareInterface()
rclpy.spin(minimal_service)
rclpy.shutdown()
if __name__ == '__main__':
main()