ReplStore: Add support for round robin load balancing between multiple read replicas (#90530)

* ReplStore: Add support for multiple replicas and round-robin load balancing

* add check for zero-length repls list
This commit is contained in:
Kristin Laemmert 2024-07-18 08:20:28 -04:00 committed by GitHub
parent 2d152b7ec1
commit 27b52b1507
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 154 additions and 40 deletions

View File

@ -9,6 +9,5 @@ type ReplDB interface {
DB() *sqlstore.SQLStore
// ReadReplica is the read-only database connection. If no read replica is configured, the implementation must return the primary DB.
// TODO: ReadReplica will take a list of replicas and load-balance across them in a future milestone.
ReadReplica() *sqlstore.SQLStore
}

View File

@ -10,6 +10,7 @@ import (
"strings"
"github.com/go-sql-driver/mysql"
"gopkg.in/ini.v1"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
@ -70,6 +71,10 @@ func NewDatabaseConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles) (*
// section to the configuration file while using the same cfg struct.
func (dbCfg *DatabaseConfig) readConfigSection(cfg *setting.Cfg, section string) error {
sec := cfg.Raw.Section(section)
return dbCfg.parseConfigIni(sec)
}
func (dbCfg *DatabaseConfig) parseConfigIni(sec *ini.Section) error {
cfgURL := sec.Key("url").String()
if len(cfgURL) != 0 {
dbURL, err := url.Parse(cfgURL)

View File

@ -2,6 +2,7 @@ package sqlstore
import (
"errors"
"sync/atomic"
"time"
"github.com/dlmiddlecote/sqlstats"
@ -24,23 +25,37 @@ import (
// SQLStore.
type ReplStore struct {
*SQLStore
repl *SQLStore
repls []*SQLStore
// next is the index of the next read-only SQLStore in the chain.
next uint64
}
// DB returns the main SQLStore.
func (rs ReplStore) DB() *SQLStore {
func (rs *ReplStore) DB() *SQLStore {
return rs.SQLStore
}
// ReadReplica returns the read-only SQLStore. If no read replica is configured,
// it returns the main SQLStore.
func (rs ReplStore) ReadReplica() *SQLStore {
if rs.repl == nil {
func (rs *ReplStore) ReadReplica() *SQLStore {
if rs.repls == nil || len(rs.repls) == 0 {
rs.log.Debug("ReadReplica not configured, using main SQLStore")
return rs.SQLStore
}
rs.log.Debug("Using ReadReplica")
return rs.repl
return rs.nextRepl()
}
// nextRepl() returns the next read-only SQLStore in the chain. If no read replica is configured, the Primary is returned.
func (rs *ReplStore) nextRepl() *SQLStore {
// start by grabbing the replica at the current index
selected := rs.repls[(int(rs.next))%len(rs.repls)]
// then increment the index for the next call
atomic.AddUint64(&rs.next, 1)
return selected
}
// ProvideServiceWithReadReplica creates a new *SQLStore connection intended for
@ -50,7 +65,7 @@ func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg,
features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator,
bus bus.Bus, tracer tracing.Tracer) (*ReplStore, error) {
// start with the initialized SQLStore
replStore := &ReplStore{primary, nil}
replStore := &ReplStore{primary, nil, 0}
// FeatureToggle fallback: If the FlagDatabaseReadReplica feature flag is not enabled, return a single SQLStore.
if !features.IsEnabledGlobally(featuremgmt.FlagDatabaseReadReplica) {
@ -62,22 +77,32 @@ func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg,
// by that mimic the functionality of how it was functioning before
// xorm's changes above.
xorm.DefaultPostgresSchema = ""
s, err := newReadOnlySQLStore(cfg, features, bus, tracer)
// Parsing the configuration to get the number of repls
replCfgs, err := NewRODatabaseConfigs(cfg, features)
if err != nil {
return nil, err
}
s.features = features
s.tracer = tracer
// initialize and register metrics wrapper around the *sql.DB
db := s.engine.DB().DB
// register the go_sql_stats_connections_* metrics
if err := prometheus.Register(sqlstats.NewStatsCollector("grafana_repl", db)); err != nil {
s.log.Warn("Failed to register sqlstore stats collector", "error", err)
if len(replCfgs) > 0 {
replStore.repls = make([]*SQLStore, len(replCfgs))
}
replStore.repl = s
for i, replCfg := range replCfgs {
s, err := newReadOnlySQLStore(cfg, replCfg, features, bus, tracer)
if err != nil {
return nil, err
}
// initialize and register metrics wrapper around the *sql.DB
db := s.engine.DB().DB
// register the go_sql_stats_connections_* metrics
if err := prometheus.Register(sqlstats.NewStatsCollector("grafana_repl", db)); err != nil {
s.log.Warn("Failed to register sqlstore stats collector", "error", err)
}
replStore.repls[i] = s
}
return replStore, nil
}
@ -85,17 +110,16 @@ func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg,
// fully-populated read replica of the main Grafana Database. It provides no
// write capabilities and does not run migrations, but other tracing and logging
// features are enabled.
func newReadOnlySQLStore(cfg *setting.Cfg, features featuremgmt.FeatureToggles, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) {
func newReadOnlySQLStore(cfg *setting.Cfg, dbCfg *DatabaseConfig, features featuremgmt.FeatureToggles, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) {
s := &SQLStore{
cfg: cfg,
log: log.New("replstore"),
bus: bus,
tracer: tracer,
log: log.New("replstore"),
bus: bus,
tracer: tracer,
features: features,
dbCfg: dbCfg,
cfg: cfg,
}
s.features = features
s.tracer = tracer
err := s.initReadOnlyEngine(s.engine)
if err != nil {
return nil, err
@ -111,12 +135,6 @@ func (ss *SQLStore) initReadOnlyEngine(engine *xorm.Engine) error {
return nil
}
dbCfg, err := NewRODatabaseConfig(ss.cfg, ss.features)
if err != nil {
return err
}
ss.dbCfg = dbCfg
if ss.cfg.DatabaseInstrumentQueries {
ss.dbCfg.Type = WrapDatabaseReplDriverWithHooks(ss.dbCfg.Type, ss.tracer)
}
@ -158,27 +176,44 @@ func (ss *SQLStore) initReadOnlyEngine(engine *xorm.Engine) error {
}
// NewRODatabaseConfig creates a new read-only database configuration.
func NewRODatabaseConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles) (*DatabaseConfig, error) {
func NewRODatabaseConfigs(cfg *setting.Cfg, features featuremgmt.FeatureToggles) ([]*DatabaseConfig, error) {
if cfg == nil {
return nil, errors.New("cfg cannot be nil")
}
dbCfg := &DatabaseConfig{}
if err := dbCfg.readConfigSection(cfg, "database_replica"); err != nil {
// if only one replica is configured in the database_replicas section, use it as the default
defaultReplCfg := &DatabaseConfig{}
if err := defaultReplCfg.readConfigSection(cfg, "database_replicas"); err != nil {
return nil, err
}
if err := dbCfg.buildConnectionString(cfg, features); err != nil {
err := defaultReplCfg.buildConnectionString(cfg, features)
if err != nil {
return nil, err
}
ret := []*DatabaseConfig{defaultReplCfg}
return dbCfg, nil
// Check for additional replicas as children of the database_replicas section (e.g. database_replicas.one, database_replicas.cheetara)
repls := cfg.Raw.Section("database_replicas")
if len(repls.ChildSections()) > 0 {
for _, sec := range repls.ChildSections() {
replCfg := &DatabaseConfig{}
if err := replCfg.parseConfigIni(sec); err != nil {
return nil, err
}
if err := replCfg.buildConnectionString(cfg, features); err != nil {
return nil, err
}
ret = append(ret, replCfg)
}
}
return ret, nil
}
// ProvideServiceWithReadReplicaForTests wraps the SQLStore in a ReplStore, with the main sqlstore as both the primary and read replica.
// TODO: eventually this should be replaced with a more robust test setup which in
func ProvideServiceWithReadReplicaForTests(testDB *SQLStore, t sqlutil.ITestDB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator) (*ReplStore, error) {
return &ReplStore{testDB, testDB}, nil
return newReplStore(testDB, testDB), nil
}
// InitTestReplDB initializes a test DB and returns it wrapped in a ReplStore with the main SQLStore as both the primary and read replica.
@ -190,7 +225,7 @@ func InitTestReplDB(t sqlutil.ITestDB, opts ...InitTestDBOpt) (*ReplStore, *sett
if err != nil {
t.Fatalf("failed to initialize sql repl store: %s", err)
}
return &ReplStore{ss, ss}, cfg
return newReplStore(ss, ss), cfg
}
// InitTestReplDBWithMigration initializes the test DB given custom migrations.
@ -202,5 +237,16 @@ func InitTestReplDBWithMigration(t sqlutil.ITestDB, migration registry.DatabaseM
if err != nil {
t.Fatalf("failed to initialize sql store: %s", err)
}
return &ReplStore{ss, ss}
return newReplStore(ss, ss)
}
// newReplStore is a wrapper function that returns a ReplStore with the given primary and read replicas.
func newReplStore(primary *SQLStore, readReplicas ...*SQLStore) *ReplStore {
ret := &ReplStore{
SQLStore: primary,
repls: make([]*SQLStore, len(readReplicas)),
next: 0,
}
ret.repls = readReplicas
return ret
}

View File

@ -0,0 +1,64 @@
package sqlstore
import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"gopkg.in/ini.v1"
"github.com/grafana/grafana/pkg/setting"
)
func TestReplStore_ReadReplica(t *testing.T) {
// Using the connection strings to differentiate between the replicas
replStore, _ := InitTestReplDB(t)
replStore.repls[0].dbCfg.ConnectionString = "repl0"
repl1 := &SQLStore{dbCfg: &DatabaseConfig{ConnectionString: "repl1"}}
repl2 := &SQLStore{dbCfg: &DatabaseConfig{ConnectionString: "repl2"}}
replStore.repls = append(replStore.repls, repl1, repl2)
got := make([]string, 5)
for i := 0; i < 5; i++ {
got[i] = replStore.ReadReplica().dbCfg.ConnectionString
}
want := []string{"repl0", "repl1", "repl2", "repl0", "repl1"}
if cmp.Equal(got, want) == false {
t.Fatal("wrong result. Got:", got, "Want:", want)
}
}
func TestNewRODatabaseConfig(t *testing.T) {
inicfg, err := ini.Load([]byte(replCfg))
require.NoError(t, err)
cfg, err := setting.NewCfgFromINIFile(inicfg)
require.NoError(t, err)
dbCfgs, err := NewRODatabaseConfigs(cfg, nil)
require.NoError(t, err)
var connStr = func(port int) string {
return fmt.Sprintf("grafana:password@tcp(127.0.0.1:%d)/grafana?collation=utf8mb4_unicode_ci&allowNativePasswords=true&clientFoundRows=true", port)
}
for i, c := range dbCfgs {
if !cmp.Equal(c.ConnectionString, connStr(i+3306)) {
t.Errorf("wrong result for connection string %d.\nGot: %s,\nWant: %s", i, c.ConnectionString, connStr(i+3306))
}
}
}
var replCfg = `
[database_replicas]
type = mysql
name = grafana
user = grafana
password = password
host = 127.0.0.1:3306
[database_replicas.one] =
host = 127.0.0.1:3307
[database_replicas.two] =
host = 127.0.0.1:3308`