Elasticsearch: Refactor processQuery to make it more readable (#61145)

* WIP: Simplify process query logic

* WIP: Simplify process query logic

* Simplify

* fix lint
Ivana Huckova 2023-01-10 10:49:43 +01:00 committed by GitHub
parent b3a44649ce
commit 256f640e19


@@ -57,137 +57,25 @@ func (e *timeSeriesQuery) execute() (*backend.QueryDataResponse, error) {
}
func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
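// Validate up front: a query without bucket aggregations must be a logs or raw document query.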
err := isQueryWithError(q)
if err != nil {
return err
}
defaultTimeField := e.client.GetTimeField()
b := ms.Search(q.Interval)
b.Size(0)
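// Size 0 by default: aggregation-only queries need no hits; the logs and document processors raise it below.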
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(defaultTimeField, to, from, es.DateFormatEpochMS)
filters.AddQueryStringFilter(q.RawQuery, true)
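// Delegate to the processor matching the query type, determined from the first metric.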
if isLogsQuery(q) {
processLogsQuery(q, b, from, to, defaultTimeField)
} else if isDocumentQuery(q) {
processDocumentQuery(q, b, from, to, defaultTimeField)
} else {
// Otherwise, it is a time series query and we process it
processTimeSeriesQuery(q, b, from, to, defaultTimeField)
}
return nil
@@ -399,3 +287,148 @@ func getPipelineAggField(m *MetricAgg) string {
}
return pipelineAggField
}
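// isQueryWithError rejects queries that cannot produce a valid request: a query
// without bucket aggregations is valid only if its first metric is a logs or document metric.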
func isQueryWithError(query *Query) error {
if len(query.BucketAggs) == 0 {
// If no aggregations, only document and logs queries are valid
if len(query.Metrics) == 0 || !(isLogsQuery(query) || isDocumentQuery(query)) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
}
}
return nil
}
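// isLogsQuery and isDocumentQuery classify a query by its first metric; callers must ensure Metrics is non-empty.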
func isLogsQuery(query *Query) bool {
return query.Metrics[0].Type == logsType
}
func isDocumentQuery(query *Query) bool {
return query.Metrics[0].Type == rawDataType || query.Metrics[0].Type == rawDocumentType
}
func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
metric := q.Metrics[0]
b.SortDesc(defaultTimeField, "boolean")
b.SortDesc("_doc", "")
b.AddDocValueField(defaultTimeField)
b.Size(metric.Settings.Get("size").MustInt(defaultSize))
// Add additional defaults for log query
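// Note: this second Size call overrides the "size" set above; for log queries the "limit" setting wins.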
b.Size(metric.Settings.Get("limit").MustInt(defaultSize))
b.AddHighlight()
// For log query, we add a date histogram aggregation
aggBuilder := b.Agg()
q.BucketAggs = append(q.BucketAggs, &BucketAgg{
Type: dateHistType,
Field: defaultTimeField,
ID: "1",
Settings: simplejson.NewFromAny(map[string]interface{}{
"interval": "auto",
}),
})
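// Logs queries normally carry no bucket aggregations, so the date histogram appended above becomes BucketAggs[0].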
bucketAgg := q.BucketAggs[0]
bucketAgg.Settings = simplejson.NewFromAny(
bucketAgg.generateSettingsForDSL(),
)
_ = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
}
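// processDocumentQuery applies the shared defaults for raw data and raw document queries:
// newest-first sort, the time field as a doc value, and a configurable size.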
func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
metric := q.Metrics[0]
b.SortDesc(defaultTimeField, "boolean")
b.SortDesc("_doc", "")
b.AddDocValueField(defaultTimeField)
b.Size(metric.Settings.Get("size").MustInt(defaultSize))
}
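// processTimeSeriesQuery nests the bucket aggregations in order and attaches
// metric and pipeline aggregations at the innermost level.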
func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
aggBuilder := b.Agg()
// Process buckets
// Each bucket aggregation is nested inside the previous one, building the aggregation tree from the outermost level inward
for _, bucketAgg := range q.BucketAggs {
bucketAgg.Settings = simplejson.NewFromAny(
bucketAgg.generateSettingsForDSL(),
)
switch bucketAgg.Type {
case dateHistType:
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
case histogramType:
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
case filtersType:
aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
case termsType:
aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
case geohashGridType:
aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
}
}
// Process metrics
for _, m := range q.Metrics {
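// Copy the loop variable so the closures below capture this iteration's metric (required before Go 1.22).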
m := m
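// count needs no aggregation of its own; Elasticsearch returns doc counts with every bucket.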
if m.Type == countType {
continue
}
if isPipelineAgg(m.Type) {
if isPipelineAggWithMultipleBucketPaths(m.Type) {
if len(m.PipelineVariables) > 0 {
bucketPaths := map[string]interface{}{}
for name, pipelineAgg := range m.PipelineVariables {
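// Pipeline variables reference other metrics by numeric ID; values that are not numeric IDs are skipped.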
if _, err := strconv.Atoi(pipelineAgg); err == nil {
var appliedAgg *MetricAgg
for _, pipelineMetric := range q.Metrics {
if pipelineMetric.ID == pipelineAgg {
appliedAgg = pipelineMetric
break
}
}
if appliedAgg != nil {
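// A count metric has no aggregation to point at, so use the built-in _count bucket path.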
if appliedAgg.Type == countType {
bucketPaths[name] = "_count"
} else {
bucketPaths[name] = pipelineAgg
}
}
}
}
aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
a.Settings = m.generateSettingsForDSL()
})
} else {
continue
}
} else {
pipelineAggField := getPipelineAggField(m)
if _, err := strconv.Atoi(pipelineAggField); err == nil {
var appliedAgg *MetricAgg
for _, pipelineMetric := range q.Metrics {
if pipelineMetric.ID == pipelineAggField {
appliedAgg = pipelineMetric
break
}
}
if appliedAgg != nil {
bucketPath := pipelineAggField
if appliedAgg.Type == countType {
bucketPath = "_count"
}
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
a.Settings = m.generateSettingsForDSL()
})
}
} else {
continue
}
}
} else {
aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
a.Settings = m.generateSettingsForDSL()
})
}
}
}
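
One benefit of the refactor is that the extracted helpers can be tested in isolation. Below is a minimal sketch of such a test; it is not part of this commit and assumes the package's Query and MetricAgg types and the logsType/rawDataType constants are visible from the test file.

func TestQueryTypeHelpers(t *testing.T) {
	// A logs query is detected from the first metric's type.
	logsQuery := &Query{Metrics: []*MetricAgg{{Type: logsType}}}
	if !isLogsQuery(logsQuery) {
		t.Error("expected logs query to be detected")
	}

	// Raw data and raw document queries count as document queries.
	docQuery := &Query{Metrics: []*MetricAgg{{Type: rawDataType}}}
	if !isDocumentQuery(docQuery) {
		t.Error("expected document query to be detected")
	}

	// A query with neither bucket aggregations nor metrics is invalid.
	if err := isQueryWithError(&Query{}); err == nil {
		t.Error("expected an error for an empty query")
	}
}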