Skip to content
Snippets Groups Projects
Commit 2edf6bfd authored by Anton Weiss's avatar Anton Weiss
Browse files

added the robo w/o reachability

parent 3dabc6df
No related branches found
No related tags found
No related merge requests found
Showing with 124 additions and 33 deletions
git+https://github.com/filippocastelli/pyometiff
git+https://github.com/harisankar95/pathfinding3D
git+https://github.com/wittlsn/aRTist-PythonLib/tree/artist_thd_wip
\ No newline at end of file
git+https://github.com/wittlsn/aRTist-PythonLib/
\ No newline at end of file
File added
File added
File added
File added
File added
......@@ -25,15 +25,15 @@ class BaseHardwareInterfaceService(Node):
interface_type: str = 'Base'
last_projection_image: np.ndarray = None
measurement_name: str = 'projection'
projection_tempfolder: Path = Path(r'C:\dev\rq_workflow\src\rq_hardware\temp')
projection_tempfolder: Path = Path(r'C:\ros\workspaces\rq_workflow\tmp')
def __init__(self, node_name: str = 'rq_base_hardware_interface_service'):
def __init__(self, nodename: str = 'rq_base_hardware_interface_service'):
"""
Initialize the BaseHardwareInterfaceService node, create services and publishers, and set up a subscription.
:param node_name: The name of the node. Defaults to 'rq_base_hardware_interface_service'.
"""
super().__init__(node_name, namespace="rq")
super().__init__(nodename, namespace="rq")
self.aquire_projection_srv = self.create_service(AquireProjection, 'aquire_projection', self.aquire_projection_callback)
self.reachability_srv = self.create_service(ReachabilityCheck, 'check_reachability', self.check_reachability_callback)
......@@ -44,7 +44,7 @@ class BaseHardwareInterfaceService(Node):
self.reachabilty_publisher = self.create_publisher(CheckedScanPose, 'checked_scan_pose', 5)
self.projection_subscriper = self.create_subscription(Image, 'current_projection_image', self.projection_callback, 5)
self.transform_broadcaster = TransformBroadcaster(self)
self.transform_broadcaster = TransformBroadcaster(Node('rq_transform_broadcaster'))
def aquire_projection_callback(self,
request: AquireProjection.Request,
......@@ -91,21 +91,6 @@ class BaseHardwareInterfaceService(Node):
self.get_logger().info('got projection ...')
self.last_projection_image = ros2_numpy.numpify(msg)
def create_tf2_source_broadcaster(self):
return StaticFramePublisher(
'rq_focal_spot',
np.array([1000., 0., 0.]),
np.array([0., 0., 0., 1.]),
'rq_source_broadcaster',
'object')
def create_tf2_detector_broadcaster(self):
return StaticFramePublisher(
'rq_detector',
np.array([-1000., 0., 0.]),
np.array([0., 0., 0., 1.]),
'rq_detector_broadcaster',
'object')
def set_source_pose(self, position, quarternion):
t = TransformStamped()
......
......@@ -6,6 +6,8 @@ from ..base_hardware_service import BaseHardwareInterfaceService, AquireProjecti
from rq_controller.common import PyProjection, PyRegionOfIntrest, PyVolume
import numpy as np
import rclpy
import ros2_numpy
from sensor_msgs.msg import Image
class EchoHardwareService(BaseHardwareInterfaceService):
......@@ -36,6 +38,13 @@ class EchoHardwareService(BaseHardwareInterfaceService):
response.projection = request.scan_pose
response.projection.exposure_time_ms += 1
projection = PyProjection.from_message(request.scan_pose)
projection.image = np.random.randint(0, 65535, projection.image.shape, np.uint16)
self.projection_publisher.publish(ros2_numpy.msgify(Image, projection.image.astype(np.uint16), 'mono16'))
self.set_source_pose(projection.focal_spot_mm / 1000., projection.focal_spot_orientation_quad)
self.set_detector_pose(projection.detector_postion_mm / 1000., projection.detector_orientation_quad)
return response
def check_reachability_callback(self,
......
File added
File added
......@@ -33,6 +33,7 @@ try:
from trajectory_toolbox.util.util_poses import Frame, ScanPoseList, ScanPoseTuple # type: ignore
except ModuleNotFoundError:
import warnings
warnings.warn("Packages are spedific for the RoboCT of the Deggendorf Institute of Technology. \n" +
"Please Contact: simon.wittl@th-deg.de")
......@@ -48,21 +49,40 @@ class TurntablePlannerParameter:
angle_weight: float = 0.01
def transform_frames(frame1: Frame, frame2: Frame):
matrix = frame1.transform @ frame2.transform
return Frame.from_transform(matrix)
def frame_mm_to_m(frame: Frame) -> np.ndarray:
transform = frame.transform
transform[:3, 3] /= 1000.
return transform
def frame_m_to_mm(frame: Frame) -> np.ndarray:
transform = frame.transform
transform[:3, 3] *= 1000.
return transform
def frame_fro_transform_msg(msg) -> Frame:
return Frame.from_position_and_quaternion(
np.array([msg.position.x, msg.position.y, msg.position.z]),
np.array([msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w]))
class ThdRoboCTHardwareInterface(BaseHardwareInterfaceService):
interface_type: str = 'THD'
def __init__(self, node_name: str = 'rq_thd_hardware_interface_service'):
def __init__(self, nodename: str = 'rq_thd_hardware_interface_service'):
super().__init__(node_name)
super().__init__(nodename)
self.client_detector = DetectorClient()
self.client_xraysource = XraySourceClient()
self.ct_node = CtSequenceNode()
self.client_detector = DetectorClient(name="rq_detector_client", namespace=None)
self.client_xraysource = XraySourceClient(name="rq_source_client", namespace=None)
self.client_move_it_commander = RoboCtMoveItCommander()
# check_xray_hardware
self.check_xray_hardware_connections()
self.check_xray_voltage()
# self.check_xray_hardware_connections()
# self.check_xray_voltage()
# init robot planner
self.configure_turntable_planner_parameter()
......@@ -181,18 +201,79 @@ class ThdRoboCTHardwareInterface(BaseHardwareInterfaceService):
trajectory = TrajectoryBase('rq_move')
scan_pose_list = ScanPoseList()
planned_projection = PyProjection.from_message(request.scan_pose)
source_frame = Frame.from_position_and_quaternion(planned_projection.focal_spot_mm,
# get trafo world -> object
world_object_m = self.client_move_it_commander.get_transform('world', 'object') # in meter !!!
object_frame_m = frame_fro_transform_msg(world_object_m)
object_frame_mm = Frame.from_transform(frame_m_to_mm(object_frame_m))
source_frame: Frame = Frame.from_position_and_quaternion(planned_projection.focal_spot_mm,
planned_projection.focal_spot_orientation_quad)
source_frame = transform_frames(object_frame_mm, source_frame)
detector_frame = Frame.from_position_and_quaternion(planned_projection.detector_postion_mm,
planned_projection.detector_orientation_quad)
detector_frame = transform_frames(object_frame_mm, detector_frame)
scan_pose = ScanPoseTuple(source_frame, detector_frame)
scan_pose_list = ScanPoseList()
scan_pose_list.add_pose_tuple(scan_pose)
trajectory._scan_pose_list = scan_pose_list
tpm = TrajectoryPoseManager(trajectory)
pass
world_table_transformation_m = frame_fro_transform_msg(
self.client_move_it_commander.get_transform("world", "tisch_static_plane"))
roated_scan_poses, angles = turntable_path_planning(
world_table_transformation_m,
tpm,
golden_angle=0.)
roated_scan_poses.pose_tuples[0]._source_pose
angle_in_rad = angles[0]
angle_in_rad = angle_in_rad - np.pi / 2
vector = world_table_transformation_m.position
vector[0] += np.sin(angle_in_rad * -1) * 2.
vector[1] += np.cos(angle_in_rad * -1) * 2.
vector[2] += 1.
matrix = np.eye(4)
matrix[:3,3] = vector
target_master = RobotGoalTarget_from_Matrix('all', 'master', 'comet_tcp', frame_mm_to_m(
roated_scan_poses.pose_tuples[0]._source_pose))
target_slave = RobotGoalTarget_from_Matrix('all', 'slave', 'varex_tcp', frame_mm_to_m(
roated_scan_poses.pose_tuples[0]._detector_pose))
target_table = RobotGoalTarget_from_Matrix('all', 'tisch', 'tisch_virtual_tcp', matrix)
req, res = self.client_move_it_commander.move_robots([target_table, target_master, target_slave], self.parameter_planner)
if not res.movement_executed:
response.status.success = False
return response
self.client_detector.take_single_image(block=True)
image = self.client_detector.current_image.copy()
source_m = frame_fro_transform_msg(
self.client_move_it_commander.get_transform("tisch_tcp", "comet_tcp"))
detector_m = frame_fro_transform_msg(
self.client_move_it_commander.get_transform("tisch_tcp", "varex_tcp"))
projection = PyProjection(
source_m.position * 1000.,
detector_m.position * 1000.,
detector_m.quaternion,
image,
427.008, 427.008,
'tisch',
source_frame.quaternion)
response.projection = projection.as_message()
response.status.success = True
return response
def check_reachability_callback(self,
request: ReachabilityCheck.Request,
......@@ -201,12 +282,28 @@ class ThdRoboCTHardwareInterface(BaseHardwareInterfaceService):
def main(args=None):
import sys
rclpy.init(args=args)
minimal_service = ThdRoboCTHardwareInterface()
rclpy.spin(minimal_service)
rclpy.shutdown()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(minimal_service)
executor.add_node(minimal_service.client_detector)
executor.add_node(minimal_service.client_xraysource)
executor.add_node(minimal_service.client_move_it_commander)
executor.add_node(minimal_service.transform_broadcaster)
try:
executor.spin()
except KeyboardInterrupt:
pass
except rclpy.executors.ExternalShutdownException:
sys.exit()
finally:
minimal_service.shutdown()
rclpy.try_shutdown()
minimal_service.destroy_node()
if __name__ == '__main__':
......
......@@ -20,7 +20,7 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'echo_service = rq_hardware.echo.echo_hardware_service:main',
'echo_hardware_service = rq_hardware.echo.echo_hardware_service:main',
'artist_service = rq_hardware.artist.artist_hardware_service:main',
'thd_roboct_service = rq_hardware.thd_roboct.thd_roboct_service:main',
'client = rq_hardware.hardware_client:main'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment