diff --git a/pkg/registry/apis/dashboard/legacy/sql_dashboards.go b/pkg/registry/apis/dashboard/legacy/sql_dashboards.go index e6174dc679b..8f08769032c 100644 --- a/pkg/registry/apis/dashboard/legacy/sql_dashboards.go +++ b/pkg/registry/apis/dashboard/legacy/sql_dashboards.go @@ -3,6 +3,7 @@ package legacy import ( "context" "database/sql" + "encoding/json" "fmt" "path/filepath" "sync" @@ -38,9 +39,6 @@ type dashboardRow struct { // The folder UID (needed for access control checks) FolderUID string - // Size (in bytes) of the dashboard payload - Bytes int - // The token we can use that will start a new connection that includes // this same dashboard token *continueToken @@ -72,6 +70,16 @@ func NewDashboardAccess(sql db.DB, } } +func (a *dashboardSqlAccess) currentRV(ctx context.Context) (int64, error) { + t := time.Now() + max := "" + err := a.sess.Get(ctx, &max, "SELECT MAX(updated) FROM dashboard") + if err == nil && max != "" { + t, err = time.Parse(time.DateTime, max) + } + return t.UnixMilli(), err +} + const selector = `SELECT dashboard.org_id, dashboard.id, dashboard.uid, dashboard.folder_uid, @@ -107,9 +115,9 @@ const history = `SELECT LEFT OUTER JOIN user AS UpdatedUSER ON dashboard_version.created_by = UpdatedUSER.id WHERE dashboard.is_folder = false` -func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) (*rowsWrapper, int, error) { +func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) (*rowsWrapper, error) { if len(query.Labels) > 0 { - return nil, 0, fmt.Errorf("labels not yet supported") + return nil, fmt.Errorf("labels not yet supported") // if query.Requirements.Folder != nil { // args = append(args, *query.Requirements.Folder) // sqlcmd = fmt.Sprintf("%s AND dashboard.folder_uid=$%d", sqlcmd, len(args)) @@ -119,20 +127,15 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) var sqlcmd string args := []any{query.OrgID} - limit := query.Limit - if limit < 1 { - limit = 15 // - } - if query.GetHistory || query.Version > 0 { if query.GetTrash { - return nil, 0, fmt.Errorf("trash not included in history table") + return nil, fmt.Errorf("trash not included in history table") } sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d\n ", history, len(args)) if query.UID == "" { - return nil, 0, fmt.Errorf("history query must have a UID") + return nil, fmt.Errorf("history query must have a UID") } args = append(args, query.UID) @@ -146,8 +149,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) sqlcmd = fmt.Sprintf("%s AND dashboard_version.version<$%d", sqlcmd, len(args)) } - args = append(args, (limit + 2)) // add more so we can include a next token - sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard_version.version desc LIMIT $%d", sqlcmd, len(args)) + sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard_version.version desc", sqlcmd) } else { sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d\n ", selector, len(args)) @@ -156,7 +158,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) sqlcmd = fmt.Sprintf("%s AND dashboard.uid=$%d", sqlcmd, len(args)) } else if query.LastID > 0 { args = append(args, query.LastID) - sqlcmd = fmt.Sprintf("%s AND dashboard.id>=$%d", sqlcmd, len(args)) + sqlcmd = fmt.Sprintf("%s AND dashboard.id>$%d", sqlcmd, len(args)) } if query.GetTrash { sqlcmd = sqlcmd + " AND dashboard.deleted IS NOT NULL" @@ -164,8 +166,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) sqlcmd = sqlcmd + " AND dashboard.deleted IS NULL" } - args = append(args, (limit + 2)) // add more so we can include a next token - sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard.id asc LIMIT $%d", sqlcmd, len(args)) + sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard.id asc", sqlcmd) } // fmt.Printf("%s // %v\n", sqlcmd, args) @@ -176,7 +177,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) } rows = nil } - return rows, limit, err + return rows, err } func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ...any) (*rowsWrapper, error) { @@ -196,25 +197,40 @@ func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ... }, err } +var _ resource.ListIterator = (*rowsWrapper)(nil) + type rowsWrapper struct { - a *dashboardSqlAccess - rows *sql.Rows - idx int - total int64 + a *dashboardSqlAccess + rows *sql.Rows canReadDashboard func(scopes ...string) bool + + // Current + row *dashboardRow + err error } func (r *rowsWrapper) Close() error { return r.rows.Close() } -func (r *rowsWrapper) Next() (*dashboardRow, error) { +func (r *rowsWrapper) Next() bool { + if r.err != nil { + return false + } + var err error + // breaks after first readable value for r.rows.Next() { - r.idx++ - d, err := r.a.scanRow(r.rows) - if d != nil { + r.row, err = r.a.scanRow(r.rows) + if err != nil { + r.err = err + return false + } + + if r.row != nil { + d := r.row + // Access control checker scopes := []string{dashboards.ScopeDashboardsProvider.GetResourceScopeUID(d.Dash.Name)} if d.FolderUID != "" { // Copied from searchV2... not sure the logic is right @@ -223,13 +239,44 @@ func (r *rowsWrapper) Next() (*dashboardRow, error) { if !r.canReadDashboard(scopes...) { continue } - r.total += int64(d.Bytes) - } - // returns the first folder it can - return d, err + // returns the first folder it can + return true + } } - return nil, nil + return false +} + +// ContinueToken implements resource.ListIterator. +func (r *rowsWrapper) ContinueToken() string { + return r.row.token.String() +} + +// Error implements resource.ListIterator. +func (r *rowsWrapper) Error() error { + return r.err +} + +// Name implements resource.ListIterator. +func (r *rowsWrapper) Name() string { + return r.row.Dash.Name +} + +// Namespace implements resource.ListIterator. +func (r *rowsWrapper) Namespace() string { + return r.row.Dash.Namespace +} + +// ResourceVersion implements resource.ListIterator. +func (r *rowsWrapper) ResourceVersion() int64 { + return r.row.RV +} + +// Value implements resource.ListIterator. +func (r *rowsWrapper) Value() []byte { + b, err := json.Marshal(r.row.Dash) + r.err = err + return b } func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) { @@ -318,14 +365,14 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) { }) } - row.Bytes = len(data) - if row.Bytes > 0 { + if len(data) > 0 { err = dash.Spec.UnmarshalJSON(data) if err != nil { return row, err } - dash.Spec.Set("id", dashboard_id) // add it so we can get it from the body later } + // add it so we can get it from the body later + dash.Spec.Set("id", dashboard_id) } return row, err } diff --git a/pkg/registry/apis/dashboard/legacy/storage.go b/pkg/registry/apis/dashboard/legacy/storage.go index 4c3a87a7c37..14c32b05eee 100644 --- a/pkg/registry/apis/dashboard/legacy/storage.go +++ b/pkg/registry/apis/dashboard/legacy/storage.go @@ -101,9 +101,8 @@ func (a *dashboardSqlAccess) WriteEvent(ctx context.Context, event resource.Writ return rv, err } -// Read implements ResourceStoreServer. func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid string, v int64) (*dashboard.Dashboard, int64, error) { - rows, _, err := a.getRows(ctx, &DashboardQuery{ + rows, err := a.getRows(ctx, &DashboardQuery{ OrgID: orgId, UID: uid, Limit: 2, // will only be one! @@ -114,11 +113,13 @@ func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid } defer func() { _ = rows.Close() }() - row, err := rows.Next() - if err != nil || row == nil { - return nil, 0, err + if rows.Next() { + row := rows.row + if row != nil { + return row.Dash, row.RV, rows.err + } } - return row.Dash, row.RV, nil + return nil, 0, rows.err } // Read implements ResourceStoreServer. @@ -157,26 +158,22 @@ func (a *dashboardSqlAccess) ReadResource(ctx context.Context, req *resource.Rea } // List implements AppendingStore. -func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.ListRequest) *resource.ListResponse { - list := &resource.ListResponse{} +func (a *dashboardSqlAccess) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { opts := req.Options info, err := request.ParseNamespace(opts.Key.Namespace) if err == nil { err = isDashboardKey(opts.Key, false) } if err != nil { - list.Error = resource.AsErrorResult(err) - return list + return 0, err } token, err := readContinueToken(req.NextPageToken) if err != nil { - list.Error = resource.AsErrorResult(err) - return list + return 0, err } if token.orgId > 0 && token.orgId != info.OrgID { - list.Error = resource.NewBadRequestError("token and orgID mismatch") - return list + return 0, fmt.Errorf("token and orgID mismatch") } query := &DashboardQuery{ @@ -187,40 +184,20 @@ func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.List Labels: req.Options.Labels, } - rows, limit, err := a.getRows(ctx, query) + listRV, err := a.currentRV(ctx) if err != nil { - list.Error = resource.AsErrorResult(err) - return list + return 0, err } - defer func() { _ = rows.Close() }() - - totalSize := 0 - for { - row, err := rows.Next() - if err != nil || row == nil { - list.Error = resource.AsErrorResult(err) - return list - } - - totalSize += row.Bytes - if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) { - // if query.Requirements.Folder != nil { - // row.token.folder = *query.Requirements.Folder - // } - list.NextPageToken = row.token.String() // will skip this one but start here next time - return list - } - // TODO -- make it smaller and stick the body as an annotation... - val, err := json.Marshal(row.Dash) - if err != nil { - list.Error = resource.AsErrorResult(err) - return list - } - list.Items = append(list.Items, &resource.ResourceWrapper{ - ResourceVersion: row.RV, - Value: val, - }) + rows, err := a.getRows(ctx, query) + if rows != nil { + defer func() { + _ = rows.Close() + }() } + if err != nil { + err = cb(rows) + } + return listRV, err } // Watch implements AppendingStore. @@ -290,7 +267,7 @@ func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryR query.GetHistory = true } - rows, limit, err := a.getRows(ctx, query) + rows, err := a.getRows(ctx, query) if err != nil { return nil, err } @@ -298,21 +275,11 @@ func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryR totalSize := 0 list := &resource.HistoryResponse{} - for { - row, err := rows.Next() - if err != nil || row == nil { - return list, err - } - - totalSize += row.Bytes - if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) { - // if query.Requirements.Folder != nil { - // row.token.folder = *query.Requirements.Folder - // } - row.token.id = getVersionFromRV(row.RV) // Use the version as the increment - list.NextPageToken = row.token.String() // will skip this one but start here next time + for rows.Next() { + if rows.err != nil || rows.row == nil { return list, err } + row := rows.row partial := &metav1.PartialObjectMetadata{ ObjectMeta: row.Dash.ObjectMeta, @@ -323,17 +290,25 @@ func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryR if err != nil { return list, err } - full, err := json.Marshal(row.Dash.Spec) - if err != nil { + + totalSize += len(rows.Value()) + if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= query.Limit) { + // if query.Requirements.Folder != nil { + // row.token.folder = *query.Requirements.Folder + // } + row.token.id = getVersionFromRV(row.RV) // Use the version as the increment + list.NextPageToken = row.token.String() // will skip this one but start here next time return list, err } + list.Items = append(list.Items, &resource.ResourceMeta{ ResourceVersion: row.RV, PartialObjectMeta: val, - Size: int32(len(full)), + Size: int32(len(rows.Value())), Hash: "??", // hash the full? }) } + return list, err } // Used for efficient provisioning diff --git a/pkg/storage/unified/resource/cdk_backend.go b/pkg/storage/unified/resource/cdk_backend.go index a5a16bab586..3fa6739d1a7 100644 --- a/pkg/storage/unified/resource/cdk_backend.go +++ b/pkg/storage/unified/resource/cdk_backend.go @@ -209,32 +209,13 @@ func isDeletedMarker(raw []byte) bool { return false } -func (s *cdkBackend) PrepareList(ctx context.Context, req *ListRequest) *ListResponse { +func (s *cdkBackend) ListIterator(ctx context.Context, req *ListRequest, cb func(ListIterator) error) (int64, error) { resources, err := buildTree(ctx, s, req.Options.Key) if err != nil { - return &ListResponse{ - Error: AsErrorResult(err), - } + return 0, err } - - rsp := &ListResponse{ - ResourceVersion: s.rv.Load(), - } - for _, item := range resources { - latest := item.versions[0] - raw, err := s.bucket.ReadAll(ctx, latest.key) - if err != nil { - rsp.Error = AsErrorResult(err) - return rsp - } - if !isDeletedMarker(raw) { - rsp.Items = append(rsp.Items, &ResourceWrapper{ - ResourceVersion: latest.rv, - Value: raw, - }) - } - } - return rsp + err = cb(resources) + return resources.listRV, err } func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) { @@ -264,9 +245,82 @@ type cdkVersion struct { key string } -func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) ([]cdkResource, error) { - byPrefix := make(map[string]*cdkResource) +type cdkListIterator struct { + bucket *blob.Bucket + ctx context.Context + err error + listRV int64 + resources []cdkResource + index int + + currentRV int64 + currentKey string + currentVal []byte +} + +// Next implements ListIterator. +func (c *cdkListIterator) Next() bool { + if c.err != nil { + return false + } + for { + c.currentVal = nil + c.index += 1 + if c.index >= len(c.resources) { + return false + } + + item := c.resources[c.index] + latest := item.versions[0] + raw, err := c.bucket.ReadAll(c.ctx, latest.key) + if err != nil { + c.err = err + return false + } + if !isDeletedMarker(raw) { + c.currentRV = latest.rv + c.currentKey = latest.key + c.currentVal = raw + return true + } + } +} + +// Error implements ListIterator. +func (c *cdkListIterator) Error() error { + return c.err +} + +// ResourceVersion implements ListIterator. +func (c *cdkListIterator) ResourceVersion() int64 { + return c.currentRV +} + +// Value implements ListIterator. +func (c *cdkListIterator) Value() []byte { + return c.currentVal +} + +// ContinueToken implements ListIterator. +func (c *cdkListIterator) ContinueToken() string { + return fmt.Sprintf("index:%d/key:%s", c.index, c.currentKey) +} + +// Name implements ListIterator. +func (c *cdkListIterator) Name() string { + return c.currentKey // TODO (parse name from key) +} + +// Namespace implements ListIterator. +func (c *cdkListIterator) Namespace() string { + return c.currentKey // TODO (parse namespace from key) +} + +var _ ListIterator = (*cdkListIterator)(nil) + +func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) (*cdkListIterator, error) { + byPrefix := make(map[string]*cdkResource) path := s.getPath(key, 0) iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive for { @@ -310,5 +364,11 @@ func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) ([]cdkResou return a > b }) - return resources, nil + return &cdkListIterator{ + ctx: ctx, + bucket: s.bucket, + resources: resources, + listRV: s.rv.Load(), + index: -1, // must call next first + }, nil } diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 545d3da42b0..c301e37288d 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -27,6 +27,30 @@ type ResourceServer interface { LifecycleHooks } +type ListIterator interface { + Next() bool // sql.Rows + + // Iterator error (if exts) + Error() error + + // The token that can be used to start iterating *after* this item + ContinueToken() string + + // ResourceVersion of the current item + ResourceVersion() int64 + + // Namespace of the current item + // Used for fast(er) authz filtering + Namespace() string + + // Name of the current item + // Used for fast(er) authz filtering + Name() string + + // Value for the current item + Value() []byte +} + // The StorageBackend is an internal abstraction that supports interacting with // the underlying raw storage medium. This interface is never exposed directly, // it is provided by concrete instances that actually write values. @@ -39,12 +63,12 @@ type StorageBackend interface { // Read a resource from storage optionally at an explicit version ReadResource(context.Context, *ReadRequest) *ReadResponse - // When the ResourceServer executes a List request, it will first + // When the ResourceServer executes a List request, this iterator will // query the backend for potential results. All results will be // checked against the kubernetes requirements before finally returning // results. The list options can be used to improve performance // but are the the final answer. - PrepareList(context.Context, *ListRequest) *ListResponse + ListIterator(context.Context, *ListRequest, func(ListIterator) error) (int64, error) // Get all events from the store // For HA setups, this will be more events than the local WriteEvent above! @@ -441,10 +465,51 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err if err := s.Init(ctx); err != nil { return nil, err } + if req.Limit < 1 { + req.Limit = 50 // default max 50 items in a page + } + maxPageBytes := 1024 * 1024 * 2 // 2mb/page + pageBytes := 0 + rsp := &ListResponse{} + rv, err := s.backend.ListIterator(ctx, req, func(iter ListIterator) error { + for iter.Next() { + if err := iter.Error(); err != nil { + return err + } - rsp := s.backend.PrepareList(ctx, req) - // Status??? - return rsp, nil + // TODO: add authz filters + + item := &ResourceWrapper{ + ResourceVersion: iter.ResourceVersion(), + Value: iter.Value(), + } + + pageBytes += len(item.Value) + rsp.Items = append(rsp.Items, item) + if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes { + t := iter.ContinueToken() + if iter.Next() { + rsp.NextPageToken = t + } + break + } + } + return nil + }) + if err != nil { + rsp.Error = AsErrorResult(err) + return rsp, nil + } + + if rv < 1 { + rsp.Error = &ErrorResult{ + Code: http.StatusInternalServerError, + Message: fmt.Sprintf("invalid resource version for list: %v", rv), + } + return rsp, nil + } + rsp.ResourceVersion = rv + return rsp, err } func (s *server) initWatcher() error { diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 7f4de06694c..d9157131ba9 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "math" "sync" "time" @@ -17,6 +18,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "google.golang.org/protobuf/proto" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) const trace_prefix = "sql.resource." @@ -323,14 +325,12 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) * return &res.ReadResponse } -func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) *resource.ListResponse { +func (b *backend) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { _, span := b.tracer.Start(ctx, trace_prefix+"List") defer span.End() if req.Options == nil || req.Options.Key.Group == "" || req.Options.Key.Resource == "" { - return &resource.ListResponse{ - Error: resource.NewBadRequestError("missing group or resource"), - } + return 0, fmt.Errorf("missing group or resource") } // TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only). @@ -338,21 +338,81 @@ func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) *r // TODO: add support for RemainingItemCount if req.ResourceVersion > 0 || req.NextPageToken != "" { - return b.listAtRevision(ctx, req) + return b.listAtRevision(ctx, req, cb) } - return b.listLatest(ctx, req) + return b.listLatest(ctx, req, cb) } +type listIter struct { + rows *sql.Rows + offset int64 + listRV int64 + + // any error + err error + + // The row + rv int64 + value []byte + namespace string + name string +} + +// ContinueToken implements resource.ListIterator. +func (l *listIter) ContinueToken() string { + return ContinueToken{ResourceVersion: l.listRV, StartOffset: l.offset}.String() +} + +// Error implements resource.ListIterator. +func (l *listIter) Error() error { + return l.err +} + +// Name implements resource.ListIterator. +func (l *listIter) Name() string { + return l.name +} + +// Namespace implements resource.ListIterator. +func (l *listIter) Namespace() string { + return l.namespace +} + +// ResourceVersion implements resource.ListIterator. +func (l *listIter) ResourceVersion() int64 { + return l.rv +} + +// Value implements resource.ListIterator. +func (l *listIter) Value() []byte { + return l.value +} + +// Next implements resource.ListIterator. +func (l *listIter) Next() bool { + if l.rows.Next() { + l.offset++ + l.err = l.rows.Scan(&l.rv, &l.namespace, &l.name, &l.value) + return true + } + return false +} + +var _ resource.ListIterator = (*listIter)(nil) + // listLatest fetches the resources from the resource table. -func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) *resource.ListResponse { - out := &resource.ListResponse{ - ResourceVersion: 0, +func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { + if req.NextPageToken != "" { + return 0, fmt.Errorf("only works for the first page") + } + if req.ResourceVersion > 0 { + return 0, fmt.Errorf("only works for the 'latest' resource version") } + iter := &listIter{} err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error - - out.ResourceVersion, err = fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) + iter.listRV, err = fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) if err != nil { return err } @@ -360,96 +420,78 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) *re listReq := sqlResourceListRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: new(resource.ListRequest), - Response: new(resource.ResourceWrapper), } listReq.Request = proto.Clone(req).(*resource.ListRequest) - if req.Limit > 0 { - listReq.Request.Limit++ // fetch one extra row for Limit - } - items, err := dbutil.Query(ctx, tx, sqlResourceList, listReq) + rows, err := dbutil.QueryRows(ctx, tx, sqlResourceList, listReq) + if rows != nil { + defer func() { + if err := rows.Close(); err != nil { + b.log.Warn("listLatest error closing rows", "error", err) + } + }() + } if err != nil { - return fmt.Errorf("list latest resources: %w", err) + return err } - if 0 < req.Limit && int(req.Limit) < len(items) { - // remove the additional item we added synthetically above - clear(items[req.Limit:]) - items = items[:req.Limit] - - out.NextPageToken = ContinueToken{ - ResourceVersion: out.ResourceVersion, - StartOffset: req.Limit, - }.String() - } - out.Items = items - - return nil + iter.rows = rows + return cb(iter) }) - if err != nil { - out.Error = resource.AsErrorResult(err) - } - return out + return iter.listRV, err } // listAtRevision fetches the resources from the resource_history table at a specific revision. -func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest) *resource.ListResponse { +func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { // Get the RV - rv := req.ResourceVersion - offset := int64(0) + iter := &listIter{listRV: req.ResourceVersion} if req.NextPageToken != "" { continueToken, err := GetContinueToken(req.NextPageToken) if err != nil { - return &resource.ListResponse{ - Error: resource.AsErrorResult(fmt.Errorf("get continue token: %w", err)), - } + return 0, fmt.Errorf("get continue token: %w", err) } - rv = continueToken.ResourceVersion - offset = continueToken.StartOffset - } + iter.listRV = continueToken.ResourceVersion + iter.offset = continueToken.StartOffset - out := &resource.ListResponse{ - ResourceVersion: rv, + if req.ResourceVersion != 0 && req.ResourceVersion != iter.listRV { + return 0, apierrors.NewBadRequest("request resource version does not math token") + } + } + if iter.listRV < 1 { + return 0, apierrors.NewBadRequest("expecting an explicit resource version query") } err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { + limit := int64(0) // ignore limit + if iter.offset > 0 { + limit = math.MaxInt64 // a limit is required for offset + } listReq := sqlResourceHistoryListRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: &historyListRequest{ - ResourceVersion: rv, - Limit: req.Limit, - Offset: offset, + ResourceVersion: iter.listRV, + Limit: limit, + Offset: iter.offset, Options: req.Options, }, - Response: new(resource.ResourceWrapper), - } - if listReq.Request.Limit > 0 { - listReq.Request.Limit++ // fetch one extra row for Limit } - items, err := dbutil.Query(ctx, tx, sqlResourceHistoryList, listReq) + rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryList, listReq) + if rows != nil { + defer func() { + if err := rows.Close(); err != nil { + b.log.Warn("listAtRevision error closing rows", "error", err) + } + }() + } if err != nil { - return fmt.Errorf("list resources at revision: %w", err) + return err } - if 0 < req.Limit && int(req.Limit) < len(items) { - // remove the additional item we added synthetically above - clear(items[req.Limit:]) - items = items[:req.Limit] - - out.NextPageToken = ContinueToken{ - ResourceVersion: out.ResourceVersion, - StartOffset: req.Limit + offset, - }.String() - } - out.Items = items - - return nil + iter.rows = rows + return cb(iter) }) - if err != nil { - out.Error = resource.AsErrorResult(err) - } - return out + return iter.listRV, err } func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { @@ -616,7 +658,7 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64, // in a single roundtrip. This would reduce the latency of the operation, and also increase the // throughput of the system. This is a good candidate for a future optimization. func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) { - // TODO: refactor this code to run in a multi-statement transaction in order to minimise the number of roundtrips. + // TODO: refactor this code to run in a multi-statement transaction in order to minimize the number of round trips. // 1 Lock the row for update rv, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionRequest{ SQLTemplate: sqltemplate.New(d), @@ -656,6 +698,6 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp return 0, fmt.Errorf("increase resource version: %w", err) } - // 3. Retun the incremended value + // 3. Return the incremented value return nextRV, nil } diff --git a/pkg/storage/unified/sql/data/resource_history_list.sql b/pkg/storage/unified/sql/data/resource_history_list.sql index ec15b144809..62a9205e58d 100644 --- a/pkg/storage/unified/sql/data/resource_history_list.sql +++ b/pkg/storage/unified/sql/data/resource_history_list.sql @@ -1,6 +1,8 @@ SELECT - kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, - kv.{{ .Ident "value" | .Into .Response.Value }} + kv.{{ .Ident "resource_version" }}, + kv.{{ .Ident "namespace" }}, + kv.{{ .Ident "name" }}, + kv.{{ .Ident "value" }} FROM {{ .Ident "resource_history" }} as kv INNER JOIN ( SELECT {{ .Ident "namespace" }}, {{ .Ident "group" }}, {{ .Ident "resource" }}, {{ .Ident "name" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }} diff --git a/pkg/storage/unified/sql/data/resource_list.sql b/pkg/storage/unified/sql/data/resource_list.sql index 7b24980e6a9..5b09f8ba61f 100644 --- a/pkg/storage/unified/sql/data/resource_list.sql +++ b/pkg/storage/unified/sql/data/resource_list.sql @@ -1,6 +1,8 @@ SELECT - {{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, - {{ .Ident "value" | .Into .Response.Value }} + {{ .Ident "resource_version" }}, + {{ .Ident "namespace" }}, + {{ .Ident "name" }}, + {{ .Ident "value" }} FROM {{ .Ident "resource" }} WHERE 1 = 1 {{ if and .Request.Options .Request.Options.Key }} @@ -18,7 +20,4 @@ SELECT {{ end }} {{ end }} ORDER BY {{ .Ident "resource_version" }} DESC - {{ if (gt .Request.Limit 0) }} - LIMIT {{ .Arg .Request.Limit }} - {{ end }} ; diff --git a/pkg/storage/unified/sql/dbutil/dbutil.go b/pkg/storage/unified/sql/dbutil/dbutil.go index 5a86fb61c36..fb2ec5a791e 100644 --- a/pkg/storage/unified/sql/dbutil/dbutil.go +++ b/pkg/storage/unified/sql/dbutil/dbutil.go @@ -107,10 +107,8 @@ func Exec(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req } // Query uses `req` as input for a single-statement, set-returning query -// generated with `tmpl`, and executed in `x`. The `Results` method of `req` -// should return a deep copy since it will be used multiple times to decode each -// value. It returns an error if more than one result set is returned. -func Query[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) ([]T, error) { +// generated with `tmpl`, and executed in `x`. +func QueryRows(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.SQLTemplateIface) (*sql.Rows, error) { if err := req.Validate(); err != nil { return nil, fmt.Errorf("Query: invalid request for template %q: %w", tmpl.Name(), err) @@ -134,6 +132,18 @@ func Query[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Templa RawQuery: rawQuery, } } + return rows, err +} + +// Query uses `req` as input for a single-statement, set-returning query +// generated with `tmpl`, and executed in `x`. The `Results` method of `req` +// should return a deep copy since it will be used multiple times to decode each +// value. It returns an error if more than one result set is returned. +func Query[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) ([]T, error) { + rows, err := QueryRows(ctx, x, tmpl, req) + if err != nil { + return nil, err + } var ret []T for rows.Next() { diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 207d9425992..a1b7af5f282 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -114,26 +114,13 @@ func (r sqlResourceReadRequest) Validate() error { // List type sqlResourceListRequest struct { *sqltemplate.SQLTemplate - Request *resource.ListRequest - Response *resource.ResourceWrapper + Request *resource.ListRequest } func (r sqlResourceListRequest) Validate() error { return nil // TODO } -func (r sqlResourceListRequest) Results() (*resource.ResourceWrapper, error) { - // sqlResourceListRequest is a set-returning query. As such, it - // should not return its *Response, since that will be overwritten in the - // next call to `Scan`, so it needs to return a copy of it. Note, though, - // that it is safe to return the same `Response.Value` since `Scan` - // allocates a new slice of bytes each time. - return &resource.ResourceWrapper{ - ResourceVersion: r.Response.ResourceVersion, - Value: r.Response.Value, - }, nil -} - type historyListRequest struct { ResourceVersion, Limit, Offset int64 Options *resource.ListOptions diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index d40517dfe73..1f70d8905e1 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -159,7 +159,6 @@ func TestQueries(t *testing.T) { }, }, }, - Response: new(resource.ResourceWrapper), }, Expected: expected{ "resource_list_mysql_sqlite.sql": dialects{ diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index 0f765831a25..5e02181d8c0 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -26,7 +26,7 @@ func TestMain(m *testing.M) { testsuite.Run(m) } -func newServer(t *testing.T) sql.Backend { +func newServer(t *testing.T) (sql.Backend, resource.ResourceServer) { t.Helper() dbstore := infraDB.InitTestDB(t) @@ -48,7 +48,15 @@ func newServer(t *testing.T) sql.Backend { err = ret.Init(testutil.NewDefaultTestContext(t)) require.NoError(t, err) - return ret + server, err := resource.NewResourceServer(resource.ResourceServerOptions{ + Backend: ret, + Diagnostics: ret, + Lifecycle: ret, + }) + require.NoError(t, err) + require.NotNil(t, server) + + return ret, server } func TestIntegrationBackendHappyPath(t *testing.T) { @@ -57,57 +65,65 @@ func TestIntegrationBackendHappyPath(t *testing.T) { t.Skip("skipping integration test") } - ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second)) - store := newServer(t) + testUserA := &identity.StaticRequester{ + Type: identity.TypeUser, + Login: "testuser", + UserID: 123, + UserUID: "u123", + OrgRole: identity.RoleAdmin, + IsGrafanaAdmin: true, // can do anything + } + ctx := identity.WithRequester(context.Background(), testUserA) + backend, server := newServer(t) - stream, err := store.WatchWriteEvents(ctx) + stream, err := backend.WatchWriteEvents(context.Background()) // Using a different context to avoid canceling the stream after the DefaultContextTimeout require.NoError(t, err) t.Run("Add 3 resources", func(t *testing.T) { - rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) + rv, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) require.NoError(t, err) require.Equal(t, int64(1), rv) - rv, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) + rv, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) require.NoError(t, err) require.Equal(t, int64(2), rv) - rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED) + rv, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) require.NoError(t, err) require.Equal(t, int64(3), rv) }) t.Run("Update item2", func(t *testing.T) { - rv, err := writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED) + rv, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) require.NoError(t, err) require.Equal(t, int64(4), rv) }) t.Run("Delete item1", func(t *testing.T) { - rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_DELETED) + rv, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED) require.NoError(t, err) require.Equal(t, int64(5), rv) }) t.Run("Read latest item 2", func(t *testing.T) { - resp := store.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")}) - require.NoError(t, err) + resp := backend.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")}) + require.Nil(t, resp.Error) require.Equal(t, int64(4), resp.ResourceVersion) require.Equal(t, "item2 MODIFIED", string(resp.Value)) }) - t.Run("Read early verion of item2", func(t *testing.T) { - resp := store.ReadResource(ctx, &resource.ReadRequest{ + t.Run("Read early version of item2", func(t *testing.T) { + resp := backend.ReadResource(ctx, &resource.ReadRequest{ Key: resourceKey("item2"), ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4 }) - require.NoError(t, err) + require.Nil(t, resp.Error) require.Equal(t, int64(2), resp.ResourceVersion) require.Equal(t, "item2 ADDED", string(resp.Value)) }) t.Run("PrepareList latest", func(t *testing.T) { - resp := store.PrepareList(ctx, &resource.ListRequest{ + resp, err := server.List(ctx, &resource.ListRequest{ Options: &resource.ListOptions{ Key: &resource.ResourceKey{ Namespace: "namespace", @@ -117,6 +133,7 @@ func TestIntegrationBackendHappyPath(t *testing.T) { }, }) require.NoError(t, err) + require.Nil(t, resp.Error) require.Len(t, resp.Items, 2) require.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value)) require.Equal(t, "item3 ADDED", string(resp.Items[1].Value)) @@ -157,42 +174,42 @@ func TestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T) { } ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second)) - store := newServer(t) + backend, _ := newServer(t) // Create a few resources before initing the watch - _, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) + _, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) require.NoError(t, err) // Start the watch - stream, err := store.WatchWriteEvents(ctx) + stream, err := backend.WatchWriteEvents(ctx) require.NoError(t, err) // Create one more event - _, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) + _, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) require.NoError(t, err) require.Equal(t, "item2", (<-stream).Key.Name) } -func TestIntegrationBackendPrepareList(t *testing.T) { +func TestIntegrationBackendList(t *testing.T) { t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") if testing.Short() { t.Skip("skipping integration test") } ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second)) - store := newServer(t) + backend, server := newServer(t) - // Create a few resources before initing the watch - _, _ = writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) // rv=1 - _, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6 - _, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7 - _, _ = writeEvent(ctx, store, "item4", resource.WatchEvent_ADDED) // rv=4 - _, _ = writeEvent(ctx, store, "item5", resource.WatchEvent_ADDED) // rv=5 - _, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED) // rv=6 - _, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_DELETED) // rv=7 - _, _ = writeEvent(ctx, store, "item6", resource.WatchEvent_ADDED) // rv=8 + // Create a few resources before starting the watch + _, _ = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) // rv=1 + _, _ = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6 + _, _ = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7 + _, _ = writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED) // rv=4 + _, _ = writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED) // rv=5 + _, _ = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) // rv=6 + _, _ = writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED) // rv=7 + _, _ = writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED) // rv=8 t.Run("fetch all latest", func(t *testing.T) { - res := store.PrepareList(ctx, &resource.ListRequest{ + res, err := server.List(ctx, &resource.ListRequest{ Options: &resource.ListOptions{ Key: &resource.ResourceKey{ Group: "group", @@ -200,6 +217,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }, }, }) + require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 5) // should be sorted by resource version DESC @@ -213,7 +231,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }) t.Run("list latest first page ", func(t *testing.T) { - res := store.PrepareList(ctx, &resource.ListRequest{ + res, err := server.List(ctx, &resource.ListRequest{ Limit: 3, Options: &resource.ListOptions{ Key: &resource.ResourceKey{ @@ -222,6 +240,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }, }, }) + require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 3) continueToken, err := sql.GetContinueToken(res.NextPageToken) @@ -230,11 +249,10 @@ func TestIntegrationBackendPrepareList(t *testing.T) { require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value)) require.Equal(t, "item5 ADDED", string(res.Items[2].Value)) require.Equal(t, int64(8), continueToken.ResourceVersion) - require.Equal(t, int64(3), continueToken.StartOffset) }) t.Run("list at revision", func(t *testing.T) { - res := store.PrepareList(ctx, &resource.ListRequest{ + res, err := server.List(ctx, &resource.ListRequest{ ResourceVersion: 4, Options: &resource.ListOptions{ Key: &resource.ResourceKey{ @@ -243,6 +261,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }, }, }) + require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 4) require.Equal(t, "item4 ADDED", string(res.Items[0].Value)) @@ -253,7 +272,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }) t.Run("fetch first page at revision with limit", func(t *testing.T) { - res := store.PrepareList(ctx, &resource.ListRequest{ + res, err := server.List(ctx, &resource.ListRequest{ Limit: 3, ResourceVersion: 7, Options: &resource.ListOptions{ @@ -263,6 +282,8 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }, }, }) + require.NoError(t, err) + require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 3) t.Log(res.Items) @@ -273,7 +294,6 @@ func TestIntegrationBackendPrepareList(t *testing.T) { continueToken, err := sql.GetContinueToken(res.NextPageToken) require.NoError(t, err) require.Equal(t, int64(7), continueToken.ResourceVersion) - require.Equal(t, int64(3), continueToken.StartOffset) }) t.Run("fetch second page at revision", func(t *testing.T) { @@ -281,7 +301,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) { ResourceVersion: 8, StartOffset: 2, } - res := store.PrepareList(ctx, &resource.ListRequest{ + res, err := server.List(ctx, &resource.ListRequest{ NextPageToken: continueToken.String(), Limit: 2, Options: &resource.ListOptions{ @@ -291,12 +311,14 @@ func TestIntegrationBackendPrepareList(t *testing.T) { }, }, }) + require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 2) + t.Log(res.Items) require.Equal(t, "item5 ADDED", string(res.Items[0].Value)) require.Equal(t, "item4 ADDED", string(res.Items[1].Value)) - continueToken, err := sql.GetContinueToken(res.NextPageToken) + continueToken, err = sql.GetContinueToken(res.NextPageToken) require.NoError(t, err) require.Equal(t, int64(8), continueToken.ResourceVersion) require.Equal(t, int64(4), continueToken.StartOffset) diff --git a/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql index 893e78a7df7..d733a38f0c7 100644 --- a/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql +++ b/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql @@ -1,4 +1,4 @@ -SELECT kv."resource_version", kv."value" +SELECT kv."resource_version", kv."namespace", kv."name", kv."value" FROM "resource_history" as kv INNER JOIN ( SELECT "namespace", "group", "resource", "name", max("resource_version") AS "resource_version" diff --git a/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql index 0b2edae1e31..2409a8cb40c 100644 --- a/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql +++ b/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql @@ -1,6 +1,5 @@ -SELECT "resource_version", "value" +SELECT "resource_version", "namespace", "name", "value" FROM "resource" WHERE 1 = 1 AND "namespace" = ? ORDER BY "resource_version" DESC - LIMIT ? ;