update storage-api to only build index if it owns the namespace (#108418)

* update storage-api to only build index if it owns the namespace

---------

Co-authored-by: Mustafa Sencer Özcan <mustafasencer.ozcan@grafana.com>
This commit is contained in:
Will Assis
2025-07-23 15:59:24 -04:00
committed by GitHub
parent 4df4f9cc07
commit 981fdb29d4
6 changed files with 73 additions and 16 deletions
+5
View File
@@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel/trace"
"github.com/grafana/authlib/types"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
@@ -37,6 +38,8 @@ type ServerOptions struct {
IndexMetrics *resource.BleveIndexMetrics
Features featuremgmt.FeatureToggles
QOSQueue QOSEnqueueDequeuer
Ring *ring.Ring
RingLifecycler *ring.BasicLifecycler
}
// Creates a new ResourceServer
@@ -96,6 +99,8 @@ func NewResourceServer(
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
serverOptions.QOSQueue = opts.QOSQueue
serverOptions.Ring = opts.Ring
serverOptions.RingLifecycler = opts.RingLifecycler
return resource.NewResourceServer(serverOptions)
}
+10 -8
View File
@@ -75,8 +75,8 @@ type service struct {
docBuilders resource.DocumentBuilderSupplier
storageRing *ring.Ring
lifecycler *ring.BasicLifecycler
searchRing *ring.Ring
ringLifecycler *ring.BasicLifecycler
queue QOSEnqueueDequeuer
scheduler *scheduler.Scheduler
@@ -91,7 +91,7 @@ func ProvideUnifiedStorageGrpcService(
docBuilders resource.DocumentBuilderSupplier,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
storageRing *ring.Ring,
searchRing *ring.Ring,
memberlistKVConfig kv.Config,
) (UnifiedStorageGrpcService, error) {
var err error
@@ -116,7 +116,7 @@ func ProvideUnifiedStorageGrpcService(
docBuilders: docBuilders,
storageMetrics: storageMetrics,
indexMetrics: indexMetrics,
storageRing: storageRing,
searchRing: searchRing,
subservicesWatcher: services.NewFailureWatcher(),
}
@@ -143,7 +143,7 @@ func ProvideUnifiedStorageGrpcService(
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
s.lifecycler, err = ring.NewBasicLifecycler(
s.ringLifecycler, err = ring.NewBasicLifecycler(
lifecyclerCfg,
resource.RingName,
resource.RingKey,
@@ -155,7 +155,7 @@ func ProvideUnifiedStorageGrpcService(
if err != nil {
return nil, fmt.Errorf("failed to initialize storage-ring lifecycler: %s", err)
}
subservices = append(subservices, s.lifecycler)
subservices = append(subservices, s.ringLifecycler)
}
if cfg.QOSEnabled {
@@ -220,6 +220,8 @@ func (s *service) starting(ctx context.Context) error {
IndexMetrics: s.indexMetrics,
Features: s.features,
QOSQueue: s.queue,
Ring: s.searchRing,
RingLifecycler: s.ringLifecycler,
}
server, err := NewResourceServer(serverOptions)
if err != nil {
@@ -254,12 +256,12 @@ func (s *service) starting(ctx context.Context) error {
s.log.Info("waiting until resource server is JOINING in the ring")
lfcCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := ring.WaitInstanceState(lfcCtx, s.storageRing, s.lifecycler.GetInstanceID(), ring.JOINING); err != nil {
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
}
s.log.Info("resource server is JOINING in the ring")
if err := s.lifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
}
s.log.Info("resource server is ACTIVE in the ring")