[MM-25406] Include missing methods in the search layer (#14799)

* Two missing methods to add in the channel layer

* Added delete user/channel posts methods

- Created in both search engines but only implemented in ES
- Add those methods in the search layer
- Included the PermanentDeleteByUser/Channel methods

* Two new delete documents are included in the bleve code with this
change:

- DeleteChannelPosts
- DeleteUserPosts

These two new functions delete post documents from the index-based
in the filed value provided
This commit is contained in:
Mario de Frutos Dieguez
2020-06-25 13:45:39 +02:00
committed by GitHub
parent aaea36a24d
commit 05ec3733c0
8 changed files with 347 additions and 4 deletions

View File

@@ -4050,6 +4050,10 @@
"id": "bleveengine.delete_channel.error",
"translation": "Failed to delete the channel."
},
{
"id": "bleveengine.delete_channel_posts.error",
"translation": "Failed to delete channel posts"
},
{
"id": "bleveengine.delete_post.error",
"translation": "Failed to delete the post."
@@ -4058,6 +4062,10 @@
"id": "bleveengine.delete_user.error",
"translation": "Failed to delete the user."
},
{
"id": "bleveengine.delete_user_posts.error",
"translation": "Failed to delete user posts"
},
{
"id": "bleveengine.index_channel.error",
"translation": "Failed to index the channel."
@@ -4322,6 +4330,10 @@
"id": "ent.elasticsearch.delete_channel.error",
"translation": "Failed to delete the channel"
},
{
"id": "ent.elasticsearch.delete_channel_posts.error",
"translation": "Failed to delete channel posts"
},
{
"id": "ent.elasticsearch.delete_post.error",
"translation": "Failed to delete the post"
@@ -4330,6 +4342,10 @@
"id": "ent.elasticsearch.delete_user.error",
"translation": "Failed to delete the user"
},
{
"id": "ent.elasticsearch.delete_user_posts.error",
"translation": "Failed to delete user posts"
},
{
"id": "ent.elasticsearch.generic.disabled",
"translation": "Elasticsearch search is not enabled on this server"

View File

@@ -8,6 +8,8 @@ import (
"os"
"testing"
"github.com/blevesearch/bleve"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/mattermost/mattermost-server/v5/model"
@@ -26,6 +28,7 @@ type BleveEngineTestSuite struct {
SQLSupplier *sqlstore.SqlSupplier
SearchEngine *searchengine.Broker
Store *searchlayer.SearchStore
BleveEngine *BleveEngine
IndexDir string
}
@@ -60,10 +63,10 @@ func (s *BleveEngineTestSuite) setupStore() {
s.SearchEngine = searchengine.NewBroker(cfg, nil)
s.Store = searchlayer.NewSearchLayer(&testlib.TestStore{Store: s.SQLSupplier}, s.SearchEngine, cfg)
bleveEngine := NewBleveEngine(cfg, nil)
bleveEngine.indexSync = true
s.SearchEngine.RegisterBleveEngine(bleveEngine)
if err := bleveEngine.Start(); err != nil {
s.BleveEngine = NewBleveEngine(cfg, nil)
s.BleveEngine.indexSync = true
s.SearchEngine.RegisterBleveEngine(s.BleveEngine)
if err := s.BleveEngine.Start(); err != nil {
s.Require().FailNow("Cannot start bleveengine: %s", err.Error())
}
}
@@ -96,3 +99,131 @@ func (s *BleveEngineTestSuite) TestBleveSearchStoreTests() {
searchtest.TestSearchPostStore(s.T(), s.Store, searchTestEngine)
})
}
func (s *BleveEngineTestSuite) TestDeleteChannelPosts() {
s.Run("Should remove all the posts that belongs to a channel", func() {
s.BleveEngine.PurgeIndexes()
teamID := model.NewId()
userID := model.NewId()
channelID := model.NewId()
channelToAvoidID := model.NewId()
posts := make([]*model.Post, 0)
for i := 0; i < 10; i++ {
post := createPost(userID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(post, teamID)
require.Nil(s.T(), appErr)
posts = append(posts, post)
}
postToAvoid := createPost(userID, channelToAvoidID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(postToAvoid, teamID)
require.Nil(s.T(), appErr)
s.SearchEngine.BleveEngine.DeleteChannelPosts(channelID)
doc, err := s.BleveEngine.PostIndex.Document(postToAvoid.Id)
require.Nil(s.T(), err)
require.Equal(s.T(), postToAvoid.Id, doc.ID)
numberDocs, err := s.BleveEngine.PostIndex.DocCount()
require.Nil(s.T(), err)
require.Equal(s.T(), 1, int(numberDocs))
})
s.Run("Shouldn't do anything if there is not posts for the selected channel", func() {
s.BleveEngine.PurgeIndexes()
teamID := model.NewId()
userID := model.NewId()
channelID := model.NewId()
channelToDeleteID := model.NewId()
post := createPost(userID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(post, teamID)
require.Nil(s.T(), appErr)
s.SearchEngine.BleveEngine.DeleteChannelPosts(channelToDeleteID)
_, err := s.BleveEngine.PostIndex.Document(post.Id)
require.Nil(s.T(), err)
numberDocs, err := s.BleveEngine.PostIndex.DocCount()
require.Nil(s.T(), err)
require.Equal(s.T(), 1, int(numberDocs))
})
}
func (s *BleveEngineTestSuite) TestDeleteUserPosts() {
s.Run("Should remove all the posts that belongs to a user", func() {
s.BleveEngine.PurgeIndexes()
teamID := model.NewId()
userID := model.NewId()
userToAvoidID := model.NewId()
channelID := model.NewId()
posts := make([]*model.Post, 0)
for i := 0; i < 10; i++ {
post := createPost(userID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(post, teamID)
require.Nil(s.T(), appErr)
posts = append(posts, post)
}
postToAvoid := createPost(userToAvoidID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(postToAvoid, teamID)
require.Nil(s.T(), appErr)
s.SearchEngine.BleveEngine.DeleteUserPosts(userID)
doc, err := s.BleveEngine.PostIndex.Document(postToAvoid.Id)
require.Nil(s.T(), err)
require.Equal(s.T(), postToAvoid.Id, doc.ID)
numberDocs, err := s.BleveEngine.PostIndex.DocCount()
require.Nil(s.T(), err)
require.Equal(s.T(), 1, int(numberDocs))
})
s.Run("Shouldn't do anything if there is not posts for the selected user", func() {
s.BleveEngine.PurgeIndexes()
teamID := model.NewId()
userID := model.NewId()
userToDeleteID := model.NewId()
channelID := model.NewId()
post := createPost(userID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(post, teamID)
require.Nil(s.T(), appErr)
s.SearchEngine.BleveEngine.DeleteUserPosts(userToDeleteID)
_, err := s.BleveEngine.PostIndex.Document(post.Id)
require.Nil(s.T(), err)
numberDocs, err := s.BleveEngine.PostIndex.DocCount()
require.Nil(s.T(), err)
require.Equal(s.T(), 1, int(numberDocs))
})
}
func (s *BleveEngineTestSuite) TestDeletePosts() {
s.BleveEngine.PurgeIndexes()
teamID := model.NewId()
userID := model.NewId()
userToAvoidID := model.NewId()
channelID := model.NewId()
posts := make([]*model.Post, 0)
for i := 0; i < 10; i++ {
post := createPost(userID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(post, teamID)
require.Nil(s.T(), appErr)
posts = append(posts, post)
}
postToAvoid := createPost(userToAvoidID, channelID, "test one two three")
appErr := s.SearchEngine.BleveEngine.IndexPost(postToAvoid, teamID)
require.Nil(s.T(), appErr)
query := bleve.NewTermQuery(userID)
query.SetField("UserId")
search := bleve.NewSearchRequest(query)
count, err := s.BleveEngine.deletePosts(search, 1)
require.Nil(s.T(), err)
require.Equal(s.T(), 10, int(count))
doc, err := s.BleveEngine.PostIndex.Document(postToAvoid.Id)
require.Nil(s.T(), err)
require.Equal(s.T(), postToAvoid.Id, doc.ID)
numberDocs, err := s.BleveEngine.PostIndex.DocCount()
require.Nil(s.T(), err)
require.Equal(s.T(), 1, int(numberDocs))
}

View File

@@ -7,12 +7,15 @@ import (
"net/http"
"strings"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/search/query"
)
const DELETE_POSTS_BATCH_SIZE = 500
func (b *BleveEngine) IndexPost(post *model.Post, teamId string) *model.AppError {
b.Mutex.RLock()
defer b.Mutex.RUnlock()
@@ -209,6 +212,72 @@ func (b *BleveEngine) SearchPosts(channels *model.ChannelList, searchParams []*m
return postIds, matches, nil
}
func (b *BleveEngine) deletePosts(searchRequest *bleve.SearchRequest, batchSize int) (int64, error) {
resultsCount := int64(0)
for {
// As we are deleting the posts after fetching them, we need to keep
// From fixed always to 0
searchRequest.From = 0
searchRequest.Size = batchSize
results, err := b.PostIndex.Search(searchRequest)
if err != nil {
return -1, err
}
batch := b.PostIndex.NewBatch()
for _, post := range results.Hits {
batch.Delete(post.ID)
}
if err := b.PostIndex.Batch(batch); err != nil {
return -1, err
}
resultsCount += int64(results.Hits.Len())
if results.Hits.Len() < batchSize {
break
}
}
return resultsCount, nil
}
func (b *BleveEngine) DeleteChannelPosts(channelID string) *model.AppError {
b.Mutex.RLock()
defer b.Mutex.RUnlock()
query := bleve.NewTermQuery(channelID)
query.SetField("ChannelId")
search := bleve.NewSearchRequest(query)
deleted, err := b.deletePosts(search, DELETE_POSTS_BATCH_SIZE)
if err != nil {
return model.NewAppError("Bleveengine.DeleteChannelPosts",
"bleveengine.delete_channel_posts.error", nil,
err.Error(), http.StatusInternalServerError)
}
mlog.Info("Posts for channel deleted", mlog.String("channel_id", channelID), mlog.Int64("deleted", deleted))
return nil
}
func (b *BleveEngine) DeleteUserPosts(userID string) *model.AppError {
b.Mutex.RLock()
defer b.Mutex.RUnlock()
query := bleve.NewTermQuery(userID)
query.SetField("UserId")
search := bleve.NewSearchRequest(query)
deleted, err := b.deletePosts(search, DELETE_POSTS_BATCH_SIZE)
if err != nil {
return model.NewAppError("Bleveengine.DeleteUserPosts",
"bleveengine.delete_user_posts.error", nil,
err.Error(), http.StatusInternalServerError)
}
mlog.Info("Posts for user deleted", mlog.String("user_id", userID), mlog.Int64("deleted", deleted))
return nil
}
func (b *BleveEngine) DeletePost(post *model.Post) *model.AppError {
b.Mutex.RLock()
defer b.Mutex.RUnlock()

View File

@@ -0,0 +1,23 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package bleveengine
import (
"fmt"
"github.com/mattermost/mattermost-server/v5/model"
)
func createPost(userId string, channelId string, message string) *model.Post {
post := &model.Post{
Message: message,
ChannelId: channelId,
PendingPostId: model.NewId() + ":" + fmt.Sprint(model.GetMillis()),
UserId: userId,
CreateAt: 1000000,
}
post.PreSave()
return post
}

View File

@@ -23,6 +23,8 @@ type SearchEngineInterface interface {
IndexPost(post *model.Post, teamId string) *model.AppError
SearchPosts(channels *model.ChannelList, searchParams []*model.SearchParams, page, perPage int) ([]string, model.PostSearchMatches, *model.AppError)
DeletePost(post *model.Post) *model.AppError
DeleteChannelPosts(channelID string) *model.AppError
DeleteUserPosts(userID string) *model.AppError
IndexChannel(channel *model.Channel) *model.AppError
SearchChannels(teamId, term string) ([]string, *model.AppError)
DeleteChannel(channel *model.Channel) *model.AppError

View File

@@ -48,6 +48,22 @@ func (_m *SearchEngineInterface) DeleteChannel(channel *model.Channel) *model.Ap
return r0
}
// DeleteChannelPosts provides a mock function with given fields: channelID
func (_m *SearchEngineInterface) DeleteChannelPosts(channelID string) *model.AppError {
ret := _m.Called(channelID)
var r0 *model.AppError
if rf, ok := ret.Get(0).(func(string) *model.AppError); ok {
r0 = rf(channelID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.AppError)
}
}
return r0
}
// DeletePost provides a mock function with given fields: post
func (_m *SearchEngineInterface) DeletePost(post *model.Post) *model.AppError {
ret := _m.Called(post)
@@ -80,6 +96,22 @@ func (_m *SearchEngineInterface) DeleteUser(user *model.User) *model.AppError {
return r0
}
// DeleteUserPosts provides a mock function with given fields: userID
func (_m *SearchEngineInterface) DeleteUserPosts(userID string) *model.AppError {
ret := _m.Called(userID)
var r0 *model.AppError
if rf, ok := ret.Get(0).(func(string) *model.AppError); ok {
r0 = rf(userID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.AppError)
}
}
return r0
}
// GetName provides a mock function with given fields:
func (_m *SearchEngineInterface) GetName() string {
ret := _m.Called()

View File

@@ -97,6 +97,17 @@ func (c *SearchChannelStore) RemoveMember(channelId, userIdToRemove string) *mod
return err
}
func (c *SearchChannelStore) RemoveMembers(channelId string, userIds []string) *model.AppError {
if err := c.ChannelStore.RemoveMembers(channelId, userIds); err != nil {
return err
}
for _, uid := range userIds {
c.rootStore.indexUserFromID(uid)
}
return nil
}
func (c *SearchChannelStore) CreateDirectChannel(user *model.User, otherUser *model.User) (*model.Channel, error) {
channel, err := c.ChannelStore.CreateDirectChannel(user, otherUser)
if err == nil {
@@ -106,6 +117,15 @@ func (c *SearchChannelStore) CreateDirectChannel(user *model.User, otherUser *mo
return channel, err
}
func (c *SearchChannelStore) SaveDirectChannel(directchannel *model.Channel, member1 *model.ChannelMember, member2 *model.ChannelMember) (*model.Channel, error) {
channel, err := c.ChannelStore.SaveDirectChannel(directchannel, member1, member2)
if err != nil {
c.rootStore.indexUserFromID(member1.UserId)
c.rootStore.indexUserFromID(member2.UserId)
}
return channel, err
}
func (c *SearchChannelStore) AutocompleteInTeam(teamId string, term string, includeDeleted bool) (*model.ChannelList, *model.AppError) {
var channelList *model.ChannelList
var err *model.AppError

View File

@@ -49,6 +49,32 @@ func (s SearchPostStore) deletePostIndex(post *model.Post) {
}
}
func (s SearchPostStore) deleteChannelPostsIndex(channelID string) {
for _, engine := range s.rootStore.searchEngine.GetActiveEngines() {
if engine.IsIndexingEnabled() {
runIndexFn(engine, func(engineCopy searchengine.SearchEngineInterface) {
if err := engineCopy.DeleteChannelPosts(channelID); err != nil {
mlog.Error("Encountered error deleting channel posts", mlog.String("channel_id", channelID), mlog.String("search_engine", engineCopy.GetName()), mlog.Err(err))
}
mlog.Debug("Removed all channel posts from the index in search engine", mlog.String("channel_id", channelID), mlog.String("search_engine", engineCopy.GetName()))
})
}
}
}
func (s SearchPostStore) deleteUserPostsIndex(userID string) {
for _, engine := range s.rootStore.searchEngine.GetActiveEngines() {
if engine.IsIndexingEnabled() {
runIndexFn(engine, func(engineCopy searchengine.SearchEngineInterface) {
if err := engineCopy.DeleteUserPosts(userID); err != nil {
mlog.Error("Encountered error deleting user posts", mlog.String("user_id", userID), mlog.String("search_engine", engineCopy.GetName()), mlog.Err(err))
}
mlog.Debug("Removed all user posts from the index in search engine", mlog.String("user_id", userID), mlog.String("search_engine", engineCopy.GetName()))
})
}
}
}
func (s SearchPostStore) Update(newPost, oldPost *model.Post) (*model.Post, *model.AppError) {
post, err := s.PostStore.Update(newPost, oldPost)
@@ -58,6 +84,14 @@ func (s SearchPostStore) Update(newPost, oldPost *model.Post) (*model.Post, *mod
return post, err
}
func (s *SearchPostStore) Overwrite(post *model.Post) (*model.Post, *model.AppError) {
post, err := s.PostStore.Overwrite(post)
if err == nil {
s.indexPost(post)
}
return post, err
}
func (s SearchPostStore) Save(post *model.Post) (*model.Post, *model.AppError) {
npost, err := s.PostStore.Save(post)
@@ -81,6 +115,22 @@ func (s SearchPostStore) Delete(postId string, date int64, deletedByID string) *
return err
}
func (s SearchPostStore) PermanentDeleteByUser(userID string) *model.AppError {
err := s.PostStore.PermanentDeleteByUser(userID)
if err == nil {
s.deleteUserPostsIndex(userID)
}
return err
}
func (s SearchPostStore) PermanentDeleteByChannel(channelID string) *model.AppError {
err := s.PostStore.PermanentDeleteByChannel(channelID)
if err == nil {
s.deleteChannelPostsIndex(channelID)
}
return err
}
func (s SearchPostStore) searchPostsInTeamForUserByEngine(engine searchengine.SearchEngineInterface, paramsList []*model.SearchParams, userId, teamId string, isOrSearch, includeDeletedChannels bool, page, perPage int) (*model.PostSearchResults, *model.AppError) {
// We only allow the user to search in channels they are a member of.
userChannels, nErr := s.rootStore.Channel().GetChannels(teamId, userId, includeDeletedChannels)