Files
grafana/pkg/storage/unified/resource/bulk.go
T
Will Assis ae4d2324d6 Unified storage kvstore bulk import support (#113791)
* implement batchdelete in datastore

* implement bulkprocess in kv storage_backend

* convert bulkRVs to snowflake
2025-12-04 09:26:50 -05:00

416 lines
10 KiB
Go

package resource
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
const grpcMetaKeyCollection = "x-gf-batch-collection"
const grpcMetaKeyRebuildCollection = "x-gf-batch-rebuild-collection"
const grpcMetaKeySkipValidation = "x-gf-batch-skip-validation"
// Logged in trace.
var metadataKeys = []string{
grpcMetaKeyCollection,
grpcMetaKeyRebuildCollection,
grpcMetaKeySkipValidation,
}
func grpcMetaValueIsTrue(vals []string) bool {
return len(vals) == 1 && vals[0] == "true"
}
type BulkRequestIterator interface {
// Next advances the iterator to the next element if one exists.
Next() bool
// Request returns the current element. Only valid after Next() returns true.
Request() *resourcepb.BulkRequest
// RollbackRequested returns true if there was an error advancing the iterator. Checked after Next() returns true.
RollbackRequested() bool
}
type BulkProcessingBackend interface {
ProcessBulk(ctx context.Context, setting BulkSettings, iter BulkRequestIterator) *resourcepb.BulkResponse
}
type BulkResourceWriter interface {
io.Closer
Write(ctx context.Context, key *resourcepb.ResourceKey, value []byte) error
// Called when finished writing
CloseWithResults() (*resourcepb.BulkResponse, error)
}
type BulkSettings struct {
// All requests will be within this namespace/group/resource
Collection []*resourcepb.ResourceKey
// The batch will include everything from the collection
// - all existing values will be removed/replaced if the batch completes successfully
RebuildCollection bool
// The byte[] payload and folder has already been validated - no need to decode and verify
SkipValidation bool
}
func (x *BulkSettings) ToMD() metadata.MD {
md := make(metadata.MD)
if len(x.Collection) > 0 {
for _, v := range x.Collection {
md[grpcMetaKeyCollection] = append(md[grpcMetaKeyCollection], SearchID(v))
}
}
if x.RebuildCollection {
md[grpcMetaKeyRebuildCollection] = []string{"true"}
}
if x.SkipValidation {
md[grpcMetaKeySkipValidation] = []string{"true"}
}
return md
}
func NewBulkSettings(md metadata.MD) (BulkSettings, error) {
settings := BulkSettings{}
for k, v := range md {
switch k {
case grpcMetaKeyCollection:
for _, c := range v {
key := &resourcepb.ResourceKey{}
err := ReadSearchID(key, c)
if err != nil {
return settings, fmt.Errorf("error reading collection metadata: %s / %w", c, err)
}
settings.Collection = append(settings.Collection, key)
}
case grpcMetaKeyRebuildCollection:
settings.RebuildCollection = grpcMetaValueIsTrue(v)
case grpcMetaKeySkipValidation:
settings.SkipValidation = grpcMetaValueIsTrue(v)
}
}
return settings, nil
}
// BulkWrite implements ResourceServer.
// All requests must be to the same NAMESPACE/GROUP/RESOURCE
func (s *server) BulkProcess(stream resourcepb.BulkStore_BulkProcessServer) error {
ctx := stream.Context()
ctx, span := tracer.Start(ctx, "resource.server.BulkProcess")
defer span.End()
sendAndClose := func(rsp *resourcepb.BulkResponse) error {
span.AddEvent("sendAndClose", trace.WithAttributes(attribute.String("msg", rsp.String())))
return stream.SendAndClose(rsp)
}
user, ok := authlib.AuthInfoFrom(ctx)
if !ok || user == nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "no user found in context",
Code: http.StatusUnauthorized,
},
})
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "unable to read metadata gRPC request",
Code: http.StatusPreconditionFailed,
},
})
}
// Add relevant metadata into span.
for _, k := range metadataKeys {
meta := md.Get(k)
if len(meta) > 0 {
span.SetAttributes(attribute.StringSlice(k, meta))
}
}
runner := &batchRunner{
checker: make(map[string]authlib.ItemChecker), // Can create
stream: stream,
span: span,
}
settings, err := NewBulkSettings(md)
if err != nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "error reading settings",
Reason: err.Error(),
Code: http.StatusPreconditionFailed,
},
})
}
if len(settings.Collection) < 1 {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "Missing target collection(s) in request header",
Code: http.StatusBadRequest,
},
})
}
// Verify all collection request keys are valid
for _, k := range settings.Collection {
if r := verifyRequestKeyCollection(k); r != nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: fmt.Sprintf("invalid request key: %s", r.Message),
Code: http.StatusBadRequest,
},
})
}
}
if settings.RebuildCollection {
for _, k := range settings.Collection {
// Can we delete the whole collection
rsp, err := s.access.Check(ctx, user, authlib.CheckRequest{
Namespace: k.Namespace,
Group: k.Group,
Resource: k.Resource,
Verb: utils.VerbDeleteCollection,
}, "")
if err != nil || !rsp.Allowed {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: fmt.Sprintf("Requester must be able to: %s", utils.VerbDeleteCollection),
Code: http.StatusForbidden,
},
})
}
// This will be called for each request -- with the folder ID
runner.checker[NSGR(k)], _, err = s.access.Compile(ctx, user, authlib.ListRequest{
Namespace: k.Namespace,
Group: k.Group,
Resource: k.Resource,
Verb: utils.VerbCreate,
})
if err != nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "Unable to check `create` permission",
Code: http.StatusForbidden,
},
})
}
}
} else {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "Bulk currently only supports RebuildCollection",
Code: http.StatusBadRequest,
},
})
}
backend, ok := s.backend.(BulkProcessingBackend)
if !ok {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "The server backend does not support batch processing",
Code: http.StatusNotImplemented,
},
})
}
// BulkProcess requests
rsp := backend.ProcessBulk(ctx, settings, runner)
if rsp == nil {
rsp = &resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Code: http.StatusInternalServerError,
Message: "Nothing returned from process batch",
},
}
}
if runner.err != nil {
rsp.Error = AsErrorResult(runner.err)
}
return sendAndClose(rsp)
}
var (
_ BulkRequestIterator = (*batchRunner)(nil)
)
type batchRunner struct {
stream resourcepb.BulkStore_BulkProcessServer
rollback bool
request *resourcepb.BulkRequest
err error
checker map[string]authlib.ItemChecker
span trace.Span
}
// Next implements BulkRequestIterator.
func (b *batchRunner) Next() bool {
if b.rollback {
return true
}
b.request, b.err = b.stream.Recv()
if errors.Is(b.err, io.EOF) {
b.err = nil
b.rollback = false
b.request = nil
return false
}
if b.err != nil {
b.rollback = true
b.span.AddEvent("next", trace.WithAttributes(attribute.String("error", b.err.Error())))
return true
}
if b.request != nil {
key := b.request.Key
k := NSGR(key)
checker, ok := b.checker[k]
if !ok {
b.err = fmt.Errorf("missing access control for: %s", k)
b.rollback = true
} else if !checker(key.Name, b.request.Folder) {
b.err = fmt.Errorf("not allowed to create resource")
b.rollback = true
}
// Mention resource in the span.
attrs := []attribute.KeyValue{
attribute.String("key", nsgrWithName(key)),
}
if b.err != nil {
attrs = append(attrs, attribute.String("error", b.err.Error()))
}
b.span.AddEvent("next", trace.WithAttributes(attrs...))
return true
}
return false
}
// Request implements BulkRequestIterator.
func (b *batchRunner) Request() *resourcepb.BulkRequest {
if b.rollback {
return nil
}
return b.request
}
// RollbackRequested implements BulkRequestIterator.
func (b *batchRunner) RollbackRequested() bool {
if b.rollback {
b.rollback = false // break iterator
return true
}
return false
}
type bulkRV struct {
max int64
counter int64
}
// Used when executing a bulk import so that we can generate snowflake RVs in the past
func newBulkRV() *bulkRV {
t := snowflakeFromTime(time.Now())
return &bulkRV{
max: t,
counter: 0,
}
}
func (x *bulkRV) next(obj metav1.Object) int64 {
ts := snowflakeFromTime(obj.GetCreationTimestamp().Time)
anno := obj.GetAnnotations()
if anno != nil {
v := anno[utils.AnnoKeyUpdatedTimestamp]
t, err := time.Parse(time.RFC3339, v)
if err == nil {
ts = snowflakeFromTime(t)
}
}
if ts > x.max || ts < 0 {
ts = x.max
}
x.counter++
return ts + x.counter
}
type BulkLock struct {
running map[string]bool
mu sync.Mutex
}
func NewBulkLock() *BulkLock {
return &BulkLock{
running: make(map[string]bool),
}
}
func (x *BulkLock) Start(keys []*resourcepb.ResourceKey) error {
x.mu.Lock()
defer x.mu.Unlock()
// First verify that it is not already running
ids := make([]string, len(keys))
for i, k := range keys {
id := NSGR(k)
if x.running[id] {
return &apierrors.StatusError{ErrStatus: metav1.Status{
Code: http.StatusPreconditionFailed,
Message: "bulk export is already running",
}}
}
ids[i] = id
}
// Then add the keys to the lock
for _, k := range ids {
x.running[k] = true
}
return nil
}
func (x *BulkLock) Finish(keys []*resourcepb.ResourceKey) {
x.mu.Lock()
defer x.mu.Unlock()
for _, k := range keys {
delete(x.running, NSGR(k))
}
}
func (x *BulkLock) Active() bool {
x.mu.Lock()
defer x.mu.Unlock()
return len(x.running) > 0
}