Skip to content
Snippets Groups Projects
artist_harware_service.py 3.48 KiB

import artistlib.hardware
from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck
from rq_controller.common import PyProjection

try:
    import artistlib
    from artistlib import utility as artist_utility
except ModuleNotFoundError:
    raise ModuleNotFoundError('aRTist API not found')

import ros2_numpy
import numpy as np
from pathlib import Path

from sensor_msgs.msg import Image

import rclpy

class ArtistHardwareInterface(BaseHardwareInterfaceService):
    projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp')

    def __init__(self, node_name: str = 'rq_hardware_interface_service'):

        super().__init__(node_name)
        self.api = artistlib.API()
        self.source = artistlib.hardware.XraySource(self.api)
        self.detector = artistlib.hardware.XrayDetector(self.api)

    def aquire_projection_callback(self, 
                                   request: AquireProjection.Request,
                                   response: AquireProjection.Response):
        
        projection_geometry = PyProjection.from_message(request.scan_pose)
        
        # move source
        self.api.rotate_from_quat('S', projection_geometry.focal_spot_orientation_quad)
        self.api.translate('S', request.scan_pose.projection_geometry.focal_spot_postion_mm.x,
                                request.scan_pose.projection_geometry.focal_spot_postion_mm.y, 
                                request.scan_pose.projection_geometry.focal_spot_postion_mm.z)
        
        # move detector
        self.api.rotate_from_quat('D', projection_geometry.detector_orientation_quad)
        self.api.translate('D', request.scan_pose.projection_geometry.detector_postion_mm.x,
                                request.scan_pose.projection_geometry.detector_postion_mm.y, 
                                request.scan_pose.projection_geometry.detector_postion_mm.z)
        
        if not np.isclose(self.source.voltage_kv, request.scan_pose.voltage_kv, atol=0.1):
            # Check if voltage is diffrent to avoid calculation of source spectrum
            self.source.voltage_kv = request.scan_pose.voltage_kv

        self.source.exposure_ma = request.scan_pose.current_ua

        projection_path = self.projection_tempfolder / 'projection.tif'
        self.api.save_image(projection_path)
        projection_image = artist_utility.load_projection(projection_path, load_projection_geometry=False)[0]

        response.projection = request.scan_pose
        response.projection.image = ros2_numpy.msgify(Image, projection_image.astype(np.uint16), 'mono16')

        self.projection_publisher.publish(response.projection.image)
        self.projection_geometry_publisher.publish(response.projection.projection_geometry)

        return response
    
    def check_reachability_callback(self,
                                    request: ReachabilityCheck.Request,
                                    response: ReachabilityCheck.Response):
        self.get_logger().info('Pose to check ...')
        # include some logic
        response.checked_scan_pose.scan_pose = request.scan_pose
        response.checked_scan_pose.reachable = True
        response.checked_scan_pose.cost = 42.

        self.reachabilty_publisher.publish(response.checked_scan_pose)

        return response
    

def main(args=None):
    rclpy.init(args=args)

    minimal_service = ArtistHardwareInterface()

    rclpy.spin(minimal_service)
    rclpy.shutdown()


if __name__ == '__main__':
    main()