Compare commits

...

7 Commits

Author SHA1 Message Date
Will Assis
861af005e0 unified-storage: fix listModifiedSince in storage_backend.go and enable its tests for both badger and sqlkv 2026-01-14 19:24:15 -03:00
Will Assis
14a05137e1 fmt 2026-01-14 15:43:30 -03:00
Will Assis
cfe86378a1 dont run tests on sqlite (yet) 2026-01-14 15:41:51 -03:00
Will Assis
f7d7e09626 unified-storage: sqlkv enable more tests 2026-01-14 15:25:42 -03:00
Will Assis
ba416eab4e unified-storage: dont use polling notifier with sqlite in sqlkv (#116283)
* unified-storage: dont use polling notifier with sqlite in sqlkv
2026-01-14 18:22:39 +00:00
Alan Martin
189d50d815 UI: Use react-table column header types in InteractiveTable with story and tests (#116091)
* feat(InteractiveTable): allow custom header rendering

* docs(InteractiveTable): add story for custom header rendering

* test(InteractiveTable): add tests for custom header rendering

* docs(InteractiveTable): add custom header rendering documentation

* fix: test failure from non-a11y code
2026-01-14 17:59:03 +00:00
Mariell Hoversholm
450eaba447 test: skip integration test in short mode (#116280) 2026-01-14 18:33:55 +01:00
13 changed files with 280 additions and 75 deletions

View File

@@ -117,6 +117,44 @@ export const MyComponent = () => {
};
```
### Custom Header Rendering
Column headers can be customized using strings, React elements, or renderer functions. The `header` property accepts any value that matches React Table's `Renderer` type.
**Important:** When using custom header content, prefer inline elements (like `<span>`) over block elements (like `<div>`) to avoid layout issues. Block-level elements can cause extra spacing and alignment problems in table headers because they disrupt the table's inline flow. Use `display: inline-flex` or `display: inline-block` when you need flexbox or block-like behavior.
```tsx
const columns: Array<Column<TableData>> = [
// React element header
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" />
</>
),
cell: () => <Checkbox aria-label="Select row" />,
},
// Function renderer header
{
id: 'firstName',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" />
<span>First Name</span>
</span>
),
},
// String header
{ id: 'lastName', header: 'Last name' },
];
```
### Custom Cell Rendering
Individual cells can be rendered using custom content dy defining a `cell` property on the column definition.

View File

@@ -3,8 +3,11 @@ import { useCallback, useMemo, useState } from 'react';
import { CellProps } from 'react-table';
import { LinkButton } from '../Button/Button';
import { Checkbox } from '../Forms/Checkbox';
import { Field } from '../Forms/Field';
import { Icon } from '../Icon/Icon';
import { Input } from '../Input/Input';
import { Text } from '../Text/Text';
import { FetchDataArgs, InteractiveTable, InteractiveTableHeaderTooltip } from './InteractiveTable';
import mdx from './InteractiveTable.mdx';
@@ -297,4 +300,40 @@ export const WithControlledSort: StoryFn<typeof InteractiveTable> = (args) => {
return <InteractiveTable {...args} data={data} pageSize={15} fetchData={fetchData} />;
};
export const WithCustomHeader: TableStoryObj = {
args: {
columns: [
// React element header
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" />
</>
),
cell: () => <Checkbox aria-label="Select row" />,
},
// Function renderer header
{
id: 'firstName',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" />
<Text element="span">First Name</Text>
</span>
),
sortType: 'string',
},
// String header
{ id: 'lastName', header: 'Last name', sortType: 'string' },
{ id: 'car', header: 'Car', sortType: 'string' },
{ id: 'age', header: 'Age', sortType: 'number' },
],
data: pageableData.slice(0, 10),
getRowId: (r) => r.id,
},
};
export default meta;

View File

@@ -2,6 +2,9 @@ import { render, screen, within } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import * as React from 'react';
import { Checkbox } from '../Forms/Checkbox';
import { Icon } from '../Icon/Icon';
import { InteractiveTable } from './InteractiveTable';
import { Column } from './types';
@@ -247,4 +250,104 @@ describe('InteractiveTable', () => {
expect(fetchData).toHaveBeenCalledWith({ sortBy: [{ id: 'id', desc: false }] });
});
});
describe('custom header rendering', () => {
it('should render string headers', () => {
const columns: Array<Column<TableData>> = [{ id: 'id', header: 'ID' }];
const data: TableData[] = [{ id: '1', value: '1', country: 'Sweden' }];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByRole('columnheader', { name: 'ID' })).toBeInTheDocument();
});
it('should render React element headers', () => {
const columns: Array<Column<TableData>> = [
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" data-testid="header-checkbox" />
</>
),
cell: () => <Checkbox data-testid="cell-checkbox" aria-label="Select row" />,
},
];
const data: TableData[] = [{ id: '1', value: '1', country: 'Sweden' }];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByTestId('header-checkbox')).toBeInTheDocument();
expect(screen.getByTestId('cell-checkbox')).toBeInTheDocument();
expect(screen.getByLabelText('Select all rows')).toBeInTheDocument();
expect(screen.getByLabelText('Select row')).toBeInTheDocument();
expect(screen.getByText('Select all rows')).toBeInTheDocument();
});
it('should render function renderer headers', () => {
const columns: Array<Column<TableData>> = [
{
id: 'firstName',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" data-testid="header-icon" />
<span>First Name</span>
</span>
),
sortType: 'string',
},
];
const data: TableData[] = [{ id: '1', value: '1', country: 'Sweden' }];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByTestId('header-icon')).toBeInTheDocument();
expect(screen.getByRole('columnheader', { name: /first name/i })).toBeInTheDocument();
});
it('should render all header types together', () => {
const columns: Array<Column<TableData>> = [
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" data-testid="header-checkbox" />
</>
),
cell: () => <Checkbox aria-label="Select row" />,
},
{
id: 'id',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" data-testid="header-icon" />
<span>ID</span>
</span>
),
sortType: 'string',
},
{ id: 'country', header: 'Country', sortType: 'string' },
{ id: 'value', header: 'Value' },
];
const data: TableData[] = [
{ id: '1', value: 'Value 1', country: 'Sweden' },
{ id: '2', value: 'Value 2', country: 'Norway' },
];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByTestId('header-checkbox')).toBeInTheDocument();
expect(screen.getByTestId('header-icon')).toBeInTheDocument();
expect(screen.getByRole('columnheader', { name: 'Country' })).toBeInTheDocument();
expect(screen.getByRole('columnheader', { name: 'Value' })).toBeInTheDocument();
// Verify data is rendered
expect(screen.getByText('Sweden')).toBeInTheDocument();
expect(screen.getByText('Norway')).toBeInTheDocument();
expect(screen.getByText('Value 1')).toBeInTheDocument();
expect(screen.getByText('Value 2')).toBeInTheDocument();
});
});
});

View File

@@ -1,5 +1,5 @@
import { ReactNode } from 'react';
import { CellProps, DefaultSortTypes, IdType, SortByFn } from 'react-table';
import { CellProps, DefaultSortTypes, HeaderProps, IdType, Renderer, SortByFn } from 'react-table';
export interface Column<TableData extends object> {
/**
@@ -11,9 +11,9 @@ export interface Column<TableData extends object> {
*/
cell?: (props: CellProps<TableData>) => ReactNode;
/**
* Header name. if `undefined` the header will be empty. Useful for action columns.
* Header name. Can be a string, renderer function, or undefined. If `undefined` the header will be empty. Useful for action columns.
*/
header?: string;
header?: Renderer<HeaderProps<TableData>>;
/**
* Column sort type. If `undefined` the column will not be sortable.
* */

View File

@@ -184,9 +184,9 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
}
// ListSince returns a sequence of events since the given resource version.
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2[string, error] {
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[string, error] {
opts := ListOptions{
Sort: SortOrderAsc,
Sort: sortOrder,
StartKey: fmt.Sprintf("%d", sinceRV),
}
return func(yield func(string, error) bool) {
@@ -202,9 +202,9 @@ func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2
}
}
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV, sortOrder) {
if err != nil {
yield(Event{}, err)
return

View File

@@ -369,7 +369,7 @@ func testEventStoreListKeysSince(t *testing.T, ctx context.Context, store *event
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]string, 0, 2)
for eventKey, err := range store.ListKeysSince(ctx, 1500) {
for eventKey, err := range store.ListKeysSince(ctx, 1500, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
@@ -429,7 +429,7 @@ func testEventStoreListSince(t *testing.T, ctx context.Context, store *eventStor
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]Event, 0, 2)
for event, err := range store.ListSince(ctx, 1500) {
for event, err := range store.ListSince(ctx, 1500, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
@@ -453,7 +453,7 @@ func TestEventStore_ListSince_Empty(t *testing.T) {
func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) {
// List events when store is empty
retrievedEvents := make([]Event, 0)
for event, err := range store.ListSince(ctx, 0) {
for event, err := range store.ListSince(ctx, 0, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
@@ -825,7 +825,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 90 minutes ago using subtractDurationFromSnowflake
sinceRV := subtractDurationFromSnowflake(snowflakeFromTime(now), 90*time.Minute)
retrievedEvents := make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}
@@ -842,7 +842,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 30 minutes ago using subtractDurationFromSnowflake
sinceRV = subtractDurationFromSnowflake(snowflakeFromTime(now), 30*time.Minute)
retrievedEvents = make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) {
for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey)
}

View File

@@ -19,13 +19,18 @@ const (
defaultBufferSize = 10000
)
type notifier struct {
type notifier interface {
Watch(context.Context, watchOptions) <-chan Event
}
type pollingNotifier struct {
eventStore *eventStore
log logging.Logger
}
type notifierOptions struct {
log logging.Logger
log logging.Logger
useChannelNotifier bool
}
type watchOptions struct {
@@ -44,15 +49,26 @@ func defaultWatchOptions() watchOptions {
}
}
func newNotifier(eventStore *eventStore, opts notifierOptions) *notifier {
func newNotifier(eventStore *eventStore, opts notifierOptions) notifier {
if opts.log == nil {
opts.log = &logging.NoOpLogger{}
}
return &notifier{eventStore: eventStore, log: opts.log}
if opts.useChannelNotifier {
return &channelNotifier{}
}
return &pollingNotifier{eventStore: eventStore, log: opts.log}
}
type channelNotifier struct{}
func (cn *channelNotifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
return nil
}
// Return the last resource version from the event store
func (n *notifier) lastEventResourceVersion(ctx context.Context) (int64, error) {
func (n *pollingNotifier) lastEventResourceVersion(ctx context.Context) (int64, error) {
e, err := n.eventStore.LastEventKey(ctx)
if err != nil {
return 0, err
@@ -60,11 +76,11 @@ func (n *notifier) lastEventResourceVersion(ctx context.Context) (int64, error)
return e.ResourceVersion, nil
}
func (n *notifier) cacheKey(evt Event) string {
func (n *pollingNotifier) cacheKey(evt Event) string {
return fmt.Sprintf("%s~%s~%s~%s~%d", evt.Namespace, evt.Group, evt.Resource, evt.Name, evt.ResourceVersion)
}
func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
func (n *pollingNotifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
if opts.MinBackoff <= 0 {
opts.MinBackoff = defaultMinBackoff
}
@@ -103,7 +119,7 @@ func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
return
case <-time.After(currentInterval):
foundEvents := false
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod)) {
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod), SortOrderAsc) {
if err != nil {
n.log.Error("Failed to list events since", "error", err)
continue

View File

@@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/require"
)
func setupTestNotifier(t *testing.T) (*notifier, *eventStore) {
func setupTestNotifier(t *testing.T) (*pollingNotifier, *eventStore) {
db := setupTestBadgerDB(t)
t.Cleanup(func() {
err := db.Close()
@@ -22,10 +22,10 @@ func setupTestNotifier(t *testing.T) (*notifier, *eventStore) {
kv := NewBadgerKV(db)
eventStore := newEventStore(kv)
notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}})
return notifier, eventStore
return notifier.(*pollingNotifier), eventStore
}
func setupTestNotifierSqlKv(t *testing.T) (*notifier, *eventStore) {
func setupTestNotifierSqlKv(t *testing.T) (*pollingNotifier, *eventStore) {
dbstore := db.InitTestDB(t)
eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil)
require.NoError(t, err)
@@ -33,7 +33,7 @@ func setupTestNotifierSqlKv(t *testing.T) (*notifier, *eventStore) {
require.NoError(t, err)
eventStore := newEventStore(kv)
notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}})
return notifier, eventStore
return notifier.(*pollingNotifier), eventStore
}
func TestNewNotifier(t *testing.T) {
@@ -49,7 +49,7 @@ func TestDefaultWatchOptions(t *testing.T) {
assert.Equal(t, defaultBufferSize, opts.BufferSize)
}
func runNotifierTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) (*notifier, *eventStore), testFn func(*testing.T, context.Context, *notifier, *eventStore)) {
func runNotifierTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) (*pollingNotifier, *eventStore), testFn func(*testing.T, context.Context, *pollingNotifier, *eventStore)) {
t.Run(storeName, func(t *testing.T) {
ctx := context.Background()
notifier, eventStore := newStoreFn(t)
@@ -62,7 +62,7 @@ func TestNotifier_lastEventResourceVersion(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierLastEventResourceVersion)
}
func testNotifierLastEventResourceVersion(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierLastEventResourceVersion(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
// Test with no events
rv, err := notifier.lastEventResourceVersion(ctx)
assert.Error(t, err)
@@ -113,7 +113,7 @@ func TestNotifier_cachekey(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierCachekey)
}
func testNotifierCachekey(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierCachekey(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
tests := []struct {
name string
event Event
@@ -167,7 +167,7 @@ func TestNotifier_Watch_NoEvents(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchNoEvents)
}
func testNotifierWatchNoEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierWatchNoEvents(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
@@ -208,7 +208,7 @@ func TestNotifier_Watch_WithExistingEvents(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchWithExistingEvents)
}
func testNotifierWatchWithExistingEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierWatchWithExistingEvents(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
@@ -282,7 +282,7 @@ func TestNotifier_Watch_EventDeduplication(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchEventDeduplication)
}
func testNotifierWatchEventDeduplication(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierWatchEventDeduplication(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
@@ -348,7 +348,7 @@ func TestNotifier_Watch_ContextCancellation(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchContextCancellation)
}
func testNotifierWatchContextCancellation(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierWatchContextCancellation(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithCancel(ctx)
// Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound
@@ -394,7 +394,7 @@ func TestNotifier_Watch_MultipleEvents(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchMultipleEvents)
}
func testNotifierWatchMultipleEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) {
func testNotifierWatchMultipleEvents(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
rv := time.Now().UnixNano()

View File

@@ -61,7 +61,7 @@ type kvStorageBackend struct {
bulkLock *BulkLock
dataStore *dataStore
eventStore *eventStore
notifier *notifier
notifier notifier
builder DocumentBuilder
log logging.Logger
withPruner bool
@@ -91,6 +91,7 @@ type KVBackendOptions struct {
Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics
UseChannelNotifier bool
// Adding RvManager overrides the RV generated with snowflake in order to keep backwards compatibility with
// unified/sql
RvManager *rvmanager.ResourceVersionManager
@@ -121,7 +122,7 @@ func NewKVStorageBackend(opts KVBackendOptions) (KVBackend, error) {
bulkLock: NewBulkLock(),
dataStore: newDataStore(kv),
eventStore: eventStore,
notifier: newNotifier(eventStore, notifierOptions{}),
notifier: newNotifier(eventStore, notifierOptions{useChannelNotifier: opts.UseChannelNotifier}),
snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable
@@ -800,8 +801,20 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
}
}
// Generate a new resource version for the list
listRV := k.snowflake.Generate().Int64()
latestEvent, err := k.eventStore.LastEventKey(ctx)
if err != nil {
if errors.Is(err, ErrNotFound) {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
return 0, func(yield func(*ModifiedResource, error) bool) {
yield(nil, fmt.Errorf("error trying to retrieve last event key: %s", err))
}
}
if latestEvent.ResourceVersion == sinceRv {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
// Check if sinceRv is older than 1 hour
sinceRvTimestamp := snowflake.ID(sinceRv).Time()
@@ -810,11 +823,11 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
if sinceRvAge > time.Hour {
k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv)
return latestEvent.ResourceVersion, k.listModifiedSinceDataStore(ctx, key, sinceRv)
}
k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv)
return latestEvent.ResourceVersion, k.listModifiedSinceEventStore(ctx, key, sinceRv)
}
func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
@@ -915,9 +928,9 @@ func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key N
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
return func(yield func(*ModifiedResource, error) bool) {
// store all events ordered by RV for the given tenant here
eventKeys := make([]EventKey, 0)
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod)) {
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod), SortOrderDesc) {
if err != nil {
yield(&ModifiedResource{}, err)
return
@@ -937,18 +950,11 @@ func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key
continue
}
eventKeys = append(eventKeys, evtKey)
}
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for i := len(eventKeys) - 1; i >= 0; i -= 1 {
evtKey := eventKeys[i]
if _, ok := seen[evtKey.Name]; ok {
continue
}
seen[evtKey.Name] = struct{}{}
seen[evtKey.Name] = struct{}{}
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
if err != nil {
yield(&ModifiedResource{}, err)
@@ -1306,7 +1312,7 @@ func (b *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings
if setting.RebuildCollection {
for _, key := range setting.Collection {
events := make([]string, 0)
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1) {
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1, SortOrderAsc) {
if err != nil {
b.log.Error("failed to list event: %s", err)
return rsp

View File

@@ -99,6 +99,9 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
return nil, err
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
if opts.Cfg.EnableSQLKVBackend {
sqlkv, err := resource.NewSQLKV(eDB)
if err != nil {
@@ -106,9 +109,10 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
}
kvBackendOpts := resource.KVBackendOptions{
KvStore: sqlkv,
Tracer: opts.Tracer,
Reg: opts.Reg,
KvStore: sqlkv,
Tracer: opts.Tracer,
Reg: opts.Reg,
UseChannelNotifier: !isHA,
}
ctx := context.Background()
@@ -140,9 +144,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Backend = kvBackend
serverOptions.Diagnostics = kvBackend
} else {
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
backend, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: opts.Reg,

View File

@@ -23,6 +23,7 @@ import (
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
@@ -99,6 +100,10 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
}
t.Run(tc.name, func(t *testing.T) {
if db.IsTestDbSQLite() {
t.Skip("Skipping tests on sqlite until channel notifier is implemented")
}
tc.fn(t, newBackend(context.Background()), opts.NSPrefix)
})
}
@@ -550,7 +555,7 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
Resource: "resource",
}
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated)
require.Greater(t, latestRv, rvCreated)
require.Equal(t, latestRv, rvDeleted)
counter := 0
for res, err := range seq {
@@ -624,11 +629,11 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
rvCreated3, _ := writeEvent(ctx, backend, "bItem", resourcepb.WatchEvent_ADDED, WithNamespace(ns))
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated1-1)
require.Greater(t, latestRv, rvCreated3)
require.Equal(t, latestRv, rvCreated3)
counter := 0
names := []string{"aItem", "bItem", "cItem"}
rvs := []int64{rvCreated2, rvCreated3, rvCreated1}
names := []string{"bItem", "aItem", "cItem"}
rvs := []int64{rvCreated3, rvCreated2, rvCreated1}
for res, err := range seq {
require.NoError(t, err)
require.Equal(t, key.Namespace, res.Key.Namespace)
@@ -1166,7 +1171,7 @@ func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.S
}))
server := newServer(t, backend)
ns := nsPrefix + "-create-resource"
ns := nsPrefix + "-create-rsrce" // create-resource
ctx = request.WithNamespace(ctx, ns)
request := &resourcepb.CreateRequest{
@@ -1607,7 +1612,7 @@ func (s *sliceBulkRequestIterator) RollbackRequested() bool {
func runTestIntegrationBackendOptimisticLocking(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
ns := nsPrefix + "-optimistic-locking"
ns := nsPrefix + "-optimis-lock" // optimistic-locking. need to cut down on characters to not exceed namespace character limit (40)
t.Run("concurrent updates with same RV - only one succeeds", func(t *testing.T) {
// Create initial resource with rv0 (no previous RV)

View File

@@ -36,6 +36,10 @@ func NewTestSqlKvBackend(t *testing.T, ctx context.Context, withRvManager bool)
KvStore: kv,
}
if db.DriverName() == "sqlite3" {
kvOpts.UseChannelNotifier = true
}
if withRvManager {
dialect := sqltemplate.DialectForDriver(db.DriverName())
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{

View File

@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util/testutil"
)
func TestBadgerKVStorageBackend(t *testing.T) {
@@ -29,26 +30,18 @@ func TestBadgerKVStorageBackend(t *testing.T) {
SkipTests: map[string]bool{
// TODO: fix these tests and remove this skip
TestBlobSupport: true,
TestListModifiedSince: true,
// Badger does not support bulk import yet.
TestGetResourceLastImportTime: true,
},
})
}
func TestSQLKVStorageBackend(t *testing.T) {
func TestIntegrationSQLKVStorageBackend(t *testing.T) {
testutil.SkipIntegrationTestInShortMode(t)
skipTests := map[string]bool{
TestWatchWriteEvents: true,
TestList: true,
TestBlobSupport: true,
TestGetResourceStats: true,
TestListHistory: true,
TestListHistoryErrorReporting: true,
TestListModifiedSince: true,
TestListTrash: true,
TestCreateNewResource: true,
TestGetResourceLastImportTime: true,
TestOptimisticLocking: true,
}
t.Run("Without RvManager", func(t *testing.T) {
@@ -56,7 +49,7 @@ func TestSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, false)
return backend
}, &TestOptions{
NSPrefix: "sqlkvstorage-test",
NSPrefix: "sqlkvstoragetest",
SkipTests: skipTests,
})
})
@@ -66,7 +59,7 @@ func TestSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, true)
return backend
}, &TestOptions{
NSPrefix: "sqlkvstorage-withrvmanager-test",
NSPrefix: "sqlkvstoragetest-rvmanager",
SkipTests: skipTests,
})
})