diff --git a/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.test.ts b/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.test.ts index 3bd9fb77b19..dd2504fec57 100644 --- a/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.test.ts +++ b/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.test.ts @@ -64,10 +64,10 @@ function expectOnResults(args: { next: (value) => { try { expectCallback(value); - subscription.unsubscribe(); + subscription?.unsubscribe(); done(); } catch (err) { - subscription.unsubscribe(); + subscription?.unsubscribe(); done.fail(err); } }, @@ -97,6 +97,56 @@ describe('DashboardQueryRunnerImpl', () => { }); }); + describe('when calling run and all workers succeed but take longer than 200ms', () => { + it('then it should return the empty results', (done) => { + const { runner, options, annotationQueryMock, executeAnnotationQueryMock, getMock } = getTestContext(); + const wait = 201; + executeAnnotationQueryMock.mockReturnValue(toAsyncOfResult({ events: [{ id: 'NextGen' }] }).pipe(delay(wait))); + + expectOnResults({ + runner, + panelId: 1, + done, + expect: (results) => { + // should have one alert state, one snapshot, one legacy and one next gen result + // having both snapshot and legacy/next gen is a imaginary example for testing purposes and doesn't exist for real + expect(results).toEqual({ annotations: [] }); + expect(annotationQueryMock).toHaveBeenCalledTimes(1); + expect(executeAnnotationQueryMock).toHaveBeenCalledTimes(1); + expect(getMock).toHaveBeenCalledTimes(1); + }, + }); + + runner.run(options); + }); + }); + + describe('when calling run and all workers succeed but the subscriber subscribes after the run', () => { + it('then it should return the last results', (done) => { + const { runner, options, annotationQueryMock, executeAnnotationQueryMock, getMock } = getTestContext(); + + runner.run(options); + + setTimeout( + () => + expectOnResults({ + runner, + panelId: 1, + done, + expect: (results) => { + // should have one alert state, one snapshot, one legacy and one next gen result + // having both snapshot and legacy/next gen is a imaginary example for testing purposes and doesn't exist for real + expect(results).toEqual(getExpectedForAllResult()); + expect(annotationQueryMock).toHaveBeenCalledTimes(1); + expect(executeAnnotationQueryMock).toHaveBeenCalledTimes(1); + expect(getMock).toHaveBeenCalledTimes(1); + }, + }), + 200 + ); // faking a late subscriber to make sure we get the latest results + }); + }); + describe('when calling run and all workers fail', () => { silenceConsoleOutput(); it('then it should return the correct results', (done) => { diff --git a/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.ts b/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.ts index 31da2b3e68a..0ac27c6c4f3 100644 --- a/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.ts +++ b/public/app/features/query/state/DashboardQueryRunner/DashboardQueryRunner.ts @@ -1,5 +1,5 @@ -import { merge, Observable, ReplaySubject, Subject, Unsubscribable } from 'rxjs'; -import { mergeAll, reduce, share, takeUntil } from 'rxjs/operators'; +import { merge, Observable, ReplaySubject, Subject, Subscription, timer, Unsubscribable } from 'rxjs'; +import { finalize, map, mapTo, mergeAll, reduce, share, takeUntil } from 'rxjs/operators'; import { AnnotationQuery } from '@grafana/data'; import { dedupAnnotations } from 'app/features/annotations/events_processing'; @@ -13,7 +13,7 @@ import { import { AlertStatesWorker } from './AlertStatesWorker'; import { SnapshotWorker } from './SnapshotWorker'; import { AnnotationsWorker } from './AnnotationsWorker'; -import { emptyResult, getAnnotationsByPanelId } from './utils'; +import { getAnnotationsByPanelId } from './utils'; import { DashboardModel } from '../../../dashboard/state'; import { getTimeSrv, TimeSrv } from '../../../dashboard/services/TimeSrv'; import { RefreshEvent } from '../../../../types/events'; @@ -53,44 +53,60 @@ class DashboardQueryRunnerImpl implements DashboardQueryRunner { } getResult(panelId?: number): Observable { - return new Observable((subscriber) => { - const subscription = this.results.subscribe({ - next: (result) => { - const annotations = getAnnotationsByPanelId(result.annotations, panelId); - const alertState = result.alertStates.find((res) => Boolean(panelId) && res.panelId === panelId); - subscriber.next({ annotations: dedupAnnotations(annotations), alertState }); - }, - error: (err) => subscriber.error(err), - complete: () => subscriber.complete(), - }); - return () => { - subscription.unsubscribe(); - }; - }); + return this.results.asObservable().pipe( + map((result) => { + const annotations = getAnnotationsByPanelId(result.annotations, panelId); + const alertState = result.alertStates.find((res) => Boolean(panelId) && res.panelId === panelId); + return { annotations: dedupAnnotations(annotations), alertState }; + }) + ); } private executeRun(options: DashboardQueryRunnerOptions) { const workers = this.workers.filter((w) => w.canWork(options)); const workerObservables = workers.map((w) => w.work(options)); - const observables = [emptyResult()].concat(workerObservables); - merge(observables) - .pipe( - takeUntil(this.runs.asObservable()), - mergeAll(), - reduce((acc, value) => { - // should we use scan or reduce here - // reduce will only emit when all observables are completed - // scan will emit when any observable is completed - // choosing reduce to minimize re-renders - acc.annotations = acc.annotations.concat(value.annotations); - acc.alertStates = acc.alertStates.concat(value.alertStates); - return acc; - }) - ) - .subscribe((x) => { - this.results.next(x); - }); + const resultSubscription = new Subscription(); + const resultObservable = merge(workerObservables).pipe( + takeUntil(this.runs.asObservable()), + mergeAll(), + reduce((acc: DashboardQueryRunnerWorkerResult, value: DashboardQueryRunnerWorkerResult) => { + // console.log({ acc: acc.annotations.length, value: value.annotations.length }); + // should we use scan or reduce here + // reduce will only emit when all observables are completed + // scan will emit when any observable is completed + // choosing reduce to minimize re-renders + acc.annotations = acc.annotations.concat(value.annotations); + acc.alertStates = acc.alertStates.concat(value.alertStates); + return acc; + }), + finalize(() => { + resultSubscription.unsubscribe(); // important to avoid memory leaks + }), + share() // shared because we're using it in takeUntil below + ); + + const timerSubscription = new Subscription(); + const timerObservable = timer(200).pipe( + mapTo({ annotations: [], alertStates: [] }), + takeUntil(resultObservable), + finalize(() => { + timerSubscription.unsubscribe(); // important to avoid memory leaks + }) + ); + + // if the result takes longer than 200ms we just publish an empty result + timerSubscription.add( + timerObservable.subscribe((result) => { + this.results.next(result); + }) + ); + + resultSubscription.add( + resultObservable.subscribe((result: DashboardQueryRunnerWorkerResult) => { + this.results.next(result); + }) + ); } cancel(annotation: AnnotationQuery): void { diff --git a/public/app/features/query/state/mergePanelAndDashData.test.ts b/public/app/features/query/state/mergePanelAndDashData.test.ts index e5abbd1ea90..b559ba47878 100644 --- a/public/app/features/query/state/mergePanelAndDashData.test.ts +++ b/public/app/features/query/state/mergePanelAndDashData.test.ts @@ -1,6 +1,4 @@ import { AlertState, getDefaultTimeRange, LoadingState, PanelData, toDataFrame } from '@grafana/data'; - -import { DashboardQueryRunnerResult } from './DashboardQueryRunner/types'; import { mergePanelAndDashData } from './mergePanelAndDashData'; import { TestScheduler } from 'rxjs/testing'; @@ -12,33 +10,58 @@ function getTestContext() { annotations: [toDataFrame([{ id: 'panelData' }])], timeRange, }; - const dashData: DashboardQueryRunnerResult = { - annotations: [{ id: 'dashData' }], - alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' }, - }; const scheduler: TestScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); - return { timeRange, scheduler, panelData, dashData }; + return { timeRange, scheduler, panelData }; } describe('mergePanelAndDashboardData', () => { - describe('when both results are fast', () => { - it('then just combine the results', () => { - const { panelData, dashData, timeRange, scheduler } = getTestContext(); + describe('when called and dashboard data contains annotations', () => { + it('then the annotations should be combined', () => { + const { panelData, timeRange, scheduler } = getTestContext(); scheduler.run(({ cold, expectObservable }) => { - const panelObservable = cold('10ms a', { a: panelData }); - const dashObservable = cold('10ms a', { a: dashData }); + const panelObservable = cold('a', { a: panelData }); + const dashObservable = cold('a', { a: { annotations: [{ id: 'dashData' }] } }); const result = mergePanelAndDashData(panelObservable, dashObservable); - expectObservable(result).toBe('10ms a', { + expectObservable(result).toBe('a', { a: { state: LoadingState.Done, series: [], annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])], + timeRange, + }, + }); + }); + + scheduler.flush(); + }); + }); + + describe('when called and dashboard data contains alert states', () => { + it('then the alert states should be added', () => { + const { panelData, timeRange, scheduler } = getTestContext(); + + scheduler.run(({ cold, expectObservable }) => { + const panelObservable = cold('a', { a: panelData }); + const dashObservable = cold('a', { + a: { + annotations: [], + alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' }, + }, + }); + + const result = mergePanelAndDashData(panelObservable, dashObservable); + + expectObservable(result).toBe('a', { + a: { + state: LoadingState.Done, + series: [], + annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([])], alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' }, timeRange, }, @@ -49,86 +72,27 @@ describe('mergePanelAndDashboardData', () => { }); }); - describe('when dashboard results are slow', () => { - it('then flush panel data first', () => { - const { panelData, dashData, timeRange, scheduler } = getTestContext(); + describe('when called and dashboard data does not contain annotations or alertState', () => { + it('then the panelData is unchanged', () => { + const { panelData, timeRange, scheduler } = getTestContext(); scheduler.run(({ cold, expectObservable }) => { - const panelObservable = cold('10ms a', { a: panelData }); - const dashObservable = cold('210ms a', { a: dashData }); + const panelObservable = cold('a', { a: panelData }); + const dashObservable = cold('a', { + a: { + annotations: [], + }, + }); const result = mergePanelAndDashData(panelObservable, dashObservable); - expectObservable(result).toBe('200ms a 9ms b', { + expectObservable(result).toBe('a', { a: { state: LoadingState.Done, series: [], annotations: [toDataFrame([{ id: 'panelData' }])], timeRange, }, - b: { - state: LoadingState.Done, - series: [], - annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])], - alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' }, - timeRange, - }, - }); - }); - - scheduler.flush(); - }); - }); - - describe('when panel results are slow', () => { - it('then just combine the results', () => { - const { panelData, dashData, timeRange, scheduler } = getTestContext(); - - scheduler.run(({ cold, expectObservable }) => { - const panelObservable = cold('210ms a', { a: panelData }); - const dashObservable = cold('10ms a', { a: dashData }); - - const result = mergePanelAndDashData(panelObservable, dashObservable); - - expectObservable(result).toBe('210ms a', { - a: { - state: LoadingState.Done, - series: [], - annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])], - alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' }, - timeRange, - }, - }); - }); - - scheduler.flush(); - }); - }); - - describe('when both results are slow', () => { - it('then flush panel data first', () => { - const { panelData, dashData, timeRange, scheduler } = getTestContext(); - - scheduler.run(({ cold, expectObservable }) => { - const panelObservable = cold('210ms a', { a: panelData }); - const dashObservable = cold('210ms a', { a: dashData }); - - const result = mergePanelAndDashData(panelObservable, dashObservable); - - expectObservable(result).toBe('210ms (ab)', { - a: { - state: LoadingState.Done, - series: [], - annotations: [toDataFrame([{ id: 'panelData' }])], - timeRange, - }, - b: { - state: LoadingState.Done, - series: [], - annotations: [toDataFrame([{ id: 'panelData' }]), toDataFrame([{ id: 'dashData' }])], - alertState: { id: 1, state: AlertState.OK, dashboardId: 1, panelId: 1, newStateDate: '' }, - timeRange, - }, }); }); diff --git a/public/app/features/query/state/mergePanelAndDashData.ts b/public/app/features/query/state/mergePanelAndDashData.ts index d07216000f9..1ee11a541b4 100644 --- a/public/app/features/query/state/mergePanelAndDashData.ts +++ b/public/app/features/query/state/mergePanelAndDashData.ts @@ -1,18 +1,13 @@ -import { combineLatest, merge, Observable, of, timer } from 'rxjs'; +import { combineLatest, Observable, of } from 'rxjs'; import { ArrayDataFrame, PanelData } from '@grafana/data'; import { DashboardQueryRunnerResult } from './DashboardQueryRunner/types'; -import { mergeMap, mergeMapTo, takeUntil } from 'rxjs/operators'; +import { mergeMap } from 'rxjs/operators'; export function mergePanelAndDashData( panelObservable: Observable, dashObservable: Observable ): Observable { - const slowDashResult: Observable = merge( - timer(200).pipe(mergeMapTo(of({ annotations: [], alertState: undefined })), takeUntil(dashObservable)), - dashObservable - ); - - return combineLatest([panelObservable, slowDashResult]).pipe( + return combineLatest([panelObservable, dashObservable]).pipe( mergeMap((combined) => { const [panelData, dashData] = combined; @@ -21,11 +16,9 @@ export function mergePanelAndDashData( panelData.annotations = []; } - return of({ - ...panelData, - annotations: panelData.annotations.concat(new ArrayDataFrame(dashData.annotations)), - alertState: dashData.alertState, - }); + const annotations = panelData.annotations.concat(new ArrayDataFrame(dashData.annotations)); + const alertState = dashData.alertState; + return of({ ...panelData, annotations, alertState }); } return of(panelData);