from rq_interfaces.msg import Projection, ProjectionGeometry, CheckedScanPose from rq_interfaces.srv import AquireProjection, ReachabilityCheck, ReachabilityMapCheck from sensor_msgs.msg import Image from std_msgs.msg import Float64 from rq_controller.common import PyProjection import rclpy from rclpy.node import Node from pathlib import Path class BaseHardwareInterfaceService(Node): """ BaseHardwareInterfaceService is a ROS 2 Node that provides a base interface for hardware interaction services. Attributes: interface_type (str): The type of interface. last_projection (PyProjection): The last received projection. measurement_name (str): The name of the measurement. projection_tempfolder (Path): The temporary folder for storing projections. """ interface_type: str = 'Base' last_projection: PyProjection = None measurement_name: str = 'projection' projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp') def __init__(self, node_name: str = 'rq_base_hardware_interface_service'): """ Initialize the BaseHardwareInterfaceService node, create services and publishers, and set up a subscription. :param node_name: The name of the node. Defaults to 'rq_base_hardware_interface_service'. """ super().__init__(node_name, namespace="rq") self.aquire_projection_srv = self.create_service(AquireProjection, 'aquire_projection', self.aquire_projection_callback) self.reachability_srv = self.create_service(ReachabilityCheck, 'check_reachability', self.check_reachability_callback) self.reachability_map_srv = self.create_service(ReachabilityMapCheck, 'check_reachability_map', self.check_reachability_map_callback) self.projection_publisher = self.create_publisher(Projection, 'current_projection_image', 5) self.projection_geometry_publisher = self.create_publisher(ProjectionGeometry, 'current_projection_geometry', 5) self.reachabilty_publisher = self.create_publisher(CheckedScanPose, 'checked_scan_pose', 5) self.projection_subscriper = self.create_subscription(Projection, 'current_projection_image', self.projection_callback, 5) def aquire_projection_callback(self, request: AquireProjection.Request, response: AquireProjection.Response): """ Handle the aquire_projection service request. :param request: The service request for acquiring a projection. :param response: The service response to be filled. :raises NotImplementedError: This method should be overridden by a subclass. """ raise NotImplementedError def check_reachability_callback(self, request: ReachabilityCheck.Request, response: ReachabilityCheck.Response): """ Handle the check_reachability service request. :param request: The service request for checking reachability. :param response: The service response to be filled. :raises NotImplementedError: This method should be overridden by a subclass. """ raise NotImplementedError def check_reachability_map_callback(self, request: ReachabilityMapCheck.Request, response: ReachabilityMapCheck.Response): """ Handle the check_reachability_map service request. :param request: The service request for checking the reachability map. :param response: The service response to be filled. :raises NotImplementedError: This method should be overridden by a subclass. """ raise NotImplementedError def projection_callback(self, msg: Projection): """ Handle incoming Projection messages. :param msg: The received Projection message. """ self.get_logger().info('got projection ...') self.last_projection = PyProjection.from_message(msg)