diff --git a/frigate/object_processing.py b/frigate/object_processing.py index ca7ea47b8..23c84eedd 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -233,17 +233,18 @@ class CameraState: def on(self, event_type: str, callback: Callable[[dict], None]): self.callbacks[event_type].append(callback) - def update(self, frame_time, current_detections, motion_boxes, regions): - # get the new frame - frame_id = f"{self.name}{frame_time}" - + def update( + self, + frame_name: str, + frame_time: float, + current_detections: dict[str, dict[str, any]], + motion_boxes: list[tuple[int, int, int, int]], + regions: list[tuple[int, int, int, int]], + ): current_frame = self.frame_manager.get( - frame_id, self.camera_config.frame_shape_yuv + frame_name, self.camera_config.frame_shape_yuv ) - if current_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") - tracked_objects = self.tracked_objects.copy() current_ids = set(current_detections.keys()) previous_ids = set(tracked_objects.keys()) @@ -477,7 +478,7 @@ class CameraState: if self.previous_frame_id is not None: self.frame_manager.close(self.previous_frame_id) - self.previous_frame_id = frame_id + self.previous_frame_id = frame_name class TrackedObjectProcessor(threading.Thread): @@ -798,6 +799,7 @@ class TrackedObjectProcessor(threading.Thread): try: ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, @@ -809,7 +811,7 @@ class TrackedObjectProcessor(threading.Thread): camera_state = self.camera_states[camera] camera_state.update( - frame_time, current_tracked_objects, motion_boxes, regions + frame_name, frame_time, current_tracked_objects, motion_boxes, regions ) self.update_mqtt_motion(camera, frame_time, motion_boxes) @@ -822,6 +824,7 @@ class TrackedObjectProcessor(threading.Thread): self.detection_publisher.publish( ( camera, + frame_name, frame_time, tracked_objects, motion_boxes, diff --git a/frigate/output/birdseye.py b/frigate/output/birdseye.py index c187c77ea..cab155b9b 100644 --- a/frigate/output/birdseye.py +++ b/frigate/output/birdseye.py @@ -268,12 +268,10 @@ class BirdsEyeFrameManager: def __init__( self, config: FrigateConfig, - frame_manager: SharedMemoryFrameManager, stop_event: mp.Event, ): self.config = config self.mode = config.birdseye.mode - self.frame_manager = frame_manager width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height) self.frame_shape = (height, width) self.yuv_shape = (height * 3 // 2, width) @@ -351,18 +349,13 @@ class BirdsEyeFrameManager: logger.debug("Clearing the birdseye frame") self.frame[:] = self.blank_frame - def copy_to_position(self, position, camera=None, frame_time=None): + def copy_to_position(self, position, camera=None, frame: np.ndarray = None): if camera is None: frame = None channel_dims = None else: - frame_id = f"{camera}{frame_time}" - frame = self.frame_manager.get( - frame_id, self.config.cameras[camera].frame_shape_yuv - ) - if frame is None: - logger.debug(f"Unable to copy frame {camera}{frame_time} to birdseye.") + logger.debug(f"Unable to copy frame {camera} to birdseye.") return channel_dims = self.cameras[camera]["channel_dims"] @@ -375,8 +368,6 @@ class BirdsEyeFrameManager: channel_dims, ) - self.frame_manager.close(frame_id) - def camera_active(self, mode, object_box_count, motion_box_count): if mode == BirdseyeModeEnum.continuous: return True @@ -387,7 +378,7 @@ class BirdsEyeFrameManager: if mode == BirdseyeModeEnum.objects and object_box_count > 0: return True - def update_frame(self): + def update_frame(self, frame: np.ndarray): """Update to a new frame for birdseye.""" # determine how many cameras are tracking objects within the last inactivity_threshold seconds @@ -524,7 +515,9 @@ class BirdsEyeFrameManager: for row in self.camera_layout: for position in row: self.copy_to_position( - position[1], position[0], self.cameras[position[0]]["current_frame"] + position[1], + position[0], + frame, ) return True @@ -672,7 +665,14 @@ class BirdsEyeFrameManager: else: return standard_candidate_layout - def update(self, camera, object_count, motion_count, frame_time, frame) -> bool: + def update( + self, + camera: str, + object_count: int, + motion_count: int, + frame_time: float, + frame: np.ndarray, + ) -> bool: # don't process if birdseye is disabled for this camera camera_config = self.config.cameras[camera].birdseye @@ -700,7 +700,7 @@ class BirdsEyeFrameManager: return False try: - updated_frame = self.update_frame() + updated_frame = self.update_frame(frame) except Exception: updated_frame = False self.active_cameras = [] @@ -737,12 +737,11 @@ class Birdseye: self.broadcaster = BroadcastThread( "birdseye", self.converter, websocket_server, stop_event ) - frame_manager = SharedMemoryFrameManager() - self.birdseye_manager = BirdsEyeFrameManager(config, frame_manager, stop_event) + self.birdseye_manager = BirdsEyeFrameManager(config, stop_event) self.config_subscriber = ConfigSubscriber("config/birdseye/") if config.birdseye.restream: - self.birdseye_buffer = frame_manager.create( + self.birdseye_buffer = SharedMemoryFrameManager().create( "birdseye", self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1], ) diff --git a/frigate/output/output.py b/frigate/output/output.py index 5d564b936..bb2d73511 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -88,18 +88,17 @@ def output_frames( ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, - regions, + _, ) = data - frame_id = f"{camera}{frame_time}" - - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) if frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + logger.debug(f"Failed to get frame {frame_name} from SHM") failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 if failed_frame_requests[camera] > config.cameras[camera].detect.fps: @@ -152,7 +151,7 @@ def output_frames( preview_recorders[camera].flag_offline(frame_time) preview_write_times[camera] = frame_time - frame_manager.close(frame_id) + frame_manager.close(frame_name) move_preview_frames("clips") @@ -164,15 +163,15 @@ def output_frames( ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, regions, ) = data - frame_id = f"{camera}{frame_time}" - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - frame_manager.close(frame_id) + frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) + frame_manager.close(frame_name) detection_subscriber.stop() diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index 24b12087d..03bb3840e 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -59,7 +59,13 @@ class PtzMotionEstimator: self.ptz_metrics.reset.set() logger.debug(f"{config.name}: Motion estimator init") - def motion_estimator(self, detections, frame_time, camera): + def motion_estimator( + self, + detections: list[dict[str, any]], + frame_name: str, + frame_time: float, + camera: str, + ): # If we've just started up or returned to our preset, reset motion estimator for new tracking session if self.ptz_metrics.reset.is_set(): self.ptz_metrics.reset.clear() @@ -92,9 +98,8 @@ class PtzMotionEstimator: f"{camera}: Motion estimator running - frame time: {frame_time}" ) - frame_id = f"{camera}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, self.camera_config.frame_shape_yuv + frame_name, self.camera_config.frame_shape_yuv ) if yuv_frame is None: @@ -136,7 +141,7 @@ class PtzMotionEstimator: except Exception: pass - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) return self.coord_transformations diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index e97fb0a44..4f976bbf6 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -514,6 +514,7 @@ class RecordingMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: ( camera, + _, frame_time, current_tracked_objects, motion_boxes, diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 38ed59294..23a42e7a7 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -234,6 +234,7 @@ class ReviewSegmentMaintainer(threading.Thread): def update_existing_segment( self, segment: PendingReviewSegment, + frame_name: str, frame_time: float, objects: list[TrackedObject], ) -> None: @@ -292,36 +293,34 @@ class ReviewSegmentMaintainer(threading.Thread): if should_update: try: - frame_id = f"{camera_config.name}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, camera_config.frame_shape_yuv + frame_name, camera_config.frame_shape_yuv ) if yuv_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + logger.debug(f"Failed to get frame {frame_name} from SHM") return self._publish_segment_update( segment, camera_config, yuv_frame, active_objects, prev_data ) - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) except FileNotFoundError: return if not has_activity: if not segment.has_frame: try: - frame_id = f"{camera_config.name}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, camera_config.frame_shape_yuv + frame_name, camera_config.frame_shape_yuv ) if yuv_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + logger.debug(f"Failed to get frame {frame_name} from SHM") return segment.save_full_frame(camera_config, yuv_frame) - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) self._publish_segment_update( segment, camera_config, None, [], prev_data ) @@ -338,6 +337,7 @@ class ReviewSegmentMaintainer(threading.Thread): def check_if_new_segment( self, camera: str, + frame_name: str, frame_time: float, objects: list[TrackedObject], ) -> None: @@ -414,19 +414,18 @@ class ReviewSegmentMaintainer(threading.Thread): ) try: - frame_id = f"{camera_config.name}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, camera_config.frame_shape_yuv + frame_name, camera_config.frame_shape_yuv ) if yuv_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + logger.debug(f"Failed to get frame {frame_name} from SHM") return self.active_review_segments[camera].update_frame( camera_config, yuv_frame, active_objects ) - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) self._publish_segment_start(self.active_review_segments[camera]) except FileNotFoundError: return @@ -454,16 +453,17 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: ( camera, + frame_name, frame_time, current_tracked_objects, - motion_boxes, - regions, + _, + _, ) = data elif topic == DetectionTypeEnum.audio: ( camera, frame_time, - dBFS, + _, audio_detections, ) = data elif topic == DetectionTypeEnum.api: @@ -488,6 +488,7 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: self.update_existing_segment( current_segment, + frame_name, frame_time, current_tracked_objects, ) @@ -538,6 +539,7 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: self.check_if_new_segment( camera, + frame_name, frame_time, current_tracked_objects, ) diff --git a/frigate/track/__init__.py b/frigate/track/__init__.py index 3d9e45da2..4fe51f476 100644 --- a/frigate/track/__init__.py +++ b/frigate/track/__init__.py @@ -9,5 +9,7 @@ class ObjectTracker(ABC): pass @abstractmethod - def match_and_update(self, frame_time: float, detections) -> None: + def match_and_update( + self, frame_name: str, frame_time: float, detections: list[dict[str, any]] + ) -> None: pass diff --git a/frigate/track/centroid_tracker.py b/frigate/track/centroid_tracker.py index 36d780cdf..25d4cb860 100644 --- a/frigate/track/centroid_tracker.py +++ b/frigate/track/centroid_tracker.py @@ -129,7 +129,7 @@ class CentroidTracker(ObjectTracker): self.tracked_objects[id].update(new_obj) - def update_frame_times(self, frame_time): + def update_frame_times(self, frame_name, frame_time): for id in list(self.tracked_objects.keys()): self.tracked_objects[id]["frame_time"] = frame_time self.tracked_objects[id]["motionless_count"] += 1 diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index 99085be4d..67950bd0c 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -268,7 +268,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id].update(obj) - def update_frame_times(self, frame_time): + def update_frame_times(self, frame_name: str, frame_time: float): # if the object was there in the last frame, assume it's still there detections = [ ( @@ -282,9 +282,11 @@ class NorfairTracker(ObjectTracker): for id, obj in self.tracked_objects.items() if self.disappeared[id] == 0 ] - self.match_and_update(frame_time, detections=detections) + self.match_and_update(frame_name, frame_time, detections=detections) - def match_and_update(self, frame_time, detections): + def match_and_update( + self, frame_name: str, frame_time: float, detections: list[dict[str, any]] + ): norfair_detections = [] for obj in detections: @@ -322,7 +324,7 @@ class NorfairTracker(ObjectTracker): ) coord_transformations = self.ptz_motion_estimator.motion_estimator( - detections, frame_time, self.camera_name + detections, frame_name, frame_time, self.camera_name ) tracked_objects = self.tracker.update( diff --git a/frigate/util/image.py b/frigate/util/image.py index cf1332752..4e3161192 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -717,19 +717,27 @@ def clipped(obj, frame_shape): class FrameManager(ABC): @abstractmethod - def create(self, name, size) -> AnyStr: + def create(self, name: str, size: int) -> AnyStr: pass @abstractmethod - def get(self, name, timeout_ms=0): + def write(self, name: str) -> memoryview: pass @abstractmethod - def close(self, name): + def get(self, name: str, timeout_ms: int = 0): pass @abstractmethod - def delete(self, name): + def close(self, name: str): + pass + + @abstractmethod + def delete(self, name: str): + pass + + @abstractmethod + def cleanup(self): pass @@ -790,6 +798,18 @@ class SharedMemoryFrameManager(FrameManager): self.shm_store[name] = shm return shm.buf + def write(self, name: str) -> memoryview: + try: + if name in self.shm_store: + shm = self.shm_store[name] + else: + shm = UntrackedSharedMemory(name=name) + self.shm_store[name] = shm + return shm.buf + except FileNotFoundError: + logger.info(f"the file {name} not found") + return None + def get(self, name: str, shape) -> Optional[np.ndarray]: try: if name in self.shm_store: @@ -824,6 +844,15 @@ class SharedMemoryFrameManager(FrameManager): except FileNotFoundError: pass + def cleanup(self) -> None: + for shm in self.shm_store.values(): + shm.close() + + try: + shm.unlink() + except FileNotFoundError: + pass + def create_mask(frame_shape, mask): mask_img = np.zeros(frame_shape, np.uint8) diff --git a/frigate/video.py b/frigate/video.py index c0341446a..4e7fe660d 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -94,7 +94,6 @@ def capture_frames( ffmpeg_process, config: CameraConfig, shm_frame_count: int, - shm_frames: list[str], frame_shape, frame_manager: FrameManager, frame_queue, @@ -109,25 +108,21 @@ def capture_frames( skipped_eps = EventsPerSecond() skipped_eps.start() + # pre-create shms + for i in range(shm_frame_count): + frame_manager.create(f"{config.name}{i}", frame_size) + + frame_index = 0 + while True: fps.value = frame_rate.eps() skipped_fps.value = skipped_eps.eps() current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{config.name}{current_frame.value}" - frame_buffer = frame_manager.create(frame_name, frame_size) + frame_name = f"{config.name}{frame_index}" + frame_buffer = frame_manager.write(frame_name) try: frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) - - # update frame cache and cleanup existing frames - shm_frames.append(frame_name) - - if len(shm_frames) > shm_frame_count: - expired_frame_name = shm_frames.pop(0) - frame_manager.delete(expired_frame_name) except Exception: - # always delete the frame - frame_manager.delete(frame_name) - # shutdown has been initiated if stop_event.is_set(): break @@ -147,12 +142,16 @@ def capture_frames( # don't lock the queue to check, just try since it should rarely be full try: # add to the queue - frame_queue.put(current_frame.value, False) + frame_queue.put((frame_name, current_frame.value), False) frame_manager.close(frame_name) except queue.Full: # if the queue is full, skip this frame skipped_eps.update() + frame_index = 0 if frame_index == shm_frame_count - 1 else frame_index + 1 + + frame_manager.cleanup() + class CameraWatchdog(threading.Thread): def __init__( @@ -171,7 +170,6 @@ class CameraWatchdog(threading.Thread): self.camera_name = camera_name self.config = config self.shm_frame_count = shm_frame_count - self.shm_frames: list[str] = [] self.capture_thread = None self.ffmpeg_detect_process = None self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") @@ -304,7 +302,6 @@ class CameraWatchdog(threading.Thread): self.capture_thread = CameraCapture( self.config, self.shm_frame_count, - self.shm_frames, self.ffmpeg_detect_process, self.frame_shape, self.frame_queue, @@ -345,7 +342,6 @@ class CameraCapture(threading.Thread): self, config: CameraConfig, shm_frame_count: int, - shm_frames: list[str], ffmpeg_process, frame_shape, frame_queue, @@ -357,7 +353,6 @@ class CameraCapture(threading.Thread): self.name = f"capture:{config.name}" self.config = config self.shm_frame_count = shm_frame_count - self.shm_frames = shm_frames self.frame_shape = frame_shape self.frame_queue = frame_queue self.fps = fps @@ -373,7 +368,6 @@ class CameraCapture(threading.Thread): self.ffmpeg_process, self.config, self.shm_frame_count, - self.shm_frames, self.frame_shape, self.frame_manager, self.frame_queue, @@ -479,8 +473,8 @@ def track_camera( # empty the frame queue logger.info(f"{name}: emptying frame queue") while not frame_queue.empty(): - frame_time = frame_queue.get(False) - frame_manager.delete(f"{name}{frame_time}") + (frame_name, _) = frame_queue.get(False) + frame_manager.delete(frame_name) logger.info(f"{name}: exiting subprocess") @@ -576,9 +570,9 @@ def process_frames( try: if exit_on_empty: - frame_time = frame_queue.get(False) + frame_name, frame_time = frame_queue.get(False) else: - frame_time = frame_queue.get(True, 1) + frame_name, frame_time = frame_queue.get(True, 1) except queue.Empty: if exit_on_empty: logger.info("Exiting track_objects...") @@ -588,9 +582,7 @@ def process_frames( camera_metrics.detection_frame.value = frame_time ptz_metrics.frame_time.value = frame_time - frame = frame_manager.get( - f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1]) - ) + frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1])) if frame is None: logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.") @@ -604,7 +596,7 @@ def process_frames( # if detection is disabled if not detect_config.enabled: - object_tracker.match_and_update(frame_time, []) + object_tracker.match_and_update(frame_name, frame_time, []) else: # get stationary object ids # check every Nth frame for stationary objects @@ -728,10 +720,12 @@ def process_frames( if d[0] not in model_config.all_attributes ] # now that we have refined our detections, we need to track objects - object_tracker.match_and_update(frame_time, tracked_detections) + object_tracker.match_and_update( + frame_name, frame_time, tracked_detections + ) # else, just update the frame times for the stationary objects else: - object_tracker.update_frame_times(frame_time) + object_tracker.update_frame_times(frame_name, frame_time) # group the attribute detections based on what label they apply to attribute_detections: dict[str, list[TrackedObjectAttribute]] = {} @@ -836,7 +830,7 @@ def process_frames( ) # add to the queue if not full if detected_objects_queue.full(): - frame_manager.delete(f"{camera_name}{frame_time}") + frame_manager.close(frame_name) continue else: fps_tracker.update() @@ -844,6 +838,7 @@ def process_frames( detected_objects_queue.put( ( camera_name, + frame_name, frame_time, detections, motion_boxes, @@ -851,7 +846,7 @@ def process_frames( ) ) camera_metrics.detection_fps.value = object_detector.fps.eps() - frame_manager.close(f"{camera_name}{frame_time}") + frame_manager.close(frame_name) motion_detector.stop() requestor.stop()