diff --git a/i18n/en.json b/i18n/en.json index 251593356b..683e1b2457 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -3486,6 +3486,10 @@ "id": "ent.elasticsearch.indexer.do_job.parse_start_time.error", "translation": "Elasticsearch indexing worker failed to parse the start time" }, + { + "id": "ent.elasticsearch.indexer.index_batch.nothing_left_to_index.error", + "translation": "Trying to index a new batch when all the entities are completed." + }, { "id": "ent.elasticsearch.not_started.error", "translation": "Elasticsearch is not started" @@ -5042,6 +5046,10 @@ "id": "plugin_api.send_mail.missing_subject", "translation": "Missing email subject." }, + { + "id": "store.sql_channel.get_channels_batch_for_indexing.get.app_error", + "translation": "Unable to get the channels batch for indexing" + }, { "id": "store.sql_channel.remove_all_deactivated_members.app_error", "translation": "We could not remove the deactivated users from the channel" @@ -6482,6 +6490,18 @@ "id": "store.sql_user.get_unread_count_for_channel.app_error", "translation": "We could not get the unread message count for the user and channel" }, + { + "id": "store.sql_user.get_users_batch_for_indexing.get_channel_members.app_error", + "translation": "Unable to get the channel members for the users batch for indexing" + }, + { + "id": "store.sql_user.get_users_batch_for_indexing.get_team_members.app_error", + "translation": "Unable to get the team members for the users batch for indexing" + }, + { + "id": "store.sql_user.get_users_batch_for_indexing.get_users.app_error", + "translation": "Unable to get the users batch for indexing" + }, { "id": "store.sql_user.missing_account.const", "translation": "Unable to find the user." 
diff --git a/model/user.go b/model/user.go index 36a62d9422..a864ebd4ed 100644 --- a/model/user.go +++ b/model/user.go @@ -103,6 +103,18 @@ type UserAuth struct { AuthService string `json:"auth_service,omitempty"` } +type UserForIndexing struct { + Id string `json:"id"` + Username string `json:"username"` + Nickname string `json:"nickname"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + CreateAt int64 `json:"create_at"` + DeleteAt int64 `json:"delete_at"` + TeamsIds []string `json:"team_id"` + ChannelsIds []string `json:"channel_id"` +} + func (u *User) DeepCopy() *User { copyUser := *u if u.AuthData != nil { diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go index 17dbab4825..dc167c795d 100644 --- a/store/sqlstore/channel_store.go +++ b/store/sqlstore/channel_store.go @@ -2583,3 +2583,32 @@ func (s SqlChannelStore) GetAllDirectChannelsForExportAfter(limit int, afterId s result.Data = directChannelsForExport }) } + +func (s SqlChannelStore) GetChannelsBatchForIndexing(startTime, endTime int64, limit int) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + var channels []*model.Channel + _, err1 := s.GetSearchReplica().Select(&channels, + `SELECT + * + FROM + Channels + WHERE + Type = 'O' + AND + CreateAt >= :StartTime + AND + CreateAt < :EndTime + ORDER BY + CreateAt + LIMIT + :NumChannels`, + map[string]interface{}{"StartTime": startTime, "EndTime": endTime, "NumChannels": limit}) + + if err1 != nil { + result.Err = model.NewAppError("SqlChannelStore.GetChannelsBatchForIndexing", "store.sql_channel.get_channels_batch_for_indexing.get.app_error", nil, err1.Error(), http.StatusInternalServerError) + return + } + + result.Data = channels + }) +} diff --git a/store/sqlstore/post_store.go b/store/sqlstore/post_store.go index d4a6bd42ab..b515563501 100644 --- a/store/sqlstore/post_store.go +++ b/store/sqlstore/post_store.go @@ -269,14 +269,14 @@ func (s *SqlPostStore) 
GetFlaggedPostsForChannel(userId, channelId string, offse var posts []*model.Post query := ` - SELECT - * - FROM Posts - WHERE - Id IN (SELECT Name FROM Preferences WHERE UserId = :UserId AND Category = :Category) + SELECT + * + FROM Posts + WHERE + Id IN (SELECT Name FROM Preferences WHERE UserId = :UserId AND Category = :Category) AND ChannelId = :ChannelId - AND DeleteAt = 0 - ORDER BY CreateAt DESC + AND DeleteAt = 0 + ORDER BY CreateAt DESC LIMIT :Limit OFFSET :Offset` if _, err := s.GetReplica().Select(&posts, query, map[string]interface{}{"UserId": userId, "Category": model.PREFERENCE_CATEGORY_FLAGGED_POST, "ChannelId": channelId, "Offset": offset, "Limit": limit}); err != nil { @@ -835,7 +835,7 @@ func (s *SqlPostStore) Search(teamId string, userId string, params *model.Search ` + userIdPart + ` ` + deletedQueryPart + ` CHANNEL_FILTER) - CREATEDATE_CLAUSE + CREATEDATE_CLAUSE SEARCH_CLAUSE ORDER BY CreateAt DESC LIMIT 100` @@ -1264,11 +1264,11 @@ func (s *SqlPostStore) determineMaxPostSize() int { // The Post.Message column in MySQL has historically been TEXT, with a maximum // limit of 65535. 
if err := s.GetReplica().SelectOne(&maxPostSizeBytes, ` - SELECT + SELECT COALESCE(CHARACTER_MAXIMUM_LENGTH, 0) - FROM + FROM INFORMATION_SCHEMA.COLUMNS - WHERE + WHERE table_schema = DATABASE() AND table_name = 'Posts' AND column_name = 'Message' diff --git a/store/sqlstore/user_store.go b/store/sqlstore/user_store.go index 00fee28c41..e3b3dc489c 100644 --- a/store/sqlstore/user_store.go +++ b/store/sqlstore/user_store.go @@ -7,6 +7,7 @@ import ( "database/sql" "fmt" "net/http" + "sort" "strings" sq "github.com/Masterminds/squirrel" @@ -1363,15 +1364,15 @@ func (us SqlUserStore) GetEtagForProfilesNotInTeam(teamId string) store.StoreCha var querystr string querystr = ` - SELECT + SELECT CONCAT(MAX(UpdateAt), '.', COUNT(Id)) as etag - FROM + FROM Users as u - LEFT JOIN TeamMembers tm - ON tm.UserId = u.Id - AND tm.TeamId = :TeamId + LEFT JOIN TeamMembers tm + ON tm.UserId = u.Id + AND tm.TeamId = :TeamId AND tm.DeleteAt = 0 - WHERE + WHERE tm.UserId IS NULL ` etag, err := us.GetReplica().SelectStr(querystr, map[string]interface{}{"TeamId": teamId}) @@ -1449,3 +1450,89 @@ func (us SqlUserStore) InferSystemInstallDate() store.StoreChannel { result.Data = createAt }) } + +func (us SqlUserStore) GetUsersBatchForIndexing(startTime, endTime int64, limit int) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + var users []*model.User + usersQuery, args, _ := us.usersQuery. + Where(sq.GtOrEq{"u.CreateAt": startTime}). + Where(sq.Lt{"u.CreateAt": endTime}). + OrderBy("u.CreateAt"). + Limit(uint64(limit)). + ToSql() + _, err1 := us.GetSearchReplica().Select(&users, usersQuery, args...) 
+ + if err1 != nil { + result.Err = model.NewAppError("SqlUserStore.GetUsersBatchForIndexing", "store.sql_user.get_users_batch_for_indexing.get_users.app_error", nil, err1.Error(), http.StatusInternalServerError) + return + } + + userIds := []string{} + for _, user := range users { + userIds = append(userIds, user.Id) + } + + var channelMembers []*model.ChannelMember + channelMembersQuery, args, _ := us.getQueryBuilder(). + Select("cm.*"). + From("ChannelMembers cm"). + Join("Channels c ON cm.ChannelId = c.Id"). + Where(sq.Eq{"c.Type": "O", "cm.UserId": userIds}). + ToSql() + _, err2 := us.GetSearchReplica().Select(&channelMembers, channelMembersQuery, args...) + + if err2 != nil { + result.Err = model.NewAppError("SqlUserStore.GetUsersBatchForIndexing", "store.sql_user.get_users_batch_for_indexing.get_channel_members.app_error", nil, err2.Error(), http.StatusInternalServerError) + return + } + + var teamMembers []*model.TeamMember + teamMembersQuery, args, _ := us.getQueryBuilder(). + Select("*"). + From("TeamMembers"). + Where(sq.Eq{"UserId": userIds, "DeleteAt": 0}). + ToSql() + _, err3 := us.GetSearchReplica().Select(&teamMembers, teamMembersQuery, args...) 
+ + if err3 != nil { + result.Err = model.NewAppError("SqlUserStore.GetUsersBatchForIndexing", "store.sql_user.get_users_batch_for_indexing.get_team_members.app_error", nil, err3.Error(), http.StatusInternalServerError) + return + } + + userMap := map[string]*model.UserForIndexing{} + for _, user := range users { + userMap[user.Id] = &model.UserForIndexing{ + Id: user.Id, + Username: user.Username, + Nickname: user.Nickname, + FirstName: user.FirstName, + LastName: user.LastName, + CreateAt: user.CreateAt, + DeleteAt: user.DeleteAt, + TeamsIds: []string{}, + ChannelsIds: []string{}, + } + } + + for _, c := range channelMembers { + if userMap[c.UserId] != nil { + userMap[c.UserId].ChannelsIds = append(userMap[c.UserId].ChannelsIds, c.ChannelId) + } + } + for _, t := range teamMembers { + if userMap[t.UserId] != nil { + userMap[t.UserId].TeamsIds = append(userMap[t.UserId].TeamsIds, t.TeamId) + } + } + + usersForIndexing := []*model.UserForIndexing{} + for _, user := range userMap { + usersForIndexing = append(usersForIndexing, user) + } + sort.Slice(usersForIndexing, func(i, j int) bool { + return usersForIndexing[i].CreateAt < usersForIndexing[j].CreateAt + }) + + result.Data = usersForIndexing + }) +} diff --git a/store/store.go b/store/store.go index c61d62a8c9..d395e2eec7 100644 --- a/store/store.go +++ b/store/store.go @@ -194,6 +194,7 @@ type ChannelStore interface { GetAllDirectChannelsForExportAfter(limit int, afterId string) StoreChannel GetChannelMembersForExport(userId string, teamId string) StoreChannel RemoveAllDeactivatedMembers(channelId string) StoreChannel + GetChannelsBatchForIndexing(startTime, endTime int64, limit int) StoreChannel } type ChannelMemberHistoryStore interface { @@ -291,6 +292,7 @@ type UserStore interface { ClearAllCustomRoleAssignments() StoreChannel InferSystemInstallDate() StoreChannel GetAllAfter(limit int, afterId string) StoreChannel + GetUsersBatchForIndexing(startTime, endTime int64, limit int) StoreChannel Count(options 
model.UserCountOptions) StoreChannel } diff --git a/store/storetest/channel_store.go b/store/storetest/channel_store.go index cb1ff30ce8..c68638ac17 100644 --- a/store/storetest/channel_store.go +++ b/store/storetest/channel_store.go @@ -85,6 +85,7 @@ func TestChannelStore(t *testing.T, ss store.Store, s SqlSupplier) { t.Run("ExportAllDirectChannels", func(t *testing.T) { testChannelStoreExportAllDirectChannels(t, ss, s) }) t.Run("ExportAllDirectChannelsExcludePrivateAndPublic", func(t *testing.T) { testChannelStoreExportAllDirectChannelsExcludePrivateAndPublic(t, ss, s) }) t.Run("ExportAllDirectChannelsDeletedChannel", func(t *testing.T) { testChannelStoreExportAllDirectChannelsDeletedChannel(t, ss, s) }) + t.Run("GetChannelsBatchForIndexing", func(t *testing.T) { testChannelStoreGetChannelsBatchForIndexing(t, ss) }) } func testChannelStoreSave(t *testing.T, ss store.Store) { @@ -3397,3 +3398,67 @@ func testChannelStoreExportAllDirectChannelsDeletedChannel(t *testing.T, ss stor // Manually truncate Channels table until testlib can handle cleanups s.GetMaster().Exec("TRUNCATE Channels") } + +func testChannelStoreGetChannelsBatchForIndexing(t *testing.T, ss store.Store) { + // Set up all the objects needed + c1 := &model.Channel{} + c1.DisplayName = "Channel1" + c1.Name = "zz" + model.NewId() + "b" + c1.Type = model.CHANNEL_OPEN + store.Must(ss.Channel().Save(c1, -1)) + + time.Sleep(10 * time.Millisecond) + + c2 := &model.Channel{} + c2.DisplayName = "Channel2" + c2.Name = "zz" + model.NewId() + "b" + c2.Type = model.CHANNEL_OPEN + store.Must(ss.Channel().Save(c2, -1)) + + time.Sleep(10 * time.Millisecond) + startTime := c2.CreateAt + + c3 := &model.Channel{} + c3.DisplayName = "Channel3" + c3.Name = "zz" + model.NewId() + "b" + c3.Type = model.CHANNEL_OPEN + store.Must(ss.Channel().Save(c3, -1)) + + c4 := &model.Channel{} + c4.DisplayName = "Channel4" + c4.Name = "zz" + model.NewId() + "b" + c4.Type = model.CHANNEL_PRIVATE + store.Must(ss.Channel().Save(c4, -1)) + 
+ c5 := &model.Channel{} + c5.DisplayName = "Channel5" + c5.Name = "zz" + model.NewId() + "b" + c5.Type = model.CHANNEL_OPEN + store.Must(ss.Channel().Save(c5, -1)) + + time.Sleep(10 * time.Millisecond) + + c6 := &model.Channel{} + c6.DisplayName = "Channel6" + c6.Name = "zz" + model.NewId() + "b" + c6.Type = model.CHANNEL_OPEN + store.Must(ss.Channel().Save(c6, -1)) + + endTime := c6.CreateAt + + // First and last channel should be outside the range + res1 := <-ss.Channel().GetChannelsBatchForIndexing(startTime, endTime, 1000) + assert.Nil(t, res1.Err) + assert.ElementsMatch(t, []*model.Channel{c2, c3, c5}, res1.Data) + + // Update the endTime, last channel should be in + endTime = model.GetMillis() + res2 := <-ss.Channel().GetChannelsBatchForIndexing(startTime, endTime, 1000) + assert.Nil(t, res2.Err) + assert.ElementsMatch(t, []*model.Channel{c2, c3, c5, c6}, res2.Data) + + // Testing the limit + res3 := <-ss.Channel().GetChannelsBatchForIndexing(startTime, endTime, 2) + assert.Nil(t, res3.Err) + assert.ElementsMatch(t, []*model.Channel{c2, c3}, res3.Data) +} diff --git a/store/storetest/mocks/ChannelStore.go b/store/storetest/mocks/ChannelStore.go index d1ade7636e..ee32173052 100644 --- a/store/storetest/mocks/ChannelStore.go +++ b/store/storetest/mocks/ChannelStore.go @@ -370,6 +370,38 @@ func (_m *ChannelStore) GetChannels(teamId string, userId string, includeDeleted return r0 } +// GetChannelsBatchForIndexing provides a mock function with given fields: startTime, endTime, limit +func (_m *ChannelStore) GetChannelsBatchForIndexing(startTime int64, endTime int64, limit int) store.StoreChannel { + ret := _m.Called(startTime, endTime, limit) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(int64, int64, int) store.StoreChannel); ok { + r0 = rf(startTime, endTime, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + +// GetChannelsByIds provides a mock function with given fields: channelIds 
+func (_m *ChannelStore) GetChannelsByIds(channelIds []string) store.StoreChannel { + ret := _m.Called(channelIds) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func([]string) store.StoreChannel); ok { + r0 = rf(channelIds) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // GetChannelsByScheme provides a mock function with given fields: schemeId, offset, limit func (_m *ChannelStore) GetChannelsByScheme(schemeId string, offset int, limit int) store.StoreChannel { ret := _m.Called(schemeId, offset, limit) @@ -418,22 +450,6 @@ func (_m *ChannelStore) GetDeletedByName(team_id string, name string) store.Stor return r0 } -// GetChannelsByIds provides a mock funcion with given fields: channelIds -func (_m *ChannelStore) GetChannelsByIds(channelIds []string) store.StoreChannel { - ret := _m.Called(channelIds) - - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func([]string) store.StoreChannel); ok { - r0 = rf(channelIds) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) - } - } - - return r0 -} - // GetForPost provides a mock function with given fields: postId func (_m *ChannelStore) GetForPost(postId string) store.StoreChannel { ret := _m.Called(postId) diff --git a/store/storetest/mocks/UserStore.go b/store/storetest/mocks/UserStore.go index 6a6be180b1..4cf5b7096f 100644 --- a/store/storetest/mocks/UserStore.go +++ b/store/storetest/mocks/UserStore.go @@ -530,6 +530,22 @@ func (_m *UserStore) GetUnreadCountForChannel(userId string, channelId string) s return r0 } +// GetUsersBatchForIndexing provides a mock function with given fields: startTime, endTime, limit +func (_m *UserStore) GetUsersBatchForIndexing(startTime int64, endTime int64, limit int) store.StoreChannel { + ret := _m.Called(startTime, endTime, limit) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(int64, int64, int) store.StoreChannel); ok { + r0 = rf(startTime, endTime, limit) + } else { + if 
ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // InferSystemInstallDate provides a mock function with given fields: func (_m *UserStore) InferSystemInstallDate() store.StoreChannel { ret := _m.Called() diff --git a/store/storetest/user_store.go b/store/storetest/user_store.go index 93f354457b..baa2858ff8 100644 --- a/store/storetest/user_store.go +++ b/store/storetest/user_store.go @@ -64,6 +64,7 @@ func TestUserStore(t *testing.T, ss store.Store) { t.Run("GetProfilesNotInTeam", func(t *testing.T) { testUserStoreGetProfilesNotInTeam(t, ss) }) t.Run("ClearAllCustomRoleAssignments", func(t *testing.T) { testUserStoreClearAllCustomRoleAssignments(t, ss) }) t.Run("GetAllAfter", func(t *testing.T) { testUserStoreGetAllAfter(t, ss) }) + t.Run("GetUsersBatchForIndexing", func(t *testing.T) { testUserStoreGetUsersBatchForIndexing(t, ss) }) } func testUserStoreSave(t *testing.T, ss store.Store) { @@ -3310,3 +3311,122 @@ func testUserStoreGetAllAfter(t *testing.T, ss store.Store) { assert.Equal(t, []*model.User{}, actual) }) } + +func testUserStoreGetUsersBatchForIndexing(t *testing.T, ss store.Store) { + // Set up all the objects needed + t1 := store.Must(ss.Team().Save(&model.Team{ + DisplayName: "Team1", + Name: model.NewId(), + Type: model.TEAM_OPEN, + })).(*model.Team) + cPub1 := store.Must(ss.Channel().Save(&model.Channel{ + Name: model.NewId(), + Type: model.CHANNEL_OPEN, + }, -1)).(*model.Channel) + cPub2 := store.Must(ss.Channel().Save(&model.Channel{ + Name: model.NewId(), + Type: model.CHANNEL_OPEN, + }, -1)).(*model.Channel) + cPriv := store.Must(ss.Channel().Save(&model.Channel{ + Name: model.NewId(), + Type: model.CHANNEL_PRIVATE, + }, -1)).(*model.Channel) + + u1 := store.Must(ss.User().Save(&model.User{ + Email: MakeEmail(), + Username: model.NewId(), + CreateAt: model.GetMillis(), + })).(*model.User) + + time.Sleep(10 * time.Millisecond) + + u2 := store.Must(ss.User().Save(&model.User{ + Email: MakeEmail(), + Username: 
 model.NewId(), + CreateAt: model.GetMillis(), + })).(*model.User) + store.Must(ss.Team().SaveMember(&model.TeamMember{ + UserId: u2.Id, + TeamId: t1.Id, + }, 100)) + store.Must(ss.Channel().SaveMember(&model.ChannelMember{ + UserId: u2.Id, + ChannelId: cPub1.Id, + NotifyProps: model.GetDefaultChannelNotifyProps(), + })) + store.Must(ss.Channel().SaveMember(&model.ChannelMember{ + UserId: u2.Id, + ChannelId: cPub2.Id, + NotifyProps: model.GetDefaultChannelNotifyProps(), + })) + + startTime := u2.CreateAt + time.Sleep(10 * time.Millisecond) + + u3 := store.Must(ss.User().Save(&model.User{ + Email: MakeEmail(), + Username: model.NewId(), + CreateAt: model.GetMillis(), + })).(*model.User) + store.Must(ss.Team().SaveMember(&model.TeamMember{ + UserId: u3.Id, + TeamId: t1.Id, + DeleteAt: model.GetMillis(), + }, 100)) + store.Must(ss.Channel().SaveMember(&model.ChannelMember{ + UserId: u3.Id, + ChannelId: cPub2.Id, + NotifyProps: model.GetDefaultChannelNotifyProps(), + })) + store.Must(ss.Channel().SaveMember(&model.ChannelMember{ + UserId: u3.Id, + ChannelId: cPriv.Id, + NotifyProps: model.GetDefaultChannelNotifyProps(), + })) + + endTime := u3.CreateAt + + // First and last user should be outside the range + res1 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 100) + assert.Nil(t, res1.Err) + res1List := res1.Data.([]*model.UserForIndexing) + + assert.Len(t, res1List, 1) + assert.Equal(t, res1List[0].Username, u2.Username) + assert.ElementsMatch(t, res1List[0].TeamsIds, []string{t1.Id}) + assert.ElementsMatch(t, res1List[0].ChannelsIds, []string{cPub1.Id, cPub2.Id}) + + // Update startTime to include first user + startTime = u1.CreateAt + res2 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 100) + assert.Nil(t, res2.Err) + res2List := res2.Data.([]*model.UserForIndexing) + + assert.Len(t, res2List, 2) + assert.Equal(t, res2List[0].Username, u1.Username) + assert.Equal(t, res2List[0].ChannelsIds, []string{}) + assert.Equal(t, res2List[0].TeamsIds, 
[]string{}) + assert.Equal(t, res2List[1].Username, u2.Username) + + // Update endTime to include last user + endTime = model.GetMillis() + res3 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 100) + assert.Nil(t, res3.Err) + res3List := res3.Data.([]*model.UserForIndexing) + + assert.Len(t, res3List, 3) + assert.Equal(t, res3List[0].Username, u1.Username) + assert.Equal(t, res3List[1].Username, u2.Username) + assert.Equal(t, res3List[2].Username, u3.Username) + assert.ElementsMatch(t, res3List[2].TeamsIds, []string{}) + assert.ElementsMatch(t, res3List[2].ChannelsIds, []string{cPub2.Id}) + + // Testing the limit + res4 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 2) + assert.Nil(t, res4.Err) + res4List := res4.Data.([]*model.UserForIndexing) + + assert.Len(t, res4List, 2) + assert.Equal(t, res4List[0].Username, u1.Username) + assert.Equal(t, res4List[1].Username, u2.Username) +}