feat: add folder tree validation for unified data migration (#114342)
* fix: add validator * fix: revert startup index
This commit is contained in:
committed by
GitHub
parent
8d75d79313
commit
bca58f5626
@@ -18,6 +18,7 @@ import (
|
||||
// ValidationFunc is a function that validates migration results.
|
||||
|
||||
type Validator interface {
|
||||
Name() string
|
||||
Validate(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse, log log.Logger) error
|
||||
}
|
||||
|
||||
@@ -27,7 +28,7 @@ type ResourceMigration struct {
|
||||
migrator UnifiedMigrator
|
||||
resources []schema.GroupResource
|
||||
migrationID string
|
||||
validator Validator // Optional: custom validation logic for this migration
|
||||
validators []Validator // Optional: custom validation logic for this migration
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
@@ -36,13 +37,13 @@ func NewResourceMigration(
|
||||
migrator UnifiedMigrator,
|
||||
resources []schema.GroupResource,
|
||||
migrationID string,
|
||||
validator Validator,
|
||||
validators []Validator,
|
||||
) *ResourceMigration {
|
||||
return &ResourceMigration{
|
||||
migrator: migrator,
|
||||
resources: resources,
|
||||
migrationID: migrationID,
|
||||
validator: validator,
|
||||
validators: validators,
|
||||
log: log.New("storage.unified.resource_migration." + migrationID),
|
||||
}
|
||||
}
|
||||
@@ -123,20 +124,22 @@ func (m *ResourceMigration) migrateOrg(ctx context.Context, sess *xorm.Session,
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateMigration calls the custom validation function if provided
|
||||
// validateMigration runs all validators in sequence
|
||||
func (m *ResourceMigration) validateMigration(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse) error {
|
||||
if m.validator == nil {
|
||||
m.log.Debug("No validation function provided, skipping validation")
|
||||
if len(m.validators) == 0 {
|
||||
m.log.Debug("No validators provided, skipping validation")
|
||||
return nil
|
||||
}
|
||||
|
||||
return m.validator.Validate(ctx, sess, response, m.log)
|
||||
}
|
||||
for _, validator := range m.validators {
|
||||
m.log.Debug("Running validator", "name", validator.Name(), "total", len(m.validators))
|
||||
if err := validator.Validate(ctx, sess, response, m.log); err != nil {
|
||||
return fmt.Errorf("validator %s failed: %w", validator.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// LegacyTableInfo defines how to map a unified storage resource to its legacy table
|
||||
type LegacyTableInfo struct {
|
||||
Table string // Legacy table name (e.g., "dashboard", "playlist")
|
||||
WhereClause string // WHERE clause template with org_id parameter (e.g., "org_id = ? and is_folder = false")
|
||||
m.log.Debug("All validators passed", "count", len(m.validators))
|
||||
return nil
|
||||
}
|
||||
|
||||
func ParseOrgIDFromNamespace(namespace string) (int64, error) {
|
||||
|
||||
@@ -81,8 +81,7 @@ func RegisterMigrations(
|
||||
}
|
||||
|
||||
// Register resource migrations
|
||||
// To add a new resource type, simply add another migration here with the appropriate resources
|
||||
registerResourceMigrations(mg, migrator, client)
|
||||
registerDashboardAndFolderMigration(mg, migrator, client)
|
||||
|
||||
// Run all registered migrations (blocking)
|
||||
sec := cfg.Raw.Section("database")
|
||||
@@ -96,18 +95,31 @@ func RegisterMigrations(
|
||||
return nil
|
||||
}
|
||||
|
||||
func registerResourceMigrations(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) {
|
||||
func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) {
|
||||
folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"}
|
||||
dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"}
|
||||
|
||||
folderCountValidator := NewCountValidator(
|
||||
client,
|
||||
folders,
|
||||
"dashboard",
|
||||
"org_id = ? and is_folder = true",
|
||||
)
|
||||
|
||||
dashboardCountValidator := NewCountValidator(
|
||||
client,
|
||||
dashboards,
|
||||
"dashboard",
|
||||
"org_id = ? and is_folder = false",
|
||||
)
|
||||
|
||||
folderTreeValidator := NewFolderTreeValidator(client, folders)
|
||||
|
||||
dashboardsAndFolders := NewResourceMigration(
|
||||
migrator,
|
||||
[]schema.GroupResource{
|
||||
{Group: "folder.grafana.app", Resource: "folders"},
|
||||
{Group: "dashboard.grafana.app", Resource: "dashboards"},
|
||||
},
|
||||
[]schema.GroupResource{folders, dashboards},
|
||||
"folders-dashboards",
|
||||
NewCountValidator(client, map[string]LegacyTableInfo{
|
||||
"folder.grafana.app/folders": {Table: "dashboard", WhereClause: "org_id = ? and is_folder = true"},
|
||||
"dashboard.grafana.app/dashboards": {Table: "dashboard", WhereClause: "org_id = ? and is_folder = false"},
|
||||
}),
|
||||
[]Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator},
|
||||
)
|
||||
mg.AddMigration("folders and dashboards migration", dashboardsAndFolders)
|
||||
}
|
||||
|
||||
@@ -7,18 +7,80 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
type CountValidator struct {
|
||||
client resourcepb.ResourceIndexClient
|
||||
legacyTableMap map[string]LegacyTableInfo
|
||||
func filterResponse(response *resourcepb.BulkResponse, resources []schema.GroupResource) *resourcepb.BulkResponse {
|
||||
if len(resources) == 0 {
|
||||
return response
|
||||
}
|
||||
|
||||
// Create a resource lookup map for efficient filtering
|
||||
resourceMap := make(map[string]bool)
|
||||
for _, res := range resources {
|
||||
key := fmt.Sprintf("%s/%s", res.Group, res.Resource)
|
||||
resourceMap[key] = true
|
||||
}
|
||||
|
||||
// Filter summaries to only include matching resources
|
||||
var filteredSummaries []*resourcepb.BulkResponse_Summary
|
||||
for _, summary := range response.Summary {
|
||||
key := fmt.Sprintf("%s/%s", summary.Group, summary.Resource)
|
||||
if resourceMap[key] {
|
||||
filteredSummaries = append(filteredSummaries, summary)
|
||||
}
|
||||
}
|
||||
|
||||
// Filter rejected items to only include matching resources
|
||||
var filteredRejected []*resourcepb.BulkResponse_Rejected
|
||||
for _, rejected := range response.Rejected {
|
||||
if rejected.Key != nil {
|
||||
key := fmt.Sprintf("%s/%s", rejected.Key.Group, rejected.Key.Resource)
|
||||
if resourceMap[key] {
|
||||
filteredRejected = append(filteredRejected, rejected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create filtered response, preserving original processed count for percentage calculations
|
||||
return &resourcepb.BulkResponse{
|
||||
Error: response.Error,
|
||||
Processed: response.Processed,
|
||||
Summary: filteredSummaries,
|
||||
Rejected: filteredRejected,
|
||||
}
|
||||
}
|
||||
|
||||
func NewCountValidator(client resourcepb.ResourceIndexClient, legacyTableMap map[string]LegacyTableInfo) Validator {
|
||||
return &CountValidator{client: client, legacyTableMap: legacyTableMap}
|
||||
type CountValidator struct {
|
||||
name string
|
||||
client resourcepb.ResourceIndexClient
|
||||
resource schema.GroupResource
|
||||
table string
|
||||
whereClause string
|
||||
}
|
||||
|
||||
func NewCountValidator(
|
||||
client resourcepb.ResourceIndexClient,
|
||||
resource schema.GroupResource,
|
||||
table string,
|
||||
whereClause string,
|
||||
) Validator {
|
||||
return &CountValidator{
|
||||
name: "CountValidator",
|
||||
client: client,
|
||||
resource: resource,
|
||||
table: table,
|
||||
whereClause: whereClause,
|
||||
}
|
||||
}
|
||||
|
||||
func (v *CountValidator) Name() string {
|
||||
return v.name
|
||||
}
|
||||
|
||||
func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse, log log.Logger) error {
|
||||
// Filter response to only include the configured resource
|
||||
response = filterResponse(response, []schema.GroupResource{v.resource})
|
||||
if len(response.Rejected) > 0 {
|
||||
log.Warn("Migration had rejected items", "count", len(response.Rejected))
|
||||
for i, rejected := range response.Rejected {
|
||||
@@ -34,66 +96,255 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
|
||||
// Rejections are not fatal - they may be expected for invalid data
|
||||
}
|
||||
|
||||
// Validate counts for each resource type
|
||||
for _, summary := range response.Summary {
|
||||
key := fmt.Sprintf("%s/%s", summary.Group, summary.Resource)
|
||||
tableInfo, ok := v.legacyTableMap[key]
|
||||
if !ok {
|
||||
log.Debug("No legacy table mapping for resource, skipping count validation",
|
||||
"resource", fmt.Sprintf("%s.%s", summary.Resource, summary.Group),
|
||||
"namespace", summary.Namespace)
|
||||
continue
|
||||
// Should have at most one summary after filtering
|
||||
if len(response.Summary) == 0 {
|
||||
log.Debug("No summaries found for resource, skipping count validation",
|
||||
"resource", fmt.Sprintf("%s.%s", v.resource.Resource, v.resource.Group))
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(response.Summary) > 1 {
|
||||
return fmt.Errorf("expected at most 1 summary after filtering, got %d", len(response.Summary))
|
||||
}
|
||||
|
||||
summary := response.Summary[0]
|
||||
|
||||
// Get legacy count from database
|
||||
orgID, err := ParseOrgIDFromNamespace(summary.Namespace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
|
||||
}
|
||||
|
||||
legacyCount, err := sess.Table(v.table).Where(v.whereClause, orgID).Count()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count %s: %w", v.table, err)
|
||||
}
|
||||
|
||||
// Get unified storage count using GetStats API
|
||||
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
|
||||
Namespace: summary.Namespace,
|
||||
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
|
||||
summary.Group, summary.Resource, summary.Namespace, err)
|
||||
}
|
||||
|
||||
// Find the count for this specific resource type
|
||||
var unifiedCount int64
|
||||
for _, stat := range statsResp.Stats {
|
||||
if stat.Group == summary.Group && stat.Resource == summary.Resource {
|
||||
unifiedCount = stat.Count
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Get legacy count from database
|
||||
orgID, err := ParseOrgIDFromNamespace(summary.Namespace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
|
||||
}
|
||||
// Account for rejected items in validation
|
||||
expectedCount := unifiedCount + int64(len(response.Rejected))
|
||||
|
||||
legacyCount, err := sess.Table(tableInfo.Table).Where(tableInfo.WhereClause, orgID).Count()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count %s: %w", tableInfo.Table, err)
|
||||
}
|
||||
log.Info("Count validation",
|
||||
"resource", fmt.Sprintf("%s.%s", summary.Resource, summary.Group),
|
||||
"namespace", summary.Namespace,
|
||||
"legacy_count", legacyCount,
|
||||
"unified_count", unifiedCount,
|
||||
"migration_summary_count", summary.Count,
|
||||
"rejected", len(response.Rejected),
|
||||
"history", summary.History)
|
||||
|
||||
// Get unified storage count using GetStats API
|
||||
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
|
||||
Namespace: summary.Namespace,
|
||||
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
|
||||
summary.Group, summary.Resource, summary.Namespace, err)
|
||||
}
|
||||
|
||||
// Find the count for this specific resource type
|
||||
var unifiedCount int64
|
||||
for _, stat := range statsResp.Stats {
|
||||
if stat.Group == summary.Group && stat.Resource == summary.Resource {
|
||||
unifiedCount = stat.Count
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Account for rejected items in validation
|
||||
expectedCount := unifiedCount + int64(len(response.Rejected))
|
||||
|
||||
log.Info("Count validation",
|
||||
"resource", fmt.Sprintf("%s.%s", summary.Resource, summary.Group),
|
||||
"namespace", summary.Namespace,
|
||||
"legacy_count", legacyCount,
|
||||
"unified_count", unifiedCount,
|
||||
"migration_summary_count", summary.Count,
|
||||
"rejected", len(response.Rejected),
|
||||
"history", summary.History)
|
||||
|
||||
// Validate that we migrated all items (allowing for rejected items)
|
||||
if legacyCount > expectedCount {
|
||||
return fmt.Errorf("count mismatch for %s.%s in namespace %s: legacy has %d, unified has %d, rejected %d",
|
||||
summary.Resource, summary.Group, summary.Namespace,
|
||||
legacyCount, unifiedCount, len(response.Rejected))
|
||||
}
|
||||
// Validate that we migrated all items (allowing for rejected items)
|
||||
if legacyCount > expectedCount {
|
||||
return fmt.Errorf("count mismatch for %s.%s in namespace %s: legacy has %d, unified has %d, rejected %d",
|
||||
summary.Resource, summary.Group, summary.Namespace,
|
||||
legacyCount, unifiedCount, len(response.Rejected))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type FolderTreeValidator struct {
|
||||
name string
|
||||
client resourcepb.ResourceIndexClient
|
||||
resource schema.GroupResource
|
||||
}
|
||||
|
||||
func NewFolderTreeValidator(
|
||||
client resourcepb.ResourceIndexClient,
|
||||
resource schema.GroupResource,
|
||||
) Validator {
|
||||
return &FolderTreeValidator{
|
||||
name: "FolderTreeValidator",
|
||||
client: client,
|
||||
resource: resource,
|
||||
}
|
||||
}
|
||||
|
||||
type legacyFolder struct {
|
||||
ID int64 `xorm:"id"`
|
||||
UID string `xorm:"uid"`
|
||||
FolderUID string `xorm:"folder_uid"`
|
||||
Title string `xorm:"title"`
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) Name() string {
|
||||
return v.name
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse, log log.Logger) error {
|
||||
// Filter response to only include the configured resource (folders)
|
||||
response = filterResponse(response, []schema.GroupResource{v.resource})
|
||||
|
||||
// Should have at most one summary after filtering
|
||||
if len(response.Summary) == 0 {
|
||||
log.Debug("No summaries found for folders, skipping folder tree validation")
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(response.Summary) > 1 {
|
||||
return fmt.Errorf("expected at most 1 summary after filtering, got %d", len(response.Summary))
|
||||
}
|
||||
|
||||
summary := response.Summary[0]
|
||||
|
||||
// Get orgID from namespace
|
||||
orgID, err := ParseOrgIDFromNamespace(summary.Namespace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
|
||||
}
|
||||
|
||||
// Build legacy folder parent map
|
||||
legacyParentMap, err := v.buildLegacyFolderParentMap(sess, orgID, log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build legacy folder parent map: %w", err)
|
||||
}
|
||||
|
||||
// Build unified storage folder parent map
|
||||
unifiedParentMap, err := v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build unified folder parent map: %w", err)
|
||||
}
|
||||
|
||||
// Compare the two maps
|
||||
mismatches := []string{}
|
||||
for uid, legacyParent := range legacyParentMap {
|
||||
unifiedParent, exists := unifiedParentMap[uid]
|
||||
if !exists {
|
||||
// Folder exists in legacy but not in unified - might be rejected, skip
|
||||
log.Debug("Folder exists in legacy but not in unified storage",
|
||||
"uid", uid,
|
||||
"legacy_parent", legacyParent)
|
||||
continue
|
||||
}
|
||||
|
||||
if legacyParent != unifiedParent {
|
||||
mismatch := fmt.Sprintf("folder %s: legacy parent=%s, unified parent=%s",
|
||||
uid, legacyParent, unifiedParent)
|
||||
mismatches = append(mismatches, mismatch)
|
||||
log.Warn("Folder parent mismatch",
|
||||
"uid", uid,
|
||||
"legacy_parent", legacyParent,
|
||||
"unified_parent", unifiedParent)
|
||||
}
|
||||
}
|
||||
|
||||
// Check for folders in unified but not in legacy (shouldn't happen)
|
||||
for uid := range unifiedParentMap {
|
||||
if _, exists := legacyParentMap[uid]; !exists {
|
||||
mismatch := fmt.Sprintf("folder %s exists in unified but not in legacy", uid)
|
||||
mismatches = append(mismatches, mismatch)
|
||||
log.Warn("Folder exists in unified but not in legacy", "uid", uid)
|
||||
}
|
||||
}
|
||||
|
||||
if len(mismatches) > 0 {
|
||||
log.Error("Folder tree structure validation failed",
|
||||
"mismatch_count", len(mismatches),
|
||||
"total_legacy_folders", len(legacyParentMap),
|
||||
"total_unified_folders", len(unifiedParentMap))
|
||||
return fmt.Errorf("folder tree structure mismatch: %d folders have incorrect parents", len(mismatches))
|
||||
}
|
||||
|
||||
log.Info("Folder tree structure validation passed",
|
||||
"folder_count", len(legacyParentMap),
|
||||
"namespace", summary.Namespace)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) buildLegacyFolderParentMap(sess *xorm.Session, orgID int64, log log.Logger) (map[string]string, error) {
|
||||
// Query all folders for this org
|
||||
var folders []legacyFolder
|
||||
err := sess.Table("dashboard").
|
||||
Cols("id", "uid", "folder_uid", "title").
|
||||
Where("org_id = ? AND is_folder = ?", orgID, true).
|
||||
Find(&folders)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query legacy folders: %w", err)
|
||||
}
|
||||
|
||||
parentMap := make(map[string]string)
|
||||
for _, folder := range folders {
|
||||
parentMap[folder.UID] = folder.FolderUID
|
||||
}
|
||||
|
||||
if len(parentMap) == 0 {
|
||||
log.Debug("No legacy folders found for org", "org_id", orgID)
|
||||
return make(map[string]string), nil
|
||||
}
|
||||
|
||||
log.Debug("Built legacy folder parent map",
|
||||
"folder_count", len(parentMap),
|
||||
"org_id", orgID)
|
||||
|
||||
return parentMap, nil
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) buildUnifiedFolderParentMap(ctx context.Context, namespace string, log log.Logger) (map[string]string, error) {
|
||||
// Search for all folders in this namespace
|
||||
searchResp, err := v.client.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Namespace: namespace,
|
||||
Group: v.resource.Group,
|
||||
Resource: v.resource.Resource,
|
||||
},
|
||||
},
|
||||
Limit: 100000, // Large limit to get all folders
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to search folders in unified storage: %w", err)
|
||||
}
|
||||
|
||||
if searchResp.Results == nil {
|
||||
return make(map[string]string), nil
|
||||
}
|
||||
|
||||
parentMap := make(map[string]string)
|
||||
for _, row := range searchResp.Results.Rows {
|
||||
if row.Key == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
folderUID := row.Key.Name
|
||||
parentUID := ""
|
||||
|
||||
folderColIdx := -1
|
||||
for i, col := range searchResp.Results.Columns {
|
||||
if col.Name == "folder" {
|
||||
folderColIdx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if folderColIdx >= 0 && folderColIdx < len(row.Cells) {
|
||||
parentUID = string(row.Cells[folderColIdx])
|
||||
}
|
||||
|
||||
parentMap[folderUID] = parentUID
|
||||
}
|
||||
|
||||
log.Debug("Built unified folder parent map",
|
||||
"folder_count", len(parentMap),
|
||||
"namespace", namespace)
|
||||
|
||||
return parentMap, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user