* core: forward notifications about message processing (for iOS notifications) * simplexmq * the option to keep database key, to allow re-opening the database * export new init with keepKey and reopen DB api * stop remote ctrl when suspending chat * ios: close/re-open db on suspend/activate * allow activating chat without restoring (for NSE) * update NSE to suspend/activate (does not work) * simplexmq * suspend chat and close database when last notification in the process is processed * stop reading notifications on message markers * replace async stream with cancellable concurrent queue * better synchronization of app and NSE * remove outside of task * remove unused var * whitespace * more debug logging, handle cancelled read after dequeue * comments * more comments
65 lines
1.9 KiB
Swift
65 lines
1.9 KiB
Swift
//
|
|
// ConcurrentQueue.swift
|
|
// SimpleX NSE
|
|
//
|
|
// Created by Evgeny on 08/12/2023.
|
|
// Copyright © 2023 SimpleX Chat. All rights reserved.
|
|
//
|
|
|
|
import Foundation
|
|
|
|
struct DequeueElement<T> {
|
|
var elementId: UUID?
|
|
var task: Task<T?, Never>
|
|
}
|
|
|
|
class ConcurrentQueue<T> {
|
|
private var queue: [T] = []
|
|
private var queueLock = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.concurrent-queue.lock.\(UUID())")
|
|
private var continuations = [(elementId: UUID, continuation: CheckedContinuation<T?, Never>)]()
|
|
|
|
func enqueue(_ el: T) {
|
|
resumeContinuation(el) { self.queue.append(el) }
|
|
}
|
|
|
|
func frontEnqueue(_ el: T) {
|
|
resumeContinuation(el) { self.queue.insert(el, at: 0) }
|
|
}
|
|
|
|
private func resumeContinuation(_ el: T, add: @escaping () -> Void) {
|
|
queueLock.sync {
|
|
if let (_, cont) = continuations.first {
|
|
continuations.remove(at: 0)
|
|
cont.resume(returning: el)
|
|
} else {
|
|
add()
|
|
}
|
|
}
|
|
}
|
|
|
|
func dequeue() -> DequeueElement<T> {
|
|
queueLock.sync {
|
|
if queue.isEmpty {
|
|
let elementId = UUID()
|
|
let task = Task {
|
|
await withCheckedContinuation { cont in
|
|
continuations.append((elementId, cont))
|
|
}
|
|
}
|
|
return DequeueElement(elementId: elementId, task: task)
|
|
} else {
|
|
let el = queue.remove(at: 0)
|
|
return DequeueElement(task: Task { el })
|
|
}
|
|
}
|
|
}
|
|
|
|
func cancelDequeue(_ elementId: UUID) {
|
|
queueLock.sync {
|
|
let cancelled = continuations.filter { $0.elementId == elementId }
|
|
continuations.removeAll { $0.elementId == elementId }
|
|
cancelled.forEach { $0.continuation.resume(returning: nil) }
|
|
}
|
|
}
|
|
}
|