diff --git a/.gitignore b/.gitignore
index b26c9b1e7d5..08c6fe87692 100644
--- a/.gitignore
+++ b/.gitignore
@@ -245,3 +245,7 @@ public/mockServiceWorker.js
 /e2e-playwright/test-plugins/*/dist
 /apps/provisioning/cmd/job-controller/bin/
+
+
+# Ignore unified storage kv store files
+/grafana-kv-data
diff --git a/pkg/storage/unified/resource/pruner.go b/pkg/storage/unified/resource/pruner.go
new file mode 100644
index 00000000000..e88519cbeae
--- /dev/null
+++ b/pkg/storage/unified/resource/pruner.go
@@ -0,0 +1,30 @@
+package resource
+
+import "context"
+
+// Pruner is a small abstraction to allow for different Pruner implementations.
+// This can be removed once the debouncer is deployed.
+type Pruner interface {
+	Add(key PruningKey) error
+	Start(ctx context.Context)
+}
+
+// PruningKey is a comparable key for pruning history.
+type PruningKey struct {
+	Namespace string
+	Group     string
+	Resource  string
+	Name      string
+}
+
+func (k PruningKey) Validate() bool {
+	return k.Namespace != "" && k.Group != "" && k.Resource != "" && k.Name != ""
+}
+
+type NoopPruner struct{}
+
+func (p *NoopPruner) Add(key PruningKey) error {
+	return nil
+}
+
+func (p *NoopPruner) Start(ctx context.Context) {}
diff --git a/pkg/storage/unified/resource/pruner_test.go b/pkg/storage/unified/resource/pruner_test.go
new file mode 100644
index 00000000000..eaf7ce27cb9
--- /dev/null
+++ b/pkg/storage/unified/resource/pruner_test.go
@@ -0,0 +1,67 @@
+package resource
+
+import "testing"
+
+func TestPrunerValidate(t *testing.T) {
+	tests := []struct {
+		name     string
+		key      PruningKey
+		expected bool
+	}{
+		{
+			name: "valid key",
+			key: PruningKey{
+				Namespace: "default",
+				Group:     "apps",
+				Resource:  "deployments",
+				Name:      "my-deployment",
+			},
+			expected: true,
+		},
+		{
+			name: "missing namespace",
+			key: PruningKey{
+				Group:    "apps",
+				Resource: "deployments",
+				Name:     "my-deployment",
+			},
+			expected: false,
+		},
+		{
+			name: "missing group",
+			key: PruningKey{
+				Namespace: "default",
+				Resource:  "deployments",
+				Name:      "my-deployment",
+			},
+			expected: false,
+		},
+		{
+			name: "missing resource",
+			key: PruningKey{
+				Namespace: "default",
+				Group:     "apps",
+				Name:      "my-deployment",
+			},
+			expected: false,
+		},
+		{
+			name: "missing name",
+			key: PruningKey{
+				Namespace: "default",
+				Group:     "apps",
+				Resource:  "deployments",
+			},
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := tt.key.Validate()
+			if result != tt.expected {
+				t.Errorf("expected %v, got %v", tt.expected, result)
+			}
+		})
+	}
+}
diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go
index 7c2ab610930..0408a758b0b 100644
--- a/pkg/storage/unified/resource/storage_backend.go
+++ b/pkg/storage/unified/resource/storage_backend.go
@@ -18,34 +18,52 @@ import (
 	"github.com/grafana/grafana-app-sdk/logging"
 	"github.com/grafana/grafana/pkg/apimachinery/utils"
 	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
+	"github.com/grafana/grafana/pkg/util/debouncer"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.opentelemetry.io/otel/trace"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

 const (
 	defaultListBufferSize = 100
+	prunerMaxEvents       = 20
 )

-// Unified storage backend based on KV storage.
+// kvStorageBackend is a unified storage backend based on KV storage.
 type kvStorageBackend struct {
-	snowflake  *snowflake.Node
-	kv         KV
-	dataStore  *dataStore
-	metaStore  *metadataStore
-	eventStore *eventStore
-	notifier   *notifier
-	builder    DocumentBuilder
-	log        logging.Logger
+	snowflake     *snowflake.Node
+	kv            KV
+	dataStore     *dataStore
+	metaStore     *metadataStore
+	eventStore    *eventStore
+	notifier      *notifier
+	builder       DocumentBuilder
+	log           logging.Logger
+	withPruner    bool
+	historyPruner Pruner
+	//tracer trace.Tracer
+	//reg prometheus.Registerer
 }

 var _ StorageBackend = &kvStorageBackend{}

-func NewKvStorageBackend(kv KV) *kvStorageBackend {
+type KvBackendOptions struct {
+	KvStore    KV
+	WithPruner bool
+	Tracer     trace.Tracer          // TODO add tracing
+	Reg        prometheus.Registerer // TODO add metrics
+}
+
+func NewKvStorageBackend(opts KvBackendOptions) (StorageBackend, error) {
+	ctx := context.Background()
+	kv := opts.KvStore
+
 	s, err := snowflake.NewNode(rand.Int64N(1024))
 	if err != nil {
-		panic(err)
+		return nil, fmt.Errorf("failed to create snowflake node: %w", err)
 	}
 	eventStore := newEventStore(kv)
-	return &kvStorageBackend{
+	backend := &kvStorageBackend{
 		kv:         kv,
 		dataStore:  newDataStore(kv),
 		metaStore:  newMetadataStore(kv),
@@ -55,6 +73,72 @@ func NewKvStorageBackend(kv KV) *kvStorageBackend {
 		builder:    StandardDocumentBuilder(), // For now we use the standard document builder.
 		log:        &logging.NoOpLogger{},     // Make this configurable
 	}
+	err = backend.initPruner(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize pruner: %w", err)
+	}
+	return backend, nil
+}
+
+func (k *kvStorageBackend) pruneEvents(ctx context.Context, key PruningKey) error {
+	if !key.Validate() {
+		return fmt.Errorf("invalid pruning key, all fields must be set: %+v", key)
+	}
+
+	keepEvents := make([]DataKey, 0, prunerMaxEvents)
+
+	// iterate over all keys for the resource and delete versions beyond the latest 20
+	for datakey, err := range k.dataStore.Keys(ctx, ListRequestKey(key)) {
+		if err != nil {
+			return err
+		}
+
+		if len(keepEvents) < prunerMaxEvents {
+			keepEvents = append(keepEvents, datakey)
+			continue
+		}
+
+		// If we already have 20 versions, delete the oldest one and append the new one
+		err := k.dataStore.Delete(ctx, keepEvents[0])
+		if err != nil {
+			return err
+		}
+		keepEvents = append(keepEvents[1:], datakey)
+	}
+
+	return nil
+}
+
+func (k *kvStorageBackend) initPruner(ctx context.Context) error {
+	if !k.withPruner {
+		k.log.Debug("Pruner disabled, using noop pruner")
+		k.historyPruner = &NoopPruner{}
+		return nil
+	}
+
+	k.log.Debug("Initializing history pruner")
+	pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[PruningKey]{
+		Name:           "history_pruner",
+		BufferSize:     1000,
+		MinWait:        time.Second * 30,
+		MaxWait:        time.Minute * 5,
+		ProcessHandler: k.pruneEvents,
+		ErrorHandler: func(key PruningKey, err error) {
+			k.log.Error("failed to prune history",
+				"namespace", key.Namespace,
+				"group", key.Group,
+				"resource", key.Resource,
+				"name", key.Name,
+				"error", err)
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	k.historyPruner = pruner
+	k.historyPruner.Start(ctx)
+	return nil
 }

 // WriteEvent writes a resource event (create/update/delete) to the storage backend.
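For context, the constructor above now takes an options struct instead of a bare KV and returns an error instead of panicking. A minimal usage sketch, mirroring the Badger-based test harness later in this diff; the badger/v4 import path and the in-memory options are assumptions for illustration, not something this change introduces:

// Sketch only: wiring resource.NewKvStorageBackend through the new
// KvBackendOptions. WithPruner=true enables the debouncer-backed history
// pruner; when false the backend falls back to resource.NoopPruner.
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v4" // assumed import path/version

	"github.com/grafana/grafana/pkg/storage/unified/resource"
)

func main() {
	// Assumption: an in-memory Badger instance is enough for a demo.
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()

	backend, err := resource.NewKvStorageBackend(resource.KvBackendOptions{
		KvStore:    resource.NewBadgerKV(db),
		WithPruner: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = backend // satisfies resource.StorageBackend
}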
@@ -150,6 +234,14 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
 	if err != nil {
 		return 0, fmt.Errorf("failed to save event: %w", err)
 	}
+
+	_ = k.historyPruner.Add(PruningKey{
+		Namespace: event.Key.Namespace,
+		Group:     event.Key.Group,
+		Resource:  event.Key.Resource,
+		Name:      event.Key.Name,
+	})
+
 	return rv, nil
 }
diff --git a/pkg/storage/unified/resource/storage_backend_test.go b/pkg/storage/unified/resource/storage_backend_test.go
index df33be85d5b..0ffad8d6e6a 100644
--- a/pkg/storage/unified/resource/storage_backend_test.go
+++ b/pkg/storage/unified/resource/storage_backend_test.go
@@ -27,7 +27,14 @@ var appsNamespace = NamespacedResource{

 func setupTestStorageBackend(t *testing.T) *kvStorageBackend {
 	kv := setupTestKV(t)
-	return NewKvStorageBackend(kv)
+	opts := KvBackendOptions{
+		KvStore:    kv,
+		WithPruner: true,
+	}
+	backend, err := NewKvStorageBackend(opts)
+	kvBackend := backend.(*kvStorageBackend)
+	require.NoError(t, err)
+	return kvBackend
 }

 func TestNewKvStorageBackend(t *testing.T) {
@@ -1180,6 +1187,141 @@ func TestKvStorageBackend_GetResourceStats_Success(t *testing.T) {
 	require.Equal(t, int64(2), filteredStats[0].Count)
 }

+func TestKvStorageBackend_PruneEvents(t *testing.T) {
+	t.Run("will prune oldest events when exceeding limit", func(t *testing.T) {
+		backend := setupTestStorageBackend(t)
+		ctx := context.Background()
+
+		// Create a resource
+		testObj, err := createTestObjectWithName("test-resource", "apps", "test-data")
+		require.NoError(t, err)
+		metaAccessor, err := utils.MetaAccessor(testObj)
+		require.NoError(t, err)
+		writeEvent := WriteEvent{
+			Type: resourcepb.WatchEvent_ADDED,
+			Key: &resourcepb.ResourceKey{
+				Namespace: "default",
+				Group:     "apps",
+				Resource:  "resources",
+				Name:      "test-resource",
+			},
+			Value:      objectToJSONBytes(t, testObj),
+			Object:     metaAccessor,
+			PreviousRV: 0,
+		}
+		rv1, err := backend.WriteEvent(ctx, writeEvent)
+		require.NoError(t, err)
+
+		// Update the resource prunerMaxEvents times. This will create one more event than the pruner limit.
+		previousRV := rv1
+		for i := 0; i < prunerMaxEvents; i++ {
+			testObj.Object["spec"].(map[string]any)["value"] = fmt.Sprintf("update-%d", i)
+			writeEvent.Type = resourcepb.WatchEvent_MODIFIED
+			writeEvent.Value = objectToJSONBytes(t, testObj)
+			writeEvent.PreviousRV = previousRV
+			newRv, err := backend.WriteEvent(ctx, writeEvent)
+			require.NoError(t, err)
+			previousRV = newRv
+		}
+
+		pruningKey := PruningKey{
+			Namespace: "default",
+			Group:     "apps",
+			Resource:  "resources",
+			Name:      "test-resource",
+		}
+
+		err = backend.pruneEvents(ctx, pruningKey)
+		require.NoError(t, err)
+
+		// Verify the first event has been pruned (rv1)
+		eventKey1 := DataKey{
+			Namespace:       "default",
+			Group:           "apps",
+			Resource:        "resources",
+			Name:            "test-resource",
+			ResourceVersion: rv1,
+		}
+
+		_, err = backend.dataStore.Get(ctx, eventKey1)
+		require.Error(t, err) // Should return error as event is pruned
+
+		// assert prunerMaxEvents most recent events exist
+		counter := 0
+		for datakey, err := range backend.dataStore.Keys(ctx, ListRequestKey{
+			Namespace: "default",
+			Group:     "apps",
+			Resource:  "resources",
+			Name:      "test-resource",
+		}) {
+			require.NoError(t, err)
+			require.NotEqual(t, rv1, datakey.ResourceVersion)
+			counter++
+		}
+		require.Equal(t, prunerMaxEvents, counter)
+	})
+
+	t.Run("will not prune events when less than limit", func(t *testing.T) {
+		backend := setupTestStorageBackend(t)
+		ctx := context.Background()
+
+		// Create a resource
+		testObj, err := createTestObjectWithName("test-resource", "apps", "test-data")
+		require.NoError(t, err)
+		metaAccessor, err := utils.MetaAccessor(testObj)
+		require.NoError(t, err)
+		writeEvent := WriteEvent{
+			Type: resourcepb.WatchEvent_ADDED,
+			Key: &resourcepb.ResourceKey{
+				Namespace: "default",
+				Group:     "apps",
+				Resource:  "resources",
+				Name:      "test-resource",
+			},
+			Value:      objectToJSONBytes(t, testObj),
+			Object:     metaAccessor,
+			PreviousRV: 0,
+		}
+		rv1, err := backend.WriteEvent(ctx, writeEvent)
+		require.NoError(t, err)
+
+		// Update the resource prunerMaxEvents-1 times. This will create same number of events as the pruner limit.
+		previousRV := rv1
+		for i := 0; i < prunerMaxEvents-1; i++ {
+			testObj.Object["spec"].(map[string]any)["value"] = fmt.Sprintf("update-%d", i)
+			writeEvent.Type = resourcepb.WatchEvent_MODIFIED
+			writeEvent.Value = objectToJSONBytes(t, testObj)
+			writeEvent.PreviousRV = previousRV
+			newRv, err := backend.WriteEvent(ctx, writeEvent)
+			require.NoError(t, err)
+			previousRV = newRv
+		}
+
+		pruningKey := PruningKey{
+			Namespace: "default",
+			Group:     "apps",
+			Resource:  "resources",
+			Name:      "test-resource",
+		}
+
+		err = backend.pruneEvents(ctx, pruningKey)
+		require.NoError(t, err)
+
+		// assert all events exist
+		counter := 0
+		for _, err := range backend.dataStore.Keys(ctx, ListRequestKey{
+			Namespace: "default",
+			Group:     "apps",
+			Resource:  "resources",
+			Name:      "test-resource",
+		}) {
+			require.NoError(t, err)
+			counter++
+		}
+		require.Equal(t, prunerMaxEvents, counter)
+	})
+}
+
 // createTestObject creates a test unstructured object with standard values
 func createTestObject() (*unstructured.Unstructured, error) {
 	return createTestObjectWithName("test-resource", appsNamespace, "test data")
diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go
index 74cff3c14e0..edbbe7760e0 100644
--- a/pkg/storage/unified/sql/backend.go
+++ b/pkg/storage/unified/sql/backend.go
@@ -91,29 +91,6 @@ func NewBackend(opts BackendOptions) (Backend, error) {
 	}, nil
 }

-// pruningKey is a comparable key for pruning history.
-type pruningKey struct {
-	namespace string
-	group     string
-	resource  string
-	name      string
-}
-
-// Small abstraction to allow for different pruner implementations.
-// This can be removed once the debouncer is deployed.
-type pruner interface {
-	Add(key pruningKey) error
-	Start(ctx context.Context)
-}
-
-type noopPruner struct{}
-
-func (p *noopPruner) Add(key pruningKey) error {
-	return nil
-}
-
-func (p *noopPruner) Start(ctx context.Context) {}
-
 type backend struct {
 	//general
 	isHA bool
@@ -148,7 +125,7 @@ type backend struct {

 	// testing
 	simulatedNetworkLatency time.Duration
-	historyPruner           pruner
+	historyPruner           resource.Pruner
 	withPruner              bool
 }
@@ -205,26 +182,26 @@ func (b *backend) initLocked(ctx context.Context) error {
 func (b *backend) initPruner(ctx context.Context) error {
 	if !b.withPruner {
 		b.log.Debug("using noop history pruner")
-		b.historyPruner = &noopPruner{}
+		b.historyPruner = &resource.NoopPruner{}
 		return nil
 	}
 	b.log.Debug("using debounced history pruner")
 	// Initialize history pruner.
-	pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{
+	pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[resource.PruningKey]{
 		Name:       "history_pruner",
 		BufferSize: 1000,
 		MinWait:    time.Second * 30,
 		MaxWait:    time.Minute * 5,
-		ProcessHandler: func(ctx context.Context, key pruningKey) error {
+		ProcessHandler: func(ctx context.Context, key resource.PruningKey) error {
 			return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
 				res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{
 					SQLTemplate:  sqltemplate.New(b.dialect),
 					HistoryLimit: defaultPrunerHistoryLimit,
 					Key: &resourcepb.ResourceKey{
-						Namespace: key.namespace,
-						Group:     key.group,
-						Resource:  key.resource,
-						Name:      key.name,
+						Namespace: key.Namespace,
+						Group:     key.Group,
+						Resource:  key.Resource,
+						Name:      key.Name,
 					},
 				})
 				if err != nil {
@@ -235,20 +212,20 @@ func (b *backend) initPruner(ctx context.Context) error {
 					return fmt.Errorf("failed to get rows affected: %w", err)
 				}
 				b.log.Debug("pruned history successfully",
-					"namespace", key.namespace,
-					"group", key.group,
-					"resource", key.resource,
-					"name", key.name,
+					"namespace", key.Namespace,
+					"group", key.Group,
+					"resource", key.Resource,
+					"name", key.Name,
 					"rows", rows)
 				return nil
 			})
 		},
-		ErrorHandler: func(key pruningKey, err error) {
+		ErrorHandler: func(key resource.PruningKey, err error) {
 			b.log.Error("failed to prune history",
-				"namespace", key.namespace,
-				"group", key.group,
-				"resource", key.resource,
-				"name", key.name,
+				"namespace", key.Namespace,
+				"group", key.Group,
+				"resource", key.Resource,
+				"name", key.Name,
 				"error", err)
 		},
 		Reg: b.reg,
@@ -361,11 +338,11 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
 		}); err != nil {
 			return event.GUID, fmt.Errorf("insert into resource history: %w", err)
 		}
-		_ = b.historyPruner.Add(pruningKey{
-			namespace: event.Key.Namespace,
-			group:     event.Key.Group,
-			resource:  event.Key.Resource,
-			name:      event.Key.Name,
+		_ = b.historyPruner.Add(resource.PruningKey{
+			Namespace: event.Key.Namespace,
+			Group:     event.Key.Group,
+			Resource:  event.Key.Resource,
+			Name:      event.Key.Name,
 		})
 		if b.simulatedNetworkLatency > 0 {
 			time.Sleep(b.simulatedNetworkLatency)
@@ -448,11 +425,11 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
 		}); err != nil {
 			return event.GUID, fmt.Errorf("insert into resource history: %w", err)
 		}
-		_ = b.historyPruner.Add(pruningKey{
-			namespace: event.Key.Namespace,
-			group:     event.Key.Group,
-			resource:  event.Key.Resource,
-			name:      event.Key.Name,
+		_ = b.historyPruner.Add(resource.PruningKey{
+			Namespace: event.Key.Namespace,
+			Group:     event.Key.Group,
+			Resource:  event.Key.Resource,
+			Name:      event.Key.Name,
 		})
 		return event.GUID, nil
 	})
@@ -502,11 +479,11 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
 		}); err != nil {
 			return event.GUID, fmt.Errorf("insert into resource history: %w", err)
 		}
-		_ = b.historyPruner.Add(pruningKey{
-			namespace: event.Key.Namespace,
-			group:     event.Key.Group,
-			resource:  event.Key.Resource,
-			name:      event.Key.Name,
+		_ = b.historyPruner.Add(resource.PruningKey{
+			Namespace: event.Key.Namespace,
+			Group:     event.Key.Group,
+			Resource:  event.Key.Resource,
+			Name:      event.Key.Name,
 		})
 		return event.GUID, nil
 	})
diff --git a/pkg/storage/unified/testing/storage_backend_test.go b/pkg/storage/unified/testing/storage_backend_test.go
index ced0b6869a8..e1cab8b9c62 100644
--- a/pkg/storage/unified/testing/storage_backend_test.go
+++ b/pkg/storage/unified/testing/storage_backend_test.go
@@ -18,7 +18,12 @@ func TestBadgerKVStorageBackend(t *testing.T) {
 		t.Cleanup(func() {
 			_ = db.Close()
 		})
-		return resource.NewKvStorageBackend(resource.NewBadgerKV(db))
+		kvOpts := resource.KvBackendOptions{
+			KvStore: resource.NewBadgerKV(db),
+		}
+		backend, err := resource.NewKvStorageBackend(kvOpts)
+		require.NoError(t, err)
+		return backend
 	}, &TestOptions{
 		NSPrefix: "kvstorage-test",
 		SkipTests: map[string]bool{
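Taken together, the SQL backend keeps its debouncer-based pruner and only switches to the shared resource.PruningKey, while the KV backend's pruneEvents does the actual deletion. A standalone sketch of that sliding-window logic applied to plain resource-version numbers (illustration only; it assumes dataStore.Keys yields versions in ascending order, which is what the loop in pruneEvents relies on):

// Illustration of the pruneEvents sliding window: keep the newest
// prunerMaxEvents versions, delete everything older.
package main

import "fmt"

const prunerMaxEvents = 20

func main() {
	// Pretend these are the resource versions stored for one object, oldest first.
	versions := make([]int64, 0, 25)
	for v := int64(1); v <= 25; v++ {
		versions = append(versions, v)
	}

	keep := make([]int64, 0, prunerMaxEvents)
	var deleted []int64
	for _, rv := range versions {
		if len(keep) < prunerMaxEvents {
			keep = append(keep, rv)
			continue
		}
		// Window full: drop the oldest kept version, then admit the newer one.
		deleted = append(deleted, keep[0])
		keep = append(keep[1:], rv)
	}

	fmt.Println("deleted:", deleted) // [1 2 3 4 5]
	fmt.Println("kept:", keep)       // versions 6 through 25
}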