From fea64fa93a4d678d50b3b606381da69238eaa950 Mon Sep 17 00:00:00 2001 From: Gabriel Grant Date: Sat, 15 Feb 2025 21:12:08 -0500 Subject: [PATCH] format --- frigate/config/camera/camera.py | 4 +- frigate/config/camera/motion_path.py | 6 +- frigate/motion/path_visualizer.py | 104 ++++++++++++++------------- frigate/object_processing.py | 32 +++++---- frigate/track/tracked_object.py | 16 +++-- 5 files changed, 87 insertions(+), 75 deletions(-) diff --git a/frigate/config/camera/camera.py b/frigate/config/camera/camera.py index be794ce0c..6107658d0 100644 --- a/frigate/config/camera/camera.py +++ b/frigate/config/camera/camera.py @@ -63,9 +63,7 @@ class CameraConfig(FrigateBaseModel): motion: Optional[MotionConfig] = Field( None, title="Motion detection configuration." ) - motion_paths: Optional[MotionPathConfig] = Field( - None, title="Enable motion paths." - ) + motion_paths: Optional[MotionPathConfig] = Field(None, title="Enable motion paths.") objects: ObjectConfig = Field( default_factory=ObjectConfig, title="Object configuration." ) diff --git a/frigate/config/camera/motion_path.py b/frigate/config/camera/motion_path.py index 5093dda76..5ce8fce62 100644 --- a/frigate/config/camera/motion_path.py +++ b/frigate/config/camera/motion_path.py @@ -26,10 +26,10 @@ class MotionPathConfig(FrigateBaseModel): def serialize_raw_mask(self, value: Any, info): return None - @validator('max_history') + @validator("max_history") def max_history_range(cls, v): if v < 2: - raise ValueError('max_history must be >= 2') + raise ValueError("max_history must be >= 2") if v > 100: - raise ValueError('max_history must be <= 100') + raise ValueError("max_history must be <= 100") return v diff --git a/frigate/motion/path_visualizer.py b/frigate/motion/path_visualizer.py index a6ea2a7f7..e145e39a5 100644 --- a/frigate/motion/path_visualizer.py +++ b/frigate/motion/path_visualizer.py @@ -3,140 +3,146 @@ import numpy as np from typing import Dict, List, Tuple from filterpy.kalman import KalmanFilter + class PathVisualizer: def __init__(self, history_length: int = 30, prediction_length: int = 15): self.history_length = history_length self.prediction_length = prediction_length self.position_history: Dict[str, List[Tuple[int, int]]] = {} self.kalman_filters: Dict[str, KalmanFilter] = {} - + def _init_kalman(self, object_id: str, initial_pos: Tuple[int, int]): """Initialize Kalman filter for new object with position and velocity state""" - kf = KalmanFilter(dim_x=4, dim_z=2) # State: [x, y, vx, vy], Measurement: [x, y] - + kf = KalmanFilter( + dim_x=4, dim_z=2 + ) # State: [x, y, vx, vy], Measurement: [x, y] + # State transition matrix - kf.F = np.array([ - [1, 0, 1, 0], # x = x + vx - [0, 1, 0, 1], # y = y + vy - [0, 0, 1, 0], # vx = vx - [0, 0, 0, 1], # vy = vy - ]) - + kf.F = np.array( + [ + [1, 0, 1, 0], # x = x + vx + [0, 1, 0, 1], # y = y + vy + [0, 0, 1, 0], # vx = vx + [0, 0, 0, 1], # vy = vy + ] + ) + # Measurement matrix - kf.H = np.array([ - [1, 0, 0, 0], - [0, 1, 0, 0] - ]) - + kf.H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]]) + # Measurement noise kf.R = np.eye(2) * 5 - + # Process noise kf.Q = np.eye(4) * 0.1 - + # Initial state kf.x = np.array([initial_pos[0], initial_pos[1], 0, 0]) - + # Initial state covariance kf.P = np.eye(4) * 100 - + self.kalman_filters[object_id] = kf - + def update_position(self, object_id: str, centroid: Tuple[int, int]): """Update position history and Kalman filter for an object""" if object_id not in self.position_history: self.position_history[object_id] = [] self._init_kalman(object_id, centroid) - + # Update Kalman filter kf = self.kalman_filters[object_id] measurement = np.array([centroid[0], centroid[1]]) kf.predict() kf.update(measurement) - + # Store filtered position filtered_pos = (int(kf.x[0]), int(kf.x[1])) self.position_history[object_id].append(filtered_pos) - + # Trim history if len(self.position_history[object_id]) > self.history_length: - self.position_history[object_id] = self.position_history[object_id][-self.history_length:] - - def predict_path(self, object_id: str, frame_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + self.position_history[object_id] = self.position_history[object_id][ + -self.history_length : + ] + + def predict_path( + self, object_id: str, frame_shape: Tuple[int, int] + ) -> List[Tuple[int, int]]: """Predict future path using Kalman filter""" if object_id not in self.kalman_filters: return [] - + kf = self.kalman_filters[object_id] predictions = [] - + # Save current state current_state = kf.x.copy() current_covar = kf.P.copy() - + # Predict future positions for _ in range(self.prediction_length): kf.predict() pred_x = int(kf.x[0]) pred_y = int(kf.x[1]) - + # Constrain predictions to frame boundaries pred_x = max(0, min(pred_x, frame_shape[1])) pred_y = max(0, min(pred_y, frame_shape[0])) - + predictions.append((pred_x, pred_y)) - + # Restore saved state kf.x = current_state kf.P = current_covar - + return predictions - + def draw_paths(self, frame: np.ndarray, active_objects: List[str]): """Draw historical and predicted paths for active objects""" frame_shape = frame.shape[:2] - + for object_id in active_objects: if object_id not in self.position_history: continue - + # Draw historical path positions = self.position_history[object_id] for i in range(1, len(positions)): # Color transitions from blue to green (past to present) alpha = i / len(positions) color = ( - int(255 * (1-alpha)), # Blue - int(255 * alpha), # Green - 0 # Red + int(255 * (1 - alpha)), # Blue + int(255 * alpha), # Green + 0, # Red ) - - start_pos = positions[i-1] + + start_pos = positions[i - 1] end_pos = positions[i] - + # Draw line with anti-aliasing cv2.line(frame, start_pos, end_pos, color, 2, cv2.LINE_AA) - + # Draw predicted path predictions = self.predict_path(object_id, frame_shape) for i in range(1, len(predictions)): # Red color with fading opacity for future predictions alpha = 1 - (i / len(predictions)) color = (0, 0, 255) # Red - - start_pos = predictions[i-1] + + start_pos = predictions[i - 1] end_pos = predictions[i] - + # Create overlay for alpha blending overlay = frame.copy() cv2.line(overlay, start_pos, end_pos, color, 2, cv2.LINE_AA) cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) - + def cleanup_inactive(self, active_objects: List[str]): """Remove tracking data for inactive objects""" current_ids = set(active_objects) tracked_ids = set(self.position_history.keys()) - + for inactive_id in tracked_ids - current_ids: self.position_history.pop(inactive_id, None) - self.kalman_filters.pop(inactive_id, None) \ No newline at end of file + self.kalman_filters.pop(inactive_id, None) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 1ca2486f5..14c4be83a 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -236,22 +236,23 @@ class CameraState: if draw_options.get("motion_paths"): # Update and draw paths for non-stationary objects active_objects = [ - obj_id for obj_id, obj in tracked_objects.items() + obj_id + for obj_id, obj in tracked_objects.items() if not obj["stationary"] and obj["frame_time"] == frame_time ] - + # Update positions for active objects for obj_id in active_objects: obj = tracked_objects[obj_id] centroid = ( int((obj["box"][0] + obj["box"][2]) / 2), # x center - int((obj["box"][1] + obj["box"][3]) / 2) # y center + int((obj["box"][1] + obj["box"][3]) / 2), # y center ) self.path_visualizer.update_position(obj_id, centroid) - + # Draw paths self.path_visualizer.draw_paths(frame_copy, active_objects) - + # Cleanup inactive objects self.path_visualizer.cleanup_inactive(active_objects) @@ -445,33 +446,34 @@ class CameraState: def update_paths(self, id: str, obj: TrackedObject): """Store motion path data globally if movement exceeds motion threshold.""" - motion_paths_config = self.camera_config.motion_paths or self.config.motion_paths + motion_paths_config = ( + self.camera_config.motion_paths or self.config.motion_paths + ) if motion_paths_config is None: return - + motion_threshold = self.camera_config.motion.threshold max_path_length = motion_paths_config.max_history - + current_box = obj.obj_data["box"] current_centroid = ( int((current_box[0] + current_box[2]) / 2), - int((current_box[1] + current_box[3]) / 2) + int((current_box[1] + current_box[3]) / 2), ) - + history = self.path_history[self.name][id] if history and len(history) > 0: prev_pos = history[-1] # Calculate motion delta - delta = ( - abs(current_centroid[0] - prev_pos[0]) + - abs(current_centroid[1] - prev_pos[1]) + delta = abs(current_centroid[0] - prev_pos[0]) + abs( + current_centroid[1] - prev_pos[1] ) if delta > motion_threshold: history.append(current_centroid) else: # Always record first position history.append(current_centroid) - + # Keep last N positions based on config if len(history) > max_path_length: history.pop(0) @@ -514,7 +516,7 @@ class TrackedObjectProcessor(threading.Thread): self.zone_data = defaultdict(lambda: defaultdict(dict)) self.active_zone_data = defaultdict(lambda: defaultdict(dict)) - self.path_history = defaultdict(lambda: defaultdict(list)) + self.path_history = defaultdict(lambda: defaultdict(list)) def start(camera: str, obj: TrackedObject, frame_name: str): self.event_sender.publish( diff --git a/frigate/track/tracked_object.py b/frigate/track/tracked_object.py index 36a03b6aa..5bfcc0fa6 100644 --- a/frigate/track/tracked_object.py +++ b/frigate/track/tracked_object.py @@ -395,7 +395,13 @@ class TrackedObject: return None def get_jpg_bytes( - self, timestamp=False, bounding_box=False, motion_paths=False, crop=False, height=None, quality=70 + self, + timestamp=False, + bounding_box=False, + motion_paths=False, + crop=False, + height=None, + quality=70, ): if self.thumbnail_data is None: return None @@ -455,14 +461,14 @@ class TrackedObject: if len(positions) > 1: # Draw lines connecting positions for i in range(1, len(positions)): - start_pos = positions[i-1] + start_pos = positions[i - 1] end_pos = positions[i] # Color transitions from blue to green (past to present) alpha = i / len(positions) color = ( - int(255 * (1-alpha)), # Blue - int(255 * alpha), # Green - 0 # Red + int(255 * (1 - alpha)), # Blue + int(255 * alpha), # Green + 0, # Red ) cv2.line(best_frame, start_pos, end_pos, color, 2, cv2.LINE_AA)