diff --git a/frigate/motion/path_visualizer.py b/frigate/motion/path_visualizer.py index 818aff6ce..55a01fa7f 100644 --- a/frigate/motion/path_visualizer.py +++ b/frigate/motion/path_visualizer.py @@ -1,43 +1,101 @@ import cv2 import numpy as np from typing import Dict, List, Tuple +from filterpy.kalman import KalmanFilter class PathVisualizer: def __init__(self, history_length: int = 30, prediction_length: int = 15): - self.history_length = history_length # Number of past positions to show - self.prediction_length = prediction_length # Number of predicted positions - self.position_history: Dict[str, List[Tuple[int, int]]] = {} # object_id -> list of positions + self.history_length = history_length + self.prediction_length = prediction_length + self.position_history: Dict[str, List[Tuple[int, int]]] = {} + self.kalman_filters: Dict[str, KalmanFilter] = {} - def update_position(self, object_id: str, centroid: Tuple[int, int]): + def _init_kalman(self, object_id: str, initial_pos: Tuple[int, int]): + """Initialize Kalman filter for new object with position and velocity state""" + kf = KalmanFilter(dim_x=4, dim_z=2) # State: [x, y, vx, vy], Measurement: [x, y] + + # State transition matrix + kf.F = np.array([ + [1, 0, 1, 0], # x = x + vx + [0, 1, 0, 1], # y = y + vy + [0, 0, 1, 0], # vx = vx + [0, 0, 0, 1], # vy = vy + ]) + + # Measurement matrix + kf.H = np.array([ + [1, 0, 0, 0], + [0, 1, 0, 0] + ]) + + # Measurement noise + kf.R = np.eye(2) * 5 + + # Process noise + kf.Q = np.eye(4) * 0.1 + + # Initial state + kf.x = np.array([initial_pos[0], initial_pos[1], 0, 0]) + + # Initial state covariance + kf.P = np.eye(4) * 100 + + self.kalman_filters[object_id] = kf + + def update_position(self, object_id: str, centroid: Tuple[int, int], frame_shape: Tuple[int, int]): + """Update position history and Kalman filter for an object""" if object_id not in self.position_history: self.position_history[object_id] = [] + self._init_kalman(object_id, centroid) - self.position_history[object_id].append(centroid) + # Update Kalman filter + kf = self.kalman_filters[object_id] + measurement = np.array([centroid[0], centroid[1]]) + kf.predict() + kf.update(measurement) - # Keep only recent history + # Store filtered position + filtered_pos = (int(kf.x[0]), int(kf.x[1])) + self.position_history[object_id].append(filtered_pos) + + # Trim history if len(self.position_history[object_id]) > self.history_length: self.position_history[object_id] = self.position_history[object_id][-self.history_length:] - def predict_path(self, object_id: str) -> List[Tuple[int, int]]: - if object_id not in self.position_history or len(self.position_history[object_id]) < 2: + def predict_path(self, object_id: str, frame_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + """Predict future path using Kalman filter""" + if object_id not in self.kalman_filters: return [] - # Get last two positions to calculate velocity vector - positions = self.position_history[object_id] - p1 = np.array(positions[-2]) - p2 = np.array(positions[-1]) - velocity = p2 - p1 + kf = self.kalman_filters[object_id] + predictions = [] + + # Save current state + current_state = kf.x.copy() + current_covar = kf.P.copy() # Predict future positions - predictions = [] - current_pos = p2 for _ in range(self.prediction_length): - current_pos = current_pos + velocity - predictions.append(tuple(map(int, current_pos))) + kf.predict() + pred_x = int(kf.x[0]) + pred_y = int(kf.x[1]) + + # Constrain predictions to frame boundaries + pred_x = max(0, min(pred_x, frame_shape[1])) + pred_y = max(0, min(pred_y, frame_shape[0])) + + predictions.append((pred_x, pred_y)) + + # Restore saved state + kf.x = current_state + kf.P = current_covar return predictions def draw_paths(self, frame: np.ndarray, active_objects: List[str]): + """Draw historical and predicted paths for active objects""" + frame_shape = frame.shape[:2] + for object_id in active_objects: if object_id not in self.position_history: continue @@ -60,7 +118,7 @@ class PathVisualizer: cv2.line(frame, start_pos, end_pos, color, 2, cv2.LINE_AA) # Draw predicted path - predictions = self.predict_path(object_id) + predictions = self.predict_path(object_id, frame_shape) for i in range(1, len(predictions)): # Red color with fading opacity for future predictions alpha = 1 - (i / len(predictions)) @@ -73,11 +131,12 @@ class PathVisualizer: overlay = frame.copy() cv2.line(overlay, start_pos, end_pos, color, 2, cv2.LINE_AA) cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) - + def cleanup_inactive(self, active_objects: List[str]): """Remove tracking data for inactive objects""" current_ids = set(active_objects) tracked_ids = set(self.position_history.keys()) for inactive_id in tracked_ids - current_ids: - self.position_history.pop(inactive_id, None) \ No newline at end of file + self.position_history.pop(inactive_id, None) + self.kalman_filters.pop(inactive_id, None) \ No newline at end of file diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 3ea31056d..fd5f21293 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -230,7 +230,7 @@ class CameraState: position=self.camera_config.timestamp_style.position, ) - if draw_options.get("motion_paths", True): # Enable by default + if draw_options.get("motion_paths"): # Update and draw paths for non-stationary objects active_objects = [ obj_id for obj_id, obj in tracked_objects.items() @@ -244,7 +244,7 @@ class CameraState: int((obj["box"][0] + obj["box"][2]) / 2), # x center int((obj["box"][1] + obj["box"][3]) / 2) # y center ) - self.path_visualizer.update_position(obj_id, centroid) + self.path_visualizer.update_position(obj_id, centroid, frame_copy.shape[:2]) # Draw paths self.path_visualizer.draw_paths(frame_copy, active_objects)