From 4baca6947d95e3a8b873098d131c7010d59fdc1c Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Thu, 25 Jul 2024 18:17:39 +0200 Subject: [PATCH] ResourceServer: make the resource store the default unified storage backend (#90899) * make the resource store the default unified storage backend * add integration tests * fix test non passing * Update pkg/storage/unified/sql/test/integration_test.go Co-authored-by: Ryan McKinley * lint * fix tests * fix no rows --------- Co-authored-by: Ryan McKinley --- pkg/server/module_server.go | 4 +- pkg/server/module_server_test.go | 2 +- pkg/services/apiserver/options/storage.go | 16 +-- pkg/services/apiserver/service.go | 50 +------ pkg/services/grpcserver/service.go | 20 +-- pkg/storage/unified/resource/health.go | 88 ++++++++++++ pkg/storage/unified/resource/health_test.go | 123 ++++++++++++++++ pkg/storage/unified/sql/backend.go | 16 ++- pkg/storage/unified/sql/service.go | 134 ++++++++++++++++++ .../unified/sql/test/integration_test.go | 73 ++++++++++ 10 files changed, 452 insertions(+), 74 deletions(-) create mode 100644 pkg/storage/unified/resource/health.go create mode 100644 pkg/storage/unified/resource/health_test.go create mode 100644 pkg/storage/unified/sql/service.go diff --git a/pkg/server/module_server.go b/pkg/server/module_server.go index 71ee65f3367..4f8ba2f32a7 100644 --- a/pkg/server/module_server.go +++ b/pkg/server/module_server.go @@ -15,8 +15,8 @@ import ( "github.com/grafana/grafana/pkg/modules" "github.com/grafana/grafana/pkg/services/authz" "github.com/grafana/grafana/pkg/services/featuremgmt" - storageServer "github.com/grafana/grafana/pkg/services/store/entity/server" "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/sql" ) // NewModule returns an instance of a ModuleServer, responsible for managing @@ -131,7 +131,7 @@ func (s *ModuleServer) Run() error { //} m.RegisterModule(modules.StorageServer, func() (services.Service, error) { - return storageServer.ProvideService(s.cfg, s.features, s.log) + return sql.ProvideService(s.cfg, s.features, nil, s.log) }) m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) { diff --git a/pkg/server/module_server_test.go b/pkg/server/module_server_test.go index a1d25884fc1..7292f3cda8d 100644 --- a/pkg/server/module_server_test.go +++ b/pkg/server/module_server_test.go @@ -63,7 +63,7 @@ func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *tes } func addStorageServerToConfig(t *testing.T, cfg *setting.Cfg, dbType string) { - s, err := cfg.Raw.NewSection("entity_api") + s, err := cfg.Raw.NewSection("resource_api") require.NoError(t, err) _, err = s.NewKey("db_type", dbType) require.NoError(t, err) diff --git a/pkg/services/apiserver/options/storage.go b/pkg/services/apiserver/options/storage.go index 27568c0f96f..088bf5e5fe8 100644 --- a/pkg/services/apiserver/options/storage.go +++ b/pkg/services/apiserver/options/storage.go @@ -14,13 +14,11 @@ import ( type StorageType string const ( - StorageTypeFile StorageType = "file" - StorageTypeEtcd StorageType = "etcd" - StorageTypeLegacy StorageType = "legacy" - StorageTypeUnified StorageType = "unified" - StorageTypeUnifiedGrpc StorageType = "unified-grpc" - StorageTypeUnifiedNext StorageType = "unified-next" - StorageTypeUnifiedNextGrpc StorageType = "unified-next-grpc" + StorageTypeFile StorageType = "file" + StorageTypeEtcd StorageType = "etcd" + StorageTypeLegacy StorageType = "legacy" + StorageTypeUnified StorageType = "unified" + StorageTypeUnifiedGrpc StorageType = "unified-grpc" ) type StorageOptions struct { @@ -46,10 +44,10 @@ func (o *StorageOptions) AddFlags(fs *pflag.FlagSet) { func (o *StorageOptions) Validate() []error { errs := []error{} switch o.StorageType { - case StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc, StorageTypeUnifiedNext, StorageTypeUnifiedNextGrpc: + case StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc: // no-op default: - errs = append(errs, fmt.Errorf("--grafana-apiserver-storage-type must be one of %s, %s, %s, %s, %s, %s, %s", StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc, StorageTypeUnifiedNext, StorageTypeUnifiedNextGrpc)) + errs = append(errs, fmt.Errorf("--grafana-apiserver-storage-type must be one of %s, %s, %s, %s, %s", StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc)) } if _, _, err := net.SplitHostPort(o.Address); err != nil { diff --git a/pkg/services/apiserver/service.go b/pkg/services/apiserver/service.go index 1eff0cf9e3f..640e819bda6 100644 --- a/pkg/services/apiserver/service.go +++ b/pkg/services/apiserver/service.go @@ -36,17 +36,12 @@ import ( "github.com/grafana/grafana/pkg/services/apiserver/builder" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options" - entitystorage "github.com/grafana/grafana/pkg/services/apiserver/storage/entity" "github.com/grafana/grafana/pkg/services/apiserver/utils" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/org" - "github.com/grafana/grafana/pkg/services/store/entity" - "github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/apistore" - "github.com/grafana/grafana/pkg/storage/unified/entitybridge" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/sql" ) @@ -269,7 +264,7 @@ func (s *service) start(ctx context.Context) error { return err } - case grafanaapiserveroptions.StorageTypeUnifiedNext: + case grafanaapiserveroptions.StorageTypeUnified: if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { return fmt.Errorf("unified storage requires the unifiedStorage feature flag") } @@ -282,7 +277,7 @@ func (s *service) start(ctx context.Context) error { serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(client, o.RecommendedOptions.Etcd.StorageConfig) - case grafanaapiserveroptions.StorageTypeUnifiedNextGrpc: + case grafanaapiserveroptions.StorageTypeUnifiedGrpc: if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { return fmt.Errorf("unified storage requires the unifiedStorage feature flag") } @@ -296,47 +291,6 @@ func (s *service) start(ctx context.Context) error { client := resource.NewResourceStoreClientGRPC(conn) serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(client, o.RecommendedOptions.Etcd.StorageConfig) - case grafanaapiserveroptions.StorageTypeUnified, grafanaapiserveroptions.StorageTypeUnifiedGrpc: - var client entity.EntityStoreClient - var entityServer sqlstash.SqlEntityServer - - if o.StorageOptions.StorageType == grafanaapiserveroptions.StorageTypeUnifiedGrpc { - conn, err := grpc.NewClient(o.StorageOptions.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return err - } - client = entity.NewEntityStoreClientGRPC(conn) - } else { - if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { - return fmt.Errorf("unified storage requires the unifiedStorage feature flag") - } - - eDB, err := dbimpl.ProvideEntityDB(s.db, s.cfg, s.features, s.tracing) - if err != nil { - return err - } - - entityServer, err = sqlstash.ProvideSQLEntityServer(eDB, s.tracing) - if err != nil { - return err - } - client = entity.NewEntityStoreClientLocal(entityServer) - } - - if false { - // Use the entity bridge - server, err := entitybridge.EntityAsResourceServer(client, entityServer, s.tracing) - if err != nil { - return err - } - client := resource.NewLocalResourceStoreClient(server) - serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(client, - o.RecommendedOptions.Etcd.StorageConfig) - } else { - serverConfig.Config.RESTOptionsGetter = entitystorage.NewRESTOptionsGetter(s.cfg, - client, o.RecommendedOptions.Etcd.StorageConfig.Codec) - } - case grafanaapiserveroptions.StorageTypeLegacy: fallthrough case grafanaapiserveroptions.StorageTypeFile: diff --git a/pkg/services/grpcserver/service.go b/pkg/services/grpcserver/service.go index 898349b0644..f4777b44356 100644 --- a/pkg/services/grpcserver/service.go +++ b/pkg/services/grpcserver/service.go @@ -34,18 +34,20 @@ type Provider interface { } type gPRCServerService struct { - cfg *setting.Cfg - logger log.Logger - server *grpc.Server - address string - enabled bool + cfg *setting.Cfg + logger log.Logger + server *grpc.Server + address string + enabled bool + startedChan chan struct{} } func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authenticator interceptors.Authenticator, tracer tracing.Tracer, registerer prometheus.Registerer) (Provider, error) { s := &gPRCServerService{ - cfg: cfg, - logger: log.New("grpc-server"), - enabled: features.IsEnabledGlobally(featuremgmt.FlagGrpcServer), + cfg: cfg, + logger: log.New("grpc-server"), + enabled: features.IsEnabledGlobally(featuremgmt.FlagGrpcServer), + startedChan: make(chan struct{}), } // Register the metric here instead of an init() function so that we do @@ -110,6 +112,7 @@ func (s *gPRCServerService) Run(ctx context.Context) error { } s.address = listener.Addr().String() + close(s.startedChan) serveErr := make(chan error, 1) go func() { @@ -141,5 +144,6 @@ func (s *gPRCServerService) GetServer() *grpc.Server { } func (s *gPRCServerService) GetAddress() string { + <-s.startedChan return s.address } diff --git a/pkg/storage/unified/resource/health.go b/pkg/storage/unified/resource/health.go new file mode 100644 index 00000000000..6c08643edc6 --- /dev/null +++ b/pkg/storage/unified/resource/health.go @@ -0,0 +1,88 @@ +package resource + +import ( + "context" + "errors" + "time" + + grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" + "google.golang.org/grpc/health/grpc_health_v1" +) + +// Compile-time assertion +var _ HealthService = &healthServer{} + +type HealthService interface { + grpc_health_v1.HealthServer + grpcAuth.ServiceAuthFuncOverride +} + +func ProvideHealthService(server DiagnosticsServer) (grpc_health_v1.HealthServer, error) { + h := &healthServer{srv: server} + return h, nil +} + +type healthServer struct { + srv DiagnosticsServer +} + +// AuthFuncOverride for no auth for health service. +func (s *healthServer) AuthFuncOverride(ctx context.Context, _ string) (context.Context, error) { + return ctx, nil +} + +func (s *healthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + r, err := s.srv.IsHealthy(ctx, &HealthCheckRequest{}) + if err != nil { + return nil, err + } + + return &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_ServingStatus(r.Status.Number()), + }, nil +} + +func (s *healthServer) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error { + h, err := s.srv.IsHealthy(stream.Context(), &HealthCheckRequest{}) + if err != nil { + return err + } + + // send initial health status + err = stream.Send(&grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_ServingStatus(h.Status.Number()), + }) + if err != nil { + return err + } + + currHealth := h.Status.Number() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // get current health status + h, err := s.srv.IsHealthy(stream.Context(), &HealthCheckRequest{}) + if err != nil { + return err + } + + // if health status has not changed, continue + if h.Status.Number() == currHealth { + continue + } + + // send the new health status + currHealth = h.Status.Number() + err = stream.Send(&grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_ServingStatus(h.Status.Number()), + }) + if err != nil { + return err + } + case <-stream.Context().Done(): + return errors.New("stream closed, context cancelled") + } + } +} diff --git a/pkg/storage/unified/resource/health_test.go b/pkg/storage/unified/resource/health_test.go new file mode 100644 index 00000000000..4aebcbb1fe5 --- /dev/null +++ b/pkg/storage/unified/resource/health_test.go @@ -0,0 +1,123 @@ +package resource + +import ( + "context" + "errors" + sync "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestHealthCheck(t *testing.T) { + t.Run("will return serving response when healthy", func(t *testing.T) { + stub := &diag{healthResponse: HealthCheckResponse_SERVING} + svc, err := ProvideHealthService(stub) + require.NoError(t, err) + + req := &grpc_health_v1.HealthCheckRequest{} + res, err := svc.Check(context.Background(), req) + + require.NoError(t, err) + assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, res.Status) + }) + + t.Run("will return not serving response when not healthy", func(t *testing.T) { + stub := &diag{healthResponse: HealthCheckResponse_NOT_SERVING} + svc, err := ProvideHealthService(stub) + require.NoError(t, err) + + req := &grpc_health_v1.HealthCheckRequest{} + res, err := svc.Check(context.Background(), req) + + require.NoError(t, err) + assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, res.Status) + }) +} + +func TestHealthWatch(t *testing.T) { + t.Run("watch will return message when called", func(t *testing.T) { + stub := &diag{healthResponse: HealthCheckResponse_SERVING} + svc, err := ProvideHealthService(stub) + require.NoError(t, err) + + req := &grpc_health_v1.HealthCheckRequest{} + stream := &fakeHealthWatchServer{} + go func() { + err := svc.Watch(req, stream) + require.NoError(t, err) + }() + + time.Sleep(100 * time.Millisecond) + err = stream.RecvMsg(nil) + require.NoError(t, err) + }) + + t.Run("watch will return error when context cancelled", func(t *testing.T) { + stub := &diag{healthResponse: HealthCheckResponse_NOT_SERVING} + svc, err := ProvideHealthService(stub) + require.NoError(t, err) + + req := &grpc_health_v1.HealthCheckRequest{} + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + stream := &fakeHealthWatchServer{context: ctx} + err = svc.Watch(req, stream) + + require.Error(t, err) + }) +} + +var _ DiagnosticsServer = &diag{} + +type diag struct { + healthResponse HealthCheckResponse_ServingStatus + error error +} + +func (s *diag) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) { + if s.error != nil { + return nil, s.error + } + + return &HealthCheckResponse{Status: s.healthResponse}, nil +} + +type fakeHealthWatchServer struct { + mu sync.Mutex + grpc.ServerStream + healthChecks []*grpc_health_v1.HealthCheckResponse + context context.Context +} + +func (f *fakeHealthWatchServer) Send(resp *grpc_health_v1.HealthCheckResponse) error { + f.mu.Lock() + defer f.mu.Unlock() + f.healthChecks = append(f.healthChecks, resp) + return nil +} + +func (f *fakeHealthWatchServer) RecvMsg(m interface{}) error { + f.mu.Lock() + defer f.mu.Unlock() + if len(f.healthChecks) == 0 { + return errors.New("no health checks received") + } + f.healthChecks = f.healthChecks[1:] + return nil +} + +func (f *fakeHealthWatchServer) SendMsg(m interface{}) error { + return errors.New("not implemented") +} + +func (f *fakeHealthWatchServer) Context() context.Context { + if f.context == nil { + f.context = context.Background() + } + return f.context +} diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 1c4a6845b23..8d64f968108 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -9,15 +9,16 @@ import ( "time" "github.com/google/uuid" - "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/otel/trace/noop" - "google.golang.org/protobuf/proto" - "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" + "google.golang.org/protobuf/proto" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" ) const trace_prefix = "sql.resource." @@ -314,7 +315,10 @@ func (b *backend) Read(ctx context.Context, req *resource.ReadRequest) (*resourc res, err := dbutil.QueryRow(ctx, b.db, sr, readReq) if errors.Is(err, sql.ErrNoRows) { - return nil, resource.ErrNotFound + return nil, apierrors.NewNotFound(schema.GroupResource{ + Group: req.Key.Group, + Resource: req.Key.Resource, + }, req.Key.Name) } else if err != nil { return nil, fmt.Errorf("get resource version: %w", err) } @@ -540,7 +544,7 @@ func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialec resourceVersion: new(resourceVersion), }) if errors.Is(err, sql.ErrNoRows) { - return 0, fmt.Errorf("now row for the provided resource version") + return 1, nil } else if err != nil { return 0, fmt.Errorf("get resource version: %w", err) } diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go new file mode 100644 index 00000000000..8ad9d9bd134 --- /dev/null +++ b/pkg/storage/unified/sql/service.go @@ -0,0 +1,134 @@ +package sql + +import ( + "context" + + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/health/grpc_health_v1" + + infraDB "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/modules" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/grpcserver" + "github.com/grafana/grafana/pkg/services/grpcserver/interceptors" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/resource/grpc" +) + +var ( + _ Service = (*service)(nil) +) + +type Service interface { + services.NamedService +} + +type service struct { + *services.BasicService + + cfg *setting.Cfg + features featuremgmt.FeatureToggles + db infraDB.DB + stopCh chan struct{} + stoppedCh chan error + + handler grpcserver.Provider + + tracing *tracing.TracingService + + authenticator interceptors.Authenticator + + log log.Logger +} + +func ProvideService( + cfg *setting.Cfg, + features featuremgmt.FeatureToggles, + db infraDB.DB, + log log.Logger, +) (*service, error) { + tracingCfg, err := tracing.ProvideTracingConfig(cfg) + if err != nil { + return nil, err + } + tracingCfg.ServiceName = "unified-storage" + + tracing, err := tracing.ProvideService(tracingCfg) + if err != nil { + return nil, err + } + + authn := &grpc.Authenticator{} + + s := &service{ + cfg: cfg, + features: features, + stopCh: make(chan struct{}), + authenticator: authn, + tracing: tracing, + db: db, + log: log, + } + + // This will be used when running as a dskit service + s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName(modules.StorageServer) + + return s, nil +} + +func (s *service) start(ctx context.Context) error { + server, err := ProvideResourceServer(s.db, s.cfg, s.features, s.tracing) + if err != nil { + return err + } + s.handler, err = grpcserver.ProvideService(s.cfg, s.features, s.authenticator, s.tracing, prometheus.DefaultRegisterer) + if err != nil { + return err + } + + healthService, err := resource.ProvideHealthService(server) + if err != nil { + return err + } + + resource.RegisterResourceStoreServer(s.handler.GetServer(), server) + grpc_health_v1.RegisterHealthServer(s.handler.GetServer(), healthService) + + // register reflection service + _, err = grpcserver.ProvideReflectionService(s.cfg, s.handler) + if err != nil { + return err + } + + // start the gRPC server + go func() { + err := s.handler.Run(ctx) + if err != nil { + s.stoppedCh <- err + } else { + s.stoppedCh <- nil + } + }() + return nil +} + +// GetAddress returns the address of the gRPC server. +func (s *service) GetAddress() string { + return s.handler.GetAddress() +} + +func (s *service) running(ctx context.Context) error { + select { + case err := <-s.stoppedCh: + if err != nil { + return err + } + case <-ctx.Done(): + close(s.stopCh) + } + return nil +} diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index 98cea330d73..e153bc530ea 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -7,7 +7,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace/noop" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "github.com/grafana/dskit/services" + "github.com/grafana/grafana/pkg/apimachinery/identity" infraDB "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" @@ -279,6 +283,75 @@ func TestBackendPrepareList(t *testing.T) { assert.Equal(t, int64(4), continueToken.StartOffset) }) } +func TestClientServer(t *testing.T) { + ctx := context.Background() + dbstore := infraDB.InitTestDB(t) + + cfg := setting.NewCfg() + cfg.GRPCServerAddress = "localhost:0" + cfg.GRPCServerNetwork = "tcp" + + features := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage) + + svc, err := sql.ProvideService(cfg, features, dbstore, nil) + require.NoError(t, err) + var client resource.ResourceStoreClient + + // Test with an admin identity + clientCtx := identity.WithRequester(context.Background(), &identity.StaticRequester{ + Type: identity.TypeUser, + Login: "testuser", + UserID: 123, + UserUID: "u123", + OrgRole: identity.RoleAdmin, + IsGrafanaAdmin: true, // can do anything + }) + + t.Run("Start and stop service", func(t *testing.T) { + err = services.StartAndAwaitRunning(ctx, svc) + require.NoError(t, err) + require.NotEmpty(t, svc.GetAddress()) + }) + + t.Run("Create a client", func(t *testing.T) { + conn, err := grpc.NewClient(svc.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + client = resource.NewResourceStoreClientGRPC(conn) + }) + + t.Run("Create a resource", func(t *testing.T) { + raw := []byte(`{ + "apiVersion": "group/v0alpha1", + "kind": "resource", + "metadata": { + "name": "item1", + "namespace": "namespace" + }, + "spec": {} + }`) + resp, err := client.Create(clientCtx, &resource.CreateRequest{ + Key: resourceKey("item1"), + Value: raw, + }) + require.NoError(t, err) + require.Empty(t, resp.Error) + require.Greater(t, resp.ResourceVersion, int64(0)) + }) + + t.Run("Read the resource", func(t *testing.T) { + resp, err := client.Read(clientCtx, &resource.ReadRequest{ + Key: resourceKey("item1"), + }) + require.NoError(t, err) + require.Empty(t, resp.Error) + require.Greater(t, resp.ResourceVersion, int64(0)) + }) + + t.Run("Stop the service", func(t *testing.T) { + err = services.StopAndAwaitTerminated(ctx, svc) + require.NoError(t, err) + }) +} func writeEvent(ctx context.Context, store sql.Backend, name string, action resource.WatchEvent_Type) (int64, error) { return store.WriteEvent(ctx, resource.WriteEvent{