Skip to content
Snippets Groups Projects
Commit 981730b0 authored by Simon Wittl's avatar Simon Wittl
Browse files

we

parent 4272d6fc
No related branches found
No related tags found
No related merge requests found
import artistlib.hardware
from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection
from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck
from rq_controller.common import PyProjection
try:
......@@ -12,6 +12,7 @@ except ModuleNotFoundError:
import ros2_numpy
import numpy as np
from pathlib import Path
from sensor_msgs.msg import Image
import rclpy
......@@ -20,6 +21,7 @@ class ArtistHardwareInterface(BaseHardwareInterfaceService):
projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp')
def __init__(self, node_name: str = 'rq_hardware_interface_service'):
super().__init__(node_name)
self.api = artistlib.API()
self.source = artistlib.hardware.XraySource(self.api)
......@@ -31,17 +33,19 @@ class ArtistHardwareInterface(BaseHardwareInterfaceService):
projection_geometry = PyProjection.from_message(request.scan_pose)
# move source
self.api.rotate_from_quat('S', projection_geometry.focal_spot_orientation_quad)
self.api.translate('S', request.scan_pose.projection_geometry.focal_spot_postion_mm.x,
request.scan_pose.projection_geometry.focal_spot_postion_mm.y,
request.scan_pose.projection_geometry.focal_spot_postion_mm.z)
# move detector
self.api.rotate_from_quat('D', projection_geometry.detector_orientation_quad)
self.api.translate('D', request.scan_pose.projection_geometry.detector_postion_mm.x,
request.scan_pose.projection_geometry.detector_postion_mm.y,
request.scan_pose.projection_geometry.detector_postion_mm.z)
if not np.isclose(self.source.voltage_kv, request.scan_pose.voltage_kv, atol=1.):
if not np.isclose(self.source.voltage_kv, request.scan_pose.voltage_kv, atol=0.1):
# Check if voltage is diffrent to avoid calculation of source spectrum
self.source.voltage_kv = request.scan_pose.voltage_kv
......@@ -54,6 +58,22 @@ class ArtistHardwareInterface(BaseHardwareInterfaceService):
response.projection = request.scan_pose
response.projection.image = ros2_numpy.msgify(Image, projection_image.astype(np.uint16), 'mono16')
self.projection_publisher.publish(response.projection.image)
self.projection_geometry_publisher.publish(response.projection.projection_geometry)
return response
def check_reachability_callback(self,
request: ReachabilityCheck.Request,
response: ReachabilityCheck.Response):
# include some logic
response.checked_scan_pose.scan_pose = request.scan_pose
response.checked_scan_pose.reachable = True
response.checked_scan_pose.cost = 42.
self.reachabilty_publisher.publish(response.checked_scan_pose)
return response
......
from rq_interfaces.msg import Projection
from rq_interfaces.srv import AquireProjection
from rq_interfaces.msg import Projection, ProjectionGeometry, CheckedScanPose
from rq_interfaces.srv import AquireProjection, ReachabilityCheck
from sensor_msgs.msg import Image
from std_msgs.msg import Float64
import rclpy
from rclpy.node import Node
......@@ -9,10 +11,22 @@ class BaseHardwareInterfaceService(Node):
def __init__(self, node_name: str = 'rq_base_hardware_interface_service'):
super().__init__(node_name)
self.srv = self.create_service(AquireProjection, 'aquire_projection', self.aquire_projection_callback)
self.srv = self.create_service(AquireProjection, 'reachability_check', self.reachability_check_callback)
self.projection_publisher = self.create_publisher(Image, 'current_projection_image', 5)
self.projection_geometry_publisher = self.create_publisher(ProjectionGeometry, 'current_projection_geometry', 5)
self.reachabilty_publisher = self.create_publisher(CheckedScanPose, 'checked_scan_pose', 5)
def aquire_projection_callback(self,
rquest: AquireProjection.Request,
request: AquireProjection.Request,
response: AquireProjection.Response):
raise NotImplementedError
def reachability_check_callback(self,
request: ReachabilityCheck.Request,
response: ReachabilityCheck.Response):
raise NotImplementedError
s
\ No newline at end of file
from __future__ import annotations
from rq_interfaces.msg import Projection
from rq_interfaces.srv import AquireProjection
from rq_interfaces.srv import AquireProjection, ReachabilityCheck
from rq_controller.common import PyProjection
......@@ -10,20 +12,36 @@ class HardwareClient(Node):
def __init__(self):
super().__init__('rq_hardware_client')
self.cli = self.create_client(AquireProjection, 'aquire_projection')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AquireProjection.Request()
self.cli_projection = self.create_client(AquireProjection, 'aquire_projection')
while not self.cli_projection.wait_for_service(timeout_sec=1.0):
self.get_logger().info('projection service not available, waiting again...')
self.cli_reachability = self.create_client(ReachabilityCheck, 'reachability_check')
while not self.cli_projection.wait_for_service(timeout_sec=1.0):
self.get_logger().info('reachability service not available, waiting again...')
self.req_projection = AquireProjection.Request()
self.req_reachability = ReachabilityCheck.Request()
def aquire_projection(self, projection: PyProjection) -> rclpy.Future:
self.req.scan_pose = projection.as_message()
self.req_projection.scan_pose = projection.as_message()
return self.cli_projection.call_async(self.req_projection)
def check_reachability(self, projection: PyProjection) -> rclpy.Future:
self.req_reachability.scan_pose = projection.as_message()
return self.cli.call_async(self.req)
return self.cli_projection.call_async(self.req_reachability)
@staticmethod
def response_2_py(response: AquireProjection.Response) -> PyProjection:
def projection_response_2_py(response: AquireProjection.Response) -> PyProjection:
return PyProjection.from_message(response.projection)
@staticmethod
def reachability_response_2_py(response: ReachabilityCheck.Response) -> tuple[bool, float, PyProjection]:
return (response.checked_scan_pose.reachable,
response.checked_scan_pose.cost,
PyProjection.from_message(response.checked_scan_pose.scan_pose))
def main():
from matplotlib import pyplot as plt
......@@ -33,11 +51,19 @@ def main():
minimal_client = HardwareClient()
scan_pose = PyProjection.dummy()
future = minimal_client.check_reachability(scan_pose)
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
reachability_check = minimal_client.reachability_response_2_py(response)
minimal_client.get_logger().info(
f'Pose is: {reachability_check[0]} \nMove cost: {reachability_check[1]}')
future = minimal_client.aquire_projection(scan_pose)
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
projection = minimal_client.response_2_py(response)
projection = minimal_client.projection_response_2_py(response)
minimal_client.get_logger().info(
f'Received Projection ...')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment