from __future__ import annotations from rq_hardware.hardware_client import HardwareClient from rq_ddetection.defect_detection_client import DefectDetectionClient from rq_controller.common import PyProjection, PyRegionOfIntrest import rclpy from rclpy.node import Node from scipy.spatial.transform import Rotation import numpy as np class WorkflowNode(Node): def __init__(self): super().__init__('rq_workflow_node') self.hardware_interface = HardwareClient() self.defect_detection_interface = DefectDetectionClient() def defect_detection(self, projections: list[PyProjection]) -> list[PyRegionOfIntrest]: future = self.defect_detection_interface.multi_projection_defect_detection( projections) rclpy.spin_until_future_complete(self.defect_detection_interface, future) response = future.result() roi_list = self.defect_detection_interface.response_2_py(response) return roi_list def aquire_projection(self, projection: PyProjection): future = self.hardware_interface.aquire_projection(projection) rclpy.spin_until_future_complete(self.hardware_interface, future) response = future.result() projection = self.hardware_interface.projection_response_2_py(response) return projection def check_reachability(self, projection: PyProjection): future = self.hardware_interface.check_reachability(projection) rclpy.spin_until_future_complete(self.hardware_interface, future) response = future.result() reached, cost, _ = self.hardware_interface.reachability_response_2_py(response) return reached, cost def main(): rclpy.init() workflow = WorkflowNode() number_of_projections = 10 projection_list = list() angles = np.linspace(0, np.pi, number_of_projections) rotation = Rotation.from_euler('z', angles, degrees=False) fod_mm = 1000. fdd_mm = 2000. src_pos = np.array([0, fod_mm, 0]) det_pos = np.array([0, fod_mm-fdd_mm, 0]) for i in range(number_of_projections): src_pos_rotated = rotation[i].apply(src_pos) det_pos_rotated = rotation[i].apply(det_pos) quad = (rotation[i] * Rotation.from_euler('x', 90, degrees=True)).as_quat() scan_pose = PyProjection.dummy() scan_pose.focal_spot_mm = src_pos_rotated scan_pose.detector_postion_mm = det_pos_rotated scan_pose.detector_orientation_quad = quad scan_pose.voltage_kv = 100. scan_pose.current_ua = 1. scan_pose.exposure_time_ms = 500. reachable, cost = workflow.check_reachability(scan_pose) if reachable: print("Pose can be reached -> scan...") else: print("Pose not reachable -> skip ...") continue projection = workflow.aquire_projection(scan_pose) projection_list.append(projection) defect_roi = workflow.defect_detection(projection_list) for roi in defect_roi: print(f'Roi center: {roi.center_points_mm}') print(f'Roi dimension: {roi.dimensions_mm}') if __name__ == '__main__': main()