From 19f6dbe1bb8f51e6cbfdedf74d905bb78900a308 Mon Sep 17 00:00:00 2001 From: Renato Costa <103441181+renatolabs@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:21:36 -0500 Subject: [PATCH] unified-storage: add `BatchGet` support to the sqlkv implementation (#115517) * unified-storage: add `BatchGet` support to the sqlkv implementation * address comments * fix linting --- .../unified/resource/data/sqlkv_batch_get.sql | 12 +++ pkg/storage/unified/resource/sqlkv.go | 78 +++++++++++++++++-- pkg/storage/unified/testing/kv.go | 75 +++++++++++------- pkg/storage/unified/testing/kv_test.go | 1 - 4 files changed, 131 insertions(+), 35 deletions(-) create mode 100644 pkg/storage/unified/resource/data/sqlkv_batch_get.sql diff --git a/pkg/storage/unified/resource/data/sqlkv_batch_get.sql b/pkg/storage/unified/resource/data/sqlkv_batch_get.sql new file mode 100644 index 00000000000..0babcb36970 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_batch_get.sql @@ -0,0 +1,12 @@ +SELECT r.{{ .Ident "key_path" }}, r.{{ .Ident "value" }} +FROM ( +{{ range $id, $key_path := .KeyPaths }} + {{ if eq $id 0 }} + SELECT {{ $.Arg $id }} AS idx, {{ $.Arg $key_path }} AS key_path + {{ else }} + UNION ALL SELECT {{ $.Arg $id }}, {{ $.Arg $key_path }} + {{ end }} +{{ end }} +) AS requested_keys +INNER JOIN {{ .TableName }} r ON r.{{ .Ident "key_path" }} = requested_keys.{{ .Ident "key_path" }} +ORDER BY requested_keys.{{ .Ident "idx" }}; diff --git a/pkg/storage/unified/resource/sqlkv.go b/pkg/storage/unified/resource/sqlkv.go index a56d00a3e12..22cd3085122 100644 --- a/pkg/storage/unified/resource/sqlkv.go +++ b/pkg/storage/unified/resource/sqlkv.go @@ -34,9 +34,10 @@ func mustTemplate(filename string) *template.Template { // Templates. var ( - sqlKVGet = mustTemplate("sqlkv_get.sql") - sqlKVDelete = mustTemplate("sqlkv_delete.sql") - sqlKVKeys = mustTemplate("sqlkv_keys.sql") + sqlKVKeys = mustTemplate("sqlkv_keys.sql") + sqlKVGet = mustTemplate("sqlkv_get.sql") + sqlKVBatchGet = mustTemplate("sqlkv_batch_get.sql") + sqlKVDelete = mustTemplate("sqlkv_delete.sql") ) // sqlKVSection can be embedded in structs used when rendering query templates @@ -107,13 +108,23 @@ func (req sqlKVGetRequest) Results() ([]byte, error) { return req.Value, nil } -type sqlKVDeleteRequest struct { +type sqlKVBatchGetRequest struct { sqltemplate.SQLTemplate - sqlKVSectionKey + sqlKVSection + Keys []string } -func (req sqlKVDeleteRequest) Validate() error { - return req.sqlKVSectionKey.Validate() +func (req sqlKVBatchGetRequest) Validate() error { + return req.sqlKVSection.Validate() +} + +func (req sqlKVBatchGetRequest) KeyPaths() []string { + result := make([]string, 0, len(req.Keys)) + for _, key := range req.Keys { + result = append(result, req.Section+"/"+key) + } + + return result } type sqlKVKeysRequest struct { @@ -142,6 +153,15 @@ func (req sqlKVKeysRequest) SortAscending() bool { return req.Options.Sort != SortOrderDesc } +type sqlKVDeleteRequest struct { + sqltemplate.SQLTemplate + sqlKVSectionKey +} + +func (req sqlKVDeleteRequest) Validate() error { + return req.sqlKVSectionKey.Validate() +} + var _ KV = &sqlKV{} type sqlKV struct { @@ -188,6 +208,7 @@ func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter. yield("", err) return } + defer closeRows(rows, yield) for rows.Next() { var key string @@ -225,7 +246,41 @@ func (k *sqlKV) Get(ctx context.Context, section string, key string) (io.ReadClo func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] { return func(yield func(KeyValue, error) bool) { - panic("not implemented!") + if len(keys) == 0 { + return + } + + rows, err := dbutil.QueryRows(ctx, k.db, sqlKVBatchGet, sqlKVBatchGetRequest{ + SQLTemplate: sqltemplate.New(k.dialect), + sqlKVSection: sqlKVSection{section}, + Keys: keys, + }) + if err != nil { + yield(KeyValue{}, err) + return + } + defer closeRows(rows, yield) + + for rows.Next() { + var key string + var value []byte + if err := rows.Scan(&key, &value); err != nil { + yield(KeyValue{}, fmt.Errorf("error reading row: %w", err)) + return + } + + kv := KeyValue{ + Key: strings.TrimPrefix(key, section+"/"), + Value: io.NopCloser(bytes.NewReader(value)), + } + if !yield(kv, nil) { + return + } + } + + if err := rows.Err(); err != nil { + yield(KeyValue{}, fmt.Errorf("failed to read rows: %w", err)) + } } } @@ -273,3 +328,10 @@ func (k *sqlKV) BatchDelete(ctx context.Context, section string, keys []string) func (k *sqlKV) UnixTimestamp(ctx context.Context) (int64, error) { panic("not implemented!") } + +func closeRows[T any](rows db.Rows, yield func(T, error) bool) { + if err := rows.Close(); err != nil { + var zero T + yield(zero, fmt.Errorf("error closing rows: %w", err)) + } +} diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index b4c61f84c13..fbd831f6281 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -103,6 +103,7 @@ func namespacedKey(nsPrefix, key string) string { func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + nsPrefix += "-get" t.Run("get existing key", func(t *testing.T) { // First save a key @@ -221,6 +222,7 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVDelete(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + nsPrefix += "-delete" t.Run("delete existing key", func(t *testing.T) { // First create a key @@ -262,6 +264,7 @@ func runTestKVDelete(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVKeys(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + nsPrefix += "-keys" // Setup test data testKeys := namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1"}) @@ -360,6 +363,7 @@ func runTestKVKeys(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVKeysWithLimits(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + nsPrefix += "-keys-with-limits" // Setup test data testKeys := namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"}) @@ -416,6 +420,7 @@ func runTestKVKeysWithLimits(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVKeysWithSort(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + nsPrefix += "-keys-with-sort" // Setup test data testKeys := namespacedKeys(nsPrefix, []string{"a1", "a2", "b1", "b2", "c1"}) @@ -619,29 +624,29 @@ func runTestKVUnixTimestamp(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) - section := nsPrefix + "-batchget" + nsPrefix += "-batchget" t.Run("batch get existing keys", func(t *testing.T) { // Setup test data testData := map[string]string{ - "key1": "value1", - "key2": "value2", - "key3": "value3", + namespacedKey(nsPrefix, "key1"): "value1", + namespacedKey(nsPrefix, "key2"): "value2", + namespacedKey(nsPrefix, "key3"): "value3", } // Save test data for key, value := range testData { - saveKVHelper(t, kv, ctx, section, key, strings.NewReader(value)) + saveKVHelper(t, kv, ctx, testSection, key, strings.NewReader(value)) } // Batch get all keys - keys := []string{"key1", "key2", "key3"} + keys := namespacedKeys(nsPrefix, []string{"key1", "key2", "key3"}) type result struct { key string value string } var results []result - for kv, err := range kv.BatchGet(ctx, section, keys) { + for kv, err := range kv.BatchGet(ctx, testSection, keys) { require.NoError(t, err) value, err := io.ReadAll(kv.Value) require.NoError(t, err) @@ -651,10 +656,10 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { } // Verify results - assert.Len(t, results, 3) + require.Len(t, results, 3) // Check that all keys are present and in order - expectedKeys := []string{"key1", "key2", "key3"} + expectedKeys := namespacedKeys(nsPrefix, []string{"key1", "key2", "key3"}) actualKeys := make([]string, len(results)) for i, r := range results { actualKeys[i] = r.key @@ -663,22 +668,40 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { // Verify values for _, r := range results { - assert.Equal(t, testData[r.key], r.value) + assert.Equal(t, testData[r.key], r.value, "key = %s", r.key) } }) + t.Run("batch get with empty section", func(t *testing.T) { + var kvs []resource.KeyValue + var errs []error + keys := namespacedKeys(nsPrefix, []string{"key1", "key2", "key3"}) + for kv, err := range kv.BatchGet(ctx, "", keys) { + if err != nil { + errs = append(errs, err) + continue + } + + kvs = append(kvs, kv) + } + + require.Len(t, errs, 1) + assert.Contains(t, errs[0].Error(), "section is required") + assert.Empty(t, kvs) + }) + t.Run("batch get with non-existent keys", func(t *testing.T) { // Setup some test data - saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader("existing-value")) + saveKVHelper(t, kv, ctx, testSection, namespacedKey(nsPrefix, "existing-key"), strings.NewReader("existing-value")) // Batch get with mix of existing and non-existent keys - keys := []string{"existing-key", "non-existent-1", "non-existent-2"} + keys := namespacedKeys(nsPrefix, []string{"existing-key", "non-existent-1", "non-existent-2"}) type result struct { key string value string } var results []result - for kv, err := range kv.BatchGet(ctx, section, keys) { + for kv, err := range kv.BatchGet(ctx, testSection, keys) { require.NoError(t, err) value, err := io.ReadAll(kv.Value) require.NoError(t, err) @@ -688,15 +711,15 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { } // Should only return the existing key - assert.Len(t, results, 1) - assert.Equal(t, "existing-key", results[0].key) + require.Len(t, results, 1) + assert.Equal(t, namespacedKey(nsPrefix, "existing-key"), results[0].key) assert.Equal(t, "existing-value", results[0].value) }) t.Run("batch get with all non-existent keys", func(t *testing.T) { - keys := []string{"non-existent-1", "non-existent-2", "non-existent-3"} + keys := namespacedKeys(nsPrefix, []string{"non-existent-1", "non-existent-2", "non-existent-3"}) var results []resource.KeyValue - for kv, err := range kv.BatchGet(ctx, section, keys) { + for kv, err := range kv.BatchGet(ctx, testSection, keys) { require.NoError(t, err) results = append(results, kv) } @@ -708,7 +731,7 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { t.Run("batch get with empty keys list", func(t *testing.T) { keys := []string{} var results []resource.KeyValue - for kv, err := range kv.BatchGet(ctx, section, keys) { + for kv, err := range kv.BatchGet(ctx, testSection, keys) { require.NoError(t, err) results = append(results, kv) } @@ -718,16 +741,16 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { }) t.Run("batch get with empty section", func(t *testing.T) { - keys := []string{"some-key"} + keys := namespacedKeys(nsPrefix, []string{"some-key"}) var errors []error for kv, err := range kv.BatchGet(ctx, "", keys) { if err != nil { errors = append(errors, err) - break + continue } _ = kv // unused } - assert.Len(t, errors, 1) + require.Len(t, errors, 1) assert.Contains(t, errors[0].Error(), "section is required") }) @@ -741,13 +764,13 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { // Save test data for key, value := range testData { - saveKVHelper(t, kv, ctx, section, key, strings.NewReader(value)) + saveKVHelper(t, kv, ctx, testSection, namespacedKey(nsPrefix, key), strings.NewReader(value)) } // Batch get in specific order - keys := []string{"z-key", "a-key", "m-key"} + keys := namespacedKeys(nsPrefix, []string{"z-key", "invalid-key1", "a-key", "invalid-key2", "m-key", "invalid-key3"}) var results []string - for kv, err := range kv.BatchGet(ctx, section, keys) { + for kv, err := range kv.BatchGet(ctx, testSection, keys) { require.NoError(t, err) err = kv.Value.Close() require.NoError(t, err) @@ -755,8 +778,8 @@ func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { } // Verify order is preserved - assert.Len(t, results, 3) - expectedOrder := []string{"z-key", "a-key", "m-key"} + require.Len(t, results, 3) + expectedOrder := namespacedKeys(nsPrefix, []string{"z-key", "a-key", "m-key"}) assert.Equal(t, expectedOrder, results) }) } diff --git a/pkg/storage/unified/testing/kv_test.go b/pkg/storage/unified/testing/kv_test.go index 780f36fcc66..dafefc15ed2 100644 --- a/pkg/storage/unified/testing/kv_test.go +++ b/pkg/storage/unified/testing/kv_test.go @@ -50,7 +50,6 @@ func TestSQLKV(t *testing.T) { TestKVSave: true, TestKVConcurrent: true, TestKVUnixTimestamp: true, - TestKVBatchGet: true, TestKVBatchDelete: true, }, })