mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
cache: initial version of db cache
This commit is contained in:
parent
f9e3c33ebd
commit
d4f5789660
82
pkg/infra/distcache/database_storage.go
Normal file
82
pkg/infra/distcache/database_storage.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
package distcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type databaseCache struct {
|
||||||
|
SQLStore *sqlstore.SqlStore
|
||||||
|
}
|
||||||
|
|
||||||
|
var getTime = time.Now
|
||||||
|
|
||||||
|
func (dc *databaseCache) Get(key string) (interface{}, error) {
|
||||||
|
//now := getTime().Unix()
|
||||||
|
|
||||||
|
cacheHits := []CacheData{}
|
||||||
|
err := dc.SQLStore.NewSession().Where(`key = ?`, key).Find(&cacheHits)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var cacheHit CacheData
|
||||||
|
if len(cacheHits) == 0 {
|
||||||
|
return nil, ErrCacheItemNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheHit = cacheHits[0]
|
||||||
|
if cacheHit.Expires > 0 {
|
||||||
|
if getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires {
|
||||||
|
dc.Delete(key)
|
||||||
|
return nil, ErrCacheItemNotFound
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
item := &Item{}
|
||||||
|
if err = DecodeGob(cacheHit.Data, item); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return item.Val, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type CacheData struct {
|
||||||
|
Key string
|
||||||
|
Data []byte
|
||||||
|
Expires int64
|
||||||
|
CreatedAt int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dc *databaseCache) Put(key string, value interface{}, expire int64) error {
|
||||||
|
item := &Item{Val: value}
|
||||||
|
data, err := EncodeGob(item)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
now := getTime().Unix()
|
||||||
|
|
||||||
|
cacheHits := []CacheData{}
|
||||||
|
err = dc.SQLStore.NewSession().Where(`key = ?`, key).Find(&cacheHits)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cacheHits) > 0 {
|
||||||
|
_, err = dc.SQLStore.NewSession().Exec("UPDATE cached_data SET data=?, created=?, expire=? WHERE key=?", data, now, expire, key)
|
||||||
|
} else {
|
||||||
|
_, err = dc.SQLStore.NewSession().Exec("INSERT INTO cache_data(key,data,created_at,expires) VALUES(?,?,?,?)", key, data, now, expire)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dc *databaseCache) Delete(key string) error {
|
||||||
|
sql := `DELETE FROM cache_data WHERE key = ?`
|
||||||
|
|
||||||
|
_, err := dc.SQLStore.NewSession().Exec(sql, key)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
68
pkg/infra/distcache/distcache.go
Normal file
68
pkg/infra/distcache/distcache.go
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
package distcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/gob"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/log"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrCacheItemNotFound = errors.New("cache item not found")
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registry.RegisterService(&DistributedCache{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init initializes the service
|
||||||
|
func (ds *DistributedCache) Init() error {
|
||||||
|
ds.log = log.New("distributed.cache")
|
||||||
|
|
||||||
|
// memory
|
||||||
|
// redis
|
||||||
|
// memcache
|
||||||
|
// database. using SQLSTORE
|
||||||
|
ds.Client = &databaseCache{SQLStore: ds.SQLStore}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DistributedCache allows Grafana to cache data outside its own process
|
||||||
|
type DistributedCache struct {
|
||||||
|
log log.Logger
|
||||||
|
Client cacheStorage
|
||||||
|
SQLStore *sqlstore.SqlStore `inject:""`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Item struct {
|
||||||
|
Val interface{}
|
||||||
|
Created int64
|
||||||
|
Expire int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func EncodeGob(item *Item) ([]byte, error) {
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
err := gob.NewEncoder(buf).Encode(item)
|
||||||
|
return buf.Bytes(), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func DecodeGob(data []byte, out *Item) error {
|
||||||
|
buf := bytes.NewBuffer(data)
|
||||||
|
return gob.NewDecoder(buf).Decode(&out)
|
||||||
|
}
|
||||||
|
|
||||||
|
type cacheStorage interface {
|
||||||
|
// Get reads object from Cache
|
||||||
|
Get(key string) (interface{}, error)
|
||||||
|
|
||||||
|
// Puts an object into the cache
|
||||||
|
Put(key string, value interface{}, expire int64) error
|
||||||
|
|
||||||
|
// Delete object from cache
|
||||||
|
Delete(key string) error
|
||||||
|
}
|
87
pkg/infra/distcache/distcache_test.go
Normal file
87
pkg/infra/distcache/distcache_test.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
package distcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/gob"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bmizerany/assert"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/log"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CacheableStruct struct {
|
||||||
|
String string
|
||||||
|
Int64 int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
gob.Register(CacheableStruct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func createClient(t *testing.T) cacheStorage {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
sqlstore := sqlstore.InitTestDB(t)
|
||||||
|
dc := DistributedCache{log: log.New("test.logger"), SQLStore: sqlstore}
|
||||||
|
dc.Init()
|
||||||
|
return dc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCanPutIntoDatabaseStorage(t *testing.T) {
|
||||||
|
client := createClient(t)
|
||||||
|
cacheableStruct := CacheableStruct{String: "hej", Int64: 2000}
|
||||||
|
|
||||||
|
err := client.Put("key", cacheableStruct, 1000)
|
||||||
|
assert.Equal(t, err, nil)
|
||||||
|
|
||||||
|
data, err := client.Get("key")
|
||||||
|
s, ok := data.(CacheableStruct)
|
||||||
|
|
||||||
|
assert.Equal(t, ok, true)
|
||||||
|
assert.Equal(t, s.String, "hej")
|
||||||
|
assert.Equal(t, s.Int64, int64(2000))
|
||||||
|
|
||||||
|
err = client.Delete("key")
|
||||||
|
assert.Equal(t, err, nil)
|
||||||
|
|
||||||
|
_, err = client.Get("key")
|
||||||
|
assert.Equal(t, err, ErrCacheItemNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCanNotFetchExpiredItems(t *testing.T) {
|
||||||
|
client := createClient(t)
|
||||||
|
|
||||||
|
cacheableStruct := CacheableStruct{String: "hej", Int64: 2000}
|
||||||
|
|
||||||
|
// insert cache item one day back
|
||||||
|
getTime = func() time.Time { return time.Now().AddDate(0, 0, -2) }
|
||||||
|
err := client.Put("key", cacheableStruct, 10000)
|
||||||
|
assert.Equal(t, err, nil)
|
||||||
|
|
||||||
|
// should not be able to read that value since its expired
|
||||||
|
getTime = time.Now
|
||||||
|
_, err = client.Get("key")
|
||||||
|
assert.Equal(t, err, ErrCacheItemNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCanSetInfiniteCacheExpiration(t *testing.T) {
|
||||||
|
client := createClient(t)
|
||||||
|
|
||||||
|
cacheableStruct := CacheableStruct{String: "hej", Int64: 2000}
|
||||||
|
|
||||||
|
// insert cache item one day back
|
||||||
|
getTime = func() time.Time { return time.Now().AddDate(0, 0, -2) }
|
||||||
|
err := client.Put("key", cacheableStruct, 0)
|
||||||
|
assert.Equal(t, err, nil)
|
||||||
|
|
||||||
|
// should not be able to read that value since its expired
|
||||||
|
getTime = time.Now
|
||||||
|
data, err := client.Get("key")
|
||||||
|
s, ok := data.(CacheableStruct)
|
||||||
|
|
||||||
|
assert.Equal(t, ok, true)
|
||||||
|
assert.Equal(t, s.String, "hej")
|
||||||
|
assert.Equal(t, s.Int64, int64(2000))
|
||||||
|
}
|
17
pkg/services/sqlstore/migrations/cache_data_mig.go
Normal file
17
pkg/services/sqlstore/migrations/cache_data_mig.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
package migrations
|
||||||
|
|
||||||
|
import . "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
|
|
||||||
|
func addCacheMigration(mg *Migrator) {
|
||||||
|
var cacheDataV1 = Table{
|
||||||
|
Name: "cache_data",
|
||||||
|
Columns: []*Column{
|
||||||
|
{Name: "key", Type: DB_Char, IsPrimaryKey: true, Length: 16},
|
||||||
|
{Name: "data", Type: DB_Blob},
|
||||||
|
{Name: "expires", Type: DB_Integer, Length: 255, Nullable: false},
|
||||||
|
{Name: "created_at", Type: DB_Integer, Length: 255, Nullable: false},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
mg.AddMigration("create cache_data table", NewAddTableMigration(cacheDataV1))
|
||||||
|
}
|
@ -33,6 +33,7 @@ func AddMigrations(mg *Migrator) {
|
|||||||
addUserAuthMigrations(mg)
|
addUserAuthMigrations(mg)
|
||||||
addServerlockMigrations(mg)
|
addServerlockMigrations(mg)
|
||||||
addUserAuthTokenMigrations(mg)
|
addUserAuthTokenMigrations(mg)
|
||||||
|
addCacheMigration(mg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func addMigrationLogMigrations(mg *Migrator) {
|
func addMigrationLogMigrations(mg *Migrator) {
|
||||||
|
Loading…
Reference in New Issue
Block a user