mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
PLT-7542 Converting LDAP sync to the job server (#7452)
* PLT-7542 Converting LDAP sync to the job server * Fixing minor issues * Fixing build failure * Translate error message * Translate error message * Translate error message * Translate error message * Fixing merge * Fixing bad merge
This commit is contained in:
@@ -102,8 +102,6 @@ func (a *App) initEnterprise() {
|
||||
if err := utils.ValidateLdapFilter(cfg, a.Ldap); err != nil {
|
||||
panic(utils.T(err.Id))
|
||||
}
|
||||
|
||||
a.Ldap.StartLdapSyncJob()
|
||||
})
|
||||
}
|
||||
if metricsInterface != nil {
|
||||
|
||||
@@ -15,7 +15,7 @@ func (a *App) SyncLdap() {
|
||||
go func() {
|
||||
if utils.IsLicensed() && *utils.License().Features.LDAP && *utils.Cfg.LdapSettings.Enable {
|
||||
if ldapI := a.Ldap; ldapI != nil {
|
||||
ldapI.SyncNow()
|
||||
ldapI.StartSynchronizeJob(false)
|
||||
} else {
|
||||
l4g.Error("%v", model.NewAppError("SyncLdap", "ent.ldap.disabled.app_error", nil, "", http.StatusNotImplemented).Error())
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@@ -32,8 +33,9 @@ func ldapSyncCmdF(cmd *cobra.Command, args []string) error {
|
||||
}
|
||||
|
||||
if ldapI := a.Ldap; ldapI != nil {
|
||||
if err := ldapI.Syncronize(); err != nil {
|
||||
CommandPrintErrorln("ERROR: AD/LDAP Synchronization Failed")
|
||||
job, err := ldapI.StartSynchronizeJob(true)
|
||||
if err != nil || job.Status == model.JOB_STATUS_ERROR || job.Status == model.JOB_STATUS_CANCELED {
|
||||
CommandPrintErrorln("ERROR: AD/LDAP Synchronization please check the server logs")
|
||||
} else {
|
||||
CommandPrettyPrintln("SUCCESS: AD/LDAP Synchronization Complete")
|
||||
}
|
||||
|
||||
23
einterfaces/jobs/ldap_sync.go
Normal file
23
einterfaces/jobs/ldap_sync.go
Normal file
@@ -0,0 +1,23 @@
|
||||
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
|
||||
// See License.txt for license information.
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
)
|
||||
|
||||
type LdapSyncInterface interface {
|
||||
MakeWorker() model.Worker
|
||||
MakeScheduler() model.Scheduler
|
||||
}
|
||||
|
||||
var theLdapSyncInterface LdapSyncInterface
|
||||
|
||||
func RegisterLdapSyncInterface(newInterface LdapSyncInterface) {
|
||||
theLdapSyncInterface = newInterface
|
||||
}
|
||||
|
||||
func GetLdapSyncInterface() LdapSyncInterface {
|
||||
return theLdapSyncInterface
|
||||
}
|
||||
@@ -4,6 +4,8 @@
|
||||
package einterfaces
|
||||
|
||||
import (
|
||||
"github.com/go-ldap/ldap"
|
||||
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
)
|
||||
|
||||
@@ -14,9 +16,10 @@ type LdapInterface interface {
|
||||
CheckPassword(id string, password string) *model.AppError
|
||||
SwitchToLdap(userId, ldapId, ldapPassword string) *model.AppError
|
||||
ValidateFilter(filter string) *model.AppError
|
||||
Syncronize() *model.AppError
|
||||
StartLdapSyncJob()
|
||||
SyncNow()
|
||||
StartSynchronizeJob(waitForJobToFinish bool) (*model.Job, *model.AppError)
|
||||
RunTest() *model.AppError
|
||||
GetAllLdapUsers() ([]*model.User, *model.AppError)
|
||||
UserFromLdapUser(ldapUser *ldap.Entry) *model.User
|
||||
UserHasUpdateFromLdap(existingUser *model.User, currentLdapUser *model.User) bool
|
||||
UpdateLdapUser(existingUser *model.User, currentLdapUser *model.User) *model.User
|
||||
}
|
||||
|
||||
@@ -3687,6 +3687,14 @@
|
||||
"id": "ent.compliance.run_started.info",
|
||||
"translation": "Compliance export started for job '{{.JobName}}' at '{{.FilePath}}'"
|
||||
},
|
||||
{
|
||||
"id": "ent.ldap.sync_worker.create_index_job.error",
|
||||
"translation": "LDAP sync worker failed to create the sync job"
|
||||
},
|
||||
{
|
||||
"id": "ent.ldap.sync.index_job_failed.error",
|
||||
"translation": "LDAP sync worker failed due to the sync job failing"
|
||||
},
|
||||
{
|
||||
"id": "ent.elasticsearch.aggregator_worker.create_index_job.error",
|
||||
"translation": "Elasticsearch aggregator worker failed to create the indexing job"
|
||||
|
||||
12
jobs/jobs.go
12
jobs/jobs.go
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
l4g "github.com/alecthomas/log4go"
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
"github.com/mattermost/mattermost-server/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -37,6 +38,14 @@ func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.Ap
|
||||
return &job, nil
|
||||
}
|
||||
|
||||
func GetJob(id string) (*model.Job, *model.AppError) {
|
||||
if result := <-Srv.Store.Job().Get(id); result.Err != nil {
|
||||
return nil, result.Err
|
||||
} else {
|
||||
return result.Data.(*model.Job), nil
|
||||
}
|
||||
}
|
||||
|
||||
func ClaimJob(job *model.Job) (bool, *model.AppError) {
|
||||
if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
|
||||
return false, result.Err
|
||||
@@ -73,7 +82,8 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
|
||||
if job.Data == nil {
|
||||
job.Data = make(map[string]string)
|
||||
}
|
||||
job.Data["error"] = jobError.Error()
|
||||
jobError.Translate(utils.T)
|
||||
job.Data["error"] = jobError.Message + " (" + jobError.DetailedError + ")"
|
||||
|
||||
if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
|
||||
return result.Err
|
||||
|
||||
@@ -12,21 +12,23 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
WATCHER_POLLING_INTERVAL = 15000
|
||||
DEFAULT_WATCHER_POLLING_INTERVAL = 15000
|
||||
)
|
||||
|
||||
type Watcher struct {
|
||||
workers *Workers
|
||||
|
||||
stop chan bool
|
||||
stopped chan bool
|
||||
stop chan bool
|
||||
stopped chan bool
|
||||
pollingInterval int
|
||||
}
|
||||
|
||||
func MakeWatcher(workers *Workers) *Watcher {
|
||||
func MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
|
||||
return &Watcher{
|
||||
stop: make(chan bool, 1),
|
||||
stopped: make(chan bool, 1),
|
||||
workers: workers,
|
||||
stop: make(chan bool, 1),
|
||||
stopped: make(chan bool, 1),
|
||||
pollingInterval: pollingInterval,
|
||||
workers: workers,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +38,7 @@ func (watcher *Watcher) Start() {
|
||||
// Delay for some random number of milliseconds before starting to ensure that multiple
|
||||
// instances of the jobserver don't poll at a time too close to each other.
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
_ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond)
|
||||
_ = <-time.After(time.Duration(rand.Intn(watcher.pollingInterval)) * time.Millisecond)
|
||||
|
||||
defer func() {
|
||||
l4g.Debug("Watcher Finished")
|
||||
@@ -48,7 +50,7 @@ func (watcher *Watcher) Start() {
|
||||
case <-watcher.stop:
|
||||
l4g.Debug("Watcher: Received stop signal")
|
||||
return
|
||||
case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond):
|
||||
case <-time.After(time.Duration(watcher.pollingInterval) * time.Millisecond):
|
||||
watcher.PollAndNotify()
|
||||
}
|
||||
}
|
||||
@@ -88,6 +90,13 @@ func (watcher *Watcher) PollAndNotify() {
|
||||
default:
|
||||
}
|
||||
}
|
||||
} else if job.Type == model.JOB_TYPE_LDAP_SYNC {
|
||||
if watcher.workers.LdapSync != nil {
|
||||
select {
|
||||
case watcher.workers.LdapSync.JobChannel() <- *job:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ type Schedulers struct {
|
||||
|
||||
DataRetention model.Scheduler
|
||||
ElasticsearchAggregation model.Scheduler
|
||||
LdapSync model.Scheduler
|
||||
|
||||
listenerId string
|
||||
}
|
||||
@@ -33,6 +34,10 @@ func InitSchedulers() *Schedulers {
|
||||
schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler()
|
||||
}
|
||||
|
||||
if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil {
|
||||
schedulers.LdapSync = ldaySyncInterface.MakeScheduler()
|
||||
}
|
||||
|
||||
return schedulers
|
||||
}
|
||||
|
||||
@@ -47,6 +52,10 @@ func (schedulers *Schedulers) Start() *Schedulers {
|
||||
if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
|
||||
go schedulers.ElasticsearchAggregation.Run()
|
||||
}
|
||||
|
||||
if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
|
||||
go schedulers.LdapSync.Run()
|
||||
}
|
||||
})
|
||||
|
||||
schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
|
||||
@@ -70,6 +79,14 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon
|
||||
schedulers.ElasticsearchAggregation.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
if schedulers.LdapSync != nil {
|
||||
if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable {
|
||||
go schedulers.LdapSync.Run()
|
||||
} else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable {
|
||||
schedulers.LdapSync.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) Stop() *Schedulers {
|
||||
@@ -83,6 +100,10 @@ func (schedulers *Schedulers) Stop() *Schedulers {
|
||||
schedulers.ElasticsearchAggregation.Stop()
|
||||
}
|
||||
|
||||
if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
|
||||
schedulers.LdapSync.Stop()
|
||||
}
|
||||
|
||||
l4g.Info("Stopped schedulers")
|
||||
|
||||
return schedulers
|
||||
|
||||
@@ -14,18 +14,19 @@ import (
|
||||
|
||||
type Workers struct {
|
||||
startOnce sync.Once
|
||||
watcher *Watcher
|
||||
Watcher *Watcher
|
||||
|
||||
DataRetention model.Worker
|
||||
ElasticsearchIndexing model.Worker
|
||||
ElasticsearchAggregation model.Worker
|
||||
LdapSync model.Worker
|
||||
|
||||
listenerId string
|
||||
}
|
||||
|
||||
func InitWorkers() *Workers {
|
||||
workers := &Workers{}
|
||||
workers.watcher = MakeWatcher(workers)
|
||||
workers.Watcher = MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL)
|
||||
|
||||
if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
|
||||
workers.DataRetention = dataRetentionInterface.MakeWorker()
|
||||
@@ -39,6 +40,10 @@ func InitWorkers() *Workers {
|
||||
workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker()
|
||||
}
|
||||
|
||||
if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
|
||||
workers.LdapSync = ldapSyncInterface.MakeWorker()
|
||||
}
|
||||
|
||||
return workers
|
||||
}
|
||||
|
||||
@@ -58,7 +63,11 @@ func (workers *Workers) Start() *Workers {
|
||||
go workers.ElasticsearchAggregation.Run()
|
||||
}
|
||||
|
||||
go workers.watcher.Start()
|
||||
if workers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
|
||||
go workers.LdapSync.Run()
|
||||
}
|
||||
|
||||
go workers.Watcher.Start()
|
||||
})
|
||||
|
||||
workers.listenerId = utils.AddConfigListener(workers.handleConfigChange)
|
||||
@@ -90,12 +99,20 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m
|
||||
workers.ElasticsearchAggregation.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
if workers.LdapSync != nil {
|
||||
if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable {
|
||||
go workers.LdapSync.Run()
|
||||
} else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable {
|
||||
workers.LdapSync.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (workers *Workers) Stop() *Workers {
|
||||
utils.RemoveConfigListener(workers.listenerId)
|
||||
|
||||
workers.watcher.Stop()
|
||||
workers.Watcher.Stop()
|
||||
|
||||
if workers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
|
||||
workers.DataRetention.Stop()
|
||||
@@ -109,6 +126,10 @@ func (workers *Workers) Stop() *Workers {
|
||||
workers.ElasticsearchAggregation.Stop()
|
||||
}
|
||||
|
||||
if workers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
|
||||
workers.LdapSync.Stop()
|
||||
}
|
||||
|
||||
l4g.Info("Stopped workers")
|
||||
|
||||
return workers
|
||||
|
||||
@@ -13,6 +13,7 @@ const (
|
||||
JOB_TYPE_DATA_RETENTION = "data_retention"
|
||||
JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing"
|
||||
JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation"
|
||||
JOB_TYPE_LDAP_SYNC = "ldap_sync"
|
||||
|
||||
JOB_STATUS_PENDING = "pending"
|
||||
JOB_STATUS_IN_PROGRESS = "in_progress"
|
||||
@@ -47,6 +48,7 @@ func (j *Job) IsValid() *AppError {
|
||||
case JOB_TYPE_DATA_RETENTION:
|
||||
case JOB_TYPE_ELASTICSEARCH_POST_INDEXING:
|
||||
case JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION:
|
||||
case JOB_TYPE_LDAP_SYNC:
|
||||
default:
|
||||
return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user