mirror of
https://github.com/grafana/grafana.git
synced 2025-01-27 16:57:14 -06:00
Alerting: Write and Delete multiple alert instances. (#55350)
Prior to this change, all alert instance writes and deletes happened individually, in their own database transaction. This change batches up writes or deletes for a given rule's evaluation loop into a single transaction before applying it. These new transactions are off by default, guarded by the feature toggle "alertingBigTransactions" Before: ``` goos: darwin goarch: arm64 pkg: github.com/grafana/grafana/pkg/services/ngalert/store BenchmarkAlertInstanceOperations-8 398 2991381 ns/op 1133537 B/op 27703 allocs/op --- BENCH: BenchmarkAlertInstanceOperations-8 util.go:127: alert definition: {orgID: 1, UID: FovKXiRVzm} with title: "an alert definition FTvFXmRVkz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: foDFXmRVkm} with title: "an alert definition fovFXmRVkz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: VQvFuigVkm} with title: "an alert definition VwDKXmR4kz" interval: 60 created PASS ok github.com/grafana/grafana/pkg/services/ngalert/store 1.619s ``` After: ``` goos: darwin goarch: arm64 pkg: github.com/grafana/grafana/pkg/services/ngalert/store BenchmarkAlertInstanceOperations-8 1440 816484 ns/op 352297 B/op 6529 allocs/op --- BENCH: BenchmarkAlertInstanceOperations-8 util.go:127: alert definition: {orgID: 1, UID: 302r_igVzm} with title: "an alert definition q0h9lmR4zz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: 71hrlmR4km} with title: "an alert definition nJ29_mR4zz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: Cahr_mR4zm} with title: "an alert definition ja2rlmg4zz" interval: 60 created PASS ok github.com/grafana/grafana/pkg/services/ngalert/store 1.383s ``` So we cut time by about 75% and memory allocations by about 60% when storing and deleting 100 instances.
This commit is contained in:
parent
b4e23e5d32
commit
b476ae62fb
@ -16,6 +16,7 @@
|
||||
export interface FeatureToggles {
|
||||
[name: string]: boolean | undefined; // support any string value
|
||||
|
||||
alertingBigTransactions?: boolean;
|
||||
trimDefaults?: boolean;
|
||||
disableEnvelopeEncryption?: boolean;
|
||||
database_metrics?: boolean;
|
||||
|
@ -9,6 +9,11 @@ package featuremgmt
|
||||
var (
|
||||
// Register each toggle here
|
||||
standardFeatureFlags = []FeatureFlag{
|
||||
{
|
||||
Name: "alertingBigTransactions",
|
||||
Description: "Use big transactions for alerting database writes",
|
||||
State: FeatureStateAlpha,
|
||||
},
|
||||
{
|
||||
Name: "trimDefaults",
|
||||
Description: "Use cue schema to remove values that will be applied automatically",
|
||||
|
@ -7,6 +7,10 @@
|
||||
package featuremgmt
|
||||
|
||||
const (
|
||||
// FlagAlertingBigTransactions
|
||||
// Use big transactions for alerting database writes
|
||||
FlagAlertingBigTransactions = "alertingBigTransactions"
|
||||
|
||||
// FlagTrimDefaults
|
||||
// Use cue schema to remove values that will be applied automatically
|
||||
FlagTrimDefaults = "trimDefaults"
|
||||
|
@ -7,10 +7,8 @@ import (
|
||||
|
||||
// AlertInstance represents a single alert instance.
|
||||
type AlertInstance struct {
|
||||
RuleOrgID int64 `xorm:"rule_org_id"`
|
||||
RuleUID string `xorm:"rule_uid"`
|
||||
AlertInstanceKey `xorm:"extends"`
|
||||
Labels InstanceLabels
|
||||
LabelsHash string
|
||||
CurrentState InstanceStateType
|
||||
CurrentReason string
|
||||
CurrentStateSince time.Time
|
||||
@ -18,6 +16,12 @@ type AlertInstance struct {
|
||||
LastEvalTime time.Time
|
||||
}
|
||||
|
||||
type AlertInstanceKey struct {
|
||||
RuleOrgID int64 `xorm:"rule_org_id"`
|
||||
RuleUID string `xorm:"rule_uid"`
|
||||
LabelsHash string
|
||||
}
|
||||
|
||||
// InstanceStateType is an enum for instance states.
|
||||
type InstanceStateType string
|
||||
|
||||
@ -44,18 +48,6 @@ func (i InstanceStateType) IsValid() bool {
|
||||
i == InstanceStateError
|
||||
}
|
||||
|
||||
// SaveAlertInstanceCommand is the query for saving a new alert instance.
|
||||
type SaveAlertInstanceCommand struct {
|
||||
RuleOrgID int64
|
||||
RuleUID string
|
||||
Labels InstanceLabels
|
||||
State InstanceStateType
|
||||
StateReason string
|
||||
LastEvalTime time.Time
|
||||
CurrentStateSince time.Time
|
||||
CurrentStateEnd time.Time
|
||||
}
|
||||
|
||||
// ListAlertInstancesQuery is the query list alert Instances.
|
||||
type ListAlertInstancesQuery struct {
|
||||
RuleOrgID int64 `json:"-"`
|
||||
@ -68,11 +60,7 @@ type ListAlertInstancesQuery struct {
|
||||
|
||||
// ValidateAlertInstance validates that the alert instance contains an alert rule id,
|
||||
// and state.
|
||||
func ValidateAlertInstance(alertInstance *AlertInstance) error {
|
||||
if alertInstance == nil {
|
||||
return fmt.Errorf("alert instance is invalid because it is nil")
|
||||
}
|
||||
|
||||
func ValidateAlertInstance(alertInstance AlertInstance) error {
|
||||
if alertInstance.RuleOrgID == 0 {
|
||||
return fmt.Errorf("alert instance is invalid due to missing alert rule organisation")
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ func (il *InstanceLabels) StringKey() (string, error) {
|
||||
tl := labelsToTupleLabels(*il)
|
||||
b, err := json.Marshal(tl)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("can not gereate key due to failure to encode labels: %w", err)
|
||||
return "", fmt.Errorf("could not generate key due to failure to encode labels: %w", err)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
@ -54,7 +54,7 @@ func (il *InstanceLabels) StringAndHash() (string, string, error) {
|
||||
|
||||
b, err := json.Marshal(tl)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("can not gereate key for alert instance due to failure to encode labels: %w", err)
|
||||
return "", "", fmt.Errorf("could not generate key for alert instance due to failure to encode labels: %w", err)
|
||||
}
|
||||
|
||||
h := sha1.New()
|
||||
@ -76,7 +76,7 @@ type tupleLabels []tupleLabel
|
||||
type tupleLabel [2]string
|
||||
|
||||
// Sort tupleLabels by each elements first property (key).
|
||||
func (t *tupleLabels) sortBtKey() {
|
||||
func (t *tupleLabels) sortByKey() {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
@ -91,7 +91,7 @@ func labelsToTupleLabels(l InstanceLabels) tupleLabels {
|
||||
for k, v := range l {
|
||||
t = append(t, tupleLabel{k, v})
|
||||
}
|
||||
t.sortBtKey()
|
||||
t.sortByKey()
|
||||
return t
|
||||
}
|
||||
|
||||
|
@ -54,25 +54,25 @@ func AlertRuleGen(mutators ...AlertRuleMutator) func() *AlertRule {
|
||||
if rand.Int63()%2 == 0 {
|
||||
d := util.GenerateShortUID()
|
||||
dashUID = &d
|
||||
p := rand.Int63()
|
||||
p := rand.Int63n(1500)
|
||||
panelID = &p
|
||||
}
|
||||
|
||||
rule := &AlertRule{
|
||||
ID: rand.Int63(),
|
||||
OrgID: rand.Int63(),
|
||||
ID: rand.Int63n(1500),
|
||||
OrgID: rand.Int63n(1500),
|
||||
Title: "TEST-ALERT-" + util.GenerateShortUID(),
|
||||
Condition: "A",
|
||||
Data: []AlertQuery{GenerateAlertQuery()},
|
||||
Updated: time.Now().Add(-time.Duration(rand.Intn(100) + 1)),
|
||||
IntervalSeconds: rand.Int63n(60) + 1,
|
||||
Version: rand.Int63(),
|
||||
Version: rand.Int63n(1500), // Don't generate a rule ID too big for postgres
|
||||
UID: util.GenerateShortUID(),
|
||||
NamespaceUID: util.GenerateShortUID(),
|
||||
DashboardUID: dashUID,
|
||||
PanelID: panelID,
|
||||
RuleGroup: "TEST-GROUP-" + util.GenerateShortUID(),
|
||||
RuleGroupIndex: rand.Int(),
|
||||
RuleGroupIndex: rand.Intn(1500),
|
||||
NoDataState: randNoDataState(),
|
||||
ExecErrState: randErrState(),
|
||||
For: forInterval,
|
||||
@ -96,7 +96,7 @@ func WithUniqueID() AlertRuleMutator {
|
||||
usedID := make(map[int64]struct{})
|
||||
return func(rule *AlertRule) {
|
||||
for {
|
||||
id := rand.Int63()
|
||||
id := rand.Int63n(1500)
|
||||
if _, ok := usedID[id]; !ok {
|
||||
usedID[id] = struct{}{}
|
||||
rule.ID = id
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/datasourceproxy"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/api"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/image"
|
||||
@ -39,13 +40,31 @@ import (
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, dataSourceCache datasources.CacheService, dataSourceService datasources.DataSourceService, routeRegister routing.RouteRegister,
|
||||
sqlStore *sqlstore.SQLStore, kvStore kvstore.KVStore, expressionService *expr.Service, dataProxy *datasourceproxy.DataSourceProxyService,
|
||||
quotaService quota.Service, secretsService secrets.Service, notificationService notifications.Service, m *metrics.NGAlert,
|
||||
folderService dashboards.FolderService, ac accesscontrol.AccessControl, dashboardService dashboards.DashboardService, renderService rendering.Service,
|
||||
bus bus.Bus, accesscontrolService accesscontrol.Service, annotationsRepo annotations.Repository) (*AlertNG, error) {
|
||||
func ProvideService(
|
||||
cfg *setting.Cfg,
|
||||
featureToggles featuremgmt.FeatureToggles,
|
||||
dataSourceCache datasources.CacheService,
|
||||
dataSourceService datasources.DataSourceService,
|
||||
routeRegister routing.RouteRegister,
|
||||
sqlStore *sqlstore.SQLStore,
|
||||
kvStore kvstore.KVStore,
|
||||
expressionService *expr.Service,
|
||||
dataProxy *datasourceproxy.DataSourceProxyService,
|
||||
quotaService quota.Service,
|
||||
secretsService secrets.Service,
|
||||
notificationService notifications.Service,
|
||||
m *metrics.NGAlert,
|
||||
folderService dashboards.FolderService,
|
||||
ac accesscontrol.AccessControl,
|
||||
dashboardService dashboards.DashboardService,
|
||||
renderService rendering.Service,
|
||||
bus bus.Bus,
|
||||
accesscontrolService accesscontrol.Service,
|
||||
annotationsRepo annotations.Repository,
|
||||
) (*AlertNG, error) {
|
||||
ng := &AlertNG{
|
||||
Cfg: cfg,
|
||||
FeatureToggles: featureToggles,
|
||||
DataSourceCache: dataSourceCache,
|
||||
DataSourceService: dataSourceService,
|
||||
RouteRegister: routeRegister,
|
||||
@ -81,6 +100,7 @@ func ProvideService(cfg *setting.Cfg, dataSourceCache datasources.CacheService,
|
||||
// AlertNG is the service for evaluating the condition of an alert definition.
|
||||
type AlertNG struct {
|
||||
Cfg *setting.Cfg
|
||||
FeatureToggles featuremgmt.FeatureToggles
|
||||
DataSourceCache datasources.CacheService
|
||||
DataSourceService datasources.DataSourceService
|
||||
RouteRegister routing.RouteRegister
|
||||
@ -115,6 +135,7 @@ func (ng *AlertNG) init() error {
|
||||
|
||||
store := &store.DBstore{
|
||||
Cfg: ng.Cfg.UnifiedAlerting,
|
||||
FeatureToggles: ng.FeatureToggles,
|
||||
SQLStore: ng.SQLStore,
|
||||
Logger: ng.Log,
|
||||
FolderService: ng.folderService,
|
||||
|
@ -75,28 +75,38 @@ func TestWarmStateCache(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
saveCmd1 := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
Labels: models.InstanceLabels{"test1": "testValue1"},
|
||||
State: models.InstanceStateNormal,
|
||||
labels := models.InstanceLabels{"test1": "testValue1"}
|
||||
_, hash, _ := labels.StringAndHash()
|
||||
instance1 := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateNormal,
|
||||
LastEvalTime: evaluationTime,
|
||||
CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
|
||||
CurrentStateEnd: evaluationTime.Add(1 * time.Minute),
|
||||
Labels: labels,
|
||||
}
|
||||
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd1)
|
||||
_ = dbstore.SaveAlertInstances(ctx, instance1)
|
||||
|
||||
saveCmd2 := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
Labels: models.InstanceLabels{"test2": "testValue2"},
|
||||
State: models.InstanceStateFiring,
|
||||
labels = models.InstanceLabels{"test2": "testValue2"}
|
||||
_, hash, _ = labels.StringAndHash()
|
||||
instance2 := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
LastEvalTime: evaluationTime,
|
||||
CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
|
||||
CurrentStateEnd: evaluationTime.Add(1 * time.Minute),
|
||||
Labels: labels,
|
||||
}
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd2)
|
||||
_ = dbstore.SaveAlertInstances(ctx, instance2)
|
||||
|
||||
cfg := setting.UnifiedAlertingSettings{
|
||||
BaseInterval: time.Second,
|
||||
|
@ -108,10 +108,10 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
require.Len(t, states, 1)
|
||||
s := states[0]
|
||||
|
||||
var cmd *models.SaveAlertInstanceCommand
|
||||
var cmd *models.AlertInstance
|
||||
for _, op := range instanceStore.RecordedOps {
|
||||
switch q := op.(type) {
|
||||
case models.SaveAlertInstanceCommand:
|
||||
case models.AlertInstance:
|
||||
cmd = &q
|
||||
}
|
||||
if cmd != nil {
|
||||
@ -120,11 +120,11 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
}
|
||||
|
||||
require.NotNil(t, cmd)
|
||||
t.Logf("Saved alert instance: %v", cmd)
|
||||
t.Logf("Saved alert instances: %v", cmd)
|
||||
require.Equal(t, rule.OrgID, cmd.RuleOrgID)
|
||||
require.Equal(t, expectedTime, cmd.LastEvalTime)
|
||||
require.Equal(t, cmd.RuleUID, cmd.RuleUID)
|
||||
require.Equal(t, evalState.String(), string(cmd.State))
|
||||
require.Equal(t, rule.UID, cmd.RuleUID)
|
||||
require.Equal(t, evalState.String(), string(cmd.CurrentState))
|
||||
require.Equal(t, s.Labels, data.Labels(cmd.Labels))
|
||||
})
|
||||
|
||||
|
@ -177,11 +177,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time
|
||||
resolvedStates := st.staleResultsHandler(ctx, evaluatedAt, alertRule, processedResults)
|
||||
if len(states) > 0 {
|
||||
logger.Debug("saving new states to the database", "count", len(states))
|
||||
for _, state := range states {
|
||||
if err := st.saveState(ctx, state); err != nil {
|
||||
logger.Error("failed to save alert state", "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error())
|
||||
}
|
||||
}
|
||||
_, _ = st.saveAlertStates(ctx, states...)
|
||||
}
|
||||
return append(states, resolvedStates...)
|
||||
}
|
||||
@ -310,18 +306,52 @@ func (st *Manager) Put(states []*State) {
|
||||
}
|
||||
}
|
||||
|
||||
func (st *Manager) saveState(ctx context.Context, s *State) error {
|
||||
cmd := ngModels.SaveAlertInstanceCommand{
|
||||
RuleOrgID: s.OrgID,
|
||||
RuleUID: s.AlertRuleUID,
|
||||
Labels: ngModels.InstanceLabels(s.Labels),
|
||||
State: ngModels.InstanceStateType(s.State.String()),
|
||||
StateReason: s.StateReason,
|
||||
LastEvalTime: s.LastEvaluationTime,
|
||||
CurrentStateSince: s.StartsAt,
|
||||
CurrentStateEnd: s.EndsAt,
|
||||
// TODO: Is the `State` type necessary? Should it embed the instance?
|
||||
func (st *Manager) saveAlertStates(ctx context.Context, states ...*State) (saved, failed int) {
|
||||
st.log.Debug("saving alert states", "count", len(states))
|
||||
instances := make([]ngModels.AlertInstance, 0, len(states))
|
||||
|
||||
type debugInfo struct {
|
||||
OrgID int64
|
||||
Uid string
|
||||
State string
|
||||
Labels string
|
||||
}
|
||||
return st.instanceStore.SaveAlertInstance(ctx, &cmd)
|
||||
debug := make([]debugInfo, 0)
|
||||
|
||||
for _, s := range states {
|
||||
labels := ngModels.InstanceLabels(s.Labels)
|
||||
_, hash, err := labels.StringAndHash()
|
||||
if err != nil {
|
||||
debug = append(debug, debugInfo{s.OrgID, s.AlertRuleUID, s.State.String(), s.Labels.String()})
|
||||
st.log.Error("failed to save alert instance with invalid labels", "orgID", s.OrgID, "ruleUID", s.AlertRuleUID, "err", err)
|
||||
continue
|
||||
}
|
||||
fields := ngModels.AlertInstance{
|
||||
AlertInstanceKey: ngModels.AlertInstanceKey{
|
||||
RuleOrgID: s.OrgID,
|
||||
RuleUID: s.AlertRuleUID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
Labels: ngModels.InstanceLabels(s.Labels),
|
||||
CurrentState: ngModels.InstanceStateType(s.State.String()),
|
||||
CurrentReason: s.StateReason,
|
||||
LastEvalTime: s.LastEvaluationTime,
|
||||
CurrentStateSince: s.StartsAt,
|
||||
CurrentStateEnd: s.EndsAt,
|
||||
}
|
||||
instances = append(instances, fields)
|
||||
}
|
||||
|
||||
if err := st.instanceStore.SaveAlertInstances(ctx, instances...); err != nil {
|
||||
for _, inst := range instances {
|
||||
debug = append(debug, debugInfo{inst.RuleOrgID, inst.RuleUID, string(inst.CurrentState), data.Labels(inst.Labels).String()})
|
||||
}
|
||||
st.log.Error("failed to save alert states", "states", debug, "err", err)
|
||||
return 0, len(debug)
|
||||
}
|
||||
|
||||
return len(instances), len(debug)
|
||||
}
|
||||
|
||||
// TODO: why wouldn't you allow other types like NoData or Error?
|
||||
@ -353,9 +383,11 @@ func (i InstanceStateAndReason) String() string {
|
||||
func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, states map[string]*State) []*State {
|
||||
var resolvedStates []*State
|
||||
allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
|
||||
toDelete := make([]ngModels.AlertInstanceKey, 0)
|
||||
|
||||
for _, s := range allStates {
|
||||
_, ok := states[s.CacheId]
|
||||
if !ok && isItStale(evaluatedAt, s.LastEvaluationTime, alertRule.IntervalSeconds) {
|
||||
// Is the cached state in our recently processed results? If not, is it stale?
|
||||
if _, ok := states[s.CacheId]; !ok && stateIsStale(evaluatedAt, s.LastEvaluationTime, alertRule.IntervalSeconds) {
|
||||
st.log.Debug("removing stale state entry", "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
|
||||
st.cache.deleteEntry(s.OrgID, s.AlertRuleUID, s.CacheId)
|
||||
ilbs := ngModels.InstanceLabels(s.Labels)
|
||||
@ -364,9 +396,7 @@ func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Tim
|
||||
st.log.Error("unable to get labelsHash", "err", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID)
|
||||
}
|
||||
|
||||
if err = st.instanceStore.DeleteAlertInstance(ctx, s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
|
||||
st.log.Error("unable to delete stale instance from database", "err", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
|
||||
}
|
||||
toDelete = append(toDelete, ngModels.AlertInstanceKey{RuleOrgID: s.OrgID, RuleUID: s.AlertRuleUID, LabelsHash: labelsHash})
|
||||
|
||||
if s.State == eval.Alerting {
|
||||
previousState := InstanceStateAndReason{State: s.State, Reason: s.StateReason}
|
||||
@ -382,9 +412,14 @@ func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Tim
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := st.instanceStore.DeleteAlertInstances(ctx, toDelete...); err != nil {
|
||||
st.log.Error("unable to delete stale instances from database", "err", err.Error(),
|
||||
"orgID", alertRule.OrgID, "alertRuleUID", alertRule.UID, "count", len(toDelete))
|
||||
}
|
||||
return resolvedStates
|
||||
}
|
||||
|
||||
func isItStale(evaluatedAt time.Time, lastEval time.Time, intervalSeconds int64) bool {
|
||||
func stateIsStale(evaluatedAt time.Time, lastEval time.Time, intervalSeconds int64) bool {
|
||||
return !lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).After(evaluatedAt)
|
||||
}
|
||||
|
@ -106,7 +106,7 @@ func Test_maybeNewImage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsItStale(t *testing.T) {
|
||||
func TestStateIsStale(t *testing.T) {
|
||||
now := time.Now()
|
||||
intervalSeconds := rand.Int63n(10) + 5
|
||||
|
||||
@ -143,7 +143,7 @@ func TestIsItStale(t *testing.T) {
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
require.Equal(t, tc.expectedResult, isItStale(now, tc.lastEvaluation, intervalSeconds))
|
||||
require.Equal(t, tc.expectedResult, stateIsStale(now, tc.lastEvaluation, intervalSeconds))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -2019,10 +2019,10 @@ func TestProcessEvalResults(t *testing.T) {
|
||||
|
||||
require.NotEmpty(t, states)
|
||||
|
||||
savedStates := make(map[string]models.SaveAlertInstanceCommand)
|
||||
savedStates := make(map[string]models.AlertInstance)
|
||||
for _, op := range instanceStore.RecordedOps {
|
||||
switch q := op.(type) {
|
||||
case models.SaveAlertInstanceCommand:
|
||||
case models.AlertInstance:
|
||||
cacheId, err := q.Labels.StringKey()
|
||||
require.NoError(t, err)
|
||||
savedStates[cacheId] = q
|
||||
@ -2055,28 +2055,39 @@ func TestStaleResultsHandler(t *testing.T) {
|
||||
const mainOrgID int64 = 1
|
||||
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
|
||||
lastEval := evaluationTime.Add(-2 * interval)
|
||||
saveCmd1 := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
Labels: models.InstanceLabels{"test1": "testValue1"},
|
||||
State: models.InstanceStateNormal,
|
||||
LastEvalTime: lastEval,
|
||||
CurrentStateSince: lastEval,
|
||||
CurrentStateEnd: lastEval.Add(3 * interval),
|
||||
|
||||
labels1 := models.InstanceLabels{"test1": "testValue1"}
|
||||
_, hash1, _ := labels1.StringAndHash()
|
||||
labels2 := models.InstanceLabels{"test2": "testValue2"}
|
||||
_, hash2, _ := labels2.StringAndHash()
|
||||
instances := []models.AlertInstance{
|
||||
{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
LabelsHash: hash1,
|
||||
},
|
||||
CurrentState: models.InstanceStateNormal,
|
||||
Labels: labels1,
|
||||
LastEvalTime: lastEval,
|
||||
CurrentStateSince: lastEval,
|
||||
CurrentStateEnd: lastEval.Add(3 * interval),
|
||||
},
|
||||
{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
LabelsHash: hash2,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
Labels: labels2,
|
||||
LastEvalTime: lastEval,
|
||||
CurrentStateSince: lastEval,
|
||||
CurrentStateEnd: lastEval.Add(3 * interval),
|
||||
},
|
||||
}
|
||||
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd1)
|
||||
|
||||
saveCmd2 := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: rule.OrgID,
|
||||
RuleUID: rule.UID,
|
||||
Labels: models.InstanceLabels{"test2": "testValue2"},
|
||||
State: models.InstanceStateFiring,
|
||||
LastEvalTime: lastEval,
|
||||
CurrentStateSince: lastEval,
|
||||
CurrentStateEnd: lastEval.Add(3 * interval),
|
||||
}
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd2)
|
||||
_ = dbstore.SaveAlertInstances(ctx, instances...)
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
|
@ -12,8 +12,8 @@ import (
|
||||
type InstanceStore interface {
|
||||
FetchOrgIds(ctx context.Context) ([]int64, error)
|
||||
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) error
|
||||
SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error
|
||||
DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error
|
||||
SaveAlertInstances(ctx context.Context, cmd ...models.AlertInstance) error
|
||||
DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error
|
||||
DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
)
|
||||
|
||||
var _ InstanceStore = &FakeInstanceStore{}
|
||||
|
||||
type FakeInstanceStore struct {
|
||||
mtx sync.Mutex
|
||||
RecordedOps []interface{}
|
||||
@ -21,16 +23,18 @@ func (f *FakeInstanceStore) ListAlertInstances(_ context.Context, q *models.List
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) SaveAlertInstance(_ context.Context, q *models.SaveAlertInstanceCommand) error {
|
||||
func (f *FakeInstanceStore) SaveAlertInstances(_ context.Context, q ...models.AlertInstance) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.RecordedOps = append(f.RecordedOps, *q)
|
||||
for _, inst := range q {
|
||||
f.RecordedOps = append(f.RecordedOps, inst)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { return []int64{}, nil }
|
||||
|
||||
func (f *FakeInstanceStore) DeleteAlertInstance(_ context.Context, _ int64, _, _ string) error {
|
||||
func (f *FakeInstanceStore) DeleteAlertInstances(_ context.Context, _ ...models.AlertInstanceKey) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
@ -30,6 +31,7 @@ type AlertingStore interface {
|
||||
// DBstore stores the alert definitions and instances in the database.
|
||||
type DBstore struct {
|
||||
Cfg setting.UnifiedAlertingSettings
|
||||
FeatureToggles featuremgmt.FeatureToggles
|
||||
SQLStore *sqlstore.SQLStore
|
||||
Logger log.Logger
|
||||
FolderService dashboards.FolderService
|
||||
@ -38,10 +40,11 @@ type DBstore struct {
|
||||
}
|
||||
|
||||
func ProvideDBStore(
|
||||
cfg *setting.Cfg, sqlstore *sqlstore.SQLStore, folderService dashboards.FolderService,
|
||||
cfg *setting.Cfg, featureToggles featuremgmt.FeatureToggles, sqlstore *sqlstore.SQLStore, folderService dashboards.FolderService,
|
||||
access accesscontrol.AccessControl, dashboards dashboards.DashboardService) *DBstore {
|
||||
return &DBstore{
|
||||
Cfg: cfg.UnifiedAlerting,
|
||||
FeatureToggles: featureToggles,
|
||||
SQLStore: sqlstore,
|
||||
Logger: log.New("dbstore"),
|
||||
FolderService: folderService,
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
func TestCalculateChanges(t *testing.T) {
|
||||
orgId := rand.Int63()
|
||||
orgId := int64(rand.Int31())
|
||||
|
||||
t.Run("detects alerts that need to be added", func(t *testing.T) {
|
||||
fakeStore := fakes.NewRuleStore(t)
|
||||
@ -102,8 +102,8 @@ func TestCalculateChanges(t *testing.T) {
|
||||
r := models.CopyRule(rule)
|
||||
|
||||
// Ignore difference in the following fields as submitted models do not have them set
|
||||
r.ID = rand.Int63()
|
||||
r.Version = rand.Int63()
|
||||
r.ID = int64(rand.Int31())
|
||||
r.Version = int64(rand.Int31())
|
||||
r.Updated = r.Updated.Add(1 * time.Minute)
|
||||
|
||||
submitted = append(submitted, r)
|
||||
|
@ -2,8 +2,11 @@ package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
@ -22,7 +25,7 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI
|
||||
params = append(params, p...)
|
||||
}
|
||||
|
||||
addToQuery("SELECT alert_instance.*, alert_rule.title AS rule_title FROM alert_instance LEFT JOIN alert_rule ON alert_instance.rule_org_id = alert_rule.org_id AND alert_instance.rule_uid = alert_rule.uid WHERE rule_org_id = ?", cmd.RuleOrgID)
|
||||
addToQuery("SELECT * FROM alert_instance WHERE rule_org_id = ?", cmd.RuleOrgID)
|
||||
|
||||
if cmd.RuleUID != "" {
|
||||
addToQuery(` AND rule_uid = ?`, cmd.RuleUID)
|
||||
@ -45,30 +48,110 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI
|
||||
})
|
||||
}
|
||||
|
||||
// SaveAlertInstance is a handler for saving a new alert instance.
|
||||
func (st DBstore) SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error {
|
||||
return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
labelTupleJSON, labelsHash, err := cmd.Labels.StringAndHash()
|
||||
// SaveAlertInstances saves all the provided alert instances to the store.
|
||||
func (st DBstore) SaveAlertInstances(ctx context.Context, cmd ...models.AlertInstance) error {
|
||||
if !st.FeatureToggles.IsEnabled(featuremgmt.FlagAlertingBigTransactions) {
|
||||
// This mimics the replace code-path by calling SaveAlertInstance in a loop, with a transaction per call.
|
||||
for _, c := range cmd {
|
||||
err := st.SaveAlertInstance(ctx, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
// Batches write into statements with `maxRows` instances per statements.
|
||||
// This makes sure we don't create statements that are too long for some
|
||||
// databases to process. For example, SQLite has a limit of 999 variables
|
||||
// per write.
|
||||
keyNames := []string{"rule_org_id", "rule_uid", "labels_hash"}
|
||||
fieldNames := []string{
|
||||
"rule_org_id", "rule_uid", "labels", "labels_hash", "current_state",
|
||||
"current_reason", "current_state_since", "current_state_end", "last_eval_time",
|
||||
}
|
||||
fieldsPerRow := len(fieldNames)
|
||||
maxRows := 20
|
||||
maxArgs := maxRows * fieldsPerRow
|
||||
|
||||
bigUpsertSQL, err := st.SQLStore.Dialect.UpsertMultipleSQL(
|
||||
"alert_instance", keyNames, fieldNames, maxRows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
alertInstance := &models.AlertInstance{
|
||||
RuleOrgID: cmd.RuleOrgID,
|
||||
RuleUID: cmd.RuleUID,
|
||||
Labels: cmd.Labels,
|
||||
LabelsHash: labelsHash,
|
||||
CurrentState: cmd.State,
|
||||
CurrentReason: cmd.StateReason,
|
||||
CurrentStateSince: cmd.CurrentStateSince,
|
||||
CurrentStateEnd: cmd.CurrentStateEnd,
|
||||
LastEvalTime: cmd.LastEvalTime,
|
||||
// Args contains the SQL statement, and the values to fill into the SQL statement.
|
||||
args := make([]interface{}, 0, maxArgs)
|
||||
args = append(args, bigUpsertSQL)
|
||||
values := func(a []interface{}) int {
|
||||
return len(a) - 1
|
||||
}
|
||||
|
||||
// Generate batches of `maxRows` and write the statements when full.
|
||||
for _, alertInstance := range cmd {
|
||||
labelTupleJSON, err := alertInstance.Labels.StringKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := models.ValidateAlertInstance(alertInstance); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
args = append(args,
|
||||
alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash,
|
||||
alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(),
|
||||
alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix())
|
||||
|
||||
// If we've reached the maximum batch size, write to the database.
|
||||
if values(args) >= maxArgs {
|
||||
err = st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
_, err := sess.Exec(args...)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save alert instances: %w", err)
|
||||
}
|
||||
|
||||
// Reset args so we can re-use the allocated interface pointers.
|
||||
args = args[:1]
|
||||
}
|
||||
}
|
||||
|
||||
// Write the final batch of up to maxRows in size.
|
||||
if values(args) != 0 && values(args)%fieldsPerRow == 0 {
|
||||
upsertSQL, err := st.SQLStore.Dialect.UpsertMultipleSQL(
|
||||
"alert_instance", keyNames, fieldNames, values(args)/fieldsPerRow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
args[0] = upsertSQL
|
||||
err = st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
_, err := sess.Exec(args...)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save alert instances: %w", err)
|
||||
}
|
||||
} else if values(args) != 0 {
|
||||
return fmt.Errorf("failed to upsert alert instances. Last statements had %v fields, which is not a multiple of the number of fields, %v", len(args), fieldsPerRow)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// SaveAlertInstance is a handler for saving a new alert instance.
|
||||
func (st DBstore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error {
|
||||
return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
if err := models.ValidateAlertInstance(alertInstance); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
labelTupleJSON, err := alertInstance.Labels.StringKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
params := append(make([]interface{}, 0), alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(), alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix())
|
||||
|
||||
upsertSQL := st.SQLStore.Dialect.UpsertSQL(
|
||||
@ -107,14 +190,103 @@ func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) {
|
||||
return orgIds, err
|
||||
}
|
||||
|
||||
func (st DBstore) DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error {
|
||||
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?", orgID, ruleUID, labelsHash)
|
||||
// DeleteAlertInstances deletes instances with the provided keys in a single transaction.
|
||||
func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
type data struct {
|
||||
ruleOrgID int64
|
||||
ruleUID string
|
||||
labelHashes []interface{}
|
||||
}
|
||||
|
||||
// Sort by org and rule UID. Most callers will have grouped already, but it's
|
||||
// cheap to verify and leads to more compact transactions.
|
||||
sort.Slice(keys, func(i, j int) bool {
|
||||
aye := keys[i]
|
||||
jay := keys[j]
|
||||
|
||||
if aye.RuleOrgID < jay.RuleOrgID {
|
||||
return true
|
||||
}
|
||||
|
||||
if aye.RuleOrgID == jay.RuleOrgID && aye.RuleUID < jay.RuleUID {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
maxRows := 200
|
||||
rowData := data{
|
||||
0, "", make([]interface{}, 0, maxRows),
|
||||
}
|
||||
placeholdersBuilder := strings.Builder{}
|
||||
placeholdersBuilder.WriteString("(")
|
||||
|
||||
execQuery := func(s *sqlstore.DBSession, rd data, placeholders string) error {
|
||||
if len(rd.labelHashes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
placeholders = strings.TrimRight(placeholders, ", ")
|
||||
placeholders = placeholders + ")"
|
||||
|
||||
queryString := fmt.Sprintf(
|
||||
"DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash IN %s;",
|
||||
placeholders,
|
||||
)
|
||||
|
||||
execArgs := make([]interface{}, 0, 3+len(rd.labelHashes))
|
||||
execArgs = append(execArgs, queryString, rd.ruleOrgID, rd.ruleUID)
|
||||
execArgs = append(execArgs, rd.labelHashes...)
|
||||
_, err := s.Exec(execArgs...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
counter := 0
|
||||
|
||||
// Create batches of up to 200 items and execute a new delete statement for each batch.
|
||||
for _, k := range keys {
|
||||
counter++
|
||||
// When a rule ID changes or we hit 200 hashes, issue a statement.
|
||||
if rowData.ruleOrgID != k.RuleOrgID || rowData.ruleUID != k.RuleUID || len(rowData.labelHashes) >= 200 {
|
||||
err := execQuery(sess, rowData, placeholdersBuilder.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reset our reused data.
|
||||
rowData.ruleOrgID = k.RuleOrgID
|
||||
rowData.ruleUID = k.RuleUID
|
||||
rowData.labelHashes = rowData.labelHashes[:0]
|
||||
placeholdersBuilder.Reset()
|
||||
placeholdersBuilder.WriteString("(")
|
||||
}
|
||||
|
||||
// Accumulate new values.
|
||||
rowData.labelHashes = append(rowData.labelHashes, k.LabelsHash)
|
||||
placeholdersBuilder.WriteString("?, ")
|
||||
}
|
||||
|
||||
// Delete any remaining rows.
|
||||
if len(rowData.labelHashes) != 0 {
|
||||
err := execQuery(sess, rowData, placeholdersBuilder.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error {
|
||||
|
@ -2,6 +2,7 @@ package store_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -12,6 +13,110 @@ import (
|
||||
|
||||
const baseIntervalSeconds = 10
|
||||
|
||||
func BenchmarkAlertInstanceOperations(b *testing.B) {
|
||||
b.StopTimer()
|
||||
ctx := context.Background()
|
||||
_, dbstore := tests.SetupTestEnv(b, baseIntervalSeconds)
|
||||
dbstore.FeatureToggles.(*tests.FakeFeatures).BigTransactions = false
|
||||
|
||||
const mainOrgID int64 = 1
|
||||
|
||||
alertRule := tests.CreateTestAlertRule(b, ctx, dbstore, 60, mainOrgID)
|
||||
|
||||
// Create some instances to write down and then delete.
|
||||
count := 10_003
|
||||
instances := make([]models.AlertInstance, 0, count)
|
||||
keys := make([]models.AlertInstanceKey, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
labels := models.InstanceLabels{"test": fmt.Sprint(i)}
|
||||
_, labelsHash, _ := labels.StringAndHash()
|
||||
instance := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule.OrgID,
|
||||
RuleUID: alertRule.UID,
|
||||
LabelsHash: labelsHash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
CurrentReason: string(models.InstanceStateError),
|
||||
Labels: labels,
|
||||
}
|
||||
instances = append(instances, instance)
|
||||
keys = append(keys, instance.AlertInstanceKey)
|
||||
}
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = dbstore.SaveAlertInstances(ctx, instances...)
|
||||
_ = dbstore.DeleteAlertInstances(ctx, keys...)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegrationAlertInstanceBulkWrite(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
ctx := context.Background()
|
||||
_, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds)
|
||||
|
||||
orgIDs := []int64{1, 2, 3, 4, 5}
|
||||
counts := []int{20_000, 200, 503, 0, 1256}
|
||||
instances := []models.AlertInstance{}
|
||||
keys := []models.AlertInstanceKey{}
|
||||
|
||||
for i, id := range orgIDs {
|
||||
alertRule := tests.CreateTestAlertRule(t, ctx, dbstore, 60, id)
|
||||
|
||||
// Create some instances to write down and then delete.
|
||||
for j := 0; j < counts[i]; j++ {
|
||||
labels := models.InstanceLabels{"test": fmt.Sprint(j)}
|
||||
_, labelsHash, _ := labels.StringAndHash()
|
||||
instance := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule.OrgID,
|
||||
RuleUID: alertRule.UID,
|
||||
LabelsHash: labelsHash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
CurrentReason: string(models.InstanceStateError),
|
||||
Labels: labels,
|
||||
}
|
||||
instances = append(instances, instance)
|
||||
keys = append(keys, instance.AlertInstanceKey)
|
||||
}
|
||||
}
|
||||
|
||||
for _, bigStmts := range []bool{false, true} {
|
||||
dbstore.FeatureToggles.(*tests.FakeFeatures).BigTransactions = bigStmts
|
||||
err := dbstore.SaveAlertInstances(ctx, instances...)
|
||||
require.NoError(t, err)
|
||||
t.Log("Finished database write")
|
||||
|
||||
// List our instances. Make sure we have the right count.
|
||||
for i, id := range orgIDs {
|
||||
q := &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: id,
|
||||
}
|
||||
err = dbstore.ListAlertInstances(ctx, q)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, counts[i], len(q.Result), "Org %v: Expected %v instances but got %v", id, counts[i], len(q.Result))
|
||||
}
|
||||
t.Log("Finished database read")
|
||||
|
||||
err = dbstore.DeleteAlertInstances(ctx, keys...)
|
||||
require.NoError(t, err)
|
||||
t.Log("Finished database delete")
|
||||
|
||||
for _, id := range orgIDs {
|
||||
q := &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: id,
|
||||
}
|
||||
err = dbstore.ListAlertInstances(ctx, q)
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, len(q.Result), "Org %v: Deleted instances but still had %v", id, len(q.Result))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegrationAlertInstanceOperations(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
@ -34,43 +139,53 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
|
||||
require.Equal(t, orgID, alertRule4.OrgID)
|
||||
|
||||
t.Run("can save and read new alert instance", func(t *testing.T) {
|
||||
saveCmd := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: alertRule1.OrgID,
|
||||
RuleUID: alertRule1.UID,
|
||||
State: models.InstanceStateFiring,
|
||||
StateReason: string(models.InstanceStateError),
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
labels := models.InstanceLabels{"test": "testValue"}
|
||||
_, hash, _ := labels.StringAndHash()
|
||||
instance := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule1.OrgID,
|
||||
RuleUID: alertRule1.UID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
CurrentReason: string(models.InstanceStateError),
|
||||
Labels: labels,
|
||||
}
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmd)
|
||||
err := dbstore.SaveAlertInstances(ctx, instance)
|
||||
require.NoError(t, err)
|
||||
|
||||
listCmd := &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: saveCmd.RuleOrgID,
|
||||
RuleUID: saveCmd.RuleUID,
|
||||
RuleOrgID: instance.RuleOrgID,
|
||||
RuleUID: instance.RuleUID,
|
||||
}
|
||||
err = dbstore.ListAlertInstances(ctx, listCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, listCmd.Result, 1)
|
||||
require.Equal(t, saveCmd.Labels, listCmd.Result[0].Labels)
|
||||
require.Equal(t, instance.Labels, listCmd.Result[0].Labels)
|
||||
require.Equal(t, alertRule1.OrgID, listCmd.Result[0].RuleOrgID)
|
||||
require.Equal(t, alertRule1.UID, listCmd.Result[0].RuleUID)
|
||||
require.Equal(t, saveCmd.StateReason, listCmd.Result[0].CurrentReason)
|
||||
require.Equal(t, instance.CurrentReason, listCmd.Result[0].CurrentReason)
|
||||
})
|
||||
|
||||
t.Run("can save and read new alert instance with no labels", func(t *testing.T) {
|
||||
saveCmd := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: alertRule2.OrgID,
|
||||
RuleUID: alertRule2.UID,
|
||||
State: models.InstanceStateNormal,
|
||||
Labels: models.InstanceLabels{},
|
||||
labels := models.InstanceLabels{}
|
||||
_, hash, _ := labels.StringAndHash()
|
||||
instance := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule2.OrgID,
|
||||
RuleUID: alertRule2.UID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateNormal,
|
||||
Labels: labels,
|
||||
}
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmd)
|
||||
err := dbstore.SaveAlertInstances(ctx, instance)
|
||||
require.NoError(t, err)
|
||||
|
||||
listCmd := &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: saveCmd.RuleOrgID,
|
||||
RuleUID: saveCmd.RuleUID,
|
||||
RuleOrgID: instance.RuleOrgID,
|
||||
RuleUID: instance.RuleUID,
|
||||
}
|
||||
|
||||
err = dbstore.ListAlertInstances(ctx, listCmd)
|
||||
@ -79,32 +194,42 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
|
||||
require.Len(t, listCmd.Result, 1)
|
||||
require.Equal(t, alertRule2.OrgID, listCmd.Result[0].RuleOrgID)
|
||||
require.Equal(t, alertRule2.UID, listCmd.Result[0].RuleUID)
|
||||
require.Equal(t, saveCmd.Labels, listCmd.Result[0].Labels)
|
||||
require.Equal(t, instance.Labels, listCmd.Result[0].Labels)
|
||||
})
|
||||
|
||||
t.Run("can save two instances with same org_id, uid and different labels", func(t *testing.T) {
|
||||
saveCmdOne := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: alertRule3.OrgID,
|
||||
RuleUID: alertRule3.UID,
|
||||
State: models.InstanceStateFiring,
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
labels := models.InstanceLabels{"test": "testValue"}
|
||||
_, hash, _ := labels.StringAndHash()
|
||||
instance1 := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule3.OrgID,
|
||||
RuleUID: alertRule3.UID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
Labels: labels,
|
||||
}
|
||||
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmdOne)
|
||||
err := dbstore.SaveAlertInstances(ctx, instance1)
|
||||
require.NoError(t, err)
|
||||
|
||||
saveCmdTwo := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: saveCmdOne.RuleOrgID,
|
||||
RuleUID: saveCmdOne.RuleUID,
|
||||
State: models.InstanceStateFiring,
|
||||
Labels: models.InstanceLabels{"test": "meow"},
|
||||
labels = models.InstanceLabels{"test": "testValue2"}
|
||||
_, hash, _ = labels.StringAndHash()
|
||||
instance2 := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: instance1.RuleOrgID,
|
||||
RuleUID: instance1.RuleUID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
Labels: labels,
|
||||
}
|
||||
err = dbstore.SaveAlertInstance(ctx, saveCmdTwo)
|
||||
err = dbstore.SaveAlertInstances(ctx, instance2)
|
||||
require.NoError(t, err)
|
||||
|
||||
listQuery := &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: saveCmdOne.RuleOrgID,
|
||||
RuleUID: saveCmdOne.RuleUID,
|
||||
RuleOrgID: instance1.RuleOrgID,
|
||||
RuleUID: instance1.RuleUID,
|
||||
}
|
||||
|
||||
err = dbstore.ListAlertInstances(ctx, listQuery)
|
||||
@ -136,24 +261,32 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
|
||||
require.Len(t, listQuery.Result, 1)
|
||||
})
|
||||
|
||||
t.Run("update instance with same org_id, uid and different labels", func(t *testing.T) {
|
||||
saveCmdOne := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: alertRule4.OrgID,
|
||||
RuleUID: alertRule4.UID,
|
||||
State: models.InstanceStateFiring,
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
t.Run("update instance with same org_id, uid and different state", func(t *testing.T) {
|
||||
labels := models.InstanceLabels{"test": "testValue"}
|
||||
_, hash, _ := labels.StringAndHash()
|
||||
instance1 := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule4.OrgID,
|
||||
RuleUID: alertRule4.UID,
|
||||
LabelsHash: hash,
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
Labels: labels,
|
||||
}
|
||||
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmdOne)
|
||||
err := dbstore.SaveAlertInstances(ctx, instance1)
|
||||
require.NoError(t, err)
|
||||
|
||||
saveCmdTwo := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: saveCmdOne.RuleOrgID,
|
||||
RuleUID: saveCmdOne.RuleUID,
|
||||
State: models.InstanceStateNormal,
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
instance2 := models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: alertRule4.OrgID,
|
||||
RuleUID: instance1.RuleUID,
|
||||
LabelsHash: instance1.LabelsHash,
|
||||
},
|
||||
CurrentState: models.InstanceStateNormal,
|
||||
Labels: instance1.Labels,
|
||||
}
|
||||
err = dbstore.SaveAlertInstance(ctx, saveCmdTwo)
|
||||
err = dbstore.SaveAlertInstances(ctx, instance2)
|
||||
require.NoError(t, err)
|
||||
|
||||
listQuery := &models.ListAlertInstancesQuery{
|
||||
@ -166,9 +299,9 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
|
||||
|
||||
require.Len(t, listQuery.Result, 1)
|
||||
|
||||
require.Equal(t, saveCmdTwo.RuleOrgID, listQuery.Result[0].RuleOrgID)
|
||||
require.Equal(t, saveCmdTwo.RuleUID, listQuery.Result[0].RuleUID)
|
||||
require.Equal(t, saveCmdTwo.Labels, listQuery.Result[0].Labels)
|
||||
require.Equal(t, saveCmdTwo.State, listQuery.Result[0].CurrentState)
|
||||
require.Equal(t, instance2.RuleOrgID, listQuery.Result[0].RuleOrgID)
|
||||
require.Equal(t, instance2.RuleUID, listQuery.Result[0].RuleUID)
|
||||
require.Equal(t, instance2.Labels, listQuery.Result[0].Labels)
|
||||
require.Equal(t, instance2.CurrentState, listQuery.Result[0].CurrentState)
|
||||
})
|
||||
}
|
||||
|
@ -38,16 +38,28 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type FakeFeatures struct {
|
||||
BigTransactions bool
|
||||
}
|
||||
|
||||
func (f *FakeFeatures) IsEnabled(feature string) bool {
|
||||
if feature == featuremgmt.FlagAlertingBigTransactions {
|
||||
return f.BigTransactions
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetupTestEnv initializes a store to used by the tests.
|
||||
func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, *store.DBstore) {
|
||||
t.Helper()
|
||||
func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG, *store.DBstore) {
|
||||
tb.Helper()
|
||||
origNewGuardian := guardian.New
|
||||
guardian.MockDashboardGuardian(&guardian.FakeDashboardGuardian{
|
||||
CanSaveValue: true,
|
||||
CanViewValue: true,
|
||||
CanAdminValue: true,
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
tb.Cleanup(func() {
|
||||
guardian.New = origNewGuardian
|
||||
})
|
||||
|
||||
@ -60,8 +72,8 @@ func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, *
|
||||
*cfg.UnifiedAlerting.Enabled = true
|
||||
|
||||
m := metrics.NewNGAlert(prometheus.NewRegistry())
|
||||
sqlStore := sqlstore.InitTestDB(t)
|
||||
secretsService := secretsManager.SetupTestService(t, database.ProvideSecretsStore(sqlStore))
|
||||
sqlStore := sqlstore.InitTestDB(tb)
|
||||
secretsService := secretsManager.SetupTestService(tb, database.ProvideSecretsStore(sqlStore))
|
||||
dashboardStore := databasestore.ProvideDashboardStore(sqlStore, featuremgmt.WithFeatures(), tagimpl.ProvideService(sqlStore, sqlStore.Cfg))
|
||||
|
||||
ac := acmock.New()
|
||||
@ -82,12 +94,13 @@ func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, *
|
||||
)
|
||||
|
||||
ng, err := ngalert.ProvideService(
|
||||
cfg, nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, nil,
|
||||
cfg, &FakeFeatures{}, nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, nil,
|
||||
secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac, annotationstest.NewFakeAnnotationsRepo(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
return ng, &store.DBstore{
|
||||
SQLStore: ng.SQLStore,
|
||||
FeatureToggles: ng.FeatureToggles,
|
||||
SQLStore: ng.SQLStore,
|
||||
Cfg: setting.UnifiedAlertingSettings{
|
||||
BaseInterval: baseInterval * time.Second,
|
||||
},
|
||||
@ -98,11 +111,11 @@ func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, *
|
||||
}
|
||||
|
||||
// CreateTestAlertRule creates a dummy alert definition to be used by the tests.
|
||||
func CreateTestAlertRule(t *testing.T, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64) *models.AlertRule {
|
||||
func CreateTestAlertRule(t testing.TB, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64) *models.AlertRule {
|
||||
return CreateTestAlertRuleWithLabels(t, ctx, dbstore, intervalSeconds, orgID, nil)
|
||||
}
|
||||
|
||||
func CreateTestAlertRuleWithLabels(t *testing.T, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64, labels map[string]string) *models.AlertRule {
|
||||
func CreateTestAlertRuleWithLabels(t testing.TB, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64, labels map[string]string) *models.AlertRule {
|
||||
ruleGroup := fmt.Sprintf("ruleGroup-%s", util.GenerateShortUID())
|
||||
folderUID := "namespace"
|
||||
user := &user.SignedInUser{
|
||||
|
@ -235,7 +235,6 @@ func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCo
|
||||
}
|
||||
columnsStr := strings.Builder{}
|
||||
onConflictStr := strings.Builder{}
|
||||
colPlaceHoldersStr := strings.Builder{}
|
||||
setStr := strings.Builder{}
|
||||
|
||||
const separator = ", "
|
||||
@ -246,8 +245,7 @@ func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCo
|
||||
}
|
||||
|
||||
columnsStr.WriteString(fmt.Sprintf("%s%s", db.Quote(c), separatorVar))
|
||||
colPlaceHoldersStr.WriteString(fmt.Sprintf("?%s", separatorVar))
|
||||
setStr.WriteString(fmt.Sprintf("%s=excluded.%s%s", db.Quote(c), db.Quote(c), separatorVar))
|
||||
setStr.WriteString(fmt.Sprintf("%s=EXCLUDED.%s%s", db.Quote(c), db.Quote(c), separatorVar))
|
||||
}
|
||||
|
||||
separatorVar = separator
|
||||
@ -260,21 +258,36 @@ func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCo
|
||||
|
||||
valuesStr := strings.Builder{}
|
||||
separatorVar = separator
|
||||
colPlaceHolders := colPlaceHoldersStr.String()
|
||||
nextPlaceHolder := 1
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
if i == count-1 {
|
||||
separatorVar = ""
|
||||
}
|
||||
|
||||
colPlaceHoldersStr := strings.Builder{}
|
||||
placeHolderSep := separator
|
||||
for j := 1; j <= len(updateCols); j++ {
|
||||
if j == len(updateCols) {
|
||||
placeHolderSep = ""
|
||||
}
|
||||
placeHolder := fmt.Sprintf("$%v%s", nextPlaceHolder, placeHolderSep)
|
||||
nextPlaceHolder++
|
||||
colPlaceHoldersStr.WriteString(placeHolder)
|
||||
}
|
||||
colPlaceHolders := colPlaceHoldersStr.String()
|
||||
|
||||
valuesStr.WriteString(fmt.Sprintf("(%s)%s", colPlaceHolders, separatorVar))
|
||||
}
|
||||
|
||||
s := fmt.Sprintf(`INSERT INTO %s (%s) VALUES %s ON CONFLICT(%s) DO UPDATE SET %s`,
|
||||
s := fmt.Sprintf(`INSERT INTO %s (%s) VALUES %s ON CONFLICT (%s) DO UPDATE SET %s;`,
|
||||
tableName,
|
||||
columnsStr.String(),
|
||||
valuesStr.String(),
|
||||
onConflictStr.String(),
|
||||
setStr.String(),
|
||||
)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,7 @@ func TestUpsertMultiple(t *testing.T) {
|
||||
[]string{"key1", "key2", "val1", "val2"},
|
||||
1,
|
||||
false,
|
||||
"INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES (?, ?, ?, ?) ON CONFLICT(\"key1\", \"key2\") DO UPDATE SET \"key1\"=excluded.\"key1\", \"key2\"=excluded.\"key2\", \"val1\"=excluded.\"val1\", \"val2\"=excluded.\"val2\"",
|
||||
"INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES ($1, $2, $3, $4) ON CONFLICT (\"key1\", \"key2\") DO UPDATE SET \"key1\"=EXCLUDED.\"key1\", \"key2\"=EXCLUDED.\"key2\", \"val1\"=EXCLUDED.\"val1\", \"val2\"=EXCLUDED.\"val2\";",
|
||||
"INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `val1`=VALUES(`val1`), `val2`=VALUES(`val2`)",
|
||||
"INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?) ON CONFLICT(`key1`, `key2`) DO UPDATE SET `key1`=excluded.`key1`, `key2`=excluded.`key2`, `val1`=excluded.`val1`, `val2`=excluded.`val2`",
|
||||
},
|
||||
@ -33,7 +33,7 @@ func TestUpsertMultiple(t *testing.T) {
|
||||
[]string{"key1", "key2", "val1", "val2"},
|
||||
2,
|
||||
false,
|
||||
"INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES (?, ?, ?, ?), (?, ?, ?, ?) ON CONFLICT(\"key1\", \"key2\") DO UPDATE SET \"key1\"=excluded.\"key1\", \"key2\"=excluded.\"key2\", \"val1\"=excluded.\"val1\", \"val2\"=excluded.\"val2\"",
|
||||
"INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES ($1, $2, $3, $4), ($5, $6, $7, $8) ON CONFLICT (\"key1\", \"key2\") DO UPDATE SET \"key1\"=EXCLUDED.\"key1\", \"key2\"=EXCLUDED.\"key2\", \"val1\"=EXCLUDED.\"val1\", \"val2\"=EXCLUDED.\"val2\";",
|
||||
"INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?), (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `val1`=VALUES(`val1`), `val2`=VALUES(`val2`)",
|
||||
"INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?), (?, ?, ?, ?) ON CONFLICT(`key1`, `key2`) DO UPDATE SET `key1`=excluded.`key1`, `key2`=excluded.`key2`, `val1`=excluded.`val1`, `val2`=excluded.`val2`",
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user