mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Replace AddEventListener with AddEventListenerCtx and Publish with PublishCtx (#42284)
This commit is contained in:
parent
8927a3ca20
commit
a65e0be110
@ -206,7 +206,7 @@ func (hs *HTTPServer) CompleteInvite(c *models.ReqContext) response.Response {
|
||||
return response.Error(500, "failed to create user", err)
|
||||
}
|
||||
|
||||
if err := bus.Publish(&events.SignUpCompleted{
|
||||
if err := bus.PublishCtx(c.Req.Context(), &events.SignUpCompleted{
|
||||
Name: user.NameOrFallback(),
|
||||
Email: user.Email,
|
||||
}); err != nil {
|
||||
|
@ -55,7 +55,7 @@ func SignUp(c *models.ReqContext) response.Response {
|
||||
return response.Error(500, "Failed to create signup", err)
|
||||
}
|
||||
|
||||
if err := bus.Publish(&events.SignUpStarted{
|
||||
if err := bus.PublishCtx(c.Req.Context(), &events.SignUpStarted{
|
||||
Email: form.Email,
|
||||
Code: cmd.Code,
|
||||
}); err != nil {
|
||||
@ -102,7 +102,7 @@ func (hs *HTTPServer) SignUpStep2(c *models.ReqContext) response.Response {
|
||||
}
|
||||
|
||||
// publish signup event
|
||||
if err := bus.Publish(&events.SignUpCompleted{
|
||||
if err := bus.PublishCtx(c.Req.Context(), &events.SignUpCompleted{
|
||||
Email: user.Email,
|
||||
Name: user.NameOrFallback(),
|
||||
}); err != nil {
|
||||
|
@ -29,7 +29,7 @@ type TransactionManager interface {
|
||||
type Bus interface {
|
||||
Dispatch(msg Msg) error
|
||||
DispatchCtx(ctx context.Context, msg Msg) error
|
||||
Publish(msg Msg) error
|
||||
|
||||
PublishCtx(ctx context.Context, msg Msg) error
|
||||
|
||||
// InTransaction starts a transaction and store it in the context.
|
||||
@ -40,7 +40,7 @@ type Bus interface {
|
||||
|
||||
AddHandler(handler HandlerFunc)
|
||||
AddHandlerCtx(handler HandlerFunc)
|
||||
AddEventListener(handler HandlerFunc)
|
||||
|
||||
AddEventListenerCtx(handler HandlerFunc)
|
||||
|
||||
// SetTransactionManager allows the user to replace the internal
|
||||
@ -190,31 +190,6 @@ func (b *InProcBus) PublishCtx(ctx context.Context, msg Msg) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish function publish a message to the bus listener.
|
||||
func (b *InProcBus) Publish(msg Msg) error {
|
||||
var msgName = reflect.TypeOf(msg).Elem().Name()
|
||||
var params = []reflect.Value{}
|
||||
if listeners, exists := b.listenersWithCtx[msgName]; exists {
|
||||
params = append(params, reflect.ValueOf(context.Background()))
|
||||
params = append(params, reflect.ValueOf(msg))
|
||||
if setting.Env == setting.Dev {
|
||||
b.logger.Warn("Publish called with message handler registered using AddEventHandlerCtx and should be changed to use PublishCtx", "msgName", msgName)
|
||||
}
|
||||
if err := callListeners(listeners, params); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if listeners, exists := b.listeners[msgName]; exists {
|
||||
params = append(params, reflect.ValueOf(msg))
|
||||
if err := callListeners(listeners, params); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func callListeners(listeners []HandlerFunc, params []reflect.Value) error {
|
||||
for _, listenerHandler := range listeners {
|
||||
ret := reflect.ValueOf(listenerHandler).Call(params)
|
||||
@ -247,16 +222,6 @@ func (b *InProcBus) GetHandlerCtx(name string) HandlerFunc {
|
||||
return b.handlersWithCtx[name]
|
||||
}
|
||||
|
||||
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
|
||||
handlerType := reflect.TypeOf(handler)
|
||||
eventName := handlerType.In(0).Elem().Name()
|
||||
_, exists := b.listeners[eventName]
|
||||
if !exists {
|
||||
b.listeners[eventName] = make([]HandlerFunc, 0)
|
||||
}
|
||||
b.listeners[eventName] = append(b.listeners[eventName], handler)
|
||||
}
|
||||
|
||||
func (b *InProcBus) AddEventListenerCtx(handler HandlerFunc) {
|
||||
handlerType := reflect.TypeOf(handler)
|
||||
eventName := handlerType.In(1).Elem().Name()
|
||||
@ -279,12 +244,6 @@ func AddHandlerCtx(implName string, handler HandlerFunc) {
|
||||
globalBus.AddHandlerCtx(handler)
|
||||
}
|
||||
|
||||
// AddEventListener attaches a handler function to the event listener.
|
||||
// Package level function.
|
||||
func AddEventListener(handler HandlerFunc) {
|
||||
globalBus.AddEventListener(handler)
|
||||
}
|
||||
|
||||
// AddEventListenerCtx attaches a handler function to the event listener.
|
||||
// Package level function.
|
||||
func AddEventListenerCtx(handler HandlerFunc) {
|
||||
@ -299,12 +258,8 @@ func DispatchCtx(ctx context.Context, msg Msg) error {
|
||||
return globalBus.DispatchCtx(ctx, msg)
|
||||
}
|
||||
|
||||
func Publish(msg Msg) error {
|
||||
return globalBus.Publish(msg)
|
||||
}
|
||||
|
||||
func PublishCtx(msg Msg) error {
|
||||
return globalBus.Publish(msg)
|
||||
func PublishCtx(ctx context.Context, msg Msg) error {
|
||||
return globalBus.PublishCtx(ctx, msg)
|
||||
}
|
||||
|
||||
func GetHandlerCtx(name string) HandlerFunc {
|
||||
|
@ -127,12 +127,12 @@ func TestEventPublish(t *testing.T) {
|
||||
|
||||
var invoked bool
|
||||
|
||||
bus.AddEventListener(func(query *testQuery) error {
|
||||
bus.AddEventListenerCtx(func(ctx context.Context, query *testQuery) error {
|
||||
invoked = true
|
||||
return nil
|
||||
})
|
||||
|
||||
err := bus.Publish(&testQuery{})
|
||||
err := bus.PublishCtx(context.Background(), &testQuery{})
|
||||
require.NoError(t, err, "unable to publish event")
|
||||
|
||||
require.True(t, invoked)
|
||||
@ -141,7 +141,7 @@ func TestEventPublish(t *testing.T) {
|
||||
func TestEventPublish_NoRegisteredListener(t *testing.T) {
|
||||
bus := New()
|
||||
|
||||
err := bus.Publish(&testQuery{})
|
||||
err := bus.PublishCtx(context.Background(), &testQuery{})
|
||||
require.NoError(t, err, "unable to publish event")
|
||||
}
|
||||
|
||||
@ -173,7 +173,7 @@ func TestEventPublishCtx(t *testing.T) {
|
||||
|
||||
var invoked bool
|
||||
|
||||
bus.AddEventListener(func(query *testQuery) error {
|
||||
bus.AddEventListenerCtx(func(ctx context.Context, query *testQuery) error {
|
||||
invoked = true
|
||||
return nil
|
||||
})
|
||||
@ -194,7 +194,7 @@ func TestEventCtxPublish(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
|
||||
err := bus.Publish(&testQuery{})
|
||||
err := bus.PublishCtx(context.Background(), &testQuery{})
|
||||
require.NoError(t, err, "unable to publish event")
|
||||
|
||||
require.True(t, invoked)
|
||||
|
@ -38,8 +38,8 @@ func ProvideService(bus bus.Bus, cfg *setting.Cfg) (*NotificationService, error)
|
||||
ns.Bus.AddHandlerCtx(ns.sendEmailCommandHandlerSync)
|
||||
ns.Bus.AddHandlerCtx(ns.SendWebhookSync)
|
||||
|
||||
ns.Bus.AddEventListener(ns.signUpStartedHandler)
|
||||
ns.Bus.AddEventListener(ns.signUpCompletedHandler)
|
||||
ns.Bus.AddEventListenerCtx(ns.signUpStartedHandler)
|
||||
ns.Bus.AddEventListenerCtx(ns.signUpCompletedHandler)
|
||||
|
||||
mailTemplates = template.New("name")
|
||||
mailTemplates.Funcs(template.FuncMap{
|
||||
@ -186,7 +186,7 @@ func (ns *NotificationService) validateResetPasswordCode(ctx context.Context, qu
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ns *NotificationService) signUpStartedHandler(evt *events.SignUpStarted) error {
|
||||
func (ns *NotificationService) signUpStartedHandler(ctx context.Context, evt *events.SignUpStarted) error {
|
||||
if !setting.VerifyEmailEnabled {
|
||||
return nil
|
||||
}
|
||||
@ -215,7 +215,7 @@ func (ns *NotificationService) signUpStartedHandler(evt *events.SignUpStarted) e
|
||||
return bus.Dispatch(&emailSentCmd)
|
||||
}
|
||||
|
||||
func (ns *NotificationService) signUpCompletedHandler(evt *events.SignUpCompleted) error {
|
||||
func (ns *NotificationService) signUpCompletedHandler(ctx context.Context, evt *events.SignUpCompleted) error {
|
||||
if evt.Email == "" || !ns.Cfg.Smtp.SendWelcomeEmailOnSignUp {
|
||||
return nil
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ func inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, callbac
|
||||
|
||||
if len(sess.events) > 0 {
|
||||
for _, e := range sess.events {
|
||||
if err = bus.Publish(e); err != nil {
|
||||
if err = bus.PublishCtx(ctx, e); err != nil {
|
||||
tsclogger.Error("Failed to publish event after commit.", "error", err)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user