mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Caching: Refactor enterprise query caching middleware to a wire service (#65616)
* define initial service and add to wire * update caching service interface * add skipQueryCache header handler and update metrics query function to use it * add caching service as a dependency to query service * working caching impl * propagate cache status to frontend in response * beginning of improvements suggested by Lean - separate caching logic from query logic. * more changes to simplify query function * Decided to revert renaming of function * Remove error status from cache request * add extra documentation * Move query caching duration metric to query package * add a little bit of documentation * wip: convert resource caching * Change return type of query service QueryData to a QueryDataResponse with Headers * update codeowners * change X-Cache value to const * use resource caching in endpoint handlers * write resource headers to response even if it's not a cache hit * fix panic caused by lack of nil check * update unit test * remove NONE header - shouldn't show up in OSS * Convert everything to use the plugin middleware * revert a few more things * clean up unused vars * start reverting resource caching, start to implement in plugin middleware * revert more, fix typo * Update caching interfaces - resource caching now has a separate cache method * continue wiring up new resource caching conventions - still in progress * add more safety to implementation * remove some unused objects * remove some code that I left in by accident * add some comments, fix codeowners, fix duplicate registration * fix source of panic in resource middleware * Update client decorator test to provide an empty response object * create tests for caching middleware * fix unit test * Update pkg/services/caching/service.go Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com> * improve error message in error log * quick docs update * Remove use of mockery. Update return signature to return an explicit hit/miss bool * create unit test for empty request context * rename caching metrics to make it clear they pertain to caching * Update pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com> * Add clarifying comments to cache skip middleware func * Add comment pointing to the resource cache update call * fix unit tests (missing dependency) * try to fix mystery syntax error * fix a panic * Caching: Introduce feature toggle to caching service refactor (#66323) * introduce new feature toggle * hide calls to new service behind a feature flag * remove licensing flag from toggle (misunderstood what it was for) * fix unit tests * rerun toggle gen --------- Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com> Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
This commit is contained in:
parent
e78be44e1a
commit
5626461b3c
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
@ -541,6 +541,7 @@ lerna.json @grafana/frontend-ops
|
||||
# Grafana Operator Experience Team
|
||||
/pkg/infra/httpclient/httpclientprovider/sigv4_middleware.go @grafana/grafana-operator-experience-squad
|
||||
/pkg/infra/httpclient/httpclientprovider/sigv4_middleware_test.go @grafana/grafana-operator-experience-squad
|
||||
/pkg/services/caching/ @grafana/grafana-operator-experience-squad
|
||||
|
||||
# Kind definitions
|
||||
/kinds/dashboard @grafana/dashboards-squad
|
||||
|
@ -34,6 +34,7 @@ Some stable features are enabled by default. You can disable a stable feature by
|
||||
| `emptyDashboardPage` | Enable the redesigned user interface of a dashboard page that includes no panels | Yes |
|
||||
| `disablePrometheusExemplarSampling` | Disable Prometheus exemplar sampling | |
|
||||
| `logsSampleInExplore` | Enables access to the logs sample feature in Explore | Yes |
|
||||
| `useCachingService` | When turned on, the new query and resource caching implementation using a wire service inject will be used in place of the previous middleware implementation | |
|
||||
| `disableElasticsearchBackendQuerying` | Disable the processing of queries and responses in the Elasticsearch data source through backend | |
|
||||
|
||||
## Beta feature toggles
|
||||
|
@ -95,5 +95,6 @@ export interface FeatureToggles {
|
||||
pyroscopeFlameGraph?: boolean;
|
||||
externalServiceAuth?: boolean;
|
||||
dataplaneFrontendFallback?: boolean;
|
||||
useCachingService?: boolean;
|
||||
disableElasticsearchBackendQuerying?: boolean;
|
||||
}
|
||||
|
@ -653,7 +653,7 @@ func (hs *HTTPServer) CallDatasourceResource(c *contextmodel.ReqContext) {
|
||||
c.JsonApiErr(http.StatusBadRequest, "id is invalid", nil)
|
||||
return
|
||||
}
|
||||
ds, err := hs.DataSourceCache.GetDatasource(c.Req.Context(), datasourceID, c.SignedInUser, c.SkipCache)
|
||||
ds, err := hs.DataSourceCache.GetDatasource(c.Req.Context(), datasourceID, c.SignedInUser, c.SkipDSCache)
|
||||
if err != nil {
|
||||
if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
|
||||
c.JsonApiErr(403, "Access denied to datasource", err)
|
||||
@ -690,7 +690,7 @@ func (hs *HTTPServer) CallDatasourceResourceWithUID(c *contextmodel.ReqContext)
|
||||
return
|
||||
}
|
||||
|
||||
ds, err := hs.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipCache)
|
||||
ds, err := hs.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipDSCache)
|
||||
if err != nil {
|
||||
if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
|
||||
c.JsonApiErr(http.StatusForbidden, "Access denied to datasource", err)
|
||||
@ -760,7 +760,7 @@ func (hs *HTTPServer) CheckDatasourceHealthWithUID(c *contextmodel.ReqContext) r
|
||||
return response.Error(http.StatusBadRequest, "UID is invalid", nil)
|
||||
}
|
||||
|
||||
ds, err := hs.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipCache)
|
||||
ds, err := hs.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipDSCache)
|
||||
if err != nil {
|
||||
if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
|
||||
return response.Error(http.StatusForbidden, "Access denied to datasource", err)
|
||||
@ -790,7 +790,7 @@ func (hs *HTTPServer) CheckDatasourceHealth(c *contextmodel.ReqContext) response
|
||||
return response.Error(http.StatusBadRequest, "id is invalid", nil)
|
||||
}
|
||||
|
||||
ds, err := hs.DataSourceCache.GetDatasource(c.Req.Context(), datasourceID, c.SignedInUser, c.SkipCache)
|
||||
ds, err := hs.DataSourceCache.GetDatasource(c.Req.Context(), datasourceID, c.SignedInUser, c.SkipDSCache)
|
||||
if err != nil {
|
||||
if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
|
||||
return response.Error(http.StatusForbidden, "Access denied to datasource", err)
|
||||
|
@ -41,6 +41,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/apikey"
|
||||
"github.com/grafana/grafana/pkg/services/auth"
|
||||
"github.com/grafana/grafana/pkg/services/authn"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
"github.com/grafana/grafana/pkg/services/cleanup"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler"
|
||||
"github.com/grafana/grafana/pkg/services/correlations"
|
||||
@ -210,6 +211,7 @@ type HTTPServer struct {
|
||||
statsService stats.Service
|
||||
authnService authn.Service
|
||||
starApi *starApi.API
|
||||
cachingService caching.CachingService
|
||||
}
|
||||
|
||||
type ServerOptions struct {
|
||||
@ -252,7 +254,8 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
|
||||
accesscontrolService accesscontrol.Service, dashboardThumbsService thumbs.DashboardThumbService, navTreeService navtree.Service,
|
||||
annotationRepo annotations.Repository, tagService tag.Service, searchv2HTTPService searchV2.SearchHTTPService, oauthTokenService oauthtoken.OAuthTokenService,
|
||||
statsService stats.Service, authnService authn.Service, pluginsCDNService *pluginscdn.Service,
|
||||
starApi *starApi.API,
|
||||
starApi *starApi.API, cachingService caching.CachingService,
|
||||
|
||||
) (*HTTPServer, error) {
|
||||
web.Env = cfg.Env
|
||||
m := web.New()
|
||||
@ -358,6 +361,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
|
||||
authnService: authnService,
|
||||
pluginsCDNService: pluginsCDNService,
|
||||
starApi: starApi,
|
||||
cachingService: cachingService,
|
||||
}
|
||||
if hs.Listener != nil {
|
||||
hs.log.Debug("Using provided listener")
|
||||
@ -630,7 +634,7 @@ func (hs *HTTPServer) addMiddlewaresAndStaticRoutes() {
|
||||
m.Use(middleware.ValidateHostHeader(hs.Cfg))
|
||||
}
|
||||
|
||||
m.Use(middleware.HandleNoCacheHeader)
|
||||
m.Use(middleware.HandleNoCacheHeaders)
|
||||
|
||||
if hs.Cfg.CSPEnabled || hs.Cfg.CSPReportOnlyEnabled {
|
||||
m.UseMiddleware(middleware.ContentSecurityPolicy(hs.Cfg, hs.log))
|
||||
|
@ -52,7 +52,7 @@ func (hs *HTTPServer) QueryMetricsV2(c *contextmodel.ReqContext) response.Respon
|
||||
return response.Error(http.StatusBadRequest, "bad request data", err)
|
||||
}
|
||||
|
||||
resp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDTO)
|
||||
resp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipDSCache, reqDTO)
|
||||
if err != nil {
|
||||
return hs.handleQueryMetricsError(err)
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/util/proxyutil"
|
||||
"github.com/grafana/grafana/pkg/web"
|
||||
)
|
||||
@ -129,7 +130,7 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http
|
||||
|
||||
var flushStreamErr error
|
||||
go func() {
|
||||
flushStreamErr = hs.flushStream(stream, w)
|
||||
flushStreamErr = hs.flushStream(req.Context(), crReq, stream, w)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
@ -140,9 +141,10 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http
|
||||
return flushStreamErr
|
||||
}
|
||||
|
||||
func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error {
|
||||
func (hs *HTTPServer) flushStream(ctx context.Context, req *backend.CallResourceRequest, stream callResourceClientResponseStream, w http.ResponseWriter) error {
|
||||
processedStreams := 0
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
@ -198,6 +200,12 @@ func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w htt
|
||||
|
||||
if _, err := w.Write(resp.Body); err != nil {
|
||||
hs.log.Error("Failed to write resource response", "err", err)
|
||||
} else if hs.Features.IsEnabled(featuremgmt.FlagUseCachingService) {
|
||||
// Placing the new service implementation behind a feature flag until it is known to be stable
|
||||
|
||||
// The enterprise implementation of this function will use the headers and status of the first response,
|
||||
// And append the body of any subsequent responses. It waits for the context to be canceled before caching the cumulative result.
|
||||
hs.cachingService.CacheResourceResponse(ctx, req, resp)
|
||||
}
|
||||
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/store"
|
||||
"github.com/grafana/grafana/pkg/plugins/pluginscdn"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
datasources "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/oauthtoken/oauthtokentest"
|
||||
@ -77,6 +78,7 @@ func TestCallResource(t *testing.T) {
|
||||
hs.QuotaService = quotatest.New(false, nil)
|
||||
hs.pluginStore = ps
|
||||
hs.pluginClient = pluginClient.ProvideService(reg, pCfg)
|
||||
hs.cachingService = &caching.OSSCachingService{}
|
||||
})
|
||||
|
||||
t.Run("Test successful response is received for valid request", func(t *testing.T) {
|
||||
@ -106,7 +108,7 @@ func TestCallResource(t *testing.T) {
|
||||
req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
return errors.New("something went wrong")
|
||||
}),
|
||||
}, pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest())...)
|
||||
}, pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest(), &caching.OSSCachingService{}, &featuremgmt.FeatureManager{})...)
|
||||
require.NoError(t, err)
|
||||
|
||||
srv = SetupAPITestServer(t, func(hs *HTTPServer) {
|
||||
|
@ -30,7 +30,9 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/store"
|
||||
"github.com/grafana/grafana/pkg/plugins/pluginscdn"
|
||||
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
"github.com/grafana/grafana/pkg/services/org/orgtest"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginaccesscontrol"
|
||||
@ -371,9 +373,11 @@ func Test_GetPluginAssets(t *testing.T) {
|
||||
|
||||
func TestMakePluginResourceRequest(t *testing.T) {
|
||||
hs := HTTPServer{
|
||||
Cfg: setting.NewCfg(),
|
||||
log: log.New(),
|
||||
pluginClient: &fakePluginClient{},
|
||||
Cfg: setting.NewCfg(),
|
||||
log: log.New(),
|
||||
pluginClient: &fakePluginClient{},
|
||||
cachingService: &caching.OSSCachingService{},
|
||||
Features: &featuremgmt.FeatureManager{},
|
||||
}
|
||||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
|
||||
@ -399,6 +403,8 @@ func TestMakePluginResourceRequestSetCookieNotPresent(t *testing.T) {
|
||||
pluginClient: &fakePluginClient{
|
||||
headers: map[string][]string{"Set-Cookie": {"monster"}},
|
||||
},
|
||||
cachingService: &caching.OSSCachingService{},
|
||||
Features: &featuremgmt.FeatureManager{},
|
||||
}
|
||||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
@ -433,6 +439,8 @@ func TestMakePluginResourceRequestContentTypeUnique(t *testing.T) {
|
||||
"x-another": {"hello"},
|
||||
},
|
||||
},
|
||||
cachingService: &caching.OSSCachingService{},
|
||||
Features: &featuremgmt.FeatureManager{},
|
||||
}
|
||||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
@ -456,9 +464,11 @@ func TestMakePluginResourceRequestContentTypeEmpty(t *testing.T) {
|
||||
statusCode: http.StatusNoContent,
|
||||
}
|
||||
hs := HTTPServer{
|
||||
Cfg: setting.NewCfg(),
|
||||
log: log.New(),
|
||||
pluginClient: pluginClient,
|
||||
Cfg: setting.NewCfg(),
|
||||
log: log.New(),
|
||||
pluginClient: pluginClient,
|
||||
cachingService: &caching.OSSCachingService{},
|
||||
Features: &featuremgmt.FeatureManager{},
|
||||
}
|
||||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
|
@ -97,7 +97,7 @@ func contextProvider(tc *testContext) web.Handler {
|
||||
Context: c,
|
||||
SignedInUser: tc.user,
|
||||
IsSignedIn: signedIn,
|
||||
SkipCache: true,
|
||||
SkipDSCache: true,
|
||||
Logger: log.New("test"),
|
||||
}
|
||||
c.Req = c.Req.WithContext(ctxkey.Set(c.Req.Context(), reqCtx))
|
||||
|
@ -22,8 +22,11 @@ var (
|
||||
ReqOrgAdmin = RoleAuth(org.RoleAdmin)
|
||||
)
|
||||
|
||||
func HandleNoCacheHeader(ctx *contextmodel.ReqContext) {
|
||||
ctx.SkipCache = ctx.Req.Header.Get("X-Grafana-NoCache") == "true"
|
||||
func HandleNoCacheHeaders(ctx *contextmodel.ReqContext) {
|
||||
// X-Grafana-NoCache tells Grafana to skip the cache while retrieving datasource instance metadata
|
||||
ctx.SkipDSCache = ctx.Req.Header.Get("X-Grafana-NoCache") == "true"
|
||||
// X-Cache-Skip tells Grafana to skip the Enterprise query/resource cache while issuing query and resource calls
|
||||
ctx.SkipQueryCache = ctx.Req.Header.Get("X-Cache-Skip") == "true"
|
||||
}
|
||||
|
||||
func AddDefaultResponseHeaders(cfg *setting.Cfg) web.Handler {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
@ -238,7 +239,9 @@ func WithReqContext(req *http.Request, user *user.SignedInUser) ClientDecoratorT
|
||||
return ClientDecoratorTestOption(func(cdt *ClientDecoratorTest) {
|
||||
if cdt.ReqContext == nil {
|
||||
cdt.ReqContext = &contextmodel.ReqContext{
|
||||
Context: &web.Context{},
|
||||
Context: &web.Context{
|
||||
Resp: web.NewResponseWriter(req.Method, httptest.NewRecorder()),
|
||||
},
|
||||
SignedInUser: user,
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/anonymous/anonimpl"
|
||||
"github.com/grafana/grafana/pkg/services/auth"
|
||||
"github.com/grafana/grafana/pkg/services/auth/authimpl"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/datasources/permissions"
|
||||
datasourceservice "github.com/grafana/grafana/pkg/services/datasources/service"
|
||||
@ -88,6 +89,8 @@ var wireExtsBasicSet = wire.NewSet(
|
||||
pluginsintegration.WireExtensionSet,
|
||||
publicdashboardsService.ProvideServiceWrapper,
|
||||
wire.Bind(new(publicdashboards.ServiceWrapper), new(*publicdashboardsService.PublicDashboardServiceWrapperImpl)),
|
||||
caching.ProvideCachingService,
|
||||
wire.Bind(new(caching.CachingService), new(*caching.OSSCachingService)),
|
||||
)
|
||||
|
||||
var wireExtsSet = wire.NewSet(
|
||||
|
@ -136,7 +136,7 @@ func contextProvider(modifiers ...func(c *contextmodel.ReqContext)) web.Handler
|
||||
Logger: log.New(""),
|
||||
SignedInUser: &user.SignedInUser{},
|
||||
IsSignedIn: true,
|
||||
SkipCache: true,
|
||||
SkipDSCache: true,
|
||||
}
|
||||
for _, modifier := range modifiers {
|
||||
modifier(reqCtx)
|
||||
|
@ -449,7 +449,7 @@ func contextProvider(tc *testContext) web.Handler {
|
||||
Context: c,
|
||||
SignedInUser: tc.user,
|
||||
IsSignedIn: signedIn,
|
||||
SkipCache: true,
|
||||
SkipDSCache: true,
|
||||
Logger: log.New("test"),
|
||||
}
|
||||
c.Req = c.Req.WithContext(ctxkey.Set(c.Req.Context(), reqCtx))
|
||||
|
46
pkg/services/caching/fake_caching_service.go
Normal file
46
pkg/services/caching/fake_caching_service.go
Normal file
@ -0,0 +1,46 @@
|
||||
package caching
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type FakeOSSCachingService struct {
|
||||
calls map[string]int
|
||||
ReturnHit bool
|
||||
ReturnResourceResponse *backend.CallResourceResponse
|
||||
ReturnQueryResponse CachedQueryDataResponse
|
||||
}
|
||||
|
||||
func (f *FakeOSSCachingService) CacheResourceResponse(ctx context.Context, req *backend.CallResourceRequest, resp *backend.CallResourceResponse) {
|
||||
f.calls["CacheResourceResponse"]++
|
||||
}
|
||||
|
||||
func (f *FakeOSSCachingService) HandleQueryRequest(ctx context.Context, req *backend.QueryDataRequest) (bool, CachedQueryDataResponse) {
|
||||
f.calls["HandleQueryRequest"]++
|
||||
return f.ReturnHit, f.ReturnQueryResponse
|
||||
}
|
||||
|
||||
func (f *FakeOSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) {
|
||||
f.calls["HandleResourceRequest"]++
|
||||
return f.ReturnHit, f.ReturnResourceResponse
|
||||
}
|
||||
|
||||
func (f *FakeOSSCachingService) AssertCalls(t *testing.T, fn string, times int) {
|
||||
assert.Equal(t, times, f.calls[fn])
|
||||
}
|
||||
|
||||
func (f *FakeOSSCachingService) Reset() {
|
||||
*f = *NewFakeOSSCachingService()
|
||||
}
|
||||
|
||||
func NewFakeOSSCachingService() *FakeOSSCachingService {
|
||||
fake := &FakeOSSCachingService{
|
||||
calls: map[string]int{},
|
||||
}
|
||||
|
||||
return fake
|
||||
}
|
60
pkg/services/caching/service.go
Normal file
60
pkg/services/caching/service.go
Normal file
@ -0,0 +1,60 @@
|
||||
package caching
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
)
|
||||
|
||||
const (
|
||||
XCacheHeader = "X-Cache"
|
||||
StatusHit = "HIT"
|
||||
StatusMiss = "MISS"
|
||||
StatusBypass = "BYPASS"
|
||||
StatusError = "ERROR"
|
||||
StatusDisabled = "DISABLED"
|
||||
)
|
||||
|
||||
type CacheQueryResponseFn func(context.Context, *backend.QueryDataResponse)
|
||||
|
||||
type CachedQueryDataResponse struct {
|
||||
// The cached data response associated with a query, or nil if no cached data is found
|
||||
Response *backend.QueryDataResponse
|
||||
// A function that should be used to cache a QueryDataResponse for a given query.
|
||||
// It can be set to nil by the method implementation (if there is an error, for example), so it should be checked before being called.
|
||||
UpdateCacheFn CacheQueryResponseFn
|
||||
}
|
||||
|
||||
func ProvideCachingService() *OSSCachingService {
|
||||
return &OSSCachingService{}
|
||||
}
|
||||
|
||||
type CachingService interface {
|
||||
// HandleQueryRequest uses a QueryDataRequest to check the cache for any existing results for that query.
|
||||
// If none are found, it should return false and a CachedQueryDataResponse with an UpdateCacheFn which can be used to update the results cache after the fact.
|
||||
// This function may populate any response headers (accessible through the context) with the cache status using the X-Cache header.
|
||||
HandleQueryRequest(context.Context, *backend.QueryDataRequest) (bool, CachedQueryDataResponse)
|
||||
// HandleResourceRequest uses a CallResourceRequest to check the cache for any existing results for that request. If none are found, it should return false.
|
||||
// This function may populate any response headers (accessible through the context) with the cache status using the X-Cache header.
|
||||
HandleResourceRequest(context.Context, *backend.CallResourceRequest) (bool, *backend.CallResourceResponse)
|
||||
// CacheResourceResponse is used to cache resource responses for a resource request.
|
||||
// Because plugins can send multiple responses asyncronously, the implementation should be able to handle multiple calls to this function for one request.
|
||||
CacheResourceResponse(context.Context, *backend.CallResourceRequest, *backend.CallResourceResponse)
|
||||
}
|
||||
|
||||
// Implementation of interface - does nothing
|
||||
type OSSCachingService struct {
|
||||
}
|
||||
|
||||
func (s *OSSCachingService) HandleQueryRequest(ctx context.Context, req *backend.QueryDataRequest) (bool, CachedQueryDataResponse) {
|
||||
return false, CachedQueryDataResponse{}
|
||||
}
|
||||
|
||||
func (s *OSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *OSSCachingService) CacheResourceResponse(ctx context.Context, req *backend.CallResourceRequest, resp *backend.CallResourceResponse) {
|
||||
}
|
||||
|
||||
var _ CachingService = &OSSCachingService{}
|
@ -125,7 +125,7 @@ func (h *ContextHandler) Middleware(next http.Handler) http.Handler {
|
||||
},
|
||||
IsSignedIn: false,
|
||||
AllowAnonymous: false,
|
||||
SkipCache: false,
|
||||
SkipDSCache: false,
|
||||
Logger: log.New("context"),
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,8 @@ type ReqContext struct {
|
||||
IsSignedIn bool
|
||||
IsRenderCall bool
|
||||
AllowAnonymous bool
|
||||
SkipCache bool
|
||||
SkipDSCache bool
|
||||
SkipQueryCache bool
|
||||
Logger log.Logger
|
||||
Error error
|
||||
// RequestNonce is a cryptographic request identifier for use with Content Security Policy.
|
||||
|
@ -73,7 +73,7 @@ func (p *DataSourceProxyService) ProxyDatasourceRequestWithUID(c *contextmodel.R
|
||||
return
|
||||
}
|
||||
|
||||
ds, err := p.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipCache)
|
||||
ds, err := p.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipDSCache)
|
||||
if err != nil {
|
||||
toAPIError(c, err)
|
||||
return
|
||||
@ -84,7 +84,7 @@ func (p *DataSourceProxyService) ProxyDatasourceRequestWithUID(c *contextmodel.R
|
||||
func (p *DataSourceProxyService) ProxyDatasourceRequestWithID(c *contextmodel.ReqContext, dsID int64) {
|
||||
c.TimeRequest(metrics.MDataSourceProxyReqTimer)
|
||||
|
||||
ds, err := p.DataSourceCache.GetDatasource(c.Req.Context(), dsID, c.SignedInUser, c.SkipCache)
|
||||
ds, err := p.DataSourceCache.GetDatasource(c.Req.Context(), dsID, c.SignedInUser, c.SkipDSCache)
|
||||
if err != nil {
|
||||
toAPIError(c, err)
|
||||
return
|
||||
|
@ -23,4 +23,5 @@ const (
|
||||
awsPluginsSquad codeowner = "@grafana/aws-plugins"
|
||||
appO11ySquad codeowner = "@grafana/app-o11y"
|
||||
grafanaPartnerPluginsSquad codeowner = "@grafana/partner-plugins"
|
||||
grafanaOperatorExperienceSquad codeowner = "@grafana/grafana-operator-experience-squad"
|
||||
)
|
||||
|
@ -513,6 +513,13 @@ var (
|
||||
FrontendOnly: true,
|
||||
Owner: grafanaObservabilityMetricsSquad,
|
||||
},
|
||||
{
|
||||
Name: "useCachingService",
|
||||
Description: "When turned on, the new query and resource caching implementation using a wire service inject will be used in place of the previous middleware implementation",
|
||||
State: FeatureStateStable,
|
||||
Owner: grafanaOperatorExperienceSquad,
|
||||
RequiresRestart: true,
|
||||
},
|
||||
{
|
||||
Name: "disableElasticsearchBackendQuerying",
|
||||
Description: "Disable the processing of queries and responses in the Elasticsearch data source through backend",
|
||||
|
@ -76,4 +76,5 @@ renderAuthJWT,beta,@grafana/grafana-as-code,false,false,false,false
|
||||
pyroscopeFlameGraph,alpha,@grafana/observability-traces-and-profiling,false,false,false,false
|
||||
externalServiceAuth,alpha,@grafana/grafana-authnz-team,true,false,false,false
|
||||
dataplaneFrontendFallback,alpha,@grafana/observability-metrics,false,false,false,true
|
||||
useCachingService,stable,@grafana/grafana-operator-experience-squad,false,false,true,false
|
||||
disableElasticsearchBackendQuerying,stable,@grafana/observability-logs,false,false,false,false
|
||||
|
|
@ -315,6 +315,10 @@ const (
|
||||
// Support dataplane contract field name change for transformations and field name matchers where the name is different
|
||||
FlagDataplaneFrontendFallback = "dataplaneFrontendFallback"
|
||||
|
||||
// FlagUseCachingService
|
||||
// When turned on, the new query and resource caching implementation using a wire service inject will be used in place of the previous middleware implementation
|
||||
FlagUseCachingService = "useCachingService"
|
||||
|
||||
// FlagDisableElasticsearchBackendQuerying
|
||||
// Disable the processing of queries and responses in the Elasticsearch data source through backend
|
||||
FlagDisableElasticsearchBackendQuerying = "disableElasticsearchBackendQuerying"
|
||||
|
@ -74,7 +74,7 @@ func (am *LotexAM) withAMReq(
|
||||
return response.Error(http.StatusBadRequest, "DatasourceUID is invalid", nil)
|
||||
}
|
||||
|
||||
ds, err := am.DataProxy.DataSourceCache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipCache)
|
||||
ds, err := am.DataProxy.DataSourceCache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipDSCache)
|
||||
if err != nil {
|
||||
if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
|
||||
return ErrResp(http.StatusForbidden, err, "Access denied to datasource")
|
||||
|
@ -82,7 +82,7 @@ func (p *LotexProm) getEndpoints(ctx *contextmodel.ReqContext) (*promEndpoints,
|
||||
return nil, fmt.Errorf("datasource UID is invalid")
|
||||
}
|
||||
|
||||
ds, err := p.DataProxy.DataSourceCache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipCache)
|
||||
ds, err := p.DataProxy.DataSourceCache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipDSCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -179,7 +179,7 @@ func (r *LotexRuler) validateAndGetPrefix(ctx *contextmodel.ReqContext) (string,
|
||||
return "", fmt.Errorf("datasource UID is invalid")
|
||||
}
|
||||
|
||||
ds, err := r.DataProxy.DataSourceCache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipCache)
|
||||
ds, err := r.DataProxy.DataSourceCache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipDSCache)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ func toMacaronPath(path string) string {
|
||||
|
||||
func getDatasourceByUID(ctx *contextmodel.ReqContext, cache datasources.CacheService, expectedType apimodels.Backend) (*datasources.DataSource, error) {
|
||||
datasourceUID := web.Params(ctx.Req)[":DatasourceUID"]
|
||||
ds, err := cache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipCache)
|
||||
ds, err := cache.GetDatasourceByUID(ctx.Req.Context(), datasourceUID, ctx.SignedInUser, ctx.SkipDSCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -50,7 +50,8 @@ func TestAlertingProxy_createProxyContext(t *testing.T) {
|
||||
IsSignedIn: rand.Int63()%2 == 1,
|
||||
IsRenderCall: rand.Int63()%2 == 1,
|
||||
AllowAnonymous: rand.Int63()%2 == 1,
|
||||
SkipCache: rand.Int63()%2 == 1,
|
||||
SkipDSCache: rand.Int63()%2 == 1,
|
||||
SkipQueryCache: rand.Int63()%2 == 1,
|
||||
Logger: log.New("test"),
|
||||
RequestNonce: util.GenerateShortUID(),
|
||||
IsPublicDashboardView: rand.Int63()%2 == 1,
|
||||
@ -75,7 +76,8 @@ func TestAlertingProxy_createProxyContext(t *testing.T) {
|
||||
require.Equal(t, ctx.IsSignedIn, newCtx.IsSignedIn)
|
||||
require.Equal(t, ctx.IsRenderCall, newCtx.IsRenderCall)
|
||||
require.Equal(t, ctx.AllowAnonymous, newCtx.AllowAnonymous)
|
||||
require.Equal(t, ctx.SkipCache, newCtx.SkipCache)
|
||||
require.Equal(t, ctx.SkipDSCache, newCtx.SkipDSCache)
|
||||
require.Equal(t, ctx.SkipQueryCache, newCtx.SkipQueryCache)
|
||||
require.Equal(t, ctx.Logger, newCtx.Logger)
|
||||
require.Equal(t, ctx.RequestNonce, newCtx.RequestNonce)
|
||||
require.Equal(t, ctx.IsPublicDashboardView, newCtx.IsPublicDashboardView)
|
||||
|
@ -0,0 +1,35 @@
|
||||
package clientmiddleware
|
||||
|
||||
import (
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
QueryPubdash = "pubdash"
|
||||
QueryDashboard = "dashboard"
|
||||
)
|
||||
|
||||
var QueryCachingRequestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: metrics.ExporterName,
|
||||
Subsystem: "caching",
|
||||
Name: "query_caching_request_duration_seconds",
|
||||
Help: "histogram of grafana query endpoint requests in seconds",
|
||||
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
|
||||
}, []string{"datasource_type", "cache", "query_type"})
|
||||
|
||||
var ResourceCachingRequestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: metrics.ExporterName,
|
||||
Subsystem: "caching",
|
||||
Name: "resource_caching_request_duration_seconds",
|
||||
Help: "histogram of grafana resource endpoint requests in seconds",
|
||||
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
|
||||
}, []string{"plugin_id", "cache"})
|
||||
|
||||
func getQueryType(req *contextmodel.ReqContext) string {
|
||||
if req.IsPublicDashboardView {
|
||||
return QueryPubdash
|
||||
}
|
||||
return QueryDashboard
|
||||
}
|
@ -0,0 +1,144 @@
|
||||
package clientmiddleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// NewCachingMiddleware creates a new plugins.ClientMiddleware that will
|
||||
// attempt to read and write query results to the cache
|
||||
func NewCachingMiddleware(cachingService caching.CachingService) plugins.ClientMiddleware {
|
||||
log := log.New("caching_middleware")
|
||||
if err := prometheus.Register(QueryCachingRequestHistogram); err != nil {
|
||||
log.Error("error registering prometheus collector 'QueryRequestHistogram'", "error", err)
|
||||
}
|
||||
if err := prometheus.Register(ResourceCachingRequestHistogram); err != nil {
|
||||
log.Error("error registering prometheus collector 'ResourceRequestHistogram'", "error", err)
|
||||
}
|
||||
return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
|
||||
return &CachingMiddleware{
|
||||
next: next,
|
||||
caching: cachingService,
|
||||
log: log,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type CachingMiddleware struct {
|
||||
next plugins.Client
|
||||
caching caching.CachingService
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
// QueryData receives a data request and attempts to access results already stored in the cache for that request.
|
||||
// If data is found, it will return it immediately. Otherwise, it will perform the queries as usual, then write the response to the cache.
|
||||
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
|
||||
func (m *CachingMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
if req == nil {
|
||||
return m.next.QueryData(ctx, req)
|
||||
}
|
||||
|
||||
reqCtx := contexthandler.FromContext(ctx)
|
||||
if reqCtx == nil {
|
||||
return m.next.QueryData(ctx, req)
|
||||
}
|
||||
|
||||
// time how long this request takes
|
||||
start := time.Now()
|
||||
|
||||
// First look in the query cache if enabled
|
||||
hit, cr := m.caching.HandleQueryRequest(ctx, req)
|
||||
|
||||
defer func() {
|
||||
// record request duration if caching was used
|
||||
if ch := reqCtx.Resp.Header().Get(caching.XCacheHeader); ch != "" {
|
||||
QueryCachingRequestHistogram.With(prometheus.Labels{
|
||||
"datasource_type": req.PluginContext.DataSourceInstanceSettings.Type,
|
||||
"cache": ch,
|
||||
"query_type": getQueryType(reqCtx),
|
||||
}).Observe(time.Since(start).Seconds())
|
||||
}
|
||||
}()
|
||||
|
||||
// Cache hit; return the response
|
||||
if hit {
|
||||
return cr.Response, nil
|
||||
}
|
||||
|
||||
// Cache miss; do the actual queries
|
||||
resp, err := m.next.QueryData(ctx, req)
|
||||
|
||||
// Update the query cache with the result for this metrics request
|
||||
if err == nil && cr.UpdateCacheFn != nil {
|
||||
cr.UpdateCacheFn(ctx, resp)
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// CallResource receives a resource request and attempts to access results already stored in the cache for that request.
|
||||
// If data is found, it will return it immediately. Otherwise, it will perform the request as usual. The caller of CallResource is expected to explicitly update the cache with any responses.
|
||||
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
|
||||
func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
if req == nil {
|
||||
return m.next.CallResource(ctx, req, sender)
|
||||
}
|
||||
|
||||
reqCtx := contexthandler.FromContext(ctx)
|
||||
if reqCtx == nil {
|
||||
return m.next.CallResource(ctx, req, sender)
|
||||
}
|
||||
|
||||
// time how long this request takes
|
||||
start := time.Now()
|
||||
|
||||
// First look in the resource cache if enabled
|
||||
hit, resp := m.caching.HandleResourceRequest(ctx, req)
|
||||
|
||||
defer func() {
|
||||
// record request duration if caching was used
|
||||
if ch := reqCtx.Resp.Header().Get(caching.XCacheHeader); ch != "" {
|
||||
ResourceCachingRequestHistogram.With(prometheus.Labels{
|
||||
"plugin_id": req.PluginContext.PluginID,
|
||||
"cache": ch,
|
||||
}).Observe(time.Since(start).Seconds())
|
||||
}
|
||||
}()
|
||||
|
||||
// Cache hit; send the response and return
|
||||
if hit {
|
||||
return sender.Send(resp)
|
||||
}
|
||||
|
||||
// Cache miss; do the actual request
|
||||
// The call to update the cache happens in /pkg/api/plugin_resource.go in the flushStream() func
|
||||
// TODO: Implement updating the cache from this method
|
||||
return m.next.CallResource(ctx, req, sender)
|
||||
}
|
||||
|
||||
func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
||||
return m.next.CheckHealth(ctx, req)
|
||||
}
|
||||
|
||||
func (m *CachingMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
||||
return m.next.CollectMetrics(ctx, req)
|
||||
}
|
||||
|
||||
func (m *CachingMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
||||
return m.next.SubscribeStream(ctx, req)
|
||||
}
|
||||
|
||||
func (m *CachingMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
||||
return m.next.PublishStream(ctx, req)
|
||||
}
|
||||
|
||||
func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
return m.next.RunStream(ctx, req, sender)
|
||||
}
|
@ -0,0 +1,225 @@
|
||||
package clientmiddleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/client/clienttest"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCachingMiddleware(t *testing.T) {
|
||||
t.Run("When QueryData is called", func(t *testing.T) {
|
||||
req, err := http.NewRequest(http.MethodGet, "/query", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
cs := caching.NewFakeOSSCachingService()
|
||||
cdt := clienttest.NewClientDecoratorTest(t,
|
||||
clienttest.WithReqContext(req, &user.SignedInUser{}),
|
||||
clienttest.WithMiddlewares(NewCachingMiddleware(cs)),
|
||||
)
|
||||
|
||||
jsonDataMap := map[string]interface{}{}
|
||||
jsonDataBytes, err := json.Marshal(&jsonDataMap)
|
||||
require.NoError(t, err)
|
||||
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: jsonDataBytes,
|
||||
},
|
||||
}
|
||||
|
||||
// Populated by clienttest.WithReqContext
|
||||
reqCtx := contexthandler.FromContext(req.Context())
|
||||
require.NotNil(t, reqCtx)
|
||||
|
||||
qdr := &backend.QueryDataRequest{
|
||||
PluginContext: pluginCtx,
|
||||
}
|
||||
|
||||
// Track whether the update cache fn was called, depending on what the response headers are in the cache request
|
||||
var updateCacheCalled bool
|
||||
dataResponse := caching.CachedQueryDataResponse{
|
||||
Response: &backend.QueryDataResponse{},
|
||||
UpdateCacheFn: func(ctx context.Context, qdr *backend.QueryDataResponse) {
|
||||
updateCacheCalled = true
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("If cache returns a hit, no queries are issued", func(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
updateCacheCalled = false
|
||||
cs.Reset()
|
||||
})
|
||||
|
||||
cs.ReturnHit = true
|
||||
cs.ReturnQueryResponse = dataResponse
|
||||
|
||||
resp, err := cdt.Decorator.QueryData(req.Context(), qdr)
|
||||
assert.NoError(t, err)
|
||||
// Cache service is called once
|
||||
cs.AssertCalls(t, "HandleQueryRequest", 1)
|
||||
// Equals the mocked response
|
||||
assert.NotNil(t, resp)
|
||||
assert.Equal(t, dataResponse.Response, resp)
|
||||
// Cache was not updated by the middleware
|
||||
assert.False(t, updateCacheCalled)
|
||||
})
|
||||
|
||||
t.Run("If cache returns a miss, queries are issued and the update cache function is called", func(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
updateCacheCalled = false
|
||||
cs.Reset()
|
||||
})
|
||||
|
||||
cs.ReturnHit = false
|
||||
cs.ReturnQueryResponse = dataResponse
|
||||
|
||||
resp, err := cdt.Decorator.QueryData(req.Context(), qdr)
|
||||
assert.NoError(t, err)
|
||||
// Cache service is called once
|
||||
cs.AssertCalls(t, "HandleQueryRequest", 1)
|
||||
// Equals nil (returned by the decorator test)
|
||||
assert.Nil(t, resp)
|
||||
// Since it was a miss, the middleware called the update func
|
||||
assert.True(t, updateCacheCalled)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("When CallResource is called", func(t *testing.T) {
|
||||
req, err := http.NewRequest(http.MethodGet, "/resource/blah", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
cs := caching.NewFakeOSSCachingService()
|
||||
cdt := clienttest.NewClientDecoratorTest(t,
|
||||
clienttest.WithReqContext(req, &user.SignedInUser{}),
|
||||
clienttest.WithMiddlewares(NewCachingMiddleware(cs)),
|
||||
)
|
||||
|
||||
jsonDataMap := map[string]interface{}{}
|
||||
jsonDataBytes, err := json.Marshal(&jsonDataMap)
|
||||
require.NoError(t, err)
|
||||
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: jsonDataBytes,
|
||||
},
|
||||
}
|
||||
|
||||
// Populated by clienttest.WithReqContext
|
||||
reqCtx := contexthandler.FromContext(req.Context())
|
||||
require.NotNil(t, reqCtx)
|
||||
|
||||
crr := &backend.CallResourceRequest{
|
||||
PluginContext: pluginCtx,
|
||||
}
|
||||
|
||||
resourceResponse := &backend.CallResourceResponse{
|
||||
Status: 200,
|
||||
Body: []byte("bogus"),
|
||||
}
|
||||
|
||||
var sentResponse *backend.CallResourceResponse
|
||||
var storeOneResponseCallResourceSender = callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error {
|
||||
sentResponse = res
|
||||
return nil
|
||||
})
|
||||
|
||||
t.Run("If cache returns a hit, no resource call is issued", func(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
sentResponse = nil
|
||||
cs.Reset()
|
||||
})
|
||||
|
||||
cs.ReturnHit = true
|
||||
cs.ReturnResourceResponse = resourceResponse
|
||||
|
||||
err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender)
|
||||
assert.NoError(t, err)
|
||||
// Cache service is called once
|
||||
cs.AssertCalls(t, "HandleResourceRequest", 1)
|
||||
// Equals the mocked response was sent
|
||||
assert.NotNil(t, sentResponse)
|
||||
assert.Equal(t, resourceResponse, sentResponse)
|
||||
})
|
||||
|
||||
t.Run("If cache returns a miss, resource call is issued", func(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
sentResponse = nil
|
||||
cs.Reset()
|
||||
})
|
||||
|
||||
cs.ReturnHit = false
|
||||
cs.ReturnResourceResponse = resourceResponse
|
||||
|
||||
err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender)
|
||||
assert.NoError(t, err)
|
||||
// Cache service is called once
|
||||
cs.AssertCalls(t, "HandleResourceRequest", 1)
|
||||
// Nil response was sent
|
||||
assert.Nil(t, sentResponse)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("When RequestContext is nil", func(t *testing.T) {
|
||||
req, err := http.NewRequest(http.MethodGet, "/doesnt/matter", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
cs := caching.NewFakeOSSCachingService()
|
||||
cdt := clienttest.NewClientDecoratorTest(t,
|
||||
// Skip the request context in this case
|
||||
clienttest.WithMiddlewares(NewCachingMiddleware(cs)),
|
||||
)
|
||||
reqCtx := contexthandler.FromContext(req.Context())
|
||||
require.Nil(t, reqCtx)
|
||||
|
||||
jsonDataMap := map[string]interface{}{}
|
||||
jsonDataBytes, err := json.Marshal(&jsonDataMap)
|
||||
require.NoError(t, err)
|
||||
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: jsonDataBytes,
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("Query caching is skipped", func(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
cs.Reset()
|
||||
})
|
||||
|
||||
qdr := &backend.QueryDataRequest{
|
||||
PluginContext: pluginCtx,
|
||||
}
|
||||
|
||||
resp, err := cdt.Decorator.QueryData(context.Background(), qdr)
|
||||
assert.NoError(t, err)
|
||||
// Cache service is never called
|
||||
cs.AssertCalls(t, "HandleQueryRequest", 0)
|
||||
// Equals nil (returned by the decorator test)
|
||||
assert.Nil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("Resource caching is skipped", func(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
cs.Reset()
|
||||
})
|
||||
|
||||
crr := &backend.CallResourceRequest{
|
||||
PluginContext: pluginCtx,
|
||||
}
|
||||
|
||||
err := cdt.Decorator.CallResource(req.Context(), crr, nopCallResourceSender)
|
||||
assert.NoError(t, err)
|
||||
// Cache service is never called
|
||||
cs.AssertCalls(t, "HandleResourceRequest", 0)
|
||||
})
|
||||
})
|
||||
}
|
@ -21,6 +21,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/store"
|
||||
"github.com/grafana/grafana/pkg/plugins/pluginscdn"
|
||||
"github.com/grafana/grafana/pkg/plugins/repo"
|
||||
"github.com/grafana/grafana/pkg/services/caching"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/oauthtoken"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/clientmiddleware"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/config"
|
||||
@ -77,24 +79,29 @@ var WireExtensionSet = wire.NewSet(
|
||||
finder.NewLocalFinder,
|
||||
)
|
||||
|
||||
func ProvideClientDecorator(cfg *setting.Cfg, pCfg *pCfg.Cfg,
|
||||
func ProvideClientDecorator(
|
||||
cfg *setting.Cfg, pCfg *pCfg.Cfg,
|
||||
pluginRegistry registry.Service,
|
||||
oAuthTokenService oauthtoken.OAuthTokenService,
|
||||
tracer tracing.Tracer) (*client.Decorator, error) {
|
||||
return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer)
|
||||
tracer tracing.Tracer,
|
||||
cachingService caching.CachingService,
|
||||
features *featuremgmt.FeatureManager,
|
||||
) (*client.Decorator, error) {
|
||||
return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer, cachingService, features)
|
||||
}
|
||||
|
||||
func NewClientDecorator(cfg *setting.Cfg, pCfg *pCfg.Cfg,
|
||||
pluginRegistry registry.Service,
|
||||
oAuthTokenService oauthtoken.OAuthTokenService,
|
||||
tracer tracing.Tracer) (*client.Decorator, error) {
|
||||
func NewClientDecorator(
|
||||
cfg *setting.Cfg, pCfg *pCfg.Cfg,
|
||||
pluginRegistry registry.Service, oAuthTokenService oauthtoken.OAuthTokenService,
|
||||
tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager,
|
||||
) (*client.Decorator, error) {
|
||||
c := client.ProvideService(pluginRegistry, pCfg)
|
||||
middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer)
|
||||
middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer, cachingService, features)
|
||||
|
||||
return client.NewDecorator(c, middlewares...)
|
||||
}
|
||||
|
||||
func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer) []plugins.ClientMiddleware {
|
||||
func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager) []plugins.ClientMiddleware {
|
||||
skipCookiesNames := []string{cfg.LoginCookieName}
|
||||
middlewares := []plugins.ClientMiddleware{
|
||||
clientmiddleware.NewTracingMiddleware(tracer),
|
||||
@ -104,6 +111,11 @@ func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthToken
|
||||
clientmiddleware.NewCookiesMiddleware(skipCookiesNames),
|
||||
}
|
||||
|
||||
// Placing the new service implementation behind a feature flag until it is known to be stable
|
||||
if features.IsEnabled(featuremgmt.FlagUseCachingService) {
|
||||
middlewares = append(middlewares, clientmiddleware.NewCachingMiddleware(cachingService))
|
||||
}
|
||||
|
||||
if cfg.SendUserHeader {
|
||||
middlewares = append(middlewares, clientmiddleware.NewUserHeaderMiddleware())
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ func contextProvider(tc *testContext) web.Handler {
|
||||
Context: c,
|
||||
SignedInUser: tc.user,
|
||||
IsSignedIn: signedIn,
|
||||
SkipCache: true,
|
||||
SkipDSCache: true,
|
||||
Logger: log.New("publicdashboards-test"),
|
||||
}
|
||||
c.Req = c.Req.WithContext(ctxkey.Set(c.Req.Context(), reqCtx))
|
||||
|
@ -70,7 +70,7 @@ func (api *Api) QueryPublicDashboard(c *contextmodel.ReqContext) response.Respon
|
||||
return response.Err(ErrBadRequest.Errorf("QueryPublicDashboard: error parsing request: %v", err))
|
||||
}
|
||||
|
||||
resp, err := api.PublicDashboardService.GetQueryDataResponse(c.Req.Context(), c.SkipCache, reqDTO, panelId, accessToken)
|
||||
resp, err := api.PublicDashboardService.GetQueryDataResponse(c.Req.Context(), c.SkipDSCache, reqDTO, panelId, accessToken)
|
||||
if err != nil {
|
||||
return response.Err(err)
|
||||
}
|
||||
|
@ -461,4 +461,4 @@ func NewFakePublicDashboardService(t mockConstructorTestingTNewFakePublicDashboa
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
}
|
@ -29,7 +29,7 @@ type Service interface {
|
||||
DeleteByDashboard(ctx context.Context, dashboard *dashboards.Dashboard) error
|
||||
|
||||
GetMetricRequest(ctx context.Context, dashboard *dashboards.Dashboard, publicDashboard *PublicDashboard, panelId int64, reqDTO PublicDashboardQueryDTO) (dtos.MetricRequest, error)
|
||||
GetQueryDataResponse(ctx context.Context, skipCache bool, reqDTO PublicDashboardQueryDTO, panelId int64, accessToken string) (*backend.QueryDataResponse, error)
|
||||
GetQueryDataResponse(ctx context.Context, skipDSCache bool, reqDTO PublicDashboardQueryDTO, panelId int64, accessToken string) (*backend.QueryDataResponse, error)
|
||||
GetOrgIdByAccessToken(ctx context.Context, accessToken string) (int64, error)
|
||||
NewPublicDashboardAccessToken(ctx context.Context) (string, error)
|
||||
NewPublicDashboardUid(ctx context.Context) (string, error)
|
||||
|
@ -120,7 +120,7 @@ func (pd *PublicDashboardServiceImpl) GetMetricRequest(ctx context.Context, dash
|
||||
}
|
||||
|
||||
// GetQueryDataResponse returns a query data response for the given panel and query
|
||||
func (pd *PublicDashboardServiceImpl) GetQueryDataResponse(ctx context.Context, skipCache bool, queryDto models.PublicDashboardQueryDTO, panelId int64, accessToken string) (*backend.QueryDataResponse, error) {
|
||||
func (pd *PublicDashboardServiceImpl) GetQueryDataResponse(ctx context.Context, skipDSCache bool, queryDto models.PublicDashboardQueryDTO, panelId int64, accessToken string) (*backend.QueryDataResponse, error) {
|
||||
publicDashboard, dashboard, err := pd.FindEnabledPublicDashboardAndDashboardByAccessToken(ctx, accessToken)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -136,7 +136,7 @@ func (pd *PublicDashboardServiceImpl) GetQueryDataResponse(ctx context.Context,
|
||||
}
|
||||
|
||||
anonymousUser := buildAnonymousUser(ctx, dashboard)
|
||||
res, err := pd.QueryDataService.QueryData(ctx, anonymousUser, skipCache, metricReq)
|
||||
res, err := pd.QueryDataService.QueryData(ctx, anonymousUser, skipDSCache, metricReq)
|
||||
|
||||
reqDatasources := metricReq.GetUniqueDatasourceTypes()
|
||||
if err != nil {
|
||||
|
@ -56,7 +56,7 @@ func ProvideService(
|
||||
//go:generate mockery --name Service --structname FakeQueryService --inpackage --filename query_service_mock.go
|
||||
type Service interface {
|
||||
Run(ctx context.Context) error
|
||||
QueryData(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error)
|
||||
QueryData(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error)
|
||||
}
|
||||
|
||||
// Gives us compile time error if the service does not adhere to the contract of the interface
|
||||
@ -79,9 +79,9 @@ func (s *ServiceImpl) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// QueryData processes queries and returns query responses. It handles queries to single or mixed datasources, as well as expressions.
|
||||
func (s *ServiceImpl) QueryData(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
|
||||
func (s *ServiceImpl) QueryData(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
|
||||
// Parse the request into parsed queries grouped by datasource uid
|
||||
parsedReq, err := s.parseMetricRequest(ctx, user, skipCache, reqDTO)
|
||||
parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -95,11 +95,11 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user *user.SignedInUser, sk
|
||||
return s.handleQuerySingleDatasource(ctx, user, parsedReq)
|
||||
}
|
||||
// If there are multiple datasources, handle their queries concurrently and return the aggregate result
|
||||
return s.executeConcurrentQueries(ctx, user, skipCache, reqDTO, parsedReq.parsedQueries)
|
||||
return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries)
|
||||
}
|
||||
|
||||
// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
|
||||
func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) {
|
||||
func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) {
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.SetLimit(8) // arbitrary limit to prevent too many concurrent requests
|
||||
rchan := make(chan backend.Responses, len(queriesbyDs))
|
||||
@ -132,7 +132,7 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S
|
||||
// Handle panics in the datasource qery
|
||||
defer recoveryFn(subDTO.Queries)
|
||||
|
||||
subResp, err := s.QueryData(ctx, user, skipCache, subDTO)
|
||||
subResp, err := s.QueryData(ctx, user, skipDSCache, subDTO)
|
||||
if err == nil {
|
||||
rchan <- subResp.Responses
|
||||
} else {
|
||||
@ -248,7 +248,7 @@ func (s *ServiceImpl) handleQuerySingleDatasource(ctx context.Context, user *use
|
||||
}
|
||||
|
||||
// parseRequest parses a request into parsed queries grouped by datasource uid
|
||||
func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
|
||||
func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
|
||||
if len(reqDTO.Queries) == 0 {
|
||||
return nil, ErrNoQueriesFound
|
||||
}
|
||||
@ -263,7 +263,7 @@ func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user *user.SignedI
|
||||
// Parse the queries and store them by datasource
|
||||
datasourcesByUid := map[string]*datasources.DataSource{}
|
||||
for _, query := range reqDTO.Queries {
|
||||
ds, err := s.getDataSourceFromQuery(ctx, user, skipCache, query, datasourcesByUid)
|
||||
ds, err := s.getDataSourceFromQuery(ctx, user, skipDSCache, query, datasourcesByUid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -309,7 +309,7 @@ func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user *user.SignedI
|
||||
return req, req.validateRequest(ctx)
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) getDataSourceFromQuery(ctx context.Context, user *user.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*datasources.DataSource) (*datasources.DataSource, error) {
|
||||
func (s *ServiceImpl) getDataSourceFromQuery(ctx context.Context, user *user.SignedInUser, skipDSCache bool, query *simplejson.Json, history map[string]*datasources.DataSource) (*datasources.DataSource, error) {
|
||||
var err error
|
||||
uid := query.Get("datasource").Get("uid").MustString()
|
||||
|
||||
@ -333,7 +333,7 @@ func (s *ServiceImpl) getDataSourceFromQuery(ctx context.Context, user *user.Sig
|
||||
}
|
||||
|
||||
if uid != "" {
|
||||
ds, err = s.dataSourceCache.GetDatasourceByUID(ctx, uid, user, skipCache)
|
||||
ds, err = s.dataSourceCache.GetDatasourceByUID(ctx, uid, user, skipDSCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -343,7 +343,7 @@ func (s *ServiceImpl) getDataSourceFromQuery(ctx context.Context, user *user.Sig
|
||||
// use datasourceId if it exists
|
||||
id := query.Get("datasourceId").MustInt64(0)
|
||||
if id > 0 {
|
||||
ds, err = s.dataSourceCache.GetDatasource(ctx, id, user, skipCache)
|
||||
ds, err = s.dataSourceCache.GetDatasource(ctx, id, user, skipDSCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -19,13 +19,13 @@ type FakeQueryService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// QueryData provides a mock function with given fields: ctx, _a1, skipCache, reqDTO
|
||||
func (_m *FakeQueryService) QueryData(ctx context.Context, _a1 *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
|
||||
ret := _m.Called(ctx, _a1, skipCache, reqDTO)
|
||||
// QueryData provides a mock function with given fields: ctx, _a1, skipDSCache, reqDTO
|
||||
func (_m *FakeQueryService) QueryData(ctx context.Context, _a1 *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
|
||||
ret := _m.Called(ctx, _a1, skipDSCache, reqDTO)
|
||||
|
||||
var r0 *backend.QueryDataResponse
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *user.SignedInUser, bool, dtos.MetricRequest) *backend.QueryDataResponse); ok {
|
||||
r0 = rf(ctx, _a1, skipCache, reqDTO)
|
||||
r0 = rf(ctx, _a1, skipDSCache, reqDTO)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*backend.QueryDataResponse)
|
||||
@ -34,7 +34,7 @@ func (_m *FakeQueryService) QueryData(ctx context.Context, _a1 *user.SignedInUse
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *user.SignedInUser, bool, dtos.MetricRequest) error); ok {
|
||||
r1 = rf(ctx, _a1, skipCache, reqDTO)
|
||||
r1 = rf(ctx, _a1, skipDSCache, reqDTO)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ func requestContextMiddleware() web.Middleware {
|
||||
c.IsSignedIn = ctx.IsSignedIn
|
||||
c.IsRenderCall = ctx.IsRenderCall
|
||||
c.AllowAnonymous = ctx.AllowAnonymous
|
||||
c.SkipCache = ctx.SkipCache
|
||||
c.SkipDSCache = ctx.SkipDSCache
|
||||
c.RequestNonce = ctx.RequestNonce
|
||||
c.PerfmonTimer = ctx.PerfmonTimer
|
||||
c.LookupTokenErr = ctx.LookupTokenErr
|
||||
|
Loading…
Reference in New Issue
Block a user