PLT-6472: Basic Elastic Search implementation. (#6382)

* PLT-6472: Basic Elastic Search implementation.

This currently supports indexing of posts at create/update/delete time.
It does not support batch indexing or reindexing, and does not support
any entities other than posts yet. The purpose is to more-or-less
replicate the existing full-text search feature but with some of the
immediate benefits of using elastic search.

* Alter settings for AWS compatability.

* Remove unneeded i18n strings.
This commit is contained in:
George Goldberg
2017-05-18 16:26:52 +01:00
committed by Harrison Healey
parent 2bbedd9def
commit 0db5e3922f
10 changed files with 343 additions and 37 deletions

View File

@@ -125,6 +125,11 @@ func CreatePost(post *model.Post, teamId string, triggerWebhooks bool) (*model.P
rpost = result.Data.(*model.Post)
}
esInterface := einterfaces.GetElasticSearchInterface()
if (esInterface != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing) {
go esInterface.IndexPost(rpost, teamId)
}
if einterfaces.GetMetricsInterface() != nil {
einterfaces.GetMetricsInterface().IncrementPostCreate()
}
@@ -308,6 +313,17 @@ func UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model.AppError
} else {
rpost := result.Data.(*model.Post)
esInterface := einterfaces.GetElasticSearchInterface()
if (esInterface != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing) {
go func() {
if rchannel := <-Srv.Store.Channel().GetForPost(rpost.Id); rchannel.Err != nil {
l4g.Error("Couldn't get channel %v for post %v for ElasticSearch indexing.", rpost.ChannelId, rpost.Id)
} else {
esInterface.IndexPost(rpost, rchannel.Data.(*model.Channel).TeamId)
}
}()
}
sendUpdatedPostEvent(rpost)
InvalidateCacheForChannelPosts(rpost.ChannelId)
@@ -484,6 +500,11 @@ func DeletePost(postId string) (*model.Post, *model.AppError) {
go DeletePostFiles(post)
go DeleteFlaggedPosts(post.Id)
esInterface := einterfaces.GetElasticSearchInterface()
if (esInterface != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing) {
go esInterface.DeletePost(post.Id)
}
InvalidateCacheForChannelPosts(post.ChannelId)
return post, nil
@@ -509,27 +530,84 @@ func DeletePostFiles(post *model.Post) {
func SearchPostsInTeam(terms string, userId string, teamId string, isOrSearch bool) (*model.PostList, *model.AppError) {
paramsList := model.ParseSearchParams(terms)
channels := []store.StoreChannel{}
for _, params := range paramsList {
params.OrTerms = isOrSearch
// don't allow users to search for everything
if params.Terms != "*" {
channels = append(channels, Srv.Store.Post().Search(teamId, userId, params))
esInterface := einterfaces.GetElasticSearchInterface()
if (esInterface != nil && *utils.Cfg.ElasticSearchSettings.EnableSearching && utils.IsLicensed && *utils.License.Features.ElasticSearch) {
finalParamsList := []*model.SearchParams{}
for _, params := range paramsList {
params.OrTerms = isOrSearch
// Don't allow users to search for "*"
if params.Terms != "*" {
// Convert channel names to channel IDs
for idx, channelName := range params.InChannels {
if channel, err := GetChannelByName(channelName, teamId); err != nil {
l4g.Error(err)
} else {
params.InChannels[idx] = channel.Id
}
}
// Convert usernames to user IDs
for idx, username := range params.FromUsers {
if user, err := GetUserByUsername(username); err != nil {
l4g.Error(err)
} else {
params.FromUsers[idx] = user.Id
}
}
finalParamsList = append(finalParamsList, params)
}
}
}
posts := model.NewPostList()
for _, channel := range channels {
if result := <-channel; result.Err != nil {
return nil, result.Err
// We only allow the user to search in channels they are a member of.
userChannels, err := GetChannelsForUser(teamId, userId)
if err != nil {
l4g.Error(err)
return nil, err
}
postIds, err := einterfaces.GetElasticSearchInterface().SearchPosts(userChannels, finalParamsList)
if err != nil {
return nil, err
}
// Get the posts
postList := model.NewPostList()
if presult := <-Srv.Store.Post().GetPostsByIds(postIds); presult.Err != nil {
return nil, presult.Err
} else {
data := result.Data.(*model.PostList)
posts.Extend(data)
for _, p := range presult.Data.([]*model.Post) {
postList.AddPost(p)
postList.AddOrder(p.Id)
}
}
}
return posts, nil
return postList, nil
} else {
channels := []store.StoreChannel{}
for _, params := range paramsList {
params.OrTerms = isOrSearch
// don't allow users to search for everything
if params.Terms != "*" {
channels = append(channels, Srv.Store.Post().Search(teamId, userId, params))
}
}
posts := model.NewPostList()
for _, channel := range channels {
if result := <-channel; result.Err != nil {
return nil, result.Err
} else {
data := result.Data.(*model.PostList)
posts.Extend(data)
}
}
return posts, nil
}
}
func GetFileInfosForPost(postId string, readFromMaster bool) ([]*model.FileInfo, *model.AppError) {

View File

@@ -114,6 +114,12 @@ func runServer(configFileLocation string) {
einterfaces.GetMetricsInterface().StartServer()
}
if einterfaces.GetElasticSearchInterface() != nil {
if err := einterfaces.GetElasticSearchInterface().Start(); err != nil {
l4g.Error(err.Error())
}
}
// wait for kill signal before attempting to gracefully shutdown
// the running service
c := make(chan os.Signal)

View File

@@ -46,6 +46,14 @@
"EnableUserStatuses": true,
"ClusterLogTimeoutMilliseconds": 2000
},
"ElasticSearchSettings": {
"ConnectionUrl": "http://dockerhost:9200",
"Username": "elastic",
"Password": "changeme",
"EnableIndexing": false,
"EnableSearching": false,
"Sniff": true
},
"TeamSettings": {
"SiteName": "Mattermost",
"MaxUsersPerTeam": 50,

View File

@@ -0,0 +1,23 @@
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package einterfaces
import "github.com/mattermost/platform/model"
type ElasticSearchInterface interface {
Start() *model.AppError
IndexPost(post *model.Post, teamId string)
SearchPosts(channels *model.ChannelList, searchParams []*model.SearchParams) ([]string, *model.AppError)
DeletePost(postId string)
}
var theElasticSearchInterface ElasticSearchInterface
func RegisterElasticSearchInterface(newInterface ElasticSearchInterface) {
theElasticSearchInterface = newInterface
}
func GetElasticSearchInterface() ElasticSearchInterface {
return theElasticSearchInterface
}

View File

@@ -3307,6 +3307,38 @@
"id": "ent.compliance.run_started.info",
"translation": "Compliance export started for job '{{.JobName}}' at '{{.FilePath}}'"
},
{
"id": "ent.elasticsearch.start.connect_failed",
"translation": "Setting up ElasticSearch Client Failed"
},
{
"id": "ent.elasticsearch.start.index_exists_failed",
"translation": "Failed to establish whether ElasticSearch index exists"
},
{
"id": "ent.elasticsearch.start.index_create_failed",
"translation": "Failed to create ElasticSearch index"
},
{
"id": "ent.elasticsearch.start.index_settings_failed",
"translation": "Failed to set ElasticSearch index settings"
},
{
"id": "ent.elasticsearch.start.index_mapping_failed",
"translation": "Failed to setup ElasticSearch index mapping"
},
{
"id": "ent.elasticsearch.search_posts.disabled",
"translation": "ElasticSearch searching is disabled on this server"
},
{
"id": "ent.elasticsearch.search_posts.search_failed",
"translation": "Search failed to complete"
},
{
"id": "ent.elasticsearch.search_posts.unmarshall_post_failed",
"translation": "Failed to decode search results"
},
{
"id": "ent.emoji.licence_disable.app_error",
"translation": "Custom emoji restrictions disabled by current license. Please contact your system administrator about upgrading your enterprise license."
@@ -3859,6 +3891,22 @@
"id": "model.compliance.is_valid.start_end_at.app_error",
"translation": "To must be greater than From"
},
{
"id": "model.config.is_valid.elastic_search.connection_url.app_error",
"translation": "Elastic Search ConnectionUrl setting must be provided when Elastic Search indexing is enabled."
},
{
"id": "model.config.is_valid.elastic_search.username.app_error",
"translation": "Elastic Search Username setting must be provided when Elastic Search indexing is enabled."
},
{
"id": "model.config.is_valid.elastic_search.password.app_error",
"translation": "Elastic Search Password setting must be provided when Elastic Search indexing is enabled."
},
{
"id": "model.config.is_valid.elastic_search.enable_searching.app_error",
"translation": "Elastic Search IndexingEnabled setting must be set to true when Elastic Search SearchEnabled is set to true."
},
{
"id": "model.config.is_valid.cluster_email_batching.app_error",
"translation": "Unable to enable email batching when clustering is enabled."
@@ -5179,6 +5227,10 @@
"id": "store.sql_post.get.app_error",
"translation": "We couldn't get the post"
},
{
"id": "store.sql_post.get_posts_by_ids.app_error",
"translation": "We couldn't get the posts"
},
{
"id": "store.sql_post.get_parents_posts.app_error",
"translation": "We couldn't get the parent post for the channel"

View File

@@ -401,29 +401,39 @@ type WebrtcSettings struct {
TurnSharedKey *string
}
type ElasticSearchSettings struct {
ConnectionUrl *string
Username *string
Password *string
EnableIndexing *bool
EnableSearching *bool
Sniff *bool
}
type Config struct {
ServiceSettings ServiceSettings
TeamSettings TeamSettings
SqlSettings SqlSettings
LogSettings LogSettings
PasswordSettings PasswordSettings
FileSettings FileSettings
EmailSettings EmailSettings
RateLimitSettings RateLimitSettings
PrivacySettings PrivacySettings
SupportSettings SupportSettings
GitLabSettings SSOSettings
GoogleSettings SSOSettings
Office365Settings SSOSettings
LdapSettings LdapSettings
ComplianceSettings ComplianceSettings
LocalizationSettings LocalizationSettings
SamlSettings SamlSettings
NativeAppSettings NativeAppSettings
ClusterSettings ClusterSettings
MetricsSettings MetricsSettings
AnalyticsSettings AnalyticsSettings
WebrtcSettings WebrtcSettings
ServiceSettings ServiceSettings
TeamSettings TeamSettings
SqlSettings SqlSettings
LogSettings LogSettings
PasswordSettings PasswordSettings
FileSettings FileSettings
EmailSettings EmailSettings
RateLimitSettings RateLimitSettings
PrivacySettings PrivacySettings
SupportSettings SupportSettings
GitLabSettings SSOSettings
GoogleSettings SSOSettings
Office365Settings SSOSettings
LdapSettings LdapSettings
ComplianceSettings ComplianceSettings
LocalizationSettings LocalizationSettings
SamlSettings SamlSettings
NativeAppSettings NativeAppSettings
ClusterSettings ClusterSettings
MetricsSettings MetricsSettings
AnalyticsSettings AnalyticsSettings
WebrtcSettings WebrtcSettings
ElasticSearchSettings ElasticSearchSettings
}
func (o *Config) ToJson() string {
@@ -1217,6 +1227,36 @@ func (o *Config) SetDefaults() {
*o.ServiceSettings.ClusterLogTimeoutMilliseconds = 2000
}
if o.ElasticSearchSettings.ConnectionUrl == nil {
o.ElasticSearchSettings.ConnectionUrl = new(string)
*o.ElasticSearchSettings.ConnectionUrl = ""
}
if o.ElasticSearchSettings.Username == nil {
o.ElasticSearchSettings.Username = new(string)
*o.ElasticSearchSettings.Username = ""
}
if o.ElasticSearchSettings.Password == nil {
o.ElasticSearchSettings.Password = new(string)
*o.ElasticSearchSettings.Password = ""
}
if o.ElasticSearchSettings.EnableIndexing == nil {
o.ElasticSearchSettings.EnableIndexing = new(bool)
*o.ElasticSearchSettings.EnableIndexing = false
}
if o.ElasticSearchSettings.EnableSearching == nil {
o.ElasticSearchSettings.EnableSearching = new(bool)
*o.ElasticSearchSettings.EnableSearching = false
}
if o.ElasticSearchSettings.Sniff == nil {
o.ElasticSearchSettings.Sniff = new(bool)
*o.ElasticSearchSettings.Sniff = true
}
o.defaultWebrtcSettings()
}
@@ -1448,6 +1488,16 @@ func (o *Config) IsValid() *AppError {
return NewLocAppError("Config.IsValid", "model.config.is_valid.time_between_user_typing.app_error", nil, "")
}
if *o.ElasticSearchSettings.EnableIndexing {
if len(*o.ElasticSearchSettings.ConnectionUrl) == 0 {
return NewLocAppError("Config.IsValid", "model.config.is_valid.elastic_search.connection_url.app_error", nil, "")
}
}
if *o.ElasticSearchSettings.EnableSearching && !*o.ElasticSearchSettings.EnableIndexing {
return NewLocAppError("Config.IsValid", "model.config.is_valid.elastic_search.enable_searching.app_error", nil, "")
}
return nil
}
@@ -1488,6 +1538,10 @@ func (o *Config) Sanitize() {
for i := range o.SqlSettings.DataSourceSearchReplicas {
o.SqlSettings.DataSourceSearchReplicas[i] = FAKE_SETTING
}
*o.ElasticSearchSettings.ConnectionUrl = FAKE_SETTING
*o.ElasticSearchSettings.Username = FAKE_SETTING
*o.ElasticSearchSettings.Password = FAKE_SETTING
}
func (o *Config) defaultWebrtcSettings() {

View File

@@ -1287,3 +1287,30 @@ func (s SqlPostStore) GetPostsCreatedAt(channelId string, time int64) StoreChann
return storeChannel
}
func (s SqlPostStore) GetPostsByIds(postIds []string) StoreChannel {
storeChannel := make(StoreChannel, 1)
go func() {
result := StoreResult{}
inClause := `'` + strings.Join(postIds, `', '`) + `'`
query := `SELECT * FROM Posts WHERE Id in (` + inClause + `) and DeleteAt = 0 ORDER BY CreateAt DESC`
var posts []*model.Post
_, err := s.GetReplica().Select(&posts, query, map[string]interface{}{})
if err != nil {
l4g.Error(err)
result.Err = model.NewAppError("SqlPostStore.GetPostsCreatedAt", "store.sql_post.get_posts_by_ids.app_error", nil, "", http.StatusInternalServerError)
} else {
result.Data = posts
}
storeChannel <- result
close(storeChannel)
}()
return storeChannel
}

View File

@@ -1550,3 +1550,45 @@ func TestPostStoreOverwrite(t *testing.T) {
t.Fatal("Failed to set FileIds")
}
}
func TestPostStoreGetPostsByIds(t *testing.T) {
Setup()
o1 := &model.Post{}
o1.ChannelId = model.NewId()
o1.UserId = model.NewId()
o1.Message = "a" + model.NewId() + "AAAAAAAAAAA"
o1 = (<-store.Post().Save(o1)).Data.(*model.Post)
o2 := &model.Post{}
o2.ChannelId = o1.ChannelId
o2.UserId = model.NewId()
o2.Message = "a" + model.NewId() + "CCCCCCCCC"
o2 = (<-store.Post().Save(o2)).Data.(*model.Post)
o3 := &model.Post{}
o3.ChannelId = o1.ChannelId
o3.UserId = model.NewId()
o3.Message = "a" + model.NewId() + "QQQQQQQQQQ"
o3 = (<-store.Post().Save(o3)).Data.(*model.Post)
ro1 := (<-store.Post().Get(o1.Id)).Data.(*model.PostList).Posts[o1.Id]
ro2 := (<-store.Post().Get(o2.Id)).Data.(*model.PostList).Posts[o2.Id]
ro3 := (<-store.Post().Get(o3.Id)).Data.(*model.PostList).Posts[o3.Id]
postIds := []string{
ro1.Id,
ro2.Id,
ro3.Id,
}
if ro4 := Must(store.Post().GetPostsByIds(postIds)).([]*model.Post); len(ro4) != 3 {
t.Fatalf("Expected 3 posts in results. Got %v", len(ro4))
}
Must(store.Post().Delete(ro1.Id, model.GetMillis()))
if ro5 := Must(store.Post().GetPostsByIds(postIds)).([]*model.Post); len(ro5) != 2 {
t.Fatalf("Expected 2 posts in results. Got %v", len(ro5))
}
}

View File

@@ -165,6 +165,7 @@ type PostStore interface {
InvalidateLastPostTimeCache(channelId string)
GetPostsCreatedAt(channelId string, time int64) StoreChannel
Overwrite(post *model.Post) StoreChannel
GetPostsByIds(postIds []string) StoreChannel
}
type UserStore interface {

View File

@@ -492,6 +492,11 @@ func getClientConfig(c *model.Config) map[string]string {
props["PasswordRequireNumber"] = strconv.FormatBool(*c.PasswordSettings.Number)
props["PasswordRequireSymbol"] = strconv.FormatBool(*c.PasswordSettings.Symbol)
}
if *License.Features.ElasticSearch {
props["ElasticSearchEnableIndexing"] = strconv.FormatBool(*c.ElasticSearchSettings.EnableIndexing)
props["ElasticSearchEnableSearching"] = strconv.FormatBool(*c.ElasticSearchSettings.EnableSearching)
}
}
return props
@@ -560,6 +565,16 @@ func Desanitize(cfg *model.Config) {
cfg.SqlSettings.AtRestEncryptKey = Cfg.SqlSettings.AtRestEncryptKey
}
if *cfg.ElasticSearchSettings.ConnectionUrl == model.FAKE_SETTING {
*cfg.ElasticSearchSettings.ConnectionUrl = *Cfg.ElasticSearchSettings.ConnectionUrl
}
if *cfg.ElasticSearchSettings.Username == model.FAKE_SETTING {
*cfg.ElasticSearchSettings.Username = *Cfg.ElasticSearchSettings.Username
}
if *cfg.ElasticSearchSettings.Password == model.FAKE_SETTING {
*cfg.ElasticSearchSettings.Password = *Cfg.ElasticSearchSettings.Password
}
for i := range cfg.SqlSettings.DataSourceReplicas {
cfg.SqlSettings.DataSourceReplicas[i] = Cfg.SqlSettings.DataSourceReplicas[i]
}