core: improve stability of file transfer handshake by using async agent commands (#1541)
This commit is contained in:
@@ -1032,7 +1032,7 @@ public enum ChatErrorType: Decodable {
|
||||
case groupNotJoined(groupInfo: GroupInfo)
|
||||
case groupMemberNotActive
|
||||
case groupMemberUserRemoved
|
||||
case groupMemberNotFound(contactName: ContactName)
|
||||
case groupMemberNotFound
|
||||
case groupMemberIntroNotFound(contactName: ContactName)
|
||||
case groupCantResendInvitation(groupInfo: GroupInfo, contactName: ContactName)
|
||||
case groupInternal(message: String)
|
||||
|
||||
@@ -1407,48 +1407,44 @@ acceptFileReceive :: forall m. ChatMonad m => User -> RcvFileTransfer -> Maybe B
|
||||
acceptFileReceive user@User {userId} RcvFileTransfer {fileId, fileInvitation = FileInvitation {fileName = fName, fileConnReq, fileInline, fileSize}, fileStatus, grpMemberId} rcvInline_ filePath_ = do
|
||||
unless (fileStatus == RFSNew) $ case fileStatus of
|
||||
RFSCancelled _ -> throwChatError $ CEFileCancelled fName
|
||||
_ -> pure () -- throwChatError $ CEFileAlreadyReceiving fName
|
||||
_ -> throwChatError $ CEFileAlreadyReceiving fName
|
||||
case fileConnReq of
|
||||
-- direct file protocol
|
||||
Just connReq -> do
|
||||
agentConnId <- withAgent $ \a -> joinConnection a True connReq . directMessage $ XFileAcpt fName
|
||||
connIds <- joinAgentConnectionAsync user True connReq . directMessage $ XFileAcpt fName
|
||||
filePath <- getRcvFilePath fileId filePath_ fName
|
||||
withStore $ \db -> acceptRcvFileTransfer db user fileId agentConnId ConnJoined filePath
|
||||
withStore $ \db -> acceptRcvFileTransfer db user fileId connIds ConnJoined filePath
|
||||
-- group & direct file protocol
|
||||
Nothing -> do
|
||||
chatRef <- withStore $ \db -> getChatRefByFileId db user fileId
|
||||
case (chatRef, grpMemberId) of
|
||||
(ChatRef CTDirect contactId, Nothing) -> do
|
||||
ct <- withStore $ \db -> getContact db user contactId
|
||||
(msg, ci) <- acceptFile
|
||||
void $ sendDirectContactMessage ct msg
|
||||
pure ci
|
||||
acceptFile CFCreateConnFileInvDirect $ \msg -> void $ sendDirectContactMessage ct msg
|
||||
(ChatRef CTGroup groupId, Just memId) -> do
|
||||
GroupMember {activeConn} <- withStore $ \db -> getGroupMember db user groupId memId
|
||||
case activeConn of
|
||||
Just conn -> do
|
||||
(msg, ci) <- acceptFile
|
||||
void $ sendDirectMessage conn msg $ GroupId groupId
|
||||
pure ci
|
||||
acceptFile CFCreateConnFileInvGroup $ \msg -> void $ sendDirectMessage conn msg $ GroupId groupId
|
||||
_ -> throwChatError $ CEFileInternal "member connection not active"
|
||||
_ -> throwChatError $ CEFileInternal "invalid chat ref for file transfer"
|
||||
where
|
||||
acceptFile :: m (ChatMsgEvent 'Json, AChatItem)
|
||||
acceptFile = do
|
||||
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
|
||||
acceptFile :: CommandFunction -> (ChatMsgEvent 'Json -> m ()) -> m AChatItem
|
||||
acceptFile cmdFunction send = do
|
||||
filePath <- getRcvFilePath fileId filePath_ fName
|
||||
inline <- receiveInline
|
||||
if
|
||||
| inline -> do
|
||||
-- accepting inline
|
||||
ci <- withStore $ \db -> acceptRcvInlineFT db user fileId filePath
|
||||
pure (XFileAcptInv sharedMsgId Nothing fName, ci)
|
||||
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
|
||||
send $ XFileAcptInv sharedMsgId Nothing fName
|
||||
pure ci
|
||||
| fileInline == Just IFMSent -> throwChatError $ CEFileAlreadyReceiving fName
|
||||
| otherwise -> do
|
||||
-- accepting via a new connection
|
||||
(agentConnId, fileInvConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation Nothing
|
||||
ci <- withStore $ \db -> acceptRcvFileTransfer db user fileId agentConnId ConnNew filePath
|
||||
pure (XFileAcptInv sharedMsgId (Just fileInvConnReq) fName, ci)
|
||||
connIds <- createAgentConnectionAsync user cmdFunction True SCMInvitation
|
||||
withStore $ \db -> acceptRcvFileTransfer db user fileId connIds ConnNew filePath
|
||||
receiveInline :: m Bool
|
||||
receiveInline = do
|
||||
ChatConfig {fileChunkSize, inlineFiles = InlineFilesConfig {receiveChunks, offerChunks}} <- asks config
|
||||
@@ -2083,8 +2079,29 @@ processAgentMessage (Just user@User {userId}) corrId agentConnId agentMessage =
|
||||
_ -> pure ()
|
||||
|
||||
processRcvFileConn :: ACommand 'Agent -> Connection -> RcvFileTransfer -> m ()
|
||||
processRcvFileConn agentMsg conn ft =
|
||||
processRcvFileConn agentMsg conn ft@RcvFileTransfer {fileId, fileInvitation = FileInvitation {fileName}, grpMemberId} =
|
||||
case agentMsg of
|
||||
INV (ACR _ cReq) ->
|
||||
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} ->
|
||||
case cReq of
|
||||
fileInvConnReq@(CRInvitationUri _ _) -> case cmdFunction of
|
||||
-- [async agent commands] direct XFileAcptInv continuation on receiving INV
|
||||
CFCreateConnFileInvDirect -> do
|
||||
ct <- withStore $ \db -> getContactByFileId db user fileId
|
||||
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
|
||||
void $ sendDirectContactMessage ct (XFileAcptInv sharedMsgId (Just fileInvConnReq) fileName)
|
||||
-- [async agent commands] group XFileAcptInv continuation on receiving INV
|
||||
CFCreateConnFileInvGroup -> case grpMemberId of
|
||||
Just gMemberId -> do
|
||||
GroupMember {groupId, activeConn} <- withStore $ \db -> getGroupMemberById db user gMemberId
|
||||
case activeConn of
|
||||
Just gMemberConn -> do
|
||||
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
|
||||
void $ sendDirectMessage gMemberConn (XFileAcptInv sharedMsgId (Just fileInvConnReq) fileName) $ GroupId groupId
|
||||
_ -> throwChatError $ CECommandError "no GroupMember activeConn"
|
||||
_ -> throwChatError $ CECommandError "no grpMemberId"
|
||||
_ -> throwChatError $ CECommandError "unexpected cmdFunction"
|
||||
CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type"
|
||||
-- SMP CONF for RcvFileConnection happens for group file protocol
|
||||
-- when sender of the file "joins" connection created by the recipient
|
||||
-- (sender doesn't create connections for all group members)
|
||||
@@ -2534,7 +2551,7 @@ processAgentMessage (Just user@User {userId}) corrId agentConnId agentMessage =
|
||||
xFileAcptInvGroup g@GroupInfo {groupId} m@GroupMember {activeConn} sharedMsgId fileConnReq_ fName msgMeta = do
|
||||
checkIntegrityCreateItem (CDGroupRcv g m) msgMeta
|
||||
fileId <- withStore $ \db -> getGroupFileIdBySharedMsgId db userId groupId sharedMsgId
|
||||
-- TODO check that it's not already accpeted
|
||||
-- TODO check that it's not already accepted
|
||||
ft@FileTransferMeta {fileName, fileSize, fileInline, cancelled} <- withStore (\db -> getFileTransferMeta db user fileId)
|
||||
if fName == fileName
|
||||
then unless cancelled $ case (fileConnReq_, activeConn) of
|
||||
|
||||
@@ -87,6 +87,7 @@ module Simplex.Chat.Store
|
||||
getGroupMemberIdByName,
|
||||
getGroupInfoByName,
|
||||
getGroupMember,
|
||||
getGroupMemberById,
|
||||
getGroupMembers,
|
||||
getGroupMembersForExpiration,
|
||||
deleteGroupConnectionsAndFiles,
|
||||
@@ -153,6 +154,7 @@ module Simplex.Chat.Store
|
||||
createRcvGroupFileTransfer,
|
||||
getRcvFileTransfer,
|
||||
acceptRcvFileTransfer,
|
||||
getContactByFileId,
|
||||
acceptRcvInlineFT,
|
||||
startRcvInlineFT,
|
||||
updateRcvFileStatus,
|
||||
@@ -782,8 +784,6 @@ toContactOrError user (((contactId, profileId, localDisplayName, viaGroup, displ
|
||||
in Right Contact {contactId, localDisplayName, profile, activeConn, viaGroup, contactUsed, chatSettings, userPreferences, mergedPreferences, createdAt, updatedAt}
|
||||
_ -> Left $ SEContactNotReady localDisplayName
|
||||
|
||||
-- TODO return the last connection that is ready, not any last connection
|
||||
-- requires updating connection status
|
||||
getContactByName :: DB.Connection -> User -> ContactName -> ExceptT StoreError IO Contact
|
||||
getContactByName db user localDisplayName = do
|
||||
cId <- getContactIdByName db user localDisplayName
|
||||
@@ -1895,48 +1895,45 @@ toGroupInfo userContactId ((groupId, localDisplayName, displayName, fullName, de
|
||||
groupProfile = GroupProfile {displayName, fullName, description, image, groupPreferences}
|
||||
in GroupInfo {groupId, localDisplayName, groupProfile, fullGroupPreferences, membership, hostConnCustomUserProfileId, chatSettings, createdAt, updatedAt}
|
||||
|
||||
groupMemberQuery :: Query
|
||||
groupMemberQuery =
|
||||
[sql|
|
||||
SELECT
|
||||
m.group_member_id, m.group_id, m.member_id, m.member_role, m.member_category, m.member_status,
|
||||
m.invited_by, m.local_display_name, m.contact_id, m.contact_profile_id, p.contact_profile_id, p.display_name, p.full_name, p.image, p.local_alias, p.preferences,
|
||||
c.connection_id, c.agent_conn_id, c.conn_level, c.via_contact, c.via_user_contact_link, c.via_group_link, c.group_link_id, c.custom_user_profile_id,
|
||||
c.conn_status, c.conn_type, c.local_alias, c.contact_id, c.group_member_id, c.snd_file_id, c.rcv_file_id, c.user_contact_link_id, c.created_at, c.security_code, c.security_code_verified_at
|
||||
FROM group_members m
|
||||
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
|
||||
LEFT JOIN connections c ON c.connection_id = (
|
||||
SELECT max(cc.connection_id)
|
||||
FROM connections cc
|
||||
where cc.group_member_id = m.group_member_id
|
||||
)
|
||||
|]
|
||||
|
||||
getGroupMember :: DB.Connection -> User -> GroupId -> GroupMemberId -> ExceptT StoreError IO GroupMember
|
||||
getGroupMember db user@User {userId} groupId groupMemberId =
|
||||
ExceptT . firstRow (toContactMember user) (SEGroupMemberNotFound {groupId, groupMemberId}) $
|
||||
ExceptT . firstRow (toContactMember user) (SEGroupMemberNotFound groupMemberId) $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
m.group_member_id, m.group_id, m.member_id, m.member_role, m.member_category, m.member_status,
|
||||
m.invited_by, m.local_display_name, m.contact_id, m.contact_profile_id, p.contact_profile_id, p.display_name, p.full_name, p.image, p.local_alias, p.preferences,
|
||||
c.connection_id, c.agent_conn_id, c.conn_level, c.via_contact, c.via_user_contact_link, c.via_group_link, c.group_link_id, c.custom_user_profile_id,
|
||||
c.conn_status, c.conn_type, c.local_alias, c.contact_id, c.group_member_id, c.snd_file_id, c.rcv_file_id, c.user_contact_link_id, c.created_at, c.security_code, c.security_code_verified_at
|
||||
FROM group_members m
|
||||
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
|
||||
LEFT JOIN connections c ON c.connection_id = (
|
||||
SELECT max(cc.connection_id)
|
||||
FROM connections cc
|
||||
where cc.group_member_id = m.group_member_id
|
||||
)
|
||||
WHERE m.group_id = ? AND m.group_member_id = ? AND m.user_id = ?
|
||||
|]
|
||||
(groupMemberQuery <> " WHERE m.group_id = ? AND m.group_member_id = ? AND m.user_id = ?")
|
||||
(groupId, groupMemberId, userId)
|
||||
|
||||
getGroupMemberById :: DB.Connection -> User -> GroupMemberId -> ExceptT StoreError IO GroupMember
|
||||
getGroupMemberById db user@User {userId} groupMemberId =
|
||||
ExceptT . firstRow (toContactMember user) (SEGroupMemberNotFound groupMemberId) $
|
||||
DB.query
|
||||
db
|
||||
(groupMemberQuery <> " WHERE m.group_member_id = ? AND m.user_id = ?")
|
||||
(groupMemberId, userId)
|
||||
|
||||
getGroupMembers :: DB.Connection -> User -> GroupInfo -> IO [GroupMember]
|
||||
getGroupMembers db user@User {userId, userContactId} GroupInfo {groupId} = do
|
||||
map (toContactMember user)
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
m.group_member_id, m.group_id, m.member_id, m.member_role, m.member_category, m.member_status,
|
||||
m.invited_by, m.local_display_name, m.contact_id, m.contact_profile_id, p.contact_profile_id, p.display_name, p.full_name, p.image, p.local_alias, p.preferences,
|
||||
c.connection_id, c.agent_conn_id, c.conn_level, c.via_contact, c.via_user_contact_link, c.via_group_link, c.group_link_id, c.custom_user_profile_id,
|
||||
c.conn_status, c.conn_type, c.local_alias, c.contact_id, c.group_member_id, c.snd_file_id, c.rcv_file_id, c.user_contact_link_id, c.created_at, c.security_code, c.security_code_verified_at
|
||||
FROM group_members m
|
||||
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
|
||||
LEFT JOIN connections c ON c.connection_id = (
|
||||
SELECT max(cc.connection_id)
|
||||
FROM connections cc
|
||||
where cc.group_member_id = m.group_member_id
|
||||
)
|
||||
WHERE m.group_id = ? AND m.user_id = ? AND (m.contact_id IS NULL OR m.contact_id != ?)
|
||||
|]
|
||||
(groupMemberQuery <> " WHERE m.group_id = ? AND m.user_id = ? AND (m.contact_id IS NULL OR m.contact_id != ?)")
|
||||
(groupId, userId, userContactId)
|
||||
|
||||
getGroupMembersForExpiration :: DB.Connection -> User -> GroupInfo -> IO [GroupMember]
|
||||
@@ -1944,25 +1941,15 @@ getGroupMembersForExpiration db user@User {userId, userContactId} GroupInfo {gro
|
||||
map (toContactMember user)
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
m.group_member_id, m.group_id, m.member_id, m.member_role, m.member_category, m.member_status,
|
||||
m.invited_by, m.local_display_name, m.contact_id, m.contact_profile_id, p.contact_profile_id, p.display_name, p.full_name, p.image, p.local_alias,
|
||||
c.connection_id, c.agent_conn_id, c.conn_level, c.via_contact, c.via_user_contact_link, c.via_group_link, c.group_link_id, c.custom_user_profile_id,
|
||||
c.conn_status, c.conn_type, c.local_alias, c.contact_id, c.group_member_id, c.snd_file_id, c.rcv_file_id, c.user_contact_link_id, c.created_at, c.security_code, c.security_code_verified_at
|
||||
FROM group_members m
|
||||
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
|
||||
LEFT JOIN connections c ON c.connection_id = (
|
||||
SELECT max(cc.connection_id)
|
||||
FROM connections cc
|
||||
where cc.group_member_id = m.group_member_id
|
||||
)
|
||||
WHERE m.group_id = ? AND m.user_id = ? AND (m.contact_id IS NULL OR m.contact_id != ?)
|
||||
AND m.member_status IN (?, ?, ?)
|
||||
AND m.group_member_id NOT IN (
|
||||
SELECT DISTINCT group_member_id FROM chat_items
|
||||
)
|
||||
|]
|
||||
( groupMemberQuery
|
||||
<> [sql|
|
||||
WHERE m.group_id = ? AND m.user_id = ? AND (m.contact_id IS NULL OR m.contact_id != ?)
|
||||
AND m.member_status IN (?, ?, ?)
|
||||
AND m.group_member_id NOT IN (
|
||||
SELECT DISTINCT group_member_id FROM chat_items
|
||||
)
|
||||
|]
|
||||
)
|
||||
(groupId, userId, userContactId, GSMemRemoved, GSMemLeft, GSMemGroupDeleted)
|
||||
|
||||
toContactMember :: User -> (GroupMemberRow :. MaybeConnectionRow) -> GroupMember
|
||||
@@ -2753,16 +2740,27 @@ getRcvFileTransfer db user@User {userId} fileId = do
|
||||
_ -> pure Nothing
|
||||
cancelled = fromMaybe False cancelled_
|
||||
|
||||
acceptRcvFileTransfer :: DB.Connection -> User -> Int64 -> ConnId -> ConnStatus -> FilePath -> ExceptT StoreError IO AChatItem
|
||||
acceptRcvFileTransfer db user@User {userId} fileId agentConnId connStatus filePath = ExceptT $ do
|
||||
acceptRcvFileTransfer :: DB.Connection -> User -> Int64 -> (CommandId, ConnId) -> ConnStatus -> FilePath -> ExceptT StoreError IO AChatItem
|
||||
acceptRcvFileTransfer db user@User {userId} fileId (cmdId, acId) connStatus filePath = ExceptT $ do
|
||||
currentTs <- getCurrentTime
|
||||
acceptRcvFT_ db user fileId filePath currentTs
|
||||
DB.execute
|
||||
db
|
||||
"INSERT INTO connections (agent_conn_id, conn_status, conn_type, rcv_file_id, user_id, created_at, updated_at) VALUES (?,?,?,?,?,?,?)"
|
||||
(agentConnId, connStatus, ConnRcvFile, fileId, userId, currentTs, currentTs)
|
||||
(acId, connStatus, ConnRcvFile, fileId, userId, currentTs, currentTs)
|
||||
connId <- insertedRowId db
|
||||
setCommandConnId db user cmdId connId
|
||||
runExceptT $ getChatItemByFileId db user fileId
|
||||
|
||||
getContactByFileId :: DB.Connection -> User -> FileTransferId -> ExceptT StoreError IO Contact
|
||||
getContactByFileId db user@User {userId} fileId = do
|
||||
cId <- getContactIdByFileId
|
||||
getContact db user cId
|
||||
where
|
||||
getContactIdByFileId =
|
||||
ExceptT . firstRow fromOnly (SEContactNotFoundByFileId fileId) $
|
||||
DB.query db "SELECT contact_id FROM files WHERE user_id = ? AND file_id = ?" (userId, fileId)
|
||||
|
||||
acceptRcvInlineFT :: DB.Connection -> User -> Int64 -> FilePath -> ExceptT StoreError IO AChatItem
|
||||
acceptRcvInlineFT db user fileId filePath = do
|
||||
liftIO $ acceptRcvFT_ db user fileId filePath =<< getCurrentTime
|
||||
@@ -4628,7 +4626,7 @@ data StoreError
|
||||
| SEGroupNotFound {groupId :: GroupId}
|
||||
| SEGroupNotFoundByName {groupName :: GroupName}
|
||||
| SEGroupMemberNameNotFound {groupId :: GroupId, groupMemberName :: ContactName}
|
||||
| SEGroupMemberNotFound {groupId :: GroupId, groupMemberId :: GroupMemberId}
|
||||
| SEGroupMemberNotFound {groupMemberId :: GroupMemberId}
|
||||
| SEGroupWithoutUser
|
||||
| SEDuplicateGroupMember
|
||||
| SEGroupAlreadyJoined
|
||||
@@ -4657,6 +4655,7 @@ data StoreError
|
||||
| SEDuplicateGroupLink {groupInfo :: GroupInfo}
|
||||
| SEGroupLinkNotFound {groupInfo :: GroupInfo}
|
||||
| SEHostMemberIdNotFound {groupId :: Int64}
|
||||
| SEContactNotFoundByFileId {fileId :: FileTransferId}
|
||||
deriving (Show, Exception, Generic)
|
||||
|
||||
instance ToJSON StoreError where
|
||||
|
||||
@@ -1522,6 +1522,8 @@ instance TextEncoding CommandStatus where
|
||||
data CommandFunction
|
||||
= CFCreateConnGrpMemInv
|
||||
| CFCreateConnGrpInv
|
||||
| CFCreateConnFileInvDirect
|
||||
| CFCreateConnFileInvGroup
|
||||
| CFJoinConn
|
||||
| CFAllowConn
|
||||
| CFAcceptContact
|
||||
@@ -1537,6 +1539,8 @@ instance TextEncoding CommandFunction where
|
||||
textDecode = \case
|
||||
"create_conn" -> Just CFCreateConnGrpMemInv
|
||||
"create_conn_grp_inv" -> Just CFCreateConnGrpInv
|
||||
"create_conn_file_inv_direct" -> Just CFCreateConnFileInvDirect
|
||||
"create_conn_file_inv_group" -> Just CFCreateConnFileInvGroup
|
||||
"join_conn" -> Just CFJoinConn
|
||||
"allow_conn" -> Just CFAllowConn
|
||||
"accept_contact" -> Just CFAcceptContact
|
||||
@@ -1546,6 +1550,8 @@ instance TextEncoding CommandFunction where
|
||||
textEncode = \case
|
||||
CFCreateConnGrpMemInv -> "create_conn"
|
||||
CFCreateConnGrpInv -> "create_conn_grp_inv"
|
||||
CFCreateConnFileInvDirect -> "create_conn_file_inv_direct"
|
||||
CFCreateConnFileInvGroup -> "create_conn_file_inv_group"
|
||||
CFJoinConn -> "join_conn"
|
||||
CFAllowConn -> "allow_conn"
|
||||
CFAcceptContact -> "accept_contact"
|
||||
@@ -1556,6 +1562,8 @@ commandExpectedResponse :: CommandFunction -> ACommandTag 'Agent
|
||||
commandExpectedResponse = \case
|
||||
CFCreateConnGrpMemInv -> INV_
|
||||
CFCreateConnGrpInv -> INV_
|
||||
CFCreateConnFileInvDirect -> INV_
|
||||
CFCreateConnFileInvGroup -> INV_
|
||||
CFJoinConn -> OK_
|
||||
CFAllowConn -> OK_
|
||||
CFAcceptContact -> OK_
|
||||
|
||||
Reference in New Issue
Block a user