fix(util): don't use wall clock time for testing (#103924)
This commit is contained in:
committed by
GitHub
parent
383f043be8
commit
6c45cc9e2d
@@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/dskit/instrument"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
@@ -91,6 +92,9 @@ type DebouncerOpts[T comparable] struct {
|
||||
// for the same key keep arriving, we guarantee processing after MaxWait from the first event.
|
||||
MaxWait time.Duration
|
||||
Reg prometheus.Registerer
|
||||
|
||||
// clock can be used for testing to not having to relay on wall clock time.
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
type Group[T comparable] struct {
|
||||
@@ -108,6 +112,8 @@ type Group[T comparable] struct {
|
||||
minWait time.Duration
|
||||
maxWait time.Duration
|
||||
metrics *metrics
|
||||
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
// NewGroup creates a new debouncer group for processing events with unique keys.
|
||||
@@ -165,6 +171,10 @@ func NewGroup[T comparable](opts DebouncerOpts[T]) (*Group[T], error) {
|
||||
opts.ErrorHandler = func(_ T, _ error) {}
|
||||
}
|
||||
|
||||
if opts.clock == nil {
|
||||
opts.clock = clock.New()
|
||||
}
|
||||
|
||||
return &Group[T]{
|
||||
buffer: make(chan T, opts.BufferSize),
|
||||
debouncers: make(map[T]*debouncer[T]),
|
||||
@@ -173,6 +183,7 @@ func NewGroup[T comparable](opts DebouncerOpts[T]) (*Group[T], error) {
|
||||
minWait: opts.MinWait,
|
||||
maxWait: opts.MaxWait,
|
||||
metrics: newMetrics(opts.Reg, opts.Name),
|
||||
clock: opts.clock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -217,7 +228,7 @@ func (g *Group[T]) processValue(key T) {
|
||||
g.debouncersMu.Lock()
|
||||
deb, ok := g.debouncers[key]
|
||||
if !ok {
|
||||
deb = newDebouncer[T](g.minWait, g.maxWait, key, func(v T) {
|
||||
deb = newDebouncer[T](g.minWait, g.maxWait, g.clock, key, func(v T) {
|
||||
g.processWithMetrics(g.ctx, v, g.processHandler)
|
||||
|
||||
g.debouncersMu.Lock()
|
||||
@@ -256,16 +267,18 @@ type debouncer[T comparable] struct {
|
||||
minWait time.Duration
|
||||
maxWait time.Duration
|
||||
processFunc func(T)
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
// newDebouncer creates a new key debouncer.
|
||||
func newDebouncer[T comparable](minWait, maxWait time.Duration, key T, processFunc func(T)) *debouncer[T] {
|
||||
func newDebouncer[T comparable](minWait, maxWait time.Duration, clock clock.Clock, key T, processFunc func(T)) *debouncer[T] {
|
||||
deb := &debouncer[T]{
|
||||
key: key,
|
||||
resetChan: make(chan struct{}, 1),
|
||||
minWait: minWait,
|
||||
maxWait: maxWait,
|
||||
processFunc: processFunc,
|
||||
clock: clock,
|
||||
}
|
||||
return deb
|
||||
}
|
||||
@@ -285,8 +298,8 @@ func (d *debouncer[T]) reset() {
|
||||
// run manages the debouncing process for a specific key.
|
||||
func (d *debouncer[T]) run(ctx context.Context) {
|
||||
// Create timers after getting the first updateChan.
|
||||
minTimer := time.NewTimer(d.minWait)
|
||||
maxTimer := time.NewTimer(d.maxWait)
|
||||
minTimer := d.clock.Timer(d.minWait)
|
||||
maxTimer := d.clock.Timer(d.maxWait)
|
||||
defer func() {
|
||||
minTimer.Stop()
|
||||
maxTimer.Stop()
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -53,6 +54,7 @@ func TestDebouncer(t *testing.T) {
|
||||
|
||||
t.Run("should process values after max wait", func(t *testing.T) {
|
||||
processed := make(map[string]int, 1)
|
||||
clockMock := clock.NewMock()
|
||||
|
||||
group, err := NewGroup(DebouncerOpts[string]{
|
||||
BufferSize: 10,
|
||||
@@ -62,6 +64,7 @@ func TestDebouncer(t *testing.T) {
|
||||
},
|
||||
MinWait: 50 * time.Millisecond,
|
||||
MaxWait: 500 * time.Millisecond,
|
||||
clock: clockMock,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -70,20 +73,17 @@ func TestDebouncer(t *testing.T) {
|
||||
|
||||
group.Start(ctx)
|
||||
|
||||
ticker := time.NewTicker(time.Millisecond * 40)
|
||||
defer ticker.Stop()
|
||||
|
||||
start := time.Now()
|
||||
start := clockMock.Now()
|
||||
|
||||
for counter := 0; counter < 25; counter++ {
|
||||
<-ticker.C
|
||||
_ = group.Add("key1")
|
||||
clockMock.Add(time.Millisecond * 40)
|
||||
if processed["key1"] == 1 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.WithinDuration(t, start.Add(time.Millisecond*500), time.Now(), time.Millisecond*100)
|
||||
require.WithinDuration(t, start.Add(time.Millisecond*500), clockMock.Now(), time.Millisecond*100)
|
||||
})
|
||||
|
||||
t.Run("should handle buffer full", func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user