Elasticsearch: Support chained pipeline aggregations (#27945)

* Elasticsearch: Support multiple pipeline aggregations for a query

* Ensuring descendents are deleted with their parents.

* Update public/app/plugins/datasource/elasticsearch/metric_agg.ts

Co-authored-by: Giordano Ricci <grdnricci@gmail.com>

* Update public/app/plugins/datasource/elasticsearch/query_def.ts

Co-authored-by: Giordano Ricci <grdnricci@gmail.com>

* Update public/app/plugins/datasource/elasticsearch/specs/query_def.test.ts

Co-authored-by: Giordano Ricci <grdnricci@gmail.com>

* Fixing typo

Co-authored-by: Giordano Ricci <grdnricci@gmail.com>
This commit is contained in:
Chris Cowan 2020-10-08 03:24:54 -07:00 committed by GitHub
parent 7d63b2c473
commit 3af193358c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 32 deletions

View File

@ -5,6 +5,10 @@ import { ElasticsearchAggregation } from './types';
import { GrafanaRootScope } from 'app/routes/GrafanaCtrl';
import { CoreEvents } from 'app/types';
function createDefaultMetric(id = 0): ElasticsearchAggregation {
return { type: 'count', field: 'select field', id: (id + 1).toString() };
}
export class ElasticMetricAggCtrl {
/** @ngInject */
constructor($scope: any, uiSegmentSrv: any, $rootScope: GrafanaRootScope) {
@ -21,7 +25,7 @@ export class ElasticMetricAggCtrl {
};
$scope.updatePipelineAggOptions = () => {
$scope.pipelineAggOptions = queryDef.getPipelineAggOptions($scope.target);
$scope.pipelineAggOptions = queryDef.getPipelineAggOptions($scope.target, $scope.agg);
};
$rootScope.onAppEvent(
@ -195,12 +199,19 @@ export class ElasticMetricAggCtrl {
0
);
metricAggs.splice(addIndex, 0, { type: 'count', field: 'select field', id: (id + 1).toString() });
metricAggs.splice(addIndex, 0, createDefaultMetric(id));
$scope.onChange();
};
$scope.removeMetricAgg = () => {
metricAggs.splice($scope.index, 1);
const metricBeingRemoved = metricAggs[$scope.index];
const metricsToRemove = queryDef.getAncestors($scope.target, metricBeingRemoved);
const newMetricAggs = metricAggs.filter(m => !metricsToRemove.includes(m.id));
if (newMetricAggs.length > 0) {
metricAggs.splice(0, metricAggs.length, ...newMetricAggs);
} else {
metricAggs.splice(0, metricAggs.length, createDefaultMetric());
}
$scope.onChange();
};

View File

@ -1,4 +1,5 @@
import _ from 'lodash';
import { ElasticsearchAggregation, ElasticsearchQuery } from './types';
export const metricAggTypes = [
{ text: 'Count', value: 'count', requiresField: false },
@ -207,15 +208,26 @@ export function isPipelineAggWithMultipleBucketPaths(metricType: any) {
return false;
}
export function getPipelineAggOptions(targets: any) {
const result: any[] = [];
_.each(targets.metrics, metric => {
if (!isPipelineAgg(metric.type)) {
result.push({ text: describeMetric(metric), value: metric.id });
export function getAncestors(target: ElasticsearchQuery, metric?: ElasticsearchAggregation) {
const { metrics } = target;
if (!metrics) {
return (metric && [metric.id]) || [];
}
const initialAncestors = metric != null ? [metric.id] : ([] as string[]);
return metrics.reduce((acc: string[], metric: ElasticsearchAggregation) => {
const includedInField = (metric.field && acc.includes(metric.field)) || false;
const includedInVariables = metric.pipelineVariables?.some(pv => acc.includes(pv?.pipelineAgg ?? ''));
return includedInField || includedInVariables ? [...acc, metric.id] : acc;
}, initialAncestors);
}
});
return result;
export function getPipelineAggOptions(target: ElasticsearchQuery, metric?: ElasticsearchAggregation) {
const { metrics } = target;
if (!metrics) {
return [];
}
const ancestors = getAncestors(target, metric);
return metrics.filter(m => !ancestors.includes(m.id)).map(m => ({ text: describeMetric(m), value: m.id }));
}
export function getMovingAvgSettings(model: any, filtered: boolean) {
@ -247,7 +259,7 @@ export function describeOrder(order: string) {
return def.text;
}
export function describeMetric(metric: { type: string; field: string }) {
export function describeMetric(metric: ElasticsearchAggregation) {
const def: any = _.find(metricAggTypes, { value: metric.type });
if (!def.requiresField && !isPipelineAgg(metric.type)) {
return def.text;

View File

@ -1,51 +1,104 @@
import * as queryDef from '../query_def';
describe('ElasticQueryDef', () => {
describe('getAncestors', () => {
describe('with multiple pipeline aggs', () => {
const maxMetric = { id: '1', type: 'max', field: '@value' };
const derivativeMetric = { id: '2', type: 'derivative', field: '1' };
const bucketScriptMetric = {
id: '3',
type: 'bucket_script',
field: '2',
pipelineVariables: [{ name: 'var1', pipelineAgg: '2' }],
};
const target = {
refId: '1',
isLogsQuery: false,
metrics: [maxMetric, derivativeMetric, bucketScriptMetric],
};
test('should return id of derivative and bucket_script', () => {
const response = queryDef.getAncestors(target, derivativeMetric);
expect(response).toEqual(['2', '3']);
});
test('should return id of the bucket_script', () => {
const response = queryDef.getAncestors(target, bucketScriptMetric);
expect(response).toEqual(['3']);
});
test('should return id of all the metrics', () => {
const response = queryDef.getAncestors(target, maxMetric);
expect(response).toEqual(['1', '2', '3']);
});
});
});
describe('getPipelineAggOptions', () => {
describe('with zero targets', () => {
const response = queryDef.getPipelineAggOptions([]);
describe('with zero metrics', () => {
const target = {
refId: '1',
isLogsQuery: false,
metrics: [],
};
const response = queryDef.getPipelineAggOptions(target);
test('should return zero', () => {
expect(response.length).toBe(0);
});
});
describe('with count and sum targets', () => {
const targets = {
metrics: [
{ type: 'count', field: '@value' },
{ type: 'sum', field: '@value' },
],
describe('with count and sum metrics', () => {
const currentAgg = { type: 'moving_avg', field: '@value', id: '3' };
const target = {
refId: '1',
isLogsQuery: false,
metrics: [{ type: 'count', field: '@value', id: '1' }, { type: 'sum', field: '@value', id: '2' }, currentAgg],
};
const response = queryDef.getPipelineAggOptions(targets);
const response = queryDef.getPipelineAggOptions(target, currentAgg);
test('should return zero', () => {
expect(response.length).toBe(2);
});
});
describe('with count and moving average targets', () => {
const targets = {
metrics: [
{ type: 'count', field: '@value' },
{ type: 'moving_avg', field: '@value' },
],
describe('with count and moving average metrics', () => {
const currentAgg = { type: 'moving_avg', field: '@value', id: '2' };
const target = {
refId: '1',
isLogsQuery: false,
metrics: [{ type: 'count', field: '@value', id: '1' }, currentAgg],
};
const response = queryDef.getPipelineAggOptions(targets);
const response = queryDef.getPipelineAggOptions(target, currentAgg);
test('should return one', () => {
expect(response.length).toBe(1);
});
});
describe('with derivatives targets', () => {
const targets = {
metrics: [{ type: 'derivative', field: '@value' }],
describe('with multiple chained pipeline aggs', () => {
const currentAgg = { type: 'moving_avg', field: '2', id: '3' };
const target = {
refId: '1',
isLogsQuery: false,
metrics: [{ type: 'count', field: '@value', id: '1' }, { type: 'moving_avg', field: '1', id: '2' }, currentAgg],
};
const response = queryDef.getPipelineAggOptions(targets);
const response = queryDef.getPipelineAggOptions(target, currentAgg);
test('should return two', () => {
expect(response.length).toBe(2);
});
});
describe('with derivatives metrics', () => {
const currentAgg = { type: 'derivative', field: '@value', id: '1' };
const target = {
refId: '1',
isLogsQuery: false,
metrics: [currentAgg],
};
const response = queryDef.getPipelineAggOptions(target, currentAgg);
test('should return zero', () => {
expect(response.length).toBe(0);

View File

@ -16,6 +16,7 @@ export interface ElasticsearchAggregation {
type: string;
settings?: any;
field?: string;
pipelineVariables?: Array<{ name?: string; pipelineAgg?: string }>;
}
export interface ElasticsearchQuery extends DataQuery {