Merge branch 'stable'
This commit is contained in:
commit
bfb274b037
@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd
|
||||
source-repository-package
|
||||
type: git
|
||||
location: https://github.com/simplex-chat/simplexmq.git
|
||||
tag: ad8cd1d5154617663065652b45c784ad5a0a584d
|
||||
tag: aee90884175a3092828be1f0be2fc702c69bc101
|
||||
|
||||
source-repository-package
|
||||
type: git
|
||||
|
@ -1,5 +1,5 @@
|
||||
{
|
||||
"https://github.com/simplex-chat/simplexmq.git"."ad8cd1d5154617663065652b45c784ad5a0a584d" = "19sinz1gynab776x8h9va7r6ifm9pmgzljsbc7z5cbkcnjl5sfh3";
|
||||
"https://github.com/simplex-chat/simplexmq.git"."aee90884175a3092828be1f0be2fc702c69bc101" = "0ca5xzcpria481jhl9nlazvjljg3wwfkzzd2x6h4lxql2wbdnlx6";
|
||||
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
|
||||
"https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d";
|
||||
"https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl";
|
||||
|
@ -28,7 +28,6 @@ import Data.Bifunctor (bimap, first, second)
|
||||
import Data.ByteArray (ScrubbedBytes)
|
||||
import qualified Data.ByteArray as BA
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import Data.ByteString.Builder (toLazyByteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
@ -5791,8 +5790,7 @@ sendGroupMemberMessages user conn@Connection {connId} events groupId = do
|
||||
processBatch batch `catchChatError` (toView . CRChatError (Just user))
|
||||
where
|
||||
processBatch :: MsgBatch -> m ()
|
||||
processBatch (MsgBatch builder sndMsgs) = do
|
||||
let batchBody = LB.toStrict $ toLazyByteString builder
|
||||
processBatch (MsgBatch batchBody sndMsgs) = do
|
||||
agentMsgId <- withAgent $ \a -> sendMessage a (aConnId conn) MsgFlags {notification = True} batchBody
|
||||
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
|
||||
void . withStoreBatch' $ \db -> map (\SndMessage {msgId} -> createSndMsgDelivery db sndMsgDelivery msgId) sndMsgs
|
||||
@ -5802,21 +5800,21 @@ directMessage chatMsgEvent = do
|
||||
chatVRange <- chatVersionRange
|
||||
let r = encodeChatMessage ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent}
|
||||
case r of
|
||||
ECMEncoded encodedBody -> pure . LB.toStrict $ encodedBody
|
||||
ECMEncoded encodedBody -> pure encodedBody
|
||||
ECMLarge -> throwChatError $ CEException "large message"
|
||||
|
||||
deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> LazyMsgBody -> MessageId -> m Int64
|
||||
deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m Int64
|
||||
deliverMessage conn cmEventTag msgBody msgId = do
|
||||
let msgFlags = MsgFlags {notification = hasNotification cmEventTag}
|
||||
deliverMessage' conn msgFlags msgBody msgId
|
||||
|
||||
deliverMessage' :: ChatMonad m => Connection -> MsgFlags -> LazyMsgBody -> MessageId -> m Int64
|
||||
deliverMessage' :: ChatMonad m => Connection -> MsgFlags -> MsgBody -> MessageId -> m Int64
|
||||
deliverMessage' conn msgFlags msgBody msgId =
|
||||
deliverMessages [(conn, msgFlags, msgBody, msgId)] >>= \case
|
||||
[r] -> liftEither r
|
||||
rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)
|
||||
|
||||
type MsgReq = (Connection, MsgFlags, LazyMsgBody, MessageId)
|
||||
type MsgReq = (Connection, MsgFlags, MsgBody, MessageId)
|
||||
|
||||
deliverMessages :: ChatMonad' m => [MsgReq] -> m [Either ChatError Int64]
|
||||
deliverMessages = deliverMessagesB . map Right
|
||||
@ -5827,7 +5825,7 @@ deliverMessagesB msgReqs = do
|
||||
withStoreBatch $ \db -> map (bindRight $ createDelivery db) sent
|
||||
where
|
||||
toAgent = \case
|
||||
Right (conn, msgFlags, msgBody, _msgId) -> Right (aConnId conn, msgFlags, LB.toStrict msgBody)
|
||||
Right (conn, msgFlags, msgBody, _msgId) -> Right (aConnId conn, msgFlags, msgBody)
|
||||
Left _ce -> Left (AP.INTERNAL "ChatError, skip") -- as long as it is Left, the agent batchers should just step over it
|
||||
prepareBatch (Right req) (Right ar) = Right (req, ar)
|
||||
prepareBatch (Left ce) _ = Left ce -- restore original ChatError
|
||||
|
@ -25,7 +25,6 @@ import qualified Data.Aeson.Encoding as JE
|
||||
import qualified Data.Aeson.TH as JQ
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Char (isSpace)
|
||||
import Data.Int (Int64)
|
||||
@ -812,12 +811,10 @@ checkChatType x = case testEquality (chatTypeI @c) (chatTypeI @c') of
|
||||
Just Refl -> Right x
|
||||
Nothing -> Left "bad chat type"
|
||||
|
||||
type LazyMsgBody = L.ByteString
|
||||
|
||||
data SndMessage = SndMessage
|
||||
{ msgId :: MessageId,
|
||||
sharedMsgId :: SharedMsgId,
|
||||
msgBody :: LazyMsgBody
|
||||
msgBody :: MsgBody
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
@ -839,7 +836,7 @@ data RcvMessage = RcvMessage
|
||||
data PendingGroupMessage = PendingGroupMessage
|
||||
{ msgId :: MessageId,
|
||||
cmEventTag :: ACMEventTag,
|
||||
msgBody :: LazyMsgBody,
|
||||
msgBody :: MsgBody,
|
||||
introId_ :: Maybe Int64
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Chat.Messages.Batch
|
||||
@ -9,32 +10,28 @@ module Simplex.Chat.Messages.Batch
|
||||
)
|
||||
where
|
||||
|
||||
import Data.ByteString.Builder (Builder, charUtf8, lazyByteString)
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import Data.Int (Int64)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Simplex.Chat.Controller (ChatError (..), ChatErrorType (..))
|
||||
import Simplex.Chat.Messages
|
||||
|
||||
data MsgBatch = MsgBatch Builder [SndMessage]
|
||||
data MsgBatch = MsgBatch ByteString [SndMessage]
|
||||
|
||||
-- | Batches [SndMessage] into batches of ByteString builders in form of JSON arrays.
|
||||
-- | Batches [SndMessage] into batches of ByteStrings in form of JSON arrays.
|
||||
-- Does not check if the resulting batch is a valid JSON.
|
||||
-- If a single element is passed, it is returned as is (a JSON string).
|
||||
-- If an element exceeds maxLen, it is returned as ChatError.
|
||||
batchMessages :: Int64 -> [SndMessage] -> [Either ChatError MsgBatch]
|
||||
batchMessages maxLen msgs =
|
||||
let (batches, batch, _, n) = foldr addToBatch ([], [], 0, 0) msgs
|
||||
in if n == 0 then batches else msgBatch batch : batches
|
||||
batchMessages :: Int -> [SndMessage] -> [Either ChatError MsgBatch]
|
||||
batchMessages maxLen = addBatch . foldr addToBatch ([], [], 0, 0)
|
||||
where
|
||||
msgBatch batch = Right (MsgBatch (encodeMessages batch) batch)
|
||||
addToBatch :: SndMessage -> ([Either ChatError MsgBatch], [SndMessage], Int64, Int) -> ([Either ChatError MsgBatch], [SndMessage], Int64, Int)
|
||||
addToBatch msg@SndMessage {msgBody} (batches, batch, len, n)
|
||||
addToBatch :: SndMessage -> ([Either ChatError MsgBatch], [SndMessage], Int, Int) -> ([Either ChatError MsgBatch], [SndMessage], Int, Int)
|
||||
addToBatch msg@SndMessage {msgBody} acc@(batches, batch, len, n)
|
||||
| batchLen <= maxLen = (batches, msg : batch, len', n + 1)
|
||||
| msgLen <= maxLen = (batches', [msg], msgLen, 1)
|
||||
| otherwise = (errLarge msg : (if n == 0 then batches else batches'), [], 0, 0)
|
||||
| msgLen <= maxLen = (addBatch acc, [msg], msgLen, 1)
|
||||
| otherwise = (errLarge msg : addBatch acc, [], 0, 0)
|
||||
where
|
||||
msgLen = LB.length msgBody
|
||||
batches' = msgBatch batch : batches
|
||||
msgLen = B.length msgBody
|
||||
len'
|
||||
| n == 0 = msgLen
|
||||
| otherwise = msgLen + len + 1 -- 1 accounts for comma
|
||||
@ -42,11 +39,11 @@ batchMessages maxLen msgs =
|
||||
| n == 0 = len'
|
||||
| otherwise = len' + 2 -- 2 accounts for opening and closing brackets
|
||||
errLarge SndMessage {msgId} = Left $ ChatError $ CEInternalError ("large message " <> show msgId)
|
||||
|
||||
encodeMessages :: [SndMessage] -> Builder
|
||||
addBatch :: ([Either ChatError MsgBatch], [SndMessage], Int, Int) -> [Either ChatError MsgBatch]
|
||||
addBatch (batches, batch, _, n) = if n == 0 then batches else msgBatch batch : batches
|
||||
encodeMessages :: [SndMessage] -> ByteString
|
||||
encodeMessages = \case
|
||||
[] -> mempty
|
||||
[msg] -> encodeMsg msg
|
||||
(msg : msgs) -> charUtf8 '[' <> encodeMsg msg <> mconcat [charUtf8 ',' <> encodeMsg msg' | msg' <- msgs] <> charUtf8 ']'
|
||||
where
|
||||
encodeMsg SndMessage {msgBody} = lazyByteString msgBody
|
||||
[msg] -> body msg
|
||||
msgs -> B.concat ["[", B.intercalate "," (map body msgs), "]"]
|
||||
body SndMessage {msgBody} = msgBody
|
||||
|
@ -29,9 +29,7 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.ByteString.Internal (c2w, w2c)
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Int (Int64)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.String
|
||||
import Data.Text (Text)
|
||||
@ -495,20 +493,20 @@ $(JQ.deriveJSON defaultJSON ''QuotedMsg)
|
||||
|
||||
-- this limit reserves space for metadata in forwarded messages
|
||||
-- 15780 (limit used for fileChunkSize) - 161 (x.grp.msg.forward overhead) = 15619, round to 15610
|
||||
maxChatMsgSize :: Int64
|
||||
maxChatMsgSize :: Int
|
||||
maxChatMsgSize = 15610
|
||||
|
||||
data EncodedChatMessage = ECMEncoded L.ByteString | ECMLarge
|
||||
data EncodedChatMessage = ECMEncoded ByteString | ECMLarge
|
||||
|
||||
encodeChatMessage :: MsgEncodingI e => ChatMessage e -> EncodedChatMessage
|
||||
encodeChatMessage msg = do
|
||||
case chatToAppMessage msg of
|
||||
AMJson m -> do
|
||||
let body = J.encode m
|
||||
if LB.length body > maxChatMsgSize
|
||||
let body = LB.toStrict $ J.encode m
|
||||
if B.length body > maxChatMsgSize
|
||||
then ECMLarge
|
||||
else ECMEncoded body
|
||||
AMBinary m -> ECMEncoded . LB.fromStrict $ strEncode m
|
||||
AMBinary m -> ECMEncoded $ strEncode m
|
||||
|
||||
parseChatMessages :: ByteString -> [Either String AChatMessage]
|
||||
parseChatMessages "" = [Left "empty string"]
|
||||
|
@ -7,8 +7,8 @@
|
||||
module MessageBatching (batchingTests) where
|
||||
|
||||
import Crypto.Number.Serialize (os2ip)
|
||||
import Data.ByteString.Builder (toLazyByteString)
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Either (partitionEithers)
|
||||
import Data.Int (Int64)
|
||||
import Data.String (IsString (..))
|
||||
@ -26,7 +26,7 @@ batchingTests = describe "message batching tests" $ do
|
||||
it "image x.msg.new and x.msg.file.descr should fit into single batch" testImageFitsSingleBatch
|
||||
|
||||
instance IsString SndMessage where
|
||||
fromString s = SndMessage {msgId, sharedMsgId = SharedMsgId "", msgBody = LB.fromStrict s'}
|
||||
fromString s = SndMessage {msgId, sharedMsgId = SharedMsgId "", msgBody = s'}
|
||||
where
|
||||
s' = encodeUtf8 $ T.pack s
|
||||
msgId = fromInteger $ os2ip s'
|
||||
@ -94,14 +94,14 @@ testImageFitsSingleBatch = do
|
||||
-- 261_120 bytes (MAX_IMAGE_SIZE in UI), rounded up, example was 743
|
||||
let descrRoundedSize = 800
|
||||
|
||||
let xMsgNewStr = LB.replicate xMsgNewRoundedSize 1
|
||||
descrStr = LB.replicate descrRoundedSize 2
|
||||
let xMsgNewStr = B.replicate xMsgNewRoundedSize 1
|
||||
descrStr = B.replicate descrRoundedSize 2
|
||||
msg s = SndMessage {msgId = 0, sharedMsgId = SharedMsgId "", msgBody = s}
|
||||
batched = "[" <> xMsgNewStr <> "," <> descrStr <> "]"
|
||||
|
||||
runBatcherTest' maxChatMsgSize [msg xMsgNewStr, msg descrStr] [] [batched]
|
||||
|
||||
runBatcherTest :: Int64 -> [SndMessage] -> [ChatError] -> [LB.ByteString] -> Spec
|
||||
runBatcherTest :: Int -> [SndMessage] -> [ChatError] -> [ByteString] -> Spec
|
||||
runBatcherTest maxLen msgs expectedErrors expectedBatches =
|
||||
it
|
||||
( (show (map (\SndMessage {msgBody} -> msgBody) msgs) <> ", limit " <> show maxLen <> ": should return ")
|
||||
@ -110,10 +110,10 @@ runBatcherTest maxLen msgs expectedErrors expectedBatches =
|
||||
)
|
||||
(runBatcherTest' maxLen msgs expectedErrors expectedBatches)
|
||||
|
||||
runBatcherTest' :: Int64 -> [SndMessage] -> [ChatError] -> [LB.ByteString] -> IO ()
|
||||
runBatcherTest' :: Int -> [SndMessage] -> [ChatError] -> [ByteString] -> IO ()
|
||||
runBatcherTest' maxLen msgs expectedErrors expectedBatches = do
|
||||
let (errors, batches) = partitionEithers $ batchMessages maxLen msgs
|
||||
batchedStrs = map (\(MsgBatch builder _) -> toLazyByteString builder) batches
|
||||
batchedStrs = map (\(MsgBatch batchBody _) -> batchBody) batches
|
||||
testErrors errors `shouldBe` testErrors expectedErrors
|
||||
batchedStrs `shouldBe` expectedBatches
|
||||
where
|
||||
|
@ -7,7 +7,6 @@ module ProtocolTests where
|
||||
|
||||
import qualified Data.Aeson as J
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Time.Clock.System (SystemTime (..), systemToUTCTime)
|
||||
import Simplex.Chat.Protocol
|
||||
import Simplex.Chat.Types
|
||||
@ -74,7 +73,7 @@ s ##== msg = do
|
||||
let r = encodeChatMessage msg
|
||||
case r of
|
||||
ECMEncoded encodedBody ->
|
||||
J.eitherDecodeStrict' (LB.toStrict encodedBody)
|
||||
J.eitherDecodeStrict' encodedBody
|
||||
`shouldBe` (J.eitherDecodeStrict' s :: Either String J.Value)
|
||||
ECMLarge -> expectationFailure $ "large message"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user