CloudMigrations: Send local events to gms during the migration process (#90637)

* add gms client function

* add timeout config for endpoint

* report events to gms

* fix lint error

* clean up report calls and make sure reports all have local ids

* extra validation

* improve error logging and fix url
This commit is contained in:
Michael Mandrus 2024-07-20 00:02:31 -04:00 committed by GitHub
parent 1c5ed0da4d
commit ee90cd3031
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 157 additions and 2 deletions

View File

@ -1947,6 +1947,8 @@ validate_key_timeout = 5s
get_snapshot_status_timeout = 5s
# How long to wait for a request sent to gms to create a presigned upload url
create_upload_url_timeout = 5s
# How long to wait for a request sent to gms to report an event
report_event_timeout = 5s
# How long to wait for a request to fetch an instance to complete
fetch_instance_timeout = 5s
# How long to wait for a request to create an access policy to complete

View File

@ -1877,6 +1877,8 @@ timeout = 30s
;get_snapshot_status_timeout = 5s
# How long to wait for a request sent to gms to create a presigned upload url
;create_upload_url_timeout = 5s
# How long to wait for a request sent to gms to report an event
;report_event_timeout = 5s
# How long to wait for a request to fetch an instance to complete
;fetch_instance_timeout = 5s
# How long to wait for a request to create an access policy to complete

View File

@ -11,9 +11,11 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/cloudmigration"
@ -55,6 +57,7 @@ type Service struct {
dashboardService dashboards.DashboardService
folderService folder.Service
secretsService secrets.Service
kvStore *kvstore.NamespacedKVStore
api *api.CloudMigrationAPI
tracer tracing.Tracer
@ -85,6 +88,7 @@ func ProvideService(
tracer tracing.Tracer,
dashboardService dashboards.DashboardService,
folderService folder.Service,
kvStore kvstore.KVStore,
) (cloudmigration.Service, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) {
return &NoopServiceImpl{}, nil
@ -101,6 +105,7 @@ func ProvideService(
secretsService: secretsService,
dashboardService: dashboardService,
folderService: folderService,
kvStore: kvstore.WithNamespace(kvStore, 0, "cloudmigration"),
}
s.api = api.RegisterApi(routeRegister, s, tracer)
@ -379,6 +384,8 @@ func (s *Service) CreateSession(ctx context.Context, cmd cloudmigration.CloudMig
return nil, fmt.Errorf("error creating migration: %w", err)
}
s.report(ctx, cm, gmsclient.EventConnect, 0, nil)
return &cloudmigration.CloudMigrationSessionResponse{
UID: cm.UID,
Slug: token.Instance.Slug,
@ -460,6 +467,9 @@ func (s *Service) DeleteSession(ctx context.Context, uid string) (*cloudmigratio
if err != nil {
return c, fmt.Errorf("deleting migration from db: %w", err)
}
s.report(ctx, c, gmsclient.EventDisconnect, 0, nil)
return c, nil
}
@ -511,7 +521,11 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
ctx, cancelFunc := context.WithCancel(context.Background())
s.cancelFunc = cancelFunc
if err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil {
s.report(ctx, session, gmsclient.EventStartBuildingSnapshot, 0, nil)
start := time.Now()
err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, snapshot)
if err != nil {
s.log.Error("building snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
@ -521,6 +535,8 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
s.log.Error("critical failure during snapshot creation - please report any error logs")
}
}
s.report(ctx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err)
}()
return &snapshot, nil
@ -637,7 +653,11 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
ctx, cancelFunc := context.WithCancel(context.Background())
s.cancelFunc = cancelFunc
if err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl); err != nil {
s.report(ctx, session, gmsclient.EventStartUploadingSnapshot, 0, nil)
start := time.Now()
err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl)
if err != nil {
s.log.Error("uploading snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
@ -647,6 +667,8 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
s.log.Error("critical failure during snapshot upload - please report any error logs")
}
}
s.report(ctx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err)
}()
return nil
@ -678,3 +700,52 @@ func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapsho
return nil
}
func (s *Service) report(
ctx context.Context,
sess *cloudmigration.CloudMigrationSession,
t gmsclient.LocalEventType,
d time.Duration,
evtErr error,
) {
id, err := s.getLocalEventId(ctx)
if err != nil {
s.log.Error("failed to report event", "type", t, "error", err.Error())
return
}
e := gmsclient.EventRequestDTO{
Event: t,
LocalID: id,
}
if d != 0 {
e.DurationIfFinished = d
}
if evtErr != nil {
e.Error = evtErr.Error()
}
s.gmsClient.ReportEvent(ctx, *sess, e)
}
func (s *Service) getLocalEventId(ctx context.Context) (string, error) {
anonId, ok, err := s.kvStore.Get(ctx, "anonymous_id")
if err != nil {
return "", fmt.Errorf("failed to get usage stats id: %w", err)
}
if ok {
return anonId, nil
}
anonId = uuid.NewString()
err = s.kvStore.Set(ctx, "anonymous_id", anonId)
if err != nil {
s.log.Error("Failed to store usage stats id", "error", err)
return "", fmt.Errorf("failed to store usage stats id: %w", err)
}
return anonId, nil
}

View File

@ -11,8 +11,10 @@ import (
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient"
"github.com/grafana/grafana/pkg/services/contexthandler/ctxkey"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/dashboards"
@ -442,6 +444,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
tracer,
dashboardService,
mockFolder,
kvstore.ProvideService(sqlStore),
)
require.NoError(t, err)
@ -453,6 +456,7 @@ type gmsClientMock struct {
startSnapshotCalled int
getStatusCalled int
createUploadUrlCalled int
reportEventCalled int
getSnapshotResponse *cloudmigration.GetSnapshotStatusResponse
}
@ -480,3 +484,7 @@ func (m *gmsClientMock) CreatePresignedUploadUrl(ctx context.Context, session cl
m.createUploadUrlCalled++
return "http://localhost:3000", nil
}
func (m *gmsClientMock) ReportEvent(context.Context, cloudmigration.CloudMigrationSession, gmsclient.EventRequestDTO) {
m.reportEventCalled++
}

View File

@ -12,6 +12,7 @@ type Client interface {
StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error)
GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot, int) (*cloudmigration.GetSnapshotStatusResponse, error)
CreatePresignedUploadUrl(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (string, error)
ReportEvent(context.Context, cloudmigration.CloudMigrationSession, EventRequestDTO)
}
const logPrefix = "cloudmigration.gmsclient"

View File

@ -1,5 +1,7 @@
package gmsclient
import "time"
type MigrateDataType string
const (
@ -48,3 +50,21 @@ type MigrateDataResponseItemDTO struct {
type CreateSnapshotUploadUrlResponseDTO struct {
UploadUrl string `json:"uploadUrl"`
}
type EventRequestDTO struct {
LocalID string `json:"migrationClientId"`
Event LocalEventType `json:"event"`
Error string `json:"error"`
DurationIfFinished time.Duration `json:"duration"`
}
type LocalEventType string
const (
EventConnect LocalEventType = "connect"
EventDisconnect LocalEventType = "disconnect"
EventStartBuildingSnapshot LocalEventType = "start_building_snapshot"
EventDoneBuildingSnapshot LocalEventType = "done_building_snapshot"
EventStartUploadingSnapshot LocalEventType = "start_uploading_snapshot"
EventDoneUploadingSnapshot LocalEventType = "done_uploading_snapshot"
)

View File

@ -249,6 +249,52 @@ func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cl
return result.UploadUrl, nil
}
func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration.CloudMigrationSession, event EventRequestDTO) {
if event.LocalID == "" || event.Event == "" {
return
}
path := fmt.Sprintf("%s/api/v1/snapshots/events", c.buildBasePath(session.ClusterSlug))
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(event); err != nil {
c.log.Error("encoding event", "err", err.Error())
return
}
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodPost, path, &buf)
if err != nil {
c.log.Error("error creating http request to report event", "err", err.Error())
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSReportEventTimeout,
}
resp, err := client.Do(req)
if err != nil {
c.log.Error("error sending http request for report event", "err", err.Error())
return
} else if resp.StatusCode >= 400 {
c.log.Error("received error response for report event", "type", event.Event, "statusCode", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
if err != nil {
c.log.Error("reading request body", "err", err.Error())
return
}
c.log.Error("http request error", "body", string(body))
return
}
defer func() {
if err := resp.Body.Close(); err != nil {
c.log.Error("closing request body", "err", err.Error())
}
}()
}
func (c *gmsClientImpl) buildBasePath(clusterSlug string) string {
domain := c.cfg.CloudMigration.GMSDomain
if strings.HasPrefix(domain, "http://localhost") {

View File

@ -95,3 +95,6 @@ func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudm
func (c *memoryClientImpl) CreatePresignedUploadUrl(ctx context.Context, sess cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
return "http://localhost:3000", nil
}
func (c *memoryClientImpl) ReportEvent(context.Context, cloudmigration.CloudMigrationSession, EventRequestDTO) {
}

View File

@ -14,6 +14,7 @@ type CloudMigrationSettings struct {
GMSGetSnapshotStatusTimeout time.Duration
GMSCreateUploadUrlTimeout time.Duration
GMSValidateKeyTimeout time.Duration
GMSReportEventTimeout time.Duration
FetchInstanceTimeout time.Duration
CreateAccessPolicyTimeout time.Duration
FetchAccessPolicyTimeout time.Duration
@ -36,6 +37,7 @@ func (cfg *Cfg) readCloudMigrationSettings() {
cfg.CloudMigration.GMSStartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.GMSGetSnapshotStatusTimeout = cloudMigration.Key("get_snapshot_status_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.GMSCreateUploadUrlTimeout = cloudMigration.Key("create_upload_url_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.GMSReportEventTimeout = cloudMigration.Key("report_event_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second)
cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second)