package resource

import (
	"context"
	"encoding/json"
	"fmt"
	"iter"
	"log/slog"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	claims "github.com/grafana/authlib/types"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/ring"

	"github.com/grafana/grafana/pkg/apimachinery/utils"
	secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

// ResourceServer implements all gRPC services
type ResourceServer interface {
	resourcepb.ResourceStoreServer
	resourcepb.BulkStoreServer
	resourcepb.ResourceIndexServer
	resourcepb.ManagedObjectIndexServer
	resourcepb.BlobStoreServer
	resourcepb.DiagnosticsServer
}

type ListIterator interface {
	// Next advances the iterator and returns true if a next value is available.
	// Error() should be checked after every call of Next(), even when Next() returns true.
	Next() bool // sql.Rows

	// Error returns the iterator error, if any. This should be checked after any Next() call.
	// (Some iterator implementations return true from Next, but also set the error at the same time.)
	Error() error

	// ContinueToken returns the token that can be used to start iterating *after* this item
	ContinueToken() string

	// ResourceVersion of the current item
	ResourceVersion() int64

	// Namespace of the current item
	// Used for fast(er) authz filtering
	Namespace() string

	// Name of the current item
	// Used for fast(er) authz filtering
	Name() string

	// Folder of the current item
	// Used for fast(er) authz filtering
	Folder() string

	// Value for the current item
	Value() []byte
}
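// collectListValues is an illustrative sketch (not used by the server) of the
// ListIterator contract above: Error() must be checked after every Next()
// call, even when Next() returns true.
func collectListValues(it ListIterator) ([][]byte, error) {
	var values [][]byte
	for it.Next() {
		// A true Next() can still carry an error; check before using the row.
		if err := it.Error(); err != nil {
			return nil, err
		}
		values = append(values, it.Value())
	}
	// Check one final time after iteration stops.
	return values, it.Error()
}
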
type BackendReadResponse struct {
	// Metadata
	Key    *resourcepb.ResourceKey
	Folder string

	// GUID that is used internally
	GUID string
	// The new resource version
	ResourceVersion int64
	// The properties
	Value []byte
	// Error details
	Error *resourcepb.ErrorResult
}

// The StorageBackend is an internal abstraction that supports interacting with
// the underlying raw storage medium. This interface is never exposed directly,
// it is provided by concrete instances that actually write values.
type StorageBackend interface {
	// Write a Create/Update/Delete,
	// NOTE: the contents of WriteEvent have been validated
	// Return the revisionVersion for this event or error
	WriteEvent(context.Context, WriteEvent) (int64, error)

	// Read a resource from storage optionally at an explicit version
	ReadResource(context.Context, *resourcepb.ReadRequest) *BackendReadResponse

	// When the ResourceServer executes a List request, this iterator will
	// query the backend for potential results. All results will be
	// checked against the kubernetes requirements before finally returning
	// results. The list options can be used to improve performance
	// but are not the final answer.
	ListIterator(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)

	// ListHistory is like ListIterator, but it returns the history of a resource
	ListHistory(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)

	// ListModifiedSince will return all resources that have changed since the given resource version.
	// If a resource has changes, only the latest change will be returned.
	ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error])

	// Get all events from the store
	// For HA setups, this will be more events than the local WriteEvent above!
	WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)

	// Get resource stats within the storage backend. When namespace is empty, it will apply to all
	GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error)
}

type ModifiedResource struct {
	Action          resourcepb.WatchEvent_Type
	Key             resourcepb.ResourceKey
	Value           []byte
	ResourceVersion int64
}
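// collectModifiedSince is an illustrative sketch (not part of the server) of
// how the iter.Seq2 returned by StorageBackend.ListModifiedSince is meant to
// be consumed: the yielded error must be checked before using each resource.
func collectModifiedSince(ctx context.Context, backend StorageBackend, key NamespacedResource, sinceRV int64) (int64, []*ModifiedResource, error) {
	latestRV, seq := backend.ListModifiedSince(ctx, key, sinceRV)
	var changed []*ModifiedResource
	for res, err := range seq {
		if err != nil {
			return 0, nil, err
		}
		changed = append(changed, res)
	}
	return latestRV, changed, nil
}
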
type ResourceStats struct {
	NamespacedResource

	Count           int64
	ResourceVersion int64
}

// This interface is not exposed to end users directly
// Access to this interface is already gated by access control
type BlobSupport interface {
	// Indicates if storage layer supports signed urls
	SupportsSignedURLs() bool

	// Store the raw blob bytes and metadata -- limited to protobuf message size
	// For larger payloads, we should use presigned URLs to upload from the client
	PutResourceBlob(context.Context, *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error)

	// Get blob contents. When possible, this will return a signed URL
	// For large payloads, signed URLs are required to avoid protobuf message size limits
	GetResourceBlob(ctx context.Context, resource *resourcepb.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resourcepb.GetBlobResponse, error)

	// TODO? List+Delete? This is for admin access
}

type QOSEnqueuer interface {
	Enqueue(ctx context.Context, tenantID string, runnable func()) error
}
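// inlineEnqueuer is an illustrative QOSEnqueuer sketch that runs the work
// synchronously on the calling goroutine; the server itself defaults to
// scheduler.NewNoopQueue() when no queue is configured.
type inlineEnqueuer struct{}

func (inlineEnqueuer) Enqueue(_ context.Context, _ string, runnable func()) error {
	runnable()
	return nil
}
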
type QueueConfig struct {
	// MaxBackoff is the maximum delay between enqueue retries (default 1s)
	MaxBackoff time.Duration
	// MinBackoff is the initial delay between enqueue retries (default 100ms)
	MinBackoff time.Duration
	// MaxRetries is the number of enqueue attempts before giving up (default 3)
	MaxRetries int
	// Timeout bounds queueing plus execution of a request (default 30s)
	Timeout time.Duration
}
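// For reference, a QueueConfig populated with the defaults applied in
// NewResourceServer (illustrative only; the name defaultQueueConfig is not
// used elsewhere):
var defaultQueueConfig = QueueConfig{
	MinBackoff: 100 * time.Millisecond,
	MaxBackoff: 1 * time.Second,
	MaxRetries: 3,
	Timeout:    30 * time.Second,
}
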
type BlobConfig struct {
	// The CDK configuration URL
	URL string

	// Directly implemented blob support
	Backend BlobSupport
}

// Passed as input to the constructor
type SearchOptions struct {
	// The raw index backend (eg, bleve, frames, parquet, etc)
	Backend SearchBackend

	// The supported resource types
	Resources DocumentBuilderSupplier

	// How many threads should build indexes
	WorkerThreads int

	// Skip building index on startup for small indexes
	InitMinCount int

	// Interval for periodic index rebuilds (0 disables periodic rebuilds)
	RebuildInterval time.Duration

	Ring *ring.Ring
}

type ResourceServerOptions struct {
	// OTel tracer
	Tracer trace.Tracer

	// Real storage backend
	Backend StorageBackend

	// The blob configuration
	Blob BlobConfig

	// Search options
	Search SearchOptions

	// Diagnostics
	Diagnostics resourcepb.DiagnosticsServer

	// Check if a user has access to write folders
	// When this is nil, no resources can have folders configured
	WriteHooks WriteAccessHooks

	// Link RBAC
	AccessClient claims.AccessClient

	// Manage secure values
	SecureValues secrets.InlineSecureValueSupport

	// Callbacks for startup and shutdown
	Lifecycle LifecycleHooks

	// Get the current time in unix millis
	Now func() int64

	// Registerer to register prometheus Metrics for the Resource server
	Reg prometheus.Registerer

	storageMetrics *StorageMetrics

	IndexMetrics *BleveIndexMetrics

	// MaxPageSizeBytes is the maximum size of a page in bytes.
	MaxPageSizeBytes int

	// QOSQueue is the quality-of-service queue used to enqueue requests per tenant
	QOSQueue  QOSEnqueuer
	QOSConfig QueueConfig

	OwnsIndexFn func(key NamespacedResource) (bool, error)
}

func NewResourceServer(opts ResourceServerOptions) (*server, error) {
	if opts.Tracer == nil {
		opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
	}

	if opts.Backend == nil {
		return nil, fmt.Errorf("missing Backend implementation")
	}

	if opts.AccessClient == nil {
		opts.AccessClient = claims.FixedAccessClient(true) // everything OK
	}

	if opts.Diagnostics == nil {
		opts.Diagnostics = &noopService{}
	}

	if opts.Now == nil {
		opts.Now = func() int64 {
			return time.Now().UnixMilli()
		}
	}

	if opts.MaxPageSizeBytes <= 0 {
		// By default, we use 2MB for the page size.
		opts.MaxPageSizeBytes = 1024 * 1024 * 2
	}

	if opts.QOSQueue == nil {
		opts.QOSQueue = scheduler.NewNoopQueue()
	}

	if opts.QOSConfig.Timeout == 0 {
		opts.QOSConfig.Timeout = 30 * time.Second
	}
	if opts.QOSConfig.MaxBackoff == 0 {
		opts.QOSConfig.MaxBackoff = 1 * time.Second
	}
	if opts.QOSConfig.MinBackoff == 0 {
		opts.QOSConfig.MinBackoff = 100 * time.Millisecond
	}
	if opts.QOSConfig.MaxRetries == 0 {
		opts.QOSConfig.MaxRetries = 3
	}

	// Initialize the blob storage
	blobstore := opts.Blob.Backend
	if blobstore == nil {
		if opts.Blob.URL != "" {
			ctx := context.Background()
			bucket, err := OpenBlobBucket(ctx, opts.Blob.URL)
			if err != nil {
				return nil, err
			}

			blobstore, err = NewCDKBlobSupport(ctx, CDKBlobSupportOptions{
				Tracer: opts.Tracer,
				Bucket: NewInstrumentedBucket(bucket, opts.Reg, opts.Tracer),
			})
			if err != nil {
				return nil, err
			}
		} else {
			// Check if the backend supports blob storage
			blobstore, _ = opts.Backend.(BlobSupport)
		}
	}

	logger := slog.Default().With("logger", "resource-server")

	// Make this cancelable
	ctx, cancel := context.WithCancel(context.Background())
	s := &server{
		tracer:           opts.Tracer,
		log:              logger,
		backend:          opts.Backend,
		blob:             blobstore,
		diagnostics:      opts.Diagnostics,
		access:           opts.AccessClient,
		secure:           opts.SecureValues,
		writeHooks:       opts.WriteHooks,
		lifecycle:        opts.Lifecycle,
		now:              opts.Now,
		ctx:              ctx,
		cancel:           cancel,
		storageMetrics:   opts.storageMetrics,
		indexMetrics:     opts.IndexMetrics,
		maxPageSizeBytes: opts.MaxPageSizeBytes,
		reg:              opts.Reg,
		queue:            opts.QOSQueue,
		queueConfig:      opts.QOSConfig,
	}

	if opts.Search.Resources != nil {
		var err error
		s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics, opts.OwnsIndexFn)
		if err != nil {
			return nil, err
		}
	}

	err := s.Init(ctx)
	if err != nil {
		s.log.Error("resource server init failed", "error", err)
		return nil, err
	}

	return s, nil
}
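// Typical construction looks like the following sketch (illustrative only;
// myBackend stands in for a real StorageBackend implementation and the
// remaining options fall back to the defaults above):
//
//	srv, err := NewResourceServer(ResourceServerOptions{
//		Backend: myBackend,
//		Reg:     prometheus.NewRegistry(),
//	})
//	if err != nil {
//		return err
//	}
//	defer func() { _ = srv.Stop(context.Background()) }()
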
var _ ResourceServer = &server{}

type server struct {
	tracer         trace.Tracer
	log            *slog.Logger
	backend        StorageBackend
	blob           BlobSupport
	secure         secrets.InlineSecureValueSupport
	search         *searchSupport
	diagnostics    resourcepb.DiagnosticsServer
	access         claims.AccessClient
	writeHooks     WriteAccessHooks
	lifecycle      LifecycleHooks
	now            func() int64
	mostRecentRV   atomic.Int64 // The most recent resource version seen by the server
	storageMetrics *StorageMetrics
	indexMetrics   *BleveIndexMetrics

	// Background watch task -- this has permissions for everything
	ctx         context.Context
	cancel      context.CancelFunc
	broadcaster Broadcaster[*WrittenEvent]

	// init checking
	once    sync.Once
	initErr error

	maxPageSizeBytes int
	reg              prometheus.Registerer
	queue            QOSEnqueuer
	queueConfig      QueueConfig
}

// Init implements ResourceServer.
func (s *server) Init(ctx context.Context) error {
	s.once.Do(func() {
		// Call lifecycle hooks
		if s.lifecycle != nil {
			err := s.lifecycle.Init(ctx)
			if err != nil {
				s.initErr = fmt.Errorf("initialize Resource Server: %w", err)
			}
		}

		// initialize the search index
		if s.initErr == nil && s.search != nil {
			s.initErr = s.search.init(ctx)
		}

		// Start watching for changes
		if s.initErr == nil {
			s.initErr = s.initWatcher()
		}

		if s.initErr != nil {
			s.log.Error("error running resource server init", "error", s.initErr)
		}
	})
	return s.initErr
}

func (s *server) Stop(ctx context.Context) error {
	s.initErr = fmt.Errorf("service is stopping")

	var stopFailed bool
	if s.lifecycle != nil {
		err := s.lifecycle.Stop(ctx)
		if err != nil {
			stopFailed = true
			s.initErr = fmt.Errorf("service stopped with error: %w", err)
		}
	}

	// Stops the streaming
	s.cancel()

	// mark the value as done
	if stopFailed {
		return s.initErr
	}
	s.initErr = fmt.Errorf("service is stopped")

	return nil
}

// Old value indicates an update -- otherwise a create
//
//nolint:gocyclo
func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *resourcepb.ResourceKey, value, oldValue []byte) (*WriteEvent, *resourcepb.ErrorResult) {
	tmp := &unstructured.Unstructured{}
	err := tmp.UnmarshalJSON(value)
	if err != nil {
		return nil, AsErrorResult(err)
	}
	obj, err := utils.MetaAccessor(tmp)
	if err != nil {
		return nil, AsErrorResult(err)
	}

	if obj.GetUID() == "" {
		// TODO! once https://github.com/grafana/grafana/pull/96086 is deployed everywhere
		// return nil, NewBadRequestError("object is missing UID")
		s.log.Error("object is missing UID", "key", key)
	}

	if obj.GetResourceVersion() != "" {
		s.log.Error("object must not include a resource version", "key", key)
	}

	// Make sure the command labels are not saved
	for k := range obj.GetLabels() {
		if k == utils.LabelKeyGetHistory || k == utils.LabelKeyGetTrash || k == utils.LabelGetFullpath {
			return nil, NewBadRequestError("can not save label: " + k)
		}
	}

	if obj.GetAnnotation(utils.AnnoKeyGrantPermissions) != "" {
		return nil, NewBadRequestError("can not save annotation: " + utils.AnnoKeyGrantPermissions)
	}

	event := &WriteEvent{
		Value:  value,
		Key:    key,
		Object: obj,
		GUID:   uuid.New().String(),
	}

	if oldValue == nil {
		event.Type = resourcepb.WatchEvent_ADDED
	} else {
		event.Type = resourcepb.WatchEvent_MODIFIED

		temp := &unstructured.Unstructured{}
		err = temp.UnmarshalJSON(oldValue)
		if err != nil {
			return nil, AsErrorResult(err)
		}
		event.ObjectOld, err = utils.MetaAccessor(temp)
		if err != nil {
			return nil, AsErrorResult(err)
		}
	}

	// Verify that this resource can reference secure values
	if err := canReferenceSecureValues(ctx, obj, event.ObjectOld, s.secure); err != nil {
		return nil, err
	}

	if key.Namespace != obj.GetNamespace() {
		return nil, NewBadRequestError("key/namespace do not match")
	}

	gvk := obj.GetGroupVersionKind()
	if gvk.Kind == "" {
		return nil, NewBadRequestError("expecting resources with a kind in the body")
	}
	if gvk.Version == "" {
		return nil, NewBadRequestError("expecting resources with an apiVersion")
	}
	if gvk.Group != "" && gvk.Group != key.Group {
		return nil, NewBadRequestError(
			fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group),
		)
	}

	// This needs to be a create function
	if key.Name == "" {
		if obj.GetName() == "" {
			return nil, NewBadRequestError("missing name")
		}
		key.Name = obj.GetName()
	} else if key.Name != obj.GetName() {
		return nil, NewBadRequestError(
			fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName()))
	}
	if err := validateName(obj.GetName()); err != nil {
		return nil, err
	}

	// For folder moves, we need to check permissions on both folders
	if s.isFolderMove(event) {
		if err := s.checkFolderMovePermissions(ctx, user, key, event.ObjectOld.GetFolder(), obj.GetFolder()); err != nil {
			return nil, err
		}
	} else {
		// Regular permission check for create/update
		check := claims.CheckRequest{
			Verb:      utils.VerbCreate,
			Group:     key.Group,
			Resource:  key.Resource,
			Namespace: key.Namespace,
		}

		if event.Type == resourcepb.WatchEvent_MODIFIED {
			check.Verb = utils.VerbUpdate
			check.Name = key.Name
		}

		check.Folder = obj.GetFolder()
		a, err := s.access.Check(ctx, user, check)
		if err != nil {
			return nil, AsErrorResult(err)
		}
		if !a.Allowed {
			return nil, &resourcepb.ErrorResult{
				Code: http.StatusForbidden,
			}
		}
	}

	m, ok := obj.GetManagerProperties()
	if ok && m.Kind == utils.ManagerKindRepo {
		err = s.writeHooks.CanWriteValueFromRepository(ctx, user, m.Identity)
		if err != nil {
			return nil, AsErrorResult(err)
		}
	}
	return event, nil
}

// isFolderMove determines if an event represents a resource being moved between folders
func (s *server) isFolderMove(event *WriteEvent) bool {
	return event.Type == resourcepb.WatchEvent_MODIFIED &&
		event.ObjectOld != nil &&
		event.ObjectOld.GetFolder() != event.Object.GetFolder()
}

// checkFolderMovePermissions handles permission checks when a resource is being moved between folders
func (s *server) checkFolderMovePermissions(ctx context.Context, user claims.AuthInfo, key *resourcepb.ResourceKey, oldFolder, newFolder string) *resourcepb.ErrorResult {
	// First check if user can update the resource in the original folder
	updateCheck := claims.CheckRequest{
		Verb:      utils.VerbUpdate,
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Name:      key.Name,
		Folder:    oldFolder,
	}

	a, err := s.access.Check(ctx, user, updateCheck)
	if err != nil {
		return AsErrorResult(err)
	}
	if !a.Allowed {
		return &resourcepb.ErrorResult{
			Code:    http.StatusForbidden,
			Message: "not allowed to update resource in the source folder",
		}
	}

	// Then check if user can create the resource in the destination folder
	createCheck := claims.CheckRequest{
		Verb:      utils.VerbCreate,
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Folder:    newFolder,
	}

	a, err = s.access.Check(ctx, user, createCheck)
	if err != nil {
		return AsErrorResult(err)
	}
	if !a.Allowed {
		return &resourcepb.ErrorResult{
			Code:    http.StatusForbidden,
			Message: "not allowed to create resource in the destination folder",
		}
	}

	return nil
}

func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.Create")
	defer span.End()

	rsp := &resourcepb.CreateResponse{}
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		rsp.Error = &resourcepb.ErrorResult{
			Message: "no user found in context",
			Code:    http.StatusUnauthorized,
		}
		return rsp, nil
	}

	var (
		res *resourcepb.CreateResponse
		err error
	)
	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
		res, err = s.create(queueCtx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.CreateResponse {
			return &resourcepb.CreateResponse{Error: e}
		})
	}

	return res, err
}

func (s *server) create(ctx context.Context, user claims.AuthInfo, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
	rsp := &resourcepb.CreateResponse{}

	event, e := s.newEvent(ctx, user, req.Key, req.Value, nil)
	if e != nil {
		rsp.Error = e
		return rsp, nil
	}

	// If the resource already exists, the create will return an already exists error that is remapped appropriately by AsErrorResult.
	// This also benefits from ACID behaviours on our databases, so we avoid race conditions.
	var err error
	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
	if err != nil {
		rsp.Error = AsErrorResult(err)
	}
	s.log.Debug("server.WriteEvent", "type", event.Type, "rv", rsp.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "name", event.Key.Name, "resource", event.Key.Resource)
	return rsp, nil
}

func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.Update")
	defer span.End()

	rsp := &resourcepb.UpdateResponse{}
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		rsp.Error = &resourcepb.ErrorResult{
			Message: "no user found in context",
			Code:    http.StatusUnauthorized,
		}
		return rsp, nil
	}
	if req.ResourceVersion < 0 {
		rsp.Error = AsErrorResult(apierrors.NewBadRequest("update must include the previous version"))
		return rsp, nil
	}

	var (
		res *resourcepb.UpdateResponse
		err error
	)
	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
		res, err = s.update(queueCtx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.UpdateResponse {
			return &resourcepb.UpdateResponse{Error: e}
		})
	}

	return res, err
}

func (s *server) update(ctx context.Context, user claims.AuthInfo, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
	rsp := &resourcepb.UpdateResponse{}
	latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
		Key: req.Key,
	})
	if latest.Error != nil {
		rsp.Error = latest.Error
		return rsp, nil
	}
	if latest.Value == nil {
		rsp.Error = NewBadRequestError("current value does not exist")
		return rsp, nil
	}

	if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion {
		return nil, ErrOptimisticLockingFailed
	}

	event, e := s.newEvent(ctx, user, req.Key, req.Value, latest.Value)
	if e != nil {
		rsp.Error = e
		return rsp, nil
	}

	event.Type = resourcepb.WatchEvent_MODIFIED
	event.PreviousRV = latest.ResourceVersion

	var err error
	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
	if err != nil {
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
	defer span.End()

	rsp := &resourcepb.DeleteResponse{}
	if req.ResourceVersion < 0 {
		return nil, apierrors.NewBadRequest("delete must include the previous version")
	}
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		rsp.Error = &resourcepb.ErrorResult{
			Message: "no user found in context",
			Code:    http.StatusUnauthorized,
		}
		return rsp, nil
	}

	var (
		res *resourcepb.DeleteResponse
		err error
	)

	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
		res, err = s.delete(queueCtx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.DeleteResponse {
			return &resourcepb.DeleteResponse{Error: e}
		})
	}

	return res, err
}

func (s *server) delete(ctx context.Context, user claims.AuthInfo, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
	rsp := &resourcepb.DeleteResponse{}
	latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
		Key: req.Key,
	})
	if latest.Error != nil {
		rsp.Error = latest.Error
		return rsp, nil
	}
	if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion {
		rsp.Error = AsErrorResult(ErrOptimisticLockingFailed)
		return rsp, nil
	}

	access, err := s.access.Check(ctx, user, claims.CheckRequest{
		Verb:      "delete",
		Group:     req.Key.Group,
		Resource:  req.Key.Resource,
		Namespace: req.Key.Namespace,
		Name:      req.Key.Name,
		Folder:    latest.Folder,
	})
	if err != nil {
		rsp.Error = AsErrorResult(err)
		return rsp, nil
	}
	if !access.Allowed {
		rsp.Error = &resourcepb.ErrorResult{
			Code: http.StatusForbidden,
		}
		return rsp, nil
	}

	now := metav1.NewTime(time.UnixMilli(s.now()))
	event := WriteEvent{
		Key:        req.Key,
		Type:       resourcepb.WatchEvent_DELETED,
		PreviousRV: latest.ResourceVersion,
		GUID:       uuid.New().String(),
	}
	marker := &unstructured.Unstructured{}
	err = json.Unmarshal(latest.Value, marker)
	if err != nil {
		return nil, apierrors.NewBadRequest(
			fmt.Sprintf("unable to read previous object, %v", err))
	}
	oldObj, err := utils.MetaAccessor(marker)
	if err != nil {
		return nil, err
	}

	obj, err := utils.MetaAccessor(marker)
	if err != nil {
		return nil, err
	}
	obj.SetDeletionTimestamp(&now)
	obj.SetUpdatedTimestamp(&now.Time)
	obj.SetManagedFields(nil)
	obj.SetFinalizers(nil)
	obj.SetUpdatedBy(user.GetUID())
	obj.SetGeneration(utils.DeletedGeneration)
	obj.SetAnnotation(utils.AnnoKeyKubectlLastAppliedConfig, "") // clears it
	event.ObjectOld = oldObj
	event.Object = obj
	event.Value, err = marker.MarshalJSON()
	if err != nil {
		return nil, apierrors.NewBadRequest(
			fmt.Sprintf("unable to create deletion marker, %v", err))
	}

	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
	if err != nil {
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

func (s *server) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return &resourcepb.ReadResponse{
			Error: &resourcepb.ErrorResult{
				Message: "no user found in context",
				Code:    http.StatusUnauthorized,
			}}, nil
	}

	// if req.Key.Group == "" {
	// 	status, _ := AsErrorResult(apierrors.NewBadRequest("missing group"))
	// 	return &ReadResponse{Status: status}, nil
	// }
	if req.Key.Resource == "" {
		return &resourcepb.ReadResponse{Error: NewBadRequestError("missing resource")}, nil
	}

	var (
		res *resourcepb.ReadResponse
		err error
	)
	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
		res, err = s.read(queueCtx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
			return &resourcepb.ReadResponse{Error: e}
		})
	}

	return res, err
}

func (s *server) read(ctx context.Context, user claims.AuthInfo, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
	rsp := s.backend.ReadResource(ctx, req)
	if rsp.Error != nil && rsp.Error.Code == http.StatusNotFound {
		return &resourcepb.ReadResponse{Error: rsp.Error}, nil
	}

	a, err := s.access.Check(ctx, user, claims.CheckRequest{
		Verb:      "get",
		Group:     req.Key.Group,
		Resource:  req.Key.Resource,
		Namespace: req.Key.Namespace,
		Name:      req.Key.Name,
		Folder:    rsp.Folder,
	})
	if err != nil {
		return &resourcepb.ReadResponse{Error: AsErrorResult(err)}, nil
	}
	if !a.Allowed {
		return &resourcepb.ReadResponse{
			Error: &resourcepb.ErrorResult{
				Code: http.StatusForbidden,
			}}, nil
	}
	return &resourcepb.ReadResponse{
		ResourceVersion: rsp.ResourceVersion,
		Value:           rsp.Value,
		Error:           rsp.Error,
	}, nil
}

func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resourcepb.ListResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.List")
	defer span.End()

	// The history + trash queries do not yet support additional filters
	if req.Source != resourcepb.ListRequest_STORE {
		if len(req.Options.Fields) > 0 || len(req.Options.Labels) > 0 {
			return &resourcepb.ListResponse{
				Error: NewBadRequestError("unexpected field/label selector for history query"),
			}, nil
		}
	}

	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return &resourcepb.ListResponse{
			Error: &resourcepb.ErrorResult{
				Message: "no user found in context",
				Code:    http.StatusUnauthorized,
			}}, nil
	}

	// Do not allow label query for trash/history
	for _, v := range req.Options.Labels {
		if v.Key == utils.LabelKeyGetHistory || v.Key == utils.LabelKeyGetTrash {
			return &resourcepb.ListResponse{Error: NewBadRequestError("history and trash must be requested as source")}, nil
		}
	}

	if req.Limit < 1 {
		req.Limit = 500 // default max 500 items in a page
	}
	maxPageBytes := s.maxPageSizeBytes
	pageBytes := 0
	rsp := &resourcepb.ListResponse{}

	key := req.Options.Key
	checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Verb:      utils.VerbGet,
	})
	var trashChecker claims.ItemChecker // only for trash
	if req.Source == resourcepb.ListRequest_TRASH {
		trashChecker, _, err = s.access.Compile(ctx, user, claims.ListRequest{
			Group:     key.Group,
			Resource:  key.Resource,
			Namespace: key.Namespace,
			Verb:      utils.VerbSetPermissions, // Basically Admin
		})
		if err != nil {
			return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
		}
	}
	if err != nil {
		return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
	}
	if checker == nil {
		return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
			Code: http.StatusForbidden,
		}}, nil
	}

	iterFunc := func(iter ListIterator) error {
		for iter.Next() {
			if err := iter.Error(); err != nil {
				return err
			}

			item := &resourcepb.ResourceWrapper{
				ResourceVersion: iter.ResourceVersion(),
				Value:           iter.Value(),
			}
			// Trash is only accessible to admins or the user who deleted the object
			if req.Source == resourcepb.ListRequest_TRASH {
				if !s.isTrashItemAuthorized(ctx, iter, trashChecker) {
					continue
				}
			} else if !checker(iter.Name(), iter.Folder()) {
				continue
			}

			pageBytes += len(item.Value)
			rsp.Items = append(rsp.Items, item)
			if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes {
				t := iter.ContinueToken()
				if iter.Next() {
					rsp.NextPageToken = t
				}
				return iter.Error()
			}
		}
		return iter.Error()
	}

	var rv int64
	switch req.Source {
	case resourcepb.ListRequest_STORE:
		rv, err = s.backend.ListIterator(ctx, req, iterFunc)
	case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
		rv, err = s.backend.ListHistory(ctx, req, iterFunc)
	default:
		return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
	}

	if err != nil {
		rsp.Error = AsErrorResult(err)
		return rsp, nil
	}

	if rv < 1 {
		rsp.Error = &resourcepb.ErrorResult{
			Code:    http.StatusInternalServerError,
			Message: fmt.Sprintf("invalid resource version for list: %v", rv),
		}
		return rsp, nil
	}
	rsp.ResourceVersion = rv
	return rsp, err
}

// isTrashItemAuthorized checks if the user has access to the trash item.
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool {
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return false
	}

	partial := &metav1.PartialObjectMetadata{}
	err := json.Unmarshal(iter.Value(), partial)
	if err != nil {
		return false
	}

	obj, err := utils.MetaAccessor(partial)
	if err != nil {
		return false
	}

	// Trash is only accessible to admins or the user who deleted the object
	return obj.GetUpdatedBy() == user.GetUID() || trashChecker(iter.Name(), iter.Folder())
}
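// newTrashListRequest is an illustrative sketch of how callers are expected to
// ask for deleted objects: trash is selected via the request source rather
// than label selectors (label-based trash/history queries are rejected by
// List above). It assumes the resourcepb.ListOptions message wraps the key.
func newTrashListRequest(key *resourcepb.ResourceKey) *resourcepb.ListRequest {
	return &resourcepb.ListRequest{
		Source:  resourcepb.ListRequest_TRASH,
		Options: &resourcepb.ListOptions{Key: key},
	}
}
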
func (s *server) initWatcher() error {
	var err error
	s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {
		events, err := s.backend.WatchWriteEvents(s.ctx)
		if err != nil {
			return err
		}
		go func() {
			for v := range events {
				if v == nil {
					s.log.Error("received nil event")
					continue
				}
				// Skip events during batch updates
				if v.PreviousRV < 0 {
					continue
				}

				s.log.Debug("Server. Streaming Event", "type", v.Type, "previousRV", v.PreviousRV, "group", v.Key.Group, "namespace", v.Key.Namespace, "resource", v.Key.Resource, "name", v.Key.Name)
				s.mostRecentRV.Store(v.ResourceVersion)
				out <- v
			}
		}()
		return nil
	})
	return err
}

//nolint:gocyclo
func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStore_WatchServer) error {
	ctx := srv.Context()

	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return apierrors.NewUnauthorized("no user found in context")
	}

	key := req.Options.Key
	checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Verb:      utils.VerbGet,
	})
	if err != nil {
		return err
	}
	if checker == nil {
		return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
	}

	// Start listening -- this will buffer any changes that happen while we backfill.
	// If events are generated faster than we can process them, then some events will be dropped.
	// TODO: Think of a way to allow the client to catch up.
	stream, err := s.broadcaster.Subscribe(ctx)
	if err != nil {
		return err
	}
	defer s.broadcaster.Unsubscribe(stream)

	// Determine a safe starting resource-version for the watch.
	// When the client requests SendInitialEvents we will use the resource-version
	// of the last object returned from the initial list (handled below).
	// When the client supplies an explicit `since` we honour that.
	// In the remaining case (SendInitialEvents == false && since == 0) we need
	// a high-water-mark representing the current state of storage so that we
	// don't replay events that happened before the watch was established. Using
	// `mostRecentRV` -- which is updated asynchronously by the broadcaster -- is
	// subject to races because the broadcaster may not yet have observed the
	// latest committed writes. Instead we ask the backend directly for the
	// current resource-version.
	var mostRecentRV int64
	if !req.SendInitialEvents && req.Since == 0 {
		// We only need the current RV. A cheap way to obtain it is to issue a
		// List with a very small limit and read the listRV returned by the
		// iterator. The callback is a no-op so we avoid materialising any
		// items.
		listReq := &resourcepb.ListRequest{
			Options: req.Options,
			// This has right now no effect, as the list request only uses the limit if it lists from history or trash.
			// It might be worth adding it in a subsequent PR. We only list once during setup of the watch, so it's
			// fine for now.
			Limit: 1,
		}

		rv, err := s.backend.ListIterator(ctx, listReq, func(ListIterator) error { return nil })
		if err != nil {
			// Fallback to the broadcaster's view if the backend lookup fails.
			// This preserves previous behaviour while still eliminating the
			// common race in the majority of cases.
			s.log.Warn("watch: failed to fetch current RV from backend, falling back to broadcaster", "err", err)
			mostRecentRV = s.mostRecentRV.Load()
		} else {
			mostRecentRV = rv
		}
	} else {
		// For all other code-paths we either already have an explicit RV or we
		// will derive it from the initial list below.
		mostRecentRV = s.mostRecentRV.Load()
	}

	var initialEventsRV int64 // resource version coming from the initial events
	if req.SendInitialEvents {
		// Backfill the stream by sending every existing entity.
		initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
			for iter.Next() {
				if err := iter.Error(); err != nil {
					return err
				}
				if err := srv.Send(&resourcepb.WatchEvent{
					Type: resourcepb.WatchEvent_ADDED,
					Resource: &resourcepb.WatchEvent_Resource{
						Value:   iter.Value(),
						Version: iter.ResourceVersion(),
					},
				}); err != nil {
					return err
				}
			}
			return iter.Error()
		})
		if err != nil {
			return err
		}
	}
	if req.SendInitialEvents && req.AllowWatchBookmarks {
		if err := srv.Send(&resourcepb.WatchEvent{
			Type: resourcepb.WatchEvent_BOOKMARK,
			Resource: &resourcepb.WatchEvent_Resource{
				Version: initialEventsRV,
			},
		}); err != nil {
			return err
		}
	}

	var since int64 // resource version to start watching from
	switch {
	case req.SendInitialEvents:
		since = initialEventsRV
	case req.Since == 0:
		since = mostRecentRV
	default:
		since = req.Since
	}
	for {
		select {
		case <-ctx.Done():
			return nil

		case event, ok := <-stream:
			if !ok {
				s.log.Debug("watch events closed")
				return nil
			}
			s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
			if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
				if !checker(event.Key.Name, event.Folder) {
					continue
				}

				value := event.Value
				// remove the delete marker stored in the value for deleted objects
				if event.Type == resourcepb.WatchEvent_DELETED {
					value = []byte{}
				}
				resp := &resourcepb.WatchEvent{
					Timestamp: event.Timestamp,
					Type:      event.Type,
					Resource: &resourcepb.WatchEvent_Resource{
						Value:   value,
						Version: event.ResourceVersion,
					},
				}
				if event.PreviousRV > 0 {
					prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
					if err != nil {
						// This scenario should never happen, but if it does, we should log it and continue
						// sending the event without the previous object. The client will decide what to do.
						s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", err)
					} else {
						if prevObj.ResourceVersion != event.PreviousRV {
							s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
							return fmt.Errorf("resource version mismatch")
						}
						resp.Previous = &resourcepb.WatchEvent_Resource{
							Value:   prevObj.Value,
							Version: prevObj.ResourceVersion,
						}
					}
				}
				if err := srv.Send(resp); err != nil {
					return err
				}

				if s.storageMetrics != nil {
					// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
					latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
					if latencySeconds > 0 {
						s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
					}
				}
			}
		}
	}
}

func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
	if s.search == nil {
		return nil, fmt.Errorf("search index not configured")
	}

	return s.search.Search(ctx, req)
}

// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
	if err := s.Init(ctx); err != nil {
		return nil, err
	}

	if s.search == nil {
		// If the backend implements "GetStats", we can use it
		srv, ok := s.backend.(resourcepb.ResourceIndexServer)
		if ok {
			return srv.GetStats(ctx, req)
		}
		return nil, fmt.Errorf("search index not configured")
	}
	return s.search.GetStats(ctx, req)
}

func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
	if s.search == nil {
		return nil, fmt.Errorf("search index not configured")
	}

	return s.search.ListManagedObjects(ctx, req)
}

func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
	if s.search == nil {
		return nil, fmt.Errorf("search index not configured")
	}

	return s.search.CountManagedObjects(ctx, req)
}

// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
	return s.diagnostics.IsHealthy(ctx, req)
}

// PutBlob implements BlobStore.
func (s *server) PutBlob(ctx context.Context, req *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error) {
	if s.blob == nil {
		return &resourcepb.PutBlobResponse{Error: &resourcepb.ErrorResult{
			Message: "blob store not configured",
			Code:    http.StatusNotImplemented,
		}}, nil
	}

	rsp, err := s.blob.PutResourceBlob(ctx, req)
	if err != nil {
		if rsp == nil {
			rsp = &resourcepb.PutBlobResponse{}
		}
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

func (s *server) getPartialObject(ctx context.Context, key *resourcepb.ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *resourcepb.ErrorResult) {
	if r := verifyRequestKey(key); r != nil {
		return nil, r
	}

	rsp := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
		Key:             key,
		ResourceVersion: rv,
	})
	if rsp.Error != nil {
		return nil, rsp.Error
	}

	partial := &metav1.PartialObjectMetadata{}
	err := json.Unmarshal(rsp.Value, partial)
	if err != nil {
		return nil, AsErrorResult(err)
	}
	obj, err := utils.MetaAccessor(partial)
	if err != nil {
		return nil, AsErrorResult(err)
	}
	return obj, nil
}

// GetBlob implements BlobStore.
func (s *server) GetBlob(ctx context.Context, req *resourcepb.GetBlobRequest) (*resourcepb.GetBlobResponse, error) {
	if s.blob == nil {
		return &resourcepb.GetBlobResponse{Error: &resourcepb.ErrorResult{
			Message: "blob store not configured",
			Code:    http.StatusNotImplemented,
		}}, nil
	}

	var info *utils.BlobInfo
	if req.Uid == "" {
		// The linked blob is stored in the resource metadata attributes
		obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion)
		if status != nil {
			return &resourcepb.GetBlobResponse{Error: status}, nil
		}

		info = obj.GetBlob()
		if info == nil || info.UID == "" {
			return &resourcepb.GetBlobResponse{Error: &resourcepb.ErrorResult{
				Message: "Resource does not have a linked blob",
				Code:    404,
			}}, nil
		}
	} else {
		info = &utils.BlobInfo{UID: req.Uid}
	}

	rsp, err := s.blob.GetResourceBlob(ctx, req.Resource, info, req.MustProxyBytes)
	if err != nil {
		if rsp == nil {
			rsp = &resourcepb.GetBlobResponse{}
		}
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error {
	// Enforce a timeout for the entire operation, including queueing and execution.
	queueCtx, cancel := context.WithTimeout(ctx, s.queueConfig.Timeout)
	defer cancel()

	done := make(chan struct{})
	wrappedRunnable := func() {
		defer close(done)
		runnable(queueCtx)
	}

	// Retry enqueueing with backoff, respecting the timeout context.
	boff := backoff.New(queueCtx, backoff.Config{
		MinBackoff: s.queueConfig.MinBackoff,
		MaxBackoff: s.queueConfig.MaxBackoff,
		MaxRetries: s.queueConfig.MaxRetries,
	})

	for {
		err := s.queue.Enqueue(queueCtx, tenantID, wrappedRunnable)
		if err == nil {
			// Successfully enqueued.
			break
		}

		s.log.Warn("failed to enqueue runnable, retrying", "tenantID", tenantID, "error", err)
		if !boff.Ongoing() {
			// Backoff finished (retries exhausted or context canceled).
			return fmt.Errorf("failed to enqueue for tenant %s: %w", tenantID, err)
		}
		boff.Wait()
	}

	// Wait for the runnable to complete or for the context to be done.
	select {
	case <-done:
		return nil // Completed successfully.
	case <-queueCtx.Done():
		return queueCtx.Err() // Timed out or canceled while waiting for execution.
	}
}