diff --git a/.gitignore b/.gitignore
index 2db234ffb4..df9700c68c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ imports/imports.go
# Folders
_obj
_test
+.vscode
# Architecture specific extensions/prefixes
[568vq].out
diff --git a/api/admin.go b/api/admin.go
index a6ced71a9b..090ca0550f 100644
--- a/api/admin.go
+++ b/api/admin.go
@@ -10,6 +10,7 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/mux"
"github.com/mattermost/platform/app"
+ "github.com/mattermost/platform/einterfaces"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
"github.com/mssola/user_agent"
@@ -46,7 +47,7 @@ func InitAdmin() {
}
func getLogs(c *Context, w http.ResponseWriter, r *http.Request) {
- lines, err := app.GetLogs(0, 100000)
+ lines, err := app.GetLogs(0, 10000)
if err != nil {
c.Err = err
return
@@ -57,6 +58,11 @@ func getLogs(c *Context, w http.ResponseWriter, r *http.Request) {
func getClusterStatus(c *Context, w http.ResponseWriter, r *http.Request) {
infos := app.GetClusterStatus()
+
+ if einterfaces.GetClusterInterface() != nil {
+ w.Header().Set(model.HEADER_CLUSTER_ID, einterfaces.GetClusterInterface().GetClusterId())
+ }
+
w.Write([]byte(model.ClusterInfosToJson(infos)))
}
@@ -107,7 +113,7 @@ func saveConfig(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- err := app.SaveConfig(cfg)
+ err := app.SaveConfig(cfg, true)
if err != nil {
c.Err = err
return
diff --git a/api/context.go b/api/context.go
index 33dc8b2acf..6d1e758e8d 100644
--- a/api/context.go
+++ b/api/context.go
@@ -150,9 +150,6 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set(model.HEADER_REQUEST_ID, c.RequestId)
w.Header().Set(model.HEADER_VERSION_ID, fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.ClientCfgHash, utils.IsLicensed))
- if einterfaces.GetClusterInterface() != nil {
- w.Header().Set(model.HEADER_CLUSTER_ID, einterfaces.GetClusterInterface().GetClusterId())
- }
// Instruct the browser not to display us in an iframe unless is the same origin for anti-clickjacking
if !h.isApi {
diff --git a/api4/context.go b/api4/context.go
index 8d4ed7f794..7a908c5884 100644
--- a/api4/context.go
+++ b/api4/context.go
@@ -130,9 +130,6 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set(model.HEADER_REQUEST_ID, c.RequestId)
w.Header().Set(model.HEADER_VERSION_ID, fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.ClientCfgHash, utils.IsLicensed))
- if einterfaces.GetClusterInterface() != nil {
- w.Header().Set(model.HEADER_CLUSTER_ID, einterfaces.GetClusterInterface().GetClusterId())
- }
w.Header().Set("Content-Type", "application/json")
diff --git a/api4/system.go b/api4/system.go
index 465f4e71d1..97d8bb7dc9 100644
--- a/api4/system.go
+++ b/api4/system.go
@@ -107,7 +107,7 @@ func updateConfig(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- err := app.SaveConfig(cfg)
+ err := app.SaveConfig(cfg, true)
if err != nil {
c.Err = err
return
diff --git a/app/admin.go b/app/admin.go
index 103c4617b4..4f8125106e 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -19,12 +19,23 @@ import (
)
func GetLogs(page, perPage int) ([]string, *model.AppError) {
- lines, err := GetLogsSkipSend(page, perPage)
+ var lines []string
+ if einterfaces.GetClusterInterface() != nil && *utils.Cfg.ClusterSettings.Enable {
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ lines = append(lines, einterfaces.GetClusterInterface().GetClusterId())
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ }
+
+ melines, err := GetLogsSkipSend(page, perPage)
if err != nil {
return nil, err
}
- if einterfaces.GetClusterInterface() != nil {
+ lines = append(lines, melines...)
+
+ if einterfaces.GetClusterInterface() != nil && *utils.Cfg.ClusterSettings.Enable {
clines, err := einterfaces.GetClusterInterface().GetLogs(page, perPage)
if err != nil {
return nil, err
@@ -84,10 +95,14 @@ func InvalidateAllCaches() *model.AppError {
InvalidateAllCachesSkipSend()
if einterfaces.GetClusterInterface() != nil {
- err := einterfaces.GetClusterInterface().InvalidateAllCaches()
- if err != nil {
- return err
+
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_ALL_CACHES,
+ SendType: model.CLUSTER_SEND_RELIABLE,
+ WaitForAllToSend: true,
}
+
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
return nil
@@ -120,7 +135,8 @@ func ReloadConfig() {
InitEmailBatching()
}
-func SaveConfig(cfg *model.Config) *model.AppError {
+func SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError {
+ oldCfg := utils.Cfg
cfg.SetDefaults()
utils.Desanitize(cfg)
@@ -132,7 +148,7 @@ func SaveConfig(cfg *model.Config) *model.AppError {
return err
}
- if *utils.Cfg.ClusterSettings.Enable {
+ if *utils.Cfg.ClusterSettings.Enable && *utils.Cfg.ClusterSettings.ReadOnlyConfig {
return model.NewLocAppError("saveConfig", "ent.cluster.save_config.error", nil, "")
}
@@ -149,14 +165,12 @@ func SaveConfig(cfg *model.Config) *model.AppError {
}
}
- // oldCfg := utils.Cfg
- // Future feature is to sync the configuration files
- // if einterfaces.GetClusterInterface() != nil {
- // err := einterfaces.GetClusterInterface().ConfigChanged(cfg, oldCfg, true)
- // if err != nil {
- // return err
- // }
- // }
+ if einterfaces.GetClusterInterface() != nil {
+ err := einterfaces.GetClusterInterface().ConfigChanged(cfg, oldCfg, sendConfigChangeClusterMessage)
+ if err != nil {
+ return err
+ }
+ }
// start/restart email batching job if necessary
InitEmailBatching()
diff --git a/app/cluster_discovery.go b/app/cluster_discovery.go
new file mode 100644
index 0000000000..6584418f15
--- /dev/null
+++ b/app/cluster_discovery.go
@@ -0,0 +1,77 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "fmt"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+const (
+ DISCOVERY_SERVICE_WRITE_PING = 60 * time.Second
+)
+
+type ClusterDiscoveryService struct {
+ model.ClusterDiscovery
+ stop chan bool
+}
+
+func NewClusterDiscoveryService() *ClusterDiscoveryService {
+ ds := &ClusterDiscoveryService{
+ ClusterDiscovery: model.ClusterDiscovery{},
+ stop: make(chan bool),
+ }
+
+ return ds
+}
+
+func (me *ClusterDiscoveryService) Start() {
+
+ <-Srv.Store.ClusterDiscovery().Cleanup()
+
+ if cresult := <-Srv.Store.ClusterDiscovery().Exists(&me.ClusterDiscovery); cresult.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to check if row exists for %v with err=%v", me.ClusterDiscovery.ToJson(), cresult.Err))
+ } else {
+ if cresult.Data.(bool) {
+ if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to start clean for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err))
+ }
+ }
+ }
+
+ if result := <-Srv.Store.ClusterDiscovery().Save(&me.ClusterDiscovery); result.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to save for %v with err=%v", me.ClusterDiscovery.ToJson(), result.Err))
+ return
+ }
+
+ go func() {
+ l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer started for %v", me.ClusterDiscovery.ToJson()))
+ ticker := time.NewTicker(DISCOVERY_SERVICE_WRITE_PING)
+ defer func() {
+ ticker.Stop()
+ if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to cleanup for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err))
+ }
+ l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer stopped for %v", me.ClusterDiscovery.ToJson()))
+ }()
+
+ for {
+ select {
+ case <-ticker.C:
+ if u := <-Srv.Store.ClusterDiscovery().SetLastPingAt(&me.ClusterDiscovery); u.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to write ping for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err))
+ }
+ case <-me.stop:
+ return
+ }
+ }
+ }()
+}
+
+func (me *ClusterDiscoveryService) Stop() {
+ me.stop <- true
+}
diff --git a/app/cluster_discovery_test.go b/app/cluster_discovery_test.go
new file mode 100644
index 0000000000..ca5b1bfa45
--- /dev/null
+++ b/app/cluster_discovery_test.go
@@ -0,0 +1,27 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "testing"
+
+ "time"
+
+ "github.com/mattermost/platform/model"
+)
+
+func TestClusterDiscoveryService(t *testing.T) {
+ Setup()
+
+ ds := NewClusterDiscoveryService()
+ ds.Type = model.CDS_TYPE_APP
+ ds.ClusterName = "ClusterA"
+ ds.AutoFillHostname()
+
+ ds.Start()
+ time.Sleep(2 * time.Second)
+
+ ds.Stop()
+ time.Sleep(2 * time.Second)
+}
diff --git a/app/cluster_handlers.go b/app/cluster_handlers.go
new file mode 100644
index 0000000000..d15bb851af
--- /dev/null
+++ b/app/cluster_handlers.go
@@ -0,0 +1,77 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "strings"
+
+ "github.com/mattermost/platform/einterfaces"
+ "github.com/mattermost/platform/model"
+)
+
+func RegisterAllClusterMessageHandlers() {
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_PUBLISH, ClusterPublishHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_UPDATE_STATUS, ClusterUpdateStatusHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_ALL_CACHES, ClusterInvalidateAllCachesHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, ClusterInvalidateCacheForReactionsHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK, ClusterInvalidateCacheForWebhookHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS, ClusterInvalidateCacheForChannelPostsHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS, ClusterInvalidateCacheForChannelMembersNotifyPropHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS, ClusterInvalidateCacheForChannelMembersHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, ClusterInvalidateCacheForChannelByNameHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, ClusterInvalidateCacheForChannelHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, ClusterInvalidateCacheForUserHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler)
+
+}
+
+func ClusterPublishHandler(msg *model.ClusterMessage) {
+ event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
+ PublishSkipClusterSend(event)
+}
+
+func ClusterUpdateStatusHandler(msg *model.ClusterMessage) {
+ status := model.StatusFromJson(strings.NewReader(msg.Data))
+ AddStatusCacheSkipClusterSend(status)
+}
+
+func ClusterInvalidateAllCachesHandler(msg *model.ClusterMessage) {
+ InvalidateAllCachesSkipSend()
+}
+
+func ClusterInvalidateCacheForReactionsHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForReactionsSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForWebhookHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForWebhookSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelPostsHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForWebhookSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelMembersNotifyPropHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelMembersHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelMembersSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelByNameHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelByNameSkipClusterSend(msg.Props["id"], msg.Props["name"])
+}
+
+func ClusterInvalidateCacheForChannelHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForUserSkipClusterSend(msg.Data)
+}
+
+func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
+ ClearSessionCacheForUserSkipClusterSend(msg.Data)
+}
diff --git a/app/session.go b/app/session.go
index 7290bfd88f..4b1ea18f2a 100644
--- a/app/session.go
+++ b/app/session.go
@@ -101,7 +101,12 @@ func ClearSessionCacheForUser(userId string) {
ClearSessionCacheForUserSkipClusterSend(userId)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().ClearSessionCacheForUser(userId)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: userId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
diff --git a/app/status.go b/app/status.go
index 868e57563c..9f6ad2e051 100644
--- a/app/status.go
+++ b/app/status.go
@@ -26,7 +26,12 @@ func AddStatusCache(status *model.Status) {
AddStatusCacheSkipClusterSend(status)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().UpdateStatus(status)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_UPDATE_STATUS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: status.ToJson(),
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
diff --git a/app/web_hub.go b/app/web_hub.go
index 6b61430dc1..cadad0de42 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -64,10 +64,11 @@ func TotalWebsocketConnections() int {
}
func HubStart() {
- l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2)
-
// Total number of hubs is twice the number of CPUs.
- hubs = make([]*Hub, runtime.NumCPU()*2)
+ numberOfHubs := runtime.NumCPU() * 2
+ l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
+
+ hubs = make([]*Hub, numberOfHubs)
for i := 0; i < len(hubs); i++ {
hubs[i] = NewWebHub()
@@ -142,17 +143,28 @@ func HubUnregister(webConn *WebConn) {
}
func Publish(message *model.WebSocketEvent) {
-
if metrics := einterfaces.GetMetricsInterface(); metrics != nil {
metrics.IncrementWebsocketEvent(message.Event)
}
- for _, hub := range hubs {
- hub.Broadcast(message)
- }
+ PublishSkipClusterSend(message)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().Publish(message)
+ cm := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_PUBLISH,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: message.ToJson(),
+ }
+
+ if message.Event == model.WEBSOCKET_EVENT_POSTED ||
+ message.Event == model.WEBSOCKET_EVENT_POST_EDITED ||
+ message.Event == model.WEBSOCKET_EVENT_DIRECT_ADDED ||
+ message.Event == model.WEBSOCKET_EVENT_GROUP_ADDED ||
+ message.Event == model.WEBSOCKET_EVENT_ADDED_TO_TEAM {
+ cm.SendType = model.CLUSTER_SEND_RELIABLE
+ }
+
+ einterfaces.GetClusterInterface().SendClusterMessage(cm)
}
}
@@ -167,16 +179,28 @@ func InvalidateCacheForChannel(channel *model.Channel) {
InvalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name)
if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannel(channel.Id)
- cluster.InvalidateCacheForChannelByName(channel.TeamId, channel.Name)
- }
-}
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channel.Id,
+ }
-func InvalidateCacheForChannelMembers(channelId string) {
- InvalidateCacheForChannelMembersSkipClusterSend(channelId)
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelMembers(channelId)
+ nameMsg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Props: make(map[string]string),
+ }
+
+ nameMsg.Props["name"] = channel.Name
+ if channel.TeamId == "" {
+ nameMsg.Props["id"] = "dm"
+ } else {
+ nameMsg.Props["id"] = channel.TeamId
+ }
+
+ einterfaces.GetClusterInterface().SendClusterMessage(nameMsg)
}
}
@@ -184,6 +208,19 @@ func InvalidateCacheForChannelSkipClusterSend(channelId string) {
Srv.Store.Channel().InvalidateChannel(channelId)
}
+func InvalidateCacheForChannelMembers(channelId string) {
+ InvalidateCacheForChannelMembersSkipClusterSend(channelId)
+
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
+ }
+}
+
func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
Srv.Store.User().InvalidateProfilesInChannelCache(channelId)
Srv.Store.Channel().InvalidateMemberCount(channelId)
@@ -192,8 +229,13 @@ func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
func InvalidateCacheForChannelMembersNotifyProps(channelId string) {
InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelMembersNotifyProps(channelId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -202,14 +244,23 @@ func InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId string
}
func InvalidateCacheForChannelByNameSkipClusterSend(teamId, name string) {
+ if teamId == "" {
+ teamId = "dm"
+ }
+
Srv.Store.Channel().InvalidateChannelByName(teamId, name)
}
func InvalidateCacheForChannelPosts(channelId string) {
InvalidateCacheForChannelPostsSkipClusterSend(channelId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelPosts(channelId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -221,7 +272,12 @@ func InvalidateCacheForUser(userId string) {
InvalidateCacheForUserSkipClusterSend(userId)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().InvalidateCacheForUser(userId)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: userId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -238,8 +294,13 @@ func InvalidateCacheForUserSkipClusterSend(userId string) {
func InvalidateCacheForWebhook(webhookId string) {
InvalidateCacheForWebhookSkipClusterSend(webhookId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForWebhook(webhookId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: webhookId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -256,8 +317,13 @@ func InvalidateWebConnSessionCacheForUser(userId string) {
func InvalidateCacheForReactions(postId string) {
InvalidateCacheForReactionsSkipClusterSend(postId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForReactions(postId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: postId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
diff --git a/cmd/platform/server.go b/cmd/platform/server.go
index ba7ace0628..2eedbd54a9 100644
--- a/cmd/platform/server.go
+++ b/cmd/platform/server.go
@@ -112,6 +112,7 @@ func runServer(configFileLocation string) {
}
if einterfaces.GetClusterInterface() != nil {
+ app.RegisterAllClusterMessageHandlers()
einterfaces.GetClusterInterface().StartInterNodeCommunication()
}
diff --git a/config/config.json b/config/config.json
index 31b1b1a458..85e07bdce3 100644
--- a/config/config.json
+++ b/config/config.json
@@ -263,8 +263,13 @@
},
"ClusterSettings": {
"Enable": false,
- "InterNodeListenAddress": ":8075",
- "InterNodeUrls": []
+ "ClusterName": "",
+ "OverrideHostname": "",
+ "UseIpAddress": true,
+ "UseExperimentalGossip": false,
+ "ReadOnlyConfig": true,
+ "GossipPort": 8074,
+ "StreamingPort": 8075
},
"MetricsSettings": {
"Enable": false,
diff --git a/einterfaces/cluster.go b/einterfaces/cluster.go
index ba780f8b16..096a775fe4 100644
--- a/einterfaces/cluster.go
+++ b/einterfaces/cluster.go
@@ -7,26 +7,19 @@ import (
"github.com/mattermost/platform/model"
)
+type ClusterMessageHandler func(msg *model.ClusterMessage)
+
type ClusterInterface interface {
StartInterNodeCommunication()
StopInterNodeCommunication()
- GetClusterInfos() []*model.ClusterInfo
- GetClusterStats() ([]*model.ClusterStats, *model.AppError)
- ClearSessionCacheForUser(userId string)
- InvalidateCacheForUser(userId string)
- InvalidateCacheForChannel(channelId string)
- InvalidateCacheForChannelByName(teamId, name string)
- InvalidateCacheForChannelMembers(channelId string)
- InvalidateCacheForChannelMembersNotifyProps(channelId string)
- InvalidateCacheForChannelPosts(channelId string)
- InvalidateCacheForWebhook(webhookId string)
- InvalidateCacheForReactions(postId string)
- Publish(event *model.WebSocketEvent)
- UpdateStatus(status *model.Status)
- GetLogs(page, perPage int) ([]string, *model.AppError)
+ RegisterClusterMessageHandler(event string, crm ClusterMessageHandler)
GetClusterId() string
+ GetClusterInfos() []*model.ClusterInfo
+ SendClusterMessage(cluster *model.ClusterMessage)
+ NotifyMsg(buf []byte)
+ GetClusterStats() ([]*model.ClusterStats, *model.AppError)
+ GetLogs(page, perPage int) ([]string, *model.AppError)
ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError
- InvalidateAllCaches() *model.AppError
}
var theClusterInterface ClusterInterface
diff --git a/i18n/en.json b/i18n/en.json
index 188b5526b6..3fec5d17f3 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3261,7 +3261,7 @@
},
{
"id": "ent.cluster.config_changed.info",
- "translation": "Cluster configuration has changed for id=%v. Attempting to restart cluster service. To ensure the cluster is configured correctly you should not rely on this restart because we detected a core configuration change."
+ "translation": "Cluster configuration has changed for id={{ .id }}. The cluster may become unstable and a restart is required. To ensure the cluster is configured correctly you should perform a rolling restart immediately."
},
{
"id": "ent.cluster.debug_fail.debug",
@@ -3293,7 +3293,7 @@
},
{
"id": "ent.cluster.save_config.error",
- "translation": "System Console is set to read-only when High Availability is enabled."
+ "translation": "System Console is set to read-only when High Availability is enabled unless ReadOnlyConfig is disabled in the configuration file."
},
{
"id": "ent.cluster.starting.info",
@@ -4711,14 +4711,6 @@
"id": "store.sql.open_conn.panic",
"translation": "Failed to open SQL connection %v"
},
- {
- "id": "store.sql.ping.critical",
- "translation": "Failed to ping DB err:%v"
- },
- {
- "id": "store.sql.pinging.info",
- "translation": "Pinging SQL %v database"
- },
{
"id": "store.sql.read_replicas_not_licensed.critical",
"translation": "More than 1 read replica functionality disabled by current license. Please contact your system administrator about upgrading your enterprise license."
diff --git a/model/cluster_discovery.go b/model/cluster_discovery.go
new file mode 100644
index 0000000000..4b92696566
--- /dev/null
+++ b/model/cluster_discovery.go
@@ -0,0 +1,132 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "encoding/json"
+ "io"
+ "os"
+)
+
+const (
+ CDS_OFFLINE_AFTER_MILLIS = 1000 * 60 * 30 // 30 minutes
+ CDS_TYPE_APP = "mattermost_app"
+)
+
+type ClusterDiscovery struct {
+ Id string `json:"id"`
+ Type string `json:"type"`
+ ClusterName string `json:"cluster_name"`
+ Hostname string `json:"hostname"`
+ GossipPort int32 `json:"gossip_port"`
+ Port int32 `json:"port"`
+ CreateAt int64 `json:"create_at"`
+ LastPingAt int64 `json:"last_ping_at"`
+}
+
+func (o *ClusterDiscovery) PreSave() {
+ if o.Id == "" {
+ o.Id = NewId()
+ }
+
+ if o.CreateAt == 0 {
+ o.CreateAt = GetMillis()
+ o.LastPingAt = o.CreateAt
+ }
+}
+
+func (o *ClusterDiscovery) AutoFillHostname() {
+ // attempt to set the hostname from the OS
+ if len(o.Hostname) == 0 {
+ if hn, err := os.Hostname(); err == nil {
+ o.Hostname = hn
+ }
+ }
+}
+
+func (o *ClusterDiscovery) AutoFillIpAddress() {
+ // attempt to set the hostname to the first non-local IP address
+ if len(o.Hostname) == 0 {
+ o.Hostname = GetServerIpAddress()
+ }
+}
+
+func (o *ClusterDiscovery) IsEqual(in *ClusterDiscovery) bool {
+ if in == nil {
+ return false
+ }
+
+ if o.Type != in.Type {
+ return false
+ }
+
+ if o.ClusterName != in.ClusterName {
+ return false
+ }
+
+ if o.Hostname != in.Hostname {
+ return false
+ }
+
+ return true
+}
+
+func FilterClusterDiscovery(vs []*ClusterDiscovery, f func(*ClusterDiscovery) bool) []*ClusterDiscovery {
+ copy := make([]*ClusterDiscovery, 0)
+ for _, v := range vs {
+ if f(v) {
+ copy = append(copy, v)
+ }
+ }
+
+ return copy
+}
+
+func (o *ClusterDiscovery) IsValid() *AppError {
+ if len(o.Id) != 26 {
+ return NewLocAppError("Channel.IsValid", "model.channel.is_valid.id.app_error", nil, "")
+ }
+
+ if len(o.ClusterName) == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "ClusterName must be set", nil, "")
+ }
+
+ if len(o.Type) == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "Type must be set", nil, "")
+ }
+
+ if len(o.Hostname) == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "Hostname must be set", nil, "")
+ }
+
+ if o.CreateAt == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "CreateAt must be set", nil, "")
+ }
+
+ if o.LastPingAt == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "LastPingAt must be set", nil, "")
+ }
+
+ return nil
+}
+
+func (o *ClusterDiscovery) ToJson() string {
+ b, err := json.Marshal(o)
+ if err != nil {
+ return ""
+ }
+
+ return string(b)
+}
+
+func ClusterDiscoveryFromJson(data io.Reader) *ClusterDiscovery {
+ decoder := json.NewDecoder(data)
+ var me ClusterDiscovery
+ err := decoder.Decode(&me)
+ if err == nil {
+ return &me
+ }
+
+ return nil
+}
diff --git a/model/cluster_discovery_test.go b/model/cluster_discovery_test.go
new file mode 100644
index 0000000000..bfbdbd303d
--- /dev/null
+++ b/model/cluster_discovery_test.go
@@ -0,0 +1,59 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestClusterDiscovery(t *testing.T) {
+ o := ClusterDiscovery{
+ Type: "test_type",
+ ClusterName: "cluster_name",
+ Hostname: "test_hostname",
+ }
+
+ json := o.ToJson()
+ result1 := ClusterDiscoveryFromJson(strings.NewReader(json))
+
+ if result1.ClusterName != "cluster_name" {
+ t.Fatal("should be set")
+ }
+
+ result2 := ClusterDiscoveryFromJson(strings.NewReader(json))
+ result3 := ClusterDiscoveryFromJson(strings.NewReader(json))
+
+ o.Id = "0"
+ result1.Id = "1"
+ result2.Id = "2"
+ result3.Id = "3"
+ result3.Hostname = "something_diff"
+
+ if !o.IsEqual(result1) {
+ t.Fatal("Should be equal")
+ }
+
+ list := make([]*ClusterDiscovery, 0)
+ list = append(list, &o)
+ list = append(list, result1)
+ list = append(list, result2)
+ list = append(list, result3)
+
+ rlist := FilterClusterDiscovery(list, func(in *ClusterDiscovery) bool {
+ return !o.IsEqual(in)
+ })
+
+ if len(rlist) != 1 {
+ t.Fatal("should only have 1 result")
+ }
+
+ o.AutoFillHostname()
+ o.Hostname = ""
+ o.AutoFillHostname()
+
+ o.AutoFillIpAddress()
+ o.Hostname = ""
+ o.AutoFillIpAddress()
+}
diff --git a/model/cluster_info.go b/model/cluster_info.go
index f76a03c0bc..1e468044e0 100644
--- a/model/cluster_info.go
+++ b/model/cluster_info.go
@@ -7,24 +7,16 @@ import (
"encoding/json"
"io"
"strings"
- "sync"
- "sync/atomic"
)
type ClusterInfo struct {
- Id string `json:"id"`
- Version string `json:"version"`
- ConfigHash string `json:"config_hash"`
- InterNodeUrl string `json:"internode_url"`
- Hostname string `json:"hostname"`
- LastSuccessfulPing int64 `json:"last_ping"`
- Alive int32 `json:"is_alive"`
- Mutex sync.RWMutex `json:"-"`
+ Version string `json:"version"`
+ ConfigHash string `json:"config_hash"`
+ IpAddress string `json:"ipaddress"`
+ Hostname string `json:"hostname"`
}
func (me *ClusterInfo) ToJson() string {
- me.Mutex.RLock()
- defer me.Mutex.RUnlock()
b, err := json.Marshal(me)
if err != nil {
return ""
@@ -41,7 +33,6 @@ func (me *ClusterInfo) Copy() *ClusterInfo {
func ClusterInfoFromJson(data io.Reader) *ClusterInfo {
decoder := json.NewDecoder(data)
var me ClusterInfo
- me.Mutex = sync.RWMutex{}
err := decoder.Decode(&me)
if err == nil {
return &me
@@ -50,38 +41,6 @@ func ClusterInfoFromJson(data io.Reader) *ClusterInfo {
}
}
-func (me *ClusterInfo) SetAlive(alive bool) {
- if alive {
- atomic.StoreInt32(&me.Alive, 1)
- } else {
- atomic.StoreInt32(&me.Alive, 0)
- }
-}
-
-func (me *ClusterInfo) IsAlive() bool {
- return atomic.LoadInt32(&me.Alive) == 1
-}
-
-func (me *ClusterInfo) HaveEstablishedInitialContact() bool {
- me.Mutex.RLock()
- defer me.Mutex.RUnlock()
- if me.Id != "" {
- return true
- }
-
- return false
-}
-
-func (me *ClusterInfo) IdEqualTo(in string) bool {
- me.Mutex.RLock()
- defer me.Mutex.RUnlock()
- if me.Id == in {
- return true
- }
-
- return false
-}
-
func ClusterInfosToJson(objmap []*ClusterInfo) string {
if b, err := json.Marshal(objmap); err != nil {
return ""
diff --git a/model/cluster_info_test.go b/model/cluster_info_test.go
index 038927120f..c019df40ae 100644
--- a/model/cluster_info_test.go
+++ b/model/cluster_info_test.go
@@ -9,33 +9,23 @@ import (
)
func TestClusterInfoJson(t *testing.T) {
- cluster := ClusterInfo{Id: NewId(), InterNodeUrl: NewId(), Hostname: NewId()}
+ cluster := ClusterInfo{IpAddress: NewId(), Hostname: NewId()}
json := cluster.ToJson()
result := ClusterInfoFromJson(strings.NewReader(json))
- if cluster.Id != result.Id {
+ if cluster.IpAddress != result.IpAddress {
t.Fatal("Ids do not match")
}
-
- cluster.SetAlive(true)
- if !cluster.IsAlive() {
- t.Fatal("should be live")
- }
-
- cluster.SetAlive(false)
- if cluster.IsAlive() {
- t.Fatal("should be not live")
- }
}
func TestClusterInfosJson(t *testing.T) {
- cluster := ClusterInfo{Id: NewId(), InterNodeUrl: NewId(), Hostname: NewId()}
+ cluster := ClusterInfo{IpAddress: NewId(), Hostname: NewId()}
clusterInfos := make([]*ClusterInfo, 1)
clusterInfos[0] = &cluster
json := ClusterInfosToJson(clusterInfos)
result := ClusterInfosFromJson(strings.NewReader(json))
- if clusterInfos[0].Id != result[0].Id {
+ if clusterInfos[0].IpAddress != result[0].IpAddress {
t.Fatal("Ids do not match")
}
diff --git a/model/cluster_message.go b/model/cluster_message.go
new file mode 100644
index 0000000000..a6dec2e7fb
--- /dev/null
+++ b/model/cluster_message.go
@@ -0,0 +1,55 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "encoding/json"
+ "io"
+)
+
+const (
+ CLUSTER_EVENT_PUBLISH = "publish"
+ CLUSTER_EVENT_UPDATE_STATUS = "update_status"
+ CLUSTER_EVENT_INVALIDATE_ALL_CACHES = "inv_all_caches"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS = "inv_reactions"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK = "inv_webhook"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS = "inv_channel_posts"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS = "inv_channel_members_notify_props"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS = "inv_channel_members"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME = "inv_channel_name"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL = "inv_channel"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER = "inv_user"
+ CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER = "clear_session_user"
+
+ CLUSTER_SEND_BEST_EFFORT = "best_effort"
+ CLUSTER_SEND_RELIABLE = "reliable"
+)
+
+type ClusterMessage struct {
+ Event string `json:"event"`
+ SendType string `json:"-"`
+ WaitForAllToSend bool `json:"-"`
+ Data string `json:"data,omitempty"`
+ Props map[string]string `json:"props,omitempty"`
+}
+
+func (o *ClusterMessage) ToJson() string {
+ b, err := json.Marshal(o)
+ if err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func ClusterMessageFromJson(data io.Reader) *ClusterMessage {
+ decoder := json.NewDecoder(data)
+ var o ClusterMessage
+ err := decoder.Decode(&o)
+ if err == nil {
+ return &o
+ } else {
+ return nil
+ }
+}
diff --git a/model/cluster_message_test.go b/model/cluster_message_test.go
new file mode 100644
index 0000000000..38603e5778
--- /dev/null
+++ b/model/cluster_message_test.go
@@ -0,0 +1,28 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestClusterMessage(t *testing.T) {
+ m := ClusterMessage{
+ Event: CLUSTER_EVENT_PUBLISH,
+ SendType: CLUSTER_SEND_BEST_EFFORT,
+ Data: "hello",
+ }
+ json := m.ToJson()
+ result := ClusterMessageFromJson(strings.NewReader(json))
+
+ if result.Data != "hello" {
+ t.Fatal()
+ }
+
+ badresult := ClusterMessageFromJson(strings.NewReader("junk"))
+ if badresult != nil {
+ t.Fatal("should not have parsed")
+ }
+}
diff --git a/model/config.go b/model/config.go
index 4e3a3f7cc0..f2b17bcedd 100644
--- a/model/config.go
+++ b/model/config.go
@@ -163,9 +163,14 @@ type ServiceSettings struct {
}
type ClusterSettings struct {
- Enable *bool
- InterNodeListenAddress *string
- InterNodeUrls []string
+ Enable *bool
+ ClusterName *string
+ OverrideHostname *string
+ UseIpAddress *bool
+ UseExperimentalGossip *bool
+ ReadOnlyConfig *bool
+ GossipPort *int
+ StreamingPort *int
}
type MetricsSettings struct {
@@ -1036,18 +1041,44 @@ func (o *Config) SetDefaults() {
*o.ServiceSettings.PostEditTimeLimit = 300
}
- if o.ClusterSettings.InterNodeListenAddress == nil {
- o.ClusterSettings.InterNodeListenAddress = new(string)
- *o.ClusterSettings.InterNodeListenAddress = ":8075"
- }
-
if o.ClusterSettings.Enable == nil {
o.ClusterSettings.Enable = new(bool)
*o.ClusterSettings.Enable = false
}
- if o.ClusterSettings.InterNodeUrls == nil {
- o.ClusterSettings.InterNodeUrls = []string{}
+ if o.ClusterSettings.ClusterName == nil {
+ o.ClusterSettings.ClusterName = new(string)
+ *o.ClusterSettings.ClusterName = ""
+ }
+
+ if o.ClusterSettings.OverrideHostname == nil {
+ o.ClusterSettings.OverrideHostname = new(string)
+ *o.ClusterSettings.OverrideHostname = ""
+ }
+
+ if o.ClusterSettings.UseIpAddress == nil {
+ o.ClusterSettings.UseIpAddress = new(bool)
+ *o.ClusterSettings.UseIpAddress = true
+ }
+
+ if o.ClusterSettings.UseExperimentalGossip == nil {
+ o.ClusterSettings.UseExperimentalGossip = new(bool)
+ *o.ClusterSettings.UseExperimentalGossip = false
+ }
+
+ if o.ClusterSettings.ReadOnlyConfig == nil {
+ o.ClusterSettings.ReadOnlyConfig = new(bool)
+ *o.ClusterSettings.ReadOnlyConfig = true
+ }
+
+ if o.ClusterSettings.GossipPort == nil {
+ o.ClusterSettings.GossipPort = new(int)
+ *o.ClusterSettings.GossipPort = 8074
+ }
+
+ if o.ClusterSettings.StreamingPort == nil {
+ o.ClusterSettings.StreamingPort = new(int)
+ *o.ClusterSettings.StreamingPort = 8075
}
if o.MetricsSettings.ListenAddress == nil {
diff --git a/model/utils.go b/model/utils.go
index d245406832..e7d8bfdacd 100644
--- a/model/utils.go
+++ b/model/utils.go
@@ -17,6 +17,8 @@ import (
"strings"
"time"
+ "net"
+
goi18n "github.com/nicksnyder/go-i18n/i18n"
"github.com/pborman/uuid"
)
@@ -264,6 +266,23 @@ func StringFromJson(data io.Reader) string {
}
}
+func GetServerIpAddress() string {
+ if addrs, err := net.InterfaceAddrs(); err != nil {
+ return ""
+ } else {
+ for _, addr := range addrs {
+
+ if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
+ if ip.IP.To4() != nil {
+ return ip.IP.String()
+ }
+ }
+ }
+ }
+
+ return ""
+}
+
func IsLower(s string) bool {
if strings.ToLower(s) == s {
return true
diff --git a/model/utils_test.go b/model/utils_test.go
index 94ee55aa91..bc2aa6ce71 100644
--- a/model/utils_test.go
+++ b/model/utils_test.go
@@ -193,6 +193,12 @@ func TestIsValidAlphaNum(t *testing.T) {
}
}
+func TestGetServerIpAddress(t *testing.T) {
+ if len(GetServerIpAddress()) == 0 {
+ t.Fatal("Should find local ip address")
+ }
+}
+
func TestIsValidAlphaNumHyphenUnderscore(t *testing.T) {
casesWithFormat := []struct {
Input string
diff --git a/store/sql_cluster_discovery_store.go b/store/sql_cluster_discovery_store.go
new file mode 100644
index 0000000000..81d3d6e11b
--- /dev/null
+++ b/store/sql_cluster_discovery_store.go
@@ -0,0 +1,226 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "github.com/mattermost/platform/model"
+)
+
+type sqlClusterDiscoveryStore struct {
+ *SqlStore
+}
+
+func NewSqlClusterDiscoveryStore(sqlStore *SqlStore) ClusterDiscoveryStore {
+ s := &sqlClusterDiscoveryStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.ClusterDiscovery{}, "ClusterDiscovery").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(64)
+ table.ColMap("ClusterName").SetMaxSize(64)
+ table.ColMap("Hostname").SetMaxSize(512)
+ }
+
+ return s
+}
+
+func (s sqlClusterDiscoveryStore) Save(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ ClusterDiscovery.PreSave()
+ if result.Err = ClusterDiscovery.IsValid(); result.Err != nil {
+ storeChannel <- result
+ close(storeChannel)
+ return
+ }
+
+ if err := s.GetMaster().Insert(ClusterDiscovery); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Save", "Failed to save ClusterDiscovery row", nil, err.Error())
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Delete(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ result.Data = false
+
+ if count, err := s.GetMaster().SelectInt(
+ `
+ DELETE
+ FROM
+ ClusterDiscovery
+ WHERE
+ Type = :Type
+ AND ClusterName = :ClusterName
+ AND Hostname = :Hostname
+ `,
+ map[string]interface{}{
+ "Type": ClusterDiscovery.Type,
+ "ClusterName": ClusterDiscovery.ClusterName,
+ "Hostname": ClusterDiscovery.Hostname,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Delete", "Failed to delete", nil, err.Error())
+ } else {
+ if count > 0 {
+ result.Data = true
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Exists(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ result.Data = false
+
+ if count, err := s.GetMaster().SelectInt(
+ `
+ SELECT
+ COUNT(*)
+ FROM
+ ClusterDiscovery
+ WHERE
+ Type = :Type
+ AND ClusterName = :ClusterName
+ AND Hostname = :Hostname
+ `,
+ map[string]interface{}{
+ "Type": ClusterDiscovery.Type,
+ "ClusterName": ClusterDiscovery.ClusterName,
+ "Hostname": ClusterDiscovery.Hostname,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Exists", "Failed to check if it exists", nil, err.Error())
+ } else {
+ if count > 0 {
+ result.Data = true
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) GetAll(ClusterDiscoveryType, clusterName string) StoreChannel {
+
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ lastPingAt := model.GetMillis() - model.CDS_OFFLINE_AFTER_MILLIS
+
+ var list []*model.ClusterDiscovery
+ if _, err := s.GetMaster().Select(
+ &list,
+ `
+ SELECT
+ *
+ FROM
+ ClusterDiscovery
+ WHERE
+ Type = :ClusterDiscoveryType
+ AND ClusterName = :ClusterName
+ AND LastPingAt > :LastPingAt
+ `,
+ map[string]interface{}{
+ "ClusterDiscoveryType": ClusterDiscoveryType,
+ "ClusterName": clusterName,
+ "LastPingAt": lastPingAt,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.GetAllForType", "Failed to get all disoery rows", nil, err.Error())
+ } else {
+ result.Data = list
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) SetLastPingAt(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := s.GetMaster().Exec(
+ `
+ UPDATE ClusterDiscovery
+ SET
+ LastPingAt = :LastPingAt
+ WHERE
+ Type = :Type
+ AND ClusterName = :ClusterName
+ AND Hostname = :Hostname
+ `,
+ map[string]interface{}{
+ "LastPingAt": model.GetMillis(),
+ "Type": ClusterDiscovery.Type,
+ "ClusterName": ClusterDiscovery.ClusterName,
+ "Hostname": ClusterDiscovery.Hostname,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.GetAllForType", "Failed to update last ping at", nil, err.Error())
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Cleanup() StoreChannel {
+
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := s.GetMaster().Exec(
+ `
+ DELETE FROM ClusterDiscovery
+ WHERE
+ LastPingAt < :LastPingAt
+ `,
+ map[string]interface{}{
+ "LastPingAt": model.GetMillis() - model.CDS_OFFLINE_AFTER_MILLIS,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Save", "Failed to save ClusterDiscovery row", nil, err.Error())
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_cluster_discovery_store_test.go b/store/sql_cluster_discovery_store_test.go
new file mode 100644
index 0000000000..159d3b4db5
--- /dev/null
+++ b/store/sql_cluster_discovery_store_test.go
@@ -0,0 +1,201 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "time"
+
+ "github.com/mattermost/platform/model"
+)
+
+func TestSqlClusterDiscoveryStore(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test",
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().Cleanup(); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
+
+func TestSqlClusterDiscoveryStoreDelete(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test",
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().Delete(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
+
+func TestSqlClusterDiscoveryStoreLastPing(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name_lastPing",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test_lastPing" + model.NewId(),
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().SetLastPingAt(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ ttime := model.GetMillis()
+
+ time.Sleep(1 * time.Second)
+
+ if result := <-store.ClusterDiscovery().SetLastPingAt(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().GetAll(discovery.Type, "cluster_name_lastPing"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 1 {
+ t.Fatal("should only be 1 items")
+ return
+ }
+
+ if list[0].LastPingAt-ttime < 500 {
+ t.Fatal("failed to set time")
+ }
+ }
+
+ discovery2 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name_missing",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test_missing",
+ }
+
+ if result := <-store.ClusterDiscovery().SetLastPingAt(discovery2); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
+
+func TestSqlClusterDiscoveryStoreExists(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name_Exists",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test_Exists" + model.NewId(),
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().Exists(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ val := result.Data.(bool)
+ if !val {
+ t.Fatal("should be true")
+ }
+ }
+
+ discovery.ClusterName = "cluster_name_Exists2"
+
+ if result := <-store.ClusterDiscovery().Exists(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ val := result.Data.(bool)
+ if val {
+ t.Fatal("should be true")
+ }
+ }
+}
+
+func TestSqlClusterDiscoveryGetStore(t *testing.T) {
+ Setup()
+
+ testType1 := model.NewId()
+
+ discovery1 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname1",
+ Type: testType1,
+ }
+ Must(store.ClusterDiscovery().Save(discovery1))
+
+ discovery2 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname2",
+ Type: testType1,
+ }
+ Must(store.ClusterDiscovery().Save(discovery2))
+
+ discovery3 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname3",
+ Type: testType1,
+ CreateAt: 1,
+ LastPingAt: 1,
+ }
+ Must(store.ClusterDiscovery().Save(discovery3))
+
+ testType2 := model.NewId()
+
+ discovery4 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname1",
+ Type: testType2,
+ }
+ Must(store.ClusterDiscovery().Save(discovery4))
+
+ if result := <-store.ClusterDiscovery().GetAll(testType1, "cluster_name"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 2 {
+ t.Fatal("Should only have returned 2")
+ }
+ }
+
+ if result := <-store.ClusterDiscovery().GetAll(testType2, "cluster_name"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 1 {
+ t.Fatal("Should only have returned 1")
+ }
+ }
+
+ if result := <-store.ClusterDiscovery().GetAll(model.NewId(), "cluster_name"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 0 {
+ t.Fatal("shouldn't be any")
+ }
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index 1a681fe817..ee2c678f63 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -4,6 +4,7 @@
package store
import (
+ "context"
"crypto/aes"
"crypto/cipher"
"crypto/hmac"
@@ -35,6 +36,8 @@ const (
INDEX_TYPE_FULL_TEXT = "full_text"
INDEX_TYPE_DEFAULT = "default"
MAX_DB_CONN_LIFETIME = 60
+ DB_PING_ATTEMPTS = 18
+ DB_PING_TIMEOUT_SECS = 10
)
const (
@@ -66,31 +69,32 @@ const (
)
type SqlStore struct {
- master *gorp.DbMap
- replicas []*gorp.DbMap
- searchReplicas []*gorp.DbMap
- team TeamStore
- channel ChannelStore
- post PostStore
- user UserStore
- audit AuditStore
- compliance ComplianceStore
- session SessionStore
- oauth OAuthStore
- system SystemStore
- webhook WebhookStore
- command CommandStore
- preference PreferenceStore
- license LicenseStore
- token TokenStore
- emoji EmojiStore
- status StatusStore
- fileInfo FileInfoStore
- reaction ReactionStore
- jobStatus JobStatusStore
- SchemaVersion string
- rrCounter int64
- srCounter int64
+ master *gorp.DbMap
+ replicas []*gorp.DbMap
+ searchReplicas []*gorp.DbMap
+ team TeamStore
+ channel ChannelStore
+ post PostStore
+ user UserStore
+ audit AuditStore
+ clusterDiscovery ClusterDiscoveryStore
+ compliance ComplianceStore
+ session SessionStore
+ oauth OAuthStore
+ system SystemStore
+ webhook WebhookStore
+ command CommandStore
+ preference PreferenceStore
+ license LicenseStore
+ token TokenStore
+ emoji EmojiStore
+ status StatusStore
+ fileInfo FileInfoStore
+ reaction ReactionStore
+ jobStatus JobStatusStore
+ SchemaVersion string
+ rrCounter int64
+ srCounter int64
}
func initConnection() *SqlStore {
@@ -139,6 +143,7 @@ func NewSqlStore() Store {
sqlStore.post = NewSqlPostStore(sqlStore)
sqlStore.user = NewSqlUserStore(sqlStore)
sqlStore.audit = NewSqlAuditStore(sqlStore)
+ sqlStore.clusterDiscovery = NewSqlClusterDiscoveryStore(sqlStore)
sqlStore.compliance = NewSqlComplianceStore(sqlStore)
sqlStore.session = NewSqlSessionStore(sqlStore)
sqlStore.oauth = NewSqlOAuthStore(sqlStore)
@@ -197,12 +202,23 @@ func setupConnection(con_type string, driver string, dataSource string, maxIdle
os.Exit(EXIT_DB_OPEN)
}
- l4g.Info(utils.T("store.sql.pinging.info"), con_type)
- err = db.Ping()
- if err != nil {
- l4g.Critical(utils.T("store.sql.ping.critical"), err)
- time.Sleep(time.Second)
- os.Exit(EXIT_PING)
+ for i := 0; i < DB_PING_ATTEMPTS; i++ {
+ l4g.Info("Pinging SQL %v database", con_type)
+ ctx, cancel := context.WithTimeout(context.Background(), DB_PING_TIMEOUT_SECS*time.Second)
+ defer cancel()
+ err = db.PingContext(ctx)
+ if err == nil {
+ break
+ } else {
+ if i == DB_PING_ATTEMPTS-1 {
+ l4g.Critical("Failed to ping DB, server will exit err=%v", err)
+ time.Sleep(time.Second)
+ os.Exit(EXIT_PING)
+ } else {
+ l4g.Error("Failed to ping DB retrying in %v seconds err=%v", DB_PING_TIMEOUT_SECS, err)
+ time.Sleep(DB_PING_TIMEOUT_SECS * time.Second)
+ }
+ }
}
db.SetMaxIdleConns(maxIdle)
@@ -692,6 +708,10 @@ func (ss *SqlStore) Audit() AuditStore {
return ss.audit
}
+func (ss *SqlStore) ClusterDiscovery() ClusterDiscoveryStore {
+ return ss.clusterDiscovery
+}
+
func (ss *SqlStore) Compliance() ComplianceStore {
return ss.compliance
}
diff --git a/store/store.go b/store/store.go
index cd7792ce17..23c6acd37d 100644
--- a/store/store.go
+++ b/store/store.go
@@ -34,6 +34,7 @@ type Store interface {
Post() PostStore
User() UserStore
Audit() AuditStore
+ ClusterDiscovery() ClusterDiscoveryStore
Compliance() ComplianceStore
Session() SessionStore
OAuth() OAuthStore
@@ -239,6 +240,15 @@ type AuditStore interface {
PermanentDeleteByUser(userId string) StoreChannel
}
+type ClusterDiscoveryStore interface {
+ Save(discovery *model.ClusterDiscovery) StoreChannel
+ Delete(discovery *model.ClusterDiscovery) StoreChannel
+ Exists(discovery *model.ClusterDiscovery) StoreChannel
+ GetAll(discoveryType, clusterName string) StoreChannel
+ SetLastPingAt(discovery *model.ClusterDiscovery) StoreChannel
+ Cleanup() StoreChannel
+}
+
type ComplianceStore interface {
Save(compliance *model.Compliance) StoreChannel
Update(compliance *model.Compliance) StoreChannel
diff --git a/utils/config_test.go b/utils/config_test.go
index bce85d2aeb..3032766ecd 100644
--- a/utils/config_test.go
+++ b/utils/config_test.go
@@ -6,6 +6,7 @@ package utils
import (
"os"
"testing"
+ "time"
"github.com/mattermost/platform/model"
)
@@ -59,7 +60,25 @@ func TestConfigFromEnviroVars(t *testing.T) {
if Cfg.TeamSettings.SiteName != "Mattermost" {
t.Fatal("should have been reset")
}
+}
+func TestRedirectStdLog(t *testing.T) {
+ TranslationsPreInit()
+ LoadConfig("config.json")
+ InitTranslations(Cfg.LocalizationSettings)
+
+ log := NewRedirectStdLog("test", false)
+
+ log.Println("[DEBUG] this is a message")
+ log.Println("[DEBG] this is a message")
+ log.Println("[WARN] this is a message")
+ log.Println("[ERROR] this is a message")
+ log.Println("[EROR] this is a message")
+ log.Println("[ERR] this is a message")
+ log.Println("[INFO] this is a message")
+ log.Println("this is a message")
+
+ time.Sleep(time.Second * 1)
}
func TestAddRemoveConfigListener(t *testing.T) {
diff --git a/utils/redirect_std_log.go b/utils/redirect_std_log.go
new file mode 100644
index 0000000000..4fbfcf8ec2
--- /dev/null
+++ b/utils/redirect_std_log.go
@@ -0,0 +1,65 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package utils
+
+import (
+ "bufio"
+ "log"
+ "os"
+ "strings"
+
+ l4g "github.com/alecthomas/log4go"
+)
+
+type RedirectStdLog struct {
+ reader *os.File
+ writer *os.File
+ system string
+ ignoreDebug bool
+}
+
+func NewRedirectStdLog(system string, ignoreDebug bool) *log.Logger {
+ r, w, _ := os.Pipe()
+ logger := &RedirectStdLog{
+ reader: r,
+ writer: w,
+ system: system,
+ ignoreDebug: ignoreDebug,
+ }
+
+ go func(l *RedirectStdLog) {
+ scanner := bufio.NewScanner(l.reader)
+ for scanner.Scan() {
+ line := scanner.Text()
+
+ if strings.Index(line, "[DEBUG]") == 0 {
+ if !ignoreDebug {
+ l4g.Debug("%v%v", system, line[7:])
+ }
+ } else if strings.Index(line, "[DEBG]") == 0 {
+ if !ignoreDebug {
+ l4g.Debug("%v%v", system, line[6:])
+ }
+ } else if strings.Index(line, "[WARN]") == 0 {
+ l4g.Info("%v%v", system, line[6:])
+ } else if strings.Index(line, "[ERROR]") == 0 {
+ l4g.Error("%v%v", system, line[7:])
+ } else if strings.Index(line, "[EROR]") == 0 {
+ l4g.Error("%v%v", system, line[6:])
+ } else if strings.Index(line, "[ERR]") == 0 {
+ l4g.Error("%v%v", system, line[5:])
+ } else if strings.Index(line, "[INFO]") == 0 {
+ l4g.Info("%v%v", system, line[6:])
+ } else {
+ l4g.Info("%v %v", system, line)
+ }
+ }
+ }(logger)
+
+ return log.New(logger.writer, "", 0)
+}
+
+func (l *RedirectStdLog) Write(p []byte) (n int, err error) {
+ return l.writer.Write(p)
+}
diff --git a/webapp/components/admin_console/cluster_settings.jsx b/webapp/components/admin_console/cluster_settings.jsx
index 14bc46240c..0c3346c5a2 100644
--- a/webapp/components/admin_console/cluster_settings.jsx
+++ b/webapp/components/admin_console/cluster_settings.jsx
@@ -21,21 +21,18 @@ export default class ClusterSettings extends AdminSettings {
this.getConfigFromState = this.getConfigFromState.bind(this);
this.renderSettings = this.renderSettings.bind(this);
+ this.overrideHandleChange = this.overrideHandleChange.bind(this);
}
getConfigFromState(config) {
- config.ClusterSettings.Enable = this.state.enable;
- config.ClusterSettings.InterNodeListenAddress = this.state.interNodeListenAddress;
-
- config.ClusterSettings.InterNodeUrls = this.state.interNodeUrls.split(',');
- config.ClusterSettings.InterNodeUrls = config.ClusterSettings.InterNodeUrls.map((url) => {
- return url.trim();
- });
-
- if (config.ClusterSettings.InterNodeUrls.length === 1 && config.ClusterSettings.InterNodeUrls[0] === '') {
- config.ClusterSettings.InterNodeUrls = [];
- }
-
+ config.ClusterSettings.Enable = this.state.Enable;
+ config.ClusterSettings.ClusterName = this.state.ClusterName;
+ config.ClusterSettings.OverrideHostname = this.state.OverrideHostname;
+ config.ClusterSettings.UseIpAddress = this.state.UseIpAddress;
+ config.ClusterSettings.UseExperimentalGossip = this.state.UseExperimentalGossip;
+ config.ClusterSettings.ReadOnlyConfig = this.state.ReadOnlyConfig;
+ config.ClusterSettings.GossipPort = this.parseIntNonZero(this.state.GossipPort, 8074);
+ config.ClusterSettings.StreamingPort = this.parseIntNonZero(this.state.StreamingPort, 8075);
return config;
}
@@ -43,9 +40,14 @@ export default class ClusterSettings extends AdminSettings {
const settings = config.ClusterSettings;
return {
- enable: settings.Enable,
- interNodeUrls: settings.InterNodeUrls.join(', '),
- interNodeListenAddress: settings.InterNodeListenAddress,
+ Enable: settings.Enable,
+ ClusterName: settings.ClusterName,
+ OverrideHostname: settings.OverrideHostname,
+ UseIpAddress: settings.UseIpAddress,
+ UseExperimentalGossip: settings.UseExperimentalGossip,
+ ReadOnlyConfig: settings.ReadOnlyConfig,
+ GossipPort: settings.GossipPort,
+ StreamingPort: settings.StreamingPort,
showWarning: false
};
}
@@ -101,7 +103,7 @@ export default class ClusterSettings extends AdminSettings {
className='alert alert-warning'
>
-