mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
use in-process grpc client instead of wrapping server interface (#81926)
* use in-process grpc client instead of wrapping server interface * comment out jwt token checks until we're ready to validate the token
This commit is contained in:
@@ -240,11 +240,13 @@ func (s *service) start(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
store, err := sqlstash.ProvideSQLEntityServer(eDB)
|
||||
storeServer, err := sqlstash.ProvideSQLEntityServer(eDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
store := entity.NewEntityStoreClientLocal(storeServer)
|
||||
|
||||
serverConfig.Config.RESTOptionsGetter = entitystorage.NewRESTOptionsGetter(s.cfg, store, o.RecommendedOptions.Etcd.StorageConfig.Codec)
|
||||
|
||||
case grafanaapiserveroptions.StorageTypeUnifiedGrpc:
|
||||
@@ -259,7 +261,7 @@ func (s *service) start(ctx context.Context) error {
|
||||
// defer conn.Close()
|
||||
|
||||
// Create a client instance
|
||||
store := entity.NewEntityStoreClientWrapper(conn)
|
||||
store := entity.NewEntityStoreClientGRPC(conn)
|
||||
|
||||
serverConfig.Config.RESTOptionsGetter = entitystorage.NewRESTOptionsGetter(s.cfg, store, o.RecommendedOptions.Etcd.StorageConfig.Codec)
|
||||
|
||||
|
||||
@@ -24,11 +24,11 @@ var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
|
||||
|
||||
type RESTOptionsGetter struct {
|
||||
cfg *setting.Cfg
|
||||
store entityStore.EntityStoreServer
|
||||
store entityStore.EntityStoreClient
|
||||
Codec runtime.Codec
|
||||
}
|
||||
|
||||
func NewRESTOptionsGetter(cfg *setting.Cfg, store entityStore.EntityStoreServer, codec runtime.Codec) *RESTOptionsGetter {
|
||||
func NewRESTOptionsGetter(cfg *setting.Cfg, store entityStore.EntityStoreClient, codec runtime.Codec) *RESTOptionsGetter {
|
||||
return &RESTOptionsGetter{
|
||||
cfg: cfg,
|
||||
store: store,
|
||||
|
||||
@@ -36,7 +36,7 @@ const MaxUpdateAttempts = 1
|
||||
// Storage implements storage.Interface and storage resources as JSON files on disk.
|
||||
type Storage struct {
|
||||
config *storagebackend.ConfigForResource
|
||||
store entityStore.EntityStoreServer
|
||||
store entityStore.EntityStoreClient
|
||||
gr schema.GroupResource
|
||||
codec runtime.Codec
|
||||
keyFunc func(obj runtime.Object) (string, error)
|
||||
@@ -52,7 +52,7 @@ type Storage struct {
|
||||
func NewStorage(
|
||||
config *storagebackend.ConfigForResource,
|
||||
gr schema.GroupResource,
|
||||
store entityStore.EntityStoreServer,
|
||||
store entityStore.EntityStoreClient,
|
||||
codec runtime.Codec,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
newFunc func() runtime.Object,
|
||||
|
||||
@@ -1,94 +1,30 @@
|
||||
package entity
|
||||
|
||||
import (
|
||||
context "context"
|
||||
"strconv"
|
||||
"github.com/fullstorydev/grpchan"
|
||||
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
grpcUtils "github.com/grafana/grafana/pkg/services/store/entity/grpc"
|
||||
)
|
||||
|
||||
var _ EntityStoreServer = (*entityStoreClientWrapper)(nil)
|
||||
func NewEntityStoreClientLocal(server EntityStoreServer) EntityStoreClient {
|
||||
channel := &inprocgrpc.Channel{}
|
||||
|
||||
// wrapper for EntityStoreClient that implements EntityStore interface
|
||||
type entityStoreClientWrapper struct {
|
||||
EntityStoreClient
|
||||
auth := &grpcUtils.Authenticator{}
|
||||
|
||||
channel.RegisterService(
|
||||
grpchan.InterceptServer(
|
||||
&EntityStore_ServiceDesc,
|
||||
grpcAuth.UnaryServerInterceptor(auth.Authenticate),
|
||||
grpcAuth.StreamServerInterceptor(auth.Authenticate),
|
||||
),
|
||||
server,
|
||||
)
|
||||
return NewEntityStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
|
||||
}
|
||||
|
||||
func (c *entityStoreClientWrapper) Read(ctx context.Context, in *ReadEntityRequest) (*Entity, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.Read(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) BatchRead(ctx context.Context, in *BatchReadEntityRequest) (*BatchReadEntityResponse, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.BatchRead(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) Create(ctx context.Context, in *CreateEntityRequest) (*CreateEntityResponse, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.Create(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) Update(ctx context.Context, in *UpdateEntityRequest) (*UpdateEntityResponse, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.Update(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) Delete(ctx context.Context, in *DeleteEntityRequest) (*DeleteEntityResponse, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.Delete(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) History(ctx context.Context, in *EntityHistoryRequest) (*EntityHistoryResponse, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.History(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) List(ctx context.Context, in *EntityListRequest) (*EntityListResponse, error) {
|
||||
ctx, err := c.wrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.EntityStoreClient.List(ctx, in)
|
||||
}
|
||||
func (c *entityStoreClientWrapper) Watch(*EntityWatchRequest, EntityStore_WatchServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
|
||||
}
|
||||
|
||||
func (c *entityStoreClientWrapper) wrapContext(ctx context.Context) (context.Context, error) {
|
||||
user, err := appcontext.User(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// set grpc metadata into the context to pass to the grpc server
|
||||
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
|
||||
"grafana-idtoken", user.IDToken,
|
||||
"grafana-userid", strconv.FormatInt(user.UserID, 10),
|
||||
"grafana-orgid", strconv.FormatInt(user.OrgID, 10),
|
||||
"grafana-login", user.Login,
|
||||
))
|
||||
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func NewEntityStoreClientWrapper(cc grpc.ClientConnInterface) EntityStoreServer {
|
||||
return &entityStoreClientWrapper{&entityStoreClient{cc}}
|
||||
func NewEntityStoreClientGRPC(channel *grpc.ClientConn) EntityStoreClient {
|
||||
return NewEntityStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
|
||||
}
|
||||
|
||||
97
pkg/services/store/entity/grpc/authenticator.go
Normal file
97
pkg/services/store/entity/grpc/authenticator.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
type Authenticator struct{}
|
||||
|
||||
func (f *Authenticator) Authenticate(ctx context.Context) (context.Context, error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no metadata found")
|
||||
}
|
||||
|
||||
// TODO: use id token instead of these fields
|
||||
login := md.Get("grafana-login")[0]
|
||||
if login == "" {
|
||||
return nil, fmt.Errorf("no login found in context")
|
||||
}
|
||||
userID, err := strconv.ParseInt(md.Get("grafana-userid")[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid user id: %w", err)
|
||||
}
|
||||
orgID, err := strconv.ParseInt(md.Get("grafana-orgid")[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid org id: %w", err)
|
||||
}
|
||||
|
||||
// TODO: validate id token
|
||||
/*
|
||||
idToken := md.Get("grafana-idtoken")[0]
|
||||
if idToken == "" {
|
||||
return nil, fmt.Errorf("no id token found in context")
|
||||
}
|
||||
jwtToken, err := jwt.ParseSigned(idToken)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid id token: %w", err)
|
||||
}
|
||||
claims := jwt.Claims{}
|
||||
err = jwtToken.UnsafeClaimsWithoutVerification(&claims)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid id token: %w", err)
|
||||
}
|
||||
// fmt.Printf("JWT CLAIMS: %+v\n", claims)
|
||||
*/
|
||||
|
||||
return appcontext.WithUser(ctx, &user.SignedInUser{
|
||||
Login: login,
|
||||
UserID: userID,
|
||||
OrgID: orgID,
|
||||
}), nil
|
||||
}
|
||||
|
||||
var _ interceptors.Authenticator = (*Authenticator)(nil)
|
||||
|
||||
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx, err := WrapContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
var _ grpc.UnaryClientInterceptor = UnaryClientInterceptor
|
||||
|
||||
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx, err := WrapContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
|
||||
var _ grpc.StreamClientInterceptor = StreamClientInterceptor
|
||||
|
||||
func WrapContext(ctx context.Context) (context.Context, error) {
|
||||
user, err := appcontext.User(ctx)
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
|
||||
// set grpc metadata into the context to pass to the grpc server
|
||||
return metadata.NewOutgoingContext(ctx, metadata.Pairs(
|
||||
"grafana-idtoken", user.IDToken,
|
||||
"grafana-userid", strconv.FormatInt(user.UserID, 10),
|
||||
"grafana-orgid", strconv.FormatInt(user.OrgID, 10),
|
||||
"grafana-login", user.Login,
|
||||
)), nil
|
||||
}
|
||||
@@ -2,15 +2,10 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-jose/go-jose/v3/jwt"
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
@@ -19,8 +14,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/grpc"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@@ -58,53 +53,6 @@ type service struct {
|
||||
authenticator interceptors.Authenticator
|
||||
}
|
||||
|
||||
type Authenticator struct{}
|
||||
|
||||
func (f *Authenticator) Authenticate(ctx context.Context) (context.Context, error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no metadata found")
|
||||
}
|
||||
|
||||
// TODO: use id token instead of these fields
|
||||
login := md.Get("grafana-login")[0]
|
||||
if login == "" {
|
||||
return nil, fmt.Errorf("no login found in context")
|
||||
}
|
||||
userID, err := strconv.ParseInt(md.Get("grafana-userid")[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid user id: %w", err)
|
||||
}
|
||||
orgID, err := strconv.ParseInt(md.Get("grafana-orgid")[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid org id: %w", err)
|
||||
}
|
||||
|
||||
// TODO: validate id token
|
||||
idToken := md.Get("grafana-idtoken")[0]
|
||||
if idToken == "" {
|
||||
return nil, fmt.Errorf("no id token found in context")
|
||||
}
|
||||
jwtToken, err := jwt.ParseSigned(idToken)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid id token: %w", err)
|
||||
}
|
||||
claims := jwt.Claims{}
|
||||
err = jwtToken.UnsafeClaimsWithoutVerification(&claims)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid id token: %w", err)
|
||||
}
|
||||
// fmt.Printf("JWT CLAIMS: %+v\n", claims)
|
||||
|
||||
return appcontext.WithUser(ctx, &user.SignedInUser{
|
||||
Login: login,
|
||||
UserID: userID,
|
||||
OrgID: orgID,
|
||||
}), nil
|
||||
}
|
||||
|
||||
var _ interceptors.Authenticator = (*Authenticator)(nil)
|
||||
|
||||
func ProvideService(
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
@@ -114,7 +62,7 @@ func ProvideService(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
authn := &Authenticator{}
|
||||
authn := &grpc.Authenticator{}
|
||||
|
||||
s := &service{
|
||||
config: newConfig(cfg),
|
||||
|
||||
Reference in New Issue
Block a user