From ab8751767c93f7ee82d1de84f1b02a16da06c905 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 12:51:45 +0200 Subject: [PATCH] feat(influxdb): send request and parse response --- pkg/tsdb/influxdb/influxdb.go | 145 ++++++++++++++++++++++-- pkg/tsdb/influxdb/models.go | 2 + pkg/tsdb/influxdb/query_builder.go | 11 +- pkg/tsdb/influxdb/query_builder_test.go | 15 ++- pkg/tsdb/influxdb/query_part.go | 6 + pkg/tsdb/influxdb/query_part_test.go | 8 ++ pkg/tsdb/models.go | 6 +- pkg/tsdb/prometheus/prometheus.go | 4 +- pkg/tsdb/testdata/scenarios.go | 8 +- 9 files changed, 182 insertions(+), 23 deletions(-) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 8f45e8d2522..566b807bfcf 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -3,9 +3,17 @@ package influxdb import ( "context" "crypto/tls" + "encoding/json" + "fmt" "net/http" + "net/url" + "path" "time" + "gopkg.in/guregu/null.v3" + + "golang.org/x/net/context/ctxhttp" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" ) @@ -43,22 +51,141 @@ func init() { } } -func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { - result := &tsdb.BatchResult{} +func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) { for _, v := range queries { - query, err := e.QueryParser.Parse(v.Model) if err != nil { - result.Error = err - return result + return "", err } - glog.Info("Influxdb executor", "query", query) + rawQuery, err := e.QueryBuilder.Build(query, context) + if err != nil { + return "", err + } - rawQuery, err := e.QueryBuilder.Build(query) - - glog.Info("Influxdb", "error", err, "rawQuery", rawQuery) + return rawQuery, nil } + return "", fmt.Errorf("Tsdb request contains no queries") +} + +func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { + result := &tsdb.BatchResult{} + + query, err := e.getQuery(queries, context) + if err != nil { + result.Error = err + return result + } + + glog.Info("Influxdb", "query", query) + + u, _ := url.Parse(e.Url) + u.Path = path.Join(u.Path, "query") + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + result.Error = err + return result + } + + params := req.URL.Query() + params.Set("q", query) + params.Set("db", e.Database) + params.Set("epoch", "s") + + req.URL.RawQuery = params.Encode() + + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", "Grafana") + if e.BasicAuth { + req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) + } + + glog.Info("influxdb request", "url", req.URL.String()) + resp, err := ctxhttp.Do(ctx, HttpClient, req) + if err != nil { + result.Error = err + return result + } + + if resp.StatusCode/100 != 2 { + result.Error = fmt.Errorf("Influxdb returned statuscode %v body %v", resp.Status) + return result + } + + var response Response + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + err = dec.Decode(&response) + if err != nil { + glog.Error("Influxdb decode failed", "err", err) + result.Error = err + return result + } + + result.QueryResults = make(map[string]*tsdb.QueryResult) + queryRes := tsdb.NewQueryResult() + + for _, v := range response.Results { + for _, r := range v.Series { + serie := tsdb.TimeSeries{Name: r.Name} + var points tsdb.TimeSeriesPoints + + for _, k := range r.Values { + var value null.Float + var err error + num, ok := k[1].(json.Number) + if !ok { + value = null.FloatFromPtr(nil) + } else { + fvalue, err := num.Float64() + if err == nil { + value = null.FloatFrom(fvalue) + } + } + + pos0, ok := k[0].(json.Number) + timestamp, err := pos0.Float64() + if err == nil && ok { + points = append(points, tsdb.NewTimePoint(value, timestamp)) + } else { + glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64) + } + serie.Points = points + } + queryRes.Series = append(queryRes.Series, &serie) + } + } + + for _, v := range queryRes.Series { + glog.Info("result", "name", v.Name, "points", v.Points) + } + + result.QueryResults["A"] = queryRes + return result } + +type Response struct { + Results []Result + Err error +} + +type Result struct { + Series []Row + Messages []*Message + Err error +} + +type Message struct { + Level string `json:"level,omitempty"` + Text string `json:"text,omitempty"` +} + +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` +} diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index 4895c9c393b..3de80154ee2 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -7,6 +7,8 @@ type Query struct { Tags []*Tag GroupBy []*QueryPart Selects []*Select + + Interval string } type Tag struct { diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index d82950ba4bf..22863ce78bf 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -3,6 +3,8 @@ package influxdb import ( "fmt" "strings" + + "github.com/grafana/grafana/pkg/tsdb" ) type QueryBuild struct{} @@ -27,7 +29,7 @@ func renderTags(query *Query) []string { return res } -func (*QueryBuild) Build(query *Query) (string, error) { +func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { res := "SELECT " var selectors []string @@ -42,7 +44,9 @@ func (*QueryBuild) Build(query *Query) (string, error) { res += strings.Join(selectors, ", ") policy := "" - if query.Policy != "" { + if query.Policy == "" || query.Policy == "default" { + policy = "" + } else { policy = `"` + query.Policy + `".` } res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement) @@ -54,7 +58,8 @@ func (*QueryBuild) Build(query *Query) (string, error) { res += " AND " } - res += "$timeFilter" + //res += "$timeFilter" + res += "time > " + strings.Replace(queryContext.TimeRange.From, "now", "now()", 1) var groupBy []string for _, group := range query.GroupBy { diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index ce0c2046c6a..f464117bd3e 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -3,6 +3,7 @@ package influxdb import ( "testing" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" ) @@ -19,17 +20,22 @@ func TestInfluxdbQueryBuilder(t *testing.T) { tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="} tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"} + queryContext := &tsdb.QueryContext{ + TimeRange: tsdb.NewTimeRange("now-5h", "now"), + } + Convey("can build query", func() { query := &Query{ Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", Policy: "policy", GroupBy: []*QueryPart{groupBy1, groupBy2}, + Interval: "10s", } - rawQuery, err := builder.Build(query) + rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now()-5h GROUP BY time(10s) fill(null)`) }) Convey("can asd query", func() { @@ -38,11 +44,12 @@ func TestInfluxdbQueryBuilder(t *testing.T) { Measurement: "cpu", GroupBy: []*QueryPart{groupBy1}, Tags: []*Tag{tag1, tag2}, + Interval: "5s", } - rawQuery, err := builder.Build(query) + rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND $timeFilter GROUP BY time($interval)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now()-5h GROUP BY time(10s)`) }) }) } diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index 32ba6ba4d4a..ba14bcac665 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -91,6 +91,12 @@ func fieldRenderer(part *QueryPart, innerExpr string) string { } func functionRenderer(part *QueryPart, innerExpr string) string { + for i, v := range part.Params { + if v == "$interval" { + part.Params[i] = "10s" + } + } + if innerExpr != "" { part.Params = append([]string{innerExpr}, part.Params...) } diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go index b1c24b7070f..2160d59b851 100644 --- a/pkg/tsdb/influxdb/query_part_test.go +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -33,6 +33,14 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, "bottom(value, 3)") }) + Convey("time", func() { + part, err := NewQueryPart("time", []string{"$interval"}) + So(err, ShouldBeNil) + + res := part.Render("") + So(res, ShouldEqual, "time(10s)") + }) + Convey("should nest spread function", func() { part, err := NewQueryPart("spread", []string{}) So(err, ShouldBeNil) diff --git a/pkg/tsdb/models.go b/pkg/tsdb/models.go index 918f0ad65eb..7ba25f9674c 100644 --- a/pkg/tsdb/models.go +++ b/pkg/tsdb/models.go @@ -73,15 +73,15 @@ func NewQueryResult() *QueryResult { } } -func NewTimePoint(value float64, timestamp float64) TimePoint { - return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)} +func NewTimePoint(value null.Float, timestamp float64) TimePoint { + return TimePoint{value, null.FloatFrom(timestamp)} } func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints { points := make(TimeSeriesPoints, 0) for i := 0; i < len(values); i += 2 { - points = append(points, NewTimePoint(values[i], values[i+1])) + points = append(points, NewTimePoint(null.FloatFrom(values[i]), values[i+1])) } return points diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go index 98d4c72fd03..c95f4b49467 100644 --- a/pkg/tsdb/prometheus/prometheus.go +++ b/pkg/tsdb/prometheus/prometheus.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "gopkg.in/guregu/null.v3" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" "github.com/prometheus/client_golang/api/prometheus" @@ -145,7 +147,7 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb } for _, k := range v.Values { - series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000))) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000))) } queryRes.Series = append(queryRes.Series, &series) diff --git a/pkg/tsdb/testdata/scenarios.go b/pkg/tsdb/testdata/scenarios.go index e90b0d4df79..29a0b5a2ada 100644 --- a/pkg/tsdb/testdata/scenarios.go +++ b/pkg/tsdb/testdata/scenarios.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "gopkg.in/guregu/null.v3" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" ) @@ -42,7 +44,7 @@ func init() { walker := rand.Float64() * 100 for i := int64(0); i < 10000 && timeWalkerMs < to; i++ { - points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs))) + points = append(points, tsdb.NewTimePoint(null.FloatFrom(walker), float64(timeWalkerMs))) walker += rand.Float64() - 0.5 timeWalkerMs += query.IntervalMs @@ -73,7 +75,7 @@ func init() { series := newSeriesForQuery(query) outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000 - series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime))) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(10), float64(outsideTime))) queryRes.Series = append(queryRes.Series, series) return queryRes @@ -105,7 +107,7 @@ func init() { step := (endTime - startTime) / int64(len(values)-1) for _, val := range values { - series.Points = append(series.Points, tsdb.NewTimePoint(val, float64(startTime))) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(val), float64(startTime))) startTime += step }