CloudMigrations: Query GMS for snapshot status with a results offset (#90453)

* query GMS for status with an offset

* remove unused state
This commit is contained in:
Michael Mandrus 2024-07-16 09:04:21 -04:00 committed by GitHub
parent f14ba32ea6
commit 6ff21726b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 18 additions and 8 deletions

View File

@ -524,9 +524,11 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
return nil, fmt.Errorf("fetching session for uid %s: %w", sessionUid, err)
}
// Ask GMS for snapshot status while the source of truth is in the cloud
if snapshot.ShouldQueryGMS() {
// ask GMS for status if it's in the cloud
snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot)
// Calculate offset based on how many results we currently have responses for
pending := snapshot.StatsRollup.CountsByStatus[cloudmigration.ItemStatusPending]
snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot, snapshot.StatsRollup.Total-pending)
if err != nil {
return snapshot, fmt.Errorf("error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s", sessionUid, snapshotUid)
}

View File

@ -441,7 +441,7 @@ func (m *gmsClientMock) StartSnapshot(_ context.Context, _ cloudmigration.CloudM
return nil, nil
}
func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.CloudMigrationSession, _ cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error) {
func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.CloudMigrationSession, _ cloudmigration.CloudMigrationSnapshot, _ int) (*cloudmigration.GetSnapshotStatusResponse, error) {
m.getStatusCalled++
return m.getSnapshotResponse, nil
}

View File

@ -339,7 +339,13 @@ func (ss *sqlStore) GetSnapshotResourceStats(ctx context.Context, snapshotUid st
Count int `json:"count"`
Status string `json:"status"`
}, 0)
total := 0
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
if t, err := sess.Count(cloudmigration.CloudMigrationResource{SnapshotUID: snapshotUid}); err != nil {
return err
} else {
total = int(t)
}
sess.Select("count(uid) as 'count', resource_type as 'type'").
Table(tableName).
GroupBy("type").
@ -360,6 +366,7 @@ func (ss *sqlStore) GetSnapshotResourceStats(ctx context.Context, snapshotUid st
stats := &cloudmigration.SnapshotResourceStats{
CountsByType: make(map[cloudmigration.MigrateDataType]int, len(typeCounts)),
CountsByStatus: make(map[cloudmigration.ItemStatus]int, len(statusCounts)),
Total: total,
}
for _, c := range typeCounts {
stats.CountsByType[cloudmigration.MigrateDataType(c.Type)] = c.Count

View File

@ -257,6 +257,7 @@ func Test_SnapshotResources(t *testing.T) {
cloudmigration.ItemStatusOK: 3,
cloudmigration.ItemStatusPending: 1,
}, stats.CountsByStatus)
assert.Equal(t, 4, stats.Total)
// delete snapshot resources
err = s.DeleteSnapshotResources(ctx, "poiuy")

View File

@ -10,7 +10,7 @@ type Client interface {
ValidateKey(context.Context, cloudmigration.CloudMigrationSession) error
MigrateData(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error)
StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error)
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error)
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot, int) (*cloudmigration.GetSnapshotStatusResponse, error)
}
const logPrefix = "cloudmigration.gmsclient"

View File

@ -162,12 +162,12 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio
return &result, nil
}
func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error) {
func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot, offset int) (*cloudmigration.GetSnapshotStatusResponse, error) {
c.getStatusMux.Lock()
defer c.getStatusMux.Unlock()
logger := c.log.FromContext(ctx)
path := fmt.Sprintf("%s/api/v1/status/%s/status", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID)
path := fmt.Sprintf("%s/api/v1/status/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset)
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodGet, path, nil)

View File

@ -67,7 +67,7 @@ func (c *memoryClientImpl) StartSnapshot(context.Context, cloudmigration.CloudMi
return c.snapshot, nil
}
func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error) {
func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot, offset int) (*cloudmigration.GetSnapshotStatusResponse, error) {
gmsResp := &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateFinished,
Results: []cloudmigration.CloudMigrationResource{

View File

@ -91,12 +91,12 @@ const (
ItemStatusOK ItemStatus = "OK"
ItemStatusError ItemStatus = "ERROR"
ItemStatusPending ItemStatus = "PENDING"
ItemStatusUnknown ItemStatus = "UNKNOWN"
)
type SnapshotResourceStats struct {
CountsByType map[MigrateDataType]int
CountsByStatus map[ItemStatus]int
Total int
}
// Deprecated, use GetSnapshotResult for the async workflow