From bca58f5626c8e44a4f9e3497c80dbadb4e7e893b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mustafa=20Sencer=20=C3=96zcan?= <32759850+mustafasencer@users.noreply.github.com> Date: Mon, 24 Nov 2025 11:02:14 +0100 Subject: [PATCH] feat: add folder tree validation for unified data migration (#114342) * fix: add validator * fix: revert startup index --- .../unified/migrations/resource_migration.go | 27 +- pkg/storage/unified/migrations/service.go | 34 +- pkg/storage/unified/migrations/validator.go | 371 +++++++++++++++--- 3 files changed, 349 insertions(+), 83 deletions(-) diff --git a/pkg/storage/unified/migrations/resource_migration.go b/pkg/storage/unified/migrations/resource_migration.go index cf67d220ec3..281ee1c85e3 100644 --- a/pkg/storage/unified/migrations/resource_migration.go +++ b/pkg/storage/unified/migrations/resource_migration.go @@ -18,6 +18,7 @@ import ( // ValidationFunc is a function that validates migration results. type Validator interface { + Name() string Validate(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse, log log.Logger) error } @@ -27,7 +28,7 @@ type ResourceMigration struct { migrator UnifiedMigrator resources []schema.GroupResource migrationID string - validator Validator // Optional: custom validation logic for this migration + validators []Validator // Optional: custom validation logic for this migration log log.Logger } @@ -36,13 +37,13 @@ func NewResourceMigration( migrator UnifiedMigrator, resources []schema.GroupResource, migrationID string, - validator Validator, + validators []Validator, ) *ResourceMigration { return &ResourceMigration{ migrator: migrator, resources: resources, migrationID: migrationID, - validator: validator, + validators: validators, log: log.New("storage.unified.resource_migration." + migrationID), } } @@ -123,20 +124,22 @@ func (m *ResourceMigration) migrateOrg(ctx context.Context, sess *xorm.Session, return nil } -// validateMigration calls the custom validation function if provided +// validateMigration runs all validators in sequence func (m *ResourceMigration) validateMigration(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse) error { - if m.validator == nil { - m.log.Debug("No validation function provided, skipping validation") + if len(m.validators) == 0 { + m.log.Debug("No validators provided, skipping validation") return nil } - return m.validator.Validate(ctx, sess, response, m.log) -} + for _, validator := range m.validators { + m.log.Debug("Running validator", "name", validator.Name(), "total", len(m.validators)) + if err := validator.Validate(ctx, sess, response, m.log); err != nil { + return fmt.Errorf("validator %s failed: %w", validator.Name(), err) + } + } -// LegacyTableInfo defines how to map a unified storage resource to its legacy table -type LegacyTableInfo struct { - Table string // Legacy table name (e.g., "dashboard", "playlist") - WhereClause string // WHERE clause template with org_id parameter (e.g., "org_id = ? and is_folder = false") + m.log.Debug("All validators passed", "count", len(m.validators)) + return nil } func ParseOrgIDFromNamespace(namespace string) (int64, error) { diff --git a/pkg/storage/unified/migrations/service.go b/pkg/storage/unified/migrations/service.go index 699be937d72..1a78f837066 100644 --- a/pkg/storage/unified/migrations/service.go +++ b/pkg/storage/unified/migrations/service.go @@ -81,8 +81,7 @@ func RegisterMigrations( } // Register resource migrations - // To add a new resource type, simply add another migration here with the appropriate resources - registerResourceMigrations(mg, migrator, client) + registerDashboardAndFolderMigration(mg, migrator, client) // Run all registered migrations (blocking) sec := cfg.Raw.Section("database") @@ -96,18 +95,31 @@ func RegisterMigrations( return nil } -func registerResourceMigrations(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) { +func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) { + folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"} + dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"} + + folderCountValidator := NewCountValidator( + client, + folders, + "dashboard", + "org_id = ? and is_folder = true", + ) + + dashboardCountValidator := NewCountValidator( + client, + dashboards, + "dashboard", + "org_id = ? and is_folder = false", + ) + + folderTreeValidator := NewFolderTreeValidator(client, folders) + dashboardsAndFolders := NewResourceMigration( migrator, - []schema.GroupResource{ - {Group: "folder.grafana.app", Resource: "folders"}, - {Group: "dashboard.grafana.app", Resource: "dashboards"}, - }, + []schema.GroupResource{folders, dashboards}, "folders-dashboards", - NewCountValidator(client, map[string]LegacyTableInfo{ - "folder.grafana.app/folders": {Table: "dashboard", WhereClause: "org_id = ? and is_folder = true"}, - "dashboard.grafana.app/dashboards": {Table: "dashboard", WhereClause: "org_id = ? and is_folder = false"}, - }), + []Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator}, ) mg.AddMigration("folders and dashboards migration", dashboardsAndFolders) } diff --git a/pkg/storage/unified/migrations/validator.go b/pkg/storage/unified/migrations/validator.go index 6deef77bb6d..7a973976402 100644 --- a/pkg/storage/unified/migrations/validator.go +++ b/pkg/storage/unified/migrations/validator.go @@ -7,18 +7,80 @@ import ( "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/util/xorm" + "k8s.io/apimachinery/pkg/runtime/schema" ) -type CountValidator struct { - client resourcepb.ResourceIndexClient - legacyTableMap map[string]LegacyTableInfo +func filterResponse(response *resourcepb.BulkResponse, resources []schema.GroupResource) *resourcepb.BulkResponse { + if len(resources) == 0 { + return response + } + + // Create a resource lookup map for efficient filtering + resourceMap := make(map[string]bool) + for _, res := range resources { + key := fmt.Sprintf("%s/%s", res.Group, res.Resource) + resourceMap[key] = true + } + + // Filter summaries to only include matching resources + var filteredSummaries []*resourcepb.BulkResponse_Summary + for _, summary := range response.Summary { + key := fmt.Sprintf("%s/%s", summary.Group, summary.Resource) + if resourceMap[key] { + filteredSummaries = append(filteredSummaries, summary) + } + } + + // Filter rejected items to only include matching resources + var filteredRejected []*resourcepb.BulkResponse_Rejected + for _, rejected := range response.Rejected { + if rejected.Key != nil { + key := fmt.Sprintf("%s/%s", rejected.Key.Group, rejected.Key.Resource) + if resourceMap[key] { + filteredRejected = append(filteredRejected, rejected) + } + } + } + + // Create filtered response, preserving original processed count for percentage calculations + return &resourcepb.BulkResponse{ + Error: response.Error, + Processed: response.Processed, + Summary: filteredSummaries, + Rejected: filteredRejected, + } } -func NewCountValidator(client resourcepb.ResourceIndexClient, legacyTableMap map[string]LegacyTableInfo) Validator { - return &CountValidator{client: client, legacyTableMap: legacyTableMap} +type CountValidator struct { + name string + client resourcepb.ResourceIndexClient + resource schema.GroupResource + table string + whereClause string +} + +func NewCountValidator( + client resourcepb.ResourceIndexClient, + resource schema.GroupResource, + table string, + whereClause string, +) Validator { + return &CountValidator{ + name: "CountValidator", + client: client, + resource: resource, + table: table, + whereClause: whereClause, + } +} + +func (v *CountValidator) Name() string { + return v.name } func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse, log log.Logger) error { + // Filter response to only include the configured resource + response = filterResponse(response, []schema.GroupResource{v.resource}) if len(response.Rejected) > 0 { log.Warn("Migration had rejected items", "count", len(response.Rejected)) for i, rejected := range response.Rejected { @@ -34,66 +96,255 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo // Rejections are not fatal - they may be expected for invalid data } - // Validate counts for each resource type - for _, summary := range response.Summary { - key := fmt.Sprintf("%s/%s", summary.Group, summary.Resource) - tableInfo, ok := v.legacyTableMap[key] - if !ok { - log.Debug("No legacy table mapping for resource, skipping count validation", - "resource", fmt.Sprintf("%s.%s", summary.Resource, summary.Group), - "namespace", summary.Namespace) - continue + // Should have at most one summary after filtering + if len(response.Summary) == 0 { + log.Debug("No summaries found for resource, skipping count validation", + "resource", fmt.Sprintf("%s.%s", v.resource.Resource, v.resource.Group)) + return nil + } + + if len(response.Summary) > 1 { + return fmt.Errorf("expected at most 1 summary after filtering, got %d", len(response.Summary)) + } + + summary := response.Summary[0] + + // Get legacy count from database + orgID, err := ParseOrgIDFromNamespace(summary.Namespace) + if err != nil { + return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err) + } + + legacyCount, err := sess.Table(v.table).Where(v.whereClause, orgID).Count() + if err != nil { + return fmt.Errorf("failed to count %s: %w", v.table, err) + } + + // Get unified storage count using GetStats API + statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{ + Namespace: summary.Namespace, + Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)}, + }) + if err != nil { + return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w", + summary.Group, summary.Resource, summary.Namespace, err) + } + + // Find the count for this specific resource type + var unifiedCount int64 + for _, stat := range statsResp.Stats { + if stat.Group == summary.Group && stat.Resource == summary.Resource { + unifiedCount = stat.Count + break } + } - // Get legacy count from database - orgID, err := ParseOrgIDFromNamespace(summary.Namespace) - if err != nil { - return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err) - } + // Account for rejected items in validation + expectedCount := unifiedCount + int64(len(response.Rejected)) - legacyCount, err := sess.Table(tableInfo.Table).Where(tableInfo.WhereClause, orgID).Count() - if err != nil { - return fmt.Errorf("failed to count %s: %w", tableInfo.Table, err) - } + log.Info("Count validation", + "resource", fmt.Sprintf("%s.%s", summary.Resource, summary.Group), + "namespace", summary.Namespace, + "legacy_count", legacyCount, + "unified_count", unifiedCount, + "migration_summary_count", summary.Count, + "rejected", len(response.Rejected), + "history", summary.History) - // Get unified storage count using GetStats API - statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{ - Namespace: summary.Namespace, - Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)}, - }) - if err != nil { - return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w", - summary.Group, summary.Resource, summary.Namespace, err) - } - - // Find the count for this specific resource type - var unifiedCount int64 - for _, stat := range statsResp.Stats { - if stat.Group == summary.Group && stat.Resource == summary.Resource { - unifiedCount = stat.Count - break - } - } - - // Account for rejected items in validation - expectedCount := unifiedCount + int64(len(response.Rejected)) - - log.Info("Count validation", - "resource", fmt.Sprintf("%s.%s", summary.Resource, summary.Group), - "namespace", summary.Namespace, - "legacy_count", legacyCount, - "unified_count", unifiedCount, - "migration_summary_count", summary.Count, - "rejected", len(response.Rejected), - "history", summary.History) - - // Validate that we migrated all items (allowing for rejected items) - if legacyCount > expectedCount { - return fmt.Errorf("count mismatch for %s.%s in namespace %s: legacy has %d, unified has %d, rejected %d", - summary.Resource, summary.Group, summary.Namespace, - legacyCount, unifiedCount, len(response.Rejected)) - } + // Validate that we migrated all items (allowing for rejected items) + if legacyCount > expectedCount { + return fmt.Errorf("count mismatch for %s.%s in namespace %s: legacy has %d, unified has %d, rejected %d", + summary.Resource, summary.Group, summary.Namespace, + legacyCount, unifiedCount, len(response.Rejected)) } return nil } + +type FolderTreeValidator struct { + name string + client resourcepb.ResourceIndexClient + resource schema.GroupResource +} + +func NewFolderTreeValidator( + client resourcepb.ResourceIndexClient, + resource schema.GroupResource, +) Validator { + return &FolderTreeValidator{ + name: "FolderTreeValidator", + client: client, + resource: resource, + } +} + +type legacyFolder struct { + ID int64 `xorm:"id"` + UID string `xorm:"uid"` + FolderUID string `xorm:"folder_uid"` + Title string `xorm:"title"` +} + +func (v *FolderTreeValidator) Name() string { + return v.name +} + +func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session, response *resourcepb.BulkResponse, log log.Logger) error { + // Filter response to only include the configured resource (folders) + response = filterResponse(response, []schema.GroupResource{v.resource}) + + // Should have at most one summary after filtering + if len(response.Summary) == 0 { + log.Debug("No summaries found for folders, skipping folder tree validation") + return nil + } + + if len(response.Summary) > 1 { + return fmt.Errorf("expected at most 1 summary after filtering, got %d", len(response.Summary)) + } + + summary := response.Summary[0] + + // Get orgID from namespace + orgID, err := ParseOrgIDFromNamespace(summary.Namespace) + if err != nil { + return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err) + } + + // Build legacy folder parent map + legacyParentMap, err := v.buildLegacyFolderParentMap(sess, orgID, log) + if err != nil { + return fmt.Errorf("failed to build legacy folder parent map: %w", err) + } + + // Build unified storage folder parent map + unifiedParentMap, err := v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log) + if err != nil { + return fmt.Errorf("failed to build unified folder parent map: %w", err) + } + + // Compare the two maps + mismatches := []string{} + for uid, legacyParent := range legacyParentMap { + unifiedParent, exists := unifiedParentMap[uid] + if !exists { + // Folder exists in legacy but not in unified - might be rejected, skip + log.Debug("Folder exists in legacy but not in unified storage", + "uid", uid, + "legacy_parent", legacyParent) + continue + } + + if legacyParent != unifiedParent { + mismatch := fmt.Sprintf("folder %s: legacy parent=%s, unified parent=%s", + uid, legacyParent, unifiedParent) + mismatches = append(mismatches, mismatch) + log.Warn("Folder parent mismatch", + "uid", uid, + "legacy_parent", legacyParent, + "unified_parent", unifiedParent) + } + } + + // Check for folders in unified but not in legacy (shouldn't happen) + for uid := range unifiedParentMap { + if _, exists := legacyParentMap[uid]; !exists { + mismatch := fmt.Sprintf("folder %s exists in unified but not in legacy", uid) + mismatches = append(mismatches, mismatch) + log.Warn("Folder exists in unified but not in legacy", "uid", uid) + } + } + + if len(mismatches) > 0 { + log.Error("Folder tree structure validation failed", + "mismatch_count", len(mismatches), + "total_legacy_folders", len(legacyParentMap), + "total_unified_folders", len(unifiedParentMap)) + return fmt.Errorf("folder tree structure mismatch: %d folders have incorrect parents", len(mismatches)) + } + + log.Info("Folder tree structure validation passed", + "folder_count", len(legacyParentMap), + "namespace", summary.Namespace) + + return nil +} + +func (v *FolderTreeValidator) buildLegacyFolderParentMap(sess *xorm.Session, orgID int64, log log.Logger) (map[string]string, error) { + // Query all folders for this org + var folders []legacyFolder + err := sess.Table("dashboard"). + Cols("id", "uid", "folder_uid", "title"). + Where("org_id = ? AND is_folder = ?", orgID, true). + Find(&folders) + if err != nil { + return nil, fmt.Errorf("failed to query legacy folders: %w", err) + } + + parentMap := make(map[string]string) + for _, folder := range folders { + parentMap[folder.UID] = folder.FolderUID + } + + if len(parentMap) == 0 { + log.Debug("No legacy folders found for org", "org_id", orgID) + return make(map[string]string), nil + } + + log.Debug("Built legacy folder parent map", + "folder_count", len(parentMap), + "org_id", orgID) + + return parentMap, nil +} + +func (v *FolderTreeValidator) buildUnifiedFolderParentMap(ctx context.Context, namespace string, log log.Logger) (map[string]string, error) { + // Search for all folders in this namespace + searchResp, err := v.client.Search(ctx, &resourcepb.ResourceSearchRequest{ + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{ + Namespace: namespace, + Group: v.resource.Group, + Resource: v.resource.Resource, + }, + }, + Limit: 100000, // Large limit to get all folders + }) + if err != nil { + return nil, fmt.Errorf("failed to search folders in unified storage: %w", err) + } + + if searchResp.Results == nil { + return make(map[string]string), nil + } + + parentMap := make(map[string]string) + for _, row := range searchResp.Results.Rows { + if row.Key == nil { + continue + } + + folderUID := row.Key.Name + parentUID := "" + + folderColIdx := -1 + for i, col := range searchResp.Results.Columns { + if col.Name == "folder" { + folderColIdx = i + break + } + } + + if folderColIdx >= 0 && folderColIdx < len(row.Cells) { + parentUID = string(row.Cells[folderColIdx]) + } + + parentMap[folderUID] = parentUID + } + + log.Debug("Built unified folder parent map", + "folder_count", len(parentMap), + "namespace", namespace) + + return parentMap, nil +}