From a4aa3529c8faa64c3c63734a3513b065cced0687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 23 Oct 2025 13:17:08 +0200 Subject: [PATCH] Cleanup old entries from resource_last_import_time table. (#112438) * Cleanup old entries from resource_last_import_time table. * Add index for last_import_time column. * Address review feedback. --- pkg/storage/unified/sql/backend.go | 34 +++++++++++++++++++ .../data/resource_last_import_time_delete.sql | 3 ++ .../unified/sql/db/migrations/resource_mig.go | 11 ++++-- pkg/storage/unified/sql/queries.go | 10 ++++++ pkg/storage/unified/sql/queries_test.go | 9 +++++ pkg/storage/unified/sql/server.go | 13 +++---- .../unified/sql/test/integration_test.go | 1 + ...esource_last_import_time_delete-delete.sql | 3 ++ ...esource_last_import_time_delete-delete.sql | 3 ++ ...esource_last_import_time_delete-delete.sql | 3 ++ 10 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 pkg/storage/unified/sql/data/resource_last_import_time_delete.sql create mode 100755 pkg/storage/unified/sql/testdata/mysql--resource_last_import_time_delete-delete.sql create mode 100755 pkg/storage/unified/sql/testdata/postgres--resource_last_import_time_delete-delete.sql create mode 100755 pkg/storage/unified/sql/testdata/sqlite--resource_last_import_time_delete-delete.sql diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 823ec1092e7..ac7e12a51d0 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/atomic" "google.golang.org/protobuf/proto" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" @@ -67,6 +68,9 @@ type BackendOptions struct { // testing SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount + + // If not zero, the backend will regularly remove times from resource_last_import_time table older than this. + LastImportTimeMaxAge time.Duration } func NewBackend(opts BackendOptions) (Backend, error) { @@ -98,6 +102,7 @@ func NewBackend(opts BackendOptions) (Backend, error) { bulkLock: &bulkLock{running: make(map[string]bool)}, simulatedNetworkLatency: opts.SimulatedNetworkLatency, withPruner: opts.withPruner, + lastImportTimeMaxAge: opts.LastImportTimeMaxAge, }, nil } @@ -137,6 +142,9 @@ type backend struct { historyPruner resource.Pruner withPruner bool + + lastImportTimeMaxAge time.Duration + lastImportTimeDeletionTime atomic.Time } func (b *backend) Init(ctx context.Context) error { @@ -965,10 +973,36 @@ func (b *backend) fetchLatestHistoryRV(ctx context.Context, x db.ContextExecer, return res.ResourceVersion, nil } +// Don't run deletion of "last import times" more often than this duration. +const limitLastImportTimesDeletion = 1 * time.Hour + func (b *backend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[resource.ResourceLastImportTime, error] { ctx, span := b.tracer.Start(ctx, tracePrefix+"GetLastImportTimes") defer span.End() + // Delete old entries, if configured, and if enough time has passed since last deletion. + if b.lastImportTimeMaxAge > 0 && time.Since(b.lastImportTimeDeletionTime.Load()) > limitLastImportTimesDeletion { + now := time.Now() + + res, err := dbutil.Exec(ctx, b.db, sqlResourceLastImportTimeDelete, &sqlResourceLastImportTimeDeleteRequest{ + SQLTemplate: sqltemplate.New(b.dialect), + Threshold: now.Add(-b.lastImportTimeMaxAge), + }) + + if err != nil { + return func(yield func(resource.ResourceLastImportTime, error) bool) { + yield(resource.ResourceLastImportTime{}, err) + } + } + + aff, err := res.RowsAffected() + if err == nil && aff > 0 { + b.log.Info("Deleted old last import times", "rows", aff) + } + + b.lastImportTimeDeletionTime.Store(now) + } + rows, err := dbutil.QueryRows(ctx, b.db, sqlResourceLastImportTimeQuery, &sqlResourceLastImportTimeQueryRequest{SQLTemplate: sqltemplate.New(b.dialect)}) if err != nil { return func(yield func(resource.ResourceLastImportTime, error) bool) { diff --git a/pkg/storage/unified/sql/data/resource_last_import_time_delete.sql b/pkg/storage/unified/sql/data/resource_last_import_time_delete.sql new file mode 100644 index 00000000000..9db2b5eae85 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_last_import_time_delete.sql @@ -0,0 +1,3 @@ +DELETE FROM {{ .Ident "resource_last_import_time" }} +WHERE {{ .Ident "last_import_time" }} <= {{ .Arg .Threshold }} +; diff --git a/pkg/storage/unified/sql/db/migrations/resource_mig.go b/pkg/storage/unified/sql/db/migrations/resource_mig.go index 91c896e3c6f..315ff2b5b40 100644 --- a/pkg/storage/unified/sql/db/migrations/resource_mig.go +++ b/pkg/storage/unified/sql/db/migrations/resource_mig.go @@ -116,7 +116,7 @@ func initResourceTables(mg *migrator.Migrator) string { }, }) - tables = append(tables, migrator.Table{ + resource_last_import_time := migrator.Table{ Name: "resource_last_import_time", Columns: []*migrator.Column{ {Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, @@ -125,7 +125,8 @@ func initResourceTables(mg *migrator.Migrator) string { {Name: "last_import_time", Type: migrator.DB_DateTime, Nullable: false}, }, PrimaryKeys: []string{"group", "resource", "namespace"}, - }) + } + tables = append(tables, resource_last_import_time) // Initialize all tables for t := range tables { @@ -178,5 +179,11 @@ func initResourceTables(mg *migrator.Migrator) string { Name: "IDX_resource_history_namespace_group_resource_name_generation", })) + mg.AddMigration("Add UQE_resource_last_import_time_last_import_time index", migrator.NewAddIndexMigration(resource_last_import_time, &migrator.Index{ + Cols: []string{"last_import_time"}, + Type: migrator.IndexType, + Name: "UQE_resource_last_import_time_last_import_time", + })) + return marker } diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index e167cd39e63..e6b9b1615bf 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -61,6 +61,7 @@ var ( sqlResourceLastImportTimeInsert = mustTemplate("resource_last_import_time_insert.sql") sqlResourceLastImportTimeQuery = mustTemplate("resource_last_import_time_query.sql") + sqlResourceLastImportTimeDelete = mustTemplate("resource_last_import_time_delete.sql") ) // TxOptions. @@ -489,3 +490,12 @@ type sqlResourceLastImportTimeQueryRequest struct { func (r *sqlResourceLastImportTimeQueryRequest) Validate() error { return nil } + +type sqlResourceLastImportTimeDeleteRequest struct { + sqltemplate.SQLTemplate + Threshold time.Time +} + +func (r *sqlResourceLastImportTimeDeleteRequest) Validate() error { + return nil +} diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index 652d1926e59..5673bfad1e8 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -515,5 +515,14 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, }, + sqlResourceLastImportTimeDelete: { + { + Name: "delete", + Data: &sqlResourceLastImportTimeDeleteRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + Threshold: time.Date(2025, 10, 15, 14, 30, 05, 0, time.UTC), + }, + }, + }, }}) } diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 7520a2aaf36..b8d61d67420 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -102,12 +102,13 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner) backend, err := NewBackend(BackendOptions{ - DBProvider: eDB, - Tracer: opts.Tracer, - Reg: opts.Reg, - IsHA: isHA, - withPruner: withPruner, - storageMetrics: opts.StorageMetrics, + DBProvider: eDB, + Tracer: opts.Tracer, + Reg: opts.Reg, + IsHA: isHA, + withPruner: withPruner, + storageMetrics: opts.StorageMetrics, + LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age. }) if err != nil { return nil, err diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index 9fa10efd938..d32ca569d66 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -53,6 +53,7 @@ func newTestBackend(t *testing.T, isHA bool, simulatedNetworkLatency time.Durati DBProvider: eDB, IsHA: isHA, SimulatedNetworkLatency: simulatedNetworkLatency, + LastImportTimeMaxAge: 24 * time.Hour, }) require.NoError(t, err) require.NotNil(t, backend) diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_last_import_time_delete-delete.sql b/pkg/storage/unified/sql/testdata/mysql--resource_last_import_time_delete-delete.sql new file mode 100755 index 00000000000..0973aca31ad --- /dev/null +++ b/pkg/storage/unified/sql/testdata/mysql--resource_last_import_time_delete-delete.sql @@ -0,0 +1,3 @@ +DELETE FROM `resource_last_import_time` +WHERE `last_import_time` <= '2025-10-15 14:30:05 +0000 UTC' +; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_last_import_time_delete-delete.sql b/pkg/storage/unified/sql/testdata/postgres--resource_last_import_time_delete-delete.sql new file mode 100755 index 00000000000..59791b15c5d --- /dev/null +++ b/pkg/storage/unified/sql/testdata/postgres--resource_last_import_time_delete-delete.sql @@ -0,0 +1,3 @@ +DELETE FROM "resource_last_import_time" +WHERE "last_import_time" <= '2025-10-15 14:30:05 +0000 UTC' +; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_last_import_time_delete-delete.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_last_import_time_delete-delete.sql new file mode 100755 index 00000000000..59791b15c5d --- /dev/null +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_last_import_time_delete-delete.sql @@ -0,0 +1,3 @@ +DELETE FROM "resource_last_import_time" +WHERE "last_import_time" <= '2025-10-15 14:30:05 +0000 UTC' +;