Merge pull request #15457 from bergquist/distributed_cache

Distributed cache
This commit is contained in:
Carl Bergquist 2019-03-14 16:16:39 +01:00 committed by GitHub
commit 291ffcb75b
18 changed files with 679 additions and 2 deletions

View File

@@ -56,6 +56,20 @@ jobs:
name: postgres integration tests
command: './scripts/circle-test-postgres.sh'
cache-server-test:
docker:
- image: circleci/golang:1.11.5
- image: circleci/redis:4-alpine
- image: memcached
working_directory: /go/src/github.com/grafana/grafana
steps:
- checkout
- run: dockerize -wait tcp://127.0.0.1:11211 -timeout 120s
- run: dockerize -wait tcp://127.0.0.1:6379 -timeout 120s
- run:
name: cache server tests
command: './scripts/circle-test-cache-servers.sh'
codespell:
docker:
- image: circleci/python
@@ -545,6 +559,8 @@ workflows:
filters: *filter-not-release-or-master
- postgres-integration-test:
filters: *filter-not-release-or-master
- cache-server-test:
filters: *filter-not-release-or-master
- grafana-docker-pr:
requires:
- build
@@ -554,4 +570,5 @@ workflows:
- gometalinter
- mysql-integration-test
- postgres-integration-test
- cache-server-test
filters: *filter-not-release-or-master

View File

@@ -106,6 +106,17 @@ path = grafana.db
# For "sqlite3" only. cache mode setting used for connecting to the database
cache_mode = private
#################################### Cache server #############################
[remote_cache]
# Either "redis", "memcached" or "database" default is "database"
type = database
# cache connectionstring options
# database: will use Grafana primary database.
# redis: config like redis server e.g. `addr=127.0.0.1:6379,pool_size=100,db=grafana`
# memcache: 127.0.0.1:11211
connstr =
#################################### Session #############################
[session]
# Either "memory", "file", "redis", "mysql", "postgres", "memcache", default is "file"

View File

@@ -102,6 +102,17 @@ log_queries =
# For "sqlite3" only. cache mode setting used for connecting to the database. (private, shared)
;cache_mode = private
#################################### Cache server #############################
[remote_cache]
# Either "redis", "memcached" or "database" default is "database"
;type = database
# cache connectionstring options
# database: will use Grafana primary database.
# redis: config like redis server e.g. `addr=127.0.0.1:6379,pool_size=100,db=grafana`
# memcache: 127.0.0.1:11211
;connstr =
#################################### Session ####################################
[session]
# Either "memory", "file", "redis", "mysql", "postgres", default is "file"

View File

@@ -1,4 +1,4 @@
memcached: redis:
image: redis:latest
ports:
- "6379:6379"

View File

@ -179,7 +179,6 @@ Path to the certificate key file (if `protocol` is set to `https`).
Set to true for Grafana to log all HTTP requests (not just errors). These are logged as Info level events Set to true for Grafana to log all HTTP requests (not just errors). These are logged as Info level events
to grafana log. to grafana log.
<hr />
<hr /> <hr />
@@ -262,6 +261,19 @@ Set to `true` to log the sql calls and execution times.
For "sqlite3" only. [Shared cache](https://www.sqlite.org/sharedcache.html) setting used for connecting to the database. (private, shared)
Defaults to private.
<hr />
## [remote_cache]
### type
Either `redis`, `memcached` or `database`. Defaults to `database`.
### connstr
The remote cache connection string. Leave empty when using `database`, since it will use the primary Grafana database.
Redis example config: `addr=127.0.0.1:6379,pool_size=100,db=grafana`
Memcached example: `127.0.0.1:11211`
<hr />

View File

@@ -29,6 +29,7 @@ import (
// self registering services
_ "github.com/grafana/grafana/pkg/extensions"
_ "github.com/grafana/grafana/pkg/infra/metrics"
_ "github.com/grafana/grafana/pkg/infra/remotecache"
_ "github.com/grafana/grafana/pkg/infra/serverlock"
_ "github.com/grafana/grafana/pkg/infra/tracing"
_ "github.com/grafana/grafana/pkg/infra/usagestats"

View File

@@ -0,0 +1,126 @@
package remotecache
import (
"context"
"time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
var getTime = time.Now
const databaseCacheType = "database"
type databaseCache struct {
SQLStore *sqlstore.SqlStore
log log.Logger
}
func newDatabaseCache(sqlstore *sqlstore.SqlStore) *databaseCache {
dc := &databaseCache{
SQLStore: sqlstore,
log: log.New("remotecache.database"),
}
return dc
}
func (dc *databaseCache) Run(ctx context.Context) error {
ticker := time.NewTicker(time.Minute * 10)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
dc.internalRunGC()
}
}
}
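// internalRunGC deletes rows whose age in seconds (now - created_at) has
// reached the row's expires value; rows with expires = 0 never expire.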
func (dc *databaseCache) internalRunGC() {
now := getTime().Unix()
sql := `DELETE FROM cache_data WHERE (? - created_at) >= expires AND expires <> 0`
_, err := dc.SQLStore.NewSession().Exec(sql, now)
if err != nil {
dc.log.Error("failed to run garbage collect", "error", err)
}
}
func (dc *databaseCache) Get(key string) (interface{}, error) {
cacheHit := CacheData{}
session := dc.SQLStore.NewSession()
defer session.Close()
exist, err := session.Where("cache_key= ?", key).Get(&cacheHit)
if err != nil {
return nil, err
}
if !exist {
return nil, ErrCacheItemNotFound
}
if cacheHit.Expires > 0 {
existedButExpired := getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires
if existedButExpired {
_ = dc.Delete(key) //ignore this error since we will return `ErrCacheItemNotFound` anyway
return nil, ErrCacheItemNotFound
}
}
item := &cachedItem{}
if err = decodeGob(cacheHit.Data, item); err != nil {
return nil, err
}
return item.Val, nil
}
func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration) error {
item := &cachedItem{Val: value}
data, err := encodeGob(item)
if err != nil {
return err
}
session := dc.SQLStore.NewSession()
var cacheHit CacheData
has, err := session.Where("cache_key = ?", key).Get(&cacheHit)
if err != nil {
return err
}
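// a zero duration means the item never expires; otherwise store the TTL in whole seconds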
var expiresInSeconds int64
if expire != 0 {
expiresInSeconds = int64(expire) / int64(time.Second)
}
// insert or update depending on whether the item already exists
if has {
sql := `UPDATE cache_data SET data=?, created_at=?, expires=? WHERE cache_key=?`
_, err = session.Exec(sql, data, getTime().Unix(), expiresInSeconds, key)
} else {
sql := `INSERT INTO cache_data (cache_key,data,created_at,expires) VALUES(?,?,?,?)`
_, err = session.Exec(sql, key, data, getTime().Unix(), expiresInSeconds)
}
return err
}
func (dc *databaseCache) Delete(key string) error {
sql := "DELETE FROM cache_data WHERE cache_key=?"
_, err := dc.SQLStore.NewSession().Exec(sql, key)
return err
}
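// CacheData maps to the cache_data table created in addCacheMigration.
// Expires holds a TTL in seconds (0 means never) and CreatedAt a unix timestamp.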
type CacheData struct {
CacheKey string
Data []byte
Expires int64
CreatedAt int64
}

View File

@@ -0,0 +1,56 @@
package remotecache
import (
"testing"
"time"
"github.com/bmizerany/assert"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
func TestDatabaseStorageGarbageCollection(t *testing.T) {
sqlstore := sqlstore.InitTestDB(t)
db := &databaseCache{
SQLStore: sqlstore,
log: log.New("remotecache.database"),
}
obj := &CacheableStruct{String: "foolbar"}
//set time.Now to two days ago so the first three items are already expired when the GC runs
var err error
getTime = func() time.Time { return time.Now().AddDate(0, 0, -2) }
err = db.Set("key1", obj, 1000*time.Second)
assert.Equal(t, err, nil)
err = db.Set("key2", obj, 1000*time.Second)
assert.Equal(t, err, nil)
err = db.Set("key3", obj, 1000*time.Second)
assert.Equal(t, err, nil)
// insert object that should never expire
db.Set("key4", obj, 0)
getTime = time.Now
db.Set("key5", obj, 1000*time.Second)
//run GC
db.internalRunGC()
//try to read values
_, err = db.Get("key1")
assert.Equal(t, err, ErrCacheItemNotFound, "expected cache item not found. got: ", err)
_, err = db.Get("key2")
assert.Equal(t, err, ErrCacheItemNotFound)
_, err = db.Get("key3")
assert.Equal(t, err, ErrCacheItemNotFound)
_, err = db.Get("key4")
assert.Equal(t, err, nil)
_, err = db.Get("key5")
assert.Equal(t, err, nil)
}

View File

@@ -0,0 +1,71 @@
package remotecache
import (
"time"
"github.com/bradfitz/gomemcache/memcache"
"github.com/grafana/grafana/pkg/setting"
)
const memcachedCacheType = "memcached"
type memcachedStorage struct {
c *memcache.Client
}
func newMemcachedStorage(opts *setting.RemoteCacheOptions) *memcachedStorage {
return &memcachedStorage{
c: memcache.New(opts.ConnStr),
}
}
func newItem(sid string, data []byte, expire int32) *memcache.Item {
return &memcache.Item{
Key: sid,
Value: data,
Expiration: expire,
}
}
// Set sets value to given key in the cache.
func (s *memcachedStorage) Set(key string, val interface{}, expires time.Duration) error {
item := &cachedItem{Val: val}
bytes, err := encodeGob(item)
if err != nil {
return err
}
var expiresInSeconds int64
if expires != 0 {
expiresInSeconds = int64(expires) / int64(time.Second)
}
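// memcached treats Expiration as relative seconds; values above 30 days are interpreted as an absolute unix timestamp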
memcachedItem := newItem(key, bytes, int32(expiresInSeconds))
return s.c.Set(memcachedItem)
}
// Get gets value by given key in the cache.
func (s *memcachedStorage) Get(key string) (interface{}, error) {
memcachedItem, err := s.c.Get(key)
if err == memcache.ErrCacheMiss {
return nil, ErrCacheItemNotFound
}
if err != nil {
return nil, err
}
item := &cachedItem{}
err = decodeGob(memcachedItem.Value, item)
if err != nil {
return nil, err
}
return item.Val, nil
}
// Delete deletes a key from the cache
func (s *memcachedStorage) Delete(key string) error {
return s.c.Delete(key)
}

View File

@@ -0,0 +1,15 @@
// +build memcached
package remotecache
import (
"testing"
"github.com/grafana/grafana/pkg/setting"
)
func TestMemcachedCacheStorage(t *testing.T) {
opts := &setting.RemoteCacheOptions{Name: memcachedCacheType, ConnStr: "localhost:11211"}
client := createTestClient(t, opts, nil)
runTestsForClient(t, client)
}

View File

@@ -0,0 +1,62 @@
package remotecache
import (
"time"
"github.com/grafana/grafana/pkg/setting"
redis "gopkg.in/redis.v2"
)
const redisCacheType = "redis"
type redisStorage struct {
c *redis.Client
}
func newRedisStorage(opts *setting.RemoteCacheOptions) *redisStorage {
opt := &redis.Options{
Network: "tcp",
Addr: opts.ConnStr,
}
return &redisStorage{c: redis.NewClient(opt)}
}
// Set sets a value for the given key in the cache.
func (s *redisStorage) Set(key string, val interface{}, expires time.Duration) error {
item := &cachedItem{Val: val}
value, err := encodeGob(item)
if err != nil {
return err
}
status := s.c.SetEx(key, expires, string(value))
return status.Err()
}
// Get gets the value for the given key from the cache.
func (s *redisStorage) Get(key string) (interface{}, error) {
v := s.c.Get(key)
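// a missing key yields an empty reply, so the gob decoder below returns EOF, which is mapped to ErrCacheItemNotFound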
item := &cachedItem{}
err := decodeGob([]byte(v.Val()), item)
if err == nil {
return item.Val, nil
}
if err.Error() == "EOF" {
return nil, ErrCacheItemNotFound
}
if err != nil {
return nil, err
}
return item.Val, nil
}
// Delete deletes a key from the cache.
func (s *redisStorage) Delete(key string) error {
cmd := s.c.Del(key)
return cmd.Err()
}

View File

@@ -0,0 +1,16 @@
// +build redis
package remotecache
import (
"testing"
"github.com/grafana/grafana/pkg/setting"
)
func TestRedisCacheStorage(t *testing.T) {
opts := &setting.RemoteCacheOptions{Name: redisCacheType, ConnStr: "localhost:6379"}
client := createTestClient(t, opts, nil)
runTestsForClient(t, client)
}

View File

@@ -0,0 +1,133 @@
package remotecache
import (
"bytes"
"context"
"encoding/gob"
"errors"
"time"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/registry"
)
var (
// ErrCacheItemNotFound is returned if the cache item does not exist
ErrCacheItemNotFound = errors.New("cache item not found")
// ErrInvalidCacheType is returned if the type is invalid
ErrInvalidCacheType = errors.New("invalid remote cache name")
defaultMaxCacheExpiration = time.Hour * 24
)
func init() {
registry.RegisterService(&RemoteCache{})
}
// CacheStorage allows the caller to set, get and delete items in the cache.
// Cached items are stored as byte arrays and marshalled using "encoding/gob",
// so any struct added to the cache needs to be registered with `remotecache.Register`,
// e.g. `remotecache.Register(CacheableStruct{})`.
type CacheStorage interface {
// Get reads object from Cache
Get(key string) (interface{}, error)
// Set sets an object in the cache. If `expire` is set to zero it will default to 24h
Set(key string, value interface{}, expire time.Duration) error
// Delete object from cache
Delete(key string) error
}
// RemoteCache allows Grafana to cache data outside its own process
type RemoteCache struct {
log log.Logger
client CacheStorage
SQLStore *sqlstore.SqlStore `inject:""`
Cfg *setting.Cfg `inject:""`
}
// Get reads object from Cache
func (ds *RemoteCache) Get(key string) (interface{}, error) {
return ds.client.Get(key)
}
// Set sets an object in the cache. If `expire` is set to zero it will default to 24h
func (ds *RemoteCache) Set(key string, value interface{}, expire time.Duration) error {
if expire == 0 {
expire = defaultMaxCacheExpiration
}
return ds.client.Set(key, value, expire)
}
// Delete object from cache
func (ds *RemoteCache) Delete(key string) error {
return ds.client.Delete(key)
}
// Init initializes the service
func (ds *RemoteCache) Init() error {
ds.log = log.New("cache.remote")
var err error
ds.client, err = createClient(ds.Cfg.RemoteCacheOptions, ds.SQLStore)
return err
}
// Run starts the background processes for the cache client
func (ds *RemoteCache) Run(ctx context.Context) error {
//create new interface if more clients need GC jobs
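// currently only the database backend implements registry.BackgroundService (its Run drives garbage collection)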
backgroundjob, ok := ds.client.(registry.BackgroundService)
if ok {
return backgroundjob.Run(ctx)
}
<-ctx.Done()
return ctx.Err()
}
func createClient(opts *setting.RemoteCacheOptions, sqlstore *sqlstore.SqlStore) (CacheStorage, error) {
if opts.Name == redisCacheType {
return newRedisStorage(opts), nil
}
if opts.Name == memcachedCacheType {
return newMemcachedStorage(opts), nil
}
if opts.Name == databaseCacheType {
return newDatabaseCache(sqlstore), nil
}
return nil, ErrInvalidCacheType
}
// Register records a type, identified by a value for that type, under its
// internal type name. That name will identify the concrete type of a value
// sent or received as an interface variable. Only types that will be
// transferred as implementations of interface values need to be registered.
// Expecting to be used only during initialization, it panics if the mapping
// between types and names is not a bijection.
func Register(value interface{}) {
gob.Register(value)
}
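// cachedItem wraps the stored value so gob can encode arbitrary concrete types,
// provided they have been registered via Register.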
type cachedItem struct {
Val interface{}
}
func encodeGob(item *cachedItem) ([]byte, error) {
buf := bytes.NewBuffer(nil)
err := gob.NewEncoder(buf).Encode(item)
return buf.Bytes(), err
}
func decodeGob(data []byte, out *cachedItem) error {
buf := bytes.NewBuffer(data)
return gob.NewDecoder(buf).Decode(&out)
}
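// Example usage (an illustrative sketch; the struct and function below are
// hypothetical): a consuming service gets *RemoteCache injected by the
// registry and can cache any gob-registered struct.
//
//	type DashboardStats struct {
//		Hits int64
//	}
//
//	func init() {
//		remotecache.Register(DashboardStats{})
//	}
//
//	func cacheStats(cache *remotecache.RemoteCache) (DashboardStats, error) {
//		key := "stats:dashboards"
//		if val, err := cache.Get(key); err == nil {
//			return val.(DashboardStats), nil
//		}
//		stats := DashboardStats{Hits: 1}
//		// a zero duration would fall back to the 24h default; use 10 minutes here
//		err := cache.Set(key, stats, 10*time.Minute)
//		return stats, err
//	}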

View File

@@ -0,0 +1,93 @@
package remotecache
import (
"testing"
"time"
"github.com/bmizerany/assert"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
)
type CacheableStruct struct {
String string
Int64 int64
}
func init() {
Register(CacheableStruct{})
}
func createTestClient(t *testing.T, opts *setting.RemoteCacheOptions, sqlstore *sqlstore.SqlStore) CacheStorage {
t.Helper()
dc := &RemoteCache{
SQLStore: sqlstore,
Cfg: &setting.Cfg{
RemoteCacheOptions: opts,
},
}
err := dc.Init()
if err != nil {
t.Fatalf("failed to init client for test. error: %v", err)
}
return dc
}
func TestCachedBasedOnConfig(t *testing.T) {
cfg := setting.NewCfg()
cfg.Load(&setting.CommandLineArgs{
HomePath: "../../../",
})
client := createTestClient(t, cfg.RemoteCacheOptions, sqlstore.InitTestDB(t))
runTestsForClient(t, client)
}
func TestInvalidCacheTypeReturnsError(t *testing.T) {
_, err := createClient(&setting.RemoteCacheOptions{Name: "invalid"}, nil)
assert.Equal(t, err, ErrInvalidCacheType)
}
func runTestsForClient(t *testing.T, client CacheStorage) {
canPutGetAndDeleteCachedObjects(t, client)
canNotFetchExpiredItems(t, client)
}
func canPutGetAndDeleteCachedObjects(t *testing.T, client CacheStorage) {
cacheableStruct := CacheableStruct{String: "hej", Int64: 2000}
err := client.Set("key1", cacheableStruct, 0)
assert.Equal(t, err, nil, "expected nil. got: ", err)
data, err := client.Get("key1")
s, ok := data.(CacheableStruct)
assert.Equal(t, ok, true)
assert.Equal(t, s.String, "hej")
assert.Equal(t, s.Int64, int64(2000))
err = client.Delete("key1")
assert.Equal(t, err, nil)
_, err = client.Get("key1")
assert.Equal(t, err, ErrCacheItemNotFound)
}
func canNotFetchExpiredItems(t *testing.T, client CacheStorage) {
cacheableStruct := CacheableStruct{String: "hej", Int64: 2000}
err := client.Set("key1", cacheableStruct, time.Second)
assert.Equal(t, err, nil)
//not sure how this can be avoided when testing redis/memcached :/
<-time.After(time.Second + time.Millisecond)
// should not be able to read the value since it's expired
_, err = client.Get("key1")
assert.Equal(t, err, ErrCacheItemNotFound)
}

View File

@@ -0,0 +1,22 @@
package migrations
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
func addCacheMigration(mg *migrator.Migrator) {
var cacheDataV1 = migrator.Table{
Name: "cache_data",
Columns: []*migrator.Column{
{Name: "cache_key", Type: migrator.DB_NVarchar, IsPrimaryKey: true, Length: 168},
{Name: "data", Type: migrator.DB_Blob},
{Name: "expires", Type: migrator.DB_Integer, Length: 255, Nullable: false},
{Name: "created_at", Type: migrator.DB_Integer, Length: 255, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"cache_key"}, Type: migrator.UniqueIndex},
},
}
mg.AddMigration("create cache_data table", migrator.NewAddTableMigration(cacheDataV1))
mg.AddMigration("add unique index cache_data.cache_key", migrator.NewAddIndexMigration(cacheDataV1, cacheDataV1.Indices[0]))
}

View File

@@ -33,6 +33,7 @@ func AddMigrations(mg *Migrator) {
addUserAuthMigrations(mg)
addServerlockMigrations(mg)
addUserAuthTokenMigrations(mg)
addCacheMigration(mg)
}
func addMigrationLogMigrations(mg *Migrator) {

View File

@@ -241,6 +241,9 @@ type Cfg struct {
// User
EditorsCanOwn bool
// DistributedCache
RemoteCacheOptions *RemoteCacheOptions
}
type CommandLineArgs struct {
@@ -781,9 +784,20 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error {
enterprise := iniFile.Section("enterprise")
cfg.EnterpriseLicensePath = enterprise.Key("license_path").MustString(filepath.Join(cfg.DataPath, "license.jwt"))
cacheServer := iniFile.Section("remote_cache")
cfg.RemoteCacheOptions = &RemoteCacheOptions{
Name: cacheServer.Key("type").MustString("database"),
ConnStr: cacheServer.Key("connstr").MustString(""),
}
return nil
}
type RemoteCacheOptions struct {
Name string
ConnStr string
}
func (cfg *Cfg) readSessionConfig() {
sec := cfg.Raw.Section("session")
SessionOptions = session.Options{}

View File

@@ -0,0 +1,16 @@
#!/bin/bash
function exit_if_fail {
command=$@
echo "Executing '$command'"
eval $command
rc=$?
if [ $rc -ne 0 ]; then
echo "'$command' returned $rc."
exit $rc
fi
}
echo "running redis and memcache tests"
time exit_if_fail go test -tags=redis ./pkg/infra/remotecache/...
time exit_if_fail go test -tags=memcached ./pkg/infra/remotecache/...