diff --git a/go.mod b/go.mod index 8c84abd03df..cbf34482d7d 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // @grafana/grafana-operator-experience-squad github.com/grafana/grafana-aws-sdk v0.28.0 // @grafana/aws-datasources github.com/grafana/grafana-azure-sdk-go/v2 v2.0.4 // @grafana/partner-datasources - github.com/grafana/grafana-cloud-migration-snapshot v1.0.0 // @grafana/grafana-operator-experience-squad + github.com/grafana/grafana-cloud-migration-snapshot v1.1.0 // @grafana/grafana-operator-experience-squad github.com/grafana/grafana-google-sdk-go v0.1.0 // @grafana/partner-datasources github.com/grafana/grafana-openapi-client-go v0.0.0-20231213163343-bd475d63fb79 // @grafana/grafana-backend-group github.com/grafana/grafana-plugin-sdk-go v0.238.0 // @grafana/plugins-platform-backend diff --git a/go.sum b/go.sum index b275b9bddb2..9c9d6ceec0e 100644 --- a/go.sum +++ b/go.sum @@ -2331,8 +2331,8 @@ github.com/grafana/grafana-aws-sdk v0.28.0 h1:ShdA+msLPGJGWWS1SFUYnF+ch1G3gUOlAd github.com/grafana/grafana-aws-sdk v0.28.0/go.mod h1:ZSVPU7IIJSi5lEg+K3Js+EUpZLXxUaBdaQWH+As1ihI= github.com/grafana/grafana-azure-sdk-go/v2 v2.0.4 h1:z6amQ286IJSBctHf6c+ibJq/v0+TvmEjVkrdMNBd4uY= github.com/grafana/grafana-azure-sdk-go/v2 v2.0.4/go.mod h1:aKlFPE36IDa8qccRg3KbgZX3MQ5xymS3RelT4j6kkVU= -github.com/grafana/grafana-cloud-migration-snapshot v1.0.0 h1:vOepRtpYS5ssG/PXLTpc/7OcL4lJiGruiU3Cw0c0DE4= -github.com/grafana/grafana-cloud-migration-snapshot v1.0.0/go.mod h1:rWNhyxYkgiXgV7xZ4yOQzMV08yikO8L8S8M5KNoQNpA= +github.com/grafana/grafana-cloud-migration-snapshot v1.1.0 h1:96Osqvdm1XXKs7ufmyFy31AW5ZWcikvcDrPX8p5LEpo= +github.com/grafana/grafana-cloud-migration-snapshot v1.1.0/go.mod h1:rWNhyxYkgiXgV7xZ4yOQzMV08yikO8L8S8M5KNoQNpA= github.com/grafana/grafana-google-sdk-go v0.1.0 h1:LKGY8z2DSxKjYfr2flZsWgTRTZ6HGQbTqewE3JvRaNA= github.com/grafana/grafana-google-sdk-go v0.1.0/go.mod h1:Vo2TKWfDVmNTELBUM+3lkrZvFtBws0qSZdXhQxRdJrE= github.com/grafana/grafana-openapi-client-go v0.0.0-20231213163343-bd475d63fb79 h1:r+mU5bGMzcXCRVAuOrTn54S80qbfVkvTdUJZfSfTNbs= diff --git a/pkg/services/cloudmigration/api/curl_commands.txt b/pkg/services/cloudmigration/api/curl_commands.txt index 925df467c31..fa7b8b10aa5 100644 --- a/pkg/services/cloudmigration/api/curl_commands.txt +++ b/pkg/services/cloudmigration/api/curl_commands.txt @@ -1,10 +1,10 @@ [sample token] // NOT A REAL TOKEN -eyJUb2tlbiI6ImNvbXBsZXRlbHlfZmFrZV90b2tlbl9jZG9peTFhYzdwdXlwZCIsIkluc3RhbmNlIjp7IlN0YWNrSUQiOjEyMzQ1LCJTbHVnIjoic3R1Ymluc3RhbmNlIiwiUmVnaW9uU2x1ZyI6ImZha2UtcmVnaW9uIiwiQ2x1c3RlclNsdWciOiJmYWtlLWNsdXNlciJ9fQ== +eyJUb2tlbiI6ImNvbXBsZXRlbHlfZmFrZV90b2tlbl9jZG9peTFhYzdwdXlwZCIsIkluc3RhbmNlIjp7IlN0YWNrSUQiOiAzODYzLCJTbHVnIjoic3R1Ymluc3RhbmNlIiwiUmVnaW9uU2x1ZyI6ImZha2UtcmVnaW9uIiwiQ2x1c3RlclNsdWciOiJmYWtlLWNsdXNlciJ9fQo= [create session] curl -X POST -H "Content-Type: application/json" \ http://admin:admin@localhost:3000/api/cloudmigration/migration \ - -d '{"AuthToken":"eyJUb2tlbiI6ImNvbXBsZXRlbHlfZmFrZV90b2tlbl9jZG9peTFhYzdwdXlwZCIsIkluc3RhbmNlIjp7IlN0YWNrSUQiOjEyMzQ1LCJTbHVnIjoic3R1Ymluc3RhbmNlIiwiUmVnaW9uU2x1ZyI6ImZha2UtcmVnaW9uIiwiQ2x1c3RlclNsdWciOiJmYWtlLWNsdXNlciJ9fQ=="}' + -d '{"AuthToken":"eyJUb2tlbiI6ImNvbXBsZXRlbHlfZmFrZV90b2tlbl9jZG9peTFhYzdwdXlwZCIsIkluc3RhbmNlIjp7IlN0YWNrSUQiOiAzODYzLCJTbHVnIjoic3R1Ymluc3RhbmNlIiwiUmVnaW9uU2x1ZyI6ImZha2UtcmVnaW9uIiwiQ2x1c3RlclNsdWciOiJmYWtlLWNsdXNlciJ9fQo="}' [create snapshot] curl -X POST -H "Content-Type: application/json" \ diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 7a61dafaaf7..6a9f50420e5 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/grafana/pkg/services/cloudmigration" "github.com/grafana/grafana/pkg/services/cloudmigration/api" "github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient" + "github.com/grafana/grafana/pkg/services/cloudmigration/objectstorage" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/featuremgmt" @@ -42,8 +43,9 @@ type Service struct { buildSnapshotMutex sync.Mutex - features featuremgmt.FeatureToggles - gmsClient gmsclient.Client + features featuremgmt.FeatureToggles + gmsClient gmsclient.Client + objectStorage objectstorage.ObjectStorage dsService datasources.DataSourceService gcomService gcom.Service @@ -99,6 +101,8 @@ func ProvideService( } s.api = api.RegisterApi(routeRegister, s, tracer) + s.objectStorage = objectstorage.NewS3() + if !cfg.CloudMigration.IsDeveloperMode { // get GMS path from the config domain, err := s.parseCloudMigrationConfig() @@ -558,9 +562,20 @@ func (s *Service) GetSnapshotList(ctx context.Context, query cloudmigration.List } func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error { - ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot") + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot", + trace.WithAttributes( + attribute.String("sessionUid", sessionUid), + attribute.String("snapshotUid", snapshotUid), + ), + ) defer span.End() + // fetch session for the gms auth token + session, err := s.store.GetMigrationSessionByUID(ctx, sessionUid) + if err != nil { + return fmt.Errorf("fetching migration session for uid %s: %w", sessionUid, err) + } + snapshot, err := s.GetSnapshot(ctx, cloudmigration.GetSnapshotsQuery{ SnapshotUID: snapshotUid, SessionUID: sessionUid, @@ -569,11 +584,14 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err) } - s.log.Info("Uploading snapshot with GMS ID %s in local directory %s to url %s", snapshot.GMSSnapshotUID, snapshot.LocalDir, snapshot.UploadURL) - s.log.Debug("UploadSnapshot not yet implemented, faking it") + s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", snapshot.UploadURL) // start uploading the snapshot asynchronously while we return a success response to the client - go s.uploadSnapshot(context.Background(), *snapshot) + go func() { + if err := s.uploadSnapshot(context.Background(), session, snapshot); err != nil { + s.log.Error("uploading snapshot", "err", err) + } + }() return nil } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index fad2fba3d2e..75c9f51655b 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -4,6 +4,8 @@ import ( "context" cryptoRand "crypto/rand" "fmt" + "os" + "path/filepath" "time" snapshot "github.com/grafana/grafana-cloud-migration-snapshot/src" @@ -213,8 +215,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn // Add the grafana generated public key to the index file so gms can use it to decrypt the snapshot files later. // This works because the snapshot files are being encrypted with // the grafana generated private key + the gms public key. - _, err = snapshotWriter.Finish(publicKey[:]) - if err != nil { + if _, err := snapshotWriter.Finish(publicKey[:]); err != nil { return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err) } @@ -234,7 +235,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn } // asynchronous process for and updating the snapshot status -func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigration.CloudMigrationSnapshot) { +func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot) (err error) { // TODO -- make sure we can only upload one snapshot at a time s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() @@ -247,34 +248,74 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio }) return retryer.FuncComplete, err }, 10, time.Millisecond*100, time.Second*10); err != nil { - s.log.Error("failed to set snapshot status to 'creating'", "err", err) - return + return fmt.Errorf("failed to set snapshot status to 'creating': %w", err) } - // upload snapshot - // just sleep for now to simulate snapshot creation happening - s.log.Debug("snapshot meta", "snapshot", snapshotMeta) - time.Sleep(3 * time.Second) + indexFilePath := filepath.Join(snapshotMeta.LocalDir, "index.json") + // LocalDir can be set in the configuration, therefore the file path can be set to any path. + // nolint:gosec + indexFile, err := os.Open(indexFilePath) + if err != nil { + return fmt.Errorf("opening index files: %w", err) + } + defer func() { + if closeErr := indexFile.Close(); closeErr != nil { + s.log.Error("closing index file", "err", closeErr.Error()) + } + }() - // update snapshot status to pending processing with retry - if err := retryer.Retry(func() (retryer.RetrySignal, error) { - err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusPendingProcessing, - }) - return retryer.FuncComplete, err - }, 10, time.Millisecond*100, time.Second*10); err != nil { - s.log.Error("failed to set snapshot status to 'pending upload'", "err", err) + index, err := snapshot.ReadIndex(indexFile) + if err != nil { + return fmt.Errorf("reading index from file: %w", err) + } + + // Upload the data files. + for _, fileNames := range index.Items { + for _, fileName := range fileNames { + filePath := filepath.Join(snapshotMeta.LocalDir, fileName) + key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName) + if err := s.uploadUsingPresignedURL(ctx, snapshotMeta.UploadURL, key, filePath); err != nil { + return fmt.Errorf("uploading snapshot file using presigned url: %w", err) + } + } + } + + // Upload the index file. Must be done after uploading the data files. + key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json") + if _, err := indexFile.Seek(0, 0); err != nil { + return fmt.Errorf("seeking to beginning of index file: %w", err) + } + + if err := s.objectStorage.PresignedURLUpload(ctx, snapshotMeta.UploadURL, key, indexFile); err != nil { + return fmt.Errorf("uploading file using presigned url: %w", err) } - // simulate the rest - // processing - time.Sleep(3 * time.Second) if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ UID: snapshotMeta.UID, Status: cloudmigration.SnapshotStatusProcessing, }); err != nil { - s.log.Error("updating snapshot", "err", err) + return fmt.Errorf("updating snapshot: %w", err) } - // end here as the GetSnapshot handler will fill in the rest when called + + return nil +} + +func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key string, filePath string) (err error) { + // The directory that contains the file can set in the configuration, therefore the directory can be any directory. + // nolint:gosec + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("opening snapshot file: path=%s %w", filePath, err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + s.log.Error("closing file", "path", filePath, "err", closeErr) + } + }() + + if err = s.objectStorage.PresignedURLUpload(ctx, uploadURL, key, file); err != nil { + return fmt.Errorf("uploading file using presigned url: %w", err) + } + + return nil } diff --git a/pkg/services/cloudmigration/gmsclient/gms_client.go b/pkg/services/cloudmigration/gmsclient/gms_client.go index 4d4e6cc41b2..602a1dd52ce 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client.go @@ -4,8 +4,11 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" + "io" "net/http" + "strings" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/cloudmigration" @@ -24,11 +27,11 @@ type gmsClientImpl struct { log *log.ConcreteLogger } -func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.CloudMigrationSession) error { +func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.CloudMigrationSession) (err error) { logger := c.log.FromContext(ctx) // TODO update service url to gms - path := fmt.Sprintf("https://cms-%s.%s/cloud-migrations/api/v1/validate-key", cm.ClusterSlug, c.domain) + path := fmt.Sprintf("%s/api/v1/validate-key", buildBasePath(c.domain, cm.ClusterSlug)) // validation is an empty POST to GMS with the authorization header included req, err := http.NewRequest("POST", path, bytes.NewReader(nil)) @@ -45,30 +48,25 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud logger.Error("error sending http request for token validation", "err", err.Error()) return fmt.Errorf("http request error: %w", err) } - defer func() { - if err := resp.Body.Close(); err != nil { - logger.Error("closing request body", "err", err.Error()) + if closeErr := resp.Body.Close(); closeErr != nil { + err = errors.Join(err, fmt.Errorf("closing response body: %w", closeErr)) } }() if resp.StatusCode != 200 { - var errResp map[string]any - if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { - logger.Error("decoding error response", "err", err.Error()) - } else { - return fmt.Errorf("token validation failure: %v", errResp) - } + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("token validation failure: %v", body) } return nil } -func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.CloudMigrationSession, request cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error) { +func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.CloudMigrationSession, request cloudmigration.MigrateDataRequest) (result *cloudmigration.MigrateDataResponse, err error) { logger := c.log.FromContext(ctx) // TODO update service url to gms - path := fmt.Sprintf("https://cms-%s.%s/cloud-migrations/api/v1/migrate-data", cm.ClusterSlug, c.domain) + path := fmt.Sprintf("%s/api/v1/migrate-data", buildBasePath(c.domain, cm.ClusterSlug)) reqDTO := convertRequestToDTO(request) body, err := json.Marshal(reqDTO) @@ -90,31 +88,30 @@ func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.Cloud if err != nil { c.log.Error("error sending http request for cloud migration run", "err", err.Error()) return nil, fmt.Errorf("http request error: %w", err) - } else if resp.StatusCode >= 400 { + } + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + err = errors.Join(err, fmt.Errorf("closing response body: %w", closeErr)) + } + }() + + if resp.StatusCode >= 400 { c.log.Error("received error response for cloud migration run", "statusCode", resp.StatusCode) return nil, fmt.Errorf("http request error: %w", err) } - defer func() { - if err := resp.Body.Close(); err != nil { - logger.Error("closing request body: %w", err) - } - }() - var respDTO MigrateDataResponseDTO if err := json.NewDecoder(resp.Body).Decode(&respDTO); err != nil { logger.Error("unmarshalling response body: %w", err) return nil, fmt.Errorf("unmarshalling migration run response: %w", err) } - result := convertResponseFromDTO(respDTO) - return &result, nil + res := convertResponseFromDTO(respDTO) + return &res, nil } -func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) { - logger := c.log.FromContext(ctx) - - path := fmt.Sprintf("https://cms-%s.%s/cloud-migrations/api/v1/start-snapshot", session.ClusterSlug, c.domain) +func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (out *cloudmigration.StartSnapshotResponse, err error) { + path := fmt.Sprintf("%s/api/v1/start-snapshot", buildBasePath(c.domain, session.ClusterSlug)) // Send the request to cms with the associated auth token req, err := http.NewRequest(http.MethodPost, path, nil) @@ -130,20 +127,21 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio if err != nil { c.log.Error("error sending http request to start snapshot", "err", err.Error()) return nil, fmt.Errorf("http request error: %w", err) - } else if resp.StatusCode >= 400 { - c.log.Error("received error response to start snapshot", "statusCode", resp.StatusCode) - return nil, fmt.Errorf("http request error: %w", err) } - defer func() { - if err := resp.Body.Close(); err != nil { - logger.Error("closing request body: %w", err) + if closeErr := resp.Body.Close(); closeErr != nil { + err = errors.Join(err, fmt.Errorf("closing response body: %w", closeErr)) } }() + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(resp.Body) + c.log.Error("received error response to start snapshot", "statusCode", resp.StatusCode) + return nil, fmt.Errorf("http request error: body=%s %w", string(body), err) + } + var result cloudmigration.StartSnapshotResponse if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - logger.Error("unmarshalling response body: %w", err) return nil, fmt.Errorf("unmarshalling start snapshot response: %w", err) } @@ -187,3 +185,10 @@ func convertResponseFromDTO(result MigrateDataResponseDTO) cloudmigration.Migrat Items: items, } } + +func buildBasePath(domain, clusterSlug string) string { + if strings.HasPrefix(domain, "http://localhost") { + return domain + } + return fmt.Sprintf("https://cms-%s.%s/cloud-migrations", clusterSlug, domain) +} diff --git a/pkg/services/cloudmigration/gmsclient/gms_client_test.go b/pkg/services/cloudmigration/gmsclient/gms_client_test.go new file mode 100644 index 00000000000..49c34ccd105 --- /dev/null +++ b/pkg/services/cloudmigration/gmsclient/gms_client_test.go @@ -0,0 +1,36 @@ +package gmsclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_buildBasePath(t *testing.T) { + t.Parallel() + + tests := []struct { + description string + domain string + clusterSlug string + expected string + }{ + { + description: "domain starts with http://localhost, should return domain", + domain: "http://localhost:8080", + clusterSlug: "anything", + expected: "http://localhost:8080", + }, + { + description: "domain doesn't start with http://localhost, should build a string using the domain and clusterSlug", + domain: "gms-dev", + clusterSlug: "us-east-1", + expected: "https://cms-us-east-1.gms-dev/cloud-migrations", + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + assert.Equal(t, tt.expected, buildBasePath(tt.domain, tt.clusterSlug)) + }) + } +} diff --git a/pkg/services/cloudmigration/gmsclient/inmemory_client.go b/pkg/services/cloudmigration/gmsclient/inmemory_client.go index c7b9b9abb87..ced99c01bb8 100644 --- a/pkg/services/cloudmigration/gmsclient/inmemory_client.go +++ b/pkg/services/cloudmigration/gmsclient/inmemory_client.go @@ -2,11 +2,15 @@ package gmsclient import ( "context" + "fmt" "math/rand" "time" + cryptoRand "crypto/rand" + + "github.com/google/uuid" "github.com/grafana/grafana/pkg/services/cloudmigration" - "github.com/grafana/grafana/pkg/util" + "golang.org/x/crypto/nacl/box" ) // NewInMemoryClient returns an implementation of Client that returns canned responses @@ -49,10 +53,16 @@ func (c *memoryClientImpl) MigrateData( } func (c *memoryClientImpl) StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) { + publicKey, _, err := box.GenerateKey(cryptoRand.Reader) + if err != nil { + return nil, fmt.Errorf("nacl: generating public and private key: %w", err) + } c.snapshot = &cloudmigration.StartSnapshotResponse{ - EncryptionKey: util.GenerateShortUID(), - SnapshotID: util.GenerateShortUID(), - UploadURL: "localhost:3000", + EncryptionKey: fmt.Sprintf("%x", publicKey[:]), + UploadURL: "localhost:3000", + SnapshotID: uuid.NewString(), + MaxItemsPerPartition: 10, + Algo: "nacl", } return c.snapshot, nil diff --git a/pkg/services/cloudmigration/model.go b/pkg/services/cloudmigration/model.go index 3ab277ae1ef..76a787c5786 100644 --- a/pkg/services/cloudmigration/model.go +++ b/pkg/services/cloudmigration/model.go @@ -206,11 +206,9 @@ type CreateSessionResponse struct { } type StartSnapshotResponse struct { - SnapshotID string `json:"snapshotID"` - MaxItemsPerPartition uint32 `json:"maxItemsPerPartition"` - Algo string `json:"algo"` - UploadURL string `json:"uploadURL"` - PresignedURLFormData map[string]string `json:"presignedURLFormData"` - EncryptionKey string `json:"encryptionKey"` - Nonce string `json:"nonce"` + SnapshotID string `json:"snapshotID"` + MaxItemsPerPartition uint32 `json:"maxItemsPerPartition"` + Algo string `json:"algo"` + UploadURL string `json:"uploadURL"` + EncryptionKey string `json:"encryptionKey"` } diff --git a/pkg/services/cloudmigration/objectstorage/objectstorage.go b/pkg/services/cloudmigration/objectstorage/objectstorage.go new file mode 100644 index 00000000000..38c974c4afe --- /dev/null +++ b/pkg/services/cloudmigration/objectstorage/objectstorage.go @@ -0,0 +1,10 @@ +package objectstorage + +import ( + "context" + "io" +) + +type ObjectStorage interface { + PresignedURLUpload(ctx context.Context, url, key string, reader io.Reader) error +} diff --git a/pkg/services/cloudmigration/objectstorage/s3.go b/pkg/services/cloudmigration/objectstorage/s3.go new file mode 100644 index 00000000000..dc84553cde1 --- /dev/null +++ b/pkg/services/cloudmigration/objectstorage/s3.go @@ -0,0 +1,93 @@ +package objectstorage + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/url" +) + +type S3 struct{} + +func NewS3() *S3 { + return &S3{} +} + +func (s3 *S3) PresignedURLUpload(ctx context.Context, presignedURL, key string, reader io.Reader) (err error) { + url, err := url.Parse(presignedURL) + if err != nil { + return fmt.Errorf("parsing presigned url") + } + + buffer := bytes.NewBuffer([]byte{}) + writer := multipart.NewWriter(buffer) + defer func() { + if closeErr := writer.Close(); closeErr != nil { + err = errors.Join(err, fmt.Errorf("closing multipart writer: %w", closeErr)) + } + }() + + for k := range url.Query() { + formField, err := writer.CreateFormField(k) + if err != nil { + return fmt.Errorf("creating %s form field: %w", k, err) + } + + v := url.Query().Get(k) + if _, err := formField.Write([]byte(v)); err != nil { + return fmt.Errorf("writing value for form field: field=%s value=%s", k, v) + } + } + + formField, err := writer.CreateFormField("key") + if err != nil { + return fmt.Errorf(": %w", err) + } + _, err = formField.Write([]byte(key)) + if err != nil { + return fmt.Errorf("writing key form field value: %w", err) + } + + formField, err = writer.CreateFormFile("file", "file") + if err != nil { + return fmt.Errorf(": %w", err) + } + + _, err = io.Copy(formField, reader) + if err != nil { + return fmt.Errorf(": %w", err) + } + + if err := writer.Close(); err != nil { + return fmt.Errorf("closing multipart writer: %w", err) + } + + endpoint := fmt.Sprintf("%s://%s%s", url.Scheme, url.Host, url.Path) + + request, err := http.NewRequest(http.MethodPost, endpoint, buffer) + if err != nil { + return fmt.Errorf("creating http request: %w", err) + } + request.Header.Set("Content-Type", writer.FormDataContentType()) + httpClient := http.Client{} + response, err := httpClient.Do(request) + if err != nil { + return fmt.Errorf("sending http request: %w", err) + } + defer func() { + if closeErr := response.Body.Close(); closeErr != nil { + err = errors.Join(err, fmt.Errorf("closing response body: %w", closeErr)) + } + }() + + if response.StatusCode >= 400 { + body, _ := io.ReadAll(response.Body) + return fmt.Errorf("unexpected response: status=%d body=%s", response.StatusCode, body) + } + + return nil +}