Compare commits

...

2 Commits

Author SHA1 Message Date
Georges Chaudy
1022f04063 Refactor Watch method to implement batch event processing and authorization 2026-01-09 10:25:50 +01:00
Georges Chaudy
5e7d0392a3 Refactor authorization checks to utilize batch checks instead of compile across multiple services
- Integrated batch authorization checks using the authz package in List functions for IAM and SecureValue services, improving efficiency in permission validation.
- Updated List functions to handle pagination and authorization in a more streamlined manner, reducing redundant checks.
- Enhanced the server's List method to support batch authorization for resource listing, ensuring proper access control.
- Refactored related test cases to validate the new batch authorization logic and ensure comprehensive coverage of various scenarios.
2026-01-09 10:25:36 +01:00
6 changed files with 594 additions and 243 deletions

View File

@@ -4,10 +4,10 @@ import (
"context"
"strconv"
"github.com/grafana/authlib/authz"
authlib "github.com/grafana/authlib/types"
iamv0alpha1 "github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
legacyiamv0 "github.com/grafana/grafana/pkg/apis/iam/v0alpha1"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
@@ -54,10 +54,10 @@ type ListResponse[T Resource] struct {
type ListFunc[T Resource] func(ctx context.Context, ns authlib.NamespaceInfo, p Pagination) (*ListResponse[T], error)
// List is a helper function that will perform access check on resources if
// prvovided with a authlib.AccessClient.
// provided with a authlib.AccessClient.
func List[T Resource](
ctx context.Context,
resource utils.ResourceInfo,
resourceInfo utils.ResourceInfo,
ac authlib.AccessClient,
p Pagination,
fn ListFunc[T],
@@ -67,63 +67,86 @@ func List[T Resource](
return nil, err
}
ident, err := identity.GetRequester(ctx)
if err != nil {
return nil, err
}
check := func(_, _ string) bool { return true }
if ac != nil {
var err error
check, _, err = ac.Compile(ctx, ident, authlib.ListRequest{
Resource: resource.GroupResource().Resource,
Group: resource.GroupResource().Group,
Verb: "list",
Namespace: ns.Value,
})
if err != nil {
return nil, err
}
}
res := &ListResponse[T]{Items: make([]T, 0, p.Limit)}
first, err := fn(ctx, ns, p)
if err != nil {
return nil, err
}
for _, item := range first.Items {
if !check(item.AuthID(), "") {
continue
}
res.Items = append(res.Items, item)
}
res.Continue = first.Continue
res.RV = first.RV
// If no access client, skip authorization
if ac == nil {
res.Items = append(res.Items, first.Items...)
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
if err != nil {
return nil, err
}
res.Items = append(res.Items, r.Items...)
res.Continue = r.Continue
}
return res, nil
}
// Use FilterAuthorized to batch authorize items
extractFn := func(item T) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: item.AuthID(),
Folder: "",
Verb: "list",
Group: resourceInfo.GroupResource().Group,
Resource: resourceInfo.GroupResource().Resource,
Namespace: ns.Value,
}
}
// Convert first batch to iter.Seq and filter
firstCandidates := func(yield func(T) bool) {
for _, item := range first.Items {
if !yield(item) {
return
}
}
}
for item, err := range authz.FilterAuthorized(ctx, ac, firstCandidates, extractFn).Items {
if err != nil {
return nil, err
}
res.Items = append(res.Items, item)
}
outer:
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
// FIXME: it is not optimal to reduce the amout we look for here but it is the easiest way to
// FIXME: it is not optimal to reduce the amount we look for here but it is the easiest way to
// correctly handle pagination and continue tokens
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
if err != nil {
return nil, err
}
for _, item := range r.Items {
if len(res.Items) == int(p.Limit) {
candidates := func(yield func(T) bool) {
for _, item := range r.Items {
if !yield(item) {
return
}
}
}
for item, authErr := range authz.FilterAuthorized(ctx, ac, candidates, extractFn).Items {
if authErr != nil {
return nil, authErr
}
if len(res.Items) >= int(p.Limit) {
res.Continue = r.Continue
break outer
}
if !check(item.AuthID(), "") {
continue
}
res.Items = append(res.Items, item)
}
res.Continue = r.Continue
}
return res, nil

View File

@@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/grafana/authlib/authz"
claims "github.com/grafana/authlib/types"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -297,35 +298,38 @@ func (s *SecureValueService) List(ctx context.Context, namespace xkube.Namespace
s.metrics.SecureValueListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
user, ok := claims.AuthInfoFrom(ctx)
if !ok {
return nil, fmt.Errorf("missing auth info in context")
}
hasPermissionFor, _, err := s.accessClient.Compile(ctx, user, claims.ListRequest{
Group: secretv1beta1.APIGroup,
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
Namespace: namespace.String(),
Verb: utils.VerbGet, // Why not VerbList?
})
if err != nil {
return nil, fmt.Errorf("failed to compile checker: %w", err)
}
secureValuesMetadata, err := s.secureValueMetadataStorage.List(ctx, namespace)
if err != nil {
return nil, fmt.Errorf("fetching secure values from storage: %+w", err)
}
// Convert slice to iter.Seq
candidates := func(yield func(secretv1beta1.SecureValue) bool) {
for _, m := range secureValuesMetadata {
if !yield(m) {
return
}
}
}
extractFn := func(sv secretv1beta1.SecureValue) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: sv.Name,
Folder: "",
Verb: utils.VerbGet, // Why not VerbList?
Group: secretv1beta1.APIGroup,
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
Namespace: namespace.String(),
}
}
out := make([]secretv1beta1.SecureValue, 0)
for _, metadata := range secureValuesMetadata {
// Check whether the user has permission to access this specific SecureValue in the namespace.
if !hasPermissionFor(metadata.Name, "") {
continue
for item, err := range authz.FilterAuthorized(ctx, s.accessClient, candidates, extractFn).Items {
if err != nil {
return nil, fmt.Errorf("failed to check authorization: %w", err)
}
out = append(out, metadata)
out = append(out, item)
}
return &secretv1beta1.SecureValueList{

View File

@@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/authlib/authz"
claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff"
@@ -1051,78 +1052,93 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
rsp := &resourcepb.ListResponse{}
key := req.Options.Key
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Verb: utils.VerbGet,
})
var trashChecker claims.ItemChecker // only for trash
// Determine verb for authorization
verb := utils.VerbGet
if req.Source == resourcepb.ListRequest_TRASH {
trashChecker, _, err = s.access.Compile(ctx, user, claims.ListRequest{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Verb: utils.VerbSetPermissions, // Basically Admin
})
if err != nil {
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
}
}
if err != nil {
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
}
if checker == nil {
return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
Code: http.StatusForbidden,
}}, nil
verb = utils.VerbSetPermissions // Basically Admin for trash
}
// Candidate item for batch authorization
type candidateItem struct {
name string
folder string
resourceVersion int64
value []byte
continueToken string
}
var nextToken string
var iterErr error
// Process items in batches within the iterator
iterFunc := func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
// Convert ListIterator to iter.Seq
candidates := func(yield func(candidateItem) bool) {
for iter.Next() {
if !yield(candidateItem{
name: iter.Name(),
folder: iter.Folder(),
resourceVersion: iter.ResourceVersion(),
value: iter.Value(),
continueToken: iter.ContinueToken(),
}) {
return
}
}
}
extractFn := func(c candidateItem) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: c.name,
Folder: c.folder,
Verb: verb,
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
}
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
if err != nil {
return err
}
// Trash is only accessible to admins or the user who deleted the object
// For trash items, also check if user is the one who deleted it
if req.Source == resourcepb.ListRequest_TRASH {
if !s.isTrashItemAuthorized(ctx, iter, trashChecker) {
if !s.isTrashItemAuthorizedByValue(ctx, item.value, true) {
continue
}
} else if !checker(iter.Name(), iter.Folder()) {
continue
}
item := &resourcepb.ResourceWrapper{
ResourceVersion: iter.ResourceVersion(),
Value: iter.Value(),
}
rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{
ResourceVersion: item.resourceVersion,
Value: item.value,
})
pageBytes += len(item.value)
pageBytes += len(item.Value)
rsp.Items = append(rsp.Items, item)
// Check if we've reached the page limit
if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes {
t := iter.ContinueToken()
if iter.Next() {
rsp.NextPageToken = t
}
return iter.Error()
nextToken = item.continueToken
break
}
}
return iter.Error()
}
var rv int64
switch req.Source {
case resourcepb.ListRequest_STORE:
rv, err = s.backend.ListIterator(ctx, req, iterFunc)
rv, iterErr = s.backend.ListIterator(ctx, req, iterFunc)
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
rv, err = s.backend.ListHistory(ctx, req, iterFunc)
rv, iterErr = s.backend.ListHistory(ctx, req, iterFunc)
default:
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
}
if err != nil {
rsp.Error = AsErrorResult(err)
if iterErr != nil {
rsp.Error = AsErrorResult(iterErr)
return rsp, nil
}
@@ -1134,18 +1150,21 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
return rsp, nil
}
rsp.ResourceVersion = rv
return rsp, err
rsp.NextPageToken = nextToken
return rsp, nil
}
// isTrashItemAuthorized checks if the user has access to the trash item.
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool {
// isTrashItemAuthorizedByValue checks if the user has access to the trash item using the raw value.
// hasAdminPermission indicates whether the user has admin permission (from BatchCheck).
func (s *server) isTrashItemAuthorizedByValue(ctx context.Context, value []byte, hasAdminPermission bool) bool {
user, ok := claims.AuthInfoFrom(ctx)
if !ok || user == nil {
return false
}
partial := &metav1.PartialObjectMetadata{}
err := json.Unmarshal(iter.Value(), partial)
err := json.Unmarshal(value, partial)
if err != nil {
return false
}
@@ -1161,7 +1180,7 @@ func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, t
}
// Trash is only accessible to admins or the user who deleted the object
return obj.GetUpdatedBy() == user.GetUID() || trashChecker(iter.Name(), iter.Folder())
return obj.GetUpdatedBy() == user.GetUID() || hasAdminPermission
}
func (s *server) initWatcher() error {
@@ -1202,18 +1221,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
key := req.Options.Key
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Verb: utils.VerbGet,
})
if err != nil {
return err
}
if checker == nil {
return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
}
// Start listening -- this will buffer any changes that happen while we backfill.
// If events are generated faster than we can process them, then some events will be dropped.
@@ -1267,22 +1274,56 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
var initialEventsRV int64 // resource version coming from the initial events
if req.SendInitialEvents {
// Backfill the stream by adding every existing entities.
// Backfill the stream by adding every existing entities with batch authorization
type candidateEvent struct {
name string
folder string
value []byte
version int64
}
initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
// Convert ListIterator to iter.Seq
candidates := func(yield func(candidateEvent) bool) {
for iter.Next() {
if !yield(candidateEvent{
name: iter.Name(),
folder: iter.Folder(),
value: iter.Value(),
version: iter.ResourceVersion(),
}) {
return
}
}
}
extractFn := func(c candidateEvent) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: c.name,
Folder: c.folder,
Verb: utils.VerbGet,
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
}
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
if err != nil {
return err
}
if err := srv.Send(&resourcepb.WatchEvent{
Type: resourcepb.WatchEvent_ADDED,
Resource: &resourcepb.WatchEvent_Resource{
Value: iter.Value(),
Version: iter.ResourceVersion(),
Value: item.value,
Version: item.version,
},
}); err != nil {
return err
}
}
return iter.Error()
})
if err != nil {
@@ -1309,6 +1350,127 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
default:
since = req.Since
}
// Type to hold candidate events for batch authorization
type candidateWatchEvent struct {
event *WrittenEvent
}
// Type to hold authorized event with its fetched previous object
type authorizedEvent struct {
event *WrittenEvent
previous *resourcepb.ReadResponse // nil if no previous or fetch failed
}
const maxBatchSize = 100
// processEventBatch authorizes and sends a batch of events.
// Errors are logged but never returned to keep the watch running.
processEventBatch := func(batch []*WrittenEvent) {
if len(batch) == 0 {
return
}
// Convert batch to iter.Seq for FilterAuthorized
candidates := func(yield func(candidateWatchEvent) bool) {
for _, event := range batch {
if !yield(candidateWatchEvent{event: event}) {
return
}
}
}
extractFn := func(c candidateWatchEvent) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: c.event.Key.Name,
Folder: c.event.Folder,
Verb: utils.VerbGet,
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
}
// Step 1: Collect all authorized events
var authorizedEvents []authorizedEvent
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
if err != nil {
s.log.Error("error during batch authorization", "error", err)
continue
}
authorizedEvents = append(authorizedEvents, authorizedEvent{event: item.event})
}
if len(authorizedEvents) == 0 {
return
}
// Step 2: Fetch previous objects concurrently for events that need them
var wg sync.WaitGroup
for i := range authorizedEvents {
if authorizedEvents[i].event.PreviousRV > 0 {
wg.Add(1)
go func(idx int) {
defer wg.Done()
event := authorizedEvents[idx].event
prevObj, readErr := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
if readErr != nil {
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", readErr)
return
}
if prevObj.Error != nil {
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
return
}
if prevObj.ResourceVersion != event.PreviousRV {
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
return
}
authorizedEvents[idx].previous = prevObj
}(i)
}
}
wg.Wait()
// Step 3: Send all events in order
for _, authEvent := range authorizedEvents {
event := authEvent.event
value := event.Value
// remove the delete marker stored in the value for deleted objects
if event.Type == resourcepb.WatchEvent_DELETED {
value = []byte{}
}
resp := &resourcepb.WatchEvent{
Timestamp: event.Timestamp,
Type: event.Type,
Resource: &resourcepb.WatchEvent_Resource{
Value: value,
Version: event.ResourceVersion,
},
}
if authEvent.previous != nil {
resp.Previous = &resourcepb.WatchEvent_Resource{
Value: authEvent.previous.Value,
Version: authEvent.previous.ResourceVersion,
}
}
if err := srv.Send(resp); err != nil {
s.log.Error("error sending watch event", "key", event.Key, "error", err)
continue
}
if s.storageMetrics != nil {
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
if latencySeconds > 0 {
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
}
}
}
}
// Main event loop with batching
var batch []*WrittenEvent
for {
select {
case <-ctx.Done():
@@ -1316,57 +1478,40 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
case event, ok := <-stream:
if !ok {
// Process any remaining events in the batch before closing
processEventBatch(batch)
s.log.Debug("watch events closed")
return nil
}
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
if !checker(event.Key.Name, event.Folder) {
continue
}
batch = append(batch, event)
}
value := event.Value
// remove the delete marker stored in the value for deleted objects
if event.Type == resourcepb.WatchEvent_DELETED {
value = []byte{}
}
resp := &resourcepb.WatchEvent{
Timestamp: event.Timestamp,
Type: event.Type,
Resource: &resourcepb.WatchEvent_Resource{
Value: value,
Version: event.ResourceVersion,
},
}
if event.PreviousRV > 0 {
prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
if err != nil {
// This scenario should never happen, but if it does, we should log it and continue
// sending the event without the previous object. The client will decide what to do.
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
} else {
if prevObj.ResourceVersion != event.PreviousRV {
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
return fmt.Errorf("resource version mismatch")
}
resp.Previous = &resourcepb.WatchEvent_Resource{
Value: prevObj.Value,
Version: prevObj.ResourceVersion,
}
// Drain any additional events that are already available (non-blocking)
// Stop draining when we reach maxBatchSize to bound memory and latency
draining := true
for draining && len(batch) < maxBatchSize {
select {
case event, ok := <-stream:
if !ok {
// Process the batch before closing
processEventBatch(batch)
s.log.Debug("watch events closed")
return nil
}
}
if err := srv.Send(resp); err != nil {
return err
}
if s.storageMetrics != nil {
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
if latencySeconds > 0 {
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
batch = append(batch, event)
}
default:
draining = false
}
}
// Process the collected batch
processEventBatch(batch)
batch = batch[:0] // Reset batch for reuse
}
}
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"iter"
"math"
"os"
"path/filepath"
@@ -22,7 +23,6 @@ import (
"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/search/query"
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
index "github.com/blevesearch/bleve_index_api"
"github.com/prometheus/client_golang/prometheus"
bolterrors "go.etcd.io/bbolt/errors"
@@ -35,6 +35,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
"github.com/grafana/authlib/authz"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
@@ -1300,43 +1301,27 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R
}
if access != nil {
auth, ok := authlib.AuthInfoFrom(ctx)
if !ok {
return nil, resource.AsErrorResult(fmt.Errorf("missing auth info"))
}
verb := utils.VerbList
if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
verb = utils.VerbPatch
}
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{
Namespace: b.key.Namespace,
Group: b.key.Group,
Resource: b.key.Resource,
Verb: verb,
})
if err != nil {
return nil, resource.AsErrorResult(err)
}
checkers := map[string]authlib.ItemChecker{
b.key.Resource: checker,
// Build resource -> verb mapping for batch authorization
resources := map[string]string{
b.key.Resource: verb,
}
// handle federation
// Handle federation
for _, federated := range req.Federated {
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{
Namespace: federated.Namespace,
Group: federated.Group,
Resource: federated.Resource,
Verb: utils.VerbList,
})
if err != nil {
return nil, resource.AsErrorResult(err)
}
checkers[federated.Resource] = checker
resources[federated.Resource] = utils.VerbList
}
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, checkers)
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, permissionScopedQueryConfig{
access: access,
namespace: b.key.Namespace,
group: b.key.Group,
resources: resources,
})
}
for k, v := range req.Facet {
@@ -1866,71 +1851,239 @@ func newResponseFacet(v *search.FacetResult) *resourcepb.ResourceSearchResponse_
type permissionScopedQuery struct {
query.Query
checkers map[string]authlib.ItemChecker // one checker per resource
log log.Logger
access authlib.AccessClient
namespace string
group string
resources map[string]string // resource -> verb mapping
log log.Logger
}
func newPermissionScopedQuery(q query.Query, checkers map[string]authlib.ItemChecker) *permissionScopedQuery {
type permissionScopedQueryConfig struct {
access authlib.AccessClient
namespace string
group string
resources map[string]string // resource -> verb mapping
}
func newPermissionScopedQuery(q query.Query, cfg permissionScopedQueryConfig) *permissionScopedQuery {
return &permissionScopedQuery{
Query: q,
checkers: checkers,
log: log.New("search_permissions"),
Query: q,
access: cfg.access,
namespace: cfg.namespace,
group: cfg.group,
resources: cfg.resources,
log: log.New("search_permissions"),
}
}
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
// Get a new logger from context, to pass traceIDs etc.
logger := q.log.FromContext(ctx)
searcher, err := q.Query.Searcher(ctx, i, m, options)
if err != nil {
return nil, err
}
dvReader, err := i.DocValueReader([]string{"folder"})
if err != nil {
return nil, err
}
filteringSearcher := bleveSearch.NewFilteringSearcher(ctx, searcher, func(d *search.DocumentMatch) bool {
// The doc ID has the format: <namespace>/<group>/<resourceType>/<name>
// IndexInternalID will be the same as the doc ID when using an in-memory index, but when using a file-based
// index it becomes a binary encoded number that has some other internal meaning. Using ExternalID() will get the
// correct doc ID regardless of the index type.
d.ID, err = i.ExternalID(d.IndexInternalID)
if err != nil {
logger.Debug("Error getting external ID", "error", err)
return false
}
parts := strings.Split(d.ID, "/")
// Exclude doc if id isn't expected format
if len(parts) != 4 {
logger.Debug("Unexpected document ID format", "id", d.ID)
return false
}
ns := parts[0]
resource := parts[2]
name := parts[3]
folder := ""
err = dvReader.VisitDocValues(d.IndexInternalID, func(field string, value []byte) {
if field == "folder" {
folder = string(value)
return newBatchAuthzSearcher(ctx, searcher, i, dvReader, q.access, q.namespace, q.group, q.resources, logger), nil
}
// docInfo holds document information for authorization
type docInfo struct {
doc *search.DocumentMatch
resourceType string
name string
folder string
verb string
}
// batchAuthzSearcher implements a batch-aware authorization filtering searcher
// using FilterAuthorized with iter.Pull2 for efficient batched authorization
type batchAuthzSearcher struct {
ctx context.Context
searcher search.Searcher
indexReader index.IndexReader
dvReader index.DocValueReader
access authlib.AccessClient
namespace string
group string
resources map[string]string // resource -> verb mapping
log log.Logger
// Pull iterator state (lazily initialized)
searchCtx *search.SearchContext
next func() (docInfo, error, bool)
stop func()
}
func newBatchAuthzSearcher(
ctx context.Context,
searcher search.Searcher,
indexReader index.IndexReader,
dvReader index.DocValueReader,
access authlib.AccessClient,
namespace string,
group string,
resources map[string]string,
logger log.Logger,
) *batchAuthzSearcher {
return &batchAuthzSearcher{
ctx: ctx,
searcher: searcher,
indexReader: indexReader,
dvReader: dvReader,
access: access,
namespace: namespace,
group: group,
resources: resources,
log: logger,
}
}
func (s *batchAuthzSearcher) Next(searchCtx *search.SearchContext) (*search.DocumentMatch, error) {
// Lazy initialization of pull iterator
if s.next == nil {
s.searchCtx = searchCtx
s.initPullIterator()
}
info, err, ok := s.next()
if !ok {
return nil, nil // No more documents
}
if err != nil {
return nil, err
}
return info.doc, nil
}
// initPullIterator sets up the FilterAuthorized iterator as a pull iterator
func (s *batchAuthzSearcher) initPullIterator() {
// Create iter.Seq that pulls documents from searcher and parses them
candidates := func(yield func(docInfo) bool) {
for {
doc, err := s.searcher.Next(s.searchCtx)
if err != nil {
s.log.Debug("Error getting next document", "error", err)
return
}
if doc == nil {
return // No more documents
}
})
if err != nil {
logger.Debug("Error reading doc values", "error", err)
return false
}
if _, ok := q.checkers[resource]; !ok {
logger.Debug("No resource checker found", "resource", resource)
return false
}
allowed := q.checkers[resource](name, folder)
if !allowed {
logger.Debug("Denying access", "ns", ns, "name", name, "folder", folder)
}
return allowed
})
return filteringSearcher, nil
info, ok := s.parseDocInfo(doc)
if !ok {
continue // Skip invalid documents
}
if !yield(info) {
return
}
}
}
extractFn := func(info docInfo) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: info.name,
Folder: info.folder,
Verb: info.verb,
Group: s.group,
Resource: info.resourceType,
Namespace: s.namespace,
}
}
// FilterAuthorized extracts auth from context and batches internally
authzIter := authz.FilterAuthorized(s.ctx, s.access, candidates, extractFn).Items
// Convert push iterator to pull iterator
s.next, s.stop = iter.Pull2(authzIter)
}
// parseDocInfo extracts document information needed for authorization
func (s *batchAuthzSearcher) parseDocInfo(doc *search.DocumentMatch) (docInfo, bool) {
// Get external ID
externalID, err := s.indexReader.ExternalID(doc.IndexInternalID)
if err != nil {
s.log.Debug("Error getting external ID", "error", err)
return docInfo{}, false
}
doc.ID = externalID
// Parse doc ID: <namespace>/<group>/<resourceType>/<name>
parts := strings.Split(doc.ID, "/")
if len(parts) != 4 {
s.log.Debug("Unexpected document ID format", "id", doc.ID)
return docInfo{}, false
}
resourceType := parts[2]
name := parts[3]
// Get folder from doc values
folder := ""
err = s.dvReader.VisitDocValues(doc.IndexInternalID, func(field string, value []byte) {
if field == "folder" {
folder = string(value)
}
})
if err != nil {
s.log.Debug("Error reading doc values", "error", err)
return docInfo{}, false
}
// Check if we have a verb for this resource type
verb, ok := s.resources[resourceType]
if !ok {
s.log.Debug("No verb found for resource", "resource", resourceType)
return docInfo{}, false
}
return docInfo{
doc: doc,
resourceType: resourceType,
name: name,
folder: folder,
verb: verb,
}, true
}
func (s *batchAuthzSearcher) Advance(searchCtx *search.SearchContext, ID index.IndexInternalID) (*search.DocumentMatch, error) {
return s.searcher.Advance(searchCtx, ID)
}
func (s *batchAuthzSearcher) Close() error {
if s.stop != nil {
s.stop()
}
return s.searcher.Close()
}
func (s *batchAuthzSearcher) Size() int {
return s.searcher.Size()
}
func (s *batchAuthzSearcher) DocumentMatchPoolSize() int {
return s.searcher.DocumentMatchPoolSize()
}
func (s *batchAuthzSearcher) Min() int {
return s.searcher.Min()
}
func (s *batchAuthzSearcher) Count() uint64 {
return s.searcher.Count()
}
func (s *batchAuthzSearcher) SetQueryNorm(qnorm float64) {
s.searcher.SetQueryNorm(qnorm)
}
func (s *batchAuthzSearcher) Weight() float64 {
return s.searcher.Weight()
}
// hasTerms - any value that will be split into multiple tokens

View File

@@ -653,8 +653,12 @@ func (nc StubAccessClient) Write(ctx context.Context, req *authzextv1.WriteReque
return nil
}
func (nc StubAccessClient) BatchCheck(ctx context.Context, req *authzextv1.BatchCheckRequest) (*authzextv1.BatchCheckResponse, error) {
return nil, nil
func (nc StubAccessClient) BatchCheck(ctx context.Context, user authlib.AuthInfo, req authlib.BatchCheckRequest) (authlib.BatchCheckResponse, error) {
results := make(map[string]authlib.BatchCheckResult, len(req.Checks))
for _, item := range req.Checks {
results[item.CorrelationID] = authlib.BatchCheckResult{Allowed: nc.resourceResponses[item.Resource]}
}
return authlib.BatchCheckResponse{Results: results}, nil
}
func TestSafeInt64ToInt(t *testing.T) {

View File

@@ -517,6 +517,28 @@ func (m *mockAccessClient) Check(ctx context.Context, user types.AuthInfo, req t
return types.CheckResponse{Allowed: m.allowed}, nil
}
func (m *mockAccessClient) BatchCheck(ctx context.Context, user types.AuthInfo, req types.BatchCheckRequest) (types.BatchCheckResponse, error) {
results := make(map[string]types.BatchCheckResult, len(req.Checks))
for _, check := range req.Checks {
allowed := m.allowed
// Check specific folder:verb mappings if provided
if m.allowedMap != nil {
key := fmt.Sprintf("%s:%s", check.Folder, check.Verb)
if a, exists := m.allowedMap[key]; exists {
allowed = a
}
}
results[check.CorrelationID] = types.BatchCheckResult{
Allowed: allowed,
}
}
return types.BatchCheckResponse{Results: results}, nil
}
func (m *mockAccessClient) Compile(ctx context.Context, user types.AuthInfo, req types.ListRequest) (types.ItemChecker, types.Zookie, error) {
if m.compileFn != nil {
return m.compileFn(user, req), types.NoopZookie{}, nil