Compare commits

...

19 Commits

Author SHA1 Message Date
Will Assis 098c924a93 lint 2025-12-03 13:40:23 -03:00
Will Assis 3f2f46822b lint 2025-12-03 12:10:25 -03:00
Will Assis 9e183c262f implement last import time 2025-12-03 12:07:58 -03:00
Will Assis d757da0cc6 rename to internalstore 2025-12-02 15:23:37 -03:00
Will Assis 5fc89089d4 lint 2025-12-02 15:18:05 -03:00
Will Assis 4c77b7b493 implement last import time 2025-12-02 15:18:03 -03:00
Will Assis 96e57b270d implement GetAll in metadatastore 2025-12-02 15:17:04 -03:00
Will Assis 50393d3a63 rename settingstore to metadatastore 2025-12-02 15:17:04 -03:00
Will Assis 57dbb993af yo 2025-12-02 15:17:04 -03:00
Will Assis 08e9908d27 setup settingstore 2025-12-02 15:17:03 -03:00
Will Assis 8212473330 fix 2025-12-02 14:51:09 -03:00
Will Assis 49b3413c04 Merge branch 'main' of github.com:grafana/grafana into unified-storage-kvstore-bulk-import-support 2025-12-02 14:43:58 -03:00
Will Assis 20fe9b455d convert RVs to snowflake 2025-12-02 14:37:30 -03:00
Will Assis 6d7ecc500c revert sql/bulk.go changes 2025-12-02 13:11:33 -03:00
Will Assis c96c4d1e09 use batch delete when rolling back bulkimport 2025-12-01 14:59:46 -03:00
Will Assis f45cf6bdf8 fmt 2025-11-12 17:52:04 -03:00
Will Assis e6b2a6cea3 implement bulkprocess in kv storage_backend 2025-11-12 16:21:59 -03:00
Will Assis cd15d73e32 batchdelete to datastore 2025-11-12 15:44:04 -03:00
Will Assis 821c553132 move bulkLock and bulkRv logic from sql into resource package 2025-11-11 17:14:40 -03:00
8 changed files with 916 additions and 2 deletions
+109
View File
@@ -6,11 +6,16 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/bwmarrin/snowflake"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
@@ -328,3 +333,107 @@ func (b *batchRunner) RollbackRequested() bool {
}
return false
}
// getDummySnowflake builds a snowflake-format value for ts with the machine
// and counter bits zeroed: the millisecond timestamp occupies the top bits
// (shifted past the 22 low machine+step bits).
func getDummySnowflake(ts time.Time) int64 {
	millis := ts.UnixMilli() - snowflake.Epoch
	return millis << 22
}
// bulkRV generates snowflake-shaped resource versions anchored in the past
// for bulk imports, disambiguating same-millisecond values with a counter.
type bulkRV struct {
	max      int64           // highest timestamp component (snowflake format) usable/issued so far
	counters map[int64]int64 // per-timestamp counter placed in the low bits to keep RVs unique
}
// newBulkRV creates a generator of snowflake-shaped resource versions for
// bulk imports. It records "now" (in snowflake format) as the ceiling so that
// generated RVs always land in the past relative to live snowflake RVs.
func newBulkRV() *bulkRV {
	now := getDummySnowflake(time.Now())
	return &bulkRV{
		counters: map[int64]int64{},
		max:      now,
	}
}
// next returns a snowflake-shaped resource version for obj, derived from the
// object's updated-timestamp annotation (falling back to its creation time)
// so that bulk-imported RVs sort in historical order. Values within the same
// millisecond are disambiguated with a counter in the low bits.
func (x *bulkRV) next(obj metav1.Object) int64 {
	ts := getDummySnowflake(obj.GetCreationTimestamp().Time)
	anno := obj.GetAnnotations()
	if anno != nil {
		// Prefer the updated timestamp when present and parseable; otherwise
		// silently keep the creation-time value.
		v := anno[utils.AnnoKeyUpdatedTimestamp]
		t, err := time.Parse(time.RFC3339, v)
		if err == nil {
			ts = getDummySnowflake(t)
		}
	}
	// Clamp future or negative timestamps to the generator's starting ceiling
	// so bulk RVs never sort ahead of RVs issued by the live snowflake node.
	if ts > x.max || ts < 0 {
		ts = x.max
	}
	counter := x.counters[ts]
	counter++
	if counter > 65535 {
		// This millisecond slot is exhausted; walk forward millisecond by
		// millisecond until one with room is found.
		// NOTE(review): 65535 keeps counters in 16 bits, well under the 22
		// low bits available — confirm this bound is intentional.
		for {
			ts += (1 << 22) // Add 1ms in snowflake format
			if x.counters[ts] < 65535 {
				break
			}
		}
		counter = x.counters[ts] + 1
		if ts > x.max {
			x.max = ts
		}
	}
	x.counters[ts] = counter
	return ts + counter
}
// BulkLock tracks which namespace/group/resource collections currently have a
// bulk operation in flight. It is in-process only — there is no cross-node
// coordination (see the TODO in ProcessBulk).
type BulkLock struct {
	running map[string]bool // keyed by NSGR(key); true while a bulk op is active
	mu      sync.Mutex      // guards running
}
// NewBulkLock returns an empty, ready-to-use BulkLock.
func NewBulkLock() *BulkLock {
	lock := &BulkLock{}
	lock.running = map[string]bool{}
	return lock
}
// Start atomically marks every key in the collection as having a bulk
// operation in flight. If any key is already running it returns a
// 412-precondition-failed error and marks nothing.
func (x *BulkLock) Start(keys []*resourcepb.ResourceKey) error {
	x.mu.Lock()
	defer x.mu.Unlock()

	// Verify none of the keys are busy before touching any state.
	ids := make([]string, 0, len(keys))
	for _, key := range keys {
		id := NSGR(key)
		if x.running[id] {
			return &apierrors.StatusError{ErrStatus: metav1.Status{
				Code:    http.StatusPreconditionFailed,
				Message: "bulk export is already running",
			}}
		}
		ids = append(ids, id)
	}

	// All clear — mark every key as running.
	for _, id := range ids {
		x.running[id] = true
	}
	return nil
}
// Finish releases the lock for every key in the collection.
func (x *BulkLock) Finish(keys []*resourcepb.ResourceKey) {
	x.mu.Lock()
	defer x.mu.Unlock()
	for _, key := range keys {
		id := NSGR(key)
		delete(x.running, id)
	}
}
// Active reports whether any bulk operation is currently in flight.
func (x *BulkLock) Active() bool {
	x.mu.Lock()
	active := len(x.running) > 0
	x.mu.Unlock()
	return active
}
+21
View File
@@ -537,6 +537,27 @@ func (d *dataStore) Delete(ctx context.Context, key DataKey) error {
return d.kv.Delete(ctx, dataSection, key.String())
}
// batchDelete removes all given keys from the data section, issuing the
// underlying BatchDelete in chunks of at most dataBatchSize keys.
func (n *dataStore) batchDelete(ctx context.Context, keys []DataKey) error {
	for len(keys) > 0 {
		batch := keys
		if len(batch) > dataBatchSize {
			batch = batch[:dataBatchSize]
		}
		keys = keys[len(batch):]

		// Bug fix: this slice was previously created with len(batch)
		// zero-value entries and then appended to, producing a slice of
		// twice the intended length whose first half was empty strings —
		// those empty keys were sent to BatchDelete. Fill by index instead.
		stringKeys := make([]string, len(batch))
		for i, dataKey := range batch {
			stringKeys[i] = dataKey.String()
		}
		if err := n.kv.BatchDelete(ctx, dataSection, stringKeys); err != nil {
			return err
		}
	}
	return nil
}
// ParseKey parses a string key into a DataKey struct
func ParseKey(key string) (DataKey, error) {
parts := strings.Split(key, "/")
@@ -2950,6 +2950,42 @@ func TestDataStore_getGroupResources(t *testing.T) {
}
}
// TestDataStore_BatchDelete saves more resources than fit in a single delete
// batch, removes them all via batchDelete, and verifies none remain.
func TestDataStore_BatchDelete(t *testing.T) {
	ds := setupTestDataStore(t)
	ctx := context.Background()

	const total = 95
	keys := make([]DataKey, total)
	for i := 0; i < total; i++ {
		keys[i] = DataKey{
			Namespace:       "test-namespace",
			Group:           "test-group",
			Resource:        "test-resource",
			Name:            fmt.Sprintf("test-name-%d", i),
			ResourceVersion: node.Generate().Int64(),
			Action:          DataActionCreated,
			Folder:          "test-folder",
		}
		body := fmt.Sprintf("test-value-%d", i)
		require.NoError(t, ds.Save(ctx, keys[i], bytes.NewReader([]byte(body))))
	}

	require.NoError(t, ds.batchDelete(ctx, keys))

	// Verify all events were deleted
	for i := 0; i < total; i++ {
		_, err := ds.Get(ctx, DataKey{
			Namespace: "test-namespace",
			Group:     "test-group",
			Resource:  "test-resource",
			Name:      fmt.Sprintf("test-name-%d", i),
		})
		require.Error(t, err, "Resource should have been deleted")
	}
}
func TestDataStore_BatchGet(t *testing.T) {
ds := setupTestDataStore(t)
ctx := context.Background()
@@ -0,0 +1,204 @@
package resource
import (
"context"
"fmt"
"io"
"iter"
"strings"
"github.com/grafana/grafana/pkg/apimachinery/validation"
)
const (
	// internalSection is the KV section that holds internal bookkeeping
	// entries (e.g. last bulk-import times), separate from user resources.
	internalSection = "unified/internal"
)
// internalStore persists internal bookkeeping values in the shared KV store
// under the internalSection prefix.
type internalStore struct {
	kv KV
}
type InternalKey struct {
Namespace string
Group string
Resource string
Subsection string
}
func (k InternalKey) String() string {
return fmt.Sprintf("%s/%s/%s/%s", strings.ToLower(k.Subsection), k.Group, k.Resource, k.Namespace)
}
// Validate checks that every part of the key is present and well formed.
// Namespace and Subsection are only checked for non-emptiness; Group and
// Resource go through the shared apimachinery validation rules.
// NOTE(review): Subsection's case is normalized by String, not rejected here.
func (k InternalKey) Validate() error {
	if k.Namespace == "" {
		return NewValidationError("namespace", k.Namespace, ErrNamespaceRequired)
	}
	if k.Subsection == "" {
		return NewValidationError("Subsection", k.Subsection, "Subsection is required")
	}
	if err := validation.IsValidGroup(k.Group); err != nil {
		// IsValidGroup returns a list of messages; surface the first one.
		return NewValidationError("group", k.Group, err[0])
	}
	if err := validation.IsValidResource(k.Resource); err != nil {
		return NewValidationError("resource", k.Resource, err[0])
	}
	return nil
}
// parseInternalKey parses a serialized internal key back into its parts.
// The expected layout matches InternalKey.String:
// "subsection/group/resource/namespace".
func parseInternalKey(key string) (InternalKey, error) {
	parts := strings.Split(key, "/")
	// Bug fix: InternalKey.String produces exactly 4 slash-separated
	// segments, but this previously required 5, rejecting every key the
	// store itself had written.
	if len(parts) != 4 {
		return InternalKey{}, fmt.Errorf("invalid internal key: %s", key)
	}
	return InternalKey{
		Subsection: parts[0],
		Group:      parts[1],
		Resource:   parts[2],
		Namespace:  parts[3],
	}, nil
}
// newInternalStore wraps the given KV in an internalStore.
func newInternalStore(kv KV) *internalStore {
	store := &internalStore{kv: kv}
	return store
}
// InternalData is one internal-store entry: the identifying key parts plus
// the stored string value.
type InternalData struct {
	Namespace  string
	Group      string
	Resource   string
	Subsection string
	Value      string // raw stored value (e.g. an RFC3339 timestamp for lastimporttime)
}
// Get reads a single internal entry and returns it together with the
// identifying key parts. Returns the underlying KV error (e.g. not-found)
// when the key does not exist.
func (d *internalStore) Get(ctx context.Context, key InternalKey) (InternalData, error) {
	if err := key.Validate(); err != nil {
		return InternalData{}, fmt.Errorf("invalid internal key: %w", err)
	}
	reader, err := d.kv.Get(ctx, internalSection, key.String())
	if err != nil {
		// Bug fix: Close was previously deferred before this error check,
		// which panics on a nil reader whenever the lookup fails.
		return InternalData{}, err
	}
	defer func() { _ = reader.Close() }()
	value, err := io.ReadAll(reader)
	if err != nil {
		return InternalData{}, err
	}
	return InternalData{
		Namespace:  key.Namespace,
		Group:      key.Group,
		Resource:   key.Resource,
		Subsection: key.Subsection,
		Value:      string(value),
	}, nil
}
// BatchGet streams the entries for the given keys. All keys are validated up
// front; the iterator stops at the first error encountered (validation,
// lookup, or parse).
func (d *internalStore) BatchGet(ctx context.Context, keys []InternalKey) iter.Seq2[InternalData, error] {
	return func(yield func(InternalData, error) bool) {
		// Fail fast on any malformed key before touching the KV store.
		for _, key := range keys {
			if err := key.Validate(); err != nil {
				yield(InternalData{}, fmt.Errorf("invalid internal key %s: %w", key.String(), err))
				return
			}
		}
		// Process keys in batches. Uses same batch size as datastore.go
		for i := 0; i < len(keys); i += dataBatchSize {
			end := i + dataBatchSize
			if end > len(keys) {
				end = len(keys)
			}
			batch := keys[i:end]
			stringKeys := make([]string, len(batch))
			for j, key := range batch {
				stringKeys[j] = key.String()
			}
			for kv, err := range d.kv.BatchGet(ctx, internalSection, stringKeys) {
				if err != nil {
					yield(InternalData{}, err)
					return
				}
				// Recover the key parts from the returned key string rather
				// than assuming result order matches the request order.
				key, err := parseInternalKey(kv.Key)
				if err != nil {
					yield(InternalData{}, err)
					return
				}
				value, err := io.ReadAll(kv.Value)
				if err != nil {
					yield(InternalData{}, err)
					return
				}
				if !yield(InternalData{
					Namespace:  key.Namespace,
					Group:      key.Group,
					Resource:   key.Resource,
					Subsection: key.Subsection,
					Value:      string(value),
				}, nil) {
					return
				}
			}
		}
	}
}
// GetSubsection lists every internal key stored under the given subsection
// (e.g. "lastimporttime"), in ascending key order.
//
// NOTE(review): this relies on ListOptions.StartKey acting as a prefix/range
// bound for the scan; confirm that keys belonging to lexically later
// subsections are not also returned.
func (d *internalStore) GetSubsection(ctx context.Context, subsection string) iter.Seq2[InternalKey, error] {
	opts := ListOptions{
		Sort: SortOrderAsc,
		// Keys are serialized with a lowercased subsection (see
		// InternalKey.String), so normalize the prefix to match.
		StartKey: strings.ToLower(subsection),
	}
	return func(yield func(InternalKey, error) bool) {
		for key, err := range d.kv.Keys(ctx, internalSection, opts) {
			if err != nil {
				yield(InternalKey{}, err)
				return
			}
			internalKey, err := parseInternalKey(key)
			if err != nil {
				yield(InternalKey{}, err)
				return
			}
			if !yield(internalKey, nil) {
				return
			}
		}
	}
}
// Save writes (or overwrites) the value stored at key.
func (d *internalStore) Save(ctx context.Context, key InternalKey, value string) error {
	if err := key.Validate(); err != nil {
		return fmt.Errorf("invalid internal key: %w", err)
	}
	writer, err := d.kv.Save(ctx, internalSection, key.String())
	if err != nil {
		return err
	}
	if _, werr := io.WriteString(writer, value); werr != nil {
		// Best-effort close; the write error takes precedence.
		_ = writer.Close()
		return werr
	}
	return writer.Close()
}
// Delete removes the entry stored at key, validating the key first.
func (d *internalStore) Delete(ctx context.Context, key InternalKey) error {
	err := key.Validate()
	if err != nil {
		return fmt.Errorf("invalid internal key: %w", err)
	}
	return d.kv.Delete(ctx, internalSection, key.String())
}
@@ -0,0 +1,246 @@
package resource
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// setupTestInternalStore creates an internalStore backed by a temporary
// Badger database that is closed when the test finishes.
func setupTestInternalStore(t *testing.T) *internalStore {
	db := setupTestBadgerDB(t)
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})
	return newInternalStore(NewBadgerKV(db))
}
// TestNewInternalStore checks that the constructor wires up the KV backend.
func TestNewInternalStore(t *testing.T) {
	s := setupTestInternalStore(t)
	assert.NotNil(t, s.kv)
}
// TestInternalStore_InternalKey_String verifies key serialization, including
// that the subsection segment is lowercased.
func TestInternalStore_InternalKey_String(t *testing.T) {
	cases := []struct {
		name string
		key  InternalKey
		want string
	}{
		{
			name: "basic internal key",
			key: InternalKey{
				Namespace:  "default",
				Group:      "apps",
				Resource:   "resource",
				Subsection: "lastimporttime",
			},
			want: "lastimporttime/apps/resource/default",
		},
		{
			name: "subsection should be lowercased",
			key: InternalKey{
				Namespace:  "default",
				Group:      "apps",
				Resource:   "resource",
				Subsection: "LastImportTime",
			},
			want: "lastimporttime/apps/resource/default",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, tc.key.String())
		})
	}
}
// TestInternalStore_InternalKey_Validate checks which keys pass validation.
// Only the presence/absence of an error is asserted; the expected messages
// are kept as documentation of the validation output.
func TestInternalStore_InternalKey_Validate(t *testing.T) {
	cases := []struct {
		name    string
		key     InternalKey
		wantErr error
	}{
		{
			name: "valid key",
			key: InternalKey{
				Namespace:  "default",
				Group:      "apps",
				Resource:   "resource",
				Subsection: "lastimporttime",
			},
			wantErr: nil,
		},
		{
			name: "valid key no value",
			key: InternalKey{
				Namespace:  "default",
				Group:      "apps",
				Resource:   "resource",
				Subsection: "lastimporttime",
			},
			wantErr: nil,
		},
		{
			name: "empty namespace",
			key: InternalKey{
				Namespace:  "",
				Group:      "apps",
				Resource:   "resource",
				Subsection: "lastimporttime",
			},
			wantErr: errors.New("namespace '' is invalid: namespace is required"),
		},
		{
			name: "empty group",
			key: InternalKey{
				Namespace:  "default",
				Group:      "",
				Resource:   "resource",
				Subsection: "lastimporttime",
			},
			wantErr: errors.New("group '' is invalid: group is too short"),
		},
		{
			name: "empty resource",
			key: InternalKey{
				Namespace:  "default",
				Group:      "apps",
				Resource:   "",
				Subsection: "lastimporttime",
			},
			wantErr: errors.New("resource '' is invalid: resource is too short"),
		},
		{
			name: "empty subsection",
			key: InternalKey{
				Namespace:  "default",
				Group:      "apps",
				Resource:   "resource",
				Subsection: "",
			},
			wantErr: errors.New("Subsection '' is invalid: Subsection is required"),
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.key.Validate()
			if tc.wantErr == nil {
				assert.NoError(t, err)
				return
			}
			assert.Error(t, err)
		})
	}
}
// TestInternalStore exercises Save/Get round-trips plus the GetSubsection and
// BatchGet iterators against a real Badger-backed store.
func TestInternalStore(t *testing.T) {
	t.Run("Save and Get", func(t *testing.T) {
		ctx := context.Background()
		store := setupTestInternalStore(t)
		key := InternalKey{
			Namespace:  "default",
			Group:      "apps",
			Resource:   "resource",
			Subsection: "lastimporttime",
		}
		err := store.Save(ctx, key, "1")
		require.NoError(t, err)
		data, err := store.Get(ctx, key)
		require.NoError(t, err)
		// Bug fix: Get returns an InternalData struct, not a string; compare
		// against its Value field.
		assert.Equal(t, "1", data.Value)
	})
	t.Run("GetSubsection and BatchGet", func(t *testing.T) {
		ctx := context.Background()
		store := setupTestInternalStore(t)
		things := []struct {
			key   InternalKey
			value string
		}{
			{
				key: InternalKey{
					Namespace:  "stacks-1",
					Group:      "apps",
					Resource:   "resource",
					Subsection: "lastimporttime",
				},
				value: "foo1",
			},
			{
				key: InternalKey{
					Namespace:  "stacks-2",
					Group:      "apps",
					Resource:   "resource",
					Subsection: "lastimporttime",
				},
				value: "foo2",
			},
			{
				key: InternalKey{
					Namespace:  "stacks-3",
					Group:      "apps",
					Resource:   "resource",
					Subsection: "lastimporttime",
				},
				value: "foo3",
			},
			{
				key: InternalKey{
					Namespace:  "stacks-4",
					Group:      "apps",
					Resource:   "resource",
					Subsection: "lastimporttime",
				},
				value: "foo4",
			},
			{
				key: InternalKey{
					Namespace:  "stacks-5",
					Group:      "apps",
					Resource:   "resource",
					Subsection: "lastimporttime",
				},
				value: "foo5",
			},
		}
		for _, thing := range things {
			err := store.Save(ctx, thing.key, thing.value)
			require.NoError(t, err)
		}
		var i int
		for thing, err := range store.GetSubsection(ctx, "lastimporttime") {
			require.NoError(t, err)
			require.Equal(t, things[i].key, thing)
			data, err := store.Get(ctx, things[i].key)
			require.NoError(t, err)
			// (expected, actual) — the original had the arguments swapped.
			require.Equal(t, things[i].value, data.Value)
			i++
		}
		// Make sure the iterator actually produced every entry.
		require.Equal(t, len(things), i)

		// Bug fix: the keys slice was previously created with make(len) and
		// then appended to, so its first half was zero-value keys that fail
		// validation inside BatchGet.
		keys := make([]InternalKey, 0, len(things))
		for _, thing := range things {
			keys = append(keys, thing.key)
		}
		var j int
		for data, err := range store.BatchGet(ctx, keys) {
			require.NoError(t, err)
			require.Equal(t, things[j].key.Namespace, data.Namespace)
			require.Equal(t, things[j].key.Group, data.Group)
			require.Equal(t, things[j].key.Resource, data.Resource)
			require.Equal(t, things[j].key.Subsection, data.Subsection)
			require.Equal(t, things[j].value, data.Value)
			// Bug fix: j was never incremented, so every result was compared
			// against things[0].
			j++
		}
		require.Equal(t, len(things), j)
	})
}
+234 -1
View File
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
@@ -54,8 +55,10 @@ func convertEmptyToClusterNamespace(namespace string, withExperimentalClusterSco
type kvStorageBackend struct {
snowflake *snowflake.Node
kv KV
bulkLock *BulkLock
dataStore *dataStore
eventStore *eventStore
internalStore *internalStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
@@ -102,8 +105,10 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
backend := &kvStorageBackend{
kv: kv,
bulkLock: NewBulkLock(),
dataStore: newDataStore(kv),
eventStore: eventStore,
internalStore: newInternalStore(kv),
notifier: newNotifier(eventStore, notifierOptions{}),
snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
@@ -1230,12 +1235,240 @@ func (k *kvStorageBackend) GetResourceStats(ctx context.Context, nsr NamespacedR
return k.dataStore.GetResourceStats(ctx, nsr.Namespace, minCount)
}
const lastImportTimeSubsection = "lastimporttime"
// GetResourceLastImportTimes streams the recorded last bulk-import time of
// every namespaced resource, by scanning the "lastimporttime" subsection of
// the internal store. Iteration stops at the first error.
func (k *kvStorageBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error] {
	return func(yield func(ResourceLastImportTime, error) bool) {
		// Bug fix: a leftover stub line unconditionally yielded a
		// "not implemented" error before the loop, so every caller received
		// a spurious error as the first item.
		for key, err := range k.internalStore.GetSubsection(ctx, lastImportTimeSubsection) {
			if err != nil {
				yield(ResourceLastImportTime{}, err)
				return
			}
			data, err := k.internalStore.Get(ctx, key)
			if err != nil {
				yield(ResourceLastImportTime{}, err)
				return
			}
			// Values are stored as RFC3339 (second precision) by
			// updateLastImportTime.
			value, err := time.Parse(time.RFC3339, data.Value)
			if err != nil {
				yield(ResourceLastImportTime{}, err)
				return
			}
			if !yield(ResourceLastImportTime{
				NamespacedResource: NamespacedResource{
					Namespace: data.Namespace,
					Group:     data.Group,
					Resource:  data.Resource,
				},
				LastImportTime: value,
			}, nil) {
				return
			}
		}
	}
}
// updateLastImportTime records `now` (UTC, RFC3339 — i.e. second precision)
// as the last bulk-import time of the given namespaced resource.
func (k *kvStorageBackend) updateLastImportTime(ctx context.Context, key *resourcepb.ResourceKey, now time.Time) error {
	internalKey := InternalKey{
		Namespace:  key.Namespace,
		Group:      key.Group,
		Resource:   key.Resource,
		Subsection: lastImportTimeSubsection,
	}
	value := now.UTC().Format(time.RFC3339)
	return k.internalStore.Save(ctx, internalKey, value)
}
// ProcessBulk ingests a stream of write requests as a single bulk import.
// It takes an in-process lock over the target collections, optionally wipes
// each collection first (RebuildCollection), writes each request with a
// past-dated snowflake RV, and on failure deletes everything it created
// (there are no KV transactions). The last-import time of every collection
// is recorded at the end.
//
// NOTE(review): the `iter` parameter shadows the `iter` package used
// elsewhere in this file — consider renaming.
func (k *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings, iter BulkRequestIterator) *resourcepb.BulkResponse {
	// TODO cross-node lock
	err := k.bulkLock.Start(setting.Collection)
	if err != nil {
		return &resourcepb.BulkResponse{
			Error: AsErrorResult(err),
		}
	}
	defer k.bulkLock.Finish(setting.Collection)
	bulkRvGenerator := newBulkRV()
	// NOTE(review): summaries is populated below but never attached to rsp in
	// this function — presumably it should end up in the response; confirm.
	summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
	rsp := &resourcepb.BulkResponse{}
	if setting.RebuildCollection {
		// Wipe each target collection (its events and its data history)
		// before importing.
		// NOTE(review): the error paths in this branch only log and return
		// rsp without setting rsp.Error — the caller cannot tell it failed.
		for _, key := range setting.Collection {
			events := make([]string, 0)
			// Collect every stored event belonging to this collection.
			for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, 1) {
				if err != nil {
					k.log.Error("failed to list event: %s", err)
					return rsp
				}
				evtKey, err := ParseEventKey(evtKeyStr)
				if err != nil {
					k.log.Error("error parsing event key: %s", err)
					return rsp
				}
				if evtKey.Group != key.Group || evtKey.Resource != key.Resource || evtKey.Namespace != key.Namespace {
					continue
				}
				events = append(events, evtKeyStr)
			}
			if err := k.eventStore.batchDelete(ctx, events); err != nil {
				k.log.Error("failed to delete events: %s", err)
				return rsp
			}
			// Collect and delete all existing data keys for the collection.
			historyKeys := make([]DataKey, 0)
			for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{
				Namespace: key.Namespace,
				Group:     key.Group,
				Resource:  key.Resource,
			}, SortOrderAsc) {
				if err != nil {
					k.log.Error("failed to list collection before delete: %s", err)
					return rsp
				}
				historyKeys = append(historyKeys, dataKey)
			}
			previousCount := int64(len(historyKeys))
			if err := k.dataStore.batchDelete(ctx, historyKeys); err != nil {
				k.log.Error("failed to delete collection: %s", err)
				return rsp
			}
			summaries[NSGR(key)] = &resourcepb.BulkResponse_Summary{
				Namespace:     key.Namespace,
				Group:         key.Group,
				Resource:      key.Resource,
				PreviousCount: previousCount,
			}
		}
	} else {
		for _, key := range setting.Collection {
			summaries[NSGR(key)] = &resourcepb.BulkResponse_Summary{
				Namespace: key.Namespace,
				Group:     key.Group,
				Resource:  key.Resource,
			}
		}
	}
	// obj is reused across iterations purely to derive the RV timestamp from
	// each request's JSON payload.
	obj := &unstructured.Unstructured{}
	saved := make([]DataKey, 0)
	rollback := func() {
		// we don't have transactions in the kv store, so we simply delete everything we created
		err = k.dataStore.batchDelete(ctx, saved)
		if err != nil {
			k.log.Error("failed to delete during rollback: %s", err)
		}
	}
	for iter.Next() {
		if iter.RollbackRequested() {
			// NOTE(review): after rollback the loop breaks but execution
			// still falls through to the updateLastImportTime loop below —
			// confirm recording an import time after rollback is intended.
			rollback()
			break
		}
		req := iter.Request()
		if req == nil {
			rollback()
			rsp.Error = AsErrorResult(fmt.Errorf("missing request"))
			break
		}
		rsp.Processed++
		var action DataAction
		switch resourcepb.WatchEvent_Type(req.Action) {
		case resourcepb.WatchEvent_ADDED:
			action = DataActionCreated
			// Check if resource already exists for create operations
			_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
				Group:     req.Key.Group,
				Resource:  req.Key.Resource,
				Namespace: req.Key.Namespace,
				Name:      req.Key.Name,
			})
			if err == nil {
				rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
					Key:    req.Key,
					Action: req.Action,
					Error:  "resource already exists",
				})
				continue
			}
			// Only a not-found error means the create may proceed.
			if !errors.Is(err, ErrNotFound) {
				rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
					Key:    req.Key,
					Action: req.Action,
					Error:  fmt.Sprintf("failed to check if resource exists: %s", err),
				})
				continue
			}
		case resourcepb.WatchEvent_MODIFIED:
			action = DataActionUpdated
		case resourcepb.WatchEvent_DELETED:
			action = DataActionDeleted
		default:
			rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
				Key:    req.Key,
				Action: req.Action,
				Error:  "invalid event type",
			})
			continue
		}
		err := obj.UnmarshalJSON(req.Value)
		if err != nil {
			rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
				Key:    req.Key,
				Action: req.Action,
				Error:  "unable to unmarshal json",
			})
			continue
		}
		// The RV is a past-dated snowflake derived from the object's
		// creation/update time (see bulkRV.next).
		dataKey := DataKey{
			Group:           req.Key.Group,
			Resource:        req.Key.Resource,
			Namespace:       req.Key.Namespace,
			Name:            req.Key.Name,
			ResourceVersion: bulkRvGenerator.next(obj),
			Action:          action,
			Folder:          req.Folder,
		}
		err = k.dataStore.Save(ctx, dataKey, bytes.NewReader(req.Value))
		if err != nil {
			rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
				Key:    req.Key,
				Action: req.Action,
				Error:  fmt.Sprintf("failed to save resource: %s", err),
			})
			continue
		}
		// Track successful writes so rollback can undo them.
		saved = append(saved, dataKey)
	}
	// Record the import time for every target collection.
	for _, key := range setting.Collection {
		if err := k.updateLastImportTime(ctx, key, time.Now()); err != nil {
			rollback()
			rsp.Error = AsErrorResult(err)
			return rsp
		}
	}
	return rsp
}
// readAndClose reads all data from a ReadCloser and ensures it's closed,
// combining any errors from both operations.
func readAndClose(r io.ReadCloser) ([]byte, error) {
@@ -1997,3 +1997,68 @@ func TestKvStorageBackend_ClusterScopedResources(t *testing.T) {
}
}
}
// TestKvStorageBackend_ResourceLastImportTime records last-import times for
// several namespaces and verifies they round-trip through
// GetResourceLastImportTimes in ascending key (namespace) order.
//
// Bug fix: values are persisted via RFC3339, which carries only second
// precision, so expectations must be truncated to time.Second — truncating to
// microsecond made the comparison fail whenever time.Now() had a sub-second
// component.
func TestKvStorageBackend_ResourceLastImportTime(t *testing.T) {
	backend := setupTestStorageBackend(t)
	ctx := context.Background()
	lastImportTimes := []ResourceLastImportTime{
		{
			NamespacedResource: NamespacedResource{
				Namespace: "stacks-1",
				Group:     "apps",
				Resource:  "resource",
			},
			LastImportTime: time.Now().Add(-2 * time.Minute).UTC().Truncate(time.Second),
		},
		{
			NamespacedResource: NamespacedResource{
				Namespace: "stacks-2",
				Group:     "apps",
				Resource:  "resource",
			},
			LastImportTime: time.Now().Add(-7 * time.Minute).UTC().Truncate(time.Second),
		},
		{
			NamespacedResource: NamespacedResource{
				Namespace: "stacks-3",
				Group:     "apps",
				Resource:  "resource",
			},
			LastImportTime: time.Now().Add(-3 * time.Minute).UTC().Truncate(time.Second),
		},
		{
			NamespacedResource: NamespacedResource{
				Namespace: "stacks-4",
				Group:     "apps",
				Resource:  "resource",
			},
			LastImportTime: time.Now().Add(-24 * time.Minute).UTC().Truncate(time.Second),
		},
		{
			NamespacedResource: NamespacedResource{
				Namespace: "stacks-5",
				Group:     "apps",
				Resource:  "resource",
			},
			LastImportTime: time.Now().Add(-44 * time.Minute).UTC().Truncate(time.Second),
		},
	}
	for _, lastImportTime := range lastImportTimes {
		err := backend.updateLastImportTime(ctx, &resourcepb.ResourceKey{
			Namespace: lastImportTime.Namespace,
			Group:     lastImportTime.Group,
			Resource:  lastImportTime.Resource,
		}, lastImportTime.LastImportTime)
		require.NoError(t, err)
	}
	var i int
	for lastImportTime, err := range backend.GetResourceLastImportTimes(ctx) {
		require.NoError(t, err)
		require.Equal(t, lastImportTimes[i], lastImportTime)
		i++
	}
	// Ensure the iterator yielded every recorded entry.
	require.Equal(t, len(lastImportTimes), i)
}
@@ -31,7 +31,7 @@ func TestBadgerKVStorageBackend(t *testing.T) {
TestBlobSupport: true,
TestListModifiedSince: true,
// Badger does not support bulk import yet.
TestGetResourceLastImportTime: true,
// TestGetResourceLastImportTime: true,
},
})
}