CloudMigrations: Fix bugs found during local testing (#91163)

* send dashboard commands instead of dashboards

* move status updates before goroutine to ensure frontend polls

* fix syncing issues between snapshot state and resources

* make sessionUid a requirement for modifying snapshots

* move the function I meant to move earlier

* remove accidental commit

* another accidental commit

* verify UpdateSnapshot is called with sessionUid

* revert

* pass in session uid everywhere

* forgot to save

* fix unit test

* fix typo

* tiny tweak
This commit is contained in:
Michael Mandrus 2024-07-30 09:02:41 -04:00 committed by GitHub
parent 0a561d22fb
commit 3e138449bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 84 additions and 62 deletions

View File

@ -512,6 +512,15 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
}
snapshot.UID = uid
// Update status to "creating" to ensure the frontend polls from now on
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: uid,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusCreating,
}); err != nil {
return nil, err
}
// start building the snapshot asynchronously while we return a success response to the client
go func() {
s.cancelMutex.Lock()
@ -531,8 +540,9 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
s.log.Error("building snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: cloudmigration.SnapshotStatusError,
UID: snapshot.UID,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
s.log.Error("critical failure during snapshot creation - please report any error logs")
}
@ -550,7 +560,7 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
defer span.End()
sessionUid, snapshotUid := query.SessionUID, query.SnapshotUID
snapshot, err := s.store.GetSnapshotByUID(ctx, snapshotUid, query.ResultPage, query.ResultLimit)
snapshot, err := s.store.GetSnapshotByUID(ctx, sessionUid, snapshotUid, query.ResultPage, query.ResultLimit)
if err != nil {
return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
}
@ -583,13 +593,18 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
// We need to update the snapshot in our db before reporting anything
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: localStatus,
Resources: snapshotMeta.Results,
}); err != nil {
return nil, fmt.Errorf("error updating snapshot status: %w", err)
}
snapshot.Status = localStatus
snapshot.Resources = append(snapshot.Resources, snapshotMeta.Results...)
// Refresh the snapshot after the update
snapshot, err = s.store.GetSnapshotByUID(ctx, sessionUid, snapshotUid, query.ResultPage, query.ResultLimit)
if err != nil {
return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
}
}
return snapshot, nil
@ -644,6 +659,15 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", uploadUrl)
// Update status to "uploading" to ensure the frontend polls from now on
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotUid,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusUploading,
}); err != nil {
return err
}
// start uploading the snapshot asynchronously while we return a success response to the client
go func() {
s.cancelMutex.Lock()
@ -663,8 +687,9 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
s.log.Error("uploading snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: cloudmigration.SnapshotStatusError,
UID: snapshot.UID,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
s.log.Error("critical failure during snapshot upload - please report any error logs")
}
@ -692,8 +717,9 @@ func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapsho
s.cancelFunc = nil
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotUid,
Status: cloudmigration.SnapshotStatusCanceled,
UID: snapshotUid,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusCanceled,
}); err != nil {
s.log.Error("critical failure during snapshot cancelation - please report any error logs")
}

View File

@ -146,16 +146,18 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
// Make the status pending processing and ensure GMS gets called
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: cloudmigration.SnapshotStatusPendingProcessing,
UID: uid,
SessionID: sess.UID,
Status: cloudmigration.SnapshotStatusPendingProcessing,
})
assert.NoError(t, err)
cleanupFunc := func() {
gmsClientMock.getStatusCalled = 0
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: cloudmigration.SnapshotStatusPendingProcessing,
UID: uid,
SessionID: sess.UID,
Status: cloudmigration.SnapshotStatusPendingProcessing,
})
assert.NoError(t, err)
}
@ -264,22 +266,10 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
},
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
assert.Len(t, snapshot.Resources, 1)
assert.Equal(t, gmsClientMock.getSnapshotResponse.Results[0], snapshot.Resources[0])
// ensure it is persisted
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
ResultPage: 1,
ResultLimit: 100,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
@ -323,8 +313,9 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
cloudmigration.SnapshotStatusError,
} {
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: status,
UID: uid,
SessionID: sess.UID,
Status: status,
})
assert.NoError(t, err)
_, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
@ -341,8 +332,9 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
cloudmigration.SnapshotStatusProcessing,
} {
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: status,
UID: uid,
SessionID: sess.UID,
Status: status,
})
assert.NoError(t, err)
_, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{

View File

@ -32,7 +32,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S
}
// Dashboards and folders are linked via the schema, so we need to get both
dashboards, folders, err := s.getDashboardAndFolderCommands(ctx, signedInUser)
dashs, folders, err := s.getDashboardAndFolderCommands(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get dashboards and folders", "err", err)
return nil, err
@ -40,7 +40,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S
migrationDataSlice := make(
[]cloudmigration.MigrateDataRequestItem, 0,
len(dataSources)+len(dashboards)+len(folders),
len(dataSources)+len(dashs)+len(folders),
)
for _, ds := range dataSources {
@ -52,13 +52,19 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S
})
}
for _, dashboard := range dashboards {
for _, dashboard := range dashs {
dashboard.Data.Del("id")
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.DashboardDataType,
RefID: dashboard.UID,
Name: dashboard.Title,
Data: map[string]any{"dashboard": dashboard.Data},
Data: dashboards.SaveDashboardCommand{
Dashboard: dashboard.Data,
Overwrite: true, // currently only intended to be a push, not a sync; revisit during the preview
Message: fmt.Sprintf("Created via the Grafana Cloud Migration Assistant by on-prem user \"%s\"", signedInUser.Login),
IsFolder: false,
FolderUID: dashboard.FolderUID,
},
})
}
@ -174,14 +180,6 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
s.log.Debug(fmt.Sprintf("buildSnapshot: method completed in %d ms", time.Since(start).Milliseconds()))
}()
// Update status to snapshot creating with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusCreating,
}); err != nil {
return err
}
publicKey, privateKey, err := box.GenerateKey(cryptoRand.Reader)
if err != nil {
return fmt.Errorf("nacl: generating public and private key: %w", err)
@ -256,6 +254,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
// update snapshot status to pending upload with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
SessionID: snapshotMeta.SessionUID,
Status: cloudmigration.SnapshotStatusPendingUpload,
Resources: localSnapshotResource,
}); err != nil {
@ -276,14 +275,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
s.log.Debug(fmt.Sprintf("uploadSnapshot: method completed in %d ms", time.Since(start).Milliseconds()))
}()
// update snapshot status to uploading with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusUploading,
}); err != nil {
return err
}
indexFilePath := filepath.Join(snapshotMeta.LocalDir, "index.json")
// LocalDir can be set in the configuration, therefore the file path can be set to any path.
// nolint:gosec
@ -333,8 +324,9 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
// update snapshot status to processing with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusProcessing,
UID: snapshotMeta.UID,
SessionID: snapshotMeta.SessionUID,
Status: cloudmigration.SnapshotStatusProcessing,
}); err != nil {
return err
}

View File

@ -18,7 +18,7 @@ type store interface {
CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error)
UpdateSnapshot(ctx context.Context, snapshot cloudmigration.UpdateSnapshotCmd) error
GetSnapshotByUID(ctx context.Context, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error)
GetSnapshotByUID(ctx context.Context, sessUid, id string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error)
GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error)
CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error

View File

@ -160,6 +160,10 @@ func (ss *sqlStore) GetMigrationStatusList(ctx context.Context, migrationUID str
}
func (ss *sqlStore) CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
if snapshot.SessionUID == "" {
return "", fmt.Errorf("sessionUID is required")
}
if snapshot.UID == "" {
snapshot.UID = util.GenerateShortUID()
}
@ -189,12 +193,15 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up
if update.UID == "" {
return fmt.Errorf("missing snapshot uid")
}
if update.SessionID == "" {
return fmt.Errorf("missing session uid")
}
err := ss.db.InTransaction(ctx, func(ctx context.Context) error {
// Update status if set
if update.Status != "" {
if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE uid=?"
if _, err := sess.Exec(rawSQL, update.Status, update.UID); err != nil {
rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE session_uid=? AND uid=?"
if _, err := sess.Exec(rawSQL, update.Status, update.SessionID, update.UID); err != nil {
return fmt.Errorf("updating snapshot status for uid %s: %w", update.UID, err)
}
return nil
@ -215,10 +222,10 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up
return err
}
func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) {
func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, sessionUid, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) {
var snapshot cloudmigration.CloudMigrationSnapshot
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
exist, err := sess.Where("uid=?", uid).Get(&snapshot)
exist, err := sess.Where("session_uid=? AND uid=?", sessionUid, uid).Get(&snapshot)
if err != nil {
return err
}
@ -324,10 +331,14 @@ func (ss *sqlStore) CreateUpdateSnapshotResources(ctx context.Context, snapshotU
}
func (ss *sqlStore) GetSnapshotResources(ctx context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error) {
var resources []cloudmigration.CloudMigrationResource
if limit == 0 {
return resources, nil
if page < 1 {
page = 1
}
if limit == 0 {
limit = 100
}
var resources []cloudmigration.CloudMigrationResource
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
offset := (page - 1) * limit
sess.Limit(limit, offset)

View File

@ -182,16 +182,16 @@ func Test_SnapshotManagement(t *testing.T) {
require.NotEmpty(t, snapshotUid)
//retrieve it from the db
snapshot, err := s.GetSnapshotByUID(ctx, snapshotUid, 0, 0)
snapshot, err := s.GetSnapshotByUID(ctx, sessionUid, snapshotUid, 0, 0)
require.NoError(t, err)
require.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)
// update its status
err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{UID: snapshotUid, Status: cloudmigration.SnapshotStatusCreating})
err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{UID: snapshotUid, Status: cloudmigration.SnapshotStatusCreating, SessionID: sessionUid})
require.NoError(t, err)
//retrieve it again
snapshot, err = s.GetSnapshotByUID(ctx, snapshotUid, 0, 0)
snapshot, err = s.GetSnapshotByUID(ctx, sessionUid, snapshotUid, 0, 0)
require.NoError(t, err)
require.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)

View File

@ -145,6 +145,7 @@ type ListSnapshotsQuery struct {
type UpdateSnapshotCmd struct {
UID string
SessionID string
Status SnapshotStatus
Resources []CloudMigrationResource
}