WIP: add batching

This commit is contained in:
IC Rainbow 2023-12-06 01:31:13 +02:00
parent 9df63160e5
commit 4c92e32dc2

View File

@ -1,8 +1,11 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
@ -4989,9 +4992,10 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist"
Right reMember -> do
GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv
void . sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $
withStore' $
\db -> updateIntroStatus db introId GMIntroInvForwarded
void . runDBSuspendingChat_ $ sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $
-- withStore' $ \db ->
suspendDB $ \db ->
updateIntroStatus db introId GMIntroInvForwarded
_ -> messageError "x.grp.mem.inv can be only sent by invitee member"
xGrpMemFwd :: GroupInfo -> GroupMember -> MemberInfo -> IntroInvitation -> m ()
@ -5522,11 +5526,91 @@ deliverMessage conn@Connection {connId} cmEventTag msgBody msgId = do
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
withStore' $ \db -> createSndMsgDelivery db sndMsgDelivery msgId
newtype Sus i a = Sus
{ runSus :: ReaderT (TVar [i], ChatController) IO a
} deriving (Functor, Applicative, Monad, MonadIO, MonadUnliftIO)
-- Recover MonadError via MonadUnliftIO
instance MonadError ChatError (Sus i) where
throwError = E.throwIO
{-# INLINE throwError #-}
catchError = E.catch
{-# INLINE catchError #-}
-- Alas, there's no MonadCont IO, and no MonadUnliftIO (ContT IO)
-- instance MonadCont Sus where
-- callCC a'susB_susA = Sus $ do
-- callCC $ \a'rB ->
-- let Sus s = a'susB_susA $ \a -> Sus (a'rB a)
-- in s
-- Hide our extra state var from ChatMonad/Reader
instance MonadReader ChatController (Sus i) where
ask = Sus $ asks snd
{-# INLINE ask #-}
local f (Sus action) = Sus $ local (fmap f) action
{-# INLINE local #-}
-- Instantiate TVar-powered MonadState, breaking atomic updates
-- instance MonadState [IO ()] Sus where
-- state f = Sus $ asks fst >>= \st -> atomically (stateTVar st f)
-- get = Sus $ asks fst >>= readTVarIO
-- put x = seq x $ Sus $ asks fst >>= \st -> atomically (writeTVar st x)
-- Or not... just use our hidden state directly
suspend :: i -> Sus i ()
suspend action = Sus $ asks fst >>= \st -> atomically $ modifyTVar' st (action :) -- XXX: pushing actions to a stack - reverse to get original order (if needed)
{-# INLINE suspend #-}
-- Make it classy to prevent monomorphization of ChatMonad constraints
class SuspendIO m where
suspendIO :: IO () -> m ()
instance SuspendIO (Sus (IO ())) where
suspendIO = suspend
suspendM :: (MonadUnliftIO m, SuspendIO m) => m () -> m ()
suspendM action = toIO action >>= suspendIO
-- Boo, does not aggregate transactions
-- withStoreLater :: (ChatMonad m, SuspendIO m) => (DB.Connection -> IO ()) -> m ()
-- withStoreLater action = suspendM (withStore' action)
withStoreLater :: SuspendDB m => (DB.Connection -> IO ()) -> m ()
withStoreLater = suspendDB
class SuspendDB m where
suspendDB :: (DB.Connection -> IO ()) -> m ()
instance SuspendDB (Sus (DB.Connection -> IO ())) where
suspendDB = suspend
runSuspendingChat :: ChatMonad m => ([i] -> m r) -> Sus i a -> m (r, a)
runSuspendingChat animate (Sus action) = do
cc <- ask
suspended <- newTVarIO []
r <- E.try (liftIO $ runReaderT action (suspended, cc)) >>= liftEither
readTVarIO suspended >>= animate >>= \susResults -> pure (susResults, r)
runIOSuspendingChat_ :: forall m a . ChatMonad m => Sus (IO ()) a -> m a
runIOSuspendingChat_ action = snd <$> runSuspendingChat animate_ action
where
animate_ :: [IO ()] -> m ()
animate_ = mapM_ liftIO
runDBSuspendingChat_ :: forall m a . ChatMonad m => Sus (DB.Connection -> IO ()) a -> m a
runDBSuspendingChat_ action = snd <$> runSuspendingChat animate action
where
animate :: [DB.Connection -> IO ()] -> m ()
animate batch = withStore' $ \db -> mapM_ ($ db) $ reverse batch -- XXX: recover original order from consing to []
sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember])
sendGroupMessage user GroupInfo {groupId} members chatMsgEvent =
sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure ()
-- Give SuspendDB powers
runDBSuspendingChat_ $ sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure ()
sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember])
-- Require SuspendDB powers
sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m, SuspendDB m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember])
sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
msg <- createSndMessage chatMsgEvent (GroupId groupId)
-- TODO collect failed deliveries into a single error
@ -5551,7 +5635,7 @@ sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
| forwardSupported && isForwardedGroupMsg chatMsgEvent = pure Nothing
| isXGrpMsgForward chatMsgEvent = pure Nothing
| otherwise = do
withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
withStoreLater $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
pure $ Just m
forwardSupported = do
let mcvr = memberChatVRange' m