This commit is contained in:
Georges Chaudy 2024-07-09 15:57:55 +02:00
parent 37e0745d7a
commit 36d9099d1a
No known key found for this signature in database
GPG Key ID: 0EE887FFCA1DB6EF
2 changed files with 6 additions and 7 deletions

View File

@ -354,6 +354,8 @@ func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) (*
_, span := b.tracer.Start(ctx, trace_prefix+"List")
defer span.End()
// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
if req.ResourceVersion > 0 || req.NextPageToken != "" {
return b.listAtRevision(ctx, req)
}
@ -396,7 +398,7 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*r
if err != nil {
return fmt.Errorf("list latest resources: %w", err)
}
defer rows.Close()
defer func() { _ = rows.Close() }()
for i := int64(1); rows.Next(); i++ {
if ctx.Err() != nil {
return ctx.Err()
@ -467,7 +469,7 @@ func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest)
if err != nil {
return fmt.Errorf("list resources at revision: %w", err)
}
defer rows.Close()
defer func() { _ = rows.Close() }()
for i := int64(1); rows.Next(); i++ {
if ctx.Err() != nil {
return ctx.Err()
@ -537,14 +539,13 @@ func fetchLatestRV(ctx context.Context, db db.ContextExecer) (int64, error) {
if err != nil {
return 0, fmt.Errorf("fetch latest rv: %w", err)
}
defer rows.Close()
defer func() { _ = rows.Close() }()
if rows.Next() {
rv := new(int64)
if err := rows.Scan(&rv); err != nil {
return 0, fmt.Errorf("scan since resource version: %w", err)
}
return *rv, nil
}
return 0, fmt.Errorf("no rows")
}
@ -566,7 +567,7 @@ func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource
if err != nil {
return 0, fmt.Errorf("poll for resource history: %w", err)
}
defer rows.Close()
defer func() { _ = rows.Close() }()
next := since
for i := 1; rows.Next(); i++ {
// check if the context is done
@ -603,7 +604,6 @@ func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
// throughput of the system. This is a good candidate for a future optimization.
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) {
// 1. Increment the resource version
res, err := exec(ctx, x, sqlResourceVersionInc, sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d),

View File

@ -45,7 +45,6 @@ func TestBackendHappyPath(t *testing.T) {
rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED)
assert.NoError(t, err)
assert.Equal(t, int64(3), rv)
})
t.Run("Update item2", func(t *testing.T) {