CloudMigrations: Implement snapshot management apis (#89296)

* add new apis

* add payloads

* create snapshot status type

* add some impl

* finish implementing update

* start implementing build snapshot func

* add more fake build logic

* add cancel endpoint. do some cleanup

* implement GetSnapshot

* implement upload snapshot

* merge onprem status with gms result

* get it working

* update comment

* rename list endpoint

* add query limit and offset

* add helper method to snapshot

* little bit of cleanup

* work on swagger annotations

* manual merge

* generate swagger specs

* clean up curl commands

* fix bugs found during final testing

* fix linter issue

* fix unit test
This commit is contained in:
Michael Mandrus
2024-06-19 09:20:52 -04:00
committed by GitHub
parent d928fac5c3
commit 8a8f97b0e4
20 changed files with 1939 additions and 182 deletions
@@ -7,6 +7,9 @@ import (
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"github.com/grafana/grafana/pkg/api/response"
@@ -17,7 +20,6 @@ import (
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
"github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
@@ -25,6 +27,7 @@ import (
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/services/secrets"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -37,6 +40,9 @@ type Service struct {
log *log.ConcreteLogger
cfg *setting.Cfg
buildSnapshotMutex sync.Mutex
buildSnapshotError bool
features featuremgmt.FeatureToggles
gmsClient gmsclient.Client
@@ -398,7 +404,6 @@ func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration
return nil, fmt.Errorf("migrate data error: %w", err)
}
// TODO update cloud migration run schema to treat the result as a first-class citizen
respData, err := json.Marshal(resp)
if err != nil {
s.log.Error("error marshalling migration response data: %w", err)
@@ -419,135 +424,6 @@ func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration
return resp, nil
}
func (s *Service) getMigrationDataJSON(ctx context.Context) (*cloudmigration.MigrateDataRequest, error) {
// Data sources
dataSources, err := s.getDataSources(ctx)
if err != nil {
s.log.Error("Failed to get datasources", "err", err)
return nil, err
}
// Dashboards
dashboards, err := s.getDashboards(ctx)
if err != nil {
s.log.Error("Failed to get dashboards", "err", err)
return nil, err
}
// Folders
folders, err := s.getFolders(ctx)
if err != nil {
s.log.Error("Failed to get folders", "err", err)
return nil, err
}
migrationDataSlice := make(
[]cloudmigration.MigrateDataRequestItem, 0,
len(dataSources)+len(dashboards)+len(folders),
)
for _, ds := range dataSources {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.DatasourceDataType,
RefID: ds.UID,
Name: ds.Name,
Data: ds,
})
}
for _, dashboard := range dashboards {
dashboard.Data.Del("id")
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.DashboardDataType,
RefID: dashboard.UID,
Name: dashboard.Title,
Data: map[string]any{"dashboard": dashboard.Data},
})
}
for _, f := range folders {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.FolderDataType,
RefID: f.UID,
Name: f.Title,
Data: f,
})
}
migrationData := &cloudmigration.MigrateDataRequest{
Items: migrationDataSlice,
}
return migrationData, nil
}
func (s *Service) getDataSources(ctx context.Context) ([]datasources.AddDataSourceCommand, error) {
dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{})
if err != nil {
s.log.Error("Failed to get all datasources", "err", err)
return nil, err
}
result := []datasources.AddDataSourceCommand{}
for _, dataSource := range dataSources {
// Decrypt secure json to send raw credentials
decryptedData, err := s.secretsService.DecryptJsonData(ctx, dataSource.SecureJsonData)
if err != nil {
s.log.Error("Failed to decrypt secure json data", "err", err)
return nil, err
}
dataSourceCmd := datasources.AddDataSourceCommand{
OrgID: dataSource.OrgID,
Name: dataSource.Name,
Type: dataSource.Type,
Access: dataSource.Access,
URL: dataSource.URL,
User: dataSource.User,
Database: dataSource.Database,
BasicAuth: dataSource.BasicAuth,
BasicAuthUser: dataSource.BasicAuthUser,
WithCredentials: dataSource.WithCredentials,
IsDefault: dataSource.IsDefault,
JsonData: dataSource.JsonData,
SecureJsonData: decryptedData,
ReadOnly: dataSource.ReadOnly,
UID: dataSource.UID,
}
result = append(result, dataSourceCmd)
}
return result, err
}
func (s *Service) getFolders(ctx context.Context) ([]folder.Folder, error) {
reqCtx := contexthandler.FromContext(ctx)
folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{
SignedInUser: reqCtx.SignedInUser,
})
if err != nil {
return nil, err
}
result := make([]folder.Folder, len(folders))
for i, folder := range folders {
result[i] = *folder
}
return result, nil
}
func (s *Service) getDashboards(ctx context.Context) ([]dashboards.Dashboard, error) {
dashs, err := s.dashboardService.GetAllDashboards(ctx)
if err != nil {
return nil, err
}
result := make([]dashboards.Dashboard, len(dashs))
for i, dashboard := range dashs {
result[i] = *dashboard
}
return result, nil
}
func (s *Service) createMigrationRun(ctx context.Context, cmr cloudmigration.CloudMigrationSnapshot) (string, error) {
uid, err := s.store.CreateMigrationRun(ctx, cmr)
if err != nil {
@@ -565,13 +441,13 @@ func (s *Service) GetMigrationStatus(ctx context.Context, runUID string) (*cloud
return cmr, nil
}
func (s *Service) GetMigrationRunList(ctx context.Context, migUID string) (*cloudmigration.SnapshotList, error) {
func (s *Service) GetMigrationRunList(ctx context.Context, migUID string) (*cloudmigration.CloudMigrationRunList, error) {
runs, err := s.store.GetMigrationStatusList(ctx, migUID)
if err != nil {
return nil, fmt.Errorf("retrieving migration statuses from db: %w", err)
}
runList := &cloudmigration.SnapshotList{Runs: []cloudmigration.MigrateDataResponseList{}}
runList := &cloudmigration.CloudMigrationRunList{Runs: []cloudmigration.MigrateDataResponseList{}}
for _, s := range runs {
runList.Runs = append(runList.Runs, cloudmigration.MigrateDataResponseList{
RunUID: s.UID,
@@ -589,6 +465,123 @@ func (s *Service) DeleteSession(ctx context.Context, uid string) (*cloudmigratio
return c, nil
}
func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSnapshot")
defer span.End()
// fetch session for the gms auth token
session, err := s.store.GetMigrationSessionByUID(ctx, sessionUid)
if err != nil {
return nil, fmt.Errorf("fetching migration session for uid %s: %w", sessionUid, err)
}
// query gms to establish new snapshot
initResp, err := s.gmsClient.InitializeSnapshot(ctx, *session)
if err != nil {
return nil, fmt.Errorf("initializing snapshot with GMS for session %s: %w", sessionUid, err)
}
// create new directory for snapshot writing
snapshotUid := util.GenerateShortUID()
dir := filepath.Join("cloudmigration.snapshots", fmt.Sprintf("snapshot-%s-%s", snapshotUid, initResp.GMSSnapshotUID))
err = os.MkdirAll(dir, 0750)
if err != nil {
return nil, fmt.Errorf("creating snapshot directory: %w", err)
}
// save snapshot to the db
snapshot := cloudmigration.CloudMigrationSnapshot{
UID: snapshotUid,
SessionUID: sessionUid,
Status: cloudmigration.SnapshotStatusInitializing,
EncryptionKey: initResp.EncryptionKey,
UploadURL: initResp.UploadURL,
GMSSnapshotUID: initResp.GMSSnapshotUID,
LocalDir: dir,
}
uid, err := s.store.CreateSnapshot(ctx, snapshot)
if err != nil {
return nil, fmt.Errorf("saving snapshot: %w", err)
}
snapshot.UID = uid
// start building the snapshot asynchronously while we return a success response to the client
go s.buildSnapshot(context.Background(), snapshot)
return &snapshot, nil
}
// GetSnapshot returns the on-prem version of a snapshot, supplemented with processing status from GMS
func (s *Service) GetSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSnapshot")
defer span.End()
snapshot, err := s.store.GetSnapshotByUID(ctx, snapshotUid)
if err != nil {
return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
}
session, err := s.store.GetMigrationSessionByUID(ctx, sessionUid)
if err != nil {
return nil, fmt.Errorf("fetching session for uid %s: %w", sessionUid, err)
}
if snapshot.ShouldQueryGMS() {
// ask GMS for status if it's in the cloud
snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot)
if err != nil {
return nil, fmt.Errorf("error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s", sessionUid, snapshotUid)
}
// grab any result available
// TODO: figure out a more intelligent way to do this, will depend on GMS apis
snapshot.Result = snapshotMeta.Result
if snapshotMeta.Status == cloudmigration.SnapshotStatusFinished {
// we need to update the snapshot in our db before reporting anything finished to the client
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
Status: cloudmigration.SnapshotStatusFinished,
Result: snapshot.Result,
}); err != nil {
return nil, fmt.Errorf("error updating snapshot status: %w", err)
}
}
}
return snapshot, nil
}
func (s *Service) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSnapshotList")
defer span.End()
snapshotList, err := s.store.GetSnapshotList(ctx, query)
if err != nil {
return nil, fmt.Errorf("fetching snapshots for session uid %s: %w", query.SessionUID, err)
}
return snapshotList, nil
}
func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot")
defer span.End()
snapshot, err := s.GetSnapshot(ctx, sessionUid, snapshotUid)
if err != nil {
return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err)
}
s.log.Info("Uploading snapshot with GMS ID %s in local directory %s to url %s", snapshot.GMSSnapshotUID, snapshot.LocalDir, snapshot.UploadURL)
s.log.Debug("UploadSnapshot not yet implemented, faking it")
// start uploading the snapshot asynchronously while we return a success response to the client
go s.uploadSnapshot(context.Background(), *snapshot)
return nil
}
func (s *Service) parseCloudMigrationConfig() (string, error) {
if s.cfg == nil {
return "", fmt.Errorf("cfg cannot be nil")