core: add indexes for cleanup, initial delay (#2514)

This commit is contained in:
spaced4ndy 2023-05-26 14:03:26 +04:00 committed by GitHub
parent 57ed903a48
commit 8b1e5d3db7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 55 additions and 14 deletions

View File

@ -98,6 +98,7 @@ library
Simplex.Chat.Migrations.M20230505_chat_item_versions
Simplex.Chat.Migrations.M20230511_reactions
Simplex.Chat.Migrations.M20230519_item_deleted_ts
Simplex.Chat.Migrations.M20230526_indexes
Simplex.Chat.Mobile
Simplex.Chat.Mobile.WebRTC
Simplex.Chat.Options

View File

@ -120,7 +120,9 @@ defaultChatConfig =
subscriptionEvents = False,
hostEvents = False,
testView = False,
ciExpirationInterval = 1800 * 1000000 -- 30 minutes
initialCleanupManagerDelay = 30 * 1000000, -- 30 seconds
cleanupManagerInterval = 30 * 60, -- 30 minutes
ciExpirationInterval = 30 * 60 * 1000000 -- 30 minutes
}
_defaultSMPServers :: NonEmpty SMPServerWithAuth
@ -2349,26 +2351,33 @@ subscribeUserConnections agentBatchSubscribe user = do
Just _ -> Nothing
_ -> Just . ChatError . CEAgentNoSubResult $ AgentConnId connId
cleanupManagerInterval :: NominalDiffTime
cleanupManagerInterval = 1800 -- 30 minutes
cleanupManager :: forall m. ChatMonad m => m ()
cleanupManager = do
interval <- asks (cleanupManagerInterval . config)
runWithoutInitialDelay interval
delay <- asks (initialCleanupManagerDelay . config)
liftIO $ threadDelay' delay
forever $ do
flip catchError (toView . CRChatError Nothing) $ do
waitChatStarted
users <- withStore' getUsers
let (us, us') = partition activeUser users
forM_ us cleanupUser
forM_ us' cleanupUser
forM_ us $ cleanupUser interval
forM_ us' $ cleanupUser interval
cleanupMessages `catchError` (toView . CRChatError Nothing)
liftIO $ threadDelay' $ diffToMicroseconds cleanupManagerInterval
liftIO $ threadDelay' $ diffToMicroseconds interval
where
cleanupUser user =
cleanupTimedItems user `catchError` (toView . CRChatError (Just user))
cleanupTimedItems user = do
runWithoutInitialDelay cleanupInterval = flip catchError (toView . CRChatError Nothing) $ do
waitChatStarted
users <- withStore' getUsers
let (us, us') = partition activeUser users
forM_ us $ \u -> cleanupTimedItems cleanupInterval u `catchError` (toView . CRChatError (Just u))
forM_ us' $ \u -> cleanupTimedItems cleanupInterval u `catchError` (toView . CRChatError (Just u))
cleanupUser cleanupInterval user =
cleanupTimedItems cleanupInterval user `catchError` (toView . CRChatError (Just user))
cleanupTimedItems cleanupInterval user = do
ts <- liftIO getCurrentTime
let startTimedThreadCutoff = addUTCTime cleanupManagerInterval ts
let startTimedThreadCutoff = addUTCTime cleanupInterval ts
timedItems <- withStore' $ \db -> getTimedItems db user startTimedThreadCutoff
forM_ timedItems $ \(itemRef, deleteAt) -> startTimedItemThread user itemRef deleteAt `catchError` const (pure ())
cleanupMessages = do
@ -2378,8 +2387,9 @@ cleanupManager = do
startProximateTimedItemThread :: ChatMonad m => User -> (ChatRef, ChatItemId) -> UTCTime -> m ()
startProximateTimedItemThread user itemRef deleteAt = do
interval <- asks (cleanupManagerInterval . config)
ts <- liftIO getCurrentTime
when (diffUTCTime deleteAt ts <= cleanupManagerInterval) $
when (diffUTCTime deleteAt ts <= interval) $
startTimedItemThread user itemRef deleteAt
startTimedItemThread :: ChatMonad m => User -> (ChatRef, ChatItemId) -> UTCTime -> m ()

View File

@ -32,7 +32,7 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.String
import Data.Text (Text)
import Data.Time (ZonedTime)
import Data.Time (NominalDiffTime, ZonedTime)
import Data.Time.Clock (UTCTime)
import Data.Version (showVersion)
import GHC.Generics (Generic)
@ -110,6 +110,8 @@ data ChatConfig = ChatConfig
hostEvents :: Bool,
logLevel :: ChatLogLevel,
testView :: Bool,
initialCleanupManagerDelay :: Int64,
cleanupManagerInterval :: NominalDiffTime,
ciExpirationInterval :: Int64 -- microseconds
}

View File

@ -0,0 +1,22 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Chat.Migrations.M20230526_indexes where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20230526_indexes :: Query
m20230526_indexes =
[sql|
CREATE INDEX idx_messages_created_at ON messages(created_at);
CREATE INDEX idx_chat_item_reactions_created_by_msg_id ON chat_item_reactions(created_by_msg_id);
|]
down_m20230526_indexes :: Query
down_m20230526_indexes =
[sql|
DROP INDEX idx_chat_item_reactions_created_by_msg_id;
DROP INDEX idx_messages_created_at;
|]

View File

@ -640,3 +640,7 @@ CREATE INDEX idx_chat_item_reactions_group ON chat_item_reactions(
group_id,
shared_msg_id
);
CREATE INDEX idx_messages_created_at ON messages(created_at);
CREATE INDEX idx_chat_item_reactions_created_by_msg_id ON chat_item_reactions(
created_by_msg_id
);

View File

@ -394,6 +394,7 @@ import Simplex.Chat.Migrations.M20230504_recreate_msg_delivery_events_cleanup_me
import Simplex.Chat.Migrations.M20230505_chat_item_versions
import Simplex.Chat.Migrations.M20230511_reactions
import Simplex.Chat.Migrations.M20230519_item_deleted_ts
import Simplex.Chat.Migrations.M20230526_indexes
import Simplex.Chat.Protocol
import Simplex.Chat.Types
import Simplex.Chat.Util (week)
@ -473,7 +474,8 @@ schemaMigrations =
("20230504_recreate_msg_delivery_events_cleanup_messages", m20230504_recreate_msg_delivery_events_cleanup_messages, Just down_m20230504_recreate_msg_delivery_events_cleanup_messages),
("20230505_chat_item_versions", m20230505_chat_item_versions, Just down_m20230505_chat_item_versions),
("20230511_reactions", m20230511_reactions, Just down_m20230511_reactions),
("20230519_item_deleted_ts", m20230519_item_deleted_ts, Just down_m20230519_item_deleted_ts)
("20230519_item_deleted_ts", m20230519_item_deleted_ts, Just down_m20230519_item_deleted_ts),
("20230526_indexes", m20230526_indexes, Just down_m20230526_indexes)
]
-- | The list of migrations in ascending order by date