Zanzana: Use StreamedListObjects to fetch full list of resources (#97025)

* Initial streamed version of list

* instantiate openfga client to use StreamedListObjects

* Add config option for using streamed version

* Use caching

* fix cache init

* Fix hashing

* refactor
This commit is contained in:
Alexander Zobnin 2024-11-27 13:05:41 +01:00 committed by GitHub
parent 3957b0b26a
commit 5a91ab46af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 136 additions and 10 deletions

View File

@ -2,12 +2,15 @@ package server
import (
"sync"
"time"
"github.com/fullstorydev/grpchan/inprocgrpc"
authzv1 "github.com/grafana/authlib/authz/proto/v1"
openfgav1 "github.com/openfga/api/proto/openfga/v1"
"github.com/openfga/language/pkg/go/transformer"
"go.opentelemetry.io/otel"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log"
authzextv1 "github.com/grafana/grafana/pkg/services/authz/proto/v1"
"github.com/grafana/grafana/pkg/setting"
@ -17,6 +20,8 @@ const (
resourceType = "resource"
namespaceType = "namespace"
folderTypePrefix = "folder:"
cacheCleanInterval = 2 * time.Minute
)
var _ authzv1.AuthzServiceServer = (*Server)(nil)
@ -28,12 +33,15 @@ type Server struct {
authzv1.UnimplementedAuthzServiceServer
authzextv1.UnimplementedAuthzExtentionServiceServer
openfga openfgav1.OpenFGAServiceServer
openfga openfgav1.OpenFGAServiceServer
openfgaClient openfgav1.OpenFGAServiceClient
cfg setting.ZanzanaSettings
logger log.Logger
modules []transformer.ModuleFile
stores map[string]storeInfo
storesMU *sync.Mutex
cache *localcache.CacheService
}
type storeInfo struct {
@ -56,14 +64,21 @@ func WithSchema(modules []transformer.ModuleFile) ServerOption {
}
func NewAuthzServer(cfg *setting.Cfg, openfga openfgav1.OpenFGAServiceServer) (*Server, error) {
return NewAuthz(openfga)
return NewAuthz(cfg, openfga)
}
func NewAuthz(openfga openfgav1.OpenFGAServiceServer, opts ...ServerOption) (*Server, error) {
func NewAuthz(cfg *setting.Cfg, openfga openfgav1.OpenFGAServiceServer, opts ...ServerOption) (*Server, error) {
channel := &inprocgrpc.Channel{}
openfgav1.RegisterOpenFGAServiceServer(channel, openfga)
openFGAClient := openfgav1.NewOpenFGAServiceClient(channel)
s := &Server{
openfga: openfga,
storesMU: &sync.Mutex{},
stores: make(map[string]storeInfo),
openfga: openfga,
openfgaClient: openFGAClient,
storesMU: &sync.Mutex{},
stores: make(map[string]storeInfo),
cfg: cfg.Zanzana,
cache: localcache.New(cfg.Zanzana.CheckQueryCacheTTL, cacheCleanInterval),
}
for _, o := range opts {

View File

@ -47,9 +47,16 @@ func (s *Server) List(ctx context.Context, r *authzextv1.ListRequest) (*authzext
return s.listGeneric(ctx, r.GetSubject(), relation, r.GetGroup(), r.GetResource(), store)
}
func (s *Server) listObjects(ctx context.Context, req *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
if s.cfg.UseStreamedListObjects {
return s.streamedListObjects(ctx, req)
}
return s.openfga.ListObjects(ctx, req)
}
func (s *Server) listTyped(ctx context.Context, subject, relation string, info common.TypeInfo, store *storeInfo) (*authzextv1.ListResponse, error) {
// List all resources user has access too
listRes, err := s.openfga.ListObjects(ctx, &openfgav1.ListObjectsRequest{
listRes, err := s.listObjects(ctx, &openfgav1.ListObjectsRequest{
StoreId: store.ID,
AuthorizationModelId: store.ModelID,
Type: info.Type,
@ -69,7 +76,7 @@ func (s *Server) listGeneric(ctx context.Context, subject, relation, group, reso
groupResource := structpb.NewStringValue(common.FormatGroupResource(group, resource))
// 1. List all folders subject has access to resource type in
folders, err := s.openfga.ListObjects(ctx, &openfgav1.ListObjectsRequest{
folders, err := s.listObjects(ctx, &openfgav1.ListObjectsRequest{
StoreId: store.ID,
AuthorizationModelId: store.ModelID,
Type: common.TypeFolder,
@ -86,7 +93,7 @@ func (s *Server) listGeneric(ctx context.Context, subject, relation, group, reso
}
// 2. List all resource directly assigned to subject
direct, err := s.openfga.ListObjects(ctx, &openfgav1.ListObjectsRequest{
direct, err := s.listObjects(ctx, &openfgav1.ListObjectsRequest{
StoreId: store.ID,
AuthorizationModelId: store.ModelID,
Type: common.TypeResource,

View File

@ -0,0 +1,100 @@
package server
import (
"context"
"encoding/base64"
"errors"
"hash/fnv"
"io"
openfgav1 "github.com/openfga/api/proto/openfga/v1"
)
func (s *Server) streamedListObjects(ctx context.Context, req *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
if !s.cfg.CheckQueryCache {
return s.listObjectsWithStream(ctx, req)
}
return s.streamedListObjectsCached(ctx, req)
}
func (s *Server) streamedListObjectsCached(ctx context.Context, req *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
ctx, span := tracer.Start(ctx, "authzServer.streamedListObjectsCached")
defer span.End()
reqHash, err := getRequestHash(req)
if err != nil {
return nil, err
}
if res, ok := s.cache.Get(reqHash); ok {
return res.(*openfgav1.ListObjectsResponse), nil
}
res, err := s.listObjectsWithStream(ctx, req)
if err != nil {
return nil, err
}
s.cache.Set(reqHash, res, 0)
return res, nil
}
func (s *Server) listObjectsWithStream(ctx context.Context, req *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
ctx, span := tracer.Start(ctx, "authzServer.listObjectsWithStream")
defer span.End()
r := &openfgav1.StreamedListObjectsRequest{
StoreId: req.GetStoreId(),
AuthorizationModelId: req.GetAuthorizationModelId(),
Type: req.GetType(),
Relation: req.GetRelation(),
User: req.GetUser(),
Context: req.GetContext(),
}
clientStream, err := s.openfgaClient.StreamedListObjects(ctx, r)
if err != nil {
return nil, err
}
done := make(chan struct{})
var streamedObjectIDs []string
var streamingErr error
var streamingResp *openfgav1.StreamedListObjectsResponse
go func() {
for {
streamingResp, streamingErr = clientStream.Recv()
if streamingErr == nil {
streamedObjectIDs = append(streamedObjectIDs, streamingResp.GetObject())
} else {
if errors.Is(streamingErr, io.EOF) {
streamingErr = nil
}
break
}
}
done <- struct{}{}
}()
<-done
if streamingErr != nil {
return nil, streamingErr
}
return &openfgav1.ListObjectsResponse{
Objects: streamedObjectIDs,
}, nil
}
func getRequestHash(req *openfgav1.ListObjectsRequest) (string, error) {
if req == nil {
return "", errors.New("request must not be empty")
}
hash := fnv.New64a()
_, err := hash.Write([]byte(req.String()))
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(hash.Sum(nil)), nil
}

View File

@ -68,7 +68,7 @@ func setup(t *testing.T, testDB db.DB, cfg *setting.Cfg) *Server {
openfga, err := NewOpenFGA(&cfg.Zanzana, store, log.NewNopLogger())
require.NoError(t, err)
srv, err := NewAuthz(openfga)
srv, err := NewAuthz(cfg, openfga)
require.NoError(t, err)
storeInf, err := srv.getStoreInfo(context.Background(), namespace)

View File

@ -34,6 +34,9 @@ type ZanzanaSettings struct {
ListObjectsMaxResults uint32
// Deadline for the ListObjects() query. Default is 3 seconds.
ListObjectsDeadline time.Duration
// Use streamed version of list objects.
// Returns full list of objects, but takes more time.
UseStreamedListObjects bool
}
func (cfg *Cfg) readZanzanaSettings() {
@ -58,6 +61,7 @@ func (cfg *Cfg) readZanzanaSettings() {
s.CheckQueryCacheTTL = sec.Key("check_query_cache_ttl").MustDuration(10 * time.Second)
s.ListObjectsDeadline = sec.Key("list_objects_deadline").MustDuration(3 * time.Second)
s.ListObjectsMaxResults = uint32(sec.Key("list_objects_max_results").MustUint(1000))
s.UseStreamedListObjects = sec.Key("use_streamed_list_objects").MustBool(false)
cfg.Zanzana = s
}