mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Storage Api: Add metrics (#85316)
* Storage server runs own instrumentation server if its the sole target. Starts adding some sample metrics for now. * adds metric for failed optimistic locks * refactors metrics registration to own method on service for testability. Adds tests. * Register sql storage server metrics from within the service * fixes test * troubleshooting drone test failures. Maybe timing when starting instrumentation server? * Waits until instrumentation server has started. Updates tests. * defer wont get called unless theres an error. removing. * wait for instrumentation server to be running * linter - close res body * use port 3000 for metrics and removes test metric inc() call * fixes test - updates port * refactors module server to provide an instrumentation server module when there is no ALL or CORE target provided and running as single target * make instrumentation server a dependency of all modules that do not run their own http server * adds module server test * adds tests for instrumentation service and removes old tests that aren't needed * ignore error in test * uses helper to start and run service * when running wait on ctx done or http server err * wait for http server * removes println * updates module server test to be integration test * require no error in goroutine * skips integration test when GRAFANA_TEST_DB not defined * move http server start into start, verify returned content * make test error when run fails * try waiting longer and see if drone tests pass * update integration test mysql creds to match drone * go back to only waiting half second * debug log drone mysql connection string * use same db connection config as drone * try using same hostname as drone * cant use localhost as mysql hostname in drone tests. Need to parse it from the cfg db connection string --------- Co-authored-by: Dan Cech <dcech@grafana.com>
This commit is contained in:
parent
3d9a0c8398
commit
37d39de36d
@ -4,14 +4,15 @@ const (
|
||||
// All includes all modules necessary for Grafana to run as a standalone server
|
||||
All string = "all"
|
||||
|
||||
Core string = "core"
|
||||
GrafanaAPIServer string = "grafana-apiserver"
|
||||
StorageServer string = "storage-server"
|
||||
Core string = "core"
|
||||
GrafanaAPIServer string = "grafana-apiserver"
|
||||
StorageServer string = "storage-server"
|
||||
InstrumentationServer string = "instrumentation-server"
|
||||
)
|
||||
|
||||
var dependencyMap = map[string][]string{
|
||||
GrafanaAPIServer: {},
|
||||
StorageServer: {},
|
||||
GrafanaAPIServer: {InstrumentationServer},
|
||||
StorageServer: {InstrumentationServer},
|
||||
Core: {},
|
||||
All: {Core},
|
||||
}
|
||||
|
68
pkg/server/instrumentation_service.go
Normal file
68
pkg/server/instrumentation_service.go
Normal file
@ -0,0 +1,68 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
type instrumentationService struct {
|
||||
*services.BasicService
|
||||
httpServ *http.Server
|
||||
log log.Logger
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
func NewInstrumentationService(log log.Logger) (*instrumentationService, error) {
|
||||
s := &instrumentationService{log: log}
|
||||
s.BasicService = services.NewBasicService(s.start, s.running, s.stop)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *instrumentationService) start(ctx context.Context) error {
|
||||
s.httpServ = s.newInstrumentationServer(ctx)
|
||||
s.errChan = make(chan error)
|
||||
go func() {
|
||||
s.errChan <- s.httpServ.ListenAndServe()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *instrumentationService) running(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case err := <-s.errChan:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (s *instrumentationService) stop(failureReason error) error {
|
||||
s.log.Info("stopping instrumentation server", "reason", failureReason)
|
||||
if err := s.httpServ.Shutdown(context.Background()); err != nil {
|
||||
s.log.Error("failed to shutdown instrumentation server", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *instrumentationService) newInstrumentationServer(ctx context.Context) *http.Server {
|
||||
router := http.NewServeMux()
|
||||
router.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
srv := &http.Server{
|
||||
// 5s timeout for header reads to avoid Slowloris attacks (https://thetooth.io/blog/slowloris-attack/)
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
Addr: ":3000", // TODO - make configurable?
|
||||
Handler: router,
|
||||
BaseContext: func(_ net.Listener) context.Context { return ctx },
|
||||
}
|
||||
|
||||
return srv
|
||||
}
|
52
pkg/server/instrumentation_service_test.go
Normal file
52
pkg/server/instrumentation_service_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRunInstrumentationService(t *testing.T) {
|
||||
s, err := NewInstrumentationService(log.New("test-logger"))
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = services.StartAndAwaitRunning(ctx, s)
|
||||
require.NoError(t, err)
|
||||
|
||||
testCounter := prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "test_counter",
|
||||
})
|
||||
err = prometheus.Register(testCounter)
|
||||
require.NoError(t, err)
|
||||
|
||||
testCounter.Inc()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
client := http.Client{}
|
||||
res, err := client.Get("http://localhost:3000/metrics")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 200, res.StatusCode)
|
||||
|
||||
b, err := io.ReadAll(res.Body)
|
||||
require.NoError(t, err)
|
||||
resp := string(b[len(b)-16:])
|
||||
assert.Equal(t, "\ntest_counter 1\n", resp)
|
||||
|
||||
err = res.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = services.StopAndAwaitTerminated(ctx, s)
|
||||
require.NoError(t, err)
|
||||
}
|
@ -10,7 +10,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
@ -117,6 +116,14 @@ func (s *ModuleServer) Run() error {
|
||||
|
||||
m := modules.New(s.cfg.Target)
|
||||
|
||||
// only run the instrumentation server module if were not running a module that already contains an http server
|
||||
m.RegisterInvisibleModule(modules.InstrumentationServer, func() (services.Service, error) {
|
||||
if m.IsModuleEnabled(modules.All) || m.IsModuleEnabled(modules.Core) {
|
||||
return services.NewBasicService(nil, nil, nil).WithName(modules.InstrumentationServer), nil
|
||||
}
|
||||
return NewInstrumentationService(s.log)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.Core, func() (services.Service, error) {
|
||||
return NewService(s.cfg, s.opts, s.apiOpts)
|
||||
})
|
||||
@ -131,7 +138,7 @@ func (s *ModuleServer) Run() error {
|
||||
//}
|
||||
|
||||
m.RegisterModule(modules.StorageServer, func() (services.Service, error) {
|
||||
return storageServer.ProvideService(s.cfg, s.features)
|
||||
return storageServer.ProvideService(s.cfg, s.features, s.log)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.All, nil)
|
||||
|
88
pkg/server/module_server_test.go
Normal file
88
pkg/server/module_server_test.go
Normal file
@ -0,0 +1,88 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"cuelang.org/go/pkg/regexp"
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tests/testsuite"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
testsuite.Run(m)
|
||||
}
|
||||
|
||||
func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
dbType := os.Getenv("GRAFANA_TEST_DB")
|
||||
if dbType == "" {
|
||||
t.Skip("skipping - GRAFANA_TEST_DB not defined")
|
||||
}
|
||||
if dbType == "sqlite3" {
|
||||
t.Skip("skipping - sqlite not supported for storage server target")
|
||||
}
|
||||
|
||||
testdb := db.InitTestDB(t)
|
||||
cfg := testdb.Cfg
|
||||
cfg.GRPCServerNetwork = "tcp"
|
||||
cfg.GRPCServerAddress = "localhost:10000"
|
||||
addStorageServerToConfig(t, cfg, dbType)
|
||||
cfg.Target = []string{modules.StorageServer}
|
||||
|
||||
ms, err := InitializeModuleServer(cfg, Options{}, api.ServerOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
go func() {
|
||||
err = ms.Run()
|
||||
if err.Error() != "context canceled" {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
time.Sleep(500 * time.Millisecond) // wait for http server to be running
|
||||
|
||||
client := http.Client{}
|
||||
res, err := client.Get("http://localhost:3000/metrics")
|
||||
require.NoError(t, err)
|
||||
err = res.Body.Close()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 200, res.StatusCode)
|
||||
|
||||
err = ms.Shutdown(context.Background(), "test over")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func addStorageServerToConfig(t *testing.T, cfg *setting.Cfg, dbType string) {
|
||||
s, err := cfg.Raw.NewSection("entity_api")
|
||||
require.NoError(t, err)
|
||||
_, err = s.NewKey("db_type", dbType)
|
||||
require.NoError(t, err)
|
||||
|
||||
if dbType == "postgres" {
|
||||
_, _ = s.NewKey("db_host", "localhost")
|
||||
_, _ = s.NewKey("db_name", "grafanatest")
|
||||
_, _ = s.NewKey("db_user", "grafanatest")
|
||||
_, _ = s.NewKey("db_pass", "grafanatest")
|
||||
} else {
|
||||
// cant use localhost as hostname in drone tests for mysql, so need to parse it from connection string
|
||||
sec, err := cfg.Raw.GetSection("database")
|
||||
require.NoError(t, err)
|
||||
connString := sec.Key("connection_string").String()
|
||||
matches, err := regexp.FindSubmatch("(.+):(.+)@tcp\\((.+):(\\d+)\\)/(.+)\\?", connString)
|
||||
require.NoError(t, err)
|
||||
_, _ = s.NewKey("db_host", matches[3])
|
||||
_, _ = s.NewKey("db_name", matches[5])
|
||||
_, _ = s.NewKey("db_user", matches[1])
|
||||
_, _ = s.NewKey("db_pass", matches[2])
|
||||
}
|
||||
}
|
@ -4,8 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
@ -17,6 +16,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/grpc"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -51,11 +51,14 @@ type service struct {
|
||||
tracing *tracing.TracingService
|
||||
|
||||
authenticator interceptors.Authenticator
|
||||
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func ProvideService(
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
log log.Logger,
|
||||
) (*service, error) {
|
||||
tracingCfg, err := tracing.ProvideTracingConfig(cfg)
|
||||
if err != nil {
|
||||
@ -76,6 +79,7 @@ func ProvideService(
|
||||
stopCh: make(chan struct{}),
|
||||
authenticator: authn,
|
||||
tracing: tracing,
|
||||
log: log,
|
||||
}
|
||||
|
||||
// This will be used when running as a dskit service
|
||||
|
41
pkg/services/store/entity/sqlstash/metrics.go
Normal file
41
pkg/services/store/entity/sqlstash/metrics.go
Normal file
@ -0,0 +1,41 @@
|
||||
package sqlstash
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
StorageServerMetrics *StorageApiMetrics
|
||||
)
|
||||
|
||||
type StorageApiMetrics struct {
|
||||
OptimisticLockFailed *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func NewStorageMetrics() *StorageApiMetrics {
|
||||
once.Do(func() {
|
||||
StorageServerMetrics = &StorageApiMetrics{
|
||||
OptimisticLockFailed: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "storage_server",
|
||||
Name: "optimistic_lock_failed",
|
||||
Help: "count of optimistic locks failed",
|
||||
},
|
||||
[]string{"action"},
|
||||
),
|
||||
}
|
||||
})
|
||||
|
||||
return StorageServerMetrics
|
||||
}
|
||||
|
||||
func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) {
|
||||
s.OptimisticLockFailed.Collect(ch)
|
||||
}
|
||||
|
||||
func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) {
|
||||
s.OptimisticLockFailed.Describe(ch)
|
||||
}
|
@ -14,7 +14,6 @@ import (
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"github.com/google/uuid"
|
||||
|
||||
folder "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@ -23,6 +22,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/store"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/db"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const entityTable = "entity"
|
||||
@ -38,6 +38,10 @@ func ProvideSQLEntityServer(db db.EntityDBInterface /*, cfg *setting.Cfg */) (en
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
if err := prometheus.Register(NewStorageMetrics()); err != nil {
|
||||
entityServer.log.Warn("error registering storage server metrics", "error", err)
|
||||
}
|
||||
|
||||
return entityServer, nil
|
||||
}
|
||||
|
||||
@ -514,6 +518,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
|
||||
|
||||
// Optimistic locking
|
||||
if r.PreviousVersion > 0 && r.PreviousVersion != current.ResourceVersion {
|
||||
StorageServerMetrics.OptimisticLockFailed.WithLabelValues("update").Inc()
|
||||
return fmt.Errorf("optimistic lock failed")
|
||||
}
|
||||
|
||||
@ -759,6 +764,7 @@ func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequ
|
||||
|
||||
if r.PreviousVersion > 0 && r.PreviousVersion != rsp.Entity.ResourceVersion {
|
||||
rsp.Status = entity.DeleteEntityResponse_ERROR
|
||||
StorageServerMetrics.OptimisticLockFailed.WithLabelValues("delete").Inc()
|
||||
return fmt.Errorf("optimistic lock failed")
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user