Round robin SHM management (#15027)

* Output frame name to frames processor

* Finish implementing round robin

* Formatting
This commit is contained in:
Nicolas Mowen 2024-11-16 16:00:19 -07:00 committed by GitHub
parent f9c1600f0d
commit 45e9030358
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 134 additions and 97 deletions

View File

@ -233,17 +233,18 @@ class CameraState:
def on(self, event_type: str, callback: Callable[[dict], None]): def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback) self.callbacks[event_type].append(callback)
def update(self, frame_time, current_detections, motion_boxes, regions): def update(
# get the new frame self,
frame_id = f"{self.name}{frame_time}" frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get( current_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv frame_name, self.camera_config.frame_shape_yuv
) )
if current_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
tracked_objects = self.tracked_objects.copy() tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys()) current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys()) previous_ids = set(tracked_objects.keys())
@ -477,7 +478,7 @@ class CameraState:
if self.previous_frame_id is not None: if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id) self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_id self.previous_frame_id = frame_name
class TrackedObjectProcessor(threading.Thread): class TrackedObjectProcessor(threading.Thread):
@ -798,6 +799,7 @@ class TrackedObjectProcessor(threading.Thread):
try: try:
( (
camera, camera,
frame_name,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
@ -809,7 +811,7 @@ class TrackedObjectProcessor(threading.Thread):
camera_state = self.camera_states[camera] camera_state = self.camera_states[camera]
camera_state.update( camera_state.update(
frame_time, current_tracked_objects, motion_boxes, regions frame_name, frame_time, current_tracked_objects, motion_boxes, regions
) )
self.update_mqtt_motion(camera, frame_time, motion_boxes) self.update_mqtt_motion(camera, frame_time, motion_boxes)
@ -822,6 +824,7 @@ class TrackedObjectProcessor(threading.Thread):
self.detection_publisher.publish( self.detection_publisher.publish(
( (
camera, camera,
frame_name,
frame_time, frame_time,
tracked_objects, tracked_objects,
motion_boxes, motion_boxes,

View File

@ -268,12 +268,10 @@ class BirdsEyeFrameManager:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
stop_event: mp.Event, stop_event: mp.Event,
): ):
self.config = config self.config = config
self.mode = config.birdseye.mode self.mode = config.birdseye.mode
self.frame_manager = frame_manager
width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height) width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height)
self.frame_shape = (height, width) self.frame_shape = (height, width)
self.yuv_shape = (height * 3 // 2, width) self.yuv_shape = (height * 3 // 2, width)
@ -351,18 +349,13 @@ class BirdsEyeFrameManager:
logger.debug("Clearing the birdseye frame") logger.debug("Clearing the birdseye frame")
self.frame[:] = self.blank_frame self.frame[:] = self.blank_frame
def copy_to_position(self, position, camera=None, frame_time=None): def copy_to_position(self, position, camera=None, frame: np.ndarray = None):
if camera is None: if camera is None:
frame = None frame = None
channel_dims = None channel_dims = None
else: else:
frame_id = f"{camera}{frame_time}"
frame = self.frame_manager.get(
frame_id, self.config.cameras[camera].frame_shape_yuv
)
if frame is None: if frame is None:
logger.debug(f"Unable to copy frame {camera}{frame_time} to birdseye.") logger.debug(f"Unable to copy frame {camera} to birdseye.")
return return
channel_dims = self.cameras[camera]["channel_dims"] channel_dims = self.cameras[camera]["channel_dims"]
@ -375,8 +368,6 @@ class BirdsEyeFrameManager:
channel_dims, channel_dims,
) )
self.frame_manager.close(frame_id)
def camera_active(self, mode, object_box_count, motion_box_count): def camera_active(self, mode, object_box_count, motion_box_count):
if mode == BirdseyeModeEnum.continuous: if mode == BirdseyeModeEnum.continuous:
return True return True
@ -387,7 +378,7 @@ class BirdsEyeFrameManager:
if mode == BirdseyeModeEnum.objects and object_box_count > 0: if mode == BirdseyeModeEnum.objects and object_box_count > 0:
return True return True
def update_frame(self): def update_frame(self, frame: np.ndarray):
"""Update to a new frame for birdseye.""" """Update to a new frame for birdseye."""
# determine how many cameras are tracking objects within the last inactivity_threshold seconds # determine how many cameras are tracking objects within the last inactivity_threshold seconds
@ -524,7 +515,9 @@ class BirdsEyeFrameManager:
for row in self.camera_layout: for row in self.camera_layout:
for position in row: for position in row:
self.copy_to_position( self.copy_to_position(
position[1], position[0], self.cameras[position[0]]["current_frame"] position[1],
position[0],
frame,
) )
return True return True
@ -672,7 +665,14 @@ class BirdsEyeFrameManager:
else: else:
return standard_candidate_layout return standard_candidate_layout
def update(self, camera, object_count, motion_count, frame_time, frame) -> bool: def update(
self,
camera: str,
object_count: int,
motion_count: int,
frame_time: float,
frame: np.ndarray,
) -> bool:
# don't process if birdseye is disabled for this camera # don't process if birdseye is disabled for this camera
camera_config = self.config.cameras[camera].birdseye camera_config = self.config.cameras[camera].birdseye
@ -700,7 +700,7 @@ class BirdsEyeFrameManager:
return False return False
try: try:
updated_frame = self.update_frame() updated_frame = self.update_frame(frame)
except Exception: except Exception:
updated_frame = False updated_frame = False
self.active_cameras = [] self.active_cameras = []
@ -737,12 +737,11 @@ class Birdseye:
self.broadcaster = BroadcastThread( self.broadcaster = BroadcastThread(
"birdseye", self.converter, websocket_server, stop_event "birdseye", self.converter, websocket_server, stop_event
) )
frame_manager = SharedMemoryFrameManager() self.birdseye_manager = BirdsEyeFrameManager(config, stop_event)
self.birdseye_manager = BirdsEyeFrameManager(config, frame_manager, stop_event)
self.config_subscriber = ConfigSubscriber("config/birdseye/") self.config_subscriber = ConfigSubscriber("config/birdseye/")
if config.birdseye.restream: if config.birdseye.restream:
self.birdseye_buffer = frame_manager.create( self.birdseye_buffer = SharedMemoryFrameManager().create(
"birdseye", "birdseye",
self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1], self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1],
) )

View File

@ -88,18 +88,17 @@ def output_frames(
( (
camera, camera,
frame_name,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
regions, _,
) = data ) = data
frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
if frame is None: if frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM") logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
if failed_frame_requests[camera] > config.cameras[camera].detect.fps: if failed_frame_requests[camera] > config.cameras[camera].detect.fps:
@ -152,7 +151,7 @@ def output_frames(
preview_recorders[camera].flag_offline(frame_time) preview_recorders[camera].flag_offline(frame_time)
preview_write_times[camera] = frame_time preview_write_times[camera] = frame_time
frame_manager.close(frame_id) frame_manager.close(frame_name)
move_preview_frames("clips") move_preview_frames("clips")
@ -164,15 +163,15 @@ def output_frames(
( (
camera, camera,
frame_name,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
regions, regions,
) = data ) = data
frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame_manager.close(frame_name)
frame_manager.close(frame_id)
detection_subscriber.stop() detection_subscriber.stop()

View File

@ -59,7 +59,13 @@ class PtzMotionEstimator:
self.ptz_metrics.reset.set() self.ptz_metrics.reset.set()
logger.debug(f"{config.name}: Motion estimator init") logger.debug(f"{config.name}: Motion estimator init")
def motion_estimator(self, detections, frame_time, camera): def motion_estimator(
self,
detections: list[dict[str, any]],
frame_name: str,
frame_time: float,
camera: str,
):
# If we've just started up or returned to our preset, reset motion estimator for new tracking session # If we've just started up or returned to our preset, reset motion estimator for new tracking session
if self.ptz_metrics.reset.is_set(): if self.ptz_metrics.reset.is_set():
self.ptz_metrics.reset.clear() self.ptz_metrics.reset.clear()
@ -92,9 +98,8 @@ class PtzMotionEstimator:
f"{camera}: Motion estimator running - frame time: {frame_time}" f"{camera}: Motion estimator running - frame time: {frame_time}"
) )
frame_id = f"{camera}{frame_time}"
yuv_frame = self.frame_manager.get( yuv_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv frame_name, self.camera_config.frame_shape_yuv
) )
if yuv_frame is None: if yuv_frame is None:
@ -136,7 +141,7 @@ class PtzMotionEstimator:
except Exception: except Exception:
pass pass
self.frame_manager.close(frame_id) self.frame_manager.close(frame_name)
return self.coord_transformations return self.coord_transformations

View File

@ -514,6 +514,7 @@ class RecordingMaintainer(threading.Thread):
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video:
( (
camera, camera,
_,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,

View File

@ -234,6 +234,7 @@ class ReviewSegmentMaintainer(threading.Thread):
def update_existing_segment( def update_existing_segment(
self, self,
segment: PendingReviewSegment, segment: PendingReviewSegment,
frame_name: str,
frame_time: float, frame_time: float,
objects: list[TrackedObject], objects: list[TrackedObject],
) -> None: ) -> None:
@ -292,36 +293,34 @@ class ReviewSegmentMaintainer(threading.Thread):
if should_update: if should_update:
try: try:
frame_id = f"{camera_config.name}{frame_time}"
yuv_frame = self.frame_manager.get( yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv frame_name, camera_config.frame_shape_yuv
) )
if yuv_frame is None: if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM") logger.debug(f"Failed to get frame {frame_name} from SHM")
return return
self._publish_segment_update( self._publish_segment_update(
segment, camera_config, yuv_frame, active_objects, prev_data segment, camera_config, yuv_frame, active_objects, prev_data
) )
self.frame_manager.close(frame_id) self.frame_manager.close(frame_name)
except FileNotFoundError: except FileNotFoundError:
return return
if not has_activity: if not has_activity:
if not segment.has_frame: if not segment.has_frame:
try: try:
frame_id = f"{camera_config.name}{frame_time}"
yuv_frame = self.frame_manager.get( yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv frame_name, camera_config.frame_shape_yuv
) )
if yuv_frame is None: if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM") logger.debug(f"Failed to get frame {frame_name} from SHM")
return return
segment.save_full_frame(camera_config, yuv_frame) segment.save_full_frame(camera_config, yuv_frame)
self.frame_manager.close(frame_id) self.frame_manager.close(frame_name)
self._publish_segment_update( self._publish_segment_update(
segment, camera_config, None, [], prev_data segment, camera_config, None, [], prev_data
) )
@ -338,6 +337,7 @@ class ReviewSegmentMaintainer(threading.Thread):
def check_if_new_segment( def check_if_new_segment(
self, self,
camera: str, camera: str,
frame_name: str,
frame_time: float, frame_time: float,
objects: list[TrackedObject], objects: list[TrackedObject],
) -> None: ) -> None:
@ -414,19 +414,18 @@ class ReviewSegmentMaintainer(threading.Thread):
) )
try: try:
frame_id = f"{camera_config.name}{frame_time}"
yuv_frame = self.frame_manager.get( yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv frame_name, camera_config.frame_shape_yuv
) )
if yuv_frame is None: if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM") logger.debug(f"Failed to get frame {frame_name} from SHM")
return return
self.active_review_segments[camera].update_frame( self.active_review_segments[camera].update_frame(
camera_config, yuv_frame, active_objects camera_config, yuv_frame, active_objects
) )
self.frame_manager.close(frame_id) self.frame_manager.close(frame_name)
self._publish_segment_start(self.active_review_segments[camera]) self._publish_segment_start(self.active_review_segments[camera])
except FileNotFoundError: except FileNotFoundError:
return return
@ -454,16 +453,17 @@ class ReviewSegmentMaintainer(threading.Thread):
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video:
( (
camera, camera,
frame_name,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, _,
regions, _,
) = data ) = data
elif topic == DetectionTypeEnum.audio: elif topic == DetectionTypeEnum.audio:
( (
camera, camera,
frame_time, frame_time,
dBFS, _,
audio_detections, audio_detections,
) = data ) = data
elif topic == DetectionTypeEnum.api: elif topic == DetectionTypeEnum.api:
@ -488,6 +488,7 @@ class ReviewSegmentMaintainer(threading.Thread):
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video:
self.update_existing_segment( self.update_existing_segment(
current_segment, current_segment,
frame_name,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
) )
@ -538,6 +539,7 @@ class ReviewSegmentMaintainer(threading.Thread):
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video:
self.check_if_new_segment( self.check_if_new_segment(
camera, camera,
frame_name,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
) )

View File

@ -9,5 +9,7 @@ class ObjectTracker(ABC):
pass pass
@abstractmethod @abstractmethod
def match_and_update(self, frame_time: float, detections) -> None: def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, any]]
) -> None:
pass pass

View File

@ -129,7 +129,7 @@ class CentroidTracker(ObjectTracker):
self.tracked_objects[id].update(new_obj) self.tracked_objects[id].update(new_obj)
def update_frame_times(self, frame_time): def update_frame_times(self, frame_name, frame_time):
for id in list(self.tracked_objects.keys()): for id in list(self.tracked_objects.keys()):
self.tracked_objects[id]["frame_time"] = frame_time self.tracked_objects[id]["frame_time"] = frame_time
self.tracked_objects[id]["motionless_count"] += 1 self.tracked_objects[id]["motionless_count"] += 1

View File

@ -268,7 +268,7 @@ class NorfairTracker(ObjectTracker):
self.tracked_objects[id].update(obj) self.tracked_objects[id].update(obj)
def update_frame_times(self, frame_time): def update_frame_times(self, frame_name: str, frame_time: float):
# if the object was there in the last frame, assume it's still there # if the object was there in the last frame, assume it's still there
detections = [ detections = [
( (
@ -282,9 +282,11 @@ class NorfairTracker(ObjectTracker):
for id, obj in self.tracked_objects.items() for id, obj in self.tracked_objects.items()
if self.disappeared[id] == 0 if self.disappeared[id] == 0
] ]
self.match_and_update(frame_time, detections=detections) self.match_and_update(frame_name, frame_time, detections=detections)
def match_and_update(self, frame_time, detections): def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, any]]
):
norfair_detections = [] norfair_detections = []
for obj in detections: for obj in detections:
@ -322,7 +324,7 @@ class NorfairTracker(ObjectTracker):
) )
coord_transformations = self.ptz_motion_estimator.motion_estimator( coord_transformations = self.ptz_motion_estimator.motion_estimator(
detections, frame_time, self.camera_name detections, frame_name, frame_time, self.camera_name
) )
tracked_objects = self.tracker.update( tracked_objects = self.tracker.update(

View File

@ -717,19 +717,27 @@ def clipped(obj, frame_shape):
class FrameManager(ABC): class FrameManager(ABC):
@abstractmethod @abstractmethod
def create(self, name, size) -> AnyStr: def create(self, name: str, size: int) -> AnyStr:
pass pass
@abstractmethod @abstractmethod
def get(self, name, timeout_ms=0): def write(self, name: str) -> memoryview:
pass pass
@abstractmethod @abstractmethod
def close(self, name): def get(self, name: str, timeout_ms: int = 0):
pass pass
@abstractmethod @abstractmethod
def delete(self, name): def close(self, name: str):
pass
@abstractmethod
def delete(self, name: str):
pass
@abstractmethod
def cleanup(self):
pass pass
@ -790,6 +798,18 @@ class SharedMemoryFrameManager(FrameManager):
self.shm_store[name] = shm self.shm_store[name] = shm
return shm.buf return shm.buf
def write(self, name: str) -> memoryview:
try:
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = UntrackedSharedMemory(name=name)
self.shm_store[name] = shm
return shm.buf
except FileNotFoundError:
logger.info(f"the file {name} not found")
return None
def get(self, name: str, shape) -> Optional[np.ndarray]: def get(self, name: str, shape) -> Optional[np.ndarray]:
try: try:
if name in self.shm_store: if name in self.shm_store:
@ -824,6 +844,15 @@ class SharedMemoryFrameManager(FrameManager):
except FileNotFoundError: except FileNotFoundError:
pass pass
def cleanup(self) -> None:
for shm in self.shm_store.values():
shm.close()
try:
shm.unlink()
except FileNotFoundError:
pass
def create_mask(frame_shape, mask): def create_mask(frame_shape, mask):
mask_img = np.zeros(frame_shape, np.uint8) mask_img = np.zeros(frame_shape, np.uint8)

View File

@ -94,7 +94,6 @@ def capture_frames(
ffmpeg_process, ffmpeg_process,
config: CameraConfig, config: CameraConfig,
shm_frame_count: int, shm_frame_count: int,
shm_frames: list[str],
frame_shape, frame_shape,
frame_manager: FrameManager, frame_manager: FrameManager,
frame_queue, frame_queue,
@ -109,25 +108,21 @@ def capture_frames(
skipped_eps = EventsPerSecond() skipped_eps = EventsPerSecond()
skipped_eps.start() skipped_eps.start()
# pre-create shms
for i in range(shm_frame_count):
frame_manager.create(f"{config.name}{i}", frame_size)
frame_index = 0
while True: while True:
fps.value = frame_rate.eps() fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps() skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp() current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{config.name}{current_frame.value}" frame_name = f"{config.name}{frame_index}"
frame_buffer = frame_manager.create(frame_name, frame_size) frame_buffer = frame_manager.write(frame_name)
try: try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
# update frame cache and cleanup existing frames
shm_frames.append(frame_name)
if len(shm_frames) > shm_frame_count:
expired_frame_name = shm_frames.pop(0)
frame_manager.delete(expired_frame_name)
except Exception: except Exception:
# always delete the frame
frame_manager.delete(frame_name)
# shutdown has been initiated # shutdown has been initiated
if stop_event.is_set(): if stop_event.is_set():
break break
@ -147,12 +142,16 @@ def capture_frames(
# don't lock the queue to check, just try since it should rarely be full # don't lock the queue to check, just try since it should rarely be full
try: try:
# add to the queue # add to the queue
frame_queue.put(current_frame.value, False) frame_queue.put((frame_name, current_frame.value), False)
frame_manager.close(frame_name) frame_manager.close(frame_name)
except queue.Full: except queue.Full:
# if the queue is full, skip this frame # if the queue is full, skip this frame
skipped_eps.update() skipped_eps.update()
frame_index = 0 if frame_index == shm_frame_count - 1 else frame_index + 1
frame_manager.cleanup()
class CameraWatchdog(threading.Thread): class CameraWatchdog(threading.Thread):
def __init__( def __init__(
@ -171,7 +170,6 @@ class CameraWatchdog(threading.Thread):
self.camera_name = camera_name self.camera_name = camera_name
self.config = config self.config = config
self.shm_frame_count = shm_frame_count self.shm_frame_count = shm_frame_count
self.shm_frames: list[str] = []
self.capture_thread = None self.capture_thread = None
self.ffmpeg_detect_process = None self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
@ -304,7 +302,6 @@ class CameraWatchdog(threading.Thread):
self.capture_thread = CameraCapture( self.capture_thread = CameraCapture(
self.config, self.config,
self.shm_frame_count, self.shm_frame_count,
self.shm_frames,
self.ffmpeg_detect_process, self.ffmpeg_detect_process,
self.frame_shape, self.frame_shape,
self.frame_queue, self.frame_queue,
@ -345,7 +342,6 @@ class CameraCapture(threading.Thread):
self, self,
config: CameraConfig, config: CameraConfig,
shm_frame_count: int, shm_frame_count: int,
shm_frames: list[str],
ffmpeg_process, ffmpeg_process,
frame_shape, frame_shape,
frame_queue, frame_queue,
@ -357,7 +353,6 @@ class CameraCapture(threading.Thread):
self.name = f"capture:{config.name}" self.name = f"capture:{config.name}"
self.config = config self.config = config
self.shm_frame_count = shm_frame_count self.shm_frame_count = shm_frame_count
self.shm_frames = shm_frames
self.frame_shape = frame_shape self.frame_shape = frame_shape
self.frame_queue = frame_queue self.frame_queue = frame_queue
self.fps = fps self.fps = fps
@ -373,7 +368,6 @@ class CameraCapture(threading.Thread):
self.ffmpeg_process, self.ffmpeg_process,
self.config, self.config,
self.shm_frame_count, self.shm_frame_count,
self.shm_frames,
self.frame_shape, self.frame_shape,
self.frame_manager, self.frame_manager,
self.frame_queue, self.frame_queue,
@ -479,8 +473,8 @@ def track_camera(
# empty the frame queue # empty the frame queue
logger.info(f"{name}: emptying frame queue") logger.info(f"{name}: emptying frame queue")
while not frame_queue.empty(): while not frame_queue.empty():
frame_time = frame_queue.get(False) (frame_name, _) = frame_queue.get(False)
frame_manager.delete(f"{name}{frame_time}") frame_manager.delete(frame_name)
logger.info(f"{name}: exiting subprocess") logger.info(f"{name}: exiting subprocess")
@ -576,9 +570,9 @@ def process_frames(
try: try:
if exit_on_empty: if exit_on_empty:
frame_time = frame_queue.get(False) frame_name, frame_time = frame_queue.get(False)
else: else:
frame_time = frame_queue.get(True, 1) frame_name, frame_time = frame_queue.get(True, 1)
except queue.Empty: except queue.Empty:
if exit_on_empty: if exit_on_empty:
logger.info("Exiting track_objects...") logger.info("Exiting track_objects...")
@ -588,9 +582,7 @@ def process_frames(
camera_metrics.detection_frame.value = frame_time camera_metrics.detection_frame.value = frame_time
ptz_metrics.frame_time.value = frame_time ptz_metrics.frame_time.value = frame_time
frame = frame_manager.get( frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1]))
f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1])
)
if frame is None: if frame is None:
logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.") logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.")
@ -604,7 +596,7 @@ def process_frames(
# if detection is disabled # if detection is disabled
if not detect_config.enabled: if not detect_config.enabled:
object_tracker.match_and_update(frame_time, []) object_tracker.match_and_update(frame_name, frame_time, [])
else: else:
# get stationary object ids # get stationary object ids
# check every Nth frame for stationary objects # check every Nth frame for stationary objects
@ -728,10 +720,12 @@ def process_frames(
if d[0] not in model_config.all_attributes if d[0] not in model_config.all_attributes
] ]
# now that we have refined our detections, we need to track objects # now that we have refined our detections, we need to track objects
object_tracker.match_and_update(frame_time, tracked_detections) object_tracker.match_and_update(
frame_name, frame_time, tracked_detections
)
# else, just update the frame times for the stationary objects # else, just update the frame times for the stationary objects
else: else:
object_tracker.update_frame_times(frame_time) object_tracker.update_frame_times(frame_name, frame_time)
# group the attribute detections based on what label they apply to # group the attribute detections based on what label they apply to
attribute_detections: dict[str, list[TrackedObjectAttribute]] = {} attribute_detections: dict[str, list[TrackedObjectAttribute]] = {}
@ -836,7 +830,7 @@ def process_frames(
) )
# add to the queue if not full # add to the queue if not full
if detected_objects_queue.full(): if detected_objects_queue.full():
frame_manager.delete(f"{camera_name}{frame_time}") frame_manager.close(frame_name)
continue continue
else: else:
fps_tracker.update() fps_tracker.update()
@ -844,6 +838,7 @@ def process_frames(
detected_objects_queue.put( detected_objects_queue.put(
( (
camera_name, camera_name,
frame_name,
frame_time, frame_time,
detections, detections,
motion_boxes, motion_boxes,
@ -851,7 +846,7 @@ def process_frames(
) )
) )
camera_metrics.detection_fps.value = object_detector.fps.eps() camera_metrics.detection_fps.value = object_detector.fps.eps()
frame_manager.close(f"{camera_name}{frame_time}") frame_manager.close(frame_name)
motion_detector.stop() motion_detector.stop()
requestor.stop() requestor.stop()