From d09033bd9ab0bd2741558ee5b77bed0d9444ea8f Mon Sep 17 00:00:00 2001 From: pulipakaa24 Date: Sun, 15 Mar 2026 16:37:20 -0500 Subject: [PATCH] overlay view --- tracking_re_id/launch/full_pipeline.launch.py | 19 +- tracking_re_id/setup.py | 1 + tracking_re_id/tracking_re_id/overlay_node.py | 352 ++++++++++++++++++ 3 files changed, 368 insertions(+), 4 deletions(-) create mode 100644 tracking_re_id/tracking_re_id/overlay_node.py diff --git a/tracking_re_id/launch/full_pipeline.launch.py b/tracking_re_id/launch/full_pipeline.launch.py index 74f13a0..fdb7bfd 100644 --- a/tracking_re_id/launch/full_pipeline.launch.py +++ b/tracking_re_id/launch/full_pipeline.launch.py @@ -1,4 +1,4 @@ -"""Launch the full 3D tracking + ground-plane estimation pipeline. +"""Launch the full 3D tracking + ground-plane estimation + image overlay pipeline. Nodes started: 1. single_person_loc_node -- headless keypoint triangulator @@ -7,10 +7,11 @@ Nodes started: 2. ground_plane_node -- ground-plane estimator publishes: /ground_plane_markers (MarkerArray) /ground_plane_pose (PoseStamped) + 3. overlay_node -- undistorted image with plane grid + skeleton + publishes: /overlay_image (Image) -Visualise in RViz with Fixed Frame = 'left', then add: - MarkerArray /keypoint_markers (3D skeleton + keypoints) - MarkerArray /ground_plane_markers (plane disc, normal arrow, ground samples) +To view the annotated image: + ros2 run rqt_image_view rqt_image_view → select /overlay_image """ import os @@ -55,4 +56,14 @@ def generate_launch_description(): env={**os.environ}, ), + # ── 3. Image overlay (undistorted image + plane grid + skeleton) ── + ExecuteProcess( + cmd=[ + python_exe, '-m', 'tracking_re_id.overlay_node', + '--ros-args', + ], + output='screen', + env={**os.environ}, + ), + ]) diff --git a/tracking_re_id/setup.py b/tracking_re_id/setup.py index 5920cc9..d8883fe 100644 --- a/tracking_re_id/setup.py +++ b/tracking_re_id/setup.py @@ -29,6 +29,7 @@ setup( 'console_scripts': [ 'single_person_loc_node = tracking_re_id.single_person_loc_node:main', 'ground_plane_node = tracking_re_id.ground_plane_node:main', + 'overlay_node = tracking_re_id.overlay_node:main', ], }, ) diff --git a/tracking_re_id/tracking_re_id/overlay_node.py b/tracking_re_id/tracking_re_id/overlay_node.py new file mode 100644 index 0000000..445d250 --- /dev/null +++ b/tracking_re_id/tracking_re_id/overlay_node.py @@ -0,0 +1,352 @@ +""" +ROS 2 node: overlay_node + +Reprojects the estimated ground plane and 3D keypoints back onto the raw +camera image. The image is first undistorted using the camera's intrinsic +calibration so that projection with the new (distortion-free) camera matrix +K_new is geometrically exact. + +Subscriptions +───────────── + /stereo/left/image_raw sensor_msgs/Image + /stereo/left/camera_info sensor_msgs/CameraInfo + /keypoints_3d sensor_msgs/PointCloud2 (from single_person_loc_node) + /ground_plane_pose geometry_msgs/PoseStamped (from ground_plane_node) + +Publications +──────────── + /overlay_image sensor_msgs/Image (undistorted + annotated) +""" + +import numpy as np +import cv2 +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import Image, CameraInfo, PointCloud2 +from geometry_msgs.msg import PoseStamped +from sensor_msgs_py import point_cloud2 as pc2 +from cv_bridge import CvBridge + + +# ═══════════════════════════════════════════════════════════════════════════════ +# TUNABLE PARAMETERS +# ═══════════════════════════════════════════════════════════════════════════════ + +# Distance (metres) between adjacent parallel grid lines on the ground plane. +GRID_SPACING: float = 0.5 + +# How far the grid extends in each in-plane direction from the plane origin. +# A value of 3.0 m gives a 6 m × 6 m grid. +GRID_EXTENT: float = 3.0 + +# Colour of regular grid lines (B, G, R) +GRID_COLOR: tuple = (0, 215, 255) + +# Thickness of regular grid lines in pixels. +GRID_THICKNESS: int = 1 + +# Colour of the two centre (axis) lines of the grid. +GRID_AXIS_COLOR: tuple = (0, 255, 128) + +# Thickness of grid axis lines in pixels. +GRID_AXIS_THICKNESS: int = 2 + +# Radius of the plane-origin dot drawn on the image, in pixels. +GRID_ORIGIN_RADIUS: int = 6 + +# Radius of ordinary keypoint dots, in pixels. +KP_RADIUS: int = 6 + +# Thickness of skeleton line segments, in pixels. +SKELETON_THICKNESS: int = 2 + +# Cycle of per-person colours (B, G, R). +PERSON_COLORS: list = [ + (0, 230, 0), + (0, 100, 255), + (255, 50, 200), + (0, 200, 255), +] + +# COCO-17 keypoint index pairs that form the skeleton. +COCO_SKELETON: list = [ + (0, 1), (0, 2), # nose → eyes + (1, 3), (2, 4), # eyes → ears + (5, 6), # shoulders + (5, 7), (7, 9), # left arm + (6, 8), (8, 10), # right arm + (5, 11), (6, 12), # torso sides + (11, 12), # hips + (11, 13), (13, 15), # left leg + (12, 14), (14, 16), # right leg +] + +# COCO-17 indices of ankle / foot keypoints — drawn with a highlight ring. +FOOT_KP_INDICES: frozenset = frozenset({15, 16}) + +# If alpha=0.0, getOptimalNewCameraMatrix keeps all valid pixels but adds +# black borders. alpha=0.0 crops to the fully-valid rectangle (no black). +UNDISTORT_ALPHA: float = 0.0 + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Helpers +# ═══════════════════════════════════════════════════════════════════════════════ + +def _quat_to_rot(qx: float, qy: float, qz: float, qw: float) -> np.ndarray: + """Return the 3×3 rotation matrix for unit quaternion (x, y, z, w).""" + return np.array([ + [1 - 2*(qy*qy + qz*qz), 2*(qx*qy - qz*qw), 2*(qx*qz + qy*qw)], + [ 2*(qx*qy + qz*qw), 1 - 2*(qx*qx + qz*qz), 2*(qy*qz - qx*qw)], + [ 2*(qx*qz - qy*qw), 2*(qy*qz + qx*qw), 1 - 2*(qx*qx + qy*qy)], + ], dtype=np.float64) + + +def _project_pt(pt: np.ndarray, K: np.ndarray) -> tuple[int, int]: + """Project a single 3-D point (camera frame) to integer pixel coords.""" + p = K @ pt + return int(p[0] / p[2]), int(p[1] / p[2]) + + +def _clip_to_camera(p0: np.ndarray, p1: np.ndarray, + z_near: float = 0.01 + ) -> tuple[np.ndarray | None, np.ndarray | None]: + """ + Clip a 3-D line segment to the half-space z >= z_near. + + Returns (clipped_p0, clipped_p1) or (None, None) if fully behind camera. + """ + in0 = p0[2] >= z_near + in1 = p1[2] >= z_near + + if in0 and in1: + return p0, p1 + if not in0 and not in1: + return None, None + + # One endpoint is behind — find the crossing point + t = (z_near - p0[2]) / (p1[2] - p0[2]) + p_cross = p0 + t * (p1 - p0) + + if in0: + return p0, p_cross + else: + return p_cross, p1 + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Node +# ═══════════════════════════════════════════════════════════════════════════════ + +class OverlayNode(Node): + """ + Undistorts the left camera image and overlays: + • a perspective grid on the fitted ground plane + • the triangulated 3D skeleton keypoints + """ + + def __init__(self): + super().__init__('overlay_node') + + self._bridge = CvBridge() + + # Camera calibration — populated on first CameraInfo message + self._K_new: np.ndarray | None = None # undistorted camera matrix + self._mapx: np.ndarray | None = None # remap X-map + self._mapy: np.ndarray | None = None # remap Y-map + + # Latest data from upstream nodes (cached; applied to every image) + # {person_id: {kp_id: np.ndarray(3)}} + self._kps: dict[int, dict[int, np.ndarray]] = {} + + # (origin_3d, rotation_3x3) — axes: col0=u, col1=v, col2=normal + self._plane: tuple[np.ndarray, np.ndarray] | None = None + + # Subscriptions + self.create_subscription( + CameraInfo, '/stereo/left/camera_info', self._cam_info_cb, 1) + self.create_subscription( + Image, '/stereo/left/image_raw', self._image_cb, 10) + self.create_subscription( + PointCloud2, '/keypoints_3d', self._kp_cb, 10) + self.create_subscription( + PoseStamped, '/ground_plane_pose', self._plane_cb, 10) + + # Publisher + self._pub = self.create_publisher(Image, '/overlay_image', 10) + + self.get_logger().info('Overlay node started — publishing /overlay_image') + + # ── Camera calibration ─────────────────────────────────────────────────── + + def _cam_info_cb(self, msg: CameraInfo): + if self._K_new is not None: + return # already initialised + + K = np.array(msg.k, dtype=np.float64).reshape(3, 3) + D = np.array(msg.d, dtype=np.float64) + w, h = msg.width, msg.height + + K_new, _ = cv2.getOptimalNewCameraMatrix( + K, D, (w, h), alpha=UNDISTORT_ALPHA) + mapx, mapy = cv2.initUndistortRectifyMap( + K, D, None, K_new, (w, h), cv2.CV_32FC1) + + self._K_new = K_new + self._mapx = mapx + self._mapy = mapy + + self.get_logger().info( + f'Calibration received — K_new: ' + f'fx={K_new[0,0]:.1f} fy={K_new[1,1]:.1f} ' + f'cx={K_new[0,2]:.1f} cy={K_new[1,2]:.1f}') + + # ── Upstream data caches ───────────────────────────────────────────────── + + def _kp_cb(self, msg: PointCloud2): + kps: dict[int, dict[int, np.ndarray]] = {} + for pt in pc2.read_points( + msg, + field_names=('x', 'y', 'z', 'person_id', 'kp_id'), + skip_nans=True): + pid = int(pt[3]) + kid = int(pt[4]) + kps.setdefault(pid, {})[kid] = np.array( + [pt[0], pt[1], pt[2]], dtype=np.float64) + self._kps = kps + + def _plane_cb(self, msg: PoseStamped): + o = msg.pose.position + q = msg.pose.orientation + origin = np.array([o.x, o.y, o.z], dtype=np.float64) + R = _quat_to_rot(q.x, q.y, q.z, q.w) + self._plane = (origin, R) + + # ── Main rendering loop ────────────────────────────────────────────────── + + def _image_cb(self, msg: Image): + if self._K_new is None: + return # waiting for calibration + + # Decode & undistort + raw = self._bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') + frame = cv2.remap(raw, self._mapx, self._mapy, cv2.INTER_LINEAR) + + K = self._K_new + + # 1 ── Ground-plane grid + if self._plane is not None: + self._draw_grid(frame, K) + + # 2 ── 3-D keypoints + skeleton + if self._kps: + self._draw_keypoints(frame, K) + + self._pub.publish( + self._bridge.cv2_to_imgmsg(frame, encoding='bgr8')) + + # ── Ground-plane grid ──────────────────────────────────────────────────── + + def _draw_grid(self, frame: np.ndarray, K: np.ndarray): + origin, R = self._plane + + # In-plane axes and plane normal (all in left-camera frame) + u_axis = R[:, 0] # first in-plane direction + v_axis = R[:, 1] # second in-plane direction + + steps = np.arange( + -GRID_EXTENT, GRID_EXTENT + GRID_SPACING * 0.5, GRID_SPACING) + + for s in steps: + is_axis = abs(s) < 1e-6 # centre line gets a different style + color = GRID_AXIS_COLOR if is_axis else GRID_COLOR + thick = GRID_AXIS_THICKNESS if is_axis else GRID_THICKNESS + + # Line running along v (u = s) + self._draw_3d_line( + frame, K, color, thick, + origin + s * u_axis - GRID_EXTENT * v_axis, + origin + s * u_axis + GRID_EXTENT * v_axis) + + # Line running along u (v = s) + self._draw_3d_line( + frame, K, color, thick, + origin - GRID_EXTENT * u_axis + s * v_axis, + origin + GRID_EXTENT * u_axis + s * v_axis) + + # Plane origin dot + label + if origin[2] > 0.01: + ox, oy = _project_pt(origin, K) + cv2.circle(frame, (ox, oy), GRID_ORIGIN_RADIUS, + GRID_AXIS_COLOR, -1, cv2.LINE_AA) + cv2.putText(frame, 'plane origin', + (ox + 8, oy - 8), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, + GRID_AXIS_COLOR, 1, cv2.LINE_AA) + + def _draw_3d_line(self, frame: np.ndarray, K: np.ndarray, + color: tuple, thickness: int, + p0: np.ndarray, p1: np.ndarray): + """Project and draw a 3-D line segment, clipping against z=0.""" + cp0, cp1 = _clip_to_camera(p0, p1) + if cp0 is None: + return + pt0 = _project_pt(cp0, K) + pt1 = _project_pt(cp1, K) + cv2.line(frame, pt0, pt1, color, thickness, cv2.LINE_AA) + + # ── Keypoints + skeleton ───────────────────────────────────────────────── + + def _draw_keypoints(self, frame: np.ndarray, K: np.ndarray): + for person_id, kp_dict in self._kps.items(): + color = PERSON_COLORS[person_id % len(PERSON_COLORS)] + + # Project all keypoints for this person + kp_px: dict[int, tuple[int, int]] = {} + for kid, pt3d in kp_dict.items(): + if pt3d[2] <= 0.01: + continue + ix, iy = _project_pt(pt3d, K) + kp_px[kid] = (ix, iy) + + # Foot keypoints: yellow highlight ring + if kid in FOOT_KP_INDICES: + cv2.circle(frame, (ix, iy), KP_RADIUS + 5, + (0, 230, 230), 2, cv2.LINE_AA) + + cv2.circle(frame, (ix, iy), KP_RADIUS, color, -1, cv2.LINE_AA) + + # Skeleton edges + for i, j in COCO_SKELETON: + if i in kp_px and j in kp_px: + cv2.line(frame, kp_px[i], kp_px[j], + color, SKELETON_THICKNESS, cv2.LINE_AA) + + # Person label near nose (kp 0) or first visible keypoint + label_pt = kp_px.get(0) or ( + next(iter(kp_px.values())) if kp_px else None) + if label_pt: + cv2.putText(frame, f'P{person_id}', + (label_pt[0] + 8, label_pt[1] - 8), + cv2.FONT_HERSHEY_SIMPLEX, 0.55, + color, 2, cv2.LINE_AA) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Entry point +# ═══════════════════════════════════════════════════════════════════════════════ + +def main(args=None): + rclpy.init(args=args) + node = OverlayNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + main()