mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
[MM-23281] Persist registeredPlugin in plugin.Environment when plugin is deactivated (#14110)
* store failed timestamps on health check job instead of on registeredPlugin
  Update test
* change EnsurePlugin calls
* Make env.SetPluginState private
* Write test for plugin deactivate and PluginStateFailedToStayRunning
* Add license comment
* adjust comments, use time.Since
* Additional PR feedback:
  time.Since cleanup
  test cleanup
  remove duplicate .Store() call
* PR Feedback
  - Add test case for reactivating the failed plugin
  - Change `crashed` to `healthy` and `hasPluginCrashed` to `isPluginHealthy`
  - remove stale timestamps from health check job
* Keep registeredPlugins in env when plugin is deactivated, so the crashed state of a plugin can be persisted.
* PR feedback
* PR feedback from Jesse

Co-authored-by: mattermod <mattermod@users.noreply.github.com>
This commit is contained in:
@@ -32,9 +32,7 @@ type registeredPlugin struct {
|
||||
BundleInfo *model.BundleInfo
|
||||
State int
|
||||
|
||||
failTimeStamps []time.Time
|
||||
lastError error
|
||||
supervisor *supervisor
|
||||
supervisor *supervisor
|
||||
}
|
||||
|
||||
// PrepackagedPlugin is a plugin prepackaged with the server and found on startup.
|
||||
@@ -140,8 +138,8 @@ func (env *Environment) GetPluginState(id string) int {
|
||||
return rp.(registeredPlugin).State
|
||||
}
|
||||
|
||||
// SetPluginState sets the current state of a plugin (disabled, running, or error)
|
||||
func (env *Environment) SetPluginState(id string, state int) {
|
||||
// setPluginState sets the current state of a plugin (disabled, running, or error)
|
||||
func (env *Environment) setPluginState(id string, state int) {
|
||||
if rp, ok := env.registeredPlugins.Load(id); ok {
|
||||
p := rp.(registeredPlugin)
|
||||
p.State = state
|
||||
@@ -152,7 +150,7 @@ func (env *Environment) SetPluginState(id string, state int) {
|
||||
// PublicFilesPath returns the path to the plugin's public files directory and a nil error if the plugin with the given id is active.
|
||||
// It returns an empty string and a non-nil error otherwise.
|
||||
func (env *Environment) PublicFilesPath(id string) (string, error) {
|
||||
if _, ok := env.registeredPlugins.Load(id); !ok {
|
||||
if !env.IsActive(id) {
|
||||
return "", fmt.Errorf("plugin not found: %v", id)
|
||||
}
|
||||
return filepath.Join(env.pluginDir, id, "public"), nil
|
||||
@@ -229,21 +227,14 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated
|
||||
return nil, false, fmt.Errorf("plugin not found: %v", id)
|
||||
}
|
||||
|
||||
value, ok := env.registeredPlugins.Load(id)
|
||||
if !ok {
|
||||
value = newRegisteredPlugin(pluginInfo)
|
||||
}
|
||||
|
||||
rp := value.(registeredPlugin)
|
||||
// Store latest BundleInfo in case something has changed since last activation
|
||||
rp.BundleInfo = pluginInfo
|
||||
rp := newRegisteredPlugin(pluginInfo)
|
||||
env.registeredPlugins.Store(id, rp)
|
||||
|
||||
defer func() {
|
||||
if reterr == nil {
|
||||
env.SetPluginState(id, model.PluginStateRunning)
|
||||
env.setPluginState(id, model.PluginStateRunning)
|
||||
} else {
|
||||
env.SetPluginState(id, model.PluginStateFailedToStart)
|
||||
env.setPluginState(id, model.PluginStateFailedToStart)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -307,7 +298,7 @@ func (env *Environment) Deactivate(id string) bool {
|
||||
|
||||
isActive := env.IsActive(id)
|
||||
|
||||
env.SetPluginState(id, model.PluginStateNotRunning)
|
||||
env.setPluginState(id, model.PluginStateNotRunning)
|
||||
|
||||
if !isActive {
|
||||
return false
|
||||
@@ -321,8 +312,6 @@ func (env *Environment) Deactivate(id string) bool {
|
||||
rp.supervisor.Shutdown()
|
||||
}
|
||||
|
||||
env.registeredPlugins.Delete(id)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -483,6 +472,21 @@ func (env *Environment) RunMultiPluginHook(hookRunnerFunc func(hooks Hooks) bool
|
||||
}
|
||||
}
|
||||
|
||||
// performHealthCheck uses the active plugin's supervisor to verify if the plugin has crashed.
|
||||
func (env *Environment) performHealthCheck(id string) error {
|
||||
p, ok := env.registeredPlugins.Load(id)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
rp := p.(registeredPlugin)
|
||||
|
||||
sup := rp.supervisor
|
||||
if sup == nil {
|
||||
return nil
|
||||
}
|
||||
return sup.PerformHealthCheck()
|
||||
}
|
||||
|
||||
// SetPrepackagedPlugins saves prepackaged plugins in the environment.
|
||||
func (env *Environment) SetPrepackagedPlugins(plugins []*PrepackagedPlugin) {
|
||||
env.prepackagedPluginsLock.Lock()
|
||||
@@ -492,5 +496,30 @@ func (env *Environment) SetPrepackagedPlugins(plugins []*PrepackagedPlugin) {
|
||||
|
||||
func newRegisteredPlugin(bundle *model.BundleInfo) registeredPlugin {
|
||||
state := model.PluginStateNotRunning
|
||||
return registeredPlugin{failTimeStamps: []time.Time{}, State: state, BundleInfo: bundle}
|
||||
return registeredPlugin{State: state, BundleInfo: bundle}
|
||||
}
|
||||
|
||||
// InitPluginHealthCheckJob starts a new job if one is not running and is set to enabled, or kills an existing one if set to disabled.
|
||||
func (env *Environment) InitPluginHealthCheckJob(enable bool) {
|
||||
// Config is set to enable. No job exists, start a new job.
|
||||
if enable && env.pluginHealthCheckJob == nil {
|
||||
mlog.Debug("Enabling plugin health check job", mlog.Duration("interval_s", HEALTH_CHECK_INTERVAL))
|
||||
|
||||
job := newPluginHealthCheckJob(env)
|
||||
env.pluginHealthCheckJob = job
|
||||
go job.run()
|
||||
}
|
||||
|
||||
// Config is set to disable. Job exists, kill existing job.
|
||||
if !enable && env.pluginHealthCheckJob != nil {
|
||||
mlog.Debug("Disabling plugin health check job")
|
||||
|
||||
env.pluginHealthCheckJob.Cancel()
|
||||
env.pluginHealthCheckJob = nil
|
||||
}
|
||||
}
|
||||
|
||||
// GetPluginHealthCheckJob returns the configured PluginHealthCheckJob, if any.
|
||||
func (env *Environment) GetPluginHealthCheckJob() *PluginHealthCheckJob {
|
||||
return env.pluginHealthCheckJob
|
||||
}
|
||||
|
||||
@@ -12,113 +12,82 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
HEALTH_CHECK_INTERVAL = 30 * time.Second // How often the health check should run
|
||||
HEALTH_CHECK_DISABLE_DURATION = 60 * time.Minute // How long we wait for num fails to incur before disabling the plugin
|
||||
HEALTH_CHECK_PING_FAIL_LIMIT = 3 // How many times we call RPC ping in a row before it is considered a failure
|
||||
HEALTH_CHECK_RESTART_LIMIT = 3 // How many times we restart a plugin before we disable it
|
||||
HEALTH_CHECK_INTERVAL = 30 * time.Second // How often the health check should run
|
||||
HEALTH_CHECK_DEACTIVATION_WINDOW = 60 * time.Minute // How long we wait for num fails to occur before deactivating the plugin
|
||||
HEALTH_CHECK_PING_FAIL_LIMIT = 3 // How many times we call RPC ping in a row before it is considered a failure
|
||||
HEALTH_CHECK_NUM_RESTARTS_LIMIT = 3 // How many times we restart a plugin before we deactivate it
|
||||
)
|
||||
|
||||
type PluginHealthCheckJob struct {
|
||||
cancel chan struct{}
|
||||
cancelled chan struct{}
|
||||
cancelOnce sync.Once
|
||||
env *Environment
|
||||
cancel chan struct{}
|
||||
cancelled chan struct{}
|
||||
cancelOnce sync.Once
|
||||
env *Environment
|
||||
failureTimestamps sync.Map
|
||||
}
|
||||
|
||||
// InitPluginHealthCheckJob starts a new job if one is not running and is set to enabled, or kills an existing one if set to disabled.
|
||||
func (env *Environment) InitPluginHealthCheckJob(enable bool) {
|
||||
// Config is set to enable. No job exists, start a new job.
|
||||
if enable && env.pluginHealthCheckJob == nil {
|
||||
mlog.Debug("Enabling plugin health check job", mlog.Duration("interval_s", HEALTH_CHECK_INTERVAL))
|
||||
|
||||
job := newPluginHealthCheckJob(env)
|
||||
env.pluginHealthCheckJob = job
|
||||
job.Start()
|
||||
}
|
||||
|
||||
// Config is set to disable. Job exists, kill existing job.
|
||||
if !enable && env.pluginHealthCheckJob != nil {
|
||||
mlog.Debug("Disabling plugin health check job")
|
||||
|
||||
env.pluginHealthCheckJob.Cancel()
|
||||
env.pluginHealthCheckJob = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Start continuously runs health checks on all active plugins, on a timer.
|
||||
func (job *PluginHealthCheckJob) Start() {
|
||||
// run continuously performs health checks on all active plugins, on a timer.
|
||||
func (job *PluginHealthCheckJob) run() {
|
||||
mlog.Debug("Plugin health check job starting.")
|
||||
defer close(job.cancelled)
|
||||
|
||||
go func() {
|
||||
defer close(job.cancelled)
|
||||
ticker := time.NewTicker(HEALTH_CHECK_INTERVAL)
|
||||
defer ticker.Stop()
|
||||
|
||||
ticker := time.NewTicker(HEALTH_CHECK_INTERVAL)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
activePlugins := job.env.Active()
|
||||
for _, plugin := range activePlugins {
|
||||
job.checkPlugin(plugin.Manifest.Id)
|
||||
}
|
||||
case <-job.cancel:
|
||||
return
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
activePlugins := job.env.Active()
|
||||
for _, plugin := range activePlugins {
|
||||
job.CheckPlugin(plugin.Manifest.Id)
|
||||
}
|
||||
case <-job.cancel:
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// checkPlugin determines the plugin's health status, then handles the error or success case.
|
||||
func (job *PluginHealthCheckJob) checkPlugin(id string) {
|
||||
p, ok := job.env.registeredPlugins.Load(id)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
rp := p.(registeredPlugin)
|
||||
|
||||
sup := rp.supervisor
|
||||
if sup == nil {
|
||||
return
|
||||
}
|
||||
|
||||
pluginErr := sup.PerformHealthCheck()
|
||||
|
||||
if pluginErr != nil {
|
||||
mlog.Error("Health check failed for plugin", mlog.String("id", id), mlog.Err(pluginErr))
|
||||
job.handleHealthCheckFail(id, pluginErr)
|
||||
}
|
||||
}
|
||||
|
||||
// handleHealthCheckFail restarts or deactivates the plugin based on how many times it has failed in a configured amount of time.
|
||||
func (job *PluginHealthCheckJob) handleHealthCheckFail(id string, err error) {
|
||||
rp, ok := job.env.registeredPlugins.Load(id)
|
||||
if !ok {
|
||||
// CheckPlugin determines the plugin's health status, then handles the error or success case.
|
||||
// If the plugin passes the health check, do nothing.
|
||||
// If the plugin fails the health check, the function either restarts or deactivates the plugin, based on the quantity and frequency of its failures.
|
||||
func (job *PluginHealthCheckJob) CheckPlugin(id string) {
|
||||
err := job.env.performHealthCheck(id)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
p := rp.(registeredPlugin)
|
||||
|
||||
// Append current failure before checking for deactivate vs restart action
|
||||
p.failTimeStamps = append(p.failTimeStamps, time.Now())
|
||||
p.lastError = err
|
||||
job.env.registeredPlugins.Store(id, p)
|
||||
mlog.Error("Health check failed for plugin", mlog.String("id", id), mlog.Err(err))
|
||||
timestamps := job.getStoredTimestamps(id)
|
||||
timestamps = append(timestamps, time.Now())
|
||||
|
||||
if shouldDeactivatePlugin(p) {
|
||||
p.failTimeStamps = []time.Time{}
|
||||
job.env.registeredPlugins.Store(id, p)
|
||||
if shouldDeactivatePlugin(timestamps) {
|
||||
// Order matters here, must deactivate first and then set plugin state
|
||||
mlog.Debug("Deactivating plugin due to multiple crashes", mlog.String("id", id))
|
||||
job.env.Deactivate(id)
|
||||
job.env.SetPluginState(id, model.PluginStateFailedToStayRunning)
|
||||
|
||||
// Reset timestamp state for this plugin
|
||||
job.failureTimestamps.Delete(id)
|
||||
job.env.setPluginState(id, model.PluginStateFailedToStayRunning)
|
||||
} else {
|
||||
mlog.Debug("Restarting plugin due to failed health check", mlog.String("id", id))
|
||||
if err := job.env.RestartPlugin(id); err != nil {
|
||||
mlog.Error("Failed to restart plugin", mlog.String("id", id), mlog.Err(err))
|
||||
}
|
||||
|
||||
// Store this failure so we can continue to monitor the plugin
|
||||
job.failureTimestamps.Store(id, removeStaleTimestamps(timestamps))
|
||||
}
|
||||
}
|
||||
|
||||
// getStoredTimestamps returns the stored failure timestamps for a plugin.
|
||||
func (job *PluginHealthCheckJob) getStoredTimestamps(id string) []time.Time {
|
||||
timestamps, ok := job.failureTimestamps.Load(id)
|
||||
if !ok {
|
||||
timestamps = []time.Time{}
|
||||
}
|
||||
return timestamps.([]time.Time)
|
||||
}
|
||||
|
||||
func newPluginHealthCheckJob(env *Environment) *PluginHealthCheckJob {
|
||||
return &PluginHealthCheckJob{
|
||||
cancel: make(chan struct{}),
|
||||
@@ -134,18 +103,22 @@ func (job *PluginHealthCheckJob) Cancel() {
|
||||
<-job.cancelled
|
||||
}
|
||||
|
||||
// shouldDeactivatePlugin determines if a plugin needs to be deactivated after certain criteria is met.
|
||||
//
|
||||
// The criteria is based on if the plugin has consistently failed during the configured number of restarts, within the configured time window.
|
||||
func shouldDeactivatePlugin(rp registeredPlugin) bool {
|
||||
if len(rp.failTimeStamps) >= HEALTH_CHECK_RESTART_LIMIT {
|
||||
index := len(rp.failTimeStamps) - HEALTH_CHECK_RESTART_LIMIT
|
||||
t := rp.failTimeStamps[index]
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(t)
|
||||
if elapsed <= HEALTH_CHECK_DISABLE_DURATION {
|
||||
return true
|
||||
}
|
||||
// shouldDeactivatePlugin determines if a plugin needs to be deactivated after the plugin has failed (HEALTH_CHECK_NUM_RESTARTS_LIMIT) times,
|
||||
// within the configured time window (HEALTH_CHECK_DEACTIVATION_WINDOW).
|
||||
func shouldDeactivatePlugin(failedTimestamps []time.Time) bool {
|
||||
if len(failedTimestamps) < HEALTH_CHECK_NUM_RESTARTS_LIMIT {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
|
||||
index := len(failedTimestamps) - HEALTH_CHECK_NUM_RESTARTS_LIMIT
|
||||
return time.Since(failedTimestamps[index]) <= HEALTH_CHECK_DEACTIVATION_WINDOW
|
||||
}
|
||||
|
||||
// removeStaleTimestamps only keeps the last HEALTH_CHECK_NUM_RESTARTS_LIMIT items in timestamps.
|
||||
func removeStaleTimestamps(timestamps []time.Time) []time.Time {
|
||||
if len(timestamps) > HEALTH_CHECK_NUM_RESTARTS_LIMIT {
|
||||
timestamps = timestamps[len(timestamps)-HEALTH_CHECK_NUM_RESTARTS_LIMIT:]
|
||||
}
|
||||
|
||||
return timestamps
|
||||
}
|
||||
|
||||
@@ -118,39 +118,36 @@ func testPluginHealthCheckPanic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestShouldDeactivatePlugin(t *testing.T) {
|
||||
bundle := &model.BundleInfo{}
|
||||
rp := newRegisteredPlugin(bundle)
|
||||
require.NotNil(t, rp)
|
||||
|
||||
// No failures, don't restart
|
||||
result := shouldDeactivatePlugin(rp)
|
||||
ftime := []time.Time{}
|
||||
result := shouldDeactivatePlugin(ftime)
|
||||
require.Equal(t, false, result)
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// Failures are recent enough to restart
|
||||
rp = newRegisteredPlugin(bundle)
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION/10*2))
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION/10))
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now)
|
||||
ftime = []time.Time{}
|
||||
ftime = append(ftime, now.Add(-HEALTH_CHECK_DEACTIVATION_WINDOW/10*2))
|
||||
ftime = append(ftime, now.Add(-HEALTH_CHECK_DEACTIVATION_WINDOW/10))
|
||||
ftime = append(ftime, now)
|
||||
|
||||
result = shouldDeactivatePlugin(rp)
|
||||
result = shouldDeactivatePlugin(ftime)
|
||||
require.Equal(t, true, result)
|
||||
|
||||
// Failures are too spaced out to warrant a restart
|
||||
rp = newRegisteredPlugin(bundle)
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*2))
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*1))
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now)
|
||||
ftime = []time.Time{}
|
||||
ftime = append(ftime, now.Add(-HEALTH_CHECK_DEACTIVATION_WINDOW*2))
|
||||
ftime = append(ftime, now.Add(-HEALTH_CHECK_DEACTIVATION_WINDOW*1))
|
||||
ftime = append(ftime, now)
|
||||
|
||||
result = shouldDeactivatePlugin(rp)
|
||||
result = shouldDeactivatePlugin(ftime)
|
||||
require.Equal(t, false, result)
|
||||
|
||||
// Not enough failures are present to warrant a restart
|
||||
rp = newRegisteredPlugin(bundle)
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION/10))
|
||||
rp.failTimeStamps = append(rp.failTimeStamps, now)
|
||||
ftime = []time.Time{}
|
||||
ftime = append(ftime, now.Add(-HEALTH_CHECK_DEACTIVATION_WINDOW/10))
|
||||
ftime = append(ftime, now)
|
||||
|
||||
result = shouldDeactivatePlugin(rp)
|
||||
result = shouldDeactivatePlugin(ftime)
|
||||
require.Equal(t, false, result)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user