Compare commits

...

9 Commits

Author SHA1 Message Date
Ezequiel Victorero f218122ae5 Merge branch 'main' into evictorero/snapshtos-mt-cleanup-expired 2026-01-14 15:43:33 -03:00
Ezequiel Victorero 7b91acbaeb apply feedback review 2026-01-14 15:43:27 -03:00
nmarrs ea5b39cd9f Merge remote-tracking branch 'origin' into evictorero/snapshtos-mt-cleanup-expired 2026-01-09 16:40:36 -08:00
Ezequiel Victorero ba79a2bbd6 fix lint 2026-01-09 08:06:55 -03:00
Ezequiel Victorero 3175275c25 Merge branch 'main' into evictorero/snapshtos-mt-cleanup-expired 2026-01-09 08:04:48 -03:00
Ezequiel Victorero 4a3cf7abaf Merge branch 'main' into evictorero/snapshtos-mt-cleanup-expired 2026-01-06 11:12:19 -03:00
Ezequiel Victorero 1cbbce160d add test 2026-01-06 11:11:39 -03:00
Ezequiel Victorero 47fbff6136 fix error and update comments 2026-01-05 17:18:48 -03:00
Ezequiel Victorero d98dd3e952 Snapshots: Cleanup using k8s api 2025-12-19 17:21:22 -03:00
2 changed files with 351 additions and 5 deletions
+90 -5
View File
@@ -15,7 +15,9 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/apps/shorturl/pkg/apis/shorturl/v1beta1"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/db"
@@ -61,6 +63,7 @@ type CleanUpService struct {
orgService org.Service
teamService team.Service
dataSourceService datasources.DataSourceService
dynamicClientFactory func(*rest.Config) (dynamic.Interface, error)
}
func ProvideService(cfg *setting.Cfg, Features featuremgmt.FeatureToggles, serverLockService *serverlock.ServerLockService,
@@ -86,6 +89,9 @@ func ProvideService(cfg *setting.Cfg, Features featuremgmt.FeatureToggles, serve
orgService: orgService,
teamService: teamService,
dataSourceService: dataSourceService,
dynamicClientFactory: func(c *rest.Config) (dynamic.Interface, error) {
return dynamic.NewForConfig(c)
},
}
return s
}
@@ -230,14 +236,93 @@ func (srv *CleanUpService) shouldCleanupTempFile(filemtime time.Time, now time.T
func (srv *CleanUpService) deleteExpiredSnapshots(ctx context.Context) {
logger := srv.log.FromContext(ctx)
cmd := dashboardsnapshots.DeleteExpiredSnapshotsCommand{}
if err := srv.dashboardSnapshotService.DeleteExpiredSnapshots(ctx, &cmd); err != nil {
logger.Error("Failed to delete expired snapshots", "error", err.Error())
//nolint:staticcheck // not yet migrated to OpenFeature
if srv.Features.IsEnabledGlobally(featuremgmt.FlagKubernetesSnapshots) {
srv.deleteKubernetesExpiredSnapshots(ctx)
} else {
logger.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
cmd := dashboardsnapshots.DeleteExpiredSnapshotsCommand{}
if err := srv.dashboardSnapshotService.DeleteExpiredSnapshots(ctx, &cmd); err != nil {
logger.Error("Failed to delete expired snapshots", "error", err.Error())
} else {
logger.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
}
}
}
func (srv *CleanUpService) deleteKubernetesExpiredSnapshots(ctx context.Context) {
logger := srv.log.FromContext(ctx)
logger.Debug("Starting deleting expired Kubernetes snapshots")
// Create the dynamic client for Kubernetes API
restConfig, err := srv.clientConfigProvider.GetRestConfig(ctx)
if err != nil {
logger.Error("Failed to get REST config for Kubernetes client", "error", err.Error())
return
}
client, err := srv.dynamicClientFactory(restConfig)
if err != nil {
logger.Error("Failed to create Kubernetes client", "error", err.Error())
return
}
// Set up the GroupVersionResource for snapshots
gvr := v0alpha1.SnapshotKind().GroupVersionResource()
// Expiration time is now
expirationTime := time.Now()
expirationTimestamp := expirationTime.UnixMilli()
deletedCount := 0
// List and delete expired snapshots across all namespaces
orgs, err := srv.orgService.Search(ctx, &org.SearchOrgsQuery{})
if err != nil {
logger.Error("Failed to list organizations", "error", err.Error())
return
}
for _, o := range orgs {
ctx, _ := identity.WithServiceIdentity(ctx, o.ID)
namespaceMapper := request.GetNamespaceMapper(srv.Cfg)
snapshots, err := client.Resource(gvr).Namespace(namespaceMapper(o.ID)).List(ctx, v1.ListOptions{})
if err != nil {
logger.Error("Failed to list snapshots for org", "orgID", o.ID, "error", err.Error())
continue
}
// Check each snapshot for expiration
for _, item := range snapshots.Items {
// Convert unstructured object to Snapshot struct
var snapshot v0alpha1.Snapshot
err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &snapshot)
if err != nil {
logger.Error("Failed to convert unstructured object to snapshot", "name", item.GetName(), "namespace", item.GetNamespace(), "error", err.Error())
continue
}
// Only delete expired snapshots
if snapshot.Spec.Expires != nil && *snapshot.Spec.Expires < expirationTimestamp {
namespace := snapshot.Namespace
name := snapshot.Name
err := client.Resource(gvr).Namespace(namespace).Delete(ctx, name, v1.DeleteOptions{})
if err != nil {
// Check if it's a "not found" error, which is expected if the resource was already deleted
if k8serrors.IsNotFound(err) {
logger.Debug("Snapshot already deleted", "name", name, "namespace", namespace)
} else {
logger.Error("Failed to delete expired snapshot", "name", name, "namespace", namespace, "error", err.Error())
}
} else {
deletedCount++
logger.Debug("Successfully deleted expired snapshot", "name", name, "namespace", namespace, "creationTime", snapshot.CreationTimestamp.Unix(), "expirationTime", expirationTimestamp)
}
}
}
}
logger.Debug("Deleted expired Kubernetes snapshots", "count", deletedCount)
}
func (srv *CleanUpService) deleteExpiredDashboardVersions(ctx context.Context) {
logger := srv.log.FromContext(ctx)
cmd := dashver.DeleteExpiredVersionsCommand{}
@@ -318,7 +403,7 @@ func (srv *CleanUpService) deleteStaleKubernetesShortURLs(ctx context.Context) {
return
}
client, err := dynamic.NewForConfig(restConfig)
client, err := srv.dynamicClientFactory(restConfig)
if err != nil {
logger.Error("Failed to create Kubernetes client", "error", err.Error())
return
+261
View File
@@ -1,11 +1,28 @@
package cleanup
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/apiserver"
"github.com/grafana/grafana/pkg/services/dashboardsnapshots"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/org/orgtest"
"github.com/grafana/grafana/pkg/setting"
)
@@ -36,3 +53,247 @@ func TestCleanUpTmpFiles(t *testing.T) {
require.False(t, service.shouldCleanupTempFile(weekAgo, now))
})
}
func TestDeleteExpiredSnapshots_LegacyMode(t *testing.T) {
t.Run("calls DeleteExpiredSnapshots on success", func(t *testing.T) {
mockSnapService := dashboardsnapshots.NewMockService(t)
mockSnapService.On("DeleteExpiredSnapshots", mock.Anything, mock.Anything).Return(nil)
service := &CleanUpService{
log: log.New("cleanup"),
Features: featuremgmt.WithFeatures(),
dashboardSnapshotService: mockSnapService,
}
service.deleteExpiredSnapshots(context.Background())
mockSnapService.AssertCalled(t, "DeleteExpiredSnapshots", mock.Anything, mock.Anything)
})
t.Run("handles error gracefully", func(t *testing.T) {
mockSnapService := dashboardsnapshots.NewMockService(t)
mockSnapService.On("DeleteExpiredSnapshots", mock.Anything, mock.Anything).Return(errors.New("db error"))
service := &CleanUpService{
log: log.New("cleanup"),
Features: featuremgmt.WithFeatures(),
dashboardSnapshotService: mockSnapService,
}
// Should not panic
service.deleteExpiredSnapshots(context.Background())
})
}
func TestDeleteExpiredSnapshots_KubernetesMode(t *testing.T) {
t.Run("deletes expired snapshots across multiple orgs", func(t *testing.T) {
// Create expired snapshots - one per org
expiredTime := time.Now().Add(-time.Hour).UnixMilli()
expiredSnapshot1 := createUnstructuredSnapshot("expired-snap-1", "org-1", expiredTime)
expiredSnapshot2 := createUnstructuredSnapshot("expired-snap-2", "org-2", expiredTime)
// Track which namespaces were queried
namespacesQueried := make(map[string]bool)
mockResource := new(mockResourceInterface)
mockResource.On("Namespace", mock.Anything).Run(func(args mock.Arguments) {
ns := args.Get(0).(string)
namespacesQueried[ns] = true
}).Return(mockResource)
mockResource.On("List", mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{
Items: []unstructured.Unstructured{*expiredSnapshot1, *expiredSnapshot2},
}, nil)
mockResource.On("Delete", mock.Anything, "expired-snap-1", mock.Anything, mock.Anything).Return(nil)
mockResource.On("Delete", mock.Anything, "expired-snap-2", mock.Anything, mock.Anything).Return(nil)
mockDynClient := new(mockDynamicClient)
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
service := createK8sCleanupService(t, mockDynClient)
service.deleteExpiredSnapshots(context.Background())
// Verify multiple namespaces were queried (one per org)
require.GreaterOrEqual(t, len(namespacesQueried), 2, "expected at least 2 namespaces to be queried")
// Verify both snapshots were deleted
mockResource.AssertCalled(t, "Delete", mock.Anything, "expired-snap-1", mock.Anything, mock.Anything)
mockResource.AssertCalled(t, "Delete", mock.Anything, "expired-snap-2", mock.Anything, mock.Anything)
})
t.Run("skips non-expired snapshots", func(t *testing.T) {
// Setup with future timestamp
futureTime := time.Now().Add(time.Hour).UnixMilli()
futureSnapshot := createUnstructuredSnapshot("future-snap", "org-1", futureTime)
mockResource := new(mockResourceInterface)
mockResource.On("Namespace", mock.Anything).Return(mockResource)
mockResource.On("List", mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{
Items: []unstructured.Unstructured{*futureSnapshot},
}, nil)
mockDynClient := new(mockDynamicClient)
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
service := createK8sCleanupService(t, mockDynClient)
service.deleteExpiredSnapshots(context.Background())
mockResource.AssertNotCalled(t, "Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
})
t.Run("handles REST config error", func(t *testing.T) {
service := &CleanUpService{
log: log.New("cleanup"),
Cfg: &setting.Cfg{},
Features: featuremgmt.WithFeatures(featuremgmt.FlagKubernetesSnapshots),
clientConfigProvider: apiserver.WithoutRestConfig,
}
// Should not panic
service.deleteExpiredSnapshots(context.Background())
})
t.Run("handles not found error gracefully", func(t *testing.T) {
expiredTime := time.Now().Add(-time.Hour).UnixMilli()
expiredSnapshot := createUnstructuredSnapshot("expired-snap", "org-1", expiredTime)
notFoundErr := k8serrors.NewNotFound(schema.GroupResource{}, "expired-snap")
mockResource := new(mockResourceInterface)
mockResource.On("Namespace", mock.Anything).Return(mockResource)
mockResource.On("List", mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{
Items: []unstructured.Unstructured{*expiredSnapshot},
}, nil)
mockResource.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(notFoundErr)
mockDynClient := new(mockDynamicClient)
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
service := createK8sCleanupService(t, mockDynClient)
// Should not panic - not found is expected
service.deleteExpiredSnapshots(context.Background())
mockResource.AssertExpectations(t)
})
t.Run("continues processing other orgs when one org fails to list snapshots", func(t *testing.T) {
// Create an expired snapshot for the second org
expiredTime := time.Now().Add(-time.Hour).UnixMilli()
expiredSnapshot := createUnstructuredSnapshot("expired-snap-org2", "org-2", expiredTime)
// Use namespace-aware mock that returns error for first namespace, success for second
mockResource := &namespaceAwareMockResource{
expiredSnapshot: expiredSnapshot,
deletedNames: make(map[string]bool),
}
mockDynClient := new(mockDynamicClient)
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
service := createK8sCleanupService(t, mockDynClient)
service.deleteExpiredSnapshots(context.Background())
// Verify that the snapshot from the second org was still deleted despite the first org failing
require.True(t, mockResource.deletedNames["expired-snap-org2"], "expected snapshot from second org to be deleted")
})
}
// Helper function to create unstructured snapshots for testing
func createUnstructuredSnapshot(name, namespace string, expiresMillis int64) *unstructured.Unstructured {
snapshot := &v0alpha1.Snapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v0alpha1.SnapshotSpec{
Expires: &expiresMillis,
},
}
obj, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(snapshot)
return &unstructured.Unstructured{Object: obj}
}
// Helper to create CleanUpService configured for Kubernetes mode with standard two-org setup
func createK8sCleanupService(t *testing.T, mockDynClient *mockDynamicClient) *CleanUpService {
mockOrgSvc := orgtest.NewMockService(t)
mockOrgSvc.On("Search", mock.Anything, mock.Anything).Return([]*org.OrgDTO{
{ID: 1, Name: "org1"},
{ID: 2, Name: "org2"},
}, nil)
return &CleanUpService{
log: log.New("cleanup"),
Cfg: &setting.Cfg{},
Features: featuremgmt.WithFeatures(featuremgmt.FlagKubernetesSnapshots),
clientConfigProvider: apiserver.RestConfigProviderFunc(func(ctx context.Context) (*rest.Config, error) {
return &rest.Config{}, nil
}),
orgService: mockOrgSvc,
dynamicClientFactory: func(cfg *rest.Config) (dynamic.Interface, error) {
return mockDynClient, nil
},
}
}
// mockDynamicClient is a minimal mock for dynamic.Interface
type mockDynamicClient struct {
mock.Mock
}
func (m *mockDynamicClient) Resource(resource schema.GroupVersionResource) dynamic.NamespaceableResourceInterface {
args := m.Called(resource)
return args.Get(0).(dynamic.NamespaceableResourceInterface)
}
// mockResourceInterface is a minimal mock for dynamic.ResourceInterface.
// Embeds dynamic.ResourceInterface to avoid implementing unused methods.
type mockResourceInterface struct {
mock.Mock
dynamic.ResourceInterface
}
func (m *mockResourceInterface) Namespace(ns string) dynamic.ResourceInterface {
args := m.Called(ns)
return args.Get(0).(dynamic.ResourceInterface)
}
func (m *mockResourceInterface) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
args := m.Called(ctx, opts)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*unstructured.UnstructuredList), args.Error(1)
}
func (m *mockResourceInterface) Delete(ctx context.Context, name string, opts metav1.DeleteOptions, subresources ...string) error {
args := m.Called(ctx, name, opts, subresources)
return args.Error(0)
}
// namespaceAwareMockResource is a mock that returns different results based on namespace.
// Used to test that processing continues when one org fails.
// Embeds dynamic.ResourceInterface to avoid implementing unused methods.
type namespaceAwareMockResource struct {
dynamic.ResourceInterface
currentNamespace string
expiredSnapshot *unstructured.Unstructured
deletedNames map[string]bool
}
func (m *namespaceAwareMockResource) Namespace(ns string) dynamic.ResourceInterface {
m.currentNamespace = ns
return m
}
func (m *namespaceAwareMockResource) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
// Fail for the first org namespace (org ID 1 maps to "default"), succeed for the second (org ID 2 maps to "org-2")
if m.currentNamespace == "default" {
return nil, errors.New("simulated list error for org-1")
}
return &unstructured.UnstructuredList{
Items: []unstructured.Unstructured{*m.expiredSnapshot},
}, nil
}
func (m *namespaceAwareMockResource) Delete(ctx context.Context, name string, opts metav1.DeleteOptions, subresources ...string) error {
m.deletedNames[name] = true
return nil
}