diff --git a/conf/defaults.ini b/conf/defaults.ini index 0cd2ecde138..860d7afaafc 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1947,6 +1947,8 @@ validate_key_timeout = 5s get_snapshot_status_timeout = 5s # How long to wait for a request sent to gms to create a presigned upload url create_upload_url_timeout = 5s +# How long to wait for a request sent to gms to report an event +report_event_timeout = 5s # How long to wait for a request to fetch an instance to complete fetch_instance_timeout = 5s # How long to wait for a request to create an access policy to complete diff --git a/conf/sample.ini b/conf/sample.ini index 0482e9388e0..e4fe2ce5cc1 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -1877,6 +1877,8 @@ timeout = 30s ;get_snapshot_status_timeout = 5s # How long to wait for a request sent to gms to create a presigned upload url ;create_upload_url_timeout = 5s +# How long to wait for a request sent to gms to report an event +;report_event_timeout = 5s # How long to wait for a request to fetch an instance to complete ;fetch_instance_timeout = 5s # How long to wait for a request to create an access policy to complete diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 2797b64e820..78cfe7eb0fd 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -11,9 +11,11 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/grafana/grafana/pkg/api/response" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/cloudmigration" @@ -55,6 +57,7 @@ type Service struct { dashboardService dashboards.DashboardService folderService folder.Service secretsService secrets.Service + kvStore *kvstore.NamespacedKVStore api *api.CloudMigrationAPI tracer tracing.Tracer @@ -85,6 +88,7 @@ func ProvideService( tracer tracing.Tracer, dashboardService dashboards.DashboardService, folderService folder.Service, + kvStore kvstore.KVStore, ) (cloudmigration.Service, error) { if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) { return &NoopServiceImpl{}, nil @@ -101,6 +105,7 @@ func ProvideService( secretsService: secretsService, dashboardService: dashboardService, folderService: folderService, + kvStore: kvstore.WithNamespace(kvStore, 0, "cloudmigration"), } s.api = api.RegisterApi(routeRegister, s, tracer) @@ -379,6 +384,8 @@ func (s *Service) CreateSession(ctx context.Context, cmd cloudmigration.CloudMig return nil, fmt.Errorf("error creating migration: %w", err) } + s.report(ctx, cm, gmsclient.EventConnect, 0, nil) + return &cloudmigration.CloudMigrationSessionResponse{ UID: cm.UID, Slug: token.Instance.Slug, @@ -460,6 +467,9 @@ func (s *Service) DeleteSession(ctx context.Context, uid string) (*cloudmigratio if err != nil { return c, fmt.Errorf("deleting migration from db: %w", err) } + + s.report(ctx, c, gmsclient.EventDisconnect, 0, nil) + return c, nil } @@ -511,7 +521,11 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI ctx, cancelFunc := context.WithCancel(context.Background()) s.cancelFunc = cancelFunc - if err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil { + s.report(ctx, session, gmsclient.EventStartBuildingSnapshot, 0, nil) + + start := time.Now() + err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, snapshot) + if err != nil { s.log.Error("building snapshot", "err", err.Error()) // Update status to error with retries if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ @@ -521,6 +535,8 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI s.log.Error("critical failure during snapshot creation - please report any error logs") } } + + s.report(ctx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err) }() return &snapshot, nil @@ -637,7 +653,11 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho ctx, cancelFunc := context.WithCancel(context.Background()) s.cancelFunc = cancelFunc - if err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl); err != nil { + s.report(ctx, session, gmsclient.EventStartUploadingSnapshot, 0, nil) + + start := time.Now() + err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl) + if err != nil { s.log.Error("uploading snapshot", "err", err.Error()) // Update status to error with retries if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ @@ -647,6 +667,8 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho s.log.Error("critical failure during snapshot upload - please report any error logs") } } + + s.report(ctx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err) }() return nil @@ -678,3 +700,52 @@ func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapsho return nil } + +func (s *Service) report( + ctx context.Context, + sess *cloudmigration.CloudMigrationSession, + t gmsclient.LocalEventType, + d time.Duration, + evtErr error, +) { + id, err := s.getLocalEventId(ctx) + if err != nil { + s.log.Error("failed to report event", "type", t, "error", err.Error()) + return + } + + e := gmsclient.EventRequestDTO{ + Event: t, + LocalID: id, + } + + if d != 0 { + e.DurationIfFinished = d + } + if evtErr != nil { + e.Error = evtErr.Error() + } + + s.gmsClient.ReportEvent(ctx, *sess, e) +} + +func (s *Service) getLocalEventId(ctx context.Context) (string, error) { + anonId, ok, err := s.kvStore.Get(ctx, "anonymous_id") + if err != nil { + return "", fmt.Errorf("failed to get usage stats id: %w", err) + } + + if ok { + return anonId, nil + } + + anonId = uuid.NewString() + + err = s.kvStore.Set(ctx, "anonymous_id", anonId) + if err != nil { + s.log.Error("Failed to store usage stats id", "error", err) + return "", fmt.Errorf("failed to store usage stats id: %w", err) + } + + return anonId, nil +} diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index 1afb3a5c5e3..1ee0ede3be0 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -11,8 +11,10 @@ import ( "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/cloudmigration" + "github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient" "github.com/grafana/grafana/pkg/services/contexthandler/ctxkey" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" "github.com/grafana/grafana/pkg/services/dashboards" @@ -442,6 +444,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi tracer, dashboardService, mockFolder, + kvstore.ProvideService(sqlStore), ) require.NoError(t, err) @@ -453,6 +456,7 @@ type gmsClientMock struct { startSnapshotCalled int getStatusCalled int createUploadUrlCalled int + reportEventCalled int getSnapshotResponse *cloudmigration.GetSnapshotStatusResponse } @@ -480,3 +484,7 @@ func (m *gmsClientMock) CreatePresignedUploadUrl(ctx context.Context, session cl m.createUploadUrlCalled++ return "http://localhost:3000", nil } + +func (m *gmsClientMock) ReportEvent(context.Context, cloudmigration.CloudMigrationSession, gmsclient.EventRequestDTO) { + m.reportEventCalled++ +} diff --git a/pkg/services/cloudmigration/gmsclient/client.go b/pkg/services/cloudmigration/gmsclient/client.go index ed75a6493cb..d15b03a9e95 100644 --- a/pkg/services/cloudmigration/gmsclient/client.go +++ b/pkg/services/cloudmigration/gmsclient/client.go @@ -12,6 +12,7 @@ type Client interface { StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot, int) (*cloudmigration.GetSnapshotStatusResponse, error) CreatePresignedUploadUrl(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (string, error) + ReportEvent(context.Context, cloudmigration.CloudMigrationSession, EventRequestDTO) } const logPrefix = "cloudmigration.gmsclient" diff --git a/pkg/services/cloudmigration/gmsclient/dtos.go b/pkg/services/cloudmigration/gmsclient/dtos.go index 31ab080a29d..91c305ce7c3 100644 --- a/pkg/services/cloudmigration/gmsclient/dtos.go +++ b/pkg/services/cloudmigration/gmsclient/dtos.go @@ -1,5 +1,7 @@ package gmsclient +import "time" + type MigrateDataType string const ( @@ -48,3 +50,21 @@ type MigrateDataResponseItemDTO struct { type CreateSnapshotUploadUrlResponseDTO struct { UploadUrl string `json:"uploadUrl"` } + +type EventRequestDTO struct { + LocalID string `json:"migrationClientId"` + Event LocalEventType `json:"event"` + Error string `json:"error"` + DurationIfFinished time.Duration `json:"duration"` +} + +type LocalEventType string + +const ( + EventConnect LocalEventType = "connect" + EventDisconnect LocalEventType = "disconnect" + EventStartBuildingSnapshot LocalEventType = "start_building_snapshot" + EventDoneBuildingSnapshot LocalEventType = "done_building_snapshot" + EventStartUploadingSnapshot LocalEventType = "start_uploading_snapshot" + EventDoneUploadingSnapshot LocalEventType = "done_uploading_snapshot" +) diff --git a/pkg/services/cloudmigration/gmsclient/gms_client.go b/pkg/services/cloudmigration/gmsclient/gms_client.go index 614fd889e70..c942f69d220 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client.go @@ -249,6 +249,52 @@ func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cl return result.UploadUrl, nil } +func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration.CloudMigrationSession, event EventRequestDTO) { + if event.LocalID == "" || event.Event == "" { + return + } + + path := fmt.Sprintf("%s/api/v1/snapshots/events", c.buildBasePath(session.ClusterSlug)) + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(event); err != nil { + c.log.Error("encoding event", "err", err.Error()) + return + } + // Send the request to gms with the associated auth token + req, err := http.NewRequest(http.MethodPost, path, &buf) + if err != nil { + c.log.Error("error creating http request to report event", "err", err.Error()) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) + + client := &http.Client{ + Timeout: c.cfg.CloudMigration.GMSReportEventTimeout, + } + resp, err := client.Do(req) + if err != nil { + c.log.Error("error sending http request for report event", "err", err.Error()) + return + } else if resp.StatusCode >= 400 { + c.log.Error("received error response for report event", "type", event.Event, "statusCode", resp.StatusCode) + body, err := io.ReadAll(resp.Body) + if err != nil { + c.log.Error("reading request body", "err", err.Error()) + return + } + c.log.Error("http request error", "body", string(body)) + return + } + + defer func() { + if err := resp.Body.Close(); err != nil { + c.log.Error("closing request body", "err", err.Error()) + } + }() +} + func (c *gmsClientImpl) buildBasePath(clusterSlug string) string { domain := c.cfg.CloudMigration.GMSDomain if strings.HasPrefix(domain, "http://localhost") { diff --git a/pkg/services/cloudmigration/gmsclient/inmemory_client.go b/pkg/services/cloudmigration/gmsclient/inmemory_client.go index 8d1219f1eab..bf1baa6e55e 100644 --- a/pkg/services/cloudmigration/gmsclient/inmemory_client.go +++ b/pkg/services/cloudmigration/gmsclient/inmemory_client.go @@ -95,3 +95,6 @@ func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudm func (c *memoryClientImpl) CreatePresignedUploadUrl(ctx context.Context, sess cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { return "http://localhost:3000", nil } + +func (c *memoryClientImpl) ReportEvent(context.Context, cloudmigration.CloudMigrationSession, EventRequestDTO) { +} diff --git a/pkg/setting/setting_cloud_migration.go b/pkg/setting/setting_cloud_migration.go index 48f91696eeb..9434e9cb4ea 100644 --- a/pkg/setting/setting_cloud_migration.go +++ b/pkg/setting/setting_cloud_migration.go @@ -14,6 +14,7 @@ type CloudMigrationSettings struct { GMSGetSnapshotStatusTimeout time.Duration GMSCreateUploadUrlTimeout time.Duration GMSValidateKeyTimeout time.Duration + GMSReportEventTimeout time.Duration FetchInstanceTimeout time.Duration CreateAccessPolicyTimeout time.Duration FetchAccessPolicyTimeout time.Duration @@ -36,6 +37,7 @@ func (cfg *Cfg) readCloudMigrationSettings() { cfg.CloudMigration.GMSStartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.GMSGetSnapshotStatusTimeout = cloudMigration.Key("get_snapshot_status_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.GMSCreateUploadUrlTimeout = cloudMigration.Key("create_upload_url_timeout").MustDuration(5 * time.Second) + cfg.CloudMigration.GMSReportEventTimeout = cloudMigration.Key("report_event_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second)