From e788df921c0b582160b61ec655f26c31ff9b6ad3 Mon Sep 17 00:00:00 2001 From: Diego Augusto Molina Date: Tue, 20 Aug 2024 09:29:06 -0300 Subject: [PATCH] Storage: always use transactions and make them readonly when possible (#92110) * always use transactions and make them readonly when possible * fix linters * fix reference --- pkg/storage/unified/sql/backend.go | 112 ++++++++++++++--------------- pkg/storage/unified/sql/queries.go | 36 +++++++++- 2 files changed, 85 insertions(+), 63 deletions(-) diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index d9157131ba9..75ec7806eb6 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -301,7 +301,7 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) * // TODO: validate key ? - readReq := sqlResourceReadRequest{ + readReq := &sqlResourceReadRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: req, readResponse: new(readResponse), @@ -313,7 +313,12 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) * sr = sqlResourceHistoryRead } - res, err := dbutil.QueryRow(ctx, b.db, sr, readReq) + var res *readResponse + err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { + var err error + res, err = dbutil.QueryRow(ctx, tx, sr, readReq) + return err + }) if errors.Is(err, sql.ErrNoRows) { return &resource.ReadResponse{ Error: resource.NewNotFoundError(req.Key), @@ -552,33 +557,28 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan // listLatestRVs returns the latest resource version for each (Group, Resource) pair. func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) { - since := groupResourceRV{} - reqRVs := sqlResourceVersionListRequest{ - SQLTemplate: sqltemplate.New(b.dialect), - groupResourceVersion: new(groupResourceVersion), - } - query, err := sqltemplate.Execute(sqlResourceVersionList, reqRVs) - if err != nil { - return nil, fmt.Errorf("execute SQL template to get the latest resource version: %w", err) - } - rows, err := b.db.QueryContext(ctx, query, reqRVs.GetArgs()...) - if err != nil { - return nil, fmt.Errorf("fetching recent resource versions: %w", err) - } - defer func() { _ = rows.Close() }() + var grvs []*groupResourceVersion + err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { + var err error + grvs, err = dbutil.Query(ctx, tx, sqlResourceVersionList, &sqlResourceVersionListRequest{ + SQLTemplate: sqltemplate.New(b.dialect), + groupResourceVersion: new(groupResourceVersion), + }) - for rows.Next() { - if err := rows.Scan(reqRVs.GetScanDest()...); err != nil { - return nil, err - } - if _, ok := since[reqRVs.Group]; !ok { - since[reqRVs.Group] = map[string]int64{} - } - if _, ok := since[reqRVs.Group][reqRVs.Resource]; !ok { - since[reqRVs.Group] = map[string]int64{} - } - since[reqRVs.Group][reqRVs.Resource] = reqRVs.ResourceVersion + return err + }) + if err != nil { + return nil, err } + + since := groupResourceRV{} + for _, grv := range grvs { + if since[grv.Group] == nil { + since[grv.Group] = map[string]int64{} + } + since[grv.Group][grv.Resource] = grv.ResourceVersion + } + return since, nil } @@ -603,52 +603,44 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64, ctx, span := b.tracer.Start(ctx, trace_prefix+"poll") defer span.End() - pollReq := sqlResourceHistoryPollRequest{ - SQLTemplate: sqltemplate.New(b.dialect), - Resource: res, - Group: grp, - SinceResourceVersion: since, - Response: &historyPollResponse{}, - } - query, err := sqltemplate.Execute(sqlResourceHistoryPoll, pollReq) + var records []*historyPollResponse + err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { + var err error + records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{ + SQLTemplate: sqltemplate.New(b.dialect), + Resource: res, + Group: grp, + SinceResourceVersion: since, + Response: &historyPollResponse{}, + }) + return err + }) if err != nil { - return since, fmt.Errorf("execute SQL template to poll for resource history: %w", err) - } - rows, err := b.db.QueryContext(ctx, query, pollReq.GetArgs()...) - if err != nil { - return since, fmt.Errorf("poll for resource history: %w", err) + return 0, fmt.Errorf("poll history: %w", err) } - defer func() { _ = rows.Close() }() - nextRV := since - for rows.Next() { - // check if the context is done - if ctx.Err() != nil { - return nextRV, ctx.Err() - } - if err := rows.Scan(pollReq.GetScanDest()...); err != nil { - return nextRV, fmt.Errorf("scan row polling for resource history: %w", err) - } - resp := pollReq.Response - if resp.Key.Group == "" || resp.Key.Resource == "" || resp.Key.Name == "" { + var nextRV int64 + for _, rec := range records { + if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" { return nextRV, fmt.Errorf("missing key in response") } - nextRV = resp.ResourceVersion + nextRV = rec.ResourceVersion stream <- &resource.WrittenEvent{ WriteEvent: resource.WriteEvent{ - Value: resp.Value, + Value: rec.Value, Key: &resource.ResourceKey{ - Namespace: resp.Key.Namespace, - Group: resp.Key.Group, - Resource: resp.Key.Resource, - Name: resp.Key.Name, + Namespace: rec.Key.Namespace, + Group: rec.Key.Group, + Resource: rec.Key.Resource, + Name: rec.Key.Name, }, - Type: resource.WatchEvent_Type(resp.Action), + Type: resource.WatchEvent_Type(rec.Action), }, - ResourceVersion: resp.ResourceVersion, + ResourceVersion: rec.ResourceVersion, // Timestamp: , // TODO: add timestamp } } + return nextRV, nil } diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 99f126672fb..893169c3f3a 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -79,6 +79,7 @@ func (r *historyPollResponse) Results() (*historyPollResponse, error) { } type groupResourceRV map[string]map[string]int64 + type sqlResourceHistoryPollRequest struct { sqltemplate.SQLTemplate Resource string @@ -87,10 +88,24 @@ type sqlResourceHistoryPollRequest struct { Response *historyPollResponse } -func (r sqlResourceHistoryPollRequest) Validate() error { +func (r *sqlResourceHistoryPollRequest) Validate() error { return nil // TODO } +func (r *sqlResourceHistoryPollRequest) Results() (*historyPollResponse, error) { + return &historyPollResponse{ + Key: resource.ResourceKey{ + Namespace: r.Response.Key.Namespace, + Group: r.Response.Key.Group, + Resource: r.Response.Key.Resource, + Name: r.Response.Key.Name, + }, + ResourceVersion: r.Response.ResourceVersion, + Value: r.Response.Value, + Action: r.Response.Action, + }, nil +} + // sqlResourceReadRequest can be used to retrieve a row fromthe "resource" tables. type readResponse struct { @@ -107,10 +122,20 @@ type sqlResourceReadRequest struct { *readResponse } -func (r sqlResourceReadRequest) Validate() error { +func (r *sqlResourceReadRequest) Validate() error { return nil // TODO } +func (r *sqlResourceReadRequest) Results() (*readResponse, error) { + return &readResponse{ + ReadResponse: resource.ReadResponse{ + Error: r.ReadResponse.Error, + ResourceVersion: r.ReadResponse.ResourceVersion, + Value: r.ReadResponse.Value, + }, + }, nil +} + // List type sqlResourceListRequest struct { sqltemplate.SQLTemplate @@ -189,6 +214,11 @@ type sqlResourceVersionListRequest struct { *groupResourceVersion } -func (r sqlResourceVersionListRequest) Validate() error { +func (r *sqlResourceVersionListRequest) Validate() error { return nil // TODO } + +func (r *sqlResourceVersionListRequest) Results() (*groupResourceVersion, error) { + x := *r.groupResourceVersion + return &x, nil +}