From e556df5b499e9f3d2c66d8978b27f69e605baca0 Mon Sep 17 00:00:00 2001 From: bergquist Date: Tue, 4 Oct 2016 16:25:33 +0200 Subject: [PATCH 01/25] feat(tsdb): add draft implementation for influxdb --- pkg/cmd/grafana-server/main.go | 1 + pkg/tsdb/influxdb/influxdb.go | 42 +++++++++++++++++++ pkg/tsdb/influxdb/models.go | 1 + .../plugins/datasource/influxdb/plugin.json | 1 + 4 files changed, 45 insertions(+) create mode 100644 pkg/tsdb/influxdb/influxdb.go create mode 100644 pkg/tsdb/influxdb/models.go diff --git a/pkg/cmd/grafana-server/main.go b/pkg/cmd/grafana-server/main.go index 6cd063f798f..caf9d2cb56f 100644 --- a/pkg/cmd/grafana-server/main.go +++ b/pkg/cmd/grafana-server/main.go @@ -20,6 +20,7 @@ import ( _ "github.com/grafana/grafana/pkg/services/alerting/conditions" _ "github.com/grafana/grafana/pkg/services/alerting/notifiers" _ "github.com/grafana/grafana/pkg/tsdb/graphite" + _ "github.com/grafana/grafana/pkg/tsdb/influxdb" _ "github.com/grafana/grafana/pkg/tsdb/prometheus" _ "github.com/grafana/grafana/pkg/tsdb/testdata" ) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go new file mode 100644 index 00000000000..010b18f4070 --- /dev/null +++ b/pkg/tsdb/influxdb/influxdb.go @@ -0,0 +1,42 @@ +package influxdb + +import ( + "context" + "crypto/tls" + "net/http" + "time" + + "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/tsdb" +) + +type InfluxDBExecutor struct { + *tsdb.DataSourceInfo +} + +func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { + return &InfluxDBExecutor{dsInfo} +} + +var ( + glog log.Logger + HttpClient *http.Client +) + +func init() { + glog = log.New("tsdb.influxdb") + tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor) + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + HttpClient = &http.Client{ + Timeout: time.Duration(15 * time.Second), + Transport: tr, + } +} + +func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { + panic("missing implementation") +} diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go new file mode 100644 index 00000000000..ab711e3b83a --- /dev/null +++ b/pkg/tsdb/influxdb/models.go @@ -0,0 +1 @@ +package influxdb diff --git a/public/app/plugins/datasource/influxdb/plugin.json b/public/app/plugins/datasource/influxdb/plugin.json index 605ce168831..635309d1610 100644 --- a/public/app/plugins/datasource/influxdb/plugin.json +++ b/public/app/plugins/datasource/influxdb/plugin.json @@ -6,6 +6,7 @@ "defaultMatchFormat": "regex values", "metrics": true, "annotations": true, + "alerting": true, "info": { "author": { From 946e0bf32e983972858fcf319038240e9e14b7b9 Mon Sep 17 00:00:00 2001 From: bergquist Date: Tue, 4 Oct 2016 21:28:05 +0200 Subject: [PATCH 02/25] feat(influxdb): parse query json --- pkg/tsdb/influxdb/influxdb.go | 14 ++++- pkg/tsdb/influxdb/models.go | 98 +++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 010b18f4070..5268a573ab0 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -38,5 +38,17 @@ func init() { } func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { - panic("missing implementation") + result := &tsdb.BatchResult{} + for _, v := range queries { + + query, err := ParseQuery(v.Model) + + if err != nil { + result.Error = err + return result + } + glog.Info("Influxdb executor", "query", query) + } + + return result } diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index ab711e3b83a..78f9ce2bb3e 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -1 +1,99 @@ package influxdb + +import "github.com/grafana/grafana/pkg/components/simplejson" + +type InfluxDBQuery struct { + Measurement string + Policy string + ResultFormat string + Tags []Tag + GroupBy []GroupBy +} + +type Tag struct { + Key string + Operator string + Value string +} + +type GroupBy struct { + Type string + Params []string +} + +type InfluxDbSelect struct { + Type string +} + +func ParseQuery(model *simplejson.Json) (*InfluxDBQuery, error) { + measurement, err := model.Get("measurement").String() + if err != nil { + return nil, err + } + + policy := model.Get("policy").MustString("default") + + resultFormat, err := model.Get("resultFormat").String() + if err != nil { + return nil, err + } + + var tags []Tag + var groupBy []GroupBy + + for _, g := range model.Get("groupBy").MustArray() { + group := simplejson.NewFromAny(g) + + typ, err := group.Get("type").String() + if err != nil { + return nil, err + } + + var params []string + for _, p := range group.Get("params").MustArray() { + param := simplejson.NewFromAny(p) + + pv, err := param.String() + if err != nil { + return nil, err + } + params = append(params, pv) + } + + gap := GroupBy{ + Type: typ, + Params: params, + } + + groupBy = append(groupBy, gap) + } + + for _, t := range model.Get("tags").MustArray() { + tag := simplejson.NewFromAny(t) + + key, err := tag.Get("key").String() + if err != nil { + return nil, err + } + + operator, err := tag.Get("operator").String() + if err != nil { + return nil, err + } + + value, err := tag.Get("value").String() + if err != nil { + return nil, err + } + + tags = append(tags, Tag{Key: key, Operator: operator, Value: value}) + } + + return &InfluxDBQuery{ + Measurement: measurement, + Policy: policy, + ResultFormat: resultFormat, + GroupBy: groupBy, + Tags: tags, + }, nil +} From d0e6a9559eef48d08a74881a9a4164b9e53d0274 Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 5 Oct 2016 10:56:34 +0200 Subject: [PATCH 03/25] feat(influxdb): add query part model --- pkg/tsdb/influxdb/influxdb.go | 8 +- pkg/tsdb/influxdb/models.go | 86 ++-------------- pkg/tsdb/influxdb/parser.go | 128 ++++++++++++++++++++++++ pkg/tsdb/influxdb/parser_test.go | 93 +++++++++++++++++ pkg/tsdb/influxdb/query_builder.go | 10 ++ pkg/tsdb/influxdb/query_builder_test.go | 57 +++++++++++ pkg/tsdb/influxdb/query_part.go | 5 + pkg/tsdb/influxdb/query_part_test.go | 63 ++++++++++++ 8 files changed, 369 insertions(+), 81 deletions(-) create mode 100644 pkg/tsdb/influxdb/parser.go create mode 100644 pkg/tsdb/influxdb/parser_test.go create mode 100644 pkg/tsdb/influxdb/query_builder.go create mode 100644 pkg/tsdb/influxdb/query_builder_test.go create mode 100644 pkg/tsdb/influxdb/query_part.go create mode 100644 pkg/tsdb/influxdb/query_part_test.go diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 5268a573ab0..b0775ebf9dd 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -12,10 +12,14 @@ import ( type InfluxDBExecutor struct { *tsdb.DataSourceInfo + QueryParser *InfluxdbQueryParser } func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { - return &InfluxDBExecutor{dsInfo} + return &InfluxDBExecutor{ + DataSourceInfo: dsInfo, + QueryParser: &InfluxdbQueryParser{}, + } } var ( @@ -41,7 +45,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, result := &tsdb.BatchResult{} for _, v := range queries { - query, err := ParseQuery(v.Model) + query, err := e.QueryParser.Parse(v.Model) if err != nil { result.Error = err diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index 78f9ce2bb3e..00a718e9b1e 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -1,13 +1,12 @@ package influxdb -import "github.com/grafana/grafana/pkg/components/simplejson" - -type InfluxDBQuery struct { +type Query struct { Measurement string Policy string ResultFormat string - Tags []Tag - GroupBy []GroupBy + Tags []*Tag + GroupBy []*QueryPart + Selects []*Select } type Tag struct { @@ -16,84 +15,13 @@ type Tag struct { Value string } -type GroupBy struct { +type QueryPart struct { Type string Params []string } +type Select []QueryPart + type InfluxDbSelect struct { Type string } - -func ParseQuery(model *simplejson.Json) (*InfluxDBQuery, error) { - measurement, err := model.Get("measurement").String() - if err != nil { - return nil, err - } - - policy := model.Get("policy").MustString("default") - - resultFormat, err := model.Get("resultFormat").String() - if err != nil { - return nil, err - } - - var tags []Tag - var groupBy []GroupBy - - for _, g := range model.Get("groupBy").MustArray() { - group := simplejson.NewFromAny(g) - - typ, err := group.Get("type").String() - if err != nil { - return nil, err - } - - var params []string - for _, p := range group.Get("params").MustArray() { - param := simplejson.NewFromAny(p) - - pv, err := param.String() - if err != nil { - return nil, err - } - params = append(params, pv) - } - - gap := GroupBy{ - Type: typ, - Params: params, - } - - groupBy = append(groupBy, gap) - } - - for _, t := range model.Get("tags").MustArray() { - tag := simplejson.NewFromAny(t) - - key, err := tag.Get("key").String() - if err != nil { - return nil, err - } - - operator, err := tag.Get("operator").String() - if err != nil { - return nil, err - } - - value, err := tag.Get("value").String() - if err != nil { - return nil, err - } - - tags = append(tags, Tag{Key: key, Operator: operator, Value: value}) - } - - return &InfluxDBQuery{ - Measurement: measurement, - Policy: policy, - ResultFormat: resultFormat, - GroupBy: groupBy, - Tags: tags, - }, nil -} diff --git a/pkg/tsdb/influxdb/parser.go b/pkg/tsdb/influxdb/parser.go new file mode 100644 index 00000000000..0e024ab4f27 --- /dev/null +++ b/pkg/tsdb/influxdb/parser.go @@ -0,0 +1,128 @@ +package influxdb + +import "github.com/grafana/grafana/pkg/components/simplejson" + +type InfluxdbQueryParser struct{} + +func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json) (*Query, error) { + policy := model.Get("policy").MustString("default") + + measurement, err := model.Get("measurement").String() + if err != nil { + return nil, err + } + + resultFormat, err := model.Get("resultFormat").String() + if err != nil { + return nil, err + } + + tags, err := qp.parseTags(model) + if err != nil { + return nil, err + } + + groupBys, err := qp.parseGroupBy(model) + if err != nil { + return nil, err + } + + selects, err := qp.parseSelects(model) + if err != nil { + return nil, err + } + + return &Query{ + Measurement: measurement, + Policy: policy, + ResultFormat: resultFormat, + GroupBy: groupBys, + Tags: tags, + Selects: selects, + }, nil +} + +func (qp *InfluxdbQueryParser) parseSelects(model *simplejson.Json) ([]*Select, error) { + var result []*Select + + for _, selectObj := range model.Get("select").MustArray() { + selectJson := simplejson.NewFromAny(selectObj) + var parts Select + + for _, partObj := range selectJson.MustArray() { + part := simplejson.NewFromAny(partObj) + queryPart, err := qp.parseQueryPart(part) + if err != nil { + return nil, err + } + + parts = append(parts, *queryPart) + } + + result = append(result, &parts) + } + + return result, nil +} + +func (*InfluxdbQueryParser) parseTags(model *simplejson.Json) ([]*Tag, error) { + var result []*Tag + for _, t := range model.Get("tags").MustArray() { + tagJson := simplejson.NewFromAny(t) + + key, err := tagJson.Get("key").String() + if err != nil { + return nil, err + } + + operator, err := tagJson.Get("operator").String() + if err != nil { + return nil, err + } + + value, err := tagJson.Get("value").String() + if err != nil { + return nil, err + } + + result = append(result, &Tag{Key: key, Operator: operator, Value: value}) + } + + return result, nil +} + +func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, error) { + typ, err := model.Get("type").String() + if err != nil { + return nil, err + } + + var params []string + for _, paramObj := range model.Get("params").MustArray() { + param := simplejson.NewFromAny(paramObj) + + pv, err := param.String() + if err != nil { + return nil, err + } + params = append(params, pv) + } + + return &QueryPart{Type: typ, Params: params}, nil +} + +func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) { + var result []*QueryPart + + for _, groupObj := range model.Get("groupBy").MustArray() { + groupJson := simplejson.NewFromAny(groupObj) + queryPart, err := qp.parseQueryPart(groupJson) + + if err != nil { + return nil, err + } + result = append(result, queryPart) + } + + return result, nil +} diff --git a/pkg/tsdb/influxdb/parser_test.go b/pkg/tsdb/influxdb/parser_test.go new file mode 100644 index 00000000000..cb7c27ec35f --- /dev/null +++ b/pkg/tsdb/influxdb/parser_test.go @@ -0,0 +1,93 @@ +package influxdb + +import ( + "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + . "github.com/smartystreets/goconvey/convey" +) + +func TestInfluxdbQueryParser(t *testing.T) { + Convey("Influxdb query parser", t, func() { + + parser := &InfluxdbQueryParser{} + + Convey("converting metric name", func() { + json := ` + { + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "$interval" + ], + "type": "time" + }, + { + "type": "tag", + "params": [ + "datacenter" + ] + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "logins.count", + "policy": "default", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [ + + ], + "type": "count" + } + ], + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [ + + ], + "type": "mean" + } + ] + ], + "tags": [ + { + "key": "datacenter", + "operator": "=", + "value": "America" + } + ] + } + ` + + modelJson, err := simplejson.NewJson([]byte(json)) + So(err, ShouldBeNil) + + res, err := parser.Parse(modelJson) + So(err, ShouldBeNil) + So(len(res.GroupBy), ShouldEqual, 3) + So(len(res.Selects), ShouldEqual, 2) + So(len(res.Tags), ShouldEqual, 1) + }) + }) +} diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go new file mode 100644 index 00000000000..86d4981133e --- /dev/null +++ b/pkg/tsdb/influxdb/query_builder.go @@ -0,0 +1,10 @@ +package influxdb + +import "fmt" + +type QueryBuild struct{} + +func (*QueryBuild) Build(query *Query) (string, error) { + + return "", fmt.Errorf("query is not valid") +} diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go new file mode 100644 index 00000000000..7ec80d389dd --- /dev/null +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -0,0 +1,57 @@ +package influxdb + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestInfluxdbQueryBuilder(t *testing.T) { + Convey("Influxdb query builder", t, func() { + + builder := &QueryBuild{} + + Convey("can build query", func() { + //query := &Query{} + //res, err := builder.Build(query) + //So(err, ShouldBeNil) + }) + + Convey("empty queries should return error", func() { + query := &Query{} + + res, err := builder.Build(query) + So(err, ShouldNotBeNil) + So(res, ShouldEqual, "") + }) + }) +} + +/* + describe('render series with mesurement only', function() { + it('should generate correct query', function() { + var query = new InfluxQuery({ + measurement: 'cpu', + }, templateSrv, {}); + + var queryText = query.render(); + expect(queryText).to.be('SELECT mean("value") FROM "cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)'); + }); + }); +*/ + +/* + describe('series with tags OR condition', function() { + it('should generate correct query', function() { + var query = new InfluxQuery({ + measurement: 'cpu', + groupBy: [{type: 'time', params: ['auto']}], + tags: [{key: 'hostname', value: 'server1'}, {key: 'hostname', value: 'server2', condition: "OR"}] + }, templateSrv, {}); + + var queryText = query.render(); + expect(queryText).to.be('SELECT mean("value") FROM "cpu" WHERE "hostname" = \'server1\' OR "hostname" = \'server2\' AND ' + + '$timeFilter GROUP BY time($interval)'); + }); + }); +*/ diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go new file mode 100644 index 00000000000..3d5a51d42c0 --- /dev/null +++ b/pkg/tsdb/influxdb/query_part.go @@ -0,0 +1,5 @@ +package influxdb + +type Selector interface { + Render(input string) string +} diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go new file mode 100644 index 00000000000..20f12cc3359 --- /dev/null +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -0,0 +1,63 @@ +package influxdb + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestInfluxdbQueryPart(t *testing.T) { + Convey("Influxdb query part builder", t, func() { + + Convey("can build query", func() { + }) + + Convey("empty queries should return error", func() { + + }) + }) +} + +/* + describe('series with mesurement only', () => { + it('should handle nested function parts', () => { + var part = queryPart.create({ + type: 'derivative', + params: ['10s'], + }); + + expect(part.text).to.be('derivative(10s)'); + expect(part.render('mean(value)')).to.be('derivative(mean(value), 10s)'); + }); + + it('should nest spread function', () => { + var part = queryPart.create({ + type: 'spread' + }); + + expect(part.text).to.be('spread()'); + expect(part.render('value')).to.be('spread(value)'); + }); + + it('should handle suffirx parts', () => { + var part = queryPart.create({ + type: 'math', + params: ['/ 100'], + }); + + expect(part.text).to.be('math(/ 100)'); + expect(part.render('mean(value)')).to.be('mean(value) / 100'); + }); + + it('should handle alias parts', () => { + var part = queryPart.create({ + type: 'alias', + params: ['test'], + }); + + expect(part.text).to.be('alias(test)'); + expect(part.render('mean(value)')).to.be('mean(value) AS "test"'); + }); + + }); +*/ From c7abd3ba4e296cc98a14c3302d4fa16c63872836 Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 5 Oct 2016 14:42:06 +0200 Subject: [PATCH 04/25] feat(influxdb): add querypart renderers --- pkg/tsdb/influxdb/models.go | 5 - pkg/tsdb/influxdb/parser_test.go | 138 +++++++++++++++------------ pkg/tsdb/influxdb/query_part.go | 123 +++++++++++++++++++++++- pkg/tsdb/influxdb/query_part_test.go | 91 +++++++++--------- 4 files changed, 242 insertions(+), 115 deletions(-) diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index 00a718e9b1e..a70fe7f5d47 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -15,11 +15,6 @@ type Tag struct { Value string } -type QueryPart struct { - Type string - Params []string -} - type Select []QueryPart type InfluxDbSelect struct { diff --git a/pkg/tsdb/influxdb/parser_test.go b/pkg/tsdb/influxdb/parser_test.go index cb7c27ec35f..19e2c228f1a 100644 --- a/pkg/tsdb/influxdb/parser_test.go +++ b/pkg/tsdb/influxdb/parser_test.go @@ -14,70 +14,84 @@ func TestInfluxdbQueryParser(t *testing.T) { Convey("converting metric name", func() { json := ` - { - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$interval" - ], - "type": "time" - }, - { - "type": "tag", - "params": [ - "datacenter" - ] - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "measurement": "logins.count", - "policy": "default", - "refId": "B", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" + { + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "$interval" + ], + "type": "time" + }, + { + "params": [ + "datacenter" + ], + "type": "tag" + }, + { + "params": [ + "null" + ], + "type": "fill" + } ], - "type": "field" - }, - { - "params": [ - + "measurement": "logins.count", + "policy": "default", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "type": "field", + "params": [ + "value" + ] + }, + { + "type": "count", + "params": [] + } + ], + [ + { + "type": "field", + "params": [ + "value" + ] + }, + { + "type": "mean", + "params": [] + } + ], + [ + { + "type": "field", + "params": [ + "value" + ] + }, + { + "type": "mean", + "params": [] + }, + { + "type": "math", + "params": [ + " / 100" + ] + } + ] ], - "type": "count" + "tags": [ + { + "key": "datacenter", + "operator": "=", + "value": "America" + } + ] } - ], - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [ - - ], - "type": "mean" - } - ] - ], - "tags": [ - { - "key": "datacenter", - "operator": "=", - "value": "America" - } - ] - } ` modelJson, err := simplejson.NewJson([]byte(json)) @@ -86,7 +100,7 @@ func TestInfluxdbQueryParser(t *testing.T) { res, err := parser.Parse(modelJson) So(err, ShouldBeNil) So(len(res.GroupBy), ShouldEqual, 3) - So(len(res.Selects), ShouldEqual, 2) + So(len(res.Selects), ShouldEqual, 3) So(len(res.Tags), ShouldEqual, 1) }) }) diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index 3d5a51d42c0..cba0c57b6b1 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -1,5 +1,124 @@ package influxdb -type Selector interface { - Render(input string) string +import ( + "fmt" + "strings" +) + +var renders map[string]QueryDefinition + +type QueryDefinition struct { + Renderer func(part *QueryPart, innerExpr string) string +} + +func init() { + renders = make(map[string]QueryDefinition) + + renders["field"] = QueryDefinition{Renderer: fieldRenderer} + + renders["spread"] = QueryDefinition{Renderer: functionRenderer} + renders["count"] = QueryDefinition{Renderer: functionRenderer} + renders["distinct"] = QueryDefinition{Renderer: functionRenderer} + renders["integral"] = QueryDefinition{Renderer: functionRenderer} + renders["mean"] = QueryDefinition{Renderer: functionRenderer} + renders["median"] = QueryDefinition{Renderer: functionRenderer} + renders["sum"] = QueryDefinition{Renderer: functionRenderer} + + renders["derivative"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{ name: "duration", type: "interval", options: ['1s', '10s', '1m', '5m', '10m', '15m', '1h']}], + } + + renders["non_negative_derivative"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{ name: "duration", type: "interval", options: ['1s', '10s', '1m', '5m', '10m', '15m', '1h']}], + } + renders["difference"] = QueryDefinition{Renderer: functionRenderer} + renders["moving_average"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{ name: "window", type: "number", options: [5, 10, 20, 30, 40]}] + } + renders["stddev"] = QueryDefinition{Renderer: functionRenderer} + renders["time"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{ name: "interval", type: "time", options: ['auto', '1s', '10s', '1m', '5m', '10m', '15m', '1h'] }], + } + renders["fill"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{ name: "fill", type: "string", options: ['none', 'null', '0', 'previous'] }], + } + renders["elapsed"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{ name: "duration", type: "interval", options: ['1s', '10s', '1m', '5m', '10m', '15m', '1h']}], + } + renders["bottom"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{name: 'count', type: 'int'}], + } + + renders["first"] = QueryDefinition{Renderer: functionRenderer} + renders["last"] = QueryDefinition{Renderer: functionRenderer} + renders["max"] = QueryDefinition{Renderer: functionRenderer} + renders["min"] = QueryDefinition{Renderer: functionRenderer} + renders["percentile"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{name: 'nth', type: 'int'}], + } + renders["top"] = QueryDefinition{ + Renderer: functionRenderer, + //params: [{name: 'count', type: 'int'}], + } + renders["tag"] = QueryDefinition{ + Renderer: fieldRenderer, + //params: [{name: 'tag', type: 'string', dynamicLookup: true}], + } + + renders["math"] = QueryDefinition{Renderer: suffixRenderer} + renders["alias"] = QueryDefinition{Renderer: aliasRenderer} +} + +func fieldRenderer(part *QueryPart, innerExpr string) string { + if part.Params[0] == "*" { + return "*" + } + return fmt.Sprintf(`"%v"`, part.Params[0]) +} + +func functionRenderer(part *QueryPart, innerExpr string) string { + params := strings.Join(part.Params, ", ") + + if len(part.Params) > 0 { + return fmt.Sprintf("%s(%s, %s)", part.Type, innerExpr, params) + } + + return fmt.Sprintf("%s(%s)", part.Type, innerExpr) +} + +func suffixRenderer(part *QueryPart, innerExpr string) string { + return fmt.Sprintf("%s %s", innerExpr, part.Params[0]) +} + +func aliasRenderer(part *QueryPart, innerExpr string) string { + return fmt.Sprintf(`%s AS "%s"`, innerExpr, part.Params[0]) +} + +func (r QueryDefinition) Render(part *QueryPart, innerExpr string) string { + return r.Renderer(part, innerExpr) +} + +type QueryPartDefinition struct { +} + +type QueryPart struct { + Type string + Params []string +} + +func (qp *QueryPart) Render(expr string) (string, error) { + renderFn, exist := renders[qp.Type] + if !exist { + return "", fmt.Errorf("could not find render strategy %s", qp.Type) + } + + return renderFn.Renderer(qp, expr), nil } diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go index 20f12cc3359..eb2583e7711 100644 --- a/pkg/tsdb/influxdb/query_part_test.go +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -9,55 +9,54 @@ import ( func TestInfluxdbQueryPart(t *testing.T) { Convey("Influxdb query part builder", t, func() { - Convey("can build query", func() { + Convey("should handle field renderer parts", func() { + part := QueryPart{ + Type: "field", + Params: []string{"value"}, + } + + res, _ := part.Render("value") + So(res, ShouldEqual, `"value"`) }) - Convey("empty queries should return error", func() { + Convey("should handle nested function parts", func() { + part := QueryPart{ + Type: "derivative", + Params: []string{"10s"}, + } + res, _ := part.Render("mean(value)") + So(res, ShouldEqual, "derivative(mean(value), 10s)") + }) + + Convey("should nest spread function", func() { + part := QueryPart{ + Type: "spread", + } + + res, err := part.Render("value") + So(err, ShouldBeNil) + So(res, ShouldEqual, "spread(value)") + }) + + Convey("should handle suffix parts", func() { + part := QueryPart{ + Type: "math", + Params: []string{"/ 100"}, + } + + res, _ := part.Render("mean(value)") + So(res, ShouldEqual, "mean(value) / 100") + }) + + Convey("should handle alias parts", func() { + part := QueryPart{ + Type: "alias", + Params: []string{"test"}, + } + + res, _ := part.Render("mean(value)") + So(res, ShouldEqual, `mean(value) AS "test"`) }) }) } - -/* - describe('series with mesurement only', () => { - it('should handle nested function parts', () => { - var part = queryPart.create({ - type: 'derivative', - params: ['10s'], - }); - - expect(part.text).to.be('derivative(10s)'); - expect(part.render('mean(value)')).to.be('derivative(mean(value), 10s)'); - }); - - it('should nest spread function', () => { - var part = queryPart.create({ - type: 'spread' - }); - - expect(part.text).to.be('spread()'); - expect(part.render('value')).to.be('spread(value)'); - }); - - it('should handle suffirx parts', () => { - var part = queryPart.create({ - type: 'math', - params: ['/ 100'], - }); - - expect(part.text).to.be('math(/ 100)'); - expect(part.render('mean(value)')).to.be('mean(value) / 100'); - }); - - it('should handle alias parts', () => { - var part = queryPart.create({ - type: 'alias', - params: ['test'], - }); - - expect(part.text).to.be('alias(test)'); - expect(part.render('mean(value)')).to.be('mean(value) AS "test"'); - }); - - }); -*/ From 8588bb386c77748b0c94fa368d393c4bd8f636e2 Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 5 Oct 2016 16:57:32 +0200 Subject: [PATCH 05/25] feat(influxdb): add conditions property to tag --- pkg/tsdb/influxdb/models.go | 7 ++++--- pkg/tsdb/influxdb/parser.go | 21 ++++++++++++++------- pkg/tsdb/influxdb/parser_test.go | 8 +++++++- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index a70fe7f5d47..4895c9c393b 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -10,9 +10,10 @@ type Query struct { } type Tag struct { - Key string - Operator string - Value string + Key string + Operator string + Value string + Condition string } type Select []QueryPart diff --git a/pkg/tsdb/influxdb/parser.go b/pkg/tsdb/influxdb/parser.go index 0e024ab4f27..c158d5879a1 100644 --- a/pkg/tsdb/influxdb/parser.go +++ b/pkg/tsdb/influxdb/parser.go @@ -69,23 +69,30 @@ func (*InfluxdbQueryParser) parseTags(model *simplejson.Json) ([]*Tag, error) { var result []*Tag for _, t := range model.Get("tags").MustArray() { tagJson := simplejson.NewFromAny(t) + tag := &Tag{} + var err error - key, err := tagJson.Get("key").String() + tag.Key, err = tagJson.Get("key").String() + if err != nil { + return nil, err + } + + tag.Value, err = tagJson.Get("value").String() if err != nil { return nil, err } operator, err := tagJson.Get("operator").String() - if err != nil { - return nil, err + if err == nil { + tag.Operator = operator } - value, err := tagJson.Get("value").String() - if err != nil { - return nil, err + condition, err := tagJson.Get("condition").String() + if err == nil { + tag.Condition = condition } - result = append(result, &Tag{Key: key, Operator: operator, Value: value}) + result = append(result, tag) } return result, nil diff --git a/pkg/tsdb/influxdb/parser_test.go b/pkg/tsdb/influxdb/parser_test.go index 19e2c228f1a..3dc76124365 100644 --- a/pkg/tsdb/influxdb/parser_test.go +++ b/pkg/tsdb/influxdb/parser_test.go @@ -89,6 +89,12 @@ func TestInfluxdbQueryParser(t *testing.T) { "key": "datacenter", "operator": "=", "value": "America" + }, + { + "condition": "OR", + "key": "hostname", + "operator": "=", + "value": "server1" } ] } @@ -101,7 +107,7 @@ func TestInfluxdbQueryParser(t *testing.T) { So(err, ShouldBeNil) So(len(res.GroupBy), ShouldEqual, 3) So(len(res.Selects), ShouldEqual, 3) - So(len(res.Tags), ShouldEqual, 1) + So(len(res.Tags), ShouldEqual, 2) }) }) } From 9968fa5bc5c42b40423488782209476f1d59f289 Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 5 Oct 2016 16:59:33 +0200 Subject: [PATCH 06/25] feat(influxdb): add query part definitions --- pkg/tsdb/influxdb/parser.go | 31 +++++++++++++---- pkg/tsdb/influxdb/parser_test.go | 10 +++--- pkg/tsdb/influxdb/query_part.go | 51 +++++++++++++++++---------- pkg/tsdb/influxdb/query_part_test.go | 52 +++++++++++++--------------- 4 files changed, 88 insertions(+), 56 deletions(-) diff --git a/pkg/tsdb/influxdb/parser.go b/pkg/tsdb/influxdb/parser.go index c158d5879a1..7d51d087b5d 100644 --- a/pkg/tsdb/influxdb/parser.go +++ b/pkg/tsdb/influxdb/parser.go @@ -1,6 +1,10 @@ package influxdb -import "github.com/grafana/grafana/pkg/components/simplejson" +import ( + "strconv" + + "github.com/grafana/grafana/pkg/components/simplejson" +) type InfluxdbQueryParser struct{} @@ -108,14 +112,29 @@ func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, for _, paramObj := range model.Get("params").MustArray() { param := simplejson.NewFromAny(paramObj) - pv, err := param.String() - if err != nil { - return nil, err + stringParam, err := param.String() + if err == nil { + params = append(params, stringParam) + continue } - params = append(params, pv) + + intParam, err := param.Int() + if err == nil { + params = append(params, strconv.Itoa(intParam)) + continue + } + + return nil, err + } - return &QueryPart{Type: typ, Params: params}, nil + qp, err := NewQueryPart(typ, params) + if err != nil { + return nil, err + } + + return qp, nil + //return &QueryPart{Type: typ, Params: params}, nil } func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) { diff --git a/pkg/tsdb/influxdb/parser_test.go b/pkg/tsdb/influxdb/parser_test.go index 3dc76124365..b8bba08616a 100644 --- a/pkg/tsdb/influxdb/parser_test.go +++ b/pkg/tsdb/influxdb/parser_test.go @@ -14,7 +14,7 @@ func TestInfluxdbQueryParser(t *testing.T) { Convey("converting metric name", func() { json := ` - { + { "dsType": "influxdb", "groupBy": [ { @@ -31,7 +31,7 @@ func TestInfluxdbQueryParser(t *testing.T) { }, { "params": [ - "null" + "none" ], "type": "fill" } @@ -61,8 +61,10 @@ func TestInfluxdbQueryParser(t *testing.T) { ] }, { - "type": "mean", - "params": [] + "type": "bottom", + "params": [ + 3 + ] } ], [ diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index cba0c57b6b1..ff352f5e048 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -7,8 +7,14 @@ import ( var renders map[string]QueryDefinition +type DefinitionParameters struct { + Name string + Type string +} + type QueryDefinition struct { Renderer func(part *QueryPart, innerExpr string) string + Params []DefinitionParameters } func init() { @@ -26,34 +32,34 @@ func init() { renders["derivative"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{ name: "duration", type: "interval", options: ['1s', '10s', '1m', '5m', '10m', '15m', '1h']}], + Params: []DefinitionParameters{{Name: "duration", Type: "interval"}}, } renders["non_negative_derivative"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{ name: "duration", type: "interval", options: ['1s', '10s', '1m', '5m', '10m', '15m', '1h']}], + Params: []DefinitionParameters{{Name: "duration", Type: "interval"}}, } renders["difference"] = QueryDefinition{Renderer: functionRenderer} renders["moving_average"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{ name: "window", type: "number", options: [5, 10, 20, 30, 40]}] + Params: []DefinitionParameters{{Name: "window", Type: "number"}}, } renders["stddev"] = QueryDefinition{Renderer: functionRenderer} renders["time"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{ name: "interval", type: "time", options: ['auto', '1s', '10s', '1m', '5m', '10m', '15m', '1h'] }], + Params: []DefinitionParameters{{Name: "interval", Type: "time"}}, } renders["fill"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{ name: "fill", type: "string", options: ['none', 'null', '0', 'previous'] }], + Params: []DefinitionParameters{{Name: "fill", Type: "string"}}, } renders["elapsed"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{ name: "duration", type: "interval", options: ['1s', '10s', '1m', '5m', '10m', '15m', '1h']}], + Params: []DefinitionParameters{{Name: "duration", Type: "interval"}}, } renders["bottom"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{name: 'count', type: 'int'}], + Params: []DefinitionParameters{{Name: "count", Type: "int"}}, } renders["first"] = QueryDefinition{Renderer: functionRenderer} @@ -62,15 +68,15 @@ func init() { renders["min"] = QueryDefinition{Renderer: functionRenderer} renders["percentile"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{name: 'nth', type: 'int'}], + Params: []DefinitionParameters{{Name: "nth", Type: "int"}}, } renders["top"] = QueryDefinition{ Renderer: functionRenderer, - //params: [{name: 'count', type: 'int'}], + Params: []DefinitionParameters{{Name: "count", Type: "int"}}, } renders["tag"] = QueryDefinition{ Renderer: fieldRenderer, - //params: [{name: 'tag', type: 'string', dynamicLookup: true}], + Params: []DefinitionParameters{{Name: "tag", Type: "string"}}, } renders["math"] = QueryDefinition{Renderer: suffixRenderer} @@ -81,7 +87,7 @@ func fieldRenderer(part *QueryPart, innerExpr string) string { if part.Params[0] == "*" { return "*" } - return fmt.Sprintf(`"%v"`, part.Params[0]) + return fmt.Sprintf(`"%s"`, part.Params[0]) } func functionRenderer(part *QueryPart, innerExpr string) string { @@ -106,19 +112,26 @@ func (r QueryDefinition) Render(part *QueryPart, innerExpr string) string { return r.Renderer(part, innerExpr) } -type QueryPartDefinition struct { +func NewQueryPart(typ string, params []string) (*QueryPart, error) { + def, exist := renders[typ] + + if !exist { + return nil, fmt.Errorf("Missing query definition for %s", typ) + } + + return &QueryPart{ + Type: typ, + Params: params, + Def: def, + }, nil } type QueryPart struct { + Def QueryDefinition Type string Params []string } -func (qp *QueryPart) Render(expr string) (string, error) { - renderFn, exist := renders[qp.Type] - if !exist { - return "", fmt.Errorf("could not find render strategy %s", qp.Type) - } - - return renderFn.Renderer(qp, expr), nil +func (qp *QueryPart) Render(expr string) string { + return qp.Def.Renderer(qp, expr) } diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go index eb2583e7711..b1c24b7070f 100644 --- a/pkg/tsdb/influxdb/query_part_test.go +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -10,52 +10,50 @@ func TestInfluxdbQueryPart(t *testing.T) { Convey("Influxdb query part builder", t, func() { Convey("should handle field renderer parts", func() { - part := QueryPart{ - Type: "field", - Params: []string{"value"}, - } + part, err := NewQueryPart("field", []string{"value"}) + So(err, ShouldBeNil) - res, _ := part.Render("value") + res := part.Render("value") So(res, ShouldEqual, `"value"`) }) Convey("should handle nested function parts", func() { - part := QueryPart{ - Type: "derivative", - Params: []string{"10s"}, - } + part, err := NewQueryPart("derivative", []string{"10s"}) + So(err, ShouldBeNil) - res, _ := part.Render("mean(value)") + res := part.Render("mean(value)") So(res, ShouldEqual, "derivative(mean(value), 10s)") }) - Convey("should nest spread function", func() { - part := QueryPart{ - Type: "spread", - } - - res, err := part.Render("value") + Convey("bottom", func() { + part, err := NewQueryPart("bottom", []string{"3"}) So(err, ShouldBeNil) - So(res, ShouldEqual, "spread(value)") + + res := part.Render("value") + So(res, ShouldEqual, "bottom(value, 3)") + }) + + Convey("should nest spread function", func() { + part, err := NewQueryPart("spread", []string{}) + So(err, ShouldBeNil) + + res := part.Render("value") + So(res, ShouldEqual, `spread(value)`) }) Convey("should handle suffix parts", func() { - part := QueryPart{ - Type: "math", - Params: []string{"/ 100"}, - } + part, err := NewQueryPart("math", []string{"/ 100"}) + So(err, ShouldBeNil) - res, _ := part.Render("mean(value)") + res := part.Render("mean(value)") So(res, ShouldEqual, "mean(value) / 100") }) Convey("should handle alias parts", func() { - part := QueryPart{ - Type: "alias", - Params: []string{"test"}, - } + part, err := NewQueryPart("alias", []string{"test"}) + So(err, ShouldBeNil) - res, _ := part.Render("mean(value)") + res := part.Render("mean(value)") So(res, ShouldEqual, `mean(value) AS "test"`) }) }) From 4387d202225a297296c779e5b47cf939d9bcd5ad Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 5 Oct 2016 20:36:05 +0200 Subject: [PATCH 07/25] feat(influxdb): render select and groupby --- pkg/tsdb/influxdb/influxdb.go | 10 +++- pkg/tsdb/influxdb/query_builder.go | 61 +++++++++++++++++++++-- pkg/tsdb/influxdb/query_builder_test.go | 66 +++++++++++-------------- pkg/tsdb/influxdb/query_part.go | 10 ++-- 4 files changed, 99 insertions(+), 48 deletions(-) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index b0775ebf9dd..8f45e8d2522 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -12,13 +12,15 @@ import ( type InfluxDBExecutor struct { *tsdb.DataSourceInfo - QueryParser *InfluxdbQueryParser + QueryParser *InfluxdbQueryParser + QueryBuilder *QueryBuild } func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { return &InfluxDBExecutor{ DataSourceInfo: dsInfo, QueryParser: &InfluxdbQueryParser{}, + QueryBuilder: &QueryBuild{}, } } @@ -46,12 +48,16 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, for _, v := range queries { query, err := e.QueryParser.Parse(v.Model) - if err != nil { result.Error = err return result } + glog.Info("Influxdb executor", "query", query) + + rawQuery, err := e.QueryBuilder.Build(query) + + glog.Info("Influxdb", "error", err, "rawQuery", rawQuery) } return result diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index 86d4981133e..487193584d8 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -1,10 +1,65 @@ package influxdb -import "fmt" +import ( + "fmt" + "strings" +) type QueryBuild struct{} -func (*QueryBuild) Build(query *Query) (string, error) { +func renderTags(query *Query) []string { + var res []string + for i, tag := range query.Tags { + str := "" - return "", fmt.Errorf("query is not valid") + if i > 0 { + if tag.Condition == "" { + str += "AND" + } else { + str += tag.Condition + } + str += " " + } + + res = append(res, fmt.Sprintf(`%s"%s" %s '%s'`, str, tag.Key, tag.Operator, tag.Value)) + } + + return res +} + +func (*QueryBuild) Build(query *Query) (string, error) { + res := "SELECT " + + var selectors []string + for _, sel := range query.Selects { + + stk := "" + for _, s := range *sel { + stk = s.Render(stk) + } + selectors = append(selectors, stk) + } + res += strings.Join(selectors, ", ") + + res += fmt.Sprintf(` FROM "%s"`, query.Measurement) + + res += " WHERE " + conditions := renderTags(query) + res += strings.Join(conditions, " ") + if len(conditions) > 0 { + res += " AND " + } + + res += "$timeFilter" + + var groupBy []string + for _, group := range query.GroupBy { + groupBy = append(groupBy, group.Render("")) + } + + if len(groupBy) > 0 { + res += " GROUP BY " + strings.Join(groupBy, " ") + } + + return res, nil } diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index 7ec80d389dd..0e5a25eca5f 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -8,50 +8,40 @@ import ( func TestInfluxdbQueryBuilder(t *testing.T) { Convey("Influxdb query builder", t, func() { + builder := QueryBuild{} - builder := &QueryBuild{} + qp1, _ := NewQueryPart("field", []string{"value"}) + qp2, _ := NewQueryPart("mean", []string{}) + + groupBy1, _ := NewQueryPart("time", []string{"$interval"}) + groupBy2, _ := NewQueryPart("fill", []string{"null"}) + + tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="} + tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"} Convey("can build query", func() { - //query := &Query{} - //res, err := builder.Build(query) - //So(err, ShouldBeNil) + query := &Query{ + Selects: []*Select{{*qp1, *qp2}}, + Measurement: "cpu", + GroupBy: []*QueryPart{groupBy1, groupBy2}, + } + + rawQuery, err := builder.Build(query) + So(err, ShouldBeNil) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`) }) - Convey("empty queries should return error", func() { - query := &Query{} + Convey("can asd query", func() { + query := &Query{ + Selects: []*Select{{*qp1, *qp2}}, + Measurement: "cpu", + GroupBy: []*QueryPart{groupBy1}, + Tags: []*Tag{tag1, tag2}, + } - res, err := builder.Build(query) - So(err, ShouldNotBeNil) - So(res, ShouldEqual, "") + rawQuery, err := builder.Build(query) + So(err, ShouldBeNil) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND $timeFilter GROUP BY time($interval)`) }) }) } - -/* - describe('render series with mesurement only', function() { - it('should generate correct query', function() { - var query = new InfluxQuery({ - measurement: 'cpu', - }, templateSrv, {}); - - var queryText = query.render(); - expect(queryText).to.be('SELECT mean("value") FROM "cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)'); - }); - }); -*/ - -/* - describe('series with tags OR condition', function() { - it('should generate correct query', function() { - var query = new InfluxQuery({ - measurement: 'cpu', - groupBy: [{type: 'time', params: ['auto']}], - tags: [{key: 'hostname', value: 'server1'}, {key: 'hostname', value: 'server2', condition: "OR"}] - }, templateSrv, {}); - - var queryText = query.render(); - expect(queryText).to.be('SELECT mean("value") FROM "cpu" WHERE "hostname" = \'server1\' OR "hostname" = \'server2\' AND ' + - '$timeFilter GROUP BY time($interval)'); - }); - }); -*/ diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index ff352f5e048..32ba6ba4d4a 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -91,13 +91,13 @@ func fieldRenderer(part *QueryPart, innerExpr string) string { } func functionRenderer(part *QueryPart, innerExpr string) string { - params := strings.Join(part.Params, ", ") - - if len(part.Params) > 0 { - return fmt.Sprintf("%s(%s, %s)", part.Type, innerExpr, params) + if innerExpr != "" { + part.Params = append([]string{innerExpr}, part.Params...) } - return fmt.Sprintf("%s(%s)", part.Type, innerExpr) + params := strings.Join(part.Params, ", ") + + return fmt.Sprintf("%s(%s)", part.Type, params) } func suffixRenderer(part *QueryPart, innerExpr string) string { From 887ca40455b416caa4956f460d88fbf5e2072f55 Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 5 Oct 2016 20:57:28 +0200 Subject: [PATCH 08/25] feat(influxdb): add support for policies --- pkg/tsdb/influxdb/query_builder.go | 6 +++++- pkg/tsdb/influxdb/query_builder_test.go | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index 487193584d8..d82950ba4bf 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -41,7 +41,11 @@ func (*QueryBuild) Build(query *Query) (string, error) { } res += strings.Join(selectors, ", ") - res += fmt.Sprintf(` FROM "%s"`, query.Measurement) + policy := "" + if query.Policy != "" { + policy = `"` + query.Policy + `".` + } + res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement) res += " WHERE " conditions := renderTags(query) diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index 0e5a25eca5f..ce0c2046c6a 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -23,12 +23,13 @@ func TestInfluxdbQueryBuilder(t *testing.T) { query := &Query{ Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", + Policy: "policy", GroupBy: []*QueryPart{groupBy1, groupBy2}, } rawQuery, err := builder.Build(query) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`) }) Convey("can asd query", func() { From b519d5e92cc0e216c95873494650556dd55ea4e1 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 12:50:06 +0200 Subject: [PATCH 09/25] feat(docker): update docker-compose for influxdb --- docker/blocks/influxdb/fig | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docker/blocks/influxdb/fig b/docker/blocks/influxdb/fig index bdb4a274634..7b83bb2bab4 100644 --- a/docker/blocks/influxdb/fig +++ b/docker/blocks/influxdb/fig @@ -1,5 +1,7 @@ influxdb: - image: tutum/influxdb:0.12 + #image: influxdb/influxdb:1.0-alpine + image: influxdb:latest + container_name: influxdb ports: - "2004:2004" - "8083:8083" From ab8751767c93f7ee82d1de84f1b02a16da06c905 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 12:51:45 +0200 Subject: [PATCH 10/25] feat(influxdb): send request and parse response --- pkg/tsdb/influxdb/influxdb.go | 145 ++++++++++++++++++++++-- pkg/tsdb/influxdb/models.go | 2 + pkg/tsdb/influxdb/query_builder.go | 11 +- pkg/tsdb/influxdb/query_builder_test.go | 15 ++- pkg/tsdb/influxdb/query_part.go | 6 + pkg/tsdb/influxdb/query_part_test.go | 8 ++ pkg/tsdb/models.go | 6 +- pkg/tsdb/prometheus/prometheus.go | 4 +- pkg/tsdb/testdata/scenarios.go | 8 +- 9 files changed, 182 insertions(+), 23 deletions(-) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 8f45e8d2522..566b807bfcf 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -3,9 +3,17 @@ package influxdb import ( "context" "crypto/tls" + "encoding/json" + "fmt" "net/http" + "net/url" + "path" "time" + "gopkg.in/guregu/null.v3" + + "golang.org/x/net/context/ctxhttp" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" ) @@ -43,22 +51,141 @@ func init() { } } -func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { - result := &tsdb.BatchResult{} +func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) { for _, v := range queries { - query, err := e.QueryParser.Parse(v.Model) if err != nil { - result.Error = err - return result + return "", err } - glog.Info("Influxdb executor", "query", query) + rawQuery, err := e.QueryBuilder.Build(query, context) + if err != nil { + return "", err + } - rawQuery, err := e.QueryBuilder.Build(query) - - glog.Info("Influxdb", "error", err, "rawQuery", rawQuery) + return rawQuery, nil } + return "", fmt.Errorf("Tsdb request contains no queries") +} + +func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { + result := &tsdb.BatchResult{} + + query, err := e.getQuery(queries, context) + if err != nil { + result.Error = err + return result + } + + glog.Info("Influxdb", "query", query) + + u, _ := url.Parse(e.Url) + u.Path = path.Join(u.Path, "query") + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + result.Error = err + return result + } + + params := req.URL.Query() + params.Set("q", query) + params.Set("db", e.Database) + params.Set("epoch", "s") + + req.URL.RawQuery = params.Encode() + + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", "Grafana") + if e.BasicAuth { + req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) + } + + glog.Info("influxdb request", "url", req.URL.String()) + resp, err := ctxhttp.Do(ctx, HttpClient, req) + if err != nil { + result.Error = err + return result + } + + if resp.StatusCode/100 != 2 { + result.Error = fmt.Errorf("Influxdb returned statuscode %v body %v", resp.Status) + return result + } + + var response Response + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + err = dec.Decode(&response) + if err != nil { + glog.Error("Influxdb decode failed", "err", err) + result.Error = err + return result + } + + result.QueryResults = make(map[string]*tsdb.QueryResult) + queryRes := tsdb.NewQueryResult() + + for _, v := range response.Results { + for _, r := range v.Series { + serie := tsdb.TimeSeries{Name: r.Name} + var points tsdb.TimeSeriesPoints + + for _, k := range r.Values { + var value null.Float + var err error + num, ok := k[1].(json.Number) + if !ok { + value = null.FloatFromPtr(nil) + } else { + fvalue, err := num.Float64() + if err == nil { + value = null.FloatFrom(fvalue) + } + } + + pos0, ok := k[0].(json.Number) + timestamp, err := pos0.Float64() + if err == nil && ok { + points = append(points, tsdb.NewTimePoint(value, timestamp)) + } else { + glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64) + } + serie.Points = points + } + queryRes.Series = append(queryRes.Series, &serie) + } + } + + for _, v := range queryRes.Series { + glog.Info("result", "name", v.Name, "points", v.Points) + } + + result.QueryResults["A"] = queryRes + return result } + +type Response struct { + Results []Result + Err error +} + +type Result struct { + Series []Row + Messages []*Message + Err error +} + +type Message struct { + Level string `json:"level,omitempty"` + Text string `json:"text,omitempty"` +} + +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` +} diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index 4895c9c393b..3de80154ee2 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -7,6 +7,8 @@ type Query struct { Tags []*Tag GroupBy []*QueryPart Selects []*Select + + Interval string } type Tag struct { diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index d82950ba4bf..22863ce78bf 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -3,6 +3,8 @@ package influxdb import ( "fmt" "strings" + + "github.com/grafana/grafana/pkg/tsdb" ) type QueryBuild struct{} @@ -27,7 +29,7 @@ func renderTags(query *Query) []string { return res } -func (*QueryBuild) Build(query *Query) (string, error) { +func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { res := "SELECT " var selectors []string @@ -42,7 +44,9 @@ func (*QueryBuild) Build(query *Query) (string, error) { res += strings.Join(selectors, ", ") policy := "" - if query.Policy != "" { + if query.Policy == "" || query.Policy == "default" { + policy = "" + } else { policy = `"` + query.Policy + `".` } res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement) @@ -54,7 +58,8 @@ func (*QueryBuild) Build(query *Query) (string, error) { res += " AND " } - res += "$timeFilter" + //res += "$timeFilter" + res += "time > " + strings.Replace(queryContext.TimeRange.From, "now", "now()", 1) var groupBy []string for _, group := range query.GroupBy { diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index ce0c2046c6a..f464117bd3e 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -3,6 +3,7 @@ package influxdb import ( "testing" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" ) @@ -19,17 +20,22 @@ func TestInfluxdbQueryBuilder(t *testing.T) { tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="} tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"} + queryContext := &tsdb.QueryContext{ + TimeRange: tsdb.NewTimeRange("now-5h", "now"), + } + Convey("can build query", func() { query := &Query{ Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", Policy: "policy", GroupBy: []*QueryPart{groupBy1, groupBy2}, + Interval: "10s", } - rawQuery, err := builder.Build(query) + rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now()-5h GROUP BY time(10s) fill(null)`) }) Convey("can asd query", func() { @@ -38,11 +44,12 @@ func TestInfluxdbQueryBuilder(t *testing.T) { Measurement: "cpu", GroupBy: []*QueryPart{groupBy1}, Tags: []*Tag{tag1, tag2}, + Interval: "5s", } - rawQuery, err := builder.Build(query) + rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND $timeFilter GROUP BY time($interval)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now()-5h GROUP BY time(10s)`) }) }) } diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index 32ba6ba4d4a..ba14bcac665 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -91,6 +91,12 @@ func fieldRenderer(part *QueryPart, innerExpr string) string { } func functionRenderer(part *QueryPart, innerExpr string) string { + for i, v := range part.Params { + if v == "$interval" { + part.Params[i] = "10s" + } + } + if innerExpr != "" { part.Params = append([]string{innerExpr}, part.Params...) } diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go index b1c24b7070f..2160d59b851 100644 --- a/pkg/tsdb/influxdb/query_part_test.go +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -33,6 +33,14 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, "bottom(value, 3)") }) + Convey("time", func() { + part, err := NewQueryPart("time", []string{"$interval"}) + So(err, ShouldBeNil) + + res := part.Render("") + So(res, ShouldEqual, "time(10s)") + }) + Convey("should nest spread function", func() { part, err := NewQueryPart("spread", []string{}) So(err, ShouldBeNil) diff --git a/pkg/tsdb/models.go b/pkg/tsdb/models.go index 918f0ad65eb..7ba25f9674c 100644 --- a/pkg/tsdb/models.go +++ b/pkg/tsdb/models.go @@ -73,15 +73,15 @@ func NewQueryResult() *QueryResult { } } -func NewTimePoint(value float64, timestamp float64) TimePoint { - return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)} +func NewTimePoint(value null.Float, timestamp float64) TimePoint { + return TimePoint{value, null.FloatFrom(timestamp)} } func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints { points := make(TimeSeriesPoints, 0) for i := 0; i < len(values); i += 2 { - points = append(points, NewTimePoint(values[i], values[i+1])) + points = append(points, NewTimePoint(null.FloatFrom(values[i]), values[i+1])) } return points diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go index 98d4c72fd03..c95f4b49467 100644 --- a/pkg/tsdb/prometheus/prometheus.go +++ b/pkg/tsdb/prometheus/prometheus.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "gopkg.in/guregu/null.v3" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" "github.com/prometheus/client_golang/api/prometheus" @@ -145,7 +147,7 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb } for _, k := range v.Values { - series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000))) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000))) } queryRes.Series = append(queryRes.Series, &series) diff --git a/pkg/tsdb/testdata/scenarios.go b/pkg/tsdb/testdata/scenarios.go index e90b0d4df79..29a0b5a2ada 100644 --- a/pkg/tsdb/testdata/scenarios.go +++ b/pkg/tsdb/testdata/scenarios.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "gopkg.in/guregu/null.v3" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" ) @@ -42,7 +44,7 @@ func init() { walker := rand.Float64() * 100 for i := int64(0); i < 10000 && timeWalkerMs < to; i++ { - points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs))) + points = append(points, tsdb.NewTimePoint(null.FloatFrom(walker), float64(timeWalkerMs))) walker += rand.Float64() - 0.5 timeWalkerMs += query.IntervalMs @@ -73,7 +75,7 @@ func init() { series := newSeriesForQuery(query) outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000 - series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime))) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(10), float64(outsideTime))) queryRes.Series = append(queryRes.Series, series) return queryRes @@ -105,7 +107,7 @@ func init() { step := (endTime - startTime) / int64(len(values)-1) for _, val := range values { - series.Points = append(series.Points, tsdb.NewTimePoint(val, float64(startTime))) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(val), float64(startTime))) startTime += step } From d8aa38fafe8cda0736eb307e114894e5fd5dd18b Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 14:16:26 +0200 Subject: [PATCH 11/25] tech(influxdb): refactor and cleanup --- .../alerting/conditions/reducer_test.go | 4 +- pkg/tsdb/influxdb/influxdb.go | 131 +++++------------- .../influxdb/{parser.go => model_parser.go} | 0 pkg/tsdb/influxdb/model_parser_test.go | 115 +++++++++++++++ pkg/tsdb/influxdb/models.go | 23 +++ pkg/tsdb/influxdb/parser_test.go | 115 --------------- pkg/tsdb/influxdb/query_builder.go | 40 ++++-- pkg/tsdb/influxdb/query_builder_test.go | 12 +- pkg/tsdb/influxdb/query_part_test.go | 16 +-- pkg/tsdb/influxdb/response_parser.go | 49 +++++++ pkg/tsdb/influxdb/response_parser_test.go | 49 +++++++ pkg/tsdb/models.go | 5 + pkg/tsdb/prometheus/prometheus.go | 11 +- 13 files changed, 330 insertions(+), 240 deletions(-) rename pkg/tsdb/influxdb/{parser.go => model_parser.go} (100%) create mode 100644 pkg/tsdb/influxdb/model_parser_test.go delete mode 100644 pkg/tsdb/influxdb/parser_test.go create mode 100644 pkg/tsdb/influxdb/response_parser.go create mode 100644 pkg/tsdb/influxdb/response_parser_test.go diff --git a/pkg/services/alerting/conditions/reducer_test.go b/pkg/services/alerting/conditions/reducer_test.go index 67765f9c310..198a52b746a 100644 --- a/pkg/services/alerting/conditions/reducer_test.go +++ b/pkg/services/alerting/conditions/reducer_test.go @@ -3,6 +3,8 @@ package conditions import ( "testing" + "gopkg.in/guregu/null.v3" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" ) @@ -43,7 +45,7 @@ func testReducer(typ string, datapoints ...float64) float64 { } for idx := range datapoints { - series.Points = append(series.Points, tsdb.NewTimePoint(datapoints[idx], 1234134)) + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(datapoints[idx]), 1234134)) } return reducer.Reduce(series).Float64 diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 566b807bfcf..d914381b11c 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -10,8 +10,6 @@ import ( "path" "time" - "gopkg.in/guregu/null.v3" - "golang.org/x/net/context/ctxhttp" "github.com/grafana/grafana/pkg/log" @@ -21,14 +19,14 @@ import ( type InfluxDBExecutor struct { *tsdb.DataSourceInfo QueryParser *InfluxdbQueryParser - QueryBuilder *QueryBuild + QueryBuilder *QueryBuilder } func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { return &InfluxDBExecutor{ DataSourceInfo: dsInfo, QueryParser: &InfluxdbQueryParser{}, - QueryBuilder: &QueryBuild{}, + QueryBuilder: &QueryBuilder{}, } } @@ -66,7 +64,31 @@ func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.Query return rawQuery, nil } - return "", fmt.Errorf("Tsdb request contains no queries") + return "", fmt.Errorf("query request contains no queries") +} + +func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) { + u, _ := url.Parse(e.Url) + u.Path = path.Join(u.Path, "query") + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + + params := req.URL.Query() + params.Set("q", query) + params.Set("db", e.Database) + params.Set("epoch", "s") + req.URL.RawQuery = params.Encode() + + req.Header.Set("User-Agent", "Grafana") + if e.BasicAuth { + req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) + } + + glog.Debug("influxdb request", "url", req.URL.String()) + return req, nil } func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { @@ -74,44 +96,23 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, query, err := e.getQuery(queries, context) if err != nil { - result.Error = err - return result + return result.WithError(err) } - glog.Info("Influxdb", "query", query) + glog.Debug("Influxdb query", "raw query", query) - u, _ := url.Parse(e.Url) - u.Path = path.Join(u.Path, "query") - - req, err := http.NewRequest(http.MethodGet, u.String(), nil) + req, err := e.createRequest(query) if err != nil { - result.Error = err - return result + return result.WithError(err) } - params := req.URL.Query() - params.Set("q", query) - params.Set("db", e.Database) - params.Set("epoch", "s") - - req.URL.RawQuery = params.Encode() - - req.Header.Set("Content-Type", "") - req.Header.Set("User-Agent", "Grafana") - if e.BasicAuth { - req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) - } - - glog.Info("influxdb request", "url", req.URL.String()) resp, err := ctxhttp.Do(ctx, HttpClient, req) if err != nil { - result.Error = err - return result + return result.WithError(err) } if resp.StatusCode/100 != 2 { - result.Error = fmt.Errorf("Influxdb returned statuscode %v body %v", resp.Status) - return result + return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status)) } var response Response @@ -119,73 +120,11 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, dec.UseNumber() err = dec.Decode(&response) if err != nil { - glog.Error("Influxdb decode failed", "err", err) - result.Error = err - return result + return result.WithError(err) } result.QueryResults = make(map[string]*tsdb.QueryResult) - queryRes := tsdb.NewQueryResult() - - for _, v := range response.Results { - for _, r := range v.Series { - serie := tsdb.TimeSeries{Name: r.Name} - var points tsdb.TimeSeriesPoints - - for _, k := range r.Values { - var value null.Float - var err error - num, ok := k[1].(json.Number) - if !ok { - value = null.FloatFromPtr(nil) - } else { - fvalue, err := num.Float64() - if err == nil { - value = null.FloatFrom(fvalue) - } - } - - pos0, ok := k[0].(json.Number) - timestamp, err := pos0.Float64() - if err == nil && ok { - points = append(points, tsdb.NewTimePoint(value, timestamp)) - } else { - glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64) - } - serie.Points = points - } - queryRes.Series = append(queryRes.Series, &serie) - } - } - - for _, v := range queryRes.Series { - glog.Info("result", "name", v.Name, "points", v.Points) - } - - result.QueryResults["A"] = queryRes + result.QueryResults["A"] = ParseQueryResult(&response) return result } - -type Response struct { - Results []Result - Err error -} - -type Result struct { - Series []Row - Messages []*Message - Err error -} - -type Message struct { - Level string `json:"level,omitempty"` - Text string `json:"text,omitempty"` -} - -type Row struct { - Name string `json:"name,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Columns []string `json:"columns,omitempty"` - Values [][]interface{} `json:"values,omitempty"` -} diff --git a/pkg/tsdb/influxdb/parser.go b/pkg/tsdb/influxdb/model_parser.go similarity index 100% rename from pkg/tsdb/influxdb/parser.go rename to pkg/tsdb/influxdb/model_parser.go diff --git a/pkg/tsdb/influxdb/model_parser_test.go b/pkg/tsdb/influxdb/model_parser_test.go new file mode 100644 index 00000000000..b2ba1afa5af --- /dev/null +++ b/pkg/tsdb/influxdb/model_parser_test.go @@ -0,0 +1,115 @@ +package influxdb + +import ( + "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + . "github.com/smartystreets/goconvey/convey" +) + +func TestInfluxdbQueryParser(t *testing.T) { + Convey("Influxdb query parser", t, func() { + + parser := &InfluxdbQueryParser{} + + Convey("can parse influxdb json model", func() { + json := ` + { + "dsType": "influxdb", + "groupBy": [ + { + "params": [ + "$interval" + ], + "type": "time" + }, + { + "params": [ + "datacenter" + ], + "type": "tag" + }, + { + "params": [ + "none" + ], + "type": "fill" + } + ], + "measurement": "logins.count", + "policy": "default", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "type": "field", + "params": [ + "value" + ] + }, + { + "type": "count", + "params": [] + } + ], + [ + { + "type": "field", + "params": [ + "value" + ] + }, + { + "type": "bottom", + "params": [ + 3 + ] + } + ], + [ + { + "type": "field", + "params": [ + "value" + ] + }, + { + "type": "mean", + "params": [] + }, + { + "type": "math", + "params": [ + " / 100" + ] + } + ] + ], + "tags": [ + { + "key": "datacenter", + "operator": "=", + "value": "America" + }, + { + "condition": "OR", + "key": "hostname", + "operator": "=", + "value": "server1" + } + ] + } + ` + + modelJson, err := simplejson.NewJson([]byte(json)) + So(err, ShouldBeNil) + + res, err := parser.Parse(modelJson) + So(err, ShouldBeNil) + So(len(res.GroupBy), ShouldEqual, 3) + So(len(res.Selects), ShouldEqual, 3) + So(len(res.Tags), ShouldEqual, 2) + }) + }) +} diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index 3de80154ee2..c0713c83183 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -23,3 +23,26 @@ type Select []QueryPart type InfluxDbSelect struct { Type string } + +type Response struct { + Results []Result + Err error +} + +type Result struct { + Series []Row + Messages []*Message + Err error +} + +type Message struct { + Level string `json:"level,omitempty"` + Text string `json:"text,omitempty"` +} + +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` +} diff --git a/pkg/tsdb/influxdb/parser_test.go b/pkg/tsdb/influxdb/parser_test.go deleted file mode 100644 index b8bba08616a..00000000000 --- a/pkg/tsdb/influxdb/parser_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package influxdb - -import ( - "testing" - - "github.com/grafana/grafana/pkg/components/simplejson" - . "github.com/smartystreets/goconvey/convey" -) - -func TestInfluxdbQueryParser(t *testing.T) { - Convey("Influxdb query parser", t, func() { - - parser := &InfluxdbQueryParser{} - - Convey("converting metric name", func() { - json := ` - { - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$interval" - ], - "type": "time" - }, - { - "params": [ - "datacenter" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "logins.count", - "policy": "default", - "refId": "B", - "resultFormat": "time_series", - "select": [ - [ - { - "type": "field", - "params": [ - "value" - ] - }, - { - "type": "count", - "params": [] - } - ], - [ - { - "type": "field", - "params": [ - "value" - ] - }, - { - "type": "bottom", - "params": [ - 3 - ] - } - ], - [ - { - "type": "field", - "params": [ - "value" - ] - }, - { - "type": "mean", - "params": [] - }, - { - "type": "math", - "params": [ - " / 100" - ] - } - ] - ], - "tags": [ - { - "key": "datacenter", - "operator": "=", - "value": "America" - }, - { - "condition": "OR", - "key": "hostname", - "operator": "=", - "value": "server1" - } - ] - } - ` - - modelJson, err := simplejson.NewJson([]byte(json)) - So(err, ShouldBeNil) - - res, err := parser.Parse(modelJson) - So(err, ShouldBeNil) - So(len(res.GroupBy), ShouldEqual, 3) - So(len(res.Selects), ShouldEqual, 3) - So(len(res.Tags), ShouldEqual, 2) - }) - }) -} diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index 22863ce78bf..90e2cdbfa4e 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -7,7 +7,7 @@ import ( "github.com/grafana/grafana/pkg/tsdb" ) -type QueryBuild struct{} +type QueryBuilder struct{} func renderTags(query *Query) []string { var res []string @@ -29,7 +29,23 @@ func renderTags(query *Query) []string { return res } -func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { +func (*QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { + res := renderSelectors(query) + res += renderMeasurement(query) + res += renderWhereClause(query) + res += renderTimeFilter(query) + res += renderGroupBy(query) + + return res, nil +} + +func renderTimeFilter(query *Query) string { + //res += "$timeFilter" + //res += "time > now() -" + strings.Replace(queryContext.TimeRange.From, "now", "", 1) + return "time > now() - 5m" +} + +func renderSelectors(query *Query) string { res := "SELECT " var selectors []string @@ -41,34 +57,40 @@ func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, } selectors = append(selectors, stk) } - res += strings.Join(selectors, ", ") + return res + strings.Join(selectors, ", ") +} + +func renderMeasurement(query *Query) string { policy := "" if query.Policy == "" || query.Policy == "default" { policy = "" } else { policy = `"` + query.Policy + `".` } - res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement) + return fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement) +} - res += " WHERE " +func renderWhereClause(query *Query) string { + res := " WHERE " conditions := renderTags(query) res += strings.Join(conditions, " ") if len(conditions) > 0 { res += " AND " } - //res += "$timeFilter" - res += "time > " + strings.Replace(queryContext.TimeRange.From, "now", "now()", 1) + return res +} +func renderGroupBy(query *Query) string { var groupBy []string for _, group := range query.GroupBy { groupBy = append(groupBy, group.Render("")) } if len(groupBy) > 0 { - res += " GROUP BY " + strings.Join(groupBy, " ") + return " GROUP BY " + strings.Join(groupBy, " ") } - return res, nil + return "" } diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index f464117bd3e..b477d00e6cc 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -9,7 +9,7 @@ import ( func TestInfluxdbQueryBuilder(t *testing.T) { Convey("Influxdb query builder", t, func() { - builder := QueryBuild{} + builder := QueryBuilder{} qp1, _ := NewQueryPart("field", []string{"value"}) qp2, _ := NewQueryPart("mean", []string{}) @@ -21,10 +21,10 @@ func TestInfluxdbQueryBuilder(t *testing.T) { tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"} queryContext := &tsdb.QueryContext{ - TimeRange: tsdb.NewTimeRange("now-5h", "now"), + TimeRange: tsdb.NewTimeRange("now-5m", "now"), } - Convey("can build query", func() { + Convey("can build simple query", func() { query := &Query{ Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", @@ -35,10 +35,10 @@ func TestInfluxdbQueryBuilder(t *testing.T) { rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now()-5h GROUP BY time(10s) fill(null)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(10s) fill(null)`) }) - Convey("can asd query", func() { + Convey("can build query with group bys", func() { query := &Query{ Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", @@ -49,7 +49,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) { rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now()-5h GROUP BY time(10s)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s)`) }) }) } diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go index 2160d59b851..c1b2a22a5a1 100644 --- a/pkg/tsdb/influxdb/query_part_test.go +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -7,9 +7,9 @@ import ( ) func TestInfluxdbQueryPart(t *testing.T) { - Convey("Influxdb query part builder", t, func() { + Convey("Influxdb query parts", t, func() { - Convey("should handle field renderer parts", func() { + Convey("render field ", func() { part, err := NewQueryPart("field", []string{"value"}) So(err, ShouldBeNil) @@ -17,7 +17,7 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, `"value"`) }) - Convey("should handle nested function parts", func() { + Convey("render nested part", func() { part, err := NewQueryPart("derivative", []string{"10s"}) So(err, ShouldBeNil) @@ -25,7 +25,7 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, "derivative(mean(value), 10s)") }) - Convey("bottom", func() { + Convey("render bottom", func() { part, err := NewQueryPart("bottom", []string{"3"}) So(err, ShouldBeNil) @@ -33,7 +33,7 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, "bottom(value, 3)") }) - Convey("time", func() { + Convey("render time", func() { part, err := NewQueryPart("time", []string{"$interval"}) So(err, ShouldBeNil) @@ -41,7 +41,7 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, "time(10s)") }) - Convey("should nest spread function", func() { + Convey("render spread", func() { part, err := NewQueryPart("spread", []string{}) So(err, ShouldBeNil) @@ -49,7 +49,7 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, `spread(value)`) }) - Convey("should handle suffix parts", func() { + Convey("render suffix", func() { part, err := NewQueryPart("math", []string{"/ 100"}) So(err, ShouldBeNil) @@ -57,7 +57,7 @@ func TestInfluxdbQueryPart(t *testing.T) { So(res, ShouldEqual, "mean(value) / 100") }) - Convey("should handle alias parts", func() { + Convey("render alias", func() { part, err := NewQueryPart("alias", []string{"test"}) So(err, ShouldBeNil) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go new file mode 100644 index 00000000000..1200f11d459 --- /dev/null +++ b/pkg/tsdb/influxdb/response_parser.go @@ -0,0 +1,49 @@ +package influxdb + +import ( + "encoding/json" + + "github.com/grafana/grafana/pkg/tsdb" + "gopkg.in/guregu/null.v3" +) + +func ParseQueryResult(response *Response) *tsdb.QueryResult { + queryRes := tsdb.NewQueryResult() + + for _, v := range response.Results { + for _, r := range v.Series { + serie := tsdb.TimeSeries{Name: r.Name} + var points tsdb.TimeSeriesPoints + + for _, k := range r.Values { + var value null.Float + var err error + num, ok := k[1].(json.Number) + if !ok { + value = null.FloatFromPtr(nil) + } else { + fvalue, err := num.Float64() + if err == nil { + value = null.FloatFrom(fvalue) + } + } + + pos0, ok := k[0].(json.Number) + timestamp, err := pos0.Float64() + if err == nil && ok { + points = append(points, tsdb.NewTimePoint(value, timestamp)) + } else { + //glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64) + } + serie.Points = points + } + queryRes.Series = append(queryRes.Series, &serie) + } + } + + for _, v := range queryRes.Series { + glog.Info("result", "name", v.Name, "points", v.Points) + } + + return queryRes +} diff --git a/pkg/tsdb/influxdb/response_parser_test.go b/pkg/tsdb/influxdb/response_parser_test.go new file mode 100644 index 00000000000..f1443c946e5 --- /dev/null +++ b/pkg/tsdb/influxdb/response_parser_test.go @@ -0,0 +1,49 @@ +package influxdb + +import ( + "encoding/json" + "testing" + + "github.com/grafana/grafana/pkg/setting" + . "github.com/smartystreets/goconvey/convey" +) + +func TestInfluxdbResponseParser(t *testing.T) { + Convey("Influxdb response parser", t, func() { + + setting.NewConfigContext(&setting.CommandLineArgs{ + HomePath: "../../../", + }) + + response := &Response{ + Results: []Result{ + Result{ + Series: []Row{ + { + Name: "cpu", + Columns: []string{"time", "mean", "sum"}, + Values: [][]interface{}{ + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("123"), json.Number("123"), json.Number("123")}, + }, + }, + }, + }, + }, + } + + Convey("can parse response", func() { + result := ParseQueryResult(response) + So(len(result.Series), ShouldEqual, 1) + So(len(result.Series[0].Points), ShouldEqual, 10) + }) + }) +} diff --git a/pkg/tsdb/models.go b/pkg/tsdb/models.go index 7ba25f9674c..366709cfba7 100644 --- a/pkg/tsdb/models.go +++ b/pkg/tsdb/models.go @@ -52,6 +52,11 @@ type BatchResult struct { Timings *BatchTiming } +func (br *BatchResult) WithError(err error) *BatchResult { + br.Error = err + return br +} + type QueryResult struct { Error error `json:"error"` RefId string `json:"refId"` diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go index c95f4b49467..85f6b621c1c 100644 --- a/pkg/tsdb/prometheus/prometheus.go +++ b/pkg/tsdb/prometheus/prometheus.go @@ -52,12 +52,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic client, err := e.getClient() if err != nil { - return resultWithError(result, err) + return result.WithError(err) } query, err := parseQuery(queries, queryContext) if err != nil { - return resultWithError(result, err) + return result.WithError(err) } timeRange := prometheus.Range{ @@ -69,12 +69,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic value, err := client.QueryRange(ctx, query.Expr, timeRange) if err != nil { - return resultWithError(result, err) + return result.WithError(err) } queryResult, err := parseResponse(value, query) if err != nil { - return resultWithError(result, err) + return result.WithError(err) } result.QueryResults = queryResult return result @@ -157,7 +157,8 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb return queryResults, nil } +/* func resultWithError(result *tsdb.BatchResult, err error) *tsdb.BatchResult { result.Error = err return result -} +}*/ From b0addbd7cbf2113e860e06320a1b14c674971ca0 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 15:30:09 +0200 Subject: [PATCH 12/25] feat(influxdb): support multi row results --- pkg/tsdb/influxdb/influxdb.go | 8 +- pkg/tsdb/influxdb/response_parser.go | 95 +++++++++++++++-------- pkg/tsdb/influxdb/response_parser_test.go | 46 ++++++----- 3 files changed, 96 insertions(+), 53 deletions(-) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index d914381b11c..075959e9e31 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -18,8 +18,9 @@ import ( type InfluxDBExecutor struct { *tsdb.DataSourceInfo - QueryParser *InfluxdbQueryParser - QueryBuilder *QueryBuilder + QueryParser *InfluxdbQueryParser + QueryBuilder *QueryBuilder + ResponseParser *ResponseParser } func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { @@ -27,6 +28,7 @@ func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { DataSourceInfo: dsInfo, QueryParser: &InfluxdbQueryParser{}, QueryBuilder: &QueryBuilder{}, + ResponseParser: &ResponseParser{}, } } @@ -124,7 +126,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, } result.QueryResults = make(map[string]*tsdb.QueryResult) - result.QueryResults["A"] = ParseQueryResult(&response) + result.QueryResults["A"] = e.ResponseParser.Parse(&response) return result } diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 1200f11d459..25da1fcebb3 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -2,48 +2,79 @@ package influxdb import ( "encoding/json" + "fmt" "github.com/grafana/grafana/pkg/tsdb" "gopkg.in/guregu/null.v3" ) -func ParseQueryResult(response *Response) *tsdb.QueryResult { +type ResponseParser struct{} + +func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult { queryRes := tsdb.NewQueryResult() - for _, v := range response.Results { - for _, r := range v.Series { - serie := tsdb.TimeSeries{Name: r.Name} - var points tsdb.TimeSeriesPoints - - for _, k := range r.Values { - var value null.Float - var err error - num, ok := k[1].(json.Number) - if !ok { - value = null.FloatFromPtr(nil) - } else { - fvalue, err := num.Float64() - if err == nil { - value = null.FloatFrom(fvalue) - } - } - - pos0, ok := k[0].(json.Number) - timestamp, err := pos0.Float64() - if err == nil && ok { - points = append(points, tsdb.NewTimePoint(value, timestamp)) - } else { - //glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64) - } - serie.Points = points - } - queryRes.Series = append(queryRes.Series, &serie) - } + for _, result := range response.Results { + rp.parseResult(result.Series, queryRes) } - for _, v := range queryRes.Series { - glog.Info("result", "name", v.Name, "points", v.Points) + for _, serie := range queryRes.Series { + glog.Debug("result", "name", serie.Name, "points", serie.Points) } return queryRes } + +func (rp *ResponseParser) parseResult(result []Row, queryResult *tsdb.QueryResult) { + for _, r := range result { + for columnIndex, column := range r.Columns { + if column == "time" { + continue + } + + var points tsdb.TimeSeriesPoints + for _, k := range r.Values { + points = append(points, rp.parseTimepoint(k, columnIndex)) + } + + queryResult.Series = append(queryResult.Series, &tsdb.TimeSeries{ + Name: rp.formatName(r, column), + Points: points, + }) + } + } +} + +func (rp *ResponseParser) formatName(row Row, column string) string { + return fmt.Sprintf("%s.%s", row.Name, column) +} + +func (rp *ResponseParser) parseTimepoint(k []interface{}, valuePosition int) tsdb.TimePoint { + var value null.Float = rp.parseValue(k[valuePosition]) + + timestampNumber, _ := k[0].(json.Number) + timestamp, err := timestampNumber.Float64() + if err != nil { + glog.Error("Invalid timestamp format. This should never happen!") + } + + return tsdb.NewTimePoint(value, timestamp) +} + +func (rp *ResponseParser) parseValue(value interface{}) null.Float { + num, ok := value.(json.Number) + if !ok { + return null.FloatFromPtr(nil) + } + + fvalue, err := num.Float64() + if err == nil { + return null.FloatFrom(fvalue) + } + + ivalue, err := num.Int64() + if err == nil { + return null.FloatFrom(float64(ivalue)) + } + + return null.FloatFromPtr(nil) +} diff --git a/pkg/tsdb/influxdb/response_parser_test.go b/pkg/tsdb/influxdb/response_parser_test.go index f1443c946e5..635bd2c3b20 100644 --- a/pkg/tsdb/influxdb/response_parser_test.go +++ b/pkg/tsdb/influxdb/response_parser_test.go @@ -4,16 +4,13 @@ import ( "encoding/json" "testing" - "github.com/grafana/grafana/pkg/setting" . "github.com/smartystreets/goconvey/convey" ) func TestInfluxdbResponseParser(t *testing.T) { Convey("Influxdb response parser", t, func() { - setting.NewConfigContext(&setting.CommandLineArgs{ - HomePath: "../../../", - }) + parser := &ResponseParser{} response := &Response{ Results: []Result{ @@ -22,17 +19,11 @@ func TestInfluxdbResponseParser(t *testing.T) { { Name: "cpu", Columns: []string{"time", "mean", "sum"}, + Tags: map[string]string{"datacenter": "America"}, Values: [][]interface{}{ - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, - {json.Number("123"), json.Number("123"), json.Number("123")}, + {json.Number("111"), json.Number("222"), json.Number("333")}, + {json.Number("111"), json.Number("222"), json.Number("333")}, + {json.Number("111"), json.Number("null"), json.Number("333")}, }, }, }, @@ -40,10 +31,29 @@ func TestInfluxdbResponseParser(t *testing.T) { }, } - Convey("can parse response", func() { - result := ParseQueryResult(response) - So(len(result.Series), ShouldEqual, 1) - So(len(result.Series[0].Points), ShouldEqual, 10) + result := parser.Parse(response) + + Convey("can parse all series", func() { + So(len(result.Series), ShouldEqual, 2) + }) + + Convey("can parse all points", func() { + So(len(result.Series[0].Points), ShouldEqual, 3) + So(len(result.Series[1].Points), ShouldEqual, 3) + }) + + Convey("can parse multi row result", func() { + So(result.Series[0].Points[1][0].Float64, ShouldEqual, float64(222)) + So(result.Series[1].Points[1][0].Float64, ShouldEqual, float64(333)) + }) + + Convey("can parse null points", func() { + So(result.Series[0].Points[2][0].Valid, ShouldBeFalse) + }) + + Convey("can format serie names", func() { + So(result.Series[0].Name, ShouldEqual, "cpu.mean") + So(result.Series[1].Name, ShouldEqual, "cpu.sum") }) }) } From 87650c150bd4982e0411e9039ba2ed5ff6dfeef6 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 18:47:59 +0200 Subject: [PATCH 13/25] feat(influxdb): add tags to serie names --- pkg/tsdb/influxdb/response_parser.go | 12 +++++++++++- pkg/tsdb/influxdb/response_parser_test.go | 4 ++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 25da1fcebb3..0d7d8d049e3 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -45,7 +45,17 @@ func (rp *ResponseParser) parseResult(result []Row, queryResult *tsdb.QueryResul } func (rp *ResponseParser) formatName(row Row, column string) string { - return fmt.Sprintf("%s.%s", row.Name, column) + tags := "" + + for k, v := range row.Tags { + tags += k + ": " + v + } + + if tags != "" { + tags = fmt.Sprintf(" { %s }", tags) + } + + return fmt.Sprintf("%s.%s%s", row.Name, column, tags) } func (rp *ResponseParser) parseTimepoint(k []interface{}, valuePosition int) tsdb.TimePoint { diff --git a/pkg/tsdb/influxdb/response_parser_test.go b/pkg/tsdb/influxdb/response_parser_test.go index 635bd2c3b20..b45f98a1fff 100644 --- a/pkg/tsdb/influxdb/response_parser_test.go +++ b/pkg/tsdb/influxdb/response_parser_test.go @@ -52,8 +52,8 @@ func TestInfluxdbResponseParser(t *testing.T) { }) Convey("can format serie names", func() { - So(result.Series[0].Name, ShouldEqual, "cpu.mean") - So(result.Series[1].Name, ShouldEqual, "cpu.sum") + So(result.Series[0].Name, ShouldEqual, "cpu.mean { datacenter: America }") + So(result.Series[1].Name, ShouldEqual, "cpu.sum { datacenter: America }") }) }) } From 522d40fa26acc21f99b404ada10ebf5cf5c6048e Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 6 Oct 2016 18:51:17 +0200 Subject: [PATCH 14/25] fix(influxdb): support mulitple tags --- pkg/tsdb/influxdb/response_parser.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 0d7d8d049e3..81c90e65419 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -3,6 +3,7 @@ package influxdb import ( "encoding/json" "fmt" + "strings" "github.com/grafana/grafana/pkg/tsdb" "gopkg.in/guregu/null.v3" @@ -45,17 +46,18 @@ func (rp *ResponseParser) parseResult(result []Row, queryResult *tsdb.QueryResul } func (rp *ResponseParser) formatName(row Row, column string) string { - tags := "" + var tags []string for k, v := range row.Tags { - tags += k + ": " + v + tags = append(tags, fmt.Sprintf("%s: %s", k, v)) } - if tags != "" { - tags = fmt.Sprintf(" { %s }", tags) + tagText := "" + if len(tags) > 0 { + tagText = fmt.Sprintf(" { %s }", strings.Join(tags, " ")) } - return fmt.Sprintf("%s.%s%s", row.Name, column, tags) + return fmt.Sprintf("%s.%s%s", row.Name, column, tagText) } func (rp *ResponseParser) parseTimepoint(k []interface{}, valuePosition int) tsdb.TimePoint { From 0633e2c03f568534f0f46539b97013945f8e77d6 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 7 Oct 2016 11:23:37 +0200 Subject: [PATCH 15/25] feat(influxdb): add real support for time ranges --- pkg/tsdb/influxdb/models.go | 5 ++++- pkg/tsdb/influxdb/query_builder.go | 11 ++++++++--- pkg/tsdb/influxdb/query_builder_test.go | 17 ++++++++++++++++- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index c0713c83183..f45a14797d7 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -1,5 +1,7 @@ package influxdb +import "github.com/grafana/grafana/pkg/tsdb" + type Query struct { Measurement string Policy string @@ -8,7 +10,8 @@ type Query struct { GroupBy []*QueryPart Selects []*Select - Interval string + Interval string + TimeRange tsdb.TimeRange } type Tag struct { diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index 90e2cdbfa4e..d6caf0ee906 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -40,9 +40,14 @@ func (*QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (strin } func renderTimeFilter(query *Query) string { - //res += "$timeFilter" - //res += "time > now() -" + strings.Replace(queryContext.TimeRange.From, "now", "", 1) - return "time > now() - 5m" + from := "now() - " + query.TimeRange.From + to := "" + + if query.TimeRange.To != "now" && query.TimeRange.To != "" { + to = " and time < now() - " + strings.Replace(query.TimeRange.To, "now-", "", 1) + } + + return fmt.Sprintf("time > %s%s", from, to) } func renderSelectors(query *Query) string { diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index b477d00e6cc..f237ad53ed9 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -8,6 +8,7 @@ import ( ) func TestInfluxdbQueryBuilder(t *testing.T) { + Convey("Influxdb query builder", t, func() { builder := QueryBuilder{} @@ -31,6 +32,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) { Policy: "policy", GroupBy: []*QueryPart{groupBy1, groupBy2}, Interval: "10s", + TimeRange: tsdb.TimeRange{From: "5m"}, } rawQuery, err := builder.Build(query, queryContext) @@ -45,11 +47,24 @@ func TestInfluxdbQueryBuilder(t *testing.T) { GroupBy: []*QueryPart{groupBy1}, Tags: []*Tag{tag1, tag2}, Interval: "5s", + TimeRange: tsdb.TimeRange{From: "1h", To: "now-1m"}, } rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 1h and time < now() - 1m GROUP BY time(10s)`) + }) + + Convey("can render time range", func() { + Convey("render from: 2h to now-1h", func() { + query := Query{TimeRange: tsdb.TimeRange{From: "2h", To: "now-1h"}} + So(renderTimeFilter(&query), ShouldEqual, "time > now() - 2h and time < now() - 1h") + }) + + Convey("render from: 10m", func() { + query := Query{TimeRange: tsdb.TimeRange{From: "10m"}} + So(renderTimeFilter(&query), ShouldEqual, "time > now() - 10m") + }) }) }) } From e499e8850c02fe90651319f8a2da2f694d83d6c7 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 7 Oct 2016 11:33:17 +0200 Subject: [PATCH 16/25] fix(influxdb): fix for timerange --- pkg/tsdb/influxdb/models.go | 5 +---- pkg/tsdb/influxdb/query_builder.go | 10 +++++----- pkg/tsdb/influxdb/query_builder_test.go | 16 ++++++++-------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/pkg/tsdb/influxdb/models.go b/pkg/tsdb/influxdb/models.go index f45a14797d7..c0713c83183 100644 --- a/pkg/tsdb/influxdb/models.go +++ b/pkg/tsdb/influxdb/models.go @@ -1,7 +1,5 @@ package influxdb -import "github.com/grafana/grafana/pkg/tsdb" - type Query struct { Measurement string Policy string @@ -10,8 +8,7 @@ type Query struct { GroupBy []*QueryPart Selects []*Select - Interval string - TimeRange tsdb.TimeRange + Interval string } type Tag struct { diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index d6caf0ee906..faf6f051bbb 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -33,18 +33,18 @@ func (*QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (strin res := renderSelectors(query) res += renderMeasurement(query) res += renderWhereClause(query) - res += renderTimeFilter(query) + res += renderTimeFilter(query, queryContext) res += renderGroupBy(query) return res, nil } -func renderTimeFilter(query *Query) string { - from := "now() - " + query.TimeRange.From +func renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string { + from := "now() - " + queryContext.TimeRange.From to := "" - if query.TimeRange.To != "now" && query.TimeRange.To != "" { - to = " and time < now() - " + strings.Replace(query.TimeRange.To, "now-", "", 1) + if queryContext.TimeRange.To != "now" && queryContext.TimeRange.To != "" { + to = " and time < now() - " + strings.Replace(queryContext.TimeRange.To, "now-", "", 1) } return fmt.Sprintf("time > %s%s", from, to) diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index f237ad53ed9..26f587d479b 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -22,7 +22,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) { tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"} queryContext := &tsdb.QueryContext{ - TimeRange: tsdb.NewTimeRange("now-5m", "now"), + TimeRange: tsdb.NewTimeRange("5m", "now"), } Convey("can build simple query", func() { @@ -32,7 +32,6 @@ func TestInfluxdbQueryBuilder(t *testing.T) { Policy: "policy", GroupBy: []*QueryPart{groupBy1, groupBy2}, Interval: "10s", - TimeRange: tsdb.TimeRange{From: "5m"}, } rawQuery, err := builder.Build(query, queryContext) @@ -47,23 +46,24 @@ func TestInfluxdbQueryBuilder(t *testing.T) { GroupBy: []*QueryPart{groupBy1}, Tags: []*Tag{tag1, tag2}, Interval: "5s", - TimeRange: tsdb.TimeRange{From: "1h", To: "now-1m"}, } rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 1h and time < now() - 1m GROUP BY time(10s)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s)`) }) Convey("can render time range", func() { + query := Query{} Convey("render from: 2h to now-1h", func() { - query := Query{TimeRange: tsdb.TimeRange{From: "2h", To: "now-1h"}} - So(renderTimeFilter(&query), ShouldEqual, "time > now() - 2h and time < now() - 1h") + query := Query{} + queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("2h", "now-1h")} + So(renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h") }) Convey("render from: 10m", func() { - query := Query{TimeRange: tsdb.TimeRange{From: "10m"}} - So(renderTimeFilter(&query), ShouldEqual, "time > now() - 10m") + queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("10m", "now")} + So(renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 10m") }) }) }) From 4fafefd66a66f03fe0f4612c294a663b60b35e8d Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 7 Oct 2016 15:09:54 +0200 Subject: [PATCH 17/25] feat(influxdb): proper support for group by tags --- pkg/tsdb/influxdb/query_builder.go | 22 ++++++++++++++-------- pkg/tsdb/influxdb/query_builder_test.go | 9 +++++---- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index faf6f051bbb..846c6a4ba67 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -88,14 +88,20 @@ func renderWhereClause(query *Query) string { } func renderGroupBy(query *Query) string { - var groupBy []string - for _, group := range query.GroupBy { - groupBy = append(groupBy, group.Render("")) + groupBy := "" + for i, group := range query.GroupBy { + if i == 0 { + groupBy += " GROUP BY" + } + + if i > 0 && group.Type != "fill" { + groupBy += ", " //fill is special. fill is a creep + } else { + groupBy += " " + } + + groupBy += group.Render("") } - if len(groupBy) > 0 { - return " GROUP BY " + strings.Join(groupBy, " ") - } - - return "" + return groupBy } diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index 26f587d479b..fcc81986d4c 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -16,7 +16,8 @@ func TestInfluxdbQueryBuilder(t *testing.T) { qp2, _ := NewQueryPart("mean", []string{}) groupBy1, _ := NewQueryPart("time", []string{"$interval"}) - groupBy2, _ := NewQueryPart("fill", []string{"null"}) + groupBy2, _ := NewQueryPart("tag", []string{"datacenter"}) + groupBy3, _ := NewQueryPart("fill", []string{"null"}) tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="} tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"} @@ -30,7 +31,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) { Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", Policy: "policy", - GroupBy: []*QueryPart{groupBy1, groupBy2}, + GroupBy: []*QueryPart{groupBy1, groupBy3}, Interval: "10s", } @@ -43,14 +44,14 @@ func TestInfluxdbQueryBuilder(t *testing.T) { query := &Query{ Selects: []*Select{{*qp1, *qp2}}, Measurement: "cpu", - GroupBy: []*QueryPart{groupBy1}, + GroupBy: []*QueryPart{groupBy1, groupBy2, groupBy3}, Tags: []*Tag{tag1, tag2}, Interval: "5s", } rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s), "datacenter" fill(null)`) }) Convey("can render time range", func() { From 81443bf8b446346017c468bfacc70c45a6366b28 Mon Sep 17 00:00:00 2001 From: Eric Perrino Date: Sat, 8 Oct 2016 01:20:45 -0500 Subject: [PATCH 18/25] Added a state parameter for all OAuth requests --- pkg/api/login_oauth.go | 20 +++++++++++++++++++- pkg/middleware/session.go | 1 + 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/api/login_oauth.go b/pkg/api/login_oauth.go index bc222361b25..3f1a0cadaa9 100644 --- a/pkg/api/login_oauth.go +++ b/pkg/api/login_oauth.go @@ -3,6 +3,8 @@ package api import ( "errors" "fmt" + "crypto/rand" + "encoding/base64" "golang.org/x/oauth2" @@ -14,6 +16,12 @@ import ( "github.com/grafana/grafana/pkg/social" ) +func GenStateString() string { + rnd := make([]byte, 32) + rand.Read(rnd) + return base64.StdEncoding.EncodeToString(rnd) +} + func OAuthLogin(ctx *middleware.Context) { if setting.OAuthService == nil { ctx.Handle(404, "login.OAuthLogin(oauth service not enabled)", nil) @@ -29,7 +37,17 @@ func OAuthLogin(ctx *middleware.Context) { code := ctx.Query("code") if code == "" { - ctx.Redirect(connect.AuthCodeURL("", oauth2.AccessTypeOnline)) + state := GenStateString() + ctx.Session.Set(middleware.SESS_KEY_OAUTH_STATE, state) + ctx.Redirect(connect.AuthCodeURL(state, oauth2.AccessTypeOnline)) + return + } + + // verify state string + savedState := ctx.Session.Get(middleware.SESS_KEY_OAUTH_STATE).(string) + queryState := ctx.Query("state") + if savedState != queryState { + ctx.Handle(500, "login.OAuthLogin(state mismatch)", nil) return } diff --git a/pkg/middleware/session.go b/pkg/middleware/session.go index ee6462be37a..d575189f4de 100644 --- a/pkg/middleware/session.go +++ b/pkg/middleware/session.go @@ -13,6 +13,7 @@ import ( const ( SESS_KEY_USERID = "uid" + SESS_KEY_OAUTH_STATE = "state" ) var sessionManager *session.Manager From 0bfb94dc6fc6419ff89f1d74f838a6130e905c8a Mon Sep 17 00:00:00 2001 From: bergquist Date: Mon, 10 Oct 2016 07:12:21 +0200 Subject: [PATCH 19/25] feat(tsdb): add interval calculator --- pkg/tsdb/influxdb/query_builder.go | 12 +- pkg/tsdb/influxdb/query_builder_test.go | 4 +- pkg/tsdb/influxdb/query_part.go | 22 ++-- pkg/tsdb/influxdb/query_part_test.go | 21 ++-- pkg/tsdb/influxdb/response_parser.go | 8 +- pkg/tsdb/interval.go | 149 ++++++++++++++++++++++++ pkg/tsdb/interval_test.go | 57 +++++++++ 7 files changed, 244 insertions(+), 29 deletions(-) create mode 100644 pkg/tsdb/interval.go create mode 100644 pkg/tsdb/interval_test.go diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index 846c6a4ba67..5c82a41e78a 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -30,11 +30,11 @@ func renderTags(query *Query) []string { } func (*QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { - res := renderSelectors(query) + res := renderSelectors(query, queryContext) res += renderMeasurement(query) res += renderWhereClause(query) res += renderTimeFilter(query, queryContext) - res += renderGroupBy(query) + res += renderGroupBy(query, queryContext) return res, nil } @@ -50,7 +50,7 @@ func renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string { return fmt.Sprintf("time > %s%s", from, to) } -func renderSelectors(query *Query) string { +func renderSelectors(query *Query, queryContext *tsdb.QueryContext) string { res := "SELECT " var selectors []string @@ -58,7 +58,7 @@ func renderSelectors(query *Query) string { stk := "" for _, s := range *sel { - stk = s.Render(stk) + stk = s.Render(queryContext, stk) } selectors = append(selectors, stk) } @@ -87,7 +87,7 @@ func renderWhereClause(query *Query) string { return res } -func renderGroupBy(query *Query) string { +func renderGroupBy(query *Query, queryContext *tsdb.QueryContext) string { groupBy := "" for i, group := range query.GroupBy { if i == 0 { @@ -100,7 +100,7 @@ func renderGroupBy(query *Query) string { groupBy += " " } - groupBy += group.Render("") + groupBy += group.Render(queryContext, "") } return groupBy diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index fcc81986d4c..083d09b360e 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -37,7 +37,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) { rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(10s) fill(null)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(200ms) fill(null)`) }) Convey("can build query with group bys", func() { @@ -51,7 +51,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) { rawQuery, err := builder.Build(query, queryContext) So(err, ShouldBeNil) - So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s), "datacenter" fill(null)`) + So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(200ms), "datacenter" fill(null)`) }) Convey("can render time range", func() { diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index ba14bcac665..bace83246c2 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -3,6 +3,8 @@ package influxdb import ( "fmt" "strings" + + "github.com/grafana/grafana/pkg/tsdb" ) var renders map[string]QueryDefinition @@ -13,7 +15,7 @@ type DefinitionParameters struct { } type QueryDefinition struct { - Renderer func(part *QueryPart, innerExpr string) string + Renderer func(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string Params []DefinitionParameters } @@ -83,17 +85,17 @@ func init() { renders["alias"] = QueryDefinition{Renderer: aliasRenderer} } -func fieldRenderer(part *QueryPart, innerExpr string) string { +func fieldRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string { if part.Params[0] == "*" { return "*" } return fmt.Sprintf(`"%s"`, part.Params[0]) } -func functionRenderer(part *QueryPart, innerExpr string) string { +func functionRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string { for i, v := range part.Params { if v == "$interval" { - part.Params[i] = "10s" + part.Params[i] = tsdb.CalculateInterval(queryContext.TimeRange) } } @@ -106,16 +108,16 @@ func functionRenderer(part *QueryPart, innerExpr string) string { return fmt.Sprintf("%s(%s)", part.Type, params) } -func suffixRenderer(part *QueryPart, innerExpr string) string { +func suffixRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string { return fmt.Sprintf("%s %s", innerExpr, part.Params[0]) } -func aliasRenderer(part *QueryPart, innerExpr string) string { +func aliasRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string { return fmt.Sprintf(`%s AS "%s"`, innerExpr, part.Params[0]) } -func (r QueryDefinition) Render(part *QueryPart, innerExpr string) string { - return r.Renderer(part, innerExpr) +func (r QueryDefinition) Render(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string { + return r.Renderer(queryContext, part, innerExpr) } func NewQueryPart(typ string, params []string) (*QueryPart, error) { @@ -138,6 +140,6 @@ type QueryPart struct { Params []string } -func (qp *QueryPart) Render(expr string) string { - return qp.Def.Renderer(qp, expr) +func (qp *QueryPart) Render(queryContext *tsdb.QueryContext, expr string) string { + return qp.Def.Renderer(queryContext, qp, expr) } diff --git a/pkg/tsdb/influxdb/query_part_test.go b/pkg/tsdb/influxdb/query_part_test.go index c1b2a22a5a1..bd2d544d3e1 100644 --- a/pkg/tsdb/influxdb/query_part_test.go +++ b/pkg/tsdb/influxdb/query_part_test.go @@ -3,17 +3,22 @@ package influxdb import ( "testing" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" ) func TestInfluxdbQueryPart(t *testing.T) { Convey("Influxdb query parts", t, func() { + queryContext := &tsdb.QueryContext{ + TimeRange: tsdb.NewTimeRange("5m", "now"), + } + Convey("render field ", func() { part, err := NewQueryPart("field", []string{"value"}) So(err, ShouldBeNil) - res := part.Render("value") + res := part.Render(queryContext, "value") So(res, ShouldEqual, `"value"`) }) @@ -21,7 +26,7 @@ func TestInfluxdbQueryPart(t *testing.T) { part, err := NewQueryPart("derivative", []string{"10s"}) So(err, ShouldBeNil) - res := part.Render("mean(value)") + res := part.Render(queryContext, "mean(value)") So(res, ShouldEqual, "derivative(mean(value), 10s)") }) @@ -29,7 +34,7 @@ func TestInfluxdbQueryPart(t *testing.T) { part, err := NewQueryPart("bottom", []string{"3"}) So(err, ShouldBeNil) - res := part.Render("value") + res := part.Render(queryContext, "value") So(res, ShouldEqual, "bottom(value, 3)") }) @@ -37,15 +42,15 @@ func TestInfluxdbQueryPart(t *testing.T) { part, err := NewQueryPart("time", []string{"$interval"}) So(err, ShouldBeNil) - res := part.Render("") - So(res, ShouldEqual, "time(10s)") + res := part.Render(queryContext, "") + So(res, ShouldEqual, "time(200ms)") }) Convey("render spread", func() { part, err := NewQueryPart("spread", []string{}) So(err, ShouldBeNil) - res := part.Render("value") + res := part.Render(queryContext, "value") So(res, ShouldEqual, `spread(value)`) }) @@ -53,7 +58,7 @@ func TestInfluxdbQueryPart(t *testing.T) { part, err := NewQueryPart("math", []string{"/ 100"}) So(err, ShouldBeNil) - res := part.Render("mean(value)") + res := part.Render(queryContext, "mean(value)") So(res, ShouldEqual, "mean(value) / 100") }) @@ -61,7 +66,7 @@ func TestInfluxdbQueryPart(t *testing.T) { part, err := NewQueryPart("alias", []string{"test"}) So(err, ShouldBeNil) - res := part.Render("mean(value)") + res := part.Render(queryContext, "mean(value)") So(res, ShouldEqual, `mean(value) AS "test"`) }) }) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 81c90e65419..267ebc99c63 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -18,9 +18,11 @@ func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult { rp.parseResult(result.Series, queryRes) } - for _, serie := range queryRes.Series { - glog.Debug("result", "name", serie.Name, "points", serie.Points) - } + /* + for _, serie := range queryRes.Series { + glog.Debug("result", "name", serie.Name, "points", serie.Points) + } + */ return queryRes } diff --git a/pkg/tsdb/interval.go b/pkg/tsdb/interval.go new file mode 100644 index 00000000000..7bcdbfee31a --- /dev/null +++ b/pkg/tsdb/interval.go @@ -0,0 +1,149 @@ +package tsdb + +import ( + "fmt" + "time" + + "github.com/grafana/grafana/pkg/log" +) + +var ( + defaultRes int64 = 1500 + minInterval time.Duration = 1 * time.Millisecond + year time.Duration = time.Hour * 24 * 365 + day time.Duration = time.Hour * 24 * 365 +) + +func CalculateInterval(timerange *TimeRange) string { + interval := time.Duration((timerange.MustGetTo().UnixNano() - timerange.MustGetFrom().UnixNano()) / defaultRes) + + log.Info2("res", "resinMs", time.Duration(interval).String()) + + if interval < minInterval { + return formatDuration(minInterval) + } + + return formatDuration(roundInterval(interval)) +} + +func formatDuration(inter time.Duration) string { + if inter >= year { + return fmt.Sprintf("%dy", inter/year) + } + + if inter >= day { + return fmt.Sprintf("%dd", inter/day) + } + + if inter >= time.Hour { + return fmt.Sprintf("%dh", inter/time.Hour) + } + + if inter >= time.Minute { + return fmt.Sprintf("%dm", inter/time.Minute) + } + + if inter >= time.Second { + return fmt.Sprintf("%ds", inter/time.Second) + } + + if inter >= time.Millisecond { + return fmt.Sprintf("%dms", inter/time.Millisecond) + } + + return "1ms" +} + +func roundInterval(interval time.Duration) time.Duration { + switch true { + // 0.015s + case interval <= 15*time.Millisecond: + return time.Millisecond * 10 // 0.01s + // 0.035s + case interval <= 35*time.Millisecond: + return time.Millisecond * 20 // 0.02s + // 0.075s + case interval <= 75*time.Millisecond: + return time.Millisecond * 50 // 0.05s + // 0.15s + case interval <= 150*time.Millisecond: + return time.Millisecond * 100 // 0.1s + // 0.35s + case interval <= 350*time.Millisecond: + return time.Millisecond * 200 // 0.2s + // 0.75s + case interval <= 750*time.Millisecond: + return time.Millisecond * 500 // 0.5s + // 1.5s + case interval <= 1500*time.Millisecond: + return time.Millisecond * 1000 // 1s + // 3.5s + case interval <= 3500*time.Millisecond: + return time.Millisecond * 2000 // 2s + // 7.5s + case interval <= 7500*time.Millisecond: + return time.Millisecond * 5000 // 5s + // 12.5s + case interval <= 12500*time.Millisecond: + return time.Millisecond * 10000 // 10s + // 17.5s + case interval <= 17500*time.Millisecond: + return time.Millisecond * 15000 // 15s + // 25s + case interval <= 25000*time.Millisecond: + return time.Millisecond * 20000 // 20s + // 45s + case interval <= 45000*time.Millisecond: + return time.Millisecond * 30000 // 30s + // 1.5m + case interval <= 90000*time.Millisecond: + return time.Millisecond * 60000 // 1m + // 3.5m + case interval <= 210000*time.Millisecond: + return time.Millisecond * 120000 // 2m + // 7.5m + case interval <= 450000*time.Millisecond: + return time.Millisecond * 300000 // 5m + // 12.5m + case interval <= 750000*time.Millisecond: + return time.Millisecond * 600000 // 10m + // 12.5m + case interval <= 1050000*time.Millisecond: + return time.Millisecond * 900000 // 15m + // 25m + case interval <= 1500000*time.Millisecond: + return time.Millisecond * 1200000 // 20m + // 45m + case interval <= 2700000*time.Millisecond: + return time.Millisecond * 1800000 // 30m + // 1.5h + case interval <= 5400000*time.Millisecond: + return time.Millisecond * 3600000 // 1h + // 2.5h + case interval <= 9000000*time.Millisecond: + return time.Millisecond * 7200000 // 2h + // 4.5h + case interval <= 16200000*time.Millisecond: + return time.Millisecond * 10800000 // 3h + // 9h + case interval <= 32400000*time.Millisecond: + return time.Millisecond * 21600000 // 6h + // 24h + case interval <= 86400000*time.Millisecond: + return time.Millisecond * 43200000 // 12h + // 48h + case interval <= 172800000*time.Millisecond: + return time.Millisecond * 86400000 // 24h + // 1w + case interval <= 604800000*time.Millisecond: + return time.Millisecond * 86400000 // 24h + // 3w + case interval <= 1814400000*time.Millisecond: + return time.Millisecond * 604800000 // 1w + // 2y + case interval < 3628800000*time.Millisecond: + return time.Millisecond * 2592000000 // 30d + default: + return time.Millisecond * 31536000000 // 1y + } +} diff --git a/pkg/tsdb/interval_test.go b/pkg/tsdb/interval_test.go new file mode 100644 index 00000000000..c06e1879668 --- /dev/null +++ b/pkg/tsdb/interval_test.go @@ -0,0 +1,57 @@ +package tsdb + +import ( + "testing" + "time" + + "github.com/grafana/grafana/pkg/setting" + . "github.com/smartystreets/goconvey/convey" +) + +func TestInterval(t *testing.T) { + Convey("Default interval ", t, func() { + setting.NewConfigContext(&setting.CommandLineArgs{ + HomePath: "../../", + }) + + Convey("for 5min", func() { + tr := NewTimeRange("5m", "now") + + interval := CalculateInterval(tr) + So(interval, ShouldEqual, "200ms") + }) + + Convey("for 15min", func() { + tr := NewTimeRange("15m", "now") + + interval := CalculateInterval(tr) + So(interval, ShouldEqual, "500ms") + }) + + Convey("for 30min", func() { + tr := NewTimeRange("30m", "now") + + interval := CalculateInterval(tr) + So(interval, ShouldEqual, "1s") + }) + + Convey("for 1h", func() { + tr := NewTimeRange("1h", "now") + + interval := CalculateInterval(tr) + So(interval, ShouldEqual, "2s") + }) + + Convey("Round interval", func() { + So(roundInterval(time.Millisecond*30), ShouldEqual, time.Millisecond*20) + So(roundInterval(time.Millisecond*45), ShouldEqual, time.Millisecond*50) + }) + + Convey("Format value", func() { + So(formatDuration(time.Second*61), ShouldEqual, "1m") + So(formatDuration(time.Millisecond*30), ShouldEqual, "30ms") + So(formatDuration(time.Hour*23), ShouldEqual, "23h") + So(formatDuration(time.Hour*24*367), ShouldEqual, "1y") + }) + }) +} From f919d04e3cc389eb8f3bc1bc2f8220c69406d659 Mon Sep 17 00:00:00 2001 From: bergquist Date: Mon, 10 Oct 2016 09:52:53 +0200 Subject: [PATCH 20/25] docs(tsdb): improve fill comment --- pkg/tsdb/influxdb/query_builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index 5c82a41e78a..cb1be332f1c 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -95,7 +95,7 @@ func renderGroupBy(query *Query, queryContext *tsdb.QueryContext) string { } if i > 0 && group.Type != "fill" { - groupBy += ", " //fill is special. fill is a creep + groupBy += ", " //fill is so very special. fill is a creep, fill is a weirdo } else { groupBy += " " } From 95b9f472d1a5beb8cfc59f403ad4a411b8fb6704 Mon Sep 17 00:00:00 2001 From: bergquist Date: Mon, 10 Oct 2016 11:16:27 +0200 Subject: [PATCH 21/25] tech(influxdb): remove unused logging --- pkg/tsdb/influxdb/response_parser.go | 6 ------ pkg/tsdb/interval.go | 4 ---- 2 files changed, 10 deletions(-) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 267ebc99c63..9f83cc771ae 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -18,12 +18,6 @@ func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult { rp.parseResult(result.Series, queryRes) } - /* - for _, serie := range queryRes.Series { - glog.Debug("result", "name", serie.Name, "points", serie.Points) - } - */ - return queryRes } diff --git a/pkg/tsdb/interval.go b/pkg/tsdb/interval.go index 7bcdbfee31a..71caf122c13 100644 --- a/pkg/tsdb/interval.go +++ b/pkg/tsdb/interval.go @@ -3,8 +3,6 @@ package tsdb import ( "fmt" "time" - - "github.com/grafana/grafana/pkg/log" ) var ( @@ -17,8 +15,6 @@ var ( func CalculateInterval(timerange *TimeRange) string { interval := time.Duration((timerange.MustGetTo().UnixNano() - timerange.MustGetFrom().UnixNano()) / defaultRes) - log.Info2("res", "resinMs", time.Duration(interval).String()) - if interval < minInterval { return formatDuration(minInterval) } From f609623abd86520bebbd63923c2c01ac8717eaf0 Mon Sep 17 00:00:00 2001 From: bergquist Date: Mon, 10 Oct 2016 11:34:52 +0200 Subject: [PATCH 22/25] style(influxdb): tidy up code --- pkg/services/alerting/conditions/evaluator.go | 2 +- pkg/tsdb/influxdb/influxdb.go | 76 +++++++++---------- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/pkg/services/alerting/conditions/evaluator.go b/pkg/services/alerting/conditions/evaluator.go index 1c154e17ec2..2ab0d085140 100644 --- a/pkg/services/alerting/conditions/evaluator.go +++ b/pkg/services/alerting/conditions/evaluator.go @@ -122,7 +122,7 @@ func NewAlertEvaluator(model *simplejson.Json) (AlertEvaluator, error) { return &NoDataEvaluator{}, nil } - return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type"} + return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type: " + typ} } func inSlice(a string, list []string) bool { diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 075959e9e31..34c88d62073 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -51,6 +51,44 @@ func init() { } } +func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { + result := &tsdb.BatchResult{} + + query, err := e.getQuery(queries, context) + if err != nil { + return result.WithError(err) + } + + glog.Debug("Influxdb query", "raw query", query) + + req, err := e.createRequest(query) + if err != nil { + return result.WithError(err) + } + + resp, err := ctxhttp.Do(ctx, HttpClient, req) + if err != nil { + return result.WithError(err) + } + + if resp.StatusCode/100 != 2 { + return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status)) + } + + var response Response + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + err = dec.Decode(&response) + if err != nil { + return result.WithError(err) + } + + result.QueryResults = make(map[string]*tsdb.QueryResult) + result.QueryResults["A"] = e.ResponseParser.Parse(&response) + + return result +} + func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) { for _, v := range queries { query, err := e.QueryParser.Parse(v.Model) @@ -92,41 +130,3 @@ func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) { glog.Debug("influxdb request", "url", req.URL.String()) return req, nil } - -func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { - result := &tsdb.BatchResult{} - - query, err := e.getQuery(queries, context) - if err != nil { - return result.WithError(err) - } - - glog.Debug("Influxdb query", "raw query", query) - - req, err := e.createRequest(query) - if err != nil { - return result.WithError(err) - } - - resp, err := ctxhttp.Do(ctx, HttpClient, req) - if err != nil { - return result.WithError(err) - } - - if resp.StatusCode/100 != 2 { - return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status)) - } - - var response Response - dec := json.NewDecoder(resp.Body) - dec.UseNumber() - err = dec.Decode(&response) - if err != nil { - return result.WithError(err) - } - - result.QueryResults = make(map[string]*tsdb.QueryResult) - result.QueryResults["A"] = e.ResponseParser.Parse(&response) - - return result -} From 8d96262106448e8fb5b4a03b151b16b50037501e Mon Sep 17 00:00:00 2001 From: bergquist Date: Mon, 10 Oct 2016 11:58:06 +0200 Subject: [PATCH 23/25] chore(tsdb): tidy up code --- pkg/tsdb/influxdb/influxdb.go | 2 +- pkg/tsdb/influxdb/model_parser.go | 1 - pkg/tsdb/influxdb/query_builder.go | 34 ++++++++++---------- pkg/tsdb/influxdb/query_builder_test.go | 5 +-- pkg/tsdb/influxdb/response_parser.go | 42 ++++++++++++++----------- 5 files changed, 45 insertions(+), 39 deletions(-) diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 34c88d62073..2666cb0bd71 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -127,6 +127,6 @@ func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) { req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) } - glog.Debug("influxdb request", "url", req.URL.String()) + glog.Debug("Influxdb request", "url", req.URL.String()) return req, nil } diff --git a/pkg/tsdb/influxdb/model_parser.go b/pkg/tsdb/influxdb/model_parser.go index 7d51d087b5d..3cc2de36280 100644 --- a/pkg/tsdb/influxdb/model_parser.go +++ b/pkg/tsdb/influxdb/model_parser.go @@ -134,7 +134,6 @@ func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, } return qp, nil - //return &QueryPart{Type: typ, Params: params}, nil } func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) { diff --git a/pkg/tsdb/influxdb/query_builder.go b/pkg/tsdb/influxdb/query_builder.go index cb1be332f1c..ecea8a7ebbb 100644 --- a/pkg/tsdb/influxdb/query_builder.go +++ b/pkg/tsdb/influxdb/query_builder.go @@ -9,7 +9,17 @@ import ( type QueryBuilder struct{} -func renderTags(query *Query) []string { +func (qb *QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { + res := qb.renderSelectors(query, queryContext) + res += qb.renderMeasurement(query) + res += qb.renderWhereClause(query) + res += qb.renderTimeFilter(query, queryContext) + res += qb.renderGroupBy(query, queryContext) + + return res, nil +} + +func (qb *QueryBuilder) renderTags(query *Query) []string { var res []string for i, tag := range query.Tags { str := "" @@ -29,17 +39,7 @@ func renderTags(query *Query) []string { return res } -func (*QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) { - res := renderSelectors(query, queryContext) - res += renderMeasurement(query) - res += renderWhereClause(query) - res += renderTimeFilter(query, queryContext) - res += renderGroupBy(query, queryContext) - - return res, nil -} - -func renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string { +func (qb *QueryBuilder) renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string { from := "now() - " + queryContext.TimeRange.From to := "" @@ -50,7 +50,7 @@ func renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string { return fmt.Sprintf("time > %s%s", from, to) } -func renderSelectors(query *Query, queryContext *tsdb.QueryContext) string { +func (qb *QueryBuilder) renderSelectors(query *Query, queryContext *tsdb.QueryContext) string { res := "SELECT " var selectors []string @@ -66,7 +66,7 @@ func renderSelectors(query *Query, queryContext *tsdb.QueryContext) string { return res + strings.Join(selectors, ", ") } -func renderMeasurement(query *Query) string { +func (qb *QueryBuilder) renderMeasurement(query *Query) string { policy := "" if query.Policy == "" || query.Policy == "default" { policy = "" @@ -76,9 +76,9 @@ func renderMeasurement(query *Query) string { return fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement) } -func renderWhereClause(query *Query) string { +func (qb *QueryBuilder) renderWhereClause(query *Query) string { res := " WHERE " - conditions := renderTags(query) + conditions := qb.renderTags(query) res += strings.Join(conditions, " ") if len(conditions) > 0 { res += " AND " @@ -87,7 +87,7 @@ func renderWhereClause(query *Query) string { return res } -func renderGroupBy(query *Query, queryContext *tsdb.QueryContext) string { +func (qb *QueryBuilder) renderGroupBy(query *Query, queryContext *tsdb.QueryContext) string { groupBy := "" for i, group := range query.GroupBy { if i == 0 { diff --git a/pkg/tsdb/influxdb/query_builder_test.go b/pkg/tsdb/influxdb/query_builder_test.go index 083d09b360e..c8f1fb914dd 100644 --- a/pkg/tsdb/influxdb/query_builder_test.go +++ b/pkg/tsdb/influxdb/query_builder_test.go @@ -56,15 +56,16 @@ func TestInfluxdbQueryBuilder(t *testing.T) { Convey("can render time range", func() { query := Query{} + builder := &QueryBuilder{} Convey("render from: 2h to now-1h", func() { query := Query{} queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("2h", "now-1h")} - So(renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h") + So(builder.renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h") }) Convey("render from: 10m", func() { queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("10m", "now")} - So(renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 10m") + So(builder.renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 10m") }) }) }) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 9f83cc771ae..44afa910b27 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -15,33 +15,39 @@ func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult { queryRes := tsdb.NewQueryResult() for _, result := range response.Results { - rp.parseResult(result.Series, queryRes) + queryRes.Series = append(queryRes.Series, rp.transformRows(result.Series, queryRes)...) } return queryRes } -func (rp *ResponseParser) parseResult(result []Row, queryResult *tsdb.QueryResult) { - for _, r := range result { - for columnIndex, column := range r.Columns { +func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult) tsdb.TimeSeriesSlice { + var result tsdb.TimeSeriesSlice + + for _, row := range rows { + for columnIndex, column := range row.Columns { if column == "time" { continue } var points tsdb.TimeSeriesPoints - for _, k := range r.Values { - points = append(points, rp.parseTimepoint(k, columnIndex)) + for _, valuePair := range row.Values { + point, err := rp.parseTimepoint(valuePair, columnIndex) + if err == nil { + points = append(points, point) + } } - - queryResult.Series = append(queryResult.Series, &tsdb.TimeSeries{ - Name: rp.formatName(r, column), + result = append(result, &tsdb.TimeSeries{ + Name: rp.formatSerieName(row, column), Points: points, }) } } + + return result } -func (rp *ResponseParser) formatName(row Row, column string) string { +func (rp *ResponseParser) formatSerieName(row Row, column string) string { var tags []string for k, v := range row.Tags { @@ -56,30 +62,30 @@ func (rp *ResponseParser) formatName(row Row, column string) string { return fmt.Sprintf("%s.%s%s", row.Name, column, tagText) } -func (rp *ResponseParser) parseTimepoint(k []interface{}, valuePosition int) tsdb.TimePoint { - var value null.Float = rp.parseValue(k[valuePosition]) +func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) { + var value null.Float = rp.parseValue(valuePair[valuePosition]) - timestampNumber, _ := k[0].(json.Number) + timestampNumber, _ := valuePair[0].(json.Number) timestamp, err := timestampNumber.Float64() if err != nil { - glog.Error("Invalid timestamp format. This should never happen!") + return tsdb.TimePoint{}, err } - return tsdb.NewTimePoint(value, timestamp) + return tsdb.NewTimePoint(value, timestamp), nil } func (rp *ResponseParser) parseValue(value interface{}) null.Float { - num, ok := value.(json.Number) + number, ok := value.(json.Number) if !ok { return null.FloatFromPtr(nil) } - fvalue, err := num.Float64() + fvalue, err := number.Float64() if err == nil { return null.FloatFrom(fvalue) } - ivalue, err := num.Int64() + ivalue, err := number.Int64() if err == nil { return null.FloatFrom(float64(ivalue)) } From d4fd1c82e3e6742342f108434cebe6136c0c6732 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Mon, 10 Oct 2016 19:47:16 +0200 Subject: [PATCH 24/25] fix(alerting): added step any to alert threshold input fields, fixes #6205 --- public/app/features/alerting/partials/alert_tab.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/app/features/alerting/partials/alert_tab.html b/public/app/features/alerting/partials/alert_tab.html index bb6fe7547b2..07ff28dcddb 100644 --- a/public/app/features/alerting/partials/alert_tab.html +++ b/public/app/features/alerting/partials/alert_tab.html @@ -52,9 +52,9 @@
- + - +