AlertingNG: Create a scheduler to evaluate alert definitions (#29305)

* Always use cache: stop passing skipCache among ngalert functions

* Add updated column

* Scheduler initial draft

* Add retry on failure

* Allow setting/updating alert definition interval

Set default interval if no interval is provided during alert definition creation.
Keep existing alert definition interval if no interval is provided during alert definition update.

* Parameterise alerting.Ticker to run on custom interval

* Allow updating alert definition interval without having to provide the queries and expressions

* Add schedule tests

* Use xorm tags for having initialisms with consistent case in Go

* Add ability to pause/unpause the scheduler

* Add alert definition versioning

* Optimise scheduler to fetch alert definition only when it's necessary

* Change MySQL data column to mediumtext

* Delete alert definition versions

* Increase default scheduler interval to 10 seconds

* Fix setting OrgID on updates

* Add validation for alert definition name length

* Recreate tables
This commit is contained in:
Sofia Papagiannaki 2020-12-17 16:00:09 +02:00 committed by GitHub
parent 7d5c07549a
commit 3cac10e598
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1204 additions and 260 deletions

View File

@ -46,7 +46,7 @@ func (e *AlertEngine) IsDisabled() bool {
// Init initializes the AlertingService.
func (e *AlertEngine) Init() error {
e.ticker = NewTicker(time.Now(), time.Second*0, clock.New())
e.ticker = NewTicker(time.Now(), time.Second*0, clock.New(), 1)
e.execQueue = make(chan *Job, 1000)
e.scheduler = newScheduler()
e.evalHandler = NewEvalHandler()

View File

@ -9,7 +9,7 @@ import (
// Ticker is a ticker to power the alerting scheduler. it's like a time.Ticker, except:
// * it doesn't drop ticks for slow receivers, rather, it queues up. so that callers are in control to instrument what's going on.
// * it automatically ticks every intervalSec seconds, which is the right thing in our current design
// * it ticks on second marks or very shortly after. this provides a predictable load pattern
// * it ticks on intervalSec marks or very shortly after. this provides a predictable load pattern
// (this shouldn't cause too much load contention issues because the next steps in the pipeline just process at their own pace)
// * the timestamps are used to mark "last datapoint to query for" and as such, are a configurable amount of seconds in the past
// * because we want to allow:
@ -17,21 +17,24 @@ import (
// - adjusting offset over time to compensate for storage backing up or getting fast and providing lower latency
// you specify a lastProcessed timestamp as well as an offset at creation, or runtime
type Ticker struct {
C chan time.Time
clock clock.Clock
last time.Time
offset time.Duration
newOffset chan time.Duration
C chan time.Time
clock clock.Clock
last time.Time
offset time.Duration
newOffset chan time.Duration
intervalSec int64
paused bool
}
// NewTicker returns a ticker that ticks on second marks or very shortly after, and never drops ticks
func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock) *Ticker {
// NewTicker returns a ticker that ticks on intervalSec marks or very shortly after, and never drops ticks
func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock, intervalSec int64) *Ticker {
t := &Ticker{
C: make(chan time.Time),
clock: c,
last: last,
offset: initialOffset,
newOffset: make(chan time.Duration),
C: make(chan time.Time),
clock: c,
last: last,
offset: initialOffset,
newOffset: make(chan time.Duration),
intervalSec: intervalSec,
}
go t.run()
return t
@ -39,10 +42,12 @@ func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock) *Tick
func (t *Ticker) run() {
for {
next := t.last.Add(time.Duration(1) * time.Second)
next := t.last.Add(time.Duration(t.intervalSec) * time.Second)
diff := t.clock.Now().Add(-t.offset).Sub(next)
if diff >= 0 {
t.C <- next
if !t.paused {
t.C <- next
}
t.last = next
continue
}
@ -54,3 +59,18 @@ func (t *Ticker) run() {
}
}
}
// ResetOffset resets the offset by sending duration on the ticker's newOffset
// channel. The channel is created unbuffered in NewTicker, so this call blocks
// until a receiver takes the value (presumably the run loop — its receiving
// side is not visible here, confirm).
func (t *Ticker) ResetOffset(duration time.Duration) {
t.newOffset <- duration
}
// Pause pauses the ticker: while it is paused no ticks are sent on C.
// The run loop keeps advancing its last-processed timestamp while paused, so
// ticks that come due during a pause are skipped rather than queued up.
// NOTE(review): paused is written here and read by the goroutine started in
// NewTicker with no synchronization visible in this file — confirm this is
// not a data race.
func (t *Ticker) Pause() {
t.paused = true
}
// Unpause unpauses the ticker so that ticks are sent on C again.
// NOTE(review): paused is written here and read by the goroutine started in
// NewTicker with no synchronization visible in this file — confirm this is
// not a data race.
func (t *Ticker) Unpause() {
t.paused = false
}

View File

@ -1,6 +1,12 @@
package ngalert
import "fmt"
import (
"fmt"
"time"
)
// timeNow makes it possible to test usage of time
var timeNow = time.Now
// preSave sets datasource and loads the updated model for each alert query.
func (alertDefinition *AlertDefinition) preSave() error {
@ -11,5 +17,6 @@ func (alertDefinition *AlertDefinition) preSave() error {
}
alertDefinition.Data[i] = q
}
alertDefinition.Updated = timeNow()
return nil
}

View File

@ -1,17 +1,13 @@
package ngalert
import (
"context"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/go-macaron/binding"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/api"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/util"
)
@ -19,40 +15,29 @@ import (
func (ng *AlertNG) registerAPIEndpoints() {
ng.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) {
alertDefinitions.Get("", middleware.ReqSignedIn, api.Wrap(ng.listAlertDefinitions))
alertDefinitions.Get("/eval/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.alertDefinitionEval))
alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), api.Wrap(ng.conditionEval))
alertDefinitions.Get("/eval/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.alertDefinitionEvalEndpoint))
alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), api.Wrap(ng.conditionEvalEndpoint))
alertDefinitions.Get("/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.getAlertDefinitionEndpoint))
alertDefinitions.Delete("/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.deleteAlertDefinitionEndpoint))
alertDefinitions.Post("/", middleware.ReqSignedIn, binding.Bind(saveAlertDefinitionCommand{}), api.Wrap(ng.createAlertDefinitionEndpoint))
alertDefinitions.Put("/:alertDefinitionId", ng.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), api.Wrap(ng.updateAlertDefinitionEndpoint))
})
ng.RouteRegister.Group("/api/ngalert/", func(schedulerRouter routing.RouteRegister) {
schedulerRouter.Post("/pause", api.Wrap(ng.pauseScheduler))
schedulerRouter.Post("/unpause", api.Wrap(ng.unpauseScheduler))
}, middleware.ReqOrgAdmin)
}
// conditionEval handles POST /api/alert-definitions/eval.
func (ng *AlertNG) conditionEval(c *models.ReqContext, dto evalAlertConditionCommand) api.Response {
alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
defer cancelFn()
alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser}
fromStr := c.Query("from")
if fromStr == "" {
fromStr = "now-3h"
// conditionEvalEndpoint handles POST /api/alert-definitions/eval.
func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, dto evalAlertConditionCommand) api.Response {
if err := ng.validateCondition(dto.Condition, c.SignedInUser); err != nil {
return api.Error(400, "invalid condition", err)
}
toStr := c.Query("to")
if toStr == "" {
toStr = "now"
}
execResult, err := dto.Condition.Execute(alertExecCtx, fromStr, toStr)
evalResults, err := eval.ConditionEval(&dto.Condition, timeNow())
if err != nil {
return api.Error(400, "Failed to execute conditions", err)
}
evalResults, err := eval.EvaluateExecutionResult(execResult)
if err != nil {
return api.Error(400, "Failed to evaluate results", err)
return api.Error(400, "Failed to evaluate conditions", err)
}
frame := evalResults.AsDataFrame()
@ -67,48 +52,34 @@ func (ng *AlertNG) conditionEval(c *models.ReqContext, dto evalAlertConditionCom
})
}
// alertDefinitionEval handles GET /api/alert-definitions/eval/:dashboardId/:panelId/:refId".
func (ng *AlertNG) alertDefinitionEval(c *models.ReqContext) api.Response {
// alertDefinitionEvalEndpoint handles GET /api/alert-definitions/eval/:alertDefinitionId.
func (ng *AlertNG) alertDefinitionEvalEndpoint(c *models.ReqContext) api.Response {
alertDefinitionID := c.ParamsInt64(":alertDefinitionId")
fromStr := c.Query("from")
if fromStr == "" {
fromStr = "now-3h"
}
toStr := c.Query("to")
if toStr == "" {
toStr = "now"
}
conditions, err := ng.LoadAlertCondition(alertDefinitionID, c.SignedInUser, c.SkipCache)
condition, err := ng.LoadAlertCondition(alertDefinitionID)
if err != nil {
return api.Error(400, "Failed to load conditions", err)
return api.Error(400, "Failed to load alert definition conditions", err)
}
alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
defer cancelFn()
if err := ng.validateCondition(*condition, c.SignedInUser); err != nil {
return api.Error(400, "invalid condition", err)
}
alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser}
execResult, err := conditions.Execute(alertExecCtx, fromStr, toStr)
evalResults, err := eval.ConditionEval(condition, timeNow())
if err != nil {
return api.Error(400, "Failed to execute conditions", err)
return api.Error(400, "Failed to evaludate alert", err)
}
evalResults, err := eval.EvaluateExecutionResult(execResult)
if err != nil {
return api.Error(400, "Failed to evaluate results", err)
}
frame := evalResults.AsDataFrame()
df := tsdb.NewDecodedDataFrames([]*data.Frame{&frame})
if err != nil {
return api.Error(400, "Failed to instantiate Dataframes from the decoded frames", err)
}
instances, err := df.Encoded()
if err != nil {
return api.Error(400, "Failed to encode result dataframes", err)
}
return api.JSON(200, util.DynMap{
"instances": instances,
})
@ -133,51 +104,80 @@ func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) api.Response
func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) api.Response {
alertDefinitionID := c.ParamsInt64(":alertDefinitionId")
query := deleteAlertDefinitionByIDQuery{
cmd := deleteAlertDefinitionByIDCommand{
ID: alertDefinitionID,
OrgID: c.SignedInUser.OrgId,
}
if err := ng.deleteAlertDefinitionByID(&query); err != nil {
if err := ng.deleteAlertDefinitionByID(&cmd); err != nil {
return api.Error(500, "Failed to delete alert definition", err)
}
return api.JSON(200, util.DynMap{"affectedRows": query.RowsAffected})
if cmd.RowsAffected != 1 {
ng.log.Warn("unexpected number of rows affected on alert definition delete", "definitionID", alertDefinitionID, "rowsAffected", cmd.RowsAffected)
}
return api.Success("Alert definition deleted")
}
// updateAlertDefinitionEndpoint handles PUT /api/alert-definitions/:alertDefinitionId.
func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) api.Response {
cmd.ID = c.ParamsInt64(":alertDefinitionId")
cmd.SignedInUser = c.SignedInUser
cmd.SkipCache = c.SkipCache
cmd.OrgID = c.SignedInUser.OrgId
if err := ng.validateCondition(cmd.Condition, c.SignedInUser); err != nil {
return api.Error(400, "invalid condition", err)
}
if err := ng.updateAlertDefinition(&cmd); err != nil {
return api.Error(500, "Failed to update alert definition", err)
}
return api.JSON(200, util.DynMap{"affectedRows": cmd.RowsAffected, "id": cmd.Result.Id})
if cmd.RowsAffected != 1 {
ng.log.Warn("unexpected number of rows affected on alert definition update", "definitionID", cmd.ID, "rowsAffected", cmd.RowsAffected)
}
return api.Success("Alert definition updated")
}
// createAlertDefinitionEndpoint handles POST /api/alert-definitions.
func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) api.Response {
cmd.OrgID = c.SignedInUser.OrgId
cmd.SignedInUser = c.SignedInUser
cmd.SkipCache = c.SkipCache
if err := ng.validateCondition(cmd.Condition, c.SignedInUser); err != nil {
return api.Error(400, "invalid condition", err)
}
if err := ng.saveAlertDefinition(&cmd); err != nil {
return api.Error(500, "Failed to create alert definition", err)
}
return api.JSON(200, util.DynMap{"id": cmd.Result.Id})
return api.JSON(200, util.DynMap{"id": cmd.Result.ID})
}
// listAlertDefinitions handles GET /api/alert-definitions.
func (ng *AlertNG) listAlertDefinitions(c *models.ReqContext) api.Response {
cmd := listAlertDefinitionsCommand{OrgID: c.SignedInUser.OrgId}
query := listAlertDefinitionsQuery{OrgID: c.SignedInUser.OrgId}
if err := ng.getAlertDefinitions(&cmd); err != nil {
if err := ng.getOrgAlertDefinitions(&query); err != nil {
return api.Error(500, "Failed to list alert definitions", err)
}
return api.JSON(200, util.DynMap{"results": cmd.Result})
return api.JSON(200, util.DynMap{"results": query.Result})
}
// pauseScheduler is the handler behind the "pause" route of the /api/ngalert
// group. It asks the alert definition scheduler to pause and answers with a
// 500 error response if that fails, or a 200 JSON confirmation otherwise.
func (ng *AlertNG) pauseScheduler() api.Response {
	if err := ng.schedule.pause(); err != nil {
		return api.Error(500, "Failed to pause scheduler", err)
	}

	body := util.DynMap{"message": "alert definition scheduler paused"}
	return api.JSON(200, body)
}
// unpauseScheduler is the handler behind the "unpause" route of the
// /api/ngalert group. It asks the alert definition scheduler to resume and
// answers with a 500 error response if that fails, or a 200 JSON confirmation
// otherwise.
func (ng *AlertNG) unpauseScheduler() api.Response {
	if err := ng.schedule.unpause(); err != nil {
		return api.Error(500, "Failed to unpause scheduler", err)
	}

	body := util.DynMap{"message": "alert definition scheduler unpaused"}
	return api.JSON(200, body)
}

View File

@ -0,0 +1,85 @@
package ngalert
import (
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/require"
)
// setupTestEnv builds an AlertNG service for tests: it enables the "ngalert"
// feature toggle, registers a registry override for the service, attaches a
// fresh test database and initialises the service, failing the test if
// initialisation returns an error.
//
// NOTE(review): overrideAlertNGInRegistry returns the struct by value, so the
// instance returned here is a copy of the one whose address was handed to the
// registry override — confirm that is intended.
func setupTestEnv(t *testing.T) *AlertNG {
	testCfg := setting.NewCfg()
	testCfg.FeatureToggles = map[string]bool{"ngalert": true}

	service := overrideAlertNGInRegistry(testCfg)
	service.SQLStore = sqlstore.InitTestDB(t)

	initErr := service.Init()
	require.NoError(t, initErr)

	return &service
}
// overrideAlertNGInRegistry creates an AlertNG configured with cfg, a new
// route register and a test logger (no SQL store yet), and registers a
// registry override that substitutes it for any descriptor whose instance is
// an *AlertNG, keeping that descriptor's init priority.
// The service is returned by value, while the override holds a pointer to the
// local variable.
func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG {
	ng := AlertNG{
		Cfg:           cfg,
		RouteRegister: routing.NewRouteRegister(),
		log:           log.New("ngalert-test"),
	}

	registry.RegisterOverride(func(descriptor registry.Descriptor) (*registry.Descriptor, bool) {
		// Leave every service other than AlertNG untouched.
		if _, isAlertNG := descriptor.Instance.(*AlertNG); !isAlertNG {
			return nil, false
		}

		replacement := registry.Descriptor{
			Name:         "AlertNG",
			Instance:     &ng,
			InitPriority: descriptor.InitPriority,
		}
		return &replacement, true
	})

	return ng
}
// createTestAlertDefinition saves an alert definition for org 1 with a
// randomly suffixed title, a single math expression query (RefID "A") and the
// given evaluation interval. It fails the test if saving returns an error and
// returns the saved definition.
func createTestAlertDefinition(t *testing.T, ng *AlertNG, intervalSeconds int64) *AlertDefinition {
	mathQuery := eval.AlertQuery{
		Model: json.RawMessage(`{
"datasource": "__expr__",
"type":"math",
"expression":"2 + 2 > 1"
}`),
		RelativeTimeRange: eval.RelativeTimeRange{
			From: eval.Duration(5 * time.Hour),
			To:   eval.Duration(3 * time.Hour),
		},
		RefID: "A",
	}

	cmd := saveAlertDefinitionCommand{
		OrgID: 1,
		Title: fmt.Sprintf("an alert definition %d", rand.Intn(1000)),
		Condition: eval.Condition{
			RefID:                 "A",
			QueriesAndExpressions: []eval.AlertQuery{mathQuery},
		},
		IntervalSeconds: &intervalSeconds,
	}

	saveErr := ng.saveAlertDefinition(&cmd)
	require.NoError(t, saveErr)

	t.Logf("alert definition: %d with interval: %d created", cmd.Result.ID, intervalSeconds)
	return cmd.Result
}

View File

@ -2,8 +2,11 @@ package ngalert
import (
"context"
"errors"
"fmt"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/util"
)
func getAlertDefinitionByID(alertDefinitionID int64, sess *sqlstore.DBSession) (*AlertDefinition, error) {
@ -20,9 +23,9 @@ func getAlertDefinitionByID(alertDefinitionID int64, sess *sqlstore.DBSession) (
// deleteAlertDefinitionByID is a handler for deleting an alert definition.
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
func (ng *AlertNG) deleteAlertDefinitionByID(query *deleteAlertDefinitionByIDQuery) error {
func (ng *AlertNG) deleteAlertDefinitionByID(cmd *deleteAlertDefinitionByIDCommand) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
res, err := sess.Exec("DELETE FROM alert_definition WHERE id = ?", query.ID)
res, err := sess.Exec("DELETE FROM alert_definition WHERE id = ?", cmd.ID)
if err != nil {
return err
}
@ -31,7 +34,13 @@ func (ng *AlertNG) deleteAlertDefinitionByID(query *deleteAlertDefinitionByIDQue
if err != nil {
return err
}
query.RowsAffected = rowsAffected
cmd.RowsAffected = rowsAffected
_, err = sess.Exec("DELETE FROM alert_definition_version WHERE alert_definition_id = ?", cmd.ID)
if err != nil {
return err
}
return nil
})
}
@ -52,14 +61,29 @@ func (ng *AlertNG) getAlertDefinitionByID(query *getAlertDefinitionByIDQuery) er
// saveAlertDefinition is a handler for saving a new alert definition.
func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alertDefinition := &AlertDefinition{
OrgId: cmd.OrgID,
Name: cmd.Name,
Condition: cmd.Condition.RefID,
Data: cmd.Condition.QueriesAndExpressions,
intervalSeconds := defaultIntervalSeconds
if cmd.IntervalSeconds != nil {
intervalSeconds = *cmd.IntervalSeconds
}
if err := ng.validateAlertDefinition(alertDefinition, cmd.SignedInUser, cmd.SkipCache); err != nil {
var initialVersion int64 = 1
uid, err := generateNewAlertDefinitionUID(sess, cmd.OrgID)
if err != nil {
return fmt.Errorf("failed to generate UID for alert definition %q: %w", cmd.Title, err)
}
alertDefinition := &AlertDefinition{
OrgID: cmd.OrgID,
Title: cmd.Title,
Condition: cmd.Condition.RefID,
Data: cmd.Condition.QueriesAndExpressions,
IntervalSeconds: intervalSeconds,
Version: initialVersion,
UID: uid,
}
if err := ng.validateAlertDefinition(alertDefinition, false); err != nil {
return err
}
@ -71,6 +95,20 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
return err
}
alertDefVersion := AlertDefinitionVersion{
AlertDefinitionID: alertDefinition.ID,
AlertDefinitionUID: alertDefinition.UID,
Version: alertDefinition.Version,
Created: alertDefinition.Updated,
Condition: alertDefinition.Condition,
Title: alertDefinition.Title,
Data: alertDefinition.Data,
IntervalSeconds: alertDefinition.IntervalSeconds,
}
if _, err := sess.Insert(alertDefVersion); err != nil {
return err
}
cmd.Result = alertDefinition
return nil
})
@ -81,13 +119,17 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alertDefinition := &AlertDefinition{
Id: cmd.ID,
Name: cmd.Name,
ID: cmd.ID,
Title: cmd.Title,
Condition: cmd.Condition.RefID,
Data: cmd.Condition.QueriesAndExpressions,
OrgID: cmd.OrgID,
}
if cmd.IntervalSeconds != nil {
alertDefinition.IntervalSeconds = *cmd.IntervalSeconds
}
if err := ng.validateAlertDefinition(alertDefinition, cmd.SignedInUser, cmd.SkipCache); err != nil {
if err := ng.validateAlertDefinition(alertDefinition, true); err != nil {
return err
}
@ -95,27 +137,101 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro
return err
}
existingAlertDefinition, err := getAlertDefinitionByID(alertDefinition.ID, sess)
if err != nil {
if errors.Is(err, errAlertDefinitionNotFound) {
cmd.Result = alertDefinition
cmd.RowsAffected = 0
return nil
}
return err
}
alertDefinition.Version = existingAlertDefinition.Version + 1
affectedRows, err := sess.ID(cmd.ID).Update(alertDefinition)
if err != nil {
return err
}
title := cmd.Title
if title == "" {
title = existingAlertDefinition.Title
}
condition := cmd.Condition.RefID
if condition == "" {
condition = existingAlertDefinition.Condition
}
data := cmd.Condition.QueriesAndExpressions
if data == nil {
data = existingAlertDefinition.Data
}
intervalSeconds := cmd.IntervalSeconds
if intervalSeconds == nil {
intervalSeconds = &existingAlertDefinition.IntervalSeconds
}
alertDefVersion := AlertDefinitionVersion{
AlertDefinitionID: alertDefinition.ID,
AlertDefinitionUID: existingAlertDefinition.UID,
ParentVersion: existingAlertDefinition.Version,
Version: alertDefinition.Version,
Condition: condition,
Created: alertDefinition.Updated,
Title: title,
Data: data,
IntervalSeconds: *intervalSeconds,
}
if _, err := sess.Insert(alertDefVersion); err != nil {
return err
}
cmd.Result = alertDefinition
cmd.RowsAffected = affectedRows
return nil
})
}
// getAlertDefinitions is a handler for retrieving alert definitions of specific organisation.
func (ng *AlertNG) getAlertDefinitions(cmd *listAlertDefinitionsCommand) error {
// getOrgAlertDefinitions is a handler for retrieving alert definitions of specific organisation.
func (ng *AlertNG) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alertDefinitions := make([]*AlertDefinition, 0)
q := "SELECT * FROM alert_definition WHERE org_id = ?"
if err := sess.SQL(q, cmd.OrgID).Find(&alertDefinitions); err != nil {
if err := sess.SQL(q, query.OrgID).Find(&alertDefinitions); err != nil {
return err
}
cmd.Result = alertDefinitions
query.Result = alertDefinitions
return nil
})
}
// getAlertDefinitions loads the id, interval_seconds and version columns of
// every row in alert_definition (no organisation filter) and stores the
// resulting slice in query.Result. The result is an empty, non-nil slice when
// the table has no rows.
func (ng *AlertNG) getAlertDefinitions(query *listAlertDefinitionsQuery) error {
	const selectAll = "SELECT id, interval_seconds, version FROM alert_definition"

	fetch := func(sess *sqlstore.DBSession) error {
		definitions := []*AlertDefinition{}
		if err := sess.SQL(selectAll).Find(&definitions); err != nil {
			return err
		}

		query.Result = definitions
		return nil
	}

	return ng.SQLStore.WithTransactionalDbSession(context.Background(), fetch)
}
// generateNewAlertDefinitionUID makes up to three attempts to produce a short
// UID that no alert definition in the given organisation already uses. It
// returns the first unused UID, any error raised by the lookup, or
// errAlertDefinitionFailedGenerateUniqueUID once all attempts collided.
func generateNewAlertDefinitionUID(sess *sqlstore.DBSession, orgID int64) (string, error) {
	for attempt := 0; attempt < 3; attempt++ {
		candidate := util.GenerateShortUID()

		taken, err := sess.Where("org_id=? AND uid=?", orgID, candidate).Get(&AlertDefinition{})
		switch {
		case err != nil:
			return "", err
		case !taken:
			return candidate, nil
		}
	}

	return "", errAlertDefinitionFailedGenerateUniqueUID
}

View File

@ -0,0 +1,70 @@
package ngalert
import (
"fmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
// addAlertDefinitionMigrations registers, in order, the migrations that drop
// the alert_definition table, recreate it with the columns below, add its two
// indices and finally run a MySQL statement that changes the data column to
// MEDIUMTEXT.
// NOTE(review): the (org_id, uid) index is declared as a plain (non-unique)
// index although UIDs are checked for uniqueness per organisation when they
// are generated — confirm whether a unique index is intended.
func addAlertDefinitionMigrations(mg *migrator.Migrator) {
// drop the table first so that it is recreated with the new schema
mg.AddMigration("delete alert_definition table", migrator.NewDropTableMigration("alert_definition"))
alertDefinition := migrator.Table{
Name: "alert_definition",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "org_id", Type: migrator.DB_BigInt, Nullable: false},
{Name: "title", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "condition", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "data", Type: migrator.DB_Text, Nullable: false},
{Name: "updated", Type: migrator.DB_DateTime, Nullable: false},
// the column default is the package-level defaultIntervalSeconds
{Name: "interval_seconds", Type: migrator.DB_BigInt, Nullable: false, Default: fmt.Sprintf("%d", defaultIntervalSeconds)},
{Name: "version", Type: migrator.DB_Int, Nullable: false, Default: "0"},
{Name: "uid", Type: migrator.DB_NVarchar, Length: 40, Nullable: false, Default: "0"},
},
Indices: []*migrator.Index{
{Cols: []string{"org_id", "title"}, Type: migrator.IndexType},
{Cols: []string{"org_id", "uid"}, Type: migrator.IndexType},
},
}
// create table
mg.AddMigration("recreate alert_definition table", migrator.NewAddTableMigration(alertDefinition))
// create indices
mg.AddMigration("add index in alert_definition on org_id and title columns", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[0]))
mg.AddMigration("add index in alert_definition on org_id and uid columns", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[1]))
// raw SQL is supplied for MySQL only; presumably a no-op on other databases — TODO confirm
mg.AddMigration("alter alert_definition table data column to mediumtext in mysql", migrator.NewRawSQLMigration("").
Mysql("ALTER TABLE alert_definition MODIFY data MEDIUMTEXT;"))
}
// addAlertDefinitionVersionMigrations registers, in order, the migrations that
// drop the alert_definition_version table, recreate it with the columns below,
// add its two unique indices and finally run a MySQL statement that changes
// the data column to MEDIUMTEXT.
func addAlertDefinitionVersionMigrations(mg *migrator.Migrator) {
// drop the table first so that it is recreated with the new schema
mg.AddMigration("delete alert_definition_version table", migrator.NewDropTableMigration("alert_definition_version"))
alertDefinitionVersion := migrator.Table{
Name: "alert_definition_version",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "alert_definition_id", Type: migrator.DB_BigInt},
{Name: "alert_definition_uid", Type: migrator.DB_NVarchar, Length: 40, Nullable: false, Default: "0"},
{Name: "parent_version", Type: migrator.DB_Int, Nullable: false},
{Name: "restored_from", Type: migrator.DB_Int, Nullable: false},
{Name: "version", Type: migrator.DB_Int, Nullable: false},
{Name: "created", Type: migrator.DB_DateTime, Nullable: false},
{Name: "title", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "condition", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "data", Type: migrator.DB_Text, Nullable: false},
{Name: "interval_seconds", Type: migrator.DB_BigInt, Nullable: false},
},
// each version number may appear only once per alert definition id and per alert definition uid
Indices: []*migrator.Index{
{Cols: []string{"alert_definition_id", "version"}, Type: migrator.UniqueIndex},
{Cols: []string{"alert_definition_uid", "version"}, Type: migrator.UniqueIndex},
},
}
mg.AddMigration("recreate alert_definition_version table", migrator.NewAddTableMigration(alertDefinitionVersion))
mg.AddMigration("add index in alert_definition_version table on alert_definition_id and version columns", migrator.NewAddIndexMigration(alertDefinitionVersion, alertDefinitionVersion.Indices[0]))
mg.AddMigration("add index in alert_definition_version table on alert_definition_uid and version columns", migrator.NewAddIndexMigration(alertDefinitionVersion, alertDefinitionVersion.Indices[1]))
// raw SQL is supplied for MySQL only; presumably a no-op on other databases — TODO confirm
mg.AddMigration("alter alert_definition_version table data column to mediumtext in mysql", migrator.NewRawSQLMigration("").
Mysql("ALTER TABLE alert_definition_version MODIFY data MEDIUMTEXT;"))
}

View File

@ -4,51 +4,116 @@ package ngalert
import (
"encoding/json"
"errors"
"testing"
"time"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mockTimeNow swaps the package-level timeNow for a deterministic clock: the
// first call returns the Unix epoch in UTC and every subsequent call returns a
// timestamp one second later than the previous one.
func mockTimeNow() {
	var elapsedSeconds int64

	timeNow = func() time.Time {
		// Advance the fake clock only after the current value has been computed.
		defer func() { elapsedSeconds++ }()
		return time.Unix(elapsedSeconds, 0).UTC()
	}
}
// resetTimeNow points the package-level timeNow back at time.Now, undoing
// mockTimeNow.
func resetTimeNow() {
timeNow = time.Now
}
func TestCreatingAlertDefinition(t *testing.T) {
t.Run("should fail gracefully when creating alert definition with invalid relative time range", func(t *testing.T) {
ng := setupTestEnv(t)
q := saveAlertDefinitionCommand{
OrgID: 1,
Name: "something completely different",
Condition: condition{
RefID: "B",
QueriesAndExpressions: []eval.AlertQuery{
{
Model: json.RawMessage(`{
"datasource": "__expr__",
"type":"math",
"expression":"2 + 3 > 1"
}`),
RefID: "B",
mockTimeNow()
defer resetTimeNow()
var customIntervalSeconds int64 = 120
testCases := []struct {
desc string
inputIntervalSeconds *int64
inputTitle string
expectedError error
expectedInterval int64
expectedUpdated time.Time
}{
{
desc: "should create successfuly an alert definition with default interval",
inputIntervalSeconds: nil,
inputTitle: "a name",
expectedInterval: defaultIntervalSeconds,
expectedUpdated: time.Unix(0, 0).UTC(),
},
{
desc: "should create successfuly an alert definition with custom interval",
inputIntervalSeconds: &customIntervalSeconds,
inputTitle: "another name",
expectedInterval: customIntervalSeconds,
expectedUpdated: time.Unix(1, 0).UTC(),
},
{
desc: "should fail to create an alert definition with too big name",
inputIntervalSeconds: &customIntervalSeconds,
inputTitle: getLongString(alertDefinitionMaxNameLength + 1),
expectedError: errors.New(""),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ng := setupTestEnv(t)
q := saveAlertDefinitionCommand{
OrgID: 1,
Title: tc.inputTitle,
Condition: eval.Condition{
RefID: "B",
QueriesAndExpressions: []eval.AlertQuery{
{
Model: json.RawMessage(`{
"datasource": "__expr__",
"type":"math",
"expression":"2 + 3 > 1"
}`),
RefID: "B",
RelativeTimeRange: eval.RelativeTimeRange{
From: eval.Duration(time.Duration(5) * time.Hour),
To: eval.Duration(time.Duration(3) * time.Hour),
},
},
},
},
},
}
err := ng.saveAlertDefinition(&q)
require.NoError(t, err)
})
}
if tc.inputIntervalSeconds != nil {
q.IntervalSeconds = tc.inputIntervalSeconds
}
err := ng.saveAlertDefinition(&q)
switch {
case tc.expectedError != nil:
require.Error(t, err)
default:
require.NoError(t, err)
assert.Equal(t, tc.expectedUpdated, q.Result.Updated)
assert.Equal(t, tc.expectedInterval, q.Result.IntervalSeconds)
assert.Equal(t, int64(1), q.Result.Version)
}
})
}
}
func TestUpdatingAlertDefinition(t *testing.T) {
mockTimeNow()
defer resetTimeNow()
t.Run("zero rows affected when updating unknown alert", func(t *testing.T) {
ng := setupTestEnv(t)
q := updateAlertDefinitionCommand{
ID: 1,
OrgID: 1,
Name: "something completely different",
Condition: condition{
Title: "something completely different",
Condition: eval.Condition{
RefID: "A",
QueriesAndExpressions: []eval.AlertQuery{
{
@ -72,15 +137,59 @@ func TestUpdatingAlertDefinition(t *testing.T) {
assert.Equal(t, int64(0), q.RowsAffected)
})
t.Run("updating successfully existing alert", func(t *testing.T) {
t.Run("updating existing alert", func(t *testing.T) {
ng := setupTestEnv(t)
alertDefinition := createTestAlertDefinition(t, ng)
var initialInterval int64 = 120
alertDefinition := createTestAlertDefinition(t, ng, initialInterval)
created := alertDefinition.Updated
var customInterval int64 = 30
testCases := []struct {
desc string
inputOrgID int64
inputTitle string
inputInterval *int64
expectedError error
expectedIntervalSeconds int64
expectedUpdated time.Time
}{
{
desc: "should not update previous interval if it's not provided",
inputInterval: nil,
inputOrgID: alertDefinition.OrgID,
inputTitle: "something completely different",
expectedIntervalSeconds: initialInterval,
expectedUpdated: time.Unix(2, 0).UTC(),
},
{
desc: "should update interval if it's provided",
inputInterval: &customInterval,
inputOrgID: alertDefinition.OrgID,
inputTitle: "something completely different",
expectedIntervalSeconds: customInterval,
expectedUpdated: time.Unix(3, 0).UTC(),
},
{
desc: "should not update organisation if it's provided",
inputInterval: &customInterval,
inputOrgID: 0,
inputTitle: "something completely different",
expectedIntervalSeconds: customInterval,
expectedUpdated: time.Unix(4, 0).UTC(),
},
{
desc: "should not update alert definition if the name it's too big",
inputInterval: &customInterval,
inputOrgID: 0,
inputTitle: getLongString(alertDefinitionMaxNameLength + 1),
expectedError: errors.New(""),
},
}
q := updateAlertDefinitionCommand{
ID: (*alertDefinition).Id,
OrgID: 1,
Name: "something completely different",
Condition: condition{
ID: (*alertDefinition).ID,
Title: "something completely different",
Condition: eval.Condition{
RefID: "B",
QueriesAndExpressions: []eval.AlertQuery{
{
@ -99,17 +208,61 @@ func TestUpdatingAlertDefinition(t *testing.T) {
},
}
err := ng.updateAlertDefinition(&q)
require.NoError(t, err)
assert.Equal(t, int64(1), q.RowsAffected)
assert.Equal(t, int64(1), q.Result.Id)
lastUpdated := created
previousAlertDefinition := alertDefinition
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
if tc.inputInterval != nil {
q.IntervalSeconds = tc.inputInterval
}
if tc.inputOrgID != 0 {
q.OrgID = tc.inputOrgID
}
q.Title = tc.inputTitle
err := ng.updateAlertDefinition(&q)
switch {
case tc.expectedError != nil:
require.Error(t, err)
getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).ID}
err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery)
require.NoError(t, err)
assert.Equal(t, previousAlertDefinition.Title, getAlertDefinitionByIDQuery.Result.Title)
assert.Equal(t, previousAlertDefinition.Condition, getAlertDefinitionByIDQuery.Result.Condition)
assert.Equal(t, len(previousAlertDefinition.Data), len(getAlertDefinitionByIDQuery.Result.Data))
assert.Equal(t, previousAlertDefinition.IntervalSeconds, getAlertDefinitionByIDQuery.Result.IntervalSeconds)
assert.Equal(t, previousAlertDefinition.Updated, getAlertDefinitionByIDQuery.Result.Updated)
assert.Equal(t, previousAlertDefinition.Version, getAlertDefinitionByIDQuery.Result.Version)
assert.Equal(t, previousAlertDefinition.OrgID, getAlertDefinitionByIDQuery.Result.OrgID)
assert.Equal(t, previousAlertDefinition.UID, getAlertDefinitionByIDQuery.Result.UID)
default:
require.NoError(t, err)
assert.Equal(t, int64(1), q.RowsAffected)
assert.Equal(t, int64(1), q.Result.ID)
assert.True(t, q.Result.Updated.After(lastUpdated))
assert.Equal(t, tc.expectedUpdated, q.Result.Updated)
assert.Equal(t, previousAlertDefinition.Version+1, q.Result.Version)
assert.Equal(t, alertDefinition.OrgID, q.Result.OrgID)
getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).ID}
err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery)
require.NoError(t, err)
assert.Equal(t, "something completely different", getAlertDefinitionByIDQuery.Result.Title)
assert.Equal(t, "B", getAlertDefinitionByIDQuery.Result.Condition)
assert.Equal(t, 1, len(getAlertDefinitionByIDQuery.Result.Data))
assert.Equal(t, tc.expectedUpdated, getAlertDefinitionByIDQuery.Result.Updated)
assert.Equal(t, tc.expectedIntervalSeconds, getAlertDefinitionByIDQuery.Result.IntervalSeconds)
assert.Equal(t, previousAlertDefinition.Version+1, getAlertDefinitionByIDQuery.Result.Version)
assert.Equal(t, alertDefinition.OrgID, getAlertDefinitionByIDQuery.Result.OrgID)
assert.Equal(t, alertDefinition.UID, getAlertDefinitionByIDQuery.Result.UID)
previousAlertDefinition = getAlertDefinitionByIDQuery.Result
}
})
}
getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).Id}
err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery)
require.NoError(t, err)
assert.Equal(t, "something completely different", getAlertDefinitionByIDQuery.Result.Name)
assert.Equal(t, "B", getAlertDefinitionByIDQuery.Result.Condition)
assert.Equal(t, 1, len(getAlertDefinitionByIDQuery.Result.Data))
})
}
@ -117,7 +270,7 @@ func TestDeletingAlertDefinition(t *testing.T) {
t.Run("zero rows affected when deleting unknown alert", func(t *testing.T) {
ng := setupTestEnv(t)
q := deleteAlertDefinitionByIDQuery{
q := deleteAlertDefinitionByIDCommand{
ID: 1,
OrgID: 1,
}
@ -129,10 +282,10 @@ func TestDeletingAlertDefinition(t *testing.T) {
t.Run("deleting successfully existing alert", func(t *testing.T) {
ng := setupTestEnv(t)
alertDefinition := createTestAlertDefinition(t, ng)
alertDefinition := createTestAlertDefinition(t, ng, 60)
q := deleteAlertDefinitionByIDQuery{
ID: (*alertDefinition).Id,
q := deleteAlertDefinitionByIDCommand{
ID: (*alertDefinition).ID,
OrgID: 1,
}
@ -142,40 +295,10 @@ func TestDeletingAlertDefinition(t *testing.T) {
})
}
func setupTestEnv(t *testing.T) AlertNG {
sqlStore := sqlstore.InitTestDB(t)
cfg := setting.Cfg{}
cfg.FeatureToggles = map[string]bool{"ngalert": true}
ng := AlertNG{
SQLStore: sqlStore,
Cfg: &cfg,
func getLongString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = 'a'
}
return ng
}
func createTestAlertDefinition(t *testing.T, ng AlertNG) *AlertDefinition {
cmd := saveAlertDefinitionCommand{
OrgID: 1,
Name: "an alert definition",
Condition: condition{
RefID: "A",
QueriesAndExpressions: []eval.AlertQuery{
{
Model: json.RawMessage(`{
"datasource": "__expr__",
"type":"math",
"expression":"2 + 2 > 1"
}`),
RelativeTimeRange: eval.RelativeTimeRange{
From: eval.Duration(5 * time.Hour),
To: eval.Duration(3 * time.Hour),
},
RefID: "A",
},
},
},
}
err := ng.saveAlertDefinition(&cmd)
require.NoError(t, err)
return cmd.Result
return string(b)
}

View File

@ -10,9 +10,10 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/models"
)
const alertingEvaluationTimeout = 30 * time.Second
// invalidEvalResultFormatError is an error for invalid format of the alert definition evaluation results.
type invalidEvalResultFormatError struct {
refID string
@ -36,6 +37,7 @@ func (e *invalidEvalResultFormatError) Unwrap() error {
// of the query or expression that will be evaluated.
type Condition struct {
// RefID is the reference ID of the condition within QueriesAndExpressions.
RefID string `json:"refId"`
// OrgID is the organisation the condition is executed for; it is never
// (de)serialised from/to JSON.
OrgID int64 `json:"-"`
// QueriesAndExpressions holds the queries and expressions to execute.
QueriesAndExpressions []AlertQuery `json:"queriesAndExpressions"`
}
@ -85,14 +87,13 @@ func (c Condition) IsValid() bool {
// AlertExecCtx is the context provided for executing an alert condition.
type AlertExecCtx struct {
AlertDefitionID int64
SignedInUser *models.SignedInUser
OrgID int64
Ctx context.Context
}
// Execute runs the Condition's expressions or queries.
func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*ExecutionResults, error) {
// execute runs the Condition's expressions or queries.
func (c *Condition) execute(ctx AlertExecCtx, now time.Time) (*ExecutionResults, error) {
result := ExecutionResults{}
if !c.IsValid() {
return nil, fmt.Errorf("invalid conditions")
@ -101,7 +102,7 @@ func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*Execution
queryDataReq := &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
OrgID: ctx.SignedInUser.OrgId,
OrgID: ctx.OrgID,
},
Queries: []backend.DataQuery{},
}
@ -128,7 +129,7 @@ func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*Execution
RefID: q.RefID,
MaxDataPoints: maxDatapoints,
QueryType: q.QueryType,
TimeRange: q.RelativeTimeRange.toTimeRange(time.Now()),
TimeRange: q.RelativeTimeRange.toTimeRange(now),
})
}
@ -153,9 +154,9 @@ func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*Execution
return &result, nil
}
// EvaluateExecutionResult takes the ExecutionResult, and returns a frame where
// evaluateExecutionResult takes the ExecutionResult, and returns a frame where
// each column is a string type that holds a string representing its state.
func EvaluateExecutionResult(results *ExecutionResults) (Results, error) {
func evaluateExecutionResult(results *ExecutionResults) (Results, error) {
evalResults := make([]result, 0)
labels := make(map[string]bool)
for _, f := range results.Results {
@ -207,3 +208,22 @@ func (evalResults Results) AsDataFrame() data.Frame {
f := data.NewFrame("", fields...)
return *f
}
// ConditionEval runs the condition's queries and expressions for the given
// evaluation time and turns the execution results into evaluation results.
// The whole execution is bounded by alertingEvaluationTimeout.
func ConditionEval(condition *Condition, now time.Time) (Results, error) {
	timeoutCtx, cancel := context.WithTimeout(context.Background(), alertingEvaluationTimeout)
	defer cancel()

	execCtx := AlertExecCtx{
		OrgID: condition.OrgID,
		Ctx:   timeoutCtx,
	}

	executed, err := condition.execute(execCtx, now)
	if err != nil {
		return nil, fmt.Errorf("failed to execute conditions: %w", err)
	}

	evaluated, err := evaluateExecutionResult(executed)
	if err != nil {
		return nil, fmt.Errorf("failed to evaluate results: %w", err)
	}

	return evaluated, nil
}

View File

@ -0,0 +1,15 @@
package ngalert
import (
"time"
)
// fetchAllDetails returns the alert definitions loaded by getAlertDefinitions.
// On failure the error is logged together with now and nil is returned.
func (ng *AlertNG) fetchAllDetails(now time.Time) []*AlertDefinition {
	query := listAlertDefinitionsQuery{}
	if err := ng.getAlertDefinitions(&query); err != nil {
		ng.schedule.log.Error("failed to fetch alert definitions", "now", now, "err", err)
		return nil
	}
	return query.Result
}

View File

@ -13,7 +13,7 @@ func (ng *AlertNG) validateOrgAlertDefinition(c *models.ReqContext) {
return
}
if c.OrgId != query.Result.OrgId {
if c.OrgId != query.Result.OrgID {
c.JsonApiErr(403, "You are not allowed to edit/view alert definition", nil)
return
}

View File

@ -1,20 +1,42 @@
package ngalert
import (
"errors"
"fmt"
"time"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
)
var errAlertDefinitionFailedGenerateUniqueUID = errors.New("failed to generate alert definition UID")
// AlertDefinition is the model for alert definitions in Alerting NG.
type AlertDefinition struct {
Id int64
OrgId int64
Name string
Condition string
Data []eval.AlertQuery
ID int64 `xorm:"pk autoincr 'id'"`
OrgID int64 `xorm:"org_id"`
Title string
Condition string
Data []eval.AlertQuery
Updated time.Time
IntervalSeconds int64
Version int64
UID string `xorm:"uid"`
}
// AlertDefinitionVersion is the model for alert definition versions in Alerting NG.
// It carries the same Title, Condition, Data and IntervalSeconds fields as
// AlertDefinition, plus identifiers linking it to the definition it belongs to.
type AlertDefinitionVersion struct {
// ID is the auto-incremented primary key (column "id").
ID int64 `xorm:"pk autoincr 'id'"`
// AlertDefinitionID is stored in column "alert_definition_id".
AlertDefinitionID int64 `xorm:"alert_definition_id"`
// AlertDefinitionUID is stored in column "alert_definition_uid".
AlertDefinitionUID string `xorm:"alert_definition_uid"`
ParentVersion int64
RestoredFrom int64
Version int64
Created time.Time
Title string
Condition string
Data []eval.AlertQuery
IntervalSeconds int64
}
var (
@ -30,62 +52,42 @@ type getAlertDefinitionByIDQuery struct {
Result *AlertDefinition
}
type deleteAlertDefinitionByIDQuery struct {
type deleteAlertDefinitionByIDCommand struct {
ID int64
OrgID int64
RowsAffected int64
}
// condition is the structure used by storing/updating alert definition commmands
type condition struct {
RefID string `json:"refId"`
QueriesAndExpressions []eval.AlertQuery `json:"queriesAndExpressions"`
}
// saveAlertDefinitionCommand is the query for saving a new alert definition.
type saveAlertDefinitionCommand struct {
Name string `json:"name"`
OrgID int64 `json:"-"`
Condition condition `json:"condition"`
SignedInUser *models.SignedInUser `json:"-"`
SkipCache bool `json:"-"`
Title string `json:"title"`
OrgID int64 `json:"-"`
Condition eval.Condition `json:"condition"`
IntervalSeconds *int64 `json:"interval_seconds"`
Result *AlertDefinition
}
// IsValid validates a saveAlertDefinitionCommand.
// It performs no checks and always returns true.
func (cmd *saveAlertDefinitionCommand) IsValid() bool {
return true
}
// updateAlertDefinitionCommand is the query for updating an existing alert definition.
type updateAlertDefinitionCommand struct {
ID int64 `json:"-"`
Name string `json:"name"`
OrgID int64 `json:"-"`
Condition condition `json:"condition"`
SignedInUser *models.SignedInUser `json:"-"`
SkipCache bool `json:"-"`
ID int64 `json:"-"`
Title string `json:"title"`
OrgID int64 `json:"-"`
Condition eval.Condition `json:"condition"`
IntervalSeconds *int64 `json:"interval_seconds"`
UID string `json:"-"`
RowsAffected int64
Result *AlertDefinition
}
// IsValid validates an updateAlertDefinitionCommand.
// It performs no checks and always returns true.
func (cmd *updateAlertDefinitionCommand) IsValid() bool {
return true
}
// evalAlertConditionCommand carries a condition together with the time it
// should be evaluated at.
type evalAlertConditionCommand struct {
Condition eval.Condition `json:"condition"`
Now time.Time `json:"now"`
}
type listAlertDefinitionsCommand struct {
type listAlertDefinitionsQuery struct {
OrgID int64 `json:"-"`
Result []*AlertDefinition

View File

@ -1,11 +1,14 @@
package ngalert
import (
"context"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/sqlstore"
@ -13,6 +16,18 @@ import (
"github.com/grafana/grafana/pkg/setting"
)
const (
// maxAttempts is the maximum number of evaluation attempts per tick.
maxAttempts int64 = 3
// scheduler interval
// changing this value is discouraged
// because this could cause existing alert definitions
// with intervals that are not exactly divisible by this number
// not to be evaluated
baseIntervalSeconds = 10
// default alert definition interval
defaultIntervalSeconds int64 = 6 * baseIntervalSeconds
)
// AlertNG is the service for evaluating the condition of an alert definition.
type AlertNG struct {
Cfg *setting.Cfg `inject:""`
@ -20,6 +35,7 @@ type AlertNG struct {
RouteRegister routing.RouteRegister `inject:""`
SQLStore *sqlstore.SQLStore `inject:""`
log log.Logger
schedule *schedule
}
func init() {
@ -31,14 +47,20 @@ func (ng *AlertNG) Init() error {
ng.log = log.New("ngalert")
ng.registerAPIEndpoints()
ng.schedule = newScheduler(clock.New(), baseIntervalSeconds*time.Second, ng.log, nil)
return nil
}
// Run logs that the service is starting and then hands control to the
// scheduler loop, returning whatever that loop returns.
func (ng *AlertNG) Run(ctx context.Context) error {
	ng.log.Debug("ngalert starting")

	err := ng.alertingTicker(ctx)
	return err
}
// IsDisabled returns true if the alerting service is disable for this instance.
func (ng *AlertNG) IsDisabled() bool {
if ng.Cfg == nil {
return false
return true
}
// Check also about expressions?
return !ng.Cfg.IsNgAlertEnabled()
@ -50,42 +72,26 @@ func (ng *AlertNG) AddMigration(mg *migrator.Migrator) {
if ng.IsDisabled() {
return
}
alertDefinition := migrator.Table{
Name: "alert_definition",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "org_id", Type: migrator.DB_BigInt, Nullable: false},
{Name: "name", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
{Name: "condition", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
{Name: "data", Type: migrator.DB_Text, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"org_id"}, Type: migrator.IndexType},
},
}
// create table
mg.AddMigration("create alert_definition table", migrator.NewAddTableMigration(alertDefinition))
// create indices
mg.AddMigration("add index alert_definition org_id", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[0]))
addAlertDefinitionMigrations(mg)
addAlertDefinitionVersionMigrations(mg)
}
// LoadAlertCondition returns a Condition object for the given alertDefinitionID.
func (ng *AlertNG) LoadAlertCondition(alertDefinitionID int64, signedInUser *models.SignedInUser, skipCache bool) (*eval.Condition, error) {
func (ng *AlertNG) LoadAlertCondition(alertDefinitionID int64) (*eval.Condition, error) {
getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: alertDefinitionID}
if err := ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery); err != nil {
return nil, err
}
alertDefinition := getAlertDefinitionByIDQuery.Result
err := ng.validateAlertDefinition(alertDefinition, signedInUser, skipCache)
err := ng.validateAlertDefinition(alertDefinition, true)
if err != nil {
return nil, err
}
return &eval.Condition{
RefID: alertDefinition.Condition,
OrgID: alertDefinition.OrgID,
QueriesAndExpressions: alertDefinition.Data,
}, nil
}

View File

@ -0,0 +1,291 @@
package ngalert
import (
"context"
"fmt"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"golang.org/x/sync/errgroup"
)
// definitionRoutine is the evaluation loop of a single alert definition.
//
// It waits for evaluation requests on evalCh and, for each one, evaluates the
// definition up to ng.schedule.maxAttempts times, stopping at the first
// successful attempt. The definition is (re)fetched from the store only when
// it has not been loaded yet or when the request carries a newer version.
//
// The routine returns nil when its own definitionID is received on the
// scheduler's stop channel, and grafanaCtx.Err() when grafanaCtx is done.
func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, definitionID int64, evalCh <-chan *evalContext) error {
	ng.log.Debug("alert definition routine started", "definitionID", definitionID)

	evalRunning := false
	var start, end time.Time
	var attempt int64
	var alertDefinition *AlertDefinition
	for {
		select {
		case ctx := <-evalCh:
			if evalRunning {
				continue
			}

			evaluate := func(attempt int64) error {
				start = timeNow()

				// fetch latest alert definition version
				if alertDefinition == nil || alertDefinition.Version < ctx.version {
					q := getAlertDefinitionByIDQuery{ID: definitionID}
					err := ng.getAlertDefinitionByID(&q)
					if err != nil {
						// alertDefinition may still be nil here (first fetch), so the
						// ID must come from definitionID rather than from the struct.
						ng.schedule.log.Error("failed to fetch alert definition", "alertDefinitionID", definitionID, "error", err)
						return err
					}
					alertDefinition = q.Result
					ng.schedule.log.Debug("new alert definition version fetched", "alertDefinitionID", alertDefinition.ID, "version", alertDefinition.Version)
				}

				condition := eval.Condition{
					RefID:                 alertDefinition.Condition,
					OrgID:                 alertDefinition.OrgID,
					QueriesAndExpressions: alertDefinition.Data,
				}
				results, err := eval.ConditionEval(&condition, ctx.now)
				end = timeNow()
				if err != nil {
					ng.schedule.log.Error("failed to evaluate alert definition", "definitionID", definitionID, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err)
					return err
				}
				for _, r := range results {
					ng.schedule.log.Info("alert definition result", "definitionID", definitionID, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String())
				}
				return nil
			}

			func() {
				evalRunning = true
				defer func() {
					evalRunning = false
					// notify the test hook (if any) once all attempts are done
					if ng.schedule.evalApplied != nil {
						ng.schedule.evalApplied(definitionID, ctx.now)
					}
				}()

				// retry until an attempt succeeds or attempts are exhausted
				for attempt = 0; attempt < ng.schedule.maxAttempts; attempt++ {
					err := evaluate(attempt)
					if err == nil {
						break
					}
				}
			}()
		case id := <-ng.schedule.stop:
			// stop messages addressed to other definitions are ignored
			if id == definitionID {
				ng.schedule.log.Debug("stopping alert definition routine", "definitionID", definitionID)
				// evaluations run synchronously inside this loop, so none is in flight here
				return nil
			}
		case <-grafanaCtx.Done():
			return grafanaCtx.Err()
		}
	}
}
// schedule holds the state shared by the scheduler loop and the per-definition
// evaluation routines.
type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time.Duration
// each alert definition gets its own channel and routine
registry alertDefinitionRegistry
// broadcast channel for stopping definition routines
stop chan int64
// maxAttempts is the maximum number of evaluation attempts per evaluation request
maxAttempts int64
clock clock.Clock
// heartbeat delivers the ticks that drive the scheduler loop
heartbeat *alerting.Ticker
// evalApplied is only used for tests: test code can set it to a non-nil
// function, and then it'll be called from the definition routine whenever
// the handling of an evaluation request has finished.
evalApplied func(int64, time.Time)
log log.Logger
}
// newScheduler builds a schedule with an empty registry, a fresh stop channel
// and a heartbeat ticker driven by clock c at the given base interval.
func newScheduler(c clock.Clock, baseInterval time.Duration, logger log.Logger, evalApplied func(int64, time.Time)) *schedule {
	intervalSeconds := int64(baseInterval.Seconds())
	heartbeat := alerting.NewTicker(c.Now(), time.Second*0, c, intervalSeconds)

	return &schedule{
		baseInterval: baseInterval,
		registry:     alertDefinitionRegistry{alertDefinitionInfo: make(map[int64]alertDefinitionInfo)},
		stop:         make(chan int64),
		maxAttempts:  maxAttempts,
		clock:        c,
		heartbeat:    heartbeat,
		evalApplied:  evalApplied,
		log:          logger,
	}
}
// pause pauses the heartbeat ticker and logs the event.
// It returns an error when called on a nil schedule.
func (sch *schedule) pause() error {
	if sch == nil {
		return fmt.Errorf("scheduler is not initialised")
	}

	sch.heartbeat.Pause()
	pausedAt := sch.clock.Now()
	sch.log.Info("alert definition scheduler paused", "now", pausedAt)

	return nil
}
// unpause resumes the heartbeat ticker and logs the event.
// It returns an error when called on a nil schedule.
func (sch *schedule) unpause() error {
	if sch == nil {
		return fmt.Errorf("scheduler is not initialised")
	}

	sch.heartbeat.Unpause()
	resumedAt := sch.clock.Now()
	sch.log.Info("alert definition scheduler unpaused", "now", resumedAt)

	return nil
}
// alertingTicker is the scheduler main loop. On every heartbeat tick it:
//   - fetches all alert definitions,
//   - starts an evaluation routine for every definition seen for the first time,
//   - sends an evaluation request to every definition that is due on this tick,
//   - stops and unregisters the routines of definitions that are no longer returned.
//
// It returns once grafanaCtx is done, after waiting for the started routines.
func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx)
for {
select {
case tick := <-ng.schedule.heartbeat.C:
// tickNum is the tick time expressed in number of base intervals
tickNum := tick.Unix() / int64(ng.schedule.baseInterval.Seconds())
alertDefinitions := ng.fetchAllDetails(tick)
ng.schedule.log.Debug("alert definitions fetched", "count", len(alertDefinitions))
// registeredDefinitions is a map used for finding deleted alert definitions
// initially it is assigned to all known alert definitions from the previous cycle
// each alert definition found also in this cycle is removed
// so, at the end, the remaining registered alert definitions are the deleted ones
registeredDefinitions := ng.schedule.registry.keyMap()
type readyToRunItem struct {
id int64
definitionInfo alertDefinitionInfo
}
readyToRun := make([]readyToRunItem, 0)
for _, item := range alertDefinitions {
itemID := item.ID
itemVersion := item.Version
// existence must be checked before getOrCreateInfo registers the definition
newRoutine := !ng.schedule.registry.exists(itemID)
definitionInfo := ng.schedule.registry.getOrCreateInfo(itemID, itemVersion)
invalidInterval := item.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return ng.definitionRoutine(ctx, itemID, definitionInfo.ch)
})
}
if invalidInterval {
// this is expected to be always false
// given that we validate the interval during alert definition updates
// NOTE(review): on this path the definition has already been registered by
// getOrCreateInfo, no routine is started for a new definition, and the ID is
// not removed from registeredDefinitions, so it is handled as deleted below.
ng.schedule.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "definitionID", itemID, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", ng.schedule.baseInterval)
continue
}
// itemFrequency is the definition interval expressed in number of base intervals
itemFrequency := item.IntervalSeconds / int64(ng.schedule.baseInterval.Seconds())
// the zero-interval check short-circuits before the modulo, avoiding a division by zero
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{id: itemID, definitionInfo: definitionInfo})
}
// remove the alert definition from the registered alert definitions
delete(registeredDefinitions, itemID)
}
// spread the evaluation requests evenly over one base interval
var step int64 = 0
if len(readyToRun) > 0 {
step = ng.schedule.baseInterval.Nanoseconds() / int64(len(readyToRun))
}
for i := range readyToRun {
item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() {
item.definitionInfo.ch <- &evalContext{now: tick, version: item.definitionInfo.version}
})
}
// unregister and stop routines of the deleted alert definitions
// NOTE(review): stop is a single unbuffered channel read by every definition
// routine; each send is received by one routine, which ignores IDs other than
// its own — confirm the targeted routine is guaranteed to receive it.
for id := range registeredDefinitions {
ng.schedule.stop <- id
ng.schedule.registry.del(id)
}
case <-grafanaCtx.Done():
err := dispatcherGroup.Wait()
return err
}
}
}
// alertDefinitionRegistry maps alert definition IDs to their scheduling info.
// Its methods take mu before touching alertDefinitionInfo.
type alertDefinitionRegistry struct {
mu sync.Mutex
alertDefinitionInfo map[int64]alertDefinitionInfo
}
// getOrCreateInfo returns the registry entry of the given alert definition,
// creating it (with a new evaluation channel) when it is missing.
// In both cases the stored version is set to definitionVersion.
func (r *alertDefinitionRegistry) getOrCreateInfo(definitionID int64, definitionVersion int64) alertDefinitionInfo {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, found := r.alertDefinitionInfo[definitionID]
	if !found {
		entry = alertDefinitionInfo{ch: make(chan *evalContext)}
	}
	entry.version = definitionVersion
	r.alertDefinitionInfo[definitionID] = entry

	return entry
}
// exists reports whether the given alert definition is registered.
func (r *alertDefinitionRegistry) exists(definitionID int64) bool {
	r.mu.Lock()
	_, registered := r.alertDefinitionInfo[definitionID]
	r.mu.Unlock()

	return registered
}
// del removes the given alert definition from the registry.
// Removing an unknown ID is a no-op.
func (r *alertDefinitionRegistry) del(definitionID int64) {
	r.mu.Lock()
	delete(r.alertDefinitionInfo, definitionID)
	r.mu.Unlock()
}
// iter streams the registered alert definition IDs over the returned channel
// and closes it afterwards. The registry lock is held by the producing
// goroutine for the whole iteration, so the channel must be drained.
func (r *alertDefinitionRegistry) iter() <-chan int64 {
	ids := make(chan int64)

	go func() {
		r.mu.Lock()
		defer r.mu.Unlock()

		for id := range r.alertDefinitionInfo {
			ids <- id
		}
		close(ids)
	}()

	return ids
}
// keyMap returns a snapshot of the registered alert definition IDs as a set.
func (r *alertDefinitionRegistry) keyMap() map[int64]struct{} {
	snapshot := make(map[int64]struct{})
	for id := range r.iter() {
		snapshot[id] = struct{}{}
	}
	return snapshot
}
// alertDefinitionInfo is the registry entry of an alert definition.
type alertDefinitionInfo struct {
// ch is the channel evaluation requests for the definition are sent on
ch chan *evalContext
// version is the definition version last seen by the scheduler
version int64
}
// evalContext is an evaluation request sent to a definition routine.
type evalContext struct {
// now is the tick time the evaluation is requested for
now time.Time
// version is the definition version known to the scheduler at that tick
version int64
}

View File

@ -0,0 +1,153 @@
package ngalert
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/benbjohnson/clock"
)
// evalAppliedInfo records one invocation of the scheduler's evalApplied hook.
type evalAppliedInfo struct {
// alertDefID is the ID of the alert definition that was evaluated
alertDefID int64
// now is the tick time passed to the hook
now time.Time
}
// TestAlertingTicker drives the scheduler with a mocked clock, one second per
// tick, and checks after each tick which alert definitions were evaluated.
// The sub-tests share state and depend on running in this exact order.
func TestAlertingTicker(t *testing.T) {
ng := setupTestEnv(t)
t.Cleanup(registry.ClearOverrides)
mockedClock := clock.NewMock()
ng.schedule = newScheduler(mockedClock, time.Second, log.New("ngalert.schedule.test"), nil)
alerts := make([]*AlertDefinition, 0)
// create alert definition with zero interval (should never run)
alerts = append(alerts, createTestAlertDefinition(t, ng, 0))
// create alert definition with one second interval
alerts = append(alerts, createTestAlertDefinition(t, ng, 1))
// every finished evaluation is reported on evalAppliedCh via the evalApplied hook
evalAppliedCh := make(chan evalAppliedInfo, len(alerts))
ng.schedule.evalApplied = func(alertDefID int64, now time.Time) {
evalAppliedCh <- evalAppliedInfo{alertDefID: alertDefID, now: now}
}
ctx := context.Background()
// NOTE(review): ctx is never cancelled, so alertingTicker does not return and
// this goroutine outlives the test; require.NoError would also be called
// outside the test goroutine if it ever did return.
go func() {
err := ng.alertingTicker(ctx)
require.NoError(t, err)
}()
runtime.Gosched()
expectedAlertDefinitionsEvaluated := []int64{alerts[1].ID}
t.Run(fmt.Sprintf("on 1st tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
// change alert definition interval to three seconds
var threeSecInterval int64 = 3
err := ng.updateAlertDefinition(&updateAlertDefinitionCommand{
ID: alerts[0].ID,
IntervalSeconds: &threeSecInterval,
OrgID: alerts[0].OrgID,
})
require.NoError(t, err)
t.Logf("alert definition: %d interval reset to: %d", alerts[0].ID, threeSecInterval)
expectedAlertDefinitionsEvaluated = []int64{alerts[1].ID}
t.Run(fmt.Sprintf("on 2nd tick alert definition: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
// the third tick is a multiple of three seconds, so both definitions are due
expectedAlertDefinitionsEvaluated = []int64{alerts[1].ID, alerts[0].ID}
t.Run(fmt.Sprintf("on 3rd tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
expectedAlertDefinitionsEvaluated = []int64{alerts[1].ID}
t.Run(fmt.Sprintf("on 4th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
// delete the one second definition: nothing is expected to run on the next tick
err = ng.deleteAlertDefinitionByID(&deleteAlertDefinitionByIDCommand{ID: alerts[1].ID})
require.NoError(t, err)
t.Logf("alert definition: %d deleted", alerts[1].ID)
expectedAlertDefinitionsEvaluated = []int64{}
t.Run(fmt.Sprintf("on 5th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
expectedAlertDefinitionsEvaluated = []int64{alerts[0].ID}
t.Run(fmt.Sprintf("on 6th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
// create alert definition with one second interval
alerts = append(alerts, createTestAlertDefinition(t, ng, 1))
expectedAlertDefinitionsEvaluated = []int64{alerts[2].ID}
t.Run(fmt.Sprintf("on 7th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
tick := advanceClock(t, mockedClock)
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
}
// assertEvalRun waits up to one second for evaluations reported on ch and
// checks that each belongs to ids and carries the given tick time.
// It returns as soon as every expected ID has been seen; if the timeout fires
// while some are still pending, the test fails.
func assertEvalRun(t *testing.T, ch <-chan evalAppliedInfo, tick time.Time, ids ...int64) {
	deadline := time.After(time.Second)

	pending := make(map[int64]struct{}, len(ids))
	for i := range ids {
		pending[ids[i]] = struct{}{}
	}

	for {
		select {
		case applied := <-ch:
			_, wanted := pending[applied.alertDefID]
			t.Logf("alert definition: %d evaluated at: %v", applied.alertDefID, applied.now)
			assert.True(t, wanted)
			assert.Equal(t, tick, applied.now)
			delete(pending, applied.alertDefID)
			if len(pending) == 0 {
				return
			}
		case <-deadline:
			if len(pending) != 0 {
				t.Fatal("cycle has expired")
			}
			return
		}
	}
}
// advanceClock moves the mocked clock one second forward and returns the
// resulting clock time.
func advanceClock(t *testing.T, mockedClock *clock.Mock) time.Time {
	mockedClock.Add(time.Second)
	now := mockedClock.Now()
	return now
}
// concatenate renders ids as a bracketed, comma-separated list,
// e.g. [1,2,3]; an empty or nil slice yields [].
func concatenate(ids []int64) string {
	// length 0 with capacity len(ids): append must not leave empty leading
	// elements behind, which would otherwise show up as leading commas
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, strconv.FormatInt(id, 10))
	}
	return fmt.Sprintf("[%s]", strings.Join(parts, ","))
}

View File

@ -2,32 +2,68 @@ package ngalert
import (
"fmt"
"time"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
)
// validateAlertDefinition validates that the alert definition contains at least one alert query
// and that alert queries refer to existing datasources.
func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, signedInUser *models.SignedInUser, skipCache bool) error {
if len(alertDefinition.Data) == 0 {
const alertDefinitionMaxNameLength = 190
// validateAlertDefinition validates the alert definition interval and organisation.
// If requireData is true checks that it contains at least one alert query
func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, requireData bool) error {
if !requireData && len(alertDefinition.Data) == 0 {
return fmt.Errorf("no queries or expressions are found")
}
for _, query := range alertDefinition.Data {
if alertDefinition.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0 {
return fmt.Errorf("invalid interval: %v: interval should be divided exactly by scheduler interval: %v", time.Duration(alertDefinition.IntervalSeconds)*time.Second, ng.schedule.baseInterval)
}
// enforce max name length in SQLite
if len(alertDefinition.Title) > alertDefinitionMaxNameLength {
return fmt.Errorf("name length should not be greater than %d", alertDefinitionMaxNameLength)
}
if alertDefinition.OrgID == 0 {
return fmt.Errorf("no organisation is found")
}
return nil
}
// validateCondition validates that condition queries refer to existing datasources
func (ng *AlertNG) validateCondition(c eval.Condition, user *models.SignedInUser) error {
var refID string
if len(c.QueriesAndExpressions) == 0 {
return nil
}
for _, query := range c.QueriesAndExpressions {
if c.RefID == query.RefID {
refID = c.RefID
}
datasourceID, err := query.GetDatasource()
if err != nil {
return err
}
if datasourceID == expr.DatasourceID {
return nil
continue
}
_, err = ng.DatasourceCache.GetDatasource(datasourceID, signedInUser, skipCache)
_, err = ng.DatasourceCache.GetDatasource(datasourceID, user, false)
if err != nil {
return err
}
}
if refID == "" {
return fmt.Errorf("condition %s not found in any query or expression", c.RefID)
}
return nil
}

View File

@ -170,7 +170,7 @@ func (mg *Migrator) inTransaction(callback dbTransactionFunc) error {
if err := callback(sess); err != nil {
if rollErr := sess.Rollback(); !errors.Is(err, rollErr) {
return errutil.Wrapf(err, "failed to roll back transaction due to error: %s", rollErr)
return errutil.Wrapf(err, "failed to roll back transaction due to error: %s; initial err: %s", rollErr, err)
}
return err