diff --git a/go.mod b/go.mod index 9dfd7a46739..b1a4fc02f40 100644 --- a/go.mod +++ b/go.mod @@ -306,7 +306,7 @@ require ( github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect github.com/grafana/sqlds/v3 v3.2.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect; @grafana/plugins-platform-backend - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect @@ -380,7 +380,7 @@ require ( github.com/redis/rueidis v1.0.16 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect - github.com/rs/cors v1.10.1 // indirect + github.com/rs/cors v1.10.1 github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/segmentio/asm v1.2.0 // indirect diff --git a/pkg/services/apiserver/service.go b/pkg/services/apiserver/service.go index 6a772a7c771..5be784f1f57 100644 --- a/pkg/services/apiserver/service.go +++ b/pkg/services/apiserver/service.go @@ -45,8 +45,8 @@ import ( "github.com/grafana/grafana/pkg/services/store/entity/sqlstash" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/apistore" + "github.com/grafana/grafana/pkg/storage/unified/entitybridge" "github.com/grafana/grafana/pkg/storage/unified/resource" - "github.com/grafana/grafana/pkg/storage/unified/sqlobj" ) var ( @@ -267,20 +267,13 @@ func (s *service) start(ctx context.Context) error { return fmt.Errorf("unified storage requires the unifiedStorage feature flag") } - // resourceServer, err := entitybridge.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing) - // if err != nil { - // return err - // } - - // HACK... for now - resourceServer, err := sqlobj.ProvideSQLResourceServer(s.db, s.tracing) + server, err := entitybridge.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing) if err != nil { return err } - - store := resource.NewLocalResourceStoreClient(resourceServer) - serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetter(store, + serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForServer(server, o.RecommendedOptions.Etcd.StorageConfig.Codec) + case grafanaapiserveroptions.StorageTypeUnifiedNextGrpc: if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { return fmt.Errorf("unified storage requires the unifiedStorage feature flag") @@ -295,9 +288,9 @@ func (s *service) start(ctx context.Context) error { // defer conn.Close() // Create a client instance - store := resource.NewResourceStoreClientGRPC(conn) + client := resource.NewResourceStoreClientGRPC(conn) - serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetter(store, o.RecommendedOptions.Etcd.StorageConfig.Codec) + serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetter(client, o.RecommendedOptions.Etcd.StorageConfig.Codec) case grafanaapiserveroptions.StorageTypeUnified: if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { diff --git a/pkg/services/sqlstore/migrations/migrations.go b/pkg/services/sqlstore/migrations/migrations.go index f867f2f0de6..018d59d4c88 100644 --- a/pkg/services/sqlstore/migrations/migrations.go +++ b/pkg/services/sqlstore/migrations/migrations.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana/pkg/services/sqlstore/migrations/ssosettings" "github.com/grafana/grafana/pkg/services/sqlstore/migrations/ualert" . "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + basicResourceMigrations "github.com/grafana/grafana/pkg/storage/unified/basic/migrations" ) // --- Migration Guide line --- @@ -124,7 +125,7 @@ func (oss *OSSMigrations) AddMigration(mg *Migrator) { ualert.AddRecordingRuleColumns(mg) - addObjectMigrations(mg) + basicResourceMigrations.AddBasicResourceMigrations(mg) } func addStarMigrations(mg *Migrator) { diff --git a/pkg/storage/unified/apistore/restoptions.go b/pkg/storage/unified/apistore/restoptions.go index 3b33f5f4604..3346331094a 100644 --- a/pkg/storage/unified/apistore/restoptions.go +++ b/pkg/storage/unified/apistore/restoptions.go @@ -21,14 +21,21 @@ import ( var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil) type RESTOptionsGetter struct { - store resource.ResourceStoreClient - Codec runtime.Codec + client resource.ResourceStoreClient + Codec runtime.Codec } -func NewRESTOptionsGetter(store resource.ResourceStoreClient, codec runtime.Codec) *RESTOptionsGetter { +func NewRESTOptionsGetterForServer(server resource.ResourceServer, codec runtime.Codec) *RESTOptionsGetter { return &RESTOptionsGetter{ - store: store, - Codec: codec, + client: resource.NewLocalResourceStoreClient(server), + Codec: codec, + } +} + +func NewRESTOptionsGetter(client resource.ResourceStoreClient, codec runtime.Codec) *RESTOptionsGetter { + return &RESTOptionsGetter{ + client: client, + Codec: codec, } } @@ -67,7 +74,7 @@ func (f *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (gener trigger storage.IndexerFuncs, indexers *cache.Indexers, ) (storage.Interface, factory.DestroyFunc, error) { - return NewStorage(config, resource, f.store, f.Codec, keyFunc, newFunc, newListFunc, getAttrsFunc) + return NewStorage(config, resource, f.client, f.Codec, keyFunc, newFunc, newListFunc, getAttrsFunc) }, DeleteCollectionWorkers: 0, EnableGarbageCollection: false, diff --git a/pkg/storage/unified/basic/basic_sql_backend.go b/pkg/storage/unified/basic/basic_sql_backend.go new file mode 100644 index 00000000000..d746a0d73fe --- /dev/null +++ b/pkg/storage/unified/basic/basic_sql_backend.go @@ -0,0 +1,227 @@ +package basic + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" + + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/services/sqlstore/session" + "github.com/grafana/grafana/pkg/storage/unified/resource" +) + +const trace_prefix = "basic.sql.resource." +const table_name = "basic_resource" + +type ResourceServerOptions struct { + DB db.DB + GroupResource schema.GroupResource + Tracer trace.Tracer + MaxItems int +} + +// This storage engine is not designed to support large collections +// The goal with this package is a production ready implementation that +// can support modest requirements. By design, this will scan all +// results on all list operations, so we do not want this to grow too big +func NewResourceServer(opts ResourceServerOptions) (resource.ResourceServer, error) { + if opts.Tracer == nil { + opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") + } + + store := &basicSQLBackend{ + db: opts.DB, + gr: opts.GroupResource, + tracer: opts.Tracer, + log: slog.Default().With("logger", "basic-sql-resource"), + } + + return resource.NewResourceServer(resource.ResourceServerOptions{ + Tracer: opts.Tracer, + Backend: store, + Diagnostics: store, + Lifecycle: store, + }) +} + +type basicSQLBackend struct { + log *slog.Logger + db db.DB + gr schema.GroupResource + maxItems int + tracer trace.Tracer + + // Simple watch stream -- NOTE, this only works for single tenant! + broadcaster resource.Broadcaster[*resource.WrittenEvent] + + stream chan<- *resource.WrittenEvent +} + +func (s *basicSQLBackend) Init() (err error) { + s.broadcaster, err = resource.NewBroadcaster(context.Background(), func(c chan<- *resource.WrittenEvent) error { + s.stream = c + return nil + }) + return +} + +func (s *basicSQLBackend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { + return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil +} + +func (s *basicSQLBackend) Stop() { + if s.stream != nil { + close(s.stream) + } +} + +func (s *basicSQLBackend) validateKey(key *resource.ResourceKey) error { + if s.gr.Group != "" && s.gr.Group != key.Group { + return fmt.Errorf("expected group: %s, found: %s", s.gr.Group, key.Group) + } + if s.gr.Resource != "" && s.gr.Resource != key.Resource { + return fmt.Errorf("expected resource: %s, found: %s", s.gr.Resource, key.Resource) + } + return nil +} + +func (s *basicSQLBackend) WriteEvent(ctx context.Context, event resource.WriteEvent) (rv int64, err error) { + _, span := s.tracer.Start(ctx, trace_prefix+"WriteEvent") + defer span.End() + + key := event.Key + err = s.validateKey(key) + if err != nil { + return + } + gvk := event.Object.GetGroupVersionKind() + + // This delegates resource version creation to auto-increment + // At scale, this is not a great strategy since everything is locked across all resources while this executes + appender := func(tx *session.SessionTx) (int64, error) { + return tx.ExecWithReturningId(ctx, + `INSERT INTO `+table_name+` (api_group,api_version,namespace,resource,name,value) VALUES($1,$2,$3,$4,$5,$6)`, + key.Group, gvk.Version, key.Namespace, key.Resource, key.Name, event.Value) + } + + wiper := func(tx *session.SessionTx) (sql.Result, error) { + return tx.Exec(ctx, `DELETE FROM `+table_name+` WHERE `+ + `api_group=$1 AND `+ + `namespace=$2 AND `+ + `resource=$3 AND `+ + `name=$4`, + key.Group, key.Namespace, key.Resource, key.Name) + } + + err = s.db.GetSqlxSession().WithTransaction(ctx, func(tx *session.SessionTx) error { + switch event.Type { + case resource.WatchEvent_ADDED: + count := 0 + err = tx.Get(ctx, &count, `SELECT count(*) FROM `+table_name+` WHERE api_group=$1 AND resource=$2`, key.Group, key.Resource) + if err != nil { + return err + } + if count >= s.maxItems { + return fmt.Errorf("the storage backend only supports %d items", s.maxItems) + } + rv, err = appender(tx) + + case resource.WatchEvent_MODIFIED: + _, err = wiper(tx) + if err == nil { + rv, err = appender(tx) + } + case resource.WatchEvent_DELETED: + _, err = wiper(tx) + default: + return fmt.Errorf("unsupported event type") + } + return err + }) + + // Async notify all subscribers + if s.stream != nil { + go func() { + write := &resource.WrittenEvent{ + WriteEvent: event, + Timestamp: time.Now().UnixMilli(), + ResourceVersion: rv, + } + s.stream <- write + }() + } + return +} + +func (s *basicSQLBackend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { + return s.broadcaster.Subscribe(ctx) +} + +func (s *basicSQLBackend) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) { + _, span := s.tracer.Start(ctx, trace_prefix+"Read") + defer span.End() + + key := req.Key + err := s.validateKey(key) + if err != nil { + return nil, err + } + + rows, err := s.db.GetSqlxSession().Query(ctx, "SELECT rv,value FROM "+table_name+" WHERE api_group=$1 AND namespace=$2 AND resource=$3 AND name=$4", + key.Group, key.Namespace, key.Resource, key.Name) + if err != nil { + return nil, err + } + if rows.Next() { + rsp := &resource.ReadResponse{} + err = rows.Scan(&rsp.ResourceVersion, &rsp.Value) + if err == nil && rows.Next() { + return nil, fmt.Errorf("unexpected multiple results found") // should not be possible with the index strategy + } + return rsp, err + } + return nil, fmt.Errorf("NOT FOUND ERROR") +} + +// This implementation is only ever called from inside single tenant grafana, so there is no need to decode +// the value and try filtering first -- that will happen one layer up anyway +func (s *basicSQLBackend) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { + if req.NextPageToken != "" { + return nil, fmt.Errorf("this storage backend does not support paging") + } + _, span := s.tracer.Start(ctx, trace_prefix+"PrepareList") + defer span.End() + + key := req.Options.Key + err := s.validateKey(key) + if err != nil { + return nil, err + } + rsp := &resource.ListResponse{} + rows, err := s.db.GetSqlxSession().Query(ctx, + "SELECT rv,value FROM "+table_name+ + " WHERE api_group=$1 AND namespace=$2 AND resource=$3 "+ + " ORDER BY name asc LIMIT $4", + key.Group, key.Namespace, key.Resource, s.maxItems+1) + if err != nil { + return nil, err + } + for rows.Next() { + wrapper := &resource.ResourceWrapper{} + err = rows.Scan(&wrapper.ResourceVersion, &wrapper.Value) + if err != nil { + break + } + rsp.Items = append(rsp.Items, wrapper) + } + if len(rsp.Items) > s.maxItems { + err = fmt.Errorf("more values that are supported by this storage engine") + } + return rsp, err +} diff --git a/pkg/services/sqlstore/migrations/object_mig.go b/pkg/storage/unified/basic/migrations/migrations.go similarity index 55% rename from pkg/services/sqlstore/migrations/object_mig.go rename to pkg/storage/unified/basic/migrations/migrations.go index 3bdaf1751a2..6b11d762539 100644 --- a/pkg/services/sqlstore/migrations/object_mig.go +++ b/pkg/storage/unified/basic/migrations/migrations.go @@ -2,19 +2,17 @@ package migrations import "github.com/grafana/grafana/pkg/services/sqlstore/migrator" -// Add SQL table for a simple unified storage object backend -// NOTE: it would be nice to have this defined in the unified storage package, however that -// introduces a circular dependency. -func addObjectMigrations(mg *migrator.Migrator) { - mg.AddMigration("create unified storage object table", migrator.NewAddTableMigration(migrator.Table{ - Name: "object", +func AddBasicResourceMigrations(mg *migrator.Migrator) { + mg.AddMigration("create unified storage basic resource table", migrator.NewAddTableMigration(migrator.Table{ + Name: "basic_resource", Columns: []*migrator.Column{ // Sequential resource version {Name: "rv", Type: migrator.DB_BigInt, Nullable: false, IsPrimaryKey: true, IsAutoIncrement: true}, // Properties that exist in path/key (and duplicated in the json value) - {Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, - {Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: true}, // namespace is not required (cluster scope) + {Name: "api_group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, // avoid "group" so escaping is easier :) + {Name: "api_version", Type: migrator.DB_NVarchar, Length: 32, Nullable: false}, // informational + {Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: true}, // namespace is not required (cluster scope) {Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, {Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, @@ -22,7 +20,7 @@ func addObjectMigrations(mg *migrator.Migrator) { {Name: "value", Type: migrator.DB_MediumText, Nullable: false}, }, Indices: []*migrator.Index{ - {Cols: []string{"group", "namespace", "resource", "name"}, Type: migrator.UniqueIndex}, + {Cols: []string{"api_group", "resource", "namespace", "name"}, Type: migrator.UniqueIndex}, }, })) } diff --git a/pkg/storage/unified/resource/go.mod b/pkg/storage/unified/resource/go.mod index 99f949a0bb0..4574bb9f95f 100644 --- a/pkg/storage/unified/resource/go.mod +++ b/pkg/storage/unified/resource/go.mod @@ -3,7 +3,6 @@ module github.com/grafana/grafana/pkg/storage/unified/resource go 1.21.10 require ( - github.com/bwmarrin/snowflake v0.3.0 github.com/fullstorydev/grpchan v1.1.1 github.com/google/uuid v1.6.0 github.com/grafana/authlib v0.0.0-20240611075137-331cbe4e840f diff --git a/pkg/storage/unified/resource/go.sum b/pkg/storage/unified/resource/go.sum index fe8c86a681a..d1d127ec502 100644 --- a/pkg/storage/unified/resource/go.sum +++ b/pkg/storage/unified/resource/go.sum @@ -25,7 +25,6 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 h1:cJGRyzCSVwZC7zZZ1xbx9m32UnrK github.com/aws/smithy-go v1.11.2 h1:eG/N+CcUMAvsdffgMvjMKwfyDzIkjM6pfxMJ8Mzc6mE= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= -github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= diff --git a/pkg/storage/unified/sqlobj/sql_resources.go b/pkg/storage/unified/sqlobj/sql_resources.go deleted file mode 100644 index 5274fab449e..00000000000 --- a/pkg/storage/unified/sqlobj/sql_resources.go +++ /dev/null @@ -1,181 +0,0 @@ -package sqlobj - -import ( - "context" - "database/sql" - "errors" - "fmt" - "log/slog" - "time" - - "go.opentelemetry.io/otel/trace" - - "github.com/grafana/grafana/pkg/infra/db" - "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/services/sqlstore/session" - "github.com/grafana/grafana/pkg/storage/unified/resource" -) - -// Package-level errors. -var ( - ErrNotImplementedYet = errors.New("not implemented yet (sqlobj)") -) - -func ProvideSQLResourceServer(db db.DB, tracer tracing.Tracer) (resource.ResourceServer, error) { - store := &sqlResourceStore{ - db: db, - log: slog.Default().With("logger", "unistore-sql-objects"), - tracer: tracer, - } - - return resource.NewResourceServer(resource.ResourceServerOptions{ - Tracer: tracer, - Backend: store, - Diagnostics: store, - Lifecycle: store, - }) -} - -type sqlResourceStore struct { - log *slog.Logger - db db.DB - tracer trace.Tracer - - broadcaster resource.Broadcaster[*resource.WrittenEvent] - - // Simple watch stream -- NOTE, this only works for single tenant! - stream chan<- *resource.WrittenEvent -} - -func (s *sqlResourceStore) Init() (err error) { - s.broadcaster, err = resource.NewBroadcaster(context.Background(), func(c chan<- *resource.WrittenEvent) error { - s.stream = c - return nil - }) - return -} - -func (s *sqlResourceStore) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { - return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil -} - -func (s *sqlResourceStore) Stop() { - if s.stream != nil { - close(s.stream) - } -} - -func (s *sqlResourceStore) WriteEvent(ctx context.Context, event resource.WriteEvent) (rv int64, err error) { - _, span := s.tracer.Start(ctx, "sql_resource.WriteEvent") - defer span.End() - - key := event.Key - - // This delegates resource version creation to auto-increment - // At scale, this is not a great strategy since everything is locked across all resources while this executes - appender := func(tx *session.SessionTx) (int64, error) { - return tx.ExecWithReturningId(ctx, - `INSERT INTO "object" ("group","namespace","resource","name","value") VALUES($1,$2,$3,$4,$5)`, - key.Group, key.Namespace, key.Resource, key.Name, event.Value) - } - - wiper := func(tx *session.SessionTx) (sql.Result, error) { - return tx.Exec(ctx, `DELETE FROM "object" WHERE `+ - `"group"=$1 AND `+ - `"namespace"=$2 AND `+ - `"resource"=$3 AND `+ - `"name"=$4`, - key.Group, key.Namespace, key.Resource, key.Name) - } - - err = s.db.GetSqlxSession().WithTransaction(ctx, func(tx *session.SessionTx) error { - switch event.Type { - case resource.WatchEvent_ADDED: - rv, err = appender(tx) - - case resource.WatchEvent_MODIFIED: - _, err = wiper(tx) - if err == nil { - rv, err = appender(tx) - } - case resource.WatchEvent_DELETED: - _, err = wiper(tx) - default: - return fmt.Errorf("unsupported event type") - } - return err - }) - - // Async notify all subscribers - if s.stream != nil { - go func() { - write := &resource.WrittenEvent{ - WriteEvent: event, - Timestamp: time.Now().UnixMilli(), - ResourceVersion: rv, - } - s.stream <- write - }() - } - return -} - -func (s *sqlResourceStore) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { - return s.broadcaster.Subscribe(ctx) -} - -func (s *sqlResourceStore) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) { - _, span := s.tracer.Start(ctx, "storage_server.GetResource") - defer span.End() - - key := req.Key - rows, err := s.db.GetSqlxSession().Query(ctx, "SELECT rv,value FROM object WHERE group=$1 AND namespace=$2 AND resource=$3 AND name=$4", - key.Group, key.Namespace, key.Resource, key.Name) - if err != nil { - return nil, err - } - if rows.Next() { - rsp := &resource.ReadResponse{} - err = rows.Scan(&rsp.ResourceVersion, &rsp.Value) - if err == nil && rows.Next() { - return nil, fmt.Errorf("unexpected multiple results found") // should not be possible with the index strategy - } - return rsp, err - } - return nil, fmt.Errorf("NOT FOUND ERROR") -} - -// This implementation is only ever called from inside single tenant grafana, so there is no need to decode -// the value and try filtering first -- that will happen one layer up anyway -func (s *sqlResourceStore) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { - _, span := s.tracer.Start(ctx, "storage_server.List") - defer span.End() - - if req.NextPageToken != "" { - return nil, fmt.Errorf("This storage backend does not support paging") - } - - max := 250 - key := req.Options.Key - rsp := &resource.ListResponse{} - rows, err := s.db.GetSqlxSession().Query(ctx, - "SELECT rv,value FROM object \n"+ - ` WHERE "group"=$1 AND namespace=$2 AND resource=$3 `+ - " ORDER BY name asc LIMIT $4", - key.Group, key.Namespace, key.Resource, max+1) - if err != nil { - return nil, err - } - for rows.Next() { - wrapper := &resource.ResourceWrapper{} - err = rows.Scan(&wrapper.ResourceVersion, &wrapper.Value) - if err != nil { - break - } - rsp.Items = append(rsp.Items, wrapper) - } - if len(rsp.Items) > max { - err = fmt.Errorf("more values that are supported by this storage engine") - } - return rsp, err -}