Skip to content
Snippets Groups Projects
base_hardware_service.py 4.02 KiB
from rq_interfaces.msg import Projection, ProjectionGeometry, CheckedScanPose
from rq_interfaces.srv import AquireProjection, ReachabilityCheck, ReachabilityMapCheck
from sensor_msgs.msg import Image
from std_msgs.msg import Float64
from rq_controller.common import PyProjection

import rclpy
from rclpy.node import Node
from pathlib import Path


class BaseHardwareInterfaceService(Node):
    """
    BaseHardwareInterfaceService is a ROS 2 Node that provides a base interface for hardware interaction services.

    Attributes:
        interface_type (str): The type of interface.
        last_projection (PyProjection): The last received projection.
        measurement_name (str): The name of the measurement.
        projection_tempfolder (Path): The temporary folder for storing projections.
    """
    interface_type: str = 'Base'
    last_projection: PyProjection = None
    measurement_name: str = 'projection'
    projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp')
    
    def __init__(self, node_name: str = 'rq_base_hardware_interface_service'):
        """
        Initialize the BaseHardwareInterfaceService node, create services and publishers, and set up a subscription.

        :param node_name: The name of the node. Defaults to 'rq_base_hardware_interface_service'.
        """
        super().__init__(node_name, namespace="rq")
        self.aquire_projection_srv = self.create_service(AquireProjection, 'aquire_projection', self.aquire_projection_callback)
        self.reachability_srv = self.create_service(ReachabilityCheck, 'check_reachability', self.check_reachability_callback)
        self.reachability_map_srv = self.create_service(ReachabilityMapCheck, 'check_reachability_map', self.check_reachability_map_callback)

        self.projection_publisher = self.create_publisher(Projection, 'current_projection_image', 5)
        self.projection_geometry_publisher = self.create_publisher(ProjectionGeometry, 'current_projection_geometry', 5)
        self.reachabilty_publisher = self.create_publisher(CheckedScanPose, 'checked_scan_pose', 5)

        self.projection_subscriper = self.create_subscription(Projection, 'current_projection_image', self.projection_callback, 5)

    def aquire_projection_callback(self,
                                   request: AquireProjection.Request,
                                   response: AquireProjection.Response):
        """
        Handle the aquire_projection service request.

        :param request: The service request for acquiring a projection.
        :param response: The service response to be filled.
        :raises NotImplementedError: This method should be overridden by a subclass.
        """
        raise NotImplementedError
    
    def check_reachability_callback(self,
                                    request: ReachabilityCheck.Request,
                                    response: ReachabilityCheck.Response):
        """
        Handle the check_reachability service request.

        :param request: The service request for checking reachability.
        :param response: The service response to be filled.
        :raises NotImplementedError: This method should be overridden by a subclass.
        """
        raise NotImplementedError
    
    def check_reachability_map_callback(self,
                                        request: ReachabilityMapCheck.Request,
                                        response: ReachabilityMapCheck.Response):
        """
        Handle the check_reachability_map service request.

        :param request: The service request for checking the reachability map.
        :param response: The service response to be filled.
        :raises NotImplementedError: This method should be overridden by a subclass.
        """
        raise NotImplementedError
    
    def projection_callback(self, msg: Projection):
        """
        Handle incoming Projection messages.

        :param msg: The received Projection message.
        """
        self.get_logger().info('got projection ...')
        self.last_projection = PyProjection.from_message(msg)