Plugins: Refactor kvstore usage in signing keys and angular patterns (#73154)

* Initial refactoring work for plugins kvstore

* Replace implementations for keystore and angularstore

* Cleanup

* add interface check

* lint

* fix storeKeyGetter not being called in namespacedstore set

* Fix tests

* Comments

* Add tests

* Fix invalid cap in ListKeys when store is empty

* Update docstrings

* Add setLastUpdatedOnDelete

* Renamed DefaultStoreKeyGetterFunc, add TestDefaultStoreKeyGetter

* Sort imports

* PR review: removed last_updated key

* PR review: Removed setLastUpdatedOnDelete

* Re-added relevant tests

* PR review: Removed SingleKeyStore

* PR review: Removed custom marshaling support

* Renamed marshaler.go to marshal.go

* PR review: removed unused interfaces

* PR review: Moved marshal into namespacedstore.go

* PR review: removed storekeygetter

* Removed unused file cachekvstore.go

* Renamed NamespacedStore to CacheKvStore

* removed todo
This commit is contained in:
Giuseppe Guerra 2023-09-05 16:20:42 +02:00 committed by GitHub
parent 41ca13418b
commit 2e67a9463d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 361 additions and 110 deletions

View File

@ -173,10 +173,10 @@ type SignatureCalculator interface {
type KeyStore interface {
Get(ctx context.Context, key string) (string, bool, error)
Set(ctx context.Context, key string, value string) error
Del(ctx context.Context, key string) error
Set(ctx context.Context, key string, value any) error
Delete(ctx context.Context, key string) error
ListKeys(ctx context.Context) ([]string, error)
GetLastUpdated(ctx context.Context) (*time.Time, error)
GetLastUpdated(ctx context.Context) (time.Time, error)
SetLastUpdated(ctx context.Context) error
}

View File

@ -2,74 +2,42 @@ package angularpatternsstore
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/services/pluginsintegration/cachekvstore"
)
type Service interface {
Get(ctx context.Context) (string, bool, error)
Set(ctx context.Context, patterns any) error
GetLastUpdated(ctx context.Context) (time.Time, error)
Get(ctx context.Context) (string, bool, error)
Set(ctx context.Context, value any) error
}
const (
kvNamespace = "plugin.angularpatterns"
keyPatterns = "angular_patterns"
keyLastUpdated = "last_updated"
keyPatterns = "angular_patterns"
)
// KVStoreService allows to cache GCOM angular patterns into the database, as a cache.
type KVStoreService struct {
kv *kvstore.NamespacedKVStore
*cachekvstore.CacheKvStore
}
var _ Service = (*KVStoreService)(nil)
func ProvideService(kv kvstore.KVStore) Service {
return &KVStoreService{
kv: kvstore.WithNamespace(kv, 0, kvNamespace),
CacheKvStore: cachekvstore.NewCacheKvStore(kv, kvNamespace),
}
}
// Get returns the raw cached angular detection patterns. The returned value is a JSON-encoded string.
// If no value is present, the second argument is false and the returned error is nil.
// Get returns the stored angular patterns from the underlying cachekvstore.
func (s *KVStoreService) Get(ctx context.Context) (string, bool, error) {
return s.kv.Get(ctx, keyPatterns)
return s.CacheKvStore.Get(ctx, keyPatterns)
}
// Set sets the cached angular detection patterns and the latest update time to time.Now().
// patterns must implement json.Marshaler.
func (s *KVStoreService) Set(ctx context.Context, patterns any) error {
b, err := json.Marshal(patterns)
if err != nil {
return fmt.Errorf("json marshal: %w", err)
}
if err := s.kv.Set(ctx, keyPatterns, string(b)); err != nil {
return fmt.Errorf("kv set: %w", err)
}
if err := s.kv.Set(ctx, keyLastUpdated, time.Now().Format(time.RFC3339)); err != nil {
return fmt.Errorf("kv last updated set: %w", err)
}
return nil
}
// GetLastUpdated returns the time when Set was last called. If the value cannot be unmarshalled correctly,
// it returns a zero-value time.Time.
func (s *KVStoreService) GetLastUpdated(ctx context.Context) (time.Time, error) {
v, ok, err := s.kv.Get(ctx, keyLastUpdated)
if err != nil {
return time.Time{}, fmt.Errorf("kv get: %w", err)
}
if !ok {
return time.Time{}, nil
}
t, err := time.Parse(time.RFC3339, v)
if err != nil {
// Ignore decode errors, so we can change the format in future versions
// and keep backwards/forwards compatibility
return time.Time{}, nil
}
return t, nil
// Set stores the given angular patterns in the underlying cachekvstore.s
func (s *KVStoreService) Set(ctx context.Context, value any) error {
return s.CacheKvStore.Set(ctx, keyPatterns, value)
}

View File

@ -41,7 +41,8 @@ func TestAngularPatternsStore(t *testing.T) {
})
t.Run("latest update", func(t *testing.T) {
svc := ProvideService(kvstore.NewFakeKVStore())
underlyingKv := kvstore.NewFakeKVStore()
svc := ProvideService(underlyingKv)
t.Run("empty", func(t *testing.T) {
lastUpdated, err := svc.GetLastUpdated(context.Background())
@ -59,7 +60,7 @@ func TestAngularPatternsStore(t *testing.T) {
})
t.Run("invalid timestamp stored", func(t *testing.T) {
err := svc.(*KVStoreService).kv.Set(context.Background(), keyLastUpdated, "abcd")
err := underlyingKv.Set(context.Background(), 0, kvNamespace, "last_updated", "abcd")
require.NoError(t, err)
lastUpdated, err := svc.GetLastUpdated(context.Background())

View File

@ -0,0 +1,142 @@
package cachekvstore
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana/pkg/infra/kvstore"
)
// keyLastUpdated is the key used to store the last updated time.
const keyLastUpdated = "last_updated"
// CacheKvStore is a Store that stores data in a *kvstore.NamespacedKVStore.
// It also stores a last updated time, which is unique for all the keys and is updated on each call to `Set`,
// and can be used to determine if the data is stale.
type CacheKvStore struct {
// kv is the underlying KV store.
kv *kvstore.NamespacedKVStore
// keyPrefix is the prefix to use for all the keys.
keyPrefix string
}
// NewCacheKvStoreWithPrefix creates a new CacheKvStore using the provided underlying KVStore, namespace and prefix.
func NewCacheKvStoreWithPrefix(kv kvstore.KVStore, namespace, prefix string) *CacheKvStore {
return &CacheKvStore{
kv: kvstore.WithNamespace(kv, 0, namespace),
keyPrefix: prefix,
}
}
// NewCacheKvStore creates a new CacheKvStore using the provided underlying KVStore and namespace.
func NewCacheKvStore(kv kvstore.KVStore, namespace string) *CacheKvStore {
return NewCacheKvStoreWithPrefix(kv, namespace, "")
}
// storeKey returns the key to use in the underlying store for the given key.
func (s *CacheKvStore) storeKey(k string) string {
return s.keyPrefix + k
}
// Get returns the value for the given key.
// If no value is present, the second argument is false and the returned error is nil.
func (s *CacheKvStore) Get(ctx context.Context, key string) (string, bool, error) {
return s.kv.Get(ctx, s.storeKey(key))
}
// Set sets the value for the given key and updates the last updated time.
// It uses the marshal method to marshal the value before storing it.
// This means that the value to store can implement the Marshaler interface to control how it is stored.
func (s *CacheKvStore) Set(ctx context.Context, key string, value any) error {
valueToStore, err := marshal(value)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
if err := s.kv.Set(ctx, s.storeKey(key), valueToStore); err != nil {
return fmt.Errorf("kv set: %w", err)
}
if err := s.SetLastUpdated(ctx); err != nil {
return fmt.Errorf("set last updated: %w", err)
}
return nil
}
// GetLastUpdated returns the last updated time.
// If the last updated time is not set, it returns a zero time.
func (s *CacheKvStore) GetLastUpdated(ctx context.Context) (time.Time, error) {
v, ok, err := s.kv.Get(ctx, keyLastUpdated)
if err != nil {
return time.Time{}, fmt.Errorf("kv get: %w", err)
}
if !ok {
return time.Time{}, nil
}
t, err := time.Parse(time.RFC3339, v)
if err != nil {
// Ignore decode errors, so we can change the format in future versions
// and keep backwards/forwards compatibility
return time.Time{}, nil
}
return t, nil
}
// SetLastUpdated sets the last updated time to the current time.
// The last updated time is shared between all the keys for this store.
func (s *CacheKvStore) SetLastUpdated(ctx context.Context) error {
return s.kv.Set(ctx, keyLastUpdated, time.Now().Format(time.RFC3339))
}
// Delete deletes the value for the given key and it also updates the last updated time.
func (s *CacheKvStore) Delete(ctx context.Context, key string) error {
if err := s.kv.Del(ctx, s.storeKey(key)); err != nil {
return fmt.Errorf("kv del: %w", err)
}
if err := s.SetLastUpdated(ctx); err != nil {
return fmt.Errorf("set last updated: %w", err)
}
return nil
}
// ListKeys returns all the keys in the store.
func (s *CacheKvStore) ListKeys(ctx context.Context) ([]string, error) {
keys, err := s.kv.Keys(ctx, s.storeKey(""))
if err != nil {
return nil, err
}
if len(keys) == 0 {
return nil, nil
}
res := make([]string, 0, len(keys)-1)
for _, key := range keys {
// Filter out last updated time
if key.Key == keyLastUpdated {
continue
}
res = append(res, key.Key)
}
return res, nil
}
// marshal marshals the provided value to a string to store it in the kv store.
// The provided value can be of a type implementing fmt.Stringer, a string or []byte.
// If the value is none of those, it is marshaled to JSON.
func marshal(value any) (string, error) {
switch value := value.(type) {
case fmt.Stringer:
return value.String(), nil
case string:
return value, nil
case []byte:
return string(value), nil
default:
b, err := json.Marshal(value)
if err != nil {
return "", fmt.Errorf("json marshal: %w", err)
}
return string(b), nil
}
}

View File

@ -0,0 +1,188 @@
package cachekvstore
import (
"context"
"encoding/json"
"fmt"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/kvstore"
)
func TestNamespacedStore(t *testing.T) {
const namespace = "namespace"
t.Run("simple", func(t *testing.T) {
store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace)
t.Run("default last updated time is zero", func(t *testing.T) {
ts, err := store.GetLastUpdated(context.Background())
require.NoError(t, err)
require.Zero(t, ts)
})
t.Run("Get returns false if key does not exist", func(t *testing.T) {
_, ok, err := store.Get(context.Background(), "key")
require.NoError(t, err)
require.False(t, ok)
})
t.Run("Set sets the value and updates the last updated time", func(t *testing.T) {
ts, err := store.GetLastUpdated(context.Background())
require.NoError(t, err)
require.Zero(t, ts)
require.NoError(t, store.Set(context.Background(), "key", "value"))
ts, err = store.GetLastUpdated(context.Background())
require.NoError(t, err)
require.NotZero(t, ts)
require.WithinDuration(t, ts, time.Now(), time.Second*10)
v, ok, err := store.Get(context.Background(), "key")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, "value", v)
})
t.Run("Delete deletes the value", func(t *testing.T) {
// First store
require.NoError(t, store.Set(context.Background(), "key", "value"))
// Then read it
v, ok, err := store.Get(context.Background(), "key")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, "value", v)
// Delete it
require.NoError(t, store.Delete(context.Background(), "key"))
// Read it again
_, ok, err = store.Get(context.Background(), "key")
require.NoError(t, err)
require.False(t, ok)
})
t.Run("sets last updated on delete", func(t *testing.T) {
store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace)
ts, err := store.GetLastUpdated(context.Background())
require.NoError(t, err)
require.Zero(t, ts)
require.NoError(t, store.Delete(context.Background(), "key"))
ts, err = store.GetLastUpdated(context.Background())
require.NoError(t, err)
require.WithinDuration(t, time.Now(), ts, time.Second*10)
})
t.Run("last updated key is used in GetLastUpdated", func(t *testing.T) {
store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace)
// Set in underlying store
ts := time.Now()
require.NoError(t, store.kv.Set(context.Background(), keyLastUpdated, ts.Format(time.RFC3339)))
// Make sure we get the same value
storeTs, err := store.GetLastUpdated(context.Background())
require.NoError(t, err)
// Format to account for marshal/unmarshal differences
require.Equal(t, ts.Format(time.RFC3339), storeTs.Format(time.RFC3339))
})
t.Run("last updated key is used in SetLastUpdated", func(t *testing.T) {
store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace)
require.NoError(t, store.SetLastUpdated(context.Background()))
marshaledStoreTs, ok, err := store.kv.Get(context.Background(), keyLastUpdated)
require.NoError(t, err)
require.True(t, ok)
storeTs, err := time.Parse(time.RFC3339, marshaledStoreTs)
require.NoError(t, err)
require.WithinDuration(t, time.Now(), storeTs, time.Second*10)
})
t.Run("ListKeys", func(t *testing.T) {
t.Run("returns empty list if no keys", func(t *testing.T) {
keys, err := store.ListKeys(context.Background())
require.NoError(t, err)
require.Empty(t, keys)
})
t.Run("returns the keys", func(t *testing.T) {
expectedKeys := make([]string, 0, 10)
for i := 0; i < 10; i++ {
k := fmt.Sprintf("key-%d", i)
err := store.Set(context.Background(), k, fmt.Sprintf("value-%d", i))
expectedKeys = append(expectedKeys, k)
require.NoError(t, err)
}
keys, err := store.ListKeys(context.Background())
require.NoError(t, err)
sort.Strings(expectedKeys)
sort.Strings(keys)
require.Equal(t, expectedKeys, keys)
})
})
})
t.Run("prefix", func(t *testing.T) {
t.Run("no prefix", func(t *testing.T) {
store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace)
require.Equal(t, "k", store.storeKey("k"))
})
t.Run("prefix", func(t *testing.T) {
store := NewCacheKvStoreWithPrefix(kvstore.NewFakeKVStore(), namespace, "my-")
require.Equal(t, "my-k", store.storeKey("k"))
})
})
}
func TestMarshal(t *testing.T) {
t.Run("json", func(t *testing.T) {
// Other type (rather than string, []byte or fmt.Stringer) marshals to JSON.
var value struct {
A string `json:"a"`
B string `json:"b"`
}
expV, err := json.Marshal(value)
require.NoError(t, err)
v, err := marshal(value)
require.NoError(t, err)
require.Equal(t, string(expV), v)
})
t.Run("string", func(t *testing.T) {
v, err := marshal("value")
require.NoError(t, err)
require.Equal(t, "value", v)
})
t.Run("stringer", func(t *testing.T) {
var s stringer
v, err := marshal(s)
require.NoError(t, err)
require.Equal(t, s.String(), v)
})
t.Run("byte slice", func(t *testing.T) {
v, err := marshal([]byte("value"))
require.NoError(t, err)
require.Equal(t, "value", v)
})
}
type stringer struct{}
func (s stringer) String() string {
return "aaaa"
}

View File

@ -0,0 +1,3 @@
// Package cachekvstore implements a key-value store that also keeps track of the last update time of the store.
// It can be used to cache data that is updated periodically.
package cachekvstore

View File

@ -101,7 +101,7 @@ func (kr *KeyRetriever) updateKeys(ctx context.Context) error {
if err != nil {
return err
}
if !kr.cfg.PluginForcePublicKeyDownload && time.Since(*lastUpdated) < publicKeySyncInterval {
if !kr.cfg.PluginForcePublicKeyDownload && time.Since(lastUpdated) < publicKeySyncInterval {
// Cache is still valid
return nil
}
@ -170,15 +170,13 @@ func (kr *KeyRetriever) downloadKeys(ctx context.Context) error {
// Delete keys that are no longer in the API
for _, key := range cachedKeys {
if !shouldKeep[key] {
err = kr.kv.Del(ctx, key)
err = kr.kv.Delete(ctx, key)
if err != nil {
return err
}
}
}
// Update the last updated timestamp
return kr.kv.SetLastUpdated(ctx)
return nil
}
func (kr *KeyRetriever) ensureKeys(ctx context.Context) error {

View File

@ -79,7 +79,7 @@ func Test_PublicKeyUpdate(t *testing.T) {
defer v.lock.Unlock()
ti, err := v.kv.GetLastUpdated(context.Background())
require.NoError(t, err)
require.Less(t, time.Time{}, *ti)
require.Less(t, time.Time{}, ti)
})
t.Run("it should remove old keys", func(t *testing.T) {

View File

@ -1,74 +1,25 @@
package keystore
import (
"context"
"fmt"
"time"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/pluginsintegration/cachekvstore"
)
// Service is a service for storing and retrieving public keys.
type Service struct {
kv *kvstore.NamespacedKVStore
*cachekvstore.CacheKvStore
}
const (
prefix = "key-"
lastUpdatedKey = "last_updated"
namespace = "plugin.publickeys"
prefix = "key-"
)
var _ plugins.KeyStore = (*Service)(nil)
func ProvideService(kv kvstore.KVStore) *Service {
return &Service{
kv: kvstore.WithNamespace(kv, 0, "plugin.publickeys"),
CacheKvStore: cachekvstore.NewCacheKvStoreWithPrefix(kv, namespace, prefix),
}
}
func (s *Service) Get(ctx context.Context, key string) (string, bool, error) {
return s.kv.Get(ctx, prefix+key)
}
func (s *Service) Set(ctx context.Context, key string, value string) error {
return s.kv.Set(ctx, prefix+key, value)
}
func (s *Service) Del(ctx context.Context, key string) error {
return s.kv.Del(ctx, prefix+key)
}
func (s *Service) GetLastUpdated(ctx context.Context) (*time.Time, error) {
lastUpdated := &time.Time{}
if val, ok, err := s.kv.Get(ctx, lastUpdatedKey); err != nil {
return nil, fmt.Errorf("failed to get last updated time: %v", err)
} else if ok {
if parsed, err := time.Parse(time.RFC3339, val); err != nil {
return nil, fmt.Errorf("failed to parse last updated time: %v", err)
} else {
lastUpdated = &parsed
}
}
return lastUpdated, nil
}
func (s *Service) SetLastUpdated(ctx context.Context) error {
lastUpdated := time.Now()
if err := s.kv.Set(ctx, lastUpdatedKey, lastUpdated.Format(time.RFC3339)); err != nil {
return fmt.Errorf("failed to update last updated time: %v", err)
}
return nil
}
func (s *Service) ListKeys(ctx context.Context) ([]string, error) {
keys, err := s.kv.Keys(ctx, prefix)
if err != nil {
return nil, err
}
res := make([]string, 0, len(keys))
for _, key := range keys {
res = append(res, key.Key)
}
return res, nil
}