diff --git a/pkg/server/module_server_test.go b/pkg/server/module_server_test.go index 7292f3cda8d..272fe56c53d 100644 --- a/pkg/server/module_server_test.go +++ b/pkg/server/module_server_test.go @@ -35,8 +35,8 @@ func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *tes _, cfg := db.InitTestDBWithCfg(t) cfg.HTTPPort = "3001" - cfg.GRPCServerNetwork = "tcp" - cfg.GRPCServerAddress = "localhost:10000" + cfg.GRPCServer.Network = "tcp" + cfg.GRPCServer.Address = "localhost:10000" addStorageServerToConfig(t, cfg, dbType) cfg.Target = []string{modules.StorageServer} diff --git a/pkg/services/authz/rbac/service.go b/pkg/services/authz/rbac/service.go new file mode 100644 index 00000000000..c73fe9965b4 --- /dev/null +++ b/pkg/services/authz/rbac/service.go @@ -0,0 +1,37 @@ +package rbac + +import ( + authzv1 "github.com/grafana/authlib/authz/proto/v1" + authzextv1 "github.com/grafana/grafana/pkg/services/authz/proto/v1" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/authz/rbac/store" + "github.com/grafana/grafana/pkg/storage/legacysql" +) + +type Service struct { + authzv1.UnimplementedAuthzServiceServer + authzextv1.UnimplementedAuthzExtentionServiceServer + + store *store.Store + logger log.Logger + tracer tracing.Tracer +} + +func NewService(sql legacysql.LegacyDatabaseProvider, logger log.Logger, tracer tracing.Tracer) *Service { + return &Service{ + store: store.NewStore(sql), + logger: logger, + tracer: tracer, + } +} + +// TODO: Implement Check +// func (s *Service) Check(ctx context.Context, req *authzv1.CheckRequest) (*authzv1.CheckResponse, error) { +// This needs to be done for the database provider +// ns := req.GetNamespace() +// ctx = request.WithNamespace(ctx, ns) + +// return nil, nil +// } diff --git a/pkg/services/authz/rbac/store/store.go b/pkg/services/authz/rbac/store/store.go new file mode 100644 index 00000000000..3ba0971f60a --- /dev/null +++ b/pkg/services/authz/rbac/store/store.go @@ -0,0 +1,15 @@ +package store + +import "github.com/grafana/grafana/pkg/storage/legacysql" + +// TODO (gamab): Implement GetRoles, GetTeams, GetFolders, GetPermissions + +type Store struct { + sql legacysql.LegacyDatabaseProvider +} + +func NewStore(sql legacysql.LegacyDatabaseProvider) *Store { + return &Store{ + sql: sql, + } +} diff --git a/pkg/services/authz/server.go b/pkg/services/authz/server.go index 3ac60ebc5e0..8c471b9f8d0 100644 --- a/pkg/services/authz/server.go +++ b/pkg/services/authz/server.go @@ -16,11 +16,22 @@ import ( "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/authn" "github.com/grafana/grafana/pkg/services/authz/mappers" + authzextv1 "github.com/grafana/grafana/pkg/services/authz/proto/v1" + "github.com/grafana/grafana/pkg/services/authz/rbac" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/grpcserver" + "github.com/grafana/grafana/pkg/storage/legacysql" ) +func RegisterRBACAuthZService(handler grpcserver.Provider, db legacysql.LegacyDatabaseProvider, tracer tracing.Tracer) { + server := rbac.NewService(db, log.New("authz-grpc-server"), tracer) + + srv := handler.GetServer() + authzv1.RegisterAuthzServiceServer(srv, server) + authzextv1.RegisterAuthzExtentionServiceServer(srv, server) +} + var _ authzv1.AuthzServiceServer = (*legacyServer)(nil) var _ grpc_auth.ServiceAuthFuncOverride = (*legacyServer)(nil) var _ authzlib.ServiceAuthorizeFuncOverride = (*legacyServer)(nil) diff --git a/pkg/services/grpcserver/interceptors/logging.go b/pkg/services/grpcserver/interceptors/logging.go index 96ceda3fdb2..2a3997a7024 100644 --- a/pkg/services/grpcserver/interceptors/logging.go +++ b/pkg/services/grpcserver/interceptors/logging.go @@ -4,11 +4,10 @@ import ( "context" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/setting" "google.golang.org/grpc" ) -func LoggingUnaryInterceptor(cfg *setting.Cfg, logger log.Logger) grpc.UnaryServerInterceptor { +func LoggingUnaryInterceptor(logger log.Logger, enabled bool) grpc.UnaryServerInterceptor { return func( ctx context.Context, req any, @@ -16,7 +15,7 @@ func LoggingUnaryInterceptor(cfg *setting.Cfg, logger log.Logger) grpc.UnaryServ handler grpc.UnaryHandler, ) (resp any, err error) { resp, err = handler(ctx, req) - if cfg.GRPCServerEnableLogging { + if enabled { ctxLogger := logger.FromContext(ctx) if err != nil { ctxLogger.Error("gRPC call", "method", info.FullMethod, "req", req, "err", err) diff --git a/pkg/services/grpcserver/service.go b/pkg/services/grpcserver/service.go index 543d3488ecb..2fa158f2153 100644 --- a/pkg/services/grpcserver/service.go +++ b/pkg/services/grpcserver/service.go @@ -34,7 +34,7 @@ type Provider interface { } type gPRCServerService struct { - cfg *setting.Cfg + cfg setting.GRPCServerSettings logger log.Logger server *grpc.Server address string @@ -44,9 +44,9 @@ type gPRCServerService struct { func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authenticator interceptors.Authenticator, tracer tracing.Tracer, registerer prometheus.Registerer) (Provider, error) { s := &gPRCServerService{ - cfg: cfg, + cfg: cfg.GRPCServer, logger: log.New("grpc-server"), - enabled: features.IsEnabledGlobally(featuremgmt.FlagGrpcServer), + enabled: features.IsEnabledGlobally(featuremgmt.FlagGrpcServer), // TODO: replace with cfg.GRPCServer.Enabled when we remove feature toggle. startedChan: make(chan struct{}), } @@ -75,7 +75,7 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe grpc.StatsHandler(otelgrpc.NewServerHandler()), grpc.ChainUnaryInterceptor( grpcAuth.UnaryServerInterceptor(authenticator.Authenticate), - interceptors.LoggingUnaryInterceptor(s.cfg, s.logger), // needs to be registered after tracing interceptor to get trace id + interceptors.LoggingUnaryInterceptor(s.logger, s.cfg.EnableLogging), // needs to be registered after tracing interceptor to get trace id middleware.UnaryServerInstrumentInterceptor(grpcRequestDuration), ), grpc.ChainStreamInterceptor( @@ -85,16 +85,16 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe ), } - if s.cfg.GRPCServerTLSConfig != nil { - opts = append(opts, grpc.Creds(credentials.NewTLS(cfg.GRPCServerTLSConfig))) + if s.cfg.TLSConfig != nil { + opts = append(opts, grpc.Creds(credentials.NewTLS(s.cfg.TLSConfig))) } - if s.cfg.GRPCServerMaxRecvMsgSize > 0 { - opts = append(opts, grpc.MaxRecvMsgSize(s.cfg.GRPCServerMaxRecvMsgSize)) + if s.cfg.MaxRecvMsgSize > 0 { + opts = append(opts, grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)) } - if s.cfg.GRPCServerMaxSendMsgSize > 0 { - opts = append(opts, grpc.MaxSendMsgSize(s.cfg.GRPCServerMaxSendMsgSize)) + if s.cfg.MaxSendMsgSize > 0 { + opts = append(opts, grpc.MaxSendMsgSize(s.cfg.MaxSendMsgSize)) } s.server = grpc.NewServer(opts...) @@ -102,9 +102,9 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe } func (s *gPRCServerService) Run(ctx context.Context) error { - s.logger.Info("Running GRPC server", "address", s.cfg.GRPCServerAddress, "network", s.cfg.GRPCServerNetwork, "tls", s.cfg.GRPCServerTLSConfig != nil, "max_recv_msg_size", s.cfg.GRPCServerMaxRecvMsgSize, "max_send_msg_size", s.cfg.GRPCServerMaxSendMsgSize) + s.logger.Info("Running GRPC server", "address", s.cfg.Address, "network", s.cfg.Network, "tls", s.cfg.TLSConfig != nil, "max_recv_msg_size", s.cfg.MaxRecvMsgSize, "max_send_msg_size", s.cfg.MaxSendMsgSize) - listener, err := net.Listen(s.cfg.GRPCServerNetwork, s.cfg.GRPCServerAddress) + listener, err := net.Listen(s.cfg.Network, s.cfg.Address) if err != nil { return fmt.Errorf("GRPC server: failed to listen: %w", err) } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 9715d99a427..1a4cd3f9b4d 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -5,11 +5,9 @@ package setting import ( "bytes" - "crypto/tls" "encoding/json" "errors" "fmt" - "io/fs" "net/http" "net/url" "os" @@ -480,12 +478,7 @@ type Cfg struct { Zanzana ZanzanaSettings // GRPC Server. - GRPCServerNetwork string - GRPCServerAddress string - GRPCServerTLSConfig *tls.Config - GRPCServerEnableLogging bool // log request and response of each unary gRPC call - GRPCServerMaxRecvMsgSize int - GRPCServerMaxSendMsgSize int + GRPCServer GRPCServerSettings CustomResponseHeaders map[string]string @@ -1799,71 +1792,6 @@ func (cfg *Cfg) readAlertingSettings(iniFile *ini.File) error { return nil } -func readGRPCServerSettings(cfg *Cfg, iniFile *ini.File) error { - server := iniFile.Section("grpc_server") - errPrefix := "grpc_server:" - useTLS := server.Key("use_tls").MustBool(false) - certFile := server.Key("cert_file").String() - keyFile := server.Key("cert_key").String() - if useTLS { - serverCert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return fmt.Errorf("%s error loading X509 key pair: %w", errPrefix, err) - } - cfg.GRPCServerTLSConfig = &tls.Config{ - Certificates: []tls.Certificate{serverCert}, - ClientAuth: tls.NoClientCert, - } - } - - cfg.GRPCServerNetwork = valueAsString(server, "network", "tcp") - cfg.GRPCServerAddress = valueAsString(server, "address", "") - cfg.GRPCServerEnableLogging = server.Key("enable_logging").MustBool(false) - cfg.GRPCServerMaxRecvMsgSize = server.Key("max_recv_msg_size").MustInt(0) - cfg.GRPCServerMaxSendMsgSize = server.Key("max_send_msg_size").MustInt(0) - switch cfg.GRPCServerNetwork { - case "unix": - if cfg.GRPCServerAddress != "" { - // Explicitly provided path for unix domain socket. - if stat, err := os.Stat(cfg.GRPCServerAddress); os.IsNotExist(err) { - // File does not exist - nice, nothing to do. - } else if err != nil { - return fmt.Errorf("%s error getting stat for a file: %s", errPrefix, cfg.GRPCServerAddress) - } else { - if stat.Mode()&fs.ModeSocket == 0 { - return fmt.Errorf("%s file %s already exists and is not a unix domain socket", errPrefix, cfg.GRPCServerAddress) - } - // Unix domain socket file, should be safe to remove. - err := os.Remove(cfg.GRPCServerAddress) - if err != nil { - return fmt.Errorf("%s can't remove unix socket file: %s", errPrefix, cfg.GRPCServerAddress) - } - } - } else { - // Use temporary file path for a unix domain socket. - tf, err := os.CreateTemp("", "gf_grpc_server_api") - if err != nil { - return fmt.Errorf("%s error creating tmp file: %v", errPrefix, err) - } - unixPath := tf.Name() - if err := tf.Close(); err != nil { - return fmt.Errorf("%s error closing tmp file: %v", errPrefix, err) - } - if err := os.Remove(unixPath); err != nil { - return fmt.Errorf("%s error removing tmp file: %v", errPrefix, err) - } - cfg.GRPCServerAddress = unixPath - } - case "tcp": - if cfg.GRPCServerAddress == "" { - cfg.GRPCServerAddress = "127.0.0.1:10000" - } - default: - return fmt.Errorf("%s unsupported network %s", errPrefix, cfg.GRPCServerNetwork) - } - return nil -} - func readSnapshotsSettings(cfg *Cfg, iniFile *ini.File) error { snapshots := iniFile.Section("snapshots") diff --git a/pkg/setting/setting_grpc.go b/pkg/setting/setting_grpc.go new file mode 100644 index 00000000000..a28c262b93c --- /dev/null +++ b/pkg/setting/setting_grpc.go @@ -0,0 +1,137 @@ +package setting + +import ( + "crypto/tls" + "fmt" + "io/fs" + "os" + + "github.com/spf13/pflag" + + "gopkg.in/ini.v1" +) + +type GRPCServerSettings struct { + Enabled bool + Network string + Address string // with flags, call Process to fill this field defaults + TLSConfig *tls.Config // with flags, call Process to fill this field + EnableLogging bool // log request and response of each unary gRPC call + MaxRecvMsgSize int + MaxSendMsgSize int + + // Internal fields + useTLS bool + certFile string + keyFile string +} + +func gRPCServerSettingsError(msg string, args ...interface{}) error { + return fmt.Errorf("grpc_server: "+msg, args...) +} + +func (c *GRPCServerSettings) Process() error { + err := c.processTLSConfig() + if err != nil { + return err + } + + return c.processAddress() +} + +func (c *GRPCServerSettings) processTLSConfig() error { + if !c.useTLS { + return nil + } + serverCert, err := tls.LoadX509KeyPair(c.certFile, c.keyFile) + if err != nil { + return gRPCServerSettingsError("error loading X509 key pair: %w", err) + } + c.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{serverCert}, + ClientAuth: tls.NoClientCert, + } + return nil +} + +func (c *GRPCServerSettings) processAddress() error { + switch c.Network { + case "unix": + if c.Address != "" { + // Explicitly provided path for unix domain socket. + if stat, err := os.Stat(c.Address); os.IsNotExist(err) { + // File does not exist - nice, nothing to do. + } else if err != nil { + return gRPCServerSettingsError("error getting stat for a file: %s", c.Address) + } else { + if stat.Mode()&fs.ModeSocket == 0 { + return gRPCServerSettingsError("file %s already exists and is not a unix domain socket", c.Address) + } + // Unix domain socket file, should be safe to remove. + err := os.Remove(c.Address) + if err != nil { + return gRPCServerSettingsError("can't remove unix socket file: %s", c.Address) + } + } + } else { + // Use temporary file path for a unix domain socket. + tf, err := os.CreateTemp("", "gf_grpc_server_api") + if err != nil { + return gRPCServerSettingsError("error creating tmp file: %v", err) + } + unixPath := tf.Name() + if err := tf.Close(); err != nil { + return gRPCServerSettingsError("error closing tmp file: %v", err) + } + if err := os.Remove(unixPath); err != nil { + return gRPCServerSettingsError("error removing tmp file: %v", err) + } + c.Address = unixPath + } + return nil + case "tcp": + if c.Address == "" { + c.Address = "127.0.0.1:10000" + } + return nil + default: + return gRPCServerSettingsError("unsupported network %s", c.Network) + } +} + +func readGRPCServerSettings(cfg *Cfg, iniFile *ini.File) error { + server := iniFile.Section("grpc_server") + + // This setting can be used to replace the `FlagGrpcServer` feature flag + cfg.GRPCServer.Enabled = server.Key("enabled").MustBool(false) + + cfg.GRPCServer.useTLS = server.Key("use_tls").MustBool(false) + cfg.GRPCServer.certFile = server.Key("cert_file").String() + cfg.GRPCServer.keyFile = server.Key("cert_key").String() + + if err := cfg.GRPCServer.processTLSConfig(); err != nil { + return err + } + + cfg.GRPCServer.Network = valueAsString(server, "network", "tcp") + cfg.GRPCServer.Address = valueAsString(server, "address", "") + cfg.GRPCServer.EnableLogging = server.Key("enable_logging").MustBool(false) + cfg.GRPCServer.MaxRecvMsgSize = server.Key("max_recv_msg_size").MustInt(0) + cfg.GRPCServer.MaxSendMsgSize = server.Key("max_send_msg_size").MustInt(0) + + return cfg.GRPCServer.processAddress() +} + +func (c *GRPCServerSettings) AddFlags(fs *pflag.FlagSet) { + fs.BoolVar(&c.Enabled, "grpc-server-enabled", false, "Enable gRPC server") + fs.StringVar(&c.Network, "grpc-server-network", "tcp", "Network type for the gRPC server (tcp, unix)") + fs.StringVar(&c.Address, "grpc-server-address", "", "Address for the gRPC server") + fs.BoolVar(&c.EnableLogging, "grpc-server-enable-logging", false, "Enable logging of gRPC requests and responses") + fs.IntVar(&c.MaxRecvMsgSize, "grpc-server-max-recv-msg-size", 0, "Maximum size of a gRPC request message in bytes") + fs.IntVar(&c.MaxSendMsgSize, "grpc-server-max-send-msg-size", 0, "Maximum size of a gRPC response message in bytes") + + // Internal flags, we need to call ProcessTLSConfig + fs.BoolVar(&c.useTLS, "grpc-server-use-tls", false, "Enable TLS for the gRPC server") + fs.StringVar(&c.certFile, "grpc-server-cert-file", "", "Path to the certificate file for the gRPC server") + fs.StringVar(&c.keyFile, "grpc-server-key-file", "", "Path to the certificate key file for the gRPC server") +} diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index d36a9a93c7f..6381e8609e8 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -357,8 +357,8 @@ func TestClientServer(t *testing.T) { dbstore := infraDB.InitTestDB(t) cfg := setting.NewCfg() - cfg.GRPCServerAddress = "localhost:0" // get a free address - cfg.GRPCServerNetwork = "tcp" + cfg.GRPCServer.Address = "localhost:0" // get a free address + cfg.GRPCServer.Network = "tcp" features := featuremgmt.WithFeatures() diff --git a/pkg/tests/testinfra/testinfra.go b/pkg/tests/testinfra/testinfra.go index 63272dc4f5d..0b675139d31 100644 --- a/pkg/tests/testinfra/testinfra.go +++ b/pkg/tests/testinfra/testinfra.go @@ -68,10 +68,10 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes listener2, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - cfg.GRPCServerNetwork = "tcp" - cfg.GRPCServerAddress = listener2.Addr().String() - cfg.GRPCServerTLSConfig = nil - _, err = unistore.NewKey("address", cfg.GRPCServerAddress) + cfg.GRPCServer.Network = "tcp" + cfg.GRPCServer.Address = listener2.Addr().String() + cfg.GRPCServer.TLSConfig = nil + _, err = unistore.NewKey("address", cfg.GRPCServer.Address) require.NoError(t, err) // release the one we just discovered -- it will be used by the services on startup