Merge pull request #6221 from grafana/alerting_influxdb
Alerting support for influxdb
@@ -1,5 +1,7 @@
 influxdb:
-  image: tutum/influxdb:0.12
+  #image: influxdb/influxdb:1.0-alpine
+  image: influxdb:latest
   container_name: influxdb
   ports:
     - "2004:2004"
+    - "8083:8083"
@@ -20,6 +20,7 @@ import (
     _ "github.com/grafana/grafana/pkg/services/alerting/conditions"
     _ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
     _ "github.com/grafana/grafana/pkg/tsdb/graphite"
+    _ "github.com/grafana/grafana/pkg/tsdb/influxdb"
     _ "github.com/grafana/grafana/pkg/tsdb/prometheus"
     _ "github.com/grafana/grafana/pkg/tsdb/testdata"
 )
@@ -122,7 +122,7 @@ func NewAlertEvaluator(model *simplejson.Json) (AlertEvaluator, error) {
         return &NoDataEvaluator{}, nil
     }

-    return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type"}
+    return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type: " + typ}
 }

 func inSlice(a string, list []string) bool {
@@ -3,6 +3,8 @@ package conditions
 import (
     "testing"

+    "gopkg.in/guregu/null.v3"
+
     "github.com/grafana/grafana/pkg/tsdb"
     . "github.com/smartystreets/goconvey/convey"
 )
@@ -43,7 +45,7 @@ func testReducer(typ string, datapoints ...float64) float64 {
     }

     for idx := range datapoints {
-        series.Points = append(series.Points, tsdb.NewTimePoint(datapoints[idx], 1234134))
+        series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(datapoints[idx]), 1234134))
     }

     return reducer.Reduce(series).Float64
pkg/tsdb/influxdb/influxdb.go (new file, 132 lines)
@@ -0,0 +1,132 @@
package influxdb

import (
    "context"
    "crypto/tls"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "path"
    "time"

    "golang.org/x/net/context/ctxhttp"

    "github.com/grafana/grafana/pkg/log"
    "github.com/grafana/grafana/pkg/tsdb"
)

type InfluxDBExecutor struct {
    *tsdb.DataSourceInfo
    QueryParser    *InfluxdbQueryParser
    QueryBuilder   *QueryBuilder
    ResponseParser *ResponseParser
}

func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
    return &InfluxDBExecutor{
        DataSourceInfo: dsInfo,
        QueryParser:    &InfluxdbQueryParser{},
        QueryBuilder:   &QueryBuilder{},
        ResponseParser: &ResponseParser{},
    }
}

var (
    glog       log.Logger
    HttpClient *http.Client
)

func init() {
    glog = log.New("tsdb.influxdb")
    tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)

    tr := &http.Transport{
        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
    }

    HttpClient = &http.Client{
        Timeout:   time.Duration(15 * time.Second),
        Transport: tr,
    }
}

func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
    result := &tsdb.BatchResult{}

    query, err := e.getQuery(queries, context)
    if err != nil {
        return result.WithError(err)
    }

    glog.Debug("Influxdb query", "raw query", query)

    req, err := e.createRequest(query)
    if err != nil {
        return result.WithError(err)
    }

    resp, err := ctxhttp.Do(ctx, HttpClient, req)
    if err != nil {
        return result.WithError(err)
    }

    if resp.StatusCode/100 != 2 {
        return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status))
    }

    var response Response
    dec := json.NewDecoder(resp.Body)
    dec.UseNumber()
    err = dec.Decode(&response)
    if err != nil {
        return result.WithError(err)
    }

    result.QueryResults = make(map[string]*tsdb.QueryResult)
    result.QueryResults["A"] = e.ResponseParser.Parse(&response)

    return result
}

func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) {
    for _, v := range queries {
        query, err := e.QueryParser.Parse(v.Model)
        if err != nil {
            return "", err
        }

        rawQuery, err := e.QueryBuilder.Build(query, context)
        if err != nil {
            return "", err
        }

        return rawQuery, nil
    }

    return "", fmt.Errorf("query request contains no queries")
}

func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) {
    u, _ := url.Parse(e.Url)
    u.Path = path.Join(u.Path, "query")

    req, err := http.NewRequest(http.MethodGet, u.String(), nil)
    if err != nil {
        return nil, err
    }

    params := req.URL.Query()
    params.Set("q", query)
    params.Set("db", e.Database)
    params.Set("epoch", "s")
    req.URL.RawQuery = params.Encode()

    req.Header.Set("User-Agent", "Grafana")
    if e.BasicAuth {
        req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
    }

    glog.Debug("Influxdb request", "url", req.URL.String())
    return req, nil
}
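createRequest above issues a plain GET against the datasource's /query endpoint, with the rendered query, database, and epoch passed as URL parameters. A minimal sketch of the resulting request, assuming the same package as above and a hypothetical local datasource (the URL and database name are illustrative, not from this change):

    // Sketch only: hypothetical datasource values.
    e := NewInfluxDBExecutor(&tsdb.DataSourceInfo{
        Url:      "http://localhost:8086", // hypothetical
        Database: "site",                  // hypothetical
    }).(*InfluxDBExecutor)
    req, _ := e.createRequest(`SELECT mean("value") FROM "cpu" WHERE time > now() - 5m`)
    fmt.Println(req.URL.String())
    // http://localhost:8086/query?db=site&epoch=s&q=SELECT+mean%28%22value%22%29+FROM+...
    // (Encode sorts the parameters alphabetically: db, epoch, q.)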
pkg/tsdb/influxdb/model_parser.go (new file, 153 lines)
@@ -0,0 +1,153 @@
package influxdb

import (
    "strconv"

    "github.com/grafana/grafana/pkg/components/simplejson"
)

type InfluxdbQueryParser struct{}

func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json) (*Query, error) {
    policy := model.Get("policy").MustString("default")

    measurement, err := model.Get("measurement").String()
    if err != nil {
        return nil, err
    }

    resultFormat, err := model.Get("resultFormat").String()
    if err != nil {
        return nil, err
    }

    tags, err := qp.parseTags(model)
    if err != nil {
        return nil, err
    }

    groupBys, err := qp.parseGroupBy(model)
    if err != nil {
        return nil, err
    }

    selects, err := qp.parseSelects(model)
    if err != nil {
        return nil, err
    }

    return &Query{
        Measurement:  measurement,
        Policy:       policy,
        ResultFormat: resultFormat,
        GroupBy:      groupBys,
        Tags:         tags,
        Selects:      selects,
    }, nil
}

func (qp *InfluxdbQueryParser) parseSelects(model *simplejson.Json) ([]*Select, error) {
    var result []*Select

    for _, selectObj := range model.Get("select").MustArray() {
        selectJson := simplejson.NewFromAny(selectObj)
        var parts Select

        for _, partObj := range selectJson.MustArray() {
            part := simplejson.NewFromAny(partObj)
            queryPart, err := qp.parseQueryPart(part)
            if err != nil {
                return nil, err
            }

            parts = append(parts, *queryPart)
        }

        result = append(result, &parts)
    }

    return result, nil
}

func (*InfluxdbQueryParser) parseTags(model *simplejson.Json) ([]*Tag, error) {
    var result []*Tag
    for _, t := range model.Get("tags").MustArray() {
        tagJson := simplejson.NewFromAny(t)
        tag := &Tag{}
        var err error

        tag.Key, err = tagJson.Get("key").String()
        if err != nil {
            return nil, err
        }

        tag.Value, err = tagJson.Get("value").String()
        if err != nil {
            return nil, err
        }

        operator, err := tagJson.Get("operator").String()
        if err == nil {
            tag.Operator = operator
        }

        condition, err := tagJson.Get("condition").String()
        if err == nil {
            tag.Condition = condition
        }

        result = append(result, tag)
    }

    return result, nil
}

func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, error) {
    typ, err := model.Get("type").String()
    if err != nil {
        return nil, err
    }

    var params []string
    for _, paramObj := range model.Get("params").MustArray() {
        param := simplejson.NewFromAny(paramObj)

        stringParam, err := param.String()
        if err == nil {
            params = append(params, stringParam)
            continue
        }

        intParam, err := param.Int()
        if err == nil {
            params = append(params, strconv.Itoa(intParam))
            continue
        }

        return nil, err
    }

    qp, err := NewQueryPart(typ, params)
    if err != nil {
        return nil, err
    }

    return qp, nil
}

func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) {
    var result []*QueryPart

    for _, groupObj := range model.Get("groupBy").MustArray() {
        groupJson := simplejson.NewFromAny(groupObj)
        queryPart, err := qp.parseQueryPart(groupJson)
        if err != nil {
            return nil, err
        }
        result = append(result, queryPart)
    }

    return result, nil
}
pkg/tsdb/influxdb/model_parser_test.go (new file, 115 lines)
@@ -0,0 +1,115 @@
package influxdb

import (
    "testing"

    "github.com/grafana/grafana/pkg/components/simplejson"
    . "github.com/smartystreets/goconvey/convey"
)

func TestInfluxdbQueryParser(t *testing.T) {
    Convey("Influxdb query parser", t, func() {
        parser := &InfluxdbQueryParser{}

        Convey("can parse influxdb json model", func() {
            json := `
            {
                "dsType": "influxdb",
                "groupBy": [
                    {
                        "params": ["$interval"],
                        "type": "time"
                    },
                    {
                        "params": ["datacenter"],
                        "type": "tag"
                    },
                    {
                        "params": ["none"],
                        "type": "fill"
                    }
                ],
                "measurement": "logins.count",
                "policy": "default",
                "refId": "B",
                "resultFormat": "time_series",
                "select": [
                    [
                        {
                            "type": "field",
                            "params": ["value"]
                        },
                        {
                            "type": "count",
                            "params": []
                        }
                    ],
                    [
                        {
                            "type": "field",
                            "params": ["value"]
                        },
                        {
                            "type": "bottom",
                            "params": [3]
                        }
                    ],
                    [
                        {
                            "type": "field",
                            "params": ["value"]
                        },
                        {
                            "type": "mean",
                            "params": []
                        },
                        {
                            "type": "math",
                            "params": [" / 100"]
                        }
                    ]
                ],
                "tags": [
                    {
                        "key": "datacenter",
                        "operator": "=",
                        "value": "America"
                    },
                    {
                        "condition": "OR",
                        "key": "hostname",
                        "operator": "=",
                        "value": "server1"
                    }
                ]
            }
            `

            modelJson, err := simplejson.NewJson([]byte(json))
            So(err, ShouldBeNil)

            res, err := parser.Parse(modelJson)
            So(err, ShouldBeNil)
            So(len(res.GroupBy), ShouldEqual, 3)
            So(len(res.Selects), ShouldEqual, 3)
            So(len(res.Tags), ShouldEqual, 2)
        })
    })
}
pkg/tsdb/influxdb/models.go (new file, 48 lines)
@@ -0,0 +1,48 @@
package influxdb

type Query struct {
    Measurement  string
    Policy       string
    ResultFormat string
    Tags         []*Tag
    GroupBy      []*QueryPart
    Selects      []*Select

    Interval string
}

type Tag struct {
    Key       string
    Operator  string
    Value     string
    Condition string
}

type Select []QueryPart

type InfluxDbSelect struct {
    Type string
}

type Response struct {
    Results []Result
    Err     error
}

type Result struct {
    Series   []Row
    Messages []*Message
    Err      error
}

type Message struct {
    Level string `json:"level,omitempty"`
    Text  string `json:"text,omitempty"`
}

type Row struct {
    Name    string            `json:"name,omitempty"`
    Tags    map[string]string `json:"tags,omitempty"`
    Columns []string          `json:"columns,omitempty"`
    Values  [][]interface{}   `json:"values,omitempty"`
}
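These types mirror the JSON envelope that InfluxDB's /query endpoint returns ({"results": [{"series": [...]}]}). A minimal decoding sketch with a hand-written sample payload (illustrative, not taken from this change); note the executor decodes with UseNumber, so values arrive as json.Number:

    // Sketch only: sample payload is illustrative.
    body := `{"results":[{"series":[{"name":"cpu","columns":["time","mean"],"values":[[111,222]]}]}]}`
    var res Response
    dec := json.NewDecoder(strings.NewReader(body))
    dec.UseNumber()
    if err := dec.Decode(&res); err == nil {
        fmt.Println(res.Results[0].Series[0].Name) // cpu
    }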
pkg/tsdb/influxdb/query_builder.go (new file, 107 lines)
@@ -0,0 +1,107 @@
package influxdb

import (
    "fmt"
    "strings"

    "github.com/grafana/grafana/pkg/tsdb"
)

type QueryBuilder struct{}

func (qb *QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) {
    res := qb.renderSelectors(query, queryContext)
    res += qb.renderMeasurement(query)
    res += qb.renderWhereClause(query)
    res += qb.renderTimeFilter(query, queryContext)
    res += qb.renderGroupBy(query, queryContext)

    return res, nil
}

func (qb *QueryBuilder) renderTags(query *Query) []string {
    var res []string
    for i, tag := range query.Tags {
        str := ""

        if i > 0 {
            if tag.Condition == "" {
                str += "AND"
            } else {
                str += tag.Condition
            }
            str += " "
        }

        res = append(res, fmt.Sprintf(`%s"%s" %s '%s'`, str, tag.Key, tag.Operator, tag.Value))
    }

    return res
}

func (qb *QueryBuilder) renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string {
    from := "now() - " + queryContext.TimeRange.From
    to := ""

    if queryContext.TimeRange.To != "now" && queryContext.TimeRange.To != "" {
        to = " and time < now() - " + strings.Replace(queryContext.TimeRange.To, "now-", "", 1)
    }

    return fmt.Sprintf("time > %s%s", from, to)
}

func (qb *QueryBuilder) renderSelectors(query *Query, queryContext *tsdb.QueryContext) string {
    res := "SELECT "

    var selectors []string
    for _, sel := range query.Selects {
        stk := ""
        for _, s := range *sel {
            stk = s.Render(queryContext, stk)
        }
        selectors = append(selectors, stk)
    }

    return res + strings.Join(selectors, ", ")
}

func (qb *QueryBuilder) renderMeasurement(query *Query) string {
    policy := ""
    if query.Policy == "" || query.Policy == "default" {
        policy = ""
    } else {
        policy = `"` + query.Policy + `".`
    }
    return fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement)
}

func (qb *QueryBuilder) renderWhereClause(query *Query) string {
    res := " WHERE "
    conditions := qb.renderTags(query)
    res += strings.Join(conditions, " ")
    if len(conditions) > 0 {
        res += " AND "
    }

    return res
}

func (qb *QueryBuilder) renderGroupBy(query *Query, queryContext *tsdb.QueryContext) string {
    groupBy := ""
    for i, group := range query.GroupBy {
        if i == 0 {
            groupBy += " GROUP BY"
        }

        if i > 0 && group.Type != "fill" {
            groupBy += ", " // fill is so very special. fill is a creep, fill is a weirdo
        } else {
            groupBy += " "
        }

        groupBy += group.Render(queryContext, "")
    }

    return groupBy
}
pkg/tsdb/influxdb/query_builder_test.go (new file, 72 lines)
@@ -0,0 +1,72 @@
package influxdb

import (
    "testing"

    "github.com/grafana/grafana/pkg/tsdb"
    . "github.com/smartystreets/goconvey/convey"
)

func TestInfluxdbQueryBuilder(t *testing.T) {
    Convey("Influxdb query builder", t, func() {
        builder := QueryBuilder{}

        qp1, _ := NewQueryPart("field", []string{"value"})
        qp2, _ := NewQueryPart("mean", []string{})

        groupBy1, _ := NewQueryPart("time", []string{"$interval"})
        groupBy2, _ := NewQueryPart("tag", []string{"datacenter"})
        groupBy3, _ := NewQueryPart("fill", []string{"null"})

        tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="}
        tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}

        queryContext := &tsdb.QueryContext{
            TimeRange: tsdb.NewTimeRange("5m", "now"),
        }

        Convey("can build simple query", func() {
            query := &Query{
                Selects:     []*Select{{*qp1, *qp2}},
                Measurement: "cpu",
                Policy:      "policy",
                GroupBy:     []*QueryPart{groupBy1, groupBy3},
                Interval:    "10s",
            }

            rawQuery, err := builder.Build(query, queryContext)
            So(err, ShouldBeNil)
            So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(200ms) fill(null)`)
        })

        Convey("can build query with group bys", func() {
            query := &Query{
                Selects:     []*Select{{*qp1, *qp2}},
                Measurement: "cpu",
                GroupBy:     []*QueryPart{groupBy1, groupBy2, groupBy3},
                Tags:        []*Tag{tag1, tag2},
                Interval:    "5s",
            }

            rawQuery, err := builder.Build(query, queryContext)
            So(err, ShouldBeNil)
            So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(200ms), "datacenter" fill(null)`)
        })

        Convey("can render time range", func() {
            query := Query{}
            builder := &QueryBuilder{}
            Convey("render from: 2h to now-1h", func() {
                query := Query{}
                queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("2h", "now-1h")}
                So(builder.renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h")
            })

            Convey("render from: 10m", func() {
                queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("10m", "now")}
                So(builder.renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 10m")
            })
        })
    })
}
pkg/tsdb/influxdb/query_part.go (new file, 145 lines)
@@ -0,0 +1,145 @@
package influxdb

import (
    "fmt"
    "strings"

    "github.com/grafana/grafana/pkg/tsdb"
)

var renders map[string]QueryDefinition

type DefinitionParameters struct {
    Name string
    Type string
}

type QueryDefinition struct {
    Renderer func(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string
    Params   []DefinitionParameters
}

func init() {
    renders = make(map[string]QueryDefinition)

    renders["field"] = QueryDefinition{Renderer: fieldRenderer}

    renders["spread"] = QueryDefinition{Renderer: functionRenderer}
    renders["count"] = QueryDefinition{Renderer: functionRenderer}
    renders["distinct"] = QueryDefinition{Renderer: functionRenderer}
    renders["integral"] = QueryDefinition{Renderer: functionRenderer}
    renders["mean"] = QueryDefinition{Renderer: functionRenderer}
    renders["median"] = QueryDefinition{Renderer: functionRenderer}
    renders["sum"] = QueryDefinition{Renderer: functionRenderer}

    renders["derivative"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "duration", Type: "interval"}},
    }

    renders["non_negative_derivative"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "duration", Type: "interval"}},
    }
    renders["difference"] = QueryDefinition{Renderer: functionRenderer}
    renders["moving_average"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "window", Type: "number"}},
    }
    renders["stddev"] = QueryDefinition{Renderer: functionRenderer}
    renders["time"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "interval", Type: "time"}},
    }
    renders["fill"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "fill", Type: "string"}},
    }
    renders["elapsed"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "duration", Type: "interval"}},
    }
    renders["bottom"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "count", Type: "int"}},
    }

    renders["first"] = QueryDefinition{Renderer: functionRenderer}
    renders["last"] = QueryDefinition{Renderer: functionRenderer}
    renders["max"] = QueryDefinition{Renderer: functionRenderer}
    renders["min"] = QueryDefinition{Renderer: functionRenderer}
    renders["percentile"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "nth", Type: "int"}},
    }
    renders["top"] = QueryDefinition{
        Renderer: functionRenderer,
        Params:   []DefinitionParameters{{Name: "count", Type: "int"}},
    }
    renders["tag"] = QueryDefinition{
        Renderer: fieldRenderer,
        Params:   []DefinitionParameters{{Name: "tag", Type: "string"}},
    }

    renders["math"] = QueryDefinition{Renderer: suffixRenderer}
    renders["alias"] = QueryDefinition{Renderer: aliasRenderer}
}

func fieldRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
    if part.Params[0] == "*" {
        return "*"
    }
    return fmt.Sprintf(`"%s"`, part.Params[0])
}

func functionRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
    for i, v := range part.Params {
        if v == "$interval" {
            part.Params[i] = tsdb.CalculateInterval(queryContext.TimeRange)
        }
    }

    if innerExpr != "" {
        part.Params = append([]string{innerExpr}, part.Params...)
    }

    params := strings.Join(part.Params, ", ")

    return fmt.Sprintf("%s(%s)", part.Type, params)
}

func suffixRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
    return fmt.Sprintf("%s %s", innerExpr, part.Params[0])
}

func aliasRenderer(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
    return fmt.Sprintf(`%s AS "%s"`, innerExpr, part.Params[0])
}

func (r QueryDefinition) Render(queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
    return r.Renderer(queryContext, part, innerExpr)
}

func NewQueryPart(typ string, params []string) (*QueryPart, error) {
    def, exist := renders[typ]

    if !exist {
        return nil, fmt.Errorf("Missing query definition for %s", typ)
    }

    return &QueryPart{
        Type:   typ,
        Params: params,
        Def:    def,
    }, nil
}

type QueryPart struct {
    Def    QueryDefinition
    Type   string
    Params []string
}

func (qp *QueryPart) Render(queryContext *tsdb.QueryContext, expr string) string {
    return qp.Def.Renderer(queryContext, qp, expr)
}
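Parts compose by threading each rendered string into the next part's innerExpr; renderSelectors in query_builder.go relies on exactly this chaining. A small sketch of one field → mean → math chain, assuming the same package and a query context like the one in the tests below:

    // Sketch only: composing parts the way renderSelectors does.
    queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("5m", "now")}
    field, _ := NewQueryPart("field", []string{"value"})
    mean, _ := NewQueryPart("mean", []string{})
    math, _ := NewQueryPart("math", []string{"/ 100"})

    stk := ""
    for _, p := range []*QueryPart{field, mean, math} {
        stk = p.Render(queryContext, stk)
    }
    // stk evolves: "value" -> mean("value") -> mean("value") / 100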
pkg/tsdb/influxdb/query_part_test.go (new file, 73 lines)
@@ -0,0 +1,73 @@
package influxdb

import (
    "testing"

    "github.com/grafana/grafana/pkg/tsdb"
    . "github.com/smartystreets/goconvey/convey"
)

func TestInfluxdbQueryPart(t *testing.T) {
    Convey("Influxdb query parts", t, func() {
        queryContext := &tsdb.QueryContext{
            TimeRange: tsdb.NewTimeRange("5m", "now"),
        }

        Convey("render field ", func() {
            part, err := NewQueryPart("field", []string{"value"})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "value")
            So(res, ShouldEqual, `"value"`)
        })

        Convey("render nested part", func() {
            part, err := NewQueryPart("derivative", []string{"10s"})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "mean(value)")
            So(res, ShouldEqual, "derivative(mean(value), 10s)")
        })

        Convey("render bottom", func() {
            part, err := NewQueryPart("bottom", []string{"3"})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "value")
            So(res, ShouldEqual, "bottom(value, 3)")
        })

        Convey("render time", func() {
            part, err := NewQueryPart("time", []string{"$interval"})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "")
            So(res, ShouldEqual, "time(200ms)")
        })

        Convey("render spread", func() {
            part, err := NewQueryPart("spread", []string{})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "value")
            So(res, ShouldEqual, `spread(value)`)
        })

        Convey("render suffix", func() {
            part, err := NewQueryPart("math", []string{"/ 100"})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "mean(value)")
            So(res, ShouldEqual, "mean(value) / 100")
        })

        Convey("render alias", func() {
            part, err := NewQueryPart("alias", []string{"test"})
            So(err, ShouldBeNil)

            res := part.Render(queryContext, "mean(value)")
            So(res, ShouldEqual, `mean(value) AS "test"`)
        })
    })
}
pkg/tsdb/influxdb/response_parser.go (new file, 94 lines)
@@ -0,0 +1,94 @@
package influxdb

import (
    "encoding/json"
    "fmt"
    "strings"

    "github.com/grafana/grafana/pkg/tsdb"
    "gopkg.in/guregu/null.v3"
)

type ResponseParser struct{}

func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult {
    queryRes := tsdb.NewQueryResult()

    for _, result := range response.Results {
        queryRes.Series = append(queryRes.Series, rp.transformRows(result.Series, queryRes)...)
    }

    return queryRes
}

func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult) tsdb.TimeSeriesSlice {
    var result tsdb.TimeSeriesSlice

    for _, row := range rows {
        for columnIndex, column := range row.Columns {
            if column == "time" {
                continue
            }

            var points tsdb.TimeSeriesPoints
            for _, valuePair := range row.Values {
                point, err := rp.parseTimepoint(valuePair, columnIndex)
                if err == nil {
                    points = append(points, point)
                }
            }
            result = append(result, &tsdb.TimeSeries{
                Name:   rp.formatSerieName(row, column),
                Points: points,
            })
        }
    }

    return result
}

func (rp *ResponseParser) formatSerieName(row Row, column string) string {
    var tags []string

    for k, v := range row.Tags {
        tags = append(tags, fmt.Sprintf("%s: %s", k, v))
    }

    tagText := ""
    if len(tags) > 0 {
        tagText = fmt.Sprintf(" { %s }", strings.Join(tags, " "))
    }

    return fmt.Sprintf("%s.%s%s", row.Name, column, tagText)
}

func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) {
    var value null.Float = rp.parseValue(valuePair[valuePosition])

    timestampNumber, _ := valuePair[0].(json.Number)
    timestamp, err := timestampNumber.Float64()
    if err != nil {
        return tsdb.TimePoint{}, err
    }

    return tsdb.NewTimePoint(value, timestamp), nil
}

func (rp *ResponseParser) parseValue(value interface{}) null.Float {
    number, ok := value.(json.Number)
    if !ok {
        return null.FloatFromPtr(nil)
    }

    fvalue, err := number.Float64()
    if err == nil {
        return null.FloatFrom(fvalue)
    }

    ivalue, err := number.Int64()
    if err == nil {
        return null.FloatFrom(float64(ivalue))
    }

    return null.FloatFromPtr(nil)
}
pkg/tsdb/influxdb/response_parser_test.go (new file, 59 lines)
@@ -0,0 +1,59 @@
package influxdb

import (
    "encoding/json"
    "testing"

    . "github.com/smartystreets/goconvey/convey"
)

func TestInfluxdbResponseParser(t *testing.T) {
    Convey("Influxdb response parser", t, func() {
        parser := &ResponseParser{}

        response := &Response{
            Results: []Result{
                Result{
                    Series: []Row{
                        {
                            Name:    "cpu",
                            Columns: []string{"time", "mean", "sum"},
                            Tags:    map[string]string{"datacenter": "America"},
                            Values: [][]interface{}{
                                {json.Number("111"), json.Number("222"), json.Number("333")},
                                {json.Number("111"), json.Number("222"), json.Number("333")},
                                {json.Number("111"), json.Number("null"), json.Number("333")},
                            },
                        },
                    },
                },
            },
        }

        result := parser.Parse(response)

        Convey("can parse all series", func() {
            So(len(result.Series), ShouldEqual, 2)
        })

        Convey("can parse all points", func() {
            So(len(result.Series[0].Points), ShouldEqual, 3)
            So(len(result.Series[1].Points), ShouldEqual, 3)
        })

        Convey("can parse multi row result", func() {
            So(result.Series[0].Points[1][0].Float64, ShouldEqual, float64(222))
            So(result.Series[1].Points[1][0].Float64, ShouldEqual, float64(333))
        })

        Convey("can parse null points", func() {
            So(result.Series[0].Points[2][0].Valid, ShouldBeFalse)
        })

        Convey("can format serie names", func() {
            So(result.Series[0].Name, ShouldEqual, "cpu.mean { datacenter: America }")
            So(result.Series[1].Name, ShouldEqual, "cpu.sum { datacenter: America }")
        })
    })
}
pkg/tsdb/interval.go (new file, 145 lines)
@@ -0,0 +1,145 @@
package tsdb

import (
    "fmt"
    "time"
)

var (
    defaultRes  int64         = 1500
    minInterval time.Duration = 1 * time.Millisecond
    year        time.Duration = time.Hour * 24 * 365
    day         time.Duration = time.Hour * 24
)

func CalculateInterval(timerange *TimeRange) string {
    interval := time.Duration((timerange.MustGetTo().UnixNano() - timerange.MustGetFrom().UnixNano()) / defaultRes)

    if interval < minInterval {
        return formatDuration(minInterval)
    }

    return formatDuration(roundInterval(interval))
}

func formatDuration(inter time.Duration) string {
    if inter >= year {
        return fmt.Sprintf("%dy", inter/year)
    }

    if inter >= day {
        return fmt.Sprintf("%dd", inter/day)
    }

    if inter >= time.Hour {
        return fmt.Sprintf("%dh", inter/time.Hour)
    }

    if inter >= time.Minute {
        return fmt.Sprintf("%dm", inter/time.Minute)
    }

    if inter >= time.Second {
        return fmt.Sprintf("%ds", inter/time.Second)
    }

    if inter >= time.Millisecond {
        return fmt.Sprintf("%dms", inter/time.Millisecond)
    }

    return "1ms"
}

func roundInterval(interval time.Duration) time.Duration {
    switch true {
    // 0.015s
    case interval <= 15*time.Millisecond:
        return time.Millisecond * 10 // 0.01s
    // 0.035s
    case interval <= 35*time.Millisecond:
        return time.Millisecond * 20 // 0.02s
    // 0.075s
    case interval <= 75*time.Millisecond:
        return time.Millisecond * 50 // 0.05s
    // 0.15s
    case interval <= 150*time.Millisecond:
        return time.Millisecond * 100 // 0.1s
    // 0.35s
    case interval <= 350*time.Millisecond:
        return time.Millisecond * 200 // 0.2s
    // 0.75s
    case interval <= 750*time.Millisecond:
        return time.Millisecond * 500 // 0.5s
    // 1.5s
    case interval <= 1500*time.Millisecond:
        return time.Millisecond * 1000 // 1s
    // 3.5s
    case interval <= 3500*time.Millisecond:
        return time.Millisecond * 2000 // 2s
    // 7.5s
    case interval <= 7500*time.Millisecond:
        return time.Millisecond * 5000 // 5s
    // 12.5s
    case interval <= 12500*time.Millisecond:
        return time.Millisecond * 10000 // 10s
    // 17.5s
    case interval <= 17500*time.Millisecond:
        return time.Millisecond * 15000 // 15s
    // 25s
    case interval <= 25000*time.Millisecond:
        return time.Millisecond * 20000 // 20s
    // 45s
    case interval <= 45000*time.Millisecond:
        return time.Millisecond * 30000 // 30s
    // 1.5m
    case interval <= 90000*time.Millisecond:
        return time.Millisecond * 60000 // 1m
    // 3.5m
    case interval <= 210000*time.Millisecond:
        return time.Millisecond * 120000 // 2m
    // 7.5m
    case interval <= 450000*time.Millisecond:
        return time.Millisecond * 300000 // 5m
    // 12.5m
    case interval <= 750000*time.Millisecond:
        return time.Millisecond * 600000 // 10m
    // 17.5m
    case interval <= 1050000*time.Millisecond:
        return time.Millisecond * 900000 // 15m
    // 25m
    case interval <= 1500000*time.Millisecond:
        return time.Millisecond * 1200000 // 20m
    // 45m
    case interval <= 2700000*time.Millisecond:
        return time.Millisecond * 1800000 // 30m
    // 1.5h
    case interval <= 5400000*time.Millisecond:
        return time.Millisecond * 3600000 // 1h
    // 2.5h
    case interval <= 9000000*time.Millisecond:
        return time.Millisecond * 7200000 // 2h
    // 4.5h
    case interval <= 16200000*time.Millisecond:
        return time.Millisecond * 10800000 // 3h
    // 9h
    case interval <= 32400000*time.Millisecond:
        return time.Millisecond * 21600000 // 6h
    // 24h
    case interval <= 86400000*time.Millisecond:
        return time.Millisecond * 43200000 // 12h
    // 48h
    case interval <= 172800000*time.Millisecond:
        return time.Millisecond * 86400000 // 24h
    // 1w
    case interval <= 604800000*time.Millisecond:
        return time.Millisecond * 86400000 // 24h
    // 3w
    case interval <= 1814400000*time.Millisecond:
        return time.Millisecond * 604800000 // 1w
    // 6w
    case interval < 3628800000*time.Millisecond:
        return time.Millisecond * 2592000000 // 30d
    default:
        return time.Millisecond * 31536000000 // 1y
    }
}
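Worked through once: CalculateInterval divides the time range by a fixed target resolution of 1500 points, then snaps the result upward via roundInterval. For a 5m range that is 300,000 ms / 1500 = 200 ms, which roundInterval leaves at 200 ms; this is where the time(200ms) in the query builder tests above and the "200ms" expectation in the tests below come from.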
pkg/tsdb/interval_test.go (new file, 57 lines)
@@ -0,0 +1,57 @@
package tsdb

import (
    "testing"
    "time"

    "github.com/grafana/grafana/pkg/setting"
    . "github.com/smartystreets/goconvey/convey"
)

func TestInterval(t *testing.T) {
    Convey("Default interval ", t, func() {
        setting.NewConfigContext(&setting.CommandLineArgs{
            HomePath: "../../",
        })

        Convey("for 5min", func() {
            tr := NewTimeRange("5m", "now")

            interval := CalculateInterval(tr)
            So(interval, ShouldEqual, "200ms")
        })

        Convey("for 15min", func() {
            tr := NewTimeRange("15m", "now")

            interval := CalculateInterval(tr)
            So(interval, ShouldEqual, "500ms")
        })

        Convey("for 30min", func() {
            tr := NewTimeRange("30m", "now")

            interval := CalculateInterval(tr)
            So(interval, ShouldEqual, "1s")
        })

        Convey("for 1h", func() {
            tr := NewTimeRange("1h", "now")

            interval := CalculateInterval(tr)
            So(interval, ShouldEqual, "2s")
        })

        Convey("Round interval", func() {
            So(roundInterval(time.Millisecond*30), ShouldEqual, time.Millisecond*20)
            So(roundInterval(time.Millisecond*45), ShouldEqual, time.Millisecond*50)
        })

        Convey("Format value", func() {
            So(formatDuration(time.Second*61), ShouldEqual, "1m")
            So(formatDuration(time.Millisecond*30), ShouldEqual, "30ms")
            So(formatDuration(time.Hour*23), ShouldEqual, "23h")
            So(formatDuration(time.Hour*24*367), ShouldEqual, "1y")
        })
    })
}
@@ -52,6 +52,11 @@ type BatchResult struct {
     Timings *BatchTiming
 }

+func (br *BatchResult) WithError(err error) *BatchResult {
+    br.Error = err
+    return br
+}
+
 type QueryResult struct {
     Error error  `json:"error"`
     RefId string `json:"refId"`
@@ -73,15 +78,15 @@ func NewQueryResult() *QueryResult {
     }
 }

-func NewTimePoint(value float64, timestamp float64) TimePoint {
-    return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)}
+func NewTimePoint(value null.Float, timestamp float64) TimePoint {
+    return TimePoint{value, null.FloatFrom(timestamp)}
 }

 func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints {
     points := make(TimeSeriesPoints, 0)

     for i := 0; i < len(values); i += 2 {
-        points = append(points, NewTimePoint(values[i], values[i+1]))
+        points = append(points, NewTimePoint(null.FloatFrom(values[i]), values[i+1]))
     }

     return points
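The NewTimePoint signature change above (the value goes from float64 to null.Float) is what drives the remaining call-site updates in this PR: the Prometheus and testdata executors below now wrap raw floats with null.FloatFrom.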
@@ -8,6 +8,8 @@ import (
     "strings"
     "time"

+    "gopkg.in/guregu/null.v3"
+
     "github.com/grafana/grafana/pkg/log"
     "github.com/grafana/grafana/pkg/tsdb"
     "github.com/prometheus/client_golang/api/prometheus"
@@ -50,12 +52,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic

     client, err := e.getClient()
     if err != nil {
-        return resultWithError(result, err)
+        return result.WithError(err)
     }

     query, err := parseQuery(queries, queryContext)
     if err != nil {
-        return resultWithError(result, err)
+        return result.WithError(err)
     }

     timeRange := prometheus.Range{
@@ -67,12 +69,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic
     value, err := client.QueryRange(ctx, query.Expr, timeRange)

     if err != nil {
-        return resultWithError(result, err)
+        return result.WithError(err)
     }

     queryResult, err := parseResponse(value, query)
     if err != nil {
-        return resultWithError(result, err)
+        return result.WithError(err)
     }
     result.QueryResults = queryResult
     return result
@@ -145,7 +147,7 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
     }

     for _, k := range v.Values {
-        series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000)))
+        series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
     }

     queryRes.Series = append(queryRes.Series, &series)
@@ -155,7 +157,8 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
     return queryResults, nil
 }

+/*
 func resultWithError(result *tsdb.BatchResult, err error) *tsdb.BatchResult {
     result.Error = err
     return result
-}
+}*/
pkg/tsdb/testdata/scenarios.go (vendored, 4 lines changed)
@@ -44,7 +44,7 @@ func init() {
     walker := rand.Float64() * 100

     for i := int64(0); i < 10000 && timeWalkerMs < to; i++ {
-        points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs)))
+        points = append(points, tsdb.NewTimePoint(null.FloatFrom(walker), float64(timeWalkerMs)))

         walker += rand.Float64() - 0.5
         timeWalkerMs += query.IntervalMs
@@ -75,7 +75,7 @@ func init() {
     series := newSeriesForQuery(query)
     outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000

-    series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime)))
+    series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(10), float64(outsideTime)))
     queryRes.Series = append(queryRes.Series, series)

     return queryRes
@@ -6,6 +6,7 @@
     "defaultMatchFormat": "regex values",
     "metrics": true,
     "annotations": true,
+    "alerting": true,

     "info": {
         "author": {