mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
107
pkg/bus/bus.go
107
pkg/bus/bus.go
@@ -21,41 +21,17 @@ type Msg interface{}
|
||||
// ErrHandlerNotFound defines an error if a handler is not found
|
||||
var ErrHandlerNotFound = errors.New("handler not found")
|
||||
|
||||
// TransactionManager defines a transaction interface
|
||||
type TransactionManager interface {
|
||||
InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
|
||||
}
|
||||
|
||||
// Bus type defines the bus interface structure
|
||||
type Bus interface {
|
||||
Dispatch(ctx context.Context, msg Msg) error
|
||||
|
||||
Publish(ctx context.Context, msg Msg) error
|
||||
|
||||
// InTransaction starts a transaction and store it in the context.
|
||||
// The caller can then pass a function with multiple DispatchCtx calls that
|
||||
// all will be executed in the same transaction. InTransaction will rollback if the
|
||||
// callback returns an error.
|
||||
InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
|
||||
|
||||
AddHandler(handler HandlerFunc)
|
||||
|
||||
AddEventListener(handler HandlerFunc)
|
||||
|
||||
// SetTransactionManager allows the user to replace the internal
|
||||
// noop TransactionManager that is responsible for managing
|
||||
// transactions in `InTransaction`
|
||||
SetTransactionManager(tm TransactionManager)
|
||||
}
|
||||
|
||||
// InProcBus defines the bus structure
|
||||
type InProcBus struct {
|
||||
logger log.Logger
|
||||
handlers map[string]HandlerFunc
|
||||
handlersWithCtx map[string]HandlerFunc
|
||||
listeners map[string][]HandlerFunc
|
||||
listenersWithCtx map[string][]HandlerFunc
|
||||
txMng TransactionManager
|
||||
tracer tracing.Tracer
|
||||
}
|
||||
|
||||
@@ -64,11 +40,6 @@ func ProvideBus(tracer tracing.Tracer) *InProcBus {
|
||||
return globalBus
|
||||
}
|
||||
|
||||
// InTransaction defines an in transaction function
|
||||
func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
return b.txMng.InTransaction(ctx, fn)
|
||||
}
|
||||
|
||||
// temp stuff, not sure how to handle bus instance, and init yet
|
||||
var globalBus = New()
|
||||
|
||||
@@ -76,11 +47,8 @@ var globalBus = New()
|
||||
func New() *InProcBus {
|
||||
bus := &InProcBus{
|
||||
logger: log.New("bus"),
|
||||
handlers: make(map[string]HandlerFunc),
|
||||
handlersWithCtx: make(map[string]HandlerFunc),
|
||||
listeners: make(map[string][]HandlerFunc),
|
||||
listenersWithCtx: make(map[string][]HandlerFunc),
|
||||
txMng: &noopTransactionManager{},
|
||||
}
|
||||
bus.tracer = tracing.InitializeForBus()
|
||||
return bus
|
||||
@@ -91,46 +59,6 @@ func GetBus() Bus {
|
||||
return globalBus
|
||||
}
|
||||
|
||||
// SetTransactionManager function assign a transaction manager to the bus.
|
||||
func (b *InProcBus) SetTransactionManager(tm TransactionManager) {
|
||||
b.txMng = tm
|
||||
}
|
||||
|
||||
// DispatchCtx function dispatch a message to the bus context.
|
||||
func (b *InProcBus) Dispatch(ctx context.Context, msg Msg) error {
|
||||
var msgName = reflect.TypeOf(msg).Elem().Name()
|
||||
|
||||
ctx, span := b.tracer.Start(ctx, "bus - "+msgName)
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes("msg", msgName, attribute.Key("msg").String(msgName))
|
||||
|
||||
withCtx := true
|
||||
var handler = b.handlersWithCtx[msgName]
|
||||
if handler == nil {
|
||||
withCtx = false
|
||||
handler = b.handlers[msgName]
|
||||
if handler == nil {
|
||||
return ErrHandlerNotFound
|
||||
}
|
||||
}
|
||||
|
||||
var params = []reflect.Value{}
|
||||
if withCtx {
|
||||
params = append(params, reflect.ValueOf(ctx))
|
||||
} else if setting.Env == setting.Dev {
|
||||
b.logger.Warn("DispatchCtx called with message handler registered using AddHandler and should be changed to use AddHandler", "msgName", msgName)
|
||||
}
|
||||
params = append(params, reflect.ValueOf(msg))
|
||||
|
||||
ret := reflect.ValueOf(handler).Call(params)
|
||||
err := ret[0].Interface()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return err.(error)
|
||||
}
|
||||
|
||||
// PublishCtx function publish a message to the bus listener.
|
||||
func (b *InProcBus) Publish(ctx context.Context, msg Msg) error {
|
||||
var msgName = reflect.TypeOf(msg).Elem().Name()
|
||||
@@ -176,17 +104,6 @@ func callListeners(listeners []HandlerFunc, params []reflect.Value) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *InProcBus) AddHandler(handler HandlerFunc) {
|
||||
handlerType := reflect.TypeOf(handler)
|
||||
queryTypeName := handlerType.In(1).Elem().Name()
|
||||
b.handlersWithCtx[queryTypeName] = handler
|
||||
}
|
||||
|
||||
// GetHandlerCtx returns the handler function for the given struct name.
|
||||
func (b *InProcBus) GetHandlerCtx(name string) HandlerFunc {
|
||||
return b.handlersWithCtx[name]
|
||||
}
|
||||
|
||||
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
|
||||
handlerType := reflect.TypeOf(handler)
|
||||
eventName := handlerType.In(1).Elem().Name()
|
||||
@@ -197,36 +114,12 @@ func (b *InProcBus) AddEventListener(handler HandlerFunc) {
|
||||
b.listenersWithCtx[eventName] = append(b.listenersWithCtx[eventName], handler)
|
||||
}
|
||||
|
||||
// AddHandler attaches a handler function to the global bus context.
|
||||
// Package level function.
|
||||
func AddHandler(implName string, handler HandlerFunc) {
|
||||
globalBus.AddHandler(handler)
|
||||
}
|
||||
|
||||
// AddEventListenerCtx attaches a handler function to the event listener.
|
||||
// Package level function.
|
||||
func AddEventListener(handler HandlerFunc) {
|
||||
globalBus.AddEventListener(handler)
|
||||
}
|
||||
|
||||
func Dispatch(ctx context.Context, msg Msg) error {
|
||||
return globalBus.Dispatch(ctx, msg)
|
||||
}
|
||||
|
||||
func Publish(ctx context.Context, msg Msg) error {
|
||||
return globalBus.Publish(ctx, msg)
|
||||
}
|
||||
|
||||
func GetHandlerCtx(name string) HandlerFunc {
|
||||
return globalBus.GetHandlerCtx(name)
|
||||
}
|
||||
|
||||
func ClearBusHandlers() {
|
||||
globalBus = New()
|
||||
}
|
||||
|
||||
type noopTransactionManager struct{}
|
||||
|
||||
func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
return fn(ctx)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package bus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -14,140 +13,6 @@ type testQuery struct {
|
||||
Resp string
|
||||
}
|
||||
|
||||
func TestDispatch(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
var invoked bool
|
||||
|
||||
bus.AddHandler(func(ctx context.Context, query *testQuery) error {
|
||||
invoked = true
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, invoked, "expected handler to be called")
|
||||
}
|
||||
|
||||
func TestDispatch_NoRegisteredHandler(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.Equal(t, err, ErrHandlerNotFound,
|
||||
"expected bus to return HandlerNotFound since no handler is registered")
|
||||
}
|
||||
|
||||
func TestDispatch_ContextHandler(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
var invoked bool
|
||||
|
||||
bus.AddHandler(func(ctx context.Context, query *testQuery) error {
|
||||
invoked = true
|
||||
return nil
|
||||
})
|
||||
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, invoked, "expected handler to be called")
|
||||
}
|
||||
|
||||
func TestDispatchCtx(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
var invoked bool
|
||||
|
||||
bus.AddHandler(func(ctx context.Context, query *testQuery) error {
|
||||
invoked = true
|
||||
return nil
|
||||
})
|
||||
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, invoked, "expected handler to be called")
|
||||
}
|
||||
|
||||
func TestDispatchCtx_NoContextHandler(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
var invoked bool
|
||||
|
||||
bus.AddHandler(func(ctx context.Context, query *testQuery) error {
|
||||
invoked = true
|
||||
return nil
|
||||
})
|
||||
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, invoked, "expected handler to be called")
|
||||
}
|
||||
|
||||
func TestDispatchCtx_NoRegisteredHandler(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.Equal(t, err, ErrHandlerNotFound,
|
||||
"expected bus to return HandlerNotFound since no handler is registered")
|
||||
}
|
||||
|
||||
func TestQuery(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
want := "hello from handler"
|
||||
|
||||
bus.AddHandler(func(ctx context.Context, q *testQuery) error {
|
||||
q.Resp = want
|
||||
return nil
|
||||
})
|
||||
|
||||
q := &testQuery{}
|
||||
|
||||
err = bus.Dispatch(context.Background(), q)
|
||||
require.NoError(t, err, "unable to dispatch query")
|
||||
|
||||
require.Equal(t, want, q.Resp)
|
||||
}
|
||||
|
||||
func TestQuery_HandlerReturnsError(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
require.NoError(t, err)
|
||||
bus.tracer = tracer
|
||||
|
||||
bus.AddHandler(func(ctx context.Context, query *testQuery) error {
|
||||
return errors.New("handler error")
|
||||
})
|
||||
|
||||
err = bus.Dispatch(context.Background(), &testQuery{})
|
||||
require.Error(t, err, "expected error but got none")
|
||||
}
|
||||
|
||||
func TestEventPublish(t *testing.T) {
|
||||
bus := New()
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
|
||||
Reference in New Issue
Block a user