diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index a01549d9da5..8e309433032 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -618,6 +618,7 @@ type Cfg struct { EnableSearch bool OverridesFilePath string OverridesReloadInterval time.Duration + EnableSQLKVBackend bool // Secrets Management SecretsManagement SecretsManagerSettings diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go index 4f69daa64fd..72e01ce6ce9 100644 --- a/pkg/setting/setting_unified_storage.go +++ b/pkg/setting/setting_unified_storage.go @@ -100,6 +100,9 @@ func (cfg *Cfg) setUnifiedStorageConfig() { cfg.OverridesFilePath = section.Key("overrides_path").String() cfg.OverridesReloadInterval = section.Key("overrides_reload_period").MustDuration(30 * time.Second) + // use sqlkv (resource/sqlkv) instead of the sql backend (sql/backend) as the StorageServer + cfg.EnableSQLKVBackend = section.Key("enable_sqlkv_backend").MustBool(false) + cfg.MaxFileIndexAge = section.Key("max_file_index_age").MustDuration(0) cfg.MinFileIndexBuildVersion = section.Key("min_file_index_build_version").MustString("") } diff --git a/pkg/storage/unified/resource/datastore_test.go b/pkg/storage/unified/resource/datastore_test.go index 8f167c2e16e..02c318fe6d9 100644 --- a/pkg/storage/unified/resource/datastore_test.go +++ b/pkg/storage/unified/resource/datastore_test.go @@ -9,6 +9,9 @@ import ( "testing" "github.com/bwmarrin/snowflake" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" "github.com/stretchr/testify/require" ) @@ -24,6 +27,16 @@ func TestNewDataStore(t *testing.T) { require.NotNil(t, ds) } +// nolint:unused +func setupTestDataStoreSqlKv(t *testing.T) *dataStore { + dbstore := db.InitTestDB(t) + eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil) + require.NoError(t, err) + kv, err := NewSQLKV(eDB) + require.NoError(t, err) + return newDataStore(kv) +} + func TestDataKey_String(t *testing.T) { rv := int64(1934555792099250176) tests := []struct { @@ -679,10 +692,21 @@ func TestParseKey(t *testing.T) { } } -func TestDataStore_Save_And_Get(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() +func runDataStoreTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) *dataStore, testFn func(*testing.T, context.Context, *dataStore)) { + t.Run(storeName, func(t *testing.T) { + ctx := context.Background() + store := newStoreFn(t) + testFn(t, ctx, store) + }) +} +func TestDataStore_Save_And_Get(t *testing.T) { + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreSaveAndGet) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreSaveAndGet) +} + +func testDataStoreSaveAndGet(t *testing.T, ctx context.Context, ds *dataStore) { rv := node.Generate() testKey := DataKey{ @@ -744,9 +768,12 @@ func TestDataStore_Save_And_Get(t *testing.T) { } func TestDataStore_Delete(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreDelete) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreDelete) +} +func testDataStoreDelete(t *testing.T, ctx context.Context, ds *dataStore) { rv := node.Generate() testKey := DataKey{ @@ -795,9 +822,12 @@ func TestDataStore_Delete(t *testing.T) { } func TestDataStore_List(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreList) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreList) +} +func testDataStoreList(t *testing.T, ctx context.Context, ds *dataStore) { resourceKey := ListRequestKey{ Namespace: "test-namespace", Group: "test-group", @@ -919,9 +949,12 @@ func TestDataStore_List(t *testing.T) { } func TestDataStore_Integration(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreIntegration) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreIntegration) +} +func testDataStoreIntegration(t *testing.T, ctx context.Context, ds *dataStore) { t.Run("full lifecycle test", func(t *testing.T) { resourceKey := ListRequestKey{ Namespace: "integration-ns", @@ -1007,9 +1040,12 @@ func TestDataStore_Integration(t *testing.T) { } func TestDataStore_Keys(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreKeys) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreKeys) +} +func testDataStoreKeys(t *testing.T, ctx context.Context, ds *dataStore) { resourceKey := ListRequestKey{ Namespace: "test-namespace", Group: "test-group", @@ -1154,9 +1190,12 @@ func TestDataStore_Keys(t *testing.T) { } func TestDataStore_ValidationEnforced(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreValidationEnforced) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreValidationEnforced) +} +func testDataStoreValidationEnforced(t *testing.T, ctx context.Context, ds *dataStore) { // Create an invalid key invalidKey := DataKey{ Namespace: "Invalid-Namespace-$$$", @@ -1483,9 +1522,12 @@ func TestListRequestKey_Prefix(t *testing.T) { } func TestDataStore_LastResourceVersion(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreLastResourceVersion) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreLastResourceVersion) +} +func testDataStoreLastResourceVersion(t *testing.T, ctx context.Context, ds *dataStore) { t.Run("returns last resource version for existing data", func(t *testing.T) { resourceKey := ListRequestKey{ Namespace: "test-namespace", @@ -1585,9 +1627,12 @@ func TestDataStore_LastResourceVersion(t *testing.T) { } func TestDataStore_GetLatestResourceKey(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetLatestResourceKey) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetLatestResourceKey) +} +func testDataStoreGetLatestResourceKey(t *testing.T, ctx context.Context, ds *dataStore) { key := GetRequestKey{ Group: "apps", Resource: "resources", @@ -1648,9 +1693,12 @@ func TestDataStore_GetLatestResourceKey(t *testing.T) { } func TestDataStore_GetLatestResourceKey_Deleted(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetLatestResourceKeyDeleted) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetLatestResourceKeyDeleted) +} +func testDataStoreGetLatestResourceKeyDeleted(t *testing.T, ctx context.Context, ds *dataStore) { key := GetRequestKey{ Group: "apps", Resource: "resources", @@ -1676,9 +1724,12 @@ func TestDataStore_GetLatestResourceKey_Deleted(t *testing.T) { } func TestDataStore_GetLatestResourceKey_NotFound(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetLatestResourceKeyNotFound) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetLatestResourceKeyNotFound) +} +func testDataStoreGetLatestResourceKeyNotFound(t *testing.T, ctx context.Context, ds *dataStore) { key := GetRequestKey{ Group: "apps", Resource: "resources", @@ -1691,9 +1742,12 @@ func TestDataStore_GetLatestResourceKey_NotFound(t *testing.T) { } func TestDataStore_GetResourceKeyAtRevision(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetResourceKeyAtRevision) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetResourceKeyAtRevision) +} +func testDataStoreGetResourceKeyAtRevision(t *testing.T, ctx context.Context, ds *dataStore) { key := GetRequestKey{ Group: "apps", Resource: "resources", @@ -1766,9 +1820,12 @@ func TestDataStore_GetResourceKeyAtRevision(t *testing.T) { } func TestDataStore_ListLatestResourceKeys(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListLatestResourceKeys) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListLatestResourceKeys) +} +func testDataStoreListLatestResourceKeys(t *testing.T, ctx context.Context, ds *dataStore) { listKey := ListRequestKey{ Group: "apps", Resource: "resources", @@ -1819,9 +1876,12 @@ func TestDataStore_ListLatestResourceKeys(t *testing.T) { } func TestDataStore_ListLatestResourceKeys_Deleted(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListLatestResourceKeysDeleted) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListLatestResourceKeysDeleted) +} +func testDataStoreListLatestResourceKeysDeleted(t *testing.T, ctx context.Context, ds *dataStore) { listKey := ListRequestKey{ Group: "apps", Resource: "resources", @@ -1869,9 +1929,12 @@ func TestDataStore_ListLatestResourceKeys_Deleted(t *testing.T) { } func TestDataStore_ListLatestResourceKeys_Multiple(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListLatestResourceKeysMultiple) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListLatestResourceKeysMultiple) +} +func testDataStoreListLatestResourceKeysMultiple(t *testing.T, ctx context.Context, ds *dataStore) { listKey := ListRequestKey{ Group: "apps", Resource: "resources", @@ -1940,9 +2003,12 @@ func TestDataStore_ListLatestResourceKeys_Multiple(t *testing.T) { } func TestDataStore_ListResourceKeysAtRevision(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListResourceKeysAtRevision) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListResourceKeysAtRevision) +} +func testDataStoreListResourceKeysAtRevision(t *testing.T, ctx context.Context, ds *dataStore) { // Create multiple resources with different versions rv1 := node.Generate().Int64() rv2 := node.Generate().Int64() @@ -2152,9 +2218,12 @@ func TestDataStore_ListResourceKeysAtRevision(t *testing.T) { } func TestDataStore_ListResourceKeysAtRevision_ValidationErrors(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListResourceKeysAtRevisionValidationErrors) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListResourceKeysAtRevisionValidationErrors) +} +func testDataStoreListResourceKeysAtRevisionValidationErrors(t *testing.T, ctx context.Context, ds *dataStore) { tests := []struct { name string key ListRequestKey @@ -2194,9 +2263,12 @@ func TestDataStore_ListResourceKeysAtRevision_ValidationErrors(t *testing.T) { } func TestDataStore_ListResourceKeysAtRevision_EmptyResults(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListResourceKeysAtRevisionEmptyResults) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListResourceKeysAtRevisionEmptyResults) +} +func testDataStoreListResourceKeysAtRevisionEmptyResults(t *testing.T, ctx context.Context, ds *dataStore) { listKey := ListRequestKey{ Group: "apps", Resource: "resources", @@ -2213,9 +2285,12 @@ func TestDataStore_ListResourceKeysAtRevision_EmptyResults(t *testing.T) { } func TestDataStore_ListResourceKeysAtRevision_ResourcesNewerThanRevision(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreListResourceKeysAtRevisionResourcesNewerThanRevision) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreListResourceKeysAtRevisionResourcesNewerThanRevision) +} +func testDataStoreListResourceKeysAtRevisionResourcesNewerThanRevision(t *testing.T, ctx context.Context, ds *dataStore) { // Create a resource with a high resource version rv := node.Generate().Int64() key := DataKey{ @@ -2681,9 +2756,12 @@ func TestGetRequestKey_Prefix(t *testing.T) { } func TestDataStore_GetResourceStats_Comprehensive(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetResourceStatsComprehensive) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetResourceStatsComprehensive) +} +func testDataStoreGetResourceStatsComprehensive(t *testing.T, ctx context.Context, ds *dataStore) { // Test setup: 3 namespaces × 3 groups × 3 resources × 3 names × 3 versions = 243 total entries // But each name will have only 1 latest version that counts, so 3 × 3 × 3 × 3 = 81 non-deleted resources namespaces := []string{"ns1", "ns2", "ns3"} @@ -2888,9 +2966,12 @@ func TestDataStore_GetResourceStats_Comprehensive(t *testing.T) { } func TestDataStore_getGroupResources(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetGroupResources) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetGroupResources) +} +func testDataStoreGetGroupResources(t *testing.T, ctx context.Context, ds *dataStore) { // Create test data with multiple group/resource combinations testData := []struct { group string @@ -2951,9 +3032,12 @@ func TestDataStore_getGroupResources(t *testing.T) { } func TestDataStore_BatchDelete(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreBatchDelete) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreBatchDelete) +} +func testDataStoreBatchDelete(t *testing.T, ctx context.Context, ds *dataStore) { keys := make([]DataKey, 95) for i := 0; i < 95; i++ { rv := node.Generate().Int64() @@ -2987,9 +3071,12 @@ func TestDataStore_BatchDelete(t *testing.T) { } func TestDataStore_BatchGet(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreBatchGet) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreBatchGet) +} +func testDataStoreBatchGet(t *testing.T, ctx context.Context, ds *dataStore) { t.Run("batch get multiple existing keys", func(t *testing.T) { // Create test data keys := make([]DataKey, 5) @@ -3132,9 +3219,12 @@ func TestDataStore_BatchGet(t *testing.T) { } func TestDataStore_GetLatestAndPredecessor(t *testing.T) { - ds := setupTestDataStore(t) - ctx := context.Background() + runDataStoreTestWith(t, "badger", setupTestDataStore, testDataStoreGetLatestAndPredecessor) + // enable this when sqlkv is ready + // runDataStoreTestWith(t, "sqlkv", setupTestDataStoreSqlKv, testDataStoreGetLatestAndPredecessor) +} +func testDataStoreGetLatestAndPredecessor(t *testing.T, ctx context.Context, ds *dataStore) { resourceKey := ListRequestKey{ Namespace: "test-namespace", Group: "test-group", diff --git a/pkg/storage/unified/resource/eventstore_test.go b/pkg/storage/unified/resource/eventstore_test.go index a9d2ee93eb4..270db1ddd3f 100644 --- a/pkg/storage/unified/resource/eventstore_test.go +++ b/pkg/storage/unified/resource/eventstore_test.go @@ -7,6 +7,10 @@ import ( "time" "github.com/bwmarrin/snowflake" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" + "github.com/grafana/grafana/pkg/tests/testsuite" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -21,6 +25,20 @@ func setupTestEventStore(t *testing.T) *eventStore { return newEventStore(kv) } +func TestMain(m *testing.M) { + testsuite.Run(m) +} + +// nolint:unused +func setupTestEventStoreSqlKv(t *testing.T) *eventStore { + dbstore := db.InitTestDB(t) + eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil) + require.NoError(t, err) + kv, err := NewSQLKV(eDB) + require.NoError(t, err) + return newEventStore(kv) +} + func TestNewEventStore(t *testing.T) { store := setupTestEventStore(t) assert.NotNil(t, store.kv) @@ -180,10 +198,21 @@ func TestEventStore_ParseEventKey(t *testing.T) { assert.Equal(t, originalKey, parsedKey) } -func TestEventStore_Save_Get(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) +func runEventStoreTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) *eventStore, testFn func(*testing.T, context.Context, *eventStore)) { + t.Run(storeName, func(t *testing.T) { + ctx := context.Background() + store := newStoreFn(t) + testFn(t, ctx, store) + }) +} +func TestEventStore_Save_Get(t *testing.T) { + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreSaveGet) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreSaveGet) +} + +func testEventStoreSaveGet(t *testing.T, ctx context.Context, store *eventStore) { event := Event{ Namespace: "default", Group: "apps", @@ -216,9 +245,12 @@ func TestEventStore_Save_Get(t *testing.T) { } func TestEventStore_Get_NotFound(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreGetNotFound) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreGetNotFound) +} +func testEventStoreGetNotFound(t *testing.T, ctx context.Context, store *eventStore) { nonExistentKey := EventKey{ Namespace: "default", Group: "apps", @@ -233,9 +265,12 @@ func TestEventStore_Get_NotFound(t *testing.T) { } func TestEventStore_LastEventKey(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreLastEventKey) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreLastEventKey) +} +func testEventStoreLastEventKey(t *testing.T, ctx context.Context, store *eventStore) { // Test when no events exist _, err := store.LastEventKey(ctx) assert.Error(t, err) @@ -292,9 +327,12 @@ func TestEventStore_LastEventKey(t *testing.T) { } func TestEventStore_ListKeysSince(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreListKeysSince) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreListKeysSince) +} +func testEventStoreListKeysSince(t *testing.T, ctx context.Context, store *eventStore) { // Add events with different resource versions events := []Event{ { @@ -349,9 +387,12 @@ func TestEventStore_ListKeysSince(t *testing.T) { } func TestEventStore_ListSince(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreListSince) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreListSince) +} +func testEventStoreListSince(t *testing.T, ctx context.Context, store *eventStore) { // Add events with different resource versions events := []Event{ { @@ -404,9 +445,12 @@ func TestEventStore_ListSince(t *testing.T) { } func TestEventStore_ListSince_Empty(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreListSinceEmpty) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreListSinceEmpty) +} +func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) { // List events when store is empty retrievedEvents := make([]Event, 0) for event, err := range store.ListSince(ctx, 0) { @@ -459,9 +503,12 @@ func TestEventKey_Struct(t *testing.T) { } func TestEventStore_Save_InvalidJSON(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreSaveInvalidJSON) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreSaveInvalidJSON) +} +func testEventStoreSaveInvalidJSON(t *testing.T, ctx context.Context, store *eventStore) { // This should work fine as the Event struct should be serializable event := Event{ Namespace: "default", @@ -477,9 +524,12 @@ func TestEventStore_Save_InvalidJSON(t *testing.T) { } func TestEventStore_CleanupOldEvents(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreCleanupOldEvents) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreCleanupOldEvents) +} +func testEventStoreCleanupOldEvents(t *testing.T, ctx context.Context, store *eventStore) { now := time.Now() oldRV := snowflakeFromTime(now.Add(-48 * time.Hour)) // 48 hours ago recentRV := snowflakeFromTime(now.Add(-1 * time.Hour)) // 1 hour ago @@ -565,9 +615,12 @@ func TestEventStore_CleanupOldEvents(t *testing.T) { } func TestEventStore_CleanupOldEvents_NoOldEvents(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreCleanupOldEventsNoOldEvents) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreCleanupOldEventsNoOldEvents) +} +func testEventStoreCleanupOldEventsNoOldEvents(t *testing.T, ctx context.Context, store *eventStore) { // Create an event 1 hour old rv := snowflakeFromTime(time.Now().Add(-1 * time.Hour)) event := Event{ @@ -603,9 +656,12 @@ func TestEventStore_CleanupOldEvents_NoOldEvents(t *testing.T) { } func TestEventStore_CleanupOldEvents_EmptyStore(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreCleanupOldEventsEmptyStore) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreCleanupOldEventsEmptyStore) +} +func testEventStoreCleanupOldEventsEmptyStore(t *testing.T, ctx context.Context, store *eventStore) { // Clean up events from empty store deletedCount, err := store.CleanupOldEvents(ctx, time.Now().Add(-24*time.Hour)) require.NoError(t, err) @@ -613,9 +669,12 @@ func TestEventStore_CleanupOldEvents_EmptyStore(t *testing.T) { } func TestEventStore_BatchDelete(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testEventStoreBatchDelete) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testEventStoreBatchDelete) +} +func testEventStoreBatchDelete(t *testing.T, ctx context.Context, store *eventStore) { // Create multiple events (more than batch size to test batching) eventKeys := make([]string, 75) for i := 0; i < 75; i++ { @@ -722,9 +781,12 @@ func TestSnowflakeFromTime(t *testing.T) { } func TestListKeysSince_WithSnowflakeTime(t *testing.T) { - ctx := context.Background() - store := setupTestEventStore(t) + runEventStoreTestWith(t, "badger", setupTestEventStore, testListKeysSinceWithSnowflakeTime) + // enable this when sqlkv is ready + // runEventStoreTestWith(t, "sqlkv", setupTestEventStoreSqlKv, testListKeysSinceWithSnowflakeTime) +} +func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store *eventStore) { // Create events with snowflake-based resource versions at different times now := time.Now() events := []Event{ diff --git a/pkg/storage/unified/resource/notifier_test.go b/pkg/storage/unified/resource/notifier_test.go index 7b201f47420..060f8eecfbe 100644 --- a/pkg/storage/unified/resource/notifier_test.go +++ b/pkg/storage/unified/resource/notifier_test.go @@ -6,6 +6,9 @@ import ( "time" "github.com/grafana/grafana-app-sdk/logging" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -22,6 +25,18 @@ func setupTestNotifier(t *testing.T) (*notifier, *eventStore) { return notifier, eventStore } +// nolint:unused +func setupTestNotifierSqlKv(t *testing.T) (*notifier, *eventStore) { + dbstore := db.InitTestDB(t) + eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil) + require.NoError(t, err) + kv, err := NewSQLKV(eDB) + require.NoError(t, err) + eventStore := newEventStore(kv) + notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}}) + return notifier, eventStore +} + func TestNewNotifier(t *testing.T) { notifier, _ := setupTestNotifier(t) @@ -35,10 +50,21 @@ func TestDefaultWatchOptions(t *testing.T) { assert.Equal(t, defaultBufferSize, opts.BufferSize) } -func TestNotifier_lastEventResourceVersion(t *testing.T) { - ctx := context.Background() - notifier, eventStore := setupTestNotifier(t) +func runNotifierTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) (*notifier, *eventStore), testFn func(*testing.T, context.Context, *notifier, *eventStore)) { + t.Run(storeName, func(t *testing.T) { + ctx := context.Background() + notifier, eventStore := newStoreFn(t) + testFn(t, ctx, notifier, eventStore) + }) +} +func TestNotifier_lastEventResourceVersion(t *testing.T) { + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierLastEventResourceVersion) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierLastEventResourceVersion) +} + +func testNotifierLastEventResourceVersion(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { // Test with no events rv, err := notifier.lastEventResourceVersion(ctx) assert.Error(t, err) @@ -85,8 +111,12 @@ func TestNotifier_lastEventResourceVersion(t *testing.T) { } func TestNotifier_cachekey(t *testing.T) { - notifier, _ := setupTestNotifier(t) + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierCachekey) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierCachekey) +} +func testNotifierCachekey(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { tests := []struct { name string event Event @@ -136,10 +166,14 @@ func TestNotifier_cachekey(t *testing.T) { } func TestNotifier_Watch_NoEvents(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierWatchNoEvents) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchNoEvents) +} - notifier, eventStore := setupTestNotifier(t) +func testNotifierWatchNoEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { + ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() // Add at least one event so that lastEventResourceVersion doesn't return ErrNotFound initialEvent := Event{ @@ -174,10 +208,14 @@ func TestNotifier_Watch_NoEvents(t *testing.T) { } func TestNotifier_Watch_WithExistingEvents(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierWatchWithExistingEvents) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchWithExistingEvents) +} - notifier, eventStore := setupTestNotifier(t) +func testNotifierWatchWithExistingEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() // Save some initial events initialEvents := []Event{ @@ -245,10 +283,14 @@ func TestNotifier_Watch_WithExistingEvents(t *testing.T) { } func TestNotifier_Watch_EventDeduplication(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierWatchEventDeduplication) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchEventDeduplication) +} - notifier, eventStore := setupTestNotifier(t) +func testNotifierWatchEventDeduplication(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() // Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound initialEvent := Event{ @@ -308,9 +350,13 @@ func TestNotifier_Watch_EventDeduplication(t *testing.T) { } func TestNotifier_Watch_ContextCancellation(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierWatchContextCancellation) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchContextCancellation) +} - notifier, eventStore := setupTestNotifier(t) +func testNotifierWatchContextCancellation(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { + ctx, cancel := context.WithCancel(ctx) // Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound initialEvent := Event{ @@ -351,10 +397,14 @@ func TestNotifier_Watch_ContextCancellation(t *testing.T) { } func TestNotifier_Watch_MultipleEvents(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() + runNotifierTestWith(t, "badger", setupTestNotifier, testNotifierWatchMultipleEvents) + // enable this when sqlkv is ready + // runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchMultipleEvents) +} - notifier, eventStore := setupTestNotifier(t) +func testNotifierWatchMultipleEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() rv := time.Now().UnixNano() // Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound initialEvent := Event{ diff --git a/pkg/storage/unified/resource/sqlkv.go b/pkg/storage/unified/resource/sqlkv.go new file mode 100644 index 00000000000..f6e8d0698f3 --- /dev/null +++ b/pkg/storage/unified/resource/sqlkv.go @@ -0,0 +1,70 @@ +package resource + +import ( + "context" + "fmt" + "io" + "iter" + + "github.com/grafana/grafana/pkg/storage/unified/sql/db" +) + +var _ KV = &sqlKV{} + +type sqlKV struct { + dbProvider db.DBProvider + db db.DB +} + +func NewSQLKV(dbProvider db.DBProvider) (KV, error) { + if dbProvider == nil { + return nil, fmt.Errorf("dbProvider is required") + } + + ctx := context.Background() + dbConn, err := dbProvider.Init(ctx) + if err != nil { + return nil, fmt.Errorf("error initializing DB: %w", err) + } + + return &sqlKV{ + dbProvider: dbProvider, + db: dbConn, + }, nil +} + +func (k *sqlKV) Ping(ctx context.Context) error { + return k.db.PingContext(ctx) +} + +func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] { + return func(yield func(string, error) bool) { + panic("not implemented!") + } +} + +func (k *sqlKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) { + panic("not implemented!") +} + +func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] { + return func(yield func(KeyValue, error) bool) { + panic("not implemented!") + } +} + +func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) { + panic("not implemented!") +} + +func (k *sqlKV) Delete(ctx context.Context, section string, key string) error { + panic("not implemented!") +} + +func (k *sqlKV) BatchDelete(ctx context.Context, section string, keys []string) error { + panic("not implemented!") +} + +func (k *sqlKV) UnixTimestamp(ctx context.Context) (int64, error) { + panic("not implemented!") +} diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go index 0de97b0355e..b0f51702775 100644 --- a/pkg/storage/unified/resource/storage_backend.go +++ b/pkg/storage/unified/resource/storage_backend.go @@ -70,7 +70,12 @@ type kvStorageBackend struct { //reg prometheus.Registerer } -var _ StorageBackend = &kvStorageBackend{} +var _ KVBackend = &kvStorageBackend{} + +type KVBackend interface { + StorageBackend + resourcepb.DiagnosticsServer +} type KVBackendOptions struct { KvStore KV @@ -82,7 +87,7 @@ type KVBackendOptions struct { Reg prometheus.Registerer // TODO add metrics } -func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) { +func NewKVStorageBackend(opts KVBackendOptions) (KVBackend, error) { ctx := context.Background() kv := opts.KvStore @@ -126,6 +131,18 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) { return backend, nil } +func (k *kvStorageBackend) IsHealthy(ctx context.Context, _ *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) { + type pinger interface { + Ping(context.Context) error + } + if p, ok := k.kv.(pinger); ok { + if err := p.Ping(ctx); err != nil { + return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_NOT_SERVING}, fmt.Errorf("KV store health check failed: %w", err) + } + } + return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_SERVING}, nil +} + // runCleanupOldEvents starts a background goroutine that periodically cleans up old events func (k *kvStorageBackend) runCleanupOldEvents(ctx context.Context) { // Run cleanup every hour diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 6723a58dd29..84eda71ca20 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -97,22 +97,41 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { return nil, err } - isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"), - opts.Cfg.SectionWithEnvOverrides("resource_api")) + if opts.Cfg.EnableSQLKVBackend { + sqlkv, err := resource.NewSQLKV(eDB) + if err != nil { + return nil, fmt.Errorf("error creating sqlkv: %s", err) + } - backend, err := NewBackend(BackendOptions{ - DBProvider: eDB, - Reg: opts.Reg, - IsHA: isHA, - storageMetrics: opts.StorageMetrics, - LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age. - }) - if err != nil { - return nil, err + kvBackend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{ + KvStore: sqlkv, + Tracer: opts.Tracer, + Reg: opts.Reg, + }) + if err != nil { + return nil, fmt.Errorf("error creating kv backend: %s", err) + } + + serverOptions.Backend = kvBackend + serverOptions.Diagnostics = kvBackend + } else { + isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"), + opts.Cfg.SectionWithEnvOverrides("resource_api")) + + backend, err := NewBackend(BackendOptions{ + DBProvider: eDB, + Reg: opts.Reg, + IsHA: isHA, + storageMetrics: opts.StorageMetrics, + LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age. + }) + if err != nil { + return nil, err + } + serverOptions.Backend = backend + serverOptions.Diagnostics = backend + serverOptions.Lifecycle = backend } - serverOptions.Backend = backend - serverOptions.Diagnostics = backend - serverOptions.Lifecycle = backend } serverOptions.Search = opts.SearchOptions diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index eab9aa9c845..f30f1d761d7 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -35,7 +35,8 @@ type NewKVFunc func(ctx context.Context) resource.KV // KVTestOptions configures which tests to run type KVTestOptions struct { - NSPrefix string // namespace prefix for isolation + SkipTests map[string]bool + NSPrefix string // namespace prefix for isolation } // GenerateRandomKVPrefix creates a random namespace prefix for test isolation @@ -72,6 +73,11 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) { } for _, tc := range cases { + if shouldSkip := opts.SkipTests[tc.name]; shouldSkip { + t.Logf("Skipping test: %s", tc.name) + continue + } + t.Run(tc.name, func(t *testing.T) { tc.fn(t, newKV(context.Background()), opts.NSPrefix) }) diff --git a/pkg/storage/unified/testing/kv_test.go b/pkg/storage/unified/testing/kv_test.go index 1e9b1a16c45..4dbd27d5ec9 100644 --- a/pkg/storage/unified/testing/kv_test.go +++ b/pkg/storage/unified/testing/kv_test.go @@ -7,7 +7,11 @@ import ( badger "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" + "github.com/grafana/grafana/pkg/tests/testsuite" ) func TestBadgerKV(t *testing.T) { @@ -26,3 +30,33 @@ func TestBadgerKV(t *testing.T) { NSPrefix: "badger-kv-test", }) } + +func TestMain(m *testing.M) { + testsuite.Run(m) +} + +func TestSQLKV(t *testing.T) { + RunKVTest(t, func(ctx context.Context) resource.KV { + dbstore := db.InitTestDB(t) + eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil) + require.NoError(t, err) + + kv, err := resource.NewSQLKV(eDB) + require.NoError(t, err) + return kv + }, &KVTestOptions{ + NSPrefix: "sql-kv-test", + SkipTests: map[string]bool{ + TestKVGet: true, + TestKVSave: true, + TestKVDelete: true, + TestKVKeys: true, + TestKVKeysWithLimits: true, + TestKVKeysWithSort: true, + TestKVConcurrent: true, + TestKVUnixTimestamp: true, + TestKVBatchGet: true, + TestKVBatchDelete: true, + }, + }) +} diff --git a/pkg/storage/unified/testing/storage_backend_test.go b/pkg/storage/unified/testing/storage_backend_test.go index 04f34e9102f..70e3b15aa7b 100644 --- a/pkg/storage/unified/testing/storage_backend_test.go +++ b/pkg/storage/unified/testing/storage_backend_test.go @@ -7,7 +7,11 @@ import ( badger "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" + sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" ) func TestBadgerKVStorageBackend(t *testing.T) { @@ -25,7 +29,7 @@ func TestBadgerKVStorageBackend(t *testing.T) { require.NoError(t, err) return backend }, &TestOptions{ - NSPrefix: "kvstorage-test", + NSPrefix: "badgerkvstorage-test", SkipTests: map[string]bool{ // TODO: fix these tests and remove this skip TestBlobSupport: true, @@ -35,3 +39,50 @@ func TestBadgerKVStorageBackend(t *testing.T) { }, }) } + +func TestSQLKVStorageBackend(t *testing.T) { + newBackendFunc := func(ctx context.Context) (resource.StorageBackend, sqldb.DB) { + dbstore := db.InitTestDB(t) + eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil) + require.NoError(t, err) + kv, err := resource.NewSQLKV(eDB) + require.NoError(t, err) + kvOpts := resource.KVBackendOptions{ + KvStore: kv, + } + backend, err := resource.NewKVStorageBackend(kvOpts) + require.NoError(t, err) + db, err := eDB.Init(ctx) + require.NoError(t, err) + return backend, db + } + + RunStorageBackendTest(t, func(ctx context.Context) resource.StorageBackend { + backend, _ := newBackendFunc(ctx) + return backend + }, &TestOptions{ + NSPrefix: "sqlkvstorage-test", + SkipTests: map[string]bool{ + TestHappyPath: true, + TestWatchWriteEvents: true, + TestList: true, + TestBlobSupport: true, + TestGetResourceStats: true, + TestListHistory: true, + TestListHistoryErrorReporting: true, + TestListModifiedSince: true, + TestListTrash: true, + TestCreateNewResource: true, + TestGetResourceLastImportTime: true, + TestOptimisticLocking: true, + TestKeyPathGeneration: true, + }, + }) + + RunSQLStorageBackendCompatibilityTest(t, newBackendFunc, &TestOptions{ + NSPrefix: "sqlkvstorage-compatibility-test", + SkipTests: map[string]bool{ + TestKeyPathGeneration: true, + }, + }) +}