-
Simon Wittl authoredSimon Wittl authored
base_hardware_service.py 2.24 KiB
from rq_interfaces.msg import Projection, ProjectionGeometry, CheckedScanPose
from rq_interfaces.srv import AquireProjection, ReachabilityCheck, ReachabilityMapCheck
from sensor_msgs.msg import Image
from std_msgs.msg import Float64
from rq_controller.common import PyProjection
import rclpy
from rclpy.node import Node
class BaseHardwareInterfaceService(Node):
interface_type: str = 'Base'
last_projection: PyProjection = None
def __init__(self, node_name: str = 'rq_base_hardware_interface_service'):
super().__init__(node_name)
self.aquire_projection_srv = self.create_service(AquireProjection, 'aquire_projection', self.aquire_projection_callback)
self.reachability_srv = self.create_service(ReachabilityCheck, 'check_reachability', self.check_reachability_callback)
self.reachability_map_srv = self.create_service(ReachabilityMapCheck, 'check_reachability_map', self.check_reachability_map_callback)
self.projection_publisher = self.create_publisher(Projection, 'current_projection_image', 5)
self.projection_geometry_publisher = self.create_publisher(ProjectionGeometry, 'current_projection_geometry', 5)
self.reachabilty_publisher = self.create_publisher(CheckedScanPose, 'checked_scan_pose', 5)
self.projection_subscriper = self.create_subscription(Projection, 'current_projection_image', self.projection_callback, 5)
def aquire_projection_callback(self,
request: AquireProjection.Request,
response: AquireProjection.Response):
raise NotImplementedError
def check_reachability_callback(self,
request: ReachabilityCheck.Request,
response: ReachabilityCheck.Response):
raise NotImplementedError
def check_reachability_map_callback(self,
request: ReachabilityMapCheck.Request,
response: ReachabilityMapCheck.Response):
raise NotImplementedError
def projection_callback(self, msg: Projection):
self.get_logger().info('got projection ...')
self.last_projection = PyProjection.from_message(msg)