MM-57018: support reattaching plugins (#26421)

* ProfileImageBytes for EnsureBotOptions

* leverage plugintest.NewAPI

* fix linting

* add UpdateUserRoles to plugin api

* MM-57018: support reattaching plugins

Expose a local-only API for reattaching plugins: instead of the server starting and managing the process itself, allow the plugin to be launched externally (eg within a unit test) and reattach to an existing server instance to provide the unit test with a fully functional RPC API, sidestepping the need for mocking the plugin API in most cases.

In the future, this may become the basis for running plugins in a sidecar container.

Fixes: https://mattermost.atlassian.net/browse/MM-57018

* drop unused supervisor.pid

* factor out checkMinServerVersion

* factor out startPluginServer

* restore missing setPluginState on successful reattach

* avoid passing around a stale registeredPlugin

* inline initializePluginImplementation

* have IsValid return an error

* explicitly close rpcClient

In the case of reattached plugins, the Unix socket won't necessarily disappear leaving the muxBrokers blocked indefinitely. And `Kill()` doesn't do anything if there's no process being managed.

* explicitly detachPlugin

* emphasize gRPC not being supported

---------

Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
Jesse Hallam 2024-04-11 11:10:25 -04:00 committed by GitHub
parent f5ea554c96
commit 2230fb6f5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 582 additions and 72 deletions

View File

@ -3,6 +3,13 @@
package api4
import (
"encoding/json"
"net/http"
"github.com/mattermost/mattermost/server/public/model"
)
func (api *API) InitPluginLocal() {
api.BaseRoutes.Plugins.Handle("", api.APILocal(uploadPlugin, handlerParamFileAPI)).Methods("POST")
api.BaseRoutes.Plugins.Handle("", api.APILocal(getPlugins)).Methods("GET")
@ -12,4 +19,44 @@ func (api *API) InitPluginLocal() {
api.BaseRoutes.Plugin.Handle("/disable", api.APILocal(disablePlugin)).Methods("POST")
api.BaseRoutes.Plugins.Handle("/marketplace", api.APILocal(installMarketplacePlugin)).Methods("POST")
api.BaseRoutes.Plugins.Handle("/marketplace", api.APILocal(getMarketplacePlugins)).Methods("GET")
api.BaseRoutes.Plugins.Handle("/reattach", api.APILocal(reattachPlugin)).Methods("POST")
api.BaseRoutes.Plugin.Handle("/detach", api.APILocal(detachPlugin)).Methods("POST")
}
// reattachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
//
// This API is only exposed over a local socket.
func reattachPlugin(c *Context, w http.ResponseWriter, r *http.Request) {
var pluginReattachRequest model.PluginReattachRequest
if err := json.NewDecoder(r.Body).Decode(&pluginReattachRequest); err != nil {
c.Err = model.NewAppError("reattachPlugin", "api4.plugin.reattachPlugin.invalid_request", nil, err.Error(), http.StatusBadRequest)
return
}
if err := pluginReattachRequest.IsValid(); err != nil {
c.Err = err
return
}
err := c.App.ReattachPlugin(pluginReattachRequest.Manifest, pluginReattachRequest.PluginReattachConfig)
if err != nil {
c.Err = err
return
}
}
// detachPlugin detaches a previously reattached plugin.
//
// This API is only exposed over a local socket.
func detachPlugin(c *Context, w http.ResponseWriter, r *http.Request) {
c.RequirePluginId()
if c.Err != nil {
return
}
err := c.App.DetachPlugin(c.Params.PluginId)
if err != nil {
c.Err = err
return
}
}

View File

@ -129,6 +129,8 @@ type AppIface interface {
// DemoteUserToGuest Convert user's roles and all his membership's roles from
// regular user roles to guest roles.
DemoteUserToGuest(c request.CTX, user *model.User) *model.AppError
// DetachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
DetachPlugin(pluginId string) *model.AppError
// DisablePlugin will set the config for an installed plugin to disabled, triggering deactivation if active.
// Notifies cluster peers through config change.
DisablePlugin(id string) *model.AppError
@ -303,6 +305,8 @@ type AppIface interface {
// PromoteGuestToUser Convert user's roles and all his membership's roles from
// guest roles to regular user roles.
PromoteGuestToUser(c request.CTX, user *model.User, requestorId string) *model.AppError
// ReattachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError
// Removes a listener function by the unique ID returned when AddConfigListener was called
RemoveConfigListener(id string)
// RenameChannel is used to rename the channel Name and the DisplayName fields

View File

@ -3741,6 +3741,28 @@ func (a *OpenTracingAppLayer) DemoteUserToGuest(c request.CTX, user *model.User)
return resultVar0
}
func (a *OpenTracingAppLayer) DetachPlugin(pluginId string) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.DetachPlugin")
a.ctx = newCtx
a.app.Srv().Store().SetContext(newCtx)
defer func() {
a.app.Srv().Store().SetContext(origCtx)
a.ctx = origCtx
}()
defer span.Finish()
resultVar0 := a.app.DetachPlugin(pluginId)
if resultVar0 != nil {
span.LogFields(spanlog.Error(resultVar0))
ext.Error.Set(span, true)
}
return resultVar0
}
func (a *OpenTracingAppLayer) DisableAutoResponder(rctx request.CTX, userID string, asAdmin bool) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.DisableAutoResponder")
@ -13945,6 +13967,28 @@ func (a *OpenTracingAppLayer) ReadFile(path string) ([]byte, *model.AppError) {
return resultVar0, resultVar1
}
func (a *OpenTracingAppLayer) ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ReattachPlugin")
a.ctx = newCtx
a.app.Srv().Store().SetContext(newCtx)
defer func() {
a.app.Srv().Store().SetContext(origCtx)
a.ctx = origCtx
}()
defer span.Finish()
resultVar0 := a.app.ReattachPlugin(manifest, pluginReattachConfig)
if resultVar0 != nil {
span.LogFields(spanlog.Error(resultVar0))
ext.Error.Set(span, true)
}
return resultVar0
}
func (a *OpenTracingAppLayer) RecycleDatabaseConnection(rctx request.CTX) {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.RecycleDatabaseConnection")

View File

@ -0,0 +1,50 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package app
import (
"net/http"
"github.com/mattermost/mattermost/server/public/model"
)
// ReattachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
func (a *App) ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError {
return a.ch.ReattachPlugin(manifest, pluginReattachConfig)
}
// ReattachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
func (ch *Channels) ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError {
pluginsEnvironment := ch.GetPluginsEnvironment()
if pluginsEnvironment == nil {
return model.NewAppError("ReattachPlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
ch.DetachPlugin(manifest.Id)
// Reattach to the plugin
if err := pluginsEnvironment.Reattach(manifest, pluginReattachConfig); err != nil {
return model.NewAppError("ReattachPlugin", "app.plugin.reattach.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
return nil
}
// DetachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
func (a *App) DetachPlugin(pluginId string) *model.AppError {
return a.ch.DetachPlugin(pluginId)
}
// DetachPlugin allows the server to bind to an existing plugin instance launched elsewhere.
func (ch *Channels) DetachPlugin(pluginID string) *model.AppError {
pluginsEnvironment := ch.GetPluginsEnvironment()
if pluginsEnvironment == nil {
return model.NewAppError("DetachPlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
// Deactivate and remove any existing plugin, if present.
pluginsEnvironment.Deactivate(pluginID)
pluginsEnvironment.RemovePlugin(pluginID)
return nil
}

View File

@ -4614,6 +4614,10 @@
"id": "api.websocket_handler.server_busy.app_error",
"translation": "Server is busy, non-critical services are temporarily unavailable."
},
{
"id": "api4.plugin.reattachPlugin.invalid_request",
"translation": "Failed to parse request"
},
{
"id": "app.acknowledgement.delete.app_error",
"translation": "Unable to delete acknowledgement."
@ -6290,6 +6294,10 @@
"id": "app.plugin.not_installed.app_error",
"translation": "Plugin is not installed."
},
{
"id": "app.plugin.reattach.app_error",
"translation": "Failed to reattach plugin"
},
{
"id": "app.plugin.remove.app_error",
"translation": "Unable to delete plugin."
@ -10142,6 +10150,14 @@
"id": "plugin_api.send_mail.missing_to",
"translation": "Missing TO address."
},
{
"id": "plugin_reattach_request.is_valid.manifest.app_error",
"translation": "Missing manifest"
},
{
"id": "plugin_reattach_request.is_valid.plugin_reattach_config.app_error",
"translation": "Missing plugin reattach config"
},
{
"id": "searchengine.bleve.disabled.error",
"translation": "Error purging Bleve indexes: engine is disabled"

View File

@ -7352,6 +7352,36 @@ func (c *Client4) InstallMarketplacePlugin(ctx context.Context, request *Install
return &m, BuildResponse(r), nil
}
// ReattachPlugin asks the server to reattach to a plugin launched by another process.
//
// Only available in local mode, and currently only used for testing.
func (c *Client4) ReattachPlugin(ctx context.Context, request *PluginReattachRequest) (*Response, error) {
buf, err := json.Marshal(request)
if err != nil {
return nil, NewAppError("ReattachPlugin", "api.marshal_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
r, err := c.DoAPIPost(ctx, c.pluginsRoute()+"/reattach", string(buf))
if err != nil {
return BuildResponse(r), err
}
defer closeBody(r)
return BuildResponse(r), nil
}
// DetachPlugin detaches a previously reattached plugin.
//
// Only available in local mode, and currently only used for testing.
func (c *Client4) DetachPlugin(ctx context.Context, pluginID string) (*Response, error) {
r, err := c.DoAPIPost(ctx, c.pluginRoute(pluginID)+"/detach", "")
if err != nil {
return BuildResponse(r), err
}
defer closeBody(r)
return BuildResponse(r), nil
}
// GetPlugins will return a list of plugin manifests for currently active plugins.
func (c *Client4) GetPlugins(ctx context.Context) (*PluginsResponse, *Response, error) {
r, err := c.DoAPIGet(ctx, c.pluginsRoute(), "")

View File

@ -0,0 +1,59 @@
package model
import (
"net"
"net/http"
"github.com/hashicorp/go-plugin"
)
// PluginReattachConfig is a serializable version of go-plugin's ReattachConfig.
type PluginReattachConfig struct {
Protocol string
ProtocolVersion int
Addr net.UnixAddr
Pid int
Test bool
}
func NewPluginReattachConfig(pluginReattachmentConfig *plugin.ReattachConfig) *PluginReattachConfig {
return &PluginReattachConfig{
Protocol: string(pluginReattachmentConfig.Protocol),
ProtocolVersion: pluginReattachmentConfig.ProtocolVersion,
Addr: net.UnixAddr{
Name: pluginReattachmentConfig.Addr.String(),
Net: pluginReattachmentConfig.Addr.Network(),
},
Pid: pluginReattachmentConfig.Pid,
Test: pluginReattachmentConfig.Test,
}
}
func (prc *PluginReattachConfig) ToHashicorpPluginReattachmentConfig() *plugin.ReattachConfig {
addr := prc.Addr
return &plugin.ReattachConfig{
Protocol: plugin.Protocol(prc.Protocol),
ProtocolVersion: prc.ProtocolVersion,
Addr: &addr,
Pid: prc.Pid,
ReattachFunc: nil,
Test: prc.Test,
}
}
type PluginReattachRequest struct {
Manifest *Manifest
PluginReattachConfig *PluginReattachConfig
}
func (prr *PluginReattachRequest) IsValid() *AppError {
if prr.Manifest == nil {
return NewAppError("PluginReattachRequest.IsValid", "plugin_reattach_request.is_valid.manifest.app_error", nil, "", http.StatusBadRequest)
}
if prr.PluginReattachConfig == nil {
return NewAppError("PluginReattachRequest.IsValid", "plugin_reattach_request.is_valid.plugin_reattach_config.app_error", nil, "", http.StatusBadRequest)
}
return nil
}

View File

@ -4,6 +4,8 @@
package plugin
import (
"context"
"github.com/hashicorp/go-plugin"
)
@ -12,10 +14,51 @@ const (
BotUserKey = InternalKeyPrefix + "botid"
)
// Starts the serving of a Mattermost plugin over net/rpc. gRPC is not yet supported.
// WithTestContext provides a context typically used to terminate a plugin from a unit test.
func WithTestContext(ctx context.Context) func(*plugin.ServeConfig) error {
return func(config *plugin.ServeConfig) error {
if config.Test == nil {
config.Test = &plugin.ServeTestConfig{}
}
config.Test.Context = ctx
return nil
}
}
// WithTestReattachConfigCh configures the channel to receive the ReattachConfig used to reattach
// an externally launched plugin instance with the Mattermost server.
func WithTestReattachConfigCh(reattachConfigCh chan<- *plugin.ReattachConfig) func(*plugin.ServeConfig) error {
return func(config *plugin.ServeConfig) error {
if config.Test == nil {
config.Test = &plugin.ServeTestConfig{}
}
config.Test.ReattachConfigCh = reattachConfigCh
return nil
}
}
// WithTestCloseCh provides a channel that signals when the plugin exits.
func WithTestCloseCh(closeCh chan<- struct{}) func(*plugin.ServeConfig) error {
return func(config *plugin.ServeConfig) error {
if config.Test == nil {
config.Test = &plugin.ServeTestConfig{}
}
config.Test.CloseCh = closeCh
return nil
}
}
// Starts the serving of a Mattermost plugin over net/rpc. gRPC is not supported.
//
// Call this when your plugin is ready to start.
func ClientMain(pluginImplementation any) {
// Call this when your plugin is ready to start. Options allow configuring plugins for testing
// scenarios.
func ClientMain(pluginImplementation any, opts ...func(config *plugin.ServeConfig) error) {
impl, ok := pluginImplementation.(interface {
SetAPI(api API)
SetDriver(driver Driver)
@ -30,10 +73,19 @@ func ClientMain(pluginImplementation any) {
"hooks": &hooksPlugin{hooks: pluginImplementation},
}
plugin.Serve(&plugin.ServeConfig{
serveConfig := &plugin.ServeConfig{
HandshakeConfig: handshake,
Plugins: pluginMap,
})
}
for _, opt := range opts {
err := opt(serveConfig)
if err != nil {
panic("failed to start serving plugin: " + err.Error())
}
}
plugin.Serve(serveConfig)
}
type MattermostPlugin struct {

View File

@ -11,6 +11,7 @@ import (
"sync"
"time"
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"github.com/mattermost/mattermost/server/public/model"
@ -196,6 +197,15 @@ func (env *Environment) setPluginState(id string, state int) {
}
}
// setPluginSupervisor records the supervisor for a registered plugin.
func (env *Environment) setPluginSupervisor(id string, supervisor *supervisor) {
if rp, ok := env.registeredPlugins.Load(id); ok {
p := rp.(registeredPlugin)
p.supervisor = supervisor
env.registeredPlugins.Store(id, p)
}
}
// PublicFilesPath returns a path and true if the plugin with the given id is active.
// It returns an empty string and false if the path is not set or invalid
func (env *Environment) PublicFilesPath(id string) (string, error) {
@ -254,6 +264,46 @@ func (env *Environment) GetManifest(pluginId string) (*model.Manifest, error) {
return nil, ErrNotFound
}
func checkMinServerVersion(pluginInfo *model.BundleInfo) error {
if pluginInfo.Manifest.MinServerVersion == "" {
return nil
}
fulfilled, err := pluginInfo.Manifest.MeetMinServerVersion(model.CurrentVersion)
if err != nil {
return fmt.Errorf("%v: %v", err.Error(), pluginInfo.Manifest.Id)
}
if !fulfilled {
return fmt.Errorf("plugin requires Mattermost %v: %v", pluginInfo.Manifest.MinServerVersion, pluginInfo.Manifest.Id)
}
return nil
}
func (env *Environment) startPluginServer(pluginInfo *model.BundleInfo, opts ...func(*supervisor, *plugin.ClientConfig) error) error {
sup, err := newSupervisor(pluginInfo, env.newAPIImpl(pluginInfo.Manifest), env.dbDriver, env.logger, env.metrics, opts...)
if err != nil {
return errors.Wrapf(err, "unable to start plugin: %v", pluginInfo.Manifest.Id)
}
// We pre-emptively set the state to running to prevent re-entrancy issues.
// The plugin's OnActivate hook can in-turn call UpdateConfiguration
// which again calls this method. This method is guarded against multiple calls,
// but fails if it is called recursively.
//
// Therefore, setting the state to running prevents this from happening,
// and in case there is an error, the defer clause will set the proper state anyways.
env.setPluginState(pluginInfo.Manifest.Id, model.PluginStateRunning)
if err := sup.Hooks().OnActivate(); err != nil {
sup.Shutdown()
return err
}
env.setPluginSupervisor(pluginInfo.Manifest.Id, sup)
return nil
}
func (env *Environment) Activate(id string) (manifest *model.Manifest, activated bool, reterr error) {
defer func() {
if reterr != nil {
@ -296,20 +346,16 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated
}
}()
if pluginInfo.Manifest.MinServerVersion != "" {
fulfilled, err := pluginInfo.Manifest.MeetMinServerVersion(model.CurrentVersion)
if err != nil {
return nil, false, fmt.Errorf("%v: %v", err.Error(), id)
}
if !fulfilled {
return nil, false, fmt.Errorf("plugin requires Mattermost %v: %v", pluginInfo.Manifest.MinServerVersion, id)
}
err = checkMinServerVersion(pluginInfo)
if err != nil {
return nil, false, err
}
componentActivated := false
if pluginInfo.Manifest.HasWebapp() {
updatedManifest, err := env.UnpackWebappBundle(id)
var updatedManifest *model.Manifest
updatedManifest, err = env.UnpackWebappBundle(id)
if err != nil {
return nil, false, errors.Wrapf(err, "unable to generate webapp bundle: %v", id)
}
@ -319,27 +365,10 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated
}
if pluginInfo.Manifest.HasServer() {
sup, err := newSupervisor(pluginInfo, env.newAPIImpl(pluginInfo.Manifest), env.dbDriver, env.logger, env.metrics)
err = env.startPluginServer(pluginInfo, WithExecutableFromManifest(pluginInfo))
if err != nil {
return nil, false, errors.Wrapf(err, "unable to start plugin: %v", id)
}
// We pre-emptively set the state to running to prevent re-entrancy issues.
// The plugin's OnActivate hook can in-turn call UpdateConfiguration
// which again calls this method. This method is guarded against multiple calls,
// but fails if it is called recursively.
//
// Therefore, setting the state to running prevents this from happening,
// and in case there is an error, the defer clause will set the proper state anyways.
env.setPluginState(id, model.PluginStateRunning)
if err := sup.Hooks().OnActivate(); err != nil {
sup.Shutdown()
return nil, false, err
}
rp.supervisor = sup
env.registeredPlugins.Store(id, rp)
componentActivated = true
}
@ -352,6 +381,64 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated
return pluginInfo.Manifest, true, nil
}
// Reattach allows the server to bind to an existing plugin instance launched elsewhere.
func (env *Environment) Reattach(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) (reterr error) {
id := manifest.Id
defer func() {
if reterr != nil {
env.SetPluginError(id, reterr.Error())
} else {
env.SetPluginError(id, "")
}
}()
// Check if we are already active
if env.IsActive(id) {
return nil
}
pluginInfo := &model.BundleInfo{
Path: "",
Manifest: manifest,
ManifestPath: "",
ManifestError: nil,
}
rp := newRegisteredPlugin(pluginInfo)
env.registeredPlugins.Store(id, rp)
defer func() {
if reterr == nil {
env.setPluginState(id, model.PluginStateRunning)
} else {
env.setPluginState(id, model.PluginStateFailedToStart)
}
}()
err := checkMinServerVersion(pluginInfo)
if err != nil {
return nil
}
if !pluginInfo.Manifest.HasServer() {
return errors.New("cannot reattach plugin without server component")
}
if pluginInfo.Manifest.HasWebapp() {
env.logger.Warn("Ignoring webapp for reattached plugin", mlog.String("plugin_id", id))
}
err = env.startPluginServer(pluginInfo, WithReattachConfig(pluginReattachConfig))
if err != nil {
return nil
}
mlog.Debug("Plugin reattached", mlog.String("plugin_id", pluginInfo.Manifest.Id), mlog.String("version", pluginInfo.Manifest.Version))
return nil
}
func (env *Environment) RemovePlugin(id string) {
if _, ok := env.registeredPlugins.Load(id); ok {
env.registeredPlugins.Delete(id)

View File

@ -52,7 +52,7 @@ func testPluginHealthCheckSuccess(t *testing.T) {
bundle := model.BundleInfoForPath(dir)
logger := mlog.CreateConsoleTestLogger(t)
supervisor, err := newSupervisor(bundle, nil, nil, logger, nil)
supervisor, err := newSupervisor(bundle, nil, nil, logger, nil, WithExecutableFromManifest(bundle))
require.NoError(t, err)
require.NotNil(t, supervisor)
defer supervisor.Shutdown()
@ -93,7 +93,7 @@ func testPluginHealthCheckPanic(t *testing.T) {
bundle := model.BundleInfoForPath(dir)
logger := mlog.CreateConsoleTestLogger(t)
supervisor, err := newSupervisor(bundle, nil, nil, logger, nil)
supervisor, err := newSupervisor(bundle, nil, nil, logger, nil, WithExecutableFromManifest(bundle))
require.NoError(t, err)
require.NotNil(t, supervisor)
defer supervisor.Shutdown()

View File

@ -0,0 +1,85 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package plugintest_test
import (
"context"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
goPlugin "github.com/hashicorp/go-plugin"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
)
type UnitTestedPlugin struct {
plugin.MattermostPlugin
}
// This example demonstrates a plugin that's launched during a unit test and reattached to an
// existing server instance to obtain a real PluginAPI.
func Example_unitTestingPlugins() {
t := &testing.T{}
// The manifest is usually generated dynamically.
manifest := &model.Manifest{
Id: "reattach-plugin-test",
}
// ctx, and specifically cancel, gives us control over the plugin lifecycle
ctx, cancel := context.WithCancel(context.Background())
// reattachConfigCh is the means by which we get the Unix socket information to relay back
// to the server and finish the reattachment.
reattachConfigCh := make(chan *goPlugin.ReattachConfig)
// closeCh tells us when the plugin exits and allows for cleanup.
closeCh := make(chan struct{})
// plugin.ClientMain with options allows for reattachment.
go plugin.ClientMain(
&UnitTestedPlugin{},
plugin.WithTestContext(ctx),
plugin.WithTestReattachConfigCh(reattachConfigCh),
plugin.WithTestCloseCh(closeCh),
)
// Make sure the plugin shuts down normally with the test
t.Cleanup(func() {
cancel()
select {
case <-closeCh:
case <-time.After(5 * time.Second):
panic("plugin failed to close after 5 seconds")
}
})
// Wait for the plugin to start and then reattach to the server.
var reattachConfig *goPlugin.ReattachConfig
select {
case reattachConfig = <-reattachConfigCh:
case <-time.After(5 * time.Second):
t.Fatal("failed to get reattach config")
}
// Reattaching requires a local mode client.
socketPath := os.Getenv("MM_LOCALSOCKETPATH")
if socketPath == "" {
socketPath = model.LocalModeSocketPath
}
clientLocal := model.NewAPIv4SocketClient(socketPath)
_, err := clientLocal.ReattachPlugin(ctx, &model.PluginReattachRequest{
Manifest: manifest,
PluginReattachConfig: model.NewPluginReattachConfig(reattachConfig),
})
require.NoError(t, err)
// At this point, the plugin is ready for unit testing and will be cleaned up automatically
// with the testing.T instance.
}

View File

@ -23,15 +23,58 @@ import (
)
type supervisor struct {
lock sync.RWMutex
client *plugin.Client
hooks Hooks
implemented [TotalHooksID]bool
pid int
hooksClient *hooksRPCClient
lock sync.RWMutex
client *plugin.Client
hooks Hooks
implemented [TotalHooksID]bool
hooksClient *hooksRPCClient
isReattached bool
}
func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, parentLogger *mlog.Logger, metrics metricsInterface) (retSupervisor *supervisor, retErr error) {
func WithExecutableFromManifest(pluginInfo *model.BundleInfo) func(*supervisor, *plugin.ClientConfig) error {
return func(_ *supervisor, clientConfig *plugin.ClientConfig) error {
executable := pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH)
if executable == "" {
return fmt.Errorf("backend executable not found for environment: %s/%s", runtime.GOOS, runtime.GOARCH)
}
executable = filepath.Clean(filepath.Join(".", executable))
if strings.HasPrefix(executable, "..") {
return fmt.Errorf("invalid backend executable: %s", executable)
}
executable = filepath.Join(pluginInfo.Path, executable)
cmd := exec.Command(executable)
// This doesn't add more security than before
// but removes the SecureConfig is nil warning.
// https://mattermost.atlassian.net/browse/MM-49167
pluginChecksum, err := getPluginExecutableChecksum(executable)
if err != nil {
return errors.Wrapf(err, "unable to generate plugin checksum")
}
clientConfig.Cmd = cmd
clientConfig.SecureConfig = &plugin.SecureConfig{
Checksum: pluginChecksum,
Hash: sha256.New(),
}
return nil
}
}
func WithReattachConfig(pluginReattachConfig *model.PluginReattachConfig) func(*supervisor, *plugin.ClientConfig) error {
return func(sup *supervisor, clientConfig *plugin.ClientConfig) error {
clientConfig.Reattach = pluginReattachConfig.ToHashicorpPluginReattachmentConfig()
sup.isReattached = true
return nil
}
}
func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, parentLogger *mlog.Logger, metrics metricsInterface, opts ...func(*supervisor, *plugin.ClientConfig) error) (retSupervisor *supervisor, retErr error) {
sup := supervisor{}
defer func() {
if retErr != nil {
@ -54,49 +97,28 @@ func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, par
},
}
executable := pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH)
if executable == "" {
return nil, fmt.Errorf("backend executable not found for environment: %s/%s", runtime.GOOS, runtime.GOARCH)
}
executable = filepath.Clean(filepath.Join(".", executable))
if strings.HasPrefix(executable, "..") {
return nil, fmt.Errorf("invalid backend executable: %s", executable)
}
executable = filepath.Join(pluginInfo.Path, executable)
cmd := exec.Command(executable)
// This doesn't add more security than before
// but removes the SecureConfig is nil warning.
// https://mattermost.atlassian.net/browse/MM-49167
pluginChecksum, err := getPluginExecutableChecksum(executable)
if err != nil {
return nil, errors.Wrapf(err, "unable to generate plugin checksum")
}
sup.client = plugin.NewClient(&plugin.ClientConfig{
clientConfig := &plugin.ClientConfig{
HandshakeConfig: handshake,
Plugins: pluginMap,
Cmd: cmd,
SyncStdout: wrappedLogger.With(mlog.String("source", "plugin_stdout")).StdLogWriter(),
SyncStderr: wrappedLogger.With(mlog.String("source", "plugin_stderr")).StdLogWriter(),
Logger: hclogAdaptedLogger,
StartTimeout: time.Second * 3,
SecureConfig: &plugin.SecureConfig{
Checksum: pluginChecksum,
Hash: sha256.New(),
},
})
}
for _, opt := range opts {
err := opt(&sup, clientConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to apply option")
}
}
sup.client = plugin.NewClient(clientConfig)
rpcClient, err := sup.client.Client()
if err != nil {
return nil, err
}
sup.pid = cmd.Process.Pid
raw, err := rpcClient.Dispense("hooks")
if err != nil {
return nil, err
@ -126,6 +148,20 @@ func (sup *supervisor) Shutdown() {
sup.lock.RLock()
defer sup.lock.RUnlock()
if sup.client != nil {
// For reattached plugins, Kill() is mostly a no-op, so manually clean up the
// underlying rpcClient. This might be something to upstream unless we're doing
// something else wrong.
if sup.isReattached {
rpcClient, err := sup.client.Client()
if err != nil {
mlog.Warn("Failed to obtain rpcClient on Shutdown")
} else {
if err = rpcClient.Close(); err != nil {
mlog.Warn("Failed to close rpcClient on Shutdown")
}
}
}
sup.client.Kill()
}