MM-8622: Improved plugin error reporting (#8737)

* allow `Wait()`ing on the supervisor

In the event the plugin supervisor shuts down a plugin for crashing too
many times, the new `Wait()` interface allows the `ActivatePlugin` to
accept a callback function to trigger when `supervisor.Wait()` returns.
If the supervisor shuts down normally, this callback is invoked with
a nil error, otherwise any error reported by the supervisor is passed
along.

* improve plugin activation/deactivation logic

Avoid triggering activation of previously failed-to-start plugins just
becase something in the configuration changed. Now, intelligently
compare the global enable bit as well as the each individual plugin's
enabled bit.

* expose store to manipulate PluginStatuses

* expose API to fetch plugin statuses

* keep track of whether or not plugin sandboxing is supported

* transition plugin statuses

* restore error on plugin activation if already active

* don't initialize test plugins until successfully loaded

* emit websocket events when plugin statuses change

* skip pruning if already initialized

* MM-8622: maintain plugin statuses in memory

Switch away from persisting plugin statuses to the database, and
maintain in memory instead. This will be followed by a cluster interface
to query the in-memory status of plugin statuses from all cluster nodes.

At the same time, rename `cluster_discovery_id` on the `PluginStatus`
model object to `cluster_id`.

* MM-8622: aggregate plugin statuses across cluster

* fetch cluster plugin statuses when emitting websocket notification

* address unit test fixes after rebasing

* relax (poor) racey unit test re: supervisor.Wait()

* make store-mocks
This commit is contained in:
Jesse Hallam
2018-05-23 14:26:35 -04:00
committed by GitHub
parent 5c21bdc178
commit 847c181ec9
17 changed files with 519 additions and 96 deletions

View File

@@ -108,7 +108,7 @@ func (env *Environment) IsPluginActive(pluginId string) bool {
}
// Activates the plugin with the given id.
func (env *Environment) ActivatePlugin(id string) error {
func (env *Environment) ActivatePlugin(id string, onError func(error)) error {
env.mutex.Lock()
defer env.mutex.Unlock()
@@ -117,7 +117,7 @@ func (env *Environment) ActivatePlugin(id string) error {
}
if _, ok := env.activePlugins[id]; ok {
return nil
return fmt.Errorf("plugin already active: %v", id)
}
plugins, err := ScanSearchPath(env.searchPath)
if err != nil {
@@ -156,6 +156,14 @@ func (env *Environment) ActivatePlugin(id string) error {
if err := supervisor.Start(api); err != nil {
return errors.Wrapf(err, "unable to start plugin: %v", id)
}
if onError != nil {
go func() {
err := supervisor.Wait()
if err != nil {
onError(err)
}
}()
}
activePlugin.Supervisor = supervisor
}

View File

@@ -56,6 +56,10 @@ func (m *MockSupervisor) Hooks() plugin.Hooks {
return m.Called().Get(0).(plugin.Hooks)
}
func (m *MockSupervisor) Wait() error {
return m.Called().Get(0).(error)
}
func initTmpDir(t *testing.T, files map[string]string) string {
success := false
dir, err := ioutil.TempDir("", "mm-plugin-test")
@@ -130,7 +134,7 @@ func TestEnvironment(t *testing.T) {
activePlugins := env.ActivePlugins()
assert.Len(t, activePlugins, 0)
assert.Error(t, env.ActivatePlugin("x"))
assert.Error(t, env.ActivatePlugin("x", nil))
var api struct{ plugin.API }
var supervisor MockSupervisor
@@ -145,11 +149,11 @@ func TestEnvironment(t *testing.T) {
supervisor.On("Stop").Return(nil)
supervisor.On("Hooks").Return(&hooks)
assert.NoError(t, env.ActivatePlugin("foo"))
assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
activePlugins = env.ActivePlugins()
assert.Len(t, activePlugins, 1)
assert.NoError(t, env.ActivatePlugin("foo"))
assert.Error(t, env.ActivatePlugin("foo", nil))
assert.True(t, env.IsPluginActive("foo"))
hooks.On("OnDeactivate").Return(nil)
@@ -157,7 +161,7 @@ func TestEnvironment(t *testing.T) {
assert.Error(t, env.DeactivatePlugin("foo"))
assert.False(t, env.IsPluginActive("foo"))
assert.NoError(t, env.ActivatePlugin("foo"))
assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
assert.Equal(t, env.SearchPath(), dir)
@@ -184,7 +188,7 @@ func TestEnvironment_DuplicatePluginError(t *testing.T) {
require.NoError(t, err)
defer env.Shutdown()
assert.Error(t, env.ActivatePlugin("foo"))
assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
}
@@ -200,7 +204,7 @@ func TestEnvironment_BadSearchPathError(t *testing.T) {
require.NoError(t, err)
defer env.Shutdown()
assert.Error(t, env.ActivatePlugin("foo"))
assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
}
@@ -244,7 +248,7 @@ func TestEnvironment_ActivatePluginErrors(t *testing.T) {
hooks.Mock = mock.Mock{}
provider.Mock = mock.Mock{}
setup()
assert.Error(t, env.ActivatePlugin("foo"))
assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
supervisor.AssertExpectations(t)
hooks.AssertExpectations(t)
@@ -285,7 +289,7 @@ func TestEnvironment_ShutdownError(t *testing.T) {
hooks.On("OnDeactivate").Return(fmt.Errorf("test error"))
assert.NoError(t, env.ActivatePlugin("foo"))
assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
assert.Len(t, env.Shutdown(), 2)
}
@@ -332,7 +336,7 @@ func TestEnvironment_ConcurrentHookInvocations(t *testing.T) {
}
})
assert.NoError(t, env.ActivatePlugin("foo"))
assert.NoError(t, env.ActivatePlugin("foo", nil))
rec := httptest.NewRecorder()
@@ -391,7 +395,7 @@ func TestEnvironment_HooksForPlugins(t *testing.T) {
Text: "bar",
}, nil)
assert.NoError(t, env.ActivatePlugin("foo"))
assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
resp, appErr, err := env.HooksForPlugin("foo").ExecuteCommand(&model.CommandArgs{

View File

@@ -174,6 +174,14 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) {
bundle := model.BundleInfoForPath(dir)
supervisor, err := sp(bundle)
require.NoError(t, err)
var supervisorWaitErr error
supervisorWaitDone := make(chan bool, 1)
go func() {
supervisorWaitErr = supervisor.Wait()
close(supervisorWaitDone)
}()
require.NoError(t, supervisor.Start(&api))
failed := false
@@ -189,7 +197,21 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) {
time.Sleep(time.Millisecond * 100)
}
assert.True(t, recovered)
select {
case <-supervisorWaitDone:
require.Fail(t, "supervisor.Wait() unexpectedly returned")
case <-time.After(500 * time.Millisecond):
}
require.NoError(t, supervisor.Stop())
select {
case <-supervisorWaitDone:
require.Nil(t, supervisorWaitErr)
case <-time.After(5000 * time.Millisecond):
require.Fail(t, "supervisor.Wait() failed to return")
}
}
// Crashed plugins should be relaunched at most three times.
@@ -239,6 +261,14 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun
bundle := model.BundleInfoForPath(dir)
supervisor, err := sp(bundle)
require.NoError(t, err)
var supervisorWaitErr error
supervisorWaitDone := make(chan bool, 1)
go func() {
supervisorWaitErr = supervisor.Wait()
close(supervisorWaitDone)
}()
require.NoError(t, supervisor.Start(&api))
for attempt := 1; attempt <= 4; attempt++ {
@@ -264,10 +294,19 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun
}
if attempt < 4 {
require.Nil(t, supervisorWaitErr)
require.True(t, recovered, "failed to recover after attempt %d", attempt)
} else {
require.False(t, recovered, "unexpectedly recovered after attempt %d", attempt)
}
}
select {
case <-supervisorWaitDone:
require.NotNil(t, supervisorWaitErr)
case <-time.After(500 * time.Millisecond):
require.Fail(t, "supervisor.Wait() failed to return after plugin crashed")
}
require.NoError(t, supervisor.Stop())
}

View File

@@ -32,6 +32,7 @@ type Supervisor struct {
cancel context.CancelFunc
newProcess func(context.Context) (Process, io.ReadWriteCloser, error)
pluginId string
pluginErr error
}
var _ plugin.Supervisor = (*Supervisor)(nil)
@@ -55,6 +56,13 @@ func (s *Supervisor) Start(api plugin.API) error {
}
}
// Waits for the supervisor to stop (on demand or of its own accord), returning any error that
// triggered the supervisor to stop.
func (s *Supervisor) Wait() error {
<-s.done
return s.pluginErr
}
// Stops the plugin.
func (s *Supervisor) Stop() error {
s.cancel()
@@ -70,7 +78,7 @@ func (s *Supervisor) Hooks() plugin.Hooks {
func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API) {
defer func() {
s.done <- true
close(s.done)
}()
done := ctx.Done()
for i := 0; i <= MaxProcessRestarts; i++ {
@@ -81,10 +89,11 @@ func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API
default:
start = nil
if i < MaxProcessRestarts {
mlog.Debug("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
mlog.Error("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
time.Sleep(time.Duration((1 + i*i)) * time.Second)
} else {
mlog.Debug("Plugin terminated unexpectedly too many times", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts))
s.pluginErr = fmt.Errorf("plugin terminated unexpectedly too many times")
mlog.Error("Plugin shutdown", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts), mlog.Err(s.pluginErr))
}
}
}

View File

@@ -7,6 +7,7 @@ package plugin
// type is only relevant to the server, and isn't used by the plugins themselves.
type Supervisor interface {
Start(API) error
Wait() error
Stop() error
Hooks() Hooks
}