diff --git a/keypoint_pose/keypoint_pose/keypoint_node.py b/keypoint_pose/keypoint_pose/keypoint_node.py
index 545d158..bc67ccc 100644
--- a/keypoint_pose/keypoint_pose/keypoint_node.py
+++ b/keypoint_pose/keypoint_pose/keypoint_node.py
@@ -1,26 +1,36 @@
"""
-ROS 2 node that subscribes to rectified stereo images, runs MMPose
-keypoint detection, and displays the results in OpenCV windows.
+ROS 2 node: stereo keypoint triangulation using MMPose.
+
+Subscribes to raw stereo images, runs pose estimation on each,
+matches keypoints across cameras, and triangulates 3D positions.
+
+Published topics:
+ /keypoint_markers (visualization_msgs/MarkerArray) -- for RViz
"""
+import colorsys
+
import cv2
import numpy as np
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import Image
+from sensor_msgs.msg import Image, CameraInfo
+from visualization_msgs.msg import Marker, MarkerArray
+from geometry_msgs.msg import Point
from cv_bridge import CvBridge
+from message_filters import Subscriber, ApproximateTimeSynchronizer
from mmpose.apis import MMPoseInferencer
-# ── COCO-17 drawing helpers ─────────────────────────────────────────────
+# ── COCO-17 constants ───────────────────────────────────────────────────
SKELETON = [
- (0, 1), (0, 2), # nose → eyes
- (1, 3), (2, 4), # eyes → ears
+ (0, 1), (0, 2), # nose -> eyes
+ (1, 3), (2, 4), # eyes -> ears
(5, 6), # shoulders
(5, 7), (7, 9), # left arm
(6, 8), (8, 10), # right arm
- (5, 11), (6, 12), # shoulders → hips
+ (5, 11), (6, 12), # shoulders -> hips
(11, 12), # hips
(11, 13), (13, 15), # left leg
(12, 14), (14, 16), # right leg
@@ -39,7 +49,6 @@ def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
h, w = frame.shape[:2]
for person_kps, person_scores in zip(keypoints, keypoint_scores):
- # Skeleton lines
for j1, j2 in SKELETON:
if person_scores[j1] > threshold and person_scores[j2] > threshold:
pt1 = (int(person_kps[j1][0]), int(person_kps[j1][1]))
@@ -48,7 +57,6 @@ def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
0 <= pt2[0] < w and 0 <= pt2[1] < h):
cv2.line(frame, pt1, pt2, (0, 255, 0), 2)
- # Joint circles
for idx, (kp, score) in enumerate(zip(person_kps, person_scores)):
if score > threshold:
x, y = int(kp[0]), int(kp[1])
@@ -62,8 +70,8 @@ def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
# ── ROS 2 Node ───────────────────────────────────────────────────────────
-class KeypointNode(Node):
- """Subscribe to stereo rectified images, run MMPose, and display."""
+class KeypointTriangulationNode(Node):
+ """Stereo keypoint triangulation with MMPose."""
def __init__(self):
super().__init__('keypoint_node')
@@ -71,125 +79,360 @@ class KeypointNode(Node):
# ── Parameters ──────────────────────────────────────────────────
self.declare_parameter('threshold', 0.3)
self.declare_parameter('device', 'cuda:0')
+ self.declare_parameter('max_residual', 0.10) # metres
- threshold = self.get_parameter('threshold').value
+ self._threshold = self.get_parameter('threshold').value
+ self._max_residual = self.get_parameter('max_residual').value
device = self.get_parameter('device').value
- self._threshold = threshold
# ── MMPose (single shared instance) ─────────────────────────────
- self.get_logger().info(
- f'Loading MMPose model on {device} (this may take a moment)…'
- )
- self._inferencer = MMPoseInferencer(
- pose2d='human',
- device=device,
- )
- self.get_logger().info('MMPose model loaded.')
+ self.get_logger().info(f'Loading MMPose on {device} ...')
+ self._inferencer = MMPoseInferencer(pose2d='human', device=device)
+ self.get_logger().info('MMPose loaded.')
# ── cv_bridge ───────────────────────────────────────────────────
self._bridge = CvBridge()
- # Latest frames (written by subscribers, read by display timer)
- self._left_frame: np.ndarray | None = None
- self._right_frame: np.ndarray | None = None
+ # ── Calibration (filled from camera_info subscribers) ───────────
+ self._calib = {'left': None, 'right': None}
+ self._stereo = None # computed once both camera_infos arrive
- # ── Subscribers (independent — no time-sync needed) ─────────────
self.create_subscription(
- Image,
- '/stereo/left/image_rect',
- self._left_cb,
- 10,
- )
+ CameraInfo, '/stereo/left/camera_info',
+ lambda m: self._info_cb(m, 'left'), 10)
self.create_subscription(
- Image,
- '/stereo/right/image_rect',
- self._right_cb,
- 10,
- )
+ CameraInfo, '/stereo/right/camera_info',
+ lambda m: self._info_cb(m, 'right'), 10)
- # ── Timer for cv2.waitKey (keeps GUI responsive) ────────────────
- # ~30 Hz is plenty for display refresh
+ # ── Time-synced image subscribers ───────────────────────────────
+ left_sub = Subscriber(self, Image, '/stereo/left/image_raw')
+ right_sub = Subscriber(self, Image, '/stereo/right/image_raw')
+ self._sync = ApproximateTimeSynchronizer(
+ [left_sub, right_sub], queue_size=5, slop=0.05)
+ self._sync.registerCallback(self._synced_cb)
+
+ # ── Publisher ───────────────────────────────────────────────────
+ self._marker_pub = self.create_publisher(
+ MarkerArray, '/keypoint_markers', 10)
+
+ # ── Display state ───────────────────────────────────────────────
+ self._left_display = None
+ self._right_display = None
self.create_timer(1.0 / 30.0, self._display_timer_cb)
self.get_logger().info(
- 'Subscribed to /stereo/left/image_rect and '
- '/stereo/right/image_rect — waiting for images…'
- )
+ 'Waiting for camera_info and synced image_raw from '
+ '/stereo/left and /stereo/right ...')
- # ── Subscriber callbacks ────────────────────────────────────────────
+ # ── Camera info / stereo geometry ───────────────────────────────────
- def _process_frame(self, msg: Image, label: str) -> np.ndarray:
- """Convert ROS Image -> OpenCV, run MMPose, draw keypoints."""
- # Log image metadata once to help diagnose blank-frame issues
- if not hasattr(self, f'_logged_{label}'):
- self.get_logger().info(
- f'[{label}] First frame: encoding={msg.encoding} '
- f'size={msg.width}x{msg.height} step={msg.step}'
- )
- setattr(self, f'_logged_{label}', True)
+ def _info_cb(self, msg: CameraInfo, side: str):
+ if self._calib[side] is not None:
+ return
+ self._calib[side] = {
+ 'K': np.array(msg.k, dtype=np.float64).reshape(3, 3),
+ 'D': np.array(msg.d, dtype=np.float64),
+ 'R_rect': np.array(msg.r, dtype=np.float64).reshape(3, 3),
+ 'P': np.array(msg.p, dtype=np.float64).reshape(3, 4),
+ }
+ self.get_logger().info(
+ f'{side} camera_info received ({msg.width}x{msg.height})')
+ self._try_compute_stereo()
- # Use passthrough first so we can inspect the raw data
- frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
+ def _try_compute_stereo(self):
+ if self._calib['left'] is None or self._calib['right'] is None:
+ return
+ if self._stereo is not None:
+ return
- if not hasattr(self, f'_logged_px_{label}'):
- self.get_logger().info(
- f'[{label}] Frame dtype={frame.dtype} shape={frame.shape} '
- f'min={frame.min()} max={frame.max()}'
- )
- setattr(self, f'_logged_px_{label}', True)
+ R1 = self._calib['left']['R_rect']
+ R2 = self._calib['right']['R_rect']
+ P_R = self._calib['right']['P']
- # Convert grayscale to BGR so MMPose and drawing work correctly
- if len(frame.shape) == 2 or frame.shape[2] == 1:
- frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
- elif frame.shape[2] == 4:
- frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
+ # Baseline: P_right[0,3] = fx_rect * tx (tx is negative)
+ tx = P_R[0, 3] / P_R[0, 0]
+ # Right camera origin in left (world) frame:
+ # O_right = -R1^T @ [tx, 0, 0]
+ O_right = -R1.T @ np.array([tx, 0.0, 0.0])
+
+ # Rotation that transforms a direction vector from the right
+ # camera frame into the left (world) frame:
+ # d_world = R_right_to_world @ d_right_cam
+ R_right_to_world = R1.T @ R2
+
+ self._stereo = {
+ 'O_right': O_right,
+ 'R_right_to_world': R_right_to_world,
+ }
+
+ baseline = np.linalg.norm(O_right)
+ self.get_logger().info(
+ f'Stereo geometry ready. Baseline = {baseline:.4f} m '
+ f'O_right = {O_right.round(4).tolist()}')
+
+ # ── Pixel -> ray ────────────────────────────────────────────────────
+
+ def _pixel_to_ray(self, u, v, side):
+ """Undistort pixel (u,v) and return a unit ray in the world frame."""
+ K = self._calib[side]['K']
+ D = self._calib[side]['D']
+
+ # cv2.undistortPoints returns normalised coordinates (K^-1 applied
+ # and distortion corrected).
+ pts = np.array([[[u, v]]], dtype=np.float64)
+ norm = cv2.undistortPoints(pts, K, D)
+ d_cam = np.array([norm[0, 0, 0], norm[0, 0, 1], 1.0])
+
+ if side == 'right':
+ d_world = self._stereo['R_right_to_world'] @ d_cam
+ else:
+ d_world = d_cam # left camera IS the world frame
+
+ return d_world / np.linalg.norm(d_world)
+
+ # ── Triangulation (closest point of approach) ───────────────────────
+
+ def _triangulate(self, d1, d2):
+ """
+ Given two unit rays from O_left and O_right, find the 3D midpoint
+ of closest approach and the residual (ray separation).
+
+ Returns (point_3d, residual) or (None, inf) if rays are parallel.
+ """
+ O1 = np.zeros(3)
+ O2 = self._stereo['O_right']
+ b = O2 - O1 # baseline vector
+
+ # Solve s*d1 - t*d2 = b in the least-squares sense:
+ # | d1.d1 -d1.d2 | |s| |d1.b|
+ # | d1.d2 -d2.d2 | |t| = |d2.b|
+ d1d1 = d1 @ d1
+ d2d2 = d2 @ d2
+ d1d2 = d1 @ d2
+
+ denom = d1d1 * d2d2 - d1d2 * d1d2
+ if abs(denom) < 1e-12:
+ return None, float('inf')
+
+ d1b = d1 @ b
+ d2b = d2 @ b
+
+ s = (d1b * d2d2 - d2b * d1d2) / denom
+ t = (d1b * d1d2 - d2b * d1d1) / denom
+
+ p1 = O1 + s * d1
+ p2 = O2 + t * d2
+
+ return (p1 + p2) / 2.0, np.linalg.norm(p1 - p2)
+
+ # ── MMPose helper ───────────────────────────────────────────────────
+
+ def _run_mmpose(self, frame):
+ """Return list of dicts with 'keypoints' (N,2) and 'scores' (N,)."""
result = next(self._inferencer(
- frame,
- show=False,
- return_datasamples=False,
- ))
+ frame, show=False, return_datasamples=False))
- predictions = result.get('predictions', [[]])[0]
- all_kps, all_scores = [], []
- for pred in predictions:
+ people = []
+ for pred in result.get('predictions', [[]])[0]:
kps = pred.get('keypoints', [])
scores = pred.get('keypoint_scores', [])
if len(kps) > 0:
- all_kps.append(np.array(kps))
- all_scores.append(np.array(scores))
+ people.append({
+ 'keypoints': np.array(kps),
+ 'scores': np.array(scores),
+ })
+ return people
- if all_kps:
- draw_keypoints(frame, all_kps, all_scores, self._threshold)
+ # ── Person matching across cameras ──────────────────────────────────
+ def _match_people(self, left_people, right_people):
+ """
+ Match people across stereo views using vertical-coordinate
+ similarity (epipolar heuristic: same person has similar y in
+ a roughly-rectified stereo pair).
+
+ Returns list of (left_person, right_person) tuples.
+ """
+ if not left_people or not right_people:
+ return []
+
+ thr = self._threshold
+ n_l, n_r = len(left_people), len(right_people)
+
+ # Cost matrix: mean |y-difference| for mutually confident keypoints
+ cost = np.full((n_l, n_r), np.inf)
+ for i, lp in enumerate(left_people):
+ for j, rp in enumerate(right_people):
+ mask = (lp['scores'] > thr) & (rp['scores'] > thr)
+ if mask.sum() < 3:
+ continue
+ cost[i, j] = np.abs(
+ lp['keypoints'][mask, 1] - rp['keypoints'][mask, 1]
+ ).mean()
+
+ # Greedy matching by lowest cost
+ matches = []
+ used_l, used_r = set(), set()
+ while True:
+ remaining = np.full_like(cost, np.inf)
+ for i in range(n_l):
+ for j in range(n_r):
+ if i not in used_l and j not in used_r:
+ remaining[i, j] = cost[i, j]
+ idx = np.unravel_index(remaining.argmin(), remaining.shape)
+ if remaining[idx] > 100.0 or np.isinf(remaining[idx]):
+ break
+ matches.append((left_people[idx[0]], right_people[idx[1]]))
+ used_l.add(idx[0])
+ used_r.add(idx[1])
+
+ return matches
+
+ # ── Marker builder ──────────────────────────────────────────────────
+
+ def _build_markers(self, all_points_3d, stamp):
+ ma = MarkerArray()
+
+ # Clear previous frame's markers
+ delete = Marker()
+ delete.action = Marker.DELETEALL
+ delete.header.frame_id = 'left'
+ delete.header.stamp = stamp
+ ma.markers.append(delete)
+
+ mid = 0
+ for person_idx, person_pts in enumerate(all_points_3d):
+ r, g, b_ = colorsys.hsv_to_rgb((person_idx * 0.3) % 1.0, 1.0, 1.0)
+
+ # Sphere per keypoint
+ for kp_idx, (pt, _res) in person_pts.items():
+ m = Marker()
+ m.header.frame_id = 'left'
+ m.header.stamp = stamp
+ m.ns = f'person_{person_idx}'
+ m.id = mid; mid += 1
+ m.type = Marker.SPHERE
+ m.action = Marker.ADD
+ m.pose.position.x = float(pt[0])
+ m.pose.position.y = float(pt[1])
+ m.pose.position.z = float(pt[2])
+ m.pose.orientation.w = 1.0
+ m.scale.x = m.scale.y = m.scale.z = 0.05
+ m.color.r, m.color.g, m.color.b = r, g, b_
+ m.color.a = 1.0
+ m.lifetime.nanosec = 300_000_000
+ ma.markers.append(m)
+
+ # Skeleton lines
+ for j1, j2 in SKELETON:
+ if j1 in person_pts and j2 in person_pts:
+ m = Marker()
+ m.header.frame_id = 'left'
+ m.header.stamp = stamp
+ m.ns = f'skel_{person_idx}'
+ m.id = mid; mid += 1
+ m.type = Marker.LINE_STRIP
+ m.action = Marker.ADD
+ m.points = [
+ Point(x=float(person_pts[j1][0][0]),
+ y=float(person_pts[j1][0][1]),
+ z=float(person_pts[j1][0][2])),
+ Point(x=float(person_pts[j2][0][0]),
+ y=float(person_pts[j2][0][1]),
+ z=float(person_pts[j2][0][2])),
+ ]
+ m.scale.x = 0.02
+ m.color.r, m.color.g, m.color.b = r, g, b_
+ m.color.a = 0.8
+ m.lifetime.nanosec = 300_000_000
+ ma.markers.append(m)
+
+ return ma
+
+ # ── Main synced callback ────────────────────────────────────────────
+
+ def _synced_cb(self, left_msg: Image, right_msg: Image):
+ if self._stereo is None:
+ return # waiting for camera_info
+
+ # Convert images (handles Bayer -> BGR automatically)
+ left_frame = self._bridge.imgmsg_to_cv2(
+ left_msg, desired_encoding='bgr8')
+ right_frame = self._bridge.imgmsg_to_cv2(
+ right_msg, desired_encoding='bgr8')
+
+ # Run MMPose on both views
+ left_people = self._run_mmpose(left_frame)
+ right_people = self._run_mmpose(right_frame)
+
+ # Match people across cameras
+ matches = self._match_people(left_people, right_people)
+
+ # Triangulate every mutually-confident keypoint
+ all_points_3d = []
+ for lp, rp in matches:
+ person_pts = {} # kp_idx -> (xyz, residual)
+ for kp_idx in range(17):
+ if (lp['scores'][kp_idx] <= self._threshold or
+ rp['scores'][kp_idx] <= self._threshold):
+ continue
+
+ u_l, v_l = lp['keypoints'][kp_idx]
+ u_r, v_r = rp['keypoints'][kp_idx]
+
+ d1 = self._pixel_to_ray(u_l, v_l, 'left')
+ d2 = self._pixel_to_ray(u_r, v_r, 'right')
+
+ pt3d, residual = self._triangulate(d1, d2)
+ if pt3d is not None and residual < self._max_residual:
+ person_pts[kp_idx] = (pt3d, residual)
+
+ if person_pts:
+ all_points_3d.append(person_pts)
+
+ # Publish 3D markers
+ self._marker_pub.publish(
+ self._build_markers(all_points_3d, left_msg.header.stamp))
+
+ # Draw 2D keypoints on display frames
+ if left_people:
+ draw_keypoints(
+ left_frame,
+ [p['keypoints'] for p in left_people],
+ [p['scores'] for p in left_people],
+ self._threshold)
+ if right_people:
+ draw_keypoints(
+ right_frame,
+ [p['keypoints'] for p in right_people],
+ [p['scores'] for p in right_people],
+ self._threshold)
+
+ n_3d = sum(len(p) for p in all_points_3d)
cv2.putText(
- frame,
- f'Subjects: {len(all_kps)}',
- (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX,
- 0.8,
- (255, 255, 255),
- 2,
- )
- return frame
+ left_frame,
+ f'People: {len(left_people)} 3D pts: {n_3d}',
+ (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
+ cv2.putText(
+ right_frame,
+ f'People: {len(right_people)} Matched: {len(matches)}',
+ (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
- def _left_cb(self, msg: Image) -> None:
- self._left_frame = self._process_frame(msg, 'left')
-
- def _right_cb(self, msg: Image) -> None:
- self._right_frame = self._process_frame(msg, 'right')
+ self._left_display = left_frame
+ self._right_display = right_frame
# ── Display timer ───────────────────────────────────────────────────
- def _display_timer_cb(self) -> None:
- if self._left_frame is not None:
- cv2.imshow('Left - MMPose', self._left_frame)
- if self._right_frame is not None:
- cv2.imshow('Right - MMPose', self._right_frame)
+ def _display_timer_cb(self):
+ if self._left_display is not None:
+ cv2.imshow('Left - Keypoints', self._left_display)
+ if self._right_display is not None:
+ cv2.imshow('Right - Keypoints', self._right_display)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
- self.get_logger().info('Quit requested — shutting down.')
+ self.get_logger().info('Quit requested.')
self.destroy_node()
rclpy.shutdown()
@@ -198,7 +441,7 @@ class KeypointNode(Node):
def main(args=None):
rclpy.init(args=args)
- node = KeypointNode()
+ node = KeypointTriangulationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
diff --git a/keypoint_pose/launch/keypoint_node.launch.py b/keypoint_pose/launch/keypoint_node.launch.py
new file mode 100644
index 0000000..c7b2b9f
--- /dev/null
+++ b/keypoint_pose/launch/keypoint_node.launch.py
@@ -0,0 +1,31 @@
+"""Launch keypoint_node using the mmpose conda environment's Python."""
+
+import os
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+
+
+def generate_launch_description():
+ python_exe = os.path.expanduser(
+ '~/miniconda3/envs/mmpose/bin/python3'
+ )
+
+ node_module = 'keypoint_pose.keypoint_node'
+
+ return LaunchDescription([
+ ExecuteProcess(
+ cmd=[
+ python_exe, '-m', node_module,
+ '--ros-args',
+ '-p', 'threshold:=0.3',
+ '-p', 'device:=cuda:0',
+ '-p', 'max_residual:=0.10',
+ ],
+ output='screen',
+ env={
+ **os.environ,
+ 'DISPLAY': os.environ.get('DISPLAY', ':1'),
+ 'QT_QPA_PLATFORM_PLUGIN_PATH': '',
+ },
+ ),
+ ])
diff --git a/keypoint_pose/package.xml b/keypoint_pose/package.xml
index deb7875..7ecbcac 100644
--- a/keypoint_pose/package.xml
+++ b/keypoint_pose/package.xml
@@ -10,6 +10,9 @@
rclpy
sensor_msgs
cv_bridge
+ message_filters
+ visualization_msgs
+ geometry_msgs
ament_copyright
ament_flake8
diff --git a/keypoint_pose/setup.cfg b/keypoint_pose/setup.cfg
index 9fe19aa..4502bea 100644
--- a/keypoint_pose/setup.cfg
+++ b/keypoint_pose/setup.cfg
@@ -2,3 +2,5 @@
script_dir=$base/lib/keypoint_pose
[install]
install_scripts=$base/lib/keypoint_pose
+[build_scripts]
+executable=/usr/bin/env python3
diff --git a/keypoint_pose/setup.py b/keypoint_pose/setup.py
index 4ef54e5..395a41f 100644
--- a/keypoint_pose/setup.py
+++ b/keypoint_pose/setup.py
@@ -1,4 +1,6 @@
from setuptools import find_packages, setup
+import os
+from glob import glob
package_name = 'keypoint_pose'
@@ -10,6 +12,7 @@ setup(
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
+ (os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
],
install_requires=['setuptools'],
zip_safe=True,
diff --git a/start_cameras_only.launch.py b/start_cameras_only.launch.py
new file mode 100644
index 0000000..12c55ba
--- /dev/null
+++ b/start_cameras_only.launch.py
@@ -0,0 +1,77 @@
+"""Launch camera drivers only (no rectification or stereo processing)."""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch_ros.actions import ComposableNodeContainer, Node
+from launch_ros.descriptions import ComposableNode
+
+LEFT_CAMERA_NAME = 'left'
+RIGHT_CAMERA_NAME = 'right'
+
+
+def generate_launch_description():
+ config_path = os.path.join(
+ get_package_share_directory('spinnaker_camera_driver'),
+ 'config',
+ 'blackfly_s.yaml'
+ )
+
+ # ── Camera Drivers (component_container_mt) ─────────────────────────
+ #
+ # Both cameras share a single process because the FLIR Spinnaker SDK
+ # requires USB3 cameras to negotiate bandwidth within one process.
+ #
+ # Topics published (per camera):
+ # /stereo/{left,right}/image_raw (sensor_msgs/Image)
+ # /stereo/{left,right}/camera_info (sensor_msgs/CameraInfo)
+ camera_container = ComposableNodeContainer(
+ name='camera_container',
+ namespace='',
+ package='rclcpp_components',
+ executable='component_container_mt',
+ composable_node_descriptions=[
+ ComposableNode(
+ package='spinnaker_camera_driver',
+ plugin='spinnaker_camera_driver::CameraDriver',
+ name=LEFT_CAMERA_NAME,
+ namespace='stereo',
+ parameters=[{
+ 'parameter_file': config_path,
+ 'serial_number': '25282106',
+ 'camera_info_url': 'file:///home/sentry/camera_ws/stereoCal/left.yaml',
+ }],
+ extra_arguments=[{'use_intra_process_comms': True}],
+ ),
+ ComposableNode(
+ package='spinnaker_camera_driver',
+ plugin='spinnaker_camera_driver::CameraDriver',
+ name=RIGHT_CAMERA_NAME,
+ namespace='stereo',
+ parameters=[{
+ 'parameter_file': config_path,
+ 'serial_number': '25235293',
+ 'camera_info_url': 'file:///home/sentry/camera_ws/stereoCal/right.yaml',
+ }],
+ extra_arguments=[{'use_intra_process_comms': True}],
+ ),
+ ],
+ output='screen',
+ )
+
+ # ── Static TF: world -> left ─────────────────────────────────────────
+ tf_publisher = Node(
+ package='tf2_ros',
+ executable='static_transform_publisher',
+ arguments=[
+ '--x', '0', '--y', '0', '--z', '0',
+ '--yaw', '0', '--pitch', '0', '--roll', '0',
+ '--frame-id', 'world',
+ '--child-frame-id', 'left',
+ ],
+ )
+
+ return LaunchDescription([
+ camera_container,
+ tf_publisher,
+ ])