From 0254a29e35e9404ccc0ccfdc06af8dcac0cd508f Mon Sep 17 00:00:00 2001 From: Sven Klemm <31455525+svenklemm@users.noreply.github.com> Date: Thu, 13 Sep 2018 16:51:00 +0200 Subject: [PATCH] Interpolate $__interval in backend for alerting with sql datasources (#13156) add support for interpolate $__interval and $__interval_ms in sql datasources --- pkg/tsdb/elasticsearch/client/client.go | 2 +- pkg/tsdb/influxdb/query.go | 3 +- pkg/tsdb/interval.go | 4 ++ pkg/tsdb/mssql/macros.go | 22 ++--------- pkg/tsdb/mssql/mssql_test.go | 40 ++++++++++++++++++++ pkg/tsdb/mysql/macros.go | 23 ++---------- pkg/tsdb/mysql/mysql_test.go | 40 ++++++++++++++++++++ pkg/tsdb/postgres/macros.go | 26 +++---------- pkg/tsdb/postgres/postgres_test.go | 40 ++++++++++++++++++++ pkg/tsdb/sql_engine.go | 50 ++++++++++++++++++++++++- pkg/tsdb/sql_engine_test.go | 31 +++++++++++++++ 11 files changed, 218 insertions(+), 63 deletions(-) diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index dff626a79eb..78973b3faa6 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -138,7 +138,7 @@ func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, } body := string(reqBody) - body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1) + body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10), -1) body = strings.Replace(body, "$__interval", r.interval.Text, -1) payload.WriteString(body + "\n") diff --git a/pkg/tsdb/influxdb/query.go b/pkg/tsdb/influxdb/query.go index 0637a5bbb44..7cb8f0ecd82 100644 --- a/pkg/tsdb/influxdb/query.go +++ b/pkg/tsdb/influxdb/query.go @@ -4,7 +4,6 @@ import ( "fmt" "strconv" "strings" - "time" "regexp" @@ -34,7 +33,7 @@ func (query *Query) Build(queryContext *tsdb.TsdbQuery) (string, error) { res = strings.Replace(res, "$timeFilter", query.renderTimeFilter(queryContext), -1) res = strings.Replace(res, "$interval", interval.Text, -1) - res = strings.Replace(res, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1) + res = strings.Replace(res, "$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10), -1) res = strings.Replace(res, "$__interval", interval.Text, -1) return res, nil } diff --git a/pkg/tsdb/interval.go b/pkg/tsdb/interval.go index 49904f27a37..fd6adee39d7 100644 --- a/pkg/tsdb/interval.go +++ b/pkg/tsdb/interval.go @@ -49,6 +49,10 @@ func NewIntervalCalculator(opt *IntervalOptions) *intervalCalculator { return calc } +func (i *Interval) Milliseconds() int64 { + return i.Value.Nanoseconds() / int64(time.Millisecond) +} + func (ic *intervalCalculator) Calculate(timerange *TimeRange, minInterval time.Duration) Interval { to := timerange.MustGetTo().UnixNano() from := timerange.MustGetFrom().UnixNano() diff --git a/pkg/tsdb/mssql/macros.go b/pkg/tsdb/mssql/macros.go index caba043e7b6..9303712a480 100644 --- a/pkg/tsdb/mssql/macros.go +++ b/pkg/tsdb/mssql/macros.go @@ -13,12 +13,13 @@ const rsIdentifier = `([_a-zA-Z0-9]+)` const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)` type msSqlMacroEngine struct { + *tsdb.SqlMacroEngineBase timeRange *tsdb.TimeRange query *tsdb.Query } func newMssqlMacroEngine() tsdb.SqlMacroEngine { - return &msSqlMacroEngine{} + return &msSqlMacroEngine{SqlMacroEngineBase: tsdb.NewSqlMacroEngineBase()} } func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { @@ -27,7 +28,7 @@ func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa rExp, _ := regexp.Compile(sExpr) var macroError error - sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { + sql = m.ReplaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { args := strings.Split(groups[2], ",") for i, arg := range args { args[i] = strings.Trim(arg, " ") @@ -47,23 +48,6 @@ func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa return sql, nil } -func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string { - result := "" - lastIndex := 0 - - for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) { - groups := []string{} - for i := 0; i < len(v); i += 2 { - groups = append(groups, str[v[i]:v[i+1]]) - } - - result += str[lastIndex:v[0]] + repl(groups) - lastIndex = v[1] - } - - return result + str[lastIndex:] -} - func (m *msSqlMacroEngine) evaluateMacro(name string, args []string) (string, error) { switch name { case "__time": diff --git a/pkg/tsdb/mssql/mssql_test.go b/pkg/tsdb/mssql/mssql_test.go index 30d1da3bda1..f9525fc37ac 100644 --- a/pkg/tsdb/mssql/mssql_test.go +++ b/pkg/tsdb/mssql/mssql_test.go @@ -35,6 +35,11 @@ func TestMSSQL(t *testing.T) { return x, nil } + origInterpolate := tsdb.Interpolate + tsdb.Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { + return sql, nil + } + endpoint, err := newMssqlQueryEndpoint(&models.DataSource{ JsonData: simplejson.New(), SecureJsonData: securejsondata.SecureJsonData{}, @@ -47,6 +52,7 @@ func TestMSSQL(t *testing.T) { Reset(func() { sess.Close() tsdb.NewXormEngine = origXormEngine + tsdb.Interpolate = origInterpolate }) Convey("Given a table with different native data types", func() { @@ -295,6 +301,40 @@ func TestMSSQL(t *testing.T) { }) + Convey("When doing a metric query using timeGroup and $__interval", func() { + mockInterpolate := tsdb.Interpolate + tsdb.Interpolate = origInterpolate + + Reset(func() { + tsdb.Interpolate = mockInterpolate + }) + + Convey("Should replace $__interval", func() { + query := &tsdb.TsdbQuery{ + Queries: []*tsdb.Query{ + { + DataSource: &models.DataSource{}, + Model: simplejson.NewFromAny(map[string]interface{}{ + "rawSql": "SELECT $__timeGroup(time, $__interval) AS time, avg(value) as value FROM metric GROUP BY $__timeGroup(time, $__interval) ORDER BY 1", + "format": "time_series", + }), + RefId: "A", + }, + }, + TimeRange: &tsdb.TimeRange{ + From: fmt.Sprintf("%v", fromStart.Unix()*1000), + To: fmt.Sprintf("%v", fromStart.Add(30*time.Minute).Unix()*1000), + }, + } + + resp, err := endpoint.Query(nil, nil, query) + So(err, ShouldBeNil) + queryResult := resp.Results["A"] + So(queryResult.Error, ShouldBeNil) + So(queryResult.Meta.Get("sql").MustString(), ShouldEqual, "SELECT FLOOR(DATEDIFF(second, '1970-01-01', time)/60)*60 AS time, avg(value) as value FROM metric GROUP BY FLOOR(DATEDIFF(second, '1970-01-01', time)/60)*60 ORDER BY 1") + }) + }) + Convey("When doing a metric query using timeGroup with float fill enabled", func() { query := &tsdb.TsdbQuery{ Queries: []*tsdb.Query{ diff --git a/pkg/tsdb/mysql/macros.go b/pkg/tsdb/mysql/macros.go index 0dabdd7c283..0f1c4fcaf2c 100644 --- a/pkg/tsdb/mysql/macros.go +++ b/pkg/tsdb/mysql/macros.go @@ -9,17 +9,17 @@ import ( "github.com/grafana/grafana/pkg/tsdb" ) -//const rsString = `(?:"([^"]*)")`; const rsIdentifier = `([_a-zA-Z0-9]+)` const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)` type mySqlMacroEngine struct { + *tsdb.SqlMacroEngineBase timeRange *tsdb.TimeRange query *tsdb.Query } func newMysqlMacroEngine() tsdb.SqlMacroEngine { - return &mySqlMacroEngine{} + return &mySqlMacroEngine{SqlMacroEngineBase: tsdb.NewSqlMacroEngineBase()} } func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { @@ -28,7 +28,7 @@ func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa rExp, _ := regexp.Compile(sExpr) var macroError error - sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { + sql = m.ReplaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { args := strings.Split(groups[2], ",") for i, arg := range args { args[i] = strings.Trim(arg, " ") @@ -48,23 +48,6 @@ func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa return sql, nil } -func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string { - result := "" - lastIndex := 0 - - for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) { - groups := []string{} - for i := 0; i < len(v); i += 2 { - groups = append(groups, str[v[i]:v[i+1]]) - } - - result += str[lastIndex:v[0]] + repl(groups) - lastIndex = v[1] - } - - return result + str[lastIndex:] -} - func (m *mySqlMacroEngine) evaluateMacro(name string, args []string) (string, error) { switch name { case "__timeEpoch", "__time": diff --git a/pkg/tsdb/mysql/mysql_test.go b/pkg/tsdb/mysql/mysql_test.go index ca6df8e360e..13d9040a738 100644 --- a/pkg/tsdb/mysql/mysql_test.go +++ b/pkg/tsdb/mysql/mysql_test.go @@ -42,6 +42,11 @@ func TestMySQL(t *testing.T) { return x, nil } + origInterpolate := tsdb.Interpolate + tsdb.Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { + return sql, nil + } + endpoint, err := newMysqlQueryEndpoint(&models.DataSource{ JsonData: simplejson.New(), SecureJsonData: securejsondata.SecureJsonData{}, @@ -54,6 +59,7 @@ func TestMySQL(t *testing.T) { Reset(func() { sess.Close() tsdb.NewXormEngine = origXormEngine + tsdb.Interpolate = origInterpolate }) Convey("Given a table with different native data types", func() { @@ -295,6 +301,40 @@ func TestMySQL(t *testing.T) { }) + Convey("When doing a metric query using timeGroup and $__interval", func() { + mockInterpolate := tsdb.Interpolate + tsdb.Interpolate = origInterpolate + + Reset(func() { + tsdb.Interpolate = mockInterpolate + }) + + Convey("Should replace $__interval", func() { + query := &tsdb.TsdbQuery{ + Queries: []*tsdb.Query{ + { + DataSource: &models.DataSource{}, + Model: simplejson.NewFromAny(map[string]interface{}{ + "rawSql": "SELECT $__timeGroup(time, $__interval) AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1", + "format": "time_series", + }), + RefId: "A", + }, + }, + TimeRange: &tsdb.TimeRange{ + From: fmt.Sprintf("%v", fromStart.Unix()*1000), + To: fmt.Sprintf("%v", fromStart.Add(30*time.Minute).Unix()*1000), + }, + } + + resp, err := endpoint.Query(nil, nil, query) + So(err, ShouldBeNil) + queryResult := resp.Results["A"] + So(queryResult.Error, ShouldBeNil) + So(queryResult.Meta.Get("sql").MustString(), ShouldEqual, "SELECT UNIX_TIMESTAMP(time) DIV 60 * 60 AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1") + }) + }) + Convey("When doing a metric query using timeGroup with value fill enabled", func() { query := &tsdb.TsdbQuery{ Queries: []*tsdb.Query{ diff --git a/pkg/tsdb/postgres/macros.go b/pkg/tsdb/postgres/macros.go index 0a2ea1d2af6..16f4adb68a6 100644 --- a/pkg/tsdb/postgres/macros.go +++ b/pkg/tsdb/postgres/macros.go @@ -9,18 +9,21 @@ import ( "github.com/grafana/grafana/pkg/tsdb" ) -//const rsString = `(?:"([^"]*)")`; const rsIdentifier = `([_a-zA-Z0-9]+)` const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)` type postgresMacroEngine struct { + *tsdb.SqlMacroEngineBase timeRange *tsdb.TimeRange query *tsdb.Query timescaledb bool } func newPostgresMacroEngine(timescaledb bool) tsdb.SqlMacroEngine { - return &postgresMacroEngine{timescaledb: timescaledb} + return &postgresMacroEngine{ + SqlMacroEngineBase: tsdb.NewSqlMacroEngineBase(), + timescaledb: timescaledb, + } } func (m *postgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { @@ -29,7 +32,7 @@ func (m *postgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.Tim rExp, _ := regexp.Compile(sExpr) var macroError error - sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { + sql = m.ReplaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { // detect if $__timeGroup is supposed to add AS time for pre 5.3 compatibility // if there is a ',' directly after the macro call $__timeGroup is probably used @@ -66,23 +69,6 @@ func (m *postgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.Tim return sql, nil } -func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string { - result := "" - lastIndex := 0 - - for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) { - groups := []string{} - for i := 0; i < len(v); i += 2 { - groups = append(groups, str[v[i]:v[i+1]]) - } - - result += str[lastIndex:v[0]] + repl(groups) - lastIndex = v[1] - } - - return result + str[lastIndex:] -} - func (m *postgresMacroEngine) evaluateMacro(name string, args []string) (string, error) { switch name { case "__time": diff --git a/pkg/tsdb/postgres/postgres_test.go b/pkg/tsdb/postgres/postgres_test.go index 4e05f676682..fc1a5f34253 100644 --- a/pkg/tsdb/postgres/postgres_test.go +++ b/pkg/tsdb/postgres/postgres_test.go @@ -43,6 +43,11 @@ func TestPostgres(t *testing.T) { return x, nil } + origInterpolate := tsdb.Interpolate + tsdb.Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { + return sql, nil + } + endpoint, err := newPostgresQueryEndpoint(&models.DataSource{ JsonData: simplejson.New(), SecureJsonData: securejsondata.SecureJsonData{}, @@ -55,6 +60,7 @@ func TestPostgres(t *testing.T) { Reset(func() { sess.Close() tsdb.NewXormEngine = origXormEngine + tsdb.Interpolate = origInterpolate }) Convey("Given a table with different native data types", func() { @@ -222,6 +228,40 @@ func TestPostgres(t *testing.T) { } }) + Convey("When doing a metric query using timeGroup and $__interval", func() { + mockInterpolate := tsdb.Interpolate + tsdb.Interpolate = origInterpolate + + Reset(func() { + tsdb.Interpolate = mockInterpolate + }) + + Convey("Should replace $__interval", func() { + query := &tsdb.TsdbQuery{ + Queries: []*tsdb.Query{ + { + DataSource: &models.DataSource{}, + Model: simplejson.NewFromAny(map[string]interface{}{ + "rawSql": "SELECT $__timeGroup(time, $__interval) AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1", + "format": "time_series", + }), + RefId: "A", + }, + }, + TimeRange: &tsdb.TimeRange{ + From: fmt.Sprintf("%v", fromStart.Unix()*1000), + To: fmt.Sprintf("%v", fromStart.Add(30*time.Minute).Unix()*1000), + }, + } + + resp, err := endpoint.Query(nil, nil, query) + So(err, ShouldBeNil) + queryResult := resp.Results["A"] + So(queryResult.Error, ShouldBeNil) + So(queryResult.Meta.Get("sql").MustString(), ShouldEqual, "SELECT floor(extract(epoch from time)/60)*60 AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1") + }) + }) + Convey("When doing a metric query using timeGroup with NULL fill enabled", func() { query := &tsdb.TsdbQuery{ Queries: []*tsdb.Query{ diff --git a/pkg/tsdb/sql_engine.go b/pkg/tsdb/sql_engine.go index 454853c7cc8..18e02e328d1 100644 --- a/pkg/tsdb/sql_engine.go +++ b/pkg/tsdb/sql_engine.go @@ -6,6 +6,7 @@ import ( "database/sql" "fmt" "math" + "regexp" "strconv" "strings" "sync" @@ -43,6 +44,8 @@ var engineCache = engineCacheType{ versions: make(map[int64]int), } +var sqlIntervalCalculator = NewIntervalCalculator(nil) + var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engine, error) { return xorm.NewEngine(driverName, connectionString) } @@ -126,7 +129,15 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId} result.Results[query.RefId] = queryResult - rawSQL, err := e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL) + // global substitutions + rawSQL, err := Interpolate(query, tsdbQuery.TimeRange, rawSQL) + if err != nil { + queryResult.Error = err + continue + } + + // datasource specific substitutions + rawSQL, err = e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL) if err != nil { queryResult.Error = err continue @@ -163,6 +174,20 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, return result, nil } +// global macros/substitutions for all sql datasources +var Interpolate = func(query *Query, timeRange *TimeRange, sql string) (string, error) { + minInterval, err := GetIntervalFrom(query.DataSource, query.Model, time.Second*60) + if err != nil { + return sql, nil + } + interval := sqlIntervalCalculator.Calculate(timeRange, minInterval) + + sql = strings.Replace(sql, "$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10), -1) + sql = strings.Replace(sql, "$__interval", interval.Text, -1) + + return sql, nil +} + func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error { columnNames, err := rows.Columns() columnCount := len(columnNames) @@ -589,3 +614,26 @@ func SetupFillmode(query *Query, interval time.Duration, fillmode string) error return nil } + +type SqlMacroEngineBase struct{} + +func NewSqlMacroEngineBase() *SqlMacroEngineBase { + return &SqlMacroEngineBase{} +} + +func (m *SqlMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string { + result := "" + lastIndex := 0 + + for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) { + groups := []string{} + for i := 0; i < len(v); i += 2 { + groups = append(groups, str[v[i]:v[i+1]]) + } + + result += str[lastIndex:v[0]] + repl(groups) + lastIndex = v[1] + } + + return result + str[lastIndex:] +} diff --git a/pkg/tsdb/sql_engine_test.go b/pkg/tsdb/sql_engine_test.go index 854734fac31..05b8a51ae6f 100644 --- a/pkg/tsdb/sql_engine_test.go +++ b/pkg/tsdb/sql_engine_test.go @@ -5,6 +5,8 @@ import ( "time" "github.com/grafana/grafana/pkg/components/null" + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" . "github.com/smartystreets/goconvey/convey" ) @@ -14,6 +16,35 @@ func TestSqlEngine(t *testing.T) { dt := time.Date(2018, 3, 14, 21, 20, 6, int(527345*time.Microsecond), time.UTC) earlyDt := time.Date(1970, 3, 14, 21, 20, 6, int(527345*time.Microsecond), time.UTC) + Convey("Given a time range between 2018-04-12 00:00 and 2018-04-12 00:05", func() { + from := time.Date(2018, 4, 12, 18, 0, 0, 0, time.UTC) + to := from.Add(5 * time.Minute) + timeRange := NewFakeTimeRange("5m", "now", to) + query := &Query{DataSource: &models.DataSource{}, Model: simplejson.New()} + + Convey("interpolate $__interval", func() { + sql, err := Interpolate(query, timeRange, "select $__interval ") + So(err, ShouldBeNil) + + So(sql, ShouldEqual, "select 1m ") + }) + + Convey("interpolate $__interval in $__timeGroup", func() { + sql, err := Interpolate(query, timeRange, "select $__timeGroupAlias(time,$__interval)") + So(err, ShouldBeNil) + + So(sql, ShouldEqual, "select $__timeGroupAlias(time,1m)") + }) + + Convey("interpolate $__interval_ms", func() { + sql, err := Interpolate(query, timeRange, "select $__interval_ms ") + So(err, ShouldBeNil) + + So(sql, ShouldEqual, "select 60000 ") + }) + + }) + Convey("Given row values with time.Time as time columns", func() { var nilPointer *time.Time