Mixed-ds: Return both results from the same datasource (#39865)

* Mixed-ds: Return both results from the same datasource

* fix mixed data source

* Change concatMap to mergeMap

* Make sure to return correct state at the end

* Make sure to count the targets as well

* Use Marcus's version

* Fix stict error

* Apply suggestions from code review

Co-authored-by: Marcus Andersson <marcus.andersson@grafana.com>

Co-authored-by: Marcus Andersson <marcus.andersson@grafana.com>
This commit is contained in:
Zoltán Bedi 2021-10-26 12:54:22 +02:00 committed by GitHub
parent 54899f91ff
commit c43776aec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 130 additions and 95 deletions

View File

@ -1,104 +1,129 @@
import { from } from 'rxjs';
import { getDataSourceSrv } from '@grafana/runtime';
import { DataSourceInstanceSettings, LoadingState } from '@grafana/data';
import { DatasourceSrvMock, MockDataSourceApi } from 'test/mocks/datasource_srv';
import { LoadingState } from '@grafana/data';
import { lastValueFrom } from 'rxjs';
import { getQueryOptions } from 'test/helpers/getQueryOptions';
import { DatasourceSrvMock, MockObservableDataSourceApi } from 'test/mocks/datasource_srv';
import { MixedDatasource } from './module';
const defaultDS = new MockDataSourceApi('DefaultDS', { data: ['DDD'] });
const defaultDS = new MockObservableDataSourceApi('DefaultDS', [{ data: ['DDD'] }]);
const datasourceSrv = new DatasourceSrvMock(defaultDS, {
'-- Mixed --': new MixedDatasource({ name: 'mixed', id: 5 } as DataSourceInstanceSettings),
A: new MockDataSourceApi('DSA', { data: ['AAAA'] }),
B: new MockDataSourceApi('DSB', { data: ['BBBB'] }),
C: new MockDataSourceApi('DSC', { data: ['CCCC'] }),
D: new MockDataSourceApi('DSD', { data: [] }, {}, 'syntax error near FROM'),
E: new MockDataSourceApi('DSE', { data: [] }, {}, 'syntax error near WHERE'),
'-- Mixed --': new MockObservableDataSourceApi('mixed'),
A: new MockObservableDataSourceApi('DSA', [{ data: ['AAAA'] }]),
B: new MockObservableDataSourceApi('DSB', [{ data: ['BBBB'] }]),
C: new MockObservableDataSourceApi('DSC', [{ data: ['CCCC'] }]),
D: new MockObservableDataSourceApi('DSD', [{ data: [] }], {}, 'syntax error near FROM'),
E: new MockObservableDataSourceApi('DSE', [{ data: [] }], {}, 'syntax error near WHERE'),
Loki: new MockObservableDataSourceApi('Loki', [
{ data: ['A'], key: 'A' },
{ data: ['B'], key: 'B' },
]),
});
const getDataSourceSrvMock = jest.fn().mockReturnValue(datasourceSrv);
jest.mock('@grafana/runtime', () => ({
...((jest.requireActual('@grafana/runtime') as unknown) as object),
getDataSourceSrv: () => {
return datasourceSrv;
},
getDataSourceSrv: () => getDataSourceSrvMock(),
}));
describe('MixedDatasource', () => {
describe('with no errors', () => {
const requestMixed = getQueryOptions({
targets: [
{ refId: 'QA', datasource: 'A' }, // 1
{ refId: 'QB', datasource: 'B' }, // 2
{ refId: 'QC', datasource: 'C' }, // 3
],
});
const results: any[] = [];
beforeEach((done) => {
getDataSourceSrv()
.get('-- Mixed --')
.then((ds) => {
from(ds.query(requestMixed)).subscribe((result) => {
results.push(result);
if (result.state === LoadingState.Done) {
done();
}
});
});
});
it('direct query should return results', async () => {
expect(results.length).toBe(3);
expect(results[0].data).toEqual(['AAAA']);
expect(results[0].state).toEqual(LoadingState.Loading);
expect(results[1].data).toEqual(['BBBB']);
expect(results[2].data).toEqual(['CCCC']);
expect(results[2].state).toEqual(LoadingState.Done);
expect(results.length).toBe(3);
const ds = new MixedDatasource({} as any);
const requestMixed = getQueryOptions({
targets: [
{ refId: 'QA', datasource: 'A' }, // 1
{ refId: 'QB', datasource: 'B' }, // 2
{ refId: 'QC', datasource: 'C' }, // 3
],
});
await expect(ds.query(requestMixed)).toEmitValuesWith((results) => {
expect(results.length).toBe(3);
expect(results[0].data).toEqual(['AAAA']);
expect(results[0].state).toEqual(LoadingState.Loading);
expect(results[1].data).toEqual(['BBBB']);
expect(results[2].data).toEqual(['CCCC']);
expect(results[2].state).toEqual(LoadingState.Done);
});
});
});
describe('with errors', () => {
const requestMixed = getQueryOptions({
targets: [
{ refId: 'QA', datasource: 'A' }, // 1
{ refId: 'QD', datasource: 'D' }, // 2
{ refId: 'QB', datasource: 'B' }, // 3
{ refId: 'QE', datasource: 'E' }, // 4
{ refId: 'QC', datasource: 'C' }, // 5
],
});
const results: any[] = [];
beforeEach((done) => {
getDataSourceSrv()
.get('-- Mixed --')
.then((ds) => {
from(ds.query(requestMixed)).subscribe((result) => {
results.push(result);
if (results.length === 5) {
done();
}
});
});
});
it('direct query should return results', async () => {
expect(results[0].data).toEqual(['AAAA']);
expect(results[0].state).toEqual(LoadingState.Loading);
expect(results[1].data).toEqual([]);
expect(results[1].state).toEqual(LoadingState.Error);
expect(results[1].error).toEqual({ message: 'DSD: syntax error near FROM' });
expect(results[2].data).toEqual(['BBBB']);
expect(results[2].state).toEqual(LoadingState.Loading);
expect(results[3].data).toEqual([]);
expect(results[3].state).toEqual(LoadingState.Error);
expect(results[3].error).toEqual({ message: 'DSE: syntax error near WHERE' });
expect(results[4].data).toEqual(['CCCC']);
expect(results[4].state).toEqual(LoadingState.Loading);
expect(results[5].data).toEqual([]);
expect(results[5].state).toEqual(LoadingState.Error);
expect(results[5].error).toEqual({ message: 'DSD: syntax error near FROM' });
const ds = new MixedDatasource({} as any);
const requestMixed = getQueryOptions({
targets: [
{ refId: 'QA', datasource: 'A' }, // 1
{ refId: 'QD', datasource: 'D' }, // 2
{ refId: 'QB', datasource: 'B' }, // 3
{ refId: 'QE', datasource: 'E' }, // 4
{ refId: 'QC', datasource: 'C' }, // 5
],
});
await expect(ds.query(requestMixed)).toEmitValuesWith((results) => {
expect(results[0].data).toEqual(['AAAA']);
expect(results[0].state).toEqual(LoadingState.Loading);
expect(results[1].data).toEqual([]);
expect(results[1].state).toEqual(LoadingState.Error);
expect(results[1].error).toEqual({ message: 'DSD: syntax error near FROM' });
expect(results[2].data).toEqual(['BBBB']);
expect(results[2].state).toEqual(LoadingState.Loading);
expect(results[3].data).toEqual([]);
expect(results[3].state).toEqual(LoadingState.Error);
expect(results[3].error).toEqual({ message: 'DSE: syntax error near WHERE' });
expect(results[4].data).toEqual(['CCCC']);
expect(results[4].state).toEqual(LoadingState.Loading);
expect(results[5].data).toEqual([]);
expect(results[5].state).toEqual(LoadingState.Error);
expect(results[5].error).toEqual({ message: 'DSD: syntax error near FROM' });
});
});
});
it('should return both query results from the same data source', async () => {
const ds = new MixedDatasource({} as any);
const request: any = {
targets: [
{ refId: 'A', datasource: 'Loki' },
{ refId: 'B', datasource: 'Loki' },
{ refId: 'C', datasource: 'A' },
],
};
await expect(ds.query(request)).toEmitValuesWith((results) => {
expect(results).toHaveLength(3);
expect(results[0].key).toBe('mixed-0-A');
expect(results[1].key).toBe('mixed-0-B');
expect(results[1].state).toBe(LoadingState.Loading);
expect(results[2].key).toBe('mixed-1-');
expect(results[2].state).toBe(LoadingState.Done);
});
});
it('should not return the error for the second time', async () => {
const ds = new MixedDatasource({} as any);
const request: any = {
targets: [
{ refId: 'A', datasource: 'Loki' },
{ refId: 'DD', datasource: 'D' },
{ refId: 'C', datasource: 'A' },
],
};
await lastValueFrom(ds.query(request));
await expect(
ds.query({
targets: [
{ refId: 'QA', datasource: 'A' },
{ refId: 'QB', datasource: 'B' },
],
} as any)
).toEmitValuesWith((results) => {
expect(results).toHaveLength(2);
expect(results[0].key).toBe('mixed-0-');
expect(results[1].key).toBe('mixed-1-');
expect(results[1].state).toBe(LoadingState.Done);
});
});
});

View File

@ -1,7 +1,3 @@
import { cloneDeep, groupBy } from 'lodash';
import { forkJoin, from, Observable, of } from 'rxjs';
import { catchError, map, mergeAll, mergeMap } from 'rxjs/operators';
import {
DataQuery,
DataQueryRequest,
@ -11,6 +7,9 @@ import {
LoadingState,
} from '@grafana/data';
import { getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
import { cloneDeep, groupBy } from 'lodash';
import { forkJoin, from, Observable, of, OperatorFunction } from 'rxjs';
import { catchError, map, mergeAll, mergeMap, reduce, toArray } from 'rxjs/operators';
export const MIXED_DATASOURCE_NAME = '-- Mixed --';
@ -68,24 +67,26 @@ export class MixedDatasource extends DataSourceApi<DataQuery> {
key: `mixed-${i}-${response.key || ''}`,
} as DataQueryResponse;
}),
toArray(),
catchError((err) => {
err = toDataQueryError(err);
err.message = `${api.name}: ${err.message}`;
return of({
data: [],
state: LoadingState.Error,
error: err,
key: `mixed-${i}-${dsRequest.requestId || ''}`,
});
return of<DataQueryResponse[]>([
{
data: [],
state: LoadingState.Error,
error: err,
key: `mixed-${i}-${dsRequest.requestId || ''}`,
},
]);
})
);
})
)
);
return forkJoin(runningQueries).pipe(map(this.finalizeResponses), mergeAll());
return forkJoin(runningQueries).pipe(flattenResponses(), map(this.finalizeResponses), mergeAll());
}
testDatasource() {
@ -113,3 +114,12 @@ export class MixedDatasource extends DataSourceApi<DataQuery> {
return responses;
}
}
function flattenResponses(): OperatorFunction<DataQueryResponse[][], DataQueryResponse[]> {
return reduce((all: DataQueryResponse[], current) => {
return current.reduce((innerAll, innerCurrent) => {
innerAll.push.apply(innerAll, innerCurrent);
return innerAll;
}, all);
}, []);
}