Skip to content
Snippets Groups Projects
rq_workflow.py 3.13 KiB
Newer Older
Simon Wittl's avatar
Simon Wittl committed
from __future__ import annotations

from rq_hardware.hardware_client import HardwareClient
from rq_ddetection.defect_detection_client import DefectDetectionClient

from rq_controller.common import PyProjection, PyRegionOfIntrest


import rclpy
from rclpy.node import Node

from scipy.spatial.transform import Rotation
import numpy as np


class WorkflowNode(Node):
    def __init__(self):
        super().__init__('rq_workflow_node')

        self.hardware_interface = HardwareClient()
        self.defect_detection_interface = DefectDetectionClient()


    def defect_detection(self, projections: list[PyProjection]) -> list[PyRegionOfIntrest]:
        future = self.defect_detection_interface.multi_projection_defect_detection(
            projections)
        
        rclpy.spin_until_future_complete(self.defect_detection_interface, future)

        response = future.result()
        roi_list = self.defect_detection_interface.response_2_py(response)

        return roi_list
    
    def aquire_projection(self, projection: PyProjection):
        future = self.hardware_interface.aquire_projection(projection)
        
        rclpy.spin_until_future_complete(self.hardware_interface, future)

        response = future.result()
        projection = self.hardware_interface.projection_response_2_py(response)

        return projection
    
    def check_reachability(self, projection: PyProjection):
        future = self.hardware_interface.check_reachability(projection)
        
        rclpy.spin_until_future_complete(self.hardware_interface, future)

        response = future.result()
        reached, cost, _ = self.hardware_interface.reachability_response_2_py(response)
        return reached, cost
    
def main():
    rclpy.init()
    
    workflow = WorkflowNode()

    number_of_projections = 10

    projection_list = list()
    angles = np.linspace(0, np.pi, number_of_projections)
    rotation = Rotation.from_euler('z', angles, degrees=False)

    fod_mm = 1000.
    fdd_mm = 2000.

    src_pos = np.array([0, fod_mm, 0])
    det_pos = np.array([0, fod_mm-fdd_mm, 0])

    for i in range(number_of_projections):
        src_pos_rotated = rotation[i].apply(src_pos)
        det_pos_rotated = rotation[i].apply(det_pos)
        quad = (rotation[i] * Rotation.from_euler('x', 90, degrees=True)).as_quat()

        scan_pose = PyProjection.dummy()
        scan_pose.focal_spot_mm = src_pos_rotated
        scan_pose.detector_postion_mm = det_pos_rotated
        scan_pose.detector_orientation_quad = quad

        scan_pose.voltage_kv = 100.
        scan_pose.current_ua = 1.
        scan_pose.exposure_time_ms = 500.

        reachable, cost = workflow.check_reachability(scan_pose)

        if reachable:
            print("Pose can be reached -> scan...")
        else:
            print("Pose not reachable -> skip ...")
            continue

        projection = workflow.aquire_projection(scan_pose)
        projection_list.append(projection)

    defect_roi = workflow.defect_detection(projection_list)

    for roi in defect_roi:
        print(f'Roi center: {roi.center_points_mm}')
        print(f'Roi dimension: {roi.dimensions_mm}')


if __name__ == '__main__':
    main()