mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Zanzana: Initial work to allow partial data migrations (#89919)
* Zanana: Add Write method to interface * Zanzana: Add utilities for translating RBAC to openFGA tuple keys * RBAC: Add zanzana synchronizer * Run zanzana sync in access controll provider
This commit is contained in:
parent
f518c5978c
commit
e568b86ac0
@ -26,6 +26,7 @@ import (
|
||||
acdb "github.com/grafana/grafana/pkg/services/accesscontrol/database"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/ossaccesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/resourcepermissions"
|
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler/ctxkey"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
@ -460,7 +461,10 @@ func setupServer(b testing.TB, sc benchScenario, features featuremgmt.FeatureTog
|
||||
|
||||
cfg := setting.NewCfg()
|
||||
actionSets := resourcepermissions.NewActionSetService()
|
||||
acSvc := acimpl.ProvideOSSService(sc.cfg, acdb.ProvideService(sc.db), actionSets, localcache.ProvideService(), features, tracing.InitializeTracerForTest())
|
||||
acSvc := acimpl.ProvideOSSService(
|
||||
sc.cfg, acdb.ProvideService(sc.db), actionSets, localcache.ProvideService(),
|
||||
features, tracing.InitializeTracerForTest(), zanzana.NewNoopClient(), sc.db,
|
||||
)
|
||||
|
||||
folderPermissions, err := ossaccesscontrol.ProvideFolderPermissions(
|
||||
cfg, features, routing.NewRouteRegister(), sc.db, ac, license, &dashboards.FakeDashboardStore{}, folderServiceWithFlagOn, acSvc, sc.teamSvc, sc.userSvc, actionSets)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
|
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/quota/quotaimpl"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
@ -89,7 +90,7 @@ func initializeConflictResolver(cmd *utils.ContextCommandLine, f Formatter, ctx
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v: %w", "failed to initialize tracer service", err)
|
||||
}
|
||||
acService, err := acimpl.ProvideService(cfg, s, routing, nil, nil, nil, features, tracer)
|
||||
acService, err := acimpl.ProvideService(cfg, s, routing, nil, nil, nil, features, tracer, zanzana.NewNoopClient())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v: %w", "failed to get access control", err)
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/migrator"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/pluginutils"
|
||||
"github.com/grafana/grafana/pkg/services/authn"
|
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
@ -46,8 +47,12 @@ var SharedWithMeFolderPermission = accesscontrol.Permission{
|
||||
|
||||
var OSSRolesPrefixes = []string{accesscontrol.ManagedRolePrefix, accesscontrol.ExternalServiceRolePrefix}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, db db.DB, routeRegister routing.RouteRegister, cache *localcache.CacheService, accessControl accesscontrol.AccessControl, actionResolver accesscontrol.ActionResolver, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (*Service, error) {
|
||||
service := ProvideOSSService(cfg, database.ProvideService(db), actionResolver, cache, features, tracer)
|
||||
func ProvideService(
|
||||
cfg *setting.Cfg, db db.DB, routeRegister routing.RouteRegister, cache *localcache.CacheService,
|
||||
accessControl accesscontrol.AccessControl, actionResolver accesscontrol.ActionResolver,
|
||||
features featuremgmt.FeatureToggles, tracer tracing.Tracer, zclient zanzana.Client,
|
||||
) (*Service, error) {
|
||||
service := ProvideOSSService(cfg, database.ProvideService(db), actionResolver, cache, features, tracer, zclient, db)
|
||||
|
||||
api.NewAccessControlAPI(routeRegister, accessControl, service, features).RegisterAPIEndpoints()
|
||||
if err := accesscontrol.DeclareFixedRoles(service, cfg); err != nil {
|
||||
@ -65,7 +70,11 @@ func ProvideService(cfg *setting.Cfg, db db.DB, routeRegister routing.RouteRegis
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func ProvideOSSService(cfg *setting.Cfg, store accesscontrol.Store, actionResolver accesscontrol.ActionResolver, cache *localcache.CacheService, features featuremgmt.FeatureToggles, tracer tracing.Tracer) *Service {
|
||||
func ProvideOSSService(
|
||||
cfg *setting.Cfg, store accesscontrol.Store, actionResolver accesscontrol.ActionResolver,
|
||||
cache *localcache.CacheService, features featuremgmt.FeatureToggles, tracer tracing.Tracer,
|
||||
zclient zanzana.Client, db db.DB,
|
||||
) *Service {
|
||||
s := &Service{
|
||||
actionResolver: actionResolver,
|
||||
cache: cache,
|
||||
@ -75,6 +84,7 @@ func ProvideOSSService(cfg *setting.Cfg, store accesscontrol.Store, actionResolv
|
||||
roles: accesscontrol.BuildBasicRoleDefinitions(),
|
||||
store: store,
|
||||
tracer: tracer,
|
||||
sync: migrator.NewZanzanaSynchroniser(zclient, db),
|
||||
}
|
||||
|
||||
return s
|
||||
@ -91,6 +101,7 @@ type Service struct {
|
||||
roles map[string]*accesscontrol.RoleDTO
|
||||
store accesscontrol.Store
|
||||
tracer tracing.Tracer
|
||||
sync *migrator.ZanzanaSynchroniser
|
||||
}
|
||||
|
||||
func (s *Service) GetUsageStats(_ context.Context) map[string]any {
|
||||
@ -397,6 +408,13 @@ func (s *Service) RegisterFixedRoles(ctx context.Context) error {
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if s.features.IsEnabledGlobally(featuremgmt.FlagZanzana) {
|
||||
if err := s.sync.Sync(context.Background()); err != nil {
|
||||
s.log.Error("Failed to synchronise permissions to zanzana ", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -69,6 +69,8 @@ func TestUsageMetrics(t *testing.T) {
|
||||
localcache.ProvideService(),
|
||||
featuremgmt.WithFeatures(),
|
||||
tracing.InitializeTracerForTest(),
|
||||
nil,
|
||||
nil,
|
||||
)
|
||||
assert.Equal(t, tt.expectedValue, s.GetUsageStats(context.Background())["stats.oss.accesscontrol.enabled.count"])
|
||||
})
|
||||
|
128
pkg/services/accesscontrol/migrator/zanzana.go
Normal file
128
pkg/services/accesscontrol/migrator/zanzana.go
Normal file
@ -0,0 +1,128 @@
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||
)
|
||||
|
||||
// A TupleCollector is responsible to build and store [openfgav1.TupleKey] into provided tuple map.
|
||||
// They key used should be a unique group key for the collector so we can skip over an already synced group.
|
||||
type TupleCollector func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error
|
||||
|
||||
// ZanzanaSynchroniser is a component to sync RBAC permissions to zanzana.
|
||||
// We should rewrite the migration after we have "migrated" all possible actions
|
||||
// into our schema. This will only do a one time migration for each action so its
|
||||
// is not really syncing the full rbac state. If a fresh sync is needed the tuple
|
||||
// needs to be cleared first.
|
||||
type ZanzanaSynchroniser struct {
|
||||
log log.Logger
|
||||
client zanzana.Client
|
||||
collectors []TupleCollector
|
||||
}
|
||||
|
||||
func NewZanzanaSynchroniser(client zanzana.Client, store db.DB, collectors ...TupleCollector) *ZanzanaSynchroniser {
|
||||
// Append shared collectors that is used by both enterprise and oss
|
||||
collectors = append(collectors, managedPermissionsCollector(store))
|
||||
|
||||
return &ZanzanaSynchroniser{
|
||||
log: log.New("zanzana.sync"),
|
||||
collectors: collectors,
|
||||
}
|
||||
}
|
||||
|
||||
// Sync runs all collectors and tries to write all collected tuples.
|
||||
// It will skip over any "sync group" that has already been written.
|
||||
func (z *ZanzanaSynchroniser) Sync(ctx context.Context) error {
|
||||
tuplesMap := make(map[string][]*openfgav1.TupleKey)
|
||||
|
||||
for _, c := range z.collectors {
|
||||
if err := c(ctx, tuplesMap); err != nil {
|
||||
return fmt.Errorf("failed to collect permissions: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for key, tuples := range tuplesMap {
|
||||
if err := batch(len(tuples), 100, func(start, end int) error {
|
||||
return z.client.Write(ctx, &openfgav1.WriteRequest{
|
||||
Writes: &openfgav1.WriteRequestWrites{
|
||||
TupleKeys: tuples[start:end],
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
if strings.Contains(err.Error(), "cannot write a tuple which already exists") {
|
||||
z.log.Debug("Skipping already synced permissions", "sync_key", key)
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// managedPermissionsCollector collects managed permissions into provided tuple map.
|
||||
// It will only store actions that are supported by our schema. Managed permissions can
|
||||
// be directly mapped to user/team/role without having to write an intermediate role.
|
||||
func managedPermissionsCollector(store db.DB) TupleCollector {
|
||||
return func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error {
|
||||
const collectorID = "managed"
|
||||
const query = `
|
||||
SELECT ur.user_id, p.action, p.kind, p.identifier, r.org_id FROM permission p
|
||||
INNER JOIN role r on p.role_id = r.id
|
||||
LEFT JOIN user_role ur on r.id = ur.role_id
|
||||
LEFT JOIN team_role tr on r.id = tr.role_id
|
||||
LEFT JOIN builtin_role br on r.id = br.role_id
|
||||
WHERE r.name LIKE 'managed:%'
|
||||
`
|
||||
type Permission struct {
|
||||
RoleName string `xorm:"role_name"`
|
||||
OrgID int64 `xorm:"org_id"`
|
||||
Action string `xorm:"action"`
|
||||
Kind string
|
||||
Identifier string
|
||||
UserID int64 `xorm:"user_id"`
|
||||
TeamID int64 `xorm:"user_id"`
|
||||
}
|
||||
|
||||
var permissions []Permission
|
||||
err := store.WithDbSession(ctx, func(sess *db.Session) error {
|
||||
return sess.SQL(query).Find(&permissions)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, p := range permissions {
|
||||
var subject string
|
||||
if p.UserID > 0 {
|
||||
subject = zanzana.NewObject(zanzana.TypeUser, strconv.FormatInt(p.UserID, 10))
|
||||
} else if p.TeamID > 0 {
|
||||
subject = zanzana.NewObject(zanzana.TypeTeam, strconv.FormatInt(p.TeamID, 10))
|
||||
} else {
|
||||
// FIXME(kalleep): Unsuported role binding (org role). We need to have basic roles in place
|
||||
continue
|
||||
}
|
||||
|
||||
tuple, ok := zanzana.TranslateToTuple(subject, p.Action, p.Kind, p.Identifier, p.OrgID)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// our "sync key" is a combination of collectorID and action so we can run this
|
||||
// sync new data when more actions are supported
|
||||
key := fmt.Sprintf("%s-%s", collectorID, p.Action)
|
||||
tuples[key] = append(tuples[key], tuple)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
@ -15,8 +15,9 @@ import (
|
||||
|
||||
// Client is a wrapper around [openfgav1.OpenFGAServiceClient]
|
||||
type Client interface {
|
||||
Check(ctx context.Context, in *openfgav1.CheckRequest, opts ...grpc.CallOption) (*openfgav1.CheckResponse, error)
|
||||
ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest, opts ...grpc.CallOption) (*openfgav1.ListObjectsResponse, error)
|
||||
Check(ctx context.Context, in *openfgav1.CheckRequest) (*openfgav1.CheckResponse, error)
|
||||
ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error)
|
||||
Write(ctx context.Context, in *openfgav1.WriteRequest) error
|
||||
}
|
||||
|
||||
func NewClient(ctx context.Context, cc grpc.ClientConnInterface, cfg *setting.Cfg) (*client.Client, error) {
|
||||
@ -27,3 +28,7 @@ func NewClient(ctx context.Context, cc grpc.ClientConnInterface, cfg *setting.Cf
|
||||
client.WithLogger(log.New("zanzana-client")),
|
||||
)
|
||||
}
|
||||
|
||||
func NewNoopClient() *client.NoopClient {
|
||||
return client.NewNoop()
|
||||
}
|
||||
|
@ -70,12 +70,23 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, opts ...ClientOption)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Client) Check(ctx context.Context, in *openfgav1.CheckRequest, opts ...grpc.CallOption) (*openfgav1.CheckResponse, error) {
|
||||
return c.client.Check(ctx, in, opts...)
|
||||
func (c *Client) Check(ctx context.Context, in *openfgav1.CheckRequest) (*openfgav1.CheckResponse, error) {
|
||||
in.StoreId = c.storeID
|
||||
in.AuthorizationModelId = c.modelID
|
||||
return c.client.Check(ctx, in)
|
||||
}
|
||||
|
||||
func (c *Client) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest, opts ...grpc.CallOption) (*openfgav1.ListObjectsResponse, error) {
|
||||
return c.client.ListObjects(ctx, in, opts...)
|
||||
func (c *Client) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
|
||||
in.StoreId = c.storeID
|
||||
in.AuthorizationModelId = c.modelID
|
||||
return c.client.ListObjects(ctx, in)
|
||||
}
|
||||
|
||||
func (c *Client) Write(ctx context.Context, in *openfgav1.WriteRequest) error {
|
||||
in.StoreId = c.storeID
|
||||
in.AuthorizationModelId = c.modelID
|
||||
_, err := c.client.Write(ctx, in)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) getOrCreateStore(ctx context.Context, name string) (*openfgav1.Store, error) {
|
||||
|
@ -3,8 +3,6 @@ package client
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
)
|
||||
|
||||
@ -14,10 +12,14 @@ func NewNoop() *NoopClient {
|
||||
|
||||
type NoopClient struct{}
|
||||
|
||||
func (nc NoopClient) Check(ctx context.Context, in *openfgav1.CheckRequest, opts ...grpc.CallOption) (*openfgav1.CheckResponse, error) {
|
||||
func (nc NoopClient) Check(ctx context.Context, in *openfgav1.CheckRequest) (*openfgav1.CheckResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (nc NoopClient) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest, opts ...grpc.CallOption) (*openfgav1.ListObjectsResponse, error) {
|
||||
func (nc NoopClient) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (nc NoopClient) Write(ctx context.Context, in *openfgav1.WriteRequest) error {
|
||||
return nil
|
||||
}
|
||||
|
60
pkg/services/authz/zanzana/zanzana.go
Normal file
60
pkg/services/authz/zanzana/zanzana.go
Normal file
@ -0,0 +1,60 @@
|
||||
package zanzana
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
TypeUser string = "user"
|
||||
TypeTeam string = "team"
|
||||
)
|
||||
|
||||
func NewObject(typ, id string) string {
|
||||
return fmt.Sprintf("%s:%s", typ, id)
|
||||
}
|
||||
|
||||
func NewScopedObject(typ, id, scope string) string {
|
||||
return NewObject(typ, fmt.Sprintf("%s-%s", scope, id))
|
||||
}
|
||||
|
||||
// rbac action to relation translation
|
||||
var actionTranslations = map[string]string{}
|
||||
|
||||
type kindTranslation struct {
|
||||
typ string
|
||||
orgScoped bool
|
||||
}
|
||||
|
||||
// all kinds that we can translate into a openFGA object
|
||||
var kindTranslations = map[string]kindTranslation{}
|
||||
|
||||
func TranslateToTuple(user string, action, kind, identifier string, orgID int64) (*openfgav1.TupleKey, bool) {
|
||||
relation, ok := actionTranslations[action]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
t, ok := kindTranslations[kind]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
tuple := &openfgav1.TupleKey{
|
||||
Relation: relation,
|
||||
}
|
||||
|
||||
tuple.User = user
|
||||
tuple.Relation = relation
|
||||
|
||||
// UID in grafana are not guarantee to be unique across orgs so we need to scope them.
|
||||
if t.orgScoped {
|
||||
tuple.Object = NewScopedObject(t.typ, identifier, strconv.FormatInt(orgID, 10))
|
||||
} else {
|
||||
tuple.Object = NewObject(t.typ, identifier)
|
||||
}
|
||||
|
||||
return tuple, true
|
||||
}
|
@ -45,7 +45,10 @@ func setupTestEnv(t *testing.T) *TestEnv {
|
||||
}
|
||||
logger := log.New("extsvcaccounts.test")
|
||||
env.S = &ExtSvcAccountsService{
|
||||
acSvc: acimpl.ProvideOSSService(cfg, env.AcStore, &resourcepermissions.FakeActionSetSvc{}, localcache.New(0, 0), fmgt, tracing.InitializeTracerForTest()),
|
||||
acSvc: acimpl.ProvideOSSService(
|
||||
cfg, env.AcStore, &resourcepermissions.FakeActionSetSvc{},
|
||||
localcache.New(0, 0), fmgt, tracing.InitializeTracerForTest(), nil, nil,
|
||||
),
|
||||
features: fmgt,
|
||||
logger: logger,
|
||||
metrics: newMetrics(nil, env.SaSvc, logger),
|
||||
|
Loading…
Reference in New Issue
Block a user