Rewrite remote controller
This commit is contained in:
@@ -19,7 +19,6 @@ module Simplex.Chat where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Concurrent.STM (retry)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
@@ -63,7 +62,6 @@ import Simplex.Chat.Options
|
||||
import Simplex.Chat.ProfileGenerator (generateRandomProfile)
|
||||
import Simplex.Chat.Protocol
|
||||
import Simplex.Chat.Remote
|
||||
import qualified Simplex.Chat.Remote.Discovery as Discovery
|
||||
import Simplex.Chat.Remote.Types
|
||||
import Simplex.Chat.Store
|
||||
import Simplex.Chat.Store.Connections
|
||||
@@ -72,7 +70,6 @@ import Simplex.Chat.Store.Files
|
||||
import Simplex.Chat.Store.Groups
|
||||
import Simplex.Chat.Store.Messages
|
||||
import Simplex.Chat.Store.Profiles
|
||||
import Simplex.Chat.Store.Remote
|
||||
import Simplex.Chat.Store.Shared
|
||||
import Simplex.Chat.Types
|
||||
import Simplex.Chat.Types.Preferences
|
||||
@@ -367,19 +364,6 @@ execRemoteCommand u rh scmd = either (CRChatCmdError u) id <$> runExceptT (withR
|
||||
parseChatCommand :: ByteString -> Either String ChatCommand
|
||||
parseChatCommand = A.parseOnly chatCommandP . B.dropWhileEnd isSpace
|
||||
|
||||
-- | Emit local events.
|
||||
toView :: ChatMonad' m => ChatResponse -> m ()
|
||||
toView = toView_ Nothing
|
||||
|
||||
-- | Used by transport to mark remote events with source.
|
||||
toViewRemote :: ChatMonad' m => RemoteHostId -> ChatResponse -> m ()
|
||||
toViewRemote = toView_ . Just
|
||||
|
||||
toView_ :: ChatMonad' m => Maybe RemoteHostId -> ChatResponse -> m ()
|
||||
toView_ rh event = do
|
||||
q <- asks outputQ
|
||||
atomically $ writeTBQueue q (Nothing, rh, event)
|
||||
|
||||
-- | Chat API commands interpreted in context of a local zone
|
||||
processChatCommand :: forall m. ChatMonad m => ChatCommand -> m ChatResponse
|
||||
processChatCommand = \case
|
||||
@@ -1852,69 +1836,16 @@ processChatCommand = \case
|
||||
p {groupPreferences = Just . setGroupPreference' SGFTimedMessages pref $ groupPreferences p}
|
||||
CreateRemoteHost _displayName -> pure $ chatCmdError Nothing "not supported"
|
||||
ListRemoteHosts -> pure $ chatCmdError Nothing "not supported"
|
||||
StartRemoteHost rh -> do
|
||||
RemoteHost {displayName = _, storePath, caKey, caCert} <- error "TODO: get from DB"
|
||||
(fingerprint :: ByteString, sessionCreds) <- error "TODO: derive session creds" (caKey, caCert)
|
||||
cleanup <- toIO $ chatModifyVar remoteHostSessions (M.delete rh)
|
||||
Discovery.runAnnouncer cleanup fingerprint sessionCreds >>= \case
|
||||
Left todo'err -> pure $ chatCmdError Nothing "TODO: Some HTTP2 error"
|
||||
Right ctrlClient -> do
|
||||
chatModifyVar remoteHostSessions $ M.insert rh RemoteHostSession {storePath, ctrlClient}
|
||||
pure $ CRRemoteHostStarted rh
|
||||
StartRemoteHost rh -> startRemoteHost rh
|
||||
StopRemoteHost rh -> closeRemoteHostSession rh $> CRRemoteHostStopped rh
|
||||
DisposeRemoteHost _rh -> pure $ chatCmdError Nothing "not supported"
|
||||
StartRemoteCtrl -> startRemoteCtrl
|
||||
ConfirmRemoteCtrl rc -> confirmRemoteCtrl rc
|
||||
RejectRemoteCtrl rc -> rejectRemoteCtrl rc
|
||||
StopRemoteCtrl rc -> stopRemoteCtrl rc
|
||||
RegisterRemoteCtrl _displayName _oobData -> pure $ chatCmdError Nothing "not supported"
|
||||
ListRemoteCtrls -> pure $ chatCmdError Nothing "not supported"
|
||||
StartRemoteCtrl ->
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Just _busy -> throwError $ ChatErrorRemoteCtrl RCEBusy
|
||||
Nothing -> do
|
||||
uio <- askUnliftIO
|
||||
accepted <- newEmptyTMVarIO
|
||||
let getControllers = unliftIO uio $ withStore' $ \db ->
|
||||
map (\RemoteCtrl{remoteCtrlId, fingerprint} -> (fingerprint, remoteCtrlId)) <$> getRemoteCtrls (DB.conn db)
|
||||
let started remoteCtrlId = unliftIO uio $ do
|
||||
withStore' (\db -> getRemoteCtrl (DB.conn db) remoteCtrlId) >>= \case
|
||||
Nothing -> pure False
|
||||
Just RemoteCtrl{displayName, accepted=resolution} -> case resolution of
|
||||
Nothing -> do
|
||||
-- started/finished wrapper is synchronous, running HTTP server can be delayed here until UI processes the first contact dialogue
|
||||
toView $ CRRemoteCtrlFirstContact {remoteCtrlId, displayName}
|
||||
atomically $ takeTMVar accepted
|
||||
Just known -> atomically $ putTMVar accepted known $> known
|
||||
let finished remoteCtrlId todo'error = unliftIO uio $ do
|
||||
chatWriteVar remoteCtrlSession Nothing
|
||||
toView $ CRRemoteCtrlDisconnected {remoteCtrlId}
|
||||
let process rc req = unliftIO uio $ processControllerCommand rc req
|
||||
ctrlAsync <- async . liftIO $ Discovery.runDiscoverer getControllers started finished process
|
||||
chatWriteVar remoteCtrlSession $ Just RemoteCtrlSession {ctrlAsync, accepted}
|
||||
pure CRRemoteCtrlStarted
|
||||
ConfirmRemoteCtrl remoteCtrlId -> do
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
|
||||
Just RemoteCtrlSession {accepted} -> do
|
||||
withStore' $ \db -> markRemoteCtrlResolution (DB.conn db) remoteCtrlId True
|
||||
atomically $ putTMVar accepted True
|
||||
pure $ CRRemoteCtrlAccepted {remoteCtrlId}
|
||||
RejectRemoteCtrl remoteCtrlId -> do
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
|
||||
Just RemoteCtrlSession {accepted} -> do
|
||||
withStore' $ \db -> markRemoteCtrlResolution (DB.conn db) remoteCtrlId False
|
||||
atomically $ putTMVar accepted False
|
||||
pure $ CRRemoteCtrlRejected {remoteCtrlId}
|
||||
StopRemoteCtrl remoteCtrlId ->
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
|
||||
Just RemoteCtrlSession {ctrlAsync} -> do
|
||||
cancel ctrlAsync
|
||||
pure $ CRRemoteCtrlDisconnected {remoteCtrlId}
|
||||
DisposeRemoteCtrl remoteCtrlId ->
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> do
|
||||
withStore' $ \db -> deleteRemoteCtrl (DB.conn db) remoteCtrlId
|
||||
pure $ CRRemoteCtrlDisposed {remoteCtrlId}
|
||||
Just _ -> throwError $ ChatErrorRemoteCtrl RCEBusy
|
||||
DisposeRemoteCtrl rc -> disposeRemoteCtrl rc
|
||||
QuitChat -> liftIO exitSuccess
|
||||
ShowVersion -> do
|
||||
let versionInfo = coreVersionInfo $(simplexmqCommitQ)
|
||||
@@ -5410,33 +5341,6 @@ withAgent action =
|
||||
>>= runExceptT . action
|
||||
>>= liftEither . first (`ChatErrorAgent` Nothing)
|
||||
|
||||
withStore' :: ChatMonad m => (DB.Connection -> IO a) -> m a
|
||||
withStore' action = withStore $ liftIO . action
|
||||
|
||||
withStore :: ChatMonad m => (DB.Connection -> ExceptT StoreError IO a) -> m a
|
||||
withStore = withStoreCtx Nothing
|
||||
|
||||
withStoreCtx' :: ChatMonad m => Maybe String -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx' ctx_ action = withStoreCtx ctx_ $ liftIO . action
|
||||
|
||||
withStoreCtx :: ChatMonad m => Maybe String -> (DB.Connection -> ExceptT StoreError IO a) -> m a
|
||||
withStoreCtx ctx_ action = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftEitherError ChatErrorStore $ case ctx_ of
|
||||
Nothing -> withTransaction chatStore (runExceptT . action) `E.catch` handleInternal ""
|
||||
-- uncomment to debug store performance
|
||||
-- Just ctx -> do
|
||||
-- t1 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
-- r <- withTransactionCtx ctx_ chatStore (runExceptT . action) `E.catch` handleInternal (" (" <> ctx <> ")")
|
||||
-- t2 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
-- pure r
|
||||
Just _ -> withTransaction chatStore (runExceptT . action) `E.catch` handleInternal ""
|
||||
where
|
||||
handleInternal :: String -> E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr
|
||||
|
||||
chatCommandP :: Parser ChatCommand
|
||||
chatCommandP =
|
||||
choice
|
||||
@@ -5689,8 +5593,8 @@ chatCommandP =
|
||||
"/start remote host " *> (StartRemoteHost <$> A.decimal),
|
||||
"/stop remote host " *> (StopRemoteHost <$> A.decimal),
|
||||
"/dispose remote host " *> (DisposeRemoteHost <$> A.decimal),
|
||||
"/register remote ctrl " *> (RegisterRemoteCtrl <$> textP <*> remoteHostOOBP),
|
||||
"/start remote ctrl" $> StartRemoteCtrl,
|
||||
"/register remote ctrl " *> (RegisterRemoteCtrl <$> textP <*> remoteHostOOBP),
|
||||
"/confirm remote ctrl " *> (ConfirmRemoteCtrl <$> A.decimal),
|
||||
"/reject remote ctrl " *> (RejectRemoteCtrl <$> A.decimal),
|
||||
"/stop remote ctrl " *> (StopRemoteCtrl <$> A.decimal),
|
||||
|
||||
@@ -47,7 +47,7 @@ import Simplex.Chat.Messages
|
||||
import Simplex.Chat.Messages.CIContent
|
||||
import Simplex.Chat.Protocol
|
||||
import Simplex.Chat.Remote.Types
|
||||
import Simplex.Chat.Store (AutoAccept, StoreError, UserContactLink, UserMsgReceiptSettings)
|
||||
import Simplex.Chat.Store (AutoAccept, StoreError (..), UserContactLink, UserMsgReceiptSettings)
|
||||
import Simplex.Chat.Types
|
||||
import Simplex.Chat.Types.Preferences
|
||||
import Simplex.Messaging.Agent (AgentClient, SubscriptionsInfo)
|
||||
@@ -57,6 +57,8 @@ import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation, SQLiteStore, UpMigration)
|
||||
import Simplex.Messaging.Agent.Store.SQLite.DB (SlowQueryStats (..))
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction)
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..))
|
||||
import qualified Simplex.Messaging.Crypto.File as CF
|
||||
@@ -67,7 +69,7 @@ import Simplex.Messaging.Protocol (AProtoServerWithAuth, AProtocolType, CorrId,
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import Simplex.Messaging.Transport (simplexMQVersion)
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
import Simplex.Messaging.Util (allFinally, catchAllErrors, tryAllErrors, (<$$>))
|
||||
import Simplex.Messaging.Util (allFinally, catchAllErrors, liftEitherError, tryAllErrors, (<$$>))
|
||||
import Simplex.Messaging.Version
|
||||
import System.IO (Handle)
|
||||
import System.Mem.Weak (Weak)
|
||||
@@ -603,11 +605,14 @@ data ChatResponse
|
||||
| CRRemoteCtrlList {remoteCtrls :: [RemoteCtrlInfo]}
|
||||
| CRRemoteCtrlRegistered {remoteCtrlId :: RemoteCtrlId}
|
||||
| CRRemoteCtrlStarted
|
||||
| CRRemoteCtrlFirstContact {remoteCtrlId :: RemoteCtrlId, displayName :: Text}
|
||||
| CRRemoteCtrlAnnounce {fingerprint :: C.KeyHash} -- unregistered fingerprint, needs confirmation
|
||||
| CRRemoteCtrlFound {remoteCtrl::RemoteCtrl} -- registered fingerprint, may connect
|
||||
-- | CRRemoteCtrlFirstContact {remoteCtrlId :: RemoteCtrlId, displayName :: Text}
|
||||
| CRRemoteCtrlAccepted {remoteCtrlId :: RemoteCtrlId}
|
||||
| CRRemoteCtrlRejected {remoteCtrlId :: RemoteCtrlId}
|
||||
| CRRemoteCtrlConnected {remoteCtrlId :: RemoteCtrlId, displayName :: Text}
|
||||
| CRRemoteCtrlDisconnected {remoteCtrlId :: RemoteCtrlId}
|
||||
| CRRemoteCtrlStopped {remoteCtrlId :: RemoteCtrlId}
|
||||
| CRRemoteCtrlDisposed {remoteCtrlId :: RemoteCtrlId}
|
||||
| CRSQLResult {rows :: [Text]}
|
||||
| CRSlowSQLQueries {chatQueries :: [SlowSQLQuery], agentQueries :: [SlowSQLQuery]}
|
||||
| CRDebugLocks {chatLockName :: Maybe String, agentLocks :: AgentLocks}
|
||||
@@ -1106,3 +1111,43 @@ data ArchiveError
|
||||
instance ToJSON ArchiveError where
|
||||
toJSON = J.genericToJSON . sumTypeJSON $ dropPrefix "AE"
|
||||
toEncoding = J.genericToEncoding . sumTypeJSON $ dropPrefix "AE"
|
||||
|
||||
-- | Emit local events.
|
||||
toView :: ChatMonad' m => ChatResponse -> m ()
|
||||
toView = toView_ Nothing
|
||||
|
||||
-- | Used by transport to mark remote events with source.
|
||||
toViewRemote :: ChatMonad' m => RemoteHostId -> ChatResponse -> m ()
|
||||
toViewRemote = toView_ . Just
|
||||
|
||||
toView_ :: ChatMonad' m => Maybe RemoteHostId -> ChatResponse -> m ()
|
||||
toView_ rh event = do
|
||||
q <- asks outputQ
|
||||
atomically $ writeTBQueue q (Nothing, rh, event)
|
||||
|
||||
withStore' :: ChatMonad m => (DB.Connection -> IO a) -> m a
|
||||
withStore' action = withStore $ liftIO . action
|
||||
|
||||
withStore :: ChatMonad m => (DB.Connection -> ExceptT StoreError IO a) -> m a
|
||||
withStore = withStoreCtx Nothing
|
||||
|
||||
withStoreCtx' :: ChatMonad m => Maybe String -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx' ctx_ action = withStoreCtx ctx_ $ liftIO . action
|
||||
|
||||
withStoreCtx :: ChatMonad m => Maybe String -> (DB.Connection -> ExceptT StoreError IO a) -> m a
|
||||
withStoreCtx ctx_ action = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftEitherError ChatErrorStore $ case ctx_ of
|
||||
Nothing -> withTransaction chatStore (runExceptT . action) `catch` handleInternal ""
|
||||
-- uncomment to debug store performance
|
||||
-- Just ctx -> do
|
||||
-- t1 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
-- r <- withTransactionCtx ctx_ chatStore (runExceptT . action) `E.catch` handleInternal (" (" <> ctx <> ")")
|
||||
-- t2 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
-- pure r
|
||||
Just _ -> withTransaction chatStore (runExceptT . action) `catch` handleInternal ""
|
||||
where
|
||||
handleInternal :: String -> SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr
|
||||
|
||||
@@ -20,7 +20,7 @@ CREATE TABLE remote_controllers ( -- controllers known to a hosting app
|
||||
remote_controller_id INTEGER PRIMARY KEY,
|
||||
display_name TEXT NOT NULL,
|
||||
fingerprint BLOB NOT NULL,
|
||||
accepted INTEGER
|
||||
accepted INTEGER -- unknown/rejected/confirmed
|
||||
);
|
||||
|]
|
||||
|
||||
|
||||
@@ -15,14 +15,23 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Network.HTTP.Types as HTTP
|
||||
import qualified Network.HTTP2.Client as HTTP2Client
|
||||
import Network.Socket (SockAddr (..), hostAddressToTuple)
|
||||
import Simplex.Chat.Controller
|
||||
import qualified Simplex.Chat.Remote.Discovery as Discovery
|
||||
import Simplex.Chat.Remote.Types
|
||||
import Simplex.Chat.Store.Remote
|
||||
import Simplex.Chat.Types
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport.Client (TransportHost (..))
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..))
|
||||
import qualified Simplex.Messaging.Transport.HTTP2.Client as HTTP2
|
||||
import qualified Simplex.Messaging.Transport.HTTP2.Server as HTTP2
|
||||
import Simplex.Messaging.Util (bshow)
|
||||
import System.Directory (getFileSize)
|
||||
import UnliftIO
|
||||
|
||||
withRemoteHostSession :: (ChatMonad m) => RemoteHostId -> (RemoteHostSession -> m a) -> m a
|
||||
withRemoteHostSession remoteHostId action = do
|
||||
@@ -30,6 +39,17 @@ withRemoteHostSession remoteHostId action = do
|
||||
where
|
||||
err = throwError $ ChatErrorRemoteHost remoteHostId RHMissing
|
||||
|
||||
startRemoteHost :: (ChatMonad m) => RemoteHostId -> m ChatResponse
|
||||
startRemoteHost remoteHostId = do
|
||||
RemoteHost {displayName = _, storePath, caKey, caCert} <- error "TODO: get from DB"
|
||||
(fingerprint :: ByteString, sessionCreds) <- error "TODO: derive session creds" (caKey, caCert)
|
||||
cleanup <- toIO $ chatModifyVar remoteHostSessions (M.delete remoteHostId)
|
||||
Discovery.runAnnouncer cleanup fingerprint sessionCreds >>= \case
|
||||
Left todo'err -> pure $ chatCmdError Nothing "TODO: Some HTTP2 error"
|
||||
Right ctrlClient -> do
|
||||
chatModifyVar remoteHostSessions $ M.insert remoteHostId RemoteHostSession {storePath, ctrlClient}
|
||||
pure $ CRRemoteHostStarted remoteHostId
|
||||
|
||||
closeRemoteHostSession :: (ChatMonad m) => RemoteHostId -> m ()
|
||||
closeRemoteHostSession rh = withRemoteHostSession rh (liftIO . HTTP2.closeHTTP2Client . ctrlClient)
|
||||
|
||||
@@ -68,10 +88,10 @@ relayCommand RemoteHostSession {ctrlClient} s =
|
||||
storeRemoteFile :: (ChatMonad m) => RemoteHostSession -> FilePath -> m ChatResponse
|
||||
storeRemoteFile RemoteHostSession {ctrlClient} localFile = do
|
||||
postFile Nothing ctrlClient "/store" mempty localFile >>= \case
|
||||
Left e -> error "TODO: http2chatError"
|
||||
Left todo'err -> error "TODO: http2chatError"
|
||||
Right HTTP2.HTTP2Response {response} -> case HTTP.statusCode <$> HTTP2Client.responseStatus response of
|
||||
Just 200 -> pure $ CRCmdOk Nothing
|
||||
unexpected -> error "TODO: http2chatError"
|
||||
todo'notOk -> error "TODO: http2chatError"
|
||||
where
|
||||
postFile timeout c path hs file = liftIO $ do
|
||||
fileSize <- fromIntegral <$> getFileSize file
|
||||
@@ -95,11 +115,88 @@ sum2tagged = \case
|
||||
J.Object todo'convert -> J.Object todo'convert
|
||||
skip -> skip
|
||||
|
||||
-- withRemoteCtrlSession :: (ChatMonad m) => RemoteCtrlId -> (RemoteCtrlSession -> m a) -> m a
|
||||
-- withRemoteCtrlSession remoteCtrlId action = do
|
||||
-- chatReadVar remoteHostSessions >>= maybe err action . M.lookup remoteCtrlId
|
||||
-- where
|
||||
-- err = throwError $ ChatErrorRemoteCtrl (Just remoteCtrlId) RCMissing
|
||||
|
||||
processControllerCommand :: (ChatMonad m) => RemoteCtrlId -> HTTP2.HTTP2Request -> m ()
|
||||
processControllerCommand rc req = error "TODO: processControllerCommand"
|
||||
|
||||
-- * ChatRequest handlers
|
||||
|
||||
startRemoteCtrl :: (ChatMonad m) => m ChatResponse
|
||||
startRemoteCtrl =
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Just _busy -> throwError $ ChatErrorRemoteCtrl RCEBusy
|
||||
Nothing -> do
|
||||
accepted <- newEmptyTMVarIO
|
||||
discovered <- newTVarIO mempty
|
||||
listener <- async $ discoverRemoteCtrls discovered
|
||||
_supervisor <- async $ do
|
||||
uiEvent <- async $ atomically $ readTMVar accepted
|
||||
waitEitherCatchCancel listener uiEvent >>= \case
|
||||
Left _ -> pure () -- discover got cancelled or crashed on some UDP error
|
||||
Right (Left _) -> pure () -- readTMVar blocked indefinitely (should not happen)
|
||||
Right (Right remoteCtrlId) -> do
|
||||
-- got connection confirmation
|
||||
(source, fingerprint) <-
|
||||
atomically $
|
||||
TM.lookup remoteCtrlId discovered >>= \case
|
||||
Nothing -> error "Session accepted without getting registered"
|
||||
Just found -> found <$ writeTVar discovered mempty -- flush unused sources
|
||||
host <- async $ runRemoteHost remoteCtrlId source fingerprint
|
||||
chatWriteVar remoteCtrlSession $ Just RemoteCtrlSession {ctrlAsync = host, accepted}
|
||||
_ <- waitCatch host
|
||||
chatWriteVar remoteCtrlSession Nothing
|
||||
toView $ CRRemoteCtrlStopped {remoteCtrlId}
|
||||
chatWriteVar remoteCtrlSession $ Just RemoteCtrlSession {ctrlAsync = listener, accepted}
|
||||
pure CRRemoteCtrlStarted
|
||||
|
||||
discoverRemoteCtrls :: (ChatMonad m) => TM.TMap RemoteCtrlId (TransportHost, C.KeyHash) -> m ()
|
||||
discoverRemoteCtrls discovered = Discovery.openListener >>= go
|
||||
where
|
||||
go sock =
|
||||
Discovery.recvAnnounce sock >>= \case
|
||||
(SockAddrInet _port addr, invite) -> case strDecode invite of
|
||||
Left _ -> go sock -- ignore malformed datagrams
|
||||
Right fingerprint -> do
|
||||
withStore' (\db -> getRemoteCtrlByFingerprint (DB.conn db) fingerprint) >>= \case
|
||||
Nothing -> toView $ CRRemoteCtrlAnnounce fingerprint
|
||||
Just found@RemoteCtrl {remoteCtrlId} -> do
|
||||
atomically $ TM.insert remoteCtrlId (THIPv4 (hostAddressToTuple addr), fingerprint) discovered
|
||||
toView $ CRRemoteCtrlFound found
|
||||
_nonV4 -> go sock
|
||||
|
||||
runRemoteHost :: (ChatMonad m) => RemoteCtrlId -> TransportHost -> C.KeyHash -> m ()
|
||||
runRemoteHost remoteCtrlId remoteCtrlHost fingerprint =
|
||||
Discovery.connectSessionHost remoteCtrlHost fingerprint $ Discovery.attachServer (processControllerCommand remoteCtrlId)
|
||||
|
||||
confirmRemoteCtrl :: (ChatMonad m) => RemoteCtrlId -> m ChatResponse
|
||||
confirmRemoteCtrl remoteCtrlId =
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
|
||||
Just RemoteCtrlSession {accepted} -> do
|
||||
withStore' $ \db -> markRemoteCtrlResolution (DB.conn db) remoteCtrlId True
|
||||
atomically $ putTMVar accepted remoteCtrlId -- the remote host can now proceed with connection
|
||||
pure $ CRRemoteCtrlAccepted {remoteCtrlId}
|
||||
|
||||
rejectRemoteCtrl :: (ChatMonad m) => RemoteCtrlId -> m ChatResponse
|
||||
rejectRemoteCtrl remoteCtrlId =
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
|
||||
Just RemoteCtrlSession {ctrlAsync} -> do
|
||||
withStore' $ \db -> markRemoteCtrlResolution (DB.conn db) remoteCtrlId False
|
||||
cancel ctrlAsync
|
||||
pure $ CRRemoteCtrlRejected {remoteCtrlId}
|
||||
|
||||
stopRemoteCtrl :: (ChatMonad m) => RemoteCtrlId -> m ChatResponse
|
||||
stopRemoteCtrl remoteCtrlId =
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
|
||||
Just RemoteCtrlSession {ctrlAsync} -> do
|
||||
cancel ctrlAsync
|
||||
pure CRRemoteCtrlStopped {remoteCtrlId}
|
||||
|
||||
disposeRemoteCtrl :: (ChatMonad m) => RemoteCtrlId -> m ChatResponse
|
||||
disposeRemoteCtrl remoteCtrlId =
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
Nothing -> do
|
||||
withStore' $ \db -> deleteRemoteCtrl (DB.conn db) remoteCtrlId
|
||||
pure $ CRRemoteCtrlDisposed {remoteCtrlId}
|
||||
Just _ -> throwError $ ChatErrorRemoteCtrl RCEBusy
|
||||
|
||||
@@ -2,14 +2,22 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module Simplex.Chat.Remote.Discovery
|
||||
( runAnnouncer,
|
||||
runDiscoverer,
|
||||
( -- * Announce
|
||||
runAnnouncer,
|
||||
|
||||
-- * Discovery
|
||||
openListener,
|
||||
recvAnnounce,
|
||||
connectSessionHost,
|
||||
attachServer,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.Default (def)
|
||||
import Data.String (IsString)
|
||||
import Debug.Trace
|
||||
@@ -28,6 +36,13 @@ import Simplex.Messaging.Transport.Server (defaultTransportServerConfig, runTran
|
||||
import UnliftIO
|
||||
import UnliftIO.Concurrent
|
||||
|
||||
-- | Link-local broadcast address.
|
||||
pattern BROADCAST_ADDR_V4 :: (IsString a, Eq a) => a
|
||||
pattern BROADCAST_ADDR_V4 = "255.255.255.255"
|
||||
|
||||
pattern BROADCAST_PORT :: (IsString a, Eq a) => a
|
||||
pattern BROADCAST_PORT = "5226"
|
||||
|
||||
runAnnouncer :: (StrEncoding invite, MonadUnliftIO m) => IO () -> invite -> TLS.Credentials -> m (Either HTTP2ClientError HTTP2Client)
|
||||
runAnnouncer finished invite credentials = do
|
||||
started <- newEmptyTMVarIO
|
||||
@@ -40,7 +55,7 @@ runAnnouncer finished invite credentials = do
|
||||
TLS.serverSupported = supportedParameters
|
||||
}
|
||||
httpClient <- newEmptyMVar
|
||||
liftIO $ runTransportServer started partyPort serverParams defaultTransportServerConfig (run aPid httpClient)
|
||||
liftIO $ runTransportServer started BROADCAST_PORT serverParams defaultTransportServerConfig (run aPid httpClient)
|
||||
takeMVar httpClient
|
||||
where
|
||||
announcer started inviteBS = do
|
||||
@@ -48,10 +63,10 @@ runAnnouncer finished invite credentials = do
|
||||
False ->
|
||||
error "Server not started?.."
|
||||
True -> liftIO $ do
|
||||
traceM $ "TCP server started at " <> partyPort
|
||||
sock <- UDP.clientSocket broadcastAddrV4 partyPort False
|
||||
traceM $ "TCP server started at " <> BROADCAST_PORT
|
||||
sock <- UDP.clientSocket BROADCAST_ADDR_V4 BROADCAST_PORT False
|
||||
N.setSocketOption (UDP.udpSocket sock) N.Broadcast 1
|
||||
traceM $ "UDP announce started at " <> broadcastAddrV4 <> ":" <> partyPort
|
||||
traceM $ "UDP announce started at " <> BROADCAST_ADDR_V4 <> ":" <> BROADCAST_PORT
|
||||
traceM $ "Server invite: " <> show inviteBS
|
||||
forever $ do
|
||||
UDP.send sock inviteBS
|
||||
@@ -61,47 +76,25 @@ runAnnouncer finished invite credentials = do
|
||||
run aPid clientVar tls = do
|
||||
cancel aPid
|
||||
let partyHost = "255.255.255.255" -- XXX: get from tls somehow? not required as host verification is disabled.
|
||||
attachHTTP2Client defaultHTTP2ClientConfig partyHost partyPort finished defaultHTTP2BufferSize tls >>= putMVar clientVar
|
||||
attachHTTP2Client defaultHTTP2ClientConfig partyHost BROADCAST_PORT finished defaultHTTP2BufferSize tls >>= putMVar clientVar
|
||||
|
||||
-- | Link-local broadcast address.
|
||||
broadcastAddrV4 :: (IsString a) => a
|
||||
broadcastAddrV4 = "255.255.255.255"
|
||||
|
||||
partyPort :: (IsString a) => a
|
||||
partyPort = "5226" -- XXX: should be `0` or something, to get a random port and announce it
|
||||
|
||||
runDiscoverer :: IO [(C.KeyHash, ctx)] -> (ctx -> IO Bool) -> (ctx -> Maybe SomeException -> IO ()) -> (ctx -> HTTP2Request -> IO ()) -> IO ()
|
||||
runDiscoverer getFingerprints started finished processRequest = do
|
||||
sock <- UDP.serverSocket (broadcastAddrV4, read partyPort)
|
||||
openListener :: (MonadIO m) => m UDP.ListenSocket
|
||||
openListener = liftIO $ do
|
||||
sock <- UDP.serverSocket (BROADCAST_ADDR_V4, read BROADCAST_PORT)
|
||||
N.setSocketOption (UDP.listenSocket sock) N.Broadcast 1
|
||||
traceM $ "runDiscoverer: " <> show sock
|
||||
go sock
|
||||
where
|
||||
go sock = do
|
||||
(invite, UDP.ClientSockAddr source _cmsg) <- UDP.recvFrom sock
|
||||
case strDecode invite of
|
||||
Left err -> do
|
||||
traceM $ "Inivite decode error: " <> err
|
||||
go sock
|
||||
Right inviteHash -> do
|
||||
expected <- getFingerprints
|
||||
case lookup inviteHash expected of
|
||||
Nothing -> do
|
||||
traceM $ "Unexpected invite: " <> show (invite, source)
|
||||
go sock
|
||||
Just ctx -> do
|
||||
host <- case source of
|
||||
N.SockAddrInet _port addr -> do
|
||||
pure $ THIPv4 (N.hostAddressToTuple addr)
|
||||
unexpected ->
|
||||
-- TODO: actually, Apple mandates IPv6 support
|
||||
fail $ "Discoverer: expected an IPv4 party, got " <> show unexpected
|
||||
runTransportClient defaultTransportClientConfig Nothing host partyPort (Just inviteHash) $ \tls -> do
|
||||
accepted <- started ctx
|
||||
if not accepted
|
||||
then go sock -- Ignore rejected invites and wait for another
|
||||
else do
|
||||
res <- try $ runHTTP2ServerWith defaultHTTP2BufferSize ($ tls) $ \sessionId r sendResponse -> do
|
||||
reqBody <- getHTTP2Body r 16384
|
||||
processRequest ctx HTTP2Request {sessionId, request = r, reqBody, sendResponse}
|
||||
finished ctx $ either Just (\() -> Nothing) res
|
||||
pure sock
|
||||
|
||||
recvAnnounce :: (MonadIO m) => UDP.ListenSocket -> m (N.SockAddr, ByteString)
|
||||
recvAnnounce sock = liftIO $ do
|
||||
(invite, UDP.ClientSockAddr source _cmsg) <- UDP.recvFrom sock
|
||||
pure (source, invite)
|
||||
|
||||
connectSessionHost :: (MonadUnliftIO m) => TransportHost -> C.KeyHash -> (Transport.TLS -> m a) -> m a
|
||||
connectSessionHost host caFingerprint = runTransportClient defaultTransportClientConfig Nothing host BROADCAST_PORT (Just caFingerprint)
|
||||
|
||||
attachServer :: (MonadUnliftIO m) => (HTTP2Request -> m ()) -> Transport.TLS -> m ()
|
||||
attachServer processRequest tls = do
|
||||
withRunInIO $ \unlift ->
|
||||
runHTTP2ServerWith defaultHTTP2BufferSize ($ tls) $ \sessionId r sendResponse -> do
|
||||
reqBody <- getHTTP2Body r defaultHTTP2BufferSize
|
||||
unlift $ processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse}
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
|
||||
module Simplex.Chat.Remote.Types where
|
||||
|
||||
import Control.Concurrent.Async (Async)
|
||||
import Data.Aeson (ToJSON)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
import GHC.Generics (Generic)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Transport.HTTP2.Client (HTTP2Client)
|
||||
import UnliftIO.STM
|
||||
@@ -22,6 +26,7 @@ data RemoteHost = RemoteHost
|
||||
-- | Credentials signing key for root and session certs
|
||||
caKey :: C.Key
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
type RemoteCtrlId = Int64
|
||||
|
||||
@@ -31,6 +36,7 @@ data RemoteCtrl = RemoteCtrl
|
||||
fingerprint :: C.KeyHash,
|
||||
accepted :: Maybe Bool
|
||||
}
|
||||
deriving (Show, Generic, ToJSON)
|
||||
|
||||
data RemoteHostSession = RemoteHostSession
|
||||
{ -- | Path for local resources to be synchronized with host
|
||||
@@ -41,5 +47,5 @@ data RemoteHostSession = RemoteHostSession
|
||||
data RemoteCtrlSession = RemoteCtrlSession
|
||||
{ -- | Server side of transport to process remote commands and forward notifications
|
||||
ctrlAsync :: Async (),
|
||||
accepted :: TMVar Bool
|
||||
accepted :: TMVar RemoteCtrlId
|
||||
}
|
||||
|
||||
@@ -37,6 +37,11 @@ getRemoteCtrl db remoteCtrlId =
|
||||
maybeFirstRow toRemoteCtrl $
|
||||
DB.query db (remoteCtrlQuery <> "WHERE remote_controller_id = ?") (DB.Only remoteCtrlId)
|
||||
|
||||
getRemoteCtrlByFingerprint :: DB.Connection -> C.KeyHash -> IO (Maybe RemoteCtrl)
|
||||
getRemoteCtrlByFingerprint db fingerprint =
|
||||
maybeFirstRow toRemoteCtrl $
|
||||
DB.query db (remoteCtrlQuery <> "WHERE fingerprint = ?") (DB.Only fingerprint)
|
||||
|
||||
remoteCtrlQuery :: DB.Query
|
||||
remoteCtrlQuery = "SELECT remote_controller_id, display_name, fingerprint, accepted FROM remote_controllers"
|
||||
|
||||
@@ -46,7 +51,7 @@ toRemoteCtrl (remoteCtrlId, displayName, fingerprint, accepted) =
|
||||
|
||||
markRemoteCtrlResolution :: DB.Connection -> RemoteCtrlId -> Bool -> IO ()
|
||||
markRemoteCtrlResolution db remoteCtrlId accepted =
|
||||
DB.execute db "UPDATE remote_controllers SET accepted = ? WHERE remote_controller_id = ?" (accepted, remoteCtrlId)
|
||||
DB.execute db "UPDATE remote_controllers SET accepted = ? WHERE remote_controller_id = ? AND accepted IS NULL" (accepted, remoteCtrlId)
|
||||
|
||||
deleteRemoteCtrl :: DB.Connection -> RemoteCtrlId -> IO ()
|
||||
deleteRemoteCtrl db remoteCtrlId =
|
||||
|
||||
Reference in New Issue
Block a user