"""Launch the full 3D tracking + ground-plane estimation + top-down view pipeline.

Nodes started:
  1. single_person_loc_node -- headless stereo keypoint triangulator
       publishes: /keypoint_markers (MarkerArray)
                  /keypoints_3d     (PointCloud2)
  2. ground_plane_node      -- ground-plane estimator
       publishes: /ground_plane_markers (MarkerArray)
                  /ground_plane_pose    (PoseStamped)
  3. top_down_node          -- synthetic bird's-eye view of the ground plane
       publishes: /top_down_image (Image)

To view the top-down image:
    ros2 run rqt_image_view rqt_image_view  ->  select /top_down_image
"""

import os
import sys

sys.path.insert(0, os.path.dirname(__file__))
from _conda_utils import find_conda_python  # noqa: E402

from launch import LaunchDescription
from launch.actions import ExecuteProcess


def _module_process(python_exe, module, ros_params=None):
    """Build an ExecuteProcess running *module* under the conda interpreter.

    ros_params is an optional sequence of (name, value-string) pairs that are
    appended as ``-p name:=value`` ROS arguments, preserving order.
    """
    cmd = [python_exe, '-m', module, '--ros-args']
    for name, value in (ros_params or ()):
        cmd += ['-p', f'{name}:={value}']
    return ExecuteProcess(cmd=cmd, output='screen', env={**os.environ})


def generate_launch_description():
    """Assemble the three-node top-down visualisation pipeline."""
    python_exe = find_conda_python('mmpose')

    return LaunchDescription([

        # 1. Keypoint triangulator (headless)
        _module_process(python_exe, 'tracking_re_id.single_person_loc_node', [
            ('threshold', '0.3'),
            ('device', 'cuda:0'),
            ('max_residual', '0.10'),
            ('headless', 'true'),
        ]),

        # 2. Ground-plane estimator
        _module_process(python_exe, 'tracking_re_id.ground_plane_node', [
            ('stable_frames', '5'),
            ('stable_radius', '0.05'),
            ('duplicate_radius', '0'),
            ('collinearity_threshold', '0.25'),
            ('max_ground_points', '100'),
            ('min_plane_points', '5'),
        ]),

        # 3. Top-down bird's-eye visualiser
        _module_process(python_exe, 'tracking_re_id.top_down_node'),

    ])
"""
ROS 2 node: top_down_node

Renders a synthetic bird's-eye (top-down) 2-D view of the detected ground
plane. Each detected person is shown as a filled circle (centred on their
ankle midpoint) with a facing arrow derived from the shoulder-to-nose
vector projected onto the plane.

Subscriptions
─────────────
    /keypoints_3d       sensor_msgs/PointCloud2   (from single_person_loc_node)
    /ground_plane_pose  geometry_msgs/PoseStamped (from ground_plane_node)

Publications
────────────
    /top_down_image     sensor_msgs/Image (BGR8, fixed-size canvas)
"""


# ═══════════════════════════════════════════════════════════════════════════════
# TUNABLE PARAMETERS
# ═══════════════════════════════════════════════════════════════════════════════

# Side length of the square output canvas, in pixels.
CANVAS_SIZE: int = 800

# Half-extent of the visible area measured from the plane origin, in metres
# (4.0 → an 8 m × 8 m window).
CANVAS_HALF_M: float = 4.0

# Derived conversion factor: pixels per metre.
SCALE: float = CANVAS_SIZE / (2.0 * CANVAS_HALF_M)

# Background fill of the canvas, as (B, G, R).
BG_COLOR: tuple[int, int, int] = (30, 30, 30)

# Colour of ordinary grid lines, as (B, G, R).
GRID_COLOR: tuple[int, int, int] = (70, 70, 70)

# Distance between neighbouring grid lines, in metres.
GRID_SPACING: float = 0.5

# Colour of the u=0 / v=0 axis lines, as (B, G, R).
AXIS_COLOR: tuple[int, int, int] = (0, 180, 0)

# Radius of each person's presence circle, in metres.
PERSON_RADIUS_M: float = 0.20

# Opacity of the filled presence circle, 0.0 (invisible) to 1.0 (solid).
CIRCLE_ALPHA: float = 0.45

# Outline thickness of the presence circle, in pixels.
CIRCLE_OUTLINE_PX: int = 2

# How far the facing arrow extends past the circle edge, in metres.
ARROW_LENGTH_M: float = 0.7

# Portion of the arrow's length occupied by its head.
ARROW_TIP_FRACTION: float = 0.30

# Rate at which /top_down_image is published, in Hz.
PUBLISH_HZ: float = 15.0
+PERSON_COLORS: list = [ + (0, 230, 0), # green + (0, 100, 255), # orange + (255, 50, 200), # magenta + (0, 200, 255), # yellow +] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Helpers +# ═══════════════════════════════════════════════════════════════════════════════ + +def _quat_to_rot(qx: float, qy: float, qz: float, qw: float) -> np.ndarray: + """Return the 3×3 rotation matrix for unit quaternion (x, y, z, w).""" + return np.array([ + [1 - 2*(qy*qy + qz*qz), 2*(qx*qy - qz*qw), 2*(qx*qz + qy*qw)], + [ 2*(qx*qy + qz*qw), 1 - 2*(qx*qx + qz*qz), 2*(qy*qz - qx*qw)], + [ 2*(qx*qz - qy*qw), 2*(qy*qz + qx*qw), 1 - 2*(qx*qx + qy*qy)], + ], dtype=np.float64) + + +def _to_plane_uv(pt3d: np.ndarray, + origin: np.ndarray, + u_axis: np.ndarray, + v_axis: np.ndarray) -> np.ndarray: + """Project a 3-D camera-frame point onto the plane, returning (u, v).""" + d = pt3d - origin + return np.array([np.dot(d, u_axis), np.dot(d, v_axis)], dtype=np.float64) + + +def _to_pixel(uv: np.ndarray) -> tuple[int, int]: + """ + Map plane coordinates (u, v) to integer canvas pixel (x, y). + + u increases to the right; v is flipped so that "further from camera" + appears upward on the canvas. 
+ """ + px = int(round( uv[0] * SCALE + CANVAS_SIZE * 0.5)) + py = int(round(-uv[1] * SCALE + CANVAS_SIZE * 0.5)) + return px, py + + +def _ankle_center(kp_dict: dict[int, np.ndarray], + origin: np.ndarray, + u_axis: np.ndarray, + v_axis: np.ndarray) -> np.ndarray: + """Return the mean ankle plane-UV, falling back to all-keypoint mean.""" + ankles = [_to_plane_uv(kp_dict[k], origin, u_axis, v_axis) + for k in (15, 16) if k in kp_dict] + if ankles: + return np.mean(ankles, axis=0) + return np.mean( + [_to_plane_uv(v, origin, u_axis, v_axis) for v in kp_dict.values()], + axis=0) + + +def _facing_dir(kp_dict: dict[int, np.ndarray], + u_axis: np.ndarray, + v_axis: np.ndarray) -> np.ndarray | None: + """ + Return the normalised facing direction as a plane (u, v) unit vector, or None. + + Method + ------ + The 3-D vector (nose − shoulder_midpoint) points directly forward from + the person's body. Projecting it onto the ground plane and normalising + gives an unambiguous facing direction — no cross-product or sign-flip + disambiguation needed. + + Falls back to (nose − hip_midpoint) if shoulders are not detected. + Returns None if neither fallback has enough data. 
+ """ + nose = kp_dict.get(0) + if nose is None: + return None + + # Prefer shoulders (5, 6), fall back to hips (11, 12) + refs = [kp_dict[k] for k in (5, 6) if k in kp_dict] + if not refs: + refs = [kp_dict[k] for k in (11, 12) if k in kp_dict] + if not refs: + return None + + ref_mid = np.mean(refs, axis=0) # shoulder (or hip) midpoint + forward_3d = nose - ref_mid # points toward the face + + facing_uv = np.array([np.dot(forward_3d, u_axis), + np.dot(forward_3d, v_axis)], dtype=np.float64) + n = np.linalg.norm(facing_uv) + if n < 1e-6: + return None + return facing_uv / n + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Node +# ═══════════════════════════════════════════════════════════════════════════════ + +class TopDownNode(Node): + """ + Publishes a top-down 2-D view of the ground plane with: + • a metric grid + • one circle per detected person (centred on ankle midpoint) + • a facing arrow per person (nose − shoulder direction, projected) + """ + + def __init__(self): + super().__init__('top_down_node') + + self._bridge = CvBridge() + + # {person_id: {kp_id: np.ndarray([x, y, z])}} + self._kps: dict[int, dict[int, np.ndarray]] = {} + + # (origin, R) — R cols: [u_axis | v_axis | normal] + self._plane: tuple[np.ndarray, np.ndarray] | None = None + + self.create_subscription( + PointCloud2, '/keypoints_3d', self._kp_cb, 10) + self.create_subscription( + PoseStamped, '/ground_plane_pose', self._plane_cb, 10) + + self._pub = self.create_publisher(Image, '/top_down_image', 10) + self.create_timer(1.0 / PUBLISH_HZ, self._render) + + self.get_logger().info('TopDown node started — publishing /top_down_image') + + # ── Upstream data caches ────────────────────────────────────────────────── + + def _kp_cb(self, msg: PointCloud2): + kps: dict[int, dict[int, np.ndarray]] = {} + for pt in pc2.read_points( + msg, + field_names=('x', 'y', 'z', 'person_id', 'kp_id'), + skip_nans=True): + pid = int(pt[3]) + kid = int(pt[4]) + 
kps.setdefault(pid, {})[kid] = np.array( + [pt[0], pt[1], pt[2]], dtype=np.float64) + self._kps = kps + + def _plane_cb(self, msg: PoseStamped): + o = msg.pose.position + q = msg.pose.orientation + origin = np.array([o.x, o.y, o.z], dtype=np.float64) + R = _quat_to_rot(q.x, q.y, q.z, q.w) + self._plane = (origin, R) + + # ── Main render ─────────────────────────────────────────────────────────── + + def _render(self): + canvas = np.full((CANVAS_SIZE, CANVAS_SIZE, 3), BG_COLOR, dtype=np.uint8) + + if self._plane is None: + _draw_waiting(canvas) + self._pub.publish( + self._bridge.cv2_to_imgmsg(canvas, encoding='bgr8')) + return + + origin, R = self._plane + u_axis = R[:, 0] + v_axis = R[:, 1] + + self._draw_grid(canvas) + + for person_id, kp_dict in self._kps.items(): + color = PERSON_COLORS[person_id % len(PERSON_COLORS)] + + center_uv = _ankle_center(kp_dict, origin, u_axis, v_axis) + cx, cy = _to_pixel(center_uv) + r_px = max(1, int(round(PERSON_RADIUS_M * SCALE))) + + # Filled circle (alpha-blended) + overlay = canvas.copy() + cv2.circle(overlay, (cx, cy), r_px, color, -1, cv2.LINE_AA) + cv2.addWeighted(overlay, CIRCLE_ALPHA, + canvas, 1.0 - CIRCLE_ALPHA, 0, canvas) + + # Circle outline (fully opaque) + cv2.circle(canvas, (cx, cy), r_px, color, CIRCLE_OUTLINE_PX, cv2.LINE_AA) + + # Person label + cv2.putText(canvas, f'P{person_id}', + (cx + r_px + 4, cy + 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2, cv2.LINE_AA) + + # Facing arrow: base at circle edge, tip beyond it + fdir = _facing_dir(kp_dict, u_axis, v_axis) + if fdir is not None: + base_uv = center_uv + fdir * PERSON_RADIUS_M + tip_uv = center_uv + fdir * (PERSON_RADIUS_M + ARROW_LENGTH_M) + base_px = _to_pixel(base_uv) + tip_px = _to_pixel(tip_uv) + cv2.arrowedLine(canvas, base_px, tip_px, + (255, 255, 255), 2, cv2.LINE_AA, + tipLength=ARROW_TIP_FRACTION) + + self._pub.publish( + self._bridge.cv2_to_imgmsg(canvas, encoding='bgr8')) + + # ── Grid 
────────────────────────────────────────────────────────────────── + + def _draw_grid(self, canvas: np.ndarray): + lim = CANVAS_HALF_M + steps = np.arange(-lim, lim + GRID_SPACING * 0.5, GRID_SPACING) + + for s in steps: + is_axis = abs(s) < 1e-6 + color = AXIS_COLOR if is_axis else GRID_COLOR + thick = 2 if is_axis else 1 + + x0, y0 = _to_pixel(np.array([s, -lim])) + x1, y1 = _to_pixel(np.array([s, lim])) + cv2.line(canvas, (x0, y0), (x1, y1), color, thick, cv2.LINE_AA) + + x0, y0 = _to_pixel(np.array([-lim, s])) + x1, y1 = _to_pixel(np.array([ lim, s])) + cv2.line(canvas, (x0, y0), (x1, y1), color, thick, cv2.LINE_AA) + + # Origin marker + ox, oy = _to_pixel(np.zeros(2)) + cv2.circle(canvas, (ox, oy), 5, AXIS_COLOR, -1, cv2.LINE_AA) + cv2.putText(canvas, 'origin', (ox + 8, oy - 8), + cv2.FONT_HERSHEY_SIMPLEX, 0.38, AXIS_COLOR, 1, cv2.LINE_AA) + + # Axis tip labels + cv2.putText(canvas, 'u+', + _to_pixel(np.array([lim - 0.4, 0.0])), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, AXIS_COLOR, 1, cv2.LINE_AA) + cv2.putText(canvas, 'v+', + _to_pixel(np.array([0.0, lim - 0.4])), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, AXIS_COLOR, 1, cv2.LINE_AA) + + # 1 m scale bar + bx0, by0 = _to_pixel(np.array([-lim + 0.2, -lim + 0.2])) + bx1, by1 = _to_pixel(np.array([-lim + 1.2, -lim + 0.2])) + cv2.line(canvas, (bx0, by0), (bx1, by1), (180, 180, 180), 2, cv2.LINE_AA) + cv2.putText(canvas, '1 m', (bx0, by0 - 6), + cv2.FONT_HERSHEY_SIMPLEX, 0.38, (180, 180, 180), 1, cv2.LINE_AA) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Module-level helpers +# ═══════════════════════════════════════════════════════════════════════════════ + +def _draw_waiting(canvas: np.ndarray): + text = 'Waiting for plane...' 
# ═══════════════════════════════════════════════════════════════════════════════
# Module-level helpers
# ═══════════════════════════════════════════════════════════════════════════════

def _draw_waiting(canvas: np.ndarray):
    """Draw a centred placeholder message while no ground plane is known."""
    text = 'Waiting for plane...'
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale, thickness = 1.0, 2
    (text_w, text_h), _baseline = cv2.getTextSize(text, font, font_scale, thickness)
    # Centre horizontally; shift down by half the glyph height vertically.
    org = ((CANVAS_SIZE - text_w) // 2, (CANVAS_SIZE + text_h) // 2)
    cv2.putText(canvas, text, org, font, font_scale,
                (200, 200, 200), thickness, cv2.LINE_AA)


# ═══════════════════════════════════════════════════════════════════════════════
# Entry point
# ═══════════════════════════════════════════════════════════════════════════════

def main(args=None):
    """Spin a TopDownNode until interrupted, then shut down cleanly."""
    rclpy.init(args=args)
    node = TopDownNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the expected way to stop the node
    finally:
        node.destroy_node()
        rclpy.try_shutdown()


if __name__ == '__main__':
    main()