Compare commits

..

10 Commits

Author SHA1 Message Date
Michael Mandrus 74c21ce75c update workspace 2026-01-14 14:56:17 -05:00
Michael Mandrus ad7a6e9a7a Merge branch 'main' into mmandrus/secrets/dek-cache 2026-01-14 14:38:57 -05:00
Ezequiel Victorero 72f7bd3900 Snapshots: Support public snapshot instance in latest version (#116086) 2026-01-14 15:28:16 -03:00
Michael Mandrus b73869ea9c use noop cache 2025-11-18 12:10:56 -05:00
Michael Mandrus 3c2f629bb9 Merge remote-tracking branch 'origin/main' into mmandrus/secrets/dek-cache 2025-11-18 11:59:33 -05:00
Michael Mandrus 075761ec66 Merge remote-tracking branch 'origin/main' into mmandrus/secrets/dek-cache 2025-11-14 00:13:08 -05:00
Michael Mandrus 3974e88cbe flush the encryption cache during consolidation 2025-11-14 00:03:48 -05:00
Michael Mandrus 1da89b70a0 use the cache in most places 2025-11-13 23:55:31 -05:00
Michael Mandrus 197019f554 add namespace, plus unit tests for cache 2025-11-13 22:34:25 -05:00
Michael Mandrus 773baf47e1 pass dek cache into encryption manager 2025-11-13 15:33:25 -05:00
26 changed files with 1493 additions and 139 deletions
+2 -2
View File
@@ -32,14 +32,14 @@ require (
github.com/armon/go-radix v1.0.0 // @grafana/grafana-app-platform-squad
github.com/aws/aws-sdk-go v1.55.7 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2 v1.40.0 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2/credentials v1.18.21 // indirect; @grafana/grafana-operator-experience-squad
github.com/aws/aws-sdk-go-v2/credentials v1.18.21 // @grafana/grafana-operator-experience-squad
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.45.3 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.51.0 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2/service/ec2 v1.225.2 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2/service/oam v1.18.3 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi v1.26.6 // @grafana/aws-datasources
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.40.1 // @grafana/grafana-operator-experience-squad
github.com/aws/aws-sdk-go-v2/service/sts v1.39.1 // indirect; @grafana/grafana-operator-experience-squad
github.com/aws/aws-sdk-go-v2/service/sts v1.39.1 // @grafana/grafana-operator-experience-squad
github.com/aws/smithy-go v1.23.2 // @grafana/aws-datasources
github.com/beevik/etree v1.4.1 // @grafana/grafana-backend-group
github.com/benbjohnson/clock v1.3.5 // @grafana/alerting-backend
+24 -18
View File
@@ -76,21 +76,27 @@ func (hs *HTTPServer) CreateDashboardSnapshot(c *contextmodel.ReqContext) {
return
}
// Do not check permissions when the instance snapshot public mode is enabled
if !hs.Cfg.SnapshotPublicMode {
evaluator := ac.EvalAll(ac.EvalPermission(dashboards.ActionSnapshotsCreate), ac.EvalPermission(dashboards.ActionDashboardsRead, dashboards.ScopeDashboardsProvider.GetResourceScopeUID(cmd.Dashboard.GetNestedString("uid"))))
if canSave, err := hs.AccessControl.Evaluate(c.Req.Context(), c.SignedInUser, evaluator); err != nil || !canSave {
c.JsonApiErr(http.StatusForbidden, "forbidden", err)
return
}
}
dashboardsnapshots.CreateDashboardSnapshot(c, snapshot.SnapshotSharingOptions{
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: hs.Cfg.SnapshotEnabled,
ExternalEnabled: hs.Cfg.ExternalEnabled,
ExternalSnapshotName: hs.Cfg.ExternalSnapshotName,
ExternalSnapshotURL: hs.Cfg.ExternalSnapshotUrl,
}, cmd, hs.dashboardsnapshotsService)
}
if hs.Cfg.SnapshotPublicMode {
// Public mode: no user or dashboard validation needed
dashboardsnapshots.CreateDashboardSnapshotPublic(c, cfg, cmd, hs.dashboardsnapshotsService)
return
}
// Regular mode: check permissions
evaluator := ac.EvalAll(ac.EvalPermission(dashboards.ActionSnapshotsCreate), ac.EvalPermission(dashboards.ActionDashboardsRead, dashboards.ScopeDashboardsProvider.GetResourceScopeUID(cmd.Dashboard.GetNestedString("uid"))))
if canSave, err := hs.AccessControl.Evaluate(c.Req.Context(), c.SignedInUser, evaluator); err != nil || !canSave {
c.JsonApiErr(http.StatusForbidden, "forbidden", err)
return
}
dashboardsnapshots.CreateDashboardSnapshot(c, cfg, cmd, hs.dashboardsnapshotsService)
}
// GET /api/snapshots/:key
@@ -213,13 +219,6 @@ func (hs *HTTPServer) DeleteDashboardSnapshot(c *contextmodel.ReqContext) respon
return response.Error(http.StatusUnauthorized, "OrgID mismatch", nil)
}
if queryResult.External {
err := dashboardsnapshots.DeleteExternalDashboardSnapshot(queryResult.ExternalDeleteURL)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to delete external dashboard", err)
}
}
// Dashboard can be empty (creation error or external snapshot). This means that the mustInt here returns a 0,
// which before RBAC would result in a dashboard which has no ACL. A dashboard without an ACL would fallback
// to the users org role, which for editors and admins would essentially always be allowed here. With RBAC,
@@ -239,6 +238,13 @@ func (hs *HTTPServer) DeleteDashboardSnapshot(c *contextmodel.ReqContext) respon
}
}
if queryResult.External {
err := dashboardsnapshots.DeleteExternalDashboardSnapshot(queryResult.ExternalDeleteURL)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to delete external dashboard", err)
}
}
cmd := &dashboardsnapshots.DeleteDashboardSnapshotCommand{DeleteKey: queryResult.DeleteKey}
if err := hs.dashboardsnapshotsService.DeleteDashboardSnapshot(c.Req.Context(), cmd); err != nil {
+2 -3
View File
@@ -16,7 +16,6 @@ import (
_ "github.com/blugelabs/bluge"
_ "github.com/blugelabs/bluge_segment_api"
_ "github.com/crewjam/saml"
_ "github.com/docker/go-connections/nat"
_ "github.com/go-jose/go-jose/v4"
_ "github.com/gobwas/glob"
_ "github.com/googleapis/gax-go/v2"
@@ -32,7 +31,6 @@ import (
_ "github.com/spf13/cobra" // used by the standalone apiserver cli
_ "github.com/spyzhov/ajson"
_ "github.com/stretchr/testify/require"
_ "github.com/testcontainers/testcontainers-go"
_ "gocloud.dev/secrets/awskms"
_ "gocloud.dev/secrets/azurekeyvault"
_ "gocloud.dev/secrets/gcpkms"
@@ -57,7 +55,8 @@ import (
_ "github.com/grafana/e2e"
_ "github.com/grafana/gofpdf"
_ "github.com/grafana/gomemcache/memcache"
_ "github.com/grafana/tempo/pkg/traceql"
_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
_ "github.com/grafana/tempo/pkg/traceql"
)
@@ -14,6 +14,9 @@ type EncryptionManager interface {
// implementation present at manager.EncryptionService.
Encrypt(ctx context.Context, namespace xkube.Namespace, payload []byte) (EncryptedPayload, error)
Decrypt(ctx context.Context, namespace xkube.Namespace, payload EncryptedPayload) ([]byte, error)
// Since consolidation occurs at a level above the EncryptionManager, we need to allow that process to manually flush the cache
FlushCache(namespace xkube.Namespace)
}
type EncryptedPayload struct {
@@ -7,11 +7,13 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/usagestats"
@@ -19,6 +21,7 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/secret/encryption"
"github.com/grafana/grafana/pkg/registry/apis/secret/encryption/cipher"
"github.com/grafana/grafana/pkg/registry/apis/secret/xkube"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
@@ -26,6 +29,9 @@ type EncryptionManager struct {
tracer trace.Tracer
store contracts.DataKeyStorage
usageStats usagestats.Service
cfg *setting.Cfg
dataKeyCache encryption.DataKeyCache
mtx sync.Mutex
@@ -44,6 +50,8 @@ func ProvideEncryptionManager(
usageStats usagestats.Service,
enc cipher.Cipher,
providerConfig encryption.ProviderConfig,
dataKeyCache encryption.DataKeyCache,
cfg *setting.Cfg,
) (contracts.EncryptionManager, error) {
currentProviderID := providerConfig.CurrentProvider
if _, ok := providerConfig.AvailableProviders[currentProviderID]; !ok {
@@ -57,6 +65,8 @@ func ProvideEncryptionManager(
cipher: enc,
log: log.New("encryption"),
providerConfig: providerConfig,
dataKeyCache: dataKeyCache,
cfg: cfg,
}
s.registerUsageMetrics()
@@ -173,6 +183,11 @@ func (s *EncryptionManager) currentDataKey(ctx context.Context, namespace xkube.
// dataKeyByLabel looks up for data key in cache by label.
// Otherwise, it fetches it from database, decrypts it and caches it decrypted.
func (s *EncryptionManager) dataKeyByLabel(ctx context.Context, namespace, label string) (string, []byte, error) {
// 0. Get data key from in-memory cache.
if entry, exists := s.dataKeyCache.GetByLabel(namespace, label); exists && entry.Active {
return entry.Id, entry.DataKey, nil
}
// 1. Get data key from database.
dataKey, err := s.store.GetCurrentDataKey(ctx, namespace, label)
if err != nil {
@@ -194,6 +209,9 @@ func (s *EncryptionManager) dataKeyByLabel(ctx context.Context, namespace, label
return "", nil, err
}
// 3. Store the decrypted data key into the in-memory cache.
s.cacheDataKey(namespace, dataKey, decrypted)
return dataKey.UID, decrypted, nil
}
@@ -240,6 +258,9 @@ func (s *EncryptionManager) newDataKey(ctx context.Context, namespace string, la
return "", nil, err
}
// 4. Store the decrypted data key into the in-memory cache.
s.cacheDataKey(namespace, &dbDataKey, dataKey)
return id, dataKey, nil
}
@@ -303,6 +324,11 @@ func (s *EncryptionManager) dataKeyById(ctx context.Context, namespace, id strin
))
defer span.End()
// 0. Get data key from in-memory cache.
if entry, exists := s.dataKeyCache.GetById(namespace, id); exists && entry.Active {
return entry.DataKey, nil
}
// 1. Get encrypted data key from database.
dataKey, err := s.store.GetDataKey(ctx, namespace, id)
if err != nil {
@@ -321,9 +347,82 @@ func (s *EncryptionManager) dataKeyById(ctx context.Context, namespace, id strin
return nil, err
}
// 3. Store the decrypted data key into the in-memory cache.
s.cacheDataKey(namespace, dataKey, decrypted)
return decrypted, nil
}
func (s *EncryptionManager) GetProviders() encryption.ProviderConfig {
return s.providerConfig
}
func (s *EncryptionManager) FlushCache(namespace xkube.Namespace) {
s.dataKeyCache.Flush(namespace.String())
}
func (s *EncryptionManager) Run(ctx context.Context) error {
gc := time.NewTicker(s.cfg.SecretsManagement.DataKeysCacheCleanupInterval)
grp, gCtx := errgroup.WithContext(ctx)
for {
select {
case <-gc.C:
s.log.Debug("Removing expired data keys from cache...")
s.dataKeyCache.RemoveExpired()
s.log.Debug("Removing expired data keys from cache finished successfully")
case <-gCtx.Done():
s.log.Debug("Grafana is shutting down; stopping...")
gc.Stop()
if err := grp.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}
}
}
// NB: Much of this was copied or derived from the original implementation in the legacy SecretsService.
//
// Caching a data key is tricky, because at SecretsService level we cannot guarantee
// that a newly created data key has actually been persisted, depending on the different
// use cases that rely on SecretsService encryption and different database engines that
// we have support for, because the data key creation may have happened within a DB TX,
// that may fail afterwards.
//
// Therefore, if we cache a data key that hasn't been persisted with success (and won't),
// and later that one is used for a encryption operation (aside from the DB TX that created
// it), we may end up with data encrypted by a non-persisted data key, which could end up
// in (unrecoverable) data corruption.
//
// So, we cache the data key by id and/or by label, depending on the data key's lifetime,
// assuming that a data key older than a "caution period" should have been persisted.
//
// Look at the comments inline for further details.
// You can also take a look at the issue below for more context:
// https://github.com/grafana/grafana-enterprise/issues/4252
func (s *EncryptionManager) cacheDataKey(namespace string, dataKey *contracts.SecretDataKey, decrypted []byte) {
// First, we cache the data key by id, because cache "by id" is
// only used by decrypt operations, so no risk of corrupting data.
entry := &encryption.DataKeyCacheEntry{
Namespace: namespace,
Id: dataKey.UID,
Label: dataKey.Label,
DataKey: decrypted,
Active: dataKey.Active,
}
s.dataKeyCache.AddById(namespace, entry)
// Then, we cache the data key by label, ONLY if data key's lifetime
// is longer than a certain "caution period", because cache "by label"
// is used (only) by encrypt operations, and we want to ensure that
// no data key is cached for encryption ops before being persisted.
nowMinusCautionPeriod := time.Now().Add(-s.cfg.SecretsManagement.DataKeysCacheCautionPeriod)
if dataKey.Created.Before(nowMinusCautionPeriod) {
s.dataKeyCache.AddByLabel(namespace, entry)
}
}
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -201,6 +202,8 @@ func TestEncryptionService_UseCurrentProvider(t *testing.T) {
usageStats,
enc,
ossProviders,
&NoopDataKeyCache{},
cfg,
)
require.NoError(t, err)
@@ -226,6 +229,8 @@ func TestEncryptionService_UseCurrentProvider(t *testing.T) {
usageStats,
enc,
ossProviders,
&NoopDataKeyCache{},
cfg,
)
require.NoError(t, err)
@@ -275,6 +280,8 @@ func TestEncryptionService_SecretKeyVersionUpgrade(t *testing.T) {
usageStats,
enc,
ossProviders,
&NoopDataKeyCache{},
cfgV1,
)
require.NoError(t, err)
@@ -313,6 +320,8 @@ func TestEncryptionService_SecretKeyVersionUpgrade(t *testing.T) {
usageStats,
enc,
ossProvidersV2,
&NoopDataKeyCache{},
cfgV2,
)
require.NoError(t, err)
@@ -368,6 +377,8 @@ func TestEncryptionService_SecretKeyVersionUpgrade(t *testing.T) {
usageStats,
enc,
ossProviders,
&NoopDataKeyCache{},
cfgV1,
)
require.NoError(t, err)
@@ -392,6 +403,8 @@ func TestEncryptionService_SecretKeyVersionUpgrade(t *testing.T) {
usageStats,
enc,
ossProvidersV2,
&NoopDataKeyCache{},
cfgV2,
)
require.NoError(t, err)
@@ -573,6 +586,8 @@ func TestIntegration_SecretsService(t *testing.T) {
usageStats,
enc,
ossProviders,
&NoopDataKeyCache{},
cfg,
)
require.NoError(t, err)
@@ -610,6 +625,8 @@ func TestEncryptionService_ThirdPartyProviders(t *testing.T) {
enc, err := service.ProvideAESGCMCipherService(tracer, usageStats)
require.NoError(t, err)
cfg := &setting.Cfg{}
svc, err := ProvideEncryptionManager(
tracer,
nil,
@@ -621,6 +638,8 @@ func TestEncryptionService_ThirdPartyProviders(t *testing.T) {
encryption.ProviderID("fakeProvider.v1"): &fakeProvider{},
},
},
&NoopDataKeyCache{},
cfg,
)
require.NoError(t, err)
@@ -628,3 +647,88 @@ func TestEncryptionService_ThirdPartyProviders(t *testing.T) {
require.Len(t, encMgr.providerConfig.AvailableProviders, 1)
require.Contains(t, encMgr.providerConfig.AvailableProviders, encryption.ProviderID("fakeProvider.v1"))
}
func TestEncryptionService_FlushCache(t *testing.T) {
ctx := context.Background()
namespace := xkube.Namespace("test-namespace")
plaintext := []byte("secret data to encrypt")
// Set up the encryption manager with a real OSS DEK cache
testDB := sqlstore.NewTestStore(t, sqlstore.WithMigrator(migrator.New()))
tracer := noop.NewTracerProvider().Tracer("test")
database := database.ProvideDatabase(testDB, tracer)
cfg := &setting.Cfg{
SecretsManagement: setting.SecretsManagerSettings{
CurrentEncryptionProvider: "secret_key.v1",
ConfiguredKMSProviders: map[string]map[string]string{"secret_key.v1": {"secret_key": "SW2YcwTIb9zpOOhoPsMm"}},
DataKeysCacheTTL: time.Hour, // Long TTL to ensure keys don't expire during test
DataKeysCacheCautionPeriod: 0 * time.Second, // Override the caution period for testing
},
}
store, err := encryptionstorage.ProvideDataKeyStorage(database, tracer, nil)
require.NoError(t, err)
usageStats := &usagestats.UsageStatsMock{T: t}
enc, err := service.ProvideAESGCMCipherService(tracer, usageStats)
require.NoError(t, err)
ossProviders, err := osskmsproviders.ProvideOSSKMSProviders(cfg, enc)
require.NoError(t, err)
// Create a real OSS DEK cache
dekCache := ProvideOSSDataKeyCache(cfg)
encMgr, err := ProvideEncryptionManager(
tracer,
store,
usageStats,
enc,
ossProviders,
dekCache,
cfg,
)
require.NoError(t, err)
svc := encMgr.(*EncryptionManager)
// Encrypt some data - this will create a DEK and cache it
encrypted, err := svc.Encrypt(ctx, namespace, plaintext)
require.NoError(t, err)
// Verify we can decrypt - this should use the cached key
decrypted, err := svc.Decrypt(ctx, namespace, encrypted)
require.NoError(t, err)
assert.Equal(t, plaintext, decrypted)
// Get the data key ID from the encrypted payload
dataKeyID := encrypted.DataKeyID
// Verify the key is in the cache by checking both by ID and by label
label := encryption.KeyLabel(svc.providerConfig.CurrentProvider)
_, existsById := dekCache.GetById(namespace.String(), dataKeyID)
assert.True(t, existsById, "DEK should be cached by ID before flush")
_, existsByLabel := dekCache.GetByLabel(namespace.String(), label)
assert.True(t, existsByLabel, "DEK should be cached by label before flush")
// Flush the cache for this namespace
svc.FlushCache(namespace)
// Verify the cache is empty for this namespace
_, existsById = dekCache.GetById(namespace.String(), dataKeyID)
assert.False(t, existsById, "DEK should not be in cache by ID after flush")
_, existsByLabel = dekCache.GetByLabel(namespace.String(), label)
assert.False(t, existsByLabel, "DEK should not be in cache by label after flush")
// Verify we can still decrypt - this should fetch from DB and re-cache
decrypted, err = svc.Decrypt(ctx, namespace, encrypted)
require.NoError(t, err)
assert.Equal(t, plaintext, decrypted)
// Verify the key is back in the cache after the decrypt operation
_, existsById = dekCache.GetById(namespace.String(), dataKeyID)
assert.True(t, existsById, "DEK should be re-cached by ID after decrypt")
}
@@ -0,0 +1,130 @@
package manager
import (
"strconv"
"sync"
"time"
"github.com/grafana/grafana/pkg/registry/apis/secret/encryption"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
)
type ossDataKeyCache struct {
mtx sync.RWMutex
byId map[string]map[string]*encryption.DataKeyCacheEntry
byLabel map[string]map[string]*encryption.DataKeyCacheEntry
cacheTTL time.Duration
}
func ProvideOSSDataKeyCache(cfg *setting.Cfg) encryption.DataKeyCache {
return &ossDataKeyCache{
byId: make(map[string]map[string]*encryption.DataKeyCacheEntry),
byLabel: make(map[string]map[string]*encryption.DataKeyCacheEntry),
cacheTTL: cfg.SecretsManagement.DataKeysCacheTTL,
}
}
func (c *ossDataKeyCache) GetById(namespace, id string) (_ *encryption.DataKeyCacheEntry, exists bool) {
defer func() {
cacheReadsCounter.With(prometheus.Labels{
"hit": strconv.FormatBool(exists),
"method": "byId",
}).Inc()
}()
c.mtx.RLock()
defer c.mtx.RUnlock()
entries, exists := c.byId[namespace]
if !exists {
return nil, false
}
entry, exists := entries[id]
if !exists || entry.IsExpired() || entry.Namespace != namespace {
return nil, false
}
return entry, true
}
func (c *ossDataKeyCache) GetByLabel(namespace, label string) (_ *encryption.DataKeyCacheEntry, exists bool) {
defer func() {
cacheReadsCounter.With(prometheus.Labels{
"hit": strconv.FormatBool(exists),
"method": "byLabel",
}).Inc()
}()
c.mtx.RLock()
defer c.mtx.RUnlock()
entries, exists := c.byLabel[namespace]
if !exists {
return nil, false
}
entry, exists := entries[label]
if !exists || entry.IsExpired() || entry.Namespace != namespace {
return nil, false
}
return entry, true
}
func (c *ossDataKeyCache) AddById(namespace string, entry *encryption.DataKeyCacheEntry) {
c.mtx.Lock()
defer c.mtx.Unlock()
entry.Expiration = time.Now().Add(c.cacheTTL)
entry.Namespace = namespace
entries, exists := c.byId[namespace]
if !exists {
entries = make(map[string]*encryption.DataKeyCacheEntry)
c.byId[namespace] = entries
}
entries[entry.Id] = entry
}
func (c *ossDataKeyCache) AddByLabel(namespace string, entry *encryption.DataKeyCacheEntry) {
c.mtx.Lock()
defer c.mtx.Unlock()
entry.Expiration = time.Now().Add(c.cacheTTL)
entry.Namespace = namespace
entries, exists := c.byLabel[namespace]
if !exists {
entries = make(map[string]*encryption.DataKeyCacheEntry)
c.byLabel[namespace] = entries
}
entries[entry.Label] = entry
}
func (c *ossDataKeyCache) RemoveExpired() {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, entries := range c.byId {
for id, entry := range entries {
if entry.IsExpired() {
delete(entries, id)
}
}
}
for _, entries := range c.byLabel {
for label, entry := range entries {
if entry.IsExpired() {
delete(entries, label)
}
}
}
}
func (c *ossDataKeyCache) Flush(namespace string) {
c.mtx.Lock()
c.byId[namespace] = make(map[string]*encryption.DataKeyCacheEntry)
c.byLabel[namespace] = make(map[string]*encryption.DataKeyCacheEntry)
c.mtx.Unlock()
}
@@ -0,0 +1,570 @@
package manager
import (
"testing"
"time"
"github.com/grafana/grafana/pkg/registry/apis/secret/encryption"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestOSSDataKeyCache(t *testing.T) {
t.Parallel()
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 999 * time.Hour, // avoid expiration for testing
}
cache := ProvideOSSDataKeyCache(settings)
namespace := "test-namespace"
entry := &encryption.DataKeyCacheEntry{
Id: "key-123",
Label: "2024-01-01@provider.key1",
DataKey: []byte("test-data-key"),
Active: true,
}
t.Run("AddById and GetById", func(t *testing.T) {
cache.AddById(namespace, entry)
retrieved, exists := cache.GetById(namespace, entry.Id)
require.True(t, exists, "entry should exist after adding")
assert.Equal(t, entry.Id, retrieved.Id)
assert.Equal(t, entry.Label, retrieved.Label)
assert.Equal(t, entry.DataKey, retrieved.DataKey)
assert.Equal(t, entry.Active, retrieved.Active)
assert.Equal(t, namespace, retrieved.Namespace)
assert.True(t, retrieved.Expiration.After(time.Now()), "expiration should be in the future")
})
t.Run("AddByLabel and GetByLabel", func(t *testing.T) {
cache.AddByLabel(namespace, entry)
retrieved, exists := cache.GetByLabel(namespace, entry.Label)
require.True(t, exists, "entry should exist after adding")
assert.Equal(t, entry.Id, retrieved.Id)
assert.Equal(t, entry.Label, retrieved.Label)
assert.Equal(t, entry.DataKey, retrieved.DataKey)
assert.Equal(t, entry.Active, retrieved.Active)
assert.Equal(t, namespace, retrieved.Namespace)
assert.True(t, retrieved.Expiration.After(time.Now()), "expiration should be in the future")
})
t.Run("GetById and GetByLabel are independent", func(t *testing.T) {
cache2 := ProvideOSSDataKeyCache(settings)
ns := "independent-test"
entryById := &encryption.DataKeyCacheEntry{
Id: "id-only-key",
Label: "label1",
DataKey: []byte("data1"),
}
entryByLabel := &encryption.DataKeyCacheEntry{
Id: "id2",
Label: "label-only-key",
DataKey: []byte("data2"),
}
cache2.AddById(ns, entryById)
cache2.AddByLabel(ns, entryByLabel)
// Should find by ID
retrieved, exists := cache2.GetById(ns, entryById.Id)
require.True(t, exists)
assert.Equal(t, entryById.Id, retrieved.Id)
// Should not find by label that wasn't added via AddByLabel
_, exists = cache2.GetByLabel(ns, entryById.Label)
assert.False(t, exists)
// Should find by label
retrieved, exists = cache2.GetByLabel(ns, entryByLabel.Label)
require.True(t, exists)
assert.Equal(t, entryByLabel.Label, retrieved.Label)
// Should not find by ID that wasn't added via AddById
_, exists = cache2.GetById(ns, entryByLabel.Id)
assert.False(t, exists)
})
}
func TestOSSDataKeyCache_FalseConditions(t *testing.T) {
t.Parallel()
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 999 * time.Hour,
}
cache := ProvideOSSDataKeyCache(settings)
namespace := "test-namespace"
entry := &encryption.DataKeyCacheEntry{
Id: "key-123",
Label: "2024-01-01@provider.key1",
DataKey: []byte("test-data-key"),
Active: true,
}
t.Run("GetById returns false for non-existent namespace", func(t *testing.T) {
_, exists := cache.GetById("non-existent-namespace", "any-id")
assert.False(t, exists)
})
t.Run("GetById returns false for non-existent id", func(t *testing.T) {
cache.AddById(namespace, entry)
_, exists := cache.GetById(namespace, "non-existent-id")
assert.False(t, exists)
})
t.Run("GetByLabel returns false for non-existent namespace", func(t *testing.T) {
_, exists := cache.GetByLabel("non-existent-namespace", "any-label")
assert.False(t, exists)
})
t.Run("GetByLabel returns false for non-existent label", func(t *testing.T) {
cache.AddByLabel(namespace, entry)
_, exists := cache.GetByLabel(namespace, "non-existent-label")
assert.False(t, exists)
})
t.Run("GetById returns false for expired entry", func(t *testing.T) {
shortTTLSettings := setting.NewCfg()
shortTTLSettings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 1 * time.Millisecond,
}
shortCache := ProvideOSSDataKeyCache(shortTTLSettings)
namespace := "test-ns"
expiredEntry := &encryption.DataKeyCacheEntry{
Id: "expired-key",
Label: "expired-label",
DataKey: []byte("expired-data"),
}
shortCache.AddById(namespace, expiredEntry)
time.Sleep(10 * time.Millisecond)
_, exists := shortCache.GetById(namespace, expiredEntry.Id)
assert.False(t, exists, "should return false for expired entry")
})
t.Run("GetByLabel returns false for expired entry", func(t *testing.T) {
shortTTLSettings := setting.NewCfg()
shortTTLSettings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 1 * time.Millisecond,
}
shortCache := ProvideOSSDataKeyCache(shortTTLSettings)
namespace := "test-ns"
expiredEntry := &encryption.DataKeyCacheEntry{
Id: "expired-key",
Label: "expired-label",
DataKey: []byte("expired-data"),
}
shortCache.AddByLabel(namespace, expiredEntry)
time.Sleep(10 * time.Millisecond)
_, exists := shortCache.GetByLabel(namespace, expiredEntry.Label)
assert.False(t, exists, "should return false for expired entry")
})
t.Run("GetById returns false when entry namespace doesn't match", func(t *testing.T) {
// This tests the entry.Namespace != namespace check in GetById
// This is a defensive check that shouldn't normally happen if AddById works correctly
testCache := ProvideOSSDataKeyCache(settings).(*ossDataKeyCache)
// Manually insert an entry with mismatched namespace to test the defensive check
mismatchedEntry := &encryption.DataKeyCacheEntry{
Id: "test-id",
Label: "test-label",
DataKey: []byte("test-data"),
Namespace: "wrong-namespace",
Expiration: time.Now().Add(999 * time.Hour),
}
testCache.mtx.Lock()
testCache.byId["correct-namespace"] = map[string]*encryption.DataKeyCacheEntry{
mismatchedEntry.Id: mismatchedEntry,
}
testCache.mtx.Unlock()
_, exists := testCache.GetById("correct-namespace", mismatchedEntry.Id)
assert.False(t, exists, "should return false when entry namespace doesn't match lookup namespace")
})
t.Run("GetByLabel returns false when entry namespace doesn't match", func(t *testing.T) {
// This tests the entry.Namespace != namespace check in GetByLabel
testCache := ProvideOSSDataKeyCache(settings).(*ossDataKeyCache)
// Manually insert an entry with mismatched namespace to test the defensive check
mismatchedEntry := &encryption.DataKeyCacheEntry{
Id: "test-id",
Label: "test-label",
DataKey: []byte("test-data"),
Namespace: "wrong-namespace",
Expiration: time.Now().Add(999 * time.Hour),
}
testCache.mtx.Lock()
testCache.byLabel["correct-namespace"] = map[string]*encryption.DataKeyCacheEntry{
"test-label": mismatchedEntry,
}
testCache.mtx.Unlock()
_, exists := testCache.GetByLabel("correct-namespace", mismatchedEntry.Label)
assert.False(t, exists, "should return false when entry namespace doesn't match lookup namespace")
})
}
// Test namespace isolation
func TestOSSDataKeyCache_NamespaceIsolation(t *testing.T) {
t.Parallel()
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 999 * time.Hour,
}
cache := ProvideOSSDataKeyCache(settings)
namespace1 := "namespace-1"
namespace2 := "namespace-2"
entry1 := &encryption.DataKeyCacheEntry{
Id: "shared-id",
Label: "shared-label",
DataKey: []byte("data-from-ns1"),
Active: true,
}
entry2 := &encryption.DataKeyCacheEntry{
Id: "shared-id",
Label: "shared-label",
DataKey: []byte("data-from-ns2"),
Active: false,
}
t.Run("entries with same ID in different namespaces are isolated", func(t *testing.T) {
cache.AddById(namespace1, entry1)
cache.AddById(namespace2, entry2)
retrieved1, exists := cache.GetById(namespace1, entry1.Id)
require.True(t, exists)
assert.Equal(t, entry1.DataKey, retrieved1.DataKey)
assert.Equal(t, namespace1, retrieved1.Namespace)
assert.True(t, retrieved1.Active)
retrieved2, exists := cache.GetById(namespace2, entry2.Id)
require.True(t, exists)
assert.Equal(t, entry2.DataKey, retrieved2.DataKey)
assert.Equal(t, namespace2, retrieved2.Namespace)
assert.False(t, retrieved2.Active)
})
t.Run("entries with same label in different namespaces are isolated", func(t *testing.T) {
cache.AddByLabel(namespace1, entry1)
cache.AddByLabel(namespace2, entry2)
retrieved1, exists := cache.GetByLabel(namespace1, entry1.Label)
require.True(t, exists)
assert.Equal(t, entry1.DataKey, retrieved1.DataKey)
assert.Equal(t, namespace1, retrieved1.Namespace)
assert.True(t, retrieved1.Active)
retrieved2, exists := cache.GetByLabel(namespace2, entry2.Label)
require.True(t, exists)
assert.Equal(t, entry2.DataKey, retrieved2.DataKey)
assert.Equal(t, namespace2, retrieved2.Namespace)
assert.False(t, retrieved2.Active)
})
t.Run("cannot retrieve entry from wrong namespace", func(t *testing.T) {
// flush both namespaces since the cache is full of stuff now
cache.Flush(namespace1)
cache.Flush(namespace2)
cache.AddById(namespace1, entry1)
_, exists := cache.GetById(namespace2, entry1.Id)
assert.False(t, exists, "should not find entry from different namespace")
cache.AddByLabel(namespace1, entry1)
_, exists = cache.GetByLabel(namespace2, entry1.Label)
assert.False(t, exists, "should not find entry from different namespace")
})
}
func TestOSSDataKeyCache_Expiration(t *testing.T) {
t.Parallel()
t.Run("entries expire after TTL", func(t *testing.T) {
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 50 * time.Millisecond,
}
cache := ProvideOSSDataKeyCache(settings)
namespace := "test-ns"
entry := &encryption.DataKeyCacheEntry{
Id: "expiring-key",
Label: "expiring-label",
DataKey: []byte("expiring-data"),
}
cache.AddById(namespace, entry)
cache.AddByLabel(namespace, entry)
// Should exist immediately
_, exists := cache.GetById(namespace, entry.Id)
assert.True(t, exists, "entry should exist immediately after adding")
_, exists = cache.GetByLabel(namespace, entry.Label)
assert.True(t, exists, "entry should exist immediately after adding")
// Wait for expiration
time.Sleep(100 * time.Millisecond)
// Should not exist after expiration
_, exists = cache.GetById(namespace, entry.Id)
assert.False(t, exists, "entry should not exist after TTL expires")
_, exists = cache.GetByLabel(namespace, entry.Label)
assert.False(t, exists, "entry should not exist after TTL expires")
})
t.Run("RemoveExpired removes only expired entries", func(t *testing.T) {
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 50 * time.Millisecond,
}
cache := ProvideOSSDataKeyCache(settings)
namespace := "test-ns"
// Add entries that will expire
expiredEntry1 := &encryption.DataKeyCacheEntry{
Id: "expired-1",
Label: "expired-label-1",
DataKey: []byte("expired-data-1"),
}
expiredEntry2 := &encryption.DataKeyCacheEntry{
Id: "expired-2",
Label: "expired-label-2",
DataKey: []byte("expired-data-2"),
}
cache.AddById(namespace, expiredEntry1)
cache.AddByLabel(namespace, expiredEntry2)
// Wait for expiration
time.Sleep(100 * time.Millisecond)
// Add fresh entries
freshEntry1 := &encryption.DataKeyCacheEntry{
Id: "fresh-1",
Label: "fresh-label-1",
DataKey: []byte("fresh-data-1"),
}
freshEntry2 := &encryption.DataKeyCacheEntry{
Id: "fresh-2",
Label: "fresh-label-2",
DataKey: []byte("fresh-data-2"),
}
cache.AddById(namespace, freshEntry1)
cache.AddByLabel(namespace, freshEntry2)
// Before RemoveExpired, expired entries still exist in the map
// but GetById/GetByLabel return false due to IsExpired() check
// Call RemoveExpired
cache.RemoveExpired()
// Fresh entries should still exist
_, exists := cache.GetById(namespace, freshEntry1.Id)
assert.True(t, exists, "fresh entry should still exist after RemoveExpired")
_, exists = cache.GetByLabel(namespace, freshEntry2.Label)
assert.True(t, exists, "fresh entry should still exist after RemoveExpired")
// Expired entries should not exist
ossCache := cache.(*ossDataKeyCache)
_, exists = ossCache.byId[namespace][expiredEntry1.Id]
assert.False(t, exists, "expired entry should not exist after RemoveExpired")
_, exists = ossCache.byLabel[namespace][expiredEntry2.Label]
assert.False(t, exists, "expired entry should not exist after RemoveExpired")
})
t.Run("RemoveExpired handles multiple namespaces", func(t *testing.T) {
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 50 * time.Millisecond,
}
cache := ProvideOSSDataKeyCache(settings)
ns1 := "namespace-1"
ns2 := "namespace-2"
ns1ExpiredEntry := &encryption.DataKeyCacheEntry{
Id: "expired-key-ns1",
Label: "expired-label-ns1",
DataKey: []byte("expired-data"),
}
ns2ExpiredEntry := &encryption.DataKeyCacheEntry{
Id: "expired-key-ns2",
Label: "expired-label-ns2",
DataKey: []byte("expired-data"),
}
cache.AddById(ns1, ns1ExpiredEntry)
cache.AddByLabel(ns1, ns1ExpiredEntry)
cache.AddById(ns2, ns2ExpiredEntry)
cache.AddByLabel(ns2, ns2ExpiredEntry)
time.Sleep(100 * time.Millisecond)
ns1FreshEntry := &encryption.DataKeyCacheEntry{
Id: "fresh-key-ns1",
Label: "fresh-label-ns1",
DataKey: []byte("fresh-data-ns1"),
}
ns2FreshEntry := &encryption.DataKeyCacheEntry{
Id: "fresh-key-ns2",
Label: "fresh-label-ns2",
DataKey: []byte("fresh-data-ns2"),
}
cache.AddById(ns1, ns1FreshEntry)
cache.AddByLabel(ns1, ns1FreshEntry)
cache.AddById(ns2, ns2FreshEntry)
cache.AddByLabel(ns2, ns2FreshEntry)
cache.RemoveExpired()
// Fresh entries in both namespaces should exist
_, exists := cache.GetById(ns1, ns1FreshEntry.Id)
assert.True(t, exists)
_, exists = cache.GetByLabel(ns1, ns1FreshEntry.Label)
assert.True(t, exists)
_, exists = cache.GetById(ns2, ns2FreshEntry.Id)
assert.True(t, exists)
_, exists = cache.GetByLabel(ns2, ns2FreshEntry.Label)
assert.True(t, exists)
// Expired entries in both namespaces should not exist
ossCache := cache.(*ossDataKeyCache)
_, exists = ossCache.byId[ns1][ns1ExpiredEntry.Id]
assert.False(t, exists)
_, exists = ossCache.byId[ns2][ns2ExpiredEntry.Id]
assert.False(t, exists)
_, exists = ossCache.byLabel[ns1][ns1ExpiredEntry.Label]
assert.False(t, exists)
_, exists = ossCache.byLabel[ns2][ns2ExpiredEntry.Label]
assert.False(t, exists)
})
}
// Test Flush()
func TestOSSDataKeyCache_Flush(t *testing.T) {
t.Parallel()
settings := setting.NewCfg()
settings.SecretsManagement = setting.SecretsManagerSettings{
DataKeysCacheTTL: 999 * time.Hour,
}
cache := ProvideOSSDataKeyCache(settings)
namespace1 := "namespace-1"
namespace2 := "namespace-2"
entry1 := &encryption.DataKeyCacheEntry{
Id: "key-1",
Label: "label-1",
DataKey: []byte("data-1"),
}
entry2 := &encryption.DataKeyCacheEntry{
Id: "key-2",
Label: "label-2",
DataKey: []byte("data-2"),
}
t.Run("Flush removes all entries from specified namespace", func(t *testing.T) {
cache.AddById(namespace1, entry1)
cache.AddByLabel(namespace1, entry1)
// Verify entries exist
_, exists := cache.GetById(namespace1, entry1.Id)
require.True(t, exists)
_, exists = cache.GetByLabel(namespace1, entry1.Label)
require.True(t, exists)
// Flush namespace1
cache.Flush(namespace1)
// Entries should no longer exist
_, exists = cache.GetById(namespace1, entry1.Id)
assert.False(t, exists, "entry should not exist after flush")
_, exists = cache.GetByLabel(namespace1, entry1.Label)
assert.False(t, exists, "entry should not exist after flush")
})
t.Run("Flush only affects specified namespace", func(t *testing.T) {
cache.AddById(namespace1, entry1)
cache.AddByLabel(namespace1, entry1)
cache.AddById(namespace2, entry2)
cache.AddByLabel(namespace2, entry2)
// Flush only namespace1
cache.Flush(namespace1)
// namespace1 entries should not exist
_, exists := cache.GetById(namespace1, entry1.Id)
assert.False(t, exists)
_, exists = cache.GetByLabel(namespace1, entry1.Label)
assert.False(t, exists)
// namespace2 entries should still exist
_, exists = cache.GetById(namespace2, entry2.Id)
assert.True(t, exists, "entries in other namespace should not be affected")
_, exists = cache.GetByLabel(namespace2, entry2.Label)
assert.True(t, exists, "entries in other namespace should not be affected")
})
t.Run("Flush on non-existent namespace does not panic", func(t *testing.T) {
assert.NotPanics(t, func() {
cache.Flush("non-existent-namespace")
})
})
t.Run("can add entries after flush", func(t *testing.T) {
cache.AddById(namespace1, entry1)
cache.Flush(namespace1)
// Add new entry after flush
newEntry := &encryption.DataKeyCacheEntry{
Id: "new-key",
Label: "new-label",
DataKey: []byte("new-data"),
}
cache.AddById(namespace1, newEntry)
// New entry should exist
_, exists := cache.GetById(namespace1, "new-key")
assert.True(t, exists, "should be able to add entries after flush")
})
}
@@ -0,0 +1,27 @@
package manager
import "github.com/grafana/grafana/pkg/registry/apis/secret/encryption"
// This is being used as the data key cache in both OSS and Enterprise while we discuss security requirements for DEK caching
type noopDataKeyCache struct {
}
func ProvideNoopDataKeyCache() encryption.DataKeyCache {
return &noopDataKeyCache{}
}
func (c *noopDataKeyCache) GetById(_ string, _ string) (*encryption.DataKeyCacheEntry, bool) {
return nil, false
}
func (c *noopDataKeyCache) GetByLabel(_ string, _ string) (*encryption.DataKeyCacheEntry, bool) {
return nil, false
}
func (c *noopDataKeyCache) AddById(_ string, _ *encryption.DataKeyCacheEntry) {}
func (c *noopDataKeyCache) AddByLabel(_ string, _ *encryption.DataKeyCacheEntry) {}
func (c *noopDataKeyCache) RemoveExpired() {}
func (c *noopDataKeyCache) Flush(_ string) {}
@@ -7,6 +7,7 @@ import (
"go.opentelemetry.io/otel/trace/noop"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/registry/apis/secret/encryption"
"github.com/grafana/grafana/pkg/registry/apis/secret/encryption/cipher/service"
osskmsproviders "github.com/grafana/grafana/pkg/registry/apis/secret/encryption/kmsproviders"
"github.com/grafana/grafana/pkg/services/sqlstore"
@@ -47,8 +48,32 @@ func setupTestService(tb testing.TB) *EncryptionManager {
usageStats,
enc,
ossProviders,
&NoopDataKeyCache{},
cfg,
)
require.NoError(tb, err)
return encMgr.(*EncryptionManager)
}
type NoopDataKeyCache struct {
}
func (c *NoopDataKeyCache) GetById(namespace, id string) (*encryption.DataKeyCacheEntry, bool) {
return nil, false
}
func (c *NoopDataKeyCache) GetByLabel(namespace, label string) (*encryption.DataKeyCacheEntry, bool) {
return nil, false
}
func (c *NoopDataKeyCache) AddById(namespace string, entry *encryption.DataKeyCacheEntry) {
}
func (c *NoopDataKeyCache) AddByLabel(namespace string, entry *encryption.DataKeyCacheEntry) {
}
func (c *NoopDataKeyCache) RemoveExpired() {
}
func (c *NoopDataKeyCache) Flush(namespace string) {}
@@ -40,3 +40,25 @@ func (id ProviderID) Kind() (string, error) {
func KeyLabel(providerID ProviderID) string {
return fmt.Sprintf("%s@%s", time.Now().Format("2006-01-02"), providerID)
}
type DataKeyCache interface {
GetById(namespace, id string) (*DataKeyCacheEntry, bool)
GetByLabel(namespace, label string) (*DataKeyCacheEntry, bool)
AddById(namespace string, entry *DataKeyCacheEntry)
AddByLabel(namespace string, entry *DataKeyCacheEntry)
RemoveExpired()
Flush(namespace string)
}
type DataKeyCacheEntry struct {
Namespace string
Id string
Label string
DataKey []byte
Active bool
Expiration time.Time
}
func (e DataKeyCacheEntry) IsExpired() bool {
return e.Expiration.Before(time.Now())
}
@@ -62,7 +62,7 @@ func setupTestService(t *testing.T, cfg *setting.Cfg) (*OSSKeeperService, error)
ossProviders, err := osskmsproviders.ProvideOSSKMSProviders(cfg, enc)
require.NoError(t, err)
encryptionManager, err := manager.ProvideEncryptionManager(tracer, dataKeyStore, usageStats, enc, ossProviders)
encryptionManager, err := manager.ProvideEncryptionManager(tracer, dataKeyStore, usageStats, enc, ossProviders, &manager.NoopDataKeyCache{}, cfg)
require.NoError(t, err)
// Initialize the keeper service
@@ -53,6 +53,9 @@ func (s *ConsolidationService) Consolidate(ctx context.Context) (err error) {
return fmt.Errorf("disabling all data keys: %w", err)
}
// Keep track of which namespaces we have already flushed so we get to take advantage of caching the new values
flushedNamespaces := make(map[string]bool)
// List all encrypted values.
encryptedValues, err := s.globalEncryptedValueStore.ListAll(ctx, contracts.ListOpts{}, nil)
if err != nil {
@@ -60,6 +63,12 @@ func (s *ConsolidationService) Consolidate(ctx context.Context) (err error) {
}
for _, ev := range encryptedValues {
// Flush the cache for this namespace if we haven't already
if !flushedNamespaces[ev.Namespace] {
s.encryptionManager.FlushCache(xkube.Namespace(ev.Namespace))
flushedNamespaces[ev.Namespace] = true
}
// Decrypt the value using its old data key.
decryptedValue, err := s.encryptionManager.Decrypt(ctx, xkube.Namespace(ev.Namespace), ev.EncryptedPayload)
if err != nil {
@@ -121,6 +121,8 @@ func Setup(t *testing.T, opts ...func(*SetupConfig)) Sut {
usageStats,
enc,
ossProviders,
&manager.NoopDataKeyCache{},
cfg,
)
require.NoError(t, err)
+6 -3
View File
@@ -488,7 +488,8 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
if err != nil {
return nil, err
}
encryptionManager, err := manager2.ProvideEncryptionManager(tracer, dataKeyStorage, usageStats, cipher, providerConfig)
dataKeyCache := manager2.ProvideNoopDataKeyCache()
encryptionManager, err := manager2.ProvideEncryptionManager(tracer, dataKeyStorage, usageStats, cipher, providerConfig, dataKeyCache, cfg)
if err != nil {
return nil, err
}
@@ -1154,7 +1155,8 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
if err != nil {
return nil, err
}
encryptionManager, err := manager2.ProvideEncryptionManager(tracer, dataKeyStorage, usageStats, cipher, providerConfig)
dataKeyCache := manager2.ProvideNoopDataKeyCache()
encryptionManager, err := manager2.ProvideEncryptionManager(tracer, dataKeyStorage, usageStats, cipher, providerConfig, dataKeyCache, cfg)
if err != nil {
return nil, err
}
@@ -1716,7 +1718,8 @@ func InitializeForCLI(ctx context.Context, cfg *setting.Cfg) (Runner, error) {
if err != nil {
return Runner{}, err
}
encryptionManager, err := manager2.ProvideEncryptionManager(tracer, dataKeyStorage, usageStats, cipher, providerConfig)
dataKeyCache := manager2.ProvideNoopDataKeyCache()
encryptionManager, err := manager2.ProvideEncryptionManager(tracer, dataKeyStorage, usageStats, cipher, providerConfig, dataKeyCache, cfg)
if err != nil {
return Runner{}, err
}
+3
View File
@@ -18,6 +18,7 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/secret"
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
gsmKMSProviders "github.com/grafana/grafana/pkg/registry/apis/secret/encryption/kmsproviders"
gsmEncryptionManager "github.com/grafana/grafana/pkg/registry/apis/secret/encryption/manager"
"github.com/grafana/grafana/pkg/registry/apis/secret/secretkeeper"
secretService "github.com/grafana/grafana/pkg/registry/apis/secret/service"
"github.com/grafana/grafana/pkg/registry/apps/advisor"
@@ -152,6 +153,8 @@ var wireExtsBasicSet = wire.NewSet(
aggregatorrunner.ProvideNoopAggregatorConfigurator,
apisregistry.WireSetExts,
gsmKMSProviders.ProvideOSSKMSProviders,
//gsmEncryptionManager.ProvideOSSDataKeyCache, // Temporarily use noop cache
gsmEncryptionManager.ProvideNoopDataKeyCache,
secret.ProvideSecureValueClient,
provisioningExtras,
configProviderExtras,
+73 -29
View File
@@ -36,6 +36,9 @@ var client = &http.Client{
Transport: &http.Transport{Proxy: http.ProxyFromEnvironment},
}
// CreateDashboardSnapshot creates a snapshot when running Grafana in regular mode.
// It validates the user and dashboard exist before creating the snapshot.
// This mode supports both local and external snapshots.
func CreateDashboardSnapshot(c *contextmodel.ReqContext, cfg snapshot.SnapshotSharingOptions, cmd CreateDashboardSnapshotCommand, svc Service) {
if !cfg.SnapshotsEnabled {
c.JsonApiErr(http.StatusForbidden, "Dashboard Snapshots are disabled", nil)
@@ -43,6 +46,7 @@ func CreateDashboardSnapshot(c *contextmodel.ReqContext, cfg snapshot.SnapshotSh
}
uid := cmd.Dashboard.GetNestedString("uid")
user, err := identity.GetRequester(c.Req.Context())
if err != nil {
c.JsonApiErr(http.StatusBadRequest, "missing user in context", nil)
@@ -59,21 +63,18 @@ func CreateDashboardSnapshot(c *contextmodel.ReqContext, cfg snapshot.SnapshotSh
return
}
cmd.ExternalURL = ""
cmd.OrgID = user.GetOrgID()
cmd.UserID, _ = identity.UserIdentifier(user.GetID())
if cmd.Name == "" {
cmd.Name = "Unnamed snapshot"
}
var snapshotUrl string
cmd.ExternalURL = ""
cmd.OrgID = user.GetOrgID()
cmd.UserID, _ = identity.UserIdentifier(user.GetID())
originalDashboardURL, err := createOriginalDashboardURL(&cmd)
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Invalid app URL", err)
return
}
var snapshotURL string
if cmd.External {
// Handle external snapshot creation
if !cfg.ExternalEnabled {
c.JsonApiErr(http.StatusForbidden, "External dashboard creation is disabled", nil)
return
@@ -85,40 +86,83 @@ func CreateDashboardSnapshot(c *contextmodel.ReqContext, cfg snapshot.SnapshotSh
return
}
snapshotUrl = resp.Url
cmd.Key = resp.Key
cmd.DeleteKey = resp.DeleteKey
cmd.ExternalURL = resp.Url
cmd.ExternalDeleteURL = resp.DeleteUrl
cmd.Dashboard = &common.Unstructured{}
snapshotURL = resp.Url
metrics.MApiDashboardSnapshotExternal.Inc()
} else {
cmd.Dashboard.SetNestedField(originalDashboardURL, "snapshot", "originalUrl")
if cmd.Key == "" {
var err error
cmd.Key, err = util.GetRandomString(32)
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Could not generate random string", err)
return
}
// Handle local snapshot creation
originalDashboardURL, err := createOriginalDashboardURL(&cmd)
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Invalid app URL", err)
return
}
if cmd.DeleteKey == "" {
var err error
cmd.DeleteKey, err = util.GetRandomString(32)
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Could not generate random string", err)
return
}
snapshotURL, err = prepareLocalSnapshot(&cmd, originalDashboardURL)
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Could not generate random string", err)
return
}
snapshotUrl = setting.ToAbsUrl("dashboard/snapshot/" + cmd.Key)
metrics.MApiDashboardSnapshotCreate.Inc()
}
saveAndRespond(c, svc, cmd, snapshotURL)
}
// CreateDashboardSnapshotPublic creates a snapshot when running Grafana in public mode.
// In public mode, there is no user or dashboard information to validate.
// Only local snapshots are supported (external snapshots are not available).
func CreateDashboardSnapshotPublic(c *contextmodel.ReqContext, cfg snapshot.SnapshotSharingOptions, cmd CreateDashboardSnapshotCommand, svc Service) {
if !cfg.SnapshotsEnabled {
c.JsonApiErr(http.StatusForbidden, "Dashboard Snapshots are disabled", nil)
return
}
if cmd.Name == "" {
cmd.Name = "Unnamed snapshot"
}
snapshotURL, err := prepareLocalSnapshot(&cmd, "")
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Could not generate random string", err)
return
}
metrics.MApiDashboardSnapshotCreate.Inc()
saveAndRespond(c, svc, cmd, snapshotURL)
}
// prepareLocalSnapshot prepares the command for a local snapshot and returns the snapshot URL.
func prepareLocalSnapshot(cmd *CreateDashboardSnapshotCommand, originalDashboardURL string) (string, error) {
cmd.Dashboard.SetNestedField(originalDashboardURL, "snapshot", "originalUrl")
if cmd.Key == "" {
key, err := util.GetRandomString(32)
if err != nil {
return "", err
}
cmd.Key = key
}
if cmd.DeleteKey == "" {
deleteKey, err := util.GetRandomString(32)
if err != nil {
return "", err
}
cmd.DeleteKey = deleteKey
}
return setting.ToAbsUrl("dashboard/snapshot/" + cmd.Key), nil
}
// saveAndRespond saves the snapshot and sends the response.
func saveAndRespond(c *contextmodel.ReqContext, svc Service, cmd CreateDashboardSnapshotCommand, snapshotURL string) {
result, err := svc.CreateDashboardSnapshot(c.Req.Context(), &cmd)
if err != nil {
c.JsonApiErr(http.StatusInternalServerError, "Failed to create snapshot", err)
@@ -128,7 +172,7 @@ func CreateDashboardSnapshot(c *contextmodel.ReqContext, cfg snapshot.SnapshotSh
c.JSON(http.StatusOK, snapshot.DashboardCreateResponse{
Key: result.Key,
DeleteKey: result.DeleteKey,
URL: snapshotUrl,
URL: snapshotURL,
DeleteURL: setting.ToAbsUrl("api/snapshots-delete/" + result.DeleteKey),
})
}
+331 -35
View File
@@ -20,40 +20,30 @@ import (
"github.com/grafana/grafana/pkg/web"
)
func TestCreateDashboardSnapshot_DashboardNotFound(t *testing.T) {
mockService := &MockService{}
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: true,
ExternalEnabled: false,
func createTestDashboard(t *testing.T) *common.Unstructured {
t.Helper()
dashboard := &common.Unstructured{}
dashboardData := map[string]any{
"uid": "test-dashboard-uid",
"id": 123,
}
testUser := &user.SignedInUser{
dashboardBytes, _ := json.Marshal(dashboardData)
_ = json.Unmarshal(dashboardBytes, dashboard)
return dashboard
}
func createTestUser() *user.SignedInUser {
return &user.SignedInUser{
UserID: 1,
OrgID: 1,
Login: "testuser",
Name: "Test User",
Email: "test@example.com",
}
dashboard := &common.Unstructured{}
dashboardData := map[string]interface{}{
"uid": "test-dashboard-uid",
"id": 123,
}
dashboardBytes, _ := json.Marshal(dashboardData)
_ = json.Unmarshal(dashboardBytes, dashboard)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test Snapshot",
},
}
mockService.On("ValidateDashboardExists", mock.Anything, int64(1), "test-dashboard-uid").
Return(dashboards.ErrDashboardNotFound)
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
req = req.WithContext(identity.WithRequester(req.Context(), testUser))
}
func createReqContext(t *testing.T, req *http.Request, testUser *user.SignedInUser) (*contextmodel.ReqContext, *httptest.ResponseRecorder) {
t.Helper()
recorder := httptest.NewRecorder()
ctx := &contextmodel.ReqContext{
Context: &web.Context{
@@ -63,13 +53,319 @@ func TestCreateDashboardSnapshot_DashboardNotFound(t *testing.T) {
SignedInUser: testUser,
Logger: log.NewNopLogger(),
}
CreateDashboardSnapshot(ctx, cfg, cmd, mockService)
mockService.AssertExpectations(t)
assert.Equal(t, http.StatusBadRequest, recorder.Code)
var response map[string]interface{}
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "Dashboard not found", response["message"])
return ctx, recorder
}
// TestCreateDashboardSnapshot tests snapshot creation in regular mode (non-public instance).
// These tests cover scenarios when Grafana is running as a regular server with user authentication.
func TestCreateDashboardSnapshot(t *testing.T) {
t.Run("should return error when dashboard not found", func(t *testing.T) {
mockService := &MockService{}
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: true,
ExternalEnabled: false,
}
testUser := createTestUser()
dashboard := createTestDashboard(t)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test Snapshot",
},
}
mockService.On("ValidateDashboardExists", mock.Anything, int64(1), "test-dashboard-uid").
Return(dashboards.ErrDashboardNotFound)
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
req = req.WithContext(identity.WithRequester(req.Context(), testUser))
ctx, recorder := createReqContext(t, req, testUser)
CreateDashboardSnapshot(ctx, cfg, cmd, mockService)
mockService.AssertExpectations(t)
assert.Equal(t, http.StatusBadRequest, recorder.Code)
var response map[string]any
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "Dashboard not found", response["message"])
})
t.Run("should create external snapshot when external is enabled", func(t *testing.T) {
externalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/snapshots", r.URL.Path)
assert.Equal(t, "POST", r.Method)
response := map[string]any{
"key": "external-key",
"deleteKey": "external-delete-key",
"url": "https://external.example.com/dashboard/snapshot/external-key",
"deleteUrl": "https://external.example.com/api/snapshots-delete/external-delete-key",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(response)
}))
defer externalServer.Close()
mockService := NewMockService(t)
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: true,
ExternalEnabled: true,
ExternalSnapshotURL: externalServer.URL,
}
testUser := createTestUser()
dashboard := createTestDashboard(t)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test External Snapshot",
External: true,
},
}
mockService.On("ValidateDashboardExists", mock.Anything, int64(1), "test-dashboard-uid").
Return(nil)
mockService.On("CreateDashboardSnapshot", mock.Anything, mock.Anything).
Return(&DashboardSnapshot{
Key: "external-key",
DeleteKey: "external-delete-key",
}, nil)
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
req = req.WithContext(identity.WithRequester(req.Context(), testUser))
ctx, recorder := createReqContext(t, req, testUser)
CreateDashboardSnapshot(ctx, cfg, cmd, mockService)
mockService.AssertExpectations(t)
assert.Equal(t, http.StatusOK, recorder.Code)
var response map[string]any
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "external-key", response["key"])
assert.Equal(t, "external-delete-key", response["deleteKey"])
assert.Equal(t, "https://external.example.com/dashboard/snapshot/external-key", response["url"])
})
t.Run("should return forbidden when external is disabled", func(t *testing.T) {
mockService := NewMockService(t)
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: true,
ExternalEnabled: false,
}
testUser := createTestUser()
dashboard := createTestDashboard(t)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test External Snapshot",
External: true,
},
}
mockService.On("ValidateDashboardExists", mock.Anything, int64(1), "test-dashboard-uid").
Return(nil)
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
req = req.WithContext(identity.WithRequester(req.Context(), testUser))
ctx, recorder := createReqContext(t, req, testUser)
CreateDashboardSnapshot(ctx, cfg, cmd, mockService)
mockService.AssertExpectations(t)
assert.Equal(t, http.StatusForbidden, recorder.Code)
var response map[string]any
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "External dashboard creation is disabled", response["message"])
})
t.Run("should create local snapshot", func(t *testing.T) {
mockService := NewMockService(t)
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: true,
}
testUser := createTestUser()
dashboard := createTestDashboard(t)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test Local Snapshot",
},
Key: "local-key",
DeleteKey: "local-delete-key",
}
mockService.On("ValidateDashboardExists", mock.Anything, int64(1), "test-dashboard-uid").
Return(nil)
mockService.On("CreateDashboardSnapshot", mock.Anything, mock.Anything).
Return(&DashboardSnapshot{
Key: "local-key",
DeleteKey: "local-delete-key",
}, nil)
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
req = req.WithContext(identity.WithRequester(req.Context(), testUser))
ctx, recorder := createReqContext(t, req, testUser)
CreateDashboardSnapshot(ctx, cfg, cmd, mockService)
mockService.AssertExpectations(t)
assert.Equal(t, http.StatusOK, recorder.Code)
var response map[string]any
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "local-key", response["key"])
assert.Equal(t, "local-delete-key", response["deleteKey"])
assert.Contains(t, response["url"], "dashboard/snapshot/local-key")
assert.Contains(t, response["deleteUrl"], "api/snapshots-delete/local-delete-key")
})
}
// TestCreateDashboardSnapshotPublic tests snapshot creation in public mode.
// These tests cover scenarios when Grafana is running as a public snapshot server
// where no user authentication or dashboard validation is required.
func TestCreateDashboardSnapshotPublic(t *testing.T) {
t.Run("should create local snapshot without user context", func(t *testing.T) {
mockService := NewMockService(t)
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: true,
}
dashboard := createTestDashboard(t)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test Snapshot",
},
Key: "test-key",
DeleteKey: "test-delete-key",
}
mockService.On("CreateDashboardSnapshot", mock.Anything, mock.Anything).
Return(&DashboardSnapshot{
Key: "test-key",
DeleteKey: "test-delete-key",
}, nil)
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
recorder := httptest.NewRecorder()
ctx := &contextmodel.ReqContext{
Context: &web.Context{
Req: req,
Resp: web.NewResponseWriter("POST", recorder),
},
Logger: log.NewNopLogger(),
}
CreateDashboardSnapshotPublic(ctx, cfg, cmd, mockService)
mockService.AssertExpectations(t)
assert.Equal(t, http.StatusOK, recorder.Code)
var response map[string]any
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "test-key", response["key"])
assert.Equal(t, "test-delete-key", response["deleteKey"])
assert.Contains(t, response["url"], "dashboard/snapshot/test-key")
assert.Contains(t, response["deleteUrl"], "api/snapshots-delete/test-delete-key")
})
t.Run("should return forbidden when snapshots are disabled", func(t *testing.T) {
mockService := NewMockService(t)
cfg := snapshot.SnapshotSharingOptions{
SnapshotsEnabled: false,
}
dashboard := createTestDashboard(t)
cmd := CreateDashboardSnapshotCommand{
DashboardCreateCommand: snapshot.DashboardCreateCommand{
Dashboard: dashboard,
Name: "Test Snapshot",
},
}
req, _ := http.NewRequest("POST", "/api/snapshots", nil)
recorder := httptest.NewRecorder()
ctx := &contextmodel.ReqContext{
Context: &web.Context{
Req: req,
Resp: web.NewResponseWriter("POST", recorder),
},
Logger: log.NewNopLogger(),
}
CreateDashboardSnapshotPublic(ctx, cfg, cmd, mockService)
assert.Equal(t, http.StatusForbidden, recorder.Code)
var response map[string]any
err := json.Unmarshal(recorder.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "Dashboard Snapshots are disabled", response["message"])
})
}
// TestDeleteExternalDashboardSnapshot tests deletion of external snapshots.
// This function is called in public mode and doesn't require user context.
func TestDeleteExternalDashboardSnapshot(t *testing.T) {
t.Run("should return nil on successful deletion", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "GET", r.Method)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
err := DeleteExternalDashboardSnapshot(server.URL)
assert.NoError(t, err)
})
t.Run("should gracefully handle already deleted snapshot", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
response := map[string]any{
"message": "Failed to get dashboard snapshot",
}
_ = json.NewEncoder(w).Encode(response)
}))
defer server.Close()
err := DeleteExternalDashboardSnapshot(server.URL)
assert.NoError(t, err)
})
t.Run("should return error on unexpected status code", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer server.Close()
err := DeleteExternalDashboardSnapshot(server.URL)
assert.Error(t, err)
assert.Contains(t, err.Error(), "unexpected response when deleting external snapshot")
assert.Contains(t, err.Error(), "404")
})
t.Run("should return error on 500 with different message", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
response := map[string]any{
"message": "Some other error",
}
_ = json.NewEncoder(w).Encode(response)
}))
defer server.Close()
err := DeleteExternalDashboardSnapshot(server.URL)
assert.Error(t, err)
assert.Contains(t, err.Error(), "500")
})
}
+16
View File
@@ -11,8 +11,18 @@ const (
)
type SecretsManagerSettings struct {
// Which encryption provider to use to encrypt any new secrets
CurrentEncryptionProvider string
// The time to live for decrypted data keys in memory
DataKeysCacheTTL time.Duration
// The interval to remove expired data keys from the cache
DataKeysCacheCleanupInterval time.Duration
// The caution period is the time after which a data key is assumed to be persisted in the worst case scenario.
DataKeysCacheCautionPeriod time.Duration
// Whether to use a Redis cache for data keys instead of the in-memory cache
DataKeysCacheUseRedis bool
// ConfiguredKMSProviders is a map of KMS providers found in the config file. The keys are in the format of <provider>.<keyName>, and the values are a map of the properties in that section
// In OSS, the provider type can only be "secret_key". In Enterprise, it can additionally be one of: "aws_kms", "azure_keyvault", "google_kms", "hashicorp_vault"
ConfiguredKMSProviders map[string]map[string]string
@@ -73,6 +83,12 @@ func (cfg *Cfg) readSecretsManagerSettings() {
cfg.SecretsManagement.AWSKeeperAccessKeyID = secretsMgmt.Key("aws_access_key_id").MustString("")
cfg.SecretsManagement.AWSKeeperSecretAccessKey = secretsMgmt.Key("aws_secret_access_key").MustString("")
cfg.SecretsManagement.DataKeysCacheUseRedis = secretsMgmt.Key("data_keys_cache_use_redis").MustBool(false)
cfg.SecretsManagement.DataKeysCacheTTL = secretsMgmt.Key("data_keys_cache_ttl").MustDuration(15 * time.Minute)
cfg.SecretsManagement.DataKeysCacheCleanupInterval = secretsMgmt.Key("data_keys_cache_cleanup_interval").MustDuration(1 * time.Minute)
// We consider a "caution period" of 10m to be long enough for any database transaction that implied a data key creation to have finished successfully.
cfg.SecretsManagement.DataKeysCacheCautionPeriod = secretsMgmt.Key("data_keys_cache_caution_period").MustDuration(10 * time.Minute)
// Extract available KMS providers from configuration sections
providers := make(map[string]map[string]string)
for _, section := range cfg.Raw.Sections() {
+4 -4
View File
@@ -184,9 +184,9 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
}
// ListSince returns a sequence of events since the given resource version.
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[string, error] {
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2[string, error] {
opts := ListOptions{
Sort: sortOrder,
Sort: SortOrderAsc,
StartKey: fmt.Sprintf("%d", sinceRV),
}
return func(yield func(string, error) bool) {
@@ -202,9 +202,9 @@ func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64, sortOrder
}
}
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[Event, error] {
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV, sortOrder) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV) {
if err != nil {
yield(Event{}, err)
return
@@ -369,7 +369,7 @@ func testEventStoreListKeysSince(t *testing.T, ctx context.Context, store *event
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]string, 0, 2)
for eventKey, err := range store.ListKeysSince(ctx, 1500, SortOrderAsc) {
for eventKey, err := range store.ListKeysSince(ctx, 1500) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
@@ -429,7 +429,7 @@ func testEventStoreListSince(t *testing.T, ctx context.Context, store *eventStor
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]Event, 0, 2)
for event, err := range store.ListSince(ctx, 1500, SortOrderAsc) {
for event, err := range store.ListSince(ctx, 1500) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
@@ -453,7 +453,7 @@ func TestEventStore_ListSince_Empty(t *testing.T) {
func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) {
// List events when store is empty
retrievedEvents := make([]Event, 0)
for event, err := range store.ListSince(ctx, 0, SortOrderAsc) {
for event, err := range store.ListSince(ctx, 0) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
@@ -825,7 +825,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 90 minutes ago using subtractDurationFromSnowflake
sinceRV := subtractDurationFromSnowflake(snowflakeFromTime(now), 90*time.Minute)
retrievedEvents := make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
@@ -842,7 +842,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 30 minutes ago using subtractDurationFromSnowflake
sinceRV = subtractDurationFromSnowflake(snowflakeFromTime(now), 30*time.Minute)
retrievedEvents = make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
+1 -1
View File
@@ -119,7 +119,7 @@ func (n *pollingNotifier) Watch(ctx context.Context, opts watchOptions) <-chan E
return
case <-time.After(currentInterval):
foundEvents := false
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod), SortOrderAsc) {
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod)) {
if err != nil {
n.log.Error("Failed to list events since", "error", err)
continue
+16 -21
View File
@@ -801,20 +801,8 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
}
}
latestEvent, err := k.eventStore.LastEventKey(ctx)
if err != nil {
if errors.Is(err, ErrNotFound) {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
return 0, func(yield func(*ModifiedResource, error) bool) {
yield(nil, fmt.Errorf("error trying to retrieve last event key: %s", err))
}
}
if latestEvent.ResourceVersion == sinceRv {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
// Generate a new resource version for the list
listRV := k.snowflake.Generate().Int64()
// Check if sinceRv is older than 1 hour
sinceRvTimestamp := snowflake.ID(sinceRv).Time()
@@ -823,11 +811,11 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
if sinceRvAge > time.Hour {
k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return latestEvent.ResourceVersion, k.listModifiedSinceDataStore(ctx, key, sinceRv)
return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv)
}
k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return latestEvent.ResourceVersion, k.listModifiedSinceEventStore(ctx, key, sinceRv)
return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv)
}
func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
@@ -928,9 +916,9 @@ func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key N
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
return func(yield func(*ModifiedResource, error) bool) {
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod), SortOrderDesc) {
// store all events ordered by RV for the given tenant here
eventKeys := make([]EventKey, 0)
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod)) {
if err != nil {
yield(&ModifiedResource{}, err)
return
@@ -950,11 +938,18 @@ func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key
continue
}
eventKeys = append(eventKeys, evtKey)
}
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for i := len(eventKeys) - 1; i >= 0; i -= 1 {
evtKey := eventKeys[i]
if _, ok := seen[evtKey.Name]; ok {
continue
}
seen[evtKey.Name] = struct{}{}
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
if err != nil {
yield(&ModifiedResource{}, err)
@@ -1312,7 +1307,7 @@ func (b *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings
if setting.RebuildCollection {
for _, key := range setting.Collection {
events := make([]string, 0)
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1, SortOrderAsc) {
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1) {
if err != nil {
b.log.Error("failed to list event: %s", err)
return rsp
+6 -11
View File
@@ -23,7 +23,6 @@ import (
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
@@ -100,10 +99,6 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
}
t.Run(tc.name, func(t *testing.T) {
if db.IsTestDbSQLite() {
t.Skip("Skipping tests on sqlite until channel notifier is implemented")
}
tc.fn(t, newBackend(context.Background()), opts.NSPrefix)
})
}
@@ -555,7 +550,7 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
Resource: "resource",
}
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated)
require.Equal(t, latestRv, rvDeleted)
require.Greater(t, latestRv, rvCreated)
counter := 0
for res, err := range seq {
@@ -629,11 +624,11 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
rvCreated3, _ := writeEvent(ctx, backend, "bItem", resourcepb.WatchEvent_ADDED, WithNamespace(ns))
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated1-1)
require.Equal(t, latestRv, rvCreated3)
require.Greater(t, latestRv, rvCreated3)
counter := 0
names := []string{"bItem", "aItem", "cItem"}
rvs := []int64{rvCreated3, rvCreated2, rvCreated1}
names := []string{"aItem", "bItem", "cItem"}
rvs := []int64{rvCreated2, rvCreated3, rvCreated1}
for res, err := range seq {
require.NoError(t, err)
require.Equal(t, key.Namespace, res.Key.Namespace)
@@ -1171,7 +1166,7 @@ func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.S
}))
server := newServer(t, backend)
ns := nsPrefix + "-create-rsrce" // create-resource
ns := nsPrefix + "-create-resource"
ctx = request.WithNamespace(ctx, ns)
request := &resourcepb.CreateRequest{
@@ -1612,7 +1607,7 @@ func (s *sliceBulkRequestIterator) RollbackRequested() bool {
func runTestIntegrationBackendOptimisticLocking(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
ns := nsPrefix + "-optimis-lock" // optimistic-locking. need to cut down on characters to not exceed namespace character limit (40)
ns := nsPrefix + "-optimistic-locking"
t.Run("concurrent updates with same RV - only one succeeds", func(t *testing.T) {
// Create initial resource with rv0 (no previous RV)
@@ -36,10 +36,6 @@ func NewTestSqlKvBackend(t *testing.T, ctx context.Context, withRvManager bool)
KvStore: kv,
}
if db.DriverName() == "sqlite3" {
kvOpts.UseChannelNotifier = true
}
if withRvManager {
dialect := sqltemplate.DialectForDriver(db.DriverName())
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{
@@ -30,6 +30,7 @@ func TestBadgerKVStorageBackend(t *testing.T) {
SkipTests: map[string]bool{
// TODO: fix these tests and remove this skip
TestBlobSupport: true,
TestListModifiedSince: true,
// Badger does not support bulk import yet.
TestGetResourceLastImportTime: true,
},
@@ -40,8 +41,17 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
testutil.SkipIntegrationTestInShortMode(t)
skipTests := map[string]bool{
TestWatchWriteEvents: true,
TestList: true,
TestBlobSupport: true,
TestGetResourceStats: true,
TestListHistory: true,
TestListHistoryErrorReporting: true,
TestListModifiedSince: true,
TestListTrash: true,
TestCreateNewResource: true,
TestGetResourceLastImportTime: true,
TestOptimisticLocking: true,
}
t.Run("Without RvManager", func(t *testing.T) {
@@ -49,7 +59,7 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, false)
return backend
}, &TestOptions{
NSPrefix: "sqlkvstoragetest",
NSPrefix: "sqlkvstorage-test",
SkipTests: skipTests,
})
})
@@ -59,7 +69,7 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, true)
return backend
}, &TestOptions{
NSPrefix: "sqlkvstoragetest-rvmanager",
NSPrefix: "sqlkvstorage-withrvmanager-test",
SkipTests: skipTests,
})
})