Alerting: Update alerting package and imports from cluster and clusterpb (#79786)

* Alerting: Update alerting package

* update to latest commit

* alias for imports
Santiago
2023-12-21 12:34:48 +01:00
committed by GitHub
parent c1c3e9387a
commit c46da8ea9b
7 changed files with 33 additions and 33 deletions
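
In practice the change is a one-for-one swap of import paths: the HA clustering code now takes the cluster and clusterpb packages from the grafana/alerting module instead of importing github.com/prometheus/alertmanager directly, and the new imports are aliased (alertingCluster, alertingClusterPB) so call sites only change their package qualifier. A minimal sketch of the resulting import pattern, not part of the diff and using a made-up package name:

package example

import (
	// Previously these came straight from the Alertmanager module:
	//   "github.com/prometheus/alertmanager/cluster"
	//   "github.com/prometheus/alertmanager/cluster/clusterpb"
	// Now the same identifiers are reached through grafana/alerting, aliased
	// so every reference reads alertingCluster.X / alertingClusterPB.X.
	alertingCluster "github.com/grafana/alerting/cluster"
	alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
)

// Compile-time check that the names used throughout the diff resolve under
// the new qualifiers (this assumes the grafana/alerting packages expose the
// same API surface as the upstream ones, which is what the diff relies on).
var (
	_ alertingCluster.ClusterChannel    // implemented by RedisChannel and NilChannel
	_ alertingCluster.State             // per-key replicated state
	_ = alertingClusterPB.Part{}        // single gossip payload
	_ = alertingClusterPB.FullState{}   // full-state sync message
)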


@@ -7,8 +7,8 @@ import (
 	"os"
 	"path/filepath"
+	alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
 	alertingNotify "github.com/grafana/alerting/notify"
-	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	"github.com/grafana/grafana/pkg/infra/kvstore"
 	"github.com/grafana/grafana/pkg/infra/log"
@@ -77,7 +77,7 @@ func (fileStore *FileStore) GetFullState(ctx context.Context, filenames ...string
 		return "", fmt.Errorf("no values for org %d", fileStore.orgID)
 	}
-	var parts []clusterpb.Part
+	var parts []alertingClusterPB.Part
 	for _, f := range filenames {
 		v, ok := keys[f]
 		if !ok {
@@ -88,10 +88,10 @@ func (fileStore *FileStore) GetFullState(ctx context.Context, filenames ...string
 		if err != nil {
 			return "", fmt.Errorf("error decoding value for key %q", f)
 		}
-		parts = append(parts, clusterpb.Part{Key: f, Data: b})
+		parts = append(parts, alertingClusterPB.Part{Key: f, Data: b})
 	}
-	fs := clusterpb.FullState{
+	fs := alertingClusterPB.FullState{
 		Parts: parts,
 	}
 	b, err := fs.Marshal()


@@ -9,7 +9,7 @@ import (
 	"sync"
 	"time"
-	"github.com/prometheus/alertmanager/cluster"
+	alertingCluster "github.com/grafana/alerting/cluster"
 	"github.com/prometheus/client_golang/prometheus"
 	alertingNotify "github.com/grafana/alerting/notify"
@@ -137,7 +137,7 @@ func (moa *MultiOrgAlertmanager) setupClustering(cfg *setting.Cfg) error {
 	// We set the settlement timeout to be a multiple of the gossip interval,
 	// ensuring that a sufficient number of broadcasts have occurred, thereby
 	// increasing the probability of success when waiting for the cluster to settle.
-	const settleTimeout = cluster.DefaultGossipInterval * 10
+	const settleTimeout = alertingCluster.DefaultGossipInterval * 10
 	// Redis setup.
 	if cfg.UnifiedAlerting.HARedisAddr != "" {
 		redisPeer, err := newRedisPeer(redisConfig{
@@ -160,7 +160,7 @@ func (moa *MultiOrgAlertmanager) setupClustering(cfg *setting.Cfg) error {
 	}
 	// Memberlist setup.
 	if len(cfg.UnifiedAlerting.HAPeers) > 0 {
-		peer, err := cluster.Create(
+		peer, err := alertingCluster.Create(
 			clusterLogger,
 			moa.metrics.Registerer,
 			cfg.UnifiedAlerting.HAListenAddr,
@@ -169,9 +169,9 @@ func (moa *MultiOrgAlertmanager) setupClustering(cfg *setting.Cfg) error {
 			true,
 			cfg.UnifiedAlerting.HAPushPullInterval,
 			cfg.UnifiedAlerting.HAGossipInterval,
-			cluster.DefaultTCPTimeout,
-			cluster.DefaultProbeTimeout,
-			cluster.DefaultProbeInterval,
+			alertingCluster.DefaultTCPTimeout,
+			alertingCluster.DefaultProbeTimeout,
+			alertingCluster.DefaultProbeInterval,
 			nil,
 			true,
 			cfg.UnifiedAlerting.HALabel,
@@ -181,7 +181,7 @@ func (moa *MultiOrgAlertmanager) setupClustering(cfg *setting.Cfg) error {
 			return fmt.Errorf("unable to initialize gossip mesh: %w", err)
 		}
-		err = peer.Join(cluster.DefaultReconnectInterval, cluster.DefaultReconnectTimeout)
+		err = peer.Join(alertingCluster.DefaultReconnectInterval, alertingCluster.DefaultReconnectTimeout)
 		if err != nil {
 			moa.logger.Error("Msg", "Unable to join gossip mesh while initializing cluster for high availability mode", "error", err)
 		}
@@ -385,7 +385,7 @@ func (moa *MultiOrgAlertmanager) StopAndWait() {
 		am.StopAndWait()
 	}
-	p, ok := moa.peer.(*cluster.Peer)
+	p, ok := moa.peer.(*alertingCluster.Peer)
 	if ok {
 		moa.settleCancel()
 		if err := p.Leave(10 * time.Second); err != nil {
@@ -423,7 +423,7 @@ type NilPeer struct{}
 func (p *NilPeer) Position() int { return 0 }
 func (p *NilPeer) WaitReady(context.Context) error { return nil }
-func (p *NilPeer) AddState(string, cluster.State, prometheus.Registerer) cluster.ClusterChannel {
+func (p *NilPeer) AddState(string, alertingCluster.State, prometheus.Registerer) alertingCluster.ClusterChannel {
 	return &NilChannel{}
 }
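
The comment above settleTimeout explains the intent: wait for several gossip rounds before treating the mesh as settled, and this commit preserves that derivation verbatim aside from the new qualifier. A small illustrative sketch follows; the printed value assumes the grafana/alerting fork keeps the upstream Alertmanager default gossip interval of 200ms, which is an assumption rather than something this diff shows:

package example

import (
	"fmt"

	alertingCluster "github.com/grafana/alerting/cluster"
)

func main() {
	// Same derivation as setupClustering: the settlement timeout is ten
	// gossip intervals, so a longer gossip interval means a longer wait.
	const settleTimeout = alertingCluster.DefaultGossipInterval * 10

	// Prints "2s" if the fork keeps the upstream 200ms default.
	fmt.Println(settleTimeout)
}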


@@ -4,8 +4,8 @@ import (
 	"context"
 	"github.com/gogo/protobuf/proto"
-	"github.com/prometheus/alertmanager/cluster"
-	"github.com/prometheus/alertmanager/cluster/clusterpb"
+	alertingCluster "github.com/grafana/alerting/cluster"
+	alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
 )
 type RedisChannel struct {
@@ -16,7 +16,7 @@ type RedisChannel struct {
 	msgc chan []byte
 }
-func newRedisChannel(p *redisPeer, key, channel, msgType string) cluster.ClusterChannel {
+func newRedisChannel(p *redisPeer, key, channel, msgType string) alertingCluster.ClusterChannel {
 	redisChannel := &RedisChannel{
 		p: p,
 		key: key,
@@ -50,7 +50,7 @@ func (c *RedisChannel) handleMessages() {
 }
 func (c *RedisChannel) Broadcast(b []byte) {
-	b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
+	b, err := proto.Marshal(&alertingClusterPB.Part{Key: c.key, Data: b})
 	if err != nil {
 		c.p.logger.Error("Error marshalling broadcast into proto", "err", err, "channel", c.channel)
 		return


@@ -10,8 +10,8 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/google/uuid"
-	"github.com/prometheus/alertmanager/cluster"
-	"github.com/prometheus/alertmanager/cluster/clusterpb"
+	alertingCluster "github.com/grafana/alerting/cluster"
+	alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/redis/go-redis/v9"
@@ -54,7 +54,7 @@ type redisPeer struct {
 	redis *redis.Client
 	prefix string
 	logger log.Logger
-	states map[string]cluster.State
+	states map[string]alertingCluster.State
 	subs map[string]*redis.PubSub
 	statesMtx sync.RWMutex
@@ -110,7 +110,7 @@ func newRedisPeer(cfg redisConfig, logger log.Logger, reg prometheus.Registerer,
 		name: name,
 		redis: rdb,
 		logger: logger,
-		states: map[string]cluster.State{},
+		states: map[string]alertingCluster.State{},
 		subs: map[string]*redis.PubSub{},
 		pushPullInterval: pushPullInterval,
 		readyc: make(chan struct{}),
@@ -425,7 +425,7 @@ func (p *redisPeer) Settle(ctx context.Context, interval time.Duration) {
 	close(p.readyc)
 }
-func (p *redisPeer) AddState(key string, state cluster.State, _ prometheus.Registerer) cluster.ClusterChannel {
+func (p *redisPeer) AddState(key string, state alertingCluster.State, _ prometheus.Registerer) alertingCluster.ClusterChannel {
 	p.statesMtx.Lock()
 	defer p.statesMtx.Unlock()
 	p.states[key] = state
@@ -453,7 +453,7 @@ func (p *redisPeer) mergePartialState(buf []byte) {
 	p.messagesReceived.WithLabelValues(update).Inc()
 	p.messagesReceivedSize.WithLabelValues(update).Add(float64(len(buf)))
-	var part clusterpb.Part
+	var part alertingClusterPB.Part
 	if err := proto.Unmarshal(buf, &part); err != nil {
 		p.logger.Warn("Error decoding the received broadcast message", "err", err)
 		return
@@ -510,7 +510,7 @@ func (p *redisPeer) mergeFullState(buf []byte) {
 	p.messagesReceived.WithLabelValues(fullState).Inc()
 	p.messagesReceivedSize.WithLabelValues(fullState).Add(float64(len(buf)))
-	var fs clusterpb.FullState
+	var fs alertingClusterPB.FullState
 	if err := proto.Unmarshal(buf, &fs); err != nil {
 		p.logger.Warn("Error unmarshaling the received remote state", "err", err)
 		return
@@ -564,8 +564,8 @@ func (p *redisPeer) requestFullState() {
 func (p *redisPeer) LocalState() []byte {
 	p.statesMtx.RLock()
 	defer p.statesMtx.RUnlock()
-	all := &clusterpb.FullState{
-		Parts: make([]clusterpb.Part, 0, len(p.states)),
+	all := &alertingClusterPB.FullState{
+		Parts: make([]alertingClusterPB.Part, 0, len(p.states)),
 	}
 	for key, s := range p.states {
@@ -573,7 +573,7 @@ func (p *redisPeer) LocalState() []byte {
 		if err != nil {
 			p.logger.Warn("Error encoding the local state", "err", err, "key", key)
 		}
-		all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
+		all.Parts = append(all.Parts, alertingClusterPB.Part{Key: key, Data: b})
 	}
 	b, err := proto.Marshal(all)
 	if err != nil {
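
All of the touched call sites share one wire format: each piece of replicated state travels as a clusterpb Part keyed by the state it belongs to, and full-state sync (FileStore.GetFullState, LocalState, mergeFullState) wraps a slice of Parts in a FullState. A self-contained round-trip sketch of that format under the new aliases; the key and payload below are made up for illustration:

package example

import (
	"fmt"

	"github.com/gogo/protobuf/proto"

	alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
)

func main() {
	// Encode: mirror what LocalState and the FileStore do, wrapping each
	// piece of state in a Part and the Parts in a FullState.
	fs := alertingClusterPB.FullState{
		Parts: []alertingClusterPB.Part{
			{Key: "example-state", Data: []byte("opaque state bytes")}, // hypothetical key
		},
	}
	buf, err := fs.Marshal()
	if err != nil {
		panic(err)
	}

	// Decode: mirror what mergeFullState does on the receiving peer.
	var decoded alertingClusterPB.FullState
	if err := proto.Unmarshal(buf, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(len(decoded.Parts), decoded.Parts[0].Key)
}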