Storage: always use transactions and make them readonly when possible (#92110)

* always use transactions and make them readonly when possible

* fix linters

* fix reference
This commit is contained in:
Diego Augusto Molina 2024-08-20 09:29:06 -03:00 committed by GitHub
parent 704b07b3f0
commit e788df921c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 85 additions and 63 deletions

View File

@ -301,7 +301,7 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *
// TODO: validate key ?
readReq := sqlResourceReadRequest{
readReq := &sqlResourceReadRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: req,
readResponse: new(readResponse),
@ -313,7 +313,12 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *
sr = sqlResourceHistoryRead
}
res, err := dbutil.QueryRow(ctx, b.db, sr, readReq)
var res *readResponse
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
res, err = dbutil.QueryRow(ctx, tx, sr, readReq)
return err
})
if errors.Is(err, sql.ErrNoRows) {
return &resource.ReadResponse{
Error: resource.NewNotFoundError(req.Key),
@ -552,33 +557,28 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
// listLatestRVs returns the latest resource version for each (Group, Resource) pair.
func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) {
since := groupResourceRV{}
reqRVs := sqlResourceVersionListRequest{
var grvs []*groupResourceVersion
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
grvs, err = dbutil.Query(ctx, tx, sqlResourceVersionList, &sqlResourceVersionListRequest{
SQLTemplate: sqltemplate.New(b.dialect),
groupResourceVersion: new(groupResourceVersion),
}
query, err := sqltemplate.Execute(sqlResourceVersionList, reqRVs)
if err != nil {
return nil, fmt.Errorf("execute SQL template to get the latest resource version: %w", err)
}
rows, err := b.db.QueryContext(ctx, query, reqRVs.GetArgs()...)
if err != nil {
return nil, fmt.Errorf("fetching recent resource versions: %w", err)
}
defer func() { _ = rows.Close() }()
})
for rows.Next() {
if err := rows.Scan(reqRVs.GetScanDest()...); err != nil {
return err
})
if err != nil {
return nil, err
}
if _, ok := since[reqRVs.Group]; !ok {
since[reqRVs.Group] = map[string]int64{}
since := groupResourceRV{}
for _, grv := range grvs {
if since[grv.Group] == nil {
since[grv.Group] = map[string]int64{}
}
if _, ok := since[reqRVs.Group][reqRVs.Resource]; !ok {
since[reqRVs.Group] = map[string]int64{}
}
since[reqRVs.Group][reqRVs.Resource] = reqRVs.ResourceVersion
since[grv.Group][grv.Resource] = grv.ResourceVersion
}
return since, nil
}
@ -603,52 +603,44 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
ctx, span := b.tracer.Start(ctx, trace_prefix+"poll")
defer span.End()
pollReq := sqlResourceHistoryPollRequest{
var records []*historyPollResponse
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Resource: res,
Group: grp,
SinceResourceVersion: since,
Response: &historyPollResponse{},
}
query, err := sqltemplate.Execute(sqlResourceHistoryPoll, pollReq)
})
return err
})
if err != nil {
return since, fmt.Errorf("execute SQL template to poll for resource history: %w", err)
}
rows, err := b.db.QueryContext(ctx, query, pollReq.GetArgs()...)
if err != nil {
return since, fmt.Errorf("poll for resource history: %w", err)
return 0, fmt.Errorf("poll history: %w", err)
}
defer func() { _ = rows.Close() }()
nextRV := since
for rows.Next() {
// check if the context is done
if ctx.Err() != nil {
return nextRV, ctx.Err()
}
if err := rows.Scan(pollReq.GetScanDest()...); err != nil {
return nextRV, fmt.Errorf("scan row polling for resource history: %w", err)
}
resp := pollReq.Response
if resp.Key.Group == "" || resp.Key.Resource == "" || resp.Key.Name == "" {
var nextRV int64
for _, rec := range records {
if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" {
return nextRV, fmt.Errorf("missing key in response")
}
nextRV = resp.ResourceVersion
nextRV = rec.ResourceVersion
stream <- &resource.WrittenEvent{
WriteEvent: resource.WriteEvent{
Value: resp.Value,
Value: rec.Value,
Key: &resource.ResourceKey{
Namespace: resp.Key.Namespace,
Group: resp.Key.Group,
Resource: resp.Key.Resource,
Name: resp.Key.Name,
Namespace: rec.Key.Namespace,
Group: rec.Key.Group,
Resource: rec.Key.Resource,
Name: rec.Key.Name,
},
Type: resource.WatchEvent_Type(resp.Action),
Type: resource.WatchEvent_Type(rec.Action),
},
ResourceVersion: resp.ResourceVersion,
ResourceVersion: rec.ResourceVersion,
// Timestamp: , // TODO: add timestamp
}
}
return nextRV, nil
}

View File

@ -79,6 +79,7 @@ func (r *historyPollResponse) Results() (*historyPollResponse, error) {
}
type groupResourceRV map[string]map[string]int64
type sqlResourceHistoryPollRequest struct {
sqltemplate.SQLTemplate
Resource string
@ -87,10 +88,24 @@ type sqlResourceHistoryPollRequest struct {
Response *historyPollResponse
}
func (r sqlResourceHistoryPollRequest) Validate() error {
func (r *sqlResourceHistoryPollRequest) Validate() error {
return nil // TODO
}
func (r *sqlResourceHistoryPollRequest) Results() (*historyPollResponse, error) {
return &historyPollResponse{
Key: resource.ResourceKey{
Namespace: r.Response.Key.Namespace,
Group: r.Response.Key.Group,
Resource: r.Response.Key.Resource,
Name: r.Response.Key.Name,
},
ResourceVersion: r.Response.ResourceVersion,
Value: r.Response.Value,
Action: r.Response.Action,
}, nil
}
// sqlResourceReadRequest can be used to retrieve a row fromthe "resource" tables.
type readResponse struct {
@ -107,10 +122,20 @@ type sqlResourceReadRequest struct {
*readResponse
}
func (r sqlResourceReadRequest) Validate() error {
func (r *sqlResourceReadRequest) Validate() error {
return nil // TODO
}
func (r *sqlResourceReadRequest) Results() (*readResponse, error) {
return &readResponse{
ReadResponse: resource.ReadResponse{
Error: r.ReadResponse.Error,
ResourceVersion: r.ReadResponse.ResourceVersion,
Value: r.ReadResponse.Value,
},
}, nil
}
// List
type sqlResourceListRequest struct {
sqltemplate.SQLTemplate
@ -189,6 +214,11 @@ type sqlResourceVersionListRequest struct {
*groupResourceVersion
}
func (r sqlResourceVersionListRequest) Validate() error {
func (r *sqlResourceVersionListRequest) Validate() error {
return nil // TODO
}
func (r *sqlResourceVersionListRequest) Results() (*groupResourceVersion, error) {
x := *r.groupResourceVersion
return &x, nil
}