mirror of
https://github.com/grafana/grafana.git
synced 2024-12-28 01:41:24 -06:00
Merge pull request #13900 from marefr/ds_cache_refactor
Datasource proxy cache refactor
This commit is contained in:
commit
5882e5bb46
@ -1,62 +1,22 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/pluginproxy"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/metrics"
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
)
|
||||
|
||||
const HeaderNameNoBackendCache = "X-Grafana-NoCache"
|
||||
|
||||
func (hs *HTTPServer) getDatasourceFromCache(id int64, c *m.ReqContext) (*m.DataSource, error) {
|
||||
userPermissionsQuery := m.GetDataSourcePermissionsForUserQuery{
|
||||
User: c.SignedInUser,
|
||||
}
|
||||
if err := bus.Dispatch(&userPermissionsQuery); err != nil {
|
||||
if err != bus.ErrHandlerNotFound {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
permissionType, exists := userPermissionsQuery.Result[id]
|
||||
if exists && permissionType != m.DsPermissionQuery {
|
||||
return nil, errors.New("User not allowed to access datasource")
|
||||
}
|
||||
}
|
||||
|
||||
nocache := c.Req.Header.Get(HeaderNameNoBackendCache) == "true"
|
||||
cacheKey := fmt.Sprintf("ds-%d", id)
|
||||
|
||||
if !nocache {
|
||||
if cached, found := hs.cache.Get(cacheKey); found {
|
||||
ds := cached.(*m.DataSource)
|
||||
if ds.OrgId == c.OrgId {
|
||||
return ds, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
query := m.GetDataSourceByIdQuery{Id: id, OrgId: c.OrgId}
|
||||
if err := bus.Dispatch(&query); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hs.cache.Set(cacheKey, query.Result, time.Second*5)
|
||||
return query.Result, nil
|
||||
}
|
||||
|
||||
func (hs *HTTPServer) ProxyDataSourceRequest(c *m.ReqContext) {
|
||||
c.TimeRequest(metrics.M_DataSource_ProxyReq_Timer)
|
||||
|
||||
dsId := c.ParamsInt64(":id")
|
||||
ds, err := hs.getDatasourceFromCache(dsId, c)
|
||||
|
||||
ds, err := hs.DatasourceCache.GetDatasource(dsId, c.SignedInUser, c.SkipCache)
|
||||
if err != nil {
|
||||
if err == m.ErrDataSourceAccessDenied {
|
||||
c.JsonApiErr(403, "Access denied to datasource", err)
|
||||
return
|
||||
}
|
||||
c.JsonApiErr(500, "Unable to load datasource meta data", err)
|
||||
return
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
gocache "github.com/patrickmn/go-cache"
|
||||
macaron "gopkg.in/macaron.v1"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/live"
|
||||
@ -28,6 +27,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/cache"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/hooks"
|
||||
"github.com/grafana/grafana/pkg/services/rendering"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
@ -46,19 +47,19 @@ type HTTPServer struct {
|
||||
macaron *macaron.Macaron
|
||||
context context.Context
|
||||
streamManager *live.StreamManager
|
||||
cache *gocache.Cache
|
||||
httpSrv *http.Server
|
||||
|
||||
RouteRegister routing.RouteRegister `inject:""`
|
||||
Bus bus.Bus `inject:""`
|
||||
RenderService rendering.Service `inject:""`
|
||||
Cfg *setting.Cfg `inject:""`
|
||||
HooksService *hooks.HooksService `inject:""`
|
||||
RouteRegister routing.RouteRegister `inject:""`
|
||||
Bus bus.Bus `inject:""`
|
||||
RenderService rendering.Service `inject:""`
|
||||
Cfg *setting.Cfg `inject:""`
|
||||
HooksService *hooks.HooksService `inject:""`
|
||||
CacheService *cache.CacheService `inject:""`
|
||||
DatasourceCache datasources.CacheService `inject:""`
|
||||
}
|
||||
|
||||
func (hs *HTTPServer) Init() error {
|
||||
hs.log = log.New("http.server")
|
||||
hs.cache = gocache.New(5*time.Minute, 10*time.Minute)
|
||||
|
||||
hs.streamManager = live.NewStreamManager()
|
||||
hs.macaron = hs.newMacaron()
|
||||
@ -231,6 +232,7 @@ func (hs *HTTPServer) addMiddlewaresAndStaticRoutes() {
|
||||
m.Use(middleware.ValidateHostHeader(setting.Domain))
|
||||
}
|
||||
|
||||
m.Use(middleware.HandleNoCacheHeader())
|
||||
m.Use(middleware.AddDefaultResponseHeaders())
|
||||
}
|
||||
|
||||
|
@ -25,8 +25,11 @@ func (hs *HTTPServer) QueryMetrics(c *m.ReqContext, reqDto dtos.MetricRequest) R
|
||||
return Error(400, "Query missing datasourceId", nil)
|
||||
}
|
||||
|
||||
ds, err := hs.getDatasourceFromCache(datasourceId, c)
|
||||
ds, err := hs.DatasourceCache.GetDatasource(datasourceId, c.SignedInUser, c.SkipCache)
|
||||
if err != nil {
|
||||
if err == m.ErrDataSourceAccessDenied {
|
||||
return Error(403, "Access denied to datasource", err)
|
||||
}
|
||||
return Error(500, "Unable to load datasource meta data", err)
|
||||
}
|
||||
|
||||
|
@ -15,13 +15,21 @@ import (
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
_ "github.com/grafana/grafana/pkg/extensions"
|
||||
"github.com/grafana/grafana/pkg/log"
|
||||
"github.com/grafana/grafana/pkg/login"
|
||||
_ "github.com/grafana/grafana/pkg/metrics"
|
||||
"github.com/grafana/grafana/pkg/middleware"
|
||||
_ "github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/social"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/grafana/grafana/pkg/log"
|
||||
"github.com/grafana/grafana/pkg/services/cache"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
|
||||
// self registering services
|
||||
_ "github.com/grafana/grafana/pkg/extensions"
|
||||
_ "github.com/grafana/grafana/pkg/metrics"
|
||||
_ "github.com/grafana/grafana/pkg/plugins"
|
||||
_ "github.com/grafana/grafana/pkg/services/alerting"
|
||||
_ "github.com/grafana/grafana/pkg/services/cleanup"
|
||||
_ "github.com/grafana/grafana/pkg/services/notifications"
|
||||
@ -29,10 +37,7 @@ import (
|
||||
_ "github.com/grafana/grafana/pkg/services/rendering"
|
||||
_ "github.com/grafana/grafana/pkg/services/search"
|
||||
_ "github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/social" // self registering services
|
||||
_ "github.com/grafana/grafana/pkg/tracing"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func NewGrafanaServer() *GrafanaServerImpl {
|
||||
@ -72,6 +77,7 @@ func (g *GrafanaServerImpl) Run() error {
|
||||
serviceGraph.Provide(&inject.Object{Value: bus.GetBus()})
|
||||
serviceGraph.Provide(&inject.Object{Value: g.cfg})
|
||||
serviceGraph.Provide(&inject.Object{Value: routing.NewRouteRegister(middleware.RequestMetrics, middleware.RequestTracing)})
|
||||
serviceGraph.Provide(&inject.Object{Value: cache.New(5*time.Minute, 10*time.Minute)})
|
||||
|
||||
// self registered services
|
||||
services := registry.GetServices()
|
||||
@ -138,7 +144,6 @@ func (g *GrafanaServerImpl) Run() error {
|
||||
}
|
||||
|
||||
sendSystemdNotification("READY=1")
|
||||
|
||||
return g.childRoutines.Wait()
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package login
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
)
|
||||
|
14
pkg/middleware/headers.go
Normal file
14
pkg/middleware/headers.go
Normal file
@ -0,0 +1,14 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
macaron "gopkg.in/macaron.v1"
|
||||
)
|
||||
|
||||
const HeaderNameNoBackendCache = "X-Grafana-NoCache"
|
||||
|
||||
func HandleNoCacheHeader() macaron.Handler {
|
||||
return func(ctx *m.ReqContext) {
|
||||
ctx.SkipCache = ctx.Req.Header.Get(HeaderNameNoBackendCache) == "true"
|
||||
}
|
||||
}
|
@ -29,6 +29,7 @@ func GetContextHandler() macaron.Handler {
|
||||
Session: session.GetSession(),
|
||||
IsSignedIn: false,
|
||||
AllowAnonymous: false,
|
||||
SkipCache: false,
|
||||
Logger: log.New("context"),
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@ type ReqContext struct {
|
||||
IsSignedIn bool
|
||||
IsRenderCall bool
|
||||
AllowAnonymous bool
|
||||
SkipCache bool
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
|
@ -207,11 +207,6 @@ func (p DsPermissionType) String() string {
|
||||
return names[int(p)]
|
||||
}
|
||||
|
||||
type GetDataSourcePermissionsForUserQuery struct {
|
||||
User *SignedInUser
|
||||
Result map[int64]DsPermissionType
|
||||
}
|
||||
|
||||
type DatasourcesPermissionFilterQuery struct {
|
||||
User *SignedInUser
|
||||
Datasources []*DataSource
|
||||
|
@ -165,6 +165,7 @@ type SignedInUser struct {
|
||||
IsAnonymous bool
|
||||
HelpFlags1 HelpFlags1
|
||||
LastSeenAt time.Time
|
||||
Teams []int64
|
||||
}
|
||||
|
||||
func (u *SignedInUser) ShouldUpdateLastSeenAt() bool {
|
||||
|
@ -29,11 +29,42 @@ func Register(descriptor *Descriptor) {
|
||||
}
|
||||
|
||||
func GetServices() []*Descriptor {
|
||||
sort.Slice(services, func(i, j int) bool {
|
||||
return services[i].InitPriority > services[j].InitPriority
|
||||
slice := getServicesWithOverrides()
|
||||
|
||||
sort.Slice(slice, func(i, j int) bool {
|
||||
return slice[i].InitPriority > slice[j].InitPriority
|
||||
})
|
||||
|
||||
return services
|
||||
return slice
|
||||
}
|
||||
|
||||
type OverrideServiceFunc func(descriptor Descriptor) (*Descriptor, bool)
|
||||
|
||||
var overrides []OverrideServiceFunc
|
||||
|
||||
func RegisterOverride(fn OverrideServiceFunc) {
|
||||
overrides = append(overrides, fn)
|
||||
}
|
||||
|
||||
func getServicesWithOverrides() []*Descriptor {
|
||||
slice := []*Descriptor{}
|
||||
for _, s := range services {
|
||||
var descriptor *Descriptor
|
||||
for _, fn := range overrides {
|
||||
if newDescriptor, override := fn(*s); override {
|
||||
descriptor = newDescriptor
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if descriptor != nil {
|
||||
slice = append(slice, descriptor)
|
||||
} else {
|
||||
slice = append(slice, s)
|
||||
}
|
||||
}
|
||||
|
||||
return slice
|
||||
}
|
||||
|
||||
// Service interface is the lowest common shape that services
|
||||
|
17
pkg/services/cache/cache.go
vendored
Normal file
17
pkg/services/cache/cache.go
vendored
Normal file
@ -0,0 +1,17 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
gocache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
type CacheService struct {
|
||||
*gocache.Cache
|
||||
}
|
||||
|
||||
func New(defaultExpiration, cleanupInterval time.Duration) *CacheService {
|
||||
return &CacheService{
|
||||
Cache: gocache.New(defaultExpiration, cleanupInterval),
|
||||
}
|
||||
}
|
53
pkg/services/datasources/cache.go
Normal file
53
pkg/services/datasources/cache.go
Normal file
@ -0,0 +1,53 @@
|
||||
package datasources
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/cache"
|
||||
)
|
||||
|
||||
type CacheService interface {
|
||||
GetDatasource(datasourceID int64, user *m.SignedInUser, skipCache bool) (*m.DataSource, error)
|
||||
}
|
||||
|
||||
type CacheServiceImpl struct {
|
||||
Bus bus.Bus `inject:""`
|
||||
CacheService *cache.CacheService `inject:""`
|
||||
}
|
||||
|
||||
func init() {
|
||||
registry.Register(®istry.Descriptor{
|
||||
Name: "DatasourceCacheService",
|
||||
Instance: &CacheServiceImpl{},
|
||||
InitPriority: registry.Low,
|
||||
})
|
||||
}
|
||||
|
||||
func (dc *CacheServiceImpl) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dc *CacheServiceImpl) GetDatasource(datasourceID int64, user *m.SignedInUser, skipCache bool) (*m.DataSource, error) {
|
||||
cacheKey := fmt.Sprintf("ds-%d", datasourceID)
|
||||
|
||||
if !skipCache {
|
||||
if cached, found := dc.CacheService.Get(cacheKey); found {
|
||||
ds := cached.(*m.DataSource)
|
||||
if ds.OrgId == user.OrgId {
|
||||
return ds, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
query := m.GetDataSourceByIdQuery{Id: datasourceID, OrgId: user.OrgId}
|
||||
if err := dc.Bus.Dispatch(&query); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dc.CacheService.Set(cacheKey, query.Result, time.Second*5)
|
||||
return query.Result, nil
|
||||
}
|
@ -16,6 +16,7 @@ import (
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/annotations"
|
||||
"github.com/grafana/grafana/pkg/services/cache"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
|
||||
@ -47,8 +48,9 @@ func init() {
|
||||
}
|
||||
|
||||
type SqlStore struct {
|
||||
Cfg *setting.Cfg `inject:""`
|
||||
Bus bus.Bus `inject:""`
|
||||
Cfg *setting.Cfg `inject:""`
|
||||
Bus bus.Bus `inject:""`
|
||||
CacheService *cache.CacheService `inject:""`
|
||||
|
||||
dbCfg DatabaseConfig
|
||||
engine *xorm.Engine
|
||||
@ -148,9 +150,11 @@ func (ss *SqlStore) Init() error {
|
||||
|
||||
// Init repo instances
|
||||
annotations.SetRepository(&SqlAnnotationRepo{})
|
||||
|
||||
ss.Bus.SetTransactionManager(ss)
|
||||
|
||||
// Register handlers
|
||||
ss.addUserQueryAndCommandHandlers()
|
||||
|
||||
// ensure admin user
|
||||
if ss.skipEnsureAdmin {
|
||||
return nil
|
||||
@ -322,6 +326,7 @@ func InitTestDB(t *testing.T) *SqlStore {
|
||||
sqlstore := &SqlStore{}
|
||||
sqlstore.skipEnsureAdmin = true
|
||||
sqlstore.Bus = bus.New()
|
||||
sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
|
||||
|
||||
dbType := migrator.SQLITE
|
||||
|
||||
|
@ -15,8 +15,9 @@ import (
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
func init() {
|
||||
//bus.AddHandler("sql", CreateUser)
|
||||
func (ss *SqlStore) addUserQueryAndCommandHandlers() {
|
||||
ss.Bus.AddHandler(ss.GetSignedInUserWithCache)
|
||||
|
||||
bus.AddHandler("sql", GetUserById)
|
||||
bus.AddHandler("sql", UpdateUser)
|
||||
bus.AddHandler("sql", ChangeUserPassword)
|
||||
@ -25,7 +26,6 @@ func init() {
|
||||
bus.AddHandler("sql", SetUsingOrg)
|
||||
bus.AddHandler("sql", UpdateUserLastSeenAt)
|
||||
bus.AddHandler("sql", GetUserProfile)
|
||||
bus.AddHandler("sql", GetSignedInUser)
|
||||
bus.AddHandler("sql", SearchUsers)
|
||||
bus.AddHandler("sql", GetUserOrgList)
|
||||
bus.AddHandler("sql", DeleteUser)
|
||||
@ -345,6 +345,22 @@ func GetUserOrgList(query *m.GetUserOrgListQuery) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (ss *SqlStore) GetSignedInUserWithCache(query *m.GetSignedInUserQuery) error {
|
||||
cacheKey := fmt.Sprintf("signed-in-user-%d-%d", query.UserId, query.OrgId)
|
||||
if cached, found := ss.CacheService.Get(cacheKey); found {
|
||||
query.Result = cached.(*m.SignedInUser)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := GetSignedInUser(query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ss.CacheService.Set(cacheKey, query.Result, time.Second*5)
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetSignedInUser(query *m.GetSignedInUserQuery) error {
|
||||
orgId := "u.org_id"
|
||||
if query.OrgId > 0 {
|
||||
@ -389,6 +405,17 @@ func GetSignedInUser(query *m.GetSignedInUserQuery) error {
|
||||
user.OrgName = "Org missing"
|
||||
}
|
||||
|
||||
getTeamsByUserQuery := &m.GetTeamsByUserQuery{OrgId: user.OrgId, UserId: user.UserId}
|
||||
err = GetTeamsByUser(getTeamsByUserQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
user.Teams = make([]int64, len(getTeamsByUserQuery.Result))
|
||||
for i, t := range getTeamsByUserQuery.Result {
|
||||
user.Teams[i] = t.Id
|
||||
}
|
||||
|
||||
query.Result = &user
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user