Compare commits

...

7 Commits

Author SHA1 Message Date
Rafael Paulovic 314db3c24c chore(unified-storage): add comment on new configs and some cleanup 2026-01-12 13:57:11 +01:00
Rafael Paulovic e7ae54f1a6 fix(unified-storage): add parallelism for sub-index request 2026-01-12 13:57:11 +01:00
Rafael Paulovic fa4dbce7da fix(unified-storage): add per-replica timeout for SLO compliance
Addresses Phase 3 review findings:
- Add 200ms per-replica timeout in searchSubIndexWithFailover()
- Ensures failover happens quickly if primary replica is slow
- Maintains ≤500ms search-distributor latency SLO target
- With replication factor 2: worst case is 2×200ms + merge time
2026-01-12 13:57:11 +01:00
Rafael Paulovic 93264ac67d feat(unified-storage): implement parallel sub-index build coordination
Phase 5 of the sub-index sharding implementation:

- Add buildShardedIndex() that creates all sub-indexes in parallel
- Add buildSingleSubIndex() for building individual sub-indexes
- Add getSubIndexDir() for sub-index-specific directory paths
- Delegate to parallel builder when SubIndexCount > 0
- Maintain backward compatibility with single-index mode

Sub-indexes are built concurrently for faster startup at scale.
The compositeIndex wraps all sub-indexes for unified search operations.
2026-01-12 13:57:10 +01:00
Rafael Paulovic 0b9b47aaea feat(unified-storage): implement scatter-gather queries with replica failover
Phase 3 of the sub-index sharding implementation:

- Add parallelSearchWithFailover() that fans out queries to all sub-indexes
- Implement replica failover - tries each replica in order until success
- Add mergeResults() with deduplication, sorting, pagination
- Handle partial failures with HTTP 206 status when some shards fail
- Add comprehensive unit tests for merge, dedup, sort, and facet operations

The scatter-gather pattern enables distributed search across sub-indexes while
maintaining the ≤250ms SLO target through parallel execution and failover.
2026-01-12 13:57:10 +01:00
Rafael Paulovic 5e74848ee0 feat(unified-storage): add ring integration for sub-index sharding
Phase 2 of unified storage search sharding implementation:

- Add OwnsSubIndex() to service.go that includes subIndexID in ring hash
- Modify ring hash: fmt.Sprintf("%s/%d", namespace, subIndexID)
- Update OwnsIndex() to delegate to OwnsSubIndex(key, 0) for compatibility
- Add OwnsSubIndex callback to BleveOptions for ownership checks
- Update eviction logic to handle sub-indexes separately:
  - runEvictExpiredOrUnownedIndexes() now processes subIndexCache
  - Uses ownsSubIndexFn to check sub-index ownership via ring
- Add closeSubIndex() helper for proper sub-index cleanup
- Update closeAllIndexes() to close both main and sub-indexes
- Pass SubIndexCount and LargeFolderThreshold from config to BleveOptions

This enables sub-indexes to be distributed across ring nodes, with each
sub-index potentially owned by a different node based on the ring hash.
2026-01-12 13:57:09 +01:00
Rafael Paulovic aa90ac7ccc feat(unified-storage): add sub-index sharding infrastructure for 1M scale
Phase 1 of unified storage search sharding implementation:

- Add SubIndexKey type extending NamespacedResource with SubIndexID
- Add new settings: sub_indexes_per_namespace, large_folder_threshold
- Add subIndexCache to bleveBackend for sharded index storage
- Add GetSubIndexForDocument() using FNV32a hash for consistent distribution
- Add GetAllSubIndexKeys() for scatter-gather queries
- Implement compositeIndex wrapper that:
  - Routes BulkIndex to correct sub-index based on document key hash
  - Uses Bleve IndexAlias for unified search across sub-indexes
  - Aggregates DocCount across all sub-indexes
  - Delegates UpdateIndex to all sub-indexes

This enables horizontal scaling for namespaces with 1M+ resources by
distributing documents across multiple sub-indexes (recommended: 64).
2026-01-12 13:57:09 +01:00
8 changed files with 1510 additions and 23 deletions
+1
View File
@@ -603,6 +603,7 @@ type Cfg struct {
MaxFileIndexAge time.Duration // Max age of file-based indexes. Index older than this will be rebuilt asynchronously.
MinFileIndexBuildVersion string // Minimum version of Grafana that built the file-based index. If index was built with older Grafana, it will be rebuilt asynchronously.
EnableSharding bool
SubIndexesPerNamespace int // Number of sub-indexes per (namespace, group, resource) for sharding. 0 = disabled.
QOSEnabled bool
QOSNumberWorker int
QOSMaxSizePerTenant int
+14
View File
@@ -102,19 +102,33 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
}
cfg.EnableSearch = section.Key("enable_search").MustBool(false)
cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0)
// Index storage path. For Kubernetes Deployments without PVCs, use emptyDir:
// index_path = /var/lib/grafana/unified-search/bleve
// Indexes are derived data and will be rebuilt from SQL on pod restart.
cfg.IndexPath = section.Key("index_path").String()
cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
cfg.IndexRebuildWorkers = section.Key("index_rebuild_workers").MustInt(5)
// Sharding configuration for large-scale deployments (200k+ dashboards)
// When enable_sharding=true, indexes are distributed across pods using a ring.
// Each pod owns a subset of sub-indexes and rebuilds them from SQL on startup.
// This enables horizontal scaling without requiring PVCs (use emptyDir volumes).
cfg.EnableSharding = section.Key("enable_sharding").MustBool(false)
cfg.SubIndexesPerNamespace = section.Key("sub_indexes_per_namespace").MustInt(0) // 0 = disabled, recommended: 64 for 1M scale
cfg.QOSEnabled = section.Key("qos_enabled").MustBool(false)
cfg.QOSNumberWorker = section.Key("qos_num_worker").MustInt(16)
cfg.QOSMaxSizePerTenant = section.Key("qos_max_size_per_tenant").MustInt(1000)
// Memberlist ring configuration for distributed search
// For Kubernetes Deployments, use DNS-based discovery with headless services:
// memberlist_join_member = dnssrv+grafana-memberlist.namespace.svc:7946
// The dnssrv+ prefix triggers SRV record lookup for pod IPs.
cfg.MemberlistBindAddr = section.Key("memberlist_bind_addr").String()
cfg.MemberlistAdvertiseAddr = section.Key("memberlist_advertise_addr").String()
cfg.MemberlistAdvertisePort = section.Key("memberlist_advertise_port").MustInt(7946)
cfg.MemberlistJoinMember = section.Key("memberlist_join_member").String()
cfg.MemberlistClusterLabel = section.Key("memberlist_cluster_label").String()
cfg.MemberlistClusterLabelVerificationDisabled = section.Key("memberlist_cluster_label_verification_disabled").MustBool(false)
// SearchRingReplicationFactor configures replication factor of indexes across multiple instances.
// Recommended: 2 for production deployments using emptyDir volumes to provides availability during pod restarts/rebuilds.
cfg.SearchRingReplicationFactor = section.Key("search_ring_replication_factor").MustInt(1)
cfg.InstanceID = section.Key("instance_id").String()
cfg.IndexFileThreshold = section.Key("index_file_threshold").MustInt(10)
+17
View File
@@ -45,6 +45,23 @@ func (s *NamespacedResource) String() string {
return fmt.Sprintf("%s/%s/%s", s.Namespace, s.Group, s.Resource)
}
// SubIndexKey extends NamespacedResource with a sub-index identifier for sharding.
// When sub-index sharding is enabled, each (namespace, group, resource) is split
// into multiple sub-indexes identified by SubIndexID.
type SubIndexKey struct {
NamespacedResource
SubIndexID int
}
func (s *SubIndexKey) String() string {
return fmt.Sprintf("%s/%s/%s/shard-%d", s.Namespace, s.Group, s.Resource, s.SubIndexID)
}
// ToNamespacedResource returns the NamespacedResource without the sub-index ID.
func (s *SubIndexKey) ToNamespacedResource() NamespacedResource {
return s.NamespacedResource
}
type IndexAction int
const (
@@ -1,11 +1,14 @@
package resource
import (
"cmp"
"context"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"slices"
"sort"
"sync"
"sync/atomic"
"time"
@@ -15,6 +18,7 @@ import (
"github.com/grafana/dskit/services"
userutils "github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
@@ -35,10 +39,11 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
}
distributorServer := &distributorServer{
log: log.New("index-server-distributor"),
ring: ring,
clientPool: ringClientPool,
tracing: tracer,
log: log.New("index-server-distributor"),
ring: ring,
clientPool: ringClientPool,
tracing: tracer,
subIndexesPerNamespace: cfg.SubIndexesPerNamespace,
}
healthService, err := ProvideHealthService(distributorServer)
@@ -83,10 +88,11 @@ const RingHeartbeatTimeout = time.Minute
const RingNumTokens = 128
type distributorServer struct {
clientPool *ringclient.Pool
ring *ring.Ring
log log.Logger
tracing trace.Tracer
clientPool *ringclient.Pool
ring *ring.Ring
log log.Logger
tracing trace.Tracer
subIndexesPerNamespace int // Number of sub-indexes per namespace (0 = disabled)
}
var (
@@ -99,12 +105,33 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
if err != nil {
return nil, err
// If sub-index sharding is not enabled, use the existing single-node routing
if ds.subIndexesPerNamespace <= 0 {
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
if err != nil {
return nil, err
}
return client.Search(ctx, r)
}
return client.Search(ctx, r)
// Scatter-gather search across all sub-indexes
nsr := NamespacedResource{
Namespace: r.Options.Key.Namespace,
Group: r.Options.Key.Group,
Resource: r.Options.Key.Resource,
}
span.SetAttributes(
attribute.String("namespace", nsr.Namespace),
attribute.String("group", nsr.Group),
attribute.String("resource", nsr.Resource),
attribute.Int("sub_indexes", ds.subIndexesPerNamespace),
)
subIndexes := ds.getSubIndexesForNamespace(nsr)
results := ds.parallelSearchWithFailover(ctx, subIndexes, r)
return ds.mergeResults(ctx, results, r)
}
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
@@ -242,7 +269,7 @@ func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, n
return ctx, nil, err
}
rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead, ring.WithReplicationFactor(ds.ring.ReplicationFactor()))
rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead)
if err != nil {
ds.log.Debug("error getting replication set from ring", "err", err, "namespace", namespace)
return ctx, nil, err
@@ -276,3 +303,384 @@ func (ds *distributorServer) IsHealthy(ctx context.Context, r *resourcepb.Health
return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_NOT_SERVING}, nil
}
// --- Scatter-Gather Query Implementation with Replica Failover ---
// subIndexSearchResult holds the result from searching a single sub-index.
type subIndexSearchResult struct {
subIndexID int
response *resourcepb.ResourceSearchResponse
err error
partialFailure bool // true if all replicas for this sub-index failed
}
// getSubIndexesForNamespace returns all sub-index keys for a NamespacedResource.
// This is used for scatter-gather queries that need to query all sub-indexes.
func (ds *distributorServer) getSubIndexesForNamespace(nsr NamespacedResource) []SubIndexKey {
count := ds.subIndexesPerNamespace
if count <= 0 {
count = 1
}
keys := make([]SubIndexKey, count)
for i := 0; i < count; i++ {
keys[i] = SubIndexKey{
NamespacedResource: nsr,
SubIndexID: i,
}
}
return keys
}
// getReplicasForSubIndex returns the ordered list of replicas (instances) that own
// the given sub-index. Replicas are ordered by preference from the ring.
// Uses consistent hashing including the sub-index ID to determine ownership.
func (ds *distributorServer) getReplicasForSubIndex(subIndex SubIndexKey) ([]ring.InstanceDesc, error) {
ringHasher := fnv.New32a()
// Include sub-index ID in hash to distribute sub-indexes across nodes
_, err := ringHasher.Write([]byte(fmt.Sprintf("%s/%d", subIndex.Namespace, subIndex.SubIndexID)))
if err != nil {
return nil, fmt.Errorf("error hashing sub-index key: %w", err)
}
rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead)
if err != nil {
return nil, fmt.Errorf("error getting replication set from ring for sub-index %s: %w", subIndex.String(), err)
}
return rs.Instances, nil
}
// parallelSearchWithFailover executes search queries across all sub-indexes in parallel.
// For each sub-index, it tries replicas in order until one succeeds.
// Returns results from all sub-indexes (some may be marked as partial failures).
func (ds *distributorServer) parallelSearchWithFailover(ctx context.Context, subIndexes []SubIndexKey, r *resourcepb.ResourceSearchRequest) []*subIndexSearchResult {
ctx, span := ds.tracing.Start(ctx, "distributor.parallelSearchWithFailover")
defer span.End()
results := make([]*subIndexSearchResult, len(subIndexes))
var wg sync.WaitGroup
for i, subIdx := range subIndexes {
wg.Add(1)
go func(idx int, key SubIndexKey) {
defer wg.Done()
results[idx] = ds.searchSubIndexWithFailover(ctx, key, r)
}(i, subIdx)
}
wg.Wait()
// Count partial failures for logging
partialFailures := 0
for _, result := range results {
if result.partialFailure {
partialFailures++
}
}
span.SetAttributes(attribute.Int("partial_failures", partialFailures))
return results
}
// searchSubIndexWithFailover searches a single sub-index, trying each replica in order
// until one succeeds. If all replicas fail, marks the result as a partial failure.
func (ds *distributorServer) searchSubIndexWithFailover(ctx context.Context, subIndex SubIndexKey, r *resourcepb.ResourceSearchRequest) *subIndexSearchResult {
result := &subIndexSearchResult{
subIndexID: subIndex.SubIndexID,
}
// Get ordered list of replicas for this sub-index
replicas, err := ds.getReplicasForSubIndex(subIndex)
if err != nil {
ds.log.Warn("failed to get replicas for sub-index", "subIndex", subIndex.String(), "error", err)
result.err = err
result.partialFailure = true
return result
}
if len(replicas) == 0 {
ds.log.Warn("no replicas available for sub-index", "subIndex", subIndex.String())
result.err = fmt.Errorf("no replicas available for sub-index %s", subIndex.String())
result.partialFailure = true
return result
}
// Prepare context with metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = make(metadata.MD)
}
rCtx := userutils.InjectOrgID(metadata.NewOutgoingContext(ctx, md), subIndex.Namespace)
// Try each replica in order until success
// Per-replica timeout ensures SLO compliance (≤500ms for distributed search)
// With replication factor 2: worst case is 2 attempts × 200ms = 400ms, leaving room for merge
const replicaTimeout = 200 * time.Millisecond
var lastErr error
for replicaIdx, replica := range replicas {
client, err := ds.clientPool.GetClientForInstance(replica)
if err != nil {
ds.log.Debug("failed to get client for replica",
"subIndex", subIndex.String(),
"replica", replica.Id,
"replicaIdx", replicaIdx,
"error", err)
lastErr = err
continue
}
// Apply per-replica timeout to ensure failover happens quickly
replicaCtx, cancel := context.WithTimeout(rCtx, replicaTimeout)
resp, err := client.(*RingClient).Client.Search(replicaCtx, r)
cancel() // Always cancel to release resources
if err == nil && resp.Error == nil {
// Success
result.response = resp
return result
}
// Log failover event
if err != nil {
lastErr = err
ds.log.Warn("search failed on replica, failing over to next",
"subIndex", subIndex.String(),
"replica", replica.Id,
"replicaIdx", replicaIdx,
"remainingReplicas", len(replicas)-replicaIdx-1,
"error", err)
} else if resp.Error != nil {
lastErr = fmt.Errorf("search error: %s", resp.Error.Message)
ds.log.Warn("search returned error on replica, failing over to next",
"subIndex", subIndex.String(),
"replica", replica.Id,
"replicaIdx", replicaIdx,
"remainingReplicas", len(replicas)-replicaIdx-1,
"errorMessage", resp.Error.Message)
}
}
// All replicas failed - mark as partial failure
ds.log.Error("all replicas failed for sub-index",
"subIndex", subIndex.String(),
"totalReplicas", len(replicas),
"lastError", lastErr)
result.err = lastErr
result.partialFailure = true
return result
}
// mergeResults combines results from all sub-indexes into a single response.
// It handles:
// - Result deduplication by resource key
// - Sort merging (merge-sort for sorted results)
// - Pagination across shards
// - Tracking and reporting partial failures
func (ds *distributorServer) mergeResults(ctx context.Context, results []*subIndexSearchResult, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.mergeResults")
defer span.End()
// Collect all rows and track partial failures
var allRows []*resourcepb.ResourceTableRow
var columns []*resourcepb.ResourceTableColumnDefinition
var totalHits int64
var totalQueryCost float64
var maxScore float64
partialFailures := 0
failedSubIndexes := []int{}
facets := make(map[string]*resourcepb.ResourceSearchResponse_Facet)
for _, result := range results {
if result.partialFailure {
partialFailures++
failedSubIndexes = append(failedSubIndexes, result.subIndexID)
continue
}
if result.response == nil {
continue
}
resp := result.response
totalHits += resp.TotalHits
totalQueryCost += resp.QueryCost
if resp.MaxScore > maxScore {
maxScore = resp.MaxScore
}
// Use columns from first valid response
if columns == nil && resp.Results != nil {
columns = resp.Results.Columns
}
// Collect rows
if resp.Results != nil && len(resp.Results.Rows) > 0 {
allRows = append(allRows, resp.Results.Rows...)
}
// Merge facets
for k, v := range resp.Facet {
if existing, ok := facets[k]; ok {
mergeFacets(existing, v)
} else {
facets[k] = v
}
}
}
span.SetAttributes(
attribute.Int("total_rows_before_dedup", len(allRows)),
attribute.Int("partial_failures", partialFailures),
)
// Deduplicate rows by resource key
allRows = deduplicateRows(allRows)
span.SetAttributes(attribute.Int("total_rows_after_dedup", len(allRows)))
// Sort rows if sort criteria provided
if len(req.SortBy) > 0 && columns != nil {
sortRows(allRows, columns, req.SortBy)
}
// Apply pagination
offset := int(req.Offset)
limit := int(req.Limit)
if limit <= 0 {
limit = 100 // default limit
}
var paginatedRows []*resourcepb.ResourceTableRow
if offset < len(allRows) {
end := offset + limit
if end > len(allRows) {
end = len(allRows)
}
paginatedRows = allRows[offset:end]
}
// Build response
response := &resourcepb.ResourceSearchResponse{
TotalHits: totalHits,
QueryCost: totalQueryCost,
MaxScore: maxScore,
Results: &resourcepb.ResourceTable{
Columns: columns,
Rows: paginatedRows,
},
Facet: facets,
}
// Report partial failures if any
if partialFailures > 0 {
response.Error = &resourcepb.ErrorResult{
Code: 206, // Partial Content
Message: fmt.Sprintf("partial results: %d of %d sub-indexes failed (sub-indexes: %v)", partialFailures, len(results), failedSubIndexes),
}
ds.log.Warn("search returned partial results",
"namespace", req.Options.Key.Namespace,
"failedSubIndexes", partialFailures,
"totalSubIndexes", len(results))
}
return response, nil
}
// deduplicateRows removes duplicate rows based on their resource key.
// If duplicates exist, keeps the first occurrence.
func deduplicateRows(rows []*resourcepb.ResourceTableRow) []*resourcepb.ResourceTableRow {
if len(rows) == 0 {
return rows
}
seen := make(map[string]bool, len(rows))
result := make([]*resourcepb.ResourceTableRow, 0, len(rows))
for _, row := range rows {
if row.Key == nil {
continue
}
key := SearchID(row.Key)
if !seen[key] {
seen[key] = true
result = append(result, row)
}
}
return result
}
// sortRows sorts rows based on the sort criteria.
// Uses stable sort to maintain relative ordering of equal elements.
func sortRows(rows []*resourcepb.ResourceTableRow, columns []*resourcepb.ResourceTableColumnDefinition, sortBy []*resourcepb.ResourceSearchRequest_Sort) {
if len(rows) == 0 || len(sortBy) == 0 {
return
}
// Build column index map
columnIndex := make(map[string]int)
for i, col := range columns {
columnIndex[col.Name] = i
}
sort.SliceStable(rows, func(i, j int) bool {
for _, s := range sortBy {
colIdx, ok := columnIndex[s.Field]
if !ok {
continue
}
// Get cell values
var valI, valJ []byte
if colIdx < len(rows[i].Cells) {
valI = rows[i].Cells[colIdx]
}
if colIdx < len(rows[j].Cells) {
valJ = rows[j].Cells[colIdx]
}
// Compare byte slices
cmpResult := slices.Compare(valI, valJ)
if cmpResult == 0 {
continue // Values are equal, check next sort field
}
// Apply descending order if needed
if s.Desc {
return cmpResult > 0
}
return cmpResult < 0
}
return false // All sort fields are equal
})
}
// mergeFacets merges facet data from source into target.
func mergeFacets(target, source *resourcepb.ResourceSearchResponse_Facet) {
if source == nil {
return
}
target.Total += source.Total
target.Missing += source.Missing
// Merge term facets
termMap := make(map[string]int64)
for _, t := range target.Terms {
termMap[t.Term] = t.Count
}
for _, t := range source.Terms {
termMap[t.Term] += t.Count
}
// Rebuild term slice sorted by count (descending)
target.Terms = make([]*resourcepb.ResourceSearchResponse_TermFacet, 0, len(termMap))
for term, count := range termMap {
target.Terms = append(target.Terms, &resourcepb.ResourceSearchResponse_TermFacet{
Term: term,
Count: count,
})
}
slices.SortFunc(target.Terms, func(a, b *resourcepb.ResourceSearchResponse_TermFacet) int {
return cmp.Compare(b.Count, a.Count) // Descending order
})
}
@@ -0,0 +1,390 @@
package resource
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
func TestGetSubIndexesForNamespace(t *testing.T) {
tests := []struct {
name string
subIndexCount int
expectedCount int
expectedIDs []int
}{
{
name: "zero sub-indexes defaults to 1",
subIndexCount: 0,
expectedCount: 1,
expectedIDs: []int{0},
},
{
name: "negative sub-indexes defaults to 1",
subIndexCount: -1,
expectedCount: 1,
expectedIDs: []int{0},
},
{
name: "4 sub-indexes",
subIndexCount: 4,
expectedCount: 4,
expectedIDs: []int{0, 1, 2, 3},
},
{
name: "64 sub-indexes",
subIndexCount: 64,
expectedCount: 64,
expectedIDs: nil, // Don't check all 64
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ds := &distributorServer{
subIndexesPerNamespace: tt.subIndexCount,
}
nsr := NamespacedResource{
Namespace: "org-1",
Group: "dashboard.grafana.app",
Resource: "dashboards",
}
result := ds.getSubIndexesForNamespace(nsr)
assert.Len(t, result, tt.expectedCount)
// Check that all keys have the correct NSR
for i, key := range result {
assert.Equal(t, nsr, key.NamespacedResource)
assert.Equal(t, i, key.SubIndexID)
}
// Check specific IDs if provided
if tt.expectedIDs != nil {
for i, expectedID := range tt.expectedIDs {
assert.Equal(t, expectedID, result[i].SubIndexID)
}
}
})
}
}
func TestDeduplicateRows(t *testing.T) {
tests := []struct {
name string
input []*resourcepb.ResourceTableRow
expected int // expected number of rows after dedup
}{
{
name: "empty input",
input: []*resourcepb.ResourceTableRow{},
expected: 0,
},
{
name: "no duplicates",
input: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "c"}},
},
expected: 3,
},
{
name: "with duplicates",
input: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}}, // duplicate
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "c"}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}}, // duplicate
},
expected: 3,
},
{
name: "rows with nil keys are skipped",
input: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}},
{Key: nil},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}},
},
expected: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := deduplicateRows(tt.input)
assert.Len(t, result, tt.expected)
})
}
}
func TestSortRows(t *testing.T) {
columns := []*resourcepb.ResourceTableColumnDefinition{
{Name: "title"},
{Name: "created"},
}
tests := []struct {
name string
rows []*resourcepb.ResourceTableRow
sortBy []*resourcepb.ResourceSearchRequest_Sort
expectedOrder []string // expected order of first cell values
}{
{
name: "sort ascending by title",
rows: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Name: "c"}, Cells: [][]byte{[]byte("charlie"), nil}},
{Key: &resourcepb.ResourceKey{Name: "a"}, Cells: [][]byte{[]byte("alpha"), nil}},
{Key: &resourcepb.ResourceKey{Name: "b"}, Cells: [][]byte{[]byte("bravo"), nil}},
},
sortBy: []*resourcepb.ResourceSearchRequest_Sort{
{Field: "title", Desc: false},
},
expectedOrder: []string{"alpha", "bravo", "charlie"},
},
{
name: "sort descending by title",
rows: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Name: "a"}, Cells: [][]byte{[]byte("alpha"), nil}},
{Key: &resourcepb.ResourceKey{Name: "b"}, Cells: [][]byte{[]byte("bravo"), nil}},
{Key: &resourcepb.ResourceKey{Name: "c"}, Cells: [][]byte{[]byte("charlie"), nil}},
},
sortBy: []*resourcepb.ResourceSearchRequest_Sort{
{Field: "title", Desc: true},
},
expectedOrder: []string{"charlie", "bravo", "alpha"},
},
{
name: "empty rows",
rows: []*resourcepb.ResourceTableRow{},
sortBy: []*resourcepb.ResourceSearchRequest_Sort{{Field: "title"}},
expectedOrder: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sortRows(tt.rows, columns, tt.sortBy)
for i, expected := range tt.expectedOrder {
if i < len(tt.rows) {
assert.Equal(t, expected, string(tt.rows[i].Cells[0]))
}
}
})
}
}
func TestMergeFacets(t *testing.T) {
t.Run("merge term facets", func(t *testing.T) {
target := &resourcepb.ResourceSearchResponse_Facet{
Field: "tags",
Total: 100,
Missing: 5,
Terms: []*resourcepb.ResourceSearchResponse_TermFacet{
{Term: "production", Count: 50},
{Term: "staging", Count: 30},
},
}
source := &resourcepb.ResourceSearchResponse_Facet{
Field: "tags",
Total: 80,
Missing: 3,
Terms: []*resourcepb.ResourceSearchResponse_TermFacet{
{Term: "production", Count: 40},
{Term: "development", Count: 25},
},
}
mergeFacets(target, source)
assert.Equal(t, int64(180), target.Total)
assert.Equal(t, int64(8), target.Missing)
assert.Len(t, target.Terms, 3)
// Terms should be sorted by count descending
termCounts := make(map[string]int64)
for _, term := range target.Terms {
termCounts[term.Term] = term.Count
}
assert.Equal(t, int64(90), termCounts["production"])
assert.Equal(t, int64(30), termCounts["staging"])
assert.Equal(t, int64(25), termCounts["development"])
})
t.Run("merge nil source", func(t *testing.T) {
target := &resourcepb.ResourceSearchResponse_Facet{
Total: 100,
}
mergeFacets(target, nil)
assert.Equal(t, int64(100), target.Total)
})
}
func TestSubIndexSearchResult(t *testing.T) {
t.Run("successful result", func(t *testing.T) {
result := &subIndexSearchResult{
subIndexID: 5,
response: &resourcepb.ResourceSearchResponse{
TotalHits: 100,
},
partialFailure: false,
}
assert.False(t, result.partialFailure)
assert.NotNil(t, result.response)
})
t.Run("partial failure result", func(t *testing.T) {
result := &subIndexSearchResult{
subIndexID: 3,
err: assert.AnError,
partialFailure: true,
}
assert.True(t, result.partialFailure)
assert.Nil(t, result.response)
assert.Error(t, result.err)
})
}
func TestSubIndexKey(t *testing.T) {
key := SubIndexKey{
NamespacedResource: NamespacedResource{
Namespace: "org-1",
Group: "dashboard.grafana.app",
Resource: "dashboards",
},
SubIndexID: 42,
}
t.Run("String representation", func(t *testing.T) {
expected := "org-1/dashboard.grafana.app/dashboards/shard-42"
assert.Equal(t, expected, key.String())
})
t.Run("ToNamespacedResource", func(t *testing.T) {
nsr := key.ToNamespacedResource()
assert.Equal(t, "org-1", nsr.Namespace)
assert.Equal(t, "dashboard.grafana.app", nsr.Group)
assert.Equal(t, "dashboards", nsr.Resource)
})
}
func newTestDistributorServer() *distributorServer {
return &distributorServer{
tracing: noop.NewTracerProvider().Tracer("test"),
log: log.New("test-distributor"),
}
}
func TestMergeResultsPagination(t *testing.T) {
// Create mock results from 3 sub-indexes
results := []*subIndexSearchResult{
{
subIndexID: 0,
response: &resourcepb.ResourceSearchResponse{
TotalHits: 10,
Results: &resourcepb.ResourceTable{
Columns: []*resourcepb.ResourceTableColumnDefinition{{Name: "title"}},
Rows: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a1"}, Cells: [][]byte{[]byte("a1")}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a2"}, Cells: [][]byte{[]byte("a2")}},
},
},
},
},
{
subIndexID: 1,
response: &resourcepb.ResourceSearchResponse{
TotalHits: 10,
Results: &resourcepb.ResourceTable{
Columns: []*resourcepb.ResourceTableColumnDefinition{{Name: "title"}},
Rows: []*resourcepb.ResourceTableRow{
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b1"}, Cells: [][]byte{[]byte("b1")}},
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b2"}, Cells: [][]byte{[]byte("b2")}},
},
},
},
},
}
t.Run("pagination with limit", func(t *testing.T) {
ds := newTestDistributorServer()
req := &resourcepb.ResourceSearchRequest{
Limit: 2,
Offset: 0,
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{Namespace: "ns"},
},
}
resp, err := ds.mergeResults(context.Background(), results, req)
require.NoError(t, err)
assert.Equal(t, int64(20), resp.TotalHits) // 10 + 10
assert.Len(t, resp.Results.Rows, 2)
})
t.Run("pagination with offset", func(t *testing.T) {
ds := newTestDistributorServer()
req := &resourcepb.ResourceSearchRequest{
Limit: 2,
Offset: 2,
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{Namespace: "ns"},
},
}
resp, err := ds.mergeResults(context.Background(), results, req)
require.NoError(t, err)
assert.Len(t, resp.Results.Rows, 2) // rows 3 and 4
})
}
func TestMergeResultsPartialFailure(t *testing.T) {
results := []*subIndexSearchResult{
{
subIndexID: 0,
response: &resourcepb.ResourceSearchResponse{
TotalHits: 10,
Results: &resourcepb.ResourceTable{
Columns: []*resourcepb.ResourceTableColumnDefinition{{Name: "title"}},
Rows: []*resourcepb.ResourceTableRow{},
},
},
},
{
subIndexID: 1,
partialFailure: true,
err: assert.AnError,
},
{
subIndexID: 2,
partialFailure: true,
err: assert.AnError,
},
}
ds := newTestDistributorServer()
req := &resourcepb.ResourceSearchRequest{
Limit: 100,
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{Namespace: "ns"},
},
}
resp, err := ds.mergeResults(context.Background(), results, req)
require.NoError(t, err)
// Should have partial failure error
require.NotNil(t, resp.Error)
assert.Equal(t, int32(206), resp.Error.Code) // HTTP 206 Partial Content
assert.Contains(t, resp.Error.Message, "2 of 3 sub-indexes failed")
}
+628 -5
View File
@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math"
"os"
"path/filepath"
@@ -81,6 +82,17 @@ type BleveOptions struct {
// Indexes that are not owned by current instance are eligible for cleanup.
// If nil, all indexes are owned by the current instance.
OwnsIndex func(key resource.NamespacedResource) (bool, error)
// OwnsSubIndex is called to check whether a specific sub-index is owned by the current instance.
// This function considers the sub-index ID when determining ownership via ring hash.
// If nil, falls back to OwnsIndex behavior with subIndexID=0.
OwnsSubIndex func(key resource.NamespacedResource, subIndexID int) (bool, error)
// SubIndexCount is the number of sub-indexes per (namespace, group, resource).
// When > 0, documents are distributed across sub-indexes using consistent hashing.
// This enables horizontal scaling for large namespaces (1M+ documents).
// Recommended: 64 for large scale deployments.
SubIndexCount int
}
type bleveBackend struct {
@@ -90,9 +102,17 @@ type bleveBackend struct {
// set from opts.OwnsIndex, always non-nil
ownsIndexFn func(key resource.NamespacedResource) (bool, error)
// set from opts.OwnsSubIndex, always non-nil
// Used for checking ownership of sub-indexes when sharding is enabled
ownsSubIndexFn func(key resource.NamespacedResource, subIndexID int) (bool, error)
cacheMx sync.RWMutex
cache map[resource.NamespacedResource]*bleveIndex
// subIndexCache stores sub-indexes when sub-index sharding is enabled.
// Key is SubIndexKey which includes the sub-index ID.
subIndexCache map[resource.SubIndexKey]*bleveIndex
indexMetrics *resource.BleveIndexMetrics
bgTasksCancel func()
@@ -136,12 +156,23 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
ownFn = func(key resource.NamespacedResource) (bool, error) { return true, nil }
}
ownSubFn := opts.OwnsSubIndex
if ownSubFn == nil {
// By default, fall back to OwnsIndex behavior (ignore subIndexID).
// This maintains backward compatibility when sub-index sharding is not enabled.
ownSubFn = func(key resource.NamespacedResource, subIndexID int) (bool, error) {
return ownFn(key)
}
}
be := &bleveBackend{
log: l,
cache: map[resource.NamespacedResource]*bleveIndex{},
opts: opts,
ownsIndexFn: ownFn,
indexMetrics: indexMetrics,
log: l,
cache: map[resource.NamespacedResource]*bleveIndex{},
subIndexCache: map[resource.SubIndexKey]*bleveIndex{},
opts: opts,
ownsIndexFn: ownFn,
ownsSubIndexFn: ownSubFn,
indexMetrics: indexMetrics,
}
ctx, cancel := context.WithCancel(context.Background())
@@ -158,6 +189,53 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
return be, nil
}
// IsSubIndexShardingEnabled returns true if sub-index sharding is configured.
func (b *bleveBackend) IsSubIndexShardingEnabled() bool {
return b.opts.SubIndexCount > 0
}
// GetSubIndexCount returns the number of sub-indexes per (namespace, group, resource).
func (b *bleveBackend) GetSubIndexCount() int {
return b.opts.SubIndexCount
}
// GetSubIndexForDocument computes the sub-index ID for a document based on its key.
// Uses FNV32a hash of the full document key for consistent distribution.
func (b *bleveBackend) GetSubIndexForDocument(key *resourcepb.ResourceKey) int {
if b.opts.SubIndexCount <= 0 {
return 0
}
h := fnv.New32a()
// Hash the full document key: namespace/group/resource/name
_, _ = h.Write([]byte(fmt.Sprintf("%s/%s/%s/%s", key.Namespace, key.Group, key.Resource, key.Name)))
return int(h.Sum32() % uint32(b.opts.SubIndexCount))
}
// GetSubIndexKey returns the SubIndexKey for a document.
func (b *bleveBackend) GetSubIndexKey(nsr resource.NamespacedResource, docKey *resourcepb.ResourceKey) resource.SubIndexKey {
return resource.SubIndexKey{
NamespacedResource: nsr,
SubIndexID: b.GetSubIndexForDocument(docKey),
}
}
// GetAllSubIndexKeys returns all SubIndexKey values for a NamespacedResource.
// This is used for scatter-gather queries that need to query all sub-indexes.
func (b *bleveBackend) GetAllSubIndexKeys(nsr resource.NamespacedResource) []resource.SubIndexKey {
count := b.opts.SubIndexCount
if count <= 0 {
count = 1
}
keys := make([]resource.SubIndexKey, count)
for i := 0; i < count; i++ {
keys[i] = resource.SubIndexKey{
NamespacedResource: nsr,
SubIndexID: i,
}
}
return keys
}
// GetIndex will return nil if the key does not exist
func (b *bleveBackend) GetIndex(key resource.NamespacedResource) resource.ResourceIndex {
idx := b.getCachedIndex(key, time.Now())
@@ -228,7 +306,13 @@ func (b *bleveBackend) runEvictExpiredOrUnownedIndexes(now time.Time) {
unowned := map[resource.NamespacedResource]*bleveIndex{}
ownCheckErrors := map[resource.NamespacedResource]error{}
// For sub-indexes
expiredSubIndexes := map[resource.SubIndexKey]*bleveIndex{}
unownedSubIndexes := map[resource.SubIndexKey]*bleveIndex{}
ownSubCheckErrors := map[resource.SubIndexKey]error{}
b.cacheMx.Lock()
// Process main cache (non-sharded indexes)
for key, idx := range b.cache {
// Check if index has expired.
if !idx.expiration.IsZero() && now.After(idx.expiration) {
@@ -248,21 +332,75 @@ func (b *bleveBackend) runEvictExpiredOrUnownedIndexes(now time.Time) {
}
}
}
// Process sub-index cache (sharded indexes)
for subKey, idx := range b.subIndexCache {
// Check if sub-index has expired.
if !idx.expiration.IsZero() && now.After(idx.expiration) {
delete(b.subIndexCache, subKey)
expiredSubIndexes[subKey] = idx
continue
}
// Check if sub-index is owned by this instance using OwnsSubIndex.
// This considers the subIndexID when determining ownership via ring hash.
if cacheTTLMillis > 0 {
owned, err := b.ownsSubIndexFn(subKey.NamespacedResource, subKey.SubIndexID)
if err != nil {
ownSubCheckErrors[subKey] = err
} else if !owned && now.UnixMilli()-idx.lastFetchedFromCache.Load() > cacheTTLMillis {
delete(b.subIndexCache, subKey)
unownedSubIndexes[subKey] = idx
}
}
}
b.cacheMx.Unlock()
// Log errors for main cache ownership checks
for key, err := range ownCheckErrors {
b.log.Warn("failed to check if index belongs to this instance", "key", key, "err", err)
}
// Log errors for sub-index ownership checks
for subKey, err := range ownSubCheckErrors {
b.log.Warn("failed to check if sub-index belongs to this instance", "subKey", subKey, "err", err)
}
// Evict unowned main indexes
for key, idx := range unowned {
b.log.Info("index evicted from cache", "reason", "unowned", "key", key, "storage", idx.indexStorage)
b.closeIndex(idx, key)
}
// Evict unowned sub-indexes
for subKey, idx := range unownedSubIndexes {
b.log.Info("sub-index evicted from cache", "reason", "unowned", "subKey", subKey, "storage", idx.indexStorage)
b.closeSubIndex(idx, subKey)
}
// Evict expired main indexes
for key, idx := range expired {
b.log.Info("index evicted from cache", "reason", "expired", "key", key, "storage", idx.indexStorage)
b.closeIndex(idx, key)
}
// Evict expired sub-indexes
for subKey, idx := range expiredSubIndexes {
b.log.Info("sub-index evicted from cache", "reason", "expired", "subKey", subKey, "storage", idx.indexStorage)
b.closeSubIndex(idx, subKey)
}
}
// closeSubIndex closes a sub-index and updates metrics.
func (b *bleveBackend) closeSubIndex(idx *bleveIndex, key resource.SubIndexKey) {
err := idx.stopUpdaterAndCloseIndex()
if err != nil {
b.log.Error("failed to close sub-index", "key", key, "err", err)
}
if b.indexMetrics != nil {
b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
}
}
// updateIndexSizeMetric sets the total size of all file-based indices metric.
@@ -368,6 +506,12 @@ func (b *bleveBackend) BuildIndex(
attribute.String("reason", indexBuildReason),
)
// If sub-index sharding is enabled, delegate to the parallel sub-index builder
if b.opts.SubIndexCount > 0 {
span.SetAttributes(attribute.Int("sub_index_count", b.opts.SubIndexCount))
return b.buildShardedIndex(ctx, key, size, fields, indexBuildReason, builder, updater, rebuild)
}
mapper, err := GetBleveMappings(fields)
if err != nil {
return nil, err
@@ -603,6 +747,294 @@ func isPathWithinRoot(path, absoluteRoot string) bool {
return true
}
// buildShardedIndex builds all sub-indexes in parallel and returns a compositeIndex.
// This is used when sub-index sharding is enabled (SubIndexCount > 0).
func (b *bleveBackend) buildShardedIndex(
ctx context.Context,
key resource.NamespacedResource,
size int64,
fields resource.SearchableDocumentFields,
indexBuildReason string,
builder resource.BuildFn,
updater resource.UpdateFn,
rebuild bool,
) (resource.ResourceIndex, error) {
_, span := tracer.Start(ctx, "search.bleveBackend.buildShardedIndex")
defer span.End()
logWithDetails := b.log.FromContext(ctx).New(
"namespace", key.Namespace,
"group", key.Group,
"resource", key.Resource,
"size", size,
"reason", indexBuildReason,
"sub_index_count", b.opts.SubIndexCount,
)
mapper, err := GetBleveMappings(fields)
if err != nil {
return nil, err
}
standardSearchFields := resource.StandardSearchFields()
allFields, err := getAllFields(standardSearchFields, fields)
if err != nil {
return nil, err
}
// Calculate size per sub-index for threshold decisions
sizePerSubIndex := size / int64(b.opts.SubIndexCount)
if sizePerSubIndex < 1 {
sizePerSubIndex = 1
}
// Create all sub-indexes with limited concurrency to avoid overwhelming the file system
// Use a worker pool pattern - limit to 8 concurrent index creations
type subIndexResult struct {
subIndex *bleveIndex
subIndexKey resource.SubIndexKey
err error
}
results := make([]subIndexResult, b.opts.SubIndexCount)
var wg sync.WaitGroup
// Limit concurrent index creations to avoid file system contention
maxConcurrentIndexCreations := 8
if b.opts.SubIndexCount < maxConcurrentIndexCreations {
maxConcurrentIndexCreations = b.opts.SubIndexCount
}
semaphore := make(chan struct{}, maxConcurrentIndexCreations)
logWithDetails.Info("Building sharded index", "sub_index_count", b.opts.SubIndexCount, "size_per_sub_index", sizePerSubIndex, "max_concurrent", maxConcurrentIndexCreations)
for i := 0; i < b.opts.SubIndexCount; i++ {
wg.Add(1)
go func(subIndexID int) {
defer wg.Done()
// Acquire semaphore
semaphore <- struct{}{}
defer func() { <-semaphore }()
subKey := resource.SubIndexKey{
NamespacedResource: key,
SubIndexID: subIndexID,
}
results[subIndexID].subIndexKey = subKey
// Build individual sub-index
idx, buildErr := b.buildSingleSubIndex(
ctx, subKey, sizePerSubIndex, mapper, fields, allFields, standardSearchFields, updater, indexBuildReason, rebuild,
)
if buildErr != nil {
results[subIndexID].err = buildErr
return
}
results[subIndexID].subIndex = idx
}(i)
}
wg.Wait()
// Check for errors and collect successful sub-indexes
subIndexes := make([]*bleveIndex, 0, b.opts.SubIndexCount)
var firstErr error
for i, result := range results {
if result.err != nil {
logWithDetails.Error("Failed to build sub-index", "sub_index_id", i, "err", result.err)
if firstErr == nil {
firstErr = result.err
}
continue
}
if result.subIndex != nil {
subIndexes = append(subIndexes, result.subIndex)
}
}
// If any sub-index failed, clean up and return error
if firstErr != nil {
for _, result := range results {
if result.subIndex != nil {
_ = result.subIndex.stopUpdaterAndCloseIndex()
}
}
return nil, fmt.Errorf("failed to build sharded index: %w", firstErr)
}
// Create the composite index
composite := b.newCompositeIndex(key, subIndexes, fields, allFields, standardSearchFields, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))
// Build the index by calling the builder with the composite index
// The builder will call BulkIndex on the composite, which routes documents to correct sub-indexes
if b.indexMetrics != nil {
b.indexMetrics.IndexBuilds.WithLabelValues(indexBuildReason).Inc()
}
start := time.Now()
listRV, err := builder(composite)
if err != nil {
logWithDetails.Error("Failed to build sharded index", "err", err)
if b.indexMetrics != nil {
b.indexMetrics.IndexBuildFailures.Inc()
}
// Clean up all sub-indexes on failure
for _, idx := range subIndexes {
_ = idx.stopUpdaterAndCloseIndex()
}
return nil, fmt.Errorf("failed to build sharded index: %w", err)
}
// Update resource version on all sub-indexes
for _, idx := range subIndexes {
if err := idx.updateResourceVersion(listRV); err != nil {
logWithDetails.Error("Failed to persist RV to sub-index", "err", err, "rv", listRV)
// Continue - this is not fatal
}
}
elapsed := time.Since(start)
logWithDetails.Info("Finished building sharded index", "elapsed", elapsed, "listRV", listRV, "sub_indexes_built", len(subIndexes))
if b.indexMetrics != nil {
b.indexMetrics.IndexCreationTime.WithLabelValues().Observe(elapsed.Seconds())
}
// Store sub-indexes in the cache
b.cacheMx.Lock()
for i, idx := range subIndexes {
subKey := resource.SubIndexKey{
NamespacedResource: key,
SubIndexID: i,
}
prev := b.subIndexCache[subKey]
b.subIndexCache[subKey] = idx
// Close previous sub-index if it existed
if prev != nil {
if b.indexMetrics != nil {
b.indexMetrics.OpenIndexes.WithLabelValues(prev.indexStorage).Dec()
}
if err := prev.stopUpdaterAndCloseIndex(); err != nil {
logWithDetails.Error("failed to close previous sub-index", "sub_key", subKey, "err", err)
}
}
if b.indexMetrics != nil {
b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Inc()
}
}
b.cacheMx.Unlock()
return composite, nil
}
// buildSingleSubIndex builds a single sub-index for sharded indexing.
func (b *bleveBackend) buildSingleSubIndex(
ctx context.Context,
subKey resource.SubIndexKey,
size int64,
mapper mapping.IndexMapping,
fields resource.SearchableDocumentFields,
allFields []*resourcepb.ResourceTableColumnDefinition,
standardSearchFields resource.SearchableDocumentFields,
updater resource.UpdateFn,
indexBuildReason string,
rebuild bool,
) (*bleveIndex, error) {
key := subKey.ToNamespacedResource()
logWithDetails := b.log.FromContext(ctx).New(
"namespace", key.Namespace,
"group", key.Group,
"resource", key.Resource,
"sub_index_id", subKey.SubIndexID,
"size", size,
)
// Get directory for this sub-index
subIndexDir := b.getSubIndexDir(subKey)
var index bleve.Index
var indexRV int64
var err error
fileIndexName := ""
newIndexType := indexStorageMemory
if size >= b.opts.FileThreshold {
newIndexType = indexStorageFile
// Check for existing file-based index if not rebuilding
if !rebuild {
index, fileIndexName, indexRV, err = b.findPreviousFileBasedIndex(subIndexDir)
if err != nil {
return nil, err
}
}
if index != nil {
logWithDetails.Debug("Existing sub-index found on filesystem", "indexRV", indexRV, "directory", filepath.Join(subIndexDir, fileIndexName))
} else {
// Create new file-based index
indexDir := ""
now := time.Now()
for index == nil {
fileIndexName = formatIndexName(now)
indexDir = filepath.Join(subIndexDir, fileIndexName)
if !isPathWithinRoot(indexDir, b.opts.Root) {
return nil, fmt.Errorf("invalid path %s", indexDir)
}
// Ensure sub-index directory exists
if err := os.MkdirAll(subIndexDir, 0750); err != nil {
return nil, fmt.Errorf("failed to create sub-index directory: %w", err)
}
index, err = newBleveIndex(indexDir, mapper, time.Now(), b.opts.BuildVersion)
if errors.Is(err, bleve.ErrorIndexPathExists) {
now = now.Add(time.Second)
index = nil
continue
}
if err != nil {
return nil, fmt.Errorf("error creating new bleve sub-index: %s %w", indexDir, err)
}
}
logWithDetails.Debug("Created new file-based sub-index", "directory", indexDir)
}
} else {
index, err = newBleveIndex("", mapper, time.Now(), b.opts.BuildVersion)
if err != nil {
return nil, fmt.Errorf("error creating new in-memory bleve sub-index: %w", err)
}
logWithDetails.Debug("Created new in-memory sub-index")
}
// Create the bleveIndex wrapper
idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, updater, logWithDetails)
// Set expiration for in-memory indexes
if fileIndexName == "" && b.opts.IndexCacheTTL > 0 {
idx.expiration = time.Now().Add(b.opts.IndexCacheTTL)
}
// If we reused an existing index, set its resource version
if indexRV > 0 {
idx.resourceVersion = indexRV
}
return idx, nil
}
// getSubIndexDir returns the directory path for a sub-index.
func (b *bleveBackend) getSubIndexDir(subKey resource.SubIndexKey) string {
key := subKey.ToNamespacedResource()
return filepath.Join(
b.opts.Root,
cleanFileSegment(key.Namespace),
cleanFileSegment(fmt.Sprintf("%s.%s", key.Resource, key.Group)),
fmt.Sprintf("shard-%d", subKey.SubIndexID),
)
}
// TotalDocs returns the total number of documents across all indices
func (b *bleveBackend) TotalDocs() int64 {
var totalDocs int64
@@ -678,6 +1110,7 @@ func (b *bleveBackend) closeAllIndexes() {
b.cacheMx.Lock()
defer b.cacheMx.Unlock()
// Close main indexes
for key, idx := range b.cache {
if err := idx.stopUpdaterAndCloseIndex(); err != nil {
b.log.Error("Failed to close index", "err", err)
@@ -688,6 +1121,18 @@ func (b *bleveBackend) closeAllIndexes() {
b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
}
}
// Close sub-indexes
for subKey, idx := range b.subIndexCache {
if err := idx.stopUpdaterAndCloseIndex(); err != nil {
b.log.Error("Failed to close sub-index", "subKey", subKey, "err", err)
}
delete(b.subIndexCache, subKey)
if b.indexMetrics != nil {
b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
}
}
}
type updateRequest struct {
@@ -741,6 +1186,184 @@ type bleveIndex struct {
lastFetchedFromCache atomic.Int64
}
// compositeIndex wraps multiple sub-indexes for sharded search.
// It implements resource.ResourceIndex and distributes operations across sub-indexes.
type compositeIndex struct {
key resource.NamespacedResource
subIndexes []*bleveIndex
alias bleve.Index // Bleve IndexAlias for unified search across all sub-indexes
subIndexCount int
backend *bleveBackend
logger log.Logger
// standard and custom fields, shared across all sub-indexes
standard resource.SearchableDocumentFields
fields resource.SearchableDocumentFields
allFields []*resourcepb.ResourceTableColumnDefinition
}
var _ resource.ResourceIndex = &compositeIndex{}
// newCompositeIndex creates a new composite index wrapping multiple sub-indexes.
func (b *bleveBackend) newCompositeIndex(
key resource.NamespacedResource,
subIndexes []*bleveIndex,
fields resource.SearchableDocumentFields,
allFields []*resourcepb.ResourceTableColumnDefinition,
standardSearchFields resource.SearchableDocumentFields,
logger log.Logger,
) *compositeIndex {
// Create Bleve IndexAlias for unified search
indexes := make([]bleve.Index, len(subIndexes))
for i, idx := range subIndexes {
indexes[i] = idx.index
}
return &compositeIndex{
key: key,
subIndexes: subIndexes,
alias: bleve.NewIndexAlias(indexes...),
subIndexCount: len(subIndexes),
backend: b,
logger: logger,
standard: standardSearchFields,
fields: fields,
allFields: allFields,
}
}
// BulkIndex routes documents to the appropriate sub-index based on document key hash.
// Documents are grouped by sub-index and then indexed in parallel to maximize throughput.
func (c *compositeIndex) BulkIndex(req *resource.BulkIndexRequest) error {
if len(req.Items) == 0 {
return nil
}
// Group items by sub-index
itemsBySubIndex := make(map[int][]*resource.BulkIndexItem)
for _, item := range req.Items {
var docKey *resourcepb.ResourceKey
if item.Action == resource.ActionIndex && item.Doc != nil {
docKey = item.Doc.Key
} else if item.Action == resource.ActionDelete {
docKey = item.Key
}
if docKey == nil {
return fmt.Errorf("missing document key for bulk index item")
}
subIndexID := c.backend.GetSubIndexForDocument(docKey)
itemsBySubIndex[subIndexID] = append(itemsBySubIndex[subIndexID], item)
}
// Process sub-index batches in parallel for better throughput
var wg sync.WaitGroup
errCh := make(chan error, len(itemsBySubIndex))
for subIndexID, items := range itemsBySubIndex {
if subIndexID >= len(c.subIndexes) {
return fmt.Errorf("sub-index ID %d out of range (max %d)", subIndexID, len(c.subIndexes)-1)
}
wg.Add(1)
go func(idx int, indexItems []*resource.BulkIndexItem) {
defer wg.Done()
subReq := &resource.BulkIndexRequest{
Items: indexItems,
ResourceVersion: req.ResourceVersion,
}
if err := c.subIndexes[idx].BulkIndex(subReq); err != nil {
errCh <- fmt.Errorf("error indexing to sub-index %d: %w", idx, err)
}
}(subIndexID, items)
}
wg.Wait()
close(errCh)
// Return first error if any
for err := range errCh {
return err
}
return nil
}
// Search performs a search across all sub-indexes using the IndexAlias.
func (c *compositeIndex) Search(
ctx context.Context,
access authlib.AccessClient,
req *resourcepb.ResourceSearchRequest,
federate []resource.ResourceIndex,
stats *resource.SearchStats,
) (*resourcepb.ResourceSearchResponse, error) {
// Delegate to the first sub-index's search logic, but use our alias
// This works because we set up the IndexAlias to search across all sub-indexes
if len(c.subIndexes) == 0 {
return &resourcepb.ResourceSearchResponse{
Error: resource.NewBadRequestError("no sub-indexes available"),
}, nil
}
// Use the first sub-index for search implementation, but with our composite alias
// The sub-index will use its search logic with our federated alias
return c.subIndexes[0].Search(ctx, access, req, federate, stats)
}
// ListManagedObjects aggregates results from all sub-indexes.
func (c *compositeIndex) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest, stats *resource.SearchStats) (*resourcepb.ListManagedObjectsResponse, error) {
if len(c.subIndexes) == 0 {
return &resourcepb.ListManagedObjectsResponse{}, nil
}
// Use the first sub-index - the alias handles cross-index queries
return c.subIndexes[0].ListManagedObjects(ctx, req, stats)
}
// CountManagedObjects aggregates counts from all sub-indexes.
func (c *compositeIndex) CountManagedObjects(ctx context.Context, stats *resource.SearchStats) ([]*resourcepb.CountManagedObjectsResponse_ResourceCount, error) {
if len(c.subIndexes) == 0 {
return nil, nil
}
// Use the first sub-index - the alias handles cross-index queries
return c.subIndexes[0].CountManagedObjects(ctx, stats)
}
// DocCount returns the total document count across all sub-indexes.
func (c *compositeIndex) DocCount(ctx context.Context, folder string, stats *resource.SearchStats) (int64, error) {
var total int64
for _, idx := range c.subIndexes {
count, err := idx.DocCount(ctx, folder, stats)
if err != nil {
return 0, err
}
total += count
}
return total, nil
}
// UpdateIndex updates all sub-indexes to the latest data.
func (c *compositeIndex) UpdateIndex(ctx context.Context) (int64, error) {
var maxRV int64
for _, idx := range c.subIndexes {
rv, err := idx.UpdateIndex(ctx)
if err != nil {
return 0, err
}
if rv > maxRV {
maxRV = rv
}
}
return maxRV, nil
}
// BuildInfo returns build information from the first sub-index.
func (c *compositeIndex) BuildInfo() (resource.IndexBuildInfo, error) {
if len(c.subIndexes) == 0 {
return resource.IndexBuildInfo{}, fmt.Errorf("no sub-indexes available")
}
return c.subIndexes[0].BuildInfo()
}
func (b *bleveBackend) newBleveIndex(
key resource.NamespacedResource,
index bleve.Index,
+9
View File
@@ -17,6 +17,7 @@ func NewSearchOptions(
docs resource.DocumentBuilderSupplier,
indexMetrics *resource.BleveIndexMetrics,
ownsIndexFn func(key resource.NamespacedResource) (bool, error),
ownsSubIndexFn ...func(key resource.NamespacedResource, subIndexID int) (bool, error),
) (resource.SearchOptions, error) {
//nolint:staticcheck // not yet migrated to OpenFeature
if cfg.EnableSearch || features.IsEnabledGlobally(featuremgmt.FlagProvisioning) {
@@ -39,13 +40,21 @@ func NewSearchOptions(
}
}
// Get OwnsSubIndex function if provided
var ownsSubIdx func(key resource.NamespacedResource, subIndexID int) (bool, error)
if len(ownsSubIndexFn) > 0 && ownsSubIndexFn[0] != nil {
ownsSubIdx = ownsSubIndexFn[0]
}
bleve, err := NewBleveBackend(BleveOptions{
Root: root,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
IndexCacheTTL: cfg.IndexCacheTTL, // How long to keep the index cache in memory
BuildVersion: cfg.BuildVersion,
OwnsIndex: ownsIndexFn,
OwnsSubIndex: ownsSubIdx,
IndexMinUpdateInterval: cfg.IndexMinUpdateInterval,
SubIndexCount: cfg.SubIndexesPerNamespace,
}, indexMetrics)
if err != nil {
+30 -5
View File
@@ -226,6 +226,18 @@ var (
)
func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
// When sub-index sharding is enabled, use OwnsSubIndex with subIndexID=0
// to maintain backward compatibility. OwnsIndex is used for the main index.
return s.OwnsSubIndex(key, 0)
}
// OwnsSubIndex checks if the current instance owns a specific sub-index.
// The sub-index ID is included in the ring hash to distribute sub-indexes
// across nodes in the ring. This enables horizontal scaling for large namespaces.
//
// When subIndexID is 0 and SubIndexesPerNamespace is 0 (disabled), this behaves
// exactly like the original OwnsIndex - maintaining backward compatibility.
func (s *service) OwnsSubIndex(key resource.NamespacedResource, subIndexID int) (bool, error) {
if s.searchRing == nil {
return true, nil
}
@@ -235,12 +247,25 @@ func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
}
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(key.Namespace))
if err != nil {
return false, fmt.Errorf("error hashing namespace: %w", err)
// When sub-index sharding is enabled (SubIndexesPerNamespace > 0),
// include the subIndexID in the hash to distribute sub-indexes across nodes.
// This allows different sub-indexes of the same namespace to be owned by
// different nodes, enabling horizontal scaling.
if s.cfg.SubIndexesPerNamespace > 0 {
_, err := ringHasher.Write([]byte(fmt.Sprintf("%s/%d", key.Namespace, subIndexID)))
if err != nil {
return false, fmt.Errorf("error hashing namespace with sub-index: %w", err)
}
} else {
// Original behavior: hash only the namespace
_, err := ringHasher.Write([]byte(key.Namespace))
if err != nil {
return false, fmt.Errorf("error hashing namespace: %w", err)
}
}
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead)
if err != nil {
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
}
@@ -261,7 +286,7 @@ func (s *service) starting(ctx context.Context) error {
return err
}
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex, s.OwnsSubIndex)
if err != nil {
return err
}