From 3cac10e598653985c760e9bec2bb4b6d9f7c75f4 Mon Sep 17 00:00:00 2001 From: Sofia Papagiannaki Date: Thu, 17 Dec 2020 16:00:09 +0200 Subject: [PATCH] AlertingNG: Create a scheduler to evaluate alert definitions (#29305) * Always use cache: stop passing skipCache among ngalert functions * Add updated column * Scheduler initial draft * Add retry on failure * Allow settting/updating alert definition interval Set default interval if no interval is provided during alert definition creation. Keep existing alert definition interval if no interval is provided during alert definition update. * Parameterise alerting.Ticker to run on custom interval * Allow updating alert definition interval without having to provide the queries and expressions * Add schedule tests * Use xorm tags for having initialisms with consistent case in Go * Add ability to pause/unpause the scheduler * Add alert definition versioning * Optimise scheduler to fetch alert definition only when it's necessary * Change MySQL data column to mediumtext * Delete alert definition versions * Increase default scheduler interval to 10 seconds * Fix setting OrgID on updates * Add validation for alert definition name length * Recreate tables --- pkg/services/alerting/engine.go | 2 +- pkg/services/alerting/ticker.go | 50 ++-- pkg/services/ngalert/alert_definition.go | 9 +- pkg/services/ngalert/api.go | 136 +++++----- pkg/services/ngalert/common_test.go | 85 ++++++ pkg/services/ngalert/database.go | 148 +++++++++-- pkg/services/ngalert/database_mig.go | 70 +++++ pkg/services/ngalert/database_test.go | 281 ++++++++++++++------ pkg/services/ngalert/eval/eval.go | 38 ++- pkg/services/ngalert/fetcher.go | 15 ++ pkg/services/ngalert/middleware.go | 2 +- pkg/services/ngalert/models.go | 78 +++--- pkg/services/ngalert/ngalert.go | 54 ++-- pkg/services/ngalert/schedule.go | 291 +++++++++++++++++++++ pkg/services/ngalert/schedule_test.go | 153 +++++++++++ pkg/services/ngalert/validator.go | 50 +++- pkg/services/sqlstore/migrator/migrator.go | 2 +- 17 files changed, 1204 insertions(+), 260 deletions(-) create mode 100644 pkg/services/ngalert/common_test.go create mode 100644 pkg/services/ngalert/database_mig.go create mode 100644 pkg/services/ngalert/fetcher.go create mode 100644 pkg/services/ngalert/schedule.go create mode 100644 pkg/services/ngalert/schedule_test.go diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go index 808700592ec..3ccbc0ef5c3 100644 --- a/pkg/services/alerting/engine.go +++ b/pkg/services/alerting/engine.go @@ -46,7 +46,7 @@ func (e *AlertEngine) IsDisabled() bool { // Init initializes the AlertingService. func (e *AlertEngine) Init() error { - e.ticker = NewTicker(time.Now(), time.Second*0, clock.New()) + e.ticker = NewTicker(time.Now(), time.Second*0, clock.New(), 1) e.execQueue = make(chan *Job, 1000) e.scheduler = newScheduler() e.evalHandler = NewEvalHandler() diff --git a/pkg/services/alerting/ticker.go b/pkg/services/alerting/ticker.go index 9702a2cda63..48df4c227d5 100644 --- a/pkg/services/alerting/ticker.go +++ b/pkg/services/alerting/ticker.go @@ -9,7 +9,7 @@ import ( // Ticker is a ticker to power the alerting scheduler. it's like a time.Ticker, except: // * it doesn't drop ticks for slow receivers, rather, it queues up. so that callers are in control to instrument what's going on. // * it automatically ticks every second, which is the right thing in our current design -// * it ticks on second marks or very shortly after. this provides a predictable load pattern +// * it ticks on intervalSec marks or very shortly after. this provides a predictable load pattern // (this shouldn't cause too much load contention issues because the next steps in the pipeline just process at their own pace) // * the timestamps are used to mark "last datapoint to query for" and as such, are a configurable amount of seconds in the past // * because we want to allow: @@ -17,21 +17,24 @@ import ( // - adjusting offset over time to compensate for storage backing up or getting fast and providing lower latency // you specify a lastProcessed timestamp as well as an offset at creation, or runtime type Ticker struct { - C chan time.Time - clock clock.Clock - last time.Time - offset time.Duration - newOffset chan time.Duration + C chan time.Time + clock clock.Clock + last time.Time + offset time.Duration + newOffset chan time.Duration + intervalSec int64 + paused bool } -// NewTicker returns a ticker that ticks on second marks or very shortly after, and never drops ticks -func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock) *Ticker { +// NewTicker returns a ticker that ticks on intervalSec marks or very shortly after, and never drops ticks +func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock, intervalSec int64) *Ticker { t := &Ticker{ - C: make(chan time.Time), - clock: c, - last: last, - offset: initialOffset, - newOffset: make(chan time.Duration), + C: make(chan time.Time), + clock: c, + last: last, + offset: initialOffset, + newOffset: make(chan time.Duration), + intervalSec: intervalSec, } go t.run() return t @@ -39,10 +42,12 @@ func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock) *Tick func (t *Ticker) run() { for { - next := t.last.Add(time.Duration(1) * time.Second) + next := t.last.Add(time.Duration(t.intervalSec) * time.Second) diff := t.clock.Now().Add(-t.offset).Sub(next) if diff >= 0 { - t.C <- next + if !t.paused { + t.C <- next + } t.last = next continue } @@ -54,3 +59,18 @@ func (t *Ticker) run() { } } } + +// ResetOffset resets the offset. +func (t *Ticker) ResetOffset(duration time.Duration) { + t.newOffset <- duration +} + +// Pause unpauses the ticker and no ticks will be sent. +func (t *Ticker) Pause() { + t.paused = true +} + +// Unpause unpauses the ticker and ticks will be sent. +func (t *Ticker) Unpause() { + t.paused = false +} diff --git a/pkg/services/ngalert/alert_definition.go b/pkg/services/ngalert/alert_definition.go index b521cc80a7d..20cd73b2986 100644 --- a/pkg/services/ngalert/alert_definition.go +++ b/pkg/services/ngalert/alert_definition.go @@ -1,6 +1,12 @@ package ngalert -import "fmt" +import ( + "fmt" + "time" +) + +// timeNow makes it possible to test usage of time +var timeNow = time.Now // preSave sets datasource and loads the updated model for each alert query. func (alertDefinition *AlertDefinition) preSave() error { @@ -11,5 +17,6 @@ func (alertDefinition *AlertDefinition) preSave() error { } alertDefinition.Data[i] = q } + alertDefinition.Updated = timeNow() return nil } diff --git a/pkg/services/ngalert/api.go b/pkg/services/ngalert/api.go index 812300c1fbc..005639c2a12 100644 --- a/pkg/services/ngalert/api.go +++ b/pkg/services/ngalert/api.go @@ -1,17 +1,13 @@ package ngalert import ( - "context" - - "github.com/grafana/grafana/pkg/services/ngalert/eval" - "github.com/go-macaron/binding" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/api" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/middleware" "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/util" ) @@ -19,40 +15,29 @@ import ( func (ng *AlertNG) registerAPIEndpoints() { ng.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) { alertDefinitions.Get("", middleware.ReqSignedIn, api.Wrap(ng.listAlertDefinitions)) - alertDefinitions.Get("/eval/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.alertDefinitionEval)) - alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), api.Wrap(ng.conditionEval)) + alertDefinitions.Get("/eval/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.alertDefinitionEvalEndpoint)) + alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), api.Wrap(ng.conditionEvalEndpoint)) alertDefinitions.Get("/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.getAlertDefinitionEndpoint)) alertDefinitions.Delete("/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.deleteAlertDefinitionEndpoint)) alertDefinitions.Post("/", middleware.ReqSignedIn, binding.Bind(saveAlertDefinitionCommand{}), api.Wrap(ng.createAlertDefinitionEndpoint)) alertDefinitions.Put("/:alertDefinitionId", ng.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), api.Wrap(ng.updateAlertDefinitionEndpoint)) }) + + ng.RouteRegister.Group("/api/ngalert/", func(schedulerRouter routing.RouteRegister) { + schedulerRouter.Post("/pause", api.Wrap(ng.pauseScheduler)) + schedulerRouter.Post("/unpause", api.Wrap(ng.unpauseScheduler)) + }, middleware.ReqOrgAdmin) } -// conditionEval handles POST /api/alert-definitions/eval. -func (ng *AlertNG) conditionEval(c *models.ReqContext, dto evalAlertConditionCommand) api.Response { - alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout) - defer cancelFn() - - alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser} - - fromStr := c.Query("from") - if fromStr == "" { - fromStr = "now-3h" +// conditionEvalEndpoint handles POST /api/alert-definitions/eval. +func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, dto evalAlertConditionCommand) api.Response { + if err := ng.validateCondition(dto.Condition, c.SignedInUser); err != nil { + return api.Error(400, "invalid condition", err) } - toStr := c.Query("to") - if toStr == "" { - toStr = "now" - } - - execResult, err := dto.Condition.Execute(alertExecCtx, fromStr, toStr) + evalResults, err := eval.ConditionEval(&dto.Condition, timeNow()) if err != nil { - return api.Error(400, "Failed to execute conditions", err) - } - - evalResults, err := eval.EvaluateExecutionResult(execResult) - if err != nil { - return api.Error(400, "Failed to evaluate results", err) + return api.Error(400, "Failed to evaluate conditions", err) } frame := evalResults.AsDataFrame() @@ -67,48 +52,34 @@ func (ng *AlertNG) conditionEval(c *models.ReqContext, dto evalAlertConditionCom }) } -// alertDefinitionEval handles GET /api/alert-definitions/eval/:dashboardId/:panelId/:refId". -func (ng *AlertNG) alertDefinitionEval(c *models.ReqContext) api.Response { +// alertDefinitionEvalEndpoint handles GET /api/alert-definitions/eval/:dashboardId/:panelId/:refId". +func (ng *AlertNG) alertDefinitionEvalEndpoint(c *models.ReqContext) api.Response { alertDefinitionID := c.ParamsInt64(":alertDefinitionId") - fromStr := c.Query("from") - if fromStr == "" { - fromStr = "now-3h" - } - - toStr := c.Query("to") - if toStr == "" { - toStr = "now" - } - - conditions, err := ng.LoadAlertCondition(alertDefinitionID, c.SignedInUser, c.SkipCache) + condition, err := ng.LoadAlertCondition(alertDefinitionID) if err != nil { - return api.Error(400, "Failed to load conditions", err) + return api.Error(400, "Failed to load alert definition conditions", err) } - alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout) - defer cancelFn() + if err := ng.validateCondition(*condition, c.SignedInUser); err != nil { + return api.Error(400, "invalid condition", err) + } - alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser} - - execResult, err := conditions.Execute(alertExecCtx, fromStr, toStr) + evalResults, err := eval.ConditionEval(condition, timeNow()) if err != nil { - return api.Error(400, "Failed to execute conditions", err) + return api.Error(400, "Failed to evaludate alert", err) } - - evalResults, err := eval.EvaluateExecutionResult(execResult) - if err != nil { - return api.Error(400, "Failed to evaluate results", err) - } - frame := evalResults.AsDataFrame() df := tsdb.NewDecodedDataFrames([]*data.Frame{&frame}) + if err != nil { + return api.Error(400, "Failed to instantiate Dataframes from the decoded frames", err) + } + instances, err := df.Encoded() if err != nil { return api.Error(400, "Failed to encode result dataframes", err) } - return api.JSON(200, util.DynMap{ "instances": instances, }) @@ -133,51 +104,80 @@ func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) api.Response func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) api.Response { alertDefinitionID := c.ParamsInt64(":alertDefinitionId") - query := deleteAlertDefinitionByIDQuery{ + cmd := deleteAlertDefinitionByIDCommand{ ID: alertDefinitionID, OrgID: c.SignedInUser.OrgId, } - if err := ng.deleteAlertDefinitionByID(&query); err != nil { + if err := ng.deleteAlertDefinitionByID(&cmd); err != nil { return api.Error(500, "Failed to delete alert definition", err) } - return api.JSON(200, util.DynMap{"affectedRows": query.RowsAffected}) + if cmd.RowsAffected != 1 { + ng.log.Warn("unexpected number of rows affected on alert definition delete", "definitionID", alertDefinitionID, "rowsAffected", cmd.RowsAffected) + } + + return api.Success("Alert definition deleted") } // updateAlertDefinitionEndpoint handles PUT /api/alert-definitions/:alertDefinitionId. func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) api.Response { cmd.ID = c.ParamsInt64(":alertDefinitionId") - cmd.SignedInUser = c.SignedInUser - cmd.SkipCache = c.SkipCache + cmd.OrgID = c.SignedInUser.OrgId + + if err := ng.validateCondition(cmd.Condition, c.SignedInUser); err != nil { + return api.Error(400, "invalid condition", err) + } if err := ng.updateAlertDefinition(&cmd); err != nil { return api.Error(500, "Failed to update alert definition", err) } - return api.JSON(200, util.DynMap{"affectedRows": cmd.RowsAffected, "id": cmd.Result.Id}) + if cmd.RowsAffected != 1 { + ng.log.Warn("unexpected number of rows affected on alert definition update", "definitionID", cmd.ID, "rowsAffected", cmd.RowsAffected) + } + + return api.Success("Alert definition updated") } // createAlertDefinitionEndpoint handles POST /api/alert-definitions. func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) api.Response { cmd.OrgID = c.SignedInUser.OrgId - cmd.SignedInUser = c.SignedInUser - cmd.SkipCache = c.SkipCache + + if err := ng.validateCondition(cmd.Condition, c.SignedInUser); err != nil { + return api.Error(400, "invalid condition", err) + } if err := ng.saveAlertDefinition(&cmd); err != nil { return api.Error(500, "Failed to create alert definition", err) } - return api.JSON(200, util.DynMap{"id": cmd.Result.Id}) + return api.JSON(200, util.DynMap{"id": cmd.Result.ID}) } // listAlertDefinitions handles GET /api/alert-definitions. func (ng *AlertNG) listAlertDefinitions(c *models.ReqContext) api.Response { - cmd := listAlertDefinitionsCommand{OrgID: c.SignedInUser.OrgId} + query := listAlertDefinitionsQuery{OrgID: c.SignedInUser.OrgId} - if err := ng.getAlertDefinitions(&cmd); err != nil { + if err := ng.getOrgAlertDefinitions(&query); err != nil { return api.Error(500, "Failed to list alert definitions", err) } - return api.JSON(200, util.DynMap{"results": cmd.Result}) + return api.JSON(200, util.DynMap{"results": query.Result}) +} + +func (ng *AlertNG) pauseScheduler() api.Response { + err := ng.schedule.pause() + if err != nil { + return api.Error(500, "Failed to pause scheduler", err) + } + return api.JSON(200, util.DynMap{"message": "alert definition scheduler paused"}) +} + +func (ng *AlertNG) unpauseScheduler() api.Response { + err := ng.schedule.unpause() + if err != nil { + return api.Error(500, "Failed to unpause scheduler", err) + } + return api.JSON(200, util.DynMap{"message": "alert definition scheduler unpaused"}) } diff --git a/pkg/services/ngalert/common_test.go b/pkg/services/ngalert/common_test.go new file mode 100644 index 00000000000..22441d4185c --- /dev/null +++ b/pkg/services/ngalert/common_test.go @@ -0,0 +1,85 @@ +package ngalert + +import ( + "encoding/json" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/grafana/grafana/pkg/api/routing" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana/pkg/services/ngalert/eval" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/require" +) + +func setupTestEnv(t *testing.T) *AlertNG { + cfg := setting.NewCfg() + cfg.FeatureToggles = map[string]bool{"ngalert": true} + + ng := overrideAlertNGInRegistry(cfg) + + sqlStore := sqlstore.InitTestDB(t) + ng.SQLStore = sqlStore + + err := ng.Init() + require.NoError(t, err) + return &ng +} + +func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG { + ng := AlertNG{ + SQLStore: nil, + Cfg: cfg, + RouteRegister: routing.NewRouteRegister(), + log: log.New("ngalert-test"), + } + + overrideServiceFunc := func(descriptor registry.Descriptor) (*registry.Descriptor, bool) { + if _, ok := descriptor.Instance.(*AlertNG); ok { + return ®istry.Descriptor{ + Name: "AlertNG", + Instance: &ng, + InitPriority: descriptor.InitPriority, + }, true + } + return nil, false + } + + registry.RegisterOverride(overrideServiceFunc) + + return ng +} + +func createTestAlertDefinition(t *testing.T, ng *AlertNG, intervalSeconds int64) *AlertDefinition { + cmd := saveAlertDefinitionCommand{ + OrgID: 1, + Title: fmt.Sprintf("an alert definition %d", rand.Intn(1000)), + Condition: eval.Condition{ + RefID: "A", + QueriesAndExpressions: []eval.AlertQuery{ + { + Model: json.RawMessage(`{ + "datasource": "__expr__", + "type":"math", + "expression":"2 + 2 > 1" + }`), + RelativeTimeRange: eval.RelativeTimeRange{ + From: eval.Duration(5 * time.Hour), + To: eval.Duration(3 * time.Hour), + }, + RefID: "A", + }, + }, + }, + IntervalSeconds: &intervalSeconds, + } + err := ng.saveAlertDefinition(&cmd) + require.NoError(t, err) + t.Logf("alert definition: %d with interval: %d created", cmd.Result.ID, intervalSeconds) + return cmd.Result +} diff --git a/pkg/services/ngalert/database.go b/pkg/services/ngalert/database.go index 484d7dbbb6c..e09a30e818f 100644 --- a/pkg/services/ngalert/database.go +++ b/pkg/services/ngalert/database.go @@ -2,8 +2,11 @@ package ngalert import ( "context" + "errors" + "fmt" "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/util" ) func getAlertDefinitionByID(alertDefinitionID int64, sess *sqlstore.DBSession) (*AlertDefinition, error) { @@ -20,9 +23,9 @@ func getAlertDefinitionByID(alertDefinitionID int64, sess *sqlstore.DBSession) ( // deleteAlertDefinitionByID is a handler for deleting an alert definition. // It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID. -func (ng *AlertNG) deleteAlertDefinitionByID(query *deleteAlertDefinitionByIDQuery) error { +func (ng *AlertNG) deleteAlertDefinitionByID(cmd *deleteAlertDefinitionByIDCommand) error { return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { - res, err := sess.Exec("DELETE FROM alert_definition WHERE id = ?", query.ID) + res, err := sess.Exec("DELETE FROM alert_definition WHERE id = ?", cmd.ID) if err != nil { return err } @@ -31,7 +34,13 @@ func (ng *AlertNG) deleteAlertDefinitionByID(query *deleteAlertDefinitionByIDQue if err != nil { return err } - query.RowsAffected = rowsAffected + cmd.RowsAffected = rowsAffected + + _, err = sess.Exec("DELETE FROM alert_definition_version WHERE alert_definition_id = ?", cmd.ID) + if err != nil { + return err + } + return nil }) } @@ -52,14 +61,29 @@ func (ng *AlertNG) getAlertDefinitionByID(query *getAlertDefinitionByIDQuery) er // saveAlertDefinition is a handler for saving a new alert definition. func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { - alertDefinition := &AlertDefinition{ - OrgId: cmd.OrgID, - Name: cmd.Name, - Condition: cmd.Condition.RefID, - Data: cmd.Condition.QueriesAndExpressions, + intervalSeconds := defaultIntervalSeconds + if cmd.IntervalSeconds != nil { + intervalSeconds = *cmd.IntervalSeconds } - if err := ng.validateAlertDefinition(alertDefinition, cmd.SignedInUser, cmd.SkipCache); err != nil { + var initialVersion int64 = 1 + + uid, err := generateNewAlertDefinitionUID(sess, cmd.OrgID) + if err != nil { + return fmt.Errorf("failed to generate UID for alert definition %q: %w", cmd.Title, err) + } + + alertDefinition := &AlertDefinition{ + OrgID: cmd.OrgID, + Title: cmd.Title, + Condition: cmd.Condition.RefID, + Data: cmd.Condition.QueriesAndExpressions, + IntervalSeconds: intervalSeconds, + Version: initialVersion, + UID: uid, + } + + if err := ng.validateAlertDefinition(alertDefinition, false); err != nil { return err } @@ -71,6 +95,20 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { return err } + alertDefVersion := AlertDefinitionVersion{ + AlertDefinitionID: alertDefinition.ID, + AlertDefinitionUID: alertDefinition.UID, + Version: alertDefinition.Version, + Created: alertDefinition.Updated, + Condition: alertDefinition.Condition, + Title: alertDefinition.Title, + Data: alertDefinition.Data, + IntervalSeconds: alertDefinition.IntervalSeconds, + } + if _, err := sess.Insert(alertDefVersion); err != nil { + return err + } + cmd.Result = alertDefinition return nil }) @@ -81,13 +119,17 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error { return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { alertDefinition := &AlertDefinition{ - Id: cmd.ID, - Name: cmd.Name, + ID: cmd.ID, + Title: cmd.Title, Condition: cmd.Condition.RefID, Data: cmd.Condition.QueriesAndExpressions, + OrgID: cmd.OrgID, + } + if cmd.IntervalSeconds != nil { + alertDefinition.IntervalSeconds = *cmd.IntervalSeconds } - if err := ng.validateAlertDefinition(alertDefinition, cmd.SignedInUser, cmd.SkipCache); err != nil { + if err := ng.validateAlertDefinition(alertDefinition, true); err != nil { return err } @@ -95,27 +137,101 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro return err } + existingAlertDefinition, err := getAlertDefinitionByID(alertDefinition.ID, sess) + if err != nil { + if errors.Is(err, errAlertDefinitionNotFound) { + cmd.Result = alertDefinition + cmd.RowsAffected = 0 + return nil + } + return err + } + + alertDefinition.Version = existingAlertDefinition.Version + 1 + affectedRows, err := sess.ID(cmd.ID).Update(alertDefinition) if err != nil { return err } + title := cmd.Title + if title == "" { + title = existingAlertDefinition.Title + } + condition := cmd.Condition.RefID + if condition == "" { + condition = existingAlertDefinition.Condition + } + data := cmd.Condition.QueriesAndExpressions + if data == nil { + data = existingAlertDefinition.Data + } + intervalSeconds := cmd.IntervalSeconds + if intervalSeconds == nil { + intervalSeconds = &existingAlertDefinition.IntervalSeconds + } + + alertDefVersion := AlertDefinitionVersion{ + AlertDefinitionID: alertDefinition.ID, + AlertDefinitionUID: existingAlertDefinition.UID, + ParentVersion: existingAlertDefinition.Version, + Version: alertDefinition.Version, + Condition: condition, + Created: alertDefinition.Updated, + Title: title, + Data: data, + IntervalSeconds: *intervalSeconds, + } + if _, err := sess.Insert(alertDefVersion); err != nil { + return err + } + cmd.Result = alertDefinition cmd.RowsAffected = affectedRows return nil }) } -// getAlertDefinitions is a handler for retrieving alert definitions of specific organisation. -func (ng *AlertNG) getAlertDefinitions(cmd *listAlertDefinitionsCommand) error { +// getOrgAlertDefinitions is a handler for retrieving alert definitions of specific organisation. +func (ng *AlertNG) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error { return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { alertDefinitions := make([]*AlertDefinition, 0) q := "SELECT * FROM alert_definition WHERE org_id = ?" - if err := sess.SQL(q, cmd.OrgID).Find(&alertDefinitions); err != nil { + if err := sess.SQL(q, query.OrgID).Find(&alertDefinitions); err != nil { return err } - cmd.Result = alertDefinitions + query.Result = alertDefinitions return nil }) } + +func (ng *AlertNG) getAlertDefinitions(query *listAlertDefinitionsQuery) error { + return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { + alerts := make([]*AlertDefinition, 0) + q := "SELECT id, interval_seconds, version FROM alert_definition" + if err := sess.SQL(q).Find(&alerts); err != nil { + return err + } + + query.Result = alerts + return nil + }) +} + +func generateNewAlertDefinitionUID(sess *sqlstore.DBSession, orgID int64) (string, error) { + for i := 0; i < 3; i++ { + uid := util.GenerateShortUID() + + exists, err := sess.Where("org_id=? AND uid=?", orgID, uid).Get(&AlertDefinition{}) + if err != nil { + return "", err + } + + if !exists { + return uid, nil + } + } + + return "", errAlertDefinitionFailedGenerateUniqueUID +} diff --git a/pkg/services/ngalert/database_mig.go b/pkg/services/ngalert/database_mig.go new file mode 100644 index 00000000000..6d515a911d5 --- /dev/null +++ b/pkg/services/ngalert/database_mig.go @@ -0,0 +1,70 @@ +package ngalert + +import ( + "fmt" + + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" +) + +func addAlertDefinitionMigrations(mg *migrator.Migrator) { + mg.AddMigration("delete alert_definition table", migrator.NewDropTableMigration("alert_definition")) + + alertDefinition := migrator.Table{ + Name: "alert_definition", + Columns: []*migrator.Column{ + {Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "org_id", Type: migrator.DB_BigInt, Nullable: false}, + {Name: "title", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "condition", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "data", Type: migrator.DB_Text, Nullable: false}, + {Name: "updated", Type: migrator.DB_DateTime, Nullable: false}, + {Name: "interval_seconds", Type: migrator.DB_BigInt, Nullable: false, Default: fmt.Sprintf("%d", defaultIntervalSeconds)}, + {Name: "version", Type: migrator.DB_Int, Nullable: false, Default: "0"}, + {Name: "uid", Type: migrator.DB_NVarchar, Length: 40, Nullable: false, Default: "0"}, + }, + Indices: []*migrator.Index{ + {Cols: []string{"org_id", "title"}, Type: migrator.IndexType}, + {Cols: []string{"org_id", "uid"}, Type: migrator.IndexType}, + }, + } + // create table + mg.AddMigration("recreate alert_definition table", migrator.NewAddTableMigration(alertDefinition)) + + // create indices + mg.AddMigration("add index in alert_definition on org_id and title columns", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[0])) + mg.AddMigration("add index in alert_definition on org_id and uid columns", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[1])) + + mg.AddMigration("alter alert_definition table data column to mediumtext in mysql", migrator.NewRawSQLMigration(""). + Mysql("ALTER TABLE alert_definition MODIFY data MEDIUMTEXT;")) +} + +func addAlertDefinitionVersionMigrations(mg *migrator.Migrator) { + mg.AddMigration("delete alert_definition_version table", migrator.NewDropTableMigration("alert_definition_version")) + + alertDefinitionVersion := migrator.Table{ + Name: "alert_definition_version", + Columns: []*migrator.Column{ + {Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "alert_definition_id", Type: migrator.DB_BigInt}, + {Name: "alert_definition_uid", Type: migrator.DB_NVarchar, Length: 40, Nullable: false, Default: "0"}, + {Name: "parent_version", Type: migrator.DB_Int, Nullable: false}, + {Name: "restored_from", Type: migrator.DB_Int, Nullable: false}, + {Name: "version", Type: migrator.DB_Int, Nullable: false}, + {Name: "created", Type: migrator.DB_DateTime, Nullable: false}, + {Name: "title", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "condition", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "data", Type: migrator.DB_Text, Nullable: false}, + {Name: "interval_seconds", Type: migrator.DB_BigInt, Nullable: false}, + }, + Indices: []*migrator.Index{ + {Cols: []string{"alert_definition_id", "version"}, Type: migrator.UniqueIndex}, + {Cols: []string{"alert_definition_uid", "version"}, Type: migrator.UniqueIndex}, + }, + } + mg.AddMigration("recreate alert_definition_version table", migrator.NewAddTableMigration(alertDefinitionVersion)) + mg.AddMigration("add index in alert_definition_version table on alert_definition_id and version columns", migrator.NewAddIndexMigration(alertDefinitionVersion, alertDefinitionVersion.Indices[0])) + mg.AddMigration("add index in alert_definition_version table on alert_definition_uid and version columns", migrator.NewAddIndexMigration(alertDefinitionVersion, alertDefinitionVersion.Indices[1])) + + mg.AddMigration("alter alert_definition_version table data column to mediumtext in mysql", migrator.NewRawSQLMigration(""). + Mysql("ALTER TABLE alert_definition_version MODIFY data MEDIUMTEXT;")) +} diff --git a/pkg/services/ngalert/database_test.go b/pkg/services/ngalert/database_test.go index bf115bc058a..eebc82dbbde 100644 --- a/pkg/services/ngalert/database_test.go +++ b/pkg/services/ngalert/database_test.go @@ -4,51 +4,116 @@ package ngalert import ( "encoding/json" + "errors" "testing" "time" "github.com/grafana/grafana/pkg/services/ngalert/eval" - "github.com/grafana/grafana/pkg/services/sqlstore" - "github.com/grafana/grafana/pkg/setting" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func mockTimeNow() { + var timeSeed int64 + timeNow = func() time.Time { + fakeNow := time.Unix(timeSeed, 0).UTC() + timeSeed++ + return fakeNow + } +} + +func resetTimeNow() { + timeNow = time.Now +} + func TestCreatingAlertDefinition(t *testing.T) { - t.Run("should fail gracefully when creating alert definition with invalid relative time range", func(t *testing.T) { - ng := setupTestEnv(t) - q := saveAlertDefinitionCommand{ - OrgID: 1, - Name: "something completely different", - Condition: condition{ - RefID: "B", - QueriesAndExpressions: []eval.AlertQuery{ - { - Model: json.RawMessage(`{ - "datasource": "__expr__", - "type":"math", - "expression":"2 + 3 > 1" - }`), - RefID: "B", + mockTimeNow() + defer resetTimeNow() + + var customIntervalSeconds int64 = 120 + testCases := []struct { + desc string + inputIntervalSeconds *int64 + inputTitle string + expectedError error + expectedInterval int64 + expectedUpdated time.Time + }{ + { + desc: "should create successfuly an alert definition with default interval", + inputIntervalSeconds: nil, + inputTitle: "a name", + expectedInterval: defaultIntervalSeconds, + expectedUpdated: time.Unix(0, 0).UTC(), + }, + { + desc: "should create successfuly an alert definition with custom interval", + inputIntervalSeconds: &customIntervalSeconds, + inputTitle: "another name", + expectedInterval: customIntervalSeconds, + expectedUpdated: time.Unix(1, 0).UTC(), + }, + { + desc: "should fail to create an alert definition with too big name", + inputIntervalSeconds: &customIntervalSeconds, + inputTitle: getLongString(alertDefinitionMaxNameLength + 1), + expectedError: errors.New(""), + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + ng := setupTestEnv(t) + q := saveAlertDefinitionCommand{ + OrgID: 1, + Title: tc.inputTitle, + Condition: eval.Condition{ + RefID: "B", + QueriesAndExpressions: []eval.AlertQuery{ + { + Model: json.RawMessage(`{ + "datasource": "__expr__", + "type":"math", + "expression":"2 + 3 > 1" + }`), + RefID: "B", + RelativeTimeRange: eval.RelativeTimeRange{ + From: eval.Duration(time.Duration(5) * time.Hour), + To: eval.Duration(time.Duration(3) * time.Hour), + }, + }, }, }, - }, - } - err := ng.saveAlertDefinition(&q) - require.NoError(t, err) - }) + } + if tc.inputIntervalSeconds != nil { + q.IntervalSeconds = tc.inputIntervalSeconds + } + err := ng.saveAlertDefinition(&q) + switch { + case tc.expectedError != nil: + require.Error(t, err) + default: + require.NoError(t, err) + assert.Equal(t, tc.expectedUpdated, q.Result.Updated) + assert.Equal(t, tc.expectedInterval, q.Result.IntervalSeconds) + assert.Equal(t, int64(1), q.Result.Version) + } + }) + } } func TestUpdatingAlertDefinition(t *testing.T) { + mockTimeNow() + defer resetTimeNow() + t.Run("zero rows affected when updating unknown alert", func(t *testing.T) { ng := setupTestEnv(t) q := updateAlertDefinitionCommand{ ID: 1, OrgID: 1, - Name: "something completely different", - Condition: condition{ + Title: "something completely different", + Condition: eval.Condition{ RefID: "A", QueriesAndExpressions: []eval.AlertQuery{ { @@ -72,15 +137,59 @@ func TestUpdatingAlertDefinition(t *testing.T) { assert.Equal(t, int64(0), q.RowsAffected) }) - t.Run("updating successfully existing alert", func(t *testing.T) { + t.Run("updating existing alert", func(t *testing.T) { ng := setupTestEnv(t) - alertDefinition := createTestAlertDefinition(t, ng) + var initialInterval int64 = 120 + alertDefinition := createTestAlertDefinition(t, ng, initialInterval) + created := alertDefinition.Updated + + var customInterval int64 = 30 + testCases := []struct { + desc string + inputOrgID int64 + inputTitle string + inputInterval *int64 + expectedError error + expectedIntervalSeconds int64 + expectedUpdated time.Time + }{ + { + desc: "should not update previous interval if it's not provided", + inputInterval: nil, + inputOrgID: alertDefinition.OrgID, + inputTitle: "something completely different", + expectedIntervalSeconds: initialInterval, + expectedUpdated: time.Unix(2, 0).UTC(), + }, + { + desc: "should update interval if it's provided", + inputInterval: &customInterval, + inputOrgID: alertDefinition.OrgID, + inputTitle: "something completely different", + expectedIntervalSeconds: customInterval, + expectedUpdated: time.Unix(3, 0).UTC(), + }, + { + desc: "should not update organisation if it's provided", + inputInterval: &customInterval, + inputOrgID: 0, + inputTitle: "something completely different", + expectedIntervalSeconds: customInterval, + expectedUpdated: time.Unix(4, 0).UTC(), + }, + { + desc: "should not update alert definition if the name it's too big", + inputInterval: &customInterval, + inputOrgID: 0, + inputTitle: getLongString(alertDefinitionMaxNameLength + 1), + expectedError: errors.New(""), + }, + } q := updateAlertDefinitionCommand{ - ID: (*alertDefinition).Id, - OrgID: 1, - Name: "something completely different", - Condition: condition{ + ID: (*alertDefinition).ID, + Title: "something completely different", + Condition: eval.Condition{ RefID: "B", QueriesAndExpressions: []eval.AlertQuery{ { @@ -99,17 +208,61 @@ func TestUpdatingAlertDefinition(t *testing.T) { }, } - err := ng.updateAlertDefinition(&q) - require.NoError(t, err) - assert.Equal(t, int64(1), q.RowsAffected) - assert.Equal(t, int64(1), q.Result.Id) + lastUpdated := created + previousAlertDefinition := alertDefinition + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + if tc.inputInterval != nil { + q.IntervalSeconds = tc.inputInterval + } + if tc.inputOrgID != 0 { + q.OrgID = tc.inputOrgID + } + q.Title = tc.inputTitle + err := ng.updateAlertDefinition(&q) + switch { + case tc.expectedError != nil: + require.Error(t, err) + + getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).ID} + err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery) + require.NoError(t, err) + assert.Equal(t, previousAlertDefinition.Title, getAlertDefinitionByIDQuery.Result.Title) + assert.Equal(t, previousAlertDefinition.Condition, getAlertDefinitionByIDQuery.Result.Condition) + assert.Equal(t, len(previousAlertDefinition.Data), len(getAlertDefinitionByIDQuery.Result.Data)) + assert.Equal(t, previousAlertDefinition.IntervalSeconds, getAlertDefinitionByIDQuery.Result.IntervalSeconds) + assert.Equal(t, previousAlertDefinition.Updated, getAlertDefinitionByIDQuery.Result.Updated) + assert.Equal(t, previousAlertDefinition.Version, getAlertDefinitionByIDQuery.Result.Version) + assert.Equal(t, previousAlertDefinition.OrgID, getAlertDefinitionByIDQuery.Result.OrgID) + assert.Equal(t, previousAlertDefinition.UID, getAlertDefinitionByIDQuery.Result.UID) + default: + require.NoError(t, err) + assert.Equal(t, int64(1), q.RowsAffected) + assert.Equal(t, int64(1), q.Result.ID) + assert.True(t, q.Result.Updated.After(lastUpdated)) + assert.Equal(t, tc.expectedUpdated, q.Result.Updated) + assert.Equal(t, previousAlertDefinition.Version+1, q.Result.Version) + + assert.Equal(t, alertDefinition.OrgID, q.Result.OrgID) + + getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).ID} + err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery) + require.NoError(t, err) + assert.Equal(t, "something completely different", getAlertDefinitionByIDQuery.Result.Title) + assert.Equal(t, "B", getAlertDefinitionByIDQuery.Result.Condition) + assert.Equal(t, 1, len(getAlertDefinitionByIDQuery.Result.Data)) + assert.Equal(t, tc.expectedUpdated, getAlertDefinitionByIDQuery.Result.Updated) + assert.Equal(t, tc.expectedIntervalSeconds, getAlertDefinitionByIDQuery.Result.IntervalSeconds) + assert.Equal(t, previousAlertDefinition.Version+1, getAlertDefinitionByIDQuery.Result.Version) + assert.Equal(t, alertDefinition.OrgID, getAlertDefinitionByIDQuery.Result.OrgID) + assert.Equal(t, alertDefinition.UID, getAlertDefinitionByIDQuery.Result.UID) + + previousAlertDefinition = getAlertDefinitionByIDQuery.Result + } + }) + + } - getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).Id} - err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery) - require.NoError(t, err) - assert.Equal(t, "something completely different", getAlertDefinitionByIDQuery.Result.Name) - assert.Equal(t, "B", getAlertDefinitionByIDQuery.Result.Condition) - assert.Equal(t, 1, len(getAlertDefinitionByIDQuery.Result.Data)) }) } @@ -117,7 +270,7 @@ func TestDeletingAlertDefinition(t *testing.T) { t.Run("zero rows affected when deleting unknown alert", func(t *testing.T) { ng := setupTestEnv(t) - q := deleteAlertDefinitionByIDQuery{ + q := deleteAlertDefinitionByIDCommand{ ID: 1, OrgID: 1, } @@ -129,10 +282,10 @@ func TestDeletingAlertDefinition(t *testing.T) { t.Run("deleting successfully existing alert", func(t *testing.T) { ng := setupTestEnv(t) - alertDefinition := createTestAlertDefinition(t, ng) + alertDefinition := createTestAlertDefinition(t, ng, 60) - q := deleteAlertDefinitionByIDQuery{ - ID: (*alertDefinition).Id, + q := deleteAlertDefinitionByIDCommand{ + ID: (*alertDefinition).ID, OrgID: 1, } @@ -142,40 +295,10 @@ func TestDeletingAlertDefinition(t *testing.T) { }) } -func setupTestEnv(t *testing.T) AlertNG { - sqlStore := sqlstore.InitTestDB(t) - cfg := setting.Cfg{} - cfg.FeatureToggles = map[string]bool{"ngalert": true} - ng := AlertNG{ - SQLStore: sqlStore, - Cfg: &cfg, +func getLongString(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = 'a' } - return ng -} - -func createTestAlertDefinition(t *testing.T, ng AlertNG) *AlertDefinition { - cmd := saveAlertDefinitionCommand{ - OrgID: 1, - Name: "an alert definition", - Condition: condition{ - RefID: "A", - QueriesAndExpressions: []eval.AlertQuery{ - { - Model: json.RawMessage(`{ - "datasource": "__expr__", - "type":"math", - "expression":"2 + 2 > 1" - }`), - RelativeTimeRange: eval.RelativeTimeRange{ - From: eval.Duration(5 * time.Hour), - To: eval.Duration(3 * time.Hour), - }, - RefID: "A", - }, - }, - }, - } - err := ng.saveAlertDefinition(&cmd) - require.NoError(t, err) - return cmd.Result + return string(b) } diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go index b37109b09b0..fb95f04f247 100644 --- a/pkg/services/ngalert/eval/eval.go +++ b/pkg/services/ngalert/eval/eval.go @@ -10,9 +10,10 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/expr" - "github.com/grafana/grafana/pkg/models" ) +const alertingEvaluationTimeout = 30 * time.Second + // invalidEvalResultFormatError is an error for invalid format of the alert definition evaluation results. type invalidEvalResultFormatError struct { refID string @@ -36,6 +37,7 @@ func (e *invalidEvalResultFormatError) Unwrap() error { // of the query or expression that will be evaluated. type Condition struct { RefID string `json:"refId"` + OrgID int64 `json:"-"` QueriesAndExpressions []AlertQuery `json:"queriesAndExpressions"` } @@ -85,14 +87,13 @@ func (c Condition) IsValid() bool { // AlertExecCtx is the context provided for executing an alert condition. type AlertExecCtx struct { - AlertDefitionID int64 - SignedInUser *models.SignedInUser + OrgID int64 Ctx context.Context } -// Execute runs the Condition's expressions or queries. -func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*ExecutionResults, error) { +// execute runs the Condition's expressions or queries. +func (c *Condition) execute(ctx AlertExecCtx, now time.Time) (*ExecutionResults, error) { result := ExecutionResults{} if !c.IsValid() { return nil, fmt.Errorf("invalid conditions") @@ -101,7 +102,7 @@ func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*Execution queryDataReq := &backend.QueryDataRequest{ PluginContext: backend.PluginContext{ - OrgID: ctx.SignedInUser.OrgId, + OrgID: ctx.OrgID, }, Queries: []backend.DataQuery{}, } @@ -128,7 +129,7 @@ func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*Execution RefID: q.RefID, MaxDataPoints: maxDatapoints, QueryType: q.QueryType, - TimeRange: q.RelativeTimeRange.toTimeRange(time.Now()), + TimeRange: q.RelativeTimeRange.toTimeRange(now), }) } @@ -153,9 +154,9 @@ func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*Execution return &result, nil } -// EvaluateExecutionResult takes the ExecutionResult, and returns a frame where +// evaluateExecutionResult takes the ExecutionResult, and returns a frame where // each column is a string type that holds a string representing its state. -func EvaluateExecutionResult(results *ExecutionResults) (Results, error) { +func evaluateExecutionResult(results *ExecutionResults) (Results, error) { evalResults := make([]result, 0) labels := make(map[string]bool) for _, f := range results.Results { @@ -207,3 +208,22 @@ func (evalResults Results) AsDataFrame() data.Frame { f := data.NewFrame("", fields...) return *f } + +// ConditionEval executes conditions and evaluates the result. +func ConditionEval(condition *Condition, now time.Time) (Results, error) { + alertCtx, cancelFn := context.WithTimeout(context.Background(), alertingEvaluationTimeout) + defer cancelFn() + + alertExecCtx := AlertExecCtx{OrgID: condition.OrgID, Ctx: alertCtx} + + execResult, err := condition.execute(alertExecCtx, now) + if err != nil { + return nil, fmt.Errorf("failed to execute conditions: %w", err) + } + + evalResults, err := evaluateExecutionResult(execResult) + if err != nil { + return nil, fmt.Errorf("failed to evaluate results: %w", err) + } + return evalResults, nil +} diff --git a/pkg/services/ngalert/fetcher.go b/pkg/services/ngalert/fetcher.go new file mode 100644 index 00000000000..f040163f3b2 --- /dev/null +++ b/pkg/services/ngalert/fetcher.go @@ -0,0 +1,15 @@ +package ngalert + +import ( + "time" +) + +func (ng *AlertNG) fetchAllDetails(now time.Time) []*AlertDefinition { + q := listAlertDefinitionsQuery{} + err := ng.getAlertDefinitions(&q) + if err != nil { + ng.schedule.log.Error("failed to fetch alert definitions", "now", now, "err", err) + return nil + } + return q.Result +} diff --git a/pkg/services/ngalert/middleware.go b/pkg/services/ngalert/middleware.go index 60b8f67d312..253baca9d41 100644 --- a/pkg/services/ngalert/middleware.go +++ b/pkg/services/ngalert/middleware.go @@ -13,7 +13,7 @@ func (ng *AlertNG) validateOrgAlertDefinition(c *models.ReqContext) { return } - if c.OrgId != query.Result.OrgId { + if c.OrgId != query.Result.OrgID { c.JsonApiErr(403, "You are not allowed to edit/view alert definition", nil) return } diff --git a/pkg/services/ngalert/models.go b/pkg/services/ngalert/models.go index 7a4a02ac83f..707906d4939 100644 --- a/pkg/services/ngalert/models.go +++ b/pkg/services/ngalert/models.go @@ -1,20 +1,42 @@ package ngalert import ( + "errors" "fmt" "time" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/services/ngalert/eval" ) +var errAlertDefinitionFailedGenerateUniqueUID = errors.New("failed to generate alert definition UID") + // AlertDefinition is the model for alert definitions in Alerting NG. type AlertDefinition struct { - Id int64 - OrgId int64 - Name string - Condition string - Data []eval.AlertQuery + ID int64 `xorm:"pk autoincr 'id'"` + OrgID int64 `xorm:"org_id"` + Title string + Condition string + Data []eval.AlertQuery + Updated time.Time + IntervalSeconds int64 + Version int64 + UID string `xorm:"uid"` +} + +// AlertDefinitionVersion is the model for alert definition versions in Alerting NG. +type AlertDefinitionVersion struct { + ID int64 `xorm:"pk autoincr 'id'"` + AlertDefinitionID int64 `xorm:"alert_definition_id"` + AlertDefinitionUID string `xorm:"alert_definition_uid"` + ParentVersion int64 + RestoredFrom int64 + Version int64 + + Created time.Time + Title string + Condition string + Data []eval.AlertQuery + IntervalSeconds int64 } var ( @@ -30,62 +52,42 @@ type getAlertDefinitionByIDQuery struct { Result *AlertDefinition } -type deleteAlertDefinitionByIDQuery struct { +type deleteAlertDefinitionByIDCommand struct { ID int64 OrgID int64 RowsAffected int64 } -// condition is the structure used by storing/updating alert definition commmands -type condition struct { - RefID string `json:"refId"` - - QueriesAndExpressions []eval.AlertQuery `json:"queriesAndExpressions"` -} - // saveAlertDefinitionCommand is the query for saving a new alert definition. type saveAlertDefinitionCommand struct { - Name string `json:"name"` - OrgID int64 `json:"-"` - Condition condition `json:"condition"` - SignedInUser *models.SignedInUser `json:"-"` - SkipCache bool `json:"-"` + Title string `json:"title"` + OrgID int64 `json:"-"` + Condition eval.Condition `json:"condition"` + IntervalSeconds *int64 `json:"interval_seconds"` Result *AlertDefinition } -// IsValid validates a SaveAlertDefinitionCommand. -// Always returns true. -func (cmd *saveAlertDefinitionCommand) IsValid() bool { - return true -} - // updateAlertDefinitionCommand is the query for updating an existing alert definition. type updateAlertDefinitionCommand struct { - ID int64 `json:"-"` - Name string `json:"name"` - OrgID int64 `json:"-"` - Condition condition `json:"condition"` - SignedInUser *models.SignedInUser `json:"-"` - SkipCache bool `json:"-"` + ID int64 `json:"-"` + Title string `json:"title"` + OrgID int64 `json:"-"` + Condition eval.Condition `json:"condition"` + IntervalSeconds *int64 `json:"interval_seconds"` + UID string `json:"-"` RowsAffected int64 Result *AlertDefinition } -// IsValid validates an UpdateAlertDefinitionCommand. -// Always returns true. -func (cmd *updateAlertDefinitionCommand) IsValid() bool { - return true -} - type evalAlertConditionCommand struct { Condition eval.Condition `json:"condition"` Now time.Time `json:"now"` } -type listAlertDefinitionsCommand struct { +type listAlertDefinitionsQuery struct { OrgID int64 `json:"-"` Result []*AlertDefinition diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 3a4f5ec85c9..9399f1dd7e7 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -1,11 +1,14 @@ package ngalert import ( + "context" + "time" + + "github.com/benbjohnson/clock" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/sqlstore" @@ -13,6 +16,18 @@ import ( "github.com/grafana/grafana/pkg/setting" ) +const ( + maxAttempts int64 = 3 + // scheduler interval + // changing this value is discouraged + // because this could cause existing alert definition + // with intervals that are not exactly divided by this number + // not to be evaluated + baseIntervalSeconds = 10 + // default alert definiiton interval + defaultIntervalSeconds int64 = 6 * baseIntervalSeconds +) + // AlertNG is the service for evaluating the condition of an alert definition. type AlertNG struct { Cfg *setting.Cfg `inject:""` @@ -20,6 +35,7 @@ type AlertNG struct { RouteRegister routing.RouteRegister `inject:""` SQLStore *sqlstore.SQLStore `inject:""` log log.Logger + schedule *schedule } func init() { @@ -31,14 +47,20 @@ func (ng *AlertNG) Init() error { ng.log = log.New("ngalert") ng.registerAPIEndpoints() - + ng.schedule = newScheduler(clock.New(), baseIntervalSeconds*time.Second, ng.log, nil) return nil } +// Run starts the scheduler +func (ng *AlertNG) Run(ctx context.Context) error { + ng.log.Debug("ngalert starting") + return ng.alertingTicker(ctx) +} + // IsDisabled returns true if the alerting service is disable for this instance. func (ng *AlertNG) IsDisabled() bool { if ng.Cfg == nil { - return false + return true } // Check also about expressions? return !ng.Cfg.IsNgAlertEnabled() @@ -50,42 +72,26 @@ func (ng *AlertNG) AddMigration(mg *migrator.Migrator) { if ng.IsDisabled() { return } - - alertDefinition := migrator.Table{ - Name: "alert_definition", - Columns: []*migrator.Column{ - {Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, - {Name: "org_id", Type: migrator.DB_BigInt, Nullable: false}, - {Name: "name", Type: migrator.DB_NVarchar, Length: 255, Nullable: false}, - {Name: "condition", Type: migrator.DB_NVarchar, Length: 255, Nullable: false}, - {Name: "data", Type: migrator.DB_Text, Nullable: false}, - }, - Indices: []*migrator.Index{ - {Cols: []string{"org_id"}, Type: migrator.IndexType}, - }, - } - // create table - mg.AddMigration("create alert_definition table", migrator.NewAddTableMigration(alertDefinition)) - - // create indices - mg.AddMigration("add index alert_definition org_id", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[0])) + addAlertDefinitionMigrations(mg) + addAlertDefinitionVersionMigrations(mg) } // LoadAlertCondition returns a Condition object for the given alertDefinitionID. -func (ng *AlertNG) LoadAlertCondition(alertDefinitionID int64, signedInUser *models.SignedInUser, skipCache bool) (*eval.Condition, error) { +func (ng *AlertNG) LoadAlertCondition(alertDefinitionID int64) (*eval.Condition, error) { getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: alertDefinitionID} if err := ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery); err != nil { return nil, err } alertDefinition := getAlertDefinitionByIDQuery.Result - err := ng.validateAlertDefinition(alertDefinition, signedInUser, skipCache) + err := ng.validateAlertDefinition(alertDefinition, true) if err != nil { return nil, err } return &eval.Condition{ RefID: alertDefinition.Condition, + OrgID: alertDefinition.OrgID, QueriesAndExpressions: alertDefinition.Data, }, nil } diff --git a/pkg/services/ngalert/schedule.go b/pkg/services/ngalert/schedule.go new file mode 100644 index 00000000000..1210728ccf3 --- /dev/null +++ b/pkg/services/ngalert/schedule.go @@ -0,0 +1,291 @@ +package ngalert + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/alerting" + "github.com/grafana/grafana/pkg/services/ngalert/eval" + "golang.org/x/sync/errgroup" +) + +func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, definitionID int64, evalCh <-chan *evalContext) error { + ng.log.Debug("alert definition routine started", "definitionID", definitionID) + + evalRunning := false + var start, end time.Time + var attempt int64 + var alertDefinition *AlertDefinition + for { + select { + case ctx := <-evalCh: + if evalRunning { + continue + } + + evaluate := func(attempt int64) error { + start = timeNow() + + // fetch latest alert definition version + if alertDefinition == nil || alertDefinition.Version < ctx.version { + q := getAlertDefinitionByIDQuery{ID: definitionID} + err := ng.getAlertDefinitionByID(&q) + if err != nil { + ng.schedule.log.Error("failed to fetch alert definition", "alertDefinitionID", alertDefinition.ID) + return err + } + alertDefinition = q.Result + ng.schedule.log.Debug("new alert definition version fetched", "alertDefinitionID", alertDefinition.ID, "version", alertDefinition.Version) + } + + condition := eval.Condition{ + RefID: alertDefinition.Condition, + OrgID: alertDefinition.OrgID, + QueriesAndExpressions: alertDefinition.Data, + } + results, err := eval.ConditionEval(&condition, ctx.now) + end = timeNow() + if err != nil { + ng.schedule.log.Error("failed to evaluate alert definition", "definitionID", definitionID, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err) + return err + } + for _, r := range results { + ng.schedule.log.Info("alert definition result", "definitionID", definitionID, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String()) + } + return nil + } + + func() { + evalRunning = true + defer func() { + evalRunning = false + if ng.schedule.evalApplied != nil { + ng.schedule.evalApplied(definitionID, ctx.now) + } + }() + + for attempt = 0; attempt < ng.schedule.maxAttempts; attempt++ { + err := evaluate(attempt) + if err == nil { + break + } + } + }() + case id := <-ng.schedule.stop: + if id == definitionID { + ng.schedule.log.Debug("stopping alert definition routine", "definitionID", definitionID) + // interrupt evaluation if it's running + return nil + } + case <-grafanaCtx.Done(): + return grafanaCtx.Err() + } + } +} + +type schedule struct { + // base tick rate (fastest possible configured check) + baseInterval time.Duration + + // each alert definition gets its own channel and routine + registry alertDefinitionRegistry + + // broadcast channel for stopping definition routines + stop chan int64 + + maxAttempts int64 + + clock clock.Clock + + heartbeat *alerting.Ticker + + // evalApplied is only used for tests: test code can set it to non-nil + // function, and then it'll be called from the event loop whenever the + // message from evalApplied is handled. + evalApplied func(int64, time.Time) + + log log.Logger +} + +// newScheduler returns a new schedule. +func newScheduler(c clock.Clock, baseInterval time.Duration, logger log.Logger, evalApplied func(int64, time.Time)) *schedule { + ticker := alerting.NewTicker(c.Now(), time.Second*0, c, int64(baseInterval.Seconds())) + sch := schedule{ + registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[int64]alertDefinitionInfo)}, + stop: make(chan int64), + maxAttempts: maxAttempts, + clock: c, + baseInterval: baseInterval, + log: logger, + heartbeat: ticker, + evalApplied: evalApplied, + } + return &sch +} + +func (sch *schedule) pause() error { + if sch == nil { + return fmt.Errorf("scheduler is not initialised") + } + sch.heartbeat.Pause() + sch.log.Info("alert definition scheduler paused", "now", sch.clock.Now()) + return nil +} + +func (sch *schedule) unpause() error { + if sch == nil { + return fmt.Errorf("scheduler is not initialised") + } + sch.heartbeat.Unpause() + sch.log.Info("alert definition scheduler unpaused", "now", sch.clock.Now()) + return nil +} + +func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error { + dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx) + for { + select { + case tick := <-ng.schedule.heartbeat.C: + tickNum := tick.Unix() / int64(ng.schedule.baseInterval.Seconds()) + alertDefinitions := ng.fetchAllDetails(tick) + ng.schedule.log.Debug("alert definitions fetched", "count", len(alertDefinitions)) + + // registeredDefinitions is a map used for finding deleted alert definitions + // initially it is assigned to all known alert definitions from the previous cycle + // each alert definition found also in this cycle is removed + // so, at the end, the remaining registered alert definitions are the deleted ones + registeredDefinitions := ng.schedule.registry.keyMap() + + type readyToRunItem struct { + id int64 + definitionInfo alertDefinitionInfo + } + readyToRun := make([]readyToRunItem, 0) + for _, item := range alertDefinitions { + itemID := item.ID + itemVersion := item.Version + newRoutine := !ng.schedule.registry.exists(itemID) + definitionInfo := ng.schedule.registry.getOrCreateInfo(itemID, itemVersion) + invalidInterval := item.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0 + + if newRoutine && !invalidInterval { + dispatcherGroup.Go(func() error { + return ng.definitionRoutine(ctx, itemID, definitionInfo.ch) + }) + } + + if invalidInterval { + // this is expected to be always false + // give that we validate interval during alert definition updates + ng.schedule.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "definitionID", itemID, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", ng.schedule.baseInterval) + continue + } + + itemFrequency := item.IntervalSeconds / int64(ng.schedule.baseInterval.Seconds()) + if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { + readyToRun = append(readyToRun, readyToRunItem{id: itemID, definitionInfo: definitionInfo}) + } + + // remove the alert definition from the registered alert definitions + delete(registeredDefinitions, itemID) + } + + var step int64 = 0 + if len(readyToRun) > 0 { + step = ng.schedule.baseInterval.Nanoseconds() / int64(len(readyToRun)) + } + + for i := range readyToRun { + item := readyToRun[i] + + time.AfterFunc(time.Duration(int64(i)*step), func() { + item.definitionInfo.ch <- &evalContext{now: tick, version: item.definitionInfo.version} + }) + } + + // unregister and stop routines of the deleted alert definitions + for id := range registeredDefinitions { + ng.schedule.stop <- id + ng.schedule.registry.del(id) + } + case <-grafanaCtx.Done(): + err := dispatcherGroup.Wait() + return err + } + } +} + +type alertDefinitionRegistry struct { + mu sync.Mutex + alertDefinitionInfo map[int64]alertDefinitionInfo +} + +// getOrCreateInfo returns the channel for the specific alert definition +// if it does not exists creates one and returns it +func (r *alertDefinitionRegistry) getOrCreateInfo(definitionID int64, definitionVersion int64) alertDefinitionInfo { + r.mu.Lock() + defer r.mu.Unlock() + + info, ok := r.alertDefinitionInfo[definitionID] + if !ok { + r.alertDefinitionInfo[definitionID] = alertDefinitionInfo{ch: make(chan *evalContext), version: definitionVersion} + return r.alertDefinitionInfo[definitionID] + } + info.version = definitionVersion + r.alertDefinitionInfo[definitionID] = info + return info +} + +func (r *alertDefinitionRegistry) exists(definitionID int64) bool { + r.mu.Lock() + defer r.mu.Unlock() + + _, ok := r.alertDefinitionInfo[definitionID] + return ok +} + +func (r *alertDefinitionRegistry) del(definitionID int64) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.alertDefinitionInfo, definitionID) +} + +func (r *alertDefinitionRegistry) iter() <-chan int64 { + c := make(chan int64) + + f := func() { + r.mu.Lock() + defer r.mu.Unlock() + + for k := range r.alertDefinitionInfo { + c <- k + } + close(c) + } + go f() + + return c +} + +func (r *alertDefinitionRegistry) keyMap() map[int64]struct{} { + definitionsIDs := make(map[int64]struct{}) + for definitionID := range r.iter() { + definitionsIDs[definitionID] = struct{}{} + } + return definitionsIDs +} + +type alertDefinitionInfo struct { + ch chan *evalContext + version int64 +} + +type evalContext struct { + now time.Time + version int64 +} diff --git a/pkg/services/ngalert/schedule_test.go b/pkg/services/ngalert/schedule_test.go new file mode 100644 index 00000000000..918036a5764 --- /dev/null +++ b/pkg/services/ngalert/schedule_test.go @@ -0,0 +1,153 @@ +package ngalert + +import ( + "context" + "fmt" + "runtime" + "strconv" + "strings" + "testing" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/registry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/benbjohnson/clock" +) + +type evalAppliedInfo struct { + alertDefID int64 + now time.Time +} + +func TestAlertingTicker(t *testing.T) { + ng := setupTestEnv(t) + t.Cleanup(registry.ClearOverrides) + + mockedClock := clock.NewMock() + ng.schedule = newScheduler(mockedClock, time.Second, log.New("ngalert.schedule.test"), nil) + + alerts := make([]*AlertDefinition, 0) + + // create alert definition with zero interval (should never run) + alerts = append(alerts, createTestAlertDefinition(t, ng, 0)) + + // create alert definition with one second interval + alerts = append(alerts, createTestAlertDefinition(t, ng, 1)) + + evalAppliedCh := make(chan evalAppliedInfo, len(alerts)) + + ng.schedule.evalApplied = func(alertDefID int64, now time.Time) { + evalAppliedCh <- evalAppliedInfo{alertDefID: alertDefID, now: now} + } + + ctx := context.Background() + go func() { + err := ng.alertingTicker(ctx) + require.NoError(t, err) + }() + runtime.Gosched() + + expectedAlertDefinitionsEvaluated := []int64{alerts[1].ID} + t.Run(fmt.Sprintf("on 1st tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) + + // change alert definition interval to three seconds + var threeSecInterval int64 = 3 + err := ng.updateAlertDefinition(&updateAlertDefinitionCommand{ + ID: alerts[0].ID, + IntervalSeconds: &threeSecInterval, + OrgID: alerts[0].OrgID, + }) + require.NoError(t, err) + t.Logf("alert definition: %d interval reset to: %d", alerts[0].ID, threeSecInterval) + + expectedAlertDefinitionsEvaluated = []int64{alerts[1].ID} + t.Run(fmt.Sprintf("on 2nd tick alert definition: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) + + expectedAlertDefinitionsEvaluated = []int64{alerts[1].ID, alerts[0].ID} + t.Run(fmt.Sprintf("on 3rd tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) + + expectedAlertDefinitionsEvaluated = []int64{alerts[1].ID} + t.Run(fmt.Sprintf("on 4th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) + + err = ng.deleteAlertDefinitionByID(&deleteAlertDefinitionByIDCommand{ID: alerts[1].ID}) + require.NoError(t, err) + t.Logf("alert definition: %d deleted", alerts[1].ID) + + expectedAlertDefinitionsEvaluated = []int64{} + t.Run(fmt.Sprintf("on 5th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) + + expectedAlertDefinitionsEvaluated = []int64{alerts[0].ID} + t.Run(fmt.Sprintf("on 6th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) + + // create alert definition with one second interval + alerts = append(alerts, createTestAlertDefinition(t, ng, 1)) + + expectedAlertDefinitionsEvaluated = []int64{alerts[2].ID} + t.Run(fmt.Sprintf("on 7th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { + tick := advanceClock(t, mockedClock) + assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) + }) +} + +func assertEvalRun(t *testing.T, ch <-chan evalAppliedInfo, tick time.Time, ids ...int64) { + timeout := time.After(time.Second) + + expected := make(map[int64]struct{}, len(ids)) + for _, id := range ids { + expected[id] = struct{}{} + } + + for { + select { + case info := <-ch: + _, ok := expected[info.alertDefID] + t.Logf("alert definition: %d evaluated at: %v", info.alertDefID, info.now) + assert.True(t, ok) + assert.Equal(t, tick, info.now) + delete(expected, info.alertDefID) + if len(expected) == 0 { + return + } + case <-timeout: + if len(expected) == 0 { + return + } + t.Fatal("cycle has expired") + } + } +} + +func advanceClock(t *testing.T, mockedClock *clock.Mock) time.Time { + mockedClock.Add(time.Second) + return mockedClock.Now() + // t.Logf("Tick: %v", mockedClock.Now()) +} + +func concatenate(ids []int64) string { + s := make([]string, len(ids)) + for _, id := range ids { + s = append(s, strconv.FormatInt(id, 10)) + } + return fmt.Sprintf("[%s]", strings.TrimLeft(strings.Join(s, ","), ",")) +} diff --git a/pkg/services/ngalert/validator.go b/pkg/services/ngalert/validator.go index 215a386346f..47b18a6c4d6 100644 --- a/pkg/services/ngalert/validator.go +++ b/pkg/services/ngalert/validator.go @@ -2,32 +2,68 @@ package ngalert import ( "fmt" + "time" "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/ngalert/eval" ) -// validateAlertDefinition validates that the alert definition contains at least one alert query -// and that alert queries refer to existing datasources. -func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, signedInUser *models.SignedInUser, skipCache bool) error { - if len(alertDefinition.Data) == 0 { +const alertDefinitionMaxNameLength = 190 + +// validateAlertDefinition validates the alert definition interval and organisation. +// If requireData is true checks that it contains at least one alert query +func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, requireData bool) error { + if !requireData && len(alertDefinition.Data) == 0 { return fmt.Errorf("no queries or expressions are found") } - for _, query := range alertDefinition.Data { + if alertDefinition.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0 { + return fmt.Errorf("invalid interval: %v: interval should be divided exactly by scheduler interval: %v", time.Duration(alertDefinition.IntervalSeconds)*time.Second, ng.schedule.baseInterval) + } + + // enfore max name length in SQLite + if len(alertDefinition.Title) > alertDefinitionMaxNameLength { + return fmt.Errorf("name length should not be greater than %d", alertDefinitionMaxNameLength) + } + + if alertDefinition.OrgID == 0 { + return fmt.Errorf("no organisation is found") + } + + return nil +} + +// validateCondition validates that condition queries refer to existing datasources +func (ng *AlertNG) validateCondition(c eval.Condition, user *models.SignedInUser) error { + var refID string + + if len(c.QueriesAndExpressions) == 0 { + return nil + } + + for _, query := range c.QueriesAndExpressions { + if c.RefID == query.RefID { + refID = c.RefID + } + datasourceID, err := query.GetDatasource() if err != nil { return err } if datasourceID == expr.DatasourceID { - return nil + continue } - _, err = ng.DatasourceCache.GetDatasource(datasourceID, signedInUser, skipCache) + _, err = ng.DatasourceCache.GetDatasource(datasourceID, user, false) if err != nil { return err } } + + if refID == "" { + return fmt.Errorf("condition %s not found in any query or expression", c.RefID) + } return nil } diff --git a/pkg/services/sqlstore/migrator/migrator.go b/pkg/services/sqlstore/migrator/migrator.go index 80b17fbab60..d86791d6456 100644 --- a/pkg/services/sqlstore/migrator/migrator.go +++ b/pkg/services/sqlstore/migrator/migrator.go @@ -170,7 +170,7 @@ func (mg *Migrator) inTransaction(callback dbTransactionFunc) error { if err := callback(sess); err != nil { if rollErr := sess.Rollback(); !errors.Is(err, rollErr) { - return errutil.Wrapf(err, "failed to roll back transaction due to error: %s", rollErr) + return errutil.Wrapf(err, "failed to roll back transaction due to error: %s; initial err: %s", rollErr, err) } return err