mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Transformations: Add support for custom transformation operators in transformDataFrame (#64208)
This commit is contained in:
parent
88231c7cb8
commit
7948e3b20d
@ -1,6 +1,9 @@
|
||||
import { map } from 'rxjs';
|
||||
|
||||
import { toDataFrame } from '../dataframe/processDataFrame';
|
||||
import { FieldType } from '../types';
|
||||
import { CustomTransformOperator, FieldType } from '../types';
|
||||
import { mockTransformationsRegistry } from '../utils/tests/mockTransformationsRegistry';
|
||||
import { ArrayVector } from '../vector/ArrayVector';
|
||||
|
||||
import { ReducerID } from './fieldReducer';
|
||||
import { FrameMatcherID } from './matchers/ids';
|
||||
@ -9,13 +12,52 @@ import { filterFieldsByNameTransformer } from './transformers/filterByName';
|
||||
import { DataTransformerID } from './transformers/ids';
|
||||
import { reduceTransformer, ReduceTransformerMode } from './transformers/reduce';
|
||||
|
||||
const seriesAWithSingleField = toDataFrame({
|
||||
name: 'A',
|
||||
fields: [
|
||||
{ name: 'time', type: FieldType.time, values: [3000, 4000, 5000, 6000] },
|
||||
{ name: 'temperature', type: FieldType.number, values: [3, 4, 5, 6] },
|
||||
],
|
||||
});
|
||||
const getSeriesAWithSingleField = () =>
|
||||
toDataFrame({
|
||||
name: 'A',
|
||||
fields: [
|
||||
{ name: 'time', type: FieldType.time, values: [3000, 4000, 5000, 6000] },
|
||||
{ name: 'temperature', type: FieldType.number, values: [3, 4, 5, 6] },
|
||||
],
|
||||
});
|
||||
|
||||
// Divide values by 100
|
||||
const customTransform1: CustomTransformOperator = () => (source) => {
|
||||
return source.pipe(
|
||||
map((data) => {
|
||||
return data.map((frame) => {
|
||||
return {
|
||||
...frame,
|
||||
fields: frame.fields.map((field) => {
|
||||
return {
|
||||
...field,
|
||||
values: new ArrayVector(field.values.toArray().map((v) => v / 100)),
|
||||
};
|
||||
}),
|
||||
};
|
||||
});
|
||||
})
|
||||
);
|
||||
};
|
||||
|
||||
// Multiply values by 2
|
||||
const customTransform2: CustomTransformOperator = () => (source) => {
|
||||
return source.pipe(
|
||||
map((data) => {
|
||||
return data.map((frame) => {
|
||||
return {
|
||||
...frame,
|
||||
fields: frame.fields.map((field) => {
|
||||
return {
|
||||
...field,
|
||||
values: new ArrayVector(field.values.toArray().map((v) => v * 2)),
|
||||
};
|
||||
}),
|
||||
};
|
||||
});
|
||||
})
|
||||
);
|
||||
};
|
||||
|
||||
describe('transformDataFrame', () => {
|
||||
beforeAll(() => {
|
||||
@ -40,7 +82,7 @@ describe('transformDataFrame', () => {
|
||||
},
|
||||
];
|
||||
|
||||
await expect(transformDataFrame(cfg, [seriesAWithSingleField])).toEmitValuesWith((received) => {
|
||||
await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => {
|
||||
const processed = received[0];
|
||||
expect(processed[0].length).toEqual(1);
|
||||
expect(processed[0].fields.length).toEqual(1);
|
||||
@ -67,7 +109,7 @@ describe('transformDataFrame', () => {
|
||||
},
|
||||
];
|
||||
|
||||
await expect(transformDataFrame(cfg, [seriesAWithSingleField])).toEmitValuesWith((received) => {
|
||||
await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => {
|
||||
const processed = received[0];
|
||||
expect(processed[0].length).toEqual(1);
|
||||
expect(processed[0].fields.length).toEqual(2);
|
||||
@ -114,4 +156,90 @@ describe('transformDataFrame', () => {
|
||||
expect(processed).toMatchObject([[5, 6], [7]]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Custom transformations', () => {
|
||||
it('supports leading custom transformation', async () => {
|
||||
// divide by 100, reduce, filter
|
||||
const cfg = [
|
||||
customTransform1,
|
||||
{
|
||||
id: DataTransformerID.reduce,
|
||||
options: {
|
||||
reducers: [ReducerID.first],
|
||||
},
|
||||
},
|
||||
{
|
||||
id: DataTransformerID.filterFieldsByName,
|
||||
options: {
|
||||
include: {
|
||||
pattern: '/First/',
|
||||
},
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => {
|
||||
const processed = received[0];
|
||||
expect(processed[0].length).toEqual(1);
|
||||
expect(processed[0].fields.length).toEqual(1);
|
||||
expect(processed[0].fields[0].values.get(0)).toEqual(0.03);
|
||||
});
|
||||
});
|
||||
it('supports trailing custom transformation', async () => {
|
||||
// reduce, filter, divide by 100
|
||||
const cfg = [
|
||||
{
|
||||
id: DataTransformerID.reduce,
|
||||
options: {
|
||||
reducers: [ReducerID.first],
|
||||
},
|
||||
},
|
||||
{
|
||||
id: DataTransformerID.filterFieldsByName,
|
||||
options: {
|
||||
include: {
|
||||
pattern: '/First/',
|
||||
},
|
||||
},
|
||||
},
|
||||
customTransform1,
|
||||
];
|
||||
|
||||
await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => {
|
||||
const processed = received[0];
|
||||
expect(processed[0].length).toEqual(1);
|
||||
expect(processed[0].fields.length).toEqual(1);
|
||||
expect(processed[0].fields[0].values.get(0)).toEqual(0.03);
|
||||
});
|
||||
});
|
||||
|
||||
it('supports mixed custom transformation', async () => {
|
||||
// reduce, multiply by 2, filter, divide by 100
|
||||
const cfg = [
|
||||
{
|
||||
id: DataTransformerID.reduce,
|
||||
options: {
|
||||
reducers: [ReducerID.first],
|
||||
},
|
||||
},
|
||||
customTransform2,
|
||||
{
|
||||
id: DataTransformerID.filterFieldsByName,
|
||||
options: {
|
||||
include: {
|
||||
pattern: '/First/',
|
||||
},
|
||||
},
|
||||
},
|
||||
customTransform1,
|
||||
];
|
||||
|
||||
await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => {
|
||||
const processed = received[0];
|
||||
expect(processed[0].length).toEqual(1);
|
||||
expect(processed[0].fields.length).toEqual(1);
|
||||
expect(processed[0].fields[0].values.get(0)).toEqual(0.06);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -1,7 +1,13 @@
|
||||
import { MonoTypeOperatorFunction, Observable, of } from 'rxjs';
|
||||
import { map, mergeMap } from 'rxjs/operators';
|
||||
|
||||
import { DataFrame, DataTransformContext, DataTransformerConfig, FrameMatcher } from '../types';
|
||||
import {
|
||||
DataFrame,
|
||||
DataTransformContext,
|
||||
DataTransformerConfig,
|
||||
FrameMatcher,
|
||||
CustomTransformOperator,
|
||||
} from '../types';
|
||||
|
||||
import { getFrameMatchers } from './matchers';
|
||||
import { standardTransformersRegistry, TransformerRegistryItem } from './standardTransformersRegistry';
|
||||
@ -85,7 +91,7 @@ const postProcessTransform =
|
||||
* Apply configured transformations to the input data
|
||||
*/
|
||||
export function transformDataFrame(
|
||||
options: DataTransformerConfig[],
|
||||
options: Array<DataTransformerConfig | CustomTransformOperator>,
|
||||
data: DataFrame[],
|
||||
ctx?: DataTransformContext
|
||||
): Observable<DataFrame[]> {
|
||||
@ -101,13 +107,20 @@ export function transformDataFrame(
|
||||
for (let index = 0; index < options.length; index++) {
|
||||
const config = options[index];
|
||||
|
||||
if (config.disabled) {
|
||||
continue;
|
||||
if (isCustomTransformation(config)) {
|
||||
operators.push(config(context));
|
||||
} else {
|
||||
if (config.disabled) {
|
||||
continue;
|
||||
}
|
||||
operators.push(getOperator(config, context));
|
||||
}
|
||||
|
||||
operators.push(getOperator(config, context));
|
||||
}
|
||||
|
||||
// @ts-ignore TypeScript has a hard time understanding this construct
|
||||
return stream.pipe.apply(stream, operators);
|
||||
}
|
||||
|
||||
function isCustomTransformation(t: DataTransformerConfig | CustomTransformOperator): t is CustomTransformOperator {
|
||||
return typeof t === 'function';
|
||||
}
|
||||
|
@ -30,6 +30,13 @@ export interface DataTransformerInfo<TOptions = any> extends RegistryItemWithOpt
|
||||
operator: (options: TOptions, context: DataTransformContext) => MonoTypeOperatorFunction<DataFrame[]>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Function that returns a cutsom transform operator for transforming data frames
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export type CustomTransformOperator = (context: DataTransformContext) => MonoTypeOperatorFunction<DataFrame[]>;
|
||||
|
||||
/**
|
||||
* Many transformations can be called with a simple synchronous function.
|
||||
* When a transformer is defined, it should have identical behavior to using the operator
|
||||
|
Loading…
Reference in New Issue
Block a user