Cloud migration: upload snapshot files using presigned url (#90273)
* Cloud migration: upload snapshot files using presigned url * log error if index file cannot be closed * log error if file cannot be closed in uploadUsingPresignedURL
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/cloudmigration"
|
||||
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
|
||||
"github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient"
|
||||
"github.com/grafana/grafana/pkg/services/cloudmigration/objectstorage"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@@ -42,8 +43,9 @@ type Service struct {
|
||||
|
||||
buildSnapshotMutex sync.Mutex
|
||||
|
||||
features featuremgmt.FeatureToggles
|
||||
gmsClient gmsclient.Client
|
||||
features featuremgmt.FeatureToggles
|
||||
gmsClient gmsclient.Client
|
||||
objectStorage objectstorage.ObjectStorage
|
||||
|
||||
dsService datasources.DataSourceService
|
||||
gcomService gcom.Service
|
||||
@@ -99,6 +101,8 @@ func ProvideService(
|
||||
}
|
||||
s.api = api.RegisterApi(routeRegister, s, tracer)
|
||||
|
||||
s.objectStorage = objectstorage.NewS3()
|
||||
|
||||
if !cfg.CloudMigration.IsDeveloperMode {
|
||||
// get GMS path from the config
|
||||
domain, err := s.parseCloudMigrationConfig()
|
||||
@@ -558,9 +562,20 @@ func (s *Service) GetSnapshotList(ctx context.Context, query cloudmigration.List
|
||||
}
|
||||
|
||||
func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error {
|
||||
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot")
|
||||
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot",
|
||||
trace.WithAttributes(
|
||||
attribute.String("sessionUid", sessionUid),
|
||||
attribute.String("snapshotUid", snapshotUid),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
// fetch session for the gms auth token
|
||||
session, err := s.store.GetMigrationSessionByUID(ctx, sessionUid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetching migration session for uid %s: %w", sessionUid, err)
|
||||
}
|
||||
|
||||
snapshot, err := s.GetSnapshot(ctx, cloudmigration.GetSnapshotsQuery{
|
||||
SnapshotUID: snapshotUid,
|
||||
SessionUID: sessionUid,
|
||||
@@ -569,11 +584,14 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
|
||||
return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err)
|
||||
}
|
||||
|
||||
s.log.Info("Uploading snapshot with GMS ID %s in local directory %s to url %s", snapshot.GMSSnapshotUID, snapshot.LocalDir, snapshot.UploadURL)
|
||||
s.log.Debug("UploadSnapshot not yet implemented, faking it")
|
||||
s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", snapshot.UploadURL)
|
||||
|
||||
// start uploading the snapshot asynchronously while we return a success response to the client
|
||||
go s.uploadSnapshot(context.Background(), *snapshot)
|
||||
go func() {
|
||||
if err := s.uploadSnapshot(context.Background(), session, snapshot); err != nil {
|
||||
s.log.Error("uploading snapshot", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
cryptoRand "crypto/rand"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
snapshot "github.com/grafana/grafana-cloud-migration-snapshot/src"
|
||||
@@ -213,8 +215,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
|
||||
// Add the grafana generated public key to the index file so gms can use it to decrypt the snapshot files later.
|
||||
// This works because the snapshot files are being encrypted with
|
||||
// the grafana generated private key + the gms public key.
|
||||
_, err = snapshotWriter.Finish(publicKey[:])
|
||||
if err != nil {
|
||||
if _, err := snapshotWriter.Finish(publicKey[:]); err != nil {
|
||||
return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err)
|
||||
}
|
||||
|
||||
@@ -234,7 +235,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
|
||||
}
|
||||
|
||||
// asynchronous process for and updating the snapshot status
|
||||
func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigration.CloudMigrationSnapshot) {
|
||||
func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot) (err error) {
|
||||
// TODO -- make sure we can only upload one snapshot at a time
|
||||
s.buildSnapshotMutex.Lock()
|
||||
defer s.buildSnapshotMutex.Unlock()
|
||||
@@ -247,34 +248,74 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio
|
||||
})
|
||||
return retryer.FuncComplete, err
|
||||
}, 10, time.Millisecond*100, time.Second*10); err != nil {
|
||||
s.log.Error("failed to set snapshot status to 'creating'", "err", err)
|
||||
return
|
||||
return fmt.Errorf("failed to set snapshot status to 'creating': %w", err)
|
||||
}
|
||||
|
||||
// upload snapshot
|
||||
// just sleep for now to simulate snapshot creation happening
|
||||
s.log.Debug("snapshot meta", "snapshot", snapshotMeta)
|
||||
time.Sleep(3 * time.Second)
|
||||
indexFilePath := filepath.Join(snapshotMeta.LocalDir, "index.json")
|
||||
// LocalDir can be set in the configuration, therefore the file path can be set to any path.
|
||||
// nolint:gosec
|
||||
indexFile, err := os.Open(indexFilePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening index files: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := indexFile.Close(); closeErr != nil {
|
||||
s.log.Error("closing index file", "err", closeErr.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// update snapshot status to pending processing with retry
|
||||
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
|
||||
err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
|
||||
UID: snapshotMeta.UID,
|
||||
Status: cloudmigration.SnapshotStatusPendingProcessing,
|
||||
})
|
||||
return retryer.FuncComplete, err
|
||||
}, 10, time.Millisecond*100, time.Second*10); err != nil {
|
||||
s.log.Error("failed to set snapshot status to 'pending upload'", "err", err)
|
||||
index, err := snapshot.ReadIndex(indexFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading index from file: %w", err)
|
||||
}
|
||||
|
||||
// Upload the data files.
|
||||
for _, fileNames := range index.Items {
|
||||
for _, fileName := range fileNames {
|
||||
filePath := filepath.Join(snapshotMeta.LocalDir, fileName)
|
||||
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName)
|
||||
if err := s.uploadUsingPresignedURL(ctx, snapshotMeta.UploadURL, key, filePath); err != nil {
|
||||
return fmt.Errorf("uploading snapshot file using presigned url: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Upload the index file. Must be done after uploading the data files.
|
||||
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json")
|
||||
if _, err := indexFile.Seek(0, 0); err != nil {
|
||||
return fmt.Errorf("seeking to beginning of index file: %w", err)
|
||||
}
|
||||
|
||||
if err := s.objectStorage.PresignedURLUpload(ctx, snapshotMeta.UploadURL, key, indexFile); err != nil {
|
||||
return fmt.Errorf("uploading file using presigned url: %w", err)
|
||||
}
|
||||
|
||||
// simulate the rest
|
||||
// processing
|
||||
time.Sleep(3 * time.Second)
|
||||
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
|
||||
UID: snapshotMeta.UID,
|
||||
Status: cloudmigration.SnapshotStatusProcessing,
|
||||
}); err != nil {
|
||||
s.log.Error("updating snapshot", "err", err)
|
||||
return fmt.Errorf("updating snapshot: %w", err)
|
||||
}
|
||||
// end here as the GetSnapshot handler will fill in the rest when called
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key string, filePath string) (err error) {
|
||||
// The directory that contains the file can set in the configuration, therefore the directory can be any directory.
|
||||
// nolint:gosec
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening snapshot file: path=%s %w", filePath, err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := file.Close(); closeErr != nil {
|
||||
s.log.Error("closing file", "path", filePath, "err", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if err = s.objectStorage.PresignedURLUpload(ctx, uploadURL, key, file); err != nil {
|
||||
return fmt.Errorf("uploading file using presigned url: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user