CloudMigrations: Move business logic out of api layer (#86406)

* move run migration to the cloudmigrationimpl layer

* add migration run list logic down a layer

* remove useless comments

* pull cms calls into their own service
This commit is contained in:
Michael Mandrus
2024-04-17 15:43:09 -04:00
committed by GitHub
parent 00256d3fdd
commit df4c8c3cbc
5 changed files with 217 additions and 158 deletions
@@ -1,20 +1,22 @@
package cloudmigrationimpl
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
"github.com/grafana/grafana/pkg/services/cloudmigration/cmsclient"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
@@ -33,7 +35,8 @@ type Service struct {
log *log.ConcreteLogger
cfg *setting.Cfg
features featuremgmt.FeatureToggles
features featuremgmt.FeatureToggles
cmsClient cmsclient.Client
dsService datasources.DataSourceService
gcomService gcom.Service
@@ -70,9 +73,9 @@ func ProvideService(
tracer tracing.Tracer,
dashboardService dashboards.DashboardService,
folderService folder.Service,
) cloudmigration.Service {
) (cloudmigration.Service, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) {
return &NoopServiceImpl{}
return &NoopServiceImpl{}, nil
}
s := &Service{
@@ -90,11 +93,18 @@ func ProvideService(
}
s.api = api.RegisterApi(routeRegister, s, tracer)
// get CMS path from the config
domain, err := s.parseCloudMigrationConfig()
if err != nil {
return nil, fmt.Errorf("config parse error: %w", err)
}
s.cmsClient = cmsclient.NewCMSClient(domain)
if err := s.registerMetrics(prom, s.metrics); err != nil {
s.log.Warn("error registering prom metrics", "error", err.Error())
}
return s
return s, nil
}
func (s *Service) CreateToken(ctx context.Context) (cloudmigration.CreateAccessTokenResponse, error) {
@@ -213,44 +223,9 @@ func (s *Service) findAccessPolicyByName(ctx context.Context, regionSlug, access
func (s *Service) ValidateToken(ctx context.Context, cm cloudmigration.CloudMigration) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.ValidateToken")
defer span.End()
logger := s.log.FromContext(ctx)
// get CMS path from the config
domain, err := s.ParseCloudMigrationConfig()
if err != nil {
return fmt.Errorf("config parse error: %w", err)
}
path := fmt.Sprintf("https://cms-%s.%s/cloud-migrations/api/v1/validate-key", cm.ClusterSlug, domain)
// validation is an empty POST to CMS with the authorization header included
req, err := http.NewRequest("POST", path, bytes.NewReader(nil))
if err != nil {
logger.Error("error creating http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
logger.Error("error sending http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("closing request body", "err", err.Error())
}
}()
if resp.StatusCode != 200 {
var errResp map[string]any
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
logger.Error("decoding error response", "err", err.Error())
} else {
return fmt.Errorf("token validation failure: %v", errResp)
}
if err := s.cmsClient.ValidateKey(ctx, cm); err != nil {
return fmt.Errorf("validating key: %w", err)
}
return nil
@@ -323,10 +298,52 @@ func (s *Service) UpdateMigration(ctx context.Context, id int64, cm cloudmigrati
return nil, nil
}
func (s *Service) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, error) {
func (s *Service) RunMigration(ctx context.Context, id int64) (*cloudmigration.MigrateDataResponseDTO, error) {
// Get migration to read the auth token
migration, err := s.GetMigration(ctx, id)
if err != nil {
return nil, fmt.Errorf("migration get error: %w", err)
}
// Get migration data JSON
request, err := s.getMigrationDataJSON(ctx)
if err != nil {
s.log.Error("error getting the json request body for migration run", "err", err.Error())
return nil, fmt.Errorf("migration data get error: %w", err)
}
// Call the cms service
resp, err := s.cmsClient.MigrateData(ctx, *migration, *request)
if err != nil {
s.log.Error("error migrating data: %w", err)
return nil, fmt.Errorf("migrate data error: %w", err)
}
// TODO update cloud migration run schema to treat the result as a first-class citizen
respData, err := json.Marshal(resp)
if err != nil {
s.log.Error("error marshalling migration response data: %w", err)
return nil, fmt.Errorf("marshalling migration response data: %w", err)
}
// save the result of the migration
runID, err := s.SaveMigrationRun(ctx, &cloudmigration.CloudMigrationRun{
CloudMigrationUID: strconv.Itoa(int(id)),
Result: respData,
})
if err != nil {
response.Error(http.StatusInternalServerError, "migration run save error", err)
}
resp.RunID = runID
return resp, nil
}
func (s *Service) getMigrationDataJSON(ctx context.Context) (*cloudmigration.MigrateDataRequestDTO, error) {
var migrationDataSlice []cloudmigration.MigrateDataRequestItemDTO
// Data sources
dataSources, err := s.getDataSources(ctx, id)
dataSources, err := s.getDataSources(ctx)
if err != nil {
s.log.Error("Failed to get datasources", "err", err)
return nil, err
@@ -341,7 +358,7 @@ func (s *Service) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, e
}
// Dashboards
dashboards, err := s.getDashboards(ctx, id)
dashboards, err := s.getDashboards(ctx)
if err != nil {
s.log.Error("Failed to get dashboards", "err", err)
return nil, err
@@ -358,7 +375,7 @@ func (s *Service) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, e
}
// Folders
folders, err := s.getFolders(ctx, id)
folders, err := s.getFolders(ctx)
if err != nil {
s.log.Error("Failed to get folders", "err", err)
return nil, err
@@ -372,18 +389,14 @@ func (s *Service) GetMigrationDataJSON(ctx context.Context, id int64) ([]byte, e
Data: f,
})
}
migrationData := cloudmigration.MigrateDataRequestDTO{
migrationData := &cloudmigration.MigrateDataRequestDTO{
Items: migrationDataSlice,
}
result, err := json.Marshal(migrationData)
if err != nil {
s.log.Error("Failed to marshal datasources", "err", err)
return nil, err
}
return result, nil
return migrationData, nil
}
func (s *Service) getDataSources(ctx context.Context, id int64) ([]datasources.AddDataSourceCommand, error) {
func (s *Service) getDataSources(ctx context.Context) ([]datasources.AddDataSourceCommand, error) {
dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{})
if err != nil {
s.log.Error("Failed to get all datasources", "err", err)
@@ -420,7 +433,7 @@ func (s *Service) getDataSources(ctx context.Context, id int64) ([]datasources.A
return result, err
}
func (s *Service) getFolders(ctx context.Context, id int64) ([]folder.Folder, error) {
func (s *Service) getFolders(ctx context.Context) ([]folder.Folder, error) {
reqCtx := contexthandler.FromContext(ctx)
folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{
SignedInUser: reqCtx.SignedInUser,
@@ -437,7 +450,7 @@ func (s *Service) getFolders(ctx context.Context, id int64) ([]folder.Folder, er
return result, nil
}
func (s *Service) getDashboards(ctx context.Context, id int64) ([]dashboards.Dashboard, error) {
func (s *Service) getDashboards(ctx context.Context) ([]dashboards.Dashboard, error) {
dashs, err := s.dashboardService.GetAllDashboards(ctx)
if err != nil {
return nil, err
@@ -471,12 +484,26 @@ func (s *Service) GetMigrationStatus(ctx context.Context, id string, runID strin
return cmr, nil
}
func (s *Service) GetMigrationStatusList(ctx context.Context, migrationID string) ([]*cloudmigration.CloudMigrationRun, error) {
cmrs, err := s.store.GetMigrationStatusList(ctx, migrationID)
func (s *Service) GetMigrationRunList(ctx context.Context, migrationID string) (*cloudmigration.CloudMigrationRunList, error) {
runs, err := s.store.GetMigrationStatusList(ctx, migrationID)
if err != nil {
return nil, fmt.Errorf("retrieving migration statuses from db: %w", err)
}
return cmrs, nil
runList := &cloudmigration.CloudMigrationRunList{Runs: []cloudmigration.MigrateDataResponseDTO{}}
for _, s := range runs {
// attempt to bind the raw result to a list of response item DTOs
r := cloudmigration.MigrateDataResponseDTO{
Items: []cloudmigration.MigrateDataResponseItemDTO{},
}
if err := json.Unmarshal(s.Result, &r); err != nil {
return nil, fmt.Errorf("error unmarshalling migration response items: %w", err)
}
r.RunID = s.ID
runList.Runs = append(runList.Runs, r)
}
return runList, nil
}
func (s *Service) DeleteMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigration, error) {
@@ -487,7 +514,7 @@ func (s *Service) DeleteMigration(ctx context.Context, id int64) (*cloudmigratio
return c, nil
}
func (s *Service) ParseCloudMigrationConfig() (string, error) {
func (s *Service) parseCloudMigrationConfig() (string, error) {
if s.cfg == nil {
return "", fmt.Errorf("cfg cannot be nil")
}