core: improve queries performance; delay first chat item expiration cycle on start (#2521)
This commit is contained in:
parent
cc75b75d4e
commit
128883b8a3
@ -99,6 +99,7 @@ library
|
||||
Simplex.Chat.Migrations.M20230511_reactions
|
||||
Simplex.Chat.Migrations.M20230519_item_deleted_ts
|
||||
Simplex.Chat.Migrations.M20230526_indexes
|
||||
Simplex.Chat.Migrations.M20230529_indexes
|
||||
Simplex.Chat.Mobile
|
||||
Simplex.Chat.Mobile.WebRTC
|
||||
Simplex.Chat.Options
|
||||
|
@ -69,7 +69,7 @@ import Simplex.Messaging.Agent.Client (AgentStatsKey (..), temporaryAgentError)
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore, defaultAgentConfig)
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError, SQLiteStore (dbNew), execSQL, upMigration)
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError, SQLiteStore (dbNew), execSQL, upMigration, withTransactionCtx)
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import Simplex.Messaging.Client (defaultNetworkConfig)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@ -222,7 +222,7 @@ cfgServers = \case
|
||||
startChatController :: forall m. ChatMonad' m => Bool -> Bool -> Bool -> m (Async ())
|
||||
startChatController subConns enableExpireCIs startXFTPWorkers = do
|
||||
asks smpAgent >>= resumeAgentClient
|
||||
users <- fromRight [] <$> runExceptT (withStore' getUsers)
|
||||
users <- fromRight [] <$> runExceptT (withStoreCtx' (Just "startChatController, getUsers") getUsers)
|
||||
restoreCalls
|
||||
s <- asks agentAsync
|
||||
readTVarIO s >>= maybe (start s users) (pure . fst)
|
||||
@ -254,7 +254,7 @@ startChatController subConns enableExpireCIs startXFTPWorkers = do
|
||||
_ -> pure ()
|
||||
startExpireCIs users =
|
||||
forM_ users $ \user -> do
|
||||
ttl <- fromRight Nothing <$> runExceptT (withStore' (`getChatItemTTL` user))
|
||||
ttl <- fromRight Nothing <$> runExceptT (withStoreCtx' (Just "startExpireCIs, getChatItemTTL") (`getChatItemTTL` user))
|
||||
forM_ ttl $ \_ -> do
|
||||
startExpireCIThread user
|
||||
setExpireCIFlag user True
|
||||
@ -279,14 +279,14 @@ startFilesToReceive users = do
|
||||
|
||||
startReceiveUserFiles :: forall m. ChatMonad m => User -> m ()
|
||||
startReceiveUserFiles user = do
|
||||
filesToReceive <- withStore' (`getRcvFilesToReceive` user)
|
||||
filesToReceive <- withStoreCtx' (Just "startReceiveUserFiles, getRcvFilesToReceive") (`getRcvFilesToReceive` user)
|
||||
forM_ filesToReceive $ \ft ->
|
||||
flip catchError (toView . CRChatError (Just user)) $
|
||||
toView =<< receiveFile' user ft Nothing Nothing
|
||||
|
||||
restoreCalls :: ChatMonad' m => m ()
|
||||
restoreCalls = do
|
||||
savedCalls <- fromRight [] <$> runExceptT (withStore' $ \db -> getCalls db)
|
||||
savedCalls <- fromRight [] <$> runExceptT (withStoreCtx' (Just "restoreCalls, getCalls") $ \db -> getCalls db)
|
||||
let callsMap = M.fromList $ map (\call@Call {contactId} -> (contactId, call)) savedCalls
|
||||
calls <- asks currentCalls
|
||||
atomically $ writeTVar calls callsMap
|
||||
@ -363,11 +363,11 @@ processChatCommand = \case
|
||||
withStore $ \db -> overwriteProtocolServers db user servers
|
||||
coupleDaysAgo t = (`addUTCTime` t) . fromInteger . negate . (+ (2 * day)) <$> randomRIO (0, day)
|
||||
day = 86400
|
||||
ListUsers -> CRUsersList <$> withStore' getUsersInfo
|
||||
ListUsers -> CRUsersList <$> withStoreCtx' (Just "ListUsers, getUsersInfo") getUsersInfo
|
||||
APISetActiveUser userId' viewPwd_ -> withUser $ \user -> do
|
||||
user' <- privateGetUser userId'
|
||||
validateUserPassword user user' viewPwd_
|
||||
withStore' $ \db -> setActiveUser db userId'
|
||||
withStoreCtx' (Just "APISetActiveUser, setActiveUser") $ \db -> setActiveUser db userId'
|
||||
setActive ActiveNone
|
||||
let user'' = user' {activeUser = True}
|
||||
asks currentUser >>= atomically . (`writeTVar` Just user'')
|
||||
@ -421,14 +421,14 @@ processChatCommand = \case
|
||||
APIActivateChat -> withUser $ \_ -> do
|
||||
restoreCalls
|
||||
withAgent foregroundAgent
|
||||
withStore' getUsers >>= void . forkIO . startFilesToReceive
|
||||
withStoreCtx' (Just "APIActivateChat, getUsers") getUsers >>= void . forkIO . startFilesToReceive
|
||||
setAllExpireCIFlags True
|
||||
ok_
|
||||
APISuspendChat t -> do
|
||||
setAllExpireCIFlags False
|
||||
withAgent (`suspendAgent` t)
|
||||
ok_
|
||||
ResubscribeAllConnections -> withStore' getUsers >>= subscribeUsers >> ok_
|
||||
ResubscribeAllConnections -> withStoreCtx' (Just "ResubscribeAllConnections, getUsers") getUsers >>= subscribeUsers >> ok_
|
||||
-- has to be called before StartChat
|
||||
SetTempFolder tf -> do
|
||||
createDirectoryIfMissing True tf
|
||||
@ -458,7 +458,7 @@ processChatCommand = \case
|
||||
ExecChatStoreSQL query -> CRSQLResult <$> withStore' (`execSQL` query)
|
||||
ExecAgentStoreSQL query -> CRSQLResult <$> withAgent (`execAgentStoreSQL` query)
|
||||
APIGetChats userId withPCC -> withUserId userId $ \user ->
|
||||
CRApiChats user <$> withStore' (\db -> getChatPreviews db user withPCC)
|
||||
CRApiChats user <$> withStoreCtx' (Just "APIGetChats, getChatPreviews") (\db -> getChatPreviews db user withPCC)
|
||||
APIGetChat (ChatRef cType cId) pagination search -> withUser $ \user -> case cType of
|
||||
-- TODO optimize queries calculating ChatStats, currently they're disabled
|
||||
CTDirect -> do
|
||||
@ -1063,7 +1063,7 @@ processChatCommand = \case
|
||||
SetChatItemTTL newTTL_ -> withUser' $ \User {userId} -> do
|
||||
processChatCommand $ APISetChatItemTTL userId newTTL_
|
||||
APIGetChatItemTTL userId -> withUserId userId $ \user -> do
|
||||
ttl <- withStore' (`getChatItemTTL` user)
|
||||
ttl <- withStoreCtx' (Just "APIGetChatItemTTL, getChatItemTTL") (`getChatItemTTL` user)
|
||||
pure $ CRChatItemTTL user ttl
|
||||
GetChatItemTTL -> withUser' $ \User {userId} -> do
|
||||
processChatCommand $ APIGetChatItemTTL userId
|
||||
@ -1220,7 +1220,7 @@ processChatCommand = \case
|
||||
DeleteMyAddress -> withUser $ \User {userId} ->
|
||||
processChatCommand $ APIDeleteMyAddress userId
|
||||
APIShowMyAddress userId -> withUserId userId $ \user ->
|
||||
CRUserContactLink user <$> withStore (`getUserAddress` user)
|
||||
CRUserContactLink user <$> withStoreCtx (Just "APIShowMyAddress, getUserAddress") (`getUserAddress` user)
|
||||
ShowMyAddress -> withUser $ \User {userId} ->
|
||||
processChatCommand $ APIShowMyAddress userId
|
||||
APISetProfileAddress userId False -> withUserId userId $ \user@User {profile = p} -> do
|
||||
@ -1936,12 +1936,14 @@ startExpireCIThread user@User {userId} = do
|
||||
_ -> pure ()
|
||||
where
|
||||
runExpireCIs = do
|
||||
delay <- asks (initialCleanupManagerDelay . config)
|
||||
liftIO $ threadDelay' delay
|
||||
interval <- asks $ ciExpirationInterval . config
|
||||
forever $ do
|
||||
flip catchError (toView . CRChatError (Just user)) $ do
|
||||
expireFlags <- asks expireCIFlags
|
||||
atomically $ TM.lookup userId expireFlags >>= \b -> unless (b == Just True) retry
|
||||
ttl <- withStore' (`getChatItemTTL` user)
|
||||
ttl <- withStoreCtx' (Just "startExpireCIThread, getChatItemTTL") (`getChatItemTTL` user)
|
||||
forM_ ttl $ \t -> expireChatItems user t False
|
||||
liftIO $ threadDelay' interval
|
||||
|
||||
@ -2065,11 +2067,11 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
|
||||
(Nothing, Just connReq) -> do
|
||||
connIds <- joinAgentConnectionAsync user True connReq . directMessage $ XFileAcpt fName
|
||||
filePath <- getRcvFilePath fileId filePath_ fName True
|
||||
withStore $ \db -> acceptRcvFileTransfer db user fileId connIds ConnJoined filePath
|
||||
withStoreCtx (Just "acceptFileReceive, acceptRcvFileTransfer") $ \db -> acceptRcvFileTransfer db user fileId connIds ConnJoined filePath
|
||||
-- XFTP
|
||||
(Just _xftpRcvFile, _) -> do
|
||||
filePath <- getRcvFilePath fileId filePath_ fName False
|
||||
(ci, rfd) <- withStore $ \db -> do
|
||||
(ci, rfd) <- withStoreCtx (Just "acceptFileReceive, xftpAcceptRcvFT ...") $ \db -> do
|
||||
-- marking file as accepted and reading description in the same transaction
|
||||
-- to prevent race condition with appending description
|
||||
ci <- xftpAcceptRcvFT db user fileId filePath
|
||||
@ -2079,13 +2081,13 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
|
||||
pure ci
|
||||
-- group & direct file protocol
|
||||
_ -> do
|
||||
chatRef <- withStore $ \db -> getChatRefByFileId db user fileId
|
||||
chatRef <- withStoreCtx (Just "acceptFileReceive, getChatRefByFileId") $ \db -> getChatRefByFileId db user fileId
|
||||
case (chatRef, grpMemberId) of
|
||||
(ChatRef CTDirect contactId, Nothing) -> do
|
||||
ct <- withStore $ \db -> getContact db user contactId
|
||||
ct <- withStoreCtx (Just "acceptFileReceive, getContact") $ \db -> getContact db user contactId
|
||||
acceptFile CFCreateConnFileInvDirect $ \msg -> void $ sendDirectContactMessage ct msg
|
||||
(ChatRef CTGroup groupId, Just memId) -> do
|
||||
GroupMember {activeConn} <- withStore $ \db -> getGroupMember db user groupId memId
|
||||
GroupMember {activeConn} <- withStoreCtx (Just "acceptFileReceive, getGroupMember") $ \db -> getGroupMember db user groupId memId
|
||||
case activeConn of
|
||||
Just conn -> do
|
||||
acceptFile CFCreateConnFileInvGroup $ \msg -> void $ sendDirectMessage conn msg $ GroupId groupId
|
||||
@ -2099,7 +2101,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
|
||||
if
|
||||
| inline -> do
|
||||
-- accepting inline
|
||||
ci <- withStore $ \db -> acceptRcvInlineFT db user fileId filePath
|
||||
ci <- withStoreCtx (Just "acceptFile, acceptRcvInlineFT") $ \db -> acceptRcvInlineFT db user fileId filePath
|
||||
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
|
||||
send $ XFileAcptInv sharedMsgId Nothing fName
|
||||
pure ci
|
||||
@ -2107,7 +2109,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
|
||||
| otherwise -> do
|
||||
-- accepting via a new connection
|
||||
connIds <- createAgentConnectionAsync user cmdFunction True SCMInvitation
|
||||
withStore $ \db -> acceptRcvFileTransfer db user fileId connIds ConnNew filePath
|
||||
withStoreCtx (Just "acceptFile, acceptRcvFileTransfer") $ \db -> acceptRcvFileTransfer db user fileId connIds ConnNew filePath
|
||||
receiveInline :: m Bool
|
||||
receiveInline = do
|
||||
ChatConfig {fileChunkSize, inlineFiles = InlineFilesConfig {receiveChunks, offerChunks}} <- asks config
|
||||
@ -2124,11 +2126,11 @@ receiveViaCompleteFD user fileId RcvFileDescr {fileDescrText, fileDescrComplete}
|
||||
rd <- parseFileDescription fileDescrText
|
||||
aFileId <- withAgent $ \a -> xftpReceiveFile a (aUserId user) rd
|
||||
startReceivingFile user fileId
|
||||
withStore' $ \db -> updateRcvFileAgentId db fileId (Just $ AgentRcvFileId aFileId)
|
||||
withStoreCtx' (Just "receiveViaCompleteFD, updateRcvFileAgentId") $ \db -> updateRcvFileAgentId db fileId (Just $ AgentRcvFileId aFileId)
|
||||
|
||||
startReceivingFile :: ChatMonad m => User -> FileTransferId -> m ()
|
||||
startReceivingFile user fileId = do
|
||||
ci <- withStore $ \db -> do
|
||||
ci <- withStoreCtx (Just "startReceivingFile, updateRcvFileStatus ...") $ \db -> do
|
||||
liftIO $ updateRcvFileStatus db fileId FSConnected
|
||||
liftIO $ updateCIFileStatus db user fileId $ CIFSRcvTransfer 0 1
|
||||
getChatItemByFileId db user fileId
|
||||
@ -2237,7 +2239,7 @@ agentSubscriber = do
|
||||
type AgentBatchSubscribe m = AgentClient -> [ConnId] -> ExceptT AgentErrorType m (Map ConnId (Either AgentErrorType ()))
|
||||
|
||||
subscribeUserConnections :: forall m. ChatMonad m => AgentBatchSubscribe m -> User -> m ()
|
||||
subscribeUserConnections agentBatchSubscribe user = do
|
||||
subscribeUserConnections agentBatchSubscribe user@User {userId} = do
|
||||
-- get user connections
|
||||
ce <- asks $ subscriptionEvents . config
|
||||
(ctConns, cts) <- getContactConns
|
||||
@ -2258,32 +2260,32 @@ subscribeUserConnections agentBatchSubscribe user = do
|
||||
where
|
||||
getContactConns :: m ([ConnId], Map ConnId Contact)
|
||||
getContactConns = do
|
||||
cts <- withStore_ getUserContacts
|
||||
cts <- withStore_ ("subscribeUserConnections " <> show userId <> ", getUserContacts") getUserContacts
|
||||
let connIds = map contactConnId cts
|
||||
pure (connIds, M.fromList $ zip connIds cts)
|
||||
getUserContactLinkConns :: m ([ConnId], Map ConnId UserContact)
|
||||
getUserContactLinkConns = do
|
||||
(cs, ucs) <- unzip <$> withStore_ getUserContactLinks
|
||||
(cs, ucs) <- unzip <$> withStore_ ("subscribeUserConnections " <> show userId <> ", getUserContactLinks") getUserContactLinks
|
||||
let connIds = map aConnId cs
|
||||
pure (connIds, M.fromList $ zip connIds ucs)
|
||||
getGroupMemberConns :: m ([Group], [ConnId], Map ConnId GroupMember)
|
||||
getGroupMemberConns = do
|
||||
gs <- withStore_ getUserGroups
|
||||
gs <- withStore_ ("subscribeUserConnections " <> show userId <> ", getUserGroups") getUserGroups
|
||||
let mPairs = concatMap (\(Group _ ms) -> mapMaybe (\m -> (,m) <$> memberConnId m) ms) gs
|
||||
pure (gs, map fst mPairs, M.fromList mPairs)
|
||||
getSndFileTransferConns :: m ([ConnId], Map ConnId SndFileTransfer)
|
||||
getSndFileTransferConns = do
|
||||
sfts <- withStore_ getLiveSndFileTransfers
|
||||
sfts <- withStore_ ("subscribeUserConnections " <> show userId <> ", getLiveSndFileTransfers") getLiveSndFileTransfers
|
||||
let connIds = map sndFileTransferConnId sfts
|
||||
pure (connIds, M.fromList $ zip connIds sfts)
|
||||
getRcvFileTransferConns :: m ([ConnId], Map ConnId RcvFileTransfer)
|
||||
getRcvFileTransferConns = do
|
||||
rfts <- withStore_ getLiveRcvFileTransfers
|
||||
rfts <- withStore_ ("subscribeUserConnections " <> show userId <> ", getLiveRcvFileTransfers") getLiveRcvFileTransfers
|
||||
let rftPairs = mapMaybe (\ft -> (,ft) <$> liveRcvFileTransferConnId ft) rfts
|
||||
pure (map fst rftPairs, M.fromList rftPairs)
|
||||
getPendingContactConns :: m ([ConnId], Map ConnId PendingContactConnection)
|
||||
getPendingContactConns = do
|
||||
pcs <- withStore_ getPendingContactConnections
|
||||
pcs <- withStore_ ("subscribeUserConnections " <> show userId <> ", getPendingContactConnections") getPendingContactConnections
|
||||
let connIds = map aConnId' pcs
|
||||
pure (connIds, M.fromList $ zip connIds pcs)
|
||||
contactSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId Contact -> Bool -> m ()
|
||||
@ -2334,8 +2336,8 @@ subscribeUserConnections agentBatchSubscribe user = do
|
||||
rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs
|
||||
pendingConnSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId PendingContactConnection -> m ()
|
||||
pendingConnSubsToView rs = toView . CRPendingSubSummary user . map (uncurry PendingSubStatus) . resultsFor rs
|
||||
withStore_ :: (DB.Connection -> User -> IO [a]) -> m [a]
|
||||
withStore_ a = withStore' (`a` user) `catchError` \e -> toView (CRChatError (Just user) e) $> []
|
||||
withStore_ :: String -> (DB.Connection -> User -> IO [a]) -> m [a]
|
||||
withStore_ ctx a = withStoreCtx' (Just ctx) (`a` user) `catchError` \e -> toView (CRChatError (Just user) e) $> []
|
||||
filterErrors :: [(a, Maybe ChatError)] -> [(a, ChatError)]
|
||||
filterErrors = mapMaybe (\(a, e_) -> (a,) <$> e_)
|
||||
resultsFor :: Map ConnId (Either AgentErrorType ()) -> Map ConnId a -> [(a, Maybe ChatError)]
|
||||
@ -2358,7 +2360,7 @@ cleanupManager = do
|
||||
forever $ do
|
||||
flip catchError (toView . CRChatError Nothing) $ do
|
||||
waitChatStarted
|
||||
users <- withStore' getUsers
|
||||
users <- withStoreCtx' (Just "cleanupManager, getUsers 1") getUsers
|
||||
let (us, us') = partition activeUser users
|
||||
forM_ us $ cleanupUser interval
|
||||
forM_ us' $ cleanupUser interval
|
||||
@ -2367,7 +2369,7 @@ cleanupManager = do
|
||||
where
|
||||
runWithoutInitialDelay cleanupInterval = flip catchError (toView . CRChatError Nothing) $ do
|
||||
waitChatStarted
|
||||
users <- withStore' getUsers
|
||||
users <- withStoreCtx' (Just "cleanupManager, getUsers 2") getUsers
|
||||
let (us, us') = partition activeUser users
|
||||
forM_ us $ \u -> cleanupTimedItems cleanupInterval u `catchError` (toView . CRChatError (Just u))
|
||||
forM_ us' $ \u -> cleanupTimedItems cleanupInterval u `catchError` (toView . CRChatError (Just u))
|
||||
@ -2376,12 +2378,12 @@ cleanupManager = do
|
||||
cleanupTimedItems cleanupInterval user = do
|
||||
ts <- liftIO getCurrentTime
|
||||
let startTimedThreadCutoff = addUTCTime cleanupInterval ts
|
||||
timedItems <- withStore' $ \db -> getTimedItems db user startTimedThreadCutoff
|
||||
timedItems <- withStoreCtx' (Just "cleanupManager, getTimedItems") $ \db -> getTimedItems db user startTimedThreadCutoff
|
||||
forM_ timedItems $ \(itemRef, deleteAt) -> startTimedItemThread user itemRef deleteAt `catchError` const (pure ())
|
||||
cleanupMessages = do
|
||||
ts <- liftIO getCurrentTime
|
||||
let cutoffTs = addUTCTime (- (30 * nominalDay)) ts
|
||||
withStore' (`deleteOldMessages` cutoffTs)
|
||||
withStoreCtx' (Just "cleanupManager, deleteOldMessages") (`deleteOldMessages` cutoffTs)
|
||||
|
||||
startProximateTimedItemThread :: ChatMonad m => User -> (ChatRef, ChatItemId) -> UTCTime -> m ()
|
||||
startProximateTimedItemThread user itemRef deleteAt = do
|
||||
@ -2412,10 +2414,10 @@ deleteTimedItem user (ChatRef cType chatId, itemId) deleteAt = do
|
||||
waitChatStarted
|
||||
case cType of
|
||||
CTDirect -> do
|
||||
(ct, ci) <- withStore $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId
|
||||
(ct, ci) <- withStoreCtx (Just "deleteTimedItem, getContact ...") $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId
|
||||
deleteDirectCI user ct ci True True >>= toView
|
||||
CTGroup -> do
|
||||
(gInfo, ci) <- withStore $ \db -> (,) <$> getGroupInfo db user chatId <*> getGroupChatItem db user chatId itemId
|
||||
(gInfo, ci) <- withStoreCtx (Just "deleteTimedItem, getGroupInfo ...") $ \db -> (,) <$> getGroupInfo db user chatId <*> getGroupChatItem db user chatId itemId
|
||||
deletedTs <- liftIO getCurrentTime
|
||||
deleteGroupCI user gInfo ci True True Nothing deletedTs >>= toView
|
||||
_ -> toView . CRChatError (Just user) . ChatError $ CEInternalError "bad deleteTimedItem cType"
|
||||
@ -2433,9 +2435,9 @@ expireChatItems user@User {userId} ttl sync = do
|
||||
let expirationDate = addUTCTime (-1 * fromIntegral ttl) currentTs
|
||||
-- this is to keep group messages created during last 12 hours even if they're expired according to item_ts
|
||||
createdAtCutoff = addUTCTime (-43200 :: NominalDiffTime) currentTs
|
||||
contacts <- withStore' (`getUserContacts` user)
|
||||
contacts <- withStoreCtx' (Just "expireChatItems, getUserContacts") (`getUserContacts` user)
|
||||
loop contacts $ processContact expirationDate
|
||||
groups <- withStore' (`getUserGroupDetails` user)
|
||||
groups <- withStoreCtx' (Just "expireChatItems, getUserGroupDetails") (`getUserGroupDetails` user)
|
||||
loop groups $ processGroup expirationDate createdAtCutoff
|
||||
where
|
||||
loop :: [a] -> (a -> m ()) -> m ()
|
||||
@ -2453,16 +2455,16 @@ expireChatItems user@User {userId} ttl sync = do
|
||||
when (expire == Just True) $ threadDelay 100000 >> a
|
||||
processContact :: UTCTime -> Contact -> m ()
|
||||
processContact expirationDate ct = do
|
||||
filesInfo <- withStore' $ \db -> getContactExpiredFileInfo db user ct expirationDate
|
||||
filesInfo <- withStoreCtx' (Just "processContact, getContactExpiredFileInfo") $ \db -> getContactExpiredFileInfo db user ct expirationDate
|
||||
deleteFilesAndConns user filesInfo
|
||||
withStore' $ \db -> deleteContactExpiredCIs db user ct expirationDate
|
||||
withStoreCtx' (Just "processContact, deleteContactExpiredCIs") $ \db -> deleteContactExpiredCIs db user ct expirationDate
|
||||
processGroup :: UTCTime -> UTCTime -> GroupInfo -> m ()
|
||||
processGroup expirationDate createdAtCutoff gInfo = do
|
||||
filesInfo <- withStore' $ \db -> getGroupExpiredFileInfo db user gInfo expirationDate createdAtCutoff
|
||||
filesInfo <- withStoreCtx' (Just "processGroup, getGroupExpiredFileInfo") $ \db -> getGroupExpiredFileInfo db user gInfo expirationDate createdAtCutoff
|
||||
deleteFilesAndConns user filesInfo
|
||||
withStore' $ \db -> deleteGroupExpiredCIs db user gInfo expirationDate createdAtCutoff
|
||||
membersToDelete <- withStore' $ \db -> getGroupMembersForExpiration db user gInfo
|
||||
forM_ membersToDelete $ \m -> withStore' $ \db -> deleteGroupMember db user m
|
||||
withStoreCtx' (Just "processGroup, deleteGroupExpiredCIs") $ \db -> deleteGroupExpiredCIs db user gInfo expirationDate createdAtCutoff
|
||||
membersToDelete <- withStoreCtx' (Just "processGroup, getGroupMembersForExpiration") $ \db -> getGroupMembersForExpiration db user gInfo
|
||||
forM_ membersToDelete $ \m -> withStoreCtx' (Just "processGroup, deleteGroupMember") $ \db -> deleteGroupMember db user m
|
||||
|
||||
processAgentMessage :: forall m. ChatMonad m => ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m ()
|
||||
processAgentMessage _ connId (DEL_RCVQ srv qId err_) =
|
||||
@ -4423,13 +4425,13 @@ mkChatItem cd ciId content file quotedItem sharedMsgId itemTimed live itemTs cur
|
||||
deleteDirectCI :: ChatMonad m => User -> Contact -> CChatItem 'CTDirect -> Bool -> Bool -> m ChatResponse
|
||||
deleteDirectCI user ct ci@(CChatItem msgDir deletedItem@ChatItem {file}) byUser timed = do
|
||||
deleteCIFile user file
|
||||
withStore' $ \db -> deleteDirectChatItem db user ct ci
|
||||
withStoreCtx' (Just "deleteDirectCI, deleteDirectChatItem") $ \db -> deleteDirectChatItem db user ct ci
|
||||
pure $ CRChatItemDeleted user (AChatItem SCTDirect msgDir (DirectChat ct) deletedItem) Nothing byUser timed
|
||||
|
||||
deleteGroupCI :: ChatMonad m => User -> GroupInfo -> CChatItem 'CTGroup -> Bool -> Bool -> Maybe GroupMember -> UTCTime -> m ChatResponse
|
||||
deleteGroupCI user gInfo ci@(CChatItem msgDir deletedItem@ChatItem {file}) byUser timed byGroupMember_ deletedTs = do
|
||||
deleteCIFile user file
|
||||
toCi <- withStore' $ \db ->
|
||||
toCi <- withStoreCtx' (Just "deleteGroupCI, deleteGroupChatItem ...") $ \db ->
|
||||
case byGroupMember_ of
|
||||
Nothing -> deleteGroupChatItem db user gInfo ci $> Nothing
|
||||
Just m -> Just <$> updateGroupChatItemModerated db user gInfo ci m deletedTs
|
||||
@ -4713,11 +4715,20 @@ withStoreCtx' ctx_ action = withStoreCtx ctx_ $ liftIO . action
|
||||
withStoreCtx :: ChatMonad m => Maybe String -> (DB.Connection -> ExceptT StoreError IO a) -> m a
|
||||
withStoreCtx ctx_ action = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftEitherError ChatErrorStore $
|
||||
withTransaction chatStore (runExceptT . action) `E.catch` handleInternal
|
||||
liftEitherError ChatErrorStore $ case ctx_ of
|
||||
Nothing -> withTransaction chatStore (runExceptT . action) `E.catch` handleInternal ""
|
||||
Just _ -> withTransaction chatStore (runExceptT . action) `E.catch` handleInternal ""
|
||||
-- uncomment to debug store performance
|
||||
-- Just ctx -> do
|
||||
-- t1 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
-- r <- withTransactionCtx ctx_ chatStore (runExceptT . action) `E.catch` handleInternal (" (" <> ctx <> ")")
|
||||
-- t2 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
-- pure r
|
||||
where
|
||||
handleInternal :: E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal e = pure . Left . SEInternalError $ show e <> maybe "" (\ctx -> " (" <> ctx <> ")") ctx_
|
||||
handleInternal :: String -> E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr
|
||||
|
||||
chatCommandP :: Parser ChatCommand
|
||||
chatCommandP =
|
||||
|
30
src/Simplex/Chat/Migrations/M20230529_indexes.hs
Normal file
30
src/Simplex/Chat/Migrations/M20230529_indexes.hs
Normal file
@ -0,0 +1,30 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Migrations.M20230529_indexes where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20230529_indexes :: Query
|
||||
m20230529_indexes =
|
||||
[sql|
|
||||
DROP INDEX idx_chat_items_timed_delete_at;
|
||||
|
||||
CREATE INDEX idx_chat_items_timed_delete_at ON chat_items(user_id, timed_delete_at);
|
||||
|
||||
CREATE INDEX idx_group_members_group_id ON group_members(user_id, group_id);
|
||||
|
||||
CREATE INDEX idx_msg_deliveries_agent_ack_cmd_id ON msg_deliveries(connection_id, agent_ack_cmd_id);
|
||||
|]
|
||||
|
||||
down_m20230529_indexes :: Query
|
||||
down_m20230529_indexes =
|
||||
[sql|
|
||||
DROP INDEX idx_msg_deliveries_agent_ack_cmd_id;
|
||||
|
||||
DROP INDEX idx_group_members_group_id;
|
||||
|
||||
DROP INDEX idx_chat_items_timed_delete_at;
|
||||
|
||||
CREATE INDEX idx_chat_items_timed_delete_at ON chat_items(timed_delete_at);
|
||||
|]
|
@ -522,7 +522,6 @@ CREATE UNIQUE INDEX idx_snd_files_last_inline_msg_delivery_id ON snd_files(
|
||||
CREATE INDEX idx_messages_connection_id ON messages(connection_id);
|
||||
CREATE INDEX idx_chat_items_group_member_id ON chat_items(group_member_id);
|
||||
CREATE INDEX idx_chat_items_contact_id ON chat_items(contact_id);
|
||||
CREATE INDEX idx_chat_items_timed_delete_at ON chat_items(timed_delete_at);
|
||||
CREATE INDEX idx_chat_items_item_status ON chat_items(item_status);
|
||||
CREATE INDEX idx_connections_group_member ON connections(
|
||||
user_id,
|
||||
@ -644,3 +643,12 @@ CREATE INDEX idx_messages_created_at ON messages(created_at);
|
||||
CREATE INDEX idx_chat_item_reactions_created_by_msg_id ON chat_item_reactions(
|
||||
created_by_msg_id
|
||||
);
|
||||
CREATE INDEX idx_chat_items_timed_delete_at ON chat_items(
|
||||
user_id,
|
||||
timed_delete_at
|
||||
);
|
||||
CREATE INDEX idx_group_members_group_id ON group_members(user_id, group_id);
|
||||
CREATE INDEX idx_msg_deliveries_agent_ack_cmd_id ON msg_deliveries(
|
||||
connection_id,
|
||||
agent_ack_cmd_id
|
||||
);
|
||||
|
@ -395,6 +395,7 @@ import Simplex.Chat.Migrations.M20230505_chat_item_versions
|
||||
import Simplex.Chat.Migrations.M20230511_reactions
|
||||
import Simplex.Chat.Migrations.M20230519_item_deleted_ts
|
||||
import Simplex.Chat.Migrations.M20230526_indexes
|
||||
import Simplex.Chat.Migrations.M20230529_indexes
|
||||
import Simplex.Chat.Protocol
|
||||
import Simplex.Chat.Types
|
||||
import Simplex.Chat.Util (week)
|
||||
@ -475,7 +476,8 @@ schemaMigrations =
|
||||
("20230505_chat_item_versions", m20230505_chat_item_versions, Just down_m20230505_chat_item_versions),
|
||||
("20230511_reactions", m20230511_reactions, Just down_m20230511_reactions),
|
||||
("20230519_item_deleted_ts", m20230519_item_deleted_ts, Just down_m20230519_item_deleted_ts),
|
||||
("20230526_indexes", m20230526_indexes, Just down_m20230526_indexes)
|
||||
("20230526_indexes", m20230526_indexes, Just down_m20230526_indexes),
|
||||
("20230529_indexes", m20230529_indexes, Just down_m20230529_indexes)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
@ -1309,7 +1309,7 @@ testUsersDifferentCIExpirationTTL tmp = do
|
||||
|
||||
alice #$> ("/_get chat @4 count=100", chat, [])
|
||||
where
|
||||
cfg = testCfg {ciExpirationInterval = 500000}
|
||||
cfg = testCfg {initialCleanupManagerDelay = 0, ciExpirationInterval = 500000}
|
||||
|
||||
testUsersRestartCIExpiration :: HasCallStack => FilePath -> IO ()
|
||||
testUsersRestartCIExpiration tmp = do
|
||||
@ -1392,7 +1392,7 @@ testUsersRestartCIExpiration tmp = do
|
||||
|
||||
alice #$> ("/_get chat @4 count=100", chat, [])
|
||||
where
|
||||
cfg = testCfg {ciExpirationInterval = 500000}
|
||||
cfg = testCfg {initialCleanupManagerDelay = 0, ciExpirationInterval = 500000}
|
||||
|
||||
testEnableCIExpirationOnlyForOneUser :: HasCallStack => FilePath -> IO ()
|
||||
testEnableCIExpirationOnlyForOneUser tmp = do
|
||||
@ -1463,7 +1463,7 @@ testEnableCIExpirationOnlyForOneUser tmp = do
|
||||
-- new messages are not deleted for second user
|
||||
alice #$> ("/_get chat @4 count=100", chat, chatFeatures <> [(1, "alisa 1"), (0, "alisa 2"), (1, "alisa 3"), (0, "alisa 4"), (1, "alisa 5"), (0, "alisa 6")])
|
||||
where
|
||||
cfg = testCfg {ciExpirationInterval = 500000}
|
||||
cfg = testCfg {initialCleanupManagerDelay = 0, ciExpirationInterval = 500000}
|
||||
|
||||
testDisableCIExpirationOnlyForOneUser :: HasCallStack => FilePath -> IO ()
|
||||
testDisableCIExpirationOnlyForOneUser tmp = do
|
||||
@ -1521,7 +1521,7 @@ testDisableCIExpirationOnlyForOneUser tmp = do
|
||||
-- second user messages are deleted
|
||||
alice #$> ("/_get chat @4 count=100", chat, [])
|
||||
where
|
||||
cfg = testCfg {ciExpirationInterval = 500000}
|
||||
cfg = testCfg {initialCleanupManagerDelay = 0, ciExpirationInterval = 500000}
|
||||
|
||||
testUsersTimedMessages :: HasCallStack => FilePath -> IO ()
|
||||
testUsersTimedMessages tmp = do
|
||||
|
@ -65,7 +65,12 @@ testSchemaMigrations = withTmpFiles $ do
|
||||
schema''' `shouldBe` schema'
|
||||
|
||||
skipComparisonForDownMigrations :: [String]
|
||||
skipComparisonForDownMigrations = ["20230504_recreate_msg_delivery_events_cleanup_messages"]
|
||||
skipComparisonForDownMigrations =
|
||||
[ -- on down migration msg_delivery_events table moves down to the end of the file
|
||||
"20230504_recreate_msg_delivery_events_cleanup_messages",
|
||||
-- on down migration idx_chat_items_timed_delete_at index moves down to the end of the file
|
||||
"20230529_indexes"
|
||||
]
|
||||
|
||||
getSchema :: FilePath -> FilePath -> IO String
|
||||
getSchema dpPath schemaPath = do
|
||||
|
Loading…
Reference in New Issue
Block a user