DataSourceWithBackend: Add plugin id to the request headers (#58082)

This commit is contained in:
Ryan McKinley 2022-11-15 01:35:50 +01:00 committed by GitHub
parent b9d8bcb59b
commit d33939da55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 199 additions and 14 deletions

View File

@ -6203,6 +6203,10 @@ exports[`better eslint`] = {
[0, 0, 0, "Do not use any type assertions.", "0"],
[0, 0, 0, "Unexpected any. Specify a different type.", "1"]
],
"public/app/plugins/datasource/loki/datasource.test.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],
[0, 0, 0, "Unexpected any. Specify a different type.", "1"]
],
"public/app/plugins/datasource/loki/datasource.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],
[0, 0, 0, "Unexpected any. Specify a different type.", "1"],

View File

@ -32,7 +32,10 @@ jest.mock('../services', () => ({
getBackendSrv: () => backendSrv,
getDataSourceSrv: () => {
return {
getInstanceSettings: (ref?: DataSourceRef) => ({ type: ref?.type ?? '?', uid: ref?.uid ?? '?' }),
getInstanceSettings: (ref?: DataSourceRef) => ({
type: ref?.type ?? '<mocktype>',
uid: ref?.uid ?? '<mockuid>',
}),
};
},
}));
@ -43,6 +46,8 @@ describe('DataSourceWithBackend', () => {
maxDataPoints: 10,
intervalMs: 5000,
targets: [{ refId: 'A' }, { refId: 'B', datasource: { type: 'sample' } }],
dashboardUID: 'dashA',
panelId: 123,
} as DataQueryRequest);
const args = mock.calls[0][0];
@ -65,7 +70,7 @@ describe('DataSourceWithBackend', () => {
Object {
"datasource": Object {
"type": "sample",
"uid": "?",
"uid": "<mockuid>",
},
"datasourceId": undefined,
"intervalMs": 5000,
@ -74,6 +79,12 @@ describe('DataSourceWithBackend', () => {
},
],
},
"headers": Object {
"X-Dashboard-Uid": "dashA",
"X-Datasource-Uid": "abc, <mockuid>",
"X-Panel-Id": "123",
"X-Plugin-Id": "dummy, sample",
},
"hideFromInspector": false,
"method": "POST",
"requestId": undefined,
@ -88,6 +99,8 @@ describe('DataSourceWithBackend', () => {
intervalMs: 5000,
targets: [{ refId: 'A' }, { refId: 'B', datasource: { type: 'sample' } }],
hideFromInspector: true,
dashboardUID: 'dashA',
panelId: 123,
} as DataQueryRequest);
const args = mock.calls[0][0];
@ -110,7 +123,7 @@ describe('DataSourceWithBackend', () => {
Object {
"datasource": Object {
"type": "sample",
"uid": "?",
"uid": "<mockuid>",
},
"datasourceId": undefined,
"intervalMs": 5000,
@ -119,6 +132,12 @@ describe('DataSourceWithBackend', () => {
},
],
},
"headers": Object {
"X-Dashboard-Uid": "dashA",
"X-Datasource-Uid": "abc, <mockuid>",
"X-Panel-Id": "123",
"X-Plugin-Id": "dummy, sample",
},
"hideFromInspector": true,
"method": "POST",
"requestId": undefined,

View File

@ -71,6 +71,14 @@ export enum HealthStatus {
Error = 'ERROR',
}
// Internal for now
enum PluginRequestHeaders {
PluginID = 'X-Plugin-Id', // can be used for routing
DatasourceUID = 'X-Datasource-Uid', // can be used for routing/ load balancing
DashboardUID = 'X-Dashboard-Uid', // mainly useful for debuging slow queries
PanelID = 'X-Panel-Id', // mainly useful for debuging slow queries
}
/**
* Describes the details in the payload returned when checking the health of a data source
* plugin.
@ -119,11 +127,15 @@ class DataSourceWithBackend<
targets = targets.filter((q) => this.filterQuery!(q));
}
let hasExpr = false;
const pluginIDs = new Set<string>();
const dsUIDs = new Set<string>();
const queries = targets.map((q) => {
let datasource = this.getRef();
let datasourceId = this.id;
if (isExpressionReference(q.datasource)) {
hasExpr = true;
return {
...q,
datasource: ExpressionDatasourceRef,
@ -140,7 +152,12 @@ class DataSourceWithBackend<
datasource = ds.rawRef ?? getDataSourceRef(ds);
datasourceId = ds.id;
}
if (datasource.type?.length) {
pluginIDs.add(datasource.type);
}
if (datasource.uid?.length) {
dsUIDs.add(datasource.uid);
}
return {
...this.applyTemplateVariables(q, request.scopedVars),
datasource,
@ -170,13 +187,28 @@ class DataSourceWithBackend<
});
}
let url = '/api/ds/query';
if (hasExpr) {
url += '?expression=true';
}
const headers: Record<string, string> = {};
headers[PluginRequestHeaders.PluginID] = Array.from(pluginIDs).join(', ');
headers[PluginRequestHeaders.DatasourceUID] = Array.from(dsUIDs).join(', ');
if (request.dashboardUID) {
headers[PluginRequestHeaders.DashboardUID] = request.dashboardUID;
}
if (request.panelId) {
headers[PluginRequestHeaders.PanelID] = `${request.panelId}`;
}
return getBackendSrv()
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
url,
method: 'POST',
data: body,
requestId,
hideFromInspector,
headers,
})
.pipe(
switchMap((raw) => {
@ -193,6 +225,14 @@ class DataSourceWithBackend<
);
}
/** Get request headers with plugin ID+UID set */
protected getRequestHeaders(): Record<string, string> {
const headers: Record<string, string> = {};
headers[PluginRequestHeaders.PluginID] = this.type;
headers[PluginRequestHeaders.DatasourceUID] = this.uid;
return headers;
}
/**
* Apply template variables for explore
*/
@ -221,23 +261,43 @@ class DataSourceWithBackend<
/**
* Make a GET request to the datasource resource path
*/
async getResource(
async getResource<T = any>(
path: string,
params?: BackendSrvRequest['params'],
options?: Partial<BackendSrvRequest>
): Promise<any> {
return getBackendSrv().get(`/api/datasources/${this.id}/resources/${path}`, params, options?.requestId, options);
): Promise<T> {
const headers = this.getRequestHeaders();
const result = await lastValueFrom(
getBackendSrv().fetch<T>({
...options,
method: 'GET',
headers: options?.headers ? { ...options.headers, ...headers } : headers,
params: params ?? options?.params,
url: `/api/datasources/${this.id}/resources/${path}`,
})
);
return result.data;
}
/**
* Send a POST request to the datasource resource path
*/
async postResource(
async postResource<T = any>(
path: string,
data?: BackendSrvRequest['data'],
options?: Partial<BackendSrvRequest>
): Promise<any> {
return getBackendSrv().post(`/api/datasources/${this.id}/resources/${path}`, { ...data }, options);
): Promise<T> {
const headers = this.getRequestHeaders();
const result = await lastValueFrom(
getBackendSrv().fetch<T>({
...options,
method: 'GET',
headers: options?.headers ? { ...options.headers, ...headers } : headers,
data: data ?? { ...data },
url: `/api/datasources/${this.id}/resources/${path}`,
})
);
return result.data;
}
/**
@ -249,6 +309,7 @@ class DataSourceWithBackend<
method: 'GET',
url: `/api/datasources/${this.id}/health`,
showErrorAlert: false,
headers: this.getRequestHeaders(),
})
)
.then((v: FetchResponse) => v.data as HealthCheckResult)

View File

@ -8,4 +8,5 @@ var (
ErrNoQueriesFound = errutil.NewBase(errutil.StatusBadRequest, "query.noQueries", errutil.WithPublicMessage("No queries found")).Errorf("no queries found")
ErrInvalidDatasourceID = errutil.NewBase(errutil.StatusBadRequest, "query.invalidDatasourceId", errutil.WithPublicMessage("Query does not contain a valid data source identifier")).Errorf("invalid data source identifier")
ErrMissingDataSourceInfo = errutil.NewBase(errutil.StatusBadRequest, "query.missingDataSourceInfo").MustTemplate("query missing datasource info: {{ .Public.RefId }}", errutil.WithPublic("Query {{ .Public.RefId }} is missing datasource information"))
ErrQueryParamMismatch = errutil.NewBase(errutil.StatusBadRequest, "query.headerMismatch", errutil.WithPublicMessage("The request headers point to a different plugin than is defined in the request body")).Errorf("plugin header/body mismatch")
)

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/grafana/grafana/pkg/api/dtos"
@ -28,6 +29,13 @@ import (
"golang.org/x/sync/errgroup"
)
const (
HeaderPluginID = "X-Plugin-Id" // can be used for routing
HeaderDatasourceUID = "X-Datasource-Uid" // can be used for routing/ load balancing
HeaderDashboardUID = "X-Dashboard-Uid" // mainly useful for debuging slow queries
HeaderPanelID = "X-Panel-Id" // mainly useful for debuging slow queries
)
func ProvideService(
cfg *setting.Cfg,
dataSourceCache datasources.CacheService,
@ -75,6 +83,7 @@ func (s *Service) QueryData(ctx context.Context, user *user.SignedInUser, skipCa
if err != nil {
return nil, err
}
// If there are expressions, handle them and return
if parsedReq.hasExpression {
return s.handleExpressions(ctx, user, parsedReq)
@ -233,6 +242,7 @@ type parsedQuery struct {
type parsedRequest struct {
hasExpression bool
parsedQueries map[string][]parsedQuery
dsTypes map[string]bool
httpRequest *http.Request
}
@ -244,6 +254,53 @@ func (pr parsedRequest) getFlattenedQueries() []parsedQuery {
return queries
}
func (pr parsedRequest) validateRequest() error {
if pr.httpRequest == nil {
return nil
}
vals := splitHeaders(pr.httpRequest.Header.Values(HeaderDatasourceUID))
count := len(vals)
if count > 0 { // header exists
if count != len(pr.parsedQueries) {
return ErrQueryParamMismatch
}
for _, t := range vals {
if pr.parsedQueries[t] == nil {
return ErrQueryParamMismatch
}
}
}
vals = splitHeaders(pr.httpRequest.Header.Values(HeaderPluginID))
count = len(vals)
if count > 0 { // header exists
if count != len(pr.dsTypes) {
return ErrQueryParamMismatch
}
for _, t := range vals {
if !pr.dsTypes[t] {
return ErrQueryParamMismatch
}
}
}
return nil
}
func splitHeaders(headers []string) []string {
out := []string{}
for _, v := range headers {
if strings.Contains(v, ",") {
for _, sub := range strings.Split(v, ",") {
out = append(out, strings.TrimSpace(sub))
}
} else {
out = append(out, v)
}
}
return out
}
// parseRequest parses a request into parsed queries grouped by datasource uid
func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
if len(reqDTO.Queries) == 0 {
@ -254,6 +311,7 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
req := &parsedRequest{
hasExpression: false,
parsedQueries: make(map[string][]parsedQuery),
dsTypes: make(map[string]bool),
}
// Parse the queries and store them by datasource
@ -270,6 +328,8 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
datasourcesByUid[ds.Uid] = ds
if expr.IsDataSource(ds.Uid) {
req.hasExpression = true
} else {
req.dsTypes[ds.Type] = true
}
if _, ok := req.parsedQueries[ds.Uid]; !ok {
@ -304,7 +364,7 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
req.httpRequest = reqDTO.HTTPRequest
}
return req, nil
return req, req.validateRequest()
}
func (s *Service) getDataSourceFromQuery(ctx context.Context, user *user.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*datasources.DataSource) (*datasources.DataSource, error) {

View File

@ -1,6 +1,7 @@
package query
import (
"bytes"
"context"
"errors"
"net/http"
@ -169,6 +170,40 @@ func TestParseMetricRequest(t *testing.T) {
_, err = tc.queryService.handleExpressions(context.Background(), tc.signedInUser, parsedReq)
assert.NoError(t, err)
})
t.Run("Header validation", func(t *testing.T) {
mr := metricRequestWithQueries(t, `{
"refId": "A",
"datasource": {
"uid": "gIEkMvIVz",
"type": "postgres"
}
}`, `{
"refId": "B",
"datasource": {
"uid": "sEx6ZvSVk",
"type": "testdata"
}
}`)
httpreq, _ := http.NewRequest(http.MethodPost, "http://localhost/", bytes.NewReader([]byte{}))
httpreq.Header.Add("X-Datasource-Uid", "gIEkMvIVz")
mr.HTTPRequest = httpreq
_, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
require.Error(t, err)
// With the second value it is OK
httpreq.Header.Add("X-Datasource-Uid", "sEx6ZvSVk")
mr.HTTPRequest = httpreq
_, err = tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
require.NoError(t, err)
// Single header with comma syntax
httpreq, _ = http.NewRequest(http.MethodPost, "http://localhost/", bytes.NewReader([]byte{}))
httpreq.Header.Set("X-Datasource-Uid", "gIEkMvIVz, sEx6ZvSVk")
mr.HTTPRequest = httpreq
_, err = tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
require.NoError(t, err)
})
}
func TestQueryDataMultipleSources(t *testing.T) {

View File

@ -148,6 +148,7 @@ export abstract class SqlDatasource extends DataSourceWithBackend<SQLQuery, SQLO
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
method: 'POST',
headers: this.getRequestHeaders(),
data: {
from: options?.range?.from.valueOf().toString() || range.from.valueOf().toString(),
to: options?.range?.to.valueOf().toString() || range.to.valueOf().toString(),
@ -171,6 +172,7 @@ export abstract class SqlDatasource extends DataSourceWithBackend<SQLQuery, SQLO
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
method: 'POST',
headers: this.getRequestHeaders(),
data: {
from: '5m',
to: 'now',

View File

@ -110,6 +110,7 @@ export default class CloudMonitoringDatasource extends DataSourceWithBackend<
return getBackendSrv().fetch<PostResponse>({
url: '/api/ds/query',
method: 'POST',
headers: this.getRequestHeaders(),
data: {
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),

View File

@ -409,6 +409,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
method: 'POST',
headers: this.getRequestHeaders(),
data: {
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),

View File

@ -882,7 +882,7 @@ describe('LokiDatasource', () => {
});
it('keeps all labels when no labels are loaded', async () => {
ds.getResource = () => Promise.resolve({ data: [] });
ds.getResource = () => Promise.resolve({ data: [] } as any);
const queries = await ds.importFromAbstractQueries([
{
refId: 'A',
@ -896,7 +896,7 @@ describe('LokiDatasource', () => {
});
it('filters out non existing labels', async () => {
ds.getResource = () => Promise.resolve({ data: ['foo'] });
ds.getResource = () => Promise.resolve({ data: ['foo'] } as any);
const queries = await ds.importFromAbstractQueries([
{
refId: 'A',

View File

@ -773,6 +773,7 @@ export class PrometheusDatasource
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
method: 'POST',
headers: this.getRequestHeaders(),
data: {
from: (this.getPrometheusTime(options.range.from, false) * 1000).toString(),
to: (this.getPrometheusTime(options.range.to, true) * 1000).toString(),