diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go index 0a88d9a035a..96a44f6d3c3 100644 --- a/pkg/storage/unified/resource/search.go +++ b/pkg/storage/unified/resource/search.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "fmt" + "hash/fnv" "log/slog" "slices" "strings" @@ -19,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "github.com/grafana/authlib/types" + "github.com/grafana/dskit/ring" dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1" folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1" @@ -109,6 +111,9 @@ type searchSupport struct { initMinSize int initMaxSize int + ring *ring.Ring + ringLifecycler *ring.BasicLifecycler + buildIndex singleflight.Group // Index queue processors @@ -128,7 +133,7 @@ var ( _ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil) ) -func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, tracer trace.Tracer, indexMetrics *BleveIndexMetrics) (support *searchSupport, err error) { +func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, tracer trace.Tracer, indexMetrics *BleveIndexMetrics, ring *ring.Ring, ringLifecycler *ring.BasicLifecycler) (support *searchSupport, err error) { // No backend search support if opts.Backend == nil { return nil, nil @@ -155,6 +160,8 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A indexEventsChan: make(chan *IndexEvent), indexQueueProcessors: make(map[string]*indexQueueProcessor), rebuildInterval: opts.RebuildInterval, + ring: ring, + ringLifecycler: ringLifecycler, } info, err := opts.Resources.GetDocumentBuilders() @@ -379,6 +386,33 @@ func (s *searchSupport) GetStats(ctx context.Context, req *resourcepb.ResourceSt return rsp, nil } +func (s *searchSupport) shouldBuildIndex(info ResourceStats) bool { + if s.ring == nil { + s.log.Debug("ring is not setup. Will proceed to build index") + return true + } + + if s.ringLifecycler == nil { + s.log.Error("missing ring lifecycler") + return true + } + + ringHasher := fnv.New32a() + _, err := ringHasher.Write([]byte(info.Namespace)) + if err != nil { + s.log.Error("error hashing namespace", "namespace", info.Namespace, "err", err) + return true + } + + rs, err := s.ring.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.ring.ReplicationFactor())) + if err != nil { + s.log.Error("error getting replicaset from ring", "namespace", info.Namespace, "err", err) + return true + } + + return rs.Includes(s.ringLifecycler.GetInstanceAddr()) +} + func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, error) { totalBatchesIndexed := 0 group := errgroup.Group{} @@ -395,6 +429,11 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er continue } + if !s.shouldBuildIndex(info) { + s.log.Debug("skip building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource) + continue + } + group.Go(func() error { if rebuild { // we need to clear the cache to make sure we get the latest usage insights data diff --git a/pkg/storage/unified/resource/search_server_distributor.go b/pkg/storage/unified/resource/search_server_distributor.go index c148448ca7b..f410207b59e 100644 --- a/pkg/storage/unified/resource/search_server_distributor.go +++ b/pkg/storage/unified/resource/search_server_distributor.go @@ -81,9 +81,14 @@ type distributorServer struct { log log.Logger } -var activeRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool { - return s != ring.ACTIVE -}) +var ( + // operation used by the distributor to select only ACTIVE instances to handle search-related requests + searchRingRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool { + return s != ring.ACTIVE + }) + // operation used by the search-servers to check if they own the namespace + searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) +) func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) { ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search") @@ -128,7 +133,7 @@ func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, n return ctx, nil, err } - rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), activeRingOp, ring.WithReplicationFactor(ds.ring.ReplicationFactor())) + rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead, ring.WithReplicationFactor(ds.ring.ReplicationFactor())) if err != nil { return ctx, nil, err } diff --git a/pkg/storage/unified/resource/search_test.go b/pkg/storage/unified/resource/search_test.go index 25f7f3280c0..ab8cee98432 100644 --- a/pkg/storage/unified/resource/search_test.go +++ b/pkg/storage/unified/resource/search_test.go @@ -246,7 +246,7 @@ func TestBuildIndexes_MaxCountThreshold(t *testing.T) { InitMaxCount: tt.initMaxSize, } - support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil) + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil) require.NoError(t, err) require.NotNil(t, support) @@ -304,7 +304,7 @@ func TestSearchGetOrCreateIndex(t *testing.T) { InitMaxCount: 0, } - support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil) + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil) require.NoError(t, err) require.NotNil(t, support) diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index b06ee3e7240..78bf79627da 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -20,6 +20,7 @@ import ( claims "github.com/grafana/authlib/types" "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/ring" "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" @@ -180,6 +181,8 @@ type SearchOptions struct { // Interval for periodic index rebuilds (0 disables periodic rebuilds) RebuildInterval time.Duration + + Ring *ring.Ring } type ResourceServerOptions struct { @@ -223,6 +226,9 @@ type ResourceServerOptions struct { // QOSQueue is the quality of service queue used to enqueue QOSQueue QOSEnqueuer + + Ring *ring.Ring + RingLifecycler *ring.BasicLifecycler } func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { @@ -305,7 +311,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { if opts.Search.Resources != nil { var err error - s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics) + s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics, opts.Ring, opts.RingLifecycler) if err != nil { return nil, err } diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 97163958972..e89b20a0780 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/grafana/authlib/types" + "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" infraDB "github.com/grafana/grafana/pkg/infra/db" @@ -37,6 +38,8 @@ type ServerOptions struct { IndexMetrics *resource.BleveIndexMetrics Features featuremgmt.FeatureToggles QOSQueue QOSEnqueueDequeuer + Ring *ring.Ring + RingLifecycler *ring.BasicLifecycler } // Creates a new ResourceServer @@ -96,6 +99,8 @@ func NewResourceServer( serverOptions.Search = opts.SearchOptions serverOptions.IndexMetrics = opts.IndexMetrics serverOptions.QOSQueue = opts.QOSQueue + serverOptions.Ring = opts.Ring + serverOptions.RingLifecycler = opts.RingLifecycler return resource.NewResourceServer(serverOptions) } diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index c8a2ef8260d..171895cb67b 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -75,8 +75,8 @@ type service struct { docBuilders resource.DocumentBuilderSupplier - storageRing *ring.Ring - lifecycler *ring.BasicLifecycler + searchRing *ring.Ring + ringLifecycler *ring.BasicLifecycler queue QOSEnqueueDequeuer scheduler *scheduler.Scheduler @@ -91,7 +91,7 @@ func ProvideUnifiedStorageGrpcService( docBuilders resource.DocumentBuilderSupplier, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics, - storageRing *ring.Ring, + searchRing *ring.Ring, memberlistKVConfig kv.Config, ) (UnifiedStorageGrpcService, error) { var err error @@ -116,7 +116,7 @@ func ProvideUnifiedStorageGrpcService( docBuilders: docBuilders, storageMetrics: storageMetrics, indexMetrics: indexMetrics, - storageRing: storageRing, + searchRing: searchRing, subservicesWatcher: services.NewFailureWatcher(), } @@ -143,7 +143,7 @@ func ProvideUnifiedStorageGrpcService( delegate = ring.NewLeaveOnStoppingDelegate(delegate, log) delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log) - s.lifecycler, err = ring.NewBasicLifecycler( + s.ringLifecycler, err = ring.NewBasicLifecycler( lifecyclerCfg, resource.RingName, resource.RingKey, @@ -155,7 +155,7 @@ func ProvideUnifiedStorageGrpcService( if err != nil { return nil, fmt.Errorf("failed to initialize storage-ring lifecycler: %s", err) } - subservices = append(subservices, s.lifecycler) + subservices = append(subservices, s.ringLifecycler) } if cfg.QOSEnabled { @@ -220,6 +220,8 @@ func (s *service) starting(ctx context.Context) error { IndexMetrics: s.indexMetrics, Features: s.features, QOSQueue: s.queue, + Ring: s.searchRing, + RingLifecycler: s.ringLifecycler, } server, err := NewResourceServer(serverOptions) if err != nil { @@ -254,12 +256,12 @@ func (s *service) starting(ctx context.Context) error { s.log.Info("waiting until resource server is JOINING in the ring") lfcCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - if err := ring.WaitInstanceState(lfcCtx, s.storageRing, s.lifecycler.GetInstanceID(), ring.JOINING); err != nil { + if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { return fmt.Errorf("error switching to JOINING in the ring: %s", err) } s.log.Info("resource server is JOINING in the ring") - if err := s.lifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { return fmt.Errorf("error switching to ACTIVE in the ring: %s", err) } s.log.Info("resource server is ACTIVE in the ring")