Compare commits

...

2 Commits

Author SHA1 Message Date
Georges Chaudy 1022f04063 Refactor Watch method to implement batch event processing and authorization 2026-01-09 10:25:50 +01:00
Georges Chaudy 5e7d0392a3 Refactor authorization checks to utilize batch checks instead of compile across multiple services
- Integrated batch authorization checks using the authz package in List functions for IAM and SecureValue services, improving efficiency in permission validation.
- Updated List functions to handle pagination and authorization in a more streamlined manner, reducing redundant checks.
- Enhanced the server's List method to support batch authorization for resource listing, ensuring proper access control.
- Refactored related test cases to validate the new batch authorization logic and ensure comprehensive coverage of various scenarios.
2026-01-09 10:25:36 +01:00
6 changed files with 594 additions and 243 deletions
+61 -38
View File
@@ -4,10 +4,10 @@ import (
"context" "context"
"strconv" "strconv"
"github.com/grafana/authlib/authz"
authlib "github.com/grafana/authlib/types" authlib "github.com/grafana/authlib/types"
iamv0alpha1 "github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1" iamv0alpha1 "github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/apimachinery/utils"
legacyiamv0 "github.com/grafana/grafana/pkg/apis/iam/v0alpha1" legacyiamv0 "github.com/grafana/grafana/pkg/apis/iam/v0alpha1"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
@@ -54,10 +54,10 @@ type ListResponse[T Resource] struct {
type ListFunc[T Resource] func(ctx context.Context, ns authlib.NamespaceInfo, p Pagination) (*ListResponse[T], error) type ListFunc[T Resource] func(ctx context.Context, ns authlib.NamespaceInfo, p Pagination) (*ListResponse[T], error)
// List is a helper function that will perform access check on resources if // List is a helper function that will perform access check on resources if
// prvovided with a authlib.AccessClient. // provided with a authlib.AccessClient.
func List[T Resource]( func List[T Resource](
ctx context.Context, ctx context.Context,
resource utils.ResourceInfo, resourceInfo utils.ResourceInfo,
ac authlib.AccessClient, ac authlib.AccessClient,
p Pagination, p Pagination,
fn ListFunc[T], fn ListFunc[T],
@@ -67,63 +67,86 @@ func List[T Resource](
return nil, err return nil, err
} }
ident, err := identity.GetRequester(ctx)
if err != nil {
return nil, err
}
check := func(_, _ string) bool { return true }
if ac != nil {
var err error
check, _, err = ac.Compile(ctx, ident, authlib.ListRequest{
Resource: resource.GroupResource().Resource,
Group: resource.GroupResource().Group,
Verb: "list",
Namespace: ns.Value,
})
if err != nil {
return nil, err
}
}
res := &ListResponse[T]{Items: make([]T, 0, p.Limit)} res := &ListResponse[T]{Items: make([]T, 0, p.Limit)}
first, err := fn(ctx, ns, p) first, err := fn(ctx, ns, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, item := range first.Items {
if !check(item.AuthID(), "") {
continue
}
res.Items = append(res.Items, item)
}
res.Continue = first.Continue res.Continue = first.Continue
res.RV = first.RV res.RV = first.RV
// If no access client, skip authorization
if ac == nil {
res.Items = append(res.Items, first.Items...)
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
if err != nil {
return nil, err
}
res.Items = append(res.Items, r.Items...)
res.Continue = r.Continue
}
return res, nil
}
// Use FilterAuthorized to batch authorize items
extractFn := func(item T) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: item.AuthID(),
Folder: "",
Verb: "list",
Group: resourceInfo.GroupResource().Group,
Resource: resourceInfo.GroupResource().Resource,
Namespace: ns.Value,
}
}
// Convert first batch to iter.Seq and filter
firstCandidates := func(yield func(T) bool) {
for _, item := range first.Items {
if !yield(item) {
return
}
}
}
for item, err := range authz.FilterAuthorized(ctx, ac, firstCandidates, extractFn).Items {
if err != nil {
return nil, err
}
res.Items = append(res.Items, item)
}
outer: outer:
for len(res.Items) < int(p.Limit) && res.Continue != 0 { for len(res.Items) < int(p.Limit) && res.Continue != 0 {
// FIXME: it is not optimal to reduce the amout we look for here but it is the easiest way to // FIXME: it is not optimal to reduce the amount we look for here but it is the easiest way to
// correctly handle pagination and continue tokens // correctly handle pagination and continue tokens
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue}) r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, item := range r.Items { candidates := func(yield func(T) bool) {
if len(res.Items) == int(p.Limit) { for _, item := range r.Items {
if !yield(item) {
return
}
}
}
for item, authErr := range authz.FilterAuthorized(ctx, ac, candidates, extractFn).Items {
if authErr != nil {
return nil, authErr
}
if len(res.Items) >= int(p.Limit) {
res.Continue = r.Continue res.Continue = r.Continue
break outer break outer
} }
if !check(item.AuthID(), "") {
continue
}
res.Items = append(res.Items, item) res.Items = append(res.Items, item)
} }
res.Continue = r.Continue
} }
return res, nil return res, nil
@@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/grafana/authlib/authz"
claims "github.com/grafana/authlib/types" claims "github.com/grafana/authlib/types"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@@ -297,35 +298,38 @@ func (s *SecureValueService) List(ctx context.Context, namespace xkube.Namespace
s.metrics.SecureValueListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) s.metrics.SecureValueListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}() }()
user, ok := claims.AuthInfoFrom(ctx)
if !ok {
return nil, fmt.Errorf("missing auth info in context")
}
hasPermissionFor, _, err := s.accessClient.Compile(ctx, user, claims.ListRequest{
Group: secretv1beta1.APIGroup,
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
Namespace: namespace.String(),
Verb: utils.VerbGet, // Why not VerbList?
})
if err != nil {
return nil, fmt.Errorf("failed to compile checker: %w", err)
}
secureValuesMetadata, err := s.secureValueMetadataStorage.List(ctx, namespace) secureValuesMetadata, err := s.secureValueMetadataStorage.List(ctx, namespace)
if err != nil { if err != nil {
return nil, fmt.Errorf("fetching secure values from storage: %+w", err) return nil, fmt.Errorf("fetching secure values from storage: %+w", err)
} }
// Convert slice to iter.Seq
candidates := func(yield func(secretv1beta1.SecureValue) bool) {
for _, m := range secureValuesMetadata {
if !yield(m) {
return
}
}
}
extractFn := func(sv secretv1beta1.SecureValue) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: sv.Name,
Folder: "",
Verb: utils.VerbGet, // Why not VerbList?
Group: secretv1beta1.APIGroup,
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
Namespace: namespace.String(),
}
}
out := make([]secretv1beta1.SecureValue, 0) out := make([]secretv1beta1.SecureValue, 0)
for _, metadata := range secureValuesMetadata { for item, err := range authz.FilterAuthorized(ctx, s.accessClient, candidates, extractFn).Items {
// Check whether the user has permission to access this specific SecureValue in the namespace. if err != nil {
if !hasPermissionFor(metadata.Name, "") { return nil, fmt.Errorf("failed to check authorization: %w", err)
continue
} }
out = append(out, item)
out = append(out, metadata)
} }
return &secretv1beta1.SecureValueList{ return &secretv1beta1.SecureValueList{
+253 -108
View File
@@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/authlib/authz"
claims "github.com/grafana/authlib/types" claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff" "github.com/grafana/dskit/backoff"
@@ -1051,78 +1052,93 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
rsp := &resourcepb.ListResponse{} rsp := &resourcepb.ListResponse{}
key := req.Options.Key key := req.Options.Key
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
Group: key.Group, // Determine verb for authorization
Resource: key.Resource, verb := utils.VerbGet
Namespace: key.Namespace,
Verb: utils.VerbGet,
})
var trashChecker claims.ItemChecker // only for trash
if req.Source == resourcepb.ListRequest_TRASH { if req.Source == resourcepb.ListRequest_TRASH {
trashChecker, _, err = s.access.Compile(ctx, user, claims.ListRequest{ verb = utils.VerbSetPermissions // Basically Admin for trash
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Verb: utils.VerbSetPermissions, // Basically Admin
})
if err != nil {
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
}
}
if err != nil {
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
}
if checker == nil {
return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
Code: http.StatusForbidden,
}}, nil
} }
// Candidate item for batch authorization
type candidateItem struct {
name string
folder string
resourceVersion int64
value []byte
continueToken string
}
var nextToken string
var iterErr error
// Process items in batches within the iterator
iterFunc := func(iter ListIterator) error { iterFunc := func(iter ListIterator) error {
for iter.Next() { // Convert ListIterator to iter.Seq
if err := iter.Error(); err != nil { candidates := func(yield func(candidateItem) bool) {
for iter.Next() {
if !yield(candidateItem{
name: iter.Name(),
folder: iter.Folder(),
resourceVersion: iter.ResourceVersion(),
value: iter.Value(),
continueToken: iter.ContinueToken(),
}) {
return
}
}
}
extractFn := func(c candidateItem) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: c.name,
Folder: c.folder,
Verb: verb,
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
}
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
if err != nil {
return err return err
} }
// Trash is only accessible to admins or the user who deleted the object // For trash items, also check if user is the one who deleted it
if req.Source == resourcepb.ListRequest_TRASH { if req.Source == resourcepb.ListRequest_TRASH {
if !s.isTrashItemAuthorized(ctx, iter, trashChecker) { if !s.isTrashItemAuthorizedByValue(ctx, item.value, true) {
continue continue
} }
} else if !checker(iter.Name(), iter.Folder()) {
continue
} }
item := &resourcepb.ResourceWrapper{ rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{
ResourceVersion: iter.ResourceVersion(), ResourceVersion: item.resourceVersion,
Value: iter.Value(), Value: item.value,
} })
pageBytes += len(item.value)
pageBytes += len(item.Value) // Check if we've reached the page limit
rsp.Items = append(rsp.Items, item)
if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes { if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes {
t := iter.ContinueToken() nextToken = item.continueToken
if iter.Next() { break
rsp.NextPageToken = t
}
return iter.Error()
} }
} }
return iter.Error() return iter.Error()
} }
var rv int64 var rv int64
switch req.Source { switch req.Source {
case resourcepb.ListRequest_STORE: case resourcepb.ListRequest_STORE:
rv, err = s.backend.ListIterator(ctx, req, iterFunc) rv, iterErr = s.backend.ListIterator(ctx, req, iterFunc)
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH: case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
rv, err = s.backend.ListHistory(ctx, req, iterFunc) rv, iterErr = s.backend.ListHistory(ctx, req, iterFunc)
default: default:
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source)) return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
} }
if err != nil { if iterErr != nil {
rsp.Error = AsErrorResult(err) rsp.Error = AsErrorResult(iterErr)
return rsp, nil return rsp, nil
} }
@@ -1134,18 +1150,21 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
return rsp, nil return rsp, nil
} }
rsp.ResourceVersion = rv rsp.ResourceVersion = rv
return rsp, err
rsp.NextPageToken = nextToken
return rsp, nil
} }
// isTrashItemAuthorized checks if the user has access to the trash item. // isTrashItemAuthorizedByValue checks if the user has access to the trash item using the raw value.
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool { // hasAdminPermission indicates whether the user has admin permission (from BatchCheck).
func (s *server) isTrashItemAuthorizedByValue(ctx context.Context, value []byte, hasAdminPermission bool) bool {
user, ok := claims.AuthInfoFrom(ctx) user, ok := claims.AuthInfoFrom(ctx)
if !ok || user == nil { if !ok || user == nil {
return false return false
} }
partial := &metav1.PartialObjectMetadata{} partial := &metav1.PartialObjectMetadata{}
err := json.Unmarshal(iter.Value(), partial) err := json.Unmarshal(value, partial)
if err != nil { if err != nil {
return false return false
} }
@@ -1161,7 +1180,7 @@ func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, t
} }
// Trash is only accessible to admins or the user who deleted the object // Trash is only accessible to admins or the user who deleted the object
return obj.GetUpdatedBy() == user.GetUID() || trashChecker(iter.Name(), iter.Folder()) return obj.GetUpdatedBy() == user.GetUID() || hasAdminPermission
} }
func (s *server) initWatcher() error { func (s *server) initWatcher() error {
@@ -1202,18 +1221,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
} }
key := req.Options.Key key := req.Options.Key
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Verb: utils.VerbGet,
})
if err != nil {
return err
}
if checker == nil {
return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
}
// Start listening -- this will buffer any changes that happen while we backfill. // Start listening -- this will buffer any changes that happen while we backfill.
// If events are generated faster than we can process them, then some events will be dropped. // If events are generated faster than we can process them, then some events will be dropped.
@@ -1267,22 +1274,56 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
var initialEventsRV int64 // resource version coming from the initial events var initialEventsRV int64 // resource version coming from the initial events
if req.SendInitialEvents { if req.SendInitialEvents {
// Backfill the stream by adding every existing entities. // Backfill the stream by adding every existing entities with batch authorization
type candidateEvent struct {
name string
folder string
value []byte
version int64
}
initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error { initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
for iter.Next() { // Convert ListIterator to iter.Seq
if err := iter.Error(); err != nil { candidates := func(yield func(candidateEvent) bool) {
for iter.Next() {
if !yield(candidateEvent{
name: iter.Name(),
folder: iter.Folder(),
value: iter.Value(),
version: iter.ResourceVersion(),
}) {
return
}
}
}
extractFn := func(c candidateEvent) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: c.name,
Folder: c.folder,
Verb: utils.VerbGet,
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
}
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
if err != nil {
return err return err
} }
if err := srv.Send(&resourcepb.WatchEvent{ if err := srv.Send(&resourcepb.WatchEvent{
Type: resourcepb.WatchEvent_ADDED, Type: resourcepb.WatchEvent_ADDED,
Resource: &resourcepb.WatchEvent_Resource{ Resource: &resourcepb.WatchEvent_Resource{
Value: iter.Value(), Value: item.value,
Version: iter.ResourceVersion(), Version: item.version,
}, },
}); err != nil { }); err != nil {
return err return err
} }
} }
return iter.Error() return iter.Error()
}) })
if err != nil { if err != nil {
@@ -1309,6 +1350,127 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
default: default:
since = req.Since since = req.Since
} }
// Type to hold candidate events for batch authorization
type candidateWatchEvent struct {
event *WrittenEvent
}
// Type to hold authorized event with its fetched previous object
type authorizedEvent struct {
event *WrittenEvent
previous *resourcepb.ReadResponse // nil if no previous or fetch failed
}
const maxBatchSize = 100
// processEventBatch authorizes and sends a batch of events.
// Errors are logged but never returned to keep the watch running.
processEventBatch := func(batch []*WrittenEvent) {
if len(batch) == 0 {
return
}
// Convert batch to iter.Seq for FilterAuthorized
candidates := func(yield func(candidateWatchEvent) bool) {
for _, event := range batch {
if !yield(candidateWatchEvent{event: event}) {
return
}
}
}
extractFn := func(c candidateWatchEvent) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: c.event.Key.Name,
Folder: c.event.Folder,
Verb: utils.VerbGet,
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
}
}
// Step 1: Collect all authorized events
var authorizedEvents []authorizedEvent
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
if err != nil {
s.log.Error("error during batch authorization", "error", err)
continue
}
authorizedEvents = append(authorizedEvents, authorizedEvent{event: item.event})
}
if len(authorizedEvents) == 0 {
return
}
// Step 2: Fetch previous objects concurrently for events that need them
var wg sync.WaitGroup
for i := range authorizedEvents {
if authorizedEvents[i].event.PreviousRV > 0 {
wg.Add(1)
go func(idx int) {
defer wg.Done()
event := authorizedEvents[idx].event
prevObj, readErr := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
if readErr != nil {
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", readErr)
return
}
if prevObj.Error != nil {
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
return
}
if prevObj.ResourceVersion != event.PreviousRV {
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
return
}
authorizedEvents[idx].previous = prevObj
}(i)
}
}
wg.Wait()
// Step 3: Send all events in order
for _, authEvent := range authorizedEvents {
event := authEvent.event
value := event.Value
// remove the delete marker stored in the value for deleted objects
if event.Type == resourcepb.WatchEvent_DELETED {
value = []byte{}
}
resp := &resourcepb.WatchEvent{
Timestamp: event.Timestamp,
Type: event.Type,
Resource: &resourcepb.WatchEvent_Resource{
Value: value,
Version: event.ResourceVersion,
},
}
if authEvent.previous != nil {
resp.Previous = &resourcepb.WatchEvent_Resource{
Value: authEvent.previous.Value,
Version: authEvent.previous.ResourceVersion,
}
}
if err := srv.Send(resp); err != nil {
s.log.Error("error sending watch event", "key", event.Key, "error", err)
continue
}
if s.storageMetrics != nil {
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
if latencySeconds > 0 {
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
}
}
}
}
// Main event loop with batching
var batch []*WrittenEvent
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -1316,57 +1478,40 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
case event, ok := <-stream: case event, ok := <-stream:
if !ok { if !ok {
// Process any remaining events in the batch before closing
processEventBatch(batch)
s.log.Debug("watch events closed") s.log.Debug("watch events closed")
return nil return nil
} }
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name) s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) { if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
if !checker(event.Key.Name, event.Folder) { batch = append(batch, event)
continue }
}
value := event.Value // Drain any additional events that are already available (non-blocking)
// remove the delete marker stored in the value for deleted objects // Stop draining when we reach maxBatchSize to bound memory and latency
if event.Type == resourcepb.WatchEvent_DELETED { draining := true
value = []byte{} for draining && len(batch) < maxBatchSize {
} select {
resp := &resourcepb.WatchEvent{ case event, ok := <-stream:
Timestamp: event.Timestamp, if !ok {
Type: event.Type, // Process the batch before closing
Resource: &resourcepb.WatchEvent_Resource{ processEventBatch(batch)
Value: value, s.log.Debug("watch events closed")
Version: event.ResourceVersion, return nil
},
}
if event.PreviousRV > 0 {
prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
if err != nil {
// This scenario should never happen, but if it does, we should log it and continue
// sending the event without the previous object. The client will decide what to do.
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
} else {
if prevObj.ResourceVersion != event.PreviousRV {
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
return fmt.Errorf("resource version mismatch")
}
resp.Previous = &resourcepb.WatchEvent_Resource{
Value: prevObj.Value,
Version: prevObj.ResourceVersion,
}
} }
} s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
if err := srv.Send(resp); err != nil { if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
return err batch = append(batch, event)
}
if s.storageMetrics != nil {
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
if latencySeconds > 0 {
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
} }
default:
draining = false
} }
} }
// Process the collected batch
processEventBatch(batch)
batch = batch[:0] // Reset batch for reuse
} }
} }
} }
+227 -74
View File
@@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"iter"
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
@@ -22,7 +23,6 @@ import (
"github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search" "github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/search/query" "github.com/blevesearch/bleve/v2/search/query"
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
index "github.com/blevesearch/bleve_index_api" index "github.com/blevesearch/bleve_index_api"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
bolterrors "go.etcd.io/bbolt/errors" bolterrors "go.etcd.io/bbolt/errors"
@@ -35,6 +35,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search/builders" "github.com/grafana/grafana/pkg/storage/unified/search/builders"
"github.com/grafana/authlib/authz"
authlib "github.com/grafana/authlib/types" authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/apimachinery/utils"
@@ -1300,43 +1301,27 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R
} }
if access != nil { if access != nil {
auth, ok := authlib.AuthInfoFrom(ctx)
if !ok {
return nil, resource.AsErrorResult(fmt.Errorf("missing auth info"))
}
verb := utils.VerbList verb := utils.VerbList
if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) { if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
verb = utils.VerbPatch verb = utils.VerbPatch
} }
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{ // Build resource -> verb mapping for batch authorization
Namespace: b.key.Namespace, resources := map[string]string{
Group: b.key.Group, b.key.Resource: verb,
Resource: b.key.Resource,
Verb: verb,
})
if err != nil {
return nil, resource.AsErrorResult(err)
}
checkers := map[string]authlib.ItemChecker{
b.key.Resource: checker,
} }
// handle federation // Handle federation
for _, federated := range req.Federated { for _, federated := range req.Federated {
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{ resources[federated.Resource] = utils.VerbList
Namespace: federated.Namespace,
Group: federated.Group,
Resource: federated.Resource,
Verb: utils.VerbList,
})
if err != nil {
return nil, resource.AsErrorResult(err)
}
checkers[federated.Resource] = checker
} }
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, checkers) searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, permissionScopedQueryConfig{
access: access,
namespace: b.key.Namespace,
group: b.key.Group,
resources: resources,
})
} }
for k, v := range req.Facet { for k, v := range req.Facet {
@@ -1866,71 +1851,239 @@ func newResponseFacet(v *search.FacetResult) *resourcepb.ResourceSearchResponse_
type permissionScopedQuery struct { type permissionScopedQuery struct {
query.Query query.Query
checkers map[string]authlib.ItemChecker // one checker per resource access authlib.AccessClient
log log.Logger namespace string
group string
resources map[string]string // resource -> verb mapping
log log.Logger
} }
func newPermissionScopedQuery(q query.Query, checkers map[string]authlib.ItemChecker) *permissionScopedQuery { type permissionScopedQueryConfig struct {
access authlib.AccessClient
namespace string
group string
resources map[string]string // resource -> verb mapping
}
func newPermissionScopedQuery(q query.Query, cfg permissionScopedQueryConfig) *permissionScopedQuery {
return &permissionScopedQuery{ return &permissionScopedQuery{
Query: q, Query: q,
checkers: checkers, access: cfg.access,
log: log.New("search_permissions"), namespace: cfg.namespace,
group: cfg.group,
resources: cfg.resources,
log: log.New("search_permissions"),
} }
} }
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) { func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
// Get a new logger from context, to pass traceIDs etc.
logger := q.log.FromContext(ctx) logger := q.log.FromContext(ctx)
searcher, err := q.Query.Searcher(ctx, i, m, options) searcher, err := q.Query.Searcher(ctx, i, m, options)
if err != nil { if err != nil {
return nil, err return nil, err
} }
dvReader, err := i.DocValueReader([]string{"folder"}) dvReader, err := i.DocValueReader([]string{"folder"})
if err != nil { if err != nil {
return nil, err return nil, err
} }
filteringSearcher := bleveSearch.NewFilteringSearcher(ctx, searcher, func(d *search.DocumentMatch) bool {
// The doc ID has the format: <namespace>/<group>/<resourceType>/<name>
// IndexInternalID will be the same as the doc ID when using an in-memory index, but when using a file-based
// index it becomes a binary encoded number that has some other internal meaning. Using ExternalID() will get the
// correct doc ID regardless of the index type.
d.ID, err = i.ExternalID(d.IndexInternalID)
if err != nil {
logger.Debug("Error getting external ID", "error", err)
return false
}
parts := strings.Split(d.ID, "/") return newBatchAuthzSearcher(ctx, searcher, i, dvReader, q.access, q.namespace, q.group, q.resources, logger), nil
// Exclude doc if id isn't expected format }
if len(parts) != 4 {
logger.Debug("Unexpected document ID format", "id", d.ID) // docInfo holds document information for authorization
return false type docInfo struct {
} doc *search.DocumentMatch
ns := parts[0] resourceType string
resource := parts[2] name string
name := parts[3] folder string
folder := "" verb string
err = dvReader.VisitDocValues(d.IndexInternalID, func(field string, value []byte) { }
if field == "folder" {
folder = string(value) // batchAuthzSearcher implements a batch-aware authorization filtering searcher
// using FilterAuthorized with iter.Pull2 for efficient batched authorization
type batchAuthzSearcher struct {
ctx context.Context
searcher search.Searcher
indexReader index.IndexReader
dvReader index.DocValueReader
access authlib.AccessClient
namespace string
group string
resources map[string]string // resource -> verb mapping
log log.Logger
// Pull iterator state (lazily initialized)
searchCtx *search.SearchContext
next func() (docInfo, error, bool)
stop func()
}
func newBatchAuthzSearcher(
ctx context.Context,
searcher search.Searcher,
indexReader index.IndexReader,
dvReader index.DocValueReader,
access authlib.AccessClient,
namespace string,
group string,
resources map[string]string,
logger log.Logger,
) *batchAuthzSearcher {
return &batchAuthzSearcher{
ctx: ctx,
searcher: searcher,
indexReader: indexReader,
dvReader: dvReader,
access: access,
namespace: namespace,
group: group,
resources: resources,
log: logger,
}
}
func (s *batchAuthzSearcher) Next(searchCtx *search.SearchContext) (*search.DocumentMatch, error) {
// Lazy initialization of pull iterator
if s.next == nil {
s.searchCtx = searchCtx
s.initPullIterator()
}
info, err, ok := s.next()
if !ok {
return nil, nil // No more documents
}
if err != nil {
return nil, err
}
return info.doc, nil
}
// initPullIterator sets up the FilterAuthorized iterator as a pull iterator
func (s *batchAuthzSearcher) initPullIterator() {
// Create iter.Seq that pulls documents from searcher and parses them
candidates := func(yield func(docInfo) bool) {
for {
doc, err := s.searcher.Next(s.searchCtx)
if err != nil {
s.log.Debug("Error getting next document", "error", err)
return
}
if doc == nil {
return // No more documents
} }
})
if err != nil {
logger.Debug("Error reading doc values", "error", err)
return false
}
if _, ok := q.checkers[resource]; !ok {
logger.Debug("No resource checker found", "resource", resource)
return false
}
allowed := q.checkers[resource](name, folder)
if !allowed {
logger.Debug("Denying access", "ns", ns, "name", name, "folder", folder)
}
return allowed
})
return filteringSearcher, nil info, ok := s.parseDocInfo(doc)
if !ok {
continue // Skip invalid documents
}
if !yield(info) {
return
}
}
}
extractFn := func(info docInfo) authz.BatchCheckItem {
return authz.BatchCheckItem{
Name: info.name,
Folder: info.folder,
Verb: info.verb,
Group: s.group,
Resource: info.resourceType,
Namespace: s.namespace,
}
}
// FilterAuthorized extracts auth from context and batches internally
authzIter := authz.FilterAuthorized(s.ctx, s.access, candidates, extractFn).Items
// Convert push iterator to pull iterator
s.next, s.stop = iter.Pull2(authzIter)
}
// parseDocInfo extracts document information needed for authorization
func (s *batchAuthzSearcher) parseDocInfo(doc *search.DocumentMatch) (docInfo, bool) {
// Get external ID
externalID, err := s.indexReader.ExternalID(doc.IndexInternalID)
if err != nil {
s.log.Debug("Error getting external ID", "error", err)
return docInfo{}, false
}
doc.ID = externalID
// Parse doc ID: <namespace>/<group>/<resourceType>/<name>
parts := strings.Split(doc.ID, "/")
if len(parts) != 4 {
s.log.Debug("Unexpected document ID format", "id", doc.ID)
return docInfo{}, false
}
resourceType := parts[2]
name := parts[3]
// Get folder from doc values
folder := ""
err = s.dvReader.VisitDocValues(doc.IndexInternalID, func(field string, value []byte) {
if field == "folder" {
folder = string(value)
}
})
if err != nil {
s.log.Debug("Error reading doc values", "error", err)
return docInfo{}, false
}
// Check if we have a verb for this resource type
verb, ok := s.resources[resourceType]
if !ok {
s.log.Debug("No verb found for resource", "resource", resourceType)
return docInfo{}, false
}
return docInfo{
doc: doc,
resourceType: resourceType,
name: name,
folder: folder,
verb: verb,
}, true
}
// Advance delegates directly to the wrapped searcher.
// NOTE(review): unlike Next, this path does not run the batched
// authorization filter, so a document returned here has not been
// access-checked — confirm callers cannot reach unauthorized documents
// through Advance.
func (s *batchAuthzSearcher) Advance(searchCtx *search.SearchContext, ID index.IndexInternalID) (*search.DocumentMatch, error) {
return s.searcher.Advance(searchCtx, ID)
}
// Close releases the pull iterator (if it was ever started) and then
// closes the wrapped searcher.
func (s *batchAuthzSearcher) Close() error {
if s.stop != nil {
s.stop()
}
return s.searcher.Close()
}
// The remaining search.Searcher interface methods delegate unchanged to
// the wrapped searcher.

func (s *batchAuthzSearcher) Size() int {
return s.searcher.Size()
}
func (s *batchAuthzSearcher) DocumentMatchPoolSize() int {
return s.searcher.DocumentMatchPoolSize()
}
func (s *batchAuthzSearcher) Min() int {
return s.searcher.Min()
}
// Count reports the wrapped searcher's total match count; it is not
// reduced by authorization filtering.
func (s *batchAuthzSearcher) Count() uint64 {
return s.searcher.Count()
}
func (s *batchAuthzSearcher) SetQueryNorm(qnorm float64) {
s.searcher.SetQueryNorm(qnorm)
}
func (s *batchAuthzSearcher) Weight() float64 {
return s.searcher.Weight()
} }
// hasTerms - any value that will be split into multiple tokens // hasTerms - any value that will be split into multiple tokens
+6 -2
View File
@@ -653,8 +653,12 @@ func (nc StubAccessClient) Write(ctx context.Context, req *authzextv1.WriteReque
return nil return nil
} }
func (nc StubAccessClient) BatchCheck(ctx context.Context, req *authzextv1.BatchCheckRequest) (*authzextv1.BatchCheckResponse, error) { func (nc StubAccessClient) BatchCheck(ctx context.Context, user authlib.AuthInfo, req authlib.BatchCheckRequest) (authlib.BatchCheckResponse, error) {
return nil, nil results := make(map[string]authlib.BatchCheckResult, len(req.Checks))
for _, item := range req.Checks {
results[item.CorrelationID] = authlib.BatchCheckResult{Allowed: nc.resourceResponses[item.Resource]}
}
return authlib.BatchCheckResponse{Results: results}, nil
} }
func TestSafeInt64ToInt(t *testing.T) { func TestSafeInt64ToInt(t *testing.T) {
+22
View File
@@ -517,6 +517,28 @@ func (m *mockAccessClient) Check(ctx context.Context, user types.AuthInfo, req t
return types.CheckResponse{Allowed: m.allowed}, nil return types.CheckResponse{Allowed: m.allowed}, nil
} }
// BatchCheck is the mock implementation of batched authorization. Each
// check defaults to the client's global allowed flag; when allowedMap is
// set, an entry keyed "<folder>:<verb>" overrides the default for that
// check. Results are keyed by each check's correlation ID.
func (m *mockAccessClient) BatchCheck(ctx context.Context, user types.AuthInfo, req types.BatchCheckRequest) (types.BatchCheckResponse, error) {
	out := make(map[string]types.BatchCheckResult, len(req.Checks))
	for _, c := range req.Checks {
		decision := m.allowed
		// Check specific folder:verb mappings if provided
		if m.allowedMap != nil {
			if override, ok := m.allowedMap[fmt.Sprintf("%s:%s", c.Folder, c.Verb)]; ok {
				decision = override
			}
		}
		out[c.CorrelationID] = types.BatchCheckResult{Allowed: decision}
	}
	return types.BatchCheckResponse{Results: out}, nil
}
func (m *mockAccessClient) Compile(ctx context.Context, user types.AuthInfo, req types.ListRequest) (types.ItemChecker, types.Zookie, error) { func (m *mockAccessClient) Compile(ctx context.Context, user types.AuthInfo, req types.ListRequest) (types.ItemChecker, types.Zookie, error) {
if m.compileFn != nil { if m.compileFn != nil {
return m.compileFn(user, req), types.NoopZookie{}, nil return m.compileFn(user, req), types.NoopZookie{}, nil