diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go index dff33f63b36..1fb0ad2e66a 100644 --- a/pkg/storage/unified/resource/storage_backend.go +++ b/pkg/storage/unified/resource/storage_backend.go @@ -215,7 +215,6 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis // Fetch the latest objects keys := make([]MetaDataKey, 0, min(defaultListBufferSize, req.Limit+1)) - idx := 0 for metaKey, err := range k.metaStore.ListResourceKeysAtRevision(ctx, MetaListRequestKey{ Namespace: req.Options.Key.Namespace, Group: req.Options.Key.Group, @@ -225,17 +224,15 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis if err != nil { return 0, err } - // Skip the first offset items. This is not efficient, but it's a simple way to implement it for now. - if idx < int(offset) { - idx++ - continue - } keys = append(keys, metaKey) - // Only fetch the first limit items + 1 to get the next token. - if len(keys) >= int(req.Limit+1) { - break - } } + + sortMetaKeysByResourceVersion(keys, true) // sort ascending for sql parity + + if offset > 0 && int64(len(keys)) > offset { + keys = keys[offset:] + } + iter := kvListIterator{ keys: keys, currentIndex: -1, @@ -433,6 +430,19 @@ func sortByResourceVersion(filteredKeys []DataKey, sortAscending bool) { } } +// sortMetaKeysByResourceVersion sorts the metadata keys based on the sortAscending flag +func sortMetaKeysByResourceVersion(keys []MetaDataKey, sortAscending bool) { + if sortAscending { + sort.Slice(keys, func(i, j int) bool { + return keys[i].ResourceVersion < keys[j].ResourceVersion + }) + } else { + sort.Slice(keys, func(i, j int) bool { + return keys[i].ResourceVersion > keys[j].ResourceVersion + }) + } +} + // applyPagination filters keys based on pagination parameters func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []DataKey { if lastSeenRV == 0 { diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index c9d2bf159be..a23cf9469ea 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -600,7 +600,7 @@ func (b *backend) listLatest(ctx context.Context, req *resourcepb.ListRequest, c return 0, fmt.Errorf("only works for the 'latest' resource version") } - iter := &listIter{sortAsc: false} + iter := &listIter{} err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) @@ -638,7 +638,7 @@ func (b *backend) listAtRevision(ctx context.Context, req *resourcepb.ListReques defer span.End() // Get the RV - iter := &listIter{listRV: req.ResourceVersion, sortAsc: false} + iter := &listIter{listRV: req.ResourceVersion} if req.NextPageToken != "" { continueToken, err := resource.GetContinueToken(req.NextPageToken) if err != nil { diff --git a/pkg/storage/unified/sql/data/resource_history_get.sql b/pkg/storage/unified/sql/data/resource_history_get.sql index 434ae8ffa01..fdb90d908b7 100644 --- a/pkg/storage/unified/sql/data/resource_history_get.sql +++ b/pkg/storage/unified/sql/data/resource_history_get.sql @@ -29,7 +29,7 @@ WHERE 1 = 1 AND {{ .Ident "resource_version" }} = {{ .Arg .ExactRV }} {{ end }} {{ if .SortAscending }} -ORDER BY resource_version ASC +ORDER BY {{ .Ident "resource_version" }} ASC {{ else }} -ORDER BY resource_version DESC +ORDER BY {{ .Ident "resource_version" }} DESC {{ end }} diff --git a/pkg/storage/unified/sql/data/resource_history_list.sql b/pkg/storage/unified/sql/data/resource_history_list.sql index 9a0bbe1fa78..58838c331cf 100644 --- a/pkg/storage/unified/sql/data/resource_history_list.sql +++ b/pkg/storage/unified/sql/data/resource_history_list.sql @@ -50,7 +50,7 @@ SELECT AND kv.{{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }} {{ end }} {{ end }} - ORDER BY kv.{{ .Ident "namespace" }} ASC, kv.{{ .Ident "name" }} ASC + ORDER BY kv.{{ .Ident "resource_version" }} ASC {{ if (gt .Request.Limit 0) }} LIMIT {{ .Arg .Request.Limit }} OFFSET {{ .Arg .Request.Offset }} {{ end }} diff --git a/pkg/storage/unified/sql/data/resource_list.sql b/pkg/storage/unified/sql/data/resource_list.sql index 2041618a370..a2ee2f47435 100644 --- a/pkg/storage/unified/sql/data/resource_list.sql +++ b/pkg/storage/unified/sql/data/resource_list.sql @@ -23,5 +23,5 @@ SELECT AND {{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }} {{ end }} {{ end }} - ORDER BY {{ .Ident "namespace" }} ASC, {{ .Ident "name" }} ASC + ORDER BY {{ .Ident "resource_version" }} ASC ; diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 43ae7f3a45a..1e0e50ae4af 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -242,10 +242,13 @@ func (r *resourceHistoryReadLatestRVResponse) Results() (*resourceHistoryReadLat } type historyListRequest struct { - ResourceVersion, Limit, Offset int64 - Folder string - Options *resourcepb.ListOptions + ResourceVersion int64 + Limit int64 + Offset int64 + Folder string + Options *resourcepb.ListOptions } + type sqlResourceHistoryListRequest struct { sqltemplate.SQLTemplate Request *historyListRequest diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_get-read object history.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_get-read object history.sql index 4cde1c5f536..66103fecdeb 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_history_get-read object history.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_history_get-read object history.sql @@ -13,4 +13,4 @@ WHERE 1 = 1 AND `group` = 'gg' AND `resource` = 'rr' AND `name` = 'name' -ORDER BY resource_version DESC +ORDER BY `resource_version` DESC diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_list-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_list-single path.sql index 0599ac2e10f..4a49051687d 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_history_list-single path.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_history_list-single path.sql @@ -24,6 +24,6 @@ SELECT AND maxkv.`name` = kv.`name` WHERE kv.`action` != 3 AND kv.`namespace` = 'ns' - ORDER BY kv.`namespace` ASC, kv.`name` ASC + ORDER BY kv.`resource_version` ASC LIMIT 10 OFFSET 0 ; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_list-filter_on_namespace.sql b/pkg/storage/unified/sql/testdata/mysql--resource_list-filter_on_namespace.sql index d72fc1ddd1e..ffb9ed43f85 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_list-filter_on_namespace.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_list-filter_on_namespace.sql @@ -10,5 +10,5 @@ SELECT FROM `resource` WHERE 1 = 1 AND `namespace` = 'ns' - ORDER BY `namespace` ASC, `name` ASC + ORDER BY `resource_version` ASC ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_get-read object history.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_get-read object history.sql index b0b6a310419..680fa3ca0a0 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_history_get-read object history.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_history_get-read object history.sql @@ -13,4 +13,4 @@ WHERE 1 = 1 AND "group" = 'gg' AND "resource" = 'rr' AND "name" = 'name' -ORDER BY resource_version DESC +ORDER BY "resource_version" DESC diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_list-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_list-single path.sql index 400b2a502cf..86aa57ad2f8 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_history_list-single path.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_history_list-single path.sql @@ -24,6 +24,6 @@ SELECT AND maxkv."name" = kv."name" WHERE kv."action" != 3 AND kv."namespace" = 'ns' - ORDER BY kv."namespace" ASC, kv."name" ASC + ORDER BY kv."resource_version" ASC LIMIT 10 OFFSET 0 ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_list-filter_on_namespace.sql b/pkg/storage/unified/sql/testdata/postgres--resource_list-filter_on_namespace.sql index 41131a1520f..9268c75bb2b 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_list-filter_on_namespace.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_list-filter_on_namespace.sql @@ -10,5 +10,5 @@ SELECT FROM "resource" WHERE 1 = 1 AND "namespace" = 'ns' - ORDER BY "namespace" ASC, "name" ASC + ORDER BY "resource_version" ASC ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_get-read object history.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_get-read object history.sql index b0b6a310419..680fa3ca0a0 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_history_get-read object history.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_history_get-read object history.sql @@ -13,4 +13,4 @@ WHERE 1 = 1 AND "group" = 'gg' AND "resource" = 'rr' AND "name" = 'name' -ORDER BY resource_version DESC +ORDER BY "resource_version" DESC diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_list-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_list-single path.sql index 400b2a502cf..86aa57ad2f8 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_history_list-single path.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_history_list-single path.sql @@ -24,6 +24,6 @@ SELECT AND maxkv."name" = kv."name" WHERE kv."action" != 3 AND kv."namespace" = 'ns' - ORDER BY kv."namespace" ASC, kv."name" ASC + ORDER BY kv."resource_version" ASC LIMIT 10 OFFSET 0 ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_list-filter_on_namespace.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_list-filter_on_namespace.sql index 41131a1520f..9268c75bb2b 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_list-filter_on_namespace.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_list-filter_on_namespace.sql @@ -10,5 +10,5 @@ SELECT FROM "resource" WHERE 1 = 1 AND "namespace" = 'ns' - ORDER BY "namespace" ASC, "name" ASC + ORDER BY "resource_version" ASC ; diff --git a/pkg/storage/unified/testing/storage_backend.go b/pkg/storage/unified/testing/storage_backend.go index e3314376715..2390a6e68fb 100644 --- a/pkg/storage/unified/testing/storage_backend.go +++ b/pkg/storage/unified/testing/storage_backend.go @@ -175,8 +175,8 @@ func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBa require.NoError(t, err) require.Nil(t, resp.Error) require.Len(t, resp.Items, 2) - require.Contains(t, string(resp.Items[0].Value), "item2 MODIFIED") - require.Contains(t, string(resp.Items[1].Value), "item3 ADDED") + require.Contains(t, string(resp.Items[0].Value), "item3 ADDED") + require.Contains(t, string(resp.Items[1].Value), "item2 MODIFIED") require.GreaterOrEqual(t, resp.ResourceVersion, rv5) // rv5 is the latest resource version }) @@ -372,11 +372,11 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 5) - // should be sorted by key ASC + // should be sorted by resource_version ASC require.Contains(t, string(res.Items[0].Value), "item1 ADDED") - require.Contains(t, string(res.Items[1].Value), "item2 MODIFIED") - require.Contains(t, string(res.Items[2].Value), "item4 ADDED") - require.Contains(t, string(res.Items[3].Value), "item5 ADDED") + require.Contains(t, string(res.Items[1].Value), "item4 ADDED") + require.Contains(t, string(res.Items[2].Value), "item5 ADDED") + require.Contains(t, string(res.Items[3].Value), "item2 MODIFIED") require.Contains(t, string(res.Items[4].Value), "item6 ADDED") require.Empty(t, res.NextPageToken) @@ -399,8 +399,8 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend continueToken, err := resource.GetContinueToken(res.NextPageToken) require.NoError(t, err) require.Contains(t, string(res.Items[0].Value), "item1 ADDED") - require.Contains(t, string(res.Items[1].Value), "item2 MODIFIED") - require.Contains(t, string(res.Items[2].Value), "item4 ADDED") + require.Contains(t, string(res.Items[1].Value), "item4 ADDED") + require.Contains(t, string(res.Items[2].Value), "item5 ADDED") require.GreaterOrEqual(t, continueToken.ResourceVersion, rv8) }) @@ -438,13 +438,12 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend }, }) require.NoError(t, err) - require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 3) t.Log(res.Items) require.Contains(t, string(res.Items[0].Value), "item1 ADDED") - require.Contains(t, string(res.Items[1].Value), "item2 MODIFIED") - require.Contains(t, string(res.Items[2].Value), "item4 ADDED") + require.Contains(t, string(res.Items[1].Value), "item4 ADDED") + require.Contains(t, string(res.Items[2].Value), "item5 ADDED") continueToken, err := resource.GetContinueToken(res.NextPageToken) require.NoError(t, err) @@ -471,15 +470,84 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 2) - t.Log(res.Items) - require.Contains(t, string(res.Items[0].Value), "item4 ADDED") - require.Contains(t, string(res.Items[1].Value), "item5 ADDED") + require.Contains(t, string(res.Items[0].Value), "item5 ADDED") + require.Contains(t, string(res.Items[1].Value), "item2 MODIFIED") continueToken, err = resource.GetContinueToken(res.NextPageToken) require.NoError(t, err) require.Equal(t, rv8, continueToken.ResourceVersion) require.Equal(t, int64(4), continueToken.StartOffset) }) + + t.Run("Paginate through latest items one by one", func(t *testing.T) { + baseKey := &resourcepb.ResourceKey{ + Namespace: ns, + Group: "group", + Resource: "resource", + } + expectedItems := []string{"item1 ADDED", "item4 ADDED", "item5 ADDED", "item2 MODIFIED", "item6 ADDED"} + var allItems []*resourcepb.ResourceWrapper + var nextPageToken string + + for i, expectedValue := range expectedItems { + req := &resourcepb.ListRequest{ + Limit: 1, + NextPageToken: nextPageToken, + Options: &resourcepb.ListOptions{ + Key: baseKey, + }, + } + + res, err := server.List(ctx, req) + require.NoError(t, err) + require.Nil(t, res.Error) + require.Len(t, res.Items, 1) + require.Contains(t, string(res.Items[0].Value), expectedValue) + allItems = append(allItems, res.Items[0]) + + if i < len(expectedItems)-1 { + require.NotEmpty(t, res.NextPageToken, "should have a continue token for page %d", i+1) + nextPageToken = res.NextPageToken + } else { + require.Empty(t, res.NextPageToken, "should not have a continue token on the last page") + } + } + require.Len(t, allItems, len(expectedItems)) + }) + + t.Run("Paginate latest with a limit larger than remaining items", func(t *testing.T) { + baseKey := &resourcepb.ResourceKey{ + Namespace: ns, + Group: "group", + Resource: "resource", + } + // Request first 3 items (out of 5 total) + req := &resourcepb.ListRequest{ + Limit: 3, + Options: &resourcepb.ListOptions{ + Key: baseKey, + }, + } + res1, err := server.List(ctx, req) + require.NoError(t, err) + require.Nil(t, res1.Error) + require.Len(t, res1.Items, 3) + require.NotEmpty(t, res1.NextPageToken) + require.Contains(t, string(res1.Items[0].Value), "item1 ADDED") + require.Contains(t, string(res1.Items[1].Value), "item4 ADDED") + require.Contains(t, string(res1.Items[2].Value), "item5 ADDED") + + // Request next page with a large limit + req.Limit = 10 // Larger than the 2 remaining items + req.NextPageToken = res1.NextPageToken + res2, err := server.List(ctx, req) + require.NoError(t, err) + require.Nil(t, res2.Error) + require.Len(t, res2.Items, 2) // Should only get the 2 remaining items + require.Contains(t, string(res2.Items[0].Value), "item2 MODIFIED") + require.Contains(t, string(res2.Items[1].Value), "item6 ADDED") + require.Empty(t, res2.NextPageToken, "should be no continue token on the last page") + }) } func runTestIntegrationBackendListHistory(t *testing.T, backend resource.StorageBackend, nsPrefix string) { @@ -659,7 +727,6 @@ func runTestIntegrationBackendListHistory(t *testing.T, backend resource.Storage require.NoError(t, err) require.Nil(t, res.Error) require.Len(t, res.Items, 2) - t.Log(res.Items) require.Contains(t, string(res.Items[0].Value), "item1 MODIFIED") require.Equal(t, rvHistory2, res.Items[0].ResourceVersion) require.Contains(t, string(res.Items[1].Value), "item1 MODIFIED")