Storage: Add authz access client stubs (#95548)

authz stubs

authz stubs

lint fix

Add folder

move check.Name on creat

fix watch
This commit is contained in:
Ryan McKinley
2024-11-13 15:17:15 +03:00
committed by GitHub
parent 1129647039
commit d9b87ef987
4 changed files with 185 additions and 40 deletions

View File

@@ -0,0 +1,26 @@
package resource
import (
"context"
"github.com/grafana/authlib/authz"
"github.com/grafana/authlib/claims"
)
type staticAuthzClient struct {
allowed bool
}
// Check implements authz.AccessClient.
func (c *staticAuthzClient) Check(ctx context.Context, id claims.AuthInfo, req authz.CheckRequest) (authz.CheckResponse, error) {
return authz.CheckResponse{Allowed: c.allowed}, nil
}
// Compile implements authz.AccessClient.
func (c *staticAuthzClient) Compile(ctx context.Context, id claims.AuthInfo, req authz.ListRequest) (authz.ItemChecker, error) {
return func(namespace string, name, folder string) bool {
return c.allowed
}, nil
}
var _ authz.AccessClient = &staticAuthzClient{}

View File

@@ -8,10 +8,6 @@ import (
)
type WriteAccessHooks struct {
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
Folder func(ctx context.Context, user claims.AuthInfo, uid string) bool
// When configured, this will make sure a user is allowed to save to a given origin
Origin func(ctx context.Context, user claims.AuthInfo, origin string) bool
}
@@ -24,16 +20,6 @@ type LifecycleHooks interface {
Stop(context.Context) error
}
func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user claims.AuthInfo, uid string) error {
if a.Folder == nil {
return fmt.Errorf("writing folders is not supported")
}
if !a.Folder(ctx, user, uid) {
return fmt.Errorf("not allowed to write resource to folder")
}
return nil
}
func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user claims.AuthInfo, uid string) error {
if a.Origin == nil || uid == "UI" {
return nil // default to OK

View File

@@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/authlib/authz"
"github.com/grafana/authlib/claims"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
@@ -140,7 +141,10 @@ type ResourceServerOptions struct {
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
WriteAccess WriteAccessHooks
WriteHooks WriteAccessHooks
// Link RBAC
AccessClient authz.AccessClient
// Callbacks for startup and shutdown
Lifecycle LifecycleHooks
@@ -161,6 +165,10 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
return nil, fmt.Errorf("missing Backend implementation")
}
if opts.AccessClient == nil {
opts.AccessClient = &staticAuthzClient{allowed: true} // everything OK
}
if opts.Diagnostics == nil {
opts.Diagnostics = &noopService{}
}
@@ -209,7 +217,8 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
index: opts.Index,
blob: blobstore,
diagnostics: opts.Diagnostics,
access: opts.WriteAccess,
access: opts.AccessClient,
writeHooks: opts.WriteHooks,
lifecycle: opts.Lifecycle,
now: opts.Now,
ctx: ctx,
@@ -228,7 +237,8 @@ type server struct {
blob BlobSupport
index ResourceIndexServer
diagnostics DiagnosticsServer
access WriteAccessHooks
access authz.AccessClient
writeHooks WriteAccessHooks
lifecycle LifecycleHooks
now func() int64
mostRecentRV atomic.Int64 // The most recent resource version seen by the server
@@ -312,6 +322,13 @@ func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *Resour
s.log.Error("object must not include a resource version", "key", key)
}
check := authz.CheckRequest{
Verb: "create",
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
event := &WriteEvent{
Value: value,
Key: key,
@@ -321,6 +338,7 @@ func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *Resour
event.Type = WatchEvent_ADDED
} else {
event.Type = WatchEvent_MODIFIED
check.Verb = "update"
temp := &unstructured.Unstructured{}
err = temp.UnmarshalJSON(oldValue)
@@ -364,19 +382,24 @@ func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *Resour
return nil, err
}
folder := obj.GetFolder()
if folder != "" {
err = s.access.CanWriteFolder(ctx, user, folder)
if err != nil {
return nil, AsErrorResult(err)
check.Folder = obj.GetFolder()
check.Name = key.Name
a, err := s.access.Check(ctx, user, check)
if err != nil {
return nil, AsErrorResult(err)
}
if !a.Allowed {
return nil, &ErrorResult{
Code: http.StatusForbidden,
}
}
origin, err := obj.GetOriginInfo()
if err != nil {
return nil, NewBadRequestError("invalid origin info")
}
if origin != nil {
err = s.access.CanWriteOrigin(ctx, user, origin.Name)
err = s.writeHooks.CanWriteOrigin(ctx, user, origin.Name)
if err != nil {
return nil, AsErrorResult(err)
}
@@ -406,7 +429,7 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
if found != nil && len(found.Value) > 0 {
rsp.Error = &ErrorResult{
Code: http.StatusConflict,
Message: "key already exists",
Message: "key already exists", // TODO?? soft delete replace?
}
return rsp, nil
}
@@ -416,6 +439,7 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
rsp.Error = e
return rsp, nil
}
var err error
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
if err != nil {
@@ -491,6 +515,14 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
if req.ResourceVersion < 0 {
return nil, apierrors.NewBadRequest("update must include the previous version")
}
user, ok := claims.From(ctx)
if !ok || user == nil {
rsp.Error = &ErrorResult{
Message: "no user found in context",
Code: http.StatusUnauthorized,
}
return rsp, nil
}
latest := s.backend.ReadResource(ctx, &ReadRequest{
Key: req.Key,
@@ -504,6 +536,25 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
return rsp, nil
}
access, err := s.access.Check(ctx, user, authz.CheckRequest{
Verb: "delete",
Group: req.Key.Group,
Resource: req.Key.Resource,
Namespace: req.Key.Namespace,
Name: req.Key.Name,
Folder: latest.Folder,
})
if err != nil {
rsp.Error = AsErrorResult(err)
return rsp, nil
}
if !access.Allowed {
rsp.Error = &ErrorResult{
Code: http.StatusForbidden,
}
return rsp, nil
}
now := metav1.NewTime(time.UnixMilli(s.now()))
event := WriteEvent{
Key: req.Key,
@@ -515,7 +566,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
return nil, apierrors.NewBadRequest("unable to get user")
}
marker := &DeletedMarker{}
err := json.Unmarshal(latest.Value, marker)
err = json.Unmarshal(latest.Value, marker)
if err != nil {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("unable to read previous object, %v", err))
@@ -551,6 +602,14 @@ func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
if err := s.Init(ctx); err != nil {
return nil, err
}
user, ok := claims.From(ctx)
if !ok || user == nil {
return &ReadResponse{
Error: &ErrorResult{
Message: "no user found in context",
Code: http.StatusUnauthorized,
}}, nil
}
// if req.Key.Group == "" {
// status, _ := AsErrorResult(apierrors.NewBadRequest("missing group"))
@@ -561,7 +620,24 @@ func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
}
rsp := s.backend.ReadResource(ctx, req)
// TODO, check folder permissions etc
a, err := s.access.Check(ctx, user, authz.CheckRequest{
Verb: "get",
Group: req.Key.Group,
Resource: req.Key.Resource,
Namespace: req.Key.Namespace,
Name: req.Key.Name,
Folder: rsp.Folder,
})
if err != nil {
return &ReadResponse{Error: AsErrorResult(err)}, nil
}
if !a.Allowed {
return &ReadResponse{
Error: &ErrorResult{
Code: http.StatusForbidden,
}}, nil
}
return &ReadResponse{
ResourceVersion: rsp.ResourceVersion,
Value: rsp.Value,
@@ -573,6 +649,15 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
user, ok := claims.From(ctx)
if !ok || user == nil {
return &ListResponse{
Error: &ErrorResult{
Message: "no user found in context",
Code: http.StatusUnauthorized,
}}, nil
}
if err := s.Init(ctx); err != nil {
return nil, err
}
@@ -582,19 +667,37 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
maxPageBytes := 1024 * 1024 * 2 // 2mb/page
pageBytes := 0
rsp := &ListResponse{}
key := req.Options.Key
checker, err := s.access.Compile(ctx, user, authz.ListRequest{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
})
if err != nil {
return &ListResponse{Error: AsErrorResult(err)}, nil
}
if checker == nil {
return &ListResponse{Error: &ErrorResult{
Code: http.StatusForbidden,
}}, nil
}
rv, err := s.backend.ListIterator(ctx, req, func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
return err
}
// TODO: add authz filters
item := &ResourceWrapper{
ResourceVersion: iter.ResourceVersion(),
Value: iter.Value(),
}
if !checker(iter.Namespace(), iter.Name(), iter.Folder()) {
continue
}
pageBytes += len(item.Value)
rsp.Items = append(rsp.Items, item)
if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes {
@@ -644,6 +747,7 @@ func (s *server) initWatcher() error {
return err
}
//nolint:gocyclo
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
ctx := srv.Context()
@@ -651,6 +755,24 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
return err
}
user, ok := claims.From(ctx)
if !ok || user == nil {
return apierrors.NewUnauthorized("no user found in context")
}
key := req.Options.Key
checker, err := s.access.Compile(ctx, user, authz.ListRequest{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
})
if err != nil {
return err
}
if checker == nil {
return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
}
// Start listening -- this will buffer any changes that happen while we backfill.
// If events are generated faster than we can process them, then some events will be dropped.
// TODO: Think of a way to allow the client to catch up.
@@ -724,6 +846,10 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
}
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
if !checker(event.Key.Namespace, event.Key.Name, event.Folder) {
continue
}
value := event.Value
// remove the delete marker stored in the value for deleted objects
if event.Type == WatchEvent_DELETED {

View File

@@ -5,14 +5,16 @@ import (
"os"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/authlib/claims"
"github.com/grafana/grafana/pkg/apimachinery/identity"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/prometheus/client_golang/prometheus"
)
// Creates a new ResourceServer
@@ -48,17 +50,6 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea
opts.Diagnostics = store
opts.Lifecycle = store
if features.IsEnabledGlobally(featuremgmt.FlagKubernetesFolders) {
opts.WriteAccess = resource.WriteAccessHooks{
Folder: func(ctx context.Context, user claims.AuthInfo, uid string) bool {
// #TODO build on the logic here
// #TODO only enable write access when the resource being written in the folder
// is another folder
return true
},
}
}
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
opts.Index = resource.NewResourceIndexServer(cfg, tracer)
}
@@ -70,6 +61,22 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea
// Initialize the indexer if one is configured
if opts.Index != nil {
// TODO: Create a proper identity for the indexer
orgId := int64(1)
ctx = identity.WithRequester(ctx, &identity.StaticRequester{
Type: claims.TypeServiceAccount, // system:apiserver
UserID: 1,
OrgID: int64(1),
Name: "admin",
Login: "admin",
OrgRole: identity.RoleAdmin,
IsGrafanaAdmin: true,
Permissions: map[int64]map[string][]string{
orgId: {
"*": {"*"}, // all resources, all scopes
},
},
})
_, err = rs.(resource.ResourceIndexer).Index(ctx)
if err != nil {
return nil, err