diff --git a/server/boards/services/store/sqlstore/migrate.go b/server/boards/services/store/sqlstore/migrate.go index 1c876a5168..63b4665490 100644 --- a/server/boards/services/store/sqlstore/migrate.go +++ b/server/boards/services/store/sqlstore/migrate.go @@ -70,7 +70,10 @@ func (s *SQLStore) getMigrationConnection() (*sql.DB, error) { } *settings.DriverName = s.dbType - db := sqlstore.SetupConnection("master", connectionString, &settings) + db, err := sqlstore.SetupConnection("master", connectionString, &settings, sqlstore.DBPingAttempts) + if err != nil { + return nil, err + } return db, nil } diff --git a/server/channels/einterfaces/metrics.go b/server/channels/einterfaces/metrics.go index 06f44f7b66..c44af2a3c5 100644 --- a/server/channels/einterfaces/metrics.go +++ b/server/channels/einterfaces/metrics.go @@ -13,6 +13,7 @@ import ( type MetricsInterface interface { Register() RegisterDBCollector(db *sql.DB, name string) + UnregisterDBCollector(db *sql.DB, name string) IncrementPostCreate() IncrementWebhookPost() diff --git a/server/channels/einterfaces/mocks/MetricsInterface.go b/server/channels/einterfaces/mocks/MetricsInterface.go index 0d6f799ee5..06f568546a 100644 --- a/server/channels/einterfaces/mocks/MetricsInterface.go +++ b/server/channels/einterfaces/mocks/MetricsInterface.go @@ -319,6 +319,11 @@ func (_m *MetricsInterface) SetReplicaLagTime(node string, value float64) { _m.Called(node, value) } +// UnregisterDBCollector provides a mock function with given fields: db, name +func (_m *MetricsInterface) UnregisterDBCollector(db *sql.DB, name string) { + _m.Called(db, name) +} + type mockConstructorTestingTNewMetricsInterface interface { mock.TestingT Cleanup(func()) diff --git a/server/channels/store/sqlstore/sqlx_wrapper.go b/server/channels/store/sqlstore/sqlx_wrapper.go index 0dab579512..e8d771cada 100644 --- a/server/channels/store/sqlstore/sqlx_wrapper.go +++ b/server/channels/store/sqlstore/sqlx_wrapper.go @@ -6,9 +6,12 @@ package sqlstore import ( "context" "database/sql" + "errors" + "net" "regexp" "strconv" "strings" + "sync/atomic" "time" "unicode" @@ -66,14 +69,18 @@ type sqlxDBWrapper struct { *sqlx.DB queryTimeout time.Duration trace bool + isOnline *atomic.Bool } func newSqlxDBWrapper(db *sqlx.DB, timeout time.Duration, trace bool) *sqlxDBWrapper { - return &sqlxDBWrapper{ + w := &sqlxDBWrapper{ DB: db, queryTimeout: timeout, trace: trace, + isOnline: &atomic.Bool{}, } + w.isOnline.Store(true) + return w } func (w *sqlxDBWrapper) Stats() sql.DBStats { @@ -83,19 +90,19 @@ func (w *sqlxDBWrapper) Stats() sql.DBStats { func (w *sqlxDBWrapper) Beginx() (*sqlxTxWrapper, error) { tx, err := w.DB.Beginx() if err != nil { - return nil, err + return nil, w.checkErr(err) } - return newSqlxTxWrapper(tx, w.queryTimeout, w.trace), nil + return newSqlxTxWrapper(tx, w.queryTimeout, w.trace, w), nil } func (w *sqlxDBWrapper) BeginXWithIsolation(opts *sql.TxOptions) (*sqlxTxWrapper, error) { tx, err := w.DB.BeginTxx(context.Background(), opts) if err != nil { - return nil, err + return nil, w.checkErr(err) } - return newSqlxTxWrapper(tx, w.queryTimeout, w.trace), nil + return newSqlxTxWrapper(tx, w.queryTimeout, w.trace, w), nil } func (w *sqlxDBWrapper) Get(dest any, query string, args ...any) error { @@ -109,7 +116,7 @@ func (w *sqlxDBWrapper) Get(dest any, query string, args ...any) error { }(time.Now()) } - return w.DB.GetContext(ctx, dest, query, args...) + return w.checkErr(w.DB.GetContext(ctx, dest, query, args...)) } func (w *sqlxDBWrapper) GetBuilder(dest any, builder Builder) error { @@ -134,7 +141,7 @@ func (w *sqlxDBWrapper) NamedExec(query string, arg any) (sql.Result, error) { }(time.Now()) } - return w.DB.NamedExecContext(ctx, query, arg) + return w.checkErrWithResult(w.DB.NamedExecContext(ctx, query, arg)) } func (w *sqlxDBWrapper) Exec(query string, args ...any) (sql.Result, error) { @@ -161,7 +168,7 @@ func (w *sqlxDBWrapper) ExecNoTimeout(query string, args ...any) (sql.Result, er }(time.Now()) } - return w.DB.ExecContext(context.Background(), query, args...) + return w.checkErrWithResult(w.DB.ExecContext(context.Background(), query, args...)) } // ExecRaw is like Exec but without any rebinding of params. You need to pass @@ -176,7 +183,7 @@ func (w *sqlxDBWrapper) ExecRaw(query string, args ...any) (sql.Result, error) { }(time.Now()) } - return w.DB.ExecContext(ctx, query, args...) + return w.checkErrWithResult(w.DB.ExecContext(ctx, query, args...)) } func (w *sqlxDBWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) { @@ -192,7 +199,7 @@ func (w *sqlxDBWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) { }(time.Now()) } - return w.DB.NamedQueryContext(ctx, query, arg) + return w.checkErrWithRows(w.DB.NamedQueryContext(ctx, query, arg)) } func (w *sqlxDBWrapper) QueryRowX(query string, args ...any) *sqlx.Row { @@ -220,7 +227,7 @@ func (w *sqlxDBWrapper) QueryX(query string, args ...any) (*sqlx.Rows, error) { }(time.Now()) } - return w.DB.QueryxContext(ctx, query, args) + return w.checkErrWithRows(w.DB.QueryxContext(ctx, query, args)) } func (w *sqlxDBWrapper) Select(dest any, query string, args ...any) error { @@ -238,7 +245,7 @@ func (w *sqlxDBWrapper) SelectCtx(ctx context.Context, dest any, query string, a }(time.Now()) } - return w.DB.SelectContext(ctx, dest, query, args...) + return w.checkErr(w.DB.SelectContext(ctx, dest, query, args...)) } func (w *sqlxDBWrapper) SelectBuilder(dest any, builder Builder) error { @@ -254,13 +261,15 @@ type sqlxTxWrapper struct { *sqlx.Tx queryTimeout time.Duration trace bool + dbw *sqlxDBWrapper } -func newSqlxTxWrapper(tx *sqlx.Tx, timeout time.Duration, trace bool) *sqlxTxWrapper { +func newSqlxTxWrapper(tx *sqlx.Tx, timeout time.Duration, trace bool, dbw *sqlxDBWrapper) *sqlxTxWrapper { return &sqlxTxWrapper{ Tx: tx, queryTimeout: timeout, trace: trace, + dbw: dbw, } } @@ -275,7 +284,7 @@ func (w *sqlxTxWrapper) Get(dest any, query string, args ...any) error { }(time.Now()) } - return w.Tx.GetContext(ctx, dest, query, args...) + return w.dbw.checkErr(w.Tx.GetContext(ctx, dest, query, args...)) } func (w *sqlxTxWrapper) GetBuilder(dest any, builder Builder) error { @@ -284,13 +293,13 @@ func (w *sqlxTxWrapper) GetBuilder(dest any, builder Builder) error { return err } - return w.Get(dest, query, args...) + return w.dbw.checkErr(w.Get(dest, query, args...)) } func (w *sqlxTxWrapper) Exec(query string, args ...any) (sql.Result, error) { query = w.Tx.Rebind(query) - return w.ExecRaw(query, args...) + return w.dbw.checkErrWithResult(w.ExecRaw(query, args...)) } func (w *sqlxTxWrapper) ExecNoTimeout(query string, args ...any) (sql.Result, error) { @@ -302,7 +311,7 @@ func (w *sqlxTxWrapper) ExecNoTimeout(query string, args ...any) (sql.Result, er }(time.Now()) } - return w.Tx.ExecContext(context.Background(), query, args...) + return w.dbw.checkErrWithResult(w.Tx.ExecContext(context.Background(), query, args...)) } func (w *sqlxTxWrapper) ExecBuilder(builder Builder) (sql.Result, error) { @@ -326,7 +335,7 @@ func (w *sqlxTxWrapper) ExecRaw(query string, args ...any) (sql.Result, error) { }(time.Now()) } - return w.Tx.ExecContext(ctx, query, args...) + return w.dbw.checkErrWithResult(w.Tx.ExecContext(ctx, query, args...)) } func (w *sqlxTxWrapper) NamedExec(query string, arg any) (sql.Result, error) { @@ -342,7 +351,7 @@ func (w *sqlxTxWrapper) NamedExec(query string, arg any) (sql.Result, error) { }(time.Now()) } - return w.Tx.NamedExecContext(ctx, query, arg) + return w.dbw.checkErrWithResult(w.Tx.NamedExecContext(ctx, query, arg)) } func (w *sqlxTxWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) { @@ -386,7 +395,7 @@ func (w *sqlxTxWrapper) NamedQuery(query string, arg any) (*sqlx.Rows, error) { } } - return res.rows, res.err + return res.rows, w.dbw.checkErr(res.err) } func (w *sqlxTxWrapper) QueryRowX(query string, args ...any) *sqlx.Row { @@ -414,7 +423,7 @@ func (w *sqlxTxWrapper) QueryX(query string, args ...any) (*sqlx.Rows, error) { }(time.Now()) } - return w.Tx.QueryxContext(ctx, query, args) + return w.dbw.checkErrWithRows(w.Tx.QueryxContext(ctx, query, args)) } func (w *sqlxTxWrapper) Select(dest any, query string, args ...any) error { @@ -428,7 +437,7 @@ func (w *sqlxTxWrapper) Select(dest any, query string, args ...any) error { }(time.Now()) } - return w.Tx.SelectContext(ctx, dest, query, args...) + return w.dbw.checkErr(w.Tx.SelectContext(ctx, dest, query, args...)) } func (w *sqlxTxWrapper) SelectBuilder(dest any, builder Builder) error { @@ -459,3 +468,23 @@ func printArgs(query string, dur time.Duration, args ...any) { } mlog.Debug(query, fields...) } + +func (w *sqlxDBWrapper) checkErrWithResult(res sql.Result, err error) (sql.Result, error) { + return res, w.checkErr(err) +} + +func (w *sqlxDBWrapper) checkErrWithRows(res *sqlx.Rows, err error) (*sqlx.Rows, error) { + return res, w.checkErr(err) +} + +func (w *sqlxDBWrapper) checkErr(err error) error { + var netError *net.OpError + if errors.As(err, &netError) && (!netError.Temporary() && !netError.Timeout()) { + w.isOnline.Store(false) + } + return err +} + +func (w *sqlxDBWrapper) Online() bool { + return w.isOnline.Load() +} diff --git a/server/channels/store/sqlstore/sqlx_wrapper_test.go b/server/channels/store/sqlstore/sqlx_wrapper_test.go index 07c6391767..c03d228935 100644 --- a/server/channels/store/sqlstore/sqlx_wrapper_test.go +++ b/server/channels/store/sqlstore/sqlx_wrapper_test.go @@ -6,6 +6,7 @@ package sqlstore import ( "context" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -28,12 +29,14 @@ func TestSqlX(t *testing.T) { } *settings.QueryTimeout = 1 store := &SqlStore{ - rrCounter: 0, - srCounter: 0, - settings: settings, + rrCounter: 0, + srCounter: 0, + settings: settings, + quitMonitor: make(chan struct{}), + wgMonitor: &sync.WaitGroup{}, } - store.initConnection() + require.NoError(t, store.initConnection()) defer store.Close() diff --git a/server/channels/store/sqlstore/store.go b/server/channels/store/sqlstore/store.go index d39f92661c..acd02b0853 100644 --- a/server/channels/store/sqlstore/store.go +++ b/server/channels/store/sqlstore/store.go @@ -49,7 +49,7 @@ const ( MySQLForeignKeyViolationErrorCode = 1452 PGDuplicateObjectErrorCode = "42710" MySQLDuplicateObjectErrorCode = 1022 - DBPingAttempts = 18 + DBPingAttempts = 5 DBPingTimeoutSecs = 10 // This is a numerical version string by postgres. The format is // 2 characters for major, minor, and patch version prior to 10. @@ -123,9 +123,9 @@ type SqlStore struct { masterX *sqlxDBWrapper - ReplicaXs []*sqlxDBWrapper + ReplicaXs []*atomic.Pointer[sqlxDBWrapper] - searchReplicaXs []*sqlxDBWrapper + searchReplicaXs []*atomic.Pointer[sqlxDBWrapper] replicaLagHandles []*dbsql.DB stores SqlStoreStores @@ -138,17 +138,28 @@ type SqlStore struct { isBinaryParam bool pgDefaultTextSearchConfig string + + quitMonitor chan struct{} + wgMonitor *sync.WaitGroup } func New(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlStore { store := &SqlStore{ - rrCounter: 0, - srCounter: 0, - settings: &settings, - metrics: metrics, + rrCounter: 0, + srCounter: 0, + settings: &settings, + metrics: metrics, + quitMonitor: make(chan struct{}), + wgMonitor: &sync.WaitGroup{}, } - store.initConnection() + err := store.initConnection() + if err != nil { + mlog.Fatal("Error setting up connections", mlog.Err(err)) + } + + store.wgMonitor.Add(1) + go store.monitorReplicas() ver, err := store.GetDbVersion(true) if err != nil { @@ -230,29 +241,28 @@ func New(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlS // SetupConnection sets up the connection to the database and pings it to make sure it's alive. // It also applies any database configuration settings that are required. -func SetupConnection(connType string, dataSource string, settings *model.SqlSettings) *dbsql.DB { +func SetupConnection(connType string, dataSource string, settings *model.SqlSettings, attempts int) (*dbsql.DB, error) { db, err := dbsql.Open(*settings.DriverName, dataSource) if err != nil { - mlog.Fatal("Failed to open SQL connection to err.", mlog.Err(err)) + return nil, errors.Wrap(err, "failed to open SQL connection") } - for i := 0; i < DBPingAttempts; i++ { + for i := 0; i < attempts; i++ { // At this point, we have passed sql.Open, so we deliberately ignore any errors. sanitized, _ := SanitizeDataSource(*settings.DriverName, dataSource) mlog.Info("Pinging SQL", mlog.String("database", connType), mlog.String("dataSource", sanitized)) ctx, cancel := context.WithTimeout(context.Background(), DBPingTimeoutSecs*time.Second) defer cancel() err = db.PingContext(ctx) - if err == nil { - break - } else { - if i == DBPingAttempts-1 { - mlog.Fatal("Failed to ping DB, server will exit.", mlog.Err(err)) - } else { - mlog.Error("Failed to ping DB", mlog.Err(err), mlog.Int("retrying in seconds", DBPingTimeoutSecs)) - time.Sleep(DBPingTimeoutSecs * time.Second) + if err != nil { + if i == attempts-1 { + return nil, err } + mlog.Error("Failed to ping DB", mlog.Err(err), mlog.Int("retrying in seconds", DBPingTimeoutSecs)) + time.Sleep(DBPingTimeoutSecs * time.Second) + continue } + break } if strings.HasPrefix(connType, replicaLagPrefix) { @@ -272,7 +282,7 @@ func SetupConnection(connType string, dataSource string, settings *model.SqlSett db.SetConnMaxLifetime(time.Duration(*settings.ConnMaxLifetimeMilliseconds) * time.Millisecond) db.SetConnMaxIdleTime(time.Duration(*settings.ConnMaxIdleTimeMilliseconds) * time.Millisecond) - return db + return db, nil } func (ss *SqlStore) SetContext(context context.Context) { @@ -285,7 +295,7 @@ func (ss *SqlStore) Context() context.Context { func noOpMapper(s string) string { return s } -func (ss *SqlStore) initConnection() { +func (ss *SqlStore) initConnection() error { dataSource := *ss.settings.DataSource if ss.DriverName() == model.DatabaseDriverMysql { // TODO: We ignore the readTimeout datasource parameter for MySQL since QueryTimeout @@ -294,11 +304,14 @@ func (ss *SqlStore) initConnection() { var err error dataSource, err = ResetReadTimeout(dataSource) if err != nil { - mlog.Fatal("Failed to reset read timeout from datasource.", mlog.Err(err), mlog.String("src", dataSource)) + return errors.Wrap(err, "failed to reset read timeout from datasource") } } - handle := SetupConnection("master", dataSource, ss.settings) + handle, err := SetupConnection("master", dataSource, ss.settings, DBPingAttempts) + if err != nil { + return err + } ss.masterX = newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()), time.Duration(*ss.settings.QueryTimeout)*time.Second, *ss.settings.Trace) @@ -310,34 +323,32 @@ func (ss *SqlStore) initConnection() { } if len(ss.settings.DataSourceReplicas) > 0 { - ss.ReplicaXs = make([]*sqlxDBWrapper, len(ss.settings.DataSourceReplicas)) + ss.ReplicaXs = make([]*atomic.Pointer[sqlxDBWrapper], len(ss.settings.DataSourceReplicas)) for i, replica := range ss.settings.DataSourceReplicas { - handle := SetupConnection(fmt.Sprintf("replica-%v", i), replica, ss.settings) - ss.ReplicaXs[i] = newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()), - time.Duration(*ss.settings.QueryTimeout)*time.Second, - *ss.settings.Trace) - if ss.DriverName() == model.DatabaseDriverMysql { - ss.ReplicaXs[i].MapperFunc(noOpMapper) - } - if ss.metrics != nil { - ss.metrics.RegisterDBCollector(ss.ReplicaXs[i].DB.DB, "replica-"+strconv.Itoa(i)) + ss.ReplicaXs[i] = &atomic.Pointer[sqlxDBWrapper]{} + handle, err = SetupConnection(fmt.Sprintf("replica-%v", i), replica, ss.settings, DBPingAttempts) + if err != nil { + // Initializing to be offline + ss.ReplicaXs[i].Store(&sqlxDBWrapper{isOnline: &atomic.Bool{}}) + mlog.Warn("Failed to setup connection. Skipping..", mlog.String("db", fmt.Sprintf("replica-%v", i)), mlog.Err(err)) + continue } + ss.setDB(ss.ReplicaXs[i], handle, "replica-"+strconv.Itoa(i)) } } if len(ss.settings.DataSourceSearchReplicas) > 0 { - ss.searchReplicaXs = make([]*sqlxDBWrapper, len(ss.settings.DataSourceSearchReplicas)) + ss.searchReplicaXs = make([]*atomic.Pointer[sqlxDBWrapper], len(ss.settings.DataSourceSearchReplicas)) for i, replica := range ss.settings.DataSourceSearchReplicas { - handle := SetupConnection(fmt.Sprintf("search-replica-%v", i), replica, ss.settings) - ss.searchReplicaXs[i] = newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()), - time.Duration(*ss.settings.QueryTimeout)*time.Second, - *ss.settings.Trace) - if ss.DriverName() == model.DatabaseDriverMysql { - ss.searchReplicaXs[i].MapperFunc(noOpMapper) - } - if ss.metrics != nil { - ss.metrics.RegisterDBCollector(ss.searchReplicaXs[i].DB.DB, "searchreplica-"+strconv.Itoa(i)) + ss.searchReplicaXs[i] = &atomic.Pointer[sqlxDBWrapper]{} + handle, err = SetupConnection(fmt.Sprintf("search-replica-%v", i), replica, ss.settings, DBPingAttempts) + if err != nil { + // Initializing to be offline + ss.searchReplicaXs[i].Store(&sqlxDBWrapper{isOnline: &atomic.Bool{}}) + mlog.Warn("Failed to setup connection. Skipping..", mlog.String("db", fmt.Sprintf("search-replica-%v", i)), mlog.Err(err)) + continue } + ss.setDB(ss.searchReplicaXs[i], handle, "searchreplica-"+strconv.Itoa(i)) } } @@ -347,9 +358,14 @@ func (ss *SqlStore) initConnection() { if src.DataSource == nil { continue } - ss.replicaLagHandles[i] = SetupConnection(fmt.Sprintf(replicaLagPrefix+"-%d", i), *src.DataSource, ss.settings) + ss.replicaLagHandles[i], err = SetupConnection(fmt.Sprintf(replicaLagPrefix+"-%d", i), *src.DataSource, ss.settings, DBPingAttempts) + if err != nil { + mlog.Warn("Failed to setup replica lag handle. Skipping..", mlog.String("db", fmt.Sprintf(replicaLagPrefix+"-%d", i)), mlog.Err(err)) + continue + } } } + return nil } func (ss *SqlStore) DriverName() string { @@ -455,8 +471,15 @@ func (ss *SqlStore) GetSearchReplicaX() *sqlxDBWrapper { return ss.GetReplicaX() } - rrNum := atomic.AddInt64(&ss.srCounter, 1) % int64(len(ss.searchReplicaXs)) - return ss.searchReplicaXs[rrNum] + for i := 0; i < len(ss.searchReplicaXs); i++ { + rrNum := atomic.AddInt64(&ss.srCounter, 1) % int64(len(ss.searchReplicaXs)) + if ss.searchReplicaXs[rrNum].Load().Online() { + return ss.searchReplicaXs[rrNum].Load() + } + } + + // If all search replicas are down, then go with replica. + return ss.GetReplicaX() } func (ss *SqlStore) GetReplicaX() *sqlxDBWrapper { @@ -464,23 +487,64 @@ func (ss *SqlStore) GetReplicaX() *sqlxDBWrapper { return ss.GetMasterX() } - rrNum := atomic.AddInt64(&ss.rrCounter, 1) % int64(len(ss.ReplicaXs)) - return ss.ReplicaXs[rrNum] -} - -func (ss *SqlStore) GetInternalReplicaDBs() []*sql.DB { - if len(ss.settings.DataSourceReplicas) == 0 || ss.lockedToMaster || !ss.hasLicense() { - return []*sql.DB{ - ss.GetMasterX().DB.DB, + for i := 0; i < len(ss.ReplicaXs); i++ { + rrNum := atomic.AddInt64(&ss.rrCounter, 1) % int64(len(ss.ReplicaXs)) + if ss.ReplicaXs[rrNum].Load().Online() { + return ss.ReplicaXs[rrNum].Load() } } - dbs := make([]*sql.DB, len(ss.ReplicaXs)) - for i, rx := range ss.ReplicaXs { - dbs[i] = rx.DB.DB - } + // If all replicas are down, then go with master. + return ss.GetMasterX() +} - return dbs +func (ss *SqlStore) monitorReplicas() { + t := time.NewTicker(time.Duration(*ss.settings.ReplicaMonitorIntervalSeconds) * time.Second) + defer func() { + t.Stop() + ss.wgMonitor.Done() + }() + for { + select { + case <-ss.quitMonitor: + return + case <-t.C: + setupReplica := func(r *atomic.Pointer[sqlxDBWrapper], dsn, name string) { + if r.Load().Online() { + return + } + + handle, err := SetupConnection(name, dsn, ss.settings, 1) + if err != nil { + mlog.Warn("Failed to setup connection. Skipping..", mlog.String("db", name), mlog.Err(err)) + return + } + if ss.metrics != nil && r.Load() != nil && r.Load().DB != nil { + ss.metrics.UnregisterDBCollector(r.Load().DB.DB, name) + } + ss.setDB(r, handle, name) + } + for i, replica := range ss.ReplicaXs { + setupReplica(replica, ss.settings.DataSourceReplicas[i], "replica-"+strconv.Itoa(i)) + } + + for i, replica := range ss.searchReplicaXs { + setupReplica(replica, ss.settings.DataSourceSearchReplicas[i], "search-replica-"+strconv.Itoa(i)) + } + } + } +} + +func (ss *SqlStore) setDB(replica *atomic.Pointer[sqlxDBWrapper], handle *dbsql.DB, name string) { + replica.Store(newSqlxDBWrapper(sqlx.NewDb(handle, ss.DriverName()), + time.Duration(*ss.settings.QueryTimeout)*time.Second, + *ss.settings.Trace)) + if ss.DriverName() == model.DatabaseDriverMysql { + replica.Load().MapperFunc(noOpMapper) + } + if ss.metrics != nil { + ss.metrics.RegisterDBCollector(replica.Load().DB.DB, name) + } } func (ss *SqlStore) GetInternalReplicaDB() *sql.DB { @@ -489,7 +553,7 @@ func (ss *SqlStore) GetInternalReplicaDB() *sql.DB { } rrNum := atomic.AddInt64(&ss.rrCounter, 1) % int64(len(ss.ReplicaXs)) - return ss.ReplicaXs[rrNum].DB.DB + return ss.ReplicaXs[rrNum].Load().DB.DB } func (ss *SqlStore) TotalMasterDbConnections() int { @@ -541,7 +605,10 @@ func (ss *SqlStore) TotalReadDbConnections() int { count := 0 for _, db := range ss.ReplicaXs { - count = count + db.Stats().OpenConnections + if !db.Load().Online() { + continue + } + count = count + db.Load().Stats().OpenConnections } return count @@ -554,7 +621,10 @@ func (ss *SqlStore) TotalSearchDbConnections() int { count := 0 for _, db := range ss.searchReplicaXs { - count = count + db.Stats().OpenConnections + if !db.Load().Online() { + continue + } + count = count + db.Load().Stats().OpenConnections } return count @@ -782,9 +852,14 @@ func IsUniqueConstraintError(err error, indexName []string) bool { } func (ss *SqlStore) GetAllConns() []*sqlxDBWrapper { - all := make([]*sqlxDBWrapper, len(ss.ReplicaXs)+1) - copy(all, ss.ReplicaXs) - all[len(ss.ReplicaXs)] = ss.masterX + all := make([]*sqlxDBWrapper, 0, len(ss.ReplicaXs)+1) + for i := range ss.ReplicaXs { + if !ss.ReplicaXs[i].Load().Online() { + continue + } + all = append(all, ss.ReplicaXs[i].Load()) + } + all = append(all, ss.masterX) return all } @@ -807,11 +882,24 @@ func (ss *SqlStore) RecycleDBConnections(d time.Duration) { func (ss *SqlStore) Close() { ss.masterX.Close() + // Closing monitor and waiting for it to be done. + // This needs to be done before closing the replica handles. + close(ss.quitMonitor) + ss.wgMonitor.Wait() + for _, replica := range ss.ReplicaXs { - replica.Close() + if replica.Load().Online() { + replica.Load().Close() + } } for _, replica := range ss.searchReplicaXs { + if replica.Load().Online() { + replica.Load().Close() + } + } + + for _, replica := range ss.replicaLagHandles { replica.Close() } } @@ -1132,7 +1220,10 @@ func (ss *SqlStore) migrate(direction migrationDirection) error { if err != nil { return err } - db := SetupConnection("master", dataSource, ss.settings) + db, err2 := SetupConnection("master", dataSource, ss.settings, DBPingAttempts) + if err2 != nil { + return err2 + } driver, err = ms.WithInstance(db) defer db.Close() case model.DatabaseDriverPostgres: diff --git a/server/channels/store/sqlstore/store_test.go b/server/channels/store/sqlstore/store_test.go index c218fa205d..699ee53e98 100644 --- a/server/channels/store/sqlstore/store_test.go +++ b/server/channels/store/sqlstore/store_test.go @@ -761,13 +761,15 @@ func TestReplicaLagQuery(t *testing.T) { mockMetrics.On("RegisterDBCollector", mock.AnythingOfType("*sql.DB"), "master") store := &SqlStore{ - rrCounter: 0, - srCounter: 0, - settings: settings, - metrics: mockMetrics, + rrCounter: 0, + srCounter: 0, + settings: settings, + metrics: mockMetrics, + quitMonitor: make(chan struct{}), + wgMonitor: &sync.WaitGroup{}, } - store.initConnection() + require.NoError(t, store.initConnection()) store.stores.post = newSqlPostStore(store, mockMetrics) err = store.migrate(migrationsDirectionUp) require.NoError(t, err) @@ -839,9 +841,11 @@ func TestMySQLReadTimeout(t *testing.T) { settings.DataSource = &dataSource store := &SqlStore{ - settings: settings, + settings: settings, + quitMonitor: make(chan struct{}), + wgMonitor: &sync.WaitGroup{}, } - store.initConnection() + require.NoError(t, store.initConnection()) defer store.Close() _, err = store.GetMasterX().ExecNoTimeout(`SELECT SLEEP(3)`) diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 7da24fd24c..20af689736 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -72,10 +72,7 @@ type Store interface { // GetInternalMasterDB allows access to the raw master DB // handle for the multi-product architecture. GetInternalMasterDB() *sql.DB - // GetInternalReplicaDBs allows access to the raw replica DB - // handles for the multi-product architecture. GetInternalReplicaDB() *sql.DB - GetInternalReplicaDBs() []*sql.DB TotalMasterDbConnections() int TotalReadDbConnections() int TotalSearchDbConnections() int diff --git a/server/channels/store/storetest/mocks/Store.go b/server/channels/store/storetest/mocks/Store.go index bb06fb9005..bca15c95e0 100644 --- a/server/channels/store/storetest/mocks/Store.go +++ b/server/channels/store/storetest/mocks/Store.go @@ -346,22 +346,6 @@ func (_m *Store) GetInternalReplicaDB() *sql.DB { return r0 } -// GetInternalReplicaDBs provides a mock function with given fields: -func (_m *Store) GetInternalReplicaDBs() []*sql.DB { - ret := _m.Called() - - var r0 []*sql.DB - if rf, ok := ret.Get(0).(func() []*sql.DB); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*sql.DB) - } - } - - return r0 -} - // Group provides a mock function with given fields: func (_m *Store) Group() store.GroupStore { ret := _m.Called() diff --git a/server/channels/store/storetest/settings.go b/server/channels/store/storetest/settings.go index a1253f28bb..0104b950bb 100644 --- a/server/channels/store/storetest/settings.go +++ b/server/channels/store/storetest/settings.go @@ -261,6 +261,7 @@ func MakeSqlSettings(driver string, withReplica bool) *model.SqlSettings { } log("Created temporary " + driver + " database " + dbName) + settings.ReplicaMonitorIntervalSeconds = model.NewInt(5) return settings } diff --git a/server/channels/testlib/helper.go b/server/channels/testlib/helper.go index f74a562568..f6a1b22531 100644 --- a/server/channels/testlib/helper.go +++ b/server/channels/testlib/helper.go @@ -331,7 +331,7 @@ func (h *MainHelper) SetReplicationLagForTesting(seconds int) error { func (h *MainHelper) execOnEachReplica(query string, args ...any) error { for _, replica := range h.SQLStore.ReplicaXs { - _, err := replica.Exec(query, args...) + _, err := replica.Load().Exec(query, args...) if err != nil { return err } diff --git a/server/model/config.go b/server/model/config.go index f278c97cdf..af4341bdfa 100644 --- a/server/model/config.go +++ b/server/model/config.go @@ -1173,6 +1173,7 @@ type SqlSettings struct { DisableDatabaseSearch *bool `access:"environment_database,write_restrictable,cloud_restrictable"` MigrationsStatementTimeoutSeconds *int `access:"environment_database,write_restrictable,cloud_restrictable"` ReplicaLagSettings []*ReplicaLagSettings `access:"environment_database,write_restrictable,cloud_restrictable"` // telemetry: none + ReplicaMonitorIntervalSeconds *int `access:"environment_database,write_restrictable,cloud_restrictable"` } func (s *SqlSettings) SetDefaults(isUpdate bool) { @@ -1237,6 +1238,10 @@ func (s *SqlSettings) SetDefaults(isUpdate bool) { if s.ReplicaLagSettings == nil { s.ReplicaLagSettings = []*ReplicaLagSettings{} } + + if s.ReplicaMonitorIntervalSeconds == nil { + s.ReplicaMonitorIntervalSeconds = NewInt(5) + } } type LogSettings struct { diff --git a/server/platform/services/telemetry/telemetry.go b/server/platform/services/telemetry/telemetry.go index d4da4770bc..4fdbdf51ec 100644 --- a/server/platform/services/telemetry/telemetry.go +++ b/server/platform/services/telemetry/telemetry.go @@ -522,6 +522,7 @@ func (ts *TelemetryService) trackConfig() { "query_timeout": *cfg.SqlSettings.QueryTimeout, "disable_database_search": *cfg.SqlSettings.DisableDatabaseSearch, "migrations_statement_timeout_seconds": *cfg.SqlSettings.MigrationsStatementTimeoutSeconds, + "replica_monitor_interval_seconds": *cfg.SqlSettings.ReplicaMonitorIntervalSeconds, }) ts.SendTelemetry(TrackConfigLog, map[string]any{