This commit is contained in:
Gabriel Grant 2025-02-16 02:29:49 +00:00 committed by GitHub
commit 29cdb6d2a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 440 additions and 4 deletions

View File

@ -17,6 +17,7 @@ class MediaLatestFrameQueryParams(BaseModel):
zones: Optional[int] = None
mask: Optional[int] = None
motion: Optional[int] = None
motion_paths: Optional[int] = None
regions: Optional[int] = None
quality: Optional[int] = 70
height: Optional[int] = None
@ -40,6 +41,7 @@ class MediaMjpegFeedQueryParams(BaseModel):
zones: Optional[int] = None
mask: Optional[int] = None
motion: Optional[int] = None
motion_paths: Optional[int] = None
regions: Optional[int] = None

View File

@ -55,6 +55,7 @@ def mjpeg_feed(
):
draw_options = {
"bounding_boxes": params.bbox,
"motion_paths": params.motion_paths,
"timestamp": params.timestamp,
"zones": params.zones,
"mask": params.mask,
@ -127,6 +128,7 @@ def latest_frame(
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
draw_options = {
"bounding_boxes": params.bbox,
"motion_paths": params.motion_paths,
"timestamp": params.timestamp,
"zones": params.zones,
"mask": params.mask,

View File

@ -24,6 +24,7 @@ from .ffmpeg import CameraFfmpegConfig, CameraInput
from .genai import GenAICameraConfig
from .live import CameraLiveConfig
from .motion import MotionConfig
from .motion_path import MotionPathConfig
from .mqtt import CameraMqttConfig
from .notification import NotificationConfig
from .objects import ObjectConfig
@ -62,6 +63,7 @@ class CameraConfig(FrigateBaseModel):
motion: Optional[MotionConfig] = Field(
None, title="Motion detection configuration."
)
motion_paths: Optional[MotionPathConfig] = Field(None, title="Enable motion paths.")
objects: ObjectConfig = Field(
default_factory=ObjectConfig, title="Object configuration."
)

View File

@ -0,0 +1,35 @@
from typing import Any, Optional
from pydantic import Field, field_serializer, validator
from ..base import FrigateBaseModel
__all__ = ["MotionPathConfig"]
class MotionPathConfig(FrigateBaseModel):
enabled: bool = Field(default=True, title="Enable motion path tracking.")
max_history: int = Field(
default=10,
title="Number of positions to maintain in motion path history.",
ge=2,
le=100,
)
mask: Optional[Any] = Field(default=None)
raw_mask: Optional[Any] = Field(default=None)
@field_serializer("mask", when_used="json")
def serialize_mask(self, value: Any, info):
return self.raw_mask
@field_serializer("raw_mask", when_used="json")
def serialize_raw_mask(self, value: Any, info):
return None
@validator("max_history")
def max_history_range(cls, v):
if v < 2:
raise ValueError("max_history must be >= 2")
if v > 100:
raise ValueError("max_history must be <= 100")
return v

View File

@ -9,6 +9,7 @@ class CameraMqttConfig(FrigateBaseModel):
enabled: bool = Field(default=True, title="Send image over MQTT.")
timestamp: bool = Field(default=True, title="Add timestamp to MQTT image.")
bounding_box: bool = Field(default=True, title="Add bounding box to MQTT image.")
motion_paths: bool = Field(default=True, title="Add motion paths to MQTT image.")
crop: bool = Field(default=True, title="Crop MQTT image to detected object.")
height: int = Field(default=270, title="MQTT image height.")
required_zones: list[str] = Field(

View File

@ -27,6 +27,9 @@ class SnapshotsConfig(FrigateBaseModel):
bounding_box: bool = Field(
default=True, title="Add a bounding box overlay on the snapshot."
)
motion_paths: bool = Field(
default=True, title="Add a motion path overlay on the snapshot."
)
crop: bool = Field(default=False, title="Crop the snapshot to the detected object.")
required_zones: list[str] = Field(
default_factory=list,

View File

@ -46,6 +46,7 @@ from .camera.detect import DetectConfig
from .camera.ffmpeg import FfmpegConfig
from .camera.genai import GenAIConfig
from .camera.motion import MotionConfig
from .camera.motion_path import MotionPathConfig
from .camera.notification import NotificationConfig
from .camera.objects import FilterConfig, ObjectConfig
from .camera.record import RecordConfig, RetainModeEnum
@ -388,6 +389,9 @@ class FrigateConfig(FrigateBaseModel):
motion: Optional[MotionConfig] = Field(
default=None, title="Global motion detection configuration."
)
motion_paths: Optional[MotionPathConfig] = Field(
default=None, title="Global motion path tracking configuration."
)
objects: ObjectConfig = Field(
default_factory=ObjectConfig, title="Global object configuration."
)

View File

@ -0,0 +1,147 @@
from typing import Dict, List, Tuple
import cv2
import numpy as np
from filterpy.kalman import KalmanFilter
class PathVisualizer:
def __init__(self, history_length: int = 30, prediction_length: int = 15):
self.history_length = history_length
self.prediction_length = prediction_length
self.position_history: Dict[str, List[Tuple[int, int]]] = {}
self.kalman_filters: Dict[str, KalmanFilter] = {}
def _init_kalman(self, object_id: str, initial_pos: Tuple[int, int]):
"""Initialize Kalman filter for new object with position and velocity state"""
kf = KalmanFilter(
dim_x=4, dim_z=2
) # State: [x, y, vx, vy], Measurement: [x, y]
# State transition matrix
kf.F = np.array(
[
[1, 0, 1, 0], # x = x + vx
[0, 1, 0, 1], # y = y + vy
[0, 0, 1, 0], # vx = vx
[0, 0, 0, 1], # vy = vy
]
)
# Measurement matrix
kf.H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]])
# Measurement noise
kf.R = np.eye(2) * 5
# Process noise
kf.Q = np.eye(4) * 0.1
# Initial state
kf.x = np.array([initial_pos[0], initial_pos[1], 0, 0])
# Initial state covariance
kf.P = np.eye(4) * 100
self.kalman_filters[object_id] = kf
def update_position(self, object_id: str, centroid: Tuple[int, int]):
"""Update position history and Kalman filter for an object"""
if object_id not in self.position_history:
self.position_history[object_id] = []
self._init_kalman(object_id, centroid)
# Update Kalman filter
kf = self.kalman_filters[object_id]
measurement = np.array([centroid[0], centroid[1]])
kf.predict()
kf.update(measurement)
# Store filtered position
filtered_pos = (int(kf.x[0]), int(kf.x[1]))
self.position_history[object_id].append(filtered_pos)
# Trim history
if len(self.position_history[object_id]) > self.history_length:
self.position_history[object_id] = self.position_history[object_id][
-self.history_length :
]
def predict_path(
self, object_id: str, frame_shape: Tuple[int, int]
) -> List[Tuple[int, int]]:
"""Predict future path using Kalman filter"""
if object_id not in self.kalman_filters:
return []
kf = self.kalman_filters[object_id]
predictions = []
# Save current state
current_state = kf.x.copy()
current_covar = kf.P.copy()
# Predict future positions
for _ in range(self.prediction_length):
kf.predict()
pred_x = int(kf.x[0])
pred_y = int(kf.x[1])
# Constrain predictions to frame boundaries
pred_x = max(0, min(pred_x, frame_shape[1]))
pred_y = max(0, min(pred_y, frame_shape[0]))
predictions.append((pred_x, pred_y))
# Restore saved state
kf.x = current_state
kf.P = current_covar
return predictions
def draw_paths(self, frame: np.ndarray, active_objects: List[str]):
"""Draw historical and predicted paths for active objects"""
frame_shape = frame.shape[:2]
for object_id in active_objects:
if object_id not in self.position_history:
continue
# Draw historical path
positions = self.position_history[object_id]
for i in range(1, len(positions)):
# Color transitions from blue to green (past to present)
alpha = i / len(positions)
color = (
int(255 * (1 - alpha)), # Blue
int(255 * alpha), # Green
0, # Red
)
start_pos = positions[i - 1]
end_pos = positions[i]
cv2.line(frame, start_pos, end_pos, color, 2, cv2.LINE_AA)
# Draw predicted path
predictions = self.predict_path(object_id, frame_shape)
for i in range(1, len(predictions)):
# Red color with fading opacity for future predictions
alpha = 1 - (i / len(predictions))
color = (0, 0, 255) # Red
start_pos = predictions[i - 1]
end_pos = predictions[i]
overlay = frame.copy()
cv2.line(overlay, start_pos, end_pos, color, 2, cv2.LINE_AA)
cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
def cleanup_inactive(self, active_objects: List[str]):
"""Remove tracking data for inactive objects"""
current_ids = set(active_objects)
tracked_ids = set(self.position_history.keys())
for inactive_id in tracked_ids - current_ids:
self.position_history.pop(inactive_id, None)
self.kalman_filters.pop(inactive_id, None)

View File

@ -16,14 +16,15 @@ from frigate.comms.dispatcher import Dispatcher
from frigate.comms.events_updater import EventEndSubscriber, EventUpdatePublisher
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import (
CameraMqttConfig,
FrigateConfig,
MqttConfig,
RecordConfig,
SnapshotsConfig,
ZoomingModeEnum,
)
from frigate.const import CLIPS_DIR, UPDATE_CAMERA_ACTIVITY
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.motion.path_visualizer import PathVisualizer
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
@ -45,6 +46,7 @@ class CameraState:
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
path_history,
):
self.name = name
self.config = config
@ -62,6 +64,8 @@ class CameraState:
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.path_visualizer = PathVisualizer()
self.path_history = path_history
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
@ -228,6 +232,27 @@ class CameraState:
position=self.camera_config.timestamp_style.position,
)
if draw_options.get("motion_paths"):
# Update and draw paths for non-stationary objects
active_objects = [
obj_id
for obj_id, obj in tracked_objects.items()
if not obj["stationary"] and obj["frame_time"] == frame_time
]
# Update positions for active objects
for obj_id in active_objects:
obj = tracked_objects[obj_id]
centroid = (
int((obj["box"][0] + obj["box"][2]) / 2), # x center
int((obj["box"][1] + obj["box"][3]) / 2), # y center
)
self.path_visualizer.update_position(obj_id, centroid)
self.path_visualizer.draw_paths(frame_copy, active_objects)
self.path_visualizer.cleanup_inactive(active_objects)
return frame_copy
def finished(self, obj_id):
@ -262,6 +287,7 @@ class CameraState:
self.config.ui,
self.frame_cache,
current_detections[id],
self.path_history,
)
# call event handlers
@ -274,6 +300,8 @@ class CameraState:
frame_time, current_detections[id], current_frame is not None
)
self.update_paths(id, updated_obj)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
@ -413,6 +441,40 @@ class CameraState:
self.previous_frame_id = frame_name
def update_paths(self, id: str, obj: TrackedObject):
"""Store motion path data globally if movement exceeds motion threshold."""
motion_paths_config = (
self.camera_config.motion_paths or self.config.motion_paths
)
if motion_paths_config is None:
return
motion_threshold = self.camera_config.motion.threshold
max_path_length = motion_paths_config.max_history
current_box = obj.obj_data["box"]
current_centroid = (
int((current_box[0] + current_box[2]) / 2),
int((current_box[1] + current_box[3]) / 2),
)
history = self.path_history[self.name][id]
if history and len(history) > 0:
prev_pos = history[-1]
# Calculate motion delta
delta = abs(current_centroid[0] - prev_pos[0]) + abs(
current_centroid[1] - prev_pos[1]
)
if delta > motion_threshold:
history.append(current_centroid)
else:
# Always record first position
history.append(current_centroid)
# Keep last N positions based on config
if len(history) > max_path_length:
history.pop(0)
class TrackedObjectProcessor(threading.Thread):
def __init__(
@ -451,6 +513,8 @@ class TrackedObjectProcessor(threading.Thread):
self.zone_data = defaultdict(lambda: defaultdict(dict))
self.active_zone_data = defaultdict(lambda: defaultdict(dict))
self.path_history = defaultdict(lambda: defaultdict(list))
def start(camera: str, obj: TrackedObject, frame_name: str):
self.event_sender.publish(
(
@ -497,6 +561,7 @@ class TrackedObjectProcessor(threading.Thread):
jpg_bytes = obj.get_jpg_bytes(
timestamp=snapshot_config.timestamp,
bounding_box=snapshot_config.bounding_box,
motion_paths=snapshot_config.motion_paths,
crop=snapshot_config.crop,
height=snapshot_config.height,
quality=snapshot_config.quality,
@ -547,11 +612,12 @@ class TrackedObjectProcessor(threading.Thread):
)
def snapshot(camera, obj: TrackedObject, frame_name: str):
mqtt_config: MqttConfig = self.config.cameras[camera].mqtt
mqtt_config: CameraMqttConfig = self.config.cameras[camera].mqtt
if mqtt_config.enabled and self.should_mqtt_snapshot(camera, obj):
jpg_bytes = obj.get_jpg_bytes(
timestamp=mqtt_config.timestamp,
bounding_box=mqtt_config.bounding_box,
motion_paths=mqtt_config.motion_paths,
crop=mqtt_config.crop,
height=mqtt_config.height,
quality=mqtt_config.quality,
@ -577,7 +643,11 @@ class TrackedObjectProcessor(threading.Thread):
for camera in self.config.cameras.keys():
camera_state = CameraState(
camera, self.config, self.frame_manager, self.ptz_autotracker_thread
camera,
self.config,
self.frame_manager,
self.ptz_autotracker_thread,
self.path_history,
)
camera_state.on("start", start)
camera_state.on("autotrack", autotrack)

View File

@ -0,0 +1,62 @@
import unittest
from pydantic import ValidationError
from frigate.config.camera.motion_path import MotionPathConfig
class TestMotionPathConfig(unittest.TestCase):
def setUp(self):
self.default_config = {
"enabled": True,
"max_history": 10,
"mask": None,
"raw_mask": None,
}
def test_default_motion_path_config(self):
motion_path_config = MotionPathConfig(**self.default_config)
assert motion_path_config.enabled is True
assert motion_path_config.max_history == 10
assert motion_path_config.mask is None
assert motion_path_config.raw_mask is None
def test_valid_max_history(self):
config = self.default_config.copy()
config["max_history"] = 50
motion_path_config = MotionPathConfig(**config)
assert motion_path_config.max_history == 50
def test_invalid_max_history_below_minimum(self):
config = self.default_config.copy()
config["max_history"] = 1
with self.assertRaises(ValidationError):
MotionPathConfig(**config)
def test_invalid_max_history_above_maximum(self):
config = self.default_config.copy()
config["max_history"] = 101
with self.assertRaises(ValidationError):
MotionPathConfig(**config)
def test_serialize_mask(self):
config = self.default_config.copy()
config["raw_mask"] = "test_mask"
motion_path_config = MotionPathConfig(**config)
assert (
motion_path_config.serialize_mask(motion_path_config.mask, None)
== "test_mask"
)
def test_serialize_raw_mask(self):
config = self.default_config.copy()
config["raw_mask"] = "test_mask"
motion_path_config = MotionPathConfig(**config)
assert (
motion_path_config.serialize_raw_mask(motion_path_config.raw_mask, None)
is None
)
if __name__ == "__main__":
unittest.main(verbosity=2)

View File

@ -0,0 +1,66 @@
import unittest
import numpy as np
from frigate.motion.path_visualizer import PathVisualizer
class TestPathVisualizer(unittest.TestCase):
def setUp(self):
self.visualizer = PathVisualizer(history_length=5, prediction_length=3)
def test_init_kalman_creates_filter(self):
object_id = "object_1"
initial_pos = (100, 200)
self.visualizer._init_kalman(object_id, initial_pos)
self.assertIn(object_id, self.visualizer.kalman_filters)
kf = self.visualizer.kalman_filters[object_id]
self.assertTrue(np.array_equal(kf.x[:2], np.array(initial_pos)))
def test_update_position_adds_to_history(self):
object_id = "object_1"
centroid = (100, 200)
self.visualizer.update_position(object_id, centroid)
self.assertIn(object_id, self.visualizer.position_history)
self.assertEqual(self.visualizer.position_history[object_id][-1], centroid)
def test_update_position_trims_history(self):
object_id = "object_1"
for i in range(10):
self.visualizer.update_position(object_id, (i, i))
self.assertEqual(len(self.visualizer.position_history[object_id]), 5)
def test_predict_path_returns_correct_length(self):
object_id = "object_1"
initial_pos = (100, 200)
self.visualizer._init_kalman(object_id, initial_pos)
predictions = self.visualizer.predict_path(object_id, (500, 500))
self.assertEqual(len(predictions), 3)
def test_draw_paths(self):
frame = np.zeros((500, 500, 3), dtype=np.uint8)
object_id = "object_1"
self.visualizer.update_position(object_id, (100, 100))
self.visualizer.update_position(object_id, (150, 150))
self.visualizer.draw_paths(frame, [object_id])
# Check if the line was drawn
self.assertTrue(np.any(frame != 0))
def test_cleanup_inactive_removes_data(self):
object_id = "object_1"
self.visualizer.update_position(object_id, (100, 100))
self.visualizer.cleanup_inactive([])
self.assertNotIn(object_id, self.visualizer.position_history)
self.assertNotIn(object_id, self.visualizer.kalman_filters)
if __name__ == "__main__":
unittest.main(verbosity=2)

View File

@ -24,6 +24,7 @@ from frigate.util.image import (
)
from frigate.util.object import box_inside
from frigate.util.velocity import calculate_real_world_speed
from frigate.motion.path_visualizer import PathVisualizer
logger = logging.getLogger(__name__)
@ -36,6 +37,7 @@ class TrackedObject:
ui_config: UIConfig,
frame_cache,
obj_data: dict[str, any],
path_history,
):
# set the score history then remove as it is not part of object state
self.score_history = obj_data["score_history"]
@ -67,6 +69,8 @@ class TrackedObject:
self.average_estimated_speed = 0
self.velocity_angle = 0
self.previous = self.to_dict()
self.path_visualizer = PathVisualizer()
self.path_history = path_history
@property
def max_severity(self) -> Optional[str]:
@ -391,7 +395,13 @@ class TrackedObject:
return None
def get_jpg_bytes(
self, timestamp=False, bounding_box=False, crop=False, height=None, quality=70
self,
timestamp=False,
bounding_box=False,
motion_paths=False,
crop=False,
height=None,
quality=70,
):
if self.thumbnail_data is None:
return None
@ -445,6 +455,23 @@ class TrackedObject:
color=color,
)
if motion_paths:
# Draw historical path data
positions = self.path_history[self.camera_config.name][self.obj_data["id"]]
if len(positions) > 1:
# Draw lines connecting positions
for i in range(1, len(positions)):
start_pos = positions[i - 1]
end_pos = positions[i]
# Color transitions from blue to green (past to present)
alpha = i / len(positions)
color = (
int(255 * (1 - alpha)), # Blue
int(255 * alpha), # Green
0, # Red
)
cv2.line(best_frame, start_pos, end_pos, color, 2, cv2.LINE_AA)
if crop:
box = self.thumbnail_data["box"]
box_size = 300

View File

@ -101,6 +101,16 @@ function DebugSettings({ handleSetOption, options }: DebugSettingsProps) {
/>
<Label htmlFor="bbox">Bounding Box</Label>
</div>
<div className="flex items-center space-x-2">
<Switch
id="motion_paths"
checked={options["motion_paths"]}
onCheckedChange={(isChecked) => {
handleSetOption("motion_paths", isChecked);
}}
/>
<Label htmlFor="motion_paths">Motion Paths</Label>
</div>
<div className="flex items-center space-x-2">
<Switch
id="timestamp"

View File

@ -72,6 +72,11 @@ export default function ObjectSettingsView({
</>
),
},
{
param: "motion_paths",
title: "Motion Paths",
description: "Show motion paths of tracked objects",
},
{
param: "timestamp",
title: "Timestamp",