Prep for MS Teams plugin API (#25565)

* - columns added to ShareChannelRemotes: lastpostcreateat, lastpostupdateat
- SyncMsg and SyncResponse moved to `model` package
- field added to RemoteCluster struct: PluginID

* sync new posts before updated posts to ensure post order in MS Teams

* add plugid to remoteclusters table and store

* don't sync history by default
This commit is contained in:
Doug Lauder 2023-12-04 13:10:20 -05:00 committed by GitHub
parent b2ec1ff8ae
commit 8bf9e4c481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 471 additions and 132 deletions

View File

@ -230,6 +230,8 @@ channels/db/migrations/mysql/000115_user_reporting_changes.down.sql
channels/db/migrations/mysql/000115_user_reporting_changes.up.sql
channels/db/migrations/mysql/000116_create_outgoing_oauth_connections.down.sql
channels/db/migrations/mysql/000116_create_outgoing_oauth_connections.up.sql
channels/db/migrations/mysql/000117_msteams_shared_channels.down.sql
channels/db/migrations/mysql/000117_msteams_shared_channels.up.sql
channels/db/migrations/postgres/000001_create_teams.down.sql
channels/db/migrations/postgres/000001_create_teams.up.sql
channels/db/migrations/postgres/000002_create_team_members.down.sql
@ -460,3 +462,5 @@ channels/db/migrations/postgres/000115_user_reporting_changes.down.sql
channels/db/migrations/postgres/000115_user_reporting_changes.up.sql
channels/db/migrations/postgres/000116_create_outgoing_oauth_connections.down.sql
channels/db/migrations/postgres/000116_create_outgoing_oauth_connections.up.sql
channels/db/migrations/postgres/000117_msteams_shared_channels.down.sql
channels/db/migrations/postgres/000117_msteams_shared_channels.up.sql

View File

@ -0,0 +1,47 @@
SET @preparedStatement = (SELECT IF(
EXISTS(
SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_name = 'RemoteClusters'
AND table_schema = DATABASE()
AND column_name = 'PluginID'
),
'ALTER TABLE RemoteClusters DROP COLUMN PluginID;',
'SELECT 1;'
));
PREPARE removeColumnIfExists FROM @preparedStatement;
EXECUTE removeColumnIfExists;
DEALLOCATE PREPARE removeColumnIfExists;
SET @preparedStatement = (SELECT IF(
EXISTS(
SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_name = 'SharedChannelRemotes'
AND table_schema = DATABASE()
AND column_name = 'LastPostCreateAt'
),
'ALTER TABLE SharedChannelRemotes DROP COLUMN LastPostCreateAt;',
'SELECT 1;'
));
PREPARE removeColumnIfExists FROM @preparedStatement;
EXECUTE removeColumnIfExists;
DEALLOCATE PREPARE removeColumnIfExists;
SET @preparedStatement = (SELECT IF(
EXISTS(
SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_name = 'SharedChannelRemotes'
AND table_schema = DATABASE()
AND column_name = 'LastPostCreateID'
),
'ALTER TABLE SharedChannelRemotes DROP COLUMN LastPostCreateID;',
'SELECT 1;'
));
PREPARE removeColumnIfExists FROM @preparedStatement;
EXECUTE removeColumnIfExists;
DEALLOCATE PREPARE removeColumnIfExists;

View File

@ -0,0 +1,47 @@
SET @preparedStatement = (SELECT IF(
NOT EXISTS(
SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'RemoteClusters'
AND table_schema = DATABASE()
AND column_name = 'PluginID'
),
'ALTER TABLE RemoteClusters ADD COLUMN PluginID varchar(190) NOT NULL DEFAULT \'\';',
'SELECT 1;'
));
PREPARE addColumnIfNotExists FROM @preparedStatement;
EXECUTE addColumnIfNotExists;
DEALLOCATE PREPARE addColumnIfNotExists;
SET @preparedStatement = (SELECT IF(
NOT EXISTS(
SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'SharedChannelRemotes'
AND table_schema = DATABASE()
AND column_name = 'LastPostCreateAt'
),
'ALTER TABLE SharedChannelRemotes ADD COLUMN LastPostCreateAt bigint NOT NULL DEFAULT 0;',
'SELECT 1;'
));
PREPARE addColumnIfNotExists FROM @preparedStatement;
EXECUTE addColumnIfNotExists;
DEALLOCATE PREPARE addColumnIfNotExists;
SET @preparedStatement = (SELECT IF(
NOT EXISTS(
SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'SharedChannelRemotes'
AND table_schema = DATABASE()
AND column_name = 'LastPostCreateID'
),
'ALTER TABLE SharedChannelRemotes ADD COLUMN LastPostCreateID varchar(26);',
'SELECT 1;'
));
PREPARE addColumnIfNotExists FROM @preparedStatement;
EXECUTE addColumnIfNotExists;
DEALLOCATE PREPARE addColumnIfNotExists;

View File

@ -0,0 +1,6 @@
ALTER TABLE RemoteClusters DROP COLUMN IF EXISTS PluginID;
ALTER TABLE SharedChannelRemotes DROP COLUMN IF EXISTS LastPostCreateAt;
ALTER TABLE SharedChannelRemotes DROP COLUMN IF EXISTS LastPostCreateID;

View File

@ -0,0 +1,6 @@
ALTER TABLE RemoteClusters ADD COLUMN IF NOT EXISTS PluginID VARCHAR(190) NOT NULL DEFAULT '';
ALTER TABLE SharedChannelRemotes ADD COLUMN IF NOT EXISTS LastPostCreateAt bigint NOT NULL DEFAULT 0;
ALTER TABLE SharedChannelRemotes ADD COLUMN IF NOT EXISTS LastPostCreateID VARCHAR(26);

View File

@ -13,9 +13,10 @@ import (
"sync"
"time"
sq "github.com/mattermost/squirrel"
"github.com/pkg/errors"
sq "github.com/mattermost/squirrel"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
@ -1396,10 +1397,22 @@ func (s *SqlPostStore) GetPostsSinceForSync(options model.GetPostsSinceForSyncOp
query := s.getQueryBuilder().
Select("*").
From("Posts").
Where(sq.Or{sq.Gt{"Posts.UpdateAt": cursor.LastPostUpdateAt}, sq.And{sq.Eq{"Posts.UpdateAt": cursor.LastPostUpdateAt}, sq.Gt{"Posts.Id": cursor.LastPostId}}}).
OrderBy("Posts.UpdateAt", "Id").
Limit(uint64(limit))
if options.SinceCreateAt {
query = query.Where(sq.Or{
sq.Gt{"Posts.CreateAt": cursor.LastPostCreateAt},
sq.And{
sq.Eq{"Posts.CreateAt": cursor.LastPostCreateAt},
sq.Gt{"Posts.Id": cursor.LastPostCreateID},
},
})
} else {
query = query.Where(sq.Or{sq.Gt{"Posts.UpdateAt": cursor.LastPostUpdateAt},
sq.And{sq.Eq{"Posts.UpdateAt": cursor.LastPostUpdateAt}, sq.Gt{"Posts.Id": cursor.LastPostUpdateID}}})
}
if options.ChannelId != "" {
query = query.Where(sq.Eq{"Posts.ChannelId": options.ChannelId})
}
@ -1424,8 +1437,13 @@ func (s *SqlPostStore) GetPostsSinceForSync(options model.GetPostsSinceForSyncOp
}
if len(posts) != 0 {
cursor.LastPostUpdateAt = posts[len(posts)-1].UpdateAt
cursor.LastPostId = posts[len(posts)-1].Id
if options.SinceCreateAt {
cursor.LastPostCreateAt = posts[len(posts)-1].CreateAt
cursor.LastPostCreateID = posts[len(posts)-1].Id
} else {
cursor.LastPostUpdateAt = posts[len(posts)-1].UpdateAt
cursor.LastPostUpdateID = posts[len(posts)-1].Id
}
}
return posts, cursor, nil
}

View File

@ -7,9 +7,10 @@ import (
"fmt"
"strings"
sq "github.com/mattermost/squirrel"
"github.com/pkg/errors"
sq "github.com/mattermost/squirrel"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/v8/channels/store"
)
@ -38,6 +39,7 @@ func remoteClusterFields(prefix string) []string {
prefix + "RemoteToken",
prefix + "Topics",
prefix + "CreatorId",
prefix + "PluginID",
}
}
@ -49,10 +51,10 @@ func (s sqlRemoteClusterStore) Save(remoteCluster *model.RemoteCluster) (*model.
query := `INSERT INTO RemoteClusters
(RemoteId, RemoteTeamId, Name, DisplayName, SiteURL, CreateAt,
LastPingAt, Token, RemoteToken, Topics, CreatorId)
LastPingAt, Token, RemoteToken, Topics, CreatorId, PluginID)
VALUES
(:RemoteId, :RemoteTeamId, :Name, :DisplayName, :SiteURL, :CreateAt,
:LastPingAt, :Token, :RemoteToken, :Topics, :CreatorId)`
:LastPingAt, :Token, :RemoteToken, :Topics, :CreatorId, :PluginID)`
if _, err := s.GetMasterX().NamedExec(query, remoteCluster); err != nil {
return nil, errors.Wrap(err, "failed to save RemoteCluster")
@ -75,7 +77,8 @@ func (s sqlRemoteClusterStore) Update(remoteCluster *model.RemoteCluster) (*mode
CreatorId = :CreatorId,
DisplayName = :DisplayName,
SiteURL = :SiteURL,
Topics = :Topics
Topics = :Topics,
PluginID = :PluginID
WHERE RemoteId = :RemoteId AND Name = :Name`
if _, err := s.GetMasterX().NamedExec(query, remoteCluster); err != nil {
@ -149,6 +152,10 @@ func (s sqlRemoteClusterStore) GetAll(filter model.RemoteClusterQueryFilter) ([]
query = query.Where(sq.NotEq{"rc.SiteURL": ""})
}
if filter.PluginID != "" {
query = query.Where(sq.Eq{"rc.PluginID": filter.PluginID})
}
if filter.Topic != "" {
trimmed := strings.TrimSpace(filter.Topic)
if trimmed == "" || trimmed == "*" {

View File

@ -11,8 +11,9 @@ import (
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/v8/channels/store"
sq "github.com/mattermost/squirrel"
"github.com/pkg/errors"
sq "github.com/mattermost/squirrel"
)
const (
@ -333,8 +334,10 @@ func (s SqlSharedChannelStore) SaveRemote(remote *model.SharedChannelRemote) (*m
}
query, args, err := s.getQueryBuilder().Insert("SharedChannelRemotes").
Columns("Id", "ChannelId", "CreatorId", "CreateAt", "UpdateAt", "IsInviteAccepted", "IsInviteConfirmed", "RemoteId", "LastPostUpdateAt", "LastPostId").
Values(remote.Id, remote.ChannelId, remote.CreatorId, remote.CreateAt, remote.UpdateAt, remote.IsInviteAccepted, remote.IsInviteConfirmed, remote.RemoteId, remote.LastPostUpdateAt, remote.LastPostId).
Columns("Id", "ChannelId", "CreatorId", "CreateAt", "UpdateAt", "IsInviteAccepted", "IsInviteConfirmed", "RemoteId",
"LastPostCreateAt", "LastPostCreateId", "LastPostUpdateAt", "LastPostId").
Values(remote.Id, remote.ChannelId, remote.CreatorId, remote.CreateAt, remote.UpdateAt, remote.IsInviteAccepted, remote.IsInviteConfirmed, remote.RemoteId,
remote.LastPostCreateAt, remote.LastPostCreateID, remote.LastPostUpdateAt, remote.LastPostUpdateID).
ToSql()
if err != nil {
return nil, errors.Wrapf(err, "savesharedchannelremote_tosql")
@ -359,8 +362,10 @@ func (s SqlSharedChannelStore) UpdateRemote(remote *model.SharedChannelRemote) (
Set("IsInviteAccepted", remote.IsInviteAccepted).
Set("IsInviteConfirmed", remote.IsInviteConfirmed).
Set("RemoteId", remote.RemoteId).
Set("LastPostCreateAt", remote.LastPostUpdateAt).
Set("LastPostCreateId", remote.LastPostCreateID).
Set("LastPostUpdateAt", remote.LastPostUpdateAt).
Set("LastPostId", remote.LastPostId).
Set("LastPostId", remote.LastPostUpdateID).
Where(sq.And{
sq.Eq{"Id": remote.Id},
sq.Eq{"ChannelId": remote.ChannelId},
@ -398,8 +403,10 @@ func sharedChannelRemoteFields(prefix string) []string {
prefix + "IsInviteAccepted",
prefix + "IsInviteConfirmed",
prefix + "RemoteId",
prefix + "LastPostCreateAt",
"COALESCE(" + prefix + "LastPostCreateID,'') AS LastPostCreateID",
prefix + "LastPostUpdateAt",
prefix + "LastPostId",
"COALESCE(" + prefix + "LastPostId,'') AS LastPostUpdateID",
}
}
@ -532,14 +539,32 @@ func (s SqlSharedChannelStore) GetRemoteForUser(remoteId string, userId string)
return &rc, nil
}
// UpdateRemoteCursor updates the LastPostUpdateAt timestamp and LastPostId for the specified SharedChannelRemote.
// UpdateRemoteCursor updates the cursor for the specified SharedChannelRemote.
func (s SqlSharedChannelStore) UpdateRemoteCursor(id string, cursor model.GetPostsSinceForSyncCursor) error {
squery, args, err := s.getQueryBuilder().
var updateNeeded bool
builder := s.getQueryBuilder().
Update("SharedChannelRemotes").
Set("LastPostUpdateAt", cursor.LastPostUpdateAt).
Set("LastPostId", cursor.LastPostId).
Where(sq.Eq{"Id": id}).
ToSql()
Where(sq.Eq{"Id": id})
if cursor.LastPostCreateAt > 0 || cursor.LastPostCreateID != "" {
builder = builder.Set("LastPostCreateAt", cursor.LastPostCreateAt)
builder = builder.Set("LastPostCreateId", cursor.LastPostCreateID)
updateNeeded = true
}
if cursor.LastPostUpdateAt > 0 || cursor.LastPostUpdateID != "" {
builder = builder.Set("LastPostUpdateAt", cursor.LastPostUpdateAt)
builder = builder.Set("LastPostId", cursor.LastPostUpdateID)
updateNeeded = true
}
if !updateNeeded {
// no new cursor provided.
return fmt.Errorf("cursor empty")
}
squery, args, err := builder.ToSql()
if err != nil {
return errors.Wrap(err, "update_shared_channel_remote_cursor_tosql")
}

View File

@ -58,7 +58,8 @@ func TestPostStore(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) {
t.Run("GetDirectPostParentsForExportAfterBatched", func(t *testing.T) { testPostStoreGetDirectPostParentsForExportAfterBatched(t, rctx, ss, s) })
t.Run("GetForThread", func(t *testing.T) { testPostStoreGetForThread(t, rctx, ss) })
t.Run("HasAutoResponsePostByUserSince", func(t *testing.T) { testHasAutoResponsePostByUserSince(t, rctx, ss) })
t.Run("GetPostsSinceForSync", func(t *testing.T) { testGetPostsSinceForSync(t, rctx, ss, s) })
t.Run("GetPostsSinceUpdateForSync", func(t *testing.T) { testGetPostsSinceUpdateForSync(t, rctx, ss, s) })
t.Run("GetPostsSinceCreateForSync", func(t *testing.T) { testGetPostsSinceCreateForSync(t, rctx, ss, s) })
t.Run("SetPostReminder", func(t *testing.T) { testSetPostReminder(t, rctx, ss, s) })
t.Run("GetPostReminders", func(t *testing.T) { testGetPostReminders(t, rctx, ss, s) })
t.Run("GetPostReminderMetadata", func(t *testing.T) { testGetPostReminderMetadata(t, rctx, ss, s) })
@ -4655,7 +4656,7 @@ func testHasAutoResponsePostByUserSince(t *testing.T, rctx request.CTX, ss store
})
}
func testGetPostsSinceForSync(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) {
func testGetPostsSinceUpdateForSync(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) {
// create some posts.
channelID := model.NewId()
remoteID := model.NewString(model.NewId())
@ -4758,6 +4759,113 @@ func testGetPostsSinceForSync(t *testing.T, rctx request.CTX, ss store.Store, s
})
}
func testGetPostsSinceCreateForSync(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) {
// create some posts.
channelID := model.NewId()
remoteID := model.NewString(model.NewId())
first := model.GetMillis()
data := []*model.Post{
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 0"},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 1"},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 2"},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 3", RemoteId: remoteID},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 4", RemoteId: remoteID},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 5", RemoteId: remoteID},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 6", RemoteId: remoteID},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 7"},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 8", DeleteAt: model.GetMillis()},
{Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 9", DeleteAt: model.GetMillis()},
}
for i, p := range data {
p.CreateAt = first + (int64(i) * 300000)
if p.RemoteId == nil {
p.RemoteId = model.NewString(model.NewId())
}
_, err := ss.Post().Save(p)
require.NoError(t, err, "couldn't save post")
}
t.Run("Invalid channel id", func(t *testing.T) {
opt := model.GetPostsSinceForSyncOptions{
ChannelId: model.NewId(),
SinceCreateAt: true,
}
cursor := model.GetPostsSinceForSyncCursor{}
posts, cursorOut, err := ss.Post().GetPostsSinceForSync(opt, cursor, 100)
require.NoError(t, err)
require.Empty(t, posts, "should return zero posts")
require.Equal(t, cursor, cursorOut)
})
t.Run("Get by channel, exclude remotes, exclude deleted", func(t *testing.T) {
opt := model.GetPostsSinceForSyncOptions{
ChannelId: channelID,
ExcludeRemoteId: *remoteID,
SinceCreateAt: true,
}
cursor := model.GetPostsSinceForSyncCursor{}
posts, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 100)
require.NoError(t, err)
require.ElementsMatch(t, getPostIds(data[0:3], data[7]), getPostIds(posts))
})
t.Run("Include deleted", func(t *testing.T) {
opt := model.GetPostsSinceForSyncOptions{
ChannelId: channelID,
IncludeDeleted: true,
SinceCreateAt: true,
}
cursor := model.GetPostsSinceForSyncCursor{}
posts, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 100)
require.NoError(t, err)
require.ElementsMatch(t, getPostIds(data), getPostIds(posts))
})
t.Run("Limit and cursor", func(t *testing.T) {
opt := model.GetPostsSinceForSyncOptions{
ChannelId: channelID,
SinceCreateAt: true,
}
cursor := model.GetPostsSinceForSyncCursor{}
posts1, cursor, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5)
require.NoError(t, err)
require.Len(t, posts1, 5, "should get 5 posts")
posts2, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5)
require.NoError(t, err)
require.Len(t, posts2, 3, "should get 3 posts")
require.ElementsMatch(t, getPostIds(data[0:8]), getPostIds(posts1, posts2...))
})
t.Run("CreateAt collisions", func(t *testing.T) {
// this test requires all the CreateAt timestamps to be the same.
result, err := s.GetMasterX().Exec("UPDATE Posts SET CreateAt = ?", model.GetMillis())
require.NoError(t, err)
rows, err := result.RowsAffected()
require.NoError(t, err)
require.Greater(t, rows, int64(0))
opt := model.GetPostsSinceForSyncOptions{
ChannelId: channelID,
}
cursor := model.GetPostsSinceForSyncCursor{}
posts1, cursor, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5)
require.NoError(t, err)
require.Len(t, posts1, 5, "should get 5 posts")
posts2, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5)
require.NoError(t, err)
require.Len(t, posts2, 3, "should get 3 posts")
require.ElementsMatch(t, getPostIds(data[0:8]), getPostIds(posts1, posts2...))
})
}
func testSetPostReminder(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) {
// Basic
userID := NewTestId()

View File

@ -15,6 +15,10 @@ import (
"github.com/stretchr/testify/require"
)
const (
testPluginID = "com.sample.blap"
)
func TestRemoteClusterStore(t *testing.T, rctx request.CTX, ss store.Store) {
t.Run("RemoteClusterGetAllInChannel", func(t *testing.T) { testRemoteClusterGetAllInChannel(t, rctx, ss) })
t.Run("RemoteClusterGetAllNotInChannel", func(t *testing.T) { testRemoteClusterGetAllNotInChannel(t, rctx, ss) })
@ -89,6 +93,7 @@ func testRemoteClusterGet(t *testing.T, rctx request.CTX, ss store.Store) {
Name: "shortlived_remote_2",
SiteURL: "nowhere.com",
CreatorId: model.NewId(),
PluginID: testPluginID,
}
rcSaved, err := ss.RemoteCluster().Save(rc)
require.NoError(t, err)
@ -96,6 +101,7 @@ func testRemoteClusterGet(t *testing.T, rctx request.CTX, ss store.Store) {
rcGet, err := ss.RemoteCluster().Get(rcSaved.RemoteId)
require.NoError(t, err)
require.Equal(t, rcSaved.RemoteId, rcGet.RemoteId)
require.Equal(t, testPluginID, rcGet.PluginID)
})
t.Run("Get not found", func(t *testing.T) {
@ -237,8 +243,8 @@ func testRemoteClusterGetAllInChannel(t *testing.T, rctx request.CTX, ss store.S
// Create some remote clusters
rcData := []*model.RemoteCluster{
{Name: "AAAA_Inc", CreatorId: userId, SiteURL: "aaaa.com", RemoteId: model.NewId(), LastPingAt: now},
{Name: "BBBB_Inc", CreatorId: userId, SiteURL: "bbbb.com", RemoteId: model.NewId(), LastPingAt: 0},
{Name: "AAAA_Inc", CreatorId: userId, SiteURL: "aaaa.com", RemoteId: model.NewId(), LastPingAt: now, PluginID: testPluginID},
{Name: "BBBB_Inc", CreatorId: userId, SiteURL: "bbbb.com", RemoteId: model.NewId(), LastPingAt: 0, PluginID: testPluginID},
{Name: "CCCC_Inc", CreatorId: userId, SiteURL: "cccc.com", RemoteId: model.NewId(), LastPingAt: now},
{Name: "DDDD_Inc", CreatorId: userId, SiteURL: "dddd.com", RemoteId: model.NewId(), LastPingAt: now},
{Name: "EEEE_Inc", CreatorId: userId, SiteURL: "eeee.com", RemoteId: model.NewId(), LastPingAt: 0},
@ -270,6 +276,8 @@ func testRemoteClusterGetAllInChannel(t *testing.T, rctx request.CTX, ss store.S
require.Len(t, list, 2, "channel 1 should have 2 remote clusters")
ids := getIds(list)
require.ElementsMatch(t, []string{rcData[0].RemoteId, rcData[1].RemoteId}, ids)
require.Equal(t, testPluginID, rcData[0].PluginID)
require.Equal(t, testPluginID, rcData[1].PluginID)
})
t.Run("Channel 1 online only", func(t *testing.T) {

View File

@ -765,28 +765,52 @@ func testUpdateSharedChannelRemoteCursor(t *testing.T, rctx request.CTX, ss stor
remoteSaved, err := ss.SharedChannel().SaveRemote(remote)
require.NoError(t, err, "couldn't save remote", err)
future := model.GetMillis() + 3600000 // 1 hour in the future
postID := model.NewId()
futureCreateAt := model.GetMillis() + 3600000 // 1 hour in the future
postCreateID := model.NewId()
cursor := model.GetPostsSinceForSyncCursor{
LastPostUpdateAt: future,
LastPostId: postID,
futureUpdateAt := model.GetMillis() + (3600000 * 2) // 2 hours in the future
postUpdateID := model.NewId()
cursorCreate := model.GetPostsSinceForSyncCursor{
LastPostCreateAt: futureCreateAt,
LastPostCreateID: postCreateID,
}
t.Run("Update NextSyncAt for remote", func(t *testing.T) {
err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, cursor)
require.NoError(t, err, "update NextSyncAt should not error", err)
cursorUpdate := model.GetPostsSinceForSyncCursor{
LastPostUpdateAt: futureUpdateAt,
LastPostUpdateID: postUpdateID,
}
t.Run("Update cursor CreateAt for remote", func(t *testing.T) {
err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, cursorCreate)
require.NoError(t, err, "update cursor should not error", err)
r, err := ss.SharedChannel().GetRemote(remoteSaved.Id)
require.NoError(t, err)
require.Equal(t, future, r.LastPostUpdateAt)
require.Equal(t, postID, r.LastPostId)
require.Equal(t, futureCreateAt, r.LastPostCreateAt)
require.Equal(t, postCreateID, r.LastPostCreateID)
})
t.Run("Update NextSyncAt for non-existent shared channel remote", func(t *testing.T) {
err := ss.SharedChannel().UpdateRemoteCursor(model.NewId(), cursor)
t.Run("Update cursor UpdateAt for remote", func(t *testing.T) {
err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, cursorUpdate)
require.NoError(t, err, "update cursor should not error", err)
r, err := ss.SharedChannel().GetRemote(remoteSaved.Id)
require.NoError(t, err)
require.Equal(t, futureUpdateAt, r.LastPostUpdateAt)
require.Equal(t, postUpdateID, r.LastPostUpdateID)
})
t.Run("Update cursor for non-existent shared channel remote", func(t *testing.T) {
err := ss.SharedChannel().UpdateRemoteCursor(model.NewId(), cursorUpdate)
require.Error(t, err, "update non-existent remote should error", err)
})
t.Run("Update with empty cursor", func(t *testing.T) {
emptyCursor := model.GetPostsSinceForSyncCursor{}
err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, emptyCursor)
require.Error(t, err, "update with empty cursor should error", err)
})
}
func testDeleteSharedChannelRemote(t *testing.T, rctx request.CTX, ss store.Store) {

View File

@ -106,6 +106,7 @@ func (rcs *Service) pingRemote(rc *model.RemoteCluster) error {
if err != nil {
return err
}
rc.LastPingAt = model.GetMillis()
ping := model.RemoteClusterPing{}
err = json.Unmarshal(resp, &ping)
@ -120,7 +121,6 @@ func (rcs *Service) pingRemote(rc *model.RemoteCluster) error {
mlog.Err(err),
)
}
rc.LastPingAt = model.GetMillis()
if metrics := rcs.server.GetMetrics(); metrics != nil {
sentAt := time.Unix(0, ping.SentAt*int64(time.Millisecond))

View File

@ -87,6 +87,8 @@ func (scs *Service) SendChannelInvite(channel *model.Channel, userId string, rc
RemoteId: rc.RemoteId,
IsInviteAccepted: true,
IsInviteConfirmed: true,
LastPostCreateAt: model.GetMillis(),
LastPostUpdateAt: model.GetMillis(),
}
if _, err = scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil {
scs.sendEphemeralPost(channel.Id, userId, fmt.Sprintf("Error confirming channel invite for %s: %v", rc.DisplayName, err))
@ -169,6 +171,8 @@ func (scs *Service) onReceiveChannelInvite(msg model.RemoteClusterMsg, rc *model
IsInviteAccepted: true,
IsInviteConfirmed: true,
RemoteId: rc.RemoteId,
LastPostCreateAt: model.GetMillis(),
LastPostUpdateAt: model.GetMillis(),
}
if _, err := scs.server.GetStore().SharedChannel().SaveRemote(sharedChannelRemote); err != nil {

View File

@ -1,43 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sharedchannel
import (
"encoding/json"
"github.com/mattermost/mattermost/server/public/model"
)
// syncMsg represents a change in content (post add/edit/delete, reaction add/remove, users).
// It is sent to remote clusters as the payload of a `RemoteClusterMsg`.
type syncMsg struct {
Id string `json:"id"`
ChannelId string `json:"channel_id"`
Users map[string]*model.User `json:"users,omitempty"`
Posts []*model.Post `json:"posts,omitempty"`
Reactions []*model.Reaction `json:"reactions,omitempty"`
}
func newSyncMsg(channelID string) *syncMsg {
return &syncMsg{
Id: model.NewId(),
ChannelId: channelID,
}
}
func (sm *syncMsg) ToJSON() ([]byte, error) {
b, err := json.Marshal(sm)
if err != nil {
return nil, err
}
return b, nil
}
func (sm *syncMsg) String() string {
json, err := sm.ToJSON()
if err != nil {
return ""
}
return string(json)
}

View File

@ -1,16 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sharedchannel
type SyncResponse struct {
UsersLastUpdateAt int64 `json:"users_last_update_at"`
UserErrors []string `json:"user_errors"`
UsersSyncd []string `json:"users_syncd"`
PostsLastUpdateAt int64 `json:"posts_last_update_at"`
PostErrors []string `json:"post_errors"`
ReactionsLastUpdateAt int64 `json:"reactions_last_update_at"`
ReactionErrors []string `json:"reaction_errors"`
}

View File

@ -23,7 +23,7 @@ const (
TopicChannelInvite = "sharedchannel_invite"
TopicUploadCreate = "sharedchannel_upload"
MaxRetries = 3
MaxPostsPerSync = 12 // a bit more than one typical screenfull of posts
MaxPostsPerSync = 50 // a bit more than 4 typical screenfulls of posts
MaxUsersPerSync = 25
NotifyRemoteOfflineThreshold = time.Second * 10
NotifyMinimumDelay = time.Second * 2

View File

@ -33,7 +33,7 @@ func (scs *Service) onReceiveSyncMessage(msg model.RemoteClusterMsg, rc *model.R
)
}
var sm syncMsg
var sm model.SyncMsg
if err := json.Unmarshal(msg.Payload, &sm); err != nil {
return fmt.Errorf("invalid sync message: %w", err)
@ -41,12 +41,12 @@ func (scs *Service) onReceiveSyncMessage(msg model.RemoteClusterMsg, rc *model.R
return scs.processSyncMessage(request.EmptyContext(scs.server.Log()), &sm, rc, response)
}
func (scs *Service) processSyncMessage(c request.CTX, syncMsg *syncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
func (scs *Service) processSyncMessage(c request.CTX, syncMsg *model.SyncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
var channel *model.Channel
var team *model.Team
var err error
syncResp := SyncResponse{
syncResp := model.SyncResponse{
UserErrors: make([]string, 0),
UsersSyncd: make([]string, 0),
PostErrors: make([]string, 0),

View File

@ -21,11 +21,11 @@ type syncTask struct {
remoteID string
AddedAt time.Time
retryCount int
retryMsg *syncMsg
retryMsg *model.SyncMsg
schedule time.Time
}
func newSyncTask(channelID string, remoteID string, retryMsg *syncMsg) syncTask {
func newSyncTask(channelID string, remoteID string, retryMsg *model.SyncMsg) syncTask {
var retryID string
if retryMsg != nil {
retryID = retryMsg.Id
@ -302,7 +302,7 @@ func (scs *Service) handlePostError(postId string, task syncTask, rc *model.Remo
return
}
syncMsg := newSyncMsg(task.channelID)
syncMsg := model.NewSyncMsg(task.channelID)
syncMsg.Posts = []*model.Post{post}
scs.addTask(newSyncTask(task.channelID, task.remoteID, syncMsg))
@ -349,8 +349,10 @@ func (scs *Service) updateCursorForRemote(scrId string, rc *model.RemoteCluster,
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "updated cursor for remote",
mlog.String("remote_id", rc.RemoteId),
mlog.String("remote", rc.DisplayName),
mlog.Int("last_post_create_at", cursor.LastPostCreateAt),
mlog.String("last_post_create_id", cursor.LastPostCreateID),
mlog.Int("last_post_update_at", cursor.LastPostUpdateAt),
mlog.String("last_post_id", cursor.LastPostId),
mlog.String("last_post_update_id", cursor.LastPostUpdateID),
)
}

View File

@ -14,10 +14,11 @@ import (
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
)
type sendSyncMsgResultFunc func(syncResp SyncResponse, err error)
type sendSyncMsgResultFunc func(syncResp model.SyncResponse, err error)
type attachment struct {
fi *model.FileInfo
@ -41,12 +42,15 @@ type syncData struct {
func newSyncData(task syncTask, rc *model.RemoteCluster, scr *model.SharedChannelRemote) *syncData {
return &syncData{
task: task,
rc: rc,
scr: scr,
users: make(map[string]*model.User),
profileImages: make(map[string]*model.User),
resultNextCursor: model.GetPostsSinceForSyncCursor{LastPostUpdateAt: scr.LastPostUpdateAt, LastPostId: scr.LastPostId},
task: task,
rc: rc,
scr: scr,
users: make(map[string]*model.User),
profileImages: make(map[string]*model.User),
resultNextCursor: model.GetPostsSinceForSyncCursor{
LastPostUpdateAt: scr.LastPostUpdateAt, LastPostUpdateID: scr.LastPostUpdateID,
LastPostCreateAt: scr.LastPostCreateAt, LastPostCreateID: scr.LastPostCreateID,
},
}
}
@ -55,7 +59,12 @@ func (sd *syncData) isEmpty() bool {
}
func (sd *syncData) isCursorChanged() bool {
return sd.scr.LastPostUpdateAt != sd.resultNextCursor.LastPostUpdateAt || sd.scr.LastPostId != sd.resultNextCursor.LastPostId
if sd.resultNextCursor.IsEmpty() {
return false
}
return sd.scr.LastPostCreateAt != sd.resultNextCursor.LastPostCreateAt || sd.scr.LastPostCreateID != sd.resultNextCursor.LastPostCreateID ||
sd.scr.LastPostUpdateAt != sd.resultNextCursor.LastPostUpdateAt || sd.scr.LastPostUpdateID != sd.resultNextCursor.LastPostUpdateID
}
// syncForRemote updates a remote cluster with any new posts/reactions for a specific
@ -186,43 +195,67 @@ func (scs *Service) fetchUsersForSync(sd *syncData) error {
return nil
}
// fetchPostsForSync populates the sync data with any new posts since the last sync.
// fetchPostsForSync populates the sync data with any new or edited posts since the last sync.
func (scs *Service) fetchPostsForSync(sd *syncData) error {
options := model.GetPostsSinceForSyncOptions{
ChannelId: sd.task.channelID,
IncludeDeleted: true,
SinceCreateAt: true,
}
cursor := model.GetPostsSinceForSyncCursor{
LastPostUpdateAt: sd.scr.LastPostUpdateAt,
LastPostId: sd.scr.LastPostId,
LastPostUpdateID: sd.scr.LastPostUpdateID,
LastPostCreateAt: sd.scr.LastPostCreateAt,
LastPostCreateID: sd.scr.LastPostCreateID,
}
// Fetch all newly created posts first. This is to ensure that post order is preserved for sync targets
// that cannot set the CreateAt timestamp for incoming posts (e.g. MS Teams). If we simply used UpdateAt
// then posts could get out of order. For example: p1 created, p2 created, p1 updated... sync'ing on UpdateAt
// would order the posts p2, p1.
posts, nextCursor, err := scs.server.GetStore().Post().GetPostsSinceForSync(options, cursor, MaxPostsPerSync)
if err != nil {
return fmt.Errorf("could not fetch new posts for sync: %w", err)
}
count := len(posts)
sd.posts = appendPosts(sd.posts, posts, scs.server.GetStore().Post(), cursor.LastPostCreateAt)
// Fill remaining batch capacity with updated posts.
if len(posts) < MaxPostsPerSync {
options.SinceCreateAt = false
posts, nextCursor, err = scs.server.GetStore().Post().GetPostsSinceForSync(options, nextCursor, MaxPostsPerSync-len(posts))
if err != nil {
return fmt.Errorf("could not fetch modified posts for sync: %w", err)
}
count += len(posts)
sd.posts = appendPosts(sd.posts, posts, scs.server.GetStore().Post(), cursor.LastPostUpdateAt)
}
sd.resultNextCursor = nextCursor
sd.resultRepeat = count >= MaxPostsPerSync
return nil
}
func appendPosts(dest []*model.Post, posts []*model.Post, postStore store.PostStore, timestamp int64) []*model.Post {
// Append the posts individually, checking for root posts that might appear later in the list.
// This is due to the UpdateAt collision handling algorithm where the order of posts is not based
// on UpdateAt or CreateAt when the posts have the same UpdateAt value. Here we are guarding
// against a root post with the same UpdateAt (and probably the same CreateAt) appearing later
// in the list and must be sync'd before the child post. This is and edge case that likely only
// in the list and must be sync'd before the child post. This is an edge case that likely only
// happens during load testing or bulk imports.
for _, p := range posts {
if p.RootId != "" {
root, err := scs.server.GetStore().Post().GetSingle(p.RootId, true)
root, err := postStore.GetSingle(p.RootId, true)
if err == nil {
if (root.CreateAt >= cursor.LastPostUpdateAt || root.UpdateAt >= cursor.LastPostUpdateAt) && !containsPost(sd.posts, root) {
sd.posts = append(sd.posts, root)
if (root.CreateAt >= timestamp || root.UpdateAt >= timestamp) && !containsPost(dest, root) {
dest = append(dest, root)
}
}
}
sd.posts = append(sd.posts, p)
dest = append(dest, p)
}
sd.resultNextCursor = nextCursor
sd.resultRepeat = len(posts) == MaxPostsPerSync
return nil
return dest
}
func containsPost(posts []*model.Post, post *model.Post) bool {
@ -410,10 +443,10 @@ func (scs *Service) sendSyncData(sd *syncData) error {
// sendUserSyncData sends the collected user updates to the remote cluster.
func (scs *Service) sendUserSyncData(sd *syncData) error {
msg := newSyncMsg(sd.task.channelID)
msg := model.NewSyncMsg(sd.task.channelID)
msg.Users = sd.users
err := scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp SyncResponse, errResp error) {
err := scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
for _, userID := range syncResp.UsersSyncd {
if err := scs.server.GetStore().SharedChannel().UpdateUserLastSyncAt(userID, sd.task.channelID, sd.rc.RemoteId); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Cannot update shared channel user LastSyncAt",
@ -452,10 +485,10 @@ func (scs *Service) sendAttachmentSyncData(sd *syncData) {
// sendPostSyncData sends the collected post updates to the remote cluster.
func (scs *Service) sendPostSyncData(sd *syncData) error {
msg := newSyncMsg(sd.task.channelID)
msg := model.NewSyncMsg(sd.task.channelID)
msg.Posts = sd.posts
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp SyncResponse, errResp error) {
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
if len(syncResp.PostErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for post(s) sync",
mlog.String("channel_id", sd.task.channelID),
@ -473,10 +506,10 @@ func (scs *Service) sendPostSyncData(sd *syncData) error {
// sendReactionSyncData sends the collected reaction updates to the remote cluster.
func (scs *Service) sendReactionSyncData(sd *syncData) error {
msg := newSyncMsg(sd.task.channelID)
msg := model.NewSyncMsg(sd.task.channelID)
msg.Reactions = sd.reactions
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp SyncResponse, errResp error) {
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
if len(syncResp.ReactionErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for reactions(s) sync",
mlog.String("channel_id", sd.task.channelID),
@ -495,7 +528,7 @@ func (scs *Service) sendProfileImageSyncData(sd *syncData) {
}
// sendSyncMsgToRemote synchronously sends the sync message to the remote cluster.
func (scs *Service) sendSyncMsgToRemote(msg *syncMsg, rc *model.RemoteCluster, f sendSyncMsgResultFunc) error {
func (scs *Service) sendSyncMsgToRemote(msg *model.SyncMsg, rc *model.RemoteCluster, f sendSyncMsgResultFunc) error {
rcs := scs.server.GetRemoteClusterService()
if rcs == nil {
return fmt.Errorf("cannot update remote cluster %s for channel id %s; Remote Cluster Service not enabled", rc.Name, msg.ChannelId)
@ -516,7 +549,7 @@ func (scs *Service) sendSyncMsgToRemote(msg *syncMsg, rc *model.RemoteCluster, f
err = rcs.SendMsg(ctx, rcMsg, rc, func(rcMsg model.RemoteClusterMsg, rc *model.RemoteCluster, rcResp *remotecluster.Response, errResp error) {
defer wg.Done()
var syncResp SyncResponse
var syncResp model.SyncResponse
if err2 := json.Unmarshal(rcResp.Payload, &syncResp); err2 != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Invalid sync msg response from remote cluster",
mlog.String("remote", rc.Name),

View File

@ -327,13 +327,20 @@ type GetPostsSinceOptions struct {
type GetPostsSinceForSyncCursor struct {
LastPostUpdateAt int64
LastPostId string
LastPostUpdateID string
LastPostCreateAt int64
LastPostCreateID string
}
func (c GetPostsSinceForSyncCursor) IsEmpty() bool {
return c.LastPostCreateAt == 0 && c.LastPostCreateID == "" && c.LastPostUpdateAt == 0 && c.LastPostUpdateID == ""
}
type GetPostsSinceForSyncOptions struct {
ChannelId string
ExcludeRemoteId string
IncludeDeleted bool
SinceCreateAt bool // determines whether the cursor will be based on CreateAt or UpdateAt
}
type GetPostsOptions struct {

View File

@ -39,6 +39,7 @@ type RemoteCluster struct {
RemoteToken string `json:"remote_token"`
Topics string `json:"topics"`
CreatorId string `json:"creator_id"`
PluginID string `json:"plugin_id"` // non-empty when sync message are to be delivered via plugin API
}
func (rc *RemoteCluster) Auditable() map[string]interface{} {
@ -51,6 +52,7 @@ func (rc *RemoteCluster) Auditable() map[string]interface{} {
"create_at": rc.CreateAt,
"last_ping_at": rc.LastPingAt,
"creator_id": rc.CreatorId,
"plugin_id": rc.PluginID,
}
}
@ -319,4 +321,5 @@ type RemoteClusterQueryFilter struct {
Topic string
CreatorId string
OnlyConfirmed bool
PluginID string
}

View File

@ -4,6 +4,7 @@
package model
import (
"encoding/json"
"net/http"
"unicode/utf8"
)
@ -100,7 +101,9 @@ type SharedChannelRemote struct {
IsInviteConfirmed bool `json:"is_invite_confirmed"`
RemoteId string `json:"remote_id"`
LastPostUpdateAt int64 `json:"last_post_update_at"`
LastPostId string `json:"last_post_id"`
LastPostUpdateID string `json:"last_post_id"`
LastPostCreateAt int64 `json:"last_post_create_at"`
LastPostCreateID string `json:"last_post_create_id"`
}
func (sc *SharedChannelRemote) IsValid() *AppError {
@ -248,3 +251,49 @@ type SharedChannelRemoteFilterOpts struct {
RemoteId string
InclUnconfirmed bool
}
// SyncMsg represents a change in content (post add/edit/delete, reaction add/remove, users).
// It is sent to remote clusters as the payload of a `RemoteClusterMsg`.
type SyncMsg struct {
Id string `json:"id"`
ChannelId string `json:"channel_id"`
Users map[string]*User `json:"users,omitempty"`
Posts []*Post `json:"posts,omitempty"`
Reactions []*Reaction `json:"reactions,omitempty"`
}
func NewSyncMsg(channelID string) *SyncMsg {
return &SyncMsg{
Id: NewId(),
ChannelId: channelID,
}
}
func (sm *SyncMsg) ToJSON() ([]byte, error) {
b, err := json.Marshal(sm)
if err != nil {
return nil, err
}
return b, nil
}
func (sm *SyncMsg) String() string {
json, err := sm.ToJSON()
if err != nil {
return ""
}
return string(json)
}
// SyncResponse represents the response to a synchronization event
type SyncResponse struct {
UsersLastUpdateAt int64 `json:"users_last_update_at"`
UserErrors []string `json:"user_errors"`
UsersSyncd []string `json:"users_syncd"`
PostsLastUpdateAt int64 `json:"posts_last_update_at"`
PostErrors []string `json:"post_errors"`
ReactionsLastUpdateAt int64 `json:"reactions_last_update_at"`
ReactionErrors []string `json:"reaction_errors"`
}