mirror of
https://github.com/grafana/grafana.git
synced 2024-12-01 13:09:22 -06:00
7a304223e6
* Elasticsearch: Fix min_doc_count value when alerting * Add tests
470 lines
14 KiB
Go
470 lines
14 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"fmt"
|
|
"regexp"
|
|
"strconv"
|
|
|
|
"github.com/Masterminds/semver"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/plugins"
|
|
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
|
"github.com/grafana/grafana/pkg/tsdb/interval"
|
|
)
|
|
|
|
type timeSeriesQuery struct {
|
|
client es.Client
|
|
tsdbQuery plugins.DataQuery
|
|
intervalCalculator interval.Calculator
|
|
}
|
|
|
|
var newTimeSeriesQuery = func(client es.Client, dataQuery plugins.DataQuery,
|
|
intervalCalculator interval.Calculator) *timeSeriesQuery {
|
|
return &timeSeriesQuery{
|
|
client: client,
|
|
tsdbQuery: dataQuery,
|
|
intervalCalculator: intervalCalculator,
|
|
}
|
|
}
|
|
|
|
// nolint:staticcheck // plugins.DataQueryResult deprecated
|
|
func (e *timeSeriesQuery) execute() (plugins.DataResponse, error) {
|
|
tsQueryParser := newTimeSeriesQueryParser()
|
|
queries, err := tsQueryParser.parse(e.tsdbQuery)
|
|
if err != nil {
|
|
return plugins.DataResponse{}, err
|
|
}
|
|
|
|
ms := e.client.MultiSearch()
|
|
|
|
from := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetFromAsMsEpoch())
|
|
to := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetToAsMsEpoch())
|
|
result := plugins.DataResponse{
|
|
Results: make(map[string]plugins.DataQueryResult),
|
|
}
|
|
for _, q := range queries {
|
|
if err := e.processQuery(q, ms, from, to, result); err != nil {
|
|
return plugins.DataResponse{}, err
|
|
}
|
|
}
|
|
|
|
req, err := ms.Build()
|
|
if err != nil {
|
|
return plugins.DataResponse{}, err
|
|
}
|
|
|
|
res, err := e.client.ExecuteMultisearch(req)
|
|
if err != nil {
|
|
return plugins.DataResponse{}, err
|
|
}
|
|
|
|
rp := newResponseParser(res.Responses, queries, res.DebugInfo)
|
|
return rp.getTimeSeries()
|
|
}
|
|
|
|
// nolint:staticcheck // plugins.DataQueryResult deprecated
|
|
func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to string,
|
|
result plugins.DataResponse) error {
|
|
minInterval, err := e.client.GetMinInterval(q.Interval)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
interval := e.intervalCalculator.Calculate(*e.tsdbQuery.TimeRange, minInterval)
|
|
|
|
b := ms.Search(interval)
|
|
b.Size(0)
|
|
filters := b.Query().Bool().Filter()
|
|
filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)
|
|
|
|
if q.RawQuery != "" {
|
|
filters.AddQueryStringFilter(q.RawQuery, true)
|
|
}
|
|
|
|
if len(q.BucketAggs) == 0 {
|
|
if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
|
|
result.Results[q.RefID] = plugins.DataQueryResult{
|
|
RefID: q.RefID,
|
|
Error: fmt.Errorf("invalid query, missing metrics and aggregations"),
|
|
ErrorString: "invalid query, missing metrics and aggregations",
|
|
}
|
|
return nil
|
|
}
|
|
metric := q.Metrics[0]
|
|
b.Size(metric.Settings.Get("size").MustInt(500))
|
|
b.SortDesc("@timestamp", "boolean")
|
|
b.AddDocValueField("@timestamp")
|
|
return nil
|
|
}
|
|
|
|
aggBuilder := b.Agg()
|
|
|
|
// iterate backwards to create aggregations bottom-down
|
|
for _, bucketAgg := range q.BucketAggs {
|
|
bucketAgg.Settings = simplejson.NewFromAny(
|
|
bucketAgg.generateSettingsForDSL(),
|
|
)
|
|
switch bucketAgg.Type {
|
|
case dateHistType:
|
|
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
|
|
case histogramType:
|
|
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
|
|
case filtersType:
|
|
aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
|
|
case termsType:
|
|
aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
|
|
case geohashGridType:
|
|
aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
|
|
}
|
|
}
|
|
|
|
for _, m := range q.Metrics {
|
|
m := m
|
|
if m.Type == countType {
|
|
continue
|
|
}
|
|
|
|
if isPipelineAgg(m.Type) {
|
|
if isPipelineAggWithMultipleBucketPaths(m.Type) {
|
|
if len(m.PipelineVariables) > 0 {
|
|
bucketPaths := map[string]interface{}{}
|
|
for name, pipelineAgg := range m.PipelineVariables {
|
|
if _, err := strconv.Atoi(pipelineAgg); err == nil {
|
|
var appliedAgg *MetricAgg
|
|
for _, pipelineMetric := range q.Metrics {
|
|
if pipelineMetric.ID == pipelineAgg {
|
|
appliedAgg = pipelineMetric
|
|
break
|
|
}
|
|
}
|
|
if appliedAgg != nil {
|
|
if appliedAgg.Type == countType {
|
|
bucketPaths[name] = "_count"
|
|
} else {
|
|
bucketPaths[name] = pipelineAgg
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
|
|
a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
|
|
})
|
|
} else {
|
|
continue
|
|
}
|
|
} else {
|
|
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
|
|
var appliedAgg *MetricAgg
|
|
for _, pipelineMetric := range q.Metrics {
|
|
if pipelineMetric.ID == m.PipelineAggregate {
|
|
appliedAgg = pipelineMetric
|
|
break
|
|
}
|
|
}
|
|
if appliedAgg != nil {
|
|
bucketPath := m.PipelineAggregate
|
|
if appliedAgg.Type == countType {
|
|
bucketPath = "_count"
|
|
}
|
|
|
|
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
|
|
a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
|
|
})
|
|
}
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
|
|
a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func setFloatPath(settings *simplejson.Json, path ...string) {
|
|
if stringValue, err := settings.GetPath(path...).String(); err == nil {
|
|
if value, err := strconv.ParseFloat(stringValue, 64); err == nil {
|
|
settings.SetPath(path, value)
|
|
}
|
|
}
|
|
}
|
|
|
|
func setIntPath(settings *simplejson.Json, path ...string) {
|
|
if stringValue, err := settings.GetPath(path...).String(); err == nil {
|
|
if value, err := strconv.ParseInt(stringValue, 10, 64); err == nil {
|
|
settings.SetPath(path, value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Casts values to float when required by Elastic's query DSL
|
|
func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} {
|
|
switch metricAggregation.Type {
|
|
case "moving_avg":
|
|
setFloatPath(metricAggregation.Settings, "window")
|
|
setFloatPath(metricAggregation.Settings, "predict")
|
|
setFloatPath(metricAggregation.Settings, "settings", "alpha")
|
|
setFloatPath(metricAggregation.Settings, "settings", "beta")
|
|
setFloatPath(metricAggregation.Settings, "settings", "gamma")
|
|
setFloatPath(metricAggregation.Settings, "settings", "period")
|
|
case "serial_diff":
|
|
setFloatPath(metricAggregation.Settings, "lag")
|
|
}
|
|
|
|
if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) {
|
|
scriptValue, err := metricAggregation.Settings.GetPath("script").String()
|
|
if err != nil {
|
|
// the script is stored using the old format : `script:{inline: "value"}` or is not set
|
|
scriptValue, err = metricAggregation.Settings.GetPath("script", "inline").String()
|
|
}
|
|
|
|
constraint, _ := semver.NewConstraint(">=5.6.0")
|
|
|
|
if err == nil {
|
|
if constraint.Check(version) {
|
|
metricAggregation.Settings.SetPath([]string{"script"}, scriptValue)
|
|
} else {
|
|
metricAggregation.Settings.SetPath([]string{"script"}, map[string]interface{}{"inline": scriptValue})
|
|
}
|
|
}
|
|
}
|
|
|
|
return metricAggregation.Settings.MustMap()
|
|
}
|
|
|
|
func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} {
|
|
// TODO: This might also need to be applied to other bucket aggregations and other fields.
|
|
switch bucketAgg.Type {
|
|
case "date_histogram":
|
|
setIntPath(bucketAgg.Settings, "min_doc_count")
|
|
}
|
|
|
|
return bucketAgg.Settings.MustMap()
|
|
}
|
|
|
|
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder {
|
|
aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
|
|
a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")
|
|
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
|
|
a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
|
|
a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS)
|
|
|
|
if a.Interval == "auto" {
|
|
a.Interval = "$__interval"
|
|
}
|
|
|
|
if offset, err := bucketAgg.Settings.Get("offset").String(); err == nil {
|
|
a.Offset = offset
|
|
}
|
|
|
|
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
|
|
a.Missing = &missing
|
|
}
|
|
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
|
|
aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
|
|
a.Interval = bucketAgg.Settings.Get("interval").MustInt(1000)
|
|
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
|
|
|
|
if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil {
|
|
a.Missing = &missing
|
|
}
|
|
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
|
|
aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
|
|
if size, err := bucketAgg.Settings.Get("size").Int(); err == nil {
|
|
a.Size = size
|
|
} else if size, err := bucketAgg.Settings.Get("size").String(); err == nil {
|
|
a.Size, err = strconv.Atoi(size)
|
|
if err != nil {
|
|
a.Size = 500
|
|
}
|
|
} else {
|
|
a.Size = 500
|
|
}
|
|
if a.Size == 0 {
|
|
a.Size = 500
|
|
}
|
|
|
|
if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
|
|
a.MinDocCount = &minDocCount
|
|
}
|
|
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
|
|
a.Missing = &missing
|
|
}
|
|
|
|
if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
|
|
/*
|
|
The format for extended stats and percentiles is {metricId}[bucket_path]
|
|
for everything else it's just {metricId}, _count, _term, or _key
|
|
*/
|
|
metricIdRegex := regexp.MustCompile(`^(\d+)`)
|
|
metricId := metricIdRegex.FindString(orderBy)
|
|
|
|
if len(metricId) > 0 {
|
|
for _, m := range metrics {
|
|
if m.ID == metricId {
|
|
if m.Type == "count" {
|
|
a.Order["_count"] = bucketAgg.Settings.Get("order").MustString("desc")
|
|
} else {
|
|
a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
|
|
b.Metric(m.ID, m.Type, m.Field, nil)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
|
|
}
|
|
}
|
|
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
|
|
filters := make(map[string]interface{})
|
|
for _, filter := range bucketAgg.Settings.Get("filters").MustArray() {
|
|
json := simplejson.NewFromAny(filter)
|
|
query := json.Get("query").MustString()
|
|
label := json.Get("label").MustString()
|
|
if label == "" {
|
|
label = query
|
|
}
|
|
filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
|
|
}
|
|
|
|
if len(filters) > 0 {
|
|
aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
|
|
a.Filters = filters
|
|
aggBuilder = b
|
|
})
|
|
}
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
|
|
aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
|
|
a.Precision = bucketAgg.Settings.Get("precision").MustInt(3)
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
// timeSeriesQueryParser converts the JSON models of a data query into Query
// values. It carries no state.
type timeSeriesQueryParser struct{}

// newTimeSeriesQueryParser returns a ready-to-use parser.
func newTimeSeriesQueryParser() *timeSeriesQueryParser {
	return new(timeSeriesQueryParser)
}
|
|
|
|
func (p *timeSeriesQueryParser) parse(tsdbQuery plugins.DataQuery) ([]*Query, error) {
|
|
queries := make([]*Query, 0)
|
|
for _, q := range tsdbQuery.Queries {
|
|
model := q.Model
|
|
timeField, err := model.Get("timeField").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rawQuery := model.Get("query").MustString()
|
|
bucketAggs, err := p.parseBucketAggs(model)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
metrics, err := p.parseMetrics(model)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
alias := model.Get("alias").MustString("")
|
|
interval := model.Get("interval").MustString("")
|
|
|
|
queries = append(queries, &Query{
|
|
TimeField: timeField,
|
|
RawQuery: rawQuery,
|
|
BucketAggs: bucketAggs,
|
|
Metrics: metrics,
|
|
Alias: alias,
|
|
Interval: interval,
|
|
RefID: q.RefID,
|
|
})
|
|
}
|
|
|
|
return queries, nil
|
|
}
|
|
|
|
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
|
|
var err error
|
|
var result []*BucketAgg
|
|
for _, t := range model.Get("bucketAggs").MustArray() {
|
|
aggJSON := simplejson.NewFromAny(t)
|
|
agg := &BucketAgg{}
|
|
|
|
agg.Type, err = aggJSON.Get("type").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
agg.ID, err = aggJSON.Get("id").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
agg.Field = aggJSON.Get("field").MustString()
|
|
agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap())
|
|
|
|
result = append(result, agg)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
|
|
var err error
|
|
var result []*MetricAgg
|
|
for _, t := range model.Get("metrics").MustArray() {
|
|
metricJSON := simplejson.NewFromAny(t)
|
|
metric := &MetricAgg{}
|
|
|
|
metric.Field = metricJSON.Get("field").MustString()
|
|
metric.Hide = metricJSON.Get("hide").MustBool(false)
|
|
metric.ID = metricJSON.Get("id").MustString()
|
|
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
|
|
metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
|
|
metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
|
|
metric.Type, err = metricJSON.Get("type").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if isPipelineAggWithMultipleBucketPaths(metric.Type) {
|
|
metric.PipelineVariables = map[string]string{}
|
|
pvArr := metricJSON.Get("pipelineVariables").MustArray()
|
|
for _, v := range pvArr {
|
|
kv := v.(map[string]interface{})
|
|
metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string)
|
|
}
|
|
}
|
|
|
|
result = append(result, metric)
|
|
}
|
|
return result, nil
|
|
}
|