QueryProcessing: Observable query interface and RxJS for query & stream processing (#18899)

* I needed to learn some rxjs and understand this more, so just playing around

* Updated

* Removed all the complete calls

* Refactoring

* StreamHandler -> observable start

* progress

* simple singal works

* Handle update time range

* added error handling

* wrap old function

* minor changes

* handle data format in the subscribe function

* Use replay subject to return last value to subscribers

* Set loading state after no response in 50ms

* added missing file

* updated comment

* Added cancelation of network requests

* runRequest: Added unit test scenario framework

* Progress on tests

* minor refactor of unit tests

* updated test

* removed some old code

* Shared queries work again, and also became so much simplier

* unified query and observe methods

* implict any fix

* Fixed closed subject issue

* removed comment

* Use last returned data for loading state

* WIP: Explore to runRequest makover step1

* Minor progress

* Minor progress on explore and runRequest

* minor progress

* Things are starting to work in explore

* Updated prometheus to use new observable query response, greatly simplified code

* Revert refId change

* Found better solution for key/refId/requestId problem

* use observable with loki

* tests compile

* fix loki query prep

* Explore: correct first response handling

* Refactorings

* Refactoring

* Explore: Fixes LoadingState and GraphResults between runs (#18986)

* Refactor: Adds state to DataQueryResponse

* Fix: Fixes so we do not empty results before new data arrives
Fixes: #17409

* Transformations work

* observable test data

* remove single() from loki promise

* Fixed comment

* Explore: Fixes failing Loki and Prometheus unit tests (#18995)

* Tests: Makes datasource tests work again

* Fix: Fixes loki datasource so highligthing works

* Chore: Runs Prettier

* Fixed query runner tests

* Delay loading state indication to 200ms

* Fixed test

* fixed unit tests

* Clear cached calcs

* Fixed bug getProcesedDataFrames

* Fix the correct test is a better idea

* Fix: Fixes so queries in Explore are only run if Graph/Table is shown (#19000)

* Fix: Fixes so queries in Explore are only run if Graph/Table is shown
Fixes: #18618

* Refactor: Removes unnecessary condition

* PanelData: provide legacy data only when needed  (#19018)

* no legacy

* invert logic... now compiles

* merge getQueryResponseData and getDataRaw

* update comment about query editor

* use single getData() function

* only send legacy when it is used in explore

* pre process rather than post process

* pre process rather than post process

* Minor refactoring

* Add missing tags to test datasource response

* MixedDatasource: Adds query observable pattern to MixedDatasource (#19037)

* start mixed datasource

* Refactor: Refactors into observable parttern

* Tests: Fixes tests

* Tests: Removes console.log

* Refactor: Adds unique requestId
This commit is contained in:
Torkel Ödegaard
2019-09-12 17:28:46 +02:00
committed by GitHub
parent 3742db720f
commit 140ecbcf79
49 changed files with 1620 additions and 1525 deletions

View File

@@ -11,7 +11,7 @@ import { ErrorBoundary } from '@grafana/ui';
import { getTimeSrv, TimeSrv } from '../services/TimeSrv';
import { applyPanelTimeOverrides, calculateInnerPanelHeight } from 'app/features/dashboard/utils/panel';
import { profiler } from 'app/core/profiler';
import { getProcessedDataFrames } from '../state/PanelQueryState';
import { getProcessedDataFrames } from '../state/runRequest';
import templateSrv from 'app/features/templating/template_srv';
import config from 'app/core/config';
@@ -82,6 +82,7 @@ export class PanelChrome extends PureComponent<Props, State> {
componentWillUnmount() {
this.props.panel.events.off('refresh', this.onRefresh);
if (this.querySubscription) {
this.querySubscription.unsubscribe();
this.querySubscription = null;
@@ -94,12 +95,6 @@ export class PanelChrome extends PureComponent<Props, State> {
// View state has changed
if (isInView !== prevProps.isInView) {
if (isInView) {
// Subscribe will kick of a notice of the last known state
if (!this.querySubscription && this.wantsQueryExecution) {
const runner = this.props.panel.getQueryRunner();
this.querySubscription = runner.subscribe(this.panelDataObserver);
}
// Check if we need a delayed refresh
if (this.state.refreshWhenInView) {
this.onRefresh();
@@ -170,8 +165,9 @@ export class PanelChrome extends PureComponent<Props, State> {
const queryRunner = panel.getQueryRunner();
if (!this.querySubscription) {
this.querySubscription = queryRunner.subscribe(this.panelDataObserver);
this.querySubscription = queryRunner.getData().subscribe(this.panelDataObserver);
}
queryRunner.run({
datasource: panel.datasource,
queries: panel.targets,

View File

@@ -22,10 +22,8 @@ import { DashboardModel } from '../state/DashboardModel';
import { DataQuery, DataSourceSelectItem, PanelData, AlphaNotice, PluginState } from '@grafana/ui';
import { LoadingState, DataTransformerConfig } from '@grafana/data';
import { PluginHelp } from 'app/core/components/PluginHelp/PluginHelp';
import { PanelQueryRunnerFormat } from '../state/PanelQueryRunner';
import { Unsubscribable } from 'rxjs';
import { isSharedDashboardQuery } from 'app/plugins/datasource/dashboard/SharedQueryRunner';
import { DashboardQueryEditor } from 'app/plugins/datasource/dashboard/DashboardQueryEditor';
import { isSharedDashboardQuery, DashboardQueryEditor } from 'app/plugins/datasource/dashboard';
interface Props {
panel: PanelModel;
@@ -64,7 +62,9 @@ export class QueriesTab extends PureComponent<Props, State> {
const { panel } = this.props;
const queryRunner = panel.getQueryRunner();
this.querySubscription = queryRunner.subscribe(this.panelDataObserver, PanelQueryRunnerFormat.both);
this.querySubscription = queryRunner.getData(false).subscribe({
next: (data: PanelData) => this.onPanelDataUpdate(data),
});
}
componentWillUnmount() {
@@ -74,22 +74,9 @@ export class QueriesTab extends PureComponent<Props, State> {
}
}
// Updates the response with information from the stream
panelDataObserver = {
next: (data: PanelData) => {
try {
const { panel } = this.props;
if (data.state === LoadingState.Error) {
panel.events.emit('data-error', data.error);
} else if (data.state === LoadingState.Done) {
panel.events.emit('data-received', data.legacy);
}
} catch (err) {
console.log('Panel.events handler error', err);
}
this.setState({ data });
},
};
onPanelDataUpdate(data: PanelData) {
this.setState({ data });
}
findCurrentDataSource(): DataSourceSelectItem {
const { panel } = this.props;
@@ -226,11 +213,6 @@ export class QueriesTab extends PureComponent<Props, State> {
this.setState({ scrollTop: target.scrollTop });
};
getCurrentData = (applyTransformations = true) => {
const queryRunner = this.props.panel.getQueryRunner();
return queryRunner.getCurrentData(applyTransformations).series;
};
render() {
const { panel, dashboard } = this.props;
const { currentDS, scrollTop, data } = this.state;
@@ -301,7 +283,7 @@ export class QueriesTab extends PureComponent<Props, State> {
<TransformationsEditor
transformations={this.props.panel.transformations || []}
onChange={this.onTransformersChange}
getCurrentData={this.getCurrentData}
dataFrames={data.series}
/>
)}
</PanelOptionsGroup>

View File

@@ -12,7 +12,7 @@ import { getTimeSrv } from 'app/features/dashboard/services/TimeSrv';
// Types
import { PanelModel } from '../state/PanelModel';
import { DataQuery, DataSourceApi, PanelData, DataQueryRequest, ErrorBoundaryAlert } from '@grafana/ui';
import { TimeRange, LoadingState } from '@grafana/data';
import { TimeRange, LoadingState, toLegacyResponseData } from '@grafana/data';
import { DashboardModel } from '../state/DashboardModel';
interface Props {
@@ -89,7 +89,7 @@ export class QueryEditorRow extends PureComponent<Props, State> {
componentDidUpdate(prevProps: Props) {
const { loadedDataSourceValue } = this.state;
const { data, query } = this.props;
const { data, query, panel } = this.props;
if (data !== prevProps.data) {
this.setState({ queryResponse: filterPanelDataToQuery(data, query.refId) });
@@ -99,9 +99,7 @@ export class QueryEditorRow extends PureComponent<Props, State> {
}
if (this.angularQueryEditor) {
// Some query controllers listen to data error events and need a digest
// for some reason this needs to be done in next tick
setTimeout(this.angularQueryEditor.digest);
notifyAngularQueryEditorsOfData(panel, data, this.angularQueryEditor);
}
}
@@ -265,6 +263,25 @@ export class QueryEditorRow extends PureComponent<Props, State> {
}
}
// To avoid sending duplicate events for each row we have this global cached object here
// So we can check if we already emitted this legacy data event
let globalLastPanelDataCache: PanelData = null;
function notifyAngularQueryEditorsOfData(panel: PanelModel, data: PanelData, editor: AngularComponent) {
if (data === globalLastPanelDataCache) {
return;
}
globalLastPanelDataCache = data;
const legacy = data.series.map(v => toLegacyResponseData(v));
panel.events.emit('data-received', legacy);
// Some query controllers listen to data error events and need a digest
// for some reason this needs to be done in next tick
setTimeout(editor.digest);
}
export interface AngularQueryComponentScope {
target: DataQuery;
panel: PanelModel;

View File

@@ -327,7 +327,8 @@ export class PanelModel {
getQueryRunner(): PanelQueryRunner {
if (!this.queryRunner) {
this.queryRunner = new PanelQueryRunner(this.id);
this.queryRunner = new PanelQueryRunner();
this.setTransformations(this.transformations);
}
return this.queryRunner;
}
@@ -336,6 +337,10 @@ export class PanelModel {
return this.title && this.title.length > 0;
}
isAngularPlugin(): boolean {
return this.plugin && !!this.plugin.angularPanelCtrl;
}
destroy() {
this.events.emit('panel-teardown');
this.events.removeAllListeners();
@@ -347,11 +352,8 @@ export class PanelModel {
}
setTransformations(transformations: DataTransformerConfig[]) {
// save for persistence
this.transformations = transformations;
// update query runner transformers
this.getQueryRunner().setTransform(transformations);
this.getQueryRunner().setTransformations(transformations);
}
}

View File

@@ -1,17 +1,13 @@
import { PanelQueryRunner, QueryRunnerOptions } from './PanelQueryRunner';
import { PanelData, DataQueryRequest, DataStreamObserver, DataStreamState } from '@grafana/ui';
import { LoadingState, MutableDataFrame, ScopedVars } from '@grafana/data';
import { dateTime } from '@grafana/data';
import { SHARED_DASHBODARD_QUERY } from 'app/plugins/datasource/dashboard/SharedQueryRunner';
import { DashboardQuery } from 'app/plugins/datasource/dashboard/types';
import { PanelQueryRunner } from './PanelQueryRunner';
import { PanelData, DataQueryRequest } from '@grafana/ui';
import { dateTime, ScopedVars } from '@grafana/data';
import { PanelModel } from './PanelModel';
import { Subject } from 'rxjs';
jest.mock('app/core/services/backend_srv');
// Defined within setup functions
const panelsForCurrentDashboardMock: { [key: number]: PanelModel } = {};
jest.mock('app/features/dashboard/services/DashboardSrv', () => ({
getDashboardSrv: () => {
return {
@@ -40,7 +36,6 @@ interface ScenarioContext {
events?: PanelData[];
res?: PanelData;
queryCalledWith?: DataQueryRequest;
observer: DataStreamObserver;
runner: PanelQueryRunner;
}
@@ -55,8 +50,7 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
scopedVars: {
server: { text: 'Server1', value: 'server-1' },
},
runner: new PanelQueryRunner(1),
observer: (args: any) => {},
runner: new PanelQueryRunner(),
setup: (fn: () => void) => {
setupFn = fn;
},
@@ -72,9 +66,8 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
const datasource: any = {
name: 'TestDB',
interval: ctx.dsInterval,
query: (options: DataQueryRequest, observer: DataStreamObserver) => {
query: (options: DataQueryRequest) => {
ctx.queryCalledWith = options;
ctx.observer = observer;
return Promise.resolve(response);
},
testDatasource: jest.fn(),
@@ -95,9 +88,10 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
queries: [{ refId: 'A', test: 1 }],
};
ctx.runner = new PanelQueryRunner(1);
ctx.runner.subscribe({
ctx.runner = new PanelQueryRunner();
ctx.runner.getData().subscribe({
next: (data: PanelData) => {
ctx.res = data;
ctx.events.push(data);
},
});
@@ -110,7 +104,7 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
} as PanelModel;
ctx.events = [];
ctx.res = await ctx.runner.run(args);
ctx.runner.run(args);
});
scenarioFn(ctx);
@@ -190,102 +184,4 @@ describe('PanelQueryRunner', () => {
expect(ctx.queryCalledWith.maxDataPoints).toBe(10);
});
});
describeQueryRunnerScenario('when datasource is streaming data', ctx => {
let streamState: DataStreamState;
let isUnsubbed = false;
beforeEach(() => {
streamState = {
state: LoadingState.Streaming,
key: 'test-stream-1',
data: [
new MutableDataFrame({
fields: [],
name: 'I am a magic stream',
}),
],
request: {
requestId: ctx.queryCalledWith.requestId,
} as any,
unsubscribe: () => {
isUnsubbed = true;
},
};
ctx.observer(streamState);
});
it('should push another update to subscriber', async () => {
expect(ctx.events.length).toBe(2);
});
it('should set state to streaming', async () => {
expect(ctx.events[1].state).toBe(LoadingState.Streaming);
});
it('should not unsubscribe', async () => {
expect(isUnsubbed).toBe(false);
});
it('destroy should unsubscribe streams', async () => {
ctx.runner.destroy();
expect(isUnsubbed).toBe(true);
});
});
describeQueryRunnerScenario('Shared query request', ctx => {
ctx.setup(() => {});
it('should get the same results as the original', async () => {
// Get the results from
const q: DashboardQuery = { refId: 'Z', panelId: 1 };
const myPanelId = 7;
const runnerWantingSharedResults = new PanelQueryRunner(myPanelId);
panelsForCurrentDashboardMock[myPanelId] = {
id: myPanelId,
getQueryRunner: () => {
return runnerWantingSharedResults;
},
} as PanelModel;
const res = await runnerWantingSharedResults.run({
datasource: SHARED_DASHBODARD_QUERY,
queries: [q],
// Same query setup
scopedVars: ctx.scopedVars,
minInterval: ctx.minInterval,
widthPixels: ctx.widthPixels,
maxDataPoints: ctx.maxDataPoints,
timeRange: {
from: dateTime().subtract(1, 'days'),
to: dateTime(),
raw: { from: '1h', to: 'now' },
},
panelId: myPanelId, // Not 1
});
const req = res.request;
expect(req.panelId).toBe(1); // The source panel
expect(req.targets[0].datasource).toBe('TestDB');
expect(res.series.length).toBe(1);
expect(res.series[0].length).toBe(2);
// Get the private subject and check that someone is listening
const subject = (ctx.runner as any).subject as Subject<PanelData>;
expect(subject.observers.length).toBe(2);
// Now change the query and we should stop listening
try {
runnerWantingSharedResults.run({
datasource: 'unknown-datasource',
panelId: myPanelId, // Not 1
} as QueryRunnerOptions);
} catch {}
// runnerWantingSharedResults subject is now unsubscribed
// the test listener is still subscribed
expect(subject.observers.length).toBe(1);
});
});
});

View File

@@ -1,20 +1,19 @@
// Libraries
import cloneDeep from 'lodash/cloneDeep';
import throttle from 'lodash/throttle';
import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
import { cloneDeep } from 'lodash';
import { ReplaySubject, Unsubscribable, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
// Services & Utils
import { config } from 'app/core/config';
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import kbn from 'app/core/utils/kbn';
import templateSrv from 'app/features/templating/template_srv';
import { PanelQueryState } from './PanelQueryState';
import { isSharedDashboardQuery, SharedQueryRunner } from 'app/plugins/datasource/dashboard/SharedQueryRunner';
import { runRequest, preProcessPanelData } from './runRequest';
import { runSharedRequest, isSharedDashboardQuery } from '../../../plugins/datasource/dashboard';
// Types
import { PanelData, DataQuery, DataQueryRequest, DataSourceApi, DataSourceJsonData } from '@grafana/ui';
import { TimeRange, DataTransformerConfig, transformDataFrame, toLegacyResponseData, ScopedVars } from '@grafana/data';
import config from 'app/core/config';
import { TimeRange, DataTransformerConfig, transformDataFrame, ScopedVars } from '@grafana/data';
export interface QueryRunnerOptions<
TQuery extends DataQuery = DataQuery,
@@ -36,109 +35,45 @@ export interface QueryRunnerOptions<
transformations?: DataTransformerConfig[];
}
export enum PanelQueryRunnerFormat {
frames = 'frames',
legacy = 'legacy',
both = 'both',
}
let counter = 100;
function getNextRequestId() {
return 'Q' + counter++;
}
export class PanelQueryRunner {
private subject?: Subject<PanelData>;
private state = new PanelQueryState();
private subject?: ReplaySubject<PanelData>;
private subscription?: Unsubscribable;
private transformations?: DataTransformerConfig[];
// Listen to another panel for changes
private sharedQueryRunner: SharedQueryRunner;
constructor(private panelId: number) {
this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
this.subject = new Subject();
}
getPanelId() {
return this.panelId;
constructor() {
this.subject = new ReplaySubject(1);
}
/**
* Get the last result -- optionally skip the transformation
* Returns an observable that subscribes to the shared multi-cast subject (that reply last result).
*/
// TODO: add tests
getCurrentData(transform = true): PanelData {
const v = this.state.validateStreamsAndGetPanelData();
const transformData = config.featureToggles.transformations && transform;
const hasTransformations = this.transformations && this.transformations.length;
if (transformData && hasTransformations) {
const processed = transformDataFrame(this.transformations, v.series);
return {
...v,
series: processed,
legacy: processed.map(p => toLegacyResponseData(p)),
};
getData(transform = true): Observable<PanelData> {
if (transform) {
return this.subject.pipe(
map((data: PanelData) => {
if (this.hasTransformations()) {
const newSeries = transformDataFrame(this.transformations, data.series);
return { ...data, series: newSeries };
}
return data;
})
);
}
return v;
// Just pass it directly
return this.subject.pipe();
}
/**
* Listen for updates to the PanelData. If a query has already run for this panel,
* the results will be immediatly passed to the observer
*/
subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.frames): Unsubscribable {
if (format === PanelQueryRunnerFormat.legacy) {
this.state.sendLegacy = true;
} else if (format === PanelQueryRunnerFormat.both) {
this.state.sendFrames = true;
this.state.sendLegacy = true;
} else {
this.state.sendFrames = true;
}
// Send the last result
if (this.state.isStarted()) {
// Force check formats again?
this.state.getDataAfterCheckingFormats();
observer.next(this.getCurrentData()); // transformed
}
return this.subject.subscribe(observer);
hasTransformations() {
return config.featureToggles.transformations && this.transformations && this.transformations.length > 0;
}
/**
* Subscribe one runner to another
*/
chain(runner: PanelQueryRunner): Unsubscribable {
const { sendLegacy, sendFrames } = runner.state;
let format = sendFrames ? PanelQueryRunnerFormat.frames : PanelQueryRunnerFormat.legacy;
if (sendLegacy) {
format = PanelQueryRunnerFormat.both;
}
return this.subscribe(runner.subject, format);
}
/**
* Change the current transformation and notify all listeners
* Should be used only by panel editor to update the transformers
*/
setTransform = (transformations?: DataTransformerConfig[]) => {
this.transformations = transformations;
if (this.state.isStarted()) {
this.onStreamingDataUpdated();
}
};
async run(options: QueryRunnerOptions): Promise<PanelData> {
const { state } = this;
async run(options: QueryRunnerOptions) {
const {
queries,
timezone,
@@ -152,18 +87,12 @@ export class PanelQueryRunner {
maxDataPoints,
scopedVars,
minInterval,
delayStateNotification,
// delayStateNotification,
} = options;
// Support shared queries
if (isSharedDashboardQuery(datasource)) {
if (!this.sharedQueryRunner) {
this.sharedQueryRunner = new SharedQueryRunner(this);
}
return this.sharedQueryRunner.process(options);
} else if (this.sharedQueryRunner) {
this.sharedQueryRunner.disconnect();
this.sharedQueryRunner = null;
this.pipeToSubject(runSharedRequest(options));
return;
}
const request: DataQueryRequest = {
@@ -185,8 +114,6 @@ export class PanelQueryRunner {
// Add deprecated property
(request as any).rangeRaw = timeRange.raw;
let loadingStateTimeoutId = 0;
try {
const ds = await getDataSource(datasource, request.scopedVars);
@@ -215,54 +142,30 @@ export class PanelQueryRunner {
request.interval = norm.interval;
request.intervalMs = norm.intervalMs;
// Check if we can reuse the already issued query
const active = state.getActiveRunner();
if (active) {
if (state.isSameQuery(ds, request)) {
// Maybe cancel if it has run too long?
console.log('Trying to execute query while last one has yet to complete, returning same promise');
return active;
} else {
state.cancel('Query Changed while running');
}
}
// Send a loading status event on slower queries
loadingStateTimeoutId = window.setTimeout(() => {
if (state.getActiveRunner()) {
this.subject.next(this.state.validateStreamsAndGetPanelData());
}
}, delayStateNotification || 500);
this.transformations = options.transformations;
const data = await state.execute(ds, request);
// Clear the delayed loading state timeout
clearTimeout(loadingStateTimeoutId);
// Broadcast results
this.subject.next(this.getCurrentData());
return data;
this.pipeToSubject(runRequest(ds, request));
} catch (err) {
clearTimeout(loadingStateTimeoutId);
const data = state.setError(err);
this.subject.next(data);
return data;
console.log('PanelQueryRunner Error', err);
}
}
/**
* Called after every streaming event. This should be throttled so we
* avoid accidentally overwhelming the browser
*/
onStreamingDataUpdated = throttle(
() => {
this.subject.next(this.getCurrentData());
},
50,
{ trailing: true, leading: true }
);
private pipeToSubject(observable: Observable<PanelData>) {
if (this.subscription) {
this.subscription.unsubscribe();
}
// Makes sure everything is a proper DataFrame
const prepare = preProcessPanelData();
this.subscription = observable.subscribe({
next: (data: PanelData) => {
this.subject.next(prepare(data));
},
});
}
setTransformations(transformations?: DataTransformerConfig[]) {
this.transformations = transformations;
}
/**
* Called when the panel is closed
@@ -273,17 +176,10 @@ export class PanelQueryRunner {
this.subject.complete();
}
// Will cancel and disconnect any open requets
this.state.cancel('destroy');
if (this.subscription) {
this.subscription.unsubscribe();
}
}
setState = (state: PanelQueryState) => {
this.state = state;
};
getState = () => {
return this.state;
};
}
async function getDataSource(

View File

@@ -1,238 +0,0 @@
import { toDataQueryError, PanelQueryState, getProcessedDataFrames } from './PanelQueryState';
import { MockDataSourceApi } from 'test/mocks/datasource_srv';
import { LoadingState, getDataFrameRow } from '@grafana/data';
import { DataQueryResponse, DataQueryRequest, DataQuery } from '@grafana/ui';
import { getQueryOptions } from 'test/helpers/getQueryOptions';
describe('PanelQueryState', () => {
it('converts anythign to an error', () => {
let err = toDataQueryError(undefined);
expect(err.message).toEqual('Query error');
err = toDataQueryError('STRING ERRROR');
expect(err.message).toEqual('STRING ERRROR');
err = toDataQueryError({ message: 'hello' });
expect(err.message).toEqual('hello');
});
it('keeps track of running queries', async () => {
const state = new PanelQueryState();
expect(state.getActiveRunner()).toBeFalsy();
let hasRun = false;
const dsRunner = new Promise<DataQueryResponse>((resolve, reject) => {
// The status should be running when we get here
expect(state.getActiveRunner()).toBeTruthy();
resolve({ data: ['x', 'y'] });
hasRun = true;
});
const ds = new MockDataSourceApi('test');
ds.queryResolver = dsRunner;
// should not actually run for an empty query
let empty = await state.execute(ds, getQueryOptions({}));
expect(state.getActiveRunner()).toBeFalsy();
expect(empty.series.length).toBe(0);
expect(hasRun).toBeFalsy();
const query = getQueryOptions({
targets: [{ hide: true, refId: 'X' }, { hide: true, refId: 'Y' }, { hide: true, refId: 'Z' }],
});
empty = await state.execute(ds, query);
// should not run any hidden queries'
expect(state.getActiveRunner()).toBeFalsy();
expect(empty.series.length).toBe(0);
expect(hasRun).toBeFalsy();
// Check for the same query
expect(state.isSameQuery(ds, query)).toBeTruthy();
// Check for differnet queries
expect(state.isSameQuery(new MockDataSourceApi('test'), query)).toBeFalsy();
expect(state.isSameQuery(ds, getQueryOptions({ targets: [{ refId: 'differnet' }] }))).toBeFalsy();
});
});
describe('When cancelling request', () => {
it('Should call rejector', () => {
const state = new PanelQueryState();
state.request = {} as DataQueryRequest<DataQuery>;
(state as any).rejector = (obj: any) => {
expect(obj.cancelled).toBe(true);
expect(obj.message).toBe('OHH');
};
state.cancel('OHH');
});
});
describe('getProcessedDataFrame', () => {
it('converts timeseries to table skipping nulls', () => {
const input1 = {
target: 'Field Name',
datapoints: [[100, 1], [200, 2]],
};
const input2 = {
// without target
target: '',
datapoints: [[100, 1], [200, 2]],
};
const data = getProcessedDataFrames([null, input1, input2, null, null]);
expect(data.length).toBe(2);
expect(data[0].fields[0].name).toBe(input1.target);
const cmp = [getDataFrameRow(data[0], 0), getDataFrameRow(data[0], 1)];
expect(cmp).toEqual(input1.datapoints);
// Default name
expect(data[1].fields[0].name).toEqual('Value');
// Every colun should have a name and a type
for (const table of data) {
for (const field of table.fields) {
expect(field.name).toBeDefined();
expect(field.type).toBeDefined();
}
}
});
it('supports null values from query OK', () => {
expect(getProcessedDataFrames([null, null, null, null])).toEqual([]);
expect(getProcessedDataFrames(undefined)).toEqual([]);
expect(getProcessedDataFrames((null as unknown) as any[])).toEqual([]);
expect(getProcessedDataFrames([])).toEqual([]);
});
});
function makeSeriesStub(refId: string) {
return {
fields: [{ name: undefined }],
refId,
} as any;
}
describe('stream handling', () => {
const state = new PanelQueryState();
state.onStreamingDataUpdated = () => {
// nothing
};
state.request = {
requestId: '123',
range: {
raw: {
from: 123, // if string it gets revaluated
},
},
} as any;
state.response = {
state: LoadingState.Done,
series: [makeSeriesStub('A'), makeSeriesStub('B')],
};
it('gets the response', () => {
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(2);
expect(data.state).toBe(LoadingState.Done);
expect(data.series[0].refId).toBe('A');
});
it('adds a stream event', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'C',
request: state.request, // From the same request
data: [makeSeriesStub('C')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(1);
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(3);
expect(data.state).toBe(LoadingState.Streaming);
expect(data.series[2].refId).toBe('C');
});
it('add another stream event (with a differnet key)', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'D',
request: state.request, // From the same request
data: [makeSeriesStub('D')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(2);
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(4);
expect(data.state).toBe(LoadingState.Streaming);
expect(data.series[3].refId).toBe('D');
});
it('replace the first stream value, but keep the order', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'C', // The key to replace previous index 2
request: state.request, // From the same request
data: [makeSeriesStub('X')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(2);
const data = state.validateStreamsAndGetPanelData();
expect(data.series[2].refId).toBe('X');
});
it('ignores streams from a differnet request', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'Z', // Note with key 'A' it would still overwrite
request: {
...state.request,
requestId: 'XXX', // Different request and id
} as any,
data: [makeSeriesStub('C')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(2); // no change
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(4);
});
it('removes streams when the query changes', () => {
state.request = {
...state.request,
requestId: 'somethine else',
} as any;
state.response = {
state: LoadingState.Done,
series: [makeSeriesStub('F')],
};
expect(state.streams.length).toBe(2); // unchanged
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(1);
expect(data.series[0].refId).toBe('F');
expect(state.streams.length).toBe(0); // no streams
});
it('should close streams on error', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Error,
key: 'C',
error: { message: 'EEEEE' },
data: [],
request: state.request,
unsubscribe: () => {},
});
expect(state.streams.length).toBe(0);
expect(state.response.state).toBe(LoadingState.Error);
});
});

View File

@@ -1,377 +0,0 @@
// Libraries
import { isArray, isEqual, isString } from 'lodash';
// Utils & Services
import { getBackendSrv } from 'app/core/services/backend_srv';
import {
dateMath,
guessFieldTypes,
LoadingState,
toLegacyResponseData,
DataFrame,
toDataFrame,
isDataFrame,
} from '@grafana/data';
// Types
import {
DataSourceApi,
DataQueryRequest,
PanelData,
DataQueryError,
DataStreamObserver,
DataStreamState,
DataQueryResponseData,
} from '@grafana/ui';
export class PanelQueryState {
// The current/last running request
request = {
startTime: 0,
endTime: 1000, // Somethign not zero
} as DataQueryRequest;
// The result back from the datasource query
response = {
state: LoadingState.NotStarted,
series: [],
} as PanelData;
// Active stream results
streams: DataStreamState[] = [];
sendFrames = false;
sendLegacy = false;
// A promise for the running query
private executor?: Promise<PanelData> = null;
private rejector = (reason?: any) => {};
private datasource: DataSourceApi = {} as any;
isFinished(state: LoadingState) {
return state === LoadingState.Done || state === LoadingState.Error;
}
isStarted() {
return this.response.state !== LoadingState.NotStarted;
}
isSameQuery(ds: DataSourceApi, req: DataQueryRequest) {
if (ds !== this.datasource) {
return false;
}
// For now just check that the targets look the same
return isEqual(this.request.targets, req.targets);
}
/**
* Return the currently running query
*/
getActiveRunner(): Promise<PanelData> | undefined {
return this.executor;
}
cancel(reason: string) {
const { request } = this;
this.executor = null;
try {
// If no endTime the call to datasource.query did not complete
// call rejector to reject the executor promise
if (!request.endTime) {
request.endTime = Date.now();
this.rejector({ cancelled: true, message: reason });
}
// Cancel any open HTTP request with the same ID
if (request.requestId) {
getBackendSrv().resolveCancelerIfExists(request.requestId);
}
} catch (err) {
console.log('Error canceling request', err);
}
// Close any open streams
this.closeStreams(true);
}
execute(ds: DataSourceApi, req: DataQueryRequest): Promise<PanelData> {
this.request = {
...req,
startTime: Date.now(),
};
this.datasource = ds;
// Return early if there are no queries to run
if (!req.targets.length) {
console.log('No queries, so return early');
this.request.endTime = Date.now();
this.closeStreams();
return Promise.resolve(
(this.response = {
state: LoadingState.Done,
series: [], // Clear the data
legacy: [],
})
);
}
// Set the loading state immediately
this.response.state = LoadingState.Loading;
this.executor = new Promise<PanelData>((resolve, reject) => {
this.rejector = reject;
return ds
.query(this.request, this.dataStreamObserver)
.then(resp => {
if (!isArray(resp.data)) {
throw new Error(`Expected response data to be array, got ${typeof resp.data}.`);
}
this.request.endTime = Date.now();
this.executor = null;
// Make sure we send something back -- called run() w/o subscribe!
if (!(this.sendFrames || this.sendLegacy)) {
this.sendFrames = true;
}
// Save the result state
this.response = {
state: LoadingState.Done,
request: this.request,
series: this.sendFrames ? getProcessedDataFrames(resp.data) : [],
legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined,
};
resolve(this.validateStreamsAndGetPanelData());
})
.catch(err => {
this.executor = null;
resolve(this.setError(err));
});
});
return this.executor;
}
// Send a notice when the stream has updated the current model
onStreamingDataUpdated: () => void;
// This gets all stream events and keeps track of them
// it will then delegate real changes to the PanelQueryRunner
dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => {
// Streams only work with the 'series' format
this.sendFrames = true;
if (stream.state === LoadingState.Error) {
this.setError(stream.error);
this.onStreamingDataUpdated();
return;
}
// Add the stream to our list
let found = false;
const active = this.streams.map(s => {
if (s.key === stream.key) {
found = true;
return stream;
}
return s;
});
if (!found) {
if (shouldDisconnect(this.request, stream)) {
console.log('Got stream update from old stream, unsubscribing');
stream.unsubscribe();
return;
}
active.push(stream);
}
this.streams = active;
this.onStreamingDataUpdated();
};
closeStreams(keepSeries = false) {
if (!this.streams.length) {
return;
}
const series: DataFrame[] = [];
for (const stream of this.streams) {
if (stream.data) {
series.push.apply(series, stream.data);
}
try {
stream.unsubscribe();
} catch {
console.log('Failed to unsubscribe to stream');
}
}
this.streams = [];
// Move the series from streams to the response
if (keepSeries) {
const { response } = this;
this.response = {
...response,
series: [
...response.series,
...series, // Append the streamed series
],
};
}
}
/**
* This is called before broadcasting data to listeners. Given that
* stream events can happen at any point, we need to make sure to
* only return data from active streams.
*/
validateStreamsAndGetPanelData(): PanelData {
const { response, streams, request } = this;
// When not streaming, return the response + request
if (!streams.length) {
return {
...response,
request: request,
};
}
let done = this.isFinished(response.state);
const series = [...response.series];
const active: DataStreamState[] = [];
for (const stream of this.streams) {
if (shouldDisconnect(request, stream)) {
console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam');
stream.unsubscribe();
continue;
}
active.push(stream);
series.push.apply(series, stream.data);
if (!this.isFinished(stream.state)) {
done = false;
}
}
this.streams = active;
// Update the time range
let timeRange = this.request.range;
if (isString(timeRange.raw.from)) {
timeRange = {
from: dateMath.parse(timeRange.raw.from, false),
to: dateMath.parse(timeRange.raw.to, true),
raw: timeRange.raw,
};
}
return {
state: done ? LoadingState.Done : LoadingState.Streaming,
// This should not be needed but unfortunately Prometheus datasource sends non DataFrame here bypassing the
// typings
series: this.sendFrames ? getProcessedDataFrames(series) : [],
legacy: this.sendLegacy ? translateToLegacyData(series) : undefined,
request: {
...this.request,
range: timeRange, // update the time range
},
};
}
/**
* Make sure all requested formats exist on the data
*/
getDataAfterCheckingFormats(): PanelData {
const { response, sendLegacy, sendFrames } = this;
if (sendLegacy && (!response.legacy || !response.legacy.length)) {
response.legacy = response.series.map(v => toLegacyResponseData(v));
}
if (sendFrames && !response.series.length && response.legacy) {
response.series = response.legacy.map(v => toDataFrame(v));
}
return this.validateStreamsAndGetPanelData();
}
setError(err: any): PanelData {
if (!this.request.endTime) {
this.request.endTime = Date.now();
}
this.closeStreams(true);
this.response = {
...this.response, // Keep any existing data
state: LoadingState.Error,
error: toDataQueryError(err),
};
return this.validateStreamsAndGetPanelData();
}
}
export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) {
// It came from the same the same request, so keep it
if (source === state.request || state.request.requestId.startsWith(source.requestId)) {
return false;
}
// We should be able to check that it is the same query regardless of
// if it came from the same request. This will be important for #16676
return true;
}
export function toDataQueryError(err: any): DataQueryError {
const error = (err || {}) as DataQueryError;
if (!error.message) {
if (typeof err === 'string' || err instanceof String) {
return { message: err } as DataQueryError;
}
let message = 'Query error';
if (error.message) {
message = error.message;
} else if (error.data && error.data.message) {
message = error.data.message;
} else if (error.data && error.data.error) {
message = error.data.error;
} else if (error.status) {
message = `Query error: ${error.status} ${error.statusText}`;
}
error.message = message;
}
return error;
}
function translateToLegacyData(data: DataQueryResponseData) {
return data.map((v: any) => {
if (isDataFrame(v)) {
return toLegacyResponseData(v);
}
return v;
});
}
/**
* All panels will be passed tables that have our best guess at colum type set
*
* This is also used by PanelChrome for snapshot support
*/
export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataFrame[] {
if (!isArray(results)) {
return [];
}
const series: DataFrame[] = [];
for (const r of results) {
if (r) {
series.push(guessFieldTypes(toDataFrame(r)));
}
}
return series;
}

View File

@@ -0,0 +1,206 @@
import { DataFrame, LoadingState, dateTime } from '@grafana/data';
import { PanelData, DataSourceApi, DataQueryRequest, DataQueryResponse } from '@grafana/ui';
import { Subscriber, Observable, Subscription } from 'rxjs';
import { runRequest } from './runRequest';
jest.mock('app/core/services/backend_srv');
class ScenarioCtx {
ds: DataSourceApi;
request: DataQueryRequest;
subscriber: Subscriber<DataQueryResponse>;
isUnsubbed = false;
setupFn: () => void = () => {};
results: PanelData[];
subscription: Subscription;
wasStarted = false;
error: Error = null;
toStartTime = dateTime();
fromStartTime = dateTime();
reset() {
this.wasStarted = false;
this.isUnsubbed = false;
this.results = [];
this.request = {
range: {
from: this.toStartTime,
to: this.fromStartTime,
raw: { from: '1h', to: 'now' },
},
targets: [
{
refId: 'A',
},
],
} as DataQueryRequest;
this.ds = {
query: (request: DataQueryRequest) => {
return new Observable<DataQueryResponse>(subscriber => {
this.subscriber = subscriber;
this.wasStarted = true;
if (this.error) {
throw this.error;
}
return () => {
this.isUnsubbed = true;
};
});
},
} as DataSourceApi;
}
start() {
this.subscription = runRequest(this.ds, this.request).subscribe({
next: (data: PanelData) => {
this.results.push(data);
},
});
}
emitPacket(packet: DataQueryResponse) {
this.subscriber.next(packet);
}
setup(fn: () => void) {
this.setupFn = fn;
}
}
function runRequestScenario(desc: string, fn: (ctx: ScenarioCtx) => void) {
describe(desc, () => {
const ctx = new ScenarioCtx();
beforeEach(() => {
ctx.reset();
return ctx.setupFn();
});
fn(ctx);
});
}
describe('runRequest', () => {
runRequestScenario('with no queries', ctx => {
ctx.setup(() => {
ctx.request.targets = [];
ctx.start();
});
it('should emit empty result with loading state done', () => {
expect(ctx.wasStarted).toBe(false);
expect(ctx.results[0].state).toBe(LoadingState.Done);
});
});
runRequestScenario('After first response', ctx => {
ctx.setup(() => {
ctx.start();
ctx.emitPacket({
data: [{ name: 'Data' } as DataFrame],
});
});
it('should emit single result with loading state done', () => {
expect(ctx.wasStarted).toBe(true);
expect(ctx.results.length).toBe(1);
});
});
runRequestScenario('After tree responses, 2 with different keys', ctx => {
ctx.setup(() => {
ctx.start();
ctx.emitPacket({
data: [{ name: 'DataA-1' } as DataFrame],
key: 'A',
});
ctx.emitPacket({
data: [{ name: 'DataA-2' } as DataFrame],
key: 'A',
});
ctx.emitPacket({
data: [{ name: 'DataB-1' } as DataFrame],
key: 'B',
});
});
it('should emit 3 seperate results', () => {
expect(ctx.results.length).toBe(3);
});
it('should combine results and return latest data for key A', () => {
expect(ctx.results[2].series).toEqual([{ name: 'DataA-2' }, { name: 'DataB-1' }]);
});
it('should have loading state Done', () => {
expect(ctx.results[2].state).toEqual(LoadingState.Done);
});
});
runRequestScenario('After response with state Streaming', ctx => {
ctx.setup(() => {
ctx.start();
ctx.emitPacket({
data: [{ name: 'DataA-1' } as DataFrame],
key: 'A',
});
ctx.emitPacket({
data: [{ name: 'DataA-2' } as DataFrame],
key: 'A',
state: LoadingState.Streaming,
});
});
it('should have loading state Streaming', () => {
expect(ctx.results[1].state).toEqual(LoadingState.Streaming);
});
});
runRequestScenario('If no response after 250ms', ctx => {
ctx.setup(async () => {
ctx.start();
await sleep(250);
});
it('should emit 1 result with loading state', () => {
expect(ctx.results.length).toBe(1);
expect(ctx.results[0].state).toBe(LoadingState.Loading);
});
});
runRequestScenario('on thrown error', ctx => {
ctx.setup(() => {
ctx.error = new Error('Ohh no');
ctx.start();
});
it('should emit 1 error result', () => {
expect(ctx.results[0].error.message).toBe('Ohh no');
expect(ctx.results[0].state).toBe(LoadingState.Error);
});
});
runRequestScenario('If time range is relative', ctx => {
ctx.setup(async () => {
ctx.start();
// wait a bit
await sleep(20);
ctx.emitPacket({ data: [{ name: 'DataB-1' } as DataFrame] });
});
it('should update returned request range', () => {
expect(ctx.results[0].request.range.to.valueOf()).not.toBe(ctx.fromStartTime);
});
});
});
async function sleep(ms: number) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}

View File

@@ -0,0 +1,211 @@
// Libraries
import { Observable, of, timer, merge, from } from 'rxjs';
import { flatten, map as lodashMap, isArray, isString } from 'lodash';
import { map, catchError, takeUntil, mapTo, share, finalize } from 'rxjs/operators';
// Utils & Services
import { getBackendSrv } from 'app/core/services/backend_srv';
// Types
import {
DataSourceApi,
DataQueryRequest,
PanelData,
DataQueryResponse,
DataQueryResponseData,
DataQueryError,
} from '@grafana/ui';
import { LoadingState, dateMath, toDataFrame, DataFrame, guessFieldTypes } from '@grafana/data';
type MapOfResponsePackets = { [str: string]: DataQueryResponse };
interface RunningQueryState {
packets: { [key: string]: DataQueryResponse };
panelData: PanelData;
}
/*
* This function should handle composing a PanelData from multiple responses
*/
export function processResponsePacket(packet: DataQueryResponse, state: RunningQueryState): RunningQueryState {
const request = state.panelData.request;
const packets: MapOfResponsePackets = {
...state.packets,
};
packets[packet.key || 'A'] = packet;
// Update the time range
let timeRange = request.range;
if (isString(timeRange.raw.from)) {
timeRange = {
from: dateMath.parse(timeRange.raw.from, false),
to: dateMath.parse(timeRange.raw.to, true),
raw: timeRange.raw,
};
}
const combinedData = flatten(
lodashMap(packets, (packet: DataQueryResponse) => {
return packet.data;
})
);
const panelData = {
state: packet.state || LoadingState.Done,
series: combinedData,
request: {
...request,
range: timeRange,
},
};
return { packets, panelData };
}
/**
* This function handles the excecution of requests & and processes the single or multiple response packets into
* a combined PanelData response.
* It will
* * Merge multiple responses into a single DataFrame array based on the packet key
* * Will emit a loading state if no response after 50ms
* * Cancel any still runnning network requests on unsubscribe (using request.requestId)
*/
export function runRequest(datasource: DataSourceApi, request: DataQueryRequest): Observable<PanelData> {
let state: RunningQueryState = {
panelData: {
state: LoadingState.Loading,
series: [],
request: request,
},
packets: {},
};
// Return early if there are no queries to run
if (!request.targets.length) {
request.endTime = Date.now();
state.panelData.state = LoadingState.Done;
return of(state.panelData);
}
const dataObservable = callQueryMethod(datasource, request).pipe(
// Transform response packets into PanelData with merged results
map((packet: DataQueryResponse) => {
if (!isArray(packet.data)) {
throw new Error(`Expected response data to be array, got ${typeof packet.data}.`);
}
request.endTime = Date.now();
state = processResponsePacket(packet, state);
return state.panelData;
}),
// handle errors
catchError(err =>
of({
...state.panelData,
state: LoadingState.Error,
error: processQueryError(err),
})
),
// finalize is triggered when subscriber unsubscribes
// This makes sure any still running network requests are cancelled
finalize(cancelNetworkRequestsOnUnsubscribe(request)),
// this makes it possible to share this observable in takeUntil
share()
);
// If 50ms without a response emit a loading state
// mapTo will translate the timer event into state.panelData (which has state set to loading)
// takeUntil will cancel the timer emit when first response packet is received on the dataObservable
return merge(
timer(200).pipe(
mapTo(state.panelData),
takeUntil(dataObservable)
),
dataObservable
);
}
function cancelNetworkRequestsOnUnsubscribe(req: DataQueryRequest) {
return () => {
getBackendSrv().resolveCancelerIfExists(req.requestId);
};
}
export function callQueryMethod(datasource: DataSourceApi, request: DataQueryRequest) {
const returnVal = datasource.query(request);
return from(returnVal);
}
export function processQueryError(err: any): DataQueryError {
const error = (err || {}) as DataQueryError;
if (!error.message) {
if (typeof err === 'string' || err instanceof String) {
return { message: err } as DataQueryError;
}
let message = 'Query error';
if (error.message) {
message = error.message;
} else if (error.data && error.data.message) {
message = error.data.message;
} else if (error.data && error.data.error) {
message = error.data.error;
} else if (error.status) {
message = `Query error: ${error.status} ${error.statusText}`;
}
error.message = message;
}
return error;
}
/**
* All panels will be passed tables that have our best guess at colum type set
*
* This is also used by PanelChrome for snapshot support
*/
export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataFrame[] {
if (!isArray(results)) {
return [];
}
const dataFrames: DataFrame[] = [];
for (const result of results) {
const dataFrame = guessFieldTypes(toDataFrame(result));
// clear out any cached calcs
for (const field of dataFrame.fields) {
field.calcs = null;
}
dataFrames.push(dataFrame);
}
return dataFrames;
}
export function preProcessPanelData() {
let lastResult: PanelData = null;
return function mapper(data: PanelData) {
let { series } = data;
// for loading states with no data, use last result
if (data.state === LoadingState.Loading && series.length === 0) {
if (!lastResult) {
lastResult = data;
}
return { ...lastResult, state: LoadingState.Loading };
}
// Makes sure the data is properly formatted
series = getProcessedDataFrames(series);
lastResult = { ...data, series };
return lastResult;
};
}