import artistlib.hardware from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck from rq_controller.common import PyProjection try: import artistlib from artistlib import utility as artist_utility except ModuleNotFoundError: raise ModuleNotFoundError('aRTist API not found') import ros2_numpy import numpy as np from pathlib import Path from sensor_msgs.msg import Image import rclpy class ArtistHardwareInterface(BaseHardwareInterfaceService): projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp') def __init__(self, node_name: str = 'rq_hardware_interface_service'): super().__init__(node_name) self.api = artistlib.API() self.source = artistlib.hardware.XraySource(self.api) self.detector = artistlib.hardware.XrayDetector(self.api) def aquire_projection_callback(self, request: AquireProjection.Request, response: AquireProjection.Response): projection_geometry = PyProjection.from_message(request.scan_pose) # move source self.api.rotate_from_quat('S', projection_geometry.focal_spot_orientation_quad) self.api.translate('S', request.scan_pose.projection_geometry.focal_spot_postion_mm.x, request.scan_pose.projection_geometry.focal_spot_postion_mm.y, request.scan_pose.projection_geometry.focal_spot_postion_mm.z) # move detector self.api.rotate_from_quat('D', projection_geometry.detector_orientation_quad) self.api.translate('D', request.scan_pose.projection_geometry.detector_postion_mm.x, request.scan_pose.projection_geometry.detector_postion_mm.y, request.scan_pose.projection_geometry.detector_postion_mm.z) if not np.isclose(self.source.voltage_kv, request.scan_pose.voltage_kv, atol=0.1): # Check if voltage is diffrent to avoid calculation of source spectrum self.source.voltage_kv = request.scan_pose.voltage_kv self.source.exposure_ma = request.scan_pose.current_ua projection_path = self.projection_tempfolder / 'projection.tif' self.api.save_image(projection_path) projection_image = artist_utility.load_projection(projection_path, load_projection_geometry=False)[0] response.projection = request.scan_pose response.projection.image = ros2_numpy.msgify(Image, projection_image.astype(np.uint16), 'mono16') self.projection_publisher.publish(response.projection.image) self.projection_geometry_publisher.publish(response.projection.projection_geometry) return response def check_reachability_callback(self, request: ReachabilityCheck.Request, response: ReachabilityCheck.Response): self.get_logger().info('Pose to check ...') # include some logic response.checked_scan_pose.scan_pose = request.scan_pose response.checked_scan_pose.reachable = True response.checked_scan_pose.cost = 42. self.reachabilty_publisher.publish(response.checked_scan_pose) return response def main(args=None): rclpy.init(args=args) minimal_service = ArtistHardwareInterface() rclpy.spin(minimal_service) rclpy.shutdown() if __name__ == '__main__': main()