Chore: Refactor Server Run and Shutdown (#32611)

This commit is contained in:
Alexander Emelin 2021-04-07 10:44:06 +03:00 committed by GitHub
parent 2c8728a2a4
commit 7c182b1d2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 198 additions and 78 deletions

View File

@ -194,6 +194,7 @@ func listenToSystemSignals(s *server.Server) {
}
case sig := <-signalChan:
s.Shutdown(fmt.Sprintf("System signal: %s", sig))
return
}
}
}

View File

@ -57,16 +57,40 @@ type Config struct {
Listener net.Listener
}
type serviceRegistry interface {
IsDisabled(srv registry.Service) bool
GetServices() []*registry.Descriptor
}
type globalServiceRegistry struct{}
func (r *globalServiceRegistry) IsDisabled(srv registry.Service) bool {
return registry.IsDisabled(srv)
}
func (r *globalServiceRegistry) GetServices() []*registry.Descriptor {
return registry.GetServices()
}
// New returns a new instance of Server.
func New(cfg Config) (*Server, error) {
s := newServer(cfg)
if err := s.init(); err != nil {
return nil, err
}
return s, nil
}
func newServer(cfg Config) *Server {
rootCtx, shutdownFn := context.WithCancel(context.Background())
childRoutines, childCtx := errgroup.WithContext(rootCtx)
s := &Server{
context: childCtx,
shutdownFn: shutdownFn,
childRoutines: childRoutines,
log: log.New("server"),
return &Server{
context: childCtx,
shutdownFn: shutdownFn,
shutdownFinished: make(chan struct{}),
childRoutines: childRoutines,
log: log.New("server"),
// Need to use the singleton setting.Cfg instance, to make sure we use the same as is injected in the DI
// graph
cfg: setting.GetCfg(),
@ -77,28 +101,24 @@ func New(cfg Config) (*Server, error) {
version: cfg.Version,
commit: cfg.Commit,
buildBranch: cfg.BuildBranch,
listener: cfg.Listener,
}
if err := s.init(); err != nil {
return nil, err
serviceRegistry: &globalServiceRegistry{},
listener: cfg.Listener,
}
return s, nil
}
// Server is responsible for managing the lifecycle of services.
type Server struct {
context context.Context
shutdownFn context.CancelFunc
childRoutines *errgroup.Group
log log.Logger
cfg *setting.Cfg
shutdownReason string
shutdownInProgress bool
isInitialized bool
mtx sync.Mutex
listener net.Listener
context context.Context
shutdownFn context.CancelFunc
childRoutines *errgroup.Group
log log.Logger
cfg *setting.Cfg
shutdownOnce sync.Once
shutdownFinished chan struct{}
isInitialized bool
mtx sync.Mutex
listener net.Listener
configFile string
homePath string
@ -107,6 +127,8 @@ type Server struct {
commit string
buildBranch string
serviceRegistry serviceRegistry
HTTPServer *api.HTTPServer `inject:""`
}
@ -129,7 +151,7 @@ func (s *Server) init() error {
login.Init()
social.NewOAuthService()
services := registry.GetServices()
services := s.serviceRegistry.GetServices()
if err := s.buildServiceGraph(services); err != nil {
return err
}
@ -150,12 +172,14 @@ func (s *Server) init() error {
// Run initializes and starts services. This will block until all services have
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
func (s *Server) Run() (err error) {
if err = s.init(); err != nil {
return
func (s *Server) Run() error {
defer close(s.shutdownFinished)
if err := s.init(); err != nil {
return err
}
services := registry.GetServices()
services := s.serviceRegistry.GetServices()
// Start background services.
for _, svc := range services {
@ -164,78 +188,60 @@ func (s *Server) Run() (err error) {
continue
}
if registry.IsDisabled(svc.Instance) {
if s.serviceRegistry.IsDisabled(svc.Instance) {
continue
}
// Variable is needed for accessing loop variable in callback
descriptor := svc
s.childRoutines.Go(func() error {
// Don't start new services when server is shutting down.
if s.shutdownInProgress {
return nil
select {
case <-s.context.Done():
return s.context.Err()
default:
}
err := service.Run(s.context)
if err != nil {
// Mark that we are in shutdown mode
// So no more services are started
s.shutdownInProgress = true
if !errors.Is(err, context.Canceled) {
// Server has crashed.
s.log.Error("Stopped "+descriptor.Name, "reason", err)
} else {
s.log.Debug("Stopped "+descriptor.Name, "reason", err)
}
return err
// Do not return context.Canceled error since errgroup.Group only
// returns the first error to the caller - thus we can miss a more
// interesting error.
if err != nil && !errors.Is(err, context.Canceled) {
s.log.Error("Stopped "+descriptor.Name, "reason", err)
return fmt.Errorf("%s run error: %w", descriptor.Name, err)
}
s.log.Debug("Stopped "+descriptor.Name, "reason", err)
return nil
})
}
defer func() {
s.log.Debug("Waiting on services...")
if waitErr := s.childRoutines.Wait(); waitErr != nil && !errors.Is(waitErr, context.Canceled) {
s.log.Error("A service failed", "err", waitErr)
if err == nil {
err = waitErr
}
}
}()
s.notifySystemd("READY=1")
return nil
s.log.Debug("Waiting on services...")
return s.childRoutines.Wait()
}
// Shutdown initiates Grafana graceful shutdown. This shuts down all
// running background services. Since Run blocks Shutdown supposed to
// be run from a separate goroutine.
func (s *Server) Shutdown(reason string) {
s.log.Info("Shutdown started", "reason", reason)
s.shutdownReason = reason
s.shutdownInProgress = true
// call cancel func on root context
s.shutdownFn()
// wait for child routines
if err := s.childRoutines.Wait(); err != nil && !errors.Is(err, context.Canceled) {
s.log.Error("Failed waiting for services to shutdown", "err", err)
}
s.shutdownOnce.Do(func() {
s.log.Info("Shutdown started", "reason", reason)
// Call cancel func to stop services.
s.shutdownFn()
// Can introduce termination timeout here if needed over incoming Context,
// but this will require changing Shutdown method signature to accept context
// and return an error - caller can exit with code > 0 then.
// I.e. sth like server.Shutdown(context.WithTimeout(...), reason).
<-s.shutdownFinished
})
}
// ExitCode returns an exit code for a given error.
func (s *Server) ExitCode(reason error) int {
code := 1
if errors.Is(reason, context.Canceled) && s.shutdownReason != "" {
reason = fmt.Errorf(s.shutdownReason)
code = 0
func (s *Server) ExitCode(runError error) int {
if runError != nil {
s.log.Error("Server shutdown", "error", runError)
return 1
}
s.log.Error("Server shutdown", "reason", reason)
return code
return 0
}
// writePIDFile retrieves the current process ID and writes it to file.
@ -283,7 +289,7 @@ func (s *Server) loadConfiguration() {
}
if err := s.cfg.Load(args); err != nil {
fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error())
_, _ = fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error())
os.Exit(1)
}

113
pkg/server/server_test.go Normal file
View File

@ -0,0 +1,113 @@
package server
import (
"context"
"errors"
"testing"
"github.com/grafana/grafana/pkg/registry"
"github.com/stretchr/testify/require"
)
type testServiceRegistry struct {
services []*registry.Descriptor
}
func (r *testServiceRegistry) GetServices() []*registry.Descriptor {
return r.services
}
func (r *testServiceRegistry) IsDisabled(_ registry.Service) bool {
return false
}
type testService struct {
started chan struct{}
initErr error
runErr error
}
func newTestService(initErr, runErr error) *testService {
return &testService{
started: make(chan struct{}),
initErr: initErr,
runErr: runErr,
}
}
func (s *testService) Init() error {
return s.initErr
}
func (s *testService) Run(ctx context.Context) error {
if s.runErr != nil {
return s.runErr
}
close(s.started)
<-ctx.Done()
return ctx.Err()
}
func testServer() *Server {
s := newServer(Config{})
// Required to skip configuration initialization that causes
// DI errors in this test.
s.isInitialized = true
return s
}
func TestServer_Run_Error(t *testing.T) {
s := testServer()
var testErr = errors.New("boom")
s.serviceRegistry = &testServiceRegistry{
services: []*registry.Descriptor{
{
Name: "TestService1",
Instance: newTestService(nil, nil),
InitPriority: registry.High,
},
{
Name: "TestService2",
Instance: newTestService(nil, testErr),
InitPriority: registry.High,
},
},
}
err := s.Run()
require.ErrorIs(t, err, testErr)
require.NotZero(t, s.ExitCode(err))
}
func TestServer_Shutdown(t *testing.T) {
s := testServer()
services := []*registry.Descriptor{
{
Name: "TestService1",
Instance: newTestService(nil, nil),
InitPriority: registry.High,
},
{
Name: "TestService2",
Instance: newTestService(nil, nil),
InitPriority: registry.High,
},
}
s.serviceRegistry = &testServiceRegistry{
services: services,
}
go func() {
// Wait until all services launched.
for _, svc := range services {
<-svc.Instance.(*testService).started
}
s.Shutdown("test interrupt")
}()
err := s.Run()
require.NoError(t, err)
require.Zero(t, s.ExitCode(err))
}

View File

@ -56,7 +56,7 @@ func StartGrafana(t *testing.T, grafDir, cfgPath string, sqlStore *sqlstore.SQLS
}
}()
t.Cleanup(func() {
server.Shutdown("")
server.Shutdown("test cleanup")
})
// Wait for Grafana to be ready