mirror of
https://github.com/grafana/grafana.git
synced 2025-01-24 23:37:01 -06:00
365 lines
11 KiB
Go
365 lines
11 KiB
Go
package resource
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
)
|
|
|
|
// Please, when reviewing or working on this file have the following cheat-sheet
|
|
// in mind:
|
|
// 1. A channel type in Go has one of three directions: send-only (chan<- T),
|
|
// receive-only (<-chan T) or bidirectional (chan T). Each of them is a
|
|
// different type. A bidirectional type can be converted to any of the other
|
|
// two types and is automatic, any other conversion attempt is a compile error, not a
|
|
// panic.
|
|
// 2. There are three operations you can do on a channel: send, receive and
|
|
// close. Availability of operation for each channel direction:
|
|
// | Channel direction
|
|
// Operation | Receive-only | Send-only | Bidirectional
|
|
// ----------+--------------+------------+--------------
|
|
// Receive | Yes | No (compile error) | Yes
|
|
// Send | No (compile error) | Yes | Yes
|
|
// Close | No (compile error) | Yes | Yes
|
|
// 3. A channel of any type also has one of three states: nil (zero value),
|
|
// closed, or open (technically called "non-nil, not-closed channel",
|
|
// created with the `make` builtin). Nil and closed channels are also
|
|
// useful, but you have to know and care for how you use them. Outcome of
|
|
// each operation on a channel depending on its state, assuming the
|
|
// operation is available to the channel given its direction:
|
|
// | Channel state
|
|
// Operation | Nil | Closed | Open
|
|
// ----------+---------------+---------------+------------------
|
|
// Receive | Block forever | Never blocks | Receive/Block until receive
|
|
// Send | Block forever | Panic | Send/Block until send
|
|
// Close | Panic | Panic | Close the channel
|
|
// 4. A `select` statement has zero or more `case` branches, each one of them
|
|
// containing either a send or a receive channel operation. A `select` with
|
|
// no branches blocks forever. At most one branch will be executed, which
|
|
// means it behaves similar to a `switch`. If more than one branch can be
|
|
// executed then one of them is picked AT RANDOM (i.e. not the one first in
|
|
// the list). A `select` statement can also have a (single and optional)
|
|
// `default` branch that is executed if all the other branches are
|
|
// operations that are blocked at the time the `select` statement is
|
|
// reached. This means that having a `default` branch causes the `select`
|
|
// statement to never block.
|
|
// 5. A receive operation on a closed channel never blocks (as said before),
|
|
// but it will always yield a zero value. As it is also valid to send a zero
|
|
// value to the channel, you can receive from channels in two forms:
|
|
// v := <-c // get a zero value if closed
|
|
// v2, ok := <-c // `ok` is set to false if and only if the channel is closed
|
|
// 6. The `make` builtin is used to create open channels (and is the only way
|
|
// to get them). It has an optional second parameter to specify the amount
|
|
// of items that can be buffered. After that, a send operation will block
|
|
// waiting for another goroutine to receive from it (which would make room
|
|
// for the new item). When the second argument is not passed to `make`, then
|
|
// all operations are fully synchronized, meaning that a send will block
|
|
// until a receive in another goroutine is performed, and vice versa. Less
|
|
// interestingly, `make` can also create send-only or receive-only channel.
|
|
//
|
|
// The sources are the Go Specs, Effective Go and Go 101, which are already
|
|
// linked in the contributing guide for the backend or elsewhere in Grafana, but
|
|
// this file exploits so many of these subtleties that it's worth keeping a
|
|
// refresher about them at all times. The above is unlikely to change in the
|
|
// foreseeable future, so it's zero maintenance as well. We exclude patterns for
|
|
// using channels and other concurrency patterns since that's a way longer
|
|
// topic for a refresher.
|
|
|
|
// ConnectFunc initializes the watch implementation. It is expected to perform
// only very basic work and checks, and it has the chance to return an error.
// Once that is done, it should fork to a different goroutine with the provided
// channel and send to it every new event coming from the backing database. The
// implementation is also responsible for closing the provided channel under
// all circumstances, including when it returns an error. The caller of this
// function only ever receives from the channel (i.e. it is guaranteed to never
// send to it or close it), which provides a safe separation of concerns and
// prevents panics.
//
// FIXME: this signature suffers from inversion of control. It would also be
// much simpler if NewBroadcaster received a context.Context and a <-chan T
// instead. That would also reduce the scope of the broadcaster to only
// broadcasting to subscribers what it receives on the provided <-chan T. The
// context.Context is still needed to provide additional values in case we want
// to add observability into the broadcaster, which we want. The broadcaster
// should still terminate on either the context being done or the provided
// channel being closed.
type ConnectFunc[T any] func(stream chan<- T) error
|
|
|
|
// Broadcaster fans out items of type T to any number of subscribers.
type Broadcaster[T any] interface {
	// Subscribe registers a new subscriber and returns the receive side of the
	// channel on which it will get items. With the implementation in this
	// file, the error is ctx.Err() when ctx is done before the subscription is
	// accepted, and io.EOF when the broadcaster has already terminated.
	Subscribe(ctx context.Context) (<-chan T, error)

	// Unsubscribe ends the subscription identified by sub, which must be a
	// channel previously returned by Subscribe.
	Unsubscribe(sub <-chan T)
}
|
|
|
|
func NewBroadcaster[T any](ctx context.Context, connect ConnectFunc[T]) (Broadcaster[T], error) {
|
|
b := &broadcaster[T]{
|
|
started: make(chan struct{}),
|
|
}
|
|
err := b.init(ctx, connect)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return b, nil
|
|
}
|
|
|
|
type broadcaster[T any] struct {
|
|
// lifecycle management
|
|
|
|
started, terminated chan struct{}
|
|
shouldTerminate <-chan struct{}
|
|
|
|
// subscription management
|
|
|
|
cache channelCache[T]
|
|
subscribe chan chan T
|
|
unsubscribe chan (<-chan T)
|
|
subs map[<-chan T]chan T
|
|
}
|
|
|
|
func (b *broadcaster[T]) Subscribe(ctx context.Context) (<-chan T, error) {
|
|
select {
|
|
case <-ctx.Done(): // client canceled
|
|
return nil, ctx.Err()
|
|
case <-b.started: // wait for broadcaster to start
|
|
}
|
|
|
|
// create the subscription
|
|
sub := make(chan T, 100)
|
|
|
|
select {
|
|
case <-ctx.Done(): // client canceled
|
|
return nil, ctx.Err()
|
|
case <-b.terminated: // no more data
|
|
return nil, io.EOF
|
|
case b.subscribe <- sub: // success submitting subscription
|
|
return sub, nil
|
|
}
|
|
}
|
|
|
|
func (b *broadcaster[T]) Unsubscribe(sub <-chan T) {
|
|
// wait for broadcaster to start. In practice, the only way to reach
|
|
// Unsubscribe is by first having called Subscribe, which means we have
|
|
// already started. But a malfunctioning caller may call Unsubscribe freely,
|
|
// which would cause us to block forever the goroutine of the caller when
|
|
// trying to send to a nil `b.unsubscribe` or receive from a nil
|
|
// `b.terminated` if we haven't yet initialized those values. This would
|
|
// mean leaking that malfunctioninig caller's goroutine, so we rather make
|
|
// Unsubscribe safe in any possible case
|
|
if sub == nil {
|
|
return
|
|
}
|
|
<-b.started // wait for broadcaster to start
|
|
|
|
select {
|
|
case b.unsubscribe <- sub: // success submitting unsubscription
|
|
case <-b.terminated: // broadcaster terminated, nothing to do
|
|
}
|
|
}
|
|
|
|
// init initializes the broadcaster. It should not be run more than once.
|
|
func (b *broadcaster[T]) init(ctx context.Context, connect ConnectFunc[T]) error {
|
|
// create the stream that will connect us with the watch implementation and
|
|
// send it to them so they initialize and start sending data
|
|
stream := make(chan T, 100)
|
|
if err := connect(stream); err != nil {
|
|
return err
|
|
}
|
|
|
|
// initialize our internal state
|
|
b.shouldTerminate = ctx.Done()
|
|
b.cache = newChannelCache[T](ctx, 100)
|
|
b.subscribe = make(chan chan T, 100)
|
|
b.unsubscribe = make(chan (<-chan T), 100)
|
|
b.subs = make(map[<-chan T]chan T)
|
|
b.terminated = make(chan struct{})
|
|
|
|
// start handling incoming data from the watch implementation. If data came
|
|
// in until now, it will be buffered in `stream`
|
|
go b.stream(stream)
|
|
|
|
// unblock any Subscribe/Unsubscribe calls since we are ready to handle them
|
|
close(b.started)
|
|
|
|
return nil
|
|
}
|
|
|
|
// stream acts a message broker between the watch implementation that receives a
|
|
// raw stream of events and the individual clients watching for those events.
|
|
// Thus, we hold the receive side of the watch implementation, and we are
|
|
// limited here to receive from it, whereas we are responsible for sending to
|
|
// watchers and closing their channels. The responsibility of closing `input`
|
|
// (as with any other channel) will always be of the sending side. Hence, the
|
|
// watch implementation should do it.
|
|
func (b *broadcaster[T]) stream(input <-chan T) {
|
|
// make sure we unconditionally cleanup upon return
|
|
defer func() {
|
|
// prevent new subscriptions and make sure to discard unsubscriptions
|
|
close(b.terminated)
|
|
// terminate all subscirptions and clean the map
|
|
for _, sub := range b.subs {
|
|
close(sub)
|
|
delete(b.subs, sub)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-b.shouldTerminate: // service context cancelled
|
|
return
|
|
|
|
case sub := <-b.subscribe: // subscribe
|
|
// send initial batch of cached items
|
|
err := b.cache.ReadInto(sub)
|
|
if err != nil {
|
|
close(sub)
|
|
continue
|
|
}
|
|
b.subs[sub] = sub
|
|
|
|
case recv := <-b.unsubscribe: // unsubscribe
|
|
if sub, ok := b.subs[recv]; ok {
|
|
close(sub)
|
|
delete(b.subs, sub)
|
|
}
|
|
|
|
case item, ok := <-input: // data arrived, send to subscribers
|
|
// input closed, drain subscribers and exit
|
|
if !ok {
|
|
return
|
|
}
|
|
b.cache.Add(item)
|
|
for _, sub := range b.subs {
|
|
select {
|
|
case sub <- item:
|
|
default:
|
|
// Slow consumer, drop
|
|
b.unsubscribe <- sub
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// defaultCacheSize is the capacity newChannelCache falls back to when it is
// given a non-positive size.
const defaultCacheSize = 100

// channelCache is a fixed-capacity cache that keeps the most recently added
// items. Items are always reported from the oldest to the newest.
type channelCache[T any] interface {
	// Len returns the number of items currently held.
	Len() int
	// Add stores item, evicting the oldest item when the cache is full.
	Add(item T)
	// Get returns the i-th item counting from the oldest one (index 0), or
	// the zero value of T when there is no such item.
	Get(i int) T
	// Range calls f for every item and stops at, and returns, the first
	// error returned by f.
	Range(f func(T) error) error
	// Slice returns a newly allocated slice with all the items.
	Slice() []T
	// ReadInto sends every item to dst without blocking. It returns an error
	// as soon as dst cannot take an item immediately.
	ReadInto(dst chan T) error
}

// cache is the channelCache implementation, backed by a ring buffer. The ring
// buffer state (cache, cacheZero and cacheLen) is owned by the goroutine
// executing run. Every method talks to that goroutine through the add and
// read channels instead of touching the state directly, which is what makes
// the cache usable from several goroutines.
type cache[T any] struct {
	cache     []T // ring buffer storage, allocated with `size` elements
	size      int // capacity of the ring buffer
	cacheZero int // index in `cache` of the oldest item
	cacheLen  int // number of items currently stored

	add  chan T      // items to store, received by run
	read chan chan T // snapshot requests, received by run

	ctx context.Context // run exits once this context is done
}

// newChannelCache returns a cache holding up to size items and starts the
// goroutine that serves it. A non-positive size is replaced with
// defaultCacheSize. The goroutine exits when ctx is done; from then on Add is
// a no-op and the read methods report an empty cache.
func newChannelCache[T any](ctx context.Context, size int) channelCache[T] {
	if size <= 0 {
		size = defaultCacheSize
	}

	c := &cache[T]{
		cache: make([]T, size),
		size:  size,
		add:   make(chan T),
		read:  make(chan chan T),
		ctx:   ctx,
	}

	go c.run()

	return c
}

// snapshot asks run for a copy of the current contents. The returned channel
// is already being filled, oldest item first, and is closed by run once every
// item has been sent; it is buffered with room for a full cache, so abandoning
// it early leaks nothing. If the context is done, run may no longer be there
// to answer, so snapshot returns the context error instead of blocking.
func (c *cache[T]) snapshot() (<-chan T, error) {
	r := make(chan T, c.size)
	select {
	case c.read <- r:
		return r, nil
	case <-c.ctx.Done():
		return nil, c.ctx.Err()
	}
}

// Len returns the number of items currently held, or 0 once the cache context
// is done. The count is taken from a snapshot served by run rather than by
// reading cacheLen directly, because run modifies cacheLen from its own
// goroutine.
func (c *cache[T]) Len() int {
	r, err := c.snapshot()
	if err != nil {
		return 0
	}
	n := 0
	for range r {
		n++
	}
	return n
}

// Add stores item, evicting the oldest item when the cache is full. Once the
// cache context is done the item is discarded instead of blocking the caller
// forever on a goroutine that has exited.
func (c *cache[T]) Add(item T) {
	select {
	case c.add <- item:
	case <-c.ctx.Done():
	}
}

// run owns the ring buffer: it stores the items received on `add` and answers
// the snapshot requests received on `read`, until the context is done.
func (c *cache[T]) run() {
	for {
		select {
		case <-c.ctx.Done():
			return

		case item := <-c.add:
			// write right after the newest item; when the buffer is full that
			// slot is the oldest item, which is then overwritten
			i := (c.cacheZero + c.cacheLen) % len(c.cache)
			c.cache[i] = item
			if c.cacheLen < len(c.cache) {
				c.cacheLen++
			} else {
				c.cacheZero = (c.cacheZero + 1) % len(c.cache)
			}

		case r := <-c.read:
		read:
			for i := 0; i < c.cacheLen; i++ {
				select {
				case r <- c.cache[(c.cacheZero+i)%len(c.cache)]:
				default:
					// don't wait for slow consumers
					break read
				}
			}
			close(r)
		}
	}
}

// Get returns the i-th item counting from the oldest one (index 0). It returns
// the zero value of T when i is out of range or the cache context is done.
func (c *cache[T]) Get(i int) T {
	var zero T
	if i < 0 || i >= c.size {
		// no item can ever have this index, skip the round trip to run
		return zero
	}

	r, err := c.snapshot()
	if err != nil {
		return zero
	}

	idx := 0
	for item := range r {
		if idx == i {
			return item
		}
		idx++
	}
	return zero
}

// Range calls f for every item, from the oldest to the newest, and stops at
// the first error returned by f, which it returns. It returns the context
// error, without calling f, when the cache context is done.
func (c *cache[T]) Range(f func(T) error) error {
	r, err := c.snapshot()
	if err != nil {
		return err
	}

	for item := range r {
		if err := f(item); err != nil {
			return err
		}
	}
	return nil
}

// Slice returns a newly allocated slice with all the items, from the oldest to
// the newest. The slice is empty when the cache context is done.
func (c *cache[T]) Slice() []T {
	s := make([]T, 0, c.size)

	r, err := c.snapshot()
	if err != nil {
		return s
	}

	for item := range r {
		s = append(s, item)
	}
	return s
}

// ReadInto sends every item to dst, from the oldest to the newest, without
// ever blocking on it. It returns an error as soon as dst cannot take an item
// immediately, in which case dst has received only the items sent so far. It
// returns the context error, without sending anything, when the cache context
// is done.
func (c *cache[T]) ReadInto(dst chan T) error {
	r, err := c.snapshot()
	if err != nil {
		return err
	}

	for item := range r {
		select {
		case dst <- item:
		default:
			return fmt.Errorf("slow consumer")
		}
	}
	return nil
}
|