BackendSrv: Restructure response stream to resolve a retried request correctly (#66095)

* BackendSrv: Restrucutre response stream to resolve a retried request
correctly

* BackendSrv: Get signed in property from user

* Fix test with correct number of calls

* To prevent retry we need to throw error

* Fix tests

* revert change

* Fixes to token rotation

* Align tests

* Remove commented out code

---------

Co-authored-by: Torkel Ödegaard <torkel@grafana.com>
Co-authored-by: Mihaly Gyongyosi <mgyongyosi@users.noreply.github.com>
This commit is contained in:
Karl Persson 2023-05-04 11:56:51 +02:00 committed by GitHub
parent 338e98e237
commit e03a8b6826
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 229 additions and 147 deletions

View File

@ -1631,8 +1631,8 @@ exports[`better eslint`] = {
"public/app/core/services/backend_srv.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],
[0, 0, 0, "Unexpected any. Specify a different type.", "1"],
[0, 0, 0, "Unexpected any. Specify a different type.", "2"],
[0, 0, 0, "Do not use any type assertions.", "3"],
[0, 0, 0, "Do not use any type assertions.", "2"],
[0, 0, 0, "Unexpected any. Specify a different type.", "3"],
[0, 0, 0, "Unexpected any. Specify a different type.", "4"],
[0, 0, 0, "Unexpected any. Specify a different type.", "5"],
[0, 0, 0, "Unexpected any. Specify a different type.", "6"],

View File

@ -1,16 +1,17 @@
import {
from,
lastValueFrom,
merge,
MonoTypeOperatorFunction,
Observable,
of,
Subject,
Subscription,
throwError,
} from 'rxjs';
import { from, lastValueFrom, MonoTypeOperatorFunction, Observable, Subject, Subscription, throwError } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { catchError, filter, map, mergeMap, retryWhen, share, takeUntil, tap, throwIfEmpty } from 'rxjs/operators';
import {
catchError,
filter,
finalize,
map,
mergeMap,
retryWhen,
share,
takeUntil,
tap,
throwIfEmpty,
} from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import { AppEvents, DataQueryErrorType } from '@grafana/data';
@ -59,6 +60,7 @@ export class BackendSrv implements BackendService {
private inspectorStream: Subject<FetchResponse | FetchError> = new Subject<FetchResponse | FetchError>();
private readonly fetchQueue: FetchQueue;
private readonly responseQueue: ResponseQueue;
private _tokenRotationInProgress?: Observable<FetchResponse> | null = null;
private dependencies: BackendSrvDependencies = {
fromFetch: fromFetch,
@ -141,18 +143,9 @@ export class BackendSrv implements BackendService {
}
}
const fromFetchStream = this.getFromFetchStream<T>(options);
const failureStream = fromFetchStream.pipe(this.toFailureStream<T>(options));
const successStream = fromFetchStream.pipe(
filter((response) => response.ok === true),
tap((response) => {
this.showSuccessAlert(response);
this.inspectorStream.next(response);
})
);
return merge(successStream, failureStream).pipe(
catchError((err: FetchError) => throwError(this.processRequestError(options, err))),
return this.getFromFetchStream<T>(options).pipe(
this.handleStreamResponse<T>(options),
this.handleStreamError(options),
this.handleStreamCancellation(options)
);
}
@ -228,70 +221,10 @@ export class BackendSrv implements BackendService {
traceId: response.headers.get(GRAFANA_TRACEID_HEADER) ?? undefined,
};
return fetchResponse;
}),
share() // sharing this so we can split into success and failure and then merge back
})
);
}
private toFailureStream<T>(options: BackendSrvRequest): MonoTypeOperatorFunction<FetchResponse<T>> {
const { isSignedIn } = this.dependencies.contextSrv.user;
return (inputStream) =>
inputStream.pipe(
filter((response) => response.ok === false),
mergeMap((response) => {
const { status, statusText, data } = response;
const fetchErrorResponse: FetchError = {
status,
statusText,
data,
config: options,
traceId: response.headers.get(GRAFANA_TRACEID_HEADER) ?? undefined,
};
return throwError(fetchErrorResponse);
}),
retryWhen((attempts: Observable<any>) =>
attempts.pipe(
mergeMap((error, i) => {
const firstAttempt = i === 0 && options.retry === 0;
if (error.status === 401 && isLocalUrl(options.url) && firstAttempt && isSignedIn) {
if (error.data?.error?.id === 'ERR_TOKEN_REVOKED') {
this.dependencies.appEvents.publish(
new ShowModalReactEvent({
component: TokenRevokedModal,
props: {
maxConcurrentSessions: error.data?.error?.maxConcurrentSessions,
},
})
);
return of({});
}
let authChecker = () => this.loginPing();
if (config.featureToggles.clientTokenRotation) {
authChecker = () => this.rotateToken();
}
return from(authChecker()).pipe(
catchError((err) => {
if (err.status === 401) {
this.dependencies.logout();
return throwError(err);
}
return throwError(err);
})
);
}
return throwError(error);
})
)
)
);
}
showApplicationErrorAlert(err: FetchError) {}
showSuccessAlert<T>(response: FetchResponse<T>) {
@ -386,6 +319,74 @@ export class BackendSrv implements BackendService {
return err;
}
private handleStreamResponse<T>(options: BackendSrvRequest): MonoTypeOperatorFunction<FetchResponse<T>> {
return (inputStream) =>
inputStream.pipe(
map((response) => {
if (!response.ok) {
const { status, statusText, data } = response;
const fetchErrorResponse: FetchError = {
status,
statusText,
data,
config: options,
traceId: response.headers.get(GRAFANA_TRACEID_HEADER) ?? undefined,
};
throw fetchErrorResponse;
}
return response;
}),
tap((response) => {
this.showSuccessAlert(response);
this.inspectorStream.next(response);
})
);
}
private handleStreamError<T>(options: BackendSrvRequest): MonoTypeOperatorFunction<FetchResponse<T>> {
const { isSignedIn } = this.dependencies.contextSrv.user;
return (inputStream) =>
inputStream.pipe(
retryWhen((attempts: Observable<any>) =>
attempts.pipe(
mergeMap((error, i) => {
const firstAttempt = i === 0 && options.retry === 0;
if (error.status === 401 && isLocalUrl(options.url) && firstAttempt && isSignedIn) {
if (error.data?.error?.id === 'ERR_TOKEN_REVOKED') {
this.dependencies.appEvents.publish(
new ShowModalReactEvent({
component: TokenRevokedModal,
props: {
maxConcurrentSessions: error.data?.error?.maxConcurrentSessions,
},
})
);
return throwError(() => error);
}
let authChecker = config.featureToggles.clientTokenRotation ? this.rotateToken() : this.loginPing();
return from(authChecker).pipe(
catchError((err) => {
if (err.status === 401) {
this.dependencies.logout();
return throwError(err);
}
return throwError(err);
})
);
}
return throwError(error);
})
)
),
catchError((err: FetchError) => throwError(() => this.processRequestError(options, err)))
);
}
private handleStreamCancellation(options: BackendSrvRequest): MonoTypeOperatorFunction<FetchResponse<any>> {
return (inputStream) =>
inputStream.pipe(
@ -459,11 +460,22 @@ export class BackendSrv implements BackendService {
}
rotateToken() {
return this.request({ url: '/api/user/auth-tokens/rotate', method: 'POST', retry: 1 });
if (this._tokenRotationInProgress) {
return this._tokenRotationInProgress;
}
this._tokenRotationInProgress = this.fetch({ url: '/api/user/auth-tokens/rotate', method: 'POST', retry: 1 }).pipe(
finalize(() => {
this._tokenRotationInProgress = null;
}),
share()
);
return this._tokenRotationInProgress;
}
loginPing() {
return this.request({ url: '/api/login/ping', method: 'GET', retry: 1 });
return this.fetch({ url: '/api/login/ping', method: 'GET', retry: 1 });
}
/** @deprecated */

View File

@ -1,16 +1,17 @@
import 'whatwg-fetch'; // fetch polyfill needed for PhantomJs rendering
import { Observable, of, lastValueFrom } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { delay } from 'rxjs/operators';
import { AppEvents, DataQueryErrorType, EventBusExtended } from '@grafana/data';
import { BackendSrvRequest, FetchError, FetchResponse } from '@grafana/runtime';
import { BackendSrvRequest, FetchError, config, FetchResponse } from '@grafana/runtime';
import { TokenRevokedModal } from '../../features/users/TokenRevokedModal';
import { ShowModalReactEvent } from '../../types/events';
import { BackendSrv, BackendSrvDependencies } from '../services/backend_srv';
import { ContextSrv, User } from '../services/context_srv';
const getTestContext = (overides?: object) => {
const getTestContext = (overides?: object, mockFromFetch = true) => {
const defaults = {
data: { test: 'hello world' },
ok: true,
@ -55,7 +56,7 @@ const getTestContext = (overides?: object) => {
const parseRequestOptionsMock = jest.fn().mockImplementation((options) => options);
const backendSrv = new BackendSrv({
fromFetch: fromFetchMock,
fromFetch: mockFromFetch ? fromFetchMock : fromFetch,
appEvents: appEventsMock,
contextSrv: contextSrvMock,
logout: logoutMock,
@ -63,14 +64,14 @@ const getTestContext = (overides?: object) => {
backendSrv['parseRequestOptions'] = parseRequestOptionsMock;
const expectCallChain = () => {
expect(fromFetchMock).toHaveBeenCalledTimes(1);
const expectCallChain = (calls = 1) => {
expect(fromFetchMock).toHaveBeenCalledTimes(calls);
};
const expectRequestCallChain = (options: unknown) => {
const expectRequestCallChain = (options: unknown, calls = 1) => {
expect(parseRequestOptionsMock).toHaveBeenCalledTimes(1);
expect(parseRequestOptionsMock).toHaveBeenCalledWith(options);
expectCallChain();
expectCallChain(calls);
};
return {
@ -158,38 +159,71 @@ describe('backendSrv', () => {
});
describe('when making an unsuccessful call and conditions for retry are favorable and loginPing does not throw', () => {
it('then it should retry', async () => {
jest.useFakeTimers();
const url = '/api/dashboard/';
const { backendSrv, appEventsMock, logoutMock, expectRequestCallChain } = getTestContext({
ok: false,
status: 401,
statusText: errorMessage,
data: { message: errorMessage },
url,
});
const url = '/api/dashboard/';
const okResponse = { ok: true, status: 200, statusText: 'OK', data: { message: 'Ok' } };
let fetchMock: jest.SpyInstance;
backendSrv.loginPing = jest
.fn()
.mockResolvedValue({ ok: true, status: 200, statusText: 'OK', data: { message: 'Ok' } });
afterEach(() => {
fetchMock.mockClear();
});
await backendSrv
.request({ url, method: 'GET', retry: 0 })
.catch((error) => {
expect(error.status).toBe(401);
expect(error.statusText).toBe(errorMessage);
expect(error.data).toEqual({ message: errorMessage });
expect(appEventsMock.emit).not.toHaveBeenCalled();
expect(logoutMock).not.toHaveBeenCalled();
expect(backendSrv.loginPing).toHaveBeenCalledTimes(1);
expectRequestCallChain({ url, method: 'GET', retry: 0 });
jest.advanceTimersByTime(50);
afterAll(() => {
fetchMock.mockRestore();
config.featureToggles.clientTokenRotation = false;
});
it.each`
clientTokenRotation
${true}
${false}
`('then it should retry (clientTokenRotation = %s)', async ({ clientTokenRotation }) => {
config.featureToggles.clientTokenRotation = clientTokenRotation;
fetchMock = jest
.spyOn(global, 'fetch')
.mockRejectedValueOnce({
ok: false,
status: 401,
statusText: errorMessage,
headers: new Map(),
text: jest.fn().mockResolvedValue(JSON.stringify({ test: 'hello world' })),
data: { message: errorMessage },
url,
})
.catch((error) => {
expect(error).toEqual({ message: errorMessage });
expect(appEventsMock.emit).toHaveBeenCalledTimes(1);
expect(appEventsMock.emit).toHaveBeenCalledWith(AppEvents.alertWarning, [errorMessage, '']);
});
.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Map(),
text: jest.fn().mockResolvedValue(JSON.stringify({ test: 'hello world' })),
data: { message: 'OK' },
url,
} as unknown as Response);
const { backendSrv, appEventsMock, logoutMock } = getTestContext(
{
ok: false,
status: 401,
statusText: errorMessage,
data: { message: errorMessage },
url,
},
false
);
backendSrv.loginPing = jest.fn().mockResolvedValue(okResponse);
backendSrv.rotateToken = jest.fn().mockResolvedValue(okResponse);
await backendSrv.request({ url, method: 'GET', retry: 0 }).finally(() => {
expect(appEventsMock.emit).not.toHaveBeenCalled();
expect(logoutMock).not.toHaveBeenCalled();
if (config.featureToggles.clientTokenRotation) {
expect(backendSrv.rotateToken).toHaveBeenCalledTimes(1);
} else {
expect(backendSrv.loginPing).toHaveBeenCalledTimes(1);
}
expect(fetchMock).toHaveBeenCalledTimes(2); // expecting 2 calls because of retry and because the loginPing/tokenRotation is mocked
});
});
});
@ -486,32 +520,68 @@ describe('backendSrv', () => {
});
describe('when making an unsuccessful call and conditions for retry are favorable and loginPing does not throw', () => {
it('then it should retry', async () => {
const { backendSrv, logoutMock, expectRequestCallChain } = getTestContext({
ok: false,
status: 401,
statusText: 'UnAuthorized',
data: { message: 'UnAuthorized' },
});
const url = '/api/dashboard/';
const okResponse = { ok: true, status: 200, statusText: 'OK', data: { message: 'Ok' } };
let fetchMock: jest.SpyInstance;
backendSrv.loginPing = jest
.fn()
.mockResolvedValue({ ok: true, status: 200, statusText: 'OK', data: { message: 'Ok' } });
const url = '/api/dashboard/';
afterEach(() => {
fetchMock.mockClear();
});
let inspectorPacket: FetchResponse | FetchError;
backendSrv.getInspectorStream().subscribe({
next: (rsp) => (inspectorPacket = rsp),
});
afterAll(() => {
fetchMock.mockRestore();
config.featureToggles.clientTokenRotation = false;
});
await backendSrv.datasourceRequest({ url, method: 'GET', retry: 0 }).catch((error) => {
expect(error.status).toBe(401);
expect(error.statusText).toBe('UnAuthorized');
expect(error.data).toEqual({ message: 'UnAuthorized' });
expect(inspectorPacket).toBe(error);
expect(backendSrv.loginPing).toHaveBeenCalledTimes(1);
it.each`
clientTokenRotation
${true}
${false}
`('then it should retry (clientTokenRotation = %s)', async ({ clientTokenRotation }) => {
config.featureToggles.clientTokenRotation = clientTokenRotation;
fetchMock = jest
.spyOn(global, 'fetch')
.mockRejectedValueOnce({
ok: false,
status: 401,
statusText: 'UnAuthorized',
headers: new Map(),
text: jest.fn().mockResolvedValue(JSON.stringify({ test: 'hello world' })),
data: { message: 'UnAuthorized' },
url,
})
.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Map(),
text: jest.fn().mockResolvedValue(JSON.stringify({ test: 'hello world' })),
data: { message: 'OK' },
url,
} as unknown as Response);
const { backendSrv, logoutMock } = getTestContext(
{
ok: false,
status: 401,
statusText: 'UnAuthorized',
data: { message: 'UnAuthorized' },
},
false
);
backendSrv.loginPing = jest.fn().mockResolvedValue(okResponse);
backendSrv.rotateToken = jest.fn().mockResolvedValue(okResponse);
await backendSrv.datasourceRequest({ url, method: 'GET', retry: 0 }).finally(() => {
expect(logoutMock).not.toHaveBeenCalled();
expectRequestCallChain({ url, method: 'GET', retry: 0 });
if (config.featureToggles.clientTokenRotation) {
expect(backendSrv.rotateToken).toHaveBeenCalledTimes(1);
} else {
expect(backendSrv.loginPing).toHaveBeenCalledTimes(1);
}
expect(fetchMock).toHaveBeenCalledTimes(2); // expecting 2 calls because of retry and because the loginPing/tokenRotation is mocked
});
});
});