mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Zanzana: client integration test (#89997)
* Restructure * Zanzana: Add integration tests for client * skip mysql 5.7 integration tests
This commit is contained in:
@@ -28,9 +28,16 @@ func WithLogger(logger log.Logger) ClientOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithSchema(dsl string) ClientOption {
|
||||||
|
return func(c *Client) {
|
||||||
|
c.dsl = dsl
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
client openfgav1.OpenFGAServiceClient
|
client openfgav1.OpenFGAServiceClient
|
||||||
|
dsl string
|
||||||
tenantID string
|
tenantID string
|
||||||
storeID string
|
storeID string
|
||||||
modelID string
|
modelID string
|
||||||
@@ -53,6 +60,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, opts ...ClientOption)
|
|||||||
c.tenantID = "stack-default"
|
c.tenantID = "stack-default"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.dsl == "" {
|
||||||
|
c.dsl = schema.DSL
|
||||||
|
}
|
||||||
|
|
||||||
store, err := c.getOrCreateStore(ctx, c.tenantID)
|
store, err := c.getOrCreateStore(ctx, c.tenantID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -60,7 +71,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, opts ...ClientOption)
|
|||||||
|
|
||||||
c.storeID = store.GetId()
|
c.storeID = store.GetId()
|
||||||
|
|
||||||
modelID, err := c.loadModel(ctx, c.storeID, schema.DSL)
|
modelID, err := c.loadModel(ctx, c.storeID, c.dsl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -165,7 +176,7 @@ func (c *Client) loadModel(ctx context.Context, storeID string, dsl string) (str
|
|||||||
|
|
||||||
// If provided dsl is equal to a stored dsl we use that as the authorization id
|
// If provided dsl is equal to a stored dsl we use that as the authorization id
|
||||||
if schema.EqualModels(dsl, storedDSL) {
|
if schema.EqualModels(dsl, storedDSL) {
|
||||||
return res.AuthorizationModels[0].GetId(), nil
|
return model.GetId(), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
113
pkg/services/authz/zanzana/client/client_test.go
Normal file
113
pkg/services/authz/zanzana/client/client_test.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||||
|
|
||||||
|
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/tests/testsuite"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
zserver "github.com/grafana/grafana/pkg/services/authz/zanzana/server"
|
||||||
|
zstore "github.com/grafana/grafana/pkg/services/authz/zanzana/store"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
testsuite.Run(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIntegrationClient(t *testing.T) {
|
||||||
|
conn := zanzanaServerIntegrationTest(t)
|
||||||
|
|
||||||
|
var (
|
||||||
|
prevStoreID string
|
||||||
|
prevModelID string
|
||||||
|
)
|
||||||
|
|
||||||
|
t.Run("should create default store and authorization model on first startup", func(t *testing.T) {
|
||||||
|
c, err := New(context.Background(), conn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.NotEmpty(t, c.storeID)
|
||||||
|
assert.NotEmpty(t, c.modelID)
|
||||||
|
|
||||||
|
prevStoreID, prevModelID = c.storeID, c.modelID
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should reuse existing store and authorization model", func(t *testing.T) {
|
||||||
|
c, err := New(context.Background(), conn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, prevStoreID, c.storeID)
|
||||||
|
assert.Equal(t, prevModelID, c.modelID)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should create new store and authorization model when new tenant id is used", func(t *testing.T) {
|
||||||
|
c, err := New(context.Background(), conn, WithTenantID("new"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.NotEmpty(t, c.storeID)
|
||||||
|
assert.NotEmpty(t, c.modelID)
|
||||||
|
|
||||||
|
assert.NotEqual(t, prevStoreID, c.storeID)
|
||||||
|
assert.NotEqual(t, prevModelID, c.modelID)
|
||||||
|
|
||||||
|
prevStoreID, prevModelID = c.storeID, c.modelID
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should update authorization model if it has new changes", func(t *testing.T) {
|
||||||
|
dsl := `
|
||||||
|
model
|
||||||
|
schema 1.1
|
||||||
|
|
||||||
|
type user
|
||||||
|
`
|
||||||
|
c, err := New(context.Background(), conn, WithTenantID("new"), WithSchema(dsl))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, prevStoreID, c.storeID)
|
||||||
|
assert.NotEqual(t, prevModelID, c.modelID)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should load older authorization model", func(t *testing.T) {
|
||||||
|
c, err := New(context.Background(), conn, WithTenantID("new"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, prevStoreID, c.storeID)
|
||||||
|
assert.Equal(t, prevModelID, c.modelID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func zanzanaServerIntegrationTest(tb testing.TB) *inprocgrpc.Channel {
|
||||||
|
if testing.Short() {
|
||||||
|
tb.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
db, cfg := db.InitTestDBWithCfg(tb)
|
||||||
|
|
||||||
|
// Hack to skip these tests on mysql 5.7
|
||||||
|
if db.GetDialect().DriverName() == migrator.MySQL {
|
||||||
|
if supported, err := db.RecursiveQueriesAreSupported(); !supported || err != nil {
|
||||||
|
tb.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := log.NewNopLogger()
|
||||||
|
|
||||||
|
store, err := zstore.NewEmbeddedStore(cfg, db, logger)
|
||||||
|
require.NoError(tb, err)
|
||||||
|
|
||||||
|
srv, err := zserver.New(store, logger)
|
||||||
|
require.NoError(tb, err)
|
||||||
|
|
||||||
|
channel := &inprocgrpc.Channel{}
|
||||||
|
openfgav1.RegisterOpenFGAServiceServer(channel, srv)
|
||||||
|
|
||||||
|
return channel
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package zanzana
|
package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -8,13 +8,13 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// zanzanaLogger is a grafana logger wrapper compatible with OpenFGA logger interface
|
// ZanzanaLogger is a grafana logger wrapper compatible with OpenFGA logger interface
|
||||||
type zanzanaLogger struct {
|
type ZanzanaLogger struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newZanzanaLogger(logger log.Logger) *zanzanaLogger {
|
func New(logger log.Logger) *ZanzanaLogger {
|
||||||
return &zanzanaLogger{
|
return &ZanzanaLogger{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -36,50 +36,50 @@ func zapFieldsToArgs(fields []zap.Field) []any {
|
|||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) Debug(msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) Debug(msg string, fields ...zap.Field) {
|
||||||
l.logger.Debug(msg, zapFieldsToArgs(fields)...)
|
l.logger.Debug(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) Info(msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) Info(msg string, fields ...zap.Field) {
|
||||||
l.logger.Info(msg, zapFieldsToArgs(fields)...)
|
l.logger.Info(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) Warn(msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) Warn(msg string, fields ...zap.Field) {
|
||||||
l.logger.Warn(msg, zapFieldsToArgs(fields)...)
|
l.logger.Warn(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) Error(msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) Error(msg string, fields ...zap.Field) {
|
||||||
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) Panic(msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) Panic(msg string, fields ...zap.Field) {
|
||||||
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) Fatal(msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) Fatal(msg string, fields ...zap.Field) {
|
||||||
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) DebugWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) DebugWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
l.logger.Debug(msg, zapFieldsToArgs(fields)...)
|
l.logger.Debug(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) InfoWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) InfoWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
l.logger.Info(msg, zapFieldsToArgs(fields)...)
|
l.logger.Info(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) WarnWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) WarnWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
l.logger.Warn(msg, zapFieldsToArgs(fields)...)
|
l.logger.Warn(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) ErrorWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) ErrorWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) PanicWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) PanicWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *zanzanaLogger) FatalWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *ZanzanaLogger) FatalWithContext(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
l.logger.Error(msg, zapFieldsToArgs(fields)...)
|
||||||
}
|
}
|
||||||
@@ -1,110 +1,20 @@
|
|||||||
package zanzana
|
package zanzana
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
|
||||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
|
||||||
httpmiddleware "github.com/openfga/openfga/pkg/middleware/http"
|
|
||||||
"github.com/openfga/openfga/pkg/server"
|
"github.com/openfga/openfga/pkg/server"
|
||||||
serverErrors "github.com/openfga/openfga/pkg/server/errors"
|
|
||||||
"github.com/openfga/openfga/pkg/storage"
|
"github.com/openfga/openfga/pkg/storage"
|
||||||
"github.com/rs/cors"
|
|
||||||
"go.uber.org/zap/zapcore"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
|
||||||
healthv1pb "google.golang.org/grpc/health/grpc_health_v1"
|
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/services/grpcserver"
|
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
|
||||||
|
zserver "github.com/grafana/grafana/pkg/services/authz/zanzana/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewServer(store storage.OpenFGADatastore, logger log.Logger) (*server.Server, error) {
|
func NewServer(store storage.OpenFGADatastore, logger log.Logger) (*server.Server, error) {
|
||||||
// FIXME(kalleep): add support for more options, tracing etc
|
return zserver.New(store, logger)
|
||||||
opts := []server.OpenFGAServiceV1Option{
|
|
||||||
server.WithDatastore(store),
|
|
||||||
server.WithLogger(newZanzanaLogger(logger)),
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME(kalleep): Interceptors
|
|
||||||
// We probably need to at least need to add store id interceptor also
|
|
||||||
// would be nice to inject our own requestid?
|
|
||||||
srv, err := server.NewServerWithOpts(opts...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return srv, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartOpenFGAHttpSever starts HTTP server which allows to use fga cli.
|
|
||||||
func StartOpenFGAHttpSever(cfg *setting.Cfg, srv grpcserver.Provider, logger log.Logger) error {
|
func StartOpenFGAHttpSever(cfg *setting.Cfg, srv grpcserver.Provider, logger log.Logger) error {
|
||||||
dialOpts := []grpc.DialOption{
|
return zserver.StartOpenFGAHttpSever(cfg, srv, logger)
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
||||||
}
|
|
||||||
|
|
||||||
addr := srv.GetAddress()
|
|
||||||
// Wait until GRPC server is initialized
|
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
defer ticker.Stop()
|
|
||||||
maxRetries := 100
|
|
||||||
retries := 0
|
|
||||||
for addr == "" && retries < maxRetries {
|
|
||||||
<-ticker.C
|
|
||||||
addr = srv.GetAddress()
|
|
||||||
retries++
|
|
||||||
}
|
|
||||||
if addr == "" {
|
|
||||||
return fmt.Errorf("failed to start HTTP server: GRPC server unavailable")
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := grpc.NewClient(addr, dialOpts...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to dial GRPC: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
muxOpts := []runtime.ServeMuxOption{
|
|
||||||
runtime.WithForwardResponseOption(httpmiddleware.HTTPResponseModifier),
|
|
||||||
runtime.WithErrorHandler(func(c context.Context,
|
|
||||||
sr *runtime.ServeMux, mm runtime.Marshaler, w http.ResponseWriter, r *http.Request, e error) {
|
|
||||||
intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e))
|
|
||||||
httpmiddleware.CustomHTTPErrorHandler(c, w, r, serverErrors.NewEncodedError(intCode, e.Error()))
|
|
||||||
}),
|
|
||||||
runtime.WithStreamErrorHandler(func(ctx context.Context, e error) *status.Status {
|
|
||||||
intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e))
|
|
||||||
encodedErr := serverErrors.NewEncodedError(intCode, e.Error())
|
|
||||||
return status.Convert(encodedErr)
|
|
||||||
}),
|
|
||||||
runtime.WithHealthzEndpoint(healthv1pb.NewHealthClient(conn)),
|
|
||||||
runtime.WithOutgoingHeaderMatcher(func(s string) (string, bool) { return s, true }),
|
|
||||||
}
|
|
||||||
mux := runtime.NewServeMux(muxOpts...)
|
|
||||||
if err := openfgav1.RegisterOpenFGAServiceHandler(context.TODO(), mux, conn); err != nil {
|
|
||||||
return fmt.Errorf("failed to register gateway handler: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
httpServer := &http.Server{
|
|
||||||
Addr: cfg.Zanzana.HttpAddr,
|
|
||||||
Handler: cors.New(cors.Options{
|
|
||||||
AllowedOrigins: []string{"*"},
|
|
||||||
AllowCredentials: true,
|
|
||||||
AllowedHeaders: []string{"*"},
|
|
||||||
AllowedMethods: []string{http.MethodGet, http.MethodPost,
|
|
||||||
http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut},
|
|
||||||
}).Handler(mux),
|
|
||||||
ReadHeaderTimeout: 30 * time.Second,
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
err = httpServer.ListenAndServe()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("failed to start http server", zapcore.Field{Key: "err", Type: zapcore.ErrorType, Interface: err})
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
logger.Info(fmt.Sprintf("OpenFGA HTTP server listening on '%s'...", httpServer.Addr))
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
113
pkg/services/authz/zanzana/server/server.go
Normal file
113
pkg/services/authz/zanzana/server/server.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||||
|
httpmiddleware "github.com/openfga/openfga/pkg/middleware/http"
|
||||||
|
"github.com/openfga/openfga/pkg/server"
|
||||||
|
serverErrors "github.com/openfga/openfga/pkg/server/errors"
|
||||||
|
"github.com/openfga/openfga/pkg/storage"
|
||||||
|
|
||||||
|
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||||
|
"github.com/rs/cors"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
healthv1pb "google.golang.org/grpc/health/grpc_health_v1"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
|
||||||
|
zlogger "github.com/grafana/grafana/pkg/services/authz/zanzana/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(store storage.OpenFGADatastore, logger log.Logger) (*server.Server, error) {
|
||||||
|
// FIXME(kalleep): add support for more options, tracing etc
|
||||||
|
opts := []server.OpenFGAServiceV1Option{
|
||||||
|
server.WithDatastore(store),
|
||||||
|
server.WithLogger(zlogger.New(logger)),
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME(kalleep): Interceptors
|
||||||
|
// We probably need to at least need to add store id interceptor also
|
||||||
|
// would be nice to inject our own requestid?
|
||||||
|
srv, err := server.NewServerWithOpts(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return srv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartOpenFGAHttpSever starts HTTP server which allows to use fga cli.
|
||||||
|
func StartOpenFGAHttpSever(cfg *setting.Cfg, srv grpcserver.Provider, logger log.Logger) error {
|
||||||
|
dialOpts := []grpc.DialOption{
|
||||||
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
}
|
||||||
|
|
||||||
|
addr := srv.GetAddress()
|
||||||
|
// Wait until GRPC server is initialized
|
||||||
|
ticker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
maxRetries := 100
|
||||||
|
retries := 0
|
||||||
|
for addr == "" && retries < maxRetries {
|
||||||
|
<-ticker.C
|
||||||
|
addr = srv.GetAddress()
|
||||||
|
retries++
|
||||||
|
}
|
||||||
|
if addr == "" {
|
||||||
|
return fmt.Errorf("failed to start HTTP server: GRPC server unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := grpc.NewClient(addr, dialOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to dial GRPC: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
muxOpts := []runtime.ServeMuxOption{
|
||||||
|
runtime.WithForwardResponseOption(httpmiddleware.HTTPResponseModifier),
|
||||||
|
runtime.WithErrorHandler(func(c context.Context,
|
||||||
|
sr *runtime.ServeMux, mm runtime.Marshaler, w http.ResponseWriter, r *http.Request, e error) {
|
||||||
|
intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e))
|
||||||
|
httpmiddleware.CustomHTTPErrorHandler(c, w, r, serverErrors.NewEncodedError(intCode, e.Error()))
|
||||||
|
}),
|
||||||
|
runtime.WithStreamErrorHandler(func(ctx context.Context, e error) *status.Status {
|
||||||
|
intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e))
|
||||||
|
encodedErr := serverErrors.NewEncodedError(intCode, e.Error())
|
||||||
|
return status.Convert(encodedErr)
|
||||||
|
}),
|
||||||
|
runtime.WithHealthzEndpoint(healthv1pb.NewHealthClient(conn)),
|
||||||
|
runtime.WithOutgoingHeaderMatcher(func(s string) (string, bool) { return s, true }),
|
||||||
|
}
|
||||||
|
mux := runtime.NewServeMux(muxOpts...)
|
||||||
|
if err := openfgav1.RegisterOpenFGAServiceHandler(context.TODO(), mux, conn); err != nil {
|
||||||
|
return fmt.Errorf("failed to register gateway handler: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
httpServer := &http.Server{
|
||||||
|
Addr: cfg.Zanzana.HttpAddr,
|
||||||
|
Handler: cors.New(cors.Options{
|
||||||
|
AllowedOrigins: []string{"*"},
|
||||||
|
AllowCredentials: true,
|
||||||
|
AllowedHeaders: []string{"*"},
|
||||||
|
AllowedMethods: []string{http.MethodGet, http.MethodPost,
|
||||||
|
http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut},
|
||||||
|
}).Handler(mux),
|
||||||
|
ReadHeaderTimeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
err = httpServer.ListenAndServe()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to start http server", zapcore.Field{Key: "err", Type: zapcore.ErrorType, Interface: err})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
logger.Info(fmt.Sprintf("OpenFGA HTTP server listening on '%s'...", httpServer.Addr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -1,128 +1,18 @@
|
|||||||
package zanzana
|
package zanzana
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"xorm.io/xorm"
|
|
||||||
|
|
||||||
"github.com/openfga/openfga/assets"
|
|
||||||
"github.com/openfga/openfga/pkg/storage"
|
"github.com/openfga/openfga/pkg/storage"
|
||||||
"github.com/openfga/openfga/pkg/storage/mysql"
|
|
||||||
"github.com/openfga/openfga/pkg/storage/postgres"
|
|
||||||
"github.com/openfga/openfga/pkg/storage/sqlcommon"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/db"
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
zstore "github.com/grafana/grafana/pkg/services/authz/zanzana/store"
|
|
||||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration"
|
|
||||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/sqlite"
|
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz/zanzana/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FIXME(kalleep): Add sqlite data store.
|
|
||||||
// There is no support for sqlite atm but we are working on adding it: https://github.com/openfga/openfga/pull/1615
|
|
||||||
func NewStore(cfg *setting.Cfg, logger log.Logger) (storage.OpenFGADatastore, error) {
|
func NewStore(cfg *setting.Cfg, logger log.Logger) (storage.OpenFGADatastore, error) {
|
||||||
grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger)
|
return store.NewStore(cfg, logger)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse database config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch grafanaDBCfg.Type {
|
|
||||||
case migrator.SQLite:
|
|
||||||
connStr := grafanaDBCfg.ConnectionString
|
|
||||||
// Initilize connection using xorm engine so we can reuse it for both migrations and data store
|
|
||||||
engine, err := xorm.NewEngine(grafanaDBCfg.Type, connStr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m := migrator.NewMigrator(engine, cfg)
|
|
||||||
if err := migration.RunWithMigrator(m, cfg, zstore.EmbedMigrations, zstore.SQLiteMigrationDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return sqlite.NewWithDB(engine.DB().DB, &sqlite.Config{
|
|
||||||
Config: zanzanaDBCfg,
|
|
||||||
QueryRetries: grafanaDBCfg.QueryRetries,
|
|
||||||
})
|
|
||||||
case migrator.MySQL:
|
|
||||||
// For mysql we need to pass parseTime parameter in connection string
|
|
||||||
connStr := grafanaDBCfg.ConnectionString + "&parseTime=true"
|
|
||||||
if err := migration.Run(cfg, migrator.MySQL, connStr, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return mysql.New(connStr, zanzanaDBCfg)
|
|
||||||
case migrator.Postgres:
|
|
||||||
connStr := grafanaDBCfg.ConnectionString
|
|
||||||
if err := migration.Run(cfg, migrator.Postgres, connStr, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return postgres.New(connStr, zanzanaDBCfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should never happen
|
|
||||||
return nil, fmt.Errorf("unsupported database engine: %s", grafanaDBCfg.Type)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEmbeddedStore(cfg *setting.Cfg, db db.DB, logger log.Logger) (storage.OpenFGADatastore, error) {
|
func NewEmbeddedStore(cfg *setting.Cfg, db db.DB, logger log.Logger) (storage.OpenFGADatastore, error) {
|
||||||
grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger)
|
return store.NewEmbeddedStore(cfg, db, logger)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse database config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m := migrator.NewMigrator(db.GetEngine(), cfg)
|
|
||||||
|
|
||||||
switch grafanaDBCfg.Type {
|
|
||||||
case migrator.SQLite:
|
|
||||||
if err := migration.RunWithMigrator(m, cfg, zstore.EmbedMigrations, zstore.SQLiteMigrationDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME(kalleep): We should work on getting sqlite implemtation merged upstream and replace this one
|
|
||||||
return sqlite.NewWithDB(db.GetEngine().DB().DB, &sqlite.Config{
|
|
||||||
Config: zanzanaDBCfg,
|
|
||||||
QueryRetries: grafanaDBCfg.QueryRetries,
|
|
||||||
})
|
|
||||||
case migrator.MySQL:
|
|
||||||
if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// For mysql we need to pass parseTime parameter in connection string
|
|
||||||
return mysql.New(grafanaDBCfg.ConnectionString+"&parseTime=true", zanzanaDBCfg)
|
|
||||||
case migrator.Postgres:
|
|
||||||
if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return postgres.New(grafanaDBCfg.ConnectionString, zanzanaDBCfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should never happen
|
|
||||||
return nil, fmt.Errorf("unsupported database engine: %s", db.GetDialect().DriverName())
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseConfig(cfg *setting.Cfg, logger log.Logger) (*sqlstore.DatabaseConfig, *sqlcommon.Config, error) {
|
|
||||||
sec := cfg.Raw.Section("database")
|
|
||||||
grafanaDBCfg, err := sqlstore.NewDatabaseConfig(cfg, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
zanzanaDBCfg := &sqlcommon.Config{
|
|
||||||
Logger: newZanzanaLogger(logger),
|
|
||||||
MaxTuplesPerWriteField: 100,
|
|
||||||
MaxTypesPerModelField: 100,
|
|
||||||
MaxOpenConns: grafanaDBCfg.MaxOpenConn,
|
|
||||||
MaxIdleConns: grafanaDBCfg.MaxIdleConn,
|
|
||||||
ConnMaxLifetime: time.Duration(grafanaDBCfg.ConnMaxLifetime) * time.Second,
|
|
||||||
ExportMetrics: sec.Key("instrument_queries").MustBool(false),
|
|
||||||
}
|
|
||||||
|
|
||||||
return grafanaDBCfg, zanzanaDBCfg, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package store
|
package assets
|
||||||
|
|
||||||
import "embed"
|
import "embed"
|
||||||
|
|
||||||
@@ -19,10 +19,11 @@ import (
|
|||||||
"github.com/openfga/openfga/pkg/typesystem"
|
"github.com/openfga/openfga/pkg/typesystem"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/db"
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/store"
|
|
||||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration"
|
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
"github.com/grafana/grafana/pkg/tests/testsuite"
|
"github.com/grafana/grafana/pkg/tests/testsuite"
|
||||||
|
|
||||||
|
zassets "github.com/grafana/grafana/pkg/services/authz/zanzana/store/assets"
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
@@ -287,7 +288,7 @@ func sqliteIntegrationTest(tb testing.TB) *sql.DB {
|
|||||||
db, cfg := db.InitTestDBWithCfg(tb)
|
db, cfg := db.InitTestDBWithCfg(tb)
|
||||||
|
|
||||||
m := migrator.NewMigrator(db.GetEngine(), cfg)
|
m := migrator.NewMigrator(db.GetEngine(), cfg)
|
||||||
err := migration.RunWithMigrator(m, cfg, store.EmbedMigrations, store.SQLiteMigrationDir)
|
err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
|
|
||||||
return db.GetEngine().DB().DB
|
return db.GetEngine().DB().DB
|
||||||
|
|||||||
128
pkg/services/authz/zanzana/store/store.go
Normal file
128
pkg/services/authz/zanzana/store/store.go
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"xorm.io/xorm"
|
||||||
|
|
||||||
|
"github.com/openfga/openfga/assets"
|
||||||
|
"github.com/openfga/openfga/pkg/storage"
|
||||||
|
"github.com/openfga/openfga/pkg/storage/mysql"
|
||||||
|
"github.com/openfga/openfga/pkg/storage/postgres"
|
||||||
|
"github.com/openfga/openfga/pkg/storage/sqlcommon"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
|
||||||
|
zlogger "github.com/grafana/grafana/pkg/services/authz/zanzana/logger"
|
||||||
|
zassets "github.com/grafana/grafana/pkg/services/authz/zanzana/store/assets"
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration"
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/sqlite"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewStore(cfg *setting.Cfg, logger log.Logger) (storage.OpenFGADatastore, error) {
|
||||||
|
grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse database config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch grafanaDBCfg.Type {
|
||||||
|
case migrator.SQLite:
|
||||||
|
connStr := grafanaDBCfg.ConnectionString
|
||||||
|
// Initilize connection using xorm engine so we can reuse it for both migrations and data store
|
||||||
|
engine, err := xorm.NewEngine(grafanaDBCfg.Type, connStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m := migrator.NewMigrator(engine, cfg)
|
||||||
|
if err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sqlite.NewWithDB(engine.DB().DB, &sqlite.Config{
|
||||||
|
Config: zanzanaDBCfg,
|
||||||
|
QueryRetries: grafanaDBCfg.QueryRetries,
|
||||||
|
})
|
||||||
|
case migrator.MySQL:
|
||||||
|
// For mysql we need to pass parseTime parameter in connection string
|
||||||
|
connStr := grafanaDBCfg.ConnectionString + "&parseTime=true"
|
||||||
|
if err := migration.Run(cfg, migrator.MySQL, connStr, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return mysql.New(connStr, zanzanaDBCfg)
|
||||||
|
case migrator.Postgres:
|
||||||
|
connStr := grafanaDBCfg.ConnectionString
|
||||||
|
if err := migration.Run(cfg, migrator.Postgres, connStr, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return postgres.New(connStr, zanzanaDBCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should never happen
|
||||||
|
return nil, fmt.Errorf("unsupported database engine: %s", grafanaDBCfg.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEmbeddedStore(cfg *setting.Cfg, db db.DB, logger log.Logger) (storage.OpenFGADatastore, error) {
|
||||||
|
grafanaDBCfg, zanzanaDBCfg, err := parseConfig(cfg, logger)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse database config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m := migrator.NewMigrator(db.GetEngine(), cfg)
|
||||||
|
|
||||||
|
switch grafanaDBCfg.Type {
|
||||||
|
case migrator.SQLite:
|
||||||
|
if err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME(kalleep): We should work on getting sqlite implemtation merged upstream and replace this one
|
||||||
|
return sqlite.NewWithDB(db.GetEngine().DB().DB, &sqlite.Config{
|
||||||
|
Config: zanzanaDBCfg,
|
||||||
|
QueryRetries: grafanaDBCfg.QueryRetries,
|
||||||
|
})
|
||||||
|
case migrator.MySQL:
|
||||||
|
if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.MySQLMigrationDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For mysql we need to pass parseTime parameter in connection string
|
||||||
|
return mysql.New(grafanaDBCfg.ConnectionString+"&parseTime=true", zanzanaDBCfg)
|
||||||
|
case migrator.Postgres:
|
||||||
|
if err := migration.RunWithMigrator(m, cfg, assets.EmbedMigrations, assets.PostgresMigrationDir); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return postgres.New(grafanaDBCfg.ConnectionString, zanzanaDBCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should never happen
|
||||||
|
return nil, fmt.Errorf("unsupported database engine: %s", db.GetDialect().DriverName())
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseConfig(cfg *setting.Cfg, logger log.Logger) (*sqlstore.DatabaseConfig, *sqlcommon.Config, error) {
|
||||||
|
sec := cfg.Raw.Section("database")
|
||||||
|
grafanaDBCfg, err := sqlstore.NewDatabaseConfig(cfg, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
zanzanaDBCfg := &sqlcommon.Config{
|
||||||
|
Logger: zlogger.New(logger),
|
||||||
|
MaxTuplesPerWriteField: 100,
|
||||||
|
MaxTypesPerModelField: 100,
|
||||||
|
MaxOpenConns: grafanaDBCfg.MaxOpenConn,
|
||||||
|
MaxIdleConns: grafanaDBCfg.MaxIdleConn,
|
||||||
|
ConnMaxLifetime: time.Duration(grafanaDBCfg.ConnMaxLifetime) * time.Second,
|
||||||
|
ExportMetrics: sec.Key("instrument_queries").MustBool(false),
|
||||||
|
}
|
||||||
|
|
||||||
|
return grafanaDBCfg, zanzanaDBCfg, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user