diff --git a/public/app/plugins/datasource/loki/datasource.test.ts b/public/app/plugins/datasource/loki/datasource.test.ts index 6765414ab28..6a5194b77e5 100644 --- a/public/app/plugins/datasource/loki/datasource.test.ts +++ b/public/app/plugins/datasource/loki/datasource.test.ts @@ -1,17 +1,22 @@ +import { of, Subject } from 'rxjs'; +import { first, last, take } from 'rxjs/operators'; +import { omit } from 'lodash'; +import { AnnotationQueryRequest, DataFrame, DataQueryResponse, dateTime, FieldCache, TimeRange } from '@grafana/data'; +import { BackendSrvRequest, FetchResponse } from '@grafana/runtime'; + import LokiDatasource from './datasource'; import { LokiQuery, LokiResponse, LokiResultType } from './types'; import { getQueryOptions } from 'test/helpers/getQueryOptions'; -import { AnnotationQueryRequest, DataFrame, DataSourceApi, dateTime, FieldCache, TimeRange } from '@grafana/data'; import { TemplateSrv } from 'app/features/templating/template_srv'; -import { makeMockLokiDatasource } from './mocks'; -import { of } from 'rxjs'; -import omit from 'lodash/omit'; import { backendSrv } from 'app/core/services/backend_srv'; import { CustomVariableModel } from '../../../features/variables/types'; -import { initialCustomVariableModelState } from '../../../features/variables/custom/reducer'; // will use the version in __mocks__ +import { initialCustomVariableModelState } from '../../../features/variables/custom/reducer'; +import { observableTester } from '../../../../test/helpers/observableTester'; +import { expect } from '../../../../test/lib/common'; +import { makeMockLokiDatasource } from './mocks'; jest.mock('@grafana/runtime', () => ({ - //@ts-ignore + // @ts-ignore ...jest.requireActual('@grafana/runtime'), getBackendSrv: () => backendSrv, })); @@ -27,14 +32,11 @@ jest.mock('app/features/dashboard/services/TimeSrv', () => { }; }); -const datasourceRequestMock = jest.spyOn(backendSrv, 'datasourceRequest'); - describe('LokiDatasource', () => { - const instanceSettings: any = { - url: 'myloggingurl', - }; + let fetchStream: Subject; + const fetchMock = jest.spyOn(backendSrv, 'fetch'); - const testResp: { data: LokiResponse } = { + const testResponse: FetchResponse = { data: { data: { resultType: LokiResultType.Stream, @@ -47,25 +49,28 @@ describe('LokiDatasource', () => { }, status: 'success', }, + ok: true, + headers: ({} as unknown) as Headers, + redirected: false, + status: 200, + statusText: 'Success', + type: 'default', + url: '', + config: ({} as unknown) as BackendSrvRequest, }; beforeEach(() => { jest.clearAllMocks(); - datasourceRequestMock.mockImplementation(() => Promise.resolve()); + fetchStream = new Subject(); + fetchMock.mockImplementation(() => fetchStream.asObservable()); }); - const templateSrvMock = ({ - getAdhocFilters: (): any[] => [], - replace: (a: string) => a, - } as unknown) as TemplateSrv; - describe('when creating range query', () => { let ds: LokiDatasource; let adjustIntervalSpy: jest.SpyInstance; + beforeEach(() => { - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - const customSettings = { ...instanceSettings, jsonData: customData }; - ds = new LokiDatasource(customSettings, templateSrvMock); + ds = createLokiDSForTests(); adjustIntervalSpy = jest.spyOn(ds, 'adjustInterval'); }); @@ -99,124 +104,161 @@ describe('LokiDatasource', () => { }); }); + describe('when querying with limits', () => { + const runLimitTest = ({ maxDataPoints, maxLines, expectedLimit, done }: any) => { + let settings: any = { + url: 'myloggingurl', + }; + + if (Number.isFinite(maxLines!)) { + const customData = { ...(settings.jsonData || {}), maxLines: 20 }; + settings = { ...settings, jsonData: customData }; + } + + const templateSrvMock = ({ + getAdhocFilters: (): any[] => [], + replace: (a: string) => a, + } as unknown) as TemplateSrv; + + const ds = new LokiDatasource(settings, templateSrvMock); + + const options = getQueryOptions({ targets: [{ expr: 'foo', refId: 'B', maxLines: maxDataPoints }] }); + + if (Number.isFinite(maxDataPoints!)) { + options.maxDataPoints = maxDataPoints; + } else { + // By default is 500 + delete options.maxDataPoints; + } + + observableTester().subscribeAndExpectOnComplete({ + observable: ds.query(options).pipe(take(1)), + expect: () => { + expect(fetchMock.mock.calls.length).toBe(2); + expect(fetchMock.mock.calls[0][0].url).toContain(`limit=${expectedLimit}`); + }, + done, + }); + + fetchStream.next(testResponse); + }; + + it('should use default max lines when no limit given', done => { + runLimitTest({ + expectedLimit: 1000, + done, + }); + }); + + it('should use custom max lines if limit is set', done => { + runLimitTest({ + maxLines: 20, + expectedLimit: 20, + done, + }); + }); + + it('should use custom maxDataPoints if set in request', () => { + runLimitTest({ + maxDataPoints: 500, + expectedLimit: 500, + }); + }); + + it('should use datasource maxLimit if maxDataPoints is higher', () => { + runLimitTest({ + maxLines: 20, + maxDataPoints: 500, + expectedLimit: 20, + }); + }); + }); + describe('when querying', () => { - let ds: LokiDatasource; - let testLimit: any; - - beforeAll(() => { - testLimit = makeLimitTest(instanceSettings, datasourceRequestMock, templateSrvMock, testResp); - }); - - beforeEach(() => { - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - const customSettings = { ...instanceSettings, jsonData: customData }; - ds = new LokiDatasource(customSettings, templateSrvMock); - datasourceRequestMock.mockImplementation(() => Promise.resolve(testResp)); - }); - - test('should run range and instant query', async () => { + it('should run range and instant query', done => { + const ds = createLokiDSForTests(); const options = getQueryOptions({ targets: [{ expr: '{job="grafana"}', refId: 'B' }], }); ds.runInstantQuery = jest.fn(() => of({ data: [] })); ds.runRangeQuery = jest.fn(() => of({ data: [] })); - await ds.query(options).toPromise(); - expect(ds.runInstantQuery).toBeCalled(); - expect(ds.runRangeQuery).toBeCalled(); - }); - - test('should use default max lines when no limit given', () => { - testLimit({ - expectedLimit: 1000, + observableTester().subscribeAndExpectOnComplete({ + observable: ds.query(options), + expect: () => { + expect(ds.runInstantQuery).toBeCalled(); + expect(ds.runRangeQuery).toBeCalled(); + }, + done, }); }); - test('should use custom max lines if limit is set', () => { - testLimit({ - maxLines: 20, - expectedLimit: 20, - }); - }); - - test('should use custom maxDataPoints if set in request', () => { - testLimit({ - maxDataPoints: 500, - expectedLimit: 500, - }); - }); - - test('should use datasource maxLimit if maxDataPoints is higher', () => { - testLimit({ - maxLines: 20, - maxDataPoints: 500, - expectedLimit: 20, - }); - }); - - test('should return series data', async () => { - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - const customSettings = { ...instanceSettings, jsonData: customData }; - const ds = new LokiDatasource(customSettings, templateSrvMock); - datasourceRequestMock.mockImplementation( - jest - .fn() - .mockReturnValueOnce(Promise.resolve(testResp)) - .mockReturnValueOnce(Promise.resolve(omit(testResp, 'data.status'))) - ); - + it('should return series data', done => { + const ds = createLokiDSForTests(); const options = getQueryOptions({ targets: [{ expr: '{job="grafana"} |= "foo"', refId: 'B' }], }); - const res = await ds.query(options).toPromise(); + observableTester().subscribeAndExpectOnNext({ + observable: ds.query(options).pipe(first()), // first result always comes from runInstantQuery + expect: res => { + expect(res).toEqual({ + data: [], + key: 'B_instant', + }); + }, + done, + }); - const dataFrame = res.data[0] as DataFrame; - const fieldCache = new FieldCache(dataFrame); - expect(fieldCache.getFieldByName('line')?.values.get(0)).toBe('hello'); - expect(dataFrame.meta?.limit).toBe(20); - expect(dataFrame.meta?.searchWords).toEqual(['foo']); + observableTester().subscribeAndExpectOnNext({ + observable: ds.query(options).pipe(last()), // last result always comes from runRangeQuery + expect: res => { + const dataFrame = res.data[0] as DataFrame; + const fieldCache = new FieldCache(dataFrame); + expect(fieldCache.getFieldByName('line')?.values.get(0)).toBe('hello'); + expect(dataFrame.meta?.limit).toBe(20); + expect(dataFrame.meta?.searchWords).toEqual(['foo']); + }, + done, + }); + + fetchStream.next(testResponse); + fetchStream.next(omit(testResponse, 'data.status')); }); - test('should return custom error message when Loki returns escaping error', async () => { - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - const customSettings = { ...instanceSettings, jsonData: customData }; - const ds = new LokiDatasource(customSettings, templateSrvMock); - - datasourceRequestMock.mockImplementation( - jest.fn().mockReturnValue( - Promise.reject({ - data: { - message: 'parse error at line 1, col 6: invalid char escape', - }, - status: 400, - statusText: 'Bad Request', - }) - ) - ); + it('should return custom error message when Loki returns escaping error', done => { + const ds = createLokiDSForTests(); const options = getQueryOptions({ targets: [{ expr: '{job="gra\\fana"}', refId: 'B' }], }); - try { - await ds.query(options).toPromise(); - } catch (err) { - expect(err.data.message).toBe( - 'Error: parse error at line 1, col 6: invalid char escape. Make sure that all special characters are escaped with \\. For more information on escaping of special characters visit LogQL documentation at https://github.com/grafana/loki/blob/master/docs/logql.md.' - ); - } + observableTester().subscribeAndExpectOnError({ + observable: ds.query(options), + expect: err => { + expect(err.data.message).toBe( + 'Error: parse error at line 1, col 6: invalid char escape. Make sure that all special characters are escaped with \\. For more information on escaping of special characters visit LogQL documentation at https://github.com/grafana/loki/blob/master/docs/logql.md.' + ); + }, + done, + }); + + fetchStream.error({ + data: { + message: 'parse error at line 1, col 6: invalid char escape', + }, + status: 400, + statusText: 'Bad Request', + }); }); }); - describe('When interpolating variables', () => { + describe('when interpolating variables', () => { let ds: LokiDatasource; let variable: CustomVariableModel; beforeEach(() => { - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - const customSettings = { ...instanceSettings, jsonData: customData }; - ds = new LokiDatasource(customSettings, templateSrvMock); + ds = createLokiDSForTests(); variable = { ...initialCustomVariableModelState }; }); @@ -258,86 +300,82 @@ describe('LokiDatasource', () => { }); describe('when performing testDataSource', () => { - let ds: DataSourceApi; - let result: any; + const getTestContext = () => { + const ds = createLokiDSForTests({} as TemplateSrv); + const promise = ds.testDatasource(); + + return { promise }; + }; describe('and call succeeds', () => { - beforeEach(async () => { - datasourceRequestMock.mockImplementation(async () => { - return Promise.resolve({ - status: 200, - data: { - values: ['avalue'], - }, - }); - }); - ds = new LokiDatasource(instanceSettings, {} as TemplateSrv); - result = await ds.testDatasource(); - }); + it('should return successfully', async () => { + const { promise } = getTestContext(); + + fetchStream.next(({ + status: 200, + data: { + values: ['avalue'], + }, + } as unknown) as FetchResponse); + + fetchStream.complete(); + + const result = await promise; - it('should return successfully', () => { expect(result.status).toBe('success'); }); }); describe('and call fails with 401 error', () => { - let ds: LokiDatasource; - beforeEach(() => { - datasourceRequestMock.mockImplementation(() => - Promise.reject({ - statusText: 'Unauthorized', - status: 401, - data: { - message: 'Unauthorized', - }, - }) - ); - - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - const customSettings = { ...instanceSettings, jsonData: customData }; - ds = new LokiDatasource(customSettings, templateSrvMock); - }); - it('should return error status and a detailed error message', async () => { - const result = await ds.testDatasource(); + const { promise } = getTestContext(); + + fetchStream.error({ + statusText: 'Unauthorized', + status: 401, + data: { + message: 'Unauthorized', + }, + }); + + const result = await promise; + expect(result.status).toEqual('error'); expect(result.message).toBe('Loki: Unauthorized. 401. Unauthorized'); }); }); describe('and call fails with 404 error', () => { - beforeEach(async () => { - datasourceRequestMock.mockImplementation(() => - Promise.reject({ - statusText: 'Not found', - status: 404, - data: '404 page not found', - }) - ); - ds = new LokiDatasource(instanceSettings, {} as TemplateSrv); - result = await ds.testDatasource(); - }); + it('should return error status and a detailed error message', async () => { + const { promise } = getTestContext(); + + fetchStream.error({ + statusText: 'Not found', + status: 404, + data: { + message: '404 page not found', + }, + }); + + const result = await promise; - it('should return error status and a detailed error message', () => { expect(result.status).toEqual('error'); expect(result.message).toBe('Loki: Not found. 404. 404 page not found'); }); }); describe('and call fails with 502 error', () => { - beforeEach(async () => { - datasourceRequestMock.mockImplementation(() => - Promise.reject({ - statusText: 'Bad Gateway', - status: 502, - data: '', - }) - ); - ds = new LokiDatasource(instanceSettings, {} as TemplateSrv); - result = await ds.testDatasource(); - }); + it('should return error status and a detailed error message', async () => { + const { promise } = getTestContext(); + + fetchStream.error({ + statusText: 'Bad Gateway', + status: 502, + data: '', + }); + + const result = await promise; - it('should return error status and a detailed error message', () => { expect(result.status).toEqual('error'); expect(result.message).toBe('Loki: Bad Gateway. 502'); }); @@ -345,56 +383,63 @@ describe('LokiDatasource', () => { }); describe('when creating a range query', () => { - const ds = new LokiDatasource(instanceSettings, templateSrvMock); - const query: LokiQuery = { expr: 'foo', refId: 'bar' }; - // Loki v1 API has an issue with float step parameters, can be removed when API is fixed it('should produce an integer step parameter', () => { + const ds = createLokiDSForTests(); + const query: LokiQuery = { expr: 'foo', refId: 'bar' }; const range: TimeRange = { from: dateTime(0), to: dateTime(1e9 + 1), raw: { from: '0', to: '1000000001' }, }; + // Odd timerange/interval combination that would lead to a float step const options = { range, intervalMs: 2000 }; + expect(Number.isInteger(ds.createRangeQuery(query, options as any).step!)).toBeTruthy(); }); }); - describe('annotationQuery', () => { - it('should transform the loki data to annotation response', async () => { - const ds = new LokiDatasource(instanceSettings, templateSrvMock); - datasourceRequestMock.mockImplementation( - jest.fn().mockReturnValueOnce( - Promise.resolve({ - data: { - data: { - resultType: LokiResultType.Stream, - result: [ - { - stream: { - label: 'value', - label2: 'value ', - }, - values: [['1549016857498000000', 'hello']], - }, - { - stream: { - label2: 'value2', - }, - values: [['1549024057498000000', 'hello 2']], - }, - ], - }, - status: 'success', - }, - }) - ) - ); - + describe('when calling annotationQuery', () => { + const getTestContext = () => { const query = makeAnnotationQueryRequest(); + const ds = createLokiDSForTests(); + const promise = ds.annotationQuery(query); + + return { promise }; + }; + + it('should transform the loki data to annotation response', async () => { + const { promise } = getTestContext(); + const response: FetchResponse = ({ + data: { + data: { + resultType: LokiResultType.Stream, + result: [ + { + stream: { + label: 'value', + label2: 'value ', + }, + values: [['1549016857498000000', 'hello']], + }, + { + stream: { + label2: 'value2', + }, + values: [['1549024057498000000', 'hello 2']], + }, + ], + }, + status: 'success', + }, + } as unknown) as FetchResponse; + + fetchStream.next(response); + fetchStream.complete(); + + const res = await promise; - const res = await ds.annotationQuery(query); expect(res.length).toBe(2); expect(res[0].text).toBe('hello'); expect(res[0].tags).toEqual(['value']); @@ -405,98 +450,72 @@ describe('LokiDatasource', () => { }); describe('metricFindQuery', () => { - const ds = new LokiDatasource(instanceSettings, templateSrvMock); + const getTestContext = (mock: LokiDatasource) => { + const ds = createLokiDSForTests(); + ds.getVersion = mock.getVersion; + ds.metadataRequest = mock.metadataRequest; + + return { ds }; + }; + const mocks = makeMetadataAndVersionsMocks(); mocks.forEach((mock, index) => { it(`should return label names for Loki v${index}`, async () => { - ds.getVersion = mock.getVersion; - ds.metadataRequest = mock.metadataRequest; - const query = 'label_names()'; - const res = await ds.metricFindQuery(query); - expect(res[0].text).toEqual('label1'); - expect(res[1].text).toEqual('label2'); - expect(res.length).toBe(2); - }); - }); + const { ds } = getTestContext(mock); - mocks.forEach((mock, index) => { - it(`should return label names for Loki v${index}`, async () => { - ds.getVersion = mock.getVersion; - ds.metadataRequest = mock.metadataRequest; - const query = 'label_names()'; - const res = await ds.metricFindQuery(query); - expect(res[0].text).toEqual('label1'); - expect(res[1].text).toEqual('label2'); - expect(res.length).toBe(2); + const res = await ds.metricFindQuery('label_names()'); + + expect(res).toEqual([{ text: 'label1' }, { text: 'label2' }]); }); }); mocks.forEach((mock, index) => { it(`should return label values for Loki v${index}`, async () => { - ds.getVersion = mock.getVersion; - ds.metadataRequest = mock.metadataRequest; - const query = 'label_values(label1)'; - const res = await ds.metricFindQuery(query); - expect(res[0].text).toEqual('value1'); - expect(res[1].text).toEqual('value2'); - expect(res.length).toBe(2); + const { ds } = getTestContext(mock); + + const res = await ds.metricFindQuery('label_values(label1)'); + + expect(res).toEqual([{ text: 'value1' }, { text: 'value2' }]); }); }); mocks.forEach((mock, index) => { it(`should return empty array when incorrect query for Loki v${index}`, async () => { - ds.getVersion = mock.getVersion; - ds.metadataRequest = mock.metadataRequest; - const query = 'incorrect_query'; - const res = await ds.metricFindQuery(query); - expect(res.length).toBe(0); + const { ds } = getTestContext(mock); + + const res = await ds.metricFindQuery('incorrect_query'); + + expect(res).toEqual([]); }); }); mocks.forEach((mock, index) => { - it(`should return label names according to provided rangefor Loki v${index} `, async () => { - ds.getVersion = mock.getVersion; - ds.metadataRequest = mock.metadataRequest; - const query = 'label_names()'; - const res = await ds.metricFindQuery(query, { - range: { from: new Date(2), to: new Date(3) }, - }); - expect(res[0].text).toEqual('label1'); - expect(res.length).toBe(1); + it(`should return label names according to provided rangefor Loki v${index}`, async () => { + const { ds } = getTestContext(mock); + + const res = await ds.metricFindQuery('label_names()', { range: { from: new Date(2), to: new Date(3) } }); + + expect(res).toEqual([{ text: 'label1' }]); }); }); }); }); -type LimitTestArgs = { - maxDataPoints?: number; - maxLines?: number; - expectedLimit: number; -}; -function makeLimitTest(instanceSettings: any, datasourceRequestMock: any, templateSrvMock: any, testResp: any) { - return ({ maxDataPoints, maxLines, expectedLimit }: LimitTestArgs) => { - let settings = instanceSettings; - if (Number.isFinite(maxLines!)) { - const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; - settings = { ...instanceSettings, jsonData: customData }; - } - const ds = new LokiDatasource(settings, templateSrvMock); - datasourceRequestMock.mockImplementation(() => Promise.resolve(testResp)); - - const options = getQueryOptions({ targets: [{ expr: 'foo', refId: 'B', maxLines: maxDataPoints }] }); - if (Number.isFinite(maxDataPoints!)) { - options.maxDataPoints = maxDataPoints; - } else { - // By default is 500 - delete options.maxDataPoints; - } - - ds.query(options); - - expect(datasourceRequestMock.mock.calls.length).toBe(2); - expect(datasourceRequestMock.mock.calls[0][0].url).toContain(`limit=${expectedLimit}`); +function createLokiDSForTests( + templateSrvMock = ({ + getAdhocFilters: (): any[] => [], + replace: (a: string) => a, + } as unknown) as TemplateSrv +): LokiDatasource { + const instanceSettings: any = { + url: 'myloggingurl', }; + + const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 }; + const customSettings = { ...instanceSettings, jsonData: customData }; + + return new LokiDatasource(customSettings, templateSrvMock); } function makeAnnotationQueryRequest(): AnnotationQueryRequest { diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index abb733eb28f..543d8210775 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -1,41 +1,43 @@ // Libraries -import { isEmpty, map as lodashMap, cloneDeep } from 'lodash'; -import { Observable, from, merge, of } from 'rxjs'; -import { map, catchError, switchMap } from 'rxjs/operators'; - -// Services & Utils -import { DataFrame, dateMath, FieldCache, QueryResultMeta, TimeRange } from '@grafana/data'; -import { getBackendSrv, BackendSrvRequest, FetchError } from '@grafana/runtime'; -import { addLabelToQuery } from 'app/plugins/datasource/prometheus/add_label_to_query'; -import { TemplateSrv } from 'app/features/templating/template_srv'; -import { convertToWebSocketUrl } from 'app/core/utils/explore'; -import { lokiResultsToTableModel, processRangeQueryResponse, lokiStreamResultToDataFrame } from './result_transformer'; -import { getHighlighterExpressionsFromQuery } from './query_utils'; -import { getTimeSrv } from 'app/features/dashboard/services/TimeSrv'; +import { cloneDeep, isEmpty, map as lodashMap } from 'lodash'; +import { merge, Observable, of } from 'rxjs'; +import { catchError, map, switchMap } from 'rxjs/operators'; // Types import { - LogRowModel, - DateTime, - LoadingState, AnnotationEvent, + AnnotationQueryRequest, + DataFrame, DataFrameView, - PluginMeta, - DataSourceApi, - DataSourceInstanceSettings, DataQueryError, DataQueryRequest, DataQueryResponse, - AnnotationQueryRequest, + DataSourceApi, + DataSourceInstanceSettings, + dateMath, + DateTime, + FieldCache, + LoadingState, + LogRowModel, + PluginMeta, + QueryResultMeta, ScopedVars, + TimeRange, } from '@grafana/data'; +import { BackendSrvRequest, FetchError, getBackendSrv } from '@grafana/runtime'; +import { addLabelToQuery } from 'app/plugins/datasource/prometheus/add_label_to_query'; +import { TemplateSrv } from 'app/features/templating/template_srv'; +import { convertToWebSocketUrl } from 'app/core/utils/explore'; +import { lokiResultsToTableModel, lokiStreamResultToDataFrame, processRangeQueryResponse } from './result_transformer'; +import { getHighlighterExpressionsFromQuery } from './query_utils'; +import { getTimeSrv } from 'app/features/dashboard/services/TimeSrv'; import { - LokiQuery, LokiOptions, + LokiQuery, + LokiRangeQueryRequest, LokiResponse, LokiResultType, - LokiRangeQueryRequest, LokiStreamResponse, } from './types'; import { LiveStreams, LokiLiveTarget } from './live_streams'; @@ -79,7 +81,7 @@ export class LokiDatasource extends DataSourceApi { url, }; - return from(getBackendSrv().datasourceRequest(req)); + return getBackendSrv().fetch>(req); } query(options: DataQueryRequest): Observable { diff --git a/public/test/helpers/observableTester.ts b/public/test/helpers/observableTester.ts new file mode 100644 index 00000000000..5aa5debe9d7 --- /dev/null +++ b/public/test/helpers/observableTester.ts @@ -0,0 +1,68 @@ +import { Observable } from 'rxjs'; + +interface ObservableTester { + observable: Observable; + done: jest.DoneCallback; +} + +interface SubscribeAndExpectOnNext extends ObservableTester { + expect: (value: T) => void; +} + +interface SubscribeAndExpectOnComplete extends ObservableTester { + expect: () => void; +} + +interface SubscribeAndExpectOnError extends ObservableTester { + expect: (err: any) => void; +} + +export const observableTester = () => { + const subscribeAndExpectOnNext = ({ observable, expect, done }: SubscribeAndExpectOnNext): void => { + observable.subscribe({ + next: value => { + try { + expect(value); + } catch (err) { + done.fail(err); + } + }, + error: err => done.fail(err), + complete: () => done(), + }); + }; + + const subscribeAndExpectOnComplete = ({ observable, expect, done }: SubscribeAndExpectOnComplete): void => { + observable.subscribe({ + next: () => {}, + error: err => done.fail(err), + complete: () => { + try { + expect(); + done(); + } catch (err) { + done.fail(err); + } + }, + }); + }; + + const subscribeAndExpectOnError = ({ observable, expect, done }: SubscribeAndExpectOnError): void => { + observable.subscribe({ + next: () => {}, + error: err => { + try { + expect(err); + done(); + } catch (err) { + done.fail(err); + } + }, + complete: () => { + done(); + }, + }); + }; + + return { subscribeAndExpectOnNext, subscribeAndExpectOnComplete, subscribeAndExpectOnError }; +};