Storage: Return an iterator from backend List (#91185)

This commit is contained in:
Ryan McKinley 2024-07-31 12:05:59 +03:00 committed by GitHub
parent dd9172e738
commit f804b0baa3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 475 additions and 268 deletions

View File

@ -3,6 +3,7 @@ package legacy
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"path/filepath"
"sync"
@ -38,9 +39,6 @@ type dashboardRow struct {
// The folder UID (needed for access control checks)
FolderUID string
// Size (in bytes) of the dashboard payload
Bytes int
// The token we can use that will start a new connection that includes
// this same dashboard
token *continueToken
@ -72,6 +70,16 @@ func NewDashboardAccess(sql db.DB,
}
}
func (a *dashboardSqlAccess) currentRV(ctx context.Context) (int64, error) {
t := time.Now()
max := ""
err := a.sess.Get(ctx, &max, "SELECT MAX(updated) FROM dashboard")
if err == nil && max != "" {
t, err = time.Parse(time.DateTime, max)
}
return t.UnixMilli(), err
}
const selector = `SELECT
dashboard.org_id, dashboard.id,
dashboard.uid, dashboard.folder_uid,
@ -107,9 +115,9 @@ const history = `SELECT
LEFT OUTER JOIN user AS UpdatedUSER ON dashboard_version.created_by = UpdatedUSER.id
WHERE dashboard.is_folder = false`
func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) (*rowsWrapper, int, error) {
func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) (*rowsWrapper, error) {
if len(query.Labels) > 0 {
return nil, 0, fmt.Errorf("labels not yet supported")
return nil, fmt.Errorf("labels not yet supported")
// if query.Requirements.Folder != nil {
// args = append(args, *query.Requirements.Folder)
// sqlcmd = fmt.Sprintf("%s AND dashboard.folder_uid=$%d", sqlcmd, len(args))
@ -119,20 +127,15 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery)
var sqlcmd string
args := []any{query.OrgID}
limit := query.Limit
if limit < 1 {
limit = 15 //
}
if query.GetHistory || query.Version > 0 {
if query.GetTrash {
return nil, 0, fmt.Errorf("trash not included in history table")
return nil, fmt.Errorf("trash not included in history table")
}
sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d\n ", history, len(args))
if query.UID == "" {
return nil, 0, fmt.Errorf("history query must have a UID")
return nil, fmt.Errorf("history query must have a UID")
}
args = append(args, query.UID)
@ -146,8 +149,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery)
sqlcmd = fmt.Sprintf("%s AND dashboard_version.version<$%d", sqlcmd, len(args))
}
args = append(args, (limit + 2)) // add more so we can include a next token
sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard_version.version desc LIMIT $%d", sqlcmd, len(args))
sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard_version.version desc", sqlcmd)
} else {
sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d\n ", selector, len(args))
@ -156,7 +158,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery)
sqlcmd = fmt.Sprintf("%s AND dashboard.uid=$%d", sqlcmd, len(args))
} else if query.LastID > 0 {
args = append(args, query.LastID)
sqlcmd = fmt.Sprintf("%s AND dashboard.id>=$%d", sqlcmd, len(args))
sqlcmd = fmt.Sprintf("%s AND dashboard.id>$%d", sqlcmd, len(args))
}
if query.GetTrash {
sqlcmd = sqlcmd + " AND dashboard.deleted IS NOT NULL"
@ -164,8 +166,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery)
sqlcmd = sqlcmd + " AND dashboard.deleted IS NULL"
}
args = append(args, (limit + 2)) // add more so we can include a next token
sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard.id asc LIMIT $%d", sqlcmd, len(args))
sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard.id asc", sqlcmd)
}
// fmt.Printf("%s // %v\n", sqlcmd, args)
@ -176,7 +177,7 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery)
}
rows = nil
}
return rows, limit, err
return rows, err
}
func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ...any) (*rowsWrapper, error) {
@ -196,25 +197,40 @@ func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ...
}, err
}
var _ resource.ListIterator = (*rowsWrapper)(nil)
type rowsWrapper struct {
a *dashboardSqlAccess
rows *sql.Rows
idx int
total int64
canReadDashboard func(scopes ...string) bool
// Current
row *dashboardRow
err error
}
func (r *rowsWrapper) Close() error {
return r.rows.Close()
}
func (r *rowsWrapper) Next() (*dashboardRow, error) {
func (r *rowsWrapper) Next() bool {
if r.err != nil {
return false
}
var err error
// breaks after first readable value
for r.rows.Next() {
r.idx++
d, err := r.a.scanRow(r.rows)
if d != nil {
r.row, err = r.a.scanRow(r.rows)
if err != nil {
r.err = err
return false
}
if r.row != nil {
d := r.row
// Access control checker
scopes := []string{dashboards.ScopeDashboardsProvider.GetResourceScopeUID(d.Dash.Name)}
if d.FolderUID != "" { // Copied from searchV2... not sure the logic is right
@ -223,13 +239,44 @@ func (r *rowsWrapper) Next() (*dashboardRow, error) {
if !r.canReadDashboard(scopes...) {
continue
}
r.total += int64(d.Bytes)
}
// returns the first folder it can
return d, err
return true
}
return nil, nil
}
return false
}
// ContinueToken implements resource.ListIterator.
func (r *rowsWrapper) ContinueToken() string {
return r.row.token.String()
}
// Error implements resource.ListIterator.
func (r *rowsWrapper) Error() error {
return r.err
}
// Name implements resource.ListIterator.
func (r *rowsWrapper) Name() string {
return r.row.Dash.Name
}
// Namespace implements resource.ListIterator.
func (r *rowsWrapper) Namespace() string {
return r.row.Dash.Namespace
}
// ResourceVersion implements resource.ListIterator.
func (r *rowsWrapper) ResourceVersion() int64 {
return r.row.RV
}
// Value implements resource.ListIterator.
func (r *rowsWrapper) Value() []byte {
b, err := json.Marshal(r.row.Dash)
r.err = err
return b
}
func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
@ -318,14 +365,14 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
})
}
row.Bytes = len(data)
if row.Bytes > 0 {
if len(data) > 0 {
err = dash.Spec.UnmarshalJSON(data)
if err != nil {
return row, err
}
dash.Spec.Set("id", dashboard_id) // add it so we can get it from the body later
}
// add it so we can get it from the body later
dash.Spec.Set("id", dashboard_id)
}
return row, err
}

View File

@ -101,9 +101,8 @@ func (a *dashboardSqlAccess) WriteEvent(ctx context.Context, event resource.Writ
return rv, err
}
// Read implements ResourceStoreServer.
func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid string, v int64) (*dashboard.Dashboard, int64, error) {
rows, _, err := a.getRows(ctx, &DashboardQuery{
rows, err := a.getRows(ctx, &DashboardQuery{
OrgID: orgId,
UID: uid,
Limit: 2, // will only be one!
@ -114,11 +113,13 @@ func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid
}
defer func() { _ = rows.Close() }()
row, err := rows.Next()
if err != nil || row == nil {
return nil, 0, err
if rows.Next() {
row := rows.row
if row != nil {
return row.Dash, row.RV, rows.err
}
return row.Dash, row.RV, nil
}
return nil, 0, rows.err
}
// Read implements ResourceStoreServer.
@ -157,26 +158,22 @@ func (a *dashboardSqlAccess) ReadResource(ctx context.Context, req *resource.Rea
}
// List implements AppendingStore.
func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.ListRequest) *resource.ListResponse {
list := &resource.ListResponse{}
func (a *dashboardSqlAccess) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
opts := req.Options
info, err := request.ParseNamespace(opts.Key.Namespace)
if err == nil {
err = isDashboardKey(opts.Key, false)
}
if err != nil {
list.Error = resource.AsErrorResult(err)
return list
return 0, err
}
token, err := readContinueToken(req.NextPageToken)
if err != nil {
list.Error = resource.AsErrorResult(err)
return list
return 0, err
}
if token.orgId > 0 && token.orgId != info.OrgID {
list.Error = resource.NewBadRequestError("token and orgID mismatch")
return list
return 0, fmt.Errorf("token and orgID mismatch")
}
query := &DashboardQuery{
@ -187,40 +184,20 @@ func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.List
Labels: req.Options.Labels,
}
rows, limit, err := a.getRows(ctx, query)
listRV, err := a.currentRV(ctx)
if err != nil {
list.Error = resource.AsErrorResult(err)
return list
return 0, err
}
defer func() { _ = rows.Close() }()
totalSize := 0
for {
row, err := rows.Next()
if err != nil || row == nil {
list.Error = resource.AsErrorResult(err)
return list
rows, err := a.getRows(ctx, query)
if rows != nil {
defer func() {
_ = rows.Close()
}()
}
totalSize += row.Bytes
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) {
// if query.Requirements.Folder != nil {
// row.token.folder = *query.Requirements.Folder
// }
list.NextPageToken = row.token.String() // will skip this one but start here next time
return list
}
// TODO -- make it smaller and stick the body as an annotation...
val, err := json.Marshal(row.Dash)
if err != nil {
list.Error = resource.AsErrorResult(err)
return list
}
list.Items = append(list.Items, &resource.ResourceWrapper{
ResourceVersion: row.RV,
Value: val,
})
err = cb(rows)
}
return listRV, err
}
// Watch implements AppendingStore.
@ -290,7 +267,7 @@ func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryR
query.GetHistory = true
}
rows, limit, err := a.getRows(ctx, query)
rows, err := a.getRows(ctx, query)
if err != nil {
return nil, err
}
@ -298,21 +275,11 @@ func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryR
totalSize := 0
list := &resource.HistoryResponse{}
for {
row, err := rows.Next()
if err != nil || row == nil {
return list, err
}
totalSize += row.Bytes
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) {
// if query.Requirements.Folder != nil {
// row.token.folder = *query.Requirements.Folder
// }
row.token.id = getVersionFromRV(row.RV) // Use the version as the increment
list.NextPageToken = row.token.String() // will skip this one but start here next time
for rows.Next() {
if rows.err != nil || rows.row == nil {
return list, err
}
row := rows.row
partial := &metav1.PartialObjectMetadata{
ObjectMeta: row.Dash.ObjectMeta,
@ -323,17 +290,25 @@ func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryR
if err != nil {
return list, err
}
full, err := json.Marshal(row.Dash.Spec)
if err != nil {
totalSize += len(rows.Value())
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= query.Limit) {
// if query.Requirements.Folder != nil {
// row.token.folder = *query.Requirements.Folder
// }
row.token.id = getVersionFromRV(row.RV) // Use the version as the increment
list.NextPageToken = row.token.String() // will skip this one but start here next time
return list, err
}
list.Items = append(list.Items, &resource.ResourceMeta{
ResourceVersion: row.RV,
PartialObjectMeta: val,
Size: int32(len(full)),
Size: int32(len(rows.Value())),
Hash: "??", // hash the full?
})
}
return list, err
}
// Used for efficient provisioning

View File

@ -209,32 +209,13 @@ func isDeletedMarker(raw []byte) bool {
return false
}
func (s *cdkBackend) PrepareList(ctx context.Context, req *ListRequest) *ListResponse {
func (s *cdkBackend) ListIterator(ctx context.Context, req *ListRequest, cb func(ListIterator) error) (int64, error) {
resources, err := buildTree(ctx, s, req.Options.Key)
if err != nil {
return &ListResponse{
Error: AsErrorResult(err),
return 0, err
}
}
rsp := &ListResponse{
ResourceVersion: s.rv.Load(),
}
for _, item := range resources {
latest := item.versions[0]
raw, err := s.bucket.ReadAll(ctx, latest.key)
if err != nil {
rsp.Error = AsErrorResult(err)
return rsp
}
if !isDeletedMarker(raw) {
rsp.Items = append(rsp.Items, &ResourceWrapper{
ResourceVersion: latest.rv,
Value: raw,
})
}
}
return rsp
err = cb(resources)
return resources.listRV, err
}
func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
@ -264,9 +245,82 @@ type cdkVersion struct {
key string
}
func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) ([]cdkResource, error) {
byPrefix := make(map[string]*cdkResource)
type cdkListIterator struct {
bucket *blob.Bucket
ctx context.Context
err error
listRV int64
resources []cdkResource
index int
currentRV int64
currentKey string
currentVal []byte
}
// Next implements ListIterator.
func (c *cdkListIterator) Next() bool {
if c.err != nil {
return false
}
for {
c.currentVal = nil
c.index += 1
if c.index >= len(c.resources) {
return false
}
item := c.resources[c.index]
latest := item.versions[0]
raw, err := c.bucket.ReadAll(c.ctx, latest.key)
if err != nil {
c.err = err
return false
}
if !isDeletedMarker(raw) {
c.currentRV = latest.rv
c.currentKey = latest.key
c.currentVal = raw
return true
}
}
}
// Error implements ListIterator.
func (c *cdkListIterator) Error() error {
return c.err
}
// ResourceVersion implements ListIterator.
func (c *cdkListIterator) ResourceVersion() int64 {
return c.currentRV
}
// Value implements ListIterator.
func (c *cdkListIterator) Value() []byte {
return c.currentVal
}
// ContinueToken implements ListIterator.
func (c *cdkListIterator) ContinueToken() string {
return fmt.Sprintf("index:%d/key:%s", c.index, c.currentKey)
}
// Name implements ListIterator.
func (c *cdkListIterator) Name() string {
return c.currentKey // TODO (parse name from key)
}
// Namespace implements ListIterator.
func (c *cdkListIterator) Namespace() string {
return c.currentKey // TODO (parse namespace from key)
}
var _ ListIterator = (*cdkListIterator)(nil)
func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) (*cdkListIterator, error) {
byPrefix := make(map[string]*cdkResource)
path := s.getPath(key, 0)
iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive
for {
@ -310,5 +364,11 @@ func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) ([]cdkResou
return a > b
})
return resources, nil
return &cdkListIterator{
ctx: ctx,
bucket: s.bucket,
resources: resources,
listRV: s.rv.Load(),
index: -1, // must call next first
}, nil
}

View File

@ -27,6 +27,30 @@ type ResourceServer interface {
LifecycleHooks
}
type ListIterator interface {
Next() bool // sql.Rows
// Iterator error (if exts)
Error() error
// The token that can be used to start iterating *after* this item
ContinueToken() string
// ResourceVersion of the current item
ResourceVersion() int64
// Namespace of the current item
// Used for fast(er) authz filtering
Namespace() string
// Name of the current item
// Used for fast(er) authz filtering
Name() string
// Value for the current item
Value() []byte
}
// The StorageBackend is an internal abstraction that supports interacting with
// the underlying raw storage medium. This interface is never exposed directly,
// it is provided by concrete instances that actually write values.
@ -39,12 +63,12 @@ type StorageBackend interface {
// Read a resource from storage optionally at an explicit version
ReadResource(context.Context, *ReadRequest) *ReadResponse
// When the ResourceServer executes a List request, it will first
// When the ResourceServer executes a List request, this iterator will
// query the backend for potential results. All results will be
// checked against the kubernetes requirements before finally returning
// results. The list options can be used to improve performance
// but are the the final answer.
PrepareList(context.Context, *ListRequest) *ListResponse
ListIterator(context.Context, *ListRequest, func(ListIterator) error) (int64, error)
// Get all events from the store
// For HA setups, this will be more events than the local WriteEvent above!
@ -441,12 +465,53 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
if err := s.Init(ctx); err != nil {
return nil, err
}
if req.Limit < 1 {
req.Limit = 50 // default max 50 items in a page
}
maxPageBytes := 1024 * 1024 * 2 // 2mb/page
pageBytes := 0
rsp := &ListResponse{}
rv, err := s.backend.ListIterator(ctx, req, func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
return err
}
rsp := s.backend.PrepareList(ctx, req)
// Status???
// TODO: add authz filters
item := &ResourceWrapper{
ResourceVersion: iter.ResourceVersion(),
Value: iter.Value(),
}
pageBytes += len(item.Value)
rsp.Items = append(rsp.Items, item)
if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes {
t := iter.ContinueToken()
if iter.Next() {
rsp.NextPageToken = t
}
break
}
}
return nil
})
if err != nil {
rsp.Error = AsErrorResult(err)
return rsp, nil
}
if rv < 1 {
rsp.Error = &ErrorResult{
Code: http.StatusInternalServerError,
Message: fmt.Sprintf("invalid resource version for list: %v", rv),
}
return rsp, nil
}
rsp.ResourceVersion = rv
return rsp, err
}
func (s *server) initWatcher() error {
var err error
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {

View File

@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"math"
"sync"
"time"
@ -17,6 +18,7 @@ import (
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
const trace_prefix = "sql.resource."
@ -323,14 +325,12 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *
return &res.ReadResponse
}
func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) *resource.ListResponse {
func (b *backend) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
_, span := b.tracer.Start(ctx, trace_prefix+"List")
defer span.End()
if req.Options == nil || req.Options.Key.Group == "" || req.Options.Key.Resource == "" {
return &resource.ListResponse{
Error: resource.NewBadRequestError("missing group or resource"),
}
return 0, fmt.Errorf("missing group or resource")
}
// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
@ -338,21 +338,81 @@ func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) *r
// TODO: add support for RemainingItemCount
if req.ResourceVersion > 0 || req.NextPageToken != "" {
return b.listAtRevision(ctx, req)
return b.listAtRevision(ctx, req, cb)
}
return b.listLatest(ctx, req)
return b.listLatest(ctx, req, cb)
}
type listIter struct {
rows *sql.Rows
offset int64
listRV int64
// any error
err error
// The row
rv int64
value []byte
namespace string
name string
}
// ContinueToken implements resource.ListIterator.
func (l *listIter) ContinueToken() string {
return ContinueToken{ResourceVersion: l.listRV, StartOffset: l.offset}.String()
}
// Error implements resource.ListIterator.
func (l *listIter) Error() error {
return l.err
}
// Name implements resource.ListIterator.
func (l *listIter) Name() string {
return l.name
}
// Namespace implements resource.ListIterator.
func (l *listIter) Namespace() string {
return l.namespace
}
// ResourceVersion implements resource.ListIterator.
func (l *listIter) ResourceVersion() int64 {
return l.rv
}
// Value implements resource.ListIterator.
func (l *listIter) Value() []byte {
return l.value
}
// Next implements resource.ListIterator.
func (l *listIter) Next() bool {
if l.rows.Next() {
l.offset++
l.err = l.rows.Scan(&l.rv, &l.namespace, &l.name, &l.value)
return true
}
return false
}
var _ resource.ListIterator = (*listIter)(nil)
// listLatest fetches the resources from the resource table.
func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) *resource.ListResponse {
out := &resource.ListResponse{
ResourceVersion: 0,
func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
if req.NextPageToken != "" {
return 0, fmt.Errorf("only works for the first page")
}
if req.ResourceVersion > 0 {
return 0, fmt.Errorf("only works for the 'latest' resource version")
}
iter := &listIter{}
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
out.ResourceVersion, err = fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource)
iter.listRV, err = fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource)
if err != nil {
return err
}
@ -360,96 +420,78 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) *re
listReq := sqlResourceListRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: new(resource.ListRequest),
Response: new(resource.ResourceWrapper),
}
listReq.Request = proto.Clone(req).(*resource.ListRequest)
if req.Limit > 0 {
listReq.Request.Limit++ // fetch one extra row for Limit
}
items, err := dbutil.Query(ctx, tx, sqlResourceList, listReq)
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceList, listReq)
if rows != nil {
defer func() {
if err := rows.Close(); err != nil {
b.log.Warn("listLatest error closing rows", "error", err)
}
}()
}
if err != nil {
return fmt.Errorf("list latest resources: %w", err)
return err
}
if 0 < req.Limit && int(req.Limit) < len(items) {
// remove the additional item we added synthetically above
clear(items[req.Limit:])
items = items[:req.Limit]
out.NextPageToken = ContinueToken{
ResourceVersion: out.ResourceVersion,
StartOffset: req.Limit,
}.String()
}
out.Items = items
return nil
iter.rows = rows
return cb(iter)
})
if err != nil {
out.Error = resource.AsErrorResult(err)
}
return out
return iter.listRV, err
}
// listAtRevision fetches the resources from the resource_history table at a specific revision.
func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest) *resource.ListResponse {
func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
// Get the RV
rv := req.ResourceVersion
offset := int64(0)
iter := &listIter{listRV: req.ResourceVersion}
if req.NextPageToken != "" {
continueToken, err := GetContinueToken(req.NextPageToken)
if err != nil {
return &resource.ListResponse{
Error: resource.AsErrorResult(fmt.Errorf("get continue token: %w", err)),
}
}
rv = continueToken.ResourceVersion
offset = continueToken.StartOffset
return 0, fmt.Errorf("get continue token: %w", err)
}
iter.listRV = continueToken.ResourceVersion
iter.offset = continueToken.StartOffset
out := &resource.ListResponse{
ResourceVersion: rv,
if req.ResourceVersion != 0 && req.ResourceVersion != iter.listRV {
return 0, apierrors.NewBadRequest("request resource version does not math token")
}
}
if iter.listRV < 1 {
return 0, apierrors.NewBadRequest("expecting an explicit resource version query")
}
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
limit := int64(0) // ignore limit
if iter.offset > 0 {
limit = math.MaxInt64 // a limit is required for offset
}
listReq := sqlResourceHistoryListRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: &historyListRequest{
ResourceVersion: rv,
Limit: req.Limit,
Offset: offset,
ResourceVersion: iter.listRV,
Limit: limit,
Offset: iter.offset,
Options: req.Options,
},
Response: new(resource.ResourceWrapper),
}
if listReq.Request.Limit > 0 {
listReq.Request.Limit++ // fetch one extra row for Limit
}
items, err := dbutil.Query(ctx, tx, sqlResourceHistoryList, listReq)
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryList, listReq)
if rows != nil {
defer func() {
if err := rows.Close(); err != nil {
b.log.Warn("listAtRevision error closing rows", "error", err)
}
}()
}
if err != nil {
return fmt.Errorf("list resources at revision: %w", err)
return err
}
if 0 < req.Limit && int(req.Limit) < len(items) {
// remove the additional item we added synthetically above
clear(items[req.Limit:])
items = items[:req.Limit]
out.NextPageToken = ContinueToken{
ResourceVersion: out.ResourceVersion,
StartOffset: req.Limit + offset,
}.String()
}
out.Items = items
return nil
iter.rows = rows
return cb(iter)
})
if err != nil {
out.Error = resource.AsErrorResult(err)
}
return out
return iter.listRV, err
}
func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
@ -616,7 +658,7 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
// throughput of the system. This is a good candidate for a future optimization.
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) {
// TODO: refactor this code to run in a multi-statement transaction in order to minimise the number of roundtrips.
// TODO: refactor this code to run in a multi-statement transaction in order to minimize the number of round trips.
// 1 Lock the row for update
rv, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d),
@ -656,6 +698,6 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
return 0, fmt.Errorf("increase resource version: %w", err)
}
// 3. Retun the incremended value
// 3. Return the incremented value
return nextRV, nil
}

View File

@ -1,6 +1,8 @@
SELECT
kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
kv.{{ .Ident "value" | .Into .Response.Value }}
kv.{{ .Ident "resource_version" }},
kv.{{ .Ident "namespace" }},
kv.{{ .Ident "name" }},
kv.{{ .Ident "value" }}
FROM {{ .Ident "resource_history" }} as kv
INNER JOIN (
SELECT {{ .Ident "namespace" }}, {{ .Ident "group" }}, {{ .Ident "resource" }}, {{ .Ident "name" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }}

View File

@ -1,6 +1,8 @@
SELECT
{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
{{ .Ident "value" | .Into .Response.Value }}
{{ .Ident "resource_version" }},
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "value" }}
FROM {{ .Ident "resource" }}
WHERE 1 = 1
{{ if and .Request.Options .Request.Options.Key }}
@ -18,7 +20,4 @@ SELECT
{{ end }}
{{ end }}
ORDER BY {{ .Ident "resource_version" }} DESC
{{ if (gt .Request.Limit 0) }}
LIMIT {{ .Arg .Request.Limit }}
{{ end }}
;

View File

@ -107,10 +107,8 @@ func Exec(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req
}
// Query uses `req` as input for a single-statement, set-returning query
// generated with `tmpl`, and executed in `x`. The `Results` method of `req`
// should return a deep copy since it will be used multiple times to decode each
// value. It returns an error if more than one result set is returned.
func Query[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) ([]T, error) {
// generated with `tmpl`, and executed in `x`.
func QueryRows(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.SQLTemplateIface) (*sql.Rows, error) {
if err := req.Validate(); err != nil {
return nil, fmt.Errorf("Query: invalid request for template %q: %w",
tmpl.Name(), err)
@ -134,6 +132,18 @@ func Query[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Templa
RawQuery: rawQuery,
}
}
return rows, err
}
// Query uses `req` as input for a single-statement, set-returning query
// generated with `tmpl`, and executed in `x`. The `Results` method of `req`
// should return a deep copy since it will be used multiple times to decode each
// value. It returns an error if more than one result set is returned.
func Query[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) ([]T, error) {
rows, err := QueryRows(ctx, x, tmpl, req)
if err != nil {
return nil, err
}
var ret []T
for rows.Next() {

View File

@ -115,25 +115,12 @@ func (r sqlResourceReadRequest) Validate() error {
type sqlResourceListRequest struct {
*sqltemplate.SQLTemplate
Request *resource.ListRequest
Response *resource.ResourceWrapper
}
func (r sqlResourceListRequest) Validate() error {
return nil // TODO
}
func (r sqlResourceListRequest) Results() (*resource.ResourceWrapper, error) {
// sqlResourceListRequest is a set-returning query. As such, it
// should not return its *Response, since that will be overwritten in the
// next call to `Scan`, so it needs to return a copy of it. Note, though,
// that it is safe to return the same `Response.Value` since `Scan`
// allocates a new slice of bytes each time.
return &resource.ResourceWrapper{
ResourceVersion: r.Response.ResourceVersion,
Value: r.Response.Value,
}, nil
}
type historyListRequest struct {
ResourceVersion, Limit, Offset int64
Options *resource.ListOptions

View File

@ -159,7 +159,6 @@ func TestQueries(t *testing.T) {
},
},
},
Response: new(resource.ResourceWrapper),
},
Expected: expected{
"resource_list_mysql_sqlite.sql": dialects{

View File

@ -26,7 +26,7 @@ func TestMain(m *testing.M) {
testsuite.Run(m)
}
func newServer(t *testing.T) sql.Backend {
func newServer(t *testing.T) (sql.Backend, resource.ResourceServer) {
t.Helper()
dbstore := infraDB.InitTestDB(t)
@ -48,7 +48,15 @@ func newServer(t *testing.T) sql.Backend {
err = ret.Init(testutil.NewDefaultTestContext(t))
require.NoError(t, err)
return ret
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
Backend: ret,
Diagnostics: ret,
Lifecycle: ret,
})
require.NoError(t, err)
require.NotNil(t, server)
return ret, server
}
func TestIntegrationBackendHappyPath(t *testing.T) {
@ -57,57 +65,65 @@ func TestIntegrationBackendHappyPath(t *testing.T) {
t.Skip("skipping integration test")
}
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
store := newServer(t)
testUserA := &identity.StaticRequester{
Type: identity.TypeUser,
Login: "testuser",
UserID: 123,
UserUID: "u123",
OrgRole: identity.RoleAdmin,
IsGrafanaAdmin: true, // can do anything
}
ctx := identity.WithRequester(context.Background(), testUserA)
backend, server := newServer(t)
stream, err := store.WatchWriteEvents(ctx)
stream, err := backend.WatchWriteEvents(context.Background()) // Using a different context to avoid canceling the stream after the DefaultContextTimeout
require.NoError(t, err)
t.Run("Add 3 resources", func(t *testing.T) {
rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)
rv, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
require.NoError(t, err)
require.Equal(t, int64(1), rv)
rv, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)
rv, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
require.NoError(t, err)
require.Equal(t, int64(2), rv)
rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED)
rv, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
require.NoError(t, err)
require.Equal(t, int64(3), rv)
})
t.Run("Update item2", func(t *testing.T) {
rv, err := writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED)
rv, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED)
require.NoError(t, err)
require.Equal(t, int64(4), rv)
})
t.Run("Delete item1", func(t *testing.T) {
rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_DELETED)
rv, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED)
require.NoError(t, err)
require.Equal(t, int64(5), rv)
})
t.Run("Read latest item 2", func(t *testing.T) {
resp := store.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")})
require.NoError(t, err)
resp := backend.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")})
require.Nil(t, resp.Error)
require.Equal(t, int64(4), resp.ResourceVersion)
require.Equal(t, "item2 MODIFIED", string(resp.Value))
})
t.Run("Read early verion of item2", func(t *testing.T) {
resp := store.ReadResource(ctx, &resource.ReadRequest{
t.Run("Read early version of item2", func(t *testing.T) {
resp := backend.ReadResource(ctx, &resource.ReadRequest{
Key: resourceKey("item2"),
ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4
})
require.NoError(t, err)
require.Nil(t, resp.Error)
require.Equal(t, int64(2), resp.ResourceVersion)
require.Equal(t, "item2 ADDED", string(resp.Value))
})
t.Run("PrepareList latest", func(t *testing.T) {
resp := store.PrepareList(ctx, &resource.ListRequest{
resp, err := server.List(ctx, &resource.ListRequest{
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
Namespace: "namespace",
@ -117,6 +133,7 @@ func TestIntegrationBackendHappyPath(t *testing.T) {
},
})
require.NoError(t, err)
require.Nil(t, resp.Error)
require.Len(t, resp.Items, 2)
require.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value))
require.Equal(t, "item3 ADDED", string(resp.Items[1].Value))
@ -157,42 +174,42 @@ func TestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T) {
}
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
store := newServer(t)
backend, _ := newServer(t)
// Create a few resources before initing the watch
_, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)
_, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
require.NoError(t, err)
// Start the watch
stream, err := store.WatchWriteEvents(ctx)
stream, err := backend.WatchWriteEvents(ctx)
require.NoError(t, err)
// Create one more event
_, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
require.NoError(t, err)
require.Equal(t, "item2", (<-stream).Key.Name)
}
func TestIntegrationBackendPrepareList(t *testing.T) {
func TestIntegrationBackendList(t *testing.T) {
t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this")
if testing.Short() {
t.Skip("skipping integration test")
}
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
store := newServer(t)
backend, server := newServer(t)
// Create a few resources before initing the watch
_, _ = writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) // rv=1
_, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6
_, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7
_, _ = writeEvent(ctx, store, "item4", resource.WatchEvent_ADDED) // rv=4
_, _ = writeEvent(ctx, store, "item5", resource.WatchEvent_ADDED) // rv=5
_, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED) // rv=6
_, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_DELETED) // rv=7
_, _ = writeEvent(ctx, store, "item6", resource.WatchEvent_ADDED) // rv=8
// Create a few resources before starting the watch
_, _ = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) // rv=1
_, _ = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6
_, _ = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7
_, _ = writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED) // rv=4
_, _ = writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED) // rv=5
_, _ = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) // rv=6
_, _ = writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED) // rv=7
_, _ = writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED) // rv=8
t.Run("fetch all latest", func(t *testing.T) {
res := store.PrepareList(ctx, &resource.ListRequest{
res, err := server.List(ctx, &resource.ListRequest{
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
Group: "group",
@ -200,6 +217,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
},
},
})
require.NoError(t, err)
require.Nil(t, res.Error)
require.Len(t, res.Items, 5)
// should be sorted by resource version DESC
@ -213,7 +231,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
})
t.Run("list latest first page ", func(t *testing.T) {
res := store.PrepareList(ctx, &resource.ListRequest{
res, err := server.List(ctx, &resource.ListRequest{
Limit: 3,
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
@ -222,6 +240,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
},
},
})
require.NoError(t, err)
require.Nil(t, res.Error)
require.Len(t, res.Items, 3)
continueToken, err := sql.GetContinueToken(res.NextPageToken)
@ -230,11 +249,10 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
require.Equal(t, "item5 ADDED", string(res.Items[2].Value))
require.Equal(t, int64(8), continueToken.ResourceVersion)
require.Equal(t, int64(3), continueToken.StartOffset)
})
t.Run("list at revision", func(t *testing.T) {
res := store.PrepareList(ctx, &resource.ListRequest{
res, err := server.List(ctx, &resource.ListRequest{
ResourceVersion: 4,
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
@ -243,6 +261,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
},
},
})
require.NoError(t, err)
require.Nil(t, res.Error)
require.Len(t, res.Items, 4)
require.Equal(t, "item4 ADDED", string(res.Items[0].Value))
@ -253,7 +272,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
})
t.Run("fetch first page at revision with limit", func(t *testing.T) {
res := store.PrepareList(ctx, &resource.ListRequest{
res, err := server.List(ctx, &resource.ListRequest{
Limit: 3,
ResourceVersion: 7,
Options: &resource.ListOptions{
@ -263,6 +282,8 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
},
},
})
require.NoError(t, err)
require.NoError(t, err)
require.Nil(t, res.Error)
require.Len(t, res.Items, 3)
t.Log(res.Items)
@ -273,7 +294,6 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
continueToken, err := sql.GetContinueToken(res.NextPageToken)
require.NoError(t, err)
require.Equal(t, int64(7), continueToken.ResourceVersion)
require.Equal(t, int64(3), continueToken.StartOffset)
})
t.Run("fetch second page at revision", func(t *testing.T) {
@ -281,7 +301,7 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
ResourceVersion: 8,
StartOffset: 2,
}
res := store.PrepareList(ctx, &resource.ListRequest{
res, err := server.List(ctx, &resource.ListRequest{
NextPageToken: continueToken.String(),
Limit: 2,
Options: &resource.ListOptions{
@ -291,12 +311,14 @@ func TestIntegrationBackendPrepareList(t *testing.T) {
},
},
})
require.NoError(t, err)
require.Nil(t, res.Error)
require.Len(t, res.Items, 2)
t.Log(res.Items)
require.Equal(t, "item5 ADDED", string(res.Items[0].Value))
require.Equal(t, "item4 ADDED", string(res.Items[1].Value))
continueToken, err := sql.GetContinueToken(res.NextPageToken)
continueToken, err = sql.GetContinueToken(res.NextPageToken)
require.NoError(t, err)
require.Equal(t, int64(8), continueToken.ResourceVersion)
require.Equal(t, int64(4), continueToken.StartOffset)

View File

@ -1,4 +1,4 @@
SELECT kv."resource_version", kv."value"
SELECT kv."resource_version", kv."namespace", kv."name", kv."value"
FROM "resource_history" as kv
INNER JOIN (
SELECT "namespace", "group", "resource", "name", max("resource_version") AS "resource_version"

View File

@ -1,6 +1,5 @@
SELECT "resource_version", "value"
SELECT "resource_version", "namespace", "name", "value"
FROM "resource"
WHERE 1 = 1 AND "namespace" = ?
ORDER BY "resource_version" DESC
LIMIT ?
;