Add motion paths to snapshots, and persist path history so snapshots have a useful path to draw

This commit is contained in:
Gabriel Grant 2025-02-15 19:59:29 -05:00
parent 851b0ebd6b
commit 575515c6dc
5 changed files with 82 additions and 30 deletions

View File

@ -8,32 +8,15 @@ __all__ = ["MotionPathConfig"]
class MotionPathConfig(FrigateBaseModel):
enabled: bool = Field(default=True, title="Enable motion path tracking on all cameras.")
threshold: int = Field(
default=30,
title="Motion detection threshold (1-255).",
ge=1,
le=255,
enabled: bool = Field(default=True, title="Enable motion path tracking.")
max_history: int = Field(
default=10,
title="Number of positions to maintain in motion path history.",
ge=2, # Minimum of 2 positions needed to draw a path
le=100, # Reasonable upper limit
)
lightning_threshold: float = Field(
default=0.8, title="Lightning detection threshold (0.3-1.0).", ge=0.3, le=1.0
)
improve_contrast: bool = Field(default=True, title="Improve Contrast")
contour_area: Optional[int] = Field(default=10, title="Contour Area")
delta_alpha: float = Field(default=0.2, title="Delta Alpha")
frame_alpha: float = Field(default=0.01, title="Frame Alpha")
frame_height: Optional[int] = Field(default=100, title="Frame Height")
mask: Union[str, list[str]] = Field(
default="", title="Coordinates polygon for the motion mask."
)
mqtt_off_delay: int = Field(
default=30,
title="Delay for updating MQTT with no motion detected.",
)
enabled_in_config: Optional[bool] = Field(
default=None, title="Keep track of original state of motion detection."
)
raw_mask: Union[str, list[str]] = ""
mask: Optional[Any] = Field(default=None)
raw_mask: Optional[Any] = Field(default=None)
@field_serializer("mask", when_used="json")
def serialize_mask(self, value: Any, info):

View File

@ -9,6 +9,7 @@ class CameraMqttConfig(FrigateBaseModel):
enabled: bool = Field(default=True, title="Send image over MQTT.")
timestamp: bool = Field(default=True, title="Add timestamp to MQTT image.")
bounding_box: bool = Field(default=True, title="Add bounding box to MQTT image.")
motion_paths: bool = Field(default=True, title="Add motion paths to MQTT image.")
crop: bool = Field(default=True, title="Crop MQTT image to detected object.")
height: int = Field(default=270, title="MQTT image height.")
required_zones: list[str] = Field(

View File

@ -42,7 +42,7 @@ class PathVisualizer:
self.kalman_filters[object_id] = kf
def update_position(self, object_id: str, centroid: Tuple[int, int], frame_shape: Tuple[int, int]):
def update_position(self, object_id: str, centroid: Tuple[int, int]):
"""Update position history and Kalman filter for an object"""
if object_id not in self.position_history:
self.position_history[object_id] = []

View File

@ -18,6 +18,7 @@ from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import (
FrigateConfig,
MqttConfig,
CameraMqttConfig,
RecordConfig,
SnapshotsConfig,
ZoomingModeEnum,
@ -46,6 +47,7 @@ class CameraState:
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
path_history,
):
self.name = name
self.config = config
@ -64,6 +66,7 @@ class CameraState:
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.path_visualizer = PathVisualizer()
self.path_history = path_history
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
@ -244,7 +247,7 @@ class CameraState:
int((obj["box"][0] + obj["box"][2]) / 2), # x center
int((obj["box"][1] + obj["box"][3]) / 2) # y center
)
self.path_visualizer.update_position(obj_id, centroid, frame_copy.shape[:2])
self.path_visualizer.update_position(obj_id, centroid)
# Draw paths
self.path_visualizer.draw_paths(frame_copy, active_objects)
@ -286,6 +289,7 @@ class CameraState:
self.config.ui,
self.frame_cache,
current_detections[id],
self.path_history,
)
# call event handlers
@ -298,6 +302,8 @@ class CameraState:
frame_time, current_detections[id], current_frame is not None
)
self.update_paths(id, updated_obj)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
@ -437,6 +443,39 @@ class CameraState:
self.previous_frame_id = frame_name
def update_paths(self, id: str, obj: "TrackedObject") -> None:
    """Record the object's centroid in the shared path history.

    A new point is appended only when the object has moved more than the
    camera's motion threshold since the last recorded position (Manhattan
    distance), so stationary jitter does not pollute the path. History is
    clamped in place to ``max_history`` points.

    Args:
        id: tracked object id (key into ``self.path_history[self.name]``).
        obj: tracked object whose ``obj_data["box"]`` holds the current
            bounding box as (x1, y1, x2, y2).
    """
    # Per-camera config wins; fall back to the global motion_paths config.
    motion_paths_config = self.camera_config.motion_paths or self.config.motion_paths
    if motion_paths_config is None:
        return

    motion_threshold = self.camera_config.motion.threshold
    max_path_length = motion_paths_config.max_history

    current_box = obj.obj_data["box"]
    current_centroid = (
        int((current_box[0] + current_box[2]) / 2),
        int((current_box[1] + current_box[3]) / 2),
    )

    history = self.path_history[self.name][id]

    if history:
        prev_pos = history[-1]
        # Manhattan distance between the previous and current centroid.
        delta = abs(current_centroid[0] - prev_pos[0]) + abs(
            current_centroid[1] - prev_pos[1]
        )
        if delta > motion_threshold:
            history.append(current_centroid)
    else:
        # Always record the first observed position.
        history.append(current_centroid)

    # Clamp to the last N positions in one slice-delete (the original
    # pop(0) removed only one entry per call and is O(n) per pop). Mutate
    # in place so the shared path_history list object is preserved.
    if len(history) > max_path_length:
        del history[:-max_path_length]
class TrackedObjectProcessor(threading.Thread):
def __init__(
@ -475,6 +514,8 @@ class TrackedObjectProcessor(threading.Thread):
self.zone_data = defaultdict(lambda: defaultdict(dict))
self.active_zone_data = defaultdict(lambda: defaultdict(dict))
self.path_history = defaultdict(lambda: defaultdict(list))
def start(camera: str, obj: TrackedObject, frame_name: str):
self.event_sender.publish(
(
@ -521,6 +562,7 @@ class TrackedObjectProcessor(threading.Thread):
jpg_bytes = obj.get_jpg_bytes(
timestamp=snapshot_config.timestamp,
bounding_box=snapshot_config.bounding_box,
motion_paths=snapshot_config.motion_paths,
crop=snapshot_config.crop,
height=snapshot_config.height,
quality=snapshot_config.quality,
@ -571,11 +613,12 @@ class TrackedObjectProcessor(threading.Thread):
)
def snapshot(camera, obj: TrackedObject, frame_name: str):
mqtt_config: MqttConfig = self.config.cameras[camera].mqtt
mqtt_config: CameraMqttConfig = self.config.cameras[camera].mqtt
if mqtt_config.enabled and self.should_mqtt_snapshot(camera, obj):
jpg_bytes = obj.get_jpg_bytes(
timestamp=mqtt_config.timestamp,
bounding_box=mqtt_config.bounding_box,
motion_paths=mqtt_config.motion_paths,
crop=mqtt_config.crop,
height=mqtt_config.height,
quality=mqtt_config.quality,
@ -601,7 +644,11 @@ class TrackedObjectProcessor(threading.Thread):
for camera in self.config.cameras.keys():
camera_state = CameraState(
camera, self.config, self.frame_manager, self.ptz_autotracker_thread
camera,
self.config,
self.frame_manager,
self.ptz_autotracker_thread,
self.path_history,
)
camera_state.on("start", start)
camera_state.on("autotrack", autotrack)

View File

@ -24,6 +24,7 @@ from frigate.util.image import (
)
from frigate.util.object import box_inside
from frigate.util.velocity import calculate_real_world_speed
from frigate.motion.path_visualizer import PathVisualizer
logger = logging.getLogger(__name__)
@ -36,6 +37,7 @@ class TrackedObject:
ui_config: UIConfig,
frame_cache,
obj_data: dict[str, any],
path_history,
):
# set the score history then remove as it is not part of object state
self.score_history = obj_data["score_history"]
@ -67,6 +69,8 @@ class TrackedObject:
self.average_estimated_speed = 0
self.velocity_angle = 0
self.previous = self.to_dict()
self.path_visualizer = PathVisualizer()
self.path_history = path_history
@property
def max_severity(self) -> Optional[str]:
@ -391,7 +395,7 @@ class TrackedObject:
return None
def get_jpg_bytes(
self, timestamp=False, bounding_box=False, crop=False, height=None, quality=70
self, timestamp=False, bounding_box=False, motion_paths=False, crop=False, height=None, quality=70
):
if self.thumbnail_data is None:
return None
@ -445,6 +449,23 @@ class TrackedObject:
color=color,
)
if motion_paths:
# Draw historical path data
positions = self.path_history[self.camera_config.name][self.obj_data["id"]]
if len(positions) > 1:
# Draw lines connecting positions
for i in range(1, len(positions)):
start_pos = positions[i-1]
end_pos = positions[i]
# Color transitions from blue to green (past to present)
alpha = i / len(positions)
color = (
int(255 * (1-alpha)), # Blue
int(255 * alpha), # Green
0 # Red
)
cv2.line(best_frame, start_pos, end_pos, color, 2, cv2.LINE_AA)
if crop:
box = self.thumbnail_data["box"]
box_size = 300