package cloudmigrationimpl import ( "bytes" "context" "encoding/base64" "encoding/json" "fmt" "net/http" "time" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/cloudmigration" "github.com/grafana/grafana/pkg/services/cloudmigration/api" "github.com/grafana/grafana/pkg/services/contexthandler" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/gcom" "github.com/grafana/grafana/pkg/services/secrets" "github.com/grafana/grafana/pkg/setting" "github.com/prometheus/client_golang/prometheus" ) // Service Define the cloudmigration.Service Implementation. type Service struct { store store log *log.ConcreteLogger cfg *setting.Cfg features featuremgmt.FeatureToggles dsService datasources.DataSourceService gcomService gcom.Service dashboardService dashboards.DashboardService folderService folder.Service secretsService secrets.Service api *api.CloudMigrationAPI tracer tracing.Tracer metrics *Metrics } var LogPrefix = "cloudmigration.service" const ( // nolint:gosec cloudMigrationAccessPolicyNamePrefix = "grafana-cloud-migrations" //nolint:gosec cloudMigrationTokenNamePrefix = "grafana-cloud-migrations" ) var _ cloudmigration.Service = (*Service)(nil) // ProvideService Factory for method used by wire to inject dependencies. // builds the service, and api, and configures routes func ProvideService( cfg *setting.Cfg, features featuremgmt.FeatureToggles, db db.DB, dsService datasources.DataSourceService, secretsService secrets.Service, routeRegister routing.RouteRegister, prom prometheus.Registerer, tracer tracing.Tracer, dashboardService dashboards.DashboardService, folderService folder.Service, ) cloudmigration.Service { if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) { return &NoopServiceImpl{} } s := &Service{ store: &sqlStore{db: db, secretsService: secretsService}, log: log.New(LogPrefix), cfg: cfg, features: features, dsService: dsService, gcomService: gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}), tracer: tracer, metrics: newMetrics(), secretsService: secretsService, dashboardService: dashboardService, folderService: folderService, } s.api = api.RegisterApi(routeRegister, s, tracer) if err := s.registerMetrics(prom, s.metrics); err != nil { s.log.Warn("error registering prom metrics", "error", err.Error()) } return s } func (s *Service) CreateToken(ctx context.Context) (cloudmigration.CreateAccessTokenResponse, error) { ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateToken") defer span.End() logger := s.log.FromContext(ctx) requestID := tracing.TraceIDFromContext(ctx, false) timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout) defer cancel() instance, err := s.gcomService.GetInstanceByID(timeoutCtx, requestID, s.cfg.StackID) if err != nil { return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err) } // Add the stack id to the access policy name to ensure access policies in a org have unique names. accessPolicyName := fmt.Sprintf("%s-%s", cloudMigrationAccessPolicyNamePrefix, s.cfg.StackID) accessPolicyDisplayName := fmt.Sprintf("%s-%s", s.cfg.Slug, cloudMigrationAccessPolicyNamePrefix) timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.FetchAccessPolicyTimeout) defer cancel() existingAccessPolicy, err := s.findAccessPolicyByName(timeoutCtx, instance.RegionSlug, accessPolicyName) if err != nil { return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("fetching access policy by name: name=%s %w", accessPolicyName, err) } if existingAccessPolicy != nil { timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.DeleteAccessPolicyTimeout) defer cancel() if _, err := s.gcomService.DeleteAccessPolicy(timeoutCtx, gcom.DeleteAccessPolicyParams{ RequestID: requestID, AccessPolicyID: existingAccessPolicy.ID, Region: instance.RegionSlug, }); err != nil { return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("deleting access policy: id=%s region=%s %w", existingAccessPolicy.ID, instance.RegionSlug, err) } logger.Info("deleted access policy", existingAccessPolicy.ID, "name", existingAccessPolicy.Name) } timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.CreateAccessPolicyTimeout) defer cancel() accessPolicy, err := s.gcomService.CreateAccessPolicy(timeoutCtx, gcom.CreateAccessPolicyParams{ RequestID: requestID, Region: instance.RegionSlug, }, gcom.CreateAccessPolicyPayload{ Name: accessPolicyName, DisplayName: accessPolicyDisplayName, Realms: []gcom.Realm{{Type: "stack", Identifier: s.cfg.StackID, LabelPolicies: []gcom.LabelPolicy{}}}, Scopes: []string{"cloud-migrations:read", "cloud-migrations:write"}, }) if err != nil { return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("creating access policy: %w", err) } logger.Info("created access policy", "id", accessPolicy.ID, "name", accessPolicy.Name) // Add the stack id to the token name to ensure tokens in a org have unique names. accessTokenName := fmt.Sprintf("%s-%s", cloudMigrationTokenNamePrefix, s.cfg.StackID) accessTokenDisplayName := fmt.Sprintf("%s-%s", s.cfg.Slug, cloudMigrationTokenNamePrefix) timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.CreateTokenTimeout) defer cancel() token, err := s.gcomService.CreateToken(timeoutCtx, gcom.CreateTokenParams{RequestID: requestID, Region: instance.RegionSlug}, gcom.CreateTokenPayload{ AccessPolicyID: accessPolicy.ID, Name: accessTokenName, DisplayName: accessTokenDisplayName, ExpiresAt: time.Now().Add(s.cfg.CloudMigration.TokenExpiresAfter), }) if err != nil { return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("creating access token: %w", err) } logger.Info("created access token", "id", token.ID, "name", token.Name) s.metrics.accessTokenCreated.With(prometheus.Labels{"slug": s.cfg.Slug}).Inc() bytes, err := json.Marshal(cloudmigration.Base64EncodedTokenPayload{ Token: token.Token, Instance: cloudmigration.Base64HGInstance{ StackID: instance.ID, RegionSlug: instance.RegionSlug, ClusterSlug: instance.ClusterSlug, // This should be used for routing to CMS Slug: instance.Slug, }, }) if err != nil { return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("encoding token: %w", err) } return cloudmigration.CreateAccessTokenResponse{Token: base64.StdEncoding.EncodeToString(bytes)}, nil } func (s *Service) findAccessPolicyByName(ctx context.Context, regionSlug, accessPolicyName string) (*gcom.AccessPolicy, error) { ctx, span := s.tracer.Start(ctx, "CloudMigrationService.findAccessPolicyByName") defer span.End() accessPolicies, err := s.gcomService.ListAccessPolicies(ctx, gcom.ListAccessPoliciesParams{ RequestID: tracing.TraceIDFromContext(ctx, false), Region: regionSlug, Name: accessPolicyName, }) if err != nil { return nil, fmt.Errorf("listing access policies: name=%s region=%s :%w", accessPolicyName, regionSlug, err) } for _, accessPolicy := range accessPolicies { if accessPolicy.Name == accessPolicyName { return &accessPolicy, nil } } return nil, nil } func (s *Service) ValidateToken(ctx context.Context, cm cloudmigration.CloudMigration) error { ctx, span := s.tracer.Start(ctx, "CloudMigrationService.ValidateToken") defer span.End() logger := s.log.FromContext(ctx) // get CMS path from the config domain, err := s.ParseCloudMigrationConfig() if err != nil { return fmt.Errorf("config parse error: %w", err) } path := fmt.Sprintf("https://cms-dev-%s.%s/cloud-migrations/api/v1/validate-key", cm.ClusterSlug, domain) // validation is an empty POST to CMS with the authorization header included req, err := http.NewRequest("POST", path, bytes.NewReader(nil)) if err != nil { logger.Error("error creating http request for token validation", "err", err.Error()) return fmt.Errorf("http request error: %w", err) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken)) client := &http.Client{} resp, err := client.Do(req) if err != nil { logger.Error("error sending http request for token validation", "err", err.Error()) return fmt.Errorf("http request error: %w", err) } defer func() { if err := resp.Body.Close(); err != nil { logger.Error("closing request body", "err", err.Error()) } }() if resp.StatusCode != 200 { var errResp map[string]any if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { logger.Error("decoding error response", "err", err.Error()) } else { return fmt.Errorf("token validation failure: %v", errResp) } } return nil } func (s *Service) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) { ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetMigration") defer span.End() migration, err := s.store.GetMigration(ctx, id) if err != nil { return nil, err } return migration, nil } func (s *Service) GetMigrationList(ctx context.Context) (*cloudmigration.CloudMigrationListResponse, error) { values, err := s.store.GetAllCloudMigrations(ctx) if err != nil { return nil, err } migrations := make([]cloudmigration.CloudMigrationResponse, 0) for _, v := range values { migrations = append(migrations, cloudmigration.CloudMigrationResponse{ ID: v.ID, Stack: v.Stack, Created: v.Created, Updated: v.Updated, }) } return &cloudmigration.CloudMigrationListResponse{Migrations: migrations}, nil } func (s *Service) CreateMigration(ctx context.Context, cmd cloudmigration.CloudMigrationRequest) (*cloudmigration.CloudMigrationResponse, error) { ctx, span := s.tracer.Start(ctx, "CloudMigrationService.createMigration") defer span.End() base64Token := cmd.AuthToken b, err := base64.StdEncoding.DecodeString(base64Token) if err != nil { return nil, fmt.Errorf("token could not be decoded") } var token cloudmigration.Base64EncodedTokenPayload if err := json.Unmarshal(b, &token); err != nil { return nil, fmt.Errorf("invalid token") // don't want to leak info here } migration := token.ToMigration() // validate token against cms before saving if err := s.ValidateToken(ctx, migration); err != nil { return nil, fmt.Errorf("token validation: %w", err) } cm, err := s.store.CreateMigration(ctx, migration) if err != nil { return nil, fmt.Errorf("error creating migration: %w", err) } return &cloudmigration.CloudMigrationResponse{ ID: cm.ID, Stack: token.Instance.Slug, Created: cm.Created, Updated: cm.Updated, }, nil } func (s *Service) UpdateMigration(ctx context.Context, id int64, cm cloudmigration.CloudMigrationRequest) (*cloudmigration.CloudMigrationResponse, error) { // TODO: Implement method return nil, nil } func (s *Service) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, error) { var migrationDataSlice []cloudmigration.MigrateDataRequestItemDTO // Data sources dataSources, err := s.getDataSources(ctx, id) if err != nil { s.log.Error("Failed to get datasources", "err", err) return nil, err } for _, ds := range dataSources { migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItemDTO{ Type: cloudmigration.DatasourceDataType, RefID: ds.UID, Name: ds.Name, Data: ds, }) } // Dashboards dashboards, err := s.getDashboards(ctx, id) if err != nil { s.log.Error("Failed to get dashboards", "err", err) return nil, err } for _, dashboard := range dashboards { dashboard.Data.Del("id") migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItemDTO{ Type: cloudmigration.DashboardDataType, RefID: dashboard.UID, Name: dashboard.Title, Data: map[string]any{"dashboard": dashboard.Data}, }) } // Folders folders, err := s.getFolders(ctx, id) if err != nil { s.log.Error("Failed to get folders", "err", err) return nil, err } for _, f := range folders { migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItemDTO{ Type: cloudmigration.FolderDataType, RefID: f.UID, Name: f.Title, Data: f, }) } migrationData := cloudmigration.MigrateDataRequestDTO{ Items: migrationDataSlice, } result, err := json.Marshal(migrationData) if err != nil { s.log.Error("Failed to marshal datasources", "err", err) return nil, err } return result, nil } func (s *Service) getDataSources(ctx context.Context, id int64) ([]datasources.AddDataSourceCommand, error) { dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{}) if err != nil { s.log.Error("Failed to get all datasources", "err", err) return nil, err } result := []datasources.AddDataSourceCommand{} for _, dataSource := range dataSources { // Decrypt secure json to send raw credentials decryptedData, err := s.secretsService.DecryptJsonData(ctx, dataSource.SecureJsonData) if err != nil { s.log.Error("Failed to decrypt secure json data", "err", err) return nil, err } dataSourceCmd := datasources.AddDataSourceCommand{ OrgID: dataSource.OrgID, Name: dataSource.Name, Type: dataSource.Type, Access: dataSource.Access, URL: dataSource.URL, User: dataSource.User, Database: dataSource.Database, BasicAuth: dataSource.BasicAuth, BasicAuthUser: dataSource.BasicAuthUser, WithCredentials: dataSource.WithCredentials, IsDefault: dataSource.IsDefault, JsonData: dataSource.JsonData, SecureJsonData: decryptedData, ReadOnly: dataSource.ReadOnly, UID: dataSource.UID, } result = append(result, dataSourceCmd) } return result, err } func (s *Service) getFolders(ctx context.Context, id int64) ([]folder.Folder, error) { reqCtx := contexthandler.FromContext(ctx) folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{ SignedInUser: reqCtx.SignedInUser, }) if err != nil { return nil, err } var result []folder.Folder for _, folder := range folders { result = append(result, *folder) } return result, nil } func (s *Service) getDashboards(ctx context.Context, id int64) ([]dashboards.Dashboard, error) { dashs, err := s.dashboardService.GetAllDashboards(ctx) if err != nil { return nil, err } var result []dashboards.Dashboard for _, dashboard := range dashs { result = append(result, *dashboard) } return result, nil } func (s *Service) SaveMigrationRun(ctx context.Context, cmr *cloudmigration.CloudMigrationRun) (int64, error) { cmr.Created = time.Now() cmr.Updated = time.Now() cmr.Finished = time.Now() err := s.store.SaveMigrationRun(ctx, cmr) if err != nil { s.log.Error("Failed to save migration run", "err", err) return -1, err } return cmr.ID, nil } func (s *Service) GetMigrationStatus(ctx context.Context, id string, runID string) (*cloudmigration.CloudMigrationRun, error) { cmr, err := s.store.GetMigrationStatus(ctx, id, runID) if err != nil { return nil, fmt.Errorf("retrieving migration status from db: %w", err) } return cmr, nil } func (s *Service) GetMigrationStatusList(ctx context.Context, migrationID string) ([]*cloudmigration.CloudMigrationRun, error) { cmrs, err := s.store.GetMigrationStatusList(ctx, migrationID) if err != nil { return nil, fmt.Errorf("retrieving migration statuses from db: %w", err) } return cmrs, nil } func (s *Service) DeleteMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) { c, err := s.store.DeleteMigration(ctx, id) if err != nil { return c, fmt.Errorf("deleting migration from db: %w", err) } return c, nil } func (s *Service) ParseCloudMigrationConfig() (string, error) { if s.cfg == nil { return "", fmt.Errorf("cfg cannot be nil") } section := s.cfg.Raw.Section("cloud_migration") domain := section.Key("domain").MustString("") if domain == "" { return "", fmt.Errorf("cloudmigration domain not set") } return domain, nil }