Add context for Publish and AddEventListener (#41665)

* Add context for Publish and AddEventListener

* Add test and listenersWithCtx

* Refactor listener logic

* Refactor publish logic for all combination of listeners and publish with and without ctx
This commit is contained in:
idafurjes 2021-11-19 13:56:42 +01:00 committed by GitHub
parent 35c2c95fdc
commit 972764cf8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 143 additions and 16 deletions

View File

@ -30,6 +30,7 @@ type Bus interface {
Dispatch(msg Msg) error
DispatchCtx(ctx context.Context, msg Msg) error
Publish(msg Msg) error
PublishCtx(ctx context.Context, msg Msg) error
// InTransaction starts a transaction and store it in the context.
// The caller can then pass a function with multiple DispatchCtx calls that
@ -40,6 +41,7 @@ type Bus interface {
AddHandler(handler HandlerFunc)
AddHandlerCtx(handler HandlerFunc)
AddEventListener(handler HandlerFunc)
AddEventListenerCtx(handler HandlerFunc)
// SetTransactionManager allows the user to replace the internal
// noop TransactionManager that is responsible for managing
@ -49,11 +51,12 @@ type Bus interface {
// InProcBus defines the bus structure
type InProcBus struct {
logger log.Logger
handlers map[string]HandlerFunc
handlersWithCtx map[string]HandlerFunc
listeners map[string][]HandlerFunc
txMng TransactionManager
logger log.Logger
handlers map[string]HandlerFunc
handlersWithCtx map[string]HandlerFunc
listeners map[string][]HandlerFunc
listenersWithCtx map[string][]HandlerFunc
txMng TransactionManager
}
func ProvideBus() *InProcBus {
@ -71,11 +74,12 @@ var globalBus = New()
// New initialize the bus
func New() *InProcBus {
return &InProcBus{
logger: log.New("bus"),
handlers: make(map[string]HandlerFunc),
handlersWithCtx: make(map[string]HandlerFunc),
listeners: make(map[string][]HandlerFunc),
txMng: &noopTransactionManager{},
logger: log.New("bus"),
handlers: make(map[string]HandlerFunc),
handlersWithCtx: make(map[string]HandlerFunc),
listeners: make(map[string][]HandlerFunc),
listenersWithCtx: make(map[string][]HandlerFunc),
txMng: &noopTransactionManager{},
}
}
@ -155,14 +159,63 @@ func (b *InProcBus) Dispatch(msg Msg) error {
return err.(error)
}
// PublishCtx function publish a message to the bus listener.
func (b *InProcBus) PublishCtx(ctx context.Context, msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
var params = []reflect.Value{}
if listeners, exists := b.listenersWithCtx[msgName]; exists {
params = append(params, reflect.ValueOf(ctx))
params = append(params, reflect.ValueOf(msg))
if err := checkListeners(listeners, params); err != nil {
return err
}
}
if listeners, exists := b.listeners[msgName]; exists {
params = append(params, reflect.ValueOf(msg))
if setting.Env == setting.Dev {
b.logger.Warn("PublishCtx called with message listener registered using AddEventListener and should be changed to use AddEventListenerCtx", "msgName", msgName)
}
if err := checkListeners(listeners, params); err != nil {
return err
}
}
span, _ := opentracing.StartSpanFromContext(ctx, "bus - "+msgName)
defer span.Finish()
span.SetTag("msg", msgName)
return nil
}
// Publish function publish a message to the bus listener.
func (b *InProcBus) Publish(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
var listeners = b.listeners[msgName]
var params = []reflect.Value{}
if listeners, exists := b.listenersWithCtx[msgName]; exists {
params = append(params, reflect.ValueOf(context.Background()))
params = append(params, reflect.ValueOf(msg))
if setting.Env == setting.Dev {
b.logger.Warn("Publish called with message handler registered using AddEventHandlerCtx and should be changed to use PublishCtx", "msgName", msgName)
}
if err := checkListeners(listeners, params); err != nil {
return err
}
}
var params = make([]reflect.Value, 1)
params[0] = reflect.ValueOf(msg)
if listeners, exists := b.listeners[msgName]; exists {
params = append(params, reflect.ValueOf(msg))
if err := checkListeners(listeners, params); err != nil {
return err
}
}
return nil
}
func checkListeners(listeners []HandlerFunc, params []reflect.Value) error {
for _, listenerHandler := range listeners {
ret := reflect.ValueOf(listenerHandler).Call(params)
e := ret[0].Interface()
@ -174,7 +227,6 @@ func (b *InProcBus) Publish(msg Msg) error {
return fmt.Errorf("expected listener to return an error, got '%T'", e)
}
}
return nil
}
@ -205,6 +257,16 @@ func (b *InProcBus) AddEventListener(handler HandlerFunc) {
b.listeners[eventName] = append(b.listeners[eventName], handler)
}
func (b *InProcBus) AddEventListenerCtx(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
eventName := handlerType.In(1).Elem().Name()
_, exists := b.listenersWithCtx[eventName]
if !exists {
b.listenersWithCtx[eventName] = make([]HandlerFunc, 0)
}
b.listenersWithCtx[eventName] = append(b.listenersWithCtx[eventName], handler)
}
// AddHandler attaches a handler function to the global bus.
// Package level function.
func AddHandler(implName string, handler HandlerFunc) {
@ -223,6 +285,12 @@ func AddEventListener(handler HandlerFunc) {
globalBus.AddEventListener(handler)
}
// AddEventListenerCtx attaches a handler function to the event listener.
// Package level function.
func AddEventListenerCtx(handler HandlerFunc) {
globalBus.AddEventListenerCtx(handler)
}
func Dispatch(msg Msg) error {
return globalBus.Dispatch(msg)
}
@ -235,6 +303,10 @@ func Publish(msg Msg) error {
return globalBus.Publish(msg)
}
func PublishCtx(msg Msg) error {
return globalBus.Publish(msg)
}
func GetHandlerCtx(name string) HandlerFunc {
return globalBus.GetHandlerCtx(name)
}

View File

@ -122,7 +122,7 @@ func TestQuery_HandlerReturnsError(t *testing.T) {
require.Error(t, err, "expected error but got none")
}
func TestEvent(t *testing.T) {
func TestEventPublish(t *testing.T) {
bus := New()
var invoked bool
@ -138,9 +138,64 @@ func TestEvent(t *testing.T) {
require.True(t, invoked)
}
func TestEvent_NoRegisteredListener(t *testing.T) {
func TestEventPublish_NoRegisteredListener(t *testing.T) {
bus := New()
err := bus.Publish(&testQuery{})
require.NoError(t, err, "unable to publish event")
}
func TestEventCtxPublishCtx(t *testing.T) {
bus := New()
var invoked bool
bus.AddEventListenerCtx(func(ctx context.Context, query *testQuery) error {
invoked = true
return nil
})
err := bus.PublishCtx(context.Background(), &testQuery{})
require.NoError(t, err, "unable to publish event")
require.True(t, invoked)
}
func TestEventPublishCtx_NoRegisteredListener(t *testing.T) {
bus := New()
err := bus.PublishCtx(context.Background(), &testQuery{})
require.NoError(t, err, "unable to publish event")
}
func TestEventPublishCtx(t *testing.T) {
bus := New()
var invoked bool
bus.AddEventListener(func(query *testQuery) error {
invoked = true
return nil
})
err := bus.PublishCtx(context.Background(), &testQuery{})
require.NoError(t, err, "unable to publish event")
require.True(t, invoked)
}
func TestEventCtxPublish(t *testing.T) {
bus := New()
var invoked bool
bus.AddEventListenerCtx(func(ctx context.Context, query *testQuery) error {
invoked = true
return nil
})
err := bus.Publish(&testQuery{})
require.NoError(t, err, "unable to publish event")
require.True(t, invoked)
}