launchfile and new keypoint fusion method

This commit is contained in:
Aditya Pulipaka
2026-03-04 15:34:57 -06:00
parent 7178ec89a4
commit 36200f010a
6 changed files with 454 additions and 95 deletions

View File

@@ -1,26 +1,36 @@
""" """
ROS 2 node that subscribes to rectified stereo images, runs MMPose ROS 2 node: stereo keypoint triangulation using MMPose.
keypoint detection, and displays the results in OpenCV windows.
Subscribes to raw stereo images, runs pose estimation on each,
matches keypoints across cameras, and triangulates 3D positions.
Published topics:
/keypoint_markers (visualization_msgs/MarkerArray) -- for RViz
""" """
import colorsys
import cv2 import cv2
import numpy as np import numpy as np
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from sensor_msgs.msg import Image from sensor_msgs.msg import Image, CameraInfo
from visualization_msgs.msg import Marker, MarkerArray
from geometry_msgs.msg import Point
from cv_bridge import CvBridge from cv_bridge import CvBridge
from message_filters import Subscriber, ApproximateTimeSynchronizer
from mmpose.apis import MMPoseInferencer from mmpose.apis import MMPoseInferencer
# ── COCO-17 drawing helpers ───────────────────────────────────────────── # ── COCO-17 constants ───────────────────────────────────────────────────
SKELETON = [ SKELETON = [
(0, 1), (0, 2), # nose eyes (0, 1), (0, 2), # nose -> eyes
(1, 3), (2, 4), # eyes ears (1, 3), (2, 4), # eyes -> ears
(5, 6), # shoulders (5, 6), # shoulders
(5, 7), (7, 9), # left arm (5, 7), (7, 9), # left arm
(6, 8), (8, 10), # right arm (6, 8), (8, 10), # right arm
(5, 11), (6, 12), # shoulders hips (5, 11), (6, 12), # shoulders -> hips
(11, 12), # hips (11, 12), # hips
(11, 13), (13, 15), # left leg (11, 13), (13, 15), # left leg
(12, 14), (14, 16), # right leg (12, 14), (14, 16), # right leg
@@ -39,7 +49,6 @@ def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
h, w = frame.shape[:2] h, w = frame.shape[:2]
for person_kps, person_scores in zip(keypoints, keypoint_scores): for person_kps, person_scores in zip(keypoints, keypoint_scores):
# Skeleton lines
for j1, j2 in SKELETON: for j1, j2 in SKELETON:
if person_scores[j1] > threshold and person_scores[j2] > threshold: if person_scores[j1] > threshold and person_scores[j2] > threshold:
pt1 = (int(person_kps[j1][0]), int(person_kps[j1][1])) pt1 = (int(person_kps[j1][0]), int(person_kps[j1][1]))
@@ -48,7 +57,6 @@ def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
0 <= pt2[0] < w and 0 <= pt2[1] < h): 0 <= pt2[0] < w and 0 <= pt2[1] < h):
cv2.line(frame, pt1, pt2, (0, 255, 0), 2) cv2.line(frame, pt1, pt2, (0, 255, 0), 2)
# Joint circles
for idx, (kp, score) in enumerate(zip(person_kps, person_scores)): for idx, (kp, score) in enumerate(zip(person_kps, person_scores)):
if score > threshold: if score > threshold:
x, y = int(kp[0]), int(kp[1]) x, y = int(kp[0]), int(kp[1])
@@ -62,8 +70,8 @@ def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
# ── ROS 2 Node ─────────────────────────────────────────────────────────── # ── ROS 2 Node ───────────────────────────────────────────────────────────
class KeypointNode(Node): class KeypointTriangulationNode(Node):
"""Subscribe to stereo rectified images, run MMPose, and display.""" """Stereo keypoint triangulation with MMPose."""
def __init__(self): def __init__(self):
super().__init__('keypoint_node') super().__init__('keypoint_node')
@@ -71,125 +79,360 @@ class KeypointNode(Node):
# ── Parameters ────────────────────────────────────────────────── # ── Parameters ──────────────────────────────────────────────────
self.declare_parameter('threshold', 0.3) self.declare_parameter('threshold', 0.3)
self.declare_parameter('device', 'cuda:0') self.declare_parameter('device', 'cuda:0')
self.declare_parameter('max_residual', 0.10) # metres
threshold = self.get_parameter('threshold').value self._threshold = self.get_parameter('threshold').value
self._max_residual = self.get_parameter('max_residual').value
device = self.get_parameter('device').value device = self.get_parameter('device').value
self._threshold = threshold
# ── MMPose (single shared instance) ───────────────────────────── # ── MMPose (single shared instance) ─────────────────────────────
self.get_logger().info( self.get_logger().info(f'Loading MMPose on {device} ...')
f'Loading MMPose model on {device} (this may take a moment)…' self._inferencer = MMPoseInferencer(pose2d='human', device=device)
) self.get_logger().info('MMPose loaded.')
self._inferencer = MMPoseInferencer(
pose2d='human',
device=device,
)
self.get_logger().info('MMPose model loaded.')
# ── cv_bridge ─────────────────────────────────────────────────── # ── cv_bridge ───────────────────────────────────────────────────
self._bridge = CvBridge() self._bridge = CvBridge()
# Latest frames (written by subscribers, read by display timer) # ── Calibration (filled from camera_info subscribers) ───────────
self._left_frame: np.ndarray | None = None self._calib = {'left': None, 'right': None}
self._right_frame: np.ndarray | None = None self._stereo = None # computed once both camera_infos arrive
# ── Subscribers (independent — no time-sync needed) ─────────────
self.create_subscription( self.create_subscription(
Image, CameraInfo, '/stereo/left/camera_info',
'/stereo/left/image_rect', lambda m: self._info_cb(m, 'left'), 10)
self._left_cb,
10,
)
self.create_subscription( self.create_subscription(
Image, CameraInfo, '/stereo/right/camera_info',
'/stereo/right/image_rect', lambda m: self._info_cb(m, 'right'), 10)
self._right_cb,
10,
)
# ── Timer for cv2.waitKey (keeps GUI responsive) ──────────────── # ── Time-synced image subscribers ───────────────────────────────
# ~30 Hz is plenty for display refresh left_sub = Subscriber(self, Image, '/stereo/left/image_raw')
right_sub = Subscriber(self, Image, '/stereo/right/image_raw')
self._sync = ApproximateTimeSynchronizer(
[left_sub, right_sub], queue_size=5, slop=0.05)
self._sync.registerCallback(self._synced_cb)
# ── Publisher ───────────────────────────────────────────────────
self._marker_pub = self.create_publisher(
MarkerArray, '/keypoint_markers', 10)
# ── Display state ───────────────────────────────────────────────
self._left_display = None
self._right_display = None
self.create_timer(1.0 / 30.0, self._display_timer_cb) self.create_timer(1.0 / 30.0, self._display_timer_cb)
self.get_logger().info( self.get_logger().info(
'Subscribed to /stereo/left/image_rect and ' 'Waiting for camera_info and synced image_raw from '
'/stereo/right/image_rect — waiting for images…' '/stereo/left and /stereo/right ...')
)
# ── Subscriber callbacks ──────────────────────────────────────────── # ── Camera info / stereo geometry ───────────────────────────────────
def _process_frame(self, msg: Image, label: str) -> np.ndarray: def _info_cb(self, msg: CameraInfo, side: str):
"""Convert ROS Image -> OpenCV, run MMPose, draw keypoints.""" if self._calib[side] is not None:
# Log image metadata once to help diagnose blank-frame issues return
if not hasattr(self, f'_logged_{label}'): self._calib[side] = {
'K': np.array(msg.k, dtype=np.float64).reshape(3, 3),
'D': np.array(msg.d, dtype=np.float64),
'R_rect': np.array(msg.r, dtype=np.float64).reshape(3, 3),
'P': np.array(msg.p, dtype=np.float64).reshape(3, 4),
}
self.get_logger().info( self.get_logger().info(
f'[{label}] First frame: encoding={msg.encoding} ' f'{side} camera_info received ({msg.width}x{msg.height})')
f'size={msg.width}x{msg.height} step={msg.step}' self._try_compute_stereo()
)
setattr(self, f'_logged_{label}', True)
# Use passthrough first so we can inspect the raw data def _try_compute_stereo(self):
frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough') if self._calib['left'] is None or self._calib['right'] is None:
return
if self._stereo is not None:
return
if not hasattr(self, f'_logged_px_{label}'): R1 = self._calib['left']['R_rect']
R2 = self._calib['right']['R_rect']
P_R = self._calib['right']['P']
# Baseline: P_right[0,3] = fx_rect * tx (tx is negative)
tx = P_R[0, 3] / P_R[0, 0]
# Right camera origin in left (world) frame:
# O_right = -R1^T @ [tx, 0, 0]
O_right = -R1.T @ np.array([tx, 0.0, 0.0])
# Rotation that transforms a direction vector from the right
# camera frame into the left (world) frame:
# d_world = R_right_to_world @ d_right_cam
R_right_to_world = R1.T @ R2
self._stereo = {
'O_right': O_right,
'R_right_to_world': R_right_to_world,
}
baseline = np.linalg.norm(O_right)
self.get_logger().info( self.get_logger().info(
f'[{label}] Frame dtype={frame.dtype} shape={frame.shape} ' f'Stereo geometry ready. Baseline = {baseline:.4f} m '
f'min={frame.min()} max={frame.max()}' f'O_right = {O_right.round(4).tolist()}')
)
setattr(self, f'_logged_px_{label}', True)
# Convert grayscale to BGR so MMPose and drawing work correctly # ── Pixel -> ray ────────────────────────────────────────────────────
if len(frame.shape) == 2 or frame.shape[2] == 1:
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
elif frame.shape[2] == 4:
frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
def _pixel_to_ray(self, u, v, side):
"""Undistort pixel (u,v) and return a unit ray in the world frame."""
K = self._calib[side]['K']
D = self._calib[side]['D']
# cv2.undistortPoints returns normalised coordinates (K^-1 applied
# and distortion corrected).
pts = np.array([[[u, v]]], dtype=np.float64)
norm = cv2.undistortPoints(pts, K, D)
d_cam = np.array([norm[0, 0, 0], norm[0, 0, 1], 1.0])
if side == 'right':
d_world = self._stereo['R_right_to_world'] @ d_cam
else:
d_world = d_cam # left camera IS the world frame
return d_world / np.linalg.norm(d_world)
# ── Triangulation (closest point of approach) ───────────────────────
def _triangulate(self, d1, d2):
"""
Given two unit rays from O_left and O_right, find the 3D midpoint
of closest approach and the residual (ray separation).
Returns (point_3d, residual) or (None, inf) if rays are parallel.
"""
O1 = np.zeros(3)
O2 = self._stereo['O_right']
b = O2 - O1 # baseline vector
# Solve s*d1 - t*d2 = b in the least-squares sense:
# | d1.d1 -d1.d2 | |s| |d1.b|
# | d1.d2 -d2.d2 | |t| = |d2.b|
d1d1 = d1 @ d1
d2d2 = d2 @ d2
d1d2 = d1 @ d2
denom = d1d1 * d2d2 - d1d2 * d1d2
if abs(denom) < 1e-12:
return None, float('inf')
d1b = d1 @ b
d2b = d2 @ b
s = (d1b * d2d2 - d2b * d1d2) / denom
t = (d1b * d1d2 - d2b * d1d1) / denom
p1 = O1 + s * d1
p2 = O2 + t * d2
return (p1 + p2) / 2.0, np.linalg.norm(p1 - p2)
# ── MMPose helper ───────────────────────────────────────────────────
def _run_mmpose(self, frame):
"""Return list of dicts with 'keypoints' (N,2) and 'scores' (N,)."""
result = next(self._inferencer( result = next(self._inferencer(
frame, frame, show=False, return_datasamples=False))
show=False,
return_datasamples=False,
))
predictions = result.get('predictions', [[]])[0] people = []
all_kps, all_scores = [], [] for pred in result.get('predictions', [[]])[0]:
for pred in predictions:
kps = pred.get('keypoints', []) kps = pred.get('keypoints', [])
scores = pred.get('keypoint_scores', []) scores = pred.get('keypoint_scores', [])
if len(kps) > 0: if len(kps) > 0:
all_kps.append(np.array(kps)) people.append({
all_scores.append(np.array(scores)) 'keypoints': np.array(kps),
'scores': np.array(scores),
})
return people
if all_kps: # ── Person matching across cameras ──────────────────────────────────
draw_keypoints(frame, all_kps, all_scores, self._threshold)
def _match_people(self, left_people, right_people):
"""
Match people across stereo views using vertical-coordinate
similarity (epipolar heuristic: same person has similar y in
a roughly-rectified stereo pair).
Returns list of (left_person, right_person) tuples.
"""
if not left_people or not right_people:
return []
thr = self._threshold
n_l, n_r = len(left_people), len(right_people)
# Cost matrix: mean |y-difference| for mutually confident keypoints
cost = np.full((n_l, n_r), np.inf)
for i, lp in enumerate(left_people):
for j, rp in enumerate(right_people):
mask = (lp['scores'] > thr) & (rp['scores'] > thr)
if mask.sum() < 3:
continue
cost[i, j] = np.abs(
lp['keypoints'][mask, 1] - rp['keypoints'][mask, 1]
).mean()
# Greedy matching by lowest cost
matches = []
used_l, used_r = set(), set()
while True:
remaining = np.full_like(cost, np.inf)
for i in range(n_l):
for j in range(n_r):
if i not in used_l and j not in used_r:
remaining[i, j] = cost[i, j]
idx = np.unravel_index(remaining.argmin(), remaining.shape)
if remaining[idx] > 100.0 or np.isinf(remaining[idx]):
break
matches.append((left_people[idx[0]], right_people[idx[1]]))
used_l.add(idx[0])
used_r.add(idx[1])
return matches
# ── Marker builder ──────────────────────────────────────────────────
def _build_markers(self, all_points_3d, stamp):
ma = MarkerArray()
# Clear previous frame's markers
delete = Marker()
delete.action = Marker.DELETEALL
delete.header.frame_id = 'left'
delete.header.stamp = stamp
ma.markers.append(delete)
mid = 0
for person_idx, person_pts in enumerate(all_points_3d):
r, g, b_ = colorsys.hsv_to_rgb((person_idx * 0.3) % 1.0, 1.0, 1.0)
# Sphere per keypoint
for kp_idx, (pt, _res) in person_pts.items():
m = Marker()
m.header.frame_id = 'left'
m.header.stamp = stamp
m.ns = f'person_{person_idx}'
m.id = mid; mid += 1
m.type = Marker.SPHERE
m.action = Marker.ADD
m.pose.position.x = float(pt[0])
m.pose.position.y = float(pt[1])
m.pose.position.z = float(pt[2])
m.pose.orientation.w = 1.0
m.scale.x = m.scale.y = m.scale.z = 0.05
m.color.r, m.color.g, m.color.b = r, g, b_
m.color.a = 1.0
m.lifetime.nanosec = 300_000_000
ma.markers.append(m)
# Skeleton lines
for j1, j2 in SKELETON:
if j1 in person_pts and j2 in person_pts:
m = Marker()
m.header.frame_id = 'left'
m.header.stamp = stamp
m.ns = f'skel_{person_idx}'
m.id = mid; mid += 1
m.type = Marker.LINE_STRIP
m.action = Marker.ADD
m.points = [
Point(x=float(person_pts[j1][0][0]),
y=float(person_pts[j1][0][1]),
z=float(person_pts[j1][0][2])),
Point(x=float(person_pts[j2][0][0]),
y=float(person_pts[j2][0][1]),
z=float(person_pts[j2][0][2])),
]
m.scale.x = 0.02
m.color.r, m.color.g, m.color.b = r, g, b_
m.color.a = 0.8
m.lifetime.nanosec = 300_000_000
ma.markers.append(m)
return ma
# ── Main synced callback ────────────────────────────────────────────
def _synced_cb(self, left_msg: Image, right_msg: Image):
if self._stereo is None:
return # waiting for camera_info
# Convert images (handles Bayer -> BGR automatically)
left_frame = self._bridge.imgmsg_to_cv2(
left_msg, desired_encoding='bgr8')
right_frame = self._bridge.imgmsg_to_cv2(
right_msg, desired_encoding='bgr8')
# Run MMPose on both views
left_people = self._run_mmpose(left_frame)
right_people = self._run_mmpose(right_frame)
# Match people across cameras
matches = self._match_people(left_people, right_people)
# Triangulate every mutually-confident keypoint
all_points_3d = []
for lp, rp in matches:
person_pts = {} # kp_idx -> (xyz, residual)
for kp_idx in range(17):
if (lp['scores'][kp_idx] <= self._threshold or
rp['scores'][kp_idx] <= self._threshold):
continue
u_l, v_l = lp['keypoints'][kp_idx]
u_r, v_r = rp['keypoints'][kp_idx]
d1 = self._pixel_to_ray(u_l, v_l, 'left')
d2 = self._pixel_to_ray(u_r, v_r, 'right')
pt3d, residual = self._triangulate(d1, d2)
if pt3d is not None and residual < self._max_residual:
person_pts[kp_idx] = (pt3d, residual)
if person_pts:
all_points_3d.append(person_pts)
# Publish 3D markers
self._marker_pub.publish(
self._build_markers(all_points_3d, left_msg.header.stamp))
# Draw 2D keypoints on display frames
if left_people:
draw_keypoints(
left_frame,
[p['keypoints'] for p in left_people],
[p['scores'] for p in left_people],
self._threshold)
if right_people:
draw_keypoints(
right_frame,
[p['keypoints'] for p in right_people],
[p['scores'] for p in right_people],
self._threshold)
n_3d = sum(len(p) for p in all_points_3d)
cv2.putText( cv2.putText(
frame, left_frame,
f'Subjects: {len(all_kps)}', f'People: {len(left_people)} 3D pts: {n_3d}',
(10, 30), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.FONT_HERSHEY_SIMPLEX, cv2.putText(
0.8, right_frame,
(255, 255, 255), f'People: {len(right_people)} Matched: {len(matches)}',
2, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
)
return frame
def _left_cb(self, msg: Image) -> None: self._left_display = left_frame
self._left_frame = self._process_frame(msg, 'left') self._right_display = right_frame
def _right_cb(self, msg: Image) -> None:
self._right_frame = self._process_frame(msg, 'right')
# ── Display timer ─────────────────────────────────────────────────── # ── Display timer ───────────────────────────────────────────────────
def _display_timer_cb(self) -> None: def _display_timer_cb(self):
if self._left_frame is not None: if self._left_display is not None:
cv2.imshow('Left - MMPose', self._left_frame) cv2.imshow('Left - Keypoints', self._left_display)
if self._right_frame is not None: if self._right_display is not None:
cv2.imshow('Right - MMPose', self._right_frame) cv2.imshow('Right - Keypoints', self._right_display)
key = cv2.waitKey(1) & 0xFF key = cv2.waitKey(1) & 0xFF
if key == ord('q'): if key == ord('q'):
self.get_logger().info('Quit requested — shutting down.') self.get_logger().info('Quit requested.')
self.destroy_node() self.destroy_node()
rclpy.shutdown() rclpy.shutdown()
@@ -198,7 +441,7 @@ class KeypointNode(Node):
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = KeypointNode() node = KeypointTriangulationNode()
try: try:
rclpy.spin(node) rclpy.spin(node)
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@@ -0,0 +1,31 @@
"""Launch keypoint_node using the mmpose conda environment's Python."""
import os
from launch import LaunchDescription
from launch.actions import ExecuteProcess
def generate_launch_description():
python_exe = os.path.expanduser(
'~/miniconda3/envs/mmpose/bin/python3'
)
node_module = 'keypoint_pose.keypoint_node'
return LaunchDescription([
ExecuteProcess(
cmd=[
python_exe, '-m', node_module,
'--ros-args',
'-p', 'threshold:=0.3',
'-p', 'device:=cuda:0',
'-p', 'max_residual:=0.10',
],
output='screen',
env={
**os.environ,
'DISPLAY': os.environ.get('DISPLAY', ':1'),
'QT_QPA_PLATFORM_PLUGIN_PATH': '',
},
),
])

View File

@@ -10,6 +10,9 @@
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>sensor_msgs</depend> <depend>sensor_msgs</depend>
<depend>cv_bridge</depend> <depend>cv_bridge</depend>
<depend>message_filters</depend>
<depend>visualization_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/keypoint_pose script_dir=$base/lib/keypoint_pose
[install] [install]
install_scripts=$base/lib/keypoint_pose install_scripts=$base/lib/keypoint_pose
[build_scripts]
executable=/usr/bin/env python3

View File

@@ -1,4 +1,6 @@
from setuptools import find_packages, setup from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'keypoint_pose' package_name = 'keypoint_pose'
@@ -10,6 +12,7 @@ setup(
('share/ament_index/resource_index/packages', ('share/ament_index/resource_index/packages',
['resource/' + package_name]), ['resource/' + package_name]),
('share/' + package_name, ['package.xml']), ('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
], ],
install_requires=['setuptools'], install_requires=['setuptools'],
zip_safe=True, zip_safe=True,

View File

@@ -0,0 +1,77 @@
"""Launch camera drivers only (no rectification or stereo processing)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import ComposableNodeContainer, Node
from launch_ros.descriptions import ComposableNode
LEFT_CAMERA_NAME = 'left'
RIGHT_CAMERA_NAME = 'right'
def generate_launch_description():
config_path = os.path.join(
get_package_share_directory('spinnaker_camera_driver'),
'config',
'blackfly_s.yaml'
)
# ── Camera Drivers (component_container_mt) ─────────────────────────
#
# Both cameras share a single process because the FLIR Spinnaker SDK
# requires USB3 cameras to negotiate bandwidth within one process.
#
# Topics published (per camera):
# /stereo/{left,right}/image_raw (sensor_msgs/Image)
# /stereo/{left,right}/camera_info (sensor_msgs/CameraInfo)
camera_container = ComposableNodeContainer(
name='camera_container',
namespace='',
package='rclcpp_components',
executable='component_container_mt',
composable_node_descriptions=[
ComposableNode(
package='spinnaker_camera_driver',
plugin='spinnaker_camera_driver::CameraDriver',
name=LEFT_CAMERA_NAME,
namespace='stereo',
parameters=[{
'parameter_file': config_path,
'serial_number': '25282106',
'camera_info_url': 'file:///home/sentry/camera_ws/stereoCal/left.yaml',
}],
extra_arguments=[{'use_intra_process_comms': True}],
),
ComposableNode(
package='spinnaker_camera_driver',
plugin='spinnaker_camera_driver::CameraDriver',
name=RIGHT_CAMERA_NAME,
namespace='stereo',
parameters=[{
'parameter_file': config_path,
'serial_number': '25235293',
'camera_info_url': 'file:///home/sentry/camera_ws/stereoCal/right.yaml',
}],
extra_arguments=[{'use_intra_process_comms': True}],
),
],
output='screen',
)
# ── Static TF: world -> left ─────────────────────────────────────────
tf_publisher = Node(
package='tf2_ros',
executable='static_transform_publisher',
arguments=[
'--x', '0', '--y', '0', '--z', '0',
'--yaw', '0', '--pitch', '0', '--roll', '0',
'--frame-id', 'world',
'--child-frame-id', 'left',
],
)
return LaunchDescription([
camera_container,
tf_publisher,
])