coordinate locks

This commit is contained in:
Evgeny Poberezkin 2023-04-17 09:45:01 +01:00
parent 2de1694f26
commit ab7c816539
2 changed files with 24 additions and 24 deletions

View File

@ -160,9 +160,9 @@ newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agen
inputQ <- newTBQueueIO tbqSize inputQ <- newTBQueueIO tbqSize
outputQ <- newTBQueueIO tbqSize outputQ <- newTBQueueIO tbqSize
notifyQ <- newTBQueueIO tbqSize notifyQ <- newTBQueueIO tbqSize
chatLock <- newEmptyTMVarIO chatLock <- atomically $ (,) <$> createLock <*> createLock
entityChatLocks <- atomically TM.empty entityLocks <- atomically TM.empty
entityLocks <- newTVarIO 0 entityLocksCount <- newTVarIO 0
sndFiles <- atomically TM.empty sndFiles <- atomically TM.empty
rcvFiles <- atomically TM.empty rcvFiles <- atomically TM.empty
currentCalls <- atomically TM.empty currentCalls <- atomically TM.empty
@ -190,8 +190,8 @@ newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agen
outputQ, outputQ,
notifyQ, notifyQ,
chatLock, chatLock,
entityChatLocks,
entityLocks, entityLocks,
entityLocksCount,
sndFiles, sndFiles,
rcvFiles, rcvFiles,
currentCalls, currentCalls,
@ -768,7 +768,7 @@ processChatCommand = \case
canDelete = isOwner || not (memberCurrent membership) canDelete = isOwner || not (memberCurrent membership)
unless canDelete $ throwChatError $ CEGroupUserRole gInfo GROwner unless canDelete $ throwChatError $ CEGroupUserRole gInfo GROwner
filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo
withGroupLock "deleteChat group" chatId . procCmd $ do withFullChatLock "deleteChat group" . procCmd $ do
deleteFilesAndConns user filesInfo deleteFilesAndConns user filesInfo
when (memberActive membership && isOwner) . void $ sendGroupMessage user gInfo members XGrpDel when (memberActive membership && isOwner) . void $ sendGroupMessage user gInfo members XGrpDel
deleteGroupLinkIfExists user gInfo deleteGroupLinkIfExists user gInfo
@ -1507,7 +1507,7 @@ processChatCommand = \case
agentMigrations <- withAgent getAgentMigrations agentMigrations <- withAgent getAgentMigrations
pure $ CRVersionInfo {versionInfo, chatMigrations, agentMigrations} pure $ CRVersionInfo {versionInfo, chatMigrations, agentMigrations}
DebugLocks -> do DebugLocks -> do
chatLockName <- atomically . tryReadTMVar =<< asks chatLock chatLockName <- atomically . tryReadTMVar . fst =<< asks chatLock
agentLocks <- withAgent debugAgentLocks agentLocks <- withAgent debugAgentLocks
pure CRDebugLocks {chatLockName, agentLocks} pure CRDebugLocks {chatLockName, agentLocks}
GetAgentStats -> CRAgentStats . map stat <$> withAgent getAgentStats GetAgentStats -> CRAgentStats . map stat <$> withAgent getAgentStats
@ -1794,19 +1794,19 @@ processChatCommand = \case
withFullChatLock :: ChatMonad' m => String -> m a -> m a withFullChatLock :: ChatMonad' m => String -> m a -> m a
withFullChatLock name action = do withFullChatLock name action = do
l <- asks chatLock (l1, l2) <- asks chatLock
count <- asks entityLocks count <- asks entityLocksCount
withGetLock (waitForEntityLocks count $> l) name action withLock l1 name $ withGetLock (waitForEntityLocks count $> l2) name action
where where
waitForEntityLocks count = readTVar count >>= \n -> when (n > 0) retry waitForEntityLocks count = readTVar count >>= \n -> when (n > 0) retry
withEntityLock :: ChatMonad' m => String -> ChatLockEntity -> m a -> m a withEntityLock :: ChatMonad' m => String -> ChatLockEntity -> m a -> m a
withEntityLock name entity action = do withEntityLock name entity action = do
l <- asks chatLock (l1, _) <- asks chatLock
ls <- asks entityChatLocks ls <- asks entityLocks
count <- asks entityLocks count <- asks entityLocksCount
E.bracket_ E.bracket_
(atomically $ waitForLock l >> modifyTVar' count (+ 1)) (atomically $ waitForLock l1 >> modifyTVar' count (+ 1))
(atomically $ modifyTVar' count $ \n -> max 0 (n - 1)) (atomically $ modifyTVar' count $ \n -> max 0 (n - 1))
(withLockMap ls entity name action) (withLockMap ls entity name action)
@ -2592,7 +2592,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId lockName agentMess
withAckMessage agentConnId cmdId msgMeta $ do withAckMessage agentConnId cmdId msgMeta $ do
msg@RcvMessage {chatMsgEvent = ACME _ event} <- saveRcvMSG conn (ConnectionId connId) msgMeta msgBody cmdId msg@RcvMessage {chatMsgEvent = ACME _ event} <- saveRcvMSG conn (ConnectionId connId) msgMeta msgBody cmdId
assertDirectAllowed user MDRcv ct $ toCMEventTag event assertDirectAllowed user MDRcv ct $ toCMEventTag event
updateChatLock "directMessage" event -- updateChatLock "directMessage" event
case event of case event of
XMsgNew mc -> newContentMessage ct mc msg msgMeta XMsgNew mc -> newContentMessage ct mc msg msgMeta
XMsgFileDescr sharedMsgId fileDescr -> messageFileDescription ct sharedMsgId fileDescr msgMeta XMsgFileDescr sharedMsgId fileDescr -> messageFileDescription ct sharedMsgId fileDescr msgMeta
@ -2822,7 +2822,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId lockName agentMess
cmdId <- createAckCmd conn cmdId <- createAckCmd conn
withAckMessage agentConnId cmdId msgMeta $ do withAckMessage agentConnId cmdId msgMeta $ do
msg@RcvMessage {chatMsgEvent = ACME _ event} <- saveRcvMSG conn (GroupId groupId) msgMeta msgBody cmdId msg@RcvMessage {chatMsgEvent = ACME _ event} <- saveRcvMSG conn (GroupId groupId) msgMeta msgBody cmdId
updateChatLock "groupMessage" event -- updateChatLock "groupMessage" event
case event of case event of
XMsgNew mc -> canSend $ newGroupContentMessage gInfo m mc msg msgMeta XMsgNew mc -> canSend $ newGroupContentMessage gInfo m mc msg msgMeta
XMsgFileDescr sharedMsgId fileDescr -> canSend $ groupMessageFileDescription gInfo m sharedMsgId fileDescr msgMeta XMsgFileDescr sharedMsgId fileDescr -> canSend $ groupMessageFileDescription gInfo m sharedMsgId fileDescr msgMeta
@ -3088,12 +3088,12 @@ processAgentMessageConn user@User {userId} corrId agentConnId lockName agentMess
toView $ CRConnectionDisabled connEntity toView $ CRConnectionDisabled connEntity
_ -> pure () _ -> pure ()
updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> m () -- updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> m ()
updateChatLock name event = do -- updateChatLock name event = do
l <- asks chatLock -- l <- asks chatLock
atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s)) -- atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s))
where -- where
s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event) -- s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event)
withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> m ()) -> m () withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> m ()) -> m ()
withCompletedCommand Connection {connId} agentMsg action = do withCompletedCommand Connection {connId} agentMsg action = do

View File

@ -153,9 +153,9 @@ data ChatController = ChatController
outputQ :: TBQueue (Maybe CorrId, ChatResponse), outputQ :: TBQueue (Maybe CorrId, ChatResponse),
notifyQ :: TBQueue Notification, notifyQ :: TBQueue Notification,
sendNotification :: Notification -> IO (), sendNotification :: Notification -> IO (),
chatLock :: Lock, chatLock :: (Lock, Lock),
entityChatLocks :: TMap ChatLockEntity Lock, entityLocks :: TMap ChatLockEntity Lock,
entityLocks :: TVar Int, entityLocksCount :: TVar Int,
sndFiles :: TMap Int64 Handle, sndFiles :: TMap Int64 Handle,
rcvFiles :: TMap Int64 Handle, rcvFiles :: TMap Int64 Handle,
currentCalls :: TMap ContactId Call, currentCalls :: TMap ContactId Call,