simplex-chat/apps/ios/SimpleX NSE/ConcurrentQueue.swift
Evgeny Poberezkin d3059afc99
ios, core: better notifications processing to avoid contention for database (#3485)
* core: forward notifications about message processing (for iOS notifications)

* simplexmq

* the option to keep database key, to allow re-opening the database

* export new init with keepKey and reopen DB api

* stop remote ctrl when suspending chat

* ios: close/re-open db on suspend/activate

* allow activating chat without restoring (for NSE)

* update NSE to suspend/activate (does not work)

* simplexmq

* suspend chat and close database when last notification in the process is processed

* stop reading notifications on message markers

* replace async stream with cancellable concurrent queue

* better synchronization of app and NSE

* remove outside of task

* remove unused var

* whitespace

* more debug logging, handle cancelled read after dequeue

* comments

* more comments
2023-12-09 21:59:40 +00:00

65 lines
1.9 KiB
Swift

//
// ConcurrentQueue.swift
// SimpleX NSE
//
// Created by Evgeny on 08/12/2023.
// Copyright © 2023 SimpleX Chat. All rights reserved.
//
import Foundation
struct DequeueElement<T> {
var elementId: UUID?
var task: Task<T?, Never>
}
class ConcurrentQueue<T> {
private var queue: [T] = []
private var queueLock = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.concurrent-queue.lock.\(UUID())")
private var continuations = [(elementId: UUID, continuation: CheckedContinuation<T?, Never>)]()
func enqueue(_ el: T) {
resumeContinuation(el) { self.queue.append(el) }
}
func frontEnqueue(_ el: T) {
resumeContinuation(el) { self.queue.insert(el, at: 0) }
}
private func resumeContinuation(_ el: T, add: @escaping () -> Void) {
queueLock.sync {
if let (_, cont) = continuations.first {
continuations.remove(at: 0)
cont.resume(returning: el)
} else {
add()
}
}
}
func dequeue() -> DequeueElement<T> {
queueLock.sync {
if queue.isEmpty {
let elementId = UUID()
let task = Task {
await withCheckedContinuation { cont in
continuations.append((elementId, cont))
}
}
return DequeueElement(elementId: elementId, task: task)
} else {
let el = queue.remove(at: 0)
return DequeueElement(task: Task { el })
}
}
}
func cancelDequeue(_ elementId: UUID) {
queueLock.sync {
let cancelled = continuations.filter { $0.elementId == elementId }
continuations.removeAll { $0.elementId == elementId }
cancelled.forEach { $0.continuation.resume(returning: nil) }
}
}
}