-
Simon Wittl authoredSimon Wittl authored
artist_harware_service.py 3.48 KiB
import artistlib.hardware
from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjection, ReachabilityCheck
from rq_controller.common import PyProjection
try:
import artistlib
from artistlib import utility as artist_utility
except ModuleNotFoundError:
raise ModuleNotFoundError('aRTist API not found')
import ros2_numpy
import numpy as np
from pathlib import Path
from sensor_msgs.msg import Image
import rclpy
class ArtistHardwareInterface(BaseHardwareInterfaceService):
projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp')
def __init__(self, node_name: str = 'rq_hardware_interface_service'):
super().__init__(node_name)
self.api = artistlib.API()
self.source = artistlib.hardware.XraySource(self.api)
self.detector = artistlib.hardware.XrayDetector(self.api)
def aquire_projection_callback(self,
request: AquireProjection.Request,
response: AquireProjection.Response):
projection_geometry = PyProjection.from_message(request.scan_pose)
# move source
self.api.rotate_from_quat('S', projection_geometry.focal_spot_orientation_quad)
self.api.translate('S', request.scan_pose.projection_geometry.focal_spot_postion_mm.x,
request.scan_pose.projection_geometry.focal_spot_postion_mm.y,
request.scan_pose.projection_geometry.focal_spot_postion_mm.z)
# move detector
self.api.rotate_from_quat('D', projection_geometry.detector_orientation_quad)
self.api.translate('D', request.scan_pose.projection_geometry.detector_postion_mm.x,
request.scan_pose.projection_geometry.detector_postion_mm.y,
request.scan_pose.projection_geometry.detector_postion_mm.z)
if not np.isclose(self.source.voltage_kv, request.scan_pose.voltage_kv, atol=0.1):
# Check if voltage is diffrent to avoid calculation of source spectrum
self.source.voltage_kv = request.scan_pose.voltage_kv
self.source.exposure_ma = request.scan_pose.current_ua
projection_path = self.projection_tempfolder / 'projection.tif'
self.api.save_image(projection_path)
projection_image = artist_utility.load_projection(projection_path, load_projection_geometry=False)[0]
response.projection = request.scan_pose
response.projection.image = ros2_numpy.msgify(Image, projection_image.astype(np.uint16), 'mono16')
self.projection_publisher.publish(response.projection.image)
self.projection_geometry_publisher.publish(response.projection.projection_geometry)
return response
def check_reachability_callback(self,
request: ReachabilityCheck.Request,
response: ReachabilityCheck.Response):
self.get_logger().info('Pose to check ...')
# include some logic
response.checked_scan_pose.scan_pose = request.scan_pose
response.checked_scan_pose.reachable = True
response.checked_scan_pose.cost = 42.
self.reachabilty_publisher.publish(response.checked_scan_pose)
return response
def main(args=None):
rclpy.init(args=args)
minimal_service = ArtistHardwareInterface()
rclpy.spin(minimal_service)
rclpy.shutdown()
if __name__ == '__main__':
main()