[MM-13507] Plugin framework: auto-restart plugins that crash (#10781)

* introduce plugin health check

* implement plugin health check job

* add support for checking pid of plugin process and RPC ping, to determine a plugin's health

* implement restart policy with back-offs

* support "EnableHealthCheck" boolean from config file.

* add tests for supervisor.PerformHealthCheck() and shouldDeactivatePlugin()

* improve error handling. clean up if blocks to be more concise
This commit is contained in:
Michael Kochell
2019-05-09 16:08:31 -04:00
committed by Christopher Speller
parent 818e0470df
commit 43e95b0b2b
7 changed files with 485 additions and 12 deletions

View File

@@ -31,11 +31,13 @@ type activePlugin struct {
// It is meant for use by the Mattermost server to manipulate, interact with and report on the set
// of active plugins.
type Environment struct {
activePlugins sync.Map
logger *mlog.Logger
newAPIImpl apiImplCreatorFunc
pluginDir string
webappPluginDir string
activePlugins sync.Map
pluginHealthStatuses sync.Map
pluginHealthCheckJob *PluginHealthCheckJob
logger *mlog.Logger
newAPIImpl apiImplCreatorFunc
pluginDir string
webappPluginDir string
}
func NewEnvironment(newAPIImpl apiImplCreatorFunc, pluginDir string, webappPluginDir string, logger *mlog.Logger) (*Environment, error) {
@@ -227,6 +229,15 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated
ap.supervisor = sup
componentActivated = true
var h *PluginHealthStatus
if health, ok := env.pluginHealthStatuses.Load(id); ok {
h = health.(*PluginHealthStatus)
} else {
h = newPluginHealthStatus()
env.pluginHealthStatuses.Store(id, h)
}
h.Crashed = false
}
if !componentActivated {
@@ -256,6 +267,30 @@ func (env *Environment) Deactivate(id string) bool {
return true
}
// RestartPlugin performs a full restart of the plugin with the given id: it
// first deactivates the plugin, then activates it again, returning any error
// raised during activation.
func (env *Environment) RestartPlugin(id string) error {
	env.Deactivate(id)
	if _, _, err := env.Activate(id); err != nil {
		return err
	}
	return nil
}
// UpdatePluginHealthStatus applies the given callback to the stored health
// status of the plugin with the given id. It is a no-op when no health status
// has been recorded for that plugin.
func (env *Environment) UpdatePluginHealthStatus(id string, callback func(*PluginHealthStatus)) {
	health, ok := env.pluginHealthStatuses.Load(id)
	if !ok {
		return
	}
	callback(health.(*PluginHealthStatus))
}
// CheckPluginHealthStatus checks if the plugin is in a failed state, based on
// information gathered from previous health checks. It returns the last
// recorded health check error when the plugin has crashed, or nil when the
// plugin is healthy or has no recorded health status.
func (env *Environment) CheckPluginHealthStatus(id string) error {
	health, ok := env.pluginHealthStatuses.Load(id)
	if !ok {
		return nil
	}
	// Assert the stored value once and reuse it, instead of asserting twice.
	h := health.(*PluginHealthStatus)
	if h.Crashed {
		return h.lastError
	}
	return nil
}
// Shutdown deactivates all plugins and gracefully shuts down the environment.
func (env *Environment) Shutdown() {
env.activePlugins.Range(func(key, value interface{}) bool {

156
plugin/health_check.go Normal file
View File

@@ -0,0 +1,156 @@
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package plugin
import (
"fmt"
"time"
"github.com/mattermost/mattermost-server/mlog"
)
// Health check tuning constants. These are untyped integer constants; the
// unit each is interpreted in is noted alongside it.
const (
	HEALTH_CHECK_INTERVAL         = 30 // seconds. How often the health check should run.
	HEALTH_CHECK_DISABLE_DURATION = 60 // minutes. Window within which repeated failures must occur before the plugin is disabled.
	HEALTH_CHECK_PING_FAIL_LIMIT  = 3  // How many times we call RPC ping in a row before it is considered a failure.
	HEALTH_CHECK_RESTART_LIMIT    = 3  // How many times we restart a plugin before we disable it.
)

// PluginHealthCheckJob periodically runs health checks against every active
// plugin in the environment.
type PluginHealthCheckJob struct {
	cancel    chan struct{} // closed by Cancel() to ask the job goroutine to stop
	cancelled chan struct{} // closed by the job goroutine on exit; Cancel() waits on it
	env       *Environment
}

// PluginHealthStatus records the observed health history of a single plugin.
type PluginHealthStatus struct {
	Crashed        bool        // set once the plugin is deactivated due to repeated failures
	failTimeStamps []time.Time // timestamps of recent health check failures
	lastError      error       // most recent health check error
}
// InitPluginHealthCheckJob reconciles the health check job with the desired
// enabled state: it starts a new job when enabled and none is running, and
// cancels the running job when disabled.
func (env *Environment) InitPluginHealthCheckJob(enable bool) {
	switch {
	case enable && env.pluginHealthCheckJob == nil:
		// Enabled but not yet running: spin up a fresh job.
		job := newPluginHealthCheckJob(env)
		env.pluginHealthCheckJob = job
		job.Start()
	case !enable && env.pluginHealthCheckJob != nil:
		// Disabled while a job is running: stop and discard it.
		env.pluginHealthCheckJob.Cancel()
		env.pluginHealthCheckJob = nil
	}
}
// Start launches a goroutine that runs a health check against every active
// plugin once per HEALTH_CHECK_INTERVAL, until the job is cancelled via
// Cancel(). The goroutine closes job.cancelled on exit so Cancel() can wait
// for a clean shutdown.
func (job *PluginHealthCheckJob) Start() {
	interval := time.Duration(HEALTH_CHECK_INTERVAL) * time.Second
	// Note: %v on a time.Duration already includes the unit (e.g. "30s"), so
	// the message must not append "seconds" after it.
	mlog.Debug(fmt.Sprintf("Plugin health check job starting. Sending health check pings every %v.", interval))
	go func() {
		defer close(job.cancelled)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Snapshot the currently active plugins and check each one.
				activePlugins := job.env.Active()
				for _, plugin := range activePlugins {
					job.checkPlugin(plugin.Manifest.Id)
				}
			case <-job.cancel:
				return
			}
		}
	}()
}
// checkPlugin determines the plugin's health status, then handles the error
// or success case.
func (job *PluginHealthCheckJob) checkPlugin(id string) {
	value, ok := job.env.activePlugins.Load(id)
	if !ok {
		return
	}
	ap := value.(activePlugin)

	// Make sure a health status record exists for this plugin.
	if _, exists := job.env.pluginHealthStatuses.Load(id); !exists {
		job.env.pluginHealthStatuses.Store(id, newPluginHealthStatus())
	}

	supervisor := ap.supervisor
	if supervisor == nil {
		return
	}

	if err := supervisor.PerformHealthCheck(); err != nil {
		mlog.Error(fmt.Sprintf("Health check failed for plugin %s, error: %s", id, err.Error()))
		job.handleHealthCheckFail(id, err)
	}
}
// handleHealthCheckFail restarts or deactivates the plugin based on how many
// times it has failed within the configured time window.
func (job *PluginHealthCheckJob) handleHealthCheckFail(id string, err error) {
	value, ok := job.env.pluginHealthStatuses.Load(id)
	if !ok {
		return
	}
	status := value.(*PluginHealthStatus)

	// Record the current failure before deciding between restart and deactivation.
	status.failTimeStamps = append(status.failTimeStamps, time.Now())
	status.lastError = err

	if !shouldDeactivatePlugin(status) {
		mlog.Debug(fmt.Sprintf("Restarting plugin due to failed health check `%s`", id))
		if restartErr := job.env.RestartPlugin(id); restartErr != nil {
			mlog.Error(fmt.Sprintf("Failed to restart plugin `%s`: %s", id, restartErr.Error()))
		}
		return
	}

	// Too many recent failures: mark the plugin as crashed and take it down.
	status.failTimeStamps = []time.Time{}
	status.Crashed = true
	mlog.Debug(fmt.Sprintf("Deactivating plugin due to multiple crashes `%s`", id))
	job.env.Deactivate(id)
}
// newPluginHealthCheckJob constructs a health check job bound to the given
// environment, with fresh signalling channels for cancellation.
func newPluginHealthCheckJob(env *Environment) *PluginHealthCheckJob {
	job := &PluginHealthCheckJob{
		env:       env,
		cancel:    make(chan struct{}),
		cancelled: make(chan struct{}),
	}
	return job
}
// Cancel signals the health check goroutine to stop and blocks until it has
// fully exited. It must be called at most once, since it closes the cancel
// channel.
func (job *PluginHealthCheckJob) Cancel() {
	close(job.cancel)
	<-job.cancelled
}
// newPluginHealthStatus returns a fresh health status with an empty failure
// history and the crashed flag cleared.
func newPluginHealthStatus() *PluginHealthStatus {
	return &PluginHealthStatus{
		Crashed:        false,
		failTimeStamps: []time.Time{},
	}
}
// shouldDeactivatePlugin determines if a plugin needs to be deactivated after
// certain criteria is met.
//
// The criteria: the plugin has failed at least HEALTH_CHECK_RESTART_LIMIT
// times, with the oldest of those failures still inside the
// HEALTH_CHECK_DISABLE_DURATION (minutes) window.
func shouldDeactivatePlugin(h *PluginHealthStatus) bool {
	total := len(h.failTimeStamps)
	if total < HEALTH_CHECK_RESTART_LIMIT {
		return false
	}
	// Look at the oldest failure among the most recent RESTART_LIMIT ones.
	oldestRelevant := h.failTimeStamps[total-HEALTH_CHECK_RESTART_LIMIT]
	return time.Since(oldestRelevant).Minutes() <= HEALTH_CHECK_DISABLE_DURATION
}

207
plugin/health_check_test.go Normal file
View File

@@ -0,0 +1,207 @@
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package plugin
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
"github.com/stretchr/testify/require"
)
// TestPluginHealthCheck runs each supervisor health check scenario as a
// named subtest.
func TestPluginHealthCheck(t *testing.T) {
	for name, f := range map[string]func(*testing.T){
		"PluginHealthCheck_Success":                 testPluginHealthCheck_Success,
		"PluginHealthCheck_PluginPanicProcessCheck": testPluginHealthCheck_PluginPanicProcessCheck,
		"PluginHealthCheck_RPCPingFail":             testPluginHealthCheck_RPCPingFail,
	} {
		t.Run(name, f)
	}
}
// testPluginHealthCheck_Success verifies that a freshly started, well-behaved
// plugin passes PerformHealthCheck.
func testPluginHealthCheck_Success(t *testing.T) {
	dir, err := ioutil.TempDir("", "")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Compile a minimal do-nothing plugin binary into the temp dir.
	backend := filepath.Join(dir, "backend.exe")
	utils.CompileGo(t, `
		package main

		import (
			"github.com/mattermost/mattermost-server/plugin"
		)

		type MyPlugin struct {
			plugin.MattermostPlugin
		}

		func main() {
			plugin.ClientMain(&MyPlugin{})
		}
	`, backend)

	err = ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend.exe"}}`), 0600)
	require.NoError(t, err)

	bundle := model.BundleInfoForPath(dir)
	log := mlog.NewLogger(&mlog.LoggerConfiguration{
		EnableConsole: true,
		ConsoleJson:   true,
		ConsoleLevel:  "error",
		EnableFile:    false,
	})

	supervisor, err := newSupervisor(bundle, log, nil)
	require.Nil(t, err)
	require.NotNil(t, supervisor)

	// A healthy running plugin should pass the check.
	err = supervisor.PerformHealthCheck()
	require.Nil(t, err)
}
// testPluginHealthCheck_PluginPanicProcessCheck verifies that a plugin whose
// process dies — here via an uncaught panic inside a hook — subsequently
// fails the process portion of the health check.
func testPluginHealthCheck_PluginPanicProcessCheck(t *testing.T) {
	dir, err := ioutil.TempDir("", "")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Compile a plugin that panics (and therefore dies) whenever
	// MessageWillBePosted is invoked.
	backend := filepath.Join(dir, "backend.exe")
	utils.CompileGo(t, `
		package main

		import (
			"github.com/mattermost/mattermost-server/model"
			"github.com/mattermost/mattermost-server/plugin"
		)

		type MyPlugin struct {
			plugin.MattermostPlugin
		}

		func (p *MyPlugin) MessageWillBePosted(c *plugin.Context, post *model.Post) (*model.Post, string) {
			panic("Uncaught error")
		}

		func main() {
			plugin.ClientMain(&MyPlugin{})
		}
	`, backend)

	err = ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend.exe"}}`), 0600)
	require.NoError(t, err)

	bundle := model.BundleInfoForPath(dir)
	log := mlog.NewLogger(&mlog.LoggerConfiguration{
		EnableConsole: true,
		ConsoleJson:   true,
		ConsoleLevel:  "error",
		EnableFile:    false,
	})

	supervisor, err := newSupervisor(bundle, log, nil)
	require.Nil(t, err)
	require.NotNil(t, supervisor)

	// Healthy before the crash.
	err = supervisor.PerformHealthCheck()
	require.Nil(t, err)

	// Trigger the panic, then give the process a moment to die.
	supervisor.hooks.MessageWillBePosted(&Context{}, &model.Post{})
	time.Sleep(10 * time.Millisecond)

	err = supervisor.PerformHealthCheck()
	require.NotNil(t, err)
	require.Equal(t, "Plugin process not found, or not responding", err.Error())
}
// testPluginHealthCheck_RPCPingFail verifies that closing the plugin's RPC
// client connection causes the health check to fail with the RPC ping error.
func testPluginHealthCheck_RPCPingFail(t *testing.T) {
	dir, err := ioutil.TempDir("", "")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Compile a minimal do-nothing plugin binary into the temp dir.
	backend := filepath.Join(dir, "backend.exe")
	utils.CompileGo(t, `
		package main

		import (
			"github.com/mattermost/mattermost-server/plugin"
		)

		type MyPlugin struct {
			plugin.MattermostPlugin
		}

		func main() {
			plugin.ClientMain(&MyPlugin{})
		}
	`, backend)

	err = ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend.exe"}}`), 0600)
	require.NoError(t, err)

	bundle := model.BundleInfoForPath(dir)
	log := mlog.NewLogger(&mlog.LoggerConfiguration{
		EnableConsole: true,
		ConsoleJson:   true,
		ConsoleLevel:  "error",
		EnableFile:    false,
	})

	supervisor, err := newSupervisor(bundle, log, nil)
	require.Nil(t, err)
	require.NotNil(t, supervisor)

	// Healthy while the RPC connection is open.
	err = supervisor.PerformHealthCheck()
	require.Nil(t, err)

	// Close the RPC connection out from under the supervisor; the ping
	// portion of the health check should now fail.
	c, err := supervisor.client.Client()
	require.Nil(t, err)
	c.Close()

	err = supervisor.PerformHealthCheck()
	require.NotNil(t, err)
	require.Equal(t, "Plugin RPC connection is not responding", err.Error())
}
// TestShouldDeactivatePlugin exercises the failure-window logic that decides
// whether a plugin has crashed often enough, recently enough, to be disabled.
func TestShouldDeactivatePlugin(t *testing.T) {
	h := newPluginHealthStatus()
	require.NotNil(t, h)

	// No failures recorded: the plugin must not be deactivated.
	result := shouldDeactivatePlugin(h)
	require.Equal(t, false, result)

	now := time.Now()

	// Enough failures, all recent: the plugin should be deactivated.
	h = newPluginHealthStatus()
	h.failTimeStamps = append(h.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*0.2*time.Minute))
	h.failTimeStamps = append(h.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*0.1*time.Minute))
	h.failTimeStamps = append(h.failTimeStamps, now)
	result = shouldDeactivatePlugin(h)
	require.Equal(t, true, result)

	// Failures are too spread out in time to warrant deactivation.
	h = newPluginHealthStatus()
	h.failTimeStamps = append(h.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*2*time.Minute))
	h.failTimeStamps = append(h.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*1*time.Minute))
	h.failTimeStamps = append(h.failTimeStamps, now)
	result = shouldDeactivatePlugin(h)
	require.Equal(t, false, result)

	// Too few failures to warrant deactivation, however recent they are.
	h = newPluginHealthStatus()
	h.failTimeStamps = append(h.failTimeStamps, now.Add(-HEALTH_CHECK_DISABLE_DURATION*0.1*time.Minute))
	h.failTimeStamps = append(h.failTimeStamps, now)
	result = shouldDeactivatePlugin(h)
	require.Equal(t, false, result)
}

View File

@@ -4,11 +4,14 @@
package plugin
import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"syscall"
"time"
plugin "github.com/hashicorp/go-plugin"
@@ -20,6 +23,7 @@ type supervisor struct {
client *plugin.Client
hooks Hooks
implemented [TotalHooksId]bool
pid int
}
func newSupervisor(pluginInfo *model.BundleInfo, parentLogger *mlog.Logger, apiImpl API) (retSupervisor *supervisor, retErr error) {
@@ -53,10 +57,12 @@ func newSupervisor(pluginInfo *model.BundleInfo, parentLogger *mlog.Logger, apiI
}
executable = filepath.Join(pluginInfo.Path, executable)
cmd := exec.Command(executable)
sup.client = plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: handshake,
Plugins: pluginMap,
Cmd: exec.Command(executable),
Cmd: cmd,
SyncStdout: wrappedLogger.With(mlog.String("source", "plugin_stdout")).StdLogWriter(),
SyncStderr: wrappedLogger.With(mlog.String("source", "plugin_stderr")).StdLogWriter(),
Logger: hclogAdaptedLogger,
@@ -68,6 +74,8 @@ func newSupervisor(pluginInfo *model.BundleInfo, parentLogger *mlog.Logger, apiI
return nil, err
}
sup.pid = cmd.Process.Pid
raw, err := rpcClient.Dispense("hooks")
if err != nil {
return nil, err
@@ -103,6 +111,55 @@ func (sup *supervisor) Hooks() Hooks {
return sup.hooks
}
// PerformHealthCheck checks the plugin through a process check and an RPC
// ping. (Note: no hook is invoked here — the check is purely process + RPC.)
// It returns nil when the plugin is healthy.
func (sup *supervisor) PerformHealthCheck() error {
	// A dead or unsignallable process means the plugin has crashed.
	if procErr := sup.CheckProcess(); procErr != nil {
		mlog.Debug(fmt.Sprintf("Error checking plugin process, error: %s", procErr.Error()))
		return errors.New("Plugin process not found, or not responding")
	}

	// The RPC ping may fail transiently; retry up to
	// HEALTH_CHECK_PING_FAIL_LIMIT total attempts before declaring the
	// connection unhealthy.
	if pingErr := sup.Ping(); pingErr != nil {
		for pingFails := 1; pingFails < HEALTH_CHECK_PING_FAIL_LIMIT; pingFails++ {
			pingErr = sup.Ping()
			if pingErr == nil {
				break
			}
		}
		if pingErr != nil {
			mlog.Debug(fmt.Sprintf("Error pinging plugin, error: %s", pingErr.Error()))
			// errors.New, not fmt.Errorf: there are no format arguments, and
			// this matches the process-check return above.
			return errors.New("Plugin RPC connection is not responding")
		}
	}
	return nil
}
// Ping verifies that the RPC connection with the plugin is alive and healthy.
func (sup *supervisor) Ping() error {
	rpcClient, err := sup.client.Client()
	if err != nil {
		return err
	}
	return rpcClient.Ping()
}
// CheckProcess checks that the plugin process's PID exists and that the
// process can be signalled.
func (sup *supervisor) CheckProcess() error {
	process, err := os.FindProcess(sup.pid)
	if err != nil {
		return err
	}
	// Signal 0 delivers nothing to the process; it only performs error
	// checking, failing if the process is gone or cannot be signalled.
	return process.Signal(syscall.Signal(0))
}
// Implements reports whether the plugin implements the hook identified by hookId.
func (sup *supervisor) Implements(hookId int) bool {
	return sup.implemented[hookId]
}