sensible names

This commit is contained in:
2026-03-15 13:14:40 -05:00
parent 4a16258286
commit e65b1b4ccb
11 changed files with 12 additions and 12 deletions

View File

@@ -0,0 +1,506 @@
"""
ROS 2 node: stereo keypoint triangulation using MMPose.
Subscribes to raw stereo images, runs pose estimation on each,
matches keypoints across cameras, and triangulates 3D positions.
Published topics:
/keypoint_markers (visualization_msgs/MarkerArray) -- for RViz
"""
import colorsys
import cv2
import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from visualization_msgs.msg import Marker, MarkerArray
from geometry_msgs.msg import Point
from cv_bridge import CvBridge
from message_filters import Subscriber, ApproximateTimeSynchronizer
from mmpose.apis import MMPoseInferencer
# ── COCO-17 constants ───────────────────────────────────────────────────
SKELETON = [
(0, 1), (0, 2), # nose -> eyes
(1, 3), (2, 4), # eyes -> ears
(5, 6), # shoulders
(5, 7), (7, 9), # left arm
(6, 8), (8, 10), # right arm
(5, 11), (6, 12), # shoulders -> hips
(11, 12), # hips
(11, 13), (13, 15), # left leg
(12, 14), (14, 16), # right leg
]
KEYPOINT_NAMES = [
'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
'left_knee', 'right_knee', 'left_ankle', 'right_ankle',
]
# Minimum confidence in BOTH camera feeds to include a keypoint in distance calc
DIST_THRESHOLD = 0.75
def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
"""Draw COCO 17-keypoint skeleton on *frame* (in-place)."""
h, w = frame.shape[:2]
for person_kps, person_scores in zip(keypoints, keypoint_scores):
for j1, j2 in SKELETON:
if person_scores[j1] > threshold and person_scores[j2] > threshold:
pt1 = (int(person_kps[j1][0]), int(person_kps[j1][1]))
pt2 = (int(person_kps[j2][0]), int(person_kps[j2][1]))
if (0 <= pt1[0] < w and 0 <= pt1[1] < h and
0 <= pt2[0] < w and 0 <= pt2[1] < h):
cv2.line(frame, pt1, pt2, (0, 255, 0), 2)
for kp, score in zip(person_kps, person_scores):
if score > threshold:
x, y = int(kp[0]), int(kp[1])
if 0 <= x < w and 0 <= y < h:
color = (0, 255, 0) if score > 0.7 else (0, 200, 200)
cv2.circle(frame, (x, y), 4, color, -1)
cv2.circle(frame, (x, y), 5, (255, 255, 255), 1)
return frame
# ── ROS 2 Node ───────────────────────────────────────────────────────────
class KeypointTriangulationNode(Node):
"""Stereo keypoint triangulation with MMPose."""
def __init__(self):
super().__init__('single_person_loc_node')
# ── Parameters ──────────────────────────────────────────────────
self.declare_parameter('threshold', 0.3)
self.declare_parameter('device', 'cuda:0')
self.declare_parameter('max_residual', 0.10) # metres
self._threshold = self.get_parameter('threshold').value
self._max_residual = self.get_parameter('max_residual').value
device = self.get_parameter('device').value
# ── MMPose (single shared instance) ─────────────────────────────
self.get_logger().info(f'Loading MMPose on {device} ...')
self._inferencer = MMPoseInferencer(pose2d='human', device=device)
self.get_logger().info('MMPose loaded.')
# ── cv_bridge ───────────────────────────────────────────────────
self._bridge = CvBridge()
# ── Calibration (filled from camera_info subscribers) ───────────
self._calib = {'left': None, 'right': None}
self._stereo = None # computed once both camera_infos arrive
self.create_subscription(
CameraInfo, '/stereo/left/camera_info',
lambda m: self._info_cb(m, 'left'), 10)
self.create_subscription(
CameraInfo, '/stereo/right/camera_info',
lambda m: self._info_cb(m, 'right'), 10)
# ── Time-synced image subscribers ───────────────────────────────
left_sub = Subscriber(self, Image, '/stereo/left/image_raw')
right_sub = Subscriber(self, Image, '/stereo/right/image_raw')
self._sync = ApproximateTimeSynchronizer(
[left_sub, right_sub], queue_size=5, slop=0.05)
self._sync.registerCallback(self._synced_cb)
# ── Publisher ───────────────────────────────────────────────────
self._marker_pub = self.create_publisher(
MarkerArray, '/keypoint_markers', 10)
# ── Display state ───────────────────────────────────────────────
self._left_display = None
self._right_display = None
self._dist_display = None
self.get_logger().info(
f'Distance window uses keypoints with >= {DIST_THRESHOLD*100:.0f}% '
'confidence in both camera feeds.')
self.create_timer(1.0 / 30.0, self._display_timer_cb)
self.get_logger().info(
'Waiting for camera_info and synced image_raw from '
'/stereo/left and /stereo/right ...')
# ── Camera info / stereo geometry ───────────────────────────────────
def _info_cb(self, msg: CameraInfo, side: str):
if self._calib[side] is not None:
return
self._calib[side] = {
'K': np.array(msg.k, dtype=np.float64).reshape(3, 3),
'D': np.array(msg.d, dtype=np.float64),
'R_rect': np.array(msg.r, dtype=np.float64).reshape(3, 3),
'P': np.array(msg.p, dtype=np.float64).reshape(3, 4),
}
self.get_logger().info(
f'{side} camera_info received ({msg.width}x{msg.height})')
self._try_compute_stereo()
def _try_compute_stereo(self):
if self._calib['left'] is None or self._calib['right'] is None:
return
if self._stereo is not None:
return
R1 = self._calib['left']['R_rect']
R2 = self._calib['right']['R_rect']
P_R = self._calib['right']['P']
# Baseline: P_right[0,3] = fx_rect * tx (tx is negative)
tx = P_R[0, 3] / P_R[0, 0]
# Right camera origin in left (world) frame:
# O_right = -R1^T @ [tx, 0, 0]
O_right = -R1.T @ np.array([tx, 0.0, 0.0])
# Rotation that transforms a direction vector from the right
# camera frame into the left (world) frame:
# d_world = R_right_to_world @ d_right_cam
R_right_to_world = R1.T @ R2
self._stereo = {
'O_right': O_right,
'R_right_to_world': R_right_to_world,
}
baseline = np.linalg.norm(O_right)
self.get_logger().info(
f'Stereo geometry ready. Baseline = {baseline:.4f} m '
f'O_right = {O_right.round(4).tolist()}')
# ── Pixel -> ray ────────────────────────────────────────────────────
def _pixel_to_ray(self, u, v, side):
"""Undistort pixel (u,v) and return a unit ray in the world frame."""
K = self._calib[side]['K']
D = self._calib[side]['D']
# cv2.undistortPoints returns normalised coordinates (K^-1 applied
# and distortion corrected).
pts = np.array([[[u, v]]], dtype=np.float64)
norm = cv2.undistortPoints(pts, K, D)
d_cam = np.array([norm[0, 0, 0], norm[0, 0, 1], 1.0])
if side == 'right':
d_world = self._stereo['R_right_to_world'] @ d_cam
else:
d_world = d_cam # left camera IS the world frame
return d_world / np.linalg.norm(d_world)
# ── Triangulation (closest point of approach) ───────────────────────
def _triangulate(self, d1, d2):
"""
Given two unit rays from O_left and O_right, find the 3D midpoint
of closest approach and the residual (ray separation).
Returns (point_3d, residual) or (None, inf) if rays are parallel.
"""
O1 = np.zeros(3)
O2 = self._stereo['O_right']
b = O2 - O1 # baseline vector
# Solve s*d1 - t*d2 = b in the least-squares sense:
# | d1.d1 -d1.d2 | |s| |d1.b|
# | d1.d2 -d2.d2 | |t| = |d2.b|
d1d1 = d1 @ d1
d2d2 = d2 @ d2
d1d2 = d1 @ d2
denom = d1d1 * d2d2 - d1d2 * d1d2
if abs(denom) < 1e-12:
return None, float('inf')
d1b = d1 @ b
d2b = d2 @ b
s = (d1b * d2d2 - d2b * d1d2) / denom
t = (d1b * d1d2 - d2b * d1d1) / denom
p1 = O1 + s * d1
p2 = O2 + t * d2
return (p1 + p2) / 2.0, np.linalg.norm(p1 - p2)
# ── MMPose helper ───────────────────────────────────────────────────
def _run_mmpose(self, frame):
"""Return list of dicts with 'keypoints' (N,2) and 'scores' (N,)."""
result = next(self._inferencer(
frame, show=False, return_datasamples=False))
people = []
for pred in result.get('predictions', [[]])[0]:
kps = pred.get('keypoints', [])
scores = pred.get('keypoint_scores', [])
if len(kps) > 0:
people.append({
'keypoints': np.array(kps),
'scores': np.array(scores),
})
return people
# ── Person matching across cameras ──────────────────────────────────
def _match_people(self, left_people, right_people):
"""
Match people across stereo views using vertical-coordinate
similarity (epipolar heuristic: same person has similar y in
a roughly-rectified stereo pair).
Returns list of (left_person, right_person) tuples.
"""
if not left_people or not right_people:
return []
thr = self._threshold
n_l, n_r = len(left_people), len(right_people)
# Cost matrix: mean |y-difference| for mutually confident keypoints
cost = np.full((n_l, n_r), np.inf)
for i, lp in enumerate(left_people):
for j, rp in enumerate(right_people):
mask = (lp['scores'] > thr) & (rp['scores'] > thr)
if mask.sum() < 3:
continue
cost[i, j] = np.abs(
lp['keypoints'][mask, 1] - rp['keypoints'][mask, 1]
).mean()
# Greedy matching by lowest cost
matches = []
used_l, used_r = set(), set()
while True:
remaining = np.full_like(cost, np.inf)
for i in range(n_l):
for j in range(n_r):
if i not in used_l and j not in used_r:
remaining[i, j] = cost[i, j]
idx = np.unravel_index(remaining.argmin(), remaining.shape)
if remaining[idx] > 100.0 or np.isinf(remaining[idx]):
break
matches.append((left_people[idx[0]], right_people[idx[1]]))
used_l.add(idx[0])
used_r.add(idx[1])
return matches
# ── Marker builder ──────────────────────────────────────────────────
def _build_markers(self, all_points_3d, stamp):
ma = MarkerArray()
# Clear previous frame's markers
delete = Marker()
delete.action = Marker.DELETEALL
delete.header.frame_id = 'left'
delete.header.stamp = stamp
ma.markers.append(delete)
mid = 0
for person_idx, person_pts in enumerate(all_points_3d):
r, g, b_ = colorsys.hsv_to_rgb((person_idx * 0.3) % 1.0, 1.0, 1.0)
# Sphere per keypoint
for kp_idx, (pt, _res) in person_pts.items():
m = Marker()
m.header.frame_id = 'left'
m.header.stamp = stamp
m.ns = f'person_{person_idx}'
m.id = mid; mid += 1
m.type = Marker.SPHERE
m.action = Marker.ADD
m.pose.position.x = float(pt[0])
m.pose.position.y = float(pt[1])
m.pose.position.z = float(pt[2])
m.pose.orientation.w = 1.0
m.scale.x = m.scale.y = m.scale.z = 0.05
m.color.r, m.color.g, m.color.b = r, g, b_
m.color.a = 1.0
m.lifetime.nanosec = 300_000_000
ma.markers.append(m)
# Skeleton lines
for j1, j2 in SKELETON:
if j1 in person_pts and j2 in person_pts:
m = Marker()
m.header.frame_id = 'left'
m.header.stamp = stamp
m.ns = f'skel_{person_idx}'
m.id = mid; mid += 1
m.type = Marker.LINE_STRIP
m.action = Marker.ADD
m.points = [
Point(x=float(person_pts[j1][0][0]),
y=float(person_pts[j1][0][1]),
z=float(person_pts[j1][0][2])),
Point(x=float(person_pts[j2][0][0]),
y=float(person_pts[j2][0][1]),
z=float(person_pts[j2][0][2])),
]
m.scale.x = 0.02
m.color.r, m.color.g, m.color.b = r, g, b_
m.color.a = 0.8
m.lifetime.nanosec = 300_000_000
ma.markers.append(m)
return ma
# ── Main synced callback ────────────────────────────────────────────
def _synced_cb(self, left_msg: Image, right_msg: Image):
if self._stereo is None:
return # waiting for camera_info
# Convert images (handles Bayer -> BGR automatically)
left_frame = self._bridge.imgmsg_to_cv2(
left_msg, desired_encoding='bgr8')
right_frame = self._bridge.imgmsg_to_cv2(
right_msg, desired_encoding='bgr8')
# Run MMPose on both views
left_people = self._run_mmpose(left_frame)
right_people = self._run_mmpose(right_frame)
# Match people across cameras
matches = self._match_people(left_people, right_people)
# Triangulate every mutually-confident keypoint
all_points_3d = []
avg_distances = [] # per-person average distance (75%+ conf kps only)
avg_coords = [] # per-person mean (x, y, z) of 75%+ conf kps
for lp, rp in matches:
person_pts = {} # kp_idx -> (xyz, residual)
high_conf_dists = []
high_conf_pts = []
for kp_idx in range(17):
if (lp['scores'][kp_idx] <= self._threshold or
rp['scores'][kp_idx] <= self._threshold):
continue
u_l, v_l = lp['keypoints'][kp_idx]
u_r, v_r = rp['keypoints'][kp_idx]
d1 = self._pixel_to_ray(u_l, v_l, 'left')
d2 = self._pixel_to_ray(u_r, v_r, 'right')
pt3d, residual = self._triangulate(d1, d2)
if pt3d is not None and residual < self._max_residual:
person_pts[kp_idx] = (pt3d, residual)
# Only count toward distance if both scores >= DIST_THRESHOLD
if (lp['scores'][kp_idx] >= DIST_THRESHOLD and
rp['scores'][kp_idx] >= DIST_THRESHOLD):
high_conf_dists.append(np.linalg.norm(pt3d))
high_conf_pts.append(pt3d)
if person_pts:
all_points_3d.append(person_pts)
avg_distances.append(
float(np.mean(high_conf_dists)) if high_conf_dists else None)
avg_coords.append(
np.mean(high_conf_pts, axis=0) if high_conf_pts else None)
# Publish 3D markers
self._marker_pub.publish(
self._build_markers(all_points_3d, left_msg.header.stamp))
# Build distance display window
row_h = 220
frame_h = max(400, 140 + row_h * max(len(avg_distances), 1))
dist_frame = np.zeros((frame_h, 1600, 3), dtype=np.uint8)
cv2.putText(dist_frame, 'Avg distance (>=75% conf keypoints)',
(20, 70), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (180, 180, 180), 3)
if avg_distances:
for i, (d, xyz) in enumerate(zip(avg_distances, avg_coords)):
if d is not None:
txt = f'Person {i + 1}: {d:.2f} m'
color = (100, 255, 100)
coord_txt = f'x={xyz[0]:+.3f} y={xyz[1]:+.3f} z={xyz[2]:+.3f} [m, left cam frame]'
coord_color = (160, 160, 160)
else:
txt = f'Person {i + 1}: -- (no 75%+ kps)'
color = (80, 80, 200)
coord_txt = ''
coord_color = (80, 80, 80)
cv2.putText(dist_frame, txt, (20, 140 + i * row_h),
cv2.FONT_HERSHEY_SIMPLEX, 4.5, color, 8)
if coord_txt:
cv2.putText(dist_frame, coord_txt, (28, 140 + i * row_h + 70),
cv2.FONT_HERSHEY_SIMPLEX, 1.4, coord_color, 2)
else:
cv2.putText(dist_frame, 'No people detected', (20, 200),
cv2.FONT_HERSHEY_SIMPLEX, 4.0, (100, 100, 100), 6)
self._dist_display = dist_frame
# Draw 2D keypoints on display frames
if left_people:
draw_keypoints(
left_frame,
[p['keypoints'] for p in left_people],
[p['scores'] for p in left_people],
self._threshold)
if right_people:
draw_keypoints(
right_frame,
[p['keypoints'] for p in right_people],
[p['scores'] for p in right_people],
self._threshold)
n_3d = sum(len(p) for p in all_points_3d)
cv2.putText(
left_frame,
f'People: {len(left_people)} 3D pts: {n_3d}',
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(
right_frame,
f'People: {len(right_people)} Matched: {len(matches)}',
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
self._left_display = left_frame
self._right_display = right_frame
# ── Display timer ───────────────────────────────────────────────────
def _display_timer_cb(self):
if self._left_display is not None:
cv2.imshow('Left - Keypoints', self._left_display)
if self._right_display is not None:
cv2.imshow('Right - Keypoints', self._right_display)
if self._dist_display is not None:
cv2.imshow('Distance (75%+ confidence)', self._dist_display)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
self.get_logger().info('Quit requested.')
self.destroy_node()
rclpy.shutdown()
# ── Entry point ──────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = KeypointTriangulationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
cv2.destroyAllWindows()
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,31 @@
"""Launch single_person_loc_node using the mmpose conda environment's Python."""
import os
from launch import LaunchDescription
from launch.actions import ExecuteProcess
def generate_launch_description():
python_exe = os.path.expanduser(
'~/miniconda3/envs/mmpose/bin/python3'
)
node_module = '3D_tracking_Re-ID.single_person_loc_node'
return LaunchDescription([
ExecuteProcess(
cmd=[
python_exe, '-m', node_module,
'--ros-args',
'-p', 'threshold:=0.3',
'-p', 'device:=cuda:0',
'-p', 'max_residual:=0.10',
],
output='screen',
env={
**os.environ,
'DISPLAY': os.environ.get('DISPLAY', ':1'),
'QT_QPA_PLATFORM_PLUGIN_PATH': '',
},
),
])

View File

@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>3D_tracking_Re-ID</name>
<version>0.0.0</version>
<description>MMPose keypoint detection on stereo rectified images</description>
<maintainer email="pulipakaa24@outlook.com">sentry</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>cv_bridge</depend>
<depend>message_filters</depend>
<depend>visualization_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

View File

@@ -0,0 +1,6 @@
[develop]
script_dir=$base/lib/3D_tracking_Re-ID
[install]
install_scripts=$base/lib/3D_tracking_Re-ID
[build_scripts]
executable=/usr/bin/env python3

View File

@@ -0,0 +1,33 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = '3D_tracking_Re-ID'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='sentry',
maintainer_email='pulipakaa24@outlook.com',
description='MMPose keypoint detection on stereo rectified images',
license='TODO: License declaration',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'single_person_loc_node = 3D_tracking_Re-ID.single_person_loc_node:main',
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'