From 542105b680a015d439e4b64900e42f73ee8d28d4 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Thu, 19 Sep 2024 17:16:48 +0300 Subject: [PATCH] ResourceClient: Exercise resource client in k8s apis tests (#93473) --- pkg/server/test_env.go | 4 +++ pkg/storage/unified/apistore/restoptions.go | 4 +-- pkg/storage/unified/apistore/store.go | 4 +-- pkg/storage/unified/client.go | 5 +++ pkg/storage/unified/resource/client.go | 23 ++++++++------ pkg/storage/unified/sql/service.go | 7 +++-- pkg/tests/apis/dashboard/dashboards_test.go | 6 ++-- pkg/tests/apis/helper.go | 8 +++++ pkg/tests/testinfra/testinfra.go | 35 ++++++++++++++------- 9 files changed, 68 insertions(+), 28 deletions(-) diff --git a/pkg/server/test_env.go b/pkg/server/test_env.go index a2098c0a8b6..104dc1242f6 100644 --- a/pkg/server/test_env.go +++ b/pkg/server/test_env.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/oauthtoken/oauthtokentest" "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/web" ) @@ -23,6 +24,7 @@ func ProvideTestEnv( httpClientProvider httpclient.Provider, oAuthTokenService *oauthtokentest.Service, featureMgmt featuremgmt.FeatureToggles, + resourceClient resource.ResourceClient, ) (*TestEnv, error) { return &TestEnv{ Server: server, @@ -35,6 +37,7 @@ func ProvideTestEnv( HTTPClientProvider: httpClientProvider, OAuthTokenService: oAuthTokenService, FeatureToggles: featureMgmt, + ResourceClient: resourceClient, }, nil } @@ -50,4 +53,5 @@ type TestEnv struct { OAuthTokenService *oauthtokentest.Service RequestMiddleware web.Middleware FeatureToggles featuremgmt.FeatureToggles + ResourceClient resource.ResourceClient } diff --git a/pkg/storage/unified/apistore/restoptions.go b/pkg/storage/unified/apistore/restoptions.go index 6c1a4dbc039..ba7d46471a4 100644 --- a/pkg/storage/unified/apistore/restoptions.go +++ b/pkg/storage/unified/apistore/restoptions.go @@ -25,11 +25,11 @@ import ( var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil) type RESTOptionsGetter struct { - client resource.ResourceStoreClient + client resource.ResourceClient original storagebackend.Config } -func NewRESTOptionsGetterForClient(client resource.ResourceStoreClient, original storagebackend.Config) *RESTOptionsGetter { +func NewRESTOptionsGetterForClient(client resource.ResourceClient, original storagebackend.Config) *RESTOptionsGetter { return &RESTOptionsGetter{ client: client, original: original, diff --git a/pkg/storage/unified/apistore/store.go b/pkg/storage/unified/apistore/store.go index 17953f82cf4..4d2947ff54b 100644 --- a/pkg/storage/unified/apistore/store.go +++ b/pkg/storage/unified/apistore/store.go @@ -48,7 +48,7 @@ type Storage struct { trigger storage.IndexerFuncs indexers *cache.Indexers - store resource.ResourceStoreClient + store resource.ResourceClient getKey func(string) (*resource.ResourceKey, error) watchSet *WatchSet @@ -64,7 +64,7 @@ var ErrNamespaceNotExists = errors.New("namespace does not exist") // NewStorage instantiates a new Storage. func NewStorage( config *storagebackend.ConfigForResource, - store resource.ResourceStoreClient, + store resource.ResourceClient, keyFunc func(obj runtime.Object) (string, error), keyParser func(key string) (*resource.ResourceKey, error), newFunc func() runtime.Object, diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index 79e9c01448c..a48a08ee292 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -2,6 +2,7 @@ package unified import ( "context" + "fmt" "path/filepath" infraDB "github.com/grafana/grafana/pkg/infra/db" @@ -59,6 +60,10 @@ func ProvideUnifiedStorageClient( return resource.NewLocalResourceClient(server), nil case options.StorageTypeUnifiedGrpc: + if opts.Address == "" { + return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType) + } + // Create a connection to the gRPC server conn, err := grpc.NewClient(opts.Address, grpc.WithStatsHandler(otelgrpc.NewClientHandler()), diff --git a/pkg/storage/unified/resource/client.go b/pkg/storage/unified/resource/client.go index 92ea7686bf8..d38950a6641 100644 --- a/pkg/storage/unified/resource/client.go +++ b/pkg/storage/unified/resource/client.go @@ -35,15 +35,20 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient { channel := &inprocgrpc.Channel{} auth := &grpcUtils.Authenticator{} - - channel.RegisterService( - grpchan.InterceptServer( - &ResourceStore_ServiceDesc, - grpcAuth.UnaryServerInterceptor(auth.Authenticate), - grpcAuth.StreamServerInterceptor(auth.Authenticate), - ), - server, // Implements all the things - ) + for _, desc := range []*grpc.ServiceDesc{ + &ResourceStore_ServiceDesc, + &ResourceIndex_ServiceDesc, + &Diagnostics_ServiceDesc, + } { + channel.RegisterService( + grpchan.InterceptServer( + desc, + grpcAuth.UnaryServerInterceptor(auth.Authenticate), + grpcAuth.StreamServerInterceptor(auth.Authenticate), + ), + server, + ) + } cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor) return &resourceClient{ diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index bf381d493a2..ecd22176cc4 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -98,8 +98,11 @@ func (s *service) start(ctx context.Context) error { return err } - resource.RegisterResourceStoreServer(s.handler.GetServer(), server) - grpc_health_v1.RegisterHealthServer(s.handler.GetServer(), healthService) + srv := s.handler.GetServer() + resource.RegisterResourceStoreServer(srv, server) + resource.RegisterResourceIndexServer(srv, server) + resource.RegisterDiagnosticsServer(srv, server) + grpc_health_v1.RegisterHealthServer(srv, healthService) // register reflection service _, err = grpcserver.ProvideReflectionService(s.cfg, s.handler) diff --git a/pkg/tests/apis/dashboard/dashboards_test.go b/pkg/tests/apis/dashboard/dashboards_test.go index 78e5ce6ae98..5bab84bf236 100644 --- a/pkg/tests/apis/dashboard/dashboards_test.go +++ b/pkg/tests/apis/dashboard/dashboards_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/grafana/grafana/pkg/services/apiserver/options" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tests/apis" @@ -31,8 +32,9 @@ func TestIntegrationRequiresDevMode(t *testing.T) { t.Skip("skipping integration test") } helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ - AppModeProduction: true, // should fail - DisableAnonymous: true, + AppModeProduction: true, // should fail + DisableAnonymous: true, + APIServerStorageType: options.StorageTypeUnifiedGrpc, // tests remote connection EnableFeatureToggles: []string{ featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs, // Required to start the example service }, diff --git a/pkg/tests/apis/helper.go b/pkg/tests/apis/helper.go index fe9efe26db0..98c7e634929 100644 --- a/pkg/tests/apis/helper.go +++ b/pkg/tests/apis/helper.go @@ -41,6 +41,7 @@ import ( "github.com/grafana/grafana/pkg/services/team/teamimpl" "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/services/user/userimpl" + "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/tests/testinfra" ) @@ -74,6 +75,12 @@ func NewK8sTestHelper(t *testing.T, opts testinfra.GrafanaOpts) *K8sTestHelper { c.loadAPIGroups() + // ensure unified storage is alive and running + ctx := identity.WithRequester(context.Background(), c.Org1.Admin.Identity) + rsp, err := c.env.ResourceClient.IsHealthy(ctx, &resource.HealthCheckRequest{}) + require.NoError(t, err, "unable to read resource client health check") + require.Equal(t, resource.HealthCheckResponse_SERVING, rsp.Status) + return c } @@ -485,6 +492,7 @@ func (c *K8sTestHelper) CreateUser(name string, orgName string, basicRole org.Ro require.Equal(c.t, orgId, u.OrgID) require.True(c.t, u.ID > 0) + // should this always return a user with ID token? s, err := userSvc.GetSignedInUser(context.Background(), &user.GetSignedInUserQuery{ UserID: u.ID, Login: u.Login, diff --git a/pkg/tests/testinfra/testinfra.go b/pkg/tests/testinfra/testinfra.go index 03bd4056e16..9f6f5a0220b 100644 --- a/pkg/tests/testinfra/testinfra.go +++ b/pkg/tests/testinfra/testinfra.go @@ -52,6 +52,28 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes serverOpts := server.Options{Listener: listener, HomePath: grafDir} apiServerOpts := api.ServerOptions{Listener: listener} + // Potentially allocate a real gRPC port for unified storage + runstore := false + unistore, _ := cfg.Raw.GetSection("grafana-apiserver") + if unistore != nil && + unistore.Key("storage_type").MustString("") == string(options.StorageTypeUnifiedGrpc) && + unistore.Key("address").String() == "" { + // Allocate a new address + listener2, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + cfg.GRPCServerNetwork = "tcp" + cfg.GRPCServerAddress = listener2.Addr().String() + cfg.GRPCServerTLSConfig = nil + _, err = unistore.NewKey("address", cfg.GRPCServerAddress) + require.NoError(t, err) + + // release the one we just discovered -- it will be used by the services on startup + err = listener2.Close() + require.NoError(t, err) + runstore = true + } + env, err := server.InitializeForTest(t, cfg, serverOpts, apiServerOpts) require.NoError(t, err) @@ -74,16 +96,8 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes // UnifiedStorageOverGRPC var storage sql.UnifiedStorageGrpcService - unistore, _ := env.Cfg.Raw.GetSection("grafana-apiserver") - if unistore != nil && - unistore.Key("storage_type").MustString("") == string(options.StorageTypeUnifiedGrpc) && - unistore.Key("address").String() == "" { // no address is configured - copy := *env.Cfg - copy.GRPCServerNetwork = "tcp" - copy.GRPCServerAddress = "localhost:0" - copy.GRPCServerTLSConfig = nil - - storage, err = sql.ProvideUnifiedStorageGrpcService(©, env.FeatureToggles, env.SQLStore, env.Cfg.Logger) + if runstore { + storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore, env.Cfg.Logger) require.NoError(t, err) ctx := context.Background() err = storage.StartAsync(ctx) @@ -91,7 +105,6 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes err = storage.AwaitRunning(ctx) require.NoError(t, err) - _, err = unistore.NewKey("address", storage.GetAddress()) require.NoError(t, err) t.Logf("Unified storage running on %s", storage.GetAddress()) }