code layouts and comments

This commit is contained in:
bergquist
2019-03-05 15:34:51 +01:00
parent 9a78c23165
commit daa3b17951
7 changed files with 79 additions and 60 deletions

View File

@@ -108,11 +108,10 @@ cache_mode = private
#################################### Cache server #############################
[cache_server]
# Either "memory", "redis", "memcached" or "database" default is "database"
# Either "redis", "memcached" or "database" default is "database"
type = database
# cache connectionstring options
# memory: no config required. Should only be used on single install grafana.
# database: will use Grafana primary database.
# redis: config like redis server e.g. `addr=127.0.0.1:6379,pool_size=100,db=grafana`
# memcache: 127.0.0.1:11211

View File

@@ -28,6 +28,7 @@ import (
// self registering services
_ "github.com/grafana/grafana/pkg/extensions"
_ "github.com/grafana/grafana/pkg/infra/distcache"
_ "github.com/grafana/grafana/pkg/infra/metrics"
_ "github.com/grafana/grafana/pkg/infra/serverlock"
_ "github.com/grafana/grafana/pkg/infra/tracing"

View File

@@ -1,6 +1,7 @@
package distcache
import (
"context"
"time"
"github.com/grafana/grafana/pkg/log"
@@ -18,32 +19,33 @@ func newDatabaseCache(sqlstore *sqlstore.SqlStore) *databaseCache {
log: log.New("distcache.database"),
}
//go dc.StartGC() //TODO: start the GC somehow
return dc
}
func (dc *databaseCache) Run(ctx context.Context) error {
ticker := time.NewTicker(time.Minute * 10)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
dc.internalRunGC()
}
}
}
var getTime = time.Now
func (dc *databaseCache) internalRunGC() {
now := getTime().Unix()
sql := `DELETE FROM cache_data WHERE (? - created) >= expire`
sql := `DELETE FROM cache_data WHERE (? - created_at) >= expires AND expires <> 0`
//EXTRACT(EPOCH FROM NOW()) - created >= expire
//UNIX_TIMESTAMP(NOW()) - created >= expire
_, err := dc.SQLStore.NewSession().Exec(sql, now)
if err != nil {
dc.log.Error("failed to run garbage collect", "error", err)
}
}
func (dc *databaseCache) StartGC() {
dc.internalRunGC()
time.AfterFunc(time.Second*10, func() {
dc.StartGC()
})
}
func (dc *databaseCache) Get(key string) (interface{}, error) {
cacheHits := []cacheData{}
err := dc.SQLStore.NewSession().Where(`key = ?`, key).Find(&cacheHits)
@@ -57,8 +59,10 @@ func (dc *databaseCache) Get(key string) (interface{}, error) {
}
cacheHit = cacheHits[0]
// if Expires is set. Make sure its still valid.
if cacheHit.Expires > 0 {
if getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires {
existedButExpired := getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires
if existedButExpired {
dc.Delete(key)
return nil, ErrCacheItemNotFound
}
@@ -72,13 +76,6 @@ func (dc *databaseCache) Get(key string) (interface{}, error) {
return item.Val, nil
}
type cacheData struct {
Key string
Data []byte
Expires int64
CreatedAt int64
}
func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration) error {
item := &cachedItem{Val: value}
data, err := encodeGob(item)
@@ -87,22 +84,23 @@ func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration
}
now := getTime().Unix()
cacheHits := []cacheData{}
err = dc.SQLStore.NewSession().Where(`key = ?`, key).Find(&cacheHits)
if err != nil {
return err
}
var expiresInEpoch int64
var expiresAtEpoch int64
if expire != 0 {
expiresInEpoch = int64(expire) / int64(time.Second)
expiresAtEpoch = int64(expire) / int64(time.Second)
}
session := dc.SQLStore.NewSession()
// insert or update depending on if item already exist
if len(cacheHits) > 0 {
_, err = dc.SQLStore.NewSession().Exec("UPDATE cache_data SET data=?, created=?, expire=? WHERE key=?", data, now, expiresInEpoch, key)
_, err = session.Exec("UPDATE cache_data SET data=?, created=?, expire=? WHERE key=?", data, now, expiresAtEpoch, key)
} else {
_, err = dc.SQLStore.NewSession().Exec("INSERT INTO cache_data(key,data,created_at,expires) VALUES(?,?,?,?)", key, data, now, expiresInEpoch)
_, err = session.Exec("INSERT INTO cache_data(key,data,created_at,expires) VALUES(?,?,?,?)", key, data, now, expiresAtEpoch)
}
return err
@@ -110,8 +108,14 @@ func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration
func (dc *databaseCache) Delete(key string) error {
sql := `DELETE FROM cache_data WHERE key = ?`
_, err := dc.SQLStore.NewSession().Exec(sql, key)
return err
}
type cacheData struct {
Key string
Data []byte
Expires int64
CreatedAt int64
}

View File

@@ -2,6 +2,7 @@ package distcache
import (
"bytes"
"context"
"encoding/gob"
"errors"
"time"
@@ -22,6 +23,29 @@ func init() {
registry.RegisterService(&DistributedCache{})
}
// CacheStorage allows the caller to set, get and delete items in the cache.
// Cached items are stored as byte arrays and marshalled using "encoding/gob"
// so any struct added to the cache needs to be registred with `distcache.Register`
// ex `distcache.Register(CacheableStruct{})``
type CacheStorage interface {
// Get reads object from Cache
Get(key string) (interface{}, error)
// Set sets an object into the cache
Set(key string, value interface{}, expire time.Duration) error
// Delete object from cache
Delete(key string) error
}
// DistributedCache allows Grafana to cache data outside its own process
type DistributedCache struct {
log log.Logger
Client CacheStorage
SQLStore *sqlstore.SqlStore `inject:""`
Cfg *setting.Cfg `inject:""`
}
// Init initializes the service
func (ds *DistributedCache) Init() error {
ds.log = log.New("distributed.cache")
@@ -31,6 +55,16 @@ func (ds *DistributedCache) Init() error {
return nil
}
func (ds *DistributedCache) Run(ctx context.Context) error {
backgroundjob, ok := ds.Client.(registry.BackgroundService)
if ok {
return backgroundjob.Run(ctx)
}
<-ctx.Done()
return ctx.Err()
}
func createClient(opts *setting.CacheOpts, sqlstore *sqlstore.SqlStore) CacheStorage {
if opts.Name == "redis" {
return newRedisStorage(opts)
@@ -43,12 +77,14 @@ func createClient(opts *setting.CacheOpts, sqlstore *sqlstore.SqlStore) CacheSto
return newDatabaseCache(sqlstore)
}
// DistributedCache allows Grafana to cache data outside its own process
type DistributedCache struct {
log log.Logger
Client CacheStorage
SQLStore *sqlstore.SqlStore `inject:""`
Cfg *setting.Cfg `inject:""`
// Register records a type, identified by a value for that type, under its
// internal type name. That name will identify the concrete type of a value
// sent or received as an interface variable. Only types that will be
// transferred as implementations of interface values need to be registered.
// Expecting to be used only during initialization, it panics if the mapping
// between types and names is not a bijection.
func Register(value interface{}) {
gob.Register(value)
}
type cachedItem struct {
@@ -65,18 +101,3 @@ func decodeGob(data []byte, out *cachedItem) error {
buf := bytes.NewBuffer(data)
return gob.NewDecoder(buf).Decode(&out)
}
// CacheStorage allows the caller to set, get and delete items in the cache.
// Cached items are stored as byte arrays and marshalled using "encoding/gob"
// so any struct added to the cache needs to be registred with `gob.Register`
// ex `gob.Register(CacheableStruct{})``
type CacheStorage interface {
// Get reads object from Cache
Get(key string) (interface{}, error)
// Set sets an object into the cache
Set(key string, value interface{}, expire time.Duration) error
// Delete object from cache
Delete(key string) error
}

View File

@@ -1,7 +1,6 @@
package distcache
import (
"encoding/gob"
"testing"
"time"
@@ -17,7 +16,7 @@ type CacheableStruct struct {
}
func init() {
gob.Register(CacheableStruct{})
Register(CacheableStruct{})
}
func createTestClient(t *testing.T, opts *setting.CacheOpts, sqlstore *sqlstore.SqlStore) CacheStorage {

View File

@@ -28,21 +28,18 @@ func newItem(sid string, data []byte, expire int32) *memcache.Item {
// Set sets value to given key in the cache.
func (s *memcachedStorage) Set(key string, val interface{}, expires time.Duration) error {
item := &cachedItem{Val: val}
bytes, err := encodeGob(item)
if err != nil {
return err
}
memcacheItem := newItem(key, bytes, int32(expires))
return s.c.Set(memcacheItem)
memcachedItem := newItem(key, bytes, int32(expires))
return s.c.Set(memcachedItem)
}
// Get gets value by given key in the cache.
func (s *memcachedStorage) Get(key string) (interface{}, error) {
i, err := s.c.Get(key)
memcachedItem, err := s.c.Get(key)
if err != nil && err.Error() == "memcache: cache miss" {
return nil, ErrCacheItemNotFound
}
@@ -53,7 +50,7 @@ func (s *memcachedStorage) Get(key string) (interface{}, error) {
item := &cachedItem{}
err = decodeGob(i.Value, item)
err = decodeGob(memcachedItem.Value, item)
if err != nil {
return nil, err
}

View File

@@ -783,8 +783,6 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error {
cfg.EnterpriseLicensePath = enterprise.Key("license_path").MustString(filepath.Join(cfg.DataPath, "license.jwt"))
cacheServer := iniFile.Section("cache_server")
//cfg.DistCacheType = cacheServer.Key("type").MustString("database")
//cfg.DistCacheConnStr = cacheServer.Key("connstr").MustString("")
cfg.CacheOptions = &CacheOpts{
Name: cacheServer.Key("type").MustString("database"),
ConnStr: cacheServer.Key("connstr").MustString(""),