diff --git a/pkg/services/cloudmigration/api/api.go b/pkg/services/cloudmigration/api/api.go index 1cd851a9717..055da380018 100644 --- a/pkg/services/cloudmigration/api/api.go +++ b/pkg/services/cloudmigration/api/api.go @@ -13,6 +13,8 @@ import ( contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" "github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/web" + + "go.opentelemetry.io/otel/codes" ) type CloudMigrationAPI struct { @@ -78,6 +80,9 @@ func (cma *CloudMigrationAPI) GetToken(c *contextmodel.ReqContext) response.Resp token, err := cma.cloudMigrationService.GetToken(ctx) if err != nil { + span.SetStatus(codes.Error, "fetching cloud migration access token") + span.RecordError(err) + if !errors.Is(err, cloudmigration.ErrTokenNotFound) { logger.Error("fetching cloud migration access token", "err", err.Error()) } @@ -112,7 +117,10 @@ func (cma *CloudMigrationAPI) CreateToken(c *contextmodel.ReqContext) response.R resp, err := cma.cloudMigrationService.CreateToken(ctx) if err != nil { + span.SetStatus(codes.Error, "creating gcom access token") + span.RecordError(err) logger.Error("creating gcom access token", "err", err.Error()) + return response.ErrOrFallback(http.StatusInternalServerError, "creating gcom access token", err) } @@ -137,11 +145,17 @@ func (cma *CloudMigrationAPI) DeleteToken(c *contextmodel.ReqContext) response.R uid := web.Params(c.Req)[":uid"] if err := util.ValidateUID(uid); err != nil { + span.SetStatus(codes.Error, "invalid migration uid") + span.RecordError(err) + return response.Error(http.StatusBadRequest, "invalid migration uid", err) } if err := cma.cloudMigrationService.DeleteToken(ctx, uid); err != nil { + span.SetStatus(codes.Error, "deleting cloud migration token") + span.RecordError(err) logger.Error("deleting cloud migration token", "err", err.Error()) + return response.ErrOrFallback(http.StatusInternalServerError, "deleting cloud migration token", err) } @@ -163,6 +177,9 @@ func (cma *CloudMigrationAPI) GetSessionList(c *contextmodel.ReqContext) respons sl, err := cma.cloudMigrationService.GetSessionList(ctx) if err != nil { + span.SetStatus(codes.Error, "session list error") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "session list error", err) } @@ -185,11 +202,17 @@ func (cma *CloudMigrationAPI) GetSession(c *contextmodel.ReqContext) response.Re uid := web.Params(c.Req)[":uid"] if err := util.ValidateUID(uid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.Error(http.StatusBadRequest, "invalid session uid", err) } s, err := cma.cloudMigrationService.GetSession(ctx, uid) if err != nil { + span.SetStatus(codes.Error, "session not found") + span.RecordError(err) + return response.ErrOrFallback(http.StatusNotFound, "session not found", err) } @@ -217,12 +240,18 @@ func (cma *CloudMigrationAPI) CreateSession(c *contextmodel.ReqContext) response cmd := CloudMigrationSessionRequestDTO{} if err := web.Bind(c.Req, &cmd); err != nil { + span.SetStatus(codes.Error, "bad request data") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "bad request data", err) } s, err := cma.cloudMigrationService.CreateSession(ctx, cloudmigration.CloudMigrationSessionRequest{ AuthToken: cmd.AuthToken, }) if err != nil { + span.SetStatus(codes.Error, "session creation error") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "session creation error", err) } @@ -250,11 +279,17 @@ func (cma *CloudMigrationAPI) DeleteSession(c *contextmodel.ReqContext) response uid := web.Params(c.Req)[":uid"] if err := util.ValidateUID(uid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err) } _, err := cma.cloudMigrationService.DeleteSession(ctx, uid) if err != nil { + span.SetStatus(codes.Error, "session delete error") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "session delete error", err) } return response.Empty(http.StatusOK) @@ -278,11 +313,17 @@ func (cma *CloudMigrationAPI) CreateSnapshot(c *contextmodel.ReqContext) respons uid := web.Params(c.Req)[":uid"] if err := util.ValidateUID(uid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err) } ss, err := cma.cloudMigrationService.CreateSnapshot(ctx, c.SignedInUser, uid) if err != nil { + span.SetStatus(codes.Error, "error creating snapshot") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "error creating snapshot", err) } @@ -307,9 +348,15 @@ func (cma *CloudMigrationAPI) GetSnapshot(c *contextmodel.ReqContext) response.R sessUid, snapshotUid := web.Params(c.Req)[":uid"], web.Params(c.Req)[":snapshotUid"] if err := util.ValidateUID(sessUid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err) } if err := util.ValidateUID(snapshotUid); err != nil { + span.SetStatus(codes.Error, "invalid snapshot uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err) } @@ -327,6 +374,9 @@ func (cma *CloudMigrationAPI) GetSnapshot(c *contextmodel.ReqContext) response.R } snapshot, err := cma.cloudMigrationService.GetSnapshot(ctx, q) if err != nil { + span.SetStatus(codes.Error, "error retrieving snapshot") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "error retrieving snapshot", err) } @@ -386,6 +436,9 @@ func (cma *CloudMigrationAPI) GetSnapshotList(c *contextmodel.ReqContext) respon uid := web.Params(c.Req)[":uid"] if err := util.ValidateUID(uid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err) } q := cloudmigration.ListSnapshotsQuery{ @@ -403,6 +456,9 @@ func (cma *CloudMigrationAPI) GetSnapshotList(c *contextmodel.ReqContext) respon snapshotList, err := cma.cloudMigrationService.GetSnapshotList(ctx, q) if err != nil { + span.SetStatus(codes.Error, "error retrieving snapshot list") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "error retrieving snapshot list", err) } @@ -438,13 +494,22 @@ func (cma *CloudMigrationAPI) UploadSnapshot(c *contextmodel.ReqContext) respons sessUid, snapshotUid := web.Params(c.Req)[":uid"], web.Params(c.Req)[":snapshotUid"] if err := util.ValidateUID(sessUid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err) } if err := util.ValidateUID(snapshotUid); err != nil { + span.SetStatus(codes.Error, "invalid snapshot uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err) } if err := cma.cloudMigrationService.UploadSnapshot(ctx, sessUid, snapshotUid); err != nil { + span.SetStatus(codes.Error, "error uploading snapshot") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "error uploading snapshot", err) } @@ -468,13 +533,22 @@ func (cma *CloudMigrationAPI) CancelSnapshot(c *contextmodel.ReqContext) respons sessUid, snapshotUid := web.Params(c.Req)[":uid"], web.Params(c.Req)[":snapshotUid"] if err := util.ValidateUID(sessUid); err != nil { + span.SetStatus(codes.Error, "invalid session uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err) } if err := util.ValidateUID(snapshotUid); err != nil { + span.SetStatus(codes.Error, "invalid snapshot uid") + span.RecordError(err) + return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err) } if err := cma.cloudMigrationService.CancelSnapshot(ctx, sessUid, snapshotUid); err != nil { + span.SetStatus(codes.Error, "error canceling snapshot") + span.RecordError(err) + return response.ErrOrFallback(http.StatusInternalServerError, "error canceling snapshot", err) } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 886cb8565ef..90ee06b6d60 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/uuid" + "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/kvstore" @@ -35,6 +36,7 @@ import ( "github.com/grafana/grafana/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -83,6 +85,7 @@ var _ cloudmigration.Service = (*Service)(nil) // builds the service, and api, and configures routes func ProvideService( cfg *setting.Cfg, + httpClientProvider *httpclient.Provider, features featuremgmt.FeatureToggles, db db.DB, dsService datasources.DataSourceService, @@ -118,15 +121,29 @@ func ProvideService( } s.api = api.RegisterApi(routeRegister, s, tracer) - s.objectStorage = objectstorage.NewS3() + httpClientS3, err := httpClientProvider.New() + if err != nil { + return nil, fmt.Errorf("creating http client for S3: %w", err) + } + s.objectStorage = objectstorage.NewS3(httpClientS3, tracer) if !cfg.CloudMigration.IsDeveloperMode { - c, err := gmsclient.NewGMSClient(cfg) + httpClientGMS, err := httpClientProvider.New() + if err != nil { + return nil, fmt.Errorf("creating http client for GMS: %w", err) + } + + c, err := gmsclient.NewGMSClient(cfg, httpClientGMS) if err != nil { return nil, fmt.Errorf("initializing GMS client: %w", err) } s.gmsClient = c - s.gcomService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}) + + httpClientGcom, err := httpClientProvider.New() + if err != nil { + return nil, fmt.Errorf("creating http client for GCOM: %w", err) + } + s.gcomService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}, httpClientGcom) } else { s.gmsClient = gmsclient.NewInMemoryClient() s.gcomService = &gcomStub{policies: map[string]gcom.AccessPolicy{}, token: nil} @@ -169,7 +186,8 @@ func (s *Service) GetToken(ctx context.Context) (gcom.TokenView, error) { RequestID: requestID, Region: instance.RegionSlug, AccessPolicyName: accessPolicyName, - TokenName: accessTokenName}) + TokenName: accessTokenName, + }) if err != nil { return gcom.TokenView{}, fmt.Errorf("listing tokens: %w", err) } @@ -279,9 +297,6 @@ func (s *Service) CreateToken(ctx context.Context) (cloudmigration.CreateAccessT } func (s *Service) findAccessPolicyByName(ctx context.Context, regionSlug, accessPolicyName string) (*gcom.AccessPolicy, error) { - ctx, span := s.tracer.Start(ctx, "CloudMigrationService.findAccessPolicyByName") - defer span.End() - accessPolicies, err := s.gcomService.ListAccessPolicies(ctx, gcom.ListAccessPoliciesParams{ RequestID: tracing.TraceIDFromContext(ctx, false), Region: regionSlug, @@ -341,7 +356,7 @@ func (s *Service) DeleteToken(ctx context.Context, tokenID string) error { } func (s *Service) GetSession(ctx context.Context, uid string) (*cloudmigration.CloudMigrationSession, error) { - ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetMigration") + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSession") defer span.End() migration, err := s.store.GetMigrationSessionByUID(ctx, uid) if err != nil { @@ -352,6 +367,9 @@ func (s *Service) GetSession(ctx context.Context, uid string) (*cloudmigration.C } func (s *Service) GetSessionList(ctx context.Context) (*cloudmigration.CloudMigrationSessionListResponse, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSessionList") + defer span.End() + values, err := s.store.GetCloudMigrationSessionList(ctx) if err != nil { return nil, fmt.Errorf("retrieving session list from store: %w", err) @@ -370,7 +388,7 @@ func (s *Service) GetSessionList(ctx context.Context) (*cloudmigration.CloudMigr } func (s *Service) CreateSession(ctx context.Context, cmd cloudmigration.CloudMigrationSessionRequest) (*cloudmigration.CloudMigrationSessionResponse, error) { - ctx, span := s.tracer.Start(ctx, "CloudMigrationService.createMigration") + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSession") defer span.End() base64Token := cmd.AuthToken @@ -405,6 +423,9 @@ func (s *Service) CreateSession(ctx context.Context, cmd cloudmigration.CloudMig } func (s *Service) DeleteSession(ctx context.Context, sessionUID string) (*cloudmigration.CloudMigrationSession, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.DeleteSession") + defer span.End() + session, snapshots, err := s.store.DeleteMigrationSessionByUID(ctx, sessionUID) if err != nil { s.report(ctx, session, gmsclient.EventDisconnect, 0, err) @@ -470,26 +491,36 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI s.cancelMutex.Unlock() }() - ctx, cancelFunc := context.WithCancel(context.Background()) + // Create context out the span context to ensure the trace is propagated + asyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) + asyncCtx, asyncSpan := s.tracer.Start(asyncCtx, "CloudMigrationService.CreateSnapshotAsync") + defer asyncSpan.End() + + asyncCtx, cancelFunc := context.WithCancel(asyncCtx) s.cancelFunc = cancelFunc - s.report(ctx, session, gmsclient.EventStartBuildingSnapshot, 0, nil) + s.report(asyncCtx, session, gmsclient.EventStartBuildingSnapshot, 0, nil) start := time.Now() - err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, initResp.Metadata, snapshot) + err := s.buildSnapshot(asyncCtx, signedInUser, initResp.MaxItemsPerPartition, initResp.Metadata, snapshot) if err != nil { + asyncSpan.SetStatus(codes.Error, "error building snapshot") + asyncSpan.RecordError(err) s.log.Error("building snapshot", "err", err.Error()) + // Update status to error with retries - if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ + if err := s.updateSnapshotWithRetries(asyncCtx, cloudmigration.UpdateSnapshotCmd{ UID: snapshot.UID, SessionID: sessionUid, Status: cloudmigration.SnapshotStatusError, }); err != nil { s.log.Error("critical failure during snapshot creation - please report any error logs") + asyncSpan.RecordError(err) } } - s.report(ctx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err) + span.SetStatus(codes.Ok, "snapshot built") + s.report(asyncCtx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err) }() return &snapshot, nil @@ -624,32 +655,48 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho s.cancelMutex.Unlock() }() - ctx, cancelFunc := context.WithCancel(context.Background()) - s.cancelFunc = cancelFunc + // Create context out the span context to ensure the trace is propagated + asyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) + asyncCtx, asyncSpan := s.tracer.Start(asyncCtx, "CloudMigrationService.UploadSnapshot") + defer asyncSpan.End() - s.report(ctx, session, gmsclient.EventStartUploadingSnapshot, 0, nil) + asyncCtx, s.cancelFunc = context.WithCancel(asyncCtx) + + s.report(asyncCtx, session, gmsclient.EventStartUploadingSnapshot, 0, nil) start := time.Now() - err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl) + err := s.uploadSnapshot(asyncCtx, session, snapshot, uploadUrl) if err != nil { + asyncSpan.SetStatus(codes.Error, "error uploading snapshot") + asyncSpan.RecordError(err) + s.log.Error("uploading snapshot", "err", err.Error()) // Update status to error with retries - if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ + if err := s.updateSnapshotWithRetries(asyncCtx, cloudmigration.UpdateSnapshotCmd{ UID: snapshot.UID, SessionID: sessionUid, Status: cloudmigration.SnapshotStatusError, }); err != nil { + asyncSpan.RecordError(err) s.log.Error("critical failure during snapshot upload - please report any error logs") } } - s.report(ctx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err) + s.report(asyncCtx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err) }() return nil } func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (err error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CancelSnapshot", + trace.WithAttributes( + attribute.String("sessionUid", sessionUid), + attribute.String("snapshotUid", snapshotUid), + ), + ) + defer span.End() + // The cancel func itself is protected by a mutex in the async threads, so it may or may not be set by the time CancelSnapshot is called // Attempt to cancel and recover from the panic if the cancel function is nil defer func() { @@ -684,6 +731,9 @@ func (s *Service) report( d time.Duration, evtErr error, ) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.report") + defer span.End() + id, err := s.getLocalEventId(ctx) if err != nil { s.log.Error("failed to report event", "type", t, "error", err.Error()) @@ -738,6 +788,9 @@ func (s *Service) getLocalEventId(ctx context.Context) (string, error) { } func (s *Service) deleteLocalFiles(snapshots []cloudmigration.CloudMigrationSnapshot) error { + _, span := s.tracer.Start(context.Background(), "CloudMigrationService.deleteLocalFiles") + defer span.End() + var err error for _, snapshot := range snapshots { err = os.RemoveAll(snapshot.LocalDir) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index ff068b68a0c..3667b7fd081 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" @@ -644,6 +645,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi s, err := ProvideService( cfg, + httpclient.NewProvider(), featuremgmt.WithFeatures( featuremgmt.FlagOnPremToCloudMigrations, featuremgmt.FlagDashboardRestore), diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index ece57dc92ed..48bf773d44b 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -23,9 +23,14 @@ import ( "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/util/retryer" "golang.org/x/crypto/nacl/box" + + "go.opentelemetry.io/otel/codes" ) func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.SignedInUser) (*cloudmigration.MigrateDataRequest, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getMigrationDataJSON") + defer span.End() + // Data sources dataSources, err := s.getDataSourceCommands(ctx) if err != nil { @@ -103,6 +108,9 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S } func (s *Service) getDataSourceCommands(ctx context.Context) ([]datasources.AddDataSourceCommand, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getDataSourceCommands") + defer span.End() + dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{}) if err != nil { s.log.Error("Failed to get all datasources", "err", err) @@ -141,6 +149,9 @@ func (s *Service) getDataSourceCommands(ctx context.Context) ([]datasources.AddD // getDashboardAndFolderCommands returns the json payloads required by the dashboard and folder creation APIs func (s *Service) getDashboardAndFolderCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]dashboards.Dashboard, []folder.CreateFolderCommand, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getDashboardAndFolderCommands") + defer span.End() + dashs, err := s.dashboardService.GetAllDashboards(ctx) if err != nil { return nil, nil, err @@ -196,6 +207,9 @@ type libraryElement struct { // getLibraryElementsCommands returns the json payloads required by the library elements creation API func (s *Service) getLibraryElementsCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]libraryElement, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getLibraryElementsCommands") + defer span.End() + const perPage = 100 cmds := make([]libraryElement, 0) @@ -242,6 +256,9 @@ func (s *Service) getLibraryElementsCommands(ctx context.Context, signedInUser * // asynchronous process for writing the snapshot to the filesystem and updating the snapshot status func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedInUser, maxItemsPerPartition uint32, metadata []byte, snapshotMeta cloudmigration.CloudMigrationSnapshot) error { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.buildSnapshot") + defer span.End() + // TODO -- make sure we can only build one snapshot at a time s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() @@ -339,6 +356,9 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn // asynchronous process for and updating the snapshot status func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) (err error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot") + defer span.End() + // TODO -- make sure we can only upload one snapshot at a time s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() @@ -361,37 +381,61 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl } }() + _, readIndexSpan := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.readIndex") index, err := snapshot.ReadIndex(indexFile) if err != nil { + readIndexSpan.SetStatus(codes.Error, "reading index from file") + readIndexSpan.RecordError(err) + readIndexSpan.End() + return fmt.Errorf("reading index from file: %w", err) } + readIndexSpan.End() s.log.Debug(fmt.Sprintf("uploadSnapshot: read index file in %d ms", time.Since(start).Milliseconds())) + uploadCtx, uploadSpan := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.uploadDataFiles") // Upload the data files. for _, fileNames := range index.Items { for _, fileName := range fileNames { filePath := filepath.Join(snapshotMeta.LocalDir, fileName) key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName) - if err := s.uploadUsingPresignedURL(ctx, uploadUrl, key, filePath); err != nil { + if err := s.uploadUsingPresignedURL(uploadCtx, uploadUrl, key, filePath); err != nil { + uploadSpan.SetStatus(codes.Error, "uploading snapshot data file using presigned url") + uploadSpan.RecordError(err) + uploadSpan.End() + return fmt.Errorf("uploading snapshot file using presigned url: %w", err) } s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded %s in %d ms", fileName, time.Since(start).Milliseconds())) } } + uploadSpan.End() s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded all data files in %d ms", time.Since(start).Milliseconds())) + uploadCtx, uploadSpan = s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.uploadIndex") + // Upload the index file. Must be done after uploading the data files. key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json") if _, err := indexFile.Seek(0, 0); err != nil { + uploadSpan.SetStatus(codes.Error, "seeking to beginning of index file") + uploadSpan.RecordError(err) + uploadSpan.End() + return fmt.Errorf("seeking to beginning of index file: %w", err) } - if err := s.objectStorage.PresignedURLUpload(ctx, uploadUrl, key, indexFile); err != nil { + if err := s.objectStorage.PresignedURLUpload(uploadCtx, uploadUrl, key, indexFile); err != nil { + uploadSpan.SetStatus(codes.Error, "uploading index file using presigned url") + uploadSpan.RecordError(err) + uploadSpan.End() + return fmt.Errorf("uploading file using presigned url: %w", err) } + uploadSpan.End() + s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded index file in %d ms", time.Since(start).Milliseconds())) s.log.Info("successfully uploaded snapshot", "snapshotUid", snapshotMeta.UID, "cloud_snapshotUid", snapshotMeta.GMSSnapshotUID) @@ -408,6 +452,9 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl } func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key string, filePath string) (err error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.uploadUsingPresignedURL") + defer span.End() + // The directory that contains the file can set in the configuration, therefore the directory can be any directory. // nolint:gosec file, err := os.Open(filePath) diff --git a/pkg/services/cloudmigration/gmsclient/gms_client.go b/pkg/services/cloudmigration/gmsclient/gms_client.go index 9cbe60d97e3..b041e4714af 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client.go @@ -18,19 +18,21 @@ import ( ) // NewGMSClient returns an implementation of Client that queries GrafanaMigrationService -func NewGMSClient(cfg *setting.Cfg) (Client, error) { +func NewGMSClient(cfg *setting.Cfg, httpClient *http.Client) (Client, error) { if cfg.CloudMigration.GMSDomain == "" { return nil, fmt.Errorf("missing GMS domain") } return &gmsClientImpl{ - cfg: cfg, - log: log.New(logPrefix), + cfg: cfg, + log: log.New(logPrefix), + httpClient: httpClient, }, nil } type gmsClientImpl struct { - cfg *setting.Cfg - log *log.ConcreteLogger + cfg *setting.Cfg + log *log.ConcreteLogger + httpClient *http.Client getStatusMux sync.Mutex getStatusLastQueried time.Time @@ -40,8 +42,11 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud // TODO: there is a lot of boilerplate code in these methods, we should consolidate them when we have a gardening period path := fmt.Sprintf("%s/api/v1/validate-key", c.buildBasePath(cm.ClusterSlug)) + ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSValidateKeyTimeout) + defer cancel() + // validation is an empty POST to GMS with the authorization header included - req, err := http.NewRequest("POST", path, bytes.NewReader(nil)) + req, err := http.NewRequestWithContext(ctx, "POST", path, bytes.NewReader(nil)) if err != nil { c.log.Error("error creating http request for token validation", "err", err.Error()) return fmt.Errorf("http request error: %w", err) @@ -49,10 +54,7 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken)) - client := &http.Client{ - Timeout: c.cfg.CloudMigration.GMSValidateKeyTimeout, - } - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { c.log.Error("error sending http request for token validation", "err", err.Error()) return fmt.Errorf("http request error: %w", err) @@ -74,8 +76,11 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (out *cloudmigration.StartSnapshotResponse, err error) { path := fmt.Sprintf("%s/api/v1/start-snapshot", c.buildBasePath(session.ClusterSlug)) + ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSStartSnapshotTimeout) + defer cancel() + // Send the request to cms with the associated auth token - req, err := http.NewRequest(http.MethodPost, path, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, nil) if err != nil { c.log.Error("error creating http request to start snapshot", "err", err.Error()) return nil, fmt.Errorf("http request error: %w", err) @@ -83,10 +88,7 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) - client := &http.Client{ - Timeout: c.cfg.CloudMigration.GMSStartSnapshotTimeout, - } - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { c.log.Error("error sending http request to start snapshot", "err", err.Error()) return nil, fmt.Errorf("http request error: %w", err) @@ -119,8 +121,11 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr path := fmt.Sprintf("%s/api/v1/snapshots/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset) + ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSGetSnapshotStatusTimeout) + defer cancel() + // Send the request to gms with the associated auth token - req, err := http.NewRequest(http.MethodGet, path, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) if err != nil { c.log.Error("error creating http request to get snapshot status", "err", err.Error()) return nil, fmt.Errorf("http request error: %w", err) @@ -128,11 +133,8 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) - client := &http.Client{ - Timeout: c.cfg.CloudMigration.GMSGetSnapshotStatusTimeout, - } c.getStatusLastQueried = time.Now() - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { c.log.Error("error sending http request to get snapshot status", "err", err.Error()) return nil, fmt.Errorf("http request error: %w", err) @@ -163,8 +165,11 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { path := fmt.Sprintf("%s/api/v1/snapshots/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID) + ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSCreateUploadUrlTimeout) + defer cancel() + // Send the request to gms with the associated auth token - req, err := http.NewRequest(http.MethodPost, path, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, nil) if err != nil { c.log.Error("error creating http request to create upload url", "err", err.Error()) return "", fmt.Errorf("http request error: %w", err) @@ -172,10 +177,7 @@ func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cl req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) - client := &http.Client{ - Timeout: c.cfg.CloudMigration.GMSCreateUploadUrlTimeout, - } - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { c.log.Error("error sending http request to create an upload url", "err", err.Error()) return "", fmt.Errorf("http request error: %w", err) @@ -208,6 +210,9 @@ func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration. return } + ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSReportEventTimeout) + defer cancel() + path := fmt.Sprintf("%s/api/v1/events", c.buildBasePath(session.ClusterSlug)) var buf bytes.Buffer @@ -216,7 +221,7 @@ func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration. return } // Send the request to gms with the associated auth token - req, err := http.NewRequest(http.MethodPost, path, &buf) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, &buf) if err != nil { c.log.Error("error creating http request to report event", "err", err.Error()) return @@ -224,10 +229,7 @@ func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration. req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) - client := &http.Client{ - Timeout: c.cfg.CloudMigration.GMSReportEventTimeout, - } - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { c.log.Error("error sending http request for report event", "err", err.Error()) return diff --git a/pkg/services/cloudmigration/gmsclient/gms_client_test.go b/pkg/services/cloudmigration/gmsclient/gms_client_test.go index a2eb92d76cb..c8e769ecbc5 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client_test.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client_test.go @@ -1,6 +1,7 @@ package gmsclient import ( + "net/http" "testing" "github.com/grafana/grafana/pkg/setting" @@ -16,7 +17,9 @@ func Test_buildBasePath(t *testing.T) { CloudMigration: setting.CloudMigrationSettings{ GMSDomain: "", }, - }) + }, + http.DefaultClient, + ) require.Error(t, err) // Domain is required @@ -24,7 +27,9 @@ func Test_buildBasePath(t *testing.T) { CloudMigration: setting.CloudMigrationSettings{ GMSDomain: "non-empty", }, - }) + }, + http.DefaultClient, + ) require.NoError(t, err) client := c.(*gmsClientImpl) diff --git a/pkg/services/cloudmigration/objectstorage/s3.go b/pkg/services/cloudmigration/objectstorage/s3.go index 32bfecfd4f5..2ae4a9fbfd6 100644 --- a/pkg/services/cloudmigration/objectstorage/s3.go +++ b/pkg/services/cloudmigration/objectstorage/s3.go @@ -9,15 +9,26 @@ import ( "mime/multipart" "net/http" "net/url" + + "github.com/grafana/grafana/pkg/infra/tracing" + + "go.opentelemetry.io/otel/attribute" ) -type S3 struct{} +type S3 struct { + httpClient *http.Client + tracer tracing.Tracer +} -func NewS3() *S3 { - return &S3{} +func NewS3(httpClient *http.Client, tracer tracing.Tracer) *S3 { + return &S3{httpClient: httpClient, tracer: tracer} } func (s3 *S3) PresignedURLUpload(ctx context.Context, presignedURL, key string, reader io.Reader) (err error) { + ctx, span := s3.tracer.Start(ctx, "objectstorage.S3.PresignedURLUpload") + span.SetAttributes(attribute.String("key", key)) + defer span.End() + url, err := url.Parse(presignedURL) if err != nil { return fmt.Errorf("parsing presigned url") @@ -68,13 +79,13 @@ func (s3 *S3) PresignedURLUpload(ctx context.Context, presignedURL, key string, endpoint := fmt.Sprintf("%s://%s%s", url.Scheme, url.Host, url.Path) - request, err := http.NewRequest(http.MethodPost, endpoint, buffer) + request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, buffer) if err != nil { return fmt.Errorf("creating http request: %w", err) } request.Header.Set("Content-Type", writer.FormDataContentType()) - httpClient := http.Client{} - response, err := httpClient.Do(request) + + response, err := s3.httpClient.Do(request) if err != nil { return fmt.Errorf("sending http request: %w", err) } diff --git a/pkg/services/gcom/gcom.go b/pkg/services/gcom/gcom.go index c510db5c095..2db6dc7e1d2 100644 --- a/pkg/services/gcom/gcom.go +++ b/pkg/services/gcom/gcom.go @@ -138,11 +138,11 @@ type Config struct { Token string } -func New(cfg Config) Service { +func New(cfg Config, httpClient *http.Client) Service { return &GcomClient{ log: log.New(LogPrefix), cfg: cfg, - httpClient: &http.Client{}, + httpClient: httpClient, } } @@ -360,6 +360,7 @@ func (client *GcomClient) ListTokens(ctx context.Context, params ListTokenParams return body.Items, nil } + func (client *GcomClient) CreateToken(ctx context.Context, params CreateTokenParams, payload CreateTokenPayload) (Token, error) { endpoint, err := url.JoinPath(client.cfg.ApiURL, "/v1/tokens") if err != nil {