Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1022f04063 | |||
| 5e7d0392a3 |
@@ -4,10 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/grafana/authlib/authz"
|
||||||
authlib "github.com/grafana/authlib/types"
|
authlib "github.com/grafana/authlib/types"
|
||||||
|
|
||||||
iamv0alpha1 "github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1"
|
iamv0alpha1 "github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1"
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
legacyiamv0 "github.com/grafana/grafana/pkg/apis/iam/v0alpha1"
|
legacyiamv0 "github.com/grafana/grafana/pkg/apis/iam/v0alpha1"
|
||||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||||
@@ -54,10 +54,10 @@ type ListResponse[T Resource] struct {
|
|||||||
type ListFunc[T Resource] func(ctx context.Context, ns authlib.NamespaceInfo, p Pagination) (*ListResponse[T], error)
|
type ListFunc[T Resource] func(ctx context.Context, ns authlib.NamespaceInfo, p Pagination) (*ListResponse[T], error)
|
||||||
|
|
||||||
// List is a helper function that will perform access check on resources if
|
// List is a helper function that will perform access check on resources if
|
||||||
// prvovided with a authlib.AccessClient.
|
// provided with a authlib.AccessClient.
|
||||||
func List[T Resource](
|
func List[T Resource](
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
resource utils.ResourceInfo,
|
resourceInfo utils.ResourceInfo,
|
||||||
ac authlib.AccessClient,
|
ac authlib.AccessClient,
|
||||||
p Pagination,
|
p Pagination,
|
||||||
fn ListFunc[T],
|
fn ListFunc[T],
|
||||||
@@ -67,63 +67,86 @@ func List[T Resource](
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ident, err := identity.GetRequester(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
check := func(_, _ string) bool { return true }
|
|
||||||
if ac != nil {
|
|
||||||
var err error
|
|
||||||
check, _, err = ac.Compile(ctx, ident, authlib.ListRequest{
|
|
||||||
Resource: resource.GroupResource().Resource,
|
|
||||||
Group: resource.GroupResource().Group,
|
|
||||||
Verb: "list",
|
|
||||||
Namespace: ns.Value,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res := &ListResponse[T]{Items: make([]T, 0, p.Limit)}
|
res := &ListResponse[T]{Items: make([]T, 0, p.Limit)}
|
||||||
|
|
||||||
first, err := fn(ctx, ns, p)
|
first, err := fn(ctx, ns, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range first.Items {
|
|
||||||
if !check(item.AuthID(), "") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
res.Items = append(res.Items, item)
|
|
||||||
}
|
|
||||||
res.Continue = first.Continue
|
res.Continue = first.Continue
|
||||||
res.RV = first.RV
|
res.RV = first.RV
|
||||||
|
|
||||||
|
// If no access client, skip authorization
|
||||||
|
if ac == nil {
|
||||||
|
res.Items = append(res.Items, first.Items...)
|
||||||
|
|
||||||
|
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
|
||||||
|
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res.Items = append(res.Items, r.Items...)
|
||||||
|
res.Continue = r.Continue
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use FilterAuthorized to batch authorize items
|
||||||
|
extractFn := func(item T) authz.BatchCheckItem {
|
||||||
|
return authz.BatchCheckItem{
|
||||||
|
Name: item.AuthID(),
|
||||||
|
Folder: "",
|
||||||
|
Verb: "list",
|
||||||
|
Group: resourceInfo.GroupResource().Group,
|
||||||
|
Resource: resourceInfo.GroupResource().Resource,
|
||||||
|
Namespace: ns.Value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert first batch to iter.Seq and filter
|
||||||
|
firstCandidates := func(yield func(T) bool) {
|
||||||
|
for _, item := range first.Items {
|
||||||
|
if !yield(item) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for item, err := range authz.FilterAuthorized(ctx, ac, firstCandidates, extractFn).Items {
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res.Items = append(res.Items, item)
|
||||||
|
}
|
||||||
|
|
||||||
outer:
|
outer:
|
||||||
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
|
for len(res.Items) < int(p.Limit) && res.Continue != 0 {
|
||||||
// FIXME: it is not optimal to reduce the amout we look for here but it is the easiest way to
|
// FIXME: it is not optimal to reduce the amount we look for here but it is the easiest way to
|
||||||
// correctly handle pagination and continue tokens
|
// correctly handle pagination and continue tokens
|
||||||
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
|
r, err := fn(ctx, ns, Pagination{Limit: p.Limit - int64(len(res.Items)), Continue: res.Continue})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range r.Items {
|
candidates := func(yield func(T) bool) {
|
||||||
if len(res.Items) == int(p.Limit) {
|
for _, item := range r.Items {
|
||||||
|
if !yield(item) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for item, authErr := range authz.FilterAuthorized(ctx, ac, candidates, extractFn).Items {
|
||||||
|
if authErr != nil {
|
||||||
|
return nil, authErr
|
||||||
|
}
|
||||||
|
if len(res.Items) >= int(p.Limit) {
|
||||||
res.Continue = r.Continue
|
res.Continue = r.Continue
|
||||||
break outer
|
break outer
|
||||||
}
|
}
|
||||||
|
|
||||||
if !check(item.AuthID(), "") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
res.Items = append(res.Items, item)
|
res.Items = append(res.Items, item)
|
||||||
}
|
}
|
||||||
|
res.Continue = r.Continue
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/grafana/authlib/authz"
|
||||||
claims "github.com/grafana/authlib/types"
|
claims "github.com/grafana/authlib/types"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
@@ -297,35 +298,38 @@ func (s *SecureValueService) List(ctx context.Context, namespace xkube.Namespace
|
|||||||
s.metrics.SecureValueListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
s.metrics.SecureValueListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
user, ok := claims.AuthInfoFrom(ctx)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("missing auth info in context")
|
|
||||||
}
|
|
||||||
|
|
||||||
hasPermissionFor, _, err := s.accessClient.Compile(ctx, user, claims.ListRequest{
|
|
||||||
Group: secretv1beta1.APIGroup,
|
|
||||||
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
|
|
||||||
Namespace: namespace.String(),
|
|
||||||
Verb: utils.VerbGet, // Why not VerbList?
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to compile checker: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
secureValuesMetadata, err := s.secureValueMetadataStorage.List(ctx, namespace)
|
secureValuesMetadata, err := s.secureValueMetadataStorage.List(ctx, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("fetching secure values from storage: %+w", err)
|
return nil, fmt.Errorf("fetching secure values from storage: %+w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert slice to iter.Seq
|
||||||
|
candidates := func(yield func(secretv1beta1.SecureValue) bool) {
|
||||||
|
for _, m := range secureValuesMetadata {
|
||||||
|
if !yield(m) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extractFn := func(sv secretv1beta1.SecureValue) authz.BatchCheckItem {
|
||||||
|
return authz.BatchCheckItem{
|
||||||
|
Name: sv.Name,
|
||||||
|
Folder: "",
|
||||||
|
Verb: utils.VerbGet, // Why not VerbList?
|
||||||
|
Group: secretv1beta1.APIGroup,
|
||||||
|
Resource: secretv1beta1.SecureValuesResourceInfo.GetName(),
|
||||||
|
Namespace: namespace.String(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
out := make([]secretv1beta1.SecureValue, 0)
|
out := make([]secretv1beta1.SecureValue, 0)
|
||||||
|
|
||||||
for _, metadata := range secureValuesMetadata {
|
for item, err := range authz.FilterAuthorized(ctx, s.accessClient, candidates, extractFn).Items {
|
||||||
// Check whether the user has permission to access this specific SecureValue in the namespace.
|
if err != nil {
|
||||||
if !hasPermissionFor(metadata.Name, "") {
|
return nil, fmt.Errorf("failed to check authorization: %w", err)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
out = append(out, item)
|
||||||
out = append(out, metadata)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &secretv1beta1.SecureValueList{
|
return &secretv1beta1.SecureValueList{
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
|
||||||
|
"github.com/grafana/authlib/authz"
|
||||||
claims "github.com/grafana/authlib/types"
|
claims "github.com/grafana/authlib/types"
|
||||||
"github.com/grafana/dskit/backoff"
|
"github.com/grafana/dskit/backoff"
|
||||||
|
|
||||||
@@ -1051,78 +1052,93 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
|||||||
rsp := &resourcepb.ListResponse{}
|
rsp := &resourcepb.ListResponse{}
|
||||||
|
|
||||||
key := req.Options.Key
|
key := req.Options.Key
|
||||||
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
|
|
||||||
Group: key.Group,
|
// Determine verb for authorization
|
||||||
Resource: key.Resource,
|
verb := utils.VerbGet
|
||||||
Namespace: key.Namespace,
|
|
||||||
Verb: utils.VerbGet,
|
|
||||||
})
|
|
||||||
var trashChecker claims.ItemChecker // only for trash
|
|
||||||
if req.Source == resourcepb.ListRequest_TRASH {
|
if req.Source == resourcepb.ListRequest_TRASH {
|
||||||
trashChecker, _, err = s.access.Compile(ctx, user, claims.ListRequest{
|
verb = utils.VerbSetPermissions // Basically Admin for trash
|
||||||
Group: key.Group,
|
|
||||||
Resource: key.Resource,
|
|
||||||
Namespace: key.Namespace,
|
|
||||||
Verb: utils.VerbSetPermissions, // Basically Admin
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
|
||||||
}
|
|
||||||
if checker == nil {
|
|
||||||
return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
|
|
||||||
Code: http.StatusForbidden,
|
|
||||||
}}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Candidate item for batch authorization
|
||||||
|
type candidateItem struct {
|
||||||
|
name string
|
||||||
|
folder string
|
||||||
|
resourceVersion int64
|
||||||
|
value []byte
|
||||||
|
continueToken string
|
||||||
|
}
|
||||||
|
|
||||||
|
var nextToken string
|
||||||
|
var iterErr error
|
||||||
|
|
||||||
|
// Process items in batches within the iterator
|
||||||
iterFunc := func(iter ListIterator) error {
|
iterFunc := func(iter ListIterator) error {
|
||||||
for iter.Next() {
|
// Convert ListIterator to iter.Seq
|
||||||
if err := iter.Error(); err != nil {
|
candidates := func(yield func(candidateItem) bool) {
|
||||||
|
for iter.Next() {
|
||||||
|
if !yield(candidateItem{
|
||||||
|
name: iter.Name(),
|
||||||
|
folder: iter.Folder(),
|
||||||
|
resourceVersion: iter.ResourceVersion(),
|
||||||
|
value: iter.Value(),
|
||||||
|
continueToken: iter.ContinueToken(),
|
||||||
|
}) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extractFn := func(c candidateItem) authz.BatchCheckItem {
|
||||||
|
return authz.BatchCheckItem{
|
||||||
|
Name: c.name,
|
||||||
|
Folder: c.folder,
|
||||||
|
Verb: verb,
|
||||||
|
Group: key.Group,
|
||||||
|
Resource: key.Resource,
|
||||||
|
Namespace: key.Namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trash is only accessible to admins or the user who deleted the object
|
// For trash items, also check if user is the one who deleted it
|
||||||
if req.Source == resourcepb.ListRequest_TRASH {
|
if req.Source == resourcepb.ListRequest_TRASH {
|
||||||
if !s.isTrashItemAuthorized(ctx, iter, trashChecker) {
|
if !s.isTrashItemAuthorizedByValue(ctx, item.value, true) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
} else if !checker(iter.Name(), iter.Folder()) {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
item := &resourcepb.ResourceWrapper{
|
rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{
|
||||||
ResourceVersion: iter.ResourceVersion(),
|
ResourceVersion: item.resourceVersion,
|
||||||
Value: iter.Value(),
|
Value: item.value,
|
||||||
}
|
})
|
||||||
|
pageBytes += len(item.value)
|
||||||
|
|
||||||
pageBytes += len(item.Value)
|
// Check if we've reached the page limit
|
||||||
rsp.Items = append(rsp.Items, item)
|
|
||||||
if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes {
|
if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes {
|
||||||
t := iter.ContinueToken()
|
nextToken = item.continueToken
|
||||||
if iter.Next() {
|
break
|
||||||
rsp.NextPageToken = t
|
|
||||||
}
|
|
||||||
return iter.Error()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return iter.Error()
|
return iter.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
var rv int64
|
var rv int64
|
||||||
switch req.Source {
|
switch req.Source {
|
||||||
case resourcepb.ListRequest_STORE:
|
case resourcepb.ListRequest_STORE:
|
||||||
rv, err = s.backend.ListIterator(ctx, req, iterFunc)
|
rv, iterErr = s.backend.ListIterator(ctx, req, iterFunc)
|
||||||
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
|
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
|
||||||
rv, err = s.backend.ListHistory(ctx, req, iterFunc)
|
rv, iterErr = s.backend.ListHistory(ctx, req, iterFunc)
|
||||||
default:
|
default:
|
||||||
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if iterErr != nil {
|
||||||
rsp.Error = AsErrorResult(err)
|
rsp.Error = AsErrorResult(iterErr)
|
||||||
return rsp, nil
|
return rsp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1134,18 +1150,21 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
|||||||
return rsp, nil
|
return rsp, nil
|
||||||
}
|
}
|
||||||
rsp.ResourceVersion = rv
|
rsp.ResourceVersion = rv
|
||||||
return rsp, err
|
|
||||||
|
rsp.NextPageToken = nextToken
|
||||||
|
return rsp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// isTrashItemAuthorized checks if the user has access to the trash item.
|
// isTrashItemAuthorizedByValue checks if the user has access to the trash item using the raw value.
|
||||||
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool {
|
// hasAdminPermission indicates whether the user has admin permission (from BatchCheck).
|
||||||
|
func (s *server) isTrashItemAuthorizedByValue(ctx context.Context, value []byte, hasAdminPermission bool) bool {
|
||||||
user, ok := claims.AuthInfoFrom(ctx)
|
user, ok := claims.AuthInfoFrom(ctx)
|
||||||
if !ok || user == nil {
|
if !ok || user == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
partial := &metav1.PartialObjectMetadata{}
|
partial := &metav1.PartialObjectMetadata{}
|
||||||
err := json.Unmarshal(iter.Value(), partial)
|
err := json.Unmarshal(value, partial)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -1161,7 +1180,7 @@ func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Trash is only accessible to admins or the user who deleted the object
|
// Trash is only accessible to admins or the user who deleted the object
|
||||||
return obj.GetUpdatedBy() == user.GetUID() || trashChecker(iter.Name(), iter.Folder())
|
return obj.GetUpdatedBy() == user.GetUID() || hasAdminPermission
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) initWatcher() error {
|
func (s *server) initWatcher() error {
|
||||||
@@ -1202,18 +1221,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
|||||||
}
|
}
|
||||||
|
|
||||||
key := req.Options.Key
|
key := req.Options.Key
|
||||||
checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
|
|
||||||
Group: key.Group,
|
|
||||||
Resource: key.Resource,
|
|
||||||
Namespace: key.Namespace,
|
|
||||||
Verb: utils.VerbGet,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if checker == nil {
|
|
||||||
return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start listening -- this will buffer any changes that happen while we backfill.
|
// Start listening -- this will buffer any changes that happen while we backfill.
|
||||||
// If events are generated faster than we can process them, then some events will be dropped.
|
// If events are generated faster than we can process them, then some events will be dropped.
|
||||||
@@ -1267,22 +1274,56 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
|||||||
|
|
||||||
var initialEventsRV int64 // resource version coming from the initial events
|
var initialEventsRV int64 // resource version coming from the initial events
|
||||||
if req.SendInitialEvents {
|
if req.SendInitialEvents {
|
||||||
// Backfill the stream by adding every existing entities.
|
// Backfill the stream by adding every existing entities with batch authorization
|
||||||
|
type candidateEvent struct {
|
||||||
|
name string
|
||||||
|
folder string
|
||||||
|
value []byte
|
||||||
|
version int64
|
||||||
|
}
|
||||||
|
|
||||||
initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
|
initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
|
||||||
for iter.Next() {
|
// Convert ListIterator to iter.Seq
|
||||||
if err := iter.Error(); err != nil {
|
candidates := func(yield func(candidateEvent) bool) {
|
||||||
|
for iter.Next() {
|
||||||
|
if !yield(candidateEvent{
|
||||||
|
name: iter.Name(),
|
||||||
|
folder: iter.Folder(),
|
||||||
|
value: iter.Value(),
|
||||||
|
version: iter.ResourceVersion(),
|
||||||
|
}) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extractFn := func(c candidateEvent) authz.BatchCheckItem {
|
||||||
|
return authz.BatchCheckItem{
|
||||||
|
Name: c.name,
|
||||||
|
Folder: c.folder,
|
||||||
|
Verb: utils.VerbGet,
|
||||||
|
Group: key.Group,
|
||||||
|
Resource: key.Resource,
|
||||||
|
Namespace: key.Namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := srv.Send(&resourcepb.WatchEvent{
|
if err := srv.Send(&resourcepb.WatchEvent{
|
||||||
Type: resourcepb.WatchEvent_ADDED,
|
Type: resourcepb.WatchEvent_ADDED,
|
||||||
Resource: &resourcepb.WatchEvent_Resource{
|
Resource: &resourcepb.WatchEvent_Resource{
|
||||||
Value: iter.Value(),
|
Value: item.value,
|
||||||
Version: iter.ResourceVersion(),
|
Version: item.version,
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return iter.Error()
|
return iter.Error()
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1309,6 +1350,127 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
|||||||
default:
|
default:
|
||||||
since = req.Since
|
since = req.Since
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type to hold candidate events for batch authorization
|
||||||
|
type candidateWatchEvent struct {
|
||||||
|
event *WrittenEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type to hold authorized event with its fetched previous object
|
||||||
|
type authorizedEvent struct {
|
||||||
|
event *WrittenEvent
|
||||||
|
previous *resourcepb.ReadResponse // nil if no previous or fetch failed
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxBatchSize = 100
|
||||||
|
|
||||||
|
// processEventBatch authorizes and sends a batch of events.
|
||||||
|
// Errors are logged but never returned to keep the watch running.
|
||||||
|
processEventBatch := func(batch []*WrittenEvent) {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert batch to iter.Seq for FilterAuthorized
|
||||||
|
candidates := func(yield func(candidateWatchEvent) bool) {
|
||||||
|
for _, event := range batch {
|
||||||
|
if !yield(candidateWatchEvent{event: event}) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extractFn := func(c candidateWatchEvent) authz.BatchCheckItem {
|
||||||
|
return authz.BatchCheckItem{
|
||||||
|
Name: c.event.Key.Name,
|
||||||
|
Folder: c.event.Folder,
|
||||||
|
Verb: utils.VerbGet,
|
||||||
|
Group: key.Group,
|
||||||
|
Resource: key.Resource,
|
||||||
|
Namespace: key.Namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 1: Collect all authorized events
|
||||||
|
var authorizedEvents []authorizedEvent
|
||||||
|
for item, err := range authz.FilterAuthorized(ctx, s.access, candidates, extractFn).Items {
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error("error during batch authorization", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
authorizedEvents = append(authorizedEvents, authorizedEvent{event: item.event})
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(authorizedEvents) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: Fetch previous objects concurrently for events that need them
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := range authorizedEvents {
|
||||||
|
if authorizedEvents[i].event.PreviousRV > 0 {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(idx int) {
|
||||||
|
defer wg.Done()
|
||||||
|
event := authorizedEvents[idx].event
|
||||||
|
prevObj, readErr := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
|
||||||
|
if readErr != nil {
|
||||||
|
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", readErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if prevObj.Error != nil {
|
||||||
|
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if prevObj.ResourceVersion != event.PreviousRV {
|
||||||
|
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
authorizedEvents[idx].previous = prevObj
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Step 3: Send all events in order
|
||||||
|
for _, authEvent := range authorizedEvents {
|
||||||
|
event := authEvent.event
|
||||||
|
value := event.Value
|
||||||
|
// remove the delete marker stored in the value for deleted objects
|
||||||
|
if event.Type == resourcepb.WatchEvent_DELETED {
|
||||||
|
value = []byte{}
|
||||||
|
}
|
||||||
|
resp := &resourcepb.WatchEvent{
|
||||||
|
Timestamp: event.Timestamp,
|
||||||
|
Type: event.Type,
|
||||||
|
Resource: &resourcepb.WatchEvent_Resource{
|
||||||
|
Value: value,
|
||||||
|
Version: event.ResourceVersion,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if authEvent.previous != nil {
|
||||||
|
resp.Previous = &resourcepb.WatchEvent_Resource{
|
||||||
|
Value: authEvent.previous.Value,
|
||||||
|
Version: authEvent.previous.ResourceVersion,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := srv.Send(resp); err != nil {
|
||||||
|
s.log.Error("error sending watch event", "key", event.Key, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.storageMetrics != nil {
|
||||||
|
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
|
||||||
|
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
|
||||||
|
if latencySeconds > 0 {
|
||||||
|
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Main event loop with batching
|
||||||
|
var batch []*WrittenEvent
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -1316,57 +1478,40 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
|||||||
|
|
||||||
case event, ok := <-stream:
|
case event, ok := <-stream:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
// Process any remaining events in the batch before closing
|
||||||
|
processEventBatch(batch)
|
||||||
s.log.Debug("watch events closed")
|
s.log.Debug("watch events closed")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
|
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
|
||||||
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
|
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
|
||||||
if !checker(event.Key.Name, event.Folder) {
|
batch = append(batch, event)
|
||||||
continue
|
}
|
||||||
}
|
|
||||||
|
|
||||||
value := event.Value
|
// Drain any additional events that are already available (non-blocking)
|
||||||
// remove the delete marker stored in the value for deleted objects
|
// Stop draining when we reach maxBatchSize to bound memory and latency
|
||||||
if event.Type == resourcepb.WatchEvent_DELETED {
|
draining := true
|
||||||
value = []byte{}
|
for draining && len(batch) < maxBatchSize {
|
||||||
}
|
select {
|
||||||
resp := &resourcepb.WatchEvent{
|
case event, ok := <-stream:
|
||||||
Timestamp: event.Timestamp,
|
if !ok {
|
||||||
Type: event.Type,
|
// Process the batch before closing
|
||||||
Resource: &resourcepb.WatchEvent_Resource{
|
processEventBatch(batch)
|
||||||
Value: value,
|
s.log.Debug("watch events closed")
|
||||||
Version: event.ResourceVersion,
|
return nil
|
||||||
},
|
|
||||||
}
|
|
||||||
if event.PreviousRV > 0 {
|
|
||||||
prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
|
|
||||||
if err != nil {
|
|
||||||
// This scenario should never happen, but if it does, we should log it and continue
|
|
||||||
// sending the event without the previous object. The client will decide what to do.
|
|
||||||
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
|
|
||||||
} else {
|
|
||||||
if prevObj.ResourceVersion != event.PreviousRV {
|
|
||||||
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
|
|
||||||
return fmt.Errorf("resource version mismatch")
|
|
||||||
}
|
|
||||||
resp.Previous = &resourcepb.WatchEvent_Resource{
|
|
||||||
Value: prevObj.Value,
|
|
||||||
Version: prevObj.ResourceVersion,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
|
||||||
if err := srv.Send(resp); err != nil {
|
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
|
||||||
return err
|
batch = append(batch, event)
|
||||||
}
|
|
||||||
|
|
||||||
if s.storageMetrics != nil {
|
|
||||||
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
|
|
||||||
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
|
|
||||||
if latencySeconds > 0 {
|
|
||||||
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
|
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
draining = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process the collected batch
|
||||||
|
processEventBatch(batch)
|
||||||
|
batch = batch[:0] // Reset batch for reuse
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"iter"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -22,7 +23,6 @@ import (
|
|||||||
"github.com/blevesearch/bleve/v2/mapping"
|
"github.com/blevesearch/bleve/v2/mapping"
|
||||||
"github.com/blevesearch/bleve/v2/search"
|
"github.com/blevesearch/bleve/v2/search"
|
||||||
"github.com/blevesearch/bleve/v2/search/query"
|
"github.com/blevesearch/bleve/v2/search/query"
|
||||||
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
|
|
||||||
index "github.com/blevesearch/bleve_index_api"
|
index "github.com/blevesearch/bleve_index_api"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
bolterrors "go.etcd.io/bbolt/errors"
|
bolterrors "go.etcd.io/bbolt/errors"
|
||||||
@@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
||||||
|
|
||||||
|
"github.com/grafana/authlib/authz"
|
||||||
authlib "github.com/grafana/authlib/types"
|
authlib "github.com/grafana/authlib/types"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
@@ -1300,43 +1301,27 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R
|
|||||||
}
|
}
|
||||||
|
|
||||||
if access != nil {
|
if access != nil {
|
||||||
auth, ok := authlib.AuthInfoFrom(ctx)
|
|
||||||
if !ok {
|
|
||||||
return nil, resource.AsErrorResult(fmt.Errorf("missing auth info"))
|
|
||||||
}
|
|
||||||
verb := utils.VerbList
|
verb := utils.VerbList
|
||||||
if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
|
if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
|
||||||
verb = utils.VerbPatch
|
verb = utils.VerbPatch
|
||||||
}
|
}
|
||||||
|
|
||||||
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{
|
// Build resource -> verb mapping for batch authorization
|
||||||
Namespace: b.key.Namespace,
|
resources := map[string]string{
|
||||||
Group: b.key.Group,
|
b.key.Resource: verb,
|
||||||
Resource: b.key.Resource,
|
|
||||||
Verb: verb,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, resource.AsErrorResult(err)
|
|
||||||
}
|
|
||||||
checkers := map[string]authlib.ItemChecker{
|
|
||||||
b.key.Resource: checker,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle federation
|
// Handle federation
|
||||||
for _, federated := range req.Federated {
|
for _, federated := range req.Federated {
|
||||||
checker, _, err := access.Compile(ctx, auth, authlib.ListRequest{
|
resources[federated.Resource] = utils.VerbList
|
||||||
Namespace: federated.Namespace,
|
|
||||||
Group: federated.Group,
|
|
||||||
Resource: federated.Resource,
|
|
||||||
Verb: utils.VerbList,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, resource.AsErrorResult(err)
|
|
||||||
}
|
|
||||||
checkers[federated.Resource] = checker
|
|
||||||
}
|
}
|
||||||
|
|
||||||
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, checkers)
|
searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, permissionScopedQueryConfig{
|
||||||
|
access: access,
|
||||||
|
namespace: b.key.Namespace,
|
||||||
|
group: b.key.Group,
|
||||||
|
resources: resources,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range req.Facet {
|
for k, v := range req.Facet {
|
||||||
@@ -1866,71 +1851,239 @@ func newResponseFacet(v *search.FacetResult) *resourcepb.ResourceSearchResponse_
|
|||||||
|
|
||||||
type permissionScopedQuery struct {
|
type permissionScopedQuery struct {
|
||||||
query.Query
|
query.Query
|
||||||
checkers map[string]authlib.ItemChecker // one checker per resource
|
access authlib.AccessClient
|
||||||
log log.Logger
|
namespace string
|
||||||
|
group string
|
||||||
|
resources map[string]string // resource -> verb mapping
|
||||||
|
log log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPermissionScopedQuery(q query.Query, checkers map[string]authlib.ItemChecker) *permissionScopedQuery {
|
type permissionScopedQueryConfig struct {
|
||||||
|
access authlib.AccessClient
|
||||||
|
namespace string
|
||||||
|
group string
|
||||||
|
resources map[string]string // resource -> verb mapping
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPermissionScopedQuery(q query.Query, cfg permissionScopedQueryConfig) *permissionScopedQuery {
|
||||||
return &permissionScopedQuery{
|
return &permissionScopedQuery{
|
||||||
Query: q,
|
Query: q,
|
||||||
checkers: checkers,
|
access: cfg.access,
|
||||||
log: log.New("search_permissions"),
|
namespace: cfg.namespace,
|
||||||
|
group: cfg.group,
|
||||||
|
resources: cfg.resources,
|
||||||
|
log: log.New("search_permissions"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
|
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
|
||||||
// Get a new logger from context, to pass traceIDs etc.
|
|
||||||
logger := q.log.FromContext(ctx)
|
logger := q.log.FromContext(ctx)
|
||||||
searcher, err := q.Query.Searcher(ctx, i, m, options)
|
searcher, err := q.Query.Searcher(ctx, i, m, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dvReader, err := i.DocValueReader([]string{"folder"})
|
dvReader, err := i.DocValueReader([]string{"folder"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
filteringSearcher := bleveSearch.NewFilteringSearcher(ctx, searcher, func(d *search.DocumentMatch) bool {
|
|
||||||
// The doc ID has the format: <namespace>/<group>/<resourceType>/<name>
|
|
||||||
// IndexInternalID will be the same as the doc ID when using an in-memory index, but when using a file-based
|
|
||||||
// index it becomes a binary encoded number that has some other internal meaning. Using ExternalID() will get the
|
|
||||||
// correct doc ID regardless of the index type.
|
|
||||||
d.ID, err = i.ExternalID(d.IndexInternalID)
|
|
||||||
if err != nil {
|
|
||||||
logger.Debug("Error getting external ID", "error", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
parts := strings.Split(d.ID, "/")
|
return newBatchAuthzSearcher(ctx, searcher, i, dvReader, q.access, q.namespace, q.group, q.resources, logger), nil
|
||||||
// Exclude doc if id isn't expected format
|
}
|
||||||
if len(parts) != 4 {
|
|
||||||
logger.Debug("Unexpected document ID format", "id", d.ID)
|
// docInfo holds document information for authorization
|
||||||
return false
|
type docInfo struct {
|
||||||
}
|
doc *search.DocumentMatch
|
||||||
ns := parts[0]
|
resourceType string
|
||||||
resource := parts[2]
|
name string
|
||||||
name := parts[3]
|
folder string
|
||||||
folder := ""
|
verb string
|
||||||
err = dvReader.VisitDocValues(d.IndexInternalID, func(field string, value []byte) {
|
}
|
||||||
if field == "folder" {
|
|
||||||
folder = string(value)
|
// batchAuthzSearcher implements a batch-aware authorization filtering searcher
|
||||||
|
// using FilterAuthorized with iter.Pull2 for efficient batched authorization
|
||||||
|
type batchAuthzSearcher struct {
|
||||||
|
ctx context.Context
|
||||||
|
searcher search.Searcher
|
||||||
|
indexReader index.IndexReader
|
||||||
|
dvReader index.DocValueReader
|
||||||
|
access authlib.AccessClient
|
||||||
|
namespace string
|
||||||
|
group string
|
||||||
|
resources map[string]string // resource -> verb mapping
|
||||||
|
log log.Logger
|
||||||
|
|
||||||
|
// Pull iterator state (lazily initialized)
|
||||||
|
searchCtx *search.SearchContext
|
||||||
|
next func() (docInfo, error, bool)
|
||||||
|
stop func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBatchAuthzSearcher(
|
||||||
|
ctx context.Context,
|
||||||
|
searcher search.Searcher,
|
||||||
|
indexReader index.IndexReader,
|
||||||
|
dvReader index.DocValueReader,
|
||||||
|
access authlib.AccessClient,
|
||||||
|
namespace string,
|
||||||
|
group string,
|
||||||
|
resources map[string]string,
|
||||||
|
logger log.Logger,
|
||||||
|
) *batchAuthzSearcher {
|
||||||
|
return &batchAuthzSearcher{
|
||||||
|
ctx: ctx,
|
||||||
|
searcher: searcher,
|
||||||
|
indexReader: indexReader,
|
||||||
|
dvReader: dvReader,
|
||||||
|
access: access,
|
||||||
|
namespace: namespace,
|
||||||
|
group: group,
|
||||||
|
resources: resources,
|
||||||
|
log: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Next(searchCtx *search.SearchContext) (*search.DocumentMatch, error) {
|
||||||
|
// Lazy initialization of pull iterator
|
||||||
|
if s.next == nil {
|
||||||
|
s.searchCtx = searchCtx
|
||||||
|
s.initPullIterator()
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err, ok := s.next()
|
||||||
|
if !ok {
|
||||||
|
return nil, nil // No more documents
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return info.doc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// initPullIterator sets up the FilterAuthorized iterator as a pull iterator
|
||||||
|
func (s *batchAuthzSearcher) initPullIterator() {
|
||||||
|
// Create iter.Seq that pulls documents from searcher and parses them
|
||||||
|
candidates := func(yield func(docInfo) bool) {
|
||||||
|
for {
|
||||||
|
doc, err := s.searcher.Next(s.searchCtx)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Debug("Error getting next document", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if doc == nil {
|
||||||
|
return // No more documents
|
||||||
}
|
}
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logger.Debug("Error reading doc values", "error", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if _, ok := q.checkers[resource]; !ok {
|
|
||||||
logger.Debug("No resource checker found", "resource", resource)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
allowed := q.checkers[resource](name, folder)
|
|
||||||
if !allowed {
|
|
||||||
logger.Debug("Denying access", "ns", ns, "name", name, "folder", folder)
|
|
||||||
}
|
|
||||||
return allowed
|
|
||||||
})
|
|
||||||
|
|
||||||
return filteringSearcher, nil
|
info, ok := s.parseDocInfo(doc)
|
||||||
|
if !ok {
|
||||||
|
continue // Skip invalid documents
|
||||||
|
}
|
||||||
|
|
||||||
|
if !yield(info) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extractFn := func(info docInfo) authz.BatchCheckItem {
|
||||||
|
return authz.BatchCheckItem{
|
||||||
|
Name: info.name,
|
||||||
|
Folder: info.folder,
|
||||||
|
Verb: info.verb,
|
||||||
|
Group: s.group,
|
||||||
|
Resource: info.resourceType,
|
||||||
|
Namespace: s.namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilterAuthorized extracts auth from context and batches internally
|
||||||
|
authzIter := authz.FilterAuthorized(s.ctx, s.access, candidates, extractFn).Items
|
||||||
|
|
||||||
|
// Convert push iterator to pull iterator
|
||||||
|
s.next, s.stop = iter.Pull2(authzIter)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseDocInfo extracts document information needed for authorization
|
||||||
|
func (s *batchAuthzSearcher) parseDocInfo(doc *search.DocumentMatch) (docInfo, bool) {
|
||||||
|
// Get external ID
|
||||||
|
externalID, err := s.indexReader.ExternalID(doc.IndexInternalID)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Debug("Error getting external ID", "error", err)
|
||||||
|
return docInfo{}, false
|
||||||
|
}
|
||||||
|
doc.ID = externalID
|
||||||
|
|
||||||
|
// Parse doc ID: <namespace>/<group>/<resourceType>/<name>
|
||||||
|
parts := strings.Split(doc.ID, "/")
|
||||||
|
if len(parts) != 4 {
|
||||||
|
s.log.Debug("Unexpected document ID format", "id", doc.ID)
|
||||||
|
return docInfo{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceType := parts[2]
|
||||||
|
name := parts[3]
|
||||||
|
|
||||||
|
// Get folder from doc values
|
||||||
|
folder := ""
|
||||||
|
err = s.dvReader.VisitDocValues(doc.IndexInternalID, func(field string, value []byte) {
|
||||||
|
if field == "folder" {
|
||||||
|
folder = string(value)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
s.log.Debug("Error reading doc values", "error", err)
|
||||||
|
return docInfo{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have a verb for this resource type
|
||||||
|
verb, ok := s.resources[resourceType]
|
||||||
|
if !ok {
|
||||||
|
s.log.Debug("No verb found for resource", "resource", resourceType)
|
||||||
|
return docInfo{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return docInfo{
|
||||||
|
doc: doc,
|
||||||
|
resourceType: resourceType,
|
||||||
|
name: name,
|
||||||
|
folder: folder,
|
||||||
|
verb: verb,
|
||||||
|
}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Advance(searchCtx *search.SearchContext, ID index.IndexInternalID) (*search.DocumentMatch, error) {
|
||||||
|
return s.searcher.Advance(searchCtx, ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Close() error {
|
||||||
|
if s.stop != nil {
|
||||||
|
s.stop()
|
||||||
|
}
|
||||||
|
return s.searcher.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Size() int {
|
||||||
|
return s.searcher.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) DocumentMatchPoolSize() int {
|
||||||
|
return s.searcher.DocumentMatchPoolSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Min() int {
|
||||||
|
return s.searcher.Min()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Count() uint64 {
|
||||||
|
return s.searcher.Count()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) SetQueryNorm(qnorm float64) {
|
||||||
|
s.searcher.SetQueryNorm(qnorm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *batchAuthzSearcher) Weight() float64 {
|
||||||
|
return s.searcher.Weight()
|
||||||
}
|
}
|
||||||
|
|
||||||
// hasTerms - any value that will be split into multiple tokens
|
// hasTerms - any value that will be split into multiple tokens
|
||||||
|
|||||||
@@ -653,8 +653,12 @@ func (nc StubAccessClient) Write(ctx context.Context, req *authzextv1.WriteReque
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc StubAccessClient) BatchCheck(ctx context.Context, req *authzextv1.BatchCheckRequest) (*authzextv1.BatchCheckResponse, error) {
|
func (nc StubAccessClient) BatchCheck(ctx context.Context, user authlib.AuthInfo, req authlib.BatchCheckRequest) (authlib.BatchCheckResponse, error) {
|
||||||
return nil, nil
|
results := make(map[string]authlib.BatchCheckResult, len(req.Checks))
|
||||||
|
for _, item := range req.Checks {
|
||||||
|
results[item.CorrelationID] = authlib.BatchCheckResult{Allowed: nc.resourceResponses[item.Resource]}
|
||||||
|
}
|
||||||
|
return authlib.BatchCheckResponse{Results: results}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSafeInt64ToInt(t *testing.T) {
|
func TestSafeInt64ToInt(t *testing.T) {
|
||||||
|
|||||||
@@ -517,6 +517,28 @@ func (m *mockAccessClient) Check(ctx context.Context, user types.AuthInfo, req t
|
|||||||
return types.CheckResponse{Allowed: m.allowed}, nil
|
return types.CheckResponse{Allowed: m.allowed}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockAccessClient) BatchCheck(ctx context.Context, user types.AuthInfo, req types.BatchCheckRequest) (types.BatchCheckResponse, error) {
|
||||||
|
results := make(map[string]types.BatchCheckResult, len(req.Checks))
|
||||||
|
|
||||||
|
for _, check := range req.Checks {
|
||||||
|
allowed := m.allowed
|
||||||
|
|
||||||
|
// Check specific folder:verb mappings if provided
|
||||||
|
if m.allowedMap != nil {
|
||||||
|
key := fmt.Sprintf("%s:%s", check.Folder, check.Verb)
|
||||||
|
if a, exists := m.allowedMap[key]; exists {
|
||||||
|
allowed = a
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
results[check.CorrelationID] = types.BatchCheckResult{
|
||||||
|
Allowed: allowed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return types.BatchCheckResponse{Results: results}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockAccessClient) Compile(ctx context.Context, user types.AuthInfo, req types.ListRequest) (types.ItemChecker, types.Zookie, error) {
|
func (m *mockAccessClient) Compile(ctx context.Context, user types.AuthInfo, req types.ListRequest) (types.ItemChecker, types.Zookie, error) {
|
||||||
if m.compileFn != nil {
|
if m.compileFn != nil {
|
||||||
return m.compileFn(user, req), types.NoopZookie{}, nil
|
return m.compileFn(user, req), types.NoopZookie{}, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user