mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Zanzana: List then search implementation (#96705)
* Zanzana: Search with list * Allow to pass werb into list request * split list search into 2 functions * fix listing resources * remove unused * refactor * remove unused function * Add more logging to reconciler * Fix search for users with access to all resources * fix findFoldersZanzanaList * search for folders as well by default * refactor * use compile for list and search * remove list from client * remove only from client * remove list from interface * run compile once * refactor * refactor * add search tests * fix tests * Fix linter
This commit is contained in:
@@ -110,10 +110,18 @@ func (r *ZanzanaReconciler) Reconcile(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// ReconcileSync runs reconciliation and returns. Useful for tests to perform
|
||||
// reconciliation in a synchronous way.
|
||||
func (r *ZanzanaReconciler) ReconcileSync(ctx context.Context) error {
|
||||
r.reconcile(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ZanzanaReconciler) reconcile(ctx context.Context) {
|
||||
run := func(ctx context.Context, namespace string) {
|
||||
now := time.Now()
|
||||
for _, reconciler := range r.reconcilers {
|
||||
r.log.Debug("Performing zanzana reconciliation", "reconciler", reconciler.name)
|
||||
if err := reconciler.reconcile(ctx, namespace); err != nil {
|
||||
r.log.Warn("Failed to perform reconciliation for resource", "err", err)
|
||||
}
|
||||
@@ -150,11 +158,14 @@ func (r *ZanzanaReconciler) reconcile(ctx context.Context) {
|
||||
}
|
||||
|
||||
// We ignore the error for now
|
||||
_ = r.lock.LockExecuteAndRelease(ctx, "zanzana-reconciliation", 10*time.Hour, func(ctx context.Context) {
|
||||
err := r.lock.LockExecuteAndRelease(ctx, "zanzana-reconciliation", 10*time.Hour, func(ctx context.Context) {
|
||||
for _, ns := range namespaces {
|
||||
run(ctx, ns)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
r.log.Error("Error performing zanzana reconciliation", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ZanzanaReconciler) getOrgs(ctx context.Context) ([]int64, error) {
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/authlib/authz"
|
||||
"github.com/grafana/authlib/claims"
|
||||
|
||||
authzextv1 "github.com/grafana/grafana/pkg/services/authz/proto/v1"
|
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/client"
|
||||
@@ -13,7 +12,6 @@ import (
|
||||
// Client is a wrapper around [openfgav1.OpenFGAServiceClient]
|
||||
type Client interface {
|
||||
authz.AccessClient
|
||||
List(ctx context.Context, id claims.AuthInfo, req authz.ListRequest) (*authzextv1.ListResponse, error)
|
||||
Read(ctx context.Context, req *authzextv1.ReadRequest) (*authzextv1.ReadResponse, error)
|
||||
Write(ctx context.Context, req *authzextv1.WriteRequest) error
|
||||
BatchCheck(ctx context.Context, req *authzextv1.BatchCheckRequest) (*authzextv1.BatchCheckResponse, error)
|
||||
|
||||
@@ -142,19 +142,6 @@ func newItemChecker(res *authzextv1.ListResponse) authz.ItemChecker {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) List(ctx context.Context, id claims.AuthInfo, req authz.ListRequest) (*authzextv1.ListResponse, error) {
|
||||
ctx, span := tracer.Start(ctx, "authz.zanzana.client.List")
|
||||
defer span.End()
|
||||
|
||||
return c.authzext.List(ctx, &authzextv1.ListRequest{
|
||||
Subject: id.GetUID(),
|
||||
Group: req.Group,
|
||||
Verb: utils.VerbList,
|
||||
Resource: req.Resource,
|
||||
Namespace: req.Namespace,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) Read(ctx context.Context, req *authzextv1.ReadRequest) (*authzextv1.ReadResponse, error) {
|
||||
ctx, span := tracer.Start(ctx, "authz.zanzana.client.Read")
|
||||
defer span.End()
|
||||
|
||||
@@ -25,10 +25,6 @@ func (nc *NoopClient) Compile(ctx context.Context, id claims.AuthInfo, req authz
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (nc *NoopClient) List(ctx context.Context, id claims.AuthInfo, req authz.ListRequest) (*authzextv1.ListResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (nc NoopClient) Read(ctx context.Context, req *authzextv1.ReadRequest) (*authzextv1.ReadResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -4,9 +4,8 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
|
||||
"github.com/grafana/authlib/authz"
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/common"
|
||||
)
|
||||
@@ -154,6 +153,23 @@ func TranslateToCheckRequest(namespace, action, kind, folder, name string) (*aut
|
||||
return req, true
|
||||
}
|
||||
|
||||
func TranslateToListRequest(namespace, action, kind string) (*authz.ListRequest, bool) {
|
||||
translation, ok := resourceTranslations[kind]
|
||||
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// FIXME: support different verbs
|
||||
req := &authz.ListRequest{
|
||||
Namespace: namespace,
|
||||
Group: translation.group,
|
||||
Resource: translation.resource,
|
||||
}
|
||||
|
||||
return req, true
|
||||
}
|
||||
|
||||
func TranslateToGroupResource(kind string) string {
|
||||
translation, ok := resourceTranslations[kind]
|
||||
if !ok {
|
||||
|
||||
@@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -13,6 +14,10 @@ import (
|
||||
|
||||
const (
|
||||
defaultQueryLimit = 1000
|
||||
// If search query string shorter than this value, then "List, then check" strategy will be used
|
||||
listQueryLengthThreshold = 8
|
||||
// If query limit set to value higher than this value, then "List, then check" strategy will be used
|
||||
listQueryLimitThreshold = 50
|
||||
)
|
||||
|
||||
type searchResult struct {
|
||||
@@ -85,13 +90,23 @@ func (dr *DashboardServiceImpl) findDashboardsZanzanaCompare(ctx context.Context
|
||||
return first.result, first.err
|
||||
}
|
||||
|
||||
type checkDashboardsFn func(context.Context, dashboards.FindPersistedDashboardsQuery, []dashboards.DashboardSearchProjection, int64) ([]dashboards.DashboardSearchProjection, error)
|
||||
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzana(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) {
|
||||
return dr.findDashboardsZanzanaCheck(ctx, query)
|
||||
if len(query.Title) <= listQueryLengthThreshold || query.Limit > listQueryLimitThreshold {
|
||||
checkCompileFn, err := dr.getCheckCompileFn(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dr.findDashboardsZanzanaGeneric(ctx, query, checkCompileFn)
|
||||
}
|
||||
|
||||
return dr.findDashboardsZanzanaGeneric(ctx, query, dr.checkDashboardsBatch)
|
||||
}
|
||||
|
||||
// findDashboardsZanzanaCheck implements "Search, then check" strategy. It first performs search query, then filters out results
|
||||
// by checking access to each item.
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzanaCheck(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) {
|
||||
// findDashboardsZanzanaGeneric runs search query in the database and then check if resultls
|
||||
// available to user by calling provided checkFn function. It could be check-based or compile (list) - based.
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzanaGeneric(ctx context.Context, query dashboards.FindPersistedDashboardsQuery, checkFn checkDashboardsFn) ([]dashboards.DashboardSearchProjection, error) {
|
||||
ctx, span := tracer.Start(ctx, "dashboards.service.findDashboardsZanzanaCheck")
|
||||
defer span.End()
|
||||
|
||||
@@ -113,7 +128,7 @@ func (dr *DashboardServiceImpl) findDashboardsZanzanaCheck(ctx context.Context,
|
||||
}
|
||||
|
||||
remains := limit - int64(len(result))
|
||||
res, err := dr.checkDashboardsBatch(ctx, query, findRes, remains)
|
||||
res, err := checkFn(ctx, query, findRes, remains)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -195,3 +210,46 @@ func (dr *DashboardServiceImpl) checkDashboardsBatch(ctx context.Context, query
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (dr *DashboardServiceImpl) getCheckCompileFn(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) (checkDashboardsFn, error) {
|
||||
// List available folders
|
||||
namespace := query.SignedInUser.GetNamespace()
|
||||
req, ok := zanzana.TranslateToListRequest(namespace, dashboards.ActionFoldersRead, zanzana.KindFolders)
|
||||
if !ok {
|
||||
return nil, errors.New("resource type not supported")
|
||||
}
|
||||
folderChecker, err := dr.zclient.Compile(ctx, query.SignedInUser, *req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// List available dashboards
|
||||
req, ok = zanzana.TranslateToListRequest(namespace, dashboards.ActionDashboardsRead, zanzana.KindDashboards)
|
||||
if !ok {
|
||||
return nil, errors.New("resource type not supported")
|
||||
}
|
||||
dashboardChecker, err := dr.zclient.Compile(ctx, query.SignedInUser, *req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return func(_ context.Context, _ dashboards.FindPersistedDashboardsQuery, searchRes []dashboards.DashboardSearchProjection, remains int64) ([]dashboards.DashboardSearchProjection, error) {
|
||||
result := make([]dashboards.DashboardSearchProjection, 0)
|
||||
for _, d := range searchRes {
|
||||
if len(result) >= int(remains) {
|
||||
break
|
||||
}
|
||||
allowed := false
|
||||
if d.IsFolder {
|
||||
allowed = folderChecker(namespace, d.UID, d.FolderUID)
|
||||
} else {
|
||||
allowed = dashboardChecker(namespace, d.UID, d.FolderUID)
|
||||
}
|
||||
if allowed {
|
||||
result = append(result, d)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
133
pkg/services/dashboards/service/zanzana_integration_test.go
Normal file
133
pkg/services/dashboards/service/zanzana_integration_test.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/dualwrite"
|
||||
accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
|
||||
"github.com/grafana/grafana/pkg/services/authz"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards/database"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/folder/folderimpl"
|
||||
"github.com/grafana/grafana/pkg/services/folder/foldertest"
|
||||
"github.com/grafana/grafana/pkg/services/guardian"
|
||||
"github.com/grafana/grafana/pkg/services/quota/quotatest"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/services/tag/tagimpl"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func TestIntegrationDashboardServiceZanzana(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
t.Run("Zanzana enabled", func(t *testing.T) {
|
||||
features := featuremgmt.WithFeatures(featuremgmt.FlagZanzana)
|
||||
db, cfg := db.InitTestDBWithCfg(t)
|
||||
|
||||
// Hack to skip these tests on mysql 5.7
|
||||
if db.GetDialect().DriverName() == migrator.MySQL {
|
||||
if supported, err := db.RecursiveQueriesAreSupported(); !supported || err != nil {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
}
|
||||
|
||||
// Enable zanzana and run in embedded mode (part of grafana server)
|
||||
cfg.Zanzana.ZanzanaOnlyEvaluation = true
|
||||
cfg.Zanzana.Mode = setting.ZanzanaModeEmbedded
|
||||
cfg.Zanzana.ConcurrentChecks = 10
|
||||
|
||||
_, err := cfg.Raw.Section("rbac").NewKey("resources_with_managed_permissions_on_creation", "dashboard, folder")
|
||||
require.NoError(t, err)
|
||||
|
||||
quotaService := quotatest.New(false, nil)
|
||||
tagService := tagimpl.ProvideService(db)
|
||||
folderStore := folderimpl.ProvideDashboardFolderStore(db)
|
||||
fStore := folderimpl.ProvideStore(db)
|
||||
dashboardStore, err := database.ProvideDashboardStore(db, cfg, features, tagService, quotaService)
|
||||
require.NoError(t, err)
|
||||
|
||||
zclient, err := authz.ProvideZanzana(cfg, db, features)
|
||||
require.NoError(t, err)
|
||||
ac := acimpl.ProvideAccessControl(featuremgmt.WithFeatures(), zclient)
|
||||
|
||||
service, err := ProvideDashboardServiceImpl(
|
||||
cfg, dashboardStore, folderStore,
|
||||
featuremgmt.WithFeatures(),
|
||||
accesscontrolmock.NewMockedPermissionsService(),
|
||||
accesscontrolmock.NewMockedPermissionsService(),
|
||||
ac,
|
||||
foldertest.NewFakeService(),
|
||||
fStore,
|
||||
nil,
|
||||
zclient,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
guardianMock := &guardian.FakeDashboardGuardian{
|
||||
CanSaveValue: true,
|
||||
}
|
||||
guardian.MockDashboardGuardian(guardianMock)
|
||||
|
||||
createDashboards(t, service, 100, "test-a")
|
||||
createDashboards(t, service, 100, "test-b")
|
||||
|
||||
// Sync Grafana DB with zanzana (migrate data)
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
lock := serverlock.ProvideService(db, tracer)
|
||||
zanzanaSyncronizer := dualwrite.NewZanzanaReconciler(cfg, zclient, db, lock)
|
||||
err = zanzanaSyncronizer.ReconcileSync(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
query := &dashboards.FindPersistedDashboardsQuery{
|
||||
Title: "test-a",
|
||||
Limit: 1000,
|
||||
SignedInUser: &user.SignedInUser{
|
||||
OrgID: 1,
|
||||
UserID: 1,
|
||||
UserUID: "test1",
|
||||
Namespace: "default",
|
||||
},
|
||||
}
|
||||
res, err := service.FindDashboardsZanzana(context.Background(), query)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, len(res))
|
||||
})
|
||||
}
|
||||
|
||||
func createDashboard(t *testing.T, service dashboards.DashboardService, uid, title string) {
|
||||
dto := &dashboards.SaveDashboardDTO{
|
||||
OrgID: 1,
|
||||
// User: user,
|
||||
User: &user.SignedInUser{
|
||||
OrgID: 1,
|
||||
UserID: 1,
|
||||
},
|
||||
}
|
||||
dto.Dashboard = dashboards.NewDashboard(title)
|
||||
dto.Dashboard.SetUID(uid)
|
||||
|
||||
_, err := service.SaveDashboard(context.Background(), dto, false)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func createDashboards(t *testing.T, service dashboards.DashboardService, number int, prefix string) {
|
||||
for i := 0; i < number; i++ {
|
||||
title := fmt.Sprintf("%s-%d", prefix, i)
|
||||
uid := fmt.Sprintf("dash-%s", title)
|
||||
createDashboard(t, service, uid, title)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user