mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Add initial support for deployment modes (#63992)
Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
This commit is contained in:
parent
3ebc604bb7
commit
e217854c24
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
@ -262,6 +262,7 @@
|
||||
/pkg/services/querylibrary/ @grafana/multitenancy-squad
|
||||
/pkg/infra/filestorage/ @grafana/multitenancy-squad
|
||||
/pkg/util/converter/ @grafana/multitenancy-squad
|
||||
/pkg/modules/ @grafana/multitenancy-squad
|
||||
|
||||
# Alerting
|
||||
/pkg/services/ngalert/ @grafana/alerting-squad-backend
|
||||
|
50
pkg/modules/listener.go
Normal file
50
pkg/modules/listener.go
Normal file
@ -0,0 +1,50 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/grafana/dskit/modules"
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
)
|
||||
|
||||
var _ services.ManagerListener = (*serviceListener)(nil)
|
||||
|
||||
type serviceListener struct {
|
||||
log log.Logger
|
||||
service *service
|
||||
}
|
||||
|
||||
func newServiceListener(logger log.Logger, s *service) *serviceListener {
|
||||
return &serviceListener{log: logger, service: s}
|
||||
}
|
||||
|
||||
func (l *serviceListener) Healthy() {
|
||||
l.log.Info("All modules healthy")
|
||||
}
|
||||
|
||||
func (l *serviceListener) Stopped() {
|
||||
l.log.Info("All modules stopped")
|
||||
}
|
||||
|
||||
func (l *serviceListener) Failure(service services.Service) {
|
||||
// if any service fails, stop all services
|
||||
if err := l.service.Shutdown(context.Background()); err != nil {
|
||||
l.log.Error("Failed to stop all modules", "err", err)
|
||||
}
|
||||
|
||||
// log which module failed
|
||||
for module, s := range l.service.ServiceMap {
|
||||
if s == service {
|
||||
if errors.Is(service.FailureCase(), modules.ErrStopProcess) {
|
||||
l.log.Info("Received stop signal via return error", "module", module, "err", service.FailureCase())
|
||||
} else {
|
||||
l.log.Error("Module failed", "module", module, "err", service.FailureCase())
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
l.log.Error("Module failed", "module", "unknown", "err", service.FailureCase())
|
||||
}
|
155
pkg/modules/modules.go
Normal file
155
pkg/modules/modules.go
Normal file
@ -0,0 +1,155 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/grafana/dskit/modules"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
// List of available targets.
|
||||
const (
|
||||
All string = "all"
|
||||
)
|
||||
|
||||
type Engine interface {
|
||||
Init(context.Context) error
|
||||
Run(context.Context) error
|
||||
Shutdown(context.Context) error
|
||||
}
|
||||
|
||||
type Manager interface {
|
||||
RegisterModule(name string, initFn func() (services.Service, error), deps ...string)
|
||||
RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string)
|
||||
}
|
||||
|
||||
var _ Engine = (*service)(nil)
|
||||
var _ Manager = (*service)(nil)
|
||||
|
||||
// service manages the registration and lifecycle of modules.
|
||||
type service struct {
|
||||
cfg *setting.Cfg
|
||||
log log.Logger
|
||||
targets []string
|
||||
dependencyMap map[string][]string
|
||||
|
||||
ModuleManager *modules.Manager
|
||||
ServiceManager *services.Manager
|
||||
ServiceMap map[string]services.Service
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg) *service {
|
||||
logger := log.New("modules")
|
||||
|
||||
return &service{
|
||||
cfg: cfg,
|
||||
log: logger,
|
||||
targets: cfg.Target,
|
||||
dependencyMap: map[string][]string{},
|
||||
|
||||
ModuleManager: modules.NewManager(logger),
|
||||
ServiceMap: map[string]services.Service{},
|
||||
}
|
||||
}
|
||||
|
||||
// Init initializes all registered modules.
|
||||
func (m *service) Init(_ context.Context) error {
|
||||
var err error
|
||||
|
||||
// module registration
|
||||
m.RegisterModule(All, nil)
|
||||
|
||||
for mod, targets := range m.dependencyMap {
|
||||
if err := m.ModuleManager.AddDependency(mod, targets...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.ServiceMap, err = m.ModuleManager.InitModuleServices(m.targets...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if no modules are registered, we don't need to start the service manager
|
||||
if len(m.ServiceMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var svcs []services.Service
|
||||
for _, s := range m.ServiceMap {
|
||||
svcs = append(svcs, s)
|
||||
}
|
||||
m.ServiceManager, err = services.NewManager(svcs...)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Run starts all registered modules.
|
||||
func (m *service) Run(ctx context.Context) error {
|
||||
// we don't need to continue if no modules are registered.
|
||||
// this behavior may need to change if dskit services replace the
|
||||
// current background service registry.
|
||||
if len(m.ServiceMap) == 0 {
|
||||
m.log.Warn("No modules registered...")
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
listener := newServiceListener(m.log, m)
|
||||
m.ServiceManager.AddListener(listener)
|
||||
|
||||
// wait until a service fails or stop signal was received
|
||||
err := m.ServiceManager.StartAsync(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.ServiceManager.AwaitStopped(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
failed := m.ServiceManager.ServicesByState()[services.Failed]
|
||||
for _, f := range failed {
|
||||
// the service listener will log error details for all modules that failed,
|
||||
// so here we return the first error that is not ErrStopProcess
|
||||
if !errors.Is(f.FailureCase(), modules.ErrStopProcess) {
|
||||
return f.FailureCase()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown stops all modules and waits for them to stop.
|
||||
func (m *service) Shutdown(ctx context.Context) error {
|
||||
if m.ServiceManager == nil {
|
||||
m.log.Debug("No modules registered, nothing to stop...")
|
||||
return nil
|
||||
}
|
||||
m.ServiceManager.StopAsync()
|
||||
m.log.Info("Awaiting services to be stopped...")
|
||||
return m.ServiceManager.AwaitStopped(ctx)
|
||||
}
|
||||
|
||||
// RegisterModule registers a module with the dskit module manager.
|
||||
func (m *service) RegisterModule(name string, initFn func() (services.Service, error), deps ...string) {
|
||||
m.ModuleManager.RegisterModule(name, initFn)
|
||||
m.dependencyMap[name] = deps
|
||||
}
|
||||
|
||||
// RegisterInvisibleModule registers an invisible module with the dskit module manager.
|
||||
// Invisible modules are not visible to the user, and are intendent to be used as dependencies.
|
||||
func (m *service) RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string) {
|
||||
m.ModuleManager.RegisterModule(name, initFn, modules.UserInvisibleModule)
|
||||
m.dependencyMap[name] = deps
|
||||
}
|
||||
|
||||
// IsModuleEnabled returns true if the module is enabled.
|
||||
func (m *service) IsModuleEnabled(name string) bool {
|
||||
return stringsContain(m.targets, name)
|
||||
}
|
11
pkg/modules/util.go
Normal file
11
pkg/modules/util.go
Normal file
@ -0,0 +1,11 @@
|
||||
package modules
|
||||
|
||||
func stringsContain(values []string, search string) bool {
|
||||
for _, v := range values {
|
||||
if search == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
9
pkg/modules/wire.go
Normal file
9
pkg/modules/wire.go
Normal file
@ -0,0 +1,9 @@
|
||||
package modules
|
||||
|
||||
import "github.com/google/wire"
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideService,
|
||||
wire.Bind(new(Engine), new(*service)),
|
||||
wire.Bind(new(Manager), new(*service)),
|
||||
)
|
@ -18,6 +18,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/usagestats/statscollector"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/provisioning"
|
||||
@ -38,9 +39,10 @@ type Options struct {
|
||||
func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
|
||||
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
|
||||
usageStatsProvidersRegistry registry.UsageStatsProvidersRegistry, statsCollectorService *statscollector.Service,
|
||||
moduleService modules.Engine,
|
||||
) (*Server, error) {
|
||||
statsCollectorService.RegisterProviders(usageStatsProvidersRegistry.GetServices())
|
||||
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider)
|
||||
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider, moduleService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -54,6 +56,7 @@ func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistr
|
||||
|
||||
func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
|
||||
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
|
||||
moduleService modules.Engine,
|
||||
) (*Server, error) {
|
||||
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
||||
childRoutines, childCtx := errgroup.WithContext(rootCtx)
|
||||
@ -73,6 +76,7 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR
|
||||
commit: opts.Commit,
|
||||
buildBranch: opts.BuildBranch,
|
||||
backgroundServices: backgroundServiceProvider.GetServices(),
|
||||
moduleService: moduleService,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
@ -99,6 +103,7 @@ type Server struct {
|
||||
HTTPServer *api.HTTPServer
|
||||
roleRegistry accesscontrol.RoleRegistry
|
||||
provisioningService provisioning.ProvisioningService
|
||||
moduleService modules.Engine
|
||||
}
|
||||
|
||||
// init initializes the server and its services.
|
||||
@ -115,6 +120,11 @@ func (s *Server) init() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize dskit modules.
|
||||
if err := s.moduleService.Init(s.context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := metrics.SetEnvironmentInformation(s.cfg.MetricsGrafanaEnvironmentInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -135,6 +145,15 @@ func (s *Server) Run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start dskit modules.
|
||||
s.childRoutines.Go(func() error {
|
||||
err := s.moduleService.Run(s.context)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
services := s.backgroundServices
|
||||
|
||||
// Start background services.
|
||||
@ -178,7 +197,10 @@ func (s *Server) Shutdown(ctx context.Context, reason string) error {
|
||||
var err error
|
||||
s.shutdownOnce.Do(func() {
|
||||
s.log.Info("Shutdown started", "reason", reason)
|
||||
// Call cancel func to stop services.
|
||||
if err := s.moduleService.Shutdown(ctx); err != nil {
|
||||
s.log.Error("Failed to shutdown modules", "error", err)
|
||||
}
|
||||
// Call cancel func to stop background services.
|
||||
s.shutdownFn()
|
||||
// Wait for server to shut down
|
||||
select {
|
||||
|
@ -48,7 +48,7 @@ func (s *testService) IsDisabled() bool {
|
||||
|
||||
func testServer(t *testing.T, services ...registry.BackgroundService) *Server {
|
||||
t.Helper()
|
||||
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...))
|
||||
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...), &MockModuleService{})
|
||||
require.NoError(t, err)
|
||||
// Required to skip configuration initialization that causes
|
||||
// DI errors in this test.
|
||||
@ -90,3 +90,30 @@ func TestServer_Shutdown(t *testing.T) {
|
||||
err = <-ch
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
type MockModuleService struct {
|
||||
initFunc func(context.Context) error
|
||||
runFunc func(context.Context) error
|
||||
shutdownFunc func(context.Context) error
|
||||
}
|
||||
|
||||
func (m *MockModuleService) Init(ctx context.Context) error {
|
||||
if m.initFunc != nil {
|
||||
return m.initFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleService) Run(ctx context.Context) error {
|
||||
if m.runFunc != nil {
|
||||
return m.runFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockModuleService) Shutdown(ctx context.Context) error {
|
||||
if m.shutdownFunc != nil {
|
||||
return m.shutdownFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
loginpkg "github.com/grafana/grafana/pkg/login"
|
||||
"github.com/grafana/grafana/pkg/login/social"
|
||||
"github.com/grafana/grafana/pkg/middleware/csrf"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards"
|
||||
"github.com/grafana/grafana/pkg/registry/corekind"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
@ -366,6 +367,7 @@ var wireBasicSet = wire.NewSet(
|
||||
authnimpl.ProvideService,
|
||||
wire.Bind(new(authn.Service), new(*authnimpl.Service)),
|
||||
supportbundlesimpl.ProvideService,
|
||||
modules.WireSet,
|
||||
)
|
||||
|
||||
var wireSet = wire.NewSet(
|
||||
|
@ -145,6 +145,7 @@ var (
|
||||
|
||||
// TODO move all global vars to this struct
|
||||
type Cfg struct {
|
||||
Target []string
|
||||
Raw *ini.File
|
||||
Logger log.Logger
|
||||
|
||||
@ -908,6 +909,7 @@ var skipStaticRootValidation = false
|
||||
|
||||
func NewCfg() *Cfg {
|
||||
return &Cfg{
|
||||
Target: []string{"all"},
|
||||
Logger: log.New("settings"),
|
||||
Raw: ini.Empty(),
|
||||
Azure: &azsettings.AzureSettings{},
|
||||
@ -966,6 +968,8 @@ func (cfg *Cfg) Load(args CommandLineArgs) error {
|
||||
|
||||
cfg.ErrTemplateName = "error"
|
||||
|
||||
Target := valueAsString(iniFile.Section(""), "target", "all")
|
||||
cfg.Target = strings.Split(Target, " ")
|
||||
Env = valueAsString(iniFile.Section(""), "app_mode", "development")
|
||||
cfg.Env = Env
|
||||
cfg.ForceMigration = iniFile.Section("").Key("force_migration").MustBool(false)
|
||||
@ -1265,6 +1269,7 @@ func (cfg *Cfg) LogConfigSources() {
|
||||
}
|
||||
}
|
||||
|
||||
cfg.Logger.Info("Target", "target", cfg.Target)
|
||||
cfg.Logger.Info("Path Home", "path", HomePath)
|
||||
cfg.Logger.Info("Path Data", "path", cfg.DataPath)
|
||||
cfg.Logger.Info("Path Logs", "path", cfg.LogsPath)
|
||||
|
Loading…
Reference in New Issue
Block a user