From 7948e3b20dd1ed310dfe5c31c24bb718ce56c729 Mon Sep 17 00:00:00 2001 From: Dominik Prokop Date: Mon, 6 Mar 2023 03:09:26 -0800 Subject: [PATCH] Transformations: Add support for custom transformation operators in transformDataFrame (#64208) --- .../transformDataFrame.test.ts | 148 ++++++++++++++++-- .../src/transformations/transformDataFrame.ts | 25 ++- .../grafana-data/src/types/transformations.ts | 7 + 3 files changed, 164 insertions(+), 16 deletions(-) diff --git a/packages/grafana-data/src/transformations/transformDataFrame.test.ts b/packages/grafana-data/src/transformations/transformDataFrame.test.ts index 61e1a48488a..8970f2f189d 100644 --- a/packages/grafana-data/src/transformations/transformDataFrame.test.ts +++ b/packages/grafana-data/src/transformations/transformDataFrame.test.ts @@ -1,6 +1,9 @@ +import { map } from 'rxjs'; + import { toDataFrame } from '../dataframe/processDataFrame'; -import { FieldType } from '../types'; +import { CustomTransformOperator, FieldType } from '../types'; import { mockTransformationsRegistry } from '../utils/tests/mockTransformationsRegistry'; +import { ArrayVector } from '../vector/ArrayVector'; import { ReducerID } from './fieldReducer'; import { FrameMatcherID } from './matchers/ids'; @@ -9,13 +12,52 @@ import { filterFieldsByNameTransformer } from './transformers/filterByName'; import { DataTransformerID } from './transformers/ids'; import { reduceTransformer, ReduceTransformerMode } from './transformers/reduce'; -const seriesAWithSingleField = toDataFrame({ - name: 'A', - fields: [ - { name: 'time', type: FieldType.time, values: [3000, 4000, 5000, 6000] }, - { name: 'temperature', type: FieldType.number, values: [3, 4, 5, 6] }, - ], -}); +const getSeriesAWithSingleField = () => + toDataFrame({ + name: 'A', + fields: [ + { name: 'time', type: FieldType.time, values: [3000, 4000, 5000, 6000] }, + { name: 'temperature', type: FieldType.number, values: [3, 4, 5, 6] }, + ], + }); + +// Divide values by 100 +const customTransform1: CustomTransformOperator = () => (source) => { + return source.pipe( + map((data) => { + return data.map((frame) => { + return { + ...frame, + fields: frame.fields.map((field) => { + return { + ...field, + values: new ArrayVector(field.values.toArray().map((v) => v / 100)), + }; + }), + }; + }); + }) + ); +}; + +// Multiply values by 2 +const customTransform2: CustomTransformOperator = () => (source) => { + return source.pipe( + map((data) => { + return data.map((frame) => { + return { + ...frame, + fields: frame.fields.map((field) => { + return { + ...field, + values: new ArrayVector(field.values.toArray().map((v) => v * 2)), + }; + }), + }; + }); + }) + ); +}; describe('transformDataFrame', () => { beforeAll(() => { @@ -40,7 +82,7 @@ describe('transformDataFrame', () => { }, ]; - await expect(transformDataFrame(cfg, [seriesAWithSingleField])).toEmitValuesWith((received) => { + await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => { const processed = received[0]; expect(processed[0].length).toEqual(1); expect(processed[0].fields.length).toEqual(1); @@ -67,7 +109,7 @@ describe('transformDataFrame', () => { }, ]; - await expect(transformDataFrame(cfg, [seriesAWithSingleField])).toEmitValuesWith((received) => { + await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => { const processed = received[0]; expect(processed[0].length).toEqual(1); expect(processed[0].fields.length).toEqual(2); @@ -114,4 +156,90 @@ describe('transformDataFrame', () => { expect(processed).toMatchObject([[5, 6], [7]]); }); }); + + describe('Custom transformations', () => { + it('supports leading custom transformation', async () => { + // divide by 100, reduce, filter + const cfg = [ + customTransform1, + { + id: DataTransformerID.reduce, + options: { + reducers: [ReducerID.first], + }, + }, + { + id: DataTransformerID.filterFieldsByName, + options: { + include: { + pattern: '/First/', + }, + }, + }, + ]; + + await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => { + const processed = received[0]; + expect(processed[0].length).toEqual(1); + expect(processed[0].fields.length).toEqual(1); + expect(processed[0].fields[0].values.get(0)).toEqual(0.03); + }); + }); + it('supports trailing custom transformation', async () => { + // reduce, filter, divide by 100 + const cfg = [ + { + id: DataTransformerID.reduce, + options: { + reducers: [ReducerID.first], + }, + }, + { + id: DataTransformerID.filterFieldsByName, + options: { + include: { + pattern: '/First/', + }, + }, + }, + customTransform1, + ]; + + await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => { + const processed = received[0]; + expect(processed[0].length).toEqual(1); + expect(processed[0].fields.length).toEqual(1); + expect(processed[0].fields[0].values.get(0)).toEqual(0.03); + }); + }); + + it('supports mixed custom transformation', async () => { + // reduce, multiply by 2, filter, divide by 100 + const cfg = [ + { + id: DataTransformerID.reduce, + options: { + reducers: [ReducerID.first], + }, + }, + customTransform2, + { + id: DataTransformerID.filterFieldsByName, + options: { + include: { + pattern: '/First/', + }, + }, + }, + customTransform1, + ]; + + await expect(transformDataFrame(cfg, [getSeriesAWithSingleField()])).toEmitValuesWith((received) => { + const processed = received[0]; + expect(processed[0].length).toEqual(1); + expect(processed[0].fields.length).toEqual(1); + expect(processed[0].fields[0].values.get(0)).toEqual(0.06); + }); + }); + }); }); diff --git a/packages/grafana-data/src/transformations/transformDataFrame.ts b/packages/grafana-data/src/transformations/transformDataFrame.ts index fbd6951013e..f9baf411d6a 100644 --- a/packages/grafana-data/src/transformations/transformDataFrame.ts +++ b/packages/grafana-data/src/transformations/transformDataFrame.ts @@ -1,7 +1,13 @@ import { MonoTypeOperatorFunction, Observable, of } from 'rxjs'; import { map, mergeMap } from 'rxjs/operators'; -import { DataFrame, DataTransformContext, DataTransformerConfig, FrameMatcher } from '../types'; +import { + DataFrame, + DataTransformContext, + DataTransformerConfig, + FrameMatcher, + CustomTransformOperator, +} from '../types'; import { getFrameMatchers } from './matchers'; import { standardTransformersRegistry, TransformerRegistryItem } from './standardTransformersRegistry'; @@ -85,7 +91,7 @@ const postProcessTransform = * Apply configured transformations to the input data */ export function transformDataFrame( - options: DataTransformerConfig[], + options: Array, data: DataFrame[], ctx?: DataTransformContext ): Observable { @@ -101,13 +107,20 @@ export function transformDataFrame( for (let index = 0; index < options.length; index++) { const config = options[index]; - if (config.disabled) { - continue; + if (isCustomTransformation(config)) { + operators.push(config(context)); + } else { + if (config.disabled) { + continue; + } + operators.push(getOperator(config, context)); } - - operators.push(getOperator(config, context)); } // @ts-ignore TypeScript has a hard time understanding this construct return stream.pipe.apply(stream, operators); } + +function isCustomTransformation(t: DataTransformerConfig | CustomTransformOperator): t is CustomTransformOperator { + return typeof t === 'function'; +} diff --git a/packages/grafana-data/src/types/transformations.ts b/packages/grafana-data/src/types/transformations.ts index 3f212a917ed..1f9b3be2c47 100644 --- a/packages/grafana-data/src/types/transformations.ts +++ b/packages/grafana-data/src/types/transformations.ts @@ -30,6 +30,13 @@ export interface DataTransformerInfo extends RegistryItemWithOpt operator: (options: TOptions, context: DataTransformContext) => MonoTypeOperatorFunction; } +/** + * Function that returns a cutsom transform operator for transforming data frames + * + * @public + */ +export type CustomTransformOperator = (context: DataTransformContext) => MonoTypeOperatorFunction; + /** * Many transformations can be called with a simple synchronous function. * When a transformer is defined, it should have identical behavior to using the operator