diff --git a/pkg/server/search_server_distributor_test.go b/pkg/server/search_server_distributor_test.go index 31bf3b10c0a..18b5431ed89 100644 --- a/pkg/server/search_server_distributor_test.go +++ b/pkg/server/search_server_distributor_test.go @@ -13,6 +13,15 @@ import ( "time" claims "github.com/grafana/authlib/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/metadata" + "k8s.io/component-base/metrics/legacyregistry" + "github.com/grafana/grafana/pkg/api" "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/infra/tracing" @@ -25,14 +34,6 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/search" "github.com/grafana/grafana/pkg/storage/unified/sql" "github.com/grafana/grafana/pkg/util/testutil" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/trace/noop" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/metadata" - "k8s.io/component-base/metrics/legacyregistry" ) var ( @@ -354,7 +355,7 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces require.NoError(t, err) tracer := noop.NewTracerProvider().Tracer("test-tracer") require.NoError(t, err) - searchOpts, err := search.NewSearchOptions(features, cfg, tracer, docBuilders, nil) + searchOpts, err := search.NewSearchOptions(features, cfg, tracer, docBuilders, nil, nil) require.NoError(t, err) server, err := sql.NewResourceServer(sql.ServerOptions{ DB: nil, diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index 60a96e77855..9d59f880aef 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -154,7 +154,7 @@ func newClient(opts options.StorageOptions, return client, nil default: - searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics) + searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics, nil) if err != nil { return nil, err } diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go index 88444b02cee..fbf70eebe4f 100644 --- a/pkg/storage/unified/resource/search.go +++ b/pkg/storage/unified/resource/search.go @@ -4,7 +4,6 @@ import ( "cmp" "context" "fmt" - "hash/fnv" "log/slog" "slices" "strings" @@ -20,7 +19,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "github.com/grafana/authlib/types" - "github.com/grafana/dskit/ring" dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1" folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1" @@ -130,8 +128,7 @@ type searchSupport struct { initWorkers int initMinSize int - ring *ring.Ring - ringLifecycler *ring.BasicLifecycler + ownsIndexFn func(key NamespacedResource) (bool, error) buildIndex singleflight.Group @@ -144,7 +141,7 @@ var ( _ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil) ) -func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, tracer trace.Tracer, indexMetrics *BleveIndexMetrics, ring *ring.Ring, ringLifecycler *ring.BasicLifecycler) (support *searchSupport, err error) { +func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, tracer trace.Tracer, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) { // No backend search support if opts.Backend == nil { return nil, nil @@ -157,6 +154,12 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A opts.WorkerThreads = 1 } + if ownsIndexFn == nil { + ownsIndexFn = func(key NamespacedResource) (bool, error) { + return true, nil + } + } + support = &searchSupport{ access: access, tracer: tracer, @@ -167,8 +170,7 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A initMinSize: opts.InitMinCount, indexMetrics: indexMetrics, rebuildInterval: opts.RebuildInterval, - ring: ring, - ringLifecycler: ringLifecycler, + ownsIndexFn: ownsIndexFn, } info, err := opts.Resources.GetDocumentBuilders() @@ -399,33 +401,6 @@ func (s *searchSupport) GetStats(ctx context.Context, req *resourcepb.ResourceSt return rsp, nil } -func (s *searchSupport) shouldBuildIndex(info ResourceStats) bool { - if s.ring == nil { - s.log.Debug("ring is not setup. Will proceed to build index") - return true - } - - if s.ringLifecycler == nil { - s.log.Error("missing ring lifecycler") - return true - } - - ringHasher := fnv.New32a() - _, err := ringHasher.Write([]byte(info.Namespace)) - if err != nil { - s.log.Error("error hashing namespace", "namespace", info.Namespace, "err", err) - return true - } - - rs, err := s.ring.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.ring.ReplicationFactor())) - if err != nil { - s.log.Error("error getting replicaset from ring", "namespace", info.Namespace, "err", err) - return true - } - - return rs.Includes(s.ringLifecycler.GetInstanceAddr()) -} - func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, error) { totalBatchesIndexed := 0 group := errgroup.Group{} @@ -442,7 +417,10 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er continue } - if !s.shouldBuildIndex(info) { + own, err := s.ownsIndexFn(info.NamespacedResource) + if err != nil { + s.log.Warn("failed to check index ownership, building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource, "error", err) + } else if !own { s.log.Debug("skip building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource) continue } diff --git a/pkg/storage/unified/resource/search_server_distributor.go b/pkg/storage/unified/resource/search_server_distributor.go index 988153e0c12..1363c2452f3 100644 --- a/pkg/storage/unified/resource/search_server_distributor.go +++ b/pkg/storage/unified/resource/search_server_distributor.go @@ -90,8 +90,6 @@ var ( searchRingRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool { return s != ring.ACTIVE }) - // operation used by the search-servers to check if they own the namespace - searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) ) func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) { diff --git a/pkg/storage/unified/resource/search_test.go b/pkg/storage/unified/resource/search_test.go index b4e7108383c..be34e3a5724 100644 --- a/pkg/storage/unified/resource/search_test.go +++ b/pkg/storage/unified/resource/search_test.go @@ -213,7 +213,7 @@ func TestSearchGetOrCreateIndex(t *testing.T) { InitMinCount: 1, // set min count to default for this test } - support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil) + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) require.NoError(t, err) require.NotNil(t, support) @@ -274,7 +274,7 @@ func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) { } // Enable searchAfterWrite - support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil) + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) require.NoError(t, err) require.NotNil(t, support) @@ -325,7 +325,7 @@ func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) { InitMinCount: 1, // set min count to default for this test } - support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil) + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) require.NoError(t, err) require.NotNil(t, support) diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 35987a418b8..fdcdc9d9301 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -235,8 +235,7 @@ type ResourceServerOptions struct { QOSQueue QOSEnqueuer QOSConfig QueueConfig - Ring *ring.Ring - RingLifecycler *ring.BasicLifecycler + OwnsIndexFn func(key NamespacedResource) (bool, error) } func NewResourceServer(opts ResourceServerOptions) (*server, error) { @@ -334,7 +333,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) { if opts.Search.Resources != nil { var err error - s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics, opts.Ring, opts.RingLifecycler) + s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics, opts.OwnsIndexFn) if err != nil { return nil, err } diff --git a/pkg/storage/unified/search/bleve.go b/pkg/storage/unified/search/bleve.go index 74ffa5ec74f..ef06f585363 100644 --- a/pkg/storage/unified/search/bleve.go +++ b/pkg/storage/unified/search/bleve.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/atomic" "k8s.io/apimachinery/pkg/selection" "github.com/grafana/grafana/pkg/services/dashboards/dashboardaccess" @@ -69,6 +70,7 @@ type BleveOptions struct { BatchSize int // Index cache TTL for bleve indices. 0 disables expiration for in-memory indexes. + // Also used for file-based indexes, if they are not owned by this instance, and they are not fetched from the cache recently. IndexCacheTTL time.Duration BuildVersion string @@ -79,6 +81,11 @@ type BleveOptions struct { Logger *slog.Logger UseFullNgram bool + + // This function is called to check whether the index is owned by the current instance. + // Indexes that are not owned by current instance are eligible for cleanup. + // If nil, all indexes are owned by the current instance. + OwnsIndex func(key resource.NamespacedResource) (bool, error) } type bleveBackend struct { @@ -86,6 +93,9 @@ type bleveBackend struct { log *slog.Logger opts BleveOptions + // set from opts.OwnsIndex, always non-nil + ownsIndexFn func(key resource.NamespacedResource) (bool, error) + cacheMx sync.RWMutex cache map[resource.NamespacedResource]*bleveIndex @@ -94,8 +104,8 @@ type bleveBackend struct { // if true will use ngram instead of edge_ngram for title indexes. See custom_analyzers.go useFullNgram bool - metricsUpdaterCancel func() - metricsUpdaterWg sync.WaitGroup + bgTasksCancel func() + bgTasksWg sync.WaitGroup } func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, indexMetrics *resource.BleveIndexMetrics) (*bleveBackend, error) { @@ -129,22 +139,31 @@ func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, indexMetrics *resou log = slog.Default().With("logger", "bleve-backend") } + ownFn := opts.OwnsIndex + if ownFn == nil { + // By default all indexes are owned by this instance. + ownFn = func(key resource.NamespacedResource) (bool, error) { return true, nil } + } + be := &bleveBackend{ log: log, tracer: tracer, cache: map[resource.NamespacedResource]*bleveIndex{}, opts: opts, + ownsIndexFn: ownFn, indexMetrics: indexMetrics, useFullNgram: opts.UseFullNgram, } + ctx, cancel := context.WithCancel(context.Background()) + be.bgTasksCancel = cancel + + be.bgTasksWg.Add(1) + go be.evictExpiredOrUnownedIndexesPeriodically(ctx) + if be.indexMetrics != nil { - ctx, cancel := context.WithCancel(context.Background()) - be.metricsUpdaterCancel = cancel - be.metricsUpdaterWg.Add(1) + be.bgTasksWg.Add(1) go be.updateIndexSizeMetric(ctx, opts.Root) - } else { - be.metricsUpdaterCancel = func() { /* empty */ } } return be, nil @@ -152,7 +171,7 @@ func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, indexMetrics *resou // GetIndex will return nil if the key does not exist func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResource) (resource.ResourceIndex, error) { - idx := b.getCachedIndex(key) + idx := b.getCachedIndex(key, time.Now()) // Avoid returning typed nils. if idx == nil { return nil, nil @@ -160,48 +179,95 @@ func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResour return idx, nil } -func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource) *bleveIndex { +func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource, now time.Time) *bleveIndex { // Check index with read-lock first. b.cacheMx.RLock() - val := b.cache[key] + idx := b.cache[key] b.cacheMx.RUnlock() - if val == nil { + if idx == nil { return nil } - if val.expiration.IsZero() || val.expiration.After(time.Now()) { - // Not expired yet. - return val - } + idx.lastFetchedFromCache.Store(now.UnixMilli()) + return idx +} - // We're dealing with expired index. We need to remove it from the cache and close it. - b.cacheMx.Lock() - val = b.cache[key] - delete(b.cache, key) - b.cacheMx.Unlock() - - if val == nil { - return nil - } - - // Index is no longer in the cache, but we need to close it. - err := val.stopUpdaterAndCloseIndex() +func (b *bleveBackend) closeIndex(idx *bleveIndex, key resource.NamespacedResource) { + err := idx.stopUpdaterAndCloseIndex() if err != nil { b.log.Error("failed to close index", "key", key, "err", err) } - b.log.Info("index evicted from cache", "key", key) if b.indexMetrics != nil { - b.indexMetrics.OpenIndexes.WithLabelValues(val.indexStorage).Dec() + b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec() + } +} + +// This function will periodically evict expired or un-owned indexes from the cache. +func (b *bleveBackend) evictExpiredOrUnownedIndexesPeriodically(ctx context.Context) { + defer b.bgTasksWg.Done() + + t := time.NewTicker(2 * time.Minute) + + for ctx.Err() == nil { + select { + case <-ctx.Done(): + return + case <-t.C: + b.runEvictExpiredOrUnownedIndexes(time.Now()) + } + } +} + +func (b *bleveBackend) runEvictExpiredOrUnownedIndexes(now time.Time) { + cacheTTLMillis := b.opts.IndexCacheTTL.Milliseconds() + + // Collect all expired or unowned into this map, and perform the actual closing without holding the lock. + expired := map[resource.NamespacedResource]*bleveIndex{} + unowned := map[resource.NamespacedResource]*bleveIndex{} + ownCheckErrors := map[resource.NamespacedResource]error{} + + b.cacheMx.Lock() + for key, idx := range b.cache { + // Check if index has expired. + if !idx.expiration.IsZero() && now.After(idx.expiration) { + delete(b.cache, key) + expired[key] = idx + continue + } + + // Check if index is owned by this instance. + if cacheTTLMillis > 0 { + owned, err := b.ownsIndexFn(key) + if err != nil { + ownCheckErrors[key] = err + } else if !owned && now.UnixMilli()-idx.lastFetchedFromCache.Load() > cacheTTLMillis { + delete(b.cache, key) + unowned[key] = idx + } + } + } + b.cacheMx.Unlock() + + for key, err := range ownCheckErrors { + b.log.Warn("failed to check if index belongs to this instance", "key", key, "err", err) } - return nil + for key, idx := range unowned { + b.log.Info("index evicted from cache", "reason", "unowned", "key", key, "storage", idx.indexStorage) + b.closeIndex(idx, key) + } + + for key, idx := range expired { + b.log.Info("index evicted from cache", "reason", "expired", "key", key, "storage", idx.indexStorage) + b.closeIndex(idx, key) + } } // updateIndexSizeMetric sets the total size of all file-based indices metric. func (b *bleveBackend) updateIndexSizeMetric(ctx context.Context, indexPath string) { - defer b.metricsUpdaterWg.Done() + defer b.bgTasksWg.Done() for ctx.Err() == nil { var totalSize int64 @@ -357,7 +423,7 @@ func (b *bleveBackend) BuildIndex( var index bleve.Index var indexRV int64 - cachedIndex := b.getCachedIndex(key) + cachedIndex := b.getCachedIndex(key, time.Now()) fileIndexName := "" // Name of the file-based index, or empty for in-memory indexes. newIndexType := indexStorageMemory build := true @@ -572,9 +638,10 @@ func (b *bleveBackend) TotalDocs() int64 { var totalDocs int64 // We iterate over keys and call getCachedIndex for each index individually. // We do this to avoid keeping a lock for the entire TotalDocs function, since DocCount may be slow (due to disk access). - // Calling getCachedIndex also handles index expiration. + + now := time.Now() for _, key := range b.cacheKeys() { - idx := b.getCachedIndex(key) + idx := b.getCachedIndex(key, now) if idx == nil { continue } @@ -652,8 +719,8 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, minBuildTi func (b *bleveBackend) Stop() { b.closeAllIndexes() - b.metricsUpdaterCancel() - b.metricsUpdaterWg.Wait() + b.bgTasksCancel() + b.bgTasksWg.Wait() } func (b *bleveBackend) closeAllIndexes() { @@ -714,6 +781,9 @@ type bleveIndex struct { updateLatency prometheus.Histogram updatedDocuments prometheus.Summary + + // Used to detect if the index can be safely closed, if it no longer belongs to this instance. UnixMilli. + lastFetchedFromCache atomic.Int64 } func (b *bleveBackend) newBleveIndex( diff --git a/pkg/storage/unified/search/bleve_integration_test.go b/pkg/storage/unified/search/bleve_integration_test.go index 150fdd1468b..b6d31526385 100644 --- a/pkg/storage/unified/search/bleve_integration_test.go +++ b/pkg/storage/unified/search/bleve_integration_test.go @@ -25,7 +25,7 @@ func TestBleveSearchBackend(t *testing.T) { require.NoError(t, err) require.NotNil(t, backend) - t.Cleanup(backend.closeAllIndexes) + t.Cleanup(backend.Stop) return backend }, &unitest.TestOptions{ @@ -73,7 +73,7 @@ func TestSearchBackendBenchmark(t *testing.T) { require.NoError(t, err) require.NotNil(t, backend) - t.Cleanup(backend.closeAllIndexes) + t.Cleanup(backend.Stop) unitest.BenchmarkSearchBackend(t, backend, opts) } diff --git a/pkg/storage/unified/search/bleve_test.go b/pkg/storage/unified/search/bleve_test.go index 441a6a81d72..347195786a5 100644 --- a/pkg/storage/unified/search/bleve_test.go +++ b/pkg/storage/unified/search/bleve_test.go @@ -55,6 +55,7 @@ func TestBleveBackend(t *testing.T) { UseFullNgram: false, }, tracing.NewNoopTracerService(), nil) require.NoError(t, err) + t.Cleanup(backend.Stop) testBleveBackend(t, backend) } @@ -69,6 +70,7 @@ func TestBleveBackendFullNgramEnabled(t *testing.T) { UseFullNgram: true, }, tracing.NewNoopTracerService(), nil) require.NoError(t, err) + t.Cleanup(backend.Stop) testBleveBackend(t, backend) } @@ -85,8 +87,6 @@ func testBleveBackend(t *testing.T, backend *bleveBackend) { Resource: "folders", } - t.Cleanup(backend.Stop) - rv := int64(10) ctx := identity.WithRequester(context.Background(), &user.SignedInUser{Namespace: "ns"}) var dashboardsIndex resource.ResourceIndex @@ -845,6 +845,12 @@ func withMaxFileIndexAge(maxAge time.Duration) setupOption { } } +func withOwnsIndexFn(fn func(key resource.NamespacedResource) (bool, error)) setupOption { + return func(options *BleveOptions) { + options.OwnsIndex = fn + } +} + func TestBuildIndexExpiration(t *testing.T) { ns := resource.NamespacedResource{ Namespace: "test", @@ -852,46 +858,90 @@ func TestBuildIndexExpiration(t *testing.T) { Resource: "resource", } - t.Run("memory based indexes should expire", func(t *testing.T) { - backend, reg := setupBleveBackend(t, withIndexCacheTTL(time.Nanosecond)) + type testCase struct { + inMemory bool + owned bool + ownedCheckError error + expectedEviction bool + } - builtIndex, err := backend.BuildIndex(context.Background(), ns, 1 /* below FileThreshold */, nil, "test", indexTestDocs(ns, 1, 100), nil, false) - require.NoError(t, err) + cacheTTL := time.Millisecond - // Wait for index expiration, which is 1ns - time.Sleep(10 * time.Millisecond) - idx, err := backend.GetIndex(context.Background(), ns) - require.NoError(t, err) - require.Nil(t, idx) + for name, tc := range map[string]testCase{ + "memory index should expire, if owned": { + inMemory: true, + owned: true, + expectedEviction: true, + }, + "memory index should expire, if not owned": { + inMemory: true, + owned: false, + expectedEviction: true, + }, + "memory index should expire, if ownership check fails": { + inMemory: true, + ownedCheckError: errors.New("error"), + expectedEviction: true, + }, + "file index should NOT expire, if owned": { + inMemory: false, + owned: true, + expectedEviction: false, + }, + "file index should expire, if not owned": { + inMemory: false, + owned: false, + expectedEviction: true, + }, + "file index should NOT expire, if ownership check fails": { + inMemory: false, + ownedCheckError: errors.New("error"), + expectedEviction: false, + }, + } { + t.Run(name, func(t *testing.T) { + backend, reg := setupBleveBackend(t, withIndexCacheTTL(cacheTTL), withOwnsIndexFn(func(key resource.NamespacedResource) (bool, error) { + return tc.owned, tc.ownedCheckError + })) - // Verify that builtIndex is now closed. - _, err = builtIndex.DocCount(context.Background(), "") - require.ErrorIs(t, err, bleve.ErrorIndexClosed) + size := int64(1) + if !tc.inMemory { + size = 100 // above defaultFileTreshold + } + builtIndex, err := backend.BuildIndex(context.Background(), ns, size, nil, "test", indexTestDocs(ns, 1, 100), nil, false) + require.NoError(t, err) - // Verify that there are no open indexes. - checkOpenIndexes(t, reg, 0, 0) - }) + // Evict indexes. + backend.runEvictExpiredOrUnownedIndexes(time.Now().Add(5 * time.Minute)) - t.Run("file based indexes should NOT expire", func(t *testing.T) { - backend, reg := setupBleveBackend(t, withIndexCacheTTL(time.Nanosecond)) + if tc.expectedEviction { + idx, err := backend.GetIndex(context.Background(), ns) + require.NoError(t, err) + require.Nil(t, idx) - // size=100 is above FileThreshold, this will be file-based index - builtIndex, err := backend.BuildIndex(context.Background(), ns, 100, nil, "test", indexTestDocs(ns, 1, 100), nil, false) - require.NoError(t, err) + _, err = builtIndex.DocCount(context.Background(), "") + require.ErrorIs(t, err, bleve.ErrorIndexClosed) - // Wait for index expiration, which is 1ns - time.Sleep(10 * time.Millisecond) - idx, err := backend.GetIndex(context.Background(), ns) - require.NoError(t, err) - require.NotNil(t, idx) + // Verify that there are no open indexes. + checkOpenIndexes(t, reg, 0, 0) + } else { + idx, err := backend.GetIndex(context.Background(), ns) + require.NoError(t, err) + require.NotNil(t, idx) - // Verify that builtIndex is still open. - cnt, err := builtIndex.DocCount(context.Background(), "") - require.NoError(t, err) - require.Equal(t, int64(1), cnt) + cnt, err := builtIndex.DocCount(context.Background(), "") + require.NoError(t, err) + require.Equal(t, int64(1), cnt) - checkOpenIndexes(t, reg, 0, 1) - }) + // Verify that index is still open + if tc.inMemory { + checkOpenIndexes(t, reg, 1, 0) + } else { + checkOpenIndexes(t, reg, 0, 1) + } + } + }) + } } func TestCloseAllIndexes(t *testing.T) { diff --git a/pkg/storage/unified/search/options.go b/pkg/storage/unified/search/options.go index 7c367b3b421..a5bb1d52a8c 100644 --- a/pkg/storage/unified/search/options.go +++ b/pkg/storage/unified/search/options.go @@ -12,10 +12,15 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/resource" ) -func NewSearchOptions(features featuremgmt.FeatureToggles, cfg *setting.Cfg, tracer trace.Tracer, docs resource.DocumentBuilderSupplier, indexMetrics *resource.BleveIndexMetrics) (resource.SearchOptions, error) { - // Setup the search server - if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) || - features.IsEnabledGlobally(featuremgmt.FlagProvisioning) { +func NewSearchOptions( + features featuremgmt.FeatureToggles, + cfg *setting.Cfg, + tracer trace.Tracer, + docs resource.DocumentBuilderSupplier, + indexMetrics *resource.BleveIndexMetrics, + ownsIndexFn func(key resource.NamespacedResource) (bool, error), +) (resource.SearchOptions, error) { + if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) || features.IsEnabledGlobally(featuremgmt.FlagProvisioning) { root := cfg.IndexPath if root == "" { root = filepath.Join(cfg.DataPath, "unified-search", "bleve") @@ -44,6 +49,7 @@ func NewSearchOptions(features featuremgmt.FeatureToggles, cfg *setting.Cfg, tra MaxFileIndexAge: cfg.MaxFileIndexAge, MinBuildVersion: minVersion, UseFullNgram: features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageUseFullNgram), + OwnsIndex: ownsIndexFn, }, tracer, indexMetrics) if err != nil { diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index b2debf0acdc..83424b0edea 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -10,7 +10,6 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/grafana/authlib/types" - "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" infraDB "github.com/grafana/grafana/pkg/infra/db" @@ -42,14 +41,10 @@ type ServerOptions struct { Features featuremgmt.FeatureToggles QOSQueue QOSEnqueueDequeuer SecureValues secrets.InlineSecureValueSupport - Ring *ring.Ring - RingLifecycler *ring.BasicLifecycler + OwnsIndexFn func(key resource.NamespacedResource) (bool, error) } -// Creates a new ResourceServer -func NewResourceServer( - opts ServerOptions, -) (resource.ResourceServer, error) { +func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver") if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable { @@ -118,8 +113,7 @@ func NewResourceServer( serverOptions.Search = opts.SearchOptions serverOptions.IndexMetrics = opts.IndexMetrics serverOptions.QOSQueue = opts.QOSQueue - serverOptions.Ring = opts.Ring - serverOptions.RingLifecycler = opts.RingLifecycler + serverOptions.OwnsIndexFn = opts.OwnsIndexFn return resource.NewResourceServer(serverOptions) } diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 50cc24c9a69..6dd851ad74f 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "hash/fnv" "net" "os" "strconv" @@ -193,6 +194,34 @@ func ProvideUnifiedStorageGrpcService( return s, nil } +var ( + // operation used by the search-servers to check if they own the namespace + searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) +) + +func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) { + if s.searchRing == nil { + return true, nil + } + + if st := s.searchRing.State(); st != services.Running { + return false, fmt.Errorf("ring is not Running: %s", st) + } + + ringHasher := fnv.New32a() + _, err := ringHasher.Write([]byte(key.Namespace)) + if err != nil { + return false, fmt.Errorf("error hashing namespace: %w", err) + } + + rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor())) + if err != nil { + return false, fmt.Errorf("error getting replicaset from ring: %w", err) + } + + return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil +} + func (s *service) starting(ctx context.Context) error { if s.hasSubservices { s.subservicesWatcher.WatchManager(s.subservices) @@ -206,7 +235,7 @@ func (s *service) starting(ctx context.Context) error { return err } - searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.tracing, s.docBuilders, s.indexMetrics) + searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.tracing, s.docBuilders, s.indexMetrics, s.OwnsIndex) if err != nil { return err } @@ -222,8 +251,7 @@ func (s *service) starting(ctx context.Context) error { IndexMetrics: s.indexMetrics, Features: s.features, QOSQueue: s.queue, - Ring: s.searchRing, - RingLifecycler: s.ringLifecycler, + OwnsIndexFn: s.OwnsIndex, } server, err := NewResourceServer(serverOptions) if err != nil { diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index dc2cbfa9ec7..c5420b71280 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -2,7 +2,6 @@ package test import ( "context" - "os" "sync" "testing" "time" @@ -98,17 +97,14 @@ func TestIntegrationSearchAndStorage(t *testing.T) { ctx := context.Background() - tempDir := t.TempDir() - t.Cleanup(func() { - _ = os.RemoveAll(tempDir) - }) // Create a new bleve backend search, err := search.NewBleveBackend(search.BleveOptions{ FileThreshold: 0, - Root: tempDir, + Root: t.TempDir(), }, tracing.NewNoopTracerService(), nil) require.NoError(t, err) require.NotNil(t, search) + t.Cleanup(search.Stop) // Create a new resource backend storage := newTestBackend(t, false, 0)