diff --git a/packages/grafana-data/src/utils/dataFrameHelper.ts b/packages/grafana-data/src/utils/dataFrameHelper.ts index d890608d5c4..15b03bf72bd 100644 --- a/packages/grafana-data/src/utils/dataFrameHelper.ts +++ b/packages/grafana-data/src/utils/dataFrameHelper.ts @@ -393,8 +393,8 @@ export class CircularDataFrame extends MutableDataFrame { constructor(options: CircularOptions) { super(undefined, (buffer?: any[]) => { return new CircularVector({ - buffer, ...options, + buffer, }); }); } diff --git a/packages/grafana-data/src/utils/fieldReducer.ts b/packages/grafana-data/src/utils/fieldReducer.ts index 428968f4043..7655a735e3d 100644 --- a/packages/grafana-data/src/utils/fieldReducer.ts +++ b/packages/grafana-data/src/utils/fieldReducer.ts @@ -101,6 +101,7 @@ export function reduceField(options: ReduceFieldOptions): FieldCalcs { // For now everything can use the standard stats let values = doStandardCalcs(field, ignoreNulls, nullAsZero); + for (const reducer of queue) { if (!values.hasOwnProperty(reducer.id) && reducer.reduce) { values = { @@ -109,10 +110,12 @@ export function reduceField(options: ReduceFieldOptions): FieldCalcs { }; } } + field.calcs = { ...field.calcs, ...values, }; + return values; } diff --git a/packages/grafana-data/src/utils/processDataFrame.ts b/packages/grafana-data/src/utils/processDataFrame.ts index c6d35126ac9..542fb950721 100644 --- a/packages/grafana-data/src/utils/processDataFrame.ts +++ b/packages/grafana-data/src/utils/processDataFrame.ts @@ -238,16 +238,19 @@ export const toDataFrame = (data: any): DataFrame => { // This will convert the array values into Vectors return new MutableDataFrame(data as DataFrameDTO); } + if (data.hasOwnProperty('datapoints')) { return convertTimeSeriesToDataFrame(data); } + if (data.hasOwnProperty('data')) { return convertGraphSeriesToDataFrame(data); } + if (data.hasOwnProperty('columns')) { return convertTableToDataFrame(data); } - // TODO, try to convert JSON/Array to table? + console.warn('Can not convert', data); throw new Error('Unsupported data format'); }; diff --git a/packages/grafana-ui/src/components/TransformersUI/TransformationsEditor.tsx b/packages/grafana-ui/src/components/TransformersUI/TransformationsEditor.tsx index 151d7df2c40..d026c391890 100644 --- a/packages/grafana-ui/src/components/TransformersUI/TransformationsEditor.tsx +++ b/packages/grafana-ui/src/components/TransformersUI/TransformationsEditor.tsx @@ -13,7 +13,7 @@ interface TransformationsEditorState { interface TransformationsEditorProps { onChange: (transformations: DataTransformerConfig[]) => void; transformations: DataTransformerConfig[]; - getCurrentData: (applyTransformations?: boolean) => DataFrame[]; + dataFrames: DataFrame[]; } export class TransformationsEditor extends React.PureComponent { @@ -46,9 +46,9 @@ export class TransformationsEditor extends React.PureComponent { - const { transformations, getCurrentData } = this.props; + const { transformations, dataFrames } = this.props; const hasTransformations = transformations.length > 0; - const preTransformData = getCurrentData(false); + const preTransformData = dataFrames; if (!hasTransformations) { return undefined; diff --git a/packages/grafana-ui/src/types/datasource.ts b/packages/grafana-ui/src/types/datasource.ts index 5482ac3734c..44b60e4f335 100644 --- a/packages/grafana-ui/src/types/datasource.ts +++ b/packages/grafana-ui/src/types/datasource.ts @@ -13,6 +13,7 @@ import { } from '@grafana/data'; import { PluginMeta, GrafanaPlugin } from './plugin'; import { PanelData } from './panel'; +import { Observable } from 'rxjs'; // NOTE: this seems more general than just DataSource export interface DataSourcePluginOptionsEditorProps { @@ -189,26 +190,9 @@ export abstract class DataSourceApi< init?: () => void; /** - * Query for data, and optionally stream results to an observer. - * - * Are you reading these docs aiming to execute a query? - * +-> If Yes, then consider using panelQueryRunner/State instead. see: - * * {@link https://github.com/grafana/grafana/blob/master/public/app/features/dashboard/state/PanelQueryRunner.ts PanelQueryRunner.ts} - * * {@link https://github.com/grafana/grafana/blob/master/public/app/features/dashboard/state/PanelQueryState.ts PanelQueryState.ts} - * - * If you are implementing a simple request-response query, - * then you can ignore the `observer` entirely. - * - * When streaming behavior is required, the Promise can return at any time - * with empty or partial data in the response and optionally a state. - * NOTE: The data in this initial response will not be replaced with any - * data from subsequent events. {@see DataStreamState} - * - * The request object will be passed in each observer callback - * so the callback could assert that the correct events are streaming and - * unsubscribe if unexpected results are returned. + * Query for data, and optionally stream results */ - abstract query(request: DataQueryRequest, observer?: DataStreamObserver): Promise; + abstract query(request: DataQueryRequest): Promise | Observable; /** * Test & verify datasource settings & connection details @@ -397,6 +381,19 @@ export interface DataQueryResponse { * or a partial result set */ data: DataQueryResponseData[]; + + /** + * When returning multiple partial responses or streams + * Use this key to inform Grafana how to combine the partial responses + * Multiple responses with same key are replaced (latest used) + */ + key?: string; + + /** + * Use this to control which state the response should have + * Defaults to LoadingState.Done if state is not defined + */ + state?: LoadingState; } export interface DataQuery { diff --git a/packages/grafana-ui/src/types/panel.ts b/packages/grafana-ui/src/types/panel.ts index cb0e69d15ea..a104122c6fb 100644 --- a/packages/grafana-ui/src/types/panel.ts +++ b/packages/grafana-ui/src/types/panel.ts @@ -1,6 +1,6 @@ import { ComponentClass, ComponentType } from 'react'; import { LoadingState, DataFrame, TimeRange, TimeZone, ScopedVars } from '@grafana/data'; -import { DataQueryRequest, DataQueryError, LegacyResponseData } from './datasource'; +import { DataQueryRequest, DataQueryError } from './datasource'; import { PluginMeta, GrafanaPlugin } from './plugin'; export type InterpolateFunction = (value: string, scopedVars?: ScopedVars, format?: string | Function) => string; @@ -16,9 +16,6 @@ export interface PanelData { series: DataFrame[]; request?: DataQueryRequest; error?: DataQueryError; - - // Data format expected by Angular panels - legacy?: LegacyResponseData[]; } export interface PanelProps { diff --git a/public/app/core/services/backend_srv.ts b/public/app/core/services/backend_srv.ts index 88d4fedbf5c..bb8cc44a7a9 100644 --- a/public/app/core/services/backend_srv.ts +++ b/public/app/core/services/backend_srv.ts @@ -157,6 +157,7 @@ export class BackendSrv implements BackendService { // is canceled, canceling the previous datasource request if it is still // in-flight. const requestId = options.requestId; + if (requestId) { this.resolveCancelerIfExists(requestId); // create new canceler diff --git a/public/app/core/utils/explore.test.ts b/public/app/core/utils/explore.test.ts index a5408abe286..328bf1b5f80 100644 --- a/public/app/core/utils/explore.test.ts +++ b/public/app/core/utils/explore.test.ts @@ -5,7 +5,6 @@ import { updateHistory, clearHistory, hasNonEmptyQuery, - instanceOfDataQueryError, getValueWithRefId, getFirstQueryErrorWithoutRefId, getRefIds, @@ -201,30 +200,6 @@ describe('hasNonEmptyQuery', () => { }); }); -describe('instanceOfDataQueryError', () => { - describe('when called with a DataQueryError', () => { - it('then it should return true', () => { - const error: DataQueryError = { - message: 'A message', - status: '200', - statusText: 'Ok', - }; - const result = instanceOfDataQueryError(error); - - expect(result).toBe(true); - }); - }); - - describe('when called with a non DataQueryError', () => { - it('then it should return false', () => { - const error = {}; - const result = instanceOfDataQueryError(error); - - expect(result).toBe(false); - }); - }); -}); - describe('hasRefId', () => { describe('when called with a null value', () => { it('then it should return null', () => { diff --git a/public/app/core/utils/explore.ts b/public/app/core/utils/explore.ts index f5d2d9e0072..25f5ca982cd 100644 --- a/public/app/core/utils/explore.ts +++ b/public/app/core/utils/explore.ts @@ -1,5 +1,6 @@ // Libraries import _ from 'lodash'; +import { Unsubscribable } from 'rxjs'; import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; // Services & Utils import { @@ -28,7 +29,6 @@ import { ExploreMode, } from 'app/types/explore'; import { config } from '../config'; -import { PanelQueryState } from '../../features/dashboard/state/PanelQueryState'; import { TimeSrv } from 'app/features/dashboard/services/TimeSrv'; export const DEFAULT_RANGE = { @@ -124,8 +124,7 @@ export function buildQueryTransaction( dashboardId: 0, // TODO probably should be taken from preferences but does not seem to be used anyway. timezone: DefaultTimeZone, - // This is set to correct time later on before the query is actually run. - startTime: 0, + startTime: Date.now(), interval, intervalMs, // TODO: the query request expects number and we are using string here. Seems like it works so far but can create @@ -409,10 +408,6 @@ export const getTimeRangeFromUrl = (range: RawTimeRange, timeZone: TimeZone): Ti }; }; -export const instanceOfDataQueryError = (value: any): value is DataQueryError => { - return value.message !== undefined && value.status !== undefined && value.statusText !== undefined; -}; - export const getValueWithRefId = (value: any): any | null => { if (!value) { return null; @@ -518,9 +513,8 @@ export const convertToWebSocketUrl = (url: string) => { return `${backend}${url}`; }; -export const stopQueryState = (queryState: PanelQueryState, reason: string) => { - if (queryState && queryState.isStarted()) { - queryState.cancel(reason); - queryState.closeStreams(false); +export const stopQueryState = (querySubscription: Unsubscribable) => { + if (querySubscription) { + querySubscription.unsubscribe(); } }; diff --git a/public/app/features/dashboard/dashgrid/PanelChrome.tsx b/public/app/features/dashboard/dashgrid/PanelChrome.tsx index 251cf2f19c8..bf3c379f081 100644 --- a/public/app/features/dashboard/dashgrid/PanelChrome.tsx +++ b/public/app/features/dashboard/dashgrid/PanelChrome.tsx @@ -11,7 +11,7 @@ import { ErrorBoundary } from '@grafana/ui'; import { getTimeSrv, TimeSrv } from '../services/TimeSrv'; import { applyPanelTimeOverrides, calculateInnerPanelHeight } from 'app/features/dashboard/utils/panel'; import { profiler } from 'app/core/profiler'; -import { getProcessedDataFrames } from '../state/PanelQueryState'; +import { getProcessedDataFrames } from '../state/runRequest'; import templateSrv from 'app/features/templating/template_srv'; import config from 'app/core/config'; @@ -82,6 +82,7 @@ export class PanelChrome extends PureComponent { componentWillUnmount() { this.props.panel.events.off('refresh', this.onRefresh); + if (this.querySubscription) { this.querySubscription.unsubscribe(); this.querySubscription = null; @@ -94,12 +95,6 @@ export class PanelChrome extends PureComponent { // View state has changed if (isInView !== prevProps.isInView) { if (isInView) { - // Subscribe will kick of a notice of the last known state - if (!this.querySubscription && this.wantsQueryExecution) { - const runner = this.props.panel.getQueryRunner(); - this.querySubscription = runner.subscribe(this.panelDataObserver); - } - // Check if we need a delayed refresh if (this.state.refreshWhenInView) { this.onRefresh(); @@ -170,8 +165,9 @@ export class PanelChrome extends PureComponent { const queryRunner = panel.getQueryRunner(); if (!this.querySubscription) { - this.querySubscription = queryRunner.subscribe(this.panelDataObserver); + this.querySubscription = queryRunner.getData().subscribe(this.panelDataObserver); } + queryRunner.run({ datasource: panel.datasource, queries: panel.targets, diff --git a/public/app/features/dashboard/panel_editor/QueriesTab.tsx b/public/app/features/dashboard/panel_editor/QueriesTab.tsx index 98d68f3d90b..81a783196e4 100644 --- a/public/app/features/dashboard/panel_editor/QueriesTab.tsx +++ b/public/app/features/dashboard/panel_editor/QueriesTab.tsx @@ -22,10 +22,8 @@ import { DashboardModel } from '../state/DashboardModel'; import { DataQuery, DataSourceSelectItem, PanelData, AlphaNotice, PluginState } from '@grafana/ui'; import { LoadingState, DataTransformerConfig } from '@grafana/data'; import { PluginHelp } from 'app/core/components/PluginHelp/PluginHelp'; -import { PanelQueryRunnerFormat } from '../state/PanelQueryRunner'; import { Unsubscribable } from 'rxjs'; -import { isSharedDashboardQuery } from 'app/plugins/datasource/dashboard/SharedQueryRunner'; -import { DashboardQueryEditor } from 'app/plugins/datasource/dashboard/DashboardQueryEditor'; +import { isSharedDashboardQuery, DashboardQueryEditor } from 'app/plugins/datasource/dashboard'; interface Props { panel: PanelModel; @@ -64,7 +62,9 @@ export class QueriesTab extends PureComponent { const { panel } = this.props; const queryRunner = panel.getQueryRunner(); - this.querySubscription = queryRunner.subscribe(this.panelDataObserver, PanelQueryRunnerFormat.both); + this.querySubscription = queryRunner.getData(false).subscribe({ + next: (data: PanelData) => this.onPanelDataUpdate(data), + }); } componentWillUnmount() { @@ -74,22 +74,9 @@ export class QueriesTab extends PureComponent { } } - // Updates the response with information from the stream - panelDataObserver = { - next: (data: PanelData) => { - try { - const { panel } = this.props; - if (data.state === LoadingState.Error) { - panel.events.emit('data-error', data.error); - } else if (data.state === LoadingState.Done) { - panel.events.emit('data-received', data.legacy); - } - } catch (err) { - console.log('Panel.events handler error', err); - } - this.setState({ data }); - }, - }; + onPanelDataUpdate(data: PanelData) { + this.setState({ data }); + } findCurrentDataSource(): DataSourceSelectItem { const { panel } = this.props; @@ -226,11 +213,6 @@ export class QueriesTab extends PureComponent { this.setState({ scrollTop: target.scrollTop }); }; - getCurrentData = (applyTransformations = true) => { - const queryRunner = this.props.panel.getQueryRunner(); - return queryRunner.getCurrentData(applyTransformations).series; - }; - render() { const { panel, dashboard } = this.props; const { currentDS, scrollTop, data } = this.state; @@ -301,7 +283,7 @@ export class QueriesTab extends PureComponent { )} diff --git a/public/app/features/dashboard/panel_editor/QueryEditorRow.tsx b/public/app/features/dashboard/panel_editor/QueryEditorRow.tsx index 1e28832fa63..4f5579b754b 100644 --- a/public/app/features/dashboard/panel_editor/QueryEditorRow.tsx +++ b/public/app/features/dashboard/panel_editor/QueryEditorRow.tsx @@ -12,7 +12,7 @@ import { getTimeSrv } from 'app/features/dashboard/services/TimeSrv'; // Types import { PanelModel } from '../state/PanelModel'; import { DataQuery, DataSourceApi, PanelData, DataQueryRequest, ErrorBoundaryAlert } from '@grafana/ui'; -import { TimeRange, LoadingState } from '@grafana/data'; +import { TimeRange, LoadingState, toLegacyResponseData } from '@grafana/data'; import { DashboardModel } from '../state/DashboardModel'; interface Props { @@ -89,7 +89,7 @@ export class QueryEditorRow extends PureComponent { componentDidUpdate(prevProps: Props) { const { loadedDataSourceValue } = this.state; - const { data, query } = this.props; + const { data, query, panel } = this.props; if (data !== prevProps.data) { this.setState({ queryResponse: filterPanelDataToQuery(data, query.refId) }); @@ -99,9 +99,7 @@ export class QueryEditorRow extends PureComponent { } if (this.angularQueryEditor) { - // Some query controllers listen to data error events and need a digest - // for some reason this needs to be done in next tick - setTimeout(this.angularQueryEditor.digest); + notifyAngularQueryEditorsOfData(panel, data, this.angularQueryEditor); } } @@ -265,6 +263,25 @@ export class QueryEditorRow extends PureComponent { } } +// To avoid sending duplicate events for each row we have this global cached object here +// So we can check if we already emitted this legacy data event +let globalLastPanelDataCache: PanelData = null; + +function notifyAngularQueryEditorsOfData(panel: PanelModel, data: PanelData, editor: AngularComponent) { + if (data === globalLastPanelDataCache) { + return; + } + + globalLastPanelDataCache = data; + + const legacy = data.series.map(v => toLegacyResponseData(v)); + panel.events.emit('data-received', legacy); + + // Some query controllers listen to data error events and need a digest + // for some reason this needs to be done in next tick + setTimeout(editor.digest); +} + export interface AngularQueryComponentScope { target: DataQuery; panel: PanelModel; diff --git a/public/app/features/dashboard/state/PanelModel.ts b/public/app/features/dashboard/state/PanelModel.ts index 41df0c38153..6648ba6b342 100644 --- a/public/app/features/dashboard/state/PanelModel.ts +++ b/public/app/features/dashboard/state/PanelModel.ts @@ -327,7 +327,8 @@ export class PanelModel { getQueryRunner(): PanelQueryRunner { if (!this.queryRunner) { - this.queryRunner = new PanelQueryRunner(this.id); + this.queryRunner = new PanelQueryRunner(); + this.setTransformations(this.transformations); } return this.queryRunner; } @@ -336,6 +337,10 @@ export class PanelModel { return this.title && this.title.length > 0; } + isAngularPlugin(): boolean { + return this.plugin && !!this.plugin.angularPanelCtrl; + } + destroy() { this.events.emit('panel-teardown'); this.events.removeAllListeners(); @@ -347,11 +352,8 @@ export class PanelModel { } setTransformations(transformations: DataTransformerConfig[]) { - // save for persistence this.transformations = transformations; - - // update query runner transformers - this.getQueryRunner().setTransform(transformations); + this.getQueryRunner().setTransformations(transformations); } } diff --git a/public/app/features/dashboard/state/PanelQueryRunner.test.ts b/public/app/features/dashboard/state/PanelQueryRunner.test.ts index 75209f54a08..0f8f55d7e39 100644 --- a/public/app/features/dashboard/state/PanelQueryRunner.test.ts +++ b/public/app/features/dashboard/state/PanelQueryRunner.test.ts @@ -1,17 +1,13 @@ -import { PanelQueryRunner, QueryRunnerOptions } from './PanelQueryRunner'; -import { PanelData, DataQueryRequest, DataStreamObserver, DataStreamState } from '@grafana/ui'; - -import { LoadingState, MutableDataFrame, ScopedVars } from '@grafana/data'; -import { dateTime } from '@grafana/data'; -import { SHARED_DASHBODARD_QUERY } from 'app/plugins/datasource/dashboard/SharedQueryRunner'; -import { DashboardQuery } from 'app/plugins/datasource/dashboard/types'; +import { PanelQueryRunner } from './PanelQueryRunner'; +import { PanelData, DataQueryRequest } from '@grafana/ui'; +import { dateTime, ScopedVars } from '@grafana/data'; import { PanelModel } from './PanelModel'; -import { Subject } from 'rxjs'; jest.mock('app/core/services/backend_srv'); // Defined within setup functions const panelsForCurrentDashboardMock: { [key: number]: PanelModel } = {}; + jest.mock('app/features/dashboard/services/DashboardSrv', () => ({ getDashboardSrv: () => { return { @@ -40,7 +36,6 @@ interface ScenarioContext { events?: PanelData[]; res?: PanelData; queryCalledWith?: DataQueryRequest; - observer: DataStreamObserver; runner: PanelQueryRunner; } @@ -55,8 +50,7 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn scopedVars: { server: { text: 'Server1', value: 'server-1' }, }, - runner: new PanelQueryRunner(1), - observer: (args: any) => {}, + runner: new PanelQueryRunner(), setup: (fn: () => void) => { setupFn = fn; }, @@ -72,9 +66,8 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn const datasource: any = { name: 'TestDB', interval: ctx.dsInterval, - query: (options: DataQueryRequest, observer: DataStreamObserver) => { + query: (options: DataQueryRequest) => { ctx.queryCalledWith = options; - ctx.observer = observer; return Promise.resolve(response); }, testDatasource: jest.fn(), @@ -95,9 +88,10 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn queries: [{ refId: 'A', test: 1 }], }; - ctx.runner = new PanelQueryRunner(1); - ctx.runner.subscribe({ + ctx.runner = new PanelQueryRunner(); + ctx.runner.getData().subscribe({ next: (data: PanelData) => { + ctx.res = data; ctx.events.push(data); }, }); @@ -110,7 +104,7 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn } as PanelModel; ctx.events = []; - ctx.res = await ctx.runner.run(args); + ctx.runner.run(args); }); scenarioFn(ctx); @@ -190,102 +184,4 @@ describe('PanelQueryRunner', () => { expect(ctx.queryCalledWith.maxDataPoints).toBe(10); }); }); - - describeQueryRunnerScenario('when datasource is streaming data', ctx => { - let streamState: DataStreamState; - let isUnsubbed = false; - - beforeEach(() => { - streamState = { - state: LoadingState.Streaming, - key: 'test-stream-1', - data: [ - new MutableDataFrame({ - fields: [], - name: 'I am a magic stream', - }), - ], - request: { - requestId: ctx.queryCalledWith.requestId, - } as any, - unsubscribe: () => { - isUnsubbed = true; - }, - }; - ctx.observer(streamState); - }); - - it('should push another update to subscriber', async () => { - expect(ctx.events.length).toBe(2); - }); - - it('should set state to streaming', async () => { - expect(ctx.events[1].state).toBe(LoadingState.Streaming); - }); - - it('should not unsubscribe', async () => { - expect(isUnsubbed).toBe(false); - }); - - it('destroy should unsubscribe streams', async () => { - ctx.runner.destroy(); - expect(isUnsubbed).toBe(true); - }); - }); - - describeQueryRunnerScenario('Shared query request', ctx => { - ctx.setup(() => {}); - - it('should get the same results as the original', async () => { - // Get the results from - const q: DashboardQuery = { refId: 'Z', panelId: 1 }; - const myPanelId = 7; - - const runnerWantingSharedResults = new PanelQueryRunner(myPanelId); - panelsForCurrentDashboardMock[myPanelId] = { - id: myPanelId, - getQueryRunner: () => { - return runnerWantingSharedResults; - }, - } as PanelModel; - - const res = await runnerWantingSharedResults.run({ - datasource: SHARED_DASHBODARD_QUERY, - queries: [q], - - // Same query setup - scopedVars: ctx.scopedVars, - minInterval: ctx.minInterval, - widthPixels: ctx.widthPixels, - maxDataPoints: ctx.maxDataPoints, - timeRange: { - from: dateTime().subtract(1, 'days'), - to: dateTime(), - raw: { from: '1h', to: 'now' }, - }, - panelId: myPanelId, // Not 1 - }); - - const req = res.request; - expect(req.panelId).toBe(1); // The source panel - expect(req.targets[0].datasource).toBe('TestDB'); - expect(res.series.length).toBe(1); - expect(res.series[0].length).toBe(2); - - // Get the private subject and check that someone is listening - const subject = (ctx.runner as any).subject as Subject; - expect(subject.observers.length).toBe(2); - - // Now change the query and we should stop listening - try { - runnerWantingSharedResults.run({ - datasource: 'unknown-datasource', - panelId: myPanelId, // Not 1 - } as QueryRunnerOptions); - } catch {} - // runnerWantingSharedResults subject is now unsubscribed - // the test listener is still subscribed - expect(subject.observers.length).toBe(1); - }); - }); }); diff --git a/public/app/features/dashboard/state/PanelQueryRunner.ts b/public/app/features/dashboard/state/PanelQueryRunner.ts index c6b4e6bd617..a8a5cf1f411 100644 --- a/public/app/features/dashboard/state/PanelQueryRunner.ts +++ b/public/app/features/dashboard/state/PanelQueryRunner.ts @@ -1,20 +1,19 @@ // Libraries -import cloneDeep from 'lodash/cloneDeep'; -import throttle from 'lodash/throttle'; -import { Subject, Unsubscribable, PartialObserver } from 'rxjs'; +import { cloneDeep } from 'lodash'; +import { ReplaySubject, Unsubscribable, Observable } from 'rxjs'; +import { map } from 'rxjs/operators'; // Services & Utils +import { config } from 'app/core/config'; import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; import kbn from 'app/core/utils/kbn'; import templateSrv from 'app/features/templating/template_srv'; -import { PanelQueryState } from './PanelQueryState'; -import { isSharedDashboardQuery, SharedQueryRunner } from 'app/plugins/datasource/dashboard/SharedQueryRunner'; +import { runRequest, preProcessPanelData } from './runRequest'; +import { runSharedRequest, isSharedDashboardQuery } from '../../../plugins/datasource/dashboard'; // Types import { PanelData, DataQuery, DataQueryRequest, DataSourceApi, DataSourceJsonData } from '@grafana/ui'; - -import { TimeRange, DataTransformerConfig, transformDataFrame, toLegacyResponseData, ScopedVars } from '@grafana/data'; -import config from 'app/core/config'; +import { TimeRange, DataTransformerConfig, transformDataFrame, ScopedVars } from '@grafana/data'; export interface QueryRunnerOptions< TQuery extends DataQuery = DataQuery, @@ -36,109 +35,45 @@ export interface QueryRunnerOptions< transformations?: DataTransformerConfig[]; } -export enum PanelQueryRunnerFormat { - frames = 'frames', - legacy = 'legacy', - both = 'both', -} - let counter = 100; function getNextRequestId() { return 'Q' + counter++; } export class PanelQueryRunner { - private subject?: Subject; - - private state = new PanelQueryState(); + private subject?: ReplaySubject; + private subscription?: Unsubscribable; private transformations?: DataTransformerConfig[]; - // Listen to another panel for changes - private sharedQueryRunner: SharedQueryRunner; - - constructor(private panelId: number) { - this.state.onStreamingDataUpdated = this.onStreamingDataUpdated; - this.subject = new Subject(); - } - - getPanelId() { - return this.panelId; + constructor() { + this.subject = new ReplaySubject(1); } /** - * Get the last result -- optionally skip the transformation + * Returns an observable that subscribes to the shared multi-cast subject (that reply last result). */ - // TODO: add tests - getCurrentData(transform = true): PanelData { - const v = this.state.validateStreamsAndGetPanelData(); - const transformData = config.featureToggles.transformations && transform; - const hasTransformations = this.transformations && this.transformations.length; - - if (transformData && hasTransformations) { - const processed = transformDataFrame(this.transformations, v.series); - return { - ...v, - series: processed, - legacy: processed.map(p => toLegacyResponseData(p)), - }; + getData(transform = true): Observable { + if (transform) { + return this.subject.pipe( + map((data: PanelData) => { + if (this.hasTransformations()) { + const newSeries = transformDataFrame(this.transformations, data.series); + return { ...data, series: newSeries }; + } + return data; + }) + ); } - return v; + // Just pass it directly + return this.subject.pipe(); } - /** - * Listen for updates to the PanelData. If a query has already run for this panel, - * the results will be immediatly passed to the observer - */ - subscribe(observer: PartialObserver, format = PanelQueryRunnerFormat.frames): Unsubscribable { - if (format === PanelQueryRunnerFormat.legacy) { - this.state.sendLegacy = true; - } else if (format === PanelQueryRunnerFormat.both) { - this.state.sendFrames = true; - this.state.sendLegacy = true; - } else { - this.state.sendFrames = true; - } - - // Send the last result - if (this.state.isStarted()) { - // Force check formats again? - this.state.getDataAfterCheckingFormats(); - observer.next(this.getCurrentData()); // transformed - } - - return this.subject.subscribe(observer); + hasTransformations() { + return config.featureToggles.transformations && this.transformations && this.transformations.length > 0; } - /** - * Subscribe one runner to another - */ - chain(runner: PanelQueryRunner): Unsubscribable { - const { sendLegacy, sendFrames } = runner.state; - let format = sendFrames ? PanelQueryRunnerFormat.frames : PanelQueryRunnerFormat.legacy; - - if (sendLegacy) { - format = PanelQueryRunnerFormat.both; - } - - return this.subscribe(runner.subject, format); - } - - /** - * Change the current transformation and notify all listeners - * Should be used only by panel editor to update the transformers - */ - setTransform = (transformations?: DataTransformerConfig[]) => { - this.transformations = transformations; - - if (this.state.isStarted()) { - this.onStreamingDataUpdated(); - } - }; - - async run(options: QueryRunnerOptions): Promise { - const { state } = this; - + async run(options: QueryRunnerOptions) { const { queries, timezone, @@ -152,18 +87,12 @@ export class PanelQueryRunner { maxDataPoints, scopedVars, minInterval, - delayStateNotification, + // delayStateNotification, } = options; - // Support shared queries if (isSharedDashboardQuery(datasource)) { - if (!this.sharedQueryRunner) { - this.sharedQueryRunner = new SharedQueryRunner(this); - } - return this.sharedQueryRunner.process(options); - } else if (this.sharedQueryRunner) { - this.sharedQueryRunner.disconnect(); - this.sharedQueryRunner = null; + this.pipeToSubject(runSharedRequest(options)); + return; } const request: DataQueryRequest = { @@ -185,8 +114,6 @@ export class PanelQueryRunner { // Add deprecated property (request as any).rangeRaw = timeRange.raw; - let loadingStateTimeoutId = 0; - try { const ds = await getDataSource(datasource, request.scopedVars); @@ -215,54 +142,30 @@ export class PanelQueryRunner { request.interval = norm.interval; request.intervalMs = norm.intervalMs; - // Check if we can reuse the already issued query - const active = state.getActiveRunner(); - if (active) { - if (state.isSameQuery(ds, request)) { - // Maybe cancel if it has run too long? - console.log('Trying to execute query while last one has yet to complete, returning same promise'); - return active; - } else { - state.cancel('Query Changed while running'); - } - } - - // Send a loading status event on slower queries - loadingStateTimeoutId = window.setTimeout(() => { - if (state.getActiveRunner()) { - this.subject.next(this.state.validateStreamsAndGetPanelData()); - } - }, delayStateNotification || 500); - - this.transformations = options.transformations; - - const data = await state.execute(ds, request); - // Clear the delayed loading state timeout - clearTimeout(loadingStateTimeoutId); - - // Broadcast results - this.subject.next(this.getCurrentData()); - return data; + this.pipeToSubject(runRequest(ds, request)); } catch (err) { - clearTimeout(loadingStateTimeoutId); - - const data = state.setError(err); - this.subject.next(data); - return data; + console.log('PanelQueryRunner Error', err); } } - /** - * Called after every streaming event. This should be throttled so we - * avoid accidentally overwhelming the browser - */ - onStreamingDataUpdated = throttle( - () => { - this.subject.next(this.getCurrentData()); - }, - 50, - { trailing: true, leading: true } - ); + private pipeToSubject(observable: Observable) { + if (this.subscription) { + this.subscription.unsubscribe(); + } + + // Makes sure everything is a proper DataFrame + const prepare = preProcessPanelData(); + + this.subscription = observable.subscribe({ + next: (data: PanelData) => { + this.subject.next(prepare(data)); + }, + }); + } + + setTransformations(transformations?: DataTransformerConfig[]) { + this.transformations = transformations; + } /** * Called when the panel is closed @@ -273,17 +176,10 @@ export class PanelQueryRunner { this.subject.complete(); } - // Will cancel and disconnect any open requets - this.state.cancel('destroy'); + if (this.subscription) { + this.subscription.unsubscribe(); + } } - - setState = (state: PanelQueryState) => { - this.state = state; - }; - - getState = () => { - return this.state; - }; } async function getDataSource( diff --git a/public/app/features/dashboard/state/PanelQueryState.test.ts b/public/app/features/dashboard/state/PanelQueryState.test.ts deleted file mode 100644 index 8f6d57cfafc..00000000000 --- a/public/app/features/dashboard/state/PanelQueryState.test.ts +++ /dev/null @@ -1,238 +0,0 @@ -import { toDataQueryError, PanelQueryState, getProcessedDataFrames } from './PanelQueryState'; -import { MockDataSourceApi } from 'test/mocks/datasource_srv'; -import { LoadingState, getDataFrameRow } from '@grafana/data'; -import { DataQueryResponse, DataQueryRequest, DataQuery } from '@grafana/ui'; -import { getQueryOptions } from 'test/helpers/getQueryOptions'; - -describe('PanelQueryState', () => { - it('converts anythign to an error', () => { - let err = toDataQueryError(undefined); - expect(err.message).toEqual('Query error'); - - err = toDataQueryError('STRING ERRROR'); - expect(err.message).toEqual('STRING ERRROR'); - - err = toDataQueryError({ message: 'hello' }); - expect(err.message).toEqual('hello'); - }); - - it('keeps track of running queries', async () => { - const state = new PanelQueryState(); - expect(state.getActiveRunner()).toBeFalsy(); - let hasRun = false; - const dsRunner = new Promise((resolve, reject) => { - // The status should be running when we get here - expect(state.getActiveRunner()).toBeTruthy(); - resolve({ data: ['x', 'y'] }); - hasRun = true; - }); - const ds = new MockDataSourceApi('test'); - ds.queryResolver = dsRunner; - - // should not actually run for an empty query - let empty = await state.execute(ds, getQueryOptions({})); - expect(state.getActiveRunner()).toBeFalsy(); - expect(empty.series.length).toBe(0); - expect(hasRun).toBeFalsy(); - - const query = getQueryOptions({ - targets: [{ hide: true, refId: 'X' }, { hide: true, refId: 'Y' }, { hide: true, refId: 'Z' }], - }); - - empty = await state.execute(ds, query); - // should not run any hidden queries' - expect(state.getActiveRunner()).toBeFalsy(); - expect(empty.series.length).toBe(0); - expect(hasRun).toBeFalsy(); - - // Check for the same query - expect(state.isSameQuery(ds, query)).toBeTruthy(); - - // Check for differnet queries - expect(state.isSameQuery(new MockDataSourceApi('test'), query)).toBeFalsy(); - expect(state.isSameQuery(ds, getQueryOptions({ targets: [{ refId: 'differnet' }] }))).toBeFalsy(); - }); -}); - -describe('When cancelling request', () => { - it('Should call rejector', () => { - const state = new PanelQueryState(); - state.request = {} as DataQueryRequest; - (state as any).rejector = (obj: any) => { - expect(obj.cancelled).toBe(true); - expect(obj.message).toBe('OHH'); - }; - - state.cancel('OHH'); - }); -}); - -describe('getProcessedDataFrame', () => { - it('converts timeseries to table skipping nulls', () => { - const input1 = { - target: 'Field Name', - datapoints: [[100, 1], [200, 2]], - }; - const input2 = { - // without target - target: '', - datapoints: [[100, 1], [200, 2]], - }; - const data = getProcessedDataFrames([null, input1, input2, null, null]); - expect(data.length).toBe(2); - expect(data[0].fields[0].name).toBe(input1.target); - - const cmp = [getDataFrameRow(data[0], 0), getDataFrameRow(data[0], 1)]; - expect(cmp).toEqual(input1.datapoints); - - // Default name - expect(data[1].fields[0].name).toEqual('Value'); - - // Every colun should have a name and a type - for (const table of data) { - for (const field of table.fields) { - expect(field.name).toBeDefined(); - expect(field.type).toBeDefined(); - } - } - }); - - it('supports null values from query OK', () => { - expect(getProcessedDataFrames([null, null, null, null])).toEqual([]); - expect(getProcessedDataFrames(undefined)).toEqual([]); - expect(getProcessedDataFrames((null as unknown) as any[])).toEqual([]); - expect(getProcessedDataFrames([])).toEqual([]); - }); -}); - -function makeSeriesStub(refId: string) { - return { - fields: [{ name: undefined }], - refId, - } as any; -} - -describe('stream handling', () => { - const state = new PanelQueryState(); - state.onStreamingDataUpdated = () => { - // nothing - }; - state.request = { - requestId: '123', - range: { - raw: { - from: 123, // if string it gets revaluated - }, - }, - } as any; - state.response = { - state: LoadingState.Done, - series: [makeSeriesStub('A'), makeSeriesStub('B')], - }; - - it('gets the response', () => { - const data = state.validateStreamsAndGetPanelData(); - expect(data.series.length).toBe(2); - expect(data.state).toBe(LoadingState.Done); - expect(data.series[0].refId).toBe('A'); - }); - - it('adds a stream event', () => { - // Post a stream event - state.dataStreamObserver({ - state: LoadingState.Loading, - key: 'C', - request: state.request, // From the same request - data: [makeSeriesStub('C')], - unsubscribe: () => {}, - }); - expect(state.streams.length).toBe(1); - - const data = state.validateStreamsAndGetPanelData(); - expect(data.series.length).toBe(3); - expect(data.state).toBe(LoadingState.Streaming); - expect(data.series[2].refId).toBe('C'); - }); - - it('add another stream event (with a differnet key)', () => { - // Post a stream event - state.dataStreamObserver({ - state: LoadingState.Loading, - key: 'D', - request: state.request, // From the same request - data: [makeSeriesStub('D')], - unsubscribe: () => {}, - }); - expect(state.streams.length).toBe(2); - - const data = state.validateStreamsAndGetPanelData(); - expect(data.series.length).toBe(4); - expect(data.state).toBe(LoadingState.Streaming); - expect(data.series[3].refId).toBe('D'); - }); - - it('replace the first stream value, but keep the order', () => { - // Post a stream event - state.dataStreamObserver({ - state: LoadingState.Loading, - key: 'C', // The key to replace previous index 2 - request: state.request, // From the same request - data: [makeSeriesStub('X')], - unsubscribe: () => {}, - }); - expect(state.streams.length).toBe(2); - - const data = state.validateStreamsAndGetPanelData(); - expect(data.series[2].refId).toBe('X'); - }); - - it('ignores streams from a differnet request', () => { - // Post a stream event - state.dataStreamObserver({ - state: LoadingState.Loading, - key: 'Z', // Note with key 'A' it would still overwrite - request: { - ...state.request, - requestId: 'XXX', // Different request and id - } as any, - data: [makeSeriesStub('C')], - unsubscribe: () => {}, - }); - - expect(state.streams.length).toBe(2); // no change - const data = state.validateStreamsAndGetPanelData(); - expect(data.series.length).toBe(4); - }); - - it('removes streams when the query changes', () => { - state.request = { - ...state.request, - requestId: 'somethine else', - } as any; - state.response = { - state: LoadingState.Done, - series: [makeSeriesStub('F')], - }; - expect(state.streams.length).toBe(2); // unchanged - - const data = state.validateStreamsAndGetPanelData(); - expect(data.series.length).toBe(1); - expect(data.series[0].refId).toBe('F'); - expect(state.streams.length).toBe(0); // no streams - }); - - it('should close streams on error', () => { - // Post a stream event - state.dataStreamObserver({ - state: LoadingState.Error, - key: 'C', - error: { message: 'EEEEE' }, - data: [], - request: state.request, - unsubscribe: () => {}, - }); - - expect(state.streams.length).toBe(0); - expect(state.response.state).toBe(LoadingState.Error); - }); -}); diff --git a/public/app/features/dashboard/state/PanelQueryState.ts b/public/app/features/dashboard/state/PanelQueryState.ts deleted file mode 100644 index 6add79847e8..00000000000 --- a/public/app/features/dashboard/state/PanelQueryState.ts +++ /dev/null @@ -1,377 +0,0 @@ -// Libraries -import { isArray, isEqual, isString } from 'lodash'; -// Utils & Services -import { getBackendSrv } from 'app/core/services/backend_srv'; -import { - dateMath, - guessFieldTypes, - LoadingState, - toLegacyResponseData, - DataFrame, - toDataFrame, - isDataFrame, -} from '@grafana/data'; -// Types -import { - DataSourceApi, - DataQueryRequest, - PanelData, - DataQueryError, - DataStreamObserver, - DataStreamState, - DataQueryResponseData, -} from '@grafana/ui'; - -export class PanelQueryState { - // The current/last running request - request = { - startTime: 0, - endTime: 1000, // Somethign not zero - } as DataQueryRequest; - - // The result back from the datasource query - response = { - state: LoadingState.NotStarted, - series: [], - } as PanelData; - - // Active stream results - streams: DataStreamState[] = []; - - sendFrames = false; - sendLegacy = false; - - // A promise for the running query - private executor?: Promise = null; - private rejector = (reason?: any) => {}; - private datasource: DataSourceApi = {} as any; - - isFinished(state: LoadingState) { - return state === LoadingState.Done || state === LoadingState.Error; - } - - isStarted() { - return this.response.state !== LoadingState.NotStarted; - } - - isSameQuery(ds: DataSourceApi, req: DataQueryRequest) { - if (ds !== this.datasource) { - return false; - } - - // For now just check that the targets look the same - return isEqual(this.request.targets, req.targets); - } - - /** - * Return the currently running query - */ - getActiveRunner(): Promise | undefined { - return this.executor; - } - - cancel(reason: string) { - const { request } = this; - this.executor = null; - - try { - // If no endTime the call to datasource.query did not complete - // call rejector to reject the executor promise - if (!request.endTime) { - request.endTime = Date.now(); - this.rejector({ cancelled: true, message: reason }); - } - - // Cancel any open HTTP request with the same ID - if (request.requestId) { - getBackendSrv().resolveCancelerIfExists(request.requestId); - } - } catch (err) { - console.log('Error canceling request', err); - } - - // Close any open streams - this.closeStreams(true); - } - - execute(ds: DataSourceApi, req: DataQueryRequest): Promise { - this.request = { - ...req, - startTime: Date.now(), - }; - this.datasource = ds; - - // Return early if there are no queries to run - if (!req.targets.length) { - console.log('No queries, so return early'); - this.request.endTime = Date.now(); - this.closeStreams(); - return Promise.resolve( - (this.response = { - state: LoadingState.Done, - series: [], // Clear the data - legacy: [], - }) - ); - } - - // Set the loading state immediately - this.response.state = LoadingState.Loading; - this.executor = new Promise((resolve, reject) => { - this.rejector = reject; - - return ds - .query(this.request, this.dataStreamObserver) - .then(resp => { - if (!isArray(resp.data)) { - throw new Error(`Expected response data to be array, got ${typeof resp.data}.`); - } - - this.request.endTime = Date.now(); - this.executor = null; - - // Make sure we send something back -- called run() w/o subscribe! - if (!(this.sendFrames || this.sendLegacy)) { - this.sendFrames = true; - } - - // Save the result state - this.response = { - state: LoadingState.Done, - request: this.request, - series: this.sendFrames ? getProcessedDataFrames(resp.data) : [], - legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined, - }; - resolve(this.validateStreamsAndGetPanelData()); - }) - .catch(err => { - this.executor = null; - resolve(this.setError(err)); - }); - }); - - return this.executor; - } - - // Send a notice when the stream has updated the current model - onStreamingDataUpdated: () => void; - - // This gets all stream events and keeps track of them - // it will then delegate real changes to the PanelQueryRunner - dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => { - // Streams only work with the 'series' format - this.sendFrames = true; - - if (stream.state === LoadingState.Error) { - this.setError(stream.error); - this.onStreamingDataUpdated(); - return; - } - - // Add the stream to our list - let found = false; - const active = this.streams.map(s => { - if (s.key === stream.key) { - found = true; - return stream; - } - return s; - }); - - if (!found) { - if (shouldDisconnect(this.request, stream)) { - console.log('Got stream update from old stream, unsubscribing'); - stream.unsubscribe(); - return; - } - active.push(stream); - } - - this.streams = active; - this.onStreamingDataUpdated(); - }; - - closeStreams(keepSeries = false) { - if (!this.streams.length) { - return; - } - - const series: DataFrame[] = []; - - for (const stream of this.streams) { - if (stream.data) { - series.push.apply(series, stream.data); - } - - try { - stream.unsubscribe(); - } catch { - console.log('Failed to unsubscribe to stream'); - } - } - - this.streams = []; - - // Move the series from streams to the response - if (keepSeries) { - const { response } = this; - this.response = { - ...response, - series: [ - ...response.series, - ...series, // Append the streamed series - ], - }; - } - } - - /** - * This is called before broadcasting data to listeners. Given that - * stream events can happen at any point, we need to make sure to - * only return data from active streams. - */ - validateStreamsAndGetPanelData(): PanelData { - const { response, streams, request } = this; - - // When not streaming, return the response + request - if (!streams.length) { - return { - ...response, - request: request, - }; - } - - let done = this.isFinished(response.state); - const series = [...response.series]; - const active: DataStreamState[] = []; - - for (const stream of this.streams) { - if (shouldDisconnect(request, stream)) { - console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam'); - stream.unsubscribe(); - continue; - } - - active.push(stream); - series.push.apply(series, stream.data); - - if (!this.isFinished(stream.state)) { - done = false; - } - } - - this.streams = active; - - // Update the time range - let timeRange = this.request.range; - if (isString(timeRange.raw.from)) { - timeRange = { - from: dateMath.parse(timeRange.raw.from, false), - to: dateMath.parse(timeRange.raw.to, true), - raw: timeRange.raw, - }; - } - - return { - state: done ? LoadingState.Done : LoadingState.Streaming, - // This should not be needed but unfortunately Prometheus datasource sends non DataFrame here bypassing the - // typings - series: this.sendFrames ? getProcessedDataFrames(series) : [], - legacy: this.sendLegacy ? translateToLegacyData(series) : undefined, - request: { - ...this.request, - range: timeRange, // update the time range - }, - }; - } - - /** - * Make sure all requested formats exist on the data - */ - getDataAfterCheckingFormats(): PanelData { - const { response, sendLegacy, sendFrames } = this; - if (sendLegacy && (!response.legacy || !response.legacy.length)) { - response.legacy = response.series.map(v => toLegacyResponseData(v)); - } - if (sendFrames && !response.series.length && response.legacy) { - response.series = response.legacy.map(v => toDataFrame(v)); - } - return this.validateStreamsAndGetPanelData(); - } - - setError(err: any): PanelData { - if (!this.request.endTime) { - this.request.endTime = Date.now(); - } - this.closeStreams(true); - this.response = { - ...this.response, // Keep any existing data - state: LoadingState.Error, - error: toDataQueryError(err), - }; - return this.validateStreamsAndGetPanelData(); - } -} - -export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) { - // It came from the same the same request, so keep it - if (source === state.request || state.request.requestId.startsWith(source.requestId)) { - return false; - } - - // We should be able to check that it is the same query regardless of - // if it came from the same request. This will be important for #16676 - - return true; -} - -export function toDataQueryError(err: any): DataQueryError { - const error = (err || {}) as DataQueryError; - if (!error.message) { - if (typeof err === 'string' || err instanceof String) { - return { message: err } as DataQueryError; - } - - let message = 'Query error'; - if (error.message) { - message = error.message; - } else if (error.data && error.data.message) { - message = error.data.message; - } else if (error.data && error.data.error) { - message = error.data.error; - } else if (error.status) { - message = `Query error: ${error.status} ${error.statusText}`; - } - error.message = message; - } - return error; -} - -function translateToLegacyData(data: DataQueryResponseData) { - return data.map((v: any) => { - if (isDataFrame(v)) { - return toLegacyResponseData(v); - } - return v; - }); -} - -/** - * All panels will be passed tables that have our best guess at colum type set - * - * This is also used by PanelChrome for snapshot support - */ -export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataFrame[] { - if (!isArray(results)) { - return []; - } - - const series: DataFrame[] = []; - for (const r of results) { - if (r) { - series.push(guessFieldTypes(toDataFrame(r))); - } - } - - return series; -} diff --git a/public/app/features/dashboard/state/runRequest.test.ts b/public/app/features/dashboard/state/runRequest.test.ts new file mode 100644 index 00000000000..f305c1bd693 --- /dev/null +++ b/public/app/features/dashboard/state/runRequest.test.ts @@ -0,0 +1,206 @@ +import { DataFrame, LoadingState, dateTime } from '@grafana/data'; +import { PanelData, DataSourceApi, DataQueryRequest, DataQueryResponse } from '@grafana/ui'; +import { Subscriber, Observable, Subscription } from 'rxjs'; +import { runRequest } from './runRequest'; + +jest.mock('app/core/services/backend_srv'); + +class ScenarioCtx { + ds: DataSourceApi; + request: DataQueryRequest; + subscriber: Subscriber; + isUnsubbed = false; + setupFn: () => void = () => {}; + results: PanelData[]; + subscription: Subscription; + wasStarted = false; + error: Error = null; + toStartTime = dateTime(); + fromStartTime = dateTime(); + + reset() { + this.wasStarted = false; + this.isUnsubbed = false; + + this.results = []; + this.request = { + range: { + from: this.toStartTime, + to: this.fromStartTime, + raw: { from: '1h', to: 'now' }, + }, + targets: [ + { + refId: 'A', + }, + ], + } as DataQueryRequest; + + this.ds = { + query: (request: DataQueryRequest) => { + return new Observable(subscriber => { + this.subscriber = subscriber; + this.wasStarted = true; + + if (this.error) { + throw this.error; + } + + return () => { + this.isUnsubbed = true; + }; + }); + }, + } as DataSourceApi; + } + + start() { + this.subscription = runRequest(this.ds, this.request).subscribe({ + next: (data: PanelData) => { + this.results.push(data); + }, + }); + } + + emitPacket(packet: DataQueryResponse) { + this.subscriber.next(packet); + } + + setup(fn: () => void) { + this.setupFn = fn; + } +} + +function runRequestScenario(desc: string, fn: (ctx: ScenarioCtx) => void) { + describe(desc, () => { + const ctx = new ScenarioCtx(); + + beforeEach(() => { + ctx.reset(); + return ctx.setupFn(); + }); + + fn(ctx); + }); +} + +describe('runRequest', () => { + runRequestScenario('with no queries', ctx => { + ctx.setup(() => { + ctx.request.targets = []; + ctx.start(); + }); + + it('should emit empty result with loading state done', () => { + expect(ctx.wasStarted).toBe(false); + expect(ctx.results[0].state).toBe(LoadingState.Done); + }); + }); + + runRequestScenario('After first response', ctx => { + ctx.setup(() => { + ctx.start(); + ctx.emitPacket({ + data: [{ name: 'Data' } as DataFrame], + }); + }); + + it('should emit single result with loading state done', () => { + expect(ctx.wasStarted).toBe(true); + expect(ctx.results.length).toBe(1); + }); + }); + + runRequestScenario('After tree responses, 2 with different keys', ctx => { + ctx.setup(() => { + ctx.start(); + ctx.emitPacket({ + data: [{ name: 'DataA-1' } as DataFrame], + key: 'A', + }); + ctx.emitPacket({ + data: [{ name: 'DataA-2' } as DataFrame], + key: 'A', + }); + ctx.emitPacket({ + data: [{ name: 'DataB-1' } as DataFrame], + key: 'B', + }); + }); + + it('should emit 3 seperate results', () => { + expect(ctx.results.length).toBe(3); + }); + + it('should combine results and return latest data for key A', () => { + expect(ctx.results[2].series).toEqual([{ name: 'DataA-2' }, { name: 'DataB-1' }]); + }); + + it('should have loading state Done', () => { + expect(ctx.results[2].state).toEqual(LoadingState.Done); + }); + }); + + runRequestScenario('After response with state Streaming', ctx => { + ctx.setup(() => { + ctx.start(); + ctx.emitPacket({ + data: [{ name: 'DataA-1' } as DataFrame], + key: 'A', + }); + ctx.emitPacket({ + data: [{ name: 'DataA-2' } as DataFrame], + key: 'A', + state: LoadingState.Streaming, + }); + }); + + it('should have loading state Streaming', () => { + expect(ctx.results[1].state).toEqual(LoadingState.Streaming); + }); + }); + + runRequestScenario('If no response after 250ms', ctx => { + ctx.setup(async () => { + ctx.start(); + await sleep(250); + }); + + it('should emit 1 result with loading state', () => { + expect(ctx.results.length).toBe(1); + expect(ctx.results[0].state).toBe(LoadingState.Loading); + }); + }); + + runRequestScenario('on thrown error', ctx => { + ctx.setup(() => { + ctx.error = new Error('Ohh no'); + ctx.start(); + }); + + it('should emit 1 error result', () => { + expect(ctx.results[0].error.message).toBe('Ohh no'); + expect(ctx.results[0].state).toBe(LoadingState.Error); + }); + }); + + runRequestScenario('If time range is relative', ctx => { + ctx.setup(async () => { + ctx.start(); + // wait a bit + await sleep(20); + + ctx.emitPacket({ data: [{ name: 'DataB-1' } as DataFrame] }); + }); + + it('should update returned request range', () => { + expect(ctx.results[0].request.range.to.valueOf()).not.toBe(ctx.fromStartTime); + }); + }); +}); + +async function sleep(ms: number) { + return new Promise(resolve => { + setTimeout(resolve, ms); + }); +} diff --git a/public/app/features/dashboard/state/runRequest.ts b/public/app/features/dashboard/state/runRequest.ts new file mode 100644 index 00000000000..9a41a060ec9 --- /dev/null +++ b/public/app/features/dashboard/state/runRequest.ts @@ -0,0 +1,211 @@ +// Libraries +import { Observable, of, timer, merge, from } from 'rxjs'; +import { flatten, map as lodashMap, isArray, isString } from 'lodash'; +import { map, catchError, takeUntil, mapTo, share, finalize } from 'rxjs/operators'; +// Utils & Services +import { getBackendSrv } from 'app/core/services/backend_srv'; +// Types +import { + DataSourceApi, + DataQueryRequest, + PanelData, + DataQueryResponse, + DataQueryResponseData, + DataQueryError, +} from '@grafana/ui'; + +import { LoadingState, dateMath, toDataFrame, DataFrame, guessFieldTypes } from '@grafana/data'; + +type MapOfResponsePackets = { [str: string]: DataQueryResponse }; + +interface RunningQueryState { + packets: { [key: string]: DataQueryResponse }; + panelData: PanelData; +} + +/* + * This function should handle composing a PanelData from multiple responses + */ +export function processResponsePacket(packet: DataQueryResponse, state: RunningQueryState): RunningQueryState { + const request = state.panelData.request; + const packets: MapOfResponsePackets = { + ...state.packets, + }; + + packets[packet.key || 'A'] = packet; + + // Update the time range + let timeRange = request.range; + if (isString(timeRange.raw.from)) { + timeRange = { + from: dateMath.parse(timeRange.raw.from, false), + to: dateMath.parse(timeRange.raw.to, true), + raw: timeRange.raw, + }; + } + + const combinedData = flatten( + lodashMap(packets, (packet: DataQueryResponse) => { + return packet.data; + }) + ); + + const panelData = { + state: packet.state || LoadingState.Done, + series: combinedData, + request: { + ...request, + range: timeRange, + }, + }; + + return { packets, panelData }; +} + +/** + * This function handles the excecution of requests & and processes the single or multiple response packets into + * a combined PanelData response. + * It will + * * Merge multiple responses into a single DataFrame array based on the packet key + * * Will emit a loading state if no response after 50ms + * * Cancel any still runnning network requests on unsubscribe (using request.requestId) + */ +export function runRequest(datasource: DataSourceApi, request: DataQueryRequest): Observable { + let state: RunningQueryState = { + panelData: { + state: LoadingState.Loading, + series: [], + request: request, + }, + packets: {}, + }; + + // Return early if there are no queries to run + if (!request.targets.length) { + request.endTime = Date.now(); + state.panelData.state = LoadingState.Done; + return of(state.panelData); + } + + const dataObservable = callQueryMethod(datasource, request).pipe( + // Transform response packets into PanelData with merged results + map((packet: DataQueryResponse) => { + if (!isArray(packet.data)) { + throw new Error(`Expected response data to be array, got ${typeof packet.data}.`); + } + + request.endTime = Date.now(); + + state = processResponsePacket(packet, state); + return state.panelData; + }), + // handle errors + catchError(err => + of({ + ...state.panelData, + state: LoadingState.Error, + error: processQueryError(err), + }) + ), + // finalize is triggered when subscriber unsubscribes + // This makes sure any still running network requests are cancelled + finalize(cancelNetworkRequestsOnUnsubscribe(request)), + // this makes it possible to share this observable in takeUntil + share() + ); + + // If 50ms without a response emit a loading state + // mapTo will translate the timer event into state.panelData (which has state set to loading) + // takeUntil will cancel the timer emit when first response packet is received on the dataObservable + return merge( + timer(200).pipe( + mapTo(state.panelData), + takeUntil(dataObservable) + ), + dataObservable + ); +} + +function cancelNetworkRequestsOnUnsubscribe(req: DataQueryRequest) { + return () => { + getBackendSrv().resolveCancelerIfExists(req.requestId); + }; +} + +export function callQueryMethod(datasource: DataSourceApi, request: DataQueryRequest) { + const returnVal = datasource.query(request); + return from(returnVal); +} + +export function processQueryError(err: any): DataQueryError { + const error = (err || {}) as DataQueryError; + + if (!error.message) { + if (typeof err === 'string' || err instanceof String) { + return { message: err } as DataQueryError; + } + + let message = 'Query error'; + if (error.message) { + message = error.message; + } else if (error.data && error.data.message) { + message = error.data.message; + } else if (error.data && error.data.error) { + message = error.data.error; + } else if (error.status) { + message = `Query error: ${error.status} ${error.statusText}`; + } + error.message = message; + } + + return error; +} + +/** + * All panels will be passed tables that have our best guess at colum type set + * + * This is also used by PanelChrome for snapshot support + */ +export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataFrame[] { + if (!isArray(results)) { + return []; + } + + const dataFrames: DataFrame[] = []; + + for (const result of results) { + const dataFrame = guessFieldTypes(toDataFrame(result)); + + // clear out any cached calcs + for (const field of dataFrame.fields) { + field.calcs = null; + } + + dataFrames.push(dataFrame); + } + + return dataFrames; +} + +export function preProcessPanelData() { + let lastResult: PanelData = null; + + return function mapper(data: PanelData) { + let { series } = data; + + // for loading states with no data, use last result + if (data.state === LoadingState.Loading && series.length === 0) { + if (!lastResult) { + lastResult = data; + } + + return { ...lastResult, state: LoadingState.Loading }; + } + + // Makes sure the data is properly formatted + series = getProcessedDataFrames(series); + + lastResult = { ...data, series }; + return lastResult; + }; +} diff --git a/public/app/features/explore/state/actionTypes.ts b/public/app/features/explore/state/actionTypes.ts index 30c4df246b7..03ce5883906 100644 --- a/public/app/features/explore/state/actionTypes.ts +++ b/public/app/features/explore/state/actionTypes.ts @@ -1,4 +1,5 @@ // Types +import { Unsubscribable } from 'rxjs'; import { Emitter } from 'app/core/core'; import { DataQuery, DataSourceSelectItem, DataSourceApi, QueryFixAction, PanelData } from '@grafana/ui'; @@ -128,6 +129,11 @@ export interface QueryEndedPayload { response: PanelData; } +export interface QueryStoreSubscriptionPayload { + exploreId: ExploreId; + querySubscription: Unsubscribable; +} + export interface HistoryUpdatedPayload { exploreId: ExploreId; history: HistoryItem[]; @@ -307,6 +313,10 @@ export const queryStreamUpdatedAction = actionCreatorFactory( 'explore/QUERY_STREAM_UPDATED' ).create(); +export const queryStoreSubscriptionAction = actionCreatorFactory( + 'explore/QUERY_STORE_SUBSCRIPTION' +).create(); + /** * Remove query row of the given index, as well as associated query results. */ diff --git a/public/app/features/explore/state/actions.ts b/public/app/features/explore/state/actions.ts index 675e465dd56..f5fb5091082 100644 --- a/public/app/features/explore/state/actions.ts +++ b/public/app/features/explore/state/actions.ts @@ -1,4 +1,5 @@ // Libraries +import { map } from 'rxjs/operators'; // Services & Utils import store from 'app/core/store'; import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; @@ -63,21 +64,20 @@ import { loadExploreDatasources, changeModeAction, scanStopAction, - queryStartAction, setUrlReplacedAction, changeRangeAction, historyUpdatedAction, - queryEndedAction, queryStreamUpdatedAction, + queryStoreSubscriptionAction, clearOriginAction, } from './actionTypes'; import { ActionOf, ActionCreator } from 'app/core/redux/actionCreatorFactory'; import { getTimeZone } from 'app/features/profile/state/selectors'; import { offOption } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; import { getShiftedTimeRange } from 'app/core/utils/timePicker'; -import _ from 'lodash'; import { updateLocation } from '../../../core/actions'; import { getTimeSrv } from '../../dashboard/services/TimeSrv'; +import { runRequest, preProcessPanelData } from '../../dashboard/state/runRequest'; /** * Updates UI state and save it to the URL @@ -436,12 +436,14 @@ export function runQueries(exploreId: ExploreId): ThunkResult { datasourceError, containerWidth, isLive: live, - queryState, queryIntervals, range, scanning, + querySubscription, history, mode, + showingGraph, + showingTable, } = exploreItemState; if (datasourceError) { @@ -459,10 +461,7 @@ export function runQueries(exploreId: ExploreId): ThunkResult { // but we're using the datasource interval limit for now const interval = datasourceInstance.interval; - stopQueryState(queryState, 'New request issued'); - - queryState.sendFrames = true; - queryState.sendLegacy = true; + stopQueryState(querySubscription); const queryOptions = { interval, @@ -470,32 +469,32 @@ export function runQueries(exploreId: ExploreId): ThunkResult { // TODO: not sure if this makes sense for normal query when using both graph and table maxDataPoints: mode === ExploreMode.Logs ? 1000 : containerWidth, live, + showingGraph, + showingTable, }; + const datasourceId = datasourceInstance.meta.id; const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning); - queryState.onStreamingDataUpdated = () => { - const response = queryState.validateStreamsAndGetPanelData(); - dispatch(queryStreamUpdatedAction({ exploreId, response })); - }; + let firstResponse = true; - dispatch(queryStartAction({ exploreId })); - - queryState - .execute(datasourceInstance, transaction.request) - .then((response: PanelData) => { - if (!response.error) { + const newQuerySub = runRequest(datasourceInstance, transaction.request) + .pipe(map(preProcessPanelData())) + .subscribe((data: PanelData) => { + if (!data.error && firstResponse) { // Side-effect: Saving history in localstorage const nextHistory = updateHistory(history, datasourceId, queries); dispatch(historyUpdatedAction({ exploreId, history: nextHistory })); + dispatch(stateSave()); } - dispatch(queryEndedAction({ exploreId, response })); - dispatch(stateSave()); + firstResponse = false; + + dispatch(queryStreamUpdatedAction({ exploreId, response: data })); // Keep scanning for results if this was the last scanning transaction if (getState().explore[exploreId].scanning) { - if (_.size(response.series) === 0) { + if (data.state === LoadingState.Done && data.series.length === 0) { const range = getShiftedTimeRange(-1, getState().explore[exploreId].range); dispatch(updateTime({ exploreId, absoluteRange: range })); dispatch(runQueries(exploreId)); @@ -504,15 +503,9 @@ export function runQueries(exploreId: ExploreId): ThunkResult { dispatch(scanStopAction({ exploreId })); } } - }) - .catch(error => { - dispatch( - queryEndedAction({ - exploreId, - response: { error, legacy: [], series: [], request: transaction.request, state: LoadingState.Error }, - }) - ); }); + + dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub })); }; } diff --git a/public/app/features/explore/state/reducers.test.ts b/public/app/features/explore/state/reducers.test.ts index 18005d2def4..bf09628ef70 100644 --- a/public/app/features/explore/state/reducers.test.ts +++ b/public/app/features/explore/state/reducers.test.ts @@ -18,6 +18,8 @@ import { splitCloseAction, changeModeAction, scanStopAction, + toggleGraphAction, + toggleTableAction, } from './actionTypes'; import { Reducer } from 'redux'; import { ActionOf } from 'app/core/redux/actionCreatorFactory'; @@ -26,14 +28,12 @@ import { serializeStateToUrlParam } from 'app/core/utils/explore'; import TableModel from 'app/core/table_model'; import { DataSourceApi, DataQuery } from '@grafana/ui'; import { LogsModel, LogsDedupStrategy } from '@grafana/data'; -import { PanelQueryState } from '../../dashboard/state/PanelQueryState'; describe('Explore item reducer', () => { describe('scanning', () => { it('should start scanning', () => { const initalState = { ...makeExploreItemState(), - queryState: null as PanelQueryState, scanning: false, }; @@ -42,14 +42,12 @@ describe('Explore item reducer', () => { .whenActionIsDispatched(scanStartAction({ exploreId: ExploreId.left })) .thenStateShouldEqual({ ...makeExploreItemState(), - queryState: null as PanelQueryState, scanning: true, }); }); it('should stop scanning', () => { const initalState = { ...makeExploreItemState(), - queryState: null as PanelQueryState, scanning: true, scanRange: {}, }; @@ -59,7 +57,6 @@ describe('Explore item reducer', () => { .whenActionIsDispatched(scanStopAction({ exploreId: ExploreId.left })) .thenStateShouldEqual({ ...makeExploreItemState(), - queryState: null as PanelQueryState, scanning: false, scanRange: undefined, }); @@ -175,6 +172,30 @@ describe('Explore item reducer', () => { }); }); }); + + describe('toggling panels', () => { + describe('when toggleGraphAction is dispatched', () => { + it('then it should set correct state', () => { + reducerTester() + .givenReducer(itemReducer, { graphResult: [] }) + .whenActionIsDispatched(toggleGraphAction({ exploreId: ExploreId.left })) + .thenStateShouldEqual({ showingGraph: true, graphResult: [] }) + .whenActionIsDispatched(toggleGraphAction({ exploreId: ExploreId.left })) + .thenStateShouldEqual({ showingGraph: false, graphResult: null }); + }); + }); + + describe('when toggleTableAction is dispatched', () => { + it('then it should set correct state', () => { + reducerTester() + .givenReducer(itemReducer, { tableResult: {} }) + .whenActionIsDispatched(toggleTableAction({ exploreId: ExploreId.left })) + .thenStateShouldEqual({ showingTable: true, tableResult: {} }) + .whenActionIsDispatched(toggleTableAction({ exploreId: ExploreId.left })) + .thenStateShouldEqual({ showingTable: false, tableResult: new TableModel() }); + }); + }); + }); }); export const setup = (urlStateOverrides?: any) => { diff --git a/public/app/features/explore/state/reducers.ts b/public/app/features/explore/state/reducers.ts index 7828a76fff9..1d8b0d2471f 100644 --- a/public/app/features/explore/state/reducers.ts +++ b/public/app/features/explore/state/reducers.ts @@ -8,10 +8,9 @@ import { sortLogsResult, stopQueryState, refreshIntervalToSortOrder, - instanceOfDataQueryError, } from 'app/core/utils/explore'; import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore'; -import { LoadingState } from '@grafana/data'; +import { LoadingState, toLegacyResponseData } from '@grafana/data'; import { DataQuery, DataSourceApi, PanelData } from '@grafana/ui'; import { HigherOrderAction, @@ -29,9 +28,6 @@ import { queryStartAction, changeRangeAction, clearOriginAction, -} from './actionTypes'; - -import { addQueryRowAction, changeQueryAction, changeSizeAction, @@ -53,17 +49,17 @@ import { toggleLogLevelAction, changeLoadingStateAction, resetExploreAction, - queryEndedAction, queryStreamUpdatedAction, QueryEndedPayload, + queryStoreSubscriptionAction, setPausedStateAction, + toggleGraphAction, } from './actionTypes'; import { reducerFactory, ActionOf } from 'app/core/redux'; import { updateLocation } from 'app/core/actions/location'; import { LocationUpdate } from '@grafana/runtime'; import TableModel from 'app/core/table_model'; import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; -import { PanelQueryState, toDataQueryError } from '../../dashboard/state/PanelQueryState'; import { ResultProcessor } from '../utils/ResultProcessor'; export const DEFAULT_RANGE = { @@ -121,7 +117,6 @@ export const makeExploreItemState = (): ExploreItemState => ({ isLive: false, isPaused: false, urlReplaced: false, - queryState: new PanelQueryState(), queryResponse: createEmptyQueryResponse(), }); @@ -129,7 +124,6 @@ export const createEmptyQueryResponse = (): PanelData => ({ state: LoadingState.NotStarted, request: null, series: [], - legacy: null, error: null, }); @@ -203,8 +197,9 @@ export const itemReducer = reducerFactory({} as ExploreItemSta const live = isLive(refreshInterval); const sortOrder = refreshIntervalToSortOrder(refreshInterval); const logsResult = sortLogsResult(state.logsResult, sortOrder); + if (isLive(state.refreshInterval) && !live) { - stopQueryState(state.queryState, 'Live streaming stopped'); + stopQueryState(state.querySubscription); } return { @@ -225,7 +220,7 @@ export const itemReducer = reducerFactory({} as ExploreItemSta filter: clearQueriesAction, mapper: (state): ExploreItemState => { const queries = ensureQueries(); - stopQueryState(state.queryState, 'Queries cleared'); + stopQueryState(state.querySubscription); return { ...state, queries: queries.slice(), @@ -284,7 +279,7 @@ export const itemReducer = reducerFactory({} as ExploreItemSta // Custom components const StartPage = datasourceInstance.components.ExploreStartPage; - stopQueryState(state.queryState, 'Datasource changed'); + stopQueryState(state.querySubscription); return { ...state, @@ -440,15 +435,26 @@ export const itemReducer = reducerFactory({} as ExploreItemSta return { ...state, ...action.payload }; }, }) + .addMapper({ + filter: toggleGraphAction, + mapper: (state): ExploreItemState => { + const showingGraph = !state.showingGraph; + if (showingGraph) { + return { ...state, showingGraph }; + } + + return { ...state, showingGraph, graphResult: null }; + }, + }) .addMapper({ filter: toggleTableAction, mapper: (state): ExploreItemState => { const showingTable = !state.showingTable; if (showingTable) { - return { ...state }; + return { ...state, showingTable }; } - return { ...state, tableResult: new TableModel() }; + return { ...state, showingTable, tableResult: new TableModel() }; }, }) .addMapper({ @@ -566,10 +572,13 @@ export const itemReducer = reducerFactory({} as ExploreItemSta }, }) .addMapper({ - //queryStreamUpdatedAction - filter: queryEndedAction, + filter: queryStoreSubscriptionAction, mapper: (state, action): ExploreItemState => { - return processQueryResponse(state, action); + const { querySubscription } = action.payload; + return { + ...state, + querySubscription, + }; }, }) .addMapper({ @@ -585,7 +594,7 @@ export const processQueryResponse = ( action: ActionOf ): ExploreItemState => { const { response } = action.payload; - const { request, state: loadingState, series, legacy, error } = response; + const { request, state: loadingState, series, error } = response; if (error) { if (error.cancelled) { @@ -595,12 +604,6 @@ export const processQueryResponse = ( // For Angular editors state.eventBridge.emit('data-error', error); - console.error(error); // To help finding problems with query syntax - - if (!instanceOfDataQueryError(error)) { - response.error = toDataQueryError(error); - } - return { ...state, loading: false, @@ -613,19 +616,26 @@ export const processQueryResponse = ( }; } - const latency = request.endTime - request.startTime; + const latency = request.endTime ? request.endTime - request.startTime : 0; const processor = new ResultProcessor(state, series); + const graphResult = processor.getGraphResult() || state.graphResult; // don't replace results until we receive new results + const tableResult = processor.getTableResult() || state.tableResult || new TableModel(); // don't replace results until we receive new results + const logsResult = processor.getLogsResult(); - // For Angular editors - state.eventBridge.emit('data-received', legacy); + // Send legacy data to Angular editors + if (state.datasourceInstance.components.QueryCtrl) { + const legacy = series.map(v => toLegacyResponseData(v)); + + state.eventBridge.emit('data-received', legacy); + } return { ...state, latency, queryResponse: response, - graphResult: processor.getGraphResult(), - tableResult: processor.getTableResult(), - logsResult: processor.getLogsResult(), + graphResult, + tableResult, + logsResult, loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming, showingStartPage: false, update: makeInitialUpdateState(), @@ -757,8 +767,8 @@ export const exploreReducer = (state = initialExploreState, action: HigherOrderA case resetExploreAction.type: { const leftState = state[ExploreId.left]; const rightState = state[ExploreId.right]; - stopQueryState(leftState.queryState, 'Navigated away from Explore'); - stopQueryState(rightState.queryState, 'Navigated away from Explore'); + stopQueryState(leftState.querySubscription); + stopQueryState(rightState.querySubscription); return { ...state, diff --git a/public/app/features/explore/utils/ResultProcessor.test.ts b/public/app/features/explore/utils/ResultProcessor.test.ts index faa649d83ac..9fa187ea45e 100644 --- a/public/app/features/explore/utils/ResultProcessor.test.ts +++ b/public/app/features/explore/utils/ResultProcessor.test.ts @@ -67,20 +67,20 @@ const testContext = (options: any = {}) => { describe('ResultProcessor', () => { describe('constructed without result', () => { describe('when calling getGraphResult', () => { - it('then it should return an empty array', () => { + it('then it should return null', () => { const { resultProcessor } = testContext({ dataFrames: [] }); const theResult = resultProcessor.getGraphResult(); - expect(theResult).toEqual([]); + expect(theResult).toEqual(null); }); }); describe('when calling getTableResult', () => { - it('then it should return an empty TableModel', () => { + it('then it should return null', () => { const { resultProcessor } = testContext({ dataFrames: [] }); const theResult = resultProcessor.getTableResult(); - expect(theResult).toEqual(new TableModel()); + expect(theResult).toEqual(null); }); }); diff --git a/public/app/features/explore/utils/ResultProcessor.ts b/public/app/features/explore/utils/ResultProcessor.ts index 6633e9238cc..45fe31cb010 100644 --- a/public/app/features/explore/utils/ResultProcessor.ts +++ b/public/app/features/explore/utils/ResultProcessor.ts @@ -11,11 +11,15 @@ export class ResultProcessor { getGraphResult(): GraphSeriesXY[] { if (this.state.mode !== ExploreMode.Metrics) { - return []; + return null; } const onlyTimeSeries = this.dataFrames.filter(isTimeSeries); + if (onlyTimeSeries.length === 0) { + return null; + } + return getGraphSeriesModel( onlyTimeSeries, {}, @@ -26,7 +30,7 @@ export class ResultProcessor { getTableResult(): TableModel { if (this.state.mode !== ExploreMode.Metrics) { - return new TableModel(); + return null; } // For now ignore time series @@ -34,6 +38,10 @@ export class ResultProcessor { // Ignore time series only for prometheus const onlyTables = this.dataFrames.filter(frame => !isTimeSeries(frame)); + if (onlyTables.length === 0) { + return null; + } + const tables = onlyTables.map(frame => { const { fields } = frame; const fieldCount = fields.length; diff --git a/public/app/features/panel/metrics_panel_ctrl.ts b/public/app/features/panel/metrics_panel_ctrl.ts index edf93c20461..ee3c81e44f2 100644 --- a/public/app/features/panel/metrics_panel_ctrl.ts +++ b/public/app/features/panel/metrics_panel_ctrl.ts @@ -6,12 +6,11 @@ import { PanelCtrl } from 'app/features/panel/panel_ctrl'; import { getExploreUrl } from 'app/core/utils/explore'; import { applyPanelTimeOverrides, getResolution } from 'app/features/dashboard/utils/panel'; import { ContextSrv } from 'app/core/services/context_srv'; -import { toLegacyResponseData, isDataFrame, TimeRange, LoadingState, DataFrame, toDataFrameDTO } from '@grafana/data'; +import { toLegacyResponseData, TimeRange, LoadingState, DataFrame, toDataFrameDTO } from '@grafana/data'; import { LegacyResponseData, DataSourceApi, PanelData, DataQueryResponse } from '@grafana/ui'; import { Unsubscribable } from 'rxjs'; import { PanelModel } from 'app/features/dashboard/state'; -import { PanelQueryRunnerFormat } from '../dashboard/state/PanelQueryRunner'; class MetricsPanelCtrl extends PanelCtrl { scope: any; @@ -30,7 +29,7 @@ class MetricsPanelCtrl extends PanelCtrl { skipDataOnInit: boolean; dataList: LegacyResponseData[]; querySubscription?: Unsubscribable; - dataFormat = PanelQueryRunnerFormat.legacy; + useDataFrames = false; constructor($scope: any, $injector: any) { super($scope, $injector); @@ -141,22 +140,12 @@ class MetricsPanelCtrl extends PanelCtrl { } } - if (this.dataFormat === PanelQueryRunnerFormat.legacy) { - // The result should already be processed, but just in case - if (!data.legacy) { - data.legacy = data.series.map(v => { - if (isDataFrame(v)) { - return toLegacyResponseData(v); - } - return v; - }); - } - - // Make the results look like they came directly from a <6.2 datasource request - // NOTE: any object other than 'data' is no longer supported supported - this.handleQueryResult({ data: data.legacy }); - } else { + if (this.useDataFrames) { this.handleDataFrames(data.series); + } else { + // Make the results look as if they came directly from a <6.2 datasource request + const legacy = data.series.map(v => toLegacyResponseData(v)); + this.handleQueryResult({ data: legacy }); } }, }; @@ -197,7 +186,7 @@ class MetricsPanelCtrl extends PanelCtrl { const queryRunner = panel.getQueryRunner(); if (!this.querySubscription) { - this.querySubscription = queryRunner.subscribe(this.panelDataObserver, this.dataFormat); + this.querySubscription = queryRunner.getData().subscribe(this.panelDataObserver); } return queryRunner.run({ diff --git a/public/app/plugins/datasource/dashboard/DashboardQueryEditor.tsx b/public/app/plugins/datasource/dashboard/DashboardQueryEditor.tsx index 7434f0cf363..925b55dd6de 100644 --- a/public/app/plugins/datasource/dashboard/DashboardQueryEditor.tsx +++ b/public/app/plugins/datasource/dashboard/DashboardQueryEditor.tsx @@ -9,7 +9,7 @@ import config from 'app/core/config'; import { css } from 'emotion'; import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; import { PanelModel } from 'app/features/dashboard/state'; -import { SHARED_DASHBODARD_QUERY } from './SharedQueryRunner'; +import { SHARED_DASHBODARD_QUERY } from './types'; import { getDashboardSrv } from 'app/features/dashboard/services/DashboardSrv'; import { filterPanelDataToQuery } from 'app/features/dashboard/panel_editor/QueryEditorRow'; diff --git a/public/app/plugins/datasource/dashboard/SharedQueryRunner.ts b/public/app/plugins/datasource/dashboard/SharedQueryRunner.ts deleted file mode 100644 index 1a56b4dac34..00000000000 --- a/public/app/plugins/datasource/dashboard/SharedQueryRunner.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { DataSourceApi, DataQuery, PanelData } from '@grafana/ui'; -import { PanelQueryRunner, QueryRunnerOptions } from 'app/features/dashboard/state/PanelQueryRunner'; -import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState'; -import { DashboardQuery } from './types'; -import { getDashboardSrv } from 'app/features/dashboard/services/DashboardSrv'; -import { Unsubscribable } from 'rxjs'; -import { PanelModel } from 'app/features/dashboard/state'; -import { LoadingState } from '@grafana/data'; - -export const SHARED_DASHBODARD_QUERY = '-- Dashboard --'; - -export function isSharedDashboardQuery(datasource: string | DataSourceApi) { - if (!datasource) { - // default datasource - return false; - } - if (datasource === SHARED_DASHBODARD_QUERY) { - return true; - } - const ds = datasource as DataSourceApi; - return ds.meta && ds.meta.name === SHARED_DASHBODARD_QUERY; -} - -export class SharedQueryRunner { - private containerPanel: PanelModel; - private listenToPanelId: number; - private listenToPanel: PanelModel; - private listenToRunner: PanelQueryRunner; - private subscription: Unsubscribable; - - constructor(private runner: PanelQueryRunner) { - this.containerPanel = getDashboardSrv() - .getCurrent() - .getPanelById(runner.getPanelId()); - } - - process(options: QueryRunnerOptions): Promise { - const panelId = getPanelIdFromQuery(options.queries); - - if (!panelId) { - this.disconnect(); - return getQueryError('Missing panel reference ID'); - } - - // The requested panel changed - if (this.listenToPanelId !== panelId) { - this.disconnect(); - - this.listenToPanel = getDashboardSrv() - .getCurrent() - .getPanelById(panelId); - - if (!this.listenToPanel) { - return getQueryError('Unknown Panel: ' + panelId); - } - - this.listenToPanelId = panelId; - this.listenToRunner = this.listenToPanel.getQueryRunner(); - this.subscription = this.listenToRunner.chain(this.runner); - this.runner.setState(this.listenToRunner.getState()); - console.log('Connecting panel: ', this.containerPanel.id, 'to:', this.listenToPanelId); - } - - // If the target has refreshed recently, use the exising data - const data = this.listenToRunner.getCurrentData(); - if (data.request && data.request.startTime) { - const elapsed = Date.now() - data.request.startTime; - if (elapsed < 150) { - return Promise.resolve(data); - } - } - - // When fullscreen run with the current panel settings - if (this.containerPanel.fullscreen) { - const { datasource, targets } = this.listenToPanel; - const modified = { - ...options, - panelId, - datasource, - queries: targets, - }; - return this.listenToRunner.run(modified); - } else { - this.listenToPanel.refresh(); - } - - return Promise.resolve(data); - } - - disconnect() { - if (this.subscription) { - this.subscription.unsubscribe(); - this.subscription = null; - } - if (this.listenToPanel) { - this.listenToPanel = null; - } - this.listenToPanelId = undefined; - } -} - -function getPanelIdFromQuery(queries: DataQuery[]): number | undefined { - if (!queries || !queries.length) { - return undefined; - } - return (queries[0] as DashboardQuery).panelId; -} - -function getQueryError(msg: string): Promise { - return Promise.resolve({ - state: LoadingState.Error, - series: [], - legacy: [], - error: toDataQueryError(msg), - }); -} diff --git a/public/app/plugins/datasource/dashboard/index.ts b/public/app/plugins/datasource/dashboard/index.ts new file mode 100644 index 00000000000..23645ba6a91 --- /dev/null +++ b/public/app/plugins/datasource/dashboard/index.ts @@ -0,0 +1,3 @@ +export { isSharedDashboardQuery, runSharedRequest } from './runSharedRequest'; +export { DashboardQueryEditor } from './DashboardQueryEditor'; +export { SHARED_DASHBODARD_QUERY } from './types'; diff --git a/public/app/plugins/datasource/dashboard/SharedQueryRunner.test.ts b/public/app/plugins/datasource/dashboard/runSharedRequest.test.ts similarity index 90% rename from public/app/plugins/datasource/dashboard/SharedQueryRunner.test.ts rename to public/app/plugins/datasource/dashboard/runSharedRequest.test.ts index 6ae758ac3e0..2a3e17fb87f 100644 --- a/public/app/plugins/datasource/dashboard/SharedQueryRunner.test.ts +++ b/public/app/plugins/datasource/dashboard/runSharedRequest.test.ts @@ -1,4 +1,4 @@ -import { isSharedDashboardQuery } from './SharedQueryRunner'; +import { isSharedDashboardQuery } from './runSharedRequest'; import { DataSourceApi } from '@grafana/ui'; describe('SharedQueryRunner', () => { diff --git a/public/app/plugins/datasource/dashboard/runSharedRequest.ts b/public/app/plugins/datasource/dashboard/runSharedRequest.ts new file mode 100644 index 00000000000..603cd65a7af --- /dev/null +++ b/public/app/plugins/datasource/dashboard/runSharedRequest.ts @@ -0,0 +1,80 @@ +import { Observable } from 'rxjs'; +import { DataQuery, PanelData, DataSourceApi } from '@grafana/ui'; +import { QueryRunnerOptions } from 'app/features/dashboard/state/PanelQueryRunner'; +import { DashboardQuery } from './types'; +import { getDashboardSrv } from 'app/features/dashboard/services/DashboardSrv'; +import { LoadingState } from '@grafana/data'; +import { SHARED_DASHBODARD_QUERY } from './types'; + +export function isSharedDashboardQuery(datasource: string | DataSourceApi) { + if (!datasource) { + // default datasource + return false; + } + if (datasource === SHARED_DASHBODARD_QUERY) { + return true; + } + const ds = datasource as DataSourceApi; + return ds.meta && ds.meta.name === SHARED_DASHBODARD_QUERY; +} + +export function runSharedRequest(options: QueryRunnerOptions): Observable { + return new Observable(subscriber => { + const dashboard = getDashboardSrv().getCurrent(); + const listenToPanelId = getPanelIdFromQuery(options.queries); + + if (!listenToPanelId) { + subscriber.next(getQueryError('Missing panel reference ID')); + return null; + } + + const currentPanel = dashboard.getPanelById(options.panelId); + const listenToPanel = dashboard.getPanelById(listenToPanelId); + + if (!listenToPanel) { + subscriber.next(getQueryError('Unknown Panel: ' + listenToPanelId)); + return null; + } + + const listenToRunner = listenToPanel.getQueryRunner(); + const subscription = listenToRunner.getData(false).subscribe({ + next: (data: PanelData) => { + console.log('got data from other panel', data); + subscriber.next(data); + }, + }); + + // If we are in fullscreen the other panel will not execute any queries + // So we have to trigger it from here + if (currentPanel.fullscreen) { + const { datasource, targets } = listenToPanel; + const modified = { + ...options, + datasource, + panelId: listenToPanelId, + queries: targets, + }; + listenToRunner.run(modified); + } + + return () => { + console.log('runSharedRequest unsubscribe'); + subscription.unsubscribe(); + }; + }); +} + +function getPanelIdFromQuery(queries: DataQuery[]): number | undefined { + if (!queries || !queries.length) { + return undefined; + } + return (queries[0] as DashboardQuery).panelId; +} + +function getQueryError(msg: string): PanelData { + return { + state: LoadingState.Error, + series: [], + error: { message: msg }, + }; +} diff --git a/public/app/plugins/datasource/dashboard/types.ts b/public/app/plugins/datasource/dashboard/types.ts index 6207165a797..c9c55805339 100644 --- a/public/app/plugins/datasource/dashboard/types.ts +++ b/public/app/plugins/datasource/dashboard/types.ts @@ -1,5 +1,7 @@ import { DataQuery } from '@grafana/ui/src/types'; +export const SHARED_DASHBODARD_QUERY = '-- Dashboard --'; + export interface DashboardQuery extends DataQuery { panelId?: number; } diff --git a/public/app/plugins/datasource/loki/datasource.test.ts b/public/app/plugins/datasource/loki/datasource.test.ts index c4239134382..66507c2211a 100644 --- a/public/app/plugins/datasource/loki/datasource.test.ts +++ b/public/app/plugins/datasource/loki/datasource.test.ts @@ -71,7 +71,7 @@ describe('LokiDatasource', () => { targets: [{ expr: '{} foo', refId: 'B' }], }); - const res = await ds.query(options); + const res = await ds.query(options).toPromise(); const dataFrame = res.data[0] as DataFrame; expect(dataFrame.fields[1].values.get(0)).toBe('hello'); diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index e0cc078c429..f04c5601761 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -5,10 +5,10 @@ import { dateMath, DataFrame, LogRowModel, - LoadingState, DateTime, AnnotationEvent, DataFrameView, + LoadingState, } from '@grafana/data'; import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query'; import LanguageProvider from './language_provider'; @@ -21,7 +21,6 @@ import { DataSourceInstanceSettings, DataQueryError, DataQueryRequest, - DataStreamObserver, DataQueryResponse, AnnotationQueryRequest, } from '@grafana/ui'; @@ -31,6 +30,8 @@ import { BackendSrv } from 'app/core/services/backend_srv'; import { TemplateSrv } from 'app/features/templating/template_srv'; import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore'; import { LiveTarget, LiveStreams } from './live_streams'; +import { Observable, from, merge } from 'rxjs'; +import { map, filter } from 'rxjs/operators'; export const DEFAULT_MAX_LINES = 1000; @@ -167,79 +168,51 @@ export class LokiDatasource extends DataSourceApi { return series; }; - runLiveQueries = (options: DataQueryRequest, observer?: DataStreamObserver) => { - const liveTargets = options.targets - .filter(target => target.expr && !target.hide && target.live) - .map(target => this.prepareLiveTarget(target, options)); - - for (const liveTarget of liveTargets) { - // Reuse an existing stream if one is already running - const stream = this.streams.getStream(liveTarget); - const subscription = stream.subscribe({ - next: (data: DataFrame[]) => { - observer({ - key: `loki-${liveTarget.refId}`, - request: options, - state: LoadingState.Streaming, - data, - unsubscribe: () => { - subscription.unsubscribe(); - }, - }); - }, - error: (err: any) => { - observer({ - key: `loki-${liveTarget.refId}`, - request: options, - state: LoadingState.Error, - error: this.processError(err, liveTarget), - unsubscribe: () => { - subscription.unsubscribe(); - }, - }); - }, - }); - } + runLiveQuery = (options: DataQueryRequest, target: LokiQuery): Observable => { + const liveTarget = this.prepareLiveTarget(target, options); + const stream = this.streams.getStream(liveTarget); + return stream.pipe( + map(data => { + return { + data, + key: `loki-${liveTarget.refId}`, + state: LoadingState.Streaming, + }; + }) + ); }; - runQueries = async (options: DataQueryRequest): Promise<{ data: DataFrame[] }> => { - const queryTargets = options.targets - .filter(target => target.expr && !target.hide && !target.live) - .map(target => this.prepareQueryTarget(target, options)); - - if (queryTargets.length === 0) { - return Promise.resolve({ data: [] }); - } - - const queries = queryTargets.map(target => - this._request('/api/prom/query', target).catch((err: any) => { + runQuery = (options: DataQueryRequest, target: LokiQuery): Observable => { + const query = this.prepareQueryTarget(target, options); + return from( + this._request('/api/prom/query', query).catch((err: any) => { if (err.cancelled) { return err; } - const error: DataQueryError = this.processError(err, target); + const error: DataQueryError = this.processError(err, query); throw error; }) + ).pipe( + filter((response: any) => (response.cancelled ? false : true)), + map((response: any) => { + const data = this.processResult(response.data, query); + return { data, key: query.refId }; + }) ); - - return Promise.all(queries).then((results: any[]) => { - let series: DataFrame[] = []; - - for (let i = 0; i < results.length; i++) { - const result = results[i]; - if (result.data) { - series = series.concat(this.processResult(result.data, queryTargets[i])); - } - } - - return { data: series }; - }); }; - async query(options: DataQueryRequest, observer?: DataStreamObserver) { - this.runLiveQueries(options, observer); + query(options: DataQueryRequest): Observable { + const subQueries = options.targets + .filter(target => target.expr && !target.hide) + .map(target => { + if (target.live) { + return this.runLiveQuery(options, target); + } + return this.runQuery(options, target); + }); - return this.runQueries(options); + return merge(...subQueries); } async importQueries(queries: LokiQuery[], originMeta: PluginMeta): Promise { @@ -383,12 +356,14 @@ export class LokiDatasource extends DataSourceApi { return []; } - const query = queryRequestFromAnnotationOptions(options); - const { data } = await this.runQueries(query); + const request = queryRequestFromAnnotationOptions(options); + const { data } = await this.runQuery(request, request.targets[0]).toPromise(); const annotations: AnnotationEvent[] = []; + for (const frame of data) { - const tags = Object.values(frame.labels); + const tags = Object.values(frame.labels) as string[]; const view = new DataFrameView<{ ts: string; line: string }>(frame); + view.forEachRow(row => { annotations.push({ time: new Date(row.ts).valueOf(), diff --git a/public/app/plugins/datasource/mixed/MixedDataSource.test.ts b/public/app/plugins/datasource/mixed/MixedDataSource.test.ts index 3d25daee15f..c20aab66f37 100644 --- a/public/app/plugins/datasource/mixed/MixedDataSource.test.ts +++ b/public/app/plugins/datasource/mixed/MixedDataSource.test.ts @@ -3,6 +3,7 @@ import { getDataSourceSrv } from '@grafana/runtime'; import { getQueryOptions } from 'test/helpers/getQueryOptions'; import { DataSourceInstanceSettings } from '@grafana/ui'; import { MixedDatasource } from './module'; +import { from } from 'rxjs'; const defaultDS = new MockDataSourceApi('DefaultDS', { data: ['DDD'] }); const datasourceSrv = new DatasourceSrvMock(defaultDS, { @@ -26,10 +27,19 @@ describe('MixedDatasource', () => { { refId: 'QC', datasource: 'C' }, // 3 ], }); + const results: any[] = []; + + beforeEach(async () => { + const ds = await getDataSourceSrv().get('-- Mixed --'); + from(ds.query(requestMixed)).subscribe(result => { + results.push(result); + }); + }); it('direct query should return results', async () => { - const ds = await getDataSourceSrv().get('-- Mixed --'); - const res = await ds.query(requestMixed); - expect(res.data).toEqual(['AAAA', 'BBBB', 'CCCC']); + expect(results.length).toBe(3); + expect(results[0].data).toEqual(['AAAA']); + expect(results[1].data).toEqual(['BBBB']); + expect(results[2].data).toEqual(['CCCC']); }); }); diff --git a/public/app/plugins/datasource/mixed/MixedDataSource.ts b/public/app/plugins/datasource/mixed/MixedDataSource.ts index 5462d1e11d9..0962bc1c498 100644 --- a/public/app/plugins/datasource/mixed/MixedDataSource.ts +++ b/public/app/plugins/datasource/mixed/MixedDataSource.ts @@ -1,19 +1,10 @@ import cloneDeep from 'lodash/cloneDeep'; import groupBy from 'lodash/groupBy'; -import map from 'lodash/map'; -import flatten from 'lodash/flatten'; -import filter from 'lodash/filter'; - -import { - DataSourceApi, - DataQuery, - DataQueryRequest, - DataQueryResponse, - DataStreamObserver, - DataSourceInstanceSettings, -} from '@grafana/ui'; +import { from, of, Observable, merge } from 'rxjs'; +import { DataSourceApi, DataQuery, DataQueryRequest, DataQueryResponse, DataSourceInstanceSettings } from '@grafana/ui'; import { getDataSourceSrv } from '@grafana/runtime'; +import { mergeMap, map, filter } from 'rxjs/operators'; export const MIXED_DATASOURCE_NAME = '-- Mixed --'; @@ -22,43 +13,68 @@ export class MixedDatasource extends DataSourceApi { super(instanceSettings); } - async query(request: DataQueryRequest, observer: DataStreamObserver): Promise { + query(request: DataQueryRequest): Observable { // Remove any invalid queries const queries = request.targets.filter(t => { return t.datasource !== MIXED_DATASOURCE_NAME; }); if (!queries.length) { - return Promise.resolve({ data: [] }); // nothing + return of({ data: [] } as DataQueryResponse); // nothing } - const sets = groupBy(queries, 'datasource'); + const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource'); + const observables: Array> = []; - const promises = map(sets, (targets: DataQuery[]) => { + for (const key in sets) { + const targets = sets[key]; const dsName = targets[0].datasource; - return getDataSourceSrv() - .get(dsName) - .then((ds: DataSourceApi) => { - const opt = cloneDeep(request); + + const observable = from(getDataSourceSrv().get(dsName)).pipe( + map((dataSourceApi: DataSourceApi) => { + const datasourceRequest = cloneDeep(request); // Remove any unused hidden queries - if (!ds.meta.hiddenQueries) { - targets = filter(targets, (t: DataQuery) => { - return !t.hide; - }); - if (targets.length === 0) { - return { data: [] }; - } + let newTargets = targets.slice(); + if (!dataSourceApi.meta.hiddenQueries) { + newTargets = newTargets.filter((t: DataQuery) => !t.hide); } - opt.targets = targets; - return ds.query(opt); - }); - }); + datasourceRequest.targets = newTargets; + datasourceRequest.requestId = `${dsName}${datasourceRequest.requestId || ''}`; + return { + dataSourceApi, + datasourceRequest, + }; + }) + ); - return Promise.all(promises).then(results => { - return { data: flatten(map(results, 'data')) }; - }); + const noTargets = observable.pipe( + filter(({ datasourceRequest }) => datasourceRequest.targets.length === 0), + mergeMap(() => { + return of({ data: [] } as DataQueryResponse); + }) + ); + + const hasTargets = observable.pipe( + filter(({ datasourceRequest }) => datasourceRequest.targets.length > 0), + mergeMap(({ dataSourceApi, datasourceRequest }) => { + return from(dataSourceApi.query(datasourceRequest)).pipe( + map((response: DataQueryResponse) => { + return { + ...response, + data: response.data || [], + key: `${dsName}${response.key || ''}`, + } as DataQueryResponse; + }) + ); + }) + ); + + observables.push(merge(noTargets, hasTargets)); + } + + return merge(...observables); } testDatasource() { diff --git a/public/app/plugins/datasource/postgres/query_ctrl.ts b/public/app/plugins/datasource/postgres/query_ctrl.ts index 22982dd4fd8..e0c53934721 100644 --- a/public/app/plugins/datasource/postgres/query_ctrl.ts +++ b/public/app/plugins/datasource/postgres/query_ctrl.ts @@ -293,6 +293,7 @@ export class PostgresQueryCtrl extends QueryCtrl { onDataReceived(dataList: any) { this.lastQueryMeta = null; this.lastQueryError = null; + console.log('postgres query data received', dataList); const anySeriesFromQuery: any = _.find(dataList, { refId: this.target.refId }); if (anySeriesFromQuery) { diff --git a/public/app/plugins/datasource/prometheus/datasource.ts b/public/app/plugins/datasource/prometheus/datasource.ts index 100301bad91..98ac07ff94d 100644 --- a/public/app/plugins/datasource/prometheus/datasource.ts +++ b/public/app/plugins/datasource/prometheus/datasource.ts @@ -3,9 +3,9 @@ import _ from 'lodash'; import $ from 'jquery'; // Services & Utils import kbn from 'app/core/utils/kbn'; -import { dateMath, TimeRange, DateTime, AnnotationEvent, LoadingState } from '@grafana/data'; -import { Observable, from, of } from 'rxjs'; -import { single, filter, mergeMap, catchError } from 'rxjs/operators'; +import { dateMath, TimeRange, DateTime, AnnotationEvent } from '@grafana/data'; +import { Observable, from, of, merge } from 'rxjs'; +import { filter, map } from 'rxjs/operators'; import PrometheusMetricFindQuery from './metric_find_query'; import { ResultTransformer } from './result_transformer'; @@ -21,14 +21,14 @@ import { DataSourceApi, DataSourceInstanceSettings, DataQueryError, - DataStreamObserver, DataQueryResponseData, - DataStreamState, + DataQueryResponse, } from '@grafana/ui'; import { safeStringifyValue } from 'app/core/utils/explore'; import { TemplateSrv } from 'app/features/templating/template_srv'; import { TimeSrv } from 'app/features/dashboard/services/TimeSrv'; import { ExploreUrlState } from 'app/types'; +import { LoadingState } from '@grafana/data/src/types/data'; export interface PromDataQueryResponse { data: { @@ -174,61 +174,6 @@ export class PrometheusDatasource extends DataSourceApi return series; }; - runObserverQueries = ( - options: DataQueryRequest, - observer: DataStreamObserver, - queries: PromQueryRequest[], - activeTargets: PromQuery[], - end: number - ) => { - for (let index = 0; index < queries.length; index++) { - const query = queries[index]; - const target = activeTargets[index]; - let observable: Observable = null; - - if (query.instant) { - observable = from(this.performInstantQuery(query, end)); - } else { - observable = from(this.performTimeSeriesQuery(query, query.start, query.end)); - } - - observable - .pipe( - single(), // unsubscribes automatically after first result - filter((response: any) => (response.cancelled ? false : true)), - mergeMap((response: any) => { - const data = this.processResult(response, query, target, queries.length); - const state: DataStreamState = { - key: `prometheus-${target.refId}`, - state: LoadingState.Done, - request: options, - // TODO this is obviously wrong as data is not a DataFrame and needs to be dealt with later on - // in PanelQueryState - data: data as any, - unsubscribe: () => undefined, - }; - - return [state]; - }), - catchError(err => { - const error = this.handleErrors(err, target); - const state: DataStreamState = { - key: `prometheus-${target.refId}`, - request: options, - state: LoadingState.Error, - error, - unsubscribe: () => undefined, - }; - - return of(state); - }) - ) - .subscribe({ - next: state => observer(state), - }); - } - }; - prepareTargets = (options: DataQueryRequest, start: number, end: number) => { const queries: PromQueryRequest[] = []; const activeTargets: PromQuery[] = []; @@ -238,22 +183,35 @@ export class PrometheusDatasource extends DataSourceApi continue; } - if (target.context === PromContext.Explore) { - target.format = 'time_series'; - target.instant = false; + target.requestId = options.panelId + target.refId; + + if (target.context !== PromContext.Explore) { + activeTargets.push(target); + queries.push(this.createQuery(target, options, start, end)); + continue; + } + + if (target.showingTable) { + // create instant target only if Table is showed in Explore const instantTarget: any = _.cloneDeep(target); instantTarget.format = 'table'; instantTarget.instant = true; instantTarget.valueWithRefId = true; delete instantTarget.maxDataPoints; instantTarget.requestId += '_instant'; - instantTarget.refId += '_instant'; + activeTargets.push(instantTarget); queries.push(this.createQuery(instantTarget, options, start, end)); } - activeTargets.push(target); - queries.push(this.createQuery(target, options, start, end)); + if (target.showingGraph) { + // create time series target only if Graph is showed in Explore + target.format = 'time_series'; + target.instant = false; + + activeTargets.push(target); + queries.push(this.createQuery(target, options, start, end)); + } } return { @@ -262,54 +220,44 @@ export class PrometheusDatasource extends DataSourceApi }; }; - query(options: DataQueryRequest, observer?: DataStreamObserver): Promise<{ data: any }> { + query(options: DataQueryRequest): Observable { const start = this.getPrometheusTime(options.range.from, false); const end = this.getPrometheusTime(options.range.to, true); - - options = _.clone(options); const { queries, activeTargets } = this.prepareTargets(options, start, end); // No valid targets, return the empty result to save a round trip. if (_.isEmpty(queries)) { - return this.$q.when({ data: [] }) as Promise<{ data: any }>; + return of({ data: [] }); } - if ( - observer && - options.targets.filter(target => target.context === PromContext.Explore).length === options.targets.length - ) { - // using observer to make the instant query return immediately - this.runObserverQueries(options, observer, queries, activeTargets, end); - return this.$q.when({ data: [] }) as Promise<{ data: any }>; - } + const allInstant = queries.filter(query => query.instant).length === queries.length; + const allTimeSeries = queries.filter(query => !query.instant).length === queries.length; + const subQueries = queries.map((query, index) => { + const target = activeTargets[index]; + let observable: Observable = null; + const state: LoadingState = + allInstant || allTimeSeries ? LoadingState.Done : query.instant ? LoadingState.Loading : LoadingState.Done; - const allQueryPromise = _.map(queries, query => { if (query.instant) { - return this.performInstantQuery(query, end); + observable = from(this.performInstantQuery(query, end)); } else { - return this.performTimeSeriesQuery(query, query.start, query.end); + observable = from(this.performTimeSeriesQuery(query, query.start, query.end)); } + + return observable.pipe( + filter((response: any) => (response.cancelled ? false : true)), + map((response: any) => { + const data = this.processResult(response, query, target, queries.length); + return { + data, + key: query.requestId, + state, + } as DataQueryResponse; + }) + ); }); - const allPromise = this.$q.all(allQueryPromise).then((responseList: any) => { - let result: any[] = []; - - _.each(responseList, (response, index: number) => { - if (response.cancelled) { - return; - } - - const target = activeTargets[index]; - const query = queries[index]; - const series = this.processResult(response, query, target, queries.length); - - result = [...result, ...series]; - }); - - return { data: result }; - }); - - return allPromise as Promise<{ data: any }>; + return merge(...subQueries); } createQuery(target: PromQuery, options: DataQueryRequest, start: number, end: number) { @@ -318,8 +266,8 @@ export class PrometheusDatasource extends DataSourceApi instant: target.instant, step: 0, expr: '', - requestId: '', - refId: '', + requestId: target.requestId, + refId: target.refId, start: 0, end: 0, }; @@ -361,8 +309,6 @@ export class PrometheusDatasource extends DataSourceApi // Only replace vars in expression after having (possibly) updated interval vars query.expr = this.templateSrv.replace(expr, scopedVars, this.interpolateQueryExpr); - query.requestId = options.panelId + target.refId; - query.refId = target.refId; // Align query interval with step to allow query caching and to ensure // that about-same-time query results look the same. @@ -520,9 +466,17 @@ export class PrometheusDatasource extends DataSourceApi ...options, interval: step, }; + // Unsetting min interval for accurate event resolution const minStep = '1s'; - const query = this.createQuery({ expr, interval: minStep, refId: 'X' }, queryOptions, start, end); + const queryModel = { + expr, + interval: minStep, + refId: 'X', + requestId: `prom-query-${annotation.name}`, + }; + + const query = this.createQuery(queryModel, queryOptions, start, end); const self = this; return this.performTimeSeriesQuery(query, query.start, query.end).then((results: PromDataQueryResponse) => { diff --git a/public/app/plugins/datasource/prometheus/specs/datasource.test.ts b/public/app/plugins/datasource/prometheus/specs/datasource.test.ts index 1ad026bd349..ab2680904c0 100644 --- a/public/app/plugins/datasource/prometheus/specs/datasource.test.ts +++ b/public/app/plugins/datasource/prometheus/specs/datasource.test.ts @@ -9,8 +9,8 @@ import { prometheusSpecialRegexEscape, } from '../datasource'; import { dateTime } from '@grafana/data'; -import { DataSourceInstanceSettings, DataQueryResponseData } from '@grafana/ui'; -import { PromOptions } from '../types'; +import { DataSourceInstanceSettings, DataQueryResponseData, DataQueryRequest } from '@grafana/ui'; +import { PromOptions, PromQuery, PromContext } from '../types'; import { TemplateSrv } from 'app/features/templating/template_srv'; import { TimeSrv } from 'app/features/dashboard/services/TimeSrv'; import { CustomVariable } from 'app/features/templating/custom_variable'; @@ -179,8 +179,8 @@ describe('PrometheusDatasource', () => { }, ]; - ctx.ds.performTimeSeriesQuery = jest.fn().mockReturnValue(responseMock); - return ctx.ds.query(ctx.query).then((result: any) => { + ctx.ds.performTimeSeriesQuery = jest.fn().mockReturnValue([responseMock]); + ctx.ds.query(ctx.query).subscribe((result: any) => { const results = result.data; return expect(results).toMatchObject(expected); }); @@ -209,8 +209,8 @@ describe('PrometheusDatasource', () => { const expected = ['1', '2', '4', '+Inf']; - ctx.ds.performTimeSeriesQuery = jest.fn().mockReturnValue(responseMock); - return ctx.ds.query(ctx.query).then((result: any) => { + ctx.ds.performTimeSeriesQuery = jest.fn().mockReturnValue([responseMock]); + ctx.ds.query(ctx.query).subscribe((result: any) => { const seriesLabels = _.map(result.data, 'target'); return expect(seriesLabels).toEqual(expected); }); @@ -469,7 +469,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query).then((data: any) => { + ctx.ds.query(query).subscribe((data: any) => { results = data; }); }); @@ -503,18 +503,14 @@ describe('PrometheusDatasource', () => { }, }; - beforeEach(async () => { + it('should generate an error', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.reject(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - - await ctx.ds.query(query).catch((e: any) => { + ctx.ds.query(query).subscribe((e: any) => { results = e.message; + expect(results).toBe(`"${errMessage}"`); }); }); - - it('should generate an error', () => { - expect(results).toBe(`"${errMessage}"`); - }); }); }); @@ -553,7 +549,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query).then((data: any) => { + ctx.ds.query(query).subscribe((data: any) => { results = data; }); }); @@ -614,8 +610,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - - await ctx.ds.query(query).then((data: any) => { + ctx.ds.query(query).subscribe((data: any) => { results = data; }); }); @@ -816,7 +811,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query).then((data: any) => { + ctx.ds.query(query).subscribe((data: any) => { results = data; }); }); @@ -853,7 +848,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -869,7 +864,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test&start=60&end=420&step=1'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -890,7 +885,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test&start=60&end=420&step=10'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -907,7 +902,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test&start=' + start + '&end=' + end + '&step=2'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -929,7 +924,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test&start=50&end=400&step=50'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -950,7 +945,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test' + '&start=60&end=420&step=15'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -972,7 +967,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test' + '&start=0&end=400&step=100'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -994,7 +989,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test' + '&start=' + start + '&end=' + end + '&step=100'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1016,7 +1011,7 @@ describe('PrometheusDatasource', () => { const urlExpected = 'proxied/api/v1/query_range?query=test' + '&start=' + start + '&end=' + end + '&step=60'; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1059,7 +1054,7 @@ describe('PrometheusDatasource', () => { templateSrv.replace = jest.fn(str => str); backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1099,7 +1094,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); templateSrv.replace = jest.fn(str => str); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1140,7 +1135,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); templateSrv.replace = jest.fn(str => str); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1187,7 +1182,7 @@ describe('PrometheusDatasource', () => { templateSrv.replace = jest.fn(str => str); backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1228,7 +1223,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1274,7 +1269,7 @@ describe('PrometheusDatasource', () => { backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); templateSrv.replace = jest.fn(str => str); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.method).toBe('GET'); expect(res.url).toBe(urlExpected); @@ -1326,7 +1321,7 @@ describe('PrometheusDatasource', () => { templateSrv.replace = jest.fn(str => str); backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query); + ctx.ds.query(query); const res = backendSrv.datasourceRequest.mock.calls[0][0]; expect(res.url).toBe(urlExpected); @@ -1391,7 +1386,7 @@ describe('PrometheusDatasource for POST', () => { }; backendSrv.datasourceRequest = jest.fn(() => Promise.resolve(response)); ctx.ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); - await ctx.ds.query(query).then((data: any) => { + ctx.ds.query(query).subscribe((data: any) => { results = data; }); }); @@ -1432,3 +1427,211 @@ describe('PrometheusDatasource for POST', () => { }); }); }); + +const getPrepareTargetsContext = (target: PromQuery) => { + const instanceSettings = ({ + url: 'proxied', + directUrl: 'direct', + user: 'test', + password: 'mupp', + jsonData: { httpMethod: 'POST' }, + } as unknown) as DataSourceInstanceSettings; + const start = 0; + const end = 1; + const panelId = '2'; + const options = ({ targets: [target], interval: '1s', panelId } as any) as DataQueryRequest; + + const ds = new PrometheusDatasource(instanceSettings, q, backendSrv as any, templateSrv as any, timeSrv as any); + const { queries, activeTargets } = ds.prepareTargets(options, start, end); + + return { + queries, + activeTargets, + start, + end, + panelId, + }; +}; + +describe('prepareTargets', () => { + describe('when run from a Panel', () => { + it('then it should just add targets', () => { + const target: PromQuery = { + refId: 'A', + expr: 'up', + context: PromContext.Panel, + }; + + const { queries, activeTargets, panelId, end, start } = getPrepareTargetsContext(target); + + expect(queries.length).toBe(1); + expect(activeTargets.length).toBe(1); + expect(queries[0]).toEqual({ + end, + expr: 'up', + headers: { + 'X-Dashboard-Id': undefined, + 'X-Panel-Id': panelId, + }, + hinting: undefined, + instant: undefined, + refId: target.refId, + requestId: panelId + target.refId, + start, + step: 1, + }); + expect(activeTargets[0]).toEqual(target); + }); + }); + + describe('when run from Explore', () => { + describe('and both Graph and Table are shown', () => { + it('then it should return both instant and time series related objects', () => { + const target: PromQuery = { + refId: 'A', + expr: 'up', + context: PromContext.Explore, + showingGraph: true, + showingTable: true, + }; + + const { queries, activeTargets, panelId, end, start } = getPrepareTargetsContext(target); + + expect(queries.length).toBe(2); + expect(activeTargets.length).toBe(2); + expect(queries[0]).toEqual({ + end, + expr: 'up', + headers: { + 'X-Dashboard-Id': undefined, + 'X-Panel-Id': panelId, + }, + hinting: undefined, + instant: true, + refId: target.refId, + requestId: panelId + target.refId + '_instant', + start, + step: 1, + }); + expect(activeTargets[0]).toEqual({ + ...target, + format: 'table', + instant: true, + requestId: panelId + target.refId + '_instant', + valueWithRefId: true, + }); + expect(queries[1]).toEqual({ + end, + expr: 'up', + headers: { + 'X-Dashboard-Id': undefined, + 'X-Panel-Id': panelId, + }, + hinting: undefined, + instant: false, + refId: target.refId, + requestId: panelId + target.refId, + start, + step: 1, + }); + expect(activeTargets[1]).toEqual({ + ...target, + format: 'time_series', + instant: false, + requestId: panelId + target.refId, + }); + }); + }); + describe('and both Graph and Table are hidden', () => { + it('then it should return empty arrays', () => { + const target: PromQuery = { + refId: 'A', + expr: 'up', + context: PromContext.Explore, + showingGraph: false, + showingTable: false, + }; + + const { queries, activeTargets } = getPrepareTargetsContext(target); + + expect(queries.length).toBe(0); + expect(activeTargets.length).toBe(0); + }); + }); + + describe('and Graph is hidden', () => { + it('then it should return only intant related objects', () => { + const target: PromQuery = { + refId: 'A', + expr: 'up', + context: PromContext.Explore, + showingGraph: false, + showingTable: true, + }; + + const { queries, activeTargets, panelId, end, start } = getPrepareTargetsContext(target); + + expect(queries.length).toBe(1); + expect(activeTargets.length).toBe(1); + expect(queries[0]).toEqual({ + end, + expr: 'up', + headers: { + 'X-Dashboard-Id': undefined, + 'X-Panel-Id': panelId, + }, + hinting: undefined, + instant: true, + refId: target.refId, + requestId: panelId + target.refId + '_instant', + start, + step: 1, + }); + expect(activeTargets[0]).toEqual({ + ...target, + format: 'table', + instant: true, + requestId: panelId + target.refId + '_instant', + valueWithRefId: true, + }); + }); + }); + + describe('and Table is hidden', () => { + it('then it should return only time series related objects', () => { + const target: PromQuery = { + refId: 'A', + expr: 'up', + context: PromContext.Explore, + showingGraph: true, + showingTable: false, + }; + + const { queries, activeTargets, panelId, end, start } = getPrepareTargetsContext(target); + + expect(queries.length).toBe(1); + expect(activeTargets.length).toBe(1); + expect(queries[0]).toEqual({ + end, + expr: 'up', + headers: { + 'X-Dashboard-Id': undefined, + 'X-Panel-Id': panelId, + }, + hinting: undefined, + instant: false, + refId: target.refId, + requestId: panelId + target.refId, + start, + step: 1, + }); + expect(activeTargets[0]).toEqual({ + ...target, + format: 'time_series', + instant: false, + requestId: panelId + target.refId, + }); + }); + }); + }); +}); diff --git a/public/app/plugins/datasource/prometheus/types.ts b/public/app/plugins/datasource/prometheus/types.ts index dc548ae13ea..387b9c9e3d0 100644 --- a/public/app/plugins/datasource/prometheus/types.ts +++ b/public/app/plugins/datasource/prometheus/types.ts @@ -15,6 +15,9 @@ export interface PromQuery extends DataQuery { intervalFactor?: number; legendFormat?: string; valueWithRefId?: boolean; + requestId?: string; + showingGraph?: boolean; + showingTable?: boolean; } export interface PromOptions extends DataSourceJsonData { diff --git a/public/app/plugins/datasource/testdata/LogIpsum.ts b/public/app/plugins/datasource/testdata/LogIpsum.ts new file mode 100644 index 00000000000..a1afe7023fb --- /dev/null +++ b/public/app/plugins/datasource/testdata/LogIpsum.ts @@ -0,0 +1,162 @@ +import { LogLevel } from '@grafana/data'; + +let index = 0; + +export function getRandomLogLevel(): LogLevel { + const v = Math.random(); + if (v > 0.9) { + return LogLevel.critical; + } + if (v > 0.8) { + return LogLevel.error; + } + if (v > 0.7) { + return LogLevel.warning; + } + if (v > 0.4) { + return LogLevel.info; + } + if (v > 0.3) { + return LogLevel.debug; + } + if (v > 0.1) { + return LogLevel.trace; + } + return LogLevel.unknown; +} + +export function getNextWord() { + index = (index + Math.floor(Math.random() * 5)) % words.length; + return words[index]; +} + +export function getRandomLine(length = 60) { + let line = getNextWord(); + while (line.length < length) { + line += ' ' + getNextWord(); + } + return line; +} + +const words = [ + 'At', + 'vero', + 'eos', + 'et', + 'accusamus', + 'et', + 'iusto', + 'odio', + 'dignissimos', + 'ducimus', + 'qui', + 'blanditiis', + 'praesentium', + 'voluptatum', + 'deleniti', + 'atque', + 'corrupti', + 'quos', + 'dolores', + 'et', + 'quas', + 'molestias', + 'excepturi', + 'sint', + 'occaecati', + 'cupiditate', + 'non', + 'provident', + 'similique', + 'sunt', + 'in', + 'culpa', + 'qui', + 'officia', + 'deserunt', + 'mollitia', + 'animi', + 'id', + 'est', + 'laborum', + 'et', + 'dolorum', + 'fuga', + 'Et', + 'harum', + 'quidem', + 'rerum', + 'facilis', + 'est', + 'et', + 'expedita', + 'distinctio', + 'Nam', + 'libero', + 'tempore', + 'cum', + 'soluta', + 'nobis', + 'est', + 'eligendi', + 'optio', + 'cumque', + 'nihil', + 'impedit', + 'quo', + 'minus', + 'id', + 'quod', + 'maxime', + 'placeat', + 'facere', + 'possimus', + 'omnis', + 'voluptas', + 'assumenda', + 'est', + 'omnis', + 'dolor', + 'repellendus', + 'Temporibus', + 'autem', + 'quibusdam', + 'et', + 'aut', + 'officiis', + 'debitis', + 'aut', + 'rerum', + 'necessitatibus', + 'saepe', + 'eveniet', + 'ut', + 'et', + 'voluptates', + 'repudiandae', + 'sint', + 'et', + 'molestiae', + 'non', + 'recusandae', + 'Itaque', + 'earum', + 'rerum', + 'hic', + 'tenetur', + 'a', + 'sapiente', + 'delectus', + 'ut', + 'aut', + 'reiciendis', + 'voluptatibus', + 'maiores', + 'alias', + 'consequatur', + 'aut', + 'perferendis', + 'doloribus', + 'asperiores', + 'repellat', +]; diff --git a/public/app/plugins/datasource/testdata/datasource.ts b/public/app/plugins/datasource/testdata/datasource.ts index 2605e779678..737db5df4a0 100644 --- a/public/app/plugins/datasource/testdata/datasource.ts +++ b/public/app/plugins/datasource/testdata/datasource.ts @@ -3,95 +3,83 @@ import { DataSourceApi, DataQueryRequest, DataSourceInstanceSettings, - DataStreamObserver, + DataQueryResponse, MetricFindValue, } from '@grafana/ui'; import { TableData, TimeSeries } from '@grafana/data'; import { TestDataQuery, Scenario } from './types'; import { getBackendSrv } from 'app/core/services/backend_srv'; -import { StreamHandler } from './StreamHandler'; import { queryMetricTree } from './metricTree'; +import { Observable, from, merge } from 'rxjs'; +import { runStream } from './runStreams'; import templateSrv from 'app/features/templating/template_srv'; type TestData = TimeSeries | TableData; -export interface TestDataRegistry { - [key: string]: TestData[]; -} - export class TestDataDataSource extends DataSourceApi { - streams = new StreamHandler(); - - /** @ngInject */ constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); } - query(options: DataQueryRequest, observer: DataStreamObserver) { - const queries = options.targets.map(item => { - return { - ...item, - intervalMs: options.intervalMs, - maxDataPoints: options.maxDataPoints, - datasourceId: this.id, - alias: templateSrv.replace(item.alias || ''), - }; - }); + query(options: DataQueryRequest): Observable { + const queries: any[] = []; + const streams: Array> = []; - if (queries.length === 0) { - return Promise.resolve({ data: [] }); + // Start streams and prepare queries + for (const target of options.targets) { + if (target.scenarioId === 'streaming_client') { + streams.push(runStream(target, options)); + } else { + queries.push({ + ...target, + intervalMs: options.intervalMs, + maxDataPoints: options.maxDataPoints, + datasourceId: this.id, + alias: templateSrv.replace(target.alias || ''), + }); + } } - // Currently we do not support mixed with client only streaming - const resp = this.streams.process(options, observer); - if (resp) { - return Promise.resolve(resp); + if (queries.length) { + const req: Promise = getBackendSrv() + .datasourceRequest({ + method: 'POST', + url: '/api/tsdb/query', + data: { + from: options.range.from.valueOf().toString(), + to: options.range.to.valueOf().toString(), + queries: queries, + }, + // This sets up a cancel token + requestId: options.requestId, + }) + .then((res: any) => this.processQueryResult(queries, res)); + + streams.push(from(req)); } - return getBackendSrv() - .datasourceRequest({ - method: 'POST', - url: '/api/tsdb/query', - data: { - from: options.range.from.valueOf().toString(), - to: options.range.to.valueOf().toString(), - queries: queries, - }, + return merge(...streams); + } - // This sets up a cancel token - requestId: options.requestId, - }) - .then((res: any) => { - const data: TestData[] = []; + processQueryResult(queries: any, res: any): DataQueryResponse { + const data: TestData[] = []; - // Returns data in the order it was asked for. - // if the response has data with different refId, it is ignored - for (const query of queries) { - const results = res.data.results[query.refId]; - if (!results) { - console.warn('No Results for:', query); - continue; - } + for (const query of queries) { + const results = res.data.results[query.refId]; - for (const t of results.tables || []) { - const table = t as TableData; - table.refId = query.refId; - table.name = query.alias; - data.push(table); - } + for (const t of results.tables || []) { + const table = t as TableData; + table.refId = query.refId; + table.name = query.alias; + data.push(table); + } - for (const series of results.series || []) { - data.push({ - target: series.name, - datapoints: series.points, - refId: query.refId, - tags: series.tags, - }); - } - } + for (const series of results.series || []) { + data.push({ target: series.name, datapoints: series.points, refId: query.refId, tags: series.tags }); + } + } - return { data: data }; - }); + return { data }; } annotationQuery(options: any) { diff --git a/public/app/plugins/datasource/testdata/query_ctrl.ts b/public/app/plugins/datasource/testdata/query_ctrl.ts index b311cf212be..46d7003e985 100644 --- a/public/app/plugins/datasource/testdata/query_ctrl.ts +++ b/public/app/plugins/datasource/testdata/query_ctrl.ts @@ -1,7 +1,7 @@ import _ from 'lodash'; import { QueryCtrl } from 'app/plugins/sdk'; -import { defaultQuery } from './StreamHandler'; +import { defaultQuery } from './runStreams'; import { getBackendSrv } from 'app/core/services/backend_srv'; import { dateTime } from '@grafana/data'; diff --git a/public/app/plugins/datasource/testdata/runStreams.ts b/public/app/plugins/datasource/testdata/runStreams.ts new file mode 100644 index 00000000000..12e7168c1c3 --- /dev/null +++ b/public/app/plugins/datasource/testdata/runStreams.ts @@ -0,0 +1,224 @@ +import { defaults } from 'lodash'; +import { Observable } from 'rxjs'; + +import { DataQueryRequest, DataQueryResponse } from '@grafana/ui'; + +import { FieldType, CircularDataFrame, CSVReader, Field, LoadingState } from '@grafana/data'; + +import { TestDataQuery, StreamingQuery } from './types'; +import { getRandomLine } from './LogIpsum'; + +export const defaultQuery: StreamingQuery = { + type: 'signal', + speed: 250, // ms + spread: 3.5, + noise: 2.2, + bands: 1, +}; + +export function runStream(target: TestDataQuery, req: DataQueryRequest): Observable { + const query = defaults(target.stream, defaultQuery); + if ('signal' === query.type) { + return runSignalStream(target, query, req); + } + if ('logs' === query.type) { + return runLogsStream(target, query, req); + } + if ('fetch' === query.type) { + return runFetchStream(target, query, req); + } + throw new Error(`Unknown Stream Type: ${query.type}`); +} + +export function runSignalStream( + target: TestDataQuery, + query: StreamingQuery, + req: DataQueryRequest +): Observable { + return new Observable(subscriber => { + const streamId = `signal-${req.panelId}-${target.refId}`; + const maxDataPoints = req.maxDataPoints || 1000; + + const data = new CircularDataFrame({ + append: 'tail', + capacity: maxDataPoints, + }); + data.refId = target.refId; + data.name = target.alias || 'Signal ' + target.refId; + data.addField({ name: 'time', type: FieldType.time }); + data.addField({ name: 'value', type: FieldType.number }); + + const { spread, speed, bands, noise } = query; + + for (let i = 0; i < bands; i++) { + const suffix = bands > 1 ? ` ${i + 1}` : ''; + data.addField({ name: 'Min' + suffix, type: FieldType.number }); + data.addField({ name: 'Max' + suffix, type: FieldType.number }); + } + + let value = Math.random() * 100; + let timeoutId: any = null; + + const addNextRow = (time: number) => { + value += (Math.random() - 0.5) * spread; + + let idx = 0; + data.fields[idx++].values.add(time); + data.fields[idx++].values.add(value); + + let min = value; + let max = value; + + for (let i = 0; i < bands; i++) { + min = min - Math.random() * noise; + max = max + Math.random() * noise; + + data.fields[idx++].values.add(min); + data.fields[idx++].values.add(max); + } + }; + + // Fill the buffer on init + if (true) { + let time = Date.now() - maxDataPoints * speed; + for (let i = 0; i < maxDataPoints; i++) { + addNextRow(time); + time += speed; + } + } + + const pushNextEvent = () => { + addNextRow(Date.now()); + subscriber.next({ + data: [data], + key: streamId, + }); + + timeoutId = setTimeout(pushNextEvent, speed); + }; + + // Send first event in 5ms + setTimeout(pushNextEvent, 5); + + return () => { + console.log('unsubscribing to stream ' + streamId); + clearTimeout(timeoutId); + }; + }); +} + +export function runLogsStream( + target: TestDataQuery, + query: StreamingQuery, + req: DataQueryRequest +): Observable { + return new Observable(subscriber => { + const streamId = `logs-${req.panelId}-${target.refId}`; + const maxDataPoints = req.maxDataPoints || 1000; + + const data = new CircularDataFrame({ + append: 'tail', + capacity: maxDataPoints, + }); + data.refId = target.refId; + data.name = target.alias || 'Logs ' + target.refId; + data.addField({ name: 'time', type: FieldType.time }); + data.addField({ name: 'line', type: FieldType.string }); + + const { speed } = query; + + let timeoutId: any = null; + + const pushNextEvent = () => { + data.values.time.add(Date.now()); + data.values.line.add(getRandomLine()); + + subscriber.next({ + data: [data], + key: streamId, + }); + + timeoutId = setTimeout(pushNextEvent, speed); + }; + + // Send first event in 5ms + setTimeout(pushNextEvent, 5); + + return () => { + console.log('unsubscribing to stream ' + streamId); + clearTimeout(timeoutId); + }; + }); +} + +export function runFetchStream( + target: TestDataQuery, + query: StreamingQuery, + req: DataQueryRequest +): Observable { + return new Observable(subscriber => { + const streamId = `fetch-${req.panelId}-${target.refId}`; + const maxDataPoints = req.maxDataPoints || 1000; + + let data = new CircularDataFrame({ + append: 'tail', + capacity: maxDataPoints, + }); + data.refId = target.refId; + data.name = target.alias || 'Fetch ' + target.refId; + + let reader: ReadableStreamReader; + const csv = new CSVReader({ + callback: { + onHeader: (fields: Field[]) => { + // Clear any existing fields + if (data.fields.length) { + data = new CircularDataFrame({ + append: 'tail', + capacity: maxDataPoints, + }); + data.refId = target.refId; + data.name = 'Fetch ' + target.refId; + } + for (const field of fields) { + data.addField(field); + } + }, + onRow: (row: any[]) => { + data.add(row); + }, + }, + }); + + const processChunk = (value: ReadableStreamReadResult): any => { + if (value.value) { + const text = new TextDecoder().decode(value.value); + csv.readCSV(text); + } + + subscriber.next({ + data: [data], + key: streamId, + state: value.done ? LoadingState.Done : LoadingState.Streaming, + }); + + if (value.done) { + console.log('Finished stream'); + subscriber.complete(); // necessary? + return; + } + + return reader.read().then(processChunk); + }; + + fetch(new Request(query.url)).then(response => { + reader = response.body.getReader(); + reader.read().then(processChunk); + }); + + return () => { + // Cancel fetch? + console.log('unsubscribing to stream ' + streamId); + }; + }); +} diff --git a/public/app/plugins/panel/graph/module.ts b/public/app/plugins/panel/graph/module.ts index c7bdfd99031..56b9bab8ec5 100644 --- a/public/app/plugins/panel/graph/module.ts +++ b/public/app/plugins/panel/graph/module.ts @@ -13,8 +13,7 @@ import config from 'app/core/config'; import TimeSeries from 'app/core/time_series2'; import { DataFrame, DataLink, DateTimeInput } from '@grafana/data'; import { getColorFromHexRgbOrName, LegacyResponseData, VariableSuggestion } from '@grafana/ui'; -import { getProcessedDataFrames } from 'app/features/dashboard/state/PanelQueryState'; -import { PanelQueryRunnerFormat } from 'app/features/dashboard/state/PanelQueryRunner'; +import { getProcessedDataFrames } from 'app/features/dashboard/state/runRequest'; import { GraphContextMenuCtrl } from './GraphContextMenuCtrl'; import { getDataLinksVariableSuggestions } from 'app/features/panel/panellinks/link_srv'; @@ -143,7 +142,7 @@ class GraphCtrl extends MetricsPanelCtrl { _.defaults(this.panel.xaxis, this.panelDefaults.xaxis); _.defaults(this.panel.options, this.panelDefaults.options); - this.dataFormat = PanelQueryRunnerFormat.frames; + this.useDataFrames = true; this.processor = new DataProcessor(this.panel); this.contextMenuCtrl = new GraphContextMenuCtrl($scope); diff --git a/public/app/plugins/panel/graph/specs/data_processor.test.ts b/public/app/plugins/panel/graph/specs/data_processor.test.ts index da95160b066..8b76536b39c 100644 --- a/public/app/plugins/panel/graph/specs/data_processor.test.ts +++ b/public/app/plugins/panel/graph/specs/data_processor.test.ts @@ -1,5 +1,5 @@ import { DataProcessor } from '../data_processor'; -import { getProcessedDataFrames } from 'app/features/dashboard/state/PanelQueryState'; +import { getProcessedDataFrames } from 'app/features/dashboard/state/runRequest'; describe('Graph DataProcessor', () => { const panel: any = { diff --git a/public/app/plugins/panel/singlestat/module.ts b/public/app/plugins/panel/singlestat/module.ts index 7ff672a67d5..d73cb9b5bb0 100644 --- a/public/app/plugins/panel/singlestat/module.ts +++ b/public/app/plugins/panel/singlestat/module.ts @@ -28,8 +28,7 @@ import { } from '@grafana/data'; import { auto } from 'angular'; import { LinkSrv } from 'app/features/panel/panellinks/link_srv'; -import { PanelQueryRunnerFormat } from 'app/features/dashboard/state/PanelQueryRunner'; -import { getProcessedDataFrames } from 'app/features/dashboard/state/PanelQueryState'; +import { getProcessedDataFrames } from 'app/features/dashboard/state/runRequest'; const BASE_FONT_SIZE = 38; @@ -124,7 +123,7 @@ class SingleStatCtrl extends MetricsPanelCtrl { this.events.on('data-snapshot-load', this.onDataReceived.bind(this)); this.events.on('init-edit-mode', this.onInitEditMode.bind(this)); - this.dataFormat = PanelQueryRunnerFormat.frames; + this.useDataFrames = true; this.onSparklineColorChange = this.onSparklineColorChange.bind(this); this.onSparklineFillChange = this.onSparklineFillChange.bind(this); diff --git a/public/app/types/explore.ts b/public/app/types/explore.ts index 7ddc236de34..37427dd6904 100644 --- a/public/app/types/explore.ts +++ b/public/app/types/explore.ts @@ -1,3 +1,4 @@ +import { Unsubscribable } from 'rxjs'; import { ComponentClass } from 'react'; import { DataQuery, @@ -21,7 +22,6 @@ import { import { Emitter } from 'app/core/core'; import TableModel from 'app/core/table_model'; -import { PanelQueryState } from '../features/dashboard/state/PanelQueryState'; export enum ExploreMode { Metrics = 'Metrics', @@ -263,7 +263,7 @@ export interface ExploreItemState { isPaused: boolean; urlReplaced: boolean; - queryState: PanelQueryState; + querySubscription?: Unsubscribable; queryResponse: PanelData; originPanelId?: number; diff --git a/public/test/core/redux/reducerTester.ts b/public/test/core/redux/reducerTester.ts index db645e074c3..ffcbbf20ee3 100644 --- a/public/test/core/redux/reducerTester.ts +++ b/public/test/core/redux/reducerTester.ts @@ -11,7 +11,7 @@ export interface When { } export interface Then { - thenStateShouldEqual: (state: State) => Then; + thenStateShouldEqual: (state: State) => When; } interface ObjectType extends Object { @@ -62,12 +62,12 @@ export const reducerTester = (): Given => { }; const whenActionIsDispatched = (action: ActionOf): Then => { - resultingState = reducerUnderTest(initialState, action); + resultingState = reducerUnderTest(resultingState || initialState, action); return instance; }; - const thenStateShouldEqual = (state: State): Then => { + const thenStateShouldEqual = (state: State): When => { expect(state).toEqual(resultingState); return instance;