mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Storage: enable SQL backend (#48095)
* #45498: add String util to ListResponse for better UX * #45498: refactor db_filestorage FS API backend - use path_hash in DB schema * #45498: enable DB backend fs api tests * #45498: add comment * #45498: enable Storage feature flag during integration tests * remove fmt.println * #45498: reduce sizes of hash columns * separate conditions * #45498: make it easy to ignore backends when running fs api integration tests * #45498: quote `key` column name * #45498: reduce path_hash size * #45498: verify `/{orgId}/{storageName}/` prefix convention in integration tests * #45498: add etag to the sql table * #45498: add etag to the sql table * remove feature flag check (storage isn't dev-mode only) * add cacheControl and content disposition * add comments * add path_hash comment * explicitly set `path` column collation in `file` table for postgres
This commit is contained in:
parent
ff844f0599
commit
5c321599c8
@ -3,6 +3,7 @@ package filestorage
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
@ -51,8 +52,10 @@ type Paging struct {
|
||||
}
|
||||
|
||||
type UpsertFileCommand struct {
|
||||
Path string
|
||||
MimeType string
|
||||
Path string
|
||||
MimeType string
|
||||
CacheControl string
|
||||
ContentDisposition string
|
||||
|
||||
// Contents of an existing file won't be modified if cmd.Contents is nil
|
||||
Contents []byte
|
||||
@ -77,6 +80,29 @@ type ListResponse struct {
|
||||
LastPath string
|
||||
}
|
||||
|
||||
func (r *ListResponse) String() string {
|
||||
if r == nil {
|
||||
return "Nil ListResponse"
|
||||
}
|
||||
|
||||
if r.Files == nil {
|
||||
return "ListResponse with Nil files slice"
|
||||
}
|
||||
|
||||
if len(r.Files) == 0 {
|
||||
return "Empty ListResponse"
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(fmt.Sprintf("ListResponse with %d files\n", len(r.Files)))
|
||||
for i := range r.Files {
|
||||
sb.WriteString(fmt.Sprintf(" - %s, contentsLength: %d\n", r.Files[i].FullPath, len(r.Files[i].Contents)))
|
||||
}
|
||||
|
||||
sb.WriteString(fmt.Sprintf("Last path: %s, has more: %t\n", r.LastPath, r.HasMore))
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
type ListOptions struct {
|
||||
Recursive bool
|
||||
WithFiles bool
|
||||
|
@ -2,28 +2,41 @@ package filestorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
|
||||
// can ignore because we don't need a cryptographically secure hash function
|
||||
// sha1 low chance of collisions and better performance than sha256
|
||||
// nolint:gosec
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/util/errutil"
|
||||
)
|
||||
|
||||
type file struct {
|
||||
Path string `xorm:"path"`
|
||||
ParentFolderPath string `xorm:"parent_folder_path"`
|
||||
Contents []byte `xorm:"contents"`
|
||||
Updated time.Time `xorm:"updated"`
|
||||
Created time.Time `xorm:"created"`
|
||||
Size int64 `xorm:"size"`
|
||||
MimeType string `xorm:"mime_type"`
|
||||
Path string `xorm:"path"`
|
||||
PathHash string `xorm:"path_hash"`
|
||||
ParentFolderPathHash string `xorm:"parent_folder_path_hash"`
|
||||
Contents []byte `xorm:"contents"`
|
||||
ETag string `xorm:"etag"`
|
||||
CacheControl string `xorm:"cache_control"`
|
||||
ContentDisposition string `xorm:"content_disposition"`
|
||||
Updated time.Time `xorm:"updated"`
|
||||
Created time.Time `xorm:"created"`
|
||||
Size int64 `xorm:"size"`
|
||||
MimeType string `xorm:"mime_type"`
|
||||
}
|
||||
|
||||
type fileMeta struct {
|
||||
Path string `xorm:"path"`
|
||||
Key string `xorm:"key"`
|
||||
Value string `xorm:"value"`
|
||||
PathHash string `xorm:"path_hash"`
|
||||
Key string `xorm:"key"`
|
||||
Value string `xorm:"value"`
|
||||
}
|
||||
|
||||
type dbFileStorage struct {
|
||||
@ -31,6 +44,20 @@ type dbFileStorage struct {
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func createPathHash(path string) (string, error) {
|
||||
hasher := sha1.New()
|
||||
if _, err := hasher.Write([]byte(strings.ToLower(path))); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%x", hasher.Sum(nil)), nil
|
||||
}
|
||||
|
||||
func createContentsHash(contents []byte) string {
|
||||
hash := md5.Sum(contents)
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
func NewDbStorage(log log.Logger, db *sqlstore.SQLStore, filter PathFilter, rootFolder string) FileStorage {
|
||||
return newWrapper(log, &dbFileStorage{
|
||||
log: log,
|
||||
@ -38,19 +65,19 @@ func NewDbStorage(log log.Logger, db *sqlstore.SQLStore, filter PathFilter, root
|
||||
}, filter, rootFolder)
|
||||
}
|
||||
|
||||
func (s dbFileStorage) getProperties(sess *sqlstore.DBSession, lowerCasePaths []string) (map[string]map[string]string, error) {
|
||||
func (s dbFileStorage) getProperties(sess *sqlstore.DBSession, pathHashes []string) (map[string]map[string]string, error) {
|
||||
attributesByPath := make(map[string]map[string]string)
|
||||
|
||||
entities := make([]*fileMeta, 0)
|
||||
if err := sess.Table("file_meta").In("path", lowerCasePaths).Find(&entities); err != nil {
|
||||
if err := sess.Table("file_meta").In("path_hash", pathHashes).Find(&entities); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, entity := range entities {
|
||||
if _, ok := attributesByPath[entity.Path]; !ok {
|
||||
attributesByPath[entity.Path] = make(map[string]string)
|
||||
if _, ok := attributesByPath[entity.PathHash]; !ok {
|
||||
attributesByPath[entity.PathHash] = make(map[string]string)
|
||||
}
|
||||
attributesByPath[entity.Path][entity.Key] = entity.Value
|
||||
attributesByPath[entity.PathHash][entity.Key] = entity.Value
|
||||
}
|
||||
|
||||
return attributesByPath, nil
|
||||
@ -58,15 +85,20 @@ func (s dbFileStorage) getProperties(sess *sqlstore.DBSession, lowerCasePaths []
|
||||
|
||||
func (s dbFileStorage) Get(ctx context.Context, filePath string) (*File, error) {
|
||||
var result *File
|
||||
err := s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
|
||||
pathHash, err := createPathHash(filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
table := &file{}
|
||||
exists, err := sess.Table("file").Where("LOWER(path) = ?", strings.ToLower(filePath)).Get(table)
|
||||
exists, err := sess.Table("file").Where("path_hash = ?", pathHash).Get(table)
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
var meta = make([]*fileMeta, 0)
|
||||
if err := sess.Table("file_meta").Where("path = ?", strings.ToLower(filePath)).Find(&meta); err != nil {
|
||||
if err := sess.Table("file_meta").Where("path_hash = ?", pathHash).Find(&meta); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -100,9 +132,13 @@ func (s dbFileStorage) Get(ctx context.Context, filePath string) (*File, error)
|
||||
}
|
||||
|
||||
func (s dbFileStorage) Delete(ctx context.Context, filePath string) error {
|
||||
err := s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
pathHash, err := createPathHash(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
table := &file{}
|
||||
exists, innerErr := sess.Table("file").Where("LOWER(path) = ?", strings.ToLower(filePath)).Get(table)
|
||||
exists, innerErr := sess.Table("file").Where("path_hash = ?", pathHash).Get(table)
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
@ -111,14 +147,14 @@ func (s dbFileStorage) Delete(ctx context.Context, filePath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
number, innerErr := sess.Table("file").Where("LOWER(path) = ?", strings.ToLower(filePath)).Delete(table)
|
||||
number, innerErr := sess.Table("file").Where("path_hash = ?", pathHash).Delete(table)
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
s.log.Info("Deleted file", "path", filePath, "affectedRecords", number)
|
||||
|
||||
metaTable := &fileMeta{}
|
||||
number, innerErr = sess.Table("file_meta").Where("path = ?", strings.ToLower(filePath)).Delete(metaTable)
|
||||
number, innerErr = sess.Table("file_meta").Where("path_hash = ?", pathHash).Delete(metaTable)
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
@ -131,9 +167,14 @@ func (s dbFileStorage) Delete(ctx context.Context, filePath string) error {
|
||||
|
||||
func (s dbFileStorage) Upsert(ctx context.Context, cmd *UpsertFileCommand) error {
|
||||
now := time.Now()
|
||||
err := s.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
pathHash, err := createPathHash(cmd.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
existing := &file{}
|
||||
exists, err := sess.Table("file").Where("LOWER(path) = ?", strings.ToLower(cmd.Path)).Get(existing)
|
||||
exists, err := sess.Table("file").Where("path_hash = ?", pathHash).Get(existing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -144,10 +185,13 @@ func (s dbFileStorage) Upsert(ctx context.Context, cmd *UpsertFileCommand) error
|
||||
contents := cmd.Contents
|
||||
existing.Contents = contents
|
||||
existing.MimeType = cmd.MimeType
|
||||
existing.ETag = createContentsHash(contents)
|
||||
existing.ContentDisposition = cmd.ContentDisposition
|
||||
existing.CacheControl = cmd.CacheControl
|
||||
existing.Size = int64(len(contents))
|
||||
}
|
||||
|
||||
_, err = sess.Where("LOWER(path) = ?", strings.ToLower(cmd.Path)).Update(existing)
|
||||
_, err = sess.Where("path_hash = ?", pathHash).Update(existing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -157,23 +201,32 @@ func (s dbFileStorage) Upsert(ctx context.Context, cmd *UpsertFileCommand) error
|
||||
contentsToInsert = cmd.Contents
|
||||
}
|
||||
|
||||
file := &file{
|
||||
Path: cmd.Path,
|
||||
ParentFolderPath: strings.ToLower(getParentFolderPath(cmd.Path)),
|
||||
Contents: contentsToInsert,
|
||||
MimeType: cmd.MimeType,
|
||||
Size: int64(len(contentsToInsert)),
|
||||
Updated: now,
|
||||
Created: now,
|
||||
}
|
||||
_, err := sess.Insert(file)
|
||||
parentFolderPath := getParentFolderPath(cmd.Path)
|
||||
parentFolderPathHash, err := createPathHash(parentFolderPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
file := &file{
|
||||
Path: cmd.Path,
|
||||
PathHash: pathHash,
|
||||
ParentFolderPathHash: parentFolderPathHash,
|
||||
Contents: contentsToInsert,
|
||||
ContentDisposition: cmd.ContentDisposition,
|
||||
CacheControl: cmd.CacheControl,
|
||||
ETag: createContentsHash(contentsToInsert),
|
||||
MimeType: cmd.MimeType,
|
||||
Size: int64(len(contentsToInsert)),
|
||||
Updated: now,
|
||||
Created: now,
|
||||
}
|
||||
if _, err = sess.Insert(file); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(cmd.Properties) != 0 {
|
||||
if err = upsertProperties(sess, now, cmd); err != nil {
|
||||
if err = upsertProperties(s.db.Dialect, sess, now, cmd, pathHash); err != nil {
|
||||
if rollbackErr := sess.Rollback(); rollbackErr != nil {
|
||||
s.log.Error("failed while rolling back upsert", "path", cmd.Path)
|
||||
}
|
||||
@ -187,36 +240,38 @@ func (s dbFileStorage) Upsert(ctx context.Context, cmd *UpsertFileCommand) error
|
||||
return err
|
||||
}
|
||||
|
||||
func upsertProperties(sess *sqlstore.DBSession, now time.Time, cmd *UpsertFileCommand) error {
|
||||
func upsertProperties(dialect migrator.Dialect, sess *sqlstore.DBSession, now time.Time, cmd *UpsertFileCommand, pathHash string) error {
|
||||
fileMeta := &fileMeta{}
|
||||
_, err := sess.Table("file_meta").Where("path = ?", strings.ToLower(cmd.Path)).Delete(fileMeta)
|
||||
_, err := sess.Table("file_meta").Where("path_hash = ?", pathHash).Delete(fileMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for key, val := range cmd.Properties {
|
||||
if err := upsertProperty(sess, now, cmd.Path, key, val); err != nil {
|
||||
if err := upsertProperty(dialect, sess, now, pathHash, key, val); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func upsertProperty(sess *sqlstore.DBSession, now time.Time, path string, key string, val string) error {
|
||||
func upsertProperty(dialect migrator.Dialect, sess *sqlstore.DBSession, now time.Time, pathHash string, key string, val string) error {
|
||||
existing := &fileMeta{}
|
||||
exists, err := sess.Table("file_meta").Where("path = ? AND key = ?", strings.ToLower(path), key).Get(existing)
|
||||
|
||||
keyEqualsCondition := fmt.Sprintf("%s = ?", dialect.Quote("key"))
|
||||
exists, err := sess.Table("file_meta").Where("path_hash = ?", pathHash).Where(keyEqualsCondition, key).Get(existing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if exists {
|
||||
existing.Value = val
|
||||
_, err = sess.Where("path = ? AND key = ?", strings.ToLower(path), key).Update(existing)
|
||||
_, err = sess.Where("path_hash = ?", pathHash).Where(keyEqualsCondition, key).Update(existing)
|
||||
} else {
|
||||
_, err = sess.Insert(&fileMeta{
|
||||
Path: strings.ToLower(path),
|
||||
Key: key,
|
||||
Value: val,
|
||||
PathHash: pathHash,
|
||||
Key: key,
|
||||
Value: val,
|
||||
})
|
||||
}
|
||||
return err
|
||||
@ -229,7 +284,12 @@ func (s dbFileStorage) List(ctx context.Context, folderPath string, paging *Pagi
|
||||
err := s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
cursor := ""
|
||||
if paging != nil && paging.After != "" {
|
||||
exists, err := sess.Table("file").Where("LOWER(path) = ?", strings.ToLower(paging.After)+Delimiter).Cols("mime_type").Exist()
|
||||
pagingFolderPathHash, err := createPathHash(paging.After + Delimiter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
exists, err := sess.Table("file").Where("path_hash = ?", pagingFolderPathHash).Exist()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -252,12 +312,18 @@ func (s dbFileStorage) List(ctx context.Context, folderPath string, paging *Pagi
|
||||
lowerFolderPrefix = lowerFolderPath + Delimiter
|
||||
}
|
||||
|
||||
sess.Where("LOWER(path) != ?", lowerFolderPrefix)
|
||||
prefixHash, _ := createPathHash(lowerFolderPrefix)
|
||||
|
||||
sess.Where("path_hash != ?", prefixHash)
|
||||
parentHash, err := createPathHash(lowerFolderPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !options.Recursive {
|
||||
sess.Where("parent_folder_path = ?", lowerFolderPath)
|
||||
sess.Where("parent_folder_path_hash = ?", parentHash)
|
||||
} else {
|
||||
sess.Where("(parent_folder_path = ?) OR (parent_folder_path LIKE ?)", lowerFolderPath, lowerFolderPrefix+"%")
|
||||
sess.Where("(parent_folder_path_hash = ?) OR (lower(path) LIKE ?)", parentHash, lowerFolderPrefix+"%")
|
||||
}
|
||||
|
||||
if !options.WithFolders && options.WithFiles {
|
||||
@ -289,14 +355,20 @@ func (s dbFileStorage) List(ctx context.Context, folderPath string, paging *Pagi
|
||||
foundLength = pageSize
|
||||
}
|
||||
|
||||
lowerCasePaths := make([]string, 0)
|
||||
pathToHash := make(map[string]string)
|
||||
hashes := make([]string, 0)
|
||||
for i := 0; i < foundLength; i++ {
|
||||
isFolder := strings.HasSuffix(foundFiles[i].Path, Delimiter)
|
||||
if !isFolder {
|
||||
lowerCasePaths = append(lowerCasePaths, strings.ToLower(foundFiles[i].Path))
|
||||
hash, err := createPathHash(foundFiles[i].Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashes = append(hashes, hash)
|
||||
pathToHash[foundFiles[i].Path] = hash
|
||||
}
|
||||
}
|
||||
propertiesByLowerPath, err := s.getProperties(sess, lowerCasePaths)
|
||||
propertiesByPathHash, err := s.getProperties(sess, hashes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -305,8 +377,13 @@ func (s dbFileStorage) List(ctx context.Context, folderPath string, paging *Pagi
|
||||
for i := 0; i < foundLength; i++ {
|
||||
var props map[string]string
|
||||
path := strings.TrimSuffix(foundFiles[i].Path, Delimiter)
|
||||
if foundProps, ok := propertiesByLowerPath[strings.ToLower(path)]; ok {
|
||||
props = foundProps
|
||||
|
||||
if hash, ok := pathToHash[path]; ok {
|
||||
if foundProps, ok := propertiesByPathHash[hash]; ok {
|
||||
props = foundProps
|
||||
} else {
|
||||
props = make(map[string]string)
|
||||
}
|
||||
} else {
|
||||
props = make(map[string]string)
|
||||
}
|
||||
@ -360,7 +437,13 @@ func (s dbFileStorage) CreateFolder(ctx context.Context, path string) error {
|
||||
if !strings.HasSuffix(currentFolderPath, Delimiter) {
|
||||
currentFolderPath = currentFolderPath + Delimiter
|
||||
}
|
||||
exists, err := sess.Table("file").Where("LOWER(path) = ?", strings.ToLower(currentFolderPath)).Get(existing)
|
||||
|
||||
currentFolderPathHash, err := createPathHash(currentFolderPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
exists, err := sess.Table("file").Where("path_hash = ?", currentFolderPathHash).Get(existing)
|
||||
if err != nil {
|
||||
insertErr = err
|
||||
break
|
||||
@ -371,20 +454,28 @@ func (s dbFileStorage) CreateFolder(ctx context.Context, path string) error {
|
||||
continue
|
||||
}
|
||||
|
||||
currentFolderParentPathHash, err := createPathHash(currentFolderParentPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
contents := make([]byte, 0)
|
||||
file := &file{
|
||||
Path: currentFolderPath,
|
||||
ParentFolderPath: strings.ToLower(currentFolderParentPath),
|
||||
Contents: make([]byte, 0),
|
||||
Updated: now,
|
||||
MimeType: DirectoryMimeType,
|
||||
Created: now,
|
||||
Path: currentFolderPath,
|
||||
PathHash: currentFolderPathHash,
|
||||
ParentFolderPathHash: currentFolderParentPathHash,
|
||||
Contents: contents,
|
||||
ETag: createContentsHash(contents),
|
||||
Updated: now,
|
||||
MimeType: DirectoryMimeType,
|
||||
Created: now,
|
||||
}
|
||||
_, err = sess.Insert(file)
|
||||
if err != nil {
|
||||
insertErr = err
|
||||
break
|
||||
}
|
||||
s.log.Info("Created folder", "markerPath", file.Path, "parent", file.ParentFolderPath)
|
||||
s.log.Info("Created folder", "markerPath", file.Path, "parent", currentFolderParentPath)
|
||||
}
|
||||
|
||||
if insertErr != nil {
|
||||
@ -403,8 +494,11 @@ func (s dbFileStorage) CreateFolder(ctx context.Context, path string) error {
|
||||
func (s dbFileStorage) DeleteFolder(ctx context.Context, folderPath string) error {
|
||||
err := s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
existing := &file{}
|
||||
internalFolderPath := strings.ToLower(folderPath) + Delimiter
|
||||
exists, err := sess.Table("file").Where("LOWER(path) = ?", internalFolderPath).Get(existing)
|
||||
internalFolderPathHash, err := createPathHash(folderPath + Delimiter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
exists, err := sess.Table("file").Where("path_hash = ?", internalFolderPathHash).Get(existing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -413,7 +507,7 @@ func (s dbFileStorage) DeleteFolder(ctx context.Context, folderPath string) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = sess.Table("file").Where("LOWER(path) = ?", internalFolderPath).Delete(existing)
|
||||
_, err = sess.Table("file").Where("path_hash = ?", internalFolderPathHash).Delete(existing)
|
||||
return err
|
||||
})
|
||||
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"gocloud.dev/blob"
|
||||
)
|
||||
|
||||
@ -35,9 +36,19 @@ func runTestCase(t *testing.T, testCase fsTestCase, ctx context.Context, filesto
|
||||
}
|
||||
}
|
||||
|
||||
type backend string
|
||||
|
||||
const (
|
||||
backendSQL backend = "sql"
|
||||
backendSQLNested backend = "sqlNested"
|
||||
backendInMem backend = "inMem"
|
||||
backendLocalFS backend = "localFS"
|
||||
backendLocalFSNested backend = "localFSNested"
|
||||
)
|
||||
|
||||
func runTests(createCases func() []fsTestCase, t *testing.T) {
|
||||
var testLogger log.Logger
|
||||
//var sqlStore *sqlstore.SQLStore
|
||||
var sqlStore *sqlstore.SQLStore
|
||||
var filestorage FileStorage
|
||||
var ctx context.Context
|
||||
var tempDir string
|
||||
@ -49,7 +60,7 @@ func runTests(createCases func() []fsTestCase, t *testing.T) {
|
||||
|
||||
cleanUp := func() {
|
||||
testLogger = nil
|
||||
//sqlStore = nil
|
||||
sqlStore = nil
|
||||
if filestorage != nil {
|
||||
_ = filestorage.close()
|
||||
filestorage = nil
|
||||
@ -65,17 +76,17 @@ func runTests(createCases func() []fsTestCase, t *testing.T) {
|
||||
filestorage = NewCdkBlobStorage(testLogger, bucket, "", nil)
|
||||
}
|
||||
|
||||
//setupSqlFS := func() {
|
||||
// commonSetup()
|
||||
// sqlStore = sqlstore.InitTestDB(t)
|
||||
// filestorage = NewDbStorage(testLogger, sqlStore, nil, "/")
|
||||
//}
|
||||
//
|
||||
//setupSqlFSNestedPath := func() {
|
||||
// commonSetup()
|
||||
// sqlStore = sqlstore.InitTestDB(t)
|
||||
// filestorage = NewDbStorage(testLogger, sqlStore, nil, "/dashboards/")
|
||||
//}
|
||||
setupSqlFS := func() {
|
||||
commonSetup()
|
||||
sqlStore = sqlstore.InitTestDB(t)
|
||||
filestorage = NewDbStorage(testLogger, sqlStore, nil, "/")
|
||||
}
|
||||
|
||||
setupSqlFSNestedPath := func() {
|
||||
commonSetup()
|
||||
sqlStore = sqlstore.InitTestDB(t)
|
||||
filestorage = NewDbStorage(testLogger, sqlStore, nil, "/5/dashboards/")
|
||||
}
|
||||
|
||||
setupLocalFs := func() {
|
||||
commonSetup()
|
||||
@ -114,31 +125,43 @@ func runTests(createCases func() []fsTestCase, t *testing.T) {
|
||||
|
||||
backends := []struct {
|
||||
setup func()
|
||||
name string
|
||||
name backend
|
||||
}{
|
||||
{
|
||||
setup: setupLocalFs,
|
||||
name: "Local FS",
|
||||
name: backendLocalFS,
|
||||
},
|
||||
{
|
||||
setup: setupLocalFsNestedPath,
|
||||
name: "Local FS with nested path",
|
||||
name: backendLocalFSNested,
|
||||
},
|
||||
{
|
||||
setup: setupInMemFS,
|
||||
name: "In-mem FS",
|
||||
name: backendInMem,
|
||||
},
|
||||
//{
|
||||
// setup: setupSqlFS,
|
||||
// name: "SQL FS",
|
||||
//},
|
||||
//{
|
||||
// setup: setupSqlFSNestedPath,
|
||||
// name: "SQL FS with nested path",
|
||||
//},
|
||||
{
|
||||
setup: setupSqlFS,
|
||||
name: backendSQL,
|
||||
},
|
||||
{
|
||||
setup: setupSqlFSNestedPath,
|
||||
name: backendSQLNested,
|
||||
},
|
||||
}
|
||||
|
||||
skipBackends := map[backend]bool{
|
||||
backendInMem: false,
|
||||
backendSQL: false,
|
||||
backendLocalFS: false,
|
||||
backendLocalFSNested: false,
|
||||
backendSQLNested: false,
|
||||
}
|
||||
|
||||
for _, backend := range backends {
|
||||
if skipBackends[backend.name] {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tt := range createCases() {
|
||||
t.Run(fmt.Sprintf("%s: %s", backend.name, tt.name), func(t *testing.T) {
|
||||
backend.setup()
|
||||
|
@ -235,17 +235,17 @@ func runChecks(t *testing.T, stepName string, path string, output interface{}, c
|
||||
for _, check := range checks {
|
||||
runFileMetadataCheck(o, check, interfaceName(check))
|
||||
}
|
||||
case ListResponse:
|
||||
case *ListResponse:
|
||||
for _, check := range checks {
|
||||
c := check
|
||||
checkName := interfaceName(c)
|
||||
switch c := check.(type) {
|
||||
case listSizeCheck:
|
||||
require.Equal(t, c.v, len(o.Files), "%s %s", stepName, path)
|
||||
require.Equal(t, c.v, len(o.Files), "%s %s\nReceived %s", stepName, path, o)
|
||||
case listHasMoreCheck:
|
||||
require.Equal(t, c.v, o.HasMore, "%s %s", stepName, path)
|
||||
require.Equal(t, c.v, o.HasMore, "%s %s\nReceived %s", stepName, path, o)
|
||||
case listLastPathCheck:
|
||||
require.Equal(t, c.v, o.LastPath, "%s %s", stepName, path)
|
||||
require.Equal(t, c.v, o.LastPath, "%s %s\nReceived %s", stepName, path, o)
|
||||
default:
|
||||
t.Fatalf("unrecognized list check %s", checkName)
|
||||
}
|
||||
@ -290,7 +290,7 @@ func handleQuery(t *testing.T, ctx context.Context, query interface{}, queryName
|
||||
require.NoError(t, err, "%s: should be able to list files in %s", queryName, inputPath)
|
||||
require.NotNil(t, resp)
|
||||
if q.list != nil && len(q.list) > 0 {
|
||||
runChecks(t, queryName, inputPath, *resp, q.list)
|
||||
runChecks(t, queryName, inputPath, resp, q.list)
|
||||
} else {
|
||||
require.NotNil(t, resp, "%s %s", queryName, inputPath)
|
||||
require.Equal(t, false, resp.HasMore, "%s %s", queryName, inputPath)
|
||||
|
@ -2,40 +2,66 @@ package migrations
|
||||
|
||||
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
|
||||
// TODO: remove nolint as part of https://github.com/grafana/grafana/issues/45498
|
||||
// nolint:unused,deadcode
|
||||
func addDbFileStorageMigration(mg *migrator.Migrator) {
|
||||
filesTable := migrator.Table{
|
||||
Name: "file",
|
||||
Columns: []*migrator.Column{
|
||||
{Name: "path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
|
||||
{Name: "parent_folder_path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
|
||||
|
||||
// path_hash is used for indexing. we are using it to circumvent the max length limit of 191 for varchar2 fields in MySQL 5.6
|
||||
{Name: "path_hash", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
|
||||
|
||||
// parent_folder_path_hash is an optimization for a common use case - list all files in a given folder
|
||||
{Name: "parent_folder_path_hash", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
|
||||
|
||||
{Name: "contents", Type: migrator.DB_Blob, Nullable: false},
|
||||
|
||||
// HTTP Entity tag; md5 hash
|
||||
{Name: "etag", Type: migrator.DB_NVarchar, Length: 32, Nullable: false},
|
||||
|
||||
// cache_control HTTP header
|
||||
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
|
||||
{Name: "cache_control", Type: migrator.DB_NVarchar, Length: 128, Nullable: false},
|
||||
|
||||
// content_disposition HTTP header - inline/attachment file display
|
||||
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
|
||||
{Name: "content_disposition", Type: migrator.DB_NVarchar, Length: 128, Nullable: false},
|
||||
|
||||
{Name: "updated", Type: migrator.DB_DateTime, Nullable: false},
|
||||
{Name: "created", Type: migrator.DB_DateTime, Nullable: false},
|
||||
{Name: "size", Type: migrator.DB_BigInt, Nullable: false},
|
||||
{Name: "mime_type", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
|
||||
},
|
||||
Indices: []*migrator.Index{
|
||||
{Cols: []string{"path"}, Type: migrator.UniqueIndex},
|
||||
{Cols: []string{"path_hash"}, Type: migrator.UniqueIndex},
|
||||
{Cols: []string{"parent_folder_path_hash"}},
|
||||
},
|
||||
}
|
||||
|
||||
mg.AddMigration("create file table", migrator.NewAddTableMigration(filesTable))
|
||||
mg.AddMigration("file table idx: path natural pk", migrator.NewAddIndexMigration(filesTable, filesTable.Indices[0]))
|
||||
mg.AddMigration("file table idx: parent_folder_path_hash fast folder retrieval", migrator.NewAddIndexMigration(filesTable, filesTable.Indices[1]))
|
||||
|
||||
fileMetaTable := migrator.Table{
|
||||
Name: "file_meta",
|
||||
Columns: []*migrator.Column{
|
||||
{Name: "path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
|
||||
{Name: "key", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
|
||||
{Name: "path_hash", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
|
||||
|
||||
// 191 is the maximum length of indexable VARCHAR fields in MySQL 5.6 <= with utf8mb4 encoding
|
||||
{Name: "key", Type: migrator.DB_NVarchar, Length: 191, Nullable: false},
|
||||
{Name: "value", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
|
||||
},
|
||||
Indices: []*migrator.Index{
|
||||
{Cols: []string{"path", "key"}, Type: migrator.UniqueIndex},
|
||||
{Cols: []string{"path_hash", "key"}, Type: migrator.UniqueIndex},
|
||||
},
|
||||
}
|
||||
|
||||
mg.AddMigration("create file_meta table", migrator.NewAddTableMigration(fileMetaTable))
|
||||
mg.AddMigration("file table idx: path key", migrator.NewAddIndexMigration(fileMetaTable, fileMetaTable.Indices[0]))
|
||||
|
||||
// TODO: add collation support to `migrator.Column`
|
||||
mg.AddMigration("set path collation in file table", migrator.NewRawSQLMigration("").
|
||||
// MySQL `utf8mb4_unicode_ci` collation is set in `mysql_dialect.go`
|
||||
// SQLite uses a `BINARY` collation by default
|
||||
Postgres("ALTER TABLE file ALTER COLUMN path TYPE VARCHAR(1024) COLLATE \"C\";")) // Collate C - sorting done based on character code byte values
|
||||
}
|
||||
|
@ -95,6 +95,7 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
|
||||
|
||||
addPublicDashboardMigration(mg)
|
||||
ualert.CreateDefaultFoldersForAlertingMigration(mg)
|
||||
addDbFileStorageMigration(mg)
|
||||
}
|
||||
|
||||
func addMigrationLogMigrations(mg *Migrator) {
|
||||
|
Loading…
Reference in New Issue
Block a user