core: increase cuncurrency with separate locks per each chat entity

This commit is contained in:
Evgeny Poberezkin 2023-04-17 00:22:43 +01:00
parent 5b4c183466
commit 2de1694f26
5 changed files with 174 additions and 104 deletions

View File

@ -7,7 +7,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package
type: git
location: https://github.com/simplex-chat/simplexmq.git
tag: 2b93e0b17d0556988885757e5b7305f6a1db65a7
tag: bd86d3b075e0178fc775af09e4b071a0555e73c5
source-repository-package
type: git

View File

@ -1,5 +1,5 @@
{
"https://github.com/simplex-chat/simplexmq.git"."2b93e0b17d0556988885757e5b7305f6a1db65a7" = "08dvvd5fgfypdrb0x9pd8f4xm4xwawbrb59k24zn7fdmg636q8ij";
"https://github.com/simplex-chat/simplexmq.git"."bd86d3b075e0178fc775af09e4b071a0555e73c5" = "0fyag49ljq9h8y452w9pfs4aszwbj9wmm5qsc7i45wbq9wizapv1";
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
"https://github.com/kazu-yamamoto/http2.git"."b5a1b7200cf5bc7044af34ba325284271f6dff25" = "0dqb50j57an64nf4qcf5vcz4xkd1vzvghvf8bk529c1k30r9nfzb";
"https://github.com/simplex-chat/direct-sqlcipher.git"."34309410eb2069b029b8fc1872deb1e0db123294" = "0kwkmhyfsn2lixdlgl15smgr1h5gjk7fky6abzh8rng2h5ymnffd";

View File

@ -161,20 +161,53 @@ newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agen
outputQ <- newTBQueueIO tbqSize
notifyQ <- newTBQueueIO tbqSize
chatLock <- newEmptyTMVarIO
sndFiles <- newTVarIO M.empty
rcvFiles <- newTVarIO M.empty
entityChatLocks <- atomically TM.empty
entityLocks <- newTVarIO 0
sndFiles <- atomically TM.empty
rcvFiles <- atomically TM.empty
currentCalls <- atomically TM.empty
filesFolder <- newTVarIO optFilesFolder
incognitoMode <- newTVarIO False
chatStoreChanged <- newTVarIO False
expireCIThreads <- newTVarIO M.empty
expireCIFlags <- newTVarIO M.empty
expireCIThreads <- atomically TM.empty
expireCIFlags <- atomically TM.empty
cleanupManagerAsync <- newTVarIO Nothing
timedItemThreads <- atomically TM.empty
showLiveItems <- newTVarIO False
userXFTPFileConfig <- newTVarIO $ xftpFileConfig cfg
tempDirectory <- newTVarIO tempDir
pure ChatController {activeTo, firstTime, currentUser, smpAgent, agentAsync, chatStore, chatStoreChanged, idsDrg, inputQ, outputQ, notifyQ, chatLock, sndFiles, rcvFiles, currentCalls, config, sendNotification, incognitoMode, filesFolder, expireCIThreads, expireCIFlags, cleanupManagerAsync, timedItemThreads, showLiveItems, userXFTPFileConfig, tempDirectory, logFilePath = logFile}
pure
ChatController
{ activeTo,
firstTime,
currentUser,
smpAgent,
agentAsync,
chatStore,
chatStoreChanged,
idsDrg,
inputQ,
outputQ,
notifyQ,
chatLock,
entityChatLocks,
entityLocks,
sndFiles,
rcvFiles,
currentCalls,
config,
sendNotification,
incognitoMode,
filesFolder,
expireCIThreads,
expireCIFlags,
cleanupManagerAsync,
timedItemThreads,
showLiveItems,
userXFTPFileConfig,
tempDirectory,
logFilePath = logFile
}
where
configServers :: DefaultAgentServers
configServers =
@ -378,7 +411,7 @@ processChatCommand = \case
user' <- privateGetUser userId'
validateUserPassword user user' viewPwd_
checkDeleteChatUser user'
withChatLock "deleteUser" . procCmd $ deleteChatUser user' delSMPQueues
withFullChatLock "deleteUser" . procCmd $ deleteChatUser user' delSMPQueues
DeleteUser uName delSMPQueues viewPwd_ -> withUserName uName $ \userId -> APIDeleteUser userId delSMPQueues viewPwd_
StartChat subConns enableExpireCIs -> withUser' $ \_ ->
asks agentAsync >>= readTVarIO >>= \case
@ -435,8 +468,8 @@ processChatCommand = \case
CTContactRequest -> pure $ chatCmdError (Just user) "not implemented"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
APIGetChatItems _pagination -> pure $ chatCmdError Nothing "not implemented"
APISendMessage (ChatRef cType chatId) live (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user@User {userId} -> withChatLock "sendMessage" $ case cType of
CTDirect -> do
APISendMessage (ChatRef cType chatId) live (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user@User {userId} -> case cType of
CTDirect -> withContactLock "sendMessage" chatId $ do
ct@Contact {contactId, localDisplayName = c, contactUsed} <- withStore $ \db -> getContact db user chatId
assertDirectAllowed user MDSnd ct XMsgNew_
unless contactUsed $ withStore' $ \db -> updateContactUsed db user ct
@ -497,7 +530,7 @@ processChatCommand = \case
quoteData ChatItem {content = CISndMsgContent qmc} = pure (qmc, CIQDirectSnd, True)
quoteData ChatItem {content = CIRcvMsgContent qmc} = pure (qmc, CIQDirectRcv, False)
quoteData _ = throwChatError CEInvalidQuote
CTGroup -> do
CTGroup -> withGroupLock "sendMessage" chatId $ do
g@(Group gInfo@GroupInfo {groupId, membership, localDisplayName = gName} ms) <- withStore $ \db -> getGroup db user chatId
assertUserGroupRole gInfo GRAuthor
if isVoice mc && not (groupFeatureAllowed SGFVoice gInfo)
@ -608,8 +641,8 @@ processChatCommand = \case
unzipMaybe3 :: Maybe (a, b, c) -> (Maybe a, Maybe b, Maybe c)
unzipMaybe3 (Just (a, b, c)) = (Just a, Just b, Just c)
unzipMaybe3 _ = (Nothing, Nothing, Nothing)
APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> withChatLock "updateChatItem" $ case cType of
CTDirect -> do
APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> case cType of
CTDirect -> withContactLock "updateChatItem" chatId $ do
(ct@Contact {contactId, localDisplayName = c}, cci) <- withStore $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId
assertDirectAllowed user MDSnd ct XMsgUpdate_
case cci of
@ -623,7 +656,7 @@ processChatCommand = \case
pure $ CRChatItemUpdated user (AChatItem SCTDirect SMDSnd (DirectChat ct) ci')
_ -> throwChatError CEInvalidChatItemUpdate
CChatItem SMDRcv _ -> throwChatError CEInvalidChatItemUpdate
CTGroup -> do
CTGroup -> withGroupLock "updateChatItem" chatId $ do
Group gInfo@GroupInfo {groupId, localDisplayName = gName} ms <- withStore $ \db -> getGroup db user chatId
assertUserGroupRole gInfo GRAuthor
cci <- withStore $ \db -> getGroupChatItem db user chatId itemId
@ -640,8 +673,8 @@ processChatCommand = \case
CChatItem SMDRcv _ -> throwChatError CEInvalidChatItemUpdate
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> withChatLock "deleteChatItem" $ case cType of
CTDirect -> do
APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> case cType of
CTDirect -> withContactLock "deleteChatItem" chatId $ do
(ct@Contact {localDisplayName = c}, ci@(CChatItem msgDir ChatItem {meta = CIMeta {itemSharedMsgId}})) <- withStore $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId
case (mode, msgDir, itemSharedMsgId) of
(CIDMInternal, _, _) -> deleteDirectCI user ct ci True False
@ -653,7 +686,7 @@ processChatCommand = \case
then deleteDirectCI user ct ci True False
else markDirectCIDeleted user ct ci msgId True
(CIDMBroadcast, _, _) -> throwChatError CEInvalidChatItemDelete
CTGroup -> do
CTGroup -> withGroupLock "deleteChatItem" chatId $ do
Group gInfo ms <- withStore $ \db -> getGroup db user chatId
ci@(CChatItem msgDir ChatItem {meta = CIMeta {itemSharedMsgId}}) <- withStore $ \db -> getGroupChatItem db user chatId itemId
case (mode, msgDir, itemSharedMsgId) of
@ -665,7 +698,7 @@ processChatCommand = \case
(CIDMBroadcast, _, _) -> throwChatError CEInvalidChatItemDelete
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withChatLock "deleteChatItem" $ do
APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withGroupLock "deleteChatItem" gId $ do
Group gInfo@GroupInfo {membership} ms <- withStore $ \db -> getGroup db user gId
ci@(CChatItem _ ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}}) <- withStore $ \db -> getGroupChatItem db user gId itemId
case (chatDir, itemSharedMsgId) of
@ -715,7 +748,7 @@ processChatCommand = \case
ct@Contact {localDisplayName} <- withStore $ \db -> getContact db user chatId
filesInfo <- withStore' $ \db -> getContactFileInfo db user ct
contactConnIds <- map aConnId <$> withStore (\db -> getContactConnections db userId ct)
withChatLock "deleteChat direct" . procCmd $ do
withContactLock "deleteChat direct" chatId . procCmd $ do
fileAgentConnIds <- concat <$> forM filesInfo (deleteFile user)
deleteAgentConnectionsAsync user $ fileAgentConnIds <> contactConnIds
-- functions below are called in separate transactions to prevent crashes on android
@ -724,7 +757,7 @@ processChatCommand = \case
withStore' $ \db -> deleteContact db user ct
unsetActive $ ActiveC localDisplayName
pure $ CRContactDeleted user ct
CTContactConnection -> withChatLock "deleteChat contactConnection" . procCmd $ do
CTContactConnection -> withUserContactLock "deleteChat contactConnection" chatId . procCmd $ do
conn@PendingContactConnection {pccAgentConnId = AgentConnId acId} <- withStore $ \db -> getPendingContactConnection db userId chatId
deleteAgentConnectionAsync user acId
withStore' $ \db -> deletePendingContactConnection db userId chatId
@ -735,7 +768,7 @@ processChatCommand = \case
canDelete = isOwner || not (memberCurrent membership)
unless canDelete $ throwChatError $ CEGroupUserRole gInfo GROwner
filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo
withChatLock "deleteChat group" . procCmd $ do
withGroupLock "deleteChat group" chatId . procCmd $ do
deleteFilesAndConns user filesInfo
when (memberActive membership && isOwner) . void $ sendGroupMessage user gInfo members XGrpDel
deleteGroupLinkIfExists user gInfo
@ -782,26 +815,28 @@ processChatCommand = \case
pure $ CRChatCleared user (AChatInfo SCTGroup $ GroupChat gInfo)
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
APIAcceptContact connReqId -> withUser $ \_ -> withChatLock "acceptContact" $ do
(user, cReq) <- withStore $ \db -> getContactRequest' db connReqId
APIAcceptContact connReqId -> withUser $ \_ -> do
(user, cReq@UserContactRequest {userContactLinkId}) <- withStore $ \db -> getContactRequest' db connReqId
-- [incognito] generate profile to send, create connection with incognito profile
incognito <- readTVarIO =<< asks incognitoMode
incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing
ct <- acceptContactRequest user cReq incognitoProfile
pure $ CRAcceptingContactRequest user ct
APIRejectContact connReqId -> withUser $ \user -> withChatLock "rejectContact" $ do
cReq@UserContactRequest {agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <-
withUserContactLock "acceptContact" userContactLinkId $ do
incognito <- readTVarIO =<< asks incognitoMode
incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing
ct <- acceptContactRequest user cReq incognitoProfile
pure $ CRAcceptingContactRequest user ct
APIRejectContact connReqId -> withUser $ \user -> do
cReq@UserContactRequest {userContactLinkId, agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <-
withStore $ \db ->
getContactRequest db user connReqId
`E.finally` liftIO (deleteContactRequest db user connReqId)
withAgent $ \a -> rejectContact a connId invId
pure $ CRContactRequestRejected user cReq
withUserContactLock "rejectContact" userContactLinkId $ do
withAgent $ \a -> rejectContact a connId invId
pure $ CRContactRequestRejected user cReq
APISendCallInvitation contactId callType -> withUser $ \user -> do
-- party initiating call
ct <- withStore $ \db -> getContact db user contactId
assertDirectAllowed user MDSnd ct XCallInv_
calls <- asks currentCalls
withChatLock "sendCallInvitation" $ do
withContactLock "sendCallInvitation" contactId $ do
callId <- CallId <$> drgRandomBytes 16
dhKeyPair <- if encryptedCall callType then Just <$> liftIO C.generateKeyPair' else pure Nothing
let invitation = CallInvitation {callType, callDhPubKey = fst <$> dhKeyPair}
@ -923,12 +958,11 @@ processChatCommand = \case
toServerCfg server = ServerCfg {server, preset = True, tested = Nothing, enabled = True}
GetUserProtoServers aProtocol -> withUser $ \User {userId} ->
processChatCommand $ APIGetUserProtoServers userId aProtocol
APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $
withChatLock "setUserSMPServers" $ do
withStore $ \db -> overwriteProtocolServers db user servers
cfg <- asks config
withAgent $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers
ok user
APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $ do
withStore $ \db -> overwriteProtocolServers db user servers
cfg <- asks config
withAgent $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers
ok user
SetUserProtoServers serversConfig -> withUser $ \User {userId} ->
processChatCommand $ APISetUserProtoServers userId serversConfig
APITestProtoServer userId srv@(AProtoServerWithAuth p server) -> withUserId userId $ \user ->
@ -939,7 +973,7 @@ processChatCommand = \case
APISetChatItemTTL userId newTTL_ -> withUser' $ \user -> do
checkSameUser userId user
checkStoreNotChanged $
withChatLock "setChatItemTTL" $ do
withFullChatLock "setChatItemTTL" $ do
case newTTL_ of
Nothing -> do
withStore' $ \db -> setChatItemTTL db user newTTL_
@ -1061,7 +1095,7 @@ processChatCommand = \case
EnableGroupMember gName mName -> withMemberName gName mName $ \gId mId -> APIEnableGroupMember gId mId
ChatHelp section -> pure $ CRChatHelp section
Welcome -> withUser $ pure . CRWelcome
APIAddContact userId -> withUserId userId $ \user -> withChatLock "addContact" . procCmd $ do
APIAddContact userId -> withUserId userId $ \user -> withFullChatLock "addContact" . procCmd $ do
-- [incognito] generate profile for connection
incognito <- readTVarIO =<< asks incognitoMode
incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing
@ -1071,7 +1105,7 @@ processChatCommand = \case
pure $ CRInvitation user cReq
AddContact -> withUser $ \User {userId} ->
processChatCommand $ APIAddContact userId
APIConnect userId (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withChatLock "connect" . procCmd $ do
APIConnect userId (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withFullChatLock "connect" . procCmd $ do
-- [incognito] generate profile to send
incognito <- readTVarIO =<< asks incognitoMode
incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing
@ -1093,13 +1127,13 @@ processChatCommand = \case
CRContactsList user <$> withStore' (`getUserContacts` user)
ListContacts -> withUser $ \User {userId} ->
processChatCommand $ APIListContacts userId
APICreateMyAddress userId -> withUserId userId $ \user -> withChatLock "createMyAddress" . procCmd $ do
APICreateMyAddress userId -> withUserId userId $ \user -> withFullChatLock "createMyAddress" . procCmd $ do
(connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact Nothing
withStore $ \db -> createUserContactLink db user connId cReq
pure $ CRUserContactLinkCreated user cReq
CreateMyAddress -> withUser $ \User {userId} ->
processChatCommand $ APICreateMyAddress userId
APIDeleteMyAddress userId -> withUserId userId $ \user -> withChatLock "deleteMyAddress" $ do
APIDeleteMyAddress userId -> withUserId userId $ \user -> withFullChatLock "deleteMyAddress" $ do
conns <- withStore (`getUserAddressConnections` user)
procCmd $ do
deleteAgentConnectionsAsync user $ map aConnId conns
@ -1126,17 +1160,17 @@ processChatCommand = \case
SendLiveMessage chatName msg -> sendTextMessage chatName msg True
SendMessageBroadcast msg -> withUser $ \user -> do
contacts <- withStore' (`getUserContacts` user)
withChatLock "sendMessageBroadcast" . procCmd $ do
procCmd $ do
let mc = MCText msg
cts = filter (\ct -> isReady ct && directOrUsed ct) contacts
forM_ cts $ \ct ->
void
( do
(sndMsg, _) <- sendDirectContactMessage ct (XMsgNew $ MCSimple (extMsgContent mc Nothing))
saveSndChatItem user (CDDirectSnd ct) sndMsg (CISndMsgContent mc)
)
`catchError` (toView . CRChatError (Just user))
withContactLock "sendMessageBroadcast" (contactId' ct) $
void (send user ct mc) `catchError` (toView . CRChatError (Just user))
CRBroadcastSent user mc (length cts) <$> liftIO getZonedTime
where
send user ct mc = do
(sndMsg, _) <- sendDirectContactMessage ct (XMsgNew $ MCSimple (extMsgContent mc Nothing))
saveSndChatItem user (CDDirectSnd ct) sndMsg (CISndMsgContent mc)
SendMessageQuote cName (AMsgDirection msgDir) quotedMsg msg -> withUser $ \user@User {userId} -> do
contactId <- withStore $ \db -> getContactIdByName db user cName
quotedItemId <- withStore $ \db -> getDirectChatItemIdByText db userId contactId msgDir quotedMsg
@ -1165,7 +1199,7 @@ processChatCommand = \case
pure $ CRGroupCreated user groupInfo
NewGroup gProfile -> withUser $ \User {userId} ->
processChatCommand $ APINewGroup userId gProfile
APIAddMember groupId contactId memRole -> withUser $ \user -> withChatLock "addMember" $ do
APIAddMember groupId contactId memRole -> withUser $ \user -> withGroupLock "addMember" groupId $ do
-- TODO for large groups: no need to load all members to determine if contact is a member
(group, contact) <- withStore $ \db -> (,) <$> getGroup db user groupId <*> getContact db user contactId
assertDirectAllowed user MDSnd contact XGrpInv_
@ -1195,7 +1229,7 @@ processChatCommand = \case
| otherwise -> throwChatError $ CEGroupDuplicateMember cName
APIJoinGroup groupId -> withUser $ \user@User {userId} -> do
ReceivedGroupInvitation {fromMember, connRequest, groupInfo = g@GroupInfo {membership}} <- withStore $ \db -> getGroupInvitation db user groupId
withChatLock "joinGroup" . procCmd $ do
withGroupLock "joinGroup" groupId . procCmd $ do
agentConnId <- withAgent $ \a -> joinConnection a (aUserId user) True connRequest . directMessage $ XGrpAcpt (memberId (membership :: GroupMember))
withStore' $ \db -> do
createMemberConnection db userId fromMember agentConnId
@ -1222,7 +1256,7 @@ processChatCommand = \case
changeMemberRole user gInfo members m gEvent = do
let GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberContactId, localDisplayName = cName} = m
assertUserGroupRole gInfo $ maximum [GRAdmin, mRole, memRole]
withChatLock "memberRole" . procCmd $ do
withGroupLock "memberRole" groupId . procCmd $ do
unless (mRole == memRole) $ do
withStore' $ \db -> updateGroupMemberRole db user m memRole
case mStatus of
@ -1241,7 +1275,7 @@ processChatCommand = \case
Nothing -> throwChatError CEGroupMemberNotFound
Just m@GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberProfile} -> do
assertUserGroupRole gInfo $ max GRAdmin mRole
withChatLock "removeMember" . procCmd $ do
withGroupLock "removeMember" groupId . procCmd $ do
case mStatus of
GSMemInvited -> do
deleteMemberConnection user m
@ -1256,7 +1290,7 @@ processChatCommand = \case
pure $ CRUserDeletedMember user gInfo m {memberStatus = GSMemRemoved}
APILeaveGroup groupId -> withUser $ \user@User {userId} -> do
Group gInfo@GroupInfo {membership} members <- withStore $ \db -> getGroup db user groupId
withChatLock "leaveGroup" . procCmd $ do
withGroupLock "leaveGroup" groupId . procCmd $ do
msg <- sendGroupMessage user gInfo members XGrpLeave
ci <- saveSndChatItem user (CDGroupSnd gInfo) msg (CISndGroupEvent SGEUserLeft)
toView $ CRNewChatItem user (AChatItem SCTGroup SMDSnd (GroupChat gInfo) ci)
@ -1299,7 +1333,7 @@ processChatCommand = \case
CRGroupProfile user <$> withStore (\db -> getGroupInfoByName db user gName)
UpdateGroupDescription gName description ->
updateGroupProfileByName gName $ \p -> p {description}
APICreateGroupLink groupId mRole -> withUser $ \user -> withChatLock "createGroupLink" $ do
APICreateGroupLink groupId mRole -> withUser $ \user -> withGroupLock "createGroupLink" groupId $ do
gInfo <- withStore $ \db -> getGroupInfo db user groupId
assertUserGroupRole gInfo GRAdmin
when (mRole > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole
@ -1308,14 +1342,14 @@ processChatCommand = \case
(connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact $ Just crClientData
withStore $ \db -> createGroupLink db user gInfo connId cReq groupLinkId mRole
pure $ CRGroupLinkCreated user gInfo cReq mRole
APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withChatLock "groupLinkMemberRole " $ do
APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withGroupLock "groupLinkMemberRole" groupId $ do
gInfo <- withStore $ \db -> getGroupInfo db user groupId
(groupLinkId, groupLink, mRole) <- withStore $ \db -> getGroupLink db user gInfo
assertUserGroupRole gInfo GRAdmin
when (mRole' > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole'
when (mRole' /= mRole) $ withStore' $ \db -> setGroupLinkMemberRole db user groupLinkId mRole'
pure $ CRGroupLink user gInfo groupLink mRole'
APIDeleteGroupLink groupId -> withUser $ \user -> withChatLock "deleteGroupLink" $ do
APIDeleteGroupLink groupId -> withUser $ \user -> withGroupLock "deleteGroupLink" groupId $ do
gInfo <- withStore $ \db -> getGroupInfo db user groupId
deleteGroupLink' user gInfo
pure $ CRGroupLinkDeleted user gInfo
@ -1380,7 +1414,7 @@ processChatCommand = \case
ForwardImage chatName fileId -> forwardFile chatName fileId SendImage
SendFileDescription _chatName _f -> pure $ chatCmdError Nothing "TODO"
ReceiveFile fileId rcvInline_ filePath_ -> withUser $ \_ ->
withChatLock "receiveFile" . procCmd $ do
withFileLock "receiveFile" fileId . procCmd $ do
(user, ft) <- withStore $ \db -> getRcvFileTransferById db fileId
(CRRcvFileAccepted user <$> acceptFileReceive user ft rcvInline_ filePath_) `catchError` processError user ft
where
@ -1390,7 +1424,7 @@ processChatCommand = \case
ChatErrorAgent (CONN DUPLICATE) _ -> pure $ CRRcvFileAcceptedSndCancelled user ft
e -> throwError e
CancelFile fileId -> withUser $ \user@User {userId} ->
withChatLock "cancelFile" . procCmd $
withFileLock "cancelFile" fileId . procCmd $
withStore (\db -> getFileTransfer db user fileId) >>= \case
FTSnd ftm@FileTransferMeta {cancelled} fts
| cancelled -> throwChatError $ CEFileCancel fileId "file already cancelled"
@ -1482,7 +1516,6 @@ processChatCommand = \case
map B.unpack [host, clientTs, cmd, res, bshow count]
ResetAgentStats -> withAgent resetAgentStats >> ok_
where
withChatLock name action = asks chatLock >>= \l -> withLock l name action
-- below code would make command responses asynchronous where they can be slow
-- in View.hs `r'` should be defined as `id` in this case
-- procCmd :: m ChatResponse -> m ChatResponse
@ -1538,7 +1571,7 @@ processChatCommand = \case
CTGroup -> withStore $ \db -> getGroupChatItemIdByText db user cId (Just localDisplayName) msg
_ -> throwChatError $ CECommandError "not supported"
connectViaContact :: User -> ConnectionRequestUri 'CMContact -> m ChatResponse
connectViaContact user@User {userId} cReq@(CRContactUri ConnReqUriData {crClientData}) = withChatLock "connectViaContact" $ do
connectViaContact user@User {userId} cReq@(CRContactUri ConnReqUriData {crClientData}) = withFullChatLock "connectViaContact" $ do
let cReqHash = ConnReqUriHash . C.sha256Hash $ strEncode cReq
withStore' (\db -> getConnReqContactXContactId db user cReqHash) >>= \case
(Just contact, _) -> pure $ CRContactAlreadyExists user contact
@ -1592,8 +1625,8 @@ processChatCommand = \case
<$> withStore' (`getUserContacts` user)
user' <- withStore $ \db -> updateUserProfile db user p'
asks currentUser >>= atomically . (`writeTVar` Just user')
withChatLock "updateProfile" . procCmd $ do
forM_ contacts $ \ct -> do
procCmd $ do
forM_ contacts $ \ct -> withContactLock "updateProfile" (contactId' ct) $ do
processContact user' ct `catchError` (toView . CRChatError (Just user))
pure $ CRUserProfileUpdated user' (fromLocalProfile p) p'
where
@ -1614,7 +1647,7 @@ processChatCommand = \case
let mergedProfile = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct)
mergedProfile' = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct')
when (mergedProfile' /= mergedProfile) $
withChatLock "updateProfile" $ do
withContactLock "updateProfile" (contactId' ct) $ do
void (sendDirectContactMessage ct' $ XInfo mergedProfile') `catchError` (toView . CRChatError (Just user))
when (directOrUsed ct') $ createSndFeatureItems user ct ct'
pure $ CRContactPrefsUpdated user ct ct'
@ -1656,7 +1689,7 @@ processChatCommand = \case
user <- getUserByContactId db ctId
(user,) <$> getContact db user ctId
calls <- asks currentCalls
withChatLock "currentCall" $
withContactLock "currentCall" ctId $
atomically (TM.lookup ctId calls) >>= \case
Nothing -> throwChatError CENoCurrentCall
Just call@Call {contactId}
@ -1759,6 +1792,36 @@ processChatCommand = \case
withStore' (`deleteUserRecord` user)
ok_
withFullChatLock :: ChatMonad' m => String -> m a -> m a
withFullChatLock name action = do
l <- asks chatLock
count <- asks entityLocks
withGetLock (waitForEntityLocks count $> l) name action
where
waitForEntityLocks count = readTVar count >>= \n -> when (n > 0) retry
withEntityLock :: ChatMonad' m => String -> ChatLockEntity -> m a -> m a
withEntityLock name entity action = do
l <- asks chatLock
ls <- asks entityChatLocks
count <- asks entityLocks
E.bracket_
(atomically $ waitForLock l >> modifyTVar' count (+ 1))
(atomically $ modifyTVar' count $ \n -> max 0 (n - 1))
(withLockMap ls entity name action)
withContactLock :: ChatMonad' m => String -> ContactId -> m a -> m a
withContactLock name = withEntityLock name . CLContact
withGroupLock :: ChatMonad' m => String -> GroupId -> m a -> m a
withGroupLock name = withEntityLock name . CLGroup
withUserContactLock :: ChatMonad' m => String -> Int64 -> m a -> m a
withUserContactLock name = withEntityLock name . CLUserContact
withFileLock :: ChatMonad' m => String -> Int64 -> m a -> m a
withFileLock name = withEntityLock name . CLFile
assertDirectAllowed :: ChatMonad m => User -> MsgDirection -> Contact -> CMEventTag e -> m ()
assertDirectAllowed user dir ct event =
unless (allowedChatEvent || anyDirectOrUsed ct) . unlessM directMessagesAllowed $
@ -2056,22 +2119,21 @@ deleteGroupLink_ user gInfo conn = do
deleteAgentConnectionAsync user $ aConnId conn
withStore' $ \db -> deleteGroupLink db user gInfo
agentSubscriber :: forall m. (MonadUnliftIO m, MonadReader ChatController m) => m ()
agentSubscriber :: forall m. ChatMonad' m => m ()
agentSubscriber = do
q <- asks $ subQ . smpAgent
l <- asks chatLock
forever $ atomically (readTBQueue q) >>= void . process l
forever $ atomically (readTBQueue q) >>= void . process
where
process :: Lock -> (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ())
process l (corrId, entId, APC e msg) = run $ case e of
SAENone -> processAgentMessageNoConn msg
SAEConn -> processAgentMessage corrId entId msg
SAERcvFile -> processAgentMsgRcvFile corrId entId msg
SAESndFile -> processAgentMsgSndFile corrId entId msg
process :: (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ())
process (corrId, entId, APC e msg) = run $ case e of
SAENone -> processAgentMessageNoConn
SAEConn -> processAgentMessage corrId entId
SAERcvFile -> processAgentMsgRcvFile corrId entId
SAESndFile -> processAgentMsgSndFile corrId entId
where
run action = do
let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg)
withLock l name $ runExceptT $ action `catchError` (toView . CRChatError Nothing)
runExceptT $ action name msg `catchError` (toView . CRChatError Nothing)
str :: StrEncoding a => a -> String
str = B.unpack . strEncode
@ -2168,8 +2230,7 @@ subscribeUserConnections agentBatchSubscribe user = do
forM_ err_ $ toView . CRSndFileSubError user ft
void . forkIO $ do
threadDelay 1000000
l <- asks chatLock
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withLock l "subscribe sendFileChunk" $
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withFullChatLock "subscribe sendFileChunk" $
sendFileChunk user ft
rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> m ()
rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs
@ -2291,18 +2352,18 @@ expireChatItems user@User {userId} ttl sync = do
membersToDelete <- withStore' $ \db -> getGroupMembersForExpiration db user gInfo
forM_ membersToDelete $ \m -> withStore' $ \db -> deleteGroupMember db user m
processAgentMessage :: forall m. ChatMonad m => ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m ()
processAgentMessage _ connId (DEL_RCVQ srv qId err_) =
processAgentMessage :: forall m. ChatMonad m => ACorrId -> ConnId -> String -> ACommand 'Agent 'AEConn -> m ()
processAgentMessage _ connId _ (DEL_RCVQ srv qId err_) =
toView $ CRAgentRcvQueueDeleted (AgentConnId connId) srv (AgentQueueId qId) err_
processAgentMessage _ connId DEL_CONN =
processAgentMessage _ connId _ DEL_CONN =
toView $ CRAgentConnDeleted (AgentConnId connId)
processAgentMessage corrId connId msg =
processAgentMessage corrId connId name msg =
withStore' (`getUserByAConnId` AgentConnId connId) >>= \case
Just user -> processAgentMessageConn user corrId connId msg `catchError` (toView . CRChatError (Just user))
Just user -> processAgentMessageConn user corrId connId name msg `catchError` (toView . CRChatError (Just user))
_ -> throwChatError $ CENoConnectionUser (AgentConnId connId)
processAgentMessageNoConn :: forall m. ChatMonad m => ACommand 'Agent 'AENone -> m ()
processAgentMessageNoConn = \case
processAgentMessageNoConn :: forall m. ChatMonad m => String -> ACommand 'Agent 'AENone -> m ()
processAgentMessageNoConn _ = \case
CONNECT p h -> hostEvent $ CRHostConnected p h
DISCONNECT p h -> hostEvent $ CRHostDisconnected p h
DOWN srv conns -> serverEvent srv conns CRContactsDisconnected "disconnected"
@ -2317,8 +2378,8 @@ processAgentMessageNoConn = \case
toView $ event srv cs
showToast ("server " <> str) (safeDecodeUtf8 $ strEncode host)
processAgentMsgSndFile :: forall m. ChatMonad m => ACorrId -> SndFileId -> ACommand 'Agent 'AESndFile -> m ()
processAgentMsgSndFile _corrId aFileId msg =
processAgentMsgSndFile :: forall m. ChatMonad m => ACorrId -> SndFileId -> String -> ACommand 'Agent 'AESndFile -> m ()
processAgentMsgSndFile _corrId aFileId lockName msg =
withStore' (`getUserByASndFileId` AgentSndFileId aFileId) >>= \case
Just user -> process user `catchError` (toView . CRChatError (Just user))
_ -> throwChatError $ CENoSndFileUser $ AgentSndFileId aFileId
@ -2328,7 +2389,7 @@ processAgentMsgSndFile _corrId aFileId msg =
(ft@FileTransferMeta {fileId, cancelled}, sfts) <- withStore $ \db -> do
fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId
getSndFileTransfer db user fileId
case msg of
withFileLock lockName fileId $ case msg of
SFPROG sndProgress sndTotal ->
unless cancelled $ do
let status = CIFSSndTransfer {sndProgress, sndTotal}
@ -2403,8 +2464,8 @@ processAgentMsgSndFile _corrId aFileId msg =
then pure msgDeliveryId
else sendParts (partNo + 1) partSize rest
processAgentMsgRcvFile :: forall m. ChatMonad m => ACorrId -> RcvFileId -> ACommand 'Agent 'AERcvFile -> m ()
processAgentMsgRcvFile _corrId aFileId msg =
processAgentMsgRcvFile :: forall m. ChatMonad m => ACorrId -> RcvFileId -> String -> ACommand 'Agent 'AERcvFile -> m ()
processAgentMsgRcvFile _corrId aFileId lockName msg =
withStore' (`getUserByARcvFileId` AgentRcvFileId aFileId) >>= \case
Just user -> process user `catchError` (toView . CRChatError (Just user))
_ -> throwChatError $ CENoRcvFileUser $ AgentRcvFileId aFileId
@ -2414,7 +2475,7 @@ processAgentMsgRcvFile _corrId aFileId msg =
ft@RcvFileTransfer {fileId, cancelled} <- withStore $ \db -> do
fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId
getRcvFileTransfer db user fileId
case msg of
withFileLock lockName fileId $ case msg of
RFPROG rcvProgress rcvTotal ->
unless cancelled $ do
let status = CIFSRcvTransfer {rcvProgress, rcvTotal}
@ -2443,15 +2504,15 @@ processAgentMsgRcvFile _corrId aFileId msg =
agentXFTPDeleteRcvFile user aFileId fileId
throwChatError $ CEXFTPRcvFile fileId (AgentRcvFileId aFileId) e
processAgentMessageConn :: forall m. ChatMonad m => User -> ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m ()
processAgentMessageConn user _ agentConnId END =
processAgentMessageConn :: forall m. ChatMonad m => User -> ACorrId -> ConnId -> String -> ACommand 'Agent 'AEConn -> m ()
processAgentMessageConn user _ agentConnId _ END =
withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= \case
RcvDirectMsgConnection _ (Just ct@Contact {localDisplayName = c}) -> do
toView $ CRContactAnotherClient user ct
whenUserNtfs user $ showToast (c <> "> ") "connected to another client"
unsetActive $ ActiveC c
entity -> toView $ CRSubscriptionEnd user entity
processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
processAgentMessageConn user@User {userId} corrId agentConnId lockName agentMessage = do
entity <- withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= updateConnStatus
case entity of
RcvDirectMsgConnection conn contact_ ->
@ -2486,7 +2547,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
processDirectMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> Maybe Contact -> m ()
processDirectMessage agentMsg connEntity conn@Connection {connId, viaUserContactLink, groupLinkId, customUserProfileId} = \case
Nothing -> case agentMsg of
Nothing -> withEntityLock lockName (CLConnection connId) $ case agentMsg of
CONF confId _ connInfo -> do
-- [incognito] send saved profile
incognitoProfile <- forM customUserProfileId $ \profileId -> withStore (\db -> getProfileById db userId profileId)
@ -2514,7 +2575,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
-- TODO add debugging output
_ -> pure ()
Just ct@Contact {localDisplayName = c, contactId} -> case agentMsg of
Just ct@Contact {localDisplayName = c, contactId} -> withGroupLock lockName contactId $ case agentMsg of
INV (ACR _ cReq) ->
-- [async agent commands] XGrpMemIntro continuation on receiving INV
withCompletedCommand conn agentMsg $ \_ ->
@ -2648,7 +2709,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
_ -> pure ()
processGroupMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> GroupInfo -> GroupMember -> m ()
processGroupMessage agentMsg connEntity conn@Connection {connId} gInfo@GroupInfo {groupId, localDisplayName = gName, groupProfile, membership, chatSettings} m = case agentMsg of
processGroupMessage agentMsg connEntity conn@Connection {connId} gInfo@GroupInfo {groupId, localDisplayName = gName, groupProfile, membership, chatSettings} m = withGroupLock lockName groupId $ case agentMsg of
INV (ACR _ cReq) ->
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} ->
case cReq of
@ -2841,7 +2902,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
processSndFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> SndFileTransfer -> m ()
processSndFileConn agentMsg connEntity conn ft@SndFileTransfer {fileId, fileName, fileStatus} =
case agentMsg of
withFileLock lockName fileId $ case agentMsg of
-- SMP CONF for SndFileConnection happens for direct file protocol
-- when recipient of the file "joins" connection created by the sender
CONF confId _ connInfo -> do
@ -2889,7 +2950,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
processRcvFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> RcvFileTransfer -> m ()
processRcvFileConn agentMsg connEntity conn ft@RcvFileTransfer {fileId, fileInvitation = FileInvitation {fileName}, grpMemberId} =
case agentMsg of
withFileLock lockName fileId $ case agentMsg of
INV (ACR _ cReq) ->
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} ->
case cReq of
@ -2976,7 +3037,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
Nothing -> a
processUserContactRequest :: ACommand 'Agent e -> ConnectionEntity -> Connection -> UserContact -> m ()
processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = case agentMsg of
processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = withFileLock lockName userContactLinkId $ case agentMsg of
REQ invId _ connInfo -> do
ChatMessage {chatMsgEvent} <- parseChatMessage connInfo
case chatMsgEvent of

View File

@ -29,7 +29,6 @@ import qualified Data.ByteString.Char8 as B
import Data.Char (ord)
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.String
import Data.Text (Text)
import Data.Time (ZonedTime)
@ -155,8 +154,10 @@ data ChatController = ChatController
notifyQ :: TBQueue Notification,
sendNotification :: Notification -> IO (),
chatLock :: Lock,
sndFiles :: TVar (Map Int64 Handle),
rcvFiles :: TVar (Map Int64 Handle),
entityChatLocks :: TMap ChatLockEntity Lock,
entityLocks :: TVar Int,
sndFiles :: TMap Int64 Handle,
rcvFiles :: TMap Int64 Handle,
currentCalls :: TMap ContactId Call,
config :: ChatConfig,
filesFolder :: TVar (Maybe FilePath), -- path to files folder for mobile apps,
@ -171,6 +172,14 @@ data ChatController = ChatController
logFilePath :: Maybe FilePath
}
data ChatLockEntity
= CLConnection Int64
| CLContact ContactId
| CLGroup GroupId
| CLUserContact Int64
| CLFile Int64
deriving (Eq, Ord)
data HelpSection = HSMain | HSFiles | HSGroups | HSContacts | HSMyAddress | HSMarkdown | HSMessages | HSSettings | HSDatabase
deriving (Show, Generic)

View File

@ -49,7 +49,7 @@ extra-deps:
# - simplexmq-1.0.0@sha256:34b2004728ae396e3ae449cd090ba7410781e2b3cefc59259915f4ca5daa9ea8,8561
# - ../simplexmq
- github: simplex-chat/simplexmq
commit: 2b93e0b17d0556988885757e5b7305f6a1db65a7
commit: bd86d3b075e0178fc775af09e4b071a0555e73c5
- github: kazu-yamamoto/http2
commit: b5a1b7200cf5bc7044af34ba325284271f6dff25
# - ../direct-sqlcipher