From 05a64c99a217e3e86b5f98a21e98402591697c9f Mon Sep 17 00:00:00 2001 From: Stanislav Dmitrenko <7953703+avently@users.noreply.github.com> Date: Wed, 29 Nov 2023 01:36:05 +0800 Subject: [PATCH] ios: moving webrtc commands processing to another mechanism (#3480) * ios: moving webrtc commands processing to another mechanism * async * decide * handle errors * error alert * await --------- Co-authored-by: Avently Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> --- apps/ios/Shared/Model/ChatModel.swift | 2 +- apps/ios/Shared/Model/SimpleXAPI.swift | 26 ++--- .../Shared/Views/Call/ActiveCallView.swift | 23 +---- apps/ios/Shared/Views/Call/CallManager.swift | 22 +++-- apps/ios/Shared/Views/Call/WebRTC.swift | 44 +++++++++ apps/ios/Shared/Views/Call/WebRTCClient.swift | 95 ++++++++++--------- 6 files changed, 124 insertions(+), 88 deletions(-) diff --git a/apps/ios/Shared/Model/ChatModel.swift b/apps/ios/Shared/Model/ChatModel.swift index 8d398eb89..13fe0737e 100644 --- a/apps/ios/Shared/Model/ChatModel.swift +++ b/apps/ios/Shared/Model/ChatModel.swift @@ -83,7 +83,7 @@ final class ChatModel: ObservableObject { // current WebRTC call @Published var callInvitations: Dictionary = [:] @Published var activeCall: Call? - @Published var callCommand: WCallCommand? + let callCommand: WebRTCCommandProcessor = WebRTCCommandProcessor() @Published var showCallView = false // remote desktop @Published var remoteCtrlSession: RemoteCtrlSession? diff --git a/apps/ios/Shared/Model/SimpleXAPI.swift b/apps/ios/Shared/Model/SimpleXAPI.swift index e010de3e8..bb01b631b 100644 --- a/apps/ios/Shared/Model/SimpleXAPI.swift +++ b/apps/ios/Shared/Model/SimpleXAPI.swift @@ -1666,36 +1666,40 @@ func processReceivedMsg(_ res: ChatResponse) async { activateCall(invitation) case let .callOffer(_, contact, callType, offer, sharedKey, _): await withCall(contact) { call in - call.callState = .offerReceived - call.peerMedia = callType.media - call.sharedKey = sharedKey + await MainActor.run { + call.callState = .offerReceived + call.peerMedia = callType.media + call.sharedKey = sharedKey + } let useRelay = UserDefaults.standard.bool(forKey: DEFAULT_WEBRTC_POLICY_RELAY) let iceServers = getIceServers() logger.debug(".callOffer useRelay \(useRelay)") logger.debug(".callOffer iceServers \(String(describing: iceServers))") - m.callCommand = .offer( + await m.callCommand.processCommand(.offer( offer: offer.rtcSession, iceCandidates: offer.rtcIceCandidates, media: callType.media, aesKey: sharedKey, iceServers: iceServers, relay: useRelay - ) + )) } case let .callAnswer(_, contact, answer): await withCall(contact) { call in - call.callState = .answerReceived - m.callCommand = .answer(answer: answer.rtcSession, iceCandidates: answer.rtcIceCandidates) + await MainActor.run { + call.callState = .answerReceived + } + await m.callCommand.processCommand(.answer(answer: answer.rtcSession, iceCandidates: answer.rtcIceCandidates)) } case let .callExtraInfo(_, contact, extraInfo): await withCall(contact) { _ in - m.callCommand = .ice(iceCandidates: extraInfo.rtcIceCandidates) + await m.callCommand.processCommand(.ice(iceCandidates: extraInfo.rtcIceCandidates)) } case let .callEnded(_, contact): if let invitation = await MainActor.run(body: { m.callInvitations.removeValue(forKey: contact.id) }) { CallController.shared.reportCallRemoteEnded(invitation: invitation) } await withCall(contact) { call in - m.callCommand = .end + await m.callCommand.processCommand(.end) CallController.shared.reportCallRemoteEnded(call: call) } case .chatSuspended: @@ -1753,9 +1757,9 @@ func processReceivedMsg(_ res: ChatResponse) async { logger.debug("unsupported event: \(res.responseType)") } - func withCall(_ contact: Contact, _ perform: (Call) -> Void) async { + func withCall(_ contact: Contact, _ perform: (Call) async -> Void) async { if let call = m.activeCall, call.contact.apiId == contact.apiId { - await MainActor.run { perform(call) } + await perform(call) } else { logger.debug("processReceivedMsg: ignoring \(res.responseType), not in call with the contact \(contact.id)") } diff --git a/apps/ios/Shared/Views/Call/ActiveCallView.swift b/apps/ios/Shared/Views/Call/ActiveCallView.swift index 650e9450a..e613476a1 100644 --- a/apps/ios/Shared/Views/Call/ActiveCallView.swift +++ b/apps/ios/Shared/Views/Call/ActiveCallView.swift @@ -49,15 +49,10 @@ struct ActiveCallView: View { } .onDisappear { logger.debug("ActiveCallView: disappear") + Task { await m.callCommand.setClient(nil) } AppDelegate.keepScreenOn(false) client?.endCall() } - .onChange(of: m.callCommand) { cmd in - if let cmd = cmd { - m.callCommand = nil - sendCommandToClient(cmd) - } - } .background(.black) .preferredColorScheme(.dark) } @@ -65,20 +60,8 @@ struct ActiveCallView: View { private func createWebRTCClient() { if client == nil && canConnectCall { client = WebRTCClient($activeCall, { msg in await MainActor.run { processRtcMessage(msg: msg) } }, $localRendererAspectRatio) - if let cmd = m.callCommand { - m.callCommand = nil - sendCommandToClient(cmd) - } - } - } - - private func sendCommandToClient(_ cmd: WCallCommand) { - if call == m.activeCall, - m.activeCall != nil, - let client = client { - logger.debug("sendCallCommand: \(cmd.cmdType)") Task { - await client.sendCallCommand(command: cmd) + await m.callCommand.setClient(client) } } } @@ -174,8 +157,10 @@ struct ActiveCallView: View { } case let .error(message): logger.debug("ActiveCallView: command error: \(message)") + AlertManager.shared.showAlert(Alert(title: Text("Error"), message: Text(message))) case let .invalid(type): logger.debug("ActiveCallView: invalid response: \(type)") + AlertManager.shared.showAlert(Alert(title: Text("Invalid response"), message: Text(type))) } } } diff --git a/apps/ios/Shared/Views/Call/CallManager.swift b/apps/ios/Shared/Views/Call/CallManager.swift index 6b71e88cf..194af3ab0 100644 --- a/apps/ios/Shared/Views/Call/CallManager.swift +++ b/apps/ios/Shared/Views/Call/CallManager.swift @@ -22,7 +22,7 @@ class CallManager { let m = ChatModel.shared if let call = m.activeCall, call.callkitUUID == callUUID { m.showCallView = true - m.callCommand = .capabilities(media: call.localMedia) + Task { await m.callCommand.processCommand(.capabilities(media: call.localMedia)) } return true } return false @@ -57,19 +57,21 @@ class CallManager { m.activeCall = call m.showCallView = true - m.callCommand = .start( + Task { + await m.callCommand.processCommand(.start( media: invitation.callType.media, aesKey: invitation.sharedKey, iceServers: iceServers, relay: useRelay - ) + )) + } } } func enableMedia(media: CallMediaType, enable: Bool, callUUID: UUID) -> Bool { if let call = ChatModel.shared.activeCall, call.callkitUUID == callUUID { let m = ChatModel.shared - m.callCommand = .media(media: media, enable: enable) + Task { await m.callCommand.processCommand(.media(media: media, enable: enable)) } return true } return false @@ -94,13 +96,13 @@ class CallManager { completed() } else { logger.debug("CallManager.endCall: ending call...") - // TODO this command won't be executed because activeCall is assigned nil, - // and there is a condition in sendCommandToClient that would prevent its execution. - m.callCommand = .end - m.activeCall = nil - m.showCallView = false - completed() Task { + await m.callCommand.processCommand(.end) + await MainActor.run { + m.activeCall = nil + m.showCallView = false + completed() + } do { try await apiEndCall(call.contact) } catch { diff --git a/apps/ios/Shared/Views/Call/WebRTC.swift b/apps/ios/Shared/Views/Call/WebRTC.swift index a36a239e2..c21ef5019 100644 --- a/apps/ios/Shared/Views/Call/WebRTC.swift +++ b/apps/ios/Shared/Views/Call/WebRTC.swift @@ -335,6 +335,50 @@ extension WCallResponse: Encodable { } } +actor WebRTCCommandProcessor { + private var client: WebRTCClient? = nil + private var commands: [WCallCommand] = [] + private var running: Bool = false + + func setClient(_ client: WebRTCClient?) async { + logger.debug("WebRTC: setClient, commands count \(self.commands.count)") + self.client = client + if client != nil { + await processAllCommands() + } else { + commands.removeAll() + } + } + + func processCommand(_ c: WCallCommand) async { +// logger.debug("WebRTC: process command \(c.cmdType)") + commands.append(c) + if !running && client != nil { + await processAllCommands() + } + } + + func processAllCommands() async { + logger.debug("WebRTC: process all commands, commands count \(self.commands.count), client == nil \(self.client == nil)") + if let client = client { + running = true + while let c = commands.first, shouldRunCommand(client, c) { + commands.remove(at: 0) + await client.sendCallCommand(command: c) + logger.debug("WebRTC: processed cmd \(c.cmdType)") + } + running = false + } + } + + func shouldRunCommand(_ client: WebRTCClient, _ c: WCallCommand) -> Bool { + switch c { + case .capabilities, .start, .offer, .end: true + default: client.activeCall.wrappedValue != nil + } + } +} + struct ConnectionState: Codable, Equatable { var connectionState: String var iceConnectionState: String diff --git a/apps/ios/Shared/Views/Call/WebRTCClient.swift b/apps/ios/Shared/Views/Call/WebRTCClient.swift index b3cad62fa..acb459938 100644 --- a/apps/ios/Shared/Views/Call/WebRTCClient.swift +++ b/apps/ios/Shared/Views/Call/WebRTCClient.swift @@ -50,7 +50,7 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg private let rtcAudioSession = RTCAudioSession.sharedInstance() private let audioQueue = DispatchQueue(label: "audio") private var sendCallResponse: (WVAPIMessage) async -> Void - private var activeCall: Binding + var activeCall: Binding private var localRendererAspectRatio: Binding @available(*, unavailable) @@ -160,19 +160,16 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg let encryption = WebRTCClient.enableEncryption let call = initializeCall(iceServers?.toWebRTCIceServers(), media, encryption ? aesKey : nil, relay) activeCall.wrappedValue = call - call.connection.offer { answer in - Task { - await self.sendCallResponse(.init( - corrId: nil, - resp: .offer( - offer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: answer.type.toSdpType(), sdp: answer.sdp))), - iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates())), - capabilities: CallCapabilities(encryption: encryption) - ), - command: command) - ) - await self.waitForMoreIceCandidates() - } + let (offer, error) = await call.connection.offer() + if let offer = offer { + resp = .offer( + offer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: offer.type.toSdpType(), sdp: offer.sdp))), + iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates())), + capabilities: CallCapabilities(encryption: encryption) + ) + self.waitForMoreIceCandidates() + } else { + resp = .error(message: "offer error: \(error?.localizedDescription ?? "unknown error")") } case let .offer(offer, iceCandidates, media, aesKey, iceServers, relay): if activeCall.wrappedValue != nil { @@ -186,22 +183,16 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg let pc = call.connection if let type = offer.type, let sdp = offer.sdp { if (try? await pc.setRemoteDescription(RTCSessionDescription(type: type.toWebRTCSdpType(), sdp: sdp))) != nil { - pc.answer { answer in + let (answer, error) = await pc.answer() + if let answer = answer { self.addIceCandidates(pc, remoteIceCandidates) -// Task { -// try? await Task.sleep(nanoseconds: 32_000 * 1000000) - Task { - await self.sendCallResponse(.init( - corrId: nil, - resp: .answer( - answer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: answer.type.toSdpType(), sdp: answer.sdp))), - iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates())) - ), - command: command) - ) - await self.waitForMoreIceCandidates() - } -// } + resp = .answer( + answer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: answer.type.toSdpType(), sdp: answer.sdp))), + iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates())) + ) + self.waitForMoreIceCandidates() + } else { + resp = .error(message: "answer error: \(error?.localizedDescription ?? "unknown error")") } } else { resp = .error(message: "accept: remote description is not set") @@ -260,12 +251,14 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg return candidates } - func waitForMoreIceCandidates() async { - await untilIceComplete(timeoutMs: 12000, stepMs: 1500) { - let candidates = await self.activeCall.wrappedValue?.iceCandidates.getAndClear() ?? [] - if candidates.count > 0 { - logger.debug("WebRTCClient: sending more ice candidates: \(candidates.count)") - await self.sendIceCandidates(candidates) + func waitForMoreIceCandidates() { + Task { + await untilIceComplete(timeoutMs: 12000, stepMs: 1500) { + let candidates = await self.activeCall.wrappedValue?.iceCandidates.getAndClear() ?? [] + if candidates.count > 0 { + logger.debug("WebRTCClient: sending more ice candidates: \(candidates.count)") + await self.sendIceCandidates(candidates) + } } } } @@ -442,25 +435,33 @@ extension WebRTC.RTCPeerConnection { optionalConstraints: nil) } - func offer(_ completion: @escaping (_ sdp: RTCSessionDescription) -> Void) { - offer(for: mediaConstraints()) { (sdp, error) in - guard let sdp = sdp else { - return + func offer() async -> (RTCSessionDescription?, Error?) { + await withCheckedContinuation { cont in + offer(for: mediaConstraints()) { (sdp, error) in + self.processSDP(cont, sdp, error) } - self.setLocalDescription(sdp, completionHandler: { (error) in - completion(sdp) - }) } } - func answer(_ completion: @escaping (_ sdp: RTCSessionDescription) -> Void) { - answer(for: mediaConstraints()) { (sdp, error) in - guard let sdp = sdp else { - return + func answer() async -> (RTCSessionDescription?, Error?) { + await withCheckedContinuation { cont in + answer(for: mediaConstraints()) { (sdp, error) in + self.processSDP(cont, sdp, error) } + } + } + + private func processSDP(_ cont: CheckedContinuation<(RTCSessionDescription?, Error?), Never>, _ sdp: RTCSessionDescription?, _ error: Error?) { + if let sdp = sdp { self.setLocalDescription(sdp, completionHandler: { (error) in - completion(sdp) + if let error = error { + cont.resume(returning: (nil, error)) + } else { + cont.resume(returning: (sdp, nil)) + } }) + } else { + cont.resume(returning: (nil, error)) } } }