Alerting: Add support for running HA using Redis (#65267)

Co-authored-by: Steve Simpson <steve.simpson@grafana.com>
Jean-Philippe Quéméner 2023-04-19 17:05:26 +02:00 committed by GitHub
parent 33186e3e23
commit bc11a484ed
9 changed files with 806 additions and 8 deletions


@@ -962,6 +962,26 @@ admin_config_poll_interval = 60s
# The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m.
alertmanager_config_poll_interval = 60s
# The redis server address that should be connected to.
ha_redis_address =
# The username that should be used to authenticate with the redis server.
ha_redis_username =
# The password that should be used to authenticate with the redis server.
ha_redis_password =
# The redis database; the default is 0.
ha_redis_db =
# A prefix that is used for every key or channel that is created on the redis server
# as part of HA for alerting.
ha_redis_prefix =
# The name of the cluster peer that will be used as an identifier. If none is
# provided, a random one will be generated.
ha_redis_peer_name =
# Listen address/hostname and port to receive unified alerting messages for other Grafana instances. The port is used for both TCP and UDP. It is assumed other Grafana instances are also running on the same port.
ha_listen_address = "0.0.0.0:9094"


@@ -931,6 +931,26 @@
# The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m.
;alertmanager_config_poll_interval = 60s
# The redis server address that should be connected to.
;ha_redis_address =
# The username that should be used to authenticate with the redis server.
;ha_redis_username =
# The password that should be used to authenticate with the redis server.
;ha_redis_password =
# The redis database; the default is 0.
;ha_redis_db =
# A prefix that is used for every key or channel that is created on the redis server
# as part of HA for alerting.
;ha_redis_prefix =
# The name of the cluster peer that will be used as an identifier. If none is
# provided, a random one will be generated.
;ha_redis_peer_name =
# Listen address/hostname and port to receive unified alerting messages for other Grafana instances. The port is used for both TCP and UDP. It is assumed other Grafana instances are also running on the same port. The default value is `0.0.0.0:9094`.
;ha_listen_address = "0.0.0.0:9094"
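
As a quick way to sanity-check the connection settings above, here is a minimal, hypothetical Go snippet (not part of this commit) using go-redis v9, the same client library the Redis peer uses. The address and credentials are placeholders that mirror the ha_redis_* keys:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // ha_redis_address (placeholder)
		Username: "",               // ha_redis_username
		Password: "",               // ha_redis_password
		DB:       0,                // ha_redis_db
	})
	// The Redis peer performs the same ping on startup and refuses to start if it fails.
	if err := rdb.Ping(context.Background()).Err(); err != nil {
		fmt.Println("redis not reachable:", err)
		return
	}
	fmt.Println("redis reachable")
}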

go.mod

@@ -281,6 +281,9 @@ require (
github.com/Azure/go-ntlmssp v0.0.0-20220621081337-cb9428e4ac1e // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis v2.5.0+incompatible // indirect
github.com/alicebob/miniredis/v2 v2.30.1 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bmatcuk/doublestar v1.1.1 // indirect
@@ -320,6 +323,7 @@
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/redis/go-redis/v9 v9.0.2 // indirect
github.com/rivo/uniseg v0.3.4 // indirect
github.com/rueian/rueidis v0.0.100-go1.18 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
@@ -332,6 +336,7 @@
github.com/unknwon/log v0.0.0-20150304194804-e617c87089d3 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.starlark.net v0.0.0-20221020143700-22309ac47eac // indirect
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect

go.sum

@@ -287,6 +287,12 @@ github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a/go.mod h1:OMCwj8V
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alicebob/miniredis/v2 v2.30.1 h1:HM1rlQjq1bm9yQcsawJqSZBJ9AYgxvjkMsNtddh90+g=
github.com/alicebob/miniredis/v2 v2.30.1/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
@@ -2047,6 +2053,8 @@ github.com/protocolbuffers/txtpbfmt v0.0.0-20220428173112-74888fd59c2b h1:zd/2RN
github.com/protocolbuffers/txtpbfmt v0.0.0-20220428173112-74888fd59c2b/go.mod h1:KjY0wibdYKc4DYkerHSbguaf3JeIPGhNJBp2BNiFH78=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84=
@@ -2333,6 +2341,10 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs=
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA=
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
@@ -2748,6 +2760,7 @@ golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190102155601-82a175fd1598/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=


@@ -72,14 +72,44 @@ func NewMultiOrgAlertmanager(cfg *setting.Cfg, configStore AlertingStore, orgSto
decryptFn: decryptFn,
metrics: m,
ns: ns,
peer: &NilPeer{},
}
if err := moa.setupClustering(cfg); err != nil {
return nil, err
}
return moa, nil
}
clusterLogger := l.New("component", "cluster")
moa.peer = &NilPeer{}
func (moa *MultiOrgAlertmanager) setupClustering(cfg *setting.Cfg) error {
clusterLogger := moa.logger.New("component", "clustering")
// We set the settlement timeout to be a multiple of the gossip interval,
// ensuring that a sufficient number of broadcasts have occurred, thereby
// increasing the probability of success when waiting for the cluster to settle.
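// With the upstream default gossip interval (200ms at the time of writing),
// this amounts to a settle timeout of two seconds.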
const settleTimeout = cluster.DefaultGossipInterval * 10
// Redis setup.
if cfg.UnifiedAlerting.HARedisAddr != "" {
redisPeer, err := newRedisPeer(redisConfig{
addr: cfg.UnifiedAlerting.HARedisAddr,
name: cfg.UnifiedAlerting.HARedisPeerName,
prefix: cfg.UnifiedAlerting.HARedisPrefix,
password: cfg.UnifiedAlerting.HARedisPassword,
username: cfg.UnifiedAlerting.HARedisUsername,
db: cfg.UnifiedAlerting.HARedisDB,
}, clusterLogger, moa.metrics.Registerer, cfg.UnifiedAlerting.HAPushPullInterval)
if err != nil {
return fmt.Errorf("unable to initialize redis: %w", err)
}
var ctx context.Context
ctx, moa.settleCancel = context.WithTimeout(context.Background(), 30*time.Second)
go redisPeer.Settle(ctx, settleTimeout)
moa.peer = redisPeer
return nil
}
// Memberlist setup.
if len(cfg.UnifiedAlerting.HAPeers) > 0 {
peer, err := cluster.Create(
clusterLogger,
m.Registerer,
moa.metrics.Registerer,
cfg.UnifiedAlerting.HAListenAddr,
cfg.UnifiedAlerting.HAAdvertiseAddr,
cfg.UnifiedAlerting.HAPeers, // peers
@@ -94,22 +124,22 @@ func NewMultiOrgAlertmanager(cfg *setting.Cfg, configStore AlertingStore, orgSto
)
if err != nil {
return nil, fmt.Errorf("unable to initialize gossip mesh: %w", err)
return fmt.Errorf("unable to initialize gossip mesh: %w", err)
}
err = peer.Join(cluster.DefaultReconnectInterval, cluster.DefaultReconnectTimeout)
if err != nil {
l.Error("msg", "unable to join gossip mesh while initializing cluster for high availability mode", "error", err)
moa.logger.Error("msg", "unable to join gossip mesh while initializing cluster for high availability mode", "error", err)
}
// Attempt to verify the number of peers for 30s every 2s. The risk here is that we send a notification "too soon",
// which should _never_ happen given we share the notification log via the database, so the risk of double notification is very low.
var ctx context.Context
ctx, moa.settleCancel = context.WithTimeout(context.Background(), 30*time.Second)
go peer.Settle(ctx, cluster.DefaultGossipInterval*10)
go peer.Settle(ctx, settleTimeout)
moa.peer = peer
return nil
}
return moa, nil
return nil
}
func (moa *MultiOrgAlertmanager) Run(ctx context.Context) error {
@@ -308,6 +338,11 @@ func (moa *MultiOrgAlertmanager) StopAndWait() {
moa.logger.Warn("unable to leave the gossip mesh", "error", err)
}
}
r, ok := moa.peer.(*redisPeer)
if ok {
moa.settleCancel()
r.Shutdown()
}
}
// AlertmanagerFor returns the Alertmanager instance for the organization provided.


@@ -0,0 +1,65 @@
package notifier
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/cluster/clusterpb"
)
type RedisChannel struct {
p *redisPeer
key string
channel string
msgType string
msgc chan []byte
}
func newRedisChannel(p *redisPeer, key, channel, msgType string) cluster.ClusterChannel {
redisChannel := &RedisChannel{
p: p,
key: key,
channel: channel,
msgType: msgType,
// The buffer size of 200 was taken from the Memberlist implementation.
msgc: make(chan []byte, 200),
}
go redisChannel.handleMessages()
return redisChannel
}
func (c *RedisChannel) handleMessages() {
for {
select {
case <-c.p.shutdownc:
return
case b := <-c.msgc:
pub := c.p.redis.Publish(context.Background(), c.channel, string(b))
// An error here might not be as critical as one might think at first sight.
// The state will eventually be propagated to other members by the full sync.
if pub.Err() != nil {
c.p.messagesPublishFailures.WithLabelValues(c.msgType, reasonRedisIssue).Inc()
c.p.logger.Error("error publishing a message to redis", "err", pub.Err(), "channel", c.channel)
continue
}
c.p.messagesSent.WithLabelValues(c.msgType).Inc()
c.p.messagesSentSize.WithLabelValues(c.msgType).Add(float64(len(b)))
}
}
}
func (c *RedisChannel) Broadcast(b []byte) {
b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
if err != nil {
c.p.logger.Error("error marshalling broadcast into proto", "err", err, "channel", c.channel)
return
}
select {
case c.msgc <- b:
default:
// This is not the end of the world, we will catch up when we do a full state sync.
c.p.messagesPublishFailures.WithLabelValues(c.msgType, reasonBufferOverflow).Inc()
c.p.logger.Warn("buffer full, droping message", "channel", c.channel)
}
}
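
For orientation, a minimal, hypothetical sketch (not part of this commit) of how a RedisChannel is used: Alertmanager state types such as the notification log and silences implement cluster.State, get registered through the peer's AddState (defined in the peer implementation further below), and broadcast partial updates through the returned channel. demoState and exampleBroadcast are illustrative names only.

package notifier

import "github.com/prometheus/client_golang/prometheus"

// demoState is a stand-in for the real Alertmanager state types, which satisfy
// the cluster.State interface (MarshalBinary + Merge).
type demoState struct{ data []byte }

func (s *demoState) MarshalBinary() ([]byte, error) { return s.data, nil }
func (s *demoState) Merge(b []byte) error           { s.data = b; return nil }

// exampleBroadcast registers the state under a key and publishes a partial update.
// Broadcast wraps the bytes in a clusterpb.Part and pushes them onto the buffered
// channel; handleMessages then publishes them to the prefixed Redis channel.
// p is assumed to come from newRedisPeer.
func exampleBroadcast(p *redisPeer) {
	ch := p.AddState("sil", &demoState{}, prometheus.NewRegistry())
	ch.Broadcast([]byte("partial update"))
}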


@@ -0,0 +1,64 @@
package notifier
import (
"context"
"testing"
"github.com/alicebob/miniredis/v2"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)
func TestNewRedisChannel(t *testing.T) {
mr, err := miniredis.Run()
require.NoError(t, err)
defer mr.Close()
rdb := redis.NewClient(&redis.Options{
Addr: mr.Addr(),
})
p := &redisPeer{
redis: rdb,
}
channel := newRedisChannel(p, "testKey", "testChannel", "testType")
require.NotNil(t, channel)
}
func TestBroadcastAndHandleMessages(t *testing.T) {
const channelName = "testChannel"
mr, err := miniredis.Run()
require.NoError(t, err)
defer mr.Close()
rdb := redis.NewClient(&redis.Options{
Addr: mr.Addr(),
})
p := &redisPeer{
redis: rdb,
messagesSent: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{update}),
messagesSentSize: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{update}),
}
channel := newRedisChannel(p, "testKey", channelName, "testType").(*RedisChannel)
pubSub := rdb.Subscribe(context.Background(), channelName)
msgs := pubSub.Channel()
msg := []byte("test message")
channel.Broadcast(msg)
receivedMsg := <-msgs
var part clusterpb.Part
err = part.Unmarshal([]byte(receivedMsg.Payload))
require.NoError(t, err)
require.Equal(t, channelName, receivedMsg.Channel)
require.Equal(t, msg, part.Data)
}
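
The subscribe-then-publish pattern in this test also shows why the full-state request loop in the peer implementation below compares a request's payload against the peer's own name: Redis pub/sub delivers a published message to every subscriber, including a subscription held by the publishing client. A hypothetical test-style sketch using the same miniredis setup (the channel name is illustrative):

func TestPubSubDeliversToPublisher(t *testing.T) {
	mr, err := miniredis.Run()
	require.NoError(t, err)
	defer mr.Close()
	rdb := redis.NewClient(&redis.Options{
		Addr: mr.Addr(),
	})
	sub := rdb.Subscribe(context.Background(), "full_state:request")
	defer sub.Close()
	// Publish on the same client that holds the subscription.
	require.NoError(t, rdb.Publish(context.Background(), "full_state:request", "peer-self").Err())
	msg := <-sub.Channel()
	// The publisher receives its own request back, hence the payload check in
	// fullStateReqReceiveLoop.
	require.Equal(t, "peer-self", msg.Payload)
}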


@@ -0,0 +1,564 @@
package notifier
import (
"context"
"fmt"
"sort"
"strconv"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
"github.com/grafana/grafana/pkg/infra/log"
)
type redisConfig struct {
addr string
username string
password string
db int
name string
prefix string
}
const (
peerPattern = "*"
fullState = "full_state"
fullStateChannel = fullState
fullStateChannelReq = fullStateChannel + ":request"
update = "update"
redisServerLabel = "redis-server"
networkRetryIntervalMin = time.Millisecond * 100
networkRetryIntervalMax = time.Second * 10
membersSyncInterval = time.Second * 5
waitForMsgIdle = time.Millisecond * 100
reasonBufferOverflow = "buffer_overflow"
reasonRedisIssue = "redis_issue"
heartbeatInterval = time.Second * 5
heartbeatTimeout = time.Minute
// The duration for which we keep returning the last known members if the network is down.
membersValidFor = time.Minute
)
type redisPeer struct {
name string
redis *redis.Client
prefix string
logger log.Logger
states map[string]cluster.State
subs map[string]*redis.PubSub
statesMtx sync.RWMutex
readyc chan struct{}
shutdownc chan struct{}
pushPullInterval time.Duration
messagesReceived *prometheus.CounterVec
messagesReceivedSize *prometheus.CounterVec
messagesSent *prometheus.CounterVec
messagesSentSize *prometheus.CounterVec
messagesPublishFailures *prometheus.CounterVec
nodePingDuration *prometheus.HistogramVec
nodePingFailures prometheus.Counter
// List of active members of the cluster. Should be accessed through the Members function.
members []string
membersMtx sync.Mutex
// The time when we fetched the members from redis the last time successfully.
membersFetchedAt time.Time
}
func newRedisPeer(cfg redisConfig, logger log.Logger, reg prometheus.Registerer,
pushPullInterval time.Duration) (*redisPeer, error) {
name := "peer-" + uuid.New().String()
// If a specific name is provided, overwrite the default one.
if cfg.name != "" {
name = cfg.name
}
rdb := redis.NewClient(&redis.Options{
Addr: cfg.addr,
Username: cfg.username,
Password: cfg.password,
DB: cfg.db,
})
cmd := rdb.Ping(context.Background())
if cmd.Err() != nil {
return nil, fmt.Errorf("failed to ping redis: %w", cmd.Err())
}
// Make sure that the prefix ends with a colon as delimiter (e.g. "alertmanager" becomes "alertmanager:").
if cfg.prefix != "" && cfg.prefix[len(cfg.prefix)-1] != ':' {
cfg.prefix = cfg.prefix + ":"
}
p := &redisPeer{
name: name,
redis: rdb,
logger: logger,
states: map[string]cluster.State{},
subs: map[string]*redis.PubSub{},
pushPullInterval: pushPullInterval,
readyc: make(chan struct{}),
shutdownc: make(chan struct{}),
prefix: cfg.prefix,
members: make([]string, 0),
}
// The metrics for the redis peer are exactly the same as for the official
// upstream Memberlist implementation. Three metrics that don't make sense
// for redis are not available: messagesPruned, messagesQueued, nodeAlive.
messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_cluster_messages_received_total",
Help: "Total number of cluster messages received.",
}, []string{"msg_type"})
messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_cluster_messages_received_size_total",
Help: "Total size of cluster messages received.",
}, []string{"msg_type"})
messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_cluster_messages_sent_total",
Help: "Total number of cluster messages sent.",
}, []string{"msg_type"})
messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_cluster_messages_sent_size_total",
Help: "Total size of cluster messages sent.",
}, []string{"msg_type"})
messagesPublishFailures := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_cluster_messages_publish_failures_total",
Help: "Total number of messages that failed to be published.",
}, []string{"msg_type", "reason"})
gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "alertmanager_cluster_members",
Help: "Number indicating current number of members in cluster.",
}, func() float64 {
return float64(p.ClusterSize())
})
peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "alertmanager_peer_position",
Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
}, func() float64 {
return float64(p.Position())
})
healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "alertmanager_cluster_health_score",
Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
}, func() float64 {
return float64(p.GetHealthScore())
})
nodePingDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "alertmanager_cluster_pings_seconds",
Help: "Histogram of latencies for ping messages.",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5},
}, []string{"peer"},
)
nodePingFailures := prometheus.NewCounter(prometheus.CounterOpts{
Name: "alertmanager_cluster_pings_failures_total",
Help: "Total number of failed pings.",
})
messagesReceived.WithLabelValues(fullState)
messagesReceivedSize.WithLabelValues(fullState)
messagesReceived.WithLabelValues(update)
messagesReceivedSize.WithLabelValues(update)
messagesSent.WithLabelValues(fullState)
messagesSentSize.WithLabelValues(fullState)
messagesSent.WithLabelValues(update)
messagesSentSize.WithLabelValues(update)
messagesPublishFailures.WithLabelValues(fullState, reasonRedisIssue)
messagesPublishFailures.WithLabelValues(update, reasonRedisIssue)
messagesPublishFailures.WithLabelValues(update, reasonBufferOverflow)
reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
gossipClusterMembers, peerPosition, healthScore, nodePingDuration, nodePingFailures,
messagesPublishFailures,
)
p.messagesReceived = messagesReceived
p.messagesReceivedSize = messagesReceivedSize
p.messagesSent = messagesSent
p.messagesSentSize = messagesSentSize
p.messagesPublishFailures = messagesPublishFailures
p.nodePingDuration = nodePingDuration
p.nodePingFailures = nodePingFailures
p.subs[fullStateChannel] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannel))
p.subs[fullStateChannelReq] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannelReq))
go p.heartbeatLoop()
go p.membersSyncLoop()
go p.fullStateSyncPublishLoop()
go p.fullStateSyncReceiveLoop()
go p.fullStateReqReceiveLoop()
return p, nil
}
func (p *redisPeer) withPrefix(str string) string {
return p.prefix + str
}
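// heartbeatLoop periodically (every heartbeatInterval) refreshes this peer's
// heartbeat key (prefix + peer name) with the current Unix timestamp and a five
// minute expiry; membersSync discovers live peers by scanning for these keys.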
func (p *redisPeer) heartbeatLoop() {
ticker := time.NewTicker(heartbeatInterval)
for {
select {
case <-ticker.C:
startTime := time.Now()
cmd := p.redis.Set(context.Background(), p.withPrefix(p.name), time.Now().Unix(), time.Minute*5)
reqDur := time.Since(startTime)
if cmd.Err() != nil {
p.nodePingFailures.Inc()
p.logger.Error("error setting the heartbeat key", "err", cmd.Err(), "peer", p.withPrefix(p.name))
continue
}
p.nodePingDuration.WithLabelValues(redisServerLabel).Observe(reqDur.Seconds())
case <-p.shutdownc:
ticker.Stop()
return
}
}
}
func (p *redisPeer) membersSyncLoop() {
ticker := time.NewTicker(membersSyncInterval)
for {
select {
case <-ticker.C:
p.membersSync()
case <-p.shutdownc:
ticker.Stop()
return
}
}
}
func (p *redisPeer) membersSync() {
startTime := time.Now()
// The 100 is a hint for the server about how many records there might be for the
// provided pattern. It _might_ only return the first 100 records, which should
// be more than enough for our use case.
// More here: https://redis.io/commands/scan/
members, _, err := p.redis.Scan(context.Background(), 0, p.withPrefix(peerPattern), 100).Result()
if err != nil {
p.logger.Error("error getting keys from redis", "err", err, "pattern", p.withPrefix(peerPattern))
// To prevent a spike of duplicate messages, we keep returning the last known
// members for the duration of membersValidFor and only empty the list if we
// do not recover within that time.
if p.membersFetchedAt.Before(time.Now().Add(-membersValidFor)) {
p.membersMtx.Lock()
p.members = []string{}
p.membersMtx.Unlock()
return
}
p.logger.Warn("fetching members from redis failed, falling back to last known members", "last_known", p.members)
return
}
// This might happen on startup, when no value is in the store yet.
if len(members) == 0 {
p.membersMtx.Lock()
p.members = []string{}
p.membersMtx.Unlock()
return
}
values := p.redis.MGet(context.Background(), members...)
if values.Err() != nil {
p.logger.Error("error getting values from redis", "err", values.Err(), "keys", members)
}
// After getting the list of possible members from redis, we filter
// those out that have failed to send a heartbeat during the heartbeatTimeout.
peers := p.filterUnhealthyMembers(members, values.Val())
sort.Strings(peers)
dur := time.Since(startTime)
p.logger.Debug("membership sync done", "duration_ms", dur.Milliseconds())
p.membersMtx.Lock()
p.members = peers
p.membersMtx.Unlock()
p.membersFetchedAt = time.Now()
}
// filterUnhealthyMembers filters out the members that have failed to send
// a heartbeat during the last heartbeatTimeout.
func (p *redisPeer) filterUnhealthyMembers(members []string, values []interface{}) []string {
peers := []string{}
for i, peer := range members {
val := values[i]
if val == nil {
continue
}
ts, err := strconv.ParseInt(val.(string), 10, 64)
if err != nil {
p.logger.Error("error parsing timestamp value", "err", err, "peer", peer, "val", val)
continue
}
tm := time.Unix(ts, 0)
if tm.Before(time.Now().Add(-heartbeatTimeout)) {
continue
}
peers = append(peers, peer)
}
return peers
}
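// Hypothetical illustration (not part of this commit) of the heartbeat cutoff:
// with heartbeatTimeout at one minute, a peer whose stored timestamp is two
// minutes old is dropped, while a fresh one is kept. Peer names are made up.
func exampleFilter(p *redisPeer) {
	now := time.Now()
	fresh := strconv.FormatInt(now.Unix(), 10)                     // kept
	stale := strconv.FormatInt(now.Add(-2*time.Minute).Unix(), 10) // dropped
	peers := p.filterUnhealthyMembers(
		[]string{"alerting:peer-a", "alerting:peer-b"},
		[]interface{}{fresh, stale},
	)
	fmt.Println(peers) // [alerting:peer-a]
}
// Position returns this peer's index in the sorted member list. In upstream
// Alertmanager, the position determines how long a peer waits before handling
// notifications, so normally only the first healthy peer notifies.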
func (p *redisPeer) Position() int {
for i, peer := range p.Members() {
if peer == p.withPrefix(p.name) {
p.logger.Debug("cluster position found", "name", p.name, "position", i)
return i
}
}
p.logger.Warn("failed to look up position, falling back to position 0")
return 0
}
// ClusterSize returns the known size of the cluster. This also includes dead
// nodes that haven't timed out yet.
func (p *redisPeer) ClusterSize() int {
scan := p.redis.Scan(context.Background(), 0, p.withPrefix(peerPattern), 100)
if scan.Err() != nil {
p.logger.Error("error getting keys from redis", "err", scan.Err(), "pattern", p.withPrefix(peerPattern))
return 0
}
members, _ := scan.Val()
return len(members)
}
// GetHealthScore returns 0 if the cluster is healthy, otherwise the number of
// unhealthy nodes.
func (p *redisPeer) GetHealthScore() int {
size := p.ClusterSize()
members := len(p.Members())
if size > members {
return size - members
}
return 0
}
// Members returns a list of active cluster Members.
func (p *redisPeer) Members() []string {
p.membersMtx.Lock()
defer p.membersMtx.Unlock()
return p.members
}
func (p *redisPeer) WaitReady(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-p.readyc:
return nil
}
}
// Settle is mostly copied from upstream.
// Ref: https://github.com/prometheus/alertmanager/blob/2888649b473970400c0bd375fdd563486dc80481/cluster/cluster.go#L674-L712
func (p *redisPeer) Settle(ctx context.Context, interval time.Duration) {
const NumOkayRequired = 3
p.logger.Info("Waiting for gossip to settle...", "interval", interval)
start := time.Now()
nPeers := 0
nOkay := 0
totalPolls := 0
for {
select {
case <-ctx.Done():
elapsed := time.Since(start)
p.logger.Info("gossip not settled but continuing anyway", "polls", totalPolls, "elapsed", elapsed)
close(p.readyc)
return
case <-time.After(interval):
}
elapsed := time.Since(start)
n := len(p.Members())
if nOkay >= NumOkayRequired {
p.logger.Info("gossip settled; proceeding", "elapsed", elapsed)
break
}
if n == nPeers {
nOkay++
p.logger.Debug("gossip looks settled", "elapsed", elapsed)
} else {
nOkay = 0
p.logger.Info("gossip not settled", "polls", totalPolls, "before", nPeers, "now", n, "elapsed", elapsed)
}
nPeers = n
totalPolls++
}
p.requestFullState()
close(p.readyc)
}
func (p *redisPeer) AddState(key string, state cluster.State, _ prometheus.Registerer) cluster.ClusterChannel {
p.statesMtx.Lock()
defer p.statesMtx.Unlock()
p.states[key] = state
// As we also want to get the state from other nodes, we subscribe to the key.
sub := p.redis.Subscribe(context.Background(), p.withPrefix(key))
go p.receiveLoop(key, sub)
p.subs[key] = sub
return newRedisChannel(p, key, p.withPrefix(key), update)
}
func (p *redisPeer) receiveLoop(name string, channel *redis.PubSub) {
for {
select {
case <-p.shutdownc:
return
case data := <-channel.Channel():
p.mergePartialState([]byte(data.Payload))
default:
time.Sleep(waitForMsgIdle)
}
}
}
func (p *redisPeer) mergePartialState(buf []byte) {
p.messagesReceived.WithLabelValues(update).Inc()
p.messagesReceivedSize.WithLabelValues(update).Add(float64(len(buf)))
var part clusterpb.Part
if err := proto.Unmarshal(buf, &part); err != nil {
p.logger.Warn("error decoding the received broadcast message", "err", err)
return
}
p.statesMtx.RLock()
s, ok := p.states[part.Key]
p.statesMtx.RUnlock()
if !ok {
return
}
if err := s.Merge(part.Data); err != nil {
p.logger.Warn("error merging the received broadcast message", "err", err, "key", part.Key)
return
}
p.logger.Debug("partial state was successfully merged", "key", part.Key)
}
func (p *redisPeer) fullStateReqReceiveLoop() {
for {
select {
case <-p.shutdownc:
return
case data := <-p.subs[fullStateChannelReq].Channel():
// The payload of a full state request is the name of the peer that is
// requesting the full state. In case we received our own request, we
// can just ignore it. Redis pub/sub fans out to all clients, regardless of
// whether a client was also the publisher.
if data.Payload == p.name {
continue
}
p.fullStateSyncPublish()
default:
time.Sleep(waitForMsgIdle)
}
}
}
func (p *redisPeer) fullStateSyncReceiveLoop() {
for {
select {
case <-p.shutdownc:
return
case data := <-p.subs[fullStateChannel].Channel():
p.mergeFullState([]byte(data.Payload))
default:
time.Sleep(waitForMsgIdle)
}
}
}
func (p *redisPeer) mergeFullState(buf []byte) {
p.messagesReceived.WithLabelValues(fullState).Inc()
p.messagesReceivedSize.WithLabelValues(fullState).Add(float64(len(buf)))
var fs clusterpb.FullState
if err := proto.Unmarshal(buf, &fs); err != nil {
p.logger.Warn("error unmarshaling the received remote state", "err", err)
return
}
p.statesMtx.RLock()
defer p.statesMtx.RUnlock()
for _, part := range fs.Parts {
s, ok := p.states[part.Key]
if !ok {
p.logger.Warn("received", "unknown state key", "len", len(buf), "key", part.Key)
continue
}
if err := s.Merge(part.Data); err != nil {
p.logger.Warn("error merging the received remote state", "err", err, "key", part.Key)
return
}
}
p.logger.Debug("full state was successfully merged")
}
func (p *redisPeer) fullStateSyncPublish() {
pub := p.redis.Publish(context.Background(), p.withPrefix(fullStateChannel), p.LocalState())
if pub.Err() != nil {
p.messagesPublishFailures.WithLabelValues(fullState, reasonRedisIssue).Inc()
p.logger.Error("error publishing a message to redis", "err", pub.Err(), "channel", p.withPrefix(fullStateChannel))
}
}
func (p *redisPeer) fullStateSyncPublishLoop() {
ticker := time.NewTicker(p.pushPullInterval)
for {
select {
case <-ticker.C:
p.fullStateSyncPublish()
case <-p.shutdownc:
ticker.Stop()
return
}
}
}
func (p *redisPeer) requestFullState() {
pub := p.redis.Publish(context.Background(), p.withPrefix(fullStateChannelReq), p.name)
if pub.Err() != nil {
p.messagesPublishFailures.WithLabelValues(fullState, reasonRedisIssue).Inc()
p.logger.Error("error publishing a message to redis", "err", pub.Err(), "channel", p.withPrefix(fullStateChannelReq))
}
}
func (p *redisPeer) LocalState() []byte {
p.statesMtx.RLock()
defer p.statesMtx.RUnlock()
all := &clusterpb.FullState{
Parts: make([]clusterpb.Part, 0, len(p.states)),
}
for key, s := range p.states {
b, err := s.MarshalBinary()
if err != nil {
p.logger.Warn("error encoding the local state", "err", err, "key", key)
}
all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
}
b, err := proto.Marshal(all)
if err != nil {
p.logger.Warn("error encoding the local state to proto", "err", err)
}
p.messagesSent.WithLabelValues(fullState).Inc()
p.messagesSentSize.WithLabelValues(fullState).Add(float64(len(b)))
return b
}
func (p *redisPeer) Shutdown() {
p.logger.Info("Stopping redis peer...")
close(p.shutdownc)
p.fullStateSyncPublish()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
del := p.redis.Del(ctx, p.withPrefix(p.name))
if del.Err() != nil {
p.logger.Error("error deleting the redis key on shutdown", "err", del.Err(), "key", p.withPrefix(p.name))
}
}


@@ -71,6 +71,12 @@ type UnifiedAlertingSettings struct {
HAPeerTimeout time.Duration
HAGossipInterval time.Duration
HAPushPullInterval time.Duration
HARedisAddr string
HARedisPeerName string
HARedisPrefix string
HARedisUsername string
HARedisPassword string
HARedisDB int
MaxAttempts int64
MinInterval time.Duration
EvaluationTimeout time.Duration
@@ -224,6 +230,12 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error {
}
uaCfg.HAListenAddr = ua.Key("ha_listen_address").MustString(alertmanagerDefaultClusterAddr)
uaCfg.HAAdvertiseAddr = ua.Key("ha_advertise_address").MustString("")
uaCfg.HARedisAddr = ua.Key("ha_redis_address").MustString("")
uaCfg.HARedisPeerName = ua.Key("ha_redis_peer_name").MustString("")
uaCfg.HARedisPrefix = ua.Key("ha_redis_prefix").MustString("")
uaCfg.HARedisUsername = ua.Key("ha_redis_username").MustString("")
uaCfg.HARedisPassword = ua.Key("ha_redis_password").MustString("")
uaCfg.HARedisDB = ua.Key("ha_redis_db").MustInt(0)
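// For reference (illustrative note, not part of this commit): go-ini's MustString
// and MustInt return the supplied default when a key is absent or empty, e.g.
// ua.Key("ha_redis_address").MustString("") yields "" and
// ua.Key("ha_redis_db").MustInt(0) yields 0 when those keys are not set.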
peers := ua.Key("ha_peers").MustString("")
uaCfg.HAPeers = make([]string, 0)
if peers != "" {