2014-11-28 22:16:49 +01:00
package bus
import (
2016-10-03 09:38:03 +02:00
"context"
2018-04-27 13:41:58 +02:00
"errors"
2020-11-17 11:27:45 +01:00
"fmt"
2014-11-28 22:16:49 +01:00
"reflect"
2020-11-27 14:58:45 +01:00
2021-07-15 14:33:38 +02:00
"github.com/grafana/grafana/pkg/infra/log"
2022-01-20 11:10:12 +01:00
"github.com/grafana/grafana/pkg/infra/tracing"
2021-07-15 14:33:38 +02:00
"github.com/grafana/grafana/pkg/setting"
2022-01-20 11:10:12 +01:00
"go.opentelemetry.io/otel/attribute"
2014-11-28 22:16:49 +01:00
)
2019-06-04 22:00:05 +02:00
// HandlerFunc is the type accepted wherever a handler or listener is
// registered on the bus. It is an empty interface: the bus inspects and
// invokes the underlying function through reflection.
type HandlerFunc interface{}
2019-06-04 22:00:05 +02:00
// Msg is the type of a message sent over the bus. It is an empty interface;
// the bus derives the message name from the value's type via reflection.
type Msg interface{}
2014-11-28 22:16:49 +01:00
2019-06-04 22:00:05 +02:00
// ErrHandlerNotFound is returned when no handler is registered for a
// dispatched message.
var ErrHandlerNotFound = errors.New("handler not found")
2019-06-04 22:00:05 +02:00
// TransactionManager runs a callback inside a transaction.
type TransactionManager interface {
	// InTransaction invokes fn with a context and returns an error.
	InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
}
2019-06-04 22:00:05 +02:00
// Bus type defines the bus interface structure
2014-12-01 08:56:03 -08:00
type Bus interface {
2021-12-28 17:36:22 +01:00
Dispatch ( ctx context . Context , msg Msg ) error
2021-11-29 14:23:24 +01:00
2022-01-04 09:36:01 +01:00
Publish ( ctx context . Context , msg Msg ) error
2015-02-04 15:37:26 +01:00
2018-06-05 21:13:53 +02:00
// InTransaction starts a transaction and store it in the context.
// The caller can then pass a function with multiple DispatchCtx calls that
// all will be executed in the same transaction. InTransaction will rollback if the
2018-06-07 18:02:28 +02:00
// callback returns an error.
2018-06-05 21:13:53 +02:00
InTransaction ( ctx context . Context , fn func ( ctx context . Context ) error ) error
2021-12-28 16:08:07 +01:00
AddHandler ( handler HandlerFunc )
2021-11-29 14:23:24 +01:00
2022-01-04 09:36:01 +01:00
AddEventListener ( handler HandlerFunc )
2018-06-05 21:13:53 +02:00
// SetTransactionManager allows the user to replace the internal
2020-06-01 17:11:25 +02:00
// noop TransactionManager that is responsible for managing
2018-06-05 21:13:53 +02:00
// transactions in `InTransaction`
2018-06-07 12:54:36 -07:00
SetTransactionManager ( tm TransactionManager )
2018-06-05 21:13:53 +02:00
}
2019-06-04 22:00:05 +02:00
// InProcBus defines the bus structure
2014-12-01 08:56:03 -08:00
type InProcBus struct {
2021-11-19 13:56:42 +01:00
logger log . Logger
handlers map [ string ] HandlerFunc
handlersWithCtx map [ string ] HandlerFunc
listeners map [ string ] [ ] HandlerFunc
listenersWithCtx map [ string ] [ ] HandlerFunc
txMng TransactionManager
2022-01-20 11:10:12 +01:00
tracer tracing . Tracer
2014-12-01 08:56:03 -08:00
}
2014-11-28 22:16:49 +01:00
2022-01-20 11:10:12 +01:00
func ProvideBus ( tracer tracing . Tracer ) * InProcBus {
globalBus . tracer = tracer
2021-08-25 15:11:22 +02:00
return globalBus
}
// InTransaction hands fn to the bus's current transaction manager and
// returns whatever that manager returns.
func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
	manager := b.txMng
	return manager.InTransaction(ctx, fn)
}
2014-12-16 12:04:08 +01:00
// globalBus is the package-level bus used by the package-level helper
// functions. It is a temporary arrangement: how the bus instance should be
// owned and initialized has not been settled yet.
var globalBus = New()
2019-06-04 22:00:05 +02:00
// New initialize the bus
2021-08-25 15:11:22 +02:00
func New ( ) * InProcBus {
2022-01-20 11:10:12 +01:00
bus := & InProcBus {
2021-11-19 13:56:42 +01:00
logger : log . New ( "bus" ) ,
handlers : make ( map [ string ] HandlerFunc ) ,
handlersWithCtx : make ( map [ string ] HandlerFunc ) ,
listeners : make ( map [ string ] [ ] HandlerFunc ) ,
listenersWithCtx : make ( map [ string ] [ ] HandlerFunc ) ,
txMng : & noopTransactionManager { } ,
2021-07-15 14:33:38 +02:00
}
2022-01-20 11:10:12 +01:00
bus . tracer = tracing . InitializeForBus ( )
return bus
2014-11-28 22:16:49 +01:00
}
2018-04-27 13:41:58 +02:00
// GetBus returns the package-level bus as a Bus.
// The intention is to get rid of the global bus eventually.
func GetBus() Bus {
	var current Bus = globalBus
	return current
}
2019-06-04 22:00:05 +02:00
// SetTransactionManager function assign a transaction manager to the bus.
2018-06-07 12:54:36 -07:00
func ( b * InProcBus ) SetTransactionManager ( tm TransactionManager ) {
b . txMng = tm
2018-06-05 21:13:53 +02:00
}
2019-06-04 22:00:05 +02:00
// DispatchCtx function dispatch a message to the bus context.
2021-12-28 17:36:22 +01:00
func ( b * InProcBus ) Dispatch ( ctx context . Context , msg Msg ) error {
2016-10-03 09:38:03 +02:00
var msgName = reflect . TypeOf ( msg ) . Elem ( ) . Name ( )
2022-01-20 11:10:12 +01:00
ctx , span := b . tracer . Start ( ctx , "bus - " + msgName )
defer span . End ( )
2020-11-27 14:58:45 +01:00
2022-01-20 11:10:12 +01:00
span . SetAttributes ( "msg" , msgName , attribute . Key ( "msg" ) . String ( msgName ) )
2020-11-27 14:58:45 +01:00
2021-07-15 14:33:38 +02:00
withCtx := true
2018-06-08 10:51:27 +02:00
var handler = b . handlersWithCtx [ msgName ]
2016-10-03 09:38:03 +02:00
if handler == nil {
2021-07-15 14:33:38 +02:00
withCtx = false
handler = b . handlers [ msgName ]
if handler == nil {
return ErrHandlerNotFound
}
2016-10-03 09:38:03 +02:00
}
2018-06-08 10:51:27 +02:00
var params = [ ] reflect . Value { }
2021-07-15 14:33:38 +02:00
if withCtx {
params = append ( params , reflect . ValueOf ( ctx ) )
} else if setting . Env == setting . Dev {
2021-12-28 16:08:07 +01:00
b . logger . Warn ( "DispatchCtx called with message handler registered using AddHandler and should be changed to use AddHandler" , "msgName" , msgName )
2021-07-15 14:33:38 +02:00
}
2018-06-08 10:51:27 +02:00
params = append ( params , reflect . ValueOf ( msg ) )
2016-10-03 09:38:03 +02:00
ret := reflect . ValueOf ( handler ) . Call ( params )
err := ret [ 0 ] . Interface ( )
if err == nil {
return nil
}
Outdent code after if block that ends with return (golint)
This commit fixes the following golint warnings:
pkg/bus/bus.go:64:9: if block ends with a return statement, so drop this else and outdent its block
pkg/bus/bus.go:84:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:137:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:177:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:183:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:199:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:208:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/components/dynmap/dynmap.go:236:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:242:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:257:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:263:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:278:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:284:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:299:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:331:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:350:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:356:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:366:12: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:390:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:396:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:405:12: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:427:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:433:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:442:12: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:459:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:465:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:474:12: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:491:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:497:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:506:12: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:523:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:529:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:538:12: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:555:9: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:561:10: if block ends with a return statement, so drop this else and outdent its block
pkg/components/dynmap/dynmap.go:570:12: if block ends with a return statement, so drop this else and outdent its block
pkg/login/ldap.go:55:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/login/ldap_test.go:372:10: if block ends with a return statement, so drop this else and outdent its block
pkg/middleware/middleware_test.go:213:12: if block ends with a return statement, so drop this else and outdent its block
pkg/plugins/dashboard_importer.go:153:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/plugins/dashboards_updater.go:39:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/plugins/dashboards_updater.go:121:10: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/plugins/plugins.go:210:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/plugins/plugins.go:235:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/eval_context.go:111:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/notifier.go:92:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/notifier.go:98:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/notifier.go:122:10: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/rule.go:108:10: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/rule.go:118:10: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/rule.go:121:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/alerting/notifiers/telegram.go:94:10: if block ends with a return statement, so drop this else and outdent its block
pkg/services/sqlstore/annotation.go:34:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/sqlstore/annotation.go:99:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/sqlstore/dashboard_test.go:107:13: if block ends with a return statement, so drop this else and outdent its block
pkg/services/sqlstore/plugin_setting.go:78:10: if block ends with a return statement, so drop this else and outdent its block
pkg/services/sqlstore/preferences.go:91:10: if block ends with a return statement, so drop this else and outdent its block
pkg/services/sqlstore/user.go:50:10: if block ends with a return statement, so drop this else and outdent its block
pkg/services/sqlstore/migrator/migrator.go:106:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/services/sqlstore/migrator/postgres_dialect.go:48:10: if block ends with a return statement, so drop this else and outdent its block
pkg/tsdb/time_range.go:59:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/tsdb/time_range.go:67:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
pkg/tsdb/cloudwatch/metric_find_query.go:225:9: if block ends with a return statement, so drop this else and outdent its block
pkg/util/filepath.go:68:11: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
2018-04-27 22:42:49 +02:00
return err . ( error )
2016-10-03 09:38:03 +02:00
}
2021-11-19 13:56:42 +01:00
// PublishCtx function publish a message to the bus listener.
2022-01-04 09:36:01 +01:00
func ( b * InProcBus ) Publish ( ctx context . Context , msg Msg ) error {
2021-11-19 13:56:42 +01:00
var msgName = reflect . TypeOf ( msg ) . Elem ( ) . Name ( )
var params = [ ] reflect . Value { }
if listeners , exists := b . listenersWithCtx [ msgName ] ; exists {
params = append ( params , reflect . ValueOf ( ctx ) )
params = append ( params , reflect . ValueOf ( msg ) )
2021-11-19 14:32:14 +01:00
if err := callListeners ( listeners , params ) ; err != nil {
2021-11-19 13:56:42 +01:00
return err
}
}
if listeners , exists := b . listeners [ msgName ] ; exists {
params = append ( params , reflect . ValueOf ( msg ) )
if setting . Env == setting . Dev {
b . logger . Warn ( "PublishCtx called with message listener registered using AddEventListener and should be changed to use AddEventListenerCtx" , "msgName" , msgName )
}
2021-11-19 14:32:14 +01:00
if err := callListeners ( listeners , params ) ; err != nil {
2021-11-19 13:56:42 +01:00
return err
}
}
2022-01-20 11:10:12 +01:00
_ , span := b . tracer . Start ( ctx , "bus - " + msgName )
defer span . End ( )
2021-11-19 13:56:42 +01:00
2022-01-20 11:10:12 +01:00
span . SetAttributes ( "msg" , msgName , attribute . Key ( "msg" ) . String ( msgName ) )
2021-11-19 13:56:42 +01:00
return nil
}
2021-11-19 14:32:14 +01:00
func callListeners ( listeners [ ] HandlerFunc , params [ ] reflect . Value ) error {
2015-02-03 23:57:42 +08:00
for _ , listenerHandler := range listeners {
2015-01-09 16:36:23 +01:00
ret := reflect . ValueOf ( listenerHandler ) . Call ( params )
2020-11-17 11:27:45 +01:00
e := ret [ 0 ] . Interface ( )
if e != nil {
err , ok := e . ( error )
if ok {
return err
}
return fmt . Errorf ( "expected listener to return an error, got '%T'" , e )
2015-01-09 16:36:23 +01:00
}
}
return nil
}
2021-12-28 16:08:07 +01:00
func ( b * InProcBus ) AddHandler ( handler HandlerFunc ) {
2016-10-03 09:38:03 +02:00
handlerType := reflect . TypeOf ( handler )
queryTypeName := handlerType . In ( 1 ) . Elem ( ) . Name ( )
2018-06-08 10:51:27 +02:00
b . handlersWithCtx [ queryTypeName ] = handler
2016-10-03 09:38:03 +02:00
}
2021-06-24 20:31:29 +05:30
// GetHandlerCtx looks up the context-aware handler registered under the
// given struct name. The result is nil when nothing is registered.
func (b *InProcBus) GetHandlerCtx(name string) HandlerFunc {
	handler := b.handlersWithCtx[name]
	return handler
}
2022-01-04 09:36:01 +01:00
func ( b * InProcBus ) AddEventListener ( handler HandlerFunc ) {
2021-11-19 13:56:42 +01:00
handlerType := reflect . TypeOf ( handler )
eventName := handlerType . In ( 1 ) . Elem ( ) . Name ( )
_ , exists := b . listenersWithCtx [ eventName ]
if ! exists {
b . listenersWithCtx [ eventName ] = make ( [ ] HandlerFunc , 0 )
}
b . listenersWithCtx [ eventName ] = append ( b . listenersWithCtx [ eventName ] , handler )
}
2021-12-28 16:08:07 +01:00
// AddHandler attaches a handler function to the global bus context.
2020-06-17 18:43:16 +02:00
// Package level function.
2021-12-28 16:08:07 +01:00
func AddHandler ( implName string , handler HandlerFunc ) {
globalBus . AddHandler ( handler )
2016-10-03 09:38:03 +02:00
}
2021-11-19 13:56:42 +01:00
// AddEventListenerCtx attaches a handler function to the event listener.
// Package level function.
2022-01-04 09:36:01 +01:00
func AddEventListener ( handler HandlerFunc ) {
globalBus . AddEventListener ( handler )
2021-11-19 13:56:42 +01:00
}
2021-12-28 17:36:22 +01:00
func Dispatch ( ctx context . Context , msg Msg ) error {
return globalBus . Dispatch ( ctx , msg )
2016-10-03 09:38:03 +02:00
}
2022-01-04 09:36:01 +01:00
func Publish ( ctx context . Context , msg Msg ) error {
return globalBus . Publish ( ctx , msg )
2021-11-19 13:56:42 +01:00
}
2021-06-24 20:31:29 +05:30
func GetHandlerCtx ( name string ) HandlerFunc {
2021-08-25 15:11:22 +02:00
return globalBus . GetHandlerCtx ( name )
2021-06-24 20:31:29 +05:30
}
2015-05-02 09:24:56 +02:00
// ClearBusHandlers swaps the global bus for a brand-new one built by New,
// so every previously registered handler and listener is dropped. The
// replacement also carries New's default transaction manager and tracer.
func ClearBusHandlers() {
	fresh := New()
	globalBus = fresh
}
2018-06-05 21:13:53 +02:00
2018-06-07 18:02:28 +02:00
// noopTransactionManager is a TransactionManager that opens no transaction.
type noopTransactionManager struct{}

// InTransaction calls fn with ctx directly and returns fn's result.
func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
	return fn(ctx)
}