From 9b7e9d992b3e49cf7feb5c45e19f9ba296b63318 Mon Sep 17 00:00:00 2001 From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com> Date: Wed, 17 Jul 2024 11:53:21 -0400 Subject: [PATCH] CloudMigrations: Query GMS for a presigned upload url at upload time (#90505) query GMS for an upload url at upload time --- conf/defaults.ini | 2 + conf/sample.ini | 2 + .../cloudmigrationimpl/cloudmigration.go | 10 +++-- .../cloudmigrationimpl/cloudmigration_test.go | 12 ++++-- .../cloudmigrationimpl/snapshot_mgmt.go | 6 +-- .../cloudmigration/gmsclient/client.go | 1 + pkg/services/cloudmigration/gmsclient/dtos.go | 4 ++ .../cloudmigration/gmsclient/gms_client.go | 41 +++++++++++++++++++ .../gmsclient/inmemory_client.go | 5 ++- pkg/services/cloudmigration/model.go | 2 - pkg/setting/setting_cloud_migration.go | 2 + 11 files changed, 75 insertions(+), 12 deletions(-) diff --git a/conf/defaults.ini b/conf/defaults.ini index bd38558716e..f66d8a10620 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1942,6 +1942,8 @@ start_snapshot_timeout = 5s validate_key_timeout = 5s # How long to wait for a request sent to gms to get a snapshot status to complete get_snapshot_status_timeout = 5s +# How long to wait for a request sent to gms to create a presigned upload url +create_upload_url_timeout = 5s # How long to wait for a request to fetch an instance to complete fetch_instance_timeout = 5s # How long to wait for a request to create an access policy to complete diff --git a/conf/sample.ini b/conf/sample.ini index 72db893cc20..0482e9388e0 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -1875,6 +1875,8 @@ timeout = 30s ;validate_key_timeout = 5s # How long to wait for a request sent to gms to get a snapshot status to complete ;get_snapshot_status_timeout = 5s +# How long to wait for a request sent to gms to create a presigned upload url +;create_upload_url_timeout = 5s # How long to wait for a request to fetch an instance to complete ;fetch_instance_timeout = 5s # How long to wait for a request to create an access policy to complete diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 933b49358a4..01218dd8d10 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -487,7 +487,6 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI SessionUID: sessionUid, Status: cloudmigration.SnapshotStatusCreating, EncryptionKey: initResp.EncryptionKey, - UploadURL: initResp.UploadURL, GMSSnapshotUID: initResp.SnapshotID, LocalDir: filepath.Join(s.cfg.CloudMigration.SnapshotFolder, "grafana", "snapshots", initResp.SnapshotID), } @@ -601,11 +600,16 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err) } - s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", snapshot.UploadURL) + uploadUrl, err := s.gmsClient.CreatePresignedUploadUrl(ctx, *session, *snapshot) + if err != nil { + return fmt.Errorf("creating presigned upload url for snapshot %s: %w", snapshotUid, err) + } + + s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", uploadUrl) // start uploading the snapshot asynchronously while we return a success response to the client go func() { - if err := s.uploadSnapshot(context.Background(), session, snapshot); err != nil { + if err := s.uploadSnapshot(context.Background(), session, snapshot, uploadUrl); err != nil { s.log.Error("uploading snapshot", "err", err) } }() diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index 9176c1b8a17..1afb3a5c5e3 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -449,9 +449,10 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi } type gmsClientMock struct { - validateKeyCalled int - startSnapshotCalled int - getStatusCalled int + validateKeyCalled int + startSnapshotCalled int + getStatusCalled int + createUploadUrlCalled int getSnapshotResponse *cloudmigration.GetSnapshotStatusResponse } @@ -474,3 +475,8 @@ func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.Cl m.getStatusCalled++ return m.getSnapshotResponse, nil } + +func (m *gmsClientMock) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { + m.createUploadUrlCalled++ + return "http://localhost:3000", nil +} diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index 389e99a9183..d922ceb8331 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -249,7 +249,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn } // asynchronous process for and updating the snapshot status -func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot) (err error) { +func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) (err error) { // TODO -- make sure we can only upload one snapshot at a time s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() @@ -288,7 +288,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl for _, fileName := range fileNames { filePath := filepath.Join(snapshotMeta.LocalDir, fileName) key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName) - if err := s.uploadUsingPresignedURL(ctx, snapshotMeta.UploadURL, key, filePath); err != nil { + if err := s.uploadUsingPresignedURL(ctx, uploadUrl, key, filePath); err != nil { return fmt.Errorf("uploading snapshot file using presigned url: %w", err) } } @@ -300,7 +300,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl return fmt.Errorf("seeking to beginning of index file: %w", err) } - if err := s.objectStorage.PresignedURLUpload(ctx, snapshotMeta.UploadURL, key, indexFile); err != nil { + if err := s.objectStorage.PresignedURLUpload(ctx, uploadUrl, key, indexFile); err != nil { return fmt.Errorf("uploading file using presigned url: %w", err) } diff --git a/pkg/services/cloudmigration/gmsclient/client.go b/pkg/services/cloudmigration/gmsclient/client.go index e6bc631c6d2..ed75a6493cb 100644 --- a/pkg/services/cloudmigration/gmsclient/client.go +++ b/pkg/services/cloudmigration/gmsclient/client.go @@ -11,6 +11,7 @@ type Client interface { MigrateData(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error) StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot, int) (*cloudmigration.GetSnapshotStatusResponse, error) + CreatePresignedUploadUrl(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (string, error) } const logPrefix = "cloudmigration.gmsclient" diff --git a/pkg/services/cloudmigration/gmsclient/dtos.go b/pkg/services/cloudmigration/gmsclient/dtos.go index 0b03dbf3f4b..31ab080a29d 100644 --- a/pkg/services/cloudmigration/gmsclient/dtos.go +++ b/pkg/services/cloudmigration/gmsclient/dtos.go @@ -44,3 +44,7 @@ type MigrateDataResponseItemDTO struct { Status ItemStatus `json:"status"` Error string `json:"error,omitempty"` } + +type CreateSnapshotUploadUrlResponseDTO struct { + UploadUrl string `json:"uploadUrl"` +} diff --git a/pkg/services/cloudmigration/gmsclient/gms_client.go b/pkg/services/cloudmigration/gmsclient/gms_client.go index b73254eed29..19f152d9966 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client.go @@ -206,6 +206,47 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr return &result, nil } +func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { + logger := c.log.FromContext(ctx) + + path := fmt.Sprintf("%s/api/v1/status/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID) + + // Send the request to gms with the associated auth token + req, err := http.NewRequest(http.MethodPost, path, nil) + if err != nil { + c.log.Error("error creating http request to create upload url", "err", err.Error()) + return "", fmt.Errorf("http request error: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) + + client := &http.Client{ + Timeout: c.cfg.CloudMigration.GMSCreateUploadUrlTimeout, + } + resp, err := client.Do(req) + if err != nil { + c.log.Error("error sending http request to create an upload url", "err", err.Error()) + return "", fmt.Errorf("http request error: %w", err) + } else if resp.StatusCode >= 400 { + c.log.Error("received error response to create an upload url", "statusCode", resp.StatusCode) + return "", fmt.Errorf("http request error: %w", err) + } + + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error("closing request body: %w", err) + } + }() + + var result CreateSnapshotUploadUrlResponseDTO + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + logger.Error("unmarshalling response body: %w", err) + return "", fmt.Errorf("unmarshalling create upload url response: %w", err) + } + + return result.UploadUrl, nil +} + func (c *gmsClientImpl) buildBasePath(clusterSlug string) string { domain := c.cfg.CloudMigration.GMSDomain if strings.HasPrefix(domain, "http://localhost") { diff --git a/pkg/services/cloudmigration/gmsclient/inmemory_client.go b/pkg/services/cloudmigration/gmsclient/inmemory_client.go index e84780a7f2f..8d1219f1eab 100644 --- a/pkg/services/cloudmigration/gmsclient/inmemory_client.go +++ b/pkg/services/cloudmigration/gmsclient/inmemory_client.go @@ -58,7 +58,6 @@ func (c *memoryClientImpl) StartSnapshot(context.Context, cloudmigration.CloudMi } c.snapshot = &cloudmigration.StartSnapshotResponse{ EncryptionKey: fmt.Sprintf("%x", publicKey[:]), - UploadURL: "localhost:3000", SnapshotID: uuid.NewString(), MaxItemsPerPartition: 10, Algo: "nacl", @@ -92,3 +91,7 @@ func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudm return gmsResp, nil } + +func (c *memoryClientImpl) CreatePresignedUploadUrl(ctx context.Context, sess cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { + return "http://localhost:3000", nil +} diff --git a/pkg/services/cloudmigration/model.go b/pkg/services/cloudmigration/model.go index d26b4877b85..9fe9afc449f 100644 --- a/pkg/services/cloudmigration/model.go +++ b/pkg/services/cloudmigration/model.go @@ -38,7 +38,6 @@ type CloudMigrationSnapshot struct { SessionUID string `xorm:"session_uid"` Status SnapshotStatus EncryptionKey string `xorm:"encryption_key"` // stored in the unified secrets table - UploadURL string `xorm:"upload_url"` LocalDir string `xorm:"local_directory"` GMSSnapshotUID string `xorm:"gms_snapshot_uid"` ErrorString string `xorm:"error_string"` @@ -208,7 +207,6 @@ type StartSnapshotResponse struct { SnapshotID string `json:"snapshotID"` MaxItemsPerPartition uint32 `json:"maxItemsPerPartition"` Algo string `json:"algo"` - UploadURL string `json:"uploadURL"` EncryptionKey string `json:"encryptionKey"` } diff --git a/pkg/setting/setting_cloud_migration.go b/pkg/setting/setting_cloud_migration.go index b5de66503d1..48f91696eeb 100644 --- a/pkg/setting/setting_cloud_migration.go +++ b/pkg/setting/setting_cloud_migration.go @@ -12,6 +12,7 @@ type CloudMigrationSettings struct { GMSDomain string GMSStartSnapshotTimeout time.Duration GMSGetSnapshotStatusTimeout time.Duration + GMSCreateUploadUrlTimeout time.Duration GMSValidateKeyTimeout time.Duration FetchInstanceTimeout time.Duration CreateAccessPolicyTimeout time.Duration @@ -34,6 +35,7 @@ func (cfg *Cfg) readCloudMigrationSettings() { cfg.CloudMigration.GMSValidateKeyTimeout = cloudMigration.Key("validate_key_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.GMSStartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.GMSGetSnapshotStatusTimeout = cloudMigration.Key("get_snapshot_status_timeout").MustDuration(5 * time.Second) + cfg.CloudMigration.GMSCreateUploadUrlTimeout = cloudMigration.Key("create_upload_url_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second)