Compare commits
4 Commits
main
...
sqlkv-enab
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
861af005e0 | ||
|
|
14a05137e1 | ||
|
|
cfe86378a1 | ||
|
|
f7d7e09626 |
@@ -184,9 +184,9 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
|
||||
}
|
||||
|
||||
// ListSince returns a sequence of events since the given resource version.
|
||||
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2[string, error] {
|
||||
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[string, error] {
|
||||
opts := ListOptions{
|
||||
Sort: SortOrderAsc,
|
||||
Sort: sortOrder,
|
||||
StartKey: fmt.Sprintf("%d", sinceRV),
|
||||
}
|
||||
return func(yield func(string, error) bool) {
|
||||
@@ -202,9 +202,9 @@ func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2
|
||||
}
|
||||
}
|
||||
|
||||
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
|
||||
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[Event, error] {
|
||||
return func(yield func(Event, error) bool) {
|
||||
for evtKey, err := range n.ListKeysSince(ctx, sinceRV) {
|
||||
for evtKey, err := range n.ListKeysSince(ctx, sinceRV, sortOrder) {
|
||||
if err != nil {
|
||||
yield(Event{}, err)
|
||||
return
|
||||
|
||||
@@ -369,7 +369,7 @@ func testEventStoreListKeysSince(t *testing.T, ctx context.Context, store *event
|
||||
|
||||
// List events since RV 1500 (should get events with RV 2000 and 3000)
|
||||
retrievedEvents := make([]string, 0, 2)
|
||||
for eventKey, err := range store.ListKeysSince(ctx, 1500) {
|
||||
for eventKey, err := range store.ListKeysSince(ctx, 1500, SortOrderAsc) {
|
||||
require.NoError(t, err)
|
||||
retrievedEvents = append(retrievedEvents, eventKey)
|
||||
}
|
||||
@@ -429,7 +429,7 @@ func testEventStoreListSince(t *testing.T, ctx context.Context, store *eventStor
|
||||
|
||||
// List events since RV 1500 (should get events with RV 2000 and 3000)
|
||||
retrievedEvents := make([]Event, 0, 2)
|
||||
for event, err := range store.ListSince(ctx, 1500) {
|
||||
for event, err := range store.ListSince(ctx, 1500, SortOrderAsc) {
|
||||
require.NoError(t, err)
|
||||
retrievedEvents = append(retrievedEvents, event)
|
||||
}
|
||||
@@ -453,7 +453,7 @@ func TestEventStore_ListSince_Empty(t *testing.T) {
|
||||
func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) {
|
||||
// List events when store is empty
|
||||
retrievedEvents := make([]Event, 0)
|
||||
for event, err := range store.ListSince(ctx, 0) {
|
||||
for event, err := range store.ListSince(ctx, 0, SortOrderAsc) {
|
||||
require.NoError(t, err)
|
||||
retrievedEvents = append(retrievedEvents, event)
|
||||
}
|
||||
@@ -825,7 +825,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
|
||||
// List events since 90 minutes ago using subtractDurationFromSnowflake
|
||||
sinceRV := subtractDurationFromSnowflake(snowflakeFromTime(now), 90*time.Minute)
|
||||
retrievedEvents := make([]string, 0)
|
||||
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
|
||||
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
|
||||
require.NoError(t, err)
|
||||
retrievedEvents = append(retrievedEvents, eventKey)
|
||||
}
|
||||
@@ -842,7 +842,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
|
||||
// List events since 30 minutes ago using subtractDurationFromSnowflake
|
||||
sinceRV = subtractDurationFromSnowflake(snowflakeFromTime(now), 30*time.Minute)
|
||||
retrievedEvents = make([]string, 0)
|
||||
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
|
||||
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
|
||||
require.NoError(t, err)
|
||||
retrievedEvents = append(retrievedEvents, eventKey)
|
||||
}
|
||||
|
||||
@@ -119,7 +119,7 @@ func (n *pollingNotifier) Watch(ctx context.Context, opts watchOptions) <-chan E
|
||||
return
|
||||
case <-time.After(currentInterval):
|
||||
foundEvents := false
|
||||
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod)) {
|
||||
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod), SortOrderAsc) {
|
||||
if err != nil {
|
||||
n.log.Error("Failed to list events since", "error", err)
|
||||
continue
|
||||
|
||||
@@ -801,8 +801,20 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a new resource version for the list
|
||||
listRV := k.snowflake.Generate().Int64()
|
||||
latestEvent, err := k.eventStore.LastEventKey(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
|
||||
}
|
||||
|
||||
return 0, func(yield func(*ModifiedResource, error) bool) {
|
||||
yield(nil, fmt.Errorf("error trying to retrieve last event key: %s", err))
|
||||
}
|
||||
}
|
||||
|
||||
if latestEvent.ResourceVersion == sinceRv {
|
||||
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
|
||||
}
|
||||
|
||||
// Check if sinceRv is older than 1 hour
|
||||
sinceRvTimestamp := snowflake.ID(sinceRv).Time()
|
||||
@@ -811,11 +823,11 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
|
||||
|
||||
if sinceRvAge > time.Hour {
|
||||
k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
|
||||
return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv)
|
||||
return latestEvent.ResourceVersion, k.listModifiedSinceDataStore(ctx, key, sinceRv)
|
||||
}
|
||||
|
||||
k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
|
||||
return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv)
|
||||
return latestEvent.ResourceVersion, k.listModifiedSinceEventStore(ctx, key, sinceRv)
|
||||
}
|
||||
|
||||
func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
|
||||
@@ -916,9 +928,9 @@ func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key N
|
||||
|
||||
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
|
||||
return func(yield func(*ModifiedResource, error) bool) {
|
||||
// store all events ordered by RV for the given tenant here
|
||||
eventKeys := make([]EventKey, 0)
|
||||
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod)) {
|
||||
// we only care about the latest revision of every resource in the list
|
||||
seen := make(map[string]struct{})
|
||||
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod), SortOrderDesc) {
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
@@ -938,18 +950,11 @@ func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key
|
||||
continue
|
||||
}
|
||||
|
||||
eventKeys = append(eventKeys, evtKey)
|
||||
}
|
||||
|
||||
// we only care about the latest revision of every resource in the list
|
||||
seen := make(map[string]struct{})
|
||||
for i := len(eventKeys) - 1; i >= 0; i -= 1 {
|
||||
evtKey := eventKeys[i]
|
||||
if _, ok := seen[evtKey.Name]; ok {
|
||||
continue
|
||||
}
|
||||
seen[evtKey.Name] = struct{}{}
|
||||
|
||||
seen[evtKey.Name] = struct{}{}
|
||||
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
@@ -1307,7 +1312,7 @@ func (b *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings
|
||||
if setting.RebuildCollection {
|
||||
for _, key := range setting.Collection {
|
||||
events := make([]string, 0)
|
||||
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1) {
|
||||
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1, SortOrderAsc) {
|
||||
if err != nil {
|
||||
b.log.Error("failed to list event: %s", err)
|
||||
return rsp
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||
@@ -99,6 +100,10 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
|
||||
}
|
||||
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if db.IsTestDbSQLite() {
|
||||
t.Skip("Skipping tests on sqlite until channel notifier is implemented")
|
||||
}
|
||||
|
||||
tc.fn(t, newBackend(context.Background()), opts.NSPrefix)
|
||||
})
|
||||
}
|
||||
@@ -550,7 +555,7 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
|
||||
Resource: "resource",
|
||||
}
|
||||
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated)
|
||||
require.Greater(t, latestRv, rvCreated)
|
||||
require.Equal(t, latestRv, rvDeleted)
|
||||
|
||||
counter := 0
|
||||
for res, err := range seq {
|
||||
@@ -624,11 +629,11 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
|
||||
rvCreated3, _ := writeEvent(ctx, backend, "bItem", resourcepb.WatchEvent_ADDED, WithNamespace(ns))
|
||||
|
||||
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated1-1)
|
||||
require.Greater(t, latestRv, rvCreated3)
|
||||
require.Equal(t, latestRv, rvCreated3)
|
||||
|
||||
counter := 0
|
||||
names := []string{"aItem", "bItem", "cItem"}
|
||||
rvs := []int64{rvCreated2, rvCreated3, rvCreated1}
|
||||
names := []string{"bItem", "aItem", "cItem"}
|
||||
rvs := []int64{rvCreated3, rvCreated2, rvCreated1}
|
||||
for res, err := range seq {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, key.Namespace, res.Key.Namespace)
|
||||
@@ -1166,7 +1171,7 @@ func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.S
|
||||
}))
|
||||
|
||||
server := newServer(t, backend)
|
||||
ns := nsPrefix + "-create-resource"
|
||||
ns := nsPrefix + "-create-rsrce" // create-resource
|
||||
ctx = request.WithNamespace(ctx, ns)
|
||||
|
||||
request := &resourcepb.CreateRequest{
|
||||
@@ -1607,7 +1612,7 @@ func (s *sliceBulkRequestIterator) RollbackRequested() bool {
|
||||
|
||||
func runTestIntegrationBackendOptimisticLocking(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
|
||||
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
|
||||
ns := nsPrefix + "-optimistic-locking"
|
||||
ns := nsPrefix + "-optimis-lock" // optimistic-locking. need to cut down on characters to not exceed namespace character limit (40)
|
||||
|
||||
t.Run("concurrent updates with same RV - only one succeeds", func(t *testing.T) {
|
||||
// Create initial resource with rv0 (no previous RV)
|
||||
|
||||
@@ -36,6 +36,10 @@ func NewTestSqlKvBackend(t *testing.T, ctx context.Context, withRvManager bool)
|
||||
KvStore: kv,
|
||||
}
|
||||
|
||||
if db.DriverName() == "sqlite3" {
|
||||
kvOpts.UseChannelNotifier = true
|
||||
}
|
||||
|
||||
if withRvManager {
|
||||
dialect := sqltemplate.DialectForDriver(db.DriverName())
|
||||
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{
|
||||
|
||||
@@ -30,7 +30,6 @@ func TestBadgerKVStorageBackend(t *testing.T) {
|
||||
SkipTests: map[string]bool{
|
||||
// TODO: fix these tests and remove this skip
|
||||
TestBlobSupport: true,
|
||||
TestListModifiedSince: true,
|
||||
// Badger does not support bulk import yet.
|
||||
TestGetResourceLastImportTime: true,
|
||||
},
|
||||
@@ -41,17 +40,8 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
|
||||
testutil.SkipIntegrationTestInShortMode(t)
|
||||
|
||||
skipTests := map[string]bool{
|
||||
TestWatchWriteEvents: true,
|
||||
TestList: true,
|
||||
TestBlobSupport: true,
|
||||
TestGetResourceStats: true,
|
||||
TestListHistory: true,
|
||||
TestListHistoryErrorReporting: true,
|
||||
TestListModifiedSince: true,
|
||||
TestListTrash: true,
|
||||
TestCreateNewResource: true,
|
||||
TestGetResourceLastImportTime: true,
|
||||
TestOptimisticLocking: true,
|
||||
}
|
||||
|
||||
t.Run("Without RvManager", func(t *testing.T) {
|
||||
@@ -59,7 +49,7 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
|
||||
backend, _ := NewTestSqlKvBackend(t, ctx, false)
|
||||
return backend
|
||||
}, &TestOptions{
|
||||
NSPrefix: "sqlkvstorage-test",
|
||||
NSPrefix: "sqlkvstoragetest",
|
||||
SkipTests: skipTests,
|
||||
})
|
||||
})
|
||||
@@ -69,7 +59,7 @@ func TestIntegrationSQLKVStorageBackend(t *testing.T) {
|
||||
backend, _ := NewTestSqlKvBackend(t, ctx, true)
|
||||
return backend
|
||||
}, &TestOptions{
|
||||
NSPrefix: "sqlkvstorage-withrvmanager-test",
|
||||
NSPrefix: "sqlkvstoragetest-rvmanager",
|
||||
SkipTests: skipTests,
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user