Compare commits
1 Commits
ep/test-nt
...
ab/suspend
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c92e32dc2 |
@@ -1,8 +1,11 @@
|
|||||||
{-# LANGUAGE DataKinds #-}
|
{-# LANGUAGE DataKinds #-}
|
||||||
{-# LANGUAGE DuplicateRecordFields #-}
|
{-# LANGUAGE DuplicateRecordFields #-}
|
||||||
{-# LANGUAGE FlexibleContexts #-}
|
{-# LANGUAGE FlexibleContexts #-}
|
||||||
|
{-# LANGUAGE FlexibleInstances #-}
|
||||||
{-# LANGUAGE GADTs #-}
|
{-# LANGUAGE GADTs #-}
|
||||||
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||||
{-# LANGUAGE LambdaCase #-}
|
{-# LANGUAGE LambdaCase #-}
|
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||||
{-# LANGUAGE MultiWayIf #-}
|
{-# LANGUAGE MultiWayIf #-}
|
||||||
{-# LANGUAGE NamedFieldPuns #-}
|
{-# LANGUAGE NamedFieldPuns #-}
|
||||||
{-# LANGUAGE OverloadedRecordDot #-}
|
{-# LANGUAGE OverloadedRecordDot #-}
|
||||||
@@ -4989,9 +4992,10 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
|
|||||||
Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist"
|
Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist"
|
||||||
Right reMember -> do
|
Right reMember -> do
|
||||||
GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv
|
GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv
|
||||||
void . sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $
|
void . runDBSuspendingChat_ $ sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $
|
||||||
withStore' $
|
-- withStore' $ \db ->
|
||||||
\db -> updateIntroStatus db introId GMIntroInvForwarded
|
suspendDB $ \db ->
|
||||||
|
updateIntroStatus db introId GMIntroInvForwarded
|
||||||
_ -> messageError "x.grp.mem.inv can be only sent by invitee member"
|
_ -> messageError "x.grp.mem.inv can be only sent by invitee member"
|
||||||
|
|
||||||
xGrpMemFwd :: GroupInfo -> GroupMember -> MemberInfo -> IntroInvitation -> m ()
|
xGrpMemFwd :: GroupInfo -> GroupMember -> MemberInfo -> IntroInvitation -> m ()
|
||||||
@@ -5522,11 +5526,91 @@ deliverMessage conn@Connection {connId} cmEventTag msgBody msgId = do
|
|||||||
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
|
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
|
||||||
withStore' $ \db -> createSndMsgDelivery db sndMsgDelivery msgId
|
withStore' $ \db -> createSndMsgDelivery db sndMsgDelivery msgId
|
||||||
|
|
||||||
|
newtype Sus i a = Sus
|
||||||
|
{ runSus :: ReaderT (TVar [i], ChatController) IO a
|
||||||
|
} deriving (Functor, Applicative, Monad, MonadIO, MonadUnliftIO)
|
||||||
|
|
||||||
|
-- Recover MonadError via MonadUnliftIO
|
||||||
|
instance MonadError ChatError (Sus i) where
|
||||||
|
throwError = E.throwIO
|
||||||
|
{-# INLINE throwError #-}
|
||||||
|
catchError = E.catch
|
||||||
|
{-# INLINE catchError #-}
|
||||||
|
|
||||||
|
-- Alas, there's no MonadCont IO, and no MonadUnliftIO (ContT IO)
|
||||||
|
-- instance MonadCont Sus where
|
||||||
|
-- callCC a'susB_susA = Sus $ do
|
||||||
|
-- callCC $ \a'rB ->
|
||||||
|
-- let Sus s = a'susB_susA $ \a -> Sus (a'rB a)
|
||||||
|
-- in s
|
||||||
|
|
||||||
|
-- Hide our extra state var from ChatMonad/Reader
|
||||||
|
instance MonadReader ChatController (Sus i) where
|
||||||
|
ask = Sus $ asks snd
|
||||||
|
{-# INLINE ask #-}
|
||||||
|
local f (Sus action) = Sus $ local (fmap f) action
|
||||||
|
{-# INLINE local #-}
|
||||||
|
|
||||||
|
-- Instantiate TVar-powered MonadState, breaking atomic updates
|
||||||
|
-- instance MonadState [IO ()] Sus where
|
||||||
|
-- state f = Sus $ asks fst >>= \st -> atomically (stateTVar st f)
|
||||||
|
-- get = Sus $ asks fst >>= readTVarIO
|
||||||
|
-- put x = seq x $ Sus $ asks fst >>= \st -> atomically (writeTVar st x)
|
||||||
|
|
||||||
|
-- Or not... just use our hidden state directly
|
||||||
|
suspend :: i -> Sus i ()
|
||||||
|
suspend action = Sus $ asks fst >>= \st -> atomically $ modifyTVar' st (action :) -- XXX: pushing actions to a stack - reverse to get original order (if needed)
|
||||||
|
{-# INLINE suspend #-}
|
||||||
|
|
||||||
|
-- Make it classy to prevent monomorphization of ChatMonad constraints
|
||||||
|
class SuspendIO m where
|
||||||
|
suspendIO :: IO () -> m ()
|
||||||
|
|
||||||
|
instance SuspendIO (Sus (IO ())) where
|
||||||
|
suspendIO = suspend
|
||||||
|
|
||||||
|
suspendM :: (MonadUnliftIO m, SuspendIO m) => m () -> m ()
|
||||||
|
suspendM action = toIO action >>= suspendIO
|
||||||
|
|
||||||
|
-- Boo, does not aggregate transactions
|
||||||
|
-- withStoreLater :: (ChatMonad m, SuspendIO m) => (DB.Connection -> IO ()) -> m ()
|
||||||
|
-- withStoreLater action = suspendM (withStore' action)
|
||||||
|
|
||||||
|
withStoreLater :: SuspendDB m => (DB.Connection -> IO ()) -> m ()
|
||||||
|
withStoreLater = suspendDB
|
||||||
|
|
||||||
|
class SuspendDB m where
|
||||||
|
suspendDB :: (DB.Connection -> IO ()) -> m ()
|
||||||
|
|
||||||
|
instance SuspendDB (Sus (DB.Connection -> IO ())) where
|
||||||
|
suspendDB = suspend
|
||||||
|
|
||||||
|
runSuspendingChat :: ChatMonad m => ([i] -> m r) -> Sus i a -> m (r, a)
|
||||||
|
runSuspendingChat animate (Sus action) = do
|
||||||
|
cc <- ask
|
||||||
|
suspended <- newTVarIO []
|
||||||
|
r <- E.try (liftIO $ runReaderT action (suspended, cc)) >>= liftEither
|
||||||
|
readTVarIO suspended >>= animate >>= \susResults -> pure (susResults, r)
|
||||||
|
|
||||||
|
runIOSuspendingChat_ :: forall m a . ChatMonad m => Sus (IO ()) a -> m a
|
||||||
|
runIOSuspendingChat_ action = snd <$> runSuspendingChat animate_ action
|
||||||
|
where
|
||||||
|
animate_ :: [IO ()] -> m ()
|
||||||
|
animate_ = mapM_ liftIO
|
||||||
|
|
||||||
|
runDBSuspendingChat_ :: forall m a . ChatMonad m => Sus (DB.Connection -> IO ()) a -> m a
|
||||||
|
runDBSuspendingChat_ action = snd <$> runSuspendingChat animate action
|
||||||
|
where
|
||||||
|
animate :: [DB.Connection -> IO ()] -> m ()
|
||||||
|
animate batch = withStore' $ \db -> mapM_ ($ db) $ reverse batch -- XXX: recover original order from consing to []
|
||||||
|
|
||||||
sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember])
|
sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember])
|
||||||
sendGroupMessage user GroupInfo {groupId} members chatMsgEvent =
|
sendGroupMessage user GroupInfo {groupId} members chatMsgEvent =
|
||||||
sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure ()
|
-- Give SuspendDB powers
|
||||||
|
runDBSuspendingChat_ $ sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure ()
|
||||||
|
|
||||||
sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember])
|
-- Require SuspendDB powers
|
||||||
|
sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m, SuspendDB m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember])
|
||||||
sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
|
sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
|
||||||
msg <- createSndMessage chatMsgEvent (GroupId groupId)
|
msg <- createSndMessage chatMsgEvent (GroupId groupId)
|
||||||
-- TODO collect failed deliveries into a single error
|
-- TODO collect failed deliveries into a single error
|
||||||
@@ -5551,7 +5635,7 @@ sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do
|
|||||||
| forwardSupported && isForwardedGroupMsg chatMsgEvent = pure Nothing
|
| forwardSupported && isForwardedGroupMsg chatMsgEvent = pure Nothing
|
||||||
| isXGrpMsgForward chatMsgEvent = pure Nothing
|
| isXGrpMsgForward chatMsgEvent = pure Nothing
|
||||||
| otherwise = do
|
| otherwise = do
|
||||||
withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
|
withStoreLater $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
|
||||||
pure $ Just m
|
pure $ Just m
|
||||||
forwardSupported = do
|
forwardSupported = do
|
||||||
let mcvr = memberChatVRange' m
|
let mcvr = memberChatVRange' m
|
||||||
|
|||||||
Reference in New Issue
Block a user