Chore: Remove global bus variable (#50765)

* Chore: Remove global bus variable

* fix bus in tests
This commit is contained in:
Serge Zaitsev 2022-06-14 16:07:41 +02:00 committed by GitHub
parent 99c8ce5ab9
commit 0b55c41d05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 68 additions and 92 deletions

View File

@ -13,6 +13,7 @@ import (
"strings"
"sync"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/middleware/csrf"
"github.com/prometheus/client_golang/prometheus"
@ -86,6 +87,7 @@ type HTTPServer struct {
httpSrv *http.Server
middlewares []web.Handler
namedMiddlewares []routing.RegisterNamedMiddleware
bus bus.Bus
PluginContextProvider *plugincontext.Provider
RouteRegister routing.RouteRegister
@ -166,7 +168,7 @@ type ServerOptions struct {
Listener net.Listener
}
func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routing.RouteRegister,
func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routing.RouteRegister, bus bus.Bus,
renderService rendering.Service, licensing models.Licensing, hooksService *hooks.HooksService,
cacheService *localcache.CacheService, sqlStore *sqlstore.SQLStore, alertEngine *alerting.AlertEngine,
pluginRequestValidator models.PluginRequestValidator, pluginStaticRouteResolver plugins.StaticRouteResolver,
@ -201,6 +203,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
hs := &HTTPServer{
Cfg: cfg,
RouteRegister: routeRegister,
bus: bus,
RenderService: renderService,
License: licensing,
HooksService: hooksService,

View File

@ -8,7 +8,6 @@ import (
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/infra/metrics"
"github.com/grafana/grafana/pkg/models"
@ -206,7 +205,7 @@ func (hs *HTTPServer) CompleteInvite(c *models.ReqContext) response.Response {
return response.Error(500, "failed to create user", err)
}
if err := bus.Publish(c.Req.Context(), &events.SignUpCompleted{
if err := hs.bus.Publish(c.Req.Context(), &events.SignUpCompleted{
Name: user.NameOrFallback(),
Email: user.Email,
}); err != nil {

View File

@ -7,7 +7,6 @@ import (
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/infra/metrics"
"github.com/grafana/grafana/pkg/models"
@ -55,7 +54,7 @@ func (hs *HTTPServer) SignUp(c *models.ReqContext) response.Response {
return response.Error(500, "Failed to create signup", err)
}
if err := bus.Publish(c.Req.Context(), &events.SignUpStarted{
if err := hs.bus.Publish(c.Req.Context(), &events.SignUpStarted{
Email: form.Email,
Code: cmd.Code,
}); err != nil {
@ -102,7 +101,7 @@ func (hs *HTTPServer) SignUpStep2(c *models.ReqContext) response.Response {
}
// publish signup event
if err := bus.Publish(c.Req.Context(), &events.SignUpCompleted{
if err := hs.bus.Publish(c.Req.Context(), &events.SignUpCompleted{
Email: user.Email,
Name: user.NameOrFallback(),
}); err != nil {

View File

@ -6,9 +6,7 @@ import (
"fmt"
"reflect"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/setting"
"go.opentelemetry.io/otel/attribute"
)
@ -29,29 +27,15 @@ type Bus interface {
// InProcBus defines the bus structure
type InProcBus struct {
logger log.Logger
listeners map[string][]HandlerFunc
listenersWithCtx map[string][]HandlerFunc
tracer tracing.Tracer
listeners map[string][]HandlerFunc
tracer tracing.Tracer
}
func ProvideBus(tracer tracing.Tracer) *InProcBus {
globalBus.tracer = tracer
return globalBus
}
// temp stuff, not sure how to handle bus instance, and init yet
var globalBus = New()
// New initialize the bus
func New() *InProcBus {
bus := &InProcBus{
logger: log.New("bus"),
listeners: make(map[string][]HandlerFunc),
listenersWithCtx: make(map[string][]HandlerFunc),
return &InProcBus{
listeners: make(map[string][]HandlerFunc),
tracer: tracer,
}
bus.tracer = tracing.InitializeForBus()
return bus
}
// PublishCtx function publish a message to the bus listener.
@ -59,7 +43,7 @@ func (b *InProcBus) Publish(ctx context.Context, msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
var params = []reflect.Value{}
if listeners, exists := b.listenersWithCtx[msgName]; exists {
if listeners, exists := b.listeners[msgName]; exists {
params = append(params, reflect.ValueOf(ctx))
params = append(params, reflect.ValueOf(msg))
if err := callListeners(listeners, params); err != nil {
@ -67,15 +51,6 @@ func (b *InProcBus) Publish(ctx context.Context, msg Msg) error {
}
}
if listeners, exists := b.listeners[msgName]; exists {
params = append(params, reflect.ValueOf(msg))
if setting.Env == setting.Dev {
b.logger.Warn("PublishCtx called with message listener registered using AddEventListener and should be changed to use AddEventListenerCtx", "msgName", msgName)
}
if err := callListeners(listeners, params); err != nil {
return err
}
}
_, span := b.tracer.Start(ctx, "bus - "+msgName)
defer span.End()
@ -102,19 +77,9 @@ func callListeners(listeners []HandlerFunc, params []reflect.Value) error {
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
eventName := handlerType.In(1).Elem().Name()
_, exists := b.listenersWithCtx[eventName]
_, exists := b.listeners[eventName]
if !exists {
b.listenersWithCtx[eventName] = make([]HandlerFunc, 0)
b.listeners[eventName] = make([]HandlerFunc, 0)
}
b.listenersWithCtx[eventName] = append(b.listenersWithCtx[eventName], handler)
}
// AddEventListenerCtx attaches a handler function to the event listener.
// Package level function.
func AddEventListener(handler HandlerFunc) {
globalBus.AddEventListener(handler)
}
func Publish(ctx context.Context, msg Msg) error {
return globalBus.Publish(ctx, msg)
b.listeners[eventName] = append(b.listeners[eventName], handler)
}

View File

@ -14,10 +14,9 @@ type testQuery struct {
}
func TestEventPublish(t *testing.T) {
bus := New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus.tracer = tracer
bus := ProvideBus(tracer)
var invoked bool
@ -33,20 +32,18 @@ func TestEventPublish(t *testing.T) {
}
func TestEventPublish_NoRegisteredListener(t *testing.T) {
bus := New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus.tracer = tracer
bus := ProvideBus(tracer)
err = bus.Publish(context.Background(), &testQuery{})
require.NoError(t, err, "unable to publish event")
}
func TestEventCtxPublishCtx(t *testing.T) {
bus := New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus.tracer = tracer
bus := ProvideBus(tracer)
var invoked bool
@ -62,20 +59,18 @@ func TestEventCtxPublishCtx(t *testing.T) {
}
func TestEventPublishCtx_NoRegisteredListener(t *testing.T) {
bus := New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus.tracer = tracer
bus := ProvideBus(tracer)
err = bus.Publish(context.Background(), &testQuery{})
require.NoError(t, err, "unable to publish event")
}
func TestEventPublishCtx(t *testing.T) {
bus := New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus.tracer = tracer
bus := ProvideBus(tracer)
var invoked bool
@ -91,10 +86,9 @@ func TestEventPublishCtx(t *testing.T) {
}
func TestEventCtxPublish(t *testing.T) {
bus := New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus.tracer = tracer
bus := ProvideBus(tracer)
var invoked bool

View File

@ -5,6 +5,7 @@ import (
"strings"
"github.com/fatih/color"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/commands/datamigrations"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/commands/secretsmigrations"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
@ -55,7 +56,9 @@ func runDbCommand(command func(commandLine utils.CommandLine, sqlStore *sqlstore
return fmt.Errorf("%v: %w", "failed to initialize tracer service", err)
}
sqlStore, err := sqlstore.ProvideService(cfg, nil, &migrations.OSSMigrations{}, tracer)
bus := bus.ProvideBus(tracer)
sqlStore, err := sqlstore.ProvideService(cfg, nil, &migrations.OSSMigrations{}, bus, tracer)
if err != nil {
return fmt.Errorf("%v: %w", "failed to initialize SQL store", err)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/notifications"
"github.com/grafana/grafana/pkg/setting"
)
@ -268,7 +269,10 @@ func TestEmailNotifierIntegration(t *testing.T) {
func createCoreEmailService(t *testing.T) *notifications.NotificationService {
t.Helper()
bus := bus.New()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
bus := bus.ProvideBus(tracer)
cfg := setting.NewCfg()
cfg.StaticRootPath = "../../../../../public/"
cfg.BuildVersion = "4.0.0"

View File

@ -5,14 +5,22 @@ import (
"testing"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newBus(t *testing.T) bus.Bus {
t.Helper()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
return bus.ProvideBus(tracer)
}
func TestProvideService(t *testing.T) {
bus := bus.New()
bus := newBus(t)
t.Run("When invalid from_address in configuration", func(t *testing.T) {
cfg := createSmtpConfig()
@ -32,7 +40,7 @@ func TestProvideService(t *testing.T) {
}
func TestSendEmailSync(t *testing.T) {
bus := bus.New()
bus := newBus(t)
t.Run("When sending emails synchronously", func(t *testing.T) {
ns, mailer := createSut(t, bus)
@ -173,7 +181,7 @@ func TestSendEmailSync(t *testing.T) {
}
func TestSendEmailAsync(t *testing.T) {
bus := bus.New()
bus := newBus(t)
t.Run("When sending reset email password", func(t *testing.T) {
sut, _ := createSut(t, bus)

View File

@ -5,7 +5,6 @@ import (
"io/ioutil"
"testing"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
@ -20,7 +19,7 @@ func TestEmailIntegrationTest(t *testing.T) {
setting.BuildVersion = "4.0.0"
ns := &NotificationService{}
ns.Bus = bus.New()
ns.Bus = newBus(t)
ns.Cfg = setting.NewCfg()
ns.Cfg.Smtp.Enabled = true
ns.Cfg.Smtp.TemplatesPatterns = []string{"emails/*.html", "emails/*.txt"}

View File

@ -6,6 +6,7 @@ import (
"testing"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/dashboardimport"
@ -472,9 +473,12 @@ type scenarioContext struct {
func scenario(t *testing.T, desc string, input scenarioInput, f func(ctx *scenarioContext)) {
t.Helper()
tracer, err := tracing.InitializeTracerForTest()
require.NoError(t, err)
sCtx := &scenarioContext{
t: t,
bus: bus.New(),
bus: bus.ProvideBus(tracer),
importDashboardArgs: []*dashboardimport.ImportDashboardRequest{},
getPluginSettingsByIdArgs: []*models.GetPluginSettingByIdQuery{},
updatePluginSettingVersionArgs: []*models.UpdatePluginSettingVersionCmd{},

View File

@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/models"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
@ -99,7 +98,7 @@ func TestIntegrationDataAccess(t *testing.T) {
sqlStore := InitTestDB(t)
var created *events.DataSourceCreated
bus.AddEventListener(func(ctx context.Context, e *events.DataSourceCreated) error {
sqlStore.bus.AddEventListener(func(ctx context.Context, e *events.DataSourceCreated) error {
created = e
return nil
})
@ -245,7 +244,7 @@ func TestIntegrationDataAccess(t *testing.T) {
ds := initDatasource(sqlStore)
var deleted *events.DataSourceDeleted
bus.AddEventListener(func(ctx context.Context, e *events.DataSourceDeleted) error {
sqlStore.bus.AddEventListener(func(ctx context.Context, e *events.DataSourceDeleted) error {
deleted = e
return nil
})

View File

@ -102,13 +102,13 @@ func isOrgNameTaken(name string, existingId int64, sess *DBSession) (bool, error
return false, nil
}
func createOrg(name string, userID int64, engine *xorm.Engine) (models.Org, error) {
func (ss *SQLStore) createOrg(ctx context.Context, name string, userID int64, engine *xorm.Engine) (models.Org, error) {
org := models.Org{
Name: name,
Created: time.Now(),
Updated: time.Now(),
}
if err := inTransactionWithRetryCtx(context.Background(), engine, func(sess *DBSession) error {
if err := inTransactionWithRetryCtx(ctx, engine, ss.bus, func(sess *DBSession) error {
if isNameTaken, err := isOrgNameTaken(name, 0, sess); err != nil {
return err
} else if isNameTaken {
@ -145,11 +145,11 @@ func createOrg(name string, userID int64, engine *xorm.Engine) (models.Org, erro
// CreateOrgWithMember creates an organization with a certain name and a certain user as member.
func (ss *SQLStore) CreateOrgWithMember(name string, userID int64) (models.Org, error) {
return createOrg(name, userID, ss.engine)
return ss.createOrg(context.Background(), name, userID, ss.engine)
}
func (ss *SQLStore) CreateOrg(ctx context.Context, cmd *models.CreateOrgCommand) error {
org, err := createOrg(cmd.Name, cmd.UserId, ss.engine)
org, err := ss.createOrg(ctx, cmd.Name, cmd.UserId, ss.engine)
if err != nil {
return err
}

View File

@ -5,7 +5,6 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/models"
"github.com/stretchr/testify/require"
)
@ -86,7 +85,7 @@ func TestIntegrationPluginSettings(t *testing.T) {
t.Run("UpdatePluginSetting should update existing plugin settings and publish PluginStateChangedEvent", func(t *testing.T) {
var pluginStateChangedEvent *models.PluginStateChangedEvent
bus.AddEventListener(func(_ context.Context, evt *models.PluginStateChangedEvent) error {
store.bus.AddEventListener(func(_ context.Context, evt *models.PluginStateChangedEvent) error {
pluginStateChangedEvent = evt
return nil
})
@ -145,7 +144,7 @@ func TestIntegrationPluginSettings(t *testing.T) {
t.Run("Non-existing plugin settings", func(t *testing.T) {
t.Run("UpdatePluginSetting should insert plugin settings and publish PluginStateChangedEvent", func(t *testing.T) {
var pluginStateChangedEvent *models.PluginStateChangedEvent
bus.AddEventListener(func(_ context.Context, evt *models.PluginStateChangedEvent) error {
store.bus.AddEventListener(func(_ context.Context, evt *models.PluginStateChangedEvent) error {
pluginStateChangedEvent = evt
return nil
})

View File

@ -15,6 +15,7 @@ import (
_ "github.com/lib/pq"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/fs"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log"
@ -43,6 +44,7 @@ type SQLStore struct {
Cfg *setting.Cfg
CacheService *localcache.CacheService
bus bus.Bus
dbCfg DatabaseConfig
engine *xorm.Engine
log log.Logger
@ -52,12 +54,12 @@ type SQLStore struct {
tracer tracing.Tracer
}
func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, migrations registry.DatabaseMigrator, tracer tracing.Tracer) (*SQLStore, error) {
func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, migrations registry.DatabaseMigrator, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) {
// This change will make xorm use an empty default schema for postgres and
// by that mimic the functionality of how it was functioning before
// xorm's changes above.
xorm.DefaultPostgresSchema = ""
s, err := newSQLStore(cfg, cacheService, nil, migrations, tracer)
s, err := newSQLStore(cfg, cacheService, nil, migrations, bus, tracer)
if err != nil {
return nil, err
}
@ -78,13 +80,14 @@ func ProvideServiceForTests(migrations registry.DatabaseMigrator) (*SQLStore, er
}
func newSQLStore(cfg *setting.Cfg, cacheService *localcache.CacheService, engine *xorm.Engine,
migrations registry.DatabaseMigrator, tracer tracing.Tracer, opts ...InitTestDBOpt) (*SQLStore, error) {
migrations registry.DatabaseMigrator, bus bus.Bus, tracer tracing.Tracer, opts ...InitTestDBOpt) (*SQLStore, error) {
ss := &SQLStore{
Cfg: cfg,
CacheService: cacheService,
log: log.New("sqlstore"),
skipEnsureDefaultOrgAndUser: false,
migrations: migrations,
bus: bus,
tracer: tracer,
}
for _, opt := range opts {
@ -543,7 +546,8 @@ func initTestDB(migration registry.DatabaseMigrator, opts ...InitTestDBOpt) (*SQ
if err != nil {
return nil, err
}
testSQLStore, err = newSQLStore(cfg, localcache.New(5*time.Minute, 10*time.Minute), engine, migration, tracer, opts...)
bus := bus.ProvideBus(tracer)
testSQLStore, err = newSQLStore(cfg, localcache.New(5*time.Minute, 10*time.Minute), engine, migration, bus, tracer, opts...)
if err != nil {
return nil, err
}

View File

@ -17,7 +17,7 @@ var tsclogger = log.New("sqlstore.transactions")
// WithTransactionalDbSession calls the callback with a session within a transaction.
func (ss *SQLStore) WithTransactionalDbSession(ctx context.Context, callback DBTransactionFunc) error {
return inTransactionWithRetryCtx(ctx, ss.engine, callback, 0)
return inTransactionWithRetryCtx(ctx, ss.engine, ss.bus, callback, 0)
}
func (ss *SQLStore) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
@ -25,17 +25,13 @@ func (ss *SQLStore) InTransaction(ctx context.Context, fn func(ctx context.Conte
}
func (ss *SQLStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
return inTransactionWithRetryCtx(ctx, ss.engine, func(sess *DBSession) error {
return inTransactionWithRetryCtx(ctx, ss.engine, ss.bus, func(sess *DBSession) error {
withValue := context.WithValue(ctx, ContextSessionKey{}, sess)
return fn(withValue)
}, retry)
}
func inTransactionWithRetry(callback DBTransactionFunc, engine *xorm.Engine, retry int) error {
return inTransactionWithRetryCtx(context.Background(), engine, callback, retry)
}
func inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, callback DBTransactionFunc, retry int) error {
func inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, bus bus.Bus, callback DBTransactionFunc, retry int) error {
sess, isNew, err := startSessionOrUseExisting(ctx, engine, true)
if err != nil {
return err
@ -67,7 +63,7 @@ func inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, callbac
time.Sleep(time.Millisecond * time.Duration(10))
sqlog.Info("Database locked, sleeping then retrying", "error", err, "retry", retry)
return inTransactionWithRetry(callback, engine, retry+1)
return inTransactionWithRetryCtx(ctx, engine, bus, callback, retry+1)
}
if err != nil {