unistor client side updates

This commit is contained in:
Claudiu Dragalina-Paraipan 2024-09-24 17:34:36 +03:00
parent a28440c40b
commit fb7bbf743b
2 changed files with 35 additions and 23 deletions

View File

@ -5,6 +5,11 @@ import (
"fmt"
"path/filepath"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"gocloud.dev/blob/fileblob"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/apiserver/options"
@ -12,10 +17,6 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"gocloud.dev/blob/fileblob"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// This adds a UnifiedStorage client into the wire dependency tree
@ -72,7 +73,13 @@ func ProvideUnifiedStorageClient(
if err != nil {
return nil, err
}
return resource.NewResourceClient(conn), nil
// Create a client instance
client, err := newResourceClient(conn, cfg)
if err != nil {
return nil, err
}
return client, nil
// Use the local SQL
default:
@ -83,3 +90,10 @@ func ProvideUnifiedStorageClient(
return resource.NewLocalResourceClient(server), nil
}
}
func newResourceClient(conn *grpc.ClientConn, cfg *setting.Cfg) (resource.ResourceClient, error) {
if cfg.StackID != "" {
return resource.NewCloudResourceClient(conn, cfg)
}
return resource.NewGRPCResourceClient(conn)
}

View File

@ -1,14 +1,11 @@
package resource
import (
"context"
"fmt"
"time"
"context"
"crypto/tls"
"fmt"
"net/http"
"time"
"github.com/fullstorydev/grpchan"
"github.com/fullstorydev/grpchan/inprocgrpc"
@ -23,7 +20,7 @@ import (
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/services/auth"
"github.com/grafana/grafana/pkg/services/authn/grpcutils"
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/setting"
)
// TODO(drclau): decide on the audience for the resource store
@ -42,15 +39,6 @@ type resourceClient struct {
DiagnosticsClient
}
func NewResourceClient(channel *grpc.ClientConn) ResourceClient {
cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return &resourceClient{
ResourceStoreClient: NewResourceStoreClient(cc),
ResourceIndexClient: NewResourceIndexClient(cc),
DiagnosticsClient: NewDiagnosticsClient(cc),
}
}
func NewLocalResourceClient(server ResourceServer) ResourceClient {
// scenario: local in-proc
channel := &inprocgrpc.Channel{}
@ -85,7 +73,7 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
}
}
func NewGRPCResourceClient(conn *grpc.ClientConn) (ResourceStoreClient, error) {
func NewGRPCResourceClient(conn *grpc.ClientConn) (ResourceClient, error) {
// scenario: remote on-prem
clientInt, err := authnlib.NewGrpcClientInterceptor(
&authnlib.GrpcClientConfig{},
@ -97,10 +85,15 @@ func NewGRPCResourceClient(conn *grpc.ClientConn) (ResourceStoreClient, error) {
return nil, err
}
return NewResourceStoreClient(grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)), nil
cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return &resourceClient{
ResourceStoreClient: NewResourceStoreClient(cc),
ResourceIndexClient: NewResourceIndexClient(cc),
DiagnosticsClient: NewDiagnosticsClient(cc),
}, nil
}
func NewCloudResourceClient(conn *grpc.ClientConn, cfg *setting.Cfg) (ResourceStoreClient, error) {
func NewCloudResourceClient(conn *grpc.ClientConn, cfg *setting.Cfg) (ResourceClient, error) {
// scenario: remote cloud
grpcClientConfig := clientCfgMapping(grpcutils.ReadGrpcClientConfig(cfg))
@ -118,7 +111,12 @@ func NewCloudResourceClient(conn *grpc.ClientConn, cfg *setting.Cfg) (ResourceSt
return nil, err
}
return NewResourceStoreClient(grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)), nil
cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return &resourceClient{
ResourceStoreClient: NewResourceStoreClient(cc),
ResourceIndexClient: NewResourceIndexClient(cc),
DiagnosticsClient: NewDiagnosticsClient(cc),
}, nil
}
func idTokenExtractor(ctx context.Context) (string, error) {