From 4c996a8a7f9a37862a3910d5962bc21eda278218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 25 Aug 2025 10:13:07 +0200 Subject: [PATCH] search: Guarantee search-after-write consistency (#109972) --- pkg/registry/apis/dashboard/search_test.go | 3 + .../unified/resource/bleve_index_metrics.go | 37 +- pkg/storage/unified/resource/search.go | 229 ++++++++--- pkg/storage/unified/resource/search_test.go | 106 ++++- pkg/storage/unified/search/bleve.go | 237 +++++++++++- .../unified/search/bleve_performance_test.go | 2 +- .../unified/search/bleve_search_test.go | 8 +- pkg/storage/unified/search/bleve_test.go | 363 +++++++++++++++++- pkg/storage/unified/search/options.go | 3 +- pkg/storage/unified/testing/benchmark.go | 2 +- pkg/storage/unified/testing/search_backend.go | 6 +- 11 files changed, 863 insertions(+), 133 deletions(-) diff --git a/pkg/registry/apis/dashboard/search_test.go b/pkg/registry/apis/dashboard/search_test.go index bf15cd7cc14..340ad01dc86 100644 --- a/pkg/registry/apis/dashboard/search_test.go +++ b/pkg/registry/apis/dashboard/search_test.go @@ -691,3 +691,6 @@ func (m *MockClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRe func (m *MockClient) BulkProcess(ctx context.Context, opts ...grpc.CallOption) (resourcepb.BulkStore_BulkProcessClient, error) { return nil, nil } +func (m *MockClient) UpdateIndex(ctx context.Context, reason string) error { + return nil +} diff --git a/pkg/storage/unified/resource/bleve_index_metrics.go b/pkg/storage/unified/resource/bleve_index_metrics.go index 3f130eb0b1b..b5447b10062 100644 --- a/pkg/storage/unified/resource/bleve_index_metrics.go +++ b/pkg/storage/unified/resource/bleve_index_metrics.go @@ -9,14 +9,17 @@ import ( ) type BleveIndexMetrics struct { - IndexLatency *prometheus.HistogramVec - IndexSize prometheus.Gauge - IndexedKinds *prometheus.GaugeVec - IndexCreationTime *prometheus.HistogramVec - OpenIndexes *prometheus.GaugeVec - IndexBuilds *prometheus.CounterVec - IndexBuildFailures prometheus.Counter - IndexBuildSkipped prometheus.Counter + IndexLatency *prometheus.HistogramVec + IndexSize prometheus.Gauge + IndexedKinds *prometheus.GaugeVec + IndexCreationTime *prometheus.HistogramVec + OpenIndexes *prometheus.GaugeVec + IndexBuilds *prometheus.CounterVec + IndexBuildFailures prometheus.Counter + IndexBuildSkipped prometheus.Counter + UpdateLatency prometheus.Histogram + UpdatedDocuments prometheus.Summary + SearchUpdateWaitTime *prometheus.HistogramVec } var IndexCreationBuckets = []float64{1, 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000} @@ -63,6 +66,24 @@ func ProvideIndexMetrics(reg prometheus.Registerer) *BleveIndexMetrics { Name: "index_server_index_build_skipped_total", Help: "Number of times index build has been skipped due to existing valid index being found on disk", }), + UpdateLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "index_server_update_latency_seconds", + Help: "Time to execute index update with latest modifications", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 160, + NativeHistogramMinResetDuration: time.Hour, + }), + UpdatedDocuments: promauto.With(reg).NewSummary(prometheus.SummaryOpts{ + Name: "index_server_update_documents_total", + Help: "Number of documents indexed during index update", + }), + SearchUpdateWaitTime: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "index_server_search_update_wait_time_seconds", + Help: "Time spent waiting for index update 
during search queries", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 160, + NativeHistogramMinResetDuration: time.Hour, + }, []string{"reason"}), } // Initialize labels. diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go index f7b822df600..f51f6aaa2b7 100644 --- a/pkg/storage/unified/resource/search.go +++ b/pkg/storage/unified/resource/search.go @@ -79,8 +79,17 @@ type ResourceIndex interface { // Get the number of documents in the index DocCount(ctx context.Context, folder string) (int64, error) + + // UpdateIndex updates the index with the latest data (using update function provided when index was built) to guarantee strong consistency during the search. + // Returns RV to which index was updated. + UpdateIndex(ctx context.Context, reason string) (int64, error) } +type BuildFn func(index ResourceIndex) (int64, error) + +// UpdateFn is responsible for updating index with changes since given RV. It should return new RV (to be used as next sinceRV), number of updated documents and error, if any. +type UpdateFn func(context context.Context, index ResourceIndex, sinceRV int64) (newRV int64, updatedDocs int, _ error) + // SearchBackend contains the technology specific logic to support search type SearchBackend interface { // GetIndex returns existing index, or nil. @@ -90,7 +99,17 @@ type SearchBackend interface { // Depending on the size, the backend may choose different options (eg: memory vs disk). // The last known resource version can be used to detect that nothing has changed, and existing on-disk index can be reused. // The builder will write all documents before returning. - BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, nonStandardFields SearchableDocumentFields, indexBuildReason string, builder func(index ResourceIndex) (int64, error)) (ResourceIndex, error) + // Updater function is used to update the index before performing the search. + BuildIndex( + ctx context.Context, + key NamespacedResource, + size int64, + resourceVersion int64, + nonStandardFields SearchableDocumentFields, + indexBuildReason string, + builder BuildFn, + updater UpdateFn, + ) (ResourceIndex, error) // TotalDocs returns the total number of documents across all indexes. TotalDocs() int64 @@ -644,61 +663,73 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso return nil, err } - if idx != nil { - return idx, nil - } + if idx == nil { + ch := s.buildIndex.DoChan(key.String(), func() (interface{}, error) { + // We want to finish building of the index even if original context is canceled. + // We reuse original context without cancel to keep the tracing spans correct. + ctx := context.WithoutCancel(ctx) - ch := s.buildIndex.DoChan(key.String(), func() (interface{}, error) { - // We want to finish building of the index even if original context is canceled. - // We reuse original context without cancel to keep the tracing spans correct. - ctx := context.WithoutCancel(ctx) - - // Recheck if some other goroutine managed to build an index in the meantime. - // (That is, it finished running this function and stored the index into the cache) - idx, err := s.search.GetIndex(ctx, key) - if err == nil && idx != nil { - return idx, nil - } - - // Get correct value of size + RV for building the index. This is important for our Bleve - // backend to decide whether to build index in-memory or as file-based. 
- stats, err := s.storage.GetResourceStats(ctx, key.Namespace, 0) - if err != nil { - return nil, fmt.Errorf("failed to get resource stats: %w", err) - } - - size := int64(0) - rv := int64(0) - for _, stat := range stats { - if stat.Namespace == key.Namespace && stat.Group == key.Group && stat.Resource == key.Resource { - size = stat.Count - rv = stat.ResourceVersion - break + // Recheck if some other goroutine managed to build an index in the meantime. + // (That is, it finished running this function and stored the index into the cache) + idx, err := s.search.GetIndex(ctx, key) + if err == nil && idx != nil { + return idx, nil } - } - idx, _, err = s.build(ctx, key, size, rv, reason) - if err != nil { - return nil, fmt.Errorf("error building search index, %w", err) - } - if idx == nil { - return nil, fmt.Errorf("nil index after build") - } - return idx, nil - }) + // Get correct value of size + RV for building the index. This is important for our Bleve + // backend to decide whether to build index in-memory or as file-based. + stats, err := s.storage.GetResourceStats(ctx, key.Namespace, 0) + if err != nil { + return nil, fmt.Errorf("failed to get resource stats: %w", err) + } - select { - case res := <-ch: - if res.Err != nil { - return nil, res.Err + size := int64(0) + rv := int64(0) + for _, stat := range stats { + if stat.Namespace == key.Namespace && stat.Group == key.Group && stat.Resource == key.Resource { + size = stat.Count + rv = stat.ResourceVersion + break + } + } + + idx, _, err = s.build(ctx, key, size, rv, reason) + if err != nil { + return nil, fmt.Errorf("error building search index, %w", err) + } + if idx == nil { + return nil, fmt.Errorf("nil index after build") + } + return idx, nil + }) + + select { + case res := <-ch: + if res.Err != nil { + return nil, res.Err + } + idx = res.Val.(ResourceIndex) + case <-ctx.Done(): + return nil, fmt.Errorf("failed to get index: %w", ctx.Err()) } - return res.Val.(ResourceIndex), nil - case <-ctx.Done(): - return nil, fmt.Errorf("failed to get index: %w", ctx.Err()) } + + if s.searchAfterWrite { + start := time.Now() + _, err := idx.UpdateIndex(ctx, reason) + if err != nil { + return nil, fmt.Errorf("failed to update index to guarantee strong consistency: %w", err) + } + elapsed := time.Since(start) + if s.indexMetrics != nil { + s.indexMetrics.SearchUpdateWaitTime.WithLabelValues(reason).Observe(elapsed.Seconds()) + } + } + + return idx, nil } -func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, rv int64, indexBuildReason string) (ResourceIndex, int64, error) { +func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, documentStatsRV int64, indexBuildReason string) (ResourceIndex, int64, error) { ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Build") defer span.End() @@ -707,7 +738,7 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size attribute.String("group", nsr.Group), attribute.String("resource", nsr.Resource), attribute.Int64("size", size), - attribute.Int64("rv", rv), + attribute.Int64("rv", documentStatsRV), ) logger := s.log.With("namespace", nsr.Namespace, "group", nsr.Group, "resource", nsr.Resource) @@ -718,9 +749,9 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size } fields := s.builders.GetFields(nsr) - index, err := s.search.BuildIndex(ctx, nsr, size, rv, fields, indexBuildReason, func(index ResourceIndex) (int64, error) { + builderFn := func(index ResourceIndex) (int64, error) { span 
:= trace.SpanFromContext(ctx) - span.AddEvent("building index", trace.WithAttributes(attribute.Int64("size", size), attribute.Int64("rv", rv), attribute.String("reason", indexBuildReason))) + span.AddEvent("building index", trace.WithAttributes(attribute.Int64("size", size), attribute.Int64("rv", documentStatsRV), attribute.String("reason", indexBuildReason))) listRV, err := s.storage.ListIterator(ctx, &resourcepb.ListRequest{ Limit: 1000000000000, // big number @@ -768,13 +799,10 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size // When we reach the batch size, perform bulk index and reset the batch. if len(items) >= maxBatchSize { span.AddEvent("bulk indexing", trace.WithAttributes(attribute.Int("count", len(items)))) - if err = index.BulkIndex(&BulkIndexRequest{ - Items: items, - }); err != nil { + if err = index.BulkIndex(&BulkIndexRequest{Items: items}); err != nil { return err } - // Reset the slice for the next batch while preserving capacity. items = items[:0] } } @@ -782,16 +810,94 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size // Index any remaining items in the final batch. if len(items) > 0 { span.AddEvent("bulk indexing", trace.WithAttributes(attribute.Int("count", len(items)))) - if err = index.BulkIndex(&BulkIndexRequest{ - Items: items, - }); err != nil { + if err = index.BulkIndex(&BulkIndexRequest{Items: items}); err != nil { return err } } return iter.Error() }) return listRV, err - }) + } + + updaterFn := func(ctx context.Context, index ResourceIndex, sinceRV int64) (int64, int, error) { + span := trace.SpanFromContext(ctx) + span.AddEvent("updating index", trace.WithAttributes(attribute.Int64("sinceRV", documentStatsRV))) + + rv, it := s.storage.ListModifiedSince(ctx, NamespacedResource{ + Group: nsr.Group, + Resource: nsr.Resource, + Namespace: nsr.Namespace, + }, sinceRV) + + // Process documents in batches to avoid memory issues + // When dealing with large collections (e.g., 100k+ documents), + // loading all documents into memory at once can cause OOM errors. + items := make([]*BulkIndexItem, 0, maxBatchSize) + + docs := 0 + for res, err := range it { + // Finish quickly if context is done. + if ctx.Err() != nil { + return 0, 0, ctx.Err() + } + + docs++ + + if err != nil { + span.RecordError(err) + return 0, 0, err + } + + key := &res.Key + switch res.Action { + case resourcepb.WatchEvent_ADDED, resourcepb.WatchEvent_MODIFIED: + span.AddEvent("building document", trace.WithAttributes(attribute.String("name", res.Key.Name))) + // Convert it to an indexable document + doc, err := builder.BuildDocument(ctx, key, res.ResourceVersion, res.Value) + if err != nil { + span.RecordError(err) + logger.Error("error building search document", "key", SearchID(key), "err", err) + continue + } + + items = append(items, &BulkIndexItem{ + Action: ActionIndex, + Doc: doc, + }) + case resourcepb.WatchEvent_DELETED: + span.AddEvent("deleting document", trace.WithAttributes(attribute.String("name", res.Key.Name))) + items = append(items, &BulkIndexItem{ + Action: ActionDelete, + Key: &res.Key, + }) + default: + logger.Error("can't update index with item, unknown action", "action", res.Action, "key", key) + continue + } + + // When we reach the batch size, perform bulk index and reset the batch. 
+ if len(items) >= maxBatchSize { + span.AddEvent("bulk indexing", trace.WithAttributes(attribute.Int("count", len(items)))) + if err = index.BulkIndex(&BulkIndexRequest{Items: items}); err != nil { + return 0, 0, err + } + + items = items[:0] + } + } + + // Index any remaining items in the final batch. + if len(items) > 0 { + span.AddEvent("bulk indexing", trace.WithAttributes(attribute.Int("count", len(items)))) + if err = index.BulkIndex(&BulkIndexRequest{Items: items}); err != nil { + return 0, 0, err + } + } + + return rv, docs, nil + } + + index, err := s.search.BuildIndex(ctx, nsr, size, documentStatsRV, fields, indexBuildReason, builderFn, updaterFn) if err != nil { return nil, 0, err @@ -807,7 +913,7 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size } // rv is the last RV we read. when watching, we must add all events since that time - return index, rv, err + return index, documentStatsRV, err } // buildEmptyIndex creates an empty index without adding any documents @@ -822,6 +928,9 @@ func (s *searchSupport) buildEmptyIndex(ctx context.Context, nsr NamespacedResou return s.search.BuildIndex(ctx, nsr, 0, rv, fields, "empty", func(index ResourceIndex) (int64, error) { // Return the resource version without adding any documents to the index return 0, nil + }, func(context context.Context, index ResourceIndex, sinceRV int64) (int64, int, error) { + // No update is performed. + return 0, 0, nil }) } diff --git a/pkg/storage/unified/resource/search_test.go b/pkg/storage/unified/resource/search_test.go index f1e005ade86..45c32f2e462 100644 --- a/pkg/storage/unified/resource/search_test.go +++ b/pkg/storage/unified/resource/search_test.go @@ -23,6 +23,11 @@ var _ ResourceIndex = &MockResourceIndex{} // Mock implementations type MockResourceIndex struct { mock.Mock + + updateIndexError error + + updateIndexMu sync.Mutex + updateIndexCalls []string } func (m *MockResourceIndex) BulkIndex(req *BulkIndexRequest) error { @@ -50,6 +55,14 @@ func (m *MockResourceIndex) ListManagedObjects(ctx context.Context, req *resourc return args.Get(0).(*resourcepb.ListManagedObjectsResponse), args.Error(1) } +func (m *MockResourceIndex) UpdateIndex(ctx context.Context, reason string) (int64, error) { + m.updateIndexMu.Lock() + defer m.updateIndexMu.Unlock() + + m.updateIndexCalls = append(m.updateIndexCalls, reason) + return 0, m.updateIndexError +} + var _ DocumentBuilder = &MockDocumentBuilder{} type MockDocumentBuilder struct { @@ -111,6 +124,7 @@ type mockSearchBackend struct { mu sync.Mutex buildIndexCalls []buildIndexCall buildEmptyIndexCalls []buildEmptyIndexCall + cache map[NamespacedResource]ResourceIndex } type buildIndexCall struct { @@ -128,10 +142,12 @@ type buildEmptyIndexCall struct { } func (m *mockSearchBackend) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) { - return nil, nil + m.mu.Lock() + defer m.mu.Unlock() + return m.cache[key], nil } -func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder func(index ResourceIndex) (int64, error)) (ResourceIndex, error) { +func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn) (ResourceIndex, error) { index := &MockResourceIndex{} index.On("BulkIndex", mock.Anything).Return(nil).Maybe() index.On("DocCount", mock.Anything, 
mock.Anything).Return(int64(0), nil).Maybe() @@ -145,6 +161,11 @@ func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResour m.mu.Lock() defer m.mu.Unlock() + if m.cache == nil { + m.cache = make(map[NamespacedResource]ResourceIndex) + } + m.cache[key] = index + // Determine if this is an empty index based on size // Empty indexes are characterized by size == 0 if size == 0 { @@ -341,6 +362,72 @@ func TestSearchGetOrCreateIndex(t *testing.T) { require.Less(t, len(search.buildIndexCalls), concurrency, "Should not have built index more than a few times (ideally once)") require.Equal(t, int64(50), search.buildIndexCalls[0].size) require.Equal(t, int64(11111111), search.buildIndexCalls[0].resourceVersion) + + // Verify that UpdateIndex was not called at all, since searchAfterWrite is not enabled. + idx, err := support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "test") + require.NoError(t, err) + checkMockIndexUpdateCalls(t, idx, nil) +} + +func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) { + // Setup mock implementations + storage := &mockStorageBackend{ + resourceStats: []ResourceStats{ + {NamespacedResource: NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, Count: 50, ResourceVersion: 11111111}, + }, + } + failedErr := fmt.Errorf("failed to update index") + search := &mockSearchBackend{ + buildIndexCalls: []buildIndexCall{}, + buildEmptyIndexCalls: []buildEmptyIndexCall{}, + + cache: map[NamespacedResource]ResourceIndex{ + NamespacedResource{Namespace: "ns", Group: "group", Resource: "bad"}: &MockResourceIndex{ + updateIndexError: failedErr, + }, + }, + } + supplier := &TestDocumentBuilderSupplier{ + GroupsResources: map[string]string{ + "group": "resource", + }, + } + + // Create search support with the specified initMaxSize + opts := SearchOptions{ + Backend: search, + Resources: supplier, + WorkerThreads: 1, + InitMinCount: 1, // set min count to default for this test + InitMaxCount: 0, + } + + // Enable searchAfterWrite + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil, true) + require.NoError(t, err) + require.NotNil(t, support) + + idx, err := support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "initial call") + require.NoError(t, err) + require.NotNil(t, idx) + checkMockIndexUpdateCalls(t, idx, []string{"initial call"}) + + idx, err = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "second call") + require.NoError(t, err) + require.NotNil(t, idx) + checkMockIndexUpdateCalls(t, idx, []string{"initial call", "second call"}) + + idx, err = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "bad"}, "call to bad index") + require.ErrorIs(t, err, failedErr) + require.Nil(t, idx) +} + +func checkMockIndexUpdateCalls(t *testing.T, idx ResourceIndex, strings []string) { + mi, ok := idx.(*MockResourceIndex) + require.True(t, ok) + mi.updateIndexMu.Lock() + defer mi.updateIndexMu.Unlock() + require.Equal(t, strings, mi.updateIndexCalls) } func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) { @@ -443,9 +530,6 @@ func TestSearchWillUpdateIndexOnQueueProcessor(t *testing.T) { type slowSearchBackendWithCache struct { mockSearchBackend wg sync.WaitGroup - - mu sync.Mutex - cache 
map[NamespacedResource]ResourceIndex } func (m *slowSearchBackendWithCache) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) { @@ -454,7 +538,7 @@ func (m *slowSearchBackendWithCache) GetIndex(ctx context.Context, key Namespace return m.cache[key], nil } -func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder func(index ResourceIndex) (int64, error)) (ResourceIndex, error) { +func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn) (ResourceIndex, error) { m.wg.Add(1) defer m.wg.Done() @@ -464,17 +548,9 @@ func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key Namespa if ctx.Err() != nil { return nil, ctx.Err() } - idx, err := m.mockSearchBackend.BuildIndex(ctx, key, size, resourceVersion, fields, reason, builder) + idx, err := m.mockSearchBackend.BuildIndex(ctx, key, size, resourceVersion, fields, reason, builder, updater) if err != nil { return nil, err } - - m.mu.Lock() - defer m.mu.Unlock() - - if m.cache == nil { - m.cache = make(map[NamespacedResource]ResourceIndex) - } - m.cache[key] = idx return idx, nil } diff --git a/pkg/storage/unified/search/bleve.go b/pkg/storage/unified/search/bleve.go index 347ee8103aa..1bb6953a527 100644 --- a/pkg/storage/unified/search/bleve.go +++ b/pkg/storage/unified/search/bleve.go @@ -24,6 +24,7 @@ import ( "github.com/blevesearch/bleve/v2/search/query" bleveSearch "github.com/blevesearch/bleve/v2/search/searcher" index "github.com/blevesearch/bleve_index_api" + "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/selection" @@ -145,7 +146,7 @@ func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource) *bleveInd } // Index is no longer in the cache, but we need to close it. - err := val.index.Close() + err := val.stopUpdaterAndCloseIndex() if err != nil { b.log.Error("failed to close index", "key", key, "err", err) } @@ -205,7 +206,8 @@ func (b *bleveBackend) BuildIndex( resourceVersion int64, fields resource.SearchableDocumentFields, indexBuildReason string, - builder func(index resource.ResourceIndex) (int64, error), + builder resource.BuildFn, + updater resource.UpdateFn, ) (resource.ResourceIndex, error) { _, span := b.tracer.Start(ctx, tracingPrexfixBleve+"BuildIndex") defer span.End() @@ -261,7 +263,7 @@ func (b *bleveBackend) BuildIndex( newIndexType := indexStorageMemory build := true - if size > b.opts.FileThreshold { + if size >= b.opts.FileThreshold { newIndexType = indexStorageFile // We only check for the existing file-based index if we don't already have an open index for this key. 
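For orientation, the two callbacks that BuildIndex now receives are BuildFn and UpdateFn, defined in pkg/storage/unified/resource/search.go. A minimal sketch of their contract — using trimmed stand-in types instead of the real resource package, and with the RV tracked by the caller purely for illustration (the patch keeps it on the index itself):

package sketch

import "context"

// Trimmed stand-ins for the real types in pkg/storage/unified/resource.
type ResourceIndex interface {
	BulkIndex(req *BulkIndexRequest) error
}
type BulkIndexRequest struct{ Items []*BulkIndexItem }
type BulkIndexItem struct{} // action + document/key in the real type

// BuildFn writes the full document set and returns the RV of the listing.
type BuildFn func(index ResourceIndex) (int64, error)

// UpdateFn applies changes made after sinceRV and returns the new RV and the
// number of documents it touched.
type UpdateFn func(ctx context.Context, index ResourceIndex, sinceRV int64) (newRV int64, updatedDocs int, _ error)

// useIndex illustrates the intended call pattern: build once, then update
// before each search, resuming from the RV the previous call returned.
func useIndex(ctx context.Context, idx ResourceIndex, build BuildFn, update UpdateFn) error {
	rv, err := build(idx) // full (re)build
	if err != nil {
		return err
	}
	for i := 0; i < 3; i++ { // e.g. three searches needing fresh results
		newRV, _, err := update(ctx, idx, rv)
		if err != nil {
			return err
		}
		if newRV > 0 {
			rv = newRV // next update resumes from here
		}
	}
	return nil
}
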
@@ -312,16 +314,7 @@ func (b *bleveBackend) BuildIndex( } // Batch all the changes - idx := &bleveIndex{ - key: key, - index: index, - indexStorage: newIndexType, - fields: fields, - allFields: allFields, - standard: standardSearchFields, - features: b.features, - tracing: b.tracer, - } + idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, updater, b.log.With("namespace", key.Namespace, "group", key.Group, "resource", key.Resource)) if build { if b.indexMetrics != nil { @@ -339,11 +332,12 @@ func (b *bleveBackend) BuildIndex( } err = idx.updateResourceVersion(listRV) if err != nil { - return nil, fmt.Errorf("fail to persist rv to index: %w", err) + logWithDetails.Error("Failed to persist RV to index", "err", err, "rv", listRV) + return nil, fmt.Errorf("failed to persist RV to index: %w", err) } elapsed := time.Since(start) - logWithDetails.Info("Finished building index", "elapsed", elapsed) + logWithDetails.Info("Finished building index", "elapsed", elapsed, "listRV", listRV) if b.indexMetrics != nil { b.indexMetrics.IndexCreationTime.WithLabelValues().Observe(elapsed.Seconds()) @@ -351,10 +345,7 @@ func (b *bleveBackend) BuildIndex( } else { logWithDetails.Info("Skipping index build, using existing index") - idx.resourceVersion, err = getRV(index) - if err != nil { - return nil, fmt.Errorf("failed to get RV from bleve index: %w", err) - } + idx.resourceVersion = indexRV if b.indexMetrics != nil { b.indexMetrics.IndexBuildSkipped.Inc() @@ -387,7 +378,7 @@ func (b *bleveBackend) BuildIndex( b.indexMetrics.OpenIndexes.WithLabelValues(prev.indexStorage).Dec() } - err := prev.index.Close() + err := prev.stopUpdaterAndCloseIndex() if err != nil { logWithDetails.Error("failed to close previous index", "key", key, "err", err) } @@ -555,7 +546,9 @@ func (b *bleveBackend) CloseAllIndexes() { defer b.cacheMx.Unlock() for key, idx := range b.cache { - _ = idx.index.Close() + if err := idx.stopUpdaterAndCloseIndex(); err != nil { + b.log.Error("Failed to close index", "err", err) + } delete(b.cache, key) if b.indexMetrics != nil { @@ -564,10 +557,21 @@ func (b *bleveBackend) CloseAllIndexes() { } } +type updateRequest struct { + reason string + callback chan updateResult +} + +type updateResult struct { + rv int64 + err error +} + type bleveIndex struct { key resource.NamespacedResource index bleve.Index + // RV returned by last List/ListModifiedSince operation. Updated when updating index. resourceVersion int64 standard resource.SearchableDocumentFields @@ -583,6 +587,49 @@ type bleveIndex struct { allFields []*resourcepb.ResourceTableColumnDefinition features featuremgmt.FeatureToggles tracing trace.Tracer + logger *slog.Logger + + updaterFn resource.UpdateFn + + updaterMu sync.Mutex + updaterCond *sync.Cond // Used to signal the updater goroutine that there is work to do, or updater is no longer enabled and should stop. Also used by updater itself to stop early if there's no work to be done. + updaterShutdown bool // When set to true, index is getting closed and updater is no longer going to update index. + updaterQueue []updateRequest // Queue of requests for next updater iteration. + updaterCancel context.CancelFunc // If not nil, the updater goroutine is running with context associated with this cancel function. 
+ updaterWg sync.WaitGroup + + updateLatency prometheus.Histogram + updatedDocuments prometheus.Summary +} + +func (b *bleveBackend) newBleveIndex( + key resource.NamespacedResource, + index bleve.Index, + newIndexType string, + fields resource.SearchableDocumentFields, + allFields []*resourcepb.ResourceTableColumnDefinition, + standardSearchFields resource.SearchableDocumentFields, + updaterFn resource.UpdateFn, + logger *slog.Logger, +) *bleveIndex { + bi := &bleveIndex{ + key: key, + index: index, + indexStorage: newIndexType, + fields: fields, + allFields: allFields, + standard: standardSearchFields, + features: b.features, + tracing: b.tracer, + logger: logger, + updaterFn: updaterFn, + } + bi.updaterCond = sync.NewCond(&bi.updaterMu) + if b.indexMetrics != nil { + bi.updateLatency = b.indexMetrics.UpdateLatency + bi.updatedDocuments = b.indexMetrics.UpdatedDocuments + } + return bi } // BulkIndex implements resource.ResourceIndex. @@ -1094,6 +1141,154 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R return searchrequest, nil } +func (b *bleveIndex) stopUpdaterAndCloseIndex() error { + // Signal updater to stop. We do this by 1) setting updaterShuttingDown + sending signal, and by 2) calling cancel. + b.updaterMu.Lock() + b.updaterShutdown = true + b.updaterCond.Broadcast() + // if updater is running, cancel it. (Setting to nil is only done from updater itself in defer.) + if b.updaterCancel != nil { + b.updaterCancel() + } + b.updaterMu.Unlock() + + b.updaterWg.Wait() + // Close index only after updater is not working on it anymore. + return b.index.Close() +} + +func (b *bleveIndex) UpdateIndex(ctx context.Context, reason string) (int64, error) { + // We don't have to do anything if the index cannot be updated (typically in tests). + if b.updaterFn == nil { + return 0, nil + } + + // Use chan with buffer size 1 to ensure that we can always send the result back, even if there's no reader anymore. + req := updateRequest{reason: reason, callback: make(chan updateResult, 1)} + + // Make sure that the updater goroutine is running. + b.updaterMu.Lock() + if b.updaterShutdown { + b.updaterMu.Unlock() + return 0, fmt.Errorf("cannot update index: %w", bleve.ErrorIndexClosed) + } + + b.updaterQueue = append(b.updaterQueue, req) + + // If updater is not running, start it. + if b.updaterCancel == nil { + b.startUpdater() + } + b.updaterCond.Broadcast() // If updater is waiting for next batch, wake it up. + b.updaterMu.Unlock() + + // wait for the update to finish + select { + case <-ctx.Done(): + return 0, ctx.Err() + case ur := <-req.callback: + return ur.rv, ur.err + } +} + +// Must be called with b.updaterMu lock held. +func (b *bleveIndex) startUpdater() { + c, cancel := context.WithCancel(context.Background()) + b.updaterCancel = cancel + b.updaterWg.Add(1) + + go func() { + defer func() { + cancel() // Make sure to call this to release resources. + + b.updaterMu.Lock() + b.updaterCancel = nil + b.updaterMu.Unlock() + + b.updaterWg.Done() + }() + + b.runUpdater(c) + }() +} + +const maxWait = 5 * time.Second + +func (b *bleveIndex) runUpdater(ctx context.Context) { + for { + start := time.Now() + t := time.AfterFunc(maxWait, b.updaterCond.Broadcast) + + b.updaterMu.Lock() + for !b.updaterShutdown && ctx.Err() == nil && len(b.updaterQueue) == 0 && time.Since(start) < maxWait { + // Cond is signalled when updaterShutdown changes, updaterQueue gets new element or when timeout occurs. 
+ b.updaterCond.Wait() + } + + shutdown := b.updaterShutdown + batch := b.updaterQueue + b.updaterQueue = nil // empty the queue for the next batch + b.updaterMu.Unlock() + + t.Stop() + + // Nothing to index after maxWait, exit the goroutine. + if len(batch) == 0 { + return + } + + if shutdown { + for _, req := range batch { + req.callback <- updateResult{err: fmt.Errorf("cannot update index: %w", bleve.ErrorIndexClosed)} + } + return + } + + // Build reasons map + reasons := map[string]int{} + for _, req := range batch { + reasons[req.reason]++ + } + + var rv int64 + var err = ctx.Err() + if err == nil { + rv, err = b.updateIndexWithLatestModifications(ctx, len(batch), reasons) + } + for _, req := range batch { + req.callback <- updateResult{rv: rv, err: err} + } + } +} + +func (b *bleveIndex) updateIndexWithLatestModifications(ctx context.Context, requests int, reasons map[string]int) (int64, error) { + ctx, span := b.tracing.Start(ctx, tracingPrexfixBleve+"updateIndexWithLatestModifications") + defer span.End() + + b.logger.Debug("Updating index", "sinceRV", b.resourceVersion, "requests", requests, "reasons", reasons) + + startTime := time.Now() + rv, docs, err := b.updaterFn(ctx, b, b.resourceVersion) + if err == nil && rv > 0 { + err = b.updateResourceVersion(rv) + } + + elapsed := time.Since(startTime) + if err == nil { + b.logger.Debug("Finished updating index", "listRV", b.resourceVersion, "duration", elapsed, "docs", docs) + + if b.updateLatency != nil { + b.updateLatency.Observe(elapsed.Seconds()) + } + if b.updatedDocuments != nil { + b.updatedDocuments.Observe(float64(docs)) + } + } else { + b.logger.Debug("Updating of index finished with error", "duration", elapsed, "err", err) + } + return rv, err +} + func safeInt64ToInt(i64 int64) (int, error) { if i64 > math.MaxInt32 || i64 < math.MinInt32 { return 0, fmt.Errorf("int64 value %d overflows int", i64) diff --git a/pkg/storage/unified/search/bleve_performance_test.go b/pkg/storage/unified/search/bleve_performance_test.go index d6cb840647e..f54298719d9 100644 --- a/pkg/storage/unified/search/bleve_performance_test.go +++ b/pkg/storage/unified/search/bleve_performance_test.go @@ -79,7 +79,7 @@ func BenchmarkBleveQuery(b *testing.B) { } } -func newTestWriter(size int, batchSize int) IndexWriter { +func newTestWriter(size int, batchSize int) resource.BuildFn { key := &resourcepb.ResourceKey{ Namespace: "default", Group: "dashboard.grafana.app", diff --git a/pkg/storage/unified/search/bleve_search_test.go b/pkg/storage/unified/search/bleve_search_test.go index 865f570389a..b61cd36111f 100644 --- a/pkg/storage/unified/search/bleve_search_test.go +++ b/pkg/storage/unified/search/bleve_search_test.go @@ -519,7 +519,7 @@ func newQueryByTitle(query string) *resourcepb.ResourceSearchRequest { } } -func newTestDashboardsIndex(t testing.TB, threshold int64, size int64, batchSize int64, writer IndexWriter) resource.ResourceIndex { +func newTestDashboardsIndex(t testing.TB, threshold int64, size int64, batchSize int64, writer resource.BuildFn) resource.ResourceIndex { key := &resourcepb.ResourceKey{ Namespace: "default", Group: "dashboard.grafana.app", @@ -551,15 +551,13 @@ func newTestDashboardsIndex(t testing.TB, threshold int64, size int64, batchSize Namespace: key.Namespace, Group: key.Group, Resource: key.Resource, - }, size, rv, info.Fields, "test", writer) + }, size, rv, info.Fields, "test", writer, nil) require.NoError(t, err) return index } -type IndexWriter func(index resource.ResourceIndex) (int64, error) - -var noop IndexWriter = 
func(index resource.ResourceIndex) (int64, error) { +var noop resource.BuildFn = func(index resource.ResourceIndex) (int64, error) { return 0, nil } diff --git a/pkg/storage/unified/search/bleve_test.go b/pkg/storage/unified/search/bleve_test.go index fbec10dc9f2..f48b74a5b7f 100644 --- a/pkg/storage/unified/search/bleve_test.go +++ b/pkg/storage/unified/search/bleve_test.go @@ -4,20 +4,23 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "math" "os" "path/filepath" + "sync" "testing" "time" "github.com/blevesearch/bleve/v2" + authlib "github.com/grafana/authlib/types" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - authlib "github.com/grafana/authlib/types" + "go.uber.org/atomic" + "go.uber.org/goleak" "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/apimachinery/utils" @@ -30,6 +33,17 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/resourcepb" ) +// This verifies that we close all indexes properly and shutdown all background goroutines from our tests. +// (Except for goroutines running specific functions. If possible we should fix this, esp. our own updateIndexSizeMetric.) +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, + goleak.IgnoreTopFunction("github.com/open-feature/go-sdk/openfeature.(*eventExecutor).startEventListener.func1.1"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("github.com/blevesearch/bleve_index_api.AnalysisWorker"), // These don't stop when index is closed. + goleak.IgnoreAnyFunction("github.com/grafana/grafana/pkg/storage/unified/search.(*bleveBackend).updateIndexSizeMetric"), // We don't have a way to stop this one yet. 
+ ) +} + func TestBleveBackend(t *testing.T) { dashboardskey := &resourcepb.ResourceKey{ Namespace: "default", @@ -176,7 +190,7 @@ func TestBleveBackend(t *testing.T) { return 0, err } return rv, nil - }) + }, nil) require.NoError(t, err) require.NotNil(t, index) dashboardsIndex = index @@ -405,7 +419,7 @@ func TestBleveBackend(t *testing.T) { return 0, err } return rv, nil - }) + }, nil) require.NoError(t, err) require.NotNil(t, index) foldersIndex = index @@ -768,7 +782,7 @@ func TestBleveInMemoryIndexExpiration(t *testing.T) { Resource: "resource", } - builtIndex, err := backend.BuildIndex(context.Background(), ns, 1 /* below FileThreshold */, 100, nil, "test", indexTestDocs(ns, 1, 100)) + builtIndex, err := backend.BuildIndex(context.Background(), ns, 1 /* below FileThreshold */, 100, nil, "test", indexTestDocs(ns, 1, 100), nil) require.NoError(t, err) // Wait for index expiration, which is 1ns @@ -800,7 +814,7 @@ func TestBleveFileIndexExpiration(t *testing.T) { } // size=100 is above FileThreshold, this will be file-based index - builtIndex, err := backend.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 1, 100)) + builtIndex, err := backend.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 1, 100), nil) require.NoError(t, err) // Wait for index expiration, which is 1ns @@ -832,7 +846,7 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) { tmpDir := t.TempDir() backend1, reg1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - _, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100)) + _, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), nil) require.NoError(t, err) // Verify one open index. @@ -855,7 +869,7 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) { // We open new backend using same directory, and run indexing with same size (10) and RV (100). This should reuse existing index, and skip indexing. backend2, reg2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100)) + idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100), nil) require.NoError(t, err) // Verify that we're reusing existing index and there is only 10 documents in it, not 1000. @@ -881,7 +895,7 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) { // We repeat with backend3 and RV 99. This should also reuse existing index and skip indexing backend3, reg3 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1000, 99)) + idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1000, 99), nil) require.NoError(t, err) // Verify that we're reusing existing index and there is only 10 documents in it, not 1000. 
@@ -909,13 +923,13 @@ func TestFileIndexIsNotReusedOnDifferentSize(t *testing.T) { tmpDir := t.TempDir() backend1, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - _, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100)) + _, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100), nil) require.NoError(t, err) backend1.CloseAllIndexes() // We open new backend using same directory, but with different size. Index should be rebuilt. backend2, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - idx, err := backend2.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 100, 100)) + idx, err := backend2.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 100, 100), nil) require.NoError(t, err) // Verify that index has updated number of documents. @@ -934,13 +948,13 @@ func TestFileIndexIsNotReusedOnDifferentRV(t *testing.T) { tmpDir := t.TempDir() backend1, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - _, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100)) + _, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100), nil) require.NoError(t, err) backend1.CloseAllIndexes() // We open new backend using same directory, but with different RV. Index should be rebuilt. backend2, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir) - idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 999999, nil, "test", indexTestDocs(ns, 100, 999999)) + idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 999999, nil, "test", indexTestDocs(ns, 100, 999999), nil) require.NoError(t, err) // Verify that index has updated number of documents. 
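The rebuild tests below also depend on the close path added in bleve.go: the updater goroutine must be fully stopped before the bleve index is closed, otherwise an in-flight UpdateIndex could write to a closed index. Condensed from the patch (same fields and same ordering):

func (b *bleveIndex) stopUpdaterAndCloseIndex() error {
	b.updaterMu.Lock()
	b.updaterShutdown = true  // refuse any new update requests
	b.updaterCond.Broadcast() // wake an updater waiting for work
	if b.updaterCancel != nil {
		b.updaterCancel() // abort an update that is already running
	}
	b.updaterMu.Unlock()

	b.updaterWg.Wait()      // wait for the updater goroutine to exit...
	return b.index.Close()  // ...and only then close the index
}
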
@@ -972,7 +986,7 @@ func TestRebuildingIndexClosesPreviousCachedIndex(t *testing.T) { if testCase.firstInMemory { firstSize = 1 } - firstIndex, err := backend.BuildIndex(context.Background(), ns, int64(firstSize), 100, nil, "test", indexTestDocs(ns, firstSize, 100)) + firstIndex, err := backend.BuildIndex(context.Background(), ns, int64(firstSize), 100, nil, "test", indexTestDocs(ns, firstSize, 100), nil) require.NoError(t, err) if testCase.firstInMemory { @@ -988,7 +1002,7 @@ func TestRebuildingIndexClosesPreviousCachedIndex(t *testing.T) { secondSize = 1 openInMemoryIndexes = 1 } - secondIndex, err := backend.BuildIndex(context.Background(), ns, int64(secondSize), 100, nil, "test", indexTestDocs(ns, secondSize, 100)) + secondIndex, err := backend.BuildIndex(context.Background(), ns, int64(secondSize), 100, nil, "test", indexTestDocs(ns, secondSize, 100), nil) require.NoError(t, err) if testCase.secondInMemory { @@ -1030,7 +1044,7 @@ func verifyDirEntriesCount(t *testing.T, dir string, count int) { require.Len(t, ents, count) } -func indexTestDocs(ns resource.NamespacedResource, docs int, listRV int64) func(index resource.ResourceIndex) (int64, error) { +func indexTestDocs(ns resource.NamespacedResource, docs int, listRV int64) resource.BuildFn { return func(index resource.ResourceIndex) (int64, error) { var items []*resource.BulkIndexItem for i := 0; i < docs; i++ { @@ -1053,6 +1067,34 @@ func indexTestDocs(ns resource.NamespacedResource, docs int, listRV int64) func( } } +func updateTestDocs(ns resource.NamespacedResource, docs int) resource.UpdateFn { + cnt := 0 + + return func(context context.Context, index resource.ResourceIndex, sinceRV int64) (newRV int64, updatedDocs int, _ error) { + cnt++ + + var items []*resource.BulkIndexItem + for i := 0; i < docs; i++ { + items = append(items, &resource.BulkIndexItem{ + Action: resource.ActionIndex, + Doc: &resource.IndexableDocument{ + Key: &resourcepb.ResourceKey{ + Namespace: ns.Namespace, + Group: ns.Group, + Resource: ns.Resource, + Name: fmt.Sprintf("doc%d", i), + }, + Title: fmt.Sprintf("Document %d (gen_%d)", i, cnt), + }, + }) + } + + err := index.BulkIndex(&resource.BulkIndexRequest{Items: items}) + // Simulate RV increase + return sinceRV + int64(docs), docs, err + } +} + func TestCleanOldIndexes(t *testing.T) { dir := t.TempDir() @@ -1107,10 +1149,295 @@ func testBleveIndexWithFailures(t *testing.T, fileBased bool) { } _, err := backend.BuildIndex(context.Background(), ns, size, 100, nil, "test", func(index resource.ResourceIndex) (int64, error) { return 0, fmt.Errorf("fail") - }) + }, nil) require.Error(t, err) // Even though previous build of the index failed, new building of the index should work. - _, err = backend.BuildIndex(context.Background(), ns, size, 100, nil, "test", indexTestDocs(ns, int(size), 100)) + _, err = backend.BuildIndex(context.Background(), ns, size, 100, nil, "test", indexTestDocs(ns, int(size), 100), nil) require.NoError(t, err) } + +func TestIndexUpdate(t *testing.T) { + ns := resource.NamespacedResource{ + Namespace: "test", + Group: "group", + Resource: "resource", + } + + be, _ := setupBleveBackend(t, 5, 1*time.Minute, "") + idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5)) + require.NoError(t, err) + + resp := searchTitle(t, idx, "gen", 10, ns) + require.Equal(t, int64(0), resp.TotalHits) + + // Update index. 
+ _, err = idx.UpdateIndex(context.Background(), "test") + require.NoError(t, err) + + // Verify that index was updated -- number of docs didn't change, but we can search "gen_1" documents now. + require.Equal(t, 10, docCount(t, idx)) + require.Equal(t, int64(5), searchTitle(t, idx, "gen_1", 10, ns).TotalHits) + + // Update index again. + _, err = idx.UpdateIndex(context.Background(), "test") + require.NoError(t, err) + // Verify that index was updated again -- we can search "gen_2" now. "gen_1" documents are gone. + require.Equal(t, 10, docCount(t, idx)) + require.Equal(t, int64(0), searchTitle(t, idx, "gen_1", 10, ns).TotalHits) + require.Equal(t, int64(5), searchTitle(t, idx, "gen_2", 10, ns).TotalHits) +} + +func TestConcurrentIndexUpdateAndBuildIndex(t *testing.T) { + ns := resource.NamespacedResource{ + Namespace: "test", + Group: "group", + Resource: "resource", + } + + be, _ := setupBleveBackend(t, 5, 1*time.Minute, "") + + updaterFn := func(context context.Context, index resource.ResourceIndex, sinceRV int64) (newRV int64, updatedDocs int, _ error) { + var items []*resource.BulkIndexItem + for i := 0; i < 5; i++ { + items = append(items, &resource.BulkIndexItem{ + Action: resource.ActionIndex, + Doc: &resource.IndexableDocument{ + Key: &resourcepb.ResourceKey{ + Namespace: ns.Namespace, + Group: ns.Group, + Resource: ns.Resource, + Name: fmt.Sprintf("doc%d", i), + }, + Title: fmt.Sprintf("Document %d (gen_%d)", i, 5), + }, + }) + } + + err := index.BulkIndex(&resource.BulkIndexRequest{Items: items}) + // Simulate RV increase + return sinceRV + int64(5), 5, err + } + + idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err = idx.UpdateIndex(ctx, "test") + require.NoError(t, err) + + _, err = be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn) + require.NoError(t, err) + + _, err = idx.UpdateIndex(ctx, "test") + require.Contains(t, err.Error(), bleve.ErrorIndexClosed.Error()) +} + +func TestConcurrentIndexUpdateSearchAndRebuild(t *testing.T) { + ns := resource.NamespacedResource{ + Namespace: "test", + Group: "group", + Resource: "resource", + } + + be, _ := setupBleveBackend(t, 5, 1*time.Minute, "") + + _, err := be.BuildIndex(t.Context(), ns, 10, 0, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5)) + require.NoError(t, err) + + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rebuilds := atomic.NewInt64(0) + updates := atomic.NewInt64(0) + searches := atomic.NewInt64(0) + const searchConcurrency = 25 + for i := 0; i < searchConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for ctx.Err() == nil { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(i) * time.Millisecond): // introduce small jitter + } + + idx, err := be.GetIndex(ctx, ns) + require.NoError(t, err) // GetIndex doesn't really return error. 
+ + _, err = idx.UpdateIndex(ctx, "test") + if err != nil { + if errors.Is(err, bleve.ErrorIndexClosed) || errors.Is(err, context.Canceled) { + continue + } + require.NoError(t, err) + } + updates.Inc() + + resp, err := idx.Search(ctx, nil, &resourcepb.ResourceSearchRequest{ + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{ + Namespace: ns.Namespace, + Group: ns.Group, + Resource: ns.Resource, + }, + }, + Fields: []string{"title"}, + Query: "Document", + Limit: 10, + }, nil) + if err != nil { + if errors.Is(err, bleve.ErrorIndexClosed) || errors.Is(err, context.Canceled) { + continue + } + require.NoError(t, err) + } + require.Equal(t, int64(10), resp.TotalHits) + searches.Inc() + } + }() + } + + wg.Add(1) + go func() { + defer wg.Done() + for ctx.Err() == nil { + _, err := be.BuildIndex(t.Context(), ns, 10, 0, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5)) + require.NoError(t, err) + rebuilds.Inc() + } + }() + + time.Sleep(5 * time.Second) + cancel() + wg.Wait() + + fmt.Println("Updates:", updates.Load(), "searches:", searches.Load(), "rebuilds:", rebuilds.Load()) +} + +// Verify concurrent updates and searches work as expected. +func TestConcurrentIndexUpdateAndSearch(t *testing.T) { + ns := resource.NamespacedResource{ + Namespace: "test", + Group: "group", + Resource: "resource", + } + + be, _ := setupBleveBackend(t, 5, 1*time.Minute, "") + + idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5)) + require.NoError(t, err) + + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // We count how many goroutines received given updated RV. We expect at least some RVs to be returned to multiple + // goroutines, if batching works. + mu := sync.Mutex{} + updatedRVs := map[int64]int{} + + const searchConcurrency = 25 + for i := 0; i < searchConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + prevRV := int64(0) + for ctx.Err() == nil { + // We use t.Context() here to avoid getting errors from context cancellation. + rv, err := idx.UpdateIndex(t.Context(), "test") + require.NoError(t, err) + require.Greater(t, rv, prevRV) // Each update should return new RV (that's how our update function works) + require.Equal(t, int64(10), searchTitle(t, idx, "Document", 10, ns).TotalHits) + prevRV = rv + + mu.Lock() + updatedRVs[rv]++ + mu.Unlock() + } + }() + } + + time.Sleep(1 * time.Second) + cancel() + wg.Wait() + + // Check that some RVs were updated due to requests from multiple goroutines + var rvUpdatedByMultipleGoroutines int64 + for rv, count := range updatedRVs { + if count > 1 { + rvUpdatedByMultipleGoroutines = rv + break + } + } + require.Greater(t, rvUpdatedByMultipleGoroutines, int64(0)) +} + +// Verify concurrent updates and searches work as expected. 
+func TestIndexUpdateWithErrors(t *testing.T) { + ns := resource.NamespacedResource{ + Namespace: "test", + Group: "group", + Resource: "resource", + } + + be, _ := setupBleveBackend(t, 5, 1*time.Minute, "") + + updateErr := fmt.Errorf("failed to update index") + updaterFn := func(context context.Context, index resource.ResourceIndex, sinceRV int64) (newRV int64, updatedDocs int, _ error) { + time.Sleep(100 * time.Millisecond) + return 0, 0, updateErr + } + idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn) + require.NoError(t, err) + + t.Run("update fail", func(t *testing.T) { + _, err = idx.UpdateIndex(t.Context(), "test") + require.ErrorIs(t, err, updateErr) + }) + + t.Run("update timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + + _, err = idx.UpdateIndex(ctx, "test") + require.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("context canceled", func(t *testing.T) { + // Canceled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err = idx.UpdateIndex(ctx, "test") + require.ErrorIs(t, err, context.Canceled) + }) +} + +func searchTitle(t *testing.T, idx resource.ResourceIndex, query string, limit int, ns resource.NamespacedResource) *resourcepb.ResourceSearchResponse { + resp, err := idx.Search(t.Context(), nil, &resourcepb.ResourceSearchRequest{ + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{ + Namespace: ns.Namespace, + Group: ns.Group, + Resource: ns.Resource, + }, + }, + Fields: []string{"title"}, + Query: query, + Limit: int64(limit), + }, nil) + require.NoError(t, err) + return resp +} + +func docCount(t *testing.T, idx resource.ResourceIndex) int { + cnt, err := idx.DocCount(context.Background(), "") + require.NoError(t, err) + return int(cnt) +} diff --git a/pkg/storage/unified/search/options.go b/pkg/storage/unified/search/options.go index 0a68a1ec138..0093da44264 100644 --- a/pkg/storage/unified/search/options.go +++ b/pkg/storage/unified/search/options.go @@ -4,10 +4,11 @@ import ( "os" "path/filepath" + "go.opentelemetry.io/otel/trace" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" - "go.opentelemetry.io/otel/trace" ) func NewSearchOptions(features featuremgmt.FeatureToggles, cfg *setting.Cfg, tracer trace.Tracer, docs resource.DocumentBuilderSupplier, indexMetrics *resource.BleveIndexMetrics) (resource.SearchOptions, error) { diff --git a/pkg/storage/unified/testing/benchmark.go b/pkg/storage/unified/testing/benchmark.go index dcdf113c212..964f2faeba0 100644 --- a/pkg/storage/unified/testing/benchmark.go +++ b/pkg/storage/unified/testing/benchmark.go @@ -217,7 +217,7 @@ func runSearchBackendBenchmarkWriteThroughput(ctx context.Context, backend resou size := int64(10000) // force the index to be on disk index, err := backend.BuildIndex(ctx, nr, size, 0, nil, "benchmark", func(index resource.ResourceIndex) (int64, error) { return 0, nil - }) + }, nil) if err != nil { return nil, fmt.Errorf("failed to initialize backend: %w", err) } diff --git a/pkg/storage/unified/testing/search_backend.go b/pkg/storage/unified/testing/search_backend.go index 52af572b01a..a91915f0076 100644 --- a/pkg/storage/unified/testing/search_backend.go +++ b/pkg/storage/unified/testing/search_backend.go @@ -86,7 +86,7 @@ func runTestSearchBackendBuildIndex(t *testing.T, backend 
resource.SearchBackend return 0, err } return 1, nil - }) + }, nil) require.NoError(t, err) require.NotNil(t, index) @@ -152,7 +152,7 @@ func runTestResourceIndex(t *testing.T, backend resource.SearchBackend, nsPrefix }) require.NoError(t, err) return int64(2), nil - }) + }, nil) require.NoError(t, err) require.NotNil(t, index) @@ -294,7 +294,7 @@ func runTestResourceIndex(t *testing.T, backend resource.SearchBackend, nsPrefix }) require.NoError(t, err) return int64(3), nil - }) + }, nil) require.NoError(t, err) require.NotNil(t, index)
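
Taken together, the core pattern this patch introduces — many searchers asking for an index update, a single goroutine batching those requests and fanning the result back through per-request channels — is worth seeing in isolation. The following self-contained sketch mirrors the updateRequest/runUpdater machinery from bleve.go; the actual index catch-up is replaced by a stand-in RV increment, and maxWait is shortened from the patch's 5s so the demo exits quickly:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const maxWait = 200 * time.Millisecond // 5s in the patch

type updateRequest struct {
	reason   string
	callback chan updateResult // buffered(1): deliverable even if the caller gave up
}

type updateResult struct {
	rv  int64
	err error
}

type batchingUpdater struct {
	mu       sync.Mutex
	cond     *sync.Cond
	queue    []updateRequest
	shutdown bool
	running  bool

	rv int64 // stand-in for the index resource version
}

func newBatchingUpdater() *batchingUpdater {
	u := &batchingUpdater{}
	u.cond = sync.NewCond(&u.mu)
	return u
}

// Update enqueues a request, lazily starts the runner, and waits for the
// batch containing the request to finish (or for the caller's context).
func (u *batchingUpdater) Update(ctx context.Context, reason string) (int64, error) {
	req := updateRequest{reason: reason, callback: make(chan updateResult, 1)}

	u.mu.Lock()
	if u.shutdown {
		u.mu.Unlock()
		return 0, fmt.Errorf("updater closed")
	}
	u.queue = append(u.queue, req)
	if !u.running {
		u.running = true
		go u.run()
	}
	u.cond.Broadcast() // wake the runner if it is waiting for work
	u.mu.Unlock()

	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case res := <-req.callback:
		return res.rv, res.err
	}
}

func (u *batchingUpdater) run() {
	for {
		start := time.Now()
		t := time.AfterFunc(maxWait, u.cond.Broadcast) // bound the wait

		u.mu.Lock()
		for !u.shutdown && len(u.queue) == 0 && time.Since(start) < maxWait {
			u.cond.Wait()
		}
		batch, shutdown := u.queue, u.shutdown
		u.queue = nil
		if len(batch) == 0 { // idle for maxWait: stop the goroutine
			u.running = false
			u.mu.Unlock()
			t.Stop()
			return
		}
		u.mu.Unlock()
		t.Stop()

		if shutdown {
			for _, r := range batch {
				r.callback <- updateResult{err: fmt.Errorf("updater closed")}
			}
			return
		}

		// One (expensive) catch-up satisfies every queued request at once;
		// in the patch this is updateIndexWithLatestModifications.
		u.rv++
		for _, r := range batch {
			r.callback <- updateResult{rv: u.rv}
		}
	}
}

func main() {
	u := newBatchingUpdater()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			rv, err := u.Update(context.Background(), "search")
			fmt.Printf("caller %d: rv=%d err=%v\n", n, rv, err)
		}(i)
	}
	wg.Wait()
}

Because all four callers typically land in the same batch, they usually all observe rv=1: one catch-up amortizes across every waiting search, which is exactly the effect the SearchUpdateWaitTime metric and the TestConcurrentIndexUpdateAndSearch test above are measuring.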