This commit is contained in:
Aditya Pulipaka
2026-03-16 13:14:38 -05:00
parent 378512b8a4
commit 66dd16ddc7
5 changed files with 423 additions and 0 deletions

View File

@@ -0,0 +1,72 @@
"""Launch the full 3D tracking + ground-plane estimation + top-down view pipeline.
Nodes started:
1. single_person_loc_node -- headless stereo keypoint triangulator
publishes: /keypoint_markers (MarkerArray)
/keypoints_3d (PointCloud2)
2. ground_plane_node -- ground-plane estimator
publishes: /ground_plane_markers (MarkerArray)
/ground_plane_pose (PoseStamped)
3. top_down_node -- synthetic bird's-eye view of the ground plane
publishes: /top_down_image (Image)
To view the top-down image:
ros2 run rqt_image_view rqt_image_view → select /top_down_image
"""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _conda_utils import find_conda_python # noqa: E402
from launch import LaunchDescription
from launch.actions import ExecuteProcess
def generate_launch_description():
    """Launch the 3-node pipeline: triangulator, plane estimator, top-down view.

    All three nodes are run through the conda-env Python interpreter (mmpose)
    rather than the ament entry points, so their heavy ML dependencies resolve.

    Returns:
        LaunchDescription starting single_person_loc_node, ground_plane_node
        and top_down_node, each with screen output and the caller's env.
    """
    python_exe = find_conda_python('mmpose')
    return LaunchDescription([
        # ── 1. Keypoint triangulator (headless) ──────────────────────────────
        ExecuteProcess(
            cmd=[
                python_exe, '-m', 'tracking_re_id.single_person_loc_node',
                '--ros-args',
                '-p', 'threshold:=0.3',
                '-p', 'device:=cuda:0',
                '-p', 'max_residual:=0.10',
                '-p', 'headless:=true',
            ],
            output='screen',
            env={**os.environ},
        ),
        # ── 2. Ground-plane estimator ─────────────────────────────────────────
        ExecuteProcess(
            cmd=[
                python_exe, '-m', 'tracking_re_id.ground_plane_node',
                '--ros-args',
                '-p', 'stable_frames:=5',
                '-p', 'stable_radius:=0.05',
                # Written as 0.0 (not 0): ROS 2 parameters are strictly typed,
                # and an integer literal cannot override a double-declared
                # 'duplicate_radius' (all sibling radii here are doubles).
                '-p', 'duplicate_radius:=0.0',
                '-p', 'collinearity_threshold:=0.25',
                '-p', 'max_ground_points:=100',
                '-p', 'min_plane_points:=5',
            ],
            output='screen',
            env={**os.environ},
        ),
        # ── 3. Top-down bird's-eye visualiser ────────────────────────────────
        ExecuteProcess(
            cmd=[
                python_exe, '-m', 'tracking_re_id.top_down_node',
                '--ros-args',
            ],
            output='screen',
            env={**os.environ},
        ),
    ])

View File

@@ -32,6 +32,7 @@ setup(
'single_person_loc_node = tracking_re_id.single_person_loc_node:main',
'ground_plane_node = tracking_re_id.ground_plane_node:main',
'overlay_node = tracking_re_id.overlay_node:main',
'top_down_node = tracking_re_id.top_down_node:main',
'reid_node = tracking_re_id.reid_node:main',
],
},

View File

@@ -0,0 +1,350 @@
"""
ROS 2 node: top_down_node
Renders a synthetic bird's-eye (top-down) 2-D view of the detected ground
plane. Each detected person is shown as a filled circle (centred on their
ankle midpoint) with a facing arrow. The facing direction is derived by
projecting the 3-D vector from the shoulder midpoint to the nose onto the
ground plane — no cross-product or disambiguation required.
Before the ground plane is established the window shows "Waiting for plane…".
Subscriptions
─────────────
/keypoints_3d sensor_msgs/PointCloud2 (from single_person_loc_node)
/ground_plane_pose geometry_msgs/PoseStamped (from ground_plane_node)
Publications
────────────
/top_down_image sensor_msgs/Image (BGR8, fixed-size canvas)
"""
import numpy as np
import cv2
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, PointCloud2
from geometry_msgs.msg import PoseStamped
from sensor_msgs_py import point_cloud2 as pc2
from cv_bridge import CvBridge
# ═══════════════════════════════════════════════════════════════════════════════
# TUNABLE PARAMETERS
# ═══════════════════════════════════════════════════════════════════════════════
# Canvas side length in pixels (square output image).
CANVAS_SIZE: int = 800
# Physical half-extent of the canvas from the plane origin (metres).
# A value of 4.0 yields an 8 m × 8 m visible area.
CANVAS_HALF_M: float = 4.0
# Derived: pixels per metre.
SCALE: float = CANVAS_SIZE / (2.0 * CANVAS_HALF_M)
# Canvas background colour (B, G, R).
BG_COLOR: tuple = (30, 30, 30)
# Regular grid line colour (B, G, R).
GRID_COLOR: tuple = (70, 70, 70)
# Grid line spacing in metres.
GRID_SPACING: float = 0.5
# Axis (u=0, v=0) line colour (B, G, R).
AXIS_COLOR: tuple = (0, 180, 0)
# Radius of the per-person presence circle in metres.
PERSON_RADIUS_M: float = 0.20
# Fill alpha of the presence circle (0.0 – 1.0).
CIRCLE_ALPHA: float = 0.45
# Thickness of the presence circle outline in pixels.
CIRCLE_OUTLINE_PX: int = 2
# Length of the facing arrow beyond the circle edge, in metres.
ARROW_LENGTH_M: float = 0.7
# Fraction of arrow length used for the arrowhead.
ARROW_TIP_FRACTION: float = 0.30
# Publish rate of the top-down image in Hz.
PUBLISH_HZ: float = 15.0
# Per-person colours, cycled by person_id (B, G, R); names are approximate.
PERSON_COLORS: list = [
    (0, 230, 0),     # green
    (0, 100, 255),   # orange
    (255, 50, 200),  # magenta
    (0, 200, 255),   # yellow / amber
]
# ═══════════════════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════════════════
def _quat_to_rot(qx: float, qy: float, qz: float, qw: float) -> np.ndarray:
"""Return the 3×3 rotation matrix for unit quaternion (x, y, z, w)."""
return np.array([
[1 - 2*(qy*qy + qz*qz), 2*(qx*qy - qz*qw), 2*(qx*qz + qy*qw)],
[ 2*(qx*qy + qz*qw), 1 - 2*(qx*qx + qz*qz), 2*(qy*qz - qx*qw)],
[ 2*(qx*qz - qy*qw), 2*(qy*qz + qx*qw), 1 - 2*(qx*qx + qy*qy)],
], dtype=np.float64)
def _to_plane_uv(pt3d: np.ndarray,
origin: np.ndarray,
u_axis: np.ndarray,
v_axis: np.ndarray) -> np.ndarray:
"""Project a 3-D camera-frame point onto the plane, returning (u, v)."""
d = pt3d - origin
return np.array([np.dot(d, u_axis), np.dot(d, v_axis)], dtype=np.float64)
def _to_pixel(uv: np.ndarray) -> tuple[int, int]:
    """
    Convert plane coordinates (u, v) into an integer canvas pixel (x, y).

    u grows to the right; v is negated so that points further from the
    camera render toward the top of the canvas.
    """
    half = CANVAS_SIZE * 0.5
    x_px = round(uv[0] * SCALE + half)
    y_px = round(-uv[1] * SCALE + half)
    return int(x_px), int(y_px)
def _ankle_center(kp_dict: dict[int, np.ndarray],
                  origin: np.ndarray,
                  u_axis: np.ndarray,
                  v_axis: np.ndarray) -> np.ndarray:
    """
    Return the person's plane-UV centre: the mean of the detected ankles
    (keypoint ids 15/16 — presumably COCO ankles; confirm against the
    upstream keypoint convention), or the mean over ALL keypoints when no
    ankle was detected.
    """
    uvs = []
    for ankle_id in (15, 16):
        if ankle_id in kp_dict:
            uvs.append(_to_plane_uv(kp_dict[ankle_id], origin, u_axis, v_axis))
    if not uvs:
        # Fallback: average every available keypoint's plane projection.
        uvs = [_to_plane_uv(p, origin, u_axis, v_axis)
               for p in kp_dict.values()]
    return np.mean(uvs, axis=0)
def _facing_dir(kp_dict: dict[int, np.ndarray],
u_axis: np.ndarray,
v_axis: np.ndarray) -> np.ndarray | None:
"""
Return the normalised facing direction as a plane (u, v) unit vector, or None.
Method
------
The 3-D vector (nose shoulder_midpoint) points directly forward from
the person's body. Projecting it onto the ground plane and normalising
gives an unambiguous facing direction — no cross-product or sign-flip
disambiguation needed.
Falls back to (nose hip_midpoint) if shoulders are not detected.
Returns None if neither fallback has enough data.
"""
nose = kp_dict.get(0)
if nose is None:
return None
# Prefer shoulders (5, 6), fall back to hips (11, 12)
refs = [kp_dict[k] for k in (5, 6) if k in kp_dict]
if not refs:
refs = [kp_dict[k] for k in (11, 12) if k in kp_dict]
if not refs:
return None
ref_mid = np.mean(refs, axis=0) # shoulder (or hip) midpoint
forward_3d = nose - ref_mid # points toward the face
facing_uv = np.array([np.dot(forward_3d, u_axis),
np.dot(forward_3d, v_axis)], dtype=np.float64)
n = np.linalg.norm(facing_uv)
if n < 1e-6:
return None
return facing_uv / n
# ═══════════════════════════════════════════════════════════════════════════════
# Node
# ═══════════════════════════════════════════════════════════════════════════════
class TopDownNode(Node):
    """
    Publishes a top-down 2-D view of the ground plane with:
      • a metric grid
      • one circle per detected person (centred on ankle midpoint)
      • a facing arrow per person (nose − shoulder midpoint, projected
        onto the plane)
    """
    def __init__(self):
        super().__init__('top_down_node')
        self._bridge = CvBridge()
        # Latest keypoints per person, camera frame:
        # {person_id: {kp_id: np.ndarray([x, y, z])}}
        self._kps: dict[int, dict[int, np.ndarray]] = {}
        # (origin, R) — R cols: [u_axis | v_axis | normal]; stays None until
        # the first /ground_plane_pose message arrives.
        self._plane: tuple[np.ndarray, np.ndarray] | None = None
        self.create_subscription(
            PointCloud2, '/keypoints_3d', self._kp_cb, 10)
        self.create_subscription(
            PoseStamped, '/ground_plane_pose', self._plane_cb, 10)
        self._pub = self.create_publisher(Image, '/top_down_image', 10)
        # Render on a fixed timer so output rate is decoupled from message
        # arrival timing.
        self.create_timer(1.0 / PUBLISH_HZ, self._render)
        self.get_logger().info('TopDown node started — publishing /top_down_image')
    # ── Upstream data caches ──────────────────────────────────────────────────
    def _kp_cb(self, msg: PointCloud2):
        """Cache the latest 3-D keypoints, grouped by person id then kp id."""
        kps: dict[int, dict[int, np.ndarray]] = {}
        for pt in pc2.read_points(
                msg,
                field_names=('x', 'y', 'z', 'person_id', 'kp_id'),
                skip_nans=True):
            pid = int(pt[3])
            kid = int(pt[4])
            kps.setdefault(pid, {})[kid] = np.array(
                [pt[0], pt[1], pt[2]], dtype=np.float64)
        # Replace the cache wholesale so people absent from this message
        # disappear immediately rather than lingering.
        self._kps = kps
    def _plane_cb(self, msg: PoseStamped):
        """Cache the ground plane as (origin, 3×3 rotation matrix)."""
        o = msg.pose.position
        q = msg.pose.orientation
        origin = np.array([o.x, o.y, o.z], dtype=np.float64)
        R = _quat_to_rot(q.x, q.y, q.z, q.w)
        self._plane = (origin, R)
    # ── Main render ───────────────────────────────────────────────────────────
    def _render(self):
        """Timer callback: draw the canvas and publish it as a BGR8 Image."""
        canvas = np.full((CANVAS_SIZE, CANVAS_SIZE, 3), BG_COLOR, dtype=np.uint8)
        if self._plane is None:
            # No plane established yet — publish the placeholder frame.
            _draw_waiting(canvas)
            self._pub.publish(
                self._bridge.cv2_to_imgmsg(canvas, encoding='bgr8'))
            return
        origin, R = self._plane
        # Plane basis: first two rotation columns span the ground plane.
        u_axis = R[:, 0]
        v_axis = R[:, 1]
        self._draw_grid(canvas)
        for person_id, kp_dict in self._kps.items():
            color = PERSON_COLORS[person_id % len(PERSON_COLORS)]
            center_uv = _ankle_center(kp_dict, origin, u_axis, v_axis)
            cx, cy = _to_pixel(center_uv)
            r_px = max(1, int(round(PERSON_RADIUS_M * SCALE)))
            # Filled circle (alpha-blended): draw opaque on a copy, then
            # blend the copy back over the canvas.
            overlay = canvas.copy()
            cv2.circle(overlay, (cx, cy), r_px, color, -1, cv2.LINE_AA)
            cv2.addWeighted(overlay, CIRCLE_ALPHA,
                            canvas, 1.0 - CIRCLE_ALPHA, 0, canvas)
            # Circle outline (fully opaque)
            cv2.circle(canvas, (cx, cy), r_px, color, CIRCLE_OUTLINE_PX, cv2.LINE_AA)
            # Person label, just right of the circle
            cv2.putText(canvas, f'P{person_id}',
                        (cx + r_px + 4, cy + 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2, cv2.LINE_AA)
            # Facing arrow: base at circle edge, tip beyond it
            fdir = _facing_dir(kp_dict, u_axis, v_axis)
            if fdir is not None:
                base_uv = center_uv + fdir * PERSON_RADIUS_M
                tip_uv = center_uv + fdir * (PERSON_RADIUS_M + ARROW_LENGTH_M)
                base_px = _to_pixel(base_uv)
                tip_px = _to_pixel(tip_uv)
                cv2.arrowedLine(canvas, base_px, tip_px,
                                (255, 255, 255), 2, cv2.LINE_AA,
                                tipLength=ARROW_TIP_FRACTION)
        self._pub.publish(
            self._bridge.cv2_to_imgmsg(canvas, encoding='bgr8'))
    # ── Grid ──────────────────────────────────────────────────────────────────
    def _draw_grid(self, canvas: np.ndarray):
        """Draw the metric grid, axes, origin marker, labels and scale bar."""
        lim = CANVAS_HALF_M
        # Half-spacing overshoot so the +lim line survives float truncation.
        steps = np.arange(-lim, lim + GRID_SPACING * 0.5, GRID_SPACING)
        for s in steps:
            is_axis = abs(s) < 1e-6
            color = AXIS_COLOR if is_axis else GRID_COLOR
            thick = 2 if is_axis else 1
            # Vertical grid line at u = s
            x0, y0 = _to_pixel(np.array([s, -lim]))
            x1, y1 = _to_pixel(np.array([s, lim]))
            cv2.line(canvas, (x0, y0), (x1, y1), color, thick, cv2.LINE_AA)
            # Horizontal grid line at v = s
            x0, y0 = _to_pixel(np.array([-lim, s]))
            x1, y1 = _to_pixel(np.array([ lim, s]))
            cv2.line(canvas, (x0, y0), (x1, y1), color, thick, cv2.LINE_AA)
        # Origin marker
        ox, oy = _to_pixel(np.zeros(2))
        cv2.circle(canvas, (ox, oy), 5, AXIS_COLOR, -1, cv2.LINE_AA)
        cv2.putText(canvas, 'origin', (ox + 8, oy - 8),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.38, AXIS_COLOR, 1, cv2.LINE_AA)
        # Axis tip labels
        cv2.putText(canvas, 'u+',
                    _to_pixel(np.array([lim - 0.4, 0.0])),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, AXIS_COLOR, 1, cv2.LINE_AA)
        cv2.putText(canvas, 'v+',
                    _to_pixel(np.array([0.0, lim - 0.4])),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, AXIS_COLOR, 1, cv2.LINE_AA)
        # 1 m scale bar in the bottom-left corner
        bx0, by0 = _to_pixel(np.array([-lim + 0.2, -lim + 0.2]))
        bx1, by1 = _to_pixel(np.array([-lim + 1.2, -lim + 0.2]))
        cv2.line(canvas, (bx0, by0), (bx1, by1), (180, 180, 180), 2, cv2.LINE_AA)
        cv2.putText(canvas, '1 m', (bx0, by0 - 6),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.38, (180, 180, 180), 1, cv2.LINE_AA)
# ═══════════════════════════════════════════════════════════════════════════════
# Module-level helpers
# ═══════════════════════════════════════════════════════════════════════════════
def _draw_waiting(canvas: np.ndarray):
    """Overlay a centred 'Waiting for plane...' placeholder on the canvas."""
    msg = 'Waiting for plane...'
    font_face = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1.0
    thickness = 2
    (text_w, text_h), _ = cv2.getTextSize(msg, font_face, font_scale, thickness)
    # Centre the baseline origin on the canvas.
    org = ((CANVAS_SIZE - text_w) // 2, (CANVAS_SIZE + text_h) // 2)
    cv2.putText(canvas, msg, org, font_face, font_scale,
                (200, 200, 200), thickness, cv2.LINE_AA)
# ═══════════════════════════════════════════════════════════════════════════════
# Entry point
# ═══════════════════════════════════════════════════════════════════════════════
def main(args=None):
    """Entry point: spin the TopDownNode until shutdown or Ctrl-C."""
    rclpy.init(args=args)
    node = TopDownNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node; not an error.
        pass
    finally:
        # Always release the node and shut ROS down, even after an exception.
        node.destroy_node()
        rclpy.try_shutdown()
if __name__ == '__main__':
    main()