Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f218122ae5 | |||
| 7b91acbaeb | |||
| ea5b39cd9f | |||
| ba79a2bbd6 | |||
| 3175275c25 | |||
| 4a3cf7abaf | |||
| 1cbbce160d | |||
| 47fbff6136 | |||
| d98dd3e952 |
@@ -15,7 +15,9 @@ import (
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
"github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
|
||||
"github.com/grafana/grafana/apps/shorturl/pkg/apis/shorturl/v1beta1"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
@@ -61,6 +63,7 @@ type CleanUpService struct {
|
||||
orgService org.Service
|
||||
teamService team.Service
|
||||
dataSourceService datasources.DataSourceService
|
||||
dynamicClientFactory func(*rest.Config) (dynamic.Interface, error)
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, Features featuremgmt.FeatureToggles, serverLockService *serverlock.ServerLockService,
|
||||
@@ -86,6 +89,9 @@ func ProvideService(cfg *setting.Cfg, Features featuremgmt.FeatureToggles, serve
|
||||
orgService: orgService,
|
||||
teamService: teamService,
|
||||
dataSourceService: dataSourceService,
|
||||
dynamicClientFactory: func(c *rest.Config) (dynamic.Interface, error) {
|
||||
return dynamic.NewForConfig(c)
|
||||
},
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -230,14 +236,93 @@ func (srv *CleanUpService) shouldCleanupTempFile(filemtime time.Time, now time.T
|
||||
|
||||
func (srv *CleanUpService) deleteExpiredSnapshots(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
cmd := dashboardsnapshots.DeleteExpiredSnapshotsCommand{}
|
||||
if err := srv.dashboardSnapshotService.DeleteExpiredSnapshots(ctx, &cmd); err != nil {
|
||||
logger.Error("Failed to delete expired snapshots", "error", err.Error())
|
||||
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||
if srv.Features.IsEnabledGlobally(featuremgmt.FlagKubernetesSnapshots) {
|
||||
srv.deleteKubernetesExpiredSnapshots(ctx)
|
||||
} else {
|
||||
logger.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
|
||||
cmd := dashboardsnapshots.DeleteExpiredSnapshotsCommand{}
|
||||
if err := srv.dashboardSnapshotService.DeleteExpiredSnapshots(ctx, &cmd); err != nil {
|
||||
logger.Error("Failed to delete expired snapshots", "error", err.Error())
|
||||
} else {
|
||||
logger.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteKubernetesExpiredSnapshots(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
logger.Debug("Starting deleting expired Kubernetes snapshots")
|
||||
|
||||
// Create the dynamic client for Kubernetes API
|
||||
restConfig, err := srv.clientConfigProvider.GetRestConfig(ctx)
|
||||
if err != nil {
|
||||
logger.Error("Failed to get REST config for Kubernetes client", "error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
client, err := srv.dynamicClientFactory(restConfig)
|
||||
if err != nil {
|
||||
logger.Error("Failed to create Kubernetes client", "error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Set up the GroupVersionResource for snapshots
|
||||
gvr := v0alpha1.SnapshotKind().GroupVersionResource()
|
||||
|
||||
// Expiration time is now
|
||||
expirationTime := time.Now()
|
||||
expirationTimestamp := expirationTime.UnixMilli()
|
||||
deletedCount := 0
|
||||
|
||||
// List and delete expired snapshots across all namespaces
|
||||
orgs, err := srv.orgService.Search(ctx, &org.SearchOrgsQuery{})
|
||||
if err != nil {
|
||||
logger.Error("Failed to list organizations", "error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for _, o := range orgs {
|
||||
ctx, _ := identity.WithServiceIdentity(ctx, o.ID)
|
||||
namespaceMapper := request.GetNamespaceMapper(srv.Cfg)
|
||||
snapshots, err := client.Resource(gvr).Namespace(namespaceMapper(o.ID)).List(ctx, v1.ListOptions{})
|
||||
if err != nil {
|
||||
logger.Error("Failed to list snapshots for org", "orgID", o.ID, "error", err.Error())
|
||||
continue
|
||||
}
|
||||
// Check each snapshot for expiration
|
||||
for _, item := range snapshots.Items {
|
||||
// Convert unstructured object to Snapshot struct
|
||||
var snapshot v0alpha1.Snapshot
|
||||
err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &snapshot)
|
||||
if err != nil {
|
||||
logger.Error("Failed to convert unstructured object to snapshot", "name", item.GetName(), "namespace", item.GetNamespace(), "error", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// Only delete expired snapshots
|
||||
if snapshot.Spec.Expires != nil && *snapshot.Spec.Expires < expirationTimestamp {
|
||||
namespace := snapshot.Namespace
|
||||
name := snapshot.Name
|
||||
|
||||
err := client.Resource(gvr).Namespace(namespace).Delete(ctx, name, v1.DeleteOptions{})
|
||||
if err != nil {
|
||||
// Check if it's a "not found" error, which is expected if the resource was already deleted
|
||||
if k8serrors.IsNotFound(err) {
|
||||
logger.Debug("Snapshot already deleted", "name", name, "namespace", namespace)
|
||||
} else {
|
||||
logger.Error("Failed to delete expired snapshot", "name", name, "namespace", namespace, "error", err.Error())
|
||||
}
|
||||
} else {
|
||||
deletedCount++
|
||||
logger.Debug("Successfully deleted expired snapshot", "name", name, "namespace", namespace, "creationTime", snapshot.CreationTimestamp.Unix(), "expirationTime", expirationTimestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debug("Deleted expired Kubernetes snapshots", "count", deletedCount)
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteExpiredDashboardVersions(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
cmd := dashver.DeleteExpiredVersionsCommand{}
|
||||
@@ -318,7 +403,7 @@ func (srv *CleanUpService) deleteStaleKubernetesShortURLs(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
client, err := dynamic.NewForConfig(restConfig)
|
||||
client, err := srv.dynamicClientFactory(restConfig)
|
||||
if err != nil {
|
||||
logger.Error("Failed to create Kubernetes client", "error", err.Error())
|
||||
return
|
||||
|
||||
@@ -1,11 +1,28 @@
|
||||
package cleanup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
"github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver"
|
||||
"github.com/grafana/grafana/pkg/services/dashboardsnapshots"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
"github.com/grafana/grafana/pkg/services/org/orgtest"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@@ -36,3 +53,247 @@ func TestCleanUpTmpFiles(t *testing.T) {
|
||||
require.False(t, service.shouldCleanupTempFile(weekAgo, now))
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteExpiredSnapshots_LegacyMode(t *testing.T) {
|
||||
t.Run("calls DeleteExpiredSnapshots on success", func(t *testing.T) {
|
||||
mockSnapService := dashboardsnapshots.NewMockService(t)
|
||||
mockSnapService.On("DeleteExpiredSnapshots", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
service := &CleanUpService{
|
||||
log: log.New("cleanup"),
|
||||
Features: featuremgmt.WithFeatures(),
|
||||
dashboardSnapshotService: mockSnapService,
|
||||
}
|
||||
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
|
||||
mockSnapService.AssertCalled(t, "DeleteExpiredSnapshots", mock.Anything, mock.Anything)
|
||||
})
|
||||
|
||||
t.Run("handles error gracefully", func(t *testing.T) {
|
||||
mockSnapService := dashboardsnapshots.NewMockService(t)
|
||||
mockSnapService.On("DeleteExpiredSnapshots", mock.Anything, mock.Anything).Return(errors.New("db error"))
|
||||
|
||||
service := &CleanUpService{
|
||||
log: log.New("cleanup"),
|
||||
Features: featuremgmt.WithFeatures(),
|
||||
dashboardSnapshotService: mockSnapService,
|
||||
}
|
||||
|
||||
// Should not panic
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteExpiredSnapshots_KubernetesMode(t *testing.T) {
|
||||
t.Run("deletes expired snapshots across multiple orgs", func(t *testing.T) {
|
||||
// Create expired snapshots - one per org
|
||||
expiredTime := time.Now().Add(-time.Hour).UnixMilli()
|
||||
expiredSnapshot1 := createUnstructuredSnapshot("expired-snap-1", "org-1", expiredTime)
|
||||
expiredSnapshot2 := createUnstructuredSnapshot("expired-snap-2", "org-2", expiredTime)
|
||||
|
||||
// Track which namespaces were queried
|
||||
namespacesQueried := make(map[string]bool)
|
||||
|
||||
mockResource := new(mockResourceInterface)
|
||||
mockResource.On("Namespace", mock.Anything).Run(func(args mock.Arguments) {
|
||||
ns := args.Get(0).(string)
|
||||
namespacesQueried[ns] = true
|
||||
}).Return(mockResource)
|
||||
mockResource.On("List", mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{
|
||||
Items: []unstructured.Unstructured{*expiredSnapshot1, *expiredSnapshot2},
|
||||
}, nil)
|
||||
mockResource.On("Delete", mock.Anything, "expired-snap-1", mock.Anything, mock.Anything).Return(nil)
|
||||
mockResource.On("Delete", mock.Anything, "expired-snap-2", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
mockDynClient := new(mockDynamicClient)
|
||||
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
|
||||
|
||||
service := createK8sCleanupService(t, mockDynClient)
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
|
||||
// Verify multiple namespaces were queried (one per org)
|
||||
require.GreaterOrEqual(t, len(namespacesQueried), 2, "expected at least 2 namespaces to be queried")
|
||||
// Verify both snapshots were deleted
|
||||
mockResource.AssertCalled(t, "Delete", mock.Anything, "expired-snap-1", mock.Anything, mock.Anything)
|
||||
mockResource.AssertCalled(t, "Delete", mock.Anything, "expired-snap-2", mock.Anything, mock.Anything)
|
||||
})
|
||||
|
||||
t.Run("skips non-expired snapshots", func(t *testing.T) {
|
||||
// Setup with future timestamp
|
||||
futureTime := time.Now().Add(time.Hour).UnixMilli()
|
||||
futureSnapshot := createUnstructuredSnapshot("future-snap", "org-1", futureTime)
|
||||
|
||||
mockResource := new(mockResourceInterface)
|
||||
mockResource.On("Namespace", mock.Anything).Return(mockResource)
|
||||
mockResource.On("List", mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{
|
||||
Items: []unstructured.Unstructured{*futureSnapshot},
|
||||
}, nil)
|
||||
|
||||
mockDynClient := new(mockDynamicClient)
|
||||
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
|
||||
|
||||
service := createK8sCleanupService(t, mockDynClient)
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
|
||||
mockResource.AssertNotCalled(t, "Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
|
||||
})
|
||||
|
||||
t.Run("handles REST config error", func(t *testing.T) {
|
||||
service := &CleanUpService{
|
||||
log: log.New("cleanup"),
|
||||
Cfg: &setting.Cfg{},
|
||||
Features: featuremgmt.WithFeatures(featuremgmt.FlagKubernetesSnapshots),
|
||||
clientConfigProvider: apiserver.WithoutRestConfig,
|
||||
}
|
||||
|
||||
// Should not panic
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
})
|
||||
|
||||
t.Run("handles not found error gracefully", func(t *testing.T) {
|
||||
expiredTime := time.Now().Add(-time.Hour).UnixMilli()
|
||||
expiredSnapshot := createUnstructuredSnapshot("expired-snap", "org-1", expiredTime)
|
||||
|
||||
notFoundErr := k8serrors.NewNotFound(schema.GroupResource{}, "expired-snap")
|
||||
|
||||
mockResource := new(mockResourceInterface)
|
||||
mockResource.On("Namespace", mock.Anything).Return(mockResource)
|
||||
mockResource.On("List", mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{
|
||||
Items: []unstructured.Unstructured{*expiredSnapshot},
|
||||
}, nil)
|
||||
mockResource.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(notFoundErr)
|
||||
|
||||
mockDynClient := new(mockDynamicClient)
|
||||
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
|
||||
|
||||
service := createK8sCleanupService(t, mockDynClient)
|
||||
|
||||
// Should not panic - not found is expected
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
mockResource.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("continues processing other orgs when one org fails to list snapshots", func(t *testing.T) {
|
||||
// Create an expired snapshot for the second org
|
||||
expiredTime := time.Now().Add(-time.Hour).UnixMilli()
|
||||
expiredSnapshot := createUnstructuredSnapshot("expired-snap-org2", "org-2", expiredTime)
|
||||
|
||||
// Use namespace-aware mock that returns error for first namespace, success for second
|
||||
mockResource := &namespaceAwareMockResource{
|
||||
expiredSnapshot: expiredSnapshot,
|
||||
deletedNames: make(map[string]bool),
|
||||
}
|
||||
|
||||
mockDynClient := new(mockDynamicClient)
|
||||
mockDynClient.On("Resource", mock.Anything).Return(mockResource)
|
||||
|
||||
service := createK8sCleanupService(t, mockDynClient)
|
||||
service.deleteExpiredSnapshots(context.Background())
|
||||
|
||||
// Verify that the snapshot from the second org was still deleted despite the first org failing
|
||||
require.True(t, mockResource.deletedNames["expired-snap-org2"], "expected snapshot from second org to be deleted")
|
||||
})
|
||||
}
|
||||
|
||||
// Helper function to create unstructured snapshots for testing
|
||||
func createUnstructuredSnapshot(name, namespace string, expiresMillis int64) *unstructured.Unstructured {
|
||||
snapshot := &v0alpha1.Snapshot{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: v0alpha1.SnapshotSpec{
|
||||
Expires: &expiresMillis,
|
||||
},
|
||||
}
|
||||
obj, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(snapshot)
|
||||
return &unstructured.Unstructured{Object: obj}
|
||||
}
|
||||
|
||||
// Helper to create CleanUpService configured for Kubernetes mode with standard two-org setup
|
||||
func createK8sCleanupService(t *testing.T, mockDynClient *mockDynamicClient) *CleanUpService {
|
||||
mockOrgSvc := orgtest.NewMockService(t)
|
||||
mockOrgSvc.On("Search", mock.Anything, mock.Anything).Return([]*org.OrgDTO{
|
||||
{ID: 1, Name: "org1"},
|
||||
{ID: 2, Name: "org2"},
|
||||
}, nil)
|
||||
|
||||
return &CleanUpService{
|
||||
log: log.New("cleanup"),
|
||||
Cfg: &setting.Cfg{},
|
||||
Features: featuremgmt.WithFeatures(featuremgmt.FlagKubernetesSnapshots),
|
||||
clientConfigProvider: apiserver.RestConfigProviderFunc(func(ctx context.Context) (*rest.Config, error) {
|
||||
return &rest.Config{}, nil
|
||||
}),
|
||||
orgService: mockOrgSvc,
|
||||
dynamicClientFactory: func(cfg *rest.Config) (dynamic.Interface, error) {
|
||||
return mockDynClient, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// mockDynamicClient is a minimal mock for dynamic.Interface
|
||||
type mockDynamicClient struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockDynamicClient) Resource(resource schema.GroupVersionResource) dynamic.NamespaceableResourceInterface {
|
||||
args := m.Called(resource)
|
||||
return args.Get(0).(dynamic.NamespaceableResourceInterface)
|
||||
}
|
||||
|
||||
// mockResourceInterface is a minimal mock for dynamic.ResourceInterface.
|
||||
// Embeds dynamic.ResourceInterface to avoid implementing unused methods.
|
||||
type mockResourceInterface struct {
|
||||
mock.Mock
|
||||
dynamic.ResourceInterface
|
||||
}
|
||||
|
||||
func (m *mockResourceInterface) Namespace(ns string) dynamic.ResourceInterface {
|
||||
args := m.Called(ns)
|
||||
return args.Get(0).(dynamic.ResourceInterface)
|
||||
}
|
||||
|
||||
func (m *mockResourceInterface) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
|
||||
args := m.Called(ctx, opts)
|
||||
if args.Get(0) == nil {
|
||||
return nil, args.Error(1)
|
||||
}
|
||||
return args.Get(0).(*unstructured.UnstructuredList), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockResourceInterface) Delete(ctx context.Context, name string, opts metav1.DeleteOptions, subresources ...string) error {
|
||||
args := m.Called(ctx, name, opts, subresources)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
// namespaceAwareMockResource is a mock that returns different results based on namespace.
|
||||
// Used to test that processing continues when one org fails.
|
||||
// Embeds dynamic.ResourceInterface to avoid implementing unused methods.
|
||||
type namespaceAwareMockResource struct {
|
||||
dynamic.ResourceInterface
|
||||
currentNamespace string
|
||||
expiredSnapshot *unstructured.Unstructured
|
||||
deletedNames map[string]bool
|
||||
}
|
||||
|
||||
func (m *namespaceAwareMockResource) Namespace(ns string) dynamic.ResourceInterface {
|
||||
m.currentNamespace = ns
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *namespaceAwareMockResource) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
|
||||
// Fail for the first org namespace (org ID 1 maps to "default"), succeed for the second (org ID 2 maps to "org-2")
|
||||
if m.currentNamespace == "default" {
|
||||
return nil, errors.New("simulated list error for org-1")
|
||||
}
|
||||
return &unstructured.UnstructuredList{
|
||||
Items: []unstructured.Unstructured{*m.expiredSnapshot},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *namespaceAwareMockResource) Delete(ctx context.Context, name string, opts metav1.DeleteOptions, subresources ...string) error {
|
||||
m.deletedNames[name] = true
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user