package sqlstore import ( "errors" "fmt" "regexp" "sync/atomic" "time" "github.com/dlmiddlecote/sqlstats" "github.com/prometheus/client_golang/prometheus" "xorm.io/xorm" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/sqlstore/migrations" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil" "github.com/grafana/grafana/pkg/setting" ) // ReplStore is a wrapper around a main SQLStore and a read-only SQLStore. The // main SQLStore is anonymous, so the ReplStore may be used directly as a // SQLStore. type ReplStore struct { *SQLStore repls []*SQLStore // next is the index of the next read-only SQLStore in the chain. next uint64 } // DB returns the main SQLStore. func (rs *ReplStore) DB() *SQLStore { return rs.SQLStore } // ReadReplica returns the read-only SQLStore. If no read replica is configured, // it returns the main SQLStore. func (rs *ReplStore) ReadReplica() *SQLStore { if len(rs.repls) == 0 { rs.log.Debug("ReadReplica not configured, using main SQLStore") return rs.SQLStore } return rs.nextRepl() } // nextRepl() returns the next read-only SQLStore in the chain. If no read replica is configured, the Primary is returned. func (rs *ReplStore) nextRepl() *SQLStore { // start by grabbing the replica at the current index selected := rs.repls[(int(rs.next))%len(rs.repls)] // then increment the index for the next call atomic.AddUint64(&rs.next, 1) return selected } // ProvideServiceWithReadReplica creates a new *SQLStore connection intended for // use as a ReadReplica of the main SQLStore. The primary SQLStore must already // be initialized. func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg, features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator, bus bus.Bus, tracer tracing.Tracer) (*ReplStore, error) { // start with the initialized SQLStore replStore := &ReplStore{primary, nil, 0} // FeatureToggle fallback: If the FlagDatabaseReadReplica feature flag is not enabled, return a single SQLStore. if !features.IsEnabledGlobally(featuremgmt.FlagDatabaseReadReplica) { primary.log.Debug("ReadReplica feature flag not enabled, using main SQLStore") return replStore, nil } // This change will make xorm use an empty default schema for postgres and // by that mimic the functionality of how it was functioning before // xorm's changes above. xorm.DefaultPostgresSchema = "" // Parsing the configuration to get the number of repls replCfgs, err := NewRODatabaseConfigs(cfg, features) if err != nil { return nil, err } if err := validateReplicaConfigs(primary.dbCfg, replCfgs); err != nil { return nil, fmt.Errorf("failed to validate replica configurations: %w", err) } if len(replCfgs) > 0 { replStore.repls = make([]*SQLStore, len(replCfgs)) } for i, replCfg := range replCfgs { // If the database_instrument_queries feature is enabled, wrap the driver with hooks. if cfg.DatabaseInstrumentQueries { replCfg.Type = WrapDatabaseReplDriverWithHooks(replCfg.Type, uint(i), tracer) } s, err := newReadOnlySQLStore(cfg, &replCfg, features, bus, tracer) if err != nil { return nil, err } // initialize and register metrics wrapper around the *sql.DB db := s.engine.DB().DB // register the go_sql_stats_connections_* metrics if err := prometheus.Register(sqlstats.NewStatsCollector("grafana_repl", db)); err != nil { s.log.Warn("Failed to register sqlstore stats collector", "error", err) } replStore.repls[i] = s } return replStore, nil } // newReadOnlySQLStore creates a new *SQLStore intended for use with a // fully-populated read replica of the main Grafana Database. It provides no // write capabilities and does not run migrations, but other tracing and logging // features are enabled. func newReadOnlySQLStore(cfg *setting.Cfg, dbCfg *DatabaseConfig, features featuremgmt.FeatureToggles, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) { s := &SQLStore{ log: log.New("replstore"), bus: bus, tracer: tracer, features: features, dbCfg: dbCfg, cfg: cfg, } err := s.initReadOnlyEngine(s.engine) if err != nil { return nil, err } // When there are multiple read replicas, we append an index to the driver name (ex: mysqlWithHooks11). // Remove the index from the end of the driver name to get the original driver name that xorm and other libraries recognize. driverName := digitsRegexp.ReplaceAllString(s.engine.DriverName(), "") s.dialect = migrator.NewDialect(driverName) return s, nil } // digitsRegexp is used to remove the index from the end of the driver name. var digitsRegexp = regexp.MustCompile("[0-9]+") // initReadOnlyEngine initializes ss.engine for read-only operations. The database must be a fully-populated read replica. func (ss *SQLStore) initReadOnlyEngine(engine *xorm.Engine) error { if ss.engine != nil { ss.log.Debug("Already connected to database replica") return nil } if engine == nil { var err error engine, err = xorm.NewEngine(ss.dbCfg.Type, ss.dbCfg.ConnectionString) if err != nil { ss.log.Error("failed to connect to database replica", "error", err) return err } // Only for MySQL or MariaDB, verify we can connect with the current connection string's system var for transaction isolation. // If not, create a new engine with a compatible connection string. if ss.dbCfg.Type == migrator.MySQL { engine, err = ss.ensureTransactionIsolationCompatibility(engine, ss.dbCfg.ConnectionString) if err != nil { return err } } } engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn) engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn) engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime)) // configure sql logging debugSQL := ss.cfg.Raw.Section("database_replica").Key("log_queries").MustBool(false) if !debugSQL { engine.SetLogger(&xorm.DiscardLogger{}) } else { // add stack to database calls to be able to see what repository initiated queries. Top 7 items from the stack as they are likely in the xorm library. engine.SetLogger(NewXormLogger(log.LvlInfo, log.WithSuffix(log.New("replsstore.xorm"), log.CallerContextKey, log.StackCaller(log.DefaultCallerDepth)))) engine.ShowSQL(true) engine.ShowExecTime(true) } ss.engine = engine return nil } // NewRODatabaseConfig creates a new read-only database configuration. func NewRODatabaseConfigs(cfg *setting.Cfg, features featuremgmt.FeatureToggles) ([]DatabaseConfig, error) { if cfg == nil { return nil, errors.New("cfg cannot be nil") } // If one replica is configured in the database_replicas section, use it as the default defaultReplCfg := DatabaseConfig{} if err := defaultReplCfg.readConfigSection(cfg, "database_replicas"); err != nil { return nil, err } err := defaultReplCfg.buildConnectionString(cfg, features) if err != nil { return nil, err } ret := []DatabaseConfig{defaultReplCfg} // Check for individual replicas in the database_replica section (e.g. database_replica.one, database_replica.cheetara) repls := cfg.Raw.Section("database_replica") if len(repls.ChildSections()) > 0 { for _, sec := range repls.ChildSections() { replCfg := DatabaseConfig{} if err := replCfg.parseConfigIni(sec); err != nil { return nil, err } if err := replCfg.buildConnectionString(cfg, features); err != nil { return nil, err } ret = append(ret, replCfg) } } return ret, nil } // ProvideServiceWithReadReplicaForTests wraps the SQLStore in a ReplStore, with the main sqlstore as both the primary and read replica. // TODO: eventually this should be replaced with a more robust test setup which in func ProvideServiceWithReadReplicaForTests(testDB *SQLStore, t sqlutil.ITestDB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator) (*ReplStore, error) { return newReplStore(testDB, testDB), nil } // InitTestReplDB initializes a test DB and returns it wrapped in a ReplStore with the main SQLStore as both the primary and read replica. func InitTestReplDB(t sqlutil.ITestDB, opts ...InitTestDBOpt) (*ReplStore, *setting.Cfg) { t.Helper() features := getFeaturesForTesting(opts...) cfg := getCfgForTesting(opts...) ss, err := initTestDB(t, cfg, features, migrations.ProvideOSSMigrations(features), opts...) if err != nil { t.Fatalf("failed to initialize sql repl store: %s", err) } return newReplStore(ss, ss), cfg } // InitTestReplDBWithMigration initializes the test DB given custom migrations. func InitTestReplDBWithMigration(t sqlutil.ITestDB, migration registry.DatabaseMigrator, opts ...InitTestDBOpt) *ReplStore { t.Helper() features := getFeaturesForTesting(opts...) cfg := getCfgForTesting(opts...) ss, err := initTestDB(t, cfg, features, migration, opts...) if err != nil { t.Fatalf("failed to initialize sql store: %s", err) } return newReplStore(ss, ss) } // newReplStore is a wrapper function that returns a ReplStore with the given primary and read replicas. func newReplStore(primary *SQLStore, readReplicas ...*SQLStore) *ReplStore { ret := &ReplStore{ SQLStore: primary, repls: make([]*SQLStore, len(readReplicas)), next: 0, } ret.repls = readReplicas return ret } // FakeReplStoreFromStore returns a ReplStore with the given primary // SQLStore and no read replicas. This is a bare-minimum wrapper for testing, // and should be removed when all services are using ReplStore in favor of // InitTestReplDB. func FakeReplStoreFromStore(primary *SQLStore) *ReplStore { return &ReplStore{ SQLStore: primary, next: 0, } }