From 2de1694f265fe9afb79d9f74a2cfe4f5c4431774 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 17 Apr 2023 00:22:43 +0100 Subject: [PATCH] core: increase cuncurrency with separate locks per each chat entity --- cabal.project | 2 +- scripts/nix/sha256map.nix | 2 +- src/Simplex/Chat.hs | 257 ++++++++++++++++++++------------- src/Simplex/Chat/Controller.hs | 15 +- stack.yaml | 2 +- 5 files changed, 174 insertions(+), 104 deletions(-) diff --git a/cabal.project b/cabal.project index 81e856ace..c3b518e59 100644 --- a/cabal.project +++ b/cabal.project @@ -7,7 +7,7 @@ constraints: zip +disable-bzip2 +disable-zstd source-repository-package type: git location: https://github.com/simplex-chat/simplexmq.git - tag: 2b93e0b17d0556988885757e5b7305f6a1db65a7 + tag: bd86d3b075e0178fc775af09e4b071a0555e73c5 source-repository-package type: git diff --git a/scripts/nix/sha256map.nix b/scripts/nix/sha256map.nix index 94935cac4..ead349fcb 100644 --- a/scripts/nix/sha256map.nix +++ b/scripts/nix/sha256map.nix @@ -1,5 +1,5 @@ { - "https://github.com/simplex-chat/simplexmq.git"."2b93e0b17d0556988885757e5b7305f6a1db65a7" = "08dvvd5fgfypdrb0x9pd8f4xm4xwawbrb59k24zn7fdmg636q8ij"; + "https://github.com/simplex-chat/simplexmq.git"."bd86d3b075e0178fc775af09e4b071a0555e73c5" = "0fyag49ljq9h8y452w9pfs4aszwbj9wmm5qsc7i45wbq9wizapv1"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/kazu-yamamoto/http2.git"."b5a1b7200cf5bc7044af34ba325284271f6dff25" = "0dqb50j57an64nf4qcf5vcz4xkd1vzvghvf8bk529c1k30r9nfzb"; "https://github.com/simplex-chat/direct-sqlcipher.git"."34309410eb2069b029b8fc1872deb1e0db123294" = "0kwkmhyfsn2lixdlgl15smgr1h5gjk7fky6abzh8rng2h5ymnffd"; diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index db7e6874c..7c777ac1e 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -161,20 +161,53 @@ newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agen outputQ <- newTBQueueIO tbqSize notifyQ <- newTBQueueIO tbqSize chatLock <- newEmptyTMVarIO - sndFiles <- newTVarIO M.empty - rcvFiles <- newTVarIO M.empty + entityChatLocks <- atomically TM.empty + entityLocks <- newTVarIO 0 + sndFiles <- atomically TM.empty + rcvFiles <- atomically TM.empty currentCalls <- atomically TM.empty filesFolder <- newTVarIO optFilesFolder incognitoMode <- newTVarIO False chatStoreChanged <- newTVarIO False - expireCIThreads <- newTVarIO M.empty - expireCIFlags <- newTVarIO M.empty + expireCIThreads <- atomically TM.empty + expireCIFlags <- atomically TM.empty cleanupManagerAsync <- newTVarIO Nothing timedItemThreads <- atomically TM.empty showLiveItems <- newTVarIO False userXFTPFileConfig <- newTVarIO $ xftpFileConfig cfg tempDirectory <- newTVarIO tempDir - pure ChatController {activeTo, firstTime, currentUser, smpAgent, agentAsync, chatStore, chatStoreChanged, idsDrg, inputQ, outputQ, notifyQ, chatLock, sndFiles, rcvFiles, currentCalls, config, sendNotification, incognitoMode, filesFolder, expireCIThreads, expireCIFlags, cleanupManagerAsync, timedItemThreads, showLiveItems, userXFTPFileConfig, tempDirectory, logFilePath = logFile} + pure + ChatController + { activeTo, + firstTime, + currentUser, + smpAgent, + agentAsync, + chatStore, + chatStoreChanged, + idsDrg, + inputQ, + outputQ, + notifyQ, + chatLock, + entityChatLocks, + entityLocks, + sndFiles, + rcvFiles, + currentCalls, + config, + sendNotification, + incognitoMode, + filesFolder, + expireCIThreads, + expireCIFlags, + cleanupManagerAsync, + timedItemThreads, + showLiveItems, + userXFTPFileConfig, + tempDirectory, + logFilePath = logFile + } where configServers :: DefaultAgentServers configServers = @@ -378,7 +411,7 @@ processChatCommand = \case user' <- privateGetUser userId' validateUserPassword user user' viewPwd_ checkDeleteChatUser user' - withChatLock "deleteUser" . procCmd $ deleteChatUser user' delSMPQueues + withFullChatLock "deleteUser" . procCmd $ deleteChatUser user' delSMPQueues DeleteUser uName delSMPQueues viewPwd_ -> withUserName uName $ \userId -> APIDeleteUser userId delSMPQueues viewPwd_ StartChat subConns enableExpireCIs -> withUser' $ \_ -> asks agentAsync >>= readTVarIO >>= \case @@ -435,8 +468,8 @@ processChatCommand = \case CTContactRequest -> pure $ chatCmdError (Just user) "not implemented" CTContactConnection -> pure $ chatCmdError (Just user) "not supported" APIGetChatItems _pagination -> pure $ chatCmdError Nothing "not implemented" - APISendMessage (ChatRef cType chatId) live (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user@User {userId} -> withChatLock "sendMessage" $ case cType of - CTDirect -> do + APISendMessage (ChatRef cType chatId) live (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user@User {userId} -> case cType of + CTDirect -> withContactLock "sendMessage" chatId $ do ct@Contact {contactId, localDisplayName = c, contactUsed} <- withStore $ \db -> getContact db user chatId assertDirectAllowed user MDSnd ct XMsgNew_ unless contactUsed $ withStore' $ \db -> updateContactUsed db user ct @@ -497,7 +530,7 @@ processChatCommand = \case quoteData ChatItem {content = CISndMsgContent qmc} = pure (qmc, CIQDirectSnd, True) quoteData ChatItem {content = CIRcvMsgContent qmc} = pure (qmc, CIQDirectRcv, False) quoteData _ = throwChatError CEInvalidQuote - CTGroup -> do + CTGroup -> withGroupLock "sendMessage" chatId $ do g@(Group gInfo@GroupInfo {groupId, membership, localDisplayName = gName} ms) <- withStore $ \db -> getGroup db user chatId assertUserGroupRole gInfo GRAuthor if isVoice mc && not (groupFeatureAllowed SGFVoice gInfo) @@ -608,8 +641,8 @@ processChatCommand = \case unzipMaybe3 :: Maybe (a, b, c) -> (Maybe a, Maybe b, Maybe c) unzipMaybe3 (Just (a, b, c)) = (Just a, Just b, Just c) unzipMaybe3 _ = (Nothing, Nothing, Nothing) - APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> withChatLock "updateChatItem" $ case cType of - CTDirect -> do + APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> case cType of + CTDirect -> withContactLock "updateChatItem" chatId $ do (ct@Contact {contactId, localDisplayName = c}, cci) <- withStore $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId assertDirectAllowed user MDSnd ct XMsgUpdate_ case cci of @@ -623,7 +656,7 @@ processChatCommand = \case pure $ CRChatItemUpdated user (AChatItem SCTDirect SMDSnd (DirectChat ct) ci') _ -> throwChatError CEInvalidChatItemUpdate CChatItem SMDRcv _ -> throwChatError CEInvalidChatItemUpdate - CTGroup -> do + CTGroup -> withGroupLock "updateChatItem" chatId $ do Group gInfo@GroupInfo {groupId, localDisplayName = gName} ms <- withStore $ \db -> getGroup db user chatId assertUserGroupRole gInfo GRAuthor cci <- withStore $ \db -> getGroupChatItem db user chatId itemId @@ -640,8 +673,8 @@ processChatCommand = \case CChatItem SMDRcv _ -> throwChatError CEInvalidChatItemUpdate CTContactRequest -> pure $ chatCmdError (Just user) "not supported" CTContactConnection -> pure $ chatCmdError (Just user) "not supported" - APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> withChatLock "deleteChatItem" $ case cType of - CTDirect -> do + APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> case cType of + CTDirect -> withContactLock "deleteChatItem" chatId $ do (ct@Contact {localDisplayName = c}, ci@(CChatItem msgDir ChatItem {meta = CIMeta {itemSharedMsgId}})) <- withStore $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId case (mode, msgDir, itemSharedMsgId) of (CIDMInternal, _, _) -> deleteDirectCI user ct ci True False @@ -653,7 +686,7 @@ processChatCommand = \case then deleteDirectCI user ct ci True False else markDirectCIDeleted user ct ci msgId True (CIDMBroadcast, _, _) -> throwChatError CEInvalidChatItemDelete - CTGroup -> do + CTGroup -> withGroupLock "deleteChatItem" chatId $ do Group gInfo ms <- withStore $ \db -> getGroup db user chatId ci@(CChatItem msgDir ChatItem {meta = CIMeta {itemSharedMsgId}}) <- withStore $ \db -> getGroupChatItem db user chatId itemId case (mode, msgDir, itemSharedMsgId) of @@ -665,7 +698,7 @@ processChatCommand = \case (CIDMBroadcast, _, _) -> throwChatError CEInvalidChatItemDelete CTContactRequest -> pure $ chatCmdError (Just user) "not supported" CTContactConnection -> pure $ chatCmdError (Just user) "not supported" - APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withChatLock "deleteChatItem" $ do + APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withGroupLock "deleteChatItem" gId $ do Group gInfo@GroupInfo {membership} ms <- withStore $ \db -> getGroup db user gId ci@(CChatItem _ ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}}) <- withStore $ \db -> getGroupChatItem db user gId itemId case (chatDir, itemSharedMsgId) of @@ -715,7 +748,7 @@ processChatCommand = \case ct@Contact {localDisplayName} <- withStore $ \db -> getContact db user chatId filesInfo <- withStore' $ \db -> getContactFileInfo db user ct contactConnIds <- map aConnId <$> withStore (\db -> getContactConnections db userId ct) - withChatLock "deleteChat direct" . procCmd $ do + withContactLock "deleteChat direct" chatId . procCmd $ do fileAgentConnIds <- concat <$> forM filesInfo (deleteFile user) deleteAgentConnectionsAsync user $ fileAgentConnIds <> contactConnIds -- functions below are called in separate transactions to prevent crashes on android @@ -724,7 +757,7 @@ processChatCommand = \case withStore' $ \db -> deleteContact db user ct unsetActive $ ActiveC localDisplayName pure $ CRContactDeleted user ct - CTContactConnection -> withChatLock "deleteChat contactConnection" . procCmd $ do + CTContactConnection -> withUserContactLock "deleteChat contactConnection" chatId . procCmd $ do conn@PendingContactConnection {pccAgentConnId = AgentConnId acId} <- withStore $ \db -> getPendingContactConnection db userId chatId deleteAgentConnectionAsync user acId withStore' $ \db -> deletePendingContactConnection db userId chatId @@ -735,7 +768,7 @@ processChatCommand = \case canDelete = isOwner || not (memberCurrent membership) unless canDelete $ throwChatError $ CEGroupUserRole gInfo GROwner filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo - withChatLock "deleteChat group" . procCmd $ do + withGroupLock "deleteChat group" chatId . procCmd $ do deleteFilesAndConns user filesInfo when (memberActive membership && isOwner) . void $ sendGroupMessage user gInfo members XGrpDel deleteGroupLinkIfExists user gInfo @@ -782,26 +815,28 @@ processChatCommand = \case pure $ CRChatCleared user (AChatInfo SCTGroup $ GroupChat gInfo) CTContactConnection -> pure $ chatCmdError (Just user) "not supported" CTContactRequest -> pure $ chatCmdError (Just user) "not supported" - APIAcceptContact connReqId -> withUser $ \_ -> withChatLock "acceptContact" $ do - (user, cReq) <- withStore $ \db -> getContactRequest' db connReqId + APIAcceptContact connReqId -> withUser $ \_ -> do + (user, cReq@UserContactRequest {userContactLinkId}) <- withStore $ \db -> getContactRequest' db connReqId -- [incognito] generate profile to send, create connection with incognito profile - incognito <- readTVarIO =<< asks incognitoMode - incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing - ct <- acceptContactRequest user cReq incognitoProfile - pure $ CRAcceptingContactRequest user ct - APIRejectContact connReqId -> withUser $ \user -> withChatLock "rejectContact" $ do - cReq@UserContactRequest {agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <- + withUserContactLock "acceptContact" userContactLinkId $ do + incognito <- readTVarIO =<< asks incognitoMode + incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing + ct <- acceptContactRequest user cReq incognitoProfile + pure $ CRAcceptingContactRequest user ct + APIRejectContact connReqId -> withUser $ \user -> do + cReq@UserContactRequest {userContactLinkId, agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <- withStore $ \db -> getContactRequest db user connReqId `E.finally` liftIO (deleteContactRequest db user connReqId) - withAgent $ \a -> rejectContact a connId invId - pure $ CRContactRequestRejected user cReq + withUserContactLock "rejectContact" userContactLinkId $ do + withAgent $ \a -> rejectContact a connId invId + pure $ CRContactRequestRejected user cReq APISendCallInvitation contactId callType -> withUser $ \user -> do -- party initiating call ct <- withStore $ \db -> getContact db user contactId assertDirectAllowed user MDSnd ct XCallInv_ calls <- asks currentCalls - withChatLock "sendCallInvitation" $ do + withContactLock "sendCallInvitation" contactId $ do callId <- CallId <$> drgRandomBytes 16 dhKeyPair <- if encryptedCall callType then Just <$> liftIO C.generateKeyPair' else pure Nothing let invitation = CallInvitation {callType, callDhPubKey = fst <$> dhKeyPair} @@ -923,12 +958,11 @@ processChatCommand = \case toServerCfg server = ServerCfg {server, preset = True, tested = Nothing, enabled = True} GetUserProtoServers aProtocol -> withUser $ \User {userId} -> processChatCommand $ APIGetUserProtoServers userId aProtocol - APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $ - withChatLock "setUserSMPServers" $ do - withStore $ \db -> overwriteProtocolServers db user servers - cfg <- asks config - withAgent $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers - ok user + APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $ do + withStore $ \db -> overwriteProtocolServers db user servers + cfg <- asks config + withAgent $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers + ok user SetUserProtoServers serversConfig -> withUser $ \User {userId} -> processChatCommand $ APISetUserProtoServers userId serversConfig APITestProtoServer userId srv@(AProtoServerWithAuth p server) -> withUserId userId $ \user -> @@ -939,7 +973,7 @@ processChatCommand = \case APISetChatItemTTL userId newTTL_ -> withUser' $ \user -> do checkSameUser userId user checkStoreNotChanged $ - withChatLock "setChatItemTTL" $ do + withFullChatLock "setChatItemTTL" $ do case newTTL_ of Nothing -> do withStore' $ \db -> setChatItemTTL db user newTTL_ @@ -1061,7 +1095,7 @@ processChatCommand = \case EnableGroupMember gName mName -> withMemberName gName mName $ \gId mId -> APIEnableGroupMember gId mId ChatHelp section -> pure $ CRChatHelp section Welcome -> withUser $ pure . CRWelcome - APIAddContact userId -> withUserId userId $ \user -> withChatLock "addContact" . procCmd $ do + APIAddContact userId -> withUserId userId $ \user -> withFullChatLock "addContact" . procCmd $ do -- [incognito] generate profile for connection incognito <- readTVarIO =<< asks incognitoMode incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing @@ -1071,7 +1105,7 @@ processChatCommand = \case pure $ CRInvitation user cReq AddContact -> withUser $ \User {userId} -> processChatCommand $ APIAddContact userId - APIConnect userId (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withChatLock "connect" . procCmd $ do + APIConnect userId (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withFullChatLock "connect" . procCmd $ do -- [incognito] generate profile to send incognito <- readTVarIO =<< asks incognitoMode incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing @@ -1093,13 +1127,13 @@ processChatCommand = \case CRContactsList user <$> withStore' (`getUserContacts` user) ListContacts -> withUser $ \User {userId} -> processChatCommand $ APIListContacts userId - APICreateMyAddress userId -> withUserId userId $ \user -> withChatLock "createMyAddress" . procCmd $ do + APICreateMyAddress userId -> withUserId userId $ \user -> withFullChatLock "createMyAddress" . procCmd $ do (connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact Nothing withStore $ \db -> createUserContactLink db user connId cReq pure $ CRUserContactLinkCreated user cReq CreateMyAddress -> withUser $ \User {userId} -> processChatCommand $ APICreateMyAddress userId - APIDeleteMyAddress userId -> withUserId userId $ \user -> withChatLock "deleteMyAddress" $ do + APIDeleteMyAddress userId -> withUserId userId $ \user -> withFullChatLock "deleteMyAddress" $ do conns <- withStore (`getUserAddressConnections` user) procCmd $ do deleteAgentConnectionsAsync user $ map aConnId conns @@ -1126,17 +1160,17 @@ processChatCommand = \case SendLiveMessage chatName msg -> sendTextMessage chatName msg True SendMessageBroadcast msg -> withUser $ \user -> do contacts <- withStore' (`getUserContacts` user) - withChatLock "sendMessageBroadcast" . procCmd $ do + procCmd $ do let mc = MCText msg cts = filter (\ct -> isReady ct && directOrUsed ct) contacts forM_ cts $ \ct -> - void - ( do - (sndMsg, _) <- sendDirectContactMessage ct (XMsgNew $ MCSimple (extMsgContent mc Nothing)) - saveSndChatItem user (CDDirectSnd ct) sndMsg (CISndMsgContent mc) - ) - `catchError` (toView . CRChatError (Just user)) + withContactLock "sendMessageBroadcast" (contactId' ct) $ + void (send user ct mc) `catchError` (toView . CRChatError (Just user)) CRBroadcastSent user mc (length cts) <$> liftIO getZonedTime + where + send user ct mc = do + (sndMsg, _) <- sendDirectContactMessage ct (XMsgNew $ MCSimple (extMsgContent mc Nothing)) + saveSndChatItem user (CDDirectSnd ct) sndMsg (CISndMsgContent mc) SendMessageQuote cName (AMsgDirection msgDir) quotedMsg msg -> withUser $ \user@User {userId} -> do contactId <- withStore $ \db -> getContactIdByName db user cName quotedItemId <- withStore $ \db -> getDirectChatItemIdByText db userId contactId msgDir quotedMsg @@ -1165,7 +1199,7 @@ processChatCommand = \case pure $ CRGroupCreated user groupInfo NewGroup gProfile -> withUser $ \User {userId} -> processChatCommand $ APINewGroup userId gProfile - APIAddMember groupId contactId memRole -> withUser $ \user -> withChatLock "addMember" $ do + APIAddMember groupId contactId memRole -> withUser $ \user -> withGroupLock "addMember" groupId $ do -- TODO for large groups: no need to load all members to determine if contact is a member (group, contact) <- withStore $ \db -> (,) <$> getGroup db user groupId <*> getContact db user contactId assertDirectAllowed user MDSnd contact XGrpInv_ @@ -1195,7 +1229,7 @@ processChatCommand = \case | otherwise -> throwChatError $ CEGroupDuplicateMember cName APIJoinGroup groupId -> withUser $ \user@User {userId} -> do ReceivedGroupInvitation {fromMember, connRequest, groupInfo = g@GroupInfo {membership}} <- withStore $ \db -> getGroupInvitation db user groupId - withChatLock "joinGroup" . procCmd $ do + withGroupLock "joinGroup" groupId . procCmd $ do agentConnId <- withAgent $ \a -> joinConnection a (aUserId user) True connRequest . directMessage $ XGrpAcpt (memberId (membership :: GroupMember)) withStore' $ \db -> do createMemberConnection db userId fromMember agentConnId @@ -1222,7 +1256,7 @@ processChatCommand = \case changeMemberRole user gInfo members m gEvent = do let GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberContactId, localDisplayName = cName} = m assertUserGroupRole gInfo $ maximum [GRAdmin, mRole, memRole] - withChatLock "memberRole" . procCmd $ do + withGroupLock "memberRole" groupId . procCmd $ do unless (mRole == memRole) $ do withStore' $ \db -> updateGroupMemberRole db user m memRole case mStatus of @@ -1241,7 +1275,7 @@ processChatCommand = \case Nothing -> throwChatError CEGroupMemberNotFound Just m@GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberProfile} -> do assertUserGroupRole gInfo $ max GRAdmin mRole - withChatLock "removeMember" . procCmd $ do + withGroupLock "removeMember" groupId . procCmd $ do case mStatus of GSMemInvited -> do deleteMemberConnection user m @@ -1256,7 +1290,7 @@ processChatCommand = \case pure $ CRUserDeletedMember user gInfo m {memberStatus = GSMemRemoved} APILeaveGroup groupId -> withUser $ \user@User {userId} -> do Group gInfo@GroupInfo {membership} members <- withStore $ \db -> getGroup db user groupId - withChatLock "leaveGroup" . procCmd $ do + withGroupLock "leaveGroup" groupId . procCmd $ do msg <- sendGroupMessage user gInfo members XGrpLeave ci <- saveSndChatItem user (CDGroupSnd gInfo) msg (CISndGroupEvent SGEUserLeft) toView $ CRNewChatItem user (AChatItem SCTGroup SMDSnd (GroupChat gInfo) ci) @@ -1299,7 +1333,7 @@ processChatCommand = \case CRGroupProfile user <$> withStore (\db -> getGroupInfoByName db user gName) UpdateGroupDescription gName description -> updateGroupProfileByName gName $ \p -> p {description} - APICreateGroupLink groupId mRole -> withUser $ \user -> withChatLock "createGroupLink" $ do + APICreateGroupLink groupId mRole -> withUser $ \user -> withGroupLock "createGroupLink" groupId $ do gInfo <- withStore $ \db -> getGroupInfo db user groupId assertUserGroupRole gInfo GRAdmin when (mRole > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole @@ -1308,14 +1342,14 @@ processChatCommand = \case (connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact $ Just crClientData withStore $ \db -> createGroupLink db user gInfo connId cReq groupLinkId mRole pure $ CRGroupLinkCreated user gInfo cReq mRole - APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withChatLock "groupLinkMemberRole " $ do + APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withGroupLock "groupLinkMemberRole" groupId $ do gInfo <- withStore $ \db -> getGroupInfo db user groupId (groupLinkId, groupLink, mRole) <- withStore $ \db -> getGroupLink db user gInfo assertUserGroupRole gInfo GRAdmin when (mRole' > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole' when (mRole' /= mRole) $ withStore' $ \db -> setGroupLinkMemberRole db user groupLinkId mRole' pure $ CRGroupLink user gInfo groupLink mRole' - APIDeleteGroupLink groupId -> withUser $ \user -> withChatLock "deleteGroupLink" $ do + APIDeleteGroupLink groupId -> withUser $ \user -> withGroupLock "deleteGroupLink" groupId $ do gInfo <- withStore $ \db -> getGroupInfo db user groupId deleteGroupLink' user gInfo pure $ CRGroupLinkDeleted user gInfo @@ -1380,7 +1414,7 @@ processChatCommand = \case ForwardImage chatName fileId -> forwardFile chatName fileId SendImage SendFileDescription _chatName _f -> pure $ chatCmdError Nothing "TODO" ReceiveFile fileId rcvInline_ filePath_ -> withUser $ \_ -> - withChatLock "receiveFile" . procCmd $ do + withFileLock "receiveFile" fileId . procCmd $ do (user, ft) <- withStore $ \db -> getRcvFileTransferById db fileId (CRRcvFileAccepted user <$> acceptFileReceive user ft rcvInline_ filePath_) `catchError` processError user ft where @@ -1390,7 +1424,7 @@ processChatCommand = \case ChatErrorAgent (CONN DUPLICATE) _ -> pure $ CRRcvFileAcceptedSndCancelled user ft e -> throwError e CancelFile fileId -> withUser $ \user@User {userId} -> - withChatLock "cancelFile" . procCmd $ + withFileLock "cancelFile" fileId . procCmd $ withStore (\db -> getFileTransfer db user fileId) >>= \case FTSnd ftm@FileTransferMeta {cancelled} fts | cancelled -> throwChatError $ CEFileCancel fileId "file already cancelled" @@ -1482,7 +1516,6 @@ processChatCommand = \case map B.unpack [host, clientTs, cmd, res, bshow count] ResetAgentStats -> withAgent resetAgentStats >> ok_ where - withChatLock name action = asks chatLock >>= \l -> withLock l name action -- below code would make command responses asynchronous where they can be slow -- in View.hs `r'` should be defined as `id` in this case -- procCmd :: m ChatResponse -> m ChatResponse @@ -1538,7 +1571,7 @@ processChatCommand = \case CTGroup -> withStore $ \db -> getGroupChatItemIdByText db user cId (Just localDisplayName) msg _ -> throwChatError $ CECommandError "not supported" connectViaContact :: User -> ConnectionRequestUri 'CMContact -> m ChatResponse - connectViaContact user@User {userId} cReq@(CRContactUri ConnReqUriData {crClientData}) = withChatLock "connectViaContact" $ do + connectViaContact user@User {userId} cReq@(CRContactUri ConnReqUriData {crClientData}) = withFullChatLock "connectViaContact" $ do let cReqHash = ConnReqUriHash . C.sha256Hash $ strEncode cReq withStore' (\db -> getConnReqContactXContactId db user cReqHash) >>= \case (Just contact, _) -> pure $ CRContactAlreadyExists user contact @@ -1592,8 +1625,8 @@ processChatCommand = \case <$> withStore' (`getUserContacts` user) user' <- withStore $ \db -> updateUserProfile db user p' asks currentUser >>= atomically . (`writeTVar` Just user') - withChatLock "updateProfile" . procCmd $ do - forM_ contacts $ \ct -> do + procCmd $ do + forM_ contacts $ \ct -> withContactLock "updateProfile" (contactId' ct) $ do processContact user' ct `catchError` (toView . CRChatError (Just user)) pure $ CRUserProfileUpdated user' (fromLocalProfile p) p' where @@ -1614,7 +1647,7 @@ processChatCommand = \case let mergedProfile = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct) mergedProfile' = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct') when (mergedProfile' /= mergedProfile) $ - withChatLock "updateProfile" $ do + withContactLock "updateProfile" (contactId' ct) $ do void (sendDirectContactMessage ct' $ XInfo mergedProfile') `catchError` (toView . CRChatError (Just user)) when (directOrUsed ct') $ createSndFeatureItems user ct ct' pure $ CRContactPrefsUpdated user ct ct' @@ -1656,7 +1689,7 @@ processChatCommand = \case user <- getUserByContactId db ctId (user,) <$> getContact db user ctId calls <- asks currentCalls - withChatLock "currentCall" $ + withContactLock "currentCall" ctId $ atomically (TM.lookup ctId calls) >>= \case Nothing -> throwChatError CENoCurrentCall Just call@Call {contactId} @@ -1759,6 +1792,36 @@ processChatCommand = \case withStore' (`deleteUserRecord` user) ok_ +withFullChatLock :: ChatMonad' m => String -> m a -> m a +withFullChatLock name action = do + l <- asks chatLock + count <- asks entityLocks + withGetLock (waitForEntityLocks count $> l) name action + where + waitForEntityLocks count = readTVar count >>= \n -> when (n > 0) retry + +withEntityLock :: ChatMonad' m => String -> ChatLockEntity -> m a -> m a +withEntityLock name entity action = do + l <- asks chatLock + ls <- asks entityChatLocks + count <- asks entityLocks + E.bracket_ + (atomically $ waitForLock l >> modifyTVar' count (+ 1)) + (atomically $ modifyTVar' count $ \n -> max 0 (n - 1)) + (withLockMap ls entity name action) + +withContactLock :: ChatMonad' m => String -> ContactId -> m a -> m a +withContactLock name = withEntityLock name . CLContact + +withGroupLock :: ChatMonad' m => String -> GroupId -> m a -> m a +withGroupLock name = withEntityLock name . CLGroup + +withUserContactLock :: ChatMonad' m => String -> Int64 -> m a -> m a +withUserContactLock name = withEntityLock name . CLUserContact + +withFileLock :: ChatMonad' m => String -> Int64 -> m a -> m a +withFileLock name = withEntityLock name . CLFile + assertDirectAllowed :: ChatMonad m => User -> MsgDirection -> Contact -> CMEventTag e -> m () assertDirectAllowed user dir ct event = unless (allowedChatEvent || anyDirectOrUsed ct) . unlessM directMessagesAllowed $ @@ -2056,22 +2119,21 @@ deleteGroupLink_ user gInfo conn = do deleteAgentConnectionAsync user $ aConnId conn withStore' $ \db -> deleteGroupLink db user gInfo -agentSubscriber :: forall m. (MonadUnliftIO m, MonadReader ChatController m) => m () +agentSubscriber :: forall m. ChatMonad' m => m () agentSubscriber = do q <- asks $ subQ . smpAgent - l <- asks chatLock - forever $ atomically (readTBQueue q) >>= void . process l + forever $ atomically (readTBQueue q) >>= void . process where - process :: Lock -> (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ()) - process l (corrId, entId, APC e msg) = run $ case e of - SAENone -> processAgentMessageNoConn msg - SAEConn -> processAgentMessage corrId entId msg - SAERcvFile -> processAgentMsgRcvFile corrId entId msg - SAESndFile -> processAgentMsgSndFile corrId entId msg + process :: (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ()) + process (corrId, entId, APC e msg) = run $ case e of + SAENone -> processAgentMessageNoConn + SAEConn -> processAgentMessage corrId entId + SAERcvFile -> processAgentMsgRcvFile corrId entId + SAESndFile -> processAgentMsgSndFile corrId entId where run action = do let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg) - withLock l name $ runExceptT $ action `catchError` (toView . CRChatError Nothing) + runExceptT $ action name msg `catchError` (toView . CRChatError Nothing) str :: StrEncoding a => a -> String str = B.unpack . strEncode @@ -2168,8 +2230,7 @@ subscribeUserConnections agentBatchSubscribe user = do forM_ err_ $ toView . CRSndFileSubError user ft void . forkIO $ do threadDelay 1000000 - l <- asks chatLock - when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withLock l "subscribe sendFileChunk" $ + when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withFullChatLock "subscribe sendFileChunk" $ sendFileChunk user ft rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> m () rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs @@ -2291,18 +2352,18 @@ expireChatItems user@User {userId} ttl sync = do membersToDelete <- withStore' $ \db -> getGroupMembersForExpiration db user gInfo forM_ membersToDelete $ \m -> withStore' $ \db -> deleteGroupMember db user m -processAgentMessage :: forall m. ChatMonad m => ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m () -processAgentMessage _ connId (DEL_RCVQ srv qId err_) = +processAgentMessage :: forall m. ChatMonad m => ACorrId -> ConnId -> String -> ACommand 'Agent 'AEConn -> m () +processAgentMessage _ connId _ (DEL_RCVQ srv qId err_) = toView $ CRAgentRcvQueueDeleted (AgentConnId connId) srv (AgentQueueId qId) err_ -processAgentMessage _ connId DEL_CONN = +processAgentMessage _ connId _ DEL_CONN = toView $ CRAgentConnDeleted (AgentConnId connId) -processAgentMessage corrId connId msg = +processAgentMessage corrId connId name msg = withStore' (`getUserByAConnId` AgentConnId connId) >>= \case - Just user -> processAgentMessageConn user corrId connId msg `catchError` (toView . CRChatError (Just user)) + Just user -> processAgentMessageConn user corrId connId name msg `catchError` (toView . CRChatError (Just user)) _ -> throwChatError $ CENoConnectionUser (AgentConnId connId) -processAgentMessageNoConn :: forall m. ChatMonad m => ACommand 'Agent 'AENone -> m () -processAgentMessageNoConn = \case +processAgentMessageNoConn :: forall m. ChatMonad m => String -> ACommand 'Agent 'AENone -> m () +processAgentMessageNoConn _ = \case CONNECT p h -> hostEvent $ CRHostConnected p h DISCONNECT p h -> hostEvent $ CRHostDisconnected p h DOWN srv conns -> serverEvent srv conns CRContactsDisconnected "disconnected" @@ -2317,8 +2378,8 @@ processAgentMessageNoConn = \case toView $ event srv cs showToast ("server " <> str) (safeDecodeUtf8 $ strEncode host) -processAgentMsgSndFile :: forall m. ChatMonad m => ACorrId -> SndFileId -> ACommand 'Agent 'AESndFile -> m () -processAgentMsgSndFile _corrId aFileId msg = +processAgentMsgSndFile :: forall m. ChatMonad m => ACorrId -> SndFileId -> String -> ACommand 'Agent 'AESndFile -> m () +processAgentMsgSndFile _corrId aFileId lockName msg = withStore' (`getUserByASndFileId` AgentSndFileId aFileId) >>= \case Just user -> process user `catchError` (toView . CRChatError (Just user)) _ -> throwChatError $ CENoSndFileUser $ AgentSndFileId aFileId @@ -2328,7 +2389,7 @@ processAgentMsgSndFile _corrId aFileId msg = (ft@FileTransferMeta {fileId, cancelled}, sfts) <- withStore $ \db -> do fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId getSndFileTransfer db user fileId - case msg of + withFileLock lockName fileId $ case msg of SFPROG sndProgress sndTotal -> unless cancelled $ do let status = CIFSSndTransfer {sndProgress, sndTotal} @@ -2403,8 +2464,8 @@ processAgentMsgSndFile _corrId aFileId msg = then pure msgDeliveryId else sendParts (partNo + 1) partSize rest -processAgentMsgRcvFile :: forall m. ChatMonad m => ACorrId -> RcvFileId -> ACommand 'Agent 'AERcvFile -> m () -processAgentMsgRcvFile _corrId aFileId msg = +processAgentMsgRcvFile :: forall m. ChatMonad m => ACorrId -> RcvFileId -> String -> ACommand 'Agent 'AERcvFile -> m () +processAgentMsgRcvFile _corrId aFileId lockName msg = withStore' (`getUserByARcvFileId` AgentRcvFileId aFileId) >>= \case Just user -> process user `catchError` (toView . CRChatError (Just user)) _ -> throwChatError $ CENoRcvFileUser $ AgentRcvFileId aFileId @@ -2414,7 +2475,7 @@ processAgentMsgRcvFile _corrId aFileId msg = ft@RcvFileTransfer {fileId, cancelled} <- withStore $ \db -> do fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId getRcvFileTransfer db user fileId - case msg of + withFileLock lockName fileId $ case msg of RFPROG rcvProgress rcvTotal -> unless cancelled $ do let status = CIFSRcvTransfer {rcvProgress, rcvTotal} @@ -2443,15 +2504,15 @@ processAgentMsgRcvFile _corrId aFileId msg = agentXFTPDeleteRcvFile user aFileId fileId throwChatError $ CEXFTPRcvFile fileId (AgentRcvFileId aFileId) e -processAgentMessageConn :: forall m. ChatMonad m => User -> ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m () -processAgentMessageConn user _ agentConnId END = +processAgentMessageConn :: forall m. ChatMonad m => User -> ACorrId -> ConnId -> String -> ACommand 'Agent 'AEConn -> m () +processAgentMessageConn user _ agentConnId _ END = withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= \case RcvDirectMsgConnection _ (Just ct@Contact {localDisplayName = c}) -> do toView $ CRContactAnotherClient user ct whenUserNtfs user $ showToast (c <> "> ") "connected to another client" unsetActive $ ActiveC c entity -> toView $ CRSubscriptionEnd user entity -processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do +processAgentMessageConn user@User {userId} corrId agentConnId lockName agentMessage = do entity <- withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= updateConnStatus case entity of RcvDirectMsgConnection conn contact_ -> @@ -2486,7 +2547,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do processDirectMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> Maybe Contact -> m () processDirectMessage agentMsg connEntity conn@Connection {connId, viaUserContactLink, groupLinkId, customUserProfileId} = \case - Nothing -> case agentMsg of + Nothing -> withEntityLock lockName (CLConnection connId) $ case agentMsg of CONF confId _ connInfo -> do -- [incognito] send saved profile incognitoProfile <- forM customUserProfileId $ \profileId -> withStore (\db -> getProfileById db userId profileId) @@ -2514,7 +2575,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure () -- TODO add debugging output _ -> pure () - Just ct@Contact {localDisplayName = c, contactId} -> case agentMsg of + Just ct@Contact {localDisplayName = c, contactId} -> withGroupLock lockName contactId $ case agentMsg of INV (ACR _ cReq) -> -- [async agent commands] XGrpMemIntro continuation on receiving INV withCompletedCommand conn agentMsg $ \_ -> @@ -2648,7 +2709,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do _ -> pure () processGroupMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> GroupInfo -> GroupMember -> m () - processGroupMessage agentMsg connEntity conn@Connection {connId} gInfo@GroupInfo {groupId, localDisplayName = gName, groupProfile, membership, chatSettings} m = case agentMsg of + processGroupMessage agentMsg connEntity conn@Connection {connId} gInfo@GroupInfo {groupId, localDisplayName = gName, groupProfile, membership, chatSettings} m = withGroupLock lockName groupId $ case agentMsg of INV (ACR _ cReq) -> withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} -> case cReq of @@ -2841,7 +2902,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do processSndFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> SndFileTransfer -> m () processSndFileConn agentMsg connEntity conn ft@SndFileTransfer {fileId, fileName, fileStatus} = - case agentMsg of + withFileLock lockName fileId $ case agentMsg of -- SMP CONF for SndFileConnection happens for direct file protocol -- when recipient of the file "joins" connection created by the sender CONF confId _ connInfo -> do @@ -2889,7 +2950,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do processRcvFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> RcvFileTransfer -> m () processRcvFileConn agentMsg connEntity conn ft@RcvFileTransfer {fileId, fileInvitation = FileInvitation {fileName}, grpMemberId} = - case agentMsg of + withFileLock lockName fileId $ case agentMsg of INV (ACR _ cReq) -> withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} -> case cReq of @@ -2976,7 +3037,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do Nothing -> a processUserContactRequest :: ACommand 'Agent e -> ConnectionEntity -> Connection -> UserContact -> m () - processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = case agentMsg of + processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = withFileLock lockName userContactLinkId $ case agentMsg of REQ invId _ connInfo -> do ChatMessage {chatMsgEvent} <- parseChatMessage connInfo case chatMsgEvent of diff --git a/src/Simplex/Chat/Controller.hs b/src/Simplex/Chat/Controller.hs index a437f8a7a..4e17b6a02 100644 --- a/src/Simplex/Chat/Controller.hs +++ b/src/Simplex/Chat/Controller.hs @@ -29,7 +29,6 @@ import qualified Data.ByteString.Char8 as B import Data.Char (ord) import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) -import Data.Map.Strict (Map) import Data.String import Data.Text (Text) import Data.Time (ZonedTime) @@ -155,8 +154,10 @@ data ChatController = ChatController notifyQ :: TBQueue Notification, sendNotification :: Notification -> IO (), chatLock :: Lock, - sndFiles :: TVar (Map Int64 Handle), - rcvFiles :: TVar (Map Int64 Handle), + entityChatLocks :: TMap ChatLockEntity Lock, + entityLocks :: TVar Int, + sndFiles :: TMap Int64 Handle, + rcvFiles :: TMap Int64 Handle, currentCalls :: TMap ContactId Call, config :: ChatConfig, filesFolder :: TVar (Maybe FilePath), -- path to files folder for mobile apps, @@ -171,6 +172,14 @@ data ChatController = ChatController logFilePath :: Maybe FilePath } +data ChatLockEntity + = CLConnection Int64 + | CLContact ContactId + | CLGroup GroupId + | CLUserContact Int64 + | CLFile Int64 + deriving (Eq, Ord) + data HelpSection = HSMain | HSFiles | HSGroups | HSContacts | HSMyAddress | HSMarkdown | HSMessages | HSSettings | HSDatabase deriving (Show, Generic) diff --git a/stack.yaml b/stack.yaml index 3c800500a..7605b15bf 100644 --- a/stack.yaml +++ b/stack.yaml @@ -49,7 +49,7 @@ extra-deps: # - simplexmq-1.0.0@sha256:34b2004728ae396e3ae449cd090ba7410781e2b3cefc59259915f4ca5daa9ea8,8561 # - ../simplexmq - github: simplex-chat/simplexmq - commit: 2b93e0b17d0556988885757e5b7305f6a1db65a7 + commit: bd86d3b075e0178fc775af09e4b071a0555e73c5 - github: kazu-yamamoto/http2 commit: b5a1b7200cf5bc7044af34ba325284271f6dff25 # - ../direct-sqlcipher