mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
GRPC Server: Add gRPC server service (#47849)
Co-authored-by: Todd Treece <todd.treece@grafana.com> Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
129
pkg/services/grpcserver/auth.go
Normal file
129
pkg/services/grpcserver/auth.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package grpcserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
|
||||
apikeygenprefix "github.com/grafana/grafana/pkg/components/apikeygenprefixed"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/apikey"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// authenticator can authenticate GRPC requests.
|
||||
type authenticator struct {
|
||||
logger log.Logger
|
||||
APIKey apikey.Service
|
||||
UserService user.Service
|
||||
}
|
||||
|
||||
func newAuthenticator(apiKey apikey.Service, userService user.Service) *authenticator {
|
||||
return &authenticator{
|
||||
logger: log.New("grpc-server-authenticator"),
|
||||
APIKey: apiKey,
|
||||
UserService: userService,
|
||||
}
|
||||
}
|
||||
|
||||
// Authenticate checks that a token exists and is valid, and then removes the token from the
|
||||
// authorization header in the context.
|
||||
func (a *authenticator) authenticate(ctx context.Context) (context.Context, error) {
|
||||
return a.tokenAuth(ctx)
|
||||
}
|
||||
|
||||
const tokenPrefix = "Bearer "
|
||||
|
||||
func (a *authenticator) tokenAuth(ctx context.Context) (context.Context, error) {
|
||||
auth, err := extractAuthorization(ctx)
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(auth, tokenPrefix) {
|
||||
return ctx, status.Error(codes.Unauthenticated, `missing "Bearer " prefix in "authorization" value`)
|
||||
}
|
||||
|
||||
token := strings.TrimPrefix(auth, tokenPrefix)
|
||||
if token == "" {
|
||||
return ctx, status.Error(codes.Unauthenticated, "token required")
|
||||
}
|
||||
|
||||
newCtx := purgeHeader(ctx, "authorization")
|
||||
|
||||
err = a.validateToken(ctx, token)
|
||||
if err != nil {
|
||||
logger.Warn("request with invalid token", "error", err, "token", token)
|
||||
return ctx, status.Error(codes.Unauthenticated, "invalid token")
|
||||
}
|
||||
return newCtx, nil
|
||||
}
|
||||
|
||||
func (a *authenticator) validateToken(ctx context.Context, keyString string) error {
|
||||
decoded, err := apikeygenprefix.Decode(keyString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := decoded.Hash()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
apikey, err := a.APIKey.GetAPIKeyByHash(ctx, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if apikey == nil || apikey.ServiceAccountId == nil {
|
||||
return status.Error(codes.Unauthenticated, "api key does not have a service account")
|
||||
}
|
||||
|
||||
querySignedInUser := user.GetSignedInUserQuery{UserID: *apikey.ServiceAccountId, OrgID: apikey.OrgId}
|
||||
signedInUser, err := a.UserService.GetSignedInUserWithCacheCtx(ctx, &querySignedInUser)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !signedInUser.HasRole(org.RoleAdmin) {
|
||||
return fmt.Errorf("api key does not have admin role")
|
||||
}
|
||||
|
||||
// disabled service accounts are not allowed to access the API
|
||||
if signedInUser.IsDisabled {
|
||||
return fmt.Errorf("service account is disabled")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractAuthorization(ctx context.Context) (string, error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return "", status.Error(codes.Unauthenticated, "no headers in request")
|
||||
}
|
||||
|
||||
authHeaders, ok := md["authorization"]
|
||||
if !ok {
|
||||
return "", status.Error(codes.Unauthenticated, `no "authorization" header in request`)
|
||||
}
|
||||
|
||||
if len(authHeaders) != 1 {
|
||||
return "", status.Error(codes.Unauthenticated, `malformed "authorization" header: one value required`)
|
||||
}
|
||||
|
||||
return authHeaders[0], nil
|
||||
}
|
||||
|
||||
func purgeHeader(ctx context.Context, header string) context.Context {
|
||||
md, _ := metadata.FromIncomingContext(ctx)
|
||||
mdCopy := md.Copy()
|
||||
mdCopy[header] = nil
|
||||
return metadata.NewIncomingContext(ctx, mdCopy)
|
||||
}
|
||||
108
pkg/services/grpcserver/auth_test.go
Normal file
108
pkg/services/grpcserver/auth_test.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package grpcserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
apikeygenprefix "github.com/grafana/grafana/pkg/components/apikeygenprefixed"
|
||||
"github.com/grafana/grafana/pkg/services/apikey"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestAuthenticator_Authenticate(t *testing.T) {
|
||||
serviceAccountId := int64(1)
|
||||
t.Run("accepts service api key with admin role", func(t *testing.T) {
|
||||
s := newFakeAPIKey(&apikey.APIKey{
|
||||
Id: 1,
|
||||
OrgId: 1,
|
||||
Key: "admin-api-key",
|
||||
Name: "Admin API Key",
|
||||
ServiceAccountId: &serviceAccountId,
|
||||
}, nil)
|
||||
a := newAuthenticator(s, &fakeUserService{OrgRole: org.RoleAdmin})
|
||||
ctx, err := setupContext()
|
||||
require.NoError(t, err)
|
||||
_, err = a.authenticate(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("rejects non-admin role", func(t *testing.T) {
|
||||
s := newFakeAPIKey(&apikey.APIKey{
|
||||
Id: 1,
|
||||
OrgId: 1,
|
||||
Key: "admin-api-key",
|
||||
Name: "Admin API Key",
|
||||
ServiceAccountId: &serviceAccountId,
|
||||
}, nil)
|
||||
a := newAuthenticator(s, &fakeUserService{OrgRole: org.RoleEditor})
|
||||
ctx, err := setupContext()
|
||||
require.NoError(t, err)
|
||||
_, err = a.authenticate(ctx)
|
||||
require.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("removes auth header from context", func(t *testing.T) {
|
||||
s := newFakeAPIKey(&apikey.APIKey{
|
||||
Id: 1,
|
||||
OrgId: 1,
|
||||
Key: "admin-api-key",
|
||||
Name: "Admin API Key",
|
||||
ServiceAccountId: &serviceAccountId,
|
||||
}, nil)
|
||||
a := newAuthenticator(s, &fakeUserService{OrgRole: org.RoleAdmin})
|
||||
ctx, err := setupContext()
|
||||
require.NoError(t, err)
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
require.True(t, ok)
|
||||
require.NotEmpty(t, md["authorization"])
|
||||
ctx, err = a.authenticate(ctx)
|
||||
require.NoError(t, err)
|
||||
md, ok = metadata.FromIncomingContext(ctx)
|
||||
require.True(t, ok)
|
||||
require.Empty(t, md["authorization"])
|
||||
})
|
||||
}
|
||||
|
||||
type fakeAPIKey struct {
|
||||
apikey.Service
|
||||
key *apikey.APIKey
|
||||
err error
|
||||
}
|
||||
|
||||
func newFakeAPIKey(key *apikey.APIKey, err error) *fakeAPIKey {
|
||||
return &fakeAPIKey{
|
||||
key: key,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeAPIKey) GetAPIKeyByHash(ctx context.Context, hash string) (*apikey.APIKey, error) {
|
||||
return f.key, f.err
|
||||
}
|
||||
|
||||
type fakeUserService struct {
|
||||
user.Service
|
||||
OrgRole org.RoleType
|
||||
}
|
||||
|
||||
func (f *fakeUserService) GetSignedInUserWithCacheCtx(ctx context.Context, query *user.GetSignedInUserQuery) (*user.SignedInUser, error) {
|
||||
return &user.SignedInUser{
|
||||
UserID: 1,
|
||||
OrgID: 1,
|
||||
OrgRole: f.OrgRole,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setupContext() (context.Context, error) {
|
||||
ctx := context.Background()
|
||||
key, err := apikeygenprefix.New("sa")
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
md := metadata.New(map[string]string{})
|
||||
md["authorization"] = []string{"Bearer " + key.ClientSecret}
|
||||
return metadata.NewIncomingContext(ctx, md), nil
|
||||
}
|
||||
37
pkg/services/grpcserver/health.go
Normal file
37
pkg/services/grpcserver/health.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package grpcserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
|
||||
"google.golang.org/grpc/health"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
// HealthService implements GRPC Health Checking Protocol:
|
||||
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
|
||||
// It also demonstrates how to override authentication for a service – in this
|
||||
// case we are disabling any auth in AuthFuncOverride.
|
||||
type HealthService struct {
|
||||
cfg *setting.Cfg
|
||||
healthServer *healthServer
|
||||
}
|
||||
|
||||
type healthServer struct {
|
||||
*health.Server
|
||||
}
|
||||
|
||||
// AuthFuncOverride for no auth for health service.
|
||||
func (s *healthServer) AuthFuncOverride(ctx context.Context, _ string) (context.Context, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func ProvideHealthService(cfg *setting.Cfg, grpcServerProvider Provider) (*HealthService, error) {
|
||||
hs := &healthServer{health.NewServer()}
|
||||
grpc_health_v1.RegisterHealthServer(grpcServerProvider.GetServer(), hs)
|
||||
return &HealthService{
|
||||
cfg: cfg,
|
||||
healthServer: hs,
|
||||
}, nil
|
||||
}
|
||||
98
pkg/services/grpcserver/service.go
Normal file
98
pkg/services/grpcserver/service.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package grpcserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/apikey"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/reflection"
|
||||
)
|
||||
|
||||
type Provider interface {
|
||||
registry.BackgroundService
|
||||
GetServer() *grpc.Server
|
||||
}
|
||||
|
||||
type GPRCServerService struct {
|
||||
cfg *setting.Cfg
|
||||
logger log.Logger
|
||||
server *grpc.Server
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, apiKey apikey.Service, userService user.Service) (Provider, error) {
|
||||
s := &GPRCServerService{
|
||||
cfg: cfg,
|
||||
logger: log.New("grpc-server"),
|
||||
}
|
||||
|
||||
var opts []grpc.ServerOption
|
||||
|
||||
// Default auth is admin token check, but this can be overridden by
|
||||
// services which implement ServiceAuthFuncOverride interface.
|
||||
// See https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/auth/auth.go#L30.
|
||||
authenticator := newAuthenticator(apiKey, userService)
|
||||
opts = append(opts, []grpc.ServerOption{
|
||||
grpc.StreamInterceptor(grpcAuth.StreamServerInterceptor(authenticator.authenticate)),
|
||||
grpc.UnaryInterceptor(grpcAuth.UnaryServerInterceptor(authenticator.authenticate)),
|
||||
}...)
|
||||
|
||||
if s.cfg.GRPCServerTLSConfig != nil {
|
||||
opts = append(opts, grpc.Creds(credentials.NewTLS(cfg.GRPCServerTLSConfig)))
|
||||
}
|
||||
|
||||
grpcServer := grpc.NewServer(opts...)
|
||||
reflection.Register(grpcServer)
|
||||
s.server = grpcServer
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *GPRCServerService) Run(ctx context.Context) error {
|
||||
s.logger.Info("Running GRPC server", "address", s.cfg.GRPCServerAddress, "network", s.cfg.GRPCServerNetwork, "tls", s.cfg.GRPCServerTLSConfig != nil)
|
||||
|
||||
listener, err := net.Listen(s.cfg.GRPCServerNetwork, s.cfg.GRPCServerAddress)
|
||||
if err != nil {
|
||||
return fmt.Errorf("GRPC server: failed to listen: %w", err)
|
||||
}
|
||||
|
||||
serveErr := make(chan error, 1)
|
||||
go func() {
|
||||
s.logger.Info("GRPC server: starting")
|
||||
err := s.server.Serve(listener)
|
||||
if err != nil {
|
||||
backend.Logger.Error("GRPC server: failed to serve", "err", err)
|
||||
serveErr <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-serveErr:
|
||||
backend.Logger.Error("GRPC server: failed to serve", "err", err)
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
}
|
||||
s.logger.Warn("GRPC server: shutting down")
|
||||
s.server.Stop()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (s *GPRCServerService) IsDisabled() bool {
|
||||
if s.cfg == nil {
|
||||
return true
|
||||
}
|
||||
return !s.cfg.IsFeatureToggleEnabled(featuremgmt.FlagGrpcServer)
|
||||
}
|
||||
|
||||
func (s *GPRCServerService) GetServer() *grpc.Server {
|
||||
return s.server
|
||||
}
|
||||
Reference in New Issue
Block a user