Compare commits
2 Commits
authlib-ba
...
authlib-ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1022f04063 | ||
|
|
5e7d0392a3 |
@@ -4,10 +4,10 @@ import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/grafana/authlib/authz"
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
|
||||
iamv0alpha1 "github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
legacyiamv0 "github.com/grafana/grafana/pkg/apis/iam/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
@@ -54,10 +54,10 @@ type ListResponse[T Resource] struct {
|
||||
type ListFunc[T Resource] func(ctx context.Context, ns authlib.NamespaceInfo, p Pagination) (*ListResponse[T], error)
|
||||
|
||||
// List is a helper function that will perform access check on resources if
|
||||
// prvovided with a authlib.AccessClient.
|
||||
// provided with a authlib.AccessClient.
|
||||
func List[T Resource](
|
||||
ctx context.Context,
|
||||
resource utils.ResourceInfo,
|
||||
resourceInfo utils.ResourceInfo,
|
||||
ac authlib.AccessClient,
|
||||
p Pagination,
|
||||
fn ListFunc[T],
|
||||
@@ -67,63 +67,86 @@ func List[T Resource](
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ident, err := identity.GetRequester(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
check := func(_, _ string) bool { return true }
|
||||
if ac != nil {
|
||||
var err error
|
||||
check, _, err = ac.Compile(ctx, ident, authlib.ListRequest{
|
||||
Resource: resource.GroupResource().Resource,
|
||||
Group: resource.GroupResource().Group,
|
||||
Verb: "list",
|
||||
Namespace: ns.Value,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
res := &ListResponse[T]{Items: make([]T, 0, p.Limit)}
|
||||
|
||||
first, err := fn(ctx, ns, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, item := range first.Items {
|
||||
if !check(item.AuthID(), "") {
|
||||
continue
|
||||
}
|
||||
res.Items = append(res.Items, item)
|
||||
}
|
||||
res.Continue = first.Continue
|
||||
res.RV = first.RV
|
||||
|
||||
// If no access client, skip authorization
|
||||
if ac == nil {
|
||||
res.Items = append(res.Items, first.Items...)
|
||||
|
||||
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
|
||||
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res.Items = append(res.Items, r.Items...)
|
||||
res.Continue = r.Continue
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Use FilterAuthorized to batch authorize items
|
||||
extractFn := func(item T) authz.BatchCheckItem {
|
||||
return authz.BatchCheckItem{
|
||||
Name: item.AuthID(),
|
||||
Folder: "",
|
||||
Verb: "list",
|
||||
Group: resourceInfo.GroupResource().Group,
|
||||
Resource: resourceInfo.GroupResource().Resource,
|
||||
Namespace: ns.Value,
|
||||
}
|
||||
}
|
||||
|
||||
// Convert first batch to iter.Seq and filter
|
||||
firstCandidates := func(yield func(T) bool) {
|
||||
for _, item := range first.Items {
|
||||
if !yield(item) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for item, err := range authz.FilterAuthorized(ctx, ac, firstCandidates, extractFn).Items {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res.Items = append(res.Items, item)
|
||||
}
|
||||
|
||||
outer:
|
||||
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
|
||||
// FIXME: it is not optimal to reduce the amout we look for here but it is the easiest way to
|
||||
// FIXME: it is not optimal to reduce the amount we look for here but it is the easiest way to
|
||||
// correctly handle pagination and continue tokens
|
||||
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, item := range r.Items {
|
||||
if len(res.Items) == int(p.Limit) {
|
||||
candidates := func(yield func(T) bool) {
|
||||
for _, item := range r.Items {
|
||||
if !yield(item) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for item, authErr := range authz.FilterAuthorized(ctx, ac, candidates, extractFn).Items {
|
||||
if authErr != nil {
|
||||
return nil, authErr
|
||||
}
|
||||
if len(res.Items) >= int(p.Limit) {
|
||||
res.Continue = r.Continue
|
||||
break outer
|
||||
}
|
||||
|
||||
if !check(item.AuthID(), "") {
|
||||
continue
|
||||
}
|
||||
|
||||
res.Items = append(res.Items, item)
|
||||
}
|
||||
res.Continue = r.Continue
|
||||
}
|
||||
|
||||
return res, nil
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/authlib/authz"
|
||||
claims "github.com/grafana/authlib/types"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@@ -297,35 +298,38 @@ func (s *SecureValueService) List(ctx context.Context, namespace xkube.Namespace
|
||||
s.metrics.SecureValueListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
user, ok := claims.AuthInfoFrom(ctx)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing auth info in context")
|
||||
}
|
||||
|
||||
hasPermissionFor, _, err := s.accessClient.Compile(ctx, user, claims.ListRequest{
|
||||
Group: secretv1beta1.APIGroup,
|
||||
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
|
||||
Namespace: namespace.String(),
|
||||
Verb: utils.VerbGet, // Why not VerbList?
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to compile checker: %w", err)
|
||||
}
|
||||
|
||||
secureValuesMetadata, err := s.secureValueMetadataStorage.List(ctx, namespace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetching secure values from storage: %+w", err)
|
||||
}
|
||||
|
||||
// Convert slice to iter.Seq
|
||||
candidates := func(yield func(secretv1beta1.SecureValue) bool) {
|
||||
for _, m := range secureValuesMetadata {
|
||||
if !yield(m) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extractFn := func(sv secretv1beta1.SecureValue) authz.BatchCheckItem {
|
||||
return authz.BatchCheckItem{
|
||||
Name: sv.Name,
|
||||
Folder: "",
|
||||
Verb: utils.VerbGet, // Why not VerbList?
|
||||
Group: secretv1beta1.APIGroup,
|
||||
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
|
||||
Namespace: namespace.String(),
|
||||
}
|
||||
}
|
||||
|
||||
out := make([]secretv1beta1.SecureValue, 0)
|
||||
|
||||
for _, metadata := range secureValuesMetadata {
|
||||
// Check whether the user has permission to access this specific SecureValue in the namespace.
|
||||
if !hasPermissionFor(metadata.Name, "") {
|
||||
continue
|
||||
for item, err := range authz.FilterAuthorized(ctx, s.accessClient, candidates, extractFn).Items {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check authorization: %w", err)
|
||||
}
|
||||
|
||||
out = append(out, metadata)
|
||||
out = append(out, item)
|
||||
}
|
||||
|
||||
return &secretv1beta1.SecureValueList{
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
||||
"github.com/grafana/authlib/authz"
|
||||
claims "github.com/grafana/authlib/types"
|
||||
"github.com/grafana/dskit/backoff"
|
||||
|
||||
@@ -1051,78 +1052,93 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
rsp := &resourcepb.ListResponse{}
|
||||
|
||||
key := req.Options.Key
|
||||
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Namespace: key.Namespace,
|
||||
Verb: utils.VerbGet,
|
||||
})
|
||||
var trashChecker claims.ItemChecker // only for trash
|
||||
|
||||
// Determine verb for authorization
|
||||
verb := utils.VerbGet
|
||||
if req.Source == resourcepb.ListRequest_TRASH {
|
||||
trashChecker, _, err = s.access.Compile(ctx, user, claims.ListRequest{
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Namespace: key.Namespace,
|
||||
Verb: utils.VerbSetPermissions, // Basically Admin
|
||||
})
|
||||
if err != nil {
|
||||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
||||
}
|
||||
if checker == nil {
|
||||
return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
|
||||
Code: http.StatusForbidden,
|
||||
}}, nil
|
||||
verb = utils.VerbSetPermissions // Basically Admin for trash
|
||||
}
|
||||
|
||||
// Candidate item for batch authorization
|
||||
type candidateItem struct {
|
||||
name string
|
||||
folder string
|
||||
resourceVersion int64
|
||||
value []byte
|
||||
continueToken string
|
||||
}
|
||||
|
||||
var nextToken string
|
||||
var iterErr error
|
||||
|
||||
// Process items in batches within the iterator
|
||||
iterFunc := func(iter ListIterator) error {
|
||||
for iter.Next() {
|
||||
if err := iter.Error(); err != nil {
|
||||
// Convert ListIterator to iter.Seq
|
||||
candidates := func(yield func(candidateItem) bool) {
|
||||
for iter.Next() {
|
||||
if !yield(candidateItem{
|
||||
name: iter.Name(),
|
||||
folder: iter.Folder(),
|
||||
resourceVersion: iter.ResourceVersion(),
|
||||
value: iter.Value(),
|
||||
continueToken: iter.ContinueToken(),
|
||||
}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extractFn := func(c candidateItem) authz.BatchCheckItem {
|
||||
return authz.BatchCheckItem{
|
||||
Name: c.name,
|
||||
Folder: c.folder,
|
||||
Verb: verb,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Namespace: key.Namespace,
|
||||
}
|
||||
}
|
||||
|
||||
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Trash is only accessible to admins or the user who deleted the object
|
||||
// For trash items, also check if user is the one who deleted it
|
||||
if req.Source == resourcepb.ListRequest_TRASH {
|
||||
if !s.isTrashItemAuthorized(ctx, iter, trashChecker) {
|
||||
if !s.isTrashItemAuthorizedByValue(ctx, item.value, true) {
|
||||
continue
|
||||
}
|
||||
} else if !checker(iter.Name(), iter.Folder()) {
|
||||
continue
|
||||
}
|
||||
|
||||
item := &resourcepb.ResourceWrapper{
|
||||
ResourceVersion: iter.ResourceVersion(),
|
||||
Value: iter.Value(),
|
||||
}
|
||||
rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{
|
||||
ResourceVersion: item.resourceVersion,
|
||||
Value: item.value,
|
||||
})
|
||||
pageBytes += len(item.value)
|
||||
|
||||
pageBytes += len(item.Value)
|
||||
rsp.Items = append(rsp.Items, item)
|
||||
// Check if we've reached the page limit
|
||||
if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes {
|
||||
t := iter.ContinueToken()
|
||||
if iter.Next() {
|
||||
rsp.NextPageToken = t
|
||||
}
|
||||
return iter.Error()
|
||||
nextToken = item.continueToken
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return iter.Error()
|
||||
}
|
||||
|
||||
var rv int64
|
||||
switch req.Source {
|
||||
case resourcepb.ListRequest_STORE:
|
||||
rv, err = s.backend.ListIterator(ctx, req, iterFunc)
|
||||
rv, iterErr = s.backend.ListIterator(ctx, req, iterFunc)
|
||||
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
|
||||
rv, err = s.backend.ListHistory(ctx, req, iterFunc)
|
||||
rv, iterErr = s.backend.ListHistory(ctx, req, iterFunc)
|
||||
default:
|
||||
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
rsp.Error = AsErrorResult(err)
|
||||
if iterErr != nil {
|
||||
rsp.Error = AsErrorResult(iterErr)
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
@@ -1134,18 +1150,21 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
return rsp, nil
|
||||
}
|
||||
rsp.ResourceVersion = rv
|
||||
return rsp, err
|
||||
|
||||
rsp.NextPageToken = nextToken
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
// isTrashItemAuthorized checks if the user has access to the trash item.
|
||||
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool {
|
||||
// isTrashItemAuthorizedByValue checks if the user has access to the trash item using the raw value.
|
||||
// hasAdminPermission indicates whether the user has admin permission (from BatchCheck).
|
||||
func (s *server) isTrashItemAuthorizedByValue(ctx context.Context, value []byte, hasAdminPermission bool) bool {
|
||||
user, ok := claims.AuthInfoFrom(ctx)
|
||||
if !ok || user == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
partial := &metav1.PartialObjectMetadata{}
|
||||
err := json.Unmarshal(iter.Value(), partial)
|
||||
err := json.Unmarshal(value, partial)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
@@ -1161,7 +1180,7 @@ func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, t
|
||||
}
|
||||
|
||||
// Trash is only accessible to admins or the user who deleted the object
|
||||
return obj.GetUpdatedBy() == user.GetUID() || trashChecker(iter.Name(), iter.Folder())
|
||||
return obj.GetUpdatedBy() == user.GetUID() || hasAdminPermission
|
||||
}
|
||||
|
||||
func (s *server) initWatcher() error {
|
||||
@@ -1202,18 +1221,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
}
|
||||
|
||||
key := req.Options.Key
|
||||
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Namespace: key.Namespace,
|
||||
Verb: utils.VerbGet,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if checker == nil {
|
||||
return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
|
||||
}
|
||||
|
||||
// Start listening -- this will buffer any changes that happen while we backfill.
|
||||
// If events are generated faster than we can process them, then some events will be dropped.
|
||||
@@ -1267,22 +1274,56 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
|
||||
var initialEventsRV int64 // resource version coming from the initial events
|
||||
if req.SendInitialEvents {
|
||||
// Backfill the stream by adding every existing entities.
|
||||
// Backfill the stream by adding every existing entities with batch authorization
|
||||
type candidateEvent struct {
|
||||
name string
|
||||
folder string
|
||||
value []byte
|
||||
version int64
|
||||
}
|
||||
|
||||
initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
|
||||
for iter.Next() {
|
||||
if err := iter.Error(); err != nil {
|
||||
// Convert ListIterator to iter.Seq
|
||||
candidates := func(yield func(candidateEvent) bool) {
|
||||
for iter.Next() {
|
||||
if !yield(candidateEvent{
|
||||
name: iter.Name(),
|
||||
folder: iter.Folder(),
|
||||
value: iter.Value(),
|
||||
version: iter.ResourceVersion(),
|
||||
}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extractFn := func(c candidateEvent) authz.BatchCheckItem {
|
||||
return authz.BatchCheckItem{
|
||||
Name: c.name,
|
||||
Folder: c.folder,
|
||||
Verb: utils.VerbGet,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Namespace: key.Namespace,
|
||||
}
|
||||
}
|
||||
|
||||
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := srv.Send(&resourcepb.WatchEvent{
|
||||
Type: resourcepb.WatchEvent_ADDED,
|
||||
Resource: &resourcepb.WatchEvent_Resource{
|
||||
Value: iter.Value(),
|
||||
Version: iter.ResourceVersion(),
|
||||
Value: item.value,
|
||||
Version: item.version,
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return iter.Error()
|
||||
})
|
||||
if err != nil {
|
||||
@@ -1309,6 +1350,127 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
default:
|
||||
since = req.Since
|
||||
}
|
||||
|
||||
// Type to hold candidate events for batch authorization
|
||||
type candidateWatchEvent struct {
|
||||
event *WrittenEvent
|
||||
}
|
||||
|
||||
// Type to hold authorized event with its fetched previous object
|
||||
type authorizedEvent struct {
|
||||
event *WrittenEvent
|
||||
previous *resourcepb.ReadResponse // nil if no previous or fetch failed
|
||||
}
|
||||
|
||||
const maxBatchSize = 100
|
||||
|
||||
// processEventBatch authorizes and sends a batch of events.
|
||||
// Errors are logged but never returned to keep the watch running.
|
||||
processEventBatch := func(batch []*WrittenEvent) {
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Convert batch to iter.Seq for FilterAuthorized
|
||||
candidates := func(yield func(candidateWatchEvent) bool) {
|
||||
for _, event := range batch {
|
||||
if !yield(candidateWatchEvent{event: event}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extractFn := func(c candidateWatchEvent) authz.BatchCheckItem {
|
||||
return authz.BatchCheckItem{
|
||||
Name: c.event.Key.Name,
|
||||
Folder: c.event.Folder,
|
||||
Verb: utils.VerbGet,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Namespace: key.Namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// Step 1: Collect all authorized events
|
||||
var authorizedEvents []authorizedEvent
|
||||
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
|
||||
if err != nil {
|
||||
s.log.Error("error during batch authorization", "error", err)
|
||||
continue
|
||||
}
|
||||
authorizedEvents = append(authorizedEvents, authorizedEvent{event: item.event})
|
||||
}
|
||||
|
||||
if len(authorizedEvents) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Step 2: Fetch previous objects concurrently for events that need them
|
||||
var wg sync.WaitGroup
|
||||
for i := range authorizedEvents {
|
||||
if authorizedEvents[i].event.PreviousRV > 0 {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
event := authorizedEvents[idx].event
|
||||
prevObj, readErr := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
|
||||
if readErr != nil {
|
||||
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", readErr)
|
||||
return
|
||||
}
|
||||
if prevObj.Error != nil {
|
||||
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
|
||||
return
|
||||
}
|
||||
if prevObj.ResourceVersion != event.PreviousRV {
|
||||
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
|
||||
return
|
||||
}
|
||||
authorizedEvents[idx].previous = prevObj
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Step 3: Send all events in order
|
||||
for _, authEvent := range authorizedEvents {
|
||||
event := authEvent.event
|
||||
value := event.Value
|
||||
// remove the delete marker stored in the value for deleted objects
|
||||
if event.Type == resourcepb.WatchEvent_DELETED {
|
||||
value = []byte{}
|
||||
}
|
||||
resp := &resourcepb.WatchEvent{
|
||||
Timestamp: event.Timestamp,
|
||||
Type: event.Type,
|
||||
Resource: &resourcepb.WatchEvent_Resource{
|
||||
Value: value,
|
||||
Version: event.ResourceVersion,
|
||||
},
|
||||
}
|
||||
if authEvent.previous != nil {
|
||||
resp.Previous = &resourcepb.WatchEvent_Resource{
|
||||
Value: authEvent.previous.Value,
|
||||
Version: authEvent.previous.ResourceVersion,
|
||||
}
|
||||
}
|
||||
if err := srv.Send(resp); err != nil {
|
||||
s.log.Error("error sending watch event", "key", event.Key, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if s.storageMetrics != nil {
|
||||
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
|
||||
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
|
||||
if latencySeconds > 0 {
|
||||
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main event loop with batching
|
||||
var batch []*WrittenEvent
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -1316,57 +1478,40 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
|
||||
case event, ok := <-stream:
|
||||
if !ok {
|
||||
// Process any remaining events in the batch before closing
|
||||
processEventBatch(batch)
|
||||
s.log.Debug("watch events closed")
|
||||
return nil
|
||||
}
|
||||
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
|
||||
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
|
||||
if !checker(event.Key.Name, event.Folder) {
|
||||
continue
|
||||
}
|
||||
batch = append(batch, event)
|
||||
}
|
||||
|
||||
value := event.Value
|
||||
// remove the delete marker stored in the value for deleted objects
|
||||
if event.Type == resourcepb.WatchEvent_DELETED {
|
||||
value = []byte{}
|
||||
}
|
||||
resp := &resourcepb.WatchEvent{
|
||||
Timestamp: event.Timestamp,
|
||||
Type: event.Type,
|
||||
Resource: &resourcepb.WatchEvent_Resource{
|
||||
Value: value,
|
||||
Version: event.ResourceVersion,
|
||||
},
|
||||
}
|
||||
if event.PreviousRV > 0 {
|
||||
prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
|
||||
if err != nil {
|
||||
// This scenario should never happen, but if it does, we should log it and continue
|
||||
// sending the event without the previous object. The client will decide what to do.
|
||||
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
|
||||
} else {
|
||||
if prevObj.ResourceVersion != event.PreviousRV {
|
||||
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
|
||||
return fmt.Errorf("resource version mismatch")
|
||||
}
|
||||
resp.Previous = &resourcepb.WatchEvent_Resource{
|
||||
Value: prevObj.Value,
|
||||
Version: prevObj.ResourceVersion,
|
||||
}
|
||||
// Drain any additional events that are already available (non-blocking)
|
||||
// Stop draining when we reach maxBatchSize to bound memory and latency
|
||||
draining := true
|
||||
for draining && len(batch) < maxBatchSize {
|
||||
select {
|
||||
case event, ok := <-stream:
|
||||
if !ok {
|
||||
// Process the batch before closing
|
||||
processEventBatch(batch)
|
||||
s.log.Debug("watch events closed")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if err := srv.Send(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.storageMetrics != nil {
|
||||
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
|
||||
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
|
||||
if latencySeconds > 0 {
|
||||
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
|
||||
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
|
||||
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
|
||||
batch = append(batch, event)
|
||||
}
|
||||
default:
|
||||
draining = false
|
||||
}
|
||||
}
|
||||
|
||||
// Process the collected batch
|
||||
processEventBatch(batch)
|
||||
batch = batch[:0] // Reset batch for reuse
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -22,7 +23,6 @@ import (
|
||||
"github.com/blevesearch/bleve/v2/mapping"
|
||||
"github.com/blevesearch/bleve/v2/search"
|
||||
"github.com/blevesearch/bleve/v2/search/query"
|
||||
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
|
||||
index "github.com/blevesearch/bleve_index_api"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
bolterrors "go.etcd.io/bbolt/errors"
|
||||
@@ -35,6 +35,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
||||
|
||||
"github.com/grafana/authlib/authz"
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
@@ -1300,43 +1301,27 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R
|
||||
}
|
||||
|
||||
if access != nil {
|
||||
auth, ok := authlib.AuthInfoFrom(ctx)
|
||||
if !ok {
|
||||
return nil, resource.AsErrorResult(fmt.Errorf("missing auth info"))
|
||||
}
|
||||
verb := utils.VerbList
|
||||
if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
|
||||
verb = utils.VerbPatch
|
||||
}
|
||||
|
||||
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{
|
||||
Namespace: b.key.Namespace,
|
||||
Group: b.key.Group,
|
||||
Resource: b.key.Resource,
|
||||
Verb: verb,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, resource.AsErrorResult(err)
|
||||
}
|
||||
checkers := map[string]authlib.ItemChecker{
|
||||
b.key.Resource: checker,
|
||||
// Build resource -> verb mapping for batch authorization
|
||||
resources := map[string]string{
|
||||
b.key.Resource: verb,
|
||||
}
|
||||
|
||||
// handle federation
|
||||
// Handle federation
|
||||
for _, federated := range req.Federated {
|
||||
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{
|
||||
Namespace: federated.Namespace,
|
||||
Group: federated.Group,
|
||||
Resource: federated.Resource,
|
||||
Verb: utils.VerbList,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, resource.AsErrorResult(err)
|
||||
}
|
||||
checkers[federated.Resource] = checker
|
||||
resources[federated.Resource] = utils.VerbList
|
||||
}
|
||||
|
||||
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, checkers)
|
||||
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, permissionScopedQueryConfig{
|
||||
access: access,
|
||||
namespace: b.key.Namespace,
|
||||
group: b.key.Group,
|
||||
resources: resources,
|
||||
})
|
||||
}
|
||||
|
||||
for k, v := range req.Facet {
|
||||
@@ -1866,71 +1851,239 @@ func newResponseFacet(v *search.FacetResult) *resourcepb.ResourceSearchResponse_
|
||||
|
||||
type permissionScopedQuery struct {
|
||||
query.Query
|
||||
checkers map[string]authlib.ItemChecker // one checker per resource
|
||||
log log.Logger
|
||||
access authlib.AccessClient
|
||||
namespace string
|
||||
group string
|
||||
resources map[string]string // resource -> verb mapping
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func newPermissionScopedQuery(q query.Query, checkers map[string]authlib.ItemChecker) *permissionScopedQuery {
|
||||
type permissionScopedQueryConfig struct {
|
||||
access authlib.AccessClient
|
||||
namespace string
|
||||
group string
|
||||
resources map[string]string // resource -> verb mapping
|
||||
}
|
||||
|
||||
func newPermissionScopedQuery(q query.Query, cfg permissionScopedQueryConfig) *permissionScopedQuery {
|
||||
return &permissionScopedQuery{
|
||||
Query: q,
|
||||
checkers: checkers,
|
||||
log: log.New("search_permissions"),
|
||||
Query: q,
|
||||
access: cfg.access,
|
||||
namespace: cfg.namespace,
|
||||
group: cfg.group,
|
||||
resources: cfg.resources,
|
||||
log: log.New("search_permissions"),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
|
||||
// Get a new logger from context, to pass traceIDs etc.
|
||||
logger := q.log.FromContext(ctx)
|
||||
searcher, err := q.Query.Searcher(ctx, i, m, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dvReader, err := i.DocValueReader([]string{"folder"})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filteringSearcher := bleveSearch.NewFilteringSearcher(ctx, searcher, func(d *search.DocumentMatch) bool {
|
||||
// The doc ID has the format: <namespace>/<group>/<resourceType>/<name>
|
||||
// IndexInternalID will be the same as the doc ID when using an in-memory index, but when using a file-based
|
||||
// index it becomes a binary encoded number that has some other internal meaning. Using ExternalID() will get the
|
||||
// correct doc ID regardless of the index type.
|
||||
d.ID, err = i.ExternalID(d.IndexInternalID)
|
||||
if err != nil {
|
||||
logger.Debug("Error getting external ID", "error", err)
|
||||
return false
|
||||
}
|
||||
|
||||
parts := strings.Split(d.ID, "/")
|
||||
// Exclude doc if id isn't expected format
|
||||
if len(parts) != 4 {
|
||||
logger.Debug("Unexpected document ID format", "id", d.ID)
|
||||
return false
|
||||
}
|
||||
ns := parts[0]
|
||||
resource := parts[2]
|
||||
name := parts[3]
|
||||
folder := ""
|
||||
err = dvReader.VisitDocValues(d.IndexInternalID, func(field string, value []byte) {
|
||||
if field == "folder" {
|
||||
folder = string(value)
|
||||
return newBatchAuthzSearcher(ctx, searcher, i, dvReader, q.access, q.namespace, q.group, q.resources, logger), nil
|
||||
}
|
||||
|
||||
// docInfo holds document information for authorization
|
||||
type docInfo struct {
|
||||
doc *search.DocumentMatch
|
||||
resourceType string
|
||||
name string
|
||||
folder string
|
||||
verb string
|
||||
}
|
||||
|
||||
// batchAuthzSearcher implements a batch-aware authorization filtering searcher
|
||||
// using FilterAuthorized with iter.Pull2 for efficient batched authorization
|
||||
type batchAuthzSearcher struct {
|
||||
ctx context.Context
|
||||
searcher search.Searcher
|
||||
indexReader index.IndexReader
|
||||
dvReader index.DocValueReader
|
||||
access authlib.AccessClient
|
||||
namespace string
|
||||
group string
|
||||
resources map[string]string // resource -> verb mapping
|
||||
log log.Logger
|
||||
|
||||
// Pull iterator state (lazily initialized)
|
||||
searchCtx *search.SearchContext
|
||||
next func() (docInfo, error, bool)
|
||||
stop func()
|
||||
}
|
||||
|
||||
func newBatchAuthzSearcher(
|
||||
ctx context.Context,
|
||||
searcher search.Searcher,
|
||||
indexReader index.IndexReader,
|
||||
dvReader index.DocValueReader,
|
||||
access authlib.AccessClient,
|
||||
namespace string,
|
||||
group string,
|
||||
resources map[string]string,
|
||||
logger log.Logger,
|
||||
) *batchAuthzSearcher {
|
||||
return &batchAuthzSearcher{
|
||||
ctx: ctx,
|
||||
searcher: searcher,
|
||||
indexReader: indexReader,
|
||||
dvReader: dvReader,
|
||||
access: access,
|
||||
namespace: namespace,
|
||||
group: group,
|
||||
resources: resources,
|
||||
log: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *batchAuthzSearcher) Next(searchCtx *search.SearchContext) (*search.DocumentMatch, error) {
|
||||
// Lazy initialization of pull iterator
|
||||
if s.next == nil {
|
||||
s.searchCtx = searchCtx
|
||||
s.initPullIterator()
|
||||
}
|
||||
|
||||
info, err, ok := s.next()
|
||||
if !ok {
|
||||
return nil, nil // No more documents
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return info.doc, nil
|
||||
}
|
||||
|
||||
// initPullIterator sets up the FilterAuthorized iterator as a pull iterator
|
||||
func (s *batchAuthzSearcher) initPullIterator() {
|
||||
// Create iter.Seq that pulls documents from searcher and parses them
|
||||
candidates := func(yield func(docInfo) bool) {
|
||||
for {
|
||||
doc, err := s.searcher.Next(s.searchCtx)
|
||||
if err != nil {
|
||||
s.log.Debug("Error getting next document", "error", err)
|
||||
return
|
||||
}
|
||||
if doc == nil {
|
||||
return // No more documents
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
logger.Debug("Error reading doc values", "error", err)
|
||||
return false
|
||||
}
|
||||
if _, ok := q.checkers[resource]; !ok {
|
||||
logger.Debug("No resource checker found", "resource", resource)
|
||||
return false
|
||||
}
|
||||
allowed := q.checkers[resource](name, folder)
|
||||
if !allowed {
|
||||
logger.Debug("Denying access", "ns", ns, "name", name, "folder", folder)
|
||||
}
|
||||
return allowed
|
||||
})
|
||||
|
||||
return filteringSearcher, nil
|
||||
info, ok := s.parseDocInfo(doc)
|
||||
if !ok {
|
||||
continue // Skip invalid documents
|
||||
}
|
||||
|
||||
if !yield(info) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extractFn := func(info docInfo) authz.BatchCheckItem {
|
||||
return authz.BatchCheckItem{
|
||||
Name: info.name,
|
||||
Folder: info.folder,
|
||||
Verb: info.verb,
|
||||
Group: s.group,
|
||||
Resource: info.resourceType,
|
||||
Namespace: s.namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// FilterAuthorized extracts auth from context and batches internally
|
||||
authzIter := authz.FilterAuthorized(s.ctx, s.access, candidates, extractFn).Items
|
||||
|
||||
// Convert push iterator to pull iterator
|
||||
s.next, s.stop = iter.Pull2(authzIter)
|
||||
}
|
||||
|
||||
// parseDocInfo extracts document information needed for authorization
|
||||
func (s *batchAuthzSearcher) parseDocInfo(doc *search.DocumentMatch) (docInfo, bool) {
|
||||
// Get external ID
|
||||
externalID, err := s.indexReader.ExternalID(doc.IndexInternalID)
|
||||
if err != nil {
|
||||
s.log.Debug("Error getting external ID", "error", err)
|
||||
return docInfo{}, false
|
||||
}
|
||||
doc.ID = externalID
|
||||
|
||||
// Parse doc ID: <namespace>/<group>/<resourceType>/<name>
|
||||
parts := strings.Split(doc.ID, "/")
|
||||
if len(parts) != 4 {
|
||||
s.log.Debug("Unexpected document ID format", "id", doc.ID)
|
||||
return docInfo{}, false
|
||||
}
|
||||
|
||||
resourceType := parts[2]
|
||||
name := parts[3]
|
||||
|
||||
// Get folder from doc values
|
||||
folder := ""
|
||||
err = s.dvReader.VisitDocValues(doc.IndexInternalID, func(field string, value []byte) {
|
||||
if field == "folder" {
|
||||
folder = string(value)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
s.log.Debug("Error reading doc values", "error", err)
|
||||
return docInfo{}, false
|
||||
}
|
||||
|
||||
// Check if we have a verb for this resource type
|
||||
verb, ok := s.resources[resourceType]
|
||||
if !ok {
|
||||
s.log.Debug("No verb found for resource", "resource", resourceType)
|
||||
return docInfo{}, false
|
||||
}
|
||||
|
||||
return docInfo{
|
||||
doc: doc,
|
||||
resourceType: resourceType,
|
||||
name: name,
|
||||
folder: folder,
|
||||
verb: verb,
|
||||
}, true
|
||||
}
|
||||
|
||||
// Advance delegates directly to the wrapped searcher.
// NOTE(review): unlike Next, this path does not go through the authz pull
// iterator, so matches returned here are not authorization-filtered — confirm
// callers do not rely on filtering via Advance.
func (s *batchAuthzSearcher) Advance(searchCtx *search.SearchContext, ID index.IndexInternalID) (*search.DocumentMatch, error) {
	return s.searcher.Advance(searchCtx, ID)
}
|
||||
|
||||
// Close releases the pull iterator, if it was ever started, and then closes
// the wrapped searcher.
func (s *batchAuthzSearcher) Close() error {
	// stop is nil until Next lazily initializes the iterator.
	if s.stop != nil {
		s.stop()
	}
	return s.searcher.Close()
}
|
||||
|
||||
// Size delegates to the wrapped searcher.
func (s *batchAuthzSearcher) Size() int {
	return s.searcher.Size()
}
|
||||
|
||||
// DocumentMatchPoolSize delegates to the wrapped searcher.
func (s *batchAuthzSearcher) DocumentMatchPoolSize() int {
	return s.searcher.DocumentMatchPoolSize()
}
|
||||
|
||||
// Min delegates to the wrapped searcher.
func (s *batchAuthzSearcher) Min() int {
	return s.searcher.Min()
}
|
||||
|
||||
// Count delegates to the wrapped searcher.
// NOTE(review): this is the unfiltered count — documents later denied by the
// authz filter in Next are still included here; verify callers expect that.
func (s *batchAuthzSearcher) Count() uint64 {
	return s.searcher.Count()
}
|
||||
|
||||
// SetQueryNorm delegates to the wrapped searcher.
func (s *batchAuthzSearcher) SetQueryNorm(qnorm float64) {
	s.searcher.SetQueryNorm(qnorm)
}
|
||||
|
||||
// Weight delegates to the wrapped searcher.
func (s *batchAuthzSearcher) Weight() float64 {
	return s.searcher.Weight()
}
|
||||
|
||||
// hasTerms - any value that will be split into multiple tokens
|
||||
|
||||
@@ -653,8 +653,12 @@ func (nc StubAccessClient) Write(ctx context.Context, req *authzextv1.WriteReque
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nc StubAccessClient) BatchCheck(ctx context.Context, req *authzextv1.BatchCheckRequest) (*authzextv1.BatchCheckResponse, error) {
|
||||
return nil, nil
|
||||
func (nc StubAccessClient) BatchCheck(ctx context.Context, user authlib.AuthInfo, req authlib.BatchCheckRequest) (authlib.BatchCheckResponse, error) {
|
||||
results := make(map[string]authlib.BatchCheckResult, len(req.Checks))
|
||||
for _, item := range req.Checks {
|
||||
results[item.CorrelationID] = authlib.BatchCheckResult{Allowed: nc.resourceResponses[item.Resource]}
|
||||
}
|
||||
return authlib.BatchCheckResponse{Results: results}, nil
|
||||
}
|
||||
|
||||
func TestSafeInt64ToInt(t *testing.T) {
|
||||
|
||||
@@ -517,6 +517,28 @@ func (m *mockAccessClient) Check(ctx context.Context, user types.AuthInfo, req t
|
||||
return types.CheckResponse{Allowed: m.allowed}, nil
|
||||
}
|
||||
|
||||
func (m *mockAccessClient) BatchCheck(ctx context.Context, user types.AuthInfo, req types.BatchCheckRequest) (types.BatchCheckResponse, error) {
|
||||
results := make(map[string]types.BatchCheckResult, len(req.Checks))
|
||||
|
||||
for _, check := range req.Checks {
|
||||
allowed := m.allowed
|
||||
|
||||
// Check specific folder:verb mappings if provided
|
||||
if m.allowedMap != nil {
|
||||
key := fmt.Sprintf("%s:%s", check.Folder, check.Verb)
|
||||
if a, exists := m.allowedMap[key]; exists {
|
||||
allowed = a
|
||||
}
|
||||
}
|
||||
|
||||
results[check.CorrelationID] = types.BatchCheckResult{
|
||||
Allowed: allowed,
|
||||
}
|
||||
}
|
||||
|
||||
return types.BatchCheckResponse{Results: results}, nil
|
||||
}
|
||||
|
||||
func (m *mockAccessClient) Compile(ctx context.Context, user types.AuthInfo, req types.ListRequest) (types.ItemChecker, types.Zookie, error) {
|
||||
if m.compileFn != nil {
|
||||
return m.compileFn(user, req), types.NoopZookie{}, nil
|
||||
|
||||
Reference in New Issue
Block a user