From cb43f4b6962fca18655b3ba634adeb4d59dc89df Mon Sep 17 00:00:00 2001 From: Alexander Akhmetov Date: Mon, 27 Jan 2025 18:47:33 +0100 Subject: [PATCH] Alerting: Add compressed protobuf-based alert state storage (#99193) --- Makefile | 1 + .../feature-toggles/index.md | 1 + go.work.sum | 4 + .../src/types/featureToggles.gen.ts | 1 + pkg/services/featuremgmt/registry.go | 7 + pkg/services/featuremgmt/toggles_gen.csv | 1 + pkg/services/featuremgmt/toggles_gen.go | 4 + pkg/services/featuremgmt/toggles_gen.json | 15 + pkg/services/ngalert/ngalert.go | 67 +++- pkg/services/ngalert/ngalert_test.go | 95 +++++ pkg/services/ngalert/state/manager_test.go | 50 +-- .../ngalert/store/instance_database.go | 24 +- .../store/instance_database_bench_test.go | 105 ++++++ .../ngalert/store/instance_database_test.go | 353 ++++++++++++++---- .../store/proto/v1/alert_rule_state.pb.go | 301 +++++++++++++++ .../store/proto/v1/alert_rule_state.proto | 24 ++ .../ngalert/store/proto/v1/buf.gen.yaml | 5 + pkg/services/ngalert/store/proto/v1/buf.yaml | 7 + .../ngalert/store/proto_instance_database.go | 261 +++++++++++++ .../store/proto_instance_database_test.go | 176 +++++++++ pkg/services/ngalert/tests/util.go | 32 +- .../sqlstore/migrations/migrations.go | 2 + .../ualert/alert_rule_state_table.go | 29 ++ .../datasource/azuremonitor/dataquery.cue | 6 +- 24 files changed, 1437 insertions(+), 134 deletions(-) create mode 100644 pkg/services/ngalert/store/instance_database_bench_test.go create mode 100644 pkg/services/ngalert/store/proto/v1/alert_rule_state.pb.go create mode 100644 pkg/services/ngalert/store/proto/v1/alert_rule_state.proto create mode 100644 pkg/services/ngalert/store/proto/v1/buf.gen.yaml create mode 100644 pkg/services/ngalert/store/proto/v1/buf.yaml create mode 100644 pkg/services/ngalert/store/proto_instance_database.go create mode 100644 pkg/services/ngalert/store/proto_instance_database_test.go create mode 100644 
pkg/services/sqlstore/migrations/ualert/alert_rule_state_table.go diff --git a/Makefile b/Makefile index 3df11c69545..2721c2830b8 100644 --- a/Makefile +++ b/Makefile @@ -421,6 +421,7 @@ protobuf: ## Compile protobuf definitions buf generate pkg/plugins/backendplugin/secretsmanagerplugin --template pkg/plugins/backendplugin/secretsmanagerplugin/buf.gen.yaml buf generate pkg/storage/unified/resource --template pkg/storage/unified/resource/buf.gen.yaml buf generate pkg/services/authz/proto/v1 --template pkg/services/authz/proto/v1/buf.gen.yaml + buf generate pkg/services/ngalert/store/proto/v1 --template pkg/services/ngalert/store/proto/v1/buf.gen.yaml .PHONY: clean clean: ## Clean up intermediate build artifacts. diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index 68bb56787e1..3038e1624b3 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -191,6 +191,7 @@ Experimental features might be changed or removed without prior notice. | `tableSharedCrosshair` | Enables shared crosshair in table panel | | `kubernetesFeatureToggles` | Use the kubernetes API for feature toggle management in the frontend | | `newFolderPicker` | Enables the nested folder picker without having nested folders enabled | +| `alertingSaveStateCompressed` | Enables the compressed protobuf-based alert state storage | | `scopeApi` | In-development feature flag for the scope api using the app platform. | | `sqlExpressions` | Enables using SQL and DuckDB functions as Expressions. 
| | `nodeGraphDotLayout` | Changed the layout algorithm for the node graph | diff --git a/go.work.sum b/go.work.sum index 2fe6cfdaa6e..cad88eb884c 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1501,13 +1501,17 @@ github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grafana/alerting v0.0.0-20250115195200-209e052dba64/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU= github.com/grafana/authlib v0.0.0-20250120144156-d6737a7dc8f5/go.mod h1:V63rh3udd7sqXJeaG+nGUmViwVnM/bY6t8U9Tols2GU= +github.com/grafana/authlib v0.0.0-20250120145936-5f0e28e7a87c/go.mod h1:/gYfphsNu9v1qYWXxpv1NSvMEMSwvdf8qb8YlgwIRl8= github.com/grafana/authlib/types v0.0.0-20250120144156-d6737a7dc8f5/go.mod h1:qYjSd1tmJiuVoSICp7Py9/zD54O9uQQA3wuM6Gg4DFM= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/grafana-app-sdk v0.29.0/go.mod h1:XLt308EmK6kvqPlzjUyXxbwZKEk2vur/eiypUNDay5I= +github.com/grafana/grafana/apps/advisor v0.0.0-20250121115006-c1eac9f9973f/go.mod h1:goSDiy3jtC2cp8wjpPZdUHRENcoSUHae1/Px/MDfddA= +github.com/grafana/grafana/pkg/promlib v0.0.7/go.mod h1:rnwJXCA2xRwb7F27NB35iO/JsLL/H/+eVXECk/hrEhQ= github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 
h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU= github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU= diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 683bdb2a59a..04d84ee202e 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -154,6 +154,7 @@ export interface FeatureToggles { jitterAlertRulesWithinGroups?: boolean; onPremToCloudMigrations?: boolean; alertingSaveStatePeriodic?: boolean; + alertingSaveStateCompressed?: boolean; scopeApi?: boolean; promQLScope?: boolean; logQLScope?: boolean; diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index b0ca965e45d..a717b3bb707 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -1027,6 +1027,13 @@ var ( FrontendOnly: false, Owner: grafanaAlertingSquad, }, + { + Name: "alertingSaveStateCompressed", + Description: "Enables the compressed protobuf-based alert state storage", + Stage: FeatureStageExperimental, + FrontendOnly: false, + Owner: grafanaAlertingSquad, + }, { Name: "scopeApi", Description: "In-development feature flag for the scope api using the app platform.", diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 169683aea81..5a23134e827 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -135,6 +135,7 @@ newFolderPicker,experimental,@grafana/grafana-frontend-platform,false,false,true jitterAlertRulesWithinGroups,preview,@grafana/alerting-squad,false,true,false onPremToCloudMigrations,preview,@grafana/grafana-operator-experience-squad,false,false,false alertingSaveStatePeriodic,privatePreview,@grafana/alerting-squad,false,false,false +alertingSaveStateCompressed,experimental,@grafana/alerting-squad,false,false,false 
scopeApi,experimental,@grafana/grafana-app-platform-squad,false,false,false promQLScope,GA,@grafana/oss-big-tent,false,false,false logQLScope,privatePreview,@grafana/observability-logs,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 9a82b4d86a9..6ca1cd23fef 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -551,6 +551,10 @@ const ( // Writes the state periodically to the database, asynchronous to rule evaluation FlagAlertingSaveStatePeriodic = "alertingSaveStatePeriodic" + // FlagAlertingSaveStateCompressed + // Enables the compressed protobuf-based alert state storage + FlagAlertingSaveStateCompressed = "alertingSaveStateCompressed" + // FlagScopeApi // In-development feature flag for the scope api using the app platform. FlagScopeApi = "scopeApi" diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json index 23615673bd0..0ac61ba717c 100644 --- a/pkg/services/featuremgmt/toggles_gen.json +++ b/pkg/services/featuremgmt/toggles_gen.json @@ -339,6 +339,21 @@ "expression": "false" } }, + { + "metadata": { + "name": "alertingSaveStateCompressed", + "resourceVersion": "1737472824047", + "creationTimestamp": "2025-01-17T18:17:20Z", + "annotations": { + "grafana.app/updatedTimestamp": "2025-01-21 15:20:24.047499 +0000 UTC" + } + }, + "spec": { + "description": "Enables the compressed protobuf-based alert state storage", + "stage": "experimental", + "codeowner": "@grafana/alerting-squad" + } + }, { "metadata": { "name": "alertingSaveStatePeriodic", diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index e538a14c9b0..51ad5caffe7 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -144,6 +144,7 @@ type AlertNG struct { dashboardService dashboards.DashboardService Api *api.API httpClientProvider httpclient.Provider + InstanceStore state.InstanceStore 
// Alerting notification services MultiOrgAlertmanager *notifier.MultiOrgAlertmanager @@ -398,11 +399,14 @@ func (ng *AlertNG) init() error { if err != nil { return err } - cfg := state.ManagerCfg{ + + ng.InstanceStore = initInstanceStore(ng.store.SQLStore, ng.Log.New("ngalert.state.instancestore"), ng.FeatureToggles) + + stateManagerCfg := state.ManagerCfg{ Metrics: ng.Metrics.GetStateMetrics(), ExternalURL: appUrl, DisableExecution: !ng.Cfg.UnifiedAlerting.ExecuteAlerts, - InstanceStore: ng.store, + InstanceStore: ng.InstanceStore, Images: ng.ImageService, Clock: clk, Historian: history, @@ -415,13 +419,8 @@ func (ng *AlertNG) init() error { Log: log.New("ngalert.state.manager"), ResolvedRetention: ng.Cfg.UnifiedAlerting.ResolvedAlertRetention, } - logger := log.New("ngalert.state.manager.persist") - statePersister := state.NewSyncStatePersisiter(logger, cfg) - if ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) { - ticker := clock.New().Ticker(ng.Cfg.UnifiedAlerting.StatePeriodicSaveInterval) - statePersister = state.NewAsyncStatePersister(logger, ticker, cfg) - } - stateManager := state.NewManager(cfg, statePersister) + statePersister := initStatePersister(ng.Cfg.UnifiedAlerting, stateManagerCfg, ng.FeatureToggles) + stateManager := state.NewManager(stateManagerCfg, statePersister) scheduler := schedule.NewScheduler(schedCfg, stateManager) // if it is required to include folder title to the alerts, we need to subscribe to changes of alert title @@ -515,6 +514,54 @@ func (ng *AlertNG) init() error { return DeclareFixedRoles(ng.AccesscontrolService, ng.FeatureToggles) } +func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles featuremgmt.FeatureToggles) state.InstanceStore { + var instanceStore state.InstanceStore + + if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) { + logger.Info("Using protobuf-based alert instance store") + instanceStore = store.ProtoInstanceDBStore{ + SQLStore: 
sqlStore, + Logger: logger, + FeatureToggles: featureToggles, + } + + // If FlagAlertingSaveStateCompressed is enabled, ProtoInstanceDBStore is used, + // which functions differently from InstanceDBStore. FlagAlertingSaveStatePeriodic is + // not applicable to ProtoInstanceDBStore, so a warning is logged if it is set. + if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) { + logger.Warn("alertingSaveStatePeriodic is not used when alertingSaveStateCompressed feature flag enabled") + } + } else { + logger.Info("Using simple database alert instance store") + instanceStore = store.InstanceDBStore{ + SQLStore: sqlStore, + Logger: logger, + FeatureToggles: featureToggles, + } + } + + return instanceStore +} + +func initStatePersister(uaCfg setting.UnifiedAlertingSettings, cfg state.ManagerCfg, featureToggles featuremgmt.FeatureToggles) state.StatePersister { + logger := log.New("ngalert.state.manager.persist") + var statePersister state.StatePersister + + if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) { + logger.Info("Using rule state persister") + statePersister = state.NewSyncRuleStatePersisiter(logger, cfg) + } else if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) { + logger.Info("Using periodic state persister") + ticker := clock.New().Ticker(uaCfg.StatePeriodicSaveInterval) + statePersister = state.NewAsyncStatePersister(logger, ticker, cfg) + } else { + logger.Info("Using sync state persister") + statePersister = state.NewSyncStatePersisiter(logger, cfg) + } + + return statePersister +} + func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleStore) { // if full path to the folder is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and // clean up the current state @@ -553,7 +600,7 @@ func (ng *AlertNG) Run(ctx context.Context) error { // Also note that this runs synchronously to ensure 
state is loaded // before rule evaluation begins, hence we use ctx and not subCtx. // - ng.stateManager.Warm(ctx, ng.store, ng.store) + ng.stateManager.Warm(ctx, ng.store, ng.InstanceStore) children.Go(func() error { return ng.schedule.Run(subCtx) diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go index 82542eeba71..2012ff930fc 100644 --- a/pkg/services/ngalert/ngalert_test.go +++ b/pkg/services/ngalert/ngalert_test.go @@ -14,12 +14,16 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/events" + "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/folder" acfakes "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol/fakes" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/grafana/grafana/pkg/services/ngalert/store" "github.com/grafana/grafana/pkg/services/ngalert/tests/fakes" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util" @@ -192,3 +196,94 @@ grafana_alerting_state_history_info{backend="noop"} 0 require.NoError(t, err) }) } + +type mockDB struct { + db.DB +} + +func TestInitInstanceStore(t *testing.T) { + sqlStore := &mockDB{} + logger := log.New() + + tests := []struct { + name string + ft featuremgmt.FeatureToggles + expectedInstanceStoreType interface{} + }{ + { + name: "Compressed flag enabled, no periodic flag", + ft: featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStateCompressed, + ), + expectedInstanceStoreType: store.ProtoInstanceDBStore{}, + }, + { + name: "Compressed flag enabled with periodic flag", + ft: featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStateCompressed, + featuremgmt.FlagAlertingSaveStatePeriodic, + ), + 
expectedInstanceStoreType: store.ProtoInstanceDBStore{}, + }, + { + name: "Compressed flag disabled", + ft: featuremgmt.WithFeatures(), + expectedInstanceStoreType: store.InstanceDBStore{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + instanceStore := initInstanceStore(sqlStore, logger, tt.ft) + assert.IsType(t, tt.expectedInstanceStoreType, instanceStore) + }) + } +} + +func TestInitStatePersister(t *testing.T) { + ua := setting.UnifiedAlertingSettings{ + StatePeriodicSaveInterval: 1 * time.Minute, + } + cfg := state.ManagerCfg{} + + tests := []struct { + name string + ft featuremgmt.FeatureToggles + expectedStatePersisterType state.StatePersister + }{ + { + name: "Compressed flag enabled", + ft: featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStateCompressed, + ), + expectedStatePersisterType: &state.SyncRuleStatePersister{}, + }, + { + name: "Periodic flag enabled", + ft: featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStatePeriodic, + ), + expectedStatePersisterType: &state.AsyncStatePersister{}, + }, + { + name: "No flags enabled", + ft: featuremgmt.WithFeatures(), + expectedStatePersisterType: &state.SyncStatePersister{}, + }, + { + name: "Both flags enabled - compressed takes precedence", + ft: featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStateCompressed, + featuremgmt.FlagAlertingSaveStatePeriodic, + ), + expectedStatePersisterType: &state.SyncRuleStatePersister{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + statePersister := initStatePersister(ua, cfg, tt.ft) + assert.IsType(t, tt.expectedStatePersisterType, statePersister) + }) + } +} diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index 59b11bce96b..8bc43fee306 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -47,7 +47,7 @@ func TestWarmStateCache(t *testing.T) { evaluationTime, err := 
time.Parse("2006-01-02", "2021-03-25") require.NoError(t, err) ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) + ng, dbstore := tests.SetupTestEnv(t, 1) const mainOrgID int64 = 1 rule := tests.CreateTestAlertRule(t, ctx, dbstore, 600, mainOrgID) @@ -216,13 +216,13 @@ func TestWarmStateCache(t *testing.T) { ResultFingerprint: data.Fingerprint(2).String(), }) for _, instance := range instances { - _ = dbstore.SaveAlertInstance(ctx, instance) + _ = ng.InstanceStore.SaveAlertInstance(ctx, instance) } cfg := state.ManagerCfg{ Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), ExternalURL: nil, - InstanceStore: dbstore, + InstanceStore: ng.InstanceStore, Images: &state.NoopImageService{}, Clock: clock.NewMock(), Historian: &state.FakeHistorian{}, @@ -230,7 +230,7 @@ func TestWarmStateCache(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore, dbstore) + st.Warm(ctx, dbstore, ng.InstanceStore) t.Run("instance cache has expected entries", func(t *testing.T) { for _, entry := range expectedEntries { @@ -250,7 +250,7 @@ func TestDashboardAnnotations(t *testing.T) { require.NoError(t, err) ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) + ng, dbstore := tests.SetupTestEnv(t, 1) fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo() historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) @@ -261,7 +261,7 @@ func TestDashboardAnnotations(t *testing.T) { cfg := state.ManagerCfg{ Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), ExternalURL: nil, - InstanceStore: dbstore, + InstanceStore: ng.InstanceStore, Images: &state.NoopImageService{}, Clock: clock.New(), Historian: hist, @@ -277,7 +277,7 @@ func TestDashboardAnnotations(t *testing.T) { "test2": "{{ $labels.instance_label }}", }) - st.Warm(ctx, dbstore, dbstore) + st.Warm(ctx, dbstore, ng.InstanceStore) 
bValue := float64(42) cValue := float64(1) _ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{ @@ -1697,7 +1697,7 @@ func TestStaleResultsHandler(t *testing.T) { interval := time.Minute ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) + ng, dbstore := tests.SetupTestEnv(t, 1) const mainOrgID int64 = 1 rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID) @@ -1751,7 +1751,7 @@ func TestStaleResultsHandler(t *testing.T) { } for _, instance := range instances { - _ = dbstore.SaveAlertInstance(ctx, instance) + _ = ng.InstanceStore.SaveAlertInstance(ctx, instance) } testCases := []struct { @@ -1805,7 +1805,7 @@ func TestStaleResultsHandler(t *testing.T) { cfg := state.ManagerCfg{ Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), ExternalURL: nil, - InstanceStore: dbstore, + InstanceStore: ng.InstanceStore, Images: &state.NoopImageService{}, Clock: clock.New(), Historian: &state.FakeHistorian{}, @@ -1813,7 +1813,7 @@ func TestStaleResultsHandler(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore, dbstore) + st.Warm(ctx, dbstore, ng.InstanceStore) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) // We have loaded the expected number of entries from the db @@ -1978,7 +1978,7 @@ func TestStaleResults(t *testing.T) { func TestDeleteStateByRuleUID(t *testing.T) { interval := time.Minute ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) + ng, dbstore := tests.SetupTestEnv(t, 1) const mainOrgID int64 = 1 rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID) @@ -2009,7 +2009,7 @@ func TestDeleteStateByRuleUID(t *testing.T) { } for _, instance := range instances { - _ = dbstore.SaveAlertInstance(ctx, instance) + _ = ng.InstanceStore.SaveAlertInstance(ctx, instance) } testCases := []struct { @@ -2025,7 +2025,7 @@ func 
TestDeleteStateByRuleUID(t *testing.T) { }{ { desc: "all states/instances are removed from cache and DB", - instanceStore: dbstore, + instanceStore: ng.InstanceStore, expectedStates: []*state.State{ { AlertRuleUID: rule.UID, @@ -2065,7 +2065,7 @@ func TestDeleteStateByRuleUID(t *testing.T) { cfg := state.ManagerCfg{ Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), ExternalURL: nil, - InstanceStore: dbstore, + InstanceStore: ng.InstanceStore, Images: &state.NoopImageService{}, Clock: clk, Historian: &state.FakeHistorian{}, @@ -2073,9 +2073,9 @@ func TestDeleteStateByRuleUID(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore, dbstore) + st.Warm(ctx, dbstore, ng.InstanceStore) q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} - alerts, _ := dbstore.ListAlertInstances(ctx, q) + alerts, _ := ng.InstanceStore.ListAlertInstances(ctx, q) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) // We have loaded the expected number of entries from the db @@ -2107,7 +2107,7 @@ func TestDeleteStateByRuleUID(t *testing.T) { } q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} - alertInstances, _ := dbstore.ListAlertInstances(ctx, q) + alertInstances, _ := ng.InstanceStore.ListAlertInstances(ctx, q) existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID) // The expected number of state entries remains after states are deleted @@ -2120,7 +2120,7 @@ func TestDeleteStateByRuleUID(t *testing.T) { func TestResetStateByRuleUID(t *testing.T) { interval := time.Minute ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) + ng, dbstore := tests.SetupTestEnv(t, 1) const mainOrgID int64 = 1 rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID) @@ -2151,7 +2151,7 @@ func TestResetStateByRuleUID(t *testing.T) { } for _, instance := range instances 
{ - _ = dbstore.SaveAlertInstance(ctx, instance) + _ = ng.InstanceStore.SaveAlertInstance(ctx, instance) } testCases := []struct { @@ -2168,7 +2168,7 @@ func TestResetStateByRuleUID(t *testing.T) { }{ { desc: "all states/instances are removed from cache and DB and saved in historian", - instanceStore: dbstore, + instanceStore: ng.InstanceStore, expectedStates: []*state.State{ { AlertRuleUID: rule.UID, @@ -2206,7 +2206,7 @@ func TestResetStateByRuleUID(t *testing.T) { cfg := state.ManagerCfg{ Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), ExternalURL: nil, - InstanceStore: dbstore, + InstanceStore: ng.InstanceStore, Images: &state.NoopImageService{}, Clock: clk, Historian: fakeHistorian, @@ -2214,9 +2214,9 @@ func TestResetStateByRuleUID(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore, dbstore) + st.Warm(ctx, dbstore, ng.InstanceStore) q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} - alerts, _ := dbstore.ListAlertInstances(ctx, q) + alerts, _ := ng.InstanceStore.ListAlertInstances(ctx, q) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) // We have loaded the expected number of entries from the db @@ -2251,7 +2251,7 @@ func TestResetStateByRuleUID(t *testing.T) { assert.Equal(t, transitions, fakeHistorian.StateTransitions) q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} - alertInstances, _ := dbstore.ListAlertInstances(ctx, q) + alertInstances, _ := ng.InstanceStore.ListAlertInstances(ctx, q) existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID) // The expected number of state entries remains after states are deleted diff --git a/pkg/services/ngalert/store/instance_database.go b/pkg/services/ngalert/store/instance_database.go index bb11d90a464..6201188c9ff 100644 --- a/pkg/services/ngalert/store/instance_database.go +++ 
b/pkg/services/ngalert/store/instance_database.go @@ -9,14 +9,21 @@ import ( "time" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/sqlstore" ) +type InstanceDBStore struct { + SQLStore db.DB + Logger log.Logger + FeatureToggles featuremgmt.FeatureToggles +} + // ListAlertInstances is a handler for retrieving alert instances within specific organisation // based on various filters. -func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) { +func (st InstanceDBStore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) { err = st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { alertInstances := make([]*models.AlertInstance, 0) @@ -51,7 +58,7 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI } // SaveAlertInstance is a handler for saving a new alert instance. 
-func (st DBstore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error { +func (st InstanceDBStore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error { return st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { if err := models.ValidateAlertInstance(alertInstance); err != nil { return err @@ -89,7 +96,7 @@ func (st DBstore) SaveAlertInstance(ctx context.Context, alertInstance models.Al }) } -func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) { +func (st InstanceDBStore) FetchOrgIds(ctx context.Context) ([]int64, error) { orgIds := []int64{} err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { @@ -113,7 +120,7 @@ func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) { } // DeleteAlertInstances deletes instances with the provided keys in a single transaction. -func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error { +func (st InstanceDBStore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error { if len(keys) == 0 { return nil } @@ -212,12 +219,13 @@ func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.Alert } // SaveAlertInstancesForRule is not implemented for instance database store. -func (st DBstore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error { +func (st InstanceDBStore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error { st.Logger.Error("SaveAlertInstancesForRule is not implemented for instance database store.") return errors.New("method SaveAlertInstancesForRule is not implemented for instance database store") } -func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error { +// DeleteAlertInstancesByRule deletes all instances for a given rule. 
+func (st InstanceDBStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error { return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error { _, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ?", key.OrgID, key.UID) return err @@ -230,7 +238,7 @@ func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.Ale // // The batchSize parameter controls how many instances are inserted per batch. Increasing batchSize can improve // performance for large datasets, but can also increase load on the database. -func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error { +func (st InstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error { if len(instances) == 0 { return nil } @@ -267,7 +275,7 @@ func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance }) } -func (st DBstore) insertInstancesBatch(sess *sqlstore.DBSession, batch []models.AlertInstance) error { +func (st InstanceDBStore) insertInstancesBatch(sess *sqlstore.DBSession, batch []models.AlertInstance) error { // If the batch is empty, nothing to insert. 
if len(batch) == 0 { return nil diff --git a/pkg/services/ngalert/store/instance_database_bench_test.go b/pkg/services/ngalert/store/instance_database_bench_test.go new file mode 100644 index 00000000000..739f052d94d --- /dev/null +++ b/pkg/services/ngalert/store/instance_database_bench_test.go @@ -0,0 +1,105 @@ +package store_test + +import ( + "context" + "flag" + "fmt" + "testing" + + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/services/ngalert/tests" +) + +var saveStateCompressed = flag.Bool("save-state-compressed", false, "Save state compressed") + +func BenchmarkSaveAlertInstances(b *testing.B) { + ctx := context.Background() + + opts := []tests.TestEnvOption{} + if *saveStateCompressed { + opts = append(opts, tests.WithFeatureToggles( + featuremgmt.WithFeatures(featuremgmt.FlagAlertingSaveStateCompressed)), + ) + } + + benchmarkRun := func(b *testing.B, instanceCount, labelCount int) { + ng, dbstore := tests.SetupTestEnv(b, baseIntervalSeconds, opts...) + + const mainOrgID int64 = 1 + + alertRule := tests.CreateTestAlertRule(b, ctx, dbstore, 60, mainOrgID) + + // Create some instances to write down and then delete. 
+ instances := make([]models.AlertInstance, 0, instanceCount) + keys := make([]models.AlertInstanceKey, 0, instanceCount) + for i := 0; i < instanceCount; i++ { + labels := models.InstanceLabels{"instance": fmt.Sprintf("instance-%d", i)} + for li := 0; li < labelCount; li++ { + labels[fmt.Sprintf("label-%d", li)] = fmt.Sprintf("value-%d", li) + } + _, labelsHash, _ := labels.StringAndHash() + + instance := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule.OrgID, + RuleUID: alertRule.UID, + LabelsHash: labelsHash, + }, + CurrentState: models.InstanceStateFiring, + CurrentReason: string(models.InstanceStateError), + Labels: labels, + } + instances = append(instances, instance) + keys = append(keys, instance.AlertInstanceKey) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var err error + + if *saveStateCompressed { + err = ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule.GetKeyWithGroup(), instances) + if err != nil { + b.Fatalf("error: %s", err) + } + + // Clean up instances. + b.StopTimer() + err = ng.InstanceStore.DeleteAlertInstancesByRule(ctx, alertRule.GetKeyWithGroup()) + if err != nil { + b.Fatalf("error: %s", err) + } + b.StartTimer() + } else { + for _, instance := range instances { + err = ng.InstanceStore.SaveAlertInstance(ctx, instance) + if err != nil { + b.Fatalf("error: %s", err) + } + } + + // Clean up instances. + b.StopTimer() + err = ng.InstanceStore.DeleteAlertInstances(ctx, keys...) 
+ if err != nil { + b.Fatalf("error: %s", err) + } + b.StartTimer() + } + } + } + + b.Run("100 instances with 10 labels each", func(b *testing.B) { + benchmarkRun(b, 100, 10) + }) + + b.Run("100 instances with 100 labels each", func(b *testing.B) { + benchmarkRun(b, 100, 100) + }) + + b.Run("1000 instances with 10 labels each", func(b *testing.B) { + benchmarkRun(b, 1000, 10) + }) +} diff --git a/pkg/services/ngalert/store/instance_database_test.go b/pkg/services/ngalert/store/instance_database_test.go index 87016b9e50a..cb29fa394e7 100644 --- a/pkg/services/ngalert/store/instance_database_test.go +++ b/pkg/services/ngalert/store/instance_database_test.go @@ -1,57 +1,188 @@ package store_test import ( + "bytes" "context" "fmt" "testing" "time" + "github.com/golang/snappy" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/ngalert/models" + pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1" "github.com/grafana/grafana/pkg/services/ngalert/tests" "github.com/grafana/grafana/pkg/util" ) const baseIntervalSeconds = 10 -func BenchmarkAlertInstanceOperations(b *testing.B) { - b.StopTimer() +func TestIntegration_CompressedAlertRuleStateOperations(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + ctx := context.Background() - _, dbstore := tests.SetupTestEnv(b, baseIntervalSeconds) + ng, dbstore := tests.SetupTestEnv( + t, + baseIntervalSeconds, + tests.WithFeatureToggles( + featuremgmt.WithFeatures(featuremgmt.FlagAlertingSaveStateCompressed), + ), + ) const mainOrgID int64 = 1 - alertRule := tests.CreateTestAlertRule(b, ctx, dbstore, 60, mainOrgID) + alertRule1 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID) + orgID := alertRule1.OrgID + alertRule2 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID) + require.Equal(t, orgID, 
alertRule2.OrgID) - // Create some instances to write down and then delete. - count := 10_003 - instances := make([]models.AlertInstance, 0, count) - keys := make([]models.AlertInstanceKey, 0, count) - for i := 0; i < count; i++ { - labels := models.InstanceLabels{"test": fmt.Sprint(i)} - _, labelsHash, _ := labels.StringAndHash() - instance := models.AlertInstance{ - AlertInstanceKey: models.AlertInstanceKey{ - RuleOrgID: alertRule.OrgID, - RuleUID: alertRule.UID, - LabelsHash: labelsHash, + tests := []struct { + name string + setupInstances func() []models.AlertInstance + listQuery *models.ListAlertInstancesQuery + validate func(t *testing.T, alerts []*models.AlertInstance) + }{ + { + name: "can save and read alert rule state", + setupInstances: func() []models.AlertInstance { + return []models.AlertInstance{ + createAlertInstance(alertRule1.OrgID, alertRule1.UID, "labelsHash1", string(models.InstanceStateError), models.InstanceStateFiring), + } }, - CurrentState: models.InstanceStateFiring, - CurrentReason: string(models.InstanceStateError), - Labels: labels, - } - instances = append(instances, instance) - keys = append(keys, instance.AlertInstanceKey) + listQuery: &models.ListAlertInstancesQuery{ + RuleOrgID: alertRule1.OrgID, + RuleUID: alertRule1.UID, + }, + validate: func(t *testing.T, alerts []*models.AlertInstance) { + require.Len(t, alerts, 1) + require.Equal(t, "labelsHash1", alerts[0].LabelsHash) + }, + }, + { + name: "can save and read alert rule state with multiple instances", + setupInstances: func() []models.AlertInstance { + return []models.AlertInstance{ + createAlertInstance(alertRule1.OrgID, alertRule1.UID, "hash1", "", models.InstanceStateFiring), + createAlertInstance(alertRule1.OrgID, alertRule1.UID, "hash2", "", models.InstanceStateFiring), + } + }, + listQuery: &models.ListAlertInstancesQuery{ + RuleOrgID: alertRule1.OrgID, + RuleUID: alertRule1.UID, + }, + validate: func(t *testing.T, alerts []*models.AlertInstance) { + require.Len(t, 
alerts, 2) + containsHash(t, alerts, "hash1") + containsHash(t, alerts, "hash2") + }, + }, } - b.StartTimer() - for i := 0; i < b.N; i++ { - for _, instance := range instances { - _ = dbstore.SaveAlertInstance(ctx, instance) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + instances := tc.setupInstances() + err := ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule1.GetKeyWithGroup(), instances) + require.NoError(t, err) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, tc.listQuery) + require.NoError(t, err) + tc.validate(t, alerts) + }) + } +} + +func TestIntegration_CompressedAlertRuleStateOperations_NoNormalState(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + ctx := context.Background() + ng, dbstore := tests.SetupTestEnv( + t, + baseIntervalSeconds, + tests.WithFeatureToggles( + featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStateCompressed, + featuremgmt.FlagAlertingNoNormalState, + ), + ), + ) + + const mainOrgID int64 = 1 + + alertRule1 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID) + orgID := alertRule1.OrgID + + tests := []struct { + name string + setupInstances func() []models.AlertInstance + listQuery *models.ListAlertInstancesQuery + validate func(t *testing.T, alerts []*models.AlertInstance) + }{ + { + name: "should ignore Normal state with no reason if feature flag is enabled", + setupInstances: func() []models.AlertInstance { + return []models.AlertInstance{ + createAlertInstance(orgID, util.GenerateShortUID(), util.GenerateShortUID(), "", models.InstanceStateNormal), + createAlertInstance(orgID, util.GenerateShortUID(), "errorHash", "error", models.InstanceStateNormal), + } + }, + listQuery: &models.ListAlertInstancesQuery{ + RuleOrgID: orgID, + }, + validate: func(t *testing.T, alerts []*models.AlertInstance) { + require.Len(t, alerts, 1) + containsHash(t, alerts, "errorHash") + for _, instance := range alerts { + if instance.CurrentState == 
models.InstanceStateNormal && instance.CurrentReason == "" { + require.Fail(t, "List operation expected to return all states except Normal but the result contains Normal states") + } + } + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + instances := tc.setupInstances() + err := ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule1.GetKeyWithGroup(), instances) + require.NoError(t, err) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, tc.listQuery) + require.NoError(t, err) + tc.validate(t, alerts) + }) + } +} + +// containsHash is a helper function to check if an instance with +// a given labels hash exists in the list of alert instances. +func containsHash(t *testing.T, instances []*models.AlertInstance, hash string) { + t.Helper() + + for _, i := range instances { + if i.LabelsHash == hash { + return } - _ = dbstore.DeleteAlertInstances(ctx, keys...) + } + + require.Fail(t, fmt.Sprintf("%v does not contain an instance with hash %s", instances, hash)) +} + +func createAlertInstance(orgID int64, ruleUID, labelsHash, reason string, state models.InstanceStateType) models.AlertInstance { + return models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: orgID, + RuleUID: ruleUID, + LabelsHash: labelsHash, + }, + CurrentState: state, + CurrentReason: reason, + Labels: models.InstanceLabels{"label1": "value1"}, } } @@ -60,20 +191,10 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { t.Skip("skipping integration test") } ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds) + ng, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds) const mainOrgID int64 = 1 - containsHash := func(t *testing.T, instances []*models.AlertInstance, hash string) { - t.Helper() - for _, i := range instances { - if i.LabelsHash == hash { - return - } - } - require.Fail(t, "%v does not contain an instance with hash %s", instances, hash) - } - alertRule1 := 
tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID) orgID := alertRule1.OrgID @@ -99,14 +220,14 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { CurrentReason: string(models.InstanceStateError), Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, instance) + err := ng.InstanceStore.SaveAlertInstance(ctx, instance) require.NoError(t, err) listCmd := &models.ListAlertInstancesQuery{ RuleOrgID: instance.RuleOrgID, RuleUID: instance.RuleUID, } - alerts, err := dbstore.ListAlertInstances(ctx, listCmd) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listCmd) require.NoError(t, err) require.Len(t, alerts, 1) @@ -128,7 +249,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { CurrentState: models.InstanceStateNormal, Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, instance) + err := ng.InstanceStore.SaveAlertInstance(ctx, instance) require.NoError(t, err) listCmd := &models.ListAlertInstancesQuery{ @@ -136,7 +257,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { RuleUID: instance.RuleUID, } - alerts, err := dbstore.ListAlertInstances(ctx, listCmd) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listCmd) require.NoError(t, err) require.Len(t, alerts, 1) @@ -158,7 +279,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, instance1) + err := ng.InstanceStore.SaveAlertInstance(ctx, instance1) require.NoError(t, err) labels = models.InstanceLabels{"test": "testValue2"} @@ -172,7 +293,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { CurrentState: models.InstanceStateFiring, Labels: labels, } - err = dbstore.SaveAlertInstance(ctx, instance2) + err = ng.InstanceStore.SaveAlertInstance(ctx, instance2) require.NoError(t, err) listQuery := &models.ListAlertInstancesQuery{ @@ -180,7 +301,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { RuleUID: instance1.RuleUID, } - alerts, err := 
dbstore.ListAlertInstances(ctx, listQuery) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery) require.NoError(t, err) require.Len(t, alerts, 2) @@ -191,13 +312,21 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { RuleOrgID: orgID, } - alerts, err := dbstore.ListAlertInstances(ctx, listQuery) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery) require.NoError(t, err) require.Len(t, alerts, 4) }) t.Run("should ignore Normal state with no reason if feature flag is enabled", func(t *testing.T) { + ng, _ := tests.SetupTestEnv( + t, + baseIntervalSeconds, + tests.WithFeatureToggles( + featuremgmt.WithFeatures(featuremgmt.FlagAlertingNoNormalState), + ), + ) + labels := models.InstanceLabels{"test": util.GenerateShortUID()} instance1 := models.AlertInstance{ AlertInstanceKey: models.AlertInstanceKey{ @@ -219,27 +348,16 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { CurrentReason: models.StateReasonError, Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, instance1) + err := ng.InstanceStore.SaveAlertInstance(ctx, instance1) require.NoError(t, err) - err = dbstore.SaveAlertInstance(ctx, instance2) + err = ng.InstanceStore.SaveAlertInstance(ctx, instance2) require.NoError(t, err) listQuery := &models.ListAlertInstancesQuery{ RuleOrgID: orgID, } - alerts, err := dbstore.ListAlertInstances(ctx, listQuery) - require.NoError(t, err) - - containsHash(t, alerts, instance1.LabelsHash) - - f := dbstore.FeatureToggles - dbstore.FeatureToggles = featuremgmt.WithFeatures(featuremgmt.FlagAlertingNoNormalState) - t.Cleanup(func() { - dbstore.FeatureToggles = f - }) - - alerts, err = dbstore.ListAlertInstances(ctx, listQuery) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery) require.NoError(t, err) containsHash(t, alerts, instance2.LabelsHash) @@ -264,7 +382,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, instance1) + err := 
ng.InstanceStore.SaveAlertInstance(ctx, instance1) require.NoError(t, err) instance2 := models.AlertInstance{ @@ -276,7 +394,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { CurrentState: models.InstanceStateNormal, Labels: instance1.Labels, } - err = dbstore.SaveAlertInstance(ctx, instance2) + err = ng.InstanceStore.SaveAlertInstance(ctx, instance2) require.NoError(t, err) listQuery := &models.ListAlertInstancesQuery{ @@ -284,7 +402,7 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { RuleUID: alertRule4.UID, } - alerts, err := dbstore.ListAlertInstances(ctx, listQuery) + alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery) require.NoError(t, err) require.Len(t, alerts, 1) @@ -300,7 +418,7 @@ func TestIntegrationFullSync(t *testing.T) { batchSize := 1 ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds) + ng, _ := tests.SetupTestEnv(t, baseIntervalSeconds) orgID := int64(1) @@ -312,10 +430,10 @@ func TestIntegrationFullSync(t *testing.T) { } t.Run("Should do a proper full sync", func(t *testing.T) { - err := dbstore.FullSync(ctx, instances, batchSize) + err := ng.InstanceStore.FullSync(ctx, instances, batchSize) require.NoError(t, err) - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -335,10 +453,10 @@ func TestIntegrationFullSync(t *testing.T) { }) t.Run("Should remove non existing entries on sync", func(t *testing.T) { - err := dbstore.FullSync(ctx, instances[1:], batchSize) + err := ng.InstanceStore.FullSync(ctx, instances[1:], batchSize) require.NoError(t, err) - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -352,10 +470,10 @@ func TestIntegrationFullSync(t 
*testing.T) { t.Run("Should add new entries on sync", func(t *testing.T) { newRuleUID := "y" - err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize) + err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize) require.NoError(t, err) - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -377,10 +495,10 @@ func TestIntegrationFullSync(t *testing.T) { t.Run("Should save all instances when batch size is bigger than 1", func(t *testing.T) { batchSize = 2 newRuleUID := "y" - err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize) + err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize) require.NoError(t, err) - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -405,16 +523,16 @@ func TestIntegrationFullSync(t *testing.T) { generateTestAlertInstance(orgID, "preexisting-1"), generateTestAlertInstance(orgID, "preexisting-2"), } - err := dbstore.FullSync(ctx, initialInstances, 5) + err := ng.InstanceStore.FullSync(ctx, initialInstances, 5) require.NoError(t, err) // Now call FullSync with no instances. According to the code, this should return nil // and should not delete anything in the table. - err = dbstore.FullSync(ctx, []models.AlertInstance{}, 5) + err = ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 5) require.NoError(t, err) // Check that the previously inserted instances are still present. 
- res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -441,11 +559,11 @@ func TestIntegrationFullSync(t *testing.T) { // Make the invalid instance actually invalid invalidInstance.AlertInstanceKey.RuleUID = "" - err := dbstore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2) + err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2) require.NoError(t, err) // Only the valid instance should be saved. - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -460,10 +578,10 @@ func TestIntegrationFullSync(t *testing.T) { generateTestAlertInstance(orgID, "batch-test2"), } - err := dbstore.FullSync(ctx, smallSet, 100) + err := ng.InstanceStore.FullSync(ctx, smallSet, 100) require.NoError(t, err) - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -483,7 +601,7 @@ func TestIntegrationFullSync(t *testing.T) { t.Run("Should handle a large set of instances with a moderate batchSize", func(t *testing.T) { // Clear everything first. 
- err := dbstore.FullSync(ctx, []models.AlertInstance{}, 1) + err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 1) require.NoError(t, err) largeCount := 300 @@ -492,10 +610,10 @@ func TestIntegrationFullSync(t *testing.T) { largeSet[i] = generateTestAlertInstance(orgID, fmt.Sprintf("large-%d", i)) } - err = dbstore.FullSync(ctx, largeSet, 50) + err = ng.InstanceStore.FullSync(ctx, largeSet, 50) require.NoError(t, err) - res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ + res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{ RuleOrgID: orgID, }) require.NoError(t, err) @@ -503,6 +621,77 @@ func TestIntegrationFullSync(t *testing.T) { }) } +func TestIntegration_ProtoInstanceDBStore_VerifyCompressedData(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + ctx := context.Background() + ng, dbstore := tests.SetupTestEnv( + t, + baseIntervalSeconds, + tests.WithFeatureToggles( + featuremgmt.WithFeatures( + featuremgmt.FlagAlertingSaveStateCompressed, + ), + ), + ) + + alertRule := tests.CreateTestAlertRule(t, ctx, dbstore, 60, 1) + + labelsHash := "hash1" + reason := "reason" + state := models.InstanceStateFiring + instances := []models.AlertInstance{ + createAlertInstance(alertRule.OrgID, alertRule.UID, labelsHash, reason, state), + } + + err := ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule.GetKeyWithGroup(), instances) + require.NoError(t, err) + + // Query raw data from the database + type compressedRow struct { + OrgID int64 `xorm:"org_id"` + RuleUID string `xorm:"rule_uid"` + Data []byte `xorm:"data"` + } + var rawData compressedRow + err = dbstore.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { + _, err := sess.SQL("SELECT * FROM alert_rule_state").Get(&rawData) + return err + }) + require.NoError(t, err) + + // Decompress and compare + require.NotNil(t, rawData) + decompressedInstances, err := decompressAlertInstances(rawData.Data) + 
require.NoError(t, err) + + require.Len(t, decompressedInstances, 1) + require.Equal(t, instances[0].LabelsHash, decompressedInstances[0].LabelsHash) + require.Equal(t, string(instances[0].CurrentState), decompressedInstances[0].CurrentState) + require.Equal(t, instances[0].CurrentReason, decompressedInstances[0].CurrentReason) +} + +func decompressAlertInstances(compressed []byte) ([]*pb.AlertInstance, error) { + if len(compressed) == 0 { + return nil, nil + } + + reader := snappy.NewReader(bytes.NewReader(compressed)) + var b bytes.Buffer + if _, err := b.ReadFrom(reader); err != nil { + return nil, fmt.Errorf("failed to read compressed data: %w", err) + } + + var instances pb.AlertInstances + if err := proto.Unmarshal(b.Bytes(), &instances); err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + return instances.Instances, nil +} + func generateTestAlertInstance(orgID int64, ruleID string) models.AlertInstance { return models.AlertInstance{ AlertInstanceKey: models.AlertInstanceKey{ diff --git a/pkg/services/ngalert/store/proto/v1/alert_rule_state.pb.go b/pkg/services/ngalert/store/proto/v1/alert_rule_state.pb.go new file mode 100644 index 00000000000..e07a4bbdf04 --- /dev/null +++ b/pkg/services/ngalert/store/proto/v1/alert_rule_state.pb.go @@ -0,0 +1,301 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.1 +// protoc (unknown) +// source: alert_rule_state.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type AlertInstance struct { + state protoimpl.MessageState `protogen:"open.v1"` + LabelsHash string `protobuf:"bytes,1,opt,name=labels_hash,json=labelsHash,proto3" json:"labels_hash,omitempty"` + Labels map[string]string `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + CurrentState string `protobuf:"bytes,3,opt,name=current_state,json=currentState,proto3" json:"current_state,omitempty"` + CurrentReason string `protobuf:"bytes,4,opt,name=current_reason,json=currentReason,proto3" json:"current_reason,omitempty"` + CurrentStateSince *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=current_state_since,json=currentStateSince,proto3" json:"current_state_since,omitempty"` + CurrentStateEnd *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=current_state_end,json=currentStateEnd,proto3" json:"current_state_end,omitempty"` + LastEvalTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=last_eval_time,json=lastEvalTime,proto3" json:"last_eval_time,omitempty"` + LastSentAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=last_sent_at,json=lastSentAt,proto3" json:"last_sent_at,omitempty"` + ResolvedAt *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=resolved_at,json=resolvedAt,proto3" json:"resolved_at,omitempty"` + ResultFingerprint string `protobuf:"bytes,10,opt,name=result_fingerprint,json=resultFingerprint,proto3" json:"result_fingerprint,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AlertInstance) Reset() { + *x = AlertInstance{} + mi := &file_alert_rule_state_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AlertInstance) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AlertInstance) ProtoMessage() {} + +func (x *AlertInstance) ProtoReflect() protoreflect.Message { + 
mi := &file_alert_rule_state_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AlertInstance.ProtoReflect.Descriptor instead. +func (*AlertInstance) Descriptor() ([]byte, []int) { + return file_alert_rule_state_proto_rawDescGZIP(), []int{0} +} + +func (x *AlertInstance) GetLabelsHash() string { + if x != nil { + return x.LabelsHash + } + return "" +} + +func (x *AlertInstance) GetLabels() map[string]string { + if x != nil { + return x.Labels + } + return nil +} + +func (x *AlertInstance) GetCurrentState() string { + if x != nil { + return x.CurrentState + } + return "" +} + +func (x *AlertInstance) GetCurrentReason() string { + if x != nil { + return x.CurrentReason + } + return "" +} + +func (x *AlertInstance) GetCurrentStateSince() *timestamppb.Timestamp { + if x != nil { + return x.CurrentStateSince + } + return nil +} + +func (x *AlertInstance) GetCurrentStateEnd() *timestamppb.Timestamp { + if x != nil { + return x.CurrentStateEnd + } + return nil +} + +func (x *AlertInstance) GetLastEvalTime() *timestamppb.Timestamp { + if x != nil { + return x.LastEvalTime + } + return nil +} + +func (x *AlertInstance) GetLastSentAt() *timestamppb.Timestamp { + if x != nil { + return x.LastSentAt + } + return nil +} + +func (x *AlertInstance) GetResolvedAt() *timestamppb.Timestamp { + if x != nil { + return x.ResolvedAt + } + return nil +} + +func (x *AlertInstance) GetResultFingerprint() string { + if x != nil { + return x.ResultFingerprint + } + return "" +} + +type AlertInstances struct { + state protoimpl.MessageState `protogen:"open.v1"` + Instances []*AlertInstance `protobuf:"bytes,1,rep,name=instances,proto3" json:"instances,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AlertInstances) Reset() { + *x = AlertInstances{} + mi := 
&file_alert_rule_state_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AlertInstances) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AlertInstances) ProtoMessage() {} + +func (x *AlertInstances) ProtoReflect() protoreflect.Message { + mi := &file_alert_rule_state_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AlertInstances.ProtoReflect.Descriptor instead. +func (*AlertInstances) Descriptor() ([]byte, []int) { + return file_alert_rule_state_proto_rawDescGZIP(), []int{1} +} + +func (x *AlertInstances) GetInstances() []*AlertInstance { + if x != nil { + return x.Instances + } + return nil +} + +var File_alert_rule_state_proto protoreflect.FileDescriptor + +var file_alert_rule_state_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, + 0x74, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xfc, 0x04, 0x0a, 0x0d, + 0x41, 0x6c, 0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1f, 0x0a, + 0x0b, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0a, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x48, 0x61, 0x73, 0x68, 0x12, 0x43, + 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, + 0x2e, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x41, 0x6c, 0x65, 0x72, 
0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x2e, + 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x6c, 0x61, 0x62, + 0x65, 0x6c, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, + 0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x75, 0x72, 0x72, + 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x75, 0x72, 0x72, + 0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0d, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, + 0x4a, 0x0a, 0x13, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x5f, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x11, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x46, 0x0a, 0x11, 0x63, + 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x5f, 0x65, 0x6e, 0x64, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x0f, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x45, 0x6e, 0x64, 0x12, 0x40, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x76, 0x61, 0x6c, + 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x45, 0x76, 0x61, + 0x6c, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x6c, 0x61, 0x73, 
0x74, 0x5f, 0x73, 0x65, + 0x6e, 0x74, 0x5f, 0x61, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x6e, + 0x74, 0x41, 0x74, 0x12, 0x3b, 0x0a, 0x0b, 0x72, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x5f, + 0x61, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x72, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x41, 0x74, + 0x12, 0x2d, 0x0a, 0x12, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x66, 0x69, 0x6e, 0x67, 0x65, + 0x72, 0x70, 0x72, 0x69, 0x6e, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x72, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x46, 0x69, 0x6e, 0x67, 0x65, 0x72, 0x70, 0x72, 0x69, 0x6e, 0x74, 0x1a, + 0x39, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4f, 0x0a, 0x0e, 0x41, 0x6c, + 0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x73, 0x12, 0x3d, 0x0a, 0x09, + 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x1f, 0x2e, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, + 0x52, 0x09, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x73, 0x42, 0x40, 0x5a, 0x3e, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, + 0x61, 0x2f, 
0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x2f, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_alert_rule_state_proto_rawDescOnce sync.Once + file_alert_rule_state_proto_rawDescData = file_alert_rule_state_proto_rawDesc +) + +func file_alert_rule_state_proto_rawDescGZIP() []byte { + file_alert_rule_state_proto_rawDescOnce.Do(func() { + file_alert_rule_state_proto_rawDescData = protoimpl.X.CompressGZIP(file_alert_rule_state_proto_rawDescData) + }) + return file_alert_rule_state_proto_rawDescData +} + +var file_alert_rule_state_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_alert_rule_state_proto_goTypes = []any{ + (*AlertInstance)(nil), // 0: ngalert.store.v1.AlertInstance + (*AlertInstances)(nil), // 1: ngalert.store.v1.AlertInstances + nil, // 2: ngalert.store.v1.AlertInstance.LabelsEntry + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp +} +var file_alert_rule_state_proto_depIdxs = []int32{ + 2, // 0: ngalert.store.v1.AlertInstance.labels:type_name -> ngalert.store.v1.AlertInstance.LabelsEntry + 3, // 1: ngalert.store.v1.AlertInstance.current_state_since:type_name -> google.protobuf.Timestamp + 3, // 2: ngalert.store.v1.AlertInstance.current_state_end:type_name -> google.protobuf.Timestamp + 3, // 3: ngalert.store.v1.AlertInstance.last_eval_time:type_name -> google.protobuf.Timestamp + 3, // 4: ngalert.store.v1.AlertInstance.last_sent_at:type_name -> google.protobuf.Timestamp + 3, // 5: ngalert.store.v1.AlertInstance.resolved_at:type_name -> google.protobuf.Timestamp + 0, // 6: ngalert.store.v1.AlertInstances.instances:type_name -> ngalert.store.v1.AlertInstance + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for 
extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_alert_rule_state_proto_init() } +func file_alert_rule_state_proto_init() { + if File_alert_rule_state_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_alert_rule_state_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_alert_rule_state_proto_goTypes, + DependencyIndexes: file_alert_rule_state_proto_depIdxs, + MessageInfos: file_alert_rule_state_proto_msgTypes, + }.Build() + File_alert_rule_state_proto = out.File + file_alert_rule_state_proto_rawDesc = nil + file_alert_rule_state_proto_goTypes = nil + file_alert_rule_state_proto_depIdxs = nil +} diff --git a/pkg/services/ngalert/store/proto/v1/alert_rule_state.proto b/pkg/services/ngalert/store/proto/v1/alert_rule_state.proto new file mode 100644 index 00000000000..3f0ec7de689 --- /dev/null +++ b/pkg/services/ngalert/store/proto/v1/alert_rule_state.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package ngalert.store.v1; + +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1"; + +message AlertInstance { + string labels_hash = 1; + map<string, string> labels = 2; + string current_state = 3; + string current_reason = 4; + google.protobuf.Timestamp current_state_since = 5; + google.protobuf.Timestamp current_state_end = 6; + google.protobuf.Timestamp last_eval_time = 7; + google.protobuf.Timestamp last_sent_at = 8; + google.protobuf.Timestamp resolved_at = 9; + string result_fingerprint = 10; +} + +message AlertInstances { + repeated AlertInstance instances = 1; +} diff --git a/pkg/services/ngalert/store/proto/v1/buf.gen.yaml b/pkg/services/ngalert/store/proto/v1/buf.gen.yaml new file mode 100644 index 00000000000..552d42bc105 ---
/dev/null +++ b/pkg/services/ngalert/store/proto/v1/buf.gen.yaml @@ -0,0 +1,5 @@ +version: v1 +plugins: + - plugin: go + out: pkg/services/ngalert/store/proto/v1 + opt: paths=source_relative diff --git a/pkg/services/ngalert/store/proto/v1/buf.yaml b/pkg/services/ngalert/store/proto/v1/buf.yaml new file mode 100644 index 00000000000..4cb6a23ed85 --- /dev/null +++ b/pkg/services/ngalert/store/proto/v1/buf.yaml @@ -0,0 +1,7 @@ +version: v2 +lint: + use: + - DEFAULT +breaking: + use: + - FILE diff --git a/pkg/services/ngalert/store/proto_instance_database.go b/pkg/services/ngalert/store/proto_instance_database.go new file mode 100644 index 00000000000..e713d43be31 --- /dev/null +++ b/pkg/services/ngalert/store/proto_instance_database.go @@ -0,0 +1,261 @@ +package store + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/golang/snappy" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/ngalert/models" + pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1" +) + +// ProtoInstanceDBStore is a store for alert instances that stores state of a rule as a single +// row in the database with alert instances as a compressed protobuf message. 
+type ProtoInstanceDBStore struct { + SQLStore db.DB + Logger log.Logger + FeatureToggles featuremgmt.FeatureToggles +} + +func (st ProtoInstanceDBStore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) { + logger := st.Logger.FromContext(ctx) + logger.Debug("ListAlertInstances called", "rule_uid", cmd.RuleUID, "org_id", cmd.RuleOrgID) + alertInstances := make([]*models.AlertInstance, 0) + + err = st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { + s := strings.Builder{} + params := make([]any, 0) + + addToQuery := func(stmt string, p ...any) { + s.WriteString(stmt) + params = append(params, p...) + } + + addToQuery("SELECT * FROM alert_rule_state WHERE org_id = ?", cmd.RuleOrgID) + + if cmd.RuleUID != "" { + addToQuery(" AND rule_uid = ?", cmd.RuleUID) + } + + // Execute query to get compressed instances + type compressedRow struct { + OrgID int64 `xorm:"org_id"` + RuleUID string `xorm:"rule_uid"` + Data []byte `xorm:"data"` + } + + rows := make([]compressedRow, 0) + if err := sess.SQL(s.String(), params...).Find(&rows); err != nil { + return fmt.Errorf("failed to query alert_rule_state: %w", err) + } + + for _, row := range rows { + instances, err := decompressAlertInstances(row.Data) + if err != nil { + return fmt.Errorf("failed to decompress alert instances for rule %s: %w", row.RuleUID, err) + } + + // Convert proto instances to model instances + for _, protoInstance := range instances { + modelInstance := alertInstanceProtoToModel(row.RuleUID, row.OrgID, protoInstance) + if modelInstance != nil { + // If FlagAlertingNoNormalState is enabled, we should not return instances with normal state and no reason. 
+ if st.FeatureToggles.IsEnabled(ctx, featuremgmt.FlagAlertingNoNormalState) { + if modelInstance.CurrentState == models.InstanceStateNormal && modelInstance.CurrentReason == "" { + continue + } + } + alertInstances = append(alertInstances, modelInstance) + } + } + } + + return nil + }) + + logger.Debug("ListAlertInstances completed", "instances", len(alertInstances)) + + return alertInstances, err +} + +func (st ProtoInstanceDBStore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error { + st.Logger.Error("SaveAlertInstance called and not implemented") + return errors.New("save alert instance is not implemented for proto instance database store") +} + +func (st ProtoInstanceDBStore) FetchOrgIds(ctx context.Context) ([]int64, error) { + orgIds := []int64{} + + err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { + s := strings.Builder{} + params := make([]any, 0) + + addToQuery := func(stmt string, p ...any) { + s.WriteString(stmt) + params = append(params, p...) 
+ } + + addToQuery("SELECT DISTINCT org_id FROM alert_rule_state") + + if err := sess.SQL(s.String(), params...).Find(&orgIds); err != nil { + return err + } + return nil + }) + + return orgIds, err +} + +func (st ProtoInstanceDBStore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error { + logger := st.Logger.FromContext(ctx) + logger.Error("DeleteAlertInstances called and not implemented") + return errors.New("delete alert instances is not implemented for proto instance database store") +} + +func (st ProtoInstanceDBStore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error { + logger := st.Logger.FromContext(ctx) + logger.Debug("SaveAlertInstancesForRule called", "rule_uid", key.UID, "org_id", key.OrgID, "instances", len(instances)) + + alert_instances_proto := make([]*pb.AlertInstance, len(instances)) + + for i, instance := range instances { + alert_instances_proto[i] = alertInstanceModelToProto(instance) + } + + compressedAlertInstances, err := compressAlertInstances(alert_instances_proto) + if err != nil { + return fmt.Errorf("failed to compress alert instances: %w", err) + } + + return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error { + params := []any{key.OrgID, key.UID, compressedAlertInstances, time.Now()} + + upsertSQL := st.SQLStore.GetDialect().UpsertSQL( + "alert_rule_state", + []string{"org_id", "rule_uid"}, + []string{"org_id", "rule_uid", "data", "updated_at"}, + ) + _, err = sess.SQL(upsertSQL, params...).Query() + + return err + }) +} + +func (st ProtoInstanceDBStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error { + logger := st.Logger.FromContext(ctx) + logger.Debug("DeleteAlertInstancesByRule called", "rule_uid", key.UID, "org_id", key.OrgID) + + return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error { + _, err := sess.Exec("DELETE FROM alert_rule_state 
WHERE org_id = ? AND rule_uid = ?", key.OrgID, key.UID) + return err + }) +} + +func (st ProtoInstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error { + logger := st.Logger.FromContext(ctx) + logger.Error("FullSync called and not implemented") + return errors.New("fullsync is not implemented for proto instance database store") +} + +func alertInstanceModelToProto(modelInstance models.AlertInstance) *pb.AlertInstance { + return &pb.AlertInstance{ + Labels: modelInstance.Labels, + LabelsHash: modelInstance.LabelsHash, + CurrentState: string(modelInstance.CurrentState), + CurrentStateSince: timestamppb.New(modelInstance.CurrentStateSince), + CurrentStateEnd: timestamppb.New(modelInstance.CurrentStateEnd), + CurrentReason: modelInstance.CurrentReason, + LastEvalTime: timestamppb.New(modelInstance.LastEvalTime), + LastSentAt: nullableTimeToTimestamp(modelInstance.LastSentAt), + ResolvedAt: nullableTimeToTimestamp(modelInstance.ResolvedAt), + ResultFingerprint: modelInstance.ResultFingerprint, + } +} + +func compressAlertInstances(instances []*pb.AlertInstance) ([]byte, error) { + mProto, err := proto.Marshal(&pb.AlertInstances{Instances: instances}) + if err != nil { + return nil, fmt.Errorf("failed to marshal protobuf: %w", err) + } + + var b bytes.Buffer + writer := snappy.NewBufferedWriter(&b) + if _, err := writer.Write(mProto); err != nil { + return nil, fmt.Errorf("failed to write compressed data: %w", err) + } + + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("failed to close snappy writer: %w", err) + } + + return b.Bytes(), nil +} + +func alertInstanceProtoToModel(ruleUID string, ruleOrgID int64, protoInstance *pb.AlertInstance) *models.AlertInstance { + if protoInstance == nil { + return nil + } + + return &models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: ruleOrgID, + RuleUID: ruleUID, + LabelsHash: protoInstance.LabelsHash, + }, + Labels: protoInstance.Labels, + 
CurrentState: models.InstanceStateType(protoInstance.CurrentState), + CurrentStateSince: protoInstance.CurrentStateSince.AsTime(), + CurrentStateEnd: protoInstance.CurrentStateEnd.AsTime(), + CurrentReason: protoInstance.CurrentReason, + LastEvalTime: protoInstance.LastEvalTime.AsTime(), + LastSentAt: nullableTimestampToTime(protoInstance.LastSentAt), + ResolvedAt: nullableTimestampToTime(protoInstance.ResolvedAt), + ResultFingerprint: protoInstance.ResultFingerprint, + } +} + +func decompressAlertInstances(compressed []byte) ([]*pb.AlertInstance, error) { + if len(compressed) == 0 { + return nil, nil + } + + reader := snappy.NewReader(bytes.NewReader(compressed)) + var b bytes.Buffer + if _, err := b.ReadFrom(reader); err != nil { + return nil, fmt.Errorf("failed to read compressed data: %w", err) + } + + var instances pb.AlertInstances + if err := proto.Unmarshal(b.Bytes(), &instances); err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + return instances.Instances, nil +} + +// nullableTimeToTimestamp returns nil if t is nil; otherwise it converts t to a timestamppb.Timestamp. +func nullableTimeToTimestamp(t *time.Time) *timestamppb.Timestamp { + if t == nil { + return nil + } + return timestamppb.New(*t) +} + +// nullableTimestampToTime returns nil if ts is nil; otherwise it converts ts to a time.Time.
+func nullableTimestampToTime(ts *timestamppb.Timestamp) *time.Time { + if ts == nil { + return nil + } + t := ts.AsTime() + return &t +} diff --git a/pkg/services/ngalert/store/proto_instance_database_test.go b/pkg/services/ngalert/store/proto_instance_database_test.go new file mode 100644 index 00000000000..0993a68da72 --- /dev/null +++ b/pkg/services/ngalert/store/proto_instance_database_test.go @@ -0,0 +1,176 @@ +package store + +import ( + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/grafana/grafana/pkg/services/ngalert/models" + pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1" +) + +func TestAlertInstanceModelToProto(t *testing.T) { + currentStateSince := time.Now() + currentStateEnd := currentStateSince.Add(time.Minute) + lastEvalTime := currentStateSince.Add(-time.Minute) + lastSentAt := currentStateSince.Add(-2 * time.Minute) + resolvedAt := currentStateSince.Add(-3 * time.Minute) + + tests := []struct { + name string + input models.AlertInstance + expected *pb.AlertInstance + }{ + { + name: "valid instance", + input: models.AlertInstance{ + Labels: map[string]string{"key": "value"}, + AlertInstanceKey: models.AlertInstanceKey{ + RuleUID: "rule-uid-1", + RuleOrgID: 1, + LabelsHash: "hash123", + }, + CurrentState: models.InstanceStateFiring, + CurrentStateSince: currentStateSince, + CurrentStateEnd: currentStateEnd, + CurrentReason: "Some reason", + LastEvalTime: lastEvalTime, + LastSentAt: &lastSentAt, + ResolvedAt: &resolvedAt, + ResultFingerprint: "fingerprint", + }, + expected: &pb.AlertInstance{ + Labels: map[string]string{"key": "value"}, + LabelsHash: "hash123", + CurrentState: "Alerting", + CurrentStateSince: timestamppb.New(currentStateSince), + CurrentStateEnd: timestamppb.New(currentStateEnd), + CurrentReason: "Some reason", + LastEvalTime: timestamppb.New(lastEvalTime), + LastSentAt: toProtoTimestampPtr(&lastSentAt), + 
ResolvedAt: toProtoTimestampPtr(&resolvedAt), + ResultFingerprint: "fingerprint", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := alertInstanceModelToProto(tt.input) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestAlertInstanceProtoToModel(t *testing.T) { + currentStateSince := time.Now().UTC() + currentStateEnd := currentStateSince.Add(time.Minute).UTC() + lastEvalTime := currentStateSince.Add(-time.Minute).UTC() + lastSentAt := currentStateSince.Add(-2 * time.Minute).UTC() + resolvedAt := currentStateSince.Add(-3 * time.Minute).UTC() + ruleUID := "rule-uid-1" + orgID := int64(1) + + tests := []struct { + name string + input *pb.AlertInstance + expected *models.AlertInstance + }{ + { + name: "valid instance", + input: &pb.AlertInstance{ + Labels: map[string]string{"key": "value"}, + LabelsHash: "hash123", + CurrentState: "Alerting", + CurrentStateSince: timestamppb.New(currentStateSince), + CurrentStateEnd: timestamppb.New(currentStateEnd), + LastEvalTime: timestamppb.New(lastEvalTime), + LastSentAt: toProtoTimestampPtr(&lastSentAt), + ResolvedAt: toProtoTimestampPtr(&resolvedAt), + ResultFingerprint: "fingerprint", + }, + expected: &models.AlertInstance{ + Labels: map[string]string{"key": "value"}, + AlertInstanceKey: models.AlertInstanceKey{ + RuleUID: ruleUID, + RuleOrgID: orgID, + LabelsHash: "hash123", + }, + CurrentState: models.InstanceStateFiring, + CurrentStateSince: currentStateSince, + CurrentStateEnd: currentStateEnd, + LastEvalTime: lastEvalTime, + LastSentAt: &lastSentAt, + ResolvedAt: &resolvedAt, + ResultFingerprint: "fingerprint", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := alertInstanceProtoToModel(ruleUID, orgID, tt.input) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestModelAlertInstanceMatchesProtobuf(t *testing.T) { + // The AlertInstance protobuf must always contain the same information + // as the model, so that 
it's preserved between the Grafana restarts. + // + // If the AlertInstance model changes, review the protobuf and the test + // and update them accordingly. + t.Run("when AlertInstance model changes", func(t *testing.T) { + modelType := reflect.TypeOf(models.AlertInstance{}) + require.Equal(t, 10, modelType.NumField(), "AlertInstance model has changed, update the protobuf") + }) +} + +func TestCompressAndDecompressAlertInstances(t *testing.T) { + now := time.Now() + + alertInstances := []*pb.AlertInstance{ + { + Labels: map[string]string{"label-1": "value-1"}, + LabelsHash: "hash-1", + CurrentState: "normal", + CurrentStateSince: timestamppb.New(now), + CurrentStateEnd: timestamppb.New(now.Add(time.Hour)), + CurrentReason: "reason-1", + LastEvalTime: timestamppb.New(now.Add(-time.Minute)), + ResolvedAt: timestamppb.New(now.Add(time.Hour * 2)), + ResultFingerprint: "fingerprint-1", + }, + { + Labels: map[string]string{"label-2": "value-2"}, + LabelsHash: "hash-2", + CurrentState: "firing", + CurrentStateSince: timestamppb.New(now), + CurrentReason: "reason-2", + LastEvalTime: timestamppb.New(now.Add(-time.Minute * 2)), + }, + } + + compressedData, err := compressAlertInstances(alertInstances) + require.NoError(t, err) + + decompressedInstances, err := decompressAlertInstances(compressedData) + require.NoError(t, err) + + // Compare the original and decompressed instances + require.Equal(t, len(alertInstances), len(decompressedInstances)) + require.EqualExportedValues(t, alertInstances[0], decompressedInstances[0]) + require.EqualExportedValues(t, alertInstances[1], decompressedInstances[1]) +} + +func toProtoTimestampPtr(tm *time.Time) *timestamppb.Timestamp { + if tm == nil { + return nil + } + + return timestamppb.New(*tm) +} diff --git a/pkg/services/ngalert/tests/util.go b/pkg/services/ngalert/tests/util.go index 6188d6291a9..8ea4fef13c3 100644 --- a/pkg/services/ngalert/tests/util.go +++ b/pkg/services/ngalert/tests/util.go @@ -41,10 +41,30 @@ import ( 
"github.com/grafana/grafana/pkg/util" ) +type TestEnvOptions struct { + featureToggles featuremgmt.FeatureToggles +} + +type TestEnvOption func(*TestEnvOptions) + +func WithFeatureToggles(toggles featuremgmt.FeatureToggles) TestEnvOption { + return func(opts *TestEnvOptions) { + opts.featureToggles = toggles + } +} + // SetupTestEnv initializes a store to used by the tests. -func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG, *store.DBstore) { +func SetupTestEnv(tb testing.TB, baseInterval time.Duration, opts ...TestEnvOption) (*ngalert.AlertNG, *store.DBstore) { tb.Helper() + options := TestEnvOptions{ + featureToggles: featuremgmt.WithFeatures(), + } + + for _, opt := range opts { + opt(&options) + } + cfg := setting.NewCfg() cfg.UnifiedAlerting = setting.UnifiedAlertingSettings{ BaseInterval: setting.SchedulerBaseInterval, @@ -64,18 +84,18 @@ func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG, bus := bus.ProvideBus(tracer) folderStore := folderimpl.ProvideDashboardFolderStore(sqlStore) dashboardService, dashboardStore := testutil.SetupDashboardService(tb, sqlStore, folderStore, cfg) - features := featuremgmt.WithFeatures() - folderService := testutil.SetupFolderService(tb, cfg, sqlStore, dashboardStore, folderStore, bus, features, ac) - ruleStore, err := store.ProvideDBStore(cfg, featuremgmt.WithFeatures(), sqlStore, folderService, &dashboards.FakeDashboardService{}, ac, bus) + folderService := testutil.SetupFolderService(tb, cfg, sqlStore, dashboardStore, folderStore, bus, options.featureToggles, ac) + ruleStore, err := store.ProvideDBStore(cfg, options.featureToggles, sqlStore, folderService, &dashboards.FakeDashboardService{}, ac, bus) require.NoError(tb, err) ng, err := ngalert.ProvideService( - cfg, features, nil, nil, routing.NewRouteRegister(), sqlStore, kvstore.NewFakeKVStore(), nil, nil, quotatest.New(false, nil), + cfg, options.featureToggles, nil, nil, routing.NewRouteRegister(), sqlStore, 
kvstore.NewFakeKVStore(), nil, nil, quotatest.New(false, nil), secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac, annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore, httpclient.NewProvider(), ngalertfakes.NewFakeReceiverPermissionsService(), ) require.NoError(tb, err) + return ng, &store.DBstore{ - FeatureToggles: features, + FeatureToggles: options.featureToggles, SQLStore: ng.SQLStore, Cfg: setting.UnifiedAlertingSettings{ BaseInterval: baseInterval * time.Second, diff --git a/pkg/services/sqlstore/migrations/migrations.go b/pkg/services/sqlstore/migrations/migrations.go index 317eb39a13d..40704dac838 100644 --- a/pkg/services/sqlstore/migrations/migrations.go +++ b/pkg/services/sqlstore/migrations/migrations.go @@ -143,4 +143,6 @@ func (oss *OSSMigrations) AddMigration(mg *Migrator) { accesscontrol.AddReceiverCreateScopeMigration(mg) ualert.AddAlertRuleUpdatedByMigration(mg) + + ualert.AddAlertRuleStateTable(mg) } diff --git a/pkg/services/sqlstore/migrations/ualert/alert_rule_state_table.go b/pkg/services/sqlstore/migrations/ualert/alert_rule_state_table.go new file mode 100644 index 00000000000..bf2a662c063 --- /dev/null +++ b/pkg/services/sqlstore/migrations/ualert/alert_rule_state_table.go @@ -0,0 +1,29 @@ +package ualert + +import "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + +// AddAlertRuleStateTable adds the alert_rule_state table, which stores alert rule state data, and its unique index on (org_id, rule_uid).
+func AddAlertRuleStateTable(mg *migrator.Migrator) { + alertStateTable := migrator.Table{ + Name: "alert_rule_state", + Columns: []*migrator.Column{ + {Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "org_id", Type: migrator.DB_BigInt, Nullable: false}, + {Name: "rule_uid", Type: migrator.DB_NVarchar, Length: UIDMaxLength, Nullable: false}, + {Name: "data", Type: migrator.DB_LongBlob, Nullable: false}, + {Name: "updated_at", Type: migrator.DB_DateTime, Nullable: false}, + }, + Indices: []*migrator.Index{ + {Cols: []string{"org_id", "rule_uid"}, Type: migrator.UniqueIndex}, + }, + } + + mg.AddMigration( + "add alert_rule_state table", + migrator.NewAddTableMigration(alertStateTable), + ) + mg.AddMigration( + "add index to alert_rule_state on org_id and rule_uid columns", + migrator.NewAddIndexMigration(alertStateTable, alertStateTable.Indices[0]), + ) +} diff --git a/public/app/plugins/datasource/azuremonitor/dataquery.cue b/public/app/plugins/datasource/azuremonitor/dataquery.cue index ea1b86ead3e..32447a67a04 100644 --- a/public/app/plugins/datasource/azuremonitor/dataquery.cue +++ b/public/app/plugins/datasource/azuremonitor/dataquery.cue @@ -47,11 +47,11 @@ composableKinds: DataQuery: { // Resource group used in template variable queries resourceGroup?: string // Namespace used in template variable queries - namespace?: string + namespace?: string // Resource used in template variable queries - resource?: string + resource?: string // Region used in template variable queries - region?: string + region?: string // Custom namespace used in template variable queries customNamespace?: string // Azure Monitor query type.