grafana/pkg/services/ngalert/store/instance_database.go
2024-01-22 13:07:11 +01:00

234 lines
7.3 KiB
Go

package store
import (
"context"
"fmt"
"sort"
"strings"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
// ListAlertInstances is a handler for retrieving alert instances within specific organisation
// based on various filters.
func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) {
err = st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
alertInstances := make([]*models.AlertInstance, 0)
s := strings.Builder{}
params := make([]any, 0)
addToQuery := func(stmt string, p ...any) {
s.WriteString(stmt)
params = append(params, p...)
}
addToQuery("SELECT * FROM alert_instance WHERE rule_org_id = ?", cmd.RuleOrgID)
if cmd.RuleUID != "" {
addToQuery(` AND rule_uid = ?`, cmd.RuleUID)
}
if st.FeatureToggles.IsEnabled(ctx, featuremgmt.FlagAlertingNoNormalState) {
s.WriteString(fmt.Sprintf(" AND NOT (current_state = '%s' AND current_reason = '')", models.InstanceStateNormal))
}
if err := sess.SQL(s.String(), params...).Find(&alertInstances); err != nil {
return err
}
result = alertInstances
return nil
})
return result, err
}
// SaveAlertInstance is a handler for saving a new alert instance.
func (st DBstore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error {
return st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
if err := models.ValidateAlertInstance(alertInstance); err != nil {
return err
}
labelTupleJSON, err := alertInstance.Labels.StringKey()
if err != nil {
return err
}
params := append(make([]any, 0), alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(), alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix(), alertInstance.ResultFingerprint)
upsertSQL := st.SQLStore.GetDialect().UpsertSQL(
"alert_instance",
[]string{"rule_org_id", "rule_uid", "labels_hash"},
[]string{"rule_org_id", "rule_uid", "labels", "labels_hash", "current_state", "current_reason", "current_state_since", "current_state_end", "last_eval_time", "result_fingerprint"})
_, err = sess.SQL(upsertSQL, params...).Query()
if err != nil {
return err
}
return nil
})
}
func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) {
orgIds := []int64{}
err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
s := strings.Builder{}
params := make([]any, 0)
addToQuery := func(stmt string, p ...any) {
s.WriteString(stmt)
params = append(params, p...)
}
addToQuery("SELECT DISTINCT rule_org_id FROM alert_instance")
if err := sess.SQL(s.String(), params...).Find(&orgIds); err != nil {
return err
}
return nil
})
return orgIds, err
}
// DeleteAlertInstances deletes instances with the provided keys in a single transaction.
func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
if len(keys) == 0 {
return nil
}
type data struct {
ruleOrgID int64
ruleUID string
labelHashes []any
}
// Sort by org and rule UID. Most callers will have grouped already, but it's
// cheap to verify and leads to more compact transactions.
sort.Slice(keys, func(i, j int) bool {
aye := keys[i]
jay := keys[j]
if aye.RuleOrgID < jay.RuleOrgID {
return true
}
if aye.RuleOrgID == jay.RuleOrgID && aye.RuleUID < jay.RuleUID {
return true
}
return false
})
maxRows := 200
rowData := data{
0, "", make([]any, 0, maxRows),
}
placeholdersBuilder := strings.Builder{}
placeholdersBuilder.WriteString("(")
execQuery := func(s *db.Session, rd data, placeholders string) error {
if len(rd.labelHashes) == 0 {
return nil
}
placeholders = strings.TrimRight(placeholders, ", ")
placeholders = placeholders + ")"
queryString := fmt.Sprintf(
"DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash IN %s;",
placeholders,
)
execArgs := make([]any, 0, 3+len(rd.labelHashes))
execArgs = append(execArgs, queryString, rd.ruleOrgID, rd.ruleUID)
execArgs = append(execArgs, rd.labelHashes...)
_, err := s.Exec(execArgs...)
if err != nil {
return err
}
return nil
}
err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
counter := 0
// Create batches of up to 200 items and execute a new delete statement for each batch.
for _, k := range keys {
counter++
// When a rule ID changes or we hit 200 hashes, issue a statement.
if rowData.ruleOrgID != k.RuleOrgID || rowData.ruleUID != k.RuleUID || len(rowData.labelHashes) >= 200 {
err := execQuery(sess, rowData, placeholdersBuilder.String())
if err != nil {
return err
}
// reset our reused data.
rowData.ruleOrgID = k.RuleOrgID
rowData.ruleUID = k.RuleUID
rowData.labelHashes = rowData.labelHashes[:0]
placeholdersBuilder.Reset()
placeholdersBuilder.WriteString("(")
}
// Accumulate new values.
rowData.labelHashes = append(rowData.labelHashes, k.LabelsHash)
placeholdersBuilder.WriteString("?, ")
}
// Delete any remaining rows.
if len(rowData.labelHashes) != 0 {
err := execQuery(sess, rowData, placeholdersBuilder.String())
if err != nil {
return err
}
}
return nil
})
return err
}
func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error {
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ?", key.OrgID, key.UID)
return err
})
}
func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance) error {
if len(instances) == 0 {
return nil
}
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
// First we delete all records from the table
if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
return fmt.Errorf("failed to delete alert_instance table: %w", err)
}
for _, alertInstance := range instances {
if err := models.ValidateAlertInstance(alertInstance); err != nil {
st.Logger.Warn("Failed to validate alert instance, skipping", "err", err, "rule_uid", alertInstance.RuleUID)
continue
}
labelTupleJSON, err := alertInstance.Labels.StringKey()
if err != nil {
st.Logger.Warn("Failed to generate alert instance labels key, skipping", "err", err, "rule_uid", alertInstance.RuleUID)
continue
}
_, err = sess.Exec("INSERT INTO alert_instance (rule_org_id, rule_uid, labels, labels_hash, current_state, current_reason, current_state_since, current_state_end, last_eval_time) VALUES (?,?,?,?,?,?,?,?,?)",
alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(), alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix())
if err != nil {
return fmt.Errorf("failed to insert into alert_instance table: %w", err)
}
}
if err := sess.Commit(); err != nil {
return fmt.Errorf("failed to commit alert_instance table: %w", err)
}
return nil
})
}