Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 098c924a93 | |||
| 3f2f46822b | |||
| 9e183c262f | |||
| d757da0cc6 | |||
| 5fc89089d4 | |||
| 4c77b7b493 | |||
| 96e57b270d | |||
| 50393d3a63 | |||
| 57dbb993af | |||
| 08e9908d27 | |||
| 8212473330 | |||
| 49b3413c04 | |||
| 20fe9b455d | |||
| 6d7ecc500c | |||
| c96c4d1e09 | |||
| f45cf6bdf8 | |||
| e6b2a6cea3 | |||
| cd15d73e32 | |||
| 821c553132 |
@@ -6,11 +6,16 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc/metadata"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
@@ -328,3 +333,107 @@ func (b *batchRunner) RollbackRequested() bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// getDummySnowflake returns a snowflake with machine and counter bits set to 0
|
||||
func getDummySnowflake(ts time.Time) int64 {
|
||||
return (ts.UnixMilli() - snowflake.Epoch) << 22
|
||||
}
|
||||
|
||||
type bulkRV struct {
|
||||
max int64
|
||||
counters map[int64]int64
|
||||
}
|
||||
|
||||
// Used when executing a bulk import so that we can generate snowflake RVs in the past
|
||||
func newBulkRV() *bulkRV {
|
||||
t := getDummySnowflake(time.Now())
|
||||
return &bulkRV{
|
||||
max: t,
|
||||
counters: make(map[int64]int64),
|
||||
}
|
||||
}
|
||||
|
||||
func (x *bulkRV) next(obj metav1.Object) int64 {
|
||||
ts := getDummySnowflake(obj.GetCreationTimestamp().Time)
|
||||
anno := obj.GetAnnotations()
|
||||
if anno != nil {
|
||||
v := anno[utils.AnnoKeyUpdatedTimestamp]
|
||||
t, err := time.Parse(time.RFC3339, v)
|
||||
if err == nil {
|
||||
ts = getDummySnowflake(t)
|
||||
}
|
||||
}
|
||||
if ts > x.max || ts < 0 {
|
||||
ts = x.max
|
||||
}
|
||||
|
||||
counter := x.counters[ts]
|
||||
counter++
|
||||
|
||||
if counter > 65535 {
|
||||
for {
|
||||
ts += (1 << 22) // Add 1ms in snowflake format
|
||||
if x.counters[ts] < 65535 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
counter = x.counters[ts] + 1
|
||||
|
||||
if ts > x.max {
|
||||
x.max = ts
|
||||
}
|
||||
}
|
||||
|
||||
x.counters[ts] = counter
|
||||
return ts + counter
|
||||
}
|
||||
|
||||
type BulkLock struct {
|
||||
running map[string]bool
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewBulkLock() *BulkLock {
|
||||
return &BulkLock{
|
||||
running: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (x *BulkLock) Start(keys []*resourcepb.ResourceKey) error {
|
||||
x.mu.Lock()
|
||||
defer x.mu.Unlock()
|
||||
|
||||
// First verify that it is not already running
|
||||
ids := make([]string, len(keys))
|
||||
for i, k := range keys {
|
||||
id := NSGR(k)
|
||||
if x.running[id] {
|
||||
return &apierrors.StatusError{ErrStatus: metav1.Status{
|
||||
Code: http.StatusPreconditionFailed,
|
||||
Message: "bulk export is already running",
|
||||
}}
|
||||
}
|
||||
ids[i] = id
|
||||
}
|
||||
|
||||
// Then add the keys to the lock
|
||||
for _, k := range ids {
|
||||
x.running[k] = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *BulkLock) Finish(keys []*resourcepb.ResourceKey) {
|
||||
x.mu.Lock()
|
||||
defer x.mu.Unlock()
|
||||
for _, k := range keys {
|
||||
delete(x.running, NSGR(k))
|
||||
}
|
||||
}
|
||||
|
||||
func (x *BulkLock) Active() bool {
|
||||
x.mu.Lock()
|
||||
defer x.mu.Unlock()
|
||||
return len(x.running) > 0
|
||||
}
|
||||
|
||||
@@ -537,6 +537,27 @@ func (d *dataStore) Delete(ctx context.Context, key DataKey) error {
|
||||
return d.kv.Delete(ctx, dataSection, key.String())
|
||||
}
|
||||
|
||||
func (n *dataStore) batchDelete(ctx context.Context, keys []DataKey) error {
|
||||
for len(keys) > 0 {
|
||||
batch := keys
|
||||
if len(batch) > dataBatchSize {
|
||||
batch = batch[:dataBatchSize]
|
||||
}
|
||||
|
||||
keys = keys[len(batch):]
|
||||
stringKeys := make([]string, len(batch))
|
||||
for _, dataKey := range batch {
|
||||
stringKeys = append(stringKeys, dataKey.String())
|
||||
}
|
||||
|
||||
if err := n.kv.BatchDelete(ctx, dataSection, stringKeys); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ParseKey parses a string key into a DataKey struct
|
||||
func ParseKey(key string) (DataKey, error) {
|
||||
parts := strings.Split(key, "/")
|
||||
|
||||
@@ -2950,6 +2950,42 @@ func TestDataStore_getGroupResources(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataStore_BatchDelete(t *testing.T) {
|
||||
ds := setupTestDataStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
keys := make([]DataKey, 95)
|
||||
for i := 0; i < 95; i++ {
|
||||
rv := node.Generate().Int64()
|
||||
keys[i] = DataKey{
|
||||
Namespace: "test-namespace",
|
||||
Group: "test-group",
|
||||
Resource: "test-resource",
|
||||
Name: fmt.Sprintf("test-name-%d", i),
|
||||
ResourceVersion: rv,
|
||||
Action: DataActionCreated,
|
||||
Folder: "test-folder",
|
||||
}
|
||||
content := fmt.Sprintf("test-value-%d", i)
|
||||
err := ds.Save(ctx, keys[i], bytes.NewReader([]byte(content)))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err := ds.batchDelete(ctx, keys)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify all events were deleted
|
||||
for i := 0; i < 95; i++ {
|
||||
_, err := ds.Get(ctx, DataKey{
|
||||
Namespace: "test-namespace",
|
||||
Group: "test-group",
|
||||
Resource: "test-resource",
|
||||
Name: fmt.Sprintf("test-name-%d", i),
|
||||
})
|
||||
require.Error(t, err, "Resource should have been deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataStore_BatchGet(t *testing.T) {
|
||||
ds := setupTestDataStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -0,0 +1,204 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"iter"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/validation"
|
||||
)
|
||||
|
||||
const (
|
||||
internalSection = "unified/internal"
|
||||
)
|
||||
|
||||
type internalStore struct {
|
||||
kv KV
|
||||
}
|
||||
|
||||
type InternalKey struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
Subsection string
|
||||
}
|
||||
|
||||
func (k InternalKey) String() string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s", strings.ToLower(k.Subsection), k.Group, k.Resource, k.Namespace)
|
||||
}
|
||||
|
||||
func (k InternalKey) Validate() error {
|
||||
if k.Namespace == "" {
|
||||
return NewValidationError("namespace", k.Namespace, ErrNamespaceRequired)
|
||||
}
|
||||
if k.Subsection == "" {
|
||||
return NewValidationError("Subsection", k.Subsection, "Subsection is required")
|
||||
}
|
||||
if err := validation.IsValidGroup(k.Group); err != nil {
|
||||
return NewValidationError("group", k.Group, err[0])
|
||||
}
|
||||
if err := validation.IsValidResource(k.Resource); err != nil {
|
||||
return NewValidationError("resource", k.Resource, err[0])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseInternalKey(key string) (InternalKey, error) {
|
||||
parts := strings.Split(key, "/")
|
||||
if len(parts) != 5 {
|
||||
return InternalKey{}, fmt.Errorf("invalid internal key: %s", key)
|
||||
}
|
||||
return InternalKey{
|
||||
Subsection: parts[0],
|
||||
Group: parts[1],
|
||||
Resource: parts[2],
|
||||
Namespace: parts[3],
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newInternalStore(kv KV) *internalStore {
|
||||
return &internalStore{
|
||||
kv: kv,
|
||||
}
|
||||
}
|
||||
|
||||
type InternalData struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
Subsection string
|
||||
Value string
|
||||
}
|
||||
|
||||
func (d *internalStore) Get(ctx context.Context, key InternalKey) (InternalData, error) {
|
||||
if err := key.Validate(); err != nil {
|
||||
return InternalData{}, fmt.Errorf("invalid internal key: %w", err)
|
||||
}
|
||||
|
||||
reader, err := d.kv.Get(ctx, internalSection, key.String())
|
||||
defer func() { _ = reader.Close() }()
|
||||
if err != nil {
|
||||
return InternalData{}, err
|
||||
}
|
||||
|
||||
value, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return InternalData{}, err
|
||||
}
|
||||
|
||||
return InternalData{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Subsection: key.Subsection,
|
||||
Value: string(value),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *internalStore) BatchGet(ctx context.Context, keys []InternalKey) iter.Seq2[InternalData, error] {
|
||||
return func(yield func(InternalData, error) bool) {
|
||||
for _, key := range keys {
|
||||
if err := key.Validate(); err != nil {
|
||||
yield(InternalData{}, fmt.Errorf("invalid internal key %s: %w", key.String(), err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Process keys in batches. Uses same batch size as datastore.go
|
||||
for i := 0; i < len(keys); i += dataBatchSize {
|
||||
end := i + dataBatchSize
|
||||
if end > len(keys) {
|
||||
end = len(keys)
|
||||
}
|
||||
batch := keys[i:end]
|
||||
|
||||
stringKeys := make([]string, len(batch))
|
||||
for j, key := range batch {
|
||||
stringKeys[j] = key.String()
|
||||
}
|
||||
|
||||
for kv, err := range d.kv.BatchGet(ctx, internalSection, stringKeys) {
|
||||
if err != nil {
|
||||
yield(InternalData{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
key, err := parseInternalKey(kv.Key)
|
||||
if err != nil {
|
||||
yield(InternalData{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
value, err := io.ReadAll(kv.Value)
|
||||
if err != nil {
|
||||
yield(InternalData{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !yield(InternalData{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Subsection: key.Subsection,
|
||||
Value: string(value),
|
||||
}, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *internalStore) GetSubsection(ctx context.Context, Subsection string) iter.Seq2[InternalKey, error] {
|
||||
opts := ListOptions{
|
||||
Sort: SortOrderAsc,
|
||||
StartKey: Subsection,
|
||||
}
|
||||
return func(yield func(InternalKey, error) bool) {
|
||||
for key, err := range d.kv.Keys(ctx, internalSection, opts) {
|
||||
if err != nil {
|
||||
yield(InternalKey{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
internalKey, err := parseInternalKey(key)
|
||||
if err != nil {
|
||||
yield(InternalKey{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !yield(internalKey, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *internalStore) Save(ctx context.Context, key InternalKey, value string) error {
|
||||
if err := key.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid internal key: %w", err)
|
||||
}
|
||||
|
||||
writer, err := d.kv.Save(ctx, internalSection, key.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.WriteString(writer, value)
|
||||
if err != nil {
|
||||
_ = writer.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return writer.Close()
|
||||
}
|
||||
|
||||
func (d *internalStore) Delete(ctx context.Context, key InternalKey) error {
|
||||
if err := key.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid internal key: %w", err)
|
||||
}
|
||||
|
||||
return d.kv.Delete(ctx, internalSection, key.String())
|
||||
}
|
||||
@@ -0,0 +1,246 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupTestInternalStore(t *testing.T) *internalStore {
|
||||
db := setupTestBadgerDB(t)
|
||||
t.Cleanup(func() {
|
||||
err := db.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
kv := NewBadgerKV(db)
|
||||
return newInternalStore(kv)
|
||||
}
|
||||
|
||||
func TestNewInternalStore(t *testing.T) {
|
||||
store := setupTestInternalStore(t)
|
||||
assert.NotNil(t, store.kv)
|
||||
}
|
||||
|
||||
func TestInternalStore_InternalKey_String(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
internalKey InternalKey
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "basic internal key",
|
||||
internalKey: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
expected: "lastimporttime/apps/resource/default",
|
||||
},
|
||||
{
|
||||
name: "subsection should be lowercased",
|
||||
internalKey: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "LastImportTime",
|
||||
},
|
||||
expected: "lastimporttime/apps/resource/default",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := tt.internalKey.String()
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestInternalStore_InternalKey_Validate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
key InternalKey
|
||||
error error
|
||||
}{
|
||||
{
|
||||
name: "valid key",
|
||||
key: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
error: nil,
|
||||
},
|
||||
{
|
||||
name: "valid key no value",
|
||||
key: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
error: nil,
|
||||
},
|
||||
{
|
||||
name: "empty namespace",
|
||||
key: InternalKey{
|
||||
Namespace: "",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
error: errors.New("namespace '' is invalid: namespace is required"),
|
||||
},
|
||||
{
|
||||
name: "empty group",
|
||||
key: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
error: errors.New("group '' is invalid: group is too short"),
|
||||
},
|
||||
{
|
||||
name: "empty resource",
|
||||
key: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
error: errors.New("resource '' is invalid: resource is too short"),
|
||||
},
|
||||
{
|
||||
name: "empty subsection",
|
||||
key: InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "",
|
||||
},
|
||||
error: errors.New("Subsection '' is invalid: Subsection is required"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.key.Validate()
|
||||
|
||||
if tt.error == nil {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestInternalStore(t *testing.T) {
|
||||
t.Run("Save and Get", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := setupTestInternalStore(t)
|
||||
key := InternalKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
}
|
||||
|
||||
err := store.Save(ctx, key, "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
value, err := store.Get(ctx, key)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "1", value)
|
||||
})
|
||||
|
||||
t.Run("GetSubsection and BatchGet", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := setupTestInternalStore(t)
|
||||
things := []struct {
|
||||
key InternalKey
|
||||
value string
|
||||
}{
|
||||
{
|
||||
key: InternalKey{
|
||||
Namespace: "stacks-1",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
value: "foo1",
|
||||
},
|
||||
{
|
||||
key: InternalKey{
|
||||
Namespace: "stacks-2",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
value: "foo2",
|
||||
},
|
||||
{
|
||||
key: InternalKey{
|
||||
Namespace: "stacks-3",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
value: "foo3",
|
||||
},
|
||||
{
|
||||
key: InternalKey{
|
||||
Namespace: "stacks-4",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
value: "foo4",
|
||||
},
|
||||
{
|
||||
key: InternalKey{
|
||||
Namespace: "stacks-5",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Subsection: "lastimporttime",
|
||||
},
|
||||
value: "foo5",
|
||||
},
|
||||
}
|
||||
|
||||
for _, thing := range things {
|
||||
err := store.Save(ctx, thing.key, thing.value)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var i int
|
||||
for thing, err := range store.GetSubsection(ctx, "lastimporttime") {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, things[i].key, thing)
|
||||
data, err := store.Get(ctx, things[i].key)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data.Value, things[i].value)
|
||||
i++
|
||||
}
|
||||
|
||||
keys := make([]InternalKey, len(things))
|
||||
for _, thing := range things {
|
||||
keys = append(keys, thing.key)
|
||||
}
|
||||
var j int
|
||||
for data, err := range store.BatchGet(ctx, keys) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, things[j].key.Namespace, data.Namespace)
|
||||
require.Equal(t, things[j].key.Group, data.Group)
|
||||
require.Equal(t, things[j].key.Resource, data.Resource)
|
||||
require.Equal(t, things[j].key.Subsection, data.Subsection)
|
||||
require.Equal(t, things[j].value, data.Value)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
@@ -54,8 +55,10 @@ func convertEmptyToClusterNamespace(namespace string, withExperimentalClusterSco
|
||||
type kvStorageBackend struct {
|
||||
snowflake *snowflake.Node
|
||||
kv KV
|
||||
bulkLock *BulkLock
|
||||
dataStore *dataStore
|
||||
eventStore *eventStore
|
||||
internalStore *internalStore
|
||||
notifier *notifier
|
||||
builder DocumentBuilder
|
||||
log logging.Logger
|
||||
@@ -102,8 +105,10 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
||||
|
||||
backend := &kvStorageBackend{
|
||||
kv: kv,
|
||||
bulkLock: NewBulkLock(),
|
||||
dataStore: newDataStore(kv),
|
||||
eventStore: eventStore,
|
||||
internalStore: newInternalStore(kv),
|
||||
notifier: newNotifier(eventStore, notifierOptions{}),
|
||||
snowflake: s,
|
||||
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
|
||||
@@ -1230,12 +1235,240 @@ func (k *kvStorageBackend) GetResourceStats(ctx context.Context, nsr NamespacedR
|
||||
return k.dataStore.GetResourceStats(ctx, nsr.Namespace, minCount)
|
||||
}
|
||||
|
||||
const lastImportTimeSubsection = "lastimporttime"
|
||||
|
||||
func (k *kvStorageBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error] {
|
||||
return func(yield func(ResourceLastImportTime, error) bool) {
|
||||
yield(ResourceLastImportTime{}, fmt.Errorf("not implemented"))
|
||||
for key, err := range k.internalStore.GetSubsection(ctx, lastImportTimeSubsection) {
|
||||
if err != nil {
|
||||
yield(ResourceLastImportTime{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
data, err := k.internalStore.Get(ctx, key)
|
||||
if err != nil {
|
||||
yield(ResourceLastImportTime{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
value, err := time.Parse(time.RFC3339, data.Value)
|
||||
if err != nil {
|
||||
yield(ResourceLastImportTime{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !yield(ResourceLastImportTime{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: data.Namespace,
|
||||
Group: data.Group,
|
||||
Resource: data.Resource,
|
||||
},
|
||||
LastImportTime: value,
|
||||
}, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kvStorageBackend) updateLastImportTime(ctx context.Context, key *resourcepb.ResourceKey, now time.Time) error {
|
||||
dataKey := InternalKey{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Subsection: lastImportTimeSubsection,
|
||||
}
|
||||
|
||||
return k.internalStore.Save(ctx, dataKey, now.UTC().Format(time.RFC3339))
|
||||
}
|
||||
|
||||
func (k *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings, iter BulkRequestIterator) *resourcepb.BulkResponse {
|
||||
// TODO cross-node lock
|
||||
err := k.bulkLock.Start(setting.Collection)
|
||||
if err != nil {
|
||||
return &resourcepb.BulkResponse{
|
||||
Error: AsErrorResult(err),
|
||||
}
|
||||
}
|
||||
defer k.bulkLock.Finish(setting.Collection)
|
||||
|
||||
bulkRvGenerator := newBulkRV()
|
||||
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
|
||||
rsp := &resourcepb.BulkResponse{}
|
||||
|
||||
if setting.RebuildCollection {
|
||||
for _, key := range setting.Collection {
|
||||
events := make([]string, 0)
|
||||
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, 1) {
|
||||
if err != nil {
|
||||
k.log.Error("failed to list event: %s", err)
|
||||
return rsp
|
||||
}
|
||||
|
||||
evtKey, err := ParseEventKey(evtKeyStr)
|
||||
if err != nil {
|
||||
k.log.Error("error parsing event key: %s", err)
|
||||
return rsp
|
||||
}
|
||||
|
||||
if evtKey.Group != key.Group || evtKey.Resource != key.Resource || evtKey.Namespace != key.Namespace {
|
||||
continue
|
||||
}
|
||||
|
||||
events = append(events, evtKeyStr)
|
||||
}
|
||||
|
||||
if err := k.eventStore.batchDelete(ctx, events); err != nil {
|
||||
k.log.Error("failed to delete events: %s", err)
|
||||
return rsp
|
||||
}
|
||||
|
||||
historyKeys := make([]DataKey, 0)
|
||||
|
||||
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
}, SortOrderAsc) {
|
||||
if err != nil {
|
||||
k.log.Error("failed to list collection before delete: %s", err)
|
||||
return rsp
|
||||
}
|
||||
|
||||
historyKeys = append(historyKeys, dataKey)
|
||||
}
|
||||
|
||||
previousCount := int64(len(historyKeys))
|
||||
if err := k.dataStore.batchDelete(ctx, historyKeys); err != nil {
|
||||
k.log.Error("failed to delete collection: %s", err)
|
||||
return rsp
|
||||
}
|
||||
summaries[NSGR(key)] = &resourcepb.BulkResponse_Summary{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
PreviousCount: previousCount,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for _, key := range setting.Collection {
|
||||
summaries[NSGR(key)] = &resourcepb.BulkResponse_Summary{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
obj := &unstructured.Unstructured{}
|
||||
|
||||
saved := make([]DataKey, 0)
|
||||
rollback := func() {
|
||||
// we don't have transactions in the kv store, so we simply delete everything we created
|
||||
err = k.dataStore.batchDelete(ctx, saved)
|
||||
if err != nil {
|
||||
k.log.Error("failed to delete during rollback: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
for iter.Next() {
|
||||
if iter.RollbackRequested() {
|
||||
rollback()
|
||||
break
|
||||
}
|
||||
|
||||
req := iter.Request()
|
||||
if req == nil {
|
||||
rollback()
|
||||
rsp.Error = AsErrorResult(fmt.Errorf("missing request"))
|
||||
break
|
||||
}
|
||||
|
||||
rsp.Processed++
|
||||
|
||||
var action DataAction
|
||||
switch resourcepb.WatchEvent_Type(req.Action) {
|
||||
case resourcepb.WatchEvent_ADDED:
|
||||
action = DataActionCreated
|
||||
// Check if resource already exists for create operations
|
||||
_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
|
||||
Group: req.Key.Group,
|
||||
Resource: req.Key.Resource,
|
||||
Namespace: req.Key.Namespace,
|
||||
Name: req.Key.Name,
|
||||
})
|
||||
if err == nil {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "resource already exists",
|
||||
})
|
||||
continue
|
||||
}
|
||||
if !errors.Is(err, ErrNotFound) {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: fmt.Sprintf("failed to check if resource exists: %s", err),
|
||||
})
|
||||
continue
|
||||
}
|
||||
case resourcepb.WatchEvent_MODIFIED:
|
||||
action = DataActionUpdated
|
||||
case resourcepb.WatchEvent_DELETED:
|
||||
action = DataActionDeleted
|
||||
default:
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "invalid event type",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
err := obj.UnmarshalJSON(req.Value)
|
||||
if err != nil {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "unable to unmarshal json",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
dataKey := DataKey{
|
||||
Group: req.Key.Group,
|
||||
Resource: req.Key.Resource,
|
||||
Namespace: req.Key.Namespace,
|
||||
Name: req.Key.Name,
|
||||
ResourceVersion: bulkRvGenerator.next(obj),
|
||||
Action: action,
|
||||
Folder: req.Folder,
|
||||
}
|
||||
err = k.dataStore.Save(ctx, dataKey, bytes.NewReader(req.Value))
|
||||
if err != nil {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: fmt.Sprintf("failed to save resource: %s", err),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
saved = append(saved, dataKey)
|
||||
}
|
||||
|
||||
for _, key := range setting.Collection {
|
||||
if err := k.updateLastImportTime(ctx, key, time.Now()); err != nil {
|
||||
rollback()
|
||||
rsp.Error = AsErrorResult(err)
|
||||
return rsp
|
||||
}
|
||||
}
|
||||
|
||||
return rsp
|
||||
}
|
||||
|
||||
// readAndClose reads all data from a ReadCloser and ensures it's closed,
|
||||
// combining any errors from both operations.
|
||||
func readAndClose(r io.ReadCloser) ([]byte, error) {
|
||||
|
||||
@@ -1997,3 +1997,68 @@ func TestKvStorageBackend_ClusterScopedResources(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKvStorageBackend_ResourceLastImportTime(t *testing.T) {
|
||||
backend := setupTestStorageBackend(t)
|
||||
ctx := context.Background()
|
||||
|
||||
lastImportTimes := []ResourceLastImportTime{
|
||||
{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: "stacks-1",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
},
|
||||
LastImportTime: time.Now().Add(-2 * time.Minute).UTC().Truncate(time.Microsecond),
|
||||
},
|
||||
{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: "stacks-2",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
},
|
||||
LastImportTime: time.Now().Add(-7 * time.Minute).UTC().Truncate(time.Microsecond),
|
||||
},
|
||||
{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: "stacks-3",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
},
|
||||
LastImportTime: time.Now().Add(-3 * time.Minute).UTC().Truncate(time.Microsecond),
|
||||
},
|
||||
{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: "stacks-4",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
},
|
||||
LastImportTime: time.Now().Add(-24 * time.Minute).UTC().Truncate(time.Microsecond),
|
||||
},
|
||||
{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: "stacks-5",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
},
|
||||
LastImportTime: time.Now().Add(-44 * time.Minute).UTC().Truncate(time.Microsecond),
|
||||
},
|
||||
}
|
||||
|
||||
for _, lastImportTime := range lastImportTimes {
|
||||
err := backend.updateLastImportTime(ctx, &resourcepb.ResourceKey{
|
||||
Namespace: lastImportTime.Namespace,
|
||||
Group: lastImportTime.Group,
|
||||
Resource: lastImportTime.Resource,
|
||||
}, lastImportTime.LastImportTime)
|
||||
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var i int
|
||||
for lastImportTime, err := range backend.GetResourceLastImportTimes(ctx) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastImportTimes[i], lastImportTime)
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestBadgerKVStorageBackend(t *testing.T) {
|
||||
TestBlobSupport: true,
|
||||
TestListModifiedSince: true,
|
||||
// Badger does not support bulk import yet.
|
||||
TestGetResourceLastImportTime: true,
|
||||
// TestGetResourceLastImportTime: true,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user