mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Datasource: Shared HTTP client provider for core backend data sources and any data source using the data source proxy (#33439)
Uses new httpclient package from grafana-plugin-sdk-go introduced via grafana/grafana-plugin-sdk-go#328. Replaces the GetHTTPClient, GetTransport, GetTLSConfig methods defined on DataSource model. Longer-term the goal is to migrate core HTTP backend data sources to use the SDK contracts and using httpclient.Provider for creating HTTP clients and such. Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
committed by
GitHub
parent
7a83d1f9ff
commit
348e76fc8e
@@ -2,128 +2,17 @@ package models
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-aws-sdk/pkg/sigv4"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/metrics/metricutil"
|
||||
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
var datasourceRequestCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "grafana",
|
||||
Name: "datasource_request_total",
|
||||
Help: "A counter for outgoing requests for a datasource",
|
||||
},
|
||||
[]string{"datasource", "code", "method"},
|
||||
)
|
||||
|
||||
var datasourceRequestSummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "grafana",
|
||||
Name: "datasource_request_duration_seconds",
|
||||
Help: "summary of outgoing datasource requests sent from Grafana",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
}, []string{"datasource", "code", "method"},
|
||||
)
|
||||
|
||||
var datasourceResponseSummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "grafana",
|
||||
Name: "datasource_response_size_bytes",
|
||||
Help: "summary of datasource response sizes returned to Grafana",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
}, []string{"datasource"},
|
||||
)
|
||||
|
||||
var datasourceRequestsInFlight = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "grafana",
|
||||
Name: "datasource_request_in_flight",
|
||||
Help: "A gauge of outgoing datasource requests currently being sent by Grafana",
|
||||
},
|
||||
[]string{"datasource"},
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(datasourceRequestSummary,
|
||||
datasourceRequestCounter,
|
||||
datasourceRequestsInFlight,
|
||||
datasourceResponseSummary)
|
||||
}
|
||||
|
||||
type proxyTransportCache struct {
|
||||
cache map[int64]cachedTransport
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// dataSourceTransport implements http.RoundTripper (https://golang.org/pkg/net/http/#RoundTripper)
|
||||
type dataSourceTransport struct {
|
||||
datasourceName string
|
||||
headers map[string]string
|
||||
transport *http.Transport
|
||||
next http.RoundTripper
|
||||
}
|
||||
|
||||
func instrumentRoundtrip(datasourceName string, next http.RoundTripper) promhttp.RoundTripperFunc {
|
||||
return promhttp.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||
datasourceLabelName, err := metricutil.SanitizeLabelName(datasourceName)
|
||||
// if the datasource named cannot be turned into a prometheus
|
||||
// label we will skip instrumenting these metrics.
|
||||
if err != nil {
|
||||
return next.RoundTrip(r)
|
||||
}
|
||||
|
||||
datasourceLabel := prometheus.Labels{"datasource": datasourceLabelName}
|
||||
|
||||
requestCounter := datasourceRequestCounter.MustCurryWith(datasourceLabel)
|
||||
requestSummary := datasourceRequestSummary.MustCurryWith(datasourceLabel)
|
||||
requestInFlight := datasourceRequestsInFlight.With(datasourceLabel)
|
||||
responseSizeSummary := datasourceResponseSummary.With(datasourceLabel)
|
||||
|
||||
res, err := promhttp.InstrumentRoundTripperDuration(requestSummary,
|
||||
promhttp.InstrumentRoundTripperCounter(requestCounter,
|
||||
promhttp.InstrumentRoundTripperInFlight(requestInFlight, next))).
|
||||
RoundTrip(r)
|
||||
|
||||
// we avoid measuring contentlength less than zero because it indicates
|
||||
// that the content size is unknown. https://godoc.org/github.com/badu/http#Response
|
||||
if res != nil && res.ContentLength > 0 {
|
||||
responseSizeSummary.Observe(float64(res.ContentLength))
|
||||
}
|
||||
|
||||
return res, err
|
||||
})
|
||||
}
|
||||
|
||||
// RoundTrip executes a single HTTP transaction, returning a Response for the provided Request.
|
||||
func (d *dataSourceTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
for key, value := range d.headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
|
||||
return instrumentRoundtrip(d.datasourceName, d.next).RoundTrip(req)
|
||||
}
|
||||
|
||||
type cachedTransport struct {
|
||||
updated time.Time
|
||||
|
||||
*dataSourceTransport
|
||||
}
|
||||
|
||||
var ptc = proxyTransportCache{
|
||||
cache: make(map[int64]cachedTransport),
|
||||
}
|
||||
|
||||
func (ds *DataSource) getTimeout() time.Duration {
|
||||
timeout := 0
|
||||
if ds.JsonData != nil {
|
||||
@@ -135,8 +24,22 @@ func (ds *DataSource) getTimeout() time.Duration {
|
||||
return time.Duration(timeout) * time.Second
|
||||
}
|
||||
|
||||
func (ds *DataSource) GetHttpClient() (*http.Client, error) {
|
||||
transport, err := ds.GetHttpTransport()
|
||||
type proxyTransportCache struct {
|
||||
cache map[int64]cachedRoundTripper
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type cachedRoundTripper struct {
|
||||
updated time.Time
|
||||
roundTripper http.RoundTripper
|
||||
}
|
||||
|
||||
var ptc = proxyTransportCache{
|
||||
cache: make(map[int64]cachedRoundTripper),
|
||||
}
|
||||
|
||||
func (ds *DataSource) GetHTTPClient(provider httpclient.Provider) (*http.Client, error) {
|
||||
transport, err := ds.GetHTTPTransport(provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -147,79 +50,86 @@ func (ds *DataSource) GetHttpClient() (*http.Client, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Creates a HTTP Transport middleware chain
|
||||
func (ds *DataSource) GetHttpTransport() (*dataSourceTransport, error) {
|
||||
func (ds *DataSource) GetHTTPTransport(provider httpclient.Provider) (http.RoundTripper, error) {
|
||||
ptc.Lock()
|
||||
defer ptc.Unlock()
|
||||
|
||||
if t, present := ptc.cache[ds.Id]; present && ds.Updated.Equal(t.updated) {
|
||||
return t.dataSourceTransport, nil
|
||||
return t.roundTripper, nil
|
||||
}
|
||||
|
||||
tlsConfig, err := ds.GetTLSConfig()
|
||||
rt, err := provider.GetTransport(ds.HTTPClientOptions())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConfig.Renegotiation = tls.RenegotiateFreelyAsClient
|
||||
|
||||
// Create transport which adds all
|
||||
customHeaders := ds.getCustomHeaders()
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: ds.getTimeout(),
|
||||
KeepAlive: time.Duration(setting.DataProxyKeepAlive) * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: time.Duration(setting.DataProxyTLSHandshakeTimeout) * time.Second,
|
||||
ExpectContinueTimeout: time.Duration(setting.DataProxyExpectContinueTimeout) * time.Second,
|
||||
MaxIdleConns: setting.DataProxyMaxIdleConns,
|
||||
IdleConnTimeout: time.Duration(setting.DataProxyIdleConnTimeout) * time.Second,
|
||||
ptc.cache[ds.Id] = cachedRoundTripper{
|
||||
roundTripper: rt,
|
||||
updated: ds.Updated,
|
||||
}
|
||||
|
||||
// Set default next round tripper to the default transport
|
||||
next := http.RoundTripper(transport)
|
||||
|
||||
// Add SigV4 middleware if enabled, which will then defer to the default transport
|
||||
if ds.JsonData != nil && ds.JsonData.Get("sigV4Auth").MustBool() && setting.SigV4AuthEnabled {
|
||||
next = ds.sigV4Middleware(transport)
|
||||
}
|
||||
|
||||
dsTransport := &dataSourceTransport{
|
||||
datasourceName: ds.Name,
|
||||
headers: customHeaders,
|
||||
transport: transport,
|
||||
next: next,
|
||||
}
|
||||
|
||||
ptc.cache[ds.Id] = cachedTransport{
|
||||
dataSourceTransport: dsTransport,
|
||||
updated: ds.Updated,
|
||||
}
|
||||
|
||||
return dsTransport, nil
|
||||
return rt, nil
|
||||
}
|
||||
|
||||
func (ds *DataSource) sigV4Middleware(next http.RoundTripper) http.RoundTripper {
|
||||
decrypted := ds.DecryptedValues()
|
||||
func (ds *DataSource) HTTPClientOptions() sdkhttpclient.Options {
|
||||
tlsOptions := ds.TLSOptions()
|
||||
opts := sdkhttpclient.Options{
|
||||
Timeouts: &sdkhttpclient.TimeoutOptions{
|
||||
Timeout: ds.getTimeout(),
|
||||
KeepAlive: time.Duration(setting.DataProxyKeepAlive) * time.Second,
|
||||
TLSHandshakeTimeout: time.Duration(setting.DataProxyTLSHandshakeTimeout) * time.Second,
|
||||
ExpectContinueTimeout: time.Duration(setting.DataProxyExpectContinueTimeout) * time.Second,
|
||||
MaxIdleConns: setting.DataProxyMaxIdleConns,
|
||||
MaxIdleConnsPerHost: setting.DataProxyMaxIdleConnsPerHost,
|
||||
IdleConnTimeout: time.Duration(setting.DataProxyIdleConnTimeout) * time.Second,
|
||||
},
|
||||
Headers: getCustomHeaders(ds.JsonData, ds.DecryptedValues()),
|
||||
Labels: map[string]string{
|
||||
"datasource_name": ds.Name,
|
||||
"datasource_uid": ds.Uid,
|
||||
},
|
||||
TLS: &tlsOptions,
|
||||
}
|
||||
|
||||
return sigv4.New(
|
||||
&sigv4.Config{
|
||||
if ds.JsonData != nil {
|
||||
opts.CustomOptions = ds.JsonData.MustMap()
|
||||
}
|
||||
|
||||
if ds.BasicAuth {
|
||||
opts.BasicAuth = &sdkhttpclient.BasicAuthOptions{
|
||||
User: ds.BasicAuthUser,
|
||||
Password: ds.DecryptedBasicAuthPassword(),
|
||||
}
|
||||
} else if ds.User != "" {
|
||||
opts.BasicAuth = &sdkhttpclient.BasicAuthOptions{
|
||||
User: ds.User,
|
||||
Password: ds.DecryptedPassword(),
|
||||
}
|
||||
}
|
||||
|
||||
if ds.JsonData != nil && ds.JsonData.Get("sigV4Auth").MustBool(false) {
|
||||
opts.SigV4 = &sdkhttpclient.SigV4Config{
|
||||
Service: awsServiceNamespace(ds.Type),
|
||||
AccessKey: decrypted["sigV4AccessKey"],
|
||||
SecretKey: decrypted["sigV4SecretKey"],
|
||||
Region: ds.JsonData.Get("sigV4Region").MustString(),
|
||||
AssumeRoleARN: ds.JsonData.Get("sigV4AssumeRoleArn").MustString(),
|
||||
AuthType: ds.JsonData.Get("sigV4AuthType").MustString(),
|
||||
ExternalID: ds.JsonData.Get("sigV4ExternalId").MustString(),
|
||||
Profile: ds.JsonData.Get("sigV4Profile").MustString(),
|
||||
},
|
||||
next,
|
||||
)
|
||||
}
|
||||
|
||||
if val, exists := ds.DecryptedValue("sigV4AccessKey"); exists {
|
||||
opts.SigV4.AccessKey = val
|
||||
}
|
||||
|
||||
if val, exists := ds.DecryptedValue("sigV4SecretKey"); exists {
|
||||
opts.SigV4.SecretKey = val
|
||||
}
|
||||
}
|
||||
|
||||
return opts
|
||||
}
|
||||
|
||||
func (ds *DataSource) GetTLSConfig() (*tls.Config, error) {
|
||||
func (ds *DataSource) TLSOptions() sdkhttpclient.TLSOptions {
|
||||
var tlsSkipVerify, tlsClientAuth, tlsAuthWithCACert bool
|
||||
var serverName string
|
||||
|
||||
@@ -230,55 +140,55 @@ func (ds *DataSource) GetTLSConfig() (*tls.Config, error) {
|
||||
serverName = ds.JsonData.Get("serverName").MustString()
|
||||
}
|
||||
|
||||
tlsConfig := &tls.Config{
|
||||
opts := sdkhttpclient.TLSOptions{
|
||||
InsecureSkipVerify: tlsSkipVerify,
|
||||
ServerName: serverName,
|
||||
}
|
||||
|
||||
if tlsClientAuth || tlsAuthWithCACert {
|
||||
decrypted := ds.SecureJsonData.Decrypt()
|
||||
if tlsAuthWithCACert && len(decrypted["tlsCACert"]) > 0 {
|
||||
caPool := x509.NewCertPool()
|
||||
ok := caPool.AppendCertsFromPEM([]byte(decrypted["tlsCACert"]))
|
||||
if !ok {
|
||||
return nil, errors.New("failed to parse TLS CA PEM certificate")
|
||||
if tlsAuthWithCACert {
|
||||
if val, exists := ds.DecryptedValue("tlsCACert"); exists && len(val) > 0 {
|
||||
opts.CACertificate = val
|
||||
}
|
||||
tlsConfig.RootCAs = caPool
|
||||
}
|
||||
|
||||
if tlsClientAuth {
|
||||
cert, err := tls.X509KeyPair([]byte(decrypted["tlsClientCert"]), []byte(decrypted["tlsClientKey"]))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if val, exists := ds.DecryptedValue("tlsClientCert"); exists && len(val) > 0 {
|
||||
opts.ClientCertificate = val
|
||||
}
|
||||
if val, exists := ds.DecryptedValue("tlsClientKey"); exists && len(val) > 0 {
|
||||
opts.ClientKey = val
|
||||
}
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
}
|
||||
}
|
||||
|
||||
return tlsConfig, nil
|
||||
return opts
|
||||
}
|
||||
|
||||
func (ds *DataSource) GetTLSConfig(httpClientProvider httpclient.Provider) (*tls.Config, error) {
|
||||
return httpClientProvider.GetTLSConfig(ds.HTTPClientOptions())
|
||||
}
|
||||
|
||||
// getCustomHeaders returns a map with all the to be set headers
|
||||
// The map key represents the HeaderName and the value represents this header's value
|
||||
func (ds *DataSource) getCustomHeaders() map[string]string {
|
||||
func getCustomHeaders(jsonData *simplejson.Json, decryptedValues map[string]string) map[string]string {
|
||||
headers := make(map[string]string)
|
||||
if ds.JsonData == nil {
|
||||
if jsonData == nil {
|
||||
return headers
|
||||
}
|
||||
|
||||
decrypted := ds.SecureJsonData.Decrypt()
|
||||
index := 1
|
||||
for {
|
||||
headerNameSuffix := fmt.Sprintf("httpHeaderName%d", index)
|
||||
headerValueSuffix := fmt.Sprintf("httpHeaderValue%d", index)
|
||||
|
||||
key := ds.JsonData.Get(headerNameSuffix).MustString()
|
||||
key := jsonData.Get(headerNameSuffix).MustString()
|
||||
if key == "" {
|
||||
// No (more) header values are available
|
||||
break
|
||||
}
|
||||
|
||||
if val, ok := decrypted[headerValueSuffix]; ok {
|
||||
if val, ok := decryptedValues[headerValueSuffix]; ok {
|
||||
headers[key] = val
|
||||
}
|
||||
index++
|
||||
|
||||
Reference in New Issue
Block a user