overlay view
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""Launch the full 3D tracking + ground-plane estimation pipeline.
|
||||
"""Launch the full 3D tracking + ground-plane estimation + image overlay pipeline.
|
||||
|
||||
Nodes started:
|
||||
1. single_person_loc_node -- headless keypoint triangulator
|
||||
@@ -7,10 +7,11 @@ Nodes started:
|
||||
2. ground_plane_node -- ground-plane estimator
|
||||
publishes: /ground_plane_markers (MarkerArray)
|
||||
/ground_plane_pose (PoseStamped)
|
||||
3. overlay_node -- undistorted image with plane grid + skeleton
|
||||
publishes: /overlay_image (Image)
|
||||
|
||||
Visualise in RViz with Fixed Frame = 'left', then add:
|
||||
MarkerArray /keypoint_markers (3D skeleton + keypoints)
|
||||
MarkerArray /ground_plane_markers (plane disc, normal arrow, ground samples)
|
||||
To view the annotated image:
|
||||
ros2 run rqt_image_view rqt_image_view → select /overlay_image
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -55,4 +56,14 @@ def generate_launch_description():
|
||||
env={**os.environ},
|
||||
),
|
||||
|
||||
# ── 3. Image overlay (undistorted image + plane grid + skeleton) ──
|
||||
ExecuteProcess(
|
||||
cmd=[
|
||||
python_exe, '-m', 'tracking_re_id.overlay_node',
|
||||
'--ros-args',
|
||||
],
|
||||
output='screen',
|
||||
env={**os.environ},
|
||||
),
|
||||
|
||||
])
|
||||
|
||||
@@ -29,6 +29,7 @@ setup(
|
||||
'console_scripts': [
|
||||
'single_person_loc_node = tracking_re_id.single_person_loc_node:main',
|
||||
'ground_plane_node = tracking_re_id.ground_plane_node:main',
|
||||
'overlay_node = tracking_re_id.overlay_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
352
tracking_re_id/tracking_re_id/overlay_node.py
Normal file
352
tracking_re_id/tracking_re_id/overlay_node.py
Normal file
@@ -0,0 +1,352 @@
|
||||
"""
|
||||
ROS 2 node: overlay_node
|
||||
|
||||
Reprojects the estimated ground plane and 3D keypoints back onto the raw
|
||||
camera image. The image is first undistorted using the camera's intrinsic
|
||||
calibration so that projection with the new (distortion-free) camera matrix
|
||||
K_new is geometrically exact.
|
||||
|
||||
Subscriptions
|
||||
─────────────
|
||||
/stereo/left/image_raw sensor_msgs/Image
|
||||
/stereo/left/camera_info sensor_msgs/CameraInfo
|
||||
/keypoints_3d sensor_msgs/PointCloud2 (from single_person_loc_node)
|
||||
/ground_plane_pose geometry_msgs/PoseStamped (from ground_plane_node)
|
||||
|
||||
Publications
|
||||
────────────
|
||||
/overlay_image sensor_msgs/Image (undistorted + annotated)
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import cv2
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import Image, CameraInfo, PointCloud2
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from sensor_msgs_py import point_cloud2 as pc2
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
# TUNABLE PARAMETERS
# ═══════════════════════════════════════════════════════════════════════════════

# Distance (metres) between adjacent parallel grid lines on the ground plane.
GRID_SPACING: float = 0.5

# How far the grid extends in each in-plane direction from the plane origin.
# A value of 3.0 m gives a 6 m × 6 m grid.
GRID_EXTENT: float = 3.0

# Colour of regular grid lines (B, G, R).
GRID_COLOR: tuple = (0, 215, 255)

# Thickness of regular grid lines in pixels.
GRID_THICKNESS: int = 1

# Colour of the two centre (axis) lines of the grid (B, G, R).
GRID_AXIS_COLOR: tuple = (0, 255, 128)

# Thickness of grid axis lines in pixels.
GRID_AXIS_THICKNESS: int = 2

# Radius of the plane-origin dot drawn on the image, in pixels.
GRID_ORIGIN_RADIUS: int = 6

# Radius of ordinary keypoint dots, in pixels.
KP_RADIUS: int = 6

# Thickness of skeleton line segments, in pixels.
SKELETON_THICKNESS: int = 2

# Cycle of per-person colours (B, G, R); person_id indexes into this modulo len.
PERSON_COLORS: list = [
    (0, 230, 0),
    (0, 100, 255),
    (255, 50, 200),
    (0, 200, 255),
]

# COCO-17 keypoint index pairs that form the skeleton.
COCO_SKELETON: list = [
    (0, 1), (0, 2),       # nose → eyes
    (1, 3), (2, 4),       # eyes → ears
    (5, 6),               # shoulders
    (5, 7), (7, 9),       # left arm
    (6, 8), (8, 10),      # right arm
    (5, 11), (6, 12),     # torso sides
    (11, 12),             # hips
    (11, 13), (13, 15),   # left leg
    (12, 14), (14, 16),   # right leg
]

# COCO-17 indices of ankle / foot keypoints — drawn with a highlight ring.
FOOT_KP_INDICES: frozenset = frozenset({15, 16})

# alpha=1.0 makes getOptimalNewCameraMatrix keep all source pixels but adds
# black borders; alpha=0.0 crops to the fully-valid rectangle (no black).
UNDISTORT_ALPHA: float = 0.0
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# Helpers
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def _quat_to_rot(qx: float, qy: float, qz: float, qw: float) -> np.ndarray:
|
||||
"""Return the 3×3 rotation matrix for unit quaternion (x, y, z, w)."""
|
||||
return np.array([
|
||||
[1 - 2*(qy*qy + qz*qz), 2*(qx*qy - qz*qw), 2*(qx*qz + qy*qw)],
|
||||
[ 2*(qx*qy + qz*qw), 1 - 2*(qx*qx + qz*qz), 2*(qy*qz - qx*qw)],
|
||||
[ 2*(qx*qz - qy*qw), 2*(qy*qz + qx*qw), 1 - 2*(qx*qx + qy*qy)],
|
||||
], dtype=np.float64)
|
||||
|
||||
|
||||
def _project_pt(pt: np.ndarray, K: np.ndarray) -> tuple[int, int]:
|
||||
"""Project a single 3-D point (camera frame) to integer pixel coords."""
|
||||
p = K @ pt
|
||||
return int(p[0] / p[2]), int(p[1] / p[2])
|
||||
|
||||
|
||||
def _clip_to_camera(p0: np.ndarray, p1: np.ndarray,
|
||||
z_near: float = 0.01
|
||||
) -> tuple[np.ndarray | None, np.ndarray | None]:
|
||||
"""
|
||||
Clip a 3-D line segment to the half-space z >= z_near.
|
||||
|
||||
Returns (clipped_p0, clipped_p1) or (None, None) if fully behind camera.
|
||||
"""
|
||||
in0 = p0[2] >= z_near
|
||||
in1 = p1[2] >= z_near
|
||||
|
||||
if in0 and in1:
|
||||
return p0, p1
|
||||
if not in0 and not in1:
|
||||
return None, None
|
||||
|
||||
# One endpoint is behind — find the crossing point
|
||||
t = (z_near - p0[2]) / (p1[2] - p0[2])
|
||||
p_cross = p0 + t * (p1 - p0)
|
||||
|
||||
if in0:
|
||||
return p0, p_cross
|
||||
else:
|
||||
return p_cross, p1
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# Node
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class OverlayNode(Node):
    """
    Undistorts the left camera image and overlays:
      • a perspective grid on the fitted ground plane
      • the triangulated 3D skeleton keypoints

    The node is purely reactive: keypoint and plane messages only update
    caches; all drawing happens in the image callback so the overlay is
    refreshed at camera frame rate using the most recent upstream data.
    """

    def __init__(self):
        super().__init__('overlay_node')

        self._bridge = CvBridge()

        # Camera calibration — populated on first CameraInfo message
        self._K_new: np.ndarray | None = None  # undistorted camera matrix
        self._mapx: np.ndarray | None = None   # remap X-map
        self._mapy: np.ndarray | None = None   # remap Y-map

        # Latest data from upstream nodes (cached; applied to every image)
        # {person_id: {kp_id: np.ndarray(3)}}
        self._kps: dict[int, dict[int, np.ndarray]] = {}

        # (origin_3d, rotation_3x3) — axes: col0=u, col1=v, col2=normal
        self._plane: tuple[np.ndarray, np.ndarray] | None = None

        # Subscriptions (queue depth 1 for camera_info — only the first
        # message is used; see _cam_info_cb)
        self.create_subscription(
            CameraInfo, '/stereo/left/camera_info', self._cam_info_cb, 1)
        self.create_subscription(
            Image, '/stereo/left/image_raw', self._image_cb, 10)
        self.create_subscription(
            PointCloud2, '/keypoints_3d', self._kp_cb, 10)
        self.create_subscription(
            PoseStamped, '/ground_plane_pose', self._plane_cb, 10)

        # Publisher
        self._pub = self.create_publisher(Image, '/overlay_image', 10)

        self.get_logger().info('Overlay node started — publishing /overlay_image')

    # ── Camera calibration ───────────────────────────────────────────────────

    def _cam_info_cb(self, msg: CameraInfo):
        """One-shot: build the undistortion maps from the first CameraInfo."""
        if self._K_new is not None:
            return  # already initialised

        K = np.array(msg.k, dtype=np.float64).reshape(3, 3)
        D = np.array(msg.d, dtype=np.float64)
        w, h = msg.width, msg.height

        # K_new is distortion-free, so projecting 3-D points with it onto the
        # remapped image is geometrically exact.
        K_new, _ = cv2.getOptimalNewCameraMatrix(
            K, D, (w, h), alpha=UNDISTORT_ALPHA)
        mapx, mapy = cv2.initUndistortRectifyMap(
            K, D, None, K_new, (w, h), cv2.CV_32FC1)

        self._K_new = K_new
        self._mapx = mapx
        self._mapy = mapy

        self.get_logger().info(
            f'Calibration received — K_new: '
            f'fx={K_new[0,0]:.1f} fy={K_new[1,1]:.1f} '
            f'cx={K_new[0,2]:.1f} cy={K_new[1,2]:.1f}')

    # ── Upstream data caches ─────────────────────────────────────────────────

    def _kp_cb(self, msg: PointCloud2):
        """Cache the latest 3-D keypoints, grouped by person then keypoint id."""
        kps: dict[int, dict[int, np.ndarray]] = {}
        for pt in pc2.read_points(
                msg,
                field_names=('x', 'y', 'z', 'person_id', 'kp_id'),
                skip_nans=True):
            pid = int(pt[3])
            kid = int(pt[4])
            kps.setdefault(pid, {})[kid] = np.array(
                [pt[0], pt[1], pt[2]], dtype=np.float64)
        # Replace the whole cache atomically so the image callback never sees
        # a half-built dict.
        self._kps = kps

    def _plane_cb(self, msg: PoseStamped):
        """Cache the latest ground-plane pose as (origin, rotation matrix)."""
        o = msg.pose.position
        q = msg.pose.orientation
        origin = np.array([o.x, o.y, o.z], dtype=np.float64)
        R = _quat_to_rot(q.x, q.y, q.z, q.w)
        self._plane = (origin, R)

    # ── Main rendering loop ──────────────────────────────────────────────────

    def _image_cb(self, msg: Image):
        """Undistort the incoming frame, draw the overlays, and publish."""
        if self._K_new is None:
            return  # waiting for calibration

        # Decode & undistort
        raw = self._bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
        frame = cv2.remap(raw, self._mapx, self._mapy, cv2.INTER_LINEAR)

        K = self._K_new

        # 1 ── Ground-plane grid
        if self._plane is not None:
            self._draw_grid(frame, K)

        # 2 ── 3-D keypoints + skeleton
        if self._kps:
            self._draw_keypoints(frame, K)

        self._pub.publish(
            self._bridge.cv2_to_imgmsg(frame, encoding='bgr8'))

    # ── Ground-plane grid ────────────────────────────────────────────────────

    def _draw_grid(self, frame: np.ndarray, K: np.ndarray):
        """Draw a square grid of 3-D lines lying on the cached ground plane."""
        origin, R = self._plane

        # In-plane axes and plane normal (all in left-camera frame)
        u_axis = R[:, 0]  # first in-plane direction
        v_axis = R[:, 1]  # second in-plane direction

        # +0.5*spacing so the arange endpoint includes +GRID_EXTENT despite
        # float rounding.
        steps = np.arange(
            -GRID_EXTENT, GRID_EXTENT + GRID_SPACING * 0.5, GRID_SPACING)

        for s in steps:
            is_axis = abs(s) < 1e-6  # centre line gets a different style
            color = GRID_AXIS_COLOR if is_axis else GRID_COLOR
            thick = GRID_AXIS_THICKNESS if is_axis else GRID_THICKNESS

            # Line running along v (u = s)
            self._draw_3d_line(
                frame, K, color, thick,
                origin + s * u_axis - GRID_EXTENT * v_axis,
                origin + s * u_axis + GRID_EXTENT * v_axis)

            # Line running along u (v = s)
            self._draw_3d_line(
                frame, K, color, thick,
                origin - GRID_EXTENT * u_axis + s * v_axis,
                origin + GRID_EXTENT * u_axis + s * v_axis)

        # Plane origin dot + label (only when in front of the camera)
        if origin[2] > 0.01:
            ox, oy = _project_pt(origin, K)
            cv2.circle(frame, (ox, oy), GRID_ORIGIN_RADIUS,
                       GRID_AXIS_COLOR, -1, cv2.LINE_AA)
            cv2.putText(frame, 'plane origin',
                        (ox + 8, oy - 8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.45,
                        GRID_AXIS_COLOR, 1, cv2.LINE_AA)

    def _draw_3d_line(self, frame: np.ndarray, K: np.ndarray,
                      color: tuple, thickness: int,
                      p0: np.ndarray, p1: np.ndarray):
        """Project and draw a 3-D line segment, clipping against the near plane."""
        cp0, cp1 = _clip_to_camera(p0, p1)
        if cp0 is None:
            return
        pt0 = _project_pt(cp0, K)
        pt1 = _project_pt(cp1, K)
        cv2.line(frame, pt0, pt1, color, thickness, cv2.LINE_AA)

    # ── Keypoints + skeleton ─────────────────────────────────────────────────

    def _draw_keypoints(self, frame: np.ndarray, K: np.ndarray):
        """Draw each cached person's keypoints, skeleton edges, and label."""
        for person_id, kp_dict in self._kps.items():
            color = PERSON_COLORS[person_id % len(PERSON_COLORS)]

            # Project all keypoints for this person
            kp_px: dict[int, tuple[int, int]] = {}
            for kid, pt3d in kp_dict.items():
                if pt3d[2] <= 0.01:
                    continue  # behind (or at) the camera — not drawable
                ix, iy = _project_pt(pt3d, K)
                kp_px[kid] = (ix, iy)

                # Foot keypoints: yellow highlight ring
                if kid in FOOT_KP_INDICES:
                    cv2.circle(frame, (ix, iy), KP_RADIUS + 5,
                               (0, 230, 230), 2, cv2.LINE_AA)

                cv2.circle(frame, (ix, iy), KP_RADIUS, color, -1, cv2.LINE_AA)

            # Skeleton edges (only between keypoints that both projected)
            for i, j in COCO_SKELETON:
                if i in kp_px and j in kp_px:
                    cv2.line(frame, kp_px[i], kp_px[j],
                             color, SKELETON_THICKNESS, cv2.LINE_AA)

            # Person label near nose (kp 0) or first visible keypoint
            label_pt = kp_px.get(0) or (
                next(iter(kp_px.values())) if kp_px else None)
            if label_pt:
                cv2.putText(frame, f'P{person_id}',
                            (label_pt[0] + 8, label_pt[1] - 8),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.55,
                            color, 2, cv2.LINE_AA)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# Entry point
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def main(args=None):
    """Initialise ROS, spin the overlay node, and shut down cleanly."""
    rclpy.init(args=args)
    overlay = OverlayNode()
    try:
        rclpy.spin(overlay)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the node
    finally:
        # Release node resources even when spin exits via an exception.
        overlay.destroy_node()
        rclpy.try_shutdown()


if __name__ == '__main__':
    main()
|
||||
Reference in New Issue
Block a user