From c04be62b6589cc5db43ea832888249b02734d2c3 Mon Sep 17 00:00:00 2001 From: Karl Persson Date: Thu, 4 Jul 2024 11:23:48 +0200 Subject: [PATCH] Zanzana: client integration test (#89997) * Restructure * Zanzana: Add integration tests for client * skip mysql 5.7 integration tests --- pkg/services/authz/zanzana/client/client.go | 15 +- .../authz/zanzana/client/client_test.go | 113 ++++++++++++++++ .../authz/zanzana/{ => logger}/logger.go | 34 ++--- pkg/services/authz/zanzana/server.go | 98 +------------- pkg/services/authz/zanzana/server/server.go | 113 ++++++++++++++++ pkg/services/authz/zanzana/store.go | 118 +--------------- .../zanzana/store/{ => assets}/assets.go | 2 +- .../sqlite/001_initialize_schema.sql | 0 .../002_add_authorization_model_version.sql | 0 .../sqlite/003_add_reverse_lookup_index.sql | 0 ...uthorization_model_serialized_protobuf.sql | 0 .../sqlite/005_add_conditions_to_tuples.sql | 0 .../authz/zanzana/store/sqlite/store_test.go | 7 +- pkg/services/authz/zanzana/store/store.go | 128 ++++++++++++++++++ 14 files changed, 397 insertions(+), 231 deletions(-) create mode 100644 pkg/services/authz/zanzana/client/client_test.go rename pkg/services/authz/zanzana/{ => logger}/logger.go (59%) create mode 100644 pkg/services/authz/zanzana/server/server.go rename pkg/services/authz/zanzana/store/{ => assets}/assets.go (91%) rename pkg/services/authz/zanzana/store/{ => assets}/migrations/sqlite/001_initialize_schema.sql (100%) rename pkg/services/authz/zanzana/store/{ => assets}/migrations/sqlite/002_add_authorization_model_version.sql (100%) rename pkg/services/authz/zanzana/store/{ => assets}/migrations/sqlite/003_add_reverse_lookup_index.sql (100%) rename pkg/services/authz/zanzana/store/{ => assets}/migrations/sqlite/004_add_authorization_model_serialized_protobuf.sql (100%) rename pkg/services/authz/zanzana/store/{ => assets}/migrations/sqlite/005_add_conditions_to_tuples.sql (100%) create mode 100644 pkg/services/authz/zanzana/store/store.go diff --git a/pkg/services/authz/zanzana/client/client.go b/pkg/services/authz/zanzana/client/client.go index e4c60780a40..cc31cc53d93 100644 --- a/pkg/services/authz/zanzana/client/client.go +++ b/pkg/services/authz/zanzana/client/client.go @@ -28,9 +28,16 @@ func WithLogger(logger log.Logger) ClientOption { } } +func WithSchema(dsl string) ClientOption { + return func(c *Client) { + c.dsl = dsl + } +} + type Client struct { logger log.Logger client openfgav1.OpenFGAServiceClient + dsl string tenantID string storeID string modelID string @@ -53,6 +60,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, opts ...ClientOption) c.tenantID = "stack-default" } + if c.dsl == "" { + c.dsl = schema.DSL + } + store, err := c.getOrCreateStore(ctx, c.tenantID) if err != nil { return nil, err @@ -60,7 +71,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, opts ...ClientOption) c.storeID = store.GetId() - modelID, err := c.loadModel(ctx, c.storeID, schema.DSL) + modelID, err := c.loadModel(ctx, c.storeID, c.dsl) if err != nil { return nil, err } @@ -165,7 +176,7 @@ func (c *Client) loadModel(ctx context.Context, storeID string, dsl string) (str // If provided dsl is equal to a stored dsl we use that as the authorization id if schema.EqualModels(dsl, storedDSL) { - return res.AuthorizationModels[0].GetId(), nil + return model.GetId(), nil } } diff --git a/pkg/services/authz/zanzana/client/client_test.go b/pkg/services/authz/zanzana/client/client_test.go new file mode 100644 index 00000000000..523fc665e36 --- /dev/null +++ b/pkg/services/authz/zanzana/client/client_test.go @@ -0,0 +1,113 @@ +package client + +import ( + "context" + "testing" + + openfgav1 "github.com/openfga/api/proto/openfga/v1" + + "github.com/fullstorydev/grpchan/inprocgrpc" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/tests/testsuite" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + zserver "github.com/grafana/grafana/pkg/services/authz/zanzana/server" + zstore "github.com/grafana/grafana/pkg/services/authz/zanzana/store" + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" +) + +func TestMain(m *testing.M) { + testsuite.Run(m) +} + +func TestIntegrationClient(t *testing.T) { + conn := zanzanaServerIntegrationTest(t) + + var ( + prevStoreID string + prevModelID string + ) + + t.Run("should create default store and authorization model on first startup", func(t *testing.T) { + c, err := New(context.Background(), conn) + require.NoError(t, err) + + assert.NotEmpty(t, c.storeID) + assert.NotEmpty(t, c.modelID) + + prevStoreID, prevModelID = c.storeID, c.modelID + }) + + t.Run("should reuse existing store and authorization model", func(t *testing.T) { + c, err := New(context.Background(), conn) + require.NoError(t, err) + + assert.Equal(t, prevStoreID, c.storeID) + assert.Equal(t, prevModelID, c.modelID) + }) + + t.Run("should create new store and authorization model when new tenant id is used", func(t *testing.T) { + c, err := New(context.Background(), conn, WithTenantID("new")) + require.NoError(t, err) + + assert.NotEmpty(t, c.storeID) + assert.NotEmpty(t, c.modelID) + + assert.NotEqual(t, prevStoreID, c.storeID) + assert.NotEqual(t, prevModelID, c.modelID) + + prevStoreID, prevModelID = c.storeID, c.modelID + }) + + t.Run("should update authorization model if it has new changes", func(t *testing.T) { + dsl := ` +model + schema 1.1 + +type user + ` + c, err := New(context.Background(), conn, WithTenantID("new"), WithSchema(dsl)) + require.NoError(t, err) + + assert.Equal(t, prevStoreID, c.storeID) + assert.NotEqual(t, prevModelID, c.modelID) + }) + + t.Run("should load older authorization model", func(t *testing.T) { + c, err := New(context.Background(), conn, WithTenantID("new")) + require.NoError(t, err) + + assert.Equal(t, prevStoreID, c.storeID) + assert.Equal(t, prevModelID, c.modelID) + }) +} + +func zanzanaServerIntegrationTest(tb testing.TB) *inprocgrpc.Channel { + if testing.Short() { + tb.Skip("skipping integration test") + } + + db, cfg := db.InitTestDBWithCfg(tb) + + // Hack to skip these tests on mysql 5.7 + if db.GetDialect().DriverName() == migrator.MySQL { + if supported, err := db.RecursiveQueriesAreSupported(); !supported || err != nil { + tb.Skip("skipping integration test") + } + } + + logger := log.NewNopLogger() + + store, err := zstore.NewEmbeddedStore(cfg, db, logger) + require.NoError(tb, err) + + srv, err := zserver.New(store, logger) + require.NoError(tb, err) + + channel := &inprocgrpc.Channel{} + openfgav1.RegisterOpenFGAServiceServer(channel, srv) + + return channel +} diff --git a/pkg/services/authz/zanzana/logger.go b/pkg/services/authz/zanzana/logger/logger.go similarity index 59% rename from pkg/services/authz/zanzana/logger.go rename to pkg/services/authz/zanzana/logger/logger.go index 79936df85e0..ac953ed4f15 100644 --- a/pkg/services/authz/zanzana/logger.go +++ b/pkg/services/authz/zanzana/logger/logger.go @@ -1,4 +1,4 @@ -package zanzana +package logger import ( "context" @@ -8,13 +8,13 @@ import ( "github.com/grafana/grafana/pkg/infra/log" ) -// zanzanaLogger is a grafana logger wrapper compatible with OpenFGA logger interface -type zanzanaLogger struct { +// ZanzanaLogger is a grafana logger wrapper compatible with OpenFGA logger interface +type ZanzanaLogger struct { logger log.Logger } -func newZanzanaLogger(logger log.Logger) *zanzanaLogger { - return &zanzanaLogger{ +func New(logger log.Logger) *ZanzanaLogger { + return &ZanzanaLogger{ logger: logger, } } @@ -36,50 +36,50 @@ func zapFieldsToArgs(fields []zap.Field) []any { return args } -func (l *zanzanaLogger) Debug(msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) Debug(msg string, fields ...zap.Field) { l.logger.Debug(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) Info(msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) Info(msg string, fields ...zap.Field) { l.logger.Info(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) Warn(msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) Warn(msg string, fields ...zap.Field) { l.logger.Warn(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) Error(msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) Error(msg string, fields ...zap.Field) { l.logger.Error(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) Panic(msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) Panic(msg string, fields ...zap.Field) { l.logger.Error(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) Fatal(msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) Fatal(msg string, fields ...zap.Field) { l.logger.Error(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) DebugWithContext(ctx context.Context, msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) DebugWithContext(ctx context.Context, msg string, fields ...zap.Field) { l.logger.Debug(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) InfoWithContext(ctx context.Context, msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) InfoWithContext(ctx context.Context, msg string, fields ...zap.Field) { l.logger.Info(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) WarnWithContext(ctx context.Context, msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) WarnWithContext(ctx context.Context, msg string, fields ...zap.Field) { l.logger.Warn(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) ErrorWithContext(ctx context.Context, msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) ErrorWithContext(ctx context.Context, msg string, fields ...zap.Field) { l.logger.Error(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) PanicWithContext(ctx context.Context, msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) PanicWithContext(ctx context.Context, msg string, fields ...zap.Field) { l.logger.Error(msg, zapFieldsToArgs(fields)...) } -func (l *zanzanaLogger) FatalWithContext(ctx context.Context, msg string, fields ...zap.Field) { +func (l *ZanzanaLogger) FatalWithContext(ctx context.Context, msg string, fields ...zap.Field) { l.logger.Error(msg, zapFieldsToArgs(fields)...) } diff --git a/pkg/services/authz/zanzana/server.go b/pkg/services/authz/zanzana/server.go index 787ff6acab9..3d6b0dc4a6a 100644 --- a/pkg/services/authz/zanzana/server.go +++ b/pkg/services/authz/zanzana/server.go @@ -1,110 +1,20 @@ package zanzana import ( - "context" - "fmt" - "net/http" - "time" - - "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - openfgav1 "github.com/openfga/api/proto/openfga/v1" - httpmiddleware "github.com/openfga/openfga/pkg/middleware/http" "github.com/openfga/openfga/pkg/server" - serverErrors "github.com/openfga/openfga/pkg/server/errors" "github.com/openfga/openfga/pkg/storage" - "github.com/rs/cors" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - healthv1pb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/grpcserver" "github.com/grafana/grafana/pkg/setting" + + zserver "github.com/grafana/grafana/pkg/services/authz/zanzana/server" ) func NewServer(store storage.OpenFGADatastore, logger log.Logger) (*server.Server, error) { - // FIXME(kalleep): add support for more options, tracing etc - opts := []server.OpenFGAServiceV1Option{ - server.WithDatastore(store), - server.WithLogger(newZanzanaLogger(logger)), - } - - // FIXME(kalleep): Interceptors - // We probably need to at least need to add store id interceptor also - // would be nice to inject our own requestid? - srv, err := server.NewServerWithOpts(opts...) - if err != nil { - return nil, err - } - - return srv, nil + return zserver.New(store, logger) } -// StartOpenFGAHttpSever starts HTTP server which allows to use fga cli. func StartOpenFGAHttpSever(cfg *setting.Cfg, srv grpcserver.Provider, logger log.Logger) error { - dialOpts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - - addr := srv.GetAddress() - // Wait until GRPC server is initialized - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - maxRetries := 100 - retries := 0 - for addr == "" && retries < maxRetries { - <-ticker.C - addr = srv.GetAddress() - retries++ - } - if addr == "" { - return fmt.Errorf("failed to start HTTP server: GRPC server unavailable") - } - - conn, err := grpc.NewClient(addr, dialOpts...) - if err != nil { - return fmt.Errorf("unable to dial GRPC: %w", err) - } - - muxOpts := []runtime.ServeMuxOption{ - runtime.WithForwardResponseOption(httpmiddleware.HTTPResponseModifier), - runtime.WithErrorHandler(func(c context.Context, - sr *runtime.ServeMux, mm runtime.Marshaler, w http.ResponseWriter, r *http.Request, e error) { - intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e)) - httpmiddleware.CustomHTTPErrorHandler(c, w, r, serverErrors.NewEncodedError(intCode, e.Error())) - }), - runtime.WithStreamErrorHandler(func(ctx context.Context, e error) *status.Status { - intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e)) - encodedErr := serverErrors.NewEncodedError(intCode, e.Error()) - return status.Convert(encodedErr) - }), - runtime.WithHealthzEndpoint(healthv1pb.NewHealthClient(conn)), - runtime.WithOutgoingHeaderMatcher(func(s string) (string, bool) { return s, true }), - } - mux := runtime.NewServeMux(muxOpts...) - if err := openfgav1.RegisterOpenFGAServiceHandler(context.TODO(), mux, conn); err != nil { - return fmt.Errorf("failed to register gateway handler: %w", err) - } - - httpServer := &http.Server{ - Addr: cfg.Zanzana.HttpAddr, - Handler: cors.New(cors.Options{ - AllowedOrigins: []string{"*"}, - AllowCredentials: true, - AllowedHeaders: []string{"*"}, - AllowedMethods: []string{http.MethodGet, http.MethodPost, - http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut}, - }).Handler(mux), - ReadHeaderTimeout: 30 * time.Second, - } - go func() { - err = httpServer.ListenAndServe() - if err != nil { - logger.Error("failed to start http server", zapcore.Field{Key: "err", Type: zapcore.ErrorType, Interface: err}) - } - }() - logger.Info(fmt.Sprintf("OpenFGA HTTP server listening on '%s'...", httpServer.Addr)) - return nil + return zserver.StartOpenFGAHttpSever(cfg, srv, logger) } diff --git a/pkg/services/authz/zanzana/server/server.go b/pkg/services/authz/zanzana/server/server.go new file mode 100644 index 00000000000..49baf954fde --- /dev/null +++ b/pkg/services/authz/zanzana/server/server.go @@ -0,0 +1,113 @@ +package server + +import ( + "context" + "fmt" + "net/http" + "time" + + openfgav1 "github.com/openfga/api/proto/openfga/v1" + httpmiddleware "github.com/openfga/openfga/pkg/middleware/http" + "github.com/openfga/openfga/pkg/server" + serverErrors "github.com/openfga/openfga/pkg/server/errors" + "github.com/openfga/openfga/pkg/storage" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/rs/cors" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + healthv1pb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/grpcserver" + "github.com/grafana/grafana/pkg/setting" + + zlogger "github.com/grafana/grafana/pkg/services/authz/zanzana/logger" +) + +func New(store storage.OpenFGADatastore, logger log.Logger) (*server.Server, error) { + // FIXME(kalleep): add support for more options, tracing etc + opts := []server.OpenFGAServiceV1Option{ + server.WithDatastore(store), + server.WithLogger(zlogger.New(logger)), + } + + // FIXME(kalleep): Interceptors + // We probably need to at least need to add store id interceptor also + // would be nice to inject our own requestid? + srv, err := server.NewServerWithOpts(opts...) + if err != nil { + return nil, err + } + + return srv, nil +} + +// StartOpenFGAHttpSever starts HTTP server which allows to use fga cli. +func StartOpenFGAHttpSever(cfg *setting.Cfg, srv grpcserver.Provider, logger log.Logger) error { + dialOpts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + addr := srv.GetAddress() + // Wait until GRPC server is initialized + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + maxRetries := 100 + retries := 0 + for addr == "" && retries < maxRetries { + <-ticker.C + addr = srv.GetAddress() + retries++ + } + if addr == "" { + return fmt.Errorf("failed to start HTTP server: GRPC server unavailable") + } + + conn, err := grpc.NewClient(addr, dialOpts...) + if err != nil { + return fmt.Errorf("unable to dial GRPC: %w", err) + } + + muxOpts := []runtime.ServeMuxOption{ + runtime.WithForwardResponseOption(httpmiddleware.HTTPResponseModifier), + runtime.WithErrorHandler(func(c context.Context, + sr *runtime.ServeMux, mm runtime.Marshaler, w http.ResponseWriter, r *http.Request, e error) { + intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e)) + httpmiddleware.CustomHTTPErrorHandler(c, w, r, serverErrors.NewEncodedError(intCode, e.Error())) + }), + runtime.WithStreamErrorHandler(func(ctx context.Context, e error) *status.Status { + intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e)) + encodedErr := serverErrors.NewEncodedError(intCode, e.Error()) + return status.Convert(encodedErr) + }), + runtime.WithHealthzEndpoint(healthv1pb.NewHealthClient(conn)), + runtime.WithOutgoingHeaderMatcher(func(s string) (string, bool) { return s, true }), + } + mux := runtime.NewServeMux(muxOpts...) + if err := openfgav1.RegisterOpenFGAServiceHandler(context.TODO(), mux, conn); err != nil { + return fmt.Errorf("failed to register gateway handler: %w", err) + } + + httpServer := &http.Server{ + Addr: cfg.Zanzana.HttpAddr, + Handler: cors.New(cors.Options{ + AllowedOrigins: []string{"*"}, + AllowCredentials: true, + AllowedHeaders: []string{"*"}, + AllowedMethods: []string{http.MethodGet, http.MethodPost, + http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut}, + }).Handler(mux), + ReadHeaderTimeout: 30 * time.Second, + } + go func() { + err = httpServer.ListenAndServe() + if err != nil { + logger.Error("failed to start http server", zapcore.Field{Key: "err", Type: zapcore.ErrorType, Interface: err}) + } + }() + logger.Info(fmt.Sprintf("OpenFGA HTTP server listening on '%s'...", httpServer.Addr)) + return nil +} diff --git a/pkg/services/authz/zanzana/store.go b/pkg/services/authz/zanzana/store.go index 380d3d08ba1..67361386744 100644 --- a/pkg/services/authz/zanzana/store.go +++ b/pkg/services/authz/zanzana/store.go @@ -1,128 +1,18 @@ package zanzana import ( - "fmt" - "time" - - "xorm.io/xorm" - - "github.com/openfga/openfga/assets" "github.com/openfga/openfga/pkg/storage" - "github.com/openfga/openfga/pkg/storage/mysql" - "github.com/openfga/openfga/pkg/storage/postgres" - "github.com/openfga/openfga/pkg/storage/sqlcommon" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/log" - zstore "github.com/grafana/grafana/pkg/services/authz/zanzana/store" - "github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration" - "github.com/grafana/grafana/pkg/services/authz/zanzana/store/sqlite" - "github.com/grafana/grafana/pkg/services/sqlstore" - "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/setting" + + "github.com/grafana/grafana/pkg/services/authz/zanzana/store" ) -// FIXME(kalleep): Add sqlite data store. -// There is no support for sqlite atm but we are working on adding it: https://github.com/openfga/openfga/pull/1615 func NewStore(cfg *setting.Cfg, logger log.Logger) (storage.OpenFGADatastore, error) { - grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger) - if err != nil { - return nil, fmt.Errorf("failed to parse database config: %w", err) - } - - switch grafanaDBCfg.Type { - case migrator.SQLite: - connStr := grafanaDBCfg.ConnectionString - // Initilize connection using xorm engine so we can reuse it for both migrations and data store - engine, err := xorm.NewEngine(grafanaDBCfg.Type, connStr) - if err != nil { - return nil, fmt.Errorf("failed to connect to database: %w", err) - } - - m := migrator.NewMigrator(engine, cfg) - if err := migration.RunWithMigrator(m, cfg, zstore.EmbedMigrations, zstore.SQLiteMigrationDir); err != nil { - return nil, fmt.Errorf("failed to run migrations: %w", err) - } - - return sqlite.NewWithDB(engine.DB().DB, &sqlite.Config{ - Config: zanzanaDBCfg, - QueryRetries: grafanaDBCfg.QueryRetries, - }) - case migrator.MySQL: - // For mysql we need to pass parseTime parameter in connection string - connStr := grafanaDBCfg.ConnectionString + "&parseTime=true" - if err := migration.Run(cfg, migrator.MySQL, connStr, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil { - return nil, fmt.Errorf("failed to run migrations: %w", err) - } - - return mysql.New(connStr, zanzanaDBCfg) - case migrator.Postgres: - connStr := grafanaDBCfg.ConnectionString - if err := migration.Run(cfg, migrator.Postgres, connStr, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil { - return nil, fmt.Errorf("failed to run migrations: %w", err) - } - - return postgres.New(connStr, zanzanaDBCfg) - } - - // Should never happen - return nil, fmt.Errorf("unsupported database engine: %s", grafanaDBCfg.Type) + return store.NewStore(cfg, logger) } - func NewEmbeddedStore(cfg *setting.Cfg, db db.DB, logger log.Logger) (storage.OpenFGADatastore, error) { - grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger) - if err != nil { - return nil, fmt.Errorf("failed to parse database config: %w", err) - } - - m := migrator.NewMigrator(db.GetEngine(), cfg) - - switch grafanaDBCfg.Type { - case migrator.SQLite: - if err := migration.RunWithMigrator(m, cfg, zstore.EmbedMigrations, zstore.SQLiteMigrationDir); err != nil { - return nil, fmt.Errorf("failed to run migrations: %w", err) - } - - // FIXME(kalleep): We should work on getting sqlite implemtation merged upstream and replace this one - return sqlite.NewWithDB(db.GetEngine().DB().DB, &sqlite.Config{ - Config: zanzanaDBCfg, - QueryRetries: grafanaDBCfg.QueryRetries, - }) - case migrator.MySQL: - if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil { - return nil, fmt.Errorf("failed to run migrations: %w", err) - } - - // For mysql we need to pass parseTime parameter in connection string - return mysql.New(grafanaDBCfg.ConnectionString+"&parseTime=true", zanzanaDBCfg) - case migrator.Postgres: - if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil { - return nil, fmt.Errorf("failed to run migrations: %w", err) - } - - return postgres.New(grafanaDBCfg.ConnectionString, zanzanaDBCfg) - } - - // Should never happen - return nil, fmt.Errorf("unsupported database engine: %s", db.GetDialect().DriverName()) -} - -func parseConfig(cfg *setting.Cfg, logger log.Logger) (*sqlstore.DatabaseConfig, *sqlcommon.Config, error) { - sec := cfg.Raw.Section("database") - grafanaDBCfg, err := sqlstore.NewDatabaseConfig(cfg, nil) - if err != nil { - return nil, nil, nil - } - - zanzanaDBCfg := &sqlcommon.Config{ - Logger: newZanzanaLogger(logger), - MaxTuplesPerWriteField: 100, - MaxTypesPerModelField: 100, - MaxOpenConns: grafanaDBCfg.MaxOpenConn, - MaxIdleConns: grafanaDBCfg.MaxIdleConn, - ConnMaxLifetime: time.Duration(grafanaDBCfg.ConnMaxLifetime) * time.Second, - ExportMetrics: sec.Key("instrument_queries").MustBool(false), - } - - return grafanaDBCfg, zanzanaDBCfg, nil + return store.NewEmbeddedStore(cfg, db, logger) } diff --git a/pkg/services/authz/zanzana/store/assets.go b/pkg/services/authz/zanzana/store/assets/assets.go similarity index 91% rename from pkg/services/authz/zanzana/store/assets.go rename to pkg/services/authz/zanzana/store/assets/assets.go index e61b32e5b93..fb55d16eef2 100644 --- a/pkg/services/authz/zanzana/store/assets.go +++ b/pkg/services/authz/zanzana/store/assets/assets.go @@ -1,4 +1,4 @@ -package store +package assets import "embed" diff --git a/pkg/services/authz/zanzana/store/migrations/sqlite/001_initialize_schema.sql b/pkg/services/authz/zanzana/store/assets/migrations/sqlite/001_initialize_schema.sql similarity index 100% rename from pkg/services/authz/zanzana/store/migrations/sqlite/001_initialize_schema.sql rename to pkg/services/authz/zanzana/store/assets/migrations/sqlite/001_initialize_schema.sql diff --git a/pkg/services/authz/zanzana/store/migrations/sqlite/002_add_authorization_model_version.sql b/pkg/services/authz/zanzana/store/assets/migrations/sqlite/002_add_authorization_model_version.sql similarity index 100% rename from pkg/services/authz/zanzana/store/migrations/sqlite/002_add_authorization_model_version.sql rename to pkg/services/authz/zanzana/store/assets/migrations/sqlite/002_add_authorization_model_version.sql diff --git a/pkg/services/authz/zanzana/store/migrations/sqlite/003_add_reverse_lookup_index.sql b/pkg/services/authz/zanzana/store/assets/migrations/sqlite/003_add_reverse_lookup_index.sql similarity index 100% rename from pkg/services/authz/zanzana/store/migrations/sqlite/003_add_reverse_lookup_index.sql rename to pkg/services/authz/zanzana/store/assets/migrations/sqlite/003_add_reverse_lookup_index.sql diff --git a/pkg/services/authz/zanzana/store/migrations/sqlite/004_add_authorization_model_serialized_protobuf.sql b/pkg/services/authz/zanzana/store/assets/migrations/sqlite/004_add_authorization_model_serialized_protobuf.sql similarity index 100% rename from pkg/services/authz/zanzana/store/migrations/sqlite/004_add_authorization_model_serialized_protobuf.sql rename to pkg/services/authz/zanzana/store/assets/migrations/sqlite/004_add_authorization_model_serialized_protobuf.sql diff --git a/pkg/services/authz/zanzana/store/migrations/sqlite/005_add_conditions_to_tuples.sql b/pkg/services/authz/zanzana/store/assets/migrations/sqlite/005_add_conditions_to_tuples.sql similarity index 100% rename from pkg/services/authz/zanzana/store/migrations/sqlite/005_add_conditions_to_tuples.sql rename to pkg/services/authz/zanzana/store/assets/migrations/sqlite/005_add_conditions_to_tuples.sql diff --git a/pkg/services/authz/zanzana/store/sqlite/store_test.go b/pkg/services/authz/zanzana/store/sqlite/store_test.go index 2274fec3822..5388114d188 100644 --- a/pkg/services/authz/zanzana/store/sqlite/store_test.go +++ b/pkg/services/authz/zanzana/store/sqlite/store_test.go @@ -19,10 +19,11 @@ import ( "github.com/openfga/openfga/pkg/typesystem" "github.com/grafana/grafana/pkg/infra/db" - "github.com/grafana/grafana/pkg/services/authz/zanzana/store" - "github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/tests/testsuite" + + zassets "github.com/grafana/grafana/pkg/services/authz/zanzana/store/assets" + "github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration" ) func TestMain(m *testing.M) { @@ -287,7 +288,7 @@ func sqliteIntegrationTest(tb testing.TB) *sql.DB { db, cfg := db.InitTestDBWithCfg(tb) m := migrator.NewMigrator(db.GetEngine(), cfg) - err := migration.RunWithMigrator(m, cfg, store.EmbedMigrations, store.SQLiteMigrationDir) + err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir) require.NoError(tb, err) return db.GetEngine().DB().DB diff --git a/pkg/services/authz/zanzana/store/store.go b/pkg/services/authz/zanzana/store/store.go new file mode 100644 index 00000000000..fe7aadc0cee --- /dev/null +++ b/pkg/services/authz/zanzana/store/store.go @@ -0,0 +1,128 @@ +package store + +import ( + "fmt" + "time" + + "xorm.io/xorm" + + "github.com/openfga/openfga/assets" + "github.com/openfga/openfga/pkg/storage" + "github.com/openfga/openfga/pkg/storage/mysql" + "github.com/openfga/openfga/pkg/storage/postgres" + "github.com/openfga/openfga/pkg/storage/sqlcommon" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + "github.com/grafana/grafana/pkg/setting" + + zlogger "github.com/grafana/grafana/pkg/services/authz/zanzana/logger" + zassets "github.com/grafana/grafana/pkg/services/authz/zanzana/store/assets" + "github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration" + "github.com/grafana/grafana/pkg/services/authz/zanzana/store/sqlite" +) + +func NewStore(cfg *setting.Cfg, logger log.Logger) (storage.OpenFGADatastore, error) { + grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger) + if err != nil { + return nil, fmt.Errorf("failed to parse database config: %w", err) + } + + switch grafanaDBCfg.Type { + case migrator.SQLite: + connStr := grafanaDBCfg.ConnectionString + // Initilize connection using xorm engine so we can reuse it for both migrations and data store + engine, err := xorm.NewEngine(grafanaDBCfg.Type, connStr) + if err != nil { + return nil, fmt.Errorf("failed to connect to database: %w", err) + } + + m := migrator.NewMigrator(engine, cfg) + if err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir); err != nil { + return nil, fmt.Errorf("failed to run migrations: %w", err) + } + + return sqlite.NewWithDB(engine.DB().DB, &sqlite.Config{ + Config: zanzanaDBCfg, + QueryRetries: grafanaDBCfg.QueryRetries, + }) + case migrator.MySQL: + // For mysql we need to pass parseTime parameter in connection string + connStr := grafanaDBCfg.ConnectionString + "&parseTime=true" + if err := migration.Run(cfg, migrator.MySQL, connStr, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil { + return nil, fmt.Errorf("failed to run migrations: %w", err) + } + + return mysql.New(connStr, zanzanaDBCfg) + case migrator.Postgres: + connStr := grafanaDBCfg.ConnectionString + if err := migration.Run(cfg, migrator.Postgres, connStr, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil { + return nil, fmt.Errorf("failed to run migrations: %w", err) + } + + return postgres.New(connStr, zanzanaDBCfg) + } + + // Should never happen + return nil, fmt.Errorf("unsupported database engine: %s", grafanaDBCfg.Type) +} + +func NewEmbeddedStore(cfg *setting.Cfg, db db.DB, logger log.Logger) (storage.OpenFGADatastore, error) { + grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger) + if err != nil { + return nil, fmt.Errorf("failed to parse database config: %w", err) + } + + m := migrator.NewMigrator(db.GetEngine(), cfg) + + switch grafanaDBCfg.Type { + case migrator.SQLite: + if err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir); err != nil { + return nil, fmt.Errorf("failed to run migrations: %w", err) + } + + // FIXME(kalleep): We should work on getting sqlite implemtation merged upstream and replace this one + return sqlite.NewWithDB(db.GetEngine().DB().DB, &sqlite.Config{ + Config: zanzanaDBCfg, + QueryRetries: grafanaDBCfg.QueryRetries, + }) + case migrator.MySQL: + if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil { + return nil, fmt.Errorf("failed to run migrations: %w", err) + } + + // For mysql we need to pass parseTime parameter in connection string + return mysql.New(grafanaDBCfg.ConnectionString+"&parseTime=true", zanzanaDBCfg) + case migrator.Postgres: + if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil { + return nil, fmt.Errorf("failed to run migrations: %w", err) + } + + return postgres.New(grafanaDBCfg.ConnectionString, zanzanaDBCfg) + } + + // Should never happen + return nil, fmt.Errorf("unsupported database engine: %s", db.GetDialect().DriverName()) +} + +func parseConfig(cfg *setting.Cfg, logger log.Logger) (*sqlstore.DatabaseConfig, *sqlcommon.Config, error) { + sec := cfg.Raw.Section("database") + grafanaDBCfg, err := sqlstore.NewDatabaseConfig(cfg, nil) + if err != nil { + return nil, nil, nil + } + + zanzanaDBCfg := &sqlcommon.Config{ + Logger: zlogger.New(logger), + MaxTuplesPerWriteField: 100, + MaxTypesPerModelField: 100, + MaxOpenConns: grafanaDBCfg.MaxOpenConn, + MaxIdleConns: grafanaDBCfg.MaxIdleConn, + ConnMaxLifetime: time.Duration(grafanaDBCfg.ConnMaxLifetime) * time.Second, + ExportMetrics: sec.Key("instrument_queries").MustBool(false), + } + + return grafanaDBCfg, zanzanaDBCfg, nil +}