mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Before these changes the request tracing was added for each route registered using the routing.RouteRegister, see code. This had the consequence that middleware executed earlier/later in the request pipeline was not part of the request tracing middleware life-cycle which measures the duration of requests among other things. In the logger middleware we do extract the current distributed trace identifier, if available, and set that on request info/error log messages. With these changes we can extract the current distributed trace identifier, if available, and set that on the contextual HTTP request logger (models.ReqContext.Logger) which would improve the possibility to correlate all HTTP request log messages with traces. In addition, the request tracing middleware is now executed first and last in the request pipeline and should therefore result in more accurate timing measurements (request duration). Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
335 lines
9.0 KiB
Go
335 lines
9.0 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/grafana/grafana/pkg/api"
|
|
"github.com/grafana/grafana/pkg/api/routing"
|
|
"github.com/grafana/grafana/pkg/bus"
|
|
_ "github.com/grafana/grafana/pkg/extensions"
|
|
"github.com/grafana/grafana/pkg/infra/localcache"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/infra/metrics"
|
|
_ "github.com/grafana/grafana/pkg/infra/remotecache"
|
|
_ "github.com/grafana/grafana/pkg/infra/serverlock"
|
|
_ "github.com/grafana/grafana/pkg/infra/tracing"
|
|
_ "github.com/grafana/grafana/pkg/infra/usagestats"
|
|
"github.com/grafana/grafana/pkg/login"
|
|
"github.com/grafana/grafana/pkg/login/social"
|
|
"github.com/grafana/grafana/pkg/middleware"
|
|
_ "github.com/grafana/grafana/pkg/plugins/manager"
|
|
"github.com/grafana/grafana/pkg/registry"
|
|
_ "github.com/grafana/grafana/pkg/services/alerting"
|
|
_ "github.com/grafana/grafana/pkg/services/auth"
|
|
_ "github.com/grafana/grafana/pkg/services/auth/jwt"
|
|
_ "github.com/grafana/grafana/pkg/services/cleanup"
|
|
_ "github.com/grafana/grafana/pkg/services/librarypanels"
|
|
_ "github.com/grafana/grafana/pkg/services/login/loginservice"
|
|
_ "github.com/grafana/grafana/pkg/services/ngalert"
|
|
_ "github.com/grafana/grafana/pkg/services/notifications"
|
|
_ "github.com/grafana/grafana/pkg/services/provisioning"
|
|
_ "github.com/grafana/grafana/pkg/services/rendering"
|
|
_ "github.com/grafana/grafana/pkg/services/search"
|
|
_ "github.com/grafana/grafana/pkg/services/sqlstore"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
)
|
|
|
|
// Config contains parameters for the New function.
|
|
type Config struct {
|
|
ConfigFile string
|
|
HomePath string
|
|
PidFile string
|
|
Version string
|
|
Commit string
|
|
BuildBranch string
|
|
Listener net.Listener
|
|
}
|
|
|
|
type serviceRegistry interface {
|
|
IsDisabled(srv registry.Service) bool
|
|
GetServices() []*registry.Descriptor
|
|
}
|
|
|
|
type globalServiceRegistry struct{}
|
|
|
|
func (r *globalServiceRegistry) IsDisabled(srv registry.Service) bool {
|
|
return registry.IsDisabled(srv)
|
|
}
|
|
|
|
func (r *globalServiceRegistry) GetServices() []*registry.Descriptor {
|
|
return registry.GetServices()
|
|
}
|
|
|
|
// New returns a new instance of Server.
|
|
func New(cfg Config) (*Server, error) {
|
|
s := newServer(cfg)
|
|
if err := s.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func newServer(cfg Config) *Server {
|
|
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
|
childRoutines, childCtx := errgroup.WithContext(rootCtx)
|
|
|
|
return &Server{
|
|
context: childCtx,
|
|
shutdownFn: shutdownFn,
|
|
shutdownFinished: make(chan struct{}),
|
|
childRoutines: childRoutines,
|
|
log: log.New("server"),
|
|
// Need to use the singleton setting.Cfg instance, to make sure we use the same as is injected in the DI
|
|
// graph
|
|
cfg: setting.GetCfg(),
|
|
|
|
configFile: cfg.ConfigFile,
|
|
homePath: cfg.HomePath,
|
|
pidFile: cfg.PidFile,
|
|
version: cfg.Version,
|
|
commit: cfg.Commit,
|
|
buildBranch: cfg.BuildBranch,
|
|
|
|
serviceRegistry: &globalServiceRegistry{},
|
|
listener: cfg.Listener,
|
|
}
|
|
}
|
|
|
|
// Server is responsible for managing the lifecycle of services.
|
|
type Server struct {
|
|
context context.Context
|
|
shutdownFn context.CancelFunc
|
|
childRoutines *errgroup.Group
|
|
log log.Logger
|
|
cfg *setting.Cfg
|
|
shutdownOnce sync.Once
|
|
shutdownFinished chan struct{}
|
|
isInitialized bool
|
|
mtx sync.Mutex
|
|
listener net.Listener
|
|
|
|
configFile string
|
|
homePath string
|
|
pidFile string
|
|
version string
|
|
commit string
|
|
buildBranch string
|
|
|
|
serviceRegistry serviceRegistry
|
|
|
|
HTTPServer *api.HTTPServer `inject:""`
|
|
}
|
|
|
|
// init initializes the server and its services.
|
|
func (s *Server) init() error {
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
|
|
if s.isInitialized {
|
|
return nil
|
|
}
|
|
s.isInitialized = true
|
|
|
|
s.loadConfiguration()
|
|
s.writePIDFile()
|
|
if err := metrics.SetEnvironmentInformation(s.cfg.MetricsGrafanaEnvironmentInfo); err != nil {
|
|
return err
|
|
}
|
|
|
|
login.Init()
|
|
social.NewOAuthService()
|
|
|
|
services := s.serviceRegistry.GetServices()
|
|
if err := s.buildServiceGraph(services); err != nil {
|
|
return err
|
|
}
|
|
|
|
if s.listener != nil {
|
|
for _, service := range services {
|
|
if httpS, ok := service.Instance.(*api.HTTPServer); ok {
|
|
// Configure the api.HTTPServer if necessary
|
|
// Hopefully we can find a better solution, maybe with a more advanced DI framework, f.ex. Dig?
|
|
s.log.Debug("Using provided listener for HTTP server")
|
|
httpS.Listener = s.listener
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run initializes and starts services. This will block until all services have
|
|
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
|
|
func (s *Server) Run() error {
|
|
defer close(s.shutdownFinished)
|
|
|
|
if err := s.init(); err != nil {
|
|
return err
|
|
}
|
|
|
|
services := s.serviceRegistry.GetServices()
|
|
|
|
// Start background services.
|
|
for _, svc := range services {
|
|
service, ok := svc.Instance.(registry.BackgroundService)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if s.serviceRegistry.IsDisabled(svc.Instance) {
|
|
continue
|
|
}
|
|
|
|
// Variable is needed for accessing loop variable in callback
|
|
descriptor := svc
|
|
s.childRoutines.Go(func() error {
|
|
select {
|
|
case <-s.context.Done():
|
|
return s.context.Err()
|
|
default:
|
|
}
|
|
err := service.Run(s.context)
|
|
// Do not return context.Canceled error since errgroup.Group only
|
|
// returns the first error to the caller - thus we can miss a more
|
|
// interesting error.
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
s.log.Error("Stopped "+descriptor.Name, "reason", err)
|
|
return fmt.Errorf("%s run error: %w", descriptor.Name, err)
|
|
}
|
|
s.log.Debug("Stopped "+descriptor.Name, "reason", err)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
s.notifySystemd("READY=1")
|
|
|
|
s.log.Debug("Waiting on services...")
|
|
return s.childRoutines.Wait()
|
|
}
|
|
|
|
// Shutdown initiates Grafana graceful shutdown. This shuts down all
|
|
// running background services. Since Run blocks Shutdown supposed to
|
|
// be run from a separate goroutine.
|
|
func (s *Server) Shutdown(reason string) {
|
|
s.shutdownOnce.Do(func() {
|
|
s.log.Info("Shutdown started", "reason", reason)
|
|
// Call cancel func to stop services.
|
|
s.shutdownFn()
|
|
// Can introduce termination timeout here if needed over incoming Context,
|
|
// but this will require changing Shutdown method signature to accept context
|
|
// and return an error - caller can exit with code > 0 then.
|
|
// I.e. sth like server.Shutdown(context.WithTimeout(...), reason).
|
|
<-s.shutdownFinished
|
|
})
|
|
}
|
|
|
|
// ExitCode returns an exit code for a given error.
|
|
func (s *Server) ExitCode(runError error) int {
|
|
if runError != nil {
|
|
s.log.Error("Server shutdown", "error", runError)
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// writePIDFile retrieves the current process ID and writes it to file.
|
|
func (s *Server) writePIDFile() {
|
|
if s.pidFile == "" {
|
|
return
|
|
}
|
|
|
|
// Ensure the required directory structure exists.
|
|
err := os.MkdirAll(filepath.Dir(s.pidFile), 0700)
|
|
if err != nil {
|
|
s.log.Error("Failed to verify pid directory", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Retrieve the PID and write it to file.
|
|
pid := strconv.Itoa(os.Getpid())
|
|
if err := ioutil.WriteFile(s.pidFile, []byte(pid), 0644); err != nil {
|
|
s.log.Error("Failed to write pidfile", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
s.log.Info("Writing PID file", "path", s.pidFile, "pid", pid)
|
|
}
|
|
|
|
// buildServiceGraph builds a graph of services and their dependencies.
|
|
func (s *Server) buildServiceGraph(services []*registry.Descriptor) error {
|
|
// Specify service dependencies.
|
|
objs := []interface{}{
|
|
bus.GetBus(),
|
|
s.cfg,
|
|
routing.NewRouteRegister(middleware.ProvideRouteOperationName, middleware.RequestMetrics(s.cfg)),
|
|
localcache.New(5*time.Minute, 10*time.Minute),
|
|
s,
|
|
}
|
|
return registry.BuildServiceGraph(objs, services)
|
|
}
|
|
|
|
// loadConfiguration loads settings and configuration from config files.
|
|
func (s *Server) loadConfiguration() {
|
|
args := &setting.CommandLineArgs{
|
|
Config: s.configFile,
|
|
HomePath: s.homePath,
|
|
Args: flag.Args(),
|
|
}
|
|
|
|
if err := s.cfg.Load(args); err != nil {
|
|
_, _ = fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error())
|
|
os.Exit(1)
|
|
}
|
|
|
|
s.log.Info("Starting "+setting.ApplicationName,
|
|
"version", s.version,
|
|
"commit", s.commit,
|
|
"branch", s.buildBranch,
|
|
"compiled", time.Unix(setting.BuildStamp, 0),
|
|
)
|
|
|
|
s.cfg.LogConfigSources()
|
|
}
|
|
|
|
// notifySystemd sends state notifications to systemd.
|
|
func (s *Server) notifySystemd(state string) {
|
|
notifySocket := os.Getenv("NOTIFY_SOCKET")
|
|
if notifySocket == "" {
|
|
s.log.Debug(
|
|
"NOTIFY_SOCKET environment variable empty or unset, can't send systemd notification")
|
|
return
|
|
}
|
|
|
|
socketAddr := &net.UnixAddr{
|
|
Name: notifySocket,
|
|
Net: "unixgram",
|
|
}
|
|
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
|
|
if err != nil {
|
|
s.log.Warn("Failed to connect to systemd", "err", err, "socket", notifySocket)
|
|
return
|
|
}
|
|
defer func() {
|
|
if err := conn.Close(); err != nil {
|
|
s.log.Warn("Failed to close connection", "err", err)
|
|
}
|
|
}()
|
|
|
|
_, err = conn.Write([]byte(state))
|
|
if err != nil {
|
|
s.log.Warn("Failed to write notification to systemd", "err", err)
|
|
}
|
|
}
|