Cloud migrations: create snapshot files (#89693)

* Cloud migrations: create snapshot and store it on disk

* fix merge conflicts

* implement StartSnapshot for gms client

* pass snapshot directory as argument to snapshot builder

* ensure snapshot folder is set

* make swagger-gen

* remove Test_ExecuteAsyncWorkflow

* pass signed in user to buildSnapshot method / use github.com/grafana/grafana-cloud-migration-snapshot to create snapshot files

* fix FakeServiceImpl.CreateSnapshot

* remove new line
This commit is contained in:
Bruno
2024-07-03 10:38:26 -03:00
committed by GitHub
parent 7b29242600
commit d1952bb681
18 changed files with 285 additions and 127 deletions
@@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sync"
"time"
@@ -26,6 +25,7 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/services/secrets"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
"github.com/prometheus/client_golang/prometheus"
@@ -41,7 +41,6 @@ type Service struct {
cfg *setting.Cfg
buildSnapshotMutex sync.Mutex
buildSnapshotError bool
features featuremgmt.FeatureToggles
gmsClient gmsclient.Client
@@ -391,7 +390,7 @@ func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration
}
// Get migration data JSON
request, err := s.getMigrationDataJSON(ctx)
request, err := s.getMigrationDataJSON(ctx, &user.SignedInUser{})
if err != nil {
s.log.Error("error getting the json request body for migration run", "err", err.Error())
return nil, fmt.Errorf("migration data get error: %w", err)
@@ -459,8 +458,10 @@ func (s *Service) DeleteSession(ctx context.Context, uid string) (*cloudmigratio
return c, nil
}
func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSnapshot")
func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSnapshot", trace.WithAttributes(
attribute.String("sessionUid", sessionUid),
))
defer span.End()
// fetch session for the gms auth token
@@ -470,28 +471,25 @@ func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloud
}
// query gms to establish new snapshot
initResp, err := s.gmsClient.InitializeSnapshot(ctx, *session)
timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.StartSnapshotTimeout)
defer cancel()
initResp, err := s.gmsClient.StartSnapshot(timeoutCtx, *session)
if err != nil {
return nil, fmt.Errorf("initializing snapshot with GMS for session %s: %w", sessionUid, err)
}
// create new directory for snapshot writing
snapshotUid := util.GenerateShortUID()
dir := filepath.Join("cloudmigration.snapshots", fmt.Sprintf("snapshot-%s-%s", snapshotUid, initResp.GMSSnapshotUID))
err = os.MkdirAll(dir, 0750)
if err != nil {
return nil, fmt.Errorf("creating snapshot directory: %w", err)
if s.cfg.CloudMigration.SnapshotFolder == "" {
return nil, fmt.Errorf("snapshot folder is not set")
}
// save snapshot to the db
snapshot := cloudmigration.CloudMigrationSnapshot{
UID: snapshotUid,
UID: util.GenerateShortUID(),
SessionUID: sessionUid,
Status: cloudmigration.SnapshotStatusInitializing,
EncryptionKey: initResp.EncryptionKey,
UploadURL: initResp.UploadURL,
GMSSnapshotUID: initResp.GMSSnapshotUID,
LocalDir: dir,
GMSSnapshotUID: initResp.SnapshotID,
LocalDir: filepath.Join(s.cfg.CloudMigration.SnapshotFolder, "grafana", "snapshots", initResp.SnapshotID),
}
uid, err := s.store.CreateSnapshot(ctx, snapshot)
@@ -501,7 +499,11 @@ func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloud
snapshot.UID = uid
// start building the snapshot asynchronously while we return a success response to the client
go s.buildSnapshot(context.Background(), snapshot)
go func() {
if err := s.buildSnapshot(context.Background(), signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil {
s.log.Error("building snapshot", "err", err.Error())
}
}()
return &snapshot, nil
}
@@ -5,6 +5,7 @@ import (
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/services/user"
)
// NoopServiceImpl Define the Service Implementation.
@@ -60,7 +61,7 @@ func (s *NoopServiceImpl) RunMigration(context.Context, string) (*cloudmigration
return nil, cloudmigration.ErrFeatureDisabledError
}
func (s *NoopServiceImpl) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
func (s *NoopServiceImpl) CreateSnapshot(ctx context.Context, user *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
return nil, cloudmigration.ErrFeatureDisabledError
}
@@ -2,8 +2,11 @@ package cloudmigrationimpl
import (
"context"
"os"
"path/filepath"
"testing"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/db"
@@ -109,73 +112,6 @@ func Test_CreateGetRunMigrationsAndRuns(t *testing.T) {
require.NotNil(t, createResp.UID, delMigResp.UID)
}
func Test_ExecuteAsyncWorkflow(t *testing.T) {
s := setUpServiceTest(t, false)
createTokenResp, err := s.CreateToken(context.Background())
assert.NoError(t, err)
assert.NotEmpty(t, createTokenResp.Token)
cmd := cloudmigration.CloudMigrationSessionRequest{
AuthToken: createTokenResp.Token,
}
createResp, err := s.CreateSession(context.Background(), cmd)
require.NoError(t, err)
require.NotEmpty(t, createResp.UID)
require.NotEmpty(t, createResp.Slug)
getSessionResp, err := s.GetSession(context.Background(), createResp.UID)
require.NoError(t, err)
require.NotNil(t, getSessionResp)
require.Equal(t, createResp.UID, getSessionResp.UID)
require.Equal(t, createResp.Slug, getSessionResp.Slug)
listResp, err := s.GetSessionList(context.Background())
require.NoError(t, err)
require.NotNil(t, listResp)
require.Equal(t, 1, len(listResp.Sessions))
require.Equal(t, createResp.UID, listResp.Sessions[0].UID)
require.Equal(t, createResp.Slug, listResp.Sessions[0].Slug)
sessionUid := createResp.UID
snapshotResp, err := s.CreateSnapshot(ctxWithSignedInUser(), sessionUid)
require.NoError(t, err)
require.NotEmpty(t, snapshotResp.UID)
require.Equal(t, sessionUid, snapshotResp.SessionUID)
snapshotUid := snapshotResp.UID
// Service doesn't currently expose updating a snapshot externally, so we will just manually add a resource
err = (s.(*Service)).store.CreateUpdateSnapshotResources(context.Background(), snapshotUid, []cloudmigration.CloudMigrationResource{{Type: cloudmigration.DashboardDataType, RefID: "qwerty", Status: cloudmigration.ItemStatusOK}})
assert.NoError(t, err)
snapshot, err := s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: snapshotUid,
SessionUID: sessionUid,
ResultPage: 1,
ResultLimit: 100,
})
require.NoError(t, err)
assert.Equal(t, snapshotResp.UID, snapshot.UID)
assert.Equal(t, snapshotResp.EncryptionKey, snapshot.EncryptionKey)
assert.Len(t, snapshot.Resources, 1)
assert.Equal(t, "qwerty", snapshot.Resources[0].RefID)
snapshots, err := s.GetSnapshotList(ctxWithSignedInUser(), cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Page: 1, Limit: 100})
require.NoError(t, err)
assert.Len(t, snapshots, 1)
assert.Equal(t, snapshotResp.UID, snapshots[0].UID)
assert.Equal(t, snapshotResp.EncryptionKey, snapshots[0].EncryptionKey)
assert.Empty(t, snapshots[0].Resources)
err = s.UploadSnapshot(ctxWithSignedInUser(), sessionUid, snapshotUid)
require.NoError(t, err)
assert.Panics(t, func() {
err = s.CancelSnapshot(ctxWithSignedInUser(), sessionUid, snapshotUid)
})
}
func ctxWithSignedInUser() context.Context {
c := &contextmodel.ReqContext{
SignedInUser: &user.SignedInUser{OrgID: 1},
@@ -202,6 +138,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
require.NoError(t, err)
// dont know if this is the best, but dont want to refactor at the moment
cfg.CloudMigration.IsDeveloperMode = true
cfg.CloudMigration.SnapshotFolder = filepath.Join(os.TempDir(), uuid.NewString())
dashboardService := dashboards.NewFakeDashboardService(t)
if withDashboardMock {
@@ -7,6 +7,7 @@ import (
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/services/user"
)
var fixedDate = time.Date(2024, 6, 5, 17, 30, 40, 0, time.UTC)
@@ -129,7 +130,7 @@ func (m FakeServiceImpl) GetMigrationRunList(_ context.Context, _ string) (*clou
}, nil
}
func (m FakeServiceImpl) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
func (m FakeServiceImpl) CreateSnapshot(ctx context.Context, user *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
if m.ReturnError {
return nil, fmt.Errorf("mock error")
}
@@ -2,17 +2,24 @@ package cloudmigrationimpl
import (
"context"
cryptoRand "crypto/rand"
"fmt"
"time"
snapshot "github.com/grafana/grafana-cloud-migration-snapshot/src"
"github.com/grafana/grafana-cloud-migration-snapshot/src/contracts"
"github.com/grafana/grafana-cloud-migration-snapshot/src/infra/crypto"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/cloudmigration/slicesext"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/util/retryer"
"golang.org/x/crypto/nacl/box"
)
func (s *Service) getMigrationDataJSON(ctx context.Context) (*cloudmigration.MigrateDataRequest, error) {
func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.SignedInUser) (*cloudmigration.MigrateDataRequest, error) {
// Data sources
dataSources, err := s.getDataSources(ctx)
if err != nil {
@@ -28,7 +35,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context) (*cloudmigration.Mig
}
// Folders
folders, err := s.getFolders(ctx)
folders, err := s.getFolders(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get folders", "err", err)
return nil, err
@@ -111,10 +118,9 @@ func (s *Service) getDataSources(ctx context.Context) ([]datasources.AddDataSour
return result, err
}
func (s *Service) getFolders(ctx context.Context) ([]folder.Folder, error) {
reqCtx := contexthandler.FromContext(ctx)
func (s *Service) getFolders(ctx context.Context, signedInUser *user.SignedInUser) ([]folder.Folder, error) {
folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{
SignedInUser: reqCtx.SignedInUser,
SignedInUser: signedInUser,
})
if err != nil {
return nil, err
@@ -143,11 +149,10 @@ func (s *Service) getDashboards(ctx context.Context) ([]dashboards.Dashboard, er
}
// asynchronous process for writing the snapshot to the filesystem and updating the snapshot status
func (s *Service) buildSnapshot(ctx context.Context, snapshotMeta cloudmigration.CloudMigrationSnapshot) {
func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedInUser, maxItemsPerPartition uint32, snapshotMeta cloudmigration.CloudMigrationSnapshot) error {
// TODO -- make sure we can only build one snapshot at a time
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
s.buildSnapshotError = false
// update snapshot status to creating, add some retries since this is a background task
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
@@ -158,18 +163,60 @@ func (s *Service) buildSnapshot(ctx context.Context, snapshotMeta cloudmigration
return retryer.FuncComplete, err
}, 10, time.Millisecond*100, time.Second*10); err != nil {
s.log.Error("failed to set snapshot status to 'creating'", "err", err)
s.buildSnapshotError = true
return
return fmt.Errorf("setting snapshot status to creating: snapshotUID=%s %w", snapshotMeta.UID, err)
}
// build snapshot
// just sleep for now to simulate snapshot creation happening
// need to do a couple of fancy things when we implement this:
// - some sort of regular check-in so we know we haven't timed out
// - a channel to listen for cancel events
// - retries baked into the snapshot writing process?
s.log.Debug("snapshot meta", "snapshot", snapshotMeta)
time.Sleep(3 * time.Second)
publicKey, privateKey, err := box.GenerateKey(cryptoRand.Reader)
if err != nil {
return fmt.Errorf("nacl: generating public and private key: %w", err)
}
// Use GMS public key + the grafana generated private private key to encrypt snapshot files.
snapshotWriter, err := snapshot.NewSnapshotWriter(contracts.AssymetricKeys{
Public: []byte(snapshotMeta.EncryptionKey),
Private: privateKey[:],
},
crypto.NewNacl(),
snapshotMeta.LocalDir,
)
if err != nil {
return fmt.Errorf("instantiating snapshot writer: %w", err)
}
migrationData, err := s.getMigrationDataJSON(ctx, signedInUser)
if err != nil {
return fmt.Errorf("fetching migration data: %w", err)
}
resourcesGroupedByType := make(map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, 0)
for _, item := range migrationData.Items {
resourcesGroupedByType[item.Type] = append(resourcesGroupedByType[item.Type], snapshot.MigrateDataRequestItemDTO{
Type: snapshot.MigrateDataType(item.Type),
RefID: item.RefID,
Name: item.Name,
Data: item.Data,
})
}
for _, resourceType := range []cloudmigration.MigrateDataType{
cloudmigration.DatasourceDataType,
cloudmigration.FolderDataType,
cloudmigration.DashboardDataType,
} {
for _, chunk := range slicesext.Chunks(int(maxItemsPerPartition), resourcesGroupedByType[resourceType]) {
if err := snapshotWriter.Write(string(resourceType), chunk); err != nil {
return fmt.Errorf("writing resources to snapshot writer: resourceType=%s %w", resourceType, err)
}
}
}
// Add the grafana generated public key to the index file so gms can use it to decrypt the snapshot files later.
// This works because the snapshot files are being encrypted with
// the grafana generated private key + the gms public key.
_, err = snapshotWriter.Finish(publicKey[:])
if err != nil {
return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err)
}
// update snapshot status to pending upload with retry
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
@@ -180,8 +227,10 @@ func (s *Service) buildSnapshot(ctx context.Context, snapshotMeta cloudmigration
return retryer.FuncComplete, err
}, 10, time.Millisecond*100, time.Second*10); err != nil {
s.log.Error("failed to set snapshot status to 'pending upload'", "err", err)
s.buildSnapshotError = true
return fmt.Errorf("setting snapshot status to pending upload: snapshotID=%s %w", snapshotMeta.UID, err)
}
return nil
}
// asynchronous process for and updating the snapshot status
@@ -189,7 +238,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio
// TODO -- make sure we can only upload one snapshot at a time
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
s.buildSnapshotError = false
// update snapshot status to uploading, add some retries since this is a background task
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
@@ -200,7 +248,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio
return retryer.FuncComplete, err
}, 10, time.Millisecond*100, time.Second*10); err != nil {
s.log.Error("failed to set snapshot status to 'creating'", "err", err)
s.buildSnapshotError = true
return
}
@@ -218,7 +265,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio
return retryer.FuncComplete, err
}, 10, time.Millisecond*100, time.Second*10); err != nil {
s.log.Error("failed to set snapshot status to 'pending upload'", "err", err)
s.buildSnapshotError = true
}
// simulate the rest