Files
grafana/public/app/plugins/datasource/mixed/MixedDataSource.ts
Ryan McKinley eaba50283e MixedQuery: refactor so other components could also batch queries (#20219)
* query mixed

* query mixed

* batch queries

* check that the values only have 3 rows at the *end* as well
2019-11-07 07:18:27 +01:00

115 lines
3.4 KiB
TypeScript

import cloneDeep from 'lodash/cloneDeep';
import groupBy from 'lodash/groupBy';
import { from, of, Observable, merge } from 'rxjs';
import { tap } from 'rxjs/operators';
import {
LoadingState,
DataSourceApi,
DataQuery,
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
} from '@grafana/data';
import { getDataSourceSrv } from '@grafana/runtime';
import { mergeMap, map } from 'rxjs/operators';
/**
 * Datasource name that identifies the mixed datasource itself.
 * `MixedDatasource.query` drops every target whose `datasource` equals this value.
 */
export const MIXED_DATASOURCE_NAME = '-- Mixed --';
/**
 * A group of query targets paired with the datasource that should run them.
 * `MixedDatasource.batchQueries` issues one sub-request per entry.
 */
export interface BatchedQueries {
// Promise for the datasource instance; the sub-request is issued once it resolves.
datasource: Promise<DataSourceApi>;
// Targets sent to that datasource in a single request; entries with no targets are skipped.
targets: DataQuery[];
}
/**
 * Datasource that splits one request into per-datasource sub-requests, based
 * on the `datasource` property of each target, and merges the responses into
 * a single stream.
 */
export class MixedDatasource extends DataSourceApi<DataQuery> {
  constructor(instanceSettings: DataSourceInstanceSettings) {
    super(instanceSettings);
  }

  /**
   * Groups the request's targets by their `datasource` property and runs one
   * sub-request per group via {@link batchQueries}.
   *
   * @param request - The incoming request; its `targets` are grouped by datasource.
   * @returns A stream of responses from all sub-requests, or a single empty
   *   response when no usable targets remain after filtering.
   */
  query(request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> {
    // Remove any invalid queries: a target must not point back at the mixed datasource.
    const queries = request.targets.filter(t => t.datasource !== MIXED_DATASOURCE_NAME);
    if (!queries.length) {
      return of({ data: [] } as DataQueryResponse); // nothing
    }

    // Build groups of queries to run in parallel
    const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource');
    const mixed: BatchedQueries[] = Object.keys(sets).map(key => {
      const targets = sets[key];
      return {
        // Look up by the target's own value rather than the stringified group key.
        datasource: getDataSourceSrv().get(targets[0].datasource),
        targets,
      };
    });

    return this.batchQueries(mixed, request);
  }

  /**
   * Runs every batch against its datasource and merges the responses.
   *
   * Each sub-request gets a deep clone of `request` with its own `requestId`
   * (`mixed-<index>-<original id>`) and only that batch's targets. Every emitted
   * response is re-keyed as `mixed-<index>-<original key>` and its `state` is
   * overwritten: `Done` once no sub-request is still running, `Loading` otherwise.
   *
   * A sub-request stops counting as running on its first response whose state
   * is neither `Loading` nor `Streaming`, when it errors, or when it completes.
   *
   * @param mixed - Batches to run; entries without targets are skipped.
   * @param request - The original request, used as the template for each sub-request.
   * @returns A merged stream of the re-keyed responses from all batches.
   */
  batchQueries(mixed: BatchedQueries[], request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> {
    // Build everything per subscription so each subscriber gets its own counter.
    return new Observable<DataQueryResponse>(subscriber => {
      const observables: Array<Observable<DataQueryResponse>> = [];
      let runningSubRequests = 0;

      for (let i = 0; i < mixed.length; i++) {
        const query = mixed[i];
        if (!query.targets || !query.targets.length) {
          continue;
        }

        // Count the batch now, synchronously, instead of when its datasource
        // promise resolves. Otherwise a fast datasource could finish while a
        // slower one has not even been counted yet, and the merged stream
        // would report Done too early.
        runningSubRequests++;

        let hasCountedAsDone = false;
        const countAsDone = () => {
          if (hasCountedAsDone) {
            return;
          }
          hasCountedAsDone = true;
          runningSubRequests--;
        };

        const observable = from(query.datasource).pipe(
          mergeMap((dataSourceApi: DataSourceApi) => {
            const datasourceRequest = cloneDeep(request);
            datasourceRequest.requestId = `mixed-${i}-${datasourceRequest.requestId || ''}`;
            datasourceRequest.targets = query.targets;

            return from(dataSourceApi.query(datasourceRequest)).pipe(
              tap({
                next: (response: DataQueryResponse) => {
                  // Loading/Streaming responses mean more data is expected.
                  if (response.state === LoadingState.Streaming || response.state === LoadingState.Loading) {
                    return;
                  }
                  countAsDone();
                },
                error: countAsDone,
                // A stream that ends without a final response is finished too.
                complete: countAsDone,
              }),
              map((response: DataQueryResponse) => {
                return {
                  ...response,
                  data: response.data || [],
                  state: runningSubRequests === 0 ? LoadingState.Done : LoadingState.Loading,
                  key: `mixed-${i}-${response.key || ''}`,
                } as DataQueryResponse;
              })
            );
          })
        );

        observables.push(observable);
      }

      // The returned subscription is the teardown: unsubscribing cancels all batches.
      return merge(...observables).subscribe(subscriber);
    });
  }

  /**
   * Always resolves with an empty object.
   */
  testDatasource() {
    return Promise.resolve({});
  }
}