Alerting: Fix HA alerting membership sync (#70607)
* Alerting: Fix HA alerting membership sync

* Added comment about filtering duplicates

parent 0f2922d709
commit 1d68f5ba77
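In short: the previous code issued a single SCAN with a COUNT hint of 100 and used whatever came back, but COUNT is only a hint, a single call is not guaranteed to cover the whole keyspace, and SCAN may return the same element more than once. The new membersScan helper iterates the cursor to completion, and membersSync sorts and de-duplicates the result. A minimal standalone sketch of that pattern (not Grafana code; it assumes a Redis instance at localhost:6379 and a hypothetical peers-* key pattern):

// Standalone sketch of the fix's pattern: iterate SCAN until the cursor
// returns to 0, then sort and de-duplicate, since SCAN may repeat elements.
package main

import (
	"context"
	"fmt"
	"sort"

	"github.com/redis/go-redis/v9"
	"golang.org/x/exp/slices"
)

func main() {
	// Assumes a local Redis for illustration only.
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	var (
		cursor uint64
		keys   []string
	)
	for {
		// COUNT 100 is a hint, not a page size guarantee.
		batch, next, err := rdb.Scan(context.Background(), cursor, "peers-*", 100).Result()
		if err != nil {
			panic(err)
		}
		keys = append(keys, batch...)
		cursor = next
		if cursor == 0 {
			break
		}
	}

	// SCAN may yield duplicates; sort first so Compact can drop them.
	sort.Strings(keys)
	keys = slices.Compact(keys)
	fmt.Println(keys)
}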
@@ -13,6 +13,7 @@ import (
 	"github.com/prometheus/alertmanager/cluster"
 	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/exp/slices"
 
 	"github.com/redis/go-redis/v9"
 
@@ -239,11 +240,7 @@ func (p *redisPeer) membersSyncLoop() {
 
 func (p *redisPeer) membersSync() {
 	startTime := time.Now()
-	// The 100 is a hint for the server, how many records there might be for the
-	// provided pattern. It _might_ only return the first 100 records, which should
-	// be more than enough for our use case.
-	// More here: https://redis.io/commands/scan/
-	members, _, err := p.redis.Scan(context.Background(), 0, p.withPrefix(peerPattern), 100).Result()
+	members, err := p.membersScan()
 	if err != nil {
 		p.logger.Error("error getting keys from redis", "err", err, "pattern", p.withPrefix(peerPattern))
 		// To prevent a spike of duplicate messages, we return for the duration of
@@ -273,6 +270,11 @@ func (p *redisPeer) membersSync() {
 	// those out that have failed to send a heartbeat during the heartbeatTimeout.
 	peers := p.filterUnhealthyMembers(members, values.Val())
 	sort.Strings(peers)
+
+	// Redis Scan may return duplicate elements
+	// Filtering duplicates with Compact after sorting to prevent inconsistencies when calculating Position
+	peers = slices.Compact(peers)
+
 	dur := time.Since(startTime)
 	p.logger.Debug("membership sync done", "duration_ms", dur.Milliseconds())
 	p.membersMtx.Lock()
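The order of operations above matters: slices.Compact removes only consecutive duplicates, so sorting first is what turns it into a full de-duplication. A small illustrative snippet (not from the PR):

// slices.Compact drops only adjacent duplicates; sort first for a full de-dup.
package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/slices"
)

func main() {
	peers := []string{"peer-b", "peer-a", "peer-b"}

	// Duplicates are not adjacent yet, so Compact leaves them in place.
	fmt.Println(slices.Compact(slices.Clone(peers))) // [peer-b peer-a peer-b]

	sort.Strings(peers)
	fmt.Println(slices.Compact(peers)) // [peer-a peer-b]
}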
@@ -281,6 +283,30 @@ func (p *redisPeer) membersSync() {
 	p.membersFetchedAt = time.Now()
 }
 
+func (p *redisPeer) membersScan() ([]string, error) {
+	var (
+		cursor  uint64
+		err     error
+		members = []string{}
+		keys    []string
+	)
+	// The 100 is a hint for the server, how many records there might be for the
+	// provided pattern. It _might_ only return the first 100 records, which should
+	// be more than enough for our use case.
+	// More here: https://redis.io/commands/scan/
+	for {
+		keys, cursor, err = p.redis.Scan(context.Background(), cursor, p.withPrefix(peerPattern), 100).Result()
+		if err != nil {
+			return []string{}, err
+		}
+		members = append(members, keys...)
+		if cursor == 0 {
+			break
+		}
+	}
+	return members, nil
+}
+
 // filterUnhealthyMembers will filter out the members that have failed to send
 // a heartbeat since heartbeatTimeout.
 func (p *redisPeer) filterUnhealthyMembers(members []string, values []interface{}) []string {
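To see why duplicates are a correctness issue and not just noise, consider Position. Its implementation is not shown in this diff; assume for illustration that a peer's position is the index of its own name in the sorted member list (a hypothetical stand-in, not the real method):

// Hypothetical illustration of how a SCAN duplicate skews a position
// calculation based on the index in the sorted member list.
package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/slices"
)

func position(members []string, self string) int {
	for i, m := range members {
		if m == self {
			return i
		}
	}
	return -1
}

func main() {
	// A duplicated "peer-a" from SCAN shifts every later peer by one.
	members := []string{"peer-a", "peer-a", "peer-b", "peer-c"}
	sort.Strings(members)
	fmt.Println(position(members, "peer-c")) // 3 with the duplicate present

	members = slices.Compact(members)
	fmt.Println(position(members, "peer-c")) // 2 after de-duplication
}

Because SCAN duplicates are transient, two syncs can compute different positions from the same cluster state; de-duplicating after sorting keeps every peer's view consistent, which is the inconsistency the added comment refers to.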
@@ -318,12 +344,11 @@ func (p *redisPeer) Position() int {
 // Returns the known size of the Cluster. This also includes dead nodes that
 // haven't timeout yet.
 func (p *redisPeer) ClusterSize() int {
-	scan := p.redis.Scan(context.Background(), 0, p.withPrefix(peerPattern), 100)
-	if scan.Err() != nil {
-		p.logger.Error("error getting keys from redis", "err", scan.Err(), "pattern", p.withPrefix(peerPattern))
+	members, err := p.membersScan()
+	if err != nil {
+		p.logger.Error("error getting keys from redis", "err", err, "pattern", p.withPrefix(peerPattern))
 		return 0
 	}
-	members, _ := scan.Val()
 	return len(members)
 }
 
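ClusterSize benefits from the same helper: the old code counted only the first SCAN page (it ignored the cursor returned by scan.Val()), so it could undercount a large cluster. A hedged test sketch, using the in-memory miniredis server (github.com/alicebob/miniredis/v2 — an assumption for illustration, not a dependency of this PR) to confirm that cursor iteration sees every key even when there are more keys than the COUNT hint:

// Verifies that full cursor iteration counts all keys, unlike a single page.
package main

import (
	"context"
	"fmt"

	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"
)

func main() {
	srv, err := miniredis.Run()
	if err != nil {
		panic(err)
	}
	defer srv.Close()

	rdb := redis.NewClient(&redis.Options{Addr: srv.Addr()})
	for i := 0; i < 250; i++ {
		// More keys than the COUNT hint of 100, so a single SCAN call
		// is not guaranteed to cover them all.
		rdb.Set(context.Background(), fmt.Sprintf("peers-%03d", i), "alive", 0)
	}

	var (
		cursor uint64
		total  int
	)
	for {
		keys, next, err := rdb.Scan(context.Background(), cursor, "peers-*", 100).Result()
		if err != nil {
			panic(err)
		}
		total += len(keys)
		cursor = next
		if cursor == 0 {
			break
		}
	}
	fmt.Println(total) // expect 250
}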