mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Avoid stopping plugin (#74654)
This commit is contained in:
parent
8cb93bf3e7
commit
1a8a19a9ed
@ -496,13 +496,17 @@ type FakeBackendPlugin struct {
|
|||||||
Decommissioned bool
|
Decommissioned bool
|
||||||
Running bool
|
Running bool
|
||||||
|
|
||||||
|
// ExitedCheckDoneOrStopped is used to signal that the Exited() or Stop() method has been called.
|
||||||
|
ExitedCheckDoneOrStopped chan struct{}
|
||||||
|
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
backendplugin.Plugin
|
backendplugin.Plugin
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeBackendPlugin(managed bool) *FakeBackendPlugin {
|
func NewFakeBackendPlugin(managed bool) *FakeBackendPlugin {
|
||||||
return &FakeBackendPlugin{
|
return &FakeBackendPlugin{
|
||||||
Managed: managed,
|
Managed: managed,
|
||||||
|
ExitedCheckDoneOrStopped: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -519,6 +523,7 @@ func (p *FakeBackendPlugin) Stop(_ context.Context) error {
|
|||||||
defer p.mutex.Unlock()
|
defer p.mutex.Unlock()
|
||||||
p.Running = false
|
p.Running = false
|
||||||
p.StopCount++
|
p.StopCount++
|
||||||
|
go func() { p.ExitedCheckDoneOrStopped <- struct{}{} }()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -544,6 +549,7 @@ func (p *FakeBackendPlugin) IsManaged() bool {
|
|||||||
func (p *FakeBackendPlugin) Exited() bool {
|
func (p *FakeBackendPlugin) Exited() bool {
|
||||||
p.mutex.RLock()
|
p.mutex.RLock()
|
||||||
defer p.mutex.RUnlock()
|
defer p.mutex.RUnlock()
|
||||||
|
go func() { p.ExitedCheckDoneOrStopped <- struct{}{} }()
|
||||||
return !p.Running
|
return !p.Running
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,12 +2,15 @@ package process
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/plugins"
|
"github.com/grafana/grafana/pkg/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
keepPluginAliveTickerDuration = time.Second * 1
|
||||||
|
)
|
||||||
|
|
||||||
type Service struct{}
|
type Service struct{}
|
||||||
|
|
||||||
func ProvideService() *Service {
|
func ProvideService() *Service {
|
||||||
@ -19,7 +22,7 @@ func (*Service) Start(ctx context.Context, p *plugins.Plugin) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil {
|
if err := startPluginAndKeepItAlive(ctx, p); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -40,7 +43,7 @@ func (*Service) Stop(ctx context.Context, p *plugins.Plugin) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error {
|
func startPluginAndKeepItAlive(ctx context.Context, p *plugins.Plugin) error {
|
||||||
if err := p.Start(ctx); err != nil {
|
if err := p.Start(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -49,41 +52,35 @@ func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(ctx context.Context, p *plugins.Plugin) {
|
go func(p *plugins.Plugin) {
|
||||||
if err := restartKilledProcess(ctx, p); err != nil {
|
if err := keepPluginAlive(p); err != nil {
|
||||||
p.Logger().Error("Attempt to restart killed plugin process failed", "error", err)
|
p.Logger().Error("Attempt to restart killed plugin process failed", "error", err)
|
||||||
}
|
}
|
||||||
}(ctx, p)
|
}(p)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error {
|
// keepPluginAlive will restart the plugin if the process is killed or exits
|
||||||
ticker := time.NewTicker(time.Second * 1)
|
func keepPluginAlive(p *plugins.Plugin) error {
|
||||||
|
ticker := time.NewTicker(keepPluginAliveTickerDuration)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
<-ticker.C
|
||||||
case <-ctx.Done():
|
if p.IsDecommissioned() {
|
||||||
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
|
p.Logger().Debug("Plugin decommissioned")
|
||||||
return err
|
return nil
|
||||||
}
|
|
||||||
return p.Stop(ctx)
|
|
||||||
case <-ticker.C:
|
|
||||||
if p.IsDecommissioned() {
|
|
||||||
p.Logger().Debug("Plugin decommissioned")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !p.Exited() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
p.Logger().Debug("Restarting plugin")
|
|
||||||
if err := p.Start(ctx); err != nil {
|
|
||||||
p.Logger().Error("Failed to restart plugin", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
p.Logger().Debug("Plugin restarted")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !p.Exited() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
p.Logger().Debug("Restarting plugin")
|
||||||
|
if err := p.Start(context.Background()); err != nil {
|
||||||
|
p.Logger().Error("Failed to restart plugin", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
p.Logger().Debug("Plugin restarted")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
@ -71,6 +72,31 @@ func TestProcessManager_Start(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("Won't stop the plugin if the context is cancelled", func(t *testing.T) {
|
||||||
|
bp := fakes.NewFakeBackendPlugin(true)
|
||||||
|
p := createPlugin(t, bp, func(plugin *plugins.Plugin) {
|
||||||
|
plugin.Backend = true
|
||||||
|
})
|
||||||
|
|
||||||
|
tickerDuration := keepPluginAliveTickerDuration
|
||||||
|
keepPluginAliveTickerDuration = 1 * time.Millisecond
|
||||||
|
defer func() {
|
||||||
|
keepPluginAliveTickerDuration = tickerDuration
|
||||||
|
}()
|
||||||
|
|
||||||
|
m := &Service{}
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
err := m.Start(ctx, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, bp.StartCount)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-bp.ExitedCheckDoneOrStopped
|
||||||
|
require.False(t, p.Exited())
|
||||||
|
require.Equal(t, 0, bp.StopCount)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessManager_Stop(t *testing.T) {
|
func TestProcessManager_Stop(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user