Fix missing rows.Close()

This commit is contained in:
Georges Chaudy 2024-07-09 12:26:49 +02:00
parent a6538fa8df
commit 37e0745d7a
No known key found for this signature in database
GPG Key ID: 0EE887FFCA1DB6EF
10 changed files with 142 additions and 60 deletions

View File

@ -45,8 +45,8 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash" "github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore" "github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/entitybridge"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql"
) )
var ( var (
@ -267,7 +267,7 @@ func (s *service) start(ctx context.Context) error {
return fmt.Errorf("unified storage requires the unifiedStorage feature flag") return fmt.Errorf("unified storage requires the unifiedStorage feature flag")
} }
server, err := entitybridge.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing) server, err := sql.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing)
if err != nil { if err != nil {
return err return err
} }

View File

@ -105,8 +105,6 @@ func (b *backend) Init() error {
b.sess = sess b.sess = sess
b.dialect = migrator.NewDialect(engine.DriverName()) b.dialect = migrator.NewDialect(engine.DriverName())
// TODO.... set up the broadcaster
return nil return nil
} }
@ -178,16 +176,18 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
newVersion = rv newVersion = rv
// 5. Update the RV in both resource and resource_history // 5. Update the RV in both resource and resource_history
rvReq := sqlResourceUpdateRVRequest{ if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect), SQLTemplate: sqltemplate.New(b.sqlDialect),
GUID: guid, GUID: guid,
ResourceVersion: newVersion, ResourceVersion: newVersion,
} }); err != nil {
if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, rvReq); err != nil {
return fmt.Errorf("update history rv: %w", err) return fmt.Errorf("update history rv: %w", err)
} }
if _, err = exec(ctx, tx, sqlResourceUpdateRV, rvReq); err != nil { if _, err = exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect),
GUID: guid,
ResourceVersion: newVersion,
}); err != nil {
return fmt.Errorf("update resource rv: %w", err) return fmt.Errorf("update resource rv: %w", err)
} }
return nil return nil
@ -239,17 +239,20 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
return err return err
} }
newVersion = rv newVersion = rv
rvReq := sqlResourceUpdateRVRequest{
// 5. Update the RV in both resource and resource_history
if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect), SQLTemplate: sqltemplate.New(b.sqlDialect),
GUID: guid, GUID: guid,
ResourceVersion: newVersion, ResourceVersion: newVersion,
} }); err != nil {
// 5. Update the RV in both resource and resource_history
if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, rvReq); err != nil {
return fmt.Errorf("update history rv: %w", err) return fmt.Errorf("update history rv: %w", err)
} }
if _, err = exec(ctx, tx, sqlResourceUpdateRV, rvReq); err != nil { if _, err = exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect),
GUID: guid,
ResourceVersion: newVersion,
}); err != nil {
return fmt.Errorf("update resource rv: %w", err) return fmt.Errorf("update resource rv: %w", err)
} }
@ -364,7 +367,7 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*r
ResourceVersion: 0, ResourceVersion: 0,
} }
err := b.sqlDB.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
var err error var err error
// TODO: Here the lastest RV might be lower than the actual latest RV // TODO: Here the lastest RV might be lower than the actual latest RV
@ -379,34 +382,38 @@ func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*r
if req.Limit > 0 { if req.Limit > 0 {
req.Limit++ req.Limit++
} }
readReq := sqlResourceListRequest{ listReq := sqlResourceListRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect), SQLTemplate: sqltemplate.New(b.sqlDialect),
Request: req, Request: req,
Response: new(resource.ResourceWrapper), Response: new(resource.ResourceWrapper),
} }
query, err := sqltemplate.Execute(sqlResourceList, readReq) query, err := sqltemplate.Execute(sqlResourceList, listReq)
if err != nil { if err != nil {
return fmt.Errorf("execute SQL template to list resources: %w", err) return fmt.Errorf("execute SQL template to list resources: %w", err)
} }
rows, err := tx.QueryContext(ctx, query, readReq.GetArgs()...)
rows, err := tx.QueryContext(ctx, query, listReq.GetArgs()...)
if err != nil { if err != nil {
return fmt.Errorf("list resources: %w", err) return fmt.Errorf("list latest resources: %w", err)
} }
defer rows.Close()
for i := int64(1); rows.Next(); i++ { for i := int64(1); rows.Next(); i++ {
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()
} }
if err := rows.Scan(readReq.GetScanDest()...); err != nil { if err := rows.Scan(listReq.GetScanDest()...); err != nil {
return fmt.Errorf("scan row #%d: %w", i, err) return fmt.Errorf("scan row #%d: %w", i, err)
} }
rw := *readReq.Response
if lim > 0 && i > lim { if lim > 0 && i > lim {
continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: lim} continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: lim}
out.NextPageToken = continueToken.String() out.NextPageToken = continueToken.String()
break break
} }
out.Items = append(out.Items, &rw) out.Items = append(out.Items, &resource.ResourceWrapper{
ResourceVersion: listReq.Response.ResourceVersion,
Value: listReq.Response.Value,
})
} }
return nil return nil
@ -434,7 +441,7 @@ func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest)
ResourceVersion: rv, ResourceVersion: rv,
} }
err := b.sqlDB.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
var err error var err error
// Fetch one extra row for Limit // Fetch one extra row for Limit
@ -442,7 +449,7 @@ func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest)
if lim > 0 { if lim > 0 {
req.Limit++ req.Limit++
} }
readReq := sqlResourceHistoryListRequest{ listReq := sqlResourceHistoryListRequest{
SQLTemplate: sqltemplate.New(b.sqlDialect), SQLTemplate: sqltemplate.New(b.sqlDialect),
Request: &historyListRequest{ Request: &historyListRequest{
ResourceVersion: rv, ResourceVersion: rv,
@ -452,29 +459,32 @@ func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest)
}, },
Response: new(resource.ResourceWrapper), Response: new(resource.ResourceWrapper),
} }
query, err := sqltemplate.Execute(sqlResourceHistoryList, readReq) query, err := sqltemplate.Execute(sqlResourceHistoryList, listReq)
if err != nil { if err != nil {
return fmt.Errorf("execute SQL template to list resources at revision: %w", err) return fmt.Errorf("execute SQL template to list resources at revision: %w", err)
} }
rows, err := tx.QueryContext(ctx, query, readReq.GetArgs()...) rows, err := tx.QueryContext(ctx, query, listReq.GetArgs()...)
if err != nil { if err != nil {
return fmt.Errorf("list resources at revision: %w", err) return fmt.Errorf("list resources at revision: %w", err)
} }
defer rows.Close()
for i := int64(1); rows.Next(); i++ { for i := int64(1); rows.Next(); i++ {
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()
} }
if err := rows.Scan(readReq.GetScanDest()...); err != nil { if err := rows.Scan(listReq.GetScanDest()...); err != nil {
return fmt.Errorf("scan row #%d: %w", i, err) return fmt.Errorf("scan row #%d: %w", i, err)
} }
rw := *readReq.Response
if lim > 0 && i > lim { if lim > 0 && i > lim {
continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: offset + lim} continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: offset + lim}
out.NextPageToken = continueToken.String() out.NextPageToken = continueToken.String()
break break
} }
out.Items = append(out.Items, &rw) out.Items = append(out.Items, &resource.ResourceWrapper{
ResourceVersion: listReq.Response.ResourceVersion,
Value: listReq.Response.Value,
})
} }
return nil return nil
@ -527,6 +537,7 @@ func fetchLatestRV(ctx context.Context, db db.ContextExecer) (int64, error) {
if err != nil { if err != nil {
return 0, fmt.Errorf("fetch latest rv: %w", err) return 0, fmt.Errorf("fetch latest rv: %w", err)
} }
defer rows.Close()
if rows.Next() { if rows.Next() {
rv := new(int64) rv := new(int64)
if err := rows.Scan(&rv); err != nil { if err := rows.Scan(&rv); err != nil {
@ -555,6 +566,7 @@ func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource
if err != nil { if err != nil {
return 0, fmt.Errorf("poll for resource history: %w", err) return 0, fmt.Errorf("poll for resource history: %w", err)
} }
defer rows.Close()
next := since next := since
for i := 1; rows.Next(); i++ { for i := 1; rows.Next(); i++ {
// check if the context is done // check if the context is done
@ -593,12 +605,11 @@ func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) { func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) {
// 1. Increment the resource version // 1. Increment the resource version
req := sqlResourceVersionRequest{ res, err := exec(ctx, x, sqlResourceVersionInc, sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d), SQLTemplate: sqltemplate.New(d),
Key: key, Group: key.Group,
resourceVersion: new(resourceVersion), Resource: key.Resource,
} })
res, err := exec(ctx, x, sqlResourceVersionInc, req)
if err != nil { if err != nil {
return 0, fmt.Errorf("increase resource version: %w", err) return 0, fmt.Errorf("increase resource version: %w", err)
} }
@ -621,7 +632,11 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
// complexity to the code. That would be preferrable to changing // complexity to the code. That would be preferrable to changing
// Dialect, though. The current alternative, just retrying, seems to be // Dialect, though. The current alternative, just retrying, seems to be
// enough for now. // enough for now.
if _, err = exec(ctx, x, sqlResourceVersionInsert, req); err != nil { if _, err = exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d),
Group: key.Group,
Resource: key.Resource,
}); err != nil {
return 0, fmt.Errorf("insert into resource_version: %w", err) return 0, fmt.Errorf("insert into resource_version: %w", err)
} }
@ -629,8 +644,14 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
} }
// 2. Get the new version // 2. Get the new version
req := sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d),
Group: key.Group,
Resource: key.Resource,
resourceVersion: new(resourceVersion),
}
if _, err = queryRow(ctx, x, sqlResourceVersionGet, req); err != nil { if _, err = queryRow(ctx, x, sqlResourceVersionGet, req); err != nil {
return 0, fmt.Errorf("get resource version: %w", err) return 0, fmt.Errorf("get the new resource version: %w", err)
} }
return req.ResourceVersion, nil return req.ResourceVersion, nil

View File

@ -2,6 +2,6 @@ SELECT
{{ .Ident "resource_version" | .Into .ResourceVersion }} {{ .Ident "resource_version" | .Into .ResourceVersion }}
FROM {{ .Ident "resource_version" }} FROM {{ .Ident "resource_version" }}
WHERE 1 = 1 WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Key.Group }} AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Key.Resource }} AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
; ;

View File

@ -2,6 +2,6 @@ UPDATE {{ .Ident "resource_version" }}
SET SET
{{ .Ident "resource_version" }} = resource_version + 1 {{ .Ident "resource_version" }} = resource_version + 1
WHERE 1 = 1 WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Key.Group }} AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Key.Resource }} AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
; ;

View File

@ -6,8 +6,8 @@ INSERT INTO {{ .Ident "resource_version" }}
) )
VALUES ( VALUES (
{{ .Arg .Key.Group }}, {{ .Arg .Group }},
{{ .Arg .Key.Resource }}, {{ .Arg .Resource }},
1 1
) )
; ;

View File

@ -69,7 +69,7 @@ func (db *ResourceDB) init() error {
// TODO: This should be renamed resource_api // TODO: This should be renamed resource_api
getter := &sectionGetter{ getter := &sectionGetter{
DynamicSection: db.cfg.SectionWithEnvOverrides("entity_api"), DynamicSection: db.cfg.SectionWithEnvOverrides("resource_api"),
} }
dbType := getter.Key("db_type").MustString("") dbType := getter.Key("db_type").MustString("")

View File

@ -64,17 +64,17 @@ func initResourceTables(mg *migrator.Migrator) string {
}, },
}) })
tables = append(tables, migrator.Table{ // tables = append(tables, migrator.Table{
Name: "resource_label_set", // Name: "resource_label_set",
Columns: []*migrator.Column{ // Columns: []*migrator.Column{
{Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: false}, // {Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
{Name: "label", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, // {Name: "label", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "value", Type: migrator.DB_Text, Nullable: false}, // {Name: "value", Type: migrator.DB_Text, Nullable: false},
}, // },
Indices: []*migrator.Index{ // Indices: []*migrator.Index{
{Cols: []string{"label_set", "label"}, Type: migrator.UniqueIndex}, // {Cols: []string{"label_set", "label"}, Type: migrator.UniqueIndex},
}, // },
}) // })
tables = append(tables, migrator.Table{ tables = append(tables, migrator.Table{
Name: "resource_version", Name: "resource_version",

View File

@ -182,7 +182,7 @@ func (r *resourceVersion) Results() (*resourceVersion, error) {
type sqlResourceVersionRequest struct { type sqlResourceVersionRequest struct {
*sqltemplate.SQLTemplate *sqltemplate.SQLTemplate
Key *resource.ResourceKey Group, Resource string
*resourceVersion *resourceVersion
} }

View File

@ -278,7 +278,6 @@ func TestQueries(t *testing.T) {
Name: "single path", Name: "single path",
Data: &sqlResourceVersionRequest{ Data: &sqlResourceVersionRequest{
SQLTemplate: new(sqltemplate.SQLTemplate), SQLTemplate: new(sqltemplate.SQLTemplate),
Key: &resource.ResourceKey{},
resourceVersion: new(resourceVersion), resourceVersion: new(resourceVersion),
}, },
Expected: expected{ Expected: expected{
@ -295,7 +294,6 @@ func TestQueries(t *testing.T) {
Name: "increment resource version", Name: "increment resource version",
Data: &sqlResourceVersionRequest{ Data: &sqlResourceVersionRequest{
SQLTemplate: new(sqltemplate.SQLTemplate), SQLTemplate: new(sqltemplate.SQLTemplate),
Key: &resource.ResourceKey{},
}, },
Expected: expected{ Expected: expected{
"resource_version_inc_mysql_sqlite.sql": dialects{ "resource_version_inc_mysql_sqlite.sql": dialects{
@ -311,7 +309,6 @@ func TestQueries(t *testing.T) {
Name: "single path", Name: "single path",
Data: &sqlResourceVersionRequest{ Data: &sqlResourceVersionRequest{
SQLTemplate: new(sqltemplate.SQLTemplate), SQLTemplate: new(sqltemplate.SQLTemplate),
Key: &resource.ResourceKey{},
}, },
Expected: expected{ Expected: expected{
"resource_version_insert_mysql_sqlite.sql": dialects{ "resource_version_insert_mysql_sqlite.sql": dialects{

View File

@ -0,0 +1,64 @@
package sql
import (
"context"
"os"
"path/filepath"
"time"
"gocloud.dev/blob/fileblob"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
// Creates a ResourceServer
func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) {
opts := resource.ResourceServerOptions{
Tracer: tracer,
}
supportBlobs := true
// Create a local blob filesystem blob store
if supportBlobs {
dir := filepath.Join(cfg.DataPath, "unistore", "blobs")
if err := os.MkdirAll(dir, 0o750); err != nil {
return nil, err
}
bucket, err := fileblob.OpenBucket(dir, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
if err != nil {
return nil, err
}
opts.Blob, err = resource.NewCDKBlobStore(context.Background(), resource.CDKBlobStoreOptions{
Tracer: tracer,
Bucket: bucket,
URLExpiration: time.Minute * 20,
})
if err != nil {
return nil, err
}
}
eDB, err := dbimpl.ProvideResourceDB(db, cfg, features, tracer)
if err != nil {
return nil, err
}
store, err := NewBackendStore(backendOptions{DB: eDB, Tracer: tracer})
if err != nil {
return nil, err
}
opts.Backend = store
opts.Diagnostics = store
opts.Lifecycle = store
return resource.NewResourceServer(opts)
}