diff --git a/server/channels/db/migrations/migrations.list b/server/channels/db/migrations/migrations.list index ece2c4075b..bed6fe45e1 100644 --- a/server/channels/db/migrations/migrations.list +++ b/server/channels/db/migrations/migrations.list @@ -230,6 +230,8 @@ channels/db/migrations/mysql/000115_user_reporting_changes.down.sql channels/db/migrations/mysql/000115_user_reporting_changes.up.sql channels/db/migrations/mysql/000116_create_outgoing_oauth_connections.down.sql channels/db/migrations/mysql/000116_create_outgoing_oauth_connections.up.sql +channels/db/migrations/mysql/000117_msteams_shared_channels.down.sql +channels/db/migrations/mysql/000117_msteams_shared_channels.up.sql channels/db/migrations/postgres/000001_create_teams.down.sql channels/db/migrations/postgres/000001_create_teams.up.sql channels/db/migrations/postgres/000002_create_team_members.down.sql @@ -460,3 +462,5 @@ channels/db/migrations/postgres/000115_user_reporting_changes.down.sql channels/db/migrations/postgres/000115_user_reporting_changes.up.sql channels/db/migrations/postgres/000116_create_outgoing_oauth_connections.down.sql channels/db/migrations/postgres/000116_create_outgoing_oauth_connections.up.sql +channels/db/migrations/postgres/000117_msteams_shared_channels.down.sql +channels/db/migrations/postgres/000117_msteams_shared_channels.up.sql diff --git a/server/channels/db/migrations/mysql/000117_msteams_shared_channels.down.sql b/server/channels/db/migrations/mysql/000117_msteams_shared_channels.down.sql new file mode 100644 index 0000000000..42055c1139 --- /dev/null +++ b/server/channels/db/migrations/mysql/000117_msteams_shared_channels.down.sql @@ -0,0 +1,47 @@ +SET @preparedStatement = (SELECT IF( + EXISTS( + SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS + WHERE table_name = 'RemoteClusters' + AND table_schema = DATABASE() + AND column_name = 'PluginID' + ), + 'ALTER TABLE RemoteClusters DROP COLUMN PluginID;', + 'SELECT 1;' +)); + +PREPARE removeColumnIfExists FROM @preparedStatement; +EXECUTE removeColumnIfExists; +DEALLOCATE PREPARE removeColumnIfExists; + + +SET @preparedStatement = (SELECT IF( + EXISTS( + SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS + WHERE table_name = 'SharedChannelRemotes' + AND table_schema = DATABASE() + AND column_name = 'LastPostCreateAt' + ), + 'ALTER TABLE SharedChannelRemotes DROP COLUMN LastPostCreateAt;', + 'SELECT 1;' +)); + +PREPARE removeColumnIfExists FROM @preparedStatement; +EXECUTE removeColumnIfExists; +DEALLOCATE PREPARE removeColumnIfExists; + + +SET @preparedStatement = (SELECT IF( + EXISTS( + SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS + WHERE table_name = 'SharedChannelRemotes' + AND table_schema = DATABASE() + AND column_name = 'LastPostCreateID' + ), + 'ALTER TABLE SharedChannelRemotes DROP COLUMN LastPostCreateID;', + 'SELECT 1;' +)); + +PREPARE removeColumnIfExists FROM @preparedStatement; +EXECUTE removeColumnIfExists; +DEALLOCATE PREPARE removeColumnIfExists; + diff --git a/server/channels/db/migrations/mysql/000117_msteams_shared_channels.up.sql b/server/channels/db/migrations/mysql/000117_msteams_shared_channels.up.sql new file mode 100644 index 0000000000..4a7891cdb4 --- /dev/null +++ b/server/channels/db/migrations/mysql/000117_msteams_shared_channels.up.sql @@ -0,0 +1,47 @@ +SET @preparedStatement = (SELECT IF( + NOT EXISTS( + SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = 'RemoteClusters' + AND table_schema = DATABASE() + AND column_name = 'PluginID' + ), + 'ALTER TABLE RemoteClusters ADD COLUMN PluginID varchar(190) NOT NULL DEFAULT \'\';', + 'SELECT 1;' +)); + +PREPARE addColumnIfNotExists FROM @preparedStatement; +EXECUTE addColumnIfNotExists; +DEALLOCATE PREPARE addColumnIfNotExists; + + +SET @preparedStatement = (SELECT IF( + NOT EXISTS( + SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = 'SharedChannelRemotes' + AND table_schema = DATABASE() + AND column_name = 'LastPostCreateAt' + ), + 'ALTER TABLE SharedChannelRemotes ADD COLUMN LastPostCreateAt bigint NOT NULL DEFAULT 0;', + 'SELECT 1;' +)); + +PREPARE addColumnIfNotExists FROM @preparedStatement; +EXECUTE addColumnIfNotExists; +DEALLOCATE PREPARE addColumnIfNotExists; + + +SET @preparedStatement = (SELECT IF( + NOT EXISTS( + SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = 'SharedChannelRemotes' + AND table_schema = DATABASE() + AND column_name = 'LastPostCreateID' + ), + 'ALTER TABLE SharedChannelRemotes ADD COLUMN LastPostCreateID varchar(26);', + 'SELECT 1;' +)); + +PREPARE addColumnIfNotExists FROM @preparedStatement; +EXECUTE addColumnIfNotExists; +DEALLOCATE PREPARE addColumnIfNotExists; + diff --git a/server/channels/db/migrations/postgres/000117_msteams_shared_channels.down.sql b/server/channels/db/migrations/postgres/000117_msteams_shared_channels.down.sql new file mode 100644 index 0000000000..f0c77f48e4 --- /dev/null +++ b/server/channels/db/migrations/postgres/000117_msteams_shared_channels.down.sql @@ -0,0 +1,6 @@ +ALTER TABLE RemoteClusters DROP COLUMN IF EXISTS PluginID; + +ALTER TABLE SharedChannelRemotes DROP COLUMN IF EXISTS LastPostCreateAt; + +ALTER TABLE SharedChannelRemotes DROP COLUMN IF EXISTS LastPostCreateID; + diff --git a/server/channels/db/migrations/postgres/000117_msteams_shared_channels.up.sql b/server/channels/db/migrations/postgres/000117_msteams_shared_channels.up.sql new file mode 100644 index 0000000000..3ee8f7d9e9 --- /dev/null +++ b/server/channels/db/migrations/postgres/000117_msteams_shared_channels.up.sql @@ -0,0 +1,6 @@ +ALTER TABLE RemoteClusters ADD COLUMN IF NOT EXISTS PluginID VARCHAR(190) NOT NULL DEFAULT ''; + +ALTER TABLE SharedChannelRemotes ADD COLUMN IF NOT EXISTS LastPostCreateAt bigint NOT NULL DEFAULT 0; + +ALTER TABLE SharedChannelRemotes ADD COLUMN IF NOT EXISTS LastPostCreateID VARCHAR(26); + diff --git a/server/channels/store/sqlstore/post_store.go b/server/channels/store/sqlstore/post_store.go index 5d90623b7b..3e702b4b0a 100644 --- a/server/channels/store/sqlstore/post_store.go +++ b/server/channels/store/sqlstore/post_store.go @@ -13,9 +13,10 @@ import ( "sync" "time" - sq "github.com/mattermost/squirrel" "github.com/pkg/errors" + sq "github.com/mattermost/squirrel" + "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/shared/mlog" "github.com/mattermost/mattermost/server/public/shared/request" @@ -1396,10 +1397,22 @@ func (s *SqlPostStore) GetPostsSinceForSync(options model.GetPostsSinceForSyncOp query := s.getQueryBuilder(). Select("*"). From("Posts"). - Where(sq.Or{sq.Gt{"Posts.UpdateAt": cursor.LastPostUpdateAt}, sq.And{sq.Eq{"Posts.UpdateAt": cursor.LastPostUpdateAt}, sq.Gt{"Posts.Id": cursor.LastPostId}}}). OrderBy("Posts.UpdateAt", "Id"). Limit(uint64(limit)) + if options.SinceCreateAt { + query = query.Where(sq.Or{ + sq.Gt{"Posts.CreateAt": cursor.LastPostCreateAt}, + sq.And{ + sq.Eq{"Posts.CreateAt": cursor.LastPostCreateAt}, + sq.Gt{"Posts.Id": cursor.LastPostCreateID}, + }, + }) + } else { + query = query.Where(sq.Or{sq.Gt{"Posts.UpdateAt": cursor.LastPostUpdateAt}, + sq.And{sq.Eq{"Posts.UpdateAt": cursor.LastPostUpdateAt}, sq.Gt{"Posts.Id": cursor.LastPostUpdateID}}}) + } + if options.ChannelId != "" { query = query.Where(sq.Eq{"Posts.ChannelId": options.ChannelId}) } @@ -1424,8 +1437,13 @@ func (s *SqlPostStore) GetPostsSinceForSync(options model.GetPostsSinceForSyncOp } if len(posts) != 0 { - cursor.LastPostUpdateAt = posts[len(posts)-1].UpdateAt - cursor.LastPostId = posts[len(posts)-1].Id + if options.SinceCreateAt { + cursor.LastPostCreateAt = posts[len(posts)-1].CreateAt + cursor.LastPostCreateID = posts[len(posts)-1].Id + } else { + cursor.LastPostUpdateAt = posts[len(posts)-1].UpdateAt + cursor.LastPostUpdateID = posts[len(posts)-1].Id + } } return posts, cursor, nil } diff --git a/server/channels/store/sqlstore/remote_cluster_store.go b/server/channels/store/sqlstore/remote_cluster_store.go index a9ce36aed9..7f7d3b080c 100644 --- a/server/channels/store/sqlstore/remote_cluster_store.go +++ b/server/channels/store/sqlstore/remote_cluster_store.go @@ -7,9 +7,10 @@ import ( "fmt" "strings" - sq "github.com/mattermost/squirrel" "github.com/pkg/errors" + sq "github.com/mattermost/squirrel" + "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/v8/channels/store" ) @@ -38,6 +39,7 @@ func remoteClusterFields(prefix string) []string { prefix + "RemoteToken", prefix + "Topics", prefix + "CreatorId", + prefix + "PluginID", } } @@ -49,10 +51,10 @@ func (s sqlRemoteClusterStore) Save(remoteCluster *model.RemoteCluster) (*model. query := `INSERT INTO RemoteClusters (RemoteId, RemoteTeamId, Name, DisplayName, SiteURL, CreateAt, - LastPingAt, Token, RemoteToken, Topics, CreatorId) + LastPingAt, Token, RemoteToken, Topics, CreatorId, PluginID) VALUES (:RemoteId, :RemoteTeamId, :Name, :DisplayName, :SiteURL, :CreateAt, - :LastPingAt, :Token, :RemoteToken, :Topics, :CreatorId)` + :LastPingAt, :Token, :RemoteToken, :Topics, :CreatorId, :PluginID)` if _, err := s.GetMasterX().NamedExec(query, remoteCluster); err != nil { return nil, errors.Wrap(err, "failed to save RemoteCluster") @@ -75,7 +77,8 @@ func (s sqlRemoteClusterStore) Update(remoteCluster *model.RemoteCluster) (*mode CreatorId = :CreatorId, DisplayName = :DisplayName, SiteURL = :SiteURL, - Topics = :Topics + Topics = :Topics, + PluginID = :PluginID WHERE RemoteId = :RemoteId AND Name = :Name` if _, err := s.GetMasterX().NamedExec(query, remoteCluster); err != nil { @@ -149,6 +152,10 @@ func (s sqlRemoteClusterStore) GetAll(filter model.RemoteClusterQueryFilter) ([] query = query.Where(sq.NotEq{"rc.SiteURL": ""}) } + if filter.PluginID != "" { + query = query.Where(sq.Eq{"rc.PluginID": filter.PluginID}) + } + if filter.Topic != "" { trimmed := strings.TrimSpace(filter.Topic) if trimmed == "" || trimmed == "*" { diff --git a/server/channels/store/sqlstore/shared_channel_store.go b/server/channels/store/sqlstore/shared_channel_store.go index a374460efd..4a93a5c11e 100644 --- a/server/channels/store/sqlstore/shared_channel_store.go +++ b/server/channels/store/sqlstore/shared_channel_store.go @@ -11,8 +11,9 @@ import ( "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/v8/channels/store" - sq "github.com/mattermost/squirrel" "github.com/pkg/errors" + + sq "github.com/mattermost/squirrel" ) const ( @@ -333,8 +334,10 @@ func (s SqlSharedChannelStore) SaveRemote(remote *model.SharedChannelRemote) (*m } query, args, err := s.getQueryBuilder().Insert("SharedChannelRemotes"). - Columns("Id", "ChannelId", "CreatorId", "CreateAt", "UpdateAt", "IsInviteAccepted", "IsInviteConfirmed", "RemoteId", "LastPostUpdateAt", "LastPostId"). - Values(remote.Id, remote.ChannelId, remote.CreatorId, remote.CreateAt, remote.UpdateAt, remote.IsInviteAccepted, remote.IsInviteConfirmed, remote.RemoteId, remote.LastPostUpdateAt, remote.LastPostId). + Columns("Id", "ChannelId", "CreatorId", "CreateAt", "UpdateAt", "IsInviteAccepted", "IsInviteConfirmed", "RemoteId", + "LastPostCreateAt", "LastPostCreateId", "LastPostUpdateAt", "LastPostId"). + Values(remote.Id, remote.ChannelId, remote.CreatorId, remote.CreateAt, remote.UpdateAt, remote.IsInviteAccepted, remote.IsInviteConfirmed, remote.RemoteId, + remote.LastPostCreateAt, remote.LastPostCreateID, remote.LastPostUpdateAt, remote.LastPostUpdateID). ToSql() if err != nil { return nil, errors.Wrapf(err, "savesharedchannelremote_tosql") @@ -359,8 +362,10 @@ func (s SqlSharedChannelStore) UpdateRemote(remote *model.SharedChannelRemote) ( Set("IsInviteAccepted", remote.IsInviteAccepted). Set("IsInviteConfirmed", remote.IsInviteConfirmed). Set("RemoteId", remote.RemoteId). + Set("LastPostCreateAt", remote.LastPostUpdateAt). + Set("LastPostCreateId", remote.LastPostCreateID). Set("LastPostUpdateAt", remote.LastPostUpdateAt). - Set("LastPostId", remote.LastPostId). + Set("LastPostId", remote.LastPostUpdateID). Where(sq.And{ sq.Eq{"Id": remote.Id}, sq.Eq{"ChannelId": remote.ChannelId}, @@ -398,8 +403,10 @@ func sharedChannelRemoteFields(prefix string) []string { prefix + "IsInviteAccepted", prefix + "IsInviteConfirmed", prefix + "RemoteId", + prefix + "LastPostCreateAt", + "COALESCE(" + prefix + "LastPostCreateID,'') AS LastPostCreateID", prefix + "LastPostUpdateAt", - prefix + "LastPostId", + "COALESCE(" + prefix + "LastPostId,'') AS LastPostUpdateID", } } @@ -532,14 +539,32 @@ func (s SqlSharedChannelStore) GetRemoteForUser(remoteId string, userId string) return &rc, nil } -// UpdateRemoteCursor updates the LastPostUpdateAt timestamp and LastPostId for the specified SharedChannelRemote. +// UpdateRemoteCursor updates the cursor for the specified SharedChannelRemote. func (s SqlSharedChannelStore) UpdateRemoteCursor(id string, cursor model.GetPostsSinceForSyncCursor) error { - squery, args, err := s.getQueryBuilder(). + var updateNeeded bool + + builder := s.getQueryBuilder(). Update("SharedChannelRemotes"). - Set("LastPostUpdateAt", cursor.LastPostUpdateAt). - Set("LastPostId", cursor.LastPostId). - Where(sq.Eq{"Id": id}). - ToSql() + Where(sq.Eq{"Id": id}) + + if cursor.LastPostCreateAt > 0 || cursor.LastPostCreateID != "" { + builder = builder.Set("LastPostCreateAt", cursor.LastPostCreateAt) + builder = builder.Set("LastPostCreateId", cursor.LastPostCreateID) + updateNeeded = true + } + + if cursor.LastPostUpdateAt > 0 || cursor.LastPostUpdateID != "" { + builder = builder.Set("LastPostUpdateAt", cursor.LastPostUpdateAt) + builder = builder.Set("LastPostId", cursor.LastPostUpdateID) + updateNeeded = true + } + + if !updateNeeded { + // no new cursor provided. + return fmt.Errorf("cursor empty") + } + + squery, args, err := builder.ToSql() if err != nil { return errors.Wrap(err, "update_shared_channel_remote_cursor_tosql") } diff --git a/server/channels/store/storetest/post_store.go b/server/channels/store/storetest/post_store.go index 3717d3ab2c..0dde75add5 100644 --- a/server/channels/store/storetest/post_store.go +++ b/server/channels/store/storetest/post_store.go @@ -58,7 +58,8 @@ func TestPostStore(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) { t.Run("GetDirectPostParentsForExportAfterBatched", func(t *testing.T) { testPostStoreGetDirectPostParentsForExportAfterBatched(t, rctx, ss, s) }) t.Run("GetForThread", func(t *testing.T) { testPostStoreGetForThread(t, rctx, ss) }) t.Run("HasAutoResponsePostByUserSince", func(t *testing.T) { testHasAutoResponsePostByUserSince(t, rctx, ss) }) - t.Run("GetPostsSinceForSync", func(t *testing.T) { testGetPostsSinceForSync(t, rctx, ss, s) }) + t.Run("GetPostsSinceUpdateForSync", func(t *testing.T) { testGetPostsSinceUpdateForSync(t, rctx, ss, s) }) + t.Run("GetPostsSinceCreateForSync", func(t *testing.T) { testGetPostsSinceCreateForSync(t, rctx, ss, s) }) t.Run("SetPostReminder", func(t *testing.T) { testSetPostReminder(t, rctx, ss, s) }) t.Run("GetPostReminders", func(t *testing.T) { testGetPostReminders(t, rctx, ss, s) }) t.Run("GetPostReminderMetadata", func(t *testing.T) { testGetPostReminderMetadata(t, rctx, ss, s) }) @@ -4655,7 +4656,7 @@ func testHasAutoResponsePostByUserSince(t *testing.T, rctx request.CTX, ss store }) } -func testGetPostsSinceForSync(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) { +func testGetPostsSinceUpdateForSync(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) { // create some posts. channelID := model.NewId() remoteID := model.NewString(model.NewId()) @@ -4758,6 +4759,113 @@ func testGetPostsSinceForSync(t *testing.T, rctx request.CTX, ss store.Store, s }) } +func testGetPostsSinceCreateForSync(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) { + // create some posts. + channelID := model.NewId() + remoteID := model.NewString(model.NewId()) + first := model.GetMillis() + + data := []*model.Post{ + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 0"}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 1"}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 2"}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 3", RemoteId: remoteID}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 4", RemoteId: remoteID}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 5", RemoteId: remoteID}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 6", RemoteId: remoteID}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 7"}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 8", DeleteAt: model.GetMillis()}, + {Id: model.NewId(), ChannelId: channelID, UserId: model.NewId(), Message: "test post 9", DeleteAt: model.GetMillis()}, + } + + for i, p := range data { + p.CreateAt = first + (int64(i) * 300000) + if p.RemoteId == nil { + p.RemoteId = model.NewString(model.NewId()) + } + _, err := ss.Post().Save(p) + require.NoError(t, err, "couldn't save post") + } + + t.Run("Invalid channel id", func(t *testing.T) { + opt := model.GetPostsSinceForSyncOptions{ + ChannelId: model.NewId(), + SinceCreateAt: true, + } + cursor := model.GetPostsSinceForSyncCursor{} + posts, cursorOut, err := ss.Post().GetPostsSinceForSync(opt, cursor, 100) + require.NoError(t, err) + require.Empty(t, posts, "should return zero posts") + require.Equal(t, cursor, cursorOut) + }) + + t.Run("Get by channel, exclude remotes, exclude deleted", func(t *testing.T) { + opt := model.GetPostsSinceForSyncOptions{ + ChannelId: channelID, + ExcludeRemoteId: *remoteID, + SinceCreateAt: true, + } + cursor := model.GetPostsSinceForSyncCursor{} + posts, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 100) + require.NoError(t, err) + + require.ElementsMatch(t, getPostIds(data[0:3], data[7]), getPostIds(posts)) + }) + + t.Run("Include deleted", func(t *testing.T) { + opt := model.GetPostsSinceForSyncOptions{ + ChannelId: channelID, + IncludeDeleted: true, + SinceCreateAt: true, + } + cursor := model.GetPostsSinceForSyncCursor{} + posts, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 100) + require.NoError(t, err) + + require.ElementsMatch(t, getPostIds(data), getPostIds(posts)) + }) + + t.Run("Limit and cursor", func(t *testing.T) { + opt := model.GetPostsSinceForSyncOptions{ + ChannelId: channelID, + SinceCreateAt: true, + } + cursor := model.GetPostsSinceForSyncCursor{} + posts1, cursor, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5) + require.NoError(t, err) + require.Len(t, posts1, 5, "should get 5 posts") + + posts2, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5) + require.NoError(t, err) + require.Len(t, posts2, 3, "should get 3 posts") + + require.ElementsMatch(t, getPostIds(data[0:8]), getPostIds(posts1, posts2...)) + }) + + t.Run("CreateAt collisions", func(t *testing.T) { + // this test requires all the CreateAt timestamps to be the same. + result, err := s.GetMasterX().Exec("UPDATE Posts SET CreateAt = ?", model.GetMillis()) + require.NoError(t, err) + rows, err := result.RowsAffected() + require.NoError(t, err) + require.Greater(t, rows, int64(0)) + + opt := model.GetPostsSinceForSyncOptions{ + ChannelId: channelID, + } + cursor := model.GetPostsSinceForSyncCursor{} + posts1, cursor, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5) + require.NoError(t, err) + require.Len(t, posts1, 5, "should get 5 posts") + + posts2, _, err := ss.Post().GetPostsSinceForSync(opt, cursor, 5) + require.NoError(t, err) + require.Len(t, posts2, 3, "should get 3 posts") + + require.ElementsMatch(t, getPostIds(data[0:8]), getPostIds(posts1, posts2...)) + }) +} + func testSetPostReminder(t *testing.T, rctx request.CTX, ss store.Store, s SqlStore) { // Basic userID := NewTestId() diff --git a/server/channels/store/storetest/remote_cluster_store.go b/server/channels/store/storetest/remote_cluster_store.go index 1c772fc049..4557ed3c7b 100644 --- a/server/channels/store/storetest/remote_cluster_store.go +++ b/server/channels/store/storetest/remote_cluster_store.go @@ -15,6 +15,10 @@ import ( "github.com/stretchr/testify/require" ) +const ( + testPluginID = "com.sample.blap" +) + func TestRemoteClusterStore(t *testing.T, rctx request.CTX, ss store.Store) { t.Run("RemoteClusterGetAllInChannel", func(t *testing.T) { testRemoteClusterGetAllInChannel(t, rctx, ss) }) t.Run("RemoteClusterGetAllNotInChannel", func(t *testing.T) { testRemoteClusterGetAllNotInChannel(t, rctx, ss) }) @@ -89,6 +93,7 @@ func testRemoteClusterGet(t *testing.T, rctx request.CTX, ss store.Store) { Name: "shortlived_remote_2", SiteURL: "nowhere.com", CreatorId: model.NewId(), + PluginID: testPluginID, } rcSaved, err := ss.RemoteCluster().Save(rc) require.NoError(t, err) @@ -96,6 +101,7 @@ func testRemoteClusterGet(t *testing.T, rctx request.CTX, ss store.Store) { rcGet, err := ss.RemoteCluster().Get(rcSaved.RemoteId) require.NoError(t, err) require.Equal(t, rcSaved.RemoteId, rcGet.RemoteId) + require.Equal(t, testPluginID, rcGet.PluginID) }) t.Run("Get not found", func(t *testing.T) { @@ -237,8 +243,8 @@ func testRemoteClusterGetAllInChannel(t *testing.T, rctx request.CTX, ss store.S // Create some remote clusters rcData := []*model.RemoteCluster{ - {Name: "AAAA_Inc", CreatorId: userId, SiteURL: "aaaa.com", RemoteId: model.NewId(), LastPingAt: now}, - {Name: "BBBB_Inc", CreatorId: userId, SiteURL: "bbbb.com", RemoteId: model.NewId(), LastPingAt: 0}, + {Name: "AAAA_Inc", CreatorId: userId, SiteURL: "aaaa.com", RemoteId: model.NewId(), LastPingAt: now, PluginID: testPluginID}, + {Name: "BBBB_Inc", CreatorId: userId, SiteURL: "bbbb.com", RemoteId: model.NewId(), LastPingAt: 0, PluginID: testPluginID}, {Name: "CCCC_Inc", CreatorId: userId, SiteURL: "cccc.com", RemoteId: model.NewId(), LastPingAt: now}, {Name: "DDDD_Inc", CreatorId: userId, SiteURL: "dddd.com", RemoteId: model.NewId(), LastPingAt: now}, {Name: "EEEE_Inc", CreatorId: userId, SiteURL: "eeee.com", RemoteId: model.NewId(), LastPingAt: 0}, @@ -270,6 +276,8 @@ func testRemoteClusterGetAllInChannel(t *testing.T, rctx request.CTX, ss store.S require.Len(t, list, 2, "channel 1 should have 2 remote clusters") ids := getIds(list) require.ElementsMatch(t, []string{rcData[0].RemoteId, rcData[1].RemoteId}, ids) + require.Equal(t, testPluginID, rcData[0].PluginID) + require.Equal(t, testPluginID, rcData[1].PluginID) }) t.Run("Channel 1 online only", func(t *testing.T) { diff --git a/server/channels/store/storetest/shared_channel_store.go b/server/channels/store/storetest/shared_channel_store.go index 6c885394c3..de14d3e46f 100644 --- a/server/channels/store/storetest/shared_channel_store.go +++ b/server/channels/store/storetest/shared_channel_store.go @@ -765,28 +765,52 @@ func testUpdateSharedChannelRemoteCursor(t *testing.T, rctx request.CTX, ss stor remoteSaved, err := ss.SharedChannel().SaveRemote(remote) require.NoError(t, err, "couldn't save remote", err) - future := model.GetMillis() + 3600000 // 1 hour in the future - postID := model.NewId() + futureCreateAt := model.GetMillis() + 3600000 // 1 hour in the future + postCreateID := model.NewId() - cursor := model.GetPostsSinceForSyncCursor{ - LastPostUpdateAt: future, - LastPostId: postID, + futureUpdateAt := model.GetMillis() + (3600000 * 2) // 2 hours in the future + postUpdateID := model.NewId() + + cursorCreate := model.GetPostsSinceForSyncCursor{ + LastPostCreateAt: futureCreateAt, + LastPostCreateID: postCreateID, } - t.Run("Update NextSyncAt for remote", func(t *testing.T) { - err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, cursor) - require.NoError(t, err, "update NextSyncAt should not error", err) + cursorUpdate := model.GetPostsSinceForSyncCursor{ + LastPostUpdateAt: futureUpdateAt, + LastPostUpdateID: postUpdateID, + } + + t.Run("Update cursor CreateAt for remote", func(t *testing.T) { + err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, cursorCreate) + require.NoError(t, err, "update cursor should not error", err) r, err := ss.SharedChannel().GetRemote(remoteSaved.Id) require.NoError(t, err) - require.Equal(t, future, r.LastPostUpdateAt) - require.Equal(t, postID, r.LastPostId) + require.Equal(t, futureCreateAt, r.LastPostCreateAt) + require.Equal(t, postCreateID, r.LastPostCreateID) }) - t.Run("Update NextSyncAt for non-existent shared channel remote", func(t *testing.T) { - err := ss.SharedChannel().UpdateRemoteCursor(model.NewId(), cursor) + t.Run("Update cursor UpdateAt for remote", func(t *testing.T) { + err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, cursorUpdate) + require.NoError(t, err, "update cursor should not error", err) + + r, err := ss.SharedChannel().GetRemote(remoteSaved.Id) + require.NoError(t, err) + require.Equal(t, futureUpdateAt, r.LastPostUpdateAt) + require.Equal(t, postUpdateID, r.LastPostUpdateID) + }) + + t.Run("Update cursor for non-existent shared channel remote", func(t *testing.T) { + err := ss.SharedChannel().UpdateRemoteCursor(model.NewId(), cursorUpdate) require.Error(t, err, "update non-existent remote should error", err) }) + + t.Run("Update with empty cursor", func(t *testing.T) { + emptyCursor := model.GetPostsSinceForSyncCursor{} + err := ss.SharedChannel().UpdateRemoteCursor(remoteSaved.Id, emptyCursor) + require.Error(t, err, "update with empty cursor should error", err) + }) } func testDeleteSharedChannelRemote(t *testing.T, rctx request.CTX, ss store.Store) { diff --git a/server/platform/services/remotecluster/ping.go b/server/platform/services/remotecluster/ping.go index dbdfa62899..29e5d7fcc8 100644 --- a/server/platform/services/remotecluster/ping.go +++ b/server/platform/services/remotecluster/ping.go @@ -106,6 +106,7 @@ func (rcs *Service) pingRemote(rc *model.RemoteCluster) error { if err != nil { return err } + rc.LastPingAt = model.GetMillis() ping := model.RemoteClusterPing{} err = json.Unmarshal(resp, &ping) @@ -120,7 +121,6 @@ func (rcs *Service) pingRemote(rc *model.RemoteCluster) error { mlog.Err(err), ) } - rc.LastPingAt = model.GetMillis() if metrics := rcs.server.GetMetrics(); metrics != nil { sentAt := time.Unix(0, ping.SentAt*int64(time.Millisecond)) diff --git a/server/platform/services/sharedchannel/channelinvite.go b/server/platform/services/sharedchannel/channelinvite.go index bad0e87eaa..4545da9866 100644 --- a/server/platform/services/sharedchannel/channelinvite.go +++ b/server/platform/services/sharedchannel/channelinvite.go @@ -87,6 +87,8 @@ func (scs *Service) SendChannelInvite(channel *model.Channel, userId string, rc RemoteId: rc.RemoteId, IsInviteAccepted: true, IsInviteConfirmed: true, + LastPostCreateAt: model.GetMillis(), + LastPostUpdateAt: model.GetMillis(), } if _, err = scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil { scs.sendEphemeralPost(channel.Id, userId, fmt.Sprintf("Error confirming channel invite for %s: %v", rc.DisplayName, err)) @@ -169,6 +171,8 @@ func (scs *Service) onReceiveChannelInvite(msg model.RemoteClusterMsg, rc *model IsInviteAccepted: true, IsInviteConfirmed: true, RemoteId: rc.RemoteId, + LastPostCreateAt: model.GetMillis(), + LastPostUpdateAt: model.GetMillis(), } if _, err := scs.server.GetStore().SharedChannel().SaveRemote(sharedChannelRemote); err != nil { diff --git a/server/platform/services/sharedchannel/msg.go b/server/platform/services/sharedchannel/msg.go deleted file mode 100644 index 7644b9938a..0000000000 --- a/server/platform/services/sharedchannel/msg.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See LICENSE.txt for license information. - -package sharedchannel - -import ( - "encoding/json" - - "github.com/mattermost/mattermost/server/public/model" -) - -// syncMsg represents a change in content (post add/edit/delete, reaction add/remove, users). -// It is sent to remote clusters as the payload of a `RemoteClusterMsg`. -type syncMsg struct { - Id string `json:"id"` - ChannelId string `json:"channel_id"` - Users map[string]*model.User `json:"users,omitempty"` - Posts []*model.Post `json:"posts,omitempty"` - Reactions []*model.Reaction `json:"reactions,omitempty"` -} - -func newSyncMsg(channelID string) *syncMsg { - return &syncMsg{ - Id: model.NewId(), - ChannelId: channelID, - } -} - -func (sm *syncMsg) ToJSON() ([]byte, error) { - b, err := json.Marshal(sm) - if err != nil { - return nil, err - } - return b, nil -} - -func (sm *syncMsg) String() string { - json, err := sm.ToJSON() - if err != nil { - return "" - } - return string(json) -} diff --git a/server/platform/services/sharedchannel/response.go b/server/platform/services/sharedchannel/response.go deleted file mode 100644 index 61a9fc22f5..0000000000 --- a/server/platform/services/sharedchannel/response.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See LICENSE.txt for license information. - -package sharedchannel - -type SyncResponse struct { - UsersLastUpdateAt int64 `json:"users_last_update_at"` - UserErrors []string `json:"user_errors"` - UsersSyncd []string `json:"users_syncd"` - - PostsLastUpdateAt int64 `json:"posts_last_update_at"` - PostErrors []string `json:"post_errors"` - - ReactionsLastUpdateAt int64 `json:"reactions_last_update_at"` - ReactionErrors []string `json:"reaction_errors"` -} diff --git a/server/platform/services/sharedchannel/service.go b/server/platform/services/sharedchannel/service.go index b1691ab730..bb99cf8e18 100644 --- a/server/platform/services/sharedchannel/service.go +++ b/server/platform/services/sharedchannel/service.go @@ -23,7 +23,7 @@ const ( TopicChannelInvite = "sharedchannel_invite" TopicUploadCreate = "sharedchannel_upload" MaxRetries = 3 - MaxPostsPerSync = 12 // a bit more than one typical screenfull of posts + MaxPostsPerSync = 50 // a bit more than 4 typical screenfulls of posts MaxUsersPerSync = 25 NotifyRemoteOfflineThreshold = time.Second * 10 NotifyMinimumDelay = time.Second * 2 diff --git a/server/platform/services/sharedchannel/sync_recv.go b/server/platform/services/sharedchannel/sync_recv.go index a448dbc17d..5fd08e2fa4 100644 --- a/server/platform/services/sharedchannel/sync_recv.go +++ b/server/platform/services/sharedchannel/sync_recv.go @@ -33,7 +33,7 @@ func (scs *Service) onReceiveSyncMessage(msg model.RemoteClusterMsg, rc *model.R ) } - var sm syncMsg + var sm model.SyncMsg if err := json.Unmarshal(msg.Payload, &sm); err != nil { return fmt.Errorf("invalid sync message: %w", err) @@ -41,12 +41,12 @@ func (scs *Service) onReceiveSyncMessage(msg model.RemoteClusterMsg, rc *model.R return scs.processSyncMessage(request.EmptyContext(scs.server.Log()), &sm, rc, response) } -func (scs *Service) processSyncMessage(c request.CTX, syncMsg *syncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error { +func (scs *Service) processSyncMessage(c request.CTX, syncMsg *model.SyncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error { var channel *model.Channel var team *model.Team var err error - syncResp := SyncResponse{ + syncResp := model.SyncResponse{ UserErrors: make([]string, 0), UsersSyncd: make([]string, 0), PostErrors: make([]string, 0), diff --git a/server/platform/services/sharedchannel/sync_send.go b/server/platform/services/sharedchannel/sync_send.go index 891c6cdc93..ddcf65a6c5 100644 --- a/server/platform/services/sharedchannel/sync_send.go +++ b/server/platform/services/sharedchannel/sync_send.go @@ -21,11 +21,11 @@ type syncTask struct { remoteID string AddedAt time.Time retryCount int - retryMsg *syncMsg + retryMsg *model.SyncMsg schedule time.Time } -func newSyncTask(channelID string, remoteID string, retryMsg *syncMsg) syncTask { +func newSyncTask(channelID string, remoteID string, retryMsg *model.SyncMsg) syncTask { var retryID string if retryMsg != nil { retryID = retryMsg.Id @@ -302,7 +302,7 @@ func (scs *Service) handlePostError(postId string, task syncTask, rc *model.Remo return } - syncMsg := newSyncMsg(task.channelID) + syncMsg := model.NewSyncMsg(task.channelID) syncMsg.Posts = []*model.Post{post} scs.addTask(newSyncTask(task.channelID, task.remoteID, syncMsg)) @@ -349,8 +349,10 @@ func (scs *Service) updateCursorForRemote(scrId string, rc *model.RemoteCluster, scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "updated cursor for remote", mlog.String("remote_id", rc.RemoteId), mlog.String("remote", rc.DisplayName), + mlog.Int("last_post_create_at", cursor.LastPostCreateAt), + mlog.String("last_post_create_id", cursor.LastPostCreateID), mlog.Int("last_post_update_at", cursor.LastPostUpdateAt), - mlog.String("last_post_id", cursor.LastPostId), + mlog.String("last_post_update_id", cursor.LastPostUpdateID), ) } diff --git a/server/platform/services/sharedchannel/sync_send_remote.go b/server/platform/services/sharedchannel/sync_send_remote.go index 4b87820536..9124087cf4 100644 --- a/server/platform/services/sharedchannel/sync_send_remote.go +++ b/server/platform/services/sharedchannel/sync_send_remote.go @@ -14,10 +14,11 @@ import ( "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/shared/mlog" "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/store" "github.com/mattermost/mattermost/server/v8/platform/services/remotecluster" ) -type sendSyncMsgResultFunc func(syncResp SyncResponse, err error) +type sendSyncMsgResultFunc func(syncResp model.SyncResponse, err error) type attachment struct { fi *model.FileInfo @@ -41,12 +42,15 @@ type syncData struct { func newSyncData(task syncTask, rc *model.RemoteCluster, scr *model.SharedChannelRemote) *syncData { return &syncData{ - task: task, - rc: rc, - scr: scr, - users: make(map[string]*model.User), - profileImages: make(map[string]*model.User), - resultNextCursor: model.GetPostsSinceForSyncCursor{LastPostUpdateAt: scr.LastPostUpdateAt, LastPostId: scr.LastPostId}, + task: task, + rc: rc, + scr: scr, + users: make(map[string]*model.User), + profileImages: make(map[string]*model.User), + resultNextCursor: model.GetPostsSinceForSyncCursor{ + LastPostUpdateAt: scr.LastPostUpdateAt, LastPostUpdateID: scr.LastPostUpdateID, + LastPostCreateAt: scr.LastPostCreateAt, LastPostCreateID: scr.LastPostCreateID, + }, } } @@ -55,7 +59,12 @@ func (sd *syncData) isEmpty() bool { } func (sd *syncData) isCursorChanged() bool { - return sd.scr.LastPostUpdateAt != sd.resultNextCursor.LastPostUpdateAt || sd.scr.LastPostId != sd.resultNextCursor.LastPostId + if sd.resultNextCursor.IsEmpty() { + return false + } + + return sd.scr.LastPostCreateAt != sd.resultNextCursor.LastPostCreateAt || sd.scr.LastPostCreateID != sd.resultNextCursor.LastPostCreateID || + sd.scr.LastPostUpdateAt != sd.resultNextCursor.LastPostUpdateAt || sd.scr.LastPostUpdateID != sd.resultNextCursor.LastPostUpdateID } // syncForRemote updates a remote cluster with any new posts/reactions for a specific @@ -186,43 +195,67 @@ func (scs *Service) fetchUsersForSync(sd *syncData) error { return nil } -// fetchPostsForSync populates the sync data with any new posts since the last sync. +// fetchPostsForSync populates the sync data with any new or edited posts since the last sync. func (scs *Service) fetchPostsForSync(sd *syncData) error { options := model.GetPostsSinceForSyncOptions{ ChannelId: sd.task.channelID, IncludeDeleted: true, + SinceCreateAt: true, } cursor := model.GetPostsSinceForSyncCursor{ LastPostUpdateAt: sd.scr.LastPostUpdateAt, - LastPostId: sd.scr.LastPostId, + LastPostUpdateID: sd.scr.LastPostUpdateID, + LastPostCreateAt: sd.scr.LastPostCreateAt, + LastPostCreateID: sd.scr.LastPostCreateID, } + // Fetch all newly created posts first. This is to ensure that post order is preserved for sync targets + // that cannot set the CreateAt timestamp for incoming posts (e.g. MS Teams). If we simply used UpdateAt + // then posts could get out of order. For example: p1 created, p2 created, p1 updated... sync'ing on UpdateAt + // would order the posts p2, p1. posts, nextCursor, err := scs.server.GetStore().Post().GetPostsSinceForSync(options, cursor, MaxPostsPerSync) if err != nil { return fmt.Errorf("could not fetch new posts for sync: %w", err) } + count := len(posts) + sd.posts = appendPosts(sd.posts, posts, scs.server.GetStore().Post(), cursor.LastPostCreateAt) + // Fill remaining batch capacity with updated posts. + if len(posts) < MaxPostsPerSync { + options.SinceCreateAt = false + posts, nextCursor, err = scs.server.GetStore().Post().GetPostsSinceForSync(options, nextCursor, MaxPostsPerSync-len(posts)) + if err != nil { + return fmt.Errorf("could not fetch modified posts for sync: %w", err) + } + count += len(posts) + sd.posts = appendPosts(sd.posts, posts, scs.server.GetStore().Post(), cursor.LastPostUpdateAt) + } + + sd.resultNextCursor = nextCursor + sd.resultRepeat = count >= MaxPostsPerSync + + return nil +} + +func appendPosts(dest []*model.Post, posts []*model.Post, postStore store.PostStore, timestamp int64) []*model.Post { // Append the posts individually, checking for root posts that might appear later in the list. // This is due to the UpdateAt collision handling algorithm where the order of posts is not based // on UpdateAt or CreateAt when the posts have the same UpdateAt value. Here we are guarding // against a root post with the same UpdateAt (and probably the same CreateAt) appearing later - // in the list and must be sync'd before the child post. This is and edge case that likely only + // in the list and must be sync'd before the child post. This is an edge case that likely only // happens during load testing or bulk imports. for _, p := range posts { if p.RootId != "" { - root, err := scs.server.GetStore().Post().GetSingle(p.RootId, true) + root, err := postStore.GetSingle(p.RootId, true) if err == nil { - if (root.CreateAt >= cursor.LastPostUpdateAt || root.UpdateAt >= cursor.LastPostUpdateAt) && !containsPost(sd.posts, root) { - sd.posts = append(sd.posts, root) + if (root.CreateAt >= timestamp || root.UpdateAt >= timestamp) && !containsPost(dest, root) { + dest = append(dest, root) } } } - sd.posts = append(sd.posts, p) + dest = append(dest, p) } - - sd.resultNextCursor = nextCursor - sd.resultRepeat = len(posts) == MaxPostsPerSync - return nil + return dest } func containsPost(posts []*model.Post, post *model.Post) bool { @@ -410,10 +443,10 @@ func (scs *Service) sendSyncData(sd *syncData) error { // sendUserSyncData sends the collected user updates to the remote cluster. func (scs *Service) sendUserSyncData(sd *syncData) error { - msg := newSyncMsg(sd.task.channelID) + msg := model.NewSyncMsg(sd.task.channelID) msg.Users = sd.users - err := scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp SyncResponse, errResp error) { + err := scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) { for _, userID := range syncResp.UsersSyncd { if err := scs.server.GetStore().SharedChannel().UpdateUserLastSyncAt(userID, sd.task.channelID, sd.rc.RemoteId); err != nil { scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Cannot update shared channel user LastSyncAt", @@ -452,10 +485,10 @@ func (scs *Service) sendAttachmentSyncData(sd *syncData) { // sendPostSyncData sends the collected post updates to the remote cluster. func (scs *Service) sendPostSyncData(sd *syncData) error { - msg := newSyncMsg(sd.task.channelID) + msg := model.NewSyncMsg(sd.task.channelID) msg.Posts = sd.posts - return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp SyncResponse, errResp error) { + return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) { if len(syncResp.PostErrors) != 0 { scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for post(s) sync", mlog.String("channel_id", sd.task.channelID), @@ -473,10 +506,10 @@ func (scs *Service) sendPostSyncData(sd *syncData) error { // sendReactionSyncData sends the collected reaction updates to the remote cluster. func (scs *Service) sendReactionSyncData(sd *syncData) error { - msg := newSyncMsg(sd.task.channelID) + msg := model.NewSyncMsg(sd.task.channelID) msg.Reactions = sd.reactions - return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp SyncResponse, errResp error) { + return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) { if len(syncResp.ReactionErrors) != 0 { scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for reactions(s) sync", mlog.String("channel_id", sd.task.channelID), @@ -495,7 +528,7 @@ func (scs *Service) sendProfileImageSyncData(sd *syncData) { } // sendSyncMsgToRemote synchronously sends the sync message to the remote cluster. -func (scs *Service) sendSyncMsgToRemote(msg *syncMsg, rc *model.RemoteCluster, f sendSyncMsgResultFunc) error { +func (scs *Service) sendSyncMsgToRemote(msg *model.SyncMsg, rc *model.RemoteCluster, f sendSyncMsgResultFunc) error { rcs := scs.server.GetRemoteClusterService() if rcs == nil { return fmt.Errorf("cannot update remote cluster %s for channel id %s; Remote Cluster Service not enabled", rc.Name, msg.ChannelId) @@ -516,7 +549,7 @@ func (scs *Service) sendSyncMsgToRemote(msg *syncMsg, rc *model.RemoteCluster, f err = rcs.SendMsg(ctx, rcMsg, rc, func(rcMsg model.RemoteClusterMsg, rc *model.RemoteCluster, rcResp *remotecluster.Response, errResp error) { defer wg.Done() - var syncResp SyncResponse + var syncResp model.SyncResponse if err2 := json.Unmarshal(rcResp.Payload, &syncResp); err2 != nil { scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Invalid sync msg response from remote cluster", mlog.String("remote", rc.Name), diff --git a/server/public/model/post.go b/server/public/model/post.go index e67728fe29..3d761e8d3f 100644 --- a/server/public/model/post.go +++ b/server/public/model/post.go @@ -327,13 +327,20 @@ type GetPostsSinceOptions struct { type GetPostsSinceForSyncCursor struct { LastPostUpdateAt int64 - LastPostId string + LastPostUpdateID string + LastPostCreateAt int64 + LastPostCreateID string +} + +func (c GetPostsSinceForSyncCursor) IsEmpty() bool { + return c.LastPostCreateAt == 0 && c.LastPostCreateID == "" && c.LastPostUpdateAt == 0 && c.LastPostUpdateID == "" } type GetPostsSinceForSyncOptions struct { ChannelId string ExcludeRemoteId string IncludeDeleted bool + SinceCreateAt bool // determines whether the cursor will be based on CreateAt or UpdateAt } type GetPostsOptions struct { diff --git a/server/public/model/remote_cluster.go b/server/public/model/remote_cluster.go index fce156ddec..54a305b149 100644 --- a/server/public/model/remote_cluster.go +++ b/server/public/model/remote_cluster.go @@ -39,6 +39,7 @@ type RemoteCluster struct { RemoteToken string `json:"remote_token"` Topics string `json:"topics"` CreatorId string `json:"creator_id"` + PluginID string `json:"plugin_id"` // non-empty when sync message are to be delivered via plugin API } func (rc *RemoteCluster) Auditable() map[string]interface{} { @@ -51,6 +52,7 @@ func (rc *RemoteCluster) Auditable() map[string]interface{} { "create_at": rc.CreateAt, "last_ping_at": rc.LastPingAt, "creator_id": rc.CreatorId, + "plugin_id": rc.PluginID, } } @@ -319,4 +321,5 @@ type RemoteClusterQueryFilter struct { Topic string CreatorId string OnlyConfirmed bool + PluginID string } diff --git a/server/public/model/shared_channel.go b/server/public/model/shared_channel.go index 453d18e48e..98f49b1e76 100644 --- a/server/public/model/shared_channel.go +++ b/server/public/model/shared_channel.go @@ -4,6 +4,7 @@ package model import ( + "encoding/json" "net/http" "unicode/utf8" ) @@ -100,7 +101,9 @@ type SharedChannelRemote struct { IsInviteConfirmed bool `json:"is_invite_confirmed"` RemoteId string `json:"remote_id"` LastPostUpdateAt int64 `json:"last_post_update_at"` - LastPostId string `json:"last_post_id"` + LastPostUpdateID string `json:"last_post_id"` + LastPostCreateAt int64 `json:"last_post_create_at"` + LastPostCreateID string `json:"last_post_create_id"` } func (sc *SharedChannelRemote) IsValid() *AppError { @@ -248,3 +251,49 @@ type SharedChannelRemoteFilterOpts struct { RemoteId string InclUnconfirmed bool } + +// SyncMsg represents a change in content (post add/edit/delete, reaction add/remove, users). +// It is sent to remote clusters as the payload of a `RemoteClusterMsg`. +type SyncMsg struct { + Id string `json:"id"` + ChannelId string `json:"channel_id"` + Users map[string]*User `json:"users,omitempty"` + Posts []*Post `json:"posts,omitempty"` + Reactions []*Reaction `json:"reactions,omitempty"` +} + +func NewSyncMsg(channelID string) *SyncMsg { + return &SyncMsg{ + Id: NewId(), + ChannelId: channelID, + } +} + +func (sm *SyncMsg) ToJSON() ([]byte, error) { + b, err := json.Marshal(sm) + if err != nil { + return nil, err + } + return b, nil +} + +func (sm *SyncMsg) String() string { + json, err := sm.ToJSON() + if err != nil { + return "" + } + return string(json) +} + +// SyncResponse represents the response to a synchronization event +type SyncResponse struct { + UsersLastUpdateAt int64 `json:"users_last_update_at"` + UserErrors []string `json:"user_errors"` + UsersSyncd []string `json:"users_syncd"` + + PostsLastUpdateAt int64 `json:"posts_last_update_at"` + PostErrors []string `json:"post_errors"` + + ReactionsLastUpdateAt int64 `json:"reactions_last_update_at"` + ReactionErrors []string `json:"reaction_errors"` +}