-
Simon Wittl authoredSimon Wittl authored
hardware_client.py 6.07 KiB
from __future__ import annotations
from rq_interfaces.msg import Projection
from rq_interfaces.srv import AquireProjection, ReachabilityCheck, ReachabilityMapCheck
from rq_controller.common import PyProjection, PyRegionOfIntrest, PyVolume
import rclpy
from rclpy.node import Node
import numpy as np
class HardwareClient(Node):
"""
HardwareClient is a ROS 2 Node that interacts with several services to acquire projections
and check reachability of given projections and regions of interest.
"""
def __init__(self, **kwargs):
"""
Initialize the HardwareClient node, create service clients, and wait for services to be available.
:param kwargs: Additional keyword arguments to pass to the Node initialization.
"""
super().__init__('rq_hardware_client', namespace="rq", **kwargs)
self.cli_projection = self.create_client(AquireProjection, 'aquire_projection')
while not self.cli_projection.wait_for_service(timeout_sec=1.0):
self.get_logger().info('projection service not available, waiting again...')
self.cli_reachability = self.create_client(ReachabilityCheck, 'check_reachability')
while not self.cli_projection.wait_for_service(timeout_sec=1.0):
self.get_logger().info('reachability service not available, waiting again...')
self.cli_reachability_map = self.create_client(ReachabilityMapCheck, 'check_reachability_map')
while not self.cli_projection.wait_for_service(timeout_sec=1.0):
self.get_logger().info('reachability MAP service not available, waiting again...')
self.req_projection = AquireProjection.Request()
self.req_reachability = ReachabilityCheck.Request()
self.req_reachability_map = ReachabilityMapCheck.Request()
def aquire_projection(self, projection: PyProjection) -> rclpy.Future:
"""
Send a request to acquire a projection.
:param projection: The PyProjection object containing the scan pose.
:return: A Future object representing the result of the asynchronous service call.
"""
self.req_projection.scan_pose = projection.as_message()
return self.cli_projection.call_async(self.req_projection)
def check_reachability(self, projection: PyProjection) -> rclpy.Future:
"""
Send a request to check the reachability of a given projection.
:param projection: The PyProjection object containing the scan pose.
:return: A Future object representing the result of the asynchronous service call.
"""
self.req_reachability.scan_pose = projection.as_message()
return self.cli_reachability.call_async(self.req_reachability)
def check_reachability_map(self, source_position_roi_mm: PyRegionOfIntrest, center_mm: np.ndarray, fdd_mm: float) -> rclpy.Future:
"""
Send a request to check the reachability map for a given region of interest and focal detector distance.
:param source_position_roi_mm: The PyRegionOfIntrest object containing the region of interest.
:param center_mm: A numpy array representing the center position in millimeters.
:param fdd_mm: The focal detector distance in millimeters.
:return: A Future object representing the result of the asynchronous service call.
"""
center_mm = center_mm.reshape((3,))
self.req_reachability_map.center.x = center_mm[0]
self.req_reachability_map.center.y = center_mm[1]
self.req_reachability_map.center.z = center_mm[2]
self.req_reachability_map.region_of_intrest = source_position_roi_mm.as_message()
self.req_reachability_map.fdd_mm = fdd_mm
return self.cli_reachability_map.call_async(self.req_reachability_map)
@staticmethod
def projection_response_2_py(response: AquireProjection.Response) -> PyProjection:
"""
Convert an AquireProjection response to a PyProjection object.
:param response: The AquireProjection response message.
:return: A PyProjection object.
"""
return PyProjection.from_message(response.projection)
@staticmethod
def reachability_response_2_py(response: ReachabilityCheck.Response) -> tuple[bool, float, PyProjection]:
"""
Convert a ReachabilityCheck response to a tuple containing reachability information.
:param response: The ReachabilityCheck response message.
:return: A tuple containing (reachable: bool, cost: float, scan_pose: PyProjection).
"""
return (response.checked_scan_pose.reachable,
response.checked_scan_pose.cost,
PyProjection.from_message(response.checked_scan_pose.scan_pose))
@staticmethod
def reachability_map_response_2_py(response: ReachabilityMapCheck.Response) -> PyVolume:
"""
Convert a ReachabilityMapCheck response to a PyVolume object.
:param response: The ReachabilityMapCheck response message.
:return: A PyVolume object.
"""
return PyVolume.from_message(response.ocupation_map)
def main():
"""
Main function to initialize the ROS 2 node, perform reachability checks, and acquire projections.
"""
from matplotlib import pyplot as plt
rclpy.init()
minimal_client = HardwareClient()
scan_pose = PyProjection.dummy()
future = minimal_client.check_reachability(scan_pose)
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
reachability_check = minimal_client.reachability_response_2_py(response)
minimal_client.get_logger().info(
f'Pose is: {reachability_check[0]} \nMove cost: {reachability_check[1]}')
future = minimal_client.aquire_projection(scan_pose)
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
projection = minimal_client.projection_response_2_py(response)
minimal_client.get_logger().info(
f'Received Projection ...')
minimal_client.destroy_node()
rclpy.shutdown()
plt.imshow(projection.image)
plt.show()
if __name__ == '__main__':
main()