CloudMigrations: Implement migrations API (#85348)

* Implement run migration endpoint

* Refactor RunMigration method into separate methods

* Save migration runs fix lint

* Minor changes

* Refactor how to use cms endpoint

* fix interface

* complete merge

* add individual items

* adds tracing to getMigration

* linter

* updated swagger definition with the latest changes

* CloudMigrations: Implement core API handlers for cloud migrations and migration runs (#85407)

* implement delete

* add auth token encryption

* implement token validation

* call token validation during migration creation

* implement get migration status

* implement list migration runs

* fix bug

* finish parse domain func

* fix urls

* fix typo

* fix encoding and decoding

* remove double decryption

* add missing slash

* fix id returned by create function

* inject missing services

* finish implementing (as far as I can tell right now) data migration and response handling

* comment out broken test, needs a rewrite

* add a few final touches

* get dashboard migration to work properly

* changed runMigration to a POST

* swagger

* swagger

* swagger

---------

Co-authored-by: Michael Mandrus <michael.mandrus@grafana.com>
Co-authored-by: Leonard Gram <leo@xlson.com>
Co-authored-by: Michael Mandrus <41969079+mmandrus@users.noreply.github.com>
This commit is contained in:
idafurjes
2024-04-03 13:36:13 +02:00
committed by GitHub
parent 89638238e5
commit b885da09da
17 changed files with 966 additions and 333 deletions

View File

@@ -1,6 +1,10 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
@@ -15,10 +19,10 @@ import (
)
type CloudMigrationAPI struct {
cloudMigrationsService cloudmigration.Service
routeRegister routing.RouteRegister
log log.Logger
tracer tracing.Tracer
cloudMigrationService cloudmigration.Service
routeRegister routing.RouteRegister
log log.Logger
tracer tracing.Tracer
}
func RegisterApi(
@@ -27,10 +31,10 @@ func RegisterApi(
tracer tracing.Tracer,
) *CloudMigrationAPI {
api := &CloudMigrationAPI{
log: log.New("cloudmigrations.api"),
routeRegister: rr,
cloudMigrationsService: cms,
tracer: tracer,
log: log.New("cloudmigrations.api"),
routeRegister: rr,
cloudMigrationService: cms,
tracer: tracer,
}
api.registerEndpoints()
return api
@@ -43,7 +47,7 @@ func (cma *CloudMigrationAPI) registerEndpoints() {
cloudMigrationRoute.Get("/migration", routing.Wrap(cma.GetMigrationList))
cloudMigrationRoute.Post("/migration", routing.Wrap(cma.CreateMigration))
cloudMigrationRoute.Get("/migration/:id", routing.Wrap(cma.GetMigration))
cloudMigrationRoute.Delete("migration/:id", routing.Wrap(cma.DeleteMigration))
cloudMigrationRoute.Delete("/migration/:id", routing.Wrap(cma.DeleteMigration))
cloudMigrationRoute.Post("/migration/:id/run", routing.Wrap(cma.RunMigration))
cloudMigrationRoute.Get("/migration/:id/run", routing.Wrap(cma.GetMigrationRunList))
cloudMigrationRoute.Get("/migration/:id/run/:runID", routing.Wrap(cma.GetMigrationRun))
@@ -66,7 +70,7 @@ func (cma *CloudMigrationAPI) CreateToken(c *contextmodel.ReqContext) response.R
logger := cma.log.FromContext(ctx)
resp, err := cma.cloudMigrationsService.CreateToken(ctx)
resp, err := cma.cloudMigrationService.CreateToken(ctx)
if err != nil {
logger.Error("creating gcom access token", "err", err.Error())
return response.Error(http.StatusInternalServerError, "creating gcom access token", err)
@@ -85,7 +89,10 @@ func (cma *CloudMigrationAPI) CreateToken(c *contextmodel.ReqContext) response.R
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) GetMigrationList(c *contextmodel.ReqContext) response.Response {
cloudMigrations, err := cma.cloudMigrationsService.GetMigrationList(c.Req.Context())
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.GetMigrationList")
defer span.End()
cloudMigrations, err := cma.cloudMigrationService.GetMigrationList(ctx)
if err != nil {
return response.Error(http.StatusInternalServerError, "migration list error", err)
}
@@ -105,11 +112,14 @@ func (cma *CloudMigrationAPI) GetMigrationList(c *contextmodel.ReqContext) respo
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) GetMigration(c *contextmodel.ReqContext) response.Response {
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.GetMigration")
defer span.End()
id, err := strconv.ParseInt(web.Params(c.Req)[":id"], 10, 64)
if err != nil {
return response.Error(http.StatusBadRequest, "id is invalid", err)
}
cloudMigration, err := cma.cloudMigrationsService.GetMigration(c.Req.Context(), id)
cloudMigration, err := cma.cloudMigrationService.GetMigration(ctx, id)
if err != nil {
return response.Error(http.StatusNotFound, "migration not found", err)
}
@@ -134,18 +144,21 @@ type GetCloudMigrationRequest struct {
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) CreateMigration(c *contextmodel.ReqContext) response.Response {
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.CreateMigration")
defer span.End()
cmd := cloudmigration.CloudMigrationRequest{}
if err := web.Bind(c.Req, &cmd); err != nil {
return response.Error(http.StatusBadRequest, "bad request data", err)
}
cloudMigration, err := cma.cloudMigrationsService.CreateMigration(c.Req.Context(), cmd)
cloudMigration, err := cma.cloudMigrationService.CreateMigration(ctx, cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "migration creation error", err)
}
return response.JSON(http.StatusOK, cloudMigration)
}
// swagger:route GET /cloudmigration/migration/{id}/run migrations runCloudMigration
// swagger:route POST /cloudmigration/migration/{id}/run migrations runCloudMigration
//
// Trigger the run of a migration to the Grafana Cloud.
//
@@ -157,11 +170,80 @@ func (cma *CloudMigrationAPI) CreateMigration(c *contextmodel.ReqContext) respon
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) RunMigration(c *contextmodel.ReqContext) response.Response {
cloudMigrationRun, err := cma.cloudMigrationsService.RunMigration(c.Req.Context(), web.Params(c.Req)[":id"])
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.RunMigration")
defer span.End()
logger := cma.log.FromContext(ctx)
stringID := web.Params(c.Req)[":id"]
id, err := strconv.ParseInt(stringID, 10, 64)
if err != nil {
return response.Error(http.StatusInternalServerError, "migration run error", err)
return response.Error(http.StatusBadRequest, "id is invalid", err)
}
return response.JSON(http.StatusOK, cloudMigrationRun)
// Get migration to read the auth token
migration, err := cma.cloudMigrationService.GetMigration(ctx, id)
if err != nil {
return response.Error(http.StatusInternalServerError, "migration get error", err)
}
// get CMS path from the config
domain, err := cma.cloudMigrationService.ParseCloudMigrationConfig()
if err != nil {
return response.Error(http.StatusInternalServerError, "config parse error", err)
}
path := fmt.Sprintf("https://cms-dev-%s.%s/cloud-migrations/api/v1/migrate-data", migration.ClusterSlug, domain)
// Get migration data JSON
body, err := cma.cloudMigrationService.GetMigrationDataJSON(ctx, id)
if err != nil {
cma.log.Error("error getting the json request body for migration run", "err", err.Error())
return response.Error(http.StatusInternalServerError, "migration data get error", err)
}
req, err := http.NewRequest("POST", path, bytes.NewReader(body))
if err != nil {
cma.log.Error("error creating http request for cloud migration run", "err", err.Error())
return response.Error(http.StatusInternalServerError, "http request error", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", migration.StackID, migration.AuthToken))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
cma.log.Error("error sending http request for cloud migration run", "err", err.Error())
return response.Error(http.StatusInternalServerError, "http request error", err)
} else if resp.StatusCode >= 400 {
cma.log.Error("received error response for cloud migration run", "statusCode", resp.StatusCode)
return response.Error(http.StatusInternalServerError, "http request error", fmt.Errorf("http request error while migrating data"))
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("closing request body: %w", err)
}
}()
// read response so we can unmarshal it
respData, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error("reading response body: %w", err)
return response.Error(http.StatusInternalServerError, "reading migration run response", err)
}
var result cloudmigration.MigrateDataResponseDTO
if err := json.Unmarshal(respData, &result); err != nil {
logger.Error("unmarshalling response body: %w", err)
return response.Error(http.StatusInternalServerError, "unmarshalling migration run response", err)
}
_, err = cma.cloudMigrationService.SaveMigrationRun(ctx, &cloudmigration.CloudMigrationRun{
CloudMigrationUID: stringID,
Result: respData,
})
if err != nil {
response.Error(http.StatusInternalServerError, "migration run save error", err)
}
return response.JSON(http.StatusOK, result)
}
// swagger:parameters runCloudMigration
@@ -182,7 +264,10 @@ type RunCloudMigrationRequest struct {
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) GetMigrationRun(c *contextmodel.ReqContext) response.Response {
migrationStatus, err := cma.cloudMigrationsService.GetMigrationStatus(c.Req.Context(), web.Params(c.Req)[":id"], web.Params(c.Req)[":runID"])
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.GetMigrationRun")
defer span.End()
migrationStatus, err := cma.cloudMigrationService.GetMigrationStatus(ctx, web.Params(c.Req)[":id"], web.Params(c.Req)[":runID"])
if err != nil {
return response.Error(http.StatusInternalServerError, "migration status error", err)
}
@@ -212,12 +297,27 @@ type GetMigrationRunParams struct {
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) GetMigrationRunList(c *contextmodel.ReqContext) response.Response {
migrationStatus, err := cma.cloudMigrationsService.GetMigrationStatusList(c.Req.Context(), web.Params(c.Req)[":id"])
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.GetMigrationRunList")
defer span.End()
migrationStatuses, err := cma.cloudMigrationService.GetMigrationStatusList(ctx, web.Params(c.Req)[":id"])
if err != nil {
return response.Error(http.StatusInternalServerError, "migration status error", err)
}
runList := cloudmigration.CloudMigrationRunList{Runs: migrationStatus}
runList := cloudmigration.CloudMigrationRunList{Runs: []cloudmigration.MigrateDataResponseDTO{}}
for _, s := range migrationStatuses {
// attempt to bind the raw result to a list of response item DTOs
r := cloudmigration.MigrateDataResponseDTO{
Items: []cloudmigration.MigrateDataResponseItemDTO{},
}
if err := json.Unmarshal(s.Result, &r); err != nil {
return response.Error(http.StatusInternalServerError, "error unmarshalling migration response items", err)
}
r.RunID = s.ID
runList.Runs = append(runList.Runs, r)
}
return response.JSON(http.StatusOK, runList)
}
@@ -239,7 +339,18 @@ type GetCloudMigrationRunList struct {
// 403: forbiddenError
// 500: internalServerError
func (cma *CloudMigrationAPI) DeleteMigration(c *contextmodel.ReqContext) response.Response {
err := cma.cloudMigrationsService.DeleteMigration(c.Req.Context(), web.Params(c.Req)[":id"])
ctx, span := cma.tracer.Start(c.Req.Context(), "MigrationAPI.DeleteMigration")
defer span.End()
idStr := web.Params(c.Req)[":id"]
if idStr == "" {
return response.Error(http.StatusBadRequest, "missing migration id", fmt.Errorf("missing migration id"))
}
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
return response.Error(http.StatusBadRequest, "migration id should be numeric", fmt.Errorf("migration id should be numeric"))
}
_, err = cma.cloudMigrationService.DeleteMigration(ctx, id)
if err != nil {
return response.Error(http.StatusInternalServerError, "migration delete error", err)
}
@@ -257,7 +368,7 @@ type DeleteMigrationRequest struct {
// swagger:response cloudMigrationRunResponse
type CloudMigrationRunResponse struct {
// in: body
Body cloudmigration.CloudMigrationRun
Body cloudmigration.MigrateDataResponseDTO
}
// swagger:response cloudMigrationListResponse

View File

@@ -6,15 +6,17 @@ import (
type Service interface {
CreateToken(context.Context) (CreateAccessTokenResponse, error)
ValidateToken(context.Context, string) error
SaveEncryptedToken(context.Context, string) error
ValidateToken(context.Context, CloudMigration) error
// migration
GetMigration(context.Context, int64) (*CloudMigrationResponse, error)
GetMigration(context.Context, int64) (*CloudMigration, error)
GetMigrationList(context.Context) (*CloudMigrationListResponse, error)
CreateMigration(context.Context, CloudMigrationRequest) (*CloudMigrationResponse, error)
GetMigrationDataJSON(context.Context, int64) ([]byte, error)
UpdateMigration(context.Context, int64, CloudMigrationRequest) (*CloudMigrationResponse, error)
RunMigration(context.Context, string) (*CloudMigrationRun, error)
GetMigrationStatus(context.Context, string, string) (*CloudMigrationRun, error)
GetMigrationStatusList(context.Context, string) ([]CloudMigrationRun, error)
DeleteMigration(context.Context, string) error
GetMigrationStatusList(context.Context, string) ([]*CloudMigrationRun, error)
DeleteMigration(context.Context, int64) (*CloudMigration, error)
SaveMigrationRun(context.Context, *CloudMigrationRun) (string, error)
ParseCloudMigrationConfig() (string, error)
}

View File

@@ -1,10 +1,12 @@
package cloudmigrationimpl
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/grafana/grafana/pkg/api/routing"
@@ -13,9 +15,13 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/services/secrets"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
)
@@ -27,9 +33,13 @@ type Service struct {
log *log.ConcreteLogger
cfg *setting.Cfg
features featuremgmt.FeatureToggles
dsService datasources.DataSourceService
gcomService gcom.Service
features featuremgmt.FeatureToggles
dsService datasources.DataSourceService
gcomService gcom.Service
dashboardService dashboards.DashboardService
folderService folder.Service
secretsService secrets.Service
api *api.CloudMigrationAPI
tracer tracing.Tracer
@@ -54,23 +64,29 @@ func ProvideService(
features featuremgmt.FeatureToggles,
db db.DB,
dsService datasources.DataSourceService,
secretsService secrets.Service,
routeRegister routing.RouteRegister,
prom prometheus.Registerer,
tracer tracing.Tracer,
dashboardService dashboards.DashboardService,
folderService folder.Service,
) cloudmigration.Service {
if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) {
return &NoopServiceImpl{}
}
s := &Service{
store: &sqlStore{db: db},
log: log.New(LogPrefix),
cfg: cfg,
features: features,
dsService: dsService,
gcomService: gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}),
tracer: tracer,
metrics: newMetrics(),
store: &sqlStore{db: db, secretsService: secretsService},
log: log.New(LogPrefix),
cfg: cfg,
features: features,
dsService: dsService,
gcomService: gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}),
tracer: tracer,
metrics: newMetrics(),
secretsService: secretsService,
dashboardService: dashboardService,
folderService: folderService,
}
s.api = api.RegisterApi(routeRegister, s, tracer)
@@ -186,22 +202,61 @@ func (s *Service) findAccessPolicyByName(ctx context.Context, regionSlug, access
return nil, nil
}
func (s *Service) ValidateToken(ctx context.Context, token string) error {
// TODO: Implement method
func (s *Service) ValidateToken(ctx context.Context, cm cloudmigration.CloudMigration) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.ValidateToken")
defer span.End()
logger := s.log.FromContext(ctx)
// get CMS path from the config
domain, err := s.ParseCloudMigrationConfig()
if err != nil {
return fmt.Errorf("config parse error: %w", err)
}
path := fmt.Sprintf("https://cms-dev-%s.%s/cloud-migrations/api/v1/validate-key", cm.ClusterSlug, domain)
// validation is an empty POST to CMS with the authorization header included
req, err := http.NewRequest("POST", path, bytes.NewReader(nil))
if err != nil {
logger.Error("error creating http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
logger.Error("error sending http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("closing request body", "err", err.Error())
}
}()
if resp.StatusCode != 200 {
var errResp map[string]any
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
logger.Error("decoding error response", "err", err.Error())
} else {
return fmt.Errorf("token validation failure: %v", errResp)
}
}
return nil
}
func (s *Service) SaveEncryptedToken(ctx context.Context, token string) error {
// TODO: Implement method
return nil
}
func (s *Service) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetMigration")
defer span.End()
migration, err := s.store.GetMigration(ctx, id)
if err != nil {
return nil, err
}
func (s *Service) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigrationResponse, error) {
// commenting to fix linter, uncomment when this function is implemented
// ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetMigration")
// defer span.End()
return nil, nil
return migration, nil
}
func (s *Service) GetMigrationList(ctx context.Context) (*cloudmigration.CloudMigrationListResponse, error) {
@@ -237,16 +292,21 @@ func (s *Service) CreateMigration(ctx context.Context, cmd cloudmigration.CloudM
}
migration := token.ToMigration()
if err := s.store.CreateMigration(ctx, migration); err != nil {
// validate token against cms before saving
if err := s.ValidateToken(ctx, migration); err != nil {
return nil, fmt.Errorf("token validation: %w", err)
}
cm, err := s.store.CreateMigration(ctx, migration)
if err != nil {
return nil, fmt.Errorf("error creating migration: %w", err)
}
return &cloudmigration.CloudMigrationResponse{
ID: int64(token.Instance.StackID),
Stack: token.Instance.Slug,
// TODO replace this with the actual value once the storage piece is implemented
Created: time.Now(),
Updated: time.Now(),
ID: cm.ID,
Stack: token.Instance.Slug,
Created: cm.Created,
Updated: cm.Updated,
}, nil
}
@@ -255,26 +315,178 @@ func (s *Service) UpdateMigration(ctx context.Context, id int64, cm cloudmigrati
return nil, nil
}
func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration.CloudMigrationRun, error) {
// TODO: Implement method
return nil, nil
func (s *Service) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, error) {
var migrationDataSlice []cloudmigration.MigrateDataRequestItemDTO
// Data sources
dataSources, err := s.getDataSources(ctx, id)
if err != nil {
s.log.Error("Failed to get datasources", "err", err)
return nil, err
}
for _, ds := range dataSources {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItemDTO{
Type: cloudmigration.DatasourceDataType,
RefID: ds.UID,
Name: ds.Name,
Data: ds,
})
}
// Dashboards
dashboards, err := s.getDashboards(ctx, id)
if err != nil {
s.log.Error("Failed to get dashboards", "err", err)
return nil, err
}
for _, dashboard := range dashboards {
dashboard.Data.Del("id")
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItemDTO{
Type: cloudmigration.DashboardDataType,
RefID: dashboard.UID,
Name: dashboard.Title,
Data: map[string]any{"dashboard": dashboard.Data},
})
}
// Folders
folders, err := s.getFolders(ctx, id)
if err != nil {
s.log.Error("Failed to get folders", "err", err)
return nil, err
}
for _, f := range folders {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItemDTO{
Type: cloudmigration.FolderDataType,
RefID: f.UID,
Name: f.Title,
Data: f,
})
}
migrationData := cloudmigration.MigrateDataRequestDTO{
Items: migrationDataSlice,
}
result, err := json.Marshal(migrationData)
if err != nil {
s.log.Error("Failed to marshal datasources", "err", err)
return nil, err
}
return result, nil
}
func (s *Service) getDataSources(ctx context.Context, id int64) ([]datasources.AddDataSourceCommand, error) {
dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{})
if err != nil {
s.log.Error("Failed to get all datasources", "err", err)
return nil, err
}
result := []datasources.AddDataSourceCommand{}
for _, dataSource := range dataSources {
// Decrypt secure json to send raw credentials
decryptedData, err := s.secretsService.DecryptJsonData(ctx, dataSource.SecureJsonData)
if err != nil {
s.log.Error("Failed to decrypt secure json data", "err", err)
return nil, err
}
dataSourceCmd := datasources.AddDataSourceCommand{
OrgID: dataSource.OrgID,
Name: dataSource.Name,
Type: dataSource.Type,
Access: dataSource.Access,
URL: dataSource.URL,
User: dataSource.User,
Database: dataSource.Database,
BasicAuth: dataSource.BasicAuth,
BasicAuthUser: dataSource.BasicAuthUser,
WithCredentials: dataSource.WithCredentials,
IsDefault: dataSource.IsDefault,
JsonData: dataSource.JsonData,
SecureJsonData: decryptedData,
ReadOnly: dataSource.ReadOnly,
UID: dataSource.UID,
}
result = append(result, dataSourceCmd)
}
return result, err
}
func (s *Service) getFolders(ctx context.Context, id int64) ([]folder.Folder, error) {
reqCtx := contexthandler.FromContext(ctx)
folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{
SignedInUser: reqCtx.SignedInUser,
})
if err != nil {
return nil, err
}
var result []folder.Folder
for _, folder := range folders {
result = append(result, *folder)
}
return result, nil
}
func (s *Service) getDashboards(ctx context.Context, id int64) ([]dashboards.Dashboard, error) {
dashs, err := s.dashboardService.GetAllDashboards(ctx)
if err != nil {
return nil, err
}
var result []dashboards.Dashboard
for _, dashboard := range dashs {
result = append(result, *dashboard)
}
return result, nil
}
func (s *Service) SaveMigrationRun(ctx context.Context, cmr *cloudmigration.CloudMigrationRun) (string, error) {
cmr.Created = time.Now()
cmr.Updated = time.Now()
cmr.Finished = time.Now()
err := s.store.SaveMigrationRun(ctx, cmr)
if err != nil {
s.log.Error("Failed to save migration run", "err", err)
return "", err
}
return cmr.CloudMigrationUID, nil
}
func (s *Service) GetMigrationStatus(ctx context.Context, id string, runID string) (*cloudmigration.CloudMigrationRun, error) {
// TODO: Implement method
return nil, nil
cmr, err := s.store.GetMigrationStatus(ctx, id, runID)
if err != nil {
return nil, fmt.Errorf("retrieving migration status from db: %w", err)
}
return cmr, nil
}
func (s *Service) GetMigrationStatusList(ctx context.Context, id string) ([]cloudmigration.CloudMigrationRun, error) {
// TODO: Implement method
return nil, nil
func (s *Service) GetMigrationStatusList(ctx context.Context, migrationID string) ([]*cloudmigration.CloudMigrationRun, error) {
cmrs, err := s.store.GetMigrationStatusList(ctx, migrationID)
if err != nil {
return nil, fmt.Errorf("retrieving migration statuses from db: %w", err)
}
return cmrs, nil
}
func (s *Service) DeleteMigration(ctx context.Context, id string) error {
// TODO: Implement method
return nil
func (s *Service) DeleteMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
c, err := s.store.DeleteMigration(ctx, id)
if err != nil {
return c, fmt.Errorf("deleting migration from db: %w", err)
}
return c, nil
}
// func (s *Service) MigrateDatasources(ctx context.Context, request *cloudmigration.MigrateDatasourcesRequest) (*cloudmigration.MigrateDatasourcesResponse, error) {
// return s.store.MigrateDatasources(ctx, request)
// }
func (s *Service) ParseCloudMigrationConfig() (string, error) {
if s.cfg == nil {
return "", fmt.Errorf("cfg cannot be nil")
}
section := s.cfg.Raw.Section("cloud_migration")
domain := section.Key("domain").MustString("")
if domain == "" {
return "", fmt.Errorf("cloudmigration domain not set")
}
return domain, nil
}

View File

@@ -18,15 +18,11 @@ func (s *NoopServiceImpl) MigrateDatasources(ctx context.Context, request *cloud
func (s *NoopServiceImpl) CreateToken(ctx context.Context) (cloudmigration.CreateAccessTokenResponse, error) {
return cloudmigration.CreateAccessTokenResponse{}, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) ValidateToken(ctx context.Context, token string) error {
func (s *NoopServiceImpl) ValidateToken(ctx context.Context, cm cloudmigration.CloudMigration) error {
return cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) SaveEncryptedToken(ctx context.Context, token string) error {
return cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigrationResponse, error) {
func (s *NoopServiceImpl) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
@@ -42,18 +38,26 @@ func (s *NoopServiceImpl) UpdateMigration(ctx context.Context, id int64, cm clou
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) RunMigration(ctx context.Context, uid string) (*cloudmigration.CloudMigrationRun, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) GetMigrationStatus(ctx context.Context, id string, runID string) (*cloudmigration.CloudMigrationRun, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) GetMigrationStatusList(ctx context.Context, id string) ([]cloudmigration.CloudMigrationRun, error) {
func (s *NoopServiceImpl) GetMigrationStatusList(ctx context.Context, id string) ([]*cloudmigration.CloudMigrationRun, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) DeleteMigration(ctx context.Context, id string) error {
return cloudmigration.ErrFeatureDisabledError
func (s *NoopServiceImpl) DeleteMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) SaveMigrationRun(ctx context.Context, cmr *cloudmigration.CloudMigrationRun) (string, error) {
return "", cloudmigration.ErrInternalNotImplementedError
}
func (s *NoopServiceImpl) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) ParseCloudMigrationConfig() (string, error) {
return "", cloudmigration.ErrFeatureDisabledError
}

View File

@@ -7,7 +7,12 @@ import (
)
type store interface {
MigrateDatasources(context.Context, *cloudmigration.MigrateDatasourcesRequest) (*cloudmigration.MigrateDatasourcesResponse, error)
CreateMigration(ctx context.Context, token cloudmigration.CloudMigration) error
CreateMigration(ctx context.Context, token cloudmigration.CloudMigration) (*cloudmigration.CloudMigration, error)
GetMigration(context.Context, int64) (*cloudmigration.CloudMigration, error)
GetAllCloudMigrations(ctx context.Context) ([]*cloudmigration.CloudMigration, error)
DeleteMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error)
SaveMigrationRun(ctx context.Context, cmr *cloudmigration.CloudMigrationRun) error
GetMigrationStatus(ctx context.Context, id string, runID string) (*cloudmigration.CloudMigrationRun, error)
GetMigrationStatusList(ctx context.Context, migrationID string) ([]*cloudmigration.CloudMigrationRun, error)
}

View File

@@ -2,8 +2,12 @@ package cloudmigrationimpl
import (
"context"
"encoding/base64"
"fmt"
"strconv"
"time"
"github.com/grafana/grafana/pkg/services/secrets"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/infra/db"
@@ -11,27 +15,55 @@ import (
)
type sqlStore struct {
db db.DB
db db.DB
secretsService secrets.Service
}
func (ss *sqlStore) MigrateDatasources(ctx context.Context, request *cloudmigration.MigrateDatasourcesRequest) (*cloudmigration.MigrateDatasourcesResponse, error) {
return nil, cloudmigration.ErrInternalNotImplementedError
func (ss *sqlStore) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
var cm cloudmigration.CloudMigration
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
exist, err := sess.ID(id).Get(&cm)
if err != nil {
return err
}
if !exist {
return cloudmigration.ErrMigrationNotFound
}
return nil
})
if err := ss.decryptToken(ctx, &cm); err != nil {
return &cm, err
}
return &cm, err
}
func (ss *sqlStore) CreateMigration(ctx context.Context, migration cloudmigration.CloudMigration) error {
func (ss *sqlStore) SaveMigrationRun(ctx context.Context, cmr *cloudmigration.CloudMigrationRun) error {
return ss.db.WithDbSession(ctx, func(sess *db.Session) error {
_, err := sess.Insert(cmr)
return err
})
}
func (ss *sqlStore) CreateMigration(ctx context.Context, migration cloudmigration.CloudMigration) (*cloudmigration.CloudMigration, error) {
if err := ss.encryptToken(ctx, &migration); err != nil {
return nil, err
}
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
migration.Created = time.Now()
migration.Updated = time.Now()
_, err := sess.Insert(migration)
_, err := sess.Insert(&migration)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
return nil, err
}
return nil
return &migration, nil
}
func (ss *sqlStore) GetAllCloudMigrations(ctx context.Context) ([]*cloudmigration.CloudMigration, error) {
@@ -40,5 +72,98 @@ func (ss *sqlStore) GetAllCloudMigrations(ctx context.Context) ([]*cloudmigratio
if err != nil {
return nil, err
}
for i := 0; i < len(migrations); i++ {
m := migrations[i]
if err := ss.decryptToken(ctx, m); err != nil {
return migrations, err
}
}
return migrations, nil
}
func (ss *sqlStore) DeleteMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
var c cloudmigration.CloudMigration
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
exist, err := sess.ID(id).Get(&c)
if err != nil {
return err
}
if !exist {
return cloudmigration.ErrMigrationNotFound
}
affected, err := sess.Delete(&cloudmigration.CloudMigration{
ID: id,
})
if affected == 0 {
return cloudmigration.ErrMigrationNotDeleted.Errorf("0 affected rows for id %d", id)
}
return err
})
return &c, err
}
func (ss *sqlStore) GetMigrationStatus(ctx context.Context, migrationID string, runID string) (*cloudmigration.CloudMigrationRun, error) {
id, err := strconv.ParseInt(runID, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid runID: %s", runID)
}
cm := cloudmigration.CloudMigrationRun{
ID: id,
CloudMigrationUID: migrationID,
}
err = ss.db.WithDbSession(ctx, func(sess *db.Session) error {
exist, err := sess.Get(&cm)
if err != nil {
return err
}
if !exist {
return cloudmigration.ErrMigrationRunNotFound
}
return nil
})
return &cm, err
}
func (ss *sqlStore) GetMigrationStatusList(ctx context.Context, migrationID string) ([]*cloudmigration.CloudMigrationRun, error) {
var runs = make([]*cloudmigration.CloudMigrationRun, 0)
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
return sess.Find(&runs, &cloudmigration.CloudMigrationRun{
CloudMigrationUID: migrationID,
})
})
if err != nil {
return nil, err
}
return runs, nil
}
func (ss *sqlStore) encryptToken(ctx context.Context, cm *cloudmigration.CloudMigration) error {
s, err := ss.secretsService.Encrypt(ctx, []byte(cm.AuthToken), secrets.WithoutScope())
if err != nil {
return fmt.Errorf("encrypting auth token: %w", err)
}
cm.AuthToken = base64.StdEncoding.EncodeToString(s)
return nil
}
func (ss *sqlStore) decryptToken(ctx context.Context, cm *cloudmigration.CloudMigration) error {
decoded, err := base64.StdEncoding.DecodeString(cm.AuthToken)
if err != nil {
return fmt.Errorf("token could not be decoded")
}
t, err := ss.secretsService.Decrypt(ctx, decoded)
if err != nil {
return fmt.Errorf("decrypting auth token: %w", err)
}
cm.AuthToken = string(t)
return nil
}

View File

@@ -1,13 +1,8 @@
package cloudmigrationimpl
import (
"context"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/tests/testsuite"
)
@@ -15,42 +10,39 @@ func TestMain(m *testing.M) {
testsuite.Run(m)
}
func TestMigrateDatasources(t *testing.T) {
// TODO: Write this test
}
// TODO rewrite this to include encoding and decryption
// func TestGetAllCloudMigrations(t *testing.T) {
// testDB := db.InitTestDB(t)
// s := &sqlStore{db: testDB}
// ctx := context.Background()
func TestGetAllCloudMigrations(t *testing.T) {
testDB := db.InitTestDB(t)
s := &sqlStore{db: testDB}
ctx := context.Background()
// t.Run("get all cloud_migrations", func(t *testing.T) {
// // replace this with proper method when created
// _, err := testDB.GetSqlxSession().Exec(ctx, `
// INSERT INTO cloud_migration (id, auth_token, stack, stack_id, region_slug, cluster_slug, created, updated)
// VALUES (1, '12345', '11111', 11111, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000'),
// (2, '6789', '22222', 22222, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000'),
// (3, '777', '33333', 33333, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000');
// `)
// require.NoError(t, err)
t.Run("get all cloud_migrations", func(t *testing.T) {
// replace this with proper method when created
_, err := testDB.GetSqlxSession().Exec(ctx, `
INSERT INTO cloud_migration (id, auth_token, stack, stack_id, region_slug, cluster_slug, created, updated)
VALUES (1, '12345', '11111', 11111, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000'),
(2, '6789', '22222', 22222, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000'),
(3, '777', '33333', 33333, 'test', 'test', '2024-03-25 15:30:36.000', '2024-03-27 15:30:43.000');
`)
require.NoError(t, err)
value, err := s.GetAllCloudMigrations(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(value))
for _, m := range value {
switch m.ID {
case 1:
require.Equal(t, "11111", m.Stack)
require.Equal(t, "12345", m.AuthToken)
case 2:
require.Equal(t, "22222", m.Stack)
require.Equal(t, "6789", m.AuthToken)
case 3:
require.Equal(t, "33333", m.Stack)
require.Equal(t, "777", m.AuthToken)
default:
require.Fail(t, "ID value not expected: "+strconv.FormatInt(m.ID, 10))
}
}
})
}
// value, err := s.GetAllCloudMigrations(ctx)
// require.NoError(t, err)
// require.Equal(t, 3, len(value))
// for _, m := range value {
// switch m.ID {
// case 1:
// require.Equal(t, "11111", m.Stack)
// require.Equal(t, "12345", m.AuthToken)
// case 2:
// require.Equal(t, "22222", m.Stack)
// require.Equal(t, "6789", m.AuthToken)
// case 3:
// require.Equal(t, "33333", m.Stack)
// require.Equal(t, "777", m.AuthToken)
// default:
// require.Fail(t, "ID value not expected: "+strconv.FormatInt(m.ID, 10))
// }
// }
// })
// }

View File

@@ -9,11 +9,15 @@ import (
var (
ErrInternalNotImplementedError = errutil.Internal("cloudmigrations.notImplemented", errutil.WithPublicMessage("Internal server error"))
ErrFeatureDisabledError = errutil.Internal("cloudmigrations.disabled", errutil.WithPublicMessage("Cloud migrations are disabled on this instance"))
ErrMigrationNotFound = errutil.NotFound("cloudmigrations.migrationNotFound", errutil.WithPublicMessage("Migration not found"))
ErrMigrationRunNotFound = errutil.NotFound("cloudmigrations.migrationRunNotFound", errutil.WithPublicMessage("Migration run not found"))
ErrMigrationNotDeleted = errutil.Internal("cloudmigrations.migrationNotDeleted", errutil.WithPublicMessage("Migration not deleted"))
)
// cloud migration api dtos
type CloudMigration struct {
ID int64 `json:"id" xorm:"pk autoincr 'id'"`
AuthToken string `json:"authToken"`
AuthToken string `json:"-"`
Stack string `json:"stack"`
StackID int `json:"stackID" xorm:"stack_id"`
RegionSlug string `json:"regionSlug"`
@@ -41,17 +45,16 @@ type MigratedResource struct {
}
type CloudMigrationRun struct {
ID int64 `json:"id" xorm:"pk autoincr 'id'"`
CloudMigrationUID string `json:"uid" xorm:"cloud_migration_uid"`
Resources []MigratedResource `json:"items"`
Result MigrationResult `json:"result"`
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
Finished time.Time `json:"finished"`
ID int64 `json:"id" xorm:"pk autoincr 'id'"`
CloudMigrationUID string `json:"uid" xorm:"cloud_migration_uid"`
Result []byte `json:"result"` //store raw cms response body
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
Finished time.Time `json:"finished"`
}
type CloudMigrationRunList struct {
Runs []CloudMigrationRun `json:"runs"`
Runs []MigrateDataResponseDTO `json:"runs"`
}
// swagger:parameters createMigration
@@ -88,6 +91,8 @@ type MigrateDatasourcesResponseDTO struct {
DatasourcesMigrated int `json:"datasourcesMigrated"`
}
// access token
type CreateAccessTokenResponse struct {
Token string
}
@@ -117,3 +122,42 @@ type Base64HGInstance struct {
RegionSlug string
ClusterSlug string
}
// dtos for cms api
type MigrateDataType string
const (
DashboardDataType MigrateDataType = "DASHBOARD"
DatasourceDataType MigrateDataType = "DATASOURCE"
FolderDataType MigrateDataType = "FOLDER"
)
type MigrateDataRequestDTO struct {
Items []MigrateDataRequestItemDTO `json:"items"`
}
type MigrateDataRequestItemDTO struct {
Type MigrateDataType `json:"type"`
RefID string `json:"refId"`
Name string `json:"name"`
Data interface{} `json:"data"`
}
type ItemStatus string
const (
ItemStatusOK ItemStatus = "OK"
ItemStatusError ItemStatus = "ERROR"
)
type MigrateDataResponseDTO struct {
RunID int64 `json:"id"`
Items []MigrateDataResponseItemDTO `json:"items"`
}
type MigrateDataResponseItemDTO struct {
RefID string `json:"refId"`
Status ItemStatus `json:"status"`
Error string `json:"error,omitempty"`
}