diff --git a/pkg/services/alerting/notifiers/kafka.go b/pkg/services/alerting/notifiers/kafka.go
new file mode 100644
index 00000000000..52bbcd74b9e
--- /dev/null
+++ b/pkg/services/alerting/notifiers/kafka.go
@@ -0,0 +1,120 @@
+package notifiers
+
+import (
+ "strconv"
+
+ "fmt"
+
+ "github.com/grafana/grafana/pkg/bus"
+ "github.com/grafana/grafana/pkg/components/simplejson"
+ "github.com/grafana/grafana/pkg/log"
+ m "github.com/grafana/grafana/pkg/models"
+ "github.com/grafana/grafana/pkg/services/alerting"
+)
+
+func init() {
+ alerting.RegisterNotifier(&alerting.NotifierPlugin{
+ Type: "kafka",
+ Name: "Kafka REST Proxy",
+ Description: "Sends notifications to Kafka Rest Proxy",
+ Factory: NewKafkaNotifier,
+ OptionsTemplate: `
+
Kafka settings
+
+ Kafka REST Proxy
+
+
+
+ Topic
+
+
+ `,
+ })
+}
+
+func NewKafkaNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
+ endpoint := model.Settings.Get("kafkaRestProxy").MustString()
+ if endpoint == "" {
+ return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"}
+ }
+ topic := model.Settings.Get("kafkaTopic").MustString()
+ if topic == "" {
+ return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"}
+ }
+
+ return &KafkaNotifier{
+ NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings),
+ Endpoint: endpoint,
+ Topic: topic,
+ log: log.New("alerting.notifier.kafka"),
+ }, nil
+}
+
+type KafkaNotifier struct {
+ NotifierBase
+ Endpoint string
+ Topic string
+ log log.Logger
+}
+
+func (this *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error {
+
+ state := evalContext.Rule.State
+
+ customData := "Triggered metrics:\n\n"
+ for _, evt := range evalContext.EvalMatches {
+ customData = customData + fmt.Sprintf("%s: %v\n", evt.Metric, evt.Value)
+ }
+
+ this.log.Info("Notifying Kafka", "alert_state", state)
+
+ recordJSON := simplejson.New()
+ records := make([]interface{}, 1)
+
+ bodyJSON := simplejson.New()
+ bodyJSON.Set("description", evalContext.Rule.Name+" - "+evalContext.Rule.Message)
+ bodyJSON.Set("client", "Grafana")
+ bodyJSON.Set("details", customData)
+ bodyJSON.Set("incident_key", "alertId-"+strconv.FormatInt(evalContext.Rule.Id, 10))
+
+ ruleUrl, err := evalContext.GetRuleUrl()
+ if err != nil {
+ this.log.Error("Failed get rule link", "error", err)
+ return err
+ }
+ bodyJSON.Set("client_url", ruleUrl)
+
+ if evalContext.ImagePublicUrl != "" {
+ contexts := make([]interface{}, 1)
+ imageJSON := simplejson.New()
+ imageJSON.Set("type", "image")
+ imageJSON.Set("src", evalContext.ImagePublicUrl)
+ contexts[0] = imageJSON
+ bodyJSON.Set("contexts", contexts)
+ }
+
+ valueJSON := simplejson.New()
+ valueJSON.Set("value", bodyJSON)
+ records[0] = valueJSON
+ recordJSON.Set("records", records)
+ body, _ := recordJSON.MarshalJSON()
+
+ topicUrl := this.Endpoint+"/topics/"+this.Topic
+
+ cmd := &m.SendWebhookSync{
+ Url: topicUrl,
+ Body: string(body),
+ HttpMethod: "POST",
+ HttpHeader: map[string]string{
+ "Content-Type": "application/vnd.kafka.json.v2+json",
+ "Accept": "application/vnd.kafka.v2+json",
+ },
+ }
+
+ if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil {
+ this.log.Error("Failed to send notification to Kafka", "error", err, "body", string(body))
+ return err
+ }
+
+ return nil
+}
diff --git a/public/app/features/alerting/alert_tab_ctrl.ts b/public/app/features/alerting/alert_tab_ctrl.ts
index 677eec31060..25c23580ed7 100644
--- a/public/app/features/alerting/alert_tab_ctrl.ts
+++ b/public/app/features/alerting/alert_tab_ctrl.ts
@@ -94,6 +94,7 @@ export class AlertTabCtrl {
case "opsgenie": return "fa fa-bell";
case "hipchat": return "fa fa-mail-forward";
case "pushover": return "fa fa-mobile";
+ case "kafka": return "fa fa-random";
}
return 'fa fa-bell';
}