updates
This commit is contained in:
58
tracking_re_id/launch/full_pipeline.launch.py
Normal file
58
tracking_re_id/launch/full_pipeline.launch.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Launch the full 3D tracking + ground-plane estimation pipeline.
|
||||
|
||||
Nodes started:
|
||||
1. single_person_loc_node -- headless keypoint triangulator
|
||||
publishes: /keypoint_markers (MarkerArray)
|
||||
/keypoints_3d (PointCloud2)
|
||||
2. ground_plane_node -- ground-plane estimator
|
||||
publishes: /ground_plane_markers (MarkerArray)
|
||||
/ground_plane_pose (PoseStamped)
|
||||
|
||||
Visualise in RViz with Fixed Frame = 'left', then add:
|
||||
MarkerArray /keypoint_markers (3D skeleton + keypoints)
|
||||
MarkerArray /ground_plane_markers (plane disc, normal arrow, ground samples)
|
||||
"""
|
||||
|
||||
import os
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
python_exe = os.path.expanduser(
|
||||
'~/miniconda3/envs/mmpose/bin/python3'
|
||||
)
|
||||
|
||||
return LaunchDescription([
|
||||
|
||||
# ── 1. Keypoint triangulator (headless) ─────────────────────────
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', 'tracking_re_id.single_person_loc_node',
|
||||
'--ros-args',
|
||||
'-p', 'threshold:=0.3',
|
||||
'-p', 'device:=cuda:0',
|
||||
'-p', 'max_residual:=0.10',
|
||||
'-p', 'headless:=true',
|
||||
],
|
||||
output='screen',
|
||||
env={**os.environ},
|
||||
),
|
||||
|
||||
# ── 2. Ground-plane estimator ────────────────────────────────────
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', 'tracking_re_id.ground_plane_node',
|
||||
'--ros-args',
|
||||
'-p', 'stable_frames:=5',
|
||||
'-p', 'stable_radius:=0.05',
|
||||
'-p', 'duplicate_radius:=0',
|
||||
'-p', 'collinearity_threshold:=0.25',
|
||||
'-p', 'max_ground_points:=100',
|
||||
'-p', 'min_plane_points:=5',
|
||||
],
|
||||
output='screen',
|
||||
env={**os.environ},
|
||||
),
|
||||
|
||||
])
|
||||
53
tracking_re_id/launch/ground_plane.launch.py
Normal file
53
tracking_re_id/launch/ground_plane.launch.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Launch ground_plane_node alongside single_person_loc_node (headless).
|
||||
|
||||
Runs the keypoint triangulator in headless mode and pipes its output
|
||||
into the ground-plane estimator. Both nodes share the mmpose conda
|
||||
Python environment.
|
||||
"""
|
||||
|
||||
import os
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
python_exe = os.path.expanduser(
|
||||
'~/miniconda3/envs/mmpose/bin/python3'
|
||||
)
|
||||
|
||||
return LaunchDescription([
|
||||
|
||||
# ── Keypoint triangulator (headless) ────────────────────────────
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', 'tracking_re_id.single_person_loc_node',
|
||||
'--ros-args',
|
||||
'-p', 'threshold:=0.3',
|
||||
'-p', 'device:=cuda:0',
|
||||
'-p', 'max_residual:=0.10',
|
||||
'-p', 'headless:=true',
|
||||
],
|
||||
output='screen',
|
||||
env={**os.environ},
|
||||
),
|
||||
|
||||
# ── Ground-plane estimator ───────────────────────────────────────
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', 'tracking_re_id.ground_plane_node',
|
||||
'--ros-args',
|
||||
# Foot must stay within 15 cm for 5 consecutive frames
|
||||
'-p', 'stable_frames:=5',
|
||||
'-p', 'stable_radius:=0.15',
|
||||
# Suppress ground points closer than 12 cm to an existing one
|
||||
'-p', 'duplicate_radius:=0.12',
|
||||
# Points are collinear if none deviates > 5 cm from best-fit line
|
||||
'-p', 'collinearity_threshold:=0.05',
|
||||
'-p', 'max_ground_points:=100',
|
||||
'-p', 'min_plane_points:=3',
|
||||
],
|
||||
output='screen',
|
||||
env={**os.environ},
|
||||
),
|
||||
|
||||
])
|
||||
31
tracking_re_id/launch/single_person_demo.launch.py
Normal file
31
tracking_re_id/launch/single_person_demo.launch.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""Launch single_person_loc_node using the mmpose conda environment's Python."""
|
||||
|
||||
import os
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
python_exe = os.path.expanduser(
|
||||
'~/miniconda3/envs/mmpose/bin/python3'
|
||||
)
|
||||
|
||||
node_module = 'tracking_re_id.single_person_loc_node'
|
||||
|
||||
return LaunchDescription([
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', node_module,
|
||||
'--ros-args',
|
||||
'-p', 'threshold:=0.3',
|
||||
'-p', 'device:=cuda:0',
|
||||
'-p', 'max_residual:=0.10',
|
||||
],
|
||||
output='screen',
|
||||
env={
|
||||
**os.environ,
|
||||
'DISPLAY': os.environ.get('DISPLAY', ':1'),
|
||||
'QT_QPA_PLATFORM_PLUGIN_PATH': '',
|
||||
},
|
||||
),
|
||||
])
|
||||
33
tracking_re_id/launch/single_person_headless.launch.py
Normal file
33
tracking_re_id/launch/single_person_headless.launch.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""Launch single_person_loc_node in headless mode (no display windows).
|
||||
|
||||
Publishes 3D keypoint markers to /keypoint_markers without opening any
|
||||
OpenCV windows. Useful for running on a server or as part of a larger
|
||||
pipeline where visualisation is handled elsewhere.
|
||||
"""
|
||||
|
||||
import os
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
python_exe = os.path.expanduser(
|
||||
'~/miniconda3/envs/mmpose/bin/python3'
|
||||
)
|
||||
|
||||
node_module = 'tracking_re_id.single_person_loc_node'
|
||||
|
||||
return LaunchDescription([
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', node_module,
|
||||
'--ros-args',
|
||||
'-p', 'threshold:=0.3',
|
||||
'-p', 'device:=cuda:0',
|
||||
'-p', 'max_residual:=0.10',
|
||||
'-p', 'headless:=true',
|
||||
],
|
||||
output='screen',
|
||||
env={**os.environ},
|
||||
),
|
||||
])
|
||||
25
tracking_re_id/package.xml
Normal file
25
tracking_re_id/package.xml
Normal file
@@ -0,0 +1,25 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>tracking_re_id</name>
|
||||
<version>0.0.0</version>
|
||||
<description>MMPose keypoint detection on stereo rectified images</description>
|
||||
<maintainer email="pulipakaa24@outlook.com">sentry</maintainer>
|
||||
<license>TODO: License declaration</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>cv_bridge</depend>
|
||||
<depend>message_filters</depend>
|
||||
<depend>visualization_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
0
tracking_re_id/resource/keypoint_pose
Normal file
0
tracking_re_id/resource/keypoint_pose
Normal file
0
tracking_re_id/resource/tracking_re_id
Normal file
0
tracking_re_id/resource/tracking_re_id
Normal file
6
tracking_re_id/setup.cfg
Normal file
6
tracking_re_id/setup.cfg
Normal file
@@ -0,0 +1,6 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/3D_tracking_Re-ID
|
||||
[install]
|
||||
install_scripts=$base/lib/3D_tracking_Re-ID
|
||||
[build_scripts]
|
||||
executable=/usr/bin/env python3
|
||||
34
tracking_re_id/setup.py
Normal file
34
tracking_re_id/setup.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from setuptools import find_packages, setup
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
package_name = 'tracking_re_id'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.0.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='sentry',
|
||||
maintainer_email='pulipakaa24@outlook.com',
|
||||
description='MMPose keypoint detection on stereo rectified images',
|
||||
license='TODO: License declaration',
|
||||
extras_require={
|
||||
'test': [
|
||||
'pytest',
|
||||
],
|
||||
},
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'single_person_loc_node = tracking_re_id.single_person_loc_node:main',
|
||||
'ground_plane_node = tracking_re_id.ground_plane_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
25
tracking_re_id/test/test_copyright.py
Normal file
25
tracking_re_id/test/test_copyright.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# Copyright 2015 Open Source Robotics Foundation, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ament_copyright.main import main
|
||||
import pytest
|
||||
|
||||
|
||||
# Remove the `skip` decorator once the source file(s) have a copyright header
|
||||
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
|
||||
@pytest.mark.copyright
|
||||
@pytest.mark.linter
|
||||
def test_copyright():
|
||||
rc = main(argv=['.', 'test'])
|
||||
assert rc == 0, 'Found errors'
|
||||
25
tracking_re_id/test/test_flake8.py
Normal file
25
tracking_re_id/test/test_flake8.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# Copyright 2017 Open Source Robotics Foundation, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ament_flake8.main import main_with_errors
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.flake8
|
||||
@pytest.mark.linter
|
||||
def test_flake8():
|
||||
rc, errors = main_with_errors(argv=[])
|
||||
assert rc == 0, \
|
||||
'Found %d code style errors / warnings:\n' % len(errors) + \
|
||||
'\n'.join(errors)
|
||||
23
tracking_re_id/test/test_pep257.py
Normal file
23
tracking_re_id/test/test_pep257.py
Normal file
@@ -0,0 +1,23 @@
|
||||
# Copyright 2015 Open Source Robotics Foundation, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ament_pep257.main import main
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.linter
|
||||
@pytest.mark.pep257
|
||||
def test_pep257():
|
||||
rc = main(argv=['.', 'test'])
|
||||
assert rc == 0, 'Found code style errors / warnings'
|
||||
0
tracking_re_id/tracking_re_id/__init__.py
Normal file
0
tracking_re_id/tracking_re_id/__init__.py
Normal file
426
tracking_re_id/tracking_re_id/ground_plane_node.py
Normal file
426
tracking_re_id/tracking_re_id/ground_plane_node.py
Normal file
@@ -0,0 +1,426 @@
|
||||
"""
|
||||
ROS 2 node: ground-plane estimator from stable foot keypoints.
|
||||
|
||||
Subscribes to /keypoints_3d (PointCloud2) published by
|
||||
single_person_loc_node. Detects locations where foot keypoints remain
|
||||
stable for several consecutive frames, registers those locations as
|
||||
ground-plane samples, and fits a least-squares plane once three or more
|
||||
non-collinear samples have been accumulated.
|
||||
|
||||
Published topics:
|
||||
/ground_plane_pose (geometry_msgs/PoseStamped) -- plane origin + orientation
|
||||
/ground_plane_markers (visualization_msgs/MarkerArray) -- RViz visualisation
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import PointCloud2
|
||||
from sensor_msgs_py import point_cloud2 as pc2
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from visualization_msgs.msg import Marker, MarkerArray
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# TUNABLE PARAMETERS
|
||||
# All of these are also exposed as ROS parameters so they can be overridden
|
||||
# from a launch file or the command line without editing source.
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
# Number of *consecutive* frames a foot keypoint must appear within
|
||||
# STABLE_RADIUS of the same cluster centroid before that centroid is
|
||||
# registered as a ground-plane sample.
|
||||
STABLE_FRAMES: int = 5
|
||||
|
||||
# Two foot positions are considered the "same location" if they lie within
|
||||
# this radius (metres) of the running cluster centroid.
|
||||
STABLE_RADIUS: float = 0.05
|
||||
|
||||
# A newly registered ground-plane sample is suppressed (treated as a
|
||||
# duplicate) if it lies within this distance (metres) of any already-
|
||||
# registered sample. Prevents the same standing spot from being added
|
||||
# repeatedly. Set to 0 to disable.
|
||||
DUPLICATE_RADIUS: float = 0
|
||||
|
||||
# Minimum perpendicular distance (metres) that at least one registered
|
||||
# sample must lie from the best-fit line through all current samples for
|
||||
# the point set to be considered non-collinear. Points are always recorded;
|
||||
# plane publication is gated on this condition being satisfied.
|
||||
COLLINEARITY_THRESHOLD: float = 0.25
|
||||
|
||||
# Maximum number of ground-plane samples retained for the least-squares fit.
|
||||
# When the limit is reached, the oldest sample is discarded (sliding window).
|
||||
MAX_GROUND_POINTS: int = 100
|
||||
|
||||
# Minimum number of registered ground-plane samples required before any
|
||||
# plane is published. Must be >= 3.
|
||||
MIN_PLANE_POINTS: int = 3
|
||||
|
||||
# COCO-17 keypoint indices treated as foot/ankle keypoints.
|
||||
# 15 = left_ankle, 16 = right_ankle
|
||||
FOOT_KP_INDICES: frozenset = frozenset({15, 16})
|
||||
|
||||
# Radius (metres) of the semi-transparent plane disc drawn in RViz.
|
||||
PLANE_VIS_RADIUS: float = 2.0
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# Helpers
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def _quat_from_z_to_normal(n: np.ndarray) -> np.ndarray:
|
||||
"""Return quaternion [x, y, z, w] that rotates (0,0,1) onto unit vector n."""
|
||||
z = np.array([0.0, 0.0, 1.0])
|
||||
n = n / np.linalg.norm(n)
|
||||
dot = float(np.clip(np.dot(z, n), -1.0, 1.0))
|
||||
|
||||
if dot > 0.9999:
|
||||
return np.array([0.0, 0.0, 0.0, 1.0])
|
||||
if dot < -0.9999:
|
||||
# 180° rotation around x-axis
|
||||
return np.array([1.0, 0.0, 0.0, 0.0])
|
||||
|
||||
axis = np.cross(z, n)
|
||||
axis /= np.linalg.norm(axis)
|
||||
half = np.arccos(dot) / 2.0
|
||||
s = np.sin(half)
|
||||
return np.array([axis[0] * s, axis[1] * s, axis[2] * s, np.cos(half)])
|
||||
|
||||
|
||||
def _fit_plane(pts: np.ndarray):
|
||||
"""
|
||||
Fit a plane to an (N, 3) array of points via PCA / SVD.
|
||||
|
||||
Returns
|
||||
-------
|
||||
centroid : np.ndarray (3,)
|
||||
normal : np.ndarray (3,) -- unit normal, z-component >= 0
|
||||
max_off_line : float -- max perpendicular distance of any point
|
||||
from the best-fit *line* through centroid;
|
||||
zero when N == 1.
|
||||
"""
|
||||
centroid = pts.mean(axis=0)
|
||||
if len(pts) < 2:
|
||||
return centroid, np.array([0.0, 0.0, 1.0]), 0.0
|
||||
|
||||
centered = pts - centroid
|
||||
_, _, Vt = np.linalg.svd(centered, full_matrices=False)
|
||||
|
||||
# Best-fit line direction (largest singular value)
|
||||
line_dir = Vt[0]
|
||||
proj = (centered @ line_dir)[:, None] * line_dir
|
||||
residuals = np.linalg.norm(centered - proj, axis=1)
|
||||
max_off_line = float(residuals.max())
|
||||
|
||||
# Plane normal (smallest singular value)
|
||||
normal = Vt[-1]
|
||||
if normal[2] < 0:
|
||||
normal = -normal
|
||||
|
||||
return centroid, normal, max_off_line
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# Node
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class GroundPlaneNode(Node):
|
||||
"""Estimates a ground plane from stable foot-keypoint positions."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('ground_plane_node')
|
||||
|
||||
# ── ROS parameters (fall back to module-level constants) ─────────
|
||||
self.declare_parameter('stable_frames', STABLE_FRAMES)
|
||||
self.declare_parameter('stable_radius', STABLE_RADIUS)
|
||||
self.declare_parameter('duplicate_radius', DUPLICATE_RADIUS)
|
||||
self.declare_parameter('collinearity_threshold', COLLINEARITY_THRESHOLD)
|
||||
self.declare_parameter('max_ground_points', MAX_GROUND_POINTS)
|
||||
self.declare_parameter('min_plane_points', MIN_PLANE_POINTS)
|
||||
|
||||
self._stable_frames = self.get_parameter('stable_frames').value
|
||||
self._stable_radius = self.get_parameter('stable_radius').value
|
||||
self._dup_radius = self.get_parameter('duplicate_radius').value
|
||||
self._collinear_thr = self.get_parameter('collinearity_threshold').value
|
||||
self._max_pts = self.get_parameter('max_ground_points').value
|
||||
self._min_pts = max(3, self.get_parameter('min_plane_points').value)
|
||||
|
||||
# ── State ────────────────────────────────────────────────────────
|
||||
# Active foot-position candidates, each a dict:
|
||||
# centroid (np.array 3) -- running mean of positions seen so far
|
||||
# _sum (np.array 3) -- accumulator for running mean
|
||||
# _n (int) -- total points in sum (for running mean)
|
||||
# streak (int) -- consecutive frames seen
|
||||
# seen (bool) -- updated during current frame
|
||||
# registered (bool) -- True once this candidate has fired;
|
||||
# prevents re-registering the same spot
|
||||
# while the foot stays put
|
||||
self._candidates: list[dict] = []
|
||||
|
||||
# Registered ground-plane samples (list of np.ndarray(3))
|
||||
self._ground_pts: list[np.ndarray] = []
|
||||
|
||||
# Current plane state
|
||||
self._plane_valid: bool = False
|
||||
self._plane_normal: np.ndarray | None = None
|
||||
self._plane_origin: np.ndarray | None = None
|
||||
|
||||
# ── ROS I/O ──────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
PointCloud2, '/keypoints_3d', self._kp_cb, 10)
|
||||
|
||||
self._pose_pub = self.create_publisher(
|
||||
PoseStamped, '/ground_plane_pose', 10)
|
||||
self._marker_pub = self.create_publisher(
|
||||
MarkerArray, '/ground_plane_markers', 10)
|
||||
|
||||
self.get_logger().info(
|
||||
f'Ground-plane node ready. '
|
||||
f'stable_frames={self._stable_frames} '
|
||||
f'stable_radius={self._stable_radius} m '
|
||||
f'collinearity_threshold={self._collinear_thr} m')
|
||||
|
||||
# ── Keypoint callback ────────────────────────────────────────────────
|
||||
|
||||
def _kp_cb(self, msg: PointCloud2):
|
||||
# Collect foot positions from this frame
|
||||
foot_positions: list[np.ndarray] = []
|
||||
for pt in pc2.read_points(
|
||||
msg, field_names=('x', 'y', 'z', 'kp_id'), skip_nans=True):
|
||||
if int(pt[3]) in FOOT_KP_INDICES:
|
||||
foot_positions.append(np.array([pt[0], pt[1], pt[2]],
|
||||
dtype=np.float64))
|
||||
|
||||
self._update_candidates(foot_positions)
|
||||
self._publish(msg.header.stamp)
|
||||
|
||||
# ── Candidate tracking ───────────────────────────────────────────────
|
||||
|
||||
def _update_candidates(self, foot_positions: list[np.ndarray]):
|
||||
# Reset "seen" flag for all candidates at the start of each frame
|
||||
for c in self._candidates:
|
||||
c['seen'] = False
|
||||
|
||||
for pos in foot_positions:
|
||||
# Associate pos with the nearest candidate within stable_radius
|
||||
best_i, best_d = -1, float('inf')
|
||||
for i, c in enumerate(self._candidates):
|
||||
d = float(np.linalg.norm(pos - c['centroid']))
|
||||
if d < best_d:
|
||||
best_d, best_i = d, i
|
||||
|
||||
if best_i >= 0 and best_d < self._stable_radius:
|
||||
c = self._candidates[best_i]
|
||||
c['_sum'] += pos
|
||||
c['_n'] += 1
|
||||
c['centroid'] = c['_sum'] / c['_n']
|
||||
c['streak'] += 1
|
||||
c['seen'] = True
|
||||
else:
|
||||
self._candidates.append({
|
||||
'centroid': pos.copy(),
|
||||
'_sum': pos.copy(),
|
||||
'_n': 1,
|
||||
'streak': 1,
|
||||
'seen': True,
|
||||
'registered': False,
|
||||
})
|
||||
|
||||
# Process matured / stale candidates (iterate in reverse for safe removal)
|
||||
for i in reversed(range(len(self._candidates))):
|
||||
c = self._candidates[i]
|
||||
if not c['seen']:
|
||||
# Foot left — discard candidate regardless of registration state
|
||||
del self._candidates[i]
|
||||
elif not c['registered'] and c['streak'] >= self._stable_frames:
|
||||
# Matured for the first time — register and keep alive
|
||||
self._try_register(c['centroid'])
|
||||
c['registered'] = True
|
||||
|
||||
# ── Ground-point registration ────────────────────────────────────────
|
||||
|
||||
def _try_register(self, pt: np.ndarray):
|
||||
"""Add pt as a ground-plane sample if it is not a near-duplicate."""
|
||||
if self._dup_radius > 0:
|
||||
for existing in self._ground_pts:
|
||||
if np.linalg.norm(pt - existing) < self._dup_radius:
|
||||
return # duplicate — discard
|
||||
|
||||
self.get_logger().info(
|
||||
f'Ground sample #{len(self._ground_pts) + 1}: '
|
||||
f'({pt[0]:.3f}, {pt[1]:.3f}, {pt[2]:.3f})')
|
||||
|
||||
self._ground_pts.append(pt.copy())
|
||||
if len(self._ground_pts) > self._max_pts:
|
||||
self._ground_pts.pop(0)
|
||||
|
||||
self._refit_plane()
|
||||
|
||||
# ── Plane fitting ────────────────────────────────────────────────────
|
||||
|
||||
def _refit_plane(self):
|
||||
n = len(self._ground_pts)
|
||||
if n < self._min_pts:
|
||||
self._plane_valid = False
|
||||
return
|
||||
|
||||
pts = np.array(self._ground_pts)
|
||||
centroid, normal, max_off_line = _fit_plane(pts)
|
||||
|
||||
if max_off_line < self._collinear_thr:
|
||||
self._plane_valid = False
|
||||
self.get_logger().info(
|
||||
f'{n} samples but all nearly collinear '
|
||||
f'(max off-line = {max_off_line:.4f} m, '
|
||||
f'threshold = {self._collinear_thr} m). '
|
||||
f'Plane not published until a non-collinear point is found.')
|
||||
return
|
||||
|
||||
# Move origin to the point on the new plane closest to the old origin.
|
||||
# If there is no prior origin, use the centroid.
|
||||
old_origin = self._plane_origin if self._plane_origin is not None else centroid
|
||||
t = float(np.dot(old_origin - centroid, normal))
|
||||
new_origin = old_origin - t * normal
|
||||
|
||||
self._plane_normal = normal
|
||||
self._plane_origin = new_origin
|
||||
self._plane_valid = True
|
||||
|
||||
self.get_logger().info(
|
||||
f'Plane (re)fit: normal=({normal[0]:.3f}, {normal[1]:.3f}, {normal[2]:.3f}) '
|
||||
f'origin=({new_origin[0]:.3f}, {new_origin[1]:.3f}, {new_origin[2]:.3f}) '
|
||||
f'n_samples={n} max_off_line={max_off_line:.4f} m')
|
||||
|
||||
# ── Publishing ───────────────────────────────────────────────────────
|
||||
|
||||
def _publish(self, stamp):
|
||||
ma = MarkerArray()
|
||||
|
||||
# Clear previous markers
|
||||
delete = Marker()
|
||||
delete.action = Marker.DELETEALL
|
||||
delete.header.frame_id = 'left'
|
||||
delete.header.stamp = stamp
|
||||
ma.markers.append(delete)
|
||||
|
||||
mid = 0
|
||||
|
||||
# Registered ground-plane samples — orange spheres
|
||||
for pt in self._ground_pts:
|
||||
m = Marker()
|
||||
m.header.frame_id = 'left'
|
||||
m.header.stamp = stamp
|
||||
m.ns = 'ground_samples'
|
||||
m.id = mid; mid += 1
|
||||
m.type = Marker.SPHERE
|
||||
m.action = Marker.ADD
|
||||
m.pose.position.x = float(pt[0])
|
||||
m.pose.position.y = float(pt[1])
|
||||
m.pose.position.z = float(pt[2])
|
||||
m.pose.orientation.w = 1.0
|
||||
m.scale.x = m.scale.y = m.scale.z = 0.08
|
||||
m.color.r = 1.0; m.color.g = 0.45; m.color.b = 0.0
|
||||
m.color.a = 1.0
|
||||
ma.markers.append(m)
|
||||
|
||||
# Active tracking candidates — dim yellow spheres whose alpha
|
||||
# grows as the streak approaches stable_frames
|
||||
for c in self._candidates:
|
||||
pt = c['centroid']
|
||||
alpha = float(min(c['streak'] / self._stable_frames, 1.0)) * 0.7
|
||||
m = Marker()
|
||||
m.header.frame_id = 'left'
|
||||
m.header.stamp = stamp
|
||||
m.ns = 'candidates'
|
||||
m.id = mid; mid += 1
|
||||
m.type = Marker.SPHERE
|
||||
m.action = Marker.ADD
|
||||
m.pose.position.x = float(pt[0])
|
||||
m.pose.position.y = float(pt[1])
|
||||
m.pose.position.z = float(pt[2])
|
||||
m.pose.orientation.w = 1.0
|
||||
m.scale.x = m.scale.y = m.scale.z = 0.05
|
||||
m.color.r = 0.9; m.color.g = 0.9; m.color.b = 0.0
|
||||
m.color.a = alpha
|
||||
ma.markers.append(m)
|
||||
|
||||
if self._plane_valid:
|
||||
n = self._plane_normal
|
||||
o = self._plane_origin
|
||||
q = _quat_from_z_to_normal(n)
|
||||
|
||||
# Semi-transparent cyan disc representing the plane
|
||||
disc = Marker()
|
||||
disc.header.frame_id = 'left'
|
||||
disc.header.stamp = stamp
|
||||
disc.ns = 'plane_disc'
|
||||
disc.id = mid; mid += 1
|
||||
disc.type = Marker.CYLINDER
|
||||
disc.action = Marker.ADD
|
||||
disc.pose.position.x = float(o[0])
|
||||
disc.pose.position.y = float(o[1])
|
||||
disc.pose.position.z = float(o[2])
|
||||
disc.pose.orientation.x = float(q[0])
|
||||
disc.pose.orientation.y = float(q[1])
|
||||
disc.pose.orientation.z = float(q[2])
|
||||
disc.pose.orientation.w = float(q[3])
|
||||
disc.scale.x = disc.scale.y = PLANE_VIS_RADIUS * 2.0
|
||||
disc.scale.z = 0.005
|
||||
disc.color.r = 0.0; disc.color.g = 0.75; disc.color.b = 1.0
|
||||
disc.color.a = 0.25
|
||||
ma.markers.append(disc)
|
||||
|
||||
# Green arrow showing the plane normal
|
||||
arrow = Marker()
|
||||
arrow.header.frame_id = 'left'
|
||||
arrow.header.stamp = stamp
|
||||
arrow.ns = 'plane_normal'
|
||||
arrow.id = mid; mid += 1
|
||||
arrow.type = Marker.ARROW
|
||||
arrow.action = Marker.ADD
|
||||
arrow.pose.position.x = float(o[0])
|
||||
arrow.pose.position.y = float(o[1])
|
||||
arrow.pose.position.z = float(o[2])
|
||||
arrow.pose.orientation.x = float(q[0])
|
||||
arrow.pose.orientation.y = float(q[1])
|
||||
arrow.pose.orientation.z = float(q[2])
|
||||
arrow.pose.orientation.w = float(q[3])
|
||||
arrow.scale.x = 0.4 # length
|
||||
arrow.scale.y = 0.03 # shaft diameter
|
||||
arrow.scale.z = 0.06 # head diameter
|
||||
arrow.color.r = 0.0; arrow.color.g = 1.0; arrow.color.b = 0.4
|
||||
arrow.color.a = 1.0
|
||||
ma.markers.append(arrow)
|
||||
|
||||
# PoseStamped — origin at plane centre, z-axis along normal
|
||||
ps = PoseStamped()
|
||||
ps.header.frame_id = 'left'
|
||||
ps.header.stamp = stamp
|
||||
ps.pose.position.x = float(o[0])
|
||||
ps.pose.position.y = float(o[1])
|
||||
ps.pose.position.z = float(o[2])
|
||||
ps.pose.orientation.x = float(q[0])
|
||||
ps.pose.orientation.y = float(q[1])
|
||||
ps.pose.orientation.z = float(q[2])
|
||||
ps.pose.orientation.w = float(q[3])
|
||||
self._pose_pub.publish(ps)
|
||||
|
||||
self._marker_pub.publish(ma)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = GroundPlaneNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
540
tracking_re_id/tracking_re_id/single_person_loc_node.py
Normal file
540
tracking_re_id/tracking_re_id/single_person_loc_node.py
Normal file
@@ -0,0 +1,540 @@
|
||||
"""
|
||||
ROS 2 node: stereo keypoint triangulation using MMPose.
|
||||
|
||||
Subscribes to raw stereo images, runs pose estimation on each,
|
||||
matches keypoints across cameras, and triangulates 3D positions.
|
||||
|
||||
Published topics:
|
||||
/keypoint_markers (visualization_msgs/MarkerArray) -- for RViz
|
||||
"""
|
||||
|
||||
import colorsys
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import Image, CameraInfo, PointCloud2, PointField
|
||||
from std_msgs.msg import Header
|
||||
from visualization_msgs.msg import Marker, MarkerArray
|
||||
from geometry_msgs.msg import Point
|
||||
from cv_bridge import CvBridge
|
||||
from sensor_msgs_py import point_cloud2 as pc2
|
||||
from message_filters import Subscriber, ApproximateTimeSynchronizer
|
||||
from mmpose.apis import MMPoseInferencer
|
||||
|
||||
|
||||
# ── COCO-17 constants ───────────────────────────────────────────────────
|
||||
|
||||
SKELETON = [
|
||||
(0, 1), (0, 2), # nose -> eyes
|
||||
(1, 3), (2, 4), # eyes -> ears
|
||||
(5, 6), # shoulders
|
||||
(5, 7), (7, 9), # left arm
|
||||
(6, 8), (8, 10), # right arm
|
||||
(5, 11), (6, 12), # shoulders -> hips
|
||||
(11, 12), # hips
|
||||
(11, 13), (13, 15), # left leg
|
||||
(12, 14), (14, 16), # right leg
|
||||
]
|
||||
|
||||
KEYPOINT_NAMES = [
|
||||
'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
|
||||
'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
|
||||
'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
|
||||
'left_knee', 'right_knee', 'left_ankle', 'right_ankle',
|
||||
]
|
||||
|
||||
# Minimum confidence in BOTH camera feeds to include a keypoint in distance calc
|
||||
DIST_THRESHOLD = 0.75
|
||||
|
||||
|
||||
def draw_keypoints(frame, keypoints, keypoint_scores, threshold=0.3):
|
||||
"""Draw COCO 17-keypoint skeleton on *frame* (in-place)."""
|
||||
h, w = frame.shape[:2]
|
||||
|
||||
for person_kps, person_scores in zip(keypoints, keypoint_scores):
|
||||
for j1, j2 in SKELETON:
|
||||
if person_scores[j1] > threshold and person_scores[j2] > threshold:
|
||||
pt1 = (int(person_kps[j1][0]), int(person_kps[j1][1]))
|
||||
pt2 = (int(person_kps[j2][0]), int(person_kps[j2][1]))
|
||||
if (0 <= pt1[0] < w and 0 <= pt1[1] < h and
|
||||
0 <= pt2[0] < w and 0 <= pt2[1] < h):
|
||||
cv2.line(frame, pt1, pt2, (0, 255, 0), 2)
|
||||
|
||||
for kp, score in zip(person_kps, person_scores):
|
||||
if score > threshold:
|
||||
x, y = int(kp[0]), int(kp[1])
|
||||
if 0 <= x < w and 0 <= y < h:
|
||||
color = (0, 255, 0) if score > 0.7 else (0, 200, 200)
|
||||
cv2.circle(frame, (x, y), 4, color, -1)
|
||||
cv2.circle(frame, (x, y), 5, (255, 255, 255), 1)
|
||||
|
||||
return frame
|
||||
|
||||
|
||||
# ── ROS 2 Node ───────────────────────────────────────────────────────────
|
||||
|
||||
class KeypointTriangulationNode(Node):
|
||||
"""Stereo keypoint triangulation with MMPose."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('single_person_loc_node')
|
||||
|
||||
# ── Parameters ──────────────────────────────────────────────────
|
||||
self.declare_parameter('threshold', 0.3)
|
||||
self.declare_parameter('device', 'cuda:0')
|
||||
self.declare_parameter('max_residual', 0.10) # metres
|
||||
self.declare_parameter('headless', False)
|
||||
|
||||
self._threshold = self.get_parameter('threshold').value
|
||||
self._max_residual = self.get_parameter('max_residual').value
|
||||
device = self.get_parameter('device').value
|
||||
self._headless = self.get_parameter('headless').value
|
||||
|
||||
# ── MMPose (single shared instance) ─────────────────────────────
|
||||
self.get_logger().info(f'Loading MMPose on {device} ...')
|
||||
self._inferencer = MMPoseInferencer(pose2d='human', device=device)
|
||||
self.get_logger().info('MMPose loaded.')
|
||||
|
||||
# ── cv_bridge ───────────────────────────────────────────────────
|
||||
self._bridge = CvBridge()
|
||||
|
||||
# ── Calibration (filled from camera_info subscribers) ───────────
|
||||
self._calib = {'left': None, 'right': None}
|
||||
self._stereo = None # computed once both camera_infos arrive
|
||||
|
||||
self.create_subscription(
|
||||
CameraInfo, '/stereo/left/camera_info',
|
||||
lambda m: self._info_cb(m, 'left'), 10)
|
||||
self.create_subscription(
|
||||
CameraInfo, '/stereo/right/camera_info',
|
||||
lambda m: self._info_cb(m, 'right'), 10)
|
||||
|
||||
# ── Time-synced image subscribers ───────────────────────────────
|
||||
left_sub = Subscriber(self, Image, '/stereo/left/image_raw')
|
||||
right_sub = Subscriber(self, Image, '/stereo/right/image_raw')
|
||||
self._sync = ApproximateTimeSynchronizer(
|
||||
[left_sub, right_sub], queue_size=5, slop=0.05)
|
||||
self._sync.registerCallback(self._synced_cb)
|
||||
|
||||
# ── Publishers ──────────────────────────────────────────────────
|
||||
self._marker_pub = self.create_publisher(
|
||||
MarkerArray, '/keypoint_markers', 10)
|
||||
self._kp3d_pub = self.create_publisher(
|
||||
PointCloud2, '/keypoints_3d', 10)
|
||||
|
||||
# PointCloud2 field layout: x, y, z, person_id, kp_id (all float32)
|
||||
self._pc2_fields = [
|
||||
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='person_id', offset=12, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='kp_id', offset=16, datatype=PointField.FLOAT32, count=1),
|
||||
]
|
||||
|
||||
# ── Display state ───────────────────────────────────────────────
|
||||
if not self._headless:
|
||||
self._left_display = None
|
||||
self._right_display = None
|
||||
self._dist_display = None
|
||||
self.get_logger().info(
|
||||
f'Distance window uses keypoints with >= {DIST_THRESHOLD*100:.0f}% '
|
||||
'confidence in both camera feeds.')
|
||||
self.create_timer(1.0 / 30.0, self._display_timer_cb)
|
||||
else:
|
||||
self.get_logger().info('Running in headless mode — no display windows.')
|
||||
|
||||
self.get_logger().info(
|
||||
'Waiting for camera_info and synced image_raw from '
|
||||
'/stereo/left and /stereo/right ...')
|
||||
|
||||
# ── Camera info / stereo geometry ───────────────────────────────────
|
||||
|
||||
def _info_cb(self, msg: CameraInfo, side: str):
|
||||
if self._calib[side] is not None:
|
||||
return
|
||||
self._calib[side] = {
|
||||
'K': np.array(msg.k, dtype=np.float64).reshape(3, 3),
|
||||
'D': np.array(msg.d, dtype=np.float64),
|
||||
'R_rect': np.array(msg.r, dtype=np.float64).reshape(3, 3),
|
||||
'P': np.array(msg.p, dtype=np.float64).reshape(3, 4),
|
||||
}
|
||||
self.get_logger().info(
|
||||
f'{side} camera_info received ({msg.width}x{msg.height})')
|
||||
self._try_compute_stereo()
|
||||
|
||||
def _try_compute_stereo(self):
|
||||
if self._calib['left'] is None or self._calib['right'] is None:
|
||||
return
|
||||
if self._stereo is not None:
|
||||
return
|
||||
|
||||
R1 = self._calib['left']['R_rect']
|
||||
R2 = self._calib['right']['R_rect']
|
||||
P_R = self._calib['right']['P']
|
||||
|
||||
# Baseline: P_right[0,3] = fx_rect * tx (tx is negative)
|
||||
tx = P_R[0, 3] / P_R[0, 0]
|
||||
|
||||
# Right camera origin in left (world) frame:
|
||||
# O_right = -R1^T @ [tx, 0, 0]
|
||||
O_right = -R1.T @ np.array([tx, 0.0, 0.0])
|
||||
|
||||
# Rotation that transforms a direction vector from the right
|
||||
# camera frame into the left (world) frame:
|
||||
# d_world = R_right_to_world @ d_right_cam
|
||||
R_right_to_world = R1.T @ R2
|
||||
|
||||
self._stereo = {
|
||||
'O_right': O_right,
|
||||
'R_right_to_world': R_right_to_world,
|
||||
}
|
||||
|
||||
baseline = np.linalg.norm(O_right)
|
||||
self.get_logger().info(
|
||||
f'Stereo geometry ready. Baseline = {baseline:.4f} m '
|
||||
f'O_right = {O_right.round(4).tolist()}')
|
||||
|
||||
# ── Pixel -> ray ────────────────────────────────────────────────────
|
||||
|
||||
def _pixel_to_ray(self, u, v, side):
|
||||
"""Undistort pixel (u,v) and return a unit ray in the world frame."""
|
||||
K = self._calib[side]['K']
|
||||
D = self._calib[side]['D']
|
||||
|
||||
# cv2.undistortPoints returns normalised coordinates (K^-1 applied
|
||||
# and distortion corrected).
|
||||
pts = np.array([[[u, v]]], dtype=np.float64)
|
||||
norm = cv2.undistortPoints(pts, K, D)
|
||||
d_cam = np.array([norm[0, 0, 0], norm[0, 0, 1], 1.0])
|
||||
|
||||
if side == 'right':
|
||||
d_world = self._stereo['R_right_to_world'] @ d_cam
|
||||
else:
|
||||
d_world = d_cam # left camera IS the world frame
|
||||
|
||||
return d_world / np.linalg.norm(d_world)
|
||||
|
||||
# ── Triangulation (closest point of approach) ───────────────────────
|
||||
|
||||
def _triangulate(self, d1, d2):
|
||||
"""
|
||||
Given two unit rays from O_left and O_right, find the 3D midpoint
|
||||
of closest approach and the residual (ray separation).
|
||||
|
||||
Returns (point_3d, residual) or (None, inf) if rays are parallel.
|
||||
"""
|
||||
O1 = np.zeros(3)
|
||||
O2 = self._stereo['O_right']
|
||||
b = O2 - O1 # baseline vector
|
||||
|
||||
# Solve s*d1 - t*d2 = b in the least-squares sense:
|
||||
# | d1.d1 -d1.d2 | |s| |d1.b|
|
||||
# | d1.d2 -d2.d2 | |t| = |d2.b|
|
||||
d1d1 = d1 @ d1
|
||||
d2d2 = d2 @ d2
|
||||
d1d2 = d1 @ d2
|
||||
|
||||
denom = d1d1 * d2d2 - d1d2 * d1d2
|
||||
if abs(denom) < 1e-12:
|
||||
return None, float('inf')
|
||||
|
||||
d1b = d1 @ b
|
||||
d2b = d2 @ b
|
||||
|
||||
s = (d1b * d2d2 - d2b * d1d2) / denom
|
||||
t = (d1b * d1d2 - d2b * d1d1) / denom
|
||||
|
||||
p1 = O1 + s * d1
|
||||
p2 = O2 + t * d2
|
||||
|
||||
return (p1 + p2) / 2.0, np.linalg.norm(p1 - p2)
|
||||
|
||||
# ── MMPose helper ───────────────────────────────────────────────────
|
||||
|
||||
def _run_mmpose(self, frame):
|
||||
"""Return list of dicts with 'keypoints' (N,2) and 'scores' (N,)."""
|
||||
result = next(self._inferencer(
|
||||
frame, show=False, return_datasamples=False))
|
||||
|
||||
people = []
|
||||
for pred in result.get('predictions', [[]])[0]:
|
||||
kps = pred.get('keypoints', [])
|
||||
scores = pred.get('keypoint_scores', [])
|
||||
if len(kps) > 0:
|
||||
people.append({
|
||||
'keypoints': np.array(kps),
|
||||
'scores': np.array(scores),
|
||||
})
|
||||
return people
|
||||
|
||||
# ── Person matching across cameras ──────────────────────────────────
|
||||
|
||||
def _match_people(self, left_people, right_people):
|
||||
"""
|
||||
Match people across stereo views using vertical-coordinate
|
||||
similarity (epipolar heuristic: same person has similar y in
|
||||
a roughly-rectified stereo pair).
|
||||
|
||||
Returns list of (left_person, right_person) tuples.
|
||||
"""
|
||||
if not left_people or not right_people:
|
||||
return []
|
||||
|
||||
thr = self._threshold
|
||||
n_l, n_r = len(left_people), len(right_people)
|
||||
|
||||
# Cost matrix: mean |y-difference| for mutually confident keypoints
|
||||
cost = np.full((n_l, n_r), np.inf)
|
||||
for i, lp in enumerate(left_people):
|
||||
for j, rp in enumerate(right_people):
|
||||
mask = (lp['scores'] > thr) & (rp['scores'] > thr)
|
||||
if mask.sum() < 3:
|
||||
continue
|
||||
cost[i, j] = np.abs(
|
||||
lp['keypoints'][mask, 1] - rp['keypoints'][mask, 1]
|
||||
).mean()
|
||||
|
||||
# Greedy matching by lowest cost
|
||||
matches = []
|
||||
used_l, used_r = set(), set()
|
||||
while True:
|
||||
remaining = np.full_like(cost, np.inf)
|
||||
for i in range(n_l):
|
||||
for j in range(n_r):
|
||||
if i not in used_l and j not in used_r:
|
||||
remaining[i, j] = cost[i, j]
|
||||
idx = np.unravel_index(remaining.argmin(), remaining.shape)
|
||||
if remaining[idx] > 100.0 or np.isinf(remaining[idx]):
|
||||
break
|
||||
matches.append((left_people[idx[0]], right_people[idx[1]]))
|
||||
used_l.add(idx[0])
|
||||
used_r.add(idx[1])
|
||||
|
||||
return matches
|
||||
|
||||
# ── Marker builder ──────────────────────────────────────────────────
|
||||
|
||||
def _build_markers(self, all_points_3d, stamp):
|
||||
ma = MarkerArray()
|
||||
|
||||
# Clear previous frame's markers
|
||||
delete = Marker()
|
||||
delete.action = Marker.DELETEALL
|
||||
delete.header.frame_id = 'left'
|
||||
delete.header.stamp = stamp
|
||||
ma.markers.append(delete)
|
||||
|
||||
mid = 0
|
||||
for person_idx, person_pts in enumerate(all_points_3d):
|
||||
r, g, b_ = colorsys.hsv_to_rgb((person_idx * 0.3) % 1.0, 1.0, 1.0)
|
||||
|
||||
# Sphere per keypoint
|
||||
for kp_idx, (pt, _res) in person_pts.items():
|
||||
m = Marker()
|
||||
m.header.frame_id = 'left'
|
||||
m.header.stamp = stamp
|
||||
m.ns = f'person_{person_idx}'
|
||||
m.id = mid; mid += 1
|
||||
m.type = Marker.SPHERE
|
||||
m.action = Marker.ADD
|
||||
m.pose.position.x = float(pt[0])
|
||||
m.pose.position.y = float(pt[1])
|
||||
m.pose.position.z = float(pt[2])
|
||||
m.pose.orientation.w = 1.0
|
||||
m.scale.x = m.scale.y = m.scale.z = 0.05
|
||||
m.color.r, m.color.g, m.color.b = r, g, b_
|
||||
m.color.a = 1.0
|
||||
m.lifetime.nanosec = 300_000_000
|
||||
ma.markers.append(m)
|
||||
|
||||
# Skeleton lines
|
||||
for j1, j2 in SKELETON:
|
||||
if j1 in person_pts and j2 in person_pts:
|
||||
m = Marker()
|
||||
m.header.frame_id = 'left'
|
||||
m.header.stamp = stamp
|
||||
m.ns = f'skel_{person_idx}'
|
||||
m.id = mid; mid += 1
|
||||
m.type = Marker.LINE_STRIP
|
||||
m.action = Marker.ADD
|
||||
m.points = [
|
||||
Point(x=float(person_pts[j1][0][0]),
|
||||
y=float(person_pts[j1][0][1]),
|
||||
z=float(person_pts[j1][0][2])),
|
||||
Point(x=float(person_pts[j2][0][0]),
|
||||
y=float(person_pts[j2][0][1]),
|
||||
z=float(person_pts[j2][0][2])),
|
||||
]
|
||||
m.scale.x = 0.02
|
||||
m.color.r, m.color.g, m.color.b = r, g, b_
|
||||
m.color.a = 0.8
|
||||
m.lifetime.nanosec = 300_000_000
|
||||
ma.markers.append(m)
|
||||
|
||||
return ma
|
||||
|
||||
# ── Main synced callback ────────────────────────────────────────────
|
||||
|
||||
def _synced_cb(self, left_msg: Image, right_msg: Image):
|
||||
if self._stereo is None:
|
||||
return # waiting for camera_info
|
||||
|
||||
# Convert images (handles Bayer -> BGR automatically)
|
||||
left_frame = self._bridge.imgmsg_to_cv2(
|
||||
left_msg, desired_encoding='bgr8')
|
||||
right_frame = self._bridge.imgmsg_to_cv2(
|
||||
right_msg, desired_encoding='bgr8')
|
||||
|
||||
# Run MMPose on both views
|
||||
left_people = self._run_mmpose(left_frame)
|
||||
right_people = self._run_mmpose(right_frame)
|
||||
|
||||
# Match people across cameras
|
||||
matches = self._match_people(left_people, right_people)
|
||||
|
||||
# Triangulate every mutually-confident keypoint
|
||||
all_points_3d = []
|
||||
avg_distances = [] # per-person average distance (75%+ conf kps only)
|
||||
avg_coords = [] # per-person mean (x, y, z) of 75%+ conf kps
|
||||
for lp, rp in matches:
|
||||
person_pts = {} # kp_idx -> (xyz, residual)
|
||||
high_conf_dists = []
|
||||
high_conf_pts = []
|
||||
for kp_idx in range(17):
|
||||
if (lp['scores'][kp_idx] <= self._threshold or
|
||||
rp['scores'][kp_idx] <= self._threshold):
|
||||
continue
|
||||
|
||||
u_l, v_l = lp['keypoints'][kp_idx]
|
||||
u_r, v_r = rp['keypoints'][kp_idx]
|
||||
|
||||
d1 = self._pixel_to_ray(u_l, v_l, 'left')
|
||||
d2 = self._pixel_to_ray(u_r, v_r, 'right')
|
||||
|
||||
pt3d, residual = self._triangulate(d1, d2)
|
||||
if pt3d is not None and residual < self._max_residual:
|
||||
person_pts[kp_idx] = (pt3d, residual)
|
||||
# Only count toward distance if both scores >= DIST_THRESHOLD
|
||||
if (lp['scores'][kp_idx] >= DIST_THRESHOLD and
|
||||
rp['scores'][kp_idx] >= DIST_THRESHOLD):
|
||||
high_conf_dists.append(np.linalg.norm(pt3d))
|
||||
high_conf_pts.append(pt3d)
|
||||
|
||||
if person_pts:
|
||||
all_points_3d.append(person_pts)
|
||||
avg_distances.append(
|
||||
float(np.mean(high_conf_dists)) if high_conf_dists else None)
|
||||
avg_coords.append(
|
||||
np.mean(high_conf_pts, axis=0) if high_conf_pts else None)
|
||||
|
||||
# Publish 3D markers
|
||||
self._marker_pub.publish(
|
||||
self._build_markers(all_points_3d, left_msg.header.stamp))
|
||||
|
||||
# Publish structured keypoints for downstream nodes (e.g. ground plane)
|
||||
kp_points = [
|
||||
(float(pt[0]), float(pt[1]), float(pt[2]),
|
||||
float(person_idx), float(kp_idx))
|
||||
for person_idx, person_pts in enumerate(all_points_3d)
|
||||
for kp_idx, (pt, _) in person_pts.items()
|
||||
]
|
||||
if kp_points:
|
||||
h = Header()
|
||||
h.stamp = left_msg.header.stamp
|
||||
h.frame_id = 'left'
|
||||
self._kp3d_pub.publish(
|
||||
pc2.create_cloud(h, self._pc2_fields, kp_points))
|
||||
|
||||
if not self._headless:
|
||||
# Build distance display window
|
||||
row_h = 220
|
||||
frame_h = max(400, 140 + row_h * max(len(avg_distances), 1))
|
||||
dist_frame = np.zeros((frame_h, 1600, 3), dtype=np.uint8)
|
||||
cv2.putText(dist_frame, 'Avg distance (>=75% conf keypoints)',
|
||||
(20, 70), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (180, 180, 180), 3)
|
||||
if avg_distances:
|
||||
for i, (d, xyz) in enumerate(zip(avg_distances, avg_coords)):
|
||||
if d is not None:
|
||||
txt = f'Person {i + 1}: {d:.2f} m'
|
||||
color = (100, 255, 100)
|
||||
coord_txt = f'x={xyz[0]:+.3f} y={xyz[1]:+.3f} z={xyz[2]:+.3f} [m, left cam frame]'
|
||||
coord_color = (160, 160, 160)
|
||||
else:
|
||||
txt = f'Person {i + 1}: -- (no 75%+ kps)'
|
||||
color = (80, 80, 200)
|
||||
coord_txt = ''
|
||||
coord_color = (80, 80, 80)
|
||||
cv2.putText(dist_frame, txt, (20, 140 + i * row_h),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 4.5, color, 8)
|
||||
if coord_txt:
|
||||
cv2.putText(dist_frame, coord_txt, (28, 140 + i * row_h + 70),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1.4, coord_color, 2)
|
||||
else:
|
||||
cv2.putText(dist_frame, 'No people detected', (20, 200),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 4.0, (100, 100, 100), 6)
|
||||
self._dist_display = dist_frame
|
||||
|
||||
# Draw 2D keypoints on display frames
|
||||
if left_people:
|
||||
draw_keypoints(
|
||||
left_frame,
|
||||
[p['keypoints'] for p in left_people],
|
||||
[p['scores'] for p in left_people],
|
||||
self._threshold)
|
||||
if right_people:
|
||||
draw_keypoints(
|
||||
right_frame,
|
||||
[p['keypoints'] for p in right_people],
|
||||
[p['scores'] for p in right_people],
|
||||
self._threshold)
|
||||
|
||||
n_3d = sum(len(p) for p in all_points_3d)
|
||||
cv2.putText(
|
||||
left_frame,
|
||||
f'People: {len(left_people)} 3D pts: {n_3d}',
|
||||
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||
cv2.putText(
|
||||
right_frame,
|
||||
f'People: {len(right_people)} Matched: {len(matches)}',
|
||||
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||
|
||||
self._left_display = left_frame
|
||||
self._right_display = right_frame
|
||||
|
||||
# ── Display timer ───────────────────────────────────────────────────
|
||||
|
||||
def _display_timer_cb(self):
|
||||
if self._left_display is not None:
|
||||
cv2.imshow('Left - Keypoints', self._left_display)
|
||||
if self._right_display is not None:
|
||||
cv2.imshow('Right - Keypoints', self._right_display)
|
||||
if self._dist_display is not None:
|
||||
cv2.imshow('Distance (75%+ confidence)', self._dist_display)
|
||||
|
||||
key = cv2.waitKey(1) & 0xFF
|
||||
if key == ord('q'):
|
||||
self.get_logger().info('Quit requested.')
|
||||
self.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
# ── Entry point ──────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = KeypointTriangulationNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if not node._headless:
|
||||
cv2.destroyAllWindows()
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user