diff --git a/conf/defaults.ini b/conf/defaults.ini
index 22918d44339..bf0bf994b56 100644
--- a/conf/defaults.ini
+++ b/conf/defaults.ini
@@ -130,6 +130,13 @@ password =
 # Example: mysql://user:secret@host:port/database
 url =
 
+# Set to true or false to enable or disable high availability mode.
+# When set to false, some functions are simplified and run only in-process
+# instead of relying on the database.
+#
+# Only set it to false if you run a single instance of Grafana.
+high_availability = true
+
 # Max idle conn setting default is 2
 max_idle_conn = 2
diff --git a/conf/sample.ini b/conf/sample.ini
index e3b3c1f965d..da688f03078 100644
--- a/conf/sample.ini
+++ b/conf/sample.ini
@@ -129,6 +129,13 @@
 # Example: mysql://user:secret@host:port/database
 ;url =
 
+# Set to true or false to enable or disable high availability mode.
+# When set to false, some functions are simplified and run only in-process
+# instead of relying on the database.
+#
+# Only set it to false if you run a single instance of Grafana.
+;high_availability = true
+
 # Max idle conn setting default is 2
 ;max_idle_conn = 2
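For a single-node deployment, the new key can be turned off so that watch notifications stay entirely in-process. A minimal override sketch (the `[database]` section and key name come from this diff; `custom.ini` is Grafana's usual override file):

```ini
[database]
# Single Grafana instance: no cross-instance coordination is needed,
# so watch events can be delivered by the in-process channel notifier.
high_availability = false
```

With the default of `true`, multi-instance deployments keep detecting writes made by other instances through database polling.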
diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go
index 18b95019429..4c6d00addd4 100644
--- a/pkg/storage/unified/sql/backend.go
+++ b/pkg/storage/unified/sql/backend.go
@@ -39,6 +39,7 @@ type BackendOptions struct {
 	Tracer          trace.Tracer
 	PollingInterval time.Duration
 	WatchBufferSize int
+	IsHA            bool
 }
 
 func NewBackend(opts BackendOptions) (Backend, error) {
@@ -50,26 +51,29 @@ func NewBackend(opts BackendOptions) (Backend, error) {
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 
-	pollingInterval := opts.PollingInterval
-	if pollingInterval == 0 {
-		pollingInterval = defaultPollingInterval
+	if opts.PollingInterval == 0 {
+		opts.PollingInterval = defaultPollingInterval
 	}
 	if opts.WatchBufferSize == 0 {
 		opts.WatchBufferSize = defaultWatchBufferSize
 	}
 	return &backend{
+		isHA:            opts.IsHA,
 		done:            ctx.Done(),
 		cancel:          cancel,
 		log:             log.New("sql-resource-server"),
 		tracer:          opts.Tracer,
 		dbProvider:      opts.DBProvider,
-		pollingInterval: pollingInterval,
+		pollingInterval: opts.PollingInterval,
 		watchBufferSize: opts.WatchBufferSize,
 		batchLock:       &batchLock{running: make(map[string]bool)},
 	}, nil
 }
 
 type backend struct {
+	// general
+	isHA bool
+
 	// server lifecycle
 	done   <-chan struct{}
 	cancel context.CancelFunc
@@ -90,6 +94,7 @@ type backend struct {
 	//stream chan *resource.WatchEvent
 	pollingInterval time.Duration
 	watchBufferSize int
+	notifier        eventNotifier
 }
 
 func (b *backend) Init(ctx context.Context) error {
@@ -112,6 +117,13 @@ func (b *backend) initLocked(ctx context.Context) error {
 		return fmt.Errorf("no dialect for driver %q", driverName)
 	}
 
+	// Initialize notifier after dialect is set up
+	notifier, err := newNotifier(b)
+	if err != nil {
+		return fmt.Errorf("failed to create notifier: %w", err)
+	}
+	b.notifier = notifier
+
 	return b.db.PingContext(ctx)
 }
 
@@ -187,11 +199,11 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
 	defer span.End()
 	var newVersion int64
 	guid := uuid.New().String()
+	folder := ""
+	if event.Object != nil {
+		folder = event.Object.GetFolder()
+	}
 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
-		folder := ""
-		if event.Object != nil {
-			folder = event.Object.GetFolder()
-		}
 		// 1. Insert into resource
 		if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
 			SQLTemplate: sqltemplate.New(b.dialect),
@@ -240,7 +252,21 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
 		return nil
 	})
-	return newVersion, err
+
+	if err != nil {
+		return 0, err
+	}
+
+	b.notifier.send(ctx, &resource.WrittenEvent{
+		Type:            event.Type,
+		Key:             event.Key,
+		PreviousRV:      event.PreviousRV,
+		Value:           event.Value,
+		ResourceVersion: newVersion,
+		Folder:          folder,
+	})
+
+	return newVersion, nil
 }
 
 func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) {
@@ -248,11 +274,11 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
 	defer span.End()
 	var newVersion int64
 	guid := uuid.New().String()
+	folder := ""
+	if event.Object != nil {
+		folder = event.Object.GetFolder()
+	}
 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
-		folder := ""
-		if event.Object != nil {
-			folder = event.Object.GetFolder()
-		}
 		// 1. Update resource
 		_, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{
 			SQLTemplate: sqltemplate.New(b.dialect),
@@ -303,7 +329,20 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
 		return nil
 	})
-	return newVersion, err
+	if err != nil {
+		return 0, err
+	}
+
+	b.notifier.send(ctx, &resource.WrittenEvent{
+		Type:            event.Type,
+		Key:             event.Key,
+		PreviousRV:      event.PreviousRV,
+		Value:           event.Value,
+		ResourceVersion: newVersion,
+		Folder:          folder,
+	})
+
+	return newVersion, nil
 }
 
 func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) {
@@ -311,12 +350,11 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
 	defer span.End()
 	var newVersion int64
 	guid := uuid.New().String()
-
+	folder := ""
+	if event.Object != nil {
+		folder = event.Object.GetFolder()
+	}
 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
-		folder := ""
-		if event.Object != nil {
-			folder = event.Object.GetFolder()
-		}
 		// 1. delete from resource
 		_, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{
 			SQLTemplate: sqltemplate.New(b.dialect),
@@ -358,7 +396,20 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
 		return nil
 	})
-	return newVersion, err
+	if err != nil {
+		return 0, err
+	}
+
+	b.notifier.send(ctx, &resource.WrittenEvent{
+		Type:            event.Type,
+		Key:             event.Key,
+		PreviousRV:      event.PreviousRV,
+		Value:           event.Value,
+		ResourceVersion: newVersion,
+		Folder:          folder,
+	})
+
+	return newVersion, nil
 }
 
 func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64, error) {
@@ -366,12 +417,11 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
 	defer span.End()
 	var newVersion int64
 	guid := uuid.New().String()
+	folder := ""
+	if event.Object != nil {
+		folder = event.Object.GetFolder()
+	}
 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
-		folder := ""
-		if event.Object != nil {
-			folder = event.Object.GetFolder()
-		}
-
 		// 1. Re-create resource
 		// Note: we may want to replace the write event with a create event, tbd.
 		if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
 			SQLTemplate: sqltemplate.New(b.dialect),
@@ -435,7 +485,20 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
 		return nil
 	})
-	return newVersion, err
+	if err != nil {
+		return 0, err
+	}
+
+	b.notifier.send(ctx, &resource.WrittenEvent{
+		Type:            event.Type,
+		Key:             event.Key,
+		PreviousRV:      event.PreviousRV,
+		Value:           event.Value,
+		ResourceVersion: newVersion,
+		Folder:          folder,
+	})
+
+	return newVersion, nil
 }
 
 func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse {
@@ -707,68 +770,7 @@ func (b *backend) getHistory(ctx context.Context, req *resource.ListRequest, cb
 }
 
 func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
-	// Get the latest RV
-	since, err := b.listLatestRVs(ctx)
-	if err != nil {
-		return nil, fmt.Errorf("watch, get latest resource version: %w", err)
-	}
-	// Start the poller
-	stream := make(chan *resource.WrittenEvent, b.watchBufferSize)
-	go b.poller(ctx, since, stream)
-	return stream, nil
-}
-
-func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) {
-	t := time.NewTicker(b.pollingInterval)
-	defer close(stream)
-	defer t.Stop()
-	isSQLite := b.dialect.DialectName() == "sqlite"
-
-	for {
-		select {
-		case <-b.done:
-			return
-		case <-t.C:
-			// Block polling duffing import to avoid database locked issues
-			if isSQLite && b.batchLock.Active() {
-				continue
-			}
-
-			ctx, span := b.tracer.Start(ctx, tracePrefix+"poller")
-			// List the latest RVs
-			grv, err := b.listLatestRVs(ctx)
-			if err != nil {
-				b.log.Error("poller get latest resource version", "err", err)
-				t.Reset(b.pollingInterval)
-				continue
-			}
-			for group, items := range grv {
-				for resource := range items {
-					// If we haven't seen this resource before, we start from 0
-					if _, ok := since[group]; !ok {
-						since[group] = make(map[string]int64)
-					}
-					if _, ok := since[group][resource]; !ok {
-						since[group][resource] = 0
-					}
-
-					// Poll for new events
-					next, err := b.poll(ctx, group, resource, since[group][resource], stream)
-					if err != nil {
-						b.log.Error("polling for resource", "err", err)
-						t.Reset(b.pollingInterval)
-						continue
-					}
-					if next > since[group][resource] {
-						since[group][resource] = next
-					}
-				}
-			}
-
-			t.Reset(b.pollingInterval)
-			span.End()
-		}
-	}
+	return b.notifier.notify(ctx)
 }
 
 // listLatestRVs returns the latest resource version for each (Group, Resource) pair.
@@ -817,59 +819,6 @@ func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialec
 	return res.ResourceVersion, nil
 }
 
-func (b *backend) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) {
-	ctx, span := b.tracer.Start(ctx, tracePrefix+"poll")
-	defer span.End()
-
-	start := time.Now()
-	var records []*historyPollResponse
-	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
-		var err error
-		records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{
-			SQLTemplate:          sqltemplate.New(b.dialect),
-			Resource:             res,
-			Group:                grp,
-			SinceResourceVersion: since,
-			Response:             &historyPollResponse{},
-		})
-		return err
-	})
-	if err != nil {
-		return 0, fmt.Errorf("poll history: %w", err)
-	}
-	end := time.Now()
-	resource.NewStorageMetrics().PollerLatency.Observe(end.Sub(start).Seconds())
-
-	var nextRV int64
-	for _, rec := range records {
-		if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" {
-			return nextRV, fmt.Errorf("missing key in response")
-		}
-		nextRV = rec.ResourceVersion
-		prevRV := rec.PreviousRV
-		if prevRV == nil {
-			prevRV = new(int64)
-		}
-		stream <- &resource.WrittenEvent{
-			Value: rec.Value,
-			Key: &resource.ResourceKey{
-				Namespace: rec.Key.Namespace,
-				Group:     rec.Key.Group,
-				Resource:  rec.Key.Resource,
-				Name:      rec.Key.Name,
-			},
-			Type:            resource.WatchEvent_Type(rec.Action),
-			PreviousRV:      *prevRV,
-			Folder:          rec.Folder,
-			ResourceVersion: rec.ResourceVersion,
-			// Timestamp: , // TODO: add timestamp
-		}
-		b.log.Debug("poller sent event to stream", "namespace", rec.Key.Namespace, "group", rec.Key.Group, "resource", rec.Key.Resource, "name", rec.Key.Name, "action", rec.Action, "rv", rec.ResourceVersion)
-	}
-
-	return nextRV, nil
-}
-
 // resourceVersionAtomicInc atomically increases the version of a kind within a transaction.
 // TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
 // in a single roundtrip. This would reduce the latency of the operation, and also increase the
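Two things change for readers of backend.go: every successful write now calls `notifier.send` only after the transaction commits (so subscribers never observe uncommitted rows), and `WatchWriteEvents` just delegates to the notifier. A minimal consumer sketch, assuming `resource.StorageBackend` exposes `WatchWriteEvents` as the sql backend implements it here:

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/grafana/pkg/storage/unified/resource"
)

// watchAll subscribes once and drains events until the notifier
// closes the stream, e.g. on backend shutdown.
func watchAll(ctx context.Context, store resource.StorageBackend) {
	events, err := store.WatchWriteEvents(ctx)
	if err != nil {
		fmt.Println("watch failed:", err)
		return
	}
	for ev := range events { // channel closes when the subscription ends
		fmt.Println(ev.Type, ev.Key.Resource, ev.Key.Name, "rv:", ev.ResourceVersion)
	}
}

func main() {
	// Constructing a real backend needs a database; see the integration
	// tests at the end of this diff for the sql.NewBackend wiring.
	_ = watchAll
	_ = context.Background
}
```

The caller-facing contract is unchanged; only the source of the events differs between the HA (polling) and single-instance (in-process) strategies defined next.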
diff --git a/pkg/storage/unified/sql/notifier.go b/pkg/storage/unified/sql/notifier.go
new file mode 100644
index 00000000000..a57ea6b4ec7
--- /dev/null
+++ b/pkg/storage/unified/sql/notifier.go
@@ -0,0 +1,119 @@
+package sql
+
+import (
+	"context"
+	"sync"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
+)
+
+type eventNotifier interface {
+	notify(ctx context.Context) (<-chan *resource.WrittenEvent, error)
+	// send will forward an event to all subscribers who want to be notified.
+	//
+	// Note: depending on the implementation, send might be a no-op and new
+	// events will be fetched from an external source.
+	send(ctx context.Context, event *resource.WrittenEvent)
+	close()
+}
+
+func newNotifier(b *backend) (eventNotifier, error) {
+	if b.isHA {
+		b.log.Info("Using polling notifier")
+		notifier, err := newPollingNotifier(&pollingNotifierConfig{
+			pollingInterval: b.pollingInterval,
+			watchBufferSize: b.watchBufferSize,
+			log:             b.log,
+			tracer:          b.tracer,
+			batchLock:       b.batchLock,
+			listLatestRVs:   b.listLatestRVs,
+			historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+				var records []*historyPollResponse
+				err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
+					var err error
+					records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{
+						SQLTemplate:          sqltemplate.New(b.dialect),
+						Resource:             res,
+						Group:                grp,
+						SinceResourceVersion: since,
+						Response:             &historyPollResponse{},
+					})
+					return err
+				})
+				return records, err
+			},
+			done:    b.done,
+			dialect: b.dialect,
+		})
+		if err != nil {
+			return nil, err
+		}
+		return notifier, nil
+	}
+
+	b.log.Info("Using channel notifier")
+	return newChannelNotifier(b.watchBufferSize, b.log), nil
+}
+
+type channelNotifier struct {
+	log        log.Logger
+	bufferSize int
+
+	mu          sync.RWMutex
+	subscribers map[chan *resource.WrittenEvent]bool
+}
+
+func newChannelNotifier(bufferSize int, log log.Logger) *channelNotifier {
+	return &channelNotifier{
+		subscribers: make(map[chan *resource.WrittenEvent]bool),
+		log:         log,
+		bufferSize:  bufferSize,
+	}
+}
+
+func (n *channelNotifier) notify(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
+	events := make(chan *resource.WrittenEvent, n.bufferSize)
+
+	n.mu.Lock()
+	n.subscribers[events] = true
+	n.mu.Unlock()
+
+	go func() {
+		<-ctx.Done()
+		n.mu.Lock()
+		if n.subscribers[events] {
+			delete(n.subscribers, events)
+			close(events)
+		}
+		n.mu.Unlock()
+	}()
+
+	return events, nil
+}
+
+func (n *channelNotifier) send(_ context.Context, event *resource.WrittenEvent) {
+	n.mu.RLock()
+	defer n.mu.RUnlock()
+
+	for ch := range n.subscribers {
+		select {
+		case ch <- event:
+		default:
+			n.log.Warn("Dropped event notification for subscriber - channel full")
+		}
+	}
+}
+
+func (n *channelNotifier) close() {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+
+	for ch := range n.subscribers {
+		close(ch)
+	}
+	n.subscribers = make(map[chan *resource.WrittenEvent]bool)
+}
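The channel notifier is plain in-process fan-out: each watcher gets its own buffered channel, `send` never blocks the write path, and a subscriber whose buffer is full simply misses that event. A standalone sketch of the same pattern with simplified types (not the PR's code):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type event struct{ rv int64 }

// notifier fans events out to per-subscriber buffered channels,
// mirroring the shape of channelNotifier above.
type notifier struct {
	mu   sync.RWMutex
	subs map[chan event]bool
}

func (n *notifier) subscribe(ctx context.Context, buf int) <-chan event {
	ch := make(chan event, buf)
	n.mu.Lock()
	n.subs[ch] = true
	n.mu.Unlock()
	go func() { // unsubscribe and close once the watcher goes away
		<-ctx.Done()
		n.mu.Lock()
		if n.subs[ch] {
			delete(n.subs, ch)
			close(ch)
		}
		n.mu.Unlock()
	}()
	return ch
}

func (n *notifier) send(ev event) {
	n.mu.RLock()
	defer n.mu.RUnlock()
	for ch := range n.subs {
		select {
		case ch <- ev:
		default: // full buffer: drop instead of blocking the writer
		}
	}
}

func main() {
	n := &notifier{subs: map[chan event]bool{}}
	ctx, cancel := context.WithCancel(context.Background())
	events := n.subscribe(ctx, 4)

	n.send(event{rv: 1})
	fmt.Println(<-events) // {1}

	cancel()
	time.Sleep(10 * time.Millisecond) // let the cleanup goroutine run
	n.send(event{rv: 2})              // no subscribers left; silently dropped
}
```

Dropping instead of blocking keeps a slow watcher from ever stalling writes, at the cost of missed events for that watcher only.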
diff --git a/pkg/storage/unified/sql/notifier_sql.go b/pkg/storage/unified/sql/notifier_sql.go
new file mode 100644
index 00000000000..a4d0a008fcc
--- /dev/null
+++ b/pkg/storage/unified/sql/notifier_sql.go
@@ -0,0 +1,216 @@
+package sql
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
+	"go.opentelemetry.io/otel/trace"
+)
+
+var (
+	// Validation errors.
+	errHistoryPollRequired    = fmt.Errorf("historyPoll is required")
+	errListLatestRVsRequired  = fmt.Errorf("listLatestRVs is required")
+	errBatchLockRequired      = fmt.Errorf("batchLock is required")
+	errTracerRequired         = fmt.Errorf("tracer is required")
+	errLogRequired            = fmt.Errorf("log is required")
+	errInvalidWatchBufferSize = fmt.Errorf("watchBufferSize must be greater than 0")
+	errInvalidPollingInterval = fmt.Errorf("pollingInterval must be greater than 0")
+	errDoneRequired           = fmt.Errorf("done is required")
+	errDialectRequired        = fmt.Errorf("dialect is required")
+)
+
+// pollingNotifier is a notifier that polls the database for new events.
+type pollingNotifier struct {
+	dialect         sqltemplate.Dialect
+	pollingInterval time.Duration
+	watchBufferSize int
+
+	log    log.Logger
+	tracer trace.Tracer
+
+	batchLock     *batchLock
+	listLatestRVs func(ctx context.Context) (groupResourceRV, error)
+	historyPoll   func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error)
+
+	done <-chan struct{}
+}
+
+type pollingNotifierConfig struct {
+	dialect         sqltemplate.Dialect
+	pollingInterval time.Duration
+	watchBufferSize int
+
+	log    log.Logger
+	tracer trace.Tracer
+
+	batchLock     *batchLock
+	listLatestRVs func(ctx context.Context) (groupResourceRV, error)
+	historyPoll   func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error)
+
+	done <-chan struct{}
+}
+
+func (cfg *pollingNotifierConfig) validate() error {
+	if cfg.historyPoll == nil {
+		return errHistoryPollRequired
+	}
+	if cfg.listLatestRVs == nil {
+		return errListLatestRVsRequired
+	}
+	if cfg.batchLock == nil {
+		return errBatchLockRequired
+	}
+	if cfg.tracer == nil {
+		return errTracerRequired
+	}
+	if cfg.log == nil {
+		return errLogRequired
+	}
+	if cfg.watchBufferSize <= 0 {
+		return errInvalidWatchBufferSize
+	}
+	if cfg.pollingInterval <= 0 {
+		return errInvalidPollingInterval
+	}
+	if cfg.done == nil {
+		return errDoneRequired
+	}
+	if cfg.dialect == nil {
+		return errDialectRequired
+	}
+	return nil
+}
+
+func newPollingNotifier(cfg *pollingNotifierConfig) (*pollingNotifier, error) {
+	if err := cfg.validate(); err != nil {
+		return nil, fmt.Errorf("invalid polling notifier config: %w", err)
+	}
+	return &pollingNotifier{
+		dialect:         cfg.dialect,
+		pollingInterval: cfg.pollingInterval,
+		watchBufferSize: cfg.watchBufferSize,
+		log:             cfg.log,
+		tracer:          cfg.tracer,
+		batchLock:       cfg.batchLock,
+		listLatestRVs:   cfg.listLatestRVs,
+		historyPoll:     cfg.historyPoll,
+		done:            cfg.done,
+	}, nil
+}
+
+func (p *pollingNotifier) notify(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
+	since, err := p.listLatestRVs(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("watch, get latest resource version: %w", err)
+	}
+	stream := make(chan *resource.WrittenEvent, p.watchBufferSize)
+	go p.poller(ctx, since, stream)
+	return stream, nil
+}
+
+func (p *pollingNotifier) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) {
+	t := time.NewTicker(p.pollingInterval)
+	defer close(stream)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-p.done:
+			return
+		case <-t.C:
+			ctx, span := p.tracer.Start(ctx, tracePrefix+"poller")
+			// List the latest RVs to see if any of them have not been seen before.
+			grv, err := p.listLatestRVs(ctx)
+			if err != nil {
+				p.log.Error("poller get latest resource version", "err", err)
+				t.Reset(p.pollingInterval)
+				continue
+			}
+			for group, items := range grv {
+				for resource := range items {
+					// If we haven't seen this resource before, we start from 0.
+					if _, ok := since[group]; !ok {
+						since[group] = make(map[string]int64)
+					}
+					if _, ok := since[group][resource]; !ok {
+						since[group][resource] = 0
+					}
+
+					// Poll for new events.
+					next, err := p.poll(ctx, group, resource, since[group][resource], stream)
+					if err != nil {
+						p.log.Error("polling for resource", "err", err)
+						t.Reset(p.pollingInterval)
+						continue
+					}
+					if next > since[group][resource] {
+						since[group][resource] = next
+					}
+				}
+			}
+
+			t.Reset(p.pollingInterval)
+			span.End()
+		}
+	}
+}
+
+func (p *pollingNotifier) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) {
+	ctx, span := p.tracer.Start(ctx, tracePrefix+"poll")
+	defer span.End()
+
+	start := time.Now()
+	records, err := p.historyPoll(ctx, grp, res, since)
+	if err != nil {
+		return 0, fmt.Errorf("poll history: %w", err)
+	}
+	resource.NewStorageMetrics().PollerLatency.Observe(time.Since(start).Seconds())
+
+	var nextRV int64
+	for _, rec := range records {
+		if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" {
+			return nextRV, fmt.Errorf("missing key in response")
+		}
+		nextRV = rec.ResourceVersion
+		prevRV := rec.PreviousRV
+		if prevRV == nil {
+			prevRV = new(int64)
+		}
+		stream <- &resource.WrittenEvent{
+			Value: rec.Value,
+			Key: &resource.ResourceKey{
+				Namespace: rec.Key.Namespace,
+				Group:     rec.Key.Group,
+				Resource:  rec.Key.Resource,
+				Name:      rec.Key.Name,
+			},
+			Type:            resource.WatchEvent_Type(rec.Action),
+			PreviousRV:      *prevRV,
+			Folder:          rec.Folder,
+			ResourceVersion: rec.ResourceVersion,
+			// Timestamp: , // TODO: add timestamp
+		}
+		p.log.Debug("poller sent event to stream",
+			"namespace", rec.Key.Namespace,
+			"group", rec.Key.Group,
+			"resource", rec.Key.Resource,
+			"name", rec.Key.Name,
+			"action", rec.Action,
+			"rv", rec.ResourceVersion)
+	}
+
+	return nextRV, nil
+}
+
+func (p *pollingNotifier) send(_ context.Context, _ *resource.WrittenEvent) {
+	// No-op for polling strategy - changes are detected via polling.
+}
+
+func (p *pollingNotifier) close() {
+	// No-op for polling strategy.
+}
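In the HA path, the poller's `since` map (`groupResourceRV`) is a per-(group, resource) high-water mark: each tick asks only for history rows with a larger resource version, so an event is forwarded at most once per subscription. A toy sketch of that bookkeeping, assuming (as the `nextRV` loop in `poll` suggests) that history rows come back ordered by resource version; the group name is illustrative:

```go
package main

import "fmt"

// groupResourceRV mirrors the map used by the poller:
// group -> resource -> last resource version forwarded.
type groupResourceRV map[string]map[string]int64

func advance(since groupResourceRV, group, res string, polledRVs []int64) {
	if _, ok := since[group]; !ok {
		since[group] = map[string]int64{}
	}
	for _, rv := range polledRVs {
		if rv > since[group][res] {
			since[group][res] = rv // the high-water mark only moves forward
		}
	}
}

func main() {
	since := groupResourceRV{}
	advance(since, "dashboard.grafana.app", "dashboards", []int64{3, 5})
	advance(since, "dashboard.grafana.app", "dashboards", []int64{5, 8}) // 5 was already seen
	fmt.Println(since["dashboard.grafana.app"]["dashboards"])            // 8
}
```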
diff --git a/pkg/storage/unified/sql/notifier_sql_test.go b/pkg/storage/unified/sql/notifier_sql_test.go
new file mode 100644
index 00000000000..693d63ea485
--- /dev/null
+++ b/pkg/storage/unified/sql/notifier_sql_test.go
@@ -0,0 +1,360 @@
+package sql
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel/trace/noop"
+)
+
+func TestPollingNotifierConfig(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name        string
+		config      *pollingNotifierConfig
+		expectedErr error
+	}{
+		{
+			name: "valid config",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: nil,
+		},
+		{
+			name: "missing historyPoll",
+			config: &pollingNotifierConfig{
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errHistoryPollRequired,
+		},
+		{
+			name: "missing listLatestRVs",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errListLatestRVsRequired,
+		},
+		{
+			name: "missing batchLock",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errBatchLockRequired,
+		},
+		{
+			name: "missing tracer",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errTracerRequired,
+		},
+		{
+			name: "missing logger",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errLogRequired,
+		},
+		{
+			name: "invalid watch buffer size",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 0,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errInvalidWatchBufferSize,
+		},
+		{
+			name: "invalid polling interval",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: 0,
+				done:            make(chan struct{}),
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errInvalidPollingInterval,
+		},
+		{
+			name: "missing done channel",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				dialect:         sqltemplate.SQLite,
+			},
+			expectedErr: errDoneRequired,
+		},
+		{
+			name: "missing dialect",
+			config: &pollingNotifierConfig{
+				historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+					return nil, nil
+				},
+				listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+				batchLock:       &batchLock{},
+				tracer:          noop.NewTracerProvider().Tracer("test"),
+				log:             log.NewNopLogger(),
+				watchBufferSize: 10,
+				pollingInterval: time.Second,
+				done:            make(chan struct{}),
+			},
+			expectedErr: errDialectRequired,
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			err := tt.config.validate()
+			if tt.expectedErr != nil {
+				require.ErrorIs(t, err, tt.expectedErr)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestPollingNotifier(t *testing.T) {
+	t.Parallel()
+
+	t.Run("notify returns channel and starts polling", func(t *testing.T) {
+		t.Parallel()
+
+		done := make(chan struct{})
+		defer close(done)
+
+		testEvent := &historyPollResponse{
+			Key: resource.ResourceKey{
+				Namespace: "test-ns",
+				Group:     "test-group",
+				Resource:  "test-resource",
+				Name:      "test-name",
+			},
+			ResourceVersion: 2,
+			Folder:          "test-folder",
+			Value:           []byte(`{"test": "data"}`),
+			Action:          1,
+		}
+
+		var latestRVsCalled bool
+		listLatestRVs := func(ctx context.Context) (groupResourceRV, error) {
+			latestRVsCalled = true
+			return groupResourceRV{
+				"test-group": map[string]int64{
+					"test-resource": 0,
+				},
+			}, nil
+		}
+
+		var historyPollCalled bool
+		historyPoll := func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+			historyPollCalled = true
+			require.Equal(t, "test-group", grp)
+			require.Equal(t, "test-resource", res)
+			require.Equal(t, int64(0), since)
+			return []*historyPollResponse{testEvent}, nil
+		}
+
+		cfg := &pollingNotifierConfig{
+			dialect:         sqltemplate.SQLite,
+			pollingInterval: 10 * time.Millisecond,
+			watchBufferSize: 10,
+			log:             log.NewNopLogger(),
+			tracer:          noop.NewTracerProvider().Tracer("test"),
+			batchLock:       &batchLock{},
+			listLatestRVs:   listLatestRVs,
+			historyPoll:     historyPoll,
+			done:            done,
+		}
+
+		notifier, err := newPollingNotifier(cfg)
+		require.NoError(t, err)
+		require.NotNil(t, notifier)
+
+		events, err := notifier.notify(context.Background())
+		require.NoError(t, err)
+		require.NotNil(t, events)
+
+		select {
+		case event := <-events:
+			require.NotNil(t, event)
+			require.Equal(t, "test-ns", event.Key.Namespace)
+			require.Equal(t, "test-group", event.Key.Group)
+			require.Equal(t, "test-resource", event.Key.Resource)
+			require.Equal(t, "test-name", event.Key.Name)
+			require.Equal(t, int64(2), event.ResourceVersion)
+			require.Equal(t, "test-folder", event.Folder)
+			require.True(t, latestRVsCalled, "listLatestRVs should be called")
+			require.True(t, historyPollCalled, "historyPoll should be called")
+		case <-time.After(100 * time.Millisecond):
+			t.Fatal("timeout waiting for event")
+		}
+	})
+
+	t.Run("handles polling errors gracefully", func(t *testing.T) {
+		t.Parallel()
+
+		done := make(chan struct{})
+		defer close(done)
+
+		listLatestRVs := func(ctx context.Context) (groupResourceRV, error) {
+			return groupResourceRV{
+				"test-group": map[string]int64{
+					"test-resource": 0,
+				},
+			}, nil
+		}
+
+		historyPoll := func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+			return nil, errTest
+		}
+
+		cfg := &pollingNotifierConfig{
+			dialect:         sqltemplate.SQLite,
+			pollingInterval: 10 * time.Millisecond,
+			watchBufferSize: 10,
+			log:             log.NewNopLogger(),
+			tracer:          noop.NewTracerProvider().Tracer("test"),
+			batchLock:       &batchLock{},
+			listLatestRVs:   listLatestRVs,
+			historyPoll:     historyPoll,
+			done:            done,
+		}
+
+		notifier, err := newPollingNotifier(cfg)
+		require.NoError(t, err)
+		require.NotNil(t, notifier)
+
+		events, err := notifier.notify(context.Background())
+		require.NoError(t, err)
+		require.NotNil(t, events)
+
+		// Verify channel remains open despite error
+		select {
+		case _, ok := <-events:
+			require.True(t, ok, "channel should remain open")
+		case <-time.After(50 * time.Millisecond):
+			// Expected - no events due to error
+		}
+	})
+
+	t.Run("stops polling when done channel is closed", func(t *testing.T) {
+		t.Parallel()
+
+		done := make(chan struct{})
+
+		cfg := &pollingNotifierConfig{
+			dialect:         sqltemplate.SQLite,
+			pollingInterval: 10 * time.Millisecond,
+			watchBufferSize: 10,
+			log:             log.NewNopLogger(),
+			tracer:          noop.NewTracerProvider().Tracer("test"),
+			batchLock:       &batchLock{},
+			listLatestRVs:   func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
+			historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
+				return nil, nil
+			},
+			done: done,
+		}
+
+		notifier, err := newPollingNotifier(cfg)
+		require.NoError(t, err)
+		require.NotNil(t, notifier)
+
+		events, err := notifier.notify(context.Background())
+		require.NoError(t, err)
+		require.NotNil(t, events)
+
+		close(done)
+
+		select {
+		case _, ok := <-events:
+			require.False(t, ok, "events channel should be closed")
+		case <-time.After(50 * time.Millisecond):
+			t.Fatal("timeout waiting for events channel to close")
+		}
+	})
+}
diff --git a/pkg/storage/unified/sql/notifier_test.go b/pkg/storage/unified/sql/notifier_test.go
new file mode 100644
index 00000000000..b2ce2459503
--- /dev/null
+++ b/pkg/storage/unified/sql/notifier_test.go
@@ -0,0 +1,71 @@
+package sql
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/stretchr/testify/require"
+)
+
+func TestChannelNotifier(t *testing.T) {
+	t.Run("should notify subscribers of events", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		n := newChannelNotifier(5, log.NewNopLogger())
+
+		events, err := n.notify(ctx)
+		require.NoError(t, err)
+
+		testEvent := &resource.WrittenEvent{
+			Type: resource.WatchEvent_ADDED,
+			Key: &resource.ResourceKey{
+				Group:     "test",
+				Resource:  "test",
+				Name:      "test1",
+				Namespace: "test",
+			},
+			ResourceVersion: 1,
+		}
+		n.send(ctx, testEvent)
+
+		select {
+		case event := <-events:
+			require.Equal(t, testEvent, event)
+		case <-ctx.Done():
+			t.Fatal("timeout waiting for event")
+		}
+	})
+
+	t.Run("should drop events when buffer is full", func(t *testing.T) {
+		bufferSize := 2
+		n := newChannelNotifier(bufferSize, log.NewNopLogger())
+
+		events, err := n.notify(context.Background())
+		require.NoError(t, err)
+
+		for i := 0; i < bufferSize+1; i++ {
+			n.send(context.Background(), &resource.WrittenEvent{
+				ResourceVersion: int64(i),
+			})
+		}
+
+		require.Equal(t, bufferSize, len(events))
+	})
+
+	t.Run("should close subscriber channels when context cancelled", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		n := newChannelNotifier(5, log.NewNopLogger())
+
+		events, err := n.notify(ctx)
+		require.NoError(t, err)
+
+		cancel()
+
+		_, ok := <-events
+		require.False(t, ok, "channel should be closed")
+	})
+}
diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go
index 9fc287b2f73..a9778d0c42d 100644
--- a/pkg/storage/unified/sql/server.go
+++ b/pkg/storage/unified/sql/server.go
@@ -10,6 +10,7 @@ import (
 
 	infraDB "github.com/grafana/grafana/pkg/infra/db"
 	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
 	"github.com/grafana/grafana/pkg/setting"
 	"github.com/grafana/grafana/pkg/storage/unified/resource"
 	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
@@ -43,7 +44,17 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
 	if err != nil {
 		return nil, err
 	}
-	store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer})
+
+	dbCfg := cfg.SectionWithEnvOverrides("database")
+	// Check in the config whether HA is enabled; by default we assume an HA setup.
+	isHA := dbCfg.Key("high_availability").MustBool(true)
+	// SQLite cannot run in HA mode, so force it to false.
+	databaseType := dbCfg.Key("type").MustString(migrator.SQLite)
+	if databaseType == migrator.SQLite {
+		isHA = false
+	}
+
+	store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer, IsHA: isHA})
 	if err != nil {
 		return nil, err
 	}
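The wiring above reduces to a small decision: the flag defaults to true, and SQLite always forces single-instance mode since it cannot back multiple Grafana instances. A sketch of that rule as a standalone function (a hypothetical helper, not part of this PR; "sqlite3" is the value of migrator.SQLite):

```go
package main

import "fmt"

// isHighAvailability mirrors the logic in NewResourceServer: honor the
// configured flag, except that SQLite always clamps to single-instance.
func isHighAvailability(dbType string, haFlag bool) bool {
	if dbType == "sqlite3" { // migrator.SQLite
		return false
	}
	return haFlag
}

func main() {
	fmt.Println(isHighAvailability("mysql", true))   // true: polling notifier
	fmt.Println(isHighAvailability("sqlite3", true)) // false: channel notifier
}
```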
diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go
index 92c644d48aa..28699e8ce9f 100644
--- a/pkg/storage/unified/sql/test/integration_test.go
+++ b/pkg/storage/unified/sql/test/integration_test.go
@@ -39,6 +39,24 @@ func TestIntegrationSQLStorageBackend(t *testing.T) {
 
 		backend, err := sql.NewBackend(sql.BackendOptions{
 			DBProvider: eDB,
+			IsHA:       true,
+		})
+		require.NoError(t, err)
+		require.NotNil(t, backend)
+		err = backend.Init(testutil.NewDefaultTestContext(t))
+		require.NoError(t, err)
+		return backend
+	})
+	// Run single-instance tests with the in-process notifier.
+	unitest.RunStorageBackendTest(t, func(ctx context.Context) resource.StorageBackend {
+		dbstore := infraDB.InitTestDB(t)
+		eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil)
+		require.NoError(t, err)
+		require.NotNil(t, eDB)
+
+		backend, err := sql.NewBackend(sql.BackendOptions{
+			DBProvider: eDB,
+			IsHA:       false,
 		})
 		require.NoError(t, err)
 		require.NotNil(t, backend)