elasticsearch: refactor query handling and use new es simple client

Removes moment dependency.
Adds response parser tests (based on frontend tests).
Adds time series query tests (based on frontend tests).
Fixes various issues related to response parsing and building search request queries.
Added support for extended stats metrics and geo hash grid aggregations.
This commit is contained in:
Marcus Efraimsson 2018-05-23 15:09:58 +02:00
parent e171ed8910
commit 4840adff00
No known key found for this signature in database
GPG Key ID: EBFE0FB04612DD4A
17 changed files with 1967 additions and 2607 deletions

8
Gopkg.lock generated
View File

@ -308,12 +308,6 @@
packages = ["."]
revision = "7cafcd837844e784b526369c9bce262804aebc60"
[[projects]]
branch = "master"
name = "github.com/leibowitz/moment"
packages = ["."]
revision = "8548108dcca204a1110b99e5fec966817499fe84"
[[projects]]
branch = "master"
name = "github.com/lib/pq"
@ -667,6 +661,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "4039f122ac5dd045948e003eb7a74c8864df1759b25147f1b2e2e8ad7a8414d6"
inputs-digest = "bd54a1a836599d90b36d4ac1af56d716ef9ca5be4865e217bddd49e3d32a1997"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -199,7 +199,3 @@ ignored = [
[[constraint]]
name = "github.com/denisenkom/go-mssqldb"
revision = "270bc3860bb94dd3a3ffd047377d746c5e276726"
[[constraint]]
branch = "master"
name = "github.com/leibowitz/moment"

View File

@ -3,17 +3,14 @@ package elasticsearch
import (
"context"
"fmt"
"net/http"
"net/url"
"path"
"strings"
"time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
// ElasticsearchExecutor represents a handler for handling elasticsearch datasource request
type ElasticsearchExecutor struct{}
var (
@ -21,43 +18,28 @@ var (
intervalCalculator tsdb.IntervalCalculator
)
// NewElasticsearchExecutor creates a new elasticsearch executor
func NewElasticsearchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return &ElasticsearchExecutor{}, nil
}
func init() {
glog = log.New("tsdb.elasticsearch")
intervalCalculator = tsdb.NewIntervalCalculator(nil)
tsdb.RegisterTsdbQueryEndpoint("elasticsearch", NewElasticsearchExecutor)
intervalCalculator = tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: time.Millisecond * 1})
}
// Query handles an elasticsearch datasource request
func (e *ElasticsearchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
if len(tsdbQuery.Queries) == 0 {
return nil, fmt.Errorf("query contains no queries")
}
return e.executeTimeSeriesQuery(ctx, dsInfo, tsdbQuery)
}
func (e *ElasticsearchExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
u, _ := url.Parse(dsInfo.Url)
u.Path = path.Join(u.Path, "_msearch")
req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(query))
client, err := es.NewClient(ctx, dsInfo, tsdbQuery.TimeRange)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "Grafana")
req.Header.Set("Content-Type", "application/json")
if dsInfo.BasicAuth {
req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.BasicAuthPassword)
}
if !dsInfo.BasicAuth && dsInfo.User != "" {
req.SetBasicAuth(dsInfo.User, dsInfo.Password)
}
glog.Debug("Elasticsearch request", "url", req.URL.String())
glog.Debug("Elasticsearch request", "body", query)
return req, nil
query := newTimeSeriesQuery(client, tsdbQuery, intervalCalculator)
return query.execute()
}

View File

@ -1,121 +0,0 @@
package elasticsearch
import (
"github.com/grafana/grafana/pkg/components/simplejson"
"time"
)
var avgWithMovingAvg = Query{
TimeField: "timestamp",
RawQuery: "(test:query) AND (name:sample)",
Interval: time.Millisecond,
BucketAggs: []*BucketAgg{{
Field: "timestamp",
ID: "2",
Type: "date_histogram",
Settings: simplejson.NewFromAny(map[string]interface{}{
"interval": "auto",
"min_doc_count": 0,
"trimEdges": 0,
}),
}},
Metrics: []*Metric{{
Field: "value",
ID: "1",
Type: "avg",
Settings: simplejson.NewFromAny(map[string]interface{}{
"script": map[string]string{
"inline": "_value * 2",
},
}),
}, {
Field: "1",
ID: "3",
Type: "moving_avg",
PipelineAggregate: "1",
Settings: simplejson.NewFromAny(map[string]interface{}{
"minimize": false,
"model": "simple",
"window": 5,
}),
}},
}
var wildcardsAndQuotes = Query{
TimeField: "timestamp",
RawQuery: "scope:$location.leagueconnect.api AND name:*CreateRegistration AND name:\"*.201-responses.rate\"",
Interval: time.Millisecond,
BucketAggs: []*BucketAgg{{
Field: "timestamp",
ID: "2",
Type: "date_histogram",
Settings: simplejson.NewFromAny(map[string]interface{}{}),
}},
Metrics: []*Metric{{
Field: "value",
ID: "1",
Type: "sum",
Settings: simplejson.NewFromAny(map[string]interface{}{}),
}},
}
var termAggs = Query{
TimeField: "timestamp",
RawQuery: "(scope:*.hmp.metricsd) AND (name_raw:builtin.general.*_instance_count)",
Interval: time.Millisecond,
BucketAggs: []*BucketAgg{{
Field: "name_raw",
ID: "4",
Type: "terms",
Settings: simplejson.NewFromAny(map[string]interface{}{
"order": "desc",
"orderBy": "_term",
"size": "10",
}),
}, {
Field: "timestamp",
ID: "2",
Type: "date_histogram",
Settings: simplejson.NewFromAny(map[string]interface{}{
"interval": "auto",
"min_doc_count": 0,
"trimEdges": 0,
}),
}},
Metrics: []*Metric{{
Field: "value",
ID: "1",
Type: "sum",
Settings: simplejson.NewFromAny(map[string]interface{}{}),
}},
}
var filtersAggs = Query{
TimeField: "time",
RawQuery: "*",
Interval: time.Millisecond,
BucketAggs: []*BucketAgg{{
ID: "3",
Type: "filters",
Settings: simplejson.NewFromAny(map[string]interface{}{
"filters": []interface{}{
map[string]interface{}{"label": "hello", "query": "host:\"67.65.185.232\""},
},
}),
}, {
Field: "timestamp",
ID: "2",
Type: "date_histogram",
Settings: simplejson.NewFromAny(map[string]interface{}{
"interval": "auto",
"min_doc_count": 0,
"trimEdges": 0,
}),
}},
Metrics: []*Metric{{
Field: "bytesSent",
ID: "1",
Type: "count",
PipelineAggregate: "select metric",
Settings: simplejson.NewFromAny(map[string]interface{}{}),
}},
}

View File

@ -1,12 +1,21 @@
package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"github.com/grafana/grafana/pkg/components/simplejson"
)
// Query represents the time series query model of the datasource
type Query struct {
TimeField string `json:"timeField"`
RawQuery string `json:"query"`
BucketAggs []*BucketAgg `json:"bucketAggs"`
Metrics []*MetricAgg `json:"metrics"`
Alias string `json:"alias"`
Interval string
RefID string
}
// BucketAgg represents a bucket aggregation of the time series query model of the datasource
type BucketAgg struct {
Field string `json:"field"`
ID string `json:"id"`
@ -14,120 +23,55 @@ type BucketAgg struct {
Type string `jsons:"type"`
}
type Metric struct {
// MetricAgg represents a metric aggregation of the time series query model of the datasource
type MetricAgg struct {
Field string `json:"field"`
Hide bool `json:"hide"`
ID string `json:"id"`
PipelineAggregate string `json:"pipelineAgg"`
Settings *simplejson.Json `json:"settings"`
Meta *simplejson.Json `json:"meta"`
Type string `json:"type"`
}
type QueryHeader struct {
SearchType string `json:"search_type"`
IgnoreUnavailable bool `json:"ignore_unavailable"`
Index interface{} `json:"index"`
MaxConcurrentShardRequests int `json:"max_concurrent_shard_requests,omitempty"`
var metricAggType = map[string]string{
"count": "Count",
"avg": "Average",
"sum": "Sum",
"max": "Max",
"min": "Min",
"extended_stats": "Extended Stats",
"percentiles": "Percentiles",
"cardinality": "Unique Count",
"moving_avg": "Moving Average",
"derivative": "Derivative",
"raw_document": "Raw Document",
}
func (q *QueryHeader) String() string {
r, _ := json.Marshal(q)
return string(r)
var extendedStats = map[string]string{
"avg": "Avg",
"min": "Min",
"max": "Max",
"sum": "Sum",
"count": "Count",
"std_deviation": "Std Dev",
"std_deviation_bounds_upper": "Std Dev Upper",
"std_deviation_bounds_lower": "Std Dev Lower",
}
type Request struct {
Query map[string]interface{} `json:"query"`
Aggs Aggs `json:"aggs"`
Size int `json:"size"`
var pipelineAggType = map[string]string{
"moving_avg": "moving_avg",
"derivative": "derivative",
}
type Aggs map[string]interface{}
type HistogramAgg struct {
Interval string `json:"interval,omitempty"`
Field string `json:"field"`
MinDocCount int `json:"min_doc_count"`
Missing string `json:"missing,omitempty"`
}
type DateHistogramAgg struct {
HistogramAgg
ExtendedBounds ExtendedBounds `json:"extended_bounds"`
Format string `json:"format"`
}
type FiltersAgg struct {
Filters map[string]interface{} `json:"filters"`
}
type TermsAgg struct {
Field string `json:"field"`
Size int `json:"size"`
Order map[string]interface{} `json:"order"`
Missing string `json:"missing,omitempty"`
}
type TermsAggWrap struct {
Terms TermsAgg `json:"terms"`
Aggs Aggs `json:"aggs"`
}
type ExtendedBounds struct {
Min string `json:"min"`
Max string `json:"max"`
}
type RangeFilter struct {
Range map[string]RangeFilterSetting `json:"range"`
}
type RangeFilterSetting struct {
Gte string `json:"gte"`
Lte string `json:"lte"`
Format string `json:"format"`
}
func newRangeFilter(field string, rangeFilterSetting RangeFilterSetting) *RangeFilter {
return &RangeFilter{
map[string]RangeFilterSetting{field: rangeFilterSetting}}
}
type QueryStringFilter struct {
QueryString QueryStringFilterSetting `json:"query_string"`
}
type QueryStringFilterSetting struct {
AnalyzeWildcard bool `json:"analyze_wildcard"`
Query string `json:"query"`
}
func newQueryStringFilter(analyzeWildcard bool, query string) *QueryStringFilter {
return &QueryStringFilter{QueryStringFilterSetting{AnalyzeWildcard: analyzeWildcard, Query: query}}
}
type BoolQuery struct {
Filter []interface{} `json:"filter"`
}
type Responses struct {
Responses []Response `json:"responses"`
}
type Response struct {
Status int `json:"status"`
Err map[string]interface{} `json:"error"`
Aggregations map[string]interface{} `json:"aggregations"`
}
func (r *Response) getErrMsg() string {
var msg bytes.Buffer
errJson := simplejson.NewFromAny(r.Err)
errType, err := errJson.Get("type").String()
if err == nil {
msg.WriteString(fmt.Sprintf("type:%s", errType))
func isPipelineAgg(metricType string) bool {
if _, ok := pipelineAggType[metricType]; ok {
return true
}
reason, err := errJson.Get("type").String()
if err == nil {
msg.WriteString(fmt.Sprintf("reason:%s", reason))
}
return msg.String()
return false
}
func describeMetric(metricType, field string) string {
text := metricAggType[metricType]
return text + " " + field
}

View File

@ -1,296 +0,0 @@
package elasticsearch
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/leibowitz/moment"
)
var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
Lte: "$timeTo",
Format: "epoch_millis"}
type Query struct {
TimeField string `json:"timeField"`
RawQuery string `json:"query"`
BucketAggs []*BucketAgg `json:"bucketAggs"`
Metrics []*Metric `json:"metrics"`
Alias string `json:"alias"`
Interval time.Duration
}
func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) {
var req Request
req.Size = 0
q.renderReqQuery(&req)
// handle document query
if q.isRawDocumentQuery() {
return "", errors.New("alert not support Raw_Document")
}
err := q.parseAggs(&req)
if err != nil {
return "", err
}
reqBytes, err := json.Marshal(req)
reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo)
payload := bytes.Buffer{}
payload.WriteString(reqHeader.String() + "\n")
payload.WriteString(string(reqBytes) + "\n")
return q.renderTemplate(payload.String(), queryContext)
}
func (q *Query) isRawDocumentQuery() bool {
if len(q.BucketAggs) == 0 {
if len(q.Metrics) > 0 {
metric := simplejson.NewFromAny(q.Metrics[0])
if metric.Get("type").MustString("") == "raw_document" {
return true
}
}
}
return false
}
func (q *Query) renderReqQuery(req *Request) {
req.Query = make(map[string]interface{})
boolQuery := BoolQuery{}
boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(q.TimeField, rangeFilterSetting))
boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, q.RawQuery))
req.Query["bool"] = boolQuery
}
func (q *Query) parseAggs(req *Request) error {
aggs := make(Aggs)
nestedAggs := aggs
for _, agg := range q.BucketAggs {
esAggs := make(Aggs)
switch agg.Type {
case "date_histogram":
esAggs["date_histogram"] = q.getDateHistogramAgg(agg)
case "histogram":
esAggs["histogram"] = q.getHistogramAgg(agg)
case "filters":
esAggs["filters"] = q.getFilters(agg)
case "terms":
terms := q.getTerms(agg)
esAggs["terms"] = terms.Terms
esAggs["aggs"] = terms.Aggs
case "geohash_grid":
return errors.New("alert not support Geo_Hash_Grid")
}
if _, ok := nestedAggs["aggs"]; !ok {
nestedAggs["aggs"] = make(Aggs)
}
if aggs, ok := (nestedAggs["aggs"]).(Aggs); ok {
aggs[agg.ID] = esAggs
}
nestedAggs = esAggs
}
nestedAggs["aggs"] = make(Aggs)
for _, metric := range q.Metrics {
subAgg := make(Aggs)
if metric.Type == "count" {
continue
}
settings := metric.Settings.MustMap(make(map[string]interface{}))
if isPipelineAgg(metric.Type) {
if _, err := strconv.Atoi(metric.PipelineAggregate); err == nil {
settings["buckets_path"] = metric.PipelineAggregate
} else {
continue
}
} else {
settings["field"] = metric.Field
}
subAgg[metric.Type] = settings
nestedAggs["aggs"].(Aggs)[metric.ID] = subAgg
}
req.Aggs = aggs["aggs"].(Aggs)
return nil
}
func (q *Query) getDateHistogramAgg(target *BucketAgg) *DateHistogramAgg {
agg := &DateHistogramAgg{}
interval, err := target.Settings.Get("interval").String()
if err == nil {
agg.Interval = interval
}
agg.Field = q.TimeField
agg.MinDocCount = target.Settings.Get("min_doc_count").MustInt(0)
agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"}
agg.Format = "epoch_millis"
if agg.Interval == "auto" {
agg.Interval = "$__interval"
}
missing, err := target.Settings.Get("missing").String()
if err == nil {
agg.Missing = missing
}
return agg
}
func (q *Query) getHistogramAgg(target *BucketAgg) *HistogramAgg {
agg := &HistogramAgg{}
interval, err := target.Settings.Get("interval").String()
if err == nil {
agg.Interval = interval
}
if target.Field != "" {
agg.Field = target.Field
}
agg.MinDocCount = target.Settings.Get("min_doc_count").MustInt(0)
missing, err := target.Settings.Get("missing").String()
if err == nil {
agg.Missing = missing
}
return agg
}
func (q *Query) getFilters(target *BucketAgg) *FiltersAgg {
agg := &FiltersAgg{}
agg.Filters = map[string]interface{}{}
for _, filter := range target.Settings.Get("filters").MustArray() {
filterJson := simplejson.NewFromAny(filter)
query := filterJson.Get("query").MustString("")
label := filterJson.Get("label").MustString("")
if label == "" {
label = query
}
agg.Filters[label] = newQueryStringFilter(true, query)
}
return agg
}
func (q *Query) getTerms(target *BucketAgg) *TermsAggWrap {
agg := &TermsAggWrap{Aggs: make(Aggs)}
agg.Terms.Field = target.Field
if len(target.Settings.MustMap()) == 0 {
return agg
}
sizeStr := target.Settings.Get("size").MustString("")
size, err := strconv.Atoi(sizeStr)
if err != nil {
size = 500
}
agg.Terms.Size = size
orderBy, err := target.Settings.Get("orderBy").String()
if err == nil {
agg.Terms.Order = make(map[string]interface{})
agg.Terms.Order[orderBy] = target.Settings.Get("order").MustString("")
if _, err := strconv.Atoi(orderBy); err != nil {
for _, metricI := range q.Metrics {
metric := simplejson.NewFromAny(metricI)
metricId := metric.Get("id").MustString()
if metricId == orderBy {
subAggs := make(Aggs)
metricField := metric.Get("field").MustString()
metricType := metric.Get("type").MustString()
subAggs[metricType] = map[string]string{"field": metricField}
agg.Aggs = make(Aggs)
agg.Aggs[metricId] = subAggs
break
}
}
}
}
missing, err := target.Settings.Get("missing").String()
if err == nil {
agg.Terms.Missing = missing
}
return agg
}
func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (string, error) {
timeRange := queryContext.TimeRange
interval := intervalCalculator.Calculate(timeRange, q.Interval)
payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", timeRange.GetFromAsMsEpoch()), -1)
payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", timeRange.GetToAsMsEpoch()), -1)
payload = strings.Replace(payload, "$interval", interval.Text, -1)
payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
payload = strings.Replace(payload, "$__interval", interval.Text, -1)
return payload, nil
}
func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader {
var header QueryHeader
esVersion := dsInfo.JsonData.Get("esVersion").MustInt()
searchType := "query_then_fetch"
if esVersion < 5 {
searchType = "count"
}
header.SearchType = searchType
header.IgnoreUnavailable = true
header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(), timeRange)
if esVersion >= 56 {
header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
}
return &header
}
func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string {
if interval == "" {
return pattern
}
var indexes []string
indexParts := strings.Split(strings.TrimLeft(pattern, "["), "]")
indexBase := indexParts[0]
if len(indexParts) <= 1 {
return pattern
}
indexDateFormat := indexParts[1]
start := moment.NewMoment(timeRange.MustGetFrom())
end := moment.NewMoment(timeRange.MustGetTo())
indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
for start.IsBefore(*end) {
switch interval {
case "Hourly":
start = start.AddHours(1)
case "Daily":
start = start.AddDay()
case "Weekly":
start = start.AddWeeks(1)
case "Monthly":
start = start.AddMonths(1)
case "Yearly":
start = start.AddYears(1)
}
indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
}
return strings.Join(indexes, ",")
}

View File

@ -1,43 +0,0 @@
package elasticsearch
var metricAggType = map[string]string{
"count": "Count",
"avg": "Average",
"sum": "Sum",
"max": "Max",
"min": "Min",
"extended_stats": "Extended Stats",
"percentiles": "Percentiles",
"cardinality": "Unique Count",
"moving_avg": "Moving Average",
"derivative": "Derivative",
"raw_document": "Raw Document",
}
var extendedStats = map[string]string{
"avg": "Avg",
"min": "Min",
"max": "Max",
"sum": "Sum",
"count": "Count",
"std_deviation": "Std Dev",
"std_deviation_bounds_upper": "Std Dev Upper",
"std_deviation_bounds_lower": "Std Dev Lower",
}
var pipelineOptions = map[string]string{
"moving_avg": "moving_avg",
"derivative": "derivative",
}
func isPipelineAgg(metricType string) bool {
if _, ok := pipelineOptions[metricType]; ok {
return true
}
return false
}
func describeMetric(metricType, field string) string {
text := metricAggType[metricType]
return text + " " + field
}

View File

@ -1,297 +0,0 @@
package elasticsearch
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func testElasticSearchResponse(query Query, expectedElasticSearchRequestJSON string) {
var queryExpectedJSONInterface, queryJSONInterface interface{}
jsonDate, _ := simplejson.NewJson([]byte(`{"esVersion":2}`))
dsInfo := &models.DataSource{
Database: "grafana-test",
JsonData: jsonDate,
}
testTimeRange := tsdb.NewTimeRange("5m", "now")
s, err := query.Build(&tsdb.TsdbQuery{TimeRange: testTimeRange}, dsInfo)
So(err, ShouldBeNil)
queryJSON := strings.Split(s, "\n")[1]
err = json.Unmarshal([]byte(queryJSON), &queryJSONInterface)
So(err, ShouldBeNil)
expectedElasticSearchRequestJSON = strings.Replace(
expectedElasticSearchRequestJSON,
"<FROM_TIMESTAMP>",
strconv.FormatInt(testTimeRange.GetFromAsMsEpoch(), 10),
-1,
)
expectedElasticSearchRequestJSON = strings.Replace(
expectedElasticSearchRequestJSON,
"<TO_TIMESTAMP>",
strconv.FormatInt(testTimeRange.GetToAsMsEpoch(), 10),
-1,
)
err = json.Unmarshal([]byte(expectedElasticSearchRequestJSON), &queryExpectedJSONInterface)
So(err, ShouldBeNil)
result := reflect.DeepEqual(queryExpectedJSONInterface, queryJSONInterface)
if !result {
fmt.Printf("ERROR: %s \n != \n %s", expectedElasticSearchRequestJSON, queryJSON)
}
So(result, ShouldBeTrue)
}
func TestElasticSearchQueryBuilder(t *testing.T) {
Convey("Elasticsearch QueryBuilder query testing", t, func() {
Convey("Build test average metric with moving average", func() {
var expectedElasticsearchQueryJSON = `
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"timestamp": {
"gte": "<FROM_TIMESTAMP>",
"lte": "<TO_TIMESTAMP>",
"format": "epoch_millis"
}
}
},
{
"query_string": {
"analyze_wildcard": true,
"query": "(test:query) AND (name:sample)"
}
}
]
}
},
"aggs": {
"2": {
"date_histogram": {
"interval": "200ms",
"field": "timestamp",
"min_doc_count": 0,
"extended_bounds": {
"min": "<FROM_TIMESTAMP>",
"max": "<TO_TIMESTAMP>"
},
"format": "epoch_millis"
},
"aggs": {
"1": {
"avg": {
"field": "value",
"script": {
"inline": "_value * 2"
}
}
},
"3": {
"moving_avg": {
"buckets_path": "1",
"window": 5,
"model": "simple",
"minimize": false
}
}
}
}
}
}`
testElasticSearchResponse(avgWithMovingAvg, expectedElasticsearchQueryJSON)
})
Convey("Test Wildcards and Quotes", func() {
expectedElasticsearchQueryJSON := `
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"timestamp": {
"gte": "<FROM_TIMESTAMP>",
"lte": "<TO_TIMESTAMP>",
"format": "epoch_millis"
}
}
},
{
"query_string": {
"analyze_wildcard": true,
"query": "scope:$location.leagueconnect.api AND name:*CreateRegistration AND name:\"*.201-responses.rate\""
}
}
]
}
},
"aggs": {
"2": {
"aggs": {
"1": {
"sum": {
"field": "value"
}
}
},
"date_histogram": {
"extended_bounds": {
"max": "<TO_TIMESTAMP>",
"min": "<FROM_TIMESTAMP>"
},
"field": "timestamp",
"format": "epoch_millis",
"min_doc_count": 0
}
}
}
}`
testElasticSearchResponse(wildcardsAndQuotes, expectedElasticsearchQueryJSON)
})
Convey("Test Term Aggregates", func() {
expectedElasticsearchQueryJSON := `
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"timestamp": {
"gte": "<FROM_TIMESTAMP>",
"lte": "<TO_TIMESTAMP>",
"format": "epoch_millis"
}
}
},
{
"query_string": {
"analyze_wildcard": true,
"query": "(scope:*.hmp.metricsd) AND (name_raw:builtin.general.*_instance_count)"
}
}
]
}
},
"aggs": {"4":{"aggs":{"2":{"aggs":{"1":{"sum":{"field":"value"}}},"date_histogram":{"extended_bounds":{"max":"<TO_TIMESTAMP>","min":"<FROM_TIMESTAMP>"},"field":"timestamp","format":"epoch_millis","interval":"200ms","min_doc_count":0}}},"terms":{"field":"name_raw","order":{"_term":"desc"},"size":10}}}
}`
testElasticSearchResponse(termAggs, expectedElasticsearchQueryJSON)
})
Convey("Test Filters Aggregates", func() {
expectedElasticsearchQueryJSON := `{
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"time": {
"gte": "<FROM_TIMESTAMP>",
"lte": "<TO_TIMESTAMP>",
"format": "epoch_millis"
}
}
},
{
"query_string": {
"analyze_wildcard": true,
"query": "*"
}
}
]
}
},
"aggs": {
"3": {
"filters": {
"filters": {
"hello": {
"query_string": {
"query": "host:\"67.65.185.232\"",
"analyze_wildcard": true
}
}
}
},
"aggs": {
"2": {
"date_histogram": {
"interval": "200ms",
"field": "time",
"min_doc_count": 0,
"extended_bounds": {
"min": "<FROM_TIMESTAMP>",
"max": "<TO_TIMESTAMP>"
},
"format": "epoch_millis"
},
"aggs": {}
}
}
}
}
}
`
testElasticSearchResponse(filtersAggs, expectedElasticsearchQueryJSON)
})
})
}
func makeTime(hour int) string {
//unixtime 1500000000 == 2017-07-14T02:40:00+00:00
return strconv.Itoa((1500000000 + hour*60*60) * 1000)
}
func getIndexListByTime(pattern string, interval string, hour int) string {
timeRange := &tsdb.TimeRange{
From: makeTime(0),
To: makeTime(hour),
}
return getIndexList(pattern, interval, timeRange)
}
func TestElasticsearchGetIndexList(t *testing.T) {
Convey("Test Elasticsearch getIndex ", t, func() {
Convey("Parse Interval Formats", func() {
So(getIndexListByTime("[logstash-]YYYY.MM.DD", "Daily", 48),
ShouldEqual, "logstash-2017.07.14,logstash-2017.07.15,logstash-2017.07.16")
So(len(strings.Split(getIndexListByTime("[logstash-]YYYY.MM.DD.HH", "Hourly", 3), ",")),
ShouldEqual, 4)
So(getIndexListByTime("[logstash-]YYYY.W", "Weekly", 100),
ShouldEqual, "logstash-2017.28,logstash-2017.29")
So(getIndexListByTime("[logstash-]YYYY.MM", "Monthly", 700),
ShouldEqual, "logstash-2017.07,logstash-2017.08")
So(getIndexListByTime("[logstash-]YYYY", "Yearly", 10000),
ShouldEqual, "logstash-2017,logstash-2018,logstash-2019")
})
Convey("No Interval", func() {
index := getIndexListByTime("logstash-test", "", 1)
So(index, ShouldEqual, "logstash-test")
})
})
}

View File

@ -2,39 +2,79 @@ package elasticsearch
import (
"errors"
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
"regexp"
"strconv"
"strings"
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
type ElasticsearchResponseParser struct {
Responses []Response
type responseParser struct {
Responses []*es.SearchResponse
Targets []*Query
}
func (rp *ElasticsearchResponseParser) getTimeSeries() *tsdb.QueryResult {
queryRes := tsdb.NewQueryResult()
for i, res := range rp.Responses {
target := rp.Targets[i]
props := make(map[string]string)
series := make([]*tsdb.TimeSeries, 0)
rp.processBuckets(res.Aggregations, target, &series, props, 0)
rp.nameSeries(&series, target)
queryRes.Series = append(queryRes.Series, series...)
var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
return &responseParser{
Responses: responses,
Targets: targets,
}
return queryRes
}
func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target *Query, series *[]*tsdb.TimeSeries, props map[string]string, depth int) error {
func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
result := &tsdb.Response{}
result.Results = make(map[string]*tsdb.QueryResult)
if rp.Responses == nil {
return result, nil
}
for i, res := range rp.Responses {
target := rp.Targets[i]
if res.Error != nil {
result.Results[target.RefID] = getErrorFromElasticResponse(res)
continue
}
queryRes := tsdb.NewQueryResult()
props := make(map[string]string)
table := tsdb.Table{
Columns: make([]tsdb.TableColumn, 0),
Rows: make([]tsdb.RowValues, 0),
}
err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
if err != nil {
return nil, err
}
rp.nameSeries(&queryRes.Series, target)
rp.trimDatapoints(&queryRes.Series, target)
if len(table.Rows) > 0 {
queryRes.Tables = append(queryRes.Tables, &table)
}
result.Results[target.RefID] = queryRes
}
return result, nil
}
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
var err error
maxDepth := len(target.BucketAggs) - 1
for aggId, v := range aggs {
aggDef, _ := findAgg(target, aggId)
aggIDs := make([]string, 0)
for k := range aggs {
aggIDs = append(aggIDs, k)
}
sort.Strings(aggIDs)
for _, aggID := range aggIDs {
v := aggs[aggID]
aggDef, _ := findAgg(target, aggID)
esAgg := simplejson.NewFromAny(v)
if aggDef == nil {
continue
@ -43,26 +83,50 @@ func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{
if depth == maxDepth {
if aggDef.Type == "date_histogram" {
err = rp.processMetrics(esAgg, target, series, props)
if err != nil {
return err
}
} else {
return fmt.Errorf("not support type:%s", aggDef.Type)
err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
}
if err != nil {
return err
}
} else {
for i, b := range esAgg.Get("buckets").MustArray() {
for _, b := range esAgg.Get("buckets").MustArray() {
bucket := simplejson.NewFromAny(b)
newProps := props
newProps := make(map[string]string, 0)
for k, v := range props {
newProps[k] = v
}
if key, err := bucket.Get("key").String(); err == nil {
newProps[aggDef.Field] = key
} else {
props["filter"] = strconv.Itoa(i)
} else if key, err := bucket.Get("key").Int64(); err == nil {
newProps[aggDef.Field] = strconv.FormatInt(key, 10)
}
if key, err := bucket.Get("key_as_string").String(); err == nil {
props[aggDef.Field] = key
newProps[aggDef.Field] = key
}
err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
if err != nil {
return err
}
}
for k, v := range esAgg.Get("buckets").MustMap() {
bucket := simplejson.NewFromAny(v)
newProps := make(map[string]string, 0)
for k, v := range props {
newProps[k] = v
}
newProps["filter"] = k
err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
if err != nil {
return err
}
rp.processBuckets(bucket.MustMap(), target, series, newProps, depth+1)
}
}
@ -71,7 +135,7 @@ func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{
}
func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *[]*tsdb.TimeSeries, props map[string]string) error {
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
for _, metric := range target.Metrics {
if metric.Hide {
continue
@ -79,14 +143,20 @@ func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, ta
switch metric.Type {
case "count":
newSeries := tsdb.TimeSeries{}
newSeries := tsdb.TimeSeries{
Tags: make(map[string]string),
}
for _, v := range esAgg.Get("buckets").MustArray() {
bucket := simplejson.NewFromAny(v)
value := castToNullFloat(bucket.Get("doc_count"))
key := castToNullFloat(bucket.Get("key"))
newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
}
newSeries.Tags = props
for k, v := range props {
newSeries.Tags[k] = v
}
newSeries.Tags["metric"] = "count"
*series = append(*series, &newSeries)
@ -99,9 +169,18 @@ func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, ta
firstBucket := simplejson.NewFromAny(buckets[0])
percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()
for percentileName := range percentiles {
newSeries := tsdb.TimeSeries{}
newSeries.Tags = props
percentileKeys := make([]string, 0)
for k := range percentiles {
percentileKeys = append(percentileKeys, k)
}
sort.Strings(percentileKeys)
for _, percentileName := range percentileKeys {
newSeries := tsdb.TimeSeries{
Tags: make(map[string]string),
}
for k, v := range props {
newSeries.Tags[k] = v
}
newSeries.Tags["metric"] = "p" + percentileName
newSeries.Tags["field"] = metric.Field
for _, v := range buckets {
@ -112,9 +191,49 @@ func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, ta
}
*series = append(*series, &newSeries)
}
case "extended_stats":
buckets := esAgg.Get("buckets").MustArray()
metaKeys := make([]string, 0)
meta := metric.Meta.MustMap()
for k := range meta {
metaKeys = append(metaKeys, k)
}
sort.Strings(metaKeys)
for _, statName := range metaKeys {
v := meta[statName]
if enabled, ok := v.(bool); !ok || !enabled {
continue
}
newSeries := tsdb.TimeSeries{
Tags: make(map[string]string),
}
for k, v := range props {
newSeries.Tags[k] = v
}
newSeries.Tags["metric"] = statName
newSeries.Tags["field"] = metric.Field
for _, v := range buckets {
bucket := simplejson.NewFromAny(v)
key := castToNullFloat(bucket.Get("key"))
var value null.Float
if statName == "std_deviation_bounds_upper" {
value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
} else if statName == "std_deviation_bounds_lower" {
value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
} else {
value = castToNullFloat(bucket.GetPath(metric.ID, statName))
}
newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
}
*series = append(*series, &newSeries)
}
default:
newSeries := tsdb.TimeSeries{}
newSeries.Tags = map[string]string{}
newSeries := tsdb.TimeSeries{
Tags: make(map[string]string),
}
for k, v := range props {
newSeries.Tags[k] = v
}
@ -142,7 +261,129 @@ func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, ta
return nil
}
func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries, target *Query) {
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
propKeys := make([]string, 0)
for k := range props {
propKeys = append(propKeys, k)
}
sort.Strings(propKeys)
if len(table.Columns) == 0 {
for _, propKey := range propKeys {
table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
}
table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
}
addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
found := false
for _, c := range table.Columns {
if c.Text == metricName {
found = true
break
}
}
if !found {
table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
}
*values = append(*values, value)
}
for _, v := range esAgg.Get("buckets").MustArray() {
bucket := simplejson.NewFromAny(v)
values := make(tsdb.RowValues, 0)
for _, propKey := range propKeys {
values = append(values, props[propKey])
}
if key, err := bucket.Get("key").String(); err == nil {
values = append(values, key)
} else {
values = append(values, castToNullFloat(bucket.Get("key")))
}
for _, metric := range target.Metrics {
switch metric.Type {
case "count":
addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
break
case "extended_stats":
metaKeys := make([]string, 0)
meta := metric.Meta.MustMap()
for k := range meta {
metaKeys = append(metaKeys, k)
}
sort.Strings(metaKeys)
for _, statName := range metaKeys {
v := meta[statName]
if enabled, ok := v.(bool); !ok || !enabled {
continue
}
var value null.Float
if statName == "std_deviation_bounds_upper" {
value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
} else if statName == "std_deviation_bounds_lower" {
value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
} else {
value = castToNullFloat(bucket.GetPath(metric.ID, statName))
}
addMetricValue(&values, rp.getMetricName(metric.Type), value)
break
}
default:
metricName := rp.getMetricName(metric.Type)
otherMetrics := make([]*MetricAgg, 0)
for _, m := range target.Metrics {
if m.Type == metric.Type {
otherMetrics = append(otherMetrics, m)
}
}
if len(otherMetrics) > 1 {
metricName += " " + metric.Field
}
addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
break
}
}
table.Rows = append(table.Rows, values)
}
return nil
}
func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
var histogram *BucketAgg
for _, bucketAgg := range target.BucketAggs {
if bucketAgg.Type == "date_histogram" {
histogram = bucketAgg
break
}
}
if histogram == nil {
return
}
trimEdges, err := histogram.Settings.Get("trimEdges").Int()
if err != nil {
return
}
for _, s := range *series {
if len(s.Points) > trimEdges*2 {
s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
}
}
}
func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
set := make(map[string]string)
for _, v := range *seriesList {
if metricType, exists := v.Tags["metric"]; exists {
@ -158,7 +399,9 @@ func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries
}
func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
metricType := series.Tags["metric"]
metricName := rp.getMetricName(metricType)
delete(series.Tags, "metric")
@ -170,27 +413,31 @@ func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, ta
}
if target.Alias != "" {
var re = regexp.MustCompile(`{{([\s\S]+?)}}`)
for _, match := range re.FindAllString(target.Alias, -1) {
group := match[2 : len(match)-2]
seriesName := target.Alias
if strings.HasPrefix(group, "term ") {
if term, ok := series.Tags["term "]; ok {
strings.Replace(target.Alias, match, term, 1)
}
subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
for _, subMatch := range subMatches {
group := subMatch[0]
if len(subMatch) > 1 {
group = subMatch[1]
}
if strings.Index(group, "term ") == 0 {
seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
}
if v, ok := series.Tags[group]; ok {
strings.Replace(target.Alias, match, v, 1)
seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
}
switch group {
case "metric":
strings.Replace(target.Alias, match, metricName, 1)
case "field":
strings.Replace(target.Alias, match, field, 1)
if group == "metric" {
seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
}
if group == "field" {
seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
}
}
return seriesName
}
// todo, if field and pipelineAgg
if field != "" && isPipelineAgg(metricType) {
@ -204,7 +451,6 @@ func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, ta
if !found {
metricName = "Unset"
}
} else if field != "" {
metricName += " " + field
}
@ -226,7 +472,7 @@ func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, ta
}
func (rp *ElasticsearchResponseParser) getMetricName(metric string) string {
func (rp *responseParser) getMetricName(metric string) string {
if text, ok := metricAggType[metric]; ok {
return text
}
@ -253,11 +499,28 @@ func castToNullFloat(j *simplejson.Json) null.Float {
return null.NewFloat(0, false)
}
func findAgg(target *Query, aggId string) (*BucketAgg, error) {
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
for _, v := range target.BucketAggs {
if aggId == v.ID {
if aggID == v.ID {
return v, nil
}
}
return nil, errors.New("can't found aggDef, aggID:" + aggId)
return nil, errors.New("can't found aggDef, aggID:" + aggID)
}
func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
result := tsdb.NewQueryResult()
json := simplejson.NewFromAny(response.Error)
reason := json.Get("reason").MustString()
rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
if rootCauseReason != "" {
result.ErrorString = rootCauseReason
} else if reason != "" {
result.ErrorString = reason
} else {
result.ErrorString = "Unkown elasticsearch error response"
}
return result
}

View File

@ -2,109 +2,879 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func testElasticsearchResponse(body string, target Query) *tsdb.QueryResult {
var responses Responses
err := json.Unmarshal([]byte(body), &responses)
So(err, ShouldBeNil)
responseParser := ElasticsearchResponseParser{responses.Responses, []*Query{&target}}
return responseParser.getTimeSeries()
}
func TestElasticSearchResponseParser(t *testing.T) {
Convey("Elasticsearch Response query testing", t, func() {
Convey("Build test average metric with moving average", func() {
responses := `{
"responses": [
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 4500,
"max_score": 0,
"hits": []
},
"aggregations": {
"2": {
"buckets": [
{
"1": {
"value": null
},
"key_as_string": "1522205880000",
"key": 1522205880000,
"doc_count": 0
},
{
"1": {
"value": 10
},
"key_as_string": "1522205940000",
"key": 1522205940000,
"doc_count": 300
},
{
"1": {
"value": 10
},
"3": {
"value": 20
},
"key_as_string": "1522206000000",
"key": 1522206000000,
"doc_count": 300
},
{
"1": {
"value": 10
},
"3": {
"value": 20
},
"key_as_string": "1522206060000",
"key": 1522206060000,
"doc_count": 300
func TestResponseParser(t *testing.T) {
Convey("Elasticsearch response parser test", t, func() {
Convey("Simple query and count", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }],
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"doc_count": 10,
"key": 1000
},
{
"doc_count": 15,
"key": 2000
}
]
}
}
]
}
},
"status": 200
}
]
}
`
res := testElasticsearchResponse(responses, avgWithMovingAvg)
So(len(res.Series), ShouldEqual, 2)
So(res.Series[0].Name, ShouldEqual, "Average value")
So(len(res.Series[0].Points), ShouldEqual, 4)
for i, p := range res.Series[0].Points {
if i == 0 {
So(p[0].Valid, ShouldBeFalse)
} else {
So(p[0].Float64, ShouldEqual, 10)
}
So(p[1].Float64, ShouldEqual, 1522205880000+60000*i)
}
So(res.Series[1].Name, ShouldEqual, "Moving Average Average 1")
So(len(res.Series[1].Points), ShouldEqual, 2)
for _, p := range res.Series[1].Points {
So(p[0].Float64, ShouldEqual, 20)
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 1)
series := queryRes.Series[0]
So(series.Name, ShouldEqual, "Count")
So(series.Points, ShouldHaveLength, 2)
So(series.Points[0][0].Float64, ShouldEqual, 10)
So(series.Points[0][1].Float64, ShouldEqual, 1000)
So(series.Points[1][0].Float64, ShouldEqual, 15)
So(series.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("Simple query count & avg aggregation", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }, {"type": "avg", "field": "value", "id": "2" }],
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "3" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"3": {
"buckets": [
{
"2": { "value": 88 },
"doc_count": 10,
"key": 1000
},
{
"2": { "value": 99 },
"doc_count": 15,
"key": 2000
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 2)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "Count")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 10)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 15)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "Average value")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 88)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 99)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("Single group by query one metric", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }],
"bucketAggs": [
{ "type": "terms", "field": "host", "id": "2" },
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"3": {
"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
},
"doc_count": 4,
"key": "server1"
},
{
"3": {
"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
},
"doc_count": 10,
"key": "server2"
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 2)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "server1")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "server2")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("Single group by query two metrics", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }, { "type": "avg", "field": "@value", "id": "4" }],
"bucketAggs": [
{ "type": "terms", "field": "host", "id": "2" },
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"3": {
"buckets": [
{ "4": { "value": 10 }, "doc_count": 1, "key": 1000 },
{ "4": { "value": 12 }, "doc_count": 3, "key": 2000 }
]
},
"doc_count": 4,
"key": "server1"
},
{
"3": {
"buckets": [
{ "4": { "value": 20 }, "doc_count": 1, "key": 1000 },
{ "4": { "value": 32 }, "doc_count": 3, "key": 2000 }
]
},
"doc_count": 10,
"key": "server2"
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 4)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "server1 Count")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "server1 Average @value")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 10)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 12)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
seriesThree := queryRes.Series[2]
So(seriesThree.Name, ShouldEqual, "server2 Count")
So(seriesThree.Points, ShouldHaveLength, 2)
So(seriesThree.Points[0][0].Float64, ShouldEqual, 1)
So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesThree.Points[1][0].Float64, ShouldEqual, 3)
So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000)
seriesFour := queryRes.Series[3]
So(seriesFour.Name, ShouldEqual, "server2 Average @value")
So(seriesFour.Points, ShouldHaveLength, 2)
So(seriesFour.Points[0][0].Float64, ShouldEqual, 20)
So(seriesFour.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesFour.Points[1][0].Float64, ShouldEqual, 32)
So(seriesFour.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("With percentiles", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "percentiles", "settings": { "percents": [75, 90] }, "id": "1" }],
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "3" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"3": {
"buckets": [
{
"1": { "values": { "75": 3.3, "90": 5.5 } },
"doc_count": 10,
"key": 1000
},
{
"1": { "values": { "75": 2.3, "90": 4.5 } },
"doc_count": 15,
"key": 2000
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 2)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "p75")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 3.3)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 2.3)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "p90")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 5.5)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 4.5)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("With extended stats", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "extended_stats", "meta": { "max": true, "std_deviation_bounds_upper": true, "std_deviation_bounds_lower": true }, "id": "1" }],
"bucketAggs": [
{ "type": "terms", "field": "host", "id": "3" },
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"3": {
"buckets": [
{
"key": "server1",
"4": {
"buckets": [
{
"1": {
"max": 10.2,
"min": 5.5,
"std_deviation_bounds": { "upper": 3, "lower": -2 }
},
"doc_count": 10,
"key": 1000
}
]
}
},
{
"key": "server2",
"4": {
"buckets": [
{
"1": {
"max": 15.5,
"min": 3.4,
"std_deviation_bounds": { "upper": 4, "lower": -1 }
},
"doc_count": 10,
"key": 1000
}
]
}
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 6)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "server1 Max")
So(seriesOne.Points, ShouldHaveLength, 1)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 10.2)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "server1 Std Dev Lower")
So(seriesTwo.Points, ShouldHaveLength, 1)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, -2)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
seriesThree := queryRes.Series[2]
So(seriesThree.Name, ShouldEqual, "server1 Std Dev Upper")
So(seriesThree.Points, ShouldHaveLength, 1)
So(seriesThree.Points[0][0].Float64, ShouldEqual, 3)
So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
seriesFour := queryRes.Series[3]
So(seriesFour.Name, ShouldEqual, "server2 Max")
So(seriesFour.Points, ShouldHaveLength, 1)
So(seriesFour.Points[0][0].Float64, ShouldEqual, 15.5)
So(seriesFour.Points[0][1].Float64, ShouldEqual, 1000)
seriesFive := queryRes.Series[4]
So(seriesFive.Name, ShouldEqual, "server2 Std Dev Lower")
So(seriesFive.Points, ShouldHaveLength, 1)
So(seriesFive.Points[0][0].Float64, ShouldEqual, -1)
So(seriesFive.Points[0][1].Float64, ShouldEqual, 1000)
seriesSix := queryRes.Series[5]
So(seriesSix.Name, ShouldEqual, "server2 Std Dev Upper")
So(seriesSix.Points, ShouldHaveLength, 1)
So(seriesSix.Points[0][0].Float64, ShouldEqual, 4)
So(seriesSix.Points[0][1].Float64, ShouldEqual, 1000)
})
Convey("Single group by with alias pattern", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"alias": "{{term @host}} {{metric}} and {{not_exist}} {{@host}}",
"metrics": [{ "type": "count", "id": "1" }],
"bucketAggs": [
{ "type": "terms", "field": "@host", "id": "2" },
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"3": {
"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
},
"doc_count": 4,
"key": "server1"
},
{
"3": {
"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
},
"doc_count": 10,
"key": "server2"
},
{
"3": {
"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
},
"doc_count": 10,
"key": 0
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 3)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "server1 Count and {{not_exist}} server1")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "server2 Count and {{not_exist}} server2")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
seriesThree := queryRes.Series[2]
So(seriesThree.Name, ShouldEqual, "0 Count and {{not_exist}} 0")
So(seriesThree.Points, ShouldHaveLength, 2)
So(seriesThree.Points[0][0].Float64, ShouldEqual, 2)
So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesThree.Points[1][0].Float64, ShouldEqual, 8)
So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("Histogram response", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }],
"bucketAggs": [{ "type": "histogram", "field": "bytes", "id": "3" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"3": {
"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }, { "doc_count": 2, "key": 3000 }]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Tables, ShouldHaveLength, 1)
rows := queryRes.Tables[0].Rows
So(rows, ShouldHaveLength, 3)
cols := queryRes.Tables[0].Columns
So(cols, ShouldHaveLength, 2)
So(cols[0].Text, ShouldEqual, "bytes")
So(cols[1].Text, ShouldEqual, "Count")
So(rows[0][0].(null.Float).Float64, ShouldEqual, 1000)
So(rows[0][1].(null.Float).Float64, ShouldEqual, 1)
So(rows[1][0].(null.Float).Float64, ShouldEqual, 2000)
So(rows[1][1].(null.Float).Float64, ShouldEqual, 3)
So(rows[2][0].(null.Float).Float64, ShouldEqual, 3000)
So(rows[2][1].(null.Float).Float64, ShouldEqual, 2)
})
Convey("With two filters agg", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }],
"bucketAggs": [
{
"type": "filters",
"id": "2",
"settings": {
"filters": [{ "query": "@metric:cpu" }, { "query": "@metric:logins.count" }]
}
},
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": {
"@metric:cpu": {
"3": {
"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
}
},
"@metric:logins.count": {
"3": {
"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
}
}
}
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 2)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "@metric:cpu")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "@metric:logins.count")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
})
Convey("With dropfirst and last aggregation", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }],
"bucketAggs": [
{
"type": "date_histogram",
"field": "@timestamp",
"id": "2",
"settings": { "trimEdges": 1 }
}
]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"1": { "value": 1000 },
"key": 1,
"doc_count": 369
},
{
"1": { "value": 2000 },
"key": 2,
"doc_count": 200
},
{
"1": { "value": 2000 },
"key": 3,
"doc_count": 200
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 2)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "Average")
So(seriesOne.Points, ShouldHaveLength, 1)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 2000)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 2)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "Count")
So(seriesTwo.Points, ShouldHaveLength, 1)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 200)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 2)
})
Convey("No group by time", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }],
"bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"1": { "value": 1000 },
"key": "server-1",
"doc_count": 369
},
{
"1": { "value": 2000 },
"key": "server-2",
"doc_count": 200
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Tables, ShouldHaveLength, 1)
rows := queryRes.Tables[0].Rows
So(rows, ShouldHaveLength, 2)
cols := queryRes.Tables[0].Columns
So(cols, ShouldHaveLength, 3)
So(cols[0].Text, ShouldEqual, "host")
So(cols[1].Text, ShouldEqual, "Average")
So(cols[2].Text, ShouldEqual, "Count")
So(rows[0][0].(string), ShouldEqual, "server-1")
So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000)
So(rows[0][2].(null.Float).Float64, ShouldEqual, 369)
So(rows[1][0].(string), ShouldEqual, "server-2")
So(rows[1][1].(null.Float).Float64, ShouldEqual, 2000)
So(rows[1][2].(null.Float).Float64, ShouldEqual, 200)
})
Convey("Multiple metrics of same type", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "avg", "field": "test", "id": "1" }, { "type": "avg", "field": "test2", "id": "2" }],
"bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"1": { "value": 1000 },
"2": { "value": 3000 },
"key": "server-1",
"doc_count": 369
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Tables, ShouldHaveLength, 1)
rows := queryRes.Tables[0].Rows
So(rows, ShouldHaveLength, 1)
cols := queryRes.Tables[0].Columns
So(cols, ShouldHaveLength, 3)
So(cols[0].Text, ShouldEqual, "host")
So(cols[1].Text, ShouldEqual, "Average test")
So(cols[2].Text, ShouldEqual, "Average test2")
So(rows[0][0].(string), ShouldEqual, "server-1")
So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000)
So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000)
})
// Convey("Raw documents query", func() {
// targets := map[string]string{
// "A": `{
// "timeField": "@timestamp",
// "metrics": [{ "type": "raw_document", "id": "1" }]
// }`,
// }
// response := `{
// "responses": [
// {
// "hits": {
// "total": 100,
// "hits": [
// {
// "_id": "1",
// "_type": "type",
// "_index": "index",
// "_source": { "sourceProp": "asd" },
// "fields": { "fieldProp": "field" }
// },
// {
// "_source": { "sourceProp": "asd2" },
// "fields": { "fieldProp": "field2" }
// }
// ]
// }
// }
// ]
// }`
// rp, err := newResponseParserForTest(targets, response)
// So(err, ShouldBeNil)
// result, err := rp.getTimeSeries()
// So(err, ShouldBeNil)
// So(result.Results, ShouldHaveLength, 1)
// queryRes := result.Results["A"]
// So(queryRes, ShouldNotBeNil)
// So(queryRes.Tables, ShouldHaveLength, 1)
// rows := queryRes.Tables[0].Rows
// So(rows, ShouldHaveLength, 1)
// cols := queryRes.Tables[0].Columns
// So(cols, ShouldHaveLength, 3)
// So(cols[0].Text, ShouldEqual, "host")
// So(cols[1].Text, ShouldEqual, "Average test")
// So(cols[2].Text, ShouldEqual, "Average test2")
// So(rows[0][0].(string), ShouldEqual, "server-1")
// So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000)
// So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000)
// })
})
}
func newResponseParserForTest(tsdbQueries map[string]string, responseBody string) (*responseParser, error) {
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
tsdbQuery := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{},
TimeRange: tsdb.NewTimeRange(fromStr, toStr),
}
for refID, tsdbQueryBody := range tsdbQueries {
tsdbQueryJSON, err := simplejson.NewJson([]byte(tsdbQueryBody))
if err != nil {
return nil, err
}
tsdbQuery.Queries = append(tsdbQuery.Queries, &tsdb.Query{
Model: tsdbQueryJSON,
RefId: refID,
})
}
var response es.MultiSearchResponse
err := json.Unmarshal([]byte(responseBody), &response)
if err != nil {
return nil, err
}
tsQueryParser := newTimeSeriesQueryParser()
queries, err := tsQueryParser.parse(tsdbQuery)
if err != nil {
return nil, err
}
return newResponseParser(response.Responses, queries), nil
}

View File

@ -1,99 +1,246 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
type timeSeriesQuery struct {
queries []*Query
client es.Client
tsdbQuery *tsdb.TsdbQuery
intervalCalculator tsdb.IntervalCalculator
}
func (e *ElasticsearchExecutor) executeTimeSeriesQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
var newTimeSeriesQuery = func(client es.Client, tsdbQuery *tsdb.TsdbQuery, intervalCalculator tsdb.IntervalCalculator) *timeSeriesQuery {
return &timeSeriesQuery{
client: client,
tsdbQuery: tsdbQuery,
intervalCalculator: intervalCalculator,
}
}
func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
result := &tsdb.Response{}
result.Results = make(map[string]*tsdb.QueryResult)
tsQueryParser := newTimeSeriesQueryParser(dsInfo)
query, err := tsQueryParser.parse(tsdbQuery)
tsQueryParser := newTimeSeriesQueryParser()
queries, err := tsQueryParser.parse(e.tsdbQuery)
if err != nil {
return nil, err
}
buff := bytes.Buffer{}
for _, q := range query.queries {
s, err := q.Build(tsdbQuery, dsInfo)
ms := e.client.MultiSearch()
from := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetFromAsMsEpoch())
to := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetToAsMsEpoch())
for _, q := range queries {
minInterval, err := e.client.GetMinInterval(q.Interval)
if err != nil {
return nil, err
}
buff.WriteString(s)
}
payload := buff.String()
interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval)
if setting.Env == setting.DEV {
glog.Debug("Elasticsearch playload", "raw playload", payload)
}
glog.Info("Elasticsearch playload", "raw playload", payload)
b := ms.Search()
b.Size(0)
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)
req, err := e.createRequest(dsInfo, payload)
if err != nil {
return nil, err
}
if q.RawQuery != "" {
filters.AddQueryStringFilter(q.RawQuery, true)
}
httpClient, err := dsInfo.GetHttpClient()
if err != nil {
return nil, err
}
if len(q.BucketAggs) == 0 {
if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
result.Results[q.RefID] = &tsdb.QueryResult{
RefId: q.RefID,
Error: fmt.Errorf("invalid query, missing metrics and aggregations"),
ErrorString: "invalid query, missing metrics and aggregations",
}
continue
}
metric := q.Metrics[0]
b.Size(metric.Settings.Get("size").MustInt(500))
b.SortDesc("@timestamp", "boolean")
b.AddDocValueField("@timestamp")
continue
}
resp, err := ctxhttp.Do(ctx, httpClient, req)
if err != nil {
return nil, err
}
aggBuilder := b.Agg()
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("elasticsearch returned statuscode invalid status code: %v", resp.Status)
}
// iterate backwards to create aggregations bottom-down
for _, bucketAgg := range q.BucketAggs {
switch bucketAgg.Type {
case "date_histogram":
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, interval)
case "histogram":
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
case "filters":
aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
case "terms":
aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
case "geohash_grid":
aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
}
}
var responses Responses
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&responses)
if err != nil {
return nil, err
}
for _, m := range q.Metrics {
if m.Type == "count" {
continue
}
for _, res := range responses.Responses {
if res.Err != nil {
return nil, errors.New(res.getErrMsg())
if isPipelineAgg(m.Type) {
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
aggBuilder.Pipeline(m.ID, m.Type, m.PipelineAggregate, func(a *es.PipelineAggregation) {
a.Settings = m.Settings.MustMap()
})
} else {
continue
}
} else {
aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
a.Settings = m.Settings.MustMap()
})
}
}
}
responseParser := ElasticsearchResponseParser{responses.Responses, query.queries}
queryRes := responseParser.getTimeSeries()
result.Results["A"] = queryRes
return result, nil
}
type timeSeriesQueryParser struct {
ds *models.DataSource
}
func newTimeSeriesQueryParser(ds *models.DataSource) *timeSeriesQueryParser {
return &timeSeriesQueryParser{
ds: ds,
req, err := ms.Build()
if err != nil {
return nil, err
}
res, err := e.client.ExecuteMultisearch(req)
if err != nil {
return nil, err
}
rp := newResponseParser(res.Responses, queries)
return rp.getTimeSeries()
}
func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) (*timeSeriesQuery, error) {
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string, interval tsdb.Interval) es.AggBuilder {
aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS)
if a.Interval == "auto" {
a.Interval = "$__interval"
}
a.Interval = strings.Replace(a.Interval, "$interval", interval.Text, -1)
a.Interval = strings.Replace(a.Interval, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
a.Interval = strings.Replace(a.Interval, "$__interval", interval.Text, -1)
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
a.Missing = &missing
}
aggBuilder = b
})
return aggBuilder
}
func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
a.Interval = bucketAgg.Settings.Get("interval").MustInt(1000)
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil {
a.Missing = &missing
}
aggBuilder = b
})
return aggBuilder
}
func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
if size, err := bucketAgg.Settings.Get("size").Int(); err == nil {
a.Size = size
} else if size, err := bucketAgg.Settings.Get("size").String(); err == nil {
a.Size, err = strconv.Atoi(size)
if err != nil {
a.Size = 500
}
} else {
a.Size = 500
}
if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
a.MinDocCount = &minDocCount
}
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
a.Missing = &missing
}
if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
if _, err := strconv.Atoi(orderBy); err == nil {
for _, m := range metrics {
if m.ID == orderBy {
b.Metric(m.ID, m.Type, m.Field, nil)
break
}
}
}
}
aggBuilder = b
})
return aggBuilder
}
func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
filters := make(map[string]interface{})
for _, filter := range bucketAgg.Settings.Get("filters").MustArray() {
json := simplejson.NewFromAny(filter)
query := json.Get("query").MustString()
label := json.Get("label").MustString()
if label == "" {
label = query
}
filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
}
if len(filters) > 0 {
aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
a.Filters = filters
aggBuilder = b
})
}
return aggBuilder
}
func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
a.Precision = bucketAgg.Settings.Get("precision").MustInt(3)
aggBuilder = b
})
return aggBuilder
}
type timeSeriesQueryParser struct{}
func newTimeSeriesQueryParser() *timeSeriesQueryParser {
return &timeSeriesQueryParser{}
}
func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) ([]*Query, error) {
queries := make([]*Query, 0)
for _, q := range tsdbQuery.Queries {
model := q.Model
@ -111,10 +258,7 @@ func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) (*timeSeriesQue
return nil, err
}
alias := model.Get("alias").MustString("")
parsedInterval, err := tsdb.GetIntervalFrom(p.ds, model, time.Millisecond)
if err != nil {
return nil, err
}
interval := model.Get("interval").MustString()
queries = append(queries, &Query{
TimeField: timeField,
@ -122,54 +266,52 @@ func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) (*timeSeriesQue
BucketAggs: bucketAggs,
Metrics: metrics,
Alias: alias,
Interval: parsedInterval,
Interval: interval,
RefID: q.RefId,
})
}
return &timeSeriesQuery{queries: queries}, nil
return queries, nil
}
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
var err error
var result []*BucketAgg
for _, t := range model.Get("bucketAggs").MustArray() {
aggJson := simplejson.NewFromAny(t)
aggJSON := simplejson.NewFromAny(t)
agg := &BucketAgg{}
agg.Type, err = aggJson.Get("type").String()
agg.Type, err = aggJSON.Get("type").String()
if err != nil {
return nil, err
}
agg.ID, err = aggJson.Get("id").String()
agg.ID, err = aggJSON.Get("id").String()
if err != nil {
return nil, err
}
agg.Field = aggJson.Get("field").MustString()
agg.Settings = simplejson.NewFromAny(aggJson.Get("settings").MustMap())
agg.Field = aggJSON.Get("field").MustString()
agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap())
result = append(result, agg)
}
return result, nil
}
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*Metric, error) {
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
var err error
var result []*Metric
var result []*MetricAgg
for _, t := range model.Get("metrics").MustArray() {
metricJSON := simplejson.NewFromAny(t)
metric := &Metric{}
metric := &MetricAgg{}
metric.Field = metricJSON.Get("field").MustString()
metric.Hide = metricJSON.Get("hide").MustBool(false)
metric.ID, err = metricJSON.Get("id").String()
if err != nil {
return nil, err
}
metric.ID = metricJSON.Get("id").MustString()
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
metric.Type, err = metricJSON.Get("type").String()
if err != nil {

View File

@ -1,21 +1,514 @@
package elasticsearch
import (
"fmt"
"testing"
"time"
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func TestExecuteTimeSeriesQuery(t *testing.T) {
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
Convey("Test execute time series query", t, func() {
Convey("With defaults on es 2", func() {
c := newFakeClient(2)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
"metrics": [{"type": "count", "id": "0" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
So(rangeFilter.Key, ShouldEqual, c.timeField)
So(rangeFilter.Lte, ShouldEqual, toStr)
So(rangeFilter.Gte, ShouldEqual, fromStr)
So(rangeFilter.Format, ShouldEqual, es.DateFormatEpochMS)
So(sr.Aggs[0].Key, ShouldEqual, "2")
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
So(dateHistogramAgg.Field, ShouldEqual, "@timestamp")
So(dateHistogramAgg.ExtendedBounds.Min, ShouldEqual, fromStr)
So(dateHistogramAgg.ExtendedBounds.Max, ShouldEqual, toStr)
})
Convey("With defaults on es 5", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
"metrics": [{"type": "count", "id": "0" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
So(sr.Query.Bool.Filters[0].(*es.RangeFilter).Key, ShouldEqual, c.timeField)
So(sr.Aggs[0].Key, ShouldEqual, "2")
So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Min, ShouldEqual, fromStr)
So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Max, ShouldEqual, toStr)
})
Convey("With multiple bucket aggs", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "terms", "field": "@host", "id": "2" },
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "2")
So(firstLevel.Aggregation.Aggregation.(*es.TermsAggregation).Field, ShouldEqual, "@host")
secondLevel := firstLevel.Aggregation.Aggs[0]
So(secondLevel.Key, ShouldEqual, "3")
So(secondLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
})
Convey("With select field", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
],
"metrics": [{"type": "avg", "field": "@value", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "2")
So(firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
secondLevel := firstLevel.Aggregation.Aggs[0]
So(secondLevel.Key, ShouldEqual, "1")
So(secondLevel.Aggregation.Type, ShouldEqual, "avg")
So(secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Field, ShouldEqual, "@value")
})
Convey("With term agg and order by metric agg", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"type": "terms",
"field": "@host",
"id": "2",
"settings": { "size": "5", "order": "asc", "orderBy": "5" }
},
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
],
"metrics": [
{"type": "count", "id": "1" },
{"type": "avg", "field": "@value", "id": "5" }
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
avgAggOrderBy := sr.Aggs[0].Aggregation.Aggs[0]
So(avgAggOrderBy.Key, ShouldEqual, "5")
So(avgAggOrderBy.Aggregation.Type, ShouldEqual, "avg")
avgAgg := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggs[0]
So(avgAgg.Key, ShouldEqual, "5")
So(avgAgg.Aggregation.Type, ShouldEqual, "avg")
})
Convey("With metric percentiles", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
],
"metrics": [
{
"id": "1",
"type": "percentiles",
"field": "@load_time",
"settings": {
"percents": [ "1", "2", "3", "4" ]
}
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
percentilesAgg := sr.Aggs[0].Aggregation.Aggs[0]
So(percentilesAgg.Key, ShouldEqual, "1")
So(percentilesAgg.Aggregation.Type, ShouldEqual, "percentiles")
metricAgg := percentilesAgg.Aggregation.Aggregation.(*es.MetricAggregation)
percents := metricAgg.Settings["percents"].([]interface{})
So(percents, ShouldHaveLength, 4)
So(percents[0], ShouldEqual, "1")
So(percents[1], ShouldEqual, "2")
So(percents[2], ShouldEqual, "3")
So(percents[3], ShouldEqual, "4")
})
Convey("With filters aggs on es 2", func() {
c := newFakeClient(2)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"id": "2",
"type": "filters",
"settings": {
"filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
}
},
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
filtersAgg := sr.Aggs[0]
So(filtersAgg.Key, ShouldEqual, "2")
So(filtersAgg.Aggregation.Type, ShouldEqual, "filters")
fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
So(fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:cpu")
So(fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:logins.count")
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
So(dateHistogramAgg.Key, ShouldEqual, "4")
So(dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
})
Convey("With filters aggs on es 5", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"id": "2",
"type": "filters",
"settings": {
"filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
}
},
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
filtersAgg := sr.Aggs[0]
So(filtersAgg.Key, ShouldEqual, "2")
So(filtersAgg.Aggregation.Type, ShouldEqual, "filters")
fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
So(fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:cpu")
So(fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:logins.count")
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
So(dateHistogramAgg.Key, ShouldEqual, "4")
So(dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
})
Convey("With raw document metric", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [],
"metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
So(sr.Size, ShouldEqual, 500)
})
Convey("With raw document metric size set", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [],
"metrics": [{ "id": "1", "type": "raw_document", "settings": { "size": 1337 } }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
So(sr.Size, ShouldEqual, 1337)
})
Convey("With date histogram agg", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"id": "2",
"type": "date_histogram",
"field": "@timestamp",
"settings": { "interval": "auto", "min_doc_count": 2 }
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "2")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
So(hAgg.Field, ShouldEqual, "@timestamp")
So(hAgg.Interval, ShouldEqual, "15s")
So(hAgg.MinDocCount, ShouldEqual, 2)
})
Convey("With histogram agg", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"id": "3",
"type": "histogram",
"field": "bytes",
"settings": { "interval": 10, "min_doc_count": 2, "missing": 5 }
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "3")
So(firstLevel.Aggregation.Type, ShouldEqual, "histogram")
hAgg := firstLevel.Aggregation.Aggregation.(*es.HistogramAgg)
So(hAgg.Field, ShouldEqual, "bytes")
So(hAgg.Interval, ShouldEqual, 10)
So(hAgg.MinDocCount, ShouldEqual, 2)
So(*hAgg.Missing, ShouldEqual, 5)
})
Convey("With geo hash grid agg", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"id": "3",
"type": "geohash_grid",
"field": "@location",
"settings": { "precision": 3 }
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "3")
So(firstLevel.Aggregation.Type, ShouldEqual, "geohash_grid")
ghGridAgg := firstLevel.Aggregation.Aggregation.(*es.GeoHashGridAggregation)
So(ghGridAgg.Field, ShouldEqual, "@location")
So(ghGridAgg.Precision, ShouldEqual, 3)
})
Convey("With moving average", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [
{ "id": "3", "type": "sum", "field": "@value" },
{
"id": "2",
"type": "moving_avg",
"field": "3",
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "4")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 2)
sumAgg := firstLevel.Aggregation.Aggs[0]
So(sumAgg.Key, ShouldEqual, "3")
So(sumAgg.Aggregation.Type, ShouldEqual, "sum")
mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
So(mAgg.Field, ShouldEqual, "@value")
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
So(movingAvgAgg.Key, ShouldEqual, "2")
So(movingAvgAgg.Aggregation.Type, ShouldEqual, "moving_avg")
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(pl.BucketPath, ShouldEqual, "3")
})
Convey("With broken moving average", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "5" }
],
"metrics": [
{ "id": "3", "type": "sum", "field": "@value" },
{
"id": "2",
"type": "moving_avg",
"pipelineAgg": "3"
},
{
"id": "4",
"type": "moving_avg",
"pipelineAgg": "Metric to apply moving average"
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "5")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 2)
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
So(movingAvgAgg.Key, ShouldEqual, "2")
plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(plAgg.BucketPath, ShouldEqual, "3")
})
Convey("With derivative", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [
{ "id": "3", "type": "sum", "field": "@value" },
{
"id": "2",
"type": "derivative",
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "4")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
derivativeAgg := firstLevel.Aggregation.Aggs[1]
So(derivativeAgg.Key, ShouldEqual, "2")
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(plAgg.BucketPath, ShouldEqual, "3")
})
})
}
type fakeClient struct {
version int
timeField string
multiSearchResponse *es.MultiSearchResponse
multiSearchError error
builder *es.MultiSearchRequestBuilder
multisearchRequests []*es.MultiSearchRequest
}
func newFakeClient(version int) *fakeClient {
return &fakeClient{
version: version,
timeField: "@timestamp",
multisearchRequests: make([]*es.MultiSearchRequest, 0),
multiSearchResponse: &es.MultiSearchResponse{},
}
}
func (c *fakeClient) GetVersion() int {
return c.version
}
func (c *fakeClient) GetTimeField() string {
return c.timeField
}
func (c *fakeClient) GetMinInterval(queryInterval string) (time.Duration, error) {
return 15 * time.Second, nil
}
func (c *fakeClient) ExecuteMultisearch(r *es.MultiSearchRequest) (*es.MultiSearchResponse, error) {
c.multisearchRequests = append(c.multisearchRequests, r)
return c.multiSearchResponse, c.multiSearchError
}
func (c *fakeClient) MultiSearch() *es.MultiSearchRequestBuilder {
c.builder = es.NewMultiSearchRequestBuilder(c.version)
return c.builder
}
func newTsdbQuery(body string) (*tsdb.TsdbQuery, error) {
json, err := simplejson.NewJson([]byte(body))
if err != nil {
return nil, err
}
return &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
Model: json,
},
},
}, nil
}
func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval time.Duration) (*tsdb.Response, error) {
json, err := simplejson.NewJson([]byte(body))
if err != nil {
return nil, err
}
fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
tsdbQuery := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
Model: json,
},
},
TimeRange: tsdb.NewTimeRange(fromStr, toStr),
}
query := newTimeSeriesQuery(c, tsdbQuery, tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: minInterval}))
return query.execute()
}
func TestTimeSeriesQueryParser(t *testing.T) {
Convey("Test time series query parser", t, func() {
ds := &models.DataSource{}
p := newTimeSeriesQueryParser(ds)
p := newTimeSeriesQueryParser()
Convey("Should be able to parse query", func() {
json, err := simplejson.NewJson([]byte(`{
body := `{
"timeField": "@timestamp",
"query": "@metric:cpu",
"alias": "{{@hostname}} {{metric}}",
@ -63,21 +556,14 @@ func TestTimeSeriesQueryParser(t *testing.T) {
"type": "date_histogram"
}
]
}`))
}`
tsdbQuery, err := newTsdbQuery(body)
So(err, ShouldBeNil)
tsdbQuery := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
DataSource: ds,
Model: json,
},
},
}
tsQuery, err := p.parse(tsdbQuery)
queries, err := p.parse(tsdbQuery)
So(err, ShouldBeNil)
So(tsQuery.queries, ShouldHaveLength, 1)
So(queries, ShouldHaveLength, 1)
q := tsQuery.queries[0]
q := queries[0]
So(q.TimeField, ShouldEqual, "@timestamp")
So(q.RawQuery, ShouldEqual, "@metric:cpu")

View File

@ -1,75 +0,0 @@
package moment
import (
"fmt"
"math"
"time"
)
// @todo In months/years requires the old and new to calculate correctly, right?
// @todo decide how to handle rounding (i.e. always floor?)
type Diff struct {
duration time.Duration
}
func (d *Diff) InSeconds() int {
return int(d.duration.Seconds())
}
func (d *Diff) InMinutes() int {
return int(d.duration.Minutes())
}
func (d *Diff) InHours() int {
return int(d.duration.Hours())
}
func (d *Diff) InDays() int {
return int(math.Floor(float64(d.InSeconds()) / 86400))
}
// This depends on where the weeks fall?
func (d *Diff) InWeeks() int {
return int(math.Floor(float64(d.InDays() / 7)))
}
func (d *Diff) InMonths() int {
return 0
}
func (d *Diff) InYears() int {
return 0
}
// http://momentjs.com/docs/#/durations/humanize/
func (d *Diff) Humanize() string {
diffInSeconds := d.InSeconds()
if diffInSeconds <= 45 {
return fmt.Sprintf("%d seconds ago", diffInSeconds)
} else if diffInSeconds <= 90 {
return "a minute ago"
}
diffInMinutes := d.InMinutes()
if diffInMinutes <= 45 {
return fmt.Sprintf("%d minutes ago", diffInMinutes)
} else if diffInMinutes <= 90 {
return "an hour ago"
}
diffInHours := d.InHours()
if diffInHours <= 22 {
return fmt.Sprintf("%d hours ago", diffInHours)
} else if diffInHours <= 36 {
return "a day ago"
}
return "diff is in days"
}
// In Months
// In years

File diff suppressed because it is too large Load Diff

View File

@ -1,100 +0,0 @@
package moment
import (
"regexp"
"strings"
)
type MomentParser struct{}
var (
date_pattern = regexp.MustCompile("(LT|LL?L?L?|l{1,4}|Mo|MM?M?M?|Do|DDDo|DD?D?D?|ddd?d?|do?|w[o|w]?|W[o|W]?|YYYYY|YYYY|YY|gg(ggg?)?|GG(GGG?)?|e|E|a|A|hh?|HH?|mm?|ss?|SS?S?|X|zz?|ZZ?|Q)")
)
/*
+ <stdOrdinal> S (makes any number before it ordinal)
+ stdDayOfYear 1,2,365
+ stdDayOfYearZero 001, 002, 365
+ stdDayOfWeek w 0, 1, 2 numeric day of the week (0 = sunday)
+ stdDayOfWeekISO N 1 = Monday
+ stdWeekOfYear W Iso week number of year
+ stdUnix U
+ stdQuarter
*/
// Thanks to https://github.com/fightbulc/moment.php for replacement keys and regex
var moment_replacements = map[string]string{
"M": "1", // stdNumMonth 1 2 ... 11 12
"Mo": "1<stdOrdinal>", // stdNumMonth 1st 2nd ... 11th 12th
"MM": "01", // stdZeroMonth 01 02 ... 11 12
"MMM": "Jan", // stdMonth Jan Feb ... Nov Dec
"MMMM": "January", // stdLongMonth January February ... November December
"D": "2", // stdDay 1 2 ... 30 30
"Do": "2<stdOrdinal>", // stdDay 1st 2nd ... 30th 31st @todo support st nd th etch
"DD": "02", // stdZeroDay 01 02 ... 30 31
"DDD": "<stdDayOfYear>", // Day of the year 1 2 ... 364 365
"DDDo": "<stdDayOfYear><stdOrdinal>", // Day of the year 1st 2nd ... 364th 365th
"DDDD": "<stdDayOfYearZero>", // Day of the year 001 002 ... 364 365 @todo****
"d": "<stdDayOfWeek>", // Numeric representation of day of the week 0 1 ... 5 6
"do": "<stdDayOfWeek><stdOrdinal>", // 0th 1st ... 5th 6th
"dd": "Mon", // ***Su Mo ... Fr Sa @todo
"ddd": "Mon", // Sun Mon ... Fri Sat
"dddd": "Monday", // stdLongWeekDay Sunday Monday ... Friday Saturday
"e": "<stdDayOfWeek>", // Numeric representation of day of the week 0 1 ... 5 6 @todo
"E": "<stdDayOfWeekISO>", // ISO-8601 numeric representation of the day of the week (added in PHP 5.1.0) 1 2 ... 6 7 @todo
"w": "<stdWeekOfYear>", // 1 2 ... 52 53
"wo": "<stdWeekOfYear><stdOrdinal>", // 1st 2nd ... 52nd 53rd
"ww": "<stdWeekOfYear>", // ***01 02 ... 52 53 @todo
"W": "<stdWeekOfYear>", // 1 2 ... 52 53
"Wo": "<stdWeekOfYear><stdOrdinal>", // 1st 2nd ... 52nd 53rd
"WW": "<stdWeekOfYear>", // ***01 02 ... 52 53 @todo
"YY": "06", // stdYear 70 71 ... 29 30
"YYYY": "2006", // stdLongYear 1970 1971 ... 2029 2030
// "gg" : "o", // ISO-8601 year number 70 71 ... 29 30 @todo
// "gggg" : "o", // ***1970 1971 ... 2029 2030 @todo
// "GG" : "o", //70 71 ... 29 30 @todo
// "GGGG" : "o", // ***1970 1971 ... 2029 2030 @todo
"Q": "<stdQuarter>",
"A": "PM", // stdPM AM PM
"a": "pm", // stdpm am pm
"H": "<stdHourNoZero>", // stdHour 0 1 ... 22 23
"HH": "15", // 00 01 ... 22 23
"h": "3", // stdHour12 1 2 ... 11 12
"hh": "03", // stdZeroHour12 01 02 ... 11 12
"m": "4", // stdZeroMinute 0 1 ... 58 59
"mm": "04", // stdZeroMinute 00 01 ... 58 59
"s": "5", // stdSecond 0 1 ... 58 59
"ss": "05", // stdZeroSecond ***00 01 ... 58 59
// "S" : "", //0 1 ... 8 9
// "SS" : "", //0 1 ... 98 99
// "SSS" : "", //0 1 ... 998 999
"z": "MST", //EST CST ... MST PST
"zz": "MST", //EST CST ... MST PST
"Z": "Z07:00", // stdNumColonTZ -07:00 -06:00 ... +06:00 +07:00
"ZZ": "-0700", // stdNumTZ -0700 -0600 ... +0600 +0700
"X": "<stdUnix>", // Seconds since unix epoch 1360013296
"LT": "3:04 PM", // 8:30 PM
"L": "01/02/2006", //09/04/1986
"l": "1/2/2006", //9/4/1986
"LL": "January 2<stdOrdinal> 2006", //September 4th 1986 the php s flag isn't supported
"ll": "Jan 2 2006", //Sep 4 1986
"LLL": "January 2<stdOrdinal> 2006 3:04 PM", //September 4th 1986 8:30 PM @todo the php s flag isn't supported
"lll": "Jan 2 2006 3:04 PM", //Sep 4 1986 8:30 PM
"LLLL": "Monday, January 2<stdOrdinal> 2006 3:04 PM", //Thursday, September 4th 1986 8:30 PM the php s flag isn't supported
"llll": "Mon, Jan 2 2006 3:04 PM", //Thu, Sep 4 1986 8:30 PM
}
func (p *MomentParser) Convert(layout string) string {
var match [][]string
if match = date_pattern.FindAllStringSubmatch(layout, -1); match == nil {
return layout
}
for i := range match {
if replace, ok := moment_replacements[match[i][0]]; ok {
layout = strings.Replace(layout, match[i][0], replace, 1)
}
}
return layout
}

View File

@ -1,32 +0,0 @@
package moment
import (
"fmt"
"strings"
"time"
)
var (
days = []time.Weekday{
time.Sunday,
time.Monday,
time.Tuesday,
time.Wednesday,
time.Thursday,
time.Friday,
time.Saturday,
}
)
func ParseWeekDay(day string) (time.Weekday, error) {
day = strings.ToLower(day)
for _, d := range days {
if day == strings.ToLower(d.String()) {
return d, nil
}
}
return -1, fmt.Errorf("Unable to parse %s as week day", day)
}

View File

@ -1,68 +0,0 @@
package moment
import (
"regexp"
"strings"
)
type StrftimeParser struct{}
var (
replacements_pattern = regexp.MustCompile("%[mbhBedjwuaAVgyGYpPkHlIMSZzsTrRTDFXx]")
)
// Not implemented
// U
// C
var strftime_replacements = map[string]string{
"%m": "01", // stdZeroMonth 01 02 ... 11 12
"%b": "Jan", // stdMonth Jan Feb ... Nov Dec
"%h": "Jan",
"%B": "January", // stdLongMonth January February ... November December
"%e": "2", // stdDay 1 2 ... 30 30
"%d": "02", // stdZeroDay 01 02 ... 30 31
"%j": "<stdDayOfYear>", // Day of the year ***001 002 ... 364 365 @todo****
"%w": "<stdDayOfWeek>", // Numeric representation of day of the week 0 1 ... 5 6
"%u": "<stdDayOfWeekISO>", // ISO-8601 numeric representation of the day of the week (added in PHP 5.1.0) 1 2 ... 6 7 @todo
"%a": "Mon", // Sun Mon ... Fri Sat
"%A": "Monday", // stdLongWeekDay Sunday Monday ... Friday Saturday
"%V": "<stdWeekOfYear>", // ***01 02 ... 52 53 @todo begin with zeros
"%g": "06", // stdYear 70 71 ... 29 30
"%y": "06",
"%G": "2006", // stdLongYear 1970 1971 ... 2029 2030
"%Y": "2006",
"%p": "PM", // stdPM AM PM
"%P": "pm", // stdpm am pm
"%k": "15", // stdHour 0 1 ... 22 23
"%H": "15", // 00 01 ... 22 23
"%l": "3", // stdHour12 1 2 ... 11 12
"%I": "03", // stdZeroHour12 01 02 ... 11 12
"%M": "04", // stdZeroMinute 00 01 ... 58 59
"%S": "05", // stdZeroSecond ***00 01 ... 58 59
"%Z": "MST", //EST CST ... MST PST
"%z": "-0700", // stdNumTZ -0700 -0600 ... +0600 +0700
"%s": "<stdUnix>", // Seconds since unix epoch 1360013296
"%r": "03:04:05 PM",
"%R": "15:04",
"%T": "15:04:05",
"%D": "01/02/06",
"%F": "2006-01-02",
"%X": "15:04:05",
"%x": "01/02/06",
}
func (p *StrftimeParser) Convert(layout string) string {
var match [][]string
if match = replacements_pattern.FindAllStringSubmatch(layout, -1); match == nil {
return layout
}
for i := range match {
if replace, ok := strftime_replacements[match[i][0]]; ok {
layout = strings.Replace(layout, match[i][0], replace, 1)
}
}
return layout
}