mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
* allow `Wait()`ing on the supervisor In the event the plugin supervisor shuts down a plugin for crashing too many times, the new `Wait()` interface allows the `ActivatePlugin` to accept a callback function to trigger when `supervisor.Wait()` returns. If the supervisor shuts down normally, this callback is invoked with a nil error, otherwise any error reported by the supervisor is passed along. * improve plugin activation/deactivation logic Avoid triggering activation of previously failed-to-start plugins just because something in the configuration changed. Now, intelligently compare the global enable bit as well as each individual plugin's enabled bit. * expose store to manipulate PluginStatuses * expose API to fetch plugin statuses * keep track of whether or not plugin sandboxing is supported * transition plugin statuses * restore error on plugin activation if already active * don't initialize test plugins until successfully loaded * emit websocket events when plugin statuses change * skip pruning if already initialized * MM-8622: maintain plugin statuses in memory Switch away from persisting plugin statuses to the database, and maintain in memory instead. This will be followed by a cluster interface to query the in-memory status of plugin statuses from all cluster nodes. At the same time, rename `cluster_discovery_id` on the `PluginStatus` model object to `cluster_id`. * MM-8622: aggregate plugin statuses across cluster * fetch cluster plugin statuses when emitting websocket notification * address unit test fixes after rebasing * relax (poor) racy unit test re: supervisor.Wait() * make store-mocks
177 lines
4.6 KiB
Go
177 lines
4.6 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See License.txt for license information.
|
|
|
|
package rpcplugin
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/mattermost/mattermost-server/mlog"
|
|
"github.com/mattermost/mattermost-server/model"
|
|
"github.com/mattermost/mattermost-server/plugin"
|
|
)
|
|
|
|
// MaxProcessRestarts is the number of times the supervisor relaunches a plugin process that
// exited unexpectedly before it gives up on the plugin.
const MaxProcessRestarts = 3
|
|
|
|
// Supervisor implements a plugin.Supervisor that launches the plugin in a separate process and
|
|
// communicates via RPC.
|
|
//
|
|
// If the plugin unexpectedly exits, the supervisor will relaunch it after a short delay, but will
|
|
// only restart a plugin at most three times.
|
|
type Supervisor struct {
|
|
hooks atomic.Value
|
|
done chan bool
|
|
cancel context.CancelFunc
|
|
newProcess func(context.Context) (Process, io.ReadWriteCloser, error)
|
|
pluginId string
|
|
pluginErr error
|
|
}
|
|
|
|
var _ plugin.Supervisor = (*Supervisor)(nil)
|
|
|
|
// Starts the plugin. This method will block until the plugin is successfully launched for the first
|
|
// time and will return an error if the plugin cannot be launched at all.
|
|
func (s *Supervisor) Start(api plugin.API) error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s.done = make(chan bool, 1)
|
|
start := make(chan error, 1)
|
|
go s.run(ctx, start, api)
|
|
|
|
select {
|
|
case <-time.After(time.Second * 3):
|
|
cancel()
|
|
<-s.done
|
|
return fmt.Errorf("timed out waiting for plugin")
|
|
case err := <-start:
|
|
s.cancel = cancel
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Waits for the supervisor to stop (on demand or of its own accord), returning any error that
|
|
// triggered the supervisor to stop.
|
|
func (s *Supervisor) Wait() error {
|
|
<-s.done
|
|
return s.pluginErr
|
|
}
|
|
|
|
// Stops the plugin.
|
|
func (s *Supervisor) Stop() error {
|
|
s.cancel()
|
|
<-s.done
|
|
return nil
|
|
}
|
|
|
|
// Returns the hooks used to communicate with the plugin. The hooks may change if the plugin is
|
|
// restarted, so the return value should not be cached.
|
|
func (s *Supervisor) Hooks() plugin.Hooks {
|
|
return s.hooks.Load().(plugin.Hooks)
|
|
}
|
|
|
|
func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API) {
|
|
defer func() {
|
|
close(s.done)
|
|
}()
|
|
done := ctx.Done()
|
|
for i := 0; i <= MaxProcessRestarts; i++ {
|
|
s.runPlugin(ctx, start, api)
|
|
select {
|
|
case <-done:
|
|
return
|
|
default:
|
|
start = nil
|
|
if i < MaxProcessRestarts {
|
|
mlog.Error("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
|
|
time.Sleep(time.Duration((1 + i*i)) * time.Second)
|
|
} else {
|
|
s.pluginErr = fmt.Errorf("plugin terminated unexpectedly too many times")
|
|
mlog.Error("Plugin shutdown", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts), mlog.Err(s.pluginErr))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Supervisor) runPlugin(ctx context.Context, start chan<- error, api plugin.API) error {
|
|
if start == nil {
|
|
mlog.Debug("Restarting plugin", mlog.String("plugin_id", s.pluginId))
|
|
}
|
|
|
|
p, ipc, err := s.newProcess(ctx)
|
|
if err != nil {
|
|
if start != nil {
|
|
start <- err
|
|
}
|
|
return err
|
|
}
|
|
|
|
muxer := NewMuxer(ipc, false)
|
|
closeMuxer := make(chan bool, 1)
|
|
muxerClosed := make(chan error, 1)
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
break
|
|
case <-closeMuxer:
|
|
break
|
|
}
|
|
muxerClosed <- muxer.Close()
|
|
}()
|
|
|
|
hooks, err := ConnectMain(muxer, s.pluginId)
|
|
if err == nil {
|
|
err = hooks.OnActivate(api)
|
|
}
|
|
|
|
if err != nil {
|
|
if start != nil {
|
|
start <- err
|
|
}
|
|
closeMuxer <- true
|
|
<-muxerClosed
|
|
p.Wait()
|
|
return err
|
|
}
|
|
|
|
s.hooks.Store(hooks)
|
|
|
|
if start != nil {
|
|
start <- nil
|
|
}
|
|
p.Wait()
|
|
closeMuxer <- true
|
|
<-muxerClosed
|
|
|
|
return nil
|
|
}
|
|
|
|
func SupervisorProvider(bundle *model.BundleInfo) (plugin.Supervisor, error) {
|
|
return SupervisorWithNewProcessFunc(bundle, func(ctx context.Context) (Process, io.ReadWriteCloser, error) {
|
|
executable := filepath.Clean(filepath.Join(".", bundle.Manifest.Backend.Executable))
|
|
if strings.HasPrefix(executable, "..") {
|
|
return nil, nil, fmt.Errorf("invalid backend executable")
|
|
}
|
|
return NewProcess(ctx, filepath.Join(bundle.Path, executable))
|
|
})
|
|
}
|
|
|
|
func SupervisorWithNewProcessFunc(bundle *model.BundleInfo, newProcess func(context.Context) (Process, io.ReadWriteCloser, error)) (plugin.Supervisor, error) {
|
|
if bundle.Manifest == nil {
|
|
return nil, fmt.Errorf("no manifest available")
|
|
} else if bundle.Manifest.Backend == nil || bundle.Manifest.Backend.Executable == "" {
|
|
return nil, fmt.Errorf("no backend executable specified")
|
|
}
|
|
executable := filepath.Clean(filepath.Join(".", bundle.Manifest.Backend.Executable))
|
|
if strings.HasPrefix(executable, "..") {
|
|
return nil, fmt.Errorf("invalid backend executable")
|
|
}
|
|
return &Supervisor{pluginId: bundle.Manifest.Id, newProcess: newProcess}, nil
|
|
}
|