Alerting: Scheduler use rule fingerprint instead of version (#66531)

* implement calculation of fingerprint for ruleWithFolder
* update scheduler to use fingerprint instead of rule's version
Yuri Tseretyan, 2023-04-28 10:42:16 -04:00, committed by GitHub
parent ffd83a027d
commit 9eb10bee1f
5 changed files with 367 additions and 83 deletions
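
The core idea: previously the scheduler decided whether to reset a rule's state by comparing a monotonically increasing Version counter; it now hashes the rule's content together with its folder title and resets only when that fingerprint changes, so saves that do not alter the effective rule definition no longer wipe alert state. A minimal sketch of the two predicates (illustrative only, not the scheduler's actual types):

// Before: any bump of the stored version counter triggered a state reset.
func changedByVersion(current, incoming int64) bool {
	return incoming > current
}

// After: a reset happens only when the hashed rule content differs.
func changedByFingerprint(current, incoming uint64) bool {
	return current != incoming
}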

---------- changed file ----------

@@ -2,9 +2,15 @@ package schedule
 
 import (
 	"context"
+	"encoding/binary"
 	"errors"
+	"fmt"
+	"hash/fnv"
+	"math"
+	"sort"
 	"sync"
 	"time"
+	"unsafe"
 
 	"github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/util"
@@ -62,10 +68,9 @@ func (r *alertRuleInfoRegistry) keyMap() map[models.AlertRuleKey]struct{} {
 	return definitionsIDs
 }
 
-type ruleVersion int64
-
 type ruleVersionAndPauseStatus struct {
-	Version  ruleVersion
-	IsPaused bool
+	Fingerprint fingerprint
+	IsPaused    bool
 }
 
 type alertRuleInfo struct {
@@ -106,20 +111,15 @@ func (a *alertRuleInfo) eval(eval *evaluation) (bool, *evaluation) {
 // update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen.
 func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool {
 	// check if the channel is not empty.
-	msg := lastVersion
 	select {
-	case v := <-a.updateCh:
-		// if it has a version pick the greatest one.
-		if v.Version > msg.Version {
-			msg = v
-		}
+	case <-a.updateCh:
 	case <-a.ctx.Done():
 		return false
 	default:
 	}
 
 	select {
-	case a.updateCh <- msg:
+	case a.updateCh <- lastVersion:
 		return true
 	case <-a.ctx.Done():
 		return false
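
With fingerprints there is no "greatest" value to keep, so update no longer merges a pending message with the new one: it simply drains whatever is queued and sends the latest status. A standalone sketch of this drain-then-send pattern on a capacity-1 channel (illustrative names only; the real method also selects on ctx.Done() because it can race with other senders and a shutdown):

package main

import "fmt"

// offerLatest discards any queued message and enqueues the newest one,
// so a slow consumer only ever sees the most recent value.
func offerLatest(ch chan string, msg string) {
	select {
	case <-ch: // drop a stale message if one is pending
	default: // channel already empty
	}
	ch <- msg // safe with a single producer and capacity 1
}

func main() {
	ch := make(chan string, 1)
	offerLatest(ch, "fingerprint-1")
	offerLatest(ch, "fingerprint-2") // replaces fingerprint-1
	fmt.Println(<-ch)                // prints "fingerprint-2"
}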
@@ -233,3 +233,129 @@ func (r *alertRulesRegistry) getDiff(rules map[models.AlertRuleKey]*models.Alert
 	}
 
 	return result
 }
+
+type fingerprint uint64
+
+func (f fingerprint) String() string {
+	return fmt.Sprintf("%016x", uint64(f))
+}
+
+// fingerprintSeparator used during calculation of fingerprint to separate different fields. Contains a byte sequence that cannot happen in UTF-8 strings.
+var fingerprintSeparator = []byte{255}
+
+type ruleWithFolder struct {
+	rule        *models.AlertRule
+	folderTitle string
+}
+
+// Fingerprint calculates a fingerprint that includes all fields except the rule's Version and Updated timestamp.
+func (r ruleWithFolder) Fingerprint() fingerprint {
+	rule := r.rule
+	sum := fnv.New64()
+
+	writeBytes := func(b []byte) {
+		_, _ = sum.Write(b)
+		_, _ = sum.Write(fingerprintSeparator)
+	}
+	writeString := func(s string) {
+		if len(s) == 0 {
+			writeBytes(nil)
+			return
+		}
+		// TODO fix it when upgraded to Go 1.20:
+		/*
+			writeBytes(unsafe.Slice(unsafe.StringData(s), len(s))) //nolint:gosec
+		*/
+		// avoid allocation when converting string to byte slice
+		writeBytes(*(*[]byte)(unsafe.Pointer(&s))) //nolint:gosec
+	}
+
+	// this temp slice is used to convert ints to bytes.
+	tmp := make([]byte, 8)
+	writeInt := func(u int64) {
+		binary.LittleEndian.PutUint64(tmp, uint64(u))
+		writeBytes(tmp)
+	}
+
+	// allocate a slice that will be used for sorting keys, so we allocate it only once
+	var keys []string
+	maxLen := int(math.Max(math.Max(float64(len(rule.Annotations)), float64(len(rule.Labels))), float64(len(rule.Data))))
+	if maxLen > 0 {
+		keys = make([]string, maxLen)
+	}
+
+	writeLabels := func(lbls map[string]string) {
+		// maps do not guarantee predictable sequence of keys.
+		// Therefore, to make the hash stable, we need to sort keys
+		if len(lbls) == 0 {
+			return
+		}
+		idx := 0
+		for labelName := range lbls {
+			keys[idx] = labelName
+			idx++
+		}
+		sub := keys[:idx]
+		sort.Strings(sub)
+		for _, name := range sub {
+			writeString(name)
+			writeString(lbls[name])
+		}
+	}
+	writeQuery := func() {
+		// The order of queries is not important as they represent an expression tree.
+		// Therefore, the order of elements should not change the hash. Sort by RefID because it is the unique key.
+		for i, q := range rule.Data {
+			keys[i] = q.RefID
+		}
+		sub := keys[:len(rule.Data)]
+		sort.Strings(sub)
+		for _, id := range sub {
+			for _, q := range rule.Data {
+				if q.RefID == id {
+					writeString(q.RefID)
+					writeString(q.DatasourceUID)
+					writeString(q.QueryType)
+					writeInt(int64(q.RelativeTimeRange.From))
+					writeInt(int64(q.RelativeTimeRange.To))
+					writeBytes(q.Model)
+					break
+				}
+			}
+		}
+	}
+
+	// fields that determine the rule state
+	writeString(rule.UID)
+	writeString(rule.Title)
+	writeString(rule.NamespaceUID)
+	writeString(r.folderTitle)
+	writeLabels(rule.Labels)
+	writeString(rule.Condition)
+	writeQuery()
+
+	if rule.IsPaused {
+		writeInt(1)
+	} else {
+		writeInt(0)
+	}
+
+	// fields that do not affect the state.
+	// TODO consider removing fields below from the fingerprint
+	writeInt(rule.ID)
+	writeInt(rule.OrgID)
+	writeInt(rule.IntervalSeconds)
+	writeInt(int64(rule.For))
+	writeLabels(rule.Annotations)
+	if rule.DashboardUID != nil {
+		writeString(*rule.DashboardUID)
+	}
+	if rule.PanelID != nil {
+		writeInt(*rule.PanelID)
+	}
+	writeString(rule.RuleGroup)
+	writeInt(int64(rule.RuleGroupIndex))
+	writeString(string(rule.NoDataState))
+	writeString(string(rule.ExecErrState))
+	return fingerprint(sum.Sum64())
+}
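
Two details of the hashing scheme are worth noting: every field is followed by a 0xFF separator (a byte that cannot appear in valid UTF-8), so adjacent fields cannot collide by shifting bytes between them, and map keys are sorted because Go randomizes map iteration order. A quick standalone illustration of the collision the separator prevents, using the same hash/fnv package:

package main

import (
	"fmt"
	"hash/fnv"
)

// hashFields hashes each field followed by a separator, so that
// ("ab", "c") and ("a", "bc") produce different sums.
func hashFields(sep []byte, fields ...string) uint64 {
	sum := fnv.New64()
	for _, f := range fields {
		_, _ = sum.Write([]byte(f))
		_, _ = sum.Write(sep)
	}
	return sum.Sum64()
}

func main() {
	sep := []byte{255}
	fmt.Println(hashFields(nil, "ab", "c") == hashFields(nil, "a", "bc")) // true: collision without separator
	fmt.Println(hashFields(sep, "ab", "c") == hashFields(sep, "a", "bc")) // false: separator disambiguates
}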

---------- changed file ----------

@@ -0,0 +1,30 @@
+package schedule
+
+import (
+	"fmt"
+	"io"
+	"math/rand"
+	"testing"
+
+	"github.com/google/uuid"
+
+	"github.com/grafana/grafana/pkg/services/ngalert/models"
+)
+
+func BenchmarkRuleWithFolderFingerprint(b *testing.B) {
+	rules := models.GenerateAlertRules(b.N, models.AlertRuleGen(func(rule *models.AlertRule) {
+		rule.Data = make([]models.AlertQuery, 0, 5)
+		for i := 0; i < rand.Intn(5)+1; i++ {
+			rule.Data = append(rule.Data, models.GenerateAlertQuery())
+		}
+	}))
+	folder := uuid.NewString()
+	b.ReportAllocs()
+	b.ResetTimer()
+	var f fingerprint
+	for i := 0; i < b.N; i++ {
+		f = ruleWithFolder{rule: rules[i], folderTitle: folder}.Fingerprint()
+	}
+	b.StopTimer()
+	_, _ = fmt.Fprint(io.Discard, f)
+}
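
The benchmark generates all b.N rules up front so the timed loop measures only Fingerprint(), and the final fmt.Fprint(io.Discard, f) keeps the compiler from eliminating the hash computation as dead code. Assuming the file lives in the ngalert schedule package, it can be run with:

go test -run='^$' -bench=BenchmarkRuleWithFolderFingerprint -benchmem ./pkg/services/ngalert/schedule/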

---------- changed file ----------

@@ -2,13 +2,16 @@ package schedule
 
 import (
 	"context"
+	"encoding/json"
 	"math"
 	"math/rand"
+	"reflect"
 	"runtime"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -27,7 +30,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 		r := newAlertRuleInfo(context.Background())
 		resultCh := make(chan bool)
 		go func() {
-			resultCh <- r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false})
+			resultCh <- r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
 		}()
 		select {
 		case <-r.updateCh:
@@ -38,52 +41,26 @@
 	})
 	t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
 		r := newAlertRuleInfo(context.Background())
-		version1 := ruleVersion(rand.Int31())
-		version2 := version1 + 1
+		version1 := ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
+		version2 := ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
 		wg := sync.WaitGroup{}
 		wg.Add(1)
 		go func() {
 			wg.Done()
-			r.update(ruleVersionAndPauseStatus{version1, false})
+			r.update(version1)
 			wg.Done()
 		}()
 		wg.Wait()
 		wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
 		go func() {
 			wg.Done()
-			r.update(ruleVersionAndPauseStatus{version2, false})
+			r.update(version2)
 		}()
 		wg.Wait() // at this point tick 1 has already been dropped
 		select {
 		case version := <-r.updateCh:
-			require.Equal(t, ruleVersionAndPauseStatus{version2, false}, version)
-		case <-time.After(5 * time.Second):
-			t.Fatal("No message was received on eval channel")
-		}
-	})
-	t.Run("update should drop any concurrent sending to updateCh and use greater version", func(t *testing.T) {
-		r := newAlertRuleInfo(context.Background())
-		version1 := ruleVersion(rand.Int31())
-		version2 := version1 + 1
-		wg := sync.WaitGroup{}
-		wg.Add(1)
-		go func() {
-			wg.Done()
-			r.update(ruleVersionAndPauseStatus{version2, false})
-			wg.Done()
-		}()
-		wg.Wait()
-		wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
-		go func() {
-			wg.Done()
-			r.update(ruleVersionAndPauseStatus{version1, false})
-		}()
-		wg.Wait() // at this point tick 1 has already been dropped
-		select {
-		case version := <-r.updateCh:
-			require.Equal(t, ruleVersionAndPauseStatus{version2, false}, version)
+			require.Equal(t, version2, version)
 		case <-time.After(5 * time.Second):
 			t.Fatal("No message was received on eval channel")
 		}
@@ -185,7 +162,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 		r := newAlertRuleInfo(context.Background())
 		r.stop(errRuleDeleted)
 		require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
-		require.False(t, r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false}))
+		require.False(t, r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}))
 	})
 	t.Run("eval should do nothing", func(t *testing.T) {
 		r := newAlertRuleInfo(context.Background())
@@ -237,7 +214,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 			}
 			switch rand.Intn(max) + 1 {
 			case 1:
-				r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false})
+				r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
 			case 2:
 				r.eval(&evaluation{
 					scheduledAt: time.Now(),
@@ -368,3 +345,148 @@ func TestSchedulableAlertRulesRegistry_set(t *testing.T) {
 		require.Equal(t, expectedUpdated, diff.updated)
 	})
 }
+
+func TestRuleWithFolderFingerprint(t *testing.T) {
+	rule := models.AlertRuleGen()()
+	title := uuid.NewString()
+	f := ruleWithFolder{rule: rule, folderTitle: title}.Fingerprint()
+	t.Run("should calculate a fingerprint", func(t *testing.T) {
+		require.NotEqual(t, 0, uint64(f))
+	})
+	t.Run("mirror copy should have the same fingerprint", func(t *testing.T) {
+		f2 := ruleWithFolder{rule: models.CopyRule(rule), folderTitle: title}.Fingerprint()
+		require.Equal(t, f, f2)
+	})
+	t.Run("order of queries should not affect the fingerprint", func(t *testing.T) {
+		cp := models.CopyRule(rule)
+		rand.Shuffle(len(cp.Data), func(i, j int) {
+			cp.Data[i], cp.Data[j] = cp.Data[j], cp.Data[i]
+		})
+		f2 := ruleWithFolder{rule: cp, folderTitle: title}.Fingerprint()
+		require.Equal(t, f, f2)
+	})
+	t.Run("folder name should be used in fingerprint", func(t *testing.T) {
+		f2 := ruleWithFolder{rule: rule, folderTitle: uuid.NewString()}.Fingerprint()
+		require.NotEqual(t, f, f2)
+	})
+	t.Run("Version and Updated should be excluded from fingerprint", func(t *testing.T) {
+		cp := models.CopyRule(rule)
+		cp.Version++
+		cp.Updated = cp.Updated.Add(1 * time.Second)
+		f2 := ruleWithFolder{rule: cp, folderTitle: title}.Fingerprint()
+		require.Equal(t, f, f2)
+	})
+	t.Run("all other fields should be considered", func(t *testing.T) {
+		r1 := &models.AlertRule{
+			ID:        1,
+			OrgID:     2,
+			Title:     "test",
+			Condition: "A",
+			Data: []models.AlertQuery{
+				{
+					RefID:     "1",
+					QueryType: "323",
+					RelativeTimeRange: models.RelativeTimeRange{
+						From: 1,
+						To:   2,
+					},
+					DatasourceUID: "123",
+					Model:         json.RawMessage(`{"test": "test-model"}`),
+				},
+			},
+			Updated:         time.Now(),
+			IntervalSeconds: 2,
+			Version:         1,
+			UID:             "test-uid",
+			NamespaceUID:    "test-ns",
+			DashboardUID:    func(s string) *string { return &s }("dashboard"),
+			PanelID:         func(i int64) *int64 { return &i }(123),
+			RuleGroup:       "test-group",
+			RuleGroupIndex:  1,
+			NoDataState:     "test-nodata",
+			ExecErrState:    "test-err",
+			For:             12,
+			Annotations: map[string]string{
+				"key-annotation": "value-annotation",
+			},
+			Labels: map[string]string{
+				"key-label": "value-label",
+			},
+			IsPaused: false,
+		}
+		r2 := &models.AlertRule{
+			ID:        2,
+			OrgID:     3,
+			Title:     "test-2",
+			Condition: "B",
+			Data: []models.AlertQuery{
+				{
+					RefID:     "2",
+					QueryType: "12313123",
+					RelativeTimeRange: models.RelativeTimeRange{
+						From: 2,
+						To:   3,
+					},
+					DatasourceUID: "asdasdasd21",
+					Model:         json.RawMessage(`{"test": "test-model-2"}`),
+				},
+			},
+			IntervalSeconds: 23,
+			UID:             "test-uid2",
+			NamespaceUID:    "test-ns2",
+			DashboardUID:    func(s string) *string { return &s }("dashboard-2"),
+			PanelID:         func(i int64) *int64 { return &i }(1222),
+			RuleGroup:       "test-group-2",
+			RuleGroupIndex:  22,
+			NoDataState:     "test-nodata2",
+			ExecErrState:    "test-err2",
+			For:             1141,
+			Annotations: map[string]string{
+				"key-annotation2": "value-annotation",
+			},
+			Labels: map[string]string{
+				"key-label": "value-label23",
+			},
+			IsPaused: true,
+		}
+
+		excludedFields := map[string]struct{}{
+			"Version": {},
+			"Updated": {},
+		}
+
+		tp := reflect.TypeOf(rule).Elem()
+		var nonDiffFields []string
+		// making sure that we get completely different struct
+		dif := r1.Diff(r2)
+		nonDiffFields = make([]string, 0)
+		for j := 0; j < tp.NumField(); j++ {
+			name := tp.Field(j).Name
+			if _, ok := excludedFields[name]; ok {
+				continue
+			}
+			if len(dif.GetDiffsForField(tp.Field(j).Name)) == 0 {
+				nonDiffFields = append(nonDiffFields, tp.Field(j).Name)
+			}
+		}
+		require.Emptyf(t, nonDiffFields, "cannot generate completely unique alert rule. Some fields are not randomized")
+
+		r2v := reflect.ValueOf(r2).Elem()
+		for i := 0; i < tp.NumField(); i++ {
+			if _, ok := excludedFields[tp.Field(i).Name]; ok {
+				continue
+			}
+			cp := models.CopyRule(r1)
+			v := reflect.ValueOf(cp).Elem()
+			vf := v.Field(i)
+			vf.Set(r2v.Field(i))
+			f2 := ruleWithFolder{rule: cp, folderTitle: title}.Fingerprint()
+			if f2 == f {
+				t.Fatalf("Field %s does not seem to be used in fingerprint. Diff: %s", tp.Field(i).Name, r1.Diff(cp))
+			}
+		}
+	})
+}
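
The last subtest is a guard against future drift: it uses reflection to copy each field of r2 into a copy of r1 one at a time, and fails if the fingerprint does not react, so a newly added AlertRule field that is forgotten in Fingerprint() should break the test. The same technique in isolation, as a hedged sketch with a toy struct rather than the actual models package:

package main

import (
	"fmt"
	"hash/fnv"
	"reflect"
)

type toy struct {
	A string
	B int64
}

// fp deliberately ignores B, to show the reflection guard firing.
func fp(t toy) uint64 {
	sum := fnv.New64()
	_, _ = sum.Write([]byte(t.A))
	return sum.Sum64()
}

func main() {
	r1 := toy{A: "x", B: 1}
	r2 := toy{A: "y", B: 2} // every field differs from r1
	base := fp(r1)
	tp := reflect.TypeOf(r1)
	r2v := reflect.ValueOf(r2)
	for i := 0; i < tp.NumField(); i++ {
		cp := r1 // fresh copy, override exactly one field
		reflect.ValueOf(&cp).Elem().Field(i).Set(r2v.Field(i))
		if fp(cp) == base {
			fmt.Printf("field %s is not part of the fingerprint\n", tp.Field(i).Name) // prints: field B ...
		}
	}
}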

---------- changed file ----------

@@ -271,16 +271,18 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 		itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
 		isReadyToRun := item.IntervalSeconds != 0 && tickNum%itemFrequency == 0
-		if isReadyToRun {
-			var folderTitle string
-			if !sch.disableGrafanaFolder {
-				title, ok := folderTitles[item.NamespaceUID]
-				if ok {
-					folderTitle = title
-				} else {
-					missingFolder[item.NamespaceUID] = append(missingFolder[item.NamespaceUID], item.UID)
-				}
+
+		var folderTitle string
+		if !sch.disableGrafanaFolder {
+			title, ok := folderTitles[item.NamespaceUID]
+			if ok {
+				folderTitle = title
+			} else {
+				missingFolder[item.NamespaceUID] = append(missingFolder[item.NamespaceUID], item.UID)
 			}
 		}
+
+		if isReadyToRun {
 			readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, evaluation: evaluation{
 				scheduledAt: tick,
 				rule:        item,
@@ -292,8 +294,8 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 			sch.log.Debug("Rule has been updated. Notifying evaluation routine", key.LogContext()...)
 			go func(ri *alertRuleInfo, rule *ngmodels.AlertRule) {
 				ri.update(ruleVersionAndPauseStatus{
-					Version:  ruleVersion(rule.Version),
-					IsPaused: rule.IsPaused,
+					Fingerprint: ruleWithFolder{rule: rule, folderTitle: folderTitle}.Fingerprint(),
+					IsPaused:    rule.IsPaused,
 				})
 			}(ruleInfo, item)
 			updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{
@@ -369,8 +371,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 		notify(states)
 	}
 
-	evaluate := func(ctx context.Context, attempt int64, e *evaluation, span tracing.Span) {
-		logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt)
+	evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span tracing.Span) {
+		logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt)
 		start := sch.clock.Now()
 
 		evalCtx := eval.NewContext(ctx, SchedulerUserFor(e.rule.OrgID))
@@ -450,24 +452,21 @@
 	}
 
 	evalRunning := false
-	var currentRuleVersion int64 = 0
+	var currentFingerprint fingerprint
 	defer sch.stopApplied(key)
 	for {
 		select {
 		// used by external services (API) to notify that rule is updated.
 		case ctx := <-updateCh:
-			// sometimes it can happen when, for example, the rule evaluation took so long,
-			// and there were two concurrent messages in updateCh and evalCh, and the eval's one got processed first.
-			// therefore, at the time when message from updateCh is processed the current rule will have
-			// at least the same version (or greater) and the state created for the new version of the rule.
-			if currentRuleVersion >= int64(ctx.Version) {
-				logger.Info("Skip updating rule because its current version is actual", "version", currentRuleVersion, "newVersion", ctx.Version)
+			if currentFingerprint == ctx.Fingerprint {
+				logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
 				continue
 			}
-			logger.Info("Clearing the state of the rule because it was updated", "version", currentRuleVersion, "newVersion", ctx.Version, "isPaused", ctx.IsPaused)
+			logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
 			// clear the state. So the next evaluation will start from the scratch.
 			resetState(grafanaCtx, ctx.IsPaused)
+			currentFingerprint = ctx.Fingerprint
 		// evalCh - used by the scheduler to signal that evaluation is needed.
 		case ctx, ok := <-evalCh:
 			if !ok {
@@ -486,21 +485,24 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 			}()
 
 			err := retryIfError(func(attempt int64) error {
-				newVersion := ctx.rule.Version
 				isPaused := ctx.rule.IsPaused
-				// fetch latest alert rule version
-				if currentRuleVersion != newVersion {
-					// Do not clean up state if the eval loop has just started.
-					// We need to reset state if the loop has started and the alert is already paused. It can happen,
-					// if we have an alert with state and we do file provision with stateful Grafana, that state
-					// lingers in DB and won't be cleaned up until next alert rule update.
-					if currentRuleVersion > 0 || isPaused {
-						logger.Debug("Got a new version of alert rule. Clear up the state and refresh extra labels", "version", currentRuleVersion, "newVersion", newVersion)
-						resetState(grafanaCtx, isPaused)
-					}
-					currentRuleVersion = newVersion
+				f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
+				// Do not clean up state if the eval loop has just started.
+				var needReset bool
+				if currentFingerprint != 0 && currentFingerprint != f {
+					logger.Debug("Got a new version of alert rule. Clear up the state", "fingerprint", f)
+					needReset = true
 				}
+				// We need to reset state if the loop has started and the alert is already paused. It can happen,
+				// if we have an alert with state and we do file provision with stateful Grafana, that state
+				// lingers in DB and won't be cleaned up until next alert rule update.
+				needReset = needReset || (currentFingerprint == 0 && isPaused)
+				if needReset {
+					resetState(grafanaCtx, isPaused)
+				}
+				currentFingerprint = f
 				if isPaused {
 					logger.Debug("Skip rule evaluation because it is paused")
 					return nil
 				}
 				tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution")
@@ -509,10 +511,12 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 				span.SetAttributes("rule_uid", ctx.rule.UID, attribute.String("rule_uid", ctx.rule.UID))
 				span.SetAttributes("org_id", ctx.rule.OrgID, attribute.Int64("org_id", ctx.rule.OrgID))
 				span.SetAttributes("rule_version", ctx.rule.Version, attribute.Int64("rule_version", ctx.rule.Version))
+				fpStr := currentFingerprint.String()
+				span.SetAttributes("rule_fingerprint", fpStr, attribute.String("rule_fingerprint", fpStr))
 				utcTick := ctx.scheduledAt.UTC().Format(time.RFC3339Nano)
 				span.SetAttributes("tick", utcTick, attribute.String("tick", utcTick))
-				evaluate(tracingCtx, attempt, ctx, span)
+				evaluate(tracingCtx, f, attempt, ctx, span)
 				return nil
 			})
 			if err != nil {
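
One subtlety in the new evaluation path: currentFingerprint doubles as a "has this loop evaluated anything yet" flag, with the zero value meaning the routine just started. Condensed into a helper for readability (hypothetical; the real logic is inline in ruleRoutine above, and it assumes a genuine FNV-1 fingerprint of a rule is never exactly zero):

// needStateReset mirrors the reset decision in ruleRoutine.
func needStateReset(current, incoming fingerprint, isPaused bool) bool {
	if current == 0 {
		// Loop just started: only clear state lingering in the DB for
		// already-paused rules (e.g. after file provisioning).
		return isPaused
	}
	// Loop already running: clear state whenever the rule content changed.
	return current != incoming
}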

---------- changed file ----------

@@ -530,6 +530,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
 	t.Run("when a message is sent to update channel", func(t *testing.T) {
 		rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
+		folderTitle := "folderName"
+		ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint()
 
 		evalChan := make(chan *evaluation)
 		evalAppliedChan := make(chan time.Time)
@@ -540,7 +542,7 @@
 		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
 		ruleStore.PutRule(context.Background(), rule)
-		sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[string]string{rule.NamespaceUID: "folderName"})
+		sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[string]string{rule.NamespaceUID: folderTitle})
 
 		go func() {
 			ctx, cancel := context.WithCancel(context.Background())
@@ -552,6 +554,7 @@
 		evalChan <- &evaluation{
 			scheduledAt: sch.clock.Now(),
 			rule:        rule,
+			folderTitle: folderTitle,
 		}
 
 		waitForTimeChannel(t, evalAppliedChan)
@@ -584,9 +587,8 @@
 		require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
 
 		t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
-			updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version - 1), false}
-			updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version), false}
-			updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version), false} // second time just to make sure that previous messages were handled
+			updateChan <- ruleVersionAndPauseStatus{ruleFp, false}
+			updateChan <- ruleVersionAndPauseStatus{ruleFp, false} // second time just to make sure that previous messages were handled
 
 			actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
 			require.Len(t, actualStates, len(states))
@@ -595,7 +597,7 @@
 		})
 
 		t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
-			updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version + rand.Int63n(1000) + 1), false}
+			updateChan <- ruleVersionAndPauseStatus{ruleFp + 1, false}
 
 			require.Eventually(t, func() bool {
 				return len(sender.Calls) > 0