Provisioning: concurrent deletes in finalizers and 404 handling (#113155)

* fix: concurrent deletes in finalizers and 404 handling

* chore: feedback review

* fix: broken tests
This commit is contained in:
Costa Alexoglou
2025-10-30 11:55:36 +01:00
committed by GitHub
parent cb86be2e32
commit bbfb8268d1
10 changed files with 290 additions and 53 deletions
@@ -90,6 +90,7 @@ func RunRepoController(deps server.OperatorDependencies) error {
statusPatcher,
deps.Registerer,
tracer,
controllerCfg.parallelOperations,
)
if err != nil {
return fmt.Errorf("failed to create repository controller: %w", err)
@@ -107,6 +108,7 @@ func RunRepoController(deps server.OperatorDependencies) error {
type repoControllerConfig struct {
provisioningControllerConfig
workerCount int
parallelOperations int
allowedTargets []string
allowImageRendering bool
minSyncInterval time.Duration
@@ -128,6 +130,7 @@ func getRepoControllerConfig(cfg *setting.Cfg, registry prometheus.Registerer) (
provisioningControllerConfig: *controllerCfg,
allowedTargets: allowedTargets,
workerCount: cfg.SectionWithEnvOverrides("operator").Key("worker_count").MustInt(1),
parallelOperations: cfg.SectionWithEnvOverrides("operator").Key("parallel_operations").MustInt(10),
allowImageRendering: cfg.SectionWithEnvOverrides("provisioning").Key("allow_image_rendering").MustBool(false),
minSyncInterval: cfg.SectionWithEnvOverrides("provisioning").Key("min_sync_interval").MustDuration(1 * time.Minute),
}, nil
@@ -7,8 +7,11 @@ import (
"slices"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/grafana/dskit/concurrency"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -27,6 +30,7 @@ type finalizer struct {
lister resources.ResourceLister
clientFactory resources.ClientFactory
metrics *finalizerMetrics
maxWorkers int
}
func (f *finalizer) process(ctx context.Context,
@@ -113,27 +117,73 @@ func (f *finalizer) processExistingItems(
// Safe deletion order
sortResourceListForDeletion(items)
count := 0
var dashboards, folderItems []*provisioning.ResourceListItem
for _, item := range items.Items {
res, _, err := clients.ForResource(ctx, schema.GroupVersionResource{
if item.Group == folders.GroupVersion.Group {
folderItems = append(folderItems, &item)
} else {
dashboards = append(dashboards, &item)
}
}
processItem := func(jobCtx context.Context, item *provisioning.ResourceListItem) error {
res, _, err := clients.ForResource(jobCtx, schema.GroupVersionResource{
Group: item.Group,
Resource: item.Resource,
})
if err != nil {
logger.Error("error getting client for resource", "resource", item.Resource, "error", err)
return count, err
return err
}
err = cb(res, &item)
err = cb(res, item)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("resource not found, skipping", "name", item.Name, "group", item.Group, "resource", item.Resource)
return nil
}
logger.Error("error processing item", "name", item.Name, "error", err)
return count, fmt.Errorf("processing item: %w", err)
} else {
return fmt.Errorf("processing item: %w", err)
}
return nil
}
processGroup := func(group []*provisioning.ResourceListItem) (int, error) {
var processed int64
err := concurrency.ForEachJob(ctx, len(group), f.maxWorkers, func(ctx context.Context, idx int) error {
jobCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
item := group[idx]
if err := processItem(jobCtx, item); err != nil {
return err
}
atomic.AddInt64(&processed, 1)
return nil
})
return int(processed), err
}
count := 0
if len(dashboards) > 0 {
processed, err := processGroup(dashboards)
if err != nil {
return processed, err
}
count += processed
}
if len(folderItems) > 0 {
for _, item := range folderItems {
if err := processItem(ctx, item); err != nil {
return count, err
}
count++
}
}
logger.Info("processed orphan items", "items", count)
logger.Info("processed items", "items", count)
return count, nil
}
@@ -2,10 +2,15 @@ package controller
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
mock "github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -14,6 +19,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"github.com/grafana/grafana-app-sdk/logging"
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
@@ -140,7 +147,7 @@ func TestFinalizer_process(t *testing.T) {
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", context.Background(), "default", "my-repo").
On("List", mock.Anything, "default", "my-repo").
Once().
Return(&provisioning.ResourceList{
Items: []provisioning.ResourceListItem{
@@ -164,12 +171,12 @@ func TestFinalizer_process(t *testing.T) {
}
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(clients, nil)
clients.
On("ForResource", context.Background(), schema.GroupVersionResource{
On("ForResource", mock.Anything, schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
}).
@@ -196,7 +203,7 @@ func TestFinalizer_process(t *testing.T) {
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", context.Background(), "default", "my-repo").
On("List", mock.Anything, "default", "my-repo").
Once().
Return(&provisioning.ResourceList{
Items: []provisioning.ResourceListItem{
@@ -220,12 +227,12 @@ func TestFinalizer_process(t *testing.T) {
}
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(clients, nil)
clients.
On("ForResource", context.Background(), schema.GroupVersionResource{
On("ForResource", mock.Anything, schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
}).
@@ -253,7 +260,7 @@ func TestFinalizer_process(t *testing.T) {
clientFactory := resources.NewMockClientFactory(t)
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(nil, assert.AnError)
@@ -275,7 +282,7 @@ func TestFinalizer_process(t *testing.T) {
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", context.Background(), "default", "my-repo").
On("List", mock.Anything, "default", "my-repo").
Once().
Return(nil, assert.AnError)
@@ -286,7 +293,7 @@ func TestFinalizer_process(t *testing.T) {
clients := resources.NewMockResourceClients(t)
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(clients, nil)
@@ -308,7 +315,7 @@ func TestFinalizer_process(t *testing.T) {
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", context.Background(), "default", "my-repo").
On("List", mock.Anything, "default", "my-repo").
Once().
Return(&provisioning.ResourceList{
Items: []provisioning.ResourceListItem{
@@ -327,12 +334,12 @@ func TestFinalizer_process(t *testing.T) {
clients := resources.NewMockResourceClients(t)
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(clients, nil)
clients.
On("ForResource", context.Background(), schema.GroupVersionResource{
On("ForResource", mock.Anything, schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
}).
@@ -357,7 +364,7 @@ func TestFinalizer_process(t *testing.T) {
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", context.Background(), "default", "my-repo").
On("List", mock.Anything, "default", "my-repo").
Once().
Return(&provisioning.ResourceList{
Items: []provisioning.ResourceListItem{
@@ -381,12 +388,12 @@ func TestFinalizer_process(t *testing.T) {
}
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(clients, nil)
clients.
On("ForResource", context.Background(), schema.GroupVersionResource{
On("ForResource", mock.Anything, schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
}).
@@ -414,7 +421,7 @@ func TestFinalizer_process(t *testing.T) {
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", context.Background(), "default", "my-repo").
On("List", mock.Anything, "default", "my-repo").
Once().
Return(&provisioning.ResourceList{
Items: []provisioning.ResourceListItem{
@@ -438,12 +445,12 @@ func TestFinalizer_process(t *testing.T) {
}
clientFactory.
On("Clients", context.Background(), "default").
On("Clients", mock.Anything, "default").
Once().
Return(clients, nil)
clients.
On("ForResource", context.Background(), schema.GroupVersionResource{
On("ForResource", mock.Anything, schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
}).
@@ -560,3 +567,142 @@ func TestSortResourceListForDeletion(t *testing.T) {
})
}
}
func TestFinalizer_processExistingItems_Concurrency(t *testing.T) {
testCases := []struct {
name string
dashboardCount int
folderCount int
maxWorkers int
expectedConcurrency bool
}{
{
name: "Multiple dashboards processed concurrently",
dashboardCount: 10,
folderCount: 0,
maxWorkers: 5,
expectedConcurrency: true,
},
{
name: "Single worker processes dashboards sequentially",
dashboardCount: 5,
folderCount: 0,
maxWorkers: 1,
expectedConcurrency: false,
},
{
name: "Folders processed sequentially regardless of maxWorkers",
dashboardCount: 0,
folderCount: 5,
maxWorkers: 10,
expectedConcurrency: false,
},
{
name: "Mixed dashboards and folders - dashboards concurrent, folders sequential",
dashboardCount: 10,
folderCount: 3,
maxWorkers: 5,
expectedConcurrency: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Will be used to track concurrent executions
var (
concurrentCount int64
maxConcurrent int64
mu sync.Mutex
)
items := provisioning.ResourceList{Items: []provisioning.ResourceListItem{}}
for i := 0; i < tc.dashboardCount; i++ {
items.Items = append(items.Items, provisioning.ResourceListItem{
Group: "dashboard.grafana.app",
Resource: "dashboards",
Name: fmt.Sprintf("dashboard-%d", i),
})
}
for i := 0; i < tc.folderCount; i++ {
items.Items = append(items.Items, provisioning.ResourceListItem{
Group: folders.GroupVersion.Group,
Resource: "folders",
Name: fmt.Sprintf("folder-%d", i),
})
}
resourceLister := resources.NewMockResourceLister(t)
resourceLister.
On("List", mock.Anything, "default", "my-repo").
Return(&items, nil)
clientFactory := resources.NewMockClientFactory(t)
clients := resources.NewMockResourceClients(t)
client := &mockDynamicClient{
deleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error {
// Track concurrent executions
current := atomic.AddInt64(&concurrentCount, 1)
defer atomic.AddInt64(&concurrentCount, -1)
mu.Lock()
if current > maxConcurrent {
maxConcurrent = current
}
mu.Unlock()
// Simulate slow client to allow concurrency to build up
time.Sleep(1 * time.Second)
return nil
},
}
clientFactory.
On("Clients", mock.Anything, "default").
Return(clients, nil)
clients.
On("ForResource", mock.Anything, mock.Anything).
Return(client, schema.GroupVersionKind{}, nil)
metrics := registerFinalizerMetrics(prometheus.NewRegistry())
f := &finalizer{
lister: resourceLister,
clientFactory: clientFactory,
metrics: &metrics,
maxWorkers: tc.maxWorkers,
}
repo := &provisioning.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: "my-repo",
Namespace: "default",
},
}
count, err := f.processExistingItems(
context.Background(),
repo,
f.removeResources(context.Background(), logging.DefaultLogger),
)
assert.NoError(t, err)
assert.Equal(t, tc.dashboardCount+tc.folderCount, count)
if tc.expectedConcurrency {
// When concurrent, max concurrent should be > 1
assert.Greater(t, maxConcurrent, int64(1),
"Expected concurrent execution but maxConcurrent was %d", maxConcurrent)
// Should not exceed maxWorkers
assert.LessOrEqual(t, maxConcurrent, int64(tc.maxWorkers))
} else {
// When sequential, max concurrent should be 1
assert.Equal(t, int64(1), maxConcurrent,
"Expected sequential execution but maxConcurrent was %d", maxConcurrent)
}
})
}
}
@@ -84,6 +84,7 @@ func NewRepositoryController(
statusPatcher StatusPatcher,
registry prometheus.Registerer,
tracer tracing.Tracer,
parallelOperations int,
) (*RepositoryController, error) {
finalizerMetrics := registerFinalizerMetrics(registry)
@@ -104,6 +105,7 @@ func NewRepositoryController(
lister: resourceLister,
clientFactory: clients,
metrics: &finalizerMetrics,
maxWorkers: parallelOperations,
},
jobs: jobs,
logger: logging.DefaultLogger.With("logger", loggerName),
@@ -798,6 +798,7 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
b.statusPatcher,
b.registry,
b.tracer,
10,
)
if err != nil {
return err