Scenes: Add support for transformations (#60329)

* Add support for transformations

* Add tests for data transformers

* Provide replace function to transformations

* Unsubscribe from existing transformations stream if new data comes in
This commit is contained in:
Dominik Prokop 2022-12-15 04:56:29 -08:00 committed by GitHub
parent b2da0d22af
commit 66c076f24e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 449 additions and 8 deletions

View File

@ -0,0 +1,163 @@
import { map } from 'rxjs';
import {
ArrayVector,
getDefaultTimeRange,
LoadingState,
standardTransformersRegistry,
toDataFrame,
} from '@grafana/data';
import { SceneFlexLayout } from '../components';
import { SceneDataNode } from './SceneDataNode';
import { SceneDataTransformer } from './SceneDataTransformer';
import { SceneObjectBase } from './SceneObjectBase';
import { sceneGraph } from './sceneGraph';
class TestSceneObject extends SceneObjectBase<{}> {}
describe('SceneDataTransformer', () => {
let transformerSpy1 = jest.fn();
let transformerSpy2 = jest.fn();
beforeEach(() => {
standardTransformersRegistry.setInit(() => {
return [
{
id: 'customTransformer1',
editor: () => null,
transformation: {
id: 'customTransformer1',
name: 'Custom Transformer',
operator: (options) => (source) => {
transformerSpy1(options);
return source.pipe(
map((data) => {
return data.map((frame) => {
return {
...frame,
fields: frame.fields.map((field) => {
return {
...field,
values: new ArrayVector(field.values.toArray().map((v) => v * 2)),
};
}),
};
});
})
);
},
},
name: 'Custom Transformer',
},
{
id: 'customTransformer2',
editor: () => null,
transformation: {
id: 'customTransformer2',
name: 'Custom Transformer2',
operator: (options) => (source) => {
transformerSpy2(options);
return source.pipe(
map((data) => {
return data.map((frame) => {
return {
...frame,
fields: frame.fields.map((field) => {
return {
...field,
values: new ArrayVector(field.values.toArray().map((v) => v * 3)),
};
}),
};
});
})
);
},
},
name: 'Custom Transformer 2',
},
];
});
});
it('applies transformations to closest data node', () => {
const sourceDataNode = new SceneDataNode({
data: {
state: LoadingState.Loading,
timeRange: getDefaultTimeRange(),
series: [
toDataFrame([
[100, 1],
[200, 2],
[300, 3],
]),
],
},
});
const transformationNode = new SceneDataTransformer({
transformations: [
{
id: 'customTransformer1',
options: {
option: 'value1',
},
},
{
id: 'customTransformer2',
options: {
option: 'value2',
},
},
],
});
const consumer = new TestSceneObject({
$data: transformationNode,
});
// @ts-expect-error
const scene = new SceneFlexLayout({
$data: sourceDataNode,
children: [consumer],
});
sourceDataNode.activate();
transformationNode.activate();
// Transforms initial data
let data = sceneGraph.getData(consumer).state.data;
expect(transformerSpy1).toHaveBeenCalledTimes(1);
expect(transformerSpy1).toHaveBeenCalledWith({ option: 'value1' });
expect(transformerSpy2).toHaveBeenCalledTimes(1);
expect(transformerSpy2).toHaveBeenCalledWith({ option: 'value2' });
expect(data?.series.length).toBe(1);
expect(data?.series[0].fields).toHaveLength(2);
expect(data?.series[0].fields[0].values.toArray()).toEqual([600, 1200, 1800]);
expect(data?.series[0].fields[1].values.toArray()).toEqual([6, 12, 18]);
sourceDataNode.setState({
data: {
state: LoadingState.Done,
timeRange: getDefaultTimeRange(),
series: [
toDataFrame([
[10, 10],
[20, 20],
[30, 30],
]),
],
},
});
// Transforms updated data
data = sceneGraph.getData(consumer).state.data;
expect(transformerSpy1).toHaveBeenCalledTimes(2);
expect(transformerSpy2).toHaveBeenCalledTimes(2);
expect(data?.series[0].fields[0].values.toArray()).toEqual([60, 120, 180]);
expect(data?.series[0].fields[1].values.toArray()).toEqual([60, 120, 180]);
});
});

View File

@ -0,0 +1,64 @@
import { Observable, of, Unsubscribable } from 'rxjs';
import { DataTransformerConfig, LoadingState, PanelData } from '@grafana/data';
import { getTransformationsStream } from '../querying/SceneQueryRunner';
import { SceneObjectBase } from './SceneObjectBase';
import { sceneGraph } from './sceneGraph';
import { SceneDataState } from './types';
export interface SceneDataTransformerState extends SceneDataState {
transformations?: DataTransformerConfig[];
}
export class SceneDataTransformer extends SceneObjectBase<SceneDataTransformerState> {
private _transformationsSub?: Unsubscribable;
public activate() {
super.activate();
if (!this.parent || !this.parent.parent) {
return;
}
const initialData = sceneGraph.getData(this.parent.parent).state.data;
if (initialData) {
this.transformData(of(initialData));
}
this._subs.add(
// Need to subscribe to the parent's parent because the parent has a $data reference to this object
sceneGraph.getData(this.parent.parent).subscribeToState({
next: (data) => {
if (data.data?.state === LoadingState.Done) {
this.transformData(of(data.data));
}
},
})
);
}
public deactivate(): void {
super.deactivate();
if (this._transformationsSub) {
this._transformationsSub.unsubscribe();
this._transformationsSub = undefined;
}
}
private transformData(data: Observable<PanelData>) {
if (this._transformationsSub) {
this._transformationsSub.unsubscribe();
this._transformationsSub = undefined;
}
this._transformationsSub = data.pipe(getTransformationsStream(this, this.state.transformations)).subscribe({
next: (data) => {
this.setState({ data });
},
});
}
}

View File

@ -1,6 +1,15 @@
import { of } from 'rxjs';
import { map, of } from 'rxjs';
import { DataQueryRequest, DataSourceApi, getDefaultTimeRange, LoadingState, PanelData } from '@grafana/data';
import {
ArrayVector,
DataQueryRequest,
DataSourceApi,
getDefaultTimeRange,
LoadingState,
PanelData,
standardTransformersRegistry,
toDataFrame,
} from '@grafana/data';
import { SceneTimeRange } from '../core/SceneTimeRange';
@ -21,7 +30,13 @@ jest.mock('app/features/plugins/datasource_srv', () => ({
const runRequest = jest.fn().mockReturnValue(
of<PanelData>({
state: LoadingState.Done,
series: [],
series: [
toDataFrame([
[100, 1],
[200, 2],
[300, 3],
]),
],
timeRange: getDefaultTimeRange(),
})
);
@ -80,4 +95,106 @@ describe('SceneQueryRunner', () => {
expect(queryRunner.state.data?.state).toBe(LoadingState.Done);
});
});
describe('transformations', () => {
let transformerSpy1 = jest.fn();
let transformerSpy2 = jest.fn();
beforeEach(() => {
standardTransformersRegistry.setInit(() => {
return [
{
id: 'customTransformer1',
editor: () => null,
transformation: {
id: 'customTransformer1',
name: 'Custom Transformer',
operator: (options) => (source) => {
transformerSpy1(options);
return source.pipe(
map((data) => {
return data.map((frame) => {
return {
...frame,
fields: frame.fields.map((field) => {
return {
...field,
values: new ArrayVector(field.values.toArray().map((v) => v * 2)),
};
}),
};
});
})
);
},
},
name: 'Custom Transformer',
},
{
id: 'customTransformer2',
editor: () => null,
transformation: {
id: 'customTransformer2',
name: 'Custom Transformer2',
operator: (options) => (source) => {
transformerSpy2(options);
return source.pipe(
map((data) => {
return data.map((frame) => {
return {
...frame,
fields: frame.fields.map((field) => {
return {
...field,
values: new ArrayVector(field.values.toArray().map((v) => v * 3)),
};
}),
};
});
})
);
},
},
name: 'Custom Transformer 2',
},
];
});
});
it('should apply transformations to query results', async () => {
const queryRunner = new SceneQueryRunner({
queries: [{ refId: 'A' }],
$timeRange: new SceneTimeRange(),
maxDataPoints: 100,
transformations: [
{
id: 'customTransformer1',
options: {
option: 'value1',
},
},
{
id: 'customTransformer2',
options: {
option: 'value2',
},
},
],
});
queryRunner.activate();
await new Promise((r) => setTimeout(r, 1));
expect(queryRunner.state.data?.state).toBe(LoadingState.Done);
expect(transformerSpy1).toHaveBeenCalledTimes(1);
expect(transformerSpy1).toHaveBeenCalledWith({ option: 'value1' });
expect(transformerSpy2).toHaveBeenCalledTimes(1);
expect(transformerSpy2).toHaveBeenCalledWith({ option: 'value2' });
expect(queryRunner.state.data?.series).toHaveLength(1);
expect(queryRunner.state.data?.series[0].fields).toHaveLength(2);
expect(queryRunner.state.data?.series[0].fields[0].values.toArray()).toEqual([600, 1200, 1800]);
expect(queryRunner.state.data?.series[0].fields[1].values.toArray()).toEqual([6, 12, 18]);
});
});
});

View File

@ -1,5 +1,5 @@
import { cloneDeep } from 'lodash';
import { Unsubscribable } from 'rxjs';
import { mergeMap, MonoTypeOperatorFunction, Unsubscribable, map, of } from 'rxjs';
import {
CoreApp,
@ -7,10 +7,12 @@ import {
DataQueryRequest,
DataSourceApi,
DataSourceRef,
DataTransformerConfig,
PanelData,
rangeUtil,
ScopedVars,
TimeRange,
transformDataFrame,
} from '@grafana/data';
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import { getNextRequestId } from 'app/features/query/state/PanelQueryRunner';
@ -18,12 +20,13 @@ import { runRequest } from 'app/features/query/state/runRequest';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { sceneGraph } from '../core/sceneGraph';
import { SceneObjectStatePlain } from '../core/types';
import { SceneObject, SceneObjectStatePlain } from '../core/types';
import { VariableDependencyConfig } from '../variables/VariableDependencyConfig';
export interface QueryRunnerState extends SceneObjectStatePlain {
data?: PanelData;
queries: DataQueryExtended[];
transformations?: DataTransformerConfig[];
datasource?: DataSourceRef;
minInterval?: string;
maxDataPoints?: number;
@ -158,9 +161,11 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> {
request.interval = norm.interval;
request.intervalMs = norm.intervalMs;
this._querySub = runRequest(ds, request).subscribe({
next: this.onDataReceived,
});
this._querySub = runRequest(ds, request)
.pipe(getTransformationsStream(this, this.state.transformations))
.subscribe({
next: this.onDataReceived,
});
} catch (err) {
console.error('PanelQueryRunner Error', err);
}
@ -177,3 +182,26 @@ async function getDataSource(datasource: DataSourceRef | undefined, scopedVars:
}
return await getDatasourceSrv().get(datasource as string, scopedVars);
}
export const getTransformationsStream: (
sceneObject: SceneObject,
transformations?: DataTransformerConfig[]
) => MonoTypeOperatorFunction<PanelData> = (sceneObject, transformations) => (inputStream) => {
return inputStream.pipe(
mergeMap((data) => {
if (!transformations || transformations.length === 0) {
return of(data);
}
const replace: (option?: string) => string = (option) => {
return sceneGraph.interpolate(sceneObject, option, data?.request?.scopedVars);
};
transformations.forEach((transform: DataTransformerConfig) => {
transform.replace = replace;
});
return transformDataFrame(transformations, data.series).pipe(map((series) => ({ ...data, series })));
})
);
};

View File

@ -9,6 +9,7 @@ import { getGridWithRowLayoutTest } from './gridWithRow';
import { getNestedScene } from './nested';
import { getQueryVariableDemo } from './queryVariableDemo';
import { getSceneWithRows } from './sceneWithRows';
import { getTransformationsDemo } from './transformations';
import { getVariablesDemo, getVariablesDemoWithAll } from './variablesDemo';
interface SceneDef {
@ -29,6 +30,7 @@ export function getScenes(): SceneDef[] {
{ title: 'Variables', getScene: getVariablesDemo },
{ title: 'Variables with All values', getScene: getVariablesDemoWithAll },
{ title: 'Query variable', getScene: getQueryVariableDemo },
{ title: 'Transformations demo', getScene: getTransformationsDemo },
];
}

View File

@ -0,0 +1,67 @@
import { Scene, SceneTimePicker, SceneFlexLayout, VizPanel } from '../components';
import { EmbeddedScene } from '../components/Scene';
import { SceneDataTransformer } from '../core/SceneDataTransformer';
import { SceneTimeRange } from '../core/SceneTimeRange';
import { SceneEditManager } from '../editor/SceneEditManager';
import { getQueryRunnerWithRandomWalkQuery } from './queries';
export function getTransformationsDemo(standalone: boolean): Scene {
const state = {
title: 'Transformations demo',
layout: new SceneFlexLayout({
direction: 'row',
children: [
new SceneFlexLayout({
direction: 'column',
children: [
new SceneFlexLayout({
direction: 'row',
children: [
new VizPanel({
pluginId: 'timeseries',
title: 'Source data (global query',
}),
new VizPanel({
pluginId: 'stat',
title: 'Transformed data',
$data: new SceneDataTransformer({
transformations: [
{
id: 'reduce',
options: {
reducers: ['last', 'mean'],
},
},
],
}),
}),
],
}),
new VizPanel({
$data: getQueryRunnerWithRandomWalkQuery(undefined, {
transformations: [
{
id: 'reduce',
options: {
reducers: ['mean'],
},
},
],
}),
pluginId: 'stat',
title: 'Query with predefined transformations',
}),
],
}),
],
}),
$editor: new SceneEditManager({}),
$timeRange: new SceneTimeRange(),
$data: getQueryRunnerWithRandomWalkQuery(),
actions: [new SceneTimePicker({})],
};
return standalone ? new Scene(state) : new EmbeddedScene(state);
}