CloudMigrations: Add and fix logging (#90616)

* add logs so we can see how long async processes take

* fix a bunch of error logs:

* fix logs again
This commit is contained in:
Michael Mandrus 2024-07-18 14:16:56 -04:00 committed by GitHub
parent e3150b4eb8
commit 71a97b925a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 52 additions and 24 deletions

View File

@ -166,6 +166,11 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
start := time.Now()
defer func() {
s.log.Debug(fmt.Sprintf("buildSnapshot: method completed in %d ms", time.Since(start).Milliseconds()))
}()
// Update status to snapshot creating with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
@ -179,6 +184,8 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
return fmt.Errorf("nacl: generating public and private key: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: generated keys in %d ms", time.Since(start).Milliseconds()))
// Use GMS public key + the grafana generated private private key to encrypt snapshot files.
snapshotWriter, err := snapshot.NewSnapshotWriter(contracts.AssymetricKeys{
Public: []byte(snapshotMeta.EncryptionKey),
@ -191,11 +198,15 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
return fmt.Errorf("instantiating snapshot writer: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: created snapshot writing in %d ms", time.Since(start).Milliseconds()))
migrationData, err := s.getMigrationDataJSON(ctx, signedInUser)
if err != nil {
return fmt.Errorf("fetching migration data: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: got migration data json in %d ms", time.Since(start).Milliseconds()))
localSnapshotResource := make([]cloudmigration.CloudMigrationResource, len(migrationData.Items))
resourcesGroupedByType := make(map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, 0)
for i, item := range migrationData.Items {
@ -225,6 +236,8 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
}
}
s.log.Debug(fmt.Sprintf("buildSnapshot: wrote data files in %d ms", time.Since(start).Milliseconds()))
// Add the grafana generated public key to the index file so gms can use it to decrypt the snapshot files later.
// This works because the snapshot files are being encrypted with
// the grafana generated private key + the gms public key.
@ -232,6 +245,8 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: finished snapshot in %d ms", time.Since(start).Milliseconds()))
// update snapshot status to pending upload with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
@ -250,6 +265,11 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
start := time.Now()
defer func() {
s.log.Debug(fmt.Sprintf("uploadSnapshot: method completed in %d ms", time.Since(start).Milliseconds()))
}()
// update snapshot status to uploading with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
@ -276,6 +296,8 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
return fmt.Errorf("reading index from file: %w", err)
}
s.log.Debug(fmt.Sprintf("uploadSnapshot: read index file in %d ms", time.Since(start).Milliseconds()))
// Upload the data files.
for _, fileNames := range index.Items {
for _, fileName := range fileNames {
@ -284,9 +306,12 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
if err := s.uploadUsingPresignedURL(ctx, uploadUrl, key, filePath); err != nil {
return fmt.Errorf("uploading snapshot file using presigned url: %w", err)
}
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded %s in %d ms", fileName, time.Since(start).Milliseconds()))
}
}
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded all data files in %d ms", time.Since(start).Milliseconds()))
// Upload the index file. Must be done after uploading the data files.
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json")
if _, err := indexFile.Seek(0, 0); err != nil {
@ -297,6 +322,7 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
return fmt.Errorf("uploading file using presigned url: %w", err)
}
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded index file in %d ms", time.Since(start).Milliseconds()))
s.log.Info("successfully uploaded snapshot", "snapshotUid", snapshotMeta.UID, "cloud_snapshotUid", snapshotMeta.GMSSnapshotUID)
// update snapshot status to processing with retries

View File

@ -37,15 +37,13 @@ type gmsClientImpl struct {
}
func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.CloudMigrationSession) (err error) {
logger := c.log.FromContext(ctx)
// TODO: there is a lot of boilerplate code in these methods, we should consolidate them when we have a gardening period
path := fmt.Sprintf("%s/api/v1/validate-key", c.buildBasePath(cm.ClusterSlug))
// validation is an empty POST to GMS with the authorization header included
req, err := http.NewRequest("POST", path, bytes.NewReader(nil))
if err != nil {
logger.Error("error creating http request for token validation", "err", err.Error())
c.log.Error("error creating http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
}
req.Header.Set("Content-Type", "application/json")
@ -56,7 +54,7 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
}
resp, err := client.Do(req)
if err != nil {
logger.Error("error sending http request for token validation", "err", err.Error())
c.log.Error("error sending http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
}
defer func() {
@ -75,9 +73,6 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
// Deprecated
func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.CloudMigrationSession, request cloudmigration.MigrateDataRequest) (result *cloudmigration.MigrateDataResponse, err error) {
logger := c.log.FromContext(ctx)
// TODO update service url to gms
path := fmt.Sprintf("%s/api/v1/migrate-data", c.buildBasePath(cm.ClusterSlug))
reqDTO := convertRequestToDTO(request)
@ -114,7 +109,7 @@ func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.Cloud
var respDTO MigrateDataResponseDTO
if err := json.NewDecoder(resp.Body).Decode(&respDTO); err != nil {
logger.Error("unmarshalling response body: %w", err)
c.log.Error("unmarshalling response body", "err", err.Error())
return nil, fmt.Errorf("unmarshalling migration run response: %w", err)
}
@ -141,19 +136,21 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio
if err != nil {
c.log.Error("error sending http request to start snapshot", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
} else if resp.StatusCode >= 400 {
c.log.Error("received error response for start snapshot", "statusCode", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}
return nil, fmt.Errorf("http request error: body=%s", string(body))
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
err = errors.Join(err, fmt.Errorf("closing response body: %w", closeErr))
}
}()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
c.log.Error("received error response to start snapshot", "statusCode", resp.StatusCode)
return nil, fmt.Errorf("http request error: body=%s %w", string(body), err)
}
var result cloudmigration.StartSnapshotResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("unmarshalling start snapshot response: %w", err)
@ -165,7 +162,6 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio
func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot, offset int) (*cloudmigration.GetSnapshotStatusResponse, error) {
c.getStatusMux.Lock()
defer c.getStatusMux.Unlock()
logger := c.log.FromContext(ctx)
path := fmt.Sprintf("%s/api/v1/snapshots/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset)
@ -187,19 +183,23 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr
c.log.Error("error sending http request to get snapshot status", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
} else if resp.StatusCode >= 400 {
c.log.Error("received error response to get snapshot status", "statusCode", resp.StatusCode)
return nil, fmt.Errorf("http request error: %w", err)
c.log.Error("received error response for get snapshot status", "statusCode", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}
return nil, fmt.Errorf("http request error: body=%s", string(body))
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("closing request body: %w", err)
c.log.Error("closing request body", "err", err.Error())
}
}()
var result cloudmigration.GetSnapshotStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.Error("unmarshalling response body: %w", err)
c.log.Error("unmarshalling response body", "err", err.Error())
return nil, fmt.Errorf("unmarshalling get snapshot status response: %w", err)
}
@ -207,8 +207,6 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr
}
func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
logger := c.log.FromContext(ctx)
path := fmt.Sprintf("%s/api/v1/snapshots/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID)
// Send the request to gms with the associated auth token
@ -229,18 +227,22 @@ func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cl
return "", fmt.Errorf("http request error: %w", err)
} else if resp.StatusCode >= 400 {
c.log.Error("received error response to create an upload url", "statusCode", resp.StatusCode)
return "", fmt.Errorf("http request error: %w", err)
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("reading response body: %w", err)
}
return "", fmt.Errorf("http request error: body=%s", string(body))
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("closing request body: %w", err)
c.log.Error("closing request body", "err", err.Error())
}
}()
var result CreateSnapshotUploadUrlResponseDTO
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.Error("unmarshalling response body: %w", err)
c.log.Error("unmarshalling response body", "err", err.Error())
return "", fmt.Errorf("unmarshalling create upload url response: %w", err)
}