Compare commits

...

4 Commits

Author SHA1 Message Date
Will Assis
861af005e0 unified-storage: fix listModifiedSince in storage_backend.go and enable its tests for both badger and sqlkv 2026-01-14 19:24:15 -03:00
Will Assis
14a05137e1 fmt 2026-01-14 15:43:30 -03:00
Will Assis
cfe86378a1 dont run tests on sqlite (yet) 2026-01-14 15:41:51 -03:00
Will Assis
f7d7e09626 unified-storage: sqlkv enable more tests 2026-01-14 15:25:42 -03:00
7 changed files with 48 additions and 44 deletions

View File

@@ -184,9 +184,9 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
}
// ListSince returns a sequence of events since the given resource version.
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2[string, error] {
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[string, error] {
opts := ListOptions{
Sort: SortOrderAsc,
Sort: sortOrder,
StartKey: fmt.Sprintf("%d", sinceRV),
}
return func(yield func(string, error) bool) {
@@ -202,9 +202,9 @@ func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2
}
}
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV, sortOrder) {
if err != nil {
yield(Event{}, err)
return

View File

@@ -369,7 +369,7 @@ func testEventStoreListKeysSince(t *testing.T, ctx context.Context, store *event
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]string, 0, 2)
for eventKey, err := range store.ListKeysSince(ctx, 1500) {
for eventKey, err := range store.ListKeysSince(ctx, 1500, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
@@ -429,7 +429,7 @@ func testEventStoreListSince(t *testing.T, ctx context.Context, store *eventStor
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]Event, 0, 2)
for event, err := range store.ListSince(ctx, 1500) {
for event, err := range store.ListSince(ctx, 1500, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
@@ -453,7 +453,7 @@ func TestEventStore_ListSince_Empty(t *testing.T) {
func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) {
// List events when store is empty
retrievedEvents := make([]Event, 0)
for event, err := range store.ListSince(ctx, 0) {
for event, err := range store.ListSince(ctx, 0, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
@@ -825,7 +825,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 90 minutes ago using subtractDurationFromSnowflake
sinceRV := subtractDurationFromSnowflake(snowflakeFromTime(now), 90*time.Minute)
retrievedEvents := make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
@@ -842,7 +842,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 30 minutes ago using subtractDurationFromSnowflake
sinceRV = subtractDurationFromSnowflake(snowflakeFromTime(now), 30*time.Minute)
retrievedEvents = make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}

View File

@@ -119,7 +119,7 @@ func (n *pollingNotifier) Watch(ctx context.Context, opts watchOptions) <-chan E
return
case <-time.After(currentInterval):
foundEvents := false
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod)) {
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod), SortOrderAsc) {
if err != nil {
n.log.Error("Failed to list events since", "error", err)
continue

View File

@@ -801,8 +801,20 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
}
}
// Generate a new resource version for the list
listRV := k.snowflake.Generate().Int64()
latestEvent, err := k.eventStore.LastEventKey(ctx)
if err != nil {
if errors.Is(err, ErrNotFound) {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
return 0, func(yield func(*ModifiedResource, error) bool) {
yield(nil, fmt.Errorf("error trying to retrieve last event key: %s", err))
}
}
if latestEvent.ResourceVersion == sinceRv {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
// Check if sinceRv is older than 1 hour
sinceRvTimestamp := snowflake.ID(sinceRv).Time()
@@ -811,11 +823,11 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
if sinceRvAge > time.Hour {
k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv)
return latestEvent.ResourceVersion, k.listModifiedSinceDataStore(ctx, key, sinceRv)
}
k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv)
return latestEvent.ResourceVersion, k.listModifiedSinceEventStore(ctx, key, sinceRv)
}
func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
@@ -916,9 +928,9 @@ func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key N
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
return func(yield func(*ModifiedResource, error) bool) {
// store all events ordered by RV for the given tenant here
eventKeys := make([]EventKey, 0)
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod)) {
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod), SortOrderDesc) {
if err != nil {
yield(&ModifiedResource{}, err)
return
@@ -938,18 +950,11 @@ func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key
continue
}
eventKeys = append(eventKeys, evtKey)
}
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for i := len(eventKeys) - 1; i >= 0; i -= 1 {
evtKey := eventKeys[i]
if _, ok := seen[evtKey.Name]; ok {
continue
}
seen[evtKey.Name] = struct{}{}
seen[evtKey.Name] = struct{}{}
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
if err != nil {
yield(&ModifiedResource{}, err)
@@ -1307,7 +1312,7 @@ func (b *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings
if setting.RebuildCollection {
for _, key := range setting.Collection {
events := make([]string, 0)
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1) {
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1, SortOrderAsc) {
if err != nil {
b.log.Error("failed to list event: %s", err)
return rsp

View File

@@ -23,6 +23,7 @@ import (
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
@@ -99,6 +100,10 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
}
t.Run(tc.name, func(t *testing.T) {
if db.IsTestDbSQLite() {
t.Skip("Skipping tests on sqlite until channel notifier is implemented")
}
tc.fn(t, newBackend(context.Background()), opts.NSPrefix)
})
}
@@ -550,7 +555,7 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
Resource: "resource",
}
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated)
require.Greater(t, latestRv, rvCreated)
require.Equal(t, latestRv, rvDeleted)
counter := 0
for res, err := range seq {
@@ -624,11 +629,11 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
rvCreated3, _ := writeEvent(ctx, backend, "bItem", resourcepb.WatchEvent_ADDED, WithNamespace(ns))
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated1-1)
require.Greater(t, latestRv, rvCreated3)
require.Equal(t, latestRv, rvCreated3)
counter := 0
names := []string{"aItem", "bItem", "cItem"}
rvs := []int64{rvCreated2, rvCreated3, rvCreated1}
names := []string{"bItem", "aItem", "cItem"}
rvs := []int64{rvCreated3, rvCreated2, rvCreated1}
for res, err := range seq {
require.NoError(t, err)
require.Equal(t, key.Namespace, res.Key.Namespace)
@@ -1166,7 +1171,7 @@ func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.S
}))
server := newServer(t, backend)
ns := nsPrefix + "-create-resource"
ns := nsPrefix + "-create-rsrce" // create-resource
ctx = request.WithNamespace(ctx, ns)
request := &resourcepb.CreateRequest{
@@ -1607,7 +1612,7 @@ func (s *sliceBulkRequestIterator) RollbackRequested() bool {
func runTestIntegrationBackendOptimisticLocking(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
ns := nsPrefix + "-optimistic-locking"
ns := nsPrefix + "-optimis-lock" // optimistic-locking. need to cut down on characters to not exceed namespace character limit (40)
t.Run("concurrent updates with same RV - only one succeeds", func(t *testing.T) {
// Create initial resource with rv0 (no previous RV)

View File

@@ -36,6 +36,10 @@ func NewTestSqlKvBackend(t *testing.T, ctx context.Context, withRvManager bool)
KvStore: kv,
}
if db.DriverName() == "sqlite3" {
kvOpts.UseChannelNotifier = true
}
if withRvManager {
dialect := sqltemplate.DialectForDriver(db.DriverName())
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{

View File

@@ -30,7 +30,6 @@ func TestBadgerKVStorageBackend(t *testing.T) {
SkipTests: map[string]bool{
// TODO: fix these tests and remove this skip
TestBlobSupport: true,
TestListModifiedSince: true,
// Badger does not support bulk import yet.
TestGetResourceLastImportTime: true,
},
@@ -41,17 +40,8 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
testutil.SkipIntegrationTestInShortMode(t)
skipTests := map[string]bool{
TestWatchWriteEvents: true,
TestList: true,
TestBlobSupport: true,
TestGetResourceStats: true,
TestListHistory: true,
TestListHistoryErrorReporting: true,
TestListModifiedSince: true,
TestListTrash: true,
TestCreateNewResource: true,
TestGetResourceLastImportTime: true,
TestOptimisticLocking: true,
}
t.Run("Without RvManager", func(t *testing.T) {
@@ -59,7 +49,7 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, false)
return backend
}, &TestOptions{
NSPrefix: "sqlkvstorage-test",
NSPrefix: "sqlkvstoragetest",
SkipTests: skipTests,
})
})
@@ -69,7 +59,7 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, true)
return backend
}, &TestOptions{
NSPrefix: "sqlkvstorage-withrvmanager-test",
NSPrefix: "sqlkvstoragetest-rvmanager",
SkipTests: skipTests,
})
})