MM-50427: Make MM survive DB replica outage (#22888)

We monitor the health of DB replicas, and on a fatal error,
take them out of the pool.

On a separate goroutine, we keep pinging the unhealthy replicas,
and on getting a good response back, we add them back to the pool.

https://mattermost.atlassian.net/browse/MM-50427

```release-note
Mattermost is now resilient against DB replica outages and will
dynamically choose a replica if it's alive.

Also added a config parameter, ReplicaMonitorIntervalSeconds,
whose default value is 5. It controls how often (in seconds)
unhealthy replicas are pinged to check whether they are back online.
```

Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
Agniva De Sarker 2023-04-19 17:03:18 +05:30 committed by yasserfaraazkhan
parent bab2620b59
commit 7216dcf367
13 changed files with 245 additions and 121 deletions

View File

@ -70,7 +70,10 @@ func (s *SQLStore) getMigrationConnection() (*sql.DB, error) {
}
*settings.DriverName = s.dbType
db := sqlstore.SetupConnection("master", connectionString, &settings)
db, err := sqlstore.SetupConnection("master", connectionString, &settings, sqlstore.DBPingAttempts)
if err != nil {
return nil, err
}
return db, nil
}

View File

@ -13,6 +13,7 @@ import (
type MetricsInterface interface {
Register()
RegisterDBCollector(db *sql.DB, name string)
UnregisterDBCollector(db *sql.DB, name string)
IncrementPostCreate()
IncrementWebhookPost()

View File

@ -319,6 +319,11 @@ func (_m *MetricsInterface) SetReplicaLagTime(node string, value float64) {
_m.Called(node, value)
}
// UnregisterDBCollector provides a mock function with given fields: db, name
func (_m *MetricsInterface) UnregisterDBCollector(db *sql.DB, name string) {
_m.Called(db, name)
}
type mockConstructorTestingTNewMetricsInterface interface {
mock.TestingT
Cleanup(func())

View File

@ -6,9 +6,12 @@ package sqlstore
import (
"context"
"database/sql"
"errors"
"net"
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"
@ -66,14 +69,18 @@ type sqlxDBWrapper struct {
*sqlx.DB
queryTimeout time.Duration
trace bool
isOnline *atomic.Bool
}
func newSqlxDBWrapper(db *sqlx.DB, timeout time.Duration, trace bool) *sqlxDBWrapper {
return &sqlxDBWrapper{
w := &sqlxDBWrapper{
DB: db,
queryTimeout: timeout,
trace: trace,
isOnline: &atomic.Bool{},
}
w.isOnline.Store(true)
return w
}
func (w *sqlxDBWrapper) Stats() sql.DBStats {
@ -83,19 +90,19 @@ func (w *sqlxDBWrapper) Stats() sql.DBStats {
func (w *sqlxDBWrapper) Beginx() (*sqlxTxWrapper, error) {
tx, err := w.DB.Beginx()
if err != nil {
return nil, err
return nil, w.checkErr(err)
}
return newSqlxTxWrapper(tx, w.queryTimeout, w.trace), nil
return newSqlxTxWrapper(tx, w.queryTimeout, w.trace, w), nil
}
func (w *sqlxDBWrapper) BeginXWithIsolation(opts *sql.TxOptions) (*sqlxTxWrapper, error) {
tx, err := w.DB.BeginTxx(context.Background(), opts)
if err != nil {
return nil, err
return nil, w.checkErr(err)
}
return newSqlxTxWrapper(tx, w.queryTimeout, w.trace), nil
return newSqlxTxWrapper(tx, w.queryTimeout, w.trace, w), nil
}
func (w *sqlxDBWrapper) Get(dest any, query string, args ...any) error {
@ -109,7 +116,7 @@ func (w *sqlxDBWrapper) Get(dest any, query string, args ...any) error {
}(time.Now())
}
return w.DB.GetContext(ctx, dest, query, args...)
return w.checkErr(w.DB.GetContext(ctx, dest, query, args...))
}
func (w *sqlxDBWrapper) GetBuilder(dest any, builder Builder) error {
@ -134,7 +141,7 @@ func (w *sqlxDBWrapper) NamedExec(query string, arg any) (sql.Result, error) {
}(time.Now())
}
return w.DB.NamedExecContext(ctx, query, arg)
return w.checkErrWithResult(w.DB.NamedExecContext(ctx, query, arg))
}
func (w *sqlxDBWrapper) Exec(query string, args ...any) (sql.Result, error) {
@ -161,7 +168,7 @@ func (w *sqlxDBWrapper) ExecNoTimeout(query string, args ...any) (sql.Result, er
}(time.Now())
}
return w.DB.ExecContext(context.Background(), query, args...)
return w.checkErrWithResult(w.DB.ExecContext(context.Background(), query, args...))
}
// ExecRaw is like Exec but without any rebinding of params. You need to pass
@ -176,7 +183,7 @@ func (w *sqlxDBWrapper) ExecRaw(query string, args ...any) (sql.Result, error) {
}(time.Now())
}
return w.DB.ExecContext(ctx, query, args...)
return w.checkErrWithResult(w.DB.ExecContext(ctx, query, args...))
}
func (w *sqlxDBWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) {
@ -192,7 +199,7 @@ func (w *sqlxDBWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) {
}(time.Now())
}
return w.DB.NamedQueryContext(ctx, query, arg)
return w.checkErrWithRows(w.DB.NamedQueryContext(ctx, query, arg))
}
func (w *sqlxDBWrapper) QueryRowX(query string, args ...any) *sqlx.Row {
@ -220,7 +227,7 @@ func (w *sqlxDBWrapper) QueryX(query string, args ...any) (*sqlx.Rows, error) {
}(time.Now())
}
return w.DB.QueryxContext(ctx, query, args)
return w.checkErrWithRows(w.DB.QueryxContext(ctx, query, args))
}
func (w *sqlxDBWrapper) Select(dest any, query string, args ...any) error {
@ -238,7 +245,7 @@ func (w *sqlxDBWrapper) SelectCtx(ctx context.Context, dest any, query string, a
}(time.Now())
}
return w.DB.SelectContext(ctx, dest, query, args...)
return w.checkErr(w.DB.SelectContext(ctx, dest, query, args...))
}
func (w *sqlxDBWrapper) SelectBuilder(dest any, builder Builder) error {
@ -254,13 +261,15 @@ type sqlxTxWrapper struct {
*sqlx.Tx
queryTimeout time.Duration
trace bool
dbw *sqlxDBWrapper
}
func newSqlxTxWrapper(tx *sqlx.Tx, timeout time.Duration, trace bool) *sqlxTxWrapper {
func newSqlxTxWrapper(tx *sqlx.Tx, timeout time.Duration, trace bool, dbw *sqlxDBWrapper) *sqlxTxWrapper {
return &sqlxTxWrapper{
Tx: tx,
queryTimeout: timeout,
trace: trace,
dbw: dbw,
}
}
@ -275,7 +284,7 @@ func (w *sqlxTxWrapper) Get(dest any, query string, args ...any) error {
}(time.Now())
}
return w.Tx.GetContext(ctx, dest, query, args...)
return w.dbw.checkErr(w.Tx.GetContext(ctx, dest, query, args...))
}
func (w *sqlxTxWrapper) GetBuilder(dest any, builder Builder) error {
@ -284,13 +293,13 @@ func (w *sqlxTxWrapper) GetBuilder(dest any, builder Builder) error {
return err
}
return w.Get(dest, query, args...)
return w.dbw.checkErr(w.Get(dest, query, args...))
}
func (w *sqlxTxWrapper) Exec(query string, args ...any) (sql.Result, error) {
query = w.Tx.Rebind(query)
return w.ExecRaw(query, args...)
return w.dbw.checkErrWithResult(w.ExecRaw(query, args...))
}
func (w *sqlxTxWrapper) ExecNoTimeout(query string, args ...any) (sql.Result, error) {
@ -302,7 +311,7 @@ func (w *sqlxTxWrapper) ExecNoTimeout(query string, args ...any) (sql.Result, er
}(time.Now())
}
return w.Tx.ExecContext(context.Background(), query, args...)
return w.dbw.checkErrWithResult(w.Tx.ExecContext(context.Background(), query, args...))
}
func (w *sqlxTxWrapper) ExecBuilder(builder Builder) (sql.Result, error) {
@ -326,7 +335,7 @@ func (w *sqlxTxWrapper) ExecRaw(query string, args ...any) (sql.Result, error) {
}(time.Now())
}
return w.Tx.ExecContext(ctx, query, args...)
return w.dbw.checkErrWithResult(w.Tx.ExecContext(ctx, query, args...))
}
func (w *sqlxTxWrapper) NamedExec(query string, arg any) (sql.Result, error) {
@ -342,7 +351,7 @@ func (w *sqlxTxWrapper) NamedExec(query string, arg any) (sql.Result, error) {
}(time.Now())
}
return w.Tx.NamedExecContext(ctx, query, arg)
return w.dbw.checkErrWithResult(w.Tx.NamedExecContext(ctx, query, arg))
}
func (w *sqlxTxWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) {
@ -386,7 +395,7 @@ func (w *sqlxTxWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) {
}
}
return res.rows, res.err
return res.rows, w.dbw.checkErr(res.err)
}
func (w *sqlxTxWrapper) QueryRowX(query string, args ...any) *sqlx.Row {
@ -414,7 +423,7 @@ func (w *sqlxTxWrapper) QueryX(query string, args ...any) (*sqlx.Rows, error) {
}(time.Now())
}
return w.Tx.QueryxContext(ctx, query, args)
return w.dbw.checkErrWithRows(w.Tx.QueryxContext(ctx, query, args))
}
func (w *sqlxTxWrapper) Select(dest any, query string, args ...any) error {
@ -428,7 +437,7 @@ func (w *sqlxTxWrapper) Select(dest any, query string, args ...any) error {
}(time.Now())
}
return w.Tx.SelectContext(ctx, dest, query, args...)
return w.dbw.checkErr(w.Tx.SelectContext(ctx, dest, query, args...))
}
func (w *sqlxTxWrapper) SelectBuilder(dest any, builder Builder) error {
@ -459,3 +468,23 @@ func printArgs(query string, dur time.Duration, args ...any) {
}
mlog.Debug(query, fields...)
}
// checkErrWithResult adapts checkErr to calls that return (sql.Result, error),
// so such a call can be wrapped directly in a return statement. The result is
// passed through untouched; only the error is inspected.
func (w *sqlxDBWrapper) checkErrWithResult(res sql.Result, err error) (sql.Result, error) {
	inspected := w.checkErr(err)
	return res, inspected
}
// checkErrWithRows adapts checkErr to calls that return (*sqlx.Rows, error),
// so such a call can be wrapped directly in a return statement. The rows are
// passed through untouched; only the error is inspected.
func (w *sqlxDBWrapper) checkErrWithRows(res *sqlx.Rows, err error) (*sqlx.Rows, error) {
	inspected := w.checkErr(err)
	return res, inspected
}
// checkErr inspects err and flags this connection as offline when the error
// chain contains a *net.OpError that is neither temporary nor a timeout.
// The error is always handed back unchanged, so callers can wrap any
// error-returning call with it.
func (w *sqlxDBWrapper) checkErr(err error) error {
	var opErr *net.OpError
	if !errors.As(err, &opErr) {
		// Not a network-level failure (or no error at all): nothing to record.
		return err
	}
	if opErr.Temporary() || opErr.Timeout() {
		// Transient conditions do not take the connection out of rotation.
		return err
	}
	w.isOnline.Store(false)
	return err
}
// Online reports whether this connection is currently considered usable.
// A wrapper built by newSqlxDBWrapper starts online and is flipped offline by
// checkErr after a fatal network error.
//
// A nil wrapper, or one whose flag was never allocated, is reported as
// offline instead of panicking, so callers that load a wrapper from an
// atomic pointer can call Online without a separate nil check.
func (w *sqlxDBWrapper) Online() bool {
	if w == nil || w.isOnline == nil {
		return false
	}
	return w.isOnline.Load()
}

View File

@ -6,6 +6,7 @@ package sqlstore
import (
"context"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -28,12 +29,14 @@ func TestSqlX(t *testing.T) {
}
*settings.QueryTimeout = 1
store := &SqlStore{
rrCounter: 0,
srCounter: 0,
settings: settings,
rrCounter: 0,
srCounter: 0,
settings: settings,
quitMonitor: make(chan struct{}),
wgMonitor: &sync.WaitGroup{},
}
store.initConnection()
require.NoError(t, store.initConnection())
defer store.Close()

View File

@ -49,7 +49,7 @@ const (
MySQLForeignKeyViolationErrorCode = 1452
PGDuplicateObjectErrorCode = "42710"
MySQLDuplicateObjectErrorCode = 1022
DBPingAttempts = 18
DBPingAttempts = 5
DBPingTimeoutSecs = 10
// This is a numerical version string by postgres. The format is
// 2 characters for major, minor, and patch version prior to 10.
@ -123,9 +123,9 @@ type SqlStore struct {
masterX *sqlxDBWrapper
ReplicaXs []*sqlxDBWrapper
ReplicaXs []*atomic.Pointer[sqlxDBWrapper]
searchReplicaXs []*sqlxDBWrapper
searchReplicaXs []*atomic.Pointer[sqlxDBWrapper]
replicaLagHandles []*dbsql.DB
stores SqlStoreStores
@ -138,17 +138,28 @@ type SqlStore struct {
isBinaryParam bool
pgDefaultTextSearchConfig string
quitMonitor chan struct{}
wgMonitor *sync.WaitGroup
}
func New(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlStore {
store := &SqlStore{
rrCounter: 0,
srCounter: 0,
settings: &settings,
metrics: metrics,
rrCounter: 0,
srCounter: 0,
settings: &settings,
metrics: metrics,
quitMonitor: make(chan struct{}),
wgMonitor: &sync.WaitGroup{},
}
store.initConnection()
err := store.initConnection()
if err != nil {
mlog.Fatal("Error setting up connections", mlog.Err(err))
}
store.wgMonitor.Add(1)
go store.monitorReplicas()
ver, err := store.GetDbVersion(true)
if err != nil {
@ -230,29 +241,28 @@ func New(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlS
// SetupConnection sets up the connection to the database and pings it to make sure it's alive.
// It also applies any database configuration settings that are required.
func SetupConnection(connType string, dataSource string, settings *model.SqlSettings) *dbsql.DB {
func SetupConnection(connType string, dataSource string, settings *model.SqlSettings, attempts int) (*dbsql.DB, error) {
db, err := dbsql.Open(*settings.DriverName, dataSource)
if err != nil {
mlog.Fatal("Failed to open SQL connection to err.", mlog.Err(err))
return nil, errors.Wrap(err, "failed to open SQL connection")
}
for i := 0; i < DBPingAttempts; i++ {
for i := 0; i < attempts; i++ {
// At this point, we have passed sql.Open, so we deliberately ignore any errors.
sanitized, _ := SanitizeDataSource(*settings.DriverName, dataSource)
mlog.Info("Pinging SQL", mlog.String("database", connType), mlog.String("dataSource", sanitized))
ctx, cancel := context.WithTimeout(context.Background(), DBPingTimeoutSecs*time.Second)
defer cancel()
err = db.PingContext(ctx)
if err == nil {
break
} else {
if i == DBPingAttempts-1 {
mlog.Fatal("Failed to ping DB, server will exit.", mlog.Err(err))
} else {
mlog.Error("Failed to ping DB", mlog.Err(err), mlog.Int("retrying in seconds", DBPingTimeoutSecs))
time.Sleep(DBPingTimeoutSecs * time.Second)
if err != nil {
if i == attempts-1 {
return nil, err
}
mlog.Error("Failed to ping DB", mlog.Err(err), mlog.Int("retrying in seconds", DBPingTimeoutSecs))
time.Sleep(DBPingTimeoutSecs * time.Second)
continue
}
break
}
if strings.HasPrefix(connType, replicaLagPrefix) {
@ -272,7 +282,7 @@ func SetupConnection(connType string, dataSource string, settings *model.SqlSett
db.SetConnMaxLifetime(time.Duration(*settings.ConnMaxLifetimeMilliseconds) * time.Millisecond)
db.SetConnMaxIdleTime(time.Duration(*settings.ConnMaxIdleTimeMilliseconds) * time.Millisecond)
return db
return db, nil
}
func (ss *SqlStore) SetContext(context context.Context) {
@ -285,7 +295,7 @@ func (ss *SqlStore) Context() context.Context {
func noOpMapper(s string) string { return s }
func (ss *SqlStore) initConnection() {
func (ss *SqlStore) initConnection() error {
dataSource := *ss.settings.DataSource
if ss.DriverName() == model.DatabaseDriverMysql {
// TODO: We ignore the readTimeout datasource parameter for MySQL since QueryTimeout
@ -294,11 +304,14 @@ func (ss *SqlStore) initConnection() {
var err error
dataSource, err = ResetReadTimeout(dataSource)
if err != nil {
mlog.Fatal("Failed to reset read timeout from datasource.", mlog.Err(err), mlog.String("src", dataSource))
return errors.Wrap(err, "failed to reset read timeout from datasource")
}
}
handle := SetupConnection("master", dataSource, ss.settings)
handle, err := SetupConnection("master", dataSource, ss.settings, DBPingAttempts)
if err != nil {
return err
}
ss.masterX = newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()),
time.Duration(*ss.settings.QueryTimeout)*time.Second,
*ss.settings.Trace)
@ -310,34 +323,32 @@ func (ss *SqlStore) initConnection() {
}
if len(ss.settings.DataSourceReplicas) > 0 {
ss.ReplicaXs = make([]*sqlxDBWrapper, len(ss.settings.DataSourceReplicas))
ss.ReplicaXs = make([]*atomic.Pointer[sqlxDBWrapper], len(ss.settings.DataSourceReplicas))
for i, replica := range ss.settings.DataSourceReplicas {
handle := SetupConnection(fmt.Sprintf("replica-%v", i), replica, ss.settings)
ss.ReplicaXs[i] = newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()),
time.Duration(*ss.settings.QueryTimeout)*time.Second,
*ss.settings.Trace)
if ss.DriverName() == model.DatabaseDriverMysql {
ss.ReplicaXs[i].MapperFunc(noOpMapper)
}
if ss.metrics != nil {
ss.metrics.RegisterDBCollector(ss.ReplicaXs[i].DB.DB, "replica-"+strconv.Itoa(i))
ss.ReplicaXs[i] = &atomic.Pointer[sqlxDBWrapper]{}
handle, err = SetupConnection(fmt.Sprintf("replica-%v", i), replica, ss.settings, DBPingAttempts)
if err != nil {
// Initializing to be offline
ss.ReplicaXs[i].Store(&sqlxDBWrapper{isOnline: &atomic.Bool{}})
mlog.Warn("Failed to setup connection. Skipping..", mlog.String("db", fmt.Sprintf("replica-%v", i)), mlog.Err(err))
continue
}
ss.setDB(ss.ReplicaXs[i], handle, "replica-"+strconv.Itoa(i))
}
}
if len(ss.settings.DataSourceSearchReplicas) > 0 {
ss.searchReplicaXs = make([]*sqlxDBWrapper, len(ss.settings.DataSourceSearchReplicas))
ss.searchReplicaXs = make([]*atomic.Pointer[sqlxDBWrapper], len(ss.settings.DataSourceSearchReplicas))
for i, replica := range ss.settings.DataSourceSearchReplicas {
handle := SetupConnection(fmt.Sprintf("search-replica-%v", i), replica, ss.settings)
ss.searchReplicaXs[i] = newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()),
time.Duration(*ss.settings.QueryTimeout)*time.Second,
*ss.settings.Trace)
if ss.DriverName() == model.DatabaseDriverMysql {
ss.searchReplicaXs[i].MapperFunc(noOpMapper)
}
if ss.metrics != nil {
ss.metrics.RegisterDBCollector(ss.searchReplicaXs[i].DB.DB, "searchreplica-"+strconv.Itoa(i))
ss.searchReplicaXs[i] = &atomic.Pointer[sqlxDBWrapper]{}
handle, err = SetupConnection(fmt.Sprintf("search-replica-%v", i), replica, ss.settings, DBPingAttempts)
if err != nil {
// Initializing to be offline
ss.searchReplicaXs[i].Store(&sqlxDBWrapper{isOnline: &atomic.Bool{}})
mlog.Warn("Failed to setup connection. Skipping..", mlog.String("db", fmt.Sprintf("search-replica-%v", i)), mlog.Err(err))
continue
}
ss.setDB(ss.searchReplicaXs[i], handle, "searchreplica-"+strconv.Itoa(i))
}
}
@ -347,9 +358,14 @@ func (ss *SqlStore) initConnection() {
if src.DataSource == nil {
continue
}
ss.replicaLagHandles[i] = SetupConnection(fmt.Sprintf(replicaLagPrefix+"-%d", i), *src.DataSource, ss.settings)
ss.replicaLagHandles[i], err = SetupConnection(fmt.Sprintf(replicaLagPrefix+"-%d", i), *src.DataSource, ss.settings, DBPingAttempts)
if err != nil {
mlog.Warn("Failed to setup replica lag handle. Skipping..", mlog.String("db", fmt.Sprintf(replicaLagPrefix+"-%d", i)), mlog.Err(err))
continue
}
}
}
return nil
}
func (ss *SqlStore) DriverName() string {
@ -455,8 +471,15 @@ func (ss *SqlStore) GetSearchReplicaX() *sqlxDBWrapper {
return ss.GetReplicaX()
}
rrNum := atomic.AddInt64(&ss.srCounter, 1) % int64(len(ss.searchReplicaXs))
return ss.searchReplicaXs[rrNum]
for i := 0; i < len(ss.searchReplicaXs); i++ {
rrNum := atomic.AddInt64(&ss.srCounter, 1) % int64(len(ss.searchReplicaXs))
if ss.searchReplicaXs[rrNum].Load().Online() {
return ss.searchReplicaXs[rrNum].Load()
}
}
// If all search replicas are down, then go with replica.
return ss.GetReplicaX()
}
func (ss *SqlStore) GetReplicaX() *sqlxDBWrapper {
@ -464,23 +487,64 @@ func (ss *SqlStore) GetReplicaX() *sqlxDBWrapper {
return ss.GetMasterX()
}
rrNum := atomic.AddInt64(&ss.rrCounter, 1) % int64(len(ss.ReplicaXs))
return ss.ReplicaXs[rrNum]
}
func (ss *SqlStore) GetInternalReplicaDBs() []*sql.DB {
if len(ss.settings.DataSourceReplicas) == 0 || ss.lockedToMaster || !ss.hasLicense() {
return []*sql.DB{
ss.GetMasterX().DB.DB,
for i := 0; i < len(ss.ReplicaXs); i++ {
rrNum := atomic.AddInt64(&ss.rrCounter, 1) % int64(len(ss.ReplicaXs))
if ss.ReplicaXs[rrNum].Load().Online() {
return ss.ReplicaXs[rrNum].Load()
}
}
dbs := make([]*sql.DB, len(ss.ReplicaXs))
for i, rx := range ss.ReplicaXs {
dbs[i] = rx.DB.DB
}
// If all replicas are down, then go with master.
return ss.GetMasterX()
}
return dbs
// monitorReplicas periodically tries to reconnect every replica and search
// replica that is currently offline, and swaps a fresh connection into the
// pool once a single ping attempt succeeds. It returns when quitMonitor is
// closed and signals wgMonitor on exit, which Close waits on.
func (ss *SqlStore) monitorReplicas() {
	// time.NewTicker panics on a non-positive duration, which would take the
	// whole process down from this goroutine; fall back to the default instead.
	const fallbackInterval = 5 * time.Second
	interval := time.Duration(*ss.settings.ReplicaMonitorIntervalSeconds) * time.Second
	if interval <= 0 {
		mlog.Warn("Invalid ReplicaMonitorIntervalSeconds. Using the default.",
			mlog.Int("configured", *ss.settings.ReplicaMonitorIntervalSeconds))
		interval = fallbackInterval
	}

	t := time.NewTicker(interval)
	defer func() {
		t.Stop()
		ss.wgMonitor.Done()
	}()

	// setupReplica reconnects a single offline replica. connType is the label
	// used for the connection and in logs; metricsName is the name the DB
	// collector is registered under. They differ for search replicas, so both
	// are passed to keep the metrics name identical to the one used by
	// initConnection.
	setupReplica := func(r *atomic.Pointer[sqlxDBWrapper], dsn, connType, metricsName string) {
		old := r.Load()
		if old != nil && old.Online() {
			return
		}

		handle, err := SetupConnection(connType, dsn, ss.settings, 1)
		if err != nil {
			mlog.Warn("Failed to setup connection. Skipping..", mlog.String("db", connType), mlog.Err(err))
			return
		}

		// Placeholder wrappers created for replicas that were down at startup
		// carry no DB handle, so there is nothing to unregister or close.
		hadHandle := old != nil && old.DB != nil
		if hadHandle && ss.metrics != nil {
			ss.metrics.UnregisterDBCollector(old.DB.DB, metricsName)
		}

		ss.setDB(r, handle, metricsName)

		// Release the replaced pool only after the new one is published, so
		// the old handle is not leaked on every outage/recovery cycle.
		if hadHandle {
			if cerr := old.Close(); cerr != nil {
				mlog.Warn("Failed to close stale connection.", mlog.String("db", connType), mlog.Err(cerr))
			}
		}
	}

	for {
		select {
		case <-ss.quitMonitor:
			return
		case <-t.C:
			for i, replica := range ss.ReplicaXs {
				name := "replica-" + strconv.Itoa(i)
				setupReplica(replica, ss.settings.DataSourceReplicas[i], name, name)
			}

			for i, replica := range ss.searchReplicaXs {
				idx := strconv.Itoa(i)
				setupReplica(replica, ss.settings.DataSourceSearchReplicas[i], "search-replica-"+idx, "searchreplica-"+idx)
			}
		}
	}
}
// setDB wraps handle in a new sqlxDBWrapper, registers it with the metrics
// collector under name (when metrics are enabled) and stores it in replica.
//
// The wrapper is fully configured before it is stored: it starts out online,
// so publishing it earlier would let concurrent readers of the pointer use a
// MySQL wrapper whose mapper function has not been applied yet.
func (ss *SqlStore) setDB(replica *atomic.Pointer[sqlxDBWrapper], handle *dbsql.DB, name string) {
	wrapper := newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()),
		time.Duration(*ss.settings.QueryTimeout)*time.Second,
		*ss.settings.Trace)
	if ss.DriverName() == model.DatabaseDriverMysql {
		wrapper.MapperFunc(noOpMapper)
	}
	if ss.metrics != nil {
		ss.metrics.RegisterDBCollector(wrapper.DB.DB, name)
	}
	replica.Store(wrapper)
}
func (ss *SqlStore) GetInternalReplicaDB() *sql.DB {
@ -489,7 +553,7 @@ func (ss *SqlStore) GetInternalReplicaDB() *sql.DB {
}
rrNum := atomic.AddInt64(&ss.rrCounter, 1) % int64(len(ss.ReplicaXs))
return ss.ReplicaXs[rrNum].DB.DB
return ss.ReplicaXs[rrNum].Load().DB.DB
}
func (ss *SqlStore) TotalMasterDbConnections() int {
@ -541,7 +605,10 @@ func (ss *SqlStore) TotalReadDbConnections() int {
count := 0
for _, db := range ss.ReplicaXs {
count = count + db.Stats().OpenConnections
if !db.Load().Online() {
continue
}
count = count + db.Load().Stats().OpenConnections
}
return count
@ -554,7 +621,10 @@ func (ss *SqlStore) TotalSearchDbConnections() int {
count := 0
for _, db := range ss.searchReplicaXs {
count = count + db.Stats().OpenConnections
if !db.Load().Online() {
continue
}
count = count + db.Load().Stats().OpenConnections
}
return count
@ -782,9 +852,14 @@ func IsUniqueConstraintError(err error, indexName []string) bool {
}
func (ss *SqlStore) GetAllConns() []*sqlxDBWrapper {
all := make([]*sqlxDBWrapper, len(ss.ReplicaXs)+1)
copy(all, ss.ReplicaXs)
all[len(ss.ReplicaXs)] = ss.masterX
all := make([]*sqlxDBWrapper, 0, len(ss.ReplicaXs)+1)
for i := range ss.ReplicaXs {
if !ss.ReplicaXs[i].Load().Online() {
continue
}
all = append(all, ss.ReplicaXs[i].Load())
}
all = append(all, ss.masterX)
return all
}
@ -807,11 +882,24 @@ func (ss *SqlStore) RecycleDBConnections(d time.Duration) {
func (ss *SqlStore) Close() {
ss.masterX.Close()
// Closing monitor and waiting for it to be done.
// This needs to be done before closing the replica handles.
close(ss.quitMonitor)
ss.wgMonitor.Wait()
for _, replica := range ss.ReplicaXs {
replica.Close()
if replica.Load().Online() {
replica.Load().Close()
}
}
for _, replica := range ss.searchReplicaXs {
if replica.Load().Online() {
replica.Load().Close()
}
}
for _, replica := range ss.replicaLagHandles {
replica.Close()
}
}
@ -1132,7 +1220,10 @@ func (ss *SqlStore) migrate(direction migrationDirection) error {
if err != nil {
return err
}
db := SetupConnection("master", dataSource, ss.settings)
db, err2 := SetupConnection("master", dataSource, ss.settings, DBPingAttempts)
if err2 != nil {
return err2
}
driver, err = ms.WithInstance(db)
defer db.Close()
case model.DatabaseDriverPostgres:

View File

@ -761,13 +761,15 @@ func TestReplicaLagQuery(t *testing.T) {
mockMetrics.On("RegisterDBCollector", mock.AnythingOfType("*sql.DB"), "master")
store := &SqlStore{
rrCounter: 0,
srCounter: 0,
settings: settings,
metrics: mockMetrics,
rrCounter: 0,
srCounter: 0,
settings: settings,
metrics: mockMetrics,
quitMonitor: make(chan struct{}),
wgMonitor: &sync.WaitGroup{},
}
store.initConnection()
require.NoError(t, store.initConnection())
store.stores.post = newSqlPostStore(store, mockMetrics)
err = store.migrate(migrationsDirectionUp)
require.NoError(t, err)
@ -839,9 +841,11 @@ func TestMySQLReadTimeout(t *testing.T) {
settings.DataSource = &dataSource
store := &SqlStore{
settings: settings,
settings: settings,
quitMonitor: make(chan struct{}),
wgMonitor: &sync.WaitGroup{},
}
store.initConnection()
require.NoError(t, store.initConnection())
defer store.Close()
_, err = store.GetMasterX().ExecNoTimeout(`SELECT SLEEP(3)`)

View File

@ -72,10 +72,7 @@ type Store interface {
// GetInternalMasterDB allows access to the raw master DB
// handle for the multi-product architecture.
GetInternalMasterDB() *sql.DB
// GetInternalReplicaDBs allows access to the raw replica DB
// handles for the multi-product architecture.
GetInternalReplicaDB() *sql.DB
GetInternalReplicaDBs() []*sql.DB
TotalMasterDbConnections() int
TotalReadDbConnections() int
TotalSearchDbConnections() int

View File

@ -346,22 +346,6 @@ func (_m *Store) GetInternalReplicaDB() *sql.DB {
return r0
}
// GetInternalReplicaDBs provides a mock function with given fields:
func (_m *Store) GetInternalReplicaDBs() []*sql.DB {
ret := _m.Called()
var r0 []*sql.DB
if rf, ok := ret.Get(0).(func() []*sql.DB); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*sql.DB)
}
}
return r0
}
// Group provides a mock function with given fields:
func (_m *Store) Group() store.GroupStore {
ret := _m.Called()

View File

@ -261,6 +261,7 @@ func MakeSqlSettings(driver string, withReplica bool) *model.SqlSettings {
}
log("Created temporary " + driver + " database " + dbName)
settings.ReplicaMonitorIntervalSeconds = model.NewInt(5)
return settings
}

View File

@ -331,7 +331,7 @@ func (h *MainHelper) SetReplicationLagForTesting(seconds int) error {
func (h *MainHelper) execOnEachReplica(query string, args ...any) error {
for _, replica := range h.SQLStore.ReplicaXs {
_, err := replica.Exec(query, args...)
_, err := replica.Load().Exec(query, args...)
if err != nil {
return err
}

View File

@ -1173,6 +1173,7 @@ type SqlSettings struct {
DisableDatabaseSearch *bool `access:"environment_database,write_restrictable,cloud_restrictable"`
MigrationsStatementTimeoutSeconds *int `access:"environment_database,write_restrictable,cloud_restrictable"`
ReplicaLagSettings []*ReplicaLagSettings `access:"environment_database,write_restrictable,cloud_restrictable"` // telemetry: none
ReplicaMonitorIntervalSeconds *int `access:"environment_database,write_restrictable,cloud_restrictable"`
}
func (s *SqlSettings) SetDefaults(isUpdate bool) {
@ -1237,6 +1238,10 @@ func (s *SqlSettings) SetDefaults(isUpdate bool) {
if s.ReplicaLagSettings == nil {
s.ReplicaLagSettings = []*ReplicaLagSettings{}
}
if s.ReplicaMonitorIntervalSeconds == nil {
s.ReplicaMonitorIntervalSeconds = NewInt(5)
}
}
type LogSettings struct {

View File

@ -522,6 +522,7 @@ func (ts *TelemetryService) trackConfig() {
"query_timeout": *cfg.SqlSettings.QueryTimeout,
"disable_database_search": *cfg.SqlSettings.DisableDatabaseSearch,
"migrations_statement_timeout_seconds": *cfg.SqlSettings.MigrationsStatementTimeoutSeconds,
"replica_monitor_interval_seconds": *cfg.SqlSettings.ReplicaMonitorIntervalSeconds,
})
ts.SendTelemetry(TrackConfigLog, map[string]any{