diff --git a/public/app/features/explore/state/query.test.ts b/public/app/features/explore/state/query.test.ts index 52dbe7530b1..d6a00955f8c 100644 --- a/public/app/features/explore/state/query.test.ts +++ b/public/app/features/explore/state/query.test.ts @@ -171,7 +171,7 @@ describe('runQueries', () => { expect(datasource.getDataProvider).toHaveBeenCalledWith( type, expect.objectContaining({ - requestId: `explore_left_${snakeCase(type)}`, + requestId: `explore_left_${snakeCase(type)}_0`, }) ); } diff --git a/public/app/features/explore/state/query.ts b/public/app/features/explore/state/query.ts index ce92375a47d..822f8fbc970 100644 --- a/public/app/features/explore/state/query.ts +++ b/public/app/features/explore/state/query.ts @@ -17,6 +17,7 @@ import { LogsVolumeType, PanelEvents, QueryFixAction, + ScopedVars, SupplementaryQueryType, toLegacyResponseData, } from '@grafana/data'; @@ -37,7 +38,14 @@ import { CorrelationData } from 'app/features/correlations/useCorrelations'; import { getTimeZone } from 'app/features/profile/state/selectors'; import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource'; import { store } from 'app/store/store'; -import { createAsyncThunk, ExploreItemState, ExplorePanelData, ThunkDispatch, ThunkResult } from 'app/types'; +import { + createAsyncThunk, + ExploreItemState, + ExplorePanelData, + QueryTransaction, + ThunkDispatch, + ThunkResult, +} from 'app/types'; import { ExploreId, ExploreState, QueryOptions, SupplementaryQueries } from 'app/types/explore'; import { notifyApp } from '../../../core/actions'; @@ -607,44 +615,17 @@ export const runQueries = ( dispatch(cleanSupplementaryQueryAction({ exploreId, type })); } } else { - for (const type of supplementaryQueryTypes) { - // We always prepare provider, even is supplementary query is disabled because when the user - // enables the query, we need to load the data, so we need the provider - const dataProvider = getSupplementaryQueryProvider( + dispatch( + handleSupplementaryQueries({ + exploreId, datasourceInstance, - type, - { - ...transaction.request, - requestId: `${transaction.request.requestId}_${snakeCase(type)}`, - }, - newQuerySource - ); - - if (dataProvider) { - dispatch( - storeSupplementaryQueryDataProviderAction({ - exploreId, - type, - dataProvider, - }) - ); - - if (!canReuseSupplementaryQueryData(supplementaryQueries[type].data, queries, absoluteRange)) { - dispatch(cleanSupplementaryQueryAction({ exploreId, type })); - if (supplementaryQueries[type].enabled) { - dispatch(loadSupplementaryQueryData(exploreId, type)); - } - } - } else { - // If data source instance doesn't support this supplementary query, we clean the data provider - dispatch( - cleanSupplementaryQueryDataProviderAction({ - exploreId, - type, - }) - ); - } - } + transaction, + newQuerySource, + supplementaryQueries, + queries, + absoluteRange, + }) + ); } } @@ -652,6 +633,95 @@ export const runQueries = ( }; }; +const groupDataQueries = async (datasources: DataQuery[], scopedVars: ScopedVars) => { + const nonMixedDataSources = datasources.filter((t) => { + return t.datasource?.uid !== MIXED_DATASOURCE_NAME; + }); + const sets: { [key: string]: DataQuery[] } = groupBy(nonMixedDataSources, 'datasource.uid'); + + return await Promise.all( + Object.values(sets).map(async (targets) => { + const datasource = await getDataSourceSrv().get(targets[0].datasource, scopedVars); + return { + datasource, + targets, + }; + }) + ); +}; + +type HandleSupplementaryQueriesOptions = { + exploreId: ExploreId; + transaction: QueryTransaction; + datasourceInstance: DataSourceApi; + newQuerySource: Observable; + supplementaryQueries: SupplementaryQueries; + queries: DataQuery[]; + absoluteRange: AbsoluteTimeRange; +}; + +const handleSupplementaryQueries = createAsyncThunk( + 'explore/handleSupplementaryQueries', + async ( + { + datasourceInstance, + exploreId, + transaction, + newQuerySource, + supplementaryQueries, + queries, + absoluteRange, + }: HandleSupplementaryQueriesOptions, + { dispatch } + ) => { + let groupedQueries; + if (datasourceInstance.meta.mixed) { + groupedQueries = await groupDataQueries(transaction.request.targets, transaction.request.scopedVars); + } else { + groupedQueries = [{ datasource: datasourceInstance, targets: transaction.request.targets }]; + } + + for (const type of supplementaryQueryTypes) { + // We always prepare provider, even is supplementary query is disabled because when the user + // enables the query, we need to load the data, so we need the provider + const dataProvider = getSupplementaryQueryProvider( + groupedQueries, + type, + { + ...transaction.request, + requestId: `${transaction.request.requestId}_${snakeCase(type)}`, + }, + newQuerySource + ); + + if (dataProvider) { + dispatch( + storeSupplementaryQueryDataProviderAction({ + exploreId, + type, + dataProvider, + }) + ); + + if (!canReuseSupplementaryQueryData(supplementaryQueries[type].data, queries, absoluteRange)) { + dispatch(cleanSupplementaryQueryAction({ exploreId, type })); + if (supplementaryQueries[type].enabled) { + dispatch(loadSupplementaryQueryData(exploreId, type)); + } + } + } else { + // If data source instance doesn't support this supplementary query, we clean the data provider + dispatch( + cleanSupplementaryQueryDataProviderAction({ + exploreId, + type, + }) + ); + } + } + } +); + /** * Checks if after changing the time range the existing data can be used to show supplementary query. * It can happen if queries are the same and new time range is within existing data time range. diff --git a/public/app/features/explore/utils/supplementaryQueries.test.ts b/public/app/features/explore/utils/supplementaryQueries.test.ts index ce023643a2c..ce85095d7e1 100644 --- a/public/app/features/explore/utils/supplementaryQueries.test.ts +++ b/public/app/features/explore/utils/supplementaryQueries.test.ts @@ -133,7 +133,6 @@ const datasources: DataSourceApi[] = [ ), new MockDataSourceApi('no-data-providers'), new MockDataSourceApi('no-data-providers-2'), - new MockDataSourceApi('mixed').setupMixed(true), ]; jest.mock('@grafana/runtime', () => ({ @@ -145,17 +144,23 @@ jest.mock('@grafana/runtime', () => ({ }, })); -const setup = async (rootDataSource: string, type: SupplementaryQueryType, targetSources?: string[]) => { - const rootDataSourceApiMock = await getDataSourceSrv().get({ uid: rootDataSource }); - - targetSources = targetSources || [rootDataSource]; - +const setup = async (targetSources: string[], type: SupplementaryQueryType) => { const requestMock = new MockDataQueryRequest({ targets: targetSources.map((source, i) => new MockQuery(`${i}`, 'a', { uid: source })), }); const explorePanelDataMock: Observable = mockExploreDataWithLogs(); - return getSupplementaryQueryProvider(rootDataSourceApiMock, type, requestMock, explorePanelDataMock); + const datasources = await Promise.all( + targetSources.map(async (source, i) => { + const datasource = await getDataSourceSrv().get({ uid: source }); + return { + datasource, + targets: [new MockQuery(`${i}`, 'a', { uid: source })], + }; + }) + ); + + return getSupplementaryQueryProvider(datasources, type, requestMock, explorePanelDataMock); }; const assertDataFrom = (type: SupplementaryQueryType, ...datasources: string[]) => { @@ -173,7 +178,7 @@ const assertDataFromLogsResults = () => { describe('SupplementaryQueries utils', function () { describe('Non-mixed data source', function () { it('Returns result from the provider', async () => { - const testProvider = await setup('logs-volume-a', SupplementaryQueryType.LogsVolume); + const testProvider = await setup(['logs-volume-a'], SupplementaryQueryType.LogsVolume); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ @@ -186,7 +191,7 @@ describe('SupplementaryQueries utils', function () { }); }); it('Uses fallback for logs volume', async () => { - const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsVolume); + const testProvider = await setup(['no-data-providers'], SupplementaryQueryType.LogsVolume); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ @@ -198,14 +203,11 @@ describe('SupplementaryQueries utils', function () { }); }); it('Returns undefined for logs sample', async () => { - const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsSample); + const testProvider = await setup(['no-data-providers'], SupplementaryQueryType.LogsSample); await expect(testProvider).toBe(undefined); }); it('Creates single fallback result', async () => { - const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsVolume, [ - 'no-data-providers', - 'no-data-providers-2', - ]); + const testProvider = await setup(['no-data-providers', 'no-data-providers-2'], SupplementaryQueryType.LogsVolume); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ @@ -226,10 +228,7 @@ describe('SupplementaryQueries utils', function () { describe('Logs volume', function () { describe('All data sources support full range logs volume', function () { it('Merges all data frames into a single response', async () => { - const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [ - 'logs-volume-a', - 'logs-volume-b', - ]); + const testProvider = await setup(['logs-volume-a', 'logs-volume-b'], SupplementaryQueryType.LogsVolume); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ { data: [], state: LoadingState.Loading }, @@ -248,10 +247,10 @@ describe('SupplementaryQueries utils', function () { describe('All data sources do not support full range logs volume', function () { it('Creates single fallback result', async () => { - const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [ - 'no-data-providers', - 'no-data-providers-2', - ]); + const testProvider = await setup( + ['no-data-providers', 'no-data-providers-2'], + SupplementaryQueryType.LogsVolume + ); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ @@ -270,12 +269,10 @@ describe('SupplementaryQueries utils', function () { describe('Some data sources support full range logs volume, while others do not', function () { it('Creates merged result containing full range and limited logs volume', async () => { - const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [ - 'logs-volume-a', - 'no-data-providers', - 'logs-volume-b', - 'no-data-providers-2', - ]); + const testProvider = await setup( + ['logs-volume-a', 'no-data-providers', 'logs-volume-b', 'no-data-providers-2'], + SupplementaryQueryType.LogsVolume + ); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ { @@ -310,10 +307,7 @@ describe('SupplementaryQueries utils', function () { describe('Logs sample', function () { describe('All data sources support logs sample', function () { it('Merges all responses into single result', async () => { - const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [ - 'logs-sample-a', - 'logs-sample-b', - ]); + const testProvider = await setup(['logs-sample-a', 'logs-sample-b'], SupplementaryQueryType.LogsSample); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ { data: [], state: LoadingState.Loading }, @@ -332,24 +326,20 @@ describe('SupplementaryQueries utils', function () { describe('All data sources do not support full range logs volume', function () { it('Does not provide fallback result', async () => { - const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [ - 'no-data-providers', - 'no-data-providers-2', - ]); - await expect(testProvider).toEmitValuesWith((received) => { - expect(received).toMatchObject([{ state: LoadingState.NotStarted, data: [] }]); - }); + const testProvider = await setup( + ['no-data-providers', 'no-data-providers-2'], + SupplementaryQueryType.LogsSample + ); + await expect(testProvider).toBeUndefined(); }); }); describe('Some data sources support full range logs volume, while others do not', function () { it('Returns results only for data sources supporting logs sample', async () => { - const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [ - 'logs-sample-a', - 'no-data-providers', - 'logs-sample-b', - 'no-data-providers-2', - ]); + const testProvider = await setup( + ['logs-sample-a', 'no-data-providers', 'logs-sample-b', 'no-data-providers-2'], + SupplementaryQueryType.LogsSample + ); await expect(testProvider).toEmitValuesWith((received) => { expect(received).toMatchObject([ { data: [], state: LoadingState.Loading }, diff --git a/public/app/features/explore/utils/supplementaryQueries.ts b/public/app/features/explore/utils/supplementaryQueries.ts index da91115b3fd..75ca46ce5af 100644 --- a/public/app/features/explore/utils/supplementaryQueries.ts +++ b/public/app/features/explore/utils/supplementaryQueries.ts @@ -1,5 +1,5 @@ import { cloneDeep, groupBy } from 'lodash'; -import { distinct, from, mergeMap, Observable, of } from 'rxjs'; +import { distinct, Observable, merge } from 'rxjs'; import { scan } from 'rxjs/operators'; import { @@ -9,15 +9,14 @@ import { DataQueryResponse, DataSourceApi, hasSupplementaryQuerySupport, + isTruthy, LoadingState, LogsVolumeCustomMetaData, LogsVolumeType, SupplementaryQueryType, } from '@grafana/data'; -import { getDataSourceSrv } from '@grafana/runtime'; import { makeDataFramesForLogs } from 'app/core/logsModel'; import store from 'app/core/store'; -import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource'; import { ExplorePanelData, SupplementaryQueries } from 'app/types'; export const supplementaryQueryTypes: SupplementaryQueryType[] = [ @@ -129,98 +128,62 @@ const getSupplementaryQueryFallback = ( if (type === SupplementaryQueryType.LogsVolume) { return createFallbackLogVolumeProvider(explorePanelData, queryTargets, datasourceName); } else { - return of({ - data: [], - state: LoadingState.NotStarted, - }); + return undefined; } }; export const getSupplementaryQueryProvider = ( - datasourceInstance: DataSourceApi, + groupedQueries: Array<{ datasource: DataSourceApi; targets: DataQuery[] }>, type: SupplementaryQueryType, request: DataQueryRequest, explorePanelData: Observable ): Observable | undefined => { - if (hasSupplementaryQuerySupport(datasourceInstance, type)) { - return datasourceInstance.getDataProvider(type, request); - } else if (datasourceInstance.meta?.mixed === true) { - const queries = request.targets.filter((t) => { - return t.datasource?.uid !== MIXED_DATASOURCE_NAME; - }); - // Build groups of queries to run in parallel - const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource.uid'); - const mixed: Array<{ datasource: Promise; targets: DataQuery[] }> = []; + const providers = groupedQueries.map(({ datasource, targets }, i) => { + const dsRequest = cloneDeep(request); + dsRequest.requestId = `${dsRequest.requestId || ''}_${i}`; + dsRequest.targets = targets; - for (const key in sets) { - const targets = sets[key]; - mixed.push({ - datasource: getDataSourceSrv().get(targets[0].datasource, request.scopedVars), - targets, - }); + if (hasSupplementaryQuerySupport(datasource, type)) { + return datasource.getDataProvider(type, dsRequest); + } else { + return getSupplementaryQueryFallback(type, explorePanelData, targets, datasource.name); } + }); - return from(mixed).pipe( - mergeMap((query, i) => { - return from(query.datasource).pipe( - mergeMap((ds) => { - const dsRequest = cloneDeep(request); - dsRequest.requestId = `mixed-${type}-${i}-${dsRequest.requestId || ''}`; - dsRequest.targets = query.targets; + const definedProviders = providers.filter(isTruthy); - if (hasSupplementaryQuerySupport(ds, type)) { - const dsProvider = ds.getDataProvider(type, dsRequest); - if (dsProvider) { - // 1) It provides data for current request - use the provider - return dsProvider; - } else { - // 2) It doesn't provide data for current request -> return nothing - return of({ - data: [], - state: LoadingState.NotStarted, - }); - } - } else { - // 3) Data source doesn't support the supplementary query -> use fallback - // the fallback cannot determine data availability based on request, it - // works on the results once they are available so it never uses the cache - return getSupplementaryQueryFallback(type, explorePanelData, query.targets, ds.name); - } - }) - ); - }), - scan( - (acc, next) => { - if (acc.error || next.state === LoadingState.NotStarted) { - return acc; - } + if (definedProviders.length === 0) { + return undefined; + } else if (definedProviders.length === 1) { + return definedProviders[0]; + } - if (next.state === LoadingState.Loading && acc.state === LoadingState.NotStarted) { - return { - ...acc, - state: LoadingState.Loading, - }; - } - - if (next.state && next.state !== LoadingState.Done) { - return acc; - } + return merge(...definedProviders).pipe( + scan( + (acc, next) => { + if ((acc.errors && acc.errors.length) || next.state === LoadingState.NotStarted) { + return acc; + } + if (next.state === LoadingState.Loading && acc.state === LoadingState.NotStarted) { return { ...acc, - data: [...acc.data, ...next.data], - state: LoadingState.Done, + state: LoadingState.Loading, }; - }, - { data: [], state: LoadingState.NotStarted } - ), - distinct() - ); - } else if (type === SupplementaryQueryType.LogsSample) { - return undefined; - } else { - // Create a fallback to results based logs volume - return getSupplementaryQueryFallback(type, explorePanelData, request.targets, datasourceInstance.name); - } - return undefined; + } + + if (next.state && next.state !== LoadingState.Done) { + return acc; + } + + return { + ...acc, + data: [...acc.data, ...next.data], + state: LoadingState.Done, + }; + }, + { data: [], state: LoadingState.NotStarted } + ), + distinct() + ); };