diff --git a/packages/grafana-ui/src/components/Graph/Graph.tsx b/packages/grafana-ui/src/components/Graph/Graph.tsx index 7e7ff4f98ca..60068da4c3d 100644 --- a/packages/grafana-ui/src/components/Graph/Graph.tsx +++ b/packages/grafana-ui/src/components/Graph/Graph.tsx @@ -105,7 +105,6 @@ export class Graph extends PureComponent { }; try { - console.log('Graph render'); $.plot(this.element, series, flotOptions); } catch (err) { console.log('Graph rendering error', err, flotOptions, series); diff --git a/packages/grafana-ui/src/types/data.ts b/packages/grafana-ui/src/types/data.ts index e006ebba908..fbde1b14fc1 100644 --- a/packages/grafana-ui/src/types/data.ts +++ b/packages/grafana-ui/src/types/data.ts @@ -1,6 +1,7 @@ export enum LoadingState { NotStarted = 'NotStarted', Loading = 'Loading', + Streaming = 'Streaming', Done = 'Done', Error = 'Error', } diff --git a/packages/grafana-ui/src/types/datasource.ts b/packages/grafana-ui/src/types/datasource.ts index 13e7db09dd3..8cd1e652896 100644 --- a/packages/grafana-ui/src/types/datasource.ts +++ b/packages/grafana-ui/src/types/datasource.ts @@ -1,7 +1,7 @@ import { ComponentClass } from 'react'; import { TimeRange } from './time'; import { PluginMeta } from './plugin'; -import { TableData, TimeSeries, SeriesData } from './data'; +import { TableData, TimeSeries, SeriesData, LoadingState } from './data'; import { PanelData } from './panel'; export interface DataSourcePluginOptionsEditorProps { @@ -105,7 +105,7 @@ export interface DataSourceApi { /** * Main metrics / data query action */ - query(options: DataQueryRequest): Promise; + query(options: DataQueryRequest, observer?: DataStreamObserver): Promise; /** * Test & verify datasource settings & connection details @@ -183,6 +183,47 @@ export type LegacyResponseData = TimeSeries | TableData | any; export type DataQueryResponseData = SeriesData | LegacyResponseData; +export type DataStreamObserver = (event: DataStreamState) => void; + +export interface DataStreamState { + /** + * when Done or Error no more events will be processed + */ + state: LoadingState; + + /** + * Consistent key across events. + */ + key: string; + + /** + * The stream request. The properties of this request will be examined + * to determine if the stream matches the original query. If not, it + * will be unsubscribed. + */ + request: DataQueryRequest; + + /** + * Series data may not be known yet + */ + series?: SeriesData[]; + + /** + * Error in stream (but may still be running) + */ + error?: DataQueryError; + + /** + * Optionally return only the rows that changed in this event + */ + delta?: SeriesData[]; + + /** + * Stop listening to this stream + */ + unsubscribe: () => void; +} + export interface DataQueryResponse { data: DataQueryResponseData[]; } diff --git a/pkg/tsdb/testdata/scenarios.go b/pkg/tsdb/testdata/scenarios.go index 4780dd6e662..964e6f586f3 100644 --- a/pkg/tsdb/testdata/scenarios.go +++ b/pkg/tsdb/testdata/scenarios.go @@ -225,6 +225,15 @@ func init() { }, }) + registerScenario(&Scenario{ + Id: "streaming_client", + Name: "Streaming Client", + Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult { + // Real work is in javascript client + return tsdb.NewQueryResult() + }, + }) + registerScenario(&Scenario{ Id: "table_static", Name: "Table Static", diff --git a/public/app/core/services/__mocks__/backend_srv.ts b/public/app/core/services/__mocks__/backend_srv.ts index 5ac31f718c0..41c02f6f1a6 100644 --- a/public/app/core/services/__mocks__/backend_srv.ts +++ b/public/app/core/services/__mocks__/backend_srv.ts @@ -4,6 +4,7 @@ const backendSrv = { getDashboardByUid: jest.fn(), getFolderByUid: jest.fn(), post: jest.fn(), + resolveCancelerIfExists: jest.fn(), }; export function getBackendSrv() { diff --git a/public/app/features/dashboard/dashgrid/PanelChrome.tsx b/public/app/features/dashboard/dashgrid/PanelChrome.tsx index 71cf4921356..c69e7be0c4e 100644 --- a/public/app/features/dashboard/dashgrid/PanelChrome.tsx +++ b/public/app/features/dashboard/dashgrid/PanelChrome.tsx @@ -22,7 +22,7 @@ import { ScopedVars } from '@grafana/ui'; import templateSrv from 'app/features/templating/template_srv'; -import { getProcessedSeriesData } from '../state/PanelQueryRunner'; +import { getProcessedSeriesData } from '../state/PanelQueryState'; import { Unsubscribable } from 'rxjs'; const DEFAULT_PLUGIN_ERROR = 'Error in plugin'; @@ -48,6 +48,7 @@ export interface State { export class PanelChrome extends PureComponent { timeSrv: TimeSrv = getTimeSrv(); querySubscription: Unsubscribable; + delayedStateUpdate: Partial; constructor(props: Props) { super(props); @@ -118,7 +119,15 @@ export class PanelChrome extends PureComponent { } } - this.setState({ isFirstLoad, errorMessage, data }); + const stateUpdate = { isFirstLoad, errorMessage, data }; + + if (this.isVisible) { + this.setState(stateUpdate); + } else { + // if we are getting data while another panel is in fullscreen / edit mode + // we need to store the data but not update state yet + this.delayedStateUpdate = stateUpdate; + } }, }; @@ -162,9 +171,15 @@ export class PanelChrome extends PureComponent { }; onRender = () => { - this.setState({ - renderCounter: this.state.renderCounter + 1, - }); + const stateUpdate = { renderCounter: this.state.renderCounter + 1 }; + + // If we have received a data update while hidden copy over that state as well + if (this.delayedStateUpdate) { + Object.assign(stateUpdate, this.delayedStateUpdate); + this.delayedStateUpdate = null; + } + + this.setState(stateUpdate); }; onOptionsChange = (options: any) => { diff --git a/public/app/features/dashboard/state/PanelQueryRunner.test.ts b/public/app/features/dashboard/state/PanelQueryRunner.test.ts index 33db473b1d0..16cd2bc349b 100644 --- a/public/app/features/dashboard/state/PanelQueryRunner.test.ts +++ b/public/app/features/dashboard/state/PanelQueryRunner.test.ts @@ -1,42 +1,15 @@ -import { getProcessedSeriesData, PanelQueryRunner } from './PanelQueryRunner'; -import { PanelData, DataQueryRequest } from '@grafana/ui/src/types'; +import { PanelQueryRunner } from './PanelQueryRunner'; +import { + PanelData, + DataQueryRequest, + DataStreamObserver, + DataStreamState, + LoadingState, + ScopedVars, +} from '@grafana/ui/src/types'; import moment from 'moment'; -describe('PanelQueryRunner', () => { - it('converts timeseries to table skipping nulls', () => { - const input1 = { - target: 'Field Name', - datapoints: [[100, 1], [200, 2]], - }; - const input2 = { - // without target - target: '', - datapoints: [[100, 1], [200, 2]], - }; - const data = getProcessedSeriesData([null, input1, input2, null, null]); - expect(data.length).toBe(2); - expect(data[0].fields[0].name).toBe(input1.target); - expect(data[0].rows).toBe(input1.datapoints); - - // Default name - expect(data[1].fields[0].name).toEqual('Value'); - - // Every colun should have a name and a type - for (const table of data) { - for (const column of table.fields) { - expect(column.name).toBeDefined(); - expect(column.type).toBeDefined(); - } - } - }); - - it('supports null values from query OK', () => { - expect(getProcessedSeriesData([null, null, null, null])).toEqual([]); - expect(getProcessedSeriesData(undefined)).toEqual([]); - expect(getProcessedSeriesData((null as unknown) as any[])).toEqual([]); - expect(getProcessedSeriesData([])).toEqual([]); - }); -}); +jest.mock('app/core/services/backend_srv'); interface ScenarioContext { setup: (fn: () => void) => void; @@ -47,6 +20,9 @@ interface ScenarioContext { events?: PanelData[]; res?: PanelData; queryCalledWith?: DataQueryRequest; + observer: DataStreamObserver; + runner: PanelQueryRunner; + scopedVars: ScopedVars; } type ScenarioFn = (ctx: ScenarioContext) => void; @@ -57,12 +33,16 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn const ctx: ScenarioContext = { widthPixels: 200, + scopedVars: { + server: { text: 'Server1', value: 'server-1' }, + }, + runner: new PanelQueryRunner(), + observer: (args: any) => {}, setup: (fn: () => void) => { setupFn = fn; }, }; - let runner: PanelQueryRunner; const response: any = { data: [{ target: 'hello', datapoints: [] }], }; @@ -71,9 +51,11 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn setupFn(); const datasource: any = { + name: 'TestDB', interval: ctx.dsInterval, - query: (options: DataQueryRequest) => { + query: (options: DataQueryRequest, observer: DataStreamObserver) => { ctx.queryCalledWith = options; + ctx.observer = observer; return Promise.resolve(response); }, testDatasource: jest.fn(), @@ -81,6 +63,7 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn const args: any = { datasource, + scopedVars: ctx.scopedVars, minInterval: ctx.minInterval, widthPixels: ctx.widthPixels, maxDataPoints: ctx.maxDataPoints, @@ -93,15 +76,15 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn queries: [{ refId: 'A', test: 1 }], }; - runner = new PanelQueryRunner(); - runner.subscribe({ + ctx.runner = new PanelQueryRunner(); + ctx.runner.subscribe({ next: (data: PanelData) => { ctx.events.push(data); }, }); ctx.events = []; - ctx.res = await runner.run(args); + ctx.res = await ctx.runner.run(args); }); scenarioFn(ctx); @@ -109,6 +92,22 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn } describe('PanelQueryRunner', () => { + describeQueryRunnerScenario('simple scenario', ctx => { + it('should set requestId on request', async () => { + expect(ctx.queryCalledWith.requestId).toBe('Q100'); + }); + + it('should set datasource name on request', async () => { + expect(ctx.queryCalledWith.targets[0].datasource).toBe('TestDB'); + }); + + it('should pass scopedVars to datasource with interval props', async () => { + expect(ctx.queryCalledWith.scopedVars.server.text).toBe('Server1'); + expect(ctx.queryCalledWith.scopedVars.__interval.text).toBe('5m'); + expect(ctx.queryCalledWith.scopedVars.__interval_ms.text).toBe('300000'); + }); + }); + describeQueryRunnerScenario('with no maxDataPoints or minInterval', ctx => { ctx.setup(() => { ctx.maxDataPoints = null; @@ -165,4 +164,47 @@ describe('PanelQueryRunner', () => { expect(ctx.queryCalledWith.maxDataPoints).toBe(10); }); }); + + describeQueryRunnerScenario('when datasource is streaming data', ctx => { + let streamState: DataStreamState; + let isUnsubbed = false; + + beforeEach(() => { + streamState = { + state: LoadingState.Streaming, + key: 'test-stream-1', + series: [ + { + rows: [], + fields: [], + name: 'I am a magic stream', + }, + ], + request: { + requestId: ctx.queryCalledWith.requestId, + } as any, + unsubscribe: () => { + isUnsubbed = true; + }, + }; + ctx.observer(streamState); + }); + + it('should push another update to subscriber', async () => { + expect(ctx.events.length).toBe(2); + }); + + it('should set state to streaming', async () => { + expect(ctx.events[1].state).toBe(LoadingState.Streaming); + }); + + it('should not unsubscribe', async () => { + expect(isUnsubbed).toBe(false); + }); + + it('destroy should unsubscribe streams', async () => { + ctx.runner.destroy(); + expect(isUnsubbed).toBe(true); + }); + }); }); diff --git a/public/app/features/dashboard/state/PanelQueryRunner.ts b/public/app/features/dashboard/state/PanelQueryRunner.ts index 208a9c7686c..04c4f9f7a74 100644 --- a/public/app/features/dashboard/state/PanelQueryRunner.ts +++ b/public/app/features/dashboard/state/PanelQueryRunner.ts @@ -1,27 +1,17 @@ // Libraries import cloneDeep from 'lodash/cloneDeep'; +import throttle from 'lodash/throttle'; import { Subject, Unsubscribable, PartialObserver } from 'rxjs'; // Services & Utils import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; import kbn from 'app/core/utils/kbn'; import templateSrv from 'app/features/templating/template_srv'; - -// Components & Types -import { - guessFieldTypes, - toSeriesData, - PanelData, - LoadingState, - DataQuery, - TimeRange, - ScopedVars, - DataQueryRequest, - SeriesData, - DataSourceApi, -} from '@grafana/ui'; import { PanelQueryState } from './PanelQueryState'; +// Types +import { PanelData, DataQuery, TimeRange, ScopedVars, DataQueryRequest, DataSourceApi } from '@grafana/ui'; + export interface QueryRunnerOptions { datasource: string | DataSourceApi; queries: TQuery[]; @@ -54,6 +44,10 @@ export class PanelQueryRunner { private state = new PanelQueryState(); + constructor() { + this.state.onStreamingDataUpdated = this.onStreamingDataUpdated; + } + /** * Listen for updates to the PanelData. If a query has already run for this panel, * the results will be immediatly passed to the observer @@ -73,7 +67,7 @@ export class PanelQueryRunner { } // Send the last result - if (this.state.data.state !== LoadingState.NotStarted) { + if (this.state.isStarted()) { observer.next(this.state.getDataAfterCheckingFormats()); } @@ -103,6 +97,9 @@ export class PanelQueryRunner { delayStateNotification, } = options; + // filter out hidden queries & deep clone them + const clonedAndFilteredQueries = cloneDeep(queries.filter(q => !q.hide)); + const request: DataQueryRequest = { requestId: getNextRequestId(), timezone, @@ -112,26 +109,20 @@ export class PanelQueryRunner { timeInfo, interval: '', intervalMs: 0, - targets: cloneDeep( - queries.filter(q => { - return !q.hide; // Skip any hidden queries - }) - ), + targets: clonedAndFilteredQueries, maxDataPoints: maxDataPoints || widthPixels, scopedVars: scopedVars || {}, cacheTimeout, startTime: Date.now(), }; - // Deprecated + + // Add deprecated property (request as any).rangeRaw = timeRange.raw; let loadingStateTimeoutId = 0; try { - const ds = - datasource && (datasource as any).query - ? (datasource as DataSourceApi) - : await getDatasourceSrv().get(datasource as string, request.scopedVars); + const ds = await getDataSource(datasource, request.scopedVars); // Attach the datasource name to each query request.targets = request.targets.map(query => { @@ -148,17 +139,19 @@ export class PanelQueryRunner { // and add built in variables interval and interval_ms request.scopedVars = Object.assign({}, request.scopedVars, { __interval: { text: norm.interval, value: norm.interval }, - __interval_ms: { text: norm.intervalMs, value: norm.intervalMs }, + __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs }, }); request.interval = norm.interval; request.intervalMs = norm.intervalMs; // Check if we can reuse the already issued query - if (state.isRunning()) { + const active = state.getActiveRunner(); + if (active) { if (state.isSameQuery(ds, request)) { - // TODO? maybe cancel if it has run too long? - return state.getCurrentExecutor(); + // Maybe cancel if it has run too long? + console.log('Trying to execute query while last one has yet to complete, returning same promise'); + return active; } else { state.cancel('Query Changed while running'); } @@ -166,8 +159,8 @@ export class PanelQueryRunner { // Send a loading status event on slower queries loadingStateTimeoutId = window.setTimeout(() => { - if (this.state.isRunning()) { - this.subject.next(this.state.data); + if (state.getActiveRunner()) { + this.subject.next(this.state.validateStreamsAndGetPanelData()); } }, delayStateNotification || 500); @@ -188,6 +181,18 @@ export class PanelQueryRunner { } } + /** + * Called after every streaming event. This should be throttled so we + * avoid accidentally overwhelming the browser + */ + onStreamingDataUpdated = throttle( + () => { + this.subject.next(this.state.validateStreamsAndGetPanelData()); + }, + 50, + { trailing: true, leading: true } + ); + /** * Called when the panel is closed */ @@ -202,22 +207,12 @@ export class PanelQueryRunner { } } -/** - * All panels will be passed tables that have our best guess at colum type set - * - * This is also used by PanelChrome for snapshot support - */ -export function getProcessedSeriesData(results?: any[]): SeriesData[] { - if (!results) { - return []; +async function getDataSource( + datasource: string | DataSourceApi | null, + scopedVars: ScopedVars +): Promise { + if (datasource && (datasource as any).query) { + return datasource as DataSourceApi; } - - const series: SeriesData[] = []; - for (const r of results) { - if (r) { - series.push(guessFieldTypes(toSeriesData(r))); - } - } - - return series; + return await getDatasourceSrv().get(datasource as string, scopedVars); } diff --git a/public/app/features/dashboard/state/PanelQueryState.test.ts b/public/app/features/dashboard/state/PanelQueryState.test.ts index 6d7004a9310..486d6f29e03 100644 --- a/public/app/features/dashboard/state/PanelQueryState.test.ts +++ b/public/app/features/dashboard/state/PanelQueryState.test.ts @@ -1,6 +1,6 @@ -import { toDataQueryError, PanelQueryState } from './PanelQueryState'; +import { toDataQueryError, PanelQueryState, getProcessedSeriesData } from './PanelQueryState'; import { MockDataSourceApi } from 'test/mocks/datasource_srv'; -import { DataQueryResponse } from '@grafana/ui'; +import { DataQueryResponse, LoadingState } from '@grafana/ui'; import { getQueryOptions } from 'test/helpers/getQueryOptions'; describe('PanelQueryState', () => { @@ -17,11 +17,11 @@ describe('PanelQueryState', () => { it('keeps track of running queries', async () => { const state = new PanelQueryState(); - expect(state.isRunning()).toBeFalsy(); + expect(state.getActiveRunner()).toBeFalsy(); let hasRun = false; const dsRunner = new Promise((resolve, reject) => { // The status should be running when we get here - expect(state.isRunning()).toBeTruthy(); + expect(state.getActiveRunner()).toBeTruthy(); resolve({ data: ['x', 'y'] }); hasRun = true; }); @@ -30,7 +30,7 @@ describe('PanelQueryState', () => { // should not actually run for an empty query let empty = await state.execute(ds, getQueryOptions({})); - expect(state.isRunning()).toBeFalsy(); + expect(state.getActiveRunner()).toBeFalsy(); expect(empty.series.length).toBe(0); expect(hasRun).toBeFalsy(); @@ -39,8 +39,162 @@ describe('PanelQueryState', () => { getQueryOptions({ targets: [{ hide: true, refId: 'X' }, { hide: true, refId: 'Y' }, { hide: true, refId: 'Z' }] }) ); // should not run any hidden queries' - expect(state.isRunning()).toBeFalsy(); + expect(state.getActiveRunner()).toBeFalsy(); expect(empty.series.length).toBe(0); expect(hasRun).toBeFalsy(); }); }); + +describe('getProcessedSeriesData', () => { + it('converts timeseries to table skipping nulls', () => { + const input1 = { + target: 'Field Name', + datapoints: [[100, 1], [200, 2]], + }; + const input2 = { + // without target + target: '', + datapoints: [[100, 1], [200, 2]], + }; + const data = getProcessedSeriesData([null, input1, input2, null, null]); + expect(data.length).toBe(2); + expect(data[0].fields[0].name).toBe(input1.target); + expect(data[0].rows).toBe(input1.datapoints); + + // Default name + expect(data[1].fields[0].name).toEqual('Value'); + + // Every colun should have a name and a type + for (const table of data) { + for (const column of table.fields) { + expect(column.name).toBeDefined(); + expect(column.type).toBeDefined(); + } + } + }); + + it('supports null values from query OK', () => { + expect(getProcessedSeriesData([null, null, null, null])).toEqual([]); + expect(getProcessedSeriesData(undefined)).toEqual([]); + expect(getProcessedSeriesData((null as unknown) as any[])).toEqual([]); + expect(getProcessedSeriesData([])).toEqual([]); + }); +}); + +function makeSeriesStub(refId: string) { + return { + fields: [{ name: 'a' }], + rows: [], + refId, + }; +} + +describe('stream handling', () => { + const state = new PanelQueryState(); + state.onStreamingDataUpdated = () => { + // nothing + }; + state.request = { + requestId: '123', + range: { + raw: { + from: 123, // if string it gets revaluated + }, + }, + } as any; + state.response = { + state: LoadingState.Done, + series: [makeSeriesStub('A'), makeSeriesStub('B')], + }; + + it('gets the response', () => { + const data = state.validateStreamsAndGetPanelData(); + expect(data.series.length).toBe(2); + expect(data.state).toBe(LoadingState.Done); + expect(data.series[0].refId).toBe('A'); + }); + + it('adds a stream event', () => { + // Post a stream event + state.dataStreamObserver({ + state: LoadingState.Loading, + key: 'C', + request: state.request, // From the same request + series: [makeSeriesStub('C')], + unsubscribe: () => {}, + }); + expect(state.streams.length).toBe(1); + + const data = state.validateStreamsAndGetPanelData(); + expect(data.series.length).toBe(3); + expect(data.state).toBe(LoadingState.Streaming); + expect(data.series[2].refId).toBe('C'); + }); + + it('add another stream event (with a differnet key)', () => { + // Post a stream event + state.dataStreamObserver({ + state: LoadingState.Loading, + key: 'D', + request: state.request, // From the same request + series: [makeSeriesStub('D')], + unsubscribe: () => {}, + }); + expect(state.streams.length).toBe(2); + + const data = state.validateStreamsAndGetPanelData(); + expect(data.series.length).toBe(4); + expect(data.state).toBe(LoadingState.Streaming); + expect(data.series[3].refId).toBe('D'); + }); + + it('replace the first stream value, but keep the order', () => { + // Post a stream event + state.dataStreamObserver({ + state: LoadingState.Loading, + key: 'C', // The key to replace previous index 2 + request: state.request, // From the same request + series: [makeSeriesStub('X')], + unsubscribe: () => {}, + }); + expect(state.streams.length).toBe(2); + + const data = state.validateStreamsAndGetPanelData(); + expect(data.series[2].refId).toBe('X'); + }); + + it('ignores streams from a differnet request', () => { + // Post a stream event + state.dataStreamObserver({ + state: LoadingState.Loading, + key: 'Z', // Note with key 'A' it would still overwrite + request: { + ...state.request, + requestId: 'XXX', // Different request and id + } as any, + series: [makeSeriesStub('C')], + unsubscribe: () => {}, + }); + + expect(state.streams.length).toBe(2); // no change + const data = state.validateStreamsAndGetPanelData(); + expect(data.series.length).toBe(4); + }); + + it('removes streams when the query changes', () => { + state.request = { + ...state.request, + requestId: 'somethine else', + } as any; + state.response = { + state: LoadingState.Done, + series: [makeSeriesStub('F')], + }; + expect(state.streams.length).toBe(2); // unchanged + + const data = state.validateStreamsAndGetPanelData(); + expect(data.series.length).toBe(1); + expect(data.series[0].refId).toBe('F'); + expect(state.streams.length).toBe(0); // no streams + }); +}); diff --git a/public/app/features/dashboard/state/PanelQueryState.ts b/public/app/features/dashboard/state/PanelQueryState.ts index b71b9e24aaa..69f9aeb71e2 100644 --- a/public/app/features/dashboard/state/PanelQueryState.ts +++ b/public/app/features/dashboard/state/PanelQueryState.ts @@ -1,16 +1,25 @@ +// Libraries +import isString from 'lodash/isString'; +import isEqual from 'lodash/isEqual'; + +// Utils & Services +import { getBackendSrv } from 'app/core/services/backend_srv'; +import * as dateMath from 'app/core/utils/datemath'; +import { guessFieldTypes, toSeriesData, isSeriesData } from '@grafana/ui/src/utils'; + +// Types import { DataSourceApi, DataQueryRequest, PanelData, LoadingState, toLegacyResponseData, - isSeriesData, - toSeriesData, DataQueryError, + DataStreamObserver, + DataStreamState, + SeriesData, + DataQueryResponseData, } from '@grafana/ui'; -import { getProcessedSeriesData } from './PanelQueryRunner'; -import { getBackendSrv } from 'app/core/services/backend_srv'; -import isEqual from 'lodash/isEqual'; export class PanelQueryState { // The current/last running request @@ -19,26 +28,33 @@ export class PanelQueryState { endTime: 1000, // Somethign not zero } as DataQueryRequest; - // The best known state of data - data = { + // The result back from the datasource query + response = { state: LoadingState.NotStarted, series: [], } as PanelData; + // Active stream results + streams: DataStreamState[] = []; + sendSeries = false; sendLegacy = false; // A promise for the running query - private executor: Promise = {} as any; + private executor?: Promise; private rejector = (reason?: any) => {}; private datasource: DataSourceApi = {} as any; - isRunning() { - return this.data.state === LoadingState.Loading; // + isFinished(state: LoadingState) { + return state === LoadingState.Done || state === LoadingState.Error; + } + + isStarted() { + return this.response.state !== LoadingState.NotStarted; } isSameQuery(ds: DataSourceApi, req: DataQueryRequest) { - if (this.datasource !== this.datasource) { + if (ds !== this.datasource) { return false; } @@ -46,16 +62,22 @@ export class PanelQueryState { return isEqual(this.request.targets, req.targets); } - getCurrentExecutor() { + /** + * Return the currently running query + */ + getActiveRunner(): Promise | undefined { return this.executor; } cancel(reason: string) { const { request } = this; + this.executor = null; + try { + // If no endTime the call to datasource.query did not complete + // call rejector to reject the executor promise if (!request.endTime) { request.endTime = Date.now(); - this.rejector('Canceled:' + reason); } @@ -64,8 +86,11 @@ export class PanelQueryState { getBackendSrv().resolveCancelerIfExists(request.requestId); } } catch (err) { - console.log('Error canceling request'); + console.log('Error canceling request', err); } + + // Close any open streams + this.closeStreams(true); } execute(ds: DataSourceApi, req: DataQueryRequest): Promise { @@ -75,85 +100,214 @@ export class PanelQueryState { if (!req.targets.length) { console.log('No queries, so return early'); this.request.endTime = Date.now(); + this.closeStreams(); return Promise.resolve( - (this.data = { + (this.response = { state: LoadingState.Done, series: [], // Clear the data legacy: [], - request: req, }) ); } // Set the loading state immediatly - this.data.state = LoadingState.Loading; - return (this.executor = new Promise((resolve, reject) => { + this.response.state = LoadingState.Loading; + this.executor = new Promise((resolve, reject) => { this.rejector = reject; return ds - .query(this.request) + .query(this.request, this.dataStreamObserver) .then(resp => { this.request.endTime = Date.now(); + this.executor = null; // Make sure we send something back -- called run() w/o subscribe! if (!(this.sendSeries || this.sendLegacy)) { this.sendSeries = true; } - // Make sure the response is in a supported format - const series = this.sendSeries ? getProcessedSeriesData(resp.data) : []; - const legacy = this.sendLegacy - ? resp.data.map(v => { - if (isSeriesData(v)) { - return toLegacyResponseData(v); - } - return v; - }) - : undefined; - - resolve( - (this.data = { - state: LoadingState.Done, - request: this.request, - series, - legacy, - }) - ); + // Save the result state + this.response = { + state: LoadingState.Done, + request: this.request, + series: this.sendSeries ? getProcessedSeriesData(resp.data) : [], + legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined, + }; + resolve(this.validateStreamsAndGetPanelData()); }) .catch(err => { + this.executor = null; resolve(this.setError(err)); }); - })); + }); + + return this.executor; + } + + // Send a notice when the stream has updated the current model + onStreamingDataUpdated: () => void; + + // This gets all stream events and keeps track of them + // it will then delegate real changes to the PanelQueryRunner + dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => { + // Streams only work with the 'series' format + this.sendSeries = true; + + // Add the stream to our list + let found = false; + const active = this.streams.map(s => { + if (s.key === stream.key) { + found = true; + return stream; + } + return s; + }); + + if (!found) { + if (shouldDisconnect(this.request, stream)) { + console.log('Got stream update from old stream, unsubscribing'); + stream.unsubscribe(); + return; + } + active.push(stream); + } + + this.streams = active; + this.onStreamingDataUpdated(); + }; + + closeStreams(keepSeries = false) { + if (!this.streams.length) { + return; + } + + const series: SeriesData[] = []; + + for (const stream of this.streams) { + if (stream.series) { + series.push.apply(series, stream.series); + } + + try { + stream.unsubscribe(); + } catch { + console.log('Failed to unsubscribe to stream'); + } + } + + this.streams = []; + + // Move the series from streams to the resposne + if (keepSeries) { + const { response } = this; + this.response = { + ...response, + series: [ + ...response.series, + ...series, // Append the streamed series + ], + }; + } + } + + /** + * This is called before broadcasting data to listeners. Given that + * stream events can happen at any point, we need to make sure to + * only return data from active streams. + */ + validateStreamsAndGetPanelData(): PanelData { + const { response, streams, request } = this; + + // When not streaming, return the response + request + if (!streams.length) { + return { + ...response, + request: request, + }; + } + + let done = this.isFinished(response.state); + const series = [...response.series]; + const active: DataStreamState[] = []; + + for (const stream of this.streams) { + if (shouldDisconnect(request, stream)) { + console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam'); + stream.unsubscribe(); + continue; + } + + active.push(stream); + series.push.apply(series, stream.series); + + if (!this.isFinished(stream.state)) { + done = false; + } + } + + this.streams = active; + + // Update the time range + let timeRange = this.request.range; + if (isString(timeRange.raw.from)) { + timeRange = { + from: dateMath.parse(timeRange.raw.from, false), + to: dateMath.parse(timeRange.raw.to, true), + raw: timeRange.raw, + }; + } + + return { + state: done ? LoadingState.Done : LoadingState.Streaming, + series, // Union of series from response and all streams + legacy: this.sendLegacy ? translateToLegacyData(series) : undefined, + request: { + ...this.request, + range: timeRange, // update the time range + }, + }; } /** * Make sure all requested formats exist on the data */ getDataAfterCheckingFormats(): PanelData { - const { data, sendLegacy, sendSeries } = this; - if (sendLegacy && (!data.legacy || !data.legacy.length)) { - data.legacy = data.series.map(v => toLegacyResponseData(v)); + const { response, sendLegacy, sendSeries } = this; + if (sendLegacy && (!response.legacy || !response.legacy.length)) { + response.legacy = response.series.map(v => toLegacyResponseData(v)); } - if (sendSeries && !data.series.length && data.legacy) { - data.series = data.legacy.map(v => toSeriesData(v)); + if (sendSeries && !response.series.length && response.legacy) { + response.series = response.legacy.map(v => toSeriesData(v)); } - return this.data; + return this.validateStreamsAndGetPanelData(); } setError(err: any): PanelData { if (!this.request.endTime) { this.request.endTime = Date.now(); } - - return (this.data = { - ...this.data, // Keep any existing data + this.closeStreams(true); + this.response = { + ...this.response, // Keep any existing data state: LoadingState.Error, error: toDataQueryError(err), - request: this.request, - }); + }; + return this.validateStreamsAndGetPanelData(); } } +export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) { + // It came from the same the same request, so keep it + if (source === state.request || state.request.requestId.startsWith(source.requestId)) { + return false; + } + + // We should be able to check that it is the same query regardless of + // if it came from the same request. This will be important for #16676 + + return true; +} + export function toDataQueryError(err: any): DataQueryError { const error = (err || {}) as DataQueryError; if (!error.message) { @@ -175,3 +329,32 @@ export function toDataQueryError(err: any): DataQueryError { } return error; } + +function translateToLegacyData(data: DataQueryResponseData) { + return data.map(v => { + if (isSeriesData(v)) { + return toLegacyResponseData(v); + } + return v; + }); +} + +/** + * All panels will be passed tables that have our best guess at colum type set + * + * This is also used by PanelChrome for snapshot support + */ +export function getProcessedSeriesData(results?: any[]): SeriesData[] { + if (!results) { + return []; + } + + const series: SeriesData[] = []; + for (const r of results) { + if (r) { + series.push(guessFieldTypes(toSeriesData(r))); + } + } + + return series; +} diff --git a/public/app/plugins/datasource/testdata/StreamHandler.ts b/public/app/plugins/datasource/testdata/StreamHandler.ts new file mode 100644 index 00000000000..02159a72a65 --- /dev/null +++ b/public/app/plugins/datasource/testdata/StreamHandler.ts @@ -0,0 +1,400 @@ +import defaults from 'lodash/defaults'; +import { + DataQueryRequest, + FieldType, + SeriesData, + DataQueryResponse, + DataQueryError, + DataStreamObserver, + DataStreamState, + LoadingState, +} from '@grafana/ui'; +import { TestDataQuery, StreamingQuery } from './types'; + +export const defaultQuery: StreamingQuery = { + type: 'signal', + speed: 250, // ms + spread: 3.5, + noise: 2.2, +}; + +type StreamWorkers = { + [key: string]: StreamWorker; +}; + +export class StreamHandler { + workers: StreamWorkers = {}; + + process(req: DataQueryRequest, observer: DataStreamObserver): DataQueryResponse | undefined { + let resp: DataQueryResponse; + + for (const query of req.targets) { + if ('streaming_client' !== query.scenarioId) { + continue; + } + + if (!resp) { + resp = { data: [] }; + } + + // set stream option defaults + query.stream = defaults(query.stream, defaultQuery); + // create stream key + const key = req.dashboardId + '/' + req.panelId + '/' + query.refId; + + if (this.workers[key]) { + const existing = this.workers[key]; + if (existing.update(query, req)) { + continue; + } + existing.unsubscribe(); + delete this.workers[key]; + } + + const type = query.stream.type; + if (type === 'signal') { + this.workers[key] = new SignalWorker(key, query, req, observer); + } else if (type === 'logs') { + this.workers[key] = new LogsWorker(key, query, req, observer); + } else { + throw { + message: 'Unknown Stream type: ' + type, + refId: query.refId, + } as DataQueryError; + } + } + return resp; + } +} + +/** + * Manages a single stream request + */ +export class StreamWorker { + query: StreamingQuery; + stream: DataStreamState; + observer: DataStreamObserver; + last = -1; + timeoutId = 0; + + constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) { + this.stream = { + key, + state: LoadingState.Streaming, + request, + unsubscribe: this.unsubscribe, + }; + this.query = query.stream; + this.last = Date.now(); + this.observer = observer; + console.log('Creating Test Stream: ', this); + } + + unsubscribe = () => { + this.observer = null; + if (this.timeoutId) { + clearTimeout(this.timeoutId); + this.timeoutId = 0; + } + }; + + update(query: TestDataQuery, request: DataQueryRequest): boolean { + // Check if stream has been unsubscribed or query changed type + if (this.observer === null || this.query.type !== query.stream.type) { + return false; + } + this.query = query.stream; + this.stream.request = request; // OK? + console.log('Reuse Test Stream: ', this); + return true; + } + + appendRows(append: any[][]) { + // Trim the maximum row count + const { query, stream } = this; + const maxRows = query.buffer ? query.buffer : stream.request.maxDataPoints; + + // Edit the first series + const series = stream.series[0]; + let rows = series.rows.concat(append); + const extra = maxRows - rows.length; + if (extra < 0) { + rows = rows.slice(extra * -1); + } + series.rows = rows; + + // Tell the event about only the rows that changed (it may want to process them) + stream.delta = [{ ...series, rows: append }]; + + // Broadcast the changes + if (this.observer) { + this.observer(stream); + } else { + console.log('StreamWorker working without any observer'); + } + + this.last = Date.now(); + } +} + +export class SignalWorker extends StreamWorker { + value: number; + + constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) { + super(key, query, request, observer); + setTimeout(() => { + this.stream.series = [this.initBuffer(query.refId)]; + this.looper(); + }, 10); + } + + nextRow = (time: number) => { + const { spread, noise } = this.query; + this.value += (Math.random() - 0.5) * spread; + return [ + time, + this.value, // Value + this.value - Math.random() * noise, // MIN + this.value + Math.random() * noise, // MAX + ]; + }; + + initBuffer(refId: string): SeriesData { + const { speed, buffer } = this.query; + const data = { + fields: [ + { name: 'Time', type: FieldType.time }, + { name: 'Value', type: FieldType.number }, + { name: 'Min', type: FieldType.number }, + { name: 'Max', type: FieldType.number }, + ], + rows: [], + refId, + name: 'Signal ' + refId, + } as SeriesData; + + const request = this.stream.request; + + this.value = Math.random() * 100; + const maxRows = buffer ? buffer : request.maxDataPoints; + let time = Date.now() - maxRows * speed; + for (let i = 0; i < maxRows; i++) { + data.rows.push(this.nextRow(time)); + time += speed; + } + return data; + } + + looper = () => { + if (!this.observer) { + const request = this.stream.request; + const elapsed = request.startTime - Date.now(); + if (elapsed > 1000) { + console.log('Stop looping'); + return; + } + } + + // Make sure it has a minimum speed + const { query } = this; + if (query.speed < 5) { + query.speed = 5; + } + + this.appendRows([this.nextRow(Date.now())]); + this.timeoutId = window.setTimeout(this.looper, query.speed); + }; +} + +export class LogsWorker extends StreamWorker { + index = 0; + + constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) { + super(key, query, request, observer); + + window.setTimeout(() => { + this.stream.series = [this.initBuffer(query.refId)]; + this.looper(); + }, 10); + } + + getNextWord() { + this.index = (this.index + Math.floor(Math.random() * 5)) % words.length; + return words[this.index]; + } + + getRandomLine() { + let line = this.getNextWord(); + while (line.length < 80) { + line += ' ' + this.getNextWord(); + } + return line; + } + + nextRow = (time: number) => { + return [time, this.getRandomLine()]; + }; + + initBuffer(refId: string): SeriesData { + const { speed, buffer } = this.query; + const data = { + fields: [{ name: 'Time', type: FieldType.time }, { name: 'Line', type: FieldType.string }], + rows: [], + refId, + name: 'Logs ' + refId, + } as SeriesData; + + const request = this.stream.request; + + const maxRows = buffer ? buffer : request.maxDataPoints; + let time = Date.now() - maxRows * speed; + for (let i = 0; i < maxRows; i++) { + data.rows.push(this.nextRow(time)); + time += speed; + } + return data; + } + + looper = () => { + if (!this.observer) { + const request = this.stream.request; + const elapsed = request.startTime - Date.now(); + if (elapsed > 1000) { + console.log('Stop looping'); + return; + } + } + + // Make sure it has a minimum speed + const { query } = this; + if (query.speed < 5) { + query.speed = 5; + } + + this.appendRows([this.nextRow(Date.now())]); + this.timeoutId = window.setTimeout(this.looper, query.speed); + }; +} + +const words = [ + 'At', + 'vero', + 'eos', + 'et', + 'accusamus', + 'et', + 'iusto', + 'odio', + 'dignissimos', + 'ducimus', + 'qui', + 'blanditiis', + 'praesentium', + 'voluptatum', + 'deleniti', + 'atque', + 'corrupti', + 'quos', + 'dolores', + 'et', + 'quas', + 'molestias', + 'excepturi', + 'sint', + 'occaecati', + 'cupiditate', + 'non', + 'provident', + 'similique', + 'sunt', + 'in', + 'culpa', + 'qui', + 'officia', + 'deserunt', + 'mollitia', + 'animi', + 'id', + 'est', + 'laborum', + 'et', + 'dolorum', + 'fuga', + 'Et', + 'harum', + 'quidem', + 'rerum', + 'facilis', + 'est', + 'et', + 'expedita', + 'distinctio', + 'Nam', + 'libero', + 'tempore', + 'cum', + 'soluta', + 'nobis', + 'est', + 'eligendi', + 'optio', + 'cumque', + 'nihil', + 'impedit', + 'quo', + 'minus', + 'id', + 'quod', + 'maxime', + 'placeat', + 'facere', + 'possimus', + 'omnis', + 'voluptas', + 'assumenda', + 'est', + 'omnis', + 'dolor', + 'repellendus', + 'Temporibus', + 'autem', + 'quibusdam', + 'et', + 'aut', + 'officiis', + 'debitis', + 'aut', + 'rerum', + 'necessitatibus', + 'saepe', + 'eveniet', + 'ut', + 'et', + 'voluptates', + 'repudiandae', + 'sint', + 'et', + 'molestiae', + 'non', + 'recusandae', + 'Itaque', + 'earum', + 'rerum', + 'hic', + 'tenetur', + 'a', + 'sapiente', + 'delectus', + 'ut', + 'aut', + 'reiciendis', + 'voluptatibus', + 'maiores', + 'alias', + 'consequatur', + 'aut', + 'perferendis', + 'doloribus', + 'asperiores', + 'repellat', +]; diff --git a/public/app/plugins/datasource/testdata/datasource.ts b/public/app/plugins/datasource/testdata/datasource.ts index a796717198f..52f6080bbd5 100644 --- a/public/app/plugins/datasource/testdata/datasource.ts +++ b/public/app/plugins/datasource/testdata/datasource.ts @@ -1,6 +1,15 @@ import _ from 'lodash'; -import { DataSourceApi, DataQueryRequest, TableData, TimeSeries } from '@grafana/ui'; +import { + DataSourceApi, + DataQueryRequest, + TableData, + TimeSeries, + DataSourceInstanceSettings, + DataStreamObserver, +} from '@grafana/ui'; import { TestDataQuery, Scenario } from './types'; +import { getBackendSrv } from 'app/core/services/backend_srv'; +import { StreamHandler } from './StreamHandler'; type TestData = TimeSeries | TableData; @@ -10,16 +19,15 @@ export interface TestDataRegistry { export class TestDataDatasource implements DataSourceApi { id: number; + streams = new StreamHandler(); /** @ngInject */ - constructor(instanceSettings, private backendSrv, private $q) { + constructor(instanceSettings: DataSourceInstanceSettings) { this.id = instanceSettings.id; } - query(options: DataQueryRequest) { - const queries = _.filter(options.targets, item => { - return item.hide !== true; - }).map(item => { + query(options: DataQueryRequest, observer: DataStreamObserver) { + const queries = options.targets.map(item => { return { refId: item.refId, scenarioId: item.scenarioId, @@ -33,10 +41,16 @@ export class TestDataDatasource implements DataSourceApi { }); if (queries.length === 0) { - return this.$q.when({ data: [] }); + return Promise.resolve({ data: [] }); } - return this.backendSrv + // Currently we do not support mixed with client only streaming + const resp = this.streams.process(options, observer); + if (resp) { + return Promise.resolve(resp); + } + + return getBackendSrv() .datasourceRequest({ method: 'POST', url: '/api/tsdb/query', @@ -76,7 +90,7 @@ export class TestDataDatasource implements DataSourceApi { }); } - annotationQuery(options) { + annotationQuery(options: any) { let timeWalker = options.range.from.valueOf(); const to = options.range.to.valueOf(); const events = []; @@ -92,7 +106,7 @@ export class TestDataDatasource implements DataSourceApi { }); timeWalker += step; } - return this.$q.when(events); + return Promise.resolve(events); } getQueryDisplayText(query: TestDataQuery) { @@ -110,6 +124,6 @@ export class TestDataDatasource implements DataSourceApi { } getScenarios(): Promise { - return this.backendSrv.get('/api/tsdb/testdata/scenarios'); + return getBackendSrv().get('/api/tsdb/testdata/scenarios'); } } diff --git a/public/app/plugins/datasource/testdata/partials/query.editor.html b/public/app/plugins/datasource/testdata/partials/query.editor.html index fc16f2a8b44..78fd3b44f89 100644 --- a/public/app/plugins/datasource/testdata/partials/query.editor.html +++ b/public/app/plugins/datasource/testdata/partials/query.editor.html @@ -36,4 +36,50 @@
+
+
+ +
+ +
+
+
+ + +
+
+ + +
+
+ + +
+
+
+
+
diff --git a/public/app/plugins/datasource/testdata/query_ctrl.ts b/public/app/plugins/datasource/testdata/query_ctrl.ts index 7a40d264f64..c0667a33000 100644 --- a/public/app/plugins/datasource/testdata/query_ctrl.ts +++ b/public/app/plugins/datasource/testdata/query_ctrl.ts @@ -2,6 +2,8 @@ import _ from 'lodash'; import { QueryCtrl } from 'app/plugins/sdk'; import moment from 'moment'; +import { defaultQuery } from './StreamHandler'; +import { getBackendSrv } from 'app/core/services/backend_srv'; export class TestDataQueryCtrl extends QueryCtrl { static templateUrl = 'partials/query.editor.html'; @@ -13,7 +15,7 @@ export class TestDataQueryCtrl extends QueryCtrl { selectedPoint: any; /** @ngInject */ - constructor($scope, $injector, private backendSrv) { + constructor($scope: any, $injector: any) { super($scope, $injector); this.target.scenarioId = this.target.scenarioId || 'random_walk'; @@ -49,10 +51,12 @@ export class TestDataQueryCtrl extends QueryCtrl { } $onInit() { - return this.backendSrv.get('/api/tsdb/testdata/scenarios').then(res => { - this.scenarioList = res; - this.scenario = _.find(this.scenarioList, { id: this.target.scenarioId }); - }); + return getBackendSrv() + .get('/api/tsdb/testdata/scenarios') + .then(res => { + this.scenarioList = res; + this.scenario = _.find(this.scenarioList, { id: this.target.scenarioId }); + }); } scenarioChanged() { @@ -65,6 +69,16 @@ export class TestDataQueryCtrl extends QueryCtrl { delete this.target.points; } + if (this.target.scenarioId === 'streaming_client') { + this.target.stream = _.defaults(this.target.stream || {}, defaultQuery); + } else { + delete this.target.stream; + } + + this.refresh(); + } + + streamChanged() { this.refresh(); } } diff --git a/public/app/plugins/datasource/testdata/types.ts b/public/app/plugins/datasource/testdata/types.ts index 1a05b3eac27..c17097cd01f 100644 --- a/public/app/plugins/datasource/testdata/types.ts +++ b/public/app/plugins/datasource/testdata/types.ts @@ -1,13 +1,22 @@ import { DataQuery } from '@grafana/ui/src/types'; -export interface TestDataQuery extends DataQuery { - alias?: string; - scenarioId: string; - stringInput: string; - points: any; -} - export interface Scenario { id: string; name: string; } + +export interface TestDataQuery extends DataQuery { + alias?: string; + scenarioId: string; + stringInput: string; + points?: any[]; + stream?: StreamingQuery; +} + +export interface StreamingQuery { + type: 'signal' | 'logs'; + speed: number; + spread: number; + noise: number; // wiggle around the signal for min/max + buffer?: number; +}