mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Fix plugins manager process data race in tests (#81914)
* Chore: synchronize writes to pkg.plugins.log.Logs to prevent data races in test code * Chore: fix data race in tests in plugins process manager * Chore: improve Logs method naming * Chore: fix type change
This commit is contained in:
parent
1b63c27bb4
commit
a6e27d1622
@ -1,6 +1,9 @@
|
||||
package log
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var _ Logger = (*TestLogger)(nil)
|
||||
|
||||
@ -20,27 +23,19 @@ func (f *TestLogger) New(_ ...any) Logger {
|
||||
}
|
||||
|
||||
func (f *TestLogger) Info(msg string, ctx ...any) {
|
||||
f.InfoLogs.Calls++
|
||||
f.InfoLogs.Message = msg
|
||||
f.InfoLogs.Ctx = ctx
|
||||
f.InfoLogs.Call(msg, ctx)
|
||||
}
|
||||
|
||||
func (f *TestLogger) Warn(msg string, ctx ...any) {
|
||||
f.WarnLogs.Calls++
|
||||
f.WarnLogs.Message = msg
|
||||
f.WarnLogs.Ctx = ctx
|
||||
f.WarnLogs.Call(msg, ctx)
|
||||
}
|
||||
|
||||
func (f *TestLogger) Debug(msg string, ctx ...any) {
|
||||
f.DebugLogs.Calls++
|
||||
f.DebugLogs.Message = msg
|
||||
f.DebugLogs.Ctx = ctx
|
||||
f.DebugLogs.Call(msg, ctx)
|
||||
}
|
||||
|
||||
func (f *TestLogger) Error(msg string, ctx ...any) {
|
||||
f.ErrorLogs.Calls++
|
||||
f.ErrorLogs.Message = msg
|
||||
f.ErrorLogs.Ctx = ctx
|
||||
f.ErrorLogs.Call(msg, ctx)
|
||||
}
|
||||
|
||||
func (f *TestLogger) FromContext(_ context.Context) Logger {
|
||||
@ -51,6 +46,16 @@ type Logs struct {
|
||||
Calls int
|
||||
Message string
|
||||
Ctx []any
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (l *Logs) Call(msg string, ctx ...any) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.Calls++
|
||||
l.Message = msg
|
||||
l.Ctx = ctx
|
||||
}
|
||||
|
||||
var _ PrettyLogger = (*TestPrettyLogger)(nil)
|
||||
|
@ -7,22 +7,24 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
)
|
||||
|
||||
var (
|
||||
keepPluginAliveTickerDuration = time.Second * 1
|
||||
)
|
||||
const defaultKeepPluginAliveTickerDuration = time.Second
|
||||
|
||||
type Service struct{}
|
||||
|
||||
func ProvideService() *Service {
|
||||
return &Service{}
|
||||
type Service struct {
|
||||
keepPluginAliveTickerDuration time.Duration
|
||||
}
|
||||
|
||||
func (*Service) Start(ctx context.Context, p *plugins.Plugin) error {
|
||||
func ProvideService() *Service {
|
||||
return &Service{
|
||||
keepPluginAliveTickerDuration: defaultKeepPluginAliveTickerDuration,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context, p *plugins.Plugin) error {
|
||||
if !p.IsManaged() || !p.Backend || p.SignatureError != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := startPluginAndKeepItAlive(ctx, p); err != nil {
|
||||
if err := s.startPluginAndKeepItAlive(ctx, p); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -43,7 +45,7 @@ func (*Service) Stop(ctx context.Context, p *plugins.Plugin) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func startPluginAndKeepItAlive(ctx context.Context, p *plugins.Plugin) error {
|
||||
func (s *Service) startPluginAndKeepItAlive(ctx context.Context, p *plugins.Plugin) error {
|
||||
if err := p.Start(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -53,7 +55,7 @@ func startPluginAndKeepItAlive(ctx context.Context, p *plugins.Plugin) error {
|
||||
}
|
||||
|
||||
go func(p *plugins.Plugin) {
|
||||
if err := keepPluginAlive(p); err != nil {
|
||||
if err := s.keepPluginAlive(p); err != nil {
|
||||
p.Logger().Error("Attempt to restart killed plugin process failed", "error", err)
|
||||
}
|
||||
}(p)
|
||||
@ -62,8 +64,8 @@ func startPluginAndKeepItAlive(ctx context.Context, p *plugins.Plugin) error {
|
||||
}
|
||||
|
||||
// keepPluginAlive will restart the plugin if the process is killed or exits
|
||||
func keepPluginAlive(p *plugins.Plugin) error {
|
||||
ticker := time.NewTicker(keepPluginAliveTickerDuration)
|
||||
func (s *Service) keepPluginAlive(p *plugins.Plugin) error {
|
||||
ticker := time.NewTicker(s.keepPluginAliveTickerDuration)
|
||||
|
||||
for {
|
||||
<-ticker.C
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@ -15,7 +14,11 @@ import (
|
||||
)
|
||||
|
||||
func TestProcessManager_Start(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("Plugin state determines process start", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
managed bool
|
||||
@ -52,14 +55,20 @@ func TestProcessManager_Start(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
// create a local copy of "tc" to allow concurrent access within tests to the different items of testCases,
|
||||
// otherwise it would be like a moving pointer while tests run in parallel
|
||||
tc := tc
|
||||
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
bp := fakes.NewFakeBackendPlugin(tc.managed)
|
||||
p := createPlugin(t, bp, func(plugin *plugins.Plugin) {
|
||||
plugin.Backend = tc.backend
|
||||
plugin.SignatureError = tc.signatureError
|
||||
})
|
||||
|
||||
m := &Service{}
|
||||
m := ProvideService()
|
||||
err := m.Start(context.Background(), p)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedStartCount, bp.StartCount)
|
||||
@ -74,18 +83,15 @@ func TestProcessManager_Start(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Won't stop the plugin if the context is cancelled", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
bp := fakes.NewFakeBackendPlugin(true)
|
||||
p := createPlugin(t, bp, func(plugin *plugins.Plugin) {
|
||||
plugin.Backend = true
|
||||
})
|
||||
|
||||
tickerDuration := keepPluginAliveTickerDuration
|
||||
keepPluginAliveTickerDuration = 1 * time.Millisecond
|
||||
defer func() {
|
||||
keepPluginAliveTickerDuration = tickerDuration
|
||||
}()
|
||||
|
||||
m := &Service{}
|
||||
m := ProvideService()
|
||||
m.keepPluginAliveTickerDuration = 1
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := m.Start(ctx, p)
|
||||
@ -100,7 +106,11 @@ func TestProcessManager_Start(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestProcessManager_Stop(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("Can stop a running plugin", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pluginID := "test-datasource"
|
||||
|
||||
bp := fakes.NewFakeBackendPlugin(true)
|
||||
@ -109,7 +119,7 @@ func TestProcessManager_Stop(t *testing.T) {
|
||||
plugin.Backend = true
|
||||
})
|
||||
|
||||
m := &Service{}
|
||||
m := ProvideService()
|
||||
err := m.Stop(context.Background(), p)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -120,18 +130,21 @@ func TestProcessManager_Stop(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestProcessManager_ManagedBackendPluginLifecycle(t *testing.T) {
|
||||
bp := fakes.NewFakeBackendPlugin(true)
|
||||
p := createPlugin(t, bp, func(plugin *plugins.Plugin) {
|
||||
plugin.Backend = true
|
||||
})
|
||||
|
||||
m := &Service{}
|
||||
|
||||
err := m.Start(context.Background(), p)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, bp.StartCount)
|
||||
t.Parallel()
|
||||
|
||||
t.Run("When plugin process is killed, the process is restarted", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
bp := fakes.NewFakeBackendPlugin(true)
|
||||
p := createPlugin(t, bp, func(plugin *plugins.Plugin) {
|
||||
plugin.Backend = true
|
||||
})
|
||||
|
||||
m := ProvideService()
|
||||
|
||||
err := m.Start(context.Background(), p)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, bp.StartCount)
|
||||
|
||||
var wgKill sync.WaitGroup
|
||||
wgKill.Add(1)
|
||||
go func() {
|
||||
|
Loading…
Reference in New Issue
Block a user