Alerting: Update Kafka receiver to use encoding/json to build messages (#60593)

Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
This commit is contained in:
Yuri Tseretyan 2022-12-20 14:20:09 -05:00 committed by GitHub
parent 3e52d5e72b
commit aaa55b4252
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,10 +14,32 @@ import (
"github.com/grafana/alerting/alerting/notifier/channels"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
)
type kafkaBody struct {
Records []kafkaRecordEnvelope `json:"records"`
}
type kafkaRecordEnvelope struct {
Value kafkaRecord `json:"value"`
}
type kafkaRecord struct {
Description string `json:"description"`
Client string `json:"client,omitempty"`
Details string `json:"details,omitempty"`
AlertState models.AlertStateType `json:"alert_state,omitempty"`
ClientURL string `json:"client_url,omitempty"`
Contexts []kafkaContext `json:"contexts,omitempty"`
IncidentKey string `json:"incident_key,omitempty"`
}
type kafkaContext struct {
Type string `json:"type"`
Source string `json:"src"`
}
// KafkaNotifier is responsible for sending
// alert notifications to Kafka.
type KafkaNotifier struct {
@ -125,36 +147,36 @@ func (kn *KafkaNotifier) SendResolved() bool {
}
func (kn *KafkaNotifier) buildBody(ctx context.Context, tmpl func(string) string, as ...*types.Alert) (string, error) {
bodyJSON := simplejson.New()
bodyJSON.Set("client", "Grafana")
bodyJSON.Set("description", tmpl(kn.settings.Description))
bodyJSON.Set("details", tmpl(kn.settings.Details))
var record kafkaRecord
record.Client = "Grafana"
record.Description = tmpl(kn.settings.Description)
record.Details = tmpl(kn.settings.Details)
state := buildState(as...)
kn.log.Debug("notifying Kafka", "alert_state", state)
bodyJSON.Set("alert_state", state)
record.AlertState = state
ruleURL := joinUrlPath(kn.tmpl.ExternalURL.String(), "/alerting/list", kn.log)
bodyJSON.Set("client_url", ruleURL)
record.ClientURL = ruleURL
contexts := buildContextImages(ctx, kn.log, kn.images, as...)
if len(contexts) > 0 {
bodyJSON.Set("contexts", contexts)
record.Contexts = contexts
}
groupKey, err := notify.ExtractGroupKey(ctx)
if err != nil {
return "", err
}
bodyJSON.Set("incident_key", groupKey.Hash())
record.IncidentKey = groupKey.Hash()
valueJSON := simplejson.New()
valueJSON.Set("value", bodyJSON)
records := kafkaBody{
Records: []kafkaRecordEnvelope{
{Value: record},
},
}
recordJSON := simplejson.New()
recordJSON.Set("records", []interface{}{valueJSON})
body, err := recordJSON.MarshalJSON()
body, err := json.Marshal(records)
if err != nil {
return "", err
}
@ -170,15 +192,15 @@ func buildState(as ...*types.Alert) models.AlertStateType {
return models.AlertStateAlerting
}
func buildContextImages(ctx context.Context, l channels.Logger, imageStore channels.ImageStore, as ...*types.Alert) []interface{} {
var contexts []interface{}
func buildContextImages(ctx context.Context, l channels.Logger, imageStore channels.ImageStore, as ...*types.Alert) []kafkaContext {
var contexts []kafkaContext
_ = withStoredImages(ctx, l, imageStore,
func(_ int, image channels.Image) error {
if image.URL != "" {
imageJSON := simplejson.New()
imageJSON.Set("type", "image")
imageJSON.Set("src", image.URL)
contexts = append(contexts, imageJSON)
contexts = append(contexts, kafkaContext{
Type: "image",
Source: image.URL,
})
}
return nil
}, as...)