mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Zanzana: Run OpenFGA HTTP server in standalone mode (#89914)
* Zanzana: Listen http to handle fga cli requests. * make configurable * start http server during service run * wait for GRPC server is ready * remove unnecessary logs * fix linter errors * run only in devenv * make address configurable
This commit is contained in:
parent
073ef93007
commit
f1968bbcbb
@ -136,6 +136,16 @@ func (z *Zanzana) start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (z *Zanzana) running(ctx context.Context) error {
|
||||
if z.cfg.Env == setting.Dev && z.cfg.Zanzana.ListenHTTP {
|
||||
go func() {
|
||||
z.logger.Info("Starting OpenFGA HTTP server")
|
||||
err := zanzana.StartOpenFGAHttpSever(z.cfg, z.handle, z.logger)
|
||||
if err != nil {
|
||||
z.logger.Error("failed to start OpenFGA HTTP server", "error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Run is blocking so we can just run it here
|
||||
return z.handle.Run(ctx)
|
||||
}
|
||||
|
@ -1,10 +1,27 @@
|
||||
package zanzana
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
httpmiddleware "github.com/openfga/openfga/pkg/middleware/http"
|
||||
"github.com/openfga/openfga/pkg/server"
|
||||
serverErrors "github.com/openfga/openfga/pkg/server/errors"
|
||||
"github.com/openfga/openfga/pkg/storage"
|
||||
"github.com/rs/cors"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
healthv1pb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func NewServer(store storage.OpenFGADatastore, logger log.Logger) (*server.Server, error) {
|
||||
@ -24,3 +41,70 @@ func NewServer(store storage.OpenFGADatastore, logger log.Logger) (*server.Serve
|
||||
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
// StartOpenFGAHttpSever starts HTTP server which allows to use fga cli.
|
||||
func StartOpenFGAHttpSever(cfg *setting.Cfg, srv grpcserver.Provider, logger log.Logger) error {
|
||||
dialOpts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
}
|
||||
|
||||
addr := srv.GetAddress()
|
||||
// Wait until GRPC server is initialized
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
maxRetries := 100
|
||||
retries := 0
|
||||
for addr == "" && retries < maxRetries {
|
||||
<-ticker.C
|
||||
addr = srv.GetAddress()
|
||||
retries++
|
||||
}
|
||||
if addr == "" {
|
||||
return fmt.Errorf("failed to start HTTP server: GRPC server unavailable")
|
||||
}
|
||||
|
||||
conn, err := grpc.NewClient(addr, dialOpts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to dial GRPC: %w", err)
|
||||
}
|
||||
|
||||
muxOpts := []runtime.ServeMuxOption{
|
||||
runtime.WithForwardResponseOption(httpmiddleware.HTTPResponseModifier),
|
||||
runtime.WithErrorHandler(func(c context.Context,
|
||||
sr *runtime.ServeMux, mm runtime.Marshaler, w http.ResponseWriter, r *http.Request, e error) {
|
||||
intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e))
|
||||
httpmiddleware.CustomHTTPErrorHandler(c, w, r, serverErrors.NewEncodedError(intCode, e.Error()))
|
||||
}),
|
||||
runtime.WithStreamErrorHandler(func(ctx context.Context, e error) *status.Status {
|
||||
intCode := serverErrors.ConvertToEncodedErrorCode(status.Convert(e))
|
||||
encodedErr := serverErrors.NewEncodedError(intCode, e.Error())
|
||||
return status.Convert(encodedErr)
|
||||
}),
|
||||
runtime.WithHealthzEndpoint(healthv1pb.NewHealthClient(conn)),
|
||||
runtime.WithOutgoingHeaderMatcher(func(s string) (string, bool) { return s, true }),
|
||||
}
|
||||
mux := runtime.NewServeMux(muxOpts...)
|
||||
if err := openfgav1.RegisterOpenFGAServiceHandler(context.TODO(), mux, conn); err != nil {
|
||||
return fmt.Errorf("failed to register gateway handler: %w", err)
|
||||
}
|
||||
|
||||
httpServer := &http.Server{
|
||||
Addr: cfg.Zanzana.HttpAddr,
|
||||
Handler: cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
AllowCredentials: true,
|
||||
AllowedHeaders: []string{"*"},
|
||||
AllowedMethods: []string{http.MethodGet, http.MethodPost,
|
||||
http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut},
|
||||
}).Handler(mux),
|
||||
ReadHeaderTimeout: 30 * time.Second,
|
||||
}
|
||||
go func() {
|
||||
err = httpServer.ListenAndServe()
|
||||
if err != nil {
|
||||
logger.Error("failed to start http server", zapcore.Field{Key: "err", Type: zapcore.ErrorType, Interface: err})
|
||||
}
|
||||
}()
|
||||
logger.Info(fmt.Sprintf("OpenFGA HTTP server listening on '%s'...", httpServer.Addr))
|
||||
return nil
|
||||
}
|
||||
|
@ -16,6 +16,10 @@ type ZanzanaSettings struct {
|
||||
Addr string
|
||||
// Mode can either be embedded or client
|
||||
Mode ZanzanaMode
|
||||
// ListenHTTP enables OpenFGA http server which allows to use fga cli
|
||||
ListenHTTP bool
|
||||
// OpenFGA http server address which allows to connect with fga cli
|
||||
HttpAddr string
|
||||
}
|
||||
|
||||
func (cfg *Cfg) readZanzanaSettings() {
|
||||
@ -32,6 +36,8 @@ func (cfg *Cfg) readZanzanaSettings() {
|
||||
}
|
||||
|
||||
s.Addr = sec.Key("address").MustString("")
|
||||
s.ListenHTTP = sec.Key("listen_http").MustBool(false)
|
||||
s.HttpAddr = sec.Key("http_addr").MustString("127.0.0.1:8080")
|
||||
|
||||
cfg.Zanzana = s
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user