MM-33818: Add replica lag metric (#17148)

* MM-33818: Add replica lag metric

We add two new metrics for monitoring replica lag:
- Monitor absolute lag based on binlog distance/transaction queue length.
- Monitor time taken for the replica to catch up.
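The two gauges above can be pictured as a per-node pair of last-seen values. This is a hedged, illustrative stand-in, not the actual metrics implementation (which registers Prometheus gauges behind `MetricsInterface`); the type and constructor names here are invented for the sketch:

```go
package main

import "fmt"

// replicaLagGauges is an illustrative stand-in for the Prometheus-backed
// metrics: it records the latest absolute and time lag per replica node.
type replicaLagGauges struct {
	abs     map[string]float64 // e.g. binlog distance or applier queue length
	timeLag map[string]float64 // e.g. visibility lag in milliseconds
}

func newReplicaLagGauges() *replicaLagGauges {
	return &replicaLagGauges{abs: map[string]float64{}, timeLag: map[string]float64{}}
}

// SetReplicaLagAbsolute mirrors the MetricsInterface method of the same name.
func (g *replicaLagGauges) SetReplicaLagAbsolute(node string, v float64) { g.abs[node] = v }

// SetReplicaLagTime mirrors the MetricsInterface method of the same name.
func (g *replicaLagGauges) SetReplicaLagTime(node string, v float64) { g.timeLag[node] = v }

func main() {
	g := newReplicaLagGauges()
	g.SetReplicaLagAbsolute("replica-1", 128) // 128 units of binlog distance behind
	g.SetReplicaLagTime("replica-1", 350)     // 350 ms behind
	fmt.Println(g.abs["replica-1"], g.timeLag["replica-1"])
}
```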

To achieve this, we add a config setting to run a user-defined SQL query
on the database.

The config requires a separate datasource field because in some databases,
querying the replica lag requires elevated credentials that are not needed for
normal operation of the application and could even be a security risk if granted to the main connection.

Arguably, a peculiar part of the design is the requirement that the query output be in a (node, value)
format. But since the SQL query is a black box from the application's perspective and the user can set
any query they want, there is no way to templatize it.
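The (node, value) contract can be shown as a minimal scan helper. This is a sketch under stated assumptions: `rowScanner` and `fakeRow` are invented here to stand in for `*sql.Row` so the shape of the contract is demonstrable without a live database:

```go
package main

import "fmt"

// rowScanner abstracts *sql.Row so the (node, value) contract can be
// demonstrated without a database connection.
type rowScanner interface {
	Scan(dest ...interface{}) error
}

// scanLag reads the row that every user-defined lag query must return:
// first column the node label, second column the lag value.
func scanLag(row rowScanner) (node string, value float64, err error) {
	err = row.Scan(&node, &value)
	return node, value, err
}

// fakeRow is a stand-in for *sql.Row in this sketch.
type fakeRow struct {
	node  string
	value float64
}

func (r fakeRow) Scan(dest ...interface{}) error {
	*(dest[0].(*string)) = r.node
	*(dest[1].(*float64)) = r.value
	return nil
}

func main() {
	node, lag, _ := scanLag(fakeRow{node: "replica-1", value: 42})
	fmt.Printf("%s %v\n", node, lag)
}
```

Any query shape that produces exactly that (string, number) row works, which is all the application can assume about the black-box query.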

And as an extra note, freno also does it in a similar way.

The last bit: because we need a separate datasource, we now consume one extra connection
rather than sharing the pool. This is an unfortunate consequence of the design. One extra
connection makes little difference in a single-tenant scenario, but it does add up in a multi-tenant one.
In a multi-tenant scenario, however, the expectation is already to use a connection pooler, so this
is not a big concern.
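The cap on the dedicated lag handle can be sketched as a small helper; the function name is illustrative, not the actual server code, though the "replica-lag" prefix matches the one introduced in this change:

```go
package main

import (
	"fmt"
	"strings"
)

// connLimits returns the max-open/max-idle limits for a connection type.
// Replica-lag handles are capped at a single connection so the lag query
// never competes with the shared pool; everything else keeps the configured
// pool sizes.
func connLimits(connType string, maxOpen, maxIdle int) (open, idle int) {
	if strings.HasPrefix(connType, "replica-lag") {
		return 1, 1
	}
	return maxOpen, maxIdle
}

func main() {
	o, i := connLimits("replica-lag-0", 300, 20)
	fmt.Println(o, i) // the lag handle gets exactly one connection
	o, i = connLimits("replica-1", 300, 20)
	fmt.Println(o, i) // normal replicas keep the configured limits
}
```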

https://mattermost.atlassian.net/browse/MM-33818

```release-note
Two new gauge metrics were added:
mattermost_db_replica_lag_abs and mattermost_db_replica_lag_time, both
carrying a "node" label indicating which DB host the metric is from.

These metrics report the replica lag in absolute terms and in the time dimension,
capturing the whole picture of replica lag.

To use these metrics, a separate config section ReplicaLagSettings was added
under SqlSettings. This is an array of maps, each containing three keys: DataSource,
QueryAbsoluteLag, and QueryTimeLag. Each entry corresponds to a single replica instance.

DataSource contains the DB credentials to connect to the replica instance.

QueryAbsoluteLag is a plain SQL query that must return a single row in which the first column
is the node label of the Prometheus metric and the second column is the lag value.

QueryTimeLag is the same as above, but used to measure the time lag.

As an example, for AWS Aurora instances, the QueryAbsoluteLag can be:

select server_id, highest_lsn_rcvd-durable_lsn as bindiff from aurora_global_db_instance_status() where server_id=<>

and QueryTimeLag can be:

select server_id, visibility_lag_in_msec from aurora_global_db_instance_status() where server_id=<>

For MySQL Group Replication, the absolute lag can be measured from the number of pending transactions
in the applier queue:

select member_id, count_transaction_remote_in_applier_queue FROM performance_schema.replication_group_member_stats where member_id=<>

Overall, the choice of query is left to the administrator; an appropriate query
can be chosen depending on the database and the need.
```
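For reference, a hypothetical config.json fragment wiring up one replica's lag queries could look like the following. The credentials, host names, and queries are placeholders for illustration (Aurora-style, as in the examples above), not recommendations:

```json
"SqlSettings": {
    "DataSourceReplicas": ["postgres://app:app@replica-1:5432/mattermost"],
    "ReplicaLagSettings": [
        {
            "DataSource": "postgres://monitor:secret@replica-1:5432/mattermost",
            "QueryAbsoluteLag": "select server_id, highest_lsn_rcvd-durable_lsn from aurora_global_db_instance_status() where server_id='replica-1'",
            "QueryTimeLag": "select server_id, visibility_lag_in_msec from aurora_global_db_instance_status() where server_id='replica-1'"
        }
    ]
}
```

Note that the DataSource here can use a monitoring-only credential, keeping elevated permissions off the main application connection.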

* Trigger CI

* Fix tests

* address review comments

* Remove t.Parallel

It was spawning too many connections,
and overloading the docker container.

Co-authored-by: Mattermod <mattermod@users.noreply.github.com>

Author: Agniva De Sarker
Date: 2021-03-29 21:35:24 +05:30
Committed by: GitHub
Commit: 7b1dee4936 (parent 9d997dbbde)
11 changed files with 211 additions and 28 deletions


@@ -68,4 +68,7 @@ type MetricsInterface interface {
IncrementJobActive(jobType string)
DecrementJobActive(jobType string)
SetReplicaLagAbsolute(node string, value float64)
SetReplicaLagTime(node string, value float64)
}


@@ -264,3 +264,13 @@ func (_m *MetricsInterface) ObserveStoreMethodDuration(method string, success st
func (_m *MetricsInterface) Register() {
_m.Called()
}
// SetReplicaLagAbsolute provides a mock function with given fields: node, value
func (_m *MetricsInterface) SetReplicaLagAbsolute(node string, value float64) {
_m.Called(node, value)
}
// SetReplicaLagTime provides a mock function with given fields: node, value
func (_m *MetricsInterface) SetReplicaLagTime(node string, value float64) {
_m.Called(node, value)
}


@@ -7,7 +7,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.14.1 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/jteeuwen/go-bindata v3.0.7+incompatible // indirect
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210218104610-40d7640e8538 // indirect
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210309083648-c1e5575135f9 // indirect
github.com/philhofer/fwd v1.0.0 // indirect
github.com/reflog/struct2interface v0.6.1 // indirect
github.com/spf13/cobra v1.1.3 // indirect


@@ -342,6 +342,8 @@ github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210205183519-d9e542
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210205183519-d9e542a75ab2/go.mod h1:3gKozJI8n2Y/vW37GfnFWAdehGXe5yZlt+HykK6Y3DM=
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210218104610-40d7640e8538 h1:7G1fAVBGjrQaKqXnxRjVYMJfuiE6iVMdt9YhBwe2enM=
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210218104610-40d7640e8538/go.mod h1:3gKozJI8n2Y/vW37GfnFWAdehGXe5yZlt+HykK6Y3DM=
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210309083648-c1e5575135f9 h1:EdA8k1LBxdk1SslBITXYiGVIptfPWFt7fRwxiy2BsTk=
github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210309083648-c1e5575135f9/go.mod h1:3gKozJI8n2Y/vW37GfnFWAdehGXe5yZlt+HykK6Y3DM=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=


@@ -7694,6 +7694,10 @@
"id": "model.config.is_valid.read_timeout.app_error",
"translation": "Invalid value for read timeout."
},
{
"id": "model.config.is_valid.replica_mismatch.app_error",
"translation": "Number of replica lag queries is more than the number of replicas configured. Must be less than or equal to the number of replicas."
},
{
"id": "model.config.is_valid.restrict_direct_message.app_error",
"translation": "Invalid direct message restriction. Must be 'any', or 'team'."


@@ -1104,19 +1104,26 @@ func (s *Office365Settings) SSOSettings() *SSOSettings {
return &ssoSettings
}
type ReplicaLagSettings struct {
DataSource *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
QueryAbsoluteLag *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
QueryTimeLag *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
}
type SqlSettings struct {
DriverName *string `access:"environment,write_restrictable,cloud_restrictable"`
DataSource *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
DataSourceReplicas []string `access:"environment,write_restrictable,cloud_restrictable"`
DataSourceSearchReplicas []string `access:"environment,write_restrictable,cloud_restrictable"`
MaxIdleConns *int `access:"environment,write_restrictable,cloud_restrictable"`
ConnMaxLifetimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"`
ConnMaxIdleTimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"`
MaxOpenConns *int `access:"environment,write_restrictable,cloud_restrictable"`
Trace *bool `access:"environment,write_restrictable,cloud_restrictable"`
AtRestEncryptKey *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
QueryTimeout *int `access:"environment,write_restrictable,cloud_restrictable"`
DisableDatabaseSearch *bool `access:"environment,write_restrictable,cloud_restrictable"`
DriverName *string `access:"environment,write_restrictable,cloud_restrictable"`
DataSource *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
DataSourceReplicas []string `access:"environment,write_restrictable,cloud_restrictable"`
DataSourceSearchReplicas []string `access:"environment,write_restrictable,cloud_restrictable"`
MaxIdleConns *int `access:"environment,write_restrictable,cloud_restrictable"`
ConnMaxLifetimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"`
ConnMaxIdleTimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"`
MaxOpenConns *int `access:"environment,write_restrictable,cloud_restrictable"`
Trace *bool `access:"environment,write_restrictable,cloud_restrictable"`
AtRestEncryptKey *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
QueryTimeout *int `access:"environment,write_restrictable,cloud_restrictable"`
DisableDatabaseSearch *bool `access:"environment,write_restrictable,cloud_restrictable"`
ReplicaLagSettings []*ReplicaLagSettings `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none
}
func (s *SqlSettings) SetDefaults(isUpdate bool) {
@@ -1173,6 +1180,10 @@ func (s *SqlSettings) SetDefaults(isUpdate bool) {
if s.DisableDatabaseSearch == nil {
s.DisableDatabaseSearch = NewBool(false)
}
if s.ReplicaLagSettings == nil {
s.ReplicaLagSettings = []*ReplicaLagSettings{}
}
}
type LogSettings struct {
@@ -3356,6 +3367,10 @@ func (s *SqlSettings) isValid() *AppError {
return NewAppError("Config.IsValid", "model.config.is_valid.sql_max_conn.app_error", nil, "", http.StatusBadRequest)
}
if len(s.ReplicaLagSettings) > len(s.DataSourceReplicas) {
return NewAppError("Config.IsValid", "model.config.is_valid.replica_mismatch.app_error", nil, "", http.StatusBadRequest)
}
return nil
}


@@ -58,6 +58,8 @@ const (
migrationsDirectionUp migrationDirection = "up"
migrationsDirectionDown migrationDirection = "down"
replicaLagPrefix = "replica-lag"
)
const (
@@ -130,17 +132,19 @@ type SqlStoreStores struct {
type SqlStore struct {
// rrCounter and srCounter should be kept first.
// See https://github.com/mattermost/mattermost-server/v5/pull/7281
rrCounter int64
srCounter int64
master *gorp.DbMap
Replicas []*gorp.DbMap
searchReplicas []*gorp.DbMap
stores SqlStoreStores
settings *model.SqlSettings
lockedToMaster bool
context context.Context
license *model.License
licenseMutex sync.RWMutex
rrCounter int64
srCounter int64
master *gorp.DbMap
Replicas []*gorp.DbMap
searchReplicas []*gorp.DbMap
replicaLagHandles []*dbsql.DB
stores SqlStoreStores
settings *model.SqlSettings
lockedToMaster bool
context context.Context
license *model.License
licenseMutex sync.RWMutex
metrics einterfaces.MetricsInterface
}
type TraceOnAdapter struct{}
@@ -164,6 +168,7 @@ func New(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlS
rrCounter: 0,
srCounter: 0,
settings: &settings,
metrics: metrics,
}
store.initConnection()
@@ -287,8 +292,20 @@ func setupConnection(connType string, dataSource string, settings *model.SqlSett
}
}
db.SetMaxIdleConns(*settings.MaxIdleConns)
db.SetMaxOpenConns(*settings.MaxOpenConns)
if strings.HasPrefix(connType, replicaLagPrefix) {
// If this is a replica lag connection, we just open one connection.
//
// Arguably, if the query doesn't require a special credential, it does take up
// one extra connection from the replica DB. But falling back to the replica
// data source when the replica lag data source is null implies an ordering constraint
// which makes things brittle and is not a good design.
// If connections are an overhead, it is advised to use a connection pool.
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
} else {
db.SetMaxIdleConns(*settings.MaxIdleConns)
db.SetMaxOpenConns(*settings.MaxOpenConns)
}
db.SetConnMaxLifetime(time.Duration(*settings.ConnMaxLifetimeMilliseconds) * time.Millisecond)
db.SetConnMaxIdleTime(time.Duration(*settings.ConnMaxIdleTimeMilliseconds) * time.Millisecond)
@@ -337,6 +354,17 @@ func (ss *SqlStore) initConnection() {
ss.searchReplicas[i] = setupConnection(fmt.Sprintf("search-replica-%v", i), replica, ss.settings)
}
}
if len(ss.settings.ReplicaLagSettings) > 0 {
ss.replicaLagHandles = make([]*dbsql.DB, len(ss.settings.ReplicaLagSettings))
for i, src := range ss.settings.ReplicaLagSettings {
if src.DataSource == nil {
continue
}
gorpConn := setupConnection(fmt.Sprintf(replicaLagPrefix+"-%d", i), *src.DataSource, ss.settings)
ss.replicaLagHandles[i] = gorpConn.Db
}
}
}
func (ss *SqlStore) DriverName() string {
@@ -410,6 +438,44 @@ func (ss *SqlStore) TotalMasterDbConnections() int {
return ss.GetMaster().Db.Stats().OpenConnections
}
// ReplicaLagAbs queries all the replica databases to get the absolute replica lag value
// and updates the Prometheus metric with it.
func (ss *SqlStore) ReplicaLagAbs() error {
for i, item := range ss.settings.ReplicaLagSettings {
if item.QueryAbsoluteLag == nil || *item.QueryAbsoluteLag == "" {
continue
}
var binDiff float64
var node string
err := ss.replicaLagHandles[i].QueryRow(*item.QueryAbsoluteLag).Scan(&node, &binDiff)
if err != nil {
return err
}
// There is no nil check needed here because it's called from the metrics store.
ss.metrics.SetReplicaLagAbsolute(node, binDiff)
}
return nil
}
// ReplicaLagTime queries all the replica databases to get the time-based replica lag value
// and updates the Prometheus metric with it.
func (ss *SqlStore) ReplicaLagTime() error {
for i, item := range ss.settings.ReplicaLagSettings {
if item.QueryTimeLag == nil || *item.QueryTimeLag == "" {
continue
}
var timeDiff float64
var node string
err := ss.replicaLagHandles[i].QueryRow(*item.QueryTimeLag).Scan(&node, &timeDiff)
if err != nil {
return err
}
// There is no nil check needed here because it's called from the metrics store.
ss.metrics.SetReplicaLagTime(node, timeDiff)
}
return nil
}
func (ss *SqlStore) TotalReadDbConnections() int {
if len(ss.settings.DataSourceReplicas) == 0 {
return 0


@@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost-server/v5/einterfaces/mocks"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store"
"github.com/mattermost/mattermost-server/v5/store/searchtest"
@@ -246,7 +247,6 @@ func TestGetReplica(t *testing.T) {
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.Description+" with license", func(t *testing.T) {
t.Parallel()
settings := makeSqlSettings(model.DATABASE_DRIVER_POSTGRES)
dataSourceReplicas := []string{}
@@ -317,7 +317,6 @@ func TestGetReplica(t *testing.T) {
})
t.Run(testCase.Description+" without license", func(t *testing.T) {
t.Parallel()
settings := makeSqlSettings(model.DATABASE_DRIVER_POSTGRES)
dataSourceReplicas := []string{}
@@ -556,6 +555,58 @@ func TestVersionString(t *testing.T) {
}
}
func TestReplicaLagQuery(t *testing.T) {
testDrivers := []string{
model.DATABASE_DRIVER_POSTGRES,
model.DATABASE_DRIVER_MYSQL,
}
for _, driver := range testDrivers {
settings := makeSqlSettings(driver)
var query string
var tableName string
// Just any random query which returns a row in (string, int) format.
switch driver {
case model.DATABASE_DRIVER_POSTGRES:
query = `SELECT relname, count(relname) FROM pg_class WHERE relname='posts' GROUP BY relname`
tableName = "posts"
case model.DATABASE_DRIVER_MYSQL:
query = `SELECT table_name, count(table_name) FROM information_schema.tables WHERE table_name='Posts' and table_schema=Database() GROUP BY table_name`
tableName = "Posts"
}
settings.ReplicaLagSettings = []*model.ReplicaLagSettings{{
DataSource: model.NewString(*settings.DataSource),
QueryAbsoluteLag: model.NewString(query),
QueryTimeLag: model.NewString(query),
}}
mockMetrics := &mocks.MetricsInterface{}
defer mockMetrics.AssertExpectations(t)
mockMetrics.On("SetReplicaLagAbsolute", tableName, float64(1))
mockMetrics.On("SetReplicaLagTime", tableName, float64(1))
store := &SqlStore{
rrCounter: 0,
srCounter: 0,
settings: settings,
metrics: mockMetrics,
}
store.initConnection()
store.stores.post = newSqlPostStore(store, mockMetrics)
err := store.GetMaster().CreateTablesIfNotExists()
require.NoError(t, err)
defer store.Close()
err = store.ReplicaLagAbs()
require.NoError(t, err)
err = store.ReplicaLagTime()
require.NoError(t, err)
}
}
func makeSqlSettings(driver string) *model.SqlSettings {
switch driver {
case model.DATABASE_DRIVER_POSTGRES:


@@ -65,6 +65,8 @@ type Store interface {
TotalMasterDbConnections() int
TotalReadDbConnections() int
TotalSearchDbConnections() int
ReplicaLagTime() error
ReplicaLagAbs() error
CheckIntegrity() <-chan model.IntegrityCheckResult
SetContext(context context.Context)
Context() context.Context


@@ -432,6 +432,34 @@ func (_m *Store) RecycleDBConnections(d time.Duration) {
_m.Called(d)
}
// ReplicaLagAbs provides a mock function with given fields:
func (_m *Store) ReplicaLagAbs() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// ReplicaLagTime provides a mock function with given fields:
func (_m *Store) ReplicaLagTime() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Role provides a mock function with given fields:
func (_m *Store) Role() store.RoleStore {
ret := _m.Called()


@@ -105,6 +105,8 @@ func (s *Store) GetCurrentSchemaVersion() string { return "" }
func (s *Store) CheckIntegrity() <-chan model.IntegrityCheckResult {
return make(chan model.IntegrityCheckResult)
}
func (s *Store) ReplicaLagAbs() error { return nil }
func (s *Store) ReplicaLagTime() error { return nil }
func (s *Store) AssertExpectations(t mock.TestingT) bool {
return mock.AssertExpectationsForObjects(t,