mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
Implement plugin KV store in memory (#26244)
This commit is contained in:
parent
f9861b8666
commit
02379b17ca
@ -2,7 +2,6 @@ package common
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/mattermost/mattermost/server/public/pluginapi"
|
"github.com/mattermost/mattermost/server/public/pluginapi"
|
||||||
)
|
)
|
||||||
@ -11,9 +10,6 @@ var ErrNotFound = errors.New("not found")
|
|||||||
|
|
||||||
type KVStore interface {
|
type KVStore interface {
|
||||||
Set(key string, value interface{}, options ...pluginapi.KVSetOption) (bool, error)
|
Set(key string, value interface{}, options ...pluginapi.KVSetOption) (bool, error)
|
||||||
SetWithExpiry(key string, value interface{}, ttl time.Duration) error
|
|
||||||
CompareAndSet(key string, oldValue, value interface{}) (bool, error)
|
|
||||||
CompareAndDelete(key string, oldValue interface{}) (bool, error)
|
|
||||||
Get(key string, o interface{}) error
|
Get(key string, o interface{}) error
|
||||||
Delete(key string) error
|
Delete(key string) error
|
||||||
DeleteAll() error
|
DeleteAll() error
|
||||||
|
@ -22,7 +22,6 @@ type KVService struct {
|
|||||||
api plugin.API
|
api plugin.API
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Should this be un exported?
|
|
||||||
type KVSetOptions struct {
|
type KVSetOptions struct {
|
||||||
model.PluginKVSetOptions
|
model.PluginKVSetOptions
|
||||||
oldValue interface{}
|
oldValue interface{}
|
||||||
@ -49,7 +48,7 @@ func SetExpiry(ttl time.Duration) KVSetOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a key-value pair, unique per plugin.
|
// Set stores a key-value pair, unique per plugin.
|
||||||
// Keys prefixed with `mmi_` are reserved for use by this package and will fail to be set.
|
// Keys prefixed with `mmi_` are reserved for internal use and will fail to be set.
|
||||||
//
|
//
|
||||||
// Returns (false, err) if DB error occurred
|
// Returns (false, err) if DB error occurred
|
||||||
// Returns (false, nil) if the value was not set
|
// Returns (false, nil) if the value was not set
|
||||||
@ -57,8 +56,8 @@ func SetExpiry(ttl time.Duration) KVSetOption {
|
|||||||
//
|
//
|
||||||
// Minimum server version: 5.18
|
// Minimum server version: 5.18
|
||||||
func (k *KVService) Set(key string, value interface{}, options ...KVSetOption) (bool, error) {
|
func (k *KVService) Set(key string, value interface{}, options ...KVSetOption) (bool, error) {
|
||||||
if strings.HasPrefix(key, "mmi_") {
|
if strings.HasPrefix(key, internalKeyPrefix) {
|
||||||
return false, errors.New("'mmi_' prefix is not allowed for keys")
|
return false, errors.Errorf("'%s' prefix is not allowed for keys", internalKeyPrefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := KVSetOptions{}
|
opts := KVSetOptions{}
|
||||||
@ -103,46 +102,6 @@ func (k *KVService) Set(key string, value interface{}, options ...KVSetOption) (
|
|||||||
return written, normalizeAppErr(appErr)
|
return written, normalizeAppErr(appErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWithExpiry sets a key-value pair with the given expiration duration relative to now.
|
|
||||||
//
|
|
||||||
// Deprecated: SetWithExpiry exists to streamline adoption of this package for existing plugins.
|
|
||||||
// Use Set with the appropriate options instead.
|
|
||||||
//
|
|
||||||
// Minimum server version: 5.18
|
|
||||||
func (k *KVService) SetWithExpiry(key string, value interface{}, ttl time.Duration) error {
|
|
||||||
_, err := k.Set(key, value, SetExpiry(ttl))
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// CompareAndSet writes a key-value pair if the current value matches the given old value.
|
|
||||||
//
|
|
||||||
// Returns (false, err) if DB error occurred
|
|
||||||
// Returns (false, nil) if the value was not set
|
|
||||||
// Returns (true, nil) if the value was set
|
|
||||||
//
|
|
||||||
// Deprecated: CompareAndSet exists to streamline adoption of this package for existing plugins.
|
|
||||||
// Use Set with the appropriate options instead.
|
|
||||||
//
|
|
||||||
// Minimum server version: 5.18
|
|
||||||
func (k *KVService) CompareAndSet(key string, oldValue, value interface{}) (bool, error) {
|
|
||||||
return k.Set(key, value, SetAtomic(oldValue))
|
|
||||||
}
|
|
||||||
|
|
||||||
// CompareAndDelete deletes a key-value pair if the current value matches the given old value.
|
|
||||||
//
|
|
||||||
// Returns (false, err) if DB error occurred
|
|
||||||
// Returns (false, nil) if current value != oldValue or key does not exist when deleting
|
|
||||||
// Returns (true, nil) if current value == oldValue and the key was deleted
|
|
||||||
//
|
|
||||||
// Deprecated: CompareAndDelete exists to streamline adoption of this package for existing plugins.
|
|
||||||
// Use Set with the appropriate options instead.
|
|
||||||
//
|
|
||||||
// Minimum server version: 5.18
|
|
||||||
func (k *KVService) CompareAndDelete(key string, oldValue interface{}) (bool, error) {
|
|
||||||
return k.Set(key, nil, SetAtomic(oldValue))
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetAtomicWithRetries will set a key-value pair atomically using compare and set semantics:
|
// SetAtomicWithRetries will set a key-value pair atomically using compare and set semantics:
|
||||||
// it will read key's value (to get oldValue), perform valueFunc (to get newValue),
|
// it will read key's value (to get oldValue), perform valueFunc (to get newValue),
|
||||||
// and compare and set (comparing oldValue and setting newValue).
|
// and compare and set (comparing oldValue and setting newValue).
|
||||||
|
284
server/public/pluginapi/kv_memory.go
Normal file
284
server/public/pluginapi/kv_memory.go
Normal file
@ -0,0 +1,284 @@
|
|||||||
|
package pluginapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
|
"github.com/mattermost/mattermost/server/public/model"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MemoryStore is an implementation of the plugin KV store API for testing.
|
||||||
|
// It's not meant for production use.
|
||||||
|
// It's safe for concurrent use by multiple goroutines.
|
||||||
|
type MemoryStore struct {
|
||||||
|
mux sync.RWMutex
|
||||||
|
elems map[string]kvElem
|
||||||
|
}
|
||||||
|
|
||||||
|
type kvElem struct {
|
||||||
|
value []byte
|
||||||
|
expiresAt *time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e kvElem) isExpired() bool {
|
||||||
|
return e.expiresAt != nil && e.expiresAt.Before(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set stores a key-value pair, unique per plugin.
|
||||||
|
// Keys prefixed with `mmi_` are reserved for internal use and will fail to be set.
|
||||||
|
//
|
||||||
|
// Returns (false, err) if DB error occurred
|
||||||
|
// Returns (false, nil) if the value was not set
|
||||||
|
// Returns (true, nil) if the value was set
|
||||||
|
func (s *MemoryStore) Set(key string, value any, options ...KVSetOption) (bool, error) {
|
||||||
|
if key == "" {
|
||||||
|
return false, errors.New("key must not be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.HasPrefix(key, internalKeyPrefix) {
|
||||||
|
return false, errors.Errorf("'%s' prefix is not allowed for keys", internalKeyPrefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
if utf8.RuneCountInString(key) > model.KeyValueKeyMaxRunes {
|
||||||
|
return false, errors.Errorf("key must not be longer then %d", model.KeyValueKeyMaxRunes)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := KVSetOptions{}
|
||||||
|
for _, o := range options {
|
||||||
|
if o != nil {
|
||||||
|
o(&opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var valueBytes []byte
|
||||||
|
if value != nil {
|
||||||
|
// Assume JSON encoding, unless explicitly given a byte slice.
|
||||||
|
var isValueInBytes bool
|
||||||
|
valueBytes, isValueInBytes = value.([]byte)
|
||||||
|
if !isValueInBytes {
|
||||||
|
var err error
|
||||||
|
valueBytes, err = json.Marshal(value)
|
||||||
|
if err != nil {
|
||||||
|
return false, errors.Wrapf(err, "failed to marshal value %v", value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
downstreamOpts := model.PluginKVSetOptions{
|
||||||
|
Atomic: opts.Atomic,
|
||||||
|
ExpireInSeconds: opts.ExpireInSeconds,
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.oldValue != nil {
|
||||||
|
oldValueBytes, isOldValueInBytes := opts.oldValue.([]byte)
|
||||||
|
if isOldValueInBytes {
|
||||||
|
downstreamOpts.OldValue = oldValueBytes
|
||||||
|
} else {
|
||||||
|
data, err := json.Marshal(opts.oldValue)
|
||||||
|
if err != nil {
|
||||||
|
return false, errors.Wrapf(err, "failed to marshal value %v", opts.oldValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
downstreamOpts.OldValue = data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := downstreamOpts.IsValid(); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mux.Lock()
|
||||||
|
defer s.mux.Unlock()
|
||||||
|
|
||||||
|
if s.elems == nil {
|
||||||
|
s.elems = make(map[string]kvElem)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !opts.Atomic {
|
||||||
|
if value == nil {
|
||||||
|
delete(s.elems, key)
|
||||||
|
} else {
|
||||||
|
s.elems[key] = kvElem{
|
||||||
|
value: valueBytes,
|
||||||
|
expiresAt: expireTime(downstreamOpts.ExpireInSeconds),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
oldElem := s.elems[key]
|
||||||
|
if !oldElem.isExpired() && !bytes.Equal(oldElem.value, downstreamOpts.OldValue) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if value == nil {
|
||||||
|
delete(s.elems, key)
|
||||||
|
} else {
|
||||||
|
s.elems[key] = kvElem{
|
||||||
|
value: valueBytes,
|
||||||
|
expiresAt: expireTime(downstreamOpts.ExpireInSeconds),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MemoryStore) SetAtomicWithRetries(key string, valueFunc func(oldValue []byte) (newValue any, err error)) error {
|
||||||
|
if valueFunc == nil {
|
||||||
|
return errors.New("function must not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < numRetries; i++ {
|
||||||
|
var oldVal []byte
|
||||||
|
if err := s.Get(key, &oldVal); err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to get value for key %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
newVal, err := valueFunc(oldVal)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "valueFunc failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if saved, err := s.Set(key, newVal, SetAtomic(oldVal)); err != nil {
|
||||||
|
return errors.Wrapf(err, "DB failed to set value for key %s", key)
|
||||||
|
} else if saved {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// small delay to allow cooperative scheduling to do its thing
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
return errors.Errorf("failed to set value after %d retries", numRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MemoryStore) ListKeys(page int, count int, options ...ListKeysOption) ([]string, error) {
|
||||||
|
if page < 0 {
|
||||||
|
return nil, errors.New("page number must not be negative")
|
||||||
|
}
|
||||||
|
|
||||||
|
if count < 0 {
|
||||||
|
return nil, errors.New("count must not be negative")
|
||||||
|
}
|
||||||
|
|
||||||
|
if count == 0 {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
opt := listKeysOptions{}
|
||||||
|
for _, o := range options {
|
||||||
|
if o != nil {
|
||||||
|
o(&opt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
allKeys := make([]string, 0)
|
||||||
|
s.mux.RLock()
|
||||||
|
for k, e := range s.elems {
|
||||||
|
if e.isExpired() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
allKeys = append(allKeys, k)
|
||||||
|
}
|
||||||
|
s.mux.RUnlock()
|
||||||
|
|
||||||
|
if len(allKeys) == 0 {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Use slices.Sort once the toolchain got updated to go1.21
|
||||||
|
sort.Strings(allKeys)
|
||||||
|
|
||||||
|
pageKeys := paginateSlice(allKeys, page, count)
|
||||||
|
|
||||||
|
if len(opt.checkers) == 0 {
|
||||||
|
return pageKeys, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
for _, k := range pageKeys {
|
||||||
|
keep := true
|
||||||
|
for _, c := range opt.checkers {
|
||||||
|
ok, err := c(k)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
keep = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if keep {
|
||||||
|
pageKeys[n] = k
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pageKeys[:n], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MemoryStore) Get(key string, o any) error {
|
||||||
|
s.mux.RLock()
|
||||||
|
e, ok := s.elems[key]
|
||||||
|
s.mux.RUnlock()
|
||||||
|
if !ok || len(e.value) == 0 || e.isExpired() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytesOut, ok := o.(*[]byte); ok {
|
||||||
|
*bytesOut = e.value
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(e.value, o); err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to unmarshal value for key %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MemoryStore) Delete(key string) error {
|
||||||
|
s.mux.Lock()
|
||||||
|
delete(s.elems, key)
|
||||||
|
s.mux.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteAll removes all key-value pairs.
|
||||||
|
func (s *MemoryStore) DeleteAll() error {
|
||||||
|
s.mux.Lock()
|
||||||
|
s.elems = make(map[string]kvElem)
|
||||||
|
s.mux.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func expireTime(expireInSeconds int64) *time.Time {
|
||||||
|
if expireInSeconds == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
t := time.Now().Add(time.Second * time.Duration(expireInSeconds))
|
||||||
|
return &t
|
||||||
|
}
|
||||||
|
|
||||||
|
func paginateSlice[T any](list []T, page int, perPage int) []T {
|
||||||
|
i := page * perPage
|
||||||
|
j := (page + 1) * perPage
|
||||||
|
l := len(list)
|
||||||
|
if j > l {
|
||||||
|
j = l
|
||||||
|
}
|
||||||
|
if i > l {
|
||||||
|
i = l
|
||||||
|
}
|
||||||
|
return list[i:j]
|
||||||
|
}
|
469
server/public/pluginapi/kv_memory_test.go
Normal file
469
server/public/pluginapi/kv_memory_test.go
Normal file
@ -0,0 +1,469 @@
|
|||||||
|
package pluginapi_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/mattermost/mattermost/server/public/pluginapi"
|
||||||
|
)
|
||||||
|
|
||||||
|
// kvStore is used to check that KVService and MemoryStore implement the same interface.
|
||||||
|
// Methods names are sorted alphabetically for easier comparison.
|
||||||
|
type kvStore interface {
|
||||||
|
Delete(key string) error
|
||||||
|
DeleteAll() error
|
||||||
|
Get(key string, o any) error
|
||||||
|
ListKeys(page, count int, options ...pluginapi.ListKeysOption) ([]string, error)
|
||||||
|
Set(key string, value any, options ...pluginapi.KVSetOption) (bool, error)
|
||||||
|
SetAtomicWithRetries(key string, valueFunc func(oldValue []byte) (newValue any, err error)) error
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ kvStore = (*pluginapi.MemoryStore)(nil)
|
||||||
|
var _ kvStore = (*pluginapi.KVService)(nil)
|
||||||
|
|
||||||
|
func TestMemoryStoreSet(t *testing.T) {
|
||||||
|
t.Run("empty key", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
ok, err := store.Set("", []byte("value"))
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.False(t, ok)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("key has mmi_ prefix", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
ok, err := store.Set("mmi_foo", []byte("value"))
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.False(t, ok)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("nil map", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
ok, err := store.Set("key", []byte("value"))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
var out []byte
|
||||||
|
err = store.Get("key", &out)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte("value"), out)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("atomic with no old value", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
ok, err := store.Set("key", []byte("value"), pluginapi.SetAtomic([]byte("old")))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, ok)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("atomic with same old value", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
|
||||||
|
ok, err := store.Set("key", []byte("old"))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
ok, err = store.Set("key", []byte("new"), pluginapi.SetAtomic([]byte("old")))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
var out []byte
|
||||||
|
err = store.Get("key", &out)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte("new"), out)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("setting to nil is deleting", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
|
||||||
|
ok, err := store.Set("key", []byte("value"))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
ok, err = store.Set("key", nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("atomicly setting to nil is deleting", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
|
||||||
|
ok, err := store.Set("key", []byte("old"))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
ok, err = store.Set("key", nil, pluginapi.SetAtomic([]byte("old")))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with long expiry", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
|
||||||
|
ok, err := store.Set("key", []byte("value"), pluginapi.SetExpiry(time.Minute))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
var out []byte
|
||||||
|
err = store.Get("key", &out)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte("value"), out)
|
||||||
|
|
||||||
|
ok, err = store.Set("key", []byte("value"), pluginapi.SetExpiry(time.Second))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("concurrent writes", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
const n = 100
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
i := i
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), []byte("value"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
var out []byte
|
||||||
|
err := store.Get(fmt.Sprintf("k_%d", i), &out)
|
||||||
|
assert.NoError(t, err, "i=%d", i)
|
||||||
|
assert.Equal(t, []byte("value"), out, "i=%d", i)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryStoreSetAtomicWithRetries(t *testing.T) {
|
||||||
|
t.Run("nil function", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
err := store.SetAtomicWithRetries("key", nil)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("old value not found", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
err := store.SetAtomicWithRetries("key", func(oldValue []byte) (any, error) { return []byte("new"), nil })
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var out []byte
|
||||||
|
err = store.Get("key", &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte("new"), out)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("old value not found", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
err := store.SetAtomicWithRetries("key", func(oldValue []byte) (any, error) { return nil, errors.New("some error") })
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("two goroutines race", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
const n = 10
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
i := i
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
err := store.SetAtomicWithRetries("key", func(oldValue []byte) (any, error) { return fmt.Sprintf("k_%d", i), nil })
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// It undefinded, which goroutine wins the final write. Just check that any value was written.
|
||||||
|
var out string
|
||||||
|
err := store.Get("key", &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, strings.HasPrefix(out, "k_"))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryStoreListKeys(t *testing.T) {
|
||||||
|
t.Run("nil map", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
keys, err := store.ListKeys(0, 200)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, keys, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("zero count", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
keys, err := store.ListKeys(1, 0)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, keys, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("negative count", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
keys, err := store.ListKeys(0, -1)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Len(t, keys, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("negative page", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
keys, err := store.ListKeys(-1, 200)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Len(t, keys, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("single page", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
keys, err := store.ListKeys(0, 200)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, keys, 10)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("multiple pages", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
keys, err := store.ListKeys(0, 3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_0", "k_1", "k_2"}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(1, 3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_3", "k_4", "k_5"}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(2, 3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_6"}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(5, 100)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{}, keys)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with checker", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
odd := func(key string) (bool, error) {
|
||||||
|
s := strings.Split(key, "_")
|
||||||
|
if len(s) != 2 {
|
||||||
|
return false, errors.Errorf("wrongly formated key %v", key)
|
||||||
|
}
|
||||||
|
i, err := strconv.Atoi(s[1])
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return i%2 == 1, nil
|
||||||
|
}
|
||||||
|
even := func(key string) (bool, error) {
|
||||||
|
s := strings.Split(key, "_")
|
||||||
|
if len(s) != 2 {
|
||||||
|
return false, errors.Errorf("wrongly formated key %v", key)
|
||||||
|
}
|
||||||
|
i, err := strconv.Atoi(s[1])
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return i%2 == 0, nil
|
||||||
|
}
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
keys, err := store.ListKeys(0, 3, pluginapi.WithChecker(even))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_0", "k_2"}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(0, 3, pluginapi.WithChecker(odd))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_1"}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(0, 3, pluginapi.WithChecker(odd), pluginapi.WithChecker(even))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(1, 3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_3", "k_4", "k_5"}, keys)
|
||||||
|
|
||||||
|
keys, err = store.ListKeys(2, 3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_6"}, keys)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with expired entries", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
var opt pluginapi.KVSetOption
|
||||||
|
if i%2 == 1 {
|
||||||
|
opt = pluginapi.SetExpiry(1 * time.Second)
|
||||||
|
}
|
||||||
|
ok, err := store.Set(fmt.Sprintf("k_%d", i), "foo", opt)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
keys, err := store.ListKeys(0, 5)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"k_0", "k_2", "k_4", "k_6"}, keys)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryStoreGet(t *testing.T) {
|
||||||
|
t.Run("nil map", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("set empty byte slice", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
in := []byte("")
|
||||||
|
|
||||||
|
ok, err := store.Set("key", in)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
isNil(t, &store, "key")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("set and get byte slice", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
in := []byte("foo")
|
||||||
|
|
||||||
|
ok, err := store.Set("key", in)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
var out []byte
|
||||||
|
err = store.Get("key", &out)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte("foo"), out)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("set and get struct slice", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
|
||||||
|
type myStruct struct {
|
||||||
|
Int int
|
||||||
|
String string
|
||||||
|
unExported bool
|
||||||
|
}
|
||||||
|
in := myStruct{
|
||||||
|
Int: 1,
|
||||||
|
String: "s",
|
||||||
|
unExported: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err := store.Set("key", in)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, ok)
|
||||||
|
|
||||||
|
var out myStruct
|
||||||
|
err = store.Get("key", &out)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, myStruct{Int: 1, String: "s"}, out)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryStoreDelete(t *testing.T) {
|
||||||
|
t.Run("nil map", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
err := store.Delete("some key")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryStoreDeleteAll(t *testing.T) {
|
||||||
|
t.Run("nil map", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
err := store.DeleteAll()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
keys, err := store.ListKeys(0, 200)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, keys)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("nil map", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
|
||||||
|
ok, err := store.Set("k_1", "foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
err = store.DeleteAll()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
keys, err := store.ListKeys(0, 200)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, keys)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("idempotent", func(t *testing.T) {
|
||||||
|
store := pluginapi.MemoryStore{}
|
||||||
|
err := store.DeleteAll()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = store.DeleteAll()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
keys, err := store.ListKeys(0, 200)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, keys)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func isNil(t *testing.T, store *pluginapi.MemoryStore, key string) {
|
||||||
|
var out []byte
|
||||||
|
err := store.Get(key, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Nil(t, out)
|
||||||
|
}
|
@ -7,7 +7,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -155,49 +154,6 @@ func TestKVSet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSetWithExpiry(t *testing.T) {
|
|
||||||
api := &plugintest.API{}
|
|
||||||
defer api.AssertExpectations(t)
|
|
||||||
client := pluginapi.NewClient(api, &plugintest.Driver{})
|
|
||||||
|
|
||||||
api.On("KVSetWithOptions", "1", []byte(`2`), model.PluginKVSetOptions{
|
|
||||||
ExpireInSeconds: 60,
|
|
||||||
}).Return(true, nil)
|
|
||||||
|
|
||||||
err := client.KV.SetWithExpiry("1", 2, time.Minute)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCompareAndSet(t *testing.T) {
|
|
||||||
api := &plugintest.API{}
|
|
||||||
defer api.AssertExpectations(t)
|
|
||||||
client := pluginapi.NewClient(api, &plugintest.Driver{})
|
|
||||||
|
|
||||||
api.On("KVSetWithOptions", "1", []byte("2"), model.PluginKVSetOptions{
|
|
||||||
Atomic: true,
|
|
||||||
OldValue: []byte("3"),
|
|
||||||
}).Return(true, nil)
|
|
||||||
|
|
||||||
upserted, err := client.KV.CompareAndSet("1", 3, 2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.True(t, upserted)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCompareAndDelete(t *testing.T) {
|
|
||||||
api := &plugintest.API{}
|
|
||||||
defer api.AssertExpectations(t)
|
|
||||||
client := pluginapi.NewClient(api, &plugintest.Driver{})
|
|
||||||
|
|
||||||
api.On("KVSetWithOptions", "1", []byte(nil), model.PluginKVSetOptions{
|
|
||||||
Atomic: true,
|
|
||||||
OldValue: []byte("2"),
|
|
||||||
}).Return(true, nil)
|
|
||||||
|
|
||||||
deleted, err := client.KV.CompareAndDelete("1", 2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.True(t, deleted)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSetAtomicWithRetries(t *testing.T) {
|
func TestSetAtomicWithRetries(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
Loading…
Reference in New Issue
Block a user