From d60339a9b144ca24983bec8d33668b4a52fa74a3 Mon Sep 17 00:00:00 2001 From: utkarshcmu Date: Mon, 9 Oct 2017 18:07:44 -0700 Subject: [PATCH 1/5] Kafka REST Proxy works with Grafana --- pkg/services/alerting/notifiers/kafka.go | 120 ++++++++++++++++++ .../app/features/alerting/alert_tab_ctrl.ts | 1 + 2 files changed, 121 insertions(+) create mode 100644 pkg/services/alerting/notifiers/kafka.go diff --git a/pkg/services/alerting/notifiers/kafka.go b/pkg/services/alerting/notifiers/kafka.go new file mode 100644 index 00000000000..52bbcd74b9e --- /dev/null +++ b/pkg/services/alerting/notifiers/kafka.go @@ -0,0 +1,120 @@ +package notifiers + +import ( + "strconv" + + "fmt" + + "github.com/grafana/grafana/pkg/bus" + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/log" + m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/alerting" +) + +func init() { + alerting.RegisterNotifier(&alerting.NotifierPlugin{ + Type: "kafka", + Name: "Kafka REST Proxy", + Description: "Sends notifications to Kafka Rest Proxy", + Factory: NewKafkaNotifier, + OptionsTemplate: ` +

Kafka settings

+
+ Kafka REST Proxy + +
+
+ Topic + +
+ `, + }) +} + +func NewKafkaNotifier(model *m.AlertNotification) (alerting.Notifier, error) { + endpoint := model.Settings.Get("kafkaRestProxy").MustString() + if endpoint == "" { + return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"} + } + topic := model.Settings.Get("kafkaTopic").MustString() + if topic == "" { + return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} + } + + return &KafkaNotifier{ + NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings), + Endpoint: endpoint, + Topic: topic, + log: log.New("alerting.notifier.kafka"), + }, nil +} + +type KafkaNotifier struct { + NotifierBase + Endpoint string + Topic string + log log.Logger +} + +func (this *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error { + + state := evalContext.Rule.State + + customData := "Triggered metrics:\n\n" + for _, evt := range evalContext.EvalMatches { + customData = customData + fmt.Sprintf("%s: %v\n", evt.Metric, evt.Value) + } + + this.log.Info("Notifying Kafka", "alert_state", state) + + recordJSON := simplejson.New() + records := make([]interface{}, 1) + + bodyJSON := simplejson.New() + bodyJSON.Set("description", evalContext.Rule.Name+" - "+evalContext.Rule.Message) + bodyJSON.Set("client", "Grafana") + bodyJSON.Set("details", customData) + bodyJSON.Set("incident_key", "alertId-"+strconv.FormatInt(evalContext.Rule.Id, 10)) + + ruleUrl, err := evalContext.GetRuleUrl() + if err != nil { + this.log.Error("Failed get rule link", "error", err) + return err + } + bodyJSON.Set("client_url", ruleUrl) + + if evalContext.ImagePublicUrl != "" { + contexts := make([]interface{}, 1) + imageJSON := simplejson.New() + imageJSON.Set("type", "image") + imageJSON.Set("src", evalContext.ImagePublicUrl) + contexts[0] = imageJSON + bodyJSON.Set("contexts", contexts) + } + + valueJSON := simplejson.New() + valueJSON.Set("value", bodyJSON) + records[0] = valueJSON + recordJSON.Set("records", records) + body, _ := recordJSON.MarshalJSON() + + topicUrl := this.Endpoint+"/topics/"+this.Topic + + cmd := &m.SendWebhookSync{ + Url: topicUrl, + Body: string(body), + HttpMethod: "POST", + HttpHeader: map[string]string{ + "Content-Type": "application/vnd.kafka.json.v2+json", + "Accept": "application/vnd.kafka.v2+json", + }, + } + + if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil { + this.log.Error("Failed to send notification to Kafka", "error", err, "body", string(body)) + return err + } + + return nil +} diff --git a/public/app/features/alerting/alert_tab_ctrl.ts b/public/app/features/alerting/alert_tab_ctrl.ts index 677eec31060..25c23580ed7 100644 --- a/public/app/features/alerting/alert_tab_ctrl.ts +++ b/public/app/features/alerting/alert_tab_ctrl.ts @@ -94,6 +94,7 @@ export class AlertTabCtrl { case "opsgenie": return "fa fa-bell"; case "hipchat": return "fa fa-mail-forward"; case "pushover": return "fa fa-mobile"; + case "kafka": return "fa fa-random"; } return 'fa fa-bell'; } From 138bee99ef3f7d7b78180e56455c694ca00252c2 Mon Sep 17 00:00:00 2001 From: utkarshcmu Date: Mon, 9 Oct 2017 18:12:12 -0700 Subject: [PATCH 2/5] Added tests --- pkg/services/alerting/notifiers/kafka_test.go | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 pkg/services/alerting/notifiers/kafka_test.go diff --git a/pkg/services/alerting/notifiers/kafka_test.go b/pkg/services/alerting/notifiers/kafka_test.go new file mode 100644 index 00000000000..977bea7cf17 --- /dev/null +++ b/pkg/services/alerting/notifiers/kafka_test.go @@ -0,0 +1,55 @@ +package notifiers + +import ( + "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + m "github.com/grafana/grafana/pkg/models" + . "github.com/smartystreets/goconvey/convey" +) + +func TestKafkaNotifier(t *testing.T) { + Convey("Kafka notifier tests", t, func() { + + Convey("Parsing alert notification from settings", func() { + Convey("empty settings should return error", func() { + json := `{ }` + + settingsJSON, _ := simplejson.NewJson([]byte(json)) + model := &m.AlertNotification{ + Name: "kafka_testing", + Type: "kafka", + Settings: settingsJSON, + } + + _, err := NewKafkaNotifier(model) + So(err, ShouldNotBeNil) + }) + + Convey("settings should send an event to kafka", func() { + json := ` + { + "kafkaEndpoint": "http://localhost:8082", + "kafkaTopic": "topic1" + }` + + settingsJSON, _ := simplejson.NewJson([]byte(json)) + model := &m.AlertNotification{ + Name: "kafka_testing", + Type: "kafka", + Settings: settingsJSON, + } + + not, err := NewKafkaNotifier(model) + kafkaNotifier := not.(*KafkaNotifier) + + So(err, ShouldBeNil) + So(kafkaNotifier.Name, ShouldEqual, "kafka_testing") + So(kafkaNotifier.Type, ShouldEqual, "kafka") + So(kafkaNotifier.Endpoint, ShouldEqual, "http://localhost:8082") + So(kafkaNotifier.Topic, ShouldEqual, "topic1") + }) + + }) + }) +} From a562dc7c2be3aa8430121b05bd2e43cb2175cfef Mon Sep 17 00:00:00 2001 From: utkarshcmu Date: Mon, 9 Oct 2017 18:12:39 -0700 Subject: [PATCH 3/5] gofmt fixes --- pkg/services/alerting/notifiers/kafka.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/services/alerting/notifiers/kafka.go b/pkg/services/alerting/notifiers/kafka.go index 52bbcd74b9e..92f6489106b 100644 --- a/pkg/services/alerting/notifiers/kafka.go +++ b/pkg/services/alerting/notifiers/kafka.go @@ -38,9 +38,9 @@ func NewKafkaNotifier(model *m.AlertNotification) (alerting.Notifier, error) { return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"} } topic := model.Settings.Get("kafkaTopic").MustString() - if topic == "" { - return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} - } + if topic == "" { + return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} + } return &KafkaNotifier{ NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings), @@ -52,9 +52,9 @@ func NewKafkaNotifier(model *m.AlertNotification) (alerting.Notifier, error) { type KafkaNotifier struct { NotifierBase - Endpoint string - Topic string - log log.Logger + Endpoint string + Topic string + log log.Logger } func (this *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error { @@ -99,16 +99,16 @@ func (this *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error { recordJSON.Set("records", records) body, _ := recordJSON.MarshalJSON() - topicUrl := this.Endpoint+"/topics/"+this.Topic + topicUrl := this.Endpoint + "/topics/" + this.Topic cmd := &m.SendWebhookSync{ Url: topicUrl, Body: string(body), HttpMethod: "POST", HttpHeader: map[string]string{ - "Content-Type": "application/vnd.kafka.json.v2+json", - "Accept": "application/vnd.kafka.v2+json", - }, + "Content-Type": "application/vnd.kafka.json.v2+json", + "Accept": "application/vnd.kafka.v2+json", + }, } if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil { From da47dc8947f57ed37a58e6b087c539b2a63b3eed Mon Sep 17 00:00:00 2001 From: utkarshcmu Date: Mon, 9 Oct 2017 18:56:05 -0700 Subject: [PATCH 4/5] Fixed failing go tests --- pkg/services/alerting/notifiers/kafka_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/alerting/notifiers/kafka_test.go b/pkg/services/alerting/notifiers/kafka_test.go index 977bea7cf17..045976cb14b 100644 --- a/pkg/services/alerting/notifiers/kafka_test.go +++ b/pkg/services/alerting/notifiers/kafka_test.go @@ -29,7 +29,7 @@ func TestKafkaNotifier(t *testing.T) { Convey("settings should send an event to kafka", func() { json := ` { - "kafkaEndpoint": "http://localhost:8082", + "kafkaRestProxy": "http://localhost:8082", "kafkaTopic": "topic1" }` From 8aff343ce333ebac672487e04a1aac54df238a8e Mon Sep 17 00:00:00 2001 From: utkarshcmu Date: Tue, 10 Oct 2017 02:11:02 -0700 Subject: [PATCH 5/5] Added docs for Kafka alerting --- docs/sources/alerting/notifications.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/sources/alerting/notifications.md b/docs/sources/alerting/notifications.md index de9e5abd472..102134f34dd 100644 --- a/docs/sources/alerting/notifications.md +++ b/docs/sources/alerting/notifications.md @@ -115,6 +115,17 @@ In DingTalk PC Client: Dingtalk supports the following "message type": `text`, `link` and `markdown`. Only the `text` message type is supported. +### Kafka + +Notifications can be sent to a Kafka topic from Grafana using [Kafka REST Proxy](https://docs.confluent.io/1.0/kafka-rest/docs/index.html). +There are couple of configurations options which need to be set in Grafana UI under Kafka Settings: + +1. Kafka REST Proxy endpoint. + +2. Kafka Topic. + +Once these two properties are set, you can send the alerts to Kafka for further processing or throttling them. + ### Other Supported Notification Channels Grafana also supports the following Notification Channels: