CloudMigrations: Handle cancelation during snapshot building and upload (#90612)

Implement cancelation of in-progress snapshot building and upload.
This commit is contained in:
Michael Mandrus 2024-07-18 12:55:27 -04:00 committed by GitHub
parent 61d8910a5b
commit 17228e5794
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 55 additions and 11 deletions

View File

@ -43,6 +43,9 @@ type Service struct {
buildSnapshotMutex sync.Mutex
cancelMutex sync.Mutex
cancelFunc context.CancelFunc
features featuremgmt.FeatureToggles
gmsClient gmsclient.Client
objectStorage objectstorage.ObjectStorage
@ -499,10 +502,19 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
// start building the snapshot asynchronously while we return a success response to the client
go func() {
if err := s.buildSnapshot(context.Background(), signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil {
s.cancelMutex.Lock()
defer func() {
s.cancelFunc = nil
s.cancelMutex.Unlock()
}()
ctx, cancelFunc := context.WithCancel(context.Background())
s.cancelFunc = cancelFunc
if err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil {
s.log.Error("building snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateStatusWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
@ -616,10 +628,19 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
// start uploading the snapshot asynchronously while we return a success response to the client
go func() {
if err := s.uploadSnapshot(context.Background(), session, snapshot, uploadUrl); err != nil {
s.cancelMutex.Lock()
defer func() {
s.cancelFunc = nil
s.cancelMutex.Unlock()
}()
ctx, cancelFunc := context.WithCancel(context.Background())
s.cancelFunc = cancelFunc
if err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl); err != nil {
s.log.Error("uploading snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateStatusWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
@ -631,6 +652,29 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
return nil
}
func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error {
panic("not implemented")
// CancelSnapshot cancels the snapshot build or upload that is currently running
// in the background, then marks the snapshot identified by snapshotUid as canceled.
//
// It returns a "nothing to cancel" error when no cancel function is registered.
// A failure to persist the canceled status is logged but not returned.
// sessionUid is only used for logging.
func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (err error) {
	// The async goroutines hold cancelMutex for their whole run, so the cancel func
	// cannot be read under that lock without blocking until they finish. Take a single
	// snapshot of the field and nil-check it explicitly instead of calling through the
	// field and recovering from a nil-func panic: a function-wide recover would also
	// misreport any unrelated panic below as "nothing to cancel".
	// NOTE(review): this read is not synchronized with the goroutines that set and clear
	// s.cancelFunc; making it race-free needs a change to how the field is stored.
	cancel := s.cancelFunc
	if cancel == nil {
		return fmt.Errorf("nothing to cancel")
	}
	cancel()

	// Canceling will ensure that any goroutines holding the lock finish and release the lock
	s.cancelMutex.Lock()
	defer s.cancelMutex.Unlock()
	s.cancelFunc = nil

	if updateErr := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
		UID:    snapshotUid,
		Status: cloudmigration.SnapshotStatusCanceled,
	}); updateErr != nil {
		// Keep the original best-effort behavior (do not fail the request), but include
		// the underlying error so the failure can actually be diagnosed.
		s.log.Error("critical failure during snapshot cancelation - please report any error logs", "err", updateErr.Error())
	}

	s.log.Info("canceled snapshot", "sessionUid", sessionUid, "snapshotUid", snapshotUid)

	return nil
}

View File

@ -167,7 +167,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
defer s.buildSnapshotMutex.Unlock()
// Update status to snapshot creating with retries
if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusCreating,
}); err != nil {
@ -233,7 +233,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
}
// update snapshot status to pending upload with retries
if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusPendingUpload,
Resources: localSnapshotResource,
@ -251,7 +251,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
defer s.buildSnapshotMutex.Unlock()
// update snapshot status to uploading with retries
if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusUploading,
}); err != nil {
@ -300,7 +300,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
s.log.Info("successfully uploaded snapshot", "snapshotUid", snapshotMeta.UID, "cloud_snapshotUid", snapshotMeta.GMSSnapshotUID)
// update snapshot status to processing with retries
if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusProcessing,
}); err != nil {
@ -330,7 +330,7 @@ func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key st
return nil
}
func (s *Service) updateStatusWithRetries(ctx context.Context, cmd cloudmigration.UpdateSnapshotCmd) (err error) {
func (s *Service) updateSnapshotWithRetries(ctx context.Context, cmd cloudmigration.UpdateSnapshotCmd) (err error) {
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
err := s.store.UpdateSnapshot(ctx, cmd)
return retryer.FuncComplete, err