Prometheus: Add custom query params for alert and exemplars queries (#32440)

* Add configuration for custom query params

* Add custom transport in prometheus

* Move custom query handling to request method

* Add encoding
This commit is contained in:
Andrej Ocenas 2021-05-12 19:30:41 +02:00 committed by GitHub
parent 1601f12cf1
commit 293677a0cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 204 additions and 168 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"time"
@ -23,33 +24,53 @@ import (
)
type PrometheusExecutor struct {
Transport http.RoundTripper
intervalCalculator interval.Calculator
baseRoundTripperFactory func(dsInfo *models.DataSource) (http.RoundTripper, error)
intervalCalculator interval.Calculator
}
type basicAuthTransport struct {
type prometheusTransport struct {
Transport http.RoundTripper
username string
password string
hasBasicAuth bool
username string
password string
customQueryParameters string
}
func (bat basicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.SetBasicAuth(bat.username, bat.password)
return bat.Transport.RoundTrip(req)
func (transport *prometheusTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if transport.hasBasicAuth {
req.SetBasicAuth(transport.username, transport.password)
}
if transport.customQueryParameters != "" {
params := url.Values{}
for _, param := range strings.Split(transport.customQueryParameters, "&") {
parts := strings.Split(param, "=")
if len(parts) == 1 {
// This is probably a mistake on the users part in defining the params but we don't want to crash.
params.Add(parts[0], "")
} else {
params.Add(parts[0], parts[1])
}
}
if req.URL.RawQuery != "" {
req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, params.Encode())
} else {
req.URL.RawQuery = params.Encode()
}
}
return transport.Transport.RoundTrip(req)
}
//nolint: staticcheck // plugins.DataPlugin deprecated
func NewExecutor(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
transport, err := dsInfo.GetHttpTransport()
if err != nil {
return nil, err
}
return &PrometheusExecutor{
Transport: transport,
intervalCalculator: interval.NewCalculator(interval.CalculatorOptions{MinInterval: time.Second * 1}),
baseRoundTripperFactory: func(ds *models.DataSource) (http.RoundTripper, error) {
return ds.GetHttpTransport()
},
}, nil
}
@ -63,17 +84,23 @@ func init() {
}
func (e *PrometheusExecutor) getClient(dsInfo *models.DataSource) (apiv1.API, error) {
cfg := api.Config{
Address: dsInfo.Url,
RoundTripper: e.Transport,
// Would make sense to cache this but executor is recreated on every alert request anyway.
transport, err := e.baseRoundTripperFactory(dsInfo)
if err != nil {
return nil, err
}
if dsInfo.BasicAuth {
cfg.RoundTripper = basicAuthTransport{
Transport: e.Transport,
username: dsInfo.BasicAuthUser,
password: dsInfo.DecryptedBasicAuthPassword(),
}
promTransport := &prometheusTransport{
Transport: transport,
hasBasicAuth: dsInfo.BasicAuth,
username: dsInfo.BasicAuthUser,
password: dsInfo.DecryptedBasicAuthPassword(),
customQueryParameters: dsInfo.JsonData.Get("customQueryParameters").MustString(""),
}
cfg := api.Config{
Address: dsInfo.Url,
RoundTripper: promTransport,
}
client, err := api.NewClient(cfg)

View File

@ -1,6 +1,9 @@
package prometheus
import (
"context"
"fmt"
"net/http"
"testing"
"time"
@ -12,12 +15,15 @@ import (
)
func TestPrometheus(t *testing.T) {
json, _ := simplejson.NewJson([]byte(`
{ "customQueryParameters": "custom=par/am&second=f oo"}
`))
dsInfo := &models.DataSource{
JsonData: simplejson.New(),
JsonData: json,
}
plug, err := NewExecutor(dsInfo)
executor := plug.(*PrometheusExecutor)
require.NoError(t, err)
executor := plug.(*PrometheusExecutor)
t.Run("converting metric name", func(t *testing.T) {
metric := map[p.LabelName]p.LabelValue{
@ -47,100 +53,98 @@ func TestPrometheus(t *testing.T) {
})
t.Run("parsing query model with step", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`
jsonModel, _ := simplejson.NewJson([]byte(json))
queryModels := []plugins.DataSubQuery{
{Model: jsonModel},
}
timeRange := plugins.NewDataTimeRange("12h", "now")
queryContext := plugins.DataQuery{
Queries: queryModels,
TimeRange: &timeRange,
}
models, err := executor.parseQuery(dsInfo, queryContext)
query := queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`)
timerange := plugins.NewDataTimeRange("12h", "now")
query.TimeRange = &timerange
models, err := executor.parseQuery(dsInfo, query)
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
t.Run("parsing query model without step parameter", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`
jsonModel, _ := simplejson.NewJson([]byte(json))
queryModels := []plugins.DataSubQuery{
{Model: jsonModel},
}
timeRange := plugins.NewDataTimeRange("48h", "now")
queryContext := plugins.DataQuery{
Queries: queryModels,
TimeRange: &timeRange,
}
models, err := executor.parseQuery(dsInfo, queryContext)
query := queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`)
models, err := executor.parseQuery(dsInfo, query)
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
timeRange = plugins.NewDataTimeRange("1h", "now")
queryContext.TimeRange = &timeRange
models, err = executor.parseQuery(dsInfo, queryContext)
timeRange := plugins.NewDataTimeRange("1h", "now")
query.TimeRange = &timeRange
models, err = executor.parseQuery(dsInfo, query)
require.NoError(t, err)
require.Equal(t, time.Second*15, models[0].Step)
})
t.Run("parsing query model with high intervalFactor", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 10,
"refId": "A"
}`
jsonModel, _ := simplejson.NewJson([]byte(json))
queryModels := []plugins.DataSubQuery{
{Model: jsonModel},
}
timeRange := plugins.NewDataTimeRange("48h", "now")
queryContext := plugins.DataQuery{
TimeRange: &timeRange,
Queries: queryModels,
}
models, err := executor.parseQuery(dsInfo, queryContext)
models, err := executor.parseQuery(dsInfo, queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 10,
"refId": "A"
}`))
require.NoError(t, err)
require.Equal(t, time.Minute*20, models[0].Step)
})
t.Run("parsing query model with low intervalFactor", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`
jsonModel, _ := simplejson.NewJson([]byte(json))
queryModels := []plugins.DataSubQuery{
{Model: jsonModel},
}
timeRange := plugins.NewDataTimeRange("48h", "now")
queryContext := plugins.DataQuery{
TimeRange: &timeRange,
Queries: queryModels,
}
models, err := executor.parseQuery(dsInfo, queryContext)
models, err := executor.parseQuery(dsInfo, queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`))
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
})
t.Run("runs query with custom params", func(t *testing.T) {
query := queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`)
queryParams := ""
executor.baseRoundTripperFactory = func(ds *models.DataSource) (http.RoundTripper, error) {
rt := &RoundTripperMock{}
rt.roundTrip = func(request *http.Request) (*http.Response, error) {
queryParams = request.URL.RawQuery
return nil, fmt.Errorf("this is fine")
}
return rt, nil
}
_, _ = executor.DataQuery(context.Background(), dsInfo, query)
require.Equal(t, "custom=par%2Fam&second=f+oo", queryParams)
})
}
type RoundTripperMock struct {
roundTrip func(*http.Request) (*http.Response, error)
}
func (rt *RoundTripperMock) RoundTrip(req *http.Request) (*http.Response, error) {
return rt.roundTrip(req)
}
func queryContext(json string) plugins.DataQuery {
jsonModel, _ := simplejson.NewJson([]byte(json))
queryModels := []plugins.DataSubQuery{
{Model: jsonModel},
}
timeRange := plugins.NewDataTimeRange("48h", "now")
return plugins.DataQuery{
TimeRange: &timeRange,
Queries: queryModels,
}
}
func TestParseResponse(t *testing.T) {

View File

@ -126,28 +126,47 @@ describe('PrometheusDatasource', () => {
});
});
describe('When using customQueryParams', () => {
describe('customQueryParams', () => {
const promDs = new PrometheusDatasource(
{ ...instanceSettings, jsonData: { customQueryParameters: 'customQuery=123' } as any },
templateSrvStub as any,
timeSrvStub as any
);
const target = { expr: 'test{job="testjob"}', format: 'time_series', refId: '' };
function makeQuery(target: PromQuery) {
return {
range: { from: time({ seconds: 63 }), to: time({ seconds: 183 }) },
targets: [target],
interval: '60s',
} as any;
}
it('added to metadata request', () => {
promDs.metadataRequest('/foo');
expect(fetchMock.mock.calls.length).toBe(1);
expect(fetchMock.mock.calls[0][0].url).toBe('proxied/foo?customQuery=123');
});
it('added to query', () => {
promDs.query({
range: { from: time({ seconds: 63 }), to: time({ seconds: 183 }) },
targets: [{ expr: 'test{job="testjob"}', format: 'time_series' }],
interval: '60s',
} as any);
it('adds params to timeseries query', () => {
promDs.query(makeQuery(target));
expect(fetchMock.mock.calls.length).toBe(1);
expect(fetchMock.mock.calls[0][0].url).toBe(
'proxied/api/v1/query_range?query=test%7Bjob%3D%22testjob%22%7D&start=60&end=180&step=60&customQuery=123'
);
});
it('adds params to exemplars query', () => {
promDs.query(makeQuery({ ...target, exemplar: true }));
// We do also range query for single exemplars target
expect(fetchMock.mock.calls.length).toBe(2);
expect(fetchMock.mock.calls[0][0].url).toContain('&customQuery=123');
expect(fetchMock.mock.calls[1][0].url).toContain('&customQuery=123');
});
it('adds params to instant query', () => {
promDs.query(makeQuery({ ...target, instant: true }));
expect(fetchMock.mock.calls.length).toBe(1);
expect(fetchMock.mock.calls[0][0].url).toContain('&customQuery=123');
});
});
describe('When using adhoc filters', () => {

View File

@ -13,13 +13,13 @@ import {
ScopedVars,
TimeRange,
} from '@grafana/data';
import { BackendSrvRequest, FetchError, getBackendSrv } from '@grafana/runtime';
import { BackendSrvRequest, FetchError, FetchResponse, getBackendSrv } from '@grafana/runtime';
import { safeStringifyValue } from 'app/core/utils/explore';
import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { defaults, cloneDeep } from 'lodash';
import LRU from 'lru-cache';
import { forkJoin, merge, Observable, of, pipe, Subject, throwError } from 'rxjs';
import { forkJoin, merge, Observable, of, OperatorFunction, pipe, Subject, throwError } from 'rxjs';
import { catchError, filter, map, tap } from 'rxjs/operators';
import addLabelToQuery from './add_label_to_query';
import PrometheusLanguageProvider from './language_provider';
@ -105,7 +105,23 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
}
}
_request<T = any>(url: string, data: Record<string, string> | null, overrides: Partial<BackendSrvRequest> = {}) {
/**
* Any request done from this data source should go through here as it contains some common processing for the
* request. Any processing done here needs to be also copied on the backend as this goes through data source proxy
* but not through the same code as alerting.
*/
_request<T = any>(
url: string,
data: Record<string, string> | null,
overrides: Partial<BackendSrvRequest> = {}
): Observable<FetchResponse<T>> {
data = data || {};
for (const [key, value] of this.customQueryParameters) {
if (data[key] == null) {
data[key] = value;
}
}
const options: BackendSrvRequest = defaults(overrides, {
url: this.url + url,
method: this.httpMethod,
@ -139,18 +155,10 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
// Use this for tab completion features, wont publish response to other components
async metadataRequest<T = any>(url: string, params = {}) {
const data: any = params;
for (const [key, value] of this.customQueryParameters) {
if (data[key] == null) {
data[key] = value;
}
}
// If URL includes endpoint that supports POST and GET method, try to use configured method. This might fail as POST is supported only in v2.10+.
if (GET_AND_POST_METADATA_ENDPOINTS.some((endpoint) => url.includes(endpoint))) {
try {
return await this._request<T>(url, data, { method: this.httpMethod, hideFromInspector: true }).toPromise();
return await this._request<T>(url, params, { method: this.httpMethod, hideFromInspector: true }).toPromise();
} catch (err) {
// If status code of error is Method Not Allowed (405) and HTTP method is POST, retry with GET
if (this.httpMethod === 'POST' && err.status === 405) {
@ -161,7 +169,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
}
}
return await this._request<T>(url, data, { method: 'GET', hideFromInspector: true }).toPromise(); // toPromise until we change getTagValues, getTagKeys to Observable
return await this._request<T>(url, params, { method: 'GET', hideFromInspector: true }).toPromise(); // toPromise until we change getTagValues, getTagKeys to Observable
}
interpolateQueryExpr(value: string | string[] = [], variable: any) {
@ -308,24 +316,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
})
);
if (query.instant) {
return this.performInstantQuery(query, end).pipe(filterAndMapResponse);
}
if (query.exemplar) {
return this.getExemplars(query).pipe(
catchError(() => {
this.exemplarErrors.next(EXEMPLARS_NOT_AVAILABLE);
return of({
data: [],
state: LoadingState.Done,
});
}),
filterAndMapResponse
);
}
return this.performTimeSeriesQuery(query, query.start, query.end).pipe(filterAndMapResponse);
return this.runQuery(query, end, filterAndMapResponse);
});
return merge(...subQueries);
@ -355,24 +346,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
})
);
if (query.instant) {
return this.performInstantQuery(query, end).pipe(filterAndMapResponse);
}
if (query.exemplar) {
return this.getExemplars(query).pipe(
catchError(() => {
this.exemplarErrors.next(EXEMPLARS_NOT_AVAILABLE);
return of({
data: [],
state: LoadingState.Done,
});
}),
filterAndMapResponse
);
}
return this.performTimeSeriesQuery(query, query.start, query.end).pipe(filterAndMapResponse);
return this.runQuery(query, end, filterAndMapResponse);
});
return forkJoin(observables).pipe(
@ -389,6 +363,27 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
);
}
private runQuery<T>(query: PromQueryRequest, end: number, filter: OperatorFunction<any, T>): Observable<T> {
if (query.instant) {
return this.performInstantQuery(query, end).pipe(filter);
}
if (query.exemplar) {
return this.getExemplars(query).pipe(
catchError((err: FetchError) => {
this.exemplarErrors.next(EXEMPLARS_NOT_AVAILABLE);
return of({
data: [],
state: LoadingState.Done,
});
}),
filter
);
}
return this.performTimeSeriesQuery(query, query.start, query.end).pipe(filter);
}
createQuery(target: PromQuery, options: DataQueryRequest<PromQuery>, start: number, end: number) {
const query: PromQueryRequest = {
hinting: target.hinting,
@ -499,12 +494,6 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
data['timeout'] = this.queryTimeout;
}
for (const [key, value] of this.customQueryParameters) {
if (data[key] == null) {
data[key] = value;
}
}
return this._request<PromDataSuccessResponse<PromMatrixData>>(url, data, {
requestId: query.requestId,
headers: query.headers,
@ -519,7 +508,10 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
);
}
performInstantQuery(query: PromQueryRequest, time: number) {
performInstantQuery(
query: PromQueryRequest,
time: number
): Observable<FetchResponse<PromDataSuccessResponse<PromVectorData | PromScalarData>> | FetchError> {
const url = '/api/v1/query';
const data: any = {
query: query.expr,
@ -530,12 +522,6 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
data['timeout'] = this.queryTimeout;
}
for (const [key, value] of this.customQueryParameters) {
if (data[key] == null) {
data[key] = value;
}
}
return this._request<PromDataSuccessResponse<PromVectorData | PromScalarData>>(url, data, {
requestId: query.requestId,
headers: query.headers,