from ..base_hardware_service import BaseHardwareInterfaceService from rq_interfaces.srv import ReachabilityMapCheck from rq_interfaces.msg import ServiceStatus, Volume from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck, ReachabilityMapCheck from rq_controller.common import PyProjection, PyRegionOfIntrest, PyVolume import numpy as np import rclpy class EchoHardwareService(BaseHardwareInterfaceService): """ EchoHardwareInterface is a dummy hardware interface service node for testing rq workflows. """ def __init__(self, node_name: str = 'rq_hardware_interface_service'): """ Initialize the EchoHardwareService node. :param node_name: The name of the node. Defaults to 'rq_hardware_interface_service'. """ super().__init__(node_name) def aquire_projection_callback(self, request: AquireProjection.Request, response: AquireProjection.Response): """ Handle the aquire_projection service request by setting the X-ray source and detector positions and acquiring a projection image. :param request: The service request containing the scan pose information. :param response: The service response to be filled with the acquired projection. :return: The updated service response with the projection image. """ projection = PyProjection.from_message(request.scan_pose) response.projection = projection return response def check_reachability_callback(self, request: ReachabilityCheck.Request, response: ReachabilityCheck.Response): """ Handle the check_reachability service request by verifying if a scan pose is reachable. :param request: The service request containing the scan pose to check. :param response: The service response to be filled with the reachability result. :return: The updated service response with the reachability status. """ response.checked_scan_pose.scan_pose = request.scan_pose response.checked_scan_pose.reachable = request.scan_pose.current_ua > 100. # just to controll the output and catch diffrent cases with tests... response.checked_scan_pose.cost = request.scan_pose.voltage_kv # just to controll the output and catch diffrent cases with tests... return response def check_reachability_map_callback(self, request: ReachabilityMapCheck.Request, response: ReachabilityMapCheck.Response): """ Handle the check_reachability_map service request by generating a voxel occupancy map based on the collision mesh. :param request: The service request containing the region of interest and other parameters. :param response: The service response to be filled with the voxel occupancy map. :return: The updated service response with the voxel occupancy map. """ roi = PyRegionOfIntrest.from_message(request.region_of_intrest) ocupation_map = np.random.randint(0, 1, size=roi.shape, dtype=np.uint8) response.ocupation_map = ocupation_map response.status.success = True return response def main(args=None): rclpy.init(args=args) minimal_service = EchoHardwareService() rclpy.spin(minimal_service) rclpy.shutdown() if __name__ == '__main__': main()