diff --git a/conf/defaults.ini b/conf/defaults.ini index 3386e552b8a..91a58243c04 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -108,11 +108,10 @@ cache_mode = private #################################### Cache server ############################# [cache_server] -# Either "memory", "redis", "memcached" or "database" default is "database" +# Either "redis", "memcached" or "database" default is "database" type = database # cache connectionstring options -# memory: no config required. Should only be used on single install grafana. # database: will use Grafana primary database. # redis: config like redis server e.g. `addr=127.0.0.1:6379,pool_size=100,db=grafana` # memcache: 127.0.0.1:11211 diff --git a/pkg/cmd/grafana-server/server.go b/pkg/cmd/grafana-server/server.go index 53218147ae0..d2852e0b8ca 100644 --- a/pkg/cmd/grafana-server/server.go +++ b/pkg/cmd/grafana-server/server.go @@ -28,6 +28,7 @@ import ( // self registering services _ "github.com/grafana/grafana/pkg/extensions" + _ "github.com/grafana/grafana/pkg/infra/distcache" _ "github.com/grafana/grafana/pkg/infra/metrics" _ "github.com/grafana/grafana/pkg/infra/serverlock" _ "github.com/grafana/grafana/pkg/infra/tracing" diff --git a/pkg/infra/distcache/database_storage.go b/pkg/infra/distcache/database_storage.go index 0cf613471db..6a357005a21 100644 --- a/pkg/infra/distcache/database_storage.go +++ b/pkg/infra/distcache/database_storage.go @@ -1,6 +1,7 @@ package distcache import ( + "context" "time" "github.com/grafana/grafana/pkg/log" @@ -18,32 +19,33 @@ func newDatabaseCache(sqlstore *sqlstore.SqlStore) *databaseCache { log: log.New("distcache.database"), } - //go dc.StartGC() //TODO: start the GC somehow return dc } +func (dc *databaseCache) Run(ctx context.Context) error { + ticker := time.NewTicker(time.Minute * 10) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + dc.internalRunGC() + } + } +} + var getTime = time.Now func (dc *databaseCache) internalRunGC() { now := getTime().Unix() - sql := `DELETE FROM cache_data WHERE (? - created) >= expire` + sql := `DELETE FROM cache_data WHERE (? - created_at) >= expires AND expires <> 0` - //EXTRACT(EPOCH FROM NOW()) - created >= expire - //UNIX_TIMESTAMP(NOW()) - created >= expire _, err := dc.SQLStore.NewSession().Exec(sql, now) if err != nil { dc.log.Error("failed to run garbage collect", "error", err) } } -func (dc *databaseCache) StartGC() { - dc.internalRunGC() - - time.AfterFunc(time.Second*10, func() { - dc.StartGC() - }) -} - func (dc *databaseCache) Get(key string) (interface{}, error) { cacheHits := []cacheData{} err := dc.SQLStore.NewSession().Where(`key = ?`, key).Find(&cacheHits) @@ -57,8 +59,10 @@ func (dc *databaseCache) Get(key string) (interface{}, error) { } cacheHit = cacheHits[0] + // if Expires is set. Make sure its still valid. if cacheHit.Expires > 0 { - if getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires { + existedButExpired := getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires + if existedButExpired { dc.Delete(key) return nil, ErrCacheItemNotFound } @@ -72,13 +76,6 @@ func (dc *databaseCache) Get(key string) (interface{}, error) { return item.Val, nil } -type cacheData struct { - Key string - Data []byte - Expires int64 - CreatedAt int64 -} - func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration) error { item := &cachedItem{Val: value} data, err := encodeGob(item) @@ -87,22 +84,23 @@ func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration } now := getTime().Unix() - cacheHits := []cacheData{} err = dc.SQLStore.NewSession().Where(`key = ?`, key).Find(&cacheHits) if err != nil { return err } - var expiresInEpoch int64 + var expiresAtEpoch int64 if expire != 0 { - expiresInEpoch = int64(expire) / int64(time.Second) + expiresAtEpoch = int64(expire) / int64(time.Second) } + session := dc.SQLStore.NewSession() + // insert or update depending on if item already exist if len(cacheHits) > 0 { - _, err = dc.SQLStore.NewSession().Exec("UPDATE cache_data SET data=?, created=?, expire=? WHERE key=?", data, now, expiresInEpoch, key) + _, err = session.Exec("UPDATE cache_data SET data=?, created=?, expire=? WHERE key=?", data, now, expiresAtEpoch, key) } else { - _, err = dc.SQLStore.NewSession().Exec("INSERT INTO cache_data(key,data,created_at,expires) VALUES(?,?,?,?)", key, data, now, expiresInEpoch) + _, err = session.Exec("INSERT INTO cache_data(key,data,created_at,expires) VALUES(?,?,?,?)", key, data, now, expiresAtEpoch) } return err @@ -110,8 +108,14 @@ func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration func (dc *databaseCache) Delete(key string) error { sql := `DELETE FROM cache_data WHERE key = ?` - _, err := dc.SQLStore.NewSession().Exec(sql, key) return err } + +type cacheData struct { + Key string + Data []byte + Expires int64 + CreatedAt int64 +} diff --git a/pkg/infra/distcache/distcache.go b/pkg/infra/distcache/distcache.go index 549774b848b..a8f12adaa27 100644 --- a/pkg/infra/distcache/distcache.go +++ b/pkg/infra/distcache/distcache.go @@ -2,6 +2,7 @@ package distcache import ( "bytes" + "context" "encoding/gob" "errors" "time" @@ -22,6 +23,29 @@ func init() { registry.RegisterService(&DistributedCache{}) } +// CacheStorage allows the caller to set, get and delete items in the cache. +// Cached items are stored as byte arrays and marshalled using "encoding/gob" +// so any struct added to the cache needs to be registred with `distcache.Register` +// ex `distcache.Register(CacheableStruct{})`` +type CacheStorage interface { + // Get reads object from Cache + Get(key string) (interface{}, error) + + // Set sets an object into the cache + Set(key string, value interface{}, expire time.Duration) error + + // Delete object from cache + Delete(key string) error +} + +// DistributedCache allows Grafana to cache data outside its own process +type DistributedCache struct { + log log.Logger + Client CacheStorage + SQLStore *sqlstore.SqlStore `inject:""` + Cfg *setting.Cfg `inject:""` +} + // Init initializes the service func (ds *DistributedCache) Init() error { ds.log = log.New("distributed.cache") @@ -31,6 +55,16 @@ func (ds *DistributedCache) Init() error { return nil } +func (ds *DistributedCache) Run(ctx context.Context) error { + backgroundjob, ok := ds.Client.(registry.BackgroundService) + if ok { + return backgroundjob.Run(ctx) + } + + <-ctx.Done() + return ctx.Err() +} + func createClient(opts *setting.CacheOpts, sqlstore *sqlstore.SqlStore) CacheStorage { if opts.Name == "redis" { return newRedisStorage(opts) @@ -43,12 +77,14 @@ func createClient(opts *setting.CacheOpts, sqlstore *sqlstore.SqlStore) CacheSto return newDatabaseCache(sqlstore) } -// DistributedCache allows Grafana to cache data outside its own process -type DistributedCache struct { - log log.Logger - Client CacheStorage - SQLStore *sqlstore.SqlStore `inject:""` - Cfg *setting.Cfg `inject:""` +// Register records a type, identified by a value for that type, under its +// internal type name. That name will identify the concrete type of a value +// sent or received as an interface variable. Only types that will be +// transferred as implementations of interface values need to be registered. +// Expecting to be used only during initialization, it panics if the mapping +// between types and names is not a bijection. +func Register(value interface{}) { + gob.Register(value) } type cachedItem struct { @@ -65,18 +101,3 @@ func decodeGob(data []byte, out *cachedItem) error { buf := bytes.NewBuffer(data) return gob.NewDecoder(buf).Decode(&out) } - -// CacheStorage allows the caller to set, get and delete items in the cache. -// Cached items are stored as byte arrays and marshalled using "encoding/gob" -// so any struct added to the cache needs to be registred with `gob.Register` -// ex `gob.Register(CacheableStruct{})`` -type CacheStorage interface { - // Get reads object from Cache - Get(key string) (interface{}, error) - - // Set sets an object into the cache - Set(key string, value interface{}, expire time.Duration) error - - // Delete object from cache - Delete(key string) error -} diff --git a/pkg/infra/distcache/distcache_test.go b/pkg/infra/distcache/distcache_test.go index a4a596fd930..b631a6283ac 100644 --- a/pkg/infra/distcache/distcache_test.go +++ b/pkg/infra/distcache/distcache_test.go @@ -1,7 +1,6 @@ package distcache import ( - "encoding/gob" "testing" "time" @@ -17,7 +16,7 @@ type CacheableStruct struct { } func init() { - gob.Register(CacheableStruct{}) + Register(CacheableStruct{}) } func createTestClient(t *testing.T, opts *setting.CacheOpts, sqlstore *sqlstore.SqlStore) CacheStorage { diff --git a/pkg/infra/distcache/memcached_storage.go b/pkg/infra/distcache/memcached_storage.go index 7a29eec0e5d..998d05621c9 100644 --- a/pkg/infra/distcache/memcached_storage.go +++ b/pkg/infra/distcache/memcached_storage.go @@ -28,21 +28,18 @@ func newItem(sid string, data []byte, expire int32) *memcache.Item { // Set sets value to given key in the cache. func (s *memcachedStorage) Set(key string, val interface{}, expires time.Duration) error { item := &cachedItem{Val: val} - bytes, err := encodeGob(item) if err != nil { return err } - memcacheItem := newItem(key, bytes, int32(expires)) - - return s.c.Set(memcacheItem) + memcachedItem := newItem(key, bytes, int32(expires)) + return s.c.Set(memcachedItem) } // Get gets value by given key in the cache. func (s *memcachedStorage) Get(key string) (interface{}, error) { - i, err := s.c.Get(key) - + memcachedItem, err := s.c.Get(key) if err != nil && err.Error() == "memcache: cache miss" { return nil, ErrCacheItemNotFound } @@ -53,7 +50,7 @@ func (s *memcachedStorage) Get(key string) (interface{}, error) { item := &cachedItem{} - err = decodeGob(i.Value, item) + err = decodeGob(memcachedItem.Value, item) if err != nil { return nil, err } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index f25f2211b40..864c29fb382 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -783,8 +783,6 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error { cfg.EnterpriseLicensePath = enterprise.Key("license_path").MustString(filepath.Join(cfg.DataPath, "license.jwt")) cacheServer := iniFile.Section("cache_server") - //cfg.DistCacheType = cacheServer.Key("type").MustString("database") - //cfg.DistCacheConnStr = cacheServer.Key("connstr").MustString("") cfg.CacheOptions = &CacheOpts{ Name: cacheServer.Key("type").MustString("database"), ConnStr: cacheServer.Key("connstr").MustString(""),