Chore: refactor grafana-apiserver a bit (#74177)

This commit is contained in:
Kristin Laemmert
2023-08-31 09:12:01 -04:00
committed by GitHub
parent 439270f6cb
commit d1876b68bc
14 changed files with 129 additions and 187 deletions

1
.github/CODEOWNERS vendored
View File

@@ -98,7 +98,6 @@
/pkg/server/ @grafana/backend-platform
/pkg/services/annotations/ @grafana/backend-platform
/pkg/services/apikey/ @grafana/grafana-authnz-team
/pkg/services/certgenerator @grafana/grafana-app-platform-squad
/pkg/services/cleanup/ @grafana/backend-platform
/pkg/services/contexthandler/ @grafana/backend-platform
/pkg/services/correlations/ @grafana/explore-squad

View File

@@ -0,0 +1,15 @@
package modules
const (
// All includes all modules necessary for Grafana to run as a standalone server
All string = "all"
Core string = "core"
GrafanaAPIServer string = "grafana-apiserver"
)
var dependencyMap = map[string][]string{
GrafanaAPIServer: {},
Core: {},
All: {Core},
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/infra/log"
)
@@ -30,12 +31,12 @@ func (l *serviceListener) Stopped() {
func (l *serviceListener) Failure(service services.Service) {
// if any service fails, stop all services
if err := l.service.Shutdown(context.Background()); err != nil {
if err := l.service.Shutdown(context.Background(), service.FailureCase().Error()); err != nil {
l.log.Error("Failed to stop all modules", "err", err)
}
// log which module failed
for module, s := range l.service.ServiceMap {
for module, s := range l.service.serviceMap {
if s == service {
if errors.Is(service.FailureCase(), modules.ErrStopProcess) {
l.log.Info("Received stop signal via return error", "module", module, "err", service.FailureCase())

View File

@@ -8,19 +8,16 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
)
type Engine interface {
Init(context.Context) error
Run(context.Context) error
Shutdown(context.Context) error
Shutdown(context.Context, string) error
}
type Manager interface {
RegisterModule(name string, initFn func() (services.Service, error), deps ...string)
RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string)
RegisterModule(name string, fn initFn)
RegisterInvisibleModule(name string, fn initFn)
}
var _ Engine = (*service)(nil)
@@ -28,89 +25,86 @@ var _ Manager = (*service)(nil)
// service manages the registration and lifecycle of modules.
type service struct {
cfg *setting.Cfg
log log.Logger
targets []string
dependencyMap map[string][]string
log log.Logger
targets []string
ModuleManager *modules.Manager
ServiceManager *services.Manager
ServiceMap map[string]services.Service
moduleManager *modules.Manager
serviceManager *services.Manager
serviceMap map[string]services.Service
}
func ProvideService(
cfg *setting.Cfg,
features *featuremgmt.FeatureManager,
func New(
targets []string,
) *service {
logger := log.New("modules")
return &service{
cfg: cfg,
log: logger,
targets: cfg.Target,
dependencyMap: map[string][]string{},
log: logger,
targets: targets,
ModuleManager: modules.NewManager(logger),
ServiceMap: map[string]services.Service{},
moduleManager: modules.NewManager(logger),
serviceMap: map[string]services.Service{},
}
}
// Init initializes all registered modules.
func (m *service) Init(_ context.Context) error {
// Run starts all registered modules.
func (m *service) Run(ctx context.Context) error {
var err error
for mod, targets := range m.dependencyMap {
if err := m.ModuleManager.AddDependency(mod, targets...); err != nil {
for mod, targets := range dependencyMap {
if !m.moduleManager.IsModuleRegistered(mod) {
continue
}
if err := m.moduleManager.AddDependency(mod, targets...); err != nil {
return err
}
}
m.ServiceMap, err = m.ModuleManager.InitModuleServices(m.targets...)
m.serviceMap, err = m.moduleManager.InitModuleServices(m.targets...)
if err != nil {
return err
}
// if no modules are registered, we don't need to start the service manager
if len(m.ServiceMap) == 0 {
if len(m.serviceMap) == 0 {
return nil
}
var svcs []services.Service
for _, s := range m.ServiceMap {
for _, s := range m.serviceMap {
svcs = append(svcs, s)
}
m.ServiceManager, err = services.NewManager(svcs...)
return err
}
m.serviceManager, err = services.NewManager(svcs...)
if err != nil {
return err
}
// Run starts all registered modules.
func (m *service) Run(ctx context.Context) error {
// we don't need to continue if no modules are registered.
// this behavior may need to change if dskit services replace the
// current background service registry.
if len(m.ServiceMap) == 0 {
if len(m.serviceMap) == 0 {
m.log.Warn("No modules registered...")
<-ctx.Done()
return nil
}
listener := newServiceListener(m.log, m)
m.ServiceManager.AddListener(listener)
m.serviceManager.AddListener(listener)
m.log.Debug("Starting module service manager")
// wait until a service fails or stop signal was received
err := m.ServiceManager.StartAsync(ctx)
err = m.serviceManager.StartAsync(ctx)
if err != nil {
return err
}
err = m.ServiceManager.AwaitStopped(ctx)
err = m.serviceManager.AwaitStopped(ctx)
if err != nil {
return err
}
failed := m.ServiceManager.ServicesByState()[services.Failed]
failed := m.serviceManager.ServicesByState()[services.Failed]
for _, f := range failed {
// the service listener will log error details for all modules that failed,
// so here we return the first error that is not ErrStopProcess
@@ -123,27 +117,27 @@ func (m *service) Run(ctx context.Context) error {
}
// Shutdown stops all modules and waits for them to stop.
func (m *service) Shutdown(ctx context.Context) error {
if m.ServiceManager == nil {
func (m *service) Shutdown(ctx context.Context, reason string) error {
if m.serviceManager == nil {
m.log.Debug("No modules registered, nothing to stop...")
return nil
}
m.ServiceManager.StopAsync()
m.log.Info("Awaiting services to be stopped...")
return m.ServiceManager.AwaitStopped(ctx)
m.serviceManager.StopAsync()
m.log.Info("Awaiting services to be stopped...", "reason", reason)
return m.serviceManager.AwaitStopped(ctx)
}
type initFn func() (services.Service, error)
// RegisterModule registers a module with the dskit module manager.
func (m *service) RegisterModule(name string, initFn func() (services.Service, error), deps ...string) {
m.ModuleManager.RegisterModule(name, initFn)
m.dependencyMap[name] = deps
func (m *service) RegisterModule(name string, fn initFn) {
m.moduleManager.RegisterModule(name, fn)
}
// RegisterInvisibleModule registers an invisible module with the dskit module manager.
// Invisible modules are not visible to the user, and are intendent to be used as dependencies.
func (m *service) RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string) {
m.ModuleManager.RegisterModule(name, initFn, modules.UserInvisibleModule)
m.dependencyMap[name] = deps
func (m *service) RegisterInvisibleModule(name string, fn initFn) {
m.moduleManager.RegisterModule(name, fn, modules.UserInvisibleModule)
}
// IsModuleEnabled returns true if the module is enabled.

View File

@@ -1,9 +0,0 @@
package modules
import "github.com/google/wire"
var WireSet = wire.NewSet(
ProvideService,
wire.Bind(new(Engine), new(*service)),
wire.Bind(new(Manager), new(*service)),
)

View File

@@ -18,7 +18,6 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/metrics"
"github.com/grafana/grafana/pkg/infra/usagestats/statscollector"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/provisioning"
@@ -39,15 +38,14 @@ type Options struct {
func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
usageStatsProvidersRegistry registry.UsageStatsProvidersRegistry, statsCollectorService *statscollector.Service,
moduleService modules.Engine,
) (*Server, error) {
statsCollectorService.RegisterProviders(usageStatsProvidersRegistry.GetServices())
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider, moduleService)
s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider)
if err != nil {
return nil, err
}
if err := s.init(); err != nil {
if err := s.Init(); err != nil {
return nil, err
}
@@ -56,7 +54,6 @@ func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistr
func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry,
provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry,
moduleService modules.Engine,
) (*Server, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
childRoutines, childCtx := errgroup.WithContext(rootCtx)
@@ -76,7 +73,6 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR
commit: opts.Commit,
buildBranch: opts.BuildBranch,
backgroundServices: backgroundServiceProvider.GetServices(),
moduleService: moduleService,
}
return s, nil
@@ -103,11 +99,10 @@ type Server struct {
HTTPServer *api.HTTPServer
roleRegistry accesscontrol.RoleRegistry
provisioningService provisioning.ProvisioningService
moduleService modules.Engine
}
// init initializes the server and its services.
func (s *Server) init() error {
// Init initializes the server and its services.
func (s *Server) Init() error {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -120,11 +115,6 @@ func (s *Server) init() error {
return err
}
// Initialize dskit modules.
if err := s.moduleService.Init(s.context); err != nil {
return err
}
if err := metrics.SetEnvironmentInformation(s.cfg.MetricsGrafanaEnvironmentInfo); err != nil {
return err
}
@@ -141,19 +131,10 @@ func (s *Server) init() error {
func (s *Server) Run() error {
defer close(s.shutdownFinished)
if err := s.init(); err != nil {
if err := s.Init(); err != nil {
return err
}
// Start dskit modules.
s.childRoutines.Go(func() error {
err := s.moduleService.Run(s.context)
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
})
services := s.backgroundServices
// Start background services.
@@ -197,9 +178,6 @@ func (s *Server) Shutdown(ctx context.Context, reason string) error {
var err error
s.shutdownOnce.Do(func() {
s.log.Info("Shutdown started", "reason", reason)
if err := s.moduleService.Shutdown(ctx); err != nil {
s.log.Error("Failed to shutdown modules", "error", err)
}
// Call cancel func to stop background services.
s.shutdownFn()
// Wait for server to shut down

View File

@@ -48,7 +48,7 @@ func (s *testService) IsDisabled() bool {
func testServer(t *testing.T, services ...registry.BackgroundService) *Server {
t.Helper()
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...), &MockModuleService{})
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...))
require.NoError(t, err)
// Required to skip configuration initialization that causes
// DI errors in this test.

44
pkg/server/service.go Normal file
View File

@@ -0,0 +1,44 @@
package server
import (
"context"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/api"
"github.com/grafana/grafana/pkg/setting"
)
type coreService struct {
*services.BasicService
cla setting.CommandLineArgs
opts Options
apiOpts api.ServerOptions
server *Server
}
func NewService(opts Options, apiOpts api.ServerOptions) (*coreService, error) {
s := &coreService{
opts: opts,
apiOpts: apiOpts,
}
s.BasicService = services.NewBasicService(s.start, s.running, s.stop)
return s, nil
}
func (s *coreService) start(_ context.Context) error {
serv, err := Initialize(s.cla, s.opts, s.apiOpts)
if err != nil {
return err
}
s.server = serv
return s.server.Init()
}
func (s *coreService) running(_ context.Context) error {
return s.server.Run()
}
func (s *coreService) stop(failureReason error) error {
return s.server.Shutdown(context.Background(), failureReason.Error())
}

View File

@@ -29,7 +29,6 @@ import (
"github.com/grafana/grafana/pkg/login/social"
"github.com/grafana/grafana/pkg/middleware/csrf"
"github.com/grafana/grafana/pkg/middleware/loggermw"
"github.com/grafana/grafana/pkg/modules"
pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards"
"github.com/grafana/grafana/pkg/registry/corekind"
"github.com/grafana/grafana/pkg/services/accesscontrol"
@@ -42,7 +41,6 @@ import (
"github.com/grafana/grafana/pkg/services/auth/jwt"
"github.com/grafana/grafana/pkg/services/authn"
"github.com/grafana/grafana/pkg/services/authn/authnimpl"
"github.com/grafana/grafana/pkg/services/certgenerator"
"github.com/grafana/grafana/pkg/services/cleanup"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/correlations"
@@ -62,7 +60,6 @@ import (
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/folder/folderimpl"
grafanaapiserver "github.com/grafana/grafana/pkg/services/grafana-apiserver"
"github.com/grafana/grafana/pkg/services/grpcserver"
grpccontext "github.com/grafana/grafana/pkg/services/grpcserver/context"
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
@@ -202,7 +199,6 @@ var wireBasicSet = wire.NewSet(
wire.Bind(new(httpclient.Provider), new(*sdkhttpclient.Provider)),
serverlock.ProvideService,
annotationsimpl.ProvideCleanupService,
certgenerator.WireSet,
wire.Bind(new(annotations.Cleaner), new(*annotationsimpl.CleanupServiceImpl)),
cleanup.ProvideService,
shorturlimpl.ProvideService,
@@ -353,11 +349,9 @@ var wireBasicSet = wire.NewSet(
wire.Bind(new(authn.Service), new(*authnimpl.Service)),
wire.Bind(new(authn.IdentitySynchronizer), new(*authnimpl.Service)),
supportbundlesimpl.ProvideService,
grafanaapiserver.WireSet,
oasimpl.ProvideService,
wire.Bind(new(oauthserver.OAuth2Server), new(*oasimpl.OAuth2ServiceImpl)),
loggermw.Provide,
modules.WireSet,
signingkeysimpl.ProvideEmbeddedSigningKeysService,
wire.Bind(new(signingkeys.Service), new(*signingkeysimpl.Service)),
)

View File

@@ -1,13 +0,0 @@
package certgenerator
import (
"path/filepath"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
)
func ProvideService(cfg *setting.Cfg) (*certgenerator.Service, error) {
return certgenerator.CreateService("cert-generator", filepath.Join(cfg.DataPath, "k8s"))
}

View File

@@ -1,11 +0,0 @@
package certgenerator
import (
"github.com/google/wire"
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
)
var WireSet = wire.NewSet(
ProvideService,
wire.Bind(new(certgenerator.ServiceInterface), new(*certgenerator.Service)),
)

View File

@@ -6,16 +6,14 @@ import (
"net"
"os"
"path"
"strconv"
"cuelang.org/go/pkg/strings"
"github.com/go-logr/logr"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
grafanaapiserveroptions "github.com/grafana/grafana-apiserver/pkg/cmd/server/options"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/request/headerrequest"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/rest"
@@ -23,14 +21,7 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
"github.com/grafana/grafana-apiserver/pkg/certgenerator"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/middleware"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/web"
"github.com/grafana/grafana/pkg/modules"
)
const (
@@ -54,39 +45,19 @@ type service struct {
*services.BasicService
restConfig *rest.Config
rr routing.RouteRegister
handler web.Handler
dataPath string
stopCh chan struct{}
stoppedCh chan error
}
func ProvideService(cfg *setting.Cfg, rr routing.RouteRegister) (*service, error) {
func New(dataPath string) (*service, error) {
s := &service{
rr: rr,
dataPath: path.Join(cfg.DataPath, "k8s"),
dataPath: dataPath,
stopCh: make(chan struct{}),
}
s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName("grafana-apiserver")
s.rr.Group("/k8s", func(k8sRoute routing.RouteRegister) {
handler := func(c *contextmodel.ReqContext) {
if s.handler == nil {
c.Resp.WriteHeader(404)
_, _ = c.Resp.Write([]byte("Not found"))
return
}
if handle, ok := s.handler.(func(c *contextmodel.ReqContext)); ok {
handle(c)
return
}
}
k8sRoute.Any("/", middleware.ReqSignedIn, handler)
k8sRoute.Any("/*", middleware.ReqSignedIn, handler)
})
s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName(modules.GrafanaAPIServer)
return s, nil
}
@@ -115,6 +86,16 @@ func (s *service) start(ctx context.Context) error {
K8sDataPath: s.dataPath,
}
err := certUtil.InitializeCACertPKI()
if err != nil {
return err
}
err = certUtil.EnsureApiServerPKI(certgenerator.DefaultAPIServerIp)
if err != nil {
return err
}
o.RecommendedOptions.SecureServing.BindAddress = net.ParseIP(certgenerator.DefaultAPIServerIp)
o.RecommendedOptions.SecureServing.ServerCert.CertKey = options.CertKey{
CertFile: certUtil.APIServerCertFile(),
@@ -159,26 +140,6 @@ func (s *service) start(ctx context.Context) error {
prepared := server.GenericAPIServer.PrepareRun()
s.handler = func(c *contextmodel.ReqContext) {
req := c.Req
req.URL.Path = strings.TrimPrefix(req.URL.Path, "/k8s")
if req.URL.Path == "" {
req.URL.Path = "/"
}
ctx := req.Context()
signedInUser := appcontext.MustUser(ctx)
req.Header.Set("X-Remote-User", strconv.FormatInt(signedInUser.UserID, 10))
req.Header.Set("X-Remote-Group", "grafana")
req.Header.Set("X-Remote-Extra-token-name", signedInUser.Name)
req.Header.Set("X-Remote-Extra-org-role", string(signedInUser.OrgRole))
req.Header.Set("X-Remote-Extra-org-id", strconv.FormatInt(signedInUser.OrgID, 10))
req.Header.Set("X-Remote-Extra-user-id", strconv.FormatInt(signedInUser.UserID, 10))
resp := responsewriter.WrapForHTTP1Or2(c.Resp)
prepared.GenericAPIServer.Handler.ServeHTTP(resp, req)
}
go func() {
s.stoppedCh <- prepared.Run(s.stopCh)
}()

View File

@@ -1,11 +0,0 @@
package grafanaapiserver
import (
"github.com/google/wire"
)
var WireSet = wire.NewSet(
ProvideService,
wire.Bind(new(Service), new(*service)),
wire.Bind(new(RestConfigProvider), new(*service)),
)

View File

@@ -970,7 +970,7 @@ var skipStaticRootValidation = false
func NewCfg() *Cfg {
return &Cfg{
Target: []string{},
Target: []string{"all"},
Logger: log.New("settings"),
Raw: ini.Empty(),
Azure: &azsettings.AzureSettings{},
@@ -1029,7 +1029,7 @@ func (cfg *Cfg) Load(args CommandLineArgs) error {
cfg.ErrTemplateName = "error"
Target := valueAsString(iniFile.Section(""), "target", "")
Target := valueAsString(iniFile.Section(""), "target", "all")
if Target != "" {
cfg.Target = strings.Split(Target, " ")
}