mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Fetch configuration from the database and run a notification service (#32175)
* Alerting: Fetch configuration from the database and run a notification instance Co-Authored-By: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
This commit is contained in:
parent
49c4211295
commit
9b52ffc6a9
1
go.sum
1
go.sum
@ -1417,7 +1417,6 @@ github.com/prometheus/prometheus v1.8.2-0.20210217141258-a6be548dbc17 h1:VN3p3Nb
|
||||
github.com/prometheus/prometheus v1.8.2-0.20210217141258-a6be548dbc17/go.mod h1:dv3B1syqmkrkmo665MPCU6L8PbTXIiUeg/OEQULLNxA=
|
||||
github.com/prometheus/statsd_exporter v0.15.0/go.mod h1:Dv8HnkoLQkeEjkIE4/2ndAA7WL1zHKK7WMqFQqu72rw=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 h1:eL7x4/zMnlquMxYe7V078BD7MGskZ0daGln+SJCVzuY=
|
||||
github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3/go.mod h1:P7JlQWFT7jDcFZMtUPQbtGzzzxva3rBn6oIF+LPwFcM=
|
||||
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
|
||||
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be h1:ta7tUOvsPHVHGom5hKW5VXNc2xZIkfCKP8iaqOyYtUQ=
|
||||
|
@ -4,25 +4,24 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/datasourceproxy"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
|
||||
"github.com/go-macaron/binding"
|
||||
|
||||
apimodels "github.com/grafana/alerting-api/pkg/api"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/api/response"
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/expr/translate"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/middleware"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasourceproxy"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
@ -31,6 +30,10 @@ import (
|
||||
// timeNow makes it possible to test usage of time
|
||||
var timeNow = time.Now
|
||||
|
||||
type Alertmanager interface {
|
||||
ApplyConfig(config *apimodels.PostableUserConfig) error
|
||||
}
|
||||
|
||||
// API handlers.
|
||||
type API struct {
|
||||
Cfg *setting.Cfg
|
||||
@ -40,6 +43,7 @@ type API struct {
|
||||
Schedule schedule.ScheduleService
|
||||
Store store.Store
|
||||
DataProxy *datasourceproxy.DatasourceProxyService
|
||||
Alertmanager Alertmanager
|
||||
}
|
||||
|
||||
// RegisterAPIEndpoints registers API handlers
|
||||
|
@ -107,18 +107,3 @@ func alertInstanceMigration(mg *migrator.Migrator) {
|
||||
mg.AddMigration("add index in alert_instance table on def_org_id, def_uid and current_state columns", migrator.NewAddIndexMigration(alertInstance, alertInstance.Indices[0]))
|
||||
mg.AddMigration("add index in alert_instance table on def_org_id, current_state columns", migrator.NewAddIndexMigration(alertInstance, alertInstance.Indices[1]))
|
||||
}
|
||||
|
||||
func alertmanagerConfigurationMigration(mg *migrator.Migrator) {
|
||||
alertConfiguration := migrator.Table{
|
||||
Name: "alert_configuration",
|
||||
Columns: []*migrator.Column{
|
||||
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
|
||||
{Name: "alertmanager_configuration", Type: migrator.DB_Text, Nullable: false},
|
||||
{Name: "configuration_version", Type: migrator.DB_NVarchar, Length: 3}, // In a format of vXX e.g. v1, v2, v10, etc
|
||||
{Name: "created_at", Type: migrator.DB_Int, Nullable: false},
|
||||
{Name: "updated_at", Type: migrator.DB_Int, Nullable: false},
|
||||
},
|
||||
}
|
||||
|
||||
mg.AddMigration("create_alert_configuration_table", migrator.NewAddTableMigration(alertConfiguration))
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ type AlertConfiguration struct {
|
||||
AlertmanagerConfiguration string
|
||||
ConfigurationVersion string
|
||||
CreatedAt time.Time `xorm:"created"`
|
||||
UpdatedAt time.Time `xorm:"updated"`
|
||||
}
|
||||
|
||||
// GetLatestAlertmanagerConfigurationQuery is the query to get the latest alertmanager configuration.
|
||||
|
@ -4,23 +4,22 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/datasourceproxy"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/api"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/datasourceproxy"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/api"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -42,6 +41,7 @@ type AlertNG struct {
|
||||
RouteRegister routing.RouteRegister `inject:""`
|
||||
SQLStore *sqlstore.SQLStore `inject:""`
|
||||
DataService *tsdb.Service `inject:""`
|
||||
Alertmanager *notifier.Alertmanager `inject:""`
|
||||
DataProxy *datasourceproxy.DatasourceProxyService `inject:""`
|
||||
Log log.Logger
|
||||
schedule schedule.ScheduleService
|
||||
@ -76,7 +76,9 @@ func (ng *AlertNG) Init() error {
|
||||
DataService: ng.DataService,
|
||||
Schedule: ng.schedule,
|
||||
DataProxy: ng.DataProxy,
|
||||
Store: store}
|
||||
Store: store,
|
||||
Alertmanager: ng.Alertmanager,
|
||||
}
|
||||
api.RegisterAPIEndpoints()
|
||||
|
||||
return nil
|
||||
@ -107,5 +109,4 @@ func (ng *AlertNG) AddMigration(mg *migrator.Migrator) {
|
||||
addAlertDefinitionVersionMigrations(mg)
|
||||
// Create alert_instance table
|
||||
alertInstanceMigration(mg)
|
||||
alertmanagerConfigurationMigration(mg)
|
||||
}
|
||||
|
@ -5,7 +5,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
gokit_log "github.com/go-kit/kit/log"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
@ -23,7 +24,6 @@ type PostableAlert struct {
|
||||
|
||||
type AlertProvider struct {
|
||||
provider.Alerts
|
||||
logger log.Logger
|
||||
|
||||
// TODO(codesome): This stage is temporary to get code out quickly.
|
||||
// Eventually, the alerts meant directly for receivers and not routing
|
||||
@ -37,8 +37,8 @@ type AlertProvider struct {
|
||||
// NewAlertProvider returns AlertProvider that also supports legacy alerts via PutPostableAlert.
|
||||
// The notify.Stage should be of the type notify.RoutingStage or something similar that takes
|
||||
// notification channel name from the context.
|
||||
func NewAlertProvider(s notify.Stage, m types.Marker, l log.Logger) (*AlertProvider, error) {
|
||||
alerts, err := mem.NewAlerts(context.Background(), m, 30*time.Minute, l)
|
||||
func NewAlertProvider(s notify.Stage, m types.Marker) (*AlertProvider, error) {
|
||||
alerts, err := mem.NewAlerts(context.Background(), m, 30*time.Minute, gokit_log.NewNopLogger())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -46,7 +46,6 @@ func NewAlertProvider(s notify.Stage, m types.Marker, l log.Logger) (*AlertProvi
|
||||
return &AlertProvider{
|
||||
Alerts: alerts,
|
||||
stage: s,
|
||||
logger: l,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -66,7 +65,7 @@ func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(alertsWithReceivers) == 0 {
|
||||
if len(alertsWithReceivers) == 0 || ap.stage == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -81,7 +80,7 @@ func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
|
||||
for recv, alerts := range groupedAlerts {
|
||||
ap.stageMtx.Lock()
|
||||
ctx := notify.WithReceiverName(context.Background(), recv)
|
||||
_, _, err := ap.stage.Exec(ctx, ap.logger, alerts...)
|
||||
_, _, err := ap.stage.Exec(ctx, gokit_log.NewNopLogger(), alerts...)
|
||||
ap.stageMtx.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -20,7 +20,7 @@ func TestAlertProvider_PutPostableAlert(t *testing.T) {
|
||||
stage := &mockStage{alerts: make(map[string][]*types.Alert)}
|
||||
provider := &mockAlertProvider{}
|
||||
|
||||
ap, err := NewAlertProvider(stage, marker, log.NewNopLogger())
|
||||
ap, err := NewAlertProvider(stage, marker)
|
||||
require.NoError(t, err)
|
||||
ap.Alerts = provider
|
||||
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
gokit_log "github.com/go-kit/kit/log"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/nflog"
|
||||
"github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
@ -17,17 +16,30 @@ import (
|
||||
"github.com/prometheus/alertmanager/pkg/labels"
|
||||
"github.com/prometheus/alertmanager/silence"
|
||||
"github.com/prometheus/alertmanager/silence/silencepb"
|
||||
"github.com/prometheus/alertmanager/template"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/alerting-api/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier/channels"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
const (
|
||||
workingDir = "alerting"
|
||||
)
|
||||
|
||||
type Alertmanager struct {
|
||||
logger log.Logger
|
||||
logger log.Logger
|
||||
Settings *setting.Cfg `inject:""`
|
||||
SQLStore *sqlstore.SQLStore `inject:""`
|
||||
Store store.AlertingStore
|
||||
|
||||
// notificationLog keeps tracks of which notifications we've fired already.
|
||||
notificationLog *nflog.Log
|
||||
@ -52,8 +64,9 @@ func (am *Alertmanager) IsDisabled() bool {
|
||||
|
||||
func (am *Alertmanager) Init() (err error) {
|
||||
am.logger = log.New("alertmanager")
|
||||
//TODO: Speak with David Parrot wrt to the marker, we'll probably need our own.
|
||||
am.marker = types.NewMarker(prometheus.DefaultRegisterer)
|
||||
r := prometheus.NewRegistry()
|
||||
am.marker = types.NewMarker(r)
|
||||
am.Store = store.DBstore{SQLStore: am.SQLStore}
|
||||
|
||||
am.notificationLog, err = nflog.New(
|
||||
nflog.WithRetention(time.Hour*24), //TODO: This is a setting.
|
||||
@ -70,14 +83,20 @@ func (am *Alertmanager) Init() (err error) {
|
||||
return errors.Wrap(err, "unable to initialize the silencing component of alerting")
|
||||
}
|
||||
|
||||
am.alerts, err = NewAlertProvider(nil, am.marker)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to initialize the alert provider component of alerting")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) Run(ctx context.Context) error {
|
||||
// Make sure dispatcher starts. We can tolerate future reload failures.
|
||||
if err := am.ReloadConfigFromDatabase(); err != nil {
|
||||
if err := am.SyncAndApplyConfigFromDatabase(); err != nil && !errors.Is(err, store.ErrNoAlertmanagerConfiguration) {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -85,13 +104,21 @@ func (am *Alertmanager) Run(ctx context.Context) error {
|
||||
return nil
|
||||
case <-time.After(1 * time.Minute):
|
||||
// TODO: once we have a check to skip reload on same config, uncomment this.
|
||||
//if err := am.ReloadConfigFromDatabase(); err != nil {
|
||||
// am.logger.Error("failed to sync config from database", "error", err)
|
||||
//if err := am.SyncAndApplyConfigFromDatabase(); err != nil {
|
||||
// if err == store.ErrNoAlertmanagerConfiguration {
|
||||
// am.logger.Warn(errors.Wrap(err, "unable to sync configuration").Error())
|
||||
// }
|
||||
// am.logger.Error(errors.Wrap(err, "unable to sync configuration").Error())
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddMigration runs the database migrations as the service starts.
|
||||
func (am *Alertmanager) AddMigration(mg *migrator.Migrator) {
|
||||
alertmanagerConfigurationMigration(mg)
|
||||
}
|
||||
|
||||
func (am *Alertmanager) StopAndWait() {
|
||||
if am.dispatcher != nil {
|
||||
am.dispatcher.Stop()
|
||||
@ -99,43 +126,67 @@ func (am *Alertmanager) StopAndWait() {
|
||||
am.dispatcherWG.Wait()
|
||||
}
|
||||
|
||||
// ReloadConfigFromDatabase picks the latest config from database and restarts
|
||||
// SyncAndApplyConfigFromDatabase picks the latest config from database and restarts
|
||||
// the components with the new config.
|
||||
func (am *Alertmanager) ReloadConfigFromDatabase() error {
|
||||
func (am *Alertmanager) SyncAndApplyConfigFromDatabase() error {
|
||||
am.reloadConfigMtx.Lock()
|
||||
defer am.reloadConfigMtx.Unlock()
|
||||
|
||||
// TODO: check if config is same as before using hashes and skip reload in case they are same.
|
||||
cfg, err := getConfigFromDatabase()
|
||||
cfg, err := am.getConfigFromDatabase()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get config from database")
|
||||
}
|
||||
return errors.Wrap(am.ApplyConfig(cfg), "reload from config")
|
||||
}
|
||||
|
||||
func getConfigFromDatabase() (*api.PostableApiAlertingConfig, error) {
|
||||
// TODO: get configs from the database.
|
||||
return &api.PostableApiAlertingConfig{}, nil
|
||||
func (am *Alertmanager) getConfigFromDatabase() (*api.PostableUserConfig, error) {
|
||||
// First, let's get the configuration we need from the database.
|
||||
q := &models.GetLatestAlertmanagerConfigurationQuery{}
|
||||
if err := am.Store.GetLatestAlertmanagerConfiguration(q); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Then, let's parse and return the alertmanager configuration.
|
||||
return Load(q.Result.AlertmanagerConfiguration)
|
||||
}
|
||||
|
||||
// ApplyConfig applies a new configuration by re-initializing all components using the configuration provided.
|
||||
// It is not safe to call concurrently.
|
||||
func (am *Alertmanager) ApplyConfig(cfg *api.PostableApiAlertingConfig) error {
|
||||
func (am *Alertmanager) ApplyConfig(cfg *api.PostableUserConfig) error {
|
||||
// First, we need to make sure we persist the templates to disk.
|
||||
paths, _, err := PersistTemplates(cfg, am.WorkingDirPath())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// With the templates persisted, create the template list using the paths.
|
||||
tmpl, err := template.FromGlobs(paths...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Finally, build the integrations map using the receiver configuration and templates.
|
||||
integrationsMap, err := am.buildIntegrationsMap(cfg.AlertmanagerConfig.Receivers, tmpl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Now, let's put together our notification pipeline
|
||||
receivers := buildIntegrationsMap()
|
||||
routingStage := make(notify.RoutingStage, len(receivers))
|
||||
routingStage := make(notify.RoutingStage, len(integrationsMap))
|
||||
|
||||
silencingStage := notify.NewMuteStage(silence.NewSilencer(am.silences, am.marker, gokit_log.NewNopLogger()))
|
||||
//TODO: We need to unify these receivers
|
||||
for name := range receivers {
|
||||
stage := createReceiverStage(name, receivers[name], waitFunc, am.notificationLog)
|
||||
for name := range integrationsMap {
|
||||
stage := createReceiverStage(name, integrationsMap[name], waitFunc, am.notificationLog)
|
||||
routingStage[name] = notify.MultiStage{silencingStage, stage}
|
||||
}
|
||||
|
||||
am.alerts.SetStage(routingStage)
|
||||
|
||||
am.StopAndWait()
|
||||
am.dispatcher = dispatch.NewDispatcher(am.alerts, BuildRoutingConfiguration(), routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), nil)
|
||||
//TODO: Verify this is correct
|
||||
route := dispatch.NewRoute(cfg.AlertmanagerConfig.Route, nil)
|
||||
//TODO: This needs the metrics
|
||||
am.dispatcher = dispatch.NewDispatcher(am.alerts, route, routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), nil)
|
||||
|
||||
am.dispatcherWG.Add(1)
|
||||
go func() {
|
||||
@ -146,6 +197,43 @@ func (am *Alertmanager) ApplyConfig(cfg *api.PostableApiAlertingConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) WorkingDirPath() string {
|
||||
return filepath.Join(am.Settings.DataPath, workingDir)
|
||||
}
|
||||
|
||||
// buildIntegrationsMap builds a map of name to the list of Grafana integration notifiers off of a list of receiver config.
|
||||
func (am *Alertmanager) buildIntegrationsMap(receivers []*api.PostableApiReceiver, templates *template.Template) (map[string][]notify.Integration, error) {
|
||||
integrationsMap := make(map[string][]notify.Integration, len(receivers))
|
||||
for _, receiver := range receivers {
|
||||
integrations, err := am.buildReceiverIntegrations(receiver, templates)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
integrationsMap[receiver.Name] = integrations
|
||||
}
|
||||
|
||||
return integrationsMap, nil
|
||||
}
|
||||
|
||||
// buildReceiverIntegrations builds a list of integration notifiers off of a receiver config.
|
||||
func (am *Alertmanager) buildReceiverIntegrations(receiver *api.PostableApiReceiver, _ *template.Template) ([]notify.Integration, error) {
|
||||
var integrations []notify.Integration
|
||||
|
||||
for i, r := range receiver.GrafanaManagedReceivers {
|
||||
switch r.Type {
|
||||
case "email":
|
||||
n, err := channels.NewEmailNotifier(r.Result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
integrations = append(integrations, notify.NewIntegration(n, n, r.Name, i))
|
||||
}
|
||||
}
|
||||
|
||||
return integrations, nil
|
||||
}
|
||||
|
||||
// CreateAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not
|
||||
func (am *Alertmanager) CreateAlerts(alerts ...*PostableAlert) error {
|
||||
return am.alerts.PutPostableAlert(alerts...)
|
||||
@ -232,16 +320,6 @@ func createReceiverStage(name string, integrations []notify.Integration, wait fu
|
||||
return fs
|
||||
}
|
||||
|
||||
// BuildRoutingConfiguration produces an alertmanager-based routing configuration.
|
||||
func BuildRoutingConfiguration() *dispatch.Route {
|
||||
var cfg *config.Config
|
||||
return dispatch.NewRoute(cfg.Route, nil)
|
||||
}
|
||||
|
||||
func buildIntegrationsMap() map[string][]notify.Integration {
|
||||
return map[string][]notify.Integration{}
|
||||
}
|
||||
|
||||
func waitFunc() time.Duration {
|
||||
return setting.AlertingNotificationTimeout
|
||||
}
|
||||
|
13
pkg/services/ngalert/notifier/alertmanager_test.go
Normal file
13
pkg/services/ngalert/notifier/alertmanager_test.go
Normal file
@ -0,0 +1,13 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAlertmanager(t *testing.T) {
|
||||
t.SkipNow()
|
||||
am := &Alertmanager{}
|
||||
require.NoError(t, am.Init())
|
||||
}
|
@ -57,7 +57,7 @@ func NewEmailNotifier(model *models.AlertNotification) (*EmailNotifier, error) {
|
||||
}
|
||||
|
||||
// Notify sends the alert notification.
|
||||
func (en *EmailNotifier) Notify(ctx context.Context, as ...*types.Alert) error {
|
||||
func (en *EmailNotifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
|
||||
// TODO(codesome): make sure the receiver name is added in the ctx before calling this.
|
||||
ctx = notify.WithReceiverName(ctx, "email-notification-channel") // Dummy.
|
||||
// TODO(codesome): make sure the group labels is added in the ctx before calling this.
|
||||
@ -88,5 +88,14 @@ func (en *EmailNotifier) Notify(ctx context.Context, as ...*types.Alert) error {
|
||||
},
|
||||
}
|
||||
|
||||
return bus.DispatchCtx(ctx, cmd)
|
||||
if err := bus.DispatchCtx(ctx, cmd); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (en *EmailNotifier) SendResolved() bool {
|
||||
// TODO: implement this.
|
||||
return true
|
||||
}
|
||||
|
99
pkg/services/ngalert/notifier/config.go
Normal file
99
pkg/services/ngalert/notifier/config.go
Normal file
@ -0,0 +1,99 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/grafana/alerting-api/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func PersistTemplates(cfg *api.PostableUserConfig, path string) ([]string, bool, error) {
|
||||
if len(cfg.TemplateFiles) < 1 {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
var templatesChanged bool
|
||||
pathSet := map[string]struct{}{}
|
||||
for name, content := range cfg.TemplateFiles {
|
||||
if name != filepath.Base(filepath.Clean(name)) {
|
||||
return nil, false, fmt.Errorf("template file name '%s' is not valid", name)
|
||||
}
|
||||
|
||||
err := os.MkdirAll(path, 0750)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("unable to create template directory %q: %s", path, err)
|
||||
}
|
||||
|
||||
file := filepath.Join(path, name)
|
||||
pathSet[file] = struct{}{}
|
||||
|
||||
// Check if the template file already exists and if it has changed
|
||||
// We can safeily ignore gosec here and we've previously checked the filename is clean
|
||||
// nolint:gosec
|
||||
if tmpl, err := ioutil.ReadFile(file); err == nil && string(tmpl) == content {
|
||||
// Templates file is the same we have, no-op and continue.
|
||||
continue
|
||||
} else if err != nil && !os.IsNotExist(err) {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(file, []byte(content), 0644); err != nil {
|
||||
return nil, false, fmt.Errorf("unable to create Alertmanager template file %q: %s", file, err)
|
||||
}
|
||||
// nolint:gosec
|
||||
|
||||
templatesChanged = true
|
||||
}
|
||||
|
||||
// Now that we have the list of _actual_ templates, let's remove the ones that we don't need.
|
||||
existingFiles, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
log.Error("unable to read directory for deleting alertmanager templates", "err", err, "path", path)
|
||||
}
|
||||
for _, existingFile := range existingFiles {
|
||||
p := filepath.Join(path, existingFile.Name())
|
||||
_, ok := pathSet[p]
|
||||
if !ok {
|
||||
templatesChanged = true
|
||||
err := os.Remove(p)
|
||||
if err != nil {
|
||||
log.Error("unable to delete template", "err", err, "file", p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
paths := make([]string, 0, len(pathSet))
|
||||
for path := range pathSet {
|
||||
paths = append(paths, path)
|
||||
}
|
||||
return paths, templatesChanged, nil
|
||||
}
|
||||
|
||||
func Load(rawConfig string) (*api.PostableUserConfig, error) {
|
||||
cfg := &api.PostableUserConfig{}
|
||||
|
||||
if err := yaml.UnmarshalStrict([]byte(rawConfig), cfg); err != nil {
|
||||
return nil, errors.Wrap(err, "unable to parse Alertmanager configuration")
|
||||
}
|
||||
|
||||
// Taken from https://github.com/prometheus/alertmanager/blob/master/config/config.go#L170-L191
|
||||
// Check if we have a root route. We cannot check for it in the
|
||||
// UnmarshalYAML method because it won't be called if the input is empty
|
||||
// (e.g. the config file is empty or only contains whitespace).
|
||||
if cfg.AlertmanagerConfig.Route == nil {
|
||||
return nil, errors.New("no route provided in config")
|
||||
}
|
||||
|
||||
// Check if continue in root route.
|
||||
if cfg.AlertmanagerConfig.Route.Continue {
|
||||
return nil, errors.New("cannot have continue in root route")
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
153
pkg/services/ngalert/notifier/config_test.go
Normal file
153
pkg/services/ngalert/notifier/config_test.go
Normal file
@ -0,0 +1,153 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/alerting-api/pkg/api"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPersistTemplates(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
templates map[string]string
|
||||
existingTemplates map[string]string
|
||||
expectedPaths []string
|
||||
expectedError error
|
||||
expectedChange bool
|
||||
}{
|
||||
{
|
||||
name: "With valid templates file names, it persists successfully",
|
||||
templates: map[string]string{"email.template": "a perfectly fine template"},
|
||||
expectedChange: true,
|
||||
expectedError: nil,
|
||||
expectedPaths: []string{"email.template"},
|
||||
},
|
||||
{
|
||||
name: "With a invalid filename, it fails",
|
||||
templates: map[string]string{"adirectory/email.template": "a perfectly fine template"},
|
||||
expectedError: errors.New("template file name 'adirectory/email.template' is not valid"),
|
||||
},
|
||||
{
|
||||
name: "with a template that has the same name but different content to an existing one",
|
||||
existingTemplates: map[string]string{"email.template": "a perfectly fine template"},
|
||||
templates: map[string]string{"email.template": "a completely different content"},
|
||||
expectedChange: true,
|
||||
expectedError: nil,
|
||||
expectedPaths: []string{"email.template"},
|
||||
},
|
||||
{
|
||||
name: "with a template that has the same name and the same content as an existing one",
|
||||
existingTemplates: map[string]string{"email.template": "a perfectly fine template"},
|
||||
templates: map[string]string{"email.template": "a perfectly fine template"},
|
||||
expectedChange: false,
|
||||
expectedError: nil,
|
||||
expectedPaths: []string{"email.template"},
|
||||
},
|
||||
{
|
||||
name: "with two new template files, it changes the template tree",
|
||||
existingTemplates: map[string]string{"email.template": "a perfectly fine template"},
|
||||
templates: map[string]string{"slack.template": "a perfectly fine template", "webhook.template": "a webhook template"},
|
||||
expectedChange: true,
|
||||
expectedError: nil,
|
||||
expectedPaths: []string{"slack.template", "webhook.template"},
|
||||
},
|
||||
{
|
||||
name: "when we remove a template file from the list, it changes the template tree",
|
||||
existingTemplates: map[string]string{"slack.template": "a perfectly fine template", "webhook.template": "a webhook template"},
|
||||
templates: map[string]string{"slack.template": "a perfectly fine template"},
|
||||
expectedChange: true,
|
||||
expectedError: nil,
|
||||
expectedPaths: []string{"slack.template"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
// Write "existing files"
|
||||
for name, content := range tt.existingTemplates {
|
||||
err := ioutil.WriteFile(filepath.Join(dir, name), []byte(content), 0644)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
c := &api.PostableUserConfig{TemplateFiles: tt.templates}
|
||||
|
||||
paths, changed, persistErr := PersistTemplates(c, dir)
|
||||
|
||||
files := map[string]string{}
|
||||
readFiles, err := ioutil.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
for _, f := range readFiles {
|
||||
if f.IsDir() || f.Name() == "" {
|
||||
continue
|
||||
}
|
||||
// Safe to disable, this is a test.
|
||||
// nolint:gosec
|
||||
content, err := ioutil.ReadFile(filepath.Join(dir, f.Name()))
|
||||
// nolint:gosec
|
||||
require.NoError(t, err)
|
||||
files[f.Name()] = string(content)
|
||||
}
|
||||
|
||||
// Given we use a temporary directory in tests, we need to prepend the expected paths with it.
|
||||
for i, p := range tt.expectedPaths {
|
||||
tt.expectedPaths[i] = filepath.Join(dir, p)
|
||||
}
|
||||
|
||||
require.Equal(t, tt.expectedError, persistErr)
|
||||
require.ElementsMatch(t, tt.expectedPaths, paths)
|
||||
require.Equal(t, tt.expectedChange, changed)
|
||||
if tt.expectedError == nil {
|
||||
require.Equal(t, tt.templates, files)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoad(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
rawConfig string
|
||||
expectedTemplates map[string]string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "with a valid config and template",
|
||||
rawConfig: `
|
||||
alertmanager_config:
|
||||
global:
|
||||
smtp_from: noreply@grafana.net
|
||||
route:
|
||||
receiver: email
|
||||
receivers:
|
||||
template_files:
|
||||
'email.template': something with a pretty good content
|
||||
`,
|
||||
expectedTemplates: map[string]string{"email.template": "something with a pretty good content"},
|
||||
},
|
||||
{
|
||||
name: "with an empty configuration, it is not valid.",
|
||||
rawConfig: "",
|
||||
expectedError: errors.New("no route provided in config"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c, err := Load(tt.rawConfig)
|
||||
|
||||
if tt.expectedError != nil {
|
||||
assert.Nil(t, c)
|
||||
assert.Equal(t, tt.expectedError.Error(), err.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, c.TemplateFiles)
|
||||
assert.Equal(t, tt.expectedTemplates, c.TemplateFiles)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
17
pkg/services/ngalert/notifier/database_migrations.go
Normal file
17
pkg/services/ngalert/notifier/database_migrations.go
Normal file
@ -0,0 +1,17 @@
|
||||
package notifier
|
||||
|
||||
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
|
||||
func alertmanagerConfigurationMigration(mg *migrator.Migrator) {
|
||||
alertConfiguration := migrator.Table{
|
||||
Name: "alert_configuration",
|
||||
Columns: []*migrator.Column{
|
||||
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
|
||||
{Name: "alertmanager_configuration", Type: migrator.DB_Text, Nullable: false},
|
||||
{Name: "configuration_version", Type: migrator.DB_NVarchar, Length: 3}, // In a format of vXX e.g. v1, v2, v10, etc
|
||||
{Name: "created_at", Type: migrator.DB_Int, Nullable: false},
|
||||
},
|
||||
}
|
||||
|
||||
mg.AddMigration("create_alert_configuration_table", migrator.NewAddTableMigration(alertConfiguration))
|
||||
}
|
1
pkg/services/ngalert/notifier/silences.go
Normal file
1
pkg/services/ngalert/notifier/silences.go
Normal file
@ -0,0 +1 @@
|
||||
package notifier
|
@ -10,7 +10,7 @@ import (
|
||||
|
||||
var (
|
||||
// ErrNoAlertmanagerConfiguration is an error for when no alertmanager configuration is found.
|
||||
ErrNoAlertmanagerConfiguration = fmt.Errorf("could not find an alert configuration")
|
||||
ErrNoAlertmanagerConfiguration = fmt.Errorf("could not find an Alertmanager configuration")
|
||||
)
|
||||
|
||||
// GetLatestAlertmanagerConfiguration returns the lastest version of the alertmanager configuration.
|
||||
|
@ -35,8 +35,10 @@ type Store interface {
|
||||
SaveAlertInstance(cmd *models.SaveAlertInstanceCommand) error
|
||||
ValidateAlertDefinition(*models.AlertDefinition, bool) error
|
||||
UpdateAlertDefinitionPaused(*models.UpdateAlertDefinitionPausedCommand) error
|
||||
}
|
||||
|
||||
// Alertmanager
|
||||
// AlertingStore is the database interface used by the Alertmanager service.
|
||||
type AlertingStore interface {
|
||||
GetLatestAlertmanagerConfiguration(cmd *models.GetLatestAlertmanagerConfigurationQuery) error
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user