mirror of
https://github.com/grafana/grafana.git
synced 2025-01-09 23:53:25 -06:00
fix pipeline aggregations on doc count
This commit is contained in:
parent
cbd4125e69
commit
9a8ad70013
@ -73,5 +73,8 @@ func isPipelineAgg(metricType string) bool {
|
||||
|
||||
func describeMetric(metricType, field string) string {
|
||||
text := metricAggType[metricType]
|
||||
if metricType == countType {
|
||||
return text
|
||||
}
|
||||
return text + " " + field
|
||||
}
|
||||
|
@ -89,15 +89,29 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
|
||||
}
|
||||
|
||||
for _, m := range q.Metrics {
|
||||
if m.Type == "count" {
|
||||
if m.Type == countType {
|
||||
continue
|
||||
}
|
||||
|
||||
if isPipelineAgg(m.Type) {
|
||||
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
|
||||
aggBuilder.Pipeline(m.ID, m.Type, m.PipelineAggregate, func(a *es.PipelineAggregation) {
|
||||
a.Settings = m.Settings.MustMap()
|
||||
})
|
||||
var appliedAgg *MetricAgg
|
||||
for _, pipelineMetric := range q.Metrics {
|
||||
if pipelineMetric.ID == m.PipelineAggregate {
|
||||
appliedAgg = pipelineMetric
|
||||
break
|
||||
}
|
||||
}
|
||||
if appliedAgg != nil {
|
||||
bucketPath := m.PipelineAggregate
|
||||
if appliedAgg.Type == countType {
|
||||
bucketPath = "_count"
|
||||
}
|
||||
|
||||
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
|
||||
a.Settings = m.Settings.MustMap()
|
||||
})
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
|
@ -418,6 +418,38 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
|
||||
So(pl.BucketPath, ShouldEqual, "3")
|
||||
})
|
||||
|
||||
Convey("With moving average doc count", func() {
|
||||
c := newFakeClient(5)
|
||||
_, err := executeTsdbQuery(c, `{
|
||||
"timeField": "@timestamp",
|
||||
"bucketAggs": [
|
||||
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
||||
],
|
||||
"metrics": [
|
||||
{ "id": "3", "type": "count", "field": "select field" },
|
||||
{
|
||||
"id": "2",
|
||||
"type": "moving_avg",
|
||||
"field": "3",
|
||||
"pipelineAgg": "3"
|
||||
}
|
||||
]
|
||||
}`, from, to, 15*time.Second)
|
||||
So(err, ShouldBeNil)
|
||||
sr := c.multisearchRequests[0].Requests[0]
|
||||
|
||||
firstLevel := sr.Aggs[0]
|
||||
So(firstLevel.Key, ShouldEqual, "4")
|
||||
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
|
||||
So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 1)
|
||||
|
||||
movingAvgAgg := firstLevel.Aggregation.Aggs[0]
|
||||
So(movingAvgAgg.Key, ShouldEqual, "2")
|
||||
So(movingAvgAgg.Aggregation.Type, ShouldEqual, "moving_avg")
|
||||
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
||||
So(pl.BucketPath, ShouldEqual, "_count")
|
||||
})
|
||||
|
||||
Convey("With broken moving average", func() {
|
||||
c := newFakeClient(5)
|
||||
_, err := executeTsdbQuery(c, `{
|
||||
@ -483,6 +515,34 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
|
||||
So(plAgg.BucketPath, ShouldEqual, "3")
|
||||
})
|
||||
|
||||
Convey("With derivative doc count", func() {
|
||||
c := newFakeClient(5)
|
||||
_, err := executeTsdbQuery(c, `{
|
||||
"timeField": "@timestamp",
|
||||
"bucketAggs": [
|
||||
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
||||
],
|
||||
"metrics": [
|
||||
{ "id": "3", "type": "count", "field": "select field" },
|
||||
{
|
||||
"id": "2",
|
||||
"type": "derivative",
|
||||
"pipelineAgg": "3"
|
||||
}
|
||||
]
|
||||
}`, from, to, 15*time.Second)
|
||||
So(err, ShouldBeNil)
|
||||
sr := c.multisearchRequests[0].Requests[0]
|
||||
|
||||
firstLevel := sr.Aggs[0]
|
||||
So(firstLevel.Key, ShouldEqual, "4")
|
||||
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
|
||||
|
||||
derivativeAgg := firstLevel.Aggregation.Aggs[0]
|
||||
So(derivativeAgg.Key, ShouldEqual, "2")
|
||||
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
||||
So(plAgg.BucketPath, ShouldEqual, "_count")
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -266,7 +266,14 @@ export class ElasticQueryBuilder {
|
||||
|
||||
if (queryDef.isPipelineAgg(metric.type)) {
|
||||
if (metric.pipelineAgg && /^\d*$/.test(metric.pipelineAgg)) {
|
||||
metricAgg = { buckets_path: metric.pipelineAgg };
|
||||
const appliedAgg = queryDef.findMetricById(target.metrics, metric.pipelineAgg);
|
||||
if (appliedAgg) {
|
||||
if (appliedAgg.type === 'count') {
|
||||
metricAgg = { buckets_path: '_count' };
|
||||
} else {
|
||||
metricAgg = { buckets_path: metric.pipelineAgg };
|
||||
}
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
@ -213,6 +213,9 @@ export function describeOrder(order) {
|
||||
|
||||
export function describeMetric(metric) {
|
||||
const def = _.find(metricAggTypes, { value: metric.type });
|
||||
if (!def.requiresField && !isPipelineAgg(metric.type)) {
|
||||
return def.text;
|
||||
}
|
||||
return def.text + ' ' + metric.field;
|
||||
}
|
||||
|
||||
@ -236,3 +239,7 @@ export function defaultMetricAgg() {
|
||||
export function defaultBucketAgg() {
|
||||
return { type: 'date_histogram', id: '2', settings: { interval: 'auto' } };
|
||||
}
|
||||
|
||||
export const findMetricById = (metrics: any[], id: any) => {
|
||||
return _.find(metrics, { id: id });
|
||||
};
|
||||
|
@ -250,6 +250,31 @@ describe('ElasticQueryBuilder', () => {
|
||||
expect(firstLevel.aggs['2'].moving_avg.buckets_path).toBe('3');
|
||||
});
|
||||
|
||||
it('with moving average doc count', () => {
|
||||
const query = builder.build({
|
||||
metrics: [
|
||||
{
|
||||
id: '3',
|
||||
type: 'count',
|
||||
field: 'select field',
|
||||
},
|
||||
{
|
||||
id: '2',
|
||||
type: 'moving_avg',
|
||||
field: '3',
|
||||
pipelineAgg: '3',
|
||||
},
|
||||
],
|
||||
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '4' }],
|
||||
});
|
||||
|
||||
const firstLevel = query.aggs['4'];
|
||||
|
||||
expect(firstLevel.aggs['2']).not.toBe(undefined);
|
||||
expect(firstLevel.aggs['2'].moving_avg).not.toBe(undefined);
|
||||
expect(firstLevel.aggs['2'].moving_avg.buckets_path).toBe('_count');
|
||||
});
|
||||
|
||||
it('with broken moving average', () => {
|
||||
const query = builder.build({
|
||||
metrics: [
|
||||
@ -304,6 +329,30 @@ describe('ElasticQueryBuilder', () => {
|
||||
expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('3');
|
||||
});
|
||||
|
||||
it('with derivative doc count', () => {
|
||||
const query = builder.build({
|
||||
metrics: [
|
||||
{
|
||||
id: '3',
|
||||
type: 'count',
|
||||
field: 'select field',
|
||||
},
|
||||
{
|
||||
id: '2',
|
||||
type: 'derivative',
|
||||
pipelineAgg: '3',
|
||||
},
|
||||
],
|
||||
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '4' }],
|
||||
});
|
||||
|
||||
const firstLevel = query.aggs['4'];
|
||||
|
||||
expect(firstLevel.aggs['2']).not.toBe(undefined);
|
||||
expect(firstLevel.aggs['2'].derivative).not.toBe(undefined);
|
||||
expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('_count');
|
||||
});
|
||||
|
||||
it('with histogram', () => {
|
||||
const query = builder.build({
|
||||
metrics: [{ id: '1', type: 'count' }],
|
||||
|
Loading…
Reference in New Issue
Block a user