CloudMigrations: Break snapshot resources out into their own table (#89575)

* create a new table for migration resources

* remove raw result bytes from db

* more snapshot resource management stuff

* integrate new table with snapshots

* pass in result limit and offset as params

* combine create and update

* set up xorm store test

* add unit tests

* save some cpu

* remove unneeded arg

* regen swagger

* fix bug with result processing

* fix update create logic so that uid isn't required for lookup

* change offset to page

* regen swagger

* revert accidental changes to file

* curl command page should be 1-indexed (see the pagination sketch below)
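
A minimal standalone sketch (not part of the diff) of the pagination convention these commits settle on: page parameters are 1-indexed, a missing limit falls back to 100, and the store turns page and limit into a SQL offset. The helper names below are illustrative only.

package main

import "fmt"

// normalizePage mirrors the handler defaults added in this PR:
// a zero/missing limit falls back to 100, and pages below 1 clamp to 1.
func normalizePage(page, limit int) (int, int) {
	if limit == 0 {
		limit = 100
	}
	if page < 1 {
		page = 1
	}
	return page, limit
}

// pageToOffset is how the xorm store translates a 1-indexed page into a SQL offset.
func pageToOffset(page, limit int) int {
	return (page - 1) * limit
}

func main() {
	page, limit := normalizePage(0, 0)                  // unset query params
	fmt.Println(page, limit, pageToOffset(page, limit)) // 1 100 0

	page, limit = normalizePage(3, 50)
	fmt.Println(page, limit, pageToOffset(page, limit)) // 3 50 100
}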
Michael Mandrus 2024-06-24 23:50:07 -04:00 committed by GitHub
parent dfee2720cc
commit 4d69213829
18 changed files with 369 additions and 158 deletions

View File

@@ -412,23 +412,32 @@ func (cma *CloudMigrationAPI) GetSnapshot(c *contextmodel.ReqContext) response.R
 		return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err)
 	}
 
-	snapshot, err := cma.cloudMigrationService.GetSnapshot(ctx, sessUid, snapshotUid)
+	q := cloudmigration.GetSnapshotsQuery{
+		SnapshotUID: snapshotUid,
+		SessionUID:  sessUid,
+		ResultPage:  c.QueryInt("resultPage"),
+		ResultLimit: c.QueryInt("resultLimit"),
+	}
+	if q.ResultLimit == 0 {
+		q.ResultLimit = 100
+	}
+	if q.ResultPage < 1 {
+		q.ResultPage = 1
+	}
+	snapshot, err := cma.cloudMigrationService.GetSnapshot(ctx, q)
 	if err != nil {
 		return response.ErrOrFallback(http.StatusInternalServerError, "error retrieving snapshot", err)
 	}
 
-	result, err := snapshot.GetSnapshotResult()
-	if err != nil {
-		return response.ErrOrFallback(http.StatusInternalServerError, "error snapshot reading snapshot results", err)
-	}
+	results := snapshot.Resources
 
-	dtoResults := make([]MigrateDataResponseItemDTO, len(result))
-	for i := 0; i < len(result); i++ {
+	dtoResults := make([]MigrateDataResponseItemDTO, len(results))
+	for i := 0; i < len(results); i++ {
 		dtoResults[i] = MigrateDataResponseItemDTO{
-			Type:   MigrateDataType(result[i].Type),
-			RefID:  result[i].RefID,
-			Status: ItemStatus(result[i].Status),
-			Error:  result[i].Error,
+			Type:   MigrateDataType(results[i].Type),
+			RefID:  results[i].RefID,
+			Status: ItemStatus(results[i].Status),
+			Error:  results[i].Error,
 		}
 	}

@@ -467,11 +476,14 @@ func (cma *CloudMigrationAPI) GetSnapshotList(c *contextmodel.ReqContext) respon
 	q := cloudmigration.ListSnapshotsQuery{
 		SessionUID: uid,
 		Limit:      c.QueryInt("limit"),
-		Offset:     c.QueryInt("offset"),
+		Page:       c.QueryInt("page"),
 	}
 	if q.Limit == 0 {
 		q.Limit = 100
 	}
+	if q.Page < 1 {
+		q.Page = 1
+	}
 
 	snapshotList, err := cma.cloudMigrationService.GetSnapshotList(ctx, q)
 	if err != nil {

View File

@@ -11,10 +11,10 @@ curl -X POST -H "Content-Type: application/json" \
 	http://admin:admin@localhost:3000/api/cloudmigration/migration/{sessionUid}/snapshot
 
 [get snapshot list]
-curl -X GET http://admin:admin@localhost:3000/api/cloudmigration/migration/{sessionUid}/snapshots?limit=100&offset=0
+curl -X GET http://admin:admin@localhost:3000/api/cloudmigration/migration/{sessionUid}/snapshots?limit=100&page=1
 
 [get snapshot]
-curl -X GET http://admin:admin@localhost:3000/api/cloudmigration/migration/{sessionUid}/snapshot/{snapshotUid}
+curl -X GET http://admin:admin@localhost:3000/api/cloudmigration/migration/{sessionUid}/snapshot/{snapshotUid}?resultLimit=100&resultPage=1
 
 [upload snapshot]
 curl -X POST -H "Content-Type: application/json" \
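
For local testing in Go rather than curl, a rough equivalent of the two GET commands above (the UIDs are placeholders and this snippet is not part of the diff; it assumes the default admin:admin dev credentials shown in the curl examples):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholders: substitute a real session UID and snapshot UID from your instance.
	sessionUID := "your-session-uid"
	snapshotUID := "your-snapshot-uid"
	base := "http://localhost:3000/api/cloudmigration/migration/" + sessionUID

	// Same requests as the curl commands above, using the new 1-indexed paging params.
	for _, u := range []string{
		base + "/snapshots?limit=100&page=1",
		base + "/snapshot/" + snapshotUID + "?resultLimit=100&resultPage=1",
	} {
		req, err := http.NewRequest(http.MethodGet, u, nil)
		if err != nil {
			panic(err)
		}
		req.SetBasicAuth("admin", "admin")

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Println(u, "->", resp.Status)
		fmt.Println(string(body))
	}
}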

View File

@@ -128,8 +128,10 @@ const (
 type ItemStatus string
 
 const (
 	ItemStatusOK      ItemStatus = "OK"
 	ItemStatusError   ItemStatus = "ERROR"
+	ItemStatusPending ItemStatus = "PENDING"
+	ItemStatusUnknown ItemStatus = "UNKNOWN"
 )
 
 // swagger:parameters getCloudMigrationRun

@@ -268,6 +270,18 @@ type CreateSnapshotResponseDTO struct {
 
 // swagger:parameters getSnapshot
 type GetSnapshotParams struct {
+	// ResultPage is used for pagination with ResultLimit
+	// in:query
+	// required:false
+	// default: 1
+	ResultPage int `json:"resultPage"`
+
+	// Max limit for snapshot results returned.
+	// in:query
+	// required:false
+	// default: 100
+	ResultLimit int `json:"resultLimit"`
+
 	// Session UID of a session
 	// in: path
 	UID string `json:"uid"`

@@ -290,16 +304,18 @@ type GetSnapshotResponseDTO struct {
 
 // swagger:parameters getShapshotList
 type GetSnapshotListParams struct {
-	// Offset is used for pagination with limit
+	// Page is used for pagination with limit
 	// in:query
 	// required:false
-	// default: 0
-	Offset int `json:"offset"`
+	// default: 1
+	Page int `json:"page"`
 
 	// Max limit for results returned.
 	// in:query
 	// required:false
 	// default: 100
 	Limit int `json:"limit"`
 
 	// Session UID of a session
 	// in: path
 	UID string `json:"uid"`

View File

@@ -25,7 +25,7 @@ type Service interface {
 	GetMigrationRunList(ctx context.Context, migUID string) (*CloudMigrationRunList, error)
 
 	CreateSnapshot(ctx context.Context, sessionUid string) (*CloudMigrationSnapshot, error)
-	GetSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (*CloudMigrationSnapshot, error)
+	GetSnapshot(ctx context.Context, query GetSnapshotsQuery) (*CloudMigrationSnapshot, error)
 	GetSnapshotList(ctx context.Context, query ListSnapshotsQuery) ([]CloudMigrationSnapshot, error)
 	UploadSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error
 	CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error

View File

@@ -404,16 +404,10 @@ func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration
 		return nil, fmt.Errorf("migrate data error: %w", err)
 	}
 
-	respData, err := json.Marshal(resp)
-	if err != nil {
-		s.log.Error("error marshalling migration response data: %w", err)
-		return nil, fmt.Errorf("marshalling migration response data: %w", err)
-	}
-
 	// save the result of the migration
 	runUID, err := s.createMigrationRun(ctx, cloudmigration.CloudMigrationSnapshot{
 		SessionUID: migration.UID,
-		Result:     respData,
+		Resources:  resp.Items,
 	})
 	if err != nil {
 		response.Error(http.StatusInternalServerError, "migration run save error", err)

@@ -513,11 +507,12 @@ func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloud
 }
 
 // GetSnapshot returns the on-prem version of a snapshot, supplemented with processing status from GMS
-func (s *Service) GetSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
+func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnapshotsQuery) (*cloudmigration.CloudMigrationSnapshot, error) {
 	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSnapshot")
 	defer span.End()
 
-	snapshot, err := s.store.GetSnapshotByUID(ctx, snapshotUid)
+	sessionUid, snapshotUid := query.SessionUID, query.SnapshotUID
+	snapshot, err := s.store.GetSnapshotByUID(ctx, snapshotUid, query.ResultPage, query.ResultLimit)
 	if err != nil {
 		return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
 	}

@@ -534,16 +529,12 @@ func (s *Service) GetSnapshot(ctx context.Context, sessionUid string, snapshotUi
 		return nil, fmt.Errorf("error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s", sessionUid, snapshotUid)
 	}
 
-	// grab any result available
-	// TODO: figure out a more intelligent way to do this, will depend on GMS apis
-	snapshot.Result = snapshotMeta.Result
-
 	if snapshotMeta.Status == cloudmigration.SnapshotStatusFinished {
 		// we need to update the snapshot in our db before reporting anything finished to the client
 		if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
 			UID:       snapshot.UID,
 			Status:    cloudmigration.SnapshotStatusFinished,
-			Result:    snapshot.Result,
+			Resources: snapshotMeta.Resources,
 		}); err != nil {
 			return nil, fmt.Errorf("error updating snapshot status: %w", err)
 		}

@@ -568,7 +559,10 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
 	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot")
 	defer span.End()
 
-	snapshot, err := s.GetSnapshot(ctx, sessionUid, snapshotUid)
+	snapshot, err := s.GetSnapshot(ctx, cloudmigration.GetSnapshotsQuery{
+		SnapshotUID: snapshotUid,
+		SessionUID:  sessionUid,
+	})
 	if err != nil {
 		return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err)
 	}

View File

@@ -64,7 +64,7 @@ func (s *NoopServiceImpl) CreateSnapshot(ctx context.Context, sessionUid string)
 	return nil, cloudmigration.ErrFeatureDisabledError
 }
 
-func (s *NoopServiceImpl) GetSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
+func (s *NoopServiceImpl) GetSnapshot(ctx context.Context, query cloudmigration.GetSnapshotsQuery) (*cloudmigration.CloudMigrationSnapshot, error) {
 	return nil, cloudmigration.ErrFeatureDisabledError
 }

View File

@@ -145,18 +145,28 @@ func Test_ExecuteAsyncWorkflow(t *testing.T) {
 		require.Equal(t, sessionUid, snapshotResp.SessionUID)
 		snapshotUid := snapshotResp.UID
 
-		snapshot, err := s.GetSnapshot(ctxWithSignedInUser(), sessionUid, snapshotUid)
+		// Service doesn't currently expose updating a snapshot externally, so we will just manually add a resource
+		err = (s.(*Service)).store.CreateUpdateSnapshotResources(context.Background(), snapshotUid, []cloudmigration.CloudMigrationResource{{Type: cloudmigration.DashboardDataType, RefID: "qwerty", Status: cloudmigration.ItemStatusOK}})
+		assert.NoError(t, err)
+
+		snapshot, err := s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{
+			SnapshotUID: snapshotUid,
+			SessionUID:  sessionUid,
+			ResultPage:  1,
+			ResultLimit: 100,
+		})
 		require.NoError(t, err)
 		assert.Equal(t, snapshotResp.UID, snapshot.UID)
 		assert.Equal(t, snapshotResp.EncryptionKey, snapshot.EncryptionKey)
-		assert.Empty(t, snapshot.Result) // will change once we create a new table for migration items
+		assert.Len(t, snapshot.Resources, 1)
+		assert.Equal(t, "qwerty", snapshot.Resources[0].RefID)
 
-		snapshots, err := s.GetSnapshotList(ctxWithSignedInUser(), cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Limit: 100})
+		snapshots, err := s.GetSnapshotList(ctxWithSignedInUser(), cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Page: 1, Limit: 100})
 		require.NoError(t, err)
 		assert.Len(t, snapshots, 1)
 		assert.Equal(t, snapshotResp.UID, snapshots[0].UID)
 		assert.Equal(t, snapshotResp.EncryptionKey, snapshots[0].EncryptionKey)
-		assert.Empty(t, snapshots[0].Result) // should remain this way even after we create a new table
+		assert.Empty(t, snapshots[0].Resources)
 
 		err = s.UploadSnapshot(ctxWithSignedInUser(), sessionUid, snapshotUid)
 		require.NoError(t, err)

View File

@@ -2,7 +2,6 @@ package fake
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"time"
 
@@ -93,7 +92,7 @@ func (m FakeServiceImpl) RunMigration(_ context.Context, _ string) (*cloudmigrat
 func fakeMigrateDataResponseDTO() cloudmigration.MigrateDataResponse {
 	return cloudmigration.MigrateDataResponse{
 		RunUID: "fake_uid",
-		Items: []cloudmigration.MigrateDataResponseItem{
+		Items: []cloudmigration.CloudMigrationResource{
 			{Type: "type", RefID: "make_refid", Status: "ok", Error: "none"},
 		},
 	}
@@ -107,15 +106,11 @@ func (m FakeServiceImpl) GetMigrationStatus(_ context.Context, _ string) (*cloud
 	if m.ReturnError {
 		return nil, fmt.Errorf("mock error")
 	}
-	result, err := json.Marshal(fakeMigrateDataResponseDTO())
-	if err != nil {
-		return nil, err
-	}
 	return &cloudmigration.CloudMigrationSnapshot{
 		ID:         0,
 		UID:        "fake_uid",
 		SessionUID: "fake_mig_uid",
-		Result:     result,
+		Resources:  fakeMigrateDataResponseDTO().Items,
 		Created:    fixedDate,
 		Updated:    fixedDate,
 		Finished:   fixedDate,
@@ -145,7 +140,7 @@ func (m FakeServiceImpl) CreateSnapshot(ctx context.Context, sessionUid string)
 	}, nil
 }
 
-func (m FakeServiceImpl) GetSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
+func (m FakeServiceImpl) GetSnapshot(ctx context.Context, query cloudmigration.GetSnapshotsQuery) (*cloudmigration.CloudMigrationSnapshot, error) {
 	if m.ReturnError {
 		return nil, fmt.Errorf("mock error")
 	}

View File

@@ -18,6 +18,10 @@ type store interface {
 	CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error)
 	UpdateSnapshot(ctx context.Context, snapshot cloudmigration.UpdateSnapshotCmd) error
-	GetSnapshotByUID(ctx context.Context, uid string) (*cloudmigration.CloudMigrationSnapshot, error)
+	GetSnapshotByUID(ctx context.Context, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error)
 	GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error)
+
+	CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error
+	GetSnapshotResources(ctx context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error)
+	DeleteSnapshotResources(ctx context.Context, snapshotUid string) error
 }
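
The three new resource methods are straightforward to fake in memory for tests that don't need a real database. A sketch under stated assumptions: the package name and the import path for the cloudmigration package are illustrative, and the matching rule only approximates the SQL upsert (match by uid, or by resource_uid within the snapshot); none of this code is part of the PR.

package cloudmigrationimpl // illustrative package name

import (
	"context"

	"github.com/grafana/grafana/pkg/services/cloudmigration" // assumed import path
	"github.com/grafana/grafana/pkg/util"
)

// fakeResourceStore keeps snapshot resources in memory, keyed by snapshot UID,
// and covers only the three methods added to the store interface above.
type fakeResourceStore struct {
	resources map[string][]cloudmigration.CloudMigrationResource
}

func newFakeResourceStore() *fakeResourceStore {
	return &fakeResourceStore{resources: map[string][]cloudmigration.CloudMigrationResource{}}
}

func (f *fakeResourceStore) CreateUpdateSnapshotResources(_ context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
	existing := f.resources[snapshotUid]
	for _, r := range resources {
		r.SnapshotUID = snapshotUid
		updated := false
		for i := range existing {
			// approximate the SQL upsert: match on uid, or on resource_uid within this snapshot
			if (r.UID != "" && existing[i].UID == r.UID) || (r.RefID != "" && existing[i].RefID == r.RefID) {
				existing[i].Status, existing[i].Error = r.Status, r.Error
				updated = true
				break
			}
		}
		if !updated {
			// no match: assign a uid and "insert"
			r.UID = util.GenerateShortUID()
			existing = append(existing, r)
		}
	}
	f.resources[snapshotUid] = existing
	return nil
}

func (f *fakeResourceStore) GetSnapshotResources(_ context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error) {
	all := f.resources[snapshotUid]
	if limit == 0 {
		return nil, nil
	}
	if page < 1 {
		page = 1
	}
	start := (page - 1) * limit
	if start >= len(all) {
		return nil, nil
	}
	end := start + limit
	if end > len(all) {
		end = len(all)
	}
	return all[start:end], nil
}

func (f *fakeResourceStore) DeleteSnapshotResources(_ context.Context, snapshotUid string) error {
	delete(f.resources, snapshotUid)
	return nil
}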

View File

@@ -150,9 +150,7 @@ func (ss *sqlStore) CreateSnapshot(ctx context.Context, snapshot cloudmigration.
 	if err := ss.encryptKey(ctx, &snapshot); err != nil {
 		return "", err
 	}
-	if snapshot.Result == nil {
-		snapshot.Result = make([]byte, 0)
-	}
+
 	if snapshot.UID == "" {
 		snapshot.UID = util.GenerateShortUID()
 	}
@@ -181,38 +179,31 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up
 	}
 	err := ss.db.InTransaction(ctx, func(ctx context.Context) error {
 		// Update status if set
-		if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
-			if update.Status != "" {
+		if update.Status != "" {
+			if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
 				rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE uid=?"
 				if _, err := sess.Exec(rawSQL, update.Status, update.UID); err != nil {
 					return fmt.Errorf("updating snapshot status for uid %s: %w", update.UID, err)
 				}
+				return nil
+			}); err != nil {
+				return err
 			}
-			return nil
-		}); err != nil {
-			return err
 		}
 
-		// Update result if set
-		if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
-			if len(update.Result) > 0 {
-				rawSQL := "UPDATE cloud_migration_snapshot SET result=? WHERE uid=?"
-				if _, err := sess.Exec(rawSQL, update.Result, update.UID); err != nil {
-					return fmt.Errorf("updating snapshot result for uid %s: %w", update.UID, err)
-				}
+		// Update resources if set
+		if len(update.Resources) > 0 {
+			if err := ss.CreateUpdateSnapshotResources(ctx, update.UID, update.Resources); err != nil {
+				return err
 			}
-			return nil
-		}); err != nil {
-			return err
 		}
 
 		return nil
 	})
 	return err
 }
 
-func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, uid string) (*cloudmigration.CloudMigrationSnapshot, error) {
+func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) {
 	var snapshot cloudmigration.CloudMigrationSnapshot
 	err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
 		exist, err := sess.Where("uid=?", uid).Get(&snapshot)
@@ -224,18 +215,28 @@ func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, uid string) (*cloudmig
 		}
 		return nil
 	})
+	if err != nil {
+		return nil, err
+	}
 
 	if err := ss.decryptKey(ctx, &snapshot); err != nil {
 		return &snapshot, err
 	}
 
+	resources, err := ss.GetSnapshotResources(ctx, uid, resultPage, resultLimit)
+	if err == nil {
+		snapshot.Resources = resources
+	}
+
 	return &snapshot, err
 }
 
+// GetSnapshotList returns snapshots without resources included. Use GetSnapshotByUID to get individual snapshot results.
 func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) {
 	var snapshots = make([]cloudmigration.CloudMigrationSnapshot, 0)
 	err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
-		sess.Limit(query.Limit, query.Offset)
+		offset := (query.Page - 1) * query.Limit
+		sess.Limit(query.Limit, offset)
 		return sess.Find(&snapshots, &cloudmigration.CloudMigrationSnapshot{
 			SessionUID: query.SessionUID,
 		})
@@ -252,6 +253,70 @@ func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.Li
 	return snapshots, nil
 }
 
+func (ss *sqlStore) CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
+	// ensure snapshot_uids are consistent so that we can use them to query when uid isn't known
+	for i := 0; i < len(resources); i++ {
+		resources[i].SnapshotUID = snapshotUid
+	}
+
+	return ss.db.InTransaction(ctx, func(ctx context.Context) error {
+		sql := "UPDATE cloud_migration_resource SET status=?, error_string=? WHERE uid=? OR (snapshot_uid=? AND resource_uid=?)"
+		err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
+			for _, r := range resources {
+				// try an update first
+				result, err := sess.Exec(sql, r.Status, r.Error, r.UID, snapshotUid, r.RefID)
+				if err != nil {
+					return err
+				}
+				// if this had no effect, assign a uid and insert instead
+				n, err := result.RowsAffected()
+				if err != nil {
+					return err
+				} else if n == 0 {
+					r.UID = util.GenerateShortUID()
+					_, err := sess.Insert(r)
+					if err != nil {
+						return err
+					}
+				}
+			}
+			return nil
+		})
+		if err != nil {
+			return fmt.Errorf("updating resources: %w", err)
+		}
+
+		return nil
+	})
+}
+
+func (ss *sqlStore) GetSnapshotResources(ctx context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error) {
+	var resources []cloudmigration.CloudMigrationResource
+	if limit == 0 {
+		return resources, nil
+	}
+	err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
+		offset := (page - 1) * limit
+		sess.Limit(limit, offset)
+		return sess.Find(&resources, &cloudmigration.CloudMigrationResource{
+			SnapshotUID: snapshotUid,
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+	return resources, nil
+}
+
+func (ss *sqlStore) DeleteSnapshotResources(ctx context.Context, snapshotUid string) error {
+	return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
+		_, err := sess.Delete(cloudmigration.CloudMigrationResource{
+			SnapshotUID: snapshotUid,
+		})
+		return err
+	})
+}
+
 func (ss *sqlStore) encryptToken(ctx context.Context, cm *cloudmigration.CloudMigrationSession) error {
 	s, err := ss.secretsService.Encrypt(ctx, []byte(cm.AuthToken), secrets.WithoutScope())
 	if err != nil {

View File

@@ -12,6 +12,7 @@ import (
 	"github.com/grafana/grafana/pkg/services/sqlstore"
 	"github.com/grafana/grafana/pkg/tests/testsuite"
 	"github.com/grafana/grafana/pkg/util"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -111,10 +112,9 @@ func Test_CreateMigrationRun(t *testing.T) {
 	ctx := context.Background()
 
 	t.Run("creates a session run and retrieves it from db", func(t *testing.T) {
-		result := []byte("OK")
 		cmr := cloudmigration.CloudMigrationSnapshot{
 			SessionUID: "asdfg",
-			Result:     result,
+			Status:     cloudmigration.SnapshotStatusFinished,
 		}
 		createResp, err := s.CreateMigrationRun(ctx, cmr)
 
@@ -123,7 +123,7 @@
 		getMRResp, err := s.GetMigrationStatus(ctx, createResp)
 		require.NoError(t, err)
-		require.Equal(t, result, getMRResp.Result)
+		require.Equal(t, cmr.Status, getMRResp.Status)
 	})
 }
 
@@ -181,7 +181,7 @@ func Test_SnapshotManagement(t *testing.T) {
 		require.NotEmpty(t, snapshotUid)
 
 		//retrieve it from the db
-		snapshot, err := s.GetSnapshotByUID(ctx, snapshotUid)
+		snapshot, err := s.GetSnapshotByUID(ctx, snapshotUid, 0, 0)
 		require.NoError(t, err)
 		require.Equal(t, cloudmigration.SnapshotStatusInitializing, string(snapshot.Status))
 
@@ -190,18 +190,71 @@
 		require.NoError(t, err)
 
 		//retrieve it again
-		snapshot, err = s.GetSnapshotByUID(ctx, snapshotUid)
+		snapshot, err = s.GetSnapshotByUID(ctx, snapshotUid, 0, 0)
 		require.NoError(t, err)
 		require.Equal(t, cloudmigration.SnapshotStatusCreating, string(snapshot.Status))
 
 		// lists snapshots and ensures it's in there
-		snapshots, err := s.GetSnapshotList(ctx, cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Offset: 0, Limit: 100})
+		snapshots, err := s.GetSnapshotList(ctx, cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Page: 1, Limit: 100})
 		require.NoError(t, err)
 		require.Len(t, snapshots, 1)
 		require.Equal(t, *snapshot, snapshots[0])
 	})
 }
 
+func Test_SnapshotResources(t *testing.T) {
+	_, s := setUpTest(t)
+	ctx := context.Background()
+
+	t.Run("tests CRUD of snapshot resources", func(t *testing.T) {
+		// Get the default rows from the test
+		resources, err := s.GetSnapshotResources(ctx, "poiuy", 0, 100)
+		assert.NoError(t, err)
+		assert.Len(t, resources, 3)
+
+		// create a new resource and update an existing resource
+		err = s.CreateUpdateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{
+			{
+				Type:   cloudmigration.DatasourceDataType,
+				RefID:  "mi39fj",
+				Status: cloudmigration.ItemStatusOK,
+			},
+			{
+				UID:    "qwerty",
+				Status: cloudmigration.ItemStatusOK,
+			},
+		})
+		assert.NoError(t, err)
+
+		// Get resources again
+		resources, err = s.GetSnapshotResources(ctx, "poiuy", 0, 100)
+		assert.NoError(t, err)
+		assert.Len(t, resources, 4)
+
+		// ensure existing resource was updated
+		for _, r := range resources {
+			if r.UID == "querty" {
+				assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
+				break
+			}
+		}
+
+		// ensure a new one was made
+		for _, r := range resources {
+			if r.UID == "mi39fj" {
+				assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
+				break
+			}
+		}
+
+		// delete snapshot resources
+		err = s.DeleteSnapshotResources(ctx, "poiuy")
+		assert.NoError(t, err)
+
+		// make sure they're gone
+		resources, err = s.GetSnapshotResources(ctx, "poiuy", 0, 100)
+		assert.NoError(t, err)
+		assert.Len(t, resources, 0)
+	})
+}
+
 func setUpTest(t *testing.T) (*sqlstore.SQLStore, *sqlStore) {
 	testDB := db.InitTestDB(t)
 	s := &sqlStore{
@@ -212,12 +265,12 @@ func setUpTest(t *testing.T) (*sqlstore.SQLStore, *sqlStore) {
 	// insert cloud migration test data
 	_, err := testDB.GetSqlxSession().Exec(ctx, `
 		INSERT INTO
 			cloud_migration_session (id, uid, auth_token, slug, stack_id, region_slug, cluster_slug, created, updated)
 		VALUES
 			(1,'qwerty', ?, '11111', 11111, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000'),
 			(2,'asdfgh', ?, '22222', 22222, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000'),
 			(3,'zxcvbn', ?, '33333', 33333, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000');
 		`,
 		encodeToken("12345"),
 		encodeToken("6789"),
@@ -227,16 +280,25 @@ func setUpTest(t *testing.T) (*sqlstore.SQLStore, *sqlStore) {
 	// insert cloud migration run test data
 	_, err = testDB.GetSqlxSession().Exec(ctx, `
 		INSERT INTO
-			cloud_migration_snapshot (session_uid, uid, result, created, updated, finished, status)
+			cloud_migration_snapshot (session_uid, uid, created, updated, finished, status)
 		VALUES
-			('qwerty', 'poiuy', ?, '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000', '2024-03-27 15:30:43.000', "finished"),
-			('qwerty', 'lkjhg', ?, '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000', '2024-03-27 15:30:43.000', "finished"),
-			('zxcvbn', 'mnbvvc', ?, '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000', '2024-03-27 15:30:43.000', "finished");
+			('qwerty', 'poiuy', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000', '2024-03-27 15:30:43.000', "finished"),
+			('qwerty', 'lkjhg', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000', '2024-03-27 15:30:43.000', "finished"),
+			('zxcvbn', 'mnbvvc', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000', '2024-03-27 15:30:43.000', "finished");
 		`,
-		[]byte("ERROR"),
-		[]byte("OK"),
-		[]byte("OK"),
+	)
+	require.NoError(t, err)
+
+	_, err = testDB.GetSqlxSession().Exec(ctx, `
+		INSERT INTO
+			cloud_migration_resource (uid, snapshot_uid, resource_type, resource_uid, status, error_string)
+		VALUES
+			('mnbvde', 'poiuy', 'DATASOURCE', 'jf38gh', 'OK', ''),
+			('qwerty', 'poiuy', 'DASHBOARD', 'ejcx4d', 'ERROR', 'fake error'),
+			('zxcvbn', 'poiuy', 'FOLDER', 'fi39fj', 'PENDING', ''),
+			('4fi9sd', '39fi39', 'FOLDER', 'fi39fj', 'OK', '');
+		`,
 	)
 	require.NoError(t, err)

View File

@@ -137,10 +137,10 @@ func convertRequestToDTO(request cloudmigration.MigrateDataRequest) MigrateDataR
 }
 
 func convertResponseFromDTO(result MigrateDataResponseDTO) cloudmigration.MigrateDataResponse {
-	items := make([]cloudmigration.MigrateDataResponseItem, len(result.Items))
+	items := make([]cloudmigration.CloudMigrationResource, len(result.Items))
 	for i := 0; i < len(result.Items); i++ {
 		item := result.Items[i]
-		items[i] = cloudmigration.MigrateDataResponseItem{
+		items[i] = cloudmigration.CloudMigrationResource{
 			Type:   cloudmigration.MigrateDataType(item.Type),
 			RefID:  item.RefID,
 			Status: cloudmigration.ItemStatus(item.Status),

View File

@@ -2,7 +2,6 @@ package gmsclient
 
 import (
 	"context"
-	"encoding/json"
 	"math/rand"
 	"time"
 
@@ -29,11 +28,11 @@ func (c *memoryClientImpl) MigrateData(
 	request cloudmigration.MigrateDataRequest,
 ) (*cloudmigration.MigrateDataResponse, error) {
 	result := cloudmigration.MigrateDataResponse{
-		Items: make([]cloudmigration.MigrateDataResponseItem, len(request.Items)),
+		Items: make([]cloudmigration.CloudMigrationResource, len(request.Items)),
 	}
 
 	for i, v := range request.Items {
-		result.Items[i] = cloudmigration.MigrateDataResponseItem{
+		result.Items[i] = cloudmigration.CloudMigrationResource{
 			Type:   v.Type,
 			RefID:  v.RefID,
 			Status: cloudmigration.ItemStatusOK,
@@ -60,38 +59,32 @@ func (c *memoryClientImpl) InitializeSnapshot(context.Context, cloudmigration.Cl
 }
 
 func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) {
-	// just fake an entire response
-	gmsSnapshot := cloudmigration.CloudMigrationSnapshot{
-		Status:         cloudmigration.SnapshotStatusFinished,
-		GMSSnapshotUID: util.GenerateShortUID(),
-		Result:         []byte{},
-		Finished:       time.Now(),
-	}
-
-	result := []cloudmigration.MigrateDataResponseItem{
+	results := []cloudmigration.CloudMigrationResource{
 		{
 			Type:   cloudmigration.DashboardDataType,
-			RefID:  util.GenerateShortUID(),
+			RefID:  "dash1",
 			Status: cloudmigration.ItemStatusOK,
 		},
 		{
 			Type:   cloudmigration.DatasourceDataType,
-			RefID:  util.GenerateShortUID(),
+			RefID:  "ds1",
 			Status: cloudmigration.ItemStatusError,
 			Error:  "fake error",
 		},
 		{
 			Type:   cloudmigration.FolderDataType,
-			RefID:  util.GenerateShortUID(),
+			RefID:  "folder1",
 			Status: cloudmigration.ItemStatusOK,
 		},
 	}
 
-	b, err := json.Marshal(result)
-	if err != nil {
-		return nil, err
+	// just fake an entire response
+	gmsSnapshot := cloudmigration.CloudMigrationSnapshot{
+		Status:         cloudmigration.SnapshotStatusFinished,
+		GMSSnapshotUID: "gmssnapshotuid",
+		Resources:      results,
+		Finished:       time.Now(),
 	}
-	gmsSnapshot.Result = b
 
 	return &gmsSnapshot, nil
 }

View File

@@ -1,8 +1,6 @@
 package cloudmigration
 
 import (
-	"encoding/json"
-	"errors"
 	"time"
 
 	"github.com/grafana/grafana/pkg/apimachinery/errutil"
@@ -45,8 +43,8 @@ type CloudMigrationSnapshot struct {
 	Updated  time.Time
 	Finished time.Time
 
-	// []MigrateDataResponseItem
-	Result []byte `xorm:"result"` //store raw gms response body
+	// Stored in the cloud_migration_resource table
+	Resources []CloudMigrationResource `xorm:"-"`
 }
 
 type SnapshotStatus string
@@ -63,14 +61,24 @@ const (
 	SnapshotStatusUnknown = "unknown"
 )
 
+type CloudMigrationResource struct {
+	ID          int64           `xorm:"pk autoincr 'id'"`
+	UID         string          `xorm:"uid"`
+	Type        MigrateDataType `xorm:"resource_type"`
+	RefID       string          `xorm:"resource_uid"`
+	Status      ItemStatus      `xorm:"status"`
+	Error       string          `xorm:"error_string"`
+	SnapshotUID string          `xorm:"snapshot_uid"`
+}
+
 // Deprecated, use GetSnapshotResult for the async workflow
 func (s CloudMigrationSnapshot) GetResult() (*MigrateDataResponse, error) {
-	var result MigrateDataResponse
-	err := json.Unmarshal(s.Result, &result)
-	if err != nil {
-		return nil, errors.New("could not parse result of run")
+	result := MigrateDataResponse{
+		RunUID: s.UID,
+		Items:  s.Resources,
 	}
-	result.RunUID = s.UID
 	return &result, nil
 }
 
@@ -78,17 +86,6 @@
 	return s.Status == SnapshotStatusPendingProcessing || s.Status == SnapshotStatusProcessing
 }
 
-func (s CloudMigrationSnapshot) GetSnapshotResult() ([]MigrateDataResponseItem, error) {
-	var result []MigrateDataResponseItem
-	if len(s.Result) > 0 {
-		err := json.Unmarshal(s.Result, &result)
-		if err != nil {
-			return nil, errors.New("could not parse result of run")
-		}
-	}
-	return result, nil
-}
-
 type CloudMigrationRunList struct {
 	Runs []MigrateDataResponseList
 }
@@ -108,16 +105,23 @@ type CloudMigrationSessionListResponse struct {
 	Sessions []CloudMigrationSessionResponse
 }
 
+type GetSnapshotsQuery struct {
+	SnapshotUID string
+	SessionUID  string
+	ResultPage  int
+	ResultLimit int
+}
+
 type ListSnapshotsQuery struct {
 	SessionUID string
-	Offset     int
+	Page       int
 	Limit      int
 }
 
 type UpdateSnapshotCmd struct {
 	UID       string
 	Status    SnapshotStatus
-	Result    []byte //store raw gms response body
+	Resources []CloudMigrationResource
 }
 
 // access token
@@ -172,26 +176,21 @@ type MigrateDataRequestItem struct {
 type ItemStatus string
 
 const (
 	ItemStatusOK      ItemStatus = "OK"
 	ItemStatusError   ItemStatus = "ERROR"
+	ItemStatusPending ItemStatus = "PENDING"
+	ItemStatusUnknown ItemStatus = "UNKNOWN"
 )
 
 type MigrateDataResponse struct {
 	RunUID string
-	Items  []MigrateDataResponseItem
+	Items  []CloudMigrationResource
 }
 
 type MigrateDataResponseList struct {
 	RunUID string
 }
 
-type MigrateDataResponseItem struct {
-	Type   MigrateDataType
-	RefID  string
-	Status ItemStatus
-	Error  string
-}
-
 type CreateSessionResponse struct {
 	SnapshotUid string
 }

View File

@@ -139,4 +139,23 @@ func addCloudMigrationsMigrations(mg *Migrator) {
 	errorStringColumn := Column{Name: "error_string", Type: DB_Text, Nullable: true}
 	mg.AddMigration("add snapshot error_string column", NewAddColumnMigration(migrationSnapshotTable, &errorStringColumn))
+
+	// --- create table for tracking resource migrations
+	migrationResourceTable := Table{
+		Name: "cloud_migration_resource",
+		Columns: []*Column{
+			{Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
+			{Name: "uid", Type: DB_NVarchar, Length: 40, Nullable: false},
+			{Name: "resource_type", Type: DB_NVarchar, Length: 40, Nullable: false},
+			{Name: "resource_uid", Type: DB_NVarchar, Length: 40, Nullable: false},
+			{Name: "status", Type: DB_NVarchar, Length: 20, Nullable: false},
+			{Name: "error_string", Type: DB_Text, Nullable: true},
+			{Name: "snapshot_uid", Type: DB_NVarchar, Length: 40, Nullable: false},
+		},
+	}
+	mg.AddMigration("create cloud_migration_resource table v1", NewAddTableMigration(migrationResourceTable))
+
+	// -- delete the snapshot result column while still in the experimental phase
+	mg.AddMigration("delete cloud_migration_snapshot.result column", NewRawSQLMigration("ALTER TABLE cloud_migration_snapshot DROP COLUMN result"))
 }

View File

@@ -5382,7 +5382,9 @@
 					"type": "string",
 					"enum": [
 						"OK",
-						"ERROR"
+						"ERROR",
+						"PENDING",
+						"UNKNOWN"
 					]
 				},
 				"type": {

View File

@@ -2564,6 +2564,22 @@
 				"summary": "Get metadata about a snapshot, including where it is in its processing and final results.",
 				"operationId": "getSnapshot",
 				"parameters": [
+					{
+						"type": "integer",
+						"format": "int64",
+						"default": 1,
+						"description": "ResultPage is used for pagination with ResultLimit",
+						"name": "resultPage",
+						"in": "query"
+					},
+					{
+						"type": "integer",
+						"format": "int64",
+						"default": 100,
+						"description": "Max limit for snapshot results returned.",
+						"name": "resultLimit",
+						"in": "query"
+					},
 					{
 						"type": "string",
 						"description": "Session UID of a session",
@@ -2694,9 +2710,9 @@
 					{
 						"type": "integer",
 						"format": "int64",
-						"default": 0,
-						"description": "Offset is used for pagination with limit",
-						"name": "offset",
+						"default": 1,
+						"description": "Page is used for pagination with limit",
+						"name": "page",
 						"in": "query"
 					},
 					{
@@ -16874,7 +16890,9 @@
 					"type": "string",
 					"enum": [
 						"OK",
-						"ERROR"
+						"ERROR",
+						"PENDING",
+						"UNKNOWN"
 					]
 				},
 				"type": {

View File

@@ -7009,7 +7009,9 @@
 				"status": {
 					"enum": [
 						"OK",
-						"ERROR"
+						"ERROR",
+						"PENDING",
+						"UNKNOWN"
 					],
 					"type": "string"
 				},
@@ -15527,6 +15529,26 @@
 			"get": {
 				"operationId": "getSnapshot",
 				"parameters": [
+					{
+						"description": "ResultPage is used for pagination with ResultLimit",
+						"in": "query",
+						"name": "resultPage",
+						"schema": {
+							"default": 1,
+							"format": "int64",
+							"type": "integer"
+						}
+					},
+					{
+						"description": "Max limit for snapshot results returned.",
+						"in": "query",
+						"name": "resultLimit",
+						"schema": {
+							"default": 100,
+							"format": "int64",
+							"type": "integer"
+						}
+					},
 					{
 						"description": "Session UID of a session",
 						"in": "path",
@@ -15667,11 +15689,11 @@
 				"operationId": "getShapshotList",
 				"parameters": [
 					{
-						"description": "Offset is used for pagination with limit",
+						"description": "Page is used for pagination with limit",
 						"in": "query",
-						"name": "offset",
+						"name": "page",
 						"schema": {
-							"default": 0,
+							"default": 1,
 							"format": "int64",
 							"type": "integer"
 						}