Add pagination

This commit is contained in:
Georges Chaudy 2024-07-08 22:57:11 +02:00
parent c8f516fc55
commit d305f1baf4
No known key found for this signature in database
GPG Key ID: 0EE887FFCA1DB6EF
4 changed files with 167 additions and 4 deletions

View File

@ -351,6 +351,9 @@ func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) (*
_, span := b.tracer.Start(ctx, trace_prefix+"List")
defer span.End()
if req.ResourceVersion > 0 || req.NextPageToken != "" {
return b.listAtRevision(ctx, req)
}
return b.listLatest(ctx, req)
}
@ -372,6 +375,7 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*r
}
// Fetch one extra row for Limit
lim := req.Limit
if req.Limit > 0 {
req.Limit++
}
@ -397,8 +401,76 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*r
}
rw := *readReq.Response
if req.Limit > 0 && i >= req.Limit {
continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: i - 1}
if lim > 0 && i > lim {
continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: lim}
out.NextPageToken = continueToken.String()
break
}
out.Items = append(out.Items, &rw)
}
return nil
})
return out, err
}
// listAtRevision fetches the resources from the resource_history table at a specific revision.
func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
// Get the RV
rv := req.ResourceVersion
offset := int64(0)
if rv == 0 {
continueToken, err := GetContinueToken(req.NextPageToken)
if err != nil {
return nil, fmt.Errorf("get continue token: %w", err)
}
rv = continueToken.ResourceVersion
offset = continueToken.StartOffset
}
out := &resource.ListResponse{
Items: []*resource.ResourceWrapper{}, // TODO: we could pre-allocate the capacity if we estimate the number of items
ResourceVersion: rv,
}
err := b.sqlDB.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
// Fetch one extra row for Limit
lim := req.Limit
if lim > 0 {
req.Limit++
}
readReq := sqlResourceHistoryListRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect),
Request: &historyListRequest{
ResourceVersion: rv,
Limit: req.Limit,
Offset: offset,
Options: req.Options,
},
Response: new(resource.ResourceWrapper),
}
query, err := sqltemplate.Execute(sqlResourceHistoryList, readReq)
if err != nil {
return fmt.Errorf("execute SQL template to list resources at revision: %w", err)
}
rows, err := tx.QueryContext(ctx, query, readReq.GetArgs()...)
if err != nil {
return fmt.Errorf("list resources at revision: %w", err)
}
for i := int64(1); rows.Next(); i++ {
if ctx.Err() != nil {
return ctx.Err()
}
if err := rows.Scan(readReq.GetScanDest()...); err != nil {
return fmt.Errorf("scan row #%d: %w", i, err)
}
rw := *readReq.Response
if lim > 0 && i > lim {
continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: offset + lim}
out.NextPageToken = continueToken.String()
break
}

View File

@ -218,7 +218,6 @@ func TestBackendPrepareList(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(i), rv)
}
t.Run("fetch all latest", func(t *testing.T) {
res, err := store.PrepareList(ctx, &resource.ListRequest{})
assert.NoError(t, err)
@ -226,7 +225,7 @@ func TestBackendPrepareList(t *testing.T) {
assert.Empty(t, res.NextPageToken)
})
t.Run("fetch first page", func(t *testing.T) {
t.Run("list latest first page ", func(t *testing.T) {
res, err := store.PrepareList(ctx, &resource.ListRequest{
Limit: 5,
})
@ -237,4 +236,49 @@ func TestBackendPrepareList(t *testing.T) {
assert.Equal(t, int64(10), continueToken.ResourceVersion)
assert.Equal(t, int64(5), continueToken.StartOffset)
})
t.Run("list at revision", func(t *testing.T) {
res, err := store.PrepareList(ctx, &resource.ListRequest{
ResourceVersion: 4,
})
assert.NoError(t, err)
assert.Len(t, res.Items, 4)
assert.Equal(t, "initial value 1", string(res.Items[0].Value))
assert.Empty(t, res.NextPageToken)
})
t.Run("fetch first page at revision with limit", func(t *testing.T) {
res, err := store.PrepareList(ctx, &resource.ListRequest{
Limit: 3,
ResourceVersion: 5,
})
assert.NoError(t, err)
assert.Len(t, res.Items, 3)
assert.Equal(t, "initial value 1", string(res.Items[0].Value))
assert.Equal(t, "initial value 2", string(res.Items[1].Value))
assert.Equal(t, "initial value 3", string(res.Items[2].Value))
continueToken, err := GetContinueToken(res.NextPageToken)
assert.NoError(t, err)
assert.Equal(t, int64(5), continueToken.ResourceVersion)
assert.Equal(t, int64(3), continueToken.StartOffset)
})
t.Run("fetch second page at revision", func(t *testing.T) {
continueToken := &ContinueToken{
ResourceVersion: 5,
StartOffset: 2,
}
res, err := store.PrepareList(ctx, &resource.ListRequest{
NextPageToken: continueToken.String(),
Limit: 2,
})
assert.NoError(t, err)
assert.Len(t, res.Items, 2)
assert.Equal(t, "initial value 3", string(res.Items[0].Value))
assert.Equal(t, "initial value 4", string(res.Items[1].Value))
continueToken, err = GetContinueToken(res.NextPageToken)
assert.NoError(t, err)
assert.Equal(t, int64(5), continueToken.ResourceVersion)
assert.Equal(t, int64(4), continueToken.StartOffset)
})
}

View File

@ -0,0 +1,32 @@
SELECT
kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
{{ .Ident "value" | .Into .Response.Value }}
FROM {{ .Ident "resource_history" }} as kv
JOIN (
SELECT {{ .Ident "guid" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }}
FROM {{ .Ident "resource_history" }} AS mkv
WHERE 1 = 1
AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }}
{{ if and .Request.Options .Request.Options.Key }}
{{ if .Request.Options.Key.Namespace }}
AND {{ .Ident "namespace" }} = {{ .Arg .Request.Options.Key.Namespace }}
{{ end }}
{{ if .Request.Options.Key.Group }}
AND {{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }}
{{ end }}
{{ if .Request.Options.Key.Resource }}
AND {{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }}
{{ end }}
{{ if .Request.Options.Key.Name }}
AND {{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }}
{{ end }}
{{ end }}
GROUP BY mkv.{{ .Ident "namespace" }}, mkv.{{ .Ident "group" }}, mkv.{{ .Ident "resource" }}, mkv.{{ .Ident "name" }}
) AS maxkv
ON maxkv.{{ .Ident "guid" }} = kv.{{ .Ident "guid" }}
WHERE kv.{{ .Ident "action" }} != 3
ORDER BY kv.{{ .Ident "resource_version" }} ASC
{{ if (gt .Request.Limit 0) }}
LIMIT {{ .Arg .Request.Offset }}, {{ .Arg .Request.Limit }}
{{ end }}
;

View File

@ -32,6 +32,7 @@ var (
sqlResourceUpdate = mustTemplate("resource_update.sql")
sqlResourceRead = mustTemplate("resource_read.sql")
sqlResourceList = mustTemplate("resource_list.sql")
sqlResourceHistoryList = mustTemplate("resource_history_list.sql")
sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql")
sqlResourceHistoryRead = mustTemplate("resource_history_read.sql")
sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql")
@ -144,6 +145,20 @@ func (r sqlResourceListRequest) Validate() error {
return nil // TODO
}
type historyListRequest struct {
ResourceVersion, Limit, Offset int64
Options *resource.ListOptions
}
type sqlResourceHistoryListRequest struct {
*sqltemplate.SQLTemplate
Request *historyListRequest
Response *resource.ResourceWrapper
}
func (r sqlResourceHistoryListRequest) Validate() error {
return nil // TODO
}
// update RV
type sqlResourceUpdateRVRequest struct {