AuthZ: Launch service within IAM app (#96421)

This commit is contained in:
Gabriel MABILLE 2024-11-20 11:13:33 +01:00 committed by GitHub
parent 4800b8a26b
commit aa2b4751a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 223 additions and 96 deletions

View File

@ -35,8 +35,8 @@ func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *tes
_, cfg := db.InitTestDBWithCfg(t) _, cfg := db.InitTestDBWithCfg(t)
cfg.HTTPPort = "3001" cfg.HTTPPort = "3001"
cfg.GRPCServerNetwork = "tcp" cfg.GRPCServer.Network = "tcp"
cfg.GRPCServerAddress = "localhost:10000" cfg.GRPCServer.Address = "localhost:10000"
addStorageServerToConfig(t, cfg, dbType) addStorageServerToConfig(t, cfg, dbType)
cfg.Target = []string{modules.StorageServer} cfg.Target = []string{modules.StorageServer}

View File

@ -0,0 +1,37 @@
package rbac
import (
authzv1 "github.com/grafana/authlib/authz/proto/v1"
authzextv1 "github.com/grafana/grafana/pkg/services/authz/proto/v1"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/authz/rbac/store"
"github.com/grafana/grafana/pkg/storage/legacysql"
)
type Service struct {
authzv1.UnimplementedAuthzServiceServer
authzextv1.UnimplementedAuthzExtentionServiceServer
store *store.Store
logger log.Logger
tracer tracing.Tracer
}
func NewService(sql legacysql.LegacyDatabaseProvider, logger log.Logger, tracer tracing.Tracer) *Service {
return &Service{
store: store.NewStore(sql),
logger: logger,
tracer: tracer,
}
}
// TODO: Implement Check
// func (s *Service) Check(ctx context.Context, req *authzv1.CheckRequest) (*authzv1.CheckResponse, error) {
// This needs to be done for the database provider
// ns := req.GetNamespace()
// ctx = request.WithNamespace(ctx, ns)
// return nil, nil
// }

View File

@ -0,0 +1,15 @@
package store
import "github.com/grafana/grafana/pkg/storage/legacysql"
// TODO (gamab): Implement GetRoles, GetTeams, GetFolders, GetPermissions
type Store struct {
sql legacysql.LegacyDatabaseProvider
}
func NewStore(sql legacysql.LegacyDatabaseProvider) *Store {
return &Store{
sql: sql,
}
}

View File

@ -16,11 +16,22 @@ import (
"github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/authn" "github.com/grafana/grafana/pkg/services/authn"
"github.com/grafana/grafana/pkg/services/authz/mappers" "github.com/grafana/grafana/pkg/services/authz/mappers"
authzextv1 "github.com/grafana/grafana/pkg/services/authz/proto/v1"
"github.com/grafana/grafana/pkg/services/authz/rbac"
"github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/grpcserver" "github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/storage/legacysql"
) )
func RegisterRBACAuthZService(handler grpcserver.Provider, db legacysql.LegacyDatabaseProvider, tracer tracing.Tracer) {
server := rbac.NewService(db, log.New("authz-grpc-server"), tracer)
srv := handler.GetServer()
authzv1.RegisterAuthzServiceServer(srv, server)
authzextv1.RegisterAuthzExtentionServiceServer(srv, server)
}
var _ authzv1.AuthzServiceServer = (*legacyServer)(nil) var _ authzv1.AuthzServiceServer = (*legacyServer)(nil)
var _ grpc_auth.ServiceAuthFuncOverride = (*legacyServer)(nil) var _ grpc_auth.ServiceAuthFuncOverride = (*legacyServer)(nil)
var _ authzlib.ServiceAuthorizeFuncOverride = (*legacyServer)(nil) var _ authzlib.ServiceAuthorizeFuncOverride = (*legacyServer)(nil)

View File

@ -4,11 +4,10 @@ import (
"context" "context"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func LoggingUnaryInterceptor(cfg *setting.Cfg, logger log.Logger) grpc.UnaryServerInterceptor { func LoggingUnaryInterceptor(logger log.Logger, enabled bool) grpc.UnaryServerInterceptor {
return func( return func(
ctx context.Context, ctx context.Context,
req any, req any,
@ -16,7 +15,7 @@ func LoggingUnaryInterceptor(cfg *setting.Cfg, logger log.Logger) grpc.UnaryServ
handler grpc.UnaryHandler, handler grpc.UnaryHandler,
) (resp any, err error) { ) (resp any, err error) {
resp, err = handler(ctx, req) resp, err = handler(ctx, req)
if cfg.GRPCServerEnableLogging { if enabled {
ctxLogger := logger.FromContext(ctx) ctxLogger := logger.FromContext(ctx)
if err != nil { if err != nil {
ctxLogger.Error("gRPC call", "method", info.FullMethod, "req", req, "err", err) ctxLogger.Error("gRPC call", "method", info.FullMethod, "req", req, "err", err)

View File

@ -34,7 +34,7 @@ type Provider interface {
} }
type gPRCServerService struct { type gPRCServerService struct {
cfg *setting.Cfg cfg setting.GRPCServerSettings
logger log.Logger logger log.Logger
server *grpc.Server server *grpc.Server
address string address string
@ -44,9 +44,9 @@ type gPRCServerService struct {
func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authenticator interceptors.Authenticator, tracer tracing.Tracer, registerer prometheus.Registerer) (Provider, error) { func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authenticator interceptors.Authenticator, tracer tracing.Tracer, registerer prometheus.Registerer) (Provider, error) {
s := &gPRCServerService{ s := &gPRCServerService{
cfg: cfg, cfg: cfg.GRPCServer,
logger: log.New("grpc-server"), logger: log.New("grpc-server"),
enabled: features.IsEnabledGlobally(featuremgmt.FlagGrpcServer), enabled: features.IsEnabledGlobally(featuremgmt.FlagGrpcServer), // TODO: replace with cfg.GRPCServer.Enabled when we remove feature toggle.
startedChan: make(chan struct{}), startedChan: make(chan struct{}),
} }
@ -75,7 +75,7 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe
grpc.StatsHandler(otelgrpc.NewServerHandler()), grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor( grpc.ChainUnaryInterceptor(
grpcAuth.UnaryServerInterceptor(authenticator.Authenticate), grpcAuth.UnaryServerInterceptor(authenticator.Authenticate),
interceptors.LoggingUnaryInterceptor(s.cfg, s.logger), // needs to be registered after tracing interceptor to get trace id interceptors.LoggingUnaryInterceptor(s.logger, s.cfg.EnableLogging), // needs to be registered after tracing interceptor to get trace id
middleware.UnaryServerInstrumentInterceptor(grpcRequestDuration), middleware.UnaryServerInstrumentInterceptor(grpcRequestDuration),
), ),
grpc.ChainStreamInterceptor( grpc.ChainStreamInterceptor(
@ -85,16 +85,16 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe
), ),
} }
if s.cfg.GRPCServerTLSConfig != nil { if s.cfg.TLSConfig != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(cfg.GRPCServerTLSConfig))) opts = append(opts, grpc.Creds(credentials.NewTLS(s.cfg.TLSConfig)))
} }
if s.cfg.GRPCServerMaxRecvMsgSize > 0 { if s.cfg.MaxRecvMsgSize > 0 {
opts = append(opts, grpc.MaxRecvMsgSize(s.cfg.GRPCServerMaxRecvMsgSize)) opts = append(opts, grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize))
} }
if s.cfg.GRPCServerMaxSendMsgSize > 0 { if s.cfg.MaxSendMsgSize > 0 {
opts = append(opts, grpc.MaxSendMsgSize(s.cfg.GRPCServerMaxSendMsgSize)) opts = append(opts, grpc.MaxSendMsgSize(s.cfg.MaxSendMsgSize))
} }
s.server = grpc.NewServer(opts...) s.server = grpc.NewServer(opts...)
@ -102,9 +102,9 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe
} }
func (s *gPRCServerService) Run(ctx context.Context) error { func (s *gPRCServerService) Run(ctx context.Context) error {
s.logger.Info("Running GRPC server", "address", s.cfg.GRPCServerAddress, "network", s.cfg.GRPCServerNetwork, "tls", s.cfg.GRPCServerTLSConfig != nil, "max_recv_msg_size", s.cfg.GRPCServerMaxRecvMsgSize, "max_send_msg_size", s.cfg.GRPCServerMaxSendMsgSize) s.logger.Info("Running GRPC server", "address", s.cfg.Address, "network", s.cfg.Network, "tls", s.cfg.TLSConfig != nil, "max_recv_msg_size", s.cfg.MaxRecvMsgSize, "max_send_msg_size", s.cfg.MaxSendMsgSize)
listener, err := net.Listen(s.cfg.GRPCServerNetwork, s.cfg.GRPCServerAddress) listener, err := net.Listen(s.cfg.Network, s.cfg.Address)
if err != nil { if err != nil {
return fmt.Errorf("GRPC server: failed to listen: %w", err) return fmt.Errorf("GRPC server: failed to listen: %w", err)
} }

View File

@ -5,11 +5,9 @@ package setting
import ( import (
"bytes" "bytes"
"crypto/tls"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/fs"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -480,12 +478,7 @@ type Cfg struct {
Zanzana ZanzanaSettings Zanzana ZanzanaSettings
// GRPC Server. // GRPC Server.
GRPCServerNetwork string GRPCServer GRPCServerSettings
GRPCServerAddress string
GRPCServerTLSConfig *tls.Config
GRPCServerEnableLogging bool // log request and response of each unary gRPC call
GRPCServerMaxRecvMsgSize int
GRPCServerMaxSendMsgSize int
CustomResponseHeaders map[string]string CustomResponseHeaders map[string]string
@ -1799,71 +1792,6 @@ func (cfg *Cfg) readAlertingSettings(iniFile *ini.File) error {
return nil return nil
} }
func readGRPCServerSettings(cfg *Cfg, iniFile *ini.File) error {
server := iniFile.Section("grpc_server")
errPrefix := "grpc_server:"
useTLS := server.Key("use_tls").MustBool(false)
certFile := server.Key("cert_file").String()
keyFile := server.Key("cert_key").String()
if useTLS {
serverCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return fmt.Errorf("%s error loading X509 key pair: %w", errPrefix, err)
}
cfg.GRPCServerTLSConfig = &tls.Config{
Certificates: []tls.Certificate{serverCert},
ClientAuth: tls.NoClientCert,
}
}
cfg.GRPCServerNetwork = valueAsString(server, "network", "tcp")
cfg.GRPCServerAddress = valueAsString(server, "address", "")
cfg.GRPCServerEnableLogging = server.Key("enable_logging").MustBool(false)
cfg.GRPCServerMaxRecvMsgSize = server.Key("max_recv_msg_size").MustInt(0)
cfg.GRPCServerMaxSendMsgSize = server.Key("max_send_msg_size").MustInt(0)
switch cfg.GRPCServerNetwork {
case "unix":
if cfg.GRPCServerAddress != "" {
// Explicitly provided path for unix domain socket.
if stat, err := os.Stat(cfg.GRPCServerAddress); os.IsNotExist(err) {
// File does not exist - nice, nothing to do.
} else if err != nil {
return fmt.Errorf("%s error getting stat for a file: %s", errPrefix, cfg.GRPCServerAddress)
} else {
if stat.Mode()&fs.ModeSocket == 0 {
return fmt.Errorf("%s file %s already exists and is not a unix domain socket", errPrefix, cfg.GRPCServerAddress)
}
// Unix domain socket file, should be safe to remove.
err := os.Remove(cfg.GRPCServerAddress)
if err != nil {
return fmt.Errorf("%s can't remove unix socket file: %s", errPrefix, cfg.GRPCServerAddress)
}
}
} else {
// Use temporary file path for a unix domain socket.
tf, err := os.CreateTemp("", "gf_grpc_server_api")
if err != nil {
return fmt.Errorf("%s error creating tmp file: %v", errPrefix, err)
}
unixPath := tf.Name()
if err := tf.Close(); err != nil {
return fmt.Errorf("%s error closing tmp file: %v", errPrefix, err)
}
if err := os.Remove(unixPath); err != nil {
return fmt.Errorf("%s error removing tmp file: %v", errPrefix, err)
}
cfg.GRPCServerAddress = unixPath
}
case "tcp":
if cfg.GRPCServerAddress == "" {
cfg.GRPCServerAddress = "127.0.0.1:10000"
}
default:
return fmt.Errorf("%s unsupported network %s", errPrefix, cfg.GRPCServerNetwork)
}
return nil
}
func readSnapshotsSettings(cfg *Cfg, iniFile *ini.File) error { func readSnapshotsSettings(cfg *Cfg, iniFile *ini.File) error {
snapshots := iniFile.Section("snapshots") snapshots := iniFile.Section("snapshots")

137
pkg/setting/setting_grpc.go Normal file
View File

@ -0,0 +1,137 @@
package setting
import (
"crypto/tls"
"fmt"
"io/fs"
"os"
"github.com/spf13/pflag"
"gopkg.in/ini.v1"
)
type GRPCServerSettings struct {
Enabled bool
Network string
Address string // with flags, call Process to fill this field defaults
TLSConfig *tls.Config // with flags, call Process to fill this field
EnableLogging bool // log request and response of each unary gRPC call
MaxRecvMsgSize int
MaxSendMsgSize int
// Internal fields
useTLS bool
certFile string
keyFile string
}
func gRPCServerSettingsError(msg string, args ...interface{}) error {
return fmt.Errorf("grpc_server: "+msg, args...)
}
func (c *GRPCServerSettings) Process() error {
err := c.processTLSConfig()
if err != nil {
return err
}
return c.processAddress()
}
func (c *GRPCServerSettings) processTLSConfig() error {
if !c.useTLS {
return nil
}
serverCert, err := tls.LoadX509KeyPair(c.certFile, c.keyFile)
if err != nil {
return gRPCServerSettingsError("error loading X509 key pair: %w", err)
}
c.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{serverCert},
ClientAuth: tls.NoClientCert,
}
return nil
}
func (c *GRPCServerSettings) processAddress() error {
switch c.Network {
case "unix":
if c.Address != "" {
// Explicitly provided path for unix domain socket.
if stat, err := os.Stat(c.Address); os.IsNotExist(err) {
// File does not exist - nice, nothing to do.
} else if err != nil {
return gRPCServerSettingsError("error getting stat for a file: %s", c.Address)
} else {
if stat.Mode()&fs.ModeSocket == 0 {
return gRPCServerSettingsError("file %s already exists and is not a unix domain socket", c.Address)
}
// Unix domain socket file, should be safe to remove.
err := os.Remove(c.Address)
if err != nil {
return gRPCServerSettingsError("can't remove unix socket file: %s", c.Address)
}
}
} else {
// Use temporary file path for a unix domain socket.
tf, err := os.CreateTemp("", "gf_grpc_server_api")
if err != nil {
return gRPCServerSettingsError("error creating tmp file: %v", err)
}
unixPath := tf.Name()
if err := tf.Close(); err != nil {
return gRPCServerSettingsError("error closing tmp file: %v", err)
}
if err := os.Remove(unixPath); err != nil {
return gRPCServerSettingsError("error removing tmp file: %v", err)
}
c.Address = unixPath
}
return nil
case "tcp":
if c.Address == "" {
c.Address = "127.0.0.1:10000"
}
return nil
default:
return gRPCServerSettingsError("unsupported network %s", c.Network)
}
}
func readGRPCServerSettings(cfg *Cfg, iniFile *ini.File) error {
server := iniFile.Section("grpc_server")
// This setting can be used to replace the `FlagGrpcServer` feature flag
cfg.GRPCServer.Enabled = server.Key("enabled").MustBool(false)
cfg.GRPCServer.useTLS = server.Key("use_tls").MustBool(false)
cfg.GRPCServer.certFile = server.Key("cert_file").String()
cfg.GRPCServer.keyFile = server.Key("cert_key").String()
if err := cfg.GRPCServer.processTLSConfig(); err != nil {
return err
}
cfg.GRPCServer.Network = valueAsString(server, "network", "tcp")
cfg.GRPCServer.Address = valueAsString(server, "address", "")
cfg.GRPCServer.EnableLogging = server.Key("enable_logging").MustBool(false)
cfg.GRPCServer.MaxRecvMsgSize = server.Key("max_recv_msg_size").MustInt(0)
cfg.GRPCServer.MaxSendMsgSize = server.Key("max_send_msg_size").MustInt(0)
return cfg.GRPCServer.processAddress()
}
func (c *GRPCServerSettings) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&c.Enabled, "grpc-server-enabled", false, "Enable gRPC server")
fs.StringVar(&c.Network, "grpc-server-network", "tcp", "Network type for the gRPC server (tcp, unix)")
fs.StringVar(&c.Address, "grpc-server-address", "", "Address for the gRPC server")
fs.BoolVar(&c.EnableLogging, "grpc-server-enable-logging", false, "Enable logging of gRPC requests and responses")
fs.IntVar(&c.MaxRecvMsgSize, "grpc-server-max-recv-msg-size", 0, "Maximum size of a gRPC request message in bytes")
fs.IntVar(&c.MaxSendMsgSize, "grpc-server-max-send-msg-size", 0, "Maximum size of a gRPC response message in bytes")
// Internal flags, we need to call ProcessTLSConfig
fs.BoolVar(&c.useTLS, "grpc-server-use-tls", false, "Enable TLS for the gRPC server")
fs.StringVar(&c.certFile, "grpc-server-cert-file", "", "Path to the certificate file for the gRPC server")
fs.StringVar(&c.keyFile, "grpc-server-key-file", "", "Path to the certificate key file for the gRPC server")
}

View File

@ -357,8 +357,8 @@ func TestClientServer(t *testing.T) {
dbstore := infraDB.InitTestDB(t) dbstore := infraDB.InitTestDB(t)
cfg := setting.NewCfg() cfg := setting.NewCfg()
cfg.GRPCServerAddress = "localhost:0" // get a free address cfg.GRPCServer.Address = "localhost:0" // get a free address
cfg.GRPCServerNetwork = "tcp" cfg.GRPCServer.Network = "tcp"
features := featuremgmt.WithFeatures() features := featuremgmt.WithFeatures()

View File

@ -68,10 +68,10 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
listener2, err := net.Listen("tcp", "127.0.0.1:0") listener2, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
cfg.GRPCServerNetwork = "tcp" cfg.GRPCServer.Network = "tcp"
cfg.GRPCServerAddress = listener2.Addr().String() cfg.GRPCServer.Address = listener2.Addr().String()
cfg.GRPCServerTLSConfig = nil cfg.GRPCServer.TLSConfig = nil
_, err = unistore.NewKey("address", cfg.GRPCServerAddress) _, err = unistore.NewKey("address", cfg.GRPCServer.Address)
require.NoError(t, err) require.NoError(t, err)
// release the one we just discovered -- it will be used by the services on startup // release the one we just discovered -- it will be used by the services on startup