Chore: Convert background service registry to dskit module (#64062)

* Chore: Add initial support for deployment modes

* revert CLI changes and start modules independently

* add modules to codeowners

* additional comments

* add Engine and Manager interface to fix test issues

* convert background service registry to dskit module

* remove extra context from serviceListener logger

Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>

* Remove whitespace

* fix import

* undo ide changes

* only register All by default

* with registry

* add test

* add comments

* re-add debug log

* fix import

* reorganize arg

* undo kind changes

* add provide service test

* fix import

* rejig systemd calls

* update codeowners

---------

Co-authored-by: Todd Treece <todd.treece@grafana.com>
Co-authored-by: Todd Treece <360020+toddtreece@users.noreply.github.com>
This commit is contained in:
Will Browne 2023-07-06 14:45:47 +02:00 committed by GitHub
parent 76974009d0
commit 4818568c65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 287 additions and 190 deletions

1
.github/CODEOWNERS vendored
View File

@ -66,6 +66,7 @@
/pkg/api/ @grafana/backend-platform
/pkg/bus/ @grafana/backend-platform
/pkg/cmd/ @grafana/backend-platform
/pkg/systemd/ @grafana/backend-platform
/pkg/components/apikeygen/ @grafana/grafana-authnz-team
/pkg/components/satokengen/ @grafana/grafana-authnz-team
/pkg/components/dashdiffs/ @grafana/backend-platform

View File

@ -248,7 +248,7 @@ func RunServer(opt ServerOptions) error {
go listenToSystemSignals(ctx, s)
return s.Run()
return s.Run(ctx)
}
func validPackaging(packaging string) string {

View File

@ -3,9 +3,11 @@ package modules
const (
// All includes all modules necessary for Grafana to run as a standalone application.
All string = "all"
// BackgroundServices includes all Grafana services that run in the background
BackgroundServices string = "background-services"
)
// dependencyMap defines Module Targets => Dependencies
var dependencyMap = map[string][]string{
All: {},
All: {BackgroundServices},
}

View File

@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/systemd"
)
type Engine interface {
@ -98,6 +99,13 @@ func (m *service) Run(ctx context.Context) error {
return err
}
err = m.serviceManager.AwaitHealthy(ctx)
if err != nil {
return err
}
systemd.NotifyReady(m.log)
err = m.serviceManager.AwaitStopped(ctx)
if err != nil {
return err

View File

@ -5,6 +5,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/server/backgroundsvcs"
)
type Registry interface{}
@ -16,10 +17,12 @@ type registry struct {
func ProvideRegistry(
moduleManager modules.Manager,
backgroundServiceRunner *backgroundsvcs.BackgroundServiceRunner,
) *registry {
return newRegistry(
log.New("modules.registry"),
moduleManager,
backgroundServiceRunner,
)
}

View File

@ -9,9 +9,32 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/server/backgroundsvcs"
)
func TestRegistry(t *testing.T) {
func TestProvideRegistry(t *testing.T) {
var registeredInvisibleModules []string
var registeredModules []string
moduleManager := &modules.MockModuleManager{
RegisterModuleFunc: func(name string, initFn func() (services.Service, error)) {
registeredModules = append(registeredModules, name)
},
RegisterInvisibleModuleFunc: func(name string, initFn func() (services.Service, error)) {
registeredInvisibleModules = append(registeredInvisibleModules, name)
},
}
svcRegistry := backgroundsvcs.NewBackgroundServiceRegistry()
svcRunner := backgroundsvcs.ProvideBackgroundServiceRunner(svcRegistry)
r := ProvideRegistry(moduleManager, svcRunner)
require.NotNil(t, r)
require.Equal(t, []string{modules.BackgroundServices}, registeredInvisibleModules)
require.Equal(t, []string{modules.All}, registeredModules)
}
func TestNewRegistry(t *testing.T) {
var registeredInvisibleModules []string
var registeredModules []string

View File

@ -94,13 +94,13 @@ func ProvideBackgroundServiceRegistry(
// BackgroundServiceRegistry provides background services.
type BackgroundServiceRegistry struct {
Services []registry.BackgroundService
services []registry.BackgroundService
}
func NewBackgroundServiceRegistry(services ...registry.BackgroundService) *BackgroundServiceRegistry {
return &BackgroundServiceRegistry{services}
func NewBackgroundServiceRegistry(s ...registry.BackgroundService) *BackgroundServiceRegistry {
return &BackgroundServiceRegistry{services: s}
}
func (r *BackgroundServiceRegistry) GetServices() []registry.BackgroundService {
return r.Services
return r.services
}

View File

@ -0,0 +1,13 @@
package backgroundsvcs
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
)
func TestBackgroundServiceRegistry_GetServices(t *testing.T) {
s := NewBackgroundServiceRegistry(newTestService("A", nil, false), newTestService("B", errors.New("boom"), false))
require.Len(t, s.GetServices(), 2)
}

View File

@ -0,0 +1,64 @@
package backgroundsvcs
import (
"context"
"errors"
"fmt"
"reflect"
"golang.org/x/sync/errgroup"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/registry"
)
// BackgroundServiceRunner provides a runner for background services.
type BackgroundServiceRunner struct {
*services.BasicService
registry registry.BackgroundServiceRegistry
log log.Logger
}
func ProvideBackgroundServiceRunner(registry registry.BackgroundServiceRegistry) *BackgroundServiceRunner {
r := &BackgroundServiceRunner{registry: registry, log: log.New("background-services-runner")}
r.BasicService = services.NewBasicService(nil, r.run, nil).WithName(modules.BackgroundServices)
return r
}
func (r *BackgroundServiceRunner) run(ctx context.Context) error {
childRoutines, childCtx := errgroup.WithContext(ctx)
// Start background services.
for _, svc := range r.registry.GetServices() {
if registry.IsDisabled(svc) {
continue
}
service := svc
serviceName := reflect.TypeOf(service).String()
childRoutines.Go(func() error {
select {
case <-childCtx.Done():
return childCtx.Err()
default:
}
r.log.Debug("Starting background service", "service", serviceName)
err := service.Run(childCtx)
// Do not return context.Canceled error since errgroup.Group only
// returns the first error to the caller - thus we can miss a more
// interesting error.
if err != nil && !errors.Is(err, context.Canceled) {
r.log.Error("Stopped background service", "service", serviceName, "reason", err)
return fmt.Errorf("%s run error: %w", serviceName, err)
}
r.log.Debug("Stopped background service", "service", serviceName, "reason", err)
return nil
})
}
r.log.Debug("Waiting on services...")
return childRoutines.Wait()
}

View File

@ -0,0 +1,52 @@
package backgroundsvcs
import (
"context"
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestBackgroundServiceRunner_Run_Error(t *testing.T) {
testErr := errors.New("boom")
registry := NewBackgroundServiceRegistry(newTestService("A", nil, false), newTestService("B", testErr, false))
r := ProvideBackgroundServiceRunner(registry)
err := r.run(context.Background())
require.ErrorIs(t, err, testErr)
}
type testBackgroundService struct {
name string
started chan struct{}
runErr error
isDisabled bool
}
func newTestService(name string, runErr error, disabled bool) *testBackgroundService {
return &testBackgroundService{
name: name,
started: make(chan struct{}),
runErr: runErr,
isDisabled: disabled,
}
}
func (s *testBackgroundService) Run(ctx context.Context) error {
if s.isDisabled {
return fmt.Errorf("shouldn't run disabled service")
}
if s.runErr != nil {
return s.runErr
}
close(s.started)
<-ctx.Done()
return ctx.Err()
}
func (s *testBackgroundService) IsDisabled() bool {
return s.isDisabled
}

View File

@ -7,12 +7,9 @@ import (
"net"
"os"
"path/filepath"
"reflect"
"strconv"
"sync"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/api"
_ "github.com/grafana/grafana/pkg/extensions"
"github.com/grafana/grafana/pkg/infra/log"
@ -38,18 +35,18 @@ type Options struct {
// New returns a new instance of Server.
func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
provisioningService provisioning.ProvisioningService,
usageStatsProvidersRegistry registry.UsageStatsProvidersRegistry, statsCollectorService *statscollector.Service,
moduleService modules.Engine,
_ moduleRegistry.Registry, // imported to invoke initialization via Wire
) (*Server, error) {
statsCollectorService.RegisterProviders(usageStatsProvidersRegistry.GetServices())
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider, moduleService)
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, moduleService)
if err != nil {
return nil, err
}
if err := s.init(); err != nil {
if err = s.init(context.Background()); err != nil {
return nil, err
}
@ -57,19 +54,12 @@ func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistr
}
func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
moduleService modules.Engine,
) (*Server, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
childRoutines, childCtx := errgroup.WithContext(rootCtx)
s := &Server{
context: childCtx,
childRoutines: childRoutines,
provisioningService provisioning.ProvisioningService,
moduleService modules.Engine) (*Server, error) {
return &Server{
HTTPServer: httpServer,
provisioningService: provisioningService,
roleRegistry: roleRegistry,
shutdownFn: shutdownFn,
shutdownFinished: make(chan struct{}),
log: log.New("server"),
cfg: cfg,
@ -77,18 +67,12 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR
version: opts.Version,
commit: opts.Commit,
buildBranch: opts.BuildBranch,
backgroundServices: backgroundServiceProvider.GetServices(),
moduleService: moduleService,
}
return s, nil
}, nil
}
// Server is responsible for managing the lifecycle of services.
type Server struct {
context context.Context
shutdownFn context.CancelFunc
childRoutines *errgroup.Group
log log.Logger
cfg *setting.Cfg
shutdownOnce sync.Once
@ -96,11 +80,10 @@ type Server struct {
isInitialized bool
mtx sync.Mutex
pidFile string
version string
commit string
buildBranch string
backgroundServices []registry.BackgroundService
pidFile string
version string
commit string
buildBranch string
HTTPServer *api.HTTPServer
roleRegistry accesscontrol.RoleRegistry
@ -109,7 +92,7 @@ type Server struct {
}
// init initializes the server and its services.
func (s *Server) init() error {
func (s *Server) init(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()
@ -123,7 +106,7 @@ func (s *Server) init() error {
}
// Initialize dskit modules.
if err := s.moduleService.Init(s.context); err != nil {
if err := s.moduleService.Init(ctx); err != nil {
return err
}
@ -131,65 +114,27 @@ func (s *Server) init() error {
return err
}
if err := s.roleRegistry.RegisterFixedRoles(s.context); err != nil {
if err := s.roleRegistry.RegisterFixedRoles(ctx); err != nil {
return err
}
return s.provisioningService.RunInitProvisioners(s.context)
return s.provisioningService.RunInitProvisioners(ctx)
}
// Run initializes and starts services. This will block until all services have
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
func (s *Server) Run() error {
func (s *Server) Run(ctx context.Context) error {
defer close(s.shutdownFinished)
if err := s.init(); err != nil {
if err := s.init(ctx); err != nil {
return err
}
// Start dskit modules.
s.childRoutines.Go(func() error {
err := s.moduleService.Run(s.context)
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
})
services := s.backgroundServices
// Start background services.
for _, svc := range services {
if registry.IsDisabled(svc) {
continue
}
service := svc
serviceName := reflect.TypeOf(service).String()
s.childRoutines.Go(func() error {
select {
case <-s.context.Done():
return s.context.Err()
default:
}
s.log.Debug("Starting background service", "service", serviceName)
err := service.Run(s.context)
// Do not return context.Canceled error since errgroup.Group only
// returns the first error to the caller - thus we can miss a more
// interesting error.
if err != nil && !errors.Is(err, context.Canceled) {
s.log.Error("Stopped background service", "service", serviceName, "reason", err)
return fmt.Errorf("%s run error: %w", serviceName, err)
}
s.log.Debug("Stopped background service", "service", serviceName, "reason", err)
return nil
})
err := s.moduleService.Run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
s.notifySystemd("READY=1")
s.log.Debug("Waiting on services...")
return s.childRoutines.Wait()
return nil
}
// Shutdown initiates Grafana graceful shutdown. This shuts down all
@ -199,11 +144,9 @@ func (s *Server) Shutdown(ctx context.Context, reason string) error {
var err error
s.shutdownOnce.Do(func() {
s.log.Info("Shutdown started", "reason", reason)
if err := s.moduleService.Shutdown(ctx); err != nil {
if err = s.moduleService.Shutdown(ctx); err != nil {
s.log.Error("Failed to shutdown modules", "error", err)
}
// Call cancel func to stop background services.
s.shutdownFn()
// Wait for server to shut down
select {
case <-s.shutdownFinished:
@ -240,33 +183,3 @@ func (s *Server) writePIDFile() error {
s.log.Info("Writing PID file", "path", s.pidFile, "pid", pid)
return nil
}
// notifySystemd sends state notifications to systemd.
func (s *Server) notifySystemd(state string) {
notifySocket := os.Getenv("NOTIFY_SOCKET")
if notifySocket == "" {
s.log.Debug(
"NOTIFY_SOCKET environment variable empty or unset, can't send systemd notification")
return
}
socketAddr := &net.UnixAddr{
Name: notifySocket,
Net: "unixgram",
}
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
if err != nil {
s.log.Warn("Failed to connect to systemd", "err", err, "socket", notifySocket)
return
}
defer func() {
if err := conn.Close(); err != nil {
s.log.Warn("Failed to close connection", "err", err)
}
}()
_, err = conn.Write([]byte(state))
if err != nil {
s.log.Warn("Failed to write notification to systemd", "err", err)
}
}

View File

@ -3,52 +3,19 @@ package server
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/server/backgroundsvcs"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/setting"
)
type testService struct {
started chan struct{}
runErr error
isDisabled bool
}
func newTestService(runErr error, disabled bool) *testService {
return &testService{
started: make(chan struct{}),
runErr: runErr,
isDisabled: disabled,
}
}
func (s *testService) Run(ctx context.Context) error {
if s.isDisabled {
return fmt.Errorf("Shouldn't run disabled service")
}
if s.runErr != nil {
return s.runErr
}
close(s.started)
<-ctx.Done()
return ctx.Err()
}
func (s *testService) IsDisabled() bool {
return s.isDisabled
}
func testServer(t *testing.T, services ...registry.BackgroundService) *Server {
func testServer(t *testing.T, m *modules.MockModuleEngine) *Server {
t.Helper()
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...), &MockModuleService{})
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, m)
require.NoError(t, err)
// Required to skip configuration initialization that causes
// DI errors in this test.
@ -58,62 +25,68 @@ func testServer(t *testing.T, services ...registry.BackgroundService) *Server {
func TestServer_Run_Error(t *testing.T) {
testErr := errors.New("boom")
s := testServer(t, newTestService(nil, false), newTestService(testErr, false))
err := s.Run()
require.ErrorIs(t, err, testErr)
t.Run("Modules Run error bubbles up", func(t *testing.T) {
ctx := context.Background()
s := testServer(t, &modules.MockModuleEngine{
RunFunc: func(c context.Context) error {
require.Equal(t, ctx, c)
return testErr
},
})
err := s.Run(ctx)
require.ErrorIs(t, err, testErr)
})
}
func TestServer_Shutdown(t *testing.T) {
ctx := context.Background()
s := testServer(t, newTestService(nil, false), newTestService(nil, true))
modulesShutdown := false
s := testServer(t, &modules.MockModuleEngine{
ShutdownFunc: func(_ context.Context) error {
modulesShutdown = true
return nil
},
})
ch := make(chan error)
go func() {
defer close(ch)
// Wait until all services launched.
for _, svc := range s.backgroundServices {
if !svc.(*testService).isDisabled {
<-svc.(*testService).started
}
}
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := s.Shutdown(ctx, "test interrupt")
ch <- err
}()
err := s.Run()
err := s.Run(ctx)
require.NoError(t, err)
err = <-ch
require.NoError(t, err)
}
require.True(t, modulesShutdown)
type MockModuleService struct {
initFunc func(context.Context) error
runFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
t.Run("Modules Shutdown error bubbles up", func(t *testing.T) {
testErr := errors.New("boom")
func (m *MockModuleService) Init(ctx context.Context) error {
if m.initFunc != nil {
return m.initFunc(ctx)
}
return nil
}
s = testServer(t, &modules.MockModuleEngine{
ShutdownFunc: func(_ context.Context) error {
return testErr
},
})
func (m *MockModuleService) Run(ctx context.Context) error {
if m.runFunc != nil {
return m.runFunc(ctx)
}
return nil
}
ch = make(chan error)
go func() {
defer close(ch)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err = s.Shutdown(ctx, "test interrupt")
ch <- err
}()
err = s.Run(ctx)
require.NoError(t, err)
func (m *MockModuleService) Shutdown(ctx context.Context) error {
if m.shutdownFunc != nil {
return m.shutdownFunc(ctx)
}
return nil
err = <-ch
require.ErrorIs(t, err, testErr)
})
}

View File

@ -34,6 +34,7 @@ import (
moduleRegistry "github.com/grafana/grafana/pkg/modules/registry"
pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards"
"github.com/grafana/grafana/pkg/registry/corekind"
"github.com/grafana/grafana/pkg/server/backgroundsvcs"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/services/accesscontrol/ossaccesscontrol"
@ -141,7 +142,7 @@ import (
"github.com/grafana/grafana/pkg/services/user/userimpl"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor"
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring"
cloudmonitoring "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
"github.com/grafana/grafana/pkg/tsdb/elasticsearch"
pyroscope "github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource"
@ -359,6 +360,7 @@ var wireBasicSet = wire.NewSet(
loggermw.Provide,
modules.WireSet,
moduleRegistry.WireSet,
backgroundsvcs.ProvideBackgroundServiceRunner,
signingkeysimpl.ProvideEmbeddedSigningKeysService,
wire.Bind(new(signingkeys.Service), new(*signingkeysimpl.Service)),
)

43
pkg/systemd/systemd.go Normal file
View File

@ -0,0 +1,43 @@
package systemd
import (
"net"
"os"
"github.com/grafana/grafana/pkg/infra/log"
)
// NotifyReady sends READY state notifications to systemd.
func NotifyReady(log log.Logger) {
notify(log, "READY=1")
}
// notify sends state notifications to systemd.
func notify(log log.Logger, state string) {
notifySocket := os.Getenv("NOTIFY_SOCKET")
if notifySocket == "" {
log.Debug(
"NOTIFY_SOCKET environment variable empty or unset, can't send systemd notification")
return
}
socketAddr := &net.UnixAddr{
Name: notifySocket,
Net: "unixgram",
}
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
if err != nil {
log.Warn("Failed to connect to systemd", "err", err, "socket", notifySocket)
return
}
defer func() {
if err = conn.Close(); err != nil {
log.Warn("Failed to close connection", "err", err)
}
}()
_, err = conn.Write([]byte(state))
if err != nil {
log.Warn("Failed to write notification to systemd", "err", err)
}
}

View File

@ -60,7 +60,7 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
go func() {
// When the server runs, it will also build and initialize the service graph
if err := env.Server.Run(); err != nil {
if err := env.Server.Run(ctx); err != nil {
t.Log("Server exited uncleanly", "error", err)
}
}()