mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
feat(influxdb): support multi row results
This commit is contained in:
@@ -18,8 +18,9 @@ import (
|
||||
|
||||
type InfluxDBExecutor struct {
|
||||
*tsdb.DataSourceInfo
|
||||
QueryParser *InfluxdbQueryParser
|
||||
QueryBuilder *QueryBuilder
|
||||
QueryParser *InfluxdbQueryParser
|
||||
QueryBuilder *QueryBuilder
|
||||
ResponseParser *ResponseParser
|
||||
}
|
||||
|
||||
func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
|
||||
@@ -27,6 +28,7 @@ func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
|
||||
DataSourceInfo: dsInfo,
|
||||
QueryParser: &InfluxdbQueryParser{},
|
||||
QueryBuilder: &QueryBuilder{},
|
||||
ResponseParser: &ResponseParser{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,7 +126,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
|
||||
}
|
||||
|
||||
result.QueryResults = make(map[string]*tsdb.QueryResult)
|
||||
result.QueryResults["A"] = ParseQueryResult(&response)
|
||||
result.QueryResults["A"] = e.ResponseParser.Parse(&response)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -2,48 +2,79 @@ package influxdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"gopkg.in/guregu/null.v3"
|
||||
)
|
||||
|
||||
func ParseQueryResult(response *Response) *tsdb.QueryResult {
|
||||
type ResponseParser struct{}
|
||||
|
||||
func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult {
|
||||
queryRes := tsdb.NewQueryResult()
|
||||
|
||||
for _, v := range response.Results {
|
||||
for _, r := range v.Series {
|
||||
serie := tsdb.TimeSeries{Name: r.Name}
|
||||
var points tsdb.TimeSeriesPoints
|
||||
|
||||
for _, k := range r.Values {
|
||||
var value null.Float
|
||||
var err error
|
||||
num, ok := k[1].(json.Number)
|
||||
if !ok {
|
||||
value = null.FloatFromPtr(nil)
|
||||
} else {
|
||||
fvalue, err := num.Float64()
|
||||
if err == nil {
|
||||
value = null.FloatFrom(fvalue)
|
||||
}
|
||||
}
|
||||
|
||||
pos0, ok := k[0].(json.Number)
|
||||
timestamp, err := pos0.Float64()
|
||||
if err == nil && ok {
|
||||
points = append(points, tsdb.NewTimePoint(value, timestamp))
|
||||
} else {
|
||||
//glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64)
|
||||
}
|
||||
serie.Points = points
|
||||
}
|
||||
queryRes.Series = append(queryRes.Series, &serie)
|
||||
}
|
||||
for _, result := range response.Results {
|
||||
rp.parseResult(result.Series, queryRes)
|
||||
}
|
||||
|
||||
for _, v := range queryRes.Series {
|
||||
glog.Info("result", "name", v.Name, "points", v.Points)
|
||||
for _, serie := range queryRes.Series {
|
||||
glog.Debug("result", "name", serie.Name, "points", serie.Points)
|
||||
}
|
||||
|
||||
return queryRes
|
||||
}
|
||||
|
||||
func (rp *ResponseParser) parseResult(result []Row, queryResult *tsdb.QueryResult) {
|
||||
for _, r := range result {
|
||||
for columnIndex, column := range r.Columns {
|
||||
if column == "time" {
|
||||
continue
|
||||
}
|
||||
|
||||
var points tsdb.TimeSeriesPoints
|
||||
for _, k := range r.Values {
|
||||
points = append(points, rp.parseTimepoint(k, columnIndex))
|
||||
}
|
||||
|
||||
queryResult.Series = append(queryResult.Series, &tsdb.TimeSeries{
|
||||
Name: rp.formatName(r, column),
|
||||
Points: points,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *ResponseParser) formatName(row Row, column string) string {
|
||||
return fmt.Sprintf("%s.%s", row.Name, column)
|
||||
}
|
||||
|
||||
func (rp *ResponseParser) parseTimepoint(k []interface{}, valuePosition int) tsdb.TimePoint {
|
||||
var value null.Float = rp.parseValue(k[valuePosition])
|
||||
|
||||
timestampNumber, _ := k[0].(json.Number)
|
||||
timestamp, err := timestampNumber.Float64()
|
||||
if err != nil {
|
||||
glog.Error("Invalid timestamp format. This should never happen!")
|
||||
}
|
||||
|
||||
return tsdb.NewTimePoint(value, timestamp)
|
||||
}
|
||||
|
||||
func (rp *ResponseParser) parseValue(value interface{}) null.Float {
|
||||
num, ok := value.(json.Number)
|
||||
if !ok {
|
||||
return null.FloatFromPtr(nil)
|
||||
}
|
||||
|
||||
fvalue, err := num.Float64()
|
||||
if err == nil {
|
||||
return null.FloatFrom(fvalue)
|
||||
}
|
||||
|
||||
ivalue, err := num.Int64()
|
||||
if err == nil {
|
||||
return null.FloatFrom(float64(ivalue))
|
||||
}
|
||||
|
||||
return null.FloatFromPtr(nil)
|
||||
}
|
||||
|
||||
@@ -4,16 +4,13 @@ import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func TestInfluxdbResponseParser(t *testing.T) {
|
||||
Convey("Influxdb response parser", t, func() {
|
||||
|
||||
setting.NewConfigContext(&setting.CommandLineArgs{
|
||||
HomePath: "../../../",
|
||||
})
|
||||
parser := &ResponseParser{}
|
||||
|
||||
response := &Response{
|
||||
Results: []Result{
|
||||
@@ -22,17 +19,11 @@ func TestInfluxdbResponseParser(t *testing.T) {
|
||||
{
|
||||
Name: "cpu",
|
||||
Columns: []string{"time", "mean", "sum"},
|
||||
Tags: map[string]string{"datacenter": "America"},
|
||||
Values: [][]interface{}{
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("123"), json.Number("123"), json.Number("123")},
|
||||
{json.Number("111"), json.Number("222"), json.Number("333")},
|
||||
{json.Number("111"), json.Number("222"), json.Number("333")},
|
||||
{json.Number("111"), json.Number("null"), json.Number("333")},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -40,10 +31,29 @@ func TestInfluxdbResponseParser(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
Convey("can parse response", func() {
|
||||
result := ParseQueryResult(response)
|
||||
So(len(result.Series), ShouldEqual, 1)
|
||||
So(len(result.Series[0].Points), ShouldEqual, 10)
|
||||
result := parser.Parse(response)
|
||||
|
||||
Convey("can parse all series", func() {
|
||||
So(len(result.Series), ShouldEqual, 2)
|
||||
})
|
||||
|
||||
Convey("can parse all points", func() {
|
||||
So(len(result.Series[0].Points), ShouldEqual, 3)
|
||||
So(len(result.Series[1].Points), ShouldEqual, 3)
|
||||
})
|
||||
|
||||
Convey("can parse multi row result", func() {
|
||||
So(result.Series[0].Points[1][0].Float64, ShouldEqual, float64(222))
|
||||
So(result.Series[1].Points[1][0].Float64, ShouldEqual, float64(333))
|
||||
})
|
||||
|
||||
Convey("can parse null points", func() {
|
||||
So(result.Series[0].Points[2][0].Valid, ShouldBeFalse)
|
||||
})
|
||||
|
||||
Convey("can format serie names", func() {
|
||||
So(result.Series[0].Name, ShouldEqual, "cpu.mean")
|
||||
So(result.Series[1].Name, ShouldEqual, "cpu.sum")
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user