diff --git a/apps/dashboard/pkg/apis/dashboard/cuevalidator/validator.go b/apps/dashboard/pkg/apis/dashboard/cuevalidator/validator.go new file mode 100644 index 00000000000..38792ade026 --- /dev/null +++ b/apps/dashboard/pkg/apis/dashboard/cuevalidator/validator.go @@ -0,0 +1,29 @@ +package cuevalidator + +import ( + "sync" + + "cuelang.org/go/cue" + cuejson "cuelang.org/go/encoding/json" +) + +// Validator provides thread-safe CUE schema validation. +// +// CUE is not safe for concurrent use: https://github.com/cue-lang/cue/discussions/1205#discussioncomment-1189238 +// This validator uses a mutex to protect concurrent access to the underlying CUE validation. +type Validator struct { + schema cue.Value + mu sync.Mutex +} + +func NewValidator(schema cue.Value) *Validator { + return &Validator{ + schema: schema, + } +} + +func (v *Validator) Validate(data []byte) error { + v.mu.Lock() + defer v.mu.Unlock() + return cuejson.Validate(data, v.schema) +} diff --git a/apps/dashboard/pkg/apis/dashboard/v0alpha1/validation.go b/apps/dashboard/pkg/apis/dashboard/v0alpha1/validation.go index 6fdf1dd4515..7c5573f600a 100644 --- a/apps/dashboard/pkg/apis/dashboard/v0alpha1/validation.go +++ b/apps/dashboard/pkg/apis/dashboard/v0alpha1/validation.go @@ -7,13 +7,13 @@ import ( "strings" "sync" + "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/cuevalidator" "github.com/grafana/grafana/apps/dashboard/pkg/migration/schemaversion" "k8s.io/apimachinery/pkg/util/validation/field" "cuelang.org/go/cue" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/errors" - cuejson "cuelang.org/go/encoding/json" ) func ValidateDashboardSpec(obj *Dashboard, forceValidation bool) (field.ErrorList, field.ErrorList) { @@ -33,7 +33,7 @@ func ValidateDashboardSpec(obj *Dashboard, forceValidation bool) (field.ErrorLis }, schemaVersionError } - if err := cuejson.Validate(data, getCueSchema()); err != nil { + if err := getValidator().Validate(data); err != nil { errs := field.ErrorList{} for _, e := range errors.Errors(err) { @@ -71,20 +71,21 @@ func formatErrorPath(path []string) string { } var ( - compiledSchema cue.Value - getSchemaOnce sync.Once + validator *cuevalidator.Validator + getSchemaOnce sync.Once ) //go:embed dashboard_kind.cue var schemaSource string -func getCueSchema() cue.Value { +func getValidator() *cuevalidator.Validator { getSchemaOnce.Do(func() { cueCtx := cuecontext.New() - compiledSchema = cueCtx.CompileString(schemaSource).LookupPath( + compiledSchema := cueCtx.CompileString(schemaSource).LookupPath( cue.ParsePath("lineage.schemas[0].schema.spec"), ) + validator = cuevalidator.NewValidator(compiledSchema) }) - return compiledSchema + return validator } diff --git a/apps/dashboard/pkg/apis/dashboard/v1beta1/validation.go b/apps/dashboard/pkg/apis/dashboard/v1beta1/validation.go index 7020aa46b90..b48e38ffd7b 100644 --- a/apps/dashboard/pkg/apis/dashboard/v1beta1/validation.go +++ b/apps/dashboard/pkg/apis/dashboard/v1beta1/validation.go @@ -12,8 +12,8 @@ import ( "cuelang.org/go/cue" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/errors" - cuejson "cuelang.org/go/encoding/json" + "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/cuevalidator" "github.com/grafana/grafana/apps/dashboard/pkg/migration/schemaversion" ) @@ -34,7 +34,7 @@ func ValidateDashboardSpec(obj *Dashboard, forceValidation bool) (field.ErrorLis }, schemaVersionError } - if err := cuejson.Validate(data, getCueSchema()); err != nil { + if err := getValidator().Validate(data); err != nil { errs := field.ErrorList{} for _, e := range errors.Errors(err) { @@ -72,20 +72,21 @@ func formatErrorPath(path []string) string { } var ( - compiledSchema cue.Value - getSchemaOnce sync.Once + validator *cuevalidator.Validator + getSchemaOnce sync.Once ) //go:embed dashboard_kind.cue var schemaSource string -func getCueSchema() cue.Value { +func getValidator() *cuevalidator.Validator { getSchemaOnce.Do(func() { cueCtx := cuecontext.New() - compiledSchema = cueCtx.CompileString(schemaSource).LookupPath( + compiledSchema := cueCtx.CompileString(schemaSource).LookupPath( cue.ParsePath("lineage.schemas[0].schema.spec"), ) + validator = cuevalidator.NewValidator(compiledSchema) }) - return compiledSchema + return validator } diff --git a/apps/dashboard/pkg/apis/dashboard/v2alpha1/validation.go b/apps/dashboard/pkg/apis/dashboard/v2alpha1/validation.go index 7c61faa8924..ca9dcd3e514 100644 --- a/apps/dashboard/pkg/apis/dashboard/v2alpha1/validation.go +++ b/apps/dashboard/pkg/apis/dashboard/v2alpha1/validation.go @@ -12,7 +12,8 @@ import ( "cuelang.org/go/cue" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/errors" - cuejson "cuelang.org/go/encoding/json" + + "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/cuevalidator" ) func ValidateDashboardSpec(obj *Dashboard) field.ErrorList { @@ -26,7 +27,7 @@ func ValidateDashboardSpec(obj *Dashboard) field.ErrorList { // Custom validation for action query params and headers validateAndTrimActionArrays(obj) - if err := cuejson.Validate(data, getCueSchema()); err != nil { + if err := getValidator().Validate(data); err != nil { errs := field.ErrorList{} for _, e := range errors.Errors(err) { @@ -123,20 +124,21 @@ func formatErrorPath(path []string) string { } var ( - compiledSchema cue.Value - getSchemaOnce sync.Once + validator *cuevalidator.Validator + getSchemaOnce sync.Once ) //go:embed dashboard_spec.cue var schemaSource string -func getCueSchema() cue.Value { +func getValidator() *cuevalidator.Validator { getSchemaOnce.Do(func() { cueCtx := cuecontext.New() - compiledSchema = cueCtx.CompileString(schemaSource).LookupPath( + compiledSchema := cueCtx.CompileString(schemaSource).LookupPath( cue.ParsePath("DashboardSpec"), ) + validator = cuevalidator.NewValidator(compiledSchema) }) - return compiledSchema + return validator } diff --git a/apps/dashboard/pkg/apis/dashboard/v2beta1/validation.go b/apps/dashboard/pkg/apis/dashboard/v2beta1/validation.go index 7c859626b98..518c133bcd7 100644 --- a/apps/dashboard/pkg/apis/dashboard/v2beta1/validation.go +++ b/apps/dashboard/pkg/apis/dashboard/v2beta1/validation.go @@ -12,7 +12,8 @@ import ( "cuelang.org/go/cue" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/errors" - cuejson "cuelang.org/go/encoding/json" + + "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/cuevalidator" ) func ValidateDashboardSpec(obj *Dashboard) field.ErrorList { @@ -26,7 +27,7 @@ func ValidateDashboardSpec(obj *Dashboard) field.ErrorList { // Custom validation for action query params and headers validateAndTrimActionArrays(obj) - if err := cuejson.Validate(data, getCueSchema()); err != nil { + if err := getValidator().Validate(data); err != nil { errs := field.ErrorList{} for _, e := range errors.Errors(err) { @@ -123,20 +124,21 @@ func formatErrorPath(path []string) string { } var ( - compiledSchema cue.Value - getSchemaOnce sync.Once + validator *cuevalidator.Validator + getSchemaOnce sync.Once ) //go:embed dashboard_spec.cue var schemaSource string -func getCueSchema() cue.Value { +func getValidator() *cuevalidator.Validator { getSchemaOnce.Do(func() { cueCtx := cuecontext.New() - compiledSchema = cueCtx.CompileString(schemaSource).LookupPath( + compiledSchema := cueCtx.CompileString(schemaSource).LookupPath( cue.ParsePath("DashboardSpec"), ) + validator = cuevalidator.NewValidator(compiledSchema) }) - return compiledSchema + return validator } diff --git a/pkg/operators/provisioning/repo_operator.go b/pkg/operators/provisioning/repo_operator.go index f96653a1d87..1651a126ffe 100644 --- a/pkg/operators/provisioning/repo_operator.go +++ b/pkg/operators/provisioning/repo_operator.go @@ -90,6 +90,7 @@ func RunRepoController(deps server.OperatorDependencies) error { statusPatcher, deps.Registerer, tracer, + controllerCfg.parallelOperations, ) if err != nil { return fmt.Errorf("failed to create repository controller: %w", err) @@ -107,6 +108,7 @@ func RunRepoController(deps server.OperatorDependencies) error { type repoControllerConfig struct { provisioningControllerConfig workerCount int + parallelOperations int allowedTargets []string allowImageRendering bool minSyncInterval time.Duration @@ -128,6 +130,7 @@ func getRepoControllerConfig(cfg *setting.Cfg, registry prometheus.Registerer) ( provisioningControllerConfig: *controllerCfg, allowedTargets: allowedTargets, workerCount: cfg.SectionWithEnvOverrides("operator").Key("worker_count").MustInt(1), + parallelOperations: cfg.SectionWithEnvOverrides("operator").Key("parallel_operations").MustInt(10), allowImageRendering: cfg.SectionWithEnvOverrides("provisioning").Key("allow_image_rendering").MustBool(false), minSyncInterval: cfg.SectionWithEnvOverrides("provisioning").Key("min_sync_interval").MustDuration(1 * time.Minute), }, nil diff --git a/pkg/registry/apis/provisioning/controller/finalizers.go b/pkg/registry/apis/provisioning/controller/finalizers.go index d9c41ee4c15..5bd468fe660 100644 --- a/pkg/registry/apis/provisioning/controller/finalizers.go +++ b/pkg/registry/apis/provisioning/controller/finalizers.go @@ -7,8 +7,11 @@ import ( "slices" "sort" "strings" + "sync/atomic" "time" + "github.com/grafana/dskit/concurrency" + "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -27,6 +30,7 @@ type finalizer struct { lister resources.ResourceLister clientFactory resources.ClientFactory metrics *finalizerMetrics + maxWorkers int } func (f *finalizer) process(ctx context.Context, @@ -113,27 +117,73 @@ func (f *finalizer) processExistingItems( // Safe deletion order sortResourceListForDeletion(items) - count := 0 + var dashboards, folderItems []*provisioning.ResourceListItem for _, item := range items.Items { - res, _, err := clients.ForResource(ctx, schema.GroupVersionResource{ + if item.Group == folders.GroupVersion.Group { + folderItems = append(folderItems, &item) + } else { + dashboards = append(dashboards, &item) + } + } + + processItem := func(jobCtx context.Context, item *provisioning.ResourceListItem) error { + res, _, err := clients.ForResource(jobCtx, schema.GroupVersionResource{ Group: item.Group, Resource: item.Resource, }) if err != nil { logger.Error("error getting client for resource", "resource", item.Resource, "error", err) - return count, err + return err } - err = cb(res, &item) + err = cb(res, item) if err != nil { + if errors.IsNotFound(err) { + logger.Info("resource not found, skipping", "name", item.Name, "group", item.Group, "resource", item.Resource) + return nil + } logger.Error("error processing item", "name", item.Name, "error", err) - return count, fmt.Errorf("processing item: %w", err) - } else { + return fmt.Errorf("processing item: %w", err) + } + return nil + } + + processGroup := func(group []*provisioning.ResourceListItem) (int, error) { + var processed int64 + err := concurrency.ForEachJob(ctx, len(group), f.maxWorkers, func(ctx context.Context, idx int) error { + jobCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + item := group[idx] + if err := processItem(jobCtx, item); err != nil { + return err + } + atomic.AddInt64(&processed, 1) + return nil + }) + return int(processed), err + } + + count := 0 + + if len(dashboards) > 0 { + processed, err := processGroup(dashboards) + if err != nil { + return processed, err + } + count += processed + } + + if len(folderItems) > 0 { + for _, item := range folderItems { + if err := processItem(ctx, item); err != nil { + return count, err + } count++ } } - logger.Info("processed orphan items", "items", count) + + logger.Info("processed items", "items", count) return count, nil } diff --git a/pkg/registry/apis/provisioning/controller/finalizers_test.go b/pkg/registry/apis/provisioning/controller/finalizers_test.go index 140363a0507..4df89107bbe 100644 --- a/pkg/registry/apis/provisioning/controller/finalizers_test.go +++ b/pkg/registry/apis/provisioning/controller/finalizers_test.go @@ -2,10 +2,15 @@ package controller import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + mock "github.com/stretchr/testify/mock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -14,6 +19,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" + "github.com/grafana/grafana-app-sdk/logging" + folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1" provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1" "github.com/grafana/grafana/apps/provisioning/pkg/repository" "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" @@ -140,7 +147,7 @@ func TestFinalizer_process(t *testing.T) { resourceLister := resources.NewMockResourceLister(t) resourceLister. - On("List", context.Background(), "default", "my-repo"). + On("List", mock.Anything, "default", "my-repo"). Once(). Return(&provisioning.ResourceList{ Items: []provisioning.ResourceListItem{ @@ -164,12 +171,12 @@ func TestFinalizer_process(t *testing.T) { } clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(clients, nil) clients. - On("ForResource", context.Background(), schema.GroupVersionResource{ + On("ForResource", mock.Anything, schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", }). @@ -196,7 +203,7 @@ func TestFinalizer_process(t *testing.T) { resourceLister := resources.NewMockResourceLister(t) resourceLister. - On("List", context.Background(), "default", "my-repo"). + On("List", mock.Anything, "default", "my-repo"). Once(). Return(&provisioning.ResourceList{ Items: []provisioning.ResourceListItem{ @@ -220,12 +227,12 @@ func TestFinalizer_process(t *testing.T) { } clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(clients, nil) clients. - On("ForResource", context.Background(), schema.GroupVersionResource{ + On("ForResource", mock.Anything, schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", }). @@ -253,7 +260,7 @@ func TestFinalizer_process(t *testing.T) { clientFactory := resources.NewMockClientFactory(t) clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(nil, assert.AnError) @@ -275,7 +282,7 @@ func TestFinalizer_process(t *testing.T) { resourceLister := resources.NewMockResourceLister(t) resourceLister. - On("List", context.Background(), "default", "my-repo"). + On("List", mock.Anything, "default", "my-repo"). Once(). Return(nil, assert.AnError) @@ -286,7 +293,7 @@ func TestFinalizer_process(t *testing.T) { clients := resources.NewMockResourceClients(t) clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(clients, nil) @@ -308,7 +315,7 @@ func TestFinalizer_process(t *testing.T) { resourceLister := resources.NewMockResourceLister(t) resourceLister. - On("List", context.Background(), "default", "my-repo"). + On("List", mock.Anything, "default", "my-repo"). Once(). Return(&provisioning.ResourceList{ Items: []provisioning.ResourceListItem{ @@ -327,12 +334,12 @@ func TestFinalizer_process(t *testing.T) { clients := resources.NewMockResourceClients(t) clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(clients, nil) clients. - On("ForResource", context.Background(), schema.GroupVersionResource{ + On("ForResource", mock.Anything, schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", }). @@ -357,7 +364,7 @@ func TestFinalizer_process(t *testing.T) { resourceLister := resources.NewMockResourceLister(t) resourceLister. - On("List", context.Background(), "default", "my-repo"). + On("List", mock.Anything, "default", "my-repo"). Once(). Return(&provisioning.ResourceList{ Items: []provisioning.ResourceListItem{ @@ -381,12 +388,12 @@ func TestFinalizer_process(t *testing.T) { } clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(clients, nil) clients. - On("ForResource", context.Background(), schema.GroupVersionResource{ + On("ForResource", mock.Anything, schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", }). @@ -414,7 +421,7 @@ func TestFinalizer_process(t *testing.T) { resourceLister := resources.NewMockResourceLister(t) resourceLister. - On("List", context.Background(), "default", "my-repo"). + On("List", mock.Anything, "default", "my-repo"). Once(). Return(&provisioning.ResourceList{ Items: []provisioning.ResourceListItem{ @@ -438,12 +445,12 @@ func TestFinalizer_process(t *testing.T) { } clientFactory. - On("Clients", context.Background(), "default"). + On("Clients", mock.Anything, "default"). Once(). Return(clients, nil) clients. - On("ForResource", context.Background(), schema.GroupVersionResource{ + On("ForResource", mock.Anything, schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", }). @@ -560,3 +567,142 @@ func TestSortResourceListForDeletion(t *testing.T) { }) } } + +func TestFinalizer_processExistingItems_Concurrency(t *testing.T) { + testCases := []struct { + name string + dashboardCount int + folderCount int + maxWorkers int + expectedConcurrency bool + }{ + { + name: "Multiple dashboards processed concurrently", + dashboardCount: 10, + folderCount: 0, + maxWorkers: 5, + expectedConcurrency: true, + }, + { + name: "Single worker processes dashboards sequentially", + dashboardCount: 5, + folderCount: 0, + maxWorkers: 1, + expectedConcurrency: false, + }, + { + name: "Folders processed sequentially regardless of maxWorkers", + dashboardCount: 0, + folderCount: 5, + maxWorkers: 10, + expectedConcurrency: false, + }, + { + name: "Mixed dashboards and folders - dashboards concurrent, folders sequential", + dashboardCount: 10, + folderCount: 3, + maxWorkers: 5, + expectedConcurrency: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Will be used to track concurrent executions + var ( + concurrentCount int64 + maxConcurrent int64 + mu sync.Mutex + ) + + items := provisioning.ResourceList{Items: []provisioning.ResourceListItem{}} + + for i := 0; i < tc.dashboardCount; i++ { + items.Items = append(items.Items, provisioning.ResourceListItem{ + Group: "dashboard.grafana.app", + Resource: "dashboards", + Name: fmt.Sprintf("dashboard-%d", i), + }) + } + + for i := 0; i < tc.folderCount; i++ { + items.Items = append(items.Items, provisioning.ResourceListItem{ + Group: folders.GroupVersion.Group, + Resource: "folders", + Name: fmt.Sprintf("folder-%d", i), + }) + } + + resourceLister := resources.NewMockResourceLister(t) + resourceLister. + On("List", mock.Anything, "default", "my-repo"). + Return(&items, nil) + + clientFactory := resources.NewMockClientFactory(t) + clients := resources.NewMockResourceClients(t) + + client := &mockDynamicClient{ + deleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error { + // Track concurrent executions + current := atomic.AddInt64(&concurrentCount, 1) + defer atomic.AddInt64(&concurrentCount, -1) + + mu.Lock() + if current > maxConcurrent { + maxConcurrent = current + } + mu.Unlock() + + // Simulate slow client to allow concurrency to build up + time.Sleep(1 * time.Second) + + return nil + }, + } + + clientFactory. + On("Clients", mock.Anything, "default"). + Return(clients, nil) + + clients. + On("ForResource", mock.Anything, mock.Anything). + Return(client, schema.GroupVersionKind{}, nil) + + metrics := registerFinalizerMetrics(prometheus.NewRegistry()) + f := &finalizer{ + lister: resourceLister, + clientFactory: clientFactory, + metrics: &metrics, + maxWorkers: tc.maxWorkers, + } + + repo := &provisioning.Repository{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-repo", + Namespace: "default", + }, + } + + count, err := f.processExistingItems( + context.Background(), + repo, + f.removeResources(context.Background(), logging.DefaultLogger), + ) + + assert.NoError(t, err) + assert.Equal(t, tc.dashboardCount+tc.folderCount, count) + + if tc.expectedConcurrency { + // When concurrent, max concurrent should be > 1 + assert.Greater(t, maxConcurrent, int64(1), + "Expected concurrent execution but maxConcurrent was %d", maxConcurrent) + // Should not exceed maxWorkers + assert.LessOrEqual(t, maxConcurrent, int64(tc.maxWorkers)) + } else { + // When sequential, max concurrent should be 1 + assert.Equal(t, int64(1), maxConcurrent, + "Expected sequential execution but maxConcurrent was %d", maxConcurrent) + } + }) + } +} diff --git a/pkg/registry/apis/provisioning/controller/repository.go b/pkg/registry/apis/provisioning/controller/repository.go index 70e5c3b4a87..681619711e1 100644 --- a/pkg/registry/apis/provisioning/controller/repository.go +++ b/pkg/registry/apis/provisioning/controller/repository.go @@ -84,6 +84,7 @@ func NewRepositoryController( statusPatcher StatusPatcher, registry prometheus.Registerer, tracer tracing.Tracer, + parallelOperations int, ) (*RepositoryController, error) { finalizerMetrics := registerFinalizerMetrics(registry) @@ -104,6 +105,7 @@ func NewRepositoryController( lister: resourceLister, clientFactory: clients, metrics: &finalizerMetrics, + maxWorkers: parallelOperations, }, jobs: jobs, logger: logging.DefaultLogger.With("logger", loggerName), diff --git a/pkg/registry/apis/provisioning/register.go b/pkg/registry/apis/provisioning/register.go index 8797d940408..485f6bfe5d5 100644 --- a/pkg/registry/apis/provisioning/register.go +++ b/pkg/registry/apis/provisioning/register.go @@ -798,6 +798,7 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH b.statusPatcher, b.registry, b.tracer, + 10, ) if err != nil { return err