[MM-14253] Adds channels and users to the bulk index process (#10434)

* [MM-14253] Adds channels and users to the bulk index process

* Add support for PostgreSQL and sort the user query result

* Add tests for user and channel batch queries

* Fix test times
This commit is contained in:
Miguel de la Cruz
2019-03-19 12:42:08 +00:00
committed by GitHub
parent 5a9d95d9c7
commit 372ef87f76
10 changed files with 400 additions and 33 deletions

View File

@@ -3486,6 +3486,10 @@
"id": "ent.elasticsearch.indexer.do_job.parse_start_time.error",
"translation": "Elasticsearch indexing worker failed to parse the start time"
},
{
"id": "ent.elasticsearch.indexer.index_batch.nothing_left_to_index.error",
"translation": "Trying to index a new batch when all the entities are completed."
},
{
"id": "ent.elasticsearch.not_started.error",
"translation": "Elasticsearch is not started"
@@ -5042,6 +5046,10 @@
"id": "plugin_api.send_mail.missing_subject",
"translation": "Missing email subject."
},
{
"id": "store.sql_channel.get_channels_batch_for_indexing.get.app_error",
"translation": "Unable to get the channels batch for indexing"
},
{
"id": "store.sql_channel.remove_all_deactivated_members.app_error",
"translation": "We could not remove the deactivated users from the channel"
@@ -6482,6 +6490,18 @@
"id": "store.sql_user.get_unread_count_for_channel.app_error",
"translation": "We could not get the unread message count for the user and channel"
},
{
"id": "store.sql_user.get_users_batch_for_indexing.get_channel_members.app_error",
"translation": "Unable to get the channel members for the users batch for indexing"
},
{
"id": "store.sql_user.get_users_batch_for_indexing.get_team_members.app_error",
"translation": "Unable to get the team members for the users batch for indexing"
},
{
"id": "store.sql_user.get_users_batch_for_indexing.get_users.app_error",
"translation": "Unable to get the users batch for indexing"
},
{
"id": "store.sql_user.missing_account.const",
"translation": "Unable to find the user."

View File

@@ -103,6 +103,18 @@ type UserAuth struct {
AuthService string `json:"auth_service,omitempty"`
}
// UserForIndexing is a reduced projection of a User, augmented with the
// ids of the teams and channels the user belongs to. It is the payload
// produced by the store's GetUsersBatchForIndexing for the bulk
// Elasticsearch indexing job.
type UserForIndexing struct {
	Id        string `json:"id"`
	Username  string `json:"username"`
	Nickname  string `json:"nickname"`
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
	CreateAt  int64  `json:"create_at"`
	DeleteAt  int64  `json:"delete_at"`
	// NOTE(review): the JSON keys are singular ("team_id"/"channel_id")
	// even though the fields hold slices — presumably the search index
	// mapping expects these names; confirm before renaming.
	TeamsIds    []string `json:"team_id"`
	ChannelsIds []string `json:"channel_id"`
}
func (u *User) DeepCopy() *User {
copyUser := *u
if u.AuthData != nil {

View File

@@ -2583,3 +2583,32 @@ func (s SqlChannelStore) GetAllDirectChannelsForExportAfter(limit int, afterId s
result.Data = directChannelsForExport
})
}
// GetChannelsBatchForIndexing returns up to `limit` open ('O') channels
// whose CreateAt falls in the half-open interval [startTime, endTime),
// ordered by CreateAt ascending, for the bulk indexing job.
func (s SqlChannelStore) GetChannelsBatchForIndexing(startTime, endTime int64, limit int) store.StoreChannel {
	return store.Do(func(result *store.StoreResult) {
		query := `SELECT
				*
			FROM
				Channels
			WHERE
				Type = 'O'
			AND
				CreateAt >= :StartTime
			AND
				CreateAt < :EndTime
			ORDER BY
				CreateAt
			LIMIT
				:NumChannels`

		params := map[string]interface{}{
			"StartTime":   startTime,
			"EndTime":     endTime,
			"NumChannels": limit,
		}

		var channels []*model.Channel
		if _, err := s.GetSearchReplica().Select(&channels, query, params); err != nil {
			result.Err = model.NewAppError("SqlChannelStore.GetChannelsBatchForIndexing", "store.sql_channel.get_channels_batch_for_indexing.get.app_error", nil, err.Error(), http.StatusInternalServerError)
			return
		}

		result.Data = channels
	})
}

View File

@@ -269,14 +269,14 @@ func (s *SqlPostStore) GetFlaggedPostsForChannel(userId, channelId string, offse
var posts []*model.Post
query := `
SELECT
*
FROM Posts
WHERE
Id IN (SELECT Name FROM Preferences WHERE UserId = :UserId AND Category = :Category)
SELECT
*
FROM Posts
WHERE
Id IN (SELECT Name FROM Preferences WHERE UserId = :UserId AND Category = :Category)
AND ChannelId = :ChannelId
AND DeleteAt = 0
ORDER BY CreateAt DESC
AND DeleteAt = 0
ORDER BY CreateAt DESC
LIMIT :Limit OFFSET :Offset`
if _, err := s.GetReplica().Select(&posts, query, map[string]interface{}{"UserId": userId, "Category": model.PREFERENCE_CATEGORY_FLAGGED_POST, "ChannelId": channelId, "Offset": offset, "Limit": limit}); err != nil {
@@ -835,7 +835,7 @@ func (s *SqlPostStore) Search(teamId string, userId string, params *model.Search
` + userIdPart + `
` + deletedQueryPart + `
CHANNEL_FILTER)
CREATEDATE_CLAUSE
CREATEDATE_CLAUSE
SEARCH_CLAUSE
ORDER BY CreateAt DESC
LIMIT 100`
@@ -1264,11 +1264,11 @@ func (s *SqlPostStore) determineMaxPostSize() int {
// The Post.Message column in MySQL has historically been TEXT, with a maximum
// limit of 65535.
if err := s.GetReplica().SelectOne(&maxPostSizeBytes, `
SELECT
SELECT
COALESCE(CHARACTER_MAXIMUM_LENGTH, 0)
FROM
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
WHERE
table_schema = DATABASE()
AND table_name = 'Posts'
AND column_name = 'Message'

View File

@@ -7,6 +7,7 @@ import (
"database/sql"
"fmt"
"net/http"
"sort"
"strings"
sq "github.com/Masterminds/squirrel"
@@ -1363,15 +1364,15 @@ func (us SqlUserStore) GetEtagForProfilesNotInTeam(teamId string) store.StoreCha
var querystr string
querystr = `
SELECT
SELECT
CONCAT(MAX(UpdateAt), '.', COUNT(Id)) as etag
FROM
FROM
Users as u
LEFT JOIN TeamMembers tm
ON tm.UserId = u.Id
AND tm.TeamId = :TeamId
LEFT JOIN TeamMembers tm
ON tm.UserId = u.Id
AND tm.TeamId = :TeamId
AND tm.DeleteAt = 0
WHERE
WHERE
tm.UserId IS NULL
`
etag, err := us.GetReplica().SelectStr(querystr, map[string]interface{}{"TeamId": teamId})
@@ -1449,3 +1450,89 @@ func (us SqlUserStore) InferSystemInstallDate() store.StoreChannel {
result.Data = createAt
})
}
// GetUsersBatchForIndexing returns up to `limit` users whose CreateAt falls
// in the half-open interval [startTime, endTime), sorted by CreateAt
// ascending, each annotated with the ids of their non-deleted team
// memberships and open-channel memberships. Used by the bulk indexing job.
func (us SqlUserStore) GetUsersBatchForIndexing(startTime, endTime int64, limit int) store.StoreChannel {
	return store.Do(func(result *store.StoreResult) {
		var users []*model.User
		usersQuery, args, _ := us.usersQuery.
			Where(sq.GtOrEq{"u.CreateAt": startTime}).
			Where(sq.Lt{"u.CreateAt": endTime}).
			OrderBy("u.CreateAt").
			Limit(uint64(limit)).
			ToSql()
		if _, err := us.GetSearchReplica().Select(&users, usersQuery, args...); err != nil {
			result.Err = model.NewAppError("SqlUserStore.GetUsersBatchForIndexing", "store.sql_user.get_users_batch_for_indexing.get_users.app_error", nil, err.Error(), http.StatusInternalServerError)
			return
		}

		// Nothing in this time window: skip the two membership queries,
		// which would otherwise run with an empty IN list.
		if len(users) == 0 {
			result.Data = []*model.UserForIndexing{}
			return
		}

		userIds := make([]string, 0, len(users))
		for _, user := range users {
			userIds = append(userIds, user.Id)
		}

		// Only open ('O') channels are indexed, so restrict the join.
		var channelMembers []*model.ChannelMember
		channelMembersQuery, args, _ := us.getQueryBuilder().
			Select("cm.*").
			From("ChannelMembers cm").
			Join("Channels c ON cm.ChannelId = c.Id").
			Where(sq.Eq{"c.Type": "O", "cm.UserId": userIds}).
			ToSql()
		if _, err := us.GetSearchReplica().Select(&channelMembers, channelMembersQuery, args...); err != nil {
			result.Err = model.NewAppError("SqlUserStore.GetUsersBatchForIndexing", "store.sql_user.get_users_batch_for_indexing.get_channel_members.app_error", nil, err.Error(), http.StatusInternalServerError)
			return
		}

		// Deleted team memberships are excluded from the index.
		var teamMembers []*model.TeamMember
		teamMembersQuery, args, _ := us.getQueryBuilder().
			Select("*").
			From("TeamMembers").
			Where(sq.Eq{"UserId": userIds, "DeleteAt": 0}).
			ToSql()
		if _, err := us.GetSearchReplica().Select(&teamMembers, teamMembersQuery, args...); err != nil {
			result.Err = model.NewAppError("SqlUserStore.GetUsersBatchForIndexing", "store.sql_user.get_users_batch_for_indexing.get_team_members.app_error", nil, err.Error(), http.StatusInternalServerError)
			return
		}

		// Build one UserForIndexing per user, then attach memberships.
		userMap := make(map[string]*model.UserForIndexing, len(users))
		for _, user := range users {
			userMap[user.Id] = &model.UserForIndexing{
				Id:          user.Id,
				Username:    user.Username,
				Nickname:    user.Nickname,
				FirstName:   user.FirstName,
				LastName:    user.LastName,
				CreateAt:    user.CreateAt,
				DeleteAt:    user.DeleteAt,
				TeamsIds:    []string{},
				ChannelsIds: []string{},
			}
		}

		for _, c := range channelMembers {
			if userMap[c.UserId] != nil {
				userMap[c.UserId].ChannelsIds = append(userMap[c.UserId].ChannelsIds, c.ChannelId)
			}
		}

		for _, t := range teamMembers {
			if userMap[t.UserId] != nil {
				userMap[t.UserId].TeamsIds = append(userMap[t.UserId].TeamsIds, t.TeamId)
			}
		}

		usersForIndexing := make([]*model.UserForIndexing, 0, len(userMap))
		for _, user := range userMap {
			usersForIndexing = append(usersForIndexing, user)
		}

		// Map iteration order is random; restore CreateAt ordering so
		// callers can paginate deterministically.
		sort.Slice(usersForIndexing, func(i, j int) bool {
			return usersForIndexing[i].CreateAt < usersForIndexing[j].CreateAt
		})

		result.Data = usersForIndexing
	})
}

View File

@@ -194,6 +194,7 @@ type ChannelStore interface {
GetAllDirectChannelsForExportAfter(limit int, afterId string) StoreChannel
GetChannelMembersForExport(userId string, teamId string) StoreChannel
RemoveAllDeactivatedMembers(channelId string) StoreChannel
GetChannelsBatchForIndexing(startTime, endTime int64, limit int) StoreChannel
}
type ChannelMemberHistoryStore interface {
@@ -291,6 +292,7 @@ type UserStore interface {
ClearAllCustomRoleAssignments() StoreChannel
InferSystemInstallDate() StoreChannel
GetAllAfter(limit int, afterId string) StoreChannel
GetUsersBatchForIndexing(startTime, endTime int64, limit int) StoreChannel
Count(options model.UserCountOptions) StoreChannel
}

View File

@@ -85,6 +85,7 @@ func TestChannelStore(t *testing.T, ss store.Store, s SqlSupplier) {
t.Run("ExportAllDirectChannels", func(t *testing.T) { testChannelStoreExportAllDirectChannels(t, ss, s) })
t.Run("ExportAllDirectChannelsExcludePrivateAndPublic", func(t *testing.T) { testChannelStoreExportAllDirectChannelsExcludePrivateAndPublic(t, ss, s) })
t.Run("ExportAllDirectChannelsDeletedChannel", func(t *testing.T) { testChannelStoreExportAllDirectChannelsDeletedChannel(t, ss, s) })
t.Run("GetChannelsBatchForIndexing", func(t *testing.T) { testChannelStoreGetChannelsBatchForIndexing(t, ss) })
}
func testChannelStoreSave(t *testing.T, ss store.Store) {
@@ -3397,3 +3398,67 @@ func testChannelStoreExportAllDirectChannelsDeletedChannel(t *testing.T, ss stor
// Manually truncate Channels table until testlib can handle cleanups
s.GetMaster().Exec("TRUNCATE Channels")
}
func testChannelStoreGetChannelsBatchForIndexing(t *testing.T, ss store.Store) {
	// saveChannel persists a channel of the given type and returns it.
	saveChannel := func(displayName, channelType string) *model.Channel {
		c := &model.Channel{
			DisplayName: displayName,
			Name:        "zz" + model.NewId() + "b",
			Type:        channelType,
		}
		store.Must(ss.Channel().Save(c, -1))
		return c
	}

	// Channel1 is created before startTime and must never be returned.
	saveChannel("Channel1", model.CHANNEL_OPEN)
	time.Sleep(10 * time.Millisecond)

	c2 := saveChannel("Channel2", model.CHANNEL_OPEN)
	time.Sleep(10 * time.Millisecond)
	startTime := c2.CreateAt

	c3 := saveChannel("Channel3", model.CHANNEL_OPEN)

	// Private channels are not indexed and must be excluded.
	saveChannel("Channel4", model.CHANNEL_PRIVATE)

	c5 := saveChannel("Channel5", model.CHANNEL_OPEN)
	time.Sleep(10 * time.Millisecond)

	c6 := saveChannel("Channel6", model.CHANNEL_OPEN)
	endTime := c6.CreateAt

	// First and last channel should be outside the range.
	res1 := <-ss.Channel().GetChannelsBatchForIndexing(startTime, endTime, 1000)
	assert.Nil(t, res1.Err)
	assert.ElementsMatch(t, []*model.Channel{c2, c3, c5}, res1.Data)

	// Update the endTime, last channel should be in.
	endTime = model.GetMillis()
	res2 := <-ss.Channel().GetChannelsBatchForIndexing(startTime, endTime, 1000)
	assert.Nil(t, res2.Err)
	assert.ElementsMatch(t, []*model.Channel{c2, c3, c5, c6}, res2.Data)

	// Testing the limit.
	res3 := <-ss.Channel().GetChannelsBatchForIndexing(startTime, endTime, 2)
	assert.Nil(t, res3.Err)
	assert.ElementsMatch(t, []*model.Channel{c2, c3}, res3.Data)
}

View File

@@ -370,6 +370,38 @@ func (_m *ChannelStore) GetChannels(teamId string, userId string, includeDeleted
return r0
}
// GetChannelsBatchForIndexing provides a mock function with given fields: startTime, endTime, limit
func (_m *ChannelStore) GetChannelsBatchForIndexing(startTime int64, endTime int64, limit int) store.StoreChannel {
	ret := _m.Called(startTime, endTime, limit)

	// Prefer a registered return function over a canned value.
	if rf, ok := ret.Get(0).(func(int64, int64, int) store.StoreChannel); ok {
		return rf(startTime, endTime, limit)
	}

	var r0 store.StoreChannel
	if ret.Get(0) != nil {
		r0 = ret.Get(0).(store.StoreChannel)
	}
	return r0
}
// GetChannelsByIds provides a mock function with given fields: channelIds
func (_m *ChannelStore) GetChannelsByIds(channelIds []string) store.StoreChannel {
	ret := _m.Called(channelIds)

	// Prefer a registered return function over a canned value.
	if rf, ok := ret.Get(0).(func([]string) store.StoreChannel); ok {
		return rf(channelIds)
	}

	var r0 store.StoreChannel
	if ret.Get(0) != nil {
		r0 = ret.Get(0).(store.StoreChannel)
	}
	return r0
}
// GetChannelsByScheme provides a mock function with given fields: schemeId, offset, limit
func (_m *ChannelStore) GetChannelsByScheme(schemeId string, offset int, limit int) store.StoreChannel {
ret := _m.Called(schemeId, offset, limit)
@@ -418,22 +450,6 @@ func (_m *ChannelStore) GetDeletedByName(team_id string, name string) store.Stor
return r0
}
// GetChannelsByIds provides a mock function with given fields: channelIds
func (_m *ChannelStore) GetChannelsByIds(channelIds []string) store.StoreChannel {
ret := _m.Called(channelIds)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func([]string) store.StoreChannel); ok {
r0 = rf(channelIds)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
}
}
return r0
}
// GetForPost provides a mock function with given fields: postId
func (_m *ChannelStore) GetForPost(postId string) store.StoreChannel {
ret := _m.Called(postId)

View File

@@ -530,6 +530,22 @@ func (_m *UserStore) GetUnreadCountForChannel(userId string, channelId string) s
return r0
}
// GetUsersBatchForIndexing provides a mock function with given fields: startTime, endTime, limit
func (_m *UserStore) GetUsersBatchForIndexing(startTime int64, endTime int64, limit int) store.StoreChannel {
	ret := _m.Called(startTime, endTime, limit)

	// Prefer a registered return function over a canned value.
	if rf, ok := ret.Get(0).(func(int64, int64, int) store.StoreChannel); ok {
		return rf(startTime, endTime, limit)
	}

	var r0 store.StoreChannel
	if ret.Get(0) != nil {
		r0 = ret.Get(0).(store.StoreChannel)
	}
	return r0
}
// InferSystemInstallDate provides a mock function with given fields:
func (_m *UserStore) InferSystemInstallDate() store.StoreChannel {
ret := _m.Called()

View File

@@ -64,6 +64,7 @@ func TestUserStore(t *testing.T, ss store.Store) {
t.Run("GetProfilesNotInTeam", func(t *testing.T) { testUserStoreGetProfilesNotInTeam(t, ss) })
t.Run("ClearAllCustomRoleAssignments", func(t *testing.T) { testUserStoreClearAllCustomRoleAssignments(t, ss) })
t.Run("GetAllAfter", func(t *testing.T) { testUserStoreGetAllAfter(t, ss) })
t.Run("GetUsersBatchForIndexing", func(t *testing.T) { testUserStoreGetUsersBatchForIndexing(t, ss) })
}
func testUserStoreSave(t *testing.T, ss store.Store) {
@@ -3310,3 +3311,122 @@ func testUserStoreGetAllAfter(t *testing.T, ss store.Store) {
assert.Equal(t, []*model.User{}, actual)
})
}
func testUserStoreGetUsersBatchForIndexing(t *testing.T, ss store.Store) {
	// Set up a team, two public channels and one private channel. Only
	// public-channel and non-deleted team memberships should be reported.
	t1 := store.Must(ss.Team().Save(&model.Team{
		DisplayName: "Team1",
		Name:        model.NewId(),
		Type:        model.TEAM_OPEN,
	})).(*model.Team)

	cPub1 := store.Must(ss.Channel().Save(&model.Channel{
		Name: model.NewId(),
		Type: model.CHANNEL_OPEN,
	}, -1)).(*model.Channel)

	cPub2 := store.Must(ss.Channel().Save(&model.Channel{
		Name: model.NewId(),
		Type: model.CHANNEL_OPEN,
	}, -1)).(*model.Channel)

	cPriv := store.Must(ss.Channel().Save(&model.Channel{
		Name: model.NewId(),
		Type: model.CHANNEL_PRIVATE,
	}, -1)).(*model.Channel)

	// u1: no memberships at all.
	u1 := store.Must(ss.User().Save(&model.User{
		Email:    MakeEmail(),
		Username: model.NewId(),
		CreateAt: model.GetMillis(),
	})).(*model.User)

	time.Sleep(10 * time.Millisecond)

	// u2: member of the team and of both public channels.
	u2 := store.Must(ss.User().Save(&model.User{
		Email:    MakeEmail(),
		Username: model.NewId(),
		CreateAt: model.GetMillis(),
	})).(*model.User)
	store.Must(ss.Team().SaveMember(&model.TeamMember{
		UserId: u2.Id,
		TeamId: t1.Id,
	}, 100))
	store.Must(ss.Channel().SaveMember(&model.ChannelMember{
		UserId:      u2.Id,
		ChannelId:   cPub1.Id,
		NotifyProps: model.GetDefaultChannelNotifyProps(),
	}))
	store.Must(ss.Channel().SaveMember(&model.ChannelMember{
		UserId:      u2.Id,
		ChannelId:   cPub2.Id,
		NotifyProps: model.GetDefaultChannelNotifyProps(),
	}))

	startTime := u2.CreateAt
	time.Sleep(10 * time.Millisecond)

	// u3: a deleted team membership and a private-channel membership,
	// neither of which should appear in the indexing data.
	u3 := store.Must(ss.User().Save(&model.User{
		Email:    MakeEmail(),
		Username: model.NewId(),
		CreateAt: model.GetMillis(),
	})).(*model.User)
	store.Must(ss.Team().SaveMember(&model.TeamMember{
		UserId:   u3.Id,
		TeamId:   t1.Id,
		DeleteAt: model.GetMillis(),
	}, 100))
	store.Must(ss.Channel().SaveMember(&model.ChannelMember{
		UserId:      u3.Id,
		ChannelId:   cPub2.Id,
		NotifyProps: model.GetDefaultChannelNotifyProps(),
	}))
	store.Must(ss.Channel().SaveMember(&model.ChannelMember{
		UserId:      u3.Id,
		ChannelId:   cPriv.Id,
		NotifyProps: model.GetDefaultChannelNotifyProps(),
	}))

	endTime := u3.CreateAt

	// First and last user should be outside the range.
	res1 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 100)
	assert.Nil(t, res1.Err)
	res1List := res1.Data.([]*model.UserForIndexing)
	assert.Len(t, res1List, 1)
	assert.Equal(t, res1List[0].Username, u2.Username)
	assert.ElementsMatch(t, res1List[0].TeamsIds, []string{t1.Id})
	assert.ElementsMatch(t, res1List[0].ChannelsIds, []string{cPub1.Id, cPub2.Id})

	// Update startTime to include the first user.
	startTime = u1.CreateAt
	res2 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 100)
	// Bug fix: this previously asserted res1.Err again, so an error from
	// the second query was silently ignored.
	assert.Nil(t, res2.Err)
	res2List := res2.Data.([]*model.UserForIndexing)
	assert.Len(t, res2List, 2)
	assert.Equal(t, res2List[0].Username, u1.Username)
	assert.Equal(t, res2List[0].ChannelsIds, []string{})
	assert.Equal(t, res2List[0].TeamsIds, []string{})
	assert.Equal(t, res2List[1].Username, u2.Username)

	// Update endTime to include the last user.
	endTime = model.GetMillis()
	res3 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 100)
	assert.Nil(t, res3.Err)
	res3List := res3.Data.([]*model.UserForIndexing)
	assert.Len(t, res3List, 3)
	assert.Equal(t, res3List[0].Username, u1.Username)
	assert.Equal(t, res3List[1].Username, u2.Username)
	assert.Equal(t, res3List[2].Username, u3.Username)
	assert.ElementsMatch(t, res3List[2].TeamsIds, []string{})
	assert.ElementsMatch(t, res3List[2].ChannelsIds, []string{cPub2.Id})

	// Testing the limit.
	res4 := <-ss.User().GetUsersBatchForIndexing(startTime, endTime, 2)
	assert.Nil(t, res4.Err)
	res4List := res4.Data.([]*model.UserForIndexing)
	assert.Len(t, res4List, 2)
	assert.Equal(t, res4List[0].Username, u1.Username)
	assert.Equal(t, res4List[1].Username, u2.Username)
}