Improve search index eviction (#111542)
* Modify index eviction mechanism such that unowned indexes are also evicted. * Propagate OwnsIndex function to bleve backend Fix tests. Stop eviction goroutine when stopping backend. Make linter happy. Make sure we stop backend created by tests. Review suggestion. Removed newline.
This commit is contained in:
@@ -10,7 +10,6 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
"github.com/grafana/dskit/ring"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
@@ -42,14 +41,10 @@ type ServerOptions struct {
|
||||
Features featuremgmt.FeatureToggles
|
||||
QOSQueue QOSEnqueueDequeuer
|
||||
SecureValues secrets.InlineSecureValueSupport
|
||||
Ring *ring.Ring
|
||||
RingLifecycler *ring.BasicLifecycler
|
||||
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
// Creates a new ResourceServer
|
||||
func NewResourceServer(
|
||||
opts ServerOptions,
|
||||
) (resource.ResourceServer, error) {
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
|
||||
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
||||
@@ -118,8 +113,7 @@ func NewResourceServer(
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
serverOptions.QOSQueue = opts.QOSQueue
|
||||
serverOptions.Ring = opts.Ring
|
||||
serverOptions.RingLifecycler = opts.RingLifecycler
|
||||
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
|
||||
|
||||
return resource.NewResourceServer(serverOptions)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
@@ -193,6 +194,34 @@ func ProvideUnifiedStorageGrpcService(
|
||||
return s, nil
|
||||
}
|
||||
|
||||
var (
|
||||
// operation used by the search-servers to check if they own the namespace
|
||||
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
|
||||
)
|
||||
|
||||
func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
|
||||
if s.searchRing == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if st := s.searchRing.State(); st != services.Running {
|
||||
return false, fmt.Errorf("ring is not Running: %s", st)
|
||||
}
|
||||
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(key.Namespace))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error hashing namespace: %w", err)
|
||||
}
|
||||
|
||||
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
|
||||
}
|
||||
|
||||
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
|
||||
}
|
||||
|
||||
func (s *service) starting(ctx context.Context) error {
|
||||
if s.hasSubservices {
|
||||
s.subservicesWatcher.WatchManager(s.subservices)
|
||||
@@ -206,7 +235,7 @@ func (s *service) starting(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.tracing, s.docBuilders, s.indexMetrics)
|
||||
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.tracing, s.docBuilders, s.indexMetrics, s.OwnsIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -222,8 +251,7 @@ func (s *service) starting(ctx context.Context) error {
|
||||
IndexMetrics: s.indexMetrics,
|
||||
Features: s.features,
|
||||
QOSQueue: s.queue,
|
||||
Ring: s.searchRing,
|
||||
RingLifecycler: s.ringLifecycler,
|
||||
OwnsIndexFn: s.OwnsIndex,
|
||||
}
|
||||
server, err := NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
|
||||
@@ -2,7 +2,6 @@ package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -98,17 +97,14 @@ func TestIntegrationSearchAndStorage(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
t.Cleanup(func() {
|
||||
_ = os.RemoveAll(tempDir)
|
||||
})
|
||||
// Create a new bleve backend
|
||||
search, err := search.NewBleveBackend(search.BleveOptions{
|
||||
FileThreshold: 0,
|
||||
Root: tempDir,
|
||||
Root: t.TempDir(),
|
||||
}, tracing.NewNoopTracerService(), nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, search)
|
||||
t.Cleanup(search.Stop)
|
||||
|
||||
// Create a new resource backend
|
||||
storage := newTestBackend(t, false, 0)
|
||||
|
||||
Reference in New Issue
Block a user