[CloudMigrations] Fetch cloud migration status asynchronously (#96084)

* Single go-routine without touching too many lines.
* Fix unit tests
* Use require for Eventually
* Use a 10 second interval to poll for new results.
This commit is contained in:
Roberto Jiménez Sánchez
2024-11-11 19:26:30 +01:00
committed by GitHub
parent 754351273b
commit 477326b9c3
2 changed files with 151 additions and 31 deletions

View File

@@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
@@ -53,6 +54,8 @@ type Service struct {
cancelMutex sync.Mutex
cancelFunc context.CancelFunc
isSyncSnapshotStatusFromGMSRunning int32
features featuremgmt.FeatureToggles
gmsClient gmsclient.Client
objectStorage objectstorage.ObjectStorage
@@ -546,8 +549,8 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
return nil, fmt.Errorf("fetching session for uid %s: %w", sessionUid, err)
}
// Ask GMS for snapshot status while the source of truth is in the cloud
if snapshot.ShouldQueryGMS() {
// FIXME: this function should be a method
syncStatus := func(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshot *cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) {
// Calculate offset based on how many results we currently have responses for
pending := snapshot.StatsRollup.CountsByStatus[cloudmigration.ItemStatusPending]
snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot, snapshot.StatsRollup.Total-pending)
@@ -595,11 +598,74 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
if err != nil {
return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
}
return snapshot, nil
}
// Create a context out of the span context to ensure the trace is propagated
asyncSyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
// Sync snapshot results from GMS if the one created after upload is not running (e.g. due to a restart)
// and anybody is interested in the status.
go s.syncSnapshotStatusFromGMSUntilDone(asyncSyncCtx, session, snapshot, syncStatus)
return snapshot, nil
}
// syncStatusFunc is the signature of the status-synchronization closure that GetSnapshot
// builds and hands to syncSnapshotStatusFromGMSUntilDone: given a session and a snapshot,
// it returns the updated snapshot or an error.
// FIXME: this definition should not exist once the closure defined in GetSnapshot is converted to a method
type syncStatusFunc func(context.Context, *cloudmigration.CloudMigrationSession, *cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error)
// syncSnapshotStatusFromGMSUntilDone repeatedly invokes syncStatus for the given snapshot
// until snapshot.ShouldQueryGMS() reports false or the polling context is cancelled.
//
// The call blocks for as long as polling lasts, so callers that must not wait should run
// it in its own goroutine. At most one invocation polls at a time: concurrent calls detect
// the running one through isSyncSnapshotStatusFromGMSRunning and return immediately.
//
// While polling, the function holds cancelMutex and publishes its cancel function in
// s.cancelFunc; both are released when the function returns.
//
// Errors returned by syncStatus are logged and do not stop the polling; the previously
// known snapshot is kept and the sync is retried on the next tick.
func (s *Service) syncSnapshotStatusFromGMSUntilDone(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshot *cloudmigration.CloudMigrationSnapshot, syncStatus syncStatusFunc) {
	// Interval between two consecutive status syncs.
	const pollInterval = 10 * time.Second

	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.syncSnapshotStatusFromGMSUntilDone")
	span.SetAttributes(
		attribute.String("sessionUID", session.UID),
		attribute.String("snapshotUID", snapshot.UID),
	)
	defer span.End()

	// Ensure only one in-flight sync running
	if !atomic.CompareAndSwapInt32(&s.isSyncSnapshotStatusFromGMSRunning, 0, 1) {
		s.log.Info("synchronize snapshot status already running", "sessionUID", session.UID, "snapshotUID", snapshot.UID)
		return
	}
	defer atomic.StoreInt32(&s.isSyncSnapshotStatusFromGMSRunning, 0)

	if !snapshot.ShouldQueryGMS() {
		return
	}

	s.cancelMutex.Lock()
	ctx, cancel := context.WithCancel(ctx)
	s.cancelFunc = cancel
	defer func() {
		// Always release the derived context, even when polling finishes on its own;
		// otherwise the cancel function would be dropped without ever being called.
		cancel()
		s.cancelFunc = nil
		s.cancelMutex.Unlock()
	}()

	// Sync once right away so the first result does not have to wait for a full tick.
	updatedSnapshot, err := syncStatus(ctx, session, snapshot)
	if err != nil {
		s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID)
	} else {
		snapshot = updatedSnapshot
	}

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	for snapshot.ShouldQueryGMS() {
		select {
		case <-ctx.Done():
			s.log.Info("cancelling snapshot status polling", "sessionUID", session.UID, "snapshotUID", snapshot.UID)
			return
		case <-tick.C:
			updatedSnapshot, err := syncStatus(ctx, session, snapshot)
			if err != nil {
				// Keep the last known snapshot and retry on the next tick.
				s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID)
				continue
			}
			snapshot = updatedSnapshot
		}
	}
}
var gmsStateToLocalStatus map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus = map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus{
cloudmigration.SnapshotStateInitialized: cloudmigration.SnapshotStatusPendingProcessing, // GMS has not yet received a notification for the data
cloudmigration.SnapshotStateProcessing: cloudmigration.SnapshotStatusProcessing, // GMS has received a notification and is migrating the data

View File

@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"slices"
"sync"
"testing"
"time"
@@ -111,7 +112,7 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)
assert.Equal(t, 0, gmsClientMock.getStatusCalled)
assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 0 }, time.Second, 10*time.Millisecond)
// Make the status pending processing and ensure GMS gets called
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
@@ -123,6 +124,11 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
cleanupFunc := func() {
gmsClientMock.getStatusCalled = 0
gmsClientMock.getSnapshotResponse = nil
if s.cancelFunc != nil {
s.cancelFunc()
}
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
SessionID: sess.UID,
@@ -131,6 +137,20 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
assert.NoError(t, err)
}
checkStatusSync := func(status cloudmigration.SnapshotStatus) func() bool {
return func() bool {
snapshot, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
if err != nil {
return false
}
return snapshot.Status == status
}
}
t.Run("test case: gms snapshot initialized", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateInitialized,
@@ -140,9 +160,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusPendingProcessing, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusPendingProcessing), time.Second, 10*time.Millisecond)
assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount())
t.Cleanup(cleanupFunc)
})
@@ -150,14 +169,13 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateProcessing,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
_, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusProcessing, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusProcessing), time.Second, 10*time.Millisecond)
assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount())
t.Cleanup(cleanupFunc)
})
@@ -170,8 +188,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusFinished), time.Second, 10*time.Millisecond)
assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount())
t.Cleanup(cleanupFunc)
})
@@ -185,8 +203,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusCanceled, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusCanceled), time.Second, 10*time.Millisecond)
assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount())
t.Cleanup(cleanupFunc)
})
@@ -200,8 +218,8 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusError, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
require.Eventually(t, checkStatusSync(cloudmigration.SnapshotStatusError), time.Second, 10*time.Millisecond)
assert.Equal(t, 1, gmsClientMock.GetSnapshotStatusCallCount())
t.Cleanup(cleanupFunc)
})
@@ -216,8 +234,14 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
})
assert.NoError(t, err)
// snapshot status should remain unchanged
require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == 1 }, time.Second, 10*time.Millisecond)
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusPendingProcessing, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
@@ -242,8 +266,14 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled) // shouldn't have queried GMS again now that it is finished
require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == 1 }, time.Second, 10*time.Millisecond)
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Len(t, snapshot.Resources, 1)
assert.Equal(t, "A", snapshot.Resources[0].RefID)
assert.Equal(t, "fake", snapshot.Resources[0].Error)
@@ -300,7 +330,7 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, 0, gmsClientMock.getStatusCalled)
assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 0 }, time.Second, 10*time.Millisecond)
}
// make sure GMS is called when snapshot is pending processing or processing
@@ -319,8 +349,9 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, i+1, gmsClientMock.getStatusCalled)
require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == i+1 }, time.Second, 10*time.Millisecond)
}
assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 2 }, time.Second, 10*time.Millisecond)
}
func Test_DeletedDashboardsNotMigrated(t *testing.T) {
@@ -470,15 +501,25 @@ func Test_NonCoreDataSourcesHaveWarning(t *testing.T) {
},
}
// Retrieve the snapshot with results
snapshot, err := s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: snapshotUid,
SessionUID: sess.UID,
ResultPage: 1,
ResultLimit: 10,
})
assert.NoError(t, err)
assert.Len(t, snapshot.Resources, 4)
var snapshot *cloudmigration.CloudMigrationSnapshot
hasFourResources := func() bool {
// Retrieve the snapshot with results
var err error
snapshot, err = s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: snapshotUid,
SessionUID: sess.UID,
ResultPage: 1,
ResultLimit: 10,
})
if !assert.NoError(t, err) {
return false
}
return len(snapshot.Resources) == 4
}
require.Eventually(t, hasFourResources, time.Second, 10*time.Millisecond)
findRef := func(id string) *cloudmigration.CloudMigrationResource {
for _, r := range snapshot.Resources {
@@ -579,6 +620,7 @@ func TestReportEvent(t *testing.T) {
require.Equal(t, 1, gmsMock.reportEventCalled)
})
}
func TestGetFolderNamesForFolderUIDs(t *testing.T) {
s := setUpServiceTest(t, false).(*Service)
ctx, cancel := context.WithCancel(context.Background())
@@ -802,7 +844,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
)
require.NoError(t, err)
var validConfig = `{
validConfig := `{
"alertmanager_config": {
"route": {
"receiver": "grafana-default-email"
@@ -850,6 +892,8 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
}
type gmsClientMock struct {
mu sync.RWMutex
validateKeyCalled int
startSnapshotCalled int
getStatusCalled int
@@ -874,7 +918,10 @@ func (m *gmsClientMock) StartSnapshot(_ context.Context, _ cloudmigration.CloudM
}
// GetSnapshotStatus counts the invocation while holding the mock's mutex and then
// returns the canned getSnapshotResponse with a nil error. All arguments are ignored.
func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.CloudMigrationSession, _ cloudmigration.CloudMigrationSnapshot, _ int) (*cloudmigration.GetSnapshotStatusResponse, error) {
	// The lock only covers the counter update, not the response read below.
	func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.getStatusCalled += 1
	}()

	response := m.getSnapshotResponse
	return response, nil
}
@@ -886,3 +933,10 @@ func (m *gmsClientMock) CreatePresignedUploadUrl(ctx context.Context, session cl
// ReportEvent ignores its arguments and only counts how many times it was invoked.
// The counter is updated without taking the mock's mutex.
func (m *gmsClientMock) ReportEvent(context.Context, cloudmigration.CloudMigrationSession, gmsclient.EventRequestDTO) {
	m.reportEventCalled += 1
}
// GetSnapshotStatusCallCount returns how many times GetSnapshotStatus has been called.
// The counter is read under the mock's read lock.
func (m *gmsClientMock) GetSnapshotStatusCallCount() int {
	m.mu.RLock()
	calls := m.getStatusCalled
	m.mu.RUnlock()
	return calls
}