mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
notifications: Fix confused trace spans in SMTP client (#90559)
Signed-off-by: Dave Henderson <dave.henderson@grafana.com>
This commit is contained in:
parent
418b077c59
commit
05b66aac5f
1
go.mod
1
go.mod
@ -124,6 +124,7 @@ require (
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // @grafana/alerting-backend
|
||||
github.com/microsoft/go-mssqldb v1.7.0 // @grafana/grafana-bi-squad
|
||||
github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c //@grafana/identity-access-team
|
||||
github.com/mocktools/go-smtp-mock/v2 v2.3.0 // @grafana/grafana-backend-group
|
||||
github.com/modern-go/reflect2 v1.0.2 // @grafana/alerting-backend
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // @grafana/alerting-backend
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // @grafana/grafana-operator-experience-squad
|
||||
|
2
go.sum
2
go.sum
@ -2835,6 +2835,8 @@ github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw=
|
||||
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
||||
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
|
||||
github.com/mocktools/go-smtp-mock/v2 v2.3.0 h1:jgTDBEoQ8Kpw/fPWxy6qR2pGwtNn5j01T3Wut4xJo5Y=
|
||||
github.com/mocktools/go-smtp-mock/v2 v2.3.0/go.mod h1:n8aNpDYncZHH/cZHtJKzQyeYT/Dut00RghVM+J1Ed94=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
|
@ -14,11 +14,11 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
gomail "gopkg.in/mail.v2"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@ -47,40 +47,47 @@ func (sc *SmtpClient) Send(ctx context.Context, messages ...*Message) (int, erro
|
||||
defer span.End()
|
||||
|
||||
sentEmailsCount := 0
|
||||
|
||||
dialer, err := sc.createDialer()
|
||||
if err != nil {
|
||||
return sentEmailsCount, err
|
||||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
span.SetAttributes(
|
||||
attribute.String("smtp.sender", msg.From),
|
||||
attribute.StringSlice("smtp.recipients", msg.To),
|
||||
)
|
||||
|
||||
m := sc.buildEmail(ctx, msg)
|
||||
|
||||
innerError := dialer.DialAndSend(m)
|
||||
emailsSentTotal.Inc()
|
||||
if innerError != nil {
|
||||
// As gomail does not returned typed errors we have to parse the error
|
||||
// to catch invalid error when the address is invalid.
|
||||
// https://github.com/go-gomail/gomail/blob/81ebce5c23dfd25c6c67194b37d3dd3f338c98b1/send.go#L113
|
||||
if !strings.HasPrefix(innerError.Error(), "gomail: invalid address") {
|
||||
emailsSentFailed.Inc()
|
||||
}
|
||||
|
||||
err = fmt.Errorf("failed to send email: %w", innerError)
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
|
||||
continue
|
||||
err := sc.sendMessage(ctx, dialer, msg)
|
||||
if err != nil {
|
||||
return sentEmailsCount, err
|
||||
}
|
||||
|
||||
sentEmailsCount++
|
||||
}
|
||||
|
||||
return sentEmailsCount, err
|
||||
return sentEmailsCount, nil
|
||||
}
|
||||
|
||||
func (sc *SmtpClient) sendMessage(ctx context.Context, dialer *gomail.Dialer, msg *Message) error {
|
||||
ctx, span := tracer.Start(ctx, "notifications.SmtpClient.sendMessage", trace.WithAttributes(
|
||||
attribute.String("smtp.sender", msg.From),
|
||||
attribute.StringSlice("smtp.recipients", msg.To),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
m := sc.buildEmail(ctx, msg)
|
||||
|
||||
err := dialer.DialAndSend(m)
|
||||
emailsSentTotal.Inc()
|
||||
if err != nil {
|
||||
// As gomail does not returned typed errors we have to parse the error
|
||||
// to catch invalid error when the address is invalid.
|
||||
// https://github.com/go-gomail/gomail/blob/81ebce5c23dfd25c6c67194b37d3dd3f338c98b1/send.go#L113
|
||||
if !strings.HasPrefix(err.Error(), "gomail: invalid address") {
|
||||
emailsSentFailed.Inc()
|
||||
}
|
||||
|
||||
return tracing.Errorf(span, "failed to send email: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildEmail converts the Message DTO to a gomail message.
|
||||
|
@ -1,11 +1,17 @@
|
||||
package notifications
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/textproto"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
smtpmock "github.com/mocktools/go-smtp-mock/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@ -142,3 +148,185 @@ func TestSmtpDialer(t *testing.T) {
|
||||
require.EqualError(t, err, "could not load cert or key file: open /var/certs/does-not-exist.pem: no such file or directory")
|
||||
})
|
||||
}
|
||||
|
||||
func TestSmtpSend(t *testing.T) {
|
||||
srv := smtpmock.New(smtpmock.ConfigurationAttr{
|
||||
MultipleRcptto: true,
|
||||
})
|
||||
require.NoError(t, srv.Start())
|
||||
defer func() { _ = srv.Stop() }()
|
||||
|
||||
cfg := createSmtpConfig()
|
||||
cfg.Smtp.Host = fmt.Sprintf("127.0.0.1:%d", srv.PortNumber())
|
||||
cfg.Smtp.EnableTracing = true
|
||||
|
||||
client, err := NewSmtpClient(cfg.Smtp)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("single message sends", func(t *testing.T) {
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
ctx, span := tracer.Start(ctx, "notifications.SmtpClient.SendContext")
|
||||
defer span.End()
|
||||
|
||||
message := &Message{
|
||||
From: "from@example.com",
|
||||
To: []string{"rcpt@example.com"},
|
||||
Subject: "subject",
|
||||
Body: map[string]string{"text/plain": "hello world"},
|
||||
}
|
||||
|
||||
count, err := client.Send(ctx, message)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
messages := srv.MessagesAndPurge()
|
||||
require.Len(t, messages, 1)
|
||||
sentMsg := messages[0]
|
||||
|
||||
// read the headers
|
||||
r := bufio.NewReader(strings.NewReader(sentMsg.MsgRequest()))
|
||||
mimeReader := textproto.NewReader(r)
|
||||
hdr, err := mimeReader.ReadMIMEHeader()
|
||||
require.NoError(t, err)
|
||||
|
||||
// make sure the trace is propagated
|
||||
traceId := span.SpanContext().TraceID().String()
|
||||
hasPrefix := strings.HasPrefix(hdr.Get("traceparent"), "00-"+traceId+"-")
|
||||
require.True(t, hasPrefix)
|
||||
|
||||
// one of the lines should be the body we expect!
|
||||
found := false
|
||||
for {
|
||||
line, err := mimeReader.ReadLine()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("line: %q", line)
|
||||
if strings.Contains(line, "hello world") {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, found)
|
||||
})
|
||||
|
||||
t.Run("multiple recipients, single message", func(t *testing.T) {
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
ctx, span := tracer.Start(ctx, "notifications.SmtpClient.SendContext")
|
||||
defer span.End()
|
||||
|
||||
message := &Message{
|
||||
From: "from@example.com",
|
||||
To: []string{"rcpt1@example.com", "rcpt2@example.com", "rcpt3@example.com"},
|
||||
Subject: "subject",
|
||||
Body: map[string]string{"text/plain": "hello world"},
|
||||
}
|
||||
|
||||
count, err := client.Send(ctx, message)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
messages := srv.MessagesAndPurge()
|
||||
require.Len(t, messages, 1)
|
||||
sentMsg := messages[0]
|
||||
|
||||
rcpts := sentMsg.RcpttoRequestResponse()
|
||||
require.EqualValues(t, [][]string{
|
||||
{"RCPT TO:<rcpt1@example.com>", "250 Received"},
|
||||
{"RCPT TO:<rcpt2@example.com>", "250 Received"},
|
||||
{"RCPT TO:<rcpt3@example.com>", "250 Received"},
|
||||
}, rcpts)
|
||||
|
||||
// read the headers
|
||||
r := bufio.NewReader(strings.NewReader(sentMsg.MsgRequest()))
|
||||
mimeReader := textproto.NewReader(r)
|
||||
hdr, err := mimeReader.ReadMIMEHeader()
|
||||
require.NoError(t, err)
|
||||
|
||||
// make sure the trace is propagated
|
||||
traceId := span.SpanContext().TraceID().String()
|
||||
hasPrefix := strings.HasPrefix(hdr.Get("traceparent"), "00-"+traceId+"-")
|
||||
require.True(t, hasPrefix)
|
||||
|
||||
// one of the lines should be the body we expect!
|
||||
found := false
|
||||
for {
|
||||
line, err := mimeReader.ReadLine()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("line: %q", line)
|
||||
if strings.Contains(line, "hello world") {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, found)
|
||||
})
|
||||
|
||||
t.Run("multiple recipients, multiple messages", func(t *testing.T) {
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
ctx, span := tracer.Start(ctx, "notifications.SmtpClient.SendContext")
|
||||
defer span.End()
|
||||
|
||||
msgs := []*Message{
|
||||
{From: "from@example.com", To: []string{"rcpt1@example.com"},
|
||||
Subject: "subject", Body: map[string]string{"text/plain": "hello world"}},
|
||||
{From: "from@example.com", To: []string{"rcpt2@example.com"},
|
||||
Subject: "subject", Body: map[string]string{"text/plain": "hello world"}},
|
||||
{From: "from@example.com", To: []string{"rcpt3@example.com"},
|
||||
Subject: "subject", Body: map[string]string{"text/plain": "hello world"}},
|
||||
}
|
||||
|
||||
count, err := client.Send(ctx, msgs...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, count)
|
||||
|
||||
messages := srv.MessagesAndPurge()
|
||||
require.Len(t, messages, 3)
|
||||
|
||||
for i, sentMsg := range messages {
|
||||
rcpts := sentMsg.RcpttoRequestResponse()
|
||||
require.EqualValues(t, [][]string{
|
||||
{fmt.Sprintf("RCPT TO:<rcpt%d@example.com>", i+1), "250 Received"},
|
||||
}, rcpts)
|
||||
|
||||
// read the headers
|
||||
r := bufio.NewReader(strings.NewReader(sentMsg.MsgRequest()))
|
||||
mimeReader := textproto.NewReader(r)
|
||||
hdr, err := mimeReader.ReadMIMEHeader()
|
||||
require.NoError(t, err)
|
||||
|
||||
// make sure the trace is propagated
|
||||
traceId := span.SpanContext().TraceID().String()
|
||||
hasPrefix := strings.HasPrefix(hdr.Get("traceparent"), "00-"+traceId+"-")
|
||||
require.True(t, hasPrefix)
|
||||
|
||||
// one of the lines should be the body we expect!
|
||||
found := false
|
||||
for {
|
||||
line, err := mimeReader.ReadLine()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("line: %q", line)
|
||||
if strings.Contains(line, "hello world") {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, found)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user