LastImportTime for resource. (#112153)

* LastImportTime for resource.

* Make StorageBackendImpl implement GetResourceLastImportTimes

* More missing implementations of GetResourceLastImportTimes

* Fix import.

* Skip TestGetResourceLastImportTime in TestBadgerKVStorageBackend.

* Implement GetResourceLastImportTimes by mockStorageBackend

* Bump test tolerance.

* Fix postgres query and timezone.

* Fix postgres query and timezone.

* Make linter happy.
This commit is contained in:
Peter Štibraný
2025-10-09 11:27:11 +02:00
committed by GitHub
parent 3cdae5d67d
commit d801b87db9
24 changed files with 476 additions and 8 deletions
+48
View File
@@ -955,3 +955,51 @@ func (b *backend) fetchLatestHistoryRV(ctx context.Context, x db.ContextExecer,
}
return res.ResourceVersion, nil
}
func (b *backend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[resource.ResourceLastImportTime, error] {
ctx, span := b.tracer.Start(ctx, tracePrefix+"GetLastImportTimes")
defer span.End()
rows, err := dbutil.QueryRows(ctx, b.db, sqlResourceLastImportTimeQuery, &sqlResourceLastImportTimeQueryRequest{SQLTemplate: sqltemplate.New(b.dialect)})
if err != nil {
return func(yield func(resource.ResourceLastImportTime, error) bool) {
yield(resource.ResourceLastImportTime{}, err)
}
}
return func(yield func(resource.ResourceLastImportTime, error) bool) {
closeOnDefer := true
defer func() {
if closeOnDefer {
_ = rows.Close() // Close while ignoring errors.
}
}()
for rows.Next() {
// If context has finished, return early.
if ctx.Err() != nil {
yield(resource.ResourceLastImportTime{}, ctx.Err())
return
}
row := resource.ResourceLastImportTime{}
err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.LastImportTime)
if err != nil {
yield(resource.ResourceLastImportTime{}, err)
return
}
if !yield(row, nil) {
return
}
}
closeOnDefer = false
// Close and report error, if any.
err := rows.Close()
if err != nil {
yield(resource.ResourceLastImportTime{}, err)
}
}
}
+28 -1
View File
@@ -170,7 +170,7 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
// Calculate the RV based on incoming request timestamps
rv := newBulkRV()
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection)*4)
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
// First clear everything in the transaction
if setting.RebuildCollection {
@@ -182,6 +182,14 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
}
} else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
@@ -253,6 +261,12 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
if err != nil {
b.log.Warn("error increasing RV", "error", err)
}
// Update the last import time. This is important to trigger reindexing
// of the resource for a given namespace.
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
}
return nil
})
@@ -262,6 +276,19 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
return rsp
}
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {
if _, err := dbutil.Exec(ctx, tx, sqlResourceLastImportTimeInsert, sqlResourceLastImportTimeInsertRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
LastImportTime: now.UTC(),
}); err != nil {
return fmt.Errorf("insert resource last import time: %w", err)
}
return nil
}
type bulkWroker struct {
ctx context.Context
tx db.ContextExecer
@@ -0,0 +1,42 @@
{{ if eq $.DialectName "mysql" }}
INSERT INTO {{ .Ident "resource_last_import_time" }} (
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "namespace" }},
{{ .Ident "last_import_time" }}
) VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
{{ .Arg .Namespace }},
{{ .Arg .LastImportTime }}
) ON DUPLICATE KEY UPDATE {{ .Ident "last_import_time" }} = {{ .Arg .LastImportTime }}
{{ else if eq $.DialectName "sqlite" }}
INSERT OR REPLACE INTO {{ .Ident "resource_last_import_time" }} (
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "namespace" }},
{{ .Ident "last_import_time" }}
) VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
{{ .Arg .Namespace }},
{{ .Arg .LastImportTime }}
)
{{ else if eq $.DialectName "postgres" }}
INSERT INTO {{ .Ident "resource_last_import_time" }} (
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "namespace" }},
{{ .Ident "last_import_time" }}
) VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
{{ .Arg .Namespace }},
{{ .Arg .LastImportTime }}
) ON CONFLICT ({{ .Ident "group" }}, {{ .Ident "resource" }}, {{ .Ident "namespace" }})
DO UPDATE SET {{ .Ident "last_import_time" }} = {{ .Arg .LastImportTime }}
{{ end }}
;
@@ -0,0 +1,8 @@
SELECT
{{ .Ident "namespace" }},
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "last_import_time" }}
FROM
{{ .Ident "resource_last_import_time" }}
;
@@ -116,6 +116,17 @@ func initResourceTables(mg *migrator.Migrator) string {
},
})
tables = append(tables, migrator.Table{
Name: "resource_last_import_time",
Columns: []*migrator.Column{
{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: false},
{Name: "last_import_time", Type: migrator.DB_DateTime, Nullable: false},
},
PrimaryKeys: []string{"group", "resource", "namespace"},
})
// Initialize all tables
for t := range tables {
mg.AddMigration("drop table "+tables[t].Name, migrator.NewDropTableMigration(tables[t].Name))
+35
View File
@@ -58,6 +58,9 @@ var (
sqlResourceBlobInsert = mustTemplate("resource_blob_insert.sql")
sqlResourceBlobQuery = mustTemplate("resource_blob_query.sql")
sqlResourceLastImportTimeInsert = mustTemplate("resource_last_import_time_insert.sql")
sqlResourceLastImportTimeQuery = mustTemplate("resource_last_import_time_query.sql")
)
// TxOptions.
@@ -454,3 +457,35 @@ func (r sqlResourceListModifiedSinceRequest) Validate() error {
}
return nil
}
type sqlResourceLastImportTimeInsertRequest struct {
sqltemplate.SQLTemplate
Namespace string
Group string
Resource string
LastImportTime time.Time
}
func (r sqlResourceLastImportTimeInsertRequest) Validate() error {
if r.Namespace == "" {
return fmt.Errorf("missing namespace")
}
if r.Group == "" {
return fmt.Errorf("missing group")
}
if r.Resource == "" {
return fmt.Errorf("missing resource")
}
if r.LastImportTime.IsZero() {
return fmt.Errorf("last import time cannot be zero")
}
return nil
}
type sqlResourceLastImportTimeQueryRequest struct {
sqltemplate.SQLTemplate
}
func (r *sqlResourceLastImportTimeQueryRequest) Validate() error {
return nil
}
+20
View File
@@ -495,5 +495,25 @@ func TestUnifiedStorageQueries(t *testing.T) {
},
},
},
sqlResourceLastImportTimeInsert: {
{
Name: "insert",
Data: &sqlResourceLastImportTimeInsertRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Namespace: "ns",
Group: "group",
Resource: "res",
LastImportTime: time.Date(2025, 10, 07, 22, 30, 05, 0, time.UTC),
},
},
},
sqlResourceLastImportTimeQuery: {
{
Name: "insert",
Data: &sqlResourceLastImportTimeQueryRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
},
},
},
}})
}
@@ -0,0 +1,12 @@
INSERT INTO `resource_last_import_time` (
`group`,
`resource`,
`namespace`,
`last_import_time`
) VALUES (
'group',
'res',
'ns',
'2025-10-07 22:30:05 +0000 UTC'
) ON DUPLICATE KEY UPDATE `last_import_time` = '2025-10-07 22:30:05 +0000 UTC'
;
@@ -0,0 +1,8 @@
SELECT
`namespace`,
`group`,
`resource`,
`last_import_time`
FROM
`resource_last_import_time`
;
@@ -0,0 +1,13 @@
INSERT INTO "resource_last_import_time" (
"group",
"resource",
"namespace",
"last_import_time"
) VALUES (
'group',
'res',
'ns',
'2025-10-07 22:30:05 +0000 UTC'
) ON CONFLICT ("group", "resource", "namespace")
DO UPDATE SET "last_import_time" = '2025-10-07 22:30:05 +0000 UTC'
;
@@ -0,0 +1,8 @@
SELECT
"namespace",
"group",
"resource",
"last_import_time"
FROM
"resource_last_import_time"
;
@@ -0,0 +1,12 @@
INSERT OR REPLACE INTO "resource_last_import_time" (
"group",
"resource",
"namespace",
"last_import_time"
) VALUES (
'group',
'res',
'ns',
'2025-10-07 22:30:05 +0000 UTC'
)
;
@@ -0,0 +1,8 @@
SELECT
"namespace",
"group",
"resource",
"last_import_time"
FROM
"resource_last_import_time"
;