kvstore: add cluster-scoped resource support (#113183)

kvstore: add experimental cluster-scope resource support
This commit is contained in:
Georges Chaudy
2025-11-04 00:53:59 +01:00
committed by GitHub
parent 0649635639
commit 07bf7b2ae1
5 changed files with 345 additions and 44 deletions
+9 -5
View File
@@ -77,8 +77,10 @@ func (k DataKey) Validate() error {
}
// Validate naming conventions for all required fields
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
if k.Namespace != clusterScopeNamespace {
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
}
}
if err := validation.IsValidGroup(k.Group); err != nil {
return NewValidationError("group", k.Group, err[0])
@@ -117,7 +119,7 @@ func (k ListRequestKey) Validate() error {
if k.Namespace == "" && k.Name != "" {
return errors.New(ErrNameMustBeEmptyWhenNamespaceEmpty)
}
if k.Namespace != "" {
if k.Namespace != "" && k.Namespace != clusterScopeNamespace {
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
}
@@ -155,8 +157,10 @@ func (k GetRequestKey) Validate() error {
if k.Namespace == "" {
return errors.New(ErrNamespaceRequired)
}
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
if k.Namespace != clusterScopeNamespace {
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
}
}
if err := validation.IsValidGroup(k.Group); err != nil {
return NewValidationError("group", k.Group, err[0])
+4 -2
View File
@@ -51,8 +51,10 @@ func (k EventKey) Validate() error {
// Validate each field against the naming rules
// Validate naming conventions for all required fields
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
if k.Namespace != clusterScopeNamespace {
if err := validation.IsValidNamespace(k.Namespace); err != nil {
return NewValidationError("namespace", k.Namespace, err[0])
}
}
if err := validation.IsValidGroup(k.Group); err != nil {
return NewValidationError("group", k.Group, err[0])
+4 -2
View File
@@ -35,8 +35,10 @@ func verifyRequestKeyNamespaceGroupResource(key *resourcepb.ResourceKey) *resour
if key.Resource == "" {
return NewBadRequestError("request key is missing resource")
}
if err := validation.IsValidNamespace(key.Namespace); err != nil {
return NewBadRequestError(err[0])
if key.Namespace != clusterScopeNamespace {
if err := validation.IsValidNamespace(key.Namespace); err != nil {
return NewBadRequestError(err[0])
}
}
if err := validation.IsValidGroup(key.Group); err != nil {
return NewBadRequestError(err[0])
+65 -35
View File
@@ -29,21 +29,41 @@ const (
prunerMaxEvents = 20
defaultEventRetentionPeriod = 1 * time.Hour
defaultEventPruningInterval = 5 * time.Minute
clusterScopeNamespace = "__cluster__"
)
// convertClusterNamespaceToEmpty maps the internal __cluster__ placeholder
// namespace back to the empty string, so cluster-scoped resources are
// presented to users without a namespace.
func convertClusterNamespaceToEmpty(namespace string) string {
	if namespace != clusterScopeNamespace {
		return namespace
	}
	return ""
}
// convertEmptyToClusterNamespace substitutes the internal __cluster__
// placeholder for an empty namespace, but only when the experimental
// cluster-scope feature is enabled; otherwise the namespace passes through
// unchanged.
func convertEmptyToClusterNamespace(namespace string, withExperimentalClusterScope bool) string {
	if namespace != "" || !withExperimentalClusterScope {
		return namespace
	}
	return clusterScopeNamespace
}
// kvStorageBackend Unified storage backend based on KV storage.
type kvStorageBackend struct {
snowflake *snowflake.Node
kv KV
dataStore *dataStore
eventStore *eventStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
withPruner bool
eventRetentionPeriod time.Duration
eventPruningInterval time.Duration
historyPruner Pruner
snowflake *snowflake.Node
kv KV
dataStore *dataStore
eventStore *eventStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
withPruner bool
eventRetentionPeriod time.Duration
eventPruningInterval time.Duration
historyPruner Pruner
withExperimentalClusterScope bool
//tracer trace.Tracer
//reg prometheus.Registerer
}
@@ -51,12 +71,13 @@ type kvStorageBackend struct {
var _ StorageBackend = &kvStorageBackend{}
type KVBackendOptions struct {
KvStore KV
WithPruner bool
EventRetentionPeriod time.Duration // How long to keep events (default: 1 hour)
EventPruningInterval time.Duration // How often to run the event pruning (default: 5 minutes)
Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics
KvStore KV
WithPruner bool
WithExperimentalClusterScope bool // Allow empty namespace to be used for cluster-scoped resources.
EventRetentionPeriod time.Duration // How long to keep events (default: 1 hour)
EventPruningInterval time.Duration // How often to run the event pruning (default: 5 minutes)
Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics
}
func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
@@ -80,15 +101,16 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
}
backend := &kvStorageBackend{
kv: kv,
dataStore: newDataStore(kv),
eventStore: eventStore,
notifier: newNotifier(eventStore, notifierOptions{}),
snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable
eventRetentionPeriod: eventRetentionPeriod,
eventPruningInterval: eventPruningInterval,
kv: kv,
dataStore: newDataStore(kv),
eventStore: eventStore,
notifier: newNotifier(eventStore, notifierOptions{}),
snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable
eventRetentionPeriod: eventRetentionPeriod,
eventPruningInterval: eventPruningInterval,
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
}
err = backend.initPruner(ctx)
if err != nil {
@@ -206,6 +228,8 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
}
rv := k.snowflake.Generate().Int64()
namespace := convertEmptyToClusterNamespace(event.Key.Namespace, k.withExperimentalClusterScope)
obj := event.Object
// Write data.
var action DataAction
@@ -216,7 +240,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
Group: event.Key.Group,
Resource: event.Key.Resource,
Namespace: event.Key.Namespace,
Namespace: namespace,
Name: event.Key.Name,
})
if err == nil {
@@ -244,7 +268,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
err := k.dataStore.Save(ctx, DataKey{
Group: event.Key.Group,
Resource: event.Key.Resource,
Namespace: event.Key.Namespace,
Namespace: namespace,
Name: event.Key.Name,
ResourceVersion: rv,
Action: action,
@@ -256,7 +280,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
// Write event
err = k.eventStore.Save(ctx, Event{
Namespace: event.Key.Namespace,
Namespace: namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
@@ -270,7 +294,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
}
_ = k.historyPruner.Add(PruningKey{
Namespace: event.Key.Namespace,
Namespace: namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
@@ -283,10 +307,13 @@ func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.Rea
if req.Key == nil {
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusBadRequest, Message: "missing key"}}
}
namespace := convertEmptyToClusterNamespace(req.Key.Namespace, k.withExperimentalClusterScope)
meta, err := k.dataStore.GetResourceKeyAtRevision(ctx, GetRequestKey{
Group: req.Key.Group,
Resource: req.Key.Resource,
Namespace: req.Key.Namespace,
Namespace: namespace,
Name: req.Key.Name,
}, req.ResourceVersion)
if errors.Is(err, ErrNotFound) {
@@ -297,7 +324,7 @@ func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.Rea
data, err := k.dataStore.Get(ctx, DataKey{
Group: req.Key.Group,
Resource: req.Key.Resource,
Namespace: req.Key.Namespace,
Namespace: namespace,
Name: req.Key.Name,
ResourceVersion: meta.ResourceVersion,
Action: meta.Action,
@@ -323,6 +350,9 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis
if req.Options == nil || req.Options.Key == nil {
return 0, fmt.Errorf("missing options or key in ListRequest")
}
namespace := convertEmptyToClusterNamespace(req.Options.Key.Namespace, k.withExperimentalClusterScope)
// Parse continue token if provided
offset := int64(0)
resourceVersion := req.ResourceVersion
@@ -347,7 +377,7 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis
for dataKey, err := range k.dataStore.ListResourceKeysAtRevision(ctx, ListRequestKey{
Group: req.Options.Key.Group,
Resource: req.Options.Key.Resource,
Namespace: req.Options.Key.Namespace,
Namespace: namespace,
Name: req.Options.Key.Name,
}, resourceVersion) {
if err != nil {
@@ -439,7 +469,7 @@ func (i *kvListIterator) ResourceVersion() int64 {
func (i *kvListIterator) Namespace() string {
if i.currentDataObj != nil {
return i.currentDataObj.Key.Namespace
return convertClusterNamespaceToEmpty(i.currentDataObj.Key.Namespace)
}
return ""
}
@@ -1049,7 +1079,7 @@ func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *Writte
events <- &WrittenEvent{
Key: &resourcepb.ResourceKey{
Namespace: event.Namespace,
Namespace: convertClusterNamespaceToEmpty(event.Namespace),
Group: event.Group,
Resource: event.Resource,
Name: event.Name,
@@ -36,6 +36,19 @@ func setupTestStorageBackend(t *testing.T) *kvStorageBackend {
return kvBackend
}
// setupTestStorageBackendWithClusterScope builds a KV storage backend with
// WithExperimentalClusterScope enabled, for tests exercising cluster-scoped
// (empty-namespace) resources.
func setupTestStorageBackendWithClusterScope(t *testing.T) *kvStorageBackend {
	kv := setupTestKV(t)
	opts := KVBackendOptions{
		KvStore:                      kv,
		WithPruner:                   true,
		WithExperimentalClusterScope: true,
	}
	backend, err := NewKVStorageBackend(opts)
	// Check the constructor error BEFORE type-asserting: on error the returned
	// interface is nil, and asserting on a nil interface panics instead of
	// producing a useful test failure. (The original asserted first.)
	require.NoError(t, err)
	kvBackend, ok := backend.(*kvStorageBackend)
	require.True(t, ok, "expected *kvStorageBackend, got %T", backend)
	return kvBackend
}
func TestNewKvStorageBackend(t *testing.T) {
backend := setupTestStorageBackend(t)
@@ -1584,3 +1597,253 @@ func createAndWriteTestObject(t *testing.T, backend *kvStorageBackend) (*unstruc
return testObj, rv
}
// TestKvStorageBackend_ClusterScopedResources tests create, update, delete, list, and watch
// operations for cluster-scoped resources (empty namespace).
// This test requires the backend to be configured with WithExperimentalClusterScope set to true.
//
// The test verifies that:
// - All write operations accept empty namespace
// - ReadResource responses return empty namespace
// - ListIterator results return empty namespace
// - WatchWriteEvents return empty namespace
func TestKvStorageBackend_ClusterScopedResources(t *testing.T) {
backend := setupTestStorageBackendWithClusterScope(t)
ctx := context.Background()
// Start watching for events before creating resources
// (subscribing first guarantees every write below is observed on the stream).
stream, err := backend.WatchWriteEvents(ctx)
require.NoError(t, err)
// Use empty namespace for cluster-scoped resources
clusterNS := NamespacedResource{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
}
// Test Create - Add 3 cluster-scoped resources
testObj1, err := createTestObjectWithName("cluster-item1", clusterNS, "data-1")
require.NoError(t, err)
metaAccessor1, err := utils.MetaAccessor(testObj1)
require.NoError(t, err)
writeEvent1 := WriteEvent{
Type: resourcepb.WatchEvent_ADDED,
Key: &resourcepb.ResourceKey{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
Name: "cluster-item1",
},
Value: objectToJSONBytes(t, testObj1),
Object: metaAccessor1,
PreviousRV: 0,
}
rv1, err := backend.WriteEvent(ctx, writeEvent1)
require.NoError(t, err)
require.Greater(t, rv1, int64(0))
testObj2, err := createTestObjectWithName("cluster-item2", clusterNS, "data-2")
require.NoError(t, err)
metaAccessor2, err := utils.MetaAccessor(testObj2)
require.NoError(t, err)
writeEvent2 := WriteEvent{
Type: resourcepb.WatchEvent_ADDED,
Key: &resourcepb.ResourceKey{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
Name: "cluster-item2",
},
Value: objectToJSONBytes(t, testObj2),
Object: metaAccessor2,
PreviousRV: 0,
}
rv2, err := backend.WriteEvent(ctx, writeEvent2)
require.NoError(t, err)
// Resource versions must be strictly increasing across successive writes.
require.Greater(t, rv2, rv1)
testObj3, err := createTestObjectWithName("cluster-item3", clusterNS, "data-3")
require.NoError(t, err)
metaAccessor3, err := utils.MetaAccessor(testObj3)
require.NoError(t, err)
writeEvent3 := WriteEvent{
Type: resourcepb.WatchEvent_ADDED,
Key: &resourcepb.ResourceKey{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
Name: "cluster-item3",
},
Value: objectToJSONBytes(t, testObj3),
Object: metaAccessor3,
PreviousRV: 0,
}
rv3, err := backend.WriteEvent(ctx, writeEvent3)
require.NoError(t, err)
require.Greater(t, rv3, rv2)
// Test Update - Modify cluster-item2
testObj2.Object["spec"].(map[string]any)["value"] = "updated-data"
metaAccessor2Updated, err := utils.MetaAccessor(testObj2)
require.NoError(t, err)
writeEvent2Updated := WriteEvent{
Type: resourcepb.WatchEvent_MODIFIED,
Key: &resourcepb.ResourceKey{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
Name: "cluster-item2",
},
Value: objectToJSONBytes(t, testObj2),
Object: metaAccessor2Updated,
ObjectOld: metaAccessor2,
// Updates must carry the previous RV so the backend can detect conflicts.
PreviousRV: rv2,
}
rv4, err := backend.WriteEvent(ctx, writeEvent2Updated)
require.NoError(t, err)
require.Greater(t, rv4, rv3)
// Test Read - Read latest cluster-item2
readReq := &resourcepb.ReadRequest{
Key: &resourcepb.ResourceKey{
Name: "cluster-item2",
Namespace: "", // Request with empty namespace
Group: "cluster.example.com",
Resource: "clusterresources",
},
ResourceVersion: 0,
}
response := backend.ReadResource(ctx, readReq)
require.Nil(t, response.Error)
require.Equal(t, rv4, response.ResourceVersion)
require.Contains(t, string(response.Value), "updated-data")
require.NotNil(t, response.Key, "response key should be populated")
require.Empty(t, response.Key.Namespace, "cluster-scoped resource should have empty namespace in response")
// Test Read - Read early version of cluster-item2
// At revision rv3 the update (rv4) has not happened yet, so the read
// should resolve to item2's original write at rv2.
readReq.ResourceVersion = rv3 // Should return rv2 version
response = backend.ReadResource(ctx, readReq)
require.Nil(t, response.Error)
require.Equal(t, rv2, response.ResourceVersion)
require.Contains(t, string(response.Value), "data-2")
require.NotNil(t, response.Key, "response key should be populated")
require.Empty(t, response.Key.Namespace, "cluster-scoped resource should have empty namespace in response")
// Test Delete - Delete cluster-item1
writeEvent1Delete := WriteEvent{
Type: resourcepb.WatchEvent_DELETED,
Key: &resourcepb.ResourceKey{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
Name: "cluster-item1",
},
Value: objectToJSONBytes(t, testObj1),
Object: metaAccessor1,
ObjectOld: metaAccessor1,
PreviousRV: rv1,
}
rv5, err := backend.WriteEvent(ctx, writeEvent1Delete)
require.NoError(t, err)
require.Greater(t, rv5, rv4)
// Test List - List all cluster-scoped resources
listReq := &resourcepb.ListRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
},
},
Limit: 10,
}
// Collect name/namespace/value triples from the iterator for assertions below.
var listedItems []struct {
name string
namespace string
value []byte
}
rv, err := backend.ListIterator(ctx, listReq, func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
return err
}
listedItems = append(listedItems, struct {
name string
namespace string
value []byte
}{
name: iter.Name(),
namespace: iter.Namespace(),
value: iter.Value(),
})
}
return iter.Error()
})
require.NoError(t, err)
// The list revision must be at least as new as the last write (rv5).
require.GreaterOrEqual(t, rv, rv5)
require.Len(t, listedItems, 2) // cluster-item2 and cluster-item3 (item1 was deleted)
// Verify all items have empty namespace
for _, item := range listedItems {
require.Empty(t, item.namespace, "cluster-scoped resources should have empty namespace")
}
// Verify items are sorted and have expected content
require.Equal(t, "cluster-item2", listedItems[0].name)
require.Contains(t, string(listedItems[0].value), "updated-data")
require.Equal(t, "cluster-item3", listedItems[1].name)
require.Contains(t, string(listedItems[1].value), "data-3")
// Verify deleted resource is not in list
readReqDeleted := &resourcepb.ReadRequest{
Key: &resourcepb.ResourceKey{
Name: "cluster-item1",
Namespace: "",
Group: "cluster.example.com",
Resource: "clusterresources",
},
ResourceVersion: 0,
}
responseDeleted := backend.ReadResource(ctx, readReqDeleted)
require.NotNil(t, responseDeleted.Error)
require.Equal(t, int32(404), responseDeleted.Error.Code)
// Key should still be empty for cluster-scoped resources even on error
if responseDeleted.Key != nil {
require.Empty(t, responseDeleted.Key.Namespace, "cluster-scoped resource should have empty namespace even on error")
}
// Test Watch - Verify all events were published with empty namespace
// Expected events, in write order: three ADDs, one MODIFY, one DELETE.
watchedEvents := []struct {
name string
expectedType resourcepb.WatchEvent_Type
expectedRV int64
}{
{"cluster-item1", resourcepb.WatchEvent_ADDED, rv1},
{"cluster-item2", resourcepb.WatchEvent_ADDED, rv2},
{"cluster-item3", resourcepb.WatchEvent_ADDED, rv3},
{"cluster-item2", resourcepb.WatchEvent_MODIFIED, rv4},
{"cluster-item1", resourcepb.WatchEvent_DELETED, rv5},
}
for i, expected := range watchedEvents {
select {
case event := <-stream:
require.Equal(t, expected.name, event.Key.Name, "Event %d: wrong name", i)
require.Empty(t, event.Key.Namespace, "Event %d: cluster-scoped resource should have empty namespace", i)
require.Equal(t, "cluster.example.com", event.Key.Group, "Event %d: wrong group", i)
require.Equal(t, "clusterresources", event.Key.Resource, "Event %d: wrong resource", i)
require.Equal(t, expected.expectedType, event.Type, "Event %d: wrong type", i)
require.Equal(t, expected.expectedRV, event.ResourceVersion, "Event %d: wrong resource version", i)
// NOTE(review): ctx is context.Background(), whose Done channel never
// closes, so this case cannot fire; a missing event would block the test
// forever rather than failing fast — consider context.WithTimeout here.
case <-ctx.Done():
t.Fatalf("Timeout waiting for event %d", i)
}
}
}