Transformations: Add context parameter to transformDataFrame and operators (#60694)

* Transformations: Add context parameter to transformDataFrame and operators

* Remove unused queries prop

* Fixed test

* Fixed test
This commit is contained in:
Torkel Ödegaard
2023-01-05 17:34:09 +01:00
committed by GitHub
parent 6da850a2f2
commit bd90a6e1be
28 changed files with 150 additions and 108 deletions

View File

@@ -1,12 +1,12 @@
import { MonoTypeOperatorFunction, Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';
import { DataFrame, DataTransformerConfig } from '../types';
import { DataFrame, DataTransformContext, DataTransformerConfig } from '../types';
import { standardTransformersRegistry, TransformerRegistryItem } from './standardTransformersRegistry';
const getOperator =
(config: DataTransformerConfig): MonoTypeOperatorFunction<DataFrame[]> =>
(config: DataTransformerConfig, ctx: DataTransformContext): MonoTypeOperatorFunction<DataFrame[]> =>
(source) => {
const info = standardTransformersRegistry.get(config.id);
@@ -19,7 +19,7 @@ const getOperator =
return source.pipe(
mergeMap((before) =>
of(before).pipe(info.transformation.operator(options, config.replace), postProcessTransform(before, info))
of(before).pipe(info.transformation.operator(options, ctx), postProcessTransform(before, info))
)
);
};
@@ -53,7 +53,11 @@ const postProcessTransform =
/**
* Apply configured transformations to the input data
*/
export function transformDataFrame(options: DataTransformerConfig[], data: DataFrame[]): Observable<DataFrame[]> {
export function transformDataFrame(
options: DataTransformerConfig[],
data: DataFrame[],
ctx?: DataTransformContext
): Observable<DataFrame[]> {
const stream = of<DataFrame[]>(data);
if (!options.length) {
@@ -61,6 +65,7 @@ export function transformDataFrame(options: DataTransformerConfig[], data: DataF
}
const operators: Array<MonoTypeOperatorFunction<DataFrame[]>> = [];
const context = ctx ?? { interpolate: (str) => str };
for (let index = 0; index < options.length; index++) {
const config = options[index];
@@ -69,7 +74,7 @@ export function transformDataFrame(options: DataTransformerConfig[], data: DataF
continue;
}
operators.push(getOperator(config));
operators.push(getOperator(config, context));
}
// @ts-ignore TypeScript has a hard time understanding this construct

View File

@@ -1,6 +1,6 @@
import { DataFrameView } from '../../dataframe';
import { toDataFrame } from '../../dataframe/processDataFrame';
import { ScopedVars } from '../../types';
import { DataTransformContext, ScopedVars } from '../../types';
import { FieldType } from '../../types/dataFrame';
import { BinaryOperationID } from '../../utils';
import { mockTransformationsRegistry } from '../../utils/tests/mockTransformationsRegistry';
@@ -235,7 +235,9 @@ describe('calculateField transformer w/ timeseries', () => {
},
replaceFields: true,
},
replace: (target: string | undefined, scopedVars?: ScopedVars, format?: string | Function): string => {
};
const context: DataTransformContext = {
interpolate: (target: string | undefined, scopedVars?: ScopedVars, format?: string | Function): string => {
if (!target) {
return '';
}
@@ -262,7 +264,7 @@ describe('calculateField transformer w/ timeseries', () => {
},
};
await expect(transformDataFrame([cfg], [seriesA])).toEmitValuesWith((received) => {
await expect(transformDataFrame([cfg], [seriesA], context)).toEmitValuesWith((received) => {
const data = received[0];
const filtered = data[0];
const rows = new DataFrameView(filtered).toArray();

View File

@@ -72,11 +72,15 @@ export const calculateFieldTransformer: DataTransformerInfo<CalculateFieldTransf
reducer: ReducerID.sum,
},
},
operator: (options, replace) => (outerSource) => {
operator: (options, ctx) => (outerSource) => {
const operator =
options && options.timeSeries !== false ? ensureColumnsTransformer.operator(null) : noopTransformer.operator({});
options && options.timeSeries !== false
? ensureColumnsTransformer.operator(null, ctx)
: noopTransformer.operator({}, ctx);
options.alias = replace ? replace(options.alias) : options.alias;
if (options.alias != null) {
options.alias = ctx.interpolate(options.alias);
}
return outerSource.pipe(
operator,
@@ -87,13 +91,12 @@ export const calculateFieldTransformer: DataTransformerInfo<CalculateFieldTransf
if (mode === CalculateFieldMode.ReduceRow) {
creator = getReduceRowCreator(defaults(options.reduce, defaultReduceOptions), data);
} else if (mode === CalculateFieldMode.BinaryOperation) {
const binaryOptions = replace
? {
...options.binary,
left: replace ? replace(options.binary?.left) : options.binary?.left,
right: replace ? replace(options.binary?.right) : options.binary?.right,
}
: options.binary;
const binaryOptions = {
...options.binary,
left: ctx.interpolate(options.binary?.left!),
right: ctx.interpolate(options.binary?.right!),
};
creator = getBinaryCreator(defaults(binaryOptions, defaultBinaryOptions), data);
}

View File

@@ -37,7 +37,8 @@ export const convertFieldTypeTransformer: SynchronousDataTransformerInfo<Convert
conversions: [{ targetField: undefined, destinationType: undefined, dateFormat: undefined }],
},
operator: (options) => (source) => source.pipe(map((data) => convertFieldTypeTransformer.transformer(options)(data))),
operator: (options, ctx) => (source) =>
source.pipe(map((data) => convertFieldTypeTransformer.transformer(options, ctx)(data))),
transformer: (options: ConvertFieldTypeTransformerOptions) => (data: DataFrame[]) => {
if (!Array.isArray(data) || data.length === 0) {

View File

@@ -12,16 +12,20 @@ export const ensureColumnsTransformer: SynchronousDataTransformerInfo = {
name: 'Ensure Columns Transformer',
description: 'Will check if current data frames is series or columns. If in series it will convert to columns.',
operator: (options) => (source) => source.pipe(map((data) => ensureColumnsTransformer.transformer(options)(data))),
operator: (options, ctx) => (source) =>
source.pipe(map((data) => ensureColumnsTransformer.transformer(options, ctx)(data))),
transformer: (options: any) => (frames: DataFrame[]) => {
transformer: (_options: any, ctx) => (frames: DataFrame[]) => {
// Assume timeseries should first be joined by time
const timeFieldName = findConsistentTimeFieldName(frames);
if (frames.length > 1 && timeFieldName) {
return joinByFieldTransformer.transformer({
byField: timeFieldName,
})(frames);
return joinByFieldTransformer.transformer(
{
byField: timeFieldName,
},
ctx
)(frames);
}
return frames;
},

View File

@@ -22,23 +22,21 @@ export const filterFieldsTransformer: DataTransformerInfo<FilterOptions> = {
* Return a modified copy of the series. If the transform is not or should not
* be applied, just return the input series
*/
operator: (options: FilterOptions, replace) => (source) => {
operator: (options: FilterOptions, ctx) => (source) => {
if (!options.include && !options.exclude) {
return source.pipe(noopTransformer.operator({}, replace));
return source.pipe(noopTransformer.operator({}, ctx));
}
if (replace) {
if (typeof options.include?.options === 'string') {
options.include.options = replace(options.include?.options);
} else if (typeof options.include?.options?.pattern === 'string') {
options.include.options.pattern = replace(options.include?.options.pattern);
}
if (typeof options.include?.options === 'string') {
options.include.options = ctx.interpolate(options.include?.options);
} else if (typeof options.include?.options?.pattern === 'string') {
options.include.options.pattern = ctx.interpolate(options.include?.options.pattern);
}
if (typeof options.exclude?.options === 'string') {
options.exclude.options = replace(options.exclude?.options);
} else if (typeof options.exclude?.options?.pattern === 'string') {
options.exclude.options.pattern = replace(options.exclude?.options.pattern);
}
if (typeof options.exclude?.options === 'string') {
options.exclude.options = ctx.interpolate(options.exclude?.options);
} else if (typeof options.exclude?.options?.pattern === 'string') {
options.exclude.options.pattern = ctx.interpolate(options.exclude?.options.pattern);
}
return source.pipe(
@@ -91,9 +89,9 @@ export const filterFramesTransformer: DataTransformerInfo<FilterOptions> = {
* Return a modified copy of the series. If the transform is not or should not
* be applied, just return the input series
*/
operator: (options) => (source) => {
operator: (options, ctx) => (source) => {
if (!options.include && !options.exclude) {
return source.pipe(noopTransformer.operator({}));
return source.pipe(noopTransformer.operator({}, ctx));
}
return source.pipe(

View File

@@ -205,7 +205,10 @@ describe('filterByName transformer', () => {
pattern: '/^$var1/',
},
},
replace: (target: string | undefined, scopedVars?: ScopedVars, format?: string | Function): string => {
};
const ctx = {
interpolate: (target: string | undefined, scopedVars?: ScopedVars, format?: string | Function): string => {
if (!target) {
return '';
}
@@ -222,7 +225,7 @@ describe('filterByName transformer', () => {
},
};
await expect(transformDataFrame([cfg], [seriesWithNamesToMatch])).toEmitValuesWith((received) => {
await expect(transformDataFrame([cfg], [seriesWithNamesToMatch], ctx)).toEmitValuesWith((received) => {
const data = received[0];
const filtered = data[0];
expect(filtered.fields.length).toBe(2);

View File

@@ -19,7 +19,7 @@ export const filterFramesByRefIdTransformer: DataTransformerInfo<FilterFramesByR
* Return a modified copy of the series. If the transform is not or should not
* be applied, just return the input series
*/
operator: (options) => (source) => {
operator: (options, ctx) => (source) => {
const filterOptions: FilterOptions = {};
if (options.include) {
filterOptions.include = {
@@ -34,6 +34,6 @@ export const filterFramesByRefIdTransformer: DataTransformerInfo<FilterFramesByR
};
}
return source.pipe(filterFramesTransformer.operator(filterOptions));
return source.pipe(filterFramesTransformer.operator(filterOptions, ctx));
},
};

View File

@@ -40,13 +40,13 @@ export const filterByValueTransformer: DataTransformerInfo<FilterByValueTransfor
match: FilterByValueMatch.any,
},
operator: (options) => (source) => {
operator: (options, ctx) => (source) => {
const filters = options.filters;
const matchAll = options.match === FilterByValueMatch.all;
const include = options.type === FilterByValueType.include;
if (!Array.isArray(filters) || filters.length === 0) {
return source.pipe(noopTransformer.operator({}));
return source.pipe(noopTransformer.operator({}, ctx));
}
return source.pipe(

View File

@@ -82,7 +82,8 @@ export const histogramTransformer: SynchronousDataTransformerInfo<HistogramTrans
fields: {},
},
operator: (options) => (source) => source.pipe(map((data) => histogramTransformer.transformer(options)(data))),
operator: (options, ctx) => (source) =>
source.pipe(map((data) => histogramTransformer.transformer(options, ctx)(data))),
transformer: (options: HistogramTransformerOptions) => (data: DataFrame[]) => {
if (!Array.isArray(data) || data.length === 0) {

View File

@@ -28,7 +28,8 @@ export const joinByFieldTransformer: SynchronousDataTransformerInfo<JoinByFieldO
mode: JoinMode.outer,
},
operator: (options) => (source) => source.pipe(map((data) => joinByFieldTransformer.transformer(options)(data))),
operator: (options, ctx) => (source) =>
source.pipe(map((data) => joinByFieldTransformer.transformer(options, ctx)(data))),
transformer: (options: JoinByFieldOptions) => {
let joinBy: FieldMatcher | undefined = undefined;

View File

@@ -28,7 +28,8 @@ export const labelsToFieldsTransformer: SynchronousDataTransformerInfo<LabelsToF
description: 'Extract time series labels to fields (columns or rows)',
defaultOptions: {},
operator: (options) => (source) => source.pipe(map((data) => labelsToFieldsTransformer.transformer(options)(data))),
operator: (options, ctx) => (source) =>
source.pipe(map((data) => labelsToFieldsTransformer.transformer(options, ctx)(data))),
transformer: (options: LabelsToFieldsOptions) => (data: DataFrame[]) => {
// Show each label as a field row

View File

@@ -25,13 +25,16 @@ export const organizeFieldsTransformer: DataTransformerInfo<OrganizeFieldsTransf
* Return a modified copy of the series. If the transform is not or should not
* be applied, just return the input series
*/
operator: (options) => (source) =>
operator: (options, ctx) => (source) =>
source.pipe(
filterFieldsByNameTransformer.operator({
exclude: { names: mapToExcludeArray(options.excludeByName) },
}),
orderFieldsTransformer.operator(options),
renameFieldsTransformer.operator(options)
filterFieldsByNameTransformer.operator(
{
exclude: { names: mapToExcludeArray(options.excludeByName) },
},
ctx
),
orderFieldsTransformer.operator(options, ctx),
renameFieldsTransformer.operator(options, ctx)
),
};

View File

@@ -2,8 +2,15 @@ import { MonoTypeOperatorFunction } from 'rxjs';
import { RegistryItemWithOptions } from '../utils/Registry';
import { ScopedVars } from './ScopedVars';
import { DataFrame, Field } from './dataFrame';
import { InterpolateFunction } from './panel';
/**
* Context passed to transformDataFrame and to each transform operator
*/
export interface DataTransformContext {
interpolate: InterpolateFunction;
}
/**
* Function that transform data frames (AKA transformer)
@@ -15,10 +22,7 @@ export interface DataTransformerInfo<TOptions = any> extends RegistryItemWithOpt
* Function that configures transformation and returns a transformer
* @param options
*/
operator: (
options: TOptions,
replace?: (target?: string, scopedVars?: ScopedVars, format?: string | Function) => string
) => MonoTypeOperatorFunction<DataFrame[]>;
operator: (options: TOptions, context: DataTransformContext) => MonoTypeOperatorFunction<DataFrame[]>;
}
/**
@@ -28,7 +32,7 @@ export interface DataTransformerInfo<TOptions = any> extends RegistryItemWithOpt
* @public
*/
export interface SynchronousDataTransformerInfo<TOptions = any> extends DataTransformerInfo<TOptions> {
transformer: (options: TOptions) => (frames: DataFrame[]) => DataFrame[];
transformer: (options: TOptions, context: DataTransformContext) => (frames: DataFrame[]) => DataFrame[];
}
/**
@@ -47,10 +51,6 @@ export interface DataTransformerConfig<TOptions = any> {
* Options to be passed to the transformer
*/
options: TOptions;
/**
* Function to apply template variable substitution to the DataTransformerConfig
*/
replace?: (target?: string, scopedVars?: ScopedVars, format?: string | Function) => string;
}
export type FrameMatcher = (frame: DataFrame) => boolean;