mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
AlertingNG: manage and evaluate alert definitions via the API (#28377)
* Alerting NG: prototype v2 (WIP) * Separate eval package * Modify eval alert definition endpoint * Disable migration if ngalert is not enabled * Remove premature test * Fix lint issues * Delete obsolete struct * Apply suggestions from code review * Update pkg/services/ngalert/ngalert.go Co-authored-by: Kyle Brandt <kyle@grafana.com> * Add API endpoint for listing alert definitions * Introduce index for alert_definition table * make ds object for expression to avoid panic * wrap error * Update pkg/services/ngalert/eval/eval.go * Swith to backend.DataQuery * Export TransformWrapper callback * Fix lint issues * Update pkg/services/ngalert/ngalert.go Co-authored-by: Kyle Brandt <kyle@grafana.com> * Validate alert definitions before storing them * Introduce AlertQuery * Add test * Add QueryType in AlertQuery * Accept only float64 (seconds) durations * Apply suggestions from code review * Get rid of bus * Do not export symbols * Fix failing test * Fix failure due to service initialization order Introduce MediumHigh service priority and assign it to backendplugin service * Fix test * Apply suggestions from code review * Fix renamed reference Co-authored-by: Kyle Brandt <kyle@grafana.com>
This commit is contained in:
parent
a1e80af800
commit
43f580c299
@ -357,13 +357,6 @@ func (hs *HTTPServer) registerRoutes() {
|
||||
alertsRoute.Get("/states-for-dashboard", Wrap(GetAlertStatesForDashboard))
|
||||
})
|
||||
|
||||
if hs.Cfg.IsNgAlertEnabled() {
|
||||
apiRoute.Group("/alert-definitions", func(alertDefinitions routing.RouteRegister) {
|
||||
alertDefinitions.Get("/eval/:dashboardID/:panelID/:refID", reqEditorRole, Wrap(hs.AlertDefinitionEval))
|
||||
alertDefinitions.Post("/eval", reqEditorRole, bind(dtos.EvalAlertConditionCommand{}), Wrap(hs.ConditionEval))
|
||||
})
|
||||
}
|
||||
|
||||
apiRoute.Get("/alert-notifiers", reqEditorRole, Wrap(GetAlertNotifiers))
|
||||
|
||||
apiRoute.Group("/alert-notifications", func(alertNotifications routing.RouteRegister) {
|
||||
|
@ -1,12 +0,0 @@
|
||||
package dtos
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
eval "github.com/grafana/grafana/pkg/services/ngalert"
|
||||
)
|
||||
|
||||
type EvalAlertConditionCommand struct {
|
||||
Condition eval.Condition `json:"condition"`
|
||||
Now time.Time `json:"now"`
|
||||
}
|
@ -31,7 +31,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/hooks"
|
||||
"github.com/grafana/grafana/pkg/services/login"
|
||||
eval "github.com/grafana/grafana/pkg/services/ngalert"
|
||||
"github.com/grafana/grafana/pkg/services/provisioning"
|
||||
"github.com/grafana/grafana/pkg/services/quota"
|
||||
"github.com/grafana/grafana/pkg/services/rendering"
|
||||
@ -73,7 +72,6 @@ type HTTPServer struct {
|
||||
BackendPluginManager backendplugin.Manager `inject:""`
|
||||
PluginManager *plugins.PluginManager `inject:""`
|
||||
SearchService *search.SearchService `inject:""`
|
||||
AlertNG *eval.AlertNG `inject:""`
|
||||
ShortURLService *shorturls.ShortURLService `inject:""`
|
||||
Live *live.GrafanaLive `inject:""`
|
||||
Listener net.Listener
|
||||
|
@ -1,101 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/api/dtos"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
eval "github.com/grafana/grafana/pkg/services/ngalert"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
// POST /api/alert-definitions/eval
|
||||
func (hs *HTTPServer) ConditionEval(c *models.ReqContext, dto dtos.EvalAlertConditionCommand) Response {
|
||||
alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
|
||||
defer cancelFn()
|
||||
|
||||
alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser}
|
||||
|
||||
fromStr := c.Query("from")
|
||||
if fromStr == "" {
|
||||
fromStr = "now-3h"
|
||||
}
|
||||
|
||||
toStr := c.Query("to")
|
||||
if toStr == "" {
|
||||
toStr = "now"
|
||||
}
|
||||
|
||||
execResult, err := dto.Condition.Execute(alertExecCtx, fromStr, toStr)
|
||||
if err != nil {
|
||||
return Error(400, "Failed to execute conditions", err)
|
||||
}
|
||||
|
||||
evalResults, err := eval.EvaluateExecutionResult(execResult)
|
||||
if err != nil {
|
||||
return Error(400, "Failed to evaluate results", err)
|
||||
}
|
||||
|
||||
frame := evalResults.AsDataFrame()
|
||||
df := tsdb.NewDecodedDataFrames([]*data.Frame{&frame})
|
||||
instances, err := df.Encoded()
|
||||
if err != nil {
|
||||
return Error(400, "Failed to encode result dataframes", err)
|
||||
}
|
||||
|
||||
return JSON(200, util.DynMap{
|
||||
"instances": instances,
|
||||
})
|
||||
}
|
||||
|
||||
// GET /api/alert-definitions/eval/:dashboardId/:panelId/:refId"
|
||||
func (hs *HTTPServer) AlertDefinitionEval(c *models.ReqContext) Response {
|
||||
dashboardID := c.ParamsInt64(":dashboardID")
|
||||
panelID := c.ParamsInt64(":panelID")
|
||||
conditionRefID := c.Params(":refID")
|
||||
|
||||
fromStr := c.Query("from")
|
||||
if fromStr == "" {
|
||||
fromStr = "now-3h"
|
||||
}
|
||||
|
||||
toStr := c.Query("to")
|
||||
if toStr == "" {
|
||||
toStr = "now"
|
||||
}
|
||||
|
||||
conditions, err := hs.AlertNG.LoadAlertCondition(dashboardID, panelID, conditionRefID, c.SignedInUser, c.SkipCache)
|
||||
if err != nil {
|
||||
return Error(400, "Failed to load conditions", err)
|
||||
}
|
||||
|
||||
alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
|
||||
defer cancelFn()
|
||||
|
||||
alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser}
|
||||
|
||||
execResult, err := conditions.Execute(alertExecCtx, fromStr, toStr)
|
||||
if err != nil {
|
||||
return Error(400, "Failed to execute conditions", err)
|
||||
}
|
||||
|
||||
evalResults, err := eval.EvaluateExecutionResult(execResult)
|
||||
if err != nil {
|
||||
return Error(400, "Failed to evaluate results", err)
|
||||
}
|
||||
|
||||
frame := evalResults.AsDataFrame()
|
||||
|
||||
df := tsdb.NewDecodedDataFrames([]*data.Frame{&frame})
|
||||
instances, err := df.Encoded()
|
||||
if err != nil {
|
||||
return Error(400, "Failed to encode result dataframes", err)
|
||||
}
|
||||
|
||||
return JSON(200, util.DynMap{
|
||||
"instances": instances,
|
||||
})
|
||||
}
|
@ -33,7 +33,7 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
registry.RegisterService(&manager{})
|
||||
registry.RegisterServiceWithPriority(&manager{}, registry.MediumHigh)
|
||||
}
|
||||
|
||||
// Manager manages backend plugins.
|
||||
|
@ -74,7 +74,7 @@ func NewTransformWrapper(log log.Logger, plugin sdkgrpcplugin.TransformClient) *
|
||||
type TransformWrapper struct {
|
||||
sdkgrpcplugin.TransformClient
|
||||
logger log.Logger
|
||||
callback *transformCallback
|
||||
Callback *transformCallback
|
||||
}
|
||||
|
||||
func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
||||
@ -102,7 +102,7 @@ func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery
|
||||
},
|
||||
})
|
||||
}
|
||||
pbRes, err := tw.TransformClient.TransformData(ctx, pbQuery, tw.callback)
|
||||
pbRes, err := tw.TransformClient.TransformData(ctx, pbQuery, tw.Callback)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -16,6 +16,14 @@ type Descriptor struct {
|
||||
|
||||
var services []*Descriptor
|
||||
|
||||
func RegisterServiceWithPriority(instance Service, priority Priority) {
|
||||
services = append(services, &Descriptor{
|
||||
Name: reflect.TypeOf(instance).Elem().Name(),
|
||||
Instance: instance,
|
||||
InitPriority: priority,
|
||||
})
|
||||
}
|
||||
|
||||
func RegisterService(instance Service) {
|
||||
services = append(services, &Descriptor{
|
||||
Name: reflect.TypeOf(instance).Elem().Name(),
|
||||
@ -111,7 +119,8 @@ func IsDisabled(srv Service) bool {
|
||||
type Priority int
|
||||
|
||||
const (
|
||||
High Priority = 100
|
||||
Medium Priority = 50
|
||||
Low Priority = 0
|
||||
High Priority = 100
|
||||
MediumHigh Priority = 75
|
||||
Medium Priority = 50
|
||||
Low Priority = 0
|
||||
)
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
_ "github.com/grafana/grafana/pkg/services/alerting"
|
||||
_ "github.com/grafana/grafana/pkg/services/auth"
|
||||
_ "github.com/grafana/grafana/pkg/services/cleanup"
|
||||
_ "github.com/grafana/grafana/pkg/services/ngalert"
|
||||
_ "github.com/grafana/grafana/pkg/services/notifications"
|
||||
_ "github.com/grafana/grafana/pkg/services/provisioning"
|
||||
_ "github.com/grafana/grafana/pkg/services/rendering"
|
||||
|
15
pkg/services/ngalert/alert_definition.go
Normal file
15
pkg/services/ngalert/alert_definition.go
Normal file
@ -0,0 +1,15 @@
|
||||
package ngalert
|
||||
|
||||
import "fmt"
|
||||
|
||||
// preSave sets datasource and loads the updated model for each alert query.
|
||||
func (alertDefinition *AlertDefinition) preSave() error {
|
||||
for i, q := range alertDefinition.Data {
|
||||
err := q.PreSave(alertDefinition.OrgId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid alert query %s: %w", q.RefID, err)
|
||||
}
|
||||
alertDefinition.Data[i] = q
|
||||
}
|
||||
return nil
|
||||
}
|
183
pkg/services/ngalert/api.go
Normal file
183
pkg/services/ngalert/api.go
Normal file
@ -0,0 +1,183 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
|
||||
"github.com/go-macaron/binding"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/middleware"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
func (ng *AlertNG) registerAPIEndpoints() {
|
||||
ng.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) {
|
||||
alertDefinitions.Get("", middleware.ReqSignedIn, api.Wrap(ng.listAlertDefinitions))
|
||||
alertDefinitions.Get("/eval/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.alertDefinitionEval))
|
||||
alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), api.Wrap(ng.conditionEval))
|
||||
alertDefinitions.Get("/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.getAlertDefinitionEndpoint))
|
||||
alertDefinitions.Delete("/:alertDefinitionId", ng.validateOrgAlertDefinition, api.Wrap(ng.deleteAlertDefinitionEndpoint))
|
||||
alertDefinitions.Post("/", middleware.ReqSignedIn, binding.Bind(saveAlertDefinitionCommand{}), api.Wrap(ng.createAlertDefinitionEndpoint))
|
||||
alertDefinitions.Put("/:alertDefinitionId", ng.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), api.Wrap(ng.updateAlertDefinitionEndpoint))
|
||||
})
|
||||
}
|
||||
|
||||
// conditionEval handles POST /api/alert-definitions/eval.
|
||||
func (ng *AlertNG) conditionEval(c *models.ReqContext, dto evalAlertConditionCommand) api.Response {
|
||||
alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
|
||||
defer cancelFn()
|
||||
|
||||
alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser}
|
||||
|
||||
fromStr := c.Query("from")
|
||||
if fromStr == "" {
|
||||
fromStr = "now-3h"
|
||||
}
|
||||
|
||||
toStr := c.Query("to")
|
||||
if toStr == "" {
|
||||
toStr = "now"
|
||||
}
|
||||
|
||||
execResult, err := dto.Condition.Execute(alertExecCtx, fromStr, toStr)
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to execute conditions", err)
|
||||
}
|
||||
|
||||
evalResults, err := eval.EvaluateExecutionResult(execResult)
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to evaluate results", err)
|
||||
}
|
||||
|
||||
frame := evalResults.AsDataFrame()
|
||||
df := tsdb.NewDecodedDataFrames([]*data.Frame{&frame})
|
||||
instances, err := df.Encoded()
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to encode result dataframes", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, util.DynMap{
|
||||
"instances": instances,
|
||||
})
|
||||
}
|
||||
|
||||
// alertDefinitionEval handles GET /api/alert-definitions/eval/:dashboardId/:panelId/:refId".
|
||||
func (ng *AlertNG) alertDefinitionEval(c *models.ReqContext) api.Response {
|
||||
alertDefinitionID := c.ParamsInt64(":alertDefinitionId")
|
||||
|
||||
fromStr := c.Query("from")
|
||||
if fromStr == "" {
|
||||
fromStr = "now-3h"
|
||||
}
|
||||
|
||||
toStr := c.Query("to")
|
||||
if toStr == "" {
|
||||
toStr = "now"
|
||||
}
|
||||
|
||||
conditions, err := ng.LoadAlertCondition(alertDefinitionID, c.SignedInUser, c.SkipCache)
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to load conditions", err)
|
||||
}
|
||||
|
||||
alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
|
||||
defer cancelFn()
|
||||
|
||||
alertExecCtx := eval.AlertExecCtx{Ctx: alertCtx, SignedInUser: c.SignedInUser}
|
||||
|
||||
execResult, err := conditions.Execute(alertExecCtx, fromStr, toStr)
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to execute conditions", err)
|
||||
}
|
||||
|
||||
evalResults, err := eval.EvaluateExecutionResult(execResult)
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to evaluate results", err)
|
||||
}
|
||||
|
||||
frame := evalResults.AsDataFrame()
|
||||
|
||||
df := tsdb.NewDecodedDataFrames([]*data.Frame{&frame})
|
||||
instances, err := df.Encoded()
|
||||
if err != nil {
|
||||
return api.Error(400, "Failed to encode result dataframes", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, util.DynMap{
|
||||
"instances": instances,
|
||||
})
|
||||
}
|
||||
|
||||
// getAlertDefinitionEndpoint handles GET /api/alert-definitions/:alertDefinitionId.
|
||||
func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) api.Response {
|
||||
alertDefinitionID := c.ParamsInt64(":alertDefinitionId")
|
||||
|
||||
query := getAlertDefinitionByIDQuery{
|
||||
ID: alertDefinitionID,
|
||||
}
|
||||
|
||||
if err := ng.getAlertDefinitionByID(&query); err != nil {
|
||||
return api.Error(500, "Failed to get alert definition", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, &query.Result)
|
||||
}
|
||||
|
||||
// deleteAlertDefinitionEndpoint handles DELETE /api/alert-definitions/:alertDefinitionId.
|
||||
func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) api.Response {
|
||||
alertDefinitionID := c.ParamsInt64(":alertDefinitionId")
|
||||
|
||||
query := deleteAlertDefinitionByIDQuery{
|
||||
ID: alertDefinitionID,
|
||||
OrgID: c.SignedInUser.OrgId,
|
||||
}
|
||||
|
||||
if err := ng.deleteAlertDefinitionByID(&query); err != nil {
|
||||
return api.Error(500, "Failed to delete alert definition", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, util.DynMap{"affectedRows": query.RowsAffected})
|
||||
}
|
||||
|
||||
// updateAlertDefinitionEndpoint handles PUT /api/alert-definitions/:alertDefinitionId.
|
||||
func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) api.Response {
|
||||
cmd.ID = c.ParamsInt64(":alertDefinitionId")
|
||||
cmd.SignedInUser = c.SignedInUser
|
||||
cmd.SkipCache = c.SkipCache
|
||||
|
||||
if err := ng.updateAlertDefinition(&cmd); err != nil {
|
||||
return api.Error(500, "Failed to update alert definition", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, util.DynMap{"affectedRows": cmd.RowsAffected, "id": cmd.Result.Id})
|
||||
}
|
||||
|
||||
// createAlertDefinitionEndpoint handles POST /api/alert-definitions.
|
||||
func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) api.Response {
|
||||
cmd.OrgID = c.SignedInUser.OrgId
|
||||
cmd.SignedInUser = c.SignedInUser
|
||||
cmd.SkipCache = c.SkipCache
|
||||
|
||||
if err := ng.saveAlertDefinition(&cmd); err != nil {
|
||||
return api.Error(500, "Failed to create alert definition", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, util.DynMap{"id": cmd.Result.Id})
|
||||
}
|
||||
|
||||
// listAlertDefinitions handles GET /api/alert-definitions.
|
||||
func (ng *AlertNG) listAlertDefinitions(c *models.ReqContext) api.Response {
|
||||
cmd := listAlertDefinitionsCommand{OrgID: c.SignedInUser.OrgId}
|
||||
|
||||
if err := ng.getAlertDefinitions(&cmd); err != nil {
|
||||
return api.Error(500, "Failed to list alert definitions", err)
|
||||
}
|
||||
|
||||
return api.JSON(200, util.DynMap{"results": cmd.Result})
|
||||
}
|
121
pkg/services/ngalert/database.go
Normal file
121
pkg/services/ngalert/database.go
Normal file
@ -0,0 +1,121 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
|
||||
func getAlertDefinitionByID(alertDefinitionID int64, sess *sqlstore.DBSession) (*AlertDefinition, error) {
|
||||
alertDefinition := AlertDefinition{}
|
||||
has, err := sess.ID(alertDefinitionID).Get(&alertDefinition)
|
||||
if !has {
|
||||
return nil, errAlertDefinitionNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &alertDefinition, nil
|
||||
}
|
||||
|
||||
// deleteAlertDefinitionByID is a handler for deleting an alert definition.
|
||||
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
|
||||
func (ng *AlertNG) deleteAlertDefinitionByID(query *deleteAlertDefinitionByIDQuery) error {
|
||||
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
res, err := sess.Exec("DELETE FROM alert_definition WHERE id = ?", query.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
query.RowsAffected = rowsAffected
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// getAlertDefinitionByID is a handler for retrieving an alert definition from that database by its ID.
|
||||
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
|
||||
func (ng *AlertNG) getAlertDefinitionByID(query *getAlertDefinitionByIDQuery) error {
|
||||
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
alertDefinition, err := getAlertDefinitionByID(query.ID, sess)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
query.Result = alertDefinition
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// saveAlertDefinition is a handler for saving a new alert definition.
|
||||
func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
|
||||
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
alertDefinition := &AlertDefinition{
|
||||
OrgId: cmd.OrgID,
|
||||
Name: cmd.Name,
|
||||
Condition: cmd.Condition.RefID,
|
||||
Data: cmd.Condition.QueriesAndExpressions,
|
||||
}
|
||||
|
||||
if err := ng.validateAlertDefinition(alertDefinition, cmd.SignedInUser, cmd.SkipCache); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := alertDefinition.preSave(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Insert(alertDefinition); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.Result = alertDefinition
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// updateAlertDefinition is a handler for updating an existing alert definition.
|
||||
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
|
||||
func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error {
|
||||
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
alertDefinition := &AlertDefinition{
|
||||
Id: cmd.ID,
|
||||
Name: cmd.Name,
|
||||
Condition: cmd.Condition.RefID,
|
||||
Data: cmd.Condition.QueriesAndExpressions,
|
||||
}
|
||||
|
||||
if err := ng.validateAlertDefinition(alertDefinition, cmd.SignedInUser, cmd.SkipCache); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := alertDefinition.preSave(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
affectedRows, err := sess.ID(cmd.ID).Update(alertDefinition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.Result = alertDefinition
|
||||
cmd.RowsAffected = affectedRows
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// getAlertDefinitions is a handler for retrieving alert definitions of specific organisation.
|
||||
func (ng *AlertNG) getAlertDefinitions(cmd *listAlertDefinitionsCommand) error {
|
||||
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
alertDefinitions := make([]*AlertDefinition, 0)
|
||||
q := "SELECT * FROM alert_definition WHERE org_id = ?"
|
||||
if err := sess.SQL(q, cmd.OrgID).Find(&alertDefinitions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.Result = alertDefinitions
|
||||
return nil
|
||||
})
|
||||
}
|
181
pkg/services/ngalert/database_test.go
Normal file
181
pkg/services/ngalert/database_test.go
Normal file
@ -0,0 +1,181 @@
|
||||
// +build integration
|
||||
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCreatingAlertDefinition(t *testing.T) {
|
||||
t.Run("should fail gracefully when creating alert definition with invalid relative time range", func(t *testing.T) {
|
||||
ng := setupTestEnv(t)
|
||||
q := saveAlertDefinitionCommand{
|
||||
OrgID: 1,
|
||||
Name: "something completely different",
|
||||
Condition: condition{
|
||||
RefID: "B",
|
||||
QueriesAndExpressions: []eval.AlertQuery{
|
||||
{
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "__expr__",
|
||||
"type":"math",
|
||||
"expression":"2 + 3 > 1"
|
||||
}`),
|
||||
RefID: "B",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := ng.saveAlertDefinition(&q)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestUpdatingAlertDefinition(t *testing.T) {
|
||||
t.Run("zero rows affected when updating unknown alert", func(t *testing.T) {
|
||||
ng := setupTestEnv(t)
|
||||
|
||||
q := updateAlertDefinitionCommand{
|
||||
ID: 1,
|
||||
OrgID: 1,
|
||||
Name: "something completely different",
|
||||
Condition: condition{
|
||||
RefID: "A",
|
||||
QueriesAndExpressions: []eval.AlertQuery{
|
||||
{
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "__expr__",
|
||||
"type":"math",
|
||||
"expression":"2 + 2 > 1"
|
||||
}`),
|
||||
RefID: "A",
|
||||
RelativeTimeRange: eval.RelativeTimeRange{
|
||||
From: eval.Duration(time.Duration(5) * time.Hour),
|
||||
To: eval.Duration(time.Duration(3) * time.Hour),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := ng.updateAlertDefinition(&q)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(0), q.RowsAffected)
|
||||
})
|
||||
|
||||
t.Run("updating successfully existing alert", func(t *testing.T) {
|
||||
ng := setupTestEnv(t)
|
||||
alertDefinition := createTestAlertDefinition(t, ng)
|
||||
|
||||
q := updateAlertDefinitionCommand{
|
||||
ID: (*alertDefinition).Id,
|
||||
OrgID: 1,
|
||||
Name: "something completely different",
|
||||
Condition: condition{
|
||||
RefID: "B",
|
||||
QueriesAndExpressions: []eval.AlertQuery{
|
||||
{
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "__expr__",
|
||||
"type":"math",
|
||||
"expression":"2 + 3 > 1"
|
||||
}`),
|
||||
RefID: "B",
|
||||
RelativeTimeRange: eval.RelativeTimeRange{
|
||||
From: eval.Duration(5 * time.Hour),
|
||||
To: eval.Duration(3 * time.Hour),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := ng.updateAlertDefinition(&q)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(1), q.RowsAffected)
|
||||
assert.Equal(t, int64(1), q.Result.Id)
|
||||
|
||||
getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: (*alertDefinition).Id}
|
||||
err = ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "something completely different", getAlertDefinitionByIDQuery.Result.Name)
|
||||
assert.Equal(t, "B", getAlertDefinitionByIDQuery.Result.Condition)
|
||||
assert.Equal(t, 1, len(getAlertDefinitionByIDQuery.Result.Data))
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeletingAlertDefinition(t *testing.T) {
|
||||
t.Run("zero rows affected when deleting unknown alert", func(t *testing.T) {
|
||||
ng := setupTestEnv(t)
|
||||
|
||||
q := deleteAlertDefinitionByIDQuery{
|
||||
ID: 1,
|
||||
OrgID: 1,
|
||||
}
|
||||
|
||||
err := ng.deleteAlertDefinitionByID(&q)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(0), q.RowsAffected)
|
||||
})
|
||||
|
||||
t.Run("deleting successfully existing alert", func(t *testing.T) {
|
||||
ng := setupTestEnv(t)
|
||||
alertDefinition := createTestAlertDefinition(t, ng)
|
||||
|
||||
q := deleteAlertDefinitionByIDQuery{
|
||||
ID: (*alertDefinition).Id,
|
||||
OrgID: 1,
|
||||
}
|
||||
|
||||
err := ng.deleteAlertDefinitionByID(&q)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(1), q.RowsAffected)
|
||||
})
|
||||
}
|
||||
|
||||
func setupTestEnv(t *testing.T) AlertNG {
|
||||
sqlStore := sqlstore.InitTestDB(t)
|
||||
cfg := setting.Cfg{}
|
||||
cfg.FeatureToggles = map[string]bool{"ngalert": true}
|
||||
ng := AlertNG{
|
||||
SQLStore: sqlStore,
|
||||
Cfg: &cfg,
|
||||
}
|
||||
return ng
|
||||
}
|
||||
|
||||
func createTestAlertDefinition(t *testing.T, ng AlertNG) *AlertDefinition {
|
||||
cmd := saveAlertDefinitionCommand{
|
||||
OrgID: 1,
|
||||
Name: "an alert definition",
|
||||
Condition: condition{
|
||||
RefID: "A",
|
||||
QueriesAndExpressions: []eval.AlertQuery{
|
||||
{
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "__expr__",
|
||||
"type":"math",
|
||||
"expression":"2 + 2 > 1"
|
||||
}`),
|
||||
RelativeTimeRange: eval.RelativeTimeRange{
|
||||
From: eval.Duration(5 * time.Hour),
|
||||
To: eval.Duration(3 * time.Hour),
|
||||
},
|
||||
RefID: "A",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := ng.saveAlertDefinition(&cmd)
|
||||
require.NoError(t, err)
|
||||
return cmd.Result
|
||||
}
|
@ -1,282 +0,0 @@
|
||||
// Package eval executes the condition for an alert definition, evaluates the condition results, and
|
||||
// returns the alert instance states.
|
||||
package eval
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
)
|
||||
|
||||
type minimalDashboard struct {
|
||||
Panels []struct {
|
||||
ID int64 `json:"id"`
|
||||
Datasource string `json:"datasource"`
|
||||
Targets []*simplejson.Json `json:"targets"`
|
||||
} `json:"panels"`
|
||||
}
|
||||
|
||||
// AlertNG is the service for evaluating the condition of an alert definition.
|
||||
type AlertNG struct {
|
||||
DatasourceCache datasources.CacheService `inject:""`
|
||||
}
|
||||
|
||||
func init() {
|
||||
registry.RegisterService(&AlertNG{})
|
||||
}
|
||||
|
||||
// Init initializes the AlertingService.
|
||||
func (ng *AlertNG) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AlertExecCtx is the context provided for executing an alert condition.
|
||||
type AlertExecCtx struct {
|
||||
AlertDefitionID int64
|
||||
SignedInUser *models.SignedInUser
|
||||
|
||||
Ctx context.Context
|
||||
}
|
||||
|
||||
// Condition contains backend expressions and queries and the RefID
|
||||
// of the query or expression that will be evaluated.
|
||||
type Condition struct {
|
||||
RefID string `json:"refId"`
|
||||
|
||||
QueriesAndExpressions []tsdb.Query `json:"queriesAndExpressions"`
|
||||
}
|
||||
|
||||
// ExecutionResults contains the unevaluated results from executing
|
||||
// a condition.
|
||||
type ExecutionResults struct {
|
||||
AlertDefinitionID int64
|
||||
|
||||
Error error
|
||||
|
||||
Results data.Frames
|
||||
}
|
||||
|
||||
// Results is a slice of evaluated alert instances states.
|
||||
type Results []Result
|
||||
|
||||
// Result contains the evaluated state of an alert instance
|
||||
// identified by its labels.
|
||||
type Result struct {
|
||||
Instance data.Labels
|
||||
State State // Enum
|
||||
}
|
||||
|
||||
// State is an enum of the evaluation state for an alert instance.
|
||||
type State int
|
||||
|
||||
const (
|
||||
// Normal is the eval state for an alert instance condition
|
||||
// that evaluated to false.
|
||||
Normal State = iota
|
||||
|
||||
// Alerting is the eval state for an alert instance condition
|
||||
// that evaluated to false.
|
||||
Alerting
|
||||
)
|
||||
|
||||
func (s State) String() string {
|
||||
return [...]string{"Normal", "Alerting"}[s]
|
||||
}
|
||||
|
||||
// IsValid checks the conditions validity
|
||||
func (c Condition) IsValid() bool {
|
||||
// TODO search for refIDs in QueriesAndExpressions
|
||||
return len(c.QueriesAndExpressions) != 0
|
||||
}
|
||||
|
||||
// LoadAlertCondition returns a Condition object for the given alertDefintionId.
|
||||
func (ng *AlertNG) LoadAlertCondition(dashboardID int64, panelID int64, conditionRefID string, signedInUser *models.SignedInUser, skipCache bool) (*Condition, error) {
|
||||
// get queries from the dashboard (because GEL expressions cannot be stored in alerts so far)
|
||||
getDashboardQuery := models.GetDashboardQuery{Id: dashboardID}
|
||||
if err := bus.Dispatch(&getDashboardQuery); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blob, err := getDashboardQuery.Result.Data.MarshalJSON()
|
||||
if err != nil {
|
||||
return nil, errors.New("failed to marshal dashboard JSON")
|
||||
}
|
||||
var dash minimalDashboard
|
||||
err = json.Unmarshal(blob, &dash)
|
||||
if err != nil {
|
||||
return nil, errors.New("failed to unmarshal dashboard JSON")
|
||||
}
|
||||
|
||||
condition := Condition{}
|
||||
for _, p := range dash.Panels {
|
||||
if p.ID == panelID {
|
||||
panelDatasource := p.Datasource
|
||||
var ds *models.DataSource
|
||||
for i, query := range p.Targets {
|
||||
refID := query.Get("refId").MustString("A")
|
||||
queryDatasource := query.Get("datasource").MustString()
|
||||
|
||||
if i == 0 && queryDatasource != "__expr__" {
|
||||
dsName := panelDatasource
|
||||
if queryDatasource != "" {
|
||||
dsName = queryDatasource
|
||||
}
|
||||
|
||||
getDataSourceByNameQuery := models.GetDataSourceByNameQuery{Name: dsName, OrgId: getDashboardQuery.Result.OrgId}
|
||||
if err := bus.Dispatch(&getDataSourceByNameQuery); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds, err = ng.DatasourceCache.GetDatasource(getDataSourceByNameQuery.Result.Id, signedInUser, skipCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if ds == nil {
|
||||
return nil, errors.New("no datasource reference found")
|
||||
}
|
||||
|
||||
if queryDatasource == "" {
|
||||
query.Set("datasource", ds.Name)
|
||||
}
|
||||
|
||||
if query.Get("datasourceId").MustString() == "" {
|
||||
query.Set("datasourceId", ds.Id)
|
||||
}
|
||||
|
||||
if query.Get("orgId").MustString() == "" { // GEL requires orgID inside the query JSON
|
||||
// need to decide which organization id is expected there
|
||||
// in grafana queries is passed the signed in user organization id:
|
||||
// https://github.com/grafana/grafana/blob/34a355fe542b511ed02976523aa6716aeb00bde6/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts#L60
|
||||
// but I think that it should be datasource org id instead
|
||||
query.Set("orgId", 0)
|
||||
}
|
||||
|
||||
if query.Get("maxDataPoints").MustString() == "" { // GEL requires maxDataPoints inside the query JSON
|
||||
query.Set("maxDataPoints", 100)
|
||||
}
|
||||
|
||||
// intervalMS is calculated by the frontend
|
||||
// should we do something similar?
|
||||
if query.Get("intervalMs").MustString() == "" { // GEL requires intervalMs inside the query JSON
|
||||
query.Set("intervalMs", 1000)
|
||||
}
|
||||
|
||||
condition.QueriesAndExpressions = append(condition.QueriesAndExpressions, tsdb.Query{
|
||||
RefId: refID,
|
||||
MaxDataPoints: query.Get("maxDataPoints").MustInt64(100),
|
||||
IntervalMs: query.Get("intervalMs").MustInt64(1000),
|
||||
QueryType: query.Get("queryType").MustString(""),
|
||||
Model: query,
|
||||
DataSource: ds,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
condition.RefID = conditionRefID
|
||||
return &condition, nil
|
||||
}
|
||||
|
||||
// Execute runs the Condition's expressions or queries.
|
||||
func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*ExecutionResults, error) {
|
||||
result := ExecutionResults{}
|
||||
if !c.IsValid() {
|
||||
return nil, fmt.Errorf("invalid conditions")
|
||||
}
|
||||
|
||||
request := &tsdb.TsdbQuery{
|
||||
TimeRange: tsdb.NewTimeRange(fromStr, toStr),
|
||||
Debug: true,
|
||||
User: ctx.SignedInUser,
|
||||
}
|
||||
for i := range c.QueriesAndExpressions {
|
||||
request.Queries = append(request.Queries, &c.QueriesAndExpressions[i])
|
||||
}
|
||||
|
||||
resp, err := plugins.Transform.Transform(ctx.Ctx, request)
|
||||
if err != nil {
|
||||
result.Error = err
|
||||
return &result, err
|
||||
}
|
||||
|
||||
conditionResult := resp.Results[c.RefID]
|
||||
if conditionResult == nil {
|
||||
err = fmt.Errorf("no GEL results")
|
||||
result.Error = err
|
||||
return &result, err
|
||||
}
|
||||
|
||||
result.Results, err = conditionResult.Dataframes.Decoded()
|
||||
if err != nil {
|
||||
result.Error = err
|
||||
return &result, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// EvaluateExecutionResult takes the ExecutionResult, and returns a frame where
|
||||
// each column is a string type that holds a string representing its state.
|
||||
func EvaluateExecutionResult(results *ExecutionResults) (Results, error) {
|
||||
evalResults := make([]Result, 0)
|
||||
labels := make(map[string]bool)
|
||||
for _, f := range results.Results {
|
||||
rowLen, err := f.RowLen()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get frame row length")
|
||||
}
|
||||
if rowLen > 1 {
|
||||
return nil, fmt.Errorf("invalid frame %v: row length %v", f.Name, rowLen)
|
||||
}
|
||||
|
||||
if len(f.Fields) > 1 {
|
||||
return nil, fmt.Errorf("invalid frame %v: field length %v", f.Name, len(f.Fields))
|
||||
}
|
||||
|
||||
if f.Fields[0].Type() != data.FieldTypeNullableFloat64 {
|
||||
return nil, fmt.Errorf("invalid frame %v: field type %v", f.Name, f.Fields[0].Type())
|
||||
}
|
||||
|
||||
labelsStr := f.Fields[0].Labels.String()
|
||||
_, ok := labels[labelsStr]
|
||||
if ok {
|
||||
return nil, fmt.Errorf("invalid frame %v: frames cannot uniquely be identified by its labels: %q", f.Name, labelsStr)
|
||||
}
|
||||
labels[labelsStr] = true
|
||||
|
||||
state := Normal
|
||||
val, err := f.Fields[0].FloatAt(0)
|
||||
if err != nil || val != 0 {
|
||||
state = Alerting
|
||||
}
|
||||
|
||||
evalResults = append(evalResults, Result{
|
||||
Instance: f.Fields[0].Labels,
|
||||
State: state,
|
||||
})
|
||||
}
|
||||
return evalResults, nil
|
||||
}
|
||||
|
||||
// AsDataFrame forms the EvalResults in Frame suitable for displaying in the table panel of the front end.
|
||||
// This may be temporary, as there might be a fair amount we want to display in the frontend, and it might not make sense to store that in data.Frame.
|
||||
// For the first pass, I would expect a Frame with a single row, and a column for each instance with a boolean value.
|
||||
func (evalResults Results) AsDataFrame() data.Frame {
|
||||
fields := make([]*data.Field, 0)
|
||||
for _, evalResult := range evalResults {
|
||||
fields = append(fields, data.NewField("", evalResult.Instance, []bool{evalResult.State != Normal}))
|
||||
}
|
||||
f := data.NewFrame("", fields...)
|
||||
return *f
|
||||
}
|
294
pkg/services/ngalert/eval/alert_query.go
Normal file
294
pkg/services/ngalert/eval/alert_query.go
Normal file
@ -0,0 +1,294 @@
|
||||
package eval
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
|
||||
)
|
||||
|
||||
const defaultMaxDataPoints float64 = 100
|
||||
const defaultIntervalMS float64 = 1000
|
||||
|
||||
// DefaultExprDatasourceID is the datasource identifier for expressions.:w
|
||||
const DefaultExprDatasourceID = -100
|
||||
|
||||
// Duration is a type used for marshalling durations.
|
||||
type Duration time.Duration
|
||||
|
||||
func (d Duration) String() string {
|
||||
return time.Duration(d).String()
|
||||
}
|
||||
|
||||
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(time.Duration(d).Seconds())
|
||||
}
|
||||
|
||||
func (d *Duration) UnmarshalJSON(b []byte) error {
|
||||
var v interface{}
|
||||
if err := json.Unmarshal(b, &v); err != nil {
|
||||
return err
|
||||
}
|
||||
switch value := v.(type) {
|
||||
case float64:
|
||||
*d = Duration(time.Duration(value) * time.Second)
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("invalid duration %v", v)
|
||||
}
|
||||
}
|
||||
|
||||
// RelativeTimeRange is the per query start and end time
|
||||
// for requests.
|
||||
type RelativeTimeRange struct {
|
||||
From Duration
|
||||
To Duration
|
||||
}
|
||||
|
||||
// isValid checks that From duration is greater than To duration.
|
||||
func (rtr *RelativeTimeRange) isValid() bool {
|
||||
return rtr.From > rtr.To
|
||||
}
|
||||
|
||||
func (rtr *RelativeTimeRange) toTimeRange(now time.Time) *pluginv2.TimeRange {
|
||||
return &pluginv2.TimeRange{
|
||||
FromEpochMS: now.Add(-time.Duration(rtr.From)).UnixNano() / 1e6,
|
||||
ToEpochMS: now.Add(-time.Duration(rtr.To)).UnixNano() / 1e6,
|
||||
}
|
||||
}
|
||||
|
||||
// AlertQuery represents a single query associated with an alert definition.
|
||||
type AlertQuery struct {
|
||||
// RefID is the unique identifier of the query, set by the frontend call.
|
||||
RefID string `json:"refId"`
|
||||
|
||||
// QueryType is an optional identifier for the type of query.
|
||||
// It can be used to distinguish different types of queries.
|
||||
QueryType string `json:"queryType"`
|
||||
|
||||
// RelativeTimeRange is the relative Start and End of the query as sent by the frontend.
|
||||
RelativeTimeRange RelativeTimeRange `json:"relativeTimeRange"`
|
||||
|
||||
DatasourceID int64 `json:"-"`
|
||||
|
||||
// JSON is the raw JSON query and includes the above properties as well as custom properties.
|
||||
Model json.RawMessage `json:"model"`
|
||||
|
||||
modelProps map[string]interface{} `json:"-"`
|
||||
}
|
||||
|
||||
func (aq *AlertQuery) setModelProps() error {
|
||||
aq.modelProps = make(map[string]interface{})
|
||||
err := json.Unmarshal(aq.Model, &aq.modelProps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal query model: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// setDatasource sets DatasourceID.
|
||||
// If it's an expression sets DefaultExprDatasourceID.
|
||||
func (aq *AlertQuery) setDatasource() error {
|
||||
if aq.modelProps == nil {
|
||||
err := aq.setModelProps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
dsName, ok := aq.modelProps["datasource"]
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get datasource from query model")
|
||||
}
|
||||
|
||||
if dsName == "__expr__" {
|
||||
aq.DatasourceID = DefaultExprDatasourceID
|
||||
aq.modelProps["datasourceId"] = DefaultExprDatasourceID
|
||||
return nil
|
||||
}
|
||||
|
||||
i, ok := aq.modelProps["datasourceId"]
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get datasourceId from query model")
|
||||
}
|
||||
dsID, ok := i.(float64)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to cast datasourceId to float64: %v", i)
|
||||
}
|
||||
aq.DatasourceID = int64(dsID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsExpression returns true if the alert query is an expression.
|
||||
func (aq *AlertQuery) IsExpression() (bool, error) {
|
||||
err := aq.setDatasource()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return aq.DatasourceID == DefaultExprDatasourceID, nil
|
||||
}
|
||||
|
||||
// setMaxDatapoints sets the model maxDataPoints if it's missing or invalid
|
||||
func (aq *AlertQuery) setMaxDatapoints() error {
|
||||
if aq.modelProps == nil {
|
||||
err := aq.setModelProps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
i, ok := aq.modelProps["maxDataPoints"] // GEL requires maxDataPoints inside the query JSON
|
||||
if !ok {
|
||||
aq.modelProps["maxDataPoints"] = defaultMaxDataPoints
|
||||
}
|
||||
maxDataPoints, ok := i.(float64)
|
||||
if !ok || maxDataPoints == 0 {
|
||||
aq.modelProps["maxDataPoints"] = defaultMaxDataPoints
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aq *AlertQuery) getMaxDatapoints() (int64, error) {
|
||||
err := aq.setMaxDatapoints()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
maxDataPoints, ok := aq.modelProps["maxDataPoints"].(float64)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("failed to cast maxDataPoints to float64: %v", aq.modelProps["maxDataPoints"])
|
||||
}
|
||||
return int64(maxDataPoints), nil
|
||||
}
|
||||
|
||||
// setIntervalMS sets the model IntervalMs if it's missing or invalid
|
||||
func (aq *AlertQuery) setIntervalMS() error {
|
||||
if aq.modelProps == nil {
|
||||
err := aq.setModelProps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
i, ok := aq.modelProps["intervalMs"] // GEL requires intervalMs inside the query JSON
|
||||
if !ok {
|
||||
aq.modelProps["intervalMs"] = defaultIntervalMS
|
||||
}
|
||||
intervalMs, ok := i.(float64)
|
||||
if !ok || intervalMs == 0 {
|
||||
aq.modelProps["intervalMs"] = defaultIntervalMS
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aq *AlertQuery) getIntervalMS() (int64, error) {
|
||||
err := aq.setIntervalMS()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
intervalMs, ok := aq.modelProps["intervalMs"].(float64)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("failed to cast intervalMs to float64: %v", aq.modelProps["intervalMs"])
|
||||
}
|
||||
return int64(intervalMs), nil
|
||||
}
|
||||
|
||||
// GetDatasource returns the query datasource identifier.
|
||||
func (aq *AlertQuery) GetDatasource() (int64, error) {
|
||||
err := aq.setDatasource()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return aq.DatasourceID, nil
|
||||
}
|
||||
|
||||
func (aq *AlertQuery) getModel() ([]byte, error) {
|
||||
err := aq.setDatasource()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = aq.setMaxDatapoints()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = aq.setIntervalMS()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
model, err := json.Marshal(aq.modelProps)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to marshal query model: %w", err)
|
||||
}
|
||||
return model, nil
|
||||
}
|
||||
|
||||
func (aq *AlertQuery) setOrgID(orgID int64) error {
|
||||
if aq.modelProps == nil {
|
||||
err := aq.setModelProps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
aq.modelProps["orgId"] = orgID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aq *AlertQuery) setQueryType() error {
|
||||
if aq.modelProps == nil {
|
||||
err := aq.setModelProps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
i, ok := aq.modelProps["queryType"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
queryType, ok := i.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get queryType from query model: %v", i)
|
||||
}
|
||||
|
||||
aq.QueryType = queryType
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreSave sets query's properties.
|
||||
// It should be called before being saved.
|
||||
func (aq *AlertQuery) PreSave(orgID int64) error {
|
||||
err := aq.setOrgID(orgID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set orgId to query model: %w", err)
|
||||
}
|
||||
|
||||
if err := aq.setDatasource(); err != nil {
|
||||
return fmt.Errorf("failed to set datasource to query model: %w", err)
|
||||
}
|
||||
|
||||
if err := aq.setQueryType(); err != nil {
|
||||
return fmt.Errorf("failed to set query type to query model: %w", err)
|
||||
}
|
||||
|
||||
// override model
|
||||
model, err := aq.getModel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aq.Model = model
|
||||
|
||||
isExpression, err := aq.IsExpression()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ok := isExpression || aq.RelativeTimeRange.isValid(); !ok {
|
||||
return fmt.Errorf("invalid relative time range: %+v", aq.RelativeTimeRange)
|
||||
}
|
||||
return nil
|
||||
}
|
296
pkg/services/ngalert/eval/alert_query_test.go
Normal file
296
pkg/services/ngalert/eval/alert_query_test.go
Normal file
@ -0,0 +1,296 @@
|
||||
package eval
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAlertQuery(t *testing.T) {
|
||||
testCases := []struct {
|
||||
desc string
|
||||
alertQuery AlertQuery
|
||||
expectedIsExpression bool
|
||||
expectedDatasource string
|
||||
expectedDatasourceID int64
|
||||
expectedMaxPoints int64
|
||||
expectedIntervalMS int64
|
||||
err error
|
||||
}{
|
||||
{
|
||||
desc: "given an expression query",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "__expr__",
|
||||
"queryType": "metricQuery",
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: true,
|
||||
expectedDatasource: "__expr__",
|
||||
expectedDatasourceID: int64(DefaultExprDatasourceID),
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
{
|
||||
desc: "given a query",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
{
|
||||
desc: "given a query with valid maxDataPoints",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"maxDataPoints": 200,
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: 200,
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
{
|
||||
desc: "given a query with invalid maxDataPoints",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"maxDataPoints": "invalid",
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
{
|
||||
desc: "given a query with zero maxDataPoints",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"maxDataPoints": 0,
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
{
|
||||
desc: "given a query with valid intervalMs",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"intervalMs": 2000,
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: 2000,
|
||||
},
|
||||
{
|
||||
desc: "given a query with invalid intervalMs",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"intervalMs": "invalid",
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
{
|
||||
desc: "given a query with invalid intervalMs",
|
||||
alertQuery: AlertQuery{
|
||||
RefID: "A",
|
||||
Model: json.RawMessage(`{
|
||||
"datasource": "my datasource",
|
||||
"datasourceId": 1,
|
||||
"queryType": "metricQuery",
|
||||
"intervalMs": 0,
|
||||
"extraParam": "some text"
|
||||
}`),
|
||||
},
|
||||
expectedIsExpression: false,
|
||||
expectedDatasource: "my datasource",
|
||||
expectedDatasourceID: 1,
|
||||
expectedMaxPoints: int64(defaultMaxDataPoints),
|
||||
expectedIntervalMS: int64(defaultIntervalMS),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
t.Run("can recognize if it's an expression", func(t *testing.T) {
|
||||
isExpression, err := tc.alertQuery.IsExpression()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedIsExpression, isExpression)
|
||||
})
|
||||
|
||||
t.Run("can set datasource for expression", func(t *testing.T) {
|
||||
err := tc.alertQuery.setDatasource()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedDatasourceID, tc.alertQuery.DatasourceID)
|
||||
})
|
||||
|
||||
t.Run("can set queryType for expression", func(t *testing.T) {
|
||||
err := tc.alertQuery.setQueryType()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "metricQuery", tc.alertQuery.QueryType)
|
||||
})
|
||||
|
||||
t.Run("can update model maxDataPoints (if missing)", func(t *testing.T) {
|
||||
maxDataPoints, err := tc.alertQuery.getMaxDatapoints()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedMaxPoints, maxDataPoints)
|
||||
})
|
||||
|
||||
t.Run("can update model intervalMs (if missing)", func(t *testing.T) {
|
||||
intervalMS, err := tc.alertQuery.getIntervalMS()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, intervalMS, tc.expectedIntervalMS)
|
||||
})
|
||||
|
||||
t.Run("can get the updated model with the default properties (if missing)", func(t *testing.T) {
|
||||
blob, err := tc.alertQuery.getModel()
|
||||
require.NoError(t, err)
|
||||
model := make(map[string]interface{})
|
||||
err = json.Unmarshal(blob, &model)
|
||||
require.NoError(t, err)
|
||||
|
||||
i, ok := model["datasource"]
|
||||
require.True(t, ok)
|
||||
datasource, ok := i.(string)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tc.expectedDatasource, datasource)
|
||||
|
||||
i, ok = model["datasourceId"]
|
||||
require.True(t, ok)
|
||||
datasourceID, ok := i.(float64)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tc.expectedDatasourceID, int64(datasourceID))
|
||||
|
||||
i, ok = model["maxDataPoints"]
|
||||
require.True(t, ok)
|
||||
maxDataPoints, ok := i.(float64)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tc.expectedMaxPoints, int64(maxDataPoints))
|
||||
|
||||
i, ok = model["intervalMs"]
|
||||
require.True(t, ok)
|
||||
intervalMs, ok := i.(float64)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tc.expectedIntervalMS, int64(intervalMs))
|
||||
|
||||
i, ok = model["extraParam"]
|
||||
require.True(t, ok)
|
||||
extraParam, ok := i.(string)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "some text", extraParam)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlertQueryMarshalling(t *testing.T) {
|
||||
testCases := []struct {
|
||||
desc string
|
||||
blob string
|
||||
err error
|
||||
expectedFrom Duration
|
||||
expectedTo Duration
|
||||
}{
|
||||
{
|
||||
desc: "unmarshalling successfully when input is correct",
|
||||
blob: `{
|
||||
"refId": "B",
|
||||
"relativeTimeRange": {
|
||||
"from": 18000,
|
||||
"to": 10800
|
||||
},
|
||||
"model": {}
|
||||
}`,
|
||||
expectedFrom: Duration(5 * time.Hour),
|
||||
expectedTo: Duration(3 * time.Hour),
|
||||
},
|
||||
{
|
||||
desc: "failing unmarshalling gracefully when from is incorrect",
|
||||
blob: `{
|
||||
"refId": "B",
|
||||
"relativeTimeRange": {
|
||||
"from": "5h10m",
|
||||
"to": 18000
|
||||
},
|
||||
"model": {}
|
||||
}`,
|
||||
err: fmt.Errorf("invalid duration 5h10m"),
|
||||
},
|
||||
{
|
||||
desc: "failing unmarshalling gracefully when to is incorrect",
|
||||
blob: `{
|
||||
"refId": "B",
|
||||
"relativeTimeRange": {
|
||||
"from": 18000,
|
||||
"to": "5h10m"
|
||||
},
|
||||
"model": {}
|
||||
}`,
|
||||
err: fmt.Errorf("invalid duration 5h10m"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
var aq AlertQuery
|
||||
err := json.Unmarshal([]byte(tc.blob), &aq)
|
||||
require.Equal(t, tc.err, err)
|
||||
if tc.err == nil {
|
||||
assert.Equal(t, tc.expectedFrom, aq.RelativeTimeRange.From)
|
||||
assert.Equal(t, tc.expectedTo, aq.RelativeTimeRange.To)
|
||||
}
|
||||
}
|
||||
}
|
215
pkg/services/ngalert/eval/eval.go
Normal file
215
pkg/services/ngalert/eval/eval.go
Normal file
@ -0,0 +1,215 @@
|
||||
// Package eval executes the condition for an alert definition, evaluates the condition results, and
|
||||
// returns the alert instance states.
|
||||
package eval
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
)
|
||||
|
||||
// invalidEvalResultFormatError is an error for invalid format of the alert definition evaluation results.
|
||||
type invalidEvalResultFormatError struct {
|
||||
refID string
|
||||
reason string
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *invalidEvalResultFormatError) Error() string {
|
||||
s := fmt.Sprintf("invalid format of evaluation results for the alert definition %s: %s", e.refID, e.reason)
|
||||
if e.err != nil {
|
||||
s = fmt.Sprintf("%s: %s", s, e.err.Error())
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (e *invalidEvalResultFormatError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// Condition contains backend expressions and queries and the RefID
|
||||
// of the query or expression that will be evaluated.
|
||||
type Condition struct {
|
||||
RefID string `json:"refId"`
|
||||
|
||||
QueriesAndExpressions []AlertQuery `json:"queriesAndExpressions"`
|
||||
}
|
||||
|
||||
// ExecutionResults contains the unevaluated results from executing
|
||||
// a condition.
|
||||
type ExecutionResults struct {
|
||||
AlertDefinitionID int64
|
||||
|
||||
Error error
|
||||
|
||||
Results data.Frames
|
||||
}
|
||||
|
||||
// Results is a slice of evaluated alert instances states.
|
||||
type Results []result
|
||||
|
||||
// result contains the evaluated state of an alert instance
|
||||
// identified by its labels.
|
||||
type result struct {
|
||||
Instance data.Labels
|
||||
State state // Enum
|
||||
}
|
||||
|
||||
// state is an enum of the evaluation state for an alert instance.
|
||||
type state int
|
||||
|
||||
const (
|
||||
// Normal is the eval state for an alert instance condition
|
||||
// that evaluated to false.
|
||||
Normal state = iota
|
||||
|
||||
// Alerting is the eval state for an alert instance condition
|
||||
// that evaluated to false.
|
||||
Alerting
|
||||
)
|
||||
|
||||
func (s state) String() string {
|
||||
return [...]string{"Normal", "Alerting"}[s]
|
||||
}
|
||||
|
||||
// IsValid checks the condition's validity.
|
||||
func (c Condition) IsValid() bool {
|
||||
// TODO search for refIDs in QueriesAndExpressions
|
||||
return len(c.QueriesAndExpressions) != 0
|
||||
}
|
||||
|
||||
// AlertExecCtx is the context provided for executing an alert condition.
|
||||
type AlertExecCtx struct {
|
||||
AlertDefitionID int64
|
||||
SignedInUser *models.SignedInUser
|
||||
|
||||
Ctx context.Context
|
||||
}
|
||||
|
||||
// Execute runs the Condition's expressions or queries.
|
||||
func (c *Condition) Execute(ctx AlertExecCtx, fromStr, toStr string) (*ExecutionResults, error) {
|
||||
result := ExecutionResults{}
|
||||
if !c.IsValid() {
|
||||
return nil, fmt.Errorf("invalid conditions")
|
||||
}
|
||||
|
||||
pbQuery := &pluginv2.QueryDataRequest{
|
||||
PluginContext: &pluginv2.PluginContext{
|
||||
// TODO: Things probably
|
||||
},
|
||||
Queries: []*pluginv2.DataQuery{},
|
||||
}
|
||||
|
||||
for i := range c.QueriesAndExpressions {
|
||||
q := c.QueriesAndExpressions[i]
|
||||
model, err := q.getModel()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get query model: %w", err)
|
||||
}
|
||||
intervalMS, err := q.getIntervalMS()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve intervalMs from the model: %w", err)
|
||||
}
|
||||
|
||||
maxDatapoints, err := q.getMaxDatapoints()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve maxDatapoints from the model: %w", err)
|
||||
}
|
||||
|
||||
pbQuery.Queries = append(pbQuery.Queries, &pluginv2.DataQuery{
|
||||
Json: model,
|
||||
IntervalMS: intervalMS,
|
||||
RefId: q.RefID,
|
||||
MaxDataPoints: maxDatapoints,
|
||||
QueryType: q.QueryType,
|
||||
TimeRange: q.RelativeTimeRange.toTimeRange(time.Now()),
|
||||
})
|
||||
}
|
||||
|
||||
tw := plugins.Transform
|
||||
pbRes, err := tw.TransformClient.TransformData(ctx.Ctx, pbQuery, tw.Callback)
|
||||
if err != nil {
|
||||
return &result, err
|
||||
}
|
||||
|
||||
for refID, res := range pbRes.Responses {
|
||||
if refID != c.RefID {
|
||||
continue
|
||||
}
|
||||
df := tsdb.NewEncodedDataFrames(res.Frames)
|
||||
result.Results, err = df.Decoded()
|
||||
if err != nil {
|
||||
result.Error = err
|
||||
return &result, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.Results) == 0 {
|
||||
err = fmt.Errorf("no GEL results")
|
||||
result.Error = err
|
||||
return &result, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// EvaluateExecutionResult takes the ExecutionResult, and returns a frame where
|
||||
// each column is a string type that holds a string representing its state.
|
||||
func EvaluateExecutionResult(results *ExecutionResults) (Results, error) {
|
||||
evalResults := make([]result, 0)
|
||||
labels := make(map[string]bool)
|
||||
for _, f := range results.Results {
|
||||
rowLen, err := f.RowLen()
|
||||
if err != nil {
|
||||
return nil, &invalidEvalResultFormatError{refID: f.RefID, reason: "unable to get frame row length", err: err}
|
||||
}
|
||||
if rowLen > 1 {
|
||||
return nil, &invalidEvalResultFormatError{refID: f.RefID, reason: fmt.Sprintf("unexpected row length: %d instead of 1", rowLen)}
|
||||
}
|
||||
|
||||
if len(f.Fields) > 1 {
|
||||
return nil, &invalidEvalResultFormatError{refID: f.RefID, reason: fmt.Sprintf("unexpected field length: %d instead of 1", len(f.Fields))}
|
||||
}
|
||||
|
||||
if f.Fields[0].Type() != data.FieldTypeNullableFloat64 {
|
||||
return nil, &invalidEvalResultFormatError{refID: f.RefID, reason: fmt.Sprintf("invalid field type: %d", f.Fields[0].Type())}
|
||||
}
|
||||
|
||||
labelsStr := f.Fields[0].Labels.String()
|
||||
_, ok := labels[labelsStr]
|
||||
if ok {
|
||||
return nil, &invalidEvalResultFormatError{refID: f.RefID, reason: fmt.Sprintf("frame cannot uniquely be identified by its labels: %s", labelsStr)}
|
||||
}
|
||||
labels[labelsStr] = true
|
||||
|
||||
state := Normal
|
||||
val, err := f.Fields[0].FloatAt(0)
|
||||
if err != nil || val != 0 {
|
||||
state = Alerting
|
||||
}
|
||||
|
||||
evalResults = append(evalResults, result{
|
||||
Instance: f.Fields[0].Labels,
|
||||
State: state,
|
||||
})
|
||||
}
|
||||
return evalResults, nil
|
||||
}
|
||||
|
||||
// AsDataFrame forms the EvalResults in Frame suitable for displaying in the table panel of the front end.
|
||||
// This may be temporary, as there might be a fair amount we want to display in the frontend, and it might not make sense to store that in data.Frame.
|
||||
// For the first pass, I would expect a Frame with a single row, and a column for each instance with a boolean value.
|
||||
func (evalResults Results) AsDataFrame() data.Frame {
|
||||
fields := make([]*data.Field, 0)
|
||||
for _, evalResult := range evalResults {
|
||||
fields = append(fields, data.NewField("", evalResult.Instance, []bool{evalResult.State != Normal}))
|
||||
}
|
||||
f := data.NewFrame("", fields...)
|
||||
return *f
|
||||
}
|
20
pkg/services/ngalert/middleware.go
Normal file
20
pkg/services/ngalert/middleware.go
Normal file
@ -0,0 +1,20 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
)
|
||||
|
||||
func (ng *AlertNG) validateOrgAlertDefinition(c *models.ReqContext) {
|
||||
id := c.ParamsInt64(":alertDefinitionId")
|
||||
query := getAlertDefinitionByIDQuery{ID: id}
|
||||
|
||||
if err := ng.getAlertDefinitionByID(&query); err != nil {
|
||||
c.JsonApiErr(404, "Alert definition not found", nil)
|
||||
return
|
||||
}
|
||||
|
||||
if c.OrgId != query.Result.OrgId {
|
||||
c.JsonApiErr(403, "You are not allowed to edit/view alert definition", nil)
|
||||
return
|
||||
}
|
||||
}
|
92
pkg/services/ngalert/models.go
Normal file
92
pkg/services/ngalert/models.go
Normal file
@ -0,0 +1,92 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
)
|
||||
|
||||
// AlertDefinition is the model for alert definitions in Alerting NG.
|
||||
type AlertDefinition struct {
|
||||
Id int64
|
||||
OrgId int64
|
||||
Name string
|
||||
Condition string
|
||||
Data []eval.AlertQuery
|
||||
}
|
||||
|
||||
var (
|
||||
// errAlertDefinitionNotFound is an error for an unknown alert definition.
|
||||
errAlertDefinitionNotFound = fmt.Errorf("could not find alert definition")
|
||||
)
|
||||
|
||||
// getAlertDefinitionByIDQuery is the query for retrieving/deleting an alert definition by ID.
|
||||
type getAlertDefinitionByIDQuery struct {
|
||||
ID int64
|
||||
OrgID int64
|
||||
|
||||
Result *AlertDefinition
|
||||
}
|
||||
|
||||
type deleteAlertDefinitionByIDQuery struct {
|
||||
ID int64
|
||||
OrgID int64
|
||||
|
||||
RowsAffected int64
|
||||
}
|
||||
|
||||
// condition is the structure used by storing/updating alert definition commmands
|
||||
type condition struct {
|
||||
RefID string `json:"refId"`
|
||||
|
||||
QueriesAndExpressions []eval.AlertQuery `json:"queriesAndExpressions"`
|
||||
}
|
||||
|
||||
// saveAlertDefinitionCommand is the query for saving a new alert definition.
|
||||
type saveAlertDefinitionCommand struct {
|
||||
Name string `json:"name"`
|
||||
OrgID int64 `json:"-"`
|
||||
Condition condition `json:"condition"`
|
||||
SignedInUser *models.SignedInUser `json:"-"`
|
||||
SkipCache bool `json:"-"`
|
||||
|
||||
Result *AlertDefinition
|
||||
}
|
||||
|
||||
// IsValid validates a SaveAlertDefinitionCommand.
|
||||
// Always returns true.
|
||||
func (cmd *saveAlertDefinitionCommand) IsValid() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// updateAlertDefinitionCommand is the query for updating an existing alert definition.
|
||||
type updateAlertDefinitionCommand struct {
|
||||
ID int64 `json:"-"`
|
||||
Name string `json:"name"`
|
||||
OrgID int64 `json:"-"`
|
||||
Condition condition `json:"condition"`
|
||||
SignedInUser *models.SignedInUser `json:"-"`
|
||||
SkipCache bool `json:"-"`
|
||||
|
||||
RowsAffected int64
|
||||
Result *AlertDefinition
|
||||
}
|
||||
|
||||
// IsValid validates an UpdateAlertDefinitionCommand.
|
||||
// Always returns true.
|
||||
func (cmd *updateAlertDefinitionCommand) IsValid() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type evalAlertConditionCommand struct {
|
||||
Condition eval.Condition `json:"condition"`
|
||||
Now time.Time `json:"now"`
|
||||
}
|
||||
|
||||
type listAlertDefinitionsCommand struct {
|
||||
OrgID int64 `json:"-"`
|
||||
|
||||
Result []*AlertDefinition
|
||||
}
|
91
pkg/services/ngalert/ngalert.go
Normal file
91
pkg/services/ngalert/ngalert.go
Normal file
@ -0,0 +1,91 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
// AlertNG is the service for evaluating the condition of an alert definition.
|
||||
type AlertNG struct {
|
||||
Cfg *setting.Cfg `inject:""`
|
||||
DatasourceCache datasources.CacheService `inject:""`
|
||||
RouteRegister routing.RouteRegister `inject:""`
|
||||
SQLStore *sqlstore.SQLStore `inject:""`
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func init() {
|
||||
registry.RegisterService(&AlertNG{})
|
||||
}
|
||||
|
||||
// Init initializes the AlertingService.
|
||||
func (ng *AlertNG) Init() error {
|
||||
ng.log = log.New("ngalert")
|
||||
|
||||
ng.registerAPIEndpoints()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsDisabled returns true if the alerting service is disable for this instance.
|
||||
func (ng *AlertNG) IsDisabled() bool {
|
||||
if ng.Cfg == nil {
|
||||
return false
|
||||
}
|
||||
// Check also about expressions?
|
||||
return !ng.Cfg.IsNgAlertEnabled()
|
||||
}
|
||||
|
||||
// AddMigration defines database migrations.
|
||||
// If Alerting NG is not enabled does nothing.
|
||||
func (ng *AlertNG) AddMigration(mg *migrator.Migrator) {
|
||||
if ng.IsDisabled() {
|
||||
return
|
||||
}
|
||||
|
||||
alertDefinition := migrator.Table{
|
||||
Name: "alert_definition",
|
||||
Columns: []*migrator.Column{
|
||||
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
|
||||
{Name: "org_id", Type: migrator.DB_BigInt, Nullable: false},
|
||||
{Name: "name", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
|
||||
{Name: "condition", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
|
||||
{Name: "data", Type: migrator.DB_Text, Nullable: false},
|
||||
},
|
||||
Indices: []*migrator.Index{
|
||||
{Cols: []string{"org_id"}, Type: migrator.IndexType},
|
||||
},
|
||||
}
|
||||
// create table
|
||||
mg.AddMigration("create alert_definition table", migrator.NewAddTableMigration(alertDefinition))
|
||||
|
||||
// create indices
|
||||
mg.AddMigration("add index alert_definition org_id", migrator.NewAddIndexMigration(alertDefinition, alertDefinition.Indices[0]))
|
||||
}
|
||||
|
||||
// LoadAlertCondition returns a Condition object for the given alertDefinitionID.
|
||||
func (ng *AlertNG) LoadAlertCondition(alertDefinitionID int64, signedInUser *models.SignedInUser, skipCache bool) (*eval.Condition, error) {
|
||||
getAlertDefinitionByIDQuery := getAlertDefinitionByIDQuery{ID: alertDefinitionID}
|
||||
if err := ng.getAlertDefinitionByID(&getAlertDefinitionByIDQuery); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
alertDefinition := getAlertDefinitionByIDQuery.Result
|
||||
|
||||
err := ng.validateAlertDefinition(alertDefinition, signedInUser, skipCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &eval.Condition{
|
||||
RefID: alertDefinition.Condition,
|
||||
QueriesAndExpressions: alertDefinition.Data,
|
||||
}, nil
|
||||
}
|
33
pkg/services/ngalert/validator.go
Normal file
33
pkg/services/ngalert/validator.go
Normal file
@ -0,0 +1,33 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
)
|
||||
|
||||
// validateAlertDefinition validates that the alert definition contains at least one alert query
|
||||
// and that alert queries refer to existing datasources.
|
||||
func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, signedInUser *models.SignedInUser, skipCache bool) error {
|
||||
if len(alertDefinition.Data) == 0 {
|
||||
return fmt.Errorf("no queries or expressions are found")
|
||||
}
|
||||
|
||||
for _, query := range alertDefinition.Data {
|
||||
datasourceID, err := query.GetDatasource()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if datasourceID == eval.DefaultExprDatasourceID {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = ng.DatasourceCache.GetDatasource(datasourceID, signedInUser, skipCache)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -19,7 +19,7 @@ type TsdbQuery struct {
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
RefId string `json:"refID"`
|
||||
RefId string `json:"refId"`
|
||||
Model *simplejson.Json `json:"model,omitempty"`
|
||||
DataSource *models.DataSource `json:"datasource"`
|
||||
MaxDataPoints int64 `json:"maxDataPoints"`
|
||||
|
Loading…
Reference in New Issue
Block a user