mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Zanzana: Add grpc health and readiness checks for standalone zanzana (#99176)
Add grpc health and readiness checks for standalone zanzana
This commit is contained in:
parent
032f465ac6
commit
67252dfa46
@ -15,6 +15,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
healthv1pb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
@ -154,12 +155,12 @@ func (z *Zanzana) start(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to initilize zanana store: %w", err)
|
||||
}
|
||||
|
||||
openfga, err := zanzana.NewOpenFGAServer(z.cfg.ZanzanaServer, store, z.logger)
|
||||
openfgaServer, err := zanzana.NewOpenFGAServer(z.cfg.ZanzanaServer, store, z.logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start zanzana: %w", err)
|
||||
}
|
||||
|
||||
srv, err := zanzana.NewServer(z.cfg.ZanzanaServer, openfga, z.logger)
|
||||
zanzanaServer, err := zanzana.NewServer(z.cfg.ZanzanaServer, openfgaServer, z.logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start zanzana: %w", err)
|
||||
}
|
||||
@ -203,10 +204,14 @@ func (z *Zanzana) start(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to create zanzana grpc server: %w", err)
|
||||
}
|
||||
|
||||
s := z.handle.GetServer()
|
||||
openfgav1.RegisterOpenFGAServiceServer(s, openfga)
|
||||
authzv1.RegisterAuthzServiceServer(s, srv)
|
||||
authzextv1.RegisterAuthzExtentionServiceServer(s, srv)
|
||||
grpcServer := z.handle.GetServer()
|
||||
openfgav1.RegisterOpenFGAServiceServer(grpcServer, openfgaServer)
|
||||
authzv1.RegisterAuthzServiceServer(grpcServer, zanzanaServer)
|
||||
authzextv1.RegisterAuthzExtentionServiceServer(grpcServer, zanzanaServer)
|
||||
|
||||
// register grpc health server
|
||||
healthServer := zanzana.NewHealthServer(zanzanaServer)
|
||||
healthv1pb.RegisterHealthServer(grpcServer, healthServer)
|
||||
|
||||
if _, err := grpcserver.ProvideReflectionService(z.cfg, z.handle); err != nil {
|
||||
return fmt.Errorf("failed to register reflection for zanzana: %w", err)
|
||||
|
@ -3,7 +3,6 @@ package zanzana
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1"
|
||||
openfgaserver "github.com/openfga/openfga/pkg/server"
|
||||
openfgastorage "github.com/openfga/openfga/pkg/storage"
|
||||
|
||||
@ -13,10 +12,14 @@ import (
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func NewServer(cfg setting.ZanzanaServerSettings, openfga openfgav1.OpenFGAServiceServer, logger log.Logger) (*server.Server, error) {
|
||||
func NewServer(cfg setting.ZanzanaServerSettings, openfga server.OpenFGAServer, logger log.Logger) (*server.Server, error) {
|
||||
return server.NewServer(cfg, openfga, logger)
|
||||
}
|
||||
|
||||
func NewHealthServer(target server.DiagnosticServer) *server.HealthServer {
|
||||
return server.NewHealthServer(target)
|
||||
}
|
||||
|
||||
func NewOpenFGAServer(cfg setting.ZanzanaServerSettings, store openfgastorage.OpenFGADatastore, logger log.Logger) (*openfgaserver.Server, error) {
|
||||
return server.NewOpenFGAServer(cfg, store, logger)
|
||||
}
|
||||
|
77
pkg/services/authz/zanzana/server/health.go
Normal file
77
pkg/services/authz/zanzana/server/health.go
Normal file
@ -0,0 +1,77 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
|
||||
healthv1pb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
type DiagnosticServer interface {
|
||||
IsHealthy(ctx context.Context) (bool, error)
|
||||
}
|
||||
|
||||
func NewHealthServer(target DiagnosticServer) *HealthServer {
|
||||
return &HealthServer{target: target}
|
||||
}
|
||||
|
||||
type HealthServer struct {
|
||||
healthv1pb.UnimplementedHealthServer
|
||||
target DiagnosticServer
|
||||
}
|
||||
|
||||
var _ grpcauth.ServiceAuthFuncOverride = (*HealthServer)(nil)
|
||||
|
||||
func (s *HealthServer) AuthFuncOverride(ctx context.Context, fullMethodName string) (context.Context, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (s *HealthServer) Check(ctx context.Context, req *healthv1pb.HealthCheckRequest) (*healthv1pb.HealthCheckResponse, error) {
|
||||
healthy, err := s.target.IsHealthy(ctx)
|
||||
if err != nil || !healthy {
|
||||
return &healthv1pb.HealthCheckResponse{Status: healthv1pb.HealthCheckResponse_NOT_SERVING}, err
|
||||
}
|
||||
return &healthv1pb.HealthCheckResponse{Status: healthv1pb.HealthCheckResponse_SERVING}, nil
|
||||
}
|
||||
|
||||
func (s *HealthServer) Watch(req *healthv1pb.HealthCheckRequest, stream healthv1pb.Health_WatchServer) error {
|
||||
res, err := s.Check(stream.Context(), &healthv1pb.HealthCheckRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = stream.Send(res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prevStatus := res.GetStatus()
|
||||
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
res, err := s.Check(stream.Context(), &healthv1pb.HealthCheckRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if health status has not changed, continue
|
||||
if res.GetStatus() == prevStatus {
|
||||
continue
|
||||
}
|
||||
|
||||
prevStatus = res.GetStatus()
|
||||
err = stream.Send(res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case <-stream.Context().Done():
|
||||
return errors.New("stream closed, context cancelled")
|
||||
}
|
||||
}
|
||||
}
|
@ -26,11 +26,16 @@ var _ authzextv1.AuthzExtentionServiceServer = (*Server)(nil)
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/services/authz/zanzana/server")
|
||||
|
||||
type OpenFGAServer interface {
|
||||
openfgav1.OpenFGAServiceServer
|
||||
IsReady(ctx context.Context) (bool, error)
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
authzv1.UnimplementedAuthzServiceServer
|
||||
authzextv1.UnimplementedAuthzExtentionServiceServer
|
||||
|
||||
openfga openfgav1.OpenFGAServiceServer
|
||||
openfga OpenFGAServer
|
||||
openfgaClient openfgav1.OpenFGAServiceClient
|
||||
|
||||
cfg setting.ZanzanaServerSettings
|
||||
@ -45,7 +50,7 @@ type storeInfo struct {
|
||||
ModelID string
|
||||
}
|
||||
|
||||
func NewServer(cfg setting.ZanzanaServerSettings, openfga openfgav1.OpenFGAServiceServer, logger log.Logger) (*Server, error) {
|
||||
func NewServer(cfg setting.ZanzanaServerSettings, openfga OpenFGAServer, logger log.Logger) (*Server, error) {
|
||||
channel := &inprocgrpc.Channel{}
|
||||
openfgav1.RegisterOpenFGAServiceServer(channel, openfga)
|
||||
openFGAClient := openfgav1.NewOpenFGAServiceClient(channel)
|
||||
@ -63,6 +68,10 @@ func NewServer(cfg setting.ZanzanaServerSettings, openfga openfgav1.OpenFGAServi
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Server) IsHealthy(ctx context.Context) (bool, error) {
|
||||
return s.openfga.IsReady(ctx)
|
||||
}
|
||||
|
||||
func (s *Server) getContextuals(ctx context.Context, subject string) (*openfgav1.ContextualTupleKeys, error) {
|
||||
contextuals, err := s.getGlobalAuthorizationContext(ctx)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user