mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Plugins: Angular patterns: Use ETag for GCOM requests (#74453)
* Plugins: Dynamic angular patterns: Send If-None-Match to GCOM, store ETag * Fix SetETag settings the wrong key in underlying kvstore * Fix wrong type in GCOMResponse.Patterns and wrong content being saved * Fix ctx passing to GetETag in background job * Added more ETag tests * More ETag tests * Set last updated and log when not modified is returned * Fix missing in-memory detectors update when etag matches, add comments * Fix mutex usage
This commit is contained in:
parent
8caf85b99a
commit
e9a12598db
@ -19,6 +19,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/angularpatternsstore"
|
||||
)
|
||||
|
||||
var errNotModified = errors.New("not modified")
|
||||
|
||||
// backgroundJobInterval is the interval that passes between background job runs.
|
||||
// It can be overwritten in tests.
|
||||
var backgroundJobInterval = time.Hour * 1
|
||||
@ -61,11 +63,13 @@ func ProvideDynamic(cfg *config.Cfg, store angularpatternsstore.Service, feature
|
||||
// Perform the initial restore from db
|
||||
st := time.Now()
|
||||
d.log.Debug("Restoring cache")
|
||||
d.mux.Lock()
|
||||
if err := d.setDetectorsFromCache(context.Background()); err != nil {
|
||||
d.log.Warn("Cache restore failed", "error", err)
|
||||
} else {
|
||||
d.log.Info("Restored cache from database", "duration", time.Since(st))
|
||||
}
|
||||
d.mux.Unlock()
|
||||
return d, nil
|
||||
}
|
||||
|
||||
@ -94,62 +98,96 @@ func (d *Dynamic) patternsToDetectors(patterns GCOMPatterns) ([]angulardetector.
|
||||
return detectors, nil
|
||||
}
|
||||
|
||||
type GCOMResponse struct {
|
||||
Patterns GCOMPatterns
|
||||
ETag string
|
||||
}
|
||||
|
||||
// fetch fetches the angular patterns from GCOM and returns them as GCOMPatterns.
|
||||
// Call detectors() on the returned value to get the corresponding detectors.
|
||||
func (d *Dynamic) fetch(ctx context.Context) (GCOMPatterns, error) {
|
||||
// If etag is not empty, it will be sent as If-None-Match header.
|
||||
// If the response status code is 304, it returns errNotModified.
|
||||
func (d *Dynamic) fetch(ctx context.Context, etag string) (GCOMResponse, error) {
|
||||
st := time.Now()
|
||||
|
||||
reqURL, err := url.JoinPath(d.baseURL, gcomAngularPatternsPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("url joinpath: %w", err)
|
||||
return GCOMResponse{}, fmt.Errorf("url joinpath: %w", err)
|
||||
}
|
||||
|
||||
d.log.Debug("Fetching dynamic angular detection patterns", "url", reqURL)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("new request with context: %w", err)
|
||||
return GCOMResponse{}, fmt.Errorf("new request with context: %w", err)
|
||||
}
|
||||
if etag != "" {
|
||||
req.Header.Add("If-None-Match", etag)
|
||||
}
|
||||
|
||||
var r GCOMResponse
|
||||
resp, err := d.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("http do: %w", err)
|
||||
return GCOMResponse{}, fmt.Errorf("http do: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := resp.Body.Close(); closeErr != nil {
|
||||
d.log.Error("Response body close error", "error", err)
|
||||
}
|
||||
}()
|
||||
if resp.StatusCode == http.StatusNotModified {
|
||||
return GCOMResponse{}, errNotModified
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return nil, fmt.Errorf("bad status code: %d", resp.StatusCode)
|
||||
return GCOMResponse{}, fmt.Errorf("bad status code: %d", resp.StatusCode)
|
||||
}
|
||||
var out GCOMPatterns
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return nil, fmt.Errorf("json decode: %w", err)
|
||||
if err := json.NewDecoder(resp.Body).Decode(&r.Patterns); err != nil {
|
||||
return GCOMResponse{}, fmt.Errorf("json decode: %w", err)
|
||||
}
|
||||
d.log.Debug("Fetched dynamic angular detection patterns", "patterns", len(out), "duration", time.Since(st))
|
||||
return out, nil
|
||||
r.ETag = resp.Header.Get("ETag")
|
||||
d.log.Debug("Fetched dynamic angular detection patterns", "patterns", len(r.Patterns), "duration", time.Since(st))
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// updateDetectors fetches the patterns from GCOM, converts them to detectors,
|
||||
// stores the patterns in the database and update the cached detectors.
|
||||
func (d *Dynamic) updateDetectors(ctx context.Context) error {
|
||||
func (d *Dynamic) updateDetectors(ctx context.Context, etag string) error {
|
||||
// Fetch patterns from GCOM
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
patterns, err := d.fetch(ctx)
|
||||
if err != nil {
|
||||
|
||||
resp, err := d.fetch(ctx, etag)
|
||||
switch {
|
||||
case err == nil:
|
||||
break
|
||||
case errors.Is(err, errNotModified):
|
||||
// If patterns did not change, update the last updated anyways,
|
||||
// so we don't keep trying to fetch them, but update the in-memory
|
||||
// detectors from the previously stored value.
|
||||
d.log.Debug("Not modified, skipping update")
|
||||
if err := d.store.SetLastUpdated(ctx); err != nil {
|
||||
return fmt.Errorf("set last updated: %w", err)
|
||||
}
|
||||
// Update in-memory detectors, by reading current value in the kvstore
|
||||
if err := d.setDetectorsFromCache(ctx); err != nil {
|
||||
return fmt.Errorf("set detectors from cache: %w", err)
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("fetch: %w", err)
|
||||
}
|
||||
|
||||
// Convert the patterns to detectors
|
||||
newDetectors, err := d.patternsToDetectors(patterns)
|
||||
newDetectors, err := d.patternsToDetectors(resp.Patterns)
|
||||
if err != nil {
|
||||
return fmt.Errorf("patterns convert to detectors: %w", err)
|
||||
}
|
||||
|
||||
// Update store only if the patterns can be converted to detectors
|
||||
if err := d.store.Set(ctx, patterns); err != nil {
|
||||
if err := d.store.Set(ctx, resp.Patterns); err != nil {
|
||||
return fmt.Errorf("store set: %w", err)
|
||||
}
|
||||
if err := d.store.SetETag(ctx, resp.ETag); err != nil {
|
||||
return fmt.Errorf("store set etag: %w", err)
|
||||
}
|
||||
|
||||
// Update cached detectors
|
||||
d.detectors = newDetectors
|
||||
@ -159,9 +197,6 @@ func (d *Dynamic) updateDetectors(ctx context.Context) error {
|
||||
// setDetectorsFromCache sets the in-memory detectors from the patterns in the store.
|
||||
// The caller must Lock d.mux before calling this function.
|
||||
func (d *Dynamic) setDetectorsFromCache(ctx context.Context) error {
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
var cachedPatterns GCOMPatterns
|
||||
rawCached, ok, err := d.store.Get(ctx)
|
||||
if !ok {
|
||||
@ -237,8 +272,15 @@ func (d *Dynamic) Run(ctx context.Context) error {
|
||||
case <-tick:
|
||||
st := time.Now()
|
||||
d.log.Debug("Updating patterns")
|
||||
|
||||
if err := d.updateDetectors(context.Background()); err != nil {
|
||||
etag, ok, err := d.store.GetETag(context.Background())
|
||||
if err != nil {
|
||||
d.log.Error("Error while getting etag", "error", err)
|
||||
}
|
||||
// Ensure etag is empty if we don't have a value
|
||||
if !ok {
|
||||
etag = ""
|
||||
}
|
||||
if err := d.updateDetectors(context.Background(), etag); err != nil {
|
||||
d.log.Error("Error while updating detectors", "error", err)
|
||||
}
|
||||
d.log.Info("Patterns update finished", "duration", time.Since(st))
|
||||
|
@ -92,18 +92,19 @@ func TestDynamicAngularDetectorsProvider(t *testing.T) {
|
||||
|
||||
t.Run("fetch", func(t *testing.T) {
|
||||
t.Run("returns value from gcom api", func(t *testing.T) {
|
||||
r, err := svc.fetch(context.Background())
|
||||
r, err := svc.fetch(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, gcom.httpCalls.calledOnce(), "gcom api should be called")
|
||||
require.Equal(t, mockGCOMPatterns, r)
|
||||
require.Equal(t, mockGCOMPatterns, r.Patterns)
|
||||
require.Empty(t, r.ETag)
|
||||
})
|
||||
|
||||
t.Run("handles timeout", func(t *testing.T) {
|
||||
// ctx that expired in the past
|
||||
ctx, canc := context.WithDeadline(context.Background(), time.Now().Add(time.Second*-30))
|
||||
defer canc()
|
||||
_, err := svc.fetch(ctx)
|
||||
_, err := svc.fetch(ctx, "")
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
require.False(t, gcom.httpCalls.called(), "gcom api should not be called")
|
||||
require.Empty(t, svc.ProvideDetectors(context.Background()))
|
||||
@ -119,9 +120,60 @@ func TestDynamicAngularDetectorsProvider(t *testing.T) {
|
||||
errSrv := errScenario.newHTTPTestServer()
|
||||
t.Cleanup(errSrv.Close)
|
||||
svc := provideDynamic(t, errSrv.URL)
|
||||
_, err := svc.fetch(context.Background())
|
||||
_, err := svc.fetch(context.Background(), "")
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("etag", func(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
clientEtag string
|
||||
|
||||
serverEtag string
|
||||
expError error
|
||||
}{
|
||||
{name: "no client etag", clientEtag: "", serverEtag: "etag", expError: nil},
|
||||
{name: "no server etag", clientEtag: `"abcdef"`, serverEtag: "", expError: nil},
|
||||
{name: "client different etag than server", clientEtag: `"abcdef"`, serverEtag: "etag", expError: nil},
|
||||
{name: "same client and server etag returns errNotModified", clientEtag: `"etag"`, serverEtag: `"etag"`, expError: errNotModified},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
callback := make(chan struct{})
|
||||
gcom := newDefaultGCOMScenario(func(writer http.ResponseWriter, request *http.Request) {
|
||||
const headerIfNoneMatch = "If-None-Match"
|
||||
if tc.clientEtag == "" {
|
||||
require.Empty(t, request.Header.Values(headerIfNoneMatch))
|
||||
} else {
|
||||
require.Equal(t, tc.clientEtag, request.Header.Get(headerIfNoneMatch))
|
||||
}
|
||||
if tc.serverEtag != "" {
|
||||
writer.Header().Add("ETag", tc.serverEtag)
|
||||
if tc.serverEtag == tc.clientEtag {
|
||||
writer.WriteHeader(http.StatusNotModified)
|
||||
}
|
||||
}
|
||||
close(callback)
|
||||
})
|
||||
srv := gcom.newHTTPTestServer()
|
||||
t.Cleanup(srv.Close)
|
||||
svc := provideDynamic(t, srv.URL)
|
||||
|
||||
_, err := svc.fetch(context.Background(), tc.clientEtag)
|
||||
if tc.expError != nil {
|
||||
require.ErrorIs(t, err, tc.expError)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
select {
|
||||
case <-callback:
|
||||
break
|
||||
case <-time.After(time.Second * 10):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
require.True(t, gcom.httpCalls.calledOnce(), "gcom api should be called")
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("updateDetectors", func(t *testing.T) {
|
||||
@ -141,7 +193,7 @@ func TestDynamicAngularDetectorsProvider(t *testing.T) {
|
||||
require.Empty(t, svc.ProvideDetectors(context.Background()))
|
||||
|
||||
// Fetch and store value
|
||||
err = svc.updateDetectors(context.Background())
|
||||
err = svc.updateDetectors(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
checkMockDetectors(t, svc)
|
||||
|
||||
@ -178,7 +230,7 @@ func TestDynamicAngularDetectorsProvider(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Try to update from GCOM, but it returns an error
|
||||
err = svc.updateDetectors(context.Background())
|
||||
err = svc.updateDetectors(context.Background(), "")
|
||||
require.Error(t, err)
|
||||
require.True(t, scenario.httpCalls.calledOnce(), "gcom api should be called once")
|
||||
|
||||
@ -195,6 +247,45 @@ func TestDynamicAngularDetectorsProvider(t *testing.T) {
|
||||
// Same for in-memory detectors
|
||||
checkMockDetectors(t, svc)
|
||||
})
|
||||
|
||||
t.Run("etag", func(t *testing.T) {
|
||||
const serverEtag = "hit"
|
||||
gcom := newEtagGCOMScenario(serverEtag)
|
||||
srv := gcom.newHTTPTestServer()
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
t.Run("etag is saved in underlying store", func(t *testing.T) {
|
||||
svc := provideDynamic(t, srv.URL)
|
||||
|
||||
err := svc.updateDetectors(context.Background(), "old")
|
||||
require.NoError(t, err)
|
||||
|
||||
etag, ok, err := svc.store.GetETag(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, serverEtag, etag)
|
||||
|
||||
lastUpdate, err := svc.store.GetLastUpdated(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, lastUpdate, time.Now(), time.Second*10)
|
||||
})
|
||||
|
||||
t.Run("same etag does not modify underlying store", func(t *testing.T) {
|
||||
svc := provideDynamic(t, srv.URL)
|
||||
require.NoError(t, svc.updateDetectors(context.Background(), serverEtag))
|
||||
_, ok, err := svc.store.Get(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
})
|
||||
|
||||
t.Run("different etag modified underlying store", func(t *testing.T) {
|
||||
svc := provideDynamic(t, srv.URL)
|
||||
require.NoError(t, svc.updateDetectors(context.Background(), "old"))
|
||||
_, ok, err := svc.store.Get(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("setDetectorsFromCache", func(t *testing.T) {
|
||||
@ -444,12 +535,23 @@ func (s *gcomScenario) newHTTPTestServer() *httptest.Server {
|
||||
}))
|
||||
}
|
||||
|
||||
func newEtagGCOMScenario(etag string) *gcomScenario {
|
||||
return &gcomScenario{httpHandlerFunc: func(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Header.Get("If-None-Match") == etag {
|
||||
w.WriteHeader(http.StatusNotModified)
|
||||
return
|
||||
}
|
||||
w.Header().Add("ETag", etag)
|
||||
mockGCOMHTTPHandlerFunc(w, req)
|
||||
}}
|
||||
}
|
||||
|
||||
func newDefaultGCOMScenario(middlewares ...http.HandlerFunc) *gcomScenario {
|
||||
return &gcomScenario{httpHandlerFunc: func(w http.ResponseWriter, req *http.Request) {
|
||||
mockGCOMHTTPHandlerFunc(w, req)
|
||||
for _, f := range middlewares {
|
||||
f(w, req)
|
||||
}
|
||||
mockGCOMHTTPHandlerFunc(w, req)
|
||||
}}
|
||||
}
|
||||
|
||||
|
@ -10,13 +10,19 @@ import (
|
||||
|
||||
type Service interface {
|
||||
GetLastUpdated(ctx context.Context) (time.Time, error)
|
||||
|
||||
Get(ctx context.Context) (string, bool, error)
|
||||
Set(ctx context.Context, value any) error
|
||||
SetLastUpdated(ctx context.Context) error
|
||||
|
||||
GetETag(ctx context.Context) (string, bool, error)
|
||||
SetETag(ctx context.Context, etag string) error
|
||||
}
|
||||
|
||||
const (
|
||||
kvNamespace = "plugin.angularpatterns"
|
||||
keyPatterns = "angular_patterns"
|
||||
keyEtag = "etag"
|
||||
)
|
||||
|
||||
// KVStoreService allows to cache GCOM angular patterns into the database, as a cache.
|
||||
@ -37,7 +43,17 @@ func (s *KVStoreService) Get(ctx context.Context) (string, bool, error) {
|
||||
return s.CacheKvStore.Get(ctx, keyPatterns)
|
||||
}
|
||||
|
||||
// Set stores the given angular patterns in the underlying cachekvstore.s
|
||||
// GetETag returns the stored etag from the underlying cachekvstore.
|
||||
func (s *KVStoreService) GetETag(ctx context.Context) (string, bool, error) {
|
||||
return s.CacheKvStore.Get(ctx, keyEtag)
|
||||
}
|
||||
|
||||
// Set stores the given angular patterns in the underlying cachekvstore.
|
||||
func (s *KVStoreService) Set(ctx context.Context, value any) error {
|
||||
return s.CacheKvStore.Set(ctx, keyPatterns, value)
|
||||
}
|
||||
|
||||
// SetETag stores the given etag in the underlying cachekvstore.
|
||||
func (s *KVStoreService) SetETag(ctx context.Context, etag string) error {
|
||||
return s.CacheKvStore.Set(ctx, keyEtag, etag)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user