mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
* define initial service and add to wire * update caching service interface * add skipQueryCache header handler and update metrics query function to use it * add caching service as a dependency to query service * working caching impl * propagate cache status to frontend in response * beginning of improvements suggested by Lean - separate caching logic from query logic. * more changes to simplify query function * Decided to revert renaming of function * Remove error status from cache request * add extra documentation * Move query caching duration metric to query package * add a little bit of documentation * wip: convert resource caching * Change return type of query service QueryData to a QueryDataResponse with Headers * update codeowners * change X-Cache value to const * use resource caching in endpoint handlers * write resource headers to response even if it's not a cache hit * fix panic caused by lack of nil check * update unit test * remove NONE header - shouldn't show up in OSS * Convert everything to use the plugin middleware * revert a few more things * clean up unused vars * start reverting resource caching, start to implement in plugin middleware * revert more, fix typo * Update caching interfaces - resource caching now has a separate cache method * continue wiring up new resource caching conventions - still in progress * add more safety to implementation * remove some unused objects * remove some code that I left in by accident * add some comments, fix codeowners, fix duplicate registration * fix source of panic in resource middleware * Update client decorator test to provide an empty response object * create tests for caching middleware * fix unit test * Update pkg/services/caching/service.go Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com> * improve error message in error log * quick docs update * Remove use of mockery. Update return signature to return an explicit hit/miss bool * create unit test for empty request context * rename caching metrics to make it clear they pertain to caching * Update pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com> * Add clarifying comments to cache skip middleware func * Add comment pointing to the resource cache update call * fix unit tests (missing dependency) * try to fix mystery syntax error * fix a panic * Caching: Introduce feature toggle to caching service refactor (#66323) * introduce new feature toggle * hide calls to new service behind a feature flag * remove licensing flag from toggle (misunderstood what it was for) * fix unit tests * rerun toggle gen --------- Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com> Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
145 lines
5.2 KiB
Go
145 lines
5.2 KiB
Go
package clientmiddleware
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/plugins"
|
|
"github.com/grafana/grafana/pkg/services/caching"
|
|
"github.com/grafana/grafana/pkg/services/contexthandler"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
// NewCachingMiddleware creates a new plugins.ClientMiddleware that will
|
|
// attempt to read and write query results to the cache
|
|
func NewCachingMiddleware(cachingService caching.CachingService) plugins.ClientMiddleware {
|
|
log := log.New("caching_middleware")
|
|
if err := prometheus.Register(QueryCachingRequestHistogram); err != nil {
|
|
log.Error("error registering prometheus collector 'QueryRequestHistogram'", "error", err)
|
|
}
|
|
if err := prometheus.Register(ResourceCachingRequestHistogram); err != nil {
|
|
log.Error("error registering prometheus collector 'ResourceRequestHistogram'", "error", err)
|
|
}
|
|
return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
|
|
return &CachingMiddleware{
|
|
next: next,
|
|
caching: cachingService,
|
|
log: log,
|
|
}
|
|
})
|
|
}
|
|
|
|
type CachingMiddleware struct {
|
|
next plugins.Client
|
|
caching caching.CachingService
|
|
log log.Logger
|
|
}
|
|
|
|
// QueryData receives a data request and attempts to access results already stored in the cache for that request.
|
|
// If data is found, it will return it immediately. Otherwise, it will perform the queries as usual, then write the response to the cache.
|
|
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
|
|
func (m *CachingMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
|
if req == nil {
|
|
return m.next.QueryData(ctx, req)
|
|
}
|
|
|
|
reqCtx := contexthandler.FromContext(ctx)
|
|
if reqCtx == nil {
|
|
return m.next.QueryData(ctx, req)
|
|
}
|
|
|
|
// time how long this request takes
|
|
start := time.Now()
|
|
|
|
// First look in the query cache if enabled
|
|
hit, cr := m.caching.HandleQueryRequest(ctx, req)
|
|
|
|
defer func() {
|
|
// record request duration if caching was used
|
|
if ch := reqCtx.Resp.Header().Get(caching.XCacheHeader); ch != "" {
|
|
QueryCachingRequestHistogram.With(prometheus.Labels{
|
|
"datasource_type": req.PluginContext.DataSourceInstanceSettings.Type,
|
|
"cache": ch,
|
|
"query_type": getQueryType(reqCtx),
|
|
}).Observe(time.Since(start).Seconds())
|
|
}
|
|
}()
|
|
|
|
// Cache hit; return the response
|
|
if hit {
|
|
return cr.Response, nil
|
|
}
|
|
|
|
// Cache miss; do the actual queries
|
|
resp, err := m.next.QueryData(ctx, req)
|
|
|
|
// Update the query cache with the result for this metrics request
|
|
if err == nil && cr.UpdateCacheFn != nil {
|
|
cr.UpdateCacheFn(ctx, resp)
|
|
}
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// CallResource receives a resource request and attempts to access results already stored in the cache for that request.
|
|
// If data is found, it will return it immediately. Otherwise, it will perform the request as usual. The caller of CallResource is expected to explicitly update the cache with any responses.
|
|
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
|
|
func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
if req == nil {
|
|
return m.next.CallResource(ctx, req, sender)
|
|
}
|
|
|
|
reqCtx := contexthandler.FromContext(ctx)
|
|
if reqCtx == nil {
|
|
return m.next.CallResource(ctx, req, sender)
|
|
}
|
|
|
|
// time how long this request takes
|
|
start := time.Now()
|
|
|
|
// First look in the resource cache if enabled
|
|
hit, resp := m.caching.HandleResourceRequest(ctx, req)
|
|
|
|
defer func() {
|
|
// record request duration if caching was used
|
|
if ch := reqCtx.Resp.Header().Get(caching.XCacheHeader); ch != "" {
|
|
ResourceCachingRequestHistogram.With(prometheus.Labels{
|
|
"plugin_id": req.PluginContext.PluginID,
|
|
"cache": ch,
|
|
}).Observe(time.Since(start).Seconds())
|
|
}
|
|
}()
|
|
|
|
// Cache hit; send the response and return
|
|
if hit {
|
|
return sender.Send(resp)
|
|
}
|
|
|
|
// Cache miss; do the actual request
|
|
// The call to update the cache happens in /pkg/api/plugin_resource.go in the flushStream() func
|
|
// TODO: Implement updating the cache from this method
|
|
return m.next.CallResource(ctx, req, sender)
|
|
}
|
|
|
|
func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
|
return m.next.CheckHealth(ctx, req)
|
|
}
|
|
|
|
func (m *CachingMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
|
return m.next.CollectMetrics(ctx, req)
|
|
}
|
|
|
|
func (m *CachingMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
|
return m.next.SubscribeStream(ctx, req)
|
|
}
|
|
|
|
func (m *CachingMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
|
return m.next.PublishStream(ctx, req)
|
|
}
|
|
|
|
func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
|
return m.next.RunStream(ctx, req, sender)
|
|
}
|