From 3e138449bbbe49ff8cd8a3333595418d03c1d3d3 Mon Sep 17 00:00:00 2001 From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com> Date: Tue, 30 Jul 2024 09:02:41 -0400 Subject: [PATCH] CloudMigrations: Fix bugs found during local testing (#91163) * send dashboard commands instead of dashboards * move status updates before goroutine to ensure frontend polls * fix syncing issues between snapshot state and resources * make sessionUid a requirement for modifying snapshots * move the function I meant to move earlier * remove accidental commit * another accidental commit * verify UpdateSnapshot is called with sessionUid * revert * pass in session uid everywhere * forgot to save * fix unit test * fix typo * tiny tweak --- .../cloudmigrationimpl/cloudmigration.go | 44 +++++++++++++++---- .../cloudmigrationimpl/cloudmigration_test.go | 32 +++++--------- .../cloudmigrationimpl/snapshot_mgmt.go | 36 ++++++--------- .../cloudmigrationimpl/store.go | 2 +- .../cloudmigrationimpl/xorm_store.go | 25 ++++++++--- .../cloudmigrationimpl/xorm_store_test.go | 6 +-- pkg/services/cloudmigration/model.go | 1 + 7 files changed, 84 insertions(+), 62 deletions(-) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 07ad55df565..36763dfffe4 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -512,6 +512,15 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI } snapshot.UID = uid + // Update status to "creating" to ensure the frontend polls from now on + if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ + UID: uid, + SessionID: sessionUid, + Status: cloudmigration.SnapshotStatusCreating, + }); err != nil { + return nil, err + } + // start building the snapshot asynchronously while we return a success response to the client go func() { s.cancelMutex.Lock() @@ -531,8 +540,9 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI s.log.Error("building snapshot", "err", err.Error()) // Update status to error with retries if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ - UID: snapshot.UID, - Status: cloudmigration.SnapshotStatusError, + UID: snapshot.UID, + SessionID: sessionUid, + Status: cloudmigration.SnapshotStatusError, }); err != nil { s.log.Error("critical failure during snapshot creation - please report any error logs") } @@ -550,7 +560,7 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps defer span.End() sessionUid, snapshotUid := query.SessionUID, query.SnapshotUID - snapshot, err := s.store.GetSnapshotByUID(ctx, snapshotUid, query.ResultPage, query.ResultLimit) + snapshot, err := s.store.GetSnapshotByUID(ctx, sessionUid, snapshotUid, query.ResultPage, query.ResultLimit) if err != nil { return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err) } @@ -583,13 +593,18 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps // We need to update the snapshot in our db before reporting anything if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ UID: snapshot.UID, + SessionID: sessionUid, Status: localStatus, Resources: snapshotMeta.Results, }); err != nil { return nil, fmt.Errorf("error updating snapshot status: %w", err) } - snapshot.Status = localStatus - snapshot.Resources = append(snapshot.Resources, snapshotMeta.Results...) + + // Refresh the snapshot after the update + snapshot, err = s.store.GetSnapshotByUID(ctx, sessionUid, snapshotUid, query.ResultPage, query.ResultLimit) + if err != nil { + return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err) + } } return snapshot, nil @@ -644,6 +659,15 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", uploadUrl) + // Update status to "uploading" to ensure the frontend polls from now on + if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ + UID: snapshotUid, + SessionID: sessionUid, + Status: cloudmigration.SnapshotStatusUploading, + }); err != nil { + return err + } + // start uploading the snapshot asynchronously while we return a success response to the client go func() { s.cancelMutex.Lock() @@ -663,8 +687,9 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho s.log.Error("uploading snapshot", "err", err.Error()) // Update status to error with retries if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ - UID: snapshot.UID, - Status: cloudmigration.SnapshotStatusError, + UID: snapshot.UID, + SessionID: sessionUid, + Status: cloudmigration.SnapshotStatusError, }); err != nil { s.log.Error("critical failure during snapshot upload - please report any error logs") } @@ -692,8 +717,9 @@ func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapsho s.cancelFunc = nil if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotUid, - Status: cloudmigration.SnapshotStatusCanceled, + UID: snapshotUid, + SessionID: sessionUid, + Status: cloudmigration.SnapshotStatusCanceled, }); err != nil { s.log.Error("critical failure during snapshot cancelation - please report any error logs") } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index 2a1f1fbf7b9..006a862da36 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -146,16 +146,18 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { // Make the status pending processing and ensure GMS gets called err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{ - UID: uid, - Status: cloudmigration.SnapshotStatusPendingProcessing, + UID: uid, + SessionID: sess.UID, + Status: cloudmigration.SnapshotStatusPendingProcessing, }) assert.NoError(t, err) cleanupFunc := func() { gmsClientMock.getStatusCalled = 0 err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{ - UID: uid, - Status: cloudmigration.SnapshotStatusPendingProcessing, + UID: uid, + SessionID: sess.UID, + Status: cloudmigration.SnapshotStatusPendingProcessing, }) assert.NoError(t, err) } @@ -264,22 +266,10 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { }, } - snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ - SnapshotUID: uid, - SessionUID: sess.UID, - }) - assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) - assert.Len(t, snapshot.Resources, 1) - assert.Equal(t, gmsClientMock.getSnapshotResponse.Results[0], snapshot.Resources[0]) - // ensure it is persisted snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ SnapshotUID: uid, SessionUID: sess.UID, - ResultPage: 1, - ResultLimit: 100, }) assert.NoError(t, err) assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status) @@ -323,8 +313,9 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) { cloudmigration.SnapshotStatusError, } { err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{ - UID: uid, - Status: status, + UID: uid, + SessionID: sess.UID, + Status: status, }) assert.NoError(t, err) _, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ @@ -341,8 +332,9 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) { cloudmigration.SnapshotStatusProcessing, } { err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{ - UID: uid, - Status: status, + UID: uid, + SessionID: sess.UID, + Status: status, }) assert.NoError(t, err) _, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index 2b277b237ba..65bd07e564b 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -32,7 +32,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S } // Dashboards and folders are linked via the schema, so we need to get both - dashboards, folders, err := s.getDashboardAndFolderCommands(ctx, signedInUser) + dashs, folders, err := s.getDashboardAndFolderCommands(ctx, signedInUser) if err != nil { s.log.Error("Failed to get dashboards and folders", "err", err) return nil, err @@ -40,7 +40,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S migrationDataSlice := make( []cloudmigration.MigrateDataRequestItem, 0, - len(dataSources)+len(dashboards)+len(folders), + len(dataSources)+len(dashs)+len(folders), ) for _, ds := range dataSources { @@ -52,13 +52,19 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S }) } - for _, dashboard := range dashboards { + for _, dashboard := range dashs { dashboard.Data.Del("id") migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{ Type: cloudmigration.DashboardDataType, RefID: dashboard.UID, Name: dashboard.Title, - Data: map[string]any{"dashboard": dashboard.Data}, + Data: dashboards.SaveDashboardCommand{ + Dashboard: dashboard.Data, + Overwrite: true, // currently only intended to be a push, not a sync; revisit during the preview + Message: fmt.Sprintf("Created via the Grafana Cloud Migration Assistant by on-prem user \"%s\"", signedInUser.Login), + IsFolder: false, + FolderUID: dashboard.FolderUID, + }, }) } @@ -174,14 +180,6 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn s.log.Debug(fmt.Sprintf("buildSnapshot: method completed in %d ms", time.Since(start).Milliseconds())) }() - // Update status to snapshot creating with retries - if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusCreating, - }); err != nil { - return err - } - publicKey, privateKey, err := box.GenerateKey(cryptoRand.Reader) if err != nil { return fmt.Errorf("nacl: generating public and private key: %w", err) @@ -256,6 +254,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn // update snapshot status to pending upload with retries if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ UID: snapshotMeta.UID, + SessionID: snapshotMeta.SessionUID, Status: cloudmigration.SnapshotStatusPendingUpload, Resources: localSnapshotResource, }); err != nil { @@ -276,14 +275,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl s.log.Debug(fmt.Sprintf("uploadSnapshot: method completed in %d ms", time.Since(start).Milliseconds())) }() - // update snapshot status to uploading with retries - if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusUploading, - }); err != nil { - return err - } - indexFilePath := filepath.Join(snapshotMeta.LocalDir, "index.json") // LocalDir can be set in the configuration, therefore the file path can be set to any path. // nolint:gosec @@ -333,8 +324,9 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl // update snapshot status to processing with retries if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusProcessing, + UID: snapshotMeta.UID, + SessionID: snapshotMeta.SessionUID, + Status: cloudmigration.SnapshotStatusProcessing, }); err != nil { return err } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/store.go b/pkg/services/cloudmigration/cloudmigrationimpl/store.go index 3a4962b8a04..da196e2d7b1 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/store.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/store.go @@ -18,7 +18,7 @@ type store interface { CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) UpdateSnapshot(ctx context.Context, snapshot cloudmigration.UpdateSnapshotCmd) error - GetSnapshotByUID(ctx context.Context, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) + GetSnapshotByUID(ctx context.Context, sessUid, id string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go index 1982c2ce220..a22ea2f0dff 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go @@ -160,6 +160,10 @@ func (ss *sqlStore) GetMigrationStatusList(ctx context.Context, migrationUID str } func (ss *sqlStore) CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { + if snapshot.SessionUID == "" { + return "", fmt.Errorf("sessionUID is required") + } + if snapshot.UID == "" { snapshot.UID = util.GenerateShortUID() } @@ -189,12 +193,15 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up if update.UID == "" { return fmt.Errorf("missing snapshot uid") } + if update.SessionID == "" { + return fmt.Errorf("missing session uid") + } err := ss.db.InTransaction(ctx, func(ctx context.Context) error { // Update status if set if update.Status != "" { if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { - rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE uid=?" - if _, err := sess.Exec(rawSQL, update.Status, update.UID); err != nil { + rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE session_uid=? AND uid=?" + if _, err := sess.Exec(rawSQL, update.Status, update.SessionID, update.UID); err != nil { return fmt.Errorf("updating snapshot status for uid %s: %w", update.UID, err) } return nil @@ -215,10 +222,10 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up return err } -func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) { +func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, sessionUid, uid string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) { var snapshot cloudmigration.CloudMigrationSnapshot err := ss.db.WithDbSession(ctx, func(sess *db.Session) error { - exist, err := sess.Where("uid=?", uid).Get(&snapshot) + exist, err := sess.Where("session_uid=? AND uid=?", sessionUid, uid).Get(&snapshot) if err != nil { return err } @@ -324,10 +331,14 @@ func (ss *sqlStore) CreateUpdateSnapshotResources(ctx context.Context, snapshotU } func (ss *sqlStore) GetSnapshotResources(ctx context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error) { - var resources []cloudmigration.CloudMigrationResource - if limit == 0 { - return resources, nil + if page < 1 { + page = 1 } + if limit == 0 { + limit = 100 + } + + var resources []cloudmigration.CloudMigrationResource err := ss.db.WithDbSession(ctx, func(sess *db.Session) error { offset := (page - 1) * limit sess.Limit(limit, offset) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go index b61154574aa..b5d889c3fb2 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go @@ -182,16 +182,16 @@ func Test_SnapshotManagement(t *testing.T) { require.NotEmpty(t, snapshotUid) //retrieve it from the db - snapshot, err := s.GetSnapshotByUID(ctx, snapshotUid, 0, 0) + snapshot, err := s.GetSnapshotByUID(ctx, sessionUid, snapshotUid, 0, 0) require.NoError(t, err) require.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status) // update its status - err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{UID: snapshotUid, Status: cloudmigration.SnapshotStatusCreating}) + err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{UID: snapshotUid, Status: cloudmigration.SnapshotStatusCreating, SessionID: sessionUid}) require.NoError(t, err) //retrieve it again - snapshot, err = s.GetSnapshotByUID(ctx, snapshotUid, 0, 0) + snapshot, err = s.GetSnapshotByUID(ctx, sessionUid, snapshotUid, 0, 0) require.NoError(t, err) require.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status) diff --git a/pkg/services/cloudmigration/model.go b/pkg/services/cloudmigration/model.go index 0a254ee0b35..6baf0f4c3fa 100644 --- a/pkg/services/cloudmigration/model.go +++ b/pkg/services/cloudmigration/model.go @@ -145,6 +145,7 @@ type ListSnapshotsQuery struct { type UpdateSnapshotCmd struct { UID string + SessionID string Status SnapshotStatus Resources []CloudMigrationResource }