From 67252dfa46ed7b159b46902c18f9974d90b669e4 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Fri, 17 Jan 2025 13:39:42 +0100 Subject: [PATCH] Zanzana: Add grpc health and readiness checks for standalone zanzana (#99176) Add grpc health and readiness checks for standalone zanzana --- pkg/services/authz/zanzana.go | 17 +++-- pkg/services/authz/zanzana/server.go | 7 +- pkg/services/authz/zanzana/server/health.go | 77 +++++++++++++++++++++ pkg/services/authz/zanzana/server/server.go | 13 +++- 4 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 pkg/services/authz/zanzana/server/health.go diff --git a/pkg/services/authz/zanzana.go b/pkg/services/authz/zanzana.go index 0cab9bf52d4..04a1d7fdcbb 100644 --- a/pkg/services/authz/zanzana.go +++ b/pkg/services/authz/zanzana.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + healthv1pb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" "github.com/grafana/grafana/pkg/infra/db" @@ -154,12 +155,12 @@ func (z *Zanzana) start(ctx context.Context) error { return fmt.Errorf("failed to initilize zanana store: %w", err) } - openfga, err := zanzana.NewOpenFGAServer(z.cfg.ZanzanaServer, store, z.logger) + openfgaServer, err := zanzana.NewOpenFGAServer(z.cfg.ZanzanaServer, store, z.logger) if err != nil { return fmt.Errorf("failed to start zanzana: %w", err) } - srv, err := zanzana.NewServer(z.cfg.ZanzanaServer, openfga, z.logger) + zanzanaServer, err := zanzana.NewServer(z.cfg.ZanzanaServer, openfgaServer, z.logger) if err != nil { return fmt.Errorf("failed to start zanzana: %w", err) } @@ -203,10 +204,14 @@ func (z *Zanzana) start(ctx context.Context) error { return fmt.Errorf("failed to create zanzana grpc server: %w", err) } - s := z.handle.GetServer() - openfgav1.RegisterOpenFGAServiceServer(s, openfga) - authzv1.RegisterAuthzServiceServer(s, srv) - authzextv1.RegisterAuthzExtentionServiceServer(s, srv) + grpcServer := z.handle.GetServer() + openfgav1.RegisterOpenFGAServiceServer(grpcServer, openfgaServer) + authzv1.RegisterAuthzServiceServer(grpcServer, zanzanaServer) + authzextv1.RegisterAuthzExtentionServiceServer(grpcServer, zanzanaServer) + + // register grpc health server + healthServer := zanzana.NewHealthServer(zanzanaServer) + healthv1pb.RegisterHealthServer(grpcServer, healthServer) if _, err := grpcserver.ProvideReflectionService(z.cfg, z.handle); err != nil { return fmt.Errorf("failed to register reflection for zanzana: %w", err) diff --git a/pkg/services/authz/zanzana/server.go b/pkg/services/authz/zanzana/server.go index ff03e446aaf..b1b79cc179f 100644 --- a/pkg/services/authz/zanzana/server.go +++ b/pkg/services/authz/zanzana/server.go @@ -3,7 +3,6 @@ package zanzana import ( "net/http" - openfgav1 "github.com/openfga/api/proto/openfga/v1" openfgaserver "github.com/openfga/openfga/pkg/server" openfgastorage "github.com/openfga/openfga/pkg/storage" @@ -13,10 +12,14 @@ import ( "github.com/grafana/grafana/pkg/setting" ) -func NewServer(cfg setting.ZanzanaServerSettings, openfga openfgav1.OpenFGAServiceServer, logger log.Logger) (*server.Server, error) { +func NewServer(cfg setting.ZanzanaServerSettings, openfga server.OpenFGAServer, logger log.Logger) (*server.Server, error) { return server.NewServer(cfg, openfga, logger) } +func NewHealthServer(target server.DiagnosticServer) *server.HealthServer { + return server.NewHealthServer(target) +} + func NewOpenFGAServer(cfg setting.ZanzanaServerSettings, store openfgastorage.OpenFGADatastore, logger log.Logger) (*openfgaserver.Server, error) { return server.NewOpenFGAServer(cfg, store, logger) } diff --git a/pkg/services/authz/zanzana/server/health.go b/pkg/services/authz/zanzana/server/health.go new file mode 100644 index 00000000000..7acea309956 --- /dev/null +++ b/pkg/services/authz/zanzana/server/health.go @@ -0,0 +1,77 @@ +package server + +import ( + "context" + "errors" + "time" + + grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" + healthv1pb "google.golang.org/grpc/health/grpc_health_v1" +) + +type DiagnosticServer interface { + IsHealthy(ctx context.Context) (bool, error) +} + +func NewHealthServer(target DiagnosticServer) *HealthServer { + return &HealthServer{target: target} +} + +type HealthServer struct { + healthv1pb.UnimplementedHealthServer + target DiagnosticServer +} + +var _ grpcauth.ServiceAuthFuncOverride = (*HealthServer)(nil) + +func (s *HealthServer) AuthFuncOverride(ctx context.Context, fullMethodName string) (context.Context, error) { + return ctx, nil +} + +func (s *HealthServer) Check(ctx context.Context, req *healthv1pb.HealthCheckRequest) (*healthv1pb.HealthCheckResponse, error) { + healthy, err := s.target.IsHealthy(ctx) + if err != nil || !healthy { + return &healthv1pb.HealthCheckResponse{Status: healthv1pb.HealthCheckResponse_NOT_SERVING}, err + } + return &healthv1pb.HealthCheckResponse{Status: healthv1pb.HealthCheckResponse_SERVING}, nil +} + +func (s *HealthServer) Watch(req *healthv1pb.HealthCheckRequest, stream healthv1pb.Health_WatchServer) error { + res, err := s.Check(stream.Context(), &healthv1pb.HealthCheckRequest{}) + if err != nil { + return err + } + + err = stream.Send(res) + if err != nil { + return err + } + + prevStatus := res.GetStatus() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + res, err := s.Check(stream.Context(), &healthv1pb.HealthCheckRequest{}) + if err != nil { + return err + } + + // if health status has not changed, continue + if res.GetStatus() == prevStatus { + continue + } + + prevStatus = res.GetStatus() + err = stream.Send(res) + if err != nil { + return err + } + + case <-stream.Context().Done(): + return errors.New("stream closed, context cancelled") + } + } +} diff --git a/pkg/services/authz/zanzana/server/server.go b/pkg/services/authz/zanzana/server/server.go index 2bba0fb95ec..6826b2b936f 100644 --- a/pkg/services/authz/zanzana/server/server.go +++ b/pkg/services/authz/zanzana/server/server.go @@ -26,11 +26,16 @@ var _ authzextv1.AuthzExtentionServiceServer = (*Server)(nil) var tracer = otel.Tracer("github.com/grafana/grafana/pkg/services/authz/zanzana/server") +type OpenFGAServer interface { + openfgav1.OpenFGAServiceServer + IsReady(ctx context.Context) (bool, error) +} + type Server struct { authzv1.UnimplementedAuthzServiceServer authzextv1.UnimplementedAuthzExtentionServiceServer - openfga openfgav1.OpenFGAServiceServer + openfga OpenFGAServer openfgaClient openfgav1.OpenFGAServiceClient cfg setting.ZanzanaServerSettings @@ -45,7 +50,7 @@ type storeInfo struct { ModelID string } -func NewServer(cfg setting.ZanzanaServerSettings, openfga openfgav1.OpenFGAServiceServer, logger log.Logger) (*Server, error) { +func NewServer(cfg setting.ZanzanaServerSettings, openfga OpenFGAServer, logger log.Logger) (*Server, error) { channel := &inprocgrpc.Channel{} openfgav1.RegisterOpenFGAServiceServer(channel, openfga) openFGAClient := openfgav1.NewOpenFGAServiceClient(channel) @@ -63,6 +68,10 @@ func NewServer(cfg setting.ZanzanaServerSettings, openfga openfgav1.OpenFGAServi return s, nil } +func (s *Server) IsHealthy(ctx context.Context) (bool, error) { + return s.openfga.IsReady(ctx) +} + func (s *Server) getContextuals(ctx context.Context, subject string) (*openfgav1.ContextualTupleKeys, error) { contextuals, err := s.getGlobalAuthorizationContext(ctx) if err != nil {