From 543c0bbccbc72d88e7eed3a54614d2fe1759126e Mon Sep 17 00:00:00 2001 From: Marco de Abreu Date: Sat, 22 Mar 2025 23:47:27 +0100 Subject: [PATCH] App platform: Add cleanup job for dashboards when going through /apis (kubectl) (#102506) * Add dashboard cleanup job Change log message Adjust logic to account for new head RV logic Don't update lastResourceVersion due to pagination Save improvements * Address review feedback * Update docs. * Remove docs * Rename config --------- Co-authored-by: Marco de Abreu <18629099+marcoabreu@users.noreply.github.com> --- pkg/api/dashboard_test.go | 4 + pkg/api/folder_bench_test.go | 4 + .../backgroundsvcs/background_services.go | 3 + .../accesscontrol/accesscontrol_test.go | 6 +- .../annotationsimpl/annotations_test.go | 11 +- pkg/services/dashboards/dashboard.go | 1 + .../dashboards/dashboard_service_mock.go | 18 + .../dashboards/service/dashboard_service.go | 299 ++++++++++++++- .../dashboard_service_integration_test.go | 10 + .../service/dashboard_service_test.go | 345 +++++++++++++++++- .../service/service_test.go | 5 + pkg/services/folder/folderimpl/folder_test.go | 19 +- .../libraryelements/libraryelements_test.go | 8 + .../librarypanels/librarypanels_test.go | 8 +- pkg/services/ngalert/testutil/testutil.go | 4 + .../publicdashboards/api/query_test.go | 5 + .../publicdashboards/service/service_test.go | 6 +- pkg/services/quota/quotaimpl/quota_test.go | 6 +- pkg/setting/setting.go | 4 + pkg/setting/setting_k8s_dashboard_cleanup.go | 53 +++ 20 files changed, 788 insertions(+), 31 deletions(-) create mode 100644 pkg/setting/setting_k8s_dashboard_cleanup.go diff --git a/pkg/api/dashboard_test.go b/pkg/api/dashboard_test.go index 549532806bc..0dfdcd43934 100644 --- a/pkg/api/dashboard_test.go +++ b/pkg/api/dashboard_test.go @@ -23,8 +23,10 @@ import ( "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/db/dbtest" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/infra/usagestats" "github.com/grafana/grafana/pkg/services/accesscontrol" @@ -875,6 +877,7 @@ func getDashboardShouldReturn200WithConfig(t *testing.T, sc *scenarioContext, pr cfg, dashboardStore, folderStore, features, folderPermissions, ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), kvstore.NewFakeKVStore(), ) require.NoError(t, err) dashboardService.(dashboards.PermissionsRegistrationService).RegisterDashboardPermissions(dashboardPermissions) @@ -884,6 +887,7 @@ func getDashboardShouldReturn200WithConfig(t *testing.T, sc *scenarioContext, pr cfg, dashboardStore, folderStore, features, folderPermissions, ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), kvstore.NewFakeKVStore(), ) require.NoError(t, err) diff --git a/pkg/api/folder_bench_test.go b/pkg/api/folder_bench_test.go index 5847529c941..03a9ab1cf63 100644 --- a/pkg/api/folder_bench_test.go +++ b/pkg/api/folder_bench_test.go @@ -18,8 +18,10 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/acimpl" @@ -476,6 +478,8 @@ func setupServer(b testing.TB, sc benchScenario, features featuremgmt.FeatureTog sc.cfg, dashStore, folderStore, features, folderPermissions, ac, folderServiceWithFlagOn, fStore, nil, client.MockTestRestConfig{}, nil, quotaSrv, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sc.db, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(b, err) diff --git a/pkg/registry/backgroundsvcs/background_services.go b/pkg/registry/backgroundsvcs/background_services.go index b0f339ed29d..621c2eee751 100644 --- a/pkg/registry/backgroundsvcs/background_services.go +++ b/pkg/registry/backgroundsvcs/background_services.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/grafana/pkg/services/authn/authnimpl" "github.com/grafana/grafana/pkg/services/cleanup" "github.com/grafana/grafana/pkg/services/cloudmigration" + "github.com/grafana/grafana/pkg/services/dashboards/service" "github.com/grafana/grafana/pkg/services/dashboardsnapshots" "github.com/grafana/grafana/pkg/services/grpcserver" "github.com/grafana/grafana/pkg/services/guardian" @@ -69,6 +70,7 @@ func ProvideBackgroundServiceRegistry( zanzanaReconciler *dualwrite.ZanzanaReconciler, appRegistry *appregistry.Service, pluginDashboardUpdater *plugindashboardsservice.DashboardUpdater, + dashboardServiceImpl *service.DashboardServiceImpl, // Need to make sure these are initialized, is there a better place to put them? _ dashboardsnapshots.Service, _ serviceaccounts.Service, _ *guardian.Provider, @@ -115,6 +117,7 @@ func ProvideBackgroundServiceRegistry( zanzanaReconciler, appRegistry, pluginDashboardUpdater, + dashboardServiceImpl, ) } diff --git a/pkg/services/annotations/accesscontrol/accesscontrol_test.go b/pkg/services/annotations/accesscontrol/accesscontrol_test.go index 1bff50c2dd3..c92e1b9546b 100644 --- a/pkg/services/annotations/accesscontrol/accesscontrol_test.go +++ b/pkg/services/annotations/accesscontrol/accesscontrol_test.go @@ -10,6 +10,8 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/actest" @@ -50,7 +52,9 @@ func TestIntegrationAuthorize(t *testing.T) { fStore, ac, bus.ProvideBus(tracing.InitializeTracerForTest()), dashStore, folderStore, nil, sql, featuremgmt.WithFeatures(), supportbundlestest.NewFakeBundleService(), nil, cfg, nil, tracing.InitializeTracerForTest(), nil, dualwrite.ProvideTestService(), sort.ProvideService()) dashSvc, err := dashboardsservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuremgmt.WithFeatures(), accesscontrolmock.NewMockedPermissionsService(), - ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sql, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore()) require.NoError(t, err) dashSvc.RegisterDashboardPermissions(accesscontrolmock.NewMockedPermissionsService()) diff --git a/pkg/services/annotations/annotationsimpl/annotations_test.go b/pkg/services/annotations/annotationsimpl/annotations_test.go index 145c94c0748..8e169e88401 100644 --- a/pkg/services/annotations/annotationsimpl/annotations_test.go +++ b/pkg/services/annotations/annotationsimpl/annotations_test.go @@ -12,7 +12,9 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/actest" @@ -63,7 +65,9 @@ func TestIntegrationAnnotationListingWithRBAC(t *testing.T) { fStore, ac, bus.ProvideBus(tracing.InitializeTracerForTest()), dashStore, folderStore, nil, sql, featuremgmt.WithFeatures(), supportbundlestest.NewFakeBundleService(), nil, cfg, nil, tracing.InitializeTracerForTest(), nil, dualwrite.ProvideTestService(), sort.ProvideService()) dashSvc, err := dashboardsservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuremgmt.WithFeatures(), accesscontrolmock.NewMockedPermissionsService(), - ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sql, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore()) require.NoError(t, err) dashSvc.RegisterDashboardPermissions(accesscontrolmock.NewMockedPermissionsService()) repo := ProvideService(sql, cfg, features, tagService, tracing.InitializeTracerForTest(), ruleStore, dashSvc) @@ -246,7 +250,10 @@ func TestIntegrationAnnotationListingWithInheritedRBAC(t *testing.T) { fStore, ac, bus.ProvideBus(tracing.InitializeTracerForTest()), dashStore, folderStore, nil, sql, features, supportbundlestest.NewFakeBundleService(), nil, cfg, nil, tracing.InitializeTracerForTest(), nil, dualwrite.ProvideTestService(), sort.ProvideService()) dashSvc, err := dashboardsservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, features, accesscontrolmock.NewMockedPermissionsService(), - ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sql, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), + ) require.NoError(t, err) dashSvc.RegisterDashboardPermissions(accesscontrolmock.NewMockedPermissionsService()) cfg.AnnotationMaximumTagsLength = 60 diff --git a/pkg/services/dashboards/dashboard.go b/pkg/services/dashboards/dashboard.go index 92232c95bd8..b904e3c2e07 100644 --- a/pkg/services/dashboards/dashboard.go +++ b/pkg/services/dashboards/dashboard.go @@ -35,6 +35,7 @@ type DashboardService interface { GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*Dashboard, error) SoftDeleteDashboard(ctx context.Context, orgID int64, dashboardUid string) error RestoreDashboard(ctx context.Context, dashboard *Dashboard, user identity.Requester, optionalFolderUID string) error + CleanUpDashboard(ctx context.Context, dashboardUID string, orgId int64) error CleanUpDeletedDashboards(ctx context.Context) (int64, error) GetSoftDeletedDashboard(ctx context.Context, orgID int64, uid string) (*Dashboard, error) CountDashboardsInOrg(ctx context.Context, orgID int64) (int64, error) diff --git a/pkg/services/dashboards/dashboard_service_mock.go b/pkg/services/dashboards/dashboard_service_mock.go index 2e37ea6f3b8..ad526998488 100644 --- a/pkg/services/dashboards/dashboard_service_mock.go +++ b/pkg/services/dashboards/dashboard_service_mock.go @@ -562,6 +562,24 @@ func (_m *FakeDashboardService) SoftDeleteDashboard(ctx context.Context, orgID i return r0 } +// CleanUpDashboard provides a mock function with given fields: ctx, dashboardUID, orgId +func (_m *FakeDashboardService) CleanUpDashboard(ctx context.Context, dashboardUID string, orgId int64) error { + ret := _m.Called(ctx, dashboardUID, orgId) + + if len(ret) == 0 { + panic("no return value specified for CleanUpDashboard") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) error); ok { + r0 = rf(ctx, dashboardUID, orgId) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewFakeDashboardService creates a new instance of FakeDashboardService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewFakeDashboardService(t interface { diff --git a/pkg/services/dashboards/service/dashboard_service.go b/pkg/services/dashboards/service/dashboard_service.go index 0a11d43f4d7..d85dd62405b 100644 --- a/pkg/services/dashboards/service/dashboard_service.go +++ b/pkg/services/dashboards/service/dashboard_service.go @@ -29,9 +29,12 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/utils" folderv0alpha1 "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/metrics" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/slugify" + "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacysearcher" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/apiserver" @@ -56,6 +59,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/search" "github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util/retryer" + "go.opentelemetry.io/otel/attribute" ) var ( @@ -68,6 +72,11 @@ var ( tracer = otel.Tracer("github.com/grafana/grafana/pkg/services/dashboards/service") ) +const ( + k8sDashboardKvNamespace = "dashboard-cleanup" + k8sDashboardKvLastResourceVersionKey = "last-resource-version" +) + type DashboardServiceImpl struct { cfg *setting.Cfg log log.Logger @@ -82,11 +91,271 @@ type DashboardServiceImpl struct { k8sclient client.K8sHandler metrics *dashboardsMetrics publicDashboardService publicdashboards.ServiceWrapper + serverLockService *serverlock.ServerLockService + kvstore kvstore.KVStore dashboardPermissionsReady chan struct{} } +func (dr *DashboardServiceImpl) startK8sDeletedDashboardsCleanupJob(ctx context.Context) chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + + ticker := time.NewTicker(dr.cfg.K8sDashboardCleanup.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := dr.executeCleanupWithLock(ctx); err != nil { + dr.log.Error("Failed to execute k8s dashboard cleanup", "error", err) + } + } + } + }() + return done +} + +func (dr *DashboardServiceImpl) executeCleanupWithLock(ctx context.Context) error { + // We're taking a leader-like locking approach here. By locking and executing, but never releasing the lock, + // we ensure that other instances of this service can't run in parallel and hence the cleanup will only happen once + // per cleanup interval by setting the maxInterval and having the time between executions be the cleanup interval as well. + return dr.serverLockService.LockAndExecute( + ctx, + k8sDashboardKvNamespace, + dr.cfg.K8sDashboardCleanup.Interval, + func(ctx context.Context) { + if err := dr.cleanupK8sDashboardResources(ctx, dr.cfg.K8sDashboardCleanup.BatchSize, dr.cfg.K8sDashboardCleanup.Timeout); err != nil { + dr.log.Error("Failed to cleanup k8s dashboard resources", "error", err) + } + }, + ) +} + +// cleanupK8sDashboardResources cleans up resources marked for deletion in the k8s API. +// It processes all organizations, finds dashboards with the trash label, and cleans them up. +// batchSize specifies how many dashboards to process in a single batch. +// timeout specifies the timeout duration for the cleanup operation. +func (dr *DashboardServiceImpl) cleanupK8sDashboardResources(ctx context.Context, batchSize int64, timeout time.Duration) error { + ctx, span := tracer.Start(ctx, "dashboards.service.cleanupK8sDashboardResources") + defer span.End() + + if !dr.features.IsEnabledGlobally(featuremgmt.FlagKubernetesClientDashboardsFolders) { + return nil + } + + // Create a timeout context to ensure we complete before the lock expires + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + orgs, err := dr.orgService.Search(ctx, &org.SearchOrgsQuery{}) + if err != nil { + return err + } + dr.log.Debug("Running k8s dashboard resource cleanup for all orgs", "numOrgs", len(orgs)) + + var errs []error + for _, org := range orgs { + // Check if we're approaching the timeout + if ctx.Err() != nil { + dr.log.Info("Timeout reached during cleanup, stopping processing", "timeout", timeout) + break + } + + orgErr := dr.cleanupOrganizationK8sDashboards(ctx, org.ID, batchSize) + if orgErr != nil { + errs = append(errs, fmt.Errorf("org %d: %w", org.ID, orgErr)) + } + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + + return nil +} + +// cleanupOrganizationK8sDashboards handles cleanup for a single organization's Kubernetes dashboards +func (dr *DashboardServiceImpl) cleanupOrganizationK8sDashboards(ctx context.Context, orgID int64, batchSize int64) error { + dr.log.Debug("Running k8s dashboard resource cleanup for org", "orgID", orgID) + + ctx, span := tracer.Start(ctx, "dashboards.service.cleanupK8sDashboardResources.org") + defer span.End() + span.SetAttributes(attribute.Int64("org_id", orgID)) + + ctx, _ = identity.WithServiceIdentity(ctx, orgID) + + // Get the last processed resource version + lastResourceVersion, err := dr.getLastResourceVersion(ctx, orgID) + if err != nil { + return err + } + + var errs []error + continueToken := "" + itemsProcessed := 0 + + for { + // Check if we're approaching the timeout + if ctx.Err() != nil { + dr.log.Info("Timeout reached during org cleanup, stopping processing", "orgID", orgID) + break + } + + // List resources to be cleaned up + data, listErr, shouldContinue := dr.listResourcesToCleanup(ctx, orgID, lastResourceVersion, continueToken, batchSize) + if listErr != nil { + errs = append(errs, fmt.Errorf("failed to list resources: %w", listErr)) + break + } + if shouldContinue { + // Reset and try again with updated resource version + lastResourceVersion = "0" + continueToken = "" + continue + } + + // Skip the first item if it matches our last resource version (due to NotOlderThan behavior) + if len(data.Items) > 0 && data.Items[0].GetResourceVersion() == lastResourceVersion { + data.Items = data.Items[1:] + } + + if len(data.Items) == 0 { + dr.log.Debug("No items to clean up in this batch", "orgID", orgID) + break + } + + dr.log.Info("Processing dashboard cleanup batch", "orgID", orgID, "count", len(data.Items)) + + // Process the batch + processedItems, processingErrs := dr.processDashboardBatch(ctx, orgID, data.Items) + if len(processingErrs) > 0 { + errs = append(errs, processingErrs...) + } + itemsProcessed += processedItems + + // Update resource version after the batch + if len(data.Items) > 0 { + maxBatchResourceVersion := data.Items[len(data.Items)-1].GetResourceVersion() + if lastResourceVersion != maxBatchResourceVersion { + dr.log.Info("Updating resource version after batch", "orgID", orgID, + "newResourceVersion", maxBatchResourceVersion, "oldResourceVersion", lastResourceVersion) + + if updateErr := dr.kvstore.Set(ctx, orgID, k8sDashboardKvNamespace, + k8sDashboardKvLastResourceVersionKey, maxBatchResourceVersion); updateErr != nil { + errs = append(errs, fmt.Errorf("failed to update resource version: %w", updateErr)) + } + } + } + + meta, _ := data.Object["metadata"].(map[string]interface{}) + continueToken, _ = meta["continue"].(string) + if continueToken == "" { + break + } + } + + if itemsProcessed > 0 { + dr.log.Info("Finished k8s dashboard resources cleanup", "orgID", orgID, "itemsProcessed", itemsProcessed) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil +} + +// getLastResourceVersion retrieves the last processed resource version from kvstore +func (dr *DashboardServiceImpl) getLastResourceVersion(ctx context.Context, orgID int64) (string, error) { + lastResourceVersion, ok, err := dr.kvstore.Get(ctx, orgID, k8sDashboardKvNamespace, k8sDashboardKvLastResourceVersionKey) + if err != nil { + return "", fmt.Errorf("failed to get last resource version: %w", err) + } + + if !ok { + dr.log.Info("No last resource version found, starting from scratch", "orgID", orgID) + return "0", nil + } + + return lastResourceVersion, nil +} + +// listResourcesToCleanup lists resources that need to be cleaned up +func (dr *DashboardServiceImpl) listResourcesToCleanup(ctx context.Context, orgID int64, resourceVersion, continueToken string, batchSize int64) (*unstructured.UnstructuredList, error, bool) { + var listOptions v1.ListOptions + if continueToken != "" { + listOptions = v1.ListOptions{ + LabelSelector: utils.LabelKeyGetTrash + "=true", + Continue: continueToken, + Limit: batchSize, + } + } else { + listOptions = v1.ListOptions{ + LabelSelector: utils.LabelKeyGetTrash + "=true", + ResourceVersionMatch: v1.ResourceVersionMatchNotOlderThan, + ResourceVersion: resourceVersion, + Limit: batchSize, + } + } + + data, err := dr.k8sclient.List(ctx, orgID, listOptions) + if err != nil { + if strings.Contains(err.Error(), "too old resource version") { + // If the resource version is too old, start from the current version + dr.log.Info("Resource version too old, starting from current version", "orgID", orgID) + return nil, nil, true // Signal to continue with reset version + } + return nil, err, false + } + + return data, nil, false +} + +// processDashboardBatch processes a batch of dashboards for cleanup +func (dr *DashboardServiceImpl) processDashboardBatch(ctx context.Context, orgID int64, items []unstructured.Unstructured) (int, []error) { + var errs []error + itemsProcessed := 0 + + for _, item := range items { + dash, err := dr.UnstructuredToLegacyDashboard(ctx, &item, orgID) + if err != nil { + errs = append(errs, fmt.Errorf("failed to convert dashboard: %w", err)) + continue + } + + meta, _ := item.Object["metadata"].(map[string]interface{}) + deletionTimestamp, _ := meta["deletionTimestamp"].(string) + resourceVersion, _ := meta["resourceVersion"].(string) + + dr.log.Info("K8s dashboard resource previously got deleted, cleaning up", + "UID", dash.UID, + "orgID", orgID, + "deletionTimestamp", deletionTimestamp, + "resourceVersion", resourceVersion) + + if err = dr.CleanUpDashboard(ctx, dash.UID, orgID); err != nil { + errs = append(errs, fmt.Errorf("failed to clean up dashboard %s: %w", dash.UID, err)) + } + itemsProcessed++ + } + + return itemsProcessed, errs +} + +// This gets auto-invoked when grafana starts, part of the BackgroundService interface +func (dr *DashboardServiceImpl) Run(ctx context.Context) error { + cleanupBackgroundJobStopped := dr.startK8sDeletedDashboardsCleanupJob(ctx) + <-ctx.Done() + // Wait for cleanup job to finish + <-cleanupBackgroundJobStopped + return ctx.Err() +} + var _ dashboards.PermissionsRegistrationService = (*DashboardServiceImpl)(nil) +var _ registry.BackgroundService = (*DashboardServiceImpl)(nil) // This is the uber service that implements a three smaller services func ProvideDashboardServiceImpl( @@ -96,6 +365,8 @@ func ProvideDashboardServiceImpl( restConfigProvider apiserver.RestConfigProvider, userService user.Service, quotaService quota.Service, orgService org.Service, publicDashboardService publicdashboards.ServiceWrapper, resourceClient resource.ResourceClient, dual dualwrite.Service, sorter sort.Service, + serverLockService *serverlock.ServerLockService, + kvstore kvstore.KVStore, ) (*DashboardServiceImpl, error) { k8sHandler := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), dashboardv0alpha1.DashboardResourceInfo.GroupVersionResource(), restConfigProvider.GetRestConfig, dashboardStore, userService, resourceClient, sorter) @@ -113,6 +384,8 @@ func ProvideDashboardServiceImpl( metrics: newDashboardsMetrics(r), dashboardPermissionsReady: make(chan struct{}), publicDashboardService: publicDashboardService, + serverLockService: serverLockService, + kvstore: kvstore, } defaultLimits, err := readQuotaConfig(cfg) @@ -860,18 +1133,7 @@ func (dr *DashboardServiceImpl) deleteDashboard(ctx context.Context, dashboardId cmd := &dashboards.DeleteDashboardCommand{OrgID: orgId, ID: dashboardId, UID: dashboardUID} if dr.features.IsEnabledGlobally(featuremgmt.FlagKubernetesClientDashboardsFolders) { - err := dr.deleteDashboardThroughK8s(ctx, cmd, validateProvisionedDashboard) - if err != nil { - return err - } - - // cleanup things related to dashboards that are not stored in unistore yet - err = dr.publicDashboardService.DeleteByDashboardUIDs(ctx, orgId, []string{dashboardUID}) - if err != nil { - return err - } - - return dr.dashboardStore.CleanupAfterDelete(ctx, cmd) + return dr.deleteDashboardThroughK8s(ctx, cmd, validateProvisionedDashboard) } if validateProvisionedDashboard { @@ -1494,6 +1756,19 @@ func (dr *DashboardServiceImpl) DeleteInFolders(ctx context.Context, orgID int64 func (dr *DashboardServiceImpl) Kind() string { return entity.StandardKindDashboard } +func (dr *DashboardServiceImpl) CleanUpDashboard(ctx context.Context, dashboardUID string, orgId int64) error { + ctx, span := tracer.Start(ctx, "dashboards.service.CleanUpDashboard") + defer span.End() + + // cleanup things related to dashboards that are not stored in unistore yet + var err = dr.publicDashboardService.DeleteByDashboardUIDs(ctx, orgId, []string{dashboardUID}) + if err != nil { + return err + } + + return dr.dashboardStore.CleanupAfterDelete(ctx, &dashboards.DeleteDashboardCommand{OrgID: orgId, UID: dashboardUID}) +} + func (dr *DashboardServiceImpl) CleanUpDeletedDashboards(ctx context.Context) (int64, error) { ctx, span := tracer.Start(ctx, "dashboards.service.CleanUpDeletedDashboards") defer span.End() diff --git a/pkg/services/dashboards/service/dashboard_service_integration_test.go b/pkg/services/dashboards/service/dashboard_service_integration_test.go index e22a5ae0cd4..7887e628110 100644 --- a/pkg/services/dashboards/service/dashboard_service_integration_test.go +++ b/pkg/services/dashboards/service/dashboard_service_integration_test.go @@ -11,6 +11,8 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/acimpl" @@ -813,6 +815,8 @@ func permissionScenario(t *testing.T, desc string, fn permissionScenarioFunc) { nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) dashboardService.RegisterDashboardPermissions(dashboardPermissions) require.NoError(t, err) @@ -906,6 +910,8 @@ func callSaveWithResult(t *testing.T, cmd dashboards.SaveDashboardCommand, sqlSt nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) service.RegisterDashboardPermissions(dashboardPermissions) @@ -977,6 +983,8 @@ func saveTestDashboard(t *testing.T, title string, orgID int64, folderUID string nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) service.RegisterDashboardPermissions(dashboardPermissions) @@ -1056,6 +1064,8 @@ func saveTestFolder(t *testing.T, title string, orgID int64, sqlStore db.DB) *da nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) service.RegisterDashboardPermissions(accesscontrolmock.NewMockedPermissionsService()) diff --git a/pkg/services/dashboards/service/dashboard_service_test.go b/pkg/services/dashboards/service/dashboard_service_test.go index 78c38656329..82600d9f2cd 100644 --- a/pkg/services/dashboards/service/dashboard_service_test.go +++ b/pkg/services/dashboards/service/dashboard_service_test.go @@ -18,7 +18,10 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" + "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/actest" acmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock" @@ -33,6 +36,7 @@ import ( "github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/services/search/model" "github.com/grafana/grafana/pkg/services/search/sort" + "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" @@ -833,9 +837,6 @@ func TestDeleteOrphanedProvisionedDashboards(t *testing.T) { _, k8sCliMock := setupK8sDashboardTests(service) k8sCliMock.On("GetNamespace", mock.Anything, mock.Anything).Return("default") k8sCliMock.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - fakeStore.On("CleanupAfterDelete", mock.Anything, &dashboards.DeleteDashboardCommand{UID: "uid", OrgID: 1}).Return(nil).Once() - fakeStore.On("CleanupAfterDelete", mock.Anything, &dashboards.DeleteDashboardCommand{UID: "uid3", OrgID: 2}).Return(nil).Once() - fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil) k8sCliMock.On("Get", mock.Anything, "uid", mock.Anything, mock.Anything, mock.Anything).Return(&unstructured.Unstructured{Object: map[string]any{ "metadata": map[string]any{ "name": "uid", @@ -1033,8 +1034,6 @@ func TestDeleteOrphanedProvisionedDashboards(t *testing.T) { // Mock deleteDashboard() k8sCliMock.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil) - fakeStore.On("CleanupAfterDelete", mock.Anything, mock.Anything).Return(nil).Once() // Mock WaitForSearchQuery() // First call returns 1 hit @@ -1476,8 +1475,6 @@ func TestDeleteDashboard(t *testing.T) { t.Run("Should use Kubernetes client if feature flags are enabled", func(t *testing.T) { ctx, k8sCliMock := setupK8sDashboardTests(service) k8sCliMock.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - fakeStore.On("CleanupAfterDelete", mock.Anything, mock.Anything).Return(nil).Once() - fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() err := service.DeleteDashboard(ctx, 1, "uid", 1) require.NoError(t, err) @@ -1488,8 +1485,6 @@ func TestDeleteDashboard(t *testing.T) { ctx, k8sCliMock := setupK8sDashboardTests(service) k8sCliMock.On("GetNamespace", mock.Anything, mock.Anything).Return("default") k8sCliMock.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - fakeStore.On("CleanupAfterDelete", mock.Anything, mock.Anything).Return(nil).Once() - fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() k8sCliMock.On("Search", mock.Anything, mock.Anything, mock.Anything).Return(&resource.ResourceSearchResponse{ Results: &resource.ResourceTable{ Columns: []*resource.ResourceTableColumnDefinition{ @@ -2314,3 +2309,335 @@ func TestLegacySaveCommandToUnstructured(t *testing.T) { assert.Equal(t, result.GetAnnotations(), map[string]string(nil)) }) } + +func TestCleanUpDashboard(t *testing.T) { + tests := []struct { + name string + deleteError error + cleanupError error + expectCleanup bool + expectedError error + }{ + { + name: "Should delete public dashboards and clean up after delete", + expectCleanup: true, + }, + { + name: "Should return error if DeleteByDashboardUIDs fails", + deleteError: fmt.Errorf("deletion error"), + expectCleanup: false, + expectedError: fmt.Errorf("deletion error"), + }, + { + name: "Should return error if CleanupAfterDelete fails", + cleanupError: fmt.Errorf("cleanup error"), + expectCleanup: true, + expectedError: fmt.Errorf("cleanup error"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fakeStore := dashboards.FakeDashboardStore{} + fakePublicDashboardService := publicdashboards.NewFakePublicDashboardServiceWrapper(t) + service := &DashboardServiceImpl{ + cfg: setting.NewCfg(), + dashboardStore: &fakeStore, + publicDashboardService: fakePublicDashboardService, + } + + ctx := context.Background() + dashboardUID := "dash-uid" + orgID := int64(1) + + // Setup mocks + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, orgID, []string{dashboardUID}).Return(tc.deleteError).Maybe() + + if tc.expectCleanup { + fakeStore.On("CleanupAfterDelete", mock.Anything, &dashboards.DeleteDashboardCommand{ + OrgID: orgID, + UID: dashboardUID, + }).Return(tc.cleanupError).Maybe() + } + + // Execute + err := service.CleanUpDashboard(ctx, dashboardUID, orgID) + + // Assert + if tc.expectedError != nil { + require.Error(t, err) + require.Equal(t, tc.expectedError.Error(), err.Error()) + } else { + require.NoError(t, err) + } + + fakePublicDashboardService.AssertExpectations(t) + fakeStore.AssertExpectations(t) + }) + } +} + +func TestK8sDashboardCleanupJob(t *testing.T) { + tests := []struct { + name string + featureEnabled bool + batchSize int + setupFunc func(*DashboardServiceImpl, context.Context, *client.MockK8sHandler) + verifyFunc func(*testing.T, *DashboardServiceImpl, context.Context, *client.MockK8sHandler, *kvstore.FakeKVStore) + }{ + { + name: "Should not run cleanup when feature flag is disabled", + featureEnabled: false, + batchSize: 10, + }, + { + name: "Should process dashboard cleanup for all orgs", + featureEnabled: true, + batchSize: 10, + setupFunc: func(service *DashboardServiceImpl, ctx context.Context, k8sCliMock *client.MockK8sHandler) { + // Test organizations + fakeOrgService := service.orgService.(*orgtest.FakeOrgService) + fakeOrgService.ExpectedOrgs = []*org.OrgDTO{ + {ID: 1, Name: "org1"}, + {ID: 2, Name: "org2"}, + } + + kv := service.kvstore.(*kvstore.FakeKVStore) + fakeStore := service.dashboardStore.(*dashboards.FakeDashboardStore) + fakePublicDashboardService := service.publicDashboardService.(*publicdashboards.FakePublicDashboardServiceWrapper) + + // Create dashboard unstructured items for response + dashboard1 := createTestUnstructuredDashboard("dash1", "org1-dashboard", "101") + dashboard2 := createTestUnstructuredDashboard("dash2", "org2-dashboard", "201") + + // Setup test data in KV store. Only populate org 1. + _ = kv.Set(ctx, int64(1), k8sDashboardKvNamespace, k8sDashboardKvLastResourceVersionKey, "100") + + // Mock K8s responses for org 1 + k8sCliMock.On("List", mock.AnythingOfType("*context.valueCtx"), int64(1), mock.MatchedBy(func(opts metav1.ListOptions) bool { + return opts.LabelSelector == utils.LabelKeyGetTrash+"=true" && + opts.Continue == "" + })).Return(&unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "101", + }, + }, + Items: []unstructured.Unstructured{dashboard1}, + }, nil).Once() + + // Mock K8s responses for org 2 + k8sCliMock.On("List", mock.AnythingOfType("*context.valueCtx"), int64(2), mock.MatchedBy(func(opts metav1.ListOptions) bool { + return opts.LabelSelector == utils.LabelKeyGetTrash+"=true" && + opts.Continue == "" + })).Return(&unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "201", + }, + }, + Items: []unstructured.Unstructured{dashboard2}, + }, nil).Once() + + // Mock GetUserFromMeta calls + k8sCliMock.On("GetUserFromMeta", mock.AnythingOfType("*context.valueCtx"), mock.Anything).Return(&user.User{}, nil).Times(4) + + // Mock cleanup + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(1), []string{"dash1"}).Return(nil).Once() + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(2), []string{"dash2"}).Return(nil).Once() + fakeStore.On("CleanupAfterDelete", mock.Anything, mock.Anything).Return(nil).Times(2) + }, + verifyFunc: func(t *testing.T, service *DashboardServiceImpl, ctx context.Context, k8sCliMock *client.MockK8sHandler, kv *kvstore.FakeKVStore) { + k8sCliMock.AssertExpectations(t) + + // Verify KV store was updated with new resource versions + val1, found1, _ := kv.Get(ctx, int64(1), k8sDashboardKvNamespace, k8sDashboardKvLastResourceVersionKey) + require.True(t, found1) + require.Equal(t, "101", val1) + + val2, found2, _ := kv.Get(ctx, int64(2), k8sDashboardKvNamespace, k8sDashboardKvLastResourceVersionKey) + require.True(t, found2) + require.Equal(t, "201", val2) + }, + }, + { + name: "Should handle pagination and batching when processing large sets of dashboards", + featureEnabled: true, + batchSize: 3, + setupFunc: func(service *DashboardServiceImpl, ctx context.Context, k8sCliMock *client.MockK8sHandler) { + // Test organization + fakeOrgService := service.orgService.(*orgtest.FakeOrgService) + fakeOrgService.ExpectedOrgs = []*org.OrgDTO{ + {ID: 1, Name: "org1"}, + } + + kv := service.kvstore.(*kvstore.FakeKVStore) + fakeStore := service.dashboardStore.(*dashboards.FakeDashboardStore) + fakePublicDashboardService := service.publicDashboardService.(*publicdashboards.FakePublicDashboardServiceWrapper) + + // Setup initial resource version + initialVersion := "100" + _ = kv.Set(ctx, int64(1), k8sDashboardKvNamespace, k8sDashboardKvLastResourceVersionKey, initialVersion) + + // Create dashboard batches (5 dashboards total, to be processed in 2 batches) + firstBatch := []unstructured.Unstructured{ + createTestUnstructuredDashboard("dash1", "dashboard1", "101"), + createTestUnstructuredDashboard("dash2", "dashboard2", "102"), + createTestUnstructuredDashboard("dash3", "dashboard3", "150"), + } + secondBatch := []unstructured.Unstructured{ + createTestUnstructuredDashboard("dash4", "dashboard4", "180"), + createTestUnstructuredDashboard("dash5", "dashboard5", "200"), + } + + // First batch response with continue token + k8sCliMock.On("List", mock.AnythingOfType("*context.valueCtx"), int64(1), mock.MatchedBy(func(opts metav1.ListOptions) bool { + return opts.LabelSelector == utils.LabelKeyGetTrash+"=true" && + opts.Continue == "" + })).Return(&unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "200", + "continue": "next-token", + }, + }, + Items: firstBatch, + }, nil).Once() + + // Second batch response with updated resource version + k8sCliMock.On("List", mock.AnythingOfType("*context.valueCtx"), int64(1), mock.MatchedBy(func(opts metav1.ListOptions) bool { + return opts.LabelSelector == utils.LabelKeyGetTrash+"=true" && + opts.Continue == "next-token" + })).Return(&unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "200", + }, + }, + Items: secondBatch, + }, nil).Once() + + // Mock GetUserFromMeta calls for each dashboard + k8sCliMock.On("GetUserFromMeta", mock.AnythingOfType("*context.valueCtx"), mock.Anything).Return(&user.User{}, nil).Times(10) + + // Mock public dashboard deletion for each dashboard + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(1), []string{"dash1"}).Return(nil).Once() + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(1), []string{"dash2"}).Return(nil).Once() + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(1), []string{"dash3"}).Return(nil).Once() + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(1), []string{"dash4"}).Return(nil).Once() + fakePublicDashboardService.On("DeleteByDashboardUIDs", mock.Anything, int64(1), []string{"dash5"}).Return(nil).Once() + + // Mock cleanup after delete for each dashboard + fakeStore.On("CleanupAfterDelete", mock.Anything, mock.Anything).Return(nil).Times(5) + }, + verifyFunc: func(t *testing.T, service *DashboardServiceImpl, ctx context.Context, k8sCliMock *client.MockK8sHandler, kv *kvstore.FakeKVStore) { + k8sCliMock.AssertExpectations(t) + + // Verify KV store was updated with latest resource version + val, found, _ := kv.Get(ctx, int64(1), k8sDashboardKvNamespace, k8sDashboardKvLastResourceVersionKey) + require.True(t, found) + require.Equal(t, "200", val) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Setup test database and utilities + sqlStore, _ := sqlstore.InitTestDB(t) + lockService := serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()) + kv := kvstore.NewFakeKVStore() + + fakeStore := dashboards.FakeDashboardStore{} + fakePublicDashboardService := publicdashboards.NewFakePublicDashboardServiceWrapper(t) + fakeOrgService := orgtest.NewOrgServiceFake() + + features := featuremgmt.WithFeatures() + if tc.featureEnabled { + features = featuremgmt.WithFeatures(featuremgmt.FlagKubernetesClientDashboardsFolders) + } + + service := &DashboardServiceImpl{ + cfg: setting.NewCfg(), + log: log.New("test.logger"), + dashboardStore: &fakeStore, + publicDashboardService: fakePublicDashboardService, + orgService: fakeOrgService, + serverLockService: lockService, + kvstore: kv, + features: features, + } + + ctx, k8sCliMock := setupK8sDashboardTests(service) + + if tc.setupFunc != nil { + tc.setupFunc(service, ctx, k8sCliMock) + } + + // Execute + err := service.cleanupK8sDashboardResources(ctx, int64(tc.batchSize), 20*time.Second) + require.NoError(t, err) + + if tc.verifyFunc != nil { + tc.verifyFunc(t, service, ctx, k8sCliMock, kv) + } + }) + } + + t.Run("Should start and stop background job correctly", func(t *testing.T) { + // Setup test database and utilities + sqlStore, _ := sqlstore.InitTestDB(t) + lockService := serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()) + + cfg := setting.NewCfg() + cfg.K8sDashboardCleanup = setting.K8sDashboardCleanupSettings{ + Interval: 30 * time.Second, + Timeout: 25 * time.Second, + BatchSize: 10, + } + + service := &DashboardServiceImpl{ + cfg: cfg, + log: log.New("test.logger"), + features: featuremgmt.WithFeatures(featuremgmt.FlagKubernetesClientDashboardsFolders), + serverLockService: lockService, + } + + // Create a test context that can be canceled + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start job with the context + done := service.startK8sDeletedDashboardsCleanupJob(ctx) + + // Cancel context to verify graceful shutdown + cancel() + + // Wait for goroutine to exit instead of using sleep + select { + case <-done: + // Job exited successfully + case <-time.After(time.Second): + t.Fatal("Cleanup job didn't exit within timeout") + } + }) +} + +// Helper functions for testing + +func createTestUnstructuredDashboard(uid, title string, resourceVersion string) unstructured.Unstructured { + return unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": dashboardv0alpha1.DashboardResourceInfo.GroupVersion().String(), + "kind": dashboardv0alpha1.DashboardResourceInfo.GroupVersionKind().Kind, + "metadata": map[string]interface{}{ + "name": uid, + "deletionTimestamp": "2023-01-01T00:00:00Z", + "resourceVersion": resourceVersion, + }, + "spec": map[string]interface{}{ + "title": title, + }, + }, + } +} diff --git a/pkg/services/dashboardsnapshots/service/service_test.go b/pkg/services/dashboardsnapshots/service/service_test.go index 9c4fedb47e4..c647528785b 100644 --- a/pkg/services/dashboardsnapshots/service/service_test.go +++ b/pkg/services/dashboardsnapshots/service/service_test.go @@ -11,6 +11,9 @@ import ( common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1" dashboardsnapshot "github.com/grafana/grafana/pkg/apis/dashboardsnapshot/v0alpha1" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/infra/serverlock" + "github.com/grafana/grafana/pkg/infra/tracing" acmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock" "github.com/grafana/grafana/pkg/services/apiserver/client" "github.com/grafana/grafana/pkg/services/dashboards" @@ -121,6 +124,8 @@ func TestValidateDashboardExists(t *testing.T) { nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) s := ProvideService(dsStore, secretsService, dashSvc) diff --git a/pkg/services/folder/folderimpl/folder_test.go b/pkg/services/folder/folderimpl/folder_test.go index b6361936a5d..fcb88a922cf 100644 --- a/pkg/services/folder/folderimpl/folder_test.go +++ b/pkg/services/folder/folderimpl/folder_test.go @@ -20,8 +20,10 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/db/dbtest" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log/logtest" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/acimpl" @@ -498,7 +500,10 @@ func TestIntegrationNestedFolderService(t *testing.T) { publicDashboardFakeService.On("DeleteByDashboardUIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil) dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuresFlagOn, folderPermissions, ac, serviceWithFlagOn, nestedFolderStore, nil, - client.MockTestRestConfig{}, nil, quotaService, nil, publicDashboardFakeService, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + client.MockTestRestConfig{}, nil, quotaService, nil, publicDashboardFakeService, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), + ) require.NoError(t, err) dashSrv.RegisterDashboardPermissions(dashboardPermissions) @@ -584,7 +589,10 @@ func TestIntegrationNestedFolderService(t *testing.T) { publicDashboardFakeService.On("DeleteByDashboardUIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil) dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuresFlagOff, - folderPermissions, ac, serviceWithFlagOff, nestedFolderStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, publicDashboardFakeService, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + folderPermissions, ac, serviceWithFlagOff, nestedFolderStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, publicDashboardFakeService, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), + ) require.NoError(t, err) dashSrv.RegisterDashboardPermissions(dashboardPermissions) alertStore, err := ngstore.ProvideDBStore(cfg, featuresFlagOff, db, serviceWithFlagOff, dashSrv, ac, b) @@ -729,7 +737,10 @@ func TestIntegrationNestedFolderService(t *testing.T) { dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, tc.featuresFlag, folderPermissions, ac, tc.service, tc.service.store, nil, client.MockTestRestConfig{}, nil, quotaService, nil, publicDashboardFakeService, nil, - dualwrite.ProvideTestService(), sort.ProvideService()) + dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), + ) require.NoError(t, err) dashSrv.RegisterDashboardPermissions(dashboardPermissions) @@ -1524,6 +1535,8 @@ func TestIntegrationNestedFolderSharedWithMe(t *testing.T) { nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) dashboardService.RegisterDashboardPermissions(dashboardPermissions) diff --git a/pkg/services/libraryelements/libraryelements_test.go b/pkg/services/libraryelements/libraryelements_test.go index ee1d1fe7ec6..8eec5685b14 100644 --- a/pkg/services/libraryelements/libraryelements_test.go +++ b/pkg/services/libraryelements/libraryelements_test.go @@ -18,7 +18,9 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/kinds/librarypanel" "github.com/grafana/grafana/pkg/services/accesscontrol" @@ -366,6 +368,8 @@ func createDashboard(t *testing.T, sqlStore db.DB, user user.SignedInUser, dash nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) service.RegisterDashboardPermissions(dashboardPermissions) @@ -459,6 +463,8 @@ func scenarioWithPanel(t *testing.T, desc string, fn func(t *testing.T, sc scena features, folderPermissions, ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, svcErr) dashboardService.RegisterDashboardPermissions(dashboardPermissions) @@ -532,6 +538,8 @@ func testScenario(t *testing.T, desc string, fn func(t *testing.T, sc scenarioCo features, folderPermissions, ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, dashSvcErr) dashService.RegisterDashboardPermissions(dashboardPermissions) diff --git a/pkg/services/librarypanels/librarypanels_test.go b/pkg/services/librarypanels/librarypanels_test.go index 531f3714eca..930ec705e64 100644 --- a/pkg/services/librarypanels/librarypanels_test.go +++ b/pkg/services/librarypanels/librarypanels_test.go @@ -15,7 +15,9 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/slugify" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/kinds/librarypanel" @@ -739,7 +741,8 @@ func createDashboard(t *testing.T, sqlStore db.DB, user *user.SignedInUser, dash features, acmock.NewMockedPermissionsService(), ac, foldertest.NewFakeService(), folder.NewFakeStore(), nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), - ) + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore()) require.NoError(t, err) service.RegisterDashboardPermissions(dashPermissionService) dashboard, err := service.SaveDashboard(context.Background(), dashItem, true) @@ -837,7 +840,8 @@ func testScenario(t *testing.T, desc string, fn func(t *testing.T, sc scenarioCo features, acmock.NewMockedPermissionsService(), ac, folderSvc, folder.NewFakeStore(), nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), - ) + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore()) require.NoError(t, err) dashService.RegisterDashboardPermissions(dashPermissionService) guardian.InitAccessControlGuardian(cfg, ac, dashService, folderSvc, log.NewNopLogger()) diff --git a/pkg/services/ngalert/testutil/testutil.go b/pkg/services/ngalert/testutil/testutil.go index 0e13a46834d..daa61da93d3 100644 --- a/pkg/services/ngalert/testutil/testutil.go +++ b/pkg/services/ngalert/testutil/testutil.go @@ -8,6 +8,8 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" acmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock" @@ -67,6 +69,8 @@ func SetupDashboardService(tb testing.TB, sqlStore db.DB, fs *folderimpl.Dashboa foldertest.NewFakeService(), folder.NewFakeStore(), nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(tb, err) dashboardService.RegisterDashboardPermissions(dashboardPermissions) diff --git a/pkg/services/publicdashboards/api/query_test.go b/pkg/services/publicdashboards/api/query_test.go index 8c27d18b0ea..ddbf2925d4b 100644 --- a/pkg/services/publicdashboards/api/query_test.go +++ b/pkg/services/publicdashboards/api/query_test.go @@ -21,8 +21,11 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/errutil" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" + "github.com/grafana/grafana/pkg/infra/tracing" acmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock" "github.com/grafana/grafana/pkg/services/annotations/annotationstest" "github.com/grafana/grafana/pkg/services/apiserver/client" @@ -330,6 +333,8 @@ func TestIntegrationUnauthenticatedUserCanGetPubdashPanelQueryData(t *testing.T) featuremgmt.WithFeatures(), acmock.NewMockedPermissionsService(), ac, foldertest.NewFakeService(), folder.NewFakeStore(), nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(db, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore(), ) require.NoError(t, err) dashService.RegisterDashboardPermissions(dashPermissionService) diff --git a/pkg/services/publicdashboards/service/service_test.go b/pkg/services/publicdashboards/service/service_test.go index 67feb554cf2..e8ea78749a6 100644 --- a/pkg/services/publicdashboards/service/service_test.go +++ b/pkg/services/publicdashboards/service/service_test.go @@ -19,6 +19,8 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/actest" @@ -1400,7 +1402,9 @@ func TestPublicDashboardServiceImpl_ListPublicDashboards(t *testing.T) { fStore, ac, bus.ProvideBus(tracing.InitializeTracerForTest()), dashStore, folderStore, nil, testDB, features, supportbundlestest.NewFakeBundleService(), nil, cfg, nil, tracing.InitializeTracerForTest(), nil, dualwrite.ProvideTestService(), sort.ProvideService()) - dashboardService, err := dashsvc.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuremgmt.WithFeatures(), folderPermissions, ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + dashboardService, err := dashsvc.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuremgmt.WithFeatures(), folderPermissions, ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotatest.New(false, nil), nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(testDB, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore()) require.NoError(t, err) dashboardService.RegisterDashboardPermissions(&actest.FakePermissionsService{}) diff --git a/pkg/services/quota/quotaimpl/quota_test.go b/pkg/services/quota/quotaimpl/quota_test.go index 6100f1f0e6a..c8cc6d5dfde 100644 --- a/pkg/services/quota/quotaimpl/quota_test.go +++ b/pkg/services/quota/quotaimpl/quota_test.go @@ -12,7 +12,9 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/httpclient" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/serverlock" "github.com/grafana/grafana/pkg/infra/tracing" pluginfakes "github.com/grafana/grafana/pkg/plugins/manager/fakes" "github.com/grafana/grafana/pkg/services/accesscontrol/acimpl" @@ -499,7 +501,9 @@ func setupEnv(t *testing.T, sqlStore db.DB, cfg *setting.Cfg, b bus.Bus, quotaSe fStore, acmock.New(), bus.ProvideBus(tracing.InitializeTracerForTest()), dashStore, folderStore, nil, sqlStore, featuremgmt.WithFeatures(), supportbundlestest.NewFakeBundleService(), nil, cfg, nil, tracing.InitializeTracerForTest(), nil, dualwrite.ProvideTestService(), sort.ProvideService()) dashService, err := dashService.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuremgmt.WithFeatures(), acmock.NewMockedPermissionsService(), - ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService()) + ac, folderSvc, fStore, nil, client.MockTestRestConfig{}, nil, quotaService, nil, nil, nil, dualwrite.ProvideTestService(), sort.ProvideService(), + serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()), + kvstore.NewFakeKVStore()) require.NoError(t, err) dashService.RegisterDashboardPermissions(acmock.NewMockedPermissionsService()) secretsService := secretsmng.SetupTestService(t, fakes.NewFakeSecretsStore()) diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 815553d3813..59fe66d9175 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -181,6 +181,9 @@ type Cfg struct { DataProxyWhiteList map[string]bool ActionsAllowPostURL string + // K8s Dashboard Cleanup + K8sDashboardCleanup K8sDashboardCleanupSettings + TempDataLifetime time.Duration // Plugins @@ -1291,6 +1294,7 @@ func (cfg *Cfg) parseINIFile(iniFile *ini.File) error { cfg.readDataSourcesSettings() cfg.readDataSourceSecuritySettings() + cfg.readK8sDashboardCleanupSettings() cfg.readSqlDataSourceSettings() cfg.Storage = readStorageSettings(iniFile) diff --git a/pkg/setting/setting_k8s_dashboard_cleanup.go b/pkg/setting/setting_k8s_dashboard_cleanup.go new file mode 100644 index 00000000000..660de43c388 --- /dev/null +++ b/pkg/setting/setting_k8s_dashboard_cleanup.go @@ -0,0 +1,53 @@ +package setting + +import ( + "time" +) + +type K8sDashboardCleanupSettings struct { + Interval time.Duration + Timeout time.Duration + BatchSize int64 +} + +const ( + defaultK8sDashboardCleanupInterval = 30 * time.Second + defaultK8sDashboardCleanupBatchSize = int64(10) + minK8sDashboardCleanupInterval = 10 * time.Second + minK8sDashboardCleanupTimeout = 5 * time.Second + minK8sDashboardCleanupBatchSize = int64(5) + maxK8sDashboardCleanupBatchSize = int64(200) +) + +func (cfg *Cfg) readK8sDashboardCleanupSettings() { + section := cfg.Raw.Section("dashboard_cleanup") + + // Read interval setting with validation + cleanupInterval := section.Key("interval").MustDuration(defaultK8sDashboardCleanupInterval) + if cleanupInterval < minK8sDashboardCleanupInterval { + cfg.Logger.Warn("[dashboard_cleanup.interval] is too low; the minimum allowed (10s) is enforced") + cleanupInterval = minK8sDashboardCleanupInterval + } + + // Calculate timeout as 5 seconds less than interval, with minimum validation + cleanupTimeout := cleanupInterval - (5 * time.Second) + if cleanupTimeout < minK8sDashboardCleanupTimeout { + cleanupTimeout = minK8sDashboardCleanupTimeout + } + + // Read batch size with validation + batchSize := section.Key("batch_size").MustInt64(defaultK8sDashboardCleanupBatchSize) + if batchSize < minK8sDashboardCleanupBatchSize { + cfg.Logger.Warn("[dashboard_cleanup.batch_size] is too low; the minimum allowed (5) is enforced") + batchSize = minK8sDashboardCleanupBatchSize + } else if batchSize > maxK8sDashboardCleanupBatchSize { + cfg.Logger.Warn("[dashboard_cleanup.batch_size] is too high; the maximum allowed (1000) is enforced") + batchSize = maxK8sDashboardCleanupBatchSize + } + + cfg.K8sDashboardCleanup = K8sDashboardCleanupSettings{ + Interval: cleanupInterval, + Timeout: cleanupTimeout, + BatchSize: batchSize, + } +}