mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
add key/value store service (#36868)
* add key/value store service * don't export kvStoreSQL, consumers should interact with KVStore & NamespacedKVStore * add del method, avoid ErrNotFound (#38627) * switch value column to medium text Co-authored-by: Alexander Emelin <frvzmb@gmail.com>
This commit is contained in:
parent
dd24995852
commit
681de1ea89
50
pkg/infra/kvstore/kvstore.go
Normal file
50
pkg/infra/kvstore/kvstore.go
Normal file
@ -0,0 +1,50 @@
|
||||
package kvstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
|
||||
func ProvideService(sqlStore *sqlstore.SQLStore) KVStore {
|
||||
return &kvStoreSQL{
|
||||
sqlStore: sqlStore,
|
||||
log: log.New("infra.kvstore.sql"),
|
||||
}
|
||||
}
|
||||
|
||||
// KVStore is an interface for k/v store.
|
||||
type KVStore interface {
|
||||
Get(ctx context.Context, orgId int64, namespace string, key string) (string, bool, error)
|
||||
Set(ctx context.Context, orgId int64, namespace string, key string, value string) error
|
||||
Del(ctx context.Context, orgId int64, namespace string, key string) error
|
||||
}
|
||||
|
||||
// WithNamespace returns a kvstore wrapper with fixed orgId and namespace.
|
||||
func WithNamespace(kv KVStore, orgId int64, namespace string) *NamespacedKVStore {
|
||||
return &NamespacedKVStore{
|
||||
kvStore: kv,
|
||||
orgId: orgId,
|
||||
namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// NamespacedKVStore is a KVStore wrapper with fixed orgId and namespace.
|
||||
type NamespacedKVStore struct {
|
||||
kvStore KVStore
|
||||
orgId int64
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (kv *NamespacedKVStore) Get(ctx context.Context, key string) (string, bool, error) {
|
||||
return kv.kvStore.Get(ctx, kv.orgId, kv.namespace, key)
|
||||
}
|
||||
|
||||
func (kv *NamespacedKVStore) Set(ctx context.Context, key string, value string) error {
|
||||
return kv.kvStore.Set(ctx, kv.orgId, kv.namespace, key, value)
|
||||
}
|
||||
|
||||
func (kv *NamespacedKVStore) Del(ctx context.Context, key string) error {
|
||||
return kv.kvStore.Del(ctx, kv.orgId, kv.namespace, key)
|
||||
}
|
168
pkg/infra/kvstore/kvstore_test.go
Normal file
168
pkg/infra/kvstore/kvstore_test.go
Normal file
@ -0,0 +1,168 @@
|
||||
package kvstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
|
||||
func createTestableKVStore(t *testing.T) KVStore {
|
||||
t.Helper()
|
||||
|
||||
sqlStore := sqlstore.InitTestDB(t)
|
||||
|
||||
kv := &kvStoreSQL{
|
||||
sqlStore: sqlStore,
|
||||
log: log.New("infra.kvstore.sql"),
|
||||
}
|
||||
|
||||
return kv
|
||||
}
|
||||
|
||||
type TestCase struct {
|
||||
OrgId int64
|
||||
Namespace string
|
||||
Key string
|
||||
Revision int64
|
||||
}
|
||||
|
||||
func (t *TestCase) Value() string {
|
||||
return fmt.Sprintf("%d:%s:%s:%d", t.OrgId, t.Namespace, t.Key, t.Revision)
|
||||
}
|
||||
|
||||
func TestKVStore(t *testing.T) {
|
||||
kv := createTestableKVStore(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []*TestCase{
|
||||
{
|
||||
OrgId: 0,
|
||||
Namespace: "testing1",
|
||||
Key: "key1",
|
||||
},
|
||||
{
|
||||
OrgId: 0,
|
||||
Namespace: "testing2",
|
||||
Key: "key1",
|
||||
},
|
||||
{
|
||||
OrgId: 1,
|
||||
Namespace: "testing1",
|
||||
Key: "key1",
|
||||
},
|
||||
{
|
||||
OrgId: 1,
|
||||
Namespace: "testing3",
|
||||
Key: "key1",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
err := kv.Set(ctx, tc.OrgId, tc.Namespace, tc.Key, tc.Value())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("get existing keys", func(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
value, ok, err := kv.Get(ctx, tc.OrgId, tc.Namespace, tc.Key)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tc.Value(), value)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("get nonexistent keys", func(t *testing.T) {
|
||||
tcs := []*TestCase{
|
||||
{
|
||||
OrgId: 0,
|
||||
Namespace: "testing1",
|
||||
Key: "key2",
|
||||
},
|
||||
{
|
||||
OrgId: 1,
|
||||
Namespace: "testing2",
|
||||
Key: "key1",
|
||||
},
|
||||
{
|
||||
OrgId: 1,
|
||||
Namespace: "testing3",
|
||||
Key: "key2",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
value, ok, err := kv.Get(ctx, tc.OrgId, tc.Namespace, tc.Key)
|
||||
require.Nil(t, err)
|
||||
require.False(t, ok)
|
||||
require.Equal(t, "", value)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("modify existing key", func(t *testing.T) {
|
||||
tc := testCases[0]
|
||||
|
||||
value, ok, err := kv.Get(ctx, tc.OrgId, tc.Namespace, tc.Key)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, tc.Value(), value)
|
||||
|
||||
tc.Revision += 1
|
||||
|
||||
err = kv.Set(ctx, tc.OrgId, tc.Namespace, tc.Key, tc.Value())
|
||||
require.NoError(t, err)
|
||||
|
||||
value, ok, err = kv.Get(ctx, tc.OrgId, tc.Namespace, tc.Key)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, tc.Value(), value)
|
||||
})
|
||||
|
||||
t.Run("use namespaced client", func(t *testing.T) {
|
||||
tc := testCases[0]
|
||||
|
||||
client := WithNamespace(kv, tc.OrgId, tc.Namespace)
|
||||
|
||||
value, ok, err := client.Get(ctx, tc.Key)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tc.Value(), value)
|
||||
|
||||
tc.Revision += 1
|
||||
|
||||
err = client.Set(ctx, tc.Key, tc.Value())
|
||||
require.NoError(t, err)
|
||||
|
||||
value, ok, err = client.Get(ctx, tc.Key)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, tc.Value(), value)
|
||||
})
|
||||
|
||||
t.Run("deleting keys", func(t *testing.T) {
|
||||
var stillHasKeys bool
|
||||
for _, tc := range testCases {
|
||||
if _, ok, err := kv.Get(ctx, tc.OrgId, tc.Namespace, tc.Key); err == nil && ok {
|
||||
stillHasKeys = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, stillHasKeys,
|
||||
"we are going to test key deletion, but there are no keys to delete in the database")
|
||||
for _, tc := range testCases {
|
||||
err := kv.Del(ctx, tc.OrgId, tc.Namespace, tc.Key)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
_, ok, err := kv.Get(ctx, tc.OrgId, tc.Namespace, tc.Key)
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok, "all keys should be deleted at this point")
|
||||
}
|
||||
})
|
||||
}
|
21
pkg/infra/kvstore/model.go
Normal file
21
pkg/infra/kvstore/model.go
Normal file
@ -0,0 +1,21 @@
|
||||
package kvstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Item stored in k/v store.
|
||||
type Item struct {
|
||||
Id int64
|
||||
OrgId *int64
|
||||
Namespace *string
|
||||
Key *string
|
||||
Value string
|
||||
|
||||
Created time.Time
|
||||
Updated time.Time
|
||||
}
|
||||
|
||||
func (i *Item) TableName() string {
|
||||
return "kv_store"
|
||||
}
|
95
pkg/infra/kvstore/sql.go
Normal file
95
pkg/infra/kvstore/sql.go
Normal file
@ -0,0 +1,95 @@
|
||||
package kvstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
|
||||
// kvStoreSQL provides a key/value store backed by the Grafana database
|
||||
type kvStoreSQL struct {
|
||||
log log.Logger
|
||||
sqlStore *sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// Get an item from the store
|
||||
func (kv *kvStoreSQL) Get(ctx context.Context, orgId int64, namespace string, key string) (string, bool, error) {
|
||||
item := Item{
|
||||
OrgId: &orgId,
|
||||
Namespace: &namespace,
|
||||
Key: &key,
|
||||
}
|
||||
var itemFound bool
|
||||
|
||||
err := kv.sqlStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||
has, err := dbSession.Get(&item)
|
||||
if err != nil {
|
||||
kv.log.Debug("error getting kvstore value", "orgId", orgId, "namespace", namespace, "key", key, "err", err)
|
||||
return err
|
||||
}
|
||||
if !has {
|
||||
kv.log.Debug("kvstore value not found", "orgId", orgId, "namespace", namespace, "key", key)
|
||||
return nil
|
||||
}
|
||||
itemFound = true
|
||||
kv.log.Debug("got kvstore value", "orgId", orgId, "namespace", namespace, "key", key, "value", item.Value)
|
||||
return nil
|
||||
})
|
||||
|
||||
return item.Value, itemFound, err
|
||||
}
|
||||
|
||||
// Set an item in the store
|
||||
func (kv *kvStoreSQL) Set(ctx context.Context, orgId int64, namespace string, key string, value string) error {
|
||||
return kv.sqlStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||
item := Item{
|
||||
OrgId: &orgId,
|
||||
Namespace: &namespace,
|
||||
Key: &key,
|
||||
}
|
||||
|
||||
has, err := dbSession.Get(&item)
|
||||
if err != nil {
|
||||
kv.log.Debug("error checking kvstore value", "orgId", orgId, "namespace", namespace, "key", key, "value", value, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if has && item.Value == value {
|
||||
kv.log.Debug("kvstore value not changed", "orgId", orgId, "namespace", namespace, "key", key, "value", value)
|
||||
return nil
|
||||
}
|
||||
|
||||
item.Value = value
|
||||
item.Updated = time.Now()
|
||||
|
||||
if has {
|
||||
_, err = dbSession.ID(item.Id).Update(&item)
|
||||
if err != nil {
|
||||
kv.log.Debug("error updating kvstore value", "orgId", orgId, "namespace", namespace, "key", key, "value", value, "err", err)
|
||||
} else {
|
||||
kv.log.Debug("kvstore value updated", "orgId", orgId, "namespace", namespace, "key", key, "value", value)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
item.Created = item.Updated
|
||||
_, err = dbSession.Insert(&item)
|
||||
if err != nil {
|
||||
kv.log.Debug("error inserting kvstore value", "orgId", orgId, "namespace", namespace, "key", key, "value", value, "err", err)
|
||||
} else {
|
||||
kv.log.Debug("kvstore value inserted", "orgId", orgId, "namespace", namespace, "key", key, "value", value)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// Del deletes an item from the store.
|
||||
func (kv *kvStoreSQL) Del(ctx context.Context, orgId int64, namespace string, key string) error {
|
||||
err := kv.sqlStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||
_, err := dbSession.Exec("DELETE FROM kv_store WHERE org_id=? and namespace=? and key=?", orgId, namespace, key)
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient/httpclientprovider"
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/localcache"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/remotecache"
|
||||
@ -79,6 +80,7 @@ var wireBasicSet = wire.NewSet(
|
||||
routing.ProvideRegister,
|
||||
wire.Bind(new(routing.RouteRegister), new(*routing.RouteRegisterImpl)),
|
||||
hooks.ProvideService,
|
||||
kvstore.ProvideService,
|
||||
localcache.ProvideService,
|
||||
usagestats.ProvideService,
|
||||
wire.Bind(new(usagestats.UsageStats), new(*usagestats.UsageStatsService)),
|
||||
|
27
pkg/services/sqlstore/migrations/kv_store_mig.go
Normal file
27
pkg/services/sqlstore/migrations/kv_store_mig.go
Normal file
@ -0,0 +1,27 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
)
|
||||
|
||||
func addKVStoreMigrations(mg *Migrator) {
|
||||
kvStoreV1 := Table{
|
||||
Name: "kv_store",
|
||||
Columns: []*Column{
|
||||
{Name: "id", Type: DB_BigInt, Nullable: false, IsPrimaryKey: true, IsAutoIncrement: true},
|
||||
{Name: "org_id", Type: DB_BigInt, Nullable: false},
|
||||
{Name: "namespace", Type: DB_NVarchar, Length: 190, Nullable: false},
|
||||
{Name: "key", Type: DB_NVarchar, Length: 190, Nullable: false},
|
||||
{Name: "value", Type: DB_MediumText, Nullable: false},
|
||||
{Name: "created", Type: DB_DateTime, Nullable: false},
|
||||
{Name: "updated", Type: DB_DateTime, Nullable: false},
|
||||
},
|
||||
Indices: []*Index{
|
||||
{Cols: []string{"org_id", "namespace", "key"}, Type: UniqueIndex},
|
||||
},
|
||||
}
|
||||
|
||||
mg.AddMigration("create kv_store table v1", NewAddTableMigration(kvStoreV1))
|
||||
|
||||
mg.AddMigration("add index kv_store.org_id-namespace-key", NewAddIndexMigration(kvStoreV1, kvStoreV1.Indices[0]))
|
||||
}
|
@ -52,6 +52,7 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
|
||||
addLiveChannelMigrations(mg)
|
||||
}
|
||||
ualert.RerunDashAlertMigration(mg)
|
||||
addKVStoreMigrations(mg)
|
||||
}
|
||||
|
||||
func addMigrationLogMigrations(mg *Migrator) {
|
||||
|
Loading…
Reference in New Issue
Block a user