mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Refactor readiness check (#78799)
* Alerting: Refactor readiness check Moves the readiness check to the mimir client and removes the need to assert that we have senders - it already has a queue and can hold notifications until we're ready to send them. --------- Signed-off-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
parent
7e68e3f49e
commit
cc3c0a2cc2
@ -6,26 +6,20 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
mimirClient "github.com/grafana/grafana/pkg/services/ngalert/remote/client"
|
||||
remoteClient "github.com/grafana/grafana/pkg/services/ngalert/remote/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/sender"
|
||||
amclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
|
||||
amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
|
||||
amreceiver "github.com/prometheus/alertmanager/api/v2/client/receiver"
|
||||
amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
|
||||
)
|
||||
|
||||
const readyPath = "/-/ready"
|
||||
|
||||
type stateStore interface {
|
||||
GetFullState(ctx context.Context, keys ...string) (string, error)
|
||||
}
|
||||
@ -39,9 +33,8 @@ type Alertmanager struct {
|
||||
tenantID string
|
||||
url string
|
||||
|
||||
amClient *amclient.AlertmanagerAPI
|
||||
httpClient *http.Client
|
||||
mimirClient mimirClient.MimirClient
|
||||
amClient *remoteClient.Alertmanager
|
||||
mimirClient remoteClient.MimirClient
|
||||
}
|
||||
|
||||
type AlertmanagerConfig struct {
|
||||
@ -50,15 +43,7 @@ type AlertmanagerConfig struct {
|
||||
BasicAuthPassword string
|
||||
}
|
||||
|
||||
func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, stateStore stateStore) (*Alertmanager, error) {
|
||||
client := http.Client{
|
||||
Transport: &mimirClient.MimirAuthRoundTripper{
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.BasicAuthPassword,
|
||||
Next: http.DefaultTransport,
|
||||
},
|
||||
}
|
||||
|
||||
func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, store stateStore) (*Alertmanager, error) {
|
||||
if cfg.URL == "" {
|
||||
return nil, fmt.Errorf("empty remote Alertmanager URL for tenant '%s'", cfg.TenantID)
|
||||
}
|
||||
@ -70,31 +55,36 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, stateStore stateStore)
|
||||
|
||||
logger := log.New("ngalert.remote.alertmanager")
|
||||
|
||||
mcCfg := &mimirClient.Config{
|
||||
mcCfg := &remoteClient.Config{
|
||||
URL: u,
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.BasicAuthPassword,
|
||||
Logger: logger,
|
||||
}
|
||||
|
||||
mc, err := mimirClient.New(mcCfg)
|
||||
mc, err := remoteClient.New(mcCfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u = u.JoinPath("/alertmanager", amclient.DefaultBasePath)
|
||||
transport := httptransport.NewWithClient(u.Host, u.Path, []string{u.Scheme}, &client)
|
||||
amcCfg := &remoteClient.AlertmanagerConfig{
|
||||
URL: u,
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.BasicAuthPassword,
|
||||
Logger: logger,
|
||||
}
|
||||
amc, err := remoteClient.NewAlertmanager(amcCfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Using our client with custom headers and basic auth credentials.
|
||||
// Configure and start the components that sends alerts.
|
||||
c := amc.GetAuthedClient()
|
||||
doFunc := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
|
||||
return client.Do(req.WithContext(ctx))
|
||||
return c.Do(req.WithContext(ctx))
|
||||
}
|
||||
s := sender.NewExternalAlertmanagerSender(sender.WithDoFunc(doFunc))
|
||||
s.Run()
|
||||
|
||||
err = s.ApplyConfig(orgID, 0, []sender.ExternalAMcfg{{
|
||||
URL: cfg.URL + "/alertmanager",
|
||||
}})
|
||||
err = s.ApplyConfig(orgID, 0, []sender.ExternalAMcfg{{URL: cfg.URL + "/alertmanager"}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -102,9 +92,8 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, stateStore stateStore)
|
||||
return &Alertmanager{
|
||||
log: logger,
|
||||
mimirClient: mc,
|
||||
amClient: amclient.New(transport, nil),
|
||||
httpClient: &client,
|
||||
state: stateStore,
|
||||
state: store,
|
||||
amClient: amc,
|
||||
sender: s,
|
||||
orgID: orgID,
|
||||
tenantID: cfg.TenantID,
|
||||
@ -157,45 +146,18 @@ func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertCon
|
||||
}
|
||||
|
||||
func (am *Alertmanager) checkReadiness(ctx context.Context) error {
|
||||
readyURL := strings.TrimSuffix(am.url, "/") + "/alertmanager" + readyPath
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL, nil)
|
||||
ready, err := am.amClient.IsReadyWithBackoff(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating readiness request: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := am.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error performing readiness check: %w", err)
|
||||
if ready {
|
||||
am.log.Debug("Alertmanager readiness check successful")
|
||||
am.ready = true
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
am.log.Warn("Error closing response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("%w, status code: %d", notifier.ErrAlertmanagerNotReady, res.StatusCode)
|
||||
}
|
||||
|
||||
// Wait for active senders.
|
||||
var attempts int
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
attempts++
|
||||
if len(am.sender.Alertmanagers()) > 0 {
|
||||
am.log.Debug("Alertmanager readiness check successful", "attempts", attempts)
|
||||
am.ready = true
|
||||
return nil
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
return notifier.ErrAlertmanagerNotReady
|
||||
}
|
||||
}
|
||||
return notifier.ErrAlertmanagerNotReady
|
||||
}
|
||||
|
||||
func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.PostableUserConfig) error {
|
||||
|
@ -78,7 +78,6 @@ func TestNewAlertmanager(t *testing.T) {
|
||||
require.Equal(tt, am.url, test.url)
|
||||
require.Equal(tt, am.orgID, test.orgID)
|
||||
require.NotNil(tt, am.amClient)
|
||||
require.NotNil(tt, am.httpClient)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -340,6 +339,9 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) {
|
||||
// Wait until the Alertmanager is ready to send alerts.
|
||||
require.NoError(t, am.checkReadiness(context.Background()))
|
||||
require.True(t, am.Ready())
|
||||
require.Eventually(t, func() bool {
|
||||
return len(am.sender.Alertmanagers()) > 0
|
||||
}, 10*time.Second, 500*time.Millisecond)
|
||||
|
||||
// We should have no alerts and no groups at first.
|
||||
alerts, err := am.GetAlerts(context.Background(), true, true, true, []string{}, "")
|
||||
|
114
pkg/services/ngalert/remote/client/alertmanager.go
Normal file
114
pkg/services/ngalert/remote/client/alertmanager.go
Normal file
@ -0,0 +1,114 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
amclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
)
|
||||
|
||||
const alertmanagerAPIMountPath = "/alertmanager"
|
||||
const alertmanagerReadyPath = "/-/ready"
|
||||
|
||||
type AlertmanagerConfig struct {
|
||||
TenantID string
|
||||
Password string
|
||||
URL *url.URL
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
type Alertmanager struct {
|
||||
*amclient.AlertmanagerAPI
|
||||
httpClient *http.Client
|
||||
url *url.URL
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func NewAlertmanager(cfg *AlertmanagerConfig) (*Alertmanager, error) {
|
||||
// First, add the authentication middleware.
|
||||
c := &http.Client{Transport: &MimirAuthRoundTripper{
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.Password,
|
||||
Next: http.DefaultTransport,
|
||||
}}
|
||||
|
||||
apiEndpoint := *cfg.URL
|
||||
|
||||
// Next, make sure you set the right path.
|
||||
u := apiEndpoint.JoinPath(alertmanagerAPIMountPath, amclient.DefaultBasePath)
|
||||
transport := httptransport.NewWithClient(u.Host, u.Path, []string{u.Scheme}, c)
|
||||
|
||||
return &Alertmanager{
|
||||
logger: cfg.Logger,
|
||||
url: cfg.URL,
|
||||
AlertmanagerAPI: amclient.New(transport, nil),
|
||||
httpClient: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetAuthedClient returns a *http.Client that includes a configured MimirAuthRoundTripper.
|
||||
// Requests using this client are fully authenticated.
|
||||
func (am *Alertmanager) GetAuthedClient() *http.Client {
|
||||
return am.httpClient
|
||||
}
|
||||
|
||||
// IsReadyWithBackoff executes a readiness check against the `/-/ready` Alertmanager endpoint.
|
||||
// If it takes more than 10s to get a response back - we abort the check.
|
||||
func (am *Alertmanager) IsReadyWithBackoff(ctx context.Context) (bool, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
readyURL := am.url.JoinPath(am.url.Path, alertmanagerAPIMountPath, alertmanagerReadyPath)
|
||||
|
||||
attempt := func() (int, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL.String(), nil)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error creating the readiness request: %w", err)
|
||||
}
|
||||
|
||||
res, err := am.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error performing the readiness check: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
am.logger.Warn("Error closing response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return res.StatusCode, nil
|
||||
}
|
||||
|
||||
var attempts int
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
deadlineCh := time.After(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
attempts++
|
||||
status, err := attempt()
|
||||
if err != nil {
|
||||
am.logger.Debug("Ready check attempt failed", "attempt", attempts, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if status != http.StatusOK {
|
||||
am.logger.Debug("Ready check failed, status code is not 200", "attempt", attempts, "status", status, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case <-deadlineCh:
|
||||
cancel()
|
||||
return false, fmt.Errorf("ready check timed out after %d attempts", attempts)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user