core: api to resubscribe connections (#586)

* core: api to resubscribe connections

* update simplexmq
This commit is contained in:
Evgeny Poberezkin
2022-04-30 12:47:50 +01:00
committed by GitHub
parent 099f25c63f
commit 305052ecaf
6 changed files with 17 additions and 10 deletions

View File

@@ -183,6 +183,7 @@ processChatCommand = \case
asks agentAsync >>= readTVarIO >>= \case
Just _ -> pure CRChatRunning
_ -> startChatController user $> CRChatStarted
ResubscribeAllConnections -> withUser (subscribeUserConnections resubscribeConnection) $> CRCmdOk
SetFilesFolder filesFolder' -> withUser $ \_ -> do
createDirectoryIfMissing True filesFolder'
ff <- asks filesFolder
@@ -824,15 +825,19 @@ agentSubscriber :: (MonadUnliftIO m, MonadReader ChatController m) => User -> m
agentSubscriber user = do
q <- asks $ subQ . smpAgent
l <- asks chatLock
subscribeUserConnections user
subscribeUserConnections subscribeConnection user
forever $ do
(_, connId, msg) <- atomically $ readTBQueue q
u <- readTVarIO =<< asks currentUser
withLock l . void . runExceptT $
processAgentMessage u connId msg `catchError` (toView . CRChatError)
subscribeUserConnections :: (MonadUnliftIO m, MonadReader ChatController m) => User -> m ()
subscribeUserConnections user@User {userId} = do
subscribeUserConnections ::
(MonadUnliftIO m, MonadReader ChatController m) =>
(forall m'. ChatMonad m' => AgentClient -> ConnId -> ExceptT AgentErrorType m' ()) ->
User ->
m ()
subscribeUserConnections agentSubscribe user@User {userId} = do
n <- asks $ subscriptionConcurrency . config
ce <- asks $ subscriptionEvents . config
void . runExceptT $ do
@@ -902,10 +907,10 @@ subscribeUserConnections user@User {userId} = do
cs <- withStore (`getUserContactLinkConnections` userId)
(subscribeConns n cs >> toView CRUserContactLinkSubscribed)
`catchError` (toView . CRUserContactLinkSubError)
subscribe cId = withAgent (`subscribeConnection` cId)
subscribe cId = withAgent (`agentSubscribe` cId)
subscribeConns n conns =
withAgent $ \a ->
pooledForConcurrentlyN_ n conns $ \c -> subscribeConnection a (aConnId c)
pooledForConcurrentlyN_ n conns $ \c -> agentSubscribe a (aConnId c)
processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACommand 'Agent -> m ()
processAgentMessage Nothing _ _ = throwChatError CENoActiveUser
@@ -1914,6 +1919,7 @@ chatCommandP =
("/user " <|> "/u ") *> (CreateActiveUser <$> userProfile)
<|> ("/user" <|> "/u") $> ShowActiveUser
<|> "/_start" $> StartChat
<|> "/_resubscribe all" $> ResubscribeAllConnections
<|> "/_files_folder " *> (SetFilesFolder <$> filePath)
<|> "/_get chats" *> (APIGetChats <$> (" pcc=on" $> True <|> " pcc=off" $> False <|> pure False))
<|> "/_get chat " *> (APIGetChat <$> chatRefP <* A.space <*> chatPaginationP)

View File

@@ -95,6 +95,7 @@ data ChatCommand
= ShowActiveUser
| CreateActiveUser Profile
| StartChat
| ResubscribeAllConnections
| SetFilesFolder FilePath
| APIGetChats {pendingConnections :: Bool}
| APIGetChat ChatRef ChatPagination