mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Zanzana: periodic sync of team members (#94752)
* Rewrite zanzana collector to fetch all available pages * Register access control as a background service * If zanzana is enabled we run Syncs and start Reconciliation job * Update pkg/services/authz/zanzana/client/client.go Co-authored-by: Alexander Zobnin <alexanderzobnin@gmail.com> * Use server lock when doing performing reconciliation
This commit is contained in:
parent
a50507e645
commit
4083b2208e
@ -461,7 +461,7 @@ func setupServer(b testing.TB, sc benchScenario, features featuremgmt.FeatureTog
|
|||||||
actionSets := resourcepermissions.NewActionSetService(features)
|
actionSets := resourcepermissions.NewActionSetService(features)
|
||||||
acSvc := acimpl.ProvideOSSService(
|
acSvc := acimpl.ProvideOSSService(
|
||||||
sc.cfg, acdb.ProvideService(sc.db), actionSets, localcache.ProvideService(),
|
sc.cfg, acdb.ProvideService(sc.db), actionSets, localcache.ProvideService(),
|
||||||
features, tracing.InitializeTracerForTest(), zanzana.NewNoopClient(), sc.db, permreg.ProvidePermissionRegistry(),
|
features, tracing.InitializeTracerForTest(), zanzana.NewNoopClient(), sc.db, permreg.ProvidePermissionRegistry(), nil,
|
||||||
)
|
)
|
||||||
fStore := folderimpl.ProvideStore(sc.db)
|
fStore := folderimpl.ProvideStore(sc.db)
|
||||||
folderPermissions, err := ossaccesscontrol.ProvideFolderPermissions(
|
folderPermissions, err := ossaccesscontrol.ProvideFolderPermissions(
|
||||||
|
@ -86,7 +86,7 @@ func initializeConflictResolver(cmd *utils.ContextCommandLine, f Formatter, ctx
|
|||||||
}
|
}
|
||||||
routing := routing.ProvideRegister()
|
routing := routing.ProvideRegister()
|
||||||
|
|
||||||
acService, err := acimpl.ProvideService(cfg, s, routing, nil, nil, nil, features, tracer, zanzana.NewNoopClient(), permreg.ProvidePermissionRegistry())
|
acService, err := acimpl.ProvideService(cfg, s, routing, nil, nil, nil, features, tracer, zanzana.NewNoopClient(), permreg.ProvidePermissionRegistry(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("%v: %w", "failed to get access control", err)
|
return nil, fmt.Errorf("%v: %w", "failed to get access control", err)
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/infra/usagestats/statscollector"
|
"github.com/grafana/grafana/pkg/infra/usagestats/statscollector"
|
||||||
"github.com/grafana/grafana/pkg/registry"
|
"github.com/grafana/grafana/pkg/registry"
|
||||||
apiregistry "github.com/grafana/grafana/pkg/registry/apis"
|
apiregistry "github.com/grafana/grafana/pkg/registry/apis"
|
||||||
|
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||||
"github.com/grafana/grafana/pkg/services/anonymous/anonimpl"
|
"github.com/grafana/grafana/pkg/services/anonymous/anonimpl"
|
||||||
grafanaapiserver "github.com/grafana/grafana/pkg/services/apiserver"
|
grafanaapiserver "github.com/grafana/grafana/pkg/services/apiserver"
|
||||||
"github.com/grafana/grafana/pkg/services/auth"
|
"github.com/grafana/grafana/pkg/services/auth"
|
||||||
@ -65,6 +66,7 @@ func ProvideBackgroundServiceRegistry(
|
|||||||
ssoSettings *ssosettingsimpl.Service,
|
ssoSettings *ssosettingsimpl.Service,
|
||||||
pluginExternal *pluginexternal.Service,
|
pluginExternal *pluginexternal.Service,
|
||||||
pluginInstaller *plugininstaller.Service,
|
pluginInstaller *plugininstaller.Service,
|
||||||
|
accessControl accesscontrol.Service,
|
||||||
// Need to make sure these are initialized, is there a better place to put them?
|
// Need to make sure these are initialized, is there a better place to put them?
|
||||||
_ dashboardsnapshots.Service,
|
_ dashboardsnapshots.Service,
|
||||||
_ serviceaccounts.Service, _ *guardian.Provider,
|
_ serviceaccounts.Service, _ *guardian.Provider,
|
||||||
@ -108,6 +110,7 @@ func ProvideBackgroundServiceRegistry(
|
|||||||
ssoSettings,
|
ssoSettings,
|
||||||
pluginExternal,
|
pluginExternal,
|
||||||
pluginInstaller,
|
pluginInstaller,
|
||||||
|
accessControl,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ type AccessControl interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Service interface {
|
type Service interface {
|
||||||
|
registry.BackgroundService
|
||||||
registry.ProvidesUsageStats
|
registry.ProvidesUsageStats
|
||||||
// GetRoleByName returns a role by name
|
// GetRoleByName returns a role by name
|
||||||
GetRoleByName(ctx context.Context, orgID int64, roleName string) (*RoleDTO, error)
|
GetRoleByName(ctx context.Context, orgID int64, roleName string) (*RoleDTO, error)
|
||||||
|
@ -18,12 +18,14 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/infra/localcache"
|
"github.com/grafana/grafana/pkg/infra/localcache"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||||
"github.com/grafana/grafana/pkg/infra/slugify"
|
"github.com/grafana/grafana/pkg/infra/slugify"
|
||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
"github.com/grafana/grafana/pkg/plugins"
|
"github.com/grafana/grafana/pkg/plugins"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/api"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/api"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/database"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/database"
|
||||||
|
"github.com/grafana/grafana/pkg/services/accesscontrol/dualwrite"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/migrator"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/migrator"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/permreg"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/permreg"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/pluginutils"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/pluginutils"
|
||||||
@ -53,8 +55,20 @@ func ProvideService(
|
|||||||
cfg *setting.Cfg, db db.DB, routeRegister routing.RouteRegister, cache *localcache.CacheService,
|
cfg *setting.Cfg, db db.DB, routeRegister routing.RouteRegister, cache *localcache.CacheService,
|
||||||
accessControl accesscontrol.AccessControl, actionResolver accesscontrol.ActionResolver,
|
accessControl accesscontrol.AccessControl, actionResolver accesscontrol.ActionResolver,
|
||||||
features featuremgmt.FeatureToggles, tracer tracing.Tracer, zclient zanzana.Client, permRegistry permreg.PermissionRegistry,
|
features featuremgmt.FeatureToggles, tracer tracing.Tracer, zclient zanzana.Client, permRegistry permreg.PermissionRegistry,
|
||||||
|
lock *serverlock.ServerLockService,
|
||||||
) (*Service, error) {
|
) (*Service, error) {
|
||||||
service := ProvideOSSService(cfg, database.ProvideService(db), actionResolver, cache, features, tracer, zclient, db, permRegistry)
|
service := ProvideOSSService(
|
||||||
|
cfg,
|
||||||
|
database.ProvideService(db),
|
||||||
|
actionResolver,
|
||||||
|
cache,
|
||||||
|
features,
|
||||||
|
tracer,
|
||||||
|
zclient,
|
||||||
|
db,
|
||||||
|
permRegistry,
|
||||||
|
lock,
|
||||||
|
)
|
||||||
|
|
||||||
api.NewAccessControlAPI(routeRegister, accessControl, service, features).RegisterAPIEndpoints()
|
api.NewAccessControlAPI(routeRegister, accessControl, service, features).RegisterAPIEndpoints()
|
||||||
if err := accesscontrol.DeclareFixedRoles(service, cfg); err != nil {
|
if err := accesscontrol.DeclareFixedRoles(service, cfg); err != nil {
|
||||||
@ -75,7 +89,7 @@ func ProvideService(
|
|||||||
func ProvideOSSService(
|
func ProvideOSSService(
|
||||||
cfg *setting.Cfg, store accesscontrol.Store, actionResolver accesscontrol.ActionResolver,
|
cfg *setting.Cfg, store accesscontrol.Store, actionResolver accesscontrol.ActionResolver,
|
||||||
cache *localcache.CacheService, features featuremgmt.FeatureToggles, tracer tracing.Tracer,
|
cache *localcache.CacheService, features featuremgmt.FeatureToggles, tracer tracing.Tracer,
|
||||||
zclient zanzana.Client, db db.DB, permRegistry permreg.PermissionRegistry,
|
zclient zanzana.Client, db db.DB, permRegistry permreg.PermissionRegistry, lock *serverlock.ServerLockService,
|
||||||
) *Service {
|
) *Service {
|
||||||
s := &Service{
|
s := &Service{
|
||||||
actionResolver: actionResolver,
|
actionResolver: actionResolver,
|
||||||
@ -85,7 +99,7 @@ func ProvideOSSService(
|
|||||||
log: log.New("accesscontrol.service"),
|
log: log.New("accesscontrol.service"),
|
||||||
roles: accesscontrol.BuildBasicRoleDefinitions(),
|
roles: accesscontrol.BuildBasicRoleDefinitions(),
|
||||||
store: store,
|
store: store,
|
||||||
sync: migrator.NewZanzanaSynchroniser(zclient, db),
|
reconciler: dualwrite.NewZanzanaReconciler(zclient, db, lock),
|
||||||
permRegistry: permRegistry,
|
permRegistry: permRegistry,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,10 +116,22 @@ type Service struct {
|
|||||||
registrations accesscontrol.RegistrationList
|
registrations accesscontrol.RegistrationList
|
||||||
roles map[string]*accesscontrol.RoleDTO
|
roles map[string]*accesscontrol.RoleDTO
|
||||||
store accesscontrol.Store
|
store accesscontrol.Store
|
||||||
sync *migrator.ZanzanaSynchroniser
|
reconciler *dualwrite.ZanzanaReconciler
|
||||||
permRegistry permreg.PermissionRegistry
|
permRegistry permreg.PermissionRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run implements accesscontrol.Service.
|
||||||
|
func (s *Service) Run(ctx context.Context) error {
|
||||||
|
if s.features.IsEnabledGlobally(featuremgmt.FlagZanzana) {
|
||||||
|
if err := s.reconciler.Sync(context.Background()); err != nil {
|
||||||
|
s.log.Error("Failed to synchronise permissions to zanzana ", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.reconciler.Reconcile(ctx)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) GetUsageStats(_ context.Context) map[string]any {
|
func (s *Service) GetUsageStats(_ context.Context) map[string]any {
|
||||||
return map[string]any{
|
return map[string]any{
|
||||||
"stats.oss.accesscontrol.enabled.count": 1,
|
"stats.oss.accesscontrol.enabled.count": 1,
|
||||||
@ -448,12 +474,6 @@ func (s *Service) RegisterFixedRoles(ctx context.Context) error {
|
|||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
if s.features.IsEnabledGlobally(featuremgmt.FlagZanzana) {
|
|
||||||
if err := s.sync.Sync(context.Background()); err != nil {
|
|
||||||
s.log.Error("Failed to synchronise permissions to zanzana ", "err", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +74,7 @@ func TestUsageMetrics(t *testing.T) {
|
|||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
permreg.ProvidePermissionRegistry(),
|
permreg.ProvidePermissionRegistry(),
|
||||||
|
nil,
|
||||||
)
|
)
|
||||||
assert.Equal(t, tt.expectedValue, s.GetUsageStats(context.Background())["stats.oss.accesscontrol.enabled.count"])
|
assert.Equal(t, tt.expectedValue, s.GetUsageStats(context.Background())["stats.oss.accesscontrol.enabled.count"])
|
||||||
})
|
})
|
||||||
|
19
pkg/services/accesscontrol/dualwrite/batch.go
Normal file
19
pkg/services/accesscontrol/dualwrite/batch.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package dualwrite
|
||||||
|
|
||||||
|
// batch will call fn with a batch of T for specified size.
|
||||||
|
func batch[T any](items []T, batchSize int, fn func([]T) error) error {
|
||||||
|
count := len(items)
|
||||||
|
for i := 0; i < count; {
|
||||||
|
end := i + batchSize
|
||||||
|
if end > count {
|
||||||
|
end = count
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fn(items[i:end]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
i = end
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
109
pkg/services/accesscontrol/dualwrite/collectors.go
Normal file
109
pkg/services/accesscontrol/dualwrite/collectors.go
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
package dualwrite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||||
|
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
func teamMembershipCollector(store db.DB) legacyTupleCollector {
|
||||||
|
return func(ctx context.Context) (map[string]map[string]*openfgav1.TupleKey, error) {
|
||||||
|
query := `
|
||||||
|
SELECT t.uid as team_uid, u.uid as user_uid, tm.permission
|
||||||
|
FROM team_member tm
|
||||||
|
INNER JOIN team t ON tm.team_id = t.id
|
||||||
|
INNER JOIN ` + store.GetDialect().Quote("user") + ` u ON tm.user_id = u.id
|
||||||
|
`
|
||||||
|
|
||||||
|
type membership struct {
|
||||||
|
TeamUID string `xorm:"team_uid"`
|
||||||
|
UserUID string `xorm:"user_uid"`
|
||||||
|
Permission int
|
||||||
|
}
|
||||||
|
|
||||||
|
var memberships []membership
|
||||||
|
err := store.WithDbSession(ctx, func(sess *db.Session) error {
|
||||||
|
return sess.SQL(query).Find(&memberships)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tuples := make(map[string]map[string]*openfgav1.TupleKey)
|
||||||
|
|
||||||
|
for _, m := range memberships {
|
||||||
|
tuple := &openfgav1.TupleKey{
|
||||||
|
User: zanzana.NewTupleEntry(zanzana.TypeUser, m.UserUID, ""),
|
||||||
|
Object: zanzana.NewTupleEntry(zanzana.TypeTeam, m.TeamUID, ""),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Admin permission is 4 and member 0
|
||||||
|
if m.Permission == 4 {
|
||||||
|
tuple.Relation = zanzana.RelationTeamAdmin
|
||||||
|
} else {
|
||||||
|
tuple.Relation = zanzana.RelationTeamMember
|
||||||
|
}
|
||||||
|
|
||||||
|
if tuples[tuple.Object] == nil {
|
||||||
|
tuples[tuple.Object] = make(map[string]*openfgav1.TupleKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
tuples[tuple.Object][tuple.String()] = tuple
|
||||||
|
}
|
||||||
|
|
||||||
|
return tuples, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func zanzanaCollector(client zanzana.Client, relations []string) zanzanaTupleCollector {
|
||||||
|
return func(ctx context.Context, client zanzana.Client, object string) (map[string]*openfgav1.TupleKey, error) {
|
||||||
|
// list will use continuation token to collect all tuples for object and relation
|
||||||
|
list := func(relation string) ([]*openfgav1.Tuple, error) {
|
||||||
|
first, err := client.Read(ctx, &openfgav1.ReadRequest{
|
||||||
|
TupleKey: &openfgav1.ReadRequestTupleKey{
|
||||||
|
Object: object,
|
||||||
|
Relation: relation,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c := first.ContinuationToken
|
||||||
|
|
||||||
|
for c != "" {
|
||||||
|
res, err := client.Read(ctx, &openfgav1.ReadRequest{
|
||||||
|
TupleKey: &openfgav1.ReadRequestTupleKey{
|
||||||
|
Object: object,
|
||||||
|
Relation: relation,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c = res.ContinuationToken
|
||||||
|
first.Tuples = append(first.Tuples, res.Tuples...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return first.Tuples, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make(map[string]*openfgav1.TupleKey)
|
||||||
|
for _, r := range relations {
|
||||||
|
tuples, err := list(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, t := range tuples {
|
||||||
|
out[t.Key.String()] = t.Key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package migrator
|
package dualwrite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -6,12 +6,14 @@ import (
|
|||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/db"
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||||
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -21,22 +23,26 @@ var tracer = otel.Tracer("github.com/grafana/grafana/pkg/accesscontrol/migrator"
|
|||||||
// They key used should be a unique group key for the collector so we can skip over an already synced group.
|
// They key used should be a unique group key for the collector so we can skip over an already synced group.
|
||||||
type TupleCollector func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error
|
type TupleCollector func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error
|
||||||
|
|
||||||
// ZanzanaSynchroniser is a component to sync RBAC permissions to zanzana.
|
// ZanzanaReconciler is a component to reconcile RBAC permissions to zanzana.
|
||||||
// We should rewrite the migration after we have "migrated" all possible actions
|
// We should rewrite the migration after we have "migrated" all possible actions
|
||||||
// into our schema. This will only do a one time migration for each action so its
|
// into our schema.
|
||||||
// is not really syncing the full rbac state. If a fresh sync is needed the tuple
|
type ZanzanaReconciler struct {
|
||||||
// needs to be cleared first.
|
lock *serverlock.ServerLockService
|
||||||
type ZanzanaSynchroniser struct {
|
|
||||||
log log.Logger
|
log log.Logger
|
||||||
client zanzana.Client
|
client zanzana.Client
|
||||||
|
// collectors are one time best effort migrations that gives up on first conflict.
|
||||||
|
// These are deprecated and everything should move be resourceReconcilers that are periodically synced
|
||||||
|
// between grafana db and zanzana store.
|
||||||
collectors []TupleCollector
|
collectors []TupleCollector
|
||||||
|
// reconcilers are migrations that tries to reconcile the state of grafana db to zanzana store.
|
||||||
|
// These are run periodically to try to maintain a consistent state.
|
||||||
|
reconcilers []resourceReconciler
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewZanzanaSynchroniser(client zanzana.Client, store db.DB, collectors ...TupleCollector) *ZanzanaSynchroniser {
|
func NewZanzanaReconciler(client zanzana.Client, store db.DB, lock *serverlock.ServerLockService, collectors ...TupleCollector) *ZanzanaReconciler {
|
||||||
// Append shared collectors that is used by both enterprise and oss
|
// Append shared collectors that is used by both enterprise and oss
|
||||||
collectors = append(
|
collectors = append(
|
||||||
collectors,
|
collectors,
|
||||||
teamMembershipCollector(store),
|
|
||||||
managedPermissionsCollector(store),
|
managedPermissionsCollector(store),
|
||||||
folderTreeCollector(store),
|
folderTreeCollector(store),
|
||||||
basicRolesCollector(store),
|
basicRolesCollector(store),
|
||||||
@ -47,47 +53,101 @@ func NewZanzanaSynchroniser(client zanzana.Client, store db.DB, collectors ...Tu
|
|||||||
fixedRoleTuplesCollector(store),
|
fixedRoleTuplesCollector(store),
|
||||||
)
|
)
|
||||||
|
|
||||||
return &ZanzanaSynchroniser{
|
return &ZanzanaReconciler{
|
||||||
client: client,
|
client: client,
|
||||||
log: log.New("zanzana.sync"),
|
lock: lock,
|
||||||
|
log: log.New("zanzana.reconciler"),
|
||||||
collectors: collectors,
|
collectors: collectors,
|
||||||
|
reconcilers: []resourceReconciler{
|
||||||
|
newResourceReconciler(
|
||||||
|
"team memberships",
|
||||||
|
teamMembershipCollector(store),
|
||||||
|
zanzanaCollector(client, []string{zanzana.RelationTeamMember, zanzana.RelationTeamAdmin}),
|
||||||
|
client,
|
||||||
|
),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync runs all collectors and tries to write all collected tuples.
|
// Sync runs all collectors and tries to write all collected tuples.
|
||||||
// It will skip over any "sync group" that has already been written.
|
// It will skip over any "sync group" that has already been written.
|
||||||
func (z *ZanzanaSynchroniser) Sync(ctx context.Context) error {
|
func (r *ZanzanaReconciler) Sync(ctx context.Context) error {
|
||||||
z.log.Info("Starting zanzana permissions sync")
|
r.log.Info("Starting zanzana permissions sync")
|
||||||
ctx, span := tracer.Start(ctx, "accesscontrol.migrator.Sync")
|
ctx, span := tracer.Start(ctx, "accesscontrol.migrator.Sync")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
tuplesMap := make(map[string][]*openfgav1.TupleKey)
|
tuplesMap := make(map[string][]*openfgav1.TupleKey)
|
||||||
|
|
||||||
for _, c := range z.collectors {
|
for _, c := range r.collectors {
|
||||||
if err := c(ctx, tuplesMap); err != nil {
|
if err := c(ctx, tuplesMap); err != nil {
|
||||||
return fmt.Errorf("failed to collect permissions: %w", err)
|
return fmt.Errorf("failed to collect permissions: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for key, tuples := range tuplesMap {
|
for key, tuples := range tuplesMap {
|
||||||
if err := batch(len(tuples), 100, func(start, end int) error {
|
if err := batch(tuples, 100, func(items []*openfgav1.TupleKey) error {
|
||||||
return z.client.Write(ctx, &openfgav1.WriteRequest{
|
return r.client.Write(ctx, &openfgav1.WriteRequest{
|
||||||
Writes: &openfgav1.WriteRequestWrites{
|
Writes: &openfgav1.WriteRequestWrites{
|
||||||
TupleKeys: tuples[start:end],
|
TupleKeys: items,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
if strings.Contains(err.Error(), "cannot write a tuple which already exists") {
|
if strings.Contains(err.Error(), "cannot write a tuple which already exists") {
|
||||||
z.log.Debug("Skipping already synced permissions", "sync_key", key)
|
r.log.Debug("Skipping already synced permissions", "sync_key", key)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.reconcile(ctx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reconcile schedules as job that will run and reconcile resources between
|
||||||
|
// legacy access control and zanzana.
|
||||||
|
func (r *ZanzanaReconciler) Reconcile(ctx context.Context) error {
|
||||||
|
// FIXME: try to reconcile at start whenever we have moved all syncs to reconcilers
|
||||||
|
// r.reconcile(ctx)
|
||||||
|
|
||||||
|
// FIXME:
|
||||||
|
// 1. We should be a bit graceful about reconciliations so we are not hammering dbs
|
||||||
|
// 2. We should be able to configure reconciliation interval
|
||||||
|
ticker := time.NewTicker(1 * time.Hour)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
r.reconcile(ctx)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ZanzanaReconciler) reconcile(ctx context.Context) {
|
||||||
|
run := func(ctx context.Context) {
|
||||||
|
now := time.Now()
|
||||||
|
for _, reconciler := range r.reconcilers {
|
||||||
|
if err := reconciler.reconcile(ctx); err != nil {
|
||||||
|
r.log.Warn("Failed to perform reconciliation for resource", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.log.Debug("Finished reconciliation", "elapsed", time.Since(now))
|
||||||
|
}
|
||||||
|
|
||||||
|
// in tests we can skip creating a lock
|
||||||
|
if r.lock == nil {
|
||||||
|
run(ctx)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We ignore the error for now
|
||||||
|
_ = r.lock.LockExecuteAndRelease(ctx, "zanzana-reconciliation", 10*time.Hour, func(ctx context.Context) {
|
||||||
|
run(ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// managedPermissionsCollector collects managed permissions into provided tuple map.
|
// managedPermissionsCollector collects managed permissions into provided tuple map.
|
||||||
// It will only store actions that are supported by our schema. Managed permissions can
|
// It will only store actions that are supported by our schema. Managed permissions can
|
||||||
// be directly mapped to user/team/role without having to write an intermediate role.
|
// be directly mapped to user/team/role without having to write an intermediate role.
|
||||||
@ -150,54 +210,6 @@ func managedPermissionsCollector(store db.DB) TupleCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func teamMembershipCollector(store db.DB) TupleCollector {
|
|
||||||
return func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error {
|
|
||||||
ctx, span := tracer.Start(ctx, "accesscontrol.migrator.teamMembershipCollector")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
const collectorID = "team_membership"
|
|
||||||
query := `
|
|
||||||
SELECT t.uid as team_uid, u.uid as user_uid, tm.permission
|
|
||||||
FROM team_member tm
|
|
||||||
INNER JOIN team t ON tm.team_id = t.id
|
|
||||||
INNER JOIN ` + store.GetDialect().Quote("user") + ` u ON tm.user_id = u.id
|
|
||||||
`
|
|
||||||
|
|
||||||
type membership struct {
|
|
||||||
TeamUID string `xorm:"team_uid"`
|
|
||||||
UserUID string `xorm:"user_uid"`
|
|
||||||
Permission int
|
|
||||||
}
|
|
||||||
|
|
||||||
var memberships []membership
|
|
||||||
err := store.WithDbSession(ctx, func(sess *db.Session) error {
|
|
||||||
return sess.SQL(query).Find(&memberships)
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range memberships {
|
|
||||||
tuple := &openfgav1.TupleKey{
|
|
||||||
User: zanzana.NewTupleEntry(zanzana.TypeUser, m.UserUID, ""),
|
|
||||||
Object: zanzana.NewTupleEntry(zanzana.TypeTeam, m.TeamUID, ""),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Admin permission is 4 and member 0
|
|
||||||
if m.Permission == 4 {
|
|
||||||
tuple.Relation = zanzana.RelationTeamAdmin
|
|
||||||
} else {
|
|
||||||
tuple.Relation = zanzana.RelationTeamMember
|
|
||||||
}
|
|
||||||
|
|
||||||
tuples[collectorID] = append(tuples[collectorID], tuple)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// folderTreeCollector collects folder tree structure and writes it as relation tuples
|
// folderTreeCollector collects folder tree structure and writes it as relation tuples
|
||||||
func folderTreeCollector(store db.DB) TupleCollector {
|
func folderTreeCollector(store db.DB) TupleCollector {
|
||||||
return func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error {
|
return func(ctx context.Context, tuples map[string][]*openfgav1.TupleKey) error {
|
99
pkg/services/accesscontrol/dualwrite/resource_reconciler.go
Normal file
99
pkg/services/accesscontrol/dualwrite/resource_reconciler.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package dualwrite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz/zanzana"
|
||||||
|
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// legacyTupleCollector collects tuples groupd by object and tupleKey
|
||||||
|
type legacyTupleCollector func(ctx context.Context) (map[string]map[string]*openfgav1.TupleKey, error)
|
||||||
|
|
||||||
|
// zanzanaTupleCollector collects tuples from zanzana for given object
|
||||||
|
type zanzanaTupleCollector func(ctx context.Context, client zanzana.Client, object string) (map[string]*openfgav1.TupleKey, error)
|
||||||
|
|
||||||
|
type resourceReconciler struct {
|
||||||
|
name string
|
||||||
|
legacy legacyTupleCollector
|
||||||
|
zanzana zanzanaTupleCollector
|
||||||
|
client zanzana.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func newResourceReconciler(name string, legacy legacyTupleCollector, zanzana zanzanaTupleCollector, client zanzana.Client) resourceReconciler {
|
||||||
|
return resourceReconciler{name, legacy, zanzana, client}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r resourceReconciler) reconcile(ctx context.Context) error {
|
||||||
|
// 1. Fetch grafana resources stored in grafana db.
|
||||||
|
res, err := r.legacy(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to collect legacy tuples for %s: %w", r.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
writes = []*openfgav1.TupleKey{}
|
||||||
|
deletes = []*openfgav1.TupleKeyWithoutCondition{}
|
||||||
|
)
|
||||||
|
|
||||||
|
for object, tuples := range res {
|
||||||
|
// 2. Fetch all tuples for given object.
|
||||||
|
// Due to limitations in open fga api we need to collect tuples per object
|
||||||
|
zanzanaTuples, err := r.zanzana(ctx, r.client, object)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to collect zanzanaa tuples for %s: %w", r.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Check if tuples from grafana db exists in zanzana and if not add them to writes
|
||||||
|
for key, t := range tuples {
|
||||||
|
_, ok := zanzanaTuples[key]
|
||||||
|
if !ok {
|
||||||
|
writes = append(writes, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Check if tuple from zanzana don't exists in grafana db, if not add them to deletes.
|
||||||
|
for key, tuple := range zanzanaTuples {
|
||||||
|
_, ok := tuples[key]
|
||||||
|
if !ok {
|
||||||
|
deletes = append(deletes, &openfgav1.TupleKeyWithoutCondition{
|
||||||
|
User: tuple.User,
|
||||||
|
Relation: tuple.Relation,
|
||||||
|
Object: tuple.Object,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(writes) == 0 && len(deletes) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: batch them together
|
||||||
|
if len(writes) > 0 {
|
||||||
|
err := batch(writes, 100, func(items []*openfgav1.TupleKey) error {
|
||||||
|
return r.client.Write(ctx, &openfgav1.WriteRequest{
|
||||||
|
Writes: &openfgav1.WriteRequestWrites{TupleKeys: items},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(deletes) > 0 {
|
||||||
|
err := batch(deletes, 100, func(items []*openfgav1.TupleKeyWithoutCondition) error {
|
||||||
|
return r.client.Write(ctx, &openfgav1.WriteRequest{
|
||||||
|
Deletes: &openfgav1.WriteRequestDeletes{TupleKeys: items},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -90,6 +90,10 @@ func (m *Mock) GetRoleByName(ctx context.Context, orgID int64, roleName string)
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Mock) Run(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Mock) GetUsageStats(ctx context.Context) map[string]interface{} {
|
func (m *Mock) GetUsageStats(ctx context.Context) map[string]interface{} {
|
||||||
return make(map[string]interface{})
|
return make(map[string]interface{})
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ func ProvideFolderPermissions(
|
|||||||
actionSets := resourcepermissions.NewActionSetService(features)
|
actionSets := resourcepermissions.NewActionSetService(features)
|
||||||
acSvc := acimpl.ProvideOSSService(
|
acSvc := acimpl.ProvideOSSService(
|
||||||
cfg, acdb.ProvideService(sqlStore), actionSets, localcache.ProvideService(),
|
cfg, acdb.ProvideService(sqlStore), actionSets, localcache.ProvideService(),
|
||||||
features, tracing.InitializeTracerForTest(), zanzana.NewNoopClient(), sqlStore, permreg.ProvidePermissionRegistry(),
|
features, tracing.InitializeTracerForTest(), zanzana.NewNoopClient(), sqlStore, permreg.ProvidePermissionRegistry(), nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
license := licensingtest.NewFakeLicensing()
|
license := licensingtest.NewFakeLicensing()
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
// Client is a wrapper around [openfgav1.OpenFGAServiceClient]
|
// Client is a wrapper around [openfgav1.OpenFGAServiceClient]
|
||||||
type Client interface {
|
type Client interface {
|
||||||
Check(ctx context.Context, in *openfgav1.CheckRequest) (*openfgav1.CheckResponse, error)
|
Check(ctx context.Context, in *openfgav1.CheckRequest) (*openfgav1.CheckResponse, error)
|
||||||
|
Read(ctx context.Context, in *openfgav1.ReadRequest) (*openfgav1.ReadResponse, error)
|
||||||
ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error)
|
ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error)
|
||||||
Write(ctx context.Context, in *openfgav1.WriteRequest) error
|
Write(ctx context.Context, in *openfgav1.WriteRequest) error
|
||||||
}
|
}
|
||||||
|
@ -94,6 +94,14 @@ func (c *Client) Check(ctx context.Context, in *openfgav1.CheckRequest) (*openfg
|
|||||||
return c.client.Check(ctx, in)
|
return c.client.Check(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) Read(ctx context.Context, in *openfgav1.ReadRequest) (*openfgav1.ReadResponse, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "authz.zanzana.client.Read")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
in.StoreId = c.storeID
|
||||||
|
return c.client.Read(ctx, in)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
|
func (c *Client) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
|
||||||
ctx, span := tracer.Start(ctx, "authz.zanzana.client.ListObjects")
|
ctx, span := tracer.Start(ctx, "authz.zanzana.client.ListObjects")
|
||||||
span.SetAttributes(attribute.String("resource.type", in.Type))
|
span.SetAttributes(attribute.String("resource.type", in.Type))
|
||||||
|
@ -16,6 +16,10 @@ func (nc NoopClient) Check(ctx context.Context, in *openfgav1.CheckRequest) (*op
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (nc NoopClient) Read(ctx context.Context, in *openfgav1.ReadRequest) (*openfgav1.ReadResponse, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (nc NoopClient) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
|
func (nc NoopClient) ListObjects(ctx context.Context, in *openfgav1.ListObjectsRequest) (*openfgav1.ListObjectsResponse, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/db"
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
|
||||||
"github.com/grafana/grafana/pkg/services/accesscontrol/migrator"
|
"github.com/grafana/grafana/pkg/services/accesscontrol/dualwrite"
|
||||||
accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
|
accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
|
||||||
"github.com/grafana/grafana/pkg/services/authz"
|
"github.com/grafana/grafana/pkg/services/authz"
|
||||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||||
@ -77,7 +77,7 @@ func TestIntegrationDashboardServiceZanzana(t *testing.T) {
|
|||||||
createDashboards(t, service, 100, "test-b")
|
createDashboards(t, service, 100, "test-b")
|
||||||
|
|
||||||
// Sync Grafana DB with zanzana (migrate data)
|
// Sync Grafana DB with zanzana (migrate data)
|
||||||
zanzanaSyncronizer := migrator.NewZanzanaSynchroniser(zclient, db)
|
zanzanaSyncronizer := dualwrite.NewZanzanaReconciler(zclient, db, nil)
|
||||||
err = zanzanaSyncronizer.Sync(context.Background())
|
err = zanzanaSyncronizer.Sync(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ func setupTestEnv(t *testing.T) *TestEnv {
|
|||||||
acSvc: acimpl.ProvideOSSService(
|
acSvc: acimpl.ProvideOSSService(
|
||||||
cfg, env.AcStore, &resourcepermissions.FakeActionSetSvc{},
|
cfg, env.AcStore, &resourcepermissions.FakeActionSetSvc{},
|
||||||
localcache.New(0, 0), fmgt, tracing.InitializeTracerForTest(), nil, nil,
|
localcache.New(0, 0), fmgt, tracing.InitializeTracerForTest(), nil, nil,
|
||||||
permreg.ProvidePermissionRegistry(),
|
permreg.ProvidePermissionRegistry(), nil,
|
||||||
),
|
),
|
||||||
defaultOrgID: autoAssignOrgID,
|
defaultOrgID: autoAssignOrgID,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
Loading…
Reference in New Issue
Block a user