Explore: Allow supplementary query data provider to be undefined for mixed data sources (#66422)

* Switch back to the API where data provider for a supplementary query can be undefined

* Promote exploreMixedDatasource to beta

* Revert "Promote exploreMixedDatasource to beta"

This reverts commit a3bc167d1c.

* Resolve data source in parallel

* Do not modify function params

* Reorganize code for better readability

* Improve readability

* Simplify code

* Stop using a deprecated error property
This commit is contained in:
Piotr Jamróz 2023-04-14 11:31:46 +02:00 committed by GitHub
parent f48c858ca2
commit b5e2b0d14c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 187 additions and 164 deletions

View File

@ -171,7 +171,7 @@ describe('runQueries', () => {
expect(datasource.getDataProvider).toHaveBeenCalledWith( expect(datasource.getDataProvider).toHaveBeenCalledWith(
type, type,
expect.objectContaining({ expect.objectContaining({
requestId: `explore_left_${snakeCase(type)}`, requestId: `explore_left_${snakeCase(type)}_0`,
}) })
); );
} }

View File

@ -17,6 +17,7 @@ import {
LogsVolumeType, LogsVolumeType,
PanelEvents, PanelEvents,
QueryFixAction, QueryFixAction,
ScopedVars,
SupplementaryQueryType, SupplementaryQueryType,
toLegacyResponseData, toLegacyResponseData,
} from '@grafana/data'; } from '@grafana/data';
@ -37,7 +38,14 @@ import { CorrelationData } from 'app/features/correlations/useCorrelations';
import { getTimeZone } from 'app/features/profile/state/selectors'; import { getTimeZone } from 'app/features/profile/state/selectors';
import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource'; import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource';
import { store } from 'app/store/store'; import { store } from 'app/store/store';
import { createAsyncThunk, ExploreItemState, ExplorePanelData, ThunkDispatch, ThunkResult } from 'app/types'; import {
createAsyncThunk,
ExploreItemState,
ExplorePanelData,
QueryTransaction,
ThunkDispatch,
ThunkResult,
} from 'app/types';
import { ExploreId, ExploreState, QueryOptions, SupplementaryQueries } from 'app/types/explore'; import { ExploreId, ExploreState, QueryOptions, SupplementaryQueries } from 'app/types/explore';
import { notifyApp } from '../../../core/actions'; import { notifyApp } from '../../../core/actions';
@ -607,44 +615,17 @@ export const runQueries = (
dispatch(cleanSupplementaryQueryAction({ exploreId, type })); dispatch(cleanSupplementaryQueryAction({ exploreId, type }));
} }
} else { } else {
for (const type of supplementaryQueryTypes) { dispatch(
// We always prepare provider, even is supplementary query is disabled because when the user handleSupplementaryQueries({
// enables the query, we need to load the data, so we need the provider exploreId,
const dataProvider = getSupplementaryQueryProvider(
datasourceInstance, datasourceInstance,
type, transaction,
{ newQuerySource,
...transaction.request, supplementaryQueries,
requestId: `${transaction.request.requestId}_${snakeCase(type)}`, queries,
}, absoluteRange,
newQuerySource })
); );
if (dataProvider) {
dispatch(
storeSupplementaryQueryDataProviderAction({
exploreId,
type,
dataProvider,
})
);
if (!canReuseSupplementaryQueryData(supplementaryQueries[type].data, queries, absoluteRange)) {
dispatch(cleanSupplementaryQueryAction({ exploreId, type }));
if (supplementaryQueries[type].enabled) {
dispatch(loadSupplementaryQueryData(exploreId, type));
}
}
} else {
// If data source instance doesn't support this supplementary query, we clean the data provider
dispatch(
cleanSupplementaryQueryDataProviderAction({
exploreId,
type,
})
);
}
}
} }
} }
@ -652,6 +633,95 @@ export const runQueries = (
}; };
}; };
const groupDataQueries = async (datasources: DataQuery[], scopedVars: ScopedVars) => {
const nonMixedDataSources = datasources.filter((t) => {
return t.datasource?.uid !== MIXED_DATASOURCE_NAME;
});
const sets: { [key: string]: DataQuery[] } = groupBy(nonMixedDataSources, 'datasource.uid');
return await Promise.all(
Object.values(sets).map(async (targets) => {
const datasource = await getDataSourceSrv().get(targets[0].datasource, scopedVars);
return {
datasource,
targets,
};
})
);
};
type HandleSupplementaryQueriesOptions = {
exploreId: ExploreId;
transaction: QueryTransaction;
datasourceInstance: DataSourceApi;
newQuerySource: Observable<ExplorePanelData>;
supplementaryQueries: SupplementaryQueries;
queries: DataQuery[];
absoluteRange: AbsoluteTimeRange;
};
const handleSupplementaryQueries = createAsyncThunk(
'explore/handleSupplementaryQueries',
async (
{
datasourceInstance,
exploreId,
transaction,
newQuerySource,
supplementaryQueries,
queries,
absoluteRange,
}: HandleSupplementaryQueriesOptions,
{ dispatch }
) => {
let groupedQueries;
if (datasourceInstance.meta.mixed) {
groupedQueries = await groupDataQueries(transaction.request.targets, transaction.request.scopedVars);
} else {
groupedQueries = [{ datasource: datasourceInstance, targets: transaction.request.targets }];
}
for (const type of supplementaryQueryTypes) {
// We always prepare provider, even is supplementary query is disabled because when the user
// enables the query, we need to load the data, so we need the provider
const dataProvider = getSupplementaryQueryProvider(
groupedQueries,
type,
{
...transaction.request,
requestId: `${transaction.request.requestId}_${snakeCase(type)}`,
},
newQuerySource
);
if (dataProvider) {
dispatch(
storeSupplementaryQueryDataProviderAction({
exploreId,
type,
dataProvider,
})
);
if (!canReuseSupplementaryQueryData(supplementaryQueries[type].data, queries, absoluteRange)) {
dispatch(cleanSupplementaryQueryAction({ exploreId, type }));
if (supplementaryQueries[type].enabled) {
dispatch(loadSupplementaryQueryData(exploreId, type));
}
}
} else {
// If data source instance doesn't support this supplementary query, we clean the data provider
dispatch(
cleanSupplementaryQueryDataProviderAction({
exploreId,
type,
})
);
}
}
}
);
/** /**
* Checks if after changing the time range the existing data can be used to show supplementary query. * Checks if after changing the time range the existing data can be used to show supplementary query.
* It can happen if queries are the same and new time range is within existing data time range. * It can happen if queries are the same and new time range is within existing data time range.

View File

@ -133,7 +133,6 @@ const datasources: DataSourceApi[] = [
), ),
new MockDataSourceApi('no-data-providers'), new MockDataSourceApi('no-data-providers'),
new MockDataSourceApi('no-data-providers-2'), new MockDataSourceApi('no-data-providers-2'),
new MockDataSourceApi('mixed').setupMixed(true),
]; ];
jest.mock('@grafana/runtime', () => ({ jest.mock('@grafana/runtime', () => ({
@ -145,17 +144,23 @@ jest.mock('@grafana/runtime', () => ({
}, },
})); }));
const setup = async (rootDataSource: string, type: SupplementaryQueryType, targetSources?: string[]) => { const setup = async (targetSources: string[], type: SupplementaryQueryType) => {
const rootDataSourceApiMock = await getDataSourceSrv().get({ uid: rootDataSource });
targetSources = targetSources || [rootDataSource];
const requestMock = new MockDataQueryRequest({ const requestMock = new MockDataQueryRequest({
targets: targetSources.map((source, i) => new MockQuery(`${i}`, 'a', { uid: source })), targets: targetSources.map((source, i) => new MockQuery(`${i}`, 'a', { uid: source })),
}); });
const explorePanelDataMock: Observable<ExplorePanelData> = mockExploreDataWithLogs(); const explorePanelDataMock: Observable<ExplorePanelData> = mockExploreDataWithLogs();
return getSupplementaryQueryProvider(rootDataSourceApiMock, type, requestMock, explorePanelDataMock); const datasources = await Promise.all(
targetSources.map(async (source, i) => {
const datasource = await getDataSourceSrv().get({ uid: source });
return {
datasource,
targets: [new MockQuery(`${i}`, 'a', { uid: source })],
};
})
);
return getSupplementaryQueryProvider(datasources, type, requestMock, explorePanelDataMock);
}; };
const assertDataFrom = (type: SupplementaryQueryType, ...datasources: string[]) => { const assertDataFrom = (type: SupplementaryQueryType, ...datasources: string[]) => {
@ -173,7 +178,7 @@ const assertDataFromLogsResults = () => {
describe('SupplementaryQueries utils', function () { describe('SupplementaryQueries utils', function () {
describe('Non-mixed data source', function () { describe('Non-mixed data source', function () {
it('Returns result from the provider', async () => { it('Returns result from the provider', async () => {
const testProvider = await setup('logs-volume-a', SupplementaryQueryType.LogsVolume); const testProvider = await setup(['logs-volume-a'], SupplementaryQueryType.LogsVolume);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
@ -186,7 +191,7 @@ describe('SupplementaryQueries utils', function () {
}); });
}); });
it('Uses fallback for logs volume', async () => { it('Uses fallback for logs volume', async () => {
const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsVolume); const testProvider = await setup(['no-data-providers'], SupplementaryQueryType.LogsVolume);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
@ -198,14 +203,11 @@ describe('SupplementaryQueries utils', function () {
}); });
}); });
it('Returns undefined for logs sample', async () => { it('Returns undefined for logs sample', async () => {
const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsSample); const testProvider = await setup(['no-data-providers'], SupplementaryQueryType.LogsSample);
await expect(testProvider).toBe(undefined); await expect(testProvider).toBe(undefined);
}); });
it('Creates single fallback result', async () => { it('Creates single fallback result', async () => {
const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsVolume, [ const testProvider = await setup(['no-data-providers', 'no-data-providers-2'], SupplementaryQueryType.LogsVolume);
'no-data-providers',
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
@ -226,10 +228,7 @@ describe('SupplementaryQueries utils', function () {
describe('Logs volume', function () { describe('Logs volume', function () {
describe('All data sources support full range logs volume', function () { describe('All data sources support full range logs volume', function () {
it('Merges all data frames into a single response', async () => { it('Merges all data frames into a single response', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [ const testProvider = await setup(['logs-volume-a', 'logs-volume-b'], SupplementaryQueryType.LogsVolume);
'logs-volume-a',
'logs-volume-b',
]);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading }, { data: [], state: LoadingState.Loading },
@ -248,10 +247,10 @@ describe('SupplementaryQueries utils', function () {
describe('All data sources do not support full range logs volume', function () { describe('All data sources do not support full range logs volume', function () {
it('Creates single fallback result', async () => { it('Creates single fallback result', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [ const testProvider = await setup(
'no-data-providers', ['no-data-providers', 'no-data-providers-2'],
'no-data-providers-2', SupplementaryQueryType.LogsVolume
]); );
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
@ -270,12 +269,10 @@ describe('SupplementaryQueries utils', function () {
describe('Some data sources support full range logs volume, while others do not', function () { describe('Some data sources support full range logs volume, while others do not', function () {
it('Creates merged result containing full range and limited logs volume', async () => { it('Creates merged result containing full range and limited logs volume', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [ const testProvider = await setup(
'logs-volume-a', ['logs-volume-a', 'no-data-providers', 'logs-volume-b', 'no-data-providers-2'],
'no-data-providers', SupplementaryQueryType.LogsVolume
'logs-volume-b', );
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
{ {
@ -310,10 +307,7 @@ describe('SupplementaryQueries utils', function () {
describe('Logs sample', function () { describe('Logs sample', function () {
describe('All data sources support logs sample', function () { describe('All data sources support logs sample', function () {
it('Merges all responses into single result', async () => { it('Merges all responses into single result', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [ const testProvider = await setup(['logs-sample-a', 'logs-sample-b'], SupplementaryQueryType.LogsSample);
'logs-sample-a',
'logs-sample-b',
]);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading }, { data: [], state: LoadingState.Loading },
@ -332,24 +326,20 @@ describe('SupplementaryQueries utils', function () {
describe('All data sources do not support full range logs volume', function () { describe('All data sources do not support full range logs volume', function () {
it('Does not provide fallback result', async () => { it('Does not provide fallback result', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [ const testProvider = await setup(
'no-data-providers', ['no-data-providers', 'no-data-providers-2'],
'no-data-providers-2', SupplementaryQueryType.LogsSample
]); );
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toBeUndefined();
expect(received).toMatchObject([{ state: LoadingState.NotStarted, data: [] }]);
});
}); });
}); });
describe('Some data sources support full range logs volume, while others do not', function () { describe('Some data sources support full range logs volume, while others do not', function () {
it('Returns results only for data sources supporting logs sample', async () => { it('Returns results only for data sources supporting logs sample', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [ const testProvider = await setup(
'logs-sample-a', ['logs-sample-a', 'no-data-providers', 'logs-sample-b', 'no-data-providers-2'],
'no-data-providers', SupplementaryQueryType.LogsSample
'logs-sample-b', );
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => { await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([ expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading }, { data: [], state: LoadingState.Loading },

View File

@ -1,5 +1,5 @@
import { cloneDeep, groupBy } from 'lodash'; import { cloneDeep, groupBy } from 'lodash';
import { distinct, from, mergeMap, Observable, of } from 'rxjs'; import { distinct, Observable, merge } from 'rxjs';
import { scan } from 'rxjs/operators'; import { scan } from 'rxjs/operators';
import { import {
@ -9,15 +9,14 @@ import {
DataQueryResponse, DataQueryResponse,
DataSourceApi, DataSourceApi,
hasSupplementaryQuerySupport, hasSupplementaryQuerySupport,
isTruthy,
LoadingState, LoadingState,
LogsVolumeCustomMetaData, LogsVolumeCustomMetaData,
LogsVolumeType, LogsVolumeType,
SupplementaryQueryType, SupplementaryQueryType,
} from '@grafana/data'; } from '@grafana/data';
import { getDataSourceSrv } from '@grafana/runtime';
import { makeDataFramesForLogs } from 'app/core/logsModel'; import { makeDataFramesForLogs } from 'app/core/logsModel';
import store from 'app/core/store'; import store from 'app/core/store';
import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource';
import { ExplorePanelData, SupplementaryQueries } from 'app/types'; import { ExplorePanelData, SupplementaryQueries } from 'app/types';
export const supplementaryQueryTypes: SupplementaryQueryType[] = [ export const supplementaryQueryTypes: SupplementaryQueryType[] = [
@ -129,98 +128,62 @@ const getSupplementaryQueryFallback = (
if (type === SupplementaryQueryType.LogsVolume) { if (type === SupplementaryQueryType.LogsVolume) {
return createFallbackLogVolumeProvider(explorePanelData, queryTargets, datasourceName); return createFallbackLogVolumeProvider(explorePanelData, queryTargets, datasourceName);
} else { } else {
return of({ return undefined;
data: [],
state: LoadingState.NotStarted,
});
} }
}; };
export const getSupplementaryQueryProvider = ( export const getSupplementaryQueryProvider = (
datasourceInstance: DataSourceApi, groupedQueries: Array<{ datasource: DataSourceApi; targets: DataQuery[] }>,
type: SupplementaryQueryType, type: SupplementaryQueryType,
request: DataQueryRequest, request: DataQueryRequest,
explorePanelData: Observable<ExplorePanelData> explorePanelData: Observable<ExplorePanelData>
): Observable<DataQueryResponse> | undefined => { ): Observable<DataQueryResponse> | undefined => {
if (hasSupplementaryQuerySupport(datasourceInstance, type)) { const providers = groupedQueries.map(({ datasource, targets }, i) => {
return datasourceInstance.getDataProvider(type, request); const dsRequest = cloneDeep(request);
} else if (datasourceInstance.meta?.mixed === true) { dsRequest.requestId = `${dsRequest.requestId || ''}_${i}`;
const queries = request.targets.filter((t) => { dsRequest.targets = targets;
return t.datasource?.uid !== MIXED_DATASOURCE_NAME;
});
// Build groups of queries to run in parallel
const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource.uid');
const mixed: Array<{ datasource: Promise<DataSourceApi>; targets: DataQuery[] }> = [];
for (const key in sets) { if (hasSupplementaryQuerySupport(datasource, type)) {
const targets = sets[key]; return datasource.getDataProvider(type, dsRequest);
mixed.push({ } else {
datasource: getDataSourceSrv().get(targets[0].datasource, request.scopedVars), return getSupplementaryQueryFallback(type, explorePanelData, targets, datasource.name);
targets,
});
} }
});
return from(mixed).pipe( const definedProviders = providers.filter(isTruthy);
mergeMap((query, i) => {
return from(query.datasource).pipe(
mergeMap((ds) => {
const dsRequest = cloneDeep(request);
dsRequest.requestId = `mixed-${type}-${i}-${dsRequest.requestId || ''}`;
dsRequest.targets = query.targets;
if (hasSupplementaryQuerySupport(ds, type)) { if (definedProviders.length === 0) {
const dsProvider = ds.getDataProvider(type, dsRequest); return undefined;
if (dsProvider) { } else if (definedProviders.length === 1) {
// 1) It provides data for current request - use the provider return definedProviders[0];
return dsProvider; }
} else {
// 2) It doesn't provide data for current request -> return nothing
return of({
data: [],
state: LoadingState.NotStarted,
});
}
} else {
// 3) Data source doesn't support the supplementary query -> use fallback
// the fallback cannot determine data availability based on request, it
// works on the results once they are available so it never uses the cache
return getSupplementaryQueryFallback(type, explorePanelData, query.targets, ds.name);
}
})
);
}),
scan<DataQueryResponse, DataQueryResponse>(
(acc, next) => {
if (acc.error || next.state === LoadingState.NotStarted) {
return acc;
}
if (next.state === LoadingState.Loading && acc.state === LoadingState.NotStarted) { return merge(...definedProviders).pipe(
return { scan<DataQueryResponse, DataQueryResponse>(
...acc, (acc, next) => {
state: LoadingState.Loading, if ((acc.errors && acc.errors.length) || next.state === LoadingState.NotStarted) {
}; return acc;
} }
if (next.state && next.state !== LoadingState.Done) {
return acc;
}
if (next.state === LoadingState.Loading && acc.state === LoadingState.NotStarted) {
return { return {
...acc, ...acc,
data: [...acc.data, ...next.data], state: LoadingState.Loading,
state: LoadingState.Done,
}; };
}, }
{ data: [], state: LoadingState.NotStarted }
), if (next.state && next.state !== LoadingState.Done) {
distinct() return acc;
); }
} else if (type === SupplementaryQueryType.LogsSample) {
return undefined; return {
} else { ...acc,
// Create a fallback to results based logs volume data: [...acc.data, ...next.data],
return getSupplementaryQueryFallback(type, explorePanelData, request.targets, datasourceInstance.name); state: LoadingState.Done,
} };
return undefined; },
{ data: [], state: LoadingState.NotStarted }
),
distinct()
);
}; };