Skip to content
Snippets Groups Projects
Commit e712c3e4 authored by Simon Wittl's avatar Simon Wittl
Browse files

echo update

parent 980b9a05
No related branches found
No related tags found
No related merge requests found
......@@ -2,8 +2,10 @@ from __future__ import annotations
from rq_hardware.hardware_client import HardwareClient
from rq_ddetection.defect_detection_client import DefectDetectionClient
from rq_reconstruction.reconstruction_client import ReconstructionClient
from rq_trajectory.trajectory_optimization_client import TrajectoryOptimizationClient
from rq_controller.common import PyProjection, PyRegionOfIntrest
from rq_controller.common import PyProjection, PyRegionOfIntrest, PyVolume
import rclpy
......@@ -13,12 +15,13 @@ from scipy.spatial.transform import Rotation
import numpy as np
class WorkflowNode(Node):
class WorkflowNode():
def __init__(self):
super().__init__('rq_workflow_node')
self.hardware_interface = HardwareClient()
self.defect_detection_interface = DefectDetectionClient()
self.trajectory = TrajectoryOptimizationClient()
self.reconstruction = ReconstructionClient()
def defect_detection(self, projections: list[PyProjection]) -> list[PyRegionOfIntrest]:
......@@ -51,55 +54,58 @@ class WorkflowNode(Node):
reached, cost, _ = self.hardware_interface.reachability_response_2_py(response)
return reached, cost
def get_volume(self, projection_stack: list[PyProjection], roi: PyRegionOfIntrest):
future = self.reconstruction.get_volume(projection_stack, roi)
def main():
rclpy.init()
rclpy.spin_until_future_complete(self.reconstruction, future)
response = future.result()
return self.reconstruction.response_2_py(response)
workflow = WorkflowNode()
def trajectory_update(self, current_projection: PyProjection):
future = self.trajectory.trajectory_update(current_projection)
number_of_projections = 10
rclpy.spin_until_future_complete(self.trajectory, future)
projection_list = list()
angles = np.linspace(0, np.pi, number_of_projections)
rotation = Rotation.from_euler('z', angles, degrees=False)
response = future.result()
return self.trajectory.trajectory_update_response_2_py(response)
def trajectory_initialize(self, roi: list[PyRegionOfIntrest]):
future = self.trajectory.trajectory_initialize(roi)
fod_mm = 1000.
fdd_mm = 2000.
rclpy.spin_until_future_complete(self.trajectory, future)
src_pos = np.array([0, fod_mm, 0])
det_pos = np.array([0, fod_mm-fdd_mm, 0])
for i in range(number_of_projections):
src_pos_rotated = rotation[i].apply(src_pos)
det_pos_rotated = rotation[i].apply(det_pos)
quad = (rotation[i] * Rotation.from_euler('x', 90, degrees=True)).as_quat()
def main():
print('This minimal example need the nodes described in the rq_workflow/launch/echo_launch.py script.')
rclpy.init()
workflow = WorkflowNode()
print('Aquire some scout views...')
projection_list = list()
for i in range(5):
print(f' - At projection {i}')
scan_pose = PyProjection.dummy()
scan_pose.focal_spot_mm = src_pos_rotated
scan_pose.detector_postion_mm = det_pos_rotated
scan_pose.detector_orientation_quad = quad
scan_pose.voltage_kv = 100.
scan_pose.current_ua = 1.
scan_pose.exposure_time_ms = 500.
scan_pose.focal_spot_mm += i
projection_list.append(workflow.aquire_projection(scan_pose))
reachable, cost = workflow.check_reachability(scan_pose)
print('Detect errors in scout views ...')
roi = workflow.defect_detection(projection_list)
if reachable:
print("Pose can be reached -> scan...")
else:
print("Pose not reachable -> skip ...")
continue
print('Init traj opt ...')
workflow.trajectory_initialize(roi)
projection = workflow.aquire_projection(scan_pose)
projection_list.append(projection)
print('Get next scan pose from traj opt ...')
next_scan_pose, finished = workflow.trajectory_update(projection_list[-1])
defect_roi = workflow.defect_detection(projection_list)
print('Aquire scan pose ...')
projection_list.append(workflow.aquire_projection(next_scan_pose))
for roi in defect_roi:
print(f'Roi center: {roi.center_points_mm}')
print(f'Roi dimension: {roi.dimensions_mm}')
print('Resconstruct projections ...')
volume = workflow.get_volume(projection_list, roi[0])
print('FINISHED !!!')
if __name__ == '__main__':
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment