SQL/Blob: Add support for blob storage to SQL backend (#98192)

This commit is contained in:
Ryan McKinley 2025-01-08 23:08:10 +03:00 committed by GitHub
parent 3e68731600
commit 429da7fd68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 478 additions and 13 deletions

View File

@ -74,7 +74,7 @@ func (db *PostgresDialect) SQLType(c *Column) string {
case DB_NVarchar:
res = DB_Varchar
case DB_Uuid:
res = DB_Uuid
return DB_Uuid // do not add the length options
case DB_Blob, DB_TinyBlob, DB_MediumBlob, DB_LongBlob:
return DB_Bytea
case DB_Double:

View File

@ -205,19 +205,24 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
// Initialize the blob storage
blobstore := opts.Blob.Backend
if blobstore == nil && opts.Blob.URL != "" {
ctx := context.Background()
bucket, err := OpenBlobBucket(ctx, opts.Blob.URL)
if err != nil {
return nil, err
}
if blobstore == nil {
if opts.Blob.URL != "" {
ctx := context.Background()
bucket, err := OpenBlobBucket(ctx, opts.Blob.URL)
if err != nil {
return nil, err
}
blobstore, err = NewCDKBlobSupport(ctx, CDKBlobSupportOptions{
Tracer: opts.Tracer,
Bucket: NewInstrumentedBucket(bucket, opts.Reg, opts.Tracer),
})
if err != nil {
return nil, err
blobstore, err = NewCDKBlobSupport(ctx, CDKBlobSupportOptions{
Tracer: opts.Tracer,
Bucket: NewInstrumentedBucket(bucket, opts.Reg, opts.Tracer),
})
if err != nil {
return nil, err
}
} else {
// Check if the backend supports blob storage
blobstore, _ = opts.Backend.(BlobSupport)
}
}

View File

@ -0,0 +1,113 @@
package sql
import (
context "context"
"crypto/md5"
"encoding/hex"
"fmt"
"net/http"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
var (
_ resource.BlobSupport = (*backend)(nil)
)
func (b *backend) SupportsSignedURLs() bool {
return false
}
func (b *backend) PutResourceBlob(ctx context.Context, req *resource.PutBlobRequest) (*resource.PutBlobResponse, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"PutResourceBlob")
defer span.End()
if req.Method == resource.PutBlobRequest_HTTP {
return &resource.PutBlobResponse{
Error: resource.NewBadRequestError("signed url upload not supported"),
}, nil
}
hasher := md5.New() // same as s3
_, err := hasher.Write(req.Value)
if err != nil {
return nil, err
}
info := &utils.BlobInfo{
UID: uuid.New().String(),
Size: int64(len(req.Value)),
Hash: hex.EncodeToString(hasher.Sum(nil)),
}
info.SetContentType(req.ContentType)
if info.Size < 1 {
return &resource.PutBlobResponse{
Error: resource.NewBadRequestError("empty content"),
}, nil
}
// Insert the value
err = b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
_, err := dbutil.Exec(ctx, tx, sqlResourceBlobInsert, sqlResourceBlobInsertRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Info: info,
Key: req.Resource,
ContentType: req.ContentType,
Value: req.Value,
})
return err
})
if err != nil {
return &resource.PutBlobResponse{
Error: resource.AsErrorResult(err),
}, nil
}
return &resource.PutBlobResponse{
Uid: info.UID,
Size: info.Size,
MimeType: info.MimeType,
Charset: info.Charset,
Hash: info.Hash,
}, nil
}
func (b *backend) GetResourceBlob(ctx context.Context, key *resource.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resource.GetBlobResponse, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"GetResourceBlob")
defer span.End()
rsp := &resource.GetBlobResponse{}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceBlobQuery, sqlResourceBlobQueryRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Key: key,
UID: info.UID, // optional
})
if err != nil {
return err
}
if rows.Next() {
uid := ""
err = rows.Scan(&uid, &rsp.Value, &rsp.ContentType)
if info.UID != "" && info.UID != uid {
return fmt.Errorf("unexpected uid in result")
}
return err
}
rsp.Error = &resource.ErrorResult{
Code: http.StatusNotFound,
}
return err
})
if err != nil {
rsp.Error = resource.AsErrorResult(err)
}
return rsp, nil
}

View File

@ -0,0 +1,26 @@
INSERT INTO {{ .Ident "resource_blob" }}
(
{{ .Ident "uuid" }},
{{ .Ident "created" }},
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "value" }},
{{ .Ident "hash" }},
{{ .Ident "content_type" }}
)
VALUES (
{{ .Arg .Info.UID }},
{{ .Arg .Now }},
{{ .Arg .Key.Group }},
{{ .Arg .Key.Resource }},
{{ .Arg .Key.Namespace }},
{{ .Arg .Key.Name }},
{{ .Arg .Value }},
{{ .Arg .Info.Hash }},
{{ .Arg .ContentType }}
)
;

View File

@ -0,0 +1,17 @@
SELECT
{{ .Ident "uuid" }},
{{ .Ident "value" }},
{{ .Ident "content_type" }}
FROM {{ .Ident "resource_blob" }}
WHERE 1 = 1
AND {{ .Ident "namespace" }} = {{ .Arg .Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Key.Resource }}
{{ if .Key.Name }}
AND {{ .Ident "name" }} = {{ .Arg .Key.Name }}
{{ end }}
{{ if .UID }}
AND {{ .Ident "uuid" }} = {{ .Arg .UID }}
{{ end }}
ORDER BY {{ .Ident "created" }} DESC
LIMIT 1;

View File

@ -88,6 +88,34 @@ func initResourceTables(mg *migrator.Migrator) string {
},
})
tables = append(tables, migrator.Table{
Name: "resource_blob",
Columns: []*migrator.Column{
{Name: "uuid", Type: migrator.DB_Uuid, Length: 36, Nullable: false, IsPrimaryKey: true},
{Name: "created", Type: migrator.DB_DateTime, Nullable: false},
{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: false},
{Name: "name", Type: migrator.DB_NVarchar, Length: 253, Nullable: false},
// The raw bytes
{Name: "value", Type: migrator.DB_LongBlob, Nullable: false},
// Used as an etag
{Name: "hash", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
{Name: "content_type", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
},
Indices: []*migrator.Index{
{
Cols: []string{"namespace", "group", "resource", "name"},
Type: migrator.IndexType,
Name: "IDX_resource_history_namespace_group_name",
},
{Cols: []string{"created"}, Type: migrator.IndexType}, // sort field
},
})
// Initialize all tables
for t := range tables {
mg.AddMigration("drop table "+tables[t].Name, migrator.NewDropTableMigration(tables[t].Name))

View File

@ -5,7 +5,9 @@ import (
"embed"
"fmt"
"text/template"
"time"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
@ -46,6 +48,9 @@ var (
sqlResourceVersionUpdate = mustTemplate("resource_version_update.sql")
sqlResourceVersionInsert = mustTemplate("resource_version_insert.sql")
sqlResourceVersionList = mustTemplate("resource_version_list.sql")
sqlResourceBlobInsert = mustTemplate("resource_blob_insert.sql")
sqlResourceBlobQuery = mustTemplate("resource_blob_query.sql")
)
// TxOptions.
@ -205,6 +210,32 @@ func (r sqlResourceHistoryUpdateRequest) Validate() error {
return nil // TODO
}
type sqlResourceBlobInsertRequest struct {
sqltemplate.SQLTemplate
Now time.Time
Info *utils.BlobInfo
Key *resource.ResourceKey
Value []byte
ContentType string
}
func (r sqlResourceBlobInsertRequest) Validate() error {
if len(r.Value) < 1 {
return fmt.Errorf("missing body")
}
return nil
}
type sqlResourceBlobQueryRequest struct {
sqltemplate.SQLTemplate
Key *resource.ResourceKey
UID string
}
func (r sqlResourceBlobQueryRequest) Validate() error {
return nil
}
// update RV
type sqlResourceUpdateRVRequest struct {

View File

@ -3,7 +3,9 @@ package sql
import (
"testing"
"text/template"
"time"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks"
)
@ -266,5 +268,54 @@ func TestUnifiedStorageQueries(t *testing.T) {
},
},
},
sqlResourceBlobInsert: {
{
Name: "basic",
Data: &sqlResourceBlobInsertRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Key: &resource.ResourceKey{
Namespace: "x",
Group: "g",
Resource: "r",
Name: "name",
},
Now: time.UnixMilli(1704056400000).UTC(),
Info: &utils.BlobInfo{
UID: "abc",
Hash: "xxx",
Size: 1234,
},
ContentType: "text/plain",
Value: []byte("abcdefg"),
},
},
},
sqlResourceBlobQuery: {
{
Name: "basic",
Data: &sqlResourceBlobQueryRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Key: &resource.ResourceKey{
Namespace: "x",
Group: "g",
Resource: "r",
Name: "name",
},
UID: "abc",
},
},
{
Name: "resource", // NOTE: this returns multiple values
Data: &sqlResourceBlobQueryRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Key: &resource.ResourceKey{
Namespace: "x",
Group: "g",
Resource: "r",
},
},
},
},
}})
}

View File

@ -353,6 +353,56 @@ func TestIntegrationBackendList(t *testing.T) {
require.Equal(t, int64(4), continueToken.StartOffset)
})
}
func TestIntegrationBlobSupport(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
backend, server := newServer(t, nil)
store, ok := backend.(resource.BlobSupport)
require.True(t, ok)
t.Run("put and fetch blob", func(t *testing.T) {
key := &resource.ResourceKey{
Namespace: "ns",
Group: "g",
Resource: "r",
Name: "n",
}
b1, err := server.PutBlob(ctx, &resource.PutBlobRequest{
Resource: key,
Method: resource.PutBlobRequest_GRPC,
ContentType: "plain/text",
Value: []byte("hello 11111"),
})
require.NoError(t, err)
require.Nil(t, b1.Error)
require.Equal(t, "c894ae57bd227b8f8c63f38a2ddf458b", b1.Hash)
b2, err := server.PutBlob(ctx, &resource.PutBlobRequest{
Resource: key,
Method: resource.PutBlobRequest_GRPC,
ContentType: "plain/text",
Value: []byte("hello 22222"), // the most recent
})
require.NoError(t, err)
require.Nil(t, b2.Error)
require.Equal(t, "b0da48de4ff92e0ad0d836de4d746937", b2.Hash)
// Check that we can still access both values
found, err := store.GetResourceBlob(ctx, key, &utils.BlobInfo{UID: b1.Uid}, true)
require.NoError(t, err)
require.Equal(t, []byte("hello 11111"), found.Value)
found, err = store.GetResourceBlob(ctx, key, &utils.BlobInfo{UID: b2.Uid}, true)
require.NoError(t, err)
require.Equal(t, []byte("hello 22222"), found.Value)
})
}
func TestClientServer(t *testing.T) {
if infraDB.IsTestDbSQLite() {
t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this")

View File

@ -0,0 +1,24 @@
INSERT INTO `resource_blob`
(
`uuid`,
`created`,
`group`,
`resource`,
`namespace`,
`name`,
`value`,
`hash`,
`content_type`
)
VALUES (
'abc',
'2023-12-31 21:00:00 +0000 UTC',
'g',
'r',
'x',
'name',
'[97 98 99 100 101 102 103]',
'xxx',
'text/plain'
)
;

View File

@ -0,0 +1,13 @@
SELECT
`uuid`,
`value`,
`content_type`
FROM `resource_blob`
WHERE 1 = 1
AND `namespace` = 'x'
AND `group` = 'g'
AND `resource` = 'r'
AND `name` = 'name'
AND `uuid` = 'abc'
ORDER BY `created` DESC
LIMIT 1;

View File

@ -0,0 +1,11 @@
SELECT
`uuid`,
`value`,
`content_type`
FROM `resource_blob`
WHERE 1 = 1
AND `namespace` = 'x'
AND `group` = 'g'
AND `resource` = 'r'
ORDER BY `created` DESC
LIMIT 1;

View File

@ -0,0 +1,24 @@
INSERT INTO "resource_blob"
(
"uuid",
"created",
"group",
"resource",
"namespace",
"name",
"value",
"hash",
"content_type"
)
VALUES (
'abc',
'2023-12-31 21:00:00 +0000 UTC',
'g',
'r',
'x',
'name',
'[97 98 99 100 101 102 103]',
'xxx',
'text/plain'
)
;

View File

@ -0,0 +1,13 @@
SELECT
"uuid",
"value",
"content_type"
FROM "resource_blob"
WHERE 1 = 1
AND "namespace" = 'x'
AND "group" = 'g'
AND "resource" = 'r'
AND "name" = 'name'
AND "uuid" = 'abc'
ORDER BY "created" DESC
LIMIT 1;

View File

@ -0,0 +1,11 @@
SELECT
"uuid",
"value",
"content_type"
FROM "resource_blob"
WHERE 1 = 1
AND "namespace" = 'x'
AND "group" = 'g'
AND "resource" = 'r'
ORDER BY "created" DESC
LIMIT 1;

View File

@ -0,0 +1,24 @@
INSERT INTO "resource_blob"
(
"uuid",
"created",
"group",
"resource",
"namespace",
"name",
"value",
"hash",
"content_type"
)
VALUES (
'abc',
'2023-12-31 21:00:00 +0000 UTC',
'g',
'r',
'x',
'name',
'[97 98 99 100 101 102 103]',
'xxx',
'text/plain'
)
;

View File

@ -0,0 +1,13 @@
SELECT
"uuid",
"value",
"content_type"
FROM "resource_blob"
WHERE 1 = 1
AND "namespace" = 'x'
AND "group" = 'g'
AND "resource" = 'r'
AND "name" = 'name'
AND "uuid" = 'abc'
ORDER BY "created" DESC
LIMIT 1;

View File

@ -0,0 +1,11 @@
SELECT
"uuid",
"value",
"content_type"
FROM "resource_blob"
WHERE 1 = 1
AND "namespace" = 'x'
AND "group" = 'g'
AND "resource" = 'r'
ORDER BY "created" DESC
LIMIT 1;