ResourceServer: Resource store sql backend (#90170)

This commit is contained in:
Georges Chaudy 2024-07-18 17:03:18 +02:00 committed by GitHub
parent bb40fb342a
commit 08c611c68b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
71 changed files with 2871 additions and 35 deletions

View File

@ -4,13 +4,10 @@ import (
"context"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"gocloud.dev/blob/fileblob"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -51,6 +48,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/entitybridge"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql"
)
var (
@ -272,27 +270,11 @@ func (s *service) start(ctx context.Context) error {
}
case grafanaapiserveroptions.StorageTypeUnifiedNext:
// CDK (for now)
dir := filepath.Join(s.cfg.DataPath, "unistore", "resource")
if err := os.MkdirAll(dir, 0o750); err != nil {
return err
if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) {
return fmt.Errorf("unified storage requires the unifiedStorage feature flag")
}
bucket, err := fileblob.OpenBucket(dir, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
if err != nil {
return err
}
backend, err := resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{
Tracer: s.tracing,
Bucket: bucket,
})
if err != nil {
return err
}
server, err := resource.NewResourceServer(resource.ResourceServerOptions{Backend: backend})
server, err := sql.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing)
if err != nil {
return err
}

View File

@ -11,7 +11,7 @@ import (
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequest) (*entity.CreateEntityResponse, error) {

View File

@ -10,7 +10,7 @@ import (
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequest) (*entity.DeleteEntityResponse, error) {

View File

@ -8,7 +8,7 @@ import (
folder "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
type folderInfo struct {

View File

@ -16,7 +16,7 @@ import (
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// Templates setup.

View File

@ -16,7 +16,7 @@ import (
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/util/testutil"
)

View File

@ -25,8 +25,8 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
const entityTable = "entity"

View File

@ -10,7 +10,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/util/testutil"
)

View File

@ -11,7 +11,7 @@ import (
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequest) (*entity.UpdateEntityResponse, error) {

View File

@ -10,7 +10,7 @@ import (
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
func createETag(body []byte, meta []byte, status []byte) string {

View File

@ -16,8 +16,8 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
sqltemplateMocks "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate/mocks"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
sqltemplateMocks "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks"
"github.com/grafana/grafana/pkg/util/testutil"
)

View File

@ -0,0 +1,733 @@
package sql
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"text/template"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// trace_prefix is prepended to every span name emitted by this backend.
const trace_prefix = "sql.resource."

// backendOptions holds the settings used by NewBackendStore to build a
// SQL-backed resource storage backend.
type backendOptions struct {
	// DB provides access to the underlying database. Required.
	DB db.ResourceDBInterface
	// Tracer is used for tracing operations. Optional; a no-op tracer is
	// substituted when nil.
	Tracer trace.Tracer
}
// NewBackendStore builds a SQL storage backend from the given options. A
// no-op tracer is substituted when none is provided. The error result is
// currently always nil and exists for forward compatibility.
func NewBackendStore(opts backendOptions) (*backend, error) {
	tracer := opts.Tracer
	if tracer == nil {
		tracer = noop.NewTracerProvider().Tracer("sql-backend")
	}

	// Internal lifecycle context, canceled by Stop. (TODO in the struct:
	// remove in favor of caller-provided contexts.)
	backendCtx, backendCancel := context.WithCancel(context.Background())

	b := &backend{
		db:     opts.DB,
		log:    log.New("sql-resource-server"),
		ctx:    backendCtx,
		cancel: backendCancel,
		tracer: tracer,
	}
	return b, nil
}
// backend implements resource storage on top of a relational database
// (MySQL, PostgreSQL or SQLite), lazily initialized via Init.
type backend struct {
	log     log.Logger
	db      db.ResourceDBInterface // needed to keep xorm engine in scope
	sess    *session.SessionDB
	dialect migrator.Dialect
	ctx     context.Context // TODO: remove
	cancel  context.CancelFunc
	tracer  trace.Tracer

	//stream chan *resource.WatchEvent

	// sqlDB is the raw handle used for queries and transactions.
	sqlDB db.DB
	// sqlDialect drives identifier quoting and argument placement in the
	// SQL templates.
	sqlDialect sqltemplate.Dialect
}
// Init lazily initializes the backend: it initializes the underlying
// database, derives the SQL template dialect from the driver name, and
// caches a session plus a migrator dialect. It is a no-op once a session
// exists.
//
// NOTE(review): Init is called from several entry points (WriteEvent, Read,
// WatchWriteEvents) without synchronization; if those can run concurrently
// before the first successful Init this should be guarded (e.g. sync.Once)
// — TODO confirm.
func (b *backend) Init() error {
	// Already initialized.
	if b.sess != nil {
		return nil
	}
	if b.db == nil {
		return errors.New("missing db")
	}
	err := b.db.Init()
	if err != nil {
		return err
	}
	sqlDB, err := b.db.GetDB()
	if err != nil {
		return err
	}
	b.sqlDB = sqlDB

	// Map driver name to template dialect. Instrumented drivers add a
	// "WithHooks" suffix that is irrelevant for dialect selection.
	driverName := sqlDB.DriverName()
	driverName = strings.TrimSuffix(driverName, "WithHooks")
	switch driverName {
	case db.DriverMySQL:
		b.sqlDialect = sqltemplate.MySQL
	case db.DriverPostgres:
		b.sqlDialect = sqltemplate.PostgreSQL
	case db.DriverSQLite, db.DriverSQLite3:
		b.sqlDialect = sqltemplate.SQLite
	default:
		return fmt.Errorf("no dialect for driver %q", driverName)
	}

	sess, err := b.db.GetSession()
	if err != nil {
		return err
	}
	engine, err := b.db.GetEngine()
	if err != nil {
		return err
	}
	b.sess = sess
	b.dialect = migrator.NewDialect(engine.DriverName())
	return nil
}
// IsHealthy reports whether the backend can reach its database by pinging
// it; the watcher status is not yet part of the check.
func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
	// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
	if err := b.sqlDB.PingContext(ctx); err != nil {
		return nil, err
	}

	// TODO: check the status of the watcher implementation as well
	return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
}
// Stop cancels the backend's internal context, which terminates the
// background poller goroutines started by WatchWriteEvents.
func (b *backend) Stop() {
	b.cancel()
}
// WriteEvent persists a single write event (create, update or delete) and
// returns the new resource version assigned to it. Unrecognized event types
// are rejected.
func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
	// Fix: keep the span's context so the per-operation child spans created
	// in create/update/delete are parented to this span (previously the
	// returned context was discarded).
	ctx, span := b.tracer.Start(ctx, trace_prefix+"WriteEvent")
	defer span.End()

	// TODO: validate key ?
	if err := b.Init(); err != nil {
		return 0, err
	}
	switch event.Type {
	case resource.WatchEvent_ADDED:
		return b.create(ctx, event)
	case resource.WatchEvent_MODIFIED:
		return b.update(ctx, event)
	case resource.WatchEvent_DELETED:
		return b.delete(ctx, event)
	default:
		return 0, fmt.Errorf("unsupported event type")
	}
}
// create inserts a new resource row plus a matching history entry and
// assigns both a freshly incremented resource version, all inside a single
// read-committed transaction.
func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
	ctx, span := b.tracer.Start(ctx, trace_prefix+"Create")
	defer span.End()
	var newVersion int64
	guid := uuid.New().String()
	err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
		// TODO: Set the Labels

		// 1. Insert into resource
		if _, err := exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			WriteEvent:  event,
			GUID:        guid,
		}); err != nil {
			return fmt.Errorf("insert into resource: %w", err)
		}

		// 2. Insert into resource history
		if _, err := exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			WriteEvent:  event,
			GUID:        guid,
		}); err != nil {
			return fmt.Errorf("insert into resource history: %w", err)
		}

		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder

		// 4. Atomically increment resource version for this kind
		rv, err := resourceVersionAtomicInc(ctx, tx, b.sqlDialect, event.Key)
		if err != nil {
			return err
		}
		newVersion = rv

		// 5. Update the RV in both resource and resource_history (the rows
		// were inserted before the version was known).
		if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
			SQLTemplate:     sqltemplate.New(b.sqlDialect),
			GUID:            guid,
			ResourceVersion: newVersion,
		}); err != nil {
			return fmt.Errorf("update history rv: %w", err)
		}
		if _, err = exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
			SQLTemplate:     sqltemplate.New(b.sqlDialect),
			GUID:            guid,
			ResourceVersion: newVersion,
		}); err != nil {
			return fmt.Errorf("update resource rv: %w", err)
		}
		return nil
	})

	return newVersion, err
}
// update overwrites an existing resource row (recording the new state in the
// history table) and assigns a freshly incremented resource version, all
// inside a single read-committed transaction. It fails if the resource does
// not exist.
func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) {
	ctx, span := b.tracer.Start(ctx, trace_prefix+"Update")
	defer span.End()
	var newVersion int64
	guid := uuid.New().String()
	err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
		// TODO: Set the Labels

		// 1. Update into resource
		res, err := exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			WriteEvent:  event,
			GUID:        guid,
		})
		if err != nil {
			return fmt.Errorf("update into resource: %w", err)
		}
		// Zero affected rows means the resource does not exist.
		count, err := res.RowsAffected()
		if err != nil {
			return fmt.Errorf("update into resource: %w", err)
		}
		if count == 0 {
			return fmt.Errorf("no rows affected")
		}

		// 2. Insert into resource history
		if _, err := exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			WriteEvent:  event,
			GUID:        guid,
		}); err != nil {
			return fmt.Errorf("insert into resource history: %w", err)
		}

		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder

		// 4. Atomically increment resource version for this kind
		rv, err := resourceVersionAtomicInc(ctx, tx, b.sqlDialect, event.Key)
		if err != nil {
			return err
		}
		newVersion = rv

		// 5. Update the RV in both resource and resource_history
		if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
			SQLTemplate:     sqltemplate.New(b.sqlDialect),
			GUID:            guid,
			ResourceVersion: newVersion,
		}); err != nil {
			return fmt.Errorf("update history rv: %w", err)
		}
		if _, err = exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
			SQLTemplate:     sqltemplate.New(b.sqlDialect),
			GUID:            guid,
			ResourceVersion: newVersion,
		}); err != nil {
			return fmt.Errorf("update resource rv: %w", err)
		}
		return nil
	})

	return newVersion, err
}
// delete removes a resource row (recording the delete event in the history
// table) and assigns the event a freshly incremented resource version, all
// inside a single read-committed transaction. It fails if the resource does
// not exist.
func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) {
	ctx, span := b.tracer.Start(ctx, trace_prefix+"Delete")
	defer span.End()
	var newVersion int64
	guid := uuid.New().String()

	err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
		// TODO: Set the Labels

		// 1. delete from resource
		res, err := exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			WriteEvent:  event,
			GUID:        guid,
		})
		if err != nil {
			return fmt.Errorf("delete resource: %w", err)
		}
		// Zero affected rows means the resource does not exist.
		count, err := res.RowsAffected()
		if err != nil {
			return fmt.Errorf("delete resource: %w", err)
		}
		if count == 0 {
			return fmt.Errorf("no rows affected")
		}

		// 2. Add event to resource history
		if _, err := exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			WriteEvent:  event,
			GUID:        guid,
		}); err != nil {
			return fmt.Errorf("insert into resource history: %w", err)
		}

		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder

		// 4. Atomically increment resource version for this kind
		newVersion, err = resourceVersionAtomicInc(ctx, tx, b.sqlDialect, event.Key)
		if err != nil {
			return err
		}

		// 5. Update the RV in resource_history only; the resource row is
		// gone at this point.
		if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
			SQLTemplate:     sqltemplate.New(b.sqlDialect),
			GUID:            guid,
			ResourceVersion: newVersion,
		}); err != nil {
			return fmt.Errorf("update history rv: %w", err)
		}
		return nil
	})

	return newVersion, err
}
// Read returns a single resource. When ResourceVersion is zero the latest
// state is read from the resource table; a positive ResourceVersion selects
// the matching version from the resource history table instead. Missing
// resources map to resource.ErrNotFound.
func (b *backend) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
	// Fix: keep the span's context so the query below is traced under this
	// span, and drop the stray "." from the span name (it previously
	// rendered as "sql.resource..Read", inconsistent with sibling methods).
	ctx, span := b.tracer.Start(ctx, trace_prefix+"Read")
	defer span.End()

	// TODO: validate key ?
	if err := b.Init(); err != nil {
		return nil, err
	}

	readReq := sqlResourceReadRequest{
		SQLTemplate:  sqltemplate.New(b.sqlDialect),
		Request:      req,
		readResponse: new(readResponse),
	}

	// Default to the latest version; a positive ResourceVersion selects a
	// specific historical version.
	sr := sqlResourceRead
	if req.ResourceVersion > 0 {
		sr = sqlResourceHistoryRead
	}

	res, err := queryRow(ctx, b.sqlDB, sr, readReq)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, resource.ErrNotFound
	} else if err != nil {
		return nil, fmt.Errorf("get resource version: %w", err)
	}

	return &res.ReadResponse, nil
}
// PrepareList lists resources. The latest state is served from the resource
// table, while a request for a specific revision (or a continuation page) is
// served from the resource history table.
func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
	// Fix: keep the span's context so listLatest/listAtRevision are traced
	// under this span (previously the returned context was discarded).
	ctx, span := b.tracer.Start(ctx, trace_prefix+"List")
	defer span.End()

	// TODO: think about how to handle VersionMatch. We should be able to use latest for the first page (only).
	if req.ResourceVersion > 0 || req.NextPageToken != "" {
		return b.listAtRevision(ctx, req)
	}
	return b.listLatest(ctx, req)
}
// listLatest fetches the resources from the resource table.
// It runs inside a read-committed transaction so the reported resource
// version and the item list come from a consistent snapshot.
func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
	out := &resource.ListResponse{
		Items:           []*resource.ResourceWrapper{}, // TODO: we could pre-allocate the capacity if we estimate the number of items
		ResourceVersion: 0,
	}

	err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
		var err error

		// TODO: Here the latest RV might be lower than the actual latest RV
		// because delete events are not included in the resource table.
		out.ResourceVersion, err = fetchLatestRV(ctx, tx)
		if err != nil {
			return err
		}

		// Fetch one extra row for Limit so we can tell whether another page
		// exists.
		lim := req.Limit
		if req.Limit > 0 {
			req.Limit++
		}
		listReq := sqlResourceListRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			Request:     req,
			Response:    new(resource.ResourceWrapper),
		}
		query, err := sqltemplate.Execute(sqlResourceList, listReq)
		if err != nil {
			return fmt.Errorf("execute SQL template to list resources: %w", err)
		}

		rows, err := tx.QueryContext(ctx, query, listReq.GetArgs()...)
		if err != nil {
			return fmt.Errorf("list latest resources: %w", err)
		}
		defer func() { _ = rows.Close() }()
		for i := int64(1); rows.Next(); i++ {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if err := rows.Scan(listReq.GetScanDest()...); err != nil {
				return fmt.Errorf("scan row #%d: %w", i, err)
			}
			// The extra row past the limit only signals that another page
			// exists; emit a continue token instead of including it.
			if lim > 0 && i > lim {
				continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: lim}
				out.NextPageToken = continueToken.String()
				break
			}
			out.Items = append(out.Items, &resource.ResourceWrapper{
				ResourceVersion: listReq.Response.ResourceVersion,
				Value:           listReq.Response.Value,
			})
		}
		return nil
	})

	return out, err
}
// listAtRevision fetches the resources from the resource_history table at a specific revision.
// When a continue token is supplied, the revision and offset stored in the
// token take precedence over the request's own values.
func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
	// Get the RV
	rv := req.ResourceVersion
	offset := int64(0)
	if req.NextPageToken != "" {
		continueToken, err := GetContinueToken(req.NextPageToken)
		if err != nil {
			return nil, fmt.Errorf("get continue token: %w", err)
		}
		rv = continueToken.ResourceVersion
		offset = continueToken.StartOffset
	}

	out := &resource.ListResponse{
		Items:           []*resource.ResourceWrapper{}, // TODO: we could pre-allocate the capacity if we estimate the number of items
		ResourceVersion: rv,
	}

	err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
		var err error

		// Fetch one extra row for Limit so we can tell whether another page
		// exists.
		lim := req.Limit
		if lim > 0 {
			req.Limit++
		}
		listReq := sqlResourceHistoryListRequest{
			SQLTemplate: sqltemplate.New(b.sqlDialect),
			Request: &historyListRequest{
				ResourceVersion: rv,
				Limit:           req.Limit,
				Offset:          offset,
				Options:         req.Options,
			},
			Response: new(resource.ResourceWrapper),
		}
		query, err := sqltemplate.Execute(sqlResourceHistoryList, listReq)
		if err != nil {
			return fmt.Errorf("execute SQL template to list resources at revision: %w", err)
		}

		rows, err := tx.QueryContext(ctx, query, listReq.GetArgs()...)
		if err != nil {
			return fmt.Errorf("list resources at revision: %w", err)
		}
		defer func() { _ = rows.Close() }()
		for i := int64(1); rows.Next(); i++ {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if err := rows.Scan(listReq.GetScanDest()...); err != nil {
				return fmt.Errorf("scan row #%d: %w", i, err)
			}
			// The extra row past the limit only signals that another page
			// exists; the next token resumes at the cumulative offset.
			if lim > 0 && i > lim {
				continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: offset + lim}
				out.NextPageToken = continueToken.String()
				break
			}
			out.Items = append(out.Items, &resource.ResourceWrapper{
				ResourceVersion: listReq.Response.ResourceVersion,
				Value:           listReq.Response.Value,
			})
		}
		return nil
	})

	return out, err
}
// WatchWriteEvents returns an unbuffered channel streaming all write events
// committed after the latest resource version at the time of the call. The
// stream is fed by a background polling goroutine.
//
// NOTE(review): the poller's shutdown is tied to the backend lifecycle
// (Stop); verify that cancellation of the caller's ctx is also honored.
func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
	if err := b.Init(); err != nil {
		return nil, err
	}

	// Fetch the latest RV so only events after this point are streamed.
	since, err := fetchLatestRV(ctx, b.sqlDB)
	if err != nil {
		return nil, err
	}
	// Start the poller
	stream := make(chan *resource.WrittenEvent)
	go b.poller(ctx, since, stream)
	return stream, nil
}
// poller periodically polls the resource history table for events newer than
// `since` and pushes them to `stream`. It closes the stream on exit and
// terminates when either the caller's context or the backend's own context
// is canceled.
func (b *backend) poller(ctx context.Context, since int64, stream chan<- *resource.WrittenEvent) {
	var err error
	interval := 100 * time.Millisecond // TODO make this configurable
	t := time.NewTicker(interval)
	defer close(stream)
	defer t.Stop()

	for {
		select {
		case <-b.ctx.Done():
			return
		case <-ctx.Done():
			// Fix: also honor the watcher's own context, so a canceled
			// watch does not leak this goroutine for the backend lifetime.
			return
		case <-t.C:
			since, err = b.poll(ctx, since, stream)
			if err != nil {
				// Log and keep polling; transient DB errors should not kill
				// the watch.
				b.log.Error("watch error", "err", err)
			}
			t.Reset(interval)
		}
	}
}
// fetchLatestRV returns the current maximum resource version in the resource
// table, or zero when the table is empty.
func fetchLatestRV(ctx context.Context, db db.ContextExecer) (int64, error) {
	// COALESCE guarantees a single row with 0 when the table is empty.
	rows, err := db.QueryContext(ctx, `SELECT COALESCE(max("resource_version"), 0) FROM "resource";`)
	if err != nil {
		return 0, fmt.Errorf("fetch latest rv: %w", err)
	}
	defer func() { _ = rows.Close() }()
	if rows.Next() {
		// Fix: scan directly into an int64 instead of the previous
		// pointer-to-pointer (`rv := new(int64)` then `Scan(&rv)`).
		var rv int64
		if err := rows.Scan(&rv); err != nil {
			return 0, fmt.Errorf("scan since resource version: %w", err)
		}
		return rv, nil
	}
	// Fix: distinguish an iteration error from a genuinely empty result set
	// instead of reporting both as "no rows".
	if err := rows.Err(); err != nil {
		return 0, fmt.Errorf("fetch latest rv: %w", err)
	}
	return 0, fmt.Errorf("no rows")
}
// poll queries resource history for events with a resource version greater
// than `since`, sends them to `stream` in row order, and returns the highest
// resource version seen (or `since` unchanged when there were none).
//
// NOTE(review): the send to `stream` is not guarded by a select on
// ctx.Done(), so a slow or abandoned receiver can block this goroutine —
// verify against the watcher lifecycle.
func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource.WrittenEvent) (int64, error) {
	ctx, span := b.tracer.Start(ctx, trace_prefix+"poll")
	defer span.End()

	pollReq := sqlResourceHistoryPollRequest{
		SQLTemplate:          sqltemplate.New(b.sqlDialect),
		SinceResourceVersion: since,
		Response:             new(historyPollResponse),
	}
	query, err := sqltemplate.Execute(sqlResourceHistoryPoll, pollReq)
	if err != nil {
		return 0, fmt.Errorf("execute SQL template to poll for resource history: %w", err)
	}

	rows, err := b.sqlDB.QueryContext(ctx, query, pollReq.GetArgs()...)
	if err != nil {
		return 0, fmt.Errorf("poll for resource history: %w", err)
	}
	defer func() { _ = rows.Close() }()

	next := since
	for i := 1; rows.Next(); i++ {
		// check if the context is done
		if ctx.Err() != nil {
			return 0, ctx.Err()
		}
		if err := rows.Scan(pollReq.GetScanDest()...); err != nil {
			return 0, fmt.Errorf("scan row #%d polling for resource history: %w", i, err)
		}
		resp := pollReq.Response
		// Track the highest version seen so the next poll resumes after it.
		next = resp.ResourceVersion
		stream <- &resource.WrittenEvent{
			WriteEvent: resource.WriteEvent{
				Value: resp.Value,
				Key: &resource.ResourceKey{
					Namespace: resp.Key.Namespace,
					Group:     resp.Key.Group,
					Resource:  resp.Key.Resource,
					Name:      resp.Key.Name,
				},
				Type: resource.WatchEvent_Type(resp.Action),
			},
			ResourceVersion: resp.ResourceVersion,
			// Timestamp: , // TODO: add timestamp
		}
	}

	return next, nil
}
// resourceVersionAtomicInc atomically increases the version of a kind within a
// transaction. A missing (group, resource) row is created with version 1.
// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
// throughput of the system. This is a good candidate for a future optimization.
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) {
	// TODO: refactor this code to run in a multi-statement transaction in order to minimise the number of roundtrips.

	// 1. Lock the row for update
	req := sqlResourceVersionRequest{
		SQLTemplate:     sqltemplate.New(d),
		Group:           key.Group,
		Resource:        key.Resource,
		resourceVersion: new(resourceVersion),
	}
	rv, err := queryRow(ctx, x, sqlResourceVersionGet, req)

	if errors.Is(err, sql.ErrNoRows) {
		// if there wasn't a row associated with the given resource, we create one with
		// version 1
		if _, err = exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{
			SQLTemplate: sqltemplate.New(d),
			Group:       key.Group,
			Resource:    key.Resource,
		}); err != nil {
			return 0, fmt.Errorf("insert into resource_version: %w", err)
		}
		return 1, nil
	}
	if err != nil {
		return 0, fmt.Errorf("increase resource version: %w", err)
	}
	nextRV := rv.ResourceVersion + 1

	// 2. Increment the resource version
	res, err := exec(ctx, x, sqlResourceVersionInc, sqlResourceVersionRequest{
		SQLTemplate: sqltemplate.New(d),
		Group:       key.Group,
		Resource:    key.Resource,
		resourceVersion: &resourceVersion{
			ResourceVersion: nextRV,
		},
	})
	if err != nil {
		return 0, fmt.Errorf("increase resource version: %w", err)
	}
	// A successful exec that affected no rows means the row changed or
	// disappeared concurrently.
	if count, err := res.RowsAffected(); err != nil || count == 0 {
		return 0, fmt.Errorf("increase resource version did not affect any rows: %w", err)
	}

	// 3. Return the incremented value
	return nextRV, nil
}
// exec uses `req` as input for a non-data returning query generated with
// `tmpl`, and executed in `x`. The request is validated before the template
// is rendered; execution failures are returned as a SQLError that carries
// the rendered query and its arguments for diagnostics.
func exec(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.SQLTemplateIface) (sql.Result, error) {
	if err := req.Validate(); err != nil {
		return nil, fmt.Errorf("exec: invalid request for template %q: %w",
			tmpl.Name(), err)
	}

	rawQuery, err := sqltemplate.Execute(tmpl, req)
	if err != nil {
		return nil, fmt.Errorf("execute template: %w", err)
	}
	query := sqltemplate.FormatSQL(rawQuery)

	res, err := x.ExecContext(ctx, query, req.GetArgs()...)
	if err != nil {
		return nil, SQLError{
			Err:          err,
			CallType:     "Exec",
			TemplateName: tmpl.Name(),
			arguments:    req.GetArgs(),
			Query:        query,
			RawQuery:     rawQuery,
		}
	}

	return res, nil
}
// queryRow uses `req` as input and output for a single-row returning query
// generated with `tmpl`, and executed in `x`. The request is validated first;
// failures are returned as a SQLError carrying the rendered query, its
// arguments and scan destinations for diagnostics.
func queryRow[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) (T, error) {
	var zero T

	if err := req.Validate(); err != nil {
		return zero, fmt.Errorf("query: invalid request for template %q: %w",
			tmpl.Name(), err)
	}

	rawQuery, err := sqltemplate.Execute(tmpl, req)
	if err != nil {
		return zero, fmt.Errorf("execute template: %w", err)
	}
	query := sqltemplate.FormatSQL(rawQuery)

	row := x.QueryRowContext(ctx, query, req.GetArgs()...)
	if err := row.Err(); err != nil {
		return zero, SQLError{
			Err:          err,
			CallType:     "QueryRow",
			TemplateName: tmpl.Name(),
			arguments:    req.GetArgs(),
			ScanDest:     req.GetScanDest(),
			Query:        query,
			RawQuery:     rawQuery,
		}
	}

	return scanRow(row, req)
}
// scanner abstracts over *sql.Row and *sql.Rows so scanRow can operate on
// either.
type scanner interface {
	Scan(dest ...any) error
}

// scanRow scans the current row into req's destinations and materializes the
// result via req.Results. The scanner indirection exists purely for ease of
// testing, not for code reuse.
func scanRow[T any](sc scanner, req sqltemplate.WithResults[T]) (T, error) {
	var zero T

	if err := sc.Scan(req.GetScanDest()...); err != nil {
		return zero, fmt.Errorf("row scan: %w", err)
	}

	res, err := req.Results()
	if err != nil {
		return zero, fmt.Errorf("row results: %w", err)
	}
	return res, nil
}

View File

@ -0,0 +1,254 @@
package sql
import (
"context"
"testing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/tests/testsuite"
"github.com/stretchr/testify/assert"
)
// TestMain delegates to the shared test-suite runner, which provisions the
// test databases used by the integration tests in this package.
func TestMain(m *testing.M) {
	testsuite.Run(m)
}
// TestBackendHappyPath exercises the full write/read/list/watch flow against
// a real test database, asserting that resource versions form a single
// monotonically increasing sequence across creates, updates and deletes.
func TestBackendHappyPath(t *testing.T) {
	ctx := context.Background()
	dbstore := db.InitTestDB(t)

	rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil)
	assert.NoError(t, err)

	store, err := NewBackendStore(backendOptions{
		DB: rdb,
	})
	assert.NoError(t, err)
	assert.NotNil(t, store)

	// Subscribe before writing so every event below is delivered.
	stream, err := store.WatchWriteEvents(ctx)
	assert.NoError(t, err)

	t.Run("Add 3 resources", func(t *testing.T) {
		rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)
		assert.NoError(t, err)
		assert.Equal(t, int64(1), rv)

		rv, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)
		assert.NoError(t, err)
		assert.Equal(t, int64(2), rv)

		rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED)
		assert.NoError(t, err)
		assert.Equal(t, int64(3), rv)
	})

	t.Run("Update item2", func(t *testing.T) {
		rv, err := writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED)
		assert.NoError(t, err)
		assert.Equal(t, int64(4), rv)
	})

	t.Run("Delete item1", func(t *testing.T) {
		rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_DELETED)
		assert.NoError(t, err)
		assert.Equal(t, int64(5), rv)
	})

	t.Run("Read latest item 2", func(t *testing.T) {
		resp, err := store.Read(ctx, &resource.ReadRequest{Key: resourceKey("item2")})
		assert.NoError(t, err)
		assert.Equal(t, int64(4), resp.ResourceVersion)
		assert.Equal(t, "item2 MODIFIED", string(resp.Value))
	})

	t.Run("Read early verion of item2", func(t *testing.T) {
		// Reading at rv=3 must return the state as of the create at rv=2.
		resp, err := store.Read(ctx, &resource.ReadRequest{
			Key:             resourceKey("item2"),
			ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4
		})
		assert.NoError(t, err)
		assert.Equal(t, int64(2), resp.ResourceVersion)
		assert.Equal(t, "item2 ADDED", string(resp.Value))
	})

	t.Run("PrepareList latest", func(t *testing.T) {
		// item1 was deleted, so only item2 and item3 remain.
		resp, err := store.PrepareList(ctx, &resource.ListRequest{})
		assert.NoError(t, err)
		assert.Len(t, resp.Items, 2)
		assert.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value))
		assert.Equal(t, "item3 ADDED", string(resp.Items[1].Value))
		assert.Equal(t, int64(4), resp.ResourceVersion)
	})

	t.Run("Watch events", func(t *testing.T) {
		// Events arrive in commit order with their assigned versions.
		event := <-stream
		assert.Equal(t, "item1", event.Key.Name)
		assert.Equal(t, int64(1), event.ResourceVersion)
		assert.Equal(t, resource.WatchEvent_ADDED, event.Type)

		event = <-stream
		assert.Equal(t, "item2", event.Key.Name)
		assert.Equal(t, int64(2), event.ResourceVersion)
		assert.Equal(t, resource.WatchEvent_ADDED, event.Type)

		event = <-stream
		assert.Equal(t, "item3", event.Key.Name)
		assert.Equal(t, int64(3), event.ResourceVersion)
		assert.Equal(t, resource.WatchEvent_ADDED, event.Type)

		event = <-stream
		assert.Equal(t, "item2", event.Key.Name)
		assert.Equal(t, int64(4), event.ResourceVersion)
		assert.Equal(t, resource.WatchEvent_MODIFIED, event.Type)

		event = <-stream
		assert.Equal(t, "item1", event.Key.Name)
		assert.Equal(t, int64(5), event.ResourceVersion)
		assert.Equal(t, resource.WatchEvent_DELETED, event.Type)
	})
}
// TestBackendWatchWriteEventsFromLastest verifies that a watch started after
// some writes only streams events committed after the watch began.
func TestBackendWatchWriteEventsFromLastest(t *testing.T) {
	ctx := context.Background()
	dbstore := db.InitTestDB(t)

	rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil)
	assert.NoError(t, err)

	store, err := NewBackendStore(backendOptions{
		DB: rdb,
	})
	assert.NoError(t, err)
	assert.NotNil(t, store)

	// Create a few resources before initing the watch
	_, err = writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)
	assert.NoError(t, err)

	// Start the watch
	stream, err := store.WatchWriteEvents(ctx)
	assert.NoError(t, err)

	// Create one more event; only this post-watch event should be streamed.
	_, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)
	assert.NoError(t, err)
	assert.Equal(t, "item2", (<-stream).Key.Name)
}
// TestBackendPrepareList covers the listing paths: latest listing (which
// excludes deleted resources), paginated latest listing, listing at a fixed
// revision, and continue-token pagination through the history table.
func TestBackendPrepareList(t *testing.T) {
	ctx := context.Background()
	dbstore := db.InitTestDB(t)

	rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil)
	assert.NoError(t, err)

	store, err := NewBackendStore(backendOptions{
		DB: rdb,
	})
	assert.NoError(t, err)
	assert.NotNil(t, store)

	// Create a few resources before initing the watch
	_, _ = writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)    // rv=1
	_, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)    // rv=2 - will be modified at rv=6
	_, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED)    // rv=3 - will be deleted at rv=7
	_, _ = writeEvent(ctx, store, "item4", resource.WatchEvent_ADDED)    // rv=4
	_, _ = writeEvent(ctx, store, "item5", resource.WatchEvent_ADDED)    // rv=5
	_, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED) // rv=6
	_, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_DELETED)  // rv=7
	_, _ = writeEvent(ctx, store, "item6", resource.WatchEvent_ADDED)    // rv=8

	t.Run("fetch all latest", func(t *testing.T) {
		// item3 was deleted, so 5 of the 6 created items remain.
		res, err := store.PrepareList(ctx, &resource.ListRequest{})
		assert.NoError(t, err)
		assert.Len(t, res.Items, 5)
		assert.Empty(t, res.NextPageToken)
	})

	t.Run("list latest first page ", func(t *testing.T) {
		res, err := store.PrepareList(ctx, &resource.ListRequest{
			Limit: 3,
		})
		assert.NoError(t, err)
		assert.Len(t, res.Items, 3)
		// The token pins the snapshot revision (8) and the resume offset.
		continueToken, err := GetContinueToken(res.NextPageToken)
		assert.NoError(t, err)
		assert.Equal(t, int64(8), continueToken.ResourceVersion)
		assert.Equal(t, int64(3), continueToken.StartOffset)
	})

	t.Run("list at revision", func(t *testing.T) {
		// At rv=4 only the first four creates have happened.
		res, err := store.PrepareList(ctx, &resource.ListRequest{
			ResourceVersion: 4,
		})
		assert.NoError(t, err)
		assert.Len(t, res.Items, 4)
		assert.Equal(t, "item1 ADDED", string(res.Items[0].Value))
		assert.Equal(t, "item2 ADDED", string(res.Items[1].Value))
		assert.Equal(t, "item3 ADDED", string(res.Items[2].Value))
		assert.Equal(t, "item4 ADDED", string(res.Items[3].Value))
		assert.Empty(t, res.NextPageToken)
	})

	t.Run("fetch first page at revision with limit", func(t *testing.T) {
		res, err := store.PrepareList(ctx, &resource.ListRequest{
			Limit:           3,
			ResourceVersion: 7,
		})
		assert.NoError(t, err)
		assert.Len(t, res.Items, 3)
		assert.Equal(t, "item1 ADDED", string(res.Items[0].Value))
		assert.Equal(t, "item4 ADDED", string(res.Items[1].Value))
		assert.Equal(t, "item5 ADDED", string(res.Items[2].Value))
		continueToken, err := GetContinueToken(res.NextPageToken)
		assert.NoError(t, err)
		assert.Equal(t, int64(7), continueToken.ResourceVersion)
		assert.Equal(t, int64(3), continueToken.StartOffset)
	})

	t.Run("fetch second page at revision", func(t *testing.T) {
		continueToken := &ContinueToken{
			ResourceVersion: 8,
			StartOffset:     2,
		}
		res, err := store.PrepareList(ctx, &resource.ListRequest{
			NextPageToken: continueToken.String(),
			Limit:         2,
		})
		assert.NoError(t, err)
		assert.Len(t, res.Items, 2)
		assert.Equal(t, "item5 ADDED", string(res.Items[0].Value))
		assert.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))

		// The returned token advances the offset past this page.
		continueToken, err = GetContinueToken(res.NextPageToken)
		assert.NoError(t, err)
		assert.Equal(t, int64(8), continueToken.ResourceVersion)
		assert.Equal(t, int64(4), continueToken.StartOffset)
	})
}
// writeEvent stores a single watch event for the fixed test key
// ("namespace"/"group"/"resource"/name) and returns the resource version
// assigned by the backend. The value encodes the name plus the action so
// list/read assertions can identify which event produced a row.
//
// Reuses resourceKey instead of duplicating the key literal, keeping the two
// helpers consistent.
func writeEvent(ctx context.Context, store *backend, name string, action resource.WatchEvent_Type) (int64, error) {
	return store.WriteEvent(ctx, resource.WriteEvent{
		Type:  action,
		Value: []byte(name + " " + resource.WatchEvent_Type_name[int32(action)]),
		Key:   resourceKey(name),
	})
}
// resourceKey builds the fixed test key used throughout these tests; only the
// object name varies between calls.
func resourceKey(name string) *resource.ResourceKey {
	key := resource.ResourceKey{
		Namespace: "namespace",
		Group:     "group",
		Resource:  "resource",
		Name:      name,
	}
	return &key
}

View File

@ -0,0 +1,32 @@
package sql
import (
"encoding/base64"
"encoding/json"
"fmt"
)
// ContinueToken is the pagination token for list requests. It travels to the
// client as NextPageToken, serialized as base64-encoded JSON; the one-letter
// JSON keys keep tokens short.
type ContinueToken struct {
	StartOffset     int64 `json:"o"` // number of items already returned
	ResourceVersion int64 `json:"v"` // snapshot the listing is anchored to
}

// String renders the token in its wire format (base64 of the JSON encoding).
func (c *ContinueToken) String() string {
	b, _ := json.Marshal(c) // a struct of two int64s cannot fail to marshal
	return base64.StdEncoding.EncodeToString(b)
}

// GetContinueToken parses a token previously produced by String.
func GetContinueToken(token string) (*ContinueToken, error) {
	continueVal, err := base64.StdEncoding.DecodeString(token)
	if err != nil {
		// Wrap rather than discard the cause so callers can inspect it.
		return nil, fmt.Errorf("error decoding continue token: %w", err)
	}
	t := &ContinueToken{}
	if err := json.Unmarshal(continueVal, t); err != nil {
		return nil, err
	}
	return t, nil
}

View File

@ -0,0 +1,7 @@
{{/* Delete the current-state row for a fully-qualified resource key. */}}
DELETE FROM {{ .Ident "resource" }}
WHERE 1 = 1
AND {{ .Ident "namespace" }} = {{ .Arg .WriteEvent.Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .WriteEvent.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .WriteEvent.Key.Resource }}
AND {{ .Ident "name" }} = {{ .Arg .WriteEvent.Key.Name }}
;

View File

@ -0,0 +1,23 @@
{{/*
Append an event row to the history table. resource_version is not set here;
it appears to be stamped afterwards via the history update_rv statement —
confirm the write path.
*/}}
INSERT INTO {{ .Ident "resource_history" }}
(
{{ .Ident "guid" }},
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "value" }},
{{ .Ident "action" }}
)
VALUES (
{{ .Arg .GUID }},
{{ .Arg .WriteEvent.Key.Group }},
{{ .Arg .WriteEvent.Key.Resource }},
{{ .Arg .WriteEvent.Key.Namespace }},
{{ .Arg .WriteEvent.Key.Name }},
{{ .Arg .WriteEvent.Value }},
{{ .Arg .WriteEvent.Type }}
)
;

View File

@ -0,0 +1,32 @@
{{/*
List, from the history table, the latest version of each matching resource at
or before the requested resource version. The inner query picks, per
(namespace, group, resource, name), the row with the highest resource_version
not exceeding the requested one; the outer query drops keys whose latest
action is a delete (action = 3, per the resource migrations) and pages the
rest in ascending resource_version order.

NOTE(review): "LIMIT {offset}, {count}" is MySQL-specific syntax; Postgres
expects "LIMIT count OFFSET offset" — confirm which dialects this template
must support.
NOTE(review): selecting "guid" alongside max(resource_version) while grouping
only by the key columns is non-standard SQL grouping — verify it returns the
guid of the max row on all supported databases.
*/}}
SELECT
kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
{{ .Ident "value" | .Into .Response.Value }}
FROM {{ .Ident "resource_history" }} as kv
JOIN (
SELECT {{ .Ident "guid" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }}
FROM {{ .Ident "resource_history" }} AS mkv
WHERE 1 = 1
AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }}
{{ if and .Request.Options .Request.Options.Key }}
{{ if .Request.Options.Key.Namespace }}
AND {{ .Ident "namespace" }} = {{ .Arg .Request.Options.Key.Namespace }}
{{ end }}
{{ if .Request.Options.Key.Group }}
AND {{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }}
{{ end }}
{{ if .Request.Options.Key.Resource }}
AND {{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }}
{{ end }}
{{ if .Request.Options.Key.Name }}
AND {{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }}
{{ end }}
{{ end }}
GROUP BY mkv.{{ .Ident "namespace" }}, mkv.{{ .Ident "group" }}, mkv.{{ .Ident "resource" }}, mkv.{{ .Ident "name" }}
) AS maxkv
ON maxkv.{{ .Ident "guid" }} = kv.{{ .Ident "guid" }}
WHERE kv.{{ .Ident "action" }} != 3
ORDER BY kv.{{ .Ident "resource_version" }} ASC
{{ if (gt .Request.Limit 0) }}
LIMIT {{ .Arg .Request.Offset }}, {{ .Arg .Request.Limit }}
{{ end }}
;

View File

@ -0,0 +1,12 @@
{{/*
Return every history row with a resource_version newer than the given one;
used by the watch poller to pick up events since its last poll.
*/}}
SELECT
{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
{{ .Ident "namespace" | .Into .Response.Key.Namespace }},
{{ .Ident "group" | .Into .Response.Key.Group }},
{{ .Ident "resource" | .Into .Response.Key.Resource }},
{{ .Ident "name" | .Into .Response.Key.Name }},
{{ .Ident "value" | .Into .Response.Value }},
{{ .Ident "action" | .Into .Response.Action }}
FROM {{ .Ident "resource_history" }}
WHERE {{ .Ident "resource_version" }} > {{ .Arg .SinceResourceVersion }}
;

View File

@ -0,0 +1,17 @@
{{/*
Read the newest history row for a fully-qualified key, optionally capped at a
specific resource version (when Request.ResourceVersion > 0).
*/}}
SELECT
{{ .Ident "resource_version" | .Into .ResourceVersion }},
{{ .Ident "value" | .Into .Value }}
FROM {{ .Ident "resource_history" }}
WHERE 1 = 1
AND {{ .Ident "namespace" }} = {{ .Arg .Request.Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .Request.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Request.Key.Resource }}
AND {{ .Ident "name" }} = {{ .Arg .Request.Key.Name }}
{{ if gt .Request.ResourceVersion 0 }}
AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }}
{{ end }}
ORDER BY {{ .Ident "resource_version" }} DESC
LIMIT 1
;

View File

@ -0,0 +1,4 @@
{{/* Stamp the history row identified by guid with its assigned resource version. */}}
UPDATE {{ .Ident "resource_history" }}
SET {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }}
WHERE {{ .Ident "guid" }} = {{ .Arg .GUID }}
;

View File

@ -0,0 +1,23 @@
{{/*
Insert the current-state row for a new resource. resource_version is not set
here; it appears to be stamped afterwards via the update_rv statement —
confirm the write path.
*/}}
INSERT INTO {{ .Ident "resource" }}
(
{{ .Ident "guid" }},
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "value" }},
{{ .Ident "action" }}
)
VALUES (
{{ .Arg .GUID }},
{{ .Arg .WriteEvent.Key.Group }},
{{ .Arg .WriteEvent.Key.Resource }},
{{ .Arg .WriteEvent.Key.Namespace }},
{{ .Arg .WriteEvent.Key.Name }},
{{ .Arg .WriteEvent.Value }},
{{ .Arg .WriteEvent.Type }}
)
;

View File

@ -0,0 +1,24 @@
{{/*
List current-state rows matching the optional key filter, newest resource
version first, optionally limited. Only an overall LIMIT is applied here; any
offset/continue handling is presumably done by the caller — confirm.
*/}}
SELECT
{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
{{ .Ident "value" | .Into .Response.Value }}
FROM {{ .Ident "resource" }}
WHERE 1 = 1
{{ if and .Request.Options .Request.Options.Key }}
{{ if .Request.Options.Key.Namespace }}
AND {{ .Ident "namespace" }} = {{ .Arg .Request.Options.Key.Namespace }}
{{ end }}
{{ if .Request.Options.Key.Group }}
AND {{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }}
{{ end }}
{{ if .Request.Options.Key.Resource }}
AND {{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }}
{{ end }}
{{ if .Request.Options.Key.Name }}
AND {{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }}
{{ end }}
{{ end }}
ORDER BY {{ .Ident "resource_version" }} DESC
{{ if (gt .Request.Limit 0) }}
LIMIT {{ .Arg .Request.Limit }}
{{ end }}
;

View File

@ -0,0 +1,10 @@
{{/* Read the current-state row for a fully-qualified resource key. */}}
SELECT
{{ .Ident "resource_version" | .Into .ResourceVersion }},
{{ .Ident "value" | .Into .Value }}
FROM {{ .Ident "resource" }}
WHERE 1 = 1
AND {{ .Ident "namespace" }} = {{ .Arg .Request.Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .Request.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Request.Key.Resource }}
AND {{ .Ident "name" }} = {{ .Arg .Request.Key.Name }}
;

View File

@ -0,0 +1,11 @@
{{/*
Replace the current value and action for a key. Note the row is re-keyed to a
new guid on every update (the guid of the new write event).
*/}}
UPDATE {{ .Ident "resource" }}
SET
{{ .Ident "guid" }} = {{ .Arg .GUID }},
{{ .Ident "value" }} = {{ .Arg .WriteEvent.Value }},
{{ .Ident "action" }} = {{ .Arg .WriteEvent.Type }}
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .WriteEvent.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .WriteEvent.Key.Resource }}
AND {{ .Ident "namespace" }} = {{ .Arg .WriteEvent.Key.Namespace }}
AND {{ .Ident "name" }} = {{ .Arg .WriteEvent.Key.Name }}
;

View File

@ -0,0 +1,4 @@
{{/* Stamp the current-state row identified by guid with its assigned resource version. */}}
UPDATE {{ .Ident "resource" }}
SET {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }}
WHERE {{ .Ident "guid" }} = {{ .Arg .GUID }}
;

View File

@ -0,0 +1,8 @@
{{/*
Read the per-(group, resource) version counter. SelectFor "UPDATE" emits the
dialect's SELECT ... FOR UPDATE locking clause so the counter row stays locked
for the rest of the transaction.
*/}}
SELECT
{{ .Ident "resource_version" | .Into .ResourceVersion }}
FROM {{ .Ident "resource_version" }}
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
{{ .SelectFor "UPDATE" }}
;

View File

@ -0,0 +1,7 @@
{{/*
Set the per-(group, resource) version counter to the provided value. Despite
the "inc" name, the increment itself is presumably computed by the caller
after reading the counter under lock — confirm against the Go write path.
*/}}
UPDATE {{ .Ident "resource_version" }}
SET
{{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion}}
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
;

View File

@ -0,0 +1,13 @@
{{/* Seed the per-(group, resource) version counter at 1. */}}
INSERT INTO {{ .Ident "resource_version" }}
(
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "resource_version" }}
)
VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
1
)
;

View File

@ -0,0 +1,59 @@
package dbimpl
import (
"context"
"database/sql"
"fmt"
resourcedb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
)
// NewDB wraps a *sql.DB in the resourcedb.DB abstraction, tagging it with the
// name of the driver used to open it.
func NewDB(d *sql.DB, driverName string) resourcedb.DB {
	wrapped := sqldb{DB: d, driverName: driverName}
	return wrapped
}
// sqldb adapts a *sql.DB to the resourcedb.DB interface, carrying the driver
// name alongside the connection pool.
type sqldb struct {
	*sql.DB
	driverName string // name of the driver the pool was opened with
}

// DriverName returns the name of the driver used to open the underlying DB.
func (d sqldb) DriverName() string {
	return d.driverName
}
// BeginTx opens a new transaction on the wrapped pool and adapts it to the
// resourcedb.Tx interface.
func (d sqldb) BeginTx(ctx context.Context, opts *sql.TxOptions) (resourcedb.Tx, error) {
	underlying, err := d.DB.BeginTx(ctx, opts)
	if err != nil {
		return nil, err
	}
	return tx{Tx: underlying}, nil
}
// WithTx runs f inside a transaction: commit when f succeeds, rollback when
// it fails. A rollback failure is reported alongside the original error.
func (d sqldb) WithTx(ctx context.Context, opts *sql.TxOptions, f resourcedb.TxFunc) error {
	t, err := d.BeginTx(ctx, opts)
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}

	txErr := f(ctx, t)
	if txErr == nil {
		if err := t.Commit(); err != nil {
			return fmt.Errorf("commit err: %w", err)
		}
		return nil
	}

	if rollbackErr := t.Rollback(); rollbackErr != nil {
		return fmt.Errorf("tx err: %w; rollback err: %w", txErr, rollbackErr)
	}
	return fmt.Errorf("tx err: %w", txErr)
}
// tx adapts a *sql.Tx to the resourcedb.Tx interface.
type tx struct {
	*sql.Tx
}

View File

@ -0,0 +1,105 @@
package dbimpl
import (
"cmp"
"fmt"
"strings"
"time"
"github.com/go-sql-driver/mysql"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/store/entity/db"
)
// getEngineMySQL builds a MySQL-backed xorm engine from the "db_*" keys of
// the given config section. Invalid values are accumulated inside the getter
// and surfaced with a single Err() check after all keys have been read.
func getEngineMySQL(getter *sectionGetter, _ tracing.Tracer) (*xorm.Engine, error) {
	config := mysql.NewConfig()
	config.User = getter.String("db_user")
	config.Passwd = getter.String("db_pass")
	config.Net = "tcp"
	config.Addr = getter.String("db_host")
	config.DBName = getter.String("db_name")
	config.Params = map[string]string{
		// See: https://dev.mysql.com/doc/refman/en/sql-mode.html
		"@@SESSION.sql_mode": "ANSI",
	}
	config.Collation = "utf8mb4_unicode_ci"
	config.Loc = time.UTC
	config.AllowNativePasswords = true
	config.ClientFoundRows = true

	// TODO: do we want to support these?
	// config.ServerPubKey = getter.String("db_server_pub_key")
	// config.TLSConfig = getter.String("db_tls_config_name")

	// Surface any invalid config value recorded above before dialing.
	if err := getter.Err(); err != nil {
		return nil, fmt.Errorf("config error: %w", err)
	}

	// A leading slash means db_host is a unix socket path, not host:port.
	if strings.HasPrefix(config.Addr, "/") {
		config.Net = "unix"
	}

	// FIXME: get rid of xorm
	engine, err := xorm.NewEngine(db.DriverMySQL, config.FormatDSN())
	if err != nil {
		return nil, fmt.Errorf("open database: %w", err)
	}

	engine.SetMaxOpenConns(0) // 0 = unlimited, per database/sql semantics
	engine.SetMaxIdleConns(2)
	engine.SetConnMaxLifetime(4 * time.Hour)

	return engine, nil
}
// getEnginePostgres builds a Postgres-backed xorm engine from the "db_*" keys
// of the given config section. The DSN is assembled as key=value pairs via
// MakeDSN; db_host is split into host and port with sensible defaults.
func getEnginePostgres(getter *sectionGetter, _ tracing.Tracer) (*xorm.Engine, error) {
	dsnKV := map[string]string{
		"user":     getter.String("db_user"),
		"password": getter.String("db_pass"),
		"dbname":   getter.String("db_name"),
		"sslmode":  cmp.Or(getter.String("db_sslmode"), "disable"),
	}

	// TODO: probably interesting:
	// "passfile", "statement_timeout", "lock_timeout", "connect_timeout"

	// TODO: for CockroachDB, we probably need to use the following:
	// dsnKV["options"] = "-c enable_experimental_alter_column_type_general=true"
	// Or otherwise specify it as:
	// dsnKV["enable_experimental_alter_column_type_general"] = "true"

	// TODO: do we want to support these options in the DSN as well?
	// "sslkey", "sslcert", "sslrootcert", "sslpassword", "sslsni", "krbspn",
	// "krbsrvname", "target_session_attrs", "service", "servicefile"

	// More on Postgres connection string parameters:
	// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING

	hostport := getter.String("db_host")

	// Surface any invalid config value recorded above before parsing further.
	if err := getter.Err(); err != nil {
		return nil, fmt.Errorf("config error: %w", err)
	}

	host, port, err := splitHostPortDefault(hostport, "127.0.0.1", "5432")
	if err != nil {
		return nil, fmt.Errorf("invalid db_host: %w", err)
	}
	dsnKV["host"] = host
	dsnKV["port"] = port

	dsn, err := MakeDSN(dsnKV)
	if err != nil {
		return nil, fmt.Errorf("error building DSN: %w", err)
	}

	// FIXME: get rid of xorm
	engine, err := xorm.NewEngine(db.DriverPostgres, dsn)
	if err != nil {
		return nil, fmt.Errorf("open database: %w", err)
	}

	return engine, nil
}

View File

@ -0,0 +1,92 @@
package dbimpl
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestGetEngineMySQLFromConfig exercises getEngineMySQL against in-memory
// config sections: a valid configuration and one containing an invalid UTF-8
// value.
// NOTE(review): the fixtures set "db_password" but getEngineMySQL reads
// "db_pass"; the happy path still passes because a missing key yields "" —
// confirm which key name is intended.
func TestGetEngineMySQLFromConfig(t *testing.T) {
	t.Parallel()

	t.Run("happy path", func(t *testing.T) {
		t.Parallel()

		getter := newTestSectionGetter(map[string]string{
			"db_type":     "mysql",
			"db_host":     "/var/run/mysql.socket",
			"db_name":     "grafana",
			"db_user":     "user",
			"db_password": "password",
		})
		engine, err := getEngineMySQL(getter, nil)
		assert.NotNil(t, engine)
		assert.NoError(t, err)
	})

	t.Run("invalid string", func(t *testing.T) {
		t.Parallel()

		// db_name is not valid UTF-8, so the getter records an error.
		getter := newTestSectionGetter(map[string]string{
			"db_type":     "mysql",
			"db_host":     "/var/run/mysql.socket",
			"db_name":     string(invalidUTF8ByteSequence),
			"db_user":     "user",
			"db_password": "password",
		})
		engine, err := getEngineMySQL(getter, nil)
		assert.Nil(t, engine)
		assert.Error(t, err)
		assert.ErrorIs(t, err, ErrInvalidUTF8Sequence)
	})
}
// TestGetEnginePostgresFromConfig exercises getEnginePostgres against
// in-memory config sections: a valid configuration, an invalid UTF-8 value,
// and an unparseable host:port.
//
// The fixtures previously declared "db_type": "mysql" — a copy-paste from the
// MySQL test. getEnginePostgres never reads db_type, so the assertions are
// unaffected; the value is corrected to "postgres" for consistency.
// NOTE(review): the fixtures set "db_password" but the engine code reads
// "db_pass"; tests pass because a missing key yields "" — confirm which key
// name is intended.
func TestGetEnginePostgresFromConfig(t *testing.T) {
	t.Parallel()

	t.Run("happy path", func(t *testing.T) {
		t.Parallel()

		getter := newTestSectionGetter(map[string]string{
			"db_type":     "postgres",
			"db_host":     "localhost",
			"db_name":     "grafana",
			"db_user":     "user",
			"db_password": "password",
		})
		engine, err := getEnginePostgres(getter, nil)
		assert.NotNil(t, engine)
		assert.NoError(t, err)
	})

	t.Run("invalid string", func(t *testing.T) {
		t.Parallel()

		// db_host is not valid UTF-8, so the getter records an error.
		getter := newTestSectionGetter(map[string]string{
			"db_type":     "postgres",
			"db_host":     string(invalidUTF8ByteSequence),
			"db_name":     "grafana",
			"db_user":     "user",
			"db_password": "password",
		})
		engine, err := getEnginePostgres(getter, nil)
		assert.Nil(t, engine)
		assert.Error(t, err)
		assert.ErrorIs(t, err, ErrInvalidUTF8Sequence)
	})

	t.Run("invalid hostport", func(t *testing.T) {
		t.Parallel()

		// "1:1:1" cannot be split into host and port.
		getter := newTestSectionGetter(map[string]string{
			"db_type":     "postgres",
			"db_host":     "1:1:1",
			"db_name":     "grafana",
			"db_user":     "user",
			"db_password": "password",
		})
		engine, err := getEnginePostgres(getter, nil)
		assert.Nil(t, engine)
		assert.Error(t, err)
	})
}

View File

@ -0,0 +1,154 @@
package dbimpl
import (
"context"
"errors"
"testing"
"time"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/require"
resourcedb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
)
// newCtx returns a context bounded by the test's deadline when one is set,
// falling back to a 5-second timeout otherwise. Cancellation is registered
// with test cleanup so the context never leaks.
func newCtx(t *testing.T) context.Context {
	t.Helper()

	deadline, ok := t.Deadline()
	if !ok {
		// No -timeout deadline from the test runner; use a default bound.
		deadline = time.Now().Add(5 * time.Second)
	}
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	t.Cleanup(cancel)
	return ctx
}

// errTest is a sentinel error used to drive the failure paths of the mocks.
var errTest = errors.New("because of reasons")

// driverName is the name the sqlmock driver registers under.
const driverName = "sqlmock"
// TestDB_BeginTx verifies that BeginTx propagates success and failure from
// the underlying driver, using sqlmock to script the driver's behaviour.
func TestDB_BeginTx(t *testing.T) {
	t.Parallel()

	t.Run("happy path", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, driverName)
		require.Equal(t, driverName, db.DriverName())

		mock.ExpectBegin()
		tx, err := db.BeginTx(newCtx(t), nil)

		require.NoError(t, err)
		require.NotNil(t, tx)
	})

	t.Run("fail begin", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, "sqlmock")

		// Driver-level Begin failure must surface unwrapped from BeginTx.
		mock.ExpectBegin().WillReturnError(errTest)
		tx, err := db.BeginTx(newCtx(t), nil)

		require.Nil(t, tx)
		require.Error(t, err)
		require.ErrorIs(t, err, errTest)
	})
}
// TestDB_WithTx covers every path of WithTx: commit on success, rollback on
// callback failure (with and without a rollback error of its own), and
// failures of Begin and Commit themselves.
func TestDB_WithTx(t *testing.T) {
	t.Parallel()

	// newTxFunc returns a TxFunc that immediately yields the given error.
	newTxFunc := func(err error) resourcedb.TxFunc {
		return func(context.Context, resourcedb.Tx) error {
			return err
		}
	}

	t.Run("happy path", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, "sqlmock")

		mock.ExpectBegin()
		mock.ExpectCommit()
		err = db.WithTx(newCtx(t), nil, newTxFunc(nil))

		require.NoError(t, err)
	})

	t.Run("fail begin", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, "sqlmock")

		mock.ExpectBegin().WillReturnError(errTest)
		err = db.WithTx(newCtx(t), nil, newTxFunc(nil))

		require.Error(t, err)
		require.ErrorIs(t, err, errTest)
	})

	t.Run("fail tx", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, "sqlmock")

		// Callback error triggers a rollback, which succeeds here.
		mock.ExpectBegin()
		mock.ExpectRollback()
		err = db.WithTx(newCtx(t), nil, newTxFunc(errTest))

		require.Error(t, err)
		require.ErrorIs(t, err, errTest)
	})

	t.Run("fail tx; fail rollback", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, "sqlmock")
		errTest2 := errors.New("yet another err")

		// Both the callback error and the rollback error must be reported.
		mock.ExpectBegin()
		mock.ExpectRollback().WillReturnError(errTest)
		err = db.WithTx(newCtx(t), nil, newTxFunc(errTest2))

		require.Error(t, err)
		require.ErrorIs(t, err, errTest)
		require.ErrorIs(t, err, errTest2)
	})

	t.Run("fail commit", func(t *testing.T) {
		t.Parallel()

		sqldb, mock, err := sqlmock.New()
		require.NoError(t, err)
		db := NewDB(sqldb, "sqlmock")

		mock.ExpectBegin()
		mock.ExpectCommit().WillReturnError(errTest)
		err = db.WithTx(newCtx(t), nil, newTxFunc(nil))

		require.Error(t, err)
		require.ErrorIs(t, err, errTest)
	})
}

View File

@ -0,0 +1,166 @@
package dbimpl
import (
"fmt"
"sync"
"github.com/dlmiddlecote/sqlstats"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/setting"
resourcedb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/migrations"
)
// Compile-time assertion that *ResourceDB implements ResourceDBInterface.
var _ resourcedb.ResourceDBInterface = (*ResourceDB)(nil)

// ProvideResourceDB constructs a lazily-initialized ResourceDB. No database
// work happens here; the engine is created and migrated on the first call to
// Init (directly or via the Get* accessors).
func ProvideResourceDB(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (*ResourceDB, error) {
	return &ResourceDB{
		db:       db,
		cfg:      cfg,
		features: features,
		// NOTE(review): logger is named "entity-db" inside the resource DB —
		// looks like a copy-paste from the entity server; confirm whether it
		// should be "resource-db".
		log:    log.New("entity-db"),
		tracer: tracer,
	}, nil
}
// ResourceDB provides lazily-initialized, memoized access to the database
// backing the resource store.
type ResourceDB struct {
	once    sync.Once // guards the one-time init
	onceErr error     // memoized result of init

	db       db.DB // fallback Grafana DB connection when no explicit config is set
	features featuremgmt.FeatureToggles
	engine   *xorm.Engine // set by init; nil until Init succeeds
	cfg      *setting.Cfg
	log      log.Logger
	tracer   tracing.Tracer
}
// Init runs the one-time initialization (engine selection and migrations).
// It is safe to call repeatedly: only the first call does work, and its
// result is memoized for all subsequent callers.
func (db *ResourceDB) Init() error {
	db.once.Do(func() {
		db.onceErr = db.init()
	})
	return db.onceErr
}

// GetEngine initializes the database if needed and returns the xorm engine.
func (db *ResourceDB) GetEngine() (*xorm.Engine, error) {
	if err := db.Init(); err != nil {
		return nil, err
	}
	// onceErr is necessarily nil here, since Init just succeeded.
	return db.engine, db.onceErr
}
// init performs the actual engine construction and migration. It prefers an
// explicit "resource_api" configuration section; when no db_type is set
// there, it falls back to the main Grafana database connection. On migration
// failure the engine reference is cleared so the struct stays unusable rather
// than half-migrated.
func (db *ResourceDB) init() error {
	if db.engine != nil {
		return nil
	}

	var engine *xorm.Engine
	var err error

	// TODO: This should be renamed resource_api
	getter := &sectionGetter{
		DynamicSection: db.cfg.SectionWithEnvOverrides("resource_api"),
	}

	dbType := getter.Key("db_type").MustString("")

	// if explicit connection settings are provided, use them
	if dbType != "" {
		if dbType == "postgres" {
			engine, err = getEnginePostgres(getter, db.tracer)
			if err != nil {
				return err
			}

			// FIXME: this config option is cockroachdb-specific, it's not supported by postgres
			// FIXME: this only sets this option for the session that we get
			// from the pool right now. A *sql.DB is a pool of connections,
			// there is no guarantee that the session where this is run will be
			// the same where we need to change the type of a column
			_, err = engine.Exec("SET SESSION enable_experimental_alter_column_type_general=true")
			if err != nil {
				db.log.Error("error connecting to postgres", "msg", err.Error())
				// FIXME: return nil, err
			}
		} else if dbType == "mysql" {
			engine, err = getEngineMySQL(getter, db.tracer)
			if err != nil {
				return err
			}

			// Verify connectivity eagerly for MySQL (the postgres branch
			// relies on the SET SESSION statement above instead).
			if err = engine.Ping(); err != nil {
				return err
			}
		} else {
			// TODO: sqlite support
			return fmt.Errorf("invalid db type specified: %s", dbType)
		}

		// register sql stat metrics
		if err := prometheus.Register(sqlstats.NewStatsCollector("unified_storage", engine.DB().DB)); err != nil {
			db.log.Warn("Failed to register unified storage sql stats collector", "error", err)
		}

		// configure sql logging
		debugSQL := getter.Key("log_queries").MustBool(false)
		if !debugSQL {
			engine.SetLogger(&xorm.DiscardLogger{})
		} else {
			// add stack to database calls to be able to see what repository initiated queries. Top 7 items from the stack as they are likely in the xorm library.
			// engine.SetLogger(sqlstore.NewXormLogger(log.LvlInfo, log.WithSuffix(log.New("sqlstore.xorm"), log.CallerContextKey, log.StackCaller(log.DefaultCallerDepth))))
			engine.ShowSQL(true)
			engine.ShowExecTime(true)
		}

		// otherwise, try to use the grafana db connection
	} else {
		if db.db == nil {
			return fmt.Errorf("no db connection provided")
		}

		engine = db.db.GetEngine()
	}

	db.engine = engine

	if err := migrations.MigrateResourceStore(engine, db.cfg, db.features); err != nil {
		db.engine = nil
		return fmt.Errorf("run migrations: %w", err)
	}

	return nil
}
// GetSession initializes the database if needed and returns a session DB
// wrapped around the engine's connection pool.
func (db *ResourceDB) GetSession() (*session.SessionDB, error) {
	engine, err := db.GetEngine()
	if err != nil {
		return nil, err
	}

	return session.GetSession(sqlx.NewDb(engine.DB().DB, engine.DriverName())), nil
}

// GetCfg returns the configuration this ResourceDB was built with.
func (db *ResourceDB) GetCfg() *setting.Cfg {
	return db.cfg
}

// GetDB initializes the database if needed and returns it wrapped in the
// resourcedb.DB abstraction.
func (db *ResourceDB) GetDB() (resourcedb.DB, error) {
	engine, err := db.GetEngine()
	if err != nil {
		return nil, err
	}
	ret := NewDB(engine.DB().DB, engine.Dialect().DriverName())
	return ret, nil
}

View File

@ -0,0 +1,111 @@
package dbimpl
import (
"cmp"
"errors"
"fmt"
"net"
"sort"
"strings"
"unicode/utf8"
"github.com/grafana/grafana/pkg/setting"
)
var (
	// ErrInvalidUTF8Sequence is returned when a configuration value is not a
	// valid UTF-8 string.
	ErrInvalidUTF8Sequence = errors.New("invalid UTF-8 sequence")
)

// sectionGetter wraps a DynamicSection and records read errors internally,
// allowing a sequence of String calls with a single Err check at the end.
type sectionGetter struct {
	*setting.DynamicSection
	err error // most recently recorded read error, if any
}

// Err returns the most recently recorded read error, or nil.
func (g *sectionGetter) Err() error {
	return g.err
}

// String returns the value for key; when the value is not valid UTF-8 it
// records an error (overwriting any previous one) and returns "".
func (g *sectionGetter) String(key string) string {
	v := g.DynamicSection.Key(key).MustString("")
	if !utf8.ValidString(v) {
		g.err = fmt.Errorf("value for key %q: %w", key, ErrInvalidUTF8Sequence)

		return ""
	}

	return v
}
// MakeDSN creates a key=value connection string (libpq-style DSN) from the
// given map. It validates that all values are valid UTF-8 sequences and
// quotes values when needed. Keys are emitted in sorted order so output is
// deterministic, and keys with empty values are skipped.
func MakeDSN(m map[string]string) (string, error) {
	b := new(strings.Builder)
	ks := keys(m)
	sort.Strings(ks) // provide deterministic behaviour
	for _, k := range ks {
		v := m[k]
		if !utf8.ValidString(v) {
			return "", fmt.Errorf("value for DSN key %q: %w", k,
				ErrInvalidUTF8Sequence)
		}
		if v == "" {
			continue
		}
		if b.Len() > 0 {
			_ = b.WriteByte(' ')
		}
		_, _ = b.WriteString(k)
		_ = b.WriteByte('=')
		writeDSNValue(b, v)
	}

	return b.String(), nil
}

// keys returns the keys of m in unspecified order.
func keys(m map[string]string) []string {
	ret := make([]string, 0, len(m))
	for k := range m {
		ret = append(ret, k)
	}

	return ret
}

// writeDSNValue writes v to b, single-quoting it whenever the raw form would
// be ambiguous: quotes and backslashes are escape characters, and whitespace
// separates key=value pairs in a DSN, so values containing any of these must
// be quoted. Inside quotes, ' and \ are escaped with a backslash.
func writeDSNValue(b *strings.Builder, v string) {
	numq := strings.Count(v, `'`)
	numb := strings.Count(v, `\`)
	// FIX: values containing whitespace were previously written unquoted,
	// producing an invalid DSN (e.g. a password with a space); quote them too.
	if numq+numb == 0 && v != "" && !strings.ContainsAny(v, " \t\n\r") {
		b.WriteString(v)

		return
	}
	b.Grow(2 + numq + numb + len(v))
	_ = b.WriteByte('\'')
	for _, r := range v {
		if r == '\\' || r == '\'' {
			_ = b.WriteByte('\\')
		}
		_, _ = b.WriteRune(r)
	}
	_ = b.WriteByte('\'')
}
// splitHostPortDefault is similar to net.SplitHostPort, but will also accept a
// specification with no port and apply the default port instead. It also
// applies the given defaults if the results are empty strings.
func splitHostPortDefault(hostport, defaultHost, defaultPort string) (string, string, error) {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
// try appending the port
host, port, err = net.SplitHostPort(hostport + ":" + defaultPort)
if err != nil {
return "", "", fmt.Errorf("invalid hostport: %q", hostport)
}
}
host = cmp.Or(host, defaultHost)
port = cmp.Or(port, defaultPort)
return host, port, nil
}

View File

@ -0,0 +1,108 @@
package dbimpl
import (
"fmt"
"testing"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/require"
)
// invalidUTF8ByteSequence is a byte string that is not valid UTF-8, used to
// exercise the UTF-8 validation paths.
var invalidUTF8ByteSequence = []byte{0xff, 0xfe, 0xfd}

// setSectionKeyValues copies every key/value pair of m into the section.
func setSectionKeyValues(section *setting.DynamicSection, m map[string]string) {
	for k, v := range m {
		section.Key(k).SetValue(v)
	}
}

// newTestSectionGetter builds a sectionGetter over an in-memory "entity_api"
// config section populated from m.
func newTestSectionGetter(m map[string]string) *sectionGetter {
	section := setting.NewCfg().SectionWithEnvOverrides("entity_api")
	setSectionKeyValues(section, m)

	return &sectionGetter{
		DynamicSection: section,
	}
}
// TestSectionGetter checks that sectionGetter returns values transparently,
// tolerates missing keys, and records an error for invalid UTF-8 values.
func TestSectionGetter(t *testing.T) {
	t.Parallel()

	var (
		key = "the key"
		val = string(invalidUTF8ByteSequence)
	)
	g := newTestSectionGetter(map[string]string{
		key: val,
	})

	// A missing key yields "" without recording an error.
	v := g.String("whatever")
	require.Empty(t, v)
	require.NoError(t, g.Err())

	// An invalid UTF-8 value yields "" and records the error.
	v = g.String(key)
	require.Empty(t, v)
	require.Error(t, g.Err())
	require.ErrorIs(t, g.Err(), ErrInvalidUTF8Sequence)
}

// TestMakeDSN checks UTF-8 validation, skipping of empty values, and the
// quoting/escaping of values containing quotes or backslashes.
func TestMakeDSN(t *testing.T) {
	t.Parallel()

	s, err := MakeDSN(map[string]string{
		"db_name": string(invalidUTF8ByteSequence),
	})
	require.Empty(t, s)
	require.Error(t, err)
	require.ErrorIs(t, err, ErrInvalidUTF8Sequence)

	s, err = MakeDSN(map[string]string{
		"skip": "",
		"user": `shou'ld esc\ape`,
		"pass": "noescape",
	})
	require.NoError(t, err)
	require.Equal(t, `pass=noescape user='shou\'ld esc\\ape'`, s)
}
// TestSplitHostPort table-tests splitHostPortDefault: host:port specs with
// and without ports, IPv6 bracket forms, default substitution, and an
// unparseable input.
func TestSplitHostPort(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		hostport    string // input specification
		defaultHost string // host substituted when the parsed host is empty
		defaultPort string // port assumed/substituted when missing
		fails       bool   // expect an error
		host        string // expected host
		port        string // expected port
	}{
		{hostport: "192.168.0.140:456", defaultHost: "", defaultPort: "", host: "192.168.0.140", port: "456"},
		{hostport: "192.168.0.140", defaultHost: "", defaultPort: "123", host: "192.168.0.140", port: "123"},
		{hostport: "[::1]:456", defaultHost: "", defaultPort: "", host: "::1", port: "456"},
		{hostport: "[::1]", defaultHost: "", defaultPort: "123", host: "::1", port: "123"},
		{hostport: ":456", defaultHost: "1.2.3.4", defaultPort: "", host: "1.2.3.4", port: "456"},
		{hostport: "xyz.rds.amazonaws.com", defaultHost: "", defaultPort: "123", host: "xyz.rds.amazonaws.com", port: "123"},
		{hostport: "xyz.rds.amazonaws.com:123", defaultHost: "", defaultPort: "", host: "xyz.rds.amazonaws.com", port: "123"},
		{hostport: "", defaultHost: "localhost", defaultPort: "1433", host: "localhost", port: "1433"},
		{hostport: "1:1:1", fails: true},
	}

	for i, tc := range testCases {
		t.Run(fmt.Sprintf("test index #%d", i), func(t *testing.T) {
			t.Parallel()

			host, port, err := splitHostPortDefault(tc.hostport, tc.defaultHost, tc.defaultPort)
			if tc.fails {
				require.Error(t, err)
				require.Empty(t, host)
				require.Empty(t, port)
			} else {
				require.NoError(t, err)
				require.Equal(t, tc.host, host)
				require.Equal(t, tc.port, port)
			}
		})
	}
}

View File

@ -0,0 +1,24 @@
package migrations
import (
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
)
// MigrateResourceStore runs the resource store schema migrations. It is a
// no-op unless the unifiedStorage feature flag is enabled globally.
func MigrateResourceStore(engine *xorm.Engine, cfg *setting.Cfg, features featuremgmt.FeatureToggles) error {
	// Skip if feature flag is not enabled
	if !features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) {
		return nil
	}

	// Migrations are scoped under "resource" to keep them separate from the
	// main Grafana migration log.
	mg := migrator.NewScopedMigrator(engine, cfg, "resource")
	mg.AddCreateMigration()
	initResourceTables(mg)

	// since it's a new feature enable migration locking by default
	return mg.Start(true, 0)
}

View File

@ -0,0 +1,101 @@
package migrations
import (
"fmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
// initResourceTables registers the migrations that create the resource store
// schema: the current-state table ("resource"), the append-only event log
// ("resource_history"), and the per-(group, resource) version counter
// ("resource_version"). Returns the marker migration id.
//
// NOTE(review): migration id strings are stable identifiers once shipped —
// do not rename them. The index migrations reuse the "create table %s,
// index: %d" wording, which reads oddly but must now stay as-is.
func initResourceTables(mg *migrator.Migrator) string {
	marker := "Initialize resource tables"
	mg.AddMigration(marker, &migrator.RawSQLMigration{})

	tables := []migrator.Table{}
	tables = append(tables, migrator.Table{
		Name: "resource",
		Columns: []*migrator.Column{
			// primary identifier
			{Name: "guid", Type: migrator.DB_NVarchar, Length: 36, Nullable: false, IsPrimaryKey: true},
			{Name: "resource_version", Type: migrator.DB_BigInt, Nullable: true},

			// K8s Identity group+(version)+namespace+resource+name
			{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
			{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
			{Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: false},
			{Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},

			{Name: "value", Type: migrator.DB_LongText, Nullable: true},
			{Name: "action", Type: migrator.DB_Int, Nullable: false}, // 1: create, 2: update, 3: delete

			// Hashed label set
			{Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: true}, // null is no labels
		},
		Indices: []*migrator.Index{
			// one current row per fully-qualified key
			{Cols: []string{"namespace", "group", "resource", "name"}, Type: migrator.UniqueIndex},
		},
	})

	tables = append(tables, migrator.Table{
		Name: "resource_history",
		Columns: []*migrator.Column{
			// primary identifier
			{Name: "guid", Type: migrator.DB_NVarchar, Length: 36, Nullable: false, IsPrimaryKey: true},
			{Name: "resource_version", Type: migrator.DB_BigInt, Nullable: true},

			// K8s Identity group+(version)+namespace+resource+name
			{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
			{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
			{Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: false},
			{Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},

			{Name: "value", Type: migrator.DB_LongText, Nullable: true},
			{Name: "action", Type: migrator.DB_Int, Nullable: false}, // 1: create, 2: update, 3: delete

			// Hashed label set
			{Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: true}, // null is no labels
		},
		Indices: []*migrator.Index{
			// one history row per key per resource version
			{
				Cols: []string{"namespace", "group", "resource", "name", "resource_version"},
				Type: migrator.UniqueIndex,
				Name: "UQE_resource_history_namespace_group_name_version",
			},
			// index to support watch poller
			{Cols: []string{"resource_version"}, Type: migrator.IndexType},
		},
	})

	// tables = append(tables, migrator.Table{
	// 	Name: "resource_label_set",
	// 	Columns: []*migrator.Column{
	// 		{Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
	// 		{Name: "label", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
	// 		{Name: "value", Type: migrator.DB_Text, Nullable: false},
	// 	},
	// 	Indices: []*migrator.Index{
	// 		{Cols: []string{"label_set", "label"}, Type: migrator.UniqueIndex},
	// 	},
	// })

	tables = append(tables, migrator.Table{
		Name: "resource_version",
		Columns: []*migrator.Column{
			{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
			{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
			{Name: "resource_version", Type: migrator.DB_BigInt, Nullable: false},
		},
		Indices: []*migrator.Index{
			// one counter row per (group, resource)
			{Cols: []string{"group", "resource"}, Type: migrator.UniqueIndex},
		},
	})

	// Initialize all tables
	for t := range tables {
		mg.AddMigration("drop table "+tables[t].Name, migrator.NewDropTableMigration(tables[t].Name))
		mg.AddMigration("create table "+tables[t].Name, migrator.NewAddTableMigration(tables[t]))
		for i := range tables[t].Indices {
			mg.AddMigration(fmt.Sprintf("create table %s, index: %d", tables[t].Name, i), migrator.NewAddIndexMigration(tables[t], tables[t].Indices[i]))
		}
	}

	return marker
}

View File

@ -0,0 +1,71 @@
package db
import (
"context"
"database/sql"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/setting"
)
const (
DriverPostgres = "postgres"
DriverMySQL = "mysql"
DriverSQLite = "sqlite"
DriverSQLite3 = "sqlite3"
)
// ResourceDBInterface provides access to a database capable of supporting the
// Entity Server.
type ResourceDBInterface interface {
Init() error
GetCfg() *setting.Cfg
GetDB() (DB, error)
// TODO: deprecate.
GetSession() (*session.SessionDB, error)
GetEngine() (*xorm.Engine, error)
}
// DB is a thin abstraction on *sql.DB to allow mocking to provide better unit
// testing. We purposefully hide database operation methods that would use
// context.Background().
type DB interface {
ContextExecer
BeginTx(context.Context, *sql.TxOptions) (Tx, error)
WithTx(context.Context, *sql.TxOptions, TxFunc) error
PingContext(context.Context) error
Stats() sql.DBStats
DriverName() string
}
// TxFunc is a function that executes with access to a transaction. The context
// it receives is the same context used to create the transaction, and is
// provided so that a general prupose TxFunc is able to retrieve information
// from that context, and derive other contexts that may be used to run database
// operation methods accepting a context. A derived context can be used to
// request a specific database operation to take no more than a specific
// fraction of the remaining timeout of the transaction context, or to enrich
// the downstream observability layer with relevant information regarding the
// specific operation being carried out.
type TxFunc = func(context.Context, Tx) error
// Tx is a thin abstraction on *sql.Tx to allow mocking to provide better unit
// testing. We allow database operation methods that do not take a
// context.Context here since a Tx can only be obtained with DB.BeginTx, which
// already takes a context.Context.
type Tx interface {
	ContextExecer

	// Commit commits the transaction.
	Commit() error
	// Rollback aborts the transaction.
	Rollback() error
}
// ContextExecer is a set of database operation methods that take
// context.Context.
type ContextExecer interface {
	// ExecContext executes a query that returns no rows.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	// QueryContext executes a query that may return any number of rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// QueryRowContext executes a query expected to return at most one row.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

View File

@ -0,0 +1,191 @@
package sql
import (
"database/sql"
"embed"
"fmt"
"text/template"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// Templates setup.
var (
	// sqlTemplatesFS embeds the raw SQL template files shipped in data/.
	//go:embed data/*.sql
	sqlTemplatesFS embed.FS

	// sqlTemplates parses all embedded templates at package-initialization
	// time, so a malformed template fails fast.
	sqlTemplates = template.Must(template.New("sql").ParseFS(sqlTemplatesFS, `data/*.sql`))
)
// mustTemplate returns the parsed template registered under filename in
// sqlTemplates. It panics when the template is missing, which is acceptable
// because it is only called from package-level variable initialization.
func mustTemplate(filename string) *template.Template {
	t := sqlTemplates.Lookup(filename)
	if t == nil {
		panic(fmt.Sprintf("template file not found: %s", filename))
	}
	return t
}
// Templates. One variable per SQL template file, resolved eagerly so missing
// files panic at startup instead of at query time.
var (
	sqlResourceDelete          = mustTemplate("resource_delete.sql")
	sqlResourceInsert          = mustTemplate("resource_insert.sql")
	sqlResourceUpdate          = mustTemplate("resource_update.sql")
	sqlResourceRead            = mustTemplate("resource_read.sql")
	sqlResourceList            = mustTemplate("resource_list.sql")
	sqlResourceHistoryList     = mustTemplate("resource_history_list.sql")
	sqlResourceUpdateRV        = mustTemplate("resource_update_rv.sql")
	sqlResourceHistoryRead     = mustTemplate("resource_history_read.sql")
	sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql")
	sqlResourceHistoryInsert   = mustTemplate("resource_history_insert.sql")
	sqlResourceHistoryPoll     = mustTemplate("resource_history_poll.sql")

	// sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql")
	sqlResourceVersionGet    = mustTemplate("resource_version_get.sql")
	sqlResourceVersionInc    = mustTemplate("resource_version_inc.sql")
	sqlResourceVersionInsert = mustTemplate("resource_version_insert.sql")
)
// TxOptions. Common transaction options used throughout this package.
var (
	// ReadCommitted is used for read/write transactions.
	ReadCommitted = &sql.TxOptions{
		Isolation: sql.LevelReadCommitted,
	}

	// ReadCommittedRO is the read-only variant of ReadCommitted.
	ReadCommittedRO = &sql.TxOptions{
		Isolation: sql.LevelReadCommitted,
		ReadOnly:  true,
	}
)
// SQLError is an error returned by the database, enriched with debugging
// information about what was sent to the database. The error message exposes
// only argument counts, never the argument values themselves.
type SQLError struct {
	Err          error
	CallType     string // either Query, QueryRow or Exec
	TemplateName string
	Query        string
	RawQuery     string
	ScanDest     []any

	// potentially regulated information is not exported and only directly
	// available for local testing and local debugging purposes, making sure it
	// is never marshaled to JSON or any other serialization.
	arguments []any
}

// Unwrap exposes the underlying database error to errors.Is / errors.As.
func (e SQLError) Unwrap() error {
	return e.Err
}

// Error implements the error interface. Only the number of input and output
// arguments is reported, not their contents.
func (e SQLError) Error() string {
	numIn, numOut := len(e.arguments), len(e.ScanDest)
	return fmt.Sprintf("%s: %s with %d input arguments and %d output "+
		"destination arguments: %v", e.TemplateName, e.CallType,
		numIn, numOut, e.Err)
}
// sqlResourceRequest is the template data for the resource insert, update,
// delete and history-insert templates: the row GUID plus the originating write
// event.
type sqlResourceRequest struct {
	*sqltemplate.SQLTemplate
	GUID       string
	WriteEvent resource.WriteEvent
}

// Validate implements the template-data validation hook.
func (r sqlResourceRequest) Validate() error {
	return nil // TODO
}
// historyPollResponse is a single row scanned while polling resource_history
// for new events.
type historyPollResponse struct {
	Key             resource.ResourceKey
	ResourceVersion int64
	Value           []byte
	Action          int
}

// Results implements the template results hook; the row itself is the result.
func (r *historyPollResponse) Results() (*historyPollResponse, error) {
	return r, nil
}
// sqlResourceHistoryPollRequest is the template data for the history poll
// template, returning events newer than SinceResourceVersion.
type sqlResourceHistoryPollRequest struct {
	*sqltemplate.SQLTemplate
	SinceResourceVersion int64
	Response             *historyPollResponse
}

// Validate implements the template-data validation hook.
func (r sqlResourceHistoryPollRequest) Validate() error {
	return nil // TODO
}
// readResponse is the scan destination for a single row read from the
// "resource" (or "resource_history") tables.
type readResponse struct {
	resource.ReadResponse
}

// Results implements the template results hook; the row itself is the result.
func (r *readResponse) Results() (*readResponse, error) {
	return r, nil
}
// sqlResourceReadRequest can be used to retrieve a row from the "resource"
// tables. It is shared by the resource_read and resource_history_read
// templates.
type sqlResourceReadRequest struct {
	*sqltemplate.SQLTemplate
	Request *resource.ReadRequest
	*readResponse
}

// Validate implements the template-data validation hook.
func (r sqlResourceReadRequest) Validate() error {
	return nil // TODO
}
// List

// sqlResourceListRequest is the template data for the resource_list template.
type sqlResourceListRequest struct {
	*sqltemplate.SQLTemplate
	Request  *resource.ListRequest
	Response *resource.ResourceWrapper
}

// Validate implements the template-data validation hook.
func (r sqlResourceListRequest) Validate() error {
	return nil // TODO
}
// historyListRequest carries the paging parameters and list options for
// listing resource history.
type historyListRequest struct {
	ResourceVersion, Limit, Offset int64
	Options                        *resource.ListOptions
}
// sqlResourceHistoryListRequest is the template data for the
// resource_history_list template.
type sqlResourceHistoryListRequest struct {
	*sqltemplate.SQLTemplate
	Request  *historyListRequest
	Response *resource.ResourceWrapper
}

// Validate implements the template-data validation hook.
func (r sqlResourceHistoryListRequest) Validate() error {
	return nil // TODO
}
// update RV

// sqlResourceUpdateRVRequest sets the resource version of the row identified
// by GUID. It is shared by the resource and resource_history RV-update
// templates.
type sqlResourceUpdateRVRequest struct {
	*sqltemplate.SQLTemplate
	GUID            string
	ResourceVersion int64
}

// Validate implements the template-data validation hook.
func (r sqlResourceUpdateRVRequest) Validate() error {
	return nil // TODO
}
// resource_version table requests.

// resourceVersion is a row of the "resource_version" table.
type resourceVersion struct {
	ResourceVersion int64
}

// Results implements the template results hook; the row itself is the result.
func (r *resourceVersion) Results() (*resourceVersion, error) {
	return r, nil
}
// sqlResourceVersionRequest is the template data for the resource_version
// get/inc/insert templates, keyed by (Group, Resource).
type sqlResourceVersionRequest struct {
	*sqltemplate.SQLTemplate
	Group, Resource string
	*resourceVersion
}

// Validate implements the template-data validation hook.
func (r sqlResourceVersionRequest) Validate() error {
	return nil // TODO
}

View File

@ -0,0 +1,364 @@
package sql
import (
"embed"
"errors"
"testing"
"text/template"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// debug is meant to provide greater debugging detail about certain errors. The
// returned error will either provide more detailed information or be the same
// original error, suitable only for local debugging. The details provided are
// not meant to be logged, since they could include PII or otherwise
// sensitive/confidential information. These information should only be used for
// local debugging with fake or otherwise non-regulated information.
func debug(err error) error {
	var d interface{ Debug() string }
	if !errors.As(err, &d) {
		return err
	}
	return errors.New(d.Debug())
}

var _ = debug // silence the `unused` linter
//go:embed testdata/*
var testdataFS embed.FS

// testdata returns the contents of the given file under testdata/, failing the
// test if the file cannot be read.
func testdata(t *testing.T, filename string) []byte {
	t.Helper()
	b, err := testdataFS.ReadFile(`testdata/` + filename)
	require.NoError(t, err)
	return b
}
// TestQueries renders every SQL template against one or more dialects and
// compares the (whitespace-normalized) output with golden files under
// testdata/.
func TestQueries(t *testing.T) {
	t.Parallel()

	// Each template has one or more test cases, each identified with a
	// descriptive name (e.g. "happy path", "error twiddling the frobb"). Each
	// of them will test that for the same input data they must produce a result
	// that will depend on the Dialect. Expected queries should be defined in
	// separate files in the testdata directory. This improves the testing
	// experience by separating test data from test code, since mixing both
	// tends to make it more difficult to reason about what is being done,
	// especially as we want testing code to scale and make it easy to add
	// tests.
	type (
		// type aliases to make code more semantic and self-documenting
		resultSQLFilename = string
		dialects          = []sqltemplate.Dialect
		expected          map[resultSQLFilename]dialects
		testCase          = struct {
			Name string
			// Data should be the struct passed to the template.
			Data sqltemplate.SQLTemplateIface
			// Expected maps the filename containing the expected result query
			// to the list of dialects that would produce it. For simple
			// queries, it is possible that more than one dialect produce the
			// same output. The filename is expected to be in the `testdata`
			// directory.
			Expected expected
		}
	)

	// Define tests cases. Most templates are trivial and testing that they
	// generate correct code for a single Dialect is fine, since the one thing
	// that always changes is how SQL placeholder arguments are passed (most
	// Dialects use `?` while PostgreSQL uses `$1`, `$2`, etc.), and that is
	// something that should be tested in the Dialect implementation instead of
	// here. We will ask to have at least one test per SQL template, and we will
	// lean to test MySQL. Templates containing branching (conditionals, loops,
	// etc.) should be exercised at least once in each of their branches.
	//
	// NOTE: in the Data field, make sure to have pointers populated to simulate
	// data is set as it would be in a real request. The data being correctly
	// populated in each case should be tested in integration tests, where the
	// data will actually flow to and from a real database. In this tests we
	// only care about producing the correct SQL.
	testCases := map[*template.Template][]*testCase{
		sqlResourceDelete: {
			{
				Name: "single path",
				Data: &sqlResourceRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					WriteEvent: resource.WriteEvent{
						Key: &resource.ResourceKey{},
					},
				},
				Expected: expected{
					"resource_delete_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
					"resource_delete_postgres.sql": dialects{
						sqltemplate.PostgreSQL,
					},
				},
			},
		},
		sqlResourceInsert: {
			{
				Name: "insert into resource",
				Data: &sqlResourceRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					WriteEvent: resource.WriteEvent{
						Key: &resource.ResourceKey{},
					},
				},
				Expected: expected{
					"resource_insert_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceUpdate: {
			{
				Name: "single path",
				Data: &sqlResourceRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					WriteEvent: resource.WriteEvent{
						Key: &resource.ResourceKey{},
					},
				},
				Expected: expected{
					"resource_update_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceRead: {
			{
				Name: "without resource version",
				Data: &sqlResourceReadRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					Request: &resource.ReadRequest{
						Key: &resource.ResourceKey{},
					},
					readResponse: new(readResponse),
				},
				Expected: expected{
					"resource_read_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceList: {
			{
				Name: "filter on namespace",
				Data: &sqlResourceListRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					Request: &resource.ListRequest{
						Limit: 10,
						Options: &resource.ListOptions{
							Key: &resource.ResourceKey{
								Namespace: "ns",
							},
						},
					},
					Response: new(resource.ResourceWrapper),
				},
				Expected: expected{
					"resource_list_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceHistoryList: {
			{
				Name: "single path",
				Data: &sqlResourceHistoryListRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					Request: &historyListRequest{
						Limit: 10,
						Options: &resource.ListOptions{
							Key: &resource.ResourceKey{
								Namespace: "ns",
							},
						},
					},
					Response: new(resource.ResourceWrapper),
				},
				Expected: expected{
					"resource_history_list_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceUpdateRV: {
			{
				Name: "single path",
				Data: &sqlResourceUpdateRVRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
				},
				Expected: expected{
					"resource_update_rv_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceHistoryRead: {
			{
				Name: "single path",
				Data: &sqlResourceReadRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					Request: &resource.ReadRequest{
						ResourceVersion: 123,
						Key:             &resource.ResourceKey{},
					},
					readResponse: new(readResponse),
				},
				Expected: expected{
					"resource_history_read_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceHistoryUpdateRV: {
			{
				Name: "single path",
				Data: &sqlResourceUpdateRVRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
				},
				Expected: expected{
					"resource_history_update_rv_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceHistoryInsert: {
			{
				Name: "insert into resource_history",
				Data: &sqlResourceRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					WriteEvent: resource.WriteEvent{
						Key: &resource.ResourceKey{},
					},
				},
				Expected: expected{
					"resource_history_insert_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceVersionGet: {
			{
				Name: "single path",
				Data: &sqlResourceVersionRequest{
					SQLTemplate:     new(sqltemplate.SQLTemplate),
					resourceVersion: new(resourceVersion),
				},
				Expected: expected{
					"resource_version_get_mysql.sql": dialects{
						sqltemplate.MySQL,
					},
					"resource_version_get_sqlite.sql": dialects{
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceVersionInc: {
			{
				Name: "increment resource version",
				Data: &sqlResourceVersionRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
					resourceVersion: &resourceVersion{
						ResourceVersion: 123,
					},
				},
				Expected: expected{
					"resource_version_inc_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
		sqlResourceVersionInsert: {
			{
				Name: "single path",
				Data: &sqlResourceVersionRequest{
					SQLTemplate: new(sqltemplate.SQLTemplate),
				},
				Expected: expected{
					"resource_version_insert_mysql_sqlite.sql": dialects{
						sqltemplate.MySQL,
						sqltemplate.SQLite,
					},
				},
			},
		},
	}

	// Execute test cases
	for tmpl, tcs := range testCases {
		t.Run(tmpl.Name(), func(t *testing.T) {
			t.Parallel()

			for _, tc := range tcs {
				t.Run(tc.Name, func(t *testing.T) {
					t.Parallel()

					for filename, ds := range tc.Expected {
						t.Run(filename, func(t *testing.T) {
							// not parallel because we're sharing tc.Data, not
							// worth it deep cloning
							rawQuery := string(testdata(t, filename))
							expectedQuery := sqltemplate.FormatSQL(rawQuery)

							for _, d := range ds {
								t.Run(d.Name(), func(t *testing.T) {
									// not parallel for the same reason
									tc.Data.SetDialect(d)
									err := tc.Data.Validate()
									require.NoError(t, err)
									got, err := sqltemplate.Execute(tmpl, tc.Data)
									require.NoError(t, err)
									got = sqltemplate.FormatSQL(got)
									require.Equal(t, expectedQuery, got)
								})
							}
						})
					}
				})
			}
		})
	}
}

View File

@ -0,0 +1,31 @@
package sql
import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
// ProvideResourceServer assembles a resource.ResourceServer backed by the SQL
// storage backend: it builds the resource database from the Grafana DB/config,
// creates the backend store on top of it, and wires that store in as backend,
// diagnostics and lifecycle provider of the server.
func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) {
	resourceDB, err := dbimpl.ProvideResourceDB(db, cfg, features, tracer)
	if err != nil {
		return nil, err
	}

	backend, err := NewBackendStore(backendOptions{DB: resourceDB, Tracer: tracer})
	if err != nil {
		return nil, err
	}

	// the store serves three roles for the server
	return resource.NewResourceServer(resource.ResourceServerOptions{
		Tracer:      tracer,
		Backend:     backend,
		Diagnostics: backend,
		Lifecycle:   backend,
	})
}

View File

@ -7,7 +7,7 @@ import (
mock "github.com/stretchr/testify/mock"
sqltemplate "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
sqltemplate "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// SQLTemplateIface is an autogenerated mock type for the SQLTemplateIface type

View File

@ -7,7 +7,7 @@ import (
mock "github.com/stretchr/testify/mock"
sqltemplate "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
sqltemplate "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// WithResults is an autogenerated mock type for the WithResults type

View File

@ -0,0 +1 @@
DELETE FROM "resource" WHERE 1 = 1 AND "namespace" = ? AND "group" = ? AND "resource" = ? AND "name" = ?;

View File

@ -0,0 +1 @@
DELETE FROM "resource" WHERE 1 = 1 AND "namespace" = $1 AND "group" = $2 AND "resource" = $3 AND "name" = $4;

View File

@ -0,0 +1,3 @@
INSERT INTO "resource_history"
("guid", "group", "resource", "namespace", "name", "value", "action")
VALUES (?, ?, ?, ?, ?, ?, ?);

View File

@ -0,0 +1,12 @@
SELECT kv."resource_version", "value"
FROM "resource_history" as kv
JOIN (
SELECT "guid", max("resource_version") AS "resource_version"
FROM "resource_history" AS mkv
WHERE 1 = 1 AND "resource_version" <= ? AND "namespace" = ?
GROUP BY mkv."namespace", mkv."group", mkv."resource", mkv."name"
) AS maxkv ON maxkv."guid" = kv."guid"
WHERE kv."action" != 3
ORDER BY kv."resource_version" ASC
LIMIT ?, ?
;

View File

@ -0,0 +1,6 @@
SELECT "resource_version", "value"
FROM "resource_history"
WHERE 1 = 1 AND "namespace" = ? AND "group" = ? AND "resource" = ? AND "name" = ? AND "resource_version" <= ?
ORDER BY "resource_version" DESC
LIMIT 1
;

View File

@ -0,0 +1,3 @@
UPDATE "resource_history" SET "resource_version" = ?
WHERE "guid" = ?
;

View File

@ -0,0 +1,4 @@
INSERT INTO "resource"
("guid", "group", "resource", "namespace", "name", "value", "action")
VALUES (?, ?, ?, ?, ?, ?, ?)
;

View File

@ -0,0 +1,6 @@
SELECT "resource_version", "value"
FROM "resource"
WHERE 1 = 1 AND "namespace" = ?
ORDER BY "resource_version" DESC
LIMIT ?
;

View File

@ -0,0 +1,4 @@
SELECT "resource_version", "value"
FROM "resource"
WHERE 1 = 1 AND "namespace" = ? AND "group" = ? AND "resource" = ? AND "name" = ?
;

View File

@ -0,0 +1,4 @@
UPDATE "resource" SET "guid" = ?, "value" = ?, "action" = ?
WHERE 1 =1 AND "group" = ? AND "resource" = ? AND "namespace" = ? AND "name" = ?
;

View File

@ -0,0 +1,4 @@
UPDATE "resource" SET "resource_version" = ?
WHERE "guid" = ?
;

View File

@ -0,0 +1,4 @@
SELECT "resource_version"
FROM "resource_version"
WHERE 1 = 1 AND "group" = ? AND "resource" = ?
FOR UPDATE;

View File

@ -0,0 +1,4 @@
SELECT "resource_version"
FROM "resource_version"
WHERE 1 = 1 AND "group" = ? AND "resource" = ?
;

View File

@ -0,0 +1,4 @@
UPDATE "resource_version"
SET "resource_version" = ?
WHERE 1 = 1 AND "group" = ? AND "resource" = ?
;

View File

@ -0,0 +1,3 @@
INSERT INTO "resource_version"
("group", "resource", "resource_version")
VALUES (?, ?, 1);