use a Kalman filter for smoother pathing

This commit is contained in:
Gabriel Grant 2025-02-15 15:23:54 -05:00
parent fa15fd7ef0
commit bb4fd9531a
2 changed files with 81 additions and 22 deletions

View File

@ -1,43 +1,101 @@
import cv2 import cv2
import numpy as np import numpy as np
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from filterpy.kalman import KalmanFilter
class PathVisualizer: class PathVisualizer:
def __init__(self, history_length: int = 30, prediction_length: int = 15): def __init__(self, history_length: int = 30, prediction_length: int = 15):
self.history_length = history_length # Number of past positions to show self.history_length = history_length
self.prediction_length = prediction_length # Number of predicted positions self.prediction_length = prediction_length
self.position_history: Dict[str, List[Tuple[int, int]]] = {} # object_id -> list of positions self.position_history: Dict[str, List[Tuple[int, int]]] = {}
self.kalman_filters: Dict[str, KalmanFilter] = {}
def update_position(self, object_id: str, centroid: Tuple[int, int]): def _init_kalman(self, object_id: str, initial_pos: Tuple[int, int]):
"""Initialize Kalman filter for new object with position and velocity state"""
kf = KalmanFilter(dim_x=4, dim_z=2) # State: [x, y, vx, vy], Measurement: [x, y]
# State transition matrix
kf.F = np.array([
[1, 0, 1, 0], # x = x + vx
[0, 1, 0, 1], # y = y + vy
[0, 0, 1, 0], # vx = vx
[0, 0, 0, 1], # vy = vy
])
# Measurement matrix
kf.H = np.array([
[1, 0, 0, 0],
[0, 1, 0, 0]
])
# Measurement noise
kf.R = np.eye(2) * 5
# Process noise
kf.Q = np.eye(4) * 0.1
# Initial state
kf.x = np.array([initial_pos[0], initial_pos[1], 0, 0])
# Initial state covariance
kf.P = np.eye(4) * 100
self.kalman_filters[object_id] = kf
def update_position(self, object_id: str, centroid: Tuple[int, int], frame_shape: Tuple[int, int]):
"""Update position history and Kalman filter for an object"""
if object_id not in self.position_history: if object_id not in self.position_history:
self.position_history[object_id] = [] self.position_history[object_id] = []
self._init_kalman(object_id, centroid)
self.position_history[object_id].append(centroid) # Update Kalman filter
kf = self.kalman_filters[object_id]
measurement = np.array([centroid[0], centroid[1]])
kf.predict()
kf.update(measurement)
# Keep only recent history # Store filtered position
filtered_pos = (int(kf.x[0]), int(kf.x[1]))
self.position_history[object_id].append(filtered_pos)
# Trim history
if len(self.position_history[object_id]) > self.history_length: if len(self.position_history[object_id]) > self.history_length:
self.position_history[object_id] = self.position_history[object_id][-self.history_length:] self.position_history[object_id] = self.position_history[object_id][-self.history_length:]
def predict_path(self, object_id: str) -> List[Tuple[int, int]]: def predict_path(self, object_id: str, frame_shape: Tuple[int, int]) -> List[Tuple[int, int]]:
if object_id not in self.position_history or len(self.position_history[object_id]) < 2: """Predict future path using Kalman filter"""
if object_id not in self.kalman_filters:
return [] return []
# Get last two positions to calculate velocity vector kf = self.kalman_filters[object_id]
positions = self.position_history[object_id] predictions = []
p1 = np.array(positions[-2])
p2 = np.array(positions[-1]) # Save current state
velocity = p2 - p1 current_state = kf.x.copy()
current_covar = kf.P.copy()
# Predict future positions # Predict future positions
predictions = []
current_pos = p2
for _ in range(self.prediction_length): for _ in range(self.prediction_length):
current_pos = current_pos + velocity kf.predict()
predictions.append(tuple(map(int, current_pos))) pred_x = int(kf.x[0])
pred_y = int(kf.x[1])
# Constrain predictions to frame boundaries
pred_x = max(0, min(pred_x, frame_shape[1]))
pred_y = max(0, min(pred_y, frame_shape[0]))
predictions.append((pred_x, pred_y))
# Restore saved state
kf.x = current_state
kf.P = current_covar
return predictions return predictions
def draw_paths(self, frame: np.ndarray, active_objects: List[str]): def draw_paths(self, frame: np.ndarray, active_objects: List[str]):
"""Draw historical and predicted paths for active objects"""
frame_shape = frame.shape[:2]
for object_id in active_objects: for object_id in active_objects:
if object_id not in self.position_history: if object_id not in self.position_history:
continue continue
@ -60,7 +118,7 @@ class PathVisualizer:
cv2.line(frame, start_pos, end_pos, color, 2, cv2.LINE_AA) cv2.line(frame, start_pos, end_pos, color, 2, cv2.LINE_AA)
# Draw predicted path # Draw predicted path
predictions = self.predict_path(object_id) predictions = self.predict_path(object_id, frame_shape)
for i in range(1, len(predictions)): for i in range(1, len(predictions)):
# Red color with fading opacity for future predictions # Red color with fading opacity for future predictions
alpha = 1 - (i / len(predictions)) alpha = 1 - (i / len(predictions))
@ -73,11 +131,12 @@ class PathVisualizer:
overlay = frame.copy() overlay = frame.copy()
cv2.line(overlay, start_pos, end_pos, color, 2, cv2.LINE_AA) cv2.line(overlay, start_pos, end_pos, color, 2, cv2.LINE_AA)
cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
def cleanup_inactive(self, active_objects: List[str]): def cleanup_inactive(self, active_objects: List[str]):
"""Remove tracking data for inactive objects""" """Remove tracking data for inactive objects"""
current_ids = set(active_objects) current_ids = set(active_objects)
tracked_ids = set(self.position_history.keys()) tracked_ids = set(self.position_history.keys())
for inactive_id in tracked_ids - current_ids: for inactive_id in tracked_ids - current_ids:
self.position_history.pop(inactive_id, None) self.position_history.pop(inactive_id, None)
self.kalman_filters.pop(inactive_id, None)

View File

@ -230,7 +230,7 @@ class CameraState:
position=self.camera_config.timestamp_style.position, position=self.camera_config.timestamp_style.position,
) )
if draw_options.get("motion_paths", True): # Enable by default if draw_options.get("motion_paths"):
# Update and draw paths for non-stationary objects # Update and draw paths for non-stationary objects
active_objects = [ active_objects = [
obj_id for obj_id, obj in tracked_objects.items() obj_id for obj_id, obj in tracked_objects.items()
@ -244,7 +244,7 @@ class CameraState:
int((obj["box"][0] + obj["box"][2]) / 2), # x center int((obj["box"][0] + obj["box"][2]) / 2), # x center
int((obj["box"][1] + obj["box"][3]) / 2) # y center int((obj["box"][1] + obj["box"][3]) / 2) # y center
) )
self.path_visualizer.update_position(obj_id, centroid) self.path_visualizer.update_position(obj_id, centroid, frame_copy.shape[:2])
# Draw paths # Draw paths
self.path_visualizer.draw_paths(frame_copy, active_objects) self.path_visualizer.draw_paths(frame_copy, active_objects)