ResourceClient: Exercise resource client in k8s apis tests (#93473)

This commit is contained in:
Ryan McKinley 2024-09-19 17:16:48 +03:00 committed by GitHub
parent 63195664f4
commit 542105b680
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 68 additions and 28 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana/pkg/services/notifications"
"github.com/grafana/grafana/pkg/services/oauthtoken/oauthtokentest"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/web"
)
@ -23,6 +24,7 @@ func ProvideTestEnv(
httpClientProvider httpclient.Provider,
oAuthTokenService *oauthtokentest.Service,
featureMgmt featuremgmt.FeatureToggles,
resourceClient resource.ResourceClient,
) (*TestEnv, error) {
return &TestEnv{
Server: server,
@ -35,6 +37,7 @@ func ProvideTestEnv(
HTTPClientProvider: httpClientProvider,
OAuthTokenService: oAuthTokenService,
FeatureToggles: featureMgmt,
ResourceClient: resourceClient,
}, nil
}
@ -50,4 +53,5 @@ type TestEnv struct {
OAuthTokenService *oauthtokentest.Service
RequestMiddleware web.Middleware
FeatureToggles featuremgmt.FeatureToggles
ResourceClient resource.ResourceClient
}

View File

@ -25,11 +25,11 @@ import (
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
type RESTOptionsGetter struct {
client resource.ResourceStoreClient
client resource.ResourceClient
original storagebackend.Config
}
func NewRESTOptionsGetterForClient(client resource.ResourceStoreClient, original storagebackend.Config) *RESTOptionsGetter {
func NewRESTOptionsGetterForClient(client resource.ResourceClient, original storagebackend.Config) *RESTOptionsGetter {
return &RESTOptionsGetter{
client: client,
original: original,

View File

@ -48,7 +48,7 @@ type Storage struct {
trigger storage.IndexerFuncs
indexers *cache.Indexers
store resource.ResourceStoreClient
store resource.ResourceClient
getKey func(string) (*resource.ResourceKey, error)
watchSet *WatchSet
@ -64,7 +64,7 @@ var ErrNamespaceNotExists = errors.New("namespace does not exist")
// NewStorage instantiates a new Storage.
func NewStorage(
config *storagebackend.ConfigForResource,
store resource.ResourceStoreClient,
store resource.ResourceClient,
keyFunc func(obj runtime.Object) (string, error),
keyParser func(key string) (*resource.ResourceKey, error),
newFunc func() runtime.Object,

View File

@ -2,6 +2,7 @@ package unified
import (
"context"
"fmt"
"path/filepath"
infraDB "github.com/grafana/grafana/pkg/infra/db"
@ -59,6 +60,10 @@ func ProvideUnifiedStorageClient(
return resource.NewLocalResourceClient(server), nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
}
// Create a connection to the gRPC server
conn, err := grpc.NewClient(opts.Address,
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),

View File

@ -35,15 +35,20 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
channel := &inprocgrpc.Channel{}
auth := &grpcUtils.Authenticator{}
channel.RegisterService(
grpchan.InterceptServer(
&ResourceStore_ServiceDesc,
grpcAuth.UnaryServerInterceptor(auth.Authenticate),
grpcAuth.StreamServerInterceptor(auth.Authenticate),
),
server, // Implements all the things
)
for _, desc := range []*grpc.ServiceDesc{
&ResourceStore_ServiceDesc,
&ResourceIndex_ServiceDesc,
&Diagnostics_ServiceDesc,
} {
channel.RegisterService(
grpchan.InterceptServer(
desc,
grpcAuth.UnaryServerInterceptor(auth.Authenticate),
grpcAuth.StreamServerInterceptor(auth.Authenticate),
),
server,
)
}
cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return &resourceClient{

View File

@ -98,8 +98,11 @@ func (s *service) start(ctx context.Context) error {
return err
}
resource.RegisterResourceStoreServer(s.handler.GetServer(), server)
grpc_health_v1.RegisterHealthServer(s.handler.GetServer(), healthService)
srv := s.handler.GetServer()
resource.RegisterResourceStoreServer(srv, server)
resource.RegisterResourceIndexServer(srv, server)
resource.RegisterDiagnosticsServer(srv, server)
grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)

View File

@ -5,6 +5,7 @@ import (
"strings"
"testing"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tests/apis"
@ -31,8 +32,9 @@ func TestIntegrationRequiresDevMode(t *testing.T) {
t.Skip("skipping integration test")
}
helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true, // should fail
DisableAnonymous: true,
AppModeProduction: true, // should fail
DisableAnonymous: true,
APIServerStorageType: options.StorageTypeUnifiedGrpc, // tests remote connection
EnableFeatureToggles: []string{
featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs, // Required to start the example service
},

View File

@ -41,6 +41,7 @@ import (
"github.com/grafana/grafana/pkg/services/team/teamimpl"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/services/user/userimpl"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/tests/testinfra"
)
@ -74,6 +75,12 @@ func NewK8sTestHelper(t *testing.T, opts testinfra.GrafanaOpts) *K8sTestHelper {
c.loadAPIGroups()
// ensure unified storage is alive and running
ctx := identity.WithRequester(context.Background(), c.Org1.Admin.Identity)
rsp, err := c.env.ResourceClient.IsHealthy(ctx, &resource.HealthCheckRequest{})
require.NoError(t, err, "unable to read resource client health check")
require.Equal(t, resource.HealthCheckResponse_SERVING, rsp.Status)
return c
}
@ -485,6 +492,7 @@ func (c *K8sTestHelper) CreateUser(name string, orgName string, basicRole org.Ro
require.Equal(c.t, orgId, u.OrgID)
require.True(c.t, u.ID > 0)
// should this always return a user with ID token?
s, err := userSvc.GetSignedInUser(context.Background(), &user.GetSignedInUserQuery{
UserID: u.ID,
Login: u.Login,

View File

@ -52,6 +52,28 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
serverOpts := server.Options{Listener: listener, HomePath: grafDir}
apiServerOpts := api.ServerOptions{Listener: listener}
// Potentially allocate a real gRPC port for unified storage
runstore := false
unistore, _ := cfg.Raw.GetSection("grafana-apiserver")
if unistore != nil &&
unistore.Key("storage_type").MustString("") == string(options.StorageTypeUnifiedGrpc) &&
unistore.Key("address").String() == "" {
// Allocate a new address
listener2, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
cfg.GRPCServerNetwork = "tcp"
cfg.GRPCServerAddress = listener2.Addr().String()
cfg.GRPCServerTLSConfig = nil
_, err = unistore.NewKey("address", cfg.GRPCServerAddress)
require.NoError(t, err)
// release the one we just discovered -- it will be used by the services on startup
err = listener2.Close()
require.NoError(t, err)
runstore = true
}
env, err := server.InitializeForTest(t, cfg, serverOpts, apiServerOpts)
require.NoError(t, err)
@ -74,16 +96,8 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
// UnifiedStorageOverGRPC
var storage sql.UnifiedStorageGrpcService
unistore, _ := env.Cfg.Raw.GetSection("grafana-apiserver")
if unistore != nil &&
unistore.Key("storage_type").MustString("") == string(options.StorageTypeUnifiedGrpc) &&
unistore.Key("address").String() == "" { // no address is configured
copy := *env.Cfg
copy.GRPCServerNetwork = "tcp"
copy.GRPCServerAddress = "localhost:0"
copy.GRPCServerTLSConfig = nil
storage, err = sql.ProvideUnifiedStorageGrpcService(&copy, env.FeatureToggles, env.SQLStore, env.Cfg.Logger)
if runstore {
storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore, env.Cfg.Logger)
require.NoError(t, err)
ctx := context.Background()
err = storage.StartAsync(ctx)
@ -91,7 +105,6 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
err = storage.AwaitRunning(ctx)
require.NoError(t, err)
_, err = unistore.NewKey("address", storage.GetAddress())
require.NoError(t, err)
t.Logf("Unified storage running on %s", storage.GetAddress())
}