Compare commits

7 Commits

| SHA1 |
|---|
| 314db3c24c |
| e7ae54f1a6 |
| fa4dbce7da |
| 93264ac67d |
| 0b9b47aaea |
| 5e74848ee0 |
| aa90ac7ccc |
@@ -603,6 +603,7 @@ type Cfg struct {
    MaxFileIndexAge          time.Duration // Max age of file-based indexes. Index older than this will be rebuilt asynchronously.
    MinFileIndexBuildVersion string        // Minimum version of Grafana that built the file-based index. If index was built with older Grafana, it will be rebuilt asynchronously.
    EnableSharding           bool
    SubIndexesPerNamespace   int // Number of sub-indexes per (namespace, group, resource) for sharding. 0 = disabled.
    QOSEnabled               bool
    QOSNumberWorker          int
    QOSMaxSizePerTenant      int
@@ -102,19 +102,33 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
    }

    cfg.EnableSearch = section.Key("enable_search").MustBool(false)
    cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0)
    // Index storage path. For Kubernetes Deployments without PVCs, use emptyDir:
    //   index_path = /var/lib/grafana/unified-search/bleve
    // Indexes are derived data and will be rebuilt from SQL on pod restart.
    cfg.IndexPath = section.Key("index_path").String()
    cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
    cfg.IndexRebuildWorkers = section.Key("index_rebuild_workers").MustInt(5)

    // Sharding configuration for large-scale deployments (200k+ dashboards).
    // When enable_sharding=true, indexes are distributed across pods using a ring.
    // Each pod owns a subset of sub-indexes and rebuilds them from SQL on startup.
    // This enables horizontal scaling without requiring PVCs (use emptyDir volumes).
    cfg.EnableSharding = section.Key("enable_sharding").MustBool(false)
    cfg.SubIndexesPerNamespace = section.Key("sub_indexes_per_namespace").MustInt(0) // 0 = disabled, recommended: 64 for 1M scale
    cfg.QOSEnabled = section.Key("qos_enabled").MustBool(false)
    cfg.QOSNumberWorker = section.Key("qos_num_worker").MustInt(16)
    cfg.QOSMaxSizePerTenant = section.Key("qos_max_size_per_tenant").MustInt(1000)

    // Memberlist ring configuration for distributed search.
    // For Kubernetes Deployments, use DNS-based discovery with headless services:
    //   memberlist_join_member = dnssrv+grafana-memberlist.namespace.svc:7946
    // The dnssrv+ prefix triggers SRV record lookup for pod IPs.
    cfg.MemberlistBindAddr = section.Key("memberlist_bind_addr").String()
    cfg.MemberlistAdvertiseAddr = section.Key("memberlist_advertise_addr").String()
    cfg.MemberlistAdvertisePort = section.Key("memberlist_advertise_port").MustInt(7946)
    cfg.MemberlistJoinMember = section.Key("memberlist_join_member").String()
    cfg.MemberlistClusterLabel = section.Key("memberlist_cluster_label").String()
    cfg.MemberlistClusterLabelVerificationDisabled = section.Key("memberlist_cluster_label_verification_disabled").MustBool(false)
    // SearchRingReplicationFactor configures the replication factor of indexes across multiple instances.
    // Recommended: 2 for production deployments using emptyDir volumes, to provide availability during pod restarts/rebuilds.
    cfg.SearchRingReplicationFactor = section.Key("search_ring_replication_factor").MustInt(1)
    cfg.InstanceID = section.Key("instance_id").String()
    cfg.IndexFileThreshold = section.Key("index_file_threshold").MustInt(10)
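Taken together, the new options describe a sharded, ring-replicated search deployment. As a rough sketch (not part of this change), the values recommended in the comments above map onto the `setting.Cfg` fields referenced in these hunks like this:

```go
// Sketch only: recommended large-scale settings from the comments above,
// expressed as the Cfg fields referenced in this change. Field placement on
// setting.Cfg is assumed from the hunks; values are the documented defaults
// and recommendations.
cfg := setting.Cfg{
    EnableSearch:                true,
    EnableSharding:              true,
    SubIndexesPerNamespace:      64, // recommended for ~1M documents
    SearchRingReplicationFactor: 2,  // recommended with emptyDir volumes
    MemberlistAdvertisePort:     7946,
    MemberlistJoinMember:        "dnssrv+grafana-memberlist.namespace.svc:7946",
}
```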
@@ -45,6 +45,23 @@ func (s *NamespacedResource) String() string {
    return fmt.Sprintf("%s/%s/%s", s.Namespace, s.Group, s.Resource)
}

// SubIndexKey extends NamespacedResource with a sub-index identifier for sharding.
// When sub-index sharding is enabled, each (namespace, group, resource) is split
// into multiple sub-indexes identified by SubIndexID.
type SubIndexKey struct {
    NamespacedResource
    SubIndexID int
}

func (s *SubIndexKey) String() string {
    return fmt.Sprintf("%s/%s/%s/shard-%d", s.Namespace, s.Group, s.Resource, s.SubIndexID)
}

// ToNamespacedResource returns the NamespacedResource without the sub-index ID.
func (s *SubIndexKey) ToNamespacedResource() NamespacedResource {
    return s.NamespacedResource
}

type IndexAction int

const (
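To make the sharding model concrete, here is a minimal, self-contained sketch (not part of this PR) of how a document key is routed to a sub-index with FNV-32a, matching `GetSubIndexForDocument` in the bleve backend further down, and how the resulting `SubIndexKey` prints. The local type copies exist only to keep the example runnable on its own:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type NamespacedResource struct {
	Namespace, Group, Resource string
}

type SubIndexKey struct {
	NamespacedResource
	SubIndexID int
}

func (s SubIndexKey) String() string {
	return fmt.Sprintf("%s/%s/%s/shard-%d", s.Namespace, s.Group, s.Resource, s.SubIndexID)
}

// subIndexForDocument hashes the full document key, as the backend does,
// so the same document always lands in the same sub-index.
func subIndexForDocument(namespace, group, resource, name string, subIndexCount int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(fmt.Sprintf("%s/%s/%s/%s", namespace, group, resource, name)))
	return int(h.Sum32() % uint32(subIndexCount))
}

func main() {
	nsr := NamespacedResource{Namespace: "org-1", Group: "dashboard.grafana.app", Resource: "dashboards"}
	id := subIndexForDocument(nsr.Namespace, nsr.Group, nsr.Resource, "my-dashboard", 64)
	fmt.Println(SubIndexKey{NamespacedResource: nsr, SubIndexID: id})
	// e.g. org-1/dashboard.grafana.app/dashboards/shard-17 (the shard number depends on the hash)
}
```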
@@ -1,11 +1,14 @@
package resource

import (
    "cmp"
    "context"
    "errors"
    "fmt"
    "hash/fnv"
    "math/rand"
    "slices"
    "sort"
    "sync"
    "sync/atomic"
    "time"
@@ -15,6 +18,7 @@ import (
    "github.com/grafana/dskit/services"
    userutils "github.com/grafana/dskit/user"
    "github.com/prometheus/client_golang/prometheus"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/grpc"
    "google.golang.org/grpc/health/grpc_health_v1"
@@ -35,10 +39,11 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
    }

    distributorServer := &distributorServer{
-       log:        log.New("index-server-distributor"),
-       ring:       ring,
-       clientPool: ringClientPool,
-       tracing:    tracer,
+       log:                    log.New("index-server-distributor"),
+       ring:                   ring,
+       clientPool:             ringClientPool,
+       tracing:                tracer,
+       subIndexesPerNamespace: cfg.SubIndexesPerNamespace,
    }

    healthService, err := ProvideHealthService(distributorServer)
@@ -83,10 +88,11 @@ const RingHeartbeatTimeout = time.Minute
const RingNumTokens = 128

type distributorServer struct {
-   clientPool *ringclient.Pool
-   ring       *ring.Ring
-   log        log.Logger
-   tracing    trace.Tracer
+   clientPool             *ringclient.Pool
+   ring                   *ring.Ring
+   log                    log.Logger
+   tracing                trace.Tracer
+   subIndexesPerNamespace int // Number of sub-indexes per namespace (0 = disabled)
}

var (
@@ -99,12 +105,33 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
    ctx, span := ds.tracing.Start(ctx, "distributor.Search")
    defer span.End()
-   ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
-   if err != nil {
-       return nil, err
-   }
-
-   return client.Search(ctx, r)
+   // If sub-index sharding is not enabled, use the existing single-node routing
+   if ds.subIndexesPerNamespace <= 0 {
+       ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
+       if err != nil {
+           return nil, err
+       }
+       return client.Search(ctx, r)
+   }
+
+   // Scatter-gather search across all sub-indexes
+   nsr := NamespacedResource{
+       Namespace: r.Options.Key.Namespace,
+       Group:     r.Options.Key.Group,
+       Resource:  r.Options.Key.Resource,
+   }
+
+   span.SetAttributes(
+       attribute.String("namespace", nsr.Namespace),
+       attribute.String("group", nsr.Group),
+       attribute.String("resource", nsr.Resource),
+       attribute.Int("sub_indexes", ds.subIndexesPerNamespace),
+   )
+
+   subIndexes := ds.getSubIndexesForNamespace(nsr)
+   results := ds.parallelSearchWithFailover(ctx, subIndexes, r)
+   return ds.mergeResults(ctx, results, r)
}

func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
@@ -242,7 +269,7 @@ func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, n
        return ctx, nil, err
    }

-   rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead, ring.WithReplicationFactor(ds.ring.ReplicationFactor()))
+   rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead)
    if err != nil {
        ds.log.Debug("error getting replication set from ring", "err", err, "namespace", namespace)
        return ctx, nil, err
@@ -276,3 +303,384 @@ func (ds *distributorServer) IsHealthy(ctx context.Context, r *resourcepb.Health
|
||||
|
||||
return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_NOT_SERVING}, nil
|
||||
}
|
||||
|
||||
// --- Scatter-Gather Query Implementation with Replica Failover ---
|
||||
|
||||
// subIndexSearchResult holds the result from searching a single sub-index.
|
||||
type subIndexSearchResult struct {
|
||||
subIndexID int
|
||||
response *resourcepb.ResourceSearchResponse
|
||||
err error
|
||||
partialFailure bool // true if all replicas for this sub-index failed
|
||||
}
|
||||
|
||||
// getSubIndexesForNamespace returns all sub-index keys for a NamespacedResource.
|
||||
// This is used for scatter-gather queries that need to query all sub-indexes.
|
||||
func (ds *distributorServer) getSubIndexesForNamespace(nsr NamespacedResource) []SubIndexKey {
|
||||
count := ds.subIndexesPerNamespace
|
||||
if count <= 0 {
|
||||
count = 1
|
||||
}
|
||||
keys := make([]SubIndexKey, count)
|
||||
for i := 0; i < count; i++ {
|
||||
keys[i] = SubIndexKey{
|
||||
NamespacedResource: nsr,
|
||||
SubIndexID: i,
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// getReplicasForSubIndex returns the ordered list of replicas (instances) that own
|
||||
// the given sub-index. Replicas are ordered by preference from the ring.
|
||||
// Uses consistent hashing including the sub-index ID to determine ownership.
|
||||
func (ds *distributorServer) getReplicasForSubIndex(subIndex SubIndexKey) ([]ring.InstanceDesc, error) {
|
||||
ringHasher := fnv.New32a()
|
||||
// Include sub-index ID in hash to distribute sub-indexes across nodes
|
||||
_, err := ringHasher.Write([]byte(fmt.Sprintf("%s/%d", subIndex.Namespace, subIndex.SubIndexID)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error hashing sub-index key: %w", err)
|
||||
}
|
||||
rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting replication set from ring for sub-index %s: %w", subIndex.String(), err)
|
||||
}
|
||||
|
||||
return rs.Instances, nil
|
||||
}
|
||||
|
||||
// parallelSearchWithFailover executes search queries across all sub-indexes in parallel.
|
||||
// For each sub-index, it tries replicas in order until one succeeds.
|
||||
// Returns results from all sub-indexes (some may be marked as partial failures).
|
||||
func (ds *distributorServer) parallelSearchWithFailover(ctx context.Context, subIndexes []SubIndexKey, r *resourcepb.ResourceSearchRequest) []*subIndexSearchResult {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.parallelSearchWithFailover")
|
||||
defer span.End()
|
||||
|
||||
results := make([]*subIndexSearchResult, len(subIndexes))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i, subIdx := range subIndexes {
|
||||
wg.Add(1)
|
||||
go func(idx int, key SubIndexKey) {
|
||||
defer wg.Done()
|
||||
results[idx] = ds.searchSubIndexWithFailover(ctx, key, r)
|
||||
}(i, subIdx)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Count partial failures for logging
|
||||
partialFailures := 0
|
||||
for _, result := range results {
|
||||
if result.partialFailure {
|
||||
partialFailures++
|
||||
}
|
||||
}
|
||||
span.SetAttributes(attribute.Int("partial_failures", partialFailures))
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// searchSubIndexWithFailover searches a single sub-index, trying each replica in order
|
||||
// until one succeeds. If all replicas fail, marks the result as a partial failure.
|
||||
func (ds *distributorServer) searchSubIndexWithFailover(ctx context.Context, subIndex SubIndexKey, r *resourcepb.ResourceSearchRequest) *subIndexSearchResult {
|
||||
result := &subIndexSearchResult{
|
||||
subIndexID: subIndex.SubIndexID,
|
||||
}
|
||||
|
||||
// Get ordered list of replicas for this sub-index
|
||||
replicas, err := ds.getReplicasForSubIndex(subIndex)
|
||||
if err != nil {
|
||||
ds.log.Warn("failed to get replicas for sub-index", "subIndex", subIndex.String(), "error", err)
|
||||
result.err = err
|
||||
result.partialFailure = true
|
||||
return result
|
||||
}
|
||||
|
||||
if len(replicas) == 0 {
|
||||
ds.log.Warn("no replicas available for sub-index", "subIndex", subIndex.String())
|
||||
result.err = fmt.Errorf("no replicas available for sub-index %s", subIndex.String())
|
||||
result.partialFailure = true
|
||||
return result
|
||||
}
|
||||
|
||||
// Prepare context with metadata
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
md = make(metadata.MD)
|
||||
}
|
||||
rCtx := userutils.InjectOrgID(metadata.NewOutgoingContext(ctx, md), subIndex.Namespace)
|
||||
|
||||
// Try each replica in order until success
|
||||
// Per-replica timeout ensures SLO compliance (≤500ms for distributed search)
|
||||
// With replication factor 2: worst case is 2 attempts × 200ms = 400ms, leaving room for merge
|
||||
const replicaTimeout = 200 * time.Millisecond
|
||||
var lastErr error
|
||||
for replicaIdx, replica := range replicas {
|
||||
client, err := ds.clientPool.GetClientForInstance(replica)
|
||||
if err != nil {
|
||||
ds.log.Debug("failed to get client for replica",
|
||||
"subIndex", subIndex.String(),
|
||||
"replica", replica.Id,
|
||||
"replicaIdx", replicaIdx,
|
||||
"error", err)
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
// Apply per-replica timeout to ensure failover happens quickly
|
||||
replicaCtx, cancel := context.WithTimeout(rCtx, replicaTimeout)
|
||||
resp, err := client.(*RingClient).Client.Search(replicaCtx, r)
|
||||
cancel() // Always cancel to release resources
|
||||
|
||||
if err == nil && resp.Error == nil {
|
||||
// Success
|
||||
result.response = resp
|
||||
return result
|
||||
}
|
||||
|
||||
// Log failover event
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
ds.log.Warn("search failed on replica, failing over to next",
|
||||
"subIndex", subIndex.String(),
|
||||
"replica", replica.Id,
|
||||
"replicaIdx", replicaIdx,
|
||||
"remainingReplicas", len(replicas)-replicaIdx-1,
|
||||
"error", err)
|
||||
} else if resp.Error != nil {
|
||||
lastErr = fmt.Errorf("search error: %s", resp.Error.Message)
|
||||
ds.log.Warn("search returned error on replica, failing over to next",
|
||||
"subIndex", subIndex.String(),
|
||||
"replica", replica.Id,
|
||||
"replicaIdx", replicaIdx,
|
||||
"remainingReplicas", len(replicas)-replicaIdx-1,
|
||||
"errorMessage", resp.Error.Message)
|
||||
}
|
||||
}
|
||||
|
||||
// All replicas failed - mark as partial failure
|
||||
ds.log.Error("all replicas failed for sub-index",
|
||||
"subIndex", subIndex.String(),
|
||||
"totalReplicas", len(replicas),
|
||||
"lastError", lastErr)
|
||||
result.err = lastErr
|
||||
result.partialFailure = true
|
||||
return result
|
||||
}
|
||||
|
||||
// mergeResults combines results from all sub-indexes into a single response.
|
||||
// It handles:
|
||||
// - Result deduplication by resource key
|
||||
// - Sort merging (merge-sort for sorted results)
|
||||
// - Pagination across shards
|
||||
// - Tracking and reporting partial failures
|
||||
func (ds *distributorServer) mergeResults(ctx context.Context, results []*subIndexSearchResult, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.mergeResults")
|
||||
defer span.End()
|
||||
|
||||
// Collect all rows and track partial failures
|
||||
var allRows []*resourcepb.ResourceTableRow
|
||||
var columns []*resourcepb.ResourceTableColumnDefinition
|
||||
var totalHits int64
|
||||
var totalQueryCost float64
|
||||
var maxScore float64
|
||||
partialFailures := 0
|
||||
failedSubIndexes := []int{}
|
||||
facets := make(map[string]*resourcepb.ResourceSearchResponse_Facet)
|
||||
|
||||
for _, result := range results {
|
||||
if result.partialFailure {
|
||||
partialFailures++
|
||||
failedSubIndexes = append(failedSubIndexes, result.subIndexID)
|
||||
continue
|
||||
}
|
||||
|
||||
if result.response == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
resp := result.response
|
||||
totalHits += resp.TotalHits
|
||||
totalQueryCost += resp.QueryCost
|
||||
if resp.MaxScore > maxScore {
|
||||
maxScore = resp.MaxScore
|
||||
}
|
||||
|
||||
// Use columns from first valid response
|
||||
if columns == nil && resp.Results != nil {
|
||||
columns = resp.Results.Columns
|
||||
}
|
||||
|
||||
// Collect rows
|
||||
if resp.Results != nil && len(resp.Results.Rows) > 0 {
|
||||
allRows = append(allRows, resp.Results.Rows...)
|
||||
}
|
||||
|
||||
// Merge facets
|
||||
for k, v := range resp.Facet {
|
||||
if existing, ok := facets[k]; ok {
|
||||
mergeFacets(existing, v)
|
||||
} else {
|
||||
facets[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
span.SetAttributes(
|
||||
attribute.Int("total_rows_before_dedup", len(allRows)),
|
||||
attribute.Int("partial_failures", partialFailures),
|
||||
)
|
||||
|
||||
// Deduplicate rows by resource key
|
||||
allRows = deduplicateRows(allRows)
|
||||
|
||||
span.SetAttributes(attribute.Int("total_rows_after_dedup", len(allRows)))
|
||||
|
||||
// Sort rows if sort criteria provided
|
||||
if len(req.SortBy) > 0 && columns != nil {
|
||||
sortRows(allRows, columns, req.SortBy)
|
||||
}
|
||||
|
||||
// Apply pagination
|
||||
offset := int(req.Offset)
|
||||
limit := int(req.Limit)
|
||||
if limit <= 0 {
|
||||
limit = 100 // default limit
|
||||
}
|
||||
|
||||
var paginatedRows []*resourcepb.ResourceTableRow
|
||||
if offset < len(allRows) {
|
||||
end := offset + limit
|
||||
if end > len(allRows) {
|
||||
end = len(allRows)
|
||||
}
|
||||
paginatedRows = allRows[offset:end]
|
||||
}
|
||||
|
||||
// Build response
|
||||
response := &resourcepb.ResourceSearchResponse{
|
||||
TotalHits: totalHits,
|
||||
QueryCost: totalQueryCost,
|
||||
MaxScore: maxScore,
|
||||
Results: &resourcepb.ResourceTable{
|
||||
Columns: columns,
|
||||
Rows: paginatedRows,
|
||||
},
|
||||
Facet: facets,
|
||||
}
|
||||
|
||||
// Report partial failures if any
|
||||
if partialFailures > 0 {
|
||||
response.Error = &resourcepb.ErrorResult{
|
||||
Code: 206, // Partial Content
|
||||
Message: fmt.Sprintf("partial results: %d of %d sub-indexes failed (sub-indexes: %v)", partialFailures, len(results), failedSubIndexes),
|
||||
}
|
||||
ds.log.Warn("search returned partial results",
|
||||
"namespace", req.Options.Key.Namespace,
|
||||
"failedSubIndexes", partialFailures,
|
||||
"totalSubIndexes", len(results))
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// deduplicateRows removes duplicate rows based on their resource key.
|
||||
// If duplicates exist, keeps the first occurrence.
|
||||
func deduplicateRows(rows []*resourcepb.ResourceTableRow) []*resourcepb.ResourceTableRow {
|
||||
if len(rows) == 0 {
|
||||
return rows
|
||||
}
|
||||
|
||||
seen := make(map[string]bool, len(rows))
|
||||
result := make([]*resourcepb.ResourceTableRow, 0, len(rows))
|
||||
|
||||
for _, row := range rows {
|
||||
if row.Key == nil {
|
||||
continue
|
||||
}
|
||||
key := SearchID(row.Key)
|
||||
if !seen[key] {
|
||||
seen[key] = true
|
||||
result = append(result, row)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// sortRows sorts rows based on the sort criteria.
|
||||
// Uses stable sort to maintain relative ordering of equal elements.
|
||||
func sortRows(rows []*resourcepb.ResourceTableRow, columns []*resourcepb.ResourceTableColumnDefinition, sortBy []*resourcepb.ResourceSearchRequest_Sort) {
|
||||
if len(rows) == 0 || len(sortBy) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Build column index map
|
||||
columnIndex := make(map[string]int)
|
||||
for i, col := range columns {
|
||||
columnIndex[col.Name] = i
|
||||
}
|
||||
|
||||
sort.SliceStable(rows, func(i, j int) bool {
|
||||
for _, s := range sortBy {
|
||||
colIdx, ok := columnIndex[s.Field]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Get cell values
|
||||
var valI, valJ []byte
|
||||
if colIdx < len(rows[i].Cells) {
|
||||
valI = rows[i].Cells[colIdx]
|
||||
}
|
||||
if colIdx < len(rows[j].Cells) {
|
||||
valJ = rows[j].Cells[colIdx]
|
||||
}
|
||||
|
||||
// Compare byte slices
|
||||
cmpResult := slices.Compare(valI, valJ)
|
||||
if cmpResult == 0 {
|
||||
continue // Values are equal, check next sort field
|
||||
}
|
||||
|
||||
// Apply descending order if needed
|
||||
if s.Desc {
|
||||
return cmpResult > 0
|
||||
}
|
||||
return cmpResult < 0
|
||||
}
|
||||
return false // All sort fields are equal
|
||||
})
|
||||
}
|
||||
|
||||
// mergeFacets merges facet data from source into target.
|
||||
func mergeFacets(target, source *resourcepb.ResourceSearchResponse_Facet) {
|
||||
if source == nil {
|
||||
return
|
||||
}
|
||||
|
||||
target.Total += source.Total
|
||||
target.Missing += source.Missing
|
||||
|
||||
// Merge term facets
|
||||
termMap := make(map[string]int64)
|
||||
for _, t := range target.Terms {
|
||||
termMap[t.Term] = t.Count
|
||||
}
|
||||
for _, t := range source.Terms {
|
||||
termMap[t.Term] += t.Count
|
||||
}
|
||||
|
||||
// Rebuild term slice sorted by count (descending)
|
||||
target.Terms = make([]*resourcepb.ResourceSearchResponse_TermFacet, 0, len(termMap))
|
||||
for term, count := range termMap {
|
||||
target.Terms = append(target.Terms, &resourcepb.ResourceSearchResponse_TermFacet{
|
||||
Term: term,
|
||||
Count: count,
|
||||
})
|
||||
}
|
||||
slices.SortFunc(target.Terms, func(a, b *resourcepb.ResourceSearchResponse_TermFacet) int {
|
||||
return cmp.Compare(b.Count, a.Count) // Descending order
|
||||
})
|
||||
}
|
||||
|
||||
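Because `mergeResults` reports sub-index failures as a 206 marker on the response rather than as a hard error, callers can still use the rows they did get. The helper below is a sketch only (the function name and wiring are hypothetical, not from this PR) showing how a caller of the distributor might surface that marker:

```go
// handlePartialResults is a hypothetical caller-side helper: it inspects the
// 206 "Partial Content" marker set by mergeResults above and logs it, leaving
// the (incomplete but usable) results untouched.
func handlePartialResults(resp *resourcepb.ResourceSearchResponse, logger log.Logger) *resourcepb.ResourceSearchResponse {
	if resp != nil && resp.Error != nil && resp.Error.Code == 206 {
		// Some sub-indexes had no reachable replica; results may be incomplete.
		logger.Warn("search returned partial results", "detail", resp.Error.Message)
	}
	return resp
}
```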
@@ -0,0 +1,390 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
func TestGetSubIndexesForNamespace(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
subIndexCount int
|
||||
expectedCount int
|
||||
expectedIDs []int
|
||||
}{
|
||||
{
|
||||
name: "zero sub-indexes defaults to 1",
|
||||
subIndexCount: 0,
|
||||
expectedCount: 1,
|
||||
expectedIDs: []int{0},
|
||||
},
|
||||
{
|
||||
name: "negative sub-indexes defaults to 1",
|
||||
subIndexCount: -1,
|
||||
expectedCount: 1,
|
||||
expectedIDs: []int{0},
|
||||
},
|
||||
{
|
||||
name: "4 sub-indexes",
|
||||
subIndexCount: 4,
|
||||
expectedCount: 4,
|
||||
expectedIDs: []int{0, 1, 2, 3},
|
||||
},
|
||||
{
|
||||
name: "64 sub-indexes",
|
||||
subIndexCount: 64,
|
||||
expectedCount: 64,
|
||||
expectedIDs: nil, // Don't check all 64
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ds := &distributorServer{
|
||||
subIndexesPerNamespace: tt.subIndexCount,
|
||||
}
|
||||
|
||||
nsr := NamespacedResource{
|
||||
Namespace: "org-1",
|
||||
Group: "dashboard.grafana.app",
|
||||
Resource: "dashboards",
|
||||
}
|
||||
|
||||
result := ds.getSubIndexesForNamespace(nsr)
|
||||
assert.Len(t, result, tt.expectedCount)
|
||||
|
||||
// Check that all keys have the correct NSR
|
||||
for i, key := range result {
|
||||
assert.Equal(t, nsr, key.NamespacedResource)
|
||||
assert.Equal(t, i, key.SubIndexID)
|
||||
}
|
||||
|
||||
// Check specific IDs if provided
|
||||
if tt.expectedIDs != nil {
|
||||
for i, expectedID := range tt.expectedIDs {
|
||||
assert.Equal(t, expectedID, result[i].SubIndexID)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeduplicateRows(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input []*resourcepb.ResourceTableRow
|
||||
expected int // expected number of rows after dedup
|
||||
}{
|
||||
{
|
||||
name: "empty input",
|
||||
input: []*resourcepb.ResourceTableRow{},
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
name: "no duplicates",
|
||||
input: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "c"}},
|
||||
},
|
||||
expected: 3,
|
||||
},
|
||||
{
|
||||
name: "with duplicates",
|
||||
input: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}}, // duplicate
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "c"}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}}, // duplicate
|
||||
},
|
||||
expected: 3,
|
||||
},
|
||||
{
|
||||
name: "rows with nil keys are skipped",
|
||||
input: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}},
|
||||
{Key: nil},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}},
|
||||
},
|
||||
expected: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := deduplicateRows(tt.input)
|
||||
assert.Len(t, result, tt.expected)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSortRows(t *testing.T) {
|
||||
columns := []*resourcepb.ResourceTableColumnDefinition{
|
||||
{Name: "title"},
|
||||
{Name: "created"},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
rows []*resourcepb.ResourceTableRow
|
||||
sortBy []*resourcepb.ResourceSearchRequest_Sort
|
||||
expectedOrder []string // expected order of first cell values
|
||||
}{
|
||||
{
|
||||
name: "sort ascending by title",
|
||||
rows: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Name: "c"}, Cells: [][]byte{[]byte("charlie"), nil}},
|
||||
{Key: &resourcepb.ResourceKey{Name: "a"}, Cells: [][]byte{[]byte("alpha"), nil}},
|
||||
{Key: &resourcepb.ResourceKey{Name: "b"}, Cells: [][]byte{[]byte("bravo"), nil}},
|
||||
},
|
||||
sortBy: []*resourcepb.ResourceSearchRequest_Sort{
|
||||
{Field: "title", Desc: false},
|
||||
},
|
||||
expectedOrder: []string{"alpha", "bravo", "charlie"},
|
||||
},
|
||||
{
|
||||
name: "sort descending by title",
|
||||
rows: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Name: "a"}, Cells: [][]byte{[]byte("alpha"), nil}},
|
||||
{Key: &resourcepb.ResourceKey{Name: "b"}, Cells: [][]byte{[]byte("bravo"), nil}},
|
||||
{Key: &resourcepb.ResourceKey{Name: "c"}, Cells: [][]byte{[]byte("charlie"), nil}},
|
||||
},
|
||||
sortBy: []*resourcepb.ResourceSearchRequest_Sort{
|
||||
{Field: "title", Desc: true},
|
||||
},
|
||||
expectedOrder: []string{"charlie", "bravo", "alpha"},
|
||||
},
|
||||
{
|
||||
name: "empty rows",
|
||||
rows: []*resourcepb.ResourceTableRow{},
|
||||
sortBy: []*resourcepb.ResourceSearchRequest_Sort{{Field: "title"}},
|
||||
expectedOrder: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sortRows(tt.rows, columns, tt.sortBy)
|
||||
|
||||
for i, expected := range tt.expectedOrder {
|
||||
if i < len(tt.rows) {
|
||||
assert.Equal(t, expected, string(tt.rows[i].Cells[0]))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeFacets(t *testing.T) {
|
||||
t.Run("merge term facets", func(t *testing.T) {
|
||||
target := &resourcepb.ResourceSearchResponse_Facet{
|
||||
Field: "tags",
|
||||
Total: 100,
|
||||
Missing: 5,
|
||||
Terms: []*resourcepb.ResourceSearchResponse_TermFacet{
|
||||
{Term: "production", Count: 50},
|
||||
{Term: "staging", Count: 30},
|
||||
},
|
||||
}
|
||||
|
||||
source := &resourcepb.ResourceSearchResponse_Facet{
|
||||
Field: "tags",
|
||||
Total: 80,
|
||||
Missing: 3,
|
||||
Terms: []*resourcepb.ResourceSearchResponse_TermFacet{
|
||||
{Term: "production", Count: 40},
|
||||
{Term: "development", Count: 25},
|
||||
},
|
||||
}
|
||||
|
||||
mergeFacets(target, source)
|
||||
|
||||
assert.Equal(t, int64(180), target.Total)
|
||||
assert.Equal(t, int64(8), target.Missing)
|
||||
assert.Len(t, target.Terms, 3)
|
||||
|
||||
// Terms should be sorted by count descending
|
||||
termCounts := make(map[string]int64)
|
||||
for _, term := range target.Terms {
|
||||
termCounts[term.Term] = term.Count
|
||||
}
|
||||
assert.Equal(t, int64(90), termCounts["production"])
|
||||
assert.Equal(t, int64(30), termCounts["staging"])
|
||||
assert.Equal(t, int64(25), termCounts["development"])
|
||||
})
|
||||
|
||||
t.Run("merge nil source", func(t *testing.T) {
|
||||
target := &resourcepb.ResourceSearchResponse_Facet{
|
||||
Total: 100,
|
||||
}
|
||||
mergeFacets(target, nil)
|
||||
assert.Equal(t, int64(100), target.Total)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSubIndexSearchResult(t *testing.T) {
|
||||
t.Run("successful result", func(t *testing.T) {
|
||||
result := &subIndexSearchResult{
|
||||
subIndexID: 5,
|
||||
response: &resourcepb.ResourceSearchResponse{
|
||||
TotalHits: 100,
|
||||
},
|
||||
partialFailure: false,
|
||||
}
|
||||
assert.False(t, result.partialFailure)
|
||||
assert.NotNil(t, result.response)
|
||||
})
|
||||
|
||||
t.Run("partial failure result", func(t *testing.T) {
|
||||
result := &subIndexSearchResult{
|
||||
subIndexID: 3,
|
||||
err: assert.AnError,
|
||||
partialFailure: true,
|
||||
}
|
||||
assert.True(t, result.partialFailure)
|
||||
assert.Nil(t, result.response)
|
||||
assert.Error(t, result.err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSubIndexKey(t *testing.T) {
|
||||
key := SubIndexKey{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: "org-1",
|
||||
Group: "dashboard.grafana.app",
|
||||
Resource: "dashboards",
|
||||
},
|
||||
SubIndexID: 42,
|
||||
}
|
||||
|
||||
t.Run("String representation", func(t *testing.T) {
|
||||
expected := "org-1/dashboard.grafana.app/dashboards/shard-42"
|
||||
assert.Equal(t, expected, key.String())
|
||||
})
|
||||
|
||||
t.Run("ToNamespacedResource", func(t *testing.T) {
|
||||
nsr := key.ToNamespacedResource()
|
||||
assert.Equal(t, "org-1", nsr.Namespace)
|
||||
assert.Equal(t, "dashboard.grafana.app", nsr.Group)
|
||||
assert.Equal(t, "dashboards", nsr.Resource)
|
||||
})
|
||||
}
|
||||
|
||||
func newTestDistributorServer() *distributorServer {
|
||||
return &distributorServer{
|
||||
tracing: noop.NewTracerProvider().Tracer("test"),
|
||||
log: log.New("test-distributor"),
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeResultsPagination(t *testing.T) {
|
||||
// Create mock results from 3 sub-indexes
|
||||
results := []*subIndexSearchResult{
|
||||
{
|
||||
subIndexID: 0,
|
||||
response: &resourcepb.ResourceSearchResponse{
|
||||
TotalHits: 10,
|
||||
Results: &resourcepb.ResourceTable{
|
||||
Columns: []*resourcepb.ResourceTableColumnDefinition{{Name: "title"}},
|
||||
Rows: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a1"}, Cells: [][]byte{[]byte("a1")}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a2"}, Cells: [][]byte{[]byte("a2")}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
subIndexID: 1,
|
||||
response: &resourcepb.ResourceSearchResponse{
|
||||
TotalHits: 10,
|
||||
Results: &resourcepb.ResourceTable{
|
||||
Columns: []*resourcepb.ResourceTableColumnDefinition{{Name: "title"}},
|
||||
Rows: []*resourcepb.ResourceTableRow{
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b1"}, Cells: [][]byte{[]byte("b1")}},
|
||||
{Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b2"}, Cells: [][]byte{[]byte("b2")}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("pagination with limit", func(t *testing.T) {
|
||||
ds := newTestDistributorServer()
|
||||
req := &resourcepb.ResourceSearchRequest{
|
||||
Limit: 2,
|
||||
Offset: 0,
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{Namespace: "ns"},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := ds.mergeResults(context.Background(), results, req)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(20), resp.TotalHits) // 10 + 10
|
||||
assert.Len(t, resp.Results.Rows, 2)
|
||||
})
|
||||
|
||||
t.Run("pagination with offset", func(t *testing.T) {
|
||||
ds := newTestDistributorServer()
|
||||
req := &resourcepb.ResourceSearchRequest{
|
||||
Limit: 2,
|
||||
Offset: 2,
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{Namespace: "ns"},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := ds.mergeResults(context.Background(), results, req)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, resp.Results.Rows, 2) // rows 3 and 4
|
||||
})
|
||||
}
|
||||
|
||||
func TestMergeResultsPartialFailure(t *testing.T) {
|
||||
results := []*subIndexSearchResult{
|
||||
{
|
||||
subIndexID: 0,
|
||||
response: &resourcepb.ResourceSearchResponse{
|
||||
TotalHits: 10,
|
||||
Results: &resourcepb.ResourceTable{
|
||||
Columns: []*resourcepb.ResourceTableColumnDefinition{{Name: "title"}},
|
||||
Rows: []*resourcepb.ResourceTableRow{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
subIndexID: 1,
|
||||
partialFailure: true,
|
||||
err: assert.AnError,
|
||||
},
|
||||
{
|
||||
subIndexID: 2,
|
||||
partialFailure: true,
|
||||
err: assert.AnError,
|
||||
},
|
||||
}
|
||||
|
||||
ds := newTestDistributorServer()
|
||||
req := &resourcepb.ResourceSearchRequest{
|
||||
Limit: 100,
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{Namespace: "ns"},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := ds.mergeResults(context.Background(), results, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Should have partial failure error
|
||||
require.NotNil(t, resp.Error)
|
||||
assert.Equal(t, int32(206), resp.Error.Code) // HTTP 206 Partial Content
|
||||
assert.Contains(t, resp.Error.Message, "2 of 3 sub-indexes failed")
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -81,6 +82,17 @@ type BleveOptions struct {
|
||||
// Indexes that are not owned by current instance are eligible for cleanup.
|
||||
// If nil, all indexes are owned by the current instance.
|
||||
OwnsIndex func(key resource.NamespacedResource) (bool, error)
|
||||
|
||||
// OwnsSubIndex is called to check whether a specific sub-index is owned by the current instance.
|
||||
// This function considers the sub-index ID when determining ownership via ring hash.
|
||||
// If nil, falls back to OwnsIndex behavior with subIndexID=0.
|
||||
OwnsSubIndex func(key resource.NamespacedResource, subIndexID int) (bool, error)
|
||||
|
||||
// SubIndexCount is the number of sub-indexes per (namespace, group, resource).
|
||||
// When > 0, documents are distributed across sub-indexes using consistent hashing.
|
||||
// This enables horizontal scaling for large namespaces (1M+ documents).
|
||||
// Recommended: 64 for large scale deployments.
|
||||
SubIndexCount int
|
||||
}
|
||||
|
||||
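The diff does not show how callers implement the new `OwnsSubIndex` option, only its signature. As a sketch under stated assumptions (a dskit ring, a ring operation, and the instance's advertised address supplied by the caller), one plausible wiring mirrors the hashing used by `getReplicasForSubIndex` in the distributor:

```go
// Sketch only: a possible OwnsSubIndex callback backed by the dskit ring.
// The ring, operation, and instance address are assumptions supplied by the
// caller; this is not code from the PR.
func ringOwnsSubIndex(r *ring.Ring, op ring.Operation, instanceAddr string) func(key resource.NamespacedResource, subIndexID int) (bool, error) {
	return func(key resource.NamespacedResource, subIndexID int) (bool, error) {
		h := fnv.New32a()
		// Same hash input as the distributor: namespace plus sub-index ID.
		if _, err := h.Write([]byte(fmt.Sprintf("%s/%d", key.Namespace, subIndexID))); err != nil {
			return false, err
		}
		rs, err := r.GetWithOptions(h.Sum32(), op)
		if err != nil {
			return false, err
		}
		for _, inst := range rs.Instances {
			if inst.Addr == instanceAddr {
				return true, nil
			}
		}
		return false, nil
	}
}
```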
type bleveBackend struct {
|
||||
@@ -90,9 +102,17 @@ type bleveBackend struct {
|
||||
// set from opts.OwnsIndex, always non-nil
|
||||
ownsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||
|
||||
// set from opts.OwnsSubIndex, always non-nil
|
||||
// Used for checking ownership of sub-indexes when sharding is enabled
|
||||
ownsSubIndexFn func(key resource.NamespacedResource, subIndexID int) (bool, error)
|
||||
|
||||
cacheMx sync.RWMutex
|
||||
cache map[resource.NamespacedResource]*bleveIndex
|
||||
|
||||
// subIndexCache stores sub-indexes when sub-index sharding is enabled.
|
||||
// Key is SubIndexKey which includes the sub-index ID.
|
||||
subIndexCache map[resource.SubIndexKey]*bleveIndex
|
||||
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
|
||||
bgTasksCancel func()
|
||||
@@ -136,12 +156,23 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
        ownFn = func(key resource.NamespacedResource) (bool, error) { return true, nil }
    }

+   ownSubFn := opts.OwnsSubIndex
+   if ownSubFn == nil {
+       // By default, fall back to OwnsIndex behavior (ignore subIndexID).
+       // This maintains backward compatibility when sub-index sharding is not enabled.
+       ownSubFn = func(key resource.NamespacedResource, subIndexID int) (bool, error) {
+           return ownFn(key)
+       }
+   }
+
    be := &bleveBackend{
-       log:          l,
-       cache:        map[resource.NamespacedResource]*bleveIndex{},
-       opts:         opts,
-       ownsIndexFn:  ownFn,
-       indexMetrics: indexMetrics,
+       log:            l,
+       cache:          map[resource.NamespacedResource]*bleveIndex{},
+       subIndexCache:  map[resource.SubIndexKey]*bleveIndex{},
+       opts:           opts,
+       ownsIndexFn:    ownFn,
+       ownsSubIndexFn: ownSubFn,
+       indexMetrics:   indexMetrics,
    }

    ctx, cancel := context.WithCancel(context.Background())
@@ -158,6 +189,53 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// IsSubIndexShardingEnabled returns true if sub-index sharding is configured.
|
||||
func (b *bleveBackend) IsSubIndexShardingEnabled() bool {
|
||||
return b.opts.SubIndexCount > 0
|
||||
}
|
||||
|
||||
// GetSubIndexCount returns the number of sub-indexes per (namespace, group, resource).
|
||||
func (b *bleveBackend) GetSubIndexCount() int {
|
||||
return b.opts.SubIndexCount
|
||||
}
|
||||
|
||||
// GetSubIndexForDocument computes the sub-index ID for a document based on its key.
|
||||
// Uses FNV32a hash of the full document key for consistent distribution.
|
||||
func (b *bleveBackend) GetSubIndexForDocument(key *resourcepb.ResourceKey) int {
|
||||
if b.opts.SubIndexCount <= 0 {
|
||||
return 0
|
||||
}
|
||||
h := fnv.New32a()
|
||||
// Hash the full document key: namespace/group/resource/name
|
||||
_, _ = h.Write([]byte(fmt.Sprintf("%s/%s/%s/%s", key.Namespace, key.Group, key.Resource, key.Name)))
|
||||
return int(h.Sum32() % uint32(b.opts.SubIndexCount))
|
||||
}
|
||||
|
||||
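A quick way to check the "consistent distribution" claim behind this hash is to route a batch of synthetic keys and look at the per-shard counts. This is a standalone sketch (not part of the PR) using the same FNV-32a routing as `GetSubIndexForDocument`:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor mirrors GetSubIndexForDocument: FNV-32a of the full key modulo the shard count.
func shardFor(key string, shards int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return int(h.Sum32() % uint32(shards))
}

func main() {
	const shards = 64
	counts := make([]int, shards)
	for i := 0; i < 100000; i++ {
		key := fmt.Sprintf("org-1/dashboard.grafana.app/dashboards/dash-%d", i)
		counts[shardFor(key, shards)]++
	}
	min, max := counts[0], counts[0]
	for _, c := range counts {
		if c < min {
			min = c
		}
		if c > max {
			max = c
		}
	}
	// With 100k keys and 64 shards, min and max stay close to the ideal 1562.
	fmt.Printf("min=%d max=%d (ideal %d)\n", min, max, 100000/shards)
}
```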
// GetSubIndexKey returns the SubIndexKey for a document.
|
||||
func (b *bleveBackend) GetSubIndexKey(nsr resource.NamespacedResource, docKey *resourcepb.ResourceKey) resource.SubIndexKey {
|
||||
return resource.SubIndexKey{
|
||||
NamespacedResource: nsr,
|
||||
SubIndexID: b.GetSubIndexForDocument(docKey),
|
||||
}
|
||||
}
|
||||
|
||||
// GetAllSubIndexKeys returns all SubIndexKey values for a NamespacedResource.
|
||||
// This is used for scatter-gather queries that need to query all sub-indexes.
|
||||
func (b *bleveBackend) GetAllSubIndexKeys(nsr resource.NamespacedResource) []resource.SubIndexKey {
|
||||
count := b.opts.SubIndexCount
|
||||
if count <= 0 {
|
||||
count = 1
|
||||
}
|
||||
keys := make([]resource.SubIndexKey, count)
|
||||
for i := 0; i < count; i++ {
|
||||
keys[i] = resource.SubIndexKey{
|
||||
NamespacedResource: nsr,
|
||||
SubIndexID: i,
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// GetIndex will return nil if the key does not exist
|
||||
func (b *bleveBackend) GetIndex(key resource.NamespacedResource) resource.ResourceIndex {
|
||||
idx := b.getCachedIndex(key, time.Now())
|
||||
@@ -228,7 +306,13 @@ func (b *bleveBackend) runEvictExpiredOrUnownedIndexes(now time.Time) {
|
||||
unowned := map[resource.NamespacedResource]*bleveIndex{}
|
||||
ownCheckErrors := map[resource.NamespacedResource]error{}
|
||||
|
||||
// For sub-indexes
|
||||
expiredSubIndexes := map[resource.SubIndexKey]*bleveIndex{}
|
||||
unownedSubIndexes := map[resource.SubIndexKey]*bleveIndex{}
|
||||
ownSubCheckErrors := map[resource.SubIndexKey]error{}
|
||||
|
||||
b.cacheMx.Lock()
|
||||
// Process main cache (non-sharded indexes)
|
||||
for key, idx := range b.cache {
|
||||
// Check if index has expired.
|
||||
if !idx.expiration.IsZero() && now.After(idx.expiration) {
|
||||
@@ -248,21 +332,75 @@ func (b *bleveBackend) runEvictExpiredOrUnownedIndexes(now time.Time) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process sub-index cache (sharded indexes)
|
||||
for subKey, idx := range b.subIndexCache {
|
||||
// Check if sub-index has expired.
|
||||
if !idx.expiration.IsZero() && now.After(idx.expiration) {
|
||||
delete(b.subIndexCache, subKey)
|
||||
expiredSubIndexes[subKey] = idx
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if sub-index is owned by this instance using OwnsSubIndex.
|
||||
// This considers the subIndexID when determining ownership via ring hash.
|
||||
if cacheTTLMillis > 0 {
|
||||
owned, err := b.ownsSubIndexFn(subKey.NamespacedResource, subKey.SubIndexID)
|
||||
if err != nil {
|
||||
ownSubCheckErrors[subKey] = err
|
||||
} else if !owned && now.UnixMilli()-idx.lastFetchedFromCache.Load() > cacheTTLMillis {
|
||||
delete(b.subIndexCache, subKey)
|
||||
unownedSubIndexes[subKey] = idx
|
||||
}
|
||||
}
|
||||
}
|
||||
b.cacheMx.Unlock()
|
||||
|
||||
// Log errors for main cache ownership checks
|
||||
for key, err := range ownCheckErrors {
|
||||
b.log.Warn("failed to check if index belongs to this instance", "key", key, "err", err)
|
||||
}
|
||||
|
||||
// Log errors for sub-index ownership checks
|
||||
for subKey, err := range ownSubCheckErrors {
|
||||
b.log.Warn("failed to check if sub-index belongs to this instance", "subKey", subKey, "err", err)
|
||||
}
|
||||
|
||||
// Evict unowned main indexes
|
||||
for key, idx := range unowned {
|
||||
b.log.Info("index evicted from cache", "reason", "unowned", "key", key, "storage", idx.indexStorage)
|
||||
b.closeIndex(idx, key)
|
||||
}
|
||||
|
||||
// Evict unowned sub-indexes
|
||||
for subKey, idx := range unownedSubIndexes {
|
||||
b.log.Info("sub-index evicted from cache", "reason", "unowned", "subKey", subKey, "storage", idx.indexStorage)
|
||||
b.closeSubIndex(idx, subKey)
|
||||
}
|
||||
|
||||
// Evict expired main indexes
|
||||
for key, idx := range expired {
|
||||
b.log.Info("index evicted from cache", "reason", "expired", "key", key, "storage", idx.indexStorage)
|
||||
b.closeIndex(idx, key)
|
||||
}
|
||||
|
||||
// Evict expired sub-indexes
|
||||
for subKey, idx := range expiredSubIndexes {
|
||||
b.log.Info("sub-index evicted from cache", "reason", "expired", "subKey", subKey, "storage", idx.indexStorage)
|
||||
b.closeSubIndex(idx, subKey)
|
||||
}
|
||||
}
|
||||
|
||||
// closeSubIndex closes a sub-index and updates metrics.
|
||||
func (b *bleveBackend) closeSubIndex(idx *bleveIndex, key resource.SubIndexKey) {
|
||||
err := idx.stopUpdaterAndCloseIndex()
|
||||
if err != nil {
|
||||
b.log.Error("failed to close sub-index", "key", key, "err", err)
|
||||
}
|
||||
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
|
||||
}
|
||||
}
|
||||
|
||||
// updateIndexSizeMetric sets the total size of all file-based indices metric.
|
||||
@@ -368,6 +506,12 @@ func (b *bleveBackend) BuildIndex(
|
||||
attribute.String("reason", indexBuildReason),
|
||||
)
|
||||
|
||||
// If sub-index sharding is enabled, delegate to the parallel sub-index builder
|
||||
if b.opts.SubIndexCount > 0 {
|
||||
span.SetAttributes(attribute.Int("sub_index_count", b.opts.SubIndexCount))
|
||||
return b.buildShardedIndex(ctx, key, size, fields, indexBuildReason, builder, updater, rebuild)
|
||||
}
|
||||
|
||||
mapper, err := GetBleveMappings(fields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -603,6 +747,294 @@ func isPathWithinRoot(path, absoluteRoot string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// buildShardedIndex builds all sub-indexes in parallel and returns a compositeIndex.
|
||||
// This is used when sub-index sharding is enabled (SubIndexCount > 0).
|
||||
func (b *bleveBackend) buildShardedIndex(
|
||||
ctx context.Context,
|
||||
key resource.NamespacedResource,
|
||||
size int64,
|
||||
fields resource.SearchableDocumentFields,
|
||||
indexBuildReason string,
|
||||
builder resource.BuildFn,
|
||||
updater resource.UpdateFn,
|
||||
rebuild bool,
|
||||
) (resource.ResourceIndex, error) {
|
||||
_, span := tracer.Start(ctx, "search.bleveBackend.buildShardedIndex")
|
||||
defer span.End()
|
||||
|
||||
logWithDetails := b.log.FromContext(ctx).New(
|
||||
"namespace", key.Namespace,
|
||||
"group", key.Group,
|
||||
"resource", key.Resource,
|
||||
"size", size,
|
||||
"reason", indexBuildReason,
|
||||
"sub_index_count", b.opts.SubIndexCount,
|
||||
)
|
||||
|
||||
mapper, err := GetBleveMappings(fields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
standardSearchFields := resource.StandardSearchFields()
|
||||
allFields, err := getAllFields(standardSearchFields, fields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Calculate size per sub-index for threshold decisions
|
||||
sizePerSubIndex := size / int64(b.opts.SubIndexCount)
|
||||
if sizePerSubIndex < 1 {
|
||||
sizePerSubIndex = 1
|
||||
}
|
||||
|
||||
// Create all sub-indexes with limited concurrency to avoid overwhelming the file system
|
||||
// Use a worker pool pattern - limit to 8 concurrent index creations
|
||||
type subIndexResult struct {
|
||||
subIndex *bleveIndex
|
||||
subIndexKey resource.SubIndexKey
|
||||
err error
|
||||
}
|
||||
|
||||
results := make([]subIndexResult, b.opts.SubIndexCount)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Limit concurrent index creations to avoid file system contention
|
||||
maxConcurrentIndexCreations := 8
|
||||
if b.opts.SubIndexCount < maxConcurrentIndexCreations {
|
||||
maxConcurrentIndexCreations = b.opts.SubIndexCount
|
||||
}
|
||||
semaphore := make(chan struct{}, maxConcurrentIndexCreations)
|
||||
|
||||
logWithDetails.Info("Building sharded index", "sub_index_count", b.opts.SubIndexCount, "size_per_sub_index", sizePerSubIndex, "max_concurrent", maxConcurrentIndexCreations)
|
||||
|
||||
for i := 0; i < b.opts.SubIndexCount; i++ {
|
||||
wg.Add(1)
|
||||
go func(subIndexID int) {
|
||||
defer wg.Done()
|
||||
// Acquire semaphore
|
||||
semaphore <- struct{}{}
|
||||
defer func() { <-semaphore }()
|
||||
|
||||
subKey := resource.SubIndexKey{
|
||||
NamespacedResource: key,
|
||||
SubIndexID: subIndexID,
|
||||
}
|
||||
results[subIndexID].subIndexKey = subKey
|
||||
|
||||
// Build individual sub-index
|
||||
idx, buildErr := b.buildSingleSubIndex(
|
||||
ctx, subKey, sizePerSubIndex, mapper, fields, allFields, standardSearchFields, updater, indexBuildReason, rebuild,
|
||||
)
|
||||
if buildErr != nil {
|
||||
results[subIndexID].err = buildErr
|
||||
return
|
||||
}
|
||||
results[subIndexID].subIndex = idx
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Check for errors and collect successful sub-indexes
|
||||
subIndexes := make([]*bleveIndex, 0, b.opts.SubIndexCount)
|
||||
var firstErr error
|
||||
for i, result := range results {
|
||||
if result.err != nil {
|
||||
logWithDetails.Error("Failed to build sub-index", "sub_index_id", i, "err", result.err)
|
||||
if firstErr == nil {
|
||||
firstErr = result.err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if result.subIndex != nil {
|
||||
subIndexes = append(subIndexes, result.subIndex)
|
||||
}
|
||||
}
|
||||
|
||||
// If any sub-index failed, clean up and return error
|
||||
if firstErr != nil {
|
||||
for _, result := range results {
|
||||
if result.subIndex != nil {
|
||||
_ = result.subIndex.stopUpdaterAndCloseIndex()
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("failed to build sharded index: %w", firstErr)
|
||||
}
|
||||
|
||||
// Create the composite index
|
||||
composite := b.newCompositeIndex(key, subIndexes, fields, allFields, standardSearchFields, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))
|
||||
|
||||
// Build the index by calling the builder with the composite index
|
||||
// The builder will call BulkIndex on the composite, which routes documents to correct sub-indexes
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.IndexBuilds.WithLabelValues(indexBuildReason).Inc()
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
listRV, err := builder(composite)
|
||||
if err != nil {
|
||||
logWithDetails.Error("Failed to build sharded index", "err", err)
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.IndexBuildFailures.Inc()
|
||||
}
|
||||
// Clean up all sub-indexes on failure
|
||||
for _, idx := range subIndexes {
|
||||
_ = idx.stopUpdaterAndCloseIndex()
|
||||
}
|
||||
return nil, fmt.Errorf("failed to build sharded index: %w", err)
|
||||
}
|
||||
|
||||
// Update resource version on all sub-indexes
|
||||
for _, idx := range subIndexes {
|
||||
if err := idx.updateResourceVersion(listRV); err != nil {
|
||||
logWithDetails.Error("Failed to persist RV to sub-index", "err", err, "rv", listRV)
|
||||
// Continue - this is not fatal
|
||||
}
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
logWithDetails.Info("Finished building sharded index", "elapsed", elapsed, "listRV", listRV, "sub_indexes_built", len(subIndexes))
|
||||
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.IndexCreationTime.WithLabelValues().Observe(elapsed.Seconds())
|
||||
}
|
||||
|
||||
// Store sub-indexes in the cache
|
||||
b.cacheMx.Lock()
|
||||
for i, idx := range subIndexes {
|
||||
subKey := resource.SubIndexKey{
|
||||
NamespacedResource: key,
|
||||
SubIndexID: i,
|
||||
}
|
||||
prev := b.subIndexCache[subKey]
|
||||
b.subIndexCache[subKey] = idx
|
||||
|
||||
// Close previous sub-index if it existed
|
||||
if prev != nil {
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.OpenIndexes.WithLabelValues(prev.indexStorage).Dec()
|
||||
}
|
||||
if err := prev.stopUpdaterAndCloseIndex(); err != nil {
|
||||
logWithDetails.Error("failed to close previous sub-index", "sub_key", subKey, "err", err)
|
||||
}
|
||||
}
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Inc()
|
||||
}
|
||||
}
|
||||
b.cacheMx.Unlock()
|
||||
|
||||
return composite, nil
|
||||
}
|
||||
|
||||
// buildSingleSubIndex builds a single sub-index for sharded indexing.
|
||||
func (b *bleveBackend) buildSingleSubIndex(
|
||||
ctx context.Context,
|
||||
subKey resource.SubIndexKey,
|
||||
size int64,
|
||||
mapper mapping.IndexMapping,
|
||||
fields resource.SearchableDocumentFields,
|
||||
allFields []*resourcepb.ResourceTableColumnDefinition,
|
||||
standardSearchFields resource.SearchableDocumentFields,
|
||||
updater resource.UpdateFn,
|
||||
indexBuildReason string,
|
||||
rebuild bool,
|
||||
) (*bleveIndex, error) {
|
||||
key := subKey.ToNamespacedResource()
|
||||
logWithDetails := b.log.FromContext(ctx).New(
|
||||
"namespace", key.Namespace,
|
||||
"group", key.Group,
|
||||
"resource", key.Resource,
|
||||
"sub_index_id", subKey.SubIndexID,
|
||||
"size", size,
|
||||
)
|
||||
|
||||
// Get directory for this sub-index
|
||||
subIndexDir := b.getSubIndexDir(subKey)
|
||||
|
||||
var index bleve.Index
|
||||
var indexRV int64
|
||||
var err error
|
||||
fileIndexName := ""
|
||||
newIndexType := indexStorageMemory
|
||||
|
||||
if size >= b.opts.FileThreshold {
|
||||
newIndexType = indexStorageFile
|
||||
|
||||
// Check for existing file-based index if not rebuilding
|
||||
if !rebuild {
|
||||
index, fileIndexName, indexRV, err = b.findPreviousFileBasedIndex(subIndexDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if index != nil {
|
||||
logWithDetails.Debug("Existing sub-index found on filesystem", "indexRV", indexRV, "directory", filepath.Join(subIndexDir, fileIndexName))
|
||||
} else {
|
||||
// Create new file-based index
|
||||
indexDir := ""
|
||||
now := time.Now()
|
||||
for index == nil {
|
||||
fileIndexName = formatIndexName(now)
|
||||
indexDir = filepath.Join(subIndexDir, fileIndexName)
|
||||
if !isPathWithinRoot(indexDir, b.opts.Root) {
|
||||
return nil, fmt.Errorf("invalid path %s", indexDir)
|
||||
}
|
||||
|
||||
// Ensure sub-index directory exists
|
||||
if err := os.MkdirAll(subIndexDir, 0750); err != nil {
|
||||
return nil, fmt.Errorf("failed to create sub-index directory: %w", err)
|
||||
}
|
||||
|
||||
index, err = newBleveIndex(indexDir, mapper, time.Now(), b.opts.BuildVersion)
|
||||
if errors.Is(err, bleve.ErrorIndexPathExists) {
|
||||
now = now.Add(time.Second)
|
||||
index = nil
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating new bleve sub-index: %s %w", indexDir, err)
|
||||
}
|
||||
}
|
||||
logWithDetails.Debug("Created new file-based sub-index", "directory", indexDir)
|
||||
}
|
||||
} else {
|
||||
index, err = newBleveIndex("", mapper, time.Now(), b.opts.BuildVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating new in-memory bleve sub-index: %w", err)
|
||||
}
|
||||
logWithDetails.Debug("Created new in-memory sub-index")
|
||||
}
|
||||
|
||||
// Create the bleveIndex wrapper
|
||||
idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, updater, logWithDetails)
|
||||
|
||||
// Set expiration for in-memory indexes
|
||||
if fileIndexName == "" && b.opts.IndexCacheTTL > 0 {
|
||||
idx.expiration = time.Now().Add(b.opts.IndexCacheTTL)
|
||||
}
|
||||
|
||||
// If we reused an existing index, set its resource version
|
||||
if indexRV > 0 {
|
||||
idx.resourceVersion = indexRV
|
||||
}
|
||||
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
// getSubIndexDir returns the directory path for a sub-index.
|
||||
func (b *bleveBackend) getSubIndexDir(subKey resource.SubIndexKey) string {
|
||||
key := subKey.ToNamespacedResource()
|
||||
return filepath.Join(
|
||||
b.opts.Root,
|
||||
cleanFileSegment(key.Namespace),
|
||||
cleanFileSegment(fmt.Sprintf("%s.%s", key.Resource, key.Group)),
|
||||
fmt.Sprintf("shard-%d", subKey.SubIndexID),
|
||||
)
|
||||
}

// TotalDocs returns the total number of documents across all indices
func (b *bleveBackend) TotalDocs() int64 {
	var totalDocs int64
@@ -678,6 +1110,7 @@ func (b *bleveBackend) closeAllIndexes() {
	b.cacheMx.Lock()
	defer b.cacheMx.Unlock()

	// Close main indexes
	for key, idx := range b.cache {
		if err := idx.stopUpdaterAndCloseIndex(); err != nil {
			b.log.Error("Failed to close index", "err", err)
@@ -688,6 +1121,18 @@ func (b *bleveBackend) closeAllIndexes() {
			b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
		}
	}

	// Close sub-indexes
	for subKey, idx := range b.subIndexCache {
		if err := idx.stopUpdaterAndCloseIndex(); err != nil {
			b.log.Error("Failed to close sub-index", "subKey", subKey, "err", err)
		}
		delete(b.subIndexCache, subKey)

		if b.indexMetrics != nil {
			b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
		}
	}
}

type updateRequest struct {
@@ -741,6 +1186,184 @@ type bleveIndex struct {
	lastFetchedFromCache atomic.Int64
}

// compositeIndex wraps multiple sub-indexes for sharded search.
// It implements resource.ResourceIndex and distributes operations across sub-indexes.
type compositeIndex struct {
	key           resource.NamespacedResource
	subIndexes    []*bleveIndex
	alias         bleve.Index // Bleve IndexAlias for unified search across all sub-indexes
	subIndexCount int
	backend       *bleveBackend
	logger        log.Logger

	// standard and custom fields, shared across all sub-indexes
	standard  resource.SearchableDocumentFields
	fields    resource.SearchableDocumentFields
	allFields []*resourcepb.ResourceTableColumnDefinition
}

var _ resource.ResourceIndex = &compositeIndex{}

// newCompositeIndex creates a new composite index wrapping multiple sub-indexes.
func (b *bleveBackend) newCompositeIndex(
	key resource.NamespacedResource,
	subIndexes []*bleveIndex,
	fields resource.SearchableDocumentFields,
	allFields []*resourcepb.ResourceTableColumnDefinition,
	standardSearchFields resource.SearchableDocumentFields,
	logger log.Logger,
) *compositeIndex {
	// Create Bleve IndexAlias for unified search
	indexes := make([]bleve.Index, len(subIndexes))
	for i, idx := range subIndexes {
		indexes[i] = idx.index
	}

	return &compositeIndex{
		key:           key,
		subIndexes:    subIndexes,
		alias:         bleve.NewIndexAlias(indexes...),
		subIndexCount: len(subIndexes),
		backend:       b,
		logger:        logger,
		standard:      standardSearchFields,
		fields:        fields,
		allFields:     allFields,
	}
}
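
An aside on the mechanism: the composite index leans on bleve's IndexAlias, which
fans a single search request out to every member index and merges the hits, so N
sub-indexes behave as one logical index at query time. A minimal, self-contained
sketch of that behavior (plain in-memory bleve indexes, none of Grafana's types;
illustrative only, not part of this change):

package main

import (
	"fmt"

	"github.com/blevesearch/bleve/v2"
)

func main() {
	mapping := bleve.NewIndexMapping()

	// Two in-memory indexes standing in for shard-0 and shard-1.
	shard0, _ := bleve.NewMemOnly(mapping)
	shard1, _ := bleve.NewMemOnly(mapping)
	_ = shard0.Index("dash-a", map[string]string{"title": "CPU usage"})
	_ = shard1.Index("dash-b", map[string]string{"title": "CPU throttling"})

	// The alias searches both shards and merges the results transparently.
	alias := bleve.NewIndexAlias(shard0, shard1)
	res, err := alias.Search(bleve.NewSearchRequest(bleve.NewMatchQuery("cpu")))
	if err != nil {
		panic(err)
	}
	fmt.Println("total hits across shards:", res.Total) // both documents match "cpu"
}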

// BulkIndex routes documents to the appropriate sub-index based on document key hash.
// Documents are grouped by sub-index and then indexed in parallel to maximize throughput.
func (c *compositeIndex) BulkIndex(req *resource.BulkIndexRequest) error {
	if len(req.Items) == 0 {
		return nil
	}

	// Group items by sub-index
	itemsBySubIndex := make(map[int][]*resource.BulkIndexItem)
	for _, item := range req.Items {
		var docKey *resourcepb.ResourceKey
		if item.Action == resource.ActionIndex && item.Doc != nil {
			docKey = item.Doc.Key
		} else if item.Action == resource.ActionDelete {
			docKey = item.Key
		}
		if docKey == nil {
			return fmt.Errorf("missing document key for bulk index item")
		}

		subIndexID := c.backend.GetSubIndexForDocument(docKey)
		itemsBySubIndex[subIndexID] = append(itemsBySubIndex[subIndexID], item)
	}

	// Process sub-index batches in parallel for better throughput
	var wg sync.WaitGroup
	errCh := make(chan error, len(itemsBySubIndex))

	for subIndexID, items := range itemsBySubIndex {
		if subIndexID >= len(c.subIndexes) {
			return fmt.Errorf("sub-index ID %d out of range (max %d)", subIndexID, len(c.subIndexes)-1)
		}

		wg.Add(1)
		go func(idx int, indexItems []*resource.BulkIndexItem) {
			defer wg.Done()
			subReq := &resource.BulkIndexRequest{
				Items:           indexItems,
				ResourceVersion: req.ResourceVersion,
			}
			if err := c.subIndexes[idx].BulkIndex(subReq); err != nil {
				errCh <- fmt.Errorf("error indexing to sub-index %d: %w", idx, err)
			}
		}(subIndexID, items)
	}

	wg.Wait()
	close(errCh)

	// Return first error if any
	for err := range errCh {
		return err
	}

	return nil
}
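
GetSubIndexForDocument is not part of this hunk, so the exact routing scheme is
not shown here. A sketch of one plausible implementation, assuming a stable hash
of the document name modulo the configured sub-index count (hypothetical helper,
not the backend's actual method):

package main

import (
	"fmt"
	"hash/fnv"
)

// subIndexForDocument routes a document to a shard: same name, same shard,
// so updates and deletes always reach the sub-index that holds the document.
func subIndexForDocument(name string, subIndexCount int) int {
	if subIndexCount <= 0 {
		return 0 // sharding disabled: everything goes to the single main index
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(name))
	return int(h.Sum32() % uint32(subIndexCount))
}

func main() {
	for _, name := range []string{"dash-a", "dash-b", "dash-c"} {
		fmt.Printf("%s -> shard-%d\n", name, subIndexForDocument(name, 64))
	}
}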

// Search performs a search across all sub-indexes using the IndexAlias.
func (c *compositeIndex) Search(
	ctx context.Context,
	access authlib.AccessClient,
	req *resourcepb.ResourceSearchRequest,
	federate []resource.ResourceIndex,
	stats *resource.SearchStats,
) (*resourcepb.ResourceSearchResponse, error) {
	// Delegate to the first sub-index's search logic, but use our alias
	// This works because we set up the IndexAlias to search across all sub-indexes
	if len(c.subIndexes) == 0 {
		return &resourcepb.ResourceSearchResponse{
			Error: resource.NewBadRequestError("no sub-indexes available"),
		}, nil
	}

	// Use the first sub-index for search implementation, but with our composite alias
	// The sub-index will use its search logic with our federated alias
	return c.subIndexes[0].Search(ctx, access, req, federate, stats)
}

// ListManagedObjects aggregates results from all sub-indexes.
func (c *compositeIndex) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest, stats *resource.SearchStats) (*resourcepb.ListManagedObjectsResponse, error) {
	if len(c.subIndexes) == 0 {
		return &resourcepb.ListManagedObjectsResponse{}, nil
	}
	// Use the first sub-index - the alias handles cross-index queries
	return c.subIndexes[0].ListManagedObjects(ctx, req, stats)
}

// CountManagedObjects aggregates counts from all sub-indexes.
func (c *compositeIndex) CountManagedObjects(ctx context.Context, stats *resource.SearchStats) ([]*resourcepb.CountManagedObjectsResponse_ResourceCount, error) {
	if len(c.subIndexes) == 0 {
		return nil, nil
	}
	// Use the first sub-index - the alias handles cross-index queries
	return c.subIndexes[0].CountManagedObjects(ctx, stats)
}

// DocCount returns the total document count across all sub-indexes.
func (c *compositeIndex) DocCount(ctx context.Context, folder string, stats *resource.SearchStats) (int64, error) {
	var total int64
	for _, idx := range c.subIndexes {
		count, err := idx.DocCount(ctx, folder, stats)
		if err != nil {
			return 0, err
		}
		total += count
	}
	return total, nil
}

// UpdateIndex updates all sub-indexes to the latest data.
func (c *compositeIndex) UpdateIndex(ctx context.Context) (int64, error) {
	var maxRV int64
	for _, idx := range c.subIndexes {
		rv, err := idx.UpdateIndex(ctx)
		if err != nil {
			return 0, err
		}
		if rv > maxRV {
			maxRV = rv
		}
	}
	return maxRV, nil
}

// BuildInfo returns build information from the first sub-index.
func (c *compositeIndex) BuildInfo() (resource.IndexBuildInfo, error) {
	if len(c.subIndexes) == 0 {
		return resource.IndexBuildInfo{}, fmt.Errorf("no sub-indexes available")
	}
	return c.subIndexes[0].BuildInfo()
}

func (b *bleveBackend) newBleveIndex(
	key resource.NamespacedResource,
	index bleve.Index,

@@ -17,6 +17,7 @@ func NewSearchOptions(
	docs resource.DocumentBuilderSupplier,
	indexMetrics *resource.BleveIndexMetrics,
	ownsIndexFn func(key resource.NamespacedResource) (bool, error),
	ownsSubIndexFn ...func(key resource.NamespacedResource, subIndexID int) (bool, error),
) (resource.SearchOptions, error) {
	//nolint:staticcheck // not yet migrated to OpenFeature
	if cfg.EnableSearch || features.IsEnabledGlobally(featuremgmt.FlagProvisioning) {
@@ -39,13 +40,21 @@ func NewSearchOptions(
		}
	}

	// Get OwnsSubIndex function if provided
	var ownsSubIdx func(key resource.NamespacedResource, subIndexID int) (bool, error)
	if len(ownsSubIndexFn) > 0 && ownsSubIndexFn[0] != nil {
		ownsSubIdx = ownsSubIndexFn[0]
	}

	bleve, err := NewBleveBackend(BleveOptions{
		Root:                   root,
		FileThreshold:          int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
		IndexCacheTTL:          cfg.IndexCacheTTL,             // How long to keep the index cache in memory
		BuildVersion:           cfg.BuildVersion,
		OwnsIndex:              ownsIndexFn,
		OwnsSubIndex:           ownsSubIdx,
		IndexMinUpdateInterval: cfg.IndexMinUpdateInterval,
		SubIndexCount:          cfg.SubIndexesPerNamespace,
	}, indexMetrics)

	if err != nil {

@@ -226,6 +226,18 @@ var (
)

func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
	// When sub-index sharding is enabled, use OwnsSubIndex with subIndexID=0
	// to maintain backward compatibility. OwnsIndex is used for the main index.
	return s.OwnsSubIndex(key, 0)
}

// OwnsSubIndex checks if the current instance owns a specific sub-index.
// The sub-index ID is included in the ring hash to distribute sub-indexes
// across nodes in the ring. This enables horizontal scaling for large namespaces.
//
// When subIndexID is 0 and SubIndexesPerNamespace is 0 (disabled), this behaves
// exactly like the original OwnsIndex - maintaining backward compatibility.
func (s *service) OwnsSubIndex(key resource.NamespacedResource, subIndexID int) (bool, error) {
	if s.searchRing == nil {
		return true, nil
	}
@@ -235,12 +247,25 @@ func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
	}

	ringHasher := fnv.New32a()
	_, err := ringHasher.Write([]byte(key.Namespace))
	if err != nil {
		return false, fmt.Errorf("error hashing namespace: %w", err)

	// When sub-index sharding is enabled (SubIndexesPerNamespace > 0),
	// include the subIndexID in the hash to distribute sub-indexes across nodes.
	// This allows different sub-indexes of the same namespace to be owned by
	// different nodes, enabling horizontal scaling.
	if s.cfg.SubIndexesPerNamespace > 0 {
		_, err := ringHasher.Write([]byte(fmt.Sprintf("%s/%d", key.Namespace, subIndexID)))
		if err != nil {
			return false, fmt.Errorf("error hashing namespace with sub-index: %w", err)
		}
	} else {
		// Original behavior: hash only the namespace
		_, err := ringHasher.Write([]byte(key.Namespace))
		if err != nil {
			return false, fmt.Errorf("error hashing namespace: %w", err)
		}
	}

	rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
	rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead)
	if err != nil {
		return false, fmt.Errorf("error getting replicaset from ring: %w", err)
	}
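
To see why including the sub-index ID matters, a simplified illustration
(hypothetical, using a plain modulo in place of the dskit ring lookup): hashing
"namespace/subIndexID" gives each (namespace, sub-index) pair its own token, so
shards of one large namespace can be owned by different instances instead of all
landing on the single owner of the namespace hash.

package main

import (
	"fmt"
	"hash/fnv"
)

func token(key string) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return h.Sum32()
}

func main() {
	const instances = 3 // stand-in for ring members; the real code asks the ring for the owner
	for sub := 0; sub < 6; sub++ {
		key := fmt.Sprintf("org-1/%d", sub)
		fmt.Printf("%-8s -> instance %d\n", key, token(key)%instances)
	}
}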
@@ -261,7 +286,7 @@ func (s *service) starting(ctx context.Context) error {
		return err
	}

	searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
	searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex, s.OwnsSubIndex)
	if err != nil {
		return err
	}