Compare commits

..

7 Commits

Author SHA1 Message Date
Will Assis 861af005e0 unified-storage: fix listModifiedSince in storage_backend.go and enable its tests for both badger and sqlkv 2026-01-14 19:24:15 -03:00
Will Assis 14a05137e1 fmt 2026-01-14 15:43:30 -03:00
Will Assis cfe86378a1 dont run tests on sqlite (yet) 2026-01-14 15:41:51 -03:00
Will Assis f7d7e09626 unified-storage: sqlkv enable more tests 2026-01-14 15:25:42 -03:00
Will Assis ba416eab4e unified-storage: dont use polling notifier with sqlite in sqlkv (#116283)
* unified-storage: dont use polling notifier with sqlite in sqlkv
2026-01-14 18:22:39 +00:00
Alan Martin 189d50d815 UI: Use react-table column header types in InteractiveTable with story and tests (#116091)
* feat(InteractiveTable): allow custom header rendering

* docs(InteractiveTable): add story for custom header rendering

* test(InteractiveTable): add tests for custom header rendering

* docs(InteractiveTable): add custom header rendering documentation

* fix: test failure from non-a11y code
2026-01-14 17:59:03 +00:00
Mariell Hoversholm 450eaba447 test: skip integration test in short mode (#116280) 2026-01-14 18:33:55 +01:00
16 changed files with 308 additions and 224 deletions
@@ -117,6 +117,44 @@ export const MyComponent = () => {
}; };
``` ```
### Custom Header Rendering
Column headers can be customized using strings, React elements, or renderer functions. The `header` property accepts any value that matches React Table's `Renderer` type.
**Important:** When using custom header content, prefer inline elements (like `<span>`) over block elements (like `<div>`) to avoid layout issues. Block-level elements can cause extra spacing and alignment problems in table headers because they disrupt the table's inline flow. Use `display: inline-flex` or `display: inline-block` when you need flexbox or block-like behavior.
```tsx
const columns: Array<Column<TableData>> = [
// React element header
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" />
</>
),
cell: () => <Checkbox aria-label="Select row" />,
},
// Function renderer header
{
id: 'firstName',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" />
<span>First Name</span>
</span>
),
},
// String header
{ id: 'lastName', header: 'Last name' },
];
```
### Custom Cell Rendering ### Custom Cell Rendering
Individual cells can be rendered using custom content dy defining a `cell` property on the column definition. Individual cells can be rendered using custom content dy defining a `cell` property on the column definition.
@@ -3,8 +3,11 @@ import { useCallback, useMemo, useState } from 'react';
import { CellProps } from 'react-table'; import { CellProps } from 'react-table';
import { LinkButton } from '../Button/Button'; import { LinkButton } from '../Button/Button';
import { Checkbox } from '../Forms/Checkbox';
import { Field } from '../Forms/Field'; import { Field } from '../Forms/Field';
import { Icon } from '../Icon/Icon';
import { Input } from '../Input/Input'; import { Input } from '../Input/Input';
import { Text } from '../Text/Text';
import { FetchDataArgs, InteractiveTable, InteractiveTableHeaderTooltip } from './InteractiveTable'; import { FetchDataArgs, InteractiveTable, InteractiveTableHeaderTooltip } from './InteractiveTable';
import mdx from './InteractiveTable.mdx'; import mdx from './InteractiveTable.mdx';
@@ -297,4 +300,40 @@ export const WithControlledSort: StoryFn<typeof InteractiveTable> = (args) => {
return <InteractiveTable {...args} data={data} pageSize={15} fetchData={fetchData} />; return <InteractiveTable {...args} data={data} pageSize={15} fetchData={fetchData} />;
}; };
export const WithCustomHeader: TableStoryObj = {
args: {
columns: [
// React element header
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" />
</>
),
cell: () => <Checkbox aria-label="Select row" />,
},
// Function renderer header
{
id: 'firstName',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" />
<Text element="span">First Name</Text>
</span>
),
sortType: 'string',
},
// String header
{ id: 'lastName', header: 'Last name', sortType: 'string' },
{ id: 'car', header: 'Car', sortType: 'string' },
{ id: 'age', header: 'Age', sortType: 'number' },
],
data: pageableData.slice(0, 10),
getRowId: (r) => r.id,
},
};
export default meta; export default meta;
@@ -2,6 +2,9 @@ import { render, screen, within } from '@testing-library/react';
import userEvent from '@testing-library/user-event'; import userEvent from '@testing-library/user-event';
import * as React from 'react'; import * as React from 'react';
import { Checkbox } from '../Forms/Checkbox';
import { Icon } from '../Icon/Icon';
import { InteractiveTable } from './InteractiveTable'; import { InteractiveTable } from './InteractiveTable';
import { Column } from './types'; import { Column } from './types';
@@ -247,4 +250,104 @@ describe('InteractiveTable', () => {
expect(fetchData).toHaveBeenCalledWith({ sortBy: [{ id: 'id', desc: false }] }); expect(fetchData).toHaveBeenCalledWith({ sortBy: [{ id: 'id', desc: false }] });
}); });
}); });
describe('custom header rendering', () => {
it('should render string headers', () => {
const columns: Array<Column<TableData>> = [{ id: 'id', header: 'ID' }];
const data: TableData[] = [{ id: '1', value: '1', country: 'Sweden' }];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByRole('columnheader', { name: 'ID' })).toBeInTheDocument();
});
it('should render React element headers', () => {
const columns: Array<Column<TableData>> = [
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" data-testid="header-checkbox" />
</>
),
cell: () => <Checkbox data-testid="cell-checkbox" aria-label="Select row" />,
},
];
const data: TableData[] = [{ id: '1', value: '1', country: 'Sweden' }];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByTestId('header-checkbox')).toBeInTheDocument();
expect(screen.getByTestId('cell-checkbox')).toBeInTheDocument();
expect(screen.getByLabelText('Select all rows')).toBeInTheDocument();
expect(screen.getByLabelText('Select row')).toBeInTheDocument();
expect(screen.getByText('Select all rows')).toBeInTheDocument();
});
it('should render function renderer headers', () => {
const columns: Array<Column<TableData>> = [
{
id: 'firstName',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" data-testid="header-icon" />
<span>First Name</span>
</span>
),
sortType: 'string',
},
];
const data: TableData[] = [{ id: '1', value: '1', country: 'Sweden' }];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByTestId('header-icon')).toBeInTheDocument();
expect(screen.getByRole('columnheader', { name: /first name/i })).toBeInTheDocument();
});
it('should render all header types together', () => {
const columns: Array<Column<TableData>> = [
{
id: 'checkbox',
header: (
<>
<label htmlFor="select-all" className="sr-only">
Select all rows
</label>
<Checkbox id="select-all" data-testid="header-checkbox" />
</>
),
cell: () => <Checkbox aria-label="Select row" />,
},
{
id: 'id',
header: () => (
<span style={{ display: 'inline-flex', alignItems: 'center', gap: '8px' }}>
<Icon name="user" size="sm" data-testid="header-icon" />
<span>ID</span>
</span>
),
sortType: 'string',
},
{ id: 'country', header: 'Country', sortType: 'string' },
{ id: 'value', header: 'Value' },
];
const data: TableData[] = [
{ id: '1', value: 'Value 1', country: 'Sweden' },
{ id: '2', value: 'Value 2', country: 'Norway' },
];
render(<InteractiveTable columns={columns} data={data} getRowId={getRowId} />);
expect(screen.getByTestId('header-checkbox')).toBeInTheDocument();
expect(screen.getByTestId('header-icon')).toBeInTheDocument();
expect(screen.getByRole('columnheader', { name: 'Country' })).toBeInTheDocument();
expect(screen.getByRole('columnheader', { name: 'Value' })).toBeInTheDocument();
// Verify data is rendered
expect(screen.getByText('Sweden')).toBeInTheDocument();
expect(screen.getByText('Norway')).toBeInTheDocument();
expect(screen.getByText('Value 1')).toBeInTheDocument();
expect(screen.getByText('Value 2')).toBeInTheDocument();
});
});
}); });
@@ -1,5 +1,5 @@
import { ReactNode } from 'react'; import { ReactNode } from 'react';
import { CellProps, DefaultSortTypes, IdType, SortByFn } from 'react-table'; import { CellProps, DefaultSortTypes, HeaderProps, IdType, Renderer, SortByFn } from 'react-table';
export interface Column<TableData extends object> { export interface Column<TableData extends object> {
/** /**
@@ -11,9 +11,9 @@ export interface Column<TableData extends object> {
*/ */
cell?: (props: CellProps<TableData>) => ReactNode; cell?: (props: CellProps<TableData>) => ReactNode;
/** /**
* Header name. if `undefined` the header will be empty. Useful for action columns. * Header name. Can be a string, renderer function, or undefined. If `undefined` the header will be empty. Useful for action columns.
*/ */
header?: string; header?: Renderer<HeaderProps<TableData>>;
/** /**
* Column sort type. If `undefined` the column will not be sortable. * Column sort type. If `undefined` the column will not be sortable.
* */ * */
+4 -4
View File
@@ -184,9 +184,9 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
} }
// ListSince returns a sequence of events since the given resource version. // ListSince returns a sequence of events since the given resource version.
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2[string, error] { func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[string, error] {
opts := ListOptions{ opts := ListOptions{
Sort: SortOrderAsc, Sort: sortOrder,
StartKey: fmt.Sprintf("%d", sinceRV), StartKey: fmt.Sprintf("%d", sinceRV),
} }
return func(yield func(string, error) bool) { return func(yield func(string, error) bool) {
@@ -202,9 +202,9 @@ func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2
} }
} }
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] { func (n *eventStore) ListSince(ctx context.Context, sinceRV int64, sortOrder SortOrder) iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) { return func(yield func(Event, error) bool) {
for evtKey, err := range n.ListKeysSince(ctx, sinceRV) { for evtKey, err := range n.ListKeysSince(ctx, sinceRV, sortOrder) {
if err != nil { if err != nil {
yield(Event{}, err) yield(Event{}, err)
return return
@@ -369,7 +369,7 @@ func testEventStoreListKeysSince(t *testing.T, ctx context.Context, store *event
// List events since RV 1500 (should get events with RV 2000 and 3000) // List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]string, 0, 2) retrievedEvents := make([]string, 0, 2)
for eventKey, err := range store.ListKeysSince(ctx, 1500) { for eventKey, err := range store.ListKeysSince(ctx, 1500, SortOrderAsc) {
require.NoError(t, err) require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey) retrievedEvents = append(retrievedEvents, eventKey)
} }
@@ -429,7 +429,7 @@ func testEventStoreListSince(t *testing.T, ctx context.Context, store *eventStor
// List events since RV 1500 (should get events with RV 2000 and 3000) // List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]Event, 0, 2) retrievedEvents := make([]Event, 0, 2)
for event, err := range store.ListSince(ctx, 1500) { for event, err := range store.ListSince(ctx, 1500, SortOrderAsc) {
require.NoError(t, err) require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event) retrievedEvents = append(retrievedEvents, event)
} }
@@ -453,7 +453,7 @@ func TestEventStore_ListSince_Empty(t *testing.T) {
func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) { func testEventStoreListSinceEmpty(t *testing.T, ctx context.Context, store *eventStore) {
// List events when store is empty // List events when store is empty
retrievedEvents := make([]Event, 0) retrievedEvents := make([]Event, 0)
for event, err := range store.ListSince(ctx, 0) { for event, err := range store.ListSince(ctx, 0, SortOrderAsc) {
require.NoError(t, err) require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event) retrievedEvents = append(retrievedEvents, event)
} }
@@ -825,7 +825,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 90 minutes ago using subtractDurationFromSnowflake // List events since 90 minutes ago using subtractDurationFromSnowflake
sinceRV := subtractDurationFromSnowflake(snowflakeFromTime(now), 90*time.Minute) sinceRV := subtractDurationFromSnowflake(snowflakeFromTime(now), 90*time.Minute)
retrievedEvents := make([]string, 0) retrievedEvents := make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) { for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
require.NoError(t, err) require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey) retrievedEvents = append(retrievedEvents, eventKey)
} }
@@ -842,7 +842,7 @@ func testListKeysSinceWithSnowflakeTime(t *testing.T, ctx context.Context, store
// List events since 30 minutes ago using subtractDurationFromSnowflake // List events since 30 minutes ago using subtractDurationFromSnowflake
sinceRV = subtractDurationFromSnowflake(snowflakeFromTime(now), 30*time.Minute) sinceRV = subtractDurationFromSnowflake(snowflakeFromTime(now), 30*time.Minute)
retrievedEvents = make([]string, 0) retrievedEvents = make([]string, 0)
for eventKey, err := range store.ListKeysSince(ctx, sinceRV) { for eventKey, err := range store.ListKeysSince(ctx, sinceRV, SortOrderAsc) {
require.NoError(t, err) require.NoError(t, err)
retrievedEvents = append(retrievedEvents, eventKey) retrievedEvents = append(retrievedEvents, eventKey)
} }
+24 -8
View File
@@ -19,13 +19,18 @@ const (
defaultBufferSize = 10000 defaultBufferSize = 10000
) )
type notifier struct { type notifier interface {
Watch(context.Context, watchOptions) <-chan Event
}
type pollingNotifier struct {
eventStore *eventStore eventStore *eventStore
log logging.Logger log logging.Logger
} }
type notifierOptions struct { type notifierOptions struct {
log logging.Logger log logging.Logger
useChannelNotifier bool
} }
type watchOptions struct { type watchOptions struct {
@@ -44,15 +49,26 @@ func defaultWatchOptions() watchOptions {
} }
} }
func newNotifier(eventStore *eventStore, opts notifierOptions) *notifier { func newNotifier(eventStore *eventStore, opts notifierOptions) notifier {
if opts.log == nil { if opts.log == nil {
opts.log = &logging.NoOpLogger{} opts.log = &logging.NoOpLogger{}
} }
return &notifier{eventStore: eventStore, log: opts.log}
if opts.useChannelNotifier {
return &channelNotifier{}
}
return &pollingNotifier{eventStore: eventStore, log: opts.log}
}
type channelNotifier struct{}
func (cn *channelNotifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
return nil
} }
// Return the last resource version from the event store // Return the last resource version from the event store
func (n *notifier) lastEventResourceVersion(ctx context.Context) (int64, error) { func (n *pollingNotifier) lastEventResourceVersion(ctx context.Context) (int64, error) {
e, err := n.eventStore.LastEventKey(ctx) e, err := n.eventStore.LastEventKey(ctx)
if err != nil { if err != nil {
return 0, err return 0, err
@@ -60,11 +76,11 @@ func (n *notifier) lastEventResourceVersion(ctx context.Context) (int64, error)
return e.ResourceVersion, nil return e.ResourceVersion, nil
} }
func (n *notifier) cacheKey(evt Event) string { func (n *pollingNotifier) cacheKey(evt Event) string {
return fmt.Sprintf("%s~%s~%s~%s~%d", evt.Namespace, evt.Group, evt.Resource, evt.Name, evt.ResourceVersion) return fmt.Sprintf("%s~%s~%s~%s~%d", evt.Namespace, evt.Group, evt.Resource, evt.Name, evt.ResourceVersion)
} }
func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event { func (n *pollingNotifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
if opts.MinBackoff <= 0 { if opts.MinBackoff <= 0 {
opts.MinBackoff = defaultMinBackoff opts.MinBackoff = defaultMinBackoff
} }
@@ -103,7 +119,7 @@ func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
return return
case <-time.After(currentInterval): case <-time.After(currentInterval):
foundEvents := false foundEvents := false
for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod)) { for evt, err := range n.eventStore.ListSince(ctx, subtractDurationFromSnowflake(lastRV, opts.LookbackPeriod), SortOrderAsc) {
if err != nil { if err != nil {
n.log.Error("Failed to list events since", "error", err) n.log.Error("Failed to list events since", "error", err)
continue continue
+12 -12
View File
@@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func setupTestNotifier(t *testing.T) (*notifier, *eventStore) { func setupTestNotifier(t *testing.T) (*pollingNotifier, *eventStore) {
db := setupTestBadgerDB(t) db := setupTestBadgerDB(t)
t.Cleanup(func() { t.Cleanup(func() {
err := db.Close() err := db.Close()
@@ -22,10 +22,10 @@ func setupTestNotifier(t *testing.T) (*notifier, *eventStore) {
kv := NewBadgerKV(db) kv := NewBadgerKV(db)
eventStore := newEventStore(kv) eventStore := newEventStore(kv)
notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}}) notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}})
return notifier, eventStore return notifier.(*pollingNotifier), eventStore
} }
func setupTestNotifierSqlKv(t *testing.T) (*notifier, *eventStore) { func setupTestNotifierSqlKv(t *testing.T) (*pollingNotifier, *eventStore) {
dbstore := db.InitTestDB(t) dbstore := db.InitTestDB(t)
eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil) eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil)
require.NoError(t, err) require.NoError(t, err)
@@ -33,7 +33,7 @@ func setupTestNotifierSqlKv(t *testing.T) (*notifier, *eventStore) {
require.NoError(t, err) require.NoError(t, err)
eventStore := newEventStore(kv) eventStore := newEventStore(kv)
notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}}) notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}})
return notifier, eventStore return notifier.(*pollingNotifier), eventStore
} }
func TestNewNotifier(t *testing.T) { func TestNewNotifier(t *testing.T) {
@@ -49,7 +49,7 @@ func TestDefaultWatchOptions(t *testing.T) {
assert.Equal(t, defaultBufferSize, opts.BufferSize) assert.Equal(t, defaultBufferSize, opts.BufferSize)
} }
func runNotifierTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) (*notifier, *eventStore), testFn func(*testing.T, context.Context, *notifier, *eventStore)) { func runNotifierTestWith(t *testing.T, storeName string, newStoreFn func(*testing.T) (*pollingNotifier, *eventStore), testFn func(*testing.T, context.Context, *pollingNotifier, *eventStore)) {
t.Run(storeName, func(t *testing.T) { t.Run(storeName, func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
notifier, eventStore := newStoreFn(t) notifier, eventStore := newStoreFn(t)
@@ -62,7 +62,7 @@ func TestNotifier_lastEventResourceVersion(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierLastEventResourceVersion) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierLastEventResourceVersion)
} }
func testNotifierLastEventResourceVersion(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierLastEventResourceVersion(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
// Test with no events // Test with no events
rv, err := notifier.lastEventResourceVersion(ctx) rv, err := notifier.lastEventResourceVersion(ctx)
assert.Error(t, err) assert.Error(t, err)
@@ -113,7 +113,7 @@ func TestNotifier_cachekey(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierCachekey) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierCachekey)
} }
func testNotifierCachekey(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierCachekey(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
tests := []struct { tests := []struct {
name string name string
event Event event Event
@@ -167,7 +167,7 @@ func TestNotifier_Watch_NoEvents(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchNoEvents) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchNoEvents)
} }
func testNotifierWatchNoEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierWatchNoEvents(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel() defer cancel()
@@ -208,7 +208,7 @@ func TestNotifier_Watch_WithExistingEvents(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchWithExistingEvents) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchWithExistingEvents)
} }
func testNotifierWatchWithExistingEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierWatchWithExistingEvents(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second) ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel() defer cancel()
@@ -282,7 +282,7 @@ func TestNotifier_Watch_EventDeduplication(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchEventDeduplication) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchEventDeduplication)
} }
func testNotifierWatchEventDeduplication(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierWatchEventDeduplication(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second) ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel() defer cancel()
@@ -348,7 +348,7 @@ func TestNotifier_Watch_ContextCancellation(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchContextCancellation) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchContextCancellation)
} }
func testNotifierWatchContextCancellation(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierWatchContextCancellation(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
// Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound // Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound
@@ -394,7 +394,7 @@ func TestNotifier_Watch_MultipleEvents(t *testing.T) {
runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchMultipleEvents) runNotifierTestWith(t, "sqlkv", setupTestNotifierSqlKv, testNotifierWatchMultipleEvents)
} }
func testNotifierWatchMultipleEvents(t *testing.T, ctx context.Context, notifier *notifier, eventStore *eventStore) { func testNotifierWatchMultipleEvents(t *testing.T, ctx context.Context, notifier *pollingNotifier, eventStore *eventStore) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second) ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel() defer cancel()
rv := time.Now().UnixNano() rv := time.Now().UnixNano()
+24 -18
View File
@@ -61,7 +61,7 @@ type kvStorageBackend struct {
bulkLock *BulkLock bulkLock *BulkLock
dataStore *dataStore dataStore *dataStore
eventStore *eventStore eventStore *eventStore
notifier *notifier notifier notifier
builder DocumentBuilder builder DocumentBuilder
log logging.Logger log logging.Logger
withPruner bool withPruner bool
@@ -91,6 +91,7 @@ type KVBackendOptions struct {
Tracer trace.Tracer // TODO add tracing Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics Reg prometheus.Registerer // TODO add metrics
UseChannelNotifier bool
// Adding RvManager overrides the RV generated with snowflake in order to keep backwards compatibility with // Adding RvManager overrides the RV generated with snowflake in order to keep backwards compatibility with
// unified/sql // unified/sql
RvManager *rvmanager.ResourceVersionManager RvManager *rvmanager.ResourceVersionManager
@@ -121,7 +122,7 @@ func NewKVStorageBackend(opts KVBackendOptions) (KVBackend, error) {
bulkLock: NewBulkLock(), bulkLock: NewBulkLock(),
dataStore: newDataStore(kv), dataStore: newDataStore(kv),
eventStore: eventStore, eventStore: eventStore,
notifier: newNotifier(eventStore, notifierOptions{}), notifier: newNotifier(eventStore, notifierOptions{useChannelNotifier: opts.UseChannelNotifier}),
snowflake: s, snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder. builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable log: &logging.NoOpLogger{}, // Make this configurable
@@ -800,8 +801,20 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
} }
} }
// Generate a new resource version for the list latestEvent, err := k.eventStore.LastEventKey(ctx)
listRV := k.snowflake.Generate().Int64() if err != nil {
if errors.Is(err, ErrNotFound) {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
return 0, func(yield func(*ModifiedResource, error) bool) {
yield(nil, fmt.Errorf("error trying to retrieve last event key: %s", err))
}
}
if latestEvent.ResourceVersion == sinceRv {
return sinceRv, func(yield func(*ModifiedResource, error) bool) { /* nothing to return */ }
}
// Check if sinceRv is older than 1 hour // Check if sinceRv is older than 1 hour
sinceRvTimestamp := snowflake.ID(sinceRv).Time() sinceRvTimestamp := snowflake.ID(sinceRv).Time()
@@ -810,11 +823,11 @@ func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key Namespaced
if sinceRvAge > time.Hour { if sinceRvAge > time.Hour {
k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge) k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv) return latestEvent.ResourceVersion, k.listModifiedSinceDataStore(ctx, key, sinceRv)
} }
k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge) k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv) return latestEvent.ResourceVersion, k.listModifiedSinceEventStore(ctx, key, sinceRv)
} }
func convertEventType(action DataAction) resourcepb.WatchEvent_Type { func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
@@ -915,9 +928,9 @@ func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key N
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] { func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
return func(yield func(*ModifiedResource, error) bool) { return func(yield func(*ModifiedResource, error) bool) {
// store all events ordered by RV for the given tenant here // we only care about the latest revision of every resource in the list
eventKeys := make([]EventKey, 0) seen := make(map[string]struct{})
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod)) { for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, subtractDurationFromSnowflake(sinceRv, defaultLookbackPeriod), SortOrderDesc) {
if err != nil { if err != nil {
yield(&ModifiedResource{}, err) yield(&ModifiedResource{}, err)
return return
@@ -937,18 +950,11 @@ func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key
continue continue
} }
eventKeys = append(eventKeys, evtKey)
}
// we only care about the latest revision of every resource in the list
seen := make(map[string]struct{})
for i := len(eventKeys) - 1; i >= 0; i -= 1 {
evtKey := eventKeys[i]
if _, ok := seen[evtKey.Name]; ok { if _, ok := seen[evtKey.Name]; ok {
continue continue
} }
seen[evtKey.Name] = struct{}{}
seen[evtKey.Name] = struct{}{}
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey)) value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
if err != nil { if err != nil {
yield(&ModifiedResource{}, err) yield(&ModifiedResource{}, err)
@@ -1306,7 +1312,7 @@ func (b *kvStorageBackend) ProcessBulk(ctx context.Context, setting BulkSettings
if setting.RebuildCollection { if setting.RebuildCollection {
for _, key := range setting.Collection { for _, key := range setting.Collection {
events := make([]string, 0) events := make([]string, 0)
for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1) { for evtKeyStr, err := range b.eventStore.ListKeysSince(ctx, 1, SortOrderAsc) {
if err != nil { if err != nil {
b.log.Error("failed to list event: %s", err) b.log.Error("failed to list event: %s", err)
return rsp return rsp
+7 -6
View File
@@ -99,6 +99,9 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
return nil, err return nil, err
} }
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
if opts.Cfg.EnableSQLKVBackend { if opts.Cfg.EnableSQLKVBackend {
sqlkv, err := resource.NewSQLKV(eDB) sqlkv, err := resource.NewSQLKV(eDB)
if err != nil { if err != nil {
@@ -106,9 +109,10 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
} }
kvBackendOpts := resource.KVBackendOptions{ kvBackendOpts := resource.KVBackendOptions{
KvStore: sqlkv, KvStore: sqlkv,
Tracer: opts.Tracer, Tracer: opts.Tracer,
Reg: opts.Reg, Reg: opts.Reg,
UseChannelNotifier: !isHA,
} }
ctx := context.Background() ctx := context.Background()
@@ -140,9 +144,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Backend = kvBackend serverOptions.Backend = kvBackend
serverOptions.Diagnostics = kvBackend serverOptions.Diagnostics = kvBackend
} else { } else {
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
backend, err := NewBackend(BackendOptions{ backend, err := NewBackend(BackendOptions{
DBProvider: eDB, DBProvider: eDB,
Reg: opts.Reg, Reg: opts.Reg,
+11 -6
View File
@@ -23,6 +23,7 @@ import (
"github.com/grafana/authlib/types" "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db" sqldb "github.com/grafana/grafana/pkg/storage/unified/sql/db"
@@ -99,6 +100,10 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
} }
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
if db.IsTestDbSQLite() {
t.Skip("Skipping tests on sqlite until channel notifier is implemented")
}
tc.fn(t, newBackend(context.Background()), opts.NSPrefix) tc.fn(t, newBackend(context.Background()), opts.NSPrefix)
}) })
} }
@@ -550,7 +555,7 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
Resource: "resource", Resource: "resource",
} }
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated) latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated)
require.Greater(t, latestRv, rvCreated) require.Equal(t, latestRv, rvDeleted)
counter := 0 counter := 0
for res, err := range seq { for res, err := range seq {
@@ -624,11 +629,11 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
rvCreated3, _ := writeEvent(ctx, backend, "bItem", resourcepb.WatchEvent_ADDED, WithNamespace(ns)) rvCreated3, _ := writeEvent(ctx, backend, "bItem", resourcepb.WatchEvent_ADDED, WithNamespace(ns))
latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated1-1) latestRv, seq := backend.ListModifiedSince(ctx, key, rvCreated1-1)
require.Greater(t, latestRv, rvCreated3) require.Equal(t, latestRv, rvCreated3)
counter := 0 counter := 0
names := []string{"aItem", "bItem", "cItem"} names := []string{"bItem", "aItem", "cItem"}
rvs := []int64{rvCreated2, rvCreated3, rvCreated1} rvs := []int64{rvCreated3, rvCreated2, rvCreated1}
for res, err := range seq { for res, err := range seq {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, key.Namespace, res.Key.Namespace) require.Equal(t, key.Namespace, res.Key.Namespace)
@@ -1166,7 +1171,7 @@ func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.S
})) }))
server := newServer(t, backend) server := newServer(t, backend)
ns := nsPrefix + "-create-resource" ns := nsPrefix + "-create-rsrce" // create-resource
ctx = request.WithNamespace(ctx, ns) ctx = request.WithNamespace(ctx, ns)
request := &resourcepb.CreateRequest{ request := &resourcepb.CreateRequest{
@@ -1607,7 +1612,7 @@ func (s *sliceBulkRequestIterator) RollbackRequested() bool {
func runTestIntegrationBackendOptimisticLocking(t *testing.T, backend resource.StorageBackend, nsPrefix string) { func runTestIntegrationBackendOptimisticLocking(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
ns := nsPrefix + "-optimistic-locking" ns := nsPrefix + "-optimis-lock" // optimistic-locking. need to cut down on characters to not exceed namespace character limit (40)
t.Run("concurrent updates with same RV - only one succeeds", func(t *testing.T) { t.Run("concurrent updates with same RV - only one succeeds", func(t *testing.T) {
// Create initial resource with rv0 (no previous RV) // Create initial resource with rv0 (no previous RV)
@@ -36,6 +36,10 @@ func NewTestSqlKvBackend(t *testing.T, ctx context.Context, withRvManager bool)
KvStore: kv, KvStore: kv,
} }
if db.DriverName() == "sqlite3" {
kvOpts.UseChannelNotifier = true
}
if withRvManager { if withRvManager {
dialect := sqltemplate.DialectForDriver(db.DriverName()) dialect := sqltemplate.DialectForDriver(db.DriverName())
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{ rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util/testutil"
) )
func TestBadgerKVStorageBackend(t *testing.T) { func TestBadgerKVStorageBackend(t *testing.T) {
@@ -29,26 +30,18 @@ func TestBadgerKVStorageBackend(t *testing.T) {
SkipTests: map[string]bool{ SkipTests: map[string]bool{
// TODO: fix these tests and remove this skip // TODO: fix these tests and remove this skip
TestBlobSupport: true, TestBlobSupport: true,
TestListModifiedSince: true,
// Badger does not support bulk import yet. // Badger does not support bulk import yet.
TestGetResourceLastImportTime: true, TestGetResourceLastImportTime: true,
}, },
}) })
} }
func TestSQLKVStorageBackend(t *testing.T) { func TestIntegrationSQLKVStorageBackend(t *testing.T) {
testutil.SkipIntegrationTestInShortMode(t)
skipTests := map[string]bool{ skipTests := map[string]bool{
TestWatchWriteEvents: true,
TestList: true,
TestBlobSupport: true, TestBlobSupport: true,
TestGetResourceStats: true,
TestListHistory: true,
TestListHistoryErrorReporting: true,
TestListModifiedSince: true,
TestListTrash: true,
TestCreateNewResource: true,
TestGetResourceLastImportTime: true, TestGetResourceLastImportTime: true,
TestOptimisticLocking: true,
} }
t.Run("Without RvManager", func(t *testing.T) { t.Run("Without RvManager", func(t *testing.T) {
@@ -56,7 +49,7 @@ func TestSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, false) backend, _ := NewTestSqlKvBackend(t, ctx, false)
return backend return backend
}, &TestOptions{ }, &TestOptions{
NSPrefix: "sqlkvstorage-test", NSPrefix: "sqlkvstoragetest",
SkipTests: skipTests, SkipTests: skipTests,
}) })
}) })
@@ -66,7 +59,7 @@ func TestSQLKVStorageBackend(t *testing.T) {
backend, _ := NewTestSqlKvBackend(t, ctx, true) backend, _ := NewTestSqlKvBackend(t, ctx, true)
return backend return backend
}, &TestOptions{ }, &TestOptions{
NSPrefix: "sqlkvstorage-withrvmanager-test", NSPrefix: "sqlkvstoragetest-rvmanager",
SkipTests: skipTests, SkipTests: skipTests,
}) })
}) })
+2 -2
View File
@@ -39,7 +39,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
_, span := tracing.DefaultTracer().Start(ctx, "graphite healthcheck") _, span := tracing.DefaultTracer().Start(ctx, "graphite healthcheck")
defer span.End() defer span.End()
graphiteReq, formData, _, _, err := s.createGraphiteRequest(ctx, healthCheckQuery, dsInfo) graphiteReq, formData, _, err := s.createGraphiteRequest(ctx, healthCheckQuery, dsInfo)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
@@ -81,7 +81,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
} }
}() }()
_, err = s.toDataFrames(res, healthCheckQuery.RefID, false, targetStr) _, err = s.toDataFrames(res, healthCheckQuery.RefID, false)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
+15 -23
View File
@@ -24,16 +24,15 @@ import (
func (s *Service) RunQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo) (*backend.QueryDataResponse, error) { func (s *Service) RunQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo) (*backend.QueryDataResponse, error) {
emptyQueries := []string{} emptyQueries := []string{}
graphiteQueries := map[string]struct { graphiteQueries := map[string]struct {
req *http.Request req *http.Request
formData url.Values formData url.Values
rawTarget string
}{} }{}
// FromAlert header is defined in pkg/services/ngalert/models/constants.go // FromAlert header is defined in pkg/services/ngalert/models/constants.go
fromAlert := req.Headers["FromAlert"] == "true" fromAlert := req.Headers["FromAlert"] == "true"
result := backend.NewQueryDataResponse() result := backend.NewQueryDataResponse()
for _, query := range req.Queries { for _, query := range req.Queries {
graphiteReq, formData, emptyQuery, target, err := s.createGraphiteRequest(ctx, query, dsInfo) graphiteReq, formData, emptyQuery, err := s.createGraphiteRequest(ctx, query, dsInfo)
if err != nil { if err != nil {
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(err) result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(err)
return result, nil return result, nil
@@ -45,13 +44,11 @@ func (s *Service) RunQuery(ctx context.Context, req *backend.QueryDataRequest, d
} }
graphiteQueries[query.RefID] = struct { graphiteQueries[query.RefID] = struct {
req *http.Request req *http.Request
formData url.Values formData url.Values
rawTarget string
}{ }{
req: graphiteReq, req: graphiteReq,
formData: formData, formData: formData,
rawTarget: target,
} }
} }
@@ -102,7 +99,7 @@ func (s *Service) RunQuery(ctx context.Context, req *backend.QueryDataRequest, d
} }
}() }()
queryFrames, err := s.toDataFrames(res, refId, fromAlert, graphiteReq.rawTarget) queryFrames, err := s.toDataFrames(res, refId, fromAlert)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
@@ -150,7 +147,7 @@ func (s *Service) processQuery(query backend.DataQuery) (string, *GraphiteQuery,
return target, nil, queryJSON.IsMetricTank, nil return target, nil, queryJSON.IsMetricTank, nil
} }
func (s *Service) createGraphiteRequest(ctx context.Context, query backend.DataQuery, dsInfo *datasourceInfo) (*http.Request, url.Values, *GraphiteQuery, string, error) { func (s *Service) createGraphiteRequest(ctx context.Context, query backend.DataQuery, dsInfo *datasourceInfo) (*http.Request, url.Values, *GraphiteQuery, error) {
/* /*
graphite doc about from and until, with sdk we are getting absolute instead of relative time graphite doc about from and until, with sdk we are getting absolute instead of relative time
https://graphite-api.readthedocs.io/en/latest/api.html#from-until https://graphite-api.readthedocs.io/en/latest/api.html#from-until
@@ -166,12 +163,12 @@ func (s *Service) createGraphiteRequest(ctx context.Context, query backend.DataQ
target, emptyQuery, isMetricTank, err := s.processQuery(query) target, emptyQuery, isMetricTank, err := s.processQuery(query)
if err != nil { if err != nil {
return nil, formData, nil, "", err return nil, formData, nil, err
} }
if emptyQuery != nil { if emptyQuery != nil {
s.logger.Debug("Graphite", "empty query target", emptyQuery) s.logger.Debug("Graphite", "empty query target", emptyQuery)
return nil, formData, emptyQuery, "", nil return nil, formData, emptyQuery, nil
} }
formData["target"] = []string{target} formData["target"] = []string{target}
@@ -191,23 +188,20 @@ func (s *Service) createGraphiteRequest(ctx context.Context, query backend.DataQ
QueryParams: params, QueryParams: params,
}) })
if err != nil { if err != nil {
return nil, formData, nil, "", err return nil, formData, nil, err
} }
return graphiteReq, formData, emptyQuery, target, nil return graphiteReq, formData, emptyQuery, nil
} }
func (s *Service) toDataFrames(response *http.Response, refId string, fromAlert bool, rawTarget string) (frames data.Frames, error error) { func (s *Service) toDataFrames(response *http.Response, refId string, fromAlert bool) (frames data.Frames, error error) {
responseData, err := s.parseResponse(response) responseData, err := s.parseResponse(response)
if err != nil { if err != nil {
return nil, err return nil, err
} }
aliasRegex := regexp.MustCompile(`(alias\(|aliasByMetric|aliasByNode|aliasByTags|aliasQuery|aliasSub)`)
frames = data.Frames{} frames = data.Frames{}
for _, series := range responseData { for _, series := range responseData {
aliasMatch := aliasRegex.MatchString(rawTarget)
timeVector := make([]time.Time, 0, len(series.DataPoints)) timeVector := make([]time.Time, 0, len(series.DataPoints))
values := make([]*float64, 0, len(series.DataPoints)) values := make([]*float64, 0, len(series.DataPoints))
@@ -223,9 +217,7 @@ func (s *Service) toDataFrames(response *http.Response, refId string, fromAlert
tags := make(map[string]string) tags := make(map[string]string)
for name, value := range series.Tags { for name, value := range series.Tags {
if name == "name" { if name == "name" {
// Queries with aliases should use the target as the name if fromAlert {
// to ensure multi-dimensional queries are distinguishable from each other
if fromAlert || aliasMatch {
value = series.Target value = series.Target
} }
} }
+11 -124
View File
@@ -182,7 +182,7 @@ func TestConvertResponses(t *testing.T) {
expectedFrames := data.Frames{expectedFrame} expectedFrames := data.Frames{expectedFrame}
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
dataFrames, err := service.toDataFrames(httpResponse, refId, false, "target") dataFrames, err := service.toDataFrames(httpResponse, refId, false)
require.NoError(t, err) require.NoError(t, err)
if !reflect.DeepEqual(expectedFrames, dataFrames) { if !reflect.DeepEqual(expectedFrames, dataFrames) {
@@ -196,7 +196,7 @@ func TestConvertResponses(t *testing.T) {
body := ` body := `
[ [
{ {
"target": "alias(target)", "target": "aliasedTarget(target)",
"tags": { "name": "target", "fooTag": "fooValue", "barTag": "barValue", "int": 100, "float": 3.14 }, "tags": { "name": "target", "fooTag": "fooValue", "barTag": "barValue", "int": 100, "float": 3.14 },
"datapoints": [[50, 1], [null, 2], [100, 3]] "datapoints": [[50, 1], [null, 2], [100, 3]]
} }
@@ -211,13 +211,13 @@ func TestConvertResponses(t *testing.T) {
"barTag": "barValue", "barTag": "barValue",
"int": "100", "int": "100",
"float": "3.14", "float": "3.14",
"name": "alias(target)", "name": "target",
}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "alias(target)"}), }, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "aliasedTarget(target)"}),
).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti}) ).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})
expectedFrames := data.Frames{expectedFrame} expectedFrames := data.Frames{expectedFrame}
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
dataFrames, err := service.toDataFrames(httpResponse, refId, false, "alias(target)") dataFrames, err := service.toDataFrames(httpResponse, refId, false)
require.NoError(t, err) require.NoError(t, err)
if !reflect.DeepEqual(expectedFrames, dataFrames) { if !reflect.DeepEqual(expectedFrames, dataFrames) {
@@ -240,7 +240,7 @@ func TestConvertResponses(t *testing.T) {
expectedFrames := data.Frames{} expectedFrames := data.Frames{}
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
dataFrames, err := service.toDataFrames(httpResponse, refId, false, "") dataFrames, err := service.toDataFrames(httpResponse, refId, false)
require.NoError(t, err) require.NoError(t, err)
if !reflect.DeepEqual(expectedFrames, dataFrames) { if !reflect.DeepEqual(expectedFrames, dataFrames) {
@@ -281,7 +281,7 @@ func TestConvertResponses(t *testing.T) {
expectedFrames := data.Frames{expectedFrame} expectedFrames := data.Frames{expectedFrame}
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
dataFrames, err := service.toDataFrames(httpResponse, refId, false, "target") dataFrames, err := service.toDataFrames(httpResponse, refId, false)
require.NoError(t, err) require.NoError(t, err)
if !reflect.DeepEqual(expectedFrames, dataFrames) { if !reflect.DeepEqual(expectedFrames, dataFrames) {
@@ -295,7 +295,7 @@ func TestConvertResponses(t *testing.T) {
body := ` body := `
[ [
{ {
"target": "alias(target)", "target": "aliasedTarget(target)",
"tags": { "name": "target", "fooTag": "fooValue", "barTag": "barValue", "int": 100, "float": 3.14 }, "tags": { "name": "target", "fooTag": "fooValue", "barTag": "barValue", "int": 100, "float": 3.14 },
"datapoints": [[50, 1], [null, 2], [100, 3]] "datapoints": [[50, 1], [null, 2], [100, 3]]
} }
@@ -310,13 +310,13 @@ func TestConvertResponses(t *testing.T) {
"barTag": "barValue", "barTag": "barValue",
"int": "100", "int": "100",
"float": "3.14", "float": "3.14",
"name": "alias(target)", "name": "aliasedTarget(target)",
}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "alias(target)"}), }, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "aliasedTarget(target)"}),
).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti}) ).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})
expectedFrames := data.Frames{expectedFrame} expectedFrames := data.Frames{expectedFrame}
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
dataFrames, err := service.toDataFrames(httpResponse, refId, true, "alias(target)") dataFrames, err := service.toDataFrames(httpResponse, refId, true)
require.NoError(t, err) require.NoError(t, err)
if !reflect.DeepEqual(expectedFrames, dataFrames) { if !reflect.DeepEqual(expectedFrames, dataFrames) {
@@ -738,119 +738,6 @@ Error: Target not found
} }
} }
func TestAliasMatching(t *testing.T) {
service := &Service{
logger: backend.Logger,
}
testCases := []struct {
name string
target string
tagsName string
fromAlert bool
expectedLabelName string
}{
{
name: "alias() function sets name tag to target",
target: "alias(stats.counters.web.hits, 'Web Hits')",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "alias(stats.counters.web.hits, 'Web Hits')",
},
{
name: "aliasByNode() function sets name tag to target",
target: "aliasByNode(stats.counters.web.hits, 2)",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "aliasByNode(stats.counters.web.hits, 2)",
},
{
name: "aliasByMetric() function sets name tag to target",
target: "aliasByMetric(stats.counters.web.hits)",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "aliasByMetric(stats.counters.web.hits)",
},
{
name: "aliasByTags() function sets name tag to target",
target: "aliasByTags(stats.counters.web.hits, 'host')",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "aliasByTags(stats.counters.web.hits, 'host')",
},
{
name: "aliasSub() function sets name tag to target",
target: "aliasSub(stats.counters.web.hits, 'stats', 'metrics')",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "aliasSub(stats.counters.web.hits, 'stats', 'metrics')",
},
{
name: "aliasQuery() function sets name tag to target",
target: "aliasQuery(stats.counters.web.hits, 'SELECT name FROM hosts')",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "aliasQuery(stats.counters.web.hits, 'SELECT name FROM hosts')",
},
{
name: "no alias function keeps original name tag",
target: "stats.counters.web.hits",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "stats.counters.web.hits",
},
{
name: "fromAlert overrides name tag even without alias",
target: "stats.counters.web.hits",
tagsName: "original.name",
fromAlert: true,
expectedLabelName: "stats.counters.web.hits",
},
{
name: "nested alias function matches",
target: "sumSeries(alias(stats.counters.*.hits, 'Hits'))",
tagsName: "stats.counters.web.hits",
fromAlert: false,
expectedLabelName: "sumSeries(alias(stats.counters.*.hits, 'Hits'))",
},
{
name: "alias in metric path should not match",
target: "stats.alias.web.hits",
tagsName: "stats.alias.web.hits",
fromAlert: false,
expectedLabelName: "stats.alias.web.hits",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
body := fmt.Sprintf(`[
{
"target": %q,
"tags": { "name": %q, "host": "server1" },
"datapoints": [[100, 1609459200]]
}
]`, tc.target, tc.tagsName)
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
dataFrames, err := service.toDataFrames(httpResponse, "A", tc.fromAlert, tc.target)
require.NoError(t, err)
require.Len(t, dataFrames, 1)
frame := dataFrames[0]
require.GreaterOrEqual(t, len(frame.Fields), 2)
valueField := frame.Fields[1]
require.NotNil(t, valueField.Labels)
actualName, ok := valueField.Labels["name"]
require.True(t, ok, "name label should exist")
assert.Equal(t, tc.expectedLabelName, actualName, "name label should match expected value")
})
}
}
func TestParseGraphiteError(t *testing.T) { func TestParseGraphiteError(t *testing.T) {
tests := []struct { tests := []struct {
name string name string