Experimental Feature Toggle: databaseReadReplica (#89232)

This adds a version of the SQLStore that includes a ReadReplica. The primary DB can be accessed directly - from the caller's standpoint, there is no difference between the SQLStore and ReplStore unless they wish to explicitly call the ReadReplica() and use that for the DB sessions.

Currently only the stats service GetSystemStats and GetAdminStats are using the ReadReplica(); if it's misconfigured or if the databaseReadReplica feature flag is not turned on, it will fall back to the usual (SQLStore) behavior.

Testing requires a database and read replica - the replication should already be configured. I have been testing this locally with a docker mysql setup (https://medium.com/@vbabak/docker-mysql-master-slave-replication-setup-2ff553fceef2) and the following config:

[feature_toggles]
databaseReadReplica = true

[database]
type = mysql
name = grafana
user = grafana
password = password
host = 127.0.0.1:3306

[database_replica]
type = mysql
name = grafana
user = grafana
password = password
host = 127.0.0.1:3307
This commit is contained in:
Kristin Laemmert 2024-06-18 11:07:15 -04:00 committed by GitHub
parent 791bcd93df
commit 50244ed4a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 272 additions and 23 deletions

View File

@ -190,6 +190,7 @@ Experimental features might be changed or removed without prior notice.
| `alertingCentralAlertHistory` | Enables the new central alert history. |
| `azureMonitorPrometheusExemplars` | Allows configuration of Azure Monitor as a data source that can provide Prometheus exemplars |
| `pinNavItems` | Enables pinning of nav items |
| `databaseReadReplica` | Use a read replica for some database queries. |
## Development feature toggles

View File

@ -1286,6 +1286,7 @@ github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4 h1:BN/Nyn2nWMo
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.15.0 h1:uPRuwkWF4J6fGsJ2R0Gn2jB1EQiav9k3S6CSdygQJXY=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww=
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245 h1:K1Xf3bKttbF+koVGaX5xngRIZ5bVjbmPnaxE/dR08uY=

View File

@ -195,4 +195,5 @@ export interface FeatureToggles {
authZGRPCServer?: boolean;
openSearchBackendFlowEnabled?: boolean;
ssoSettingsLDAP?: boolean;
databaseReadReplica?: boolean;
}

View File

@ -62,6 +62,15 @@ func InitTestDB(t sqlutil.ITestDB, opts ...InitTestDBOpt) *sqlstore.SQLStore {
return db
}
// InitTestReplDBWithCfg initializes a test database wrapped in a
// *sqlstore.ReplStore and returns it together with the *setting.Cfg used to
// create it. It delegates directly to sqlstore.InitTestReplDB.
func InitTestReplDBWithCfg(t sqlutil.ITestDB, opts ...InitTestDBOpt) (*sqlstore.ReplStore, *setting.Cfg) {
return sqlstore.InitTestReplDB(t, opts...)
}
// InitTestReplDB initializes a test database wrapped in a *sqlstore.ReplStore.
// It is a convenience form of InitTestReplDBWithCfg for callers that do not
// need the accompanying configuration, which is discarded here.
func InitTestReplDB(t sqlutil.ITestDB, opts ...InitTestDBOpt) *sqlstore.ReplStore {
	replStore, _ := InitTestReplDBWithCfg(t, opts...)
	return replStore
}
// InitTestDBWithCfg initializes a test database and returns the resulting
// *sqlstore.SQLStore together with a *setting.Cfg. It delegates directly to
// sqlstore.InitTestDB.
func InitTestDBWithCfg(t sqlutil.ITestDB, opts ...InitTestDBOpt) (*sqlstore.SQLStore, *setting.Cfg) {
return sqlstore.InitTestDB(t, opts...)
}

9
pkg/infra/db/dbrepl.go Normal file
View File

@ -0,0 +1,9 @@
package db
import "github.com/grafana/grafana/pkg/services/sqlstore"
// ReplDB exposes a primary database connection alongside a read replica
// connection.
type ReplDB interface {
// DB is the primary database connection.
DB() *sqlstore.SQLStore
// ReadReplica is the connection to use for queries that may be served by a
// read replica. NOTE(review): implementations may hand back the primary
// connection when no replica is available — confirm against the concrete
// implementation in use.
ReadReplica() *sqlstore.SQLStore
}

View File

@ -24,7 +24,7 @@ func TestMain(m *testing.M) {
}
func TestConcurrentUsersMetrics(t *testing.T) {
sqlStore, cfg := db.InitTestDBWithCfg(t)
sqlStore, cfg := db.InitTestReplDBWithCfg(t)
statsService := statsimpl.ProvideService(&setting.Cfg{}, sqlStore)
s := createService(t, cfg, sqlStore, statsService)
@ -42,7 +42,7 @@ func TestConcurrentUsersMetrics(t *testing.T) {
}
func TestConcurrentUsersStats(t *testing.T) {
sqlStore, cfg := db.InitTestDBWithCfg(t)
sqlStore, cfg := db.InitTestReplDBWithCfg(t)
statsService := statsimpl.ProvideService(&setting.Cfg{}, sqlStore)
s := createService(t, cfg, sqlStore, statsService)

View File

@ -15,6 +15,7 @@ import (
func ProvideTestEnv(
server *Server,
db db.DB,
repldb db.ReplDB,
cfg *setting.Cfg,
ns *notifications.NotificationServiceMock,
grpcServer grpcserver.Provider,
@ -26,6 +27,7 @@ func ProvideTestEnv(
return &TestEnv{
Server: server,
SQLStore: db,
ReadReplStore: repldb,
Cfg: cfg,
NotificationService: ns,
GRPCServer: grpcServer,
@ -39,6 +41,7 @@ func ProvideTestEnv(
type TestEnv struct {
Server *Server
SQLStore db.DB
ReadReplStore db.ReplDB
Cfg *setting.Cfg
NotificationService *notifications.NotificationServiceMock
GRPCServer grpcserver.Provider

View File

@ -390,6 +390,7 @@ var wireSet = wire.NewSet(
wireBasicSet,
metrics.WireSet,
sqlstore.ProvideService,
sqlstore.ProvideServiceWithReadReplica,
ngmetrics.ProvideService,
wire.Bind(new(notifications.Service), new(*notifications.NotificationService)),
wire.Bind(new(notifications.WebhookSender), new(*notifications.NotificationService)),
@ -405,6 +406,7 @@ var wireCLISet = wire.NewSet(
wireBasicSet,
metrics.WireSet,
sqlstore.ProvideService,
sqlstore.ProvideServiceWithReadReplica,
ngmetrics.ProvideService,
wire.Bind(new(notifications.Service), new(*notifications.NotificationService)),
wire.Bind(new(notifications.WebhookSender), new(*notifications.NotificationService)),
@ -420,12 +422,14 @@ var wireTestSet = wire.NewSet(
ProvideTestEnv,
metrics.WireSetForTest,
sqlstore.ProvideServiceForTests,
sqlstore.ProvideServiceWithReadReplicaForTests,
ngmetrics.ProvideServiceForTest,
notifications.MockNotificationService,
wire.Bind(new(notifications.Service), new(*notifications.NotificationServiceMock)),
wire.Bind(new(notifications.WebhookSender), new(*notifications.NotificationServiceMock)),
wire.Bind(new(notifications.EmailSender), new(*notifications.NotificationServiceMock)),
wire.Bind(new(db.DB), new(*sqlstore.SQLStore)),
wire.Bind(new(db.ReplDB), new(*sqlstore.ReplStore)),
prefimpl.ProvideService,
oauthtoken.ProvideService,
oauthtokentest.ProvideService,
@ -439,7 +443,7 @@ func Initialize(cfg *setting.Cfg, opts Options, apiOpts api.ServerOptions) (*Ser
func InitializeForTest(t sqlutil.ITestDB, cfg *setting.Cfg, opts Options, apiOpts api.ServerOptions) (*TestEnv, error) {
wire.Build(wireExtsTestSet)
return &TestEnv{Server: &Server{}, SQLStore: &sqlstore.SQLStore{}, Cfg: &setting.Cfg{}}, nil
return &TestEnv{Server: &Server{}, SQLStore: &sqlstore.SQLStore{}, ReadReplStore: &sqlstore.ReplStore{}, Cfg: &setting.Cfg{}}, nil
}
func InitializeForCLI(cfg *setting.Cfg) (Runner, error) {

View File

@ -1323,6 +1323,13 @@ var (
HideFromDocs: true,
HideFromAdminPage: true,
},
{
Name: "databaseReadReplica",
Description: "Use a read replica for some database queries.",
Stage: FeatureStageExperimental,
Owner: grafanaBackendServicesSquad,
Expression: "false", // disabled by default
},
}
)

View File

@ -176,3 +176,4 @@ pinNavItems,experimental,@grafana/grafana-frontend-platform,false,false,false
authZGRPCServer,experimental,@grafana/identity-access-team,false,false,false
openSearchBackendFlowEnabled,preview,@grafana/aws-datasources,false,false,false
ssoSettingsLDAP,experimental,@grafana/identity-access-team,false,false,false
databaseReadReplica,experimental,@grafana/grafana-backend-services-squad,false,false,false

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
176 authZGRPCServer experimental @grafana/identity-access-team false false false
177 openSearchBackendFlowEnabled preview @grafana/aws-datasources false false false
178 ssoSettingsLDAP experimental @grafana/identity-access-team false false false
179 databaseReadReplica experimental @grafana/grafana-backend-services-squad false false false

View File

@ -714,4 +714,8 @@ const (
// FlagSsoSettingsLDAP
// Use the new SSO Settings API to configure LDAP
FlagSsoSettingsLDAP = "ssoSettingsLDAP"
// FlagDatabaseReadReplica
// Use a read replica for some database queries.
FlagDatabaseReadReplica = "databaseReadReplica"
)

View File

@ -655,6 +655,18 @@
"frontend": true
}
},
{
"metadata": {
"name": "databaseReadReplica",
"resourceVersion": "1718308641844",
"creationTimestamp": "2024-06-13T19:57:21Z"
},
"spec": {
"description": "Use a read replica for some database queries.",
"stage": "experimental",
"codeowner": "@grafana/grafana-backend-services-squad"
}
},
{
"metadata": {
"name": "dataplaneFrontendFallback",

View File

@ -65,9 +65,11 @@ func NewDatabaseConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles) (*
return dbCfg, nil
}
func (dbCfg *DatabaseConfig) readConfig(cfg *setting.Cfg) error {
sec := cfg.Raw.Section("database")
// readConfigSection reads the database configuration from the given block of
// the configuration file. This method allows us to add a "database_replica"
// section to the configuration file while using the same cfg struct.
func (dbCfg *DatabaseConfig) readConfigSection(cfg *setting.Cfg, section string) error {
sec := cfg.Raw.Section(section)
cfgURL := sec.Key("url").String()
if len(cfgURL) != 0 {
dbURL, err := url.Parse(cfgURL)
@ -101,7 +103,6 @@ func (dbCfg *DatabaseConfig) readConfig(cfg *setting.Cfg) error {
dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
dbCfg.SslMode = sec.Key("ssl_mode").String()
dbCfg.SSLSNI = sec.Key("ssl_sni").String()
dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
@ -110,21 +111,22 @@ func (dbCfg *DatabaseConfig) readConfig(cfg *setting.Cfg) error {
dbCfg.ServerCertName = sec.Key("server_cert_name").String()
dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
dbCfg.IsolationLevel = sec.Key("isolation_level").String()
dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
dbCfg.WALEnabled = sec.Key("wal").MustBool(false)
dbCfg.SkipMigrations = sec.Key("skip_migrations").MustBool()
dbCfg.MigrationLock = sec.Key("migration_locking").MustBool(true)
dbCfg.MigrationLockAttemptTimeout = sec.Key("locking_attempt_timeout_sec").MustInt()
dbCfg.QueryRetries = sec.Key("query_retries").MustInt()
dbCfg.TransactionRetries = sec.Key("transaction_retries").MustInt(5)
dbCfg.LogQueries = sec.Key("log_queries").MustBool(false)
return nil
}
// readConfig is a wrapper around readConfigSection that reads the "database"
// configuration block.
func (dbCfg *DatabaseConfig) readConfig(cfg *setting.Cfg) error {
	const primarySection = "database"
	return dbCfg.readConfigSection(cfg, primarySection)
}
func (dbCfg *DatabaseConfig) buildConnectionString(cfg *setting.Cfg, features featuremgmt.FeatureToggles) error {
if dbCfg.ConnectionString != "" {
return nil

View File

@ -0,0 +1,194 @@
package sqlstore
import (
"errors"
"time"
"github.com/dlmiddlecote/sqlstats"
"github.com/prometheus/client_golang/prometheus"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
"github.com/grafana/grafana/pkg/setting"
)
// ReplStore pairs a main SQLStore with a second SQLStore intended for
// read-only use. The main SQLStore is embedded, so its methods are promoted and
// the ReplStore may be used directly as a SQLStore.
type ReplStore struct {
// Embedded main (primary) store.
*SQLStore
// repl is the read replica store; it is nil when no replica has been attached.
repl *SQLStore
}
// DB returns the main SQLStore, i.e. the embedded *SQLStore of the ReplStore.
func (rs ReplStore) DB() *SQLStore {
return rs.SQLStore
}
// ReadReplica returns the SQLStore to use for read-only queries. When a replica
// has been attached it is returned; otherwise the main SQLStore is returned
// instead. Either outcome is reported at debug level on the main store's
// logger.
func (rs ReplStore) ReadReplica() *SQLStore {
	if replica := rs.repl; replica != nil {
		rs.log.Debug("Using ReadReplica")
		return replica
	}

	rs.log.Debug("ReadReplica not configured, using main SQLStore")
	return rs.SQLStore
}
// ProvideServiceWithReadReplica wraps the already-initialized primary SQLStore
// in a ReplStore.
//
// If the databaseReadReplica feature toggle is not enabled globally, the
// returned ReplStore has no replica attached. Otherwise a read-only SQLStore is
// created via newReadOnlySQLStore and attached as the replica; a failure to
// create it is returned as an error. The migrations argument is accepted but
// not used by this function.
func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg,
	features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator,
	bus bus.Bus, tracer tracing.Tracer) (*ReplStore, error) {
	// Feature toggle fallback: without the flag, hand back the primary on its own.
	if !features.IsEnabledGlobally(featuremgmt.FlagDatabaseReadReplica) {
		primary.log.Debug("ReadReplica feature flag not enabled, using main SQLStore")
		return &ReplStore{SQLStore: primary}, nil
	}

	// Reset xorm's package-level default Postgres schema to the empty string.
	// NOTE(review): presumably this mirrors what the primary store setup does
	// to keep xorm's pre-schema behaviour — confirm against ProvideService.
	xorm.DefaultPostgresSchema = ""

	replica, err := newReadOnlySQLStore(cfg, features, bus, tracer)
	if err != nil {
		return nil, err
	}
	replica.features = features
	replica.tracer = tracer

	// Register the go_sql_stats_connections_* metrics for the replica's
	// underlying *sql.DB. A registration failure is logged, not returned.
	collector := sqlstats.NewStatsCollector("grafana_repl", replica.engine.DB().DB)
	if regErr := prometheus.Register(collector); regErr != nil {
		replica.log.Warn("Failed to register sqlstore stats collector", "error", regErr)
	}

	return &ReplStore{SQLStore: primary, repl: replica}, nil
}
// newReadOnlySQLStore builds a *SQLStore intended for use against a
// fully-populated read replica of the main Grafana database. It initializes
// the engine through initReadOnlyEngine and derives the dialect from the
// engine's driver name. No migrations are run here.
func newReadOnlySQLStore(cfg *setting.Cfg, features featuremgmt.FeatureToggles, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) {
	replica := &SQLStore{
		cfg:      cfg,
		log:      log.New("replstore"),
		bus:      bus,
		tracer:   tracer,
		features: features,
	}

	// The engine field of a freshly built store is nil, so initReadOnlyEngine
	// creates a new engine from the replica configuration.
	if err := replica.initReadOnlyEngine(replica.engine); err != nil {
		return nil, err
	}

	replica.dialect = migrator.NewDialect(replica.engine.DriverName())
	return replica, nil
}
// initReadOnlyEngine initializes ss.engine for use against a read replica. The
// database must be a fully-populated read replica.
//
// If ss.engine is already set the call is a no-op. Otherwise the replica
// configuration is loaded via NewRODatabaseConfig and stored in ss.dbCfg. When
// the engine argument is nil, a new xorm engine is created from that
// configuration; a non-nil engine is used as given. Connection-pool limits and
// SQL logging are then applied before the engine is stored on ss.
func (ss *SQLStore) initReadOnlyEngine(engine *xorm.Engine) error {
	if ss.engine != nil {
		ss.log.Debug("Already connected to database replica")
		return nil
	}

	roCfg, err := NewRODatabaseConfig(ss.cfg, ss.features)
	if err != nil {
		return err
	}
	ss.dbCfg = roCfg

	// roCfg and ss.dbCfg point at the same DatabaseConfig, so this updates the
	// stored configuration as well.
	if ss.cfg.DatabaseInstrumentQueries {
		roCfg.Type = WrapDatabaseDriverWithHooks(roCfg.Type, ss.tracer)
	}

	if engine == nil {
		engine, err = xorm.NewEngine(roCfg.Type, roCfg.ConnectionString)
		if err != nil {
			ss.log.Error("failed to connect to database replica", "error", err)
			return err
		}

		// Only for MySQL or MariaDB: verify we can connect with the current
		// connection string's system variable for transaction isolation. If
		// not, the helper returns a new engine built from a compatible
		// connection string.
		if roCfg.Type == migrator.MySQL {
			if engine, err = ss.ensureTransactionIsolationCompatibility(engine, roCfg.ConnectionString); err != nil {
				return err
			}
		}
	}

	// Connection pool limits come from the replica configuration.
	engine.SetMaxOpenConns(roCfg.MaxOpenConn)
	engine.SetMaxIdleConns(roCfg.MaxIdleConn)
	engine.SetConnMaxLifetime(time.Duration(roCfg.ConnMaxLifetime) * time.Second)

	// Configure SQL logging from the "database_replica" section.
	logQueries := ss.cfg.Raw.Section("database_replica").Key("log_queries").MustBool(false)
	if logQueries {
		// Attach caller stack information to the logger so the repository
		// that initiated a query can be identified.
		xormLog := log.WithSuffix(log.New("replsstore.xorm"), log.CallerContextKey, log.StackCaller(log.DefaultCallerDepth))
		engine.SetLogger(NewXormLogger(log.LvlInfo, xormLog))
		engine.ShowSQL(true)
		engine.ShowExecTime(true)
	} else {
		engine.SetLogger(&xorm.DiscardLogger{})
	}

	ss.engine = engine
	return nil
}
// NewRODatabaseConfig creates the database configuration for the read replica.
// Settings are read from the "database_replica" configuration section and the
// connection string is then built from them. It returns an error if cfg is
// nil, or if reading the section or building the connection string fails.
func NewRODatabaseConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles) (*DatabaseConfig, error) {
	if cfg == nil {
		return nil, errors.New("cfg cannot be nil")
	}

	roCfg := new(DatabaseConfig)

	err := roCfg.readConfigSection(cfg, "database_replica")
	if err != nil {
		return nil, err
	}

	err = roCfg.buildConnectionString(cfg, features)
	if err != nil {
		return nil, err
	}

	return roCfg, nil
}
// ProvideServiceWithReadReplicaForTests wraps the test SQLStore in a ReplStore,
// using that same store as both the primary and the read replica. The t, cfg,
// features and migrations arguments are accepted but not used.
//
// TODO: eventually this should be replaced with a more robust test setup.
// NOTE(review): the original TODO was cut off mid-sentence; presumably it
// called for a setup where the replica is a distinct connection — confirm.
func ProvideServiceWithReadReplicaForTests(testDB *SQLStore, t sqlutil.ITestDB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator) (*ReplStore, error) {
	wrapped := &ReplStore{SQLStore: testDB, repl: testDB}
	return wrapped, nil
}
// InitTestReplDB initializes a test database and returns it wrapped in a
// ReplStore, with the same SQLStore serving as both the primary and the read
// replica, along with the configuration used for the test database. The test
// is failed immediately if the database cannot be initialized.
func InitTestReplDB(t sqlutil.ITestDB, opts ...InitTestDBOpt) (*ReplStore, *setting.Cfg) {
	t.Helper()

	features := getFeaturesForTesting(opts...)
	cfg := getCfgForTesting(opts...)
	ossMigrations := migrations.ProvideOSSMigrations(features)

	store, err := initTestDB(t, cfg, features, ossMigrations, opts...)
	if err != nil {
		t.Fatalf("failed to initialize sql repl store: %s", err)
	}

	return &ReplStore{SQLStore: store, repl: store}, cfg
}

View File

@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/libraryelements/model"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/stats"
"github.com/grafana/grafana/pkg/setting"
@ -17,12 +18,12 @@ import (
const activeUserTimeLimit = time.Hour * 24 * 30
const dailyActiveUserTimeLimit = time.Hour * 24
func ProvideService(cfg *setting.Cfg, db db.DB) stats.Service {
func ProvideService(cfg *setting.Cfg, db *sqlstore.ReplStore) stats.Service {
return &sqlStatsService{cfg: cfg, db: db}
}
type sqlStatsService struct {
db db.DB
db *sqlstore.ReplStore
cfg *setting.Cfg
}
@ -62,8 +63,8 @@ func notServiceAccount(dialect migrator.Dialect) string {
}
func (ss *sqlStatsService) GetSystemStats(ctx context.Context, query *stats.GetSystemStatsQuery) (result *stats.SystemStats, err error) {
dialect := ss.db.GetDialect()
err = ss.db.WithDbSession(ctx, func(dbSession *db.Session) error {
dialect := ss.db.ReadReplica().GetDialect()
err = ss.db.ReadReplica().WithDbSession(ctx, func(dbSession *db.Session) error {
sb := &db.SQLBuilder{}
sb.Write("SELECT ")
sb.Write(`(SELECT COUNT(*) FROM ` + dialect.Quote("user") + ` WHERE ` + notServiceAccount(dialect) + `) AS users,`)
@ -148,8 +149,8 @@ func (ss *sqlStatsService) roleCounterSQL(ctx context.Context) string {
}
func (ss *sqlStatsService) GetAdminStats(ctx context.Context, query *stats.GetAdminStatsQuery) (result *stats.AdminStats, err error) {
err = ss.db.WithDbSession(ctx, func(dbSession *db.Session) error {
dialect := ss.db.GetDialect()
err = ss.db.ReadReplica().WithDbSession(ctx, func(dbSession *db.Session) error {
dialect := ss.db.ReadReplica().GetDialect()
now := time.Now()
activeEndDate := now.Add(-activeUserTimeLimit)
dailyActiveEndDate := now.Add(-dailyActiveUserTimeLimit)

View File

@ -32,9 +32,9 @@ func TestIntegrationStatsDataAccess(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
db, cfg := db.InitTestDBWithCfg(t)
statsService := &sqlStatsService{db: db}
populateDB(t, db, cfg)
store, cfg := db.InitTestReplDBWithCfg(t)
statsService := &sqlStatsService{db: store}
populateDB(t, store, cfg)
t.Run("Get system stats should not results in error", func(t *testing.T) {
query := stats.GetSystemStatsQuery{}
@ -49,7 +49,7 @@ func TestIntegrationStatsDataAccess(t *testing.T) {
assert.Equal(t, int64(0), result.APIKeys)
assert.Equal(t, int64(2), result.Correlations)
assert.NotNil(t, result.DatabaseCreatedTime)
assert.Equal(t, db.GetDialect().DriverName(), result.DatabaseDriver)
assert.Equal(t, store.GetDialect().DriverName(), result.DatabaseDriver)
})
t.Run("Get system user count stats should not results in error", func(t *testing.T) {
@ -157,8 +157,8 @@ func TestIntegration_GetAdminStats(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
db, cfg := db.InitTestDBWithCfg(t)
statsService := ProvideService(cfg, db)
store, cfg := db.InitTestReplDBWithCfg(t)
statsService := ProvideService(cfg, store)
query := stats.GetAdminStatsQuery{}
_, err := statsService.GetAdminStats(context.Background(), &query)