mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Merge pull request #10138 from svenklemm/macroengine-interpolate
postgres+mysql: add optional fill parameter to $__timeGroup macro to fill in missing values in series
This commit is contained in:
commit
7ce63169a0
@ -3,6 +3,7 @@ package mysql
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -15,19 +16,25 @@ const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
|
||||
|
||||
type MySqlMacroEngine struct {
|
||||
TimeRange *tsdb.TimeRange
|
||||
Query *tsdb.Query
|
||||
}
|
||||
|
||||
func NewMysqlMacroEngine() tsdb.SqlMacroEngine {
|
||||
return &MySqlMacroEngine{}
|
||||
}
|
||||
|
||||
func (m *MySqlMacroEngine) Interpolate(timeRange *tsdb.TimeRange, sql string) (string, error) {
|
||||
func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
|
||||
m.TimeRange = timeRange
|
||||
m.Query = query
|
||||
rExp, _ := regexp.Compile(sExpr)
|
||||
var macroError error
|
||||
|
||||
sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
|
||||
res, err := m.evaluateMacro(groups[1], strings.Split(groups[2], ","))
|
||||
args := strings.Split(groups[2], ",")
|
||||
for i, arg := range args {
|
||||
args[i] = strings.Trim(arg, " ")
|
||||
}
|
||||
res, err := m.evaluateMacro(groups[1], args)
|
||||
if err != nil && macroError == nil {
|
||||
macroError = err
|
||||
return "macro_error()"
|
||||
@ -76,13 +83,26 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er
|
||||
case "__timeTo":
|
||||
return fmt.Sprintf("FROM_UNIXTIME(%d)", uint64(m.TimeRange.GetToAsMsEpoch()/1000)), nil
|
||||
case "__timeGroup":
|
||||
if len(args) != 2 {
|
||||
if len(args) < 2 {
|
||||
return "", fmt.Errorf("macro %v needs time column and interval", name)
|
||||
}
|
||||
interval, err := time.ParseDuration(strings.Trim(args[1], `'" `))
|
||||
interval, err := time.ParseDuration(strings.Trim(args[1], `'"`))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error parsing interval %v", args[1])
|
||||
}
|
||||
if len(args) == 3 {
|
||||
m.Query.Model.Set("fill", true)
|
||||
m.Query.Model.Set("fillInterval", interval.Seconds())
|
||||
if args[2] == "NULL" {
|
||||
m.Query.Model.Set("fillNull", true)
|
||||
} else {
|
||||
floatVal, err := strconv.ParseFloat(args[2], 64)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error parsing fill value %v", args[2])
|
||||
}
|
||||
m.Query.Model.Set("fillValue", floatVal)
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("cast(cast(UNIX_TIMESTAMP(%s)/(%.0f) as signed)*%.0f as signed)", args[0], interval.Seconds(), interval.Seconds()), nil
|
||||
case "__unixEpochFilter":
|
||||
if len(args) == 0 {
|
||||
|
@ -10,31 +10,32 @@ import (
|
||||
func TestMacroEngine(t *testing.T) {
|
||||
Convey("MacroEngine", t, func() {
|
||||
engine := &MySqlMacroEngine{}
|
||||
query := &tsdb.Query{}
|
||||
timeRange := &tsdb.TimeRange{From: "5m", To: "now"}
|
||||
|
||||
Convey("interpolate __time function", func() {
|
||||
sql, err := engine.Interpolate(nil, "select $__time(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__time(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select UNIX_TIMESTAMP(time_column) as time_sec")
|
||||
})
|
||||
|
||||
Convey("interpolate __time function wrapped in aggregation", func() {
|
||||
sql, err := engine.Interpolate(nil, "select min($__time(time_column))")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select min($__time(time_column))")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select min(UNIX_TIMESTAMP(time_column) as time_sec)")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeFilter function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "WHERE $__timeFilter(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "WHERE $__timeFilter(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "WHERE time_column >= FROM_UNIXTIME(18446744066914186738) AND time_column <= FROM_UNIXTIME(18446744066914187038)")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeFrom function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__timeFrom(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__timeFrom(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select FROM_UNIXTIME(18446744066914186738)")
|
||||
@ -42,35 +43,43 @@ func TestMacroEngine(t *testing.T) {
|
||||
|
||||
Convey("interpolate __timeGroup function", func() {
|
||||
|
||||
sql, err := engine.Interpolate(timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
|
||||
sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeGroup function with spaces around arguments", func() {
|
||||
|
||||
sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column , '5m')")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeTo function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__timeTo(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select FROM_UNIXTIME(18446744066914187038)")
|
||||
})
|
||||
|
||||
Convey("interpolate __unixEpochFilter function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__unixEpochFilter(18446744066914186738)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFilter(18446744066914186738)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select 18446744066914186738 >= 18446744066914186738 AND 18446744066914186738 <= 18446744066914187038")
|
||||
})
|
||||
|
||||
Convey("interpolate __unixEpochFrom function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__unixEpochFrom()")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFrom()")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select 18446744066914186738")
|
||||
})
|
||||
|
||||
Convey("interpolate __unixEpochTo function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__unixEpochTo()")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochTo()")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select 18446744066914187038")
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -56,7 +57,7 @@ func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSourc
|
||||
return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
|
||||
}
|
||||
|
||||
func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
|
||||
func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
|
||||
columnNames, err := rows.Columns()
|
||||
columnCount := len(columnNames)
|
||||
|
||||
@ -175,7 +176,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
|
||||
func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
|
||||
pointsBySeries := make(map[string]*tsdb.TimeSeries)
|
||||
seriesByQueryOrder := list.New()
|
||||
|
||||
@ -188,6 +189,18 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.
|
||||
rowLimit := 1000000
|
||||
rowCount := 0
|
||||
|
||||
fillMissing := query.Model.Get("fill").MustBool(false)
|
||||
var fillInterval float64
|
||||
fillValue := null.Float{}
|
||||
if fillMissing {
|
||||
fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
|
||||
if query.Model.Get("fillNull").MustBool(false) == false {
|
||||
fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
|
||||
fillValue.Valid = true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for ; rows.Next(); rowCount++ {
|
||||
if rowCount > rowLimit {
|
||||
return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
|
||||
@ -207,19 +220,50 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.
|
||||
return fmt.Errorf("Found row with no time value")
|
||||
}
|
||||
|
||||
if series, exist := pointsBySeries[rowData.metric]; exist {
|
||||
series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
|
||||
} else {
|
||||
series := &tsdb.TimeSeries{Name: rowData.metric}
|
||||
series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
|
||||
series, exist := pointsBySeries[rowData.metric]
|
||||
if exist == false {
|
||||
series = &tsdb.TimeSeries{Name: rowData.metric}
|
||||
pointsBySeries[rowData.metric] = series
|
||||
seriesByQueryOrder.PushBack(rowData.metric)
|
||||
}
|
||||
|
||||
if fillMissing {
|
||||
var intervalStart float64
|
||||
if exist == false {
|
||||
intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
|
||||
} else {
|
||||
intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
|
||||
}
|
||||
|
||||
// align interval start
|
||||
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
|
||||
|
||||
for i := intervalStart; i < rowData.time.Float64; i += fillInterval {
|
||||
series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
|
||||
rowCount++
|
||||
}
|
||||
}
|
||||
|
||||
series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
|
||||
}
|
||||
|
||||
for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
|
||||
key := elem.Value.(string)
|
||||
result.Series = append(result.Series, pointsBySeries[key])
|
||||
|
||||
if fillMissing {
|
||||
series := pointsBySeries[key]
|
||||
// fill in values from last fetched value till interval end
|
||||
intervalStart := series.Points[len(series.Points)-1][1].Float64
|
||||
intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
|
||||
|
||||
// align interval start
|
||||
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
|
||||
for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
|
||||
series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
|
||||
rowCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result.Meta.Set("rowCount", rowCount)
|
||||
|
@ -3,6 +3,7 @@ package postgres
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -15,19 +16,25 @@ const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
|
||||
|
||||
type PostgresMacroEngine struct {
|
||||
TimeRange *tsdb.TimeRange
|
||||
Query *tsdb.Query
|
||||
}
|
||||
|
||||
func NewPostgresMacroEngine() tsdb.SqlMacroEngine {
|
||||
return &PostgresMacroEngine{}
|
||||
}
|
||||
|
||||
func (m *PostgresMacroEngine) Interpolate(timeRange *tsdb.TimeRange, sql string) (string, error) {
|
||||
func (m *PostgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
|
||||
m.TimeRange = timeRange
|
||||
m.Query = query
|
||||
rExp, _ := regexp.Compile(sExpr)
|
||||
var macroError error
|
||||
|
||||
sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
|
||||
res, err := m.evaluateMacro(groups[1], strings.Split(groups[2], ","))
|
||||
args := strings.Split(groups[2], ",")
|
||||
for i, arg := range args {
|
||||
args[i] = strings.Trim(arg, " ")
|
||||
}
|
||||
res, err := m.evaluateMacro(groups[1], args)
|
||||
if err != nil && macroError == nil {
|
||||
macroError = err
|
||||
return "macro_error()"
|
||||
@ -82,13 +89,26 @@ func (m *PostgresMacroEngine) evaluateMacro(name string, args []string) (string,
|
||||
case "__timeTo":
|
||||
return fmt.Sprintf("to_timestamp(%d)", uint64(m.TimeRange.GetToAsMsEpoch()/1000)), nil
|
||||
case "__timeGroup":
|
||||
if len(args) != 2 {
|
||||
return "", fmt.Errorf("macro %v needs time column and interval", name)
|
||||
if len(args) < 2 {
|
||||
return "", fmt.Errorf("macro %v needs time column and interval and optional fill value", name)
|
||||
}
|
||||
interval, err := time.ParseDuration(strings.Trim(args[1], `' `))
|
||||
interval, err := time.ParseDuration(strings.Trim(args[1], `'`))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error parsing interval %v", args[1])
|
||||
}
|
||||
if len(args) == 3 {
|
||||
m.Query.Model.Set("fill", true)
|
||||
m.Query.Model.Set("fillInterval", interval.Seconds())
|
||||
if args[2] == "NULL" {
|
||||
m.Query.Model.Set("fillNull", true)
|
||||
} else {
|
||||
floatVal, err := strconv.ParseFloat(args[2], 64)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error parsing fill value %v", args[2])
|
||||
}
|
||||
m.Query.Model.Set("fillValue", floatVal)
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("(extract(epoch from %s)/%v)::bigint*%v AS time", args[0], interval.Seconds(), interval.Seconds()), nil
|
||||
case "__unixEpochFilter":
|
||||
if len(args) == 0 {
|
||||
|
@ -10,31 +10,32 @@ import (
|
||||
func TestMacroEngine(t *testing.T) {
|
||||
Convey("MacroEngine", t, func() {
|
||||
engine := &PostgresMacroEngine{}
|
||||
query := &tsdb.Query{}
|
||||
timeRange := &tsdb.TimeRange{From: "5m", To: "now"}
|
||||
|
||||
Convey("interpolate __time function", func() {
|
||||
sql, err := engine.Interpolate(nil, "select $__time(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__time(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select time_column AS \"time\"")
|
||||
})
|
||||
|
||||
Convey("interpolate __time function wrapped in aggregation", func() {
|
||||
sql, err := engine.Interpolate(nil, "select min($__time(time_column))")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select min($__time(time_column))")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select min(time_column AS \"time\")")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeFilter function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "WHERE $__timeFilter(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "WHERE $__timeFilter(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "WHERE extract(epoch from time_column) BETWEEN 18446744066914186738 AND 18446744066914187038")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeFrom function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__timeFrom(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__timeFrom(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select to_timestamp(18446744066914186738)")
|
||||
@ -42,35 +43,43 @@ func TestMacroEngine(t *testing.T) {
|
||||
|
||||
Convey("interpolate __timeGroup function", func() {
|
||||
|
||||
sql, err := engine.Interpolate(timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
|
||||
sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeGroup function with spaces between args", func() {
|
||||
|
||||
sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column , '5m')")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time")
|
||||
})
|
||||
|
||||
Convey("interpolate __timeTo function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__timeTo(time_column)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select to_timestamp(18446744066914187038)")
|
||||
})
|
||||
|
||||
Convey("interpolate __unixEpochFilter function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__unixEpochFilter(18446744066914186738)")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFilter(18446744066914186738)")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select 18446744066914186738 >= 18446744066914186738 AND 18446744066914186738 <= 18446744066914187038")
|
||||
})
|
||||
|
||||
Convey("interpolate __unixEpochFrom function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__unixEpochFrom()")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFrom()")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select 18446744066914186738")
|
||||
})
|
||||
|
||||
Convey("interpolate __unixEpochTo function", func() {
|
||||
sql, err := engine.Interpolate(timeRange, "select $__unixEpochTo()")
|
||||
sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochTo()")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(sql, ShouldEqual, "select 18446744066914187038")
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -60,7 +61,7 @@ func (e *PostgresQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSo
|
||||
return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
|
||||
}
|
||||
|
||||
func (e PostgresQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
|
||||
func (e PostgresQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
|
||||
|
||||
columnNames, err := rows.Columns()
|
||||
if err != nil {
|
||||
@ -157,7 +158,7 @@ func (e PostgresQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues,
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
|
||||
func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
|
||||
pointsBySeries := make(map[string]*tsdb.TimeSeries)
|
||||
seriesByQueryOrder := list.New()
|
||||
|
||||
@ -198,6 +199,18 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
|
||||
return fmt.Errorf("Found no column named time")
|
||||
}
|
||||
|
||||
fillMissing := query.Model.Get("fill").MustBool(false)
|
||||
var fillInterval float64
|
||||
fillValue := null.Float{}
|
||||
if fillMissing {
|
||||
fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
|
||||
if query.Model.Get("fillNull").MustBool(false) == false {
|
||||
fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
|
||||
fillValue.Valid = true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var timestamp float64
|
||||
var value null.Float
|
||||
@ -249,7 +262,34 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
|
||||
if metricIndex == -1 {
|
||||
metric = col
|
||||
}
|
||||
e.appendTimePoint(pointsBySeries, seriesByQueryOrder, metric, timestamp, value)
|
||||
|
||||
series, exist := pointsBySeries[metric]
|
||||
if exist == false {
|
||||
series = &tsdb.TimeSeries{Name: metric}
|
||||
pointsBySeries[metric] = series
|
||||
seriesByQueryOrder.PushBack(metric)
|
||||
}
|
||||
|
||||
if fillMissing {
|
||||
var intervalStart float64
|
||||
if exist == false {
|
||||
intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
|
||||
} else {
|
||||
intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
|
||||
}
|
||||
|
||||
// align interval start
|
||||
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
|
||||
|
||||
for i := intervalStart; i < timestamp; i += fillInterval {
|
||||
series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
|
||||
rowCount++
|
||||
}
|
||||
}
|
||||
|
||||
series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
|
||||
|
||||
e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
|
||||
rowCount++
|
||||
|
||||
}
|
||||
@ -258,20 +298,22 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
|
||||
for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
|
||||
key := elem.Value.(string)
|
||||
result.Series = append(result.Series, pointsBySeries[key])
|
||||
|
||||
if fillMissing {
|
||||
series := pointsBySeries[key]
|
||||
// fill in values from last fetched value till interval end
|
||||
intervalStart := series.Points[len(series.Points)-1][1].Float64
|
||||
intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
|
||||
|
||||
// align interval start
|
||||
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
|
||||
for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
|
||||
series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
|
||||
rowCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result.Meta.Set("rowCount", rowCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e PostgresQueryEndpoint) appendTimePoint(pointsBySeries map[string]*tsdb.TimeSeries, seriesByQueryOrder *list.List, metric string, timestamp float64, value null.Float) {
|
||||
if series, exist := pointsBySeries[metric]; exist {
|
||||
series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
|
||||
} else {
|
||||
series := &tsdb.TimeSeries{Name: metric}
|
||||
series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
|
||||
pointsBySeries[metric] = series
|
||||
seriesByQueryOrder.PushBack(metric)
|
||||
}
|
||||
e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
|
||||
}
|
||||
|
@ -17,15 +17,15 @@ type SqlEngine interface {
|
||||
ctx context.Context,
|
||||
ds *models.DataSource,
|
||||
query *TsdbQuery,
|
||||
transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
|
||||
transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
|
||||
transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
|
||||
transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
|
||||
) (*Response, error)
|
||||
}
|
||||
|
||||
// SqlMacroEngine interpolates macros into sql. It takes in the timeRange to be able to
|
||||
// generate queries that use from and to.
|
||||
// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
|
||||
// timeRange to be able to generate queries that use from and to.
|
||||
type SqlMacroEngine interface {
|
||||
Interpolate(timeRange *TimeRange, sql string) (string, error)
|
||||
Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
|
||||
}
|
||||
|
||||
type DefaultSqlEngine struct {
|
||||
@ -77,8 +77,8 @@ func (e *DefaultSqlEngine) Query(
|
||||
ctx context.Context,
|
||||
dsInfo *models.DataSource,
|
||||
tsdbQuery *TsdbQuery,
|
||||
transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
|
||||
transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
|
||||
transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
|
||||
transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
|
||||
) (*Response, error) {
|
||||
result := &Response{
|
||||
Results: make(map[string]*QueryResult),
|
||||
@ -97,7 +97,7 @@ func (e *DefaultSqlEngine) Query(
|
||||
queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
|
||||
result.Results[query.RefId] = queryResult
|
||||
|
||||
rawSql, err := e.MacroEngine.Interpolate(tsdbQuery.TimeRange, rawSql)
|
||||
rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
continue
|
||||
@ -117,13 +117,13 @@ func (e *DefaultSqlEngine) Query(
|
||||
|
||||
switch format {
|
||||
case "time_series":
|
||||
err := transformToTimeSeries(query, rows, queryResult)
|
||||
err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
continue
|
||||
}
|
||||
case "table":
|
||||
err := transformToTable(query, rows, queryResult)
|
||||
err := transformToTable(query, rows, queryResult, tsdbQuery)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
continue
|
||||
|
Loading…
Reference in New Issue
Block a user