From 3bb75e2a90183ce3500f4fae50fc85bf8ca9cbb4 Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Tue, 14 Oct 2025 17:19:26 +0200 Subject: [PATCH] kvstore: Add BatchGet (#111594) * Add BatchGet * restructure the batch get tests --- pkg/storage/unified/resource/kv.go | 61 ++++++++++++ pkg/storage/unified/testing/kv.go | 146 +++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) diff --git a/pkg/storage/unified/resource/kv.go b/pkg/storage/unified/resource/kv.go index a4215a1570a..8700c9538aa 100644 --- a/pkg/storage/unified/resource/kv.go +++ b/pkg/storage/unified/resource/kv.go @@ -15,6 +15,12 @@ import ( var ErrNotFound = errors.New("key not found") +// KeyValue represents a key-value pair returned by BatchGet +type KeyValue struct { + Key string + Value io.ReadCloser +} + type SortOrder int const ( @@ -36,6 +42,11 @@ type KV interface { // Get retrieves the value for a key from the store Get(ctx context.Context, section string, key string) (io.ReadCloser, error) + // BatchGet retrieves multiple values for the given keys from the store. + // Non-existent entries will not appear in the result. + // The order of the keys is retained in the result. + BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] + // Save a new value - returns a WriteCloser to write the value to Save(ctx context.Context, section string, key string) (io.WriteCloser, error) @@ -92,6 +103,56 @@ func (k *badgerKV) Get(ctx context.Context, section string, key string) (io.Read return io.NopCloser(bytes.NewReader(value)), nil } +func (k *badgerKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] { + if k.db.IsClosed() { + return func(yield func(KeyValue, error) bool) { + yield(KeyValue{}, fmt.Errorf("database is closed")) + } + } + + if section == "" { + return func(yield func(KeyValue, error) bool) { + yield(KeyValue{}, fmt.Errorf("section is required")) + } + } + + return func(yield func(KeyValue, error) bool) { + txn := k.db.NewTransaction(false) + defer txn.Discard() + + for _, key := range keys { + keyWithSection := section + "/" + key + + item, err := txn.Get([]byte(keyWithSection)) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + // Skip non-existent keys as per the requirement + continue + } + // For other errors, yield the error and stop + yield(KeyValue{}, err) + return + } + + // Get the value and create a reader from it + value, err := item.ValueCopy(nil) + if err != nil { + yield(KeyValue{}, err) + return + } + + kv := KeyValue{ + Key: key, + Value: io.NopCloser(bytes.NewReader(value)), + } + + if !yield(kv, nil) { + return + } + } + } +} + // badgerWriteCloser implements io.WriteCloser for badgerKV type badgerWriteCloser struct { db *badger.DB diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index 79704b48b55..00f5a43e5e3 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -26,6 +26,7 @@ const ( TestKVKeysWithSort = "keys with sorting" TestKVConcurrent = "concurrent operations" TestKVUnixTimestamp = "unix timestamp" + TestKVBatchGet = "batch get operations" ) // NewKVFunc is a function that creates a new KV instance for testing @@ -65,6 +66,7 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) { {TestKVKeysWithSort, runTestKVKeysWithSort}, {TestKVConcurrent, runTestKVConcurrent}, {TestKVUnixTimestamp, runTestKVUnixTimestamp}, + {TestKVBatchGet, runTestKVBatchGet}, } for _, tc := range cases { @@ -527,6 +529,150 @@ func runTestKVUnixTimestamp(t *testing.T, kv resource.KV, nsPrefix string) { }) } +func runTestKVBatchGet(t *testing.T, kv resource.KV, nsPrefix string) { + ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) + section := nsPrefix + "-batchget" + + t.Run("batch get existing keys", func(t *testing.T) { + // Setup test data + testData := map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + } + + // Save test data + for key, value := range testData { + saveKVHelper(t, kv, ctx, section, key, strings.NewReader(value)) + } + + // Batch get all keys + keys := []string{"key1", "key2", "key3"} + type result struct { + key string + value string + } + var results []result + for kv, err := range kv.BatchGet(ctx, section, keys) { + require.NoError(t, err) + value, err := io.ReadAll(kv.Value) + require.NoError(t, err) + err = kv.Value.Close() + require.NoError(t, err) + results = append(results, result{key: kv.Key, value: string(value)}) + } + + // Verify results + assert.Len(t, results, 3) + + // Check that all keys are present and in order + expectedKeys := []string{"key1", "key2", "key3"} + actualKeys := make([]string, len(results)) + for i, r := range results { + actualKeys[i] = r.key + } + assert.Equal(t, expectedKeys, actualKeys) + + // Verify values + for _, r := range results { + assert.Equal(t, testData[r.key], r.value) + } + }) + + t.Run("batch get with non-existent keys", func(t *testing.T) { + // Setup some test data + saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader("existing-value")) + + // Batch get with mix of existing and non-existent keys + keys := []string{"existing-key", "non-existent-1", "non-existent-2"} + type result struct { + key string + value string + } + var results []result + for kv, err := range kv.BatchGet(ctx, section, keys) { + require.NoError(t, err) + value, err := io.ReadAll(kv.Value) + require.NoError(t, err) + err = kv.Value.Close() + require.NoError(t, err) + results = append(results, result{key: kv.Key, value: string(value)}) + } + + // Should only return the existing key + assert.Len(t, results, 1) + assert.Equal(t, "existing-key", results[0].key) + assert.Equal(t, "existing-value", results[0].value) + }) + + t.Run("batch get with all non-existent keys", func(t *testing.T) { + keys := []string{"non-existent-1", "non-existent-2", "non-existent-3"} + var results []resource.KeyValue + for kv, err := range kv.BatchGet(ctx, section, keys) { + require.NoError(t, err) + results = append(results, kv) + } + + // Should return no results + assert.Empty(t, results) + }) + + t.Run("batch get with empty keys list", func(t *testing.T) { + keys := []string{} + var results []resource.KeyValue + for kv, err := range kv.BatchGet(ctx, section, keys) { + require.NoError(t, err) + results = append(results, kv) + } + + // Should return no results + assert.Empty(t, results) + }) + + t.Run("batch get with empty section", func(t *testing.T) { + keys := []string{"some-key"} + var errors []error + for kv, err := range kv.BatchGet(ctx, "", keys) { + if err != nil { + errors = append(errors, err) + break + } + _ = kv // unused + } + assert.Len(t, errors, 1) + assert.Contains(t, errors[0].Error(), "section is required") + }) + + t.Run("batch get preserves order", func(t *testing.T) { + // Setup test data + testData := map[string]string{ + "z-key": "z-value", + "a-key": "a-value", + "m-key": "m-value", + } + + // Save test data + for key, value := range testData { + saveKVHelper(t, kv, ctx, section, key, strings.NewReader(value)) + } + + // Batch get in specific order + keys := []string{"z-key", "a-key", "m-key"} + var results []string + for kv, err := range kv.BatchGet(ctx, section, keys) { + require.NoError(t, err) + err = kv.Value.Close() + require.NoError(t, err) + results = append(results, kv.Key) + } + + // Verify order is preserved + assert.Len(t, results, 3) + expectedOrder := []string{"z-key", "a-key", "m-key"} + assert.Equal(t, expectedOrder, results) + }) +} + // saveKVHelper is a helper function to save data to KV store using the new WriteCloser interface func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, key string, value io.Reader) { t.Helper()