Files
grafana/pkg/storage/unified/search/bleve.go
T
Georges Chaudy 5e7d0392a3 Refactor authorization checks to utilize batch checks instead of compile across multiple services
- Integrated batch authorization checks using the authz package in List functions for IAM and SecureValue services, improving efficiency in permission validation.
- Updated List functions to handle pagination and authorization in a more streamlined manner, reducing redundant checks.
- Enhanced the server's List method to support batch authorization for resource listing, ensuring proper access control.
- Refactored related test cases to validate the new batch authorization logic and ensure comprehensive coverage of various scenarios.
2026-01-09 10:25:36 +01:00

2105 lines
58 KiB
Go

package search
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"iter"
"math"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/Masterminds/semver"
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
"github.com/blevesearch/bleve/v2/analysis/analyzer/standard"
"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/search/query"
index "github.com/blevesearch/bleve_index_api"
"github.com/prometheus/client_golang/prometheus"
bolterrors "go.etcd.io/bbolt/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/selection"
"github.com/grafana/grafana/pkg/services/dashboards/dashboardaccess"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
"github.com/grafana/authlib/authz"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
const (
	// Index storage kinds, used as metric label values and to decide eviction policy.
	indexStorageMemory = "memory"
	indexStorageFile   = "file"
	// boltTimeout bounds how long we wait for the bolt file lock when opening an
	// existing file-based index; a locked file means another process owns it.
	boltTimeout = "500ms"
)

// Keys used to store internal data in index.
const (
	internalRVKey        = "rv"         // Encoded as big-endian int64
	internalBuildInfoKey = "build_info" // Encoded as JSON of buildInfo struct
)

// tracer is the package-level OpenTelemetry tracer for search spans.
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/search")

// Compile-time interface conformance checks.
var _ resource.SearchBackend = &bleveBackend{}
var _ resource.ResourceIndex = &bleveIndex{}
// BleveOptions configures a bleveBackend: where file-based indexes live, when
// to prefer file storage over memory, and how cached indexes expire.
type BleveOptions struct {
	// The root folder where file objects are saved
	Root string

	// The resource count where values switch from memory to file based
	FileThreshold int64

	// Index cache TTL for bleve indices. 0 disables expiration for in-memory indexes.
	// Also used for file-based indexes, if they are not owned by this instance, and they are not fetched from the cache recently.
	IndexCacheTTL time.Duration

	// BuildVersion is stored in new indexes; must parse as semver if non-empty.
	BuildVersion string

	// Logger used by the backend; a default one is created if nil.
	Logger log.Logger

	// Minimum time between index updates.
	IndexMinUpdateInterval time.Duration

	// This function is called to check whether the index is owned by the current instance.
	// Indexes that are not owned by current instance are eligible for cleanup.
	// If nil, all indexes are owned by the current instance.
	OwnsIndex func(key resource.NamespacedResource) (bool, error)
}
// bleveBackend implements resource.SearchBackend on top of bleve, keeping one
// open index per NamespacedResource in an in-memory cache.
type bleveBackend struct {
	log  log.Logger
	opts BleveOptions

	// set from opts.OwnsIndex, always non-nil
	ownsIndexFn func(key resource.NamespacedResource) (bool, error)

	// cacheMx guards cache; indexes are closed outside the lock where possible.
	cacheMx sync.RWMutex
	cache   map[resource.NamespacedResource]*bleveIndex

	indexMetrics *resource.BleveIndexMetrics

	// Cancel/wait pair for the background eviction and metrics goroutines.
	bgTasksCancel func()
	bgTasksWg     sync.WaitGroup
}
// NewBleveBackend validates opts (root folder must exist and be a directory,
// BuildVersion must be valid semver if set), normalizes Root to an absolute
// path, and starts background goroutines for index eviction and, when metrics
// are provided, index-size reporting. Call Stop to shut the backend down.
//
// Fixes: constant error messages now use errors.New, and wrapped errors use
// the conventional ": %w" separator so messages read correctly when chained.
func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics) (*bleveBackend, error) {
	if opts.Root == "" {
		return nil, errors.New("bleve backend missing root folder configuration")
	}
	absRoot, err := filepath.Abs(opts.Root)
	if err != nil {
		return nil, fmt.Errorf("error getting absolute path for bleve root folder: %w", err)
	}
	opts.Root = absRoot

	rootInfo, err := os.Stat(opts.Root)
	if err != nil {
		return nil, fmt.Errorf("error opening bleve root folder: %w", err)
	}
	if !rootInfo.IsDir() {
		return nil, errors.New("bleve root is configured against a file (not folder)")
	}

	if opts.BuildVersion != "" {
		// Don't allow storing invalid versions to the index.
		if _, err := semver.NewVersion(opts.BuildVersion); err != nil {
			return nil, fmt.Errorf("cannot parse build version %s: %w", opts.BuildVersion, err)
		}
	}

	l := opts.Logger
	if l == nil {
		l = log.New("bleve-backend")
	}

	ownFn := opts.OwnsIndex
	if ownFn == nil {
		// By default all indexes are owned by this instance.
		ownFn = func(key resource.NamespacedResource) (bool, error) { return true, nil }
	}

	be := &bleveBackend{
		log:          l,
		cache:        map[resource.NamespacedResource]*bleveIndex{},
		opts:         opts,
		ownsIndexFn:  ownFn,
		indexMetrics: indexMetrics,
	}

	// Background tasks run until Stop() cancels this context and waits on bgTasksWg.
	ctx, cancel := context.WithCancel(context.Background())
	be.bgTasksCancel = cancel

	be.bgTasksWg.Add(1)
	go be.evictExpiredOrUnownedIndexesPeriodically(ctx)

	if be.indexMetrics != nil {
		be.bgTasksWg.Add(1)
		go be.updateIndexSizeMetric(ctx, opts.Root)
	}
	return be, nil
}
// GetIndex returns the cached index for key, or nil when none is cached.
func (b *bleveBackend) GetIndex(key resource.NamespacedResource) resource.ResourceIndex {
	cached := b.getCachedIndex(key, time.Now())
	if cached == nil {
		// Return an untyped nil: a (*bleveIndex)(nil) stored in the interface
		// would compare as non-nil for callers.
		return nil
	}
	return cached
}
// GetOpenIndexes returns the keys of all indexes currently held in the cache.
func (b *bleveBackend) GetOpenIndexes() []resource.NamespacedResource {
	b.cacheMx.RLock()
	defer b.cacheMx.RUnlock()

	keys := make([]resource.NamespacedResource, 0, len(b.cache))
	for k := range b.cache {
		keys = append(keys, k)
	}
	return keys
}
// getCachedIndex looks up the index for key and, when found, records "now" as
// the last-fetched timestamp (used by the eviction logic for unowned indexes).
func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource, now time.Time) *bleveIndex {
	// Only a read-lock is needed for the lookup itself.
	b.cacheMx.RLock()
	cached := b.cache[key]
	b.cacheMx.RUnlock()

	if cached == nil {
		return nil
	}
	cached.lastFetchedFromCache.Store(now.UnixMilli())
	return cached
}
// closeIndex stops the index updater, closes the underlying bleve index, and
// keeps the open-index gauge in sync. Errors are logged, not returned.
func (b *bleveBackend) closeIndex(idx *bleveIndex, key resource.NamespacedResource) {
	if closeErr := idx.stopUpdaterAndCloseIndex(); closeErr != nil {
		b.log.Error("failed to close index", "key", key, "err", closeErr)
	}
	if b.indexMetrics == nil {
		return
	}
	b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Dec()
}
// evictExpiredOrUnownedIndexesPeriodically runs until ctx is cancelled,
// evicting expired or un-owned indexes from the cache every 2 minutes.
//
// Fix: the ticker is now stopped on exit so its resources are released when
// the background task shuts down.
func (b *bleveBackend) evictExpiredOrUnownedIndexesPeriodically(ctx context.Context) {
	defer b.bgTasksWg.Done()

	t := time.NewTicker(2 * time.Minute)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			b.runEvictExpiredOrUnownedIndexes(time.Now())
		}
	}
}
// runEvictExpiredOrUnownedIndexes scans the cache once and evicts:
//   - indexes whose expiration has passed, and
//   - indexes not owned by this instance that have not been fetched from the
//     cache within IndexCacheTTL (only checked when the TTL is > 0).
//
// Candidates are collected under the lock, but indexes are closed after the
// lock is released, since closing can be slow (disk I/O).
func (b *bleveBackend) runEvictExpiredOrUnownedIndexes(now time.Time) {
	cacheTTLMillis := b.opts.IndexCacheTTL.Milliseconds()

	// Collect all expired or unowned into this map, and perform the actual closing without holding the lock.
	expired := map[resource.NamespacedResource]*bleveIndex{}
	unowned := map[resource.NamespacedResource]*bleveIndex{}
	ownCheckErrors := map[resource.NamespacedResource]error{}

	b.cacheMx.Lock()
	for key, idx := range b.cache {
		// Check if index has expired. (Zero expiration means "never expires".)
		if !idx.expiration.IsZero() && now.After(idx.expiration) {
			delete(b.cache, key)
			expired[key] = idx
			continue
		}

		// Check if index is owned by this instance.
		if cacheTTLMillis > 0 {
			owned, err := b.ownsIndexFn(key)
			if err != nil {
				ownCheckErrors[key] = err
			} else if !owned && now.UnixMilli()-idx.lastFetchedFromCache.Load() > cacheTTLMillis {
				// Unowned, and not recently used here: safe to evict.
				delete(b.cache, key)
				unowned[key] = idx
			}
		}
	}
	b.cacheMx.Unlock()

	// Log ownership-check failures; the affected indexes stay cached.
	for key, err := range ownCheckErrors {
		b.log.Warn("failed to check if index belongs to this instance", "key", key, "err", err)
	}

	for key, idx := range unowned {
		b.log.Info("index evicted from cache", "reason", "unowned", "key", key, "storage", idx.indexStorage)
		b.closeIndex(idx, key)
	}

	for key, idx := range expired {
		b.log.Info("index evicted from cache", "reason", "expired", "key", key, "storage", idx.indexStorage)
		b.closeIndex(idx, key)
	}
}
// updateIndexSizeMetric periodically (every 60s) walks indexPath, sums the
// size of all regular files, and publishes the total as the index-size metric.
// Runs until ctx is cancelled.
func (b *bleveBackend) updateIndexSizeMetric(ctx context.Context, indexPath string) {
	defer b.bgTasksWg.Done()

	for ctx.Err() == nil {
		var sizeBytes int64
		walkErr := filepath.WalkDir(indexPath, func(path string, entry os.DirEntry, err error) error {
			if err != nil {
				return err
			}
			// Abort the walk promptly on shutdown.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			if entry.IsDir() {
				return nil
			}
			fi, infoErr := entry.Info()
			if infoErr != nil {
				return infoErr
			}
			sizeBytes += fi.Size()
			return nil
		})

		if walkErr != nil {
			b.log.Error("got error while trying to calculate bleve file index size", "error", walkErr)
		} else {
			b.indexMetrics.IndexSize.Set(float64(sizeBytes))
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(60 * time.Second):
		}
	}
}
// newBleveIndex creates a bleve index with consistent configuration and stores
// build metadata (time + version) inside it under internalBuildInfoKey.
// An empty path creates an in-memory index; otherwise the index is file-based
// at the given path. On any failure after creation the index is closed.
func newBleveIndex(path string, mapper mapping.IndexMapping, buildTime time.Time, buildVersion string) (bleve.Index, error) {
	store := bleve.Config.DefaultKVStore
	if path == "" {
		// use in-memory kvstore
		store = bleve.Config.DefaultMemKVStore
	}
	ix, err := bleve.NewUsing(path, mapper, bleve.Config.DefaultIndexType, store, nil)
	if err != nil {
		return nil, err
	}

	info := buildInfo{
		BuildTime:    buildTime.Unix(),
		BuildVersion: buildVersion,
	}
	encoded, err := json.Marshal(info)
	if err == nil {
		err = ix.SetInternal([]byte(internalBuildInfoKey), encoded)
	}
	if err != nil {
		// Close the half-initialized index and report both errors.
		cErr := ix.Close()
		return nil, errors.Join(fmt.Errorf("failed to store index build info: %w", err), cErr)
	}
	return ix, nil
}
// buildInfo is the JSON payload stored under internalBuildInfoKey inside each
// index, recording when and with which Grafana version it was built.
type buildInfo struct {
	BuildTime    int64  `json:"build_time"`    // Unix seconds timestamp of time when the index was built
	BuildVersion string `json:"build_version"` // Grafana version used when building the index
}
// BuildIndex builds an index from scratch or retrieves it from the filesystem.
// If built successfully, the new index replaces the old index in the cache (if there was any).
// Existing index in the file system is reused, if it exists, and if size indicates that we should use file-based index, and rebuild is not true.
// The return value of "builder" should be the RV returned from List. This will be stored as the index RV
//
//nolint:gocyclo
func (b *bleveBackend) BuildIndex(
	ctx context.Context,
	key resource.NamespacedResource,
	size int64,
	fields resource.SearchableDocumentFields,
	indexBuildReason string,
	builder resource.BuildFn,
	updater resource.UpdateFn,
	rebuild bool,
) (resource.ResourceIndex, error) {
	_, span := tracer.Start(ctx, "search.bleveBackend.BuildIndex")
	defer span.End()
	span.SetAttributes(
		attribute.String("namespace", key.Namespace),
		attribute.String("group", key.Group),
		attribute.String("resource", key.Resource),
		attribute.Int64("size", size),
		attribute.String("reason", indexBuildReason),
	)

	mapper, err := GetBleveMappings(fields)
	if err != nil {
		return nil, err
	}

	// Prepare fields before opening/creating indexes, so that we don't need to deal with closing them in case of errors.
	standardSearchFields := resource.StandardSearchFields()
	allFields, err := getAllFields(standardSearchFields, fields)
	if err != nil {
		return nil, err
	}

	logWithDetails := b.log.FromContext(ctx).New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource, "size", size, "reason", indexBuildReason)

	// Close the newly created/opened index by default.
	closeIndex := true
	// This function is added via defer after new index has been created/opened, to make sure we close it properly when needed.
	// Whether index needs closing or not is controlled by closeIndex.
	closeIndexOnExit := func(index bleve.Index, indexDir string) {
		if !closeIndex {
			return
		}
		if closeErr := index.Close(); closeErr != nil {
			logWithDetails.Error("Failed to close index after index build failure", "err", closeErr)
		}
		if indexDir != "" {
			if removeErr := os.RemoveAll(indexDir); removeErr != nil {
				logWithDetails.Error("Failed to remove index directory after index build failure", "err", removeErr)
			}
		}
	}

	resourceDir := b.getResourceDir(key)

	var index bleve.Index
	var indexRV int64
	cachedIndex := b.getCachedIndex(key, time.Now())
	fileIndexName := "" // Name of the file-based index, or empty for in-memory indexes.
	newIndexType := indexStorageMemory
	build := true

	// Large resources get a file-based index; small ones stay in memory.
	if size >= b.opts.FileThreshold {
		newIndexType = indexStorageFile

		// We only check for the existing file-based index if we don't already have an open index for this key.
		// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
		// If we do have an unexpired cached index already, we always build a new index from scratch.
		if cachedIndex == nil && !rebuild {
			result := b.findPreviousFileBasedIndex(resourceDir)
			if result != nil && result.IsOpen {
				// Index file exists but is opened by another process, fallback to memory.
				// Keep the name so we can skip cleanup of that directory.
				newIndexType = indexStorageMemory
				fileIndexName = result.Name
			} else if result != nil && result.Index != nil {
				// Found and opened existing index successfully
				index = result.Index
				fileIndexName = result.Name
				indexRV = result.RV
			}
		}

		if newIndexType == indexStorageFile && index != nil {
			// Reusing the on-disk index: skip the (expensive) rebuild.
			build = false
			logWithDetails.Debug("Existing index found on filesystem", "indexRV", indexRV, "directory", filepath.Join(resourceDir, fileIndexName))
			defer closeIndexOnExit(index, "") // Close index, but don't delete directory.
		} else if newIndexType == indexStorageFile {
			// Building index from scratch. Index name has a time component in it to be unique, but if
			// we happen to create non-unique name, we bump the time and try again.
			indexDir := ""
			now := time.Now()
			for index == nil {
				fileIndexName = formatIndexName(now)
				indexDir = filepath.Join(resourceDir, fileIndexName)
				if !isPathWithinRoot(indexDir, b.opts.Root) {
					return nil, fmt.Errorf("invalid path %s", indexDir)
				}
				index, err = newBleveIndex(indexDir, mapper, time.Now(), b.opts.BuildVersion)
				if errors.Is(err, bleve.ErrorIndexPathExists) {
					now = now.Add(time.Second) // Bump time for next try
					index = nil                // Bleve actually returns non-nil value with ErrorIndexPathExists
					continue
				}
				if err != nil {
					return nil, fmt.Errorf("error creating new bleve index: %s %w", indexDir, err)
				}
			}
			logWithDetails.Info("Building index using filesystem", "directory", indexDir)
			defer closeIndexOnExit(index, indexDir) // Close index, and delete new index directory.
		}
	}

	if newIndexType == indexStorageMemory {
		index, err = newBleveIndex("", mapper, time.Now(), b.opts.BuildVersion)
		if err != nil {
			return nil, fmt.Errorf("error creating new in-memory bleve index: %w", err)
		}
		logWithDetails.Info("Building index using memory")
		defer closeIndexOnExit(index, "") // Close index, don't cleanup directory.
	}

	// Batch all the changes
	idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, updater, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))

	if build {
		if b.indexMetrics != nil {
			b.indexMetrics.IndexBuilds.WithLabelValues(indexBuildReason).Inc()
		}

		start := time.Now()
		// builder populates the index via BulkIndex and returns the RV of the listing it indexed.
		listRV, err := builder(idx)
		if err != nil {
			logWithDetails.Error("Failed to build index", "err", err)
			if b.indexMetrics != nil {
				b.indexMetrics.IndexBuildFailures.Inc()
			}
			return nil, fmt.Errorf("failed to build index: %w", err)
		}
		err = idx.updateResourceVersion(listRV)
		if err != nil {
			logWithDetails.Error("Failed to persist RV to index", "err", err, "rv", listRV)
			return nil, fmt.Errorf("failed to persist RV to index: %w", err)
		}
		elapsed := time.Since(start)
		logWithDetails.Info("Finished building index", "elapsed", elapsed, "listRV", listRV)
		if b.indexMetrics != nil {
			b.indexMetrics.IndexCreationTime.WithLabelValues().Observe(elapsed.Seconds())
		}
	} else {
		logWithDetails.Info("Skipping index build, using existing index")
		idx.resourceVersion = indexRV
		if b.indexMetrics != nil {
			b.indexMetrics.IndexBuildSkipped.Inc()
		}
	}

	// Set expiration after building the index. Only expire in-memory indexes.
	if fileIndexName == "" && b.opts.IndexCacheTTL > 0 {
		idx.expiration = time.Now().Add(b.opts.IndexCacheTTL)
	}

	// Store the index in the cache.
	if idx.expiration.IsZero() {
		logWithDetails.Info("Storing index in cache, with no expiration", "key", key)
	} else {
		logWithDetails.Info("Storing index in cache", "key", key, "expiration", idx.expiration)
	}

	// We're storing index in the cache, so we can't close it.
	closeIndex = false
	b.cacheMx.Lock()
	prev := b.cache[key]
	b.cache[key] = idx
	b.cacheMx.Unlock()

	// If there was a previous index in the cache, close it.
	if prev != nil {
		if b.indexMetrics != nil {
			b.indexMetrics.OpenIndexes.WithLabelValues(prev.indexStorage).Dec()
		}
		err := prev.stopUpdaterAndCloseIndex()
		if err != nil {
			logWithDetails.Error("failed to close previous index", "key", key, "err", err)
		}
	}

	if b.indexMetrics != nil {
		b.indexMetrics.OpenIndexes.WithLabelValues(idx.indexStorage).Inc()
	}

	// Clean up the old index directories. If we have built a new file-based index, the new name is ignored.
	// If we have created in-memory index and fileIndexName is empty, all old directories can be removed.
	//
	// We do the cleanup on the same goroutine as the index building. Using background goroutine could
	// cleanup new index directory that is being built by new call to BuildIndex.
	b.cleanOldIndexes(resourceDir, fileIndexName)

	return idx, nil
}
// getResourceDir returns the directory where file-based indexes for the given
// namespace/group/resource live: <root>/<namespace>/<resource>.<group>, with
// each segment sanitized so it cannot escape the root.
func (b *bleveBackend) getResourceDir(key resource.NamespacedResource) string {
	ns := cleanFileSegment(key.Namespace)
	res := cleanFileSegment(fmt.Sprintf("%s.%s", key.Resource, key.Group))
	return filepath.Join(b.opts.Root, ns, res)
}
// cleanFileSegment sanitizes a single path segment so it is safe to use as a
// directory name: OS path separators and ".." sequences are replaced with "_".
func cleanFileSegment(input string) string {
	replacer := strings.NewReplacer(
		string(filepath.Separator), "_",
		"..", "_",
	)
	return replacer.Replace(input)
}
// cleanOldIndexes deletes all subdirectories inside resourceDir, skipping the
// directory named skipName (which may be empty). Directories that resolve
// outside the configured root are never removed. Errors are logged only.
func (b *bleveBackend) cleanOldIndexes(resourceDir string, skipName string) {
	entries, err := os.ReadDir(resourceDir)
	if os.IsNotExist(err) {
		// Nothing to clean.
		return
	}
	if err != nil {
		b.log.Warn("error cleaning folders from", "directory", resourceDir, "error", err)
		return
	}

	for _, entry := range entries {
		if !entry.IsDir() || entry.Name() == skipName {
			continue
		}
		dir := filepath.Join(resourceDir, entry.Name())
		// Defense in depth: never delete anything outside the bleve root.
		if !isPathWithinRoot(dir, b.opts.Root) {
			b.log.Warn("Skipping cleanup of directory", "directory", dir)
			continue
		}
		if removeErr := os.RemoveAll(dir); removeErr != nil {
			b.log.Error("Unable to remove old index folder", "directory", dir, "error", removeErr)
		} else {
			b.log.Info("Removed old index folder", "directory", dir)
		}
	}
}
// isPathWithinRoot verifies that path is within given absoluteRoot.
// absoluteRoot must already be an absolute, cleaned path (as produced by
// filepath.Abs); path is made absolute before comparison.
//
// Fix: the previous check used a bare string-prefix comparison, so a sibling
// directory like "/data/root-evil" was accepted for root "/data/root". The
// path must now either equal the root or start with root + separator.
func isPathWithinRoot(path, absoluteRoot string) bool {
	if path == "" || absoluteRoot == "" {
		return false
	}
	abs, err := filepath.Abs(path)
	if err != nil {
		return false
	}
	if abs == absoluteRoot {
		return true
	}
	return strings.HasPrefix(abs, absoluteRoot+string(filepath.Separator))
}
// TotalDocs returns the total number of documents across all cached indices.
// Indexes whose DocCount fails are skipped.
func (b *bleveBackend) TotalDocs() int64 {
	// We iterate over keys and call getCachedIndex for each index individually,
	// instead of holding the lock for the whole loop: DocCount may be slow
	// (disk access).
	now := time.Now()

	var total int64
	for _, key := range b.GetOpenIndexes() {
		cached := b.getCachedIndex(key, now)
		if cached == nil {
			continue
		}
		count, err := cached.index.DocCount()
		if err != nil {
			continue
		}
		total += int64(count)
	}
	return total
}
func formatIndexName(now time.Time) string {
return now.Format("20060102-150405")
}
// fileIndex describes a file-based index found on disk by
// findPreviousFileBasedIndex. When IsOpen is true the index is locked by
// another process and Index is nil.
type fileIndex struct {
	Index  bleve.Index // Opened index, nil when IsOpen is true.
	Name   string      // Directory name of the index within the resource dir.
	RV     int64       // Resource version stored in the index (0 if absent).
	IsOpen bool        // True when the bolt file is locked by another process.
}
// findPreviousFileBasedIndex scans resourceDir for subdirectories containing a
// usable bleve index and returns the first one it can open, together with the
// RV stored inside it. If a directory's bolt file is locked by another
// process (open timeout), it returns a fileIndex with IsOpen=true and no
// Index. Returns nil when the directory cannot be read or no index opens.
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) *fileIndex {
	entries, err := os.ReadDir(resourceDir)
	if err != nil {
		return nil
	}

	for _, ent := range entries {
		if !ent.IsDir() {
			continue
		}

		indexName := ent.Name()
		indexDir := filepath.Join(resourceDir, indexName)
		// boltTimeout bounds how long we wait for the file lock; a timeout
		// means another process currently has this index open.
		idx, err := bleve.OpenUsing(indexDir, map[string]interface{}{"bolt_timeout": boltTimeout})
		if err != nil {
			if errors.Is(err, bolterrors.ErrTimeout) {
				b.log.Debug("Index is opened by another process (timeout), skipping", "indexDir", indexDir)
				return &fileIndex{Name: indexName, IsOpen: true}
			}
			// Unusable index directory; try the next candidate.
			b.log.Debug("error opening index", "indexDir", indexDir, "err", err)
			continue
		}

		indexRV, err := getRV(idx)
		if err != nil {
			// Index opened, but its stored RV is unreadable — close and skip it.
			b.log.Error("error getting rv from index", "indexDir", indexDir, "err", err)
			_ = idx.Close()
			continue
		}

		return &fileIndex{
			Index: idx,
			Name:  indexName,
			RV:    indexRV,
		}
	}

	return nil
}
// Stop closes all indexes and stops background tasks.
// Indexes are closed first, then the background goroutines (eviction, size
// metric) are cancelled and waited for, so Stop does not return while they run.
func (b *bleveBackend) Stop() {
	b.closeAllIndexes()
	b.bgTasksCancel()
	b.bgTasksWg.Wait()
}
// closeAllIndexes removes every index from the cache, closing each one and
// decrementing the open-index gauge. Close errors are logged, not returned.
func (b *bleveBackend) closeAllIndexes() {
	b.cacheMx.Lock()
	defer b.cacheMx.Unlock()

	for key, cached := range b.cache {
		delete(b.cache, key)
		if closeErr := cached.stopUpdaterAndCloseIndex(); closeErr != nil {
			b.log.Error("Failed to close index", "err", closeErr)
		}
		if b.indexMetrics != nil {
			b.indexMetrics.OpenIndexes.WithLabelValues(cached.indexStorage).Dec()
		}
	}
}
// updateRequest is a queued request for the index updater goroutine; the
// result of the update iteration is delivered on callback.
type updateRequest struct {
	requestTime time.Time // When the update was requested.

	callback chan updateResult // Receives the outcome of the update.
}

// updateResult is the outcome of one updater iteration: the new resource
// version on success, or the error that stopped the update.
type updateResult struct {
	rv  int64
	err error
}
// bleveIndex wraps a single bleve.Index for one NamespacedResource, adding
// resource-version tracking, background updating, expiration, and metrics.
type bleveIndex struct {
	key   resource.NamespacedResource
	index bleve.Index

	// RV returned by last List/ListModifiedSince operation. Updated when updating index.
	resourceVersion int64

	// Timestamp when the last update to the index was done (started).
	// Subsequent update requests only trigger new update if minUpdateInterval has elapsed.
	nextUpdateTime time.Time

	standard resource.SearchableDocumentFields
	fields   resource.SearchableDocumentFields

	indexStorage string // memory or file, used when updating metrics

	// When to expire and close the index. Zero value = no expiration.
	// We only expire in-memory indexes.
	expiration time.Time

	// The values returned with all
	allFields []*resourcepb.ResourceTableColumnDefinition

	logger log.Logger

	// Updater machinery: a single goroutine services updaterQueue; all fields
	// below are guarded by updaterMu unless noted otherwise.
	updaterFn         resource.UpdateFn
	minUpdateInterval time.Duration

	updaterMu       sync.Mutex
	updaterCond     *sync.Cond      // Used to signal the updater goroutine that there is work to do, or updater is no longer enabled and should stop. Also used by updater itself to stop early if there's no work to be done.
	updaterShutdown bool            // When set to true, index is getting closed and updater is no longer going to update index.
	updaterQueue    []updateRequest // Queue of requests for next updater iteration.

	updaterCancel context.CancelFunc // If not nil, the updater goroutine is running with context associated with this cancel function.
	updaterWg     sync.WaitGroup

	updateLatency    prometheus.Histogram
	updatedDocuments prometheus.Summary

	// Used to detect if the index can be safely closed, if it no longer belongs to this instance. UnixMilli.
	lastFetchedFromCache atomic.Int64
}
// newBleveIndex wraps a freshly built/opened bleve.Index into a *bleveIndex,
// wiring in field metadata, the updater function, and (if enabled) metrics.
func (b *bleveBackend) newBleveIndex(
	key resource.NamespacedResource,
	index bleve.Index,
	newIndexType string,
	fields resource.SearchableDocumentFields,
	allFields []*resourcepb.ResourceTableColumnDefinition,
	standardSearchFields resource.SearchableDocumentFields,
	updaterFn resource.UpdateFn,
	logger log.Logger,
) *bleveIndex {
	result := &bleveIndex{
		key:               key,
		index:             index,
		indexStorage:      newIndexType,
		fields:            fields,
		allFields:         allFields,
		standard:          standardSearchFields,
		logger:            logger,
		updaterFn:         updaterFn,
		minUpdateInterval: b.opts.IndexMinUpdateInterval,
	}
	// The condition variable shares the updater mutex so signal/wait are atomic
	// with respect to queue and shutdown-flag changes.
	result.updaterCond = sync.NewCond(&result.updaterMu)

	if m := b.indexMetrics; m != nil {
		result.updateLatency = m.UpdateLatency
		result.updatedDocuments = m.UpdatedDocuments
	}
	return result
}
// BulkIndex implements resource.ResourceIndex. All items in the request are
// applied to the index as a single bleve batch; an empty request is a no-op.
func (b *bleveIndex) BulkIndex(req *resource.BulkIndexRequest) error {
	if len(req.Items) == 0 {
		return nil
	}

	batch := b.index.NewBatch()
	for _, item := range req.Items {
		switch item.Action {
		case resource.ActionIndex:
			if item.Doc == nil {
				return fmt.Errorf("missing document")
			}
			// UpdateCopyFields refreshes derived/copy fields before indexing.
			doc := item.Doc.UpdateCopyFields()
			if indexErr := batch.Index(resource.SearchID(doc.Key), doc); indexErr != nil {
				return indexErr
			}
		case resource.ActionDelete:
			batch.Delete(resource.SearchID(item.Key))
		}
	}

	return b.index.Batch(batch)
}
// updateResourceVersion persists rv into the index and mirrors it into the
// in-memory field. A zero rv is ignored (nothing to persist).
func (b *bleveIndex) updateResourceVersion(rv int64) error {
	if rv == 0 {
		return nil
	}
	err := setRV(b.index, rv)
	if err != nil {
		return err
	}
	b.resourceVersion = rv
	return nil
}
// setRV stores rv in the index's internal storage under internalRVKey,
// encoded as a big-endian int64.
func setRV(index bleve.Index, rv int64) error {
	encoded := binary.BigEndian.AppendUint64(make([]byte, 0, 8), uint64(rv))
	return index.SetInternal([]byte(internalRVKey), encoded)
}
// getRV reads the resource version stored under internalRVKey via
// index.GetInternal. If the index is closed, the underlying
// bleve.ErrorIndexClosed error is returned. A missing or malformed value
// (fewer than 8 bytes) yields 0 with no error.
func getRV(index bleve.Index) (int64, error) {
	raw, err := index.GetInternal([]byte(internalRVKey))
	switch {
	case err != nil:
		return 0, err
	case len(raw) < 8:
		return 0, nil
	default:
		return int64(binary.BigEndian.Uint64(raw)), nil
	}
}
// getBuildInfo reads the JSON build metadata stored under
// internalBuildInfoKey. A missing value yields a zero buildInfo with no error.
func getBuildInfo(index bleve.Index) (buildInfo, error) {
	var info buildInfo
	raw, err := index.GetInternal([]byte(internalBuildInfoKey))
	if err != nil || len(raw) == 0 {
		return info, err
	}
	err = json.Unmarshal(raw, &info)
	return info, err
}
// BuildInfo returns the build time and version stored in the index.
// A zero stored time maps to the zero time.Time; a version that is missing or
// fails to parse as semver is reported as nil (parse errors are swallowed).
func (b *bleveIndex) BuildInfo() (resource.IndexBuildInfo, error) {
	info, err := getBuildInfo(b.index)
	if err != nil {
		return resource.IndexBuildInfo{}, err
	}

	var builtAt time.Time
	if info.BuildTime > 0 {
		builtAt = time.Unix(info.BuildTime, 0)
	}

	var version *semver.Version
	if info.BuildVersion != "" {
		if parsed, parseErr := semver.NewVersion(info.BuildVersion); parseErr == nil {
			version = parsed
		}
	}

	return resource.IndexBuildInfo{
		BuildTime:    builtAt,
		BuildVersion: version,
	}, nil
}
// ListManagedObjects lists all objects in this index that are managed by the
// given manager kind/id, sorted by source path. Pagination is not implemented
// (NextPageToken is rejected).
//
// Fix: the time spent constructing the bleve query was being recorded via
// stats.AddResultsConversionTime; it is request conversion, so it is now
// recorded with AddRequestConversionTime, consistent with Search().
func (b *bleveIndex) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest, stats *resource.SearchStats) (*resourcepb.ListManagedObjectsResponse, error) {
	if req.NextPageToken != "" {
		return nil, fmt.Errorf("next page not implemented yet")
	}
	if req.Kind == "" {
		return &resourcepb.ListManagedObjectsResponse{
			Error: resource.NewBadRequestError("empty manager kind"),
		}, nil
	}
	if req.Id == "" {
		return &resourcepb.ListManagedObjectsResponse{
			Error: resource.NewBadRequestError("empty manager id"),
		}, nil
	}

	start := time.Now()
	// Both manager kind and id must match.
	q := bleve.NewBooleanQuery()
	q.AddMust(&query.TermQuery{
		Term:     req.Kind,
		FieldVal: resource.SEARCH_FIELD_MANAGER_KIND,
	})
	q.AddMust(&query.TermQuery{
		Term:     req.Id,
		FieldVal: resource.SEARCH_FIELD_MANAGER_ID,
	})
	stats.AddRequestConversionTime(time.Since(start))

	found, err := b.index.SearchInContext(ctx, &bleve.SearchRequest{
		Query: q,
		Fields: []string{
			resource.SEARCH_FIELD_TITLE,
			resource.SEARCH_FIELD_FOLDER,
			resource.SEARCH_FIELD_MANAGER_KIND,
			resource.SEARCH_FIELD_MANAGER_ID,
			resource.SEARCH_FIELD_SOURCE_PATH,
			resource.SEARCH_FIELD_SOURCE_CHECKSUM,
			resource.SEARCH_FIELD_SOURCE_TIME,
		},
		Sort: search.SortOrder{
			&search.SortField{
				Field: resource.SEARCH_FIELD_SOURCE_PATH,
				Type:  search.SortFieldAsString,
				Desc:  false,
			},
		},
		Size: 1000000000, // big number
		From: 0,          // next page token not yet supported
	})
	if err != nil {
		return nil, err
	}
	stats.AddTotalHits(int(found.Total))
	stats.AddSearchTime(found.Took)
	stats.AddReturnedDocuments(len(found.Hits))

	// asString coerces a bleve hit field value to a string.
	asString := func(v any) string {
		if v == nil {
			return ""
		}
		str, ok := v.(string)
		if ok {
			return str
		}
		return fmt.Sprintf("%v", v)
	}

	// asTime coerces a bleve hit field value to Unix milliseconds; numeric
	// values are taken as-is, strings are parsed as RFC3339.
	asTime := func(v any) int64 {
		if v == nil {
			return 0
		}
		intV, ok := v.(int64)
		if ok {
			return intV
		}
		floatV, ok := v.(float64)
		if ok {
			return int64(floatV)
		}
		str, ok := v.(string)
		if ok {
			t, _ := time.Parse(time.RFC3339, str)
			return t.UnixMilli()
		}
		return 0
	}

	start = time.Now()
	rsp := &resourcepb.ListManagedObjectsResponse{}
	for _, hit := range found.Hits {
		item := &resourcepb.ListManagedObjectsResponse_Item{
			Object: &resourcepb.ResourceKey{},
			Hash:   asString(hit.Fields[resource.SEARCH_FIELD_SOURCE_CHECKSUM]),
			Path:   asString(hit.Fields[resource.SEARCH_FIELD_SOURCE_PATH]),
			Time:   asTime(hit.Fields[resource.SEARCH_FIELD_SOURCE_TIME]),
			Title:  asString(hit.Fields[resource.SEARCH_FIELD_TITLE]),
			Folder: asString(hit.Fields[resource.SEARCH_FIELD_FOLDER]),
		}
		err := resource.ReadSearchID(item.Object, hit.ID)
		if err != nil {
			return nil, err
		}
		rsp.Items = append(rsp.Items, item)
	}
	stats.AddResultsConversionTime(time.Since(start))

	return rsp, nil
}
// CountManagedObjects returns per-manager document counts for this index,
// computed from a facet over the "managed by" field. Facet terms have the
// form "<kind>:<id>"; terms without a kind part are skipped.
func (b *bleveIndex) CountManagedObjects(ctx context.Context, stats *resource.SearchStats) ([]*resourcepb.CountManagedObjectsResponse_ResourceCount, error) {
	found, err := b.index.SearchInContext(ctx, &bleve.SearchRequest{
		Query: bleve.NewMatchAllQuery(),
		Size:  0, // counts only, no documents
		Facets: bleve.FacetsRequest{
			"count": bleve.NewFacetRequest(resource.SEARCH_FIELD_MANAGED_BY, 1000), // typically less then 5
		},
	})
	if err != nil {
		return nil, err
	}
	stats.AddSearchTime(found.Took)
	stats.AddTotalHits(int(found.Total))
	stats.AddReturnedDocuments(len(found.Hits))

	counts := make([]*resourcepb.CountManagedObjectsResponse_ResourceCount, 0)
	facet, ok := found.Facets["count"]
	if !ok || facet.Terms == nil {
		return counts, nil
	}
	for _, term := range facet.Terms.Terms() {
		kind, id, found := strings.Cut(term.Term, ":")
		if !found || kind == "" {
			continue
		}
		counts = append(counts, &resourcepb.CountManagedObjectsResponse_ResourceCount{
			Kind:     kind,
			Id:       id,
			Group:    b.key.Group,
			Resource: b.key.Resource,
			Count:    int64(term.Count),
		})
	}
	return counts, nil
}
// Search implements resource.DocumentIndex.
// It validates the request key against this index, resolves federation into a
// single (possibly aliased) bleve index, converts the protobuf request into a
// bleve request (including access-control filtering), executes it, and
// converts hits and facets back into the protobuf response.
func (b *bleveIndex) Search(
	ctx context.Context,
	access authlib.AccessClient,
	req *resourcepb.ResourceSearchRequest,
	federate []resource.ResourceIndex, // For federated queries, these will match the values in req.federate
	stats *resource.SearchStats,
) (*resourcepb.ResourceSearchResponse, error) {
	ctx, span := tracer.Start(ctx, "search.bleveIndex.Search")
	defer span.End()

	if req.Options == nil || req.Options.Key == nil {
		return &resourcepb.ResourceSearchResponse{
			Error: resource.NewBadRequestError("missing query key"),
		}, nil
	}

	// Key mismatches are reported in the response, not as a Go error.
	response := &resourcepb.ResourceSearchResponse{
		Error: b.verifyKey(req.Options.Key),
	}
	if response.Error != nil {
		return response, nil
	}

	// Verifies the index federation
	index, err := b.getIndex(ctx, req, federate)
	if err != nil {
		return nil, err
	}

	conversionStarts := time.Now()
	// convert protobuf request to bleve request
	searchrequest, e := b.toBleveSearchRequest(ctx, req, access)
	if e != nil {
		response.Error = e
		return response, nil
	}

	// Show all fields when nothing is selected
	if len(searchrequest.Fields) < 1 && req.Limit > 0 {
		// NOTE(review): this reads fields from b.index (the primary index),
		// even when "index" is a federated alias — confirm that is intended.
		f, err := b.index.Fields()
		if err != nil {
			return nil, err
		}
		if len(f) > 0 {
			searchrequest.Fields = f
		} else {
			searchrequest.Fields = []string{
				resource.SEARCH_FIELD_TITLE,
				resource.SEARCH_FIELD_FOLDER,
				resource.SEARCH_FIELD_SOURCE_PATH,
				resource.SEARCH_FIELD_MANAGED_BY,
			}
		}
	}
	stats.AddRequestConversionTime(time.Since(conversionStarts))

	res, err := index.SearchInContext(ctx, searchrequest)
	if err != nil {
		return nil, err
	}

	response.TotalHits = int64(res.Total)
	response.QueryCost = float64(res.Cost)
	response.MaxScore = res.MaxScore

	stats.AddSearchTime(res.Took)
	stats.AddTotalHits(int(res.Total))
	stats.AddReturnedDocuments(len(res.Hits))

	resultsConversionStart := time.Now()
	response.Results, err = b.hitsToTable(ctx, searchrequest.Fields, res.Hits, req.Explain)
	if err != nil {
		return nil, err
	}

	// parse the facet fields
	for k, v := range res.Facets {
		f := newResponseFacet(v)
		if response.Facet == nil {
			response.Facet = make(map[string]*resourcepb.ResourceSearchResponse_Facet)
		}
		response.Facet[k] = f
	}
	stats.AddResultsConversionTime(time.Since(resultsConversionStart))

	return response, nil
}
// DocCount returns the number of documents in the index; when folder is
// non-empty, only documents in that folder are counted (via a term query with
// Size 0). Stats are recorded only for the folder-filtered path.
func (b *bleveIndex) DocCount(ctx context.Context, folder string, stats *resource.SearchStats) (int64, error) {
	ctx, span := tracer.Start(ctx, "search.bleveIndex.DocCount")
	defer span.End()

	// No folder filter: bleve can answer directly from index metadata.
	if folder == "" {
		count, err := b.index.DocCount()
		return int64(count), err
	}

	searchReq := &bleve.SearchRequest{
		Size:   0, // we just need the count
		Fields: []string{},
		Query: &query.TermQuery{
			Term:     folder,
			FieldVal: resource.SEARCH_FIELD_FOLDER,
		},
	}
	rsp, err := b.index.SearchInContext(ctx, searchReq)
	if rsp == nil {
		return 0, err
	}
	if stats != nil {
		stats.AddTotalHits(int(rsp.Total))
		stats.AddSearchTime(rsp.Took)
	}
	return int64(rsp.Total), err
}
// verifyKey makes sure the request key matches this index's key; the first
// mismatching component (namespace, group, resource) produces a bad-request
// error result, nil means the key matches.
func (b *bleveIndex) verifyKey(key *resourcepb.ResourceKey) *resourcepb.ErrorResult {
	switch {
	case key.Namespace != b.key.Namespace:
		return resource.NewBadRequestError("namespace mismatch (expected " + b.key.Namespace + ")")
	case key.Group != b.key.Group:
		return resource.NewBadRequestError("group mismatch (expected " + b.key.Group + ")")
	case key.Resource != b.key.Resource:
		return resource.NewBadRequestError("resource mismatch (expected " + b.key.Resource + ")")
	default:
		return nil
	}
}
// getIndex returns the bleve index to run the search against. When the
// request is federated, it builds an IndexAlias spanning this index plus each
// federated index (https://blevesearch.com/docs/IndexAlias/), after verifying
// that every federated index matches the key it was requested for.
func (b *bleveIndex) getIndex(
	ctx context.Context,
	req *resourcepb.ResourceSearchRequest,
	federate []resource.ResourceIndex,
) (bleve.Index, error) {
	_, span := tracer.Start(ctx, "search.bleveIndex.getIndex")
	defer span.End()
	if len(req.Federated) != len(federate) {
		return nil, fmt.Errorf("federation is misconfigured")
	}
	if len(federate) == 0 {
		// Nothing to federate: search this index directly.
		return b.index, nil
	}
	// Collect this index plus each federated index into one alias.
	indexes := make([]bleve.Index, 0, len(federate)+1)
	indexes = append(indexes, b.index)
	for pos, fed := range federate {
		other, ok := fed.(*bleveIndex)
		if !ok {
			return nil, fmt.Errorf("federated indexes must be the same type")
		}
		if other.verifyKey(req.Federated[pos]) != nil {
			return nil, fmt.Errorf("federated index keys do not match (%v != %v)", other, req.Federated[pos])
		}
		indexes = append(indexes, other.index)
	}
	return bleve.NewIndexAlias(indexes...), nil
}
// toBleveSearchRequest converts the protobuf search request into a bleve
// search request: selected fields, paging, facets, label/field filter
// requirements, the free-text query, permission scoping (when access is
// non-nil) and sorting.
func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.ResourceSearchRequest, access authlib.AccessClient) (*bleve.SearchRequest, *resourcepb.ErrorResult) {
	ctx, span := tracer.Start(ctx, "search.bleveIndex.toBleveSearchRequest")
	defer span.End()
	// NOTE(review): facets are populated here and again from req.Facet near
	// the bottom of this function; the second pass rewrites the same keys.
	facets := bleve.FacetsRequest{}
	for _, f := range req.Facet {
		facets[f.Field] = bleve.NewFacetRequest(f.Field, int(f.Limit))
	}
	// Convert resource-specific fields to bleve fields.
	// TODO: use b.fields.Field(f) instead of builders.DashboardFields() to avoid dashboard-specific code in search server.
	fields := make([]string, 0, len(req.Fields))
	for _, f := range req.Fields {
		if slices.Contains(builders.DashboardFields(), f) {
			f = resource.SEARCH_FIELD_PREFIX + f
		}
		fields = append(fields, f)
	}
	// Reject limits/offsets outside the int32 range (safe on 32-bit builds).
	size, err := safeInt64ToInt(req.Limit)
	if err != nil {
		return nil, resource.AsErrorResult(err)
	}
	offset, err := safeInt64ToInt(req.Offset)
	if err != nil {
		return nil, resource.AsErrorResult(err)
	}
	searchrequest := &bleve.SearchRequest{
		Fields: fields,
		Size: size,
		From: offset,
		Explain: req.Explain,
		Facets: facets,
	}
	// Currently everything is within an AND query
	queries := []query.Query{}
	// Label selector requirements (matched against "labels.<key>").
	if len(req.Options.Labels) > 0 {
		for _, v := range req.Options.Labels {
			q, err := requirementQuery(v, "labels.")
			if err != nil {
				return nil, err
			}
			queries = append(queries, q)
		}
	}
	// filters
	if len(req.Options.Fields) > 0 {
		for _, v := range req.Options.Fields {
			q, err := requirementQuery(v, "")
			if err != nil {
				return nil, err
			}
			queries = append(queries, q)
		}
	}
	// A query containing "*" (and more than one character) is treated as a
	// wildcard query instead of the text query below.
	if len(req.Query) > 1 && strings.Contains(req.Query, "*") {
		// wildcard query is expensive - should be used with caution
		wildcard := bleve.NewWildcardQuery(req.Query)
		queries = append(queries, wildcard)
	}
	if req.Query != "" && !strings.Contains(req.Query, "*") {
		// Add a text query
		searchrequest.Fields = append(searchrequest.Fields, resource.SEARCH_FIELD_SCORE)
		// There are multiple ways to match the query string to documents. The following queries are ordered by priority:
		// Query 1: Match the exact query string
		queryExact := bleve.NewMatchQuery(req.Query)
		queryExact.SetBoost(10.0)
		queryExact.SetField(resource.SEARCH_FIELD_TITLE)
		queryExact.Analyzer = keyword.Name // don't analyze the query input - treat it as a single token
		queryExact.Operator = query.MatchQueryOperatorAnd // This doesn't make a difference for keyword analyzer, we add it just to be explicit.
		// Query 2: Phrase query with standard analyzer
		queryPhrase := bleve.NewMatchPhraseQuery(req.Query)
		queryPhrase.SetBoost(5.0)
		queryPhrase.SetField(resource.SEARCH_FIELD_TITLE)
		queryPhrase.Analyzer = standard.Name
		// Query 3: Match query with standard analyzer
		queryAnalyzed := bleve.NewMatchQuery(removeSmallTerms(req.Query))
		queryAnalyzed.SetField(resource.SEARCH_FIELD_TITLE)
		queryAnalyzed.Analyzer = standard.Name
		queryAnalyzed.Operator = query.MatchQueryOperatorAnd // Make sure all terms from the query are matched
		// At least one of the queries must match
		searchQuery := bleve.NewDisjunctionQuery(queryExact, queryAnalyzed, queryPhrase)
		queries = append(queries, searchQuery)
	}
	// Combine all collected queries with AND semantics.
	switch len(queries) {
	case 0:
		searchrequest.Query = bleve.NewMatchAllQuery()
	case 1:
		searchrequest.Query = queries[0]
	default:
		searchrequest.Query = bleve.NewConjunctionQuery(queries...) // AND
	}
	// Wrap the query so results are filtered through the access client.
	if access != nil {
		verb := utils.VerbList
		if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
			verb = utils.VerbPatch
		}
		// Build resource -> verb mapping for batch authorization
		resources := map[string]string{
			b.key.Resource: verb,
		}
		// Handle federation
		for _, federated := range req.Federated {
			resources[federated.Resource] = utils.VerbList
		}
		searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, permissionScopedQueryConfig{
			access: access,
			namespace: b.key.Namespace,
			group: b.key.Group,
			resources: resources,
		})
	}
	// NOTE(review): this duplicates the facet construction at the top of the
	// function (same keys, same values).
	for k, v := range req.Facet {
		if searchrequest.Facets == nil {
			searchrequest.Facets = make(bleve.FacetsRequest)
		}
		searchrequest.Facets[k] = bleve.NewFacetRequest(v.Field, int(v.Limit))
	}
	// Add the sort fields
	sorting := getSortFields(req)
	searchrequest.SortBy(sorting)
	// When no sort fields are provided, sort by score if there is a query, otherwise sort by title
	if len(sorting) == 0 {
		if req.Query != "" && req.Query != "*" {
			searchrequest.Sort = append(searchrequest.Sort, &search.SortScore{
				Desc: true,
			})
		} else {
			searchrequest.Sort = append(searchrequest.Sort, &search.SortField{
				Field: resource.SEARCH_FIELD_TITLE_PHRASE,
				Desc: false,
			})
		}
	}
	return searchrequest, nil
}
// removeSmallTerms drops whitespace-separated words shorter than
// EDGE_NGRAM_MIN_TOKEN from query. If every word is too short, the original
// query is returned unchanged.
func removeSmallTerms(query string) string {
	var kept []string
	for _, w := range strings.Fields(query) {
		if len(w) < EDGE_NGRAM_MIN_TOKEN {
			continue
		}
		kept = append(kept, w)
	}
	if len(kept) == 0 {
		return query
	}
	return strings.Join(kept, " ")
}
// stopUpdaterAndCloseIndex stops the background updater goroutine (if
// running), waits for it to exit, and only then closes the underlying bleve
// index so the updater can never touch a closed index.
func (b *bleveIndex) stopUpdaterAndCloseIndex() error {
	// Signal updater to stop. We do this by 1) setting updaterShuttingDown + sending signal, and by 2) calling cancel.
	b.updaterMu.Lock()
	b.updaterShutdown = true
	b.updaterCond.Broadcast()
	// if updater is running, cancel it. (Setting to nil is only done from updater itself in defer.)
	if b.updaterCancel != nil {
		b.updaterCancel()
	}
	b.updaterMu.Unlock()
	// Wait for the updater goroutine to finish before closing the index.
	b.updaterWg.Wait()
	// Close index only after updater is not working on it anymore.
	return b.index.Close()
}
// UpdateIndex asks the background updater to bring the index up to date and
// blocks until that update finishes or ctx is done, returning the resource
// version the index reached. Requests are queued and batched, so concurrent
// callers may be served by a single update run. It is a no-op (returns 0)
// when no updater function is configured (typically in tests).
func (b *bleveIndex) UpdateIndex(ctx context.Context) (int64, error) {
	// We don't have to do anything if the index cannot be updated (typically in tests).
	if b.updaterFn == nil {
		return 0, nil
	}
	// Use chan with buffer size 1 to ensure that we can always send the result back, even if there's no reader anymore.
	req := updateRequest{requestTime: time.Now(), callback: make(chan updateResult, 1)}
	// Make sure that the updater goroutine is running.
	b.updaterMu.Lock()
	if b.updaterShutdown {
		b.updaterMu.Unlock()
		return 0, fmt.Errorf("cannot update index: %w", bleve.ErrorIndexClosed)
	}
	b.updaterQueue = append(b.updaterQueue, req)
	// If updater is not running, start it.
	if b.updaterCancel == nil {
		b.startUpdater()
	}
	b.updaterCond.Broadcast() // If updater is waiting for next batch, wake it up.
	b.updaterMu.Unlock()
	// wait for the update to finish
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case ur := <-req.callback:
		return ur.rv, ur.err
	}
}
// startUpdater launches the background updater goroutine and records its
// cancel function in b.updaterCancel. Must be called with b.updaterMu lock held.
func (b *bleveIndex) startUpdater() {
	c, cancel := context.WithCancel(context.Background())
	b.updaterCancel = cancel
	b.updaterWg.Add(1)
	go func() {
		defer func() {
			cancel() // Make sure to call this to release resources.
			// Clear updaterCancel so a later UpdateIndex restarts the goroutine.
			b.updaterMu.Lock()
			b.updaterCancel = nil
			b.updaterMu.Unlock()
			b.updaterWg.Done()
		}()
		b.runUpdater(c)
	}()
}
// maxWait is how long the updater goroutine idles waiting for new requests
// before exiting.
const maxWait = 5 * time.Second

// runUpdater is the updater goroutine's main loop: it gathers queued update
// requests into batches, answers requests already covered by a recent update
// (throttled by minUpdateInterval), runs the actual index update once per
// batch, and delivers the shared result to every request. It exits after
// maxWait without requests, on shutdown, or when ctx is canceled.
func (b *bleveIndex) runUpdater(ctx context.Context) {
	for {
		start := time.Now()
		// Wake the condition variable after maxWait so the wait loop below times out.
		t := time.AfterFunc(maxWait, b.updaterCond.Broadcast)
		b.updaterMu.Lock()
		for !b.updaterShutdown && ctx.Err() == nil && len(b.updaterQueue) == 0 && time.Since(start) < maxWait {
			// Cond is signaled when updaterShutdown changes, updaterQueue gets new element or when timeout occurs.
			b.updaterCond.Wait()
		}
		shutdown := b.updaterShutdown
		batch := b.updaterQueue
		b.updaterQueue = nil // empty the queue for the next batch
		b.updaterMu.Unlock()
		t.Stop()
		// Nothing to index after maxWait, exit the goroutine.
		if len(batch) == 0 {
			return
		}
		if shutdown {
			// Fail all pending requests: the index is being closed.
			for _, req := range batch {
				req.callback <- updateResult{err: fmt.Errorf("cannot update index: %w", bleve.ErrorIndexClosed)}
			}
			return
		}
		// Check if requests arrived before minUpdateInterval since the last update has elapsed, and remove such requests.
		for ix := 0; ix < len(batch); {
			req := batch[ix]
			if req.requestTime.Before(b.nextUpdateTime) {
				// A recent-enough update already covers this request.
				req.callback <- updateResult{rv: b.resourceVersion}
				batch = append(batch[:ix], batch[ix+1:]...)
			} else {
				// Keep in the batch
				ix++
			}
		}
		// If all requests are now handled, don't perform update.
		if len(batch) == 0 {
			continue
		}
		// Bump next update time
		b.nextUpdateTime = time.Now().Add(b.minUpdateInterval)
		var rv int64
		var err = ctx.Err()
		if err == nil {
			rv, err = b.updateIndexWithLatestModifications(ctx, len(batch))
		}
		// Deliver the shared result to every remaining request in the batch.
		for _, req := range batch {
			req.callback <- updateResult{rv: rv, err: err}
		}
	}
}
// updateIndexWithLatestModifications runs the configured updaterFn to apply
// all modifications since the currently indexed resource version, records the
// new resource version on success, and observes latency/document-count
// metrics when configured. requests is only used for logging how many
// callers this run serves. It returns the resource version reported by the
// updater function.
func (b *bleveIndex) updateIndexWithLatestModifications(ctx context.Context, requests int) (int64, error) {
	ctx, span := tracer.Start(ctx, "search.bleveIndex.updateIndexWithLatestModifications")
	defer span.End()
	sinceRV := b.resourceVersion
	b.logger.Debug("Updating index", "sinceRV", sinceRV, "requests", requests)
	startTime := time.Now()
	listRV, docs, err := b.updaterFn(ctx, b, sinceRV)
	// Only persist the new resource version when it actually changed.
	if err == nil && listRV > 0 && listRV != sinceRV {
		err = b.updateResourceVersion(listRV) // updates b.resourceVersion
	}
	elapsed := time.Since(startTime)
	if err == nil {
		b.logger.Debug("Finished updating index", "sinceRV", sinceRV, "listRV", listRV, "duration", elapsed, "docs", docs)
		if b.updateLatency != nil {
			b.updateLatency.Observe(elapsed.Seconds())
		}
		if b.updatedDocuments != nil {
			b.updatedDocuments.Observe(float64(docs))
		}
	} else {
		b.logger.Error("Updating of index finished with error", "duration", elapsed, "err", err)
	}
	return listRV, err
}
// safeInt64ToInt converts i64 to int, rejecting values outside the int32
// range so the conversion stays safe even on 32-bit platforms.
func safeInt64ToInt(i64 int64) (int, error) {
	if i64 >= math.MinInt32 && i64 <= math.MaxInt32 {
		return int(i64), nil
	}
	return 0, fmt.Errorf("int64 value %d overflows int", i64)
}
// getSortFields converts req.SortBy into bleve sort descriptors: title sorts
// are redirected to the full-phrase field, dashboard-specific fields get the
// resource prefix, and descending sorts are marked with a leading "-".
func getSortFields(req *resourcepb.ResourceSearchRequest) []string {
	result := make([]string, 0, len(req.SortBy))
	for _, s := range req.SortBy {
		name := s.Field
		if mapped, found := textSortFields[name]; found {
			name = mapped
		}
		// TODO: pass fields parameter and use fields.Field(input) instead of builders.DashboardFields() to avoid dashboard-specific code.
		if slices.Contains(builders.DashboardFields(), name) {
			name = resource.SEARCH_FIELD_PREFIX + name
		}
		if s.Desc {
			name = "-" + name
		}
		result = append(result, name)
	}
	return result
}
// fields that we want to sort by the full text
var textSortFields = map[string]string{
	resource.SEARCH_FIELD_TITLE: resource.SEARCH_FIELD_TITLE_PHRASE,
}
// lowerCase is the suffix identifying "phrase" fields, whose values are
// indexed lower-cased (see filterValue).
const lowerCase = "phrase"
// termFields fields to use termQuery for filtering
var termFields = []string{
	resource.SEARCH_FIELD_TITLE,
}
// requirementQuery converts a selector requirement into a bleve query.
// prefix is prepended to the field name (e.g. "labels."). Only the Equals,
// DoubleEquals, In and NotIn operators are supported; any other operator
// yields a bad-request error.
func requirementQuery(req *resourcepb.Requirement, prefix string) (query.Query, *resourcepb.ErrorResult) {
	switch selection.Operator(req.Operator) {
	case selection.Equals, selection.DoubleEquals:
		if len(req.Values) == 0 {
			return query.NewMatchAllQuery(), nil
		}
		// FIXME: special case for login and email to use term query only because those fields are using keyword analyzer
		// This should be fixed by using the info from the schema
		if (req.Key == "login" || req.Key == "email") && len(req.Values) == 1 {
			tq := bleve.NewTermQuery(req.Values[0])
			tq.SetField(prefix + req.Key)
			return tq, nil
		}
		if len(req.Values) == 1 {
			return newQuery(req.Key, filterValue(req.Key, req.Values[0]), prefix), nil
		}
		// Multiple values must all match (AND).
		all := make([]query.Query, 0, len(req.Values))
		for _, v := range req.Values {
			all = append(all, newQuery(req.Key, filterValue(req.Key, v), prefix))
		}
		return query.NewConjunctionQuery(all), nil
	case selection.In:
		if len(req.Values) == 0 {
			return query.NewMatchAllQuery(), nil
		}
		if len(req.Values) == 1 {
			return newQuery(req.Key, filterValue(req.Key, req.Values[0]), prefix), nil
		}
		// Any of the values may match (OR).
		anyOf := make([]query.Query, 0, len(req.Values))
		for _, v := range req.Values {
			anyOf = append(anyOf, newQuery(req.Key, filterValue(req.Key, v), prefix))
		}
		return query.NewDisjunctionQuery(anyOf), nil
	case selection.NotIn:
		// Exclude every listed value, but still require the document to match
		// something so pure negation does not select the whole index.
		excluded := bleve.NewBooleanQuery()
		for _, v := range req.Values {
			excluded.AddMustNot(newQuery(req.Key, filterValue(req.Key, v), prefix))
		}
		// must still have a value
		excluded.AddMust(bleve.NewMatchAllQuery())
		return excluded, nil
	}
	// NotEquals, DoesNotExist, GreaterThan, LessThan, Exists and any unknown
	// operator are not supported.
	return nil, resource.NewBadRequestError(
		fmt.Sprintf("unsupported query operation (%s %s %v)", req.Key, req.Operator, req.Values),
	)
}
// newQuery builds a query for a key/value filter. A lone "*" matches all
// documents, any other value containing "*" becomes an (expensive) wildcard
// query, term fields whose value splits into tokens use term matching, and
// everything else is a plain match query on the prefixed field.
func newQuery(key string, value string, prefix string) query.Query {
	switch {
	case value == "*":
		return bleve.NewMatchAllQuery()
	case strings.Contains(value, "*"):
		// wildcard query is expensive - should be used with caution
		return bleve.NewWildcardQuery(value)
	}
	if delim, tokenized := hasTerms(value); tokenized && slices.Contains(termFields, key) {
		return newTermsQuery(key, value, delim, prefix)
	}
	mq := bleve.NewMatchQuery(value)
	mq.SetField(prefix + key)
	return mq
}
// newTermsQuery matches either the whole value as a single term, or all of
// its individual tokens (split on delimiter) — whichever succeeds.
func newTermsQuery(key string, value string, delimiter string, prefix string) query.Query {
	pieces := strings.Split(value, delimiter)
	// won't match with ending space
	exact := bleve.NewTermQuery(strings.TrimSuffix(value, " "))
	exact.SetField(prefix + key)
	return bleve.NewDisjunctionQuery(exact, newMatchAllTokensQuery(pieces, key, prefix))
}
// newMatchAllTokensQuery requires every token to match the prefixed field,
// using a term query for tokens that still contain delimiter characters and
// a match query otherwise.
func newMatchAllTokensQuery(tokens []string, key string, prefix string) query.Query {
	field := prefix + key
	all := bleve.NewConjunctionQuery()
	for _, tok := range tokens {
		if _, nested := hasTerms(tok); nested {
			tq := bleve.NewTermQuery(tok)
			tq.SetField(field)
			all.AddQuery(tq)
		} else {
			mq := bleve.NewMatchQuery(tok)
			mq.SetField(field)
			all.AddQuery(mq)
		}
	}
	return all
}
// filterValue lower-cases v when the field is a phrase field (phrase fields
// are indexed lower-cased); other values pass through unchanged.
func filterValue(field string, v string) string {
	if !strings.HasSuffix(field, lowerCase) {
		return v
	}
	return strings.ToLower(v)
}
// hitsToTable converts bleve search hits into a protobuf ResourceTable with
// one column per selected field (or all known columns for "_all") and one
// row per hit. When explain is set, an extra column carries the bleve score
// explanation. Unknown fields are skipped, except "labels.*" fields which
// are rendered as strings.
func (b *bleveIndex) hitsToTable(ctx context.Context, selectFields []string, hits search.DocumentMatchCollection, explain bool) (*resourcepb.ResourceTable, error) {
	_, span := tracer.Start(ctx, "search.bleveIndex.hitsToTable")
	defer span.End()
	// Resolve column definitions for the selected fields.
	fields := []*resourcepb.ResourceTableColumnDefinition{}
	for _, name := range selectFields {
		if name == "_all" {
			fields = b.allFields
			break
		}
		f := b.standard.Field(name)
		if f == nil && b.fields != nil {
			f = b.fields.Field(name)
		}
		if f == nil {
			// Labels as a string
			if strings.HasPrefix(name, "labels.") {
				f = &resourcepb.ResourceTableColumnDefinition{
					Name: name,
					Type: resourcepb.ResourceTableColumnDefinition_STRING,
				}
			}
			// return nil, fmt.Errorf("unknown response field: " + name)
			if f == nil {
				continue // OK for now
			}
		}
		fields = append(fields, f)
	}
	if explain {
		fields = append(fields, b.standard.Field(resource.SEARCH_FIELD_EXPLAIN))
	}
	builder, err := resource.NewTableBuilder(fields)
	if err != nil {
		return nil, err
	}
	encoders := builder.Encoders()
	table := &resourcepb.ResourceTable{
		Columns: fields,
		Rows: make([]*resourcepb.ResourceTableRow, hits.Len()),
	}
	for rowID, match := range hits {
		row := &resourcepb.ResourceTableRow{
			Key: &resourcepb.ResourceKey{},
			Cells: make([][]byte, len(fields)),
		}
		table.Rows[rowID] = row
		err := resource.ReadSearchID(row.Key, match.ID)
		if err != nil {
			return nil, err
		}
		for i, f := range fields {
			// v holds the raw value for the cell so the error below can
			// report it. Fixed: the case bodies previously shadowed v with
			// ":=", so the error message always printed <nil>.
			var v any
			switch f.Name {
			case resource.SEARCH_FIELD_ID:
				row.Cells[i] = []byte(match.ID)
			case resource.SEARCH_FIELD_SCORE:
				row.Cells[i], err = encoders[i](match.Score)
			case resource.SEARCH_FIELD_EXPLAIN:
				if match.Expl != nil {
					row.Cells[i], err = json.Marshal(match.Expl)
				}
			case resource.SEARCH_FIELD_LEGACY_ID:
				// The legacy ID is stored as a label; parse it into an int64.
				v = match.Fields[resource.SEARCH_FIELD_LABELS+"."+resource.SEARCH_FIELD_LEGACY_ID]
				if v != nil {
					str, ok := v.(string)
					if ok {
						id, _ := strconv.ParseInt(str, 10, 64)
						row.Cells[i], err = encoders[i](id)
					}
				}
			default:
				fieldName := f.Name
				// since the bleve index fields mix common and resource-specific fields, it is possible a conflict can happen
				// if a specific field is named the same as a common field
				v = match.Fields[fieldName]
				// fields that are specific to the resource get stored as fields.<fieldName>, so we need to check for that
				if v == nil {
					v = match.Fields[resource.SEARCH_FIELD_PREFIX+fieldName]
				}
				if v != nil {
					// Encode the value to protobuf
					row.Cells[i], err = encoders[i](v)
				}
			}
			if err != nil {
				return nil, fmt.Errorf("error encoding (row:%d/col:%d) %v %w", rowID, i, v, err)
			}
		}
	}
	return table, nil
}
// getAllFields builds the column definitions returned when a search selects
// "_all": the standard columns plus any high-priority (Priority <= 10)
// custom columns. It returns an error when any field definition is missing.
func getAllFields(standard resource.SearchableDocumentFields, custom resource.SearchableDocumentFields) ([]*resourcepb.ResourceTableColumnDefinition, error) {
	fields := []*resourcepb.ResourceTableColumnDefinition{
		standard.Field(resource.SEARCH_FIELD_ID),
		standard.Field(resource.SEARCH_FIELD_TITLE),
		standard.Field(resource.SEARCH_FIELD_TAGS),
		standard.Field(resource.SEARCH_FIELD_FOLDER),
		standard.Field(resource.SEARCH_FIELD_RV),
		standard.Field(resource.SEARCH_FIELD_CREATED),
		standard.Field(resource.SEARCH_FIELD_LEGACY_ID),
		standard.Field(resource.SEARCH_FIELD_MANAGER_KIND),
	}
	if custom != nil {
		for _, name := range custom.Fields() {
			f := custom.Field(name)
			// Guard against a nil definition before reading Priority;
			// previously this would panic with a nil dereference.
			if f == nil {
				return nil, fmt.Errorf("invalid all field")
			}
			if f.Priority > 10 {
				continue // skip low-priority columns
			}
			fields = append(fields, f)
		}
	}
	// Any nil standard field means the schema is missing a required column.
	for _, field := range fields {
		if field == nil {
			return nil, fmt.Errorf("invalid all field")
		}
	}
	return fields, nil
}
// newResponseFacet converts a bleve facet result into its protobuf
// representation, including per-term counts when term facets are present.
func newResponseFacet(v *search.FacetResult) *resourcepb.ResourceSearchResponse_Facet {
	out := &resourcepb.ResourceSearchResponse_Facet{
		Field:   v.Field,
		Total:   int64(v.Total),
		Missing: int64(v.Missing),
	}
	if v.Terms == nil {
		return out
	}
	for _, term := range v.Terms.Terms() {
		out.Terms = append(out.Terms, &resourcepb.ResourceSearchResponse_TermFacet{
			Term:  term.Term,
			Count: int64(term.Count),
		})
	}
	return out
}
// permissionScopedQuery wraps a bleve query so that its Searcher filters
// results through the access client before they are returned to the caller.
type permissionScopedQuery struct {
	query.Query
	access authlib.AccessClient
	namespace string
	group string
	resources map[string]string // resource -> verb mapping
	log log.Logger
}
// permissionScopedQueryConfig carries the dependencies needed to construct a
// permissionScopedQuery.
type permissionScopedQueryConfig struct {
	access authlib.AccessClient
	namespace string
	group string
	resources map[string]string // resource -> verb mapping
}
// newPermissionScopedQuery wraps q so that search results are filtered by the
// configured access client, scoped to the given namespace, group and
// resource->verb mapping.
func newPermissionScopedQuery(q query.Query, cfg permissionScopedQueryConfig) *permissionScopedQuery {
	scoped := &permissionScopedQuery{
		Query:     q,
		log:       log.New("search_permissions"),
		access:    cfg.access,
		namespace: cfg.namespace,
		group:     cfg.group,
		resources: cfg.resources,
	}
	return scoped
}
// Searcher builds the wrapped query's searcher and wraps it in a batch
// authorization searcher that filters out documents the caller may not see.
// A doc-value reader for the "folder" field is prepared up front because
// folder membership is read for every authorization check.
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
	logger := q.log.FromContext(ctx)
	searcher, err := q.Query.Searcher(ctx, i, m, options)
	if err != nil {
		return nil, err
	}
	dvReader, err := i.DocValueReader([]string{"folder"})
	if err != nil {
		return nil, err
	}
	return newBatchAuthzSearcher(ctx, searcher, i, dvReader, q.access, q.namespace, q.group, q.resources, logger), nil
}
// docInfo holds document information for authorization
type docInfo struct {
	doc *search.DocumentMatch // underlying match, passed through when authorized
	resourceType string // parsed from the document ID
	name string // parsed from the document ID
	folder string // read from the "folder" doc value
	verb string // verb to check, taken from the resource->verb mapping
}
// batchAuthzSearcher implements a batch-aware authorization filtering searcher
// using FilterAuthorized with iter.Pull2 for efficient batched authorization
type batchAuthzSearcher struct {
	ctx context.Context
	searcher search.Searcher
	indexReader index.IndexReader
	dvReader index.DocValueReader
	access authlib.AccessClient
	namespace string
	group string
	resources map[string]string // resource -> verb mapping
	log log.Logger
	// Pull iterator state (lazily initialized)
	searchCtx *search.SearchContext
	next func() (docInfo, error, bool)
	stop func()
}
// newBatchAuthzSearcher wraps searcher with batch authorization filtering:
// each candidate document is checked against access — scoped to namespace,
// group and the resource->verb mapping — before it is surfaced.
func newBatchAuthzSearcher(
	ctx context.Context,
	searcher search.Searcher,
	indexReader index.IndexReader,
	dvReader index.DocValueReader,
	access authlib.AccessClient,
	namespace string,
	group string,
	resources map[string]string,
	logger log.Logger,
) *batchAuthzSearcher {
	s := &batchAuthzSearcher{
		log:         logger,
		ctx:         ctx,
		searcher:    searcher,
		indexReader: indexReader,
		dvReader:    dvReader,
		access:      access,
		namespace:   namespace,
		group:       group,
		resources:   resources,
	}
	return s
}
// Next returns the next authorized document match, lazily initializing the
// batched-authorization pull iterator on first use. It returns (nil, nil)
// when no more documents are available.
// NOTE(review): the searchCtx from the first call is captured and reused for
// all subsequent pulls — confirm callers always pass the same context.
func (s *batchAuthzSearcher) Next(searchCtx *search.SearchContext) (*search.DocumentMatch, error) {
	// Lazy initialization of pull iterator
	if s.next == nil {
		s.searchCtx = searchCtx
		s.initPullIterator()
	}
	info, err, ok := s.next()
	if !ok {
		return nil, nil // No more documents
	}
	if err != nil {
		return nil, err
	}
	return info.doc, nil
}
// initPullIterator sets up the FilterAuthorized iterator as a pull iterator.
// The pipeline is: searcher.Next -> parseDocInfo -> FilterAuthorized (which
// batches authorization checks internally) -> iter.Pull2, so Next can pull
// one authorized document at a time.
func (s *batchAuthzSearcher) initPullIterator() {
	// Create iter.Seq that pulls documents from searcher and parses them
	candidates := func(yield func(docInfo) bool) {
		for {
			doc, err := s.searcher.Next(s.searchCtx)
			if err != nil {
				// NOTE(review): searcher errors end the sequence silently
				// (logged at debug only) — the caller cannot distinguish this
				// from normal exhaustion.
				s.log.Debug("Error getting next document", "error", err)
				return
			}
			if doc == nil {
				return // No more documents
			}
			info, ok := s.parseDocInfo(doc)
			if !ok {
				continue // Skip invalid documents
			}
			if !yield(info) {
				return
			}
		}
	}
	// extractFn maps a parsed document to the item checked by the authz batch.
	extractFn := func(info docInfo) authz.BatchCheckItem {
		return authz.BatchCheckItem{
			Name: info.name,
			Folder: info.folder,
			Verb: info.verb,
			Group: s.group,
			Resource: info.resourceType,
			Namespace: s.namespace,
		}
	}
	// FilterAuthorized extracts auth from context and batches internally
	authzIter := authz.FilterAuthorized(s.ctx, s.access, candidates, extractFn).Items
	// Convert push iterator to pull iterator
	s.next, s.stop = iter.Pull2(authzIter)
}
// parseDocInfo extracts the fields needed for an authorization check from a
// document match: the resource type and name (parsed from the external ID),
// the folder (read from doc values) and the verb configured for the resource
// type. It returns false for documents that cannot be parsed or that have no
// configured verb.
func (s *batchAuthzSearcher) parseDocInfo(doc *search.DocumentMatch) (docInfo, bool) {
	externalID, err := s.indexReader.ExternalID(doc.IndexInternalID)
	if err != nil {
		s.log.Debug("Error getting external ID", "error", err)
		return docInfo{}, false
	}
	doc.ID = externalID
	// Doc IDs have the shape <namespace>/<group>/<resourceType>/<name>.
	segments := strings.Split(doc.ID, "/")
	if len(segments) != 4 {
		s.log.Debug("Unexpected document ID format", "id", doc.ID)
		return docInfo{}, false
	}
	info := docInfo{
		doc:          doc,
		resourceType: segments[2],
		name:         segments[3],
	}
	// Read the folder from the stored doc values.
	err = s.dvReader.VisitDocValues(doc.IndexInternalID, func(field string, value []byte) {
		if field == "folder" {
			info.folder = string(value)
		}
	})
	if err != nil {
		s.log.Debug("Error reading doc values", "error", err)
		return docInfo{}, false
	}
	// Without a configured verb there is nothing to authorize against.
	verb, found := s.resources[info.resourceType]
	if !found {
		s.log.Debug("No verb found for resource", "resource", info.resourceType)
		return docInfo{}, false
	}
	info.verb = verb
	return info, true
}
// Advance delegates directly to the wrapped searcher.
// NOTE(review): unlike Next, Advance does not go through the authorization
// pull iterator — confirm callers only use it on already-authorized IDs.
func (s *batchAuthzSearcher) Advance(searchCtx *search.SearchContext, ID index.IndexInternalID) (*search.DocumentMatch, error) {
	return s.searcher.Advance(searchCtx, ID)
}
// Close stops the pull iterator (if it was started) and closes the wrapped
// searcher.
func (s *batchAuthzSearcher) Close() error {
	if s.stop != nil {
		s.stop()
	}
	return s.searcher.Close()
}
// Size delegates to the wrapped searcher.
func (s *batchAuthzSearcher) Size() int {
	return s.searcher.Size()
}
// DocumentMatchPoolSize delegates to the wrapped searcher.
func (s *batchAuthzSearcher) DocumentMatchPoolSize() int {
	return s.searcher.DocumentMatchPoolSize()
}
// Min delegates to the wrapped searcher.
func (s *batchAuthzSearcher) Min() int {
	return s.searcher.Min()
}
// Count delegates to the wrapped searcher (pre-authorization count).
func (s *batchAuthzSearcher) Count() uint64 {
	return s.searcher.Count()
}
// SetQueryNorm delegates to the wrapped searcher.
func (s *batchAuthzSearcher) SetQueryNorm(qnorm float64) {
	s.searcher.SetQueryNorm(qnorm)
}
// Weight delegates to the wrapped searcher.
func (s *batchAuthzSearcher) Weight() float64 {
	return s.searcher.Weight()
}
// hasTerms reports whether v would be split into multiple tokens, returning
// the first delimiter from TermCharacters (checked in declaration order)
// found in v together with true, or ("", false) for a single-token value.
var hasTerms = func(v string) (string, bool) {
	for _, delim := range TermCharacters {
		if !strings.Contains(v, delim) {
			continue
		}
		return delim, true
	}
	return "", false
}

// TermCharacters characters that will be used to determine if a value is split into tokens
var TermCharacters = []string{
	" ", "-", "_", ".", ",", ":", ";", "?", "!", "@", "#", "$", "%", "^", "&", "*", "(", ")", "+",
	"=", "{", "}", "[", "]", "|", "\\", "/", "<", ">", "~", "`",
	"'", "\"",
}