CloudMigrations: Query Grafana Migration Status for status while the snapshot is in the cloud (#90314)

* implement querying gms for snapshot status

* add some documentation

* provide snapshot resources after snapshot is created

* add rate limiting to backend

* fix compilation error

* fix typo

* add unit tests

* finish merge

* lint

* swagger gen

* more testing

* remove duplicate test

* address a couple PR comments

* update switch statement to a map

* add timeouts to gms client through the http client

* remove extra whitespace

* put method back where it was so the PR is less confusing

* fix tests

* add todo

* fix final unit test
This commit is contained in:
Michael Mandrus 2024-07-15 09:22:57 -04:00 committed by GitHub
parent 5beaae8561
commit 542a1bf3ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 532 additions and 125 deletions

View File

@ -1934,6 +1934,10 @@ is_target = false
gcom_api_token = ""
# How long to wait for a request sent to gms to start a snapshot to complete
start_snapshot_timeout = 5s
# How long to wait for a request sent to gms to validate a key to complete
validate_key_timeout = 5s
# How long to wait for a request sent to gms to get a snapshot status to complete
get_snapshot_status_timeout = 5s
# How long to wait for a request to fetch an instance to complete
fetch_instance_timeout = 5s
# How long to wait for a request to create an access policy to complete

View File

@ -1858,3 +1858,28 @@ timeout = 30s
[public_dashboards]
# Set to false to disable public dashboards
;enabled = true
###################################### Cloud Migration ######################################
[cloud_migration]
# Set to true to enable target-side migration UI
;is_target = false
# Token used to send requests to grafana com
;gcom_api_token = ""
# How long to wait for a request sent to gms to start a snapshot to complete
;start_snapshot_timeout = 5s
# How long to wait for a request sent to gms to validate a key to complete
;validate_key_timeout = 5s
# How long to wait for a request sent to gms to get a snapshot status to complete
;get_snapshot_status_timeout = 5s
# How long to wait for a request to fetch an instance to complete
;fetch_instance_timeout = 5s
# How long to wait for a request to create an access policy to complete
;create_access_policy_timeout = 5s
# How long to wait for a request to create to fetch an access policy to complete
;fetch_access_policy_timeout = 5s
# How long to wait for a request to create to delete an access policy to complete
;delete_access_policy_timeout = 5s
# The domain name used to access cms
;domain = grafana-dev.net
# Folder used to store snapshot files. Defaults to the home dir
;snapshot_folder = ""

View File

@ -471,7 +471,7 @@ func TestCloudMigrationAPI_GetSnapshot(t *testing.T) {
requestUrl: "/api/cloudmigration/migration/1234/snapshot/1",
basicRole: org.RoleAdmin,
expectedHttpResult: http.StatusOK,
expectedBody: `{"uid":"fake_uid","status":"UNKNOWN","sessionUid":"1234","created":"0001-01-01T00:00:00Z","finished":"0001-01-01T00:00:00Z","results":[],"stats":{"types":{},"statuses":{}}}`,
expectedBody: `{"uid":"fake_uid","status":"CREATING","sessionUid":"1234","created":"0001-01-01T00:00:00Z","finished":"0001-01-01T00:00:00Z","results":[],"stats":{"types":{},"statuses":{}}}`,
},
{
desc: "should return 403 if no used is not admin",
@ -521,7 +521,7 @@ func TestCloudMigrationAPI_GetSnapshotList(t *testing.T) {
requestUrl: "/api/cloudmigration/migration/1234/snapshots",
basicRole: org.RoleAdmin,
expectedHttpResult: http.StatusOK,
expectedBody: `{"snapshots":[{"uid":"fake_uid","status":"UNKNOWN","sessionUid":"1234","created":"0001-01-01T00:00:00Z","finished":"0001-01-01T00:00:00Z"},{"uid":"fake_uid","status":"UNKNOWN","sessionUid":"1234","created":"0001-01-01T00:00:00Z","finished":"0001-01-01T00:00:00Z"}]}`,
expectedBody: `{"snapshots":[{"uid":"fake_uid","status":"CREATING","sessionUid":"1234","created":"0001-01-01T00:00:00Z","finished":"0001-01-01T00:00:00Z"},{"uid":"fake_uid","status":"CREATING","sessionUid":"1234","created":"0001-01-01T00:00:00Z","finished":"0001-01-01T00:00:00Z"}]}`,
},
{
desc: "should return 403 if no used is not admin",

View File

@ -224,14 +224,13 @@ const (
SnapshotStatusPendingProcessing SnapshotStatus = "PENDING_PROCESSING"
SnapshotStatusProcessing SnapshotStatus = "PROCESSING"
SnapshotStatusFinished SnapshotStatus = "FINISHED"
SnapshotStatusCanceled SnapshotStatus = "CANCELED"
SnapshotStatusError SnapshotStatus = "ERROR"
SnapshotStatusUnknown SnapshotStatus = "UNKNOWN"
)
func fromSnapshotStatus(status cloudmigration.SnapshotStatus) SnapshotStatus {
switch status {
case cloudmigration.SnapshotStatusInitializing:
return SnapshotStatusInitializing
case cloudmigration.SnapshotStatusCreating:
return SnapshotStatusCreating
case cloudmigration.SnapshotStatusPendingUpload:
@ -244,6 +243,8 @@ func fromSnapshotStatus(status cloudmigration.SnapshotStatus) SnapshotStatus {
return SnapshotStatusProcessing
case cloudmigration.SnapshotStatusFinished:
return SnapshotStatusFinished
case cloudmigration.SnapshotStatusCanceled:
return SnapshotStatusCanceled
case cloudmigration.SnapshotStatusError:
return SnapshotStatusError
default:

View File

@ -104,13 +104,11 @@ func ProvideService(
s.objectStorage = objectstorage.NewS3()
if !cfg.CloudMigration.IsDeveloperMode {
// get GMS path from the config
domain, err := s.parseCloudMigrationConfig()
c, err := gmsclient.NewGMSClient(cfg)
if err != nil {
return nil, fmt.Errorf("config parse error: %w", err)
return nil, fmt.Errorf("initializing GMS client: %w", err)
}
s.gmsClient = gmsclient.NewGMSClient(domain)
s.gmsClient = c
s.gcomService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken})
} else {
s.gmsClient = gmsclient.NewInMemoryClient()
@ -474,10 +472,8 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
return nil, fmt.Errorf("fetching migration session for uid %s: %w", sessionUid, err)
}
// query gms to establish new snapshot
timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.StartSnapshotTimeout)
defer cancel()
initResp, err := s.gmsClient.StartSnapshot(timeoutCtx, *session)
// query gms to establish new snapshot s.cfg.CloudMigration.StartSnapshotTimeout
initResp, err := s.gmsClient.StartSnapshot(ctx, *session)
if err != nil {
return nil, fmt.Errorf("initializing snapshot with GMS for session %s: %w", sessionUid, err)
}
@ -489,7 +485,7 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
snapshot := cloudmigration.CloudMigrationSnapshot{
UID: util.GenerateShortUID(),
SessionUID: sessionUid,
Status: cloudmigration.SnapshotStatusInitializing,
Status: cloudmigration.SnapshotStatusCreating,
EncryptionKey: initResp.EncryptionKey,
UploadURL: initResp.UploadURL,
GMSSnapshotUID: initResp.SnapshotID,
@ -532,24 +528,43 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
// ask GMS for status if it's in the cloud
snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot)
if err != nil {
return nil, fmt.Errorf("error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s", sessionUid, snapshotUid)
return snapshot, fmt.Errorf("error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s", sessionUid, snapshotUid)
}
if snapshotMeta.Status == cloudmigration.SnapshotStatusFinished {
// we need to update the snapshot in our db before reporting anything finished to the client
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: cloudmigration.SnapshotStatusFinished,
Resources: snapshotMeta.Resources,
}); err != nil {
return nil, fmt.Errorf("error updating snapshot status: %w", err)
}
if snapshotMeta.State == cloudmigration.SnapshotStateUnknown {
// If a status from Grafana Migration Service is unavailable, return the snapshot as-is
return snapshot, nil
}
localStatus, ok := gmsStateToLocalStatus[snapshotMeta.State]
if !ok {
s.log.Error("unexpected GMS snapshot state: %s", snapshotMeta.State)
return snapshot, nil
}
// We need to update the snapshot in our db before reporting anything
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: localStatus,
Resources: snapshotMeta.Results,
}); err != nil {
return nil, fmt.Errorf("error updating snapshot status: %w", err)
}
snapshot.Status = localStatus
snapshot.Resources = append(snapshot.Resources, snapshotMeta.Results...)
}
return snapshot, nil
}
var gmsStateToLocalStatus map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus = map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus{
cloudmigration.SnapshotStateInitialized: cloudmigration.SnapshotStatusPendingProcessing, // GMS has not yet received a notification for the data
cloudmigration.SnapshotStateProcessing: cloudmigration.SnapshotStatusProcessing, // GMS has received a notification and is migrating the data
cloudmigration.SnapshotStateFinished: cloudmigration.SnapshotStatusFinished, // GMS has completed the migration - all resources were attempted to be migrated
cloudmigration.SnapshotStateCanceled: cloudmigration.SnapshotStatusCanceled, // GMS has processed a cancelation request. Snapshot cancelation is not supported yet.
cloudmigration.SnapshotStateError: cloudmigration.SnapshotStatusError, // Something unrecoverable has occurred in the migration process.
}
func (s *Service) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSnapshotList")
defer span.End()
@ -599,15 +614,3 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error {
panic("not implemented")
}
func (s *Service) parseCloudMigrationConfig() (string, error) {
if s.cfg == nil {
return "", fmt.Errorf("cfg cannot be nil")
}
section := s.cfg.Raw.Section("cloud_migration")
domain := section.Key("domain").MustString("")
if domain == "" {
return "", fmt.Errorf("cloudmigration domain not set")
}
return domain, nil
}

View File

@ -31,12 +31,14 @@ import (
)
func Test_NoopServiceDoesNothing(t *testing.T) {
t.Parallel()
s := &NoopServiceImpl{}
_, e := s.CreateToken(context.Background())
assert.ErrorIs(t, e, cloudmigration.ErrFeatureDisabledError)
}
func Test_CreateGetAndDeleteToken(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false)
createResp, err := s.CreateToken(context.Background())
@ -59,6 +61,7 @@ func Test_CreateGetAndDeleteToken(t *testing.T) {
}
func Test_CreateGetRunMigrationsAndRuns(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, true)
createTokenResp, err := s.CreateToken(context.Background())
@ -112,6 +115,245 @@ func Test_CreateGetRunMigrationsAndRuns(t *testing.T) {
require.NotNil(t, createResp.UID, delMigResp.UID)
}
func Test_GetSnapshotStatusFromGMS(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
gmsClientMock := &gmsClientMock{}
s.gmsClient = gmsClientMock
// Insert a session and snapshot into the database before we start
sess, err := s.store.CreateMigrationSession(context.Background(), cloudmigration.CloudMigrationSession{})
require.NoError(t, err)
uid, err := s.store.CreateSnapshot(context.Background(), cloudmigration.CloudMigrationSnapshot{
UID: "test uid",
SessionUID: sess.UID,
Status: cloudmigration.SnapshotStatusCreating,
GMSSnapshotUID: "gms uid",
})
require.NoError(t, err)
assert.Equal(t, "test uid", uid)
// Make sure status is coming from the db only
snapshot, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)
assert.Equal(t, 0, gmsClientMock.getStatusCalled)
// Make the status pending processing and ensure GMS gets called
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: cloudmigration.SnapshotStatusPendingProcessing,
})
assert.NoError(t, err)
cleanupFunc := func() {
gmsClientMock.getStatusCalled = 0
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: cloudmigration.SnapshotStatusPendingProcessing,
})
assert.NoError(t, err)
}
t.Run("test case: gms snapshot initialized", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateInitialized,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusPendingProcessing, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
t.Run("test case: gms snapshot processing", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateProcessing,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusProcessing, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
t.Run("test case: gms snapshot finished", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateFinished,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
t.Run("test case: gms snapshot canceled", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateCanceled,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusCanceled, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
t.Run("test case: gms snapshot error", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateError,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusError, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
t.Run("test case: gms snapshot unknown", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateUnknown,
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
// snapshot status should remain unchanged
assert.Equal(t, cloudmigration.SnapshotStatusPendingProcessing, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
t.Cleanup(cleanupFunc)
})
t.Run("GMS results applied to local snapshot", func(t *testing.T) {
gmsClientMock.getSnapshotResponse = &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateFinished,
Results: []cloudmigration.CloudMigrationResource{
{
Type: cloudmigration.DatasourceDataType,
RefID: "A",
Status: cloudmigration.ItemStatusError,
Error: "fake",
},
},
}
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled)
assert.Len(t, snapshot.Resources, 1)
assert.Equal(t, gmsClientMock.getSnapshotResponse.Results[0], snapshot.Resources[0])
// ensure it is persisted
snapshot, err = s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
ResultPage: 1,
ResultLimit: 100,
})
assert.NoError(t, err)
assert.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
assert.Equal(t, 1, gmsClientMock.getStatusCalled) // shouldn't have queried GMS again now that it is finished
assert.Len(t, snapshot.Resources, 1)
assert.Equal(t, "A", snapshot.Resources[0].RefID)
assert.Equal(t, "fake", snapshot.Resources[0].Error)
t.Cleanup(cleanupFunc)
})
}
func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
gmsClientMock := &gmsClientMock{
getSnapshotResponse: &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateFinished,
},
}
s.gmsClient = gmsClientMock
// Insert a snapshot into the database before we start
sess, err := s.store.CreateMigrationSession(context.Background(), cloudmigration.CloudMigrationSession{})
require.NoError(t, err)
uid, err := s.store.CreateSnapshot(context.Background(), cloudmigration.CloudMigrationSnapshot{
UID: uuid.NewString(),
SessionUID: sess.UID,
Status: cloudmigration.SnapshotStatusCreating,
GMSSnapshotUID: "gms uid",
})
require.NoError(t, err)
// make sure GMS is not called when snapshot is creating, pending upload, uploading, finished, canceled, or errored
for _, status := range []cloudmigration.SnapshotStatus{
cloudmigration.SnapshotStatusCreating,
cloudmigration.SnapshotStatusPendingUpload,
cloudmigration.SnapshotStatusUploading,
cloudmigration.SnapshotStatusFinished,
cloudmigration.SnapshotStatusCanceled,
cloudmigration.SnapshotStatusError,
} {
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: status,
})
assert.NoError(t, err)
_, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, 0, gmsClientMock.getStatusCalled)
}
// make sure GMS is called when snapshot is pending processing or processing
for i, status := range []cloudmigration.SnapshotStatus{
cloudmigration.SnapshotStatusPendingProcessing,
cloudmigration.SnapshotStatusProcessing,
} {
err = s.store.UpdateSnapshot(context.Background(), cloudmigration.UpdateSnapshotCmd{
UID: uid,
Status: status,
})
assert.NoError(t, err)
_, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
assert.Equal(t, i+1, gmsClientMock.getStatusCalled)
}
}
func ctxWithSignedInUser() context.Context {
c := &contextmodel.ReqContext{
SignedInUser: &user.SignedInUser{OrgID: 1},
@ -136,8 +378,8 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
require.NoError(t, err)
_, err = section.NewKey("domain", "localhost:1234")
require.NoError(t, err)
// dont know if this is the best, but dont want to refactor at the moment
cfg.CloudMigration.IsDeveloperMode = true
cfg.CloudMigration.IsDeveloperMode = true // ensure local implementations are used
cfg.CloudMigration.SnapshotFolder = filepath.Join(os.TempDir(), uuid.NewString())
dashboardService := dashboards.NewFakeDashboardService(t)
@ -176,3 +418,30 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
return s
}
type gmsClientMock struct {
validateKeyCalled int
startSnapshotCalled int
getStatusCalled int
getSnapshotResponse *cloudmigration.GetSnapshotStatusResponse
}
func (m *gmsClientMock) ValidateKey(_ context.Context, _ cloudmigration.CloudMigrationSession) error {
m.validateKeyCalled++
return nil
}
func (m *gmsClientMock) MigrateData(_ context.Context, _ cloudmigration.CloudMigrationSession, _ cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *gmsClientMock) StartSnapshot(_ context.Context, _ cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) {
m.startSnapshotCalled++
return nil, nil
}
func (m *gmsClientMock) GetSnapshotStatus(_ context.Context, _ cloudmigration.CloudMigrationSession, _ cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error) {
m.getStatusCalled++
return m.getSnapshotResponse, nil
}

View File

@ -137,7 +137,7 @@ func (m FakeServiceImpl) CreateSnapshot(ctx context.Context, user *user.SignedIn
return &cloudmigration.CloudMigrationSnapshot{
UID: "fake_uid",
SessionUID: sessionUid,
Status: cloudmigration.SnapshotStatusUnknown,
Status: cloudmigration.SnapshotStatusCreating,
}, nil
}
@ -148,7 +148,7 @@ func (m FakeServiceImpl) GetSnapshot(ctx context.Context, query cloudmigration.G
return &cloudmigration.CloudMigrationSnapshot{
UID: "fake_uid",
SessionUID: "fake_uid",
Status: cloudmigration.SnapshotStatusUnknown,
Status: cloudmigration.SnapshotStatusCreating,
}, nil
}
@ -160,12 +160,12 @@ func (m FakeServiceImpl) GetSnapshotList(ctx context.Context, query cloudmigrati
{
UID: "fake_uid",
SessionUID: query.SessionUID,
Status: cloudmigration.SnapshotStatusUnknown,
Status: cloudmigration.SnapshotStatusCreating,
},
{
UID: "fake_uid",
SessionUID: query.SessionUID,
Status: cloudmigration.SnapshotStatusUnknown,
Status: cloudmigration.SnapshotStatusCreating,
},
}, nil
}

View File

@ -190,6 +190,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
return fmt.Errorf("fetching migration data: %w", err)
}
localSnapshotResource := make([]cloudmigration.CloudMigrationResource, len(migrationData.Items))
resourcesGroupedByType := make(map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, 0)
for _, item := range migrationData.Items {
resourcesGroupedByType[item.Type] = append(resourcesGroupedByType[item.Type], snapshot.MigrateDataRequestItemDTO{
@ -198,6 +199,11 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
Name: item.Name,
Data: item.Data,
})
localSnapshotResource = append(localSnapshotResource, cloudmigration.CloudMigrationResource{
Type: item.Type,
RefID: item.RefID,
Status: cloudmigration.ItemStatusPending,
})
}
for _, resourceType := range []cloudmigration.MigrateDataType{
@ -222,8 +228,9 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
// update snapshot status to pending upload with retry
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusPendingUpload,
UID: snapshotMeta.UID,
Status: cloudmigration.SnapshotStatusPendingUpload,
Resources: localSnapshotResource,
})
return retryer.FuncComplete, err
}, 10, time.Millisecond*100, time.Second*10); err != nil {

View File

@ -36,6 +36,9 @@ func (ss *sqlStore) GetMigrationSessionByUID(ctx context.Context, uid string) (*
}
return nil
})
if err != nil {
return nil, err
}
if err := ss.decryptToken(ctx, &cm); err != nil {
return &cm, err
@ -165,7 +168,6 @@ func (ss *sqlStore) CreateSnapshot(ctx context.Context, snapshot cloudmigration.
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
snapshot.Created = time.Now()
snapshot.Updated = time.Now()
snapshot.UID = util.GenerateShortUID()
_, err := sess.Insert(&snapshot)
if err != nil {
@ -271,6 +273,8 @@ func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.Li
return snapshots, nil
}
// CreateUpdateSnapshotResources either updates a migration resource for a snapshot, or creates it if it does not exist
// If the uid is not known, it uses snapshot_uid + resource_uid as a lookup
func (ss *sqlStore) CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
// ensure snapshot_uids are consistent so that we can use them to query when uid isn't known
for i := 0; i < len(resources); i++ {

View File

@ -173,7 +173,7 @@ func Test_SnapshotManagement(t *testing.T) {
// create a snapshot
cmr := cloudmigration.CloudMigrationSnapshot{
SessionUID: sessionUid,
Status: "initializing",
Status: cloudmigration.SnapshotStatusCreating,
}
snapshotUid, err := s.CreateSnapshot(ctx, cmr)
@ -183,7 +183,7 @@ func Test_SnapshotManagement(t *testing.T) {
//retrieve it from the db
snapshot, err := s.GetSnapshotByUID(ctx, snapshotUid, 0, 0)
require.NoError(t, err)
require.Equal(t, cloudmigration.SnapshotStatusInitializing, string(snapshot.Status))
require.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)
// update its status
err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{UID: snapshotUid, Status: cloudmigration.SnapshotStatusCreating})
@ -192,7 +192,7 @@ func Test_SnapshotManagement(t *testing.T) {
//retrieve it again
snapshot, err = s.GetSnapshotByUID(ctx, snapshotUid, 0, 0)
require.NoError(t, err)
require.Equal(t, cloudmigration.SnapshotStatusCreating, string(snapshot.Status))
require.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)
// lists snapshots and ensures it's in there
snapshots, err := s.GetSnapshotList(ctx, cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Page: 1, Limit: 100})

View File

@ -10,7 +10,7 @@ type Client interface {
ValidateKey(context.Context, cloudmigration.CloudMigrationSession) error
MigrateData(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error)
StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error)
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error)
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error)
}
const logPrefix = "cloudmigration.gmsclient"

View File

@ -1,4 +1,3 @@
// TODO: Move these to a shared library in common with GMS
package gmsclient
type MigrateDataType string

View File

@ -9,29 +9,38 @@ import (
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/setting"
)
// NewGMSClient returns an implementation of Client that queries GrafanaMigrationService
func NewGMSClient(domain string) Client {
return &gmsClientImpl{
domain: domain,
log: log.New(logPrefix),
func NewGMSClient(cfg *setting.Cfg) (Client, error) {
if cfg.CloudMigration.GMSDomain == "" {
return nil, fmt.Errorf("missing GMS domain")
}
return &gmsClientImpl{
cfg: cfg,
log: log.New(logPrefix),
}, nil
}
type gmsClientImpl struct {
domain string
log *log.ConcreteLogger
cfg *setting.Cfg
log *log.ConcreteLogger
getStatusMux sync.Mutex
getStatusLastQueried time.Time
}
func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.CloudMigrationSession) (err error) {
logger := c.log.FromContext(ctx)
// TODO update service url to gms
path := fmt.Sprintf("%s/api/v1/validate-key", buildBasePath(c.domain, cm.ClusterSlug))
// TODO: there is a lot of boilerplate code in these methods, we should consolidate them when we have a gardening period
path := fmt.Sprintf("%s/api/v1/validate-key", c.buildBasePath(cm.ClusterSlug))
// validation is an empty POST to GMS with the authorization header included
req, err := http.NewRequest("POST", path, bytes.NewReader(nil))
@ -42,7 +51,9 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken))
client := &http.Client{}
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSValidateKeyTimeout,
}
resp, err := client.Do(req)
if err != nil {
logger.Error("error sending http request for token validation", "err", err.Error())
@ -62,11 +73,12 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
return nil
}
// Deprecated
func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.CloudMigrationSession, request cloudmigration.MigrateDataRequest) (result *cloudmigration.MigrateDataResponse, err error) {
logger := c.log.FromContext(ctx)
// TODO update service url to gms
path := fmt.Sprintf("%s/api/v1/migrate-data", buildBasePath(c.domain, cm.ClusterSlug))
path := fmt.Sprintf("%s/api/v1/migrate-data", c.buildBasePath(cm.ClusterSlug))
reqDTO := convertRequestToDTO(request)
body, err := json.Marshal(reqDTO)
@ -111,7 +123,7 @@ func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.Cloud
}
func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (out *cloudmigration.StartSnapshotResponse, err error) {
path := fmt.Sprintf("%s/api/v1/start-snapshot", buildBasePath(c.domain, session.ClusterSlug))
path := fmt.Sprintf("%s/api/v1/start-snapshot", c.buildBasePath(session.ClusterSlug))
// Send the request to cms with the associated auth token
req, err := http.NewRequest(http.MethodPost, path, nil)
@ -122,7 +134,9 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{}
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSStartSnapshotTimeout,
}
resp, err := client.Do(req)
if err != nil {
c.log.Error("error sending http request to start snapshot", "err", err.Error())
@ -148,8 +162,56 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio
return &result, nil
}
func (c *gmsClientImpl) GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) {
panic("not implemented")
func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error) {
c.getStatusMux.Lock()
defer c.getStatusMux.Unlock()
logger := c.log.FromContext(ctx)
path := fmt.Sprintf("%s/api/v1/status/%s/status", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID)
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodGet, path, nil)
if err != nil {
c.log.Error("error creating http request to get snapshot status", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSGetSnapshotStatusTimeout,
}
c.getStatusLastQueried = time.Now()
resp, err := client.Do(req)
if err != nil {
c.log.Error("error sending http request to get snapshot status", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
} else if resp.StatusCode >= 400 {
c.log.Error("received error response to get snapshot status", "statusCode", resp.StatusCode)
return nil, fmt.Errorf("http request error: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("closing request body: %w", err)
}
}()
var result cloudmigration.GetSnapshotStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.Error("unmarshalling response body: %w", err)
return nil, fmt.Errorf("unmarshalling get snapshot status response: %w", err)
}
return &result, nil
}
func (c *gmsClientImpl) buildBasePath(clusterSlug string) string {
domain := c.cfg.CloudMigration.GMSDomain
if strings.HasPrefix(domain, "http://localhost") {
return domain
}
return fmt.Sprintf("https://cms-%s.%s/cloud-migrations", clusterSlug, domain)
}
func convertRequestToDTO(request cloudmigration.MigrateDataRequest) MigrateDataRequestDTO {
@ -185,10 +247,3 @@ func convertResponseFromDTO(result MigrateDataResponseDTO) cloudmigration.Migrat
Items: items,
}
}
func buildBasePath(domain, clusterSlug string) string {
if strings.HasPrefix(domain, "http://localhost") {
return domain
}
return fmt.Sprintf("https://cms-%s.%s/cloud-migrations", clusterSlug, domain)
}

View File

@ -3,12 +3,31 @@ package gmsclient
import (
"testing"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func Test_buildBasePath(t *testing.T) {
t.Parallel()
// Domain is required
_, err := NewGMSClient(&setting.Cfg{
CloudMigration: setting.CloudMigrationSettings{
GMSDomain: "",
},
})
require.Error(t, err)
// Domain is required
c, err := NewGMSClient(&setting.Cfg{
CloudMigration: setting.CloudMigrationSettings{
GMSDomain: "non-empty",
},
})
require.NoError(t, err)
client := c.(*gmsClientImpl)
tests := []struct {
description string
domain string
@ -30,7 +49,8 @@ func Test_buildBasePath(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
assert.Equal(t, tt.expected, buildBasePath(tt.domain, tt.clusterSlug))
client.cfg.CloudMigration.GMSDomain = tt.domain
assert.Equal(t, tt.expected, client.buildBasePath(tt.clusterSlug))
})
}
}

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"time"
cryptoRand "crypto/rand"
@ -68,33 +67,28 @@ func (c *memoryClientImpl) StartSnapshot(context.Context, cloudmigration.CloudMi
return c.snapshot, nil
}
func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) {
results := []cloudmigration.CloudMigrationResource{
{
Type: cloudmigration.DashboardDataType,
RefID: "dash1",
Status: cloudmigration.ItemStatusOK,
},
{
Type: cloudmigration.DatasourceDataType,
RefID: "ds1",
Status: cloudmigration.ItemStatusError,
Error: "fake error",
},
{
Type: cloudmigration.FolderDataType,
RefID: "folder1",
Status: cloudmigration.ItemStatusOK,
func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (*cloudmigration.GetSnapshotStatusResponse, error) {
gmsResp := &cloudmigration.GetSnapshotStatusResponse{
State: cloudmigration.SnapshotStateFinished,
Results: []cloudmigration.CloudMigrationResource{
{
Type: cloudmigration.DashboardDataType,
RefID: "dash1",
Status: cloudmigration.ItemStatusOK,
},
{
Type: cloudmigration.DatasourceDataType,
RefID: "ds1",
Status: cloudmigration.ItemStatusError,
Error: "fake error",
},
{
Type: cloudmigration.FolderDataType,
RefID: "folder1",
Status: cloudmigration.ItemStatusOK,
},
},
}
// just fake an entire response
gmsSnapshot := cloudmigration.CloudMigrationSnapshot{
Status: cloudmigration.SnapshotStatusFinished,
GMSSnapshotUID: "gmssnapshotuid",
Resources: results,
Finished: time.Now(),
}
return &gmsSnapshot, nil
return gmsResp, nil
}

View File

@ -55,25 +55,24 @@ type CloudMigrationSnapshot struct {
type SnapshotStatus string
const (
SnapshotStatusInitializing = "initializing"
SnapshotStatusCreating = "creating"
SnapshotStatusPendingUpload = "pending_upload"
SnapshotStatusUploading = "uploading"
SnapshotStatusPendingProcessing = "pending_processing"
SnapshotStatusProcessing = "processing"
SnapshotStatusFinished = "finished"
SnapshotStatusError = "error"
SnapshotStatusUnknown = "unknown"
SnapshotStatusCreating SnapshotStatus = "creating"
SnapshotStatusPendingUpload SnapshotStatus = "pending_upload"
SnapshotStatusUploading SnapshotStatus = "uploading"
SnapshotStatusPendingProcessing SnapshotStatus = "pending_processing"
SnapshotStatusProcessing SnapshotStatus = "processing"
SnapshotStatusFinished SnapshotStatus = "finished"
SnapshotStatusCanceled SnapshotStatus = "canceled"
SnapshotStatusError SnapshotStatus = "error"
)
type CloudMigrationResource struct {
ID int64 `xorm:"pk autoincr 'id'"`
UID string `xorm:"uid"`
Type MigrateDataType `xorm:"resource_type"`
RefID string `xorm:"resource_uid"`
Status ItemStatus `xorm:"status"`
Error string `xorm:"error_string"`
Type MigrateDataType `xorm:"resource_type" json:"type"`
RefID string `xorm:"resource_uid" json:"refId"`
Status ItemStatus `xorm:"status" json:"status"`
Error string `xorm:"error_string" json:"error"`
SnapshotUID string `xorm:"snapshot_uid"`
}
@ -212,3 +211,20 @@ type StartSnapshotResponse struct {
UploadURL string `json:"uploadURL"`
EncryptionKey string `json:"encryptionKey"`
}
// Based on Grafana Migration Service DTOs
type GetSnapshotStatusResponse struct {
State SnapshotState `json:"state"`
Results []CloudMigrationResource `json:"results"`
}
type SnapshotState string
const (
SnapshotStateInitialized SnapshotState = "INITIALIZED"
SnapshotStateProcessing SnapshotState = "PROCESSING"
SnapshotStateFinished SnapshotState = "FINISHED"
SnapshotStateCanceled SnapshotState = "CANCELED"
SnapshotStateError SnapshotState = "ERROR"
SnapshotStateUnknown SnapshotState = "UNKNOWN"
)

View File

@ -6,18 +6,21 @@ import (
)
type CloudMigrationSettings struct {
IsTarget bool
GcomAPIToken string
SnapshotFolder string
StartSnapshotTimeout time.Duration
FetchInstanceTimeout time.Duration
CreateAccessPolicyTimeout time.Duration
FetchAccessPolicyTimeout time.Duration
DeleteAccessPolicyTimeout time.Duration
ListTokensTimeout time.Duration
CreateTokenTimeout time.Duration
DeleteTokenTimeout time.Duration
TokenExpiresAfter time.Duration
IsTarget bool
GcomAPIToken string
SnapshotFolder string
GMSDomain string
GMSStartSnapshotTimeout time.Duration
GMSGetSnapshotStatusTimeout time.Duration
GMSValidateKeyTimeout time.Duration
FetchInstanceTimeout time.Duration
CreateAccessPolicyTimeout time.Duration
FetchAccessPolicyTimeout time.Duration
DeleteAccessPolicyTimeout time.Duration
ListTokensTimeout time.Duration
CreateTokenTimeout time.Duration
DeleteTokenTimeout time.Duration
TokenExpiresAfter time.Duration
IsDeveloperMode bool
}
@ -27,7 +30,10 @@ func (cfg *Cfg) readCloudMigrationSettings() {
cfg.CloudMigration.IsTarget = cloudMigration.Key("is_target").MustBool(false)
cfg.CloudMigration.GcomAPIToken = cloudMigration.Key("gcom_api_token").MustString("")
cfg.CloudMigration.SnapshotFolder = cloudMigration.Key("snapshot_folder").MustString("")
cfg.CloudMigration.StartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.GMSDomain = cloudMigration.Key("domain").MustString("")
cfg.CloudMigration.GMSValidateKeyTimeout = cloudMigration.Key("validate_key_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.GMSStartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.GMSGetSnapshotStatusTimeout = cloudMigration.Key("get_snapshot_status_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second)

View File

@ -15601,6 +15601,7 @@
"PENDING_PROCESSING",
"PROCESSING",
"FINISHED",
"CANCELED",
"ERROR",
"UNKNOWN"
]
@ -20369,6 +20370,7 @@
"PENDING_PROCESSING",
"PROCESSING",
"FINISHED",
"CANCELED",
"ERROR",
"UNKNOWN"
]

View File

@ -5697,6 +5697,7 @@
"PENDING_PROCESSING",
"PROCESSING",
"FINISHED",
"CANCELED",
"ERROR",
"UNKNOWN"
],
@ -10464,6 +10465,7 @@
"PENDING_PROCESSING",
"PROCESSING",
"FINISHED",
"CANCELED",
"ERROR",
"UNKNOWN"
],