mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Transformations: support a synchronous transformation pattern (#37780)
This commit is contained in:
parent
181d6a78ed
commit
3205450210
@ -1,33 +1,28 @@
|
||||
import { of } from 'rxjs';
|
||||
|
||||
import { seriesToColumnsTransformer } from './seriesToColumns';
|
||||
import { DataFrame } from '../../types/dataFrame';
|
||||
import { getTimeField } from '../../dataframe/processDataFrame';
|
||||
import { DataTransformerInfo } from '../../types/transformations';
|
||||
import { SynchronousDataTransformerInfo } from '../../types/transformations';
|
||||
import { DataTransformerID } from './ids';
|
||||
import { mergeMap } from 'rxjs/operators';
|
||||
import { map } from 'rxjs/operators';
|
||||
|
||||
export const ensureColumnsTransformer: DataTransformerInfo = {
|
||||
export const ensureColumnsTransformer: SynchronousDataTransformerInfo = {
|
||||
id: DataTransformerID.ensureColumns,
|
||||
name: 'Ensure Columns Transformer',
|
||||
description: 'Will check if current data frames is series or columns. If in series it will convert to columns.',
|
||||
operator: (options = {}) => (source) =>
|
||||
source.pipe(
|
||||
mergeMap((data) => {
|
||||
// Assume timeseries should first be joined by time
|
||||
const timeFieldName = findConsistentTimeFieldName(data);
|
||||
|
||||
if (data.length > 1 && timeFieldName) {
|
||||
return of(data).pipe(
|
||||
seriesToColumnsTransformer.operator({
|
||||
byField: timeFieldName,
|
||||
})
|
||||
);
|
||||
}
|
||||
operator: (options) => (source) => source.pipe(map((data) => ensureColumnsTransformer.transformer(options)(data))),
|
||||
|
||||
return of(data);
|
||||
})
|
||||
),
|
||||
transformer: (options: any) => (frames: DataFrame[]) => {
|
||||
// Assume timeseries should first be joined by time
|
||||
const timeFieldName = findConsistentTimeFieldName(frames);
|
||||
|
||||
if (frames.length > 1 && timeFieldName) {
|
||||
return seriesToColumnsTransformer.transformer({
|
||||
byField: timeFieldName,
|
||||
})(frames);
|
||||
}
|
||||
return frames;
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -1,4 +1,4 @@
|
||||
import { DataTransformerInfo } from '../../types';
|
||||
import { SynchronousDataTransformerInfo } from '../../types';
|
||||
import { map } from 'rxjs/operators';
|
||||
|
||||
import { DataTransformerID } from './ids';
|
||||
@ -73,7 +73,7 @@ export const histogramFieldInfo = {
|
||||
/**
|
||||
* @alpha
|
||||
*/
|
||||
export const histogramTransformer: DataTransformerInfo<HistogramTransformerOptions> = {
|
||||
export const histogramTransformer: SynchronousDataTransformerInfo<HistogramTransformerOptions> = {
|
||||
id: DataTransformerID.histogram,
|
||||
name: 'Histogram',
|
||||
description: 'Calculate a histogram from input data',
|
||||
@ -81,23 +81,18 @@ export const histogramTransformer: DataTransformerInfo<HistogramTransformerOptio
|
||||
fields: {},
|
||||
},
|
||||
|
||||
/**
|
||||
* Return a modified copy of the series. If the transform is not or should not
|
||||
* be applied, just return the input series
|
||||
*/
|
||||
operator: (options) => (source) =>
|
||||
source.pipe(
|
||||
map((data) => {
|
||||
if (!Array.isArray(data) || data.length === 0) {
|
||||
return data;
|
||||
}
|
||||
const hist = buildHistogram(data, options);
|
||||
if (hist == null) {
|
||||
return [];
|
||||
}
|
||||
return [histogramFieldsToFrame(hist)];
|
||||
})
|
||||
),
|
||||
operator: (options) => (source) => source.pipe(map((data) => histogramTransformer.transformer(options)(data))),
|
||||
|
||||
transformer: (options: HistogramTransformerOptions) => (data: DataFrame[]) => {
|
||||
if (!Array.isArray(data) || data.length === 0) {
|
||||
return data;
|
||||
}
|
||||
const hist = buildHistogram(data, options);
|
||||
if (hist == null) {
|
||||
return [];
|
||||
}
|
||||
return [histogramFieldsToFrame(hist)];
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { map } from 'rxjs/operators';
|
||||
|
||||
import { DataTransformerInfo, FieldMatcher } from '../../types';
|
||||
import { DataFrame, SynchronousDataTransformerInfo, FieldMatcher } from '../../types';
|
||||
import { DataTransformerID } from './ids';
|
||||
import { outerJoinDataFrames } from './joinDataFrames';
|
||||
import { fieldMatchers } from '../matchers';
|
||||
@ -10,27 +10,29 @@ export interface SeriesToColumnsOptions {
|
||||
byField?: string; // empty will pick the field automatically
|
||||
}
|
||||
|
||||
export const seriesToColumnsTransformer: DataTransformerInfo<SeriesToColumnsOptions> = {
|
||||
export const seriesToColumnsTransformer: SynchronousDataTransformerInfo<SeriesToColumnsOptions> = {
|
||||
id: DataTransformerID.seriesToColumns,
|
||||
name: 'Series as columns', // Called 'Outer join' in the UI!
|
||||
description: 'Groups series by field and returns values as columns',
|
||||
defaultOptions: {
|
||||
byField: undefined, // DEFAULT_KEY_FIELD,
|
||||
},
|
||||
operator: (options) => (source) =>
|
||||
source.pipe(
|
||||
map((data) => {
|
||||
if (data.length > 1) {
|
||||
let joinBy: FieldMatcher | undefined = undefined;
|
||||
if (options.byField) {
|
||||
joinBy = fieldMatchers.get(FieldMatcherID.byName).get(options.byField);
|
||||
}
|
||||
const joined = outerJoinDataFrames({ frames: data, joinBy });
|
||||
if (joined) {
|
||||
return [joined];
|
||||
}
|
||||
|
||||
operator: (options) => (source) => source.pipe(map((data) => seriesToColumnsTransformer.transformer(options)(data))),
|
||||
|
||||
transformer: (options: SeriesToColumnsOptions) => {
|
||||
let joinBy: FieldMatcher | undefined = undefined;
|
||||
return (data: DataFrame[]) => {
|
||||
if (data.length > 1) {
|
||||
if (options.byField && !joinBy) {
|
||||
joinBy = fieldMatchers.get(FieldMatcherID.byName).get(options.byField);
|
||||
}
|
||||
return data;
|
||||
})
|
||||
),
|
||||
const joined = outerJoinDataFrames({ frames: data, joinBy });
|
||||
if (joined) {
|
||||
return [joined];
|
||||
}
|
||||
}
|
||||
return data;
|
||||
};
|
||||
},
|
||||
};
|
||||
|
@ -5,6 +5,8 @@ import { RegistryItemWithOptions } from '../utils/Registry';
|
||||
|
||||
/**
|
||||
* Function that transform data frames (AKA transformer)
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export interface DataTransformerInfo<TOptions = any> extends RegistryItemWithOptions {
|
||||
/**
|
||||
@ -14,6 +16,19 @@ export interface DataTransformerInfo<TOptions = any> extends RegistryItemWithOpt
|
||||
operator: (options: TOptions) => MonoTypeOperatorFunction<DataFrame[]>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Many transformations can be called with a simple synchronous function.
|
||||
* When a transformer is defined, it should have identical behavior to using the operator
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export interface SynchronousDataTransformerInfo<TOptions = any> extends DataTransformerInfo<TOptions> {
|
||||
transformer: (options: TOptions) => (frames: DataFrame[]) => DataFrame[];
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export interface DataTransformerConfig<TOptions = any> {
|
||||
/**
|
||||
* Unique identifier of transformer
|
||||
|
@ -1,5 +1,5 @@
|
||||
import { toDataFrame, ArrayVector, DataFrame, FieldType, toDataFrameDTO, DataFrameDTO } from '@grafana/data';
|
||||
import { prepareTimeSeries, PrepareTimeSeriesOptions, timeSeriesFormat } from './prepareTimeSeries';
|
||||
import { prepareTimeSeriesTransformer, PrepareTimeSeriesOptions, timeSeriesFormat } from './prepareTimeSeries';
|
||||
|
||||
describe('Prepair time series transformer', () => {
|
||||
it('should transform wide to many', () => {
|
||||
@ -19,7 +19,7 @@ describe('Prepair time series transformer', () => {
|
||||
format: timeSeriesFormat.TimeSeriesMany,
|
||||
};
|
||||
|
||||
expect(prepareTimeSeries(source, config)).toEqual([
|
||||
expect(prepareTimeSeriesTransformer.transformer(config)(source)).toEqual([
|
||||
toEquableDataFrame({
|
||||
name: 'wide',
|
||||
refId: 'A',
|
||||
@ -59,7 +59,7 @@ describe('Prepair time series transformer', () => {
|
||||
format: timeSeriesFormat.TimeSeriesMany,
|
||||
};
|
||||
|
||||
expect(prepareTimeSeries(source, config)).toEqual([
|
||||
expect(prepareTimeSeriesTransformer.transformer(config)(source)).toEqual([
|
||||
toEquableDataFrame({
|
||||
name: 'wide',
|
||||
refId: 'A',
|
||||
@ -107,7 +107,7 @@ describe('Prepair time series transformer', () => {
|
||||
format: timeSeriesFormat.TimeSeriesMany,
|
||||
};
|
||||
|
||||
expect(prepareTimeSeries(source, config)).toEqual([
|
||||
expect(prepareTimeSeriesTransformer.transformer(config)(source)).toEqual([
|
||||
toEquableDataFrame({
|
||||
name: 'wide',
|
||||
refId: 'A',
|
||||
@ -162,7 +162,9 @@ describe('Prepair time series transformer', () => {
|
||||
format: timeSeriesFormat.TimeSeriesMany,
|
||||
};
|
||||
|
||||
expect(toEquableDataFrames(prepareTimeSeries(source, config))).toEqual(toEquableDataFrames(source));
|
||||
expect(toEquableDataFrames(prepareTimeSeriesTransformer.transformer(config)(source))).toEqual(
|
||||
toEquableDataFrames(source)
|
||||
);
|
||||
});
|
||||
|
||||
it('should return empty array when no timeseries exist', () => {
|
||||
@ -191,7 +193,7 @@ describe('Prepair time series transformer', () => {
|
||||
format: timeSeriesFormat.TimeSeriesMany,
|
||||
};
|
||||
|
||||
expect(prepareTimeSeries(source, config)).toEqual([]);
|
||||
expect(prepareTimeSeriesTransformer.transformer(config)(source)).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
import {
|
||||
DataTransformerInfo,
|
||||
SynchronousDataTransformerInfo,
|
||||
DataFrame,
|
||||
FieldType,
|
||||
DataTransformerID,
|
||||
@ -63,31 +63,30 @@ export function toTimeSeriesMany(data: DataFrame[]): DataFrame[] {
|
||||
return result;
|
||||
}
|
||||
|
||||
export function prepareTimeSeries(data: DataFrame[], options: PrepareTimeSeriesOptions): DataFrame[] {
|
||||
const format = options?.format ?? timeSeriesFormat.TimeSeriesWide;
|
||||
if (format === timeSeriesFormat.TimeSeriesMany) {
|
||||
return toTimeSeriesMany(data);
|
||||
}
|
||||
|
||||
// Join by the first frame
|
||||
const frame = outerJoinDataFrames({
|
||||
frames: data,
|
||||
joinBy: fieldMatchers.get(FieldMatcherID.firstTimeField).get({}),
|
||||
enforceSort: true,
|
||||
keepOriginIndices: true,
|
||||
});
|
||||
return frame ? [frame] : [];
|
||||
}
|
||||
|
||||
export const prepareTimeSeriesTransformer: DataTransformerInfo<PrepareTimeSeriesOptions> = {
|
||||
export const prepareTimeSeriesTransformer: SynchronousDataTransformerInfo<PrepareTimeSeriesOptions> = {
|
||||
id: DataTransformerID.prepareTimeSeries,
|
||||
name: 'Prepare time series',
|
||||
description: `Will stretch data frames from the wide format into the long format. This is really helpful to be able to keep backwards compatability for panels not supporting the new wide format.`,
|
||||
defaultOptions: {},
|
||||
|
||||
/**
|
||||
* Return a modified copy of the series. If the transform is not or should not
|
||||
* be applied, just return the input series
|
||||
*/
|
||||
operator: (options) => (source) => source.pipe(map((data) => prepareTimeSeries(data, options))),
|
||||
operator: (options) => (source) =>
|
||||
source.pipe(map((data) => prepareTimeSeriesTransformer.transformer(options)(data))),
|
||||
|
||||
transformer: (options: PrepareTimeSeriesOptions) => {
|
||||
const format = options?.format ?? timeSeriesFormat.TimeSeriesWide;
|
||||
if (format === timeSeriesFormat.TimeSeriesMany) {
|
||||
return toTimeSeriesMany;
|
||||
}
|
||||
|
||||
return (data: DataFrame[]) => {
|
||||
// Join by the first frame
|
||||
const frame = outerJoinDataFrames({
|
||||
frames: data,
|
||||
joinBy: fieldMatchers.get(FieldMatcherID.firstTimeField).get({}),
|
||||
enforceSort: true,
|
||||
keepOriginIndices: true,
|
||||
});
|
||||
return frame ? [frame] : [];
|
||||
};
|
||||
},
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user