Unified Storage: added pkg/util/ring package to handle queueing of notifications (#84657)

* added pkg/util/ringq package to handle queueing of notifications

* fix linters

* Fix typo in comment

Co-authored-by: Dan Cech <dcech@grafana.com>

* improve allocation strategy for Enqueue; remove unnecessary clearing of slice

* Update pkg/util/ringq/dyn_chan_bench_test.go

Co-authored-by: Dan Cech <dcech@grafana.com>

* Update pkg/util/ringq/ringq.go

Co-authored-by: Dan Cech <dcech@grafana.com>

* refactor to move stats and shrinking into Ring

* add missing error assertions in tests

* add missing error assertions in tests and linting issues

* simplify controller closed check

* improve encapsulation of internal state in Ring

* use (*Ring).Len for clarity instead of stats

---------

Co-authored-by: Dan Cech <dcech@grafana.com>
Diego Augusto Molina 2024-04-11 19:32:31 -03:00 committed by GitHub
parent eb86fd867f
commit e6ead667b3
5 changed files with 1457 additions and 0 deletions

@@ -0,0 +1,285 @@
package ring
import (
"context"
"errors"
"sync"
)
// Package level named errors.
var (
ErrAdaptiveChanClosed = errors.New("closed AdaptiveChan")
ErrAdaptiveChanControllerClosed = errors.New("closed AdaptiveChanController")
)
// AdaptiveChan provides a queueing system based on a send-only, a receive-only,
// and an internal ring buffer queue backed by a *Ring. It also provides an
// AdaptiveChanController to provide stats and some control on the internal
// *Ring. Termination is controlled by closing the returned send-only channel.
// After doing so, the receive-only channel will have the chance to receive all
// the items still in the queue and will be immediately closed afterwards. Once
// both channels are closed, the AdaptiveChanController will no longer be usable
// and will only return ErrAdaptiveChanClosed for all its methods. Apart from
// the growth and shrinkage of the internal *Ring, which can be controlled
// with the AdaptiveChanController, the implementation is allocation free.
//
// The implementation explicitly returns two channels and a struct, instead of
// just one struct that has the channels, to make a clear statement about the
// intended usage pattern:
//
// 1. Create an adaptive channel.
// 2. Provide the send-only channel to your producer(s). They are responsible
// for closing this channel when they're done. If more than one goroutine
// will have access to this channel, then it's the producer's responsibility
// to coordinate the channel close operation.
// 3. Provide the receive-only channel to your consumer(s), and let them
// receive from it with the two return value syntax for channels in order to
// check for termination from the sending side.
// 4. Use the AdaptiveChanController to control the internal buffer behaviour
// and to monitor stats. This should typically be held by the creator of the
// adaptive channel. Refrain from holding a reference to the send-only
// channel to force termination of the producing side. Instead, provide a
// side mechanism to communicate the intention of terminating the sending
// side, e.g. providing your producer(s) with a context as well as the
// send-only channel. An adaptive channel is meant as a queueing system, not
// as a coordination mechanism for producer(s), consumer(s) and
// controller(s).
//
// This pattern is designed to maximize decoupling while providing insights and
// granular control on memory usage. While the controller is not meant to make
// any direct changes to the queued data, the Clear method provides the
// opportunity to discard all queued items as an administrative measure. This
// doesn't terminate the queue, though, i.e. it doesn't close the send-only
// channel.
func AdaptiveChan[T any]() (send chan<- T, recv <-chan T, ctrl *AdaptiveChanController) {
internalSend := make(chan T)
internalRecv := make(chan T)
statsChan := make(chan AdaptiveChanStats)
cmdChan := make(chan acCmd)
ctrl = &AdaptiveChanController{
statsChan: statsChan,
cmdChan: cmdChan,
}
go func() {
defer close(internalRecv)
defer close(statsChan)
var q Ring[T]
var stats AdaptiveChanStats
// the loop condition is that we either have items to dequeue or that we
// have the possibility to receive new items to be queued
for q.Len() > 0 || internalSend != nil {
// NOTE: the overhead of writing stats in each iteration is
// negligible. I tried a two phase stats writing with a chan
// struct{} to get notified that the controller wanted stats, then
// updating the stats and finally writing to statsChan. There was no
// observable difference for just enqueueing and dequeueing after
// running the benchmarks several times, and reading stats got worse
// by ~22%
q.WriteStats(&stats.RingStats)
// if we don't have anything in the queue, then make the dequeueing
// branch of the select block indefinitely by providing a nil
// channel, leaving only the queueing branch available
dequeueChan := internalRecv
if q.Len() == 0 {
dequeueChan = nil
}
select {
case v, ok := <-internalSend: // blocks until something is queued
if !ok {
// internalSend was closed, so if we leave it like that the next
// iteration will keep receiving zero values with ok=false
// without any blocking. So we set internalSend to nil, so that
// in the next iteration the select will block indefinitely on
// this branch and leave only the dequeueing
// branch active until all items have been dequeued
internalSend = nil
} else {
q.Enqueue(v)
}
case dequeueChan <- q.Peek(): // blocks if nothing to dequeue
// we don't want to call Dequeue in the `case` above since that
// would consume the item and it would be lost if the queueing
// branch was selected, so instead we Peek in the `case` and we
// do the actual dequeueing here once we know this branch was
// selected
q.Dequeue()
case statsChan <- stats:
// stats reading
stats.StatsRead++
case cmd, ok := <-cmdChan:
if !ok {
// AdaptiveChanController was closed. Set cmdChan to nil so
// this branch blocks in the next iteration
cmdChan = nil
continue
}
// execute a command on the internal *Ring
switch cmd.acCmdType {
case acCmdMin:
q.Min = cmd.intValue
stats.Min = cmd.intValue
case acCmdMax:
q.Max = cmd.intValue
stats.Max = cmd.intValue
case acCmdGrow:
q.Grow(cmd.intValue)
case acCmdShrink:
q.Shrink(cmd.intValue)
case acCmdClear:
q.Clear()
}
}
}
}()
return internalSend, internalRecv, ctrl
}
type acCmdType uint8
const (
acCmdMin = iota
acCmdMax
acCmdClear
acCmdGrow
acCmdShrink
)
type acCmd struct {
acCmdType
intValue int
}
// AdaptiveChanController provides access to an AdaptiveChan's internal *Ring.
type AdaptiveChanController struct {
statsChan <-chan AdaptiveChanStats
cmdChan chan<- acCmd
cmdChanMu sync.Mutex
}
// Close releases resources associated with this controller. After calling this
// method, all other methods will return ErrAdaptiveChanControllerClosed. It is
// idempotent. This doesn't affect the queue itself, but rather prevents further
// administrative tasks from being performed through the AdaptiveChanController.
func (r *AdaptiveChanController) Close() {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
if r.cmdChan != nil {
close(r.cmdChan)
r.cmdChan = nil
}
}
// Min sets the value of Min in the internal *Ring.
func (r *AdaptiveChanController) Min(ctx context.Context, n int) error {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
return sendOrErr(ctx, r.cmdChan, acCmd{
acCmdType: acCmdMin,
intValue: n,
})
}
// Max sets the value of Max in the internal *Ring.
func (r *AdaptiveChanController) Max(ctx context.Context, n int) error {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
return sendOrErr(ctx, r.cmdChan, acCmd{
acCmdType: acCmdMax,
intValue: n,
})
}
// Grow calls Grow on the internal *Ring.
func (r *AdaptiveChanController) Grow(ctx context.Context, n int) error {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
return sendOrErr(ctx, r.cmdChan, acCmd{
acCmdType: acCmdGrow,
intValue: n,
})
}
// Shrink calls Shrink on the internal *Ring.
func (r *AdaptiveChanController) Shrink(ctx context.Context, n int) error {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
return sendOrErr(ctx, r.cmdChan, acCmd{
acCmdType: acCmdShrink,
intValue: n,
})
}
// Clear calls Clear on the internal *Ring.
func (r *AdaptiveChanController) Clear(ctx context.Context) error {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
return sendOrErr(ctx, r.cmdChan, acCmd{
acCmdType: acCmdClear,
})
}
// WriteStats writes a snapshot of general stats about the associated
// AdaptiveChan to the given *AdaptiveChanStats.
func (r *AdaptiveChanController) WriteStats(ctx context.Context, s *AdaptiveChanStats) error {
r.cmdChanMu.Lock()
defer r.cmdChanMu.Unlock()
if r.cmdChan == nil {
return ErrAdaptiveChanControllerClosed
}
return recvOrErr(ctx, r.statsChan, s)
}
// AdaptiveChanStats is a snapshot of general stats for an AdaptiveChan.
type AdaptiveChanStats struct {
RingStats
// Min is the value of Min in the internal *Ring.
Min int
// Max is the value of Max in the internal *Ring.
Max int
// StatsRead is the total number of stats read before this snapshot. If it
// is zero, it means this snapshot is the first reading.
StatsRead uint64
}
func sendOrErr[T any](ctx context.Context, c chan<- T, v T) error {
if c == nil {
return ErrAdaptiveChanControllerClosed
}
select {
case c <- v:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func recvOrErr[T any](ctx context.Context, c <-chan T, tptr *T) error {
select {
case t, ok := <-c:
if !ok {
return ErrAdaptiveChanClosed
}
*tptr = t
return nil
case <-ctx.Done():
return ctx.Err()
}
}
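
The doc comment above describes the intended producer/consumer/controller split. As a concrete illustration, here is a minimal usage sketch written as a hypothetical Go example test in the same package (it is not part of this commit; the 1024 cap is an arbitrary value chosen for the example):

package ring

import (
	"context"
	"fmt"
	"time"
)

func ExampleAdaptiveChan() {
	send, recv, ctrl := AdaptiveChan[int]()
	defer ctrl.Close()

	// Cap the internal ring: once it holds 1024 items, enqueueing a new one
	// drops the oldest queued item instead of allocating more memory.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := ctrl.Max(ctx, 1024); err != nil {
		fmt.Println("set max:", err)
		return
	}

	// Producer side: it owns the send channel and closes it when done.
	go func() {
		defer close(send)
		for i := 0; i < 3; i++ {
			send <- i
		}
	}()

	// Consumer side: drain until the receive channel is closed, which happens
	// only after the send channel is closed and the queue is empty.
	for v := range recv {
		fmt.Println(v)
	}

	// Output:
	// 0
	// 1
	// 2
}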

@@ -0,0 +1,73 @@
package ring
import (
"context"
"testing"
)
func BenchmarkAdaptiveChanBaseline(b *testing.B) {
in, out, _ := AdaptiveChan[int]()
in <- 1
<-out
b.Cleanup(func() {
close(in)
})
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
in <- i
val := <-out
if val != i {
b.Fatalf("expected 1, got %d", val)
}
}
}
func BenchmarkAdaptiveChanWithStatsRead(b *testing.B) {
var stats AdaptiveChanStats
in, out, sr := AdaptiveChan[int]()
in <- 1
<-out
ctx := context.Background()
b.Cleanup(func() {
close(in)
})
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
in <- 1
val := <-out
if val != 1 {
b.Fatalf("expected 1, got %d", val)
}
err := sr.WriteStats(ctx, &stats)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
if stats.Enqueued == 0 {
b.Fatalf("unexpected stats: %v", stats)
}
}
}
func BenchmarkGoChanBaseline(b *testing.B) {
c := make(chan int, 1)
b.Cleanup(func() {
close(c)
})
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
c <- 1
val := <-c
if val != 1 {
b.Fatalf("expected 1, got %d", val)
}
}
}
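
These benchmarks compare the adaptive channel against a plain buffered Go channel, with and without stats reads. Assuming the package lands at pkg/util/ring as in this commit, they can presumably be run with the standard tooling, e.g. `go test -bench=. -benchmem ./pkg/util/ring/`.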

@@ -0,0 +1,272 @@
package ring
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
// make sure we don't leak goroutines after tests in this package have
// finished. This is especially important as AdaptiveChan uses a different
// goroutine to coordinate work
goleak.VerifyTestMain(m)
}
func TestAdaptiveChan(t *testing.T) {
t.Parallel()
t.Run("edge case - close send and controller after creation", func(t *testing.T) {
t.Parallel()
send, recv, ctrl := AdaptiveChan[int]()
cleanupAC(t, send, recv, ctrl)
})
t.Run("basic operation", func(t *testing.T) {
t.Parallel()
var stats, expectedStats AdaptiveChanStats
send, recv, ctrl := AdaptiveChan[int]()
cleanupAC(t, send, recv, ctrl)
sendNonBlock(t, send, ints(10)...)
err := ctrl.WriteStats(ctxFromTest(t), &stats)
require.NoError(t, err)
removeAllocStats(&stats.RingStats)
expectedStats.Len = 10
expectedStats.Enqueued = 10
require.Equal(t, expectedStats, stats)
recvNonBlock(t, recv, ints(10)...)
err = ctrl.WriteStats(ctxFromTest(t), &stats)
require.NoError(t, err)
removeAllocStats(&stats.RingStats)
expectedStats.Len = 0
expectedStats.Dequeued = 10
expectedStats.StatsRead = 1
require.Equal(t, expectedStats, stats)
})
t.Run("using commands to control the ring", func(t *testing.T) {
t.Parallel()
send, recv, ctrl := AdaptiveChan[int]()
cleanupAC(t, send, recv, ctrl)
var stats, expectedStats AdaptiveChanStats
expectedStats.Min = 10
expectedStats.Max = 20
err := ctrl.Min(ctxFromTest(t), expectedStats.Min)
require.NoError(t, err)
err = ctrl.Max(ctxFromTest(t), expectedStats.Max)
require.NoError(t, err)
sendNonBlock(t, send, 1)
err = ctrl.WriteStats(ctxFromTest(t), &stats)
require.NoError(t, err)
require.Equal(t, expectedStats.Min, stats.Cap, "failed to allocate Min")
removeAllocStats(&stats.RingStats)
expectedStats.Len = 1
expectedStats.Enqueued = 1
require.Equal(t, expectedStats, stats)
err = ctrl.Grow(ctxFromTest(t), (expectedStats.Max+expectedStats.Min)/2-1)
require.NoError(t, err)
err = ctrl.WriteStats(ctxFromTest(t), &stats)
require.NoError(t, err)
require.Equal(t, (expectedStats.Max+expectedStats.Min)/2, stats.Cap, "failed to Grow")
err = ctrl.Shrink(ctxFromTest(t), expectedStats.Min)
require.NoError(t, err)
err = ctrl.WriteStats(ctxFromTest(t), &stats)
require.NoError(t, err)
require.Equal(t, expectedStats.Min+1, stats.Cap, "failed to Shrink")
err = ctrl.Clear(ctxFromTest(t))
require.NoError(t, err)
err = ctrl.WriteStats(ctxFromTest(t), &stats)
require.NoError(t, err)
require.Equal(t, expectedStats.Min, stats.Cap, "failed to Clear")
})
t.Run("use of send and recv channels with a closed controller", func(t *testing.T) {
t.Parallel()
send, recv, ctrl := AdaptiveChan[int]()
ctrl.Close()
assertCtrlWriteErr(t, ctrl, ctxFromTest(t), ErrAdaptiveChanControllerClosed)
sendNonBlock(t, send, ints(10)...)
recvNonBlock(t, recv, ints(10)...)
close(send)
shouldBeClosed(t, recv)
})
}
func TestSendOrErr(t *testing.T) {
t.Parallel()
const val = 44203
var c chan int
err := sendOrErr(ctxFromTest(t), c, val)
require.Error(t, err)
require.ErrorIs(t, err, ErrAdaptiveChanControllerClosed)
c = make(chan int, 1)
err = sendOrErr(ctxFromTest(t), c, val)
require.NoError(t, err)
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
err = sendOrErr(canceledCtx, c, val)
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
select {
case v, ok := <-c:
require.True(t, ok)
require.Equal(t, val, v)
default:
t.Fatalf("value not sent to channel")
}
}
func TestRecvOrErr(t *testing.T) {
t.Parallel()
const (
val = 44203
witness = -1
)
var c chan int
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
got := witness
err := recvOrErr(canceledCtx, c, &got)
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, witness, got)
c = make(chan int, 1)
c <- val
err = recvOrErr(ctxFromTest(t), c, &got)
require.NoError(t, err)
require.Equal(t, val, got)
close(c)
got = witness
err = recvOrErr(ctxFromTest(t), c, &got)
require.ErrorIs(t, err, ErrAdaptiveChanClosed)
require.Equal(t, witness, got)
}
// cleanupAC closes the send channel and the controller, and performs a series of
// routine assertions.
func cleanupAC[T any](t *testing.T, send chan<- T, recv <-chan T, ctrl *AdaptiveChanController) {
t.Cleanup(func() {
close(send)
shouldBeClosed(t, recv)
var stats AdaptiveChanStats
err := ctrl.WriteStats(ctxFromTest(t), &stats)
require.Error(t, err)
require.ErrorIs(t, err, ErrAdaptiveChanClosed)
require.Equal(t, AdaptiveChanStats{}, stats)
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
assertCtrlWriteErr(t, ctrl, canceledCtx, context.Canceled)
ctrl.Close()
err = ctrl.WriteStats(ctxFromTest(t), &stats)
require.Error(t, err)
require.ErrorIs(t, err, ErrAdaptiveChanControllerClosed)
require.Equal(t, AdaptiveChanStats{}, stats)
assertCtrlWriteErr(t, ctrl, ctxFromTest(t), ErrAdaptiveChanControllerClosed)
})
}
func assertCtrlWriteErr(t *testing.T, ctrl *AdaptiveChanController, ctx context.Context, expectedErr error) {
t.Helper()
err := ctrl.Min(ctx, 1)
require.Error(t, err)
require.ErrorIs(t, err, expectedErr)
err = ctrl.Max(ctx, 1)
require.Error(t, err)
require.ErrorIs(t, err, expectedErr)
err = ctrl.Grow(ctx, 1)
require.Error(t, err)
require.ErrorIs(t, err, expectedErr)
err = ctrl.Shrink(ctx, 1)
require.Error(t, err)
require.ErrorIs(t, err, expectedErr)
err = ctrl.Clear(ctx)
require.Error(t, err)
require.ErrorIs(t, err, expectedErr)
}
func shouldBeClosed[T any](t *testing.T, recv <-chan T) {
t.Helper()
select {
case v, ok := <-recv:
require.False(t, ok, "unexpected value %q received", v)
case <-ctxFromTest(t).Done():
t.Fatalf("context canceled where recv chan should be closed")
}
}
func sendNonBlock[T any](t *testing.T, send chan<- T, s ...T) {
t.Helper()
var canceled bool
for i, v := range s {
require.NotPanics(t, func() {
select {
case send <- v:
case <-ctxFromTest(t).Done():
canceled = true
}
})
require.False(t, canceled, "context canceled while sending item %d/%d", i+1, len(s))
}
}
func recvNonBlock[T any](t *testing.T, recv <-chan T, s ...T) {
t.Helper()
var canceled bool
for i := range s {
select {
case s[i] = <-recv:
case <-ctxFromTest(t).Done():
canceled = true
}
require.False(t, canceled, "context canceled while receiving item %d/%d", i+1, len(s))
}
}
func ctxFromTest(t *testing.T) context.Context {
return ctxFromTestWithDefault(t, time.Second)
}
func ctxFromTestWithDefault(t *testing.T, d time.Duration) context.Context {
require.Greater(t, d, 0*time.Second)
deadline, ok := t.Deadline()
if !ok {
deadline = time.Now().Add(d)
}
ctx, cancel := context.WithDeadline(context.Background(), deadline)
t.Cleanup(cancel)
return ctx
}

pkg/util/ring/ring.go
@@ -0,0 +1,254 @@
package ring
// Ring is a ring buffer backed by a slice that rearranges itself to grow and
// shrink as needed. It can also be grown and shrunk manually. It is not safe
// for concurrent use, and the zero value is ready for use. Dequeued and cleared
// items are zeroed in the underlying slice to release references and allow
// garbage collection. Apart from growth and shrinkage of the internal slice,
// which can be directly controlled, all operations are allocation free.
type Ring[T any] struct {
buf []T
stats RingStats
back, len int
// Min sets the minimum capacity that the Ring can have, and takes effect
// in the next write operation. The Ring will naturally tend to shrink
// towards Min when free capacity allows it. Setting this value has no
// immediate effect, but instead affects future writing operations. Min is
// valid only if:
// 0 < Min && ( Max <= 0 || Min <= Max )
// Note that this allows setting Min but not Max, or setting both as well.
Min int
// Max sets the maximum capacity that the Ring can grow to store new items.
// Setting this value has no immediate effect, but instead affects future
// writing operations. Max is valid only if:
// 0 < Max && Min <= Max
// Note that this allows setting Max but not Min, or setting both as well.
Max int
}
// RingStats provides general stats for a Ring.
type RingStats struct {
// Len is the used capacity.
Len int
// Cap is the current total capacity.
Cap int
// Grown is the number of times a larger buffer was allocated.
Grown uint64
// Shrunk is the number of times a smaller buffer was allocated.
Shrunk uint64
// Allocs is Grown + Shrunk.
Allocs uint64
// Enqueued is the total number of items entered into the Ring, including
// those which caused other items to be dropped.
Enqueued uint64
// Dequeued is the total number of items removed from the Ring, including
// items removed with Dequeue and with Clear.
Dequeued uint64
// Dropped is the number of items lost due to the Ring being at capacity.
Dropped uint64
}
// Len returns the used capacity.
func (rq *Ring[T]) Len() int {
return rq.len
}
// Cap returns the current total capacity.
func (rq *Ring[T]) Cap() int {
return len(rq.buf)
}
// WriteStats writes general stats about this Ring to the given *RingStats, if
// it's non-nil.
func (rq *Ring[T]) WriteStats(s *RingStats) {
if s == nil {
return
}
rq.stats.Len = rq.len
rq.stats.Cap = len(rq.buf)
*s = rq.stats
}
// Clear removes all items from the Ring and returns the number of items
// removed. If Min is valid and Cap() > Min, it will also shrink the capacity to
// Min. Stats are not cleared, but instead Dequeued is increased by the number
// of removed items.
func (rq *Ring[T]) Clear() int {
cleared := rq.len
rq.stats.Dequeued += uint64(cleared)
shouldMigrate := clearShouldMigrate(len(rq.buf), rq.Min, rq.Max)
if rq.len > 0 && !shouldMigrate {
// if we migrate we don't need to clear items, since moving to the new
// slice will just have the old slice garbage collected
chunk := min(rq.back+rq.len, len(rq.buf))
clear(rq.buf[rq.back:chunk])
clear(rq.buf[:rq.len-chunk])
}
rq.back = 0
rq.len = 0
if shouldMigrate {
rq.migrate(rq.Min)
}
return cleared
}
// Shrink makes sure free capacity is not greater than n, shrinking if
// necessary. If a new allocation is needed then it will be capped to Min, given
// that Min is valid.
func (rq *Ring[T]) Shrink(n int) {
if n < 0 || rq.len+n >= len(rq.buf) {
return
}
rq.migrate(n)
}
// Grow makes sure free capacity is at least n, growing if necessary. If a new
// allocation is needed then it will be capped to Max, given that Max is valid.
func (rq *Ring[T]) Grow(n int) {
if n < 1 || rq.len+n <= len(rq.buf) {
return
}
rq.migrate(n)
}
func (rq *Ring[T]) migrate(newFreeCap int) {
newCap := rq.len + newFreeCap
newCap = fixAllocSize(rq.len, rq.Min, rq.Max, newCap)
if newCap == len(rq.buf) {
return
}
var s []T
if newCap > 0 {
// if newCap == 0 then just set rq.buf to nil
s = make([]T, newCap)
}
if len(s) > len(rq.buf) {
rq.stats.Grown++
} else {
rq.stats.Shrunk++
}
if rq.len > 0 {
chunk1 := min(rq.back+rq.len, len(rq.buf))
copied := copy(s, rq.buf[rq.back:chunk1])
if copied < rq.len {
// wrapped the slice
chunk2 := rq.len - copied
copy(s[copied:], rq.buf[:chunk2])
}
}
rq.back = 0
rq.buf = s
}
// Enqueue adds the given item to the Ring, growing the capacity if needed. If
// the Ring is at capacity (0 < Max && Min <= Max && rq.Len() == rq.Cap()),
// then the new item will overwrite the oldest enqueued item.
func (rq *Ring[T]) Enqueue(v T) {
// try to add space if we're at capacity or fix min allocation
if rq.len == len(rq.buf) || (minIsValid(rq.Min, rq.Max) && len(rq.buf) < rq.Min) {
newFreeCap := rq.len + 1
newFreeCap = newFreeCap*3/2 + 1 // classic append: https://go.dev/blog/slices
newFreeCap -= rq.len // migrate only takes free capacity
rq.migrate(newFreeCap)
// if growing was capped at max, then overwrite the first item to be
// dequeued
if rq.len == len(rq.buf) {
rq.stats.Dropped++
rq.len--
if rq.back++; rq.back >= len(rq.buf) {
rq.back = 0 // wrap the slice
}
}
}
writePos := rq.back + rq.len
if writePos >= len(rq.buf) {
writePos -= len(rq.buf)
}
rq.buf[writePos] = v
rq.len++
rq.stats.Enqueued++
}
// Peek is like Dequeue, but it doesn't remove the item.
func (rq *Ring[T]) Peek() (v T) {
if rq.len == 0 {
return
}
return rq.buf[rq.back]
}
// Dequeue removes the oldest enqueued item and returns it. If the Ring is
// empty, it returns the zero value.
func (rq *Ring[T]) Dequeue() (v T) {
if rq.len == 0 {
return
}
// get the value into v, and also zero out the slice item to release
// references so they can be gc'd
v, rq.buf[rq.back] = rq.buf[rq.back], v
rq.len--
if rq.back++; rq.back >= len(rq.buf) {
rq.back = 0 // wrap the slice
}
if minIsValid(rq.Min, rq.Max) && rq.len < len(rq.buf)/2+1 {
newFreeCap := len(rq.buf)*2/3 + 1 // opposite of growing arithmetic
newFreeCap -= rq.len // migrate only takes free capacity
rq.migrate(newFreeCap)
}
rq.stats.Dequeued++
return v
}
// The following functions provide small checks and arithmetic that are far
// easier to test separately than creating big and more complex tests covering a
// huge number of combinations. This reduces the complexity of higher
// level tests and leaves only higher level logic, but also allows us to provide
// high coverage for even the rarest edge and boundary cases by adding a new
// line to the test cases table. They're also inlineable, so there is no penalty
// in calling them.
func minIsValid(Min, Max int) bool {
return 0 < Min && (Max <= 0 || Min <= Max)
}
func maxIsValid(Min, Max int) bool {
return 0 < Max && Min <= Max
}
func clearShouldMigrate(CurCap, Min, Max int) bool {
return minIsValid(Min, Max) && CurCap > Min
}
// fixAllocSize is a helper to determine what should be the new size to be
// allocated for a new slice, given the intended NewCap and the current relevant
// state of Ring. This is expected to be called inside (*Ring).migrate.
func fixAllocSize(CurLen, Min, Max, NewCap int) int {
if minIsValid(Min, Max) { // Min is valid
NewCap = max(NewCap, CurLen, Min)
} else {
NewCap = max(CurLen, NewCap)
}
if maxIsValid(Min, Max) { // Max is valid
NewCap = min(NewCap, Max)
}
return NewCap
}
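
As a quick illustration of the Min and Max semantics documented on the Ring type, the following hypothetical Go example (not part of this commit) uses Min = 4 and Max = 8: the first write allocates Min capacity, the buffer never grows past Max, and enqueueing at full capacity overwrites the oldest item instead of allocating:

package ring

import "fmt"

func ExampleRing() {
	var r Ring[int]
	r.Min, r.Max = 4, 8 // shrink towards 4 items, never allocate more than 8

	// Enqueue 10 items into a ring capped at 8: the two oldest are dropped.
	for i := 1; i <= 10; i++ {
		r.Enqueue(i)
	}
	fmt.Println(r.Len(), r.Cap())

	var stats RingStats
	r.WriteStats(&stats)
	fmt.Println(stats.Enqueued, stats.Dropped)

	// FIFO order: the oldest surviving items come out first.
	fmt.Println(r.Dequeue(), r.Dequeue())

	// Output:
	// 8 8
	// 10 2
	// 3 4
}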

pkg/util/ring/ring_test.go
@@ -0,0 +1,573 @@
package ring
import (
"slices"
"testing"
"github.com/stretchr/testify/require"
)
func ints(n int) []int {
ret := make([]int, n)
for i := range ret {
ret[i] = i + 1
}
return ret
}
func TestRing(t *testing.T) {
t.Parallel()
const (
dLen = 10
dHalfLen = dLen / 2
)
data := ints(dLen)
lData := slices.Clone(data[:dHalfLen])
rData := slices.Clone(data[dHalfLen:])
require.NotPanics(t, func() {
new(Ring[int]).WriteStats(nil)
}, "WriteStats should be panic free")
t.Run("basic enqueue and dequeue - no min, no max", func(t *testing.T) {
t.Parallel()
q, expected := new(Ring[int]), new(Ring[int])
enq(t, q, data...)
expected.len = dLen
expected.stats.Enqueued = dLen
expected.buf = data
ringEq(t, expected, q)
deq(t, q, lData...)
expected.back = dHalfLen
expected.len = dHalfLen
expected.stats.Dequeued = dHalfLen
expected.buf = append(make([]int, dHalfLen), rData...)
ringEq(t, expected, q)
enq(t, q, data...)
expected.back = 0
expected.len = dLen + dHalfLen
expected.stats.Enqueued += dLen
expected.buf = append(rData, data...)
ringEq(t, expected, q)
deqAll(t, q, append(rData, data...)...)
expected.len = 0
expected.stats.Dequeued += dLen + dHalfLen
expected.buf = []int{}
ringEq(t, expected, q)
enq(t, q, data...)
expected.len = dLen
expected.stats.Enqueued += dLen
expected.buf = data
ringEq(t, expected, q)
clearRing(t, q)
expected.len = 0
expected.stats.Dequeued += dLen
expected.buf = []int{}
ringEq(t, expected, q)
})
t.Run("enqueue, dequeue, grow and shrink - no min, yes max", func(t *testing.T) {
t.Parallel()
q, expected := new(Ring[int]), new(Ring[int])
q.Max = dLen
// basic wrap and overwrite
enq(t, q, lData...)
enq(t, q, data...)
enq(t, q, data...)
expected.back = dHalfLen
expected.buf = append(rData, lData...)
expected.len = dLen
expected.stats.Enqueued = 2*dLen + dHalfLen
expected.stats.Dropped = dLen + dHalfLen
ringEq(t, expected, q)
require.Equal(t, dLen, q.Cap())
// can't allocate past max and cannot shrink because we're at capacity
q.Grow(3 * dLen)
ringEq(t, expected, q)
require.Equal(t, dLen, q.Cap())
q.Shrink(2 * dLen)
ringEq(t, expected, q)
require.Equal(t, dLen, q.Cap())
// remove some items and play with extra space
deq(t, q, lData...)
expected.back = 0
expected.buf = rData
expected.len -= dHalfLen
expected.stats.Dequeued = dHalfLen
require.Equal(t, dLen, q.Cap())
ringEq(t, expected, q)
q.Shrink(1)
ringEq(t, expected, q)
require.Equal(t, dHalfLen+1, q.Cap())
q.Grow(2)
ringEq(t, expected, q)
require.Equal(t, dHalfLen+2, q.Cap())
q.Grow(dLen)
ringEq(t, expected, q)
require.Equal(t, dLen, q.Cap())
})
t.Run("enqueue, dequeue, grow and shrink - yes min, no max", func(t *testing.T) {
t.Parallel()
q, expected := new(Ring[int]), new(Ring[int])
q.Min = dHalfLen
// enqueueing one item should allocate Min
enq(t, q, 1)
expected.buf = []int{1}
expected.len = 1
expected.stats.Enqueued = 1
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// clearing should not migrate now
clearRing(t, q)
expected.buf = []int{}
expected.len = 0
expected.stats.Dequeued = 1
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// enqueue some data
enq(t, q, data...)
expected.buf = data
expected.len = dLen
expected.stats.Enqueued += dLen
ringEq(t, expected, q)
require.GreaterOrEqual(t, q.Cap(), dLen)
// now clearing should migrate and move to a slice of Min length
clearRing(t, q)
expected.buf = []int{}
expected.len = 0
expected.stats.Dequeued += dLen
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// we shouldn't be able to shrink past Min, but it shouldn't allocate a
// greater slice either because its purpose is to reduce allocated
// memory if possible
q.Min = dLen
q.Shrink(dHalfLen)
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// dequeueing shouldn't allocate either, just in case
require.Zero(t, q.Dequeue())
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// enqueueing one item allocates again to Min, which is now greater than
// before
enq(t, q, 1)
expected.buf = []int{1}
expected.len = 1
expected.stats.Enqueued += 1
ringEq(t, expected, q)
require.Equal(t, dLen, q.Cap())
// we reduce Min again, then we should be able to shrink as well
q.Min = dHalfLen
q.Shrink(dHalfLen)
ringEq(t, expected, q)
require.Equal(t, dHalfLen+1, q.Cap())
q.Shrink(1)
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
q.Shrink(0)
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// enqueue a lot and then dequeue all, we should still see Min cap
enq(t, q, data...)
expected.buf = append(expected.buf, data...)
expected.len += dLen
expected.stats.Enqueued += dLen
ringEq(t, expected, q)
require.GreaterOrEqual(t, q.Cap(), dLen+1)
deqAll(t, q, expected.buf...)
expected.buf = []int{}
expected.len = 0
expected.stats.Dequeued += dLen + 1
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
})
t.Run("enqueue, dequeue, grow and shrink - yes min, yes max", func(t *testing.T) {
t.Parallel()
q, expected := new(Ring[int]), new(Ring[int])
q.Min, q.Max = dHalfLen, dLen
// single enqueueing should allocate for Min
enq(t, q, 1)
expected.buf = []int{1}
expected.len = 1
expected.stats.Enqueued = 1
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
// enqueue a lot until we overwrite the first item
enq(t, q, data...)
expected.back = 1
expected.buf = append(data[dLen-1:], data[:dLen-1]...)
expected.len = dLen
expected.stats.Enqueued += dLen
expected.stats.Dropped = 1
ringEq(t, expected, q)
require.Equal(t, dLen, q.Cap())
// clearing should bring us back to Min alloc
clearRing(t, q)
expected.back = 0
expected.buf = expected.buf[:0]
expected.len = 0
expected.stats.Dequeued += dLen
ringEq(t, expected, q)
require.Equal(t, dHalfLen, q.Cap())
})
t.Run("growing and shrinking invariants - no min, no max", func(t *testing.T) {
t.Parallel()
q, expected := new(Ring[int]), new(Ring[int])
// dummy grow and shrink
q.Grow(0)
require.Equal(t, 0, q.Cap())
ringEq(t, expected, q)
q.Shrink(0)
require.Equal(t, 0, q.Cap())
ringEq(t, expected, q)
// add 3*dLen and leave 2*dLen
q.Grow(3 * dLen)
expected.buf = []int{}
require.Equal(t, 3*dLen, q.Cap())
ringEq(t, expected, q)
q.Shrink(2 * dLen)
require.Equal(t, 2*dLen, q.Cap())
ringEq(t, expected, q)
// add dLen items and play with cap
enq(t, q, data...)
expected.buf = data
expected.len = dLen
expected.stats.Enqueued = dLen
require.Equal(t, 2*dLen, q.Cap())
ringEq(t, expected, q)
q.Grow(2 * dLen)
require.GreaterOrEqual(t, q.Cap(), 3*dLen)
ringEq(t, expected, q)
q.Shrink(0)
require.Equal(t, dLen, q.Cap())
ringEq(t, expected, q)
// remove all items and shrink to zero
deqAll(t, q, data...)
expected.buf = []int{}
expected.len = 0
expected.stats.Dequeued = dLen
require.Equal(t, dLen, q.Cap())
ringEq(t, expected, q)
q.Shrink(0)
expected.buf = nil
require.Equal(t, 0, q.Cap())
ringEq(t, expected, q)
})
}
// enq enqueues the given items into the given Ring.
func enq[T any](t *testing.T, q *Ring[T], s ...T) {
t.Helper()
initLen := q.Len()
initCap := q.Cap()
for _, v := range s {
require.NotPanics(t, func() {
q.Enqueue(v)
})
}
expectedLen := initLen + len(s)
if q.Max > 0 {
expectedMax := max(initCap, q.Max)
expectedLen = min(expectedLen, expectedMax)
}
require.Equal(t, expectedLen, q.Len())
}
// deq dequeues len(expected) items from the given Ring and compares them to
// expected. Ring should have at least len(expected) items.
func deq[T any](t *testing.T, q *Ring[T], expected ...T) {
t.Helper()
if q.Cap() == 0 {
require.Nil(t, q.buf) // internal state
require.Equal(t, 0, q.back) // internal state
return
}
oldLen := q.Len()
require.True(t, oldLen >= len(expected))
got := make([]T, len(expected))
for i := range got {
var val T
require.NotPanics(t, func() {
prePeekLen := q.Len()
val = q.Peek()
require.Equal(t, prePeekLen, q.Len())
got[i] = q.Dequeue()
})
require.Equal(t, val, got[i])
}
require.Equal(t, expected, got)
require.Equal(t, oldLen-len(expected), q.Len())
}
// clearRing calls Clear on the given Ring and performs a set of assertions that
// should be satisfied afterwards.
func clearRing[T any](t *testing.T, q *Ring[T]) {
t.Helper()
var expectedBuf []T
if clearShouldMigrate(q.Cap(), q.Min, q.Max) {
expectedBuf = make([]T, q.Min)
} else {
expectedBuf = make([]T, q.Cap())
}
require.NotPanics(t, func() {
q.Clear()
})
require.Equal(t, expectedBuf, q.buf) // internal state
require.Equal(t, 0, q.Len())
require.Equal(t, 0, q.back) // internal state
// dequeueing should yield zero values
var zero T
for i := 0; i < 10; i++ {
var val1, val2 T
require.NotPanics(t, func() {
val1 = q.Peek()
val2 = q.Dequeue()
})
require.Equal(t, zero, val1)
require.Equal(t, zero, val2)
}
}
// deqAll depletes the given Ring and compares the dequeued items to those
// provided.
func deqAll[T any](t *testing.T, q *Ring[T], expected ...T) {
t.Helper()
deq[T](t, q, expected...)
zeroS := make([]T, q.Cap())
require.Equal(t, zeroS, q.buf) // internal state
require.Equal(t, 0, q.Len())
// dequeueing further should yield zero values when empty
var zero T
for i := 0; i < 10; i++ {
var val1, val2 T
require.NotPanics(t, func() {
val1 = q.Peek()
val2 = q.Dequeue()
})
require.Equal(t, zero, val1)
require.Equal(t, zero, val2)
}
clearRing(t, q)
}
// ringEq tests that the given Rings are the same in many aspects. The following
// are the things that are not checked:
// - The values of Min and Max, since the code does not programmatically
// change them
// - Allocation numbers (Cap, Grown, Shrunk, Allocs)
// - The free capacity to the right of `got`
func ringEq[T any](t *testing.T, expected, got *Ring[T]) {
t.Helper()
var expStats, gotStats RingStats
require.NotPanics(t, func() {
expected.WriteStats(&expStats)
got.WriteStats(&gotStats)
})
// capacity and allocations are to be tested separately
removeAllocStats(&expStats)
removeAllocStats(&gotStats)
require.Equal(t, expStats, gotStats, "expStats == gotStats")
// internal state
require.Equal(t, expected.back, got.back, "expected.back == got.back")
// only check for used capacity
require.Equal(t, expected.buf, got.buf[:min(got.back+got.len, len(got.buf))],
"expected.buf == got.buf[:min(got.back+got.len, len(got.s))]")
}
func removeAllocStats(s *RingStats) {
s.Cap = 0
s.Grown = 0
s.Shrunk = 0
s.Allocs = 0
}
func TestMinMaxValidity(t *testing.T) {
t.Parallel()
testCases := []struct {
Min, Max int
minIsValid, maxIsValid bool
}{
{Min: 0, Max: 0, minIsValid: false, maxIsValid: false},
{Min: 0, Max: 1, minIsValid: false, maxIsValid: true},
{Min: 0, Max: 2, minIsValid: false, maxIsValid: true},
{Min: 1, Max: 0, minIsValid: true, maxIsValid: false},
{Min: 1, Max: 1, minIsValid: true, maxIsValid: true},
{Min: 1, Max: 2, minIsValid: true, maxIsValid: true},
{Min: 2, Max: 0, minIsValid: true, maxIsValid: false},
{Min: 2, Max: 1, minIsValid: false, maxIsValid: false},
{Min: 2, Max: 2, minIsValid: true, maxIsValid: true},
}
for i, tc := range testCases {
gotMinIsValid := minIsValid(tc.Min, tc.Max)
require.Equal(t, tc.minIsValid, gotMinIsValid,
"test index %d; test data: %#v", i, tc)
gotMaxIsValid := maxIsValid(tc.Min, tc.Max)
require.Equal(t, tc.maxIsValid, gotMaxIsValid,
"test index %d; test data: %#v", i, tc)
}
}
func TestClearShouldMigrate(t *testing.T) {
t.Parallel()
testCases := []struct {
// we don't need to include Max in the test, we just disable it by
// passing zero because Max is only needed to establish the validity of
// Min. The validity of Min wrt Max is already covered in the test for
// minIsValid, and once Min is valid Max has no impact on the outcome of
// clearShouldMigrate.
CurCap, Min int
expected bool
}{
{CurCap: 0, Min: 0, expected: false},
{CurCap: 0, Min: 9, expected: false},
{CurCap: 0, Min: 10, expected: false},
{CurCap: 0, Min: 11, expected: false},
{CurCap: 10, Min: 0, expected: false},
{CurCap: 10, Min: 9, expected: true},
{CurCap: 10, Min: 10, expected: false},
{CurCap: 10, Min: 11, expected: false},
}
for i, tc := range testCases {
got := clearShouldMigrate(tc.CurCap, tc.Min, 0)
require.Equal(t, tc.expected, got,
"test index %d; test data: %#v", i, tc)
}
}
func TestFixAllocSize(t *testing.T) {
t.Parallel()
testCases := []struct {
CurLen, Min, Max, NewCap, expected int
}{
// we don't need to add test cases for odd configurations of Min and Max
// not being valid for different reasons because that is already covered
// in the unit tests for minIsValid and maxIsValid. It suffices to
// provide a zero for Min or Max to disable their respective behaviour
{CurLen: 0, Min: 0, Max: 0, NewCap: 0, expected: 0},
{CurLen: 0, Min: 0, Max: 0, NewCap: 5, expected: 5},
{CurLen: 0, Min: 0, Max: 10, NewCap: 0, expected: 0},
{CurLen: 0, Min: 0, Max: 10, NewCap: 9, expected: 9},
{CurLen: 0, Min: 0, Max: 10, NewCap: 10, expected: 10},
{CurLen: 0, Min: 0, Max: 10, NewCap: 11, expected: 10},
{CurLen: 0, Min: 10, Max: 0, NewCap: 0, expected: 10},
{CurLen: 0, Min: 10, Max: 0, NewCap: 5, expected: 10},
{CurLen: 0, Min: 10, Max: 0, NewCap: 9, expected: 10},
{CurLen: 0, Min: 10, Max: 0, NewCap: 10, expected: 10},
{CurLen: 0, Min: 10, Max: 0, NewCap: 11, expected: 11},
{CurLen: 0, Min: 10, Max: 10, NewCap: 0, expected: 10},
{CurLen: 0, Min: 10, Max: 10, NewCap: 5, expected: 10},
{CurLen: 0, Min: 10, Max: 10, NewCap: 9, expected: 10},
{CurLen: 0, Min: 10, Max: 10, NewCap: 10, expected: 10},
{CurLen: 0, Min: 10, Max: 10, NewCap: 11, expected: 10},
{CurLen: 0, Min: 10, Max: 20, NewCap: 0, expected: 10},
{CurLen: 0, Min: 10, Max: 20, NewCap: 5, expected: 10},
{CurLen: 0, Min: 10, Max: 20, NewCap: 9, expected: 10},
{CurLen: 0, Min: 10, Max: 20, NewCap: 10, expected: 10},
{CurLen: 0, Min: 10, Max: 20, NewCap: 19, expected: 19},
{CurLen: 0, Min: 10, Max: 20, NewCap: 20, expected: 20},
{CurLen: 0, Min: 10, Max: 20, NewCap: 21, expected: 20},
{CurLen: 5, Min: 0, Max: 0, NewCap: 0, expected: 5},
{CurLen: 5, Min: 0, Max: 0, NewCap: 5, expected: 5},
{CurLen: 5, Min: 0, Max: 0, NewCap: 10, expected: 10},
{CurLen: 5, Min: 0, Max: 10, NewCap: 0, expected: 5},
{CurLen: 5, Min: 0, Max: 10, NewCap: 5, expected: 5},
{CurLen: 5, Min: 0, Max: 10, NewCap: 9, expected: 9},
{CurLen: 5, Min: 0, Max: 10, NewCap: 10, expected: 10},
{CurLen: 5, Min: 0, Max: 10, NewCap: 11, expected: 10},
{CurLen: 5, Min: 10, Max: 0, NewCap: 0, expected: 10},
{CurLen: 5, Min: 10, Max: 0, NewCap: 5, expected: 10},
{CurLen: 5, Min: 10, Max: 0, NewCap: 9, expected: 10},
{CurLen: 5, Min: 10, Max: 0, NewCap: 10, expected: 10},
{CurLen: 5, Min: 10, Max: 0, NewCap: 11, expected: 11},
{CurLen: 5, Min: 10, Max: 10, NewCap: 0, expected: 10},
{CurLen: 5, Min: 10, Max: 10, NewCap: 5, expected: 10},
{CurLen: 5, Min: 10, Max: 10, NewCap: 9, expected: 10},
{CurLen: 5, Min: 10, Max: 10, NewCap: 10, expected: 10},
{CurLen: 5, Min: 10, Max: 10, NewCap: 11, expected: 10},
{CurLen: 5, Min: 10, Max: 20, NewCap: 0, expected: 10},
{CurLen: 5, Min: 10, Max: 20, NewCap: 5, expected: 10},
{CurLen: 5, Min: 10, Max: 20, NewCap: 9, expected: 10},
{CurLen: 5, Min: 10, Max: 20, NewCap: 10, expected: 10},
{CurLen: 5, Min: 10, Max: 20, NewCap: 19, expected: 19},
{CurLen: 5, Min: 10, Max: 20, NewCap: 20, expected: 20},
{CurLen: 5, Min: 10, Max: 20, NewCap: 21, expected: 20},
}
for i, tc := range testCases {
got := fixAllocSize(tc.CurLen, tc.Min, tc.Max, tc.NewCap)
require.Equal(t, tc.expected, got,
"test index %d; test data %#v", i, tc)
}
}