mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
GRPC Server: Add tracing interceptors (#56045)
Co-authored-by: Artur Wierzbicki <artur.wierzbicki@grafana.com>
This commit is contained in:
parent
a863a4d95d
commit
2d433194d0
@ -1,4 +1,4 @@
|
||||
package grpcserver
|
||||
package interceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -17,15 +17,15 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// authenticator can authenticate GRPC requests.
|
||||
type authenticator struct {
|
||||
// Authenticator can authenticate GRPC requests.
|
||||
type Authenticator struct {
|
||||
logger log.Logger
|
||||
APIKey apikey.Service
|
||||
UserService user.Service
|
||||
}
|
||||
|
||||
func newAuthenticator(apiKey apikey.Service, userService user.Service) *authenticator {
|
||||
return &authenticator{
|
||||
func NewAuthenticator(apiKey apikey.Service, userService user.Service) *Authenticator {
|
||||
return &Authenticator{
|
||||
logger: log.New("grpc-server-authenticator"),
|
||||
APIKey: apiKey,
|
||||
UserService: userService,
|
||||
@ -34,13 +34,13 @@ func newAuthenticator(apiKey apikey.Service, userService user.Service) *authenti
|
||||
|
||||
// Authenticate checks that a token exists and is valid, and then removes the token from the
|
||||
// authorization header in the context.
|
||||
func (a *authenticator) authenticate(ctx context.Context) (context.Context, error) {
|
||||
func (a *Authenticator) Authenticate(ctx context.Context) (context.Context, error) {
|
||||
return a.tokenAuth(ctx)
|
||||
}
|
||||
|
||||
const tokenPrefix = "Bearer "
|
||||
|
||||
func (a *authenticator) tokenAuth(ctx context.Context) (context.Context, error) {
|
||||
func (a *Authenticator) tokenAuth(ctx context.Context) (context.Context, error) {
|
||||
auth, err := extractAuthorization(ctx)
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
@ -57,50 +57,51 @@ func (a *authenticator) tokenAuth(ctx context.Context) (context.Context, error)
|
||||
|
||||
newCtx := purgeHeader(ctx, "authorization")
|
||||
|
||||
err = a.validateToken(ctx, token)
|
||||
_, err = a.getSignedInUser(ctx, token)
|
||||
if err != nil {
|
||||
logger.Warn("request with invalid token", "error", err, "token", token)
|
||||
return ctx, status.Error(codes.Unauthenticated, "invalid token")
|
||||
}
|
||||
|
||||
return newCtx, nil
|
||||
}
|
||||
|
||||
func (a *authenticator) validateToken(ctx context.Context, keyString string) error {
|
||||
decoded, err := apikeygenprefix.Decode(keyString)
|
||||
func (a *Authenticator) getSignedInUser(ctx context.Context, token string) (*user.SignedInUser, error) {
|
||||
decoded, err := apikeygenprefix.Decode(token)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hash, err := decoded.Hash()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apikey, err := a.APIKey.GetAPIKeyByHash(ctx, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if apikey == nil || apikey.ServiceAccountId == nil {
|
||||
return status.Error(codes.Unauthenticated, "api key does not have a service account")
|
||||
return nil, status.Error(codes.Unauthenticated, "api key does not have a service account")
|
||||
}
|
||||
|
||||
querySignedInUser := user.GetSignedInUserQuery{UserID: *apikey.ServiceAccountId, OrgID: apikey.OrgId}
|
||||
signedInUser, err := a.UserService.GetSignedInUserWithCacheCtx(ctx, &querySignedInUser)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !signedInUser.HasRole(org.RoleAdmin) {
|
||||
return fmt.Errorf("api key does not have admin role")
|
||||
return nil, fmt.Errorf("api key does not have admin role")
|
||||
}
|
||||
|
||||
// disabled service accounts are not allowed to access the API
|
||||
if signedInUser.IsDisabled {
|
||||
return fmt.Errorf("service account is disabled")
|
||||
return nil, fmt.Errorf("service account is disabled")
|
||||
}
|
||||
|
||||
return nil
|
||||
return signedInUser, nil
|
||||
}
|
||||
|
||||
func extractAuthorization(ctx context.Context) (string, error) {
|
@ -1,4 +1,4 @@
|
||||
package grpcserver
|
||||
package interceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -22,10 +22,10 @@ func TestAuthenticator_Authenticate(t *testing.T) {
|
||||
Name: "Admin API Key",
|
||||
ServiceAccountId: &serviceAccountId,
|
||||
}, nil)
|
||||
a := newAuthenticator(s, &fakeUserService{OrgRole: org.RoleAdmin})
|
||||
a := NewAuthenticator(s, &fakeUserService{OrgRole: org.RoleAdmin})
|
||||
ctx, err := setupContext()
|
||||
require.NoError(t, err)
|
||||
_, err = a.authenticate(ctx)
|
||||
_, err = a.Authenticate(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
@ -37,10 +37,10 @@ func TestAuthenticator_Authenticate(t *testing.T) {
|
||||
Name: "Admin API Key",
|
||||
ServiceAccountId: &serviceAccountId,
|
||||
}, nil)
|
||||
a := newAuthenticator(s, &fakeUserService{OrgRole: org.RoleEditor})
|
||||
a := NewAuthenticator(s, &fakeUserService{OrgRole: org.RoleEditor})
|
||||
ctx, err := setupContext()
|
||||
require.NoError(t, err)
|
||||
_, err = a.authenticate(ctx)
|
||||
_, err = a.Authenticate(ctx)
|
||||
require.NotNil(t, err)
|
||||
})
|
||||
|
||||
@ -52,13 +52,13 @@ func TestAuthenticator_Authenticate(t *testing.T) {
|
||||
Name: "Admin API Key",
|
||||
ServiceAccountId: &serviceAccountId,
|
||||
}, nil)
|
||||
a := newAuthenticator(s, &fakeUserService{OrgRole: org.RoleAdmin})
|
||||
a := NewAuthenticator(s, &fakeUserService{OrgRole: org.RoleAdmin})
|
||||
ctx, err := setupContext()
|
||||
require.NoError(t, err)
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
require.True(t, ok)
|
||||
require.NotEmpty(t, md["authorization"])
|
||||
ctx, err = a.authenticate(ctx)
|
||||
ctx, err = a.Authenticate(ctx)
|
||||
require.NoError(t, err)
|
||||
md, ok = metadata.FromIncomingContext(ctx)
|
||||
require.True(t, ok)
|
45
pkg/services/grpcserver/interceptors/tracing.go
Normal file
45
pkg/services/grpcserver/interceptors/tracing.go
Normal file
@ -0,0 +1,45 @@
|
||||
package interceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const tracingPrefix = "gRPC Server "
|
||||
|
||||
func TracingUnaryInterceptor(tracer tracing.Tracer) grpc.UnaryServerInterceptor {
|
||||
return func(
|
||||
ctx context.Context,
|
||||
req interface{},
|
||||
info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler,
|
||||
) (resp interface{}, err error) {
|
||||
ctx, span := tracer.Start(ctx, tracingPrefix+info.FullMethod)
|
||||
defer span.End()
|
||||
resp, err = handler(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
|
||||
func TracingStreamInterceptor(tracer tracing.Tracer) grpc.StreamServerInterceptor {
|
||||
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
ctx, span := tracer.Start(stream.Context(), tracingPrefix+info.FullMethod)
|
||||
defer span.End()
|
||||
tracingStream := &tracingServerStream{
|
||||
ServerStream: stream,
|
||||
ctx: ctx,
|
||||
}
|
||||
return handler(srv, tracingStream)
|
||||
}
|
||||
}
|
||||
|
||||
type tracingServerStream struct {
|
||||
grpc.ServerStream
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (s *tracingServerStream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
@ -7,11 +7,14 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/apikey"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
@ -31,7 +34,7 @@ type GPRCServerService struct {
|
||||
address string
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, apiKey apikey.Service, userService user.Service) (Provider, error) {
|
||||
func ProvideService(cfg *setting.Cfg, apiKey apikey.Service, userService user.Service, tracer tracing.Tracer) (Provider, error) {
|
||||
s := &GPRCServerService{
|
||||
cfg: cfg,
|
||||
logger: log.New("grpc-server"),
|
||||
@ -42,10 +45,21 @@ func ProvideService(cfg *setting.Cfg, apiKey apikey.Service, userService user.Se
|
||||
// Default auth is admin token check, but this can be overridden by
|
||||
// services which implement ServiceAuthFuncOverride interface.
|
||||
// See https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/auth/auth.go#L30.
|
||||
authenticator := newAuthenticator(apiKey, userService)
|
||||
authenticator := interceptors.NewAuthenticator(apiKey, userService)
|
||||
|
||||
opts = append(opts, []grpc.ServerOption{
|
||||
grpc.StreamInterceptor(grpcAuth.StreamServerInterceptor(authenticator.authenticate)),
|
||||
grpc.UnaryInterceptor(grpcAuth.UnaryServerInterceptor(authenticator.authenticate)),
|
||||
grpc.UnaryInterceptor(
|
||||
grpc_middleware.ChainUnaryServer(
|
||||
grpcAuth.UnaryServerInterceptor(authenticator.Authenticate),
|
||||
interceptors.TracingUnaryInterceptor(tracer),
|
||||
),
|
||||
),
|
||||
grpc.StreamInterceptor(
|
||||
grpc_middleware.ChainStreamServer(
|
||||
interceptors.TracingStreamInterceptor(tracer),
|
||||
grpcAuth.StreamServerInterceptor(authenticator.Authenticate),
|
||||
),
|
||||
),
|
||||
}...)
|
||||
|
||||
if s.cfg.GRPCServerTLSConfig != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user