From f67ec241d4851d37d49ea819d764f2d4ea34adf9 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 10 Oct 2024 14:28:43 -0500 Subject: [PATCH] Add embeddings reindex progress to the UI (#14268) * refactor dispatcher * add reindex to dictionary * add circular progress bar component * Add progress to UI when embeddings are reindexing * readd comments to dispatcher for clarity * Only report progress every 10 events so we don't spam the logs and websocket * clean up --- .cspell/frigate-dictionary.txt | 1 + frigate/comms/dispatcher.py | 135 ++++++++++----- frigate/const.py | 1 + frigate/embeddings/embeddings.py | 52 +++++- web/src/api/ws.tsx | 37 +++++ .../components/ui/circular-progress-bar.tsx | 108 ++++++++++++ web/src/pages/Explore.tsx | 155 ++++++++++++------ web/src/types/ws.ts | 7 + 8 files changed, 397 insertions(+), 99 deletions(-) create mode 100644 web/src/components/ui/circular-progress-bar.tsx diff --git a/.cspell/frigate-dictionary.txt b/.cspell/frigate-dictionary.txt index 6c0e8022f..0cbcc4beb 100644 --- a/.cspell/frigate-dictionary.txt +++ b/.cspell/frigate-dictionary.txt @@ -212,6 +212,7 @@ rcond RDONLY rebranded referer +reindex Reolink restream restreamed diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 1605d645a..c1a9f7e86 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -15,6 +15,7 @@ from frigate.const import ( INSERT_PREVIEW, REQUEST_REGION_GRID, UPDATE_CAMERA_ACTIVITY, + UPDATE_EMBEDDINGS_REINDEX_PROGRESS, UPDATE_EVENT_DESCRIPTION, UPDATE_MODEL_STATE, UPSERT_REVIEW_SEGMENT, @@ -86,35 +87,27 @@ class Dispatcher: self.camera_activity = {} self.model_state = {} + self.embeddings_reindex = {} def _receive(self, topic: str, payload: str) -> Optional[Any]: """Handle receiving of payload from communicators.""" - if topic.endswith("set"): + + def handle_camera_command(command_type, camera_name, payload): try: - # example /cam_name/detect/set payload=ON|OFF - if topic.count("/") == 2: - camera_name = topic.split("/")[-3] - command = topic.split("/")[-2] - self._camera_settings_handlers[command](camera_name, payload) - elif topic.count("/") == 1: - command = topic.split("/")[-2] - self._global_settings_handlers[command](payload) - except IndexError: - logger.error(f"Received invalid set command: {topic}") - return - elif topic.endswith("ptz"): - try: - # example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP... - camera_name = topic.split("/")[-2] - self._on_ptz_command(camera_name, payload) - except IndexError: - logger.error(f"Received invalid ptz command: {topic}") - return - elif topic == "restart": + if command_type == "set": + self._camera_settings_handlers[camera_name](camera_name, payload) + elif command_type == "ptz": + self._on_ptz_command(camera_name, payload) + except KeyError: + logger.error(f"Invalid command type: {command_type}") + + def handle_restart(): restart_frigate() - elif topic == INSERT_MANY_RECORDINGS: + + def handle_insert_many_recordings(): Recordings.insert_many(payload).execute() - elif topic == REQUEST_REGION_GRID: + + def handle_request_region_grid(): camera = payload grid = get_camera_regions_grid( camera, @@ -122,24 +115,25 @@ class Dispatcher: max(self.config.model.width, self.config.model.height), ) return grid - elif topic == INSERT_PREVIEW: + + def handle_insert_preview(): Previews.insert(payload).execute() - elif topic == UPSERT_REVIEW_SEGMENT: - ( - ReviewSegment.insert(payload) - .on_conflict( - conflict_target=[ReviewSegment.id], - update=payload, - ) - .execute() - ) - elif topic == CLEAR_ONGOING_REVIEW_SEGMENTS: - ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where( - ReviewSegment.end_time == None + + def handle_upsert_review_segment(): + ReviewSegment.insert(payload).on_conflict( + conflict_target=[ReviewSegment.id], + update=payload, ).execute() - elif topic == UPDATE_CAMERA_ACTIVITY: + + def handle_clear_ongoing_review_segments(): + ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where( + ReviewSegment.end_time.is_null(True) + ).execute() + + def handle_update_camera_activity(): self.camera_activity = payload - elif topic == UPDATE_EVENT_DESCRIPTION: + + def handle_update_event_description(): event: Event = Event.get(Event.id == payload["id"]) event.data["description"] = payload["description"] event.save() @@ -147,15 +141,30 @@ class Dispatcher: "event_update", json.dumps({"id": event.id, "description": event.data["description"]}), ) - elif topic == UPDATE_MODEL_STATE: + + def handle_update_model_state(): model = payload["model"] state = payload["state"] self.model_state[model] = ModelStatusTypesEnum[state] self.publish("model_state", json.dumps(self.model_state)) - elif topic == "modelState": - model_state = self.model_state.copy() - self.publish("model_state", json.dumps(model_state)) - elif topic == "onConnect": + + def handle_model_state(): + self.publish("model_state", json.dumps(self.model_state.copy())) + + def handle_update_embeddings_reindex_progress(): + self.embeddings_reindex = payload + self.publish( + "embeddings_reindex_progress", + json.dumps(payload), + ) + + def handle_embeddings_reindex_progress(): + self.publish( + "embeddings_reindex_progress", + json.dumps(self.embeddings_reindex.copy()), + ) + + def handle_on_connect(): camera_status = self.camera_activity.copy() for camera in camera_status.keys(): @@ -170,6 +179,46 @@ class Dispatcher: } self.publish("camera_activity", json.dumps(camera_status)) + + # Dictionary mapping topic to handlers + topic_handlers = { + INSERT_MANY_RECORDINGS: handle_insert_many_recordings, + REQUEST_REGION_GRID: handle_request_region_grid, + INSERT_PREVIEW: handle_insert_preview, + UPSERT_REVIEW_SEGMENT: handle_upsert_review_segment, + CLEAR_ONGOING_REVIEW_SEGMENTS: handle_clear_ongoing_review_segments, + UPDATE_CAMERA_ACTIVITY: handle_update_camera_activity, + UPDATE_EVENT_DESCRIPTION: handle_update_event_description, + UPDATE_MODEL_STATE: handle_update_model_state, + UPDATE_EMBEDDINGS_REINDEX_PROGRESS: handle_update_embeddings_reindex_progress, + "restart": handle_restart, + "embeddingsReindexProgress": handle_embeddings_reindex_progress, + "modelState": handle_model_state, + "onConnect": handle_on_connect, + } + + if topic.endswith("set") or topic.endswith("ptz"): + try: + parts = topic.split("/") + if len(parts) == 3 and topic.endswith("set"): + # example /cam_name/detect/set payload=ON|OFF + camera_name = parts[-3] + command = parts[-2] + handle_camera_command("set", camera_name, payload) + elif len(parts) == 2 and topic.endswith("set"): + command = parts[-2] + self._global_settings_handlers[command](payload) + elif len(parts) == 2 and topic.endswith("ptz"): + # example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP... + camera_name = parts[-2] + handle_camera_command("ptz", camera_name, payload) + except IndexError: + logger.error( + f"Received invalid {topic.split('/')[-1]} command: {topic}" + ) + return + elif topic in topic_handlers: + return topic_handlers[topic]() else: self.publish(topic, payload, retain=False) diff --git a/frigate/const.py b/frigate/const.py index e8e841f4f..ad1aacd0f 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -85,6 +85,7 @@ CLEAR_ONGOING_REVIEW_SEGMENTS = "clear_ongoing_review_segments" UPDATE_CAMERA_ACTIVITY = "update_camera_activity" UPDATE_EVENT_DESCRIPTION = "update_event_description" UPDATE_MODEL_STATE = "update_model_state" +UPDATE_EMBEDDINGS_REINDEX_PROGRESS = "handle_embeddings_reindex_progress" # Stats Values diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py index 9bcf2e6c0..dda4d95fd 100644 --- a/frigate/embeddings/embeddings.py +++ b/frigate/embeddings/embeddings.py @@ -10,7 +10,7 @@ from playhouse.shortcuts import model_to_dict from frigate.comms.inter_process import InterProcessRequestor from frigate.config.semantic_search import SemanticSearchConfig -from frigate.const import UPDATE_MODEL_STATE +from frigate.const import UPDATE_EMBEDDINGS_REINDEX_PROGRESS, UPDATE_MODEL_STATE from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.models import Event from frigate.types import ModelStatusTypesEnum @@ -165,19 +165,36 @@ class Embeddings: return embedding def reindex(self) -> None: - logger.info("Indexing event embeddings...") + logger.info("Indexing tracked object embeddings...") self._drop_tables() self._create_tables() st = time.time() totals = { - "thumb": 0, - "desc": 0, + "thumbnails": 0, + "descriptions": 0, + "processed_objects": 0, + "total_objects": 0, } + self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals) + + # Get total count of events to process + total_events = ( + Event.select() + .where( + (Event.has_clip == True | Event.has_snapshot == True) + & Event.thumbnail.is_null(False) + ) + .count() + ) + totals["total_objects"] = total_events + batch_size = 100 current_page = 1 + processed_events = 0 + events = ( Event.select() .where( @@ -193,11 +210,29 @@ class Embeddings: for event in events: thumbnail = base64.b64decode(event.thumbnail) self.upsert_thumbnail(event.id, thumbnail) - totals["thumb"] += 1 + totals["thumbnails"] += 1 + if description := event.data.get("description", "").strip(): - totals["desc"] += 1 + totals["descriptions"] += 1 self.upsert_description(event.id, description) + totals["processed_objects"] += 1 + + # report progress every 10 events so we don't spam the logs + if (totals["processed_objects"] % 10) == 0: + progress = (processed_events / total_events) * 100 + logger.debug( + "Processed %d/%d events (%.2f%% complete) | Thumbnails: %d, Descriptions: %d", + processed_events, + total_events, + progress, + totals["thumbnails"], + totals["descriptions"], + ) + + self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals) + + # Move to the next page current_page += 1 events = ( Event.select() @@ -211,7 +246,8 @@ class Embeddings: logger.info( "Embedded %d thumbnails and %d descriptions in %s seconds", - totals["thumb"], - totals["desc"], + totals["thumbnails"], + totals["descriptions"], time.time() - st, ) + self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals) diff --git a/web/src/api/ws.tsx b/web/src/api/ws.tsx index a78722b66..2e083cf83 100644 --- a/web/src/api/ws.tsx +++ b/web/src/api/ws.tsx @@ -2,6 +2,7 @@ import { baseUrl } from "./baseUrl"; import { useCallback, useEffect, useState } from "react"; import useWebSocket, { ReadyState } from "react-use-websocket"; import { + EmbeddingsReindexProgressType, FrigateCameraState, FrigateEvent, FrigateReview, @@ -302,6 +303,42 @@ export function useModelState( return { payload: data ? data[model] : undefined }; } +export function useEmbeddingsReindexProgress( + revalidateOnFocus: boolean = true, +): { + payload: EmbeddingsReindexProgressType; +} { + const { + value: { payload }, + send: sendCommand, + } = useWs("embeddings_reindex_progress", "embeddingsReindexProgress"); + + const data = useDeepMemo(JSON.parse(payload as string)); + + useEffect(() => { + let listener = undefined; + if (revalidateOnFocus) { + sendCommand("embeddingsReindexProgress"); + listener = () => { + if (document.visibilityState == "visible") { + sendCommand("embeddingsReindexProgress"); + } + }; + addEventListener("visibilitychange", listener); + } + + return () => { + if (listener) { + removeEventListener("visibilitychange", listener); + } + }; + // we know that these deps are correct + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [revalidateOnFocus]); + + return { payload: data }; +} + export function useMotionActivity(camera: string): { payload: string } { const { value: { payload }, diff --git a/web/src/components/ui/circular-progress-bar.tsx b/web/src/components/ui/circular-progress-bar.tsx new file mode 100644 index 000000000..c1714829e --- /dev/null +++ b/web/src/components/ui/circular-progress-bar.tsx @@ -0,0 +1,108 @@ +import { cn } from "@/lib/utils"; + +interface Props { + max: number; + value: number; + min: number; + gaugePrimaryColor: string; + gaugeSecondaryColor: string; + className?: string; +} + +export default function AnimatedCircularProgressBar({ + max = 100, + min = 0, + value = 0, + gaugePrimaryColor, + gaugeSecondaryColor, + className, +}: Props) { + const circumference = 2 * Math.PI * 45; + const percentPx = circumference / 100; + const currentPercent = Math.floor(((value - min) / (max - min)) * 100); + + return ( +