diff --git a/conf/grafana.ini b/conf/grafana.ini index 5f70bf3f9da..2d643191b4a 100644 --- a/conf/grafana.ini +++ b/conf/grafana.ini @@ -121,7 +121,7 @@ daily_rotate = true ; Expired days of log file(delete after max days), default is 7 max_days = 7 -[notifications] +[event_publisher] enabled = false rabbitmq_url = amqp://localhost/ -notifications_exchange = notifications +exchange = grafana_events diff --git a/grafana b/grafana index 0fe83d51981..07ec00641fb 160000 --- a/grafana +++ b/grafana @@ -1 +1 @@ -Subproject commit 0fe83d51981333600f1e3801044fc1cfd5acf1ae +Subproject commit 07ec00641fb6d633dc2914ff433e61db1ef8a313 diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index f6c313b91a2..2865c740fd1 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -11,13 +11,16 @@ type Msg interface{} type Bus interface { Dispatch(msg Msg) error Publish(msg Msg) error + AddHandler(handler HandlerFunc) AddEventListener(handler HandlerFunc) + AddWildcardListener(handler HandlerFunc) } type InProcBus struct { - handlers map[string]HandlerFunc - listeners map[string][]HandlerFunc + handlers map[string]HandlerFunc + listeners map[string][]HandlerFunc + wildcardListeners []HandlerFunc } // temp stuff, not sure how to handle bus instance, and init yet @@ -27,6 +30,7 @@ func New() Bus { bus := &InProcBus{} bus.handlers = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) + bus.wildcardListeners = make([]HandlerFunc, 0) return bus } @@ -53,9 +57,6 @@ func (b *InProcBus) Dispatch(msg Msg) error { func (b *InProcBus) Publish(msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() var listeners = b.listeners[msgName] - if len(listeners) == 0 { - return nil - } var params = make([]reflect.Value, 1) params[0] = reflect.ValueOf(msg) @@ -68,9 +69,21 @@ func (b *InProcBus) Publish(msg Msg) error { } } + for _, listenerHandler := range b.wildcardListeners { + ret := reflect.ValueOf(listenerHandler).Call(params) + err := ret[0].Interface() + if err != nil { + return err.(error) + } + } + return nil } +func (b *InProcBus) AddWildcardListener(handler HandlerFunc) { + b.wildcardListeners = append(b.wildcardListeners, handler) +} + func (b *InProcBus) AddHandler(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) queryTypeName := handlerType.In(0).Elem().Name() @@ -97,10 +110,14 @@ func AddEventListener(handler HandlerFunc) { globalBus.AddEventListener(handler) } +func AddWildcardListener(handler HandlerFunc) { + globalBus.AddWildcardListener(handler) +} + func Dispatch(msg Msg) error { return globalBus.Dispatch(msg) } func Publish(msg Msg) error { return globalBus.Publish(msg) -} \ No newline at end of file +} diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 45191eeb682..62e72f18308 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -2,6 +2,7 @@ package bus import ( "errors" + "fmt" "testing" ) @@ -62,7 +63,7 @@ func TestEventListeners(t *testing.T) { if err != nil { t.Fatal("Publish event failed " + err.Error()) - } else if count != 0 { - t.Fatal("Publish event failed, listeners called: %v, expected: %v", count, 11) + } else if count != 11 { + t.Fatal(fmt.Sprintf("Publish event failed, listeners called: %v, expected: %v", count, 11)) } } diff --git a/pkg/cmd/web.go b/pkg/cmd/web.go index caef2e9f64d..dfc5ff41bb3 100644 --- a/pkg/cmd/web.go +++ b/pkg/cmd/web.go @@ -16,10 +16,10 @@ import ( "github.com/torkelo/grafana-pro/pkg/api" "github.com/torkelo/grafana-pro/pkg/log" "github.com/torkelo/grafana-pro/pkg/middleware" + "github.com/torkelo/grafana-pro/pkg/services/eventpublisher" "github.com/torkelo/grafana-pro/pkg/services/sqlstore" "github.com/torkelo/grafana-pro/pkg/setting" "github.com/torkelo/grafana-pro/pkg/social" - "github.com/torkelo/grafana-pro/pkg/services/notification" ) var CmdWeb = cli.Command{ @@ -82,13 +82,9 @@ func runWeb(c *cli.Context) { social.NewOAuthService() sqlstore.NewEngine() sqlstore.EnsureAdminUser() + eventpublisher.Init() + var err error - if setting.NotificationsEnabled { - err = notification.Init(setting.RabbitmqUrl, setting.NotificationsExchange) - if err != nil { - log.Fatal(4, "Failed to connect to notification queue: %v", err) - } - } m := newMacaron() api.Register(m) diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 00000000000..a86af406230 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,8 @@ +package events + +// Events can be passed to external systems via for example AMPQ +// Treat these events as basically DTOs so changes has to be backward compatible + +type AccountCreated struct { + Name string `json:"name"` +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 00000000000..f527569d0cf --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,18 @@ +package events + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestEventCreation(t *testing.T) { + + Convey("When generating slug", t, func() { + dashboard := NewDashboard("Grafana Play Home") + dashboard.UpdateSlug() + + So(dashboard.Slug, ShouldEqual, "grafana-play-home") + }) + +} diff --git a/pkg/services/notification/notification.go b/pkg/services/eventpublisher/eventpublisher.go similarity index 60% rename from pkg/services/notification/notification.go rename to pkg/services/eventpublisher/eventpublisher.go index 833fd29bf43..59c03f3c09a 100644 --- a/pkg/services/notification/notification.go +++ b/pkg/services/eventpublisher/eventpublisher.go @@ -1,25 +1,28 @@ -package notification +package eventpublisher import ( - "fmt" - "time" "encoding/json" + "fmt" + "log" + "reflect" + "time" + "github.com/streadway/amqp" "github.com/torkelo/grafana-pro/pkg/bus" - m "github.com/torkelo/grafana-pro/pkg/models" + "github.com/torkelo/grafana-pro/pkg/setting" ) var ( - url string + url string exchange string - conn *amqp.Connection - channel *amqp.Channel + conn *amqp.Connection + channel *amqp.Channel ) func getConnection() (*amqp.Connection, error) { c, err := amqp.Dial(url) if err != nil { - return nil, err + return nil, err } return c, err } @@ -31,25 +34,35 @@ func getChannel() (*amqp.Channel, error) { } err = ch.ExchangeDeclare( - exchange, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments + exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) - if (err != nil) { + if err != nil { return nil, err } return ch, err } -func Init(rabbitUrl string, exchangeName string) error { - url = rabbitUrl - exchange = exchangeName - bus.AddEventListener(NotificationHandler) - return Setup() +func Init() { + sec := setting.Cfg.Section("event_publisher") + + if !sec.Key("enabled").MustBool(false) { + return + } + + url = sec.Key("rabbitmq_url").String() + exchange = sec.Key("exchange").String() + bus.AddWildcardListener(eventListener) + + if err := Setup(); err != nil { + log.Fatal(4, "Failed to connect to notification queue: %v", err) + return + } } // Every connection should declare the topology they expect @@ -82,7 +95,7 @@ func Setup() error { //could not create channel, so lets close the connection // and re-create. _ = conn.Close() - + for err != nil { time.Sleep(2 * time.Second) fmt.Println("attempting to reconnect to rabbitmq.") @@ -92,39 +105,42 @@ func Setup() error { } }() - return nil + return nil } -func Publish(routingKey string, msgString []byte) { +func publish(routingKey string, msgString []byte) { err := channel.Publish( - exchange, //exchange - routingKey, // routing key - false, // mandatory + exchange, //exchange + routingKey, // routing key + false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", - Body: msgString, + Body: msgString, }, ) if err != nil { // failures are most likely because the connection was lost. - // the connection will be re-established, so just keep + // the connection will be re-established, so just keep // retrying every 2seconds until we successfully publish. time.Sleep(2 * time.Second) - fmt.Println("publish failed, retrying."); - Publish(routingKey, msgString) + fmt.Println("publish failed, retrying.") + publish(routingKey, msgString) } return } -func NotificationHandler(event *m.Notification) error { +func eventListener(event interface{}) error { msgString, err := json.Marshal(event) if err != nil { return err } - routingKey := fmt.Sprintf("%s.%s", event.Priority, event.EventType) + + eventType := reflect.TypeOf(event) + + routingKey := fmt.Sprintf("%s.%s", "INFO", eventType.Name()) // this is run in a greenthread and we expect that publish will keep // retrying until the message gets sent. - go Publish(routingKey, msgString) + go publish(routingKey, msgString) return nil -} \ No newline at end of file +} diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 11cde674268..4fddee071c0 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -6,6 +6,7 @@ import ( "github.com/go-xorm/xorm" "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" m "github.com/torkelo/grafana-pro/pkg/models" ) @@ -48,7 +49,7 @@ func GetAccountByName(query *m.GetAccountByNameQuery) error { } func CreateAccount(cmd *m.CreateAccountCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { account := m.Account{ Name: cmd.Name, @@ -60,7 +61,6 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { return err } - // create inital admin account user user := m.AccountUser{ AccountId: account.Id, UserId: cmd.UserId, @@ -72,6 +72,8 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { _, err := sess.Insert(&user) cmd.Result = account + sess.publishAfterCommit(&events.AccountCreated{}) + // silently ignore failures to publish events. _ = bus.Publish(&m.Notification{ EventType: "account.create", @@ -79,7 +81,7 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { Priority: m.PRIO_INFO, Payload: account, }) - + return err }) } diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/shared.go new file mode 100644 index 00000000000..51300167757 --- /dev/null +++ b/pkg/services/sqlstore/shared.go @@ -0,0 +1,71 @@ +package sqlstore + +import ( + "github.com/go-xorm/xorm" + "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/log" +) + +type dbTransactionFunc func(sess *xorm.Session) error +type dbTransactionFunc2 func(sess *session) error + +type session struct { + *xorm.Session + events []interface{} +} + +func (sess *session) publishAfterCommit(msg interface{}) { + sess.events = append(sess.events, msg) +} + +func inTransaction(callback dbTransactionFunc) error { + var err error + + sess := x.NewSession() + defer sess.Close() + + if err = sess.Begin(); err != nil { + return err + } + + err = callback(sess) + + if err != nil { + sess.Rollback() + return err + } else if err = sess.Commit(); err != nil { + return err + } + + return nil +} + +func inTransaction2(callback dbTransactionFunc2) error { + var err error + + sess := session{Session: x.NewSession()} + + defer sess.Close() + if err = sess.Begin(); err != nil { + return err + } + + err = callback(&sess) + + if err != nil { + sess.Rollback() + return err + } else if err = sess.Commit(); err != nil { + return err + } + + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + log.Error(3, "Failed to publish event after commit", err) + } + } + } + + return nil +} diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index f3968b4923d..9d1bef8106f 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -149,27 +149,3 @@ func LoadConfig() { DbCfg.SslMode = sec.Key("ssl_mode").String() DbCfg.Path = sec.Key("path").MustString("data/grafana.db") } - -type dbTransactionFunc func(sess *xorm.Session) error - -func inTransaction(callback dbTransactionFunc) error { - var err error - - sess := x.NewSession() - defer sess.Close() - - if err = sess.Begin(); err != nil { - return err - } - - err = callback(sess) - - if err != nil { - sess.Rollback() - return err - } else if err = sess.Commit(); err != nil { - return err - } - - return nil -} diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 64df98603b0..50a78c8a793 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -93,12 +93,6 @@ var ( // PhantomJs Rendering ImagesDir string PhantomDir string - - //Notifications - NotificationsEnabled bool - RabbitmqUrl string - NotificationsExchange string - ) func init() { @@ -228,7 +222,7 @@ func NewConfigContext() { // Notifications NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false) RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/") - + // validate rabbitmqUrl. _, err = url.Parse(RabbitmqUrl) if err != nil {