mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
CloudMigrations: Query GMS for a presigned upload url at upload time (#90505)
query GMS for an upload url at upload time
This commit is contained in:
parent
970cafa20f
commit
9b7e9d992b
@ -1942,6 +1942,8 @@ start_snapshot_timeout = 5s
|
|||||||
validate_key_timeout = 5s
|
validate_key_timeout = 5s
|
||||||
# How long to wait for a request sent to gms to get a snapshot status to complete
|
# How long to wait for a request sent to gms to get a snapshot status to complete
|
||||||
get_snapshot_status_timeout = 5s
|
get_snapshot_status_timeout = 5s
|
||||||
|
# How long to wait for a request sent to gms to create a presigned upload url
|
||||||
|
create_upload_url_timeout = 5s
|
||||||
# How long to wait for a request to fetch an instance to complete
|
# How long to wait for a request to fetch an instance to complete
|
||||||
fetch_instance_timeout = 5s
|
fetch_instance_timeout = 5s
|
||||||
# How long to wait for a request to create an access policy to complete
|
# How long to wait for a request to create an access policy to complete
|
||||||
|
@ -1875,6 +1875,8 @@ timeout = 30s
|
|||||||
;validate_key_timeout = 5s
|
;validate_key_timeout = 5s
|
||||||
# How long to wait for a request sent to gms to get a snapshot status to complete
|
# How long to wait for a request sent to gms to get a snapshot status to complete
|
||||||
;get_snapshot_status_timeout = 5s
|
;get_snapshot_status_timeout = 5s
|
||||||
|
# How long to wait for a request sent to gms to create a presigned upload url
|
||||||
|
;create_upload_url_timeout = 5s
|
||||||
# How long to wait for a request to fetch an instance to complete
|
# How long to wait for a request to fetch an instance to complete
|
||||||
;fetch_instance_timeout = 5s
|
;fetch_instance_timeout = 5s
|
||||||
# How long to wait for a request to create an access policy to complete
|
# How long to wait for a request to create an access policy to complete
|
||||||
|
@ -487,7 +487,6 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
|
|||||||
SessionUID: sessionUid,
|
SessionUID: sessionUid,
|
||||||
Status: cloudmigration.SnapshotStatusCreating,
|
Status: cloudmigration.SnapshotStatusCreating,
|
||||||
EncryptionKey: initResp.EncryptionKey,
|
EncryptionKey: initResp.EncryptionKey,
|
||||||
UploadURL: initResp.UploadURL,
|
|
||||||
GMSSnapshotUID: initResp.SnapshotID,
|
GMSSnapshotUID: initResp.SnapshotID,
|
||||||
LocalDir: filepath.Join(s.cfg.CloudMigration.SnapshotFolder, "grafana", "snapshots", initResp.SnapshotID),
|
LocalDir: filepath.Join(s.cfg.CloudMigration.SnapshotFolder, "grafana", "snapshots", initResp.SnapshotID),
|
||||||
}
|
}
|
||||||
@ -601,11 +600,16 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
|
|||||||
return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err)
|
return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", snapshot.UploadURL)
|
uploadUrl, err := s.gmsClient.CreatePresignedUploadUrl(ctx, *session, *snapshot)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating presigned upload url for snapshot %s: %w", snapshotUid, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", uploadUrl)
|
||||||
|
|
||||||
// start uploading the snapshot asynchronously while we return a success response to the client
|
// start uploading the snapshot asynchronously while we return a success response to the client
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.uploadSnapshot(context.Background(), session, snapshot); err != nil {
|
if err := s.uploadSnapshot(context.Background(), session, snapshot, uploadUrl); err != nil {
|
||||||
s.log.Error("uploading snapshot", "err", err)
|
s.log.Error("uploading snapshot", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -449,9 +449,10 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
|
|||||||
}
|
}
|
||||||
|
|
||||||
type gmsClientMock struct {
|
type gmsClientMock struct {
|
||||||
validateKeyCalled int
|
validateKeyCalled int
|
||||||
startSnapshotCalled int
|
startSnapshotCalled int
|
||||||
getStatusCalled int
|
getStatusCalled int
|
||||||
|
createUploadUrlCalled int
|
||||||
|
|
||||||
getSnapshotResponse *cloudmigration.GetSnapshotStatusResponse
|
getSnapshotResponse *cloudmigration.GetSnapshotStatusResponse
|
||||||
}
|
}
|
||||||
@ -474,3 +475,8 @@ func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.Cl
|
|||||||
m.getStatusCalled++
|
m.getStatusCalled++
|
||||||
return m.getSnapshotResponse, nil
|
return m.getSnapshotResponse, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *gmsClientMock) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
|
||||||
|
m.createUploadUrlCalled++
|
||||||
|
return "http://localhost:3000", nil
|
||||||
|
}
|
||||||
|
@ -249,7 +249,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
|
|||||||
}
|
}
|
||||||
|
|
||||||
// asynchronous process for and updating the snapshot status
|
// asynchronous process for and updating the snapshot status
|
||||||
func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot) (err error) {
|
func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) (err error) {
|
||||||
// TODO -- make sure we can only upload one snapshot at a time
|
// TODO -- make sure we can only upload one snapshot at a time
|
||||||
s.buildSnapshotMutex.Lock()
|
s.buildSnapshotMutex.Lock()
|
||||||
defer s.buildSnapshotMutex.Unlock()
|
defer s.buildSnapshotMutex.Unlock()
|
||||||
@ -288,7 +288,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
|
|||||||
for _, fileName := range fileNames {
|
for _, fileName := range fileNames {
|
||||||
filePath := filepath.Join(snapshotMeta.LocalDir, fileName)
|
filePath := filepath.Join(snapshotMeta.LocalDir, fileName)
|
||||||
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName)
|
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName)
|
||||||
if err := s.uploadUsingPresignedURL(ctx, snapshotMeta.UploadURL, key, filePath); err != nil {
|
if err := s.uploadUsingPresignedURL(ctx, uploadUrl, key, filePath); err != nil {
|
||||||
return fmt.Errorf("uploading snapshot file using presigned url: %w", err)
|
return fmt.Errorf("uploading snapshot file using presigned url: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -300,7 +300,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
|
|||||||
return fmt.Errorf("seeking to beginning of index file: %w", err)
|
return fmt.Errorf("seeking to beginning of index file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.objectStorage.PresignedURLUpload(ctx, snapshotMeta.UploadURL, key, indexFile); err != nil {
|
if err := s.objectStorage.PresignedURLUpload(ctx, uploadUrl, key, indexFile); err != nil {
|
||||||
return fmt.Errorf("uploading file using presigned url: %w", err)
|
return fmt.Errorf("uploading file using presigned url: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ type Client interface {
|
|||||||
MigrateData(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error)
|
MigrateData(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error)
|
||||||
StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error)
|
StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error)
|
||||||
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot, int) (*cloudmigration.GetSnapshotStatusResponse, error)
|
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot, int) (*cloudmigration.GetSnapshotStatusResponse, error)
|
||||||
|
CreatePresignedUploadUrl(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
const logPrefix = "cloudmigration.gmsclient"
|
const logPrefix = "cloudmigration.gmsclient"
|
||||||
|
@ -44,3 +44,7 @@ type MigrateDataResponseItemDTO struct {
|
|||||||
Status ItemStatus `json:"status"`
|
Status ItemStatus `json:"status"`
|
||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CreateSnapshotUploadUrlResponseDTO struct {
|
||||||
|
UploadUrl string `json:"uploadUrl"`
|
||||||
|
}
|
||||||
|
@ -206,6 +206,47 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr
|
|||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
|
||||||
|
logger := c.log.FromContext(ctx)
|
||||||
|
|
||||||
|
path := fmt.Sprintf("%s/api/v1/status/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID)
|
||||||
|
|
||||||
|
// Send the request to gms with the associated auth token
|
||||||
|
req, err := http.NewRequest(http.MethodPost, path, nil)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Error("error creating http request to create upload url", "err", err.Error())
|
||||||
|
return "", fmt.Errorf("http request error: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Timeout: c.cfg.CloudMigration.GMSCreateUploadUrlTimeout,
|
||||||
|
}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Error("error sending http request to create an upload url", "err", err.Error())
|
||||||
|
return "", fmt.Errorf("http request error: %w", err)
|
||||||
|
} else if resp.StatusCode >= 400 {
|
||||||
|
c.log.Error("received error response to create an upload url", "statusCode", resp.StatusCode)
|
||||||
|
return "", fmt.Errorf("http request error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err := resp.Body.Close(); err != nil {
|
||||||
|
logger.Error("closing request body: %w", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var result CreateSnapshotUploadUrlResponseDTO
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
|
logger.Error("unmarshalling response body: %w", err)
|
||||||
|
return "", fmt.Errorf("unmarshalling create upload url response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.UploadUrl, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *gmsClientImpl) buildBasePath(clusterSlug string) string {
|
func (c *gmsClientImpl) buildBasePath(clusterSlug string) string {
|
||||||
domain := c.cfg.CloudMigration.GMSDomain
|
domain := c.cfg.CloudMigration.GMSDomain
|
||||||
if strings.HasPrefix(domain, "http://localhost") {
|
if strings.HasPrefix(domain, "http://localhost") {
|
||||||
|
@ -58,7 +58,6 @@ func (c *memoryClientImpl) StartSnapshot(context.Context, cloudmigration.CloudMi
|
|||||||
}
|
}
|
||||||
c.snapshot = &cloudmigration.StartSnapshotResponse{
|
c.snapshot = &cloudmigration.StartSnapshotResponse{
|
||||||
EncryptionKey: fmt.Sprintf("%x", publicKey[:]),
|
EncryptionKey: fmt.Sprintf("%x", publicKey[:]),
|
||||||
UploadURL: "localhost:3000",
|
|
||||||
SnapshotID: uuid.NewString(),
|
SnapshotID: uuid.NewString(),
|
||||||
MaxItemsPerPartition: 10,
|
MaxItemsPerPartition: 10,
|
||||||
Algo: "nacl",
|
Algo: "nacl",
|
||||||
@ -92,3 +91,7 @@ func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudm
|
|||||||
|
|
||||||
return gmsResp, nil
|
return gmsResp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *memoryClientImpl) CreatePresignedUploadUrl(ctx context.Context, sess cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
|
||||||
|
return "http://localhost:3000", nil
|
||||||
|
}
|
||||||
|
@ -38,7 +38,6 @@ type CloudMigrationSnapshot struct {
|
|||||||
SessionUID string `xorm:"session_uid"`
|
SessionUID string `xorm:"session_uid"`
|
||||||
Status SnapshotStatus
|
Status SnapshotStatus
|
||||||
EncryptionKey string `xorm:"encryption_key"` // stored in the unified secrets table
|
EncryptionKey string `xorm:"encryption_key"` // stored in the unified secrets table
|
||||||
UploadURL string `xorm:"upload_url"`
|
|
||||||
LocalDir string `xorm:"local_directory"`
|
LocalDir string `xorm:"local_directory"`
|
||||||
GMSSnapshotUID string `xorm:"gms_snapshot_uid"`
|
GMSSnapshotUID string `xorm:"gms_snapshot_uid"`
|
||||||
ErrorString string `xorm:"error_string"`
|
ErrorString string `xorm:"error_string"`
|
||||||
@ -208,7 +207,6 @@ type StartSnapshotResponse struct {
|
|||||||
SnapshotID string `json:"snapshotID"`
|
SnapshotID string `json:"snapshotID"`
|
||||||
MaxItemsPerPartition uint32 `json:"maxItemsPerPartition"`
|
MaxItemsPerPartition uint32 `json:"maxItemsPerPartition"`
|
||||||
Algo string `json:"algo"`
|
Algo string `json:"algo"`
|
||||||
UploadURL string `json:"uploadURL"`
|
|
||||||
EncryptionKey string `json:"encryptionKey"`
|
EncryptionKey string `json:"encryptionKey"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@ type CloudMigrationSettings struct {
|
|||||||
GMSDomain string
|
GMSDomain string
|
||||||
GMSStartSnapshotTimeout time.Duration
|
GMSStartSnapshotTimeout time.Duration
|
||||||
GMSGetSnapshotStatusTimeout time.Duration
|
GMSGetSnapshotStatusTimeout time.Duration
|
||||||
|
GMSCreateUploadUrlTimeout time.Duration
|
||||||
GMSValidateKeyTimeout time.Duration
|
GMSValidateKeyTimeout time.Duration
|
||||||
FetchInstanceTimeout time.Duration
|
FetchInstanceTimeout time.Duration
|
||||||
CreateAccessPolicyTimeout time.Duration
|
CreateAccessPolicyTimeout time.Duration
|
||||||
@ -34,6 +35,7 @@ func (cfg *Cfg) readCloudMigrationSettings() {
|
|||||||
cfg.CloudMigration.GMSValidateKeyTimeout = cloudMigration.Key("validate_key_timeout").MustDuration(5 * time.Second)
|
cfg.CloudMigration.GMSValidateKeyTimeout = cloudMigration.Key("validate_key_timeout").MustDuration(5 * time.Second)
|
||||||
cfg.CloudMigration.GMSStartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second)
|
cfg.CloudMigration.GMSStartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second)
|
||||||
cfg.CloudMigration.GMSGetSnapshotStatusTimeout = cloudMigration.Key("get_snapshot_status_timeout").MustDuration(5 * time.Second)
|
cfg.CloudMigration.GMSGetSnapshotStatusTimeout = cloudMigration.Key("get_snapshot_status_timeout").MustDuration(5 * time.Second)
|
||||||
|
cfg.CloudMigration.GMSCreateUploadUrlTimeout = cloudMigration.Key("create_upload_url_timeout").MustDuration(5 * time.Second)
|
||||||
cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second)
|
cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second)
|
||||||
cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second)
|
cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second)
|
||||||
cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second)
|
cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second)
|
||||||
|
Loading…
Reference in New Issue
Block a user