From 455fbce020eb8ac973f664c8517b478965ad012e Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Fri, 2 Apr 2021 16:48:23 -0700 Subject: [PATCH] Live: throttle messages when FPS decreases (#32627) * throttle when FPS is low * fix throttling * grafanaStreamingPerfBudget * grafanaStreamingPerfBudget * change global strategy * also throttle frontend Co-authored-by: Leon Sorokin --- .../src/dataframe/StreamingDataFrame.ts | 2 +- .../grafana-runtime/src/measurement/perf.ts | 28 +++++++++++++++++++ .../grafana-runtime/src/measurement/query.ts | 8 +++++- .../plugins/datasource/testdata/runStreams.ts | 18 ++++++++---- 4 files changed, 49 insertions(+), 7 deletions(-) create mode 100644 packages/grafana-runtime/src/measurement/perf.ts diff --git a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts b/packages/grafana-data/src/dataframe/StreamingDataFrame.ts index 8ff38302c86..f87457c2111 100644 --- a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts +++ b/packages/grafana-data/src/dataframe/StreamingDataFrame.ts @@ -42,7 +42,7 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d sliceIdx = nlen - maxLength; } - if (maxDelta !== Infinity) { + if (maxDelta !== Infinity && deltaIdx >= 0) { const deltaLookup = data[deltaIdx]; const low = deltaLookup[sliceIdx]; diff --git a/packages/grafana-runtime/src/measurement/perf.ts b/packages/grafana-runtime/src/measurement/perf.ts new file mode 100644 index 00000000000..2af73c371db --- /dev/null +++ b/packages/grafana-runtime/src/measurement/perf.ts @@ -0,0 +1,28 @@ +let lastUpdate = Date.now(); + +/** + * This object indicats how overloaded the main thread is + */ +export const perf = { + budget: 1, + threshold: 1.05, // trial and error appears about right + ok: true, + last: lastUpdate, +}; + +// Expose this as a global object so it can be changed locally +// NOTE: when we are confident this is the right budget, this should be removed +(window as any).grafanaStreamingPerf = perf; + +// target is 20hz (50ms), but we poll at 100ms to smooth out jitter +const interval = 100; + +function measure() { + const now = Date.now(); + perf.last = now; + perf.budget = (now - lastUpdate) / interval; + perf.ok = perf.budget <= perf.threshold; + lastUpdate = now; +} + +setInterval(measure, interval); diff --git a/packages/grafana-runtime/src/measurement/query.ts b/packages/grafana-runtime/src/measurement/query.ts index 14a850dd42a..d999c64308f 100644 --- a/packages/grafana-runtime/src/measurement/query.ts +++ b/packages/grafana-runtime/src/measurement/query.ts @@ -16,6 +16,7 @@ import { getGrafanaLiveSrv } from '../services/live'; import { Observable, of } from 'rxjs'; import { toDataQueryError } from '../utils/queryResponse'; +import { perf } from './perf'; export interface LiveDataFilter { fields?: string[]; @@ -49,6 +50,7 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable { if (!data) { @@ -67,7 +69,11 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable 1000 || perf.ok) { + subscriber.next({ state, data: [filtered], key }); + last = perf.last; + } }; const sub = live diff --git a/public/app/plugins/datasource/testdata/runStreams.ts b/public/app/plugins/datasource/testdata/runStreams.ts index 89b0fdeea68..1100aa74593 100644 --- a/public/app/plugins/datasource/testdata/runStreams.ts +++ b/public/app/plugins/datasource/testdata/runStreams.ts @@ -16,6 +16,7 @@ import { import { TestDataQuery, StreamingQuery } from './types'; import { getRandomLine } from './LogIpsum'; +import { perf } from '@grafana/runtime/src/measurement/perf'; // not exported export const defaultStreamQuery: StreamingQuery = { type: 'signal', @@ -68,6 +69,7 @@ export function runSignalStream( let value = Math.random() * 100; let timeoutId: any = null; + let lastSent = -1; const addNextRow = (time: number) => { value += (Math.random() - 0.5) * spread; @@ -102,11 +104,17 @@ export function runSignalStream( const pushNextEvent = () => { addNextRow(Date.now()); - subscriber.next({ - data: [frame], - key: streamId, - state: LoadingState.Streaming, - }); + + const elapsed = perf.last - lastSent; + if (elapsed > 1000 || perf.ok) { + subscriber.next({ + data: [frame], + key: streamId, + state: LoadingState.Streaming, + }); + lastSent = perf.last; + } + timeoutId = setTimeout(pushNextEvent, speed); };