diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 7f987f81f43..e7b58fa5ebf 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -53,6 +54,8 @@ type Service struct { cancelMutex sync.Mutex cancelFunc context.CancelFunc + isSyncSnapshotStatusFromGMSRunning int32 + features featuremgmt.FeatureToggles gmsClient gmsclient.Client objectStorage objectstorage.ObjectStorage @@ -546,8 +549,8 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps return nil, fmt.Errorf("fetching session for uid %s: %w", sessionUid, err) } - // Ask GMS for snapshot status while the source of truth is in the cloud - if snapshot.ShouldQueryGMS() { + // FIXME: this function should be a method + syncStatus := func(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshot *cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) { // Calculate offset based on how many results we currently have responses for pending := snapshot.StatsRollup.CountsByStatus[cloudmigration.ItemStatusPending] snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot, snapshot.StatsRollup.Total-pending) @@ -595,11 +598,74 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps if err != nil { return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err) } + return snapshot, nil } + // Create a context out the span context to ensure the trace is propagated + asyncSyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) + // Sync snapshot results from GMS if the one created after upload is not running (e.g. due to a restart) + // and anybody is interested in the status. + go s.syncSnapshotStatusFromGMSUntilDone(asyncSyncCtx, session, snapshot, syncStatus) + return snapshot, nil } +// FIXME: this definition should not exist once the function is GetSnapshot is converted to a method +type syncStatusFunc func(context.Context, *cloudmigration.CloudMigrationSession, *cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) + +func (s *Service) syncSnapshotStatusFromGMSUntilDone(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshot *cloudmigration.CloudMigrationSnapshot, syncStatus syncStatusFunc) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.syncSnapshotStatusFromGMSUntilDone") + span.SetAttributes( + attribute.String("sessionUID", session.UID), + attribute.String("snapshotUID", snapshot.UID), + ) + defer span.End() + + // Ensure only one in-flight sync running + if !atomic.CompareAndSwapInt32(&s.isSyncSnapshotStatusFromGMSRunning, 0, 1) { + s.log.Info("synchronize snapshot status already running", "sessionUID", session.UID, "snapshotUID", snapshot.UID) + return + } + defer atomic.StoreInt32(&s.isSyncSnapshotStatusFromGMSRunning, 0) + + if !snapshot.ShouldQueryGMS() { + return + } + + s.cancelMutex.Lock() + defer func() { + s.cancelFunc = nil + s.cancelMutex.Unlock() + }() + + ctx, s.cancelFunc = context.WithCancel(ctx) + + updatedSnapshot, err := syncStatus(ctx, session, snapshot) + if err != nil { + s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID) + } else { + snapshot = updatedSnapshot + } + + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() + + for snapshot.ShouldQueryGMS() { + select { + case <-ctx.Done(): + s.log.Info("cancelling snapshot status polling", "sessionUID", session.UID, "snapshotUID", snapshot.UID) + return + case <-tick.C: + updatedSnapshot, err := syncStatus(ctx, session, snapshot) + if err != nil { + s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID) + continue + } + snapshot = updatedSnapshot + } + } +} + var gmsStateToLocalStatus map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus = map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus{ cloudmigration.SnapshotStateInitialized: cloudmigration.SnapshotStatusPendingProcessing, // GMS has not yet received a notification for the data cloudmigration.SnapshotStateProcessing: cloudmigration.SnapshotStatusProcessing, // GMS has received a notification and is migrating the data diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index 3d84cc75a4a..b236a96864f 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "slices" + "sync" "testing" "time" @@ -111,7 +112,7 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { }) assert.NoError(t, err) assert.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status) - assert.Equal(t, 0, gmsClientMock.getStatusCalled) + assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 0 }, time.Second, 10*time.Millisecond) // Make the status pending processing and ensure GMS gets called err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{ @@ -123,6 +124,11 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { cleanupFunc := func() { gmsClientMock.getStatusCalled = 0 + gmsClientMock.getSnapshotResponse = nil + if s.cancelFunc != nil { + s.cancelFunc() + } + err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{ UID: uid, SessionID: sess.UID, @@ -131,6 +137,20 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { assert.NoError(t, err) } + checkStatusSync := func(status cloudmigration.SnapshotStatus) func() bool { + return func() bool { + snapshot, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ + SnapshotUID: uid, + SessionUID: sess.UID, + }) + if err != nil { + return false + } + + return snapshot.Status == status + } + } + t.Run("test case: gms snapshot initialized", func(t *testing.T) { gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{ State: cloudmigration.SnapshotStateInitialized, @@ -140,9 +160,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusPendingProcessing, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) - + require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusPendingProcessing), time.Second, 10*time.Millisecond) + assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount()) t.Cleanup(cleanupFunc) }) @@ -150,14 +169,13 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{ State: cloudmigration.SnapshotStateProcessing, } - snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ + _, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ SnapshotUID: uid, SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusProcessing, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) - + require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusProcessing), time.Second, 10*time.Millisecond) + assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount()) t.Cleanup(cleanupFunc) }) @@ -170,8 +188,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) + require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusFinished), time.Second, 10*time.Millisecond) + assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount()) t.Cleanup(cleanupFunc) }) @@ -185,8 +203,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusCanceled, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) + require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusCanceled), time.Second, 10*time.Millisecond) + assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount()) t.Cleanup(cleanupFunc) }) @@ -200,8 +218,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusError, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) + require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusError), time.Second, 10*time.Millisecond) + assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount()) t.Cleanup(cleanupFunc) }) @@ -216,8 +234,14 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { }) assert.NoError(t, err) // snapshot status should remain unchanged + + require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == 1 }, time.Second, 10*time.Millisecond) + snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ + SnapshotUID: uid, + SessionUID: sess.UID, + }) + assert.NoError(t, err) assert.Equal(t, cloudmigration.SnapshotStatusPendingProcessing, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) t.Cleanup(cleanupFunc) }) @@ -242,8 +266,14 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status) - assert.Equal(t, 1, gmsClientMock.getStatusCalled) // shouldn't have queried GMS again now that it is finished + require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == 1 }, time.Second, 10*time.Millisecond) + + snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ + SnapshotUID: uid, + SessionUID: sess.UID, + }) + assert.NoError(t, err) + assert.Len(t, snapshot.Resources, 1) assert.Equal(t, "A", snapshot.Resources[0].RefID) assert.Equal(t, "fake", snapshot.Resources[0].Error) @@ -300,7 +330,7 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, 0, gmsClientMock.getStatusCalled) + assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 0 }, time.Second, 10*time.Millisecond) } // make sure GMS is called when snapshot is pending processing or processing @@ -319,8 +349,9 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) { SessionUID: sess.UID, }) assert.NoError(t, err) - assert.Equal(t, i+1, gmsClientMock.getStatusCalled) + require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == i+1 }, time.Second, 10*time.Millisecond) } + assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 2 }, time.Second, 10*time.Millisecond) } func Test_DeletedDashboardsNotMigrated(t *testing.T) { @@ -470,15 +501,25 @@ func Test_NonCoreDataSourcesHaveWarning(t *testing.T) { }, } - // Retrieve the snapshot with results - snapshot, err := s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{ - SnapshotUID: snapshotUid, - SessionUID: sess.UID, - ResultPage: 1, - ResultLimit: 10, - }) - assert.NoError(t, err) - assert.Len(t, snapshot.Resources, 4) + var snapshot *cloudmigration.CloudMigrationSnapshot + hasFourResources := func() bool { + // Retrieve the snapshot with results + var err error + snapshot, err = s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{ + SnapshotUID: snapshotUid, + SessionUID: sess.UID, + ResultPage: 1, + ResultLimit: 10, + }) + + if !assert.NoError(t, err) { + return false + } + + return len(snapshot.Resources) == 4 + } + + require.Eventually(t, hasFourResources, time.Second, 10*time.Millisecond) findRef := func(id string) *cloudmigration.CloudMigrationResource { for _, r := range snapshot.Resources { @@ -579,6 +620,7 @@ func TestReportEvent(t *testing.T) { require.Equal(t, 1, gmsMock.reportEventCalled) }) } + func TestGetFolderNamesForFolderUIDs(t *testing.T) { s := setUpServiceTest(t, false).(*Service) ctx, cancel := context.WithCancel(context.Background()) @@ -802,7 +844,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi ) require.NoError(t, err) - var validConfig = `{ + validConfig := `{ "alertmanager_config": { "route": { "receiver": "grafana-default-email" @@ -850,6 +892,8 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi } type gmsClientMock struct { + mu sync.RWMutex + validateKeyCalled int startSnapshotCalled int getStatusCalled int @@ -874,7 +918,10 @@ func (m *gmsClientMock) StartSnapshot(_ context.Context, _ cloudmigration.CloudM } func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.CloudMigrationSession, _ cloudmigration.CloudMigrationSnapshot, _ int) (*cloudmigration.GetSnapshotStatusResponse, error) { + m.mu.Lock() m.getStatusCalled++ + m.mu.Unlock() + return m.getSnapshotResponse, nil } @@ -886,3 +933,10 @@ func (m *gmsClientMock) CreatePresignedUploadUrl(ctx context.Context, session cl func (m *gmsClientMock) ReportEvent(context.Context, cloudmigration.CloudMigrationSession, gmsclient.EventRequestDTO) { m.reportEventCalled++ } + +func (m *gmsClientMock) GetSnapshotStatusCallCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.getStatusCalled +}