DashboardQueryRunner: Fixes unrestrained subscriptions being created (#36371)

This commit is contained in:
Hugo Häggmark
2021-07-02 10:52:13 +02:00
committed by GitHub
parent 30dc4025c2
commit b741245960
4 changed files with 155 additions and 132 deletions

View File

@@ -64,10 +64,10 @@ function expectOnResults(args: {
next: (value) => {
try {
expectCallback(value);
subscription.unsubscribe();
subscription?.unsubscribe();
done();
} catch (err) {
subscription.unsubscribe();
subscription?.unsubscribe();
done.fail(err);
}
},
@@ -97,6 +97,56 @@ describe('DashboardQueryRunnerImpl', () => {
});
});
describe('when calling run and all workers succeed but take longer than 200ms', () => {
it('then it should return the empty results', (done) => {
const { runner, options, annotationQueryMock, executeAnnotationQueryMock, getMock } = getTestContext();
const wait = 201;
executeAnnotationQueryMock.mockReturnValue(toAsyncOfResult({ events: [{ id: 'NextGen' }] }).pipe(delay(wait)));
expectOnResults({
runner,
panelId: 1,
done,
expect: (results) => {
// should have one alert state, one snapshot, one legacy and one next gen result
// having both snapshot and legacy/next gen is a imaginary example for testing purposes and doesn't exist for real
expect(results).toEqual({ annotations: [] });
expect(annotationQueryMock).toHaveBeenCalledTimes(1);
expect(executeAnnotationQueryMock).toHaveBeenCalledTimes(1);
expect(getMock).toHaveBeenCalledTimes(1);
},
});
runner.run(options);
});
});
describe('when calling run and all workers succeed but the subscriber subscribes after the run', () => {
it('then it should return the last results', (done) => {
const { runner, options, annotationQueryMock, executeAnnotationQueryMock, getMock } = getTestContext();
runner.run(options);
setTimeout(
() =>
expectOnResults({
runner,
panelId: 1,
done,
expect: (results) => {
// should have one alert state, one snapshot, one legacy and one next gen result
// having both snapshot and legacy/next gen is a imaginary example for testing purposes and doesn't exist for real
expect(results).toEqual(getExpectedForAllResult());
expect(annotationQueryMock).toHaveBeenCalledTimes(1);
expect(executeAnnotationQueryMock).toHaveBeenCalledTimes(1);
expect(getMock).toHaveBeenCalledTimes(1);
},
}),
200
); // faking a late subscriber to make sure we get the latest results
});
});
describe('when calling run and all workers fail', () => {
silenceConsoleOutput();
it('then it should return the correct results', (done) => {

View File

@@ -1,5 +1,5 @@
import { merge, Observable, ReplaySubject, Subject, Unsubscribable } from 'rxjs';
import { mergeAll, reduce, share, takeUntil } from 'rxjs/operators';
import { merge, Observable, ReplaySubject, Subject, Subscription, timer, Unsubscribable } from 'rxjs';
import { finalize, map, mapTo, mergeAll, reduce, share, takeUntil } from 'rxjs/operators';
import { AnnotationQuery } from '@grafana/data';
import { dedupAnnotations } from 'app/features/annotations/events_processing';
@@ -13,7 +13,7 @@ import {
import { AlertStatesWorker } from './AlertStatesWorker';
import { SnapshotWorker } from './SnapshotWorker';
import { AnnotationsWorker } from './AnnotationsWorker';
import { emptyResult, getAnnotationsByPanelId } from './utils';
import { getAnnotationsByPanelId } from './utils';
import { DashboardModel } from '../../../dashboard/state';
import { getTimeSrv, TimeSrv } from '../../../dashboard/services/TimeSrv';
import { RefreshEvent } from '../../../../types/events';
@@ -53,32 +53,25 @@ class DashboardQueryRunnerImpl implements DashboardQueryRunner {
}
getResult(panelId?: number): Observable<DashboardQueryRunnerResult> {
return new Observable<DashboardQueryRunnerResult>((subscriber) => {
const subscription = this.results.subscribe({
next: (result) => {
return this.results.asObservable().pipe(
map((result) => {
const annotations = getAnnotationsByPanelId(result.annotations, panelId);
const alertState = result.alertStates.find((res) => Boolean(panelId) && res.panelId === panelId);
subscriber.next({ annotations: dedupAnnotations(annotations), alertState });
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
});
return () => {
subscription.unsubscribe();
};
});
return { annotations: dedupAnnotations(annotations), alertState };
})
);
}
private executeRun(options: DashboardQueryRunnerOptions) {
const workers = this.workers.filter((w) => w.canWork(options));
const workerObservables = workers.map((w) => w.work(options));
const observables = [emptyResult()].concat(workerObservables);
merge(observables)
.pipe(
const resultSubscription = new Subscription();
const resultObservable = merge(workerObservables).pipe(
takeUntil(this.runs.asObservable()),
mergeAll(),
reduce((acc, value) => {
reduce((acc: DashboardQueryRunnerWorkerResult, value: DashboardQueryRunnerWorkerResult) => {
// console.log({ acc: acc.annotations.length, value: value.annotations.length });
// should we use scan or reduce here
// reduce will only emit when all observables are completed
// scan will emit when any observable is completed
@@ -86,11 +79,34 @@ class DashboardQueryRunnerImpl implements DashboardQueryRunner {
acc.annotations = acc.annotations.concat(value.annotations);
acc.alertStates = acc.alertStates.concat(value.alertStates);
return acc;
}),
finalize(() => {
resultSubscription.unsubscribe(); // important to avoid memory leaks
}),
share() // shared because we're using it in takeUntil below
);
const timerSubscription = new Subscription();
const timerObservable = timer(200).pipe(
mapTo({ annotations: [], alertStates: [] }),
takeUntil(resultObservable),
finalize(() => {
timerSubscription.unsubscribe(); // important to avoid memory leaks
})
)
.subscribe((x) => {
this.results.next(x);
});
);
// if the result takes longer than 200ms we just publish an empty result
timerSubscription.add(
timerObservable.subscribe((result) => {
this.results.next(result);
})
);
resultSubscription.add(
resultObservable.subscribe((result: DashboardQueryRunnerWorkerResult) => {
this.results.next(result);
})
);
}
cancel(annotation: AnnotationQuery): void {

View File

@@ -1,6 +1,4 @@
import { AlertState, getDefaultTimeRange, LoadingState, PanelData, toDataFrame } from '@grafana/data';
import { DashboardQueryRunnerResult } from './DashboardQueryRunner/types';
import { mergePanelAndDashData } from './mergePanelAndDashData';
import { TestScheduler } from 'rxjs/testing';
@@ -12,33 +10,58 @@ function getTestContext() {
annotations: [toDataFrame([{ id: 'panelData' }])],
timeRange,
};
const dashData: DashboardQueryRunnerResult = {
annotations: [{ id: 'dashData' }],
alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' },
};
const scheduler: TestScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
return { timeRange, scheduler, panelData, dashData };
return { timeRange, scheduler, panelData };
}
describe('mergePanelAndDashboardData', () => {
describe('when both results are fast', () => {
it('then just combine the results', () => {
const { panelData, dashData, timeRange, scheduler } = getTestContext();
describe('when called and dashboard data contains annotations', () => {
it('then the annotations should be combined', () => {
const { panelData, timeRange, scheduler } = getTestContext();
scheduler.run(({ cold, expectObservable }) => {
const panelObservable = cold('10ms a', { a: panelData });
const dashObservable = cold('10ms a', { a: dashData });
const panelObservable = cold('a', { a: panelData });
const dashObservable = cold('a', { a: { annotations: [{ id: 'dashData' }] } });
const result = mergePanelAndDashData(panelObservable, dashObservable);
expectObservable(result).toBe('10ms a', {
expectObservable(result).toBe('a', {
a: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])],
timeRange,
},
});
});
scheduler.flush();
});
});
describe('when called and dashboard data contains alert states', () => {
it('then the alert states should be added', () => {
const { panelData, timeRange, scheduler } = getTestContext();
scheduler.run(({ cold, expectObservable }) => {
const panelObservable = cold('a', { a: panelData });
const dashObservable = cold('a', {
a: {
annotations: [],
alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' },
},
});
const result = mergePanelAndDashData(panelObservable, dashObservable);
expectObservable(result).toBe('a', {
a: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([])],
alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' },
timeRange,
},
@@ -49,86 +72,27 @@ describe('mergePanelAndDashboardData', () => {
});
});
describe('when dashboard results are slow', () => {
it('then flush panel data first', () => {
const { panelData, dashData, timeRange, scheduler } = getTestContext();
describe('when called and dashboard data does not contain annotations or alertState', () => {
it('then the panelData is unchanged', () => {
const { panelData, timeRange, scheduler } = getTestContext();
scheduler.run(({ cold, expectObservable }) => {
const panelObservable = cold('10ms a', { a: panelData });
const dashObservable = cold('210ms a', { a: dashData });
const panelObservable = cold('a', { a: panelData });
const dashObservable = cold('a', {
a: {
annotations: [],
},
});
const result = mergePanelAndDashData(panelObservable, dashObservable);
expectObservable(result).toBe('200ms a 9ms b', {
expectObservable(result).toBe('a', {
a: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }])],
timeRange,
},
b: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])],
alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' },
timeRange,
},
});
});
scheduler.flush();
});
});
describe('when panel results are slow', () => {
it('then just combine the results', () => {
const { panelData, dashData, timeRange, scheduler } = getTestContext();
scheduler.run(({ cold, expectObservable }) => {
const panelObservable = cold('210ms a', { a: panelData });
const dashObservable = cold('10ms a', { a: dashData });
const result = mergePanelAndDashData(panelObservable, dashObservable);
expectObservable(result).toBe('210ms a', {
a: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])],
alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' },
timeRange,
},
});
});
scheduler.flush();
});
});
describe('when both results are slow', () => {
it('then flush panel data first', () => {
const { panelData, dashData, timeRange, scheduler } = getTestContext();
scheduler.run(({ cold, expectObservable }) => {
const panelObservable = cold('210ms a', { a: panelData });
const dashObservable = cold('210ms a', { a: dashData });
const result = mergePanelAndDashData(panelObservable, dashObservable);
expectObservable(result).toBe('210ms (ab)', {
a: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }])],
timeRange,
},
b: {
state: LoadingState.Done,
series: [],
annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])],
alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' },
timeRange,
},
});
});

View File

@@ -1,18 +1,13 @@
import { combineLatest, merge, Observable, of, timer } from 'rxjs';
import { combineLatest, Observable, of } from 'rxjs';
import { ArrayDataFrame, PanelData } from '@grafana/data';
import { DashboardQueryRunnerResult } from './DashboardQueryRunner/types';
import { mergeMap, mergeMapTo, takeUntil } from 'rxjs/operators';
import { mergeMap } from 'rxjs/operators';
export function mergePanelAndDashData(
panelObservable: Observable<PanelData>,
dashObservable: Observable<DashboardQueryRunnerResult>
): Observable<PanelData> {
const slowDashResult: Observable<DashboardQueryRunnerResult> = merge(
timer(200).pipe(mergeMapTo(of({ annotations: [], alertState: undefined })), takeUntil(dashObservable)),
dashObservable
);
return combineLatest([panelObservable, slowDashResult]).pipe(
return combineLatest([panelObservable, dashObservable]).pipe(
mergeMap((combined) => {
const [panelData, dashData] = combined;
@@ -21,11 +16,9 @@ export function mergePanelAndDashData(
panelData.annotations = [];
}
return of({
...panelData,
annotations: panelData.annotations.concat(new ArrayDataFrame(dashData.annotations)),
alertState: dashData.alertState,
});
const annotations = panelData.annotations.concat(new ArrayDataFrame(dashData.annotations));
const alertState = dashData.alertState;
return of({ ...panelData, annotations, alertState });
}
return of(panelData);