mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
PLT-6475: Elasticsearch Indexing Worker. (#6879)
This commit is contained in:
22
einterfaces/jobs/elasticsearch.go
Normal file
22
einterfaces/jobs/elasticsearch.go
Normal file
@@ -0,0 +1,22 @@
|
||||
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
|
||||
// See License.txt for license information.
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"github.com/mattermost/platform/model"
|
||||
)
|
||||
|
||||
type ElasticsearchIndexerInterface interface {
|
||||
MakeWorker() model.Worker
|
||||
}
|
||||
|
||||
var theElasticsearchIndexerInterface ElasticsearchIndexerInterface
|
||||
|
||||
func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInterface) {
|
||||
theElasticsearchIndexerInterface = newInterface
|
||||
}
|
||||
|
||||
func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface {
|
||||
return theElasticsearchIndexerInterface
|
||||
}
|
||||
16
i18n/en.json
16
i18n/en.json
@@ -3516,19 +3516,19 @@
|
||||
"translation": "Failed to decode search results"
|
||||
},
|
||||
{
|
||||
"id": "ent.elasticsearch.start.connect_failed",
|
||||
"id": "ent.elasticsearch.create_client.connect_failed",
|
||||
"translation": "Setting up ElasticSearch Client Failed"
|
||||
},
|
||||
{
|
||||
"id": "ent.elasticsearch.start.index_create_failed",
|
||||
"id": "ent.elasticsearch.create_index_if_not_exists.index_create_failed",
|
||||
"translation": "Failed to create ElasticSearch index"
|
||||
},
|
||||
{
|
||||
"id": "ent.elasticsearch.start.index_exists_failed",
|
||||
"id": "ent.elasticsearch.create_index_if_not_exists.index_exists_failed",
|
||||
"translation": "Failed to establish whether ElasticSearch index exists"
|
||||
},
|
||||
{
|
||||
"id": "ent.elasticsearch.start.index_mapping_failed",
|
||||
"id": "ent.elasticsearch.create_index_if_not_exists.index_mapping_failed",
|
||||
"translation": "Failed to setup ElasticSearch index mapping"
|
||||
},
|
||||
{
|
||||
@@ -3763,6 +3763,10 @@
|
||||
"id": "error.not_found.title",
|
||||
"translation": "Page not found"
|
||||
},
|
||||
{
|
||||
"id": "jobs.set_job_error.update.error",
|
||||
"translation": "Failed to set job status to error"
|
||||
},
|
||||
{
|
||||
"id": "jobs.request_cancellation.status.error",
|
||||
"translation": "Could not request cancellation for job that is not in a cancelable state."
|
||||
@@ -5483,6 +5487,10 @@
|
||||
"id": "store.sql_post.get_posts_around.get_parent.app_error",
|
||||
"translation": "We couldn't get the parent posts for the channel"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_post.get_posts_batch_for_indexing.get.app_error",
|
||||
"translation": "We couldn't get the posts batch for indexing"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_post.get_posts_by_ids.app_error",
|
||||
"translation": "We couldn't get the posts"
|
||||
|
||||
52
jobs/jobs.go
52
jobs/jobs.go
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
l4g "github.com/alecthomas/log4go"
|
||||
"github.com/mattermost/platform/model"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -40,27 +41,15 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) {
|
||||
}
|
||||
}
|
||||
|
||||
func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
|
||||
var job *model.Job
|
||||
|
||||
if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
|
||||
return false, result.Err
|
||||
} else {
|
||||
job = result.Data.(*model.Job)
|
||||
}
|
||||
|
||||
func SetJobProgress(job *model.Job, progress int64) (*model.AppError) {
|
||||
job.Status = model.JOB_STATUS_IN_PROGRESS
|
||||
job.Progress = progress
|
||||
|
||||
if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
|
||||
return false, result.Err
|
||||
return result.Err
|
||||
} else {
|
||||
if !result.Data.(bool) {
|
||||
return false, nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func SetJobSuccess(job *model.Job) *model.AppError {
|
||||
@@ -68,9 +57,34 @@ func SetJobSuccess(job *model.Job) *model.AppError {
|
||||
return result.Err
|
||||
}
|
||||
|
||||
func SetJobError(job *model.Job) *model.AppError {
|
||||
result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
|
||||
return result.Err
|
||||
func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
|
||||
if jobError == nil {
|
||||
result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
|
||||
return result.Err
|
||||
}
|
||||
|
||||
job.Status = model.JOB_STATUS_ERROR
|
||||
job.Progress = -1
|
||||
if job.Data == nil {
|
||||
job.Data = make(map[string]interface{})
|
||||
}
|
||||
job.Data["error"] = jobError
|
||||
|
||||
if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
|
||||
return result.Err
|
||||
} else {
|
||||
if !result.Data.(bool) {
|
||||
if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
|
||||
return result.Err
|
||||
} else {
|
||||
if !result.Data.(bool) {
|
||||
return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetJobCanceled(job *model.Job) *model.AppError {
|
||||
@@ -91,7 +105,7 @@ func RequestCancellation(job *model.Job) *model.AppError {
|
||||
return nil
|
||||
}
|
||||
|
||||
return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
|
||||
return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
|
||||
|
||||
@@ -79,6 +79,13 @@ func (watcher *Watcher) PollAndNotify() {
|
||||
default:
|
||||
}
|
||||
}
|
||||
} else if js.Type == model.JOB_TYPE_SEARCH_INDEXING {
|
||||
if watcher.workers.ElasticsearchIndexing != nil {
|
||||
select {
|
||||
case watcher.workers.ElasticsearchIndexing.JobChannel() <- j:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ func (worker *TestWorker) DoJob(job *model.Job) {
|
||||
}
|
||||
return
|
||||
} else {
|
||||
if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil {
|
||||
if err := SetJobProgress(job, int64(counter*10)); err != nil {
|
||||
l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,13 +13,13 @@ import (
|
||||
)
|
||||
|
||||
type Workers struct {
|
||||
startOnce sync.Once
|
||||
watcher *Watcher
|
||||
startOnce sync.Once
|
||||
watcher *Watcher
|
||||
|
||||
DataRetention model.Worker
|
||||
// SearchIndexing model.Job
|
||||
DataRetention model.Worker
|
||||
ElasticsearchIndexing model.Worker
|
||||
|
||||
listenerId string
|
||||
listenerId string
|
||||
}
|
||||
|
||||
func InitWorkers() *Workers {
|
||||
@@ -32,6 +32,10 @@ func InitWorkers() *Workers {
|
||||
workers.DataRetention = dataRetentionInterface.MakeWorker()
|
||||
}
|
||||
|
||||
if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil {
|
||||
workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker()
|
||||
}
|
||||
|
||||
return workers
|
||||
}
|
||||
|
||||
@@ -43,7 +47,9 @@ func (workers *Workers) Start() *Workers {
|
||||
go workers.DataRetention.Run()
|
||||
}
|
||||
|
||||
// go workers.SearchIndexing.Run()
|
||||
if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing {
|
||||
go workers.ElasticsearchIndexing.Run()
|
||||
}
|
||||
|
||||
go workers.watcher.Start()
|
||||
})
|
||||
@@ -61,6 +67,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m
|
||||
workers.DataRetention.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
if workers.ElasticsearchIndexing != nil {
|
||||
if !*oldConfig.ElasticSearchSettings.EnableIndexing && *newConfig.ElasticSearchSettings.EnableIndexing {
|
||||
go workers.ElasticsearchIndexing.Run()
|
||||
} else if *oldConfig.ElasticSearchSettings.EnableIndexing && !*newConfig.ElasticSearchSettings.EnableIndexing {
|
||||
workers.ElasticsearchIndexing.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (workers *Workers) Stop() *Workers {
|
||||
@@ -71,7 +85,10 @@ func (workers *Workers) Stop() *Workers {
|
||||
if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
|
||||
workers.DataRetention.Stop()
|
||||
}
|
||||
// workers.SearchIndexing.Stop()
|
||||
|
||||
if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing {
|
||||
workers.ElasticsearchIndexing.Stop()
|
||||
}
|
||||
|
||||
l4g.Info("Stopped workers")
|
||||
|
||||
|
||||
@@ -62,6 +62,12 @@ type PostPatch struct {
|
||||
HasReactions *bool `json:"has_reactions"`
|
||||
}
|
||||
|
||||
type PostForIndexing struct {
|
||||
Post
|
||||
TeamId string `json:"team_id"`
|
||||
ParentCreateAt *int64 `json:"parent_create_at"`
|
||||
}
|
||||
|
||||
func (o *Post) ToJson() string {
|
||||
b, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
|
||||
@@ -1315,3 +1315,44 @@ func (s SqlPostStore) GetPostsByIds(postIds []string) StoreChannel {
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel {
|
||||
storeChannel := make(StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := StoreResult{}
|
||||
|
||||
var posts []*model.PostForIndexing
|
||||
_, err1 := s.GetSearchReplica().Select(&posts,
|
||||
`(SELECT
|
||||
Posts.*,
|
||||
Channels.TeamId,
|
||||
ParentPosts.CreateAt ParentCreateAt
|
||||
FROM
|
||||
Posts
|
||||
LEFT JOIN
|
||||
Channels
|
||||
ON
|
||||
Posts.ChannelId = Channels.Id
|
||||
LEFT JOIN
|
||||
Posts ParentPosts
|
||||
ON
|
||||
Posts.RootId = ParentPosts.Id
|
||||
WHERE
|
||||
Posts.CreateAt >= :StartTime
|
||||
ORDER BY CreateAt ASC
|
||||
LIMIT :NumPosts)`,
|
||||
map[string]interface{}{"StartTime": startTime, "NumPosts": limit})
|
||||
|
||||
if err1 != nil {
|
||||
result.Err = model.NewLocAppError("SqlPostStore.GetPostContext", "store.sql_post.get_posts_batch_for_indexing.get.app_error", nil, err1.Error())
|
||||
} else {
|
||||
result.Data = posts
|
||||
}
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
}()
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
@@ -1592,3 +1592,72 @@ func TestPostStoreGetPostsByIds(t *testing.T) {
|
||||
t.Fatalf("Expected 2 posts in results. Got %v", len(ro5))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostStoreGetPostsBatchForIndexing(t *testing.T) {
|
||||
Setup()
|
||||
|
||||
c1 := &model.Channel{}
|
||||
c1.TeamId = model.NewId()
|
||||
c1.DisplayName = "Channel1"
|
||||
c1.Name = "zz" + model.NewId() + "b"
|
||||
c1.Type = model.CHANNEL_OPEN
|
||||
c1 = (<-store.Channel().Save(c1)).Data.(*model.Channel)
|
||||
|
||||
c2 := &model.Channel{}
|
||||
c2.TeamId = model.NewId()
|
||||
c2.DisplayName = "Channel2"
|
||||
c2.Name = "zz" + model.NewId() + "b"
|
||||
c2.Type = model.CHANNEL_OPEN
|
||||
c2 = (<-store.Channel().Save(c2)).Data.(*model.Channel)
|
||||
|
||||
o1 := &model.Post{}
|
||||
o1.ChannelId = c1.Id
|
||||
o1.UserId = model.NewId()
|
||||
o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA"
|
||||
o1 = (<-store.Post().Save(o1)).Data.(*model.Post)
|
||||
|
||||
o2 := &model.Post{}
|
||||
o2.ChannelId = c2.Id
|
||||
o2.UserId = model.NewId()
|
||||
o2.Message = "zz" + model.NewId() + "CCCCCCCCC"
|
||||
o2 = (<-store.Post().Save(o2)).Data.(*model.Post)
|
||||
|
||||
o3 := &model.Post{}
|
||||
o3.ChannelId = c1.Id
|
||||
o3.UserId = model.NewId()
|
||||
o3.ParentId = o1.Id
|
||||
o3.RootId = o1.Id
|
||||
o3.Message = "zz" + model.NewId() + "QQQQQQQQQQ"
|
||||
o3 = (<-store.Post().Save(o3)).Data.(*model.Post)
|
||||
|
||||
if r := Must(store.Post().GetPostsBatchForIndexing(o1.CreateAt, 100)).([]*model.PostForIndexing); len(r) != 3 {
|
||||
t.Fatalf("Expected 3 posts in results. Got %v", len(r))
|
||||
} else {
|
||||
for _, p := range r {
|
||||
if p.Id == o1.Id {
|
||||
if p.TeamId != c1.TeamId {
|
||||
t.Fatalf("Unexpected team ID")
|
||||
}
|
||||
if p.ParentCreateAt != nil {
|
||||
t.Fatalf("Unexpected parent create at")
|
||||
}
|
||||
} else if p.Id == o2.Id {
|
||||
if p.TeamId != c2.TeamId {
|
||||
t.Fatalf("Unexpected team ID")
|
||||
}
|
||||
if p.ParentCreateAt != nil {
|
||||
t.Fatalf("Unexpected parent create at")
|
||||
}
|
||||
} else if p.Id == o3.Id {
|
||||
if p.TeamId != c1.TeamId {
|
||||
t.Fatalf("Unexpected team ID")
|
||||
}
|
||||
if *p.ParentCreateAt != o1.CreateAt {
|
||||
t.Fatalf("Unexpected parent create at")
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("unexpected post returned")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,6 +168,7 @@ type PostStore interface {
|
||||
GetPostsCreatedAt(channelId string, time int64) StoreChannel
|
||||
Overwrite(post *model.Post) StoreChannel
|
||||
GetPostsByIds(postIds []string) StoreChannel
|
||||
GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel
|
||||
}
|
||||
|
||||
type UserStore interface {
|
||||
|
||||
Reference in New Issue
Block a user