diff --git a/pkg/storage/unified/resource/data/sqlkv_keys.sql b/pkg/storage/unified/resource/data/sqlkv_keys.sql new file mode 100644 index 00000000000..36b7bfbe01c --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_keys.sql @@ -0,0 +1,9 @@ +SELECT {{ .Ident "key_path" }} +FROM {{ .TableName }} +WHERE {{ .Ident "key_path" }} >= {{ .Arg .StartKey }} + AND {{ .Ident "key_path" }} < {{ .Arg .EndKey }} +ORDER BY {{ .Ident "key_path" }} {{ if .SortAscending }}ASC{{ else }}DESC{{ end }} +{{ if .Options.Limit }} +LIMIT {{ .Options.Limit }} +{{ end }} +; diff --git a/pkg/storage/unified/resource/sqlkv.go b/pkg/storage/unified/resource/sqlkv.go index 90171a77a5f..a56d00a3e12 100644 --- a/pkg/storage/unified/resource/sqlkv.go +++ b/pkg/storage/unified/resource/sqlkv.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "iter" + "strings" "text/template" "github.com/grafana/grafana/pkg/storage/unified/sql/db" @@ -35,6 +36,7 @@ func mustTemplate(filename string) *template.Template { var ( sqlKVGet = mustTemplate("sqlkv_get.sql") sqlKVDelete = mustTemplate("sqlkv_delete.sql") + sqlKVKeys = mustTemplate("sqlkv_keys.sql") ) // sqlKVSection can be embedded in structs used when rendering query templates @@ -114,6 +116,32 @@ func (req sqlKVDeleteRequest) Validate() error { return req.sqlKVSectionKey.Validate() } +type sqlKVKeysRequest struct { + sqltemplate.SQLTemplate + sqlKVSection + Options ListOptions +} + +func (req sqlKVKeysRequest) Validate() error { + return req.sqlKVSection.Validate() +} + +func (req sqlKVKeysRequest) StartKey() string { + return req.Section + "/" + req.Options.StartKey +} + +func (req sqlKVKeysRequest) EndKey() string { + if req.Options.EndKey == "" { + req.Options.EndKey = PrefixRangeEnd(req.Section + "/") + } + + return req.Section + "/" + req.Options.EndKey +} + +func (req sqlKVKeysRequest) SortAscending() bool { + return req.Options.Sort != SortOrderDesc +} + var _ KV = &sqlKV{} type sqlKV struct { @@ -151,7 +179,31 @@ func (k *sqlKV) Ping(ctx context.Context) error { func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] { return func(yield func(string, error) bool) { - panic("not implemented!") + rows, err := dbutil.QueryRows(ctx, k.db, sqlKVKeys, sqlKVKeysRequest{ + SQLTemplate: sqltemplate.New(k.dialect), + sqlKVSection: sqlKVSection{section}, + Options: opt, + }) + if err != nil { + yield("", err) + return + } + + for rows.Next() { + var key string + if err := rows.Scan(&key); err != nil { + yield("", fmt.Errorf("error reading row: %w", err)) + return + } + + if !yield(strings.TrimPrefix(key, section+"/"), nil) { + return + } + } + + if err := rows.Err(); err != nil { + yield("", fmt.Errorf("failed to read rows: %w", err)) + } } } diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index fbdb8c70d20..b4c61f84c13 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -88,8 +88,17 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) { } } -func prefixKey(nsPrefix, key string) string { - return nsPrefix + "/" + key +func namespacedKeys(nsPrefix string, keys []string) []string { + prefixed := make([]string, 0, len(keys)) + for _, k := range keys { + prefixed = append(prefixed, nsPrefix+"/"+k) + } + + return prefixed +} + +func namespacedKey(nsPrefix, key string) string { + return namespacedKeys(nsPrefix, []string{key})[0] } func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { @@ -97,7 +106,7 @@ func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { t.Run("get existing key", func(t *testing.T) { // First save a key - existingKey := prefixKey(nsPrefix, "existing-key") + existingKey := namespacedKey(nsPrefix, "existing-key") testValue := "test value for get" saveKVHelper(t, kv, ctx, testSection, existingKey, strings.NewReader(testValue)) @@ -116,13 +125,13 @@ func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { }) t.Run("get non-existent key", func(t *testing.T) { - _, err := kv.Get(ctx, testSection, prefixKey(nsPrefix, "non-existent-key")) + _, err := kv.Get(ctx, testSection, namespacedKey(nsPrefix, "non-existent-key")) assert.Error(t, err) assert.Equal(t, resource.ErrNotFound, err) }) t.Run("get with empty section", func(t *testing.T) { - _, err := kv.Get(ctx, "", prefixKey(nsPrefix, "some-key")) + _, err := kv.Get(ctx, "", namespacedKey(nsPrefix, "some-key")) assert.Error(t, err) assert.Contains(t, err.Error(), "section is required") }) @@ -215,7 +224,7 @@ func runTestKVDelete(t *testing.T, kv resource.KV, nsPrefix string) { t.Run("delete existing key", func(t *testing.T) { // First create a key - deleteKey := prefixKey(nsPrefix, "delete-key") + deleteKey := namespacedKey(nsPrefix, "delete-key") saveKVHelper(t, kv, ctx, testSection, deleteKey, strings.NewReader("delete me")) // Verify it exists @@ -233,13 +242,13 @@ func runTestKVDelete(t *testing.T, kv resource.KV, nsPrefix string) { }) t.Run("delete non-existent key", func(t *testing.T) { - err := kv.Delete(ctx, testSection, prefixKey(nsPrefix, "non-existent-delete-key")) + err := kv.Delete(ctx, testSection, namespacedKey(nsPrefix, "non-existent-delete-key")) assert.Error(t, err) assert.Equal(t, resource.ErrNotFound, err) }) t.Run("delete with empty section", func(t *testing.T) { - err := kv.Delete(ctx, "", prefixKey(nsPrefix, "some-key")) + err := kv.Delete(ctx, "", namespacedKey(nsPrefix, "some-key")) assert.Error(t, err) assert.Contains(t, err.Error(), "section is required") }) @@ -253,17 +262,16 @@ func runTestKVDelete(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVKeys(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) - section := nsPrefix + "-keys" // Setup test data - testKeys := []string{"a1", "a2", "b1", "b2", "c1"} + testKeys := namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1"}) for _, key := range testKeys { - saveKVHelper(t, kv, ctx, section, key, strings.NewReader("value"+key)) + saveKVHelper(t, kv, ctx, testSection, key, strings.NewReader("value"+key)) } t.Run("list all keys", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{}) { + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{}) { require.NoError(t, err) keys = append(keys, k) } @@ -276,134 +284,186 @@ func runTestKVKeys(t *testing.T, kv resource.KV, nsPrefix string) { for k, err := range kv.Keys(ctx, "", resource.ListOptions{}) { if err != nil { errors = append(errors, err) - break + continue } keys = append(keys, k) } - assert.Len(t, errors, 1) + require.Len(t, errors, 1) assert.Contains(t, errors[0].Error(), "section is required") assert.Empty(t, keys) }) + t.Run("invalid sort option, defaults to asc", func(t *testing.T) { + var keys []string + var errors []error + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + Sort: resource.SortOrder(100), + }) { + if err != nil { + errors = append(errors, err) + continue + } + keys = append(keys, k) + } + assert.Empty(t, errors) + assert.Equal(t, testKeys, keys) + }) + + t.Run("list keys with end key < start key", func(t *testing.T) { + var keys []string + var errors []error + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + StartKey: namespacedKey(nsPrefix, "c"), + EndKey: namespacedKey(nsPrefix, "a"), + }) { + if err != nil { + errors = append(errors, err) + continue + } + keys = append(keys, k) + } + // Nothing is yielded + assert.Empty(t, errors) + assert.Empty(t, keys) + }) + t.Run("list keys returns 0 keys", func(t *testing.T) { - // Use a different section with no keys - emptySection := nsPrefix + "-empty-keys" + // Use a key range with no keys. + startKey, endKey := "aaaaa", "aaaaz" var keys []string - for k, err := range kv.Keys(ctx, emptySection, resource.ListOptions{}) { + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + StartKey: startKey, + EndKey: endKey, + }) { require.NoError(t, err) keys = append(keys, k) } assert.Empty(t, keys) assert.Len(t, keys, 0) }) + + t.Run("interrupting the iterator", func(t *testing.T) { + var keys []string + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{}) { + require.NoError(t, err) + keys = append(keys, k) + + if len(keys) == 2 { + break + } + } + + assert.Equal(t, namespacedKeys(nsPrefix, []string{"a1", "a2"}), keys) + }) } func runTestKVKeysWithLimits(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) - section := nsPrefix + "-keys-limits" // Setup test data - testKeys := []string{"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"} + testKeys := namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"}) for _, key := range testKeys { - saveKVHelper(t, kv, ctx, section, key, strings.NewReader("value"+key)) + saveKVHelper(t, kv, ctx, testSection, key, strings.NewReader("value"+key)) } t.Run("keys with limit", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{Limit: 3}) { + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{Limit: 3}) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"a1", "a2", "b1"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"a1", "a2", "b1"}), keys) }) t.Run("keys with range", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{StartKey: "b", EndKey: "d"}) { - require.NoError(t, err) - keys = append(keys, k) - } - assert.Equal(t, []string{"b1", "b2", "c1", "c2"}, keys) - }) - - t.Run("keys with prefix", func(t *testing.T) { - var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{ - StartKey: "c", - EndKey: resource.PrefixRangeEnd("c"), + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + StartKey: namespacedKey(nsPrefix, "b"), + EndKey: namespacedKey(nsPrefix, "d"), }) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"c1", "c2"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"b1", "b2", "c1", "c2"}), keys) + }) + + t.Run("keys with prefix", func(t *testing.T) { + var keys []string + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + StartKey: namespacedKey(nsPrefix, "c"), + EndKey: namespacedKey(nsPrefix, resource.PrefixRangeEnd("c")), + }) { + require.NoError(t, err) + keys = append(keys, k) + } + assert.Equal(t, namespacedKeys(nsPrefix, []string{"c1", "c2"}), keys) }) t.Run("keys with limit and range", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{ - StartKey: "a", - EndKey: "c", + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + StartKey: namespacedKey(nsPrefix, "a"), + EndKey: namespacedKey(nsPrefix, "c"), Limit: 2, }) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"a1", "a2"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"a1", "a2"}), keys) }) } func runTestKVKeysWithSort(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) - section := nsPrefix + "-keys-sort" // Setup test data - testKeys := []string{"a1", "a2", "b1", "b2", "c1"} + testKeys := namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1"}) for _, key := range testKeys { - saveKVHelper(t, kv, ctx, section, key, strings.NewReader("value"+key)) + saveKVHelper(t, kv, ctx, testSection, key, strings.NewReader("value"+key)) } t.Run("keys in ascending order (default)", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{Sort: resource.SortOrderAsc}) { + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{Sort: resource.SortOrderAsc}) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"a1", "a2", "b1", "b2", "c1"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1"}), keys) }) t.Run("keys in descending order", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{Sort: resource.SortOrderDesc}) { + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{Sort: resource.SortOrderDesc}) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"c1", "b2", "b1", "a2", "a1"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"c1", "b2", "b1", "a2", "a1"}), keys) }) t.Run("keys descending with prefix", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{ - StartKey: "a", - EndKey: resource.PrefixRangeEnd("a"), + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ + StartKey: namespacedKey(nsPrefix, "a"), + EndKey: namespacedKey(nsPrefix, resource.PrefixRangeEnd("a")), Sort: resource.SortOrderDesc, }) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"a2", "a1"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"a2", "a1"}), keys) }) t.Run("keys descending with limit", func(t *testing.T) { var keys []string - for k, err := range kv.Keys(ctx, section, resource.ListOptions{ + for k, err := range kv.Keys(ctx, testSection, resource.ListOptions{ Sort: resource.SortOrderDesc, Limit: 3, }) { require.NoError(t, err) keys = append(keys, k) } - assert.Equal(t, []string{"c1", "b2", "b1"}, keys) + assert.Equal(t, namespacedKeys(nsPrefix, []string{"c1", "b2", "b1"}), keys) }) } diff --git a/pkg/storage/unified/testing/kv_test.go b/pkg/storage/unified/testing/kv_test.go index f6db1d2f1d8..780f36fcc66 100644 --- a/pkg/storage/unified/testing/kv_test.go +++ b/pkg/storage/unified/testing/kv_test.go @@ -47,14 +47,11 @@ func TestSQLKV(t *testing.T) { }, &KVTestOptions{ NSPrefix: "sql-kv-test", SkipTests: map[string]bool{ - TestKVSave: true, - TestKVKeys: true, - TestKVKeysWithLimits: true, - TestKVKeysWithSort: true, - TestKVConcurrent: true, - TestKVUnixTimestamp: true, - TestKVBatchGet: true, - TestKVBatchDelete: true, + TestKVSave: true, + TestKVConcurrent: true, + TestKVUnixTimestamp: true, + TestKVBatchGet: true, + TestKVBatchDelete: true, }, }) }