diff --git a/pkg/registry/apis/dashboard/legacy/storage.go b/pkg/registry/apis/dashboard/legacy/storage.go index 5c47a5f6cfc..9b4e06ab746 100644 --- a/pkg/registry/apis/dashboard/legacy/storage.go +++ b/pkg/registry/apis/dashboard/legacy/storage.go @@ -3,7 +3,9 @@ package legacy import ( "context" "encoding/json" + "errors" "fmt" + "iter" "net/http" "time" @@ -234,6 +236,12 @@ func (a *dashboardSqlAccess) ListHistory(ctx context.Context, req *resourcepb.Li return a.ListIterator(ctx, req, cb) } +func (a *dashboardSqlAccess) ListModifiedSince(ctx context.Context, key resource.NamespacedResource, sinceRv int64) (int64, iter.Seq2[*resource.ModifiedResource, error]) { + return 0, func(yield func(*resource.ModifiedResource, error) bool) { + yield(nil, errors.New("not implemented")) + } +} + // List implements StorageBackend. func (a *dashboardSqlAccess) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { if req.ResourceVersion != 0 { diff --git a/pkg/registry/apis/iam/noopstorage/storage_backend.go b/pkg/registry/apis/iam/noopstorage/storage_backend.go index 200d0b5c485..eb8133ca0b8 100644 --- a/pkg/registry/apis/iam/noopstorage/storage_backend.go +++ b/pkg/registry/apis/iam/noopstorage/storage_backend.go @@ -3,6 +3,7 @@ package noopstorage import ( "context" "errors" + "iter" "net/http" "github.com/grafana/grafana/pkg/storage/unified/resource" @@ -36,6 +37,12 @@ func (c *StorageBackendImpl) ListIterator(context.Context, *resourcepb.ListReque return 0, errNoopStorage } +func (c *StorageBackendImpl) ListModifiedSince(ctx context.Context, key resource.NamespacedResource, sinceRv int64) (int64, iter.Seq2[*resource.ModifiedResource, error]) { + return 0, func(yield func(*resource.ModifiedResource, error) bool) { + yield(nil, errors.New("not implemented")) + } +} + // ReadResource implements resource.StorageBackend. func (c *StorageBackendImpl) ReadResource(_ context.Context, req *resourcepb.ReadRequest) *resource.BackendReadResponse { return &resource.BackendReadResponse{ diff --git a/pkg/storage/unified/resource/cdk_backend.go b/pkg/storage/unified/resource/cdk_backend.go index c47ec01d378..fdabaca6759 100644 --- a/pkg/storage/unified/resource/cdk_backend.go +++ b/pkg/storage/unified/resource/cdk_backend.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "iter" "net/http" "sort" "strconv" @@ -74,6 +75,12 @@ type cdkBackend struct { stream chan<- *WrittenEvent } +func (s *cdkBackend) ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) { + return 0, func(yield func(*ModifiedResource, error) bool) { + yield(nil, errors.New("not implemented")) + } +} + func (s *cdkBackend) getPath(key *resourcepb.ResourceKey, rv int64) string { var buffer bytes.Buffer buffer.WriteString(s.root) diff --git a/pkg/storage/unified/resource/search_test.go b/pkg/storage/unified/resource/search_test.go index 07a95639ed6..f1e005ade86 100644 --- a/pkg/storage/unified/resource/search_test.go +++ b/pkg/storage/unified/resource/search_test.go @@ -2,7 +2,9 @@ package resource import ( "context" + "errors" "fmt" + "iter" "log/slog" "sync" "testing" @@ -98,6 +100,12 @@ func (m *mockStorageBackend) ListHistory(ctx context.Context, req *resourcepb.Li return 0, nil } +func (m *mockStorageBackend) ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) { + return 0, func(yield func(*ModifiedResource, error) bool) { + yield(nil, errors.New("not implemented")) + } +} + // mockSearchBackend implements SearchBackend for testing with tracking capabilities type mockSearchBackend struct { mu sync.Mutex diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 1b52245157d..f5dc2b7f2a5 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "iter" "log/slog" "net/http" "sync" @@ -114,6 +115,10 @@ type StorageBackend interface { // ListHistory is like ListIterator, but it returns the history of a resource ListHistory(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error) + // ListModifiedSince will return all resources that have changed since the given resource version. + // If a resource has changes, only the latest change will be returned. + ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) + // Get all events from the store // For HA setups, this will be more events than the local WriteEvent above! WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) @@ -122,6 +127,13 @@ type StorageBackend interface { GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) } +type ModifiedResource struct { + Action resourcepb.WatchEvent_Type + Key resourcepb.ResourceKey + Value []byte + ResourceVersion int64 +} + type ResourceStats struct { NamespacedResource diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go index dff33f63b36..eafb9b40f98 100644 --- a/pkg/storage/unified/resource/storage_backend.go +++ b/pkg/storage/unified/resource/storage_backend.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "iter" "math/rand/v2" "net/http" "sort" @@ -450,6 +451,12 @@ func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []Dat return pagedKeys } +func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) { + return 0, func(yield func(*ModifiedResource, error) bool) { + yield(nil, errors.New("not implemented")) + } +} + // ListHistory is like ListIterator, but it returns the history of a resource. func (k *kvStorageBackend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error) (int64, error) { if err := validateListHistoryRequest(req); err != nil { diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index ca3e941eb92..a4df140d541 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "iter" "math" "sync" "time" @@ -631,6 +632,99 @@ func (b *backend) listLatest(ctx context.Context, req *resourcepb.ListRequest, c return iter.listRV, err } +// ListModifiedSince will return all resources that have changed since the given resource version. +// If a resource has changes, only the latest change will be returned. +func (b *backend) ListModifiedSince(ctx context.Context, key resource.NamespacedResource, sinceRv int64) (int64, iter.Seq2[*resource.ModifiedResource, error]) { + tx, err := b.db.BeginTx(ctx, RepeatableRead) + if err != nil { + return 0, func(yield func(*resource.ModifiedResource, error) bool) { + yield(nil, err) + } + } + + // Fetch latest RV within the transaction + latestRv, err := b.fetchLatestRV(ctx, tx, b.dialect, key.Group, key.Resource) + if err != nil { + terr := tx.Rollback() + if terr != nil { + b.log.Warn("Error rolling back transaction in ListModifiedSince", "error", terr) + } + return 0, func(yield func(*resource.ModifiedResource, error) bool) { + yield(nil, err) + } + } + + // since results are sorted by name ASC and rv DESC, we can get away with tracking the last seen + lastSeen := "" + + // rollback transaction if iterator not called within 30 seconds + rollbackTimer := time.AfterFunc(30*time.Second, func() { + if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) { + b.log.Warn("rollback timer error", "err", err) + } + }) + + seq := func(yield func(*resource.ModifiedResource, error) bool) { + rollbackTimer.Stop() + + defer func() { + // Always rollback the read-only transaction when iterator is done + if rollbackErr := tx.Rollback(); rollbackErr != nil { + b.log.Warn("Error rolling back transaction in ListModifiedSince", "error", rollbackErr) + } + }() + + query := sqlResourceListModifiedSinceRequest{ + SQLTemplate: sqltemplate.New(b.dialect), + Namespace: key.Namespace, + Group: key.Group, + Resource: key.Resource, + SinceRv: sinceRv, + } + + rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryListModifiedSince, query) + if err != nil { + yield(nil, err) + return + } + if rows != nil { + defer func() { + if cerr := rows.Close(); cerr != nil { + b.log.Warn("listSinceModified error closing rows", "error", cerr) + } + }() + } + + for rows.Next() { + mr := &resource.ModifiedResource{} + if err := rows.Scan(&mr.Key.Namespace, &mr.Key.Group, &mr.Key.Resource, &mr.Key.Name, &mr.ResourceVersion, &mr.Action, &mr.Value); err != nil { + if !yield(nil, err) { + return + } + continue + } + + // Deduplicate by name (namespace, group, and resource are always the same in the result set) + if mr.Key.Name == lastSeen { + continue + } + + if mr.Key.Name <= lastSeen { + // resource names should be sorted alphabetically. So if not, the query is not correct. + yield(nil, fmt.Errorf("listModifiedSince: resources are not sorted by name ASC, lastSeen: %q, current: %q", lastSeen, mr.Key.Name)) + } + + lastSeen = mr.Key.Name + + if !yield(mr, nil) { + return + } + } + } + + return latestRv, seq +} + // listAtRevision fetches the resources from the resource_history table at a specific revision. func (b *backend) listAtRevision(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"listAtRevision") diff --git a/pkg/storage/unified/sql/data/resource_history_list_since_modified.sql b/pkg/storage/unified/sql/data/resource_history_list_since_modified.sql new file mode 100644 index 00000000000..5c4b05eb997 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_list_since_modified.sql @@ -0,0 +1,14 @@ +SELECT + {{.Ident "namespace"}}, + {{.Ident "group"}}, + {{.Ident "resource"}}, + {{.Ident "name"}}, + {{.Ident "resource_version"}}, + {{.Ident "action"}}, + {{.Ident "value"}} +FROM resource_history +WHERE {{.Ident "namespace" }} = {{.Arg .Namespace }} + AND {{.Ident "group" }} = {{.Arg .Group }} + AND {{.Ident "resource" }} = {{.Arg .Resource }} + AND {{.Ident "resource_version" }} > {{.Arg .SinceRv }} -- needs to be exclusive of the sinceRv +ORDER BY {{.Ident "name" }} ASC, {{.Ident "resource_version" }} DESC diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 43ae7f3a45a..3253f589ec7 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -30,24 +30,25 @@ func mustTemplate(filename string) *template.Template { // Templates. var ( - sqlResourceDelete = mustTemplate("resource_delete.sql") - sqlResourceInsert = mustTemplate("resource_insert.sql") - sqlResourceUpdate = mustTemplate("resource_update.sql") - sqlResourceRead = mustTemplate("resource_read.sql") - sqlResourceStats = mustTemplate("resource_stats.sql") - sqlResourceList = mustTemplate("resource_list.sql") - sqlResourceHistoryList = mustTemplate("resource_history_list.sql") - sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql") - sqlResourceHistoryRead = mustTemplate("resource_history_read.sql") - sqlResourceHistoryReadLatestRV = mustTemplate("resource_history_read_latest_rv.sql") - sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql") - sqlResourceHistoryInsert = mustTemplate("resource_history_insert.sql") - sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql") - sqlResourceHistoryGet = mustTemplate("resource_history_get.sql") - sqlResourceHistoryDelete = mustTemplate("resource_history_delete.sql") - sqlResourceHistoryPrune = mustTemplate("resource_history_prune.sql") - sqlResourceTrash = mustTemplate("resource_trash.sql") - sqlResourceInsertFromHistory = mustTemplate("resource_insert_from_history.sql") + sqlResourceDelete = mustTemplate("resource_delete.sql") + sqlResourceInsert = mustTemplate("resource_insert.sql") + sqlResourceUpdate = mustTemplate("resource_update.sql") + sqlResourceRead = mustTemplate("resource_read.sql") + sqlResourceStats = mustTemplate("resource_stats.sql") + sqlResourceList = mustTemplate("resource_list.sql") + sqlResourceHistoryList = mustTemplate("resource_history_list.sql") + sqlResourceHistoryListModifiedSince = mustTemplate("resource_history_list_since_modified.sql") + sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql") + sqlResourceHistoryRead = mustTemplate("resource_history_read.sql") + sqlResourceHistoryReadLatestRV = mustTemplate("resource_history_read_latest_rv.sql") + sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql") + sqlResourceHistoryInsert = mustTemplate("resource_history_insert.sql") + sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql") + sqlResourceHistoryGet = mustTemplate("resource_history_get.sql") + sqlResourceHistoryDelete = mustTemplate("resource_history_delete.sql") + sqlResourceHistoryPrune = mustTemplate("resource_history_prune.sql") + sqlResourceTrash = mustTemplate("resource_trash.sql") + sqlResourceInsertFromHistory = mustTemplate("resource_insert_from_history.sql") // sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql") sqlResourceVersionGet = mustTemplate("resource_version_get.sql") @@ -425,3 +426,27 @@ func (r *sqlResourceVersionListRequest) Results() (*groupResourceVersion, error) x := *r.groupResourceVersion return &x, nil } + +type sqlResourceListModifiedSinceRequest struct { + sqltemplate.SQLTemplate + Namespace string + Group string + Resource string + SinceRv int64 +} + +func (r sqlResourceListModifiedSinceRequest) Validate() error { + if r.Namespace == "" { + return fmt.Errorf("missing namespace") + } + if r.Group == "" { + return fmt.Errorf("missing group") + } + if r.Resource == "" { + return fmt.Errorf("missing resource") + } + if r.SinceRv < 0 { + return fmt.Errorf("since resource version must be greater than or equal to zero") + } + return nil +} diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index dcdf64016d5..b5fbd407ade 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -120,6 +120,18 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, }, + sqlResourceHistoryListModifiedSince: { + { + Name: "single path", + Data: &sqlResourceListModifiedSinceRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + Namespace: "ns", + Group: "group", + Resource: "res", + SinceRv: 10000, + }, + }, + }, sqlResourceHistoryPoll: { { Name: "single path", diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_list_since_modified-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_list_since_modified-single path.sql new file mode 100755 index 00000000000..9e2d8bff37c --- /dev/null +++ b/pkg/storage/unified/sql/testdata/mysql--resource_history_list_since_modified-single path.sql @@ -0,0 +1,14 @@ +SELECT + `namespace`, + `group`, + `resource`, + `name`, + `resource_version`, + `action`, + `value` +FROM resource_history +WHERE `namespace` = 'ns' + AND `group` = 'group' + AND `resource` = 'res' + AND `resource_version` > 10000 -- needs to be exclusive of the sinceRv +ORDER BY `name` ASC, `resource_version` DESC diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_list_since_modified-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_list_since_modified-single path.sql new file mode 100755 index 00000000000..28089ffaa66 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/postgres--resource_history_list_since_modified-single path.sql @@ -0,0 +1,14 @@ +SELECT + "namespace", + "group", + "resource", + "name", + "resource_version", + "action", + "value" +FROM resource_history +WHERE "namespace" = 'ns' + AND "group" = 'group' + AND "resource" = 'res' + AND "resource_version" > 10000 -- needs to be exclusive of the sinceRv +ORDER BY "name" ASC, "resource_version" DESC diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_list_since_modified-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_list_since_modified-single path.sql new file mode 100755 index 00000000000..28089ffaa66 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_history_list_since_modified-single path.sql @@ -0,0 +1,14 @@ +SELECT + "namespace", + "group", + "resource", + "name", + "resource_version", + "action", + "value" +FROM resource_history +WHERE "namespace" = 'ns' + AND "group" = 'group' + AND "resource" = 'res' + AND "resource_version" > 10000 -- needs to be exclusive of the sinceRv +ORDER BY "name" ASC, "resource_version" DESC diff --git a/pkg/storage/unified/testing/storage_backend.go b/pkg/storage/unified/testing/storage_backend.go index e3314376715..5488b44a144 100644 --- a/pkg/storage/unified/testing/storage_backend.go +++ b/pkg/storage/unified/testing/storage_backend.go @@ -35,6 +35,7 @@ const ( TestGetResourceStats = "get resource stats" TestListHistory = "list history" TestListHistoryErrorReporting = "list history error reporting" + TestListModifiedSince = "list events since rv" TestListTrash = "list trash" TestCreateNewResource = "create new resource" ) @@ -78,6 +79,7 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp {TestListHistoryErrorReporting, runTestIntegrationBackendListHistoryErrorReporting}, {TestListTrash, runTestIntegrationBackendTrash}, {TestCreateNewResource, runTestIntegrationBackendCreateNewResource}, + {TestListModifiedSince, runTestIntegrationBackendListModifiedSince}, } for _, tc := range cases { @@ -482,6 +484,77 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend }) } +func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.StorageBackend, nsPrefix string) { + ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + ns := nsPrefix + "-history-ns" + rvCreated, _ := writeEvent(ctx, backend, "item1", resourcepb.WatchEvent_ADDED, WithNamespace(ns)) + require.Greater(t, rvCreated, int64(0)) + rvUpdated, err := writeEvent(ctx, backend, "item1", resourcepb.WatchEvent_MODIFIED, WithNamespace(ns)) + require.NoError(t, err) + require.Greater(t, rvUpdated, rvCreated) + rvDeleted, err := writeEvent(ctx, backend, "item1", resourcepb.WatchEvent_DELETED, WithNamespace(ns)) + require.NoError(t, err) + require.Greater(t, rvDeleted, rvUpdated) + + t.Run("will list latest modified event when resource has multiple events", func(t *testing.T) { + key := resource.NamespacedResource{ + Namespace: ns, + Group: "group", + Resource: "resource", + } + latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated) + require.Greater(t, latestRv, rvCreated) + + counter := 0 + for res, err := range seq { + require.NoError(t, err) + require.Equal(t, rvDeleted, res.ResourceVersion) + counter++ + } + require.Equal(t, 1, counter) // only one event should be returned + }) + + t.Run("no events if none after the given resource version", func(t *testing.T) { + key := resource.NamespacedResource{ + Namespace: ns, + Group: "group", + Resource: "resource", + } + latestRv, seq := backend.ListModifiedSince(ctx, key, rvDeleted) + require.GreaterOrEqual(t, latestRv, rvDeleted) + + counter := 0 + for _, _ = range seq { + counter++ + } + require.Equal(t, 0, counter) // no events should be returned + }) + + t.Run("will only return modified events for the given key", func(t *testing.T) { + key := resource.NamespacedResource{ + Namespace: "other-ns", + Group: "group", + Resource: "resource", + } + + // Write an event for another tenant for the same resource + rvCreatedOtherTenant, err := writeEvent(ctx, backend, "item2", resourcepb.WatchEvent_ADDED, WithNamespace("other-ns")) + require.NoError(t, err) + + latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated) + require.Greater(t, latestRv, rvCreated) + + counter := 0 + for res, err := range seq { + require.NoError(t, err) + require.Equal(t, rvCreatedOtherTenant, res.ResourceVersion) + require.Equal(t, key.Namespace, res.Key.Namespace) + counter++ + } + require.Equal(t, 1, counter) // only one event should be returned + }) +} + func runTestIntegrationBackendListHistory(t *testing.T, backend resource.StorageBackend, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) server := newServer(t, backend) diff --git a/pkg/storage/unified/testing/storage_backend_test.go b/pkg/storage/unified/testing/storage_backend_test.go index a1da3b82e72..ced0b6869a8 100644 --- a/pkg/storage/unified/testing/storage_backend_test.go +++ b/pkg/storage/unified/testing/storage_backend_test.go @@ -23,7 +23,8 @@ func TestBadgerKVStorageBackend(t *testing.T) { NSPrefix: "kvstorage-test", SkipTests: map[string]bool{ // TODO: fix these tests and remove this skip - TestBlobSupport: true, + TestBlobSupport: true, + TestListModifiedSince: true, }, }) }