Migrate cluster to use Server struct directly. (#10101)

This commit is contained in:
Christopher Speller
2019-01-15 09:09:25 -08:00
committed by GitHub
parent 004e7d383b
commit ae76d27b7d
6 changed files with 33 additions and 17 deletions

View File

@@ -4,28 +4,27 @@
package app
import (
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
)
// Registers a given function to be called when the cluster leader may have changed. Returns a unique ID for the
// listener which can later be used to remove it. If clustering is not enabled in this build, the callback will never
// be called.
func (a *App) AddClusterLeaderChangedListener(listener func()) string {
func (s *Server) AddClusterLeaderChangedListener(listener func()) string {
id := model.NewId()
a.Srv.clusterLeaderListeners.Store(id, listener)
s.clusterLeaderListeners.Store(id, listener)
return id
}
// Removes a listener function by the unique ID returned when AddConfigListener was called
func (a *App) RemoveClusterLeaderChangedListener(id string) {
a.Srv.clusterLeaderListeners.Delete(id)
func (s *Server) RemoveClusterLeaderChangedListener(id string) {
s.clusterLeaderListeners.Delete(id)
}
func (a *App) InvokeClusterLeaderChangedListeners() {
mlog.Info("Cluster leader changed. Invoking ClusterLeaderChanged listeners.")
a.Srv.Go(func() {
a.Srv.clusterLeaderListeners.Range(func(_, listener interface{}) bool {
func (s *Server) InvokeClusterLeaderChangedListeners() {
s.Log.Info("Cluster leader changed. Invoking ClusterLeaderChanged listeners.")
s.Go(func() {
s.clusterLeaderListeners.Range(func(_, listener interface{}) bool {
listener.(func())()
return true
})

View File

@@ -17,9 +17,9 @@ func RegisterAccountMigrationInterface(f func(*App) einterfaces.AccountMigration
accountMigrationInterface = f
}
var clusterInterface func(*App) einterfaces.ClusterInterface
var clusterInterface func(*Server) einterfaces.ClusterInterface
func RegisterClusterInterface(f func(*App) einterfaces.ClusterInterface) {
func RegisterClusterInterface(f func(*Server) einterfaces.ClusterInterface) {
clusterInterface = f
}
@@ -135,6 +135,6 @@ func (s *Server) initEnterprise() {
s.DataRetention = dataRetentionInterface(s.FakeApp())
}
if clusterInterface != nil {
s.Cluster = clusterInterface(s.FakeApp())
s.Cluster = clusterInterface(s)
}
}

View File

@@ -40,6 +40,18 @@ func RunJobs(s *Server) {
s.runjobs = true
}
func JoinCluster(s *Server) {
s.joinCluster = true
}
func StartMetrics(s *Server) {
s.startMetrics = true
}
func StartElasticsearch(s *Server) {
s.startElasticsearch = true
}
func DisableConfigWatch(s *Server) {
s.disableConfigWatch = true
}

View File

@@ -112,6 +112,10 @@ type Server struct {
Log *mlog.Logger
joinCluster bool
startMetrics bool
startElasticsearch bool
AccountMigration einterfaces.AccountMigrationInterface
Cluster einterfaces.ClusterInterface
Compliance einterfaces.ComplianceInterface
@@ -214,16 +218,16 @@ func NewServer(options ...Option) (*Server, error) {
mlog.Error(fmt.Sprint("Error to reset the server status.", result.Err.Error()))
}
if s.Cluster != nil {
if s.joinCluster && s.Cluster != nil {
s.FakeApp().RegisterAllClusterMessageHandlers()
s.Cluster.StartInterNodeCommunication()
}
if s.Metrics != nil {
if s.startMetrics && s.Metrics != nil {
s.Metrics.StartServer()
}
if s.Elasticsearch != nil {
if s.startElasticsearch && s.Elasticsearch != nil {
s.StartElasticsearch()
}

View File

@@ -86,7 +86,7 @@ func (s *Server) RunOldAppInitalization() error {
a.EnsureDiagnosticId()
a.regenerateClientConfig()
a.Srv.clusterLeaderListenerId = a.AddClusterLeaderChangedListener(func() {
a.Srv.clusterLeaderListenerId = a.Srv.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if job schedulers should be running:", mlog.Bool("isLeader", a.IsLeader()))
if a.Srv.Jobs != nil {
a.Srv.Jobs.Schedulers.HandleClusterLeaderChange(a.IsLeader())
@@ -156,7 +156,7 @@ func (s *Server) RunOldAppShutdown() {
a.StopPushNotificationsHubWorkers()
a.ShutDownPlugins()
a.RemoveLicenseListener(s.licenseListenerId)
a.RemoveClusterLeaderChangedListener(s.clusterLeaderListenerId)
s.RemoveClusterLeaderChangedListener(s.clusterLeaderListenerId)
}
// A temporary bridge to deal with cases where the code is so tighly coupled that

View File

@@ -47,6 +47,7 @@ func runServer(configFileLocation string, disableConfigWatch bool, usedPlatform
options := []app.Option{
app.ConfigFile(configFileLocation),
app.RunJobs,
app.JoinCluster,
}
if disableConfigWatch {
options = append(options, app.DisableConfigWatch)