core: optimize bulk chat item deletion 2 (#1172)
This commit is contained in:
parent
22edd92079
commit
135bdf3842
@ -53,6 +53,7 @@ library
|
||||
Simplex.Chat.Migrations.M20220928_settings
|
||||
Simplex.Chat.Migrations.M20221001_shared_msg_id_indices
|
||||
Simplex.Chat.Migrations.M20221003_delete_broken_integrity_error_chat_items
|
||||
Simplex.Chat.Migrations.M20221004_idx_msg_deliveries_message_id
|
||||
Simplex.Chat.Mobile
|
||||
Simplex.Chat.Options
|
||||
Simplex.Chat.ProfileGenerator
|
||||
|
@ -39,7 +39,7 @@ import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time (addUTCTime)
|
||||
import Data.Time (NominalDiffTime, addUTCTime)
|
||||
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
|
||||
import Data.Time.Clock.System (SystemTime, systemToUTCTime)
|
||||
import Data.Time.LocalTime (getCurrentTimeZone, getZonedTime)
|
||||
@ -449,7 +449,7 @@ processChatCommand = \case
|
||||
deleteCIFile :: MsgDirectionI d => User -> Maybe (CIFile d) -> m ()
|
||||
deleteCIFile user file =
|
||||
forM_ file $ \CIFile {fileId, filePath, fileStatus} -> do
|
||||
let fileInfo = CIFileInfo {fileId, fileStatus = AFS msgDirection fileStatus, filePath}
|
||||
let fileInfo = CIFileInfo {fileId, fileStatus = Just $ AFS msgDirection fileStatus, filePath}
|
||||
deleteFile user fileInfo
|
||||
APIChatRead (ChatRef cType chatId) fromToIds -> withChatLock $ case cType of
|
||||
CTDirect -> withStore' (\db -> updateDirectChatItemsRead db chatId fromToIds) $> CRCmdOk
|
||||
@ -1123,17 +1123,18 @@ setExpireCIs b = do
|
||||
atomically $ writeTVar expire b
|
||||
|
||||
deleteFile :: forall m. ChatMonad m => User -> CIFileInfo -> m ()
|
||||
deleteFile user CIFileInfo {filePath, fileId, fileStatus = (AFS dir status)} =
|
||||
deleteFile user CIFileInfo {filePath, fileId, fileStatus} =
|
||||
cancel' >> delete
|
||||
where
|
||||
cancel' = unless (ciFileEnded status) $
|
||||
case dir of
|
||||
SMDSnd -> do
|
||||
(ftm@FileTransferMeta {cancelled}, fts) <- withStore (\db -> getSndFileTransfer db user fileId)
|
||||
unless cancelled $ cancelSndFile user ftm fts
|
||||
SMDRcv -> do
|
||||
ft@RcvFileTransfer {cancelled} <- withStore (\db -> getRcvFileTransfer db user fileId)
|
||||
unless cancelled $ cancelRcvFileTransfer user ft
|
||||
cancel' = forM_ fileStatus $ \(AFS dir status) ->
|
||||
unless (ciFileEnded status) $
|
||||
case dir of
|
||||
SMDSnd -> do
|
||||
(ftm@FileTransferMeta {cancelled}, fts) <- withStore (\db -> getSndFileTransfer db user fileId)
|
||||
unless cancelled $ cancelSndFile user ftm fts
|
||||
SMDRcv -> do
|
||||
ft@RcvFileTransfer {cancelled} <- withStore (\db -> getRcvFileTransfer db user fileId)
|
||||
unless cancelled $ cancelRcvFileTransfer user ft
|
||||
delete = withFilesFolder $ \filesFolder ->
|
||||
forM_ filePath $ \fPath -> do
|
||||
let fsFilePath = filesFolder <> "/" <> fPath
|
||||
@ -1400,15 +1401,40 @@ expireChatItems :: forall m. ChatMonad m => User -> Int64 -> Bool -> m ()
|
||||
expireChatItems user ttl sync = do
|
||||
currentTs <- liftIO getCurrentTime
|
||||
let expirationDate = addUTCTime (-1 * fromIntegral ttl) currentTs
|
||||
-- this is to keep group messages created during last 12 hours even if they're expired according to item_ts
|
||||
createdAtCutoff = addUTCTime (-43200 :: NominalDiffTime) currentTs
|
||||
expire <- asks expireCIs
|
||||
filesInfo <- withStore' $ \db -> getExpiredFileInfo db user expirationDate
|
||||
loop filesInfo expirationDate expire
|
||||
contacts <- withStore' (`getUserContacts` user)
|
||||
contactsLoop contacts expirationDate expire
|
||||
groups <- withStore' (`getUserGroupDetails` user)
|
||||
groupsLoop groups expirationDate createdAtCutoff expire
|
||||
where
|
||||
loop :: [CIFileInfo] -> UTCTime -> TVar Bool -> m ()
|
||||
loop [] expirationDate expire = continue expire $ withStore' (\db -> deleteExpiredCIs db user expirationDate)
|
||||
loop (fileInfo : filesInfo) expirationDate expire = continue expire $ do
|
||||
deleteFile user fileInfo
|
||||
loop filesInfo expirationDate expire
|
||||
contactsLoop :: [Contact] -> UTCTime -> TVar Bool -> m ()
|
||||
contactsLoop [] _ _ = pure ()
|
||||
contactsLoop (ct : cts) expirationDate expire = continue expire $ do
|
||||
filesInfo <- withStore' $ \db -> getContactExpiredFileInfo db user ct expirationDate
|
||||
maxItemTs_ <- withStore' $ \db -> getContactMaxItemTs db user ct
|
||||
forM_ filesInfo $ \fileInfo -> deleteFile user fileInfo
|
||||
withStore' $ \db -> deleteContactExpiredCIs db user ct expirationDate
|
||||
withStore' $ \db -> do
|
||||
ciCount_ <- getContactCICount db user ct
|
||||
case (maxItemTs_, ciCount_) of
|
||||
(Just ts, Just count) -> when (count == 0) $ updateContactTs db user ct ts
|
||||
_ -> pure ()
|
||||
contactsLoop cts expirationDate expire
|
||||
groupsLoop :: [GroupInfo] -> UTCTime -> UTCTime -> TVar Bool -> m ()
|
||||
groupsLoop [] _ _ _ = pure ()
|
||||
groupsLoop (gInfo : gInfos) expirationDate createdAtCutoff expire = continue expire $ do
|
||||
filesInfo <- withStore' $ \db -> getGroupExpiredFileInfo db user gInfo expirationDate createdAtCutoff
|
||||
maxItemTs_ <- withStore' $ \db -> getGroupMaxItemTs db user gInfo
|
||||
forM_ filesInfo $ \fileInfo -> deleteFile user fileInfo
|
||||
withStore' $ \db -> deleteGroupExpiredCIs db user gInfo expirationDate createdAtCutoff
|
||||
withStore' $ \db -> do
|
||||
ciCount_ <- getGroupCICount db user gInfo
|
||||
case (maxItemTs_, ciCount_) of
|
||||
(Just ts, Just count) -> when (count == 0) $ updateGroupTs db user gInfo ts
|
||||
_ -> pure ()
|
||||
groupsLoop gInfos expirationDate createdAtCutoff expire
|
||||
continue :: TVar Bool -> m () -> m ()
|
||||
continue expire = if sync then id else \a -> whenM (readTVarIO expire) $ threadDelay 100000 >> a
|
||||
|
||||
|
@ -395,7 +395,7 @@ instance StrEncoding ACIFileStatus where
|
||||
-- to conveniently read file data from db
|
||||
data CIFileInfo = CIFileInfo
|
||||
{ fileId :: Int64,
|
||||
fileStatus :: ACIFileStatus,
|
||||
fileStatus :: Maybe ACIFileStatus,
|
||||
filePath :: Maybe FilePath
|
||||
}
|
||||
deriving (Show)
|
||||
|
@ -0,0 +1,12 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Migrations.M20221004_idx_msg_deliveries_message_id where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20221004_idx_msg_deliveries_message_id :: Query
|
||||
m20221004_idx_msg_deliveries_message_id =
|
||||
[sql|
|
||||
CREATE INDEX idx_msg_deliveries_message_id ON msg_deliveries(message_id);
|
||||
|]
|
@ -426,3 +426,4 @@ CREATE UNIQUE INDEX idx_chat_items_group_shared_msg_id ON chat_items(
|
||||
group_member_id,
|
||||
shared_msg_id
|
||||
);
|
||||
CREATE INDEX idx_msg_deliveries_message_id ON msg_deliveries(message_id);
|
||||
|
@ -194,11 +194,12 @@ module Simplex.Chat.Store
|
||||
getXGrpMemIntroContGroup,
|
||||
getChatItemTTL,
|
||||
setChatItemTTL,
|
||||
getExpiredFileInfo,
|
||||
deleteExpiredCIs,
|
||||
getChatsWithExpiredItems,
|
||||
getContactExpiredCIs,
|
||||
getGroupExpiredCIs,
|
||||
getContactExpiredFileInfo,
|
||||
deleteContactExpiredCIs,
|
||||
getContactCICount,
|
||||
getGroupExpiredFileInfo,
|
||||
deleteGroupExpiredCIs,
|
||||
getGroupCICount,
|
||||
getPendingContactConnection,
|
||||
deletePendingContactConnection,
|
||||
updateContactSettings,
|
||||
@ -224,7 +225,7 @@ import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (find, sortBy, sortOn)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Maybe (fromMaybe, isJust, listToMaybe, mapMaybe)
|
||||
import Data.Maybe (fromMaybe, isJust, listToMaybe)
|
||||
import Data.Ord (Down (..))
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
@ -263,6 +264,7 @@ import Simplex.Chat.Migrations.M20220926_connection_alias
|
||||
import Simplex.Chat.Migrations.M20220928_settings
|
||||
import Simplex.Chat.Migrations.M20221001_shared_msg_id_indices
|
||||
import Simplex.Chat.Migrations.M20221003_delete_broken_integrity_error_chat_items
|
||||
import Simplex.Chat.Migrations.M20221004_idx_msg_deliveries_message_id
|
||||
import Simplex.Chat.Protocol
|
||||
import Simplex.Chat.Types
|
||||
import Simplex.Messaging.Agent.Protocol (ACorrId, AgentMsgId, ConnId, InvitationId, MsgMeta (..))
|
||||
@ -301,7 +303,8 @@ schemaMigrations =
|
||||
("20220926_connection_alias", m20220926_connection_alias),
|
||||
("20220928_settings", m20220928_settings),
|
||||
("20221001_shared_msg_id_indices", m20221001_shared_msg_id_indices),
|
||||
("20221003_delete_broken_integrity_error_chat_items", m20221003_delete_broken_integrity_error_chat_items)
|
||||
("20221003_delete_broken_integrity_error_chat_items", m20221003_delete_broken_integrity_error_chat_items),
|
||||
("20221004_idx_msg_deliveries_message_id", m20221004_idx_msg_deliveries_message_id)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
@ -2456,7 +2459,7 @@ getContactFileInfo db User {userId} Contact {contactId} =
|
||||
|]
|
||||
(userId, contactId)
|
||||
|
||||
toFileInfo :: (Int64, ACIFileStatus, Maybe FilePath) -> CIFileInfo
|
||||
toFileInfo :: (Int64, Maybe ACIFileStatus, Maybe FilePath) -> CIFileInfo
|
||||
toFileInfo (fileId, fileStatus, filePath) = CIFileInfo {fileId, fileStatus, filePath}
|
||||
|
||||
getContactMaxItemTs :: DB.Connection -> User -> Contact -> IO (Maybe UTCTime)
|
||||
@ -2465,21 +2468,16 @@ getContactMaxItemTs db User {userId} Contact {contactId} =
|
||||
DB.query db "SELECT MAX(item_ts) FROM chat_items WHERE user_id = ? AND contact_id = ?" (userId, contactId)
|
||||
|
||||
deleteContactCIs :: DB.Connection -> User -> Contact -> IO ()
|
||||
deleteContactCIs db User {userId} Contact {contactId} = do
|
||||
deleteContactCIsMessages_
|
||||
deleteContactCIs db user@User {userId} ct@Contact {contactId} = do
|
||||
connIds <- getContactConnIds_ db user ct
|
||||
forM_ connIds $ \connId ->
|
||||
DB.execute db "DELETE FROM messages WHERE connection_id = ?" (Only connId)
|
||||
DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND contact_id = ?" (userId, contactId)
|
||||
where
|
||||
deleteContactCIsMessages_ =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
DELETE FROM messages WHERE message_id IN (
|
||||
SELECT message_id FROM chat_item_messages WHERE chat_item_id IN (
|
||||
SELECT chat_item_id FROM chat_items WHERE user_id = ? AND contact_id = ?
|
||||
)
|
||||
)
|
||||
|]
|
||||
(userId, contactId)
|
||||
|
||||
getContactConnIds_ :: DB.Connection -> User -> Contact -> IO [Int64]
|
||||
getContactConnIds_ db User {userId} Contact {contactId} =
|
||||
map fromOnly
|
||||
<$> DB.query db "SELECT connection_id FROM connections WHERE user_id = ? AND contact_id = ?" (userId, contactId)
|
||||
|
||||
updateContactTs :: DB.Connection -> User -> Contact -> UTCTime -> IO ()
|
||||
updateContactTs db User {userId} Contact {contactId} updatedAt =
|
||||
@ -2508,20 +2506,8 @@ getGroupMaxItemTs db User {userId} GroupInfo {groupId} =
|
||||
|
||||
deleteGroupCIs :: DB.Connection -> User -> GroupInfo -> IO ()
|
||||
deleteGroupCIs db User {userId} GroupInfo {groupId} = do
|
||||
deleteGroupCIsMessages_
|
||||
DB.execute db "DELETE FROM messages WHERE group_id = ?" (Only groupId)
|
||||
DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND group_id = ?" (userId, groupId)
|
||||
where
|
||||
deleteGroupCIsMessages_ =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
DELETE FROM messages WHERE message_id IN (
|
||||
SELECT message_id FROM chat_item_messages WHERE chat_item_id IN (
|
||||
SELECT chat_item_id FROM chat_items WHERE user_id = ? AND group_id = ?
|
||||
)
|
||||
)
|
||||
|]
|
||||
(userId, groupId)
|
||||
|
||||
updateGroupTs :: DB.Connection -> User -> GroupInfo -> UTCTime -> IO ()
|
||||
updateGroupTs db User {userId} GroupInfo {groupId} updatedAt =
|
||||
@ -4107,8 +4093,8 @@ setChatItemTTL db User {userId} chatItemTTL = do
|
||||
"INSERT INTO settings (user_id, chat_item_ttl, created_at, updated_at) VALUES (?,?,?,?)"
|
||||
(userId, chatItemTTL, currentTs, currentTs)
|
||||
|
||||
getExpiredFileInfo :: DB.Connection -> User -> UTCTime -> IO [CIFileInfo]
|
||||
getExpiredFileInfo db User {userId} expirationDate =
|
||||
getContactExpiredFileInfo :: DB.Connection -> User -> Contact -> UTCTime -> IO [CIFileInfo]
|
||||
getContactExpiredFileInfo db User {userId} Contact {contactId} expirationDate =
|
||||
map toFileInfo
|
||||
<$> DB.query
|
||||
db
|
||||
@ -4116,79 +4102,44 @@ getExpiredFileInfo db User {userId} expirationDate =
|
||||
SELECT f.file_id, f.ci_file_status, f.file_path
|
||||
FROM chat_items i
|
||||
JOIN files f ON f.chat_item_id = i.chat_item_id
|
||||
WHERE i.user_id = ? AND i.item_ts <= ?
|
||||
|]
|
||||
(userId, expirationDate)
|
||||
|
||||
deleteExpiredCIs :: DB.Connection -> User -> UTCTime -> IO ()
|
||||
deleteExpiredCIs db User {userId} expirationDate = do
|
||||
deleteExpiredCIsMessages_
|
||||
DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND item_ts <= ?" (userId, expirationDate)
|
||||
where
|
||||
deleteExpiredCIsMessages_ =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
DELETE FROM messages WHERE message_id IN (
|
||||
SELECT message_id FROM chat_item_messages WHERE chat_item_id IN (
|
||||
SELECT chat_item_id FROM chat_items WHERE user_id = ? AND item_ts <= ?
|
||||
)
|
||||
)
|
||||
|]
|
||||
(userId, expirationDate)
|
||||
|
||||
getChatsWithExpiredItems :: DB.Connection -> User -> UTCTime -> IO [ChatRef]
|
||||
getChatsWithExpiredItems db User {userId} expirationDate =
|
||||
mapMaybe toChatRef
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT contact_id, group_id
|
||||
FROM chat_items
|
||||
WHERE user_id = ? AND item_ts <= ?
|
||||
GROUP BY contact_id, group_id
|
||||
ORDER BY contact_id ASC, group_id ASC
|
||||
|]
|
||||
(userId, expirationDate)
|
||||
where
|
||||
toChatRef :: (Maybe ContactId, Maybe GroupId) -> Maybe ChatRef
|
||||
toChatRef (Just contactId, Nothing) = Just $ ChatRef CTDirect contactId
|
||||
toChatRef (Nothing, Just groupId) = Just $ ChatRef CTGroup groupId
|
||||
toChatRef _ = Nothing
|
||||
|
||||
getContactExpiredCIs :: DB.Connection -> User -> ContactId -> UTCTime -> IO [(ChatItemId, Maybe CIFileInfo)]
|
||||
getContactExpiredCIs db User {userId} contactId expirationDate =
|
||||
map toItemIdAndFileInfo'
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT i.chat_item_id, f.file_id, f.ci_file_status, f.file_path
|
||||
FROM chat_items i
|
||||
LEFT JOIN files f ON f.chat_item_id = i.chat_item_id
|
||||
WHERE i.user_id = ? AND i.contact_id = ? AND i.item_ts <= ?
|
||||
ORDER BY i.item_ts ASC
|
||||
WHERE i.user_id = ? AND i.contact_id = ? AND i.created_at <= ?
|
||||
|]
|
||||
(userId, contactId, expirationDate)
|
||||
|
||||
getGroupExpiredCIs :: DB.Connection -> User -> Int64 -> UTCTime -> IO [(ChatItemId, Maybe CIFileInfo)]
|
||||
getGroupExpiredCIs db User {userId} groupId expirationDate =
|
||||
map toItemIdAndFileInfo'
|
||||
deleteContactExpiredCIs :: DB.Connection -> User -> Contact -> UTCTime -> IO ()
|
||||
deleteContactExpiredCIs db user@User {userId} ct@Contact {contactId} expirationDate = do
|
||||
connIds <- getContactConnIds_ db user ct
|
||||
forM_ connIds $ \connId ->
|
||||
DB.execute db "DELETE FROM messages WHERE connection_id = ? AND created_at <= ?" (connId, expirationDate)
|
||||
DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND contact_id = ? AND created_at <= ?" (userId, contactId, expirationDate)
|
||||
|
||||
getContactCICount :: DB.Connection -> User -> Contact -> IO (Maybe Int64)
|
||||
getContactCICount db User {userId} Contact {contactId} =
|
||||
fmap join . maybeFirstRow fromOnly $
|
||||
DB.query db "SELECT COUNT(1) FROM chat_items WHERE user_id = ? AND contact_id = ?" (userId, contactId)
|
||||
|
||||
getGroupExpiredFileInfo :: DB.Connection -> User -> GroupInfo -> UTCTime -> UTCTime -> IO [CIFileInfo]
|
||||
getGroupExpiredFileInfo db User {userId} GroupInfo {groupId} expirationDate createdAtCutoff =
|
||||
map toFileInfo
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT i.chat_item_id, f.file_id, f.ci_file_status, f.file_path
|
||||
SELECT f.file_id, f.ci_file_status, f.file_path
|
||||
FROM chat_items i
|
||||
LEFT JOIN files f ON f.chat_item_id = i.chat_item_id
|
||||
WHERE i.user_id = ? AND i.group_id = ? AND i.item_ts <= ?
|
||||
ORDER BY i.item_ts ASC
|
||||
JOIN files f ON f.chat_item_id = i.chat_item_id
|
||||
WHERE i.user_id = ? AND i.group_id = ? AND i.item_ts <= ? AND i.created_at <= ?
|
||||
|]
|
||||
(userId, groupId, expirationDate)
|
||||
(userId, groupId, expirationDate, createdAtCutoff)
|
||||
|
||||
toItemIdAndFileInfo' :: (ChatItemId, Maybe Int64, Maybe ACIFileStatus, Maybe FilePath) -> (ChatItemId, Maybe CIFileInfo)
|
||||
toItemIdAndFileInfo' (chatItemId, fileId_, fileStatus_, filePath) =
|
||||
case (fileId_, fileStatus_) of
|
||||
(Just fileId, Just fileStatus) -> (chatItemId, Just CIFileInfo {fileId, fileStatus, filePath})
|
||||
_ -> (chatItemId, Nothing)
|
||||
deleteGroupExpiredCIs :: DB.Connection -> User -> GroupInfo -> UTCTime -> UTCTime -> IO ()
|
||||
deleteGroupExpiredCIs db User {userId} GroupInfo {groupId} expirationDate createdAtCutoff = do
|
||||
DB.execute db "DELETE FROM messages WHERE group_id = ? AND created_at <= ?" (groupId, min expirationDate createdAtCutoff)
|
||||
DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND group_id = ? AND item_ts <= ? AND created_at <= ?" (userId, groupId, expirationDate, createdAtCutoff)
|
||||
|
||||
getGroupCICount :: DB.Connection -> User -> GroupInfo -> IO (Maybe Int64)
|
||||
getGroupCICount db User {userId} GroupInfo {groupId} =
|
||||
fmap join . maybeFirstRow fromOnly $
|
||||
DB.query db "SELECT COUNT(1) FROM chat_items WHERE user_id = ? AND group_id = ?" (userId, groupId)
|
||||
|
||||
-- | Saves unique local display name based on passed displayName, suffixed with _N if required.
|
||||
-- This function should be called inside transaction.
|
||||
|
@ -3225,7 +3225,10 @@ send :: TestCC -> String -> IO ()
|
||||
send TestCC {chatController = cc} cmd = atomically $ writeTBQueue (inputQ cc) cmd
|
||||
|
||||
(<##) :: TestCC -> String -> Expectation
|
||||
cc <## line = getTermLine cc `shouldReturn` line
|
||||
cc <## line = do
|
||||
l <- getTermLine cc
|
||||
when (l /= line) $ print ("expected: " <> line, ", got: " <> l)
|
||||
l `shouldBe` line
|
||||
|
||||
getInAnyOrder :: (String -> String) -> TestCC -> [String] -> Expectation
|
||||
getInAnyOrder _ _ [] = pure ()
|
||||
|
Loading…
Reference in New Issue
Block a user