Zanzana: use namespace when performing reconciliation (#96205)

* Special handling for zanzana reconciliation if stack id is configured

* remove sync call
This commit is contained in:
Karl Persson 2024-11-11 13:48:49 +01:00 committed by GitHub
parent 19bed65d9b
commit 9f66843915
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 68 additions and 70 deletions

View File

@ -99,7 +99,7 @@ func ProvideOSSService(
log: log.New("accesscontrol.service"),
roles: accesscontrol.BuildBasicRoleDefinitions(),
store: store,
reconciler: dualwrite.NewZanzanaReconciler(zclient, db, lock),
reconciler: dualwrite.NewZanzanaReconciler(cfg, zclient, db, lock),
permRegistry: permRegistry,
}
@ -123,10 +123,6 @@ type Service struct {
// Run implements accesscontrol.Service.
func (s *Service) Run(ctx context.Context) error {
if s.features.IsEnabledGlobally(featuremgmt.FlagZanzana) {
if err := s.reconciler.Sync(context.Background()); err != nil {
s.log.Error("Failed to synchronise permissions to zanzana ", "err", err)
}
return s.reconciler.Reconcile(ctx)
}
return nil

View File

@ -7,17 +7,17 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/authz/zanzana"
"github.com/grafana/grafana/pkg/services/authz/zanzana/common"
authzextv1 "github.com/grafana/grafana/pkg/services/authz/zanzana/proto/v1"
)
func teamMembershipCollector(store db.DB) legacyTupleCollector {
return func(ctx context.Context, orgId int64) (map[string]map[string]*openfgav1.TupleKey, error) {
return func(ctx context.Context, orgID int64) (map[string]map[string]*openfgav1.TupleKey, error) {
query := `
SELECT t.uid as team_uid, u.uid as user_uid, tm.permission
FROM team_member tm
INNER JOIN team t ON tm.team_id = t.id
INNER JOIN ` + store.GetDialect().Quote("user") + ` u ON tm.user_id = u.id
WHERE org_id = ?
`
type membership struct {
@ -28,7 +28,7 @@ func teamMembershipCollector(store db.DB) legacyTupleCollector {
var memberships []membership
err := store.WithDbSession(ctx, func(sess *db.Session) error {
return sess.SQL(query).Find(&memberships)
return sess.SQL(query, orgID).Find(&memberships)
})
if err != nil {
@ -63,22 +63,21 @@ func teamMembershipCollector(store db.DB) legacyTupleCollector {
// folderTreeCollector collects folder tree structure and writes it as relation tuples
func folderTreeCollector(store db.DB) legacyTupleCollector {
return func(ctx context.Context, orgId int64) (map[string]map[string]*openfgav1.TupleKey, error) {
return func(ctx context.Context, orgID int64) (map[string]map[string]*openfgav1.TupleKey, error) {
ctx, span := tracer.Start(ctx, "accesscontrol.migrator.folderTreeCollector")
defer span.End()
const query = `
SELECT uid, parent_uid, org_id FROM folder
SELECT uid, parent_uid, org_id FROM folder WHERE org_id = ?
`
type folder struct {
OrgID int64 `xorm:"org_id"`
FolderUID string `xorm:"uid"`
ParentUID string `xorm:"parent_uid"`
}
var folders []folder
err := store.WithDbSession(ctx, func(sess *db.Session) error {
return sess.SQL(query).Find(&folders)
return sess.SQL(query, orgID).Find(&folders)
})
if err != nil {
@ -94,9 +93,9 @@ func folderTreeCollector(store db.DB) legacyTupleCollector {
}
tuple = &openfgav1.TupleKey{
Object: zanzana.NewTupleEntry(common.TypeFolder, f.FolderUID, ""),
Object: zanzana.NewTupleEntry(zanzana.TypeFolder, f.FolderUID, ""),
Relation: zanzana.RelationParent,
User: zanzana.NewTupleEntry(common.TypeFolder, f.ParentUID, ""),
User: zanzana.NewTupleEntry(zanzana.TypeFolder, f.ParentUID, ""),
}
if tuples[tuple.Object] == nil {
@ -114,7 +113,7 @@ func folderTreeCollector(store db.DB) legacyTupleCollector {
// It will only store actions that are supported by our schema. Managed permissions can
// be directly mapped to user/team/role without having to write an intermediate role.
func managedPermissionsCollector(store db.DB, kind string) legacyTupleCollector {
return func(ctx context.Context, orgId int64) (map[string]map[string]*openfgav1.TupleKey, error) {
return func(ctx context.Context, orgID int64) (map[string]map[string]*openfgav1.TupleKey, error) {
query := `
SELECT u.uid as user_uid, t.uid as team_uid, p.action, p.kind, p.identifier, r.org_id
FROM permission p
@ -125,11 +124,11 @@ func managedPermissionsCollector(store db.DB, kind string) legacyTupleCollector
LEFT JOIN team t ON tr.team_id = t.id
LEFT JOIN builtin_role br ON r.id = br.role_id
WHERE r.name LIKE 'managed:%'
AND r.org_id = ?
AND p.kind = ?
`
type Permission struct {
RoleName string `xorm:"role_name"`
OrgID int64 `xorm:"org_id"`
Action string `xorm:"action"`
Kind string
Identifier string
@ -139,7 +138,7 @@ func managedPermissionsCollector(store db.DB, kind string) legacyTupleCollector
var permissions []Permission
err := store.WithDbSession(ctx, func(sess *db.Session) error {
return sess.SQL(query, kind).Find(&permissions)
return sess.SQL(query, orgID, kind).Find(&permissions)
})
if err != nil {
@ -230,7 +229,7 @@ func zanzanaCollector(relations []string) zanzanaTupleCollector {
first.Tuples = append(first.Tuples, res.Tuples...)
}
return common.ToOpenFGATuples(first.Tuples), nil
return zanzana.ToOpenFGATuples(first.Tuples), nil
}
out := make(map[string]*openfgav1.TupleKey)

View File

@ -2,42 +2,42 @@ package dualwrite
import (
"context"
"strconv"
"time"
"github.com/grafana/authlib/claims"
openfgav1 "github.com/openfga/api/proto/openfga/v1"
"go.opentelemetry.io/otel"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/serverlock"
"github.com/grafana/grafana/pkg/services/authz/zanzana"
"github.com/grafana/grafana/pkg/setting"
)
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/accesscontrol/migrator")
// A TupleCollector is responsible to build and store [openfgav1.TupleKey] into provided tuple map.
// They key used should be a unique group key for the collector so we can skip over an already synced group.
type TupleCollector func(ctx context.Context, namespace string, tuples map[string][]*openfgav1.TupleKey) error
// ZanzanaReconciler is a component to reconcile RBAC permissions to zanzana.
// We should rewrite the migration after we have "migrated" all possible actions
// into our schema.
type ZanzanaReconciler struct {
lock *serverlock.ServerLockService
log log.Logger
cfg *setting.Cfg
log log.Logger
store db.DB
client zanzana.Client
lock *serverlock.ServerLockService
// reconcilers are migrations that tries to reconcile the state of grafana db to zanzana store.
// These are run periodically to try to maintain a consistent state.
reconcilers []resourceReconciler
}
func NewZanzanaReconciler(client zanzana.Client, store db.DB, lock *serverlock.ServerLockService) *ZanzanaReconciler {
func NewZanzanaReconciler(cfg *setting.Cfg, client zanzana.Client, store db.DB, lock *serverlock.ServerLockService) *ZanzanaReconciler {
return &ZanzanaReconciler{
cfg: cfg,
log: log.New("zanzana.reconciler"),
client: client,
lock: lock,
log: log.New("zanzana.reconciler"),
store: store,
reconcilers: []resourceReconciler{
newResourceReconciler(
@ -68,23 +68,10 @@ func NewZanzanaReconciler(client zanzana.Client, store db.DB, lock *serverlock.S
}
}
// Sync runs all collectors and tries to write all collected tuples.
// It will skip over any "sync group" that has already been written.
func (r *ZanzanaReconciler) Sync(ctx context.Context) error {
r.log.Info("Starting zanzana permissions sync")
ctx, span := tracer.Start(ctx, "accesscontrol.migrator.Sync")
defer span.End()
r.reconcile(ctx)
return nil
}
// Reconcile schedules as job that will run and reconcile resources between
// legacy access control and zanzana.
func (r *ZanzanaReconciler) Reconcile(ctx context.Context) error {
// FIXME: try to reconcile at start whenever we have moved all syncs to reconcilers
// r.reconcile(ctx)
r.reconcile(ctx)
// FIXME:
// 1. We should be a bit graceful about reconciliations so we are not hammering dbs
@ -111,24 +98,40 @@ func (r *ZanzanaReconciler) reconcile(ctx context.Context) {
r.log.Debug("Finished reconciliation", "elapsed", time.Since(now))
}
orgIds, err := r.getOrgs(ctx)
if err != nil {
return
}
for _, orgId := range orgIds {
ns := claims.OrgNamespaceFormatter(orgId)
if r.lock == nil {
run(ctx, ns)
var namespaces []string
if r.cfg.StackID != "" {
id, err := strconv.ParseInt(r.cfg.StackID, 10, 64)
if err != nil {
r.log.Error("cannot perform reconciliation, malformed stack id", "id", r.cfg.StackID, "err", err)
return
}
// We ignore the error for now
_ = r.lock.LockExecuteAndRelease(ctx, "zanzana-reconciliation", 10*time.Hour, func(ctx context.Context) {
run(ctx, ns)
})
namespaces = []string{claims.CloudNamespaceFormatter(id)}
} else {
ids, err := r.getOrgs(ctx)
if err != nil {
r.log.Error("cannot perform reconciliation, failed to fetch orgs", "err", err)
return
}
for _, id := range ids {
namespaces = append(namespaces, claims.OrgNamespaceFormatter(id))
}
}
if r.lock == nil {
for _, ns := range namespaces {
run(ctx, ns)
}
return
}
// We ignore the error for now
_ = r.lock.LockExecuteAndRelease(ctx, "zanzana-reconciliation", 10*time.Hour, func(ctx context.Context) {
for _, ns := range namespaces {
run(ctx, ns)
}
})
}
func (r *ZanzanaReconciler) getOrgs(ctx context.Context) ([]int64, error) {

View File

@ -8,12 +8,11 @@ import (
openfgav1 "github.com/openfga/api/proto/openfga/v1"
"github.com/grafana/grafana/pkg/services/authz/zanzana"
"github.com/grafana/grafana/pkg/services/authz/zanzana/common"
authzextv1 "github.com/grafana/grafana/pkg/services/authz/zanzana/proto/v1"
)
// legacyTupleCollector collects tuples groupd by object and tupleKey
type legacyTupleCollector func(ctx context.Context, orgId int64) (map[string]map[string]*openfgav1.TupleKey, error)
type legacyTupleCollector func(ctx context.Context, orgID int64) (map[string]map[string]*openfgav1.TupleKey, error)
// zanzanaTupleCollector collects tuples from zanzana for given object
type zanzanaTupleCollector func(ctx context.Context, client zanzana.Client, object string, namespace string) (map[string]*openfgav1.TupleKey, error)
@ -95,7 +94,7 @@ func (r resourceReconciler) reconcile(ctx context.Context, namespace string) err
err := batch(deletes, 100, func(items []*openfgav1.TupleKeyWithoutCondition) error {
return r.client.Write(ctx, &authzextv1.WriteRequest{
Namespace: namespace,
Deletes: &authzextv1.WriteRequestDeletes{TupleKeys: common.ToAuthzExtTupleKeysWithoutCondition(items)},
Deletes: &authzextv1.WriteRequestDeletes{TupleKeys: zanzana.ToAuthzExtTupleKeysWithoutCondition(items)},
})
})
@ -108,7 +107,7 @@ func (r resourceReconciler) reconcile(ctx context.Context, namespace string) err
err := batch(writes, 100, func(items []*openfgav1.TupleKey) error {
return r.client.Write(ctx, &authzextv1.WriteRequest{
Namespace: namespace,
Writes: &authzextv1.WriteRequestWrites{TupleKeys: common.ToAuthzExtTupleKeys(items)},
Writes: &authzextv1.WriteRequestWrites{TupleKeys: zanzana.ToAuthzExtTupleKeys(items)},
})
})

View File

@ -84,6 +84,18 @@ const (
GlobalOrgID = 0
)
var (
ToAuthzExtTupleKey = common.ToAuthzExtTupleKey
ToAuthzExtTupleKeys = common.ToAuthzExtTupleKeys
ToAuthzExtTupleKeyWithoutCondition = common.ToAuthzExtTupleKeyWithoutCondition
ToAuthzExtTupleKeysWithoutCondition = common.ToAuthzExtTupleKeysWithoutCondition
ToOpenFGATuple = common.ToOpenFGATuple
ToOpenFGATuples = common.ToOpenFGATuples
ToOpenFGATupleKey = common.ToOpenFGATupleKey
ToOpenFGATupleKeyWithoutCondition = common.ToOpenFGATupleKeyWithoutCondition
)
// NewTupleEntry constructs new openfga entry type:id[#relation].
// Relation allows to specify group of users (subjects) related to type:id
// (for example, team:devs#member refers to users which are members of team devs)
@ -131,14 +143,3 @@ func MergeFolderResourceTuples(a, b *openfgav1.TupleKey) {
vb := b.Condition.Context.Fields["group_resources"]
va.GetListValue().Values = append(va.GetListValue().Values, vb.GetListValue().Values...)
}
func TranslateFixedRole(role string) string {
role = strings.ReplaceAll(role, ":", "_")
role = strings.ReplaceAll(role, ".", "_")
return role
}
// Translate "read" for the dashboard into "dashboard_read" for folder
func TranslateToFolderRelation(relation, objectType string) string {
return fmt.Sprintf("%s_%s", objectType, relation)
}