Alerting: Rewrite range to instant queries if possible (#69976)

This commit is contained in:
Jean-Philippe Quéméner
2023-06-16 19:55:49 +02:00
committed by GitHub
parent a4a16b62c7
commit 934ba1aaa1
9 changed files with 277 additions and 4 deletions

View File

@@ -114,6 +114,7 @@ Experimental features might be changed or removed without prior notice.
| `cloudWatchLogsMonacoEditor` | Enables the Monaco editor for CloudWatch Logs queries |
| `exploreScrollableLogsContainer` | Improves the scrolling behavior of logs in Explore |
| `recordedQueriesMulti` | Enables writing multiple items from a single query within Recorded Queries |
| `alertingLokiRangeToInstant` | Rewrites eligible loki range queries to instant queries |
## Development feature toggles

View File

@@ -101,4 +101,5 @@ export interface FeatureToggles {
cloudWatchLogsMonacoEditor?: boolean;
exploreScrollableLogsContainer?: boolean;
recordedQueriesMulti?: boolean;
alertingLokiRangeToInstant?: boolean;
}

View File

@@ -564,5 +564,12 @@ var (
Stage: FeatureStageExperimental,
Owner: grafanaObservabilityMetricsSquad,
},
{
Name: "alertingLokiRangeToInstant",
Description: "Rewrites eligible loki range queries to instant queries",
Stage: FeatureStageExperimental,
FrontendOnly: false,
Owner: grafanaAlertingSquad,
},
}
)

View File

@@ -82,3 +82,4 @@ sqlDatasourceDatabaseSelection,preview,@grafana/grafana-bi-squad,false,false,fal
cloudWatchLogsMonacoEditor,experimental,@grafana/aws-plugins,false,false,false,true
exploreScrollableLogsContainer,experimental,@grafana/observability-logs,false,false,false,true
recordedQueriesMulti,experimental,@grafana/observability-metrics,false,false,false,false
alertingLokiRangeToInstant,experimental,@grafana/alerting-squad,false,false,false,false
1 Name Stage Owner requiresDevMode RequiresLicense RequiresRestart FrontendOnly
82 cloudWatchLogsMonacoEditor experimental @grafana/aws-plugins false false false true
83 exploreScrollableLogsContainer experimental @grafana/observability-logs false false false true
84 recordedQueriesMulti experimental @grafana/observability-metrics false false false false
85 alertingLokiRangeToInstant experimental @grafana/alerting-squad false false false false

View File

@@ -338,4 +338,8 @@ const (
// FlagRecordedQueriesMulti
// Enables writing multiple items from a single query within Recorded Queries
FlagRecordedQueriesMulti = "recordedQueriesMulti"
// FlagAlertingLokiRangeToInstant
// Rewrites eligible loki range queries to instant queries
FlagAlertingLokiRangeToInstant = "alertingLokiRangeToInstant"
)

View File

@@ -10,6 +10,7 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/search/model"
@@ -539,7 +540,7 @@ func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodel
st.Logger.Error("unable to close rows session", "error", err)
}
}()
lokiRangeToInstantEnabled := st.FeatureToggles.IsEnabled(featuremgmt.FlagAlertingLokiRangeToInstant)
// Deserialize each rule separately in case any of them contain invalid JSON.
for rows.Next() {
rule := new(ngmodels.AlertRule)
@@ -548,6 +549,17 @@ func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodel
st.Logger.Error("Invalid rule found in DB store, ignoring it", "func", "GetAlertRulesForScheduling", "error", err)
continue
}
// This was added to mitigate the high load that could be created by loki range queries.
// In previous versions of Grafana, Loki datasources would default to range queries
// instead of instant queries, sometimes creating unnecessary load. This is only
// done for Grafana Cloud.
if lokiRangeToInstantEnabled && canBeInstant(rule) {
if err := migrateToInstant(rule); err != nil {
st.Logger.Error("Could not migrate rule from range to instant query", "rule", rule.UID, "err", err)
} else {
st.Logger.Info("Migrated rule from range to instant query", "rule", rule.UID)
}
}
rules = append(rules, rule)
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/folder/folderimpl"
"github.com/grafana/grafana/pkg/services/ngalert/testutil"
@@ -324,9 +325,10 @@ func TestIntegration_GetAlertRulesForScheduling(t *testing.T) {
sqlStore := db.InitTestDB(t)
store := &DBstore{
SQLStore: sqlStore,
Cfg: cfg.UnifiedAlerting,
FolderService: setupFolderService(t, sqlStore, cfg),
SQLStore: sqlStore,
Cfg: cfg.UnifiedAlerting,
FolderService: setupFolderService(t, sqlStore, cfg),
FeatureToggles: featuremgmt.WithFeatures(),
}
generator := models.AlertRuleGen(withIntervalMatching(store.Cfg.BaseInterval), models.WithUniqueID(), models.WithUniqueOrgID())

View File

@@ -0,0 +1,63 @@
package store
import (
"encoding/json"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
const (
grafanaCloudLogs = "grafanacloud-logs"
grafanaCloudUsageInsights = "grafanacloud-usage-insights"
grafanaCloudStateHistory = "grafanacloud-loki-alert-state-history"
)
func canBeInstant(r *models.AlertRule) bool {
if len(r.Data) < 2 {
return false
}
// First query part should be range query.
if r.Data[0].QueryType != "range" {
return false
}
// First query part should go to cloud logs or insights.
if r.Data[0].DatasourceUID != grafanaCloudLogs &&
r.Data[0].DatasourceUID != grafanaCloudUsageInsights &&
r.Data[0].DatasourceUID != grafanaCloudStateHistory {
return false
}
// Second query part should be and expression, '-100' is the legacy way to define it.
if r.Data[1].DatasourceUID != "__expr__" && r.Data[1].DatasourceUID != "-100" {
return false
}
exprRaw := make(map[string]interface{})
if err := json.Unmarshal(r.Data[1].Model, &exprRaw); err != nil {
return false
}
// Second query part should be "last()"
if val, ok := exprRaw["reducer"].(string); !ok || val != "last" {
return false
}
// Second query part should use first query part as expression.
if ref, ok := exprRaw["expression"].(string); !ok || ref != r.Data[0].RefID {
return false
}
return true
}
// migrateToInstant will move a range-query to an instant query. This should only
// be used for loki.
func migrateToInstant(r *models.AlertRule) error {
modelRaw := make(map[string]interface{})
if err := json.Unmarshal(r.Data[0].Model, &modelRaw); err != nil {
return err
}
modelRaw["queryType"] = "instant"
model, err := json.Marshal(modelRaw)
if err != nil {
return err
}
r.Data[0].Model = model
r.Data[0].QueryType = "instant"
return nil
}

View File

@@ -0,0 +1,182 @@
package store
import (
"encoding/json"
"testing"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/stretchr/testify/require"
)
func TestCanBeInstant(t *testing.T) {
tcs := []struct {
name string
expected bool
rule *models.AlertRule
}{
{
name: "valid rule that can be migrated from range to instant",
expected: true,
rule: createMigrateableLokiRule(t),
},
{
name: "invalid rule where the data array is too short to be migrateable",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data = []models.AlertQuery{r.Data[0]}
}),
},
{
name: "invalid rule that is not a range query",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[0].QueryType = "something-else"
}),
},
{
name: "invalid rule that does not use a cloud datasource",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[0].DatasourceUID = "something-else"
}),
},
{
name: "invalid rule that has no aggregation as second item",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[1].DatasourceUID = "something-else"
}),
},
{
name: "invalid rule that has not last() as aggregation",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
raw := make(map[string]interface{})
err := json.Unmarshal(r.Data[1].Model, &raw)
require.NoError(t, err)
raw["reducer"] = "avg"
r.Data[1].Model, err = json.Marshal(raw)
require.NoError(t, err)
}),
},
{
name: "invalid rule that has not last() pointing to range query",
expected: false,
rule: createMigrateableLokiRule(t, func(r *models.AlertRule) {
raw := make(map[string]interface{})
err := json.Unmarshal(r.Data[1].Model, &raw)
require.NoError(t, err)
raw["expression"] = "C"
r.Data[1].Model, err = json.Marshal(raw)
require.NoError(t, err)
}),
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expected, canBeInstant(tc.rule))
})
}
}
func TestMigrateLokiQueryToInstant(t *testing.T) {
original := createMigrateableLokiRule(t)
mirgrated := createMigrateableLokiRule(t, func(r *models.AlertRule) {
r.Data[0].QueryType = "instant"
r.Data[0].Model = []byte(`{
"datasource": {
"type": "loki",
"uid": "grafanacloud-logs"
},
"editorMode": "code",
"expr": "1",
"hide": false,
"intervalMs": 1000,
"maxDataPoints": 43200,
"queryType": "instant",
"refId": "A"
}`)
})
require.True(t, canBeInstant(original))
require.NoError(t, migrateToInstant(original))
require.Equal(t, mirgrated.Data[0].QueryType, original.Data[0].QueryType)
originalModel := make(map[string]interface{})
require.NoError(t, json.Unmarshal(original.Data[0].Model, &originalModel))
migratedModel := make(map[string]interface{})
require.NoError(t, json.Unmarshal(mirgrated.Data[0].Model, &migratedModel))
require.Equal(t, migratedModel, originalModel)
require.False(t, canBeInstant(original))
}
func createMigrateableLokiRule(t *testing.T, muts ...func(*models.AlertRule)) *models.AlertRule {
t.Helper()
r := &models.AlertRule{
Data: []models.AlertQuery{
{
RefID: "A",
QueryType: "range",
DatasourceUID: grafanaCloudLogs,
Model: []byte(`{
"datasource": {
"type": "loki",
"uid": "grafanacloud-logs"
},
"editorMode": "code",
"expr": "1",
"hide": false,
"intervalMs": 1000,
"maxDataPoints": 43200,
"queryType": "range",
"refId": "A"
}`),
},
{
RefID: "B",
DatasourceUID: "__expr__",
Model: []byte(`{
"conditions": [
{
"evaluator": {
"params": [],
"type": "gt"
},
"operator": {
"type": "and"
},
"query": {
"params": [
"B"
]
},
"reducer": {
"params": [],
"type": "last"
},
"type": "query"
}
],
"datasource": {
"type": "__expr__",
"uid": "__expr__"
},
"expression": "A",
"hide": false,
"intervalMs": 1000,
"maxDataPoints": 43200,
"reducer": "last",
"refId": "B",
"type": "reduce"
}`),
},
},
}
for _, m := range muts {
m(r)
}
return r
}