diff --git a/einterfaces/metrics.go b/einterfaces/metrics.go index 565f2374a7..b1fcb19bbb 100644 --- a/einterfaces/metrics.go +++ b/einterfaces/metrics.go @@ -68,4 +68,7 @@ type MetricsInterface interface { IncrementJobActive(jobType string) DecrementJobActive(jobType string) + + SetReplicaLagAbsolute(node string, value float64) + SetReplicaLagTime(node string, value float64) } diff --git a/einterfaces/mocks/MetricsInterface.go b/einterfaces/mocks/MetricsInterface.go index 20480850db..ca9d0e79ec 100644 --- a/einterfaces/mocks/MetricsInterface.go +++ b/einterfaces/mocks/MetricsInterface.go @@ -264,3 +264,13 @@ func (_m *MetricsInterface) ObserveStoreMethodDuration(method string, success st func (_m *MetricsInterface) Register() { _m.Called() } + +// SetReplicaLagAbsolute provides a mock function with given fields: node, value +func (_m *MetricsInterface) SetReplicaLagAbsolute(node string, value float64) { + _m.Called(node, value) +} + +// SetReplicaLagTime provides a mock function with given fields: node, value +func (_m *MetricsInterface) SetReplicaLagTime(node string, value float64) { + _m.Called(node, value) +} diff --git a/go.tools.mod b/go.tools.mod index 95a691741e..f3192f8bd6 100644 --- a/go.tools.mod +++ b/go.tools.mod @@ -7,7 +7,7 @@ require ( github.com/golang-migrate/migrate/v4 v4.14.1 // indirect github.com/jstemmer/go-junit-report v0.9.1 // indirect github.com/jteeuwen/go-bindata v3.0.7+incompatible // indirect - github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210218104610-40d7640e8538 // indirect + github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210309083648-c1e5575135f9 // indirect github.com/philhofer/fwd v1.0.0 // indirect github.com/reflog/struct2interface v0.6.1 // indirect github.com/spf13/cobra v1.1.3 // indirect diff --git a/go.tools.sum b/go.tools.sum index 1250b65ffd..147133a7bc 100644 --- a/go.tools.sum +++ b/go.tools.sum @@ -342,6 +342,8 @@ github.com/mattermost/mattermost-utilities/mmgotool 
v0.0.0-20210205183519-d9e542 github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210205183519-d9e542a75ab2/go.mod h1:3gKozJI8n2Y/vW37GfnFWAdehGXe5yZlt+HykK6Y3DM= github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210218104610-40d7640e8538 h1:7G1fAVBGjrQaKqXnxRjVYMJfuiE6iVMdt9YhBwe2enM= github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210218104610-40d7640e8538/go.mod h1:3gKozJI8n2Y/vW37GfnFWAdehGXe5yZlt+HykK6Y3DM= +github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210309083648-c1e5575135f9 h1:EdA8k1LBxdk1SslBITXYiGVIptfPWFt7fRwxiy2BsTk= +github.com/mattermost/mattermost-utilities/mmgotool v0.0.0-20210309083648-c1e5575135f9/go.mod h1:3gKozJI8n2Y/vW37GfnFWAdehGXe5yZlt+HykK6Y3DM= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= diff --git a/i18n/en.json b/i18n/en.json index a681f198f8..8a4aa12c31 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -7694,6 +7694,10 @@ "id": "model.config.is_valid.read_timeout.app_error", "translation": "Invalid value for read timeout." }, + { + "id": "model.config.is_valid.replica_mismatch.app_error", + "translation": "Number of replica lag queries is more than the number of replicas configured. Must be less than or equal to the number of replicas." + }, { "id": "model.config.is_valid.restrict_direct_message.app_error", "translation": "Invalid direct message restriction. Must be 'any', or 'team'." 
diff --git a/model/config.go b/model/config.go index d7364cda15..d0e1adab02 100644 --- a/model/config.go +++ b/model/config.go @@ -1104,19 +1104,26 @@ func (s *Office365Settings) SSOSettings() *SSOSettings { return &ssoSettings } +type ReplicaLagSettings struct { + DataSource *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none + QueryAbsoluteLag *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none + QueryTimeLag *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none +} + type SqlSettings struct { - DriverName *string `access:"environment,write_restrictable,cloud_restrictable"` - DataSource *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none - DataSourceReplicas []string `access:"environment,write_restrictable,cloud_restrictable"` - DataSourceSearchReplicas []string `access:"environment,write_restrictable,cloud_restrictable"` - MaxIdleConns *int `access:"environment,write_restrictable,cloud_restrictable"` - ConnMaxLifetimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"` - ConnMaxIdleTimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"` - MaxOpenConns *int `access:"environment,write_restrictable,cloud_restrictable"` - Trace *bool `access:"environment,write_restrictable,cloud_restrictable"` - AtRestEncryptKey *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none - QueryTimeout *int `access:"environment,write_restrictable,cloud_restrictable"` - DisableDatabaseSearch *bool `access:"environment,write_restrictable,cloud_restrictable"` + DriverName *string `access:"environment,write_restrictable,cloud_restrictable"` + DataSource *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none + DataSourceReplicas []string `access:"environment,write_restrictable,cloud_restrictable"` + DataSourceSearchReplicas []string 
`access:"environment,write_restrictable,cloud_restrictable"` + MaxIdleConns *int `access:"environment,write_restrictable,cloud_restrictable"` + ConnMaxLifetimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"` + ConnMaxIdleTimeMilliseconds *int `access:"environment,write_restrictable,cloud_restrictable"` + MaxOpenConns *int `access:"environment,write_restrictable,cloud_restrictable"` + Trace *bool `access:"environment,write_restrictable,cloud_restrictable"` + AtRestEncryptKey *string `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none + QueryTimeout *int `access:"environment,write_restrictable,cloud_restrictable"` + DisableDatabaseSearch *bool `access:"environment,write_restrictable,cloud_restrictable"` + ReplicaLagSettings []*ReplicaLagSettings `access:"environment,write_restrictable,cloud_restrictable"` // telemetry: none } func (s *SqlSettings) SetDefaults(isUpdate bool) { @@ -1173,6 +1180,10 @@ func (s *SqlSettings) SetDefaults(isUpdate bool) { if s.DisableDatabaseSearch == nil { s.DisableDatabaseSearch = NewBool(false) } + + if s.ReplicaLagSettings == nil { + s.ReplicaLagSettings = []*ReplicaLagSettings{} + } } type LogSettings struct { @@ -3356,6 +3367,10 @@ func (s *SqlSettings) isValid() *AppError { return NewAppError("Config.IsValid", "model.config.is_valid.sql_max_conn.app_error", nil, "", http.StatusBadRequest) } + if len(s.ReplicaLagSettings) > len(s.DataSourceReplicas) { + return NewAppError("Config.IsValid", "model.config.is_valid.replica_mismatch.app_error", nil, "", http.StatusBadRequest) + } + return nil } diff --git a/store/sqlstore/store.go b/store/sqlstore/store.go index 4229409a70..0340f410a1 100644 --- a/store/sqlstore/store.go +++ b/store/sqlstore/store.go @@ -58,6 +58,8 @@ const ( migrationsDirectionUp migrationDirection = "up" migrationsDirectionDown migrationDirection = "down" + + replicaLagPrefix = "replica-lag" ) const ( @@ -130,17 +132,19 @@ type SqlStoreStores struct { type 
SqlStore struct { // rrCounter and srCounter should be kept first. // See https://github.com/mattermost/mattermost-server/v5/pull/7281 - rrCounter int64 - srCounter int64 - master *gorp.DbMap - Replicas []*gorp.DbMap - searchReplicas []*gorp.DbMap - stores SqlStoreStores - settings *model.SqlSettings - lockedToMaster bool - context context.Context - license *model.License - licenseMutex sync.RWMutex + rrCounter int64 + srCounter int64 + master *gorp.DbMap + Replicas []*gorp.DbMap + searchReplicas []*gorp.DbMap + replicaLagHandles []*dbsql.DB + stores SqlStoreStores + settings *model.SqlSettings + lockedToMaster bool + context context.Context + license *model.License + licenseMutex sync.RWMutex + metrics einterfaces.MetricsInterface } type TraceOnAdapter struct{} @@ -164,6 +168,7 @@ func New(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlS rrCounter: 0, srCounter: 0, settings: &settings, + metrics: metrics, } store.initConnection() @@ -287,8 +292,20 @@ func setupConnection(connType string, dataSource string, settings *model.SqlSett } } - db.SetMaxIdleConns(*settings.MaxIdleConns) - db.SetMaxOpenConns(*settings.MaxOpenConns) + if strings.HasPrefix(connType, replicaLagPrefix) { + // If this is a replica lag connection, we just open one connection. + // + // Arguably, if the query doesn't require a special credential, it does take up + // one extra connection from the replica DB. But falling back to the replica + // data source when the replica lag data source is null implies an ordering constraint + // which makes things brittle and is not a good design. + // If connections are an overhead, it is advised to use a connection pool. 
+ db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + } else { + db.SetMaxIdleConns(*settings.MaxIdleConns) + db.SetMaxOpenConns(*settings.MaxOpenConns) + } db.SetConnMaxLifetime(time.Duration(*settings.ConnMaxLifetimeMilliseconds) * time.Millisecond) db.SetConnMaxIdleTime(time.Duration(*settings.ConnMaxIdleTimeMilliseconds) * time.Millisecond) @@ -337,6 +354,17 @@ func (ss *SqlStore) initConnection() { ss.searchReplicas[i] = setupConnection(fmt.Sprintf("search-replica-%v", i), replica, ss.settings) } } + + if len(ss.settings.ReplicaLagSettings) > 0 { + ss.replicaLagHandles = make([]*dbsql.DB, len(ss.settings.ReplicaLagSettings)) + for i, src := range ss.settings.ReplicaLagSettings { + if src.DataSource == nil { + continue + } + gorpConn := setupConnection(fmt.Sprintf(replicaLagPrefix+"-%d", i), *src.DataSource, ss.settings) + ss.replicaLagHandles[i] = gorpConn.Db + } + } } func (ss *SqlStore) DriverName() string { @@ -410,6 +438,44 @@ func (ss *SqlStore) TotalMasterDbConnections() int { return ss.GetMaster().Db.Stats().OpenConnections } +// ReplicaLagAbs queries all the replica databases to get the absolute replica lag value +// and updates the Prometheus metric with it. +func (ss *SqlStore) ReplicaLagAbs() error { + for i, item := range ss.settings.ReplicaLagSettings { + if item.QueryAbsoluteLag == nil || *item.QueryAbsoluteLag == "" { + continue + } + var binDiff float64 + var node string + err := ss.replicaLagHandles[i].QueryRow(*item.QueryAbsoluteLag).Scan(&node, &binDiff) + if err != nil { + return err + } + // There is no nil check needed here because it's called from the metrics store. + ss.metrics.SetReplicaLagAbsolute(node, binDiff) + } + return nil +} + +// ReplicaLagTime queries all the replica databases to get the time-based replica lag value +// and updates the Prometheus metric with it. 
+func (ss *SqlStore) ReplicaLagTime() error { + for i, item := range ss.settings.ReplicaLagSettings { + if item.QueryTimeLag == nil || *item.QueryTimeLag == "" { + continue + } + var timeDiff float64 + var node string + err := ss.replicaLagHandles[i].QueryRow(*item.QueryTimeLag).Scan(&node, &timeDiff) + if err != nil { + return err + } + // There is no nil check needed here because it's called from the metrics store. + ss.metrics.SetReplicaLagTime(node, timeDiff) + } + return nil +} + func (ss *SqlStore) TotalReadDbConnections() int { if len(ss.settings.DataSourceReplicas) == 0 { return 0 diff --git a/store/sqlstore/store_test.go b/store/sqlstore/store_test.go index 77f9778143..720763b71c 100644 --- a/store/sqlstore/store_test.go +++ b/store/sqlstore/store_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/mattermost/mattermost-server/v5/einterfaces/mocks" "github.com/mattermost/mattermost-server/v5/model" "github.com/mattermost/mattermost-server/v5/store" "github.com/mattermost/mattermost-server/v5/store/searchtest" @@ -246,7 +247,6 @@ func TestGetReplica(t *testing.T) { for _, testCase := range testCases { testCase := testCase t.Run(testCase.Description+" with license", func(t *testing.T) { - t.Parallel() settings := makeSqlSettings(model.DATABASE_DRIVER_POSTGRES) dataSourceReplicas := []string{} @@ -317,7 +317,6 @@ func TestGetReplica(t *testing.T) { }) t.Run(testCase.Description+" without license", func(t *testing.T) { - t.Parallel() settings := makeSqlSettings(model.DATABASE_DRIVER_POSTGRES) dataSourceReplicas := []string{} @@ -556,6 +555,58 @@ func TestVersionString(t *testing.T) { } } +func TestReplicaLagQuery(t *testing.T) { + testDrivers := []string{ + model.DATABASE_DRIVER_POSTGRES, + model.DATABASE_DRIVER_MYSQL, + } + + for _, driver := range testDrivers { + settings := makeSqlSettings(driver) + var query string + var tableName string + // Just any random query which returns a row 
in (string, int) format. + switch driver { + case model.DATABASE_DRIVER_POSTGRES: + query = `SELECT relname, count(relname) FROM pg_class WHERE relname='posts' GROUP BY relname` + tableName = "posts" + case model.DATABASE_DRIVER_MYSQL: + query = `SELECT table_name, count(table_name) FROM information_schema.tables WHERE table_name='Posts' and table_schema=Database() GROUP BY table_name` + tableName = "Posts" + } + + settings.ReplicaLagSettings = []*model.ReplicaLagSettings{{ + DataSource: model.NewString(*settings.DataSource), + QueryAbsoluteLag: model.NewString(query), + QueryTimeLag: model.NewString(query), + }} + + mockMetrics := &mocks.MetricsInterface{} + defer mockMetrics.AssertExpectations(t) + mockMetrics.On("SetReplicaLagAbsolute", tableName, float64(1)) + mockMetrics.On("SetReplicaLagTime", tableName, float64(1)) + + store := &SqlStore{ + rrCounter: 0, + srCounter: 0, + settings: settings, + metrics: mockMetrics, + } + + store.initConnection() + store.stores.post = newSqlPostStore(store, mockMetrics) + err := store.GetMaster().CreateTablesIfNotExists() + require.NoError(t, err) + + defer store.Close() + + err = store.ReplicaLagAbs() + require.NoError(t, err) + err = store.ReplicaLagTime() + require.NoError(t, err) + } +} + func makeSqlSettings(driver string) *model.SqlSettings { switch driver { case model.DATABASE_DRIVER_POSTGRES: diff --git a/store/store.go b/store/store.go index 2c61b2aae6..d702363f8f 100644 --- a/store/store.go +++ b/store/store.go @@ -65,6 +65,8 @@ type Store interface { TotalMasterDbConnections() int TotalReadDbConnections() int TotalSearchDbConnections() int + ReplicaLagTime() error + ReplicaLagAbs() error CheckIntegrity() <-chan model.IntegrityCheckResult SetContext(context context.Context) Context() context.Context diff --git a/store/storetest/mocks/Store.go b/store/storetest/mocks/Store.go index 5ac8b24f46..b3da958e01 100644 --- a/store/storetest/mocks/Store.go +++ b/store/storetest/mocks/Store.go @@ -432,6 +432,34 @@ func (_m 
*Store) RecycleDBConnections(d time.Duration) { _m.Called(d) } +// ReplicaLagAbs provides a mock function with given fields: +func (_m *Store) ReplicaLagAbs() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ReplicaLagTime provides a mock function with given fields: +func (_m *Store) ReplicaLagTime() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Role provides a mock function with given fields: func (_m *Store) Role() store.RoleStore { ret := _m.Called() diff --git a/store/storetest/store.go b/store/storetest/store.go index 78537a1e69..4d6171e755 100644 --- a/store/storetest/store.go +++ b/store/storetest/store.go @@ -105,6 +105,8 @@ func (s *Store) GetCurrentSchemaVersion() string { return "" } func (s *Store) CheckIntegrity() <-chan model.IntegrityCheckResult { return make(chan model.IntegrityCheckResult) } +func (s *Store) ReplicaLagAbs() error { return nil } +func (s *Store) ReplicaLagTime() error { return nil } func (s *Store) AssertExpectations(t mock.TestingT) bool { return mock.AssertExpectationsForObjects(t,