diff --git a/packages/grafana-data/src/types/config.ts b/packages/grafana-data/src/types/config.ts index 4b80e161e3e..e4b68d72f49 100644 --- a/packages/grafana-data/src/types/config.ts +++ b/packages/grafana-data/src/types/config.ts @@ -99,4 +99,5 @@ export interface GrafanaConfig { pluginsToPreload: string[]; featureToggles: FeatureToggles; licenseInfo: LicenseInfo; + http2Enabled: boolean; } diff --git a/packages/grafana-runtime/src/config.ts b/packages/grafana-runtime/src/config.ts index 1dbb47f78fd..760dcd6c1de 100644 --- a/packages/grafana-runtime/src/config.ts +++ b/packages/grafana-runtime/src/config.ts @@ -58,6 +58,7 @@ export class GrafanaBootConfig implements GrafanaConfig { }; licenseInfo: LicenseInfo = {} as LicenseInfo; rendererAvailable = false; + http2Enabled = false; constructor(options: GrafanaBootConfig) { this.theme = options.bootData.user.lightTheme ? getTheme(GrafanaThemeType.Light) : getTheme(GrafanaThemeType.Dark); diff --git a/pkg/api/frontendsettings.go b/pkg/api/frontendsettings.go index 2021ddf5b84..00146c7afba 100644 --- a/pkg/api/frontendsettings.go +++ b/pkg/api/frontendsettings.go @@ -226,6 +226,7 @@ func (hs *HTTPServer) getFrontendSettingsMap(c *models.ReqContext) (map[string]i }, "featureToggles": hs.Cfg.FeatureToggles, "rendererAvailable": hs.RenderService.IsAvailable(), + "http2Enabled": hs.Cfg.Protocol == setting.HTTP2, } return jsonObj, nil diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 2f1d0d778f9..fd9018e12a8 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -673,7 +673,6 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error { cfg.ServeFromSubPath = ServeFromSubPath Protocol = HTTP - cfg.Protocol = Protocol protocolStr, err := valueAsString(server, "protocol", "http") if err != nil { return err @@ -692,6 +691,7 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error { Protocol = SOCKET SocketPath = server.Key("socket").String() } + cfg.Protocol = Protocol Domain, err = valueAsString(server, "domain", "localhost") if err != nil { diff --git a/public/app/core/services/FetchQueue.test.ts b/public/app/core/services/FetchQueue.test.ts new file mode 100644 index 00000000000..2738e752c22 --- /dev/null +++ b/public/app/core/services/FetchQueue.test.ts @@ -0,0 +1,156 @@ +import { Observable } from 'rxjs'; +import { take } from 'rxjs/operators'; +import { BackendSrvRequest } from '@grafana/runtime'; + +import { FetchQueue, FetchQueueUpdate, FetchStatus } from './FetchQueue'; + +type SubscribeTesterArgs = { + observable: Observable; + expectCallback: (data: T) => void; + doneCallback: jest.DoneCallback; +}; + +export const subscribeTester = ({ observable, expectCallback, doneCallback }: SubscribeTesterArgs) => { + observable.subscribe({ + next: data => expectCallback(data), + complete: () => { + doneCallback(); + }, + }); +}; + +describe('FetchQueue', () => { + describe('add', () => { + describe('when called twice', () => { + it('then an update with the correct state should be published', done => { + const id = 'id'; + const id2 = 'id2'; + const options: BackendSrvRequest = { url: 'http://someurl' }; + const options2: BackendSrvRequest = { url: 'http://someotherurl' }; + const expects: FetchQueueUpdate[] = [ + { + noOfPending: 1, + noOfInProgress: 0, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + }, + }, + { + noOfPending: 2, + noOfInProgress: 0, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + ['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, + }, + }, + ]; + const queue = new FetchQueue(); + let calls = 0; + + subscribeTester({ + observable: queue.getUpdates().pipe(take(2)), + expectCallback: data => expect(data).toEqual(expects[calls++]), + doneCallback: done, + }); + + queue.add(id, options); + queue.add(id2, options2); + }); + }); + }); + + describe('setInProgress', () => { + describe('when called', () => { + it('then an update with the correct state should be published', done => { + const id = 'id'; + const id2 = 'id2'; + const options: BackendSrvRequest = { url: 'http://someurl' }; + const options2: BackendSrvRequest = { url: 'http://someotherurl' }; + const expects: FetchQueueUpdate[] = [ + { + noOfPending: 1, + noOfInProgress: 0, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + }, + }, + { + noOfPending: 2, + noOfInProgress: 0, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + ['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, + }, + }, + { + noOfPending: 1, + noOfInProgress: 1, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + ['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.InProgress }, + }, + }, + ]; + const queue = new FetchQueue(); + let calls = 0; + + subscribeTester({ + observable: queue.getUpdates().pipe(take(3)), + expectCallback: data => expect(data).toEqual(expects[calls++]), + doneCallback: done, + }); + + queue.add(id, options); + queue.add(id2, options2); + queue.setInProgress(id2); + }); + }); + }); + + describe('setDone', () => { + describe('when called', () => { + it('then an update with the correct state should be published', done => { + const id = 'id'; + const id2 = 'id2'; + const options: BackendSrvRequest = { url: 'http://someurl' }; + const options2: BackendSrvRequest = { url: 'http://someotherurl' }; + const expects: FetchQueueUpdate[] = [ + { + noOfPending: 1, + noOfInProgress: 0, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + }, + }, + { + noOfPending: 2, + noOfInProgress: 0, + state: { + ['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, + ['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, + }, + }, + { + noOfPending: 1, + noOfInProgress: 0, + state: { + ['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, + }, + }, + ]; + const queue = new FetchQueue(); + let calls = 0; + + subscribeTester({ + observable: queue.getUpdates().pipe(take(3)), + expectCallback: data => expect(data).toEqual(expects[calls++]), + doneCallback: done, + }); + + queue.add(id, options); + queue.add(id2, options2); + queue.setDone(id); + }); + }); + }); +}); diff --git a/public/app/core/services/FetchQueue.ts b/public/app/core/services/FetchQueue.ts new file mode 100644 index 00000000000..bcf21118e67 --- /dev/null +++ b/public/app/core/services/FetchQueue.ts @@ -0,0 +1,94 @@ +import { Observable, Subject } from 'rxjs'; + +import { BackendSrvRequest } from '@grafana/runtime'; + +export interface QueueState extends Record {} + +export enum FetchStatus { + Pending, + InProgress, + Done, +} + +export interface FetchQueueUpdate { + noOfInProgress: number; + noOfPending: number; + state: QueueState; +} + +interface QueueStateEntry { + id: string; + options?: BackendSrvRequest; + state: FetchStatus; +} + +export class FetchQueue { + private state: QueueState = {}; // internal queue state + private queue: Subject = new Subject(); // internal stream for requests that are to be queued + private updates: Subject = new Subject(); // external stream with updates to the queue state + + constructor(debug = false) { + // This will create an implicit live subscription for as long as this class lives. + // But as FetchQueue is used by the singleton backendSrv that also lives for as long as Grafana app lives + // I think this ok. We could add some disposable pattern later if the need arises. + this.queue.subscribe(entry => { + const { id, state, options } = entry; + + if (!this.state[id]) { + this.state[id] = { state: FetchStatus.Pending, options: {} as BackendSrvRequest }; + } + + if (state === FetchStatus.Done) { + delete this.state[id]; + const update = this.getUpdate(this.state); + this.publishUpdate(update, debug); + return; + } + + this.state[id].state = state; + + if (options) { + this.state[id].options = options; + } + + const update = this.getUpdate(this.state); + this.publishUpdate(update, debug); + }); + } + + add = (id: string, options: BackendSrvRequest): void => this.queue.next({ id, options, state: FetchStatus.Pending }); + + setInProgress = (id: string): void => this.queue.next({ id, state: FetchStatus.InProgress }); + + setDone = (id: string): void => this.queue.next({ id, state: FetchStatus.Done }); + + getUpdates = (): Observable => this.updates.asObservable(); + + private getUpdate = (state: QueueState): FetchQueueUpdate => { + const noOfInProgress = Object.keys(state).filter(key => state[key].state === FetchStatus.InProgress).length; + const noOfPending = Object.keys(state).filter(key => state[key].state === FetchStatus.Pending).length; + + return { noOfPending, noOfInProgress, state }; + }; + + private publishUpdate = (update: FetchQueueUpdate, debug: boolean): void => { + this.printState(update, debug); + this.updates.next(update); + }; + + private printState = (update: FetchQueueUpdate, debug: boolean): void => { + if (!debug) { + return; + } + + const entriesWithoutOptions = Object.keys(update.state).reduce((all, key) => { + const entry = { id: key, state: update.state[key].state }; + all.push(entry); + return all; + }, [] as Array<{ id: string; state: FetchStatus }>); + + console.log('FetchQueue noOfStarted', update.noOfInProgress); + console.log('FetchQueue noOfNotStarted', update.noOfPending); + console.log('FetchQueue state', entriesWithoutOptions); + }; +} diff --git a/public/app/core/services/FetchQueueWorker.test.ts b/public/app/core/services/FetchQueueWorker.test.ts new file mode 100644 index 00000000000..69553f5bc82 --- /dev/null +++ b/public/app/core/services/FetchQueueWorker.test.ts @@ -0,0 +1,97 @@ +import { Subject } from 'rxjs'; + +import { FetchQueue, FetchQueueUpdate, FetchStatus } from './FetchQueue'; +import { ResponseQueue } from './ResponseQueue'; +import { FetchQueueWorker } from './FetchQueueWorker'; +import { expect } from '../../../test/lib/common'; +import { GrafanaBootConfig } from '@grafana/runtime'; + +const getTestContext = (http2Enabled = false) => { + const config: GrafanaBootConfig = ({ http2Enabled } as unknown) as GrafanaBootConfig; + const dataUrl = 'http://localhost:3000/api/ds/query?=abc'; + const apiUrl = 'http://localhost:3000/api/alerts?state=all'; + const updates: Subject = new Subject(); + + const queueMock: FetchQueue = ({ + add: jest.fn(), + setInProgress: jest.fn(), + setDone: jest.fn(), + getUpdates: () => updates.asObservable(), + } as unknown) as FetchQueue; + + const addMock = jest.fn(); + const responseQueueMock: ResponseQueue = ({ + add: addMock, + getResponses: jest.fn(), + } as unknown) as ResponseQueue; + + new FetchQueueWorker(queueMock, responseQueueMock, config); + + return { dataUrl, apiUrl, updates, queueMock, addMock }; +}; + +describe('FetchQueueWorker', () => { + describe('when an update is pushed in the stream', () => { + describe('and queue has no pending entries', () => { + it('then nothing should be added to the responseQueue', () => { + const { updates, addMock } = getTestContext(); + updates.next({ noOfPending: 0, noOfInProgress: 1, state: {} }); + + expect(addMock).toHaveBeenCalledTimes(0); + }); + }); + + describe('and queue has pending entries', () => { + describe('and there are no entries in progress', () => { + it('then api request should be added before data requests responseQueue', () => { + const { updates, addMock, dataUrl, apiUrl } = getTestContext(); + updates.next({ + noOfPending: 2, + noOfInProgress: 0, + state: { + ['data']: { state: FetchStatus.Pending, options: { url: dataUrl } }, + ['api']: { state: FetchStatus.Pending, options: { url: apiUrl } }, + }, + }); + + expect(addMock.mock.calls).toEqual([ + ['api', { url: 'http://localhost:3000/api/alerts?state=all' }], + ['data', { url: 'http://localhost:3000/api/ds/query?=abc' }], + ]); + }); + }); + + describe('and there are max concurrent entries in progress', () => { + it('then api request should always pass through but no data requests should pass', () => { + const { updates, addMock, dataUrl, apiUrl } = getTestContext(); + updates.next({ + noOfPending: 2, + noOfInProgress: 5, + state: { + ['data']: { state: FetchStatus.Pending, options: { url: dataUrl } }, + ['api']: { state: FetchStatus.Pending, options: { url: apiUrl } }, + }, + }); + + expect(addMock.mock.calls).toEqual([['api', { url: 'http://localhost:3000/api/alerts?state=all' }]]); + }); + }); + + describe('and http2 is enabled and there are max concurrent entries in progress', () => { + it('then api request should always pass through but no data requests should pass', () => { + const { updates, addMock, dataUrl, apiUrl } = getTestContext(true); + updates.next({ + noOfPending: 2, + noOfInProgress: 1000, + state: { + ['data']: { state: FetchStatus.Pending, options: { url: dataUrl } }, + ['api']: { state: FetchStatus.Pending, options: { url: apiUrl } }, + }, + }); + + expect(addMock.mock.calls).toEqual([['api', { url: 'http://localhost:3000/api/alerts?state=all' }]]); + }); + }); + }); + }); +}); diff --git a/public/app/core/services/FetchQueueWorker.ts b/public/app/core/services/FetchQueueWorker.ts new file mode 100644 index 00000000000..e0aac03341d --- /dev/null +++ b/public/app/core/services/FetchQueueWorker.ts @@ -0,0 +1,58 @@ +import { concatMap, filter } from 'rxjs/operators'; + +import { FetchQueue, FetchStatus } from './FetchQueue'; +import { BackendSrvRequest } from '@grafana/runtime'; +import { isDataQuery } from '../utils/query'; +import { ResponseQueue } from './ResponseQueue'; +import { getConfig } from '../config'; + +interface WorkerEntry { + id: string; + options: BackendSrvRequest; +} + +export class FetchQueueWorker { + constructor(fetchQueue: FetchQueue, responseQueue: ResponseQueue, config = getConfig()) { + const maxParallelRequests = config.http2Enabled ? 1000 : 5; // assuming that 1000 parallel requests are enough for http2 + + // This will create an implicit live subscription for as long as this class lives. + // But as FetchQueueWorker is used by the singleton backendSrv that also lives for as long as Grafana app lives + // I think this ok. We could add some disposable pattern later if the need arises. + fetchQueue + .getUpdates() + .pipe( + filter(({ noOfPending }) => noOfPending > 0), // no reason to act if there is nothing to act upon + // Using concatMap instead of mergeMap so that the order with apiRequests first is preserved + // https://rxjs.dev/api/operators/concatMap + concatMap(({ state, noOfInProgress }) => { + const apiRequests = Object.keys(state) + .filter(k => state[k].state === FetchStatus.Pending && !isDataQuery(state[k].options.url)) + .reduce((all, key) => { + const entry = { id: key, options: state[key].options }; + all.push(entry); + return all; + }, [] as WorkerEntry[]); + + const dataRequests = Object.keys(state) + .filter(key => state[key].state === FetchStatus.Pending && isDataQuery(state[key].options.url)) + .reduce((all, key) => { + const entry = { id: key, options: state[key].options }; + all.push(entry); + return all; + }, [] as WorkerEntry[]); + + // apiRequests have precedence over data requests and should always be called directly + // this means we can end up with a negative value. + // Because the way Array.toSlice works with negative numbers we use Math.max below. + const noOfAllowedDataRequests = Math.max(maxParallelRequests - noOfInProgress - apiRequests.length, 0); + const dataRequestToFetch = dataRequests.slice(0, noOfAllowedDataRequests); + + return apiRequests.concat(dataRequestToFetch); + }) + ) + .subscribe(({ id, options }) => { + // This will add an entry to the responseQueue + responseQueue.add(id, options); + }); + } +} diff --git a/public/app/core/services/ResponseQueue.test.ts b/public/app/core/services/ResponseQueue.test.ts new file mode 100644 index 00000000000..26f4fab9ef0 --- /dev/null +++ b/public/app/core/services/ResponseQueue.test.ts @@ -0,0 +1,101 @@ +import { of } from 'rxjs'; +import { first } from 'rxjs/operators'; +import { BackendSrvRequest } from '@grafana/runtime'; + +import { FetchQueue, FetchQueueUpdate } from './FetchQueue'; +import { ResponseQueue } from './ResponseQueue'; +import { subscribeTester } from './FetchQueue.test'; +import { describe, expect } from '../../../test/lib/common'; + +const getTestContext = () => { + const id = 'id'; + const options: BackendSrvRequest = { url: 'http://someurl' }; + const expects: FetchQueueUpdate[] = []; + + const fetchResult = of({ + data: id, + status: 200, + statusText: 'OK', + ok: true, + headers: (null as unknown) as Headers, + redirected: false, + type: (null as unknown) as ResponseType, + url: options.url, + config: (null as unknown) as BackendSrvRequest, + }); + + const fetchMock = jest.fn().mockReturnValue(fetchResult); + const setInProgressMock = jest.fn(); + const setDoneMock = jest.fn(); + + const queueMock: FetchQueue = ({ + add: jest.fn(), + setInProgress: setInProgressMock, + setDone: setDoneMock, + getUpdates: jest.fn(), + } as unknown) as FetchQueue; + + const responseQueue = new ResponseQueue(queueMock, fetchMock); + + return { id, options, expects, fetchMock, setInProgressMock, setDoneMock, responseQueue, fetchResult }; +}; + +describe('ResponseQueue', () => { + describe('add', () => { + describe('when called', () => { + it('then the matching fetchQueue entry should be set to inProgress', () => { + const { id, options, setInProgressMock, setDoneMock, responseQueue } = getTestContext(); + + responseQueue.add(id, options); + + expect(setInProgressMock.mock.calls).toEqual([['id']]); + expect(setDoneMock).not.toHaveBeenCalled(); + }); + + it('then a response entry with correct id should be published', done => { + const { id, options, responseQueue } = getTestContext(); + + subscribeTester({ + observable: responseQueue.getResponses(id).pipe(first()), + expectCallback: data => expect(data.id).toEqual(id), + doneCallback: done, + }); + + responseQueue.add(id, options); + }); + + it('then fetch is called with correct options', done => { + const { id, options, responseQueue, fetchMock } = getTestContext(); + + subscribeTester({ + observable: responseQueue.getResponses(id).pipe(first()), + expectCallback: () => { + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock).toHaveBeenCalledWith({ url: 'http://someurl' }); + }, + doneCallback: done, + }); + + responseQueue.add(id, options); + }); + + describe('and when the fetch Observable is completed', () => { + it('then the matching fetchQueue entry should be set to Done', done => { + const { id, options, responseQueue, setInProgressMock, setDoneMock } = getTestContext(); + + subscribeTester({ + observable: responseQueue.getResponses(id).pipe(first()), + expectCallback: data => { + data.observable.subscribe().unsubscribe(); + expect(setInProgressMock.mock.calls).toEqual([['id']]); + expect(setDoneMock.mock.calls).toEqual([['id']]); + }, + doneCallback: done, + }); + + responseQueue.add(id, options); + }); + }); + }); + }); +}); diff --git a/public/app/core/services/ResponseQueue.ts b/public/app/core/services/ResponseQueue.ts new file mode 100644 index 00000000000..c3ab773db8e --- /dev/null +++ b/public/app/core/services/ResponseQueue.ts @@ -0,0 +1,50 @@ +import { Observable, Subject } from 'rxjs'; +import { filter, finalize } from 'rxjs/operators'; +import { BackendSrvRequest, FetchResponse } from '@grafana/runtime'; +import { FetchQueue } from './FetchQueue'; + +interface FetchWorkEntry { + id: string; + options: BackendSrvRequest; +} + +interface FetchResponsesEntry { + id: string; + observable: Observable>; +} + +export class ResponseQueue { + private queue: Subject = new Subject(); // internal stream for requests that are to be executed + private responses: Subject> = new Subject>(); // external stream with responses from fetch + + constructor(fetchQueue: FetchQueue, fetch: (options: BackendSrvRequest) => Observable>) { + // This will create an implicit live subscription for as long as this class lives. + // But as FetchQueue is used by the singleton backendSrv that also lives for as long as Grafana app lives + // I think this ok. We could add some disposable pattern later if the need arises. + this.queue.subscribe(entry => { + const { id, options } = entry; + + // Let the fetchQueue know that this id has started data fetching. + fetchQueue.setInProgress(id); + + this.responses.next({ + id, + observable: fetch(options).pipe( + // finalize is called whenever this observable is unsubscribed/errored/completed/canceled + // https://rxjs.dev/api/operators/finalize + finalize(() => { + // Let the fetchQueue know that this id is done. + fetchQueue.setDone(id); + }) + ), + }); + }); + } + + add = (id: string, options: BackendSrvRequest): void => { + this.queue.next({ id, options }); + }; + + getResponses = (id: string): Observable> => + this.responses.asObservable().pipe(filter(entry => entry.id === id)); +} diff --git a/public/app/core/services/backend_srv.ts b/public/app/core/services/backend_srv.ts index 38df1149472..c94a7f5aa02 100644 --- a/public/app/core/services/backend_srv.ts +++ b/public/app/core/services/backend_srv.ts @@ -1,7 +1,7 @@ -import { from, merge, MonoTypeOperatorFunction, Observable, Subject, throwError } from 'rxjs'; +import { from, merge, MonoTypeOperatorFunction, Observable, Subject, Subscription, throwError } from 'rxjs'; import { catchError, filter, map, mergeMap, retryWhen, share, takeUntil, tap, throwIfEmpty } from 'rxjs/operators'; import { fromFetch } from 'rxjs/fetch'; -import { BackendSrv as BackendService, BackendSrvRequest, FetchResponse, FetchError } from '@grafana/runtime'; +import { BackendSrv as BackendService, BackendSrvRequest, FetchError, FetchResponse } from '@grafana/runtime'; import { AppEvents } from '@grafana/data'; import appEvents from 'app/core/app_events'; @@ -12,6 +12,10 @@ import { coreModule } from 'app/core/core_module'; import { ContextSrv, contextSrv } from './context_srv'; import { Emitter } from '../utils/emitter'; import { parseInitFromOptions, parseUrlFromOptions } from '../utils/fetch'; +import { isDataQuery, isLocalUrl } from '../utils/query'; +import { FetchQueue } from './FetchQueue'; +import { ResponseQueue } from './ResponseQueue'; +import { FetchQueueWorker } from './FetchQueueWorker'; const CANCEL_ALL_REQUESTS_REQUEST_ID = 'cancel_all_requests_request_id'; @@ -27,6 +31,8 @@ export class BackendSrv implements BackendService { private HTTP_REQUEST_CANCELED = -1; private noBackendCache: boolean; private inspectorStream: Subject = new Subject(); + private readonly fetchQueue: FetchQueue; + private readonly responseQueue: ResponseQueue; private dependencies: BackendSrvDependencies = { fromFetch: fromFetch, @@ -44,6 +50,11 @@ export class BackendSrv implements BackendService { ...deps, }; } + + this.internalFetch = this.internalFetch.bind(this); + this.fetchQueue = new FetchQueue(); + this.responseQueue = new ResponseQueue(this.fetchQueue, this.internalFetch); + new FetchQueueWorker(this.fetchQueue, this.responseQueue); } async request(options: BackendSrvRequest): Promise { @@ -53,6 +64,38 @@ export class BackendSrv implements BackendService { } fetch(options: BackendSrvRequest): Observable> { + return new Observable(observer => { + // We need to match an entry added to the queue stream with the entry that is eventually added to the response stream + // using Date.now() as the unique identifier + const id = Date.now().toString(10); + + // Subscription is an object that is returned whenever you subscribe to an Observable. + // You can also use it as a container of many subscriptions and when it is unsubscribed all subscriptions within are also unsubscribed. + const subscriptions: Subscription = new Subscription(); + + // We're using the subscriptions.add function to add the subscription implicitly returned by this.responseQueue.getResponses(id).subscribe below. + subscriptions.add( + this.responseQueue.getResponses(id).subscribe(result => { + // The one liner below can seem magical if you're not accustomed to RxJs. + // Firstly, we're subscribing to the result from the result.observable and we're passing in the outer observer object. + // By passing the outer observer object then any updates on result.observable are passed through to any subscriber of the fetch function. + // Secondly, we're adding the subscription implicitly returned by result.observable.subscribe(observer). + subscriptions.add(result.observable.subscribe(observer)); + }) + ); + + // Let the fetchQueue know that this id needs to start data fetching. + this.fetchQueue.add(id, options); + + // This returned function will be called whenever the returned Observable from the fetch function is unsubscribed/errored/completed/canceled. + return function unsubscribe() { + // When subscriptions is unsubscribed all the implicitly added subscriptions above are also unsubscribed. + subscriptions.unsubscribe(); + }; + }); + } + + private internalFetch(options: BackendSrvRequest): Observable> { if (options.requestId) { this.inFlightRequests.next(options.requestId); } @@ -103,10 +146,6 @@ export class BackendSrv implements BackendService { options.url = options.url.substring(1); } - // if (options.url.endsWith('/')) { - // options.url = options.url.slice(0, -1); - // } - if (options.headers?.Authorization) { options.headers['X-DS-Authorization'] = options.headers.Authorization; delete options.headers.Authorization; @@ -363,22 +402,6 @@ export class BackendSrv implements BackendService { } } -function isDataQuery(url: string): boolean { - if ( - url.indexOf('api/datasources/proxy') !== -1 || - url.indexOf('api/tsdb/query') !== -1 || - url.indexOf('api/ds/query') !== -1 - ) { - return true; - } - - return false; -} - -function isLocalUrl(url: string) { - return !url.match(/^http/); -} - coreModule.factory('backendSrv', () => backendSrv); // Used for testing and things that really need BackendSrv export const backendSrv = new BackendSrv(); diff --git a/public/app/core/utils/query.ts b/public/app/core/utils/query.ts index 3d94afa6a6d..6fd904ce22a 100644 --- a/public/app/core/utils/query.ts +++ b/public/app/core/utils/query.ts @@ -18,3 +18,19 @@ export function addQuery(queries: DataQuery[], query?: Partial): Data q.refId = getNextRefIdChar(queries); return [...queries, q as DataQuery]; } + +export function isDataQuery(url: string): boolean { + if ( + url.indexOf('api/datasources/proxy') !== -1 || + url.indexOf('api/tsdb/query') !== -1 || + url.indexOf('api/ds/query') !== -1 + ) { + return true; + } + + return false; +} + +export function isLocalUrl(url: string) { + return !url.match(/^http/); +} diff --git a/public/app/features/panel/specs/metrics_panel_ctrl.test.ts b/public/app/features/panel/specs/metrics_panel_ctrl.test.ts index be7a548562a..45f9140fe81 100644 --- a/public/app/features/panel/specs/metrics_panel_ctrl.test.ts +++ b/public/app/features/panel/specs/metrics_panel_ctrl.test.ts @@ -1,6 +1,7 @@ jest.mock('app/core/core', () => ({})); jest.mock('app/core/config', () => { return { + ...((jest.requireActual('app/core/config') as unknown) as object), bootData: { user: {}, }, diff --git a/public/app/features/teams/TeamPages.test.tsx b/public/app/features/teams/TeamPages.test.tsx index eafad3ba057..42b52fb0a85 100644 --- a/public/app/features/teams/TeamPages.test.tsx +++ b/public/app/features/teams/TeamPages.test.tsx @@ -1,13 +1,16 @@ import React from 'react'; import { shallow } from 'enzyme'; -import { TeamPages, Props } from './TeamPages'; -import { Team, TeamMember, OrgRole } from '../../types'; +import { Props, TeamPages } from './TeamPages'; +import { OrgRole, Team, TeamMember } from '../../types'; import { getMockTeam } from './__mocks__/teamMocks'; import { User } from 'app/core/services/context_srv'; import { NavModel } from '@grafana/data'; jest.mock('app/core/config', () => ({ - licenseInfo: { hasLicense: true }, + ...((jest.requireActual('app/core/config') as unknown) as object), + licenseInfo: { + hasLicense: true, + }, })); const setup = (propOverrides?: object) => {