Create ReviewSegment table in DB for organizing detections to be reviewed (#9918)

* Add review to database

* Create main manager for review segments

* Upsert and maintain review segments

* Update logic for adding new segments

* Add api

* Support deleting review segments on recording cleanup

* Add field for alert labels

* Formatting

* Logic fixes

* Save 16:9 thumbnail for review segment

* Ensure that crop is 16:9

* Fix non detected objects being added

* Only include true positives

* Add sub labels to data
This commit is contained in:
Nicolas Mowen 2024-02-20 16:26:09 -07:00 committed by GitHub
parent cdd6ac9071
commit 940be5dc6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 598 additions and 5 deletions

View File

@ -45,6 +45,7 @@ from frigate.models import (
Recordings,
RecordingsToDelete,
Regions,
ReviewSegment,
Timeline,
)
from frigate.object_detection import ObjectDetectProcess
@ -55,6 +56,7 @@ from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
from frigate.record.record import manage_recordings
from frigate.review.review import manage_review_segments
from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
@ -283,6 +285,18 @@ class FrigateApp:
self.processes["recording"] = recording_process.pid or 0
logger.info(f"Recording process started: {recording_process.pid}")
def init_review_segment_manager(self) -> None:
review_segment_process = mp.Process(
target=manage_review_segments,
name="review_segment_manager",
args=(self.config,),
)
review_segment_process.daemon = True
self.review_segment_process = review_segment_process
review_segment_process.start()
self.processes["review_segment"] = review_segment_process.pid or 0
logger.info(f"Recording process started: {review_segment_process.pid}")
def bind_database(self) -> None:
"""Bind db to the main process."""
# NOTE: all db accessing processes need to be created before the db can be bound to the main process
@ -297,7 +311,15 @@ class FrigateApp:
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
),
)
models = [Event, Recordings, RecordingsToDelete, Previews, Regions, Timeline]
models = [
Event,
Previews,
Recordings,
RecordingsToDelete,
Regions,
ReviewSegment,
Timeline,
]
self.db.bind(models)
def init_stats(self) -> None:
@ -608,6 +630,7 @@ class FrigateApp:
self.init_database()
self.init_onvif()
self.init_recording_manager()
self.init_review_segment_manager()
self.init_go2rtc()
self.bind_database()
self.init_inter_process_communicator()

View File

@ -6,8 +6,13 @@ from typing import Any, Callable, Optional
from frigate.comms.config_updater import ConfigPublisher
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID
from frigate.models import Previews, Recordings
from frigate.const import (
INSERT_MANY_RECORDINGS,
INSERT_PREVIEW,
REQUEST_REGION_GRID,
UPSERT_REVIEW_SEGMENT,
)
from frigate.models import Previews, Recordings, ReviewSegment
from frigate.ptz.onvif import OnvifCommandEnum, OnvifController
from frigate.types import PTZMetricsTypes
from frigate.util.object import get_camera_regions_grid
@ -102,6 +107,15 @@ class Dispatcher:
return grid
elif topic == INSERT_PREVIEW:
Previews.insert(payload).execute()
elif topic == UPSERT_REVIEW_SEGMENT:
(
ReviewSegment.insert(payload)
.on_conflict(
conflict_target=[ReviewSegment.id],
update=payload,
)
.execute()
)
else:
self.publish(topic, payload, retain=False)

View File

@ -58,6 +58,7 @@ if os.path.isdir("/run/secrets"):
).read_text()
DEFAULT_TRACKED_OBJECTS = ["person"]
DEFAULT_ALERT_OBJECTS = ["person", "car"]
DEFAULT_LISTEN_AUDIO = ["bark", "fire_alarm", "scream", "speech", "yell"]
DEFAULT_DETECTORS = {"cpu": {"type": "cpu"}}
DEFAULT_DETECT_DIMENSIONS = {"width": 1280, "height": 720}
@ -512,6 +513,9 @@ class ZoneConfig(BaseModel):
class ObjectConfig(FrigateBaseModel):
track: List[str] = Field(default=DEFAULT_TRACKED_OBJECTS, title="Objects to track.")
alert: List[str] = Field(
default=DEFAULT_ALERT_OBJECTS, title="Objects to create alerts for."
)
filters: Dict[str, FilterConfig] = Field(default={}, title="Object filters.")
mask: Union[str, List[str]] = Field(default="", title="Object mask.")

View File

@ -70,6 +70,7 @@ MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to
INSERT_MANY_RECORDINGS = "insert_many_recordings"
INSERT_PREVIEW = "insert_preview"
REQUEST_REGION_GRID = "request_region_grid"
UPSERT_REVIEW_SEGMENT = "upsert_review_segment"
# Autotracking

View File

@ -45,7 +45,7 @@ from frigate.const import (
RECORD_DIR,
)
from frigate.events.external import ExternalEventProcessor
from frigate.models import Event, Previews, Recordings, Regions, Timeline
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment, Timeline
from frigate.object_processing import TrackedObject
from frigate.plus import PlusApi
from frigate.ptz.onvif import OnvifController
@ -2390,6 +2390,36 @@ def vod_event(id):
)
@bp.route("/review")
def review():
camera = request.args.get("camera", "all")
limit = request.args.get("limit", 100)
severity = request.args.get("severity", None)
before = request.args.get("before", type=float, default=datetime.now().timestamp())
after = request.args.get(
"after", type=float, default=(datetime.now() - timedelta(hours=18)).timestamp()
)
clauses = [((ReviewSegment.start_time > after) & (ReviewSegment.end_time < before))]
if camera != "all":
clauses.append((ReviewSegment.camera == camera))
if severity:
clauses.append((ReviewSegment.severity == severity))
review = (
ReviewSegment.select()
.where(reduce(operator.and_, clauses))
.order_by(ReviewSegment.start_time.desc())
.limit(limit)
.dicts()
)
return jsonify([r for r in review])
@bp.route(
"/export/<camera_name>/start/<int:start_time>/end/<int:end_time>", methods=["POST"]
)

View File

@ -76,6 +76,17 @@ class Recordings(Model): # type: ignore[misc]
segment_size = FloatField(default=0) # this should be stored as MB
class ReviewSegment(Model): # type: ignore[misc]
id = CharField(null=False, primary_key=True, max_length=30)
camera = CharField(index=True, max_length=20)
start_time = DateTimeField()
end_time = DateTimeField()
has_been_reviewed = BooleanField(default=False)
severity = CharField(max_length=30) # alert, detection, significant_motion
thumb_path = CharField(unique=True)
data = JSONField() # additional data about detection like list of labels, zone, areas of significant motion
class Previews(Model): # type: ignore[misc]
id = CharField(null=False, primary_key=True, max_length=30)
camera = CharField(index=True, max_length=20)

View File

@ -9,7 +9,7 @@ from pathlib import Path
from frigate.config import CameraConfig, FrigateConfig, RetainModeEnum
from frigate.const import CACHE_DIR, RECORD_DIR
from frigate.models import Event, Previews, Recordings
from frigate.models import Event, Previews, Recordings, ReviewSegment
from frigate.record.util import remove_empty_directories, sync_recordings
from frigate.util.builtin import clear_and_unlink, get_tomorrow_at_time
@ -174,6 +174,65 @@ class RecordingCleanup(threading.Thread):
Previews.id << deleted_previews_list[i : i + max_deletes]
).execute()
review_segments: list[ReviewSegment] = (
ReviewSegment.select(
ReviewSegment.id,
ReviewSegment.start_time,
ReviewSegment.end_time,
ReviewSegment.thumb_path,
)
.where(
ReviewSegment.camera == config.name,
ReviewSegment.end_time < expire_date,
)
.order_by(ReviewSegment.start_time)
.namedtuples()
.iterator()
)
# expire review segments
recording_start = 0
deleted_segments = set()
for segment in review_segments:
keep = False
# look for a reason to keep this segment
for idx in range(recording_start, len(kept_recordings)):
start_time, end_time = kept_recordings[idx]
# if the recording starts in the future, stop checking recordings
# and let this segment expire
if start_time > segment.end_time:
keep = False
break
# if the recording ends after the segment starts, keep it
# and stop looking at recordings
if end_time >= segment.start_time:
keep = True
break
# if the recording ends before this segment starts, skip
# this recording and check the next recording for an overlap.
# since the kept recordings and segments are sorted, we can skip recordings
# that end before the current segment started
if end_time < segment.start_time:
recording_start = idx
# Delete segments without any relevant recordings
if not keep:
Path(segment.thumb_path).unlink(missing_ok=True)
deleted_segments.add(segment.id)
# expire segments
logger.debug(f"Expiring {len(deleted_segments)} segments")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_segments_list = list(deleted_segments)
for i in range(0, len(deleted_segments_list), max_deletes):
ReviewSegment.delete().where(
ReviewSegment.id << deleted_segments_list[i : i + max_deletes]
).execute()
def expire_recordings(self) -> None:
"""Delete recordings based on retention config."""
logger.debug("Start expire recordings.")

View File

View File

@ -0,0 +1,328 @@
"""Maintain review segments in db."""
import logging
import os
import random
import string
import threading
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
import cv2
import numpy as np
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR, UPSERT_REVIEW_SEGMENT
from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop
logger = logging.getLogger(__name__)
THUMB_HEIGHT = 180
THUMB_WIDTH = 320
class SeverityEnum(str, Enum):
alert = "alert"
detection = "detection"
signification_motion = "significant_motion"
class PendingReviewSegment:
def __init__(
self,
camera: str,
frame_time: float,
severity: SeverityEnum,
detections: set[str] = set(),
objects: set[str] = set(),
sub_labels: set[str] = set(),
zones: set[str] = set(),
audio: set[str] = set(),
motion: list[int] = [],
):
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
self.id = f"{frame_time}-{rand_id}"
self.camera = camera
self.start_time = frame_time
self.severity = severity
self.detections = detections
self.objects = objects
self.sub_labels = sub_labels
self.zones = zones
self.audio = audio
self.sig_motion_areas = motion
self.last_update = frame_time
# thumbnail
self.frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8)
self.frame_active_count = 0
def update_frame(
self, camera_config: CameraConfig, frame, objects: list[TrackedObject]
):
min_x = camera_config.frame_shape[1]
min_y = camera_config.frame_shape[0]
max_x = 0
max_y = 0
# find bounds for all boxes
for o in objects:
min_x = min(o["box"][0], min_x)
min_y = min(o["box"][1], min_y)
max_x = max(o["box"][2], max_x)
max_y = max(o["box"][3], max_y)
region = calculate_16_9_crop(
camera_config.frame_shape, min_x, min_y, max_x, max_y
)
# could not find suitable 16:9 region
if not region:
return
self.frame_active_count = len(objects)
color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
color_frame = color_frame[region[1] : region[3], region[0] : region[2]]
width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0])
self.frame = cv2.resize(
color_frame, dsize=(width, THUMB_HEIGHT), interpolation=cv2.INTER_AREA
)
def end(self) -> dict:
path = os.path.join(CLIPS_DIR, f"thumb-{self.camera}-{self.id}.jpg")
if self.frame is not None:
cv2.imwrite(path, self.frame)
return {
ReviewSegment.id: self.id,
ReviewSegment.camera: self.camera,
ReviewSegment.start_time: self.start_time,
ReviewSegment.end_time: self.last_update,
ReviewSegment.severity: self.severity.value,
ReviewSegment.thumb_path: path,
ReviewSegment.data: {
"detections": list(self.detections),
"objects": list(self.objects),
"sub_labels": list(self.sub_labels),
"zones": list(self.zones),
"audio": list(self.audio),
"significant_motion_areas": self.sig_motion_areas,
},
}
class ReviewSegmentMaintainer(threading.Thread):
"""Maintain review segments."""
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "review_segment_maintainer"
self.config = config
self.active_review_segments: dict[str, Optional[PendingReviewSegment]] = {}
self.frame_manager = SharedMemoryFrameManager()
# create communication for review segments
self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber("config/record/")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.stop_event = stop_event
def end_segment(self, segment: PendingReviewSegment) -> None:
"""End segment."""
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, segment.end())
self.active_review_segments[segment.camera] = None
def update_existing_segment(
self,
segment: PendingReviewSegment,
frame_time: float,
objects: list[TrackedObject],
motion: list,
) -> None:
"""Validate if existing review segment should continue."""
camera_config = self.config.cameras[segment.camera]
active_objects = get_active_objects(frame_time, camera_config, objects)
if len(active_objects) > 0:
segment.last_update = frame_time
# update type for this segment now that active objects are detected
if segment.severity == SeverityEnum.signification_motion:
segment.severity = SeverityEnum.detection
if len(active_objects) > segment.frame_active_count:
frame_id = f"{camera_config.name}{frame_time}"
yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv
)
segment.update_frame(camera_config, yuv_frame, active_objects)
self.frame_manager.close(frame_id)
for object in active_objects:
segment.detections.add(object["id"])
segment.objects.add(object["label"])
if object["sub_label"]:
segment.sub_labels.add(object["sub_label"])
# if object is alert label and has qualified for recording
# mark this review as alert
if (
segment.severity == SeverityEnum.detection
and object["has_clip"]
and object["label"] in camera_config.objects.alert
):
segment.severity = SeverityEnum.alert
# keep zones up to date
if len(object["current_zones"]) > 0:
segment.zones.update(object["current_zones"])
elif (
segment.severity == SeverityEnum.signification_motion and len(motion) >= 20
):
segment.last_update = frame_time
else:
if segment.severity == SeverityEnum.alert and frame_time > (
segment.last_update + 60
):
self.end_segment(segment)
elif frame_time > (segment.last_update + 10):
self.end_segment(segment)
def check_if_new_segment(
self,
camera: str,
frame_time: float,
objects: list[TrackedObject],
motion: list,
) -> None:
"""Check if a new review segment should be created."""
camera_config = self.config.cameras[camera]
active_objects = get_active_objects(frame_time, camera_config, objects)
if len(active_objects) > 0:
has_sig_object = False
detections: set = set()
objects: set = set()
sub_labels: set = (set(),)
zones: set = set()
for object in active_objects:
if (
not has_sig_object
and object["has_clip"]
and object["label"] in camera_config.objects.alert
):
has_sig_object = True
detections.add(object["id"])
objects.add(object["label"])
if object["sub_label"]:
sub_labels.add(object["sub_label"])
zones.update(object["current_zones"])
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.alert if has_sig_object else SeverityEnum.detection,
detections,
objects=objects,
sub_labels=sub_labels,
zones=zones,
)
elif len(motion) >= 20:
self.active_review_segments[camera] = PendingReviewSegment(
camera, frame_time, SeverityEnum.signification_motion, motion=motion
)
def run(self) -> None:
while not self.stop_event.is_set():
# check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = self.config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
(topic, data) = self.detection_subscriber.get_data(timeout=1)
if not topic:
continue
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
if not self.config.cameras[camera].record.enabled:
continue
current_segment = self.active_review_segments.get(camera)
if current_segment is not None:
if topic == DetectionTypeEnum.video:
self.update_existing_segment(
current_segment,
frame_time,
current_tracked_objects,
motion_boxes,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
current_segment.last_update = frame_time
current_segment.audio.update(audio_detections)
else:
if topic == DetectionTypeEnum.video:
self.check_if_new_segment(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.detection,
audio=set(audio_detections),
)
def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]
) -> list[TrackedObject]:
"""get active objects for detection."""
return [
o
for o in all_objects
if o["motionless_count"] < camera_config.detect.stationary.threshold
and o["frame_time"] == frame_time
and not o["false_positive"]
]

36
frigate/review/review.py Normal file
View File

@ -0,0 +1,36 @@
"""Run recording maintainer and cleanup."""
import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from setproctitle import setproctitle
from frigate.config import FrigateConfig
from frigate.review.maintainer import ReviewSegmentMaintainer
from frigate.util.services import listen
logger = logging.getLogger(__name__)
def manage_review_segments(config: FrigateConfig) -> None:
stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:review_segment_manager"
setproctitle("frigate.review_segment_manager")
listen()
maintainer = ReviewSegmentMaintainer(
config,
stop_event,
)
maintainer.start()

View File

@ -211,6 +211,51 @@ def calculate_region(frame_shape, xmin, ymin, xmax, ymax, model_size, multiplier
return (x_offset, y_offset, x_offset + size, y_offset + size)
def calculate_16_9_crop(frame_shape, xmin, ymin, xmax, ymax, multiplier=1.25):
min_size = 200
# size is the longest edge and divisible by 4
x_size = int(xmax - xmin * multiplier)
if x_size < min_size:
x_size = min_size
y_size = int(ymax - ymin * multiplier)
if y_size < min_size:
y_size = min_size
# calculate 16x9 using height
aspect_y_size = int(9 / 16 * x_size)
# if 16:9 by height is too small
if aspect_y_size < y_size or aspect_y_size > frame_shape[0]:
x_size = int((16 / 9) * y_size) // 4 * 4
if x_size / y_size > 1.8:
return None
else:
y_size = aspect_y_size // 4 * 4
# x_offset is midpoint of bounding box minus half the size
x_offset = int((xmax - xmin) / 2.0 + xmin - x_size / 2.0)
# if outside the image
if x_offset < 0:
x_offset = 0
elif x_offset > (frame_shape[1] - x_size):
x_offset = max(0, (frame_shape[1] - x_size))
# y_offset is midpoint of bounding box minus half the size
y_offset = int((ymax - ymin) / 2.0 + ymin - y_size / 2.0)
# # if outside the image
if y_offset < 0:
y_offset = 0
elif y_offset > (frame_shape[0] - y_size):
y_offset = max(0, (frame_shape[0] - y_size))
return (x_offset, y_offset, x_offset + x_size, y_offset + y_size)
def get_yuv_crop(frame_shape, crop):
# crop should be (x1,y1,x2,y2)
frame_height = frame_shape[0] // 3 * 2

View File

@ -0,0 +1,42 @@
"""Peewee migrations -- 022_create_review_segment_table.py.
Some examples (model - class or model name)::
> Model = migrator.orm['model_name'] # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.python(func, *args, **kwargs) # Run python code
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.drop_index(model, *col_names)
> migrator.add_not_null(model, *field_names)
> migrator.drop_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
"""
import peewee as pw
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.sql(
'CREATE TABLE IF NOT EXISTS "reviewsegment" ("id" VARCHAR(30) NOT NULL PRIMARY KEY, "camera" VARCHAR(20) NOT NULL, "start_time" DATETIME NOT NULL, "end_time" DATETIME, "has_been_reviewed" INTEGER NOT NULL, "severity" VARCHAR(30) NOT NULL, "thumb_path" VARCHAR(255) NOT NULL, "data" JSON NOT NULL)'
)
migrator.sql(
'CREATE INDEX IF NOT EXISTS "review_segment_camera" ON "reviewsegment" ("camera")'
)
migrator.sql(
'CREATE INDEX "review_segment_start_time_end_time" ON "reviewsegment" ("start_time" DESC, "end_time" DESC)'
)
def rollback(migrator, database, fake=False, **kwargs):
pass