From c3521440401e008436c4702e22395f7cd4a7b1c1 Mon Sep 17 00:00:00 2001 From: pulipakaa24 Date: Sun, 15 Mar 2026 15:21:03 -0500 Subject: [PATCH] Test foot to plane --- .../3D_tracking_Re-ID/ground_plane_node.py | 426 ++++++++++++++++++ .../single_person_loc_node.py | 154 ++++--- .../launch/full_pipeline.launch.py | 58 +++ .../launch/ground_plane.launch.py | 53 +++ ...launch.py => single_person_demo.launch.py} | 0 .../launch/single_person_headless.launch.py | 33 ++ 3D_tracking_Re-ID/setup.py | 1 + 7 files changed, 665 insertions(+), 60 deletions(-) create mode 100644 3D_tracking_Re-ID/3D_tracking_Re-ID/ground_plane_node.py create mode 100644 3D_tracking_Re-ID/launch/full_pipeline.launch.py create mode 100644 3D_tracking_Re-ID/launch/ground_plane.launch.py rename 3D_tracking_Re-ID/launch/{keypoint_node.launch.py => single_person_demo.launch.py} (100%) create mode 100644 3D_tracking_Re-ID/launch/single_person_headless.launch.py diff --git a/3D_tracking_Re-ID/3D_tracking_Re-ID/ground_plane_node.py b/3D_tracking_Re-ID/3D_tracking_Re-ID/ground_plane_node.py new file mode 100644 index 0000000..7f10ba9 --- /dev/null +++ b/3D_tracking_Re-ID/3D_tracking_Re-ID/ground_plane_node.py @@ -0,0 +1,426 @@ +""" +ROS 2 node: ground-plane estimator from stable foot keypoints. + +Subscribes to /keypoints_3d (PointCloud2) published by +single_person_loc_node. Detects locations where foot keypoints remain +stable for several consecutive frames, registers those locations as +ground-plane samples, and fits a least-squares plane once three or more +non-collinear samples have been accumulated. + +Published topics: + /ground_plane_pose (geometry_msgs/PoseStamped) -- plane origin + orientation + /ground_plane_markers (visualization_msgs/MarkerArray) -- RViz visualisation +""" + +import numpy as np +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import PointCloud2 +from sensor_msgs_py import point_cloud2 as pc2 +from geometry_msgs.msg import PoseStamped +from visualization_msgs.msg import Marker, MarkerArray + + +# ═══════════════════════════════════════════════════════════════════════════════ +# TUNABLE PARAMETERS +# All of these are also exposed as ROS parameters so they can be overridden +# from a launch file or the command line without editing source. +# ═══════════════════════════════════════════════════════════════════════════════ + +# Number of *consecutive* frames a foot keypoint must appear within +# STABLE_RADIUS of the same cluster centroid before that centroid is +# registered as a ground-plane sample. +STABLE_FRAMES: int = 5 + +# Two foot positions are considered the "same location" if they lie within +# this radius (metres) of the running cluster centroid. +STABLE_RADIUS: float = 0.05 + +# A newly registered ground-plane sample is suppressed (treated as a +# duplicate) if it lies within this distance (metres) of any already- +# registered sample. Prevents the same standing spot from being added +# repeatedly. Set to 0 to disable. +DUPLICATE_RADIUS: float = 0 + +# Minimum perpendicular distance (metres) that at least one registered +# sample must lie from the best-fit line through all current samples for +# the point set to be considered non-collinear. Points are always recorded; +# plane publication is gated on this condition being satisfied. +COLLINEARITY_THRESHOLD: float = 0.25 + +# Maximum number of ground-plane samples retained for the least-squares fit. +# When the limit is reached, the oldest sample is discarded (sliding window). +MAX_GROUND_POINTS: int = 100 + +# Minimum number of registered ground-plane samples required before any +# plane is published. Must be >= 3. +MIN_PLANE_POINTS: int = 3 + +# COCO-17 keypoint indices treated as foot/ankle keypoints. +# 15 = left_ankle, 16 = right_ankle +FOOT_KP_INDICES: frozenset = frozenset({15, 16}) + +# Radius (metres) of the semi-transparent plane disc drawn in RViz. +PLANE_VIS_RADIUS: float = 2.0 + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Helpers +# ═══════════════════════════════════════════════════════════════════════════════ + +def _quat_from_z_to_normal(n: np.ndarray) -> np.ndarray: + """Return quaternion [x, y, z, w] that rotates (0,0,1) onto unit vector n.""" + z = np.array([0.0, 0.0, 1.0]) + n = n / np.linalg.norm(n) + dot = float(np.clip(np.dot(z, n), -1.0, 1.0)) + + if dot > 0.9999: + return np.array([0.0, 0.0, 0.0, 1.0]) + if dot < -0.9999: + # 180° rotation around x-axis + return np.array([1.0, 0.0, 0.0, 0.0]) + + axis = np.cross(z, n) + axis /= np.linalg.norm(axis) + half = np.arccos(dot) / 2.0 + s = np.sin(half) + return np.array([axis[0] * s, axis[1] * s, axis[2] * s, np.cos(half)]) + + +def _fit_plane(pts: np.ndarray): + """ + Fit a plane to an (N, 3) array of points via PCA / SVD. + + Returns + ------- + centroid : np.ndarray (3,) + normal : np.ndarray (3,) -- unit normal, z-component >= 0 + max_off_line : float -- max perpendicular distance of any point + from the best-fit *line* through centroid; + zero when N == 1. + """ + centroid = pts.mean(axis=0) + if len(pts) < 2: + return centroid, np.array([0.0, 0.0, 1.0]), 0.0 + + centered = pts - centroid + _, _, Vt = np.linalg.svd(centered, full_matrices=False) + + # Best-fit line direction (largest singular value) + line_dir = Vt[0] + proj = (centered @ line_dir)[:, None] * line_dir + residuals = np.linalg.norm(centered - proj, axis=1) + max_off_line = float(residuals.max()) + + # Plane normal (smallest singular value) + normal = Vt[-1] + if normal[2] < 0: + normal = -normal + + return centroid, normal, max_off_line + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Node +# ═══════════════════════════════════════════════════════════════════════════════ + +class GroundPlaneNode(Node): + """Estimates a ground plane from stable foot-keypoint positions.""" + + def __init__(self): + super().__init__('ground_plane_node') + + # ── ROS parameters (fall back to module-level constants) ───────── + self.declare_parameter('stable_frames', STABLE_FRAMES) + self.declare_parameter('stable_radius', STABLE_RADIUS) + self.declare_parameter('duplicate_radius', DUPLICATE_RADIUS) + self.declare_parameter('collinearity_threshold', COLLINEARITY_THRESHOLD) + self.declare_parameter('max_ground_points', MAX_GROUND_POINTS) + self.declare_parameter('min_plane_points', MIN_PLANE_POINTS) + + self._stable_frames = self.get_parameter('stable_frames').value + self._stable_radius = self.get_parameter('stable_radius').value + self._dup_radius = self.get_parameter('duplicate_radius').value + self._collinear_thr = self.get_parameter('collinearity_threshold').value + self._max_pts = self.get_parameter('max_ground_points').value + self._min_pts = max(3, self.get_parameter('min_plane_points').value) + + # ── State ──────────────────────────────────────────────────────── + # Active foot-position candidates, each a dict: + # centroid (np.array 3) -- running mean of positions seen so far + # _sum (np.array 3) -- accumulator for running mean + # _n (int) -- total points in sum (for running mean) + # streak (int) -- consecutive frames seen + # seen (bool) -- updated during current frame + # registered (bool) -- True once this candidate has fired; + # prevents re-registering the same spot + # while the foot stays put + self._candidates: list[dict] = [] + + # Registered ground-plane samples (list of np.ndarray(3)) + self._ground_pts: list[np.ndarray] = [] + + # Current plane state + self._plane_valid: bool = False + self._plane_normal: np.ndarray | None = None + self._plane_origin: np.ndarray | None = None + + # ── ROS I/O ────────────────────────────────────────────────────── + self.create_subscription( + PointCloud2, '/keypoints_3d', self._kp_cb, 10) + + self._pose_pub = self.create_publisher( + PoseStamped, '/ground_plane_pose', 10) + self._marker_pub = self.create_publisher( + MarkerArray, '/ground_plane_markers', 10) + + self.get_logger().info( + f'Ground-plane node ready. ' + f'stable_frames={self._stable_frames} ' + f'stable_radius={self._stable_radius} m ' + f'collinearity_threshold={self._collinear_thr} m') + + # ── Keypoint callback ──────────────────────────────────────────────── + + def _kp_cb(self, msg: PointCloud2): + # Collect foot positions from this frame + foot_positions: list[np.ndarray] = [] + for pt in pc2.read_points( + msg, field_names=('x', 'y', 'z', 'kp_id'), skip_nans=True): + if int(pt[3]) in FOOT_KP_INDICES: + foot_positions.append(np.array([pt[0], pt[1], pt[2]], + dtype=np.float64)) + + self._update_candidates(foot_positions) + self._publish(msg.header.stamp) + + # ── Candidate tracking ─────────────────────────────────────────────── + + def _update_candidates(self, foot_positions: list[np.ndarray]): + # Reset "seen" flag for all candidates at the start of each frame + for c in self._candidates: + c['seen'] = False + + for pos in foot_positions: + # Associate pos with the nearest candidate within stable_radius + best_i, best_d = -1, float('inf') + for i, c in enumerate(self._candidates): + d = float(np.linalg.norm(pos - c['centroid'])) + if d < best_d: + best_d, best_i = d, i + + if best_i >= 0 and best_d < self._stable_radius: + c = self._candidates[best_i] + c['_sum'] += pos + c['_n'] += 1 + c['centroid'] = c['_sum'] / c['_n'] + c['streak'] += 1 + c['seen'] = True + else: + self._candidates.append({ + 'centroid': pos.copy(), + '_sum': pos.copy(), + '_n': 1, + 'streak': 1, + 'seen': True, + 'registered': False, + }) + + # Process matured / stale candidates (iterate in reverse for safe removal) + for i in reversed(range(len(self._candidates))): + c = self._candidates[i] + if not c['seen']: + # Foot left — discard candidate regardless of registration state + del self._candidates[i] + elif not c['registered'] and c['streak'] >= self._stable_frames: + # Matured for the first time — register and keep alive + self._try_register(c['centroid']) + c['registered'] = True + + # ── Ground-point registration ──────────────────────────────────────── + + def _try_register(self, pt: np.ndarray): + """Add pt as a ground-plane sample if it is not a near-duplicate.""" + if self._dup_radius > 0: + for existing in self._ground_pts: + if np.linalg.norm(pt - existing) < self._dup_radius: + return # duplicate — discard + + self.get_logger().info( + f'Ground sample #{len(self._ground_pts) + 1}: ' + f'({pt[0]:.3f}, {pt[1]:.3f}, {pt[2]:.3f})') + + self._ground_pts.append(pt.copy()) + if len(self._ground_pts) > self._max_pts: + self._ground_pts.pop(0) + + self._refit_plane() + + # ── Plane fitting ──────────────────────────────────────────────────── + + def _refit_plane(self): + n = len(self._ground_pts) + if n < self._min_pts: + self._plane_valid = False + return + + pts = np.array(self._ground_pts) + centroid, normal, max_off_line = _fit_plane(pts) + + if max_off_line < self._collinear_thr: + self._plane_valid = False + self.get_logger().info( + f'{n} samples but all nearly collinear ' + f'(max off-line = {max_off_line:.4f} m, ' + f'threshold = {self._collinear_thr} m). ' + f'Plane not published until a non-collinear point is found.') + return + + # Move origin to the point on the new plane closest to the old origin. + # If there is no prior origin, use the centroid. + old_origin = self._plane_origin if self._plane_origin is not None else centroid + t = float(np.dot(old_origin - centroid, normal)) + new_origin = old_origin - t * normal + + self._plane_normal = normal + self._plane_origin = new_origin + self._plane_valid = True + + self.get_logger().info( + f'Plane (re)fit: normal=({normal[0]:.3f}, {normal[1]:.3f}, {normal[2]:.3f}) ' + f'origin=({new_origin[0]:.3f}, {new_origin[1]:.3f}, {new_origin[2]:.3f}) ' + f'n_samples={n} max_off_line={max_off_line:.4f} m') + + # ── Publishing ─────────────────────────────────────────────────────── + + def _publish(self, stamp): + ma = MarkerArray() + + # Clear previous markers + delete = Marker() + delete.action = Marker.DELETEALL + delete.header.frame_id = 'left' + delete.header.stamp = stamp + ma.markers.append(delete) + + mid = 0 + + # Registered ground-plane samples — orange spheres + for pt in self._ground_pts: + m = Marker() + m.header.frame_id = 'left' + m.header.stamp = stamp + m.ns = 'ground_samples' + m.id = mid; mid += 1 + m.type = Marker.SPHERE + m.action = Marker.ADD + m.pose.position.x = float(pt[0]) + m.pose.position.y = float(pt[1]) + m.pose.position.z = float(pt[2]) + m.pose.orientation.w = 1.0 + m.scale.x = m.scale.y = m.scale.z = 0.08 + m.color.r = 1.0; m.color.g = 0.45; m.color.b = 0.0 + m.color.a = 1.0 + ma.markers.append(m) + + # Active tracking candidates — dim yellow spheres whose alpha + # grows as the streak approaches stable_frames + for c in self._candidates: + pt = c['centroid'] + alpha = float(min(c['streak'] / self._stable_frames, 1.0)) * 0.7 + m = Marker() + m.header.frame_id = 'left' + m.header.stamp = stamp + m.ns = 'candidates' + m.id = mid; mid += 1 + m.type = Marker.SPHERE + m.action = Marker.ADD + m.pose.position.x = float(pt[0]) + m.pose.position.y = float(pt[1]) + m.pose.position.z = float(pt[2]) + m.pose.orientation.w = 1.0 + m.scale.x = m.scale.y = m.scale.z = 0.05 + m.color.r = 0.9; m.color.g = 0.9; m.color.b = 0.0 + m.color.a = alpha + ma.markers.append(m) + + if self._plane_valid: + n = self._plane_normal + o = self._plane_origin + q = _quat_from_z_to_normal(n) + + # Semi-transparent cyan disc representing the plane + disc = Marker() + disc.header.frame_id = 'left' + disc.header.stamp = stamp + disc.ns = 'plane_disc' + disc.id = mid; mid += 1 + disc.type = Marker.CYLINDER + disc.action = Marker.ADD + disc.pose.position.x = float(o[0]) + disc.pose.position.y = float(o[1]) + disc.pose.position.z = float(o[2]) + disc.pose.orientation.x = float(q[0]) + disc.pose.orientation.y = float(q[1]) + disc.pose.orientation.z = float(q[2]) + disc.pose.orientation.w = float(q[3]) + disc.scale.x = disc.scale.y = PLANE_VIS_RADIUS * 2.0 + disc.scale.z = 0.005 + disc.color.r = 0.0; disc.color.g = 0.75; disc.color.b = 1.0 + disc.color.a = 0.25 + ma.markers.append(disc) + + # Green arrow showing the plane normal + arrow = Marker() + arrow.header.frame_id = 'left' + arrow.header.stamp = stamp + arrow.ns = 'plane_normal' + arrow.id = mid; mid += 1 + arrow.type = Marker.ARROW + arrow.action = Marker.ADD + arrow.pose.position.x = float(o[0]) + arrow.pose.position.y = float(o[1]) + arrow.pose.position.z = float(o[2]) + arrow.pose.orientation.x = float(q[0]) + arrow.pose.orientation.y = float(q[1]) + arrow.pose.orientation.z = float(q[2]) + arrow.pose.orientation.w = float(q[3]) + arrow.scale.x = 0.4 # length + arrow.scale.y = 0.03 # shaft diameter + arrow.scale.z = 0.06 # head diameter + arrow.color.r = 0.0; arrow.color.g = 1.0; arrow.color.b = 0.4 + arrow.color.a = 1.0 + ma.markers.append(arrow) + + # PoseStamped — origin at plane centre, z-axis along normal + ps = PoseStamped() + ps.header.frame_id = 'left' + ps.header.stamp = stamp + ps.pose.position.x = float(o[0]) + ps.pose.position.y = float(o[1]) + ps.pose.position.z = float(o[2]) + ps.pose.orientation.x = float(q[0]) + ps.pose.orientation.y = float(q[1]) + ps.pose.orientation.z = float(q[2]) + ps.pose.orientation.w = float(q[3]) + self._pose_pub.publish(ps) + + self._marker_pub.publish(ma) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = GroundPlaneNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + main() diff --git a/3D_tracking_Re-ID/3D_tracking_Re-ID/single_person_loc_node.py b/3D_tracking_Re-ID/3D_tracking_Re-ID/single_person_loc_node.py index 7349d47..26dffb7 100644 --- a/3D_tracking_Re-ID/3D_tracking_Re-ID/single_person_loc_node.py +++ b/3D_tracking_Re-ID/3D_tracking_Re-ID/single_person_loc_node.py @@ -14,10 +14,12 @@ import cv2 import numpy as np import rclpy from rclpy.node import Node -from sensor_msgs.msg import Image, CameraInfo +from sensor_msgs.msg import Image, CameraInfo, PointCloud2, PointField +from std_msgs.msg import Header from visualization_msgs.msg import Marker, MarkerArray from geometry_msgs.msg import Point from cv_bridge import CvBridge +from sensor_msgs_py import point_cloud2 as pc2 from message_filters import Subscriber, ApproximateTimeSynchronizer from mmpose.apis import MMPoseInferencer @@ -83,10 +85,12 @@ class KeypointTriangulationNode(Node): self.declare_parameter('threshold', 0.3) self.declare_parameter('device', 'cuda:0') self.declare_parameter('max_residual', 0.10) # metres + self.declare_parameter('headless', False) self._threshold = self.get_parameter('threshold').value self._max_residual = self.get_parameter('max_residual').value device = self.get_parameter('device').value + self._headless = self.get_parameter('headless').value # ── MMPose (single shared instance) ───────────────────────────── self.get_logger().info(f'Loading MMPose on {device} ...') @@ -114,18 +118,32 @@ class KeypointTriangulationNode(Node): [left_sub, right_sub], queue_size=5, slop=0.05) self._sync.registerCallback(self._synced_cb) - # ── Publisher ─────────────────────────────────────────────────── + # ── Publishers ────────────────────────────────────────────────── self._marker_pub = self.create_publisher( MarkerArray, '/keypoint_markers', 10) + self._kp3d_pub = self.create_publisher( + PointCloud2, '/keypoints_3d', 10) + + # PointCloud2 field layout: x, y, z, person_id, kp_id (all float32) + self._pc2_fields = [ + PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1), + PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1), + PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1), + PointField(name='person_id', offset=12, datatype=PointField.FLOAT32, count=1), + PointField(name='kp_id', offset=16, datatype=PointField.FLOAT32, count=1), + ] # ── Display state ─────────────────────────────────────────────── - self._left_display = None - self._right_display = None - self._dist_display = None - self.get_logger().info( - f'Distance window uses keypoints with >= {DIST_THRESHOLD*100:.0f}% ' - 'confidence in both camera feeds.') - self.create_timer(1.0 / 30.0, self._display_timer_cb) + if not self._headless: + self._left_display = None + self._right_display = None + self._dist_display = None + self.get_logger().info( + f'Distance window uses keypoints with >= {DIST_THRESHOLD*100:.0f}% ' + 'confidence in both camera feeds.') + self.create_timer(1.0 / 30.0, self._display_timer_cb) + else: + self.get_logger().info('Running in headless mode — no display windows.') self.get_logger().info( 'Waiting for camera_info and synced image_raw from ' @@ -415,60 +433,75 @@ class KeypointTriangulationNode(Node): self._marker_pub.publish( self._build_markers(all_points_3d, left_msg.header.stamp)) - # Build distance display window - row_h = 220 - frame_h = max(400, 140 + row_h * max(len(avg_distances), 1)) - dist_frame = np.zeros((frame_h, 1600, 3), dtype=np.uint8) - cv2.putText(dist_frame, 'Avg distance (>=75% conf keypoints)', - (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (180, 180, 180), 3) - if avg_distances: - for i, (d, xyz) in enumerate(zip(avg_distances, avg_coords)): - if d is not None: - txt = f'Person {i + 1}: {d:.2f} m' - color = (100, 255, 100) - coord_txt = f'x={xyz[0]:+.3f} y={xyz[1]:+.3f} z={xyz[2]:+.3f} [m, left cam frame]' - coord_color = (160, 160, 160) - else: - txt = f'Person {i + 1}: -- (no 75%+ kps)' - color = (80, 80, 200) - coord_txt = '' - coord_color = (80, 80, 80) - cv2.putText(dist_frame, txt, (20, 140 + i * row_h), - cv2.FONT_HERSHEY_SIMPLEX, 4.5, color, 8) - if coord_txt: - cv2.putText(dist_frame, coord_txt, (28, 140 + i * row_h + 70), - cv2.FONT_HERSHEY_SIMPLEX, 1.4, coord_color, 2) - else: - cv2.putText(dist_frame, 'No people detected', (20, 200), - cv2.FONT_HERSHEY_SIMPLEX, 4.0, (100, 100, 100), 6) - self._dist_display = dist_frame + # Publish structured keypoints for downstream nodes (e.g. ground plane) + kp_points = [ + (float(pt[0]), float(pt[1]), float(pt[2]), + float(person_idx), float(kp_idx)) + for person_idx, person_pts in enumerate(all_points_3d) + for kp_idx, (pt, _) in person_pts.items() + ] + if kp_points: + h = Header() + h.stamp = left_msg.header.stamp + h.frame_id = 'left' + self._kp3d_pub.publish( + pc2.create_cloud(h, self._pc2_fields, kp_points)) - # Draw 2D keypoints on display frames - if left_people: - draw_keypoints( + if not self._headless: + # Build distance display window + row_h = 220 + frame_h = max(400, 140 + row_h * max(len(avg_distances), 1)) + dist_frame = np.zeros((frame_h, 1600, 3), dtype=np.uint8) + cv2.putText(dist_frame, 'Avg distance (>=75% conf keypoints)', + (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (180, 180, 180), 3) + if avg_distances: + for i, (d, xyz) in enumerate(zip(avg_distances, avg_coords)): + if d is not None: + txt = f'Person {i + 1}: {d:.2f} m' + color = (100, 255, 100) + coord_txt = f'x={xyz[0]:+.3f} y={xyz[1]:+.3f} z={xyz[2]:+.3f} [m, left cam frame]' + coord_color = (160, 160, 160) + else: + txt = f'Person {i + 1}: -- (no 75%+ kps)' + color = (80, 80, 200) + coord_txt = '' + coord_color = (80, 80, 80) + cv2.putText(dist_frame, txt, (20, 140 + i * row_h), + cv2.FONT_HERSHEY_SIMPLEX, 4.5, color, 8) + if coord_txt: + cv2.putText(dist_frame, coord_txt, (28, 140 + i * row_h + 70), + cv2.FONT_HERSHEY_SIMPLEX, 1.4, coord_color, 2) + else: + cv2.putText(dist_frame, 'No people detected', (20, 200), + cv2.FONT_HERSHEY_SIMPLEX, 4.0, (100, 100, 100), 6) + self._dist_display = dist_frame + + # Draw 2D keypoints on display frames + if left_people: + draw_keypoints( + left_frame, + [p['keypoints'] for p in left_people], + [p['scores'] for p in left_people], + self._threshold) + if right_people: + draw_keypoints( + right_frame, + [p['keypoints'] for p in right_people], + [p['scores'] for p in right_people], + self._threshold) + + n_3d = sum(len(p) for p in all_points_3d) + cv2.putText( left_frame, - [p['keypoints'] for p in left_people], - [p['scores'] for p in left_people], - self._threshold) - if right_people: - draw_keypoints( + f'People: {len(left_people)} 3D pts: {n_3d}', + (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) + cv2.putText( right_frame, - [p['keypoints'] for p in right_people], - [p['scores'] for p in right_people], - self._threshold) + f'People: {len(right_people)} Matched: {len(matches)}', + (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) - n_3d = sum(len(p) for p in all_points_3d) - cv2.putText( - left_frame, - f'People: {len(left_people)} 3D pts: {n_3d}', - (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) - cv2.putText( - right_frame, - f'People: {len(right_people)} Matched: {len(matches)}', - (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) - - self._left_display = left_frame - self._right_display = right_frame + self._left_display = left_frame + self._right_display = right_frame # ── Display timer ─────────────────────────────────────────────────── @@ -497,7 +530,8 @@ def main(args=None): except KeyboardInterrupt: pass finally: - cv2.destroyAllWindows() + if not node._headless: + cv2.destroyAllWindows() node.destroy_node() rclpy.try_shutdown() diff --git a/3D_tracking_Re-ID/launch/full_pipeline.launch.py b/3D_tracking_Re-ID/launch/full_pipeline.launch.py new file mode 100644 index 0000000..486fa06 --- /dev/null +++ b/3D_tracking_Re-ID/launch/full_pipeline.launch.py @@ -0,0 +1,58 @@ +"""Launch the full 3D tracking + ground-plane estimation pipeline. + +Nodes started: + 1. single_person_loc_node -- headless keypoint triangulator + publishes: /keypoint_markers (MarkerArray) + /keypoints_3d (PointCloud2) + 2. ground_plane_node -- ground-plane estimator + publishes: /ground_plane_markers (MarkerArray) + /ground_plane_pose (PoseStamped) + +Visualise in RViz with Fixed Frame = 'left', then add: + MarkerArray /keypoint_markers (3D skeleton + keypoints) + MarkerArray /ground_plane_markers (plane disc, normal arrow, ground samples) +""" + +import os +from launch import LaunchDescription +from launch.actions import ExecuteProcess + + +def generate_launch_description(): + python_exe = os.path.expanduser( + '~/miniconda3/envs/mmpose/bin/python3' + ) + + return LaunchDescription([ + + # ── 1. Keypoint triangulator (headless) ───────────────────────── + ExecuteProcess( + cmd=[ + python_exe, '-m', '3D_tracking_Re-ID.single_person_loc_node', + '--ros-args', + '-p', 'threshold:=0.3', + '-p', 'device:=cuda:0', + '-p', 'max_residual:=0.10', + '-p', 'headless:=true', + ], + output='screen', + env={**os.environ}, + ), + + # ── 2. Ground-plane estimator ──────────────────────────────────── + ExecuteProcess( + cmd=[ + python_exe, '-m', '3D_tracking_Re-ID.ground_plane_node', + '--ros-args', + '-p', 'stable_frames:=5', + '-p', 'stable_radius:=0.05', + '-p', 'duplicate_radius:=0', + '-p', 'collinearity_threshold:=0.25', + '-p', 'max_ground_points:=100', + '-p', 'min_plane_points:=5', + ], + output='screen', + env={**os.environ}, + ), + + ]) diff --git a/3D_tracking_Re-ID/launch/ground_plane.launch.py b/3D_tracking_Re-ID/launch/ground_plane.launch.py new file mode 100644 index 0000000..4c45c54 --- /dev/null +++ b/3D_tracking_Re-ID/launch/ground_plane.launch.py @@ -0,0 +1,53 @@ +"""Launch ground_plane_node alongside single_person_loc_node (headless). + +Runs the keypoint triangulator in headless mode and pipes its output +into the ground-plane estimator. Both nodes share the mmpose conda +Python environment. +""" + +import os +from launch import LaunchDescription +from launch.actions import ExecuteProcess + + +def generate_launch_description(): + python_exe = os.path.expanduser( + '~/miniconda3/envs/mmpose/bin/python3' + ) + + return LaunchDescription([ + + # ── Keypoint triangulator (headless) ──────────────────────────── + ExecuteProcess( + cmd=[ + python_exe, '-m', '3D_tracking_Re-ID.single_person_loc_node', + '--ros-args', + '-p', 'threshold:=0.3', + '-p', 'device:=cuda:0', + '-p', 'max_residual:=0.10', + '-p', 'headless:=true', + ], + output='screen', + env={**os.environ}, + ), + + # ── Ground-plane estimator ─────────────────────────────────────── + ExecuteProcess( + cmd=[ + python_exe, '-m', '3D_tracking_Re-ID.ground_plane_node', + '--ros-args', + # Foot must stay within 15 cm for 5 consecutive frames + '-p', 'stable_frames:=5', + '-p', 'stable_radius:=0.15', + # Suppress ground points closer than 12 cm to an existing one + '-p', 'duplicate_radius:=0.12', + # Points are collinear if none deviates > 5 cm from best-fit line + '-p', 'collinearity_threshold:=0.05', + '-p', 'max_ground_points:=100', + '-p', 'min_plane_points:=3', + ], + output='screen', + env={**os.environ}, + ), + + ]) diff --git a/3D_tracking_Re-ID/launch/keypoint_node.launch.py b/3D_tracking_Re-ID/launch/single_person_demo.launch.py similarity index 100% rename from 3D_tracking_Re-ID/launch/keypoint_node.launch.py rename to 3D_tracking_Re-ID/launch/single_person_demo.launch.py diff --git a/3D_tracking_Re-ID/launch/single_person_headless.launch.py b/3D_tracking_Re-ID/launch/single_person_headless.launch.py new file mode 100644 index 0000000..d5ec2a9 --- /dev/null +++ b/3D_tracking_Re-ID/launch/single_person_headless.launch.py @@ -0,0 +1,33 @@ +"""Launch single_person_loc_node in headless mode (no display windows). + +Publishes 3D keypoint markers to /keypoint_markers without opening any +OpenCV windows. Useful for running on a server or as part of a larger +pipeline where visualisation is handled elsewhere. +""" + +import os +from launch import LaunchDescription +from launch.actions import ExecuteProcess + + +def generate_launch_description(): + python_exe = os.path.expanduser( + '~/miniconda3/envs/mmpose/bin/python3' + ) + + node_module = '3D_tracking_Re-ID.single_person_loc_node' + + return LaunchDescription([ + ExecuteProcess( + cmd=[ + python_exe, '-m', node_module, + '--ros-args', + '-p', 'threshold:=0.3', + '-p', 'device:=cuda:0', + '-p', 'max_residual:=0.10', + '-p', 'headless:=true', + ], + output='screen', + env={**os.environ}, + ), + ]) diff --git a/3D_tracking_Re-ID/setup.py b/3D_tracking_Re-ID/setup.py index b22a159..ba5534f 100644 --- a/3D_tracking_Re-ID/setup.py +++ b/3D_tracking_Re-ID/setup.py @@ -28,6 +28,7 @@ setup( entry_points={ 'console_scripts': [ 'single_person_loc_node = 3D_tracking_Re-ID.single_person_loc_node:main', + 'ground_plane_node = 3D_tracking_Re-ID.ground_plane_node:main', ], }, )