From b6df91d56bc0a6e7369f17d9d69ee4585f58f7a3 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Fri, 8 Dec 2017 23:04:17 +0100 Subject: [PATCH 01/10] pass Query to MacroEngine Interpolate --- pkg/tsdb/mysql/macros.go | 2 +- pkg/tsdb/mysql/macros_test.go | 19 ++++++++++--------- pkg/tsdb/postgres/macros.go | 4 +++- pkg/tsdb/postgres/macros_test.go | 19 ++++++++++--------- pkg/tsdb/sql_engine.go | 8 ++++---- 5 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/tsdb/mysql/macros.go b/pkg/tsdb/mysql/macros.go index 108b81fc5f3..7eb673e7aa2 100644 --- a/pkg/tsdb/mysql/macros.go +++ b/pkg/tsdb/mysql/macros.go @@ -21,7 +21,7 @@ func NewMysqlMacroEngine() tsdb.SqlMacroEngine { return &MySqlMacroEngine{} } -func (m *MySqlMacroEngine) Interpolate(timeRange *tsdb.TimeRange, sql string) (string, error) { +func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { m.TimeRange = timeRange rExp, _ := regexp.Compile(sExpr) var macroError error diff --git a/pkg/tsdb/mysql/macros_test.go b/pkg/tsdb/mysql/macros_test.go index 988612fb287..19c10d20a06 100644 --- a/pkg/tsdb/mysql/macros_test.go +++ b/pkg/tsdb/mysql/macros_test.go @@ -10,31 +10,32 @@ import ( func TestMacroEngine(t *testing.T) { Convey("MacroEngine", t, func() { engine := &MySqlMacroEngine{} + query := &tsdb.Query{} timeRange := &tsdb.TimeRange{From: "5m", To: "now"} Convey("interpolate __time function", func() { - sql, err := engine.Interpolate(nil, "select $__time(time_column)") + sql, err := engine.Interpolate(query, timeRange, "select $__time(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select UNIX_TIMESTAMP(time_column) as time_sec") }) Convey("interpolate __time function wrapped in aggregation", func() { - sql, err := engine.Interpolate(nil, "select min($__time(time_column))") + sql, err := engine.Interpolate(query, timeRange, "select min($__time(time_column))") So(err, ShouldBeNil) So(sql, ShouldEqual, "select min(UNIX_TIMESTAMP(time_column) as time_sec)") }) Convey("interpolate __timeFilter function", func() { - sql, err := engine.Interpolate(timeRange, "WHERE $__timeFilter(time_column)") + sql, err := engine.Interpolate(query, timeRange, "WHERE $__timeFilter(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "WHERE time_column >= FROM_UNIXTIME(18446744066914186738) AND time_column <= FROM_UNIXTIME(18446744066914187038)") }) Convey("interpolate __timeFrom function", func() { - sql, err := engine.Interpolate(timeRange, "select $__timeFrom(time_column)") + sql, err := engine.Interpolate(query, timeRange, "select $__timeFrom(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select FROM_UNIXTIME(18446744066914186738)") @@ -42,35 +43,35 @@ func TestMacroEngine(t *testing.T) { Convey("interpolate __timeGroup function", func() { - sql, err := engine.Interpolate(timeRange, "GROUP BY $__timeGroup(time_column,'5m')") + sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column,'5m')") So(err, ShouldBeNil) So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)") }) Convey("interpolate __timeTo function", func() { - sql, err := engine.Interpolate(timeRange, "select $__timeTo(time_column)") + sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select FROM_UNIXTIME(18446744066914187038)") }) Convey("interpolate __unixEpochFilter function", func() { - sql, err := engine.Interpolate(timeRange, "select $__unixEpochFilter(18446744066914186738)") + sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFilter(18446744066914186738)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select 18446744066914186738 >= 18446744066914186738 AND 18446744066914186738 <= 18446744066914187038") }) Convey("interpolate __unixEpochFrom function", func() { - sql, err := engine.Interpolate(timeRange, "select $__unixEpochFrom()") + sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFrom()") So(err, ShouldBeNil) So(sql, ShouldEqual, "select 18446744066914186738") }) Convey("interpolate __unixEpochTo function", func() { - sql, err := engine.Interpolate(timeRange, "select $__unixEpochTo()") + sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochTo()") So(err, ShouldBeNil) So(sql, ShouldEqual, "select 18446744066914187038") diff --git a/pkg/tsdb/postgres/macros.go b/pkg/tsdb/postgres/macros.go index 086eb96655f..1fffbb100bc 100644 --- a/pkg/tsdb/postgres/macros.go +++ b/pkg/tsdb/postgres/macros.go @@ -15,14 +15,16 @@ const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)` type PostgresMacroEngine struct { TimeRange *tsdb.TimeRange + Query *tsdb.Query } func NewPostgresMacroEngine() tsdb.SqlMacroEngine { return &PostgresMacroEngine{} } -func (m *PostgresMacroEngine) Interpolate(timeRange *tsdb.TimeRange, sql string) (string, error) { +func (m *PostgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { m.TimeRange = timeRange + m.Query = query rExp, _ := regexp.Compile(sExpr) var macroError error diff --git a/pkg/tsdb/postgres/macros_test.go b/pkg/tsdb/postgres/macros_test.go index ebc5191d46e..cf89d6bdde3 100644 --- a/pkg/tsdb/postgres/macros_test.go +++ b/pkg/tsdb/postgres/macros_test.go @@ -10,31 +10,32 @@ import ( func TestMacroEngine(t *testing.T) { Convey("MacroEngine", t, func() { engine := &PostgresMacroEngine{} + query := &tsdb.Query{} timeRange := &tsdb.TimeRange{From: "5m", To: "now"} Convey("interpolate __time function", func() { - sql, err := engine.Interpolate(nil, "select $__time(time_column)") + sql, err := engine.Interpolate(query, timeRange, "select $__time(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select time_column AS \"time\"") }) Convey("interpolate __time function wrapped in aggregation", func() { - sql, err := engine.Interpolate(nil, "select min($__time(time_column))") + sql, err := engine.Interpolate(query, timeRange, "select min($__time(time_column))") So(err, ShouldBeNil) So(sql, ShouldEqual, "select min(time_column AS \"time\")") }) Convey("interpolate __timeFilter function", func() { - sql, err := engine.Interpolate(timeRange, "WHERE $__timeFilter(time_column)") + sql, err := engine.Interpolate(query, timeRange, "WHERE $__timeFilter(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "WHERE extract(epoch from time_column) BETWEEN 18446744066914186738 AND 18446744066914187038") }) Convey("interpolate __timeFrom function", func() { - sql, err := engine.Interpolate(timeRange, "select $__timeFrom(time_column)") + sql, err := engine.Interpolate(query, timeRange, "select $__timeFrom(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select to_timestamp(18446744066914186738)") @@ -42,35 +43,35 @@ func TestMacroEngine(t *testing.T) { Convey("interpolate __timeGroup function", func() { - sql, err := engine.Interpolate(timeRange, "GROUP BY $__timeGroup(time_column,'5m')") + sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column,'5m')") So(err, ShouldBeNil) So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time") }) Convey("interpolate __timeTo function", func() { - sql, err := engine.Interpolate(timeRange, "select $__timeTo(time_column)") + sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select to_timestamp(18446744066914187038)") }) Convey("interpolate __unixEpochFilter function", func() { - sql, err := engine.Interpolate(timeRange, "select $__unixEpochFilter(18446744066914186738)") + sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFilter(18446744066914186738)") So(err, ShouldBeNil) So(sql, ShouldEqual, "select 18446744066914186738 >= 18446744066914186738 AND 18446744066914186738 <= 18446744066914187038") }) Convey("interpolate __unixEpochFrom function", func() { - sql, err := engine.Interpolate(timeRange, "select $__unixEpochFrom()") + sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFrom()") So(err, ShouldBeNil) So(sql, ShouldEqual, "select 18446744066914186738") }) Convey("interpolate __unixEpochTo function", func() { - sql, err := engine.Interpolate(timeRange, "select $__unixEpochTo()") + sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochTo()") So(err, ShouldBeNil) So(sql, ShouldEqual, "select 18446744066914187038") diff --git a/pkg/tsdb/sql_engine.go b/pkg/tsdb/sql_engine.go index 12778b4e1ad..8bfb6a21703 100644 --- a/pkg/tsdb/sql_engine.go +++ b/pkg/tsdb/sql_engine.go @@ -22,10 +22,10 @@ type SqlEngine interface { ) (*Response, error) } -// SqlMacroEngine interpolates macros into sql. It takes in the timeRange to be able to -// generate queries that use from and to. +// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and +// timeRange to be able to generate queries that use from and to. type SqlMacroEngine interface { - Interpolate(timeRange *TimeRange, sql string) (string, error) + Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error) } type DefaultSqlEngine struct { @@ -97,7 +97,7 @@ func (e *DefaultSqlEngine) Query( queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId} result.Results[query.RefId] = queryResult - rawSql, err := e.MacroEngine.Interpolate(tsdbQuery.TimeRange, rawSql) + rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql) if err != nil { queryResult.Error = err continue From b86a42fffec4b62cde10167ffdc43b99c08cdd5d Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sat, 9 Dec 2017 20:35:00 +0100 Subject: [PATCH 02/10] pass tsdbQuery to transformToTimeSeries and transformToTable to get access to selected frontend timerange --- pkg/tsdb/mysql/mysql.go | 4 ++-- pkg/tsdb/postgres/postgres.go | 4 ++-- pkg/tsdb/sql_engine.go | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/tsdb/mysql/mysql.go b/pkg/tsdb/mysql/mysql.go index e5c6b92f245..b3fec71aaf7 100644 --- a/pkg/tsdb/mysql/mysql.go +++ b/pkg/tsdb/mysql/mysql.go @@ -56,7 +56,7 @@ func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSourc return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable) } -func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error { +func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { columnNames, err := rows.Columns() columnCount := len(columnNames) @@ -163,7 +163,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core. return values, nil } -func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error { +func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { pointsBySeries := make(map[string]*tsdb.TimeSeries) seriesByQueryOrder := list.New() columnNames, err := rows.Columns() diff --git a/pkg/tsdb/postgres/postgres.go b/pkg/tsdb/postgres/postgres.go index a8c96d8119c..3cf4d25f1f7 100644 --- a/pkg/tsdb/postgres/postgres.go +++ b/pkg/tsdb/postgres/postgres.go @@ -60,7 +60,7 @@ func (e *PostgresQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSo return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable) } -func (e PostgresQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error { +func (e PostgresQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { columnNames, err := rows.Columns() if err != nil { @@ -157,7 +157,7 @@ func (e PostgresQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, return values, nil } -func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error { +func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { pointsBySeries := make(map[string]*tsdb.TimeSeries) seriesByQueryOrder := list.New() diff --git a/pkg/tsdb/sql_engine.go b/pkg/tsdb/sql_engine.go index 8bfb6a21703..7ea0682235f 100644 --- a/pkg/tsdb/sql_engine.go +++ b/pkg/tsdb/sql_engine.go @@ -17,8 +17,8 @@ type SqlEngine interface { ctx context.Context, ds *models.DataSource, query *TsdbQuery, - transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error, - transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error, + transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, + transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, ) (*Response, error) } @@ -77,8 +77,8 @@ func (e *DefaultSqlEngine) Query( ctx context.Context, dsInfo *models.DataSource, tsdbQuery *TsdbQuery, - transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error, - transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error, + transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, + transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, ) (*Response, error) { result := &Response{ Results: make(map[string]*QueryResult), @@ -117,13 +117,13 @@ func (e *DefaultSqlEngine) Query( switch format { case "time_series": - err := transformToTimeSeries(query, rows, queryResult) + err := transformToTimeSeries(query, rows, queryResult, tsdbQuery) if err != nil { queryResult.Error = err continue } case "table": - err := transformToTable(query, rows, queryResult) + err := transformToTable(query, rows, queryResult, tsdbQuery) if err != nil { queryResult.Error = err continue From e2a3590d8b4142b26b1e864d9ab3c943212d5477 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sun, 10 Dec 2017 09:59:33 +0100 Subject: [PATCH 03/10] allow optional 3rd argument to timeGroup to control filling missing values --- pkg/tsdb/postgres/macros.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/tsdb/postgres/macros.go b/pkg/tsdb/postgres/macros.go index 1fffbb100bc..c0c03c268d3 100644 --- a/pkg/tsdb/postgres/macros.go +++ b/pkg/tsdb/postgres/macros.go @@ -3,6 +3,7 @@ package postgres import ( "fmt" "regexp" + "strconv" "strings" "time" @@ -84,13 +85,26 @@ func (m *PostgresMacroEngine) evaluateMacro(name string, args []string) (string, case "__timeTo": return fmt.Sprintf("to_timestamp(%d)", uint64(m.TimeRange.GetToAsMsEpoch()/1000)), nil case "__timeGroup": - if len(args) != 2 { - return "", fmt.Errorf("macro %v needs time column and interval", name) + if len(args) < 2 { + return "", fmt.Errorf("macro %v needs time column and interval and optional fill value", name) } interval, err := time.ParseDuration(strings.Trim(args[1], `' `)) if err != nil { return "", fmt.Errorf("error parsing interval %v", args[1]) } + if len(args) == 3 { + m.Query.Model.Set("fill", true) + m.Query.Model.Set("fillInterval", interval.Seconds()) + if strings.Trim(args[2], " ") == "NULL" { + m.Query.Model.Set("fillNull", true) + } else { + floatVal, err := strconv.ParseFloat(args[2], 64) + if err != nil { + return "", fmt.Errorf("error parsing fill value %v", args[2]) + } + m.Query.Model.Set("fillValue", floatVal) + } + } return fmt.Sprintf("(extract(epoch from %s)/%v)::bigint*%v AS time", args[0], interval.Seconds(), interval.Seconds()), nil case "__unixEpochFilter": if len(args) == 0 { From c1282e8ea8fe3274300b0bc68d7d6253ea4590c8 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sun, 10 Dec 2017 11:42:55 +0100 Subject: [PATCH 04/10] implement missing value fill functionality for postgres --- pkg/tsdb/postgres/postgres.go | 68 ++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/pkg/tsdb/postgres/postgres.go b/pkg/tsdb/postgres/postgres.go index 3cf4d25f1f7..d7bc42b3a11 100644 --- a/pkg/tsdb/postgres/postgres.go +++ b/pkg/tsdb/postgres/postgres.go @@ -4,6 +4,7 @@ import ( "container/list" "context" "fmt" + "math" "net/url" "strconv" "time" @@ -198,6 +199,18 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co return fmt.Errorf("Found no column named time") } + fillMissing := query.Model.Get("fill").MustBool(false) + var fillInterval float64 + fillValue := null.Float{} + if fillMissing { + fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 + if query.Model.Get("fillNULL").MustBool(false) == false { + fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() + fillValue.Valid = true + } + + } + for rows.Next() { var timestamp float64 var value null.Float @@ -249,7 +262,34 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co if metricIndex == -1 { metric = col } - e.appendTimePoint(pointsBySeries, seriesByQueryOrder, metric, timestamp, value) + + series, exist := pointsBySeries[metric] + if exist == false { + series = &tsdb.TimeSeries{Name: metric} + pointsBySeries[metric] = series + seriesByQueryOrder.PushBack(metric) + } + + if fillMissing { + var intervalStart float64 + if exist == false { + intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) + } else { + intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval + } + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + + for i := intervalStart; i < timestamp; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } + + series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) + + e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) rowCount++ } @@ -258,20 +298,22 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { key := elem.Value.(string) result.Series = append(result.Series, pointsBySeries[key]) + + if fillMissing { + series := pointsBySeries[key] + // fill in values from last fetched value till interval end + intervalStart := series.Points[len(series.Points)-1][1].Float64 + intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } } result.Meta.Set("rowCount", rowCount) return nil } - -func (e PostgresQueryEndpoint) appendTimePoint(pointsBySeries map[string]*tsdb.TimeSeries, seriesByQueryOrder *list.List, metric string, timestamp float64, value null.Float) { - if series, exist := pointsBySeries[metric]; exist { - series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) - } else { - series := &tsdb.TimeSeries{Name: metric} - series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) - pointsBySeries[metric] = series - seriesByQueryOrder.PushBack(metric) - } - e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) -} From 6cdd901ec636a6a108fc18d18c828ea5a5e90c7e Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sun, 10 Dec 2017 12:40:09 +0100 Subject: [PATCH 05/10] fix typo --- pkg/tsdb/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tsdb/postgres/postgres.go b/pkg/tsdb/postgres/postgres.go index d7bc42b3a11..067b173ad5a 100644 --- a/pkg/tsdb/postgres/postgres.go +++ b/pkg/tsdb/postgres/postgres.go @@ -204,7 +204,7 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co fillValue := null.Float{} if fillMissing { fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 - if query.Model.Get("fillNULL").MustBool(false) == false { + if query.Model.Get("fillNull").MustBool(false) == false { fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() fillValue.Valid = true } From cb15d9cbdd6d8de9675bbb37d196961f6ac2e2d4 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sun, 10 Dec 2017 19:50:01 +0100 Subject: [PATCH 06/10] add missing value fill code to mysql datasource --- pkg/tsdb/mysql/macros.go | 18 +++++++++++++- pkg/tsdb/mysql/mysql.go | 54 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/pkg/tsdb/mysql/macros.go b/pkg/tsdb/mysql/macros.go index 7eb673e7aa2..d684fdc9d92 100644 --- a/pkg/tsdb/mysql/macros.go +++ b/pkg/tsdb/mysql/macros.go @@ -3,6 +3,7 @@ package mysql import ( "fmt" "regexp" + "strconv" "strings" "time" @@ -15,6 +16,7 @@ const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)` type MySqlMacroEngine struct { TimeRange *tsdb.TimeRange + Query *tsdb.Query } func NewMysqlMacroEngine() tsdb.SqlMacroEngine { @@ -23,6 +25,7 @@ func NewMysqlMacroEngine() tsdb.SqlMacroEngine { func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { m.TimeRange = timeRange + m.Query = query rExp, _ := regexp.Compile(sExpr) var macroError error @@ -76,13 +79,26 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er case "__timeTo": return fmt.Sprintf("FROM_UNIXTIME(%d)", uint64(m.TimeRange.GetToAsMsEpoch()/1000)), nil case "__timeGroup": - if len(args) != 2 { + if len(args) < 2 { return "", fmt.Errorf("macro %v needs time column and interval", name) } interval, err := time.ParseDuration(strings.Trim(args[1], `'" `)) if err != nil { return "", fmt.Errorf("error parsing interval %v", args[1]) } + if len(args) == 3 { + m.Query.Model.Set("fill", true) + m.Query.Model.Set("fillInterval", interval.Seconds()) + if strings.Trim(args[2], " ") == "NULL" { + m.Query.Model.Set("fillNull", true) + } else { + floatVal, err := strconv.ParseFloat(args[2], 64) + if err != nil { + return "", fmt.Errorf("error parsing fill value %v", args[2]) + } + m.Query.Model.Set("fillValue", floatVal) + } + } return fmt.Sprintf("cast(cast(UNIX_TIMESTAMP(%s)/(%.0f) as signed)*%.0f as signed)", args[0], interval.Seconds(), interval.Seconds()), nil case "__unixEpochFilter": if len(args) == 0 { diff --git a/pkg/tsdb/mysql/mysql.go b/pkg/tsdb/mysql/mysql.go index b3fec71aaf7..ddd90b77642 100644 --- a/pkg/tsdb/mysql/mysql.go +++ b/pkg/tsdb/mysql/mysql.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "fmt" + "math" "strconv" "time" @@ -176,6 +177,18 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. rowLimit := 1000000 rowCount := 0 + fillMissing := query.Model.Get("fill").MustBool(false) + var fillInterval float64 + fillValue := null.Float{} + if fillMissing { + fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 + if query.Model.Get("fillNull").MustBool(false) == false { + fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() + fillValue.Valid = true + } + + } + for ; rows.Next(); rowCount++ { if rowCount > rowLimit { return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit) @@ -195,19 +208,50 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. return fmt.Errorf("Found row with no time value") } - if series, exist := pointsBySeries[rowData.metric]; exist { - series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time}) - } else { - series := &tsdb.TimeSeries{Name: rowData.metric} - series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time}) + series, exist := pointsBySeries[rowData.metric] + if exist == false { + series = &tsdb.TimeSeries{Name: rowData.metric} pointsBySeries[rowData.metric] = series seriesByQueryOrder.PushBack(rowData.metric) } + + if fillMissing { + var intervalStart float64 + if exist == false { + intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) + } else { + intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval + } + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + + for i := intervalStart; i < rowData.time.Float64; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } + + series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time}) } for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { key := elem.Value.(string) result.Series = append(result.Series, pointsBySeries[key]) + + if fillMissing { + series := pointsBySeries[key] + // fill in values from last fetched value till interval end + intervalStart := series.Points[len(series.Points)-1][1].Float64 + intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } } result.Meta.Set("rowCount", rowCount) From f1ba9137c07d9d9c5d420585993b12ce750f0454 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Fri, 2 Mar 2018 19:20:30 +0100 Subject: [PATCH 07/10] remove spaces around arguments before calling macro expansion --- pkg/tsdb/postgres/macros.go | 10 +++++++--- pkg/tsdb/postgres/macros_test.go | 8 ++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/tsdb/postgres/macros.go b/pkg/tsdb/postgres/macros.go index c0c03c268d3..23daeebec5a 100644 --- a/pkg/tsdb/postgres/macros.go +++ b/pkg/tsdb/postgres/macros.go @@ -30,7 +30,11 @@ func (m *PostgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.Tim var macroError error sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { - res, err := m.evaluateMacro(groups[1], strings.Split(groups[2], ",")) + args := strings.Split(groups[2], ",") + for i, arg := range args { + args[i] = strings.Trim(arg, " ") + } + res, err := m.evaluateMacro(groups[1], args) if err != nil && macroError == nil { macroError = err return "macro_error()" @@ -88,14 +92,14 @@ func (m *PostgresMacroEngine) evaluateMacro(name string, args []string) (string, if len(args) < 2 { return "", fmt.Errorf("macro %v needs time column and interval and optional fill value", name) } - interval, err := time.ParseDuration(strings.Trim(args[1], `' `)) + interval, err := time.ParseDuration(strings.Trim(args[1], `'`)) if err != nil { return "", fmt.Errorf("error parsing interval %v", args[1]) } if len(args) == 3 { m.Query.Model.Set("fill", true) m.Query.Model.Set("fillInterval", interval.Seconds()) - if strings.Trim(args[2], " ") == "NULL" { + if args[2] == "NULL" { m.Query.Model.Set("fillNull", true) } else { floatVal, err := strconv.ParseFloat(args[2], 64) diff --git a/pkg/tsdb/postgres/macros_test.go b/pkg/tsdb/postgres/macros_test.go index cf89d6bdde3..b18acced963 100644 --- a/pkg/tsdb/postgres/macros_test.go +++ b/pkg/tsdb/postgres/macros_test.go @@ -49,6 +49,14 @@ func TestMacroEngine(t *testing.T) { So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time") }) + Convey("interpolate __timeGroup function with spaces between args", func() { + + sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column , '5m')") + So(err, ShouldBeNil) + + So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time") + }) + Convey("interpolate __timeTo function", func() { sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)") So(err, ShouldBeNil) From 4e826ff8f76e88fafbc79b568e64449c3402c6c8 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Fri, 2 Mar 2018 19:24:15 +0100 Subject: [PATCH 08/10] remove spaces around arguments of macros --- pkg/tsdb/mysql/macros.go | 10 +++++++--- pkg/tsdb/mysql/macros_test.go | 8 ++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/tsdb/mysql/macros.go b/pkg/tsdb/mysql/macros.go index d684fdc9d92..b0170070dcf 100644 --- a/pkg/tsdb/mysql/macros.go +++ b/pkg/tsdb/mysql/macros.go @@ -30,7 +30,11 @@ func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa var macroError error sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string { - res, err := m.evaluateMacro(groups[1], strings.Split(groups[2], ",")) + args := strings.Split(groups[2], ",") + for i, arg := range args { + args[i] = strings.Trim(arg, " ") + } + res, err := m.evaluateMacro(groups[1], args) if err != nil && macroError == nil { macroError = err return "macro_error()" @@ -82,14 +86,14 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er if len(args) < 2 { return "", fmt.Errorf("macro %v needs time column and interval", name) } - interval, err := time.ParseDuration(strings.Trim(args[1], `'" `)) + interval, err := time.ParseDuration(strings.Trim(args[1], `'"`)) if err != nil { return "", fmt.Errorf("error parsing interval %v", args[1]) } if len(args) == 3 { m.Query.Model.Set("fill", true) m.Query.Model.Set("fillInterval", interval.Seconds()) - if strings.Trim(args[2], " ") == "NULL" { + if args[2] == "NULL" { m.Query.Model.Set("fillNull", true) } else { floatVal, err := strconv.ParseFloat(args[2], 64) diff --git a/pkg/tsdb/mysql/macros_test.go b/pkg/tsdb/mysql/macros_test.go index 19c10d20a06..a89ba16ab78 100644 --- a/pkg/tsdb/mysql/macros_test.go +++ b/pkg/tsdb/mysql/macros_test.go @@ -49,6 +49,14 @@ func TestMacroEngine(t *testing.T) { So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)") }) + Convey("interpolate __timeGroup function with spaces around arguments", func() { + + sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column , '5m')") + So(err, ShouldBeNil) + + So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)") + }) + Convey("interpolate __timeTo function", func() { sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)") So(err, ShouldBeNil) From 328fc45bcc332b817934f205b752e9536ac7c23d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Sat, 3 Mar 2018 12:49:40 +0100 Subject: [PATCH 09/10] docs: removed beta notice in whats new article --- docs/sources/guides/whats-new-in-v5.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/sources/guides/whats-new-in-v5.md b/docs/sources/guides/whats-new-in-v5.md index fdc3c515a79..678f4cba22a 100644 --- a/docs/sources/guides/whats-new-in-v5.md +++ b/docs/sources/guides/whats-new-in-v5.md @@ -12,8 +12,6 @@ weight = -6 # What's New in Grafana v5.0 -> Out in beta: [Download now!](https://grafana.com/grafana/download/beta) - This is the most substantial update that Grafana has ever seen. This article will detail the major new features and enhancements. - [New Dashboard Layout Engine]({{< relref "#new-dashboard-layout-engine" >}}) enables a much easier drag, drop and resize experience and new types of layouts. From 69c93e6401b64e46368ec829d8cf867d4c0b74e9 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Sat, 3 Mar 2018 13:40:28 +0100 Subject: [PATCH 10/10] Fix Prometheus 2.0 stats (#11048) Fixes #11016 Signed-off-by: Julien Pivotto --- .../dashboards/prometheus_2_stats.json | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/public/app/plugins/datasource/prometheus/dashboards/prometheus_2_stats.json b/public/app/plugins/datasource/prometheus/dashboards/prometheus_2_stats.json index 6d6a1972d16..636575b7240 100644 --- a/public/app/plugins/datasource/prometheus/dashboards/prometheus_2_stats.json +++ b/public/app/plugins/datasource/prometheus/dashboards/prometheus_2_stats.json @@ -348,7 +348,7 @@ "tableColumn": "", "targets": [ { - "expr": "tsdb_wal_corruptions_total{job=\"prometheus\"}", + "expr": "prometheus_tsdb_wal_corruptions_total{job=\"prometheus\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "", @@ -1048,7 +1048,7 @@ "steppedLine": false, "targets": [ { - "expr": "max(prometheus_evaluator_duration_seconds{job=\"prometheus\", quantile!=\"0.01\", quantile!=\"0.05\"}) by (quantile)", + "expr": "max(prometheus_rule_group_duration_seconds{job=\"prometheus\"}) by (quantile)", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -1060,7 +1060,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Rule Eval Duration", + "title": "Rule Group Eval Duration", "tooltip": { "shared": true, "sort": 0, @@ -1124,7 +1124,7 @@ "steppedLine": false, "targets": [ { - "expr": "rate(prometheus_evaluator_iterations_missed_total{job=\"prometheus\"}[5m])", + "expr": "rate(prometheus_rule_group_iterations_missed_total{job=\"prometheus\"}[5m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "missed", @@ -1132,15 +1132,7 @@ "step": 10 }, { - "expr": "rate(prometheus_evaluator_iterations_skipped_total{job=\"prometheus\"}[5m])", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "skipped", - "refId": "C", - "step": 10 - }, - { - "expr": "rate(prometheus_evaluator_iterations_total{job=\"prometheus\"}[5m])", + "expr": "rate(prometheus_rule_group_iterations_total{job=\"prometheus\"}[5m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "iterations", @@ -1151,7 +1143,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Rule Eval Activity", + "title": "Rule Group Eval Activity", "tooltip": { "shared": true, "sort": 0,