Expressions: use datasource model from the query (#41376)

* refactor datasource loading

* refactor datasource loading

* pass uid

* use dscache in alerting to get DS

* remove expr/translate package

* remove dup injection entry

* fix DS type on metrics endpoint, remove SQL DS lookup inside SSE

* update test and adapter

* comment fix

* Make eval run as admin when getting datasource info

Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>

* fmt and comment

* remove unnecessary/redundant code

Co-authored-by: Kyle Brandt <kyle@grafana.com>
Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
Ryan McKinley 2021-12-16 08:51:46 -08:00 committed by GitHub
parent 1745cd8186
commit 2754e4fdf0
15 changed files with 119 additions and 1088 deletions

View File

@ -126,6 +126,10 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
dp := simple.NewDirectedGraph()
for _, query := range req.Queries {
if query.DataSource == nil || query.DataSource.Uid == "" {
return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID)
}
rawQueryProp := make(map[string]interface{})
queryBytes, err := query.JSON.MarshalJSON()
@ -139,34 +143,16 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
}
rn := &rawNode{
Query: rawQueryProp,
RefID: query.RefID,
TimeRange: query.TimeRange,
QueryType: query.QueryType,
DatasourceUID: query.GetDatasourceUID(),
}
numericDSID := float64(0) // legacy
if rn.DatasourceUID == "" {
if rv, ok := rn.Query["datasourceId"]; ok {
if sv, ok := rv.(float64); ok {
if sv == DatasourceID {
rn.DatasourceUID = DatasourceUID
}
if sv > 0 {
numericDSID = sv
}
}
}
}
if rn.DatasourceUID == "" && numericDSID == 0 {
return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID)
Query: rawQueryProp,
RefID: query.RefID,
TimeRange: query.TimeRange,
QueryType: query.QueryType,
DataSource: query.DataSource,
}
var node Node
if rn.IsExpressionQuery() {
if IsDataSource(rn.DataSource.Uid) {
node, err = buildCMDNode(dp, rn)
} else {
node, err = s.buildDSNode(dp, rn, req)

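To illustrate the routing the new validation enables, here is a small self-contained sketch with simplified stand-in types (not the real Grafana ones); the "__expr__" pseudo-datasource UID is taken from the updated test fixtures further down:

package main

import "fmt"

const expressionUID = "__expr__" // pseudo-datasource UID used by server-side expressions in the fixtures

type dataSource struct {
	UID string
}

type query struct {
	RefID      string
	DataSource *dataSource
}

func isExpression(uid string) bool { return uid == expressionUID }

// routeQuery mirrors the new buildGraph behaviour: every query must carry a
// datasource model with a UID, and the UID alone decides whether the query
// becomes an expression node or a datasource node.
func routeQuery(q query) (string, error) {
	if q.DataSource == nil || q.DataSource.UID == "" {
		return "", fmt.Errorf("missing datasource uid in query with refId %v", q.RefID)
	}
	if isExpression(q.DataSource.UID) {
		return "expression node", nil
	}
	return "datasource node", nil
}

func main() {
	fmt.Println(routeQuery(query{RefID: "A", DataSource: &dataSource{UID: expressionUID}}))
	fmt.Println(routeQuery(query{RefID: "B", DataSource: &dataSource{UID: "prom-uid"}}))
}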
View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"testing"
"github.com/grafana/grafana/pkg/models"
"github.com/stretchr/testify/require"
)
@ -19,8 +20,8 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
DatasourceUID: OldDatasourceUID,
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "B",
"reducer": "mean",
@ -28,8 +29,10 @@ func TestServicebuildPipeLine(t *testing.T) {
}`),
},
{
RefID: "B",
DatasourceUID: "Fake",
RefID: "B",
DataSource: &models.DataSource{
Uid: "Fake",
},
},
},
},
@ -40,16 +43,16 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
DatasourceUID: OldDatasourceUID,
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "$B",
"type": "math"
}`),
},
{
RefID: "B",
DatasourceUID: OldDatasourceUID,
RefID: "B",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "$A",
"type": "math"
@ -64,8 +67,8 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
DatasourceUID: OldDatasourceUID,
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "$A",
"type": "math"
@ -80,8 +83,8 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
DatasourceUID: OldDatasourceUID,
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "$B",
"type": "math"
@ -96,8 +99,8 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
DatasourceUID: OldDatasourceUID,
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"type": "classic_conditions",
"conditions": [
@ -127,8 +130,8 @@ func TestServicebuildPipeLine(t *testing.T) {
}`),
},
{
RefID: "B",
DatasourceUID: OldDatasourceUID,
RefID: "B",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "C",
"reducer": "mean",
@ -136,8 +139,10 @@ func TestServicebuildPipeLine(t *testing.T) {
}`),
},
{
RefID: "C",
DatasourceUID: "Fake",
RefID: "C",
DataSource: &models.DataSource{
Uid: "Fake",
},
},
},
},
@ -148,8 +153,8 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
DatasourceUID: OldDatasourceUID,
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"type": "classic_conditions",
"conditions": [
@ -179,8 +184,8 @@ func TestServicebuildPipeLine(t *testing.T) {
}`),
},
{
RefID: "B",
DatasourceUID: OldDatasourceUID,
RefID: "B",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "A",
"reducer": "mean",
@ -188,8 +193,10 @@ func TestServicebuildPipeLine(t *testing.T) {
}`),
},
{
RefID: "C",
DatasourceUID: "Fake",
RefID: "C",
DataSource: &models.DataSource{
Uid: "Fake",
},
},
},
},
@ -200,10 +207,8 @@ func TestServicebuildPipeLine(t *testing.T) {
req: &Request{
Queries: []Query{
{
RefID: "A",
Datasource: DataSourceRef{
UID: DatasourceUID,
},
RefID: "A",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{
"expression": "B",
"reducer": "mean",
@ -212,8 +217,8 @@ func TestServicebuildPipeLine(t *testing.T) {
},
{
RefID: "B",
Datasource: DataSourceRef{
UID: "Fake",
DataSource: &models.DataSource{
Uid: "Fake",
},
},
},

View File

@ -11,6 +11,9 @@ import (
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/adapters"
"github.com/grafana/grafana/pkg/util/errutil"
"gonum.org/v1/gonum/graph/simple"
)
@ -39,23 +42,11 @@ type baseNode struct {
}
type rawNode struct {
RefID string `json:"refId"`
Query map[string]interface{}
QueryType string
TimeRange TimeRange
DatasourceUID string // Gets populated from Either DatasourceUID or Datasource.UID
}
func (rn *rawNode) IsExpressionQuery() bool {
if IsDataSource(rn.DatasourceUID) {
return true
}
if v, ok := rn.Query["datasourceId"]; ok {
if v == OldDatasourceUID {
return true
}
}
return false
RefID string `json:"refId"`
Query map[string]interface{}
QueryType string
TimeRange TimeRange
DataSource *models.DataSource
}
func (rn *rawNode) GetCommandType() (c CommandType, err error) {
@ -146,9 +137,8 @@ const (
// DSNode is a DPNode that holds a datasource request.
type DSNode struct {
baseNode
query json.RawMessage
datasourceID int64
datasourceUID string
query json.RawMessage
datasource *models.DataSource
orgID int64
queryType string
@ -181,18 +171,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
maxDP: defaultMaxDP,
timeRange: rn.TimeRange,
request: *req,
}
// support old datasourceId property
rawDsID, ok := rn.Query["datasourceId"]
if ok {
floatDsID, ok := rawDsID.(float64)
if !ok {
return nil, fmt.Errorf("expected datasourceId to be a float64, got type %T for refId %v", rawDsID, rn.RefID)
}
dsNode.datasourceID = int64(floatDsID)
} else {
dsNode.datasourceUID = rn.DatasourceUID
datasource: rn.DataSource,
}
var floatIntervalMS float64
@ -218,12 +197,14 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
// other nodes they must have already been executed and their results must
// already be in vars.
func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
dsInstanceSettings, err := adapters.ModelToInstanceSettings(dn.datasource, s.decryptSecureJsonDataFn(ctx))
if err != nil {
return mathexp.Results{}, errutil.Wrap("failed to convert datasource instance settings", err)
}
pc := backend.PluginContext{
OrgID: dn.orgID,
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
ID: dn.datasourceID,
UID: dn.datasourceUID,
},
OrgID: dn.orgID,
DataSourceInstanceSettings: dsInstanceSettings,
PluginID: dn.datasource.Type,
}
q := []backend.DataQuery{
@ -240,12 +221,11 @@ func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (m
},
}
resp, err := s.queryData(ctx, &backend.QueryDataRequest{
resp, err := s.dataService.QueryData(ctx, &backend.QueryDataRequest{
PluginContext: pc,
Queries: q,
Headers: dn.request.Headers,
})
if err != nil {
return mathexp.Results{}, err
}

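A rough sketch of the new Execute flow with simplified stand-in types: the node already carries the full datasource model, so the plugin context can be assembled without a separate lookup. The real code goes through adapters.ModelToInstanceSettings and the SDK's backend.PluginContext; the types below are illustrative only.

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for models.DataSource, backend.DataSourceInstanceSettings
// and backend.PluginContext.
type dataSource struct {
	UID      string
	Type     string
	JSONData json.RawMessage
}

type instanceSettings struct {
	UID      string
	JSONData json.RawMessage
}

type pluginContext struct {
	OrgID            int64
	PluginID         string
	InstanceSettings *instanceSettings
}

// modelToInstanceSettings stands in for the adapters call used by Execute:
// the settings come straight from the datasource model held on the node.
func modelToInstanceSettings(ds *dataSource) (*instanceSettings, error) {
	if ds == nil {
		return nil, fmt.Errorf("node has no datasource model")
	}
	return &instanceSettings{UID: ds.UID, JSONData: ds.JSONData}, nil
}

func main() {
	ds := &dataSource{UID: "prom-uid", Type: "prometheus", JSONData: json.RawMessage(`{}`)}
	settings, err := modelToInstanceSettings(ds)
	if err != nil {
		panic(err)
	}
	pc := pluginContext{OrgID: 1, PluginID: ds.Type, InstanceSettings: settings}
	fmt.Printf("%+v\n", pc)
}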
View File

@ -39,18 +39,24 @@ func TestService(t *testing.T) {
}
bus.AddHandlerCtx("test", func(_ context.Context, query *models.GetDataSourceQuery) error {
query.Result = &models.DataSource{Id: 1, OrgId: 1, Type: "test", JsonData: simplejson.New()}
query.Result = &models.DataSource{Uid: "1", OrgId: 1, Type: "test", JsonData: simplejson.New()}
return nil
})
queries := []Query{
{
RefID: "A",
JSON: json.RawMessage(`{ "datasource": "test", "datasourceId": 1, "orgId": 1, "intervalMs": 1000, "maxDataPoints": 1000 }`),
DataSource: &models.DataSource{
OrgId: 1,
Uid: "test",
Type: "test",
},
JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`),
},
{
RefID: "B",
JSON: json.RawMessage(`{ "datasource": "__expr__", "datasourceId": -100, "type": "math", "expression": "$A * 2" }`),
RefID: "B",
DataSource: DataSourceModel(),
JSON: json.RawMessage(`{ "datasource": { "uid": "__expr__", "type": "__expr__"}, "type": "math", "expression": "$A * 2" }`),
},
}

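The fixtures above also show the payload change: the query JSON now carries a datasource reference object instead of the legacy numeric datasourceId. A minimal decoding sketch (field names follow the fixtures; this is not the actual Grafana parsing code):

package main

import (
	"encoding/json"
	"fmt"
)

type dsRef struct {
	UID  string `json:"uid"`
	Type string `json:"type"`
}

type queryJSON struct {
	Datasource dsRef `json:"datasource"`
	IntervalMS int64 `json:"intervalMs"`
}

func main() {
	raw := []byte(`{ "datasource": { "uid": "__expr__", "type": "__expr__" }, "type": "math", "intervalMs": 1000 }`)
	var q queryJSON
	if err := json.Unmarshal(raw, &q); err != nil {
		panic(err)
	}
	fmt.Println(q.Datasource.UID, q.IntervalMS) // __expr__ 1000
}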
View File

@ -6,10 +6,7 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/adapters"
"github.com/grafana/grafana/pkg/util/errutil"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
)
@ -44,30 +41,13 @@ type Request struct {
type Query struct {
RefID string
TimeRange TimeRange
DatasourceUID string // deprecated, value -100 when expressions
Datasource DataSourceRef `json:"datasource"`
DataSource *models.DataSource `json:"datasource"`
JSON json.RawMessage
Interval time.Duration
QueryType string
MaxDataPoints int64
}
type DataSourceRef struct {
Type string `json:"type"` // value should be __expr__
UID string `json:"uid"` // value should be __expr__
}
func (q *Query) GetDatasourceUID() string {
if q.DatasourceUID != "" {
return q.DatasourceUID // backwards compatibility gets precedence
}
if q.Datasource.UID != "" {
return q.Datasource.UID
}
return ""
}
// TimeRange is a time.Time based TimeRange.
type TimeRange struct {
From time.Time
@ -146,42 +126,6 @@ func hiddenRefIDs(queries []Query) (map[string]struct{}, error) {
return hidden, nil
}
// queryData is used to query datasources that are not expression commands, but are used
// alongside expressions and/or are the input of an expression command.
func (s *Service) queryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if len(req.Queries) == 0 {
return nil, fmt.Errorf("zero queries found in datasource request")
}
datasourceID := int64(0)
var datasourceUID string
if req.PluginContext.DataSourceInstanceSettings != nil {
datasourceID = req.PluginContext.DataSourceInstanceSettings.ID
datasourceUID = req.PluginContext.DataSourceInstanceSettings.UID
}
getDsInfo := &models.GetDataSourceQuery{
OrgId: req.PluginContext.OrgID,
Id: datasourceID,
Uid: datasourceUID,
}
if err := bus.DispatchCtx(ctx, getDsInfo); err != nil {
return nil, fmt.Errorf("could not find datasource: %w", err)
}
dsInstanceSettings, err := adapters.ModelToInstanceSettings(getDsInfo.Result, s.decryptSecureJsonDataFn(ctx))
if err != nil {
return nil, errutil.Wrap("failed to convert datasource instance settings", err)
}
req.PluginContext.DataSourceInstanceSettings = dsInstanceSettings
req.PluginContext.PluginID = getDsInfo.Result.Type
return s.dataService.QueryData(ctx, req)
}
func (s *Service) decryptSecureJsonDataFn(ctx context.Context) func(map[string][]byte) map[string]string {
return func(m map[string][]byte) map[string]string {
decryptedJsonData, err := s.secretsService.DecryptJsonData(ctx, m)

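With the deprecated DatasourceUID field, the DataSourceRef type, and the internal queryData lookup gone, callers resolve the datasource up front and attach the model to each query. A hedged sketch with stand-in types (not the real expr.Query):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Stand-ins for models.DataSource and expr.Query.
type dataSource struct {
	UID  string
	Type string
}

type exprQuery struct {
	RefID      string
	DataSource *dataSource // replaces DatasourceUID string and DataSourceRef
	JSON       json.RawMessage
	Interval   time.Duration
}

func main() {
	// The caller resolves the datasource (e.g. via the datasource cache)
	// before building the request; the expression service no longer looks
	// it up by id or uid on its own.
	ds := &dataSource{UID: "prom-uid", Type: "prometheus"}
	q := exprQuery{
		RefID:      "A",
		DataSource: ds,
		JSON:       json.RawMessage(`{"expr": "up"}`),
		Interval:   time.Second,
	}
	fmt.Printf("%+v\n", q)
}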
View File

@ -1,354 +0,0 @@
{
"conditions": [
{
"evaluator": {
"params": [
3
],
"type": "gt"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 60,
"valuesCSV": "0,0,2,2,1,1"
},
"hide": false,
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "B",
"scenarioId": "predictable_pulse",
"stringInput": ""
},
"params": [
"B",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "avg"
},
"type": "query"
},
{
"evaluator": {
"params": [
2,
5
],
"type": "within_range"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 60,
"valuesCSV": "0,0,2,2,1,1"
},
"hide": false,
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "B",
"scenarioId": "predictable_pulse",
"stringInput": ""
},
"params": [
"B",
"10m",
"now-5m"
]
},
"reducer": {
"params": [],
"type": "max"
},
"type": "query"
},
{
"evaluator": {
"params": [
6
],
"type": "gt"
},
"operator": {
"type": "or"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 60,
"valuesCSV": "0,0,2,2,1,1"
},
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "A",
"scenarioId": "predictable_csv_wave",
"stringInput": ""
},
"params": [
"A",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "sum"
},
"type": "query"
},
{
"evaluator": {
"params": [
7
],
"type": "gt"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 60,
"valuesCSV": "0,0,2,2,1,1"
},
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "A",
"scenarioId": "predictable_csv_wave",
"stringInput": ""
},
"params": [
"A",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "last"
},
"type": "query"
},
{
"evaluator": {
"params": [],
"type": "no_value"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 60,
"valuesCSV": "0,0,2,2,1,1"
},
"hide": false,
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "C",
"scenarioId": "no_data_points",
"stringInput": ""
},
"params": [
"C",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "diff"
},
"type": "query"
},
{
"evaluator": {
"params": [
9
],
"type": "gt"
},
"operator": {
"type": "or"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 30,
"valuesCSV": "1,1,6,6,3,3"
},
"hide": false,
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "D",
"scenarioId": "predictable_csv_wave",
"stringInput": ""
},
"params": [
"D",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "diff_abs"
},
"type": "query"
},
{
"evaluator": {
"params": [
10
],
"type": "gt"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 30,
"valuesCSV": "1,1,6,6,3,3"
},
"hide": false,
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "D",
"scenarioId": "predictable_csv_wave",
"stringInput": ""
},
"params": [
"D",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "percent_diff"
},
"type": "query"
},
{
"evaluator": {
"params": [
11
],
"type": "gt"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 4,
"model": {
"alias": "",
"csvWave": {
"timeStep": 30,
"valuesCSV": "1,1,6,6,3,3"
},
"hide": false,
"lines": 10,
"points": [],
"pulseWave": {
"offCount": 3,
"offValue": 1,
"onCount": 3,
"onValue": 2,
"timeStep": 60
},
"refId": "D",
"scenarioId": "predictable_csv_wave",
"stringInput": ""
},
"params": [
"D",
"10m",
"now"
]
},
"reducer": {
"params": [],
"type": "percent_diff_abs"
},
"type": "query"
}
]
}

View File

@ -1,63 +0,0 @@
{
"conditions": [
{
"evaluator": {
"params": [
0
],
"type": "lt"
},
"operator": {
"type": ""
},
"query": {
"datasourceId": 2,
"model": {
"expr": "avg_over_time(sum by (instance) (up)[1h:5m])",
"interval": "",
"legendFormat": "",
"refId": "A"
},
"params": [
"A",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "avg"
},
"type": "query"
},
{
"evaluator": {
"params": [
0
],
"type": "gt"
},
"operator": {
"type": "and"
},
"query": {
"datasourceId": 2,
"model": {
"expr": "avg_over_time(sum by (instance) (up)[1h:5m])",
"interval": "",
"legendFormat": "",
"refId": "A"
},
"params": [
"A",
"10m",
"now-5m"
]
},
"reducer": {
"params": [],
"type": "avg"
},
"type": "query"
}
]}

View File

@ -1,349 +0,0 @@
package translate
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/util"
)
// DashboardAlertConditions turns dashboard alerting conditions into server side expression queries and a
// classic conditions operation. A Condition from the ngalert model's package will be returned if the
// translation is successful in creating an expression that can be parsed.
// A query is created for each unique referenced query in the dashboard. Each query is considered to be unique
// based on the RefID and the Time Range. Therefore, if the same RefID has multiple time ranges in the dashboard
// condition, new RefIDs will be created.
func DashboardAlertConditions(rawDCondJSON []byte, orgID int64) (*ngmodels.Condition, error) {
oldCond := dashConditionsJSON{}
err := json.Unmarshal(rawDCondJSON, &oldCond)
if err != nil {
return nil, err
}
ngCond, err := oldCond.GetNew(orgID)
if err != nil {
return nil, err
}
backendReq, err := eval.GetExprRequest(eval.AlertExecCtx{ExpressionsEnabled: true, Log: log.New("translate")}, ngCond.Data, time.Unix(500, 0))
if err != nil {
return nil, err
}
svc := &expr.Service{}
_, err = svc.BuildPipeline(backendReq)
if err != nil {
return nil, err
}
return ngCond, nil
}
type dashConditionsJSON struct {
Conditions []dashAlertingConditionJSON `json:"conditions"`
}
// dashAlertingConditionJSON is like classic.ClassicConditionJSON except that it
// includes the model property with the query.
type dashAlertingConditionJSON struct {
Evaluator conditionEvalJSON `json:"evaluator"`
Operator struct {
Type string `json:"type"`
} `json:"operator"`
Query struct {
Params []string `json:"params"`
DatasourceID int64 `json:""`
Model json.RawMessage
} `json:"query"`
Reducer struct {
// Params []interface{} `json:"params"` (Unused)
Type string `json:"type"`
}
}
type conditionEvalJSON struct {
Params []float64 `json:"params"`
Type string `json:"type"` // e.g. "gt"
}
func (dc *dashConditionsJSON) GetNew(orgID int64) (*ngmodels.Condition, error) {
refIDtoCondIdx := make(map[string][]int) // a map of original refIds to their corresponding condition index
for i, cond := range dc.Conditions {
if len(cond.Query.Params) != 3 {
return nil, fmt.Errorf("unexpected number of query parameters in cond %v, want 3 got %v", i+1, len(cond.Query.Params))
}
refID := cond.Query.Params[0]
refIDtoCondIdx[refID] = append(refIDtoCondIdx[refID], i)
}
newRefIDstoCondIdx := make(map[string][]int) // a map of the new refIds to their corresponding condition index
refIDs := make([]string, 0, len(refIDtoCondIdx)) // a unique sorted list of the original refIDs
for refID := range refIDtoCondIdx {
refIDs = append(refIDs, refID)
}
sort.Strings(refIDs)
newRefIDsToTimeRanges := make(map[string][2]string) // a map of new RefIDs to their time range string tuple representation
for _, refID := range refIDs {
condIdxes := refIDtoCondIdx[refID]
if len(condIdxes) == 1 {
// If the refID is used in only one condition, keep the letter as the new refID
newRefIDstoCondIdx[refID] = append(newRefIDstoCondIdx[refID], condIdxes[0])
newRefIDsToTimeRanges[refID] = [2]string{dc.Conditions[condIdxes[0]].Query.Params[1], dc.Conditions[condIdxes[0]].Query.Params[2]}
continue
}
// track unique time ranges within the same refID
timeRangesToCondIdx := make(map[[2]string][]int) // a map of the time range tuple to the condition index
for _, idx := range condIdxes {
timeParamFrom := dc.Conditions[idx].Query.Params[1]
timeParamTo := dc.Conditions[idx].Query.Params[2]
key := [2]string{timeParamFrom, timeParamTo}
timeRangesToCondIdx[key] = append(timeRangesToCondIdx[key], idx)
}
if len(timeRangesToCondIdx) == 1 {
// if all conditions share the same time range, no need to create a new query with a new RefID
for i := range condIdxes {
newRefIDstoCondIdx[refID] = append(newRefIDstoCondIdx[refID], condIdxes[i])
newRefIDsToTimeRanges[refID] = [2]string{dc.Conditions[condIdxes[i]].Query.Params[1], dc.Conditions[condIdxes[i]].Query.Params[2]}
}
continue
}
// This referenced query/refID has different time ranges, so new queries are needed for each unique time range.
timeRanges := make([][2]string, 0, len(timeRangesToCondIdx)) // a sorted list of unique time ranges for the query
for tr := range timeRangesToCondIdx {
timeRanges = append(timeRanges, tr)
}
sort.Slice(timeRanges, func(i, j int) bool {
switch {
case timeRanges[i][0] < timeRanges[j][0]:
return true
case timeRanges[i][0] > timeRanges[j][0]:
return false
default:
return timeRanges[i][1] < timeRanges[j][1]
}
})
for _, tr := range timeRanges {
idxes := timeRangesToCondIdx[tr]
for i := 0; i < len(idxes); i++ {
newLetter, err := getNewRefID(newRefIDstoCondIdx)
if err != nil {
return nil, err
}
newRefIDstoCondIdx[newLetter] = append(newRefIDstoCondIdx[newLetter], idxes[i])
newRefIDsToTimeRanges[newLetter] = [2]string{dc.Conditions[idxes[i]].Query.Params[1], dc.Conditions[idxes[i]].Query.Params[2]}
}
}
}
newRefIDs := make([]string, 0, len(newRefIDstoCondIdx)) // newRefIds is a sorted list of the unique refIds of new queries
for refID := range newRefIDstoCondIdx {
newRefIDs = append(newRefIDs, refID)
}
sort.Strings(newRefIDs)
ngCond := &ngmodels.Condition{}
condIdxToNewRefID := make(map[int]string) // a map of condition indices to the RefIDs of new queries
// build the new data source queries
for _, refID := range newRefIDs {
condIdxes := newRefIDstoCondIdx[refID]
for i, condIdx := range condIdxes {
condIdxToNewRefID[condIdx] = refID
if i > 0 {
// only create each unique query once
continue
}
var queryObj map[string]interface{} // copy the model
err := json.Unmarshal(dc.Conditions[condIdx].Query.Model, &queryObj)
if err != nil {
return nil, err
}
getDsInfo := &models.GetDataSourceQuery{
OrgId: orgID,
Id: dc.Conditions[condIdx].Query.DatasourceID,
}
if err := bus.DispatchCtx(context.TODO(), getDsInfo); err != nil {
return nil, fmt.Errorf("could not find datasource: %w", err)
}
queryObj["datasource"] = getDsInfo.Result.Name
queryObj["refId"] = refID
encodedObj, err := json.Marshal(queryObj)
if err != nil {
return nil, err
}
rawFrom := newRefIDsToTimeRanges[refID][0]
rawTo := newRefIDsToTimeRanges[refID][1]
rTR, err := getRelativeDuration(rawFrom, rawTo)
if err != nil {
return nil, err
}
alertQuery := ngmodels.AlertQuery{
RefID: refID,
Model: encodedObj,
RelativeTimeRange: *rTR,
DatasourceUID: getDsInfo.Result.Uid,
}
ngCond.Data = append(ngCond.Data, alertQuery)
}
}
// build the new classic condition pointing to our new equivalent queries
conditions := make([]classic.ClassicConditionJSON, len(dc.Conditions))
for i, cond := range dc.Conditions {
newCond := classic.ClassicConditionJSON{}
newCond.Evaluator = classic.ConditionEvalJSON{
Type: cond.Evaluator.Type,
Params: cond.Evaluator.Params,
}
newCond.Operator.Type = cond.Operator.Type
newCond.Query.Params = append(newCond.Query.Params, condIdxToNewRefID[i])
newCond.Reducer.Type = cond.Reducer.Type
conditions[i] = newCond
}
ccRefID, err := getNewRefID(newRefIDstoCondIdx) // get refID for the classic condition
if err != nil {
return nil, err
}
ngCond.Condition = ccRefID // set the alert condition to point to the classic condition
ngCond.OrgID = orgID
exprModel := struct {
Type string `json:"type"`
RefID string `json:"refId"`
Conditions []classic.ClassicConditionJSON `json:"conditions"`
}{
"classic_conditions",
ccRefID,
conditions,
}
exprModelJSON, err := json.Marshal(&exprModel)
if err != nil {
return nil, err
}
ccAlertQuery := ngmodels.AlertQuery{
RefID: ccRefID,
Model: exprModelJSON,
DatasourceUID: expr.OldDatasourceUID,
}
ngCond.Data = append(ngCond.Data, ccAlertQuery)
for i := range ngCond.Data {
err := ngCond.Data[i].PreSave() // Set query model properties
if err != nil {
return nil, err
}
}
sort.Slice(ngCond.Data, func(i, j int) bool {
return ngCond.Data[i].RefID < ngCond.Data[j].RefID
})
return ngCond, nil
}
const alpha = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
// getNewRefID finds first capital letter in the alphabet not in use
// to use for a new RefID. It errors if it runs out of letters.
func getNewRefID(refIDs map[string][]int) (string, error) {
for _, r := range alpha {
sR := string(r)
if _, ok := refIDs[sR]; ok {
continue
}
return sR, nil
}
for i := 0; i < 20; i++ {
sR := util.GenerateShortUID()
if _, ok := refIDs[sR]; ok {
continue
}
return sR, nil
}
return "", fmt.Errorf("failed to generate unique RefID")
}
// getRelativeDuration turns the alerting durations for dashboard conditions
// into a relative time range.
func getRelativeDuration(rawFrom, rawTo string) (*ngmodels.RelativeTimeRange, error) {
fromD, err := getFrom(rawFrom)
if err != nil {
return nil, err
}
toD, err := getTo(rawTo)
if err != nil {
return nil, err
}
return &ngmodels.RelativeTimeRange{
From: ngmodels.Duration(fromD),
To: ngmodels.Duration(toD),
}, nil
}
func getFrom(from string) (time.Duration, error) {
fromRaw := strings.Replace(from, "now-", "", 1)
d, err := time.ParseDuration("-" + fromRaw)
if err != nil {
return 0, err
}
return -d, err
}
func getTo(to string) (time.Duration, error) {
if to == "now" {
return 0, nil
} else if strings.HasPrefix(to, "now-") {
withoutNow := strings.Replace(to, "now-", "", 1)
d, err := time.ParseDuration("-" + withoutNow)
if err != nil {
return 0, err
}
return -d, nil
}
d, err := time.ParseDuration(to)
if err != nil {
return 0, err
}
return -d, nil
}

View File

@ -1,147 +0,0 @@
package translate
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"testing"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/models"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/stretchr/testify/require"
)
func TestDashboardAlertConditions(t *testing.T) {
registerGetDsInfoHandler()
var tests = []struct {
name string
// inputJSONFName, at least for now, is as "conditions" will appear within the alert table
// settings column JSON. Which means it has already run through the dashboard
// alerting Extractor. It is the input.
inputJSONFName string
// Condition is quite large (and unexported things), so check misc attributes.
spotCheckFn func(t *testing.T, cond *ngmodels.Condition)
}{
{
name: "two conditions one query but different time ranges",
inputJSONFName: `sameQueryDifferentTimeRange.json`,
spotCheckFn: func(t *testing.T, cond *ngmodels.Condition) {
require.Equal(t, "C", cond.Condition, "unexpected refId for condition")
require.Equal(t, 3, len(cond.Data), "unexpected query/expression array length")
firstQuery := cond.Data[0]
require.Equal(t, "A", firstQuery.RefID, "unexpected refId for first query")
require.Equal(t, ngmodels.RelativeTimeRange{
From: ngmodels.Duration(time.Second * 600),
To: ngmodels.Duration(time.Second * 300),
}, firstQuery.RelativeTimeRange, "unexpected timerange for first query")
secondQuery := cond.Data[1]
require.Equal(t, "B", secondQuery.RefID, "unexpected refId for second query")
require.Equal(t, ngmodels.RelativeTimeRange{
From: ngmodels.Duration(time.Second * 300),
To: ngmodels.Duration(0),
}, secondQuery.RelativeTimeRange, "unexpected timerange for second query")
condQuery := cond.Data[2]
require.Equal(t, "C", condQuery.RefID, "unexpected refId for second query")
isExpr, err := condQuery.IsExpression()
require.NoError(t, err)
require.Equal(t, true, isExpr, "third query should be an expression")
c := struct {
Conditions []classic.ClassicConditionJSON `json:"conditions"`
}{}
err = json.Unmarshal(condQuery.Model, &c)
require.NoError(t, err)
require.Equal(t, 2, len(c.Conditions), "expected 2 conditions in classic condition")
// This is "correct" in that the condition gets the correct time range,
// but a bit odd that it creates B then A, can look into changing that
// later.
firstCond := c.Conditions[0]
require.Equal(t, "lt", firstCond.Evaluator.Type, "expected first cond to use lt")
require.Equal(t, "B", firstCond.Query.Params[0], "expected first cond to reference B")
secondCond := c.Conditions[1]
require.Equal(t, "gt", secondCond.Evaluator.Type, "expected second cond to use gt")
require.Equal(t, "A", secondCond.Query.Params[0], "expected second cond to reference A")
},
},
{
name: "mixed shared and unshared time ranges",
inputJSONFName: `mixedSharedUnsharedTimeRange.json`,
spotCheckFn: func(t *testing.T, cond *ngmodels.Condition) {
require.Equal(t, "G", cond.Condition, "unexpected refId for condition")
require.Equal(t, 7, len(cond.Data), "unexpected query/expression array length")
condQuery := cond.Data[6]
isExpr, err := condQuery.IsExpression()
require.NoError(t, err)
require.Equal(t, true, isExpr, "expected last query to be an expression")
c := struct {
Conditions []classic.ClassicConditionJSON `json:"conditions"`
}{}
err = json.Unmarshal(condQuery.Model, &c)
require.NoError(t, err)
require.Equal(t, 8, len(c.Conditions), "expected 8 conditions in classic condition")
firstCond := c.Conditions[0]
require.Equal(t, "gt", firstCond.Evaluator.Type, "expected first cond to use gt")
require.Equal(t, "avg", firstCond.Reducer.Type, "expected first cond to use reducer avg")
firstCondRefID := firstCond.Query.Params[0]
aq, err := alertRuleByRefId(cond, firstCondRefID)
require.NoError(t, err)
require.Equal(t, ngmodels.Duration(300*time.Second), aq.RelativeTimeRange.From,
"expected first condition to reference a query with a from of 300 seconds")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
jsonFile := filepath.Join("testdata", tt.inputJSONFName)
//nolint:GOSEC
b, err := ioutil.ReadFile(jsonFile)
require.NoError(t, err)
cond, err := DashboardAlertConditions(b, 1)
require.NoError(t, err)
tt.spotCheckFn(t, cond)
})
}
}
func alertRuleByRefId(cond *ngmodels.Condition, refID string) (ngmodels.AlertQuery, error) {
for _, aq := range cond.Data {
if aq.RefID == refID {
return aq, nil
}
}
return ngmodels.AlertQuery{}, fmt.Errorf("query with refId %v not found", refID)
}
func registerGetDsInfoHandler() {
bus.AddHandlerCtx("test", func(ctx context.Context, query *models.GetDataSourceQuery) error {
switch {
case query.Id == 2:
query.Result = &models.DataSource{Id: 2, OrgId: 1, Uid: "000000002"}
case query.Id == 4:
query.Result = &models.DataSource{Id: 4, OrgId: 1, Uid: "000000004"}
default:
return fmt.Errorf("datasource not found")
}
return nil
})
}

View File

@ -2,6 +2,8 @@
package adapters
import (
"encoding/json"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
@ -9,9 +11,13 @@ import (
// ModelToInstanceSettings converts a models.DataSource to a backend.DataSourceInstanceSettings.
func ModelToInstanceSettings(ds *models.DataSource, decryptFn func(map[string][]byte) map[string]string,
) (*backend.DataSourceInstanceSettings, error) {
jsonDataBytes, err := ds.JsonData.MarshalJSON()
if err != nil {
return nil, err
var jsonDataBytes json.RawMessage
if ds.JsonData != nil {
var err error
jsonDataBytes, err = ds.JsonData.MarshalJSON()
if err != nil {
return nil, err
}
}
return &backend.DataSourceInstanceSettings{

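The guard above only marshals JsonData when it is set, so datasources without JSON settings no longer fail conversion. A minimal standalone sketch of the same pattern (illustrative types, not the Grafana ones):

package main

import (
	"encoding/json"
	"fmt"
)

// jsonHolder stands in for the simplejson value carried by models.DataSource.
type jsonHolder struct {
	data map[string]interface{}
}

func (j *jsonHolder) MarshalJSON() ([]byte, error) { return json.Marshal(j.data) }

// settingsJSON mirrors the guarded marshalling: nil settings stay nil instead
// of causing an error when converting a datasource to instance settings.
func settingsJSON(holder *jsonHolder) (json.RawMessage, error) {
	var out json.RawMessage
	if holder != nil {
		b, err := holder.MarshalJSON()
		if err != nil {
			return nil, err
		}
		out = b
	}
	return out, nil
}

func main() {
	b, _ := settingsJSON(nil)
	fmt.Println(b == nil) // true: a datasource without JsonData no longer errors
	b, _ = settingsJSON(&jsonHolder{data: map[string]interface{}{"timeout": 30}})
	fmt.Println(string(b)) // {"timeout":30}
}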
View File

@ -85,7 +85,7 @@ func (srv TestingApiSrv) RouteEvalQueries(c *models.ReqContext, cmd apimodels.Ev
return ErrResp(http.StatusBadRequest, err, "invalid queries or expressions")
}
evaluator := eval.Evaluator{Cfg: srv.Cfg, Log: srv.log}
evaluator := eval.Evaluator{Cfg: srv.Cfg, Log: srv.log, DataSourceCache: srv.DatasourceCache}
evalResults, err := evaluator.QueriesAndExpressionsEval(c.SignedInUser.OrgId, cmd.Data, now, srv.ExpressionService)
if err != nil {
return ErrResp(http.StatusBadRequest, err, "Failed to evaluate queries and expressions")

View File

@ -241,7 +241,7 @@ func conditionEval(c *models.ReqContext, cmd ngmodels.EvalAlertConditionCommand,
now = timeNow()
}
evaluator := eval.Evaluator{Cfg: cfg, Log: log}
evaluator := eval.Evaluator{Cfg: cfg, Log: log, DataSourceCache: datasourceCache}
evalResults, err := evaluator.ConditionEval(&evalCond, now, expressionService)
if err != nil {
return ErrResp(http.StatusBadRequest, err, "Failed to evaluate conditions")

View File

@ -13,6 +13,8 @@ import (
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/infra/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/setting"
@ -23,8 +25,9 @@ import (
)
type Evaluator struct {
Cfg *setting.Cfg
Log log.Logger
Cfg *setting.Cfg
Log log.Logger
DataSourceCache datasources.CacheService
}
// invalidEvalResultFormatError is an error for invalid format of the alert definition evaluation results.
@ -120,8 +123,8 @@ type AlertExecCtx struct {
Ctx context.Context
}
// GetExprRequest validates the condition and creates a expr.Request from it.
func GetExprRequest(ctx AlertExecCtx, data []models.AlertQuery, now time.Time) (*expr.Request, error) {
// GetExprRequest validates the condition, gets the datasource information and creates an expr.Request from it.
func GetExprRequest(ctx AlertExecCtx, data []models.AlertQuery, now time.Time, dsCacheService datasources.CacheService) (*expr.Request, error) {
req := &expr.Request{
OrgId: ctx.OrgID,
Headers: map[string]string{
@ -131,6 +134,8 @@ func GetExprRequest(ctx AlertExecCtx, data []models.AlertQuery, now time.Time) (
},
}
datasources := make(map[string]*m.DataSource, len(data))
for i := range data {
q := data[i]
model, err := q.GetModel()
@ -147,12 +152,27 @@ func GetExprRequest(ctx AlertExecCtx, data []models.AlertQuery, now time.Time) (
return nil, fmt.Errorf("failed to retrieve maxDatapoints from the model: %w", err)
}
ds, ok := datasources[q.DatasourceUID]
if !ok {
if expr.IsDataSource(q.DatasourceUID) {
ds = expr.DataSourceModel()
} else {
ds, err = dsCacheService.GetDatasourceByUID(q.DatasourceUID, &m.SignedInUser{
OrgId: ctx.OrgID,
OrgRole: m.ROLE_ADMIN, // Get DS as admin for service, API calls (test/post) must check permissions based on user.
}, true)
if err != nil {
return nil, err
}
}
datasources[q.DatasourceUID] = ds
}
req.Queries = append(req.Queries, expr.Query{
TimeRange: expr.TimeRange{
From: q.RelativeTimeRange.ToTimeRange(now).From,
To: q.RelativeTimeRange.ToTimeRange(now).To,
},
DatasourceUID: q.DatasourceUID,
DataSource: ds,
JSON: model,
Interval: interval,
RefID: q.RefID,
@ -169,8 +189,8 @@ type NumberValueCapture struct {
Value *float64
}
func executeCondition(ctx AlertExecCtx, c *models.Condition, now time.Time, exprService *expr.Service) ExecutionResults {
execResp, err := executeQueriesAndExpressions(ctx, c.Data, now, exprService)
func executeCondition(ctx AlertExecCtx, c *models.Condition, now time.Time, exprService *expr.Service, dsCacheService datasources.CacheService) ExecutionResults {
execResp, err := executeQueriesAndExpressions(ctx, c.Data, now, exprService, dsCacheService)
if err != nil {
return ExecutionResults{Error: err}
}
@ -253,7 +273,7 @@ func executeCondition(ctx AlertExecCtx, c *models.Condition, now time.Time, expr
return result
}
func executeQueriesAndExpressions(ctx AlertExecCtx, data []models.AlertQuery, now time.Time, exprService *expr.Service) (resp *backend.QueryDataResponse, err error) {
func executeQueriesAndExpressions(ctx AlertExecCtx, data []models.AlertQuery, now time.Time, exprService *expr.Service, dsCacheService datasources.CacheService) (resp *backend.QueryDataResponse, err error) {
defer func() {
if e := recover(); e != nil {
ctx.Log.Error("alert rule panic", "error", e, "stack", string(debug.Stack()))
@ -266,7 +286,7 @@ func executeQueriesAndExpressions(ctx AlertExecCtx, data []models.AlertQuery, no
}
}()
queryDataReq, err := GetExprRequest(ctx, data, now)
queryDataReq, err := GetExprRequest(ctx, data, now, dsCacheService)
if err != nil {
return nil, err
}
@ -507,7 +527,7 @@ func (e *Evaluator) ConditionEval(condition *models.Condition, now time.Time, ex
alertExecCtx := AlertExecCtx{OrgID: condition.OrgID, Ctx: alertCtx, ExpressionsEnabled: e.Cfg.ExpressionsEnabled, Log: e.Log}
execResult := executeCondition(alertExecCtx, condition, now, expressionService)
execResult := executeCondition(alertExecCtx, condition, now, expressionService, e.DataSourceCache)
evalResults := evaluateExecutionResult(execResult, now)
return evalResults, nil
@ -520,7 +540,7 @@ func (e *Evaluator) QueriesAndExpressionsEval(orgID int64, data []models.AlertQu
alertExecCtx := AlertExecCtx{OrgID: orgID, Ctx: alertCtx, ExpressionsEnabled: e.Cfg.ExpressionsEnabled, Log: e.Log}
execResult, err := executeQueriesAndExpressions(alertExecCtx, data, now, expressionService)
execResult, err := executeQueriesAndExpressions(alertExecCtx, data, now, expressionService, e.DataSourceCache)
if err != nil {
return nil, fmt.Errorf("failed to execute conditions: %w", err)
}

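GetExprRequest now resolves each datasource UID at most once per request, short-circuiting the expression pseudo-datasource and going through the datasource cache service as an admin for everything else. A simplified sketch of that memoization (stand-in types; the admin signed-in user and the CacheService call are omitted):

package main

import "fmt"

const expressionUID = "__expr__"

type dataSource struct{ UID string }

type lookupFn func(uid string) (*dataSource, error)

// resolveDatasources resolves each UID at most once per request, mirroring the
// map-based memoization in GetExprRequest.
func resolveDatasources(uids []string, lookup lookupFn) (map[string]*dataSource, error) {
	resolved := make(map[string]*dataSource, len(uids))
	for _, uid := range uids {
		if _, ok := resolved[uid]; ok {
			continue // already resolved during this request
		}
		if uid == expressionUID {
			resolved[uid] = &dataSource{UID: expressionUID}
			continue
		}
		ds, err := lookup(uid)
		if err != nil {
			return nil, err
		}
		resolved[uid] = ds
	}
	return resolved, nil
}

func main() {
	calls := 0
	lookup := func(uid string) (*dataSource, error) {
		calls++
		return &dataSource{UID: uid}, nil
	}
	if _, err := resolveDatasources([]string{"prom", "prom", expressionUID}, lookup); err != nil {
		panic(err)
	}
	fmt.Println("cache service queried", calls, "time(s)") // 1
}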
View File

@ -119,7 +119,7 @@ func (ng *AlertNG) init() error {
BaseInterval: baseInterval,
Logger: ng.Log,
MaxAttempts: ng.Cfg.UnifiedAlerting.MaxAttempts,
Evaluator: eval.Evaluator{Cfg: ng.Cfg, Log: ng.Log},
Evaluator: eval.Evaluator{Cfg: ng.Cfg, Log: ng.Log, DataSourceCache: ng.DataSourceCache},
InstanceStore: store,
RuleStore: store,
AdminConfigStore: store,

View File

@ -87,10 +87,7 @@ func (s *Service) handleExpressions(ctx context.Context, user *models.SignedInUs
RefID: pq.query.RefID,
MaxDataPoints: pq.query.MaxDataPoints,
QueryType: pq.query.QueryType,
Datasource: expr.DataSourceRef{
Type: pq.datasource.Type,
UID: pq.datasource.Uid,
},
DataSource: pq.datasource,
TimeRange: expr.TimeRange{
From: pq.query.TimeRange.From,
To: pq.query.TimeRange.To,