core: batch sending messages (#3566)

* core: batch sending messages

* batch without iorefs (#3573)

* one-pass

* simplexmq

* simplexmq

* simplexmq

* simplexmq

* revert change to ios project file

* refactor

* simplify

---------

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin 2023-12-20 06:38:39 +00:00 committed by GitHub
parent 7b073ba9f8
commit 6ba3100d34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 87 additions and 45 deletions

View File

@ -14,7 +14,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package source-repository-package
type: git type: git
location: https://github.com/simplex-chat/simplexmq.git location: https://github.com/simplex-chat/simplexmq.git
tag: 18be2709f59a4cb20fe9758b899622092dba062e tag: 8c250ebe19f56dd7d53572d984e8016cb0e4d658
source-repository-package source-repository-package
type: git type: git

View File

@ -45,7 +45,7 @@ dependencies:
- sqlcipher-simple == 0.4.* - sqlcipher-simple == 0.4.*
- stm == 2.5.* - stm == 2.5.*
- terminal == 0.2.* - terminal == 0.2.*
- time == 1.9.* - time == 1.12.*
- tls >= 1.7.0 && < 1.8 - tls >= 1.7.0 && < 1.8
- unliftio == 0.2.* - unliftio == 0.2.*
- unliftio-core == 0.2.* - unliftio-core == 0.2.*

View File

@ -1,5 +1,5 @@
{ {
"https://github.com/simplex-chat/simplexmq.git"."18be2709f59a4cb20fe9758b899622092dba062e" = "08dr4vyg1wz2z768iikg8fks5zqf4dw5myr87hbpv964idda3pmj"; "https://github.com/simplex-chat/simplexmq.git"."8c250ebe19f56dd7d53572d984e8016cb0e4d658" = "080rw86yncf1h3zr5a8y65cndihq6f3ji43vxrdhr2mrb75vmw8m";
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
"https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d"; "https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d";
"https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl"; "https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl";

View File

@ -199,7 +199,7 @@ library
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*
@ -259,7 +259,7 @@ executable simplex-bot
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*
@ -319,7 +319,7 @@ executable simplex-bot-advanced
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*
@ -381,7 +381,7 @@ executable simplex-broadcast-bot
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*
@ -442,7 +442,7 @@ executable simplex-chat
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*
@ -508,7 +508,7 @@ executable simplex-directory-service
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*
@ -602,7 +602,7 @@ test-suite simplex-chat-test
, sqlcipher-simple ==0.4.* , sqlcipher-simple ==0.4.*
, stm ==2.5.* , stm ==2.5.*
, terminal ==0.2.* , terminal ==0.2.*
, time ==1.9.* , time ==1.12.*
, tls >=1.7.0 && <1.8 , tls >=1.7.0 && <1.8
, unliftio ==0.2.* , unliftio ==0.2.*
, unliftio-core ==0.2.* , unliftio-core ==0.2.*

View File

@ -35,7 +35,7 @@ import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Char import Data.Char
import Data.Constraint (Dict (..)) import Data.Constraint (Dict (..))
import Data.Either (fromRight, partitionEithers, rights) import Data.Either (fromRight, lefts, partitionEithers, rights)
import Data.Fixed (div') import Data.Fixed (div')
import Data.Functor (($>)) import Data.Functor (($>))
import Data.Int (Int64) import Data.Int (Int64)
@ -5002,7 +5002,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist" Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist"
Right reMember -> do Right reMember -> do
GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv
void . sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $ sendGroupMemberMessage user reMember (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $
withStore' $ withStore' $
\db -> updateIntroStatus db introId GMIntroInvForwarded \db -> updateIntroStatus db introId GMIntroInvForwarded
_ -> messageError "x.grp.mem.inv can be only sent by invitee member" _ -> messageError "x.grp.mem.inv can be only sent by invitee member"
@ -5529,46 +5529,62 @@ directMessage chatMsgEvent = do
pure $ strEncode ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent} pure $ strEncode ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent}
deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m Int64 deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m Int64
deliverMessage conn@Connection {connId} cmEventTag msgBody msgId = do deliverMessage conn cmEventTag msgBody msgId =
let msgFlags = MsgFlags {notification = hasNotification cmEventTag} deliverMessages [(conn, cmEventTag, msgBody, msgId)] >>= \case
agentMsgId <- withAgent $ \a -> sendMessage a (aConnId conn) msgFlags msgBody [r] -> liftEither r
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId} rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)
withStore' $ \db -> createSndMsgDelivery db sndMsgDelivery msgId
deliverMessages :: ChatMonad' m => [(Connection, CMEventTag e, MsgBody, MessageId)] -> m [Either ChatError Int64]
deliverMessages msgReqs = do
sent <- zipWith prepareBatch msgReqs <$> withAgent' (`sendMessages` aReqs)
withStoreBatch $ \db -> map (bindRight $ createDelivery db) sent
where
aReqs = map (\(conn, cmEvTag, msgBody, _msgId) -> (aConnId conn, msgFlags cmEvTag, msgBody)) msgReqs
msgFlags cmEvTag = MsgFlags {notification = hasNotification cmEvTag}
prepareBatch req = bimap (`ChatErrorAgent` Nothing) (req,)
createDelivery :: DB.Connection -> ((Connection, CMEventTag e, MsgBody, MessageId), AgentMsgId) -> IO (Either ChatError Int64)
createDelivery db ((Connection {connId}, _, _, msgId), agentMsgId) =
Right <$> createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId}) msgId
sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember]) sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember])
sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = do
sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure () msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId)
sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember])
sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
msg <- createSndMessage chatMsgEvent (GroupId groupId)
-- TODO collect failed deliveries into a single error
recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members) $ \GroupMember {memberRole} -> memberRole recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members) $ \GroupMember {memberRole} -> memberRole
rs <- forM recipientMembers $ \m -> let tag = toCMEventTag chatMsgEvent
messageMember m msg `catchChatError` (\e -> toView (CRChatError (Just user) e) $> Nothing) (toSend, pending) = foldr addMember ([], []) recipientMembers
let sentToMembers = catMaybes rs msgReqs = map (\(_, conn) -> (conn, tag, msgBody, msgId)) toSend
delivered <- deliverMessages msgReqs
let errors = lefts delivered
unless (null errors) $ toView $ CRChatErrors (Just user) errors
stored <- withStoreBatch' $ \db -> map (\m -> createPendingGroupMessage db (groupMemberId' m) msgId Nothing) pending
let sentToMembers = filterSent delivered toSend fst <> filterSent stored pending id
pure (msg, sentToMembers) pure (msg, sentToMembers)
where where
messageMember :: GroupMember -> SndMessage -> m (Maybe GroupMember) addMember m (toSend, pending) = case memberSendAction chatMsgEvent members m of
messageMember m@GroupMember {groupMemberId} SndMessage {msgId, msgBody} = case memberConn m of Just (MSASend conn) -> ((m, conn) : toSend, pending)
Nothing -> pendingOrForwarded Just MSAPending -> (toSend, m : pending)
Just conn@Connection {connStatus} Nothing -> (toSend, pending)
| connDisabled conn || connStatus == ConnDeleted -> pure Nothing filterSent :: [Either ChatError a] -> [mem] -> (mem -> GroupMember) -> [GroupMember]
| connStatus == ConnSndReady || connStatus == ConnReady -> do filterSent rs ms mem = [mem m | (Right _, m) <- zip rs ms]
let tag = toCMEventTag chatMsgEvent
deliverMessage conn tag msgBody msgId >> postDeliver data MemberSendAction = MSASend Connection | MSAPending
pure $ Just m
| otherwise -> pendingOrForwarded memberSendAction :: ChatMsgEvent e -> [GroupMember] -> GroupMember -> Maybe MemberSendAction
memberSendAction chatMsgEvent members m = case memberConn m of
Nothing -> pendingOrForwarded
Just conn@Connection {connStatus}
| connDisabled conn || connStatus == ConnDeleted -> Nothing
| connStatus == ConnSndReady || connStatus == ConnReady -> Just (MSASend conn)
| otherwise -> pendingOrForwarded
where
pendingOrForwarded
| forwardSupported && isForwardedGroupMsg chatMsgEvent = Nothing
| isXGrpMsgForward chatMsgEvent = Nothing
| otherwise = Just MSAPending
where where
pendingOrForwarded forwardSupported =
| forwardSupported && isForwardedGroupMsg chatMsgEvent = pure Nothing
| isXGrpMsgForward chatMsgEvent = pure Nothing
| otherwise = do
withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
pure $ Just m
forwardSupported = do
let mcvr = memberChatVRange' m let mcvr = memberChatVRange' m
isCompatibleRange mcvr groupForwardVRange && invitingMemberSupportsForward in isCompatibleRange mcvr groupForwardVRange && invitingMemberSupportsForward
invitingMemberSupportsForward = case m.invitedByGroupMemberId of invitingMemberSupportsForward = case m.invitedByGroupMemberId of
Just invMemberId -> Just invMemberId ->
-- can be optimized for large groups by replacing [GroupMember] with Map GroupMemberId GroupMember -- can be optimized for large groups by replacing [GroupMember] with Map GroupMemberId GroupMember
@ -5582,6 +5598,16 @@ sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
XGrpMsgForward {} -> True XGrpMsgForward {} -> True
_ -> False _ -> False
sendGroupMemberMessage :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> GroupMember -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m ()
sendGroupMemberMessage user m@GroupMember {groupMemberId} chatMsgEvent groupId introId_ postDeliver = do
msg <- createSndMessage chatMsgEvent (GroupId groupId)
messageMember msg `catchChatError` (\e -> toView (CRChatError (Just user) e))
where
messageMember :: SndMessage -> m ()
messageMember SndMessage {msgId, msgBody} = forM_ (memberSendAction chatMsgEvent [m] m) $ \case
MSASend conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
shuffleMembers :: [a] -> (a -> GroupMemberRole) -> IO [a] shuffleMembers :: [a] -> (a -> GroupMemberRole) -> IO [a]
shuffleMembers ms role = do shuffleMembers ms role = do
let (adminMs, otherMs) = partition ((GRAdmin <=) . role) ms let (adminMs, otherMs) = partition ((GRAdmin <=) . role) ms

View File

@ -84,6 +84,7 @@ import Simplex.RemoteControl.Invitation (RCSignedInvitation, RCVerifiedInvitatio
import Simplex.RemoteControl.Types import Simplex.RemoteControl.Types
import System.IO (Handle) import System.IO (Handle)
import System.Mem.Weak (Weak) import System.Mem.Weak (Weak)
import qualified UnliftIO.Exception as E
import UnliftIO.STM import UnliftIO.STM
versionNumber :: String versionNumber :: String
@ -1287,12 +1288,26 @@ withStoreCtx ctx_ action = do
handleInternal :: String -> SomeException -> IO (Either StoreError a) handleInternal :: String -> SomeException -> IO (Either StoreError a)
handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr
withStoreBatch :: (ChatMonad' m, Traversable t) => (DB.Connection -> t (IO (Either ChatError a))) -> m (t (Either ChatError a))
withStoreBatch actions = do
ChatController {chatStore} <- ask
liftIO $ withTransaction chatStore $ mapM (`E.catch` handleInternal) . actions
where
handleInternal :: E.SomeException -> IO (Either ChatError a)
handleInternal = pure . Left . ChatError . CEInternalError . show
withStoreBatch' :: (ChatMonad' m, Traversable t) => (DB.Connection -> t (IO a)) -> m (t (Either ChatError a))
withStoreBatch' actions = withStoreBatch $ fmap (fmap Right) . actions
withAgent :: ChatMonad m => (AgentClient -> ExceptT AgentErrorType m a) -> m a withAgent :: ChatMonad m => (AgentClient -> ExceptT AgentErrorType m a) -> m a
withAgent action = withAgent action =
asks smpAgent asks smpAgent
>>= runExceptT . action >>= runExceptT . action
>>= liftEither . first (`ChatErrorAgent` Nothing) >>= liftEither . first (`ChatErrorAgent` Nothing)
withAgent' :: ChatMonad' m => (AgentClient -> m a) -> m a
withAgent' action = asks smpAgent >>= action
$(JQ.deriveJSON (enumJSON $ dropPrefix "HS") ''HelpSection) $(JQ.deriveJSON (enumJSON $ dropPrefix "HS") ''HelpSection)
$(JQ.deriveJSON (sumTypeJSON $ dropPrefix "CLQ") ''ChatListQuery) $(JQ.deriveJSON (sumTypeJSON $ dropPrefix "CLQ") ''ChatListQuery)

View File

@ -353,6 +353,7 @@ serverCfg =
serverStatsBackupFile = Nothing, serverStatsBackupFile = Nothing,
smpServerVRange = supportedSMPServerVRange, smpServerVRange = supportedSMPServerVRange,
transportConfig = defaultTransportServerConfig, transportConfig = defaultTransportServerConfig,
smpHandshakeTimeout = 1000000,
controlPort = Nothing controlPort = Nothing
} }