mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Introduce a Mimir client as part of the Remote Alertmanager (#78357)
* Alerting: Introduce a Mimir client as part of the Remote Alertmanager This is our first attempt at making Grafana communicate use Mimir as a backend - it uses a new set of APIs that we've developed on the Mimir side to upload the grafana configuration and alertmanager state so that it can then be ported over. Codewise, we've introduced a couple of things: A client to isolate in its own package all the communication that happens with Mimir A few changes to the remote/alertmanager to include uploading the configuration and state when it starts A few refactors that align a bit better with the design approach that we're thinking An integration tests again these newly developed APIs using a custom image --------- Signed-off-by: gotjosh <josue.abreu@gmail.com> Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
This commit is contained in:
parent
eedc19f9f0
commit
23fe8f4e9c
17
.drone.yml
17
.drone.yml
@ -802,7 +802,7 @@ services:
|
||||
- commands:
|
||||
- /bin/mimir -target=backend
|
||||
environment: {}
|
||||
image: grafana/mimir:latest
|
||||
image: us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78
|
||||
name: mimir_backend
|
||||
- environment: {}
|
||||
image: redis:6.2.11-alpine
|
||||
@ -980,7 +980,6 @@ steps:
|
||||
- wire-install
|
||||
- wait-for-remote-alertmanager
|
||||
environment:
|
||||
AM_PASSWORD: test
|
||||
AM_TENANT_ID: test
|
||||
AM_URL: http://mimir_backend:8080
|
||||
image: golang:1.21.3-alpine
|
||||
@ -1246,7 +1245,7 @@ services:
|
||||
- commands:
|
||||
- /bin/mimir -target=backend
|
||||
environment: {}
|
||||
image: grafana/mimir:latest
|
||||
image: us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78
|
||||
name: mimir_backend
|
||||
- environment: {}
|
||||
image: redis:6.2.11-alpine
|
||||
@ -2174,7 +2173,7 @@ services:
|
||||
- commands:
|
||||
- /bin/mimir -target=backend
|
||||
environment: {}
|
||||
image: grafana/mimir:latest
|
||||
image: us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78
|
||||
name: mimir_backend
|
||||
- environment: {}
|
||||
image: redis:6.2.11-alpine
|
||||
@ -2331,7 +2330,6 @@ steps:
|
||||
- wire-install
|
||||
- wait-for-remote-alertmanager
|
||||
environment:
|
||||
AM_PASSWORD: test
|
||||
AM_TENANT_ID: test
|
||||
AM_URL: http://mimir_backend:8080
|
||||
image: golang:1.21.3-alpine
|
||||
@ -3844,7 +3842,7 @@ services:
|
||||
- commands:
|
||||
- /bin/mimir -target=backend
|
||||
environment: {}
|
||||
image: grafana/mimir:latest
|
||||
image: us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78
|
||||
name: mimir_backend
|
||||
- environment: {}
|
||||
image: redis:6.2.11-alpine
|
||||
@ -3994,7 +3992,6 @@ steps:
|
||||
- wire-install
|
||||
- wait-for-remote-alertmanager
|
||||
environment:
|
||||
AM_PASSWORD: test
|
||||
AM_TENANT_ID: test
|
||||
AM_URL: http://mimir_backend:8080
|
||||
image: golang:1.21.3-alpine
|
||||
@ -4413,7 +4410,7 @@ steps:
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM plugins/slack
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM python:3.8
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM postgres:12.3-alpine
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM grafana/mimir:latest
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM mysql:5.7.39
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM mysql:8.0.32
|
||||
- trivy --exit-code 0 --severity UNKNOWN,LOW,MEDIUM redis:6.2.11-alpine
|
||||
@ -4447,7 +4444,7 @@ steps:
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL plugins/slack
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL python:3.8
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL postgres:12.3-alpine
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL grafana/mimir:latest
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL mysql:5.7.39
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL mysql:8.0.32
|
||||
- trivy --exit-code 1 --severity HIGH,CRITICAL redis:6.2.11-alpine
|
||||
@ -4685,6 +4682,6 @@ kind: secret
|
||||
name: gcr_credentials
|
||||
---
|
||||
kind: signature
|
||||
hmac: 64302d9316abab775d7ec1132f26ea4f1829558fa0bfd85812597182c1abe61a
|
||||
hmac: 0e9f67184e414d3afbda81c86dfa58b3c2cf7c1a668be5313c851ff5f42de44d
|
||||
|
||||
...
|
||||
|
2
Makefile
2
Makefile
@ -184,7 +184,7 @@ test-go-integration: ## Run integration tests for backend with flags.
|
||||
test-go-integration-alertmanager: ## Run integration tests for the remote alertmanager (config taken from the mimir_backend block).
|
||||
@echo "test remote alertmanager integration tests"
|
||||
$(GO) clean -testcache
|
||||
AM_URL=http://localhost:8080 AM_TENANT_ID=test AM_PASSWORD=test \
|
||||
AM_URL=http://localhost:8080 AM_TENANT_ID=test \
|
||||
$(GO) test -count=1 -run "^TestIntegrationRemoteAlertmanager" -covermode=atomic -timeout=5m ./pkg/services/ngalert/...
|
||||
|
||||
.PHONY: test-go-integration-postgres
|
||||
|
@ -2,8 +2,8 @@ package remote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@ -15,6 +15,7 @@ import (
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
mimirClient "github.com/grafana/grafana/pkg/services/ngalert/remote/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/sender"
|
||||
amclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
|
||||
@ -31,10 +32,11 @@ type Alertmanager struct {
|
||||
tenantID string
|
||||
url string
|
||||
|
||||
amClient *amclient.AlertmanagerAPI
|
||||
httpClient *http.Client
|
||||
ready bool
|
||||
sender *sender.ExternalAlertmanager
|
||||
amClient *amclient.AlertmanagerAPI
|
||||
mimirClient mimirClient.MimirClient
|
||||
httpClient *http.Client
|
||||
ready bool
|
||||
sender *sender.ExternalAlertmanager
|
||||
}
|
||||
|
||||
type AlertmanagerConfig struct {
|
||||
@ -45,10 +47,10 @@ type AlertmanagerConfig struct {
|
||||
|
||||
func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error) {
|
||||
client := http.Client{
|
||||
Transport: &roundTripper{
|
||||
tenantID: cfg.TenantID,
|
||||
basicAuthPassword: cfg.BasicAuthPassword,
|
||||
next: http.DefaultTransport,
|
||||
Transport: &mimirClient.MimirAuthRoundTripper{
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.BasicAuthPassword,
|
||||
Next: http.DefaultTransport,
|
||||
},
|
||||
}
|
||||
|
||||
@ -56,12 +58,26 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error)
|
||||
return nil, fmt.Errorf("empty URL for tenant %s", cfg.TenantID)
|
||||
}
|
||||
|
||||
logger := log.New("ngalert.remote.alertmanager")
|
||||
|
||||
mcCfg := &mimirClient.Config{
|
||||
Address: cfg.URL,
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.BasicAuthPassword,
|
||||
Logger: logger,
|
||||
}
|
||||
|
||||
mc, err := mimirClient.New(mcCfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u, err := url.Parse(cfg.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u = u.JoinPath(amclient.DefaultBasePath)
|
||||
u = u.JoinPath("/alertmanager", amclient.DefaultBasePath)
|
||||
transport := httptransport.NewWithClient(u.Host, u.Path, []string{u.Scheme}, &client)
|
||||
|
||||
// Using our client with custom headers and basic auth credentials.
|
||||
@ -72,33 +88,67 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error)
|
||||
s.Run()
|
||||
|
||||
err = s.ApplyConfig(orgID, 0, []sender.ExternalAMcfg{{
|
||||
URL: cfg.URL,
|
||||
URL: cfg.URL + "/alertmanager",
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Alertmanager{
|
||||
amClient: amclient.New(transport, nil),
|
||||
httpClient: &client,
|
||||
log: log.New("ngalert.remote.alertmanager"),
|
||||
sender: s,
|
||||
orgID: orgID,
|
||||
tenantID: cfg.TenantID,
|
||||
url: cfg.URL,
|
||||
log: logger,
|
||||
mimirClient: mc,
|
||||
amClient: amclient.New(transport, nil),
|
||||
httpClient: &client,
|
||||
sender: s,
|
||||
orgID: orgID,
|
||||
tenantID: cfg.TenantID,
|
||||
url: cfg.URL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ApplyConfig is called everytime we've determined we need to apply an existing configuration to the Alertmanager,
|
||||
// including the first time the Alertmanager is started. In the context of a "remote Alertmanager" it's as good of a heuristic,
|
||||
// for "a function that gets called when the Alertmanager starts". As a result we do two things:
|
||||
// 1. Execute a readiness check to make sure the remote Alertmanager we're about to communicate with is up and ready.
|
||||
// 2. Upload the configuration and state we currently hold.
|
||||
func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
|
||||
if am.ready {
|
||||
am.log.Debug("Alertmanager previously marked as ready, skipping readiness check")
|
||||
return nil
|
||||
}
|
||||
|
||||
return am.checkReadiness(ctx)
|
||||
// First, execute a readiness check to make sure the remote Alertmanager is ready.
|
||||
am.log.Debug("Start readiness check for remote Alertmanager", "url", am.url)
|
||||
if err := am.checkReadiness(ctx); err != nil {
|
||||
am.log.Error("unable to pass the readiness check", "err", err)
|
||||
return err
|
||||
}
|
||||
am.log.Debug("Completed readiness check for remote Alertmanager", "url", am.url)
|
||||
|
||||
am.log.Debug("Start configuration upload to remote Alertmanager", "url", am.url)
|
||||
if ok := am.compareRemoteConfig(ctx, config); !ok {
|
||||
err := am.mimirClient.CreateGrafanaAlertmanagerConfig(ctx, config.AlertmanagerConfiguration, config.ConfigurationHash, config.ID, config.CreatedAt, config.Default)
|
||||
if err != nil {
|
||||
am.log.Error("Unable to upload the configuration to the remote Alertmanager", "err", err)
|
||||
} else {
|
||||
am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url)
|
||||
}
|
||||
}
|
||||
|
||||
am.log.Debug("Start state upload to remote Alertmanager", "url", am.url)
|
||||
if ok := am.compareRemoteState(ctx, ""); !ok {
|
||||
if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, ""); err != nil {
|
||||
am.log.Error("Unable to upload the state to the remote Alertmanager", "err", err)
|
||||
}
|
||||
}
|
||||
am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url)
|
||||
// upload the state
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) checkReadiness(ctx context.Context) error {
|
||||
readyURL := strings.TrimSuffix(am.url, "/") + readyPath
|
||||
readyURL := strings.TrimSuffix(am.url, "/") + "/alertmanager" + readyPath
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating readiness request: %w", err)
|
||||
@ -286,64 +336,41 @@ func (am *Alertmanager) TestTemplate(ctx context.Context, c apimodels.TestTempla
|
||||
return ¬ifier.TestTemplatesResults{}, nil
|
||||
}
|
||||
|
||||
// StopAndWait is called when the grafana server is instructed to shut down or an org is deleted.
|
||||
// In the context of a "remote Alertmanager" it is a good heuristic for Grafana is about to shut down or we no longer need you.
|
||||
func (am *Alertmanager) StopAndWait() {
|
||||
am.sender.Stop()
|
||||
|
||||
// Upload the configuration and state
|
||||
}
|
||||
|
||||
func (am *Alertmanager) Ready() bool {
|
||||
return am.ready
|
||||
}
|
||||
|
||||
// We don't have files on disk, no-op.
|
||||
// CleanUp does not have an equivalent in a "remote Alertmanager" context, we don't have files on disk, no-op.
|
||||
func (am *Alertmanager) CleanUp() {}
|
||||
|
||||
type roundTripper struct {
|
||||
tenantID string
|
||||
basicAuthPassword string
|
||||
next http.RoundTripper
|
||||
// compareRemoteConfig gets the remote Alertmanager config and compares it to the existing configuration.
|
||||
func (am *Alertmanager) compareRemoteConfig(ctx context.Context, config *models.AlertConfiguration) bool {
|
||||
rc, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
|
||||
if err != nil {
|
||||
// If we get an error trying to compare log it and return false so that we try to upload it anyway.
|
||||
am.log.Error("Unable to get the remote Alertmanager Configuration for comparison", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return md5.Sum([]byte(rc.GrafanaAlertmanagerConfig)) == md5.Sum([]byte(config.AlertmanagerConfiguration))
|
||||
}
|
||||
|
||||
// RoundTrip implements the http.RoundTripper interface
|
||||
// while adding the `X-Scope-OrgID` header and basic auth credentials.
|
||||
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
req.Header.Set("X-Scope-OrgID", r.tenantID)
|
||||
if r.tenantID != "" && r.basicAuthPassword != "" {
|
||||
req.SetBasicAuth(r.tenantID, r.basicAuthPassword)
|
||||
// compareRemoteState gets the remote Alertmanager state and compares it to the existing state.
|
||||
func (am *Alertmanager) compareRemoteState(ctx context.Context, state string) bool {
|
||||
rs, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx)
|
||||
if err != nil {
|
||||
// If we get an error trying to compare log it and return false so that we try to upload it anyway.
|
||||
am.log.Error("Unable to get the remote Alertmanager state for comparison", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return r.next.RoundTrip(req)
|
||||
}
|
||||
|
||||
// TODO: change implementation, this is only useful for testing other methods.
|
||||
func (am *Alertmanager) postConfig(ctx context.Context, rawConfig string) error {
|
||||
alertsURL := strings.TrimSuffix(am.url, "/alertmanager") + "/api/v1/alerts"
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, alertsURL, strings.NewReader(rawConfig))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating request: %v", err)
|
||||
}
|
||||
|
||||
res, err := am.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return fmt.Errorf("config not found")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
am.log.Warn("Error while closing body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading request response: %w", err)
|
||||
}
|
||||
|
||||
if res.StatusCode != http.StatusCreated {
|
||||
return fmt.Errorf("setting config failed with status code %d", res.StatusCode)
|
||||
}
|
||||
return nil
|
||||
return rs.State == state
|
||||
}
|
||||
|
@ -2,20 +2,25 @@ package remote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
amv2 "github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Valid config for Cloud AM, no `grafana_managed_receievers` field.
|
||||
const upstreamConfig = `{"template_files": {}, "alertmanager_config": "{\"global\": {\"smtp_from\": \"test@test.com\"}, \"route\": {\"receiver\": \"discord\"}, \"receivers\": [{\"name\": \"discord\", \"discord_configs\": [{\"webhook_url\": \"http://localhost:1234\"}]}]}"}`
|
||||
// Valid Grafana Alertmanager configuration.
|
||||
const testGrafanaConfig = `{"template_files":{},"alertmanager_config":{"route":{"receiver":"grafana-default-email","group_by":["grafana_folder","alertname"]},"templates":null,"receivers":[{"name":"grafana-default-email","grafana_managed_receiver_configs":[{"uid":"","name":"some other name","type":"email","disableResolveMessage":false,"settings":{"addresses":"\u003cexample@email.com\u003e"},"secureSettings":null}]}]}}`
|
||||
|
||||
func TestNewAlertmanager(t *testing.T) {
|
||||
tests := []struct {
|
||||
@ -66,6 +71,124 @@ func TestNewAlertmanager(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyConfig(t *testing.T) {
|
||||
errorHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
})
|
||||
okHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
// ApplyConfig performs a readiness check at startup.
|
||||
// A non-200 response should result in an error.
|
||||
server := httptest.NewServer(errorHandler)
|
||||
cfg := AlertmanagerConfig{
|
||||
URL: server.URL,
|
||||
}
|
||||
am, err := NewAlertmanager(cfg, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
config := &ngmodels.AlertConfiguration{}
|
||||
ctx := context.Background()
|
||||
require.Error(t, am.ApplyConfig(ctx, config))
|
||||
require.False(t, am.Ready())
|
||||
|
||||
// A 200 status code response should make the check succeed.
|
||||
server.Config.Handler = okHandler
|
||||
require.NoError(t, am.ApplyConfig(ctx, config))
|
||||
require.True(t, am.Ready())
|
||||
|
||||
// If we already got a 200 status code response, we shouldn't make the HTTP request again.
|
||||
server.Config.Handler = errorHandler
|
||||
require.NoError(t, am.ApplyConfig(ctx, config))
|
||||
require.True(t, am.Ready())
|
||||
}
|
||||
|
||||
func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
amURL, ok := os.LookupEnv("AM_URL")
|
||||
if !ok {
|
||||
t.Skip("No Alertmanager URL provided")
|
||||
}
|
||||
tenantID := os.Getenv("AM_TENANT_ID")
|
||||
password := os.Getenv("AM_PASSWORD")
|
||||
|
||||
// ApplyConfig performs a readiness check.
|
||||
cfg := AlertmanagerConfig{
|
||||
URL: amURL,
|
||||
TenantID: tenantID,
|
||||
BasicAuthPassword: password,
|
||||
}
|
||||
|
||||
fakeConfigHash := fmt.Sprintf("%x", md5.Sum([]byte(testGrafanaConfig)))
|
||||
fakeConfigCreatedAt := time.Date(2020, 6, 5, 12, 6, 0, 0, time.UTC).Unix()
|
||||
fakeConfig := &ngmodels.AlertConfiguration{
|
||||
ID: 100,
|
||||
AlertmanagerConfiguration: testGrafanaConfig,
|
||||
ConfigurationHash: fakeConfigHash,
|
||||
ConfigurationVersion: "v2",
|
||||
CreatedAt: fakeConfigCreatedAt,
|
||||
Default: true,
|
||||
OrgID: 1,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
am, err := NewAlertmanager(cfg, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have no configuration at first.
|
||||
{
|
||||
_, err = am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, "Error response from the Mimir API: alertmanager storage object not found", err.Error())
|
||||
}
|
||||
|
||||
// Using `ApplyConfig` as a heuristic of a function that gets called when the Alertmanager starts
|
||||
// We call it as if the Alertmanager were starting.
|
||||
{
|
||||
require.NoError(t, am.ApplyConfig(ctx, fakeConfig))
|
||||
|
||||
// First, we need to verify that the readiness check passes.
|
||||
require.True(t, am.Ready())
|
||||
|
||||
// Next, we need to verify that Mimir received the configuration.
|
||||
config, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(100), config.ID)
|
||||
require.Equal(t, testGrafanaConfig, config.GrafanaAlertmanagerConfig)
|
||||
require.Equal(t, fakeConfigHash, config.Hash)
|
||||
require.Equal(t, fakeConfigCreatedAt, config.CreatedAt)
|
||||
require.Equal(t, true, config.Default)
|
||||
|
||||
// TODO: Check that the state was uploaded.
|
||||
}
|
||||
|
||||
// Calling `ApplyConfig` again with a changed configuration yields no effect.
|
||||
{
|
||||
fakeConfig.ID = 30000000000000000
|
||||
require.NoError(t, am.ApplyConfig(ctx, fakeConfig))
|
||||
|
||||
// The remote Alertmanager continues to be ready.
|
||||
require.True(t, am.Ready())
|
||||
|
||||
// Next, we need to verify that the config that was uploaded remains the same.
|
||||
config, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(100), config.ID)
|
||||
require.Equal(t, testGrafanaConfig, config.GrafanaAlertmanagerConfig)
|
||||
require.Equal(t, fakeConfigHash, config.Hash)
|
||||
require.Equal(t, fakeConfigCreatedAt, config.CreatedAt)
|
||||
require.Equal(t, true, config.Default)
|
||||
}
|
||||
|
||||
// TODO: Now, shutdown the Alertmanager and we expect the latest configuration to be uploaded.
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegrationRemoteAlertmanagerSilences(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
@ -79,7 +202,7 @@ func TestIntegrationRemoteAlertmanagerSilences(t *testing.T) {
|
||||
password := os.Getenv("AM_PASSWORD")
|
||||
|
||||
cfg := AlertmanagerConfig{
|
||||
URL: amURL + "/alertmanager",
|
||||
URL: amURL,
|
||||
TenantID: tenantID,
|
||||
BasicAuthPassword: password,
|
||||
}
|
||||
@ -158,7 +281,7 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) {
|
||||
password := os.Getenv("AM_PASSWORD")
|
||||
|
||||
cfg := AlertmanagerConfig{
|
||||
URL: amURL + "/alertmanager",
|
||||
URL: amURL,
|
||||
TenantID: tenantID,
|
||||
BasicAuthPassword: password,
|
||||
}
|
||||
@ -219,7 +342,7 @@ func TestIntegrationRemoteAlertmanagerReceivers(t *testing.T) {
|
||||
password := os.Getenv("AM_PASSWORD")
|
||||
|
||||
cfg := AlertmanagerConfig{
|
||||
URL: amURL + "/alertmanager",
|
||||
URL: amURL,
|
||||
TenantID: tenantID,
|
||||
BasicAuthPassword: password,
|
||||
}
|
||||
@ -231,14 +354,6 @@ func TestIntegrationRemoteAlertmanagerReceivers(t *testing.T) {
|
||||
rcvs, err := am.GetReceivers(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "empty-receiver", *rcvs[0].Name)
|
||||
|
||||
// After changing the configuration, we should have a new `discord` receiver.
|
||||
require.NoError(t, am.postConfig(context.Background(), upstreamConfig))
|
||||
require.Eventually(t, func() bool {
|
||||
rcvs, err = am.GetReceivers(context.Background())
|
||||
require.NoError(t, err)
|
||||
return *rcvs[0].Name == "discord"
|
||||
}, 16*time.Second, 1*time.Second)
|
||||
}
|
||||
|
||||
func genSilence(createdBy string) apimodels.PostableSilence {
|
||||
|
@ -0,0 +1,59 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
grafanaAlertmanagerConfigPath = "/api/v1/grafana/config"
|
||||
)
|
||||
|
||||
type UserGrafanaConfig struct {
|
||||
ID int64 `json:"id"`
|
||||
GrafanaAlertmanagerConfig string `json:"configuration"`
|
||||
Hash string `json:"configuration_hash"`
|
||||
CreatedAt int64 `json:"created"`
|
||||
Default bool `json:"default"`
|
||||
}
|
||||
|
||||
func (mc *Mimir) GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafanaConfig, error) {
|
||||
gc := &UserGrafanaConfig{}
|
||||
response := successResponse{
|
||||
Data: gc,
|
||||
}
|
||||
// nolint:bodyclose
|
||||
// closed within `do`
|
||||
_, err := mc.do(ctx, grafanaAlertmanagerConfigPath, http.MethodGet, nil, &response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.Status != "success" {
|
||||
return nil, fmt.Errorf("returned non-success `status` from the MimirAPI: %s", response.Status)
|
||||
}
|
||||
|
||||
return gc, nil
|
||||
}
|
||||
|
||||
func (mc *Mimir) CreateGrafanaAlertmanagerConfig(ctx context.Context, c, hash string, id, created int64, d bool) error {
|
||||
payload, err := json.Marshal(&UserGrafanaConfig{
|
||||
ID: id,
|
||||
GrafanaAlertmanagerConfig: c,
|
||||
Hash: hash,
|
||||
CreatedAt: created,
|
||||
Default: d,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return mc.doOK(ctx, grafanaAlertmanagerConfigPath, http.MethodPost, bytes.NewBuffer(payload))
|
||||
}
|
||||
|
||||
func (mc *Mimir) DeleteGrafanaAlertmanagerConfig(ctx context.Context) error {
|
||||
return mc.doOK(ctx, grafanaAlertmanagerConfigPath, http.MethodDelete, nil)
|
||||
}
|
51
pkg/services/ngalert/remote/client/alertmanager_state.go
Normal file
51
pkg/services/ngalert/remote/client/alertmanager_state.go
Normal file
@ -0,0 +1,51 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
grafanaAlertmanagerStatePath = "/grafana/state"
|
||||
)
|
||||
|
||||
type UserGrafanaState struct {
|
||||
State string `json:"state"`
|
||||
}
|
||||
|
||||
func (mc *Mimir) GetGrafanaAlertmanagerState(ctx context.Context) (*UserGrafanaState, error) {
|
||||
gs := &UserGrafanaState{}
|
||||
response := successResponse{
|
||||
Data: gs,
|
||||
}
|
||||
// nolint:bodyclose
|
||||
// closed within `do`
|
||||
_, err := mc.do(ctx, grafanaAlertmanagerStatePath, http.MethodGet, nil, &response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.Status != "success" {
|
||||
return nil, fmt.Errorf("returned non-success `status` from the MimirAPI: %s", response.Status)
|
||||
}
|
||||
|
||||
return gs, nil
|
||||
}
|
||||
|
||||
func (mc *Mimir) CreateGrafanaAlertmanagerState(ctx context.Context, state string) error {
|
||||
payload, err := json.Marshal(&UserGrafanaState{
|
||||
State: state,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return mc.doOK(ctx, grafanaAlertmanagerStatePath, http.MethodPost, bytes.NewBuffer(payload))
|
||||
}
|
||||
|
||||
func (mc *Mimir) DeleteGrafanaAlertmanagerState(ctx context.Context) error {
|
||||
return mc.doOK(ctx, grafanaAlertmanagerStatePath, http.MethodDelete, nil)
|
||||
}
|
179
pkg/services/ngalert/remote/client/mimir.go
Normal file
179
pkg/services/ngalert/remote/client/mimir.go
Normal file
@ -0,0 +1,179 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
)
|
||||
|
||||
// MimirClient contains all the methods to query the migration critical endpoints of Mimir instance, it's an interface to allow multiple implementations.
|
||||
type MimirClient interface {
|
||||
GetGrafanaAlertmanagerState(ctx context.Context) (*UserGrafanaState, error)
|
||||
CreateGrafanaAlertmanagerState(ctx context.Context, s string) error
|
||||
DeleteGrafanaAlertmanagerState(ctx context.Context) error
|
||||
|
||||
GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafanaConfig, error)
|
||||
CreateGrafanaAlertmanagerConfig(ctx context.Context, configuration string, hash string, id int64, at int64, d bool) error
|
||||
DeleteGrafanaAlertmanagerConfig(ctx context.Context) error
|
||||
}
|
||||
|
||||
type Mimir struct {
|
||||
endpoint *url.URL
|
||||
client http.Client
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Address string
|
||||
TenantID string
|
||||
Password string
|
||||
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
// successResponse represents a successful response from the Mimir API.
|
||||
type successResponse struct {
|
||||
Status string `json:"status"`
|
||||
Data any `json:"data"`
|
||||
}
|
||||
|
||||
// errorResponse represents an error from the Mimir API.
|
||||
type errorResponse struct {
|
||||
Status string `json:"status"`
|
||||
Error1 string `json:"error"`
|
||||
Error2 string `json:"Error"`
|
||||
}
|
||||
|
||||
func (e *errorResponse) Error() string {
|
||||
if e.Error1 != "" {
|
||||
return e.Error1
|
||||
}
|
||||
|
||||
return e.Error2
|
||||
}
|
||||
|
||||
func New(cfg *Config) (*Mimir, error) {
|
||||
endpoint, err := url.Parse(cfg.Address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rt := &MimirAuthRoundTripper{
|
||||
TenantID: cfg.TenantID,
|
||||
Password: cfg.Password,
|
||||
Next: http.DefaultTransport,
|
||||
}
|
||||
|
||||
c := http.Client{
|
||||
Transport: rt,
|
||||
}
|
||||
|
||||
return &Mimir{
|
||||
endpoint: endpoint,
|
||||
client: c,
|
||||
logger: cfg.Logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// do execute an HTTP requests against the specified path and method using the specified payload.
|
||||
// It returns the HTTP response.
|
||||
func (mc *Mimir) do(ctx context.Context, p, method string, payload io.Reader, out any) (*http.Response, error) {
|
||||
pathURL, err := url.Parse(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endpoint := *mc.endpoint
|
||||
endpoint.Path = path.Join(endpoint.Path, pathURL.Path)
|
||||
|
||||
r, err := http.NewRequestWithContext(ctx, method, endpoint.String(), payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.Header.Set("Accept", "application/json")
|
||||
r.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := mc.client.Do(r)
|
||||
if err != nil {
|
||||
msg := "Unable to fulfill request to the Mimir API"
|
||||
mc.logger.Error(msg, "err", err, "url", r.URL.String(), "method", r.Method)
|
||||
return nil, fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
mc.logger.Error("Error closing HTTP body", "err", err, "url", r.URL.String(), "method", r.Method)
|
||||
}
|
||||
}()
|
||||
|
||||
ct := resp.Header.Get("Content-Type")
|
||||
if !strings.HasPrefix(ct, "application/json") {
|
||||
msg := "Response content-type is not application/json"
|
||||
mc.logger.Error(msg, "content-type", "url", r.URL.String(), "method", r.Method, ct, "status", resp.StatusCode)
|
||||
return nil, fmt.Errorf("%s: %s", msg, ct)
|
||||
}
|
||||
|
||||
if out == nil {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
msg := "Failed to read the request body"
|
||||
mc.logger.Error(msg, "err", err, "url", r.URL.String(), "method", r.Method, "status", resp.StatusCode)
|
||||
return nil, fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
if resp.StatusCode/100 != 2 {
|
||||
errResponse := &errorResponse{}
|
||||
err = json.Unmarshal(body, errResponse)
|
||||
|
||||
if err == nil && errResponse.Error() != "" {
|
||||
msg := "Error response from the Mimir API"
|
||||
mc.logger.Error(msg, "err", errResponse, "url", r.URL.String(), "method", r.Method, "status", resp.StatusCode)
|
||||
return nil, fmt.Errorf("%s: %w", msg, errResponse)
|
||||
}
|
||||
|
||||
msg := "Failed to decode non-2xx JSON response"
|
||||
mc.logger.Error(msg, "err", err, "url", r.URL.String(), "method", r.Method, "status", resp.StatusCode)
|
||||
return nil, fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(body, out); err != nil {
|
||||
msg := "Failed to decode 2xx JSON response"
|
||||
mc.logger.Error(msg, "err", err, "url", r.URL.String(), "method", r.Method, "status", resp.StatusCode)
|
||||
return nil, fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (mc *Mimir) doOK(ctx context.Context, p, method string, payload io.Reader) error {
|
||||
var sr successResponse
|
||||
resp, err := mc.do(ctx, p, method, payload, &sr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
mc.logger.Error("Error closing HTTP body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
switch sr.Status {
|
||||
case "success":
|
||||
return nil
|
||||
case "error":
|
||||
return errors.New("received an 2xx status code but the request body reflected an error")
|
||||
default:
|
||||
return fmt.Errorf("received an unknown status from the request body: %s", sr.Status)
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const mimirTenantHeader = "X-Scope-OrgID"
|
||||
|
||||
type MimirAuthRoundTripper struct {
|
||||
TenantID string
|
||||
Password string
|
||||
Next http.RoundTripper
|
||||
}
|
||||
|
||||
// RoundTrip implements the http.RoundTripper interface
|
||||
// It adds an `X-Scope-OrgID` header with the TenantID if only provided with a tenantID or sets HTTP Basic Authentication if both
|
||||
// a tenantID and a password are provided.
|
||||
func (r *MimirAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
if r.TenantID != "" && r.Password == "" {
|
||||
req.Header.Set(mimirTenantHeader, r.TenantID)
|
||||
}
|
||||
|
||||
if r.TenantID != "" && r.Password != "" {
|
||||
req.SetBasicAuth(r.TenantID, r.Password)
|
||||
}
|
||||
|
||||
return r.Next.RoundTrip(req)
|
||||
}
|
@ -971,7 +971,6 @@ def remote_alertmanager_integration_tests_steps():
|
||||
|
||||
environment = {
|
||||
"AM_TENANT_ID": "test",
|
||||
"AM_PASSWORD": "test",
|
||||
"AM_URL": "http://mimir_backend:8080",
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@ images = {
|
||||
"plugins_slack": "plugins/slack",
|
||||
"python": "python:3.8",
|
||||
"postgres_alpine": "postgres:12.3-alpine",
|
||||
"mimir": "grafana/mimir:latest",
|
||||
"mimir": "us.gcr.io/kubernetes-dev/mimir:gotjosh-state-config-grafana-663a0ae78",
|
||||
"mysql5": "mysql:5.7.39",
|
||||
"mysql8": "mysql:8.0.32",
|
||||
"redis_alpine": "redis:6.2.11-alpine",
|
||||
|
Loading…
Reference in New Issue
Block a user