grafana/public/app/features/dashboard/state/runRequest.test.ts
Torkel Ödegaard 140ecbcf79
QueryProcessing: Observable query interface and RxJS for query & stream processing (#18899)
* I needed to learn some rxjs and understand this more, so just playing around

* Updated

* Removed all the complete calls

* Refactoring

* StreamHandler -> observable start

* progress

* simple signal works

* Handle update time range

* added error handling

* wrap old function

* minor changes

* handle data format in the subscribe function

* Use replay subject to return last value to subscribers

* Set loading state after no response in 50ms

* added missing file

* updated comment

* Added cancelation of network requests

* runRequest: Added unit test scenario framework

* Progress on tests

* minor refactor of unit tests

* updated test

* removed some old code

* Shared queries work again, and also became so much simpler

* unified query and observe methods

* implicit any fix

* Fixed closed subject issue

* removed comment

* Use last returned data for loading state

* WIP: Explore to runRequest makeover step1

* Minor progress

* Minor progress on explore and runRequest

* minor progress

* Things are starting to work in explore

* Updated prometheus to use new observable query response, greatly simplified code

* Revert refId change

* Found better solution for key/refId/requestId problem

* use observable with loki

* tests compile

* fix loki query prep

* Explore: correct first response handling

* Refactorings

* Refactoring

* Explore: Fixes LoadingState and GraphResults between runs (#18986)

* Refactor: Adds state to DataQueryResponse

* Fix: Fixes so we do not empty results before new data arrives
Fixes: #17409

* Transformations work

* observable test data

* remove single() from loki promise

* Fixed comment

* Explore: Fixes failing Loki and Prometheus unit tests (#18995)

* Tests: Makes datasource tests work again

* Fix: Fixes loki datasource so highlighting works

* Chore: Runs Prettier

* Fixed query runner tests

* Delay loading state indication to 200ms

* Fixed test

* fixed unit tests

* Clear cached calcs

* Fixed bug getProcesedDataFrames

* Fix the correct test is a better idea

* Fix: Fixes so queries in Explore are only run if Graph/Table is shown (#19000)

* Fix: Fixes so queries in Explore are only run if Graph/Table is shown
Fixes: #18618

* Refactor: Removes unnecessary condition

* PanelData: provide legacy data only when needed  (#19018)

* no legacy

* invert logic... now compiles

* merge getQueryResponseData and getDataRaw

* update comment about query editor

* use single getData() function

* only send legacy when it is used in explore

* pre process rather than post process

* pre process rather than post process

* Minor refactoring

* Add missing tags to test datasource response

* MixedDatasource: Adds query observable pattern to MixedDatasource (#19037)

* start mixed datasource

* Refactor: Refactors into observable pattern

* Tests: Fixes tests

* Tests: Removes console.log

* Refactor: Adds unique requestId
2019-09-12 17:28:46 +02:00

207 lines
5.0 KiB
TypeScript

import { DataFrame, LoadingState, dateTime } from '@grafana/data';
import { PanelData, DataSourceApi, DataQueryRequest, DataQueryResponse } from '@grafana/ui';
import { Subscriber, Observable, Subscription } from 'rxjs';
import { runRequest } from './runRequest';
// Replace the backend service module with a Jest mock for every import made
// while this test file runs. With only a module path given, Jest uses a manual
// mock from an adjacent __mocks__ directory if one exists, otherwise an
// auto-generated mock.
// NOTE(review): presumably this keeps runRequest's dependencies away from a
// real backend; nothing in this file uses the mock directly, so confirm.
jest.mock('app/core/services/backend_srv');
/**
 * Mutable fixture shared by the tests of a single scenario.
 *
 * It builds a fake data source whose `query()` returns an Observable that the
 * test drives by hand through `emitPacket()`, subscribes to `runRequest()` and
 * records every `PanelData` value it emits so tests can assert on them.
 */
class ScenarioCtx {
  /** Fake data source handed to `runRequest`; rebuilt by `reset()`. */
  ds: DataSourceApi;
  /** Request handed to `runRequest`; rebuilt by `reset()`. */
  request: DataQueryRequest;
  /** Subscriber of the fake query Observable; set once the query is subscribed to. */
  subscriber: Subscriber<DataQueryResponse>;
  /** True once the fake query Observable has been torn down. */
  isUnsubbed = false;
  /** Per-scenario setup hook, invoked by the scenario's `beforeEach` after `reset()`. */
  setupFn: () => void = () => {};
  /** Every value emitted by `runRequest` since the last `reset()`. */
  results: PanelData[] = [];
  /** Subscription created by the most recent `start()`. */
  subscription: Subscription;
  /** True once the fake data source's query Observable has been subscribed to. */
  wasStarted = false;
  /** When set, the fake query throws this error as soon as it is subscribed to. */
  error: Error | null = null;
  toStartTime = dateTime();
  fromStartTime = dateTime();

  /**
   * Restores the fixture to a pristine state: tears down the previous run,
   * clears flags and recorded results, and rebuilds the request and the fake
   * data source.
   */
  reset() {
    // The callback registered in start() pushes into `this.results`, which is
    // reassigned below. Without this teardown a subscription left over from
    // the previous test could write late emissions into the next test's results.
    if (this.subscription) {
      this.subscription.unsubscribe();
    }

    this.wasStarted = false;
    this.isUnsubbed = false;
    this.error = null;
    this.results = [];

    this.request = {
      range: {
        // Each bound is seeded from the field that carries its name.
        from: this.fromStartTime,
        to: this.toStartTime,
        raw: { from: '1h', to: 'now' },
      },
      targets: [
        {
          refId: 'A',
        },
      ],
    } as DataQueryRequest;

    this.ds = {
      query: (request: DataQueryRequest) => {
        return new Observable<DataQueryResponse>(subscriber => {
          // Expose the subscriber so the test can push packets manually.
          this.subscriber = subscriber;
          this.wasStarted = true;

          if (this.error) {
            // Thrown from inside the subscribe function on purpose; the
            // 'on thrown error' scenario asserts how runRequest reports it.
            throw this.error;
          }

          // Teardown logic: lets tests observe that the query was unsubscribed.
          return () => {
            this.isUnsubbed = true;
          };
        });
      },
    } as DataSourceApi;
  }

  /** Runs the request against the fake data source and records every emitted value. */
  start() {
    this.subscription = runRequest(this.ds, this.request).subscribe({
      next: (data: PanelData) => {
        this.results.push(data);
      },
    });
  }

  /**
   * Pushes a response packet through the fake query Observable.
   * Requires that the query has already been subscribed to (see `start()`).
   */
  emitPacket(packet: DataQueryResponse) {
    this.subscriber.next(packet);
  }

  /** Registers the hook that the scenario's `beforeEach` runs after `reset()`. */
  setup(fn: () => void) {
    this.setupFn = fn;
  }
}
/**
 * Declares a `describe` group backed by its own ScenarioCtx.
 *
 * Before every test in the group the context is reset and the scenario's
 * setup hook is run; whatever the hook returns is handed back to Jest, so a
 * hook that returns a promise is awaited before the test body executes.
 *
 * @param desc Title of the `describe` group.
 * @param fn Callback that registers the setup hook and the tests on the context.
 */
function runRequestScenario(desc: string, fn: (ctx: ScenarioCtx) => void) {
  const registerScenario = () => {
    const scenarioCtx = new ScenarioCtx();

    const prepare = () => {
      scenarioCtx.reset();
      return scenarioCtx.setupFn();
    };

    beforeEach(prepare);
    fn(scenarioCtx);
  };

  describe(desc, registerScenario);
}
// Scenario-based tests for runRequest. Each scenario registers a setup hook
// that runs before every test in the scenario, after the context is reset.
describe('runRequest', () => {
  runRequestScenario('with no queries', ctx => {
    ctx.setup(() => {
      ctx.request.targets = [];
      ctx.start();
    });

    it('should emit empty result with loading state done', () => {
      // The fake data source must never be queried when there are no targets.
      expect(ctx.wasStarted).toBe(false);
      expect(ctx.results[0].state).toBe(LoadingState.Done);
    });
  });

  runRequestScenario('After first response', ctx => {
    ctx.setup(() => {
      ctx.start();
      ctx.emitPacket({
        data: [{ name: 'Data' } as DataFrame],
      });
    });

    it('should emit single result with loading state done', () => {
      expect(ctx.wasStarted).toBe(true);
      expect(ctx.results.length).toBe(1);
      // Assert the state that the test title promises.
      expect(ctx.results[0].state).toBe(LoadingState.Done);
    });
  });

  runRequestScenario('After tree responses, 2 with different keys', ctx => {
    ctx.setup(() => {
      ctx.start();
      ctx.emitPacket({
        data: [{ name: 'DataA-1' } as DataFrame],
        key: 'A',
      });
      ctx.emitPacket({
        data: [{ name: 'DataA-2' } as DataFrame],
        key: 'A',
      });
      ctx.emitPacket({
        data: [{ name: 'DataB-1' } as DataFrame],
        key: 'B',
      });
    });

    it('should emit 3 seperate results', () => {
      expect(ctx.results.length).toBe(3);
    });

    it('should combine results and return latest data for key A', () => {
      expect(ctx.results[2].series).toEqual([{ name: 'DataA-2' }, { name: 'DataB-1' }]);
    });

    it('should have loading state Done', () => {
      expect(ctx.results[2].state).toEqual(LoadingState.Done);
    });
  });

  runRequestScenario('After response with state Streaming', ctx => {
    ctx.setup(() => {
      ctx.start();
      ctx.emitPacket({
        data: [{ name: 'DataA-1' } as DataFrame],
        key: 'A',
      });
      ctx.emitPacket({
        data: [{ name: 'DataA-2' } as DataFrame],
        key: 'A',
        state: LoadingState.Streaming,
      });
    });

    it('should have loading state Streaming', () => {
      expect(ctx.results[1].state).toEqual(LoadingState.Streaming);
    });
  });

  runRequestScenario('If no response after 250ms', ctx => {
    ctx.setup(async () => {
      ctx.start();
      // No packet is emitted; just let real time pass.
      await sleep(250);
    });

    it('should emit 1 result with loading state', () => {
      expect(ctx.results.length).toBe(1);
      expect(ctx.results[0].state).toBe(LoadingState.Loading);
    });
  });

  runRequestScenario('on thrown error', ctx => {
    ctx.setup(() => {
      ctx.error = new Error('Ohh no');
      ctx.start();
    });

    it('should emit 1 error result', () => {
      expect(ctx.results[0].error.message).toBe('Ohh no');
      expect(ctx.results[0].state).toBe(LoadingState.Error);
    });
  });

  runRequestScenario('If time range is relative', ctx => {
    // Upper bound of the submitted range, captured before runRequest sees the request.
    let submittedTo: number;

    ctx.setup(async () => {
      submittedTo = ctx.request.range.to.valueOf();
      ctx.start();
      // wait a bit
      await sleep(20);
      ctx.emitPacket({ data: [{ name: 'DataB-1' } as DataFrame] });
    });

    it('should update returned request range', () => {
      // Compare like with like: `toBe` checks identity, so comparing the
      // result of valueOf() against a DateTime object could never be equal
      // and the negated assertion would pass no matter what.
      expect(ctx.results[0].request.range.to.valueOf()).not.toBe(submittedTo);
    });
  });
});
/**
 * Resolves (with no value) once at least `ms` milliseconds of real time have
 * elapsed, as measured by `setTimeout`.
 *
 * @param ms Delay in milliseconds.
 */
async function sleep(ms: number) {
  const elapsed = new Promise(wake => {
    setTimeout(() => wake(undefined), ms);
  });
  return elapsed;
}