mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
* MM-37165: Fix improper plugin shutdown
This was caught from a race test failure. While the failure manifested as a log being
written from a test after the test had exited, the real cause was hidden deeper.
What was happening is that the server would always listen for plugin requests in a
separate goroutine via `g.muxBroker.AcceptAndServe` in the `OnActivate` hook. But the
plugin shutdown process would just close the plugin connections and move on, leading
to the classic case of improperly shut down goroutines.
When this happened, a window opened in which the server
could still be executing a request even though the main goroutine — and therefore the parent
test — had already finished. This would lead to an error like
```
{"level":"error","ts":1626451258.4141135,"caller":"mlog/sugar.go:25","msg":"pluginAPI scheduleOnce poller encountered an error but is still polling","plugin_id":"com.mattermost.plugin-incident-management","error":"ListPluginKeys: Unable to list all the plugin keys., failed to get PluginKeyValues with pluginId=com.mattermost.plugin-incident-management: sql: database is closed
```
And now, this finally calls `mlog.Error`, which finally triggers our race condition :)
To fix this, we use some basic synchronization via waitgroups and just wait for it
to finish after closing the plugin process.
https://mattermost.atlassian.net/browse/MM-37165
```release-note
NONE
```
* gofmt
```release-note
NONE
```
* split waitgroup additions
```release-note
NONE
```
161 lines
3.7 KiB
Go
161 lines
3.7 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package plugin
|
|
|
|
import (
|
|
"fmt"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
plugin "github.com/hashicorp/go-plugin"
|
|
|
|
"github.com/mattermost/mattermost-server/v6/einterfaces"
|
|
"github.com/mattermost/mattermost-server/v6/model"
|
|
"github.com/mattermost/mattermost-server/v6/shared/mlog"
|
|
)
|
|
|
|
// supervisor manages the lifecycle of a single plugin subprocess and the RPC
// connection to it.
type supervisor struct {
	// lock serializes access to the fields below; the visible methods all
	// take the read side (presumably a writer exists elsewhere — verify).
	lock sync.RWMutex
	// client is the go-plugin client that owns the plugin subprocess.
	client *plugin.Client
	// hooks is the RPC-backed Hooks implementation, wrapped in a
	// metrics-recording hooksTimerLayer.
	hooks Hooks
	// implemented is indexed by hook id; true if the plugin implements it.
	implemented [TotalHooksID]bool
	// pid is the OS process id of the plugin subprocess.
	pid int
	// hooksClient is the concrete RPC client, retained so Shutdown can wait
	// for its server goroutines to finish.
	hooksClient *hooksRPCClient
}
|
|
|
|
func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, parentLogger *mlog.Logger, metrics einterfaces.MetricsInterface) (retSupervisor *supervisor, retErr error) {
|
|
sup := supervisor{}
|
|
defer func() {
|
|
if retErr != nil {
|
|
sup.Shutdown()
|
|
}
|
|
}()
|
|
|
|
wrappedLogger := pluginInfo.WrapLogger(parentLogger)
|
|
|
|
hclogAdaptedLogger := &hclogAdapter{
|
|
wrappedLogger: wrappedLogger.WithCallerSkip(1),
|
|
extrasKey: "wrapped_extras",
|
|
}
|
|
|
|
pluginMap := map[string]plugin.Plugin{
|
|
"hooks": &hooksPlugin{
|
|
log: wrappedLogger,
|
|
driverImpl: driver,
|
|
apiImpl: &apiTimerLayer{pluginInfo.Manifest.Id, apiImpl, metrics},
|
|
},
|
|
}
|
|
|
|
executable := filepath.Clean(filepath.Join(
|
|
".",
|
|
pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH),
|
|
))
|
|
if strings.HasPrefix(executable, "..") {
|
|
return nil, fmt.Errorf("invalid backend executable")
|
|
}
|
|
executable = filepath.Join(pluginInfo.Path, executable)
|
|
|
|
cmd := exec.Command(executable)
|
|
|
|
sup.client = plugin.NewClient(&plugin.ClientConfig{
|
|
HandshakeConfig: handshake,
|
|
Plugins: pluginMap,
|
|
Cmd: cmd,
|
|
SyncStdout: wrappedLogger.With(mlog.String("source", "plugin_stdout")).StdLogWriter(),
|
|
SyncStderr: wrappedLogger.With(mlog.String("source", "plugin_stderr")).StdLogWriter(),
|
|
Logger: hclogAdaptedLogger,
|
|
StartTimeout: time.Second * 3,
|
|
})
|
|
|
|
rpcClient, err := sup.client.Client()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sup.pid = cmd.Process.Pid
|
|
|
|
raw, err := rpcClient.Dispense("hooks")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c, ok := raw.(*hooksRPCClient)
|
|
if ok {
|
|
sup.hooksClient = c
|
|
}
|
|
|
|
sup.hooks = &hooksTimerLayer{pluginInfo.Manifest.Id, raw.(Hooks), metrics}
|
|
|
|
impl, err := sup.hooks.Implemented()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, hookName := range impl {
|
|
if hookId, ok := hookNameToId[hookName]; ok {
|
|
sup.implemented[hookId] = true
|
|
}
|
|
}
|
|
|
|
return &sup, nil
|
|
}
|
|
|
|
func (sup *supervisor) Shutdown() {
|
|
sup.lock.RLock()
|
|
defer sup.lock.RUnlock()
|
|
if sup.client != nil {
|
|
sup.client.Kill()
|
|
}
|
|
|
|
// Wait for API RPC server and DB RPC server to exit.
|
|
if sup.hooksClient != nil {
|
|
sup.hooksClient.doneWg.Wait()
|
|
}
|
|
}
|
|
|
|
func (sup *supervisor) Hooks() Hooks {
|
|
sup.lock.RLock()
|
|
defer sup.lock.RUnlock()
|
|
return sup.hooks
|
|
}
|
|
|
|
// PerformHealthCheck checks the plugin through an an RPC ping.
|
|
func (sup *supervisor) PerformHealthCheck() error {
|
|
// No need for a lock here because Ping is read-locked.
|
|
if pingErr := sup.Ping(); pingErr != nil {
|
|
for pingFails := 1; pingFails < HealthCheckPingFailLimit; pingFails++ {
|
|
pingErr = sup.Ping()
|
|
if pingErr == nil {
|
|
break
|
|
}
|
|
}
|
|
if pingErr != nil {
|
|
return fmt.Errorf("plugin RPC connection is not responding")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Ping checks that the RPC connection with the plugin is alive and healthy.
|
|
func (sup *supervisor) Ping() error {
|
|
sup.lock.RLock()
|
|
defer sup.lock.RUnlock()
|
|
client, err := sup.client.Client()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return client.Ping()
|
|
}
|
|
|
|
func (sup *supervisor) Implements(hookId int) bool {
|
|
sup.lock.RLock()
|
|
defer sup.lock.RUnlock()
|
|
return sup.implemented[hookId]
|
|
}
|