Unified Storage: Fix data races and context usage in broadcaster (#88955)

* Fix several broadcaster data races and error handling

- Separate concerns between sender and receiver sides in channel usage
- broadcaster: Fix data race between Subscribe/Unsubscribe and start
- Fix Subscribe error to be io.EOF when broadcaster is terminated
- Fix Watch never unsubscribing
- General cleanup
- fix usage of context
- add a huge amount of documentation about channels
This commit is contained in:
Diego Augusto Molina 2024-06-15 02:46:14 -03:00 committed by GitHub
parent 8491e02caf
commit b9812a0784
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 181 additions and 68 deletions

View File

@ -3,18 +3,97 @@ package sqlstash
import ( import (
"context" "context"
"fmt" "fmt"
"io"
) )
type ConnectFunc[T any] func(chan T) error // Please, when reviewing or working on this file have the following cheat-sheet
// in mind:
// 1. A channel type in Go has one of three directions: send-only (chan<- T),
// receive-only (<-chan T) or bidirctional (chan T). Each of them are a
// different type. A bidirectional type can be converted to any of the other
// two types and is automatic, any other conversion attempt results in a
// panic.
// 2. There are three operations you can do on a channel: send, receive and
// close. Availability of operation for each channel direction:
// | Channel direction
// Operation | Receive-only | Send-only | Bidirectional
// ----------+--------------+------------+--------------
// Receive | Yes | No (panic) | Yes
// Send | No (panic) | Yes | Yes
// Close | No (panic) | Yes | Yes
// 3. A channel of any type also has one of three states: nil (zero value),
// closed, or open (technically called "non-nil, not-closed channel",
// created with the `make` builtin). Nil and closed channels are also
// useful, but you have to know and care for how you use them. Outcome of
// each operation on a channel depending on its state, assuming the
// operation is available to the channel given its direction:
// | Channel state
// Operation | Nil | Closed | Open
// ----------+---------------+---------------+------------------
// Receive | Block forever | Block forever | Receive/Block until receive
// Send | Block forever | Panic | Send/Block until send
// Close | Panic | Panic | Close the channel
// 4. A `select` statement has zero or more `case` branches, each one of them
// containing either a send or a receive channel operation. A `select` with
// no branches blocks forever. At most one branch will be executed, which
// means it behaves similar to a `switch`. If more than one branch can be
// executed then one of them is picked AT RANDOM (i.e. not the one first in
// the list). A `select` statement can also have a (single and optional)
// `default` branch that is executed if all the other branches are
// operations that are blocked at the time the `select` statement is
// reached. This means that having a `default` branch causes the `select`
// statement to never block.
// 5. A receive operation on a closed channel never blocks (as said before),
// but it will always yield a zero value. As it is also valid to send a zero
// value to the channel, you can receive from channels in two forms:
// v := <-c // get a zero value if closed
// v2, ok := <-c // `ok` is set to false iif the channel is closed
// 6. The `make` builtin is used to create open channels (and is the only way
// to get them). It has an optional second parameter to specify the amount
// of items that can buffered. After that, a send operation will block
// waiting for another goroutine to receive from it (which would make room
// for the new item). When the second argument is not passed to `make`, then
// all operations are fully synchronized, meaning that a send will block
// until a receive in another goroutine is performed, and vice versa. Less
// interestingly, `make` can also create send-only or receive-only channel.
//
// The sources are the Go Specs, Effective Go and Go 101, which are already
// linked in the contributing guide for the backend or elsewhere in Grafana, but
// this file exploits so many of these subtleties that it's worth keeping a
// refresher about them at all times. The above is unlikely to change in the
// foreseeable future, so it's zero maintenance as well. We exclude patterns for
// using channels and other concurrency patterns since that's a way longer
// topic for a refresher.
// ConnectFunc is used to initialize the watch implementation. It should do very
// basic work and checks and it has the chance to return an error. After that,
// it should fork to a different goroutine with the provided channel and send to
// it all the new events from the backing database. It is also responsible for
// closing the provided channel under all circumstances, included returning an
// error. The caller of this function will only receive from this channel (i.e.
// it is guaranteed to never send to it or close it), hence providing a safe
// separation of concerns and preventing panics.
//
// FIXME: this signature suffers from inversion of control. It would also be
// much simpler if NewBroadcaster receives a context.Context and a <-chan T
// instead. That would also reduce the scope of the broadcaster to only
// broadcast to subscribers what it receives on the provided <-chan T. The
// context.Context is still needed to provide additional values in case we want
// to add observability into the broadcaster, which we want. The broadcaster
// should still terminate on either the context being done or the provided
// channel being closed.
type ConnectFunc[T any] func(chan<- T) error
type Broadcaster[T any] interface { type Broadcaster[T any] interface {
Subscribe(context.Context) (<-chan T, error) Subscribe(context.Context) (<-chan T, error)
Unsubscribe(chan T) Unsubscribe(<-chan T)
} }
func NewBroadcaster[T any](ctx context.Context, connect ConnectFunc[T]) (Broadcaster[T], error) { func NewBroadcaster[T any](ctx context.Context, connect ConnectFunc[T]) (Broadcaster[T], error) {
b := &broadcaster[T]{} b := &broadcaster[T]{
err := b.start(ctx, connect) started: make(chan struct{}),
}
err := b.init(ctx, connect)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -23,101 +102,132 @@ func NewBroadcaster[T any](ctx context.Context, connect ConnectFunc[T]) (Broadca
} }
type broadcaster[T any] struct { type broadcaster[T any] struct {
running bool // FIXME: race condition between `Subscribe`/`Unsubscribe` and `start` // lifecycle management
ctx context.Context
subs map[chan T]struct{} started, terminated chan struct{}
shouldTerminate <-chan struct{}
// subscription management
cache Cache[T] cache Cache[T]
subscribe chan chan T subscribe chan chan T
unsubscribe chan chan T unsubscribe chan (<-chan T)
subs map[<-chan T]chan T
} }
func (b *broadcaster[T]) Subscribe(ctx context.Context) (<-chan T, error) { func (b *broadcaster[T]) Subscribe(ctx context.Context) (<-chan T, error) {
if !b.running { select {
return nil, fmt.Errorf("broadcaster not running") case <-ctx.Done(): // client canceled
return nil, ctx.Err()
case <-b.started: // wait for broadcaster to start
} }
// create the subscription
sub := make(chan T, 100) sub := make(chan T, 100)
b.subscribe <- sub
go func() {
<-ctx.Done()
b.unsubscribe <- sub
}()
return sub, nil select {
} case <-ctx.Done(): // client canceled
return nil, ctx.Err()
func (b *broadcaster[T]) Unsubscribe(sub chan T) { case <-b.terminated: // no more data
b.unsubscribe <- sub return nil, io.EOF
} case b.subscribe <- sub: // success submitting subscription
return sub, nil
func (b *broadcaster[T]) start(ctx context.Context, connect ConnectFunc[T]) error {
if b.running {
return fmt.Errorf("broadcaster already running")
} }
}
func (b *broadcaster[T]) Unsubscribe(sub <-chan T) {
// wait for broadcaster to start. In practice, the only way to reach
// Unsubscribe is by first having called Subscribe, which means we have
// already started. But a malfunctioning caller may call Unsubscribe freely,
// which would cause us to block forever the goroutine of the caller when
// trying to send to a nil `b.unsubscribe` or receive from a nil
// `b.terminated` if we haven't yet initialized those values. This would
// mean leaking that malfunctioninig caller's goroutine, so we rather make
// Unsubscribe safe in any possible case
if sub == nil {
return
}
<-b.started // wait for broadcaster to start
select {
case b.unsubscribe <- sub: // success submitting unsubscription
case <-b.terminated: // broadcaster terminated, nothing to do
}
}
// init initializes the broadcaster. It should not be run more than once.
func (b *broadcaster[T]) init(ctx context.Context, connect ConnectFunc[T]) error {
// create the stream that will connect us with the watch implementation and
// send it to them so they initialize and start sending data
stream := make(chan T, 100) stream := make(chan T, 100)
if err := connect(stream); err != nil {
err := connect(stream)
if err != nil {
return err return err
} }
b.ctx = ctx // initialize our internal state
b.shouldTerminate = ctx.Done()
b.cache = NewCache[T](ctx, 100) b.cache = NewCache[T](ctx, 100)
b.subscribe = make(chan chan T, 100) b.subscribe = make(chan chan T, 100)
b.unsubscribe = make(chan chan T, 100) b.unsubscribe = make(chan (<-chan T), 100)
b.subs = make(map[chan T]struct{}) b.subs = make(map[<-chan T]chan T)
b.terminated = make(chan struct{})
// start handling incoming data from the watch implementation. If data came
// in until now, it will be buffered in `stream`
go b.stream(stream) go b.stream(stream)
b.running = true // unblock any Subscribe/Unsubscribe calls since we are ready to handle them
close(b.started)
return nil return nil
} }
func (b *broadcaster[T]) stream(input chan T) { // stream acts a message broker between the watch implementation that receives a
// raw stream of events and the individual clients watching for those events.
// Thus, we hold the receive side of the watch implementation, and we are
// limited here to receive from it, whereas we are responsible for sending to
// watchers and closing their channels. The responsibility of closing `input`
// (as with any other channel) will always be of the sending side. Hence, the
// watch implementation should do it.
func (b *broadcaster[T]) stream(input <-chan T) {
// make sure we unconditionally cleanup upon return
defer func() {
// prevent new subscriptions and make sure to discard unsubscriptions
close(b.terminated)
// terminate all subscirptions and clean the map
for _, sub := range b.subs {
close(sub)
delete(b.subs, sub)
}
}()
for { for {
select { select {
// context cancelled case <-b.shouldTerminate: // service context cancelled
case <-b.ctx.Done():
close(input)
for sub := range b.subs {
close(sub)
delete(b.subs, sub)
}
b.running = false
return return
// new subscriber
case sub := <-b.subscribe: case sub := <-b.subscribe: // subscribe
// send initial batch of cached items // send initial batch of cached items
err := b.cache.ReadInto(sub) err := b.cache.ReadInto(sub)
if err != nil { if err != nil {
close(sub) close(sub)
continue continue
} }
b.subs[sub] = sub
b.subs[sub] = struct{}{} case recv := <-b.unsubscribe: // unsubscribe
// unsubscribe if sub, ok := b.subs[recv]; ok {
case sub := <-b.unsubscribe:
if _, ok := b.subs[sub]; ok {
close(sub) close(sub)
delete(b.subs, sub) delete(b.subs, sub)
} }
// read item from input
case item, ok := <-input: case item, ok := <-input: // data arrived, send to subscribers
// input closed, drain subscribers and exit // input closed, drain subscribers and exit
if !ok { if !ok {
for sub := range b.subs {
close(sub)
delete(b.subs, sub)
}
b.running = false
return return
} }
b.cache.Add(item) b.cache.Add(item)
for _, sub := range b.subs {
for sub := range b.subs {
select { select {
case sub <- item: case sub <- item:
default: default:

View File

@ -63,6 +63,9 @@ func ProvideSQLEntityServer(db db.EntityDBInterface, tracer tracing.Tracer /*, c
type SqlEntityServer interface { type SqlEntityServer interface {
entity.EntityStoreServer entity.EntityStoreServer
// FIXME: accpet a context.Context in the lifecycle methods, and Stop should
// also return an error.
Init() error Init() error
Stop() Stop()
} }
@ -75,7 +78,6 @@ type sqlEntityServer struct {
broadcaster Broadcaster[*entity.EntityWatchResponse] broadcaster Broadcaster[*entity.EntityWatchResponse]
ctx context.Context // TODO: remove ctx context.Context // TODO: remove
cancel context.CancelFunc cancel context.CancelFunc
stream chan *entity.EntityWatchResponse
tracer trace.Tracer tracer trace.Tracer
once sync.Once once sync.Once
@ -139,9 +141,7 @@ func (s *sqlEntityServer) init() error {
s.dialect = migrator.NewDialect(engine.DriverName()) s.dialect = migrator.NewDialect(engine.DriverName())
// set up the broadcaster // set up the broadcaster
s.broadcaster, err = NewBroadcaster(s.ctx, func(stream chan *entity.EntityWatchResponse) error { s.broadcaster, err = NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error {
s.stream = stream
// start the poller // start the poller
go s.poller(stream) go s.poller(stream)
@ -994,13 +994,18 @@ func (s *sqlEntityServer) watchInit(ctx context.Context, r *entity.EntityWatchRe
return lastRv, nil return lastRv, nil
} }
func (s *sqlEntityServer) poller(stream chan *entity.EntityWatchResponse) { func (s *sqlEntityServer) poller(stream chan<- *entity.EntityWatchResponse) {
var err error var err error
// FIXME: we need a way to state startup of server from a (Group, Resource)
// standpoint, and consider that new (Group, Resource) may be added to
// `kind_version`, so we should probably also poll for changes in there
since := int64(0) since := int64(0)
interval := 1 * time.Second interval := 1 * time.Second
t := time.NewTicker(interval) t := time.NewTicker(interval)
defer close(stream)
defer t.Stop() defer t.Stop()
for { for {
@ -1017,7 +1022,7 @@ func (s *sqlEntityServer) poller(stream chan *entity.EntityWatchResponse) {
} }
} }
func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse) (int64, error) { func (s *sqlEntityServer) poll(since int64, out chan<- *entity.EntityWatchResponse) (int64, error) {
ctx, span := s.tracer.Start(s.ctx, "storage_server.poll") ctx, span := s.tracer.Start(s.ctx, "storage_server.poll")
defer span.End() defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "poll"})) ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "poll"}))
@ -1182,26 +1187,25 @@ func (s *sqlEntityServer) watch(r *entity.EntityWatchRequest, w entity.EntitySto
if err != nil { if err != nil {
return err return err
} }
defer s.broadcaster.Unsubscribe(evts)
stop := make(chan struct{}) stop := make(chan struct{})
since := r.Since since := r.Since
go func() { go func() {
defer close(stop)
for { for {
r, err := w.Recv() r, err := w.Recv()
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
s.log.Debug("watch client closed stream") s.log.Debug("watch client closed stream")
stop <- struct{}{}
return return
} }
if err != nil { if err != nil {
s.log.Error("error receiving message", "err", err) s.log.Error("error receiving message", "err", err)
stop <- struct{}{}
return return
} }
if r.Action == entity.EntityWatchRequest_STOP { if r.Action == entity.EntityWatchRequest_STOP {
s.log.Debug("watch stop requested") s.log.Debug("watch stop requested")
stop <- struct{}{}
return return
} }
// handle any other message types // handle any other message types
@ -1211,7 +1215,6 @@ func (s *sqlEntityServer) watch(r *entity.EntityWatchRequest, w entity.EntitySto
for { for {
select { select {
// stop signal
case <-stop: case <-stop:
s.log.Debug("watch stopped") s.log.Debug("watch stopped")
return nil return nil