mirror of
https://github.com/grafana/grafana.git
synced 2025-02-12 08:35:43 -06:00
Chore: Revert dskit service additions (#72608)
This commit is contained in:
parent
f6e836302b
commit
f3ffc850aa
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
@ -68,7 +68,6 @@
|
||||
/pkg/api/ @grafana/backend-platform
|
||||
/pkg/bus/ @grafana/backend-platform
|
||||
/pkg/cmd/ @grafana/backend-platform
|
||||
/pkg/systemd/ @grafana/backend-platform
|
||||
/pkg/components/apikeygen/ @grafana/grafana-authnz-team
|
||||
/pkg/components/satokengen/ @grafana/grafana-authnz-team
|
||||
/pkg/components/dashdiffs/ @grafana/backend-platform
|
||||
|
@ -11,9 +11,8 @@ import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
"sync"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
@ -33,7 +32,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/middleware"
|
||||
"github.com/grafana/grafana/pkg/middleware/csrf"
|
||||
"github.com/grafana/grafana/pkg/middleware/loggermw"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/pluginscdn"
|
||||
"github.com/grafana/grafana/pkg/registry/corekind"
|
||||
@ -105,10 +103,6 @@ import (
|
||||
)
|
||||
|
||||
type HTTPServer struct {
|
||||
// services.NamedService is embedded so we can inherit AddListener, without
|
||||
// implementing the service interface.
|
||||
services.NamedService
|
||||
|
||||
log log.Logger
|
||||
web *web.Mux
|
||||
context context.Context
|
||||
@ -116,7 +110,6 @@ type HTTPServer struct {
|
||||
middlewares []web.Handler
|
||||
namedMiddlewares []routing.RegisterNamedMiddleware
|
||||
bus bus.Bus
|
||||
errs chan error
|
||||
|
||||
pluginContextProvider *plugincontext.Provider
|
||||
RouteRegister routing.RouteRegister
|
||||
@ -358,7 +351,6 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
|
||||
authnService: authnService,
|
||||
pluginsCDNService: pluginsCDNService,
|
||||
starApi: starApi,
|
||||
errs: make(chan error),
|
||||
}
|
||||
if hs.Listener != nil {
|
||||
hs.log.Debug("Using provided listener")
|
||||
@ -371,8 +363,6 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
|
||||
if err := hs.declareFixedRoles(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hs.NamedService = services.NewBasicService(hs.start, hs.running, hs.stop).WithName(modules.HTTPServer)
|
||||
return hs, nil
|
||||
}
|
||||
|
||||
@ -384,7 +374,7 @@ func (hs *HTTPServer) AddNamedMiddleware(middleware routing.RegisterNamedMiddlew
|
||||
hs.namedMiddlewares = append(hs.namedMiddlewares, middleware)
|
||||
}
|
||||
|
||||
func (hs *HTTPServer) start(ctx context.Context) error {
|
||||
func (hs *HTTPServer) Run(ctx context.Context) error {
|
||||
hs.context = ctx
|
||||
|
||||
hs.applyRoutes()
|
||||
@ -416,57 +406,42 @@ func (hs *HTTPServer) start(ctx context.Context) error {
|
||||
hs.log.Info("HTTP Server Listen", "address", listener.Addr().String(), "protocol",
|
||||
hs.Cfg.Protocol, "subUrl", hs.Cfg.AppSubURL, "socket", hs.Cfg.SocketPath)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
// handle http shutdown on server context done
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
<-ctx.Done()
|
||||
if err := hs.httpSrv.Shutdown(context.Background()); err != nil {
|
||||
hs.log.Error("Failed to shutdown server", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
switch hs.Cfg.Protocol {
|
||||
case setting.HTTPScheme, setting.SocketScheme:
|
||||
go func() {
|
||||
if err := hs.httpSrv.Serve(listener); err != nil {
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
hs.log.Debug("server was shutdown gracefully")
|
||||
close(hs.errs)
|
||||
return
|
||||
}
|
||||
hs.errs <- err
|
||||
if err := hs.httpSrv.Serve(listener); err != nil {
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
hs.log.Debug("server was shutdown gracefully")
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
return err
|
||||
}
|
||||
case setting.HTTP2Scheme, setting.HTTPSScheme:
|
||||
go func() {
|
||||
if err := hs.httpSrv.ServeTLS(listener, hs.Cfg.CertFile, hs.Cfg.KeyFile); err != nil {
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
hs.log.Debug("server was shutdown gracefully")
|
||||
close(hs.errs)
|
||||
return
|
||||
}
|
||||
hs.errs <- err
|
||||
if err := hs.httpSrv.ServeTLS(listener, hs.Cfg.CertFile, hs.Cfg.KeyFile); err != nil {
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
hs.log.Debug("server was shutdown gracefully")
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
return err
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("Unhandled protocol %q", hs.Cfg.Protocol))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
func (hs *HTTPServer) running(ctx context.Context) error {
|
||||
select {
|
||||
case err, ok := <-hs.errs:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (hs *HTTPServer) stop(_ error) error {
|
||||
// Create a context with a timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := hs.httpSrv.Shutdown(ctx); err != nil {
|
||||
return fmt.Errorf("failed to shutdown server: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -248,7 +248,7 @@ func RunServer(opt ServerOptions) error {
|
||||
|
||||
go listenToSystemSignals(ctx, s)
|
||||
|
||||
return s.Run(ctx)
|
||||
return s.Run()
|
||||
}
|
||||
|
||||
func validPackaging(packaging string) string {
|
||||
|
@ -1,28 +0,0 @@
|
||||
package modules
|
||||
|
||||
const (
|
||||
// All includes all modules necessary for Grafana to run as a standalone application.
|
||||
All string = "all"
|
||||
// BackgroundServices includes all Grafana services that run in the background
|
||||
BackgroundServices string = "background-services"
|
||||
// CertGenerator generates certificates for grafana-apiserver
|
||||
CertGenerator string = "cert-generator"
|
||||
// GrafanaAPIServer is the Kubertenes API server for Grafana Resources
|
||||
GrafanaAPIServer string = "grafana-apiserver"
|
||||
// HTTPServer is the HTTP server for Grafana
|
||||
HTTPServer string = "http-server"
|
||||
// Provisioning sets up Grafana with preconfigured datasources, dashboards, etc.
|
||||
Provisioning string = "provisioning"
|
||||
// SecretMigrator handles legacy secrets migrations
|
||||
SecretMigrator string = "secret-migrator"
|
||||
)
|
||||
|
||||
// dependencyMap defines Module Targets => Dependencies
|
||||
var dependencyMap = map[string][]string{
|
||||
BackgroundServices: {Provisioning, HTTPServer},
|
||||
CertGenerator: {},
|
||||
GrafanaAPIServer: {CertGenerator},
|
||||
Provisioning: {SecretMigrator},
|
||||
|
||||
All: {BackgroundServices},
|
||||
}
|
@ -21,19 +21,11 @@ func newServiceListener(logger log.Logger, s *service) *serviceListener {
|
||||
}
|
||||
|
||||
func (l *serviceListener) Healthy() {
|
||||
l.log.Info("All modules healthy", "modules", l.moduleNames())
|
||||
l.log.Info("All modules healthy")
|
||||
}
|
||||
|
||||
func (l *serviceListener) Stopped() {
|
||||
l.log.Info("All modules stopped", "modules", l.moduleNames())
|
||||
}
|
||||
|
||||
func (l *serviceListener) moduleNames() []string {
|
||||
var ms []string
|
||||
for m := range l.service.serviceMap {
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms
|
||||
l.log.Info("All modules stopped")
|
||||
}
|
||||
|
||||
func (l *serviceListener) Failure(service services.Service) {
|
||||
@ -43,7 +35,7 @@ func (l *serviceListener) Failure(service services.Service) {
|
||||
}
|
||||
|
||||
// log which module failed
|
||||
for module, s := range l.service.serviceMap {
|
||||
for module, s := range l.service.ServiceMap {
|
||||
if s == service {
|
||||
if errors.Is(service.FailureCase(), modules.ErrStopProcess) {
|
||||
l.log.Info("Received stop signal via return error", "module", module, "err", service.FailureCase())
|
||||
|
@ -3,7 +3,6 @@ package modules
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/dskit/modules"
|
||||
"github.com/grafana/dskit/services"
|
||||
@ -11,19 +10,17 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/systemd"
|
||||
)
|
||||
|
||||
type Engine interface {
|
||||
AwaitHealthy(context.Context) error
|
||||
Init(context.Context) error
|
||||
Run(context.Context) error
|
||||
Shutdown(context.Context) error
|
||||
}
|
||||
|
||||
type Manager interface {
|
||||
RegisterModule(name string, initFn func() (services.Service, error))
|
||||
RegisterInvisibleModule(name string, initFn func() (services.Service, error))
|
||||
RegisterModule(name string, initFn func() (services.Service, error), deps ...string)
|
||||
RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string)
|
||||
}
|
||||
|
||||
var _ Engine = (*service)(nil)
|
||||
@ -31,15 +28,14 @@ var _ Manager = (*service)(nil)
|
||||
|
||||
// service manages the registration and lifecycle of modules.
|
||||
type service struct {
|
||||
cfg *setting.Cfg
|
||||
log log.Logger
|
||||
targets []string
|
||||
cfg *setting.Cfg
|
||||
log log.Logger
|
||||
targets []string
|
||||
dependencyMap map[string][]string
|
||||
|
||||
moduleManager *modules.Manager
|
||||
serviceManager *services.Manager
|
||||
serviceMap map[string]services.Service
|
||||
|
||||
features *featuremgmt.FeatureManager
|
||||
ModuleManager *modules.Manager
|
||||
ServiceManager *services.Manager
|
||||
ServiceMap map[string]services.Service
|
||||
}
|
||||
|
||||
func ProvideService(
|
||||
@ -49,55 +45,41 @@ func ProvideService(
|
||||
logger := log.New("modules")
|
||||
|
||||
return &service{
|
||||
cfg: cfg,
|
||||
log: logger,
|
||||
targets: cfg.Target,
|
||||
cfg: cfg,
|
||||
log: logger,
|
||||
targets: cfg.Target,
|
||||
dependencyMap: map[string][]string{},
|
||||
|
||||
moduleManager: modules.NewManager(logger),
|
||||
serviceMap: map[string]services.Service{},
|
||||
|
||||
features: features,
|
||||
ModuleManager: modules.NewManager(logger),
|
||||
ServiceMap: map[string]services.Service{},
|
||||
}
|
||||
}
|
||||
|
||||
// AwaitHealthy waits for all registered modules to be healthy.
|
||||
func (m *service) AwaitHealthy(ctx context.Context) error {
|
||||
if m.serviceManager == nil {
|
||||
return errors.New("service manager has not been initialized")
|
||||
}
|
||||
return m.serviceManager.AwaitHealthy(ctx)
|
||||
}
|
||||
|
||||
// Init initializes all registered modules.
|
||||
func (m *service) Init(_ context.Context) error {
|
||||
var err error
|
||||
|
||||
if err = m.processFeatureFlags(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.log.Debug("Initializing module manager", "targets", m.targets)
|
||||
for mod, targets := range dependencyMap {
|
||||
if err := m.moduleManager.AddDependency(mod, targets...); err != nil {
|
||||
for mod, targets := range m.dependencyMap {
|
||||
if err := m.ModuleManager.AddDependency(mod, targets...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.serviceMap, err = m.moduleManager.InitModuleServices(m.targets...)
|
||||
m.ServiceMap, err = m.ModuleManager.InitModuleServices(m.targets...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if no modules are registered, we don't need to start the service manager
|
||||
if len(m.serviceMap) == 0 {
|
||||
if len(m.ServiceMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var svcs []services.Service
|
||||
for _, s := range m.serviceMap {
|
||||
for _, s := range m.ServiceMap {
|
||||
svcs = append(svcs, s)
|
||||
}
|
||||
m.serviceManager, err = services.NewManager(svcs...)
|
||||
m.ServiceManager, err = services.NewManager(svcs...)
|
||||
|
||||
return err
|
||||
}
|
||||
@ -107,35 +89,28 @@ func (m *service) Run(ctx context.Context) error {
|
||||
// we don't need to continue if no modules are registered.
|
||||
// this behavior may need to change if dskit services replace the
|
||||
// current background service registry.
|
||||
if len(m.serviceMap) == 0 {
|
||||
if len(m.ServiceMap) == 0 {
|
||||
m.log.Warn("No modules registered...")
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
listener := newServiceListener(m.log, m)
|
||||
m.serviceManager.AddListener(listener)
|
||||
m.ServiceManager.AddListener(listener)
|
||||
|
||||
m.log.Debug("Starting module service manager")
|
||||
// wait until a service fails or stop signal was received
|
||||
err := m.serviceManager.StartAsync(ctx)
|
||||
err := m.ServiceManager.StartAsync(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.serviceManager.AwaitHealthy(ctx)
|
||||
err = m.ServiceManager.AwaitStopped(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
systemd.NotifyReady(m.log)
|
||||
|
||||
err = m.serviceManager.AwaitStopped(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
failed := m.serviceManager.ServicesByState()[services.Failed]
|
||||
failed := m.ServiceManager.ServicesByState()[services.Failed]
|
||||
for _, f := range failed {
|
||||
// the service listener will log error details for all modules that failed,
|
||||
// so here we return the first error that is not ErrStopProcess
|
||||
@ -149,55 +124,29 @@ func (m *service) Run(ctx context.Context) error {
|
||||
|
||||
// Shutdown stops all modules and waits for them to stop.
|
||||
func (m *service) Shutdown(ctx context.Context) error {
|
||||
if m.serviceManager == nil {
|
||||
if m.ServiceManager == nil {
|
||||
m.log.Debug("No modules registered, nothing to stop...")
|
||||
return nil
|
||||
}
|
||||
m.serviceManager.StopAsync()
|
||||
m.ServiceManager.StopAsync()
|
||||
m.log.Info("Awaiting services to be stopped...")
|
||||
return m.serviceManager.AwaitStopped(ctx)
|
||||
return m.ServiceManager.AwaitStopped(ctx)
|
||||
}
|
||||
|
||||
// RegisterModule registers a module with the dskit module manager.
|
||||
func (m *service) RegisterModule(name string, initFn func() (services.Service, error)) {
|
||||
m.moduleManager.RegisterModule(name, initFn)
|
||||
func (m *service) RegisterModule(name string, initFn func() (services.Service, error), deps ...string) {
|
||||
m.ModuleManager.RegisterModule(name, initFn)
|
||||
m.dependencyMap[name] = deps
|
||||
}
|
||||
|
||||
// RegisterInvisibleModule registers an invisible module with the dskit module manager.
|
||||
// Invisible modules are not visible to the user, and are intended to be used as dependencies.
|
||||
func (m *service) RegisterInvisibleModule(name string, initFn func() (services.Service, error)) {
|
||||
m.moduleManager.RegisterModule(name, initFn, modules.UserInvisibleModule)
|
||||
// Invisible modules are not visible to the user, and are intendent to be used as dependencies.
|
||||
func (m *service) RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string) {
|
||||
m.ModuleManager.RegisterModule(name, initFn, modules.UserInvisibleModule)
|
||||
m.dependencyMap[name] = deps
|
||||
}
|
||||
|
||||
// IsModuleEnabled returns true if the module is enabled.
|
||||
func (m *service) IsModuleEnabled(name string) bool {
|
||||
return stringsContain(m.targets, name)
|
||||
}
|
||||
|
||||
// processFeatureFlags adds or removes targets based on feature flags.
|
||||
func (m *service) processFeatureFlags() error {
|
||||
// add GrafanaAPIServer to targets if feature is enabled
|
||||
if m.features.IsEnabled(featuremgmt.FlagGrafanaAPIServer) {
|
||||
m.targets = append(m.targets, GrafanaAPIServer)
|
||||
}
|
||||
|
||||
if !m.features.IsEnabled(featuremgmt.FlagGrafanaAPIServer) {
|
||||
// error if GrafanaAPIServer is in targets
|
||||
for _, t := range m.targets {
|
||||
if t == GrafanaAPIServer {
|
||||
return fmt.Errorf("feature flag %s is disabled, but target %s is still enabled", featuremgmt.FlagGrafanaAPIServer, GrafanaAPIServer)
|
||||
}
|
||||
}
|
||||
|
||||
// error if GrafanaAPIServer is a dependency of a target
|
||||
for parent, targets := range dependencyMap {
|
||||
for _, t := range targets {
|
||||
if t == GrafanaAPIServer && m.IsModuleEnabled(parent) {
|
||||
return fmt.Errorf("feature flag %s is disabled, but target %s is enabled with dependency on %s", featuremgmt.FlagGrafanaAPIServer, parent, GrafanaAPIServer)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1,63 +0,0 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry/backgroundsvcs"
|
||||
grafanaapiserver "github.com/grafana/grafana/pkg/services/grafana-apiserver"
|
||||
"github.com/grafana/grafana/pkg/services/provisioning"
|
||||
"github.com/grafana/grafana/pkg/services/secrets/kvstore/migrations"
|
||||
)
|
||||
|
||||
type Registry interface{}
|
||||
|
||||
type registry struct {
|
||||
moduleManager modules.Manager
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func ProvideRegistry(
|
||||
moduleManager modules.Manager,
|
||||
apiServer grafanaapiserver.Service,
|
||||
backgroundServiceRunner *backgroundsvcs.BackgroundServiceRunner,
|
||||
certGenerator certgenerator.ServiceInterface,
|
||||
httpServer *api.HTTPServer,
|
||||
provisioningService *provisioning.ProvisioningServiceImpl,
|
||||
secretsMigrator *migrations.SecretMigrationProviderImpl,
|
||||
) *registry {
|
||||
return newRegistry(
|
||||
log.New("modules.registry"),
|
||||
moduleManager,
|
||||
apiServer,
|
||||
backgroundServiceRunner,
|
||||
certGenerator,
|
||||
httpServer,
|
||||
provisioningService,
|
||||
secretsMigrator,
|
||||
)
|
||||
}
|
||||
|
||||
func newRegistry(logger log.Logger, moduleManager modules.Manager, svcs ...services.NamedService) *registry {
|
||||
r := ®istry{
|
||||
log: logger,
|
||||
moduleManager: moduleManager,
|
||||
}
|
||||
|
||||
// Register (invisible) modules which act solely as dependencies to module targets
|
||||
for _, svc := range svcs {
|
||||
s := svc
|
||||
logger.Debug("Registering invisible module", "name", s.ServiceName())
|
||||
r.moduleManager.RegisterInvisibleModule(s.ServiceName(), func() (services.Service, error) {
|
||||
return s, nil
|
||||
})
|
||||
}
|
||||
|
||||
logger.Debug("Registering module", "name", modules.All)
|
||||
r.moduleManager.RegisterModule(modules.All, nil)
|
||||
|
||||
return r
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
)
|
||||
|
||||
func TestNewRegistry(t *testing.T) {
|
||||
var registeredInvisibleModules []string
|
||||
var registeredModules []string
|
||||
|
||||
moduleManager := &modules.MockModuleManager{
|
||||
RegisterModuleFunc: func(name string, initFn func() (services.Service, error)) {
|
||||
registeredModules = append(registeredModules, name)
|
||||
},
|
||||
RegisterInvisibleModuleFunc: func(name string, initFn func() (services.Service, error)) {
|
||||
registeredInvisibleModules = append(registeredInvisibleModules, name)
|
||||
},
|
||||
}
|
||||
|
||||
mockSvcName := "test-registry"
|
||||
mockSvc := modules.NewMockNamedService(mockSvcName)
|
||||
|
||||
r := newRegistry(log.New("modules.registry"), moduleManager, mockSvc)
|
||||
require.NotNil(t, r)
|
||||
require.Equal(t, []string{mockSvcName}, registeredInvisibleModules)
|
||||
require.Equal(t, []string{modules.All}, registeredModules)
|
||||
}
|
@ -1,8 +0,0 @@
|
||||
package registry
|
||||
|
||||
import "github.com/google/wire"
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideRegistry,
|
||||
wire.Bind(new(Registry), new(*registry)),
|
||||
)
|
@ -1,66 +1,5 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
)
|
||||
|
||||
var _ Manager = (*MockModuleManager)(nil)
|
||||
var _ Engine = (*MockModuleEngine)(nil)
|
||||
|
||||
type MockModuleManager struct {
|
||||
RegisterModuleFunc func(name string, initFn func() (services.Service, error))
|
||||
RegisterInvisibleModuleFunc func(name string, initFn func() (services.Service, error))
|
||||
}
|
||||
|
||||
func (m *MockModuleManager) RegisterModule(name string, initFn func() (services.Service, error)) {
|
||||
if m.RegisterModuleFunc != nil {
|
||||
m.RegisterModuleFunc(name, initFn)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockModuleManager) RegisterInvisibleModule(name string, initFn func() (services.Service, error)) {
|
||||
if m.RegisterInvisibleModuleFunc != nil {
|
||||
m.RegisterInvisibleModuleFunc(name, initFn)
|
||||
}
|
||||
}
|
||||
|
||||
type MockModuleEngine struct {
|
||||
AwaitHealthyFunc func(context.Context) error
|
||||
InitFunc func(context.Context) error
|
||||
RunFunc func(context.Context) error
|
||||
ShutdownFunc func(context.Context) error
|
||||
}
|
||||
|
||||
func (m *MockModuleEngine) AwaitHealthy(ctx context.Context) error {
|
||||
if m.AwaitHealthyFunc != nil {
|
||||
return m.AwaitHealthyFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleEngine) Init(ctx context.Context) error {
|
||||
if m.InitFunc != nil {
|
||||
return m.InitFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleEngine) Run(ctx context.Context) error {
|
||||
if m.RunFunc != nil {
|
||||
return m.RunFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleEngine) Shutdown(ctx context.Context) error {
|
||||
if m.ShutdownFunc != nil {
|
||||
return m.ShutdownFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func stringsContain(values []string, search string) bool {
|
||||
for _, v := range values {
|
||||
if search == v {
|
||||
@ -70,14 +9,3 @@ func stringsContain(values []string, search string) bool {
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
type MockNamedService struct {
|
||||
*services.BasicService
|
||||
}
|
||||
|
||||
func NewMockNamedService(name string) *MockNamedService {
|
||||
startFn := func(_ context.Context) error { return nil }
|
||||
return &MockNamedService{
|
||||
BasicService: services.NewIdleService(startFn, nil).WithName(name),
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
)
|
||||
import "github.com/google/wire"
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideService,
|
||||
|
@ -1,6 +1,7 @@
|
||||
package backgroundsvcs
|
||||
|
||||
import (
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/remotecache"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@ -28,6 +29,7 @@ import (
|
||||
publicdashboardsmetric "github.com/grafana/grafana/pkg/services/publicdashboards/metric"
|
||||
"github.com/grafana/grafana/pkg/services/rendering"
|
||||
"github.com/grafana/grafana/pkg/services/searchV2"
|
||||
secretsMigrations "github.com/grafana/grafana/pkg/services/secrets/kvstore/migrations"
|
||||
secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
|
||||
"github.com/grafana/grafana/pkg/services/serviceaccounts"
|
||||
samanager "github.com/grafana/grafana/pkg/services/serviceaccounts/manager"
|
||||
@ -39,7 +41,7 @@ import (
|
||||
)
|
||||
|
||||
func ProvideBackgroundServiceRegistry(
|
||||
ng *ngalert.AlertNG, cleanup *cleanup.CleanUpService, live *live.GrafanaLive,
|
||||
httpServer *api.HTTPServer, ng *ngalert.AlertNG, cleanup *cleanup.CleanUpService, live *live.GrafanaLive,
|
||||
pushGateway *pushhttp.Gateway, notifications *notifications.NotificationService, processManager *process.Manager,
|
||||
rendering *rendering.RenderingService, tokenService auth.UserTokenBackgroundService, tracing tracing.Tracer,
|
||||
provisioning *provisioning.ProvisioningServiceImpl, alerting *alerting.AlertEngine, usageStats *uss.UsageStats,
|
||||
@ -47,7 +49,7 @@ func ProvideBackgroundServiceRegistry(
|
||||
pluginsUpdateChecker *updatechecker.PluginsService, metrics *metrics.InternalMetricsService,
|
||||
secretsService *secretsManager.SecretsService, remoteCache *remotecache.RemoteCache, StorageService store.StorageService, searchService searchV2.SearchService, entityEventsService store.EntityEventsService,
|
||||
saService *samanager.ServiceAccountsService, authInfoService *authinfoservice.Implementation,
|
||||
grpcServerProvider grpcserver.Provider, loginAttemptService *loginattemptimpl.Service,
|
||||
grpcServerProvider grpcserver.Provider, secretMigrationProvider secretsMigrations.SecretMigrationProvider, loginAttemptService *loginattemptimpl.Service,
|
||||
bundleService *supportbundlesimpl.Service,
|
||||
publicDashboardsMetric *publicdashboardsmetric.Service,
|
||||
keyRetriever *dynamic.KeyRetriever,
|
||||
@ -59,6 +61,7 @@ func ProvideBackgroundServiceRegistry(
|
||||
_ *grpcserver.HealthService, _ entity.EntityStoreServer, _ *grpcserver.ReflectionService, _ *ldapapi.Service,
|
||||
) *BackgroundServiceRegistry {
|
||||
return NewBackgroundServiceRegistry(
|
||||
httpServer,
|
||||
ng,
|
||||
cleanup,
|
||||
live,
|
||||
@ -83,6 +86,7 @@ func ProvideBackgroundServiceRegistry(
|
||||
saService,
|
||||
authInfoService,
|
||||
processManager,
|
||||
secretMigrationProvider,
|
||||
loginAttemptService,
|
||||
bundleService,
|
||||
publicDashboardsMetric,
|
||||
@ -93,13 +97,13 @@ func ProvideBackgroundServiceRegistry(
|
||||
|
||||
// BackgroundServiceRegistry provides background services.
|
||||
type BackgroundServiceRegistry struct {
|
||||
services []registry.BackgroundService
|
||||
Services []registry.BackgroundService
|
||||
}
|
||||
|
||||
func NewBackgroundServiceRegistry(s ...registry.BackgroundService) *BackgroundServiceRegistry {
|
||||
return &BackgroundServiceRegistry{services: s}
|
||||
func NewBackgroundServiceRegistry(services ...registry.BackgroundService) *BackgroundServiceRegistry {
|
||||
return &BackgroundServiceRegistry{services}
|
||||
}
|
||||
|
||||
func (r *BackgroundServiceRegistry) GetServices() []registry.BackgroundService {
|
||||
return r.services
|
||||
return r.Services
|
||||
}
|
||||
|
@ -1,13 +0,0 @@
|
||||
package backgroundsvcs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBackgroundServiceRegistry_GetServices(t *testing.T) {
|
||||
s := NewBackgroundServiceRegistry(newTestService("A", nil, false), newTestService("B", errors.New("boom"), false))
|
||||
require.Len(t, s.GetServices(), 2)
|
||||
}
|
@ -1,64 +0,0 @@
|
||||
package backgroundsvcs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
)
|
||||
|
||||
// BackgroundServiceRunner provides a runner for background services.
|
||||
type BackgroundServiceRunner struct {
|
||||
*services.BasicService
|
||||
registry registry.BackgroundServiceRegistry
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func ProvideBackgroundServiceRunner(registry registry.BackgroundServiceRegistry) *BackgroundServiceRunner {
|
||||
r := &BackgroundServiceRunner{registry: registry, log: log.New("background-services-runner")}
|
||||
r.BasicService = services.NewBasicService(nil, r.run, nil).WithName(modules.BackgroundServices)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *BackgroundServiceRunner) run(ctx context.Context) error {
|
||||
childRoutines, childCtx := errgroup.WithContext(ctx)
|
||||
|
||||
// Start background services.
|
||||
for _, svc := range r.registry.GetServices() {
|
||||
if registry.IsDisabled(svc) {
|
||||
continue
|
||||
}
|
||||
|
||||
service := svc
|
||||
serviceName := reflect.TypeOf(service).String()
|
||||
childRoutines.Go(func() error {
|
||||
select {
|
||||
case <-childCtx.Done():
|
||||
return childCtx.Err()
|
||||
default:
|
||||
}
|
||||
r.log.Debug("Starting background service", "service", serviceName)
|
||||
err := service.Run(childCtx)
|
||||
// Do not return context.Canceled error since errgroup.Group only
|
||||
// returns the first error to the caller - thus we can miss a more
|
||||
// interesting error.
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
r.log.Error("Stopped background service", "service", serviceName, "reason", err)
|
||||
return fmt.Errorf("%s run error: %w", serviceName, err)
|
||||
}
|
||||
r.log.Debug("Stopped background service", "service", serviceName, "reason", err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
r.log.Debug("Waiting on services...")
|
||||
return childRoutines.Wait()
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
package backgroundsvcs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBackgroundServiceRunner_Run_Error(t *testing.T) {
|
||||
testErr := errors.New("boom")
|
||||
registry := NewBackgroundServiceRegistry(newTestService("A", nil, false), newTestService("B", testErr, false))
|
||||
r := ProvideBackgroundServiceRunner(registry)
|
||||
|
||||
err := r.run(context.Background())
|
||||
require.ErrorIs(t, err, testErr)
|
||||
}
|
||||
|
||||
type testBackgroundService struct {
|
||||
name string
|
||||
started chan struct{}
|
||||
runErr error
|
||||
isDisabled bool
|
||||
}
|
||||
|
||||
func newTestService(name string, runErr error, disabled bool) *testBackgroundService {
|
||||
return &testBackgroundService{
|
||||
name: name,
|
||||
started: make(chan struct{}),
|
||||
runErr: runErr,
|
||||
isDisabled: disabled,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testBackgroundService) Run(ctx context.Context) error {
|
||||
if s.isDisabled {
|
||||
return fmt.Errorf("shouldn't run disabled service")
|
||||
}
|
||||
|
||||
if s.runErr != nil {
|
||||
return s.runErr
|
||||
}
|
||||
close(s.started)
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (s *testBackgroundService) IsDisabled() bool {
|
||||
return s.isDisabled
|
||||
}
|
@ -7,18 +7,21 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
_ "github.com/grafana/grafana/pkg/extensions"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/usagestats/statscollector"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
moduleRegistry "github.com/grafana/grafana/pkg/modules/registry"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/provisioning"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@ -34,17 +37,17 @@ type Options struct {
|
||||
|
||||
// New returns a new instance of Server.
|
||||
func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
|
||||
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
|
||||
usageStatsProvidersRegistry registry.UsageStatsProvidersRegistry, statsCollectorService *statscollector.Service,
|
||||
moduleService modules.Engine,
|
||||
_ moduleRegistry.Registry, // imported to invoke initialization via Wire
|
||||
) (*Server, error) {
|
||||
statsCollectorService.RegisterProviders(usageStatsProvidersRegistry.GetServices())
|
||||
s, err := newServer(opts, cfg, httpServer, roleRegistry, moduleService)
|
||||
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider, moduleService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = s.init(context.Background()); err != nil {
|
||||
if err := s.init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -52,23 +55,38 @@ func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistr
|
||||
}
|
||||
|
||||
func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
|
||||
moduleService modules.Engine) (*Server, error) {
|
||||
return &Server{
|
||||
HTTPServer: httpServer,
|
||||
roleRegistry: roleRegistry,
|
||||
shutdownFinished: make(chan struct{}),
|
||||
log: log.New("server"),
|
||||
cfg: cfg,
|
||||
pidFile: opts.PidFile,
|
||||
version: opts.Version,
|
||||
commit: opts.Commit,
|
||||
buildBranch: opts.BuildBranch,
|
||||
moduleService: moduleService,
|
||||
}, nil
|
||||
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
|
||||
moduleService modules.Engine,
|
||||
) (*Server, error) {
|
||||
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
||||
childRoutines, childCtx := errgroup.WithContext(rootCtx)
|
||||
|
||||
s := &Server{
|
||||
context: childCtx,
|
||||
childRoutines: childRoutines,
|
||||
HTTPServer: httpServer,
|
||||
provisioningService: provisioningService,
|
||||
roleRegistry: roleRegistry,
|
||||
shutdownFn: shutdownFn,
|
||||
shutdownFinished: make(chan struct{}),
|
||||
log: log.New("server"),
|
||||
cfg: cfg,
|
||||
pidFile: opts.PidFile,
|
||||
version: opts.Version,
|
||||
commit: opts.Commit,
|
||||
buildBranch: opts.BuildBranch,
|
||||
backgroundServices: backgroundServiceProvider.GetServices(),
|
||||
moduleService: moduleService,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Server is responsible for managing the lifecycle of services.
|
||||
type Server struct {
|
||||
context context.Context
|
||||
shutdownFn context.CancelFunc
|
||||
childRoutines *errgroup.Group
|
||||
log log.Logger
|
||||
cfg *setting.Cfg
|
||||
shutdownOnce sync.Once
|
||||
@ -76,18 +94,20 @@ type Server struct {
|
||||
isInitialized bool
|
||||
mtx sync.Mutex
|
||||
|
||||
pidFile string
|
||||
version string
|
||||
commit string
|
||||
buildBranch string
|
||||
pidFile string
|
||||
version string
|
||||
commit string
|
||||
buildBranch string
|
||||
backgroundServices []registry.BackgroundService
|
||||
|
||||
HTTPServer *api.HTTPServer
|
||||
roleRegistry accesscontrol.RoleRegistry
|
||||
moduleService modules.Engine
|
||||
HTTPServer *api.HTTPServer
|
||||
roleRegistry accesscontrol.RoleRegistry
|
||||
provisioningService provisioning.ProvisioningService
|
||||
moduleService modules.Engine
|
||||
}
|
||||
|
||||
// init initializes the server and its services.
|
||||
func (s *Server) init(ctx context.Context) error {
|
||||
func (s *Server) init() error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
@ -101,7 +121,7 @@ func (s *Server) init(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Initialize dskit modules.
|
||||
if err := s.moduleService.Init(ctx); err != nil {
|
||||
if err := s.moduleService.Init(s.context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -109,28 +129,65 @@ func (s *Server) init(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.roleRegistry.RegisterFixedRoles(ctx)
|
||||
}
|
||||
if err := s.roleRegistry.RegisterFixedRoles(s.context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// AwaitHealthy waits for the server to become healthy.
|
||||
func (s *Server) AwaitHealthy(ctx context.Context) error {
|
||||
return s.moduleService.AwaitHealthy(ctx)
|
||||
return s.provisioningService.RunInitProvisioners(s.context)
|
||||
}
|
||||
|
||||
// Run initializes and starts services. This will block until all services have
|
||||
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
func (s *Server) Run() error {
|
||||
defer close(s.shutdownFinished)
|
||||
|
||||
if err := s.init(ctx); err != nil {
|
||||
if err := s.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := s.moduleService.Run(ctx)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
return err
|
||||
// Start dskit modules.
|
||||
s.childRoutines.Go(func() error {
|
||||
err := s.moduleService.Run(s.context)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
services := s.backgroundServices
|
||||
|
||||
// Start background services.
|
||||
for _, svc := range services {
|
||||
if registry.IsDisabled(svc) {
|
||||
continue
|
||||
}
|
||||
|
||||
service := svc
|
||||
serviceName := reflect.TypeOf(service).String()
|
||||
s.childRoutines.Go(func() error {
|
||||
select {
|
||||
case <-s.context.Done():
|
||||
return s.context.Err()
|
||||
default:
|
||||
}
|
||||
s.log.Debug("Starting background service", "service", serviceName)
|
||||
err := service.Run(s.context)
|
||||
// Do not return context.Canceled error since errgroup.Group only
|
||||
// returns the first error to the caller - thus we can miss a more
|
||||
// interesting error.
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
s.log.Error("Stopped background service", "service", serviceName, "reason", err)
|
||||
return fmt.Errorf("%s run error: %w", serviceName, err)
|
||||
}
|
||||
s.log.Debug("Stopped background service", "service", serviceName, "reason", err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return nil
|
||||
|
||||
s.notifySystemd("READY=1")
|
||||
|
||||
s.log.Debug("Waiting on services...")
|
||||
return s.childRoutines.Wait()
|
||||
}
|
||||
|
||||
// Shutdown initiates Grafana graceful shutdown. This shuts down all
|
||||
@ -140,9 +197,11 @@ func (s *Server) Shutdown(ctx context.Context, reason string) error {
|
||||
var err error
|
||||
s.shutdownOnce.Do(func() {
|
||||
s.log.Info("Shutdown started", "reason", reason)
|
||||
if err = s.moduleService.Shutdown(ctx); err != nil {
|
||||
if err := s.moduleService.Shutdown(ctx); err != nil {
|
||||
s.log.Error("Failed to shutdown modules", "error", err)
|
||||
}
|
||||
// Call cancel func to stop background services.
|
||||
s.shutdownFn()
|
||||
// Wait for server to shut down
|
||||
select {
|
||||
case <-s.shutdownFinished:
|
||||
@ -179,3 +238,33 @@ func (s *Server) writePIDFile() error {
|
||||
s.log.Info("Writing PID file", "path", s.pidFile, "pid", pid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// notifySystemd sends state notifications to systemd.
|
||||
func (s *Server) notifySystemd(state string) {
|
||||
notifySocket := os.Getenv("NOTIFY_SOCKET")
|
||||
if notifySocket == "" {
|
||||
s.log.Debug(
|
||||
"NOTIFY_SOCKET environment variable empty or unset, can't send systemd notification")
|
||||
return
|
||||
}
|
||||
|
||||
socketAddr := &net.UnixAddr{
|
||||
Name: notifySocket,
|
||||
Net: "unixgram",
|
||||
}
|
||||
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
|
||||
if err != nil {
|
||||
s.log.Warn("Failed to connect to systemd", "err", err, "socket", notifySocket)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
s.log.Warn("Failed to close connection", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = conn.Write([]byte(state))
|
||||
if err != nil {
|
||||
s.log.Warn("Failed to write notification to systemd", "err", err)
|
||||
}
|
||||
}
|
||||
|
@ -3,19 +3,52 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/registry/backgroundsvcs"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func testServer(t *testing.T, m *modules.MockModuleEngine) *Server {
|
||||
type testService struct {
|
||||
started chan struct{}
|
||||
runErr error
|
||||
isDisabled bool
|
||||
}
|
||||
|
||||
func newTestService(runErr error, disabled bool) *testService {
|
||||
return &testService{
|
||||
started: make(chan struct{}),
|
||||
runErr: runErr,
|
||||
isDisabled: disabled,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testService) Run(ctx context.Context) error {
|
||||
if s.isDisabled {
|
||||
return fmt.Errorf("Shouldn't run disabled service")
|
||||
}
|
||||
|
||||
if s.runErr != nil {
|
||||
return s.runErr
|
||||
}
|
||||
close(s.started)
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (s *testService) IsDisabled() bool {
|
||||
return s.isDisabled
|
||||
}
|
||||
|
||||
func testServer(t *testing.T, services ...registry.BackgroundService) *Server {
|
||||
t.Helper()
|
||||
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, m)
|
||||
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...), &MockModuleService{})
|
||||
require.NoError(t, err)
|
||||
// Required to skip configuration initialization that causes
|
||||
// DI errors in this test.
|
||||
@ -25,68 +58,62 @@ func testServer(t *testing.T, m *modules.MockModuleEngine) *Server {
|
||||
|
||||
func TestServer_Run_Error(t *testing.T) {
|
||||
testErr := errors.New("boom")
|
||||
|
||||
t.Run("Modules Run error bubbles up", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
s := testServer(t, &modules.MockModuleEngine{
|
||||
RunFunc: func(c context.Context) error {
|
||||
require.Equal(t, ctx, c)
|
||||
return testErr
|
||||
},
|
||||
})
|
||||
|
||||
err := s.Run(ctx)
|
||||
require.ErrorIs(t, err, testErr)
|
||||
})
|
||||
s := testServer(t, newTestService(nil, false), newTestService(testErr, false))
|
||||
err := s.Run()
|
||||
require.ErrorIs(t, err, testErr)
|
||||
}
|
||||
|
||||
func TestServer_Shutdown(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
modulesShutdown := false
|
||||
s := testServer(t, &modules.MockModuleEngine{
|
||||
ShutdownFunc: func(_ context.Context) error {
|
||||
modulesShutdown = true
|
||||
return nil
|
||||
},
|
||||
})
|
||||
s := testServer(t, newTestService(nil, false), newTestService(nil, true))
|
||||
|
||||
ch := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
|
||||
// Wait until all services launched.
|
||||
for _, svc := range s.backgroundServices {
|
||||
if !svc.(*testService).isDisabled {
|
||||
<-svc.(*testService).started
|
||||
}
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
err := s.Shutdown(ctx, "test interrupt")
|
||||
ch <- err
|
||||
}()
|
||||
err := s.Run(ctx)
|
||||
err := s.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = <-ch
|
||||
require.NoError(t, err)
|
||||
require.True(t, modulesShutdown)
|
||||
|
||||
t.Run("Modules Shutdown error bubbles up", func(t *testing.T) {
|
||||
testErr := errors.New("boom")
|
||||
|
||||
s = testServer(t, &modules.MockModuleEngine{
|
||||
ShutdownFunc: func(_ context.Context) error {
|
||||
return testErr
|
||||
},
|
||||
})
|
||||
|
||||
ch = make(chan error)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
err = s.Shutdown(ctx, "test interrupt")
|
||||
ch <- err
|
||||
}()
|
||||
err = s.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = <-ch
|
||||
require.ErrorIs(t, err, testErr)
|
||||
})
|
||||
}
|
||||
|
||||
type MockModuleService struct {
|
||||
initFunc func(context.Context) error
|
||||
runFunc func(context.Context) error
|
||||
shutdownFunc func(context.Context) error
|
||||
}
|
||||
|
||||
func (m *MockModuleService) Init(ctx context.Context) error {
|
||||
if m.initFunc != nil {
|
||||
return m.initFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleService) Run(ctx context.Context) error {
|
||||
if m.runFunc != nil {
|
||||
return m.runFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleService) Shutdown(ctx context.Context) error {
|
||||
if m.shutdownFunc != nil {
|
||||
return m.shutdownFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -31,9 +31,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/middleware/csrf"
|
||||
"github.com/grafana/grafana/pkg/middleware/loggermw"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
moduleRegistry "github.com/grafana/grafana/pkg/modules/registry"
|
||||
pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards"
|
||||
"github.com/grafana/grafana/pkg/registry/backgroundsvcs"
|
||||
"github.com/grafana/grafana/pkg/registry/corekind"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
|
||||
@ -363,8 +361,6 @@ var wireBasicSet = wire.NewSet(
|
||||
wire.Bind(new(oauthserver.OAuth2Server), new(*oasimpl.OAuth2ServiceImpl)),
|
||||
loggermw.Provide,
|
||||
modules.WireSet,
|
||||
moduleRegistry.WireSet,
|
||||
backgroundsvcs.ProvideBackgroundServiceRunner,
|
||||
signingkeysimpl.ProvideEmbeddedSigningKeysService,
|
||||
wire.Bind(new(signingkeys.Service), new(*signingkeysimpl.Service)),
|
||||
)
|
||||
|
@ -3,12 +3,11 @@ package certgenerator
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
|
||||
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
|
||||
)
|
||||
|
||||
func ProvideService(cfg *setting.Cfg) (*certgenerator.Service, error) {
|
||||
return certgenerator.CreateService(modules.CertGenerator, filepath.Join(cfg.DataPath, "k8s"))
|
||||
return certgenerator.CreateService("cert-generator", filepath.Join(cfg.DataPath, "k8s"))
|
||||
}
|
||||
|
@ -24,10 +24,10 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/middleware"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/web"
|
||||
@ -69,7 +69,7 @@ func ProvideService(cfg *setting.Cfg, rr routing.RouteRegister) (*service, error
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName(modules.GrafanaAPIServer)
|
||||
s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName("grafana-apiserver")
|
||||
|
||||
s.rr.Group("/k8s", func(k8sRoute routing.RouteRegister) {
|
||||
handler := func(c *contextmodel.ReqContext) {
|
||||
|
@ -6,12 +6,10 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
plugifaces "github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/alerting"
|
||||
"github.com/grafana/grafana/pkg/services/correlations"
|
||||
@ -54,7 +52,7 @@ func ProvideService(
|
||||
secrectService secrets.Service,
|
||||
orgService org.Service,
|
||||
) (*ProvisioningServiceImpl, error) {
|
||||
ps := &ProvisioningServiceImpl{
|
||||
s := &ProvisioningServiceImpl{
|
||||
Cfg: cfg,
|
||||
SQLStore: sqlStore,
|
||||
ac: ac,
|
||||
@ -78,14 +76,12 @@ func ProvideService(
|
||||
log: log.New("provisioning"),
|
||||
orgService: orgService,
|
||||
}
|
||||
|
||||
ps.BasicService = services.NewBasicService(ps.RunInitProvisioners, ps.Run, nil).WithName(modules.Provisioning)
|
||||
|
||||
return ps, nil
|
||||
return s, nil
|
||||
}
|
||||
|
||||
type ProvisioningService interface {
|
||||
services.NamedService
|
||||
registry.BackgroundService
|
||||
RunInitProvisioners(ctx context.Context) error
|
||||
ProvisionDatasources(ctx context.Context) error
|
||||
ProvisionPlugins(ctx context.Context) error
|
||||
ProvisionNotifications(ctx context.Context) error
|
||||
@ -93,21 +89,18 @@ type ProvisioningService interface {
|
||||
ProvisionAlerting(ctx context.Context) error
|
||||
GetDashboardProvisionerResolvedPath(name string) string
|
||||
GetAllowUIUpdatesFromConfig(name string) bool
|
||||
RunInitProvisioners(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Add a public constructor for overriding service to be able to instantiate OSS as fallback
|
||||
func NewProvisioningServiceImpl() *ProvisioningServiceImpl {
|
||||
logger := log.New("provisioning")
|
||||
ps := &ProvisioningServiceImpl{
|
||||
return &ProvisioningServiceImpl{
|
||||
log: logger,
|
||||
newDashboardProvisioner: dashboards.New,
|
||||
provisionNotifiers: notifiers.Provision,
|
||||
provisionDatasources: datasources.Provision,
|
||||
provisionPlugins: plugins.Provision,
|
||||
}
|
||||
ps.BasicService = services.NewBasicService(ps.RunInitProvisioners, ps.Run, nil).WithName(modules.Provisioning)
|
||||
return ps
|
||||
}
|
||||
|
||||
// Used for testing purposes
|
||||
@ -117,20 +110,16 @@ func newProvisioningServiceImpl(
|
||||
provisionDatasources func(context.Context, string, datasources.Store, datasources.CorrelationsStore, org.Service) error,
|
||||
provisionPlugins func(context.Context, string, plugifaces.Store, pluginsettings.Service, org.Service) error,
|
||||
) *ProvisioningServiceImpl {
|
||||
ps := &ProvisioningServiceImpl{
|
||||
return &ProvisioningServiceImpl{
|
||||
log: log.New("provisioning"),
|
||||
newDashboardProvisioner: newDashboardProvisioner,
|
||||
provisionNotifiers: provisionNotifiers,
|
||||
provisionDatasources: provisionDatasources,
|
||||
provisionPlugins: provisionPlugins,
|
||||
}
|
||||
ps.BasicService = services.NewBasicService(ps.RunInitProvisioners, ps.Run, nil).WithName(modules.Provisioning)
|
||||
return ps
|
||||
}
|
||||
|
||||
type ProvisioningServiceImpl struct {
|
||||
*services.BasicService
|
||||
|
||||
Cfg *setting.Cfg
|
||||
SQLStore db.DB
|
||||
orgService org.Service
|
||||
@ -208,10 +197,8 @@ func (ps *ProvisioningServiceImpl) Run(ctx context.Context) error {
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
// Root server context was cancelled so cancel polling and leave.
|
||||
ps.mutex.Lock()
|
||||
ps.cancelPolling()
|
||||
ps.mutex.Unlock()
|
||||
return nil
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,6 @@
|
||||
package provisioning
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
)
|
||||
import "context"
|
||||
|
||||
type Calls struct {
|
||||
RunInitProvisioners []interface{}
|
||||
@ -21,7 +15,6 @@ type Calls struct {
|
||||
}
|
||||
|
||||
type ProvisioningServiceMock struct {
|
||||
*services.BasicService
|
||||
Calls *Calls
|
||||
RunInitProvisionersFunc func(ctx context.Context) error
|
||||
ProvisionDatasourcesFunc func(ctx context.Context) error
|
||||
@ -34,11 +27,9 @@ type ProvisioningServiceMock struct {
|
||||
}
|
||||
|
||||
func NewProvisioningServiceMock(ctx context.Context) *ProvisioningServiceMock {
|
||||
s := &ProvisioningServiceMock{
|
||||
return &ProvisioningServiceMock{
|
||||
Calls: &Calls{},
|
||||
}
|
||||
s.BasicService = services.NewBasicService(s.RunInitProvisioners, s.Run, nil).WithName(modules.Provisioning)
|
||||
return s
|
||||
}
|
||||
|
||||
func (mock *ProvisioningServiceMock) RunInitProvisioners(ctx context.Context) error {
|
||||
|
@ -40,7 +40,7 @@ func TestProvisioningServiceImpl(t *testing.T) {
|
||||
serviceTest.waitForStop()
|
||||
|
||||
assert.False(t, serviceTest.serviceRunning, "Service should not be running")
|
||||
assert.Nil(t, serviceTest.serviceError, "Service should not return canceled error")
|
||||
assert.Equal(t, context.Canceled, serviceTest.serviceError, "Service should have returned canceled error")
|
||||
})
|
||||
|
||||
t.Run("Failed reloading does not stop polling with old provisioned", func(t *testing.T) {
|
||||
|
@ -5,11 +5,9 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@ -23,21 +21,15 @@ type SecretMigrationService interface {
|
||||
}
|
||||
|
||||
type SecretMigrationProvider interface {
|
||||
registry.BackgroundService
|
||||
TriggerPluginMigration(ctx context.Context, toPlugin bool) error
|
||||
}
|
||||
|
||||
type SecretMigrationProviderImpl struct {
|
||||
migServices []SecretMigrationService
|
||||
services []SecretMigrationService
|
||||
ServerLockService *serverlock.ServerLockService
|
||||
migrateToPluginService *MigrateToPluginService
|
||||
migrateFromPluginService *MigrateFromPluginService
|
||||
|
||||
// SecretMigrationProviderImpl is a dskit module Note on dskit module usage:
|
||||
// The SecretMigrationProviderImpl iterates over several service's
|
||||
// Migration() method sequentially. dskit has the concept of a service
|
||||
// Manager which launches services. We could use the Manager here, but it
|
||||
// seems heavyweight given that these services only log errors.
|
||||
*services.BasicService
|
||||
}
|
||||
|
||||
func ProvideSecretMigrationProvider(
|
||||
@ -47,30 +39,27 @@ func ProvideSecretMigrationProvider(
|
||||
migrateToPluginService *MigrateToPluginService,
|
||||
migrateFromPluginService *MigrateFromPluginService,
|
||||
) *SecretMigrationProviderImpl {
|
||||
migServices := make([]SecretMigrationService, 0)
|
||||
migServices = append(migServices, dataSourceSecretMigrationService)
|
||||
services := make([]SecretMigrationService, 0)
|
||||
services = append(services, dataSourceSecretMigrationService)
|
||||
// Plugin migration should always be last; should either migrate to or from, not both
|
||||
// This is because the migrateTo checks for use_plugin = true, in which case we should always
|
||||
// migrate by default to ensure users don't lose access to secrets. If migration has
|
||||
// already occurred, the migrateTo function will be called but it won't do anything
|
||||
if cfg.SectionWithEnvOverrides("secrets").Key("migrate_from_plugin").MustBool(false) {
|
||||
migServices = append(migServices, migrateFromPluginService)
|
||||
services = append(services, migrateFromPluginService)
|
||||
} else {
|
||||
migServices = append(migServices, migrateToPluginService)
|
||||
services = append(services, migrateToPluginService)
|
||||
}
|
||||
|
||||
s := &SecretMigrationProviderImpl{
|
||||
return &SecretMigrationProviderImpl{
|
||||
ServerLockService: serverLockService,
|
||||
migServices: migServices,
|
||||
services: services,
|
||||
migrateToPluginService: migrateToPluginService,
|
||||
migrateFromPluginService: migrateFromPluginService,
|
||||
}
|
||||
|
||||
s.BasicService = services.NewIdleService(s.start, nil).WithName(modules.SecretMigrator)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *SecretMigrationProviderImpl) start(ctx context.Context) error {
|
||||
func (s *SecretMigrationProviderImpl) Run(ctx context.Context) error {
|
||||
return s.Migrate(ctx)
|
||||
}
|
||||
|
||||
@ -79,7 +68,7 @@ func (s *SecretMigrationProviderImpl) start(ctx context.Context) error {
|
||||
func (s *SecretMigrationProviderImpl) Migrate(ctx context.Context) error {
|
||||
// Start migration services.
|
||||
err := s.ServerLockService.LockExecuteAndRelease(ctx, actionName, time.Minute*10, func(context.Context) {
|
||||
for _, service := range s.migServices {
|
||||
for _, service := range s.services {
|
||||
serviceName := reflect.TypeOf(service).String()
|
||||
logger.Debug("Starting secret migration service", "service", serviceName)
|
||||
err := service.Migrate(ctx)
|
||||
|
@ -966,7 +966,7 @@ var skipStaticRootValidation = false
|
||||
|
||||
func NewCfg() *Cfg {
|
||||
return &Cfg{
|
||||
Target: []string{"all"},
|
||||
Target: []string{},
|
||||
Logger: log.New("settings"),
|
||||
Raw: ini.Empty(),
|
||||
Azure: &azsettings.AzureSettings{},
|
||||
@ -1025,8 +1025,10 @@ func (cfg *Cfg) Load(args CommandLineArgs) error {
|
||||
|
||||
cfg.ErrTemplateName = "error"
|
||||
|
||||
Target := valueAsString(iniFile.Section(""), "target", "all")
|
||||
cfg.Target = strings.Split(Target, " ")
|
||||
Target := valueAsString(iniFile.Section(""), "target", "")
|
||||
if Target != "" {
|
||||
cfg.Target = strings.Split(Target, " ")
|
||||
}
|
||||
Env = valueAsString(iniFile.Section(""), "app_mode", "development")
|
||||
cfg.Env = Env
|
||||
cfg.ForceMigration = iniFile.Section("").Key("force_migration").MustBool(false)
|
||||
|
@ -1,43 +0,0 @@
|
||||
package systemd
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
)
|
||||
|
||||
// NotifyReady sends READY state notifications to systemd.
|
||||
func NotifyReady(log log.Logger) {
|
||||
notify(log, "READY=1")
|
||||
}
|
||||
|
||||
// notify sends state notifications to systemd.
|
||||
func notify(log log.Logger, state string) {
|
||||
notifySocket := os.Getenv("NOTIFY_SOCKET")
|
||||
if notifySocket == "" {
|
||||
log.Debug(
|
||||
"NOTIFY_SOCKET environment variable empty or unset, can't send systemd notification")
|
||||
return
|
||||
}
|
||||
|
||||
socketAddr := &net.UnixAddr{
|
||||
Name: notifySocket,
|
||||
Net: "unixgram",
|
||||
}
|
||||
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
|
||||
if err != nil {
|
||||
log.Warn("Failed to connect to systemd", "err", err, "socket", notifySocket)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err = conn.Close(); err != nil {
|
||||
log.Warn("Failed to close connection", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = conn.Write([]byte(state))
|
||||
if err != nil {
|
||||
log.Warn("Failed to write notification to systemd", "err", err)
|
||||
}
|
||||
}
|
@ -60,7 +60,7 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
|
||||
|
||||
go func() {
|
||||
// When the server runs, it will also build and initialize the service graph
|
||||
if err := env.Server.Run(ctx); err != nil {
|
||||
if err := env.Server.Run(); err != nil {
|
||||
t.Log("Server exited uncleanly", "error", err)
|
||||
}
|
||||
}()
|
||||
@ -71,8 +71,6 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
|
||||
})
|
||||
|
||||
// Wait for Grafana to be ready
|
||||
err = env.Server.AwaitHealthy(ctx)
|
||||
require.NoError(t, err)
|
||||
addr := listener.Addr().String()
|
||||
resp, err := http.Get(fmt.Sprintf("http://%s/api/health", addr))
|
||||
require.NoError(t, err)
|
||||
|
Loading…
Reference in New Issue
Block a user