Redshift: Support caching async aws queries (#71682)

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>
Author: Isabella Siu, 2023-07-21 11:34:07 -04:00, committed by GitHub
parent 3457ccbf12
commit 56913fbd95
11 changed files with 201 additions and 64 deletions


@@ -75,7 +75,7 @@ These features are early in their development lifecycle and so are not yet suppo
 Experimental features might be changed or removed without prior notice.
 | Feature toggle name | Description |
-| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------ |
+| ------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
 | `live-service-web-worker` | This will use a webworker thread to processes events rather than the main thread |
 | `queryOverLive` | Use Grafana Live WebSocket to execute backend queries |
 | `lokiExperimentalStreaming` | Support new streaming approach for loki (prototype, needs special loki build) |
@@ -128,6 +128,7 @@ Experimental features might be changed or removed without prior notice.
 | `disableTraceQLStreaming` | Disables the option to stream the response of TraceQL queries of the Tempo data source |
 | `grafanaAPIServer` | Enable Kubernetes API Server for Grafana resources |
 | `featureToggleAdminPage` | Enable admin page for managing feature toggles from the Grafana front-end |
+| `awsAsyncQueryCaching` | Enable caching for async queries for Redshift and Athena. Requires that the `useCachingService` feature toggle is enabled and the datasource has caching and async query support enabled |
 ## Development feature toggles

go.mod

@@ -64,7 +64,7 @@ require (
 github.com/gorilla/websocket v1.5.0 // @grafana/grafana-app-platform-squad
 github.com/grafana/alerting v0.0.0-20230606080147-55b8d71c7890 // @grafana/alerting-squad-backend
 github.com/grafana/cuetsy v0.1.10 // @grafana/grafana-as-code
-github.com/grafana/grafana-aws-sdk v0.15.0 // @grafana/aws-datasources
+github.com/grafana/grafana-aws-sdk v0.16.1 // @grafana/aws-datasources
 github.com/grafana/grafana-azure-sdk-go v1.7.0 // @grafana/backend-platform
 github.com/grafana/grafana-plugin-sdk-go v0.171.0 // @grafana/plugins-platform-backend
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // @grafana/backend-platform

go.sum

@@ -1336,6 +1336,10 @@ github.com/grafana/grafana-apiserver v0.0.0-20230713001719-88a9ed41992d h1:fjc6v
 github.com/grafana/grafana-apiserver v0.0.0-20230713001719-88a9ed41992d/go.mod h1:2g9qGdCeU6x/69QAs82WM52bwBUT5/CBaBD0I94+txU=
 github.com/grafana/grafana-aws-sdk v0.15.0 h1:ZOPHQcC5NUFi1bLTwnju91G0KmGh1z+qXOKj9nDfxNs=
 github.com/grafana/grafana-aws-sdk v0.15.0/go.mod h1:rCXLYoMpPqF90U7XqgVJ1HIAopFVF0bB3SXBVEJIm3I=
+github.com/grafana/grafana-aws-sdk v0.16.0 h1:FFVab0jvhENce5cMEAodANCa5ARjyObN1dSOrvciW0c=
+github.com/grafana/grafana-aws-sdk v0.16.0/go.mod h1:rCXLYoMpPqF90U7XqgVJ1HIAopFVF0bB3SXBVEJIm3I=
+github.com/grafana/grafana-aws-sdk v0.16.1 h1:R/hMtQP7H0+8nWFoIOApaZj0qstmZM+5Pw0rRzk3A3Y=
+github.com/grafana/grafana-aws-sdk v0.16.1/go.mod h1:rCXLYoMpPqF90U7XqgVJ1HIAopFVF0bB3SXBVEJIm3I=
 github.com/grafana/grafana-azure-sdk-go v1.7.0 h1:2EAPwNl/qsDMHwKjlzaHif+H+bHcF1W7sM8/jAcxVcI=
 github.com/grafana/grafana-azure-sdk-go v1.7.0/go.mod h1:X4PdEQIYgHfn0KTa2ZTKvufhNz6jbCEKUQPZIlcyOGw=
 github.com/grafana/grafana-google-sdk-go v0.1.0 h1:LKGY8z2DSxKjYfr2flZsWgTRTZ6HGQbTqewE3JvRaNA=


@@ -114,5 +114,6 @@ export interface FeatureToggles {
 disableTraceQLStreaming?: boolean;
 grafanaAPIServer?: boolean;
 featureToggleAdminPage?: boolean;
+awsAsyncQueryCaching?: boolean;
 splitScopes?: boolean;
 }


@@ -657,6 +657,12 @@ var (
 Owner: grafanaOperatorExperienceSquad,
 RequiresRestart: true,
 },
+{
+Name: "awsAsyncQueryCaching",
+Description: "Enable caching for async queries for Redshift and Athena. Requires that the `useCachingService` feature toggle is enabled and the datasource has caching and async query support enabled",
+Stage: FeatureStageExperimental,
+Owner: awsDatasourcesSquad,
+},
 {
 Name: "splitScopes",
 Description: "Support faster dashboard and folder search by splitting permission scopes into parts",

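For orientation, a minimal, illustrative sketch (not part of this commit) of checking the new toggle through the featuremgmt API that the rest of this diff uses; the combined check mirrors the documented requirement that `useCachingService` also be enabled, and the surrounding program exists only to make the snippet self-contained:

    package main

    import (
        "fmt"

        "github.com/grafana/grafana/pkg/services/featuremgmt"
    )

    func main() {
        // WithFeatures builds an in-memory feature manager, handy for tests
        // and sketches like this one.
        features := featuremgmt.WithFeatures(
            featuremgmt.FlagUseCachingService,
            featuremgmt.FlagAwsAsyncQueryCaching,
        )

        // Async AWS query caching only takes effect when the caching
        // service itself is enabled as well.
        if features.IsEnabled(featuremgmt.FlagUseCachingService) &&
            features.IsEnabled(featuremgmt.FlagAwsAsyncQueryCaching) {
            fmt.Println("async AWS query caching is active")
        }
    }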

@@ -95,4 +95,5 @@ mlExpressions,experimental,@grafana/alerting-squad,false,false,false,false
 disableTraceQLStreaming,experimental,@grafana/observability-traces-and-profiling,false,false,false,true
 grafanaAPIServer,experimental,@grafana/grafana-app-platform-squad,false,false,false,false
 featureToggleAdminPage,experimental,@grafana/grafana-operator-experience-squad,false,false,true,false
+awsAsyncQueryCaching,experimental,@grafana/aws-datasources,false,false,false,false
 splitScopes,preview,@grafana/grafana-authnz-team,false,false,true,false



@@ -391,6 +391,10 @@ const (
 // Enable admin page for managing feature toggles from the Grafana front-end
 FlagFeatureToggleAdminPage = "featureToggleAdminPage"
+// FlagAwsAsyncQueryCaching
+// Enable caching for async queries for Redshift and Athena. Requires that the `useCachingService` feature toggle is enabled and the datasource has caching and async query support enabled
+FlagAwsAsyncQueryCaching = "awsAsyncQueryCaching"
 // FlagSplitScopes
 // Support faster dashboard and folder search by splitting permission scopes into parts
 FlagSplitScopes = "splitScopes"


@@ -19,6 +19,14 @@ var QueryCachingRequestHistogram = prometheus.NewHistogramVec(prometheus.Histogr
 Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
 }, []string{"datasource_type", "cache", "query_type"})
+var ShouldCacheQueryHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+Namespace: metrics.ExporterName,
+Subsystem: "caching",
+Name: "should_cache_query_request_duration_seconds",
+Help: "histogram of grafana query endpoint requests in seconds",
+Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
+}, []string{"datasource_type", "cache", "shouldCache", "query_type"})
 var ResourceCachingRequestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 Namespace: metrics.ExporterName,
 Subsystem: "caching",

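A self-contained sketch of how an observation lands in this new histogram. The metric definition is copied from the hunk above into a local stand-in so the snippet compiles on its own, the namespace and label values are illustrative, and the real observation is made in the caching middleware change below:

    package main

    import (
        "fmt"
        "strconv"
        "time"

        "github.com/prometheus/client_golang/prometheus"
    )

    // Local stand-in for ShouldCacheQueryHistogram with the same label set.
    var shouldCacheQueryHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "grafana", // assumed value of metrics.ExporterName
        Subsystem: "caching",
        Name:      "should_cache_query_request_duration_seconds",
        Help:      "histogram of grafana query endpoint requests in seconds",
        Buckets:   []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
    }, []string{"datasource_type", "cache", "shouldCache", "query_type"})

    func main() {
        if err := prometheus.Register(shouldCacheQueryHistogram); err != nil {
            fmt.Println("error registering collector:", err)
        }

        // Time a (stubbed) shouldCacheQuery decision and record it with the
        // outcome as a label, as the middleware does.
        start := time.Now()
        shouldCache := true // stand-in for awsds.ShouldCacheQuery(resp)

        shouldCacheQueryHistogram.With(prometheus.Labels{
            "datasource_type": "grafana-redshift-datasource", // illustrative
            "cache":           "MISS",                        // illustrative
            "shouldCache":     strconv.FormatBool(shouldCache),
            "query_type":      "query", // illustrative
        }).Observe(time.Since(start).Seconds())
    }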

@@ -2,19 +2,31 @@ package clientmiddleware
 import (
 "context"
+"strconv"
 "time"
+"github.com/grafana/grafana-aws-sdk/pkg/awsds"
 "github.com/grafana/grafana-plugin-sdk-go/backend"
 "github.com/grafana/grafana/pkg/infra/log"
 "github.com/grafana/grafana/pkg/plugins"
 "github.com/grafana/grafana/pkg/services/caching"
 "github.com/grafana/grafana/pkg/services/contexthandler"
+"github.com/grafana/grafana/pkg/services/featuremgmt"
 "github.com/prometheus/client_golang/prometheus"
 )
+// needed to mock the function for testing
+var shouldCacheQuery = awsds.ShouldCacheQuery
 // NewCachingMiddleware creates a new plugins.ClientMiddleware that will
 // attempt to read and write query results to the cache
 func NewCachingMiddleware(cachingService caching.CachingService) plugins.ClientMiddleware {
+return NewCachingMiddlewareWithFeatureManager(cachingService, nil)
+}
+// NewCachingMiddlewareWithFeatureManager creates a new plugins.ClientMiddleware that will
+// attempt to read and write query results to the cache with a feature manager
+func NewCachingMiddlewareWithFeatureManager(cachingService caching.CachingService, features *featuremgmt.FeatureManager) plugins.ClientMiddleware {
 log := log.New("caching_middleware")
 if err := prometheus.Register(QueryCachingRequestHistogram); err != nil {
 log.Error("error registering prometheus collector 'QueryRequestHistogram'", "error", err)
@@ -27,6 +39,7 @@ func NewCachingMiddleware(cachingService caching.CachingService) plugins.ClientM
 next: next,
 caching: cachingService,
 log: log,
+features: features,
 }
 })
 }
@@ -35,6 +48,7 @@ type CachingMiddleware struct {
 next plugins.Client
 caching caching.CachingService
 log log.Logger
+features *featuremgmt.FeatureManager
 }
 // QueryData receives a data request and attempts to access results already stored in the cache for that request.
@@ -57,7 +71,8 @@ func (m *CachingMiddleware) QueryData(ctx context.Context, req *backend.QueryDat
 hit, cr := m.caching.HandleQueryRequest(ctx, req)
 // record request duration if caching was used
-if ch := reqCtx.Resp.Header().Get(caching.XCacheHeader); ch != "" {
+ch := reqCtx.Resp.Header().Get(caching.XCacheHeader)
+if ch != "" {
 defer func() {
 QueryCachingRequestHistogram.With(prometheus.Labels{
 "datasource_type": req.PluginContext.DataSourceInstanceSettings.Type,
@@ -77,7 +92,25 @@
 // Update the query cache with the result for this metrics request
 if err == nil && cr.UpdateCacheFn != nil {
+// If AWS async caching is not enabled, use the old code path
+if m.features == nil || !m.features.IsEnabled(featuremgmt.FlagAwsAsyncQueryCaching) {
 cr.UpdateCacheFn(ctx, resp)
+} else {
+// time how long shouldCacheQuery takes
+startShouldCacheQuery := time.Now()
+shouldCache := shouldCacheQuery(resp)
+ShouldCacheQueryHistogram.With(prometheus.Labels{
+"datasource_type": req.PluginContext.DataSourceInstanceSettings.Type,
+"cache": ch,
+"shouldCache": strconv.FormatBool(shouldCache),
+"query_type": getQueryType(reqCtx),
+}).Observe(time.Since(startShouldCacheQuery).Seconds())
+// If AWS async caching is enabled and resp is for a running async query, don't cache it
+if shouldCache {
+cr.UpdateCacheFn(ctx, resp)
+}
+}
 }
 return resp, err

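The new branch hinges on awsds.ShouldCacheQuery from the bumped grafana-aws-sdk: a response that belongs to a still-running async query must not be written to the cache. A minimal, illustrative sketch of calling the helper directly, assuming only the signature the middleware relies on (func(*backend.QueryDataResponse) bool):

    package main

    import (
        "fmt"

        "github.com/grafana/grafana-aws-sdk/pkg/awsds"
        "github.com/grafana/grafana-plugin-sdk-go/backend"
    )

    func main() {
        // An empty response; a real async query response would carry the
        // execution status that ShouldCacheQuery inspects.
        resp := backend.NewQueryDataResponse()

        // Only write to the cache when the SDK reports the response as final.
        if awsds.ShouldCacheQuery(resp) {
            fmt.Println("cache the response")
        } else {
            fmt.Println("async query still running; skip caching")
        }
    }

Binding awsds.ShouldCacheQuery to the package-level shouldCacheQuery variable, rather than calling it inline, is what lets the tests below substitute a stub.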

@@ -10,6 +10,7 @@ import (
 "github.com/grafana/grafana/pkg/plugins/manager/client/clienttest"
 "github.com/grafana/grafana/pkg/services/caching"
 "github.com/grafana/grafana/pkg/services/contexthandler"
+"github.com/grafana/grafana/pkg/services/featuremgmt"
 "github.com/grafana/grafana/pkg/services/user"
 "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
@@ -74,8 +75,17 @@ func TestCachingMiddleware(t *testing.T) {
 })
 t.Run("If cache returns a miss, queries are issued and the update cache function is called", func(t *testing.T) {
+origShouldCacheQuery := shouldCacheQuery
+var shouldCacheQueryCalled bool
+shouldCacheQuery = func(resp *backend.QueryDataResponse) bool {
+shouldCacheQueryCalled = true
+return true
+}
 t.Cleanup(func() {
 updateCacheCalled = false
+shouldCacheQueryCalled = false
+shouldCacheQuery = origShouldCacheQuery
 cs.Reset()
 })
@@ -90,6 +100,75 @@
 assert.Nil(t, resp)
 // Since it was a miss, the middleware called the update func
 assert.True(t, updateCacheCalled)
+// Since the feature flag was not set, the middleware did not call shouldCacheQuery
+assert.False(t, shouldCacheQueryCalled)
+})
+t.Run("with async queries", func(t *testing.T) {
+asyncCdt := clienttest.NewClientDecoratorTest(t,
+clienttest.WithReqContext(req, &user.SignedInUser{}),
+clienttest.WithMiddlewares(
+NewCachingMiddlewareWithFeatureManager(cs, featuremgmt.WithFeatures(featuremgmt.FlagAwsAsyncQueryCaching))),
+)
+t.Run("If shoudCacheQuery returns true update cache function is called", func(t *testing.T) {
+origShouldCacheQuery := shouldCacheQuery
+var shouldCacheQueryCalled bool
+shouldCacheQuery = func(resp *backend.QueryDataResponse) bool {
+shouldCacheQueryCalled = true
+return true
+}
+t.Cleanup(func() {
+updateCacheCalled = false
+shouldCacheQueryCalled = false
+shouldCacheQuery = origShouldCacheQuery
+cs.Reset()
+})
+cs.ReturnHit = false
+cs.ReturnQueryResponse = dataResponse
+resp, err := asyncCdt.Decorator.QueryData(req.Context(), qdr)
+assert.NoError(t, err)
+// Cache service is called once
+cs.AssertCalls(t, "HandleQueryRequest", 1)
+// Equals nil (returned by the decorator test)
+assert.Nil(t, resp)
+// Since it was a miss, the middleware called the update func
+assert.True(t, updateCacheCalled)
+// Since the feature flag set, the middleware called shouldCacheQuery
+assert.True(t, shouldCacheQueryCalled)
+})
+t.Run("If shoudCacheQuery returns false update cache function is not called", func(t *testing.T) {
+origShouldCacheQuery := shouldCacheQuery
+var shouldCacheQueryCalled bool
+shouldCacheQuery = func(resp *backend.QueryDataResponse) bool {
+shouldCacheQueryCalled = true
+return false
+}
+t.Cleanup(func() {
+updateCacheCalled = false
+shouldCacheQueryCalled = false
+shouldCacheQuery = origShouldCacheQuery
+cs.Reset()
+})
+cs.ReturnHit = false
+cs.ReturnQueryResponse = dataResponse
+resp, err := asyncCdt.Decorator.QueryData(req.Context(), qdr)
+assert.NoError(t, err)
+// Cache service is called once
+cs.AssertCalls(t, "HandleQueryRequest", 1)
+// Equals nil (returned by the decorator test)
+assert.Nil(t, resp)
+// Since it was a miss, the middleware called the update func
+assert.False(t, updateCacheCalled)
+// Since the feature flag set, the middleware called shouldCacheQuery
+assert.True(t, shouldCacheQueryCalled)
+})
 })
 })


@@ -137,7 +137,7 @@ func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthToken
 // Placing the new service implementation behind a feature flag until it is known to be stable
 if features.IsEnabled(featuremgmt.FlagUseCachingService) {
-middlewares = append(middlewares, clientmiddleware.NewCachingMiddleware(cachingService))
+middlewares = append(middlewares, clientmiddleware.NewCachingMiddlewareWithFeatureManager(cachingService, features))
 }
 if cfg.SendUserHeader {