
Alerting: Send alerts to external Alertmanager(s) ()

* Alerting: Send alerts to external Alertmanager(s)

Within this PR we're adding support for registering or unregistering
sending to a set of external Alertmanagers. A few of the things going on
here are:

- Introduce a new table to hold "admin" (either org or global)
  configuration that we can change at runtime.
- Add a new periodic check that polls for this configuration and adjusts
  the "senders" accordingly.
- Introduce a new concept of "senders" that are responsible for shipping
  the alerts to the external Alertmanager(s). In a nutshell, this is the
  Prometheus notifier (the component in charge of sending the alert) held
  in a multi-tenant map keyed by organization (sketched below).

There are a few code movements here and there, but those are minor; I
tried to keep things intact as much as possible so that the diff is
easier to review.
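
To make the "senders" concept concrete, here is a minimal, self-contained
sketch of the idea (illustrative names only, not the actual implementation;
the real code lives under pkg/services/ngalert/schedule and
pkg/services/ngalert/sender): a per-organization registry guarded by a
mutex, where a hash of the applied configuration lets the periodic sync
skip organizations whose configuration hasn't changed.

package main

import (
	"fmt"
	"sync"
)

// sender stands in for the wrapped Prometheus notifier that ships alerts
// to the external Alertmanager(s) of a single organization.
type sender struct {
	alertmanagers []string
}

// registry keeps one sender per organization plus a hash of the applied
// configuration, so an unchanged configuration is a no-op on the next sync.
type registry struct {
	mtx     sync.RWMutex
	senders map[int64]*sender
	hashes  map[int64]string
}

func (r *registry) apply(orgID int64, alertmanagers []string, hash string) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if r.hashes[orgID] == hash {
		return // configuration unchanged, nothing to do.
	}
	r.senders[orgID] = &sender{alertmanagers: alertmanagers}
	r.hashes[orgID] = hash
}

func main() {
	r := &registry{senders: map[int64]*sender{}, hashes: map[int64]string{}}
	r.apply(1, []string{"http://alertmanager:9093"}, "h1")
	fmt.Println(len(r.senders)) // 1
}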
gotjosh 2021-08-06 13:06:56 +01:00 committed by GitHub
parent 7e42bb5df0
commit f83cd401e5
23 changed files with 1666 additions and 128 deletions

go.mod

@ -96,7 +96,7 @@ require (
github.com/xorcare/pointer v1.1.0
github.com/yudai/gojsondiff v1.0.0
go.opentelemetry.io/collector v0.31.0
go.opentelemetry.io/collector/model v0.31.0 // indirect
go.opentelemetry.io/collector/model v0.31.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e

go.sum

@ -213,6 +213,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 h1:AUNCr9CiJuwrRYS3XieqF+Z9B9gNxo/eANAJCF2eiN4=
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=


@ -3,18 +3,16 @@ package api
import (
"time"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasourceproxy"
"github.com/grafana/grafana/pkg/services/datasources"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
@ -41,18 +39,19 @@ type Alertmanager interface {
// API handlers.
type API struct {
Cfg *setting.Cfg
DatasourceCache datasources.CacheService
RouteRegister routing.RouteRegister
DataService *tsdb.Service
QuotaService *quota.QuotaService
Schedule schedule.ScheduleService
RuleStore store.RuleStore
InstanceStore store.InstanceStore
AlertingStore store.AlertingStore
DataProxy *datasourceproxy.DatasourceProxyService
Alertmanager Alertmanager
StateManager *state.Manager
Cfg *setting.Cfg
DatasourceCache datasources.CacheService
RouteRegister routing.RouteRegister
DataService *tsdb.Service
QuotaService *quota.QuotaService
Schedule schedule.ScheduleService
RuleStore store.RuleStore
InstanceStore store.InstanceStore
AlertingStore store.AlertingStore
AdminConfigStore store.AdminConfigurationStore
DataProxy *datasourceproxy.DatasourceProxyService
Alertmanager Alertmanager
StateManager *state.Manager
}
// RegisterAPIEndpoints registers API handlers
@ -87,4 +86,8 @@ func (api *API) RegisterAPIEndpoints(m *metrics.Metrics) {
DatasourceCache: api.DatasourceCache,
log: logger,
}, m)
api.RegisterConfigurationApiEndpoints(AdminSrv{
store: api.AdminConfigStore,
log: logger,
}, m)
}


@ -0,0 +1,74 @@
package api
import (
"errors"
"net/http"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/store"
)
type AdminSrv struct {
store store.AdminConfigurationStore
log log.Logger
}
func (srv AdminSrv) RouteGetNGalertConfig(c *models.ReqContext) response.Response {
if c.OrgRole != models.ROLE_ADMIN {
return accessForbiddenResp()
}
cfg, err := srv.store.GetAdminConfiguration(c.OrgId)
if err != nil {
if errors.Is(err, store.ErrNoAdminConfiguration) {
return ErrResp(http.StatusNotFound, err, "")
}
msg := "failed to fetch admin configuration from the database"
srv.log.Error(msg, "err", err)
return ErrResp(http.StatusInternalServerError, err, msg)
}
resp := apimodels.GettableNGalertConfig{
Alertmanagers: cfg.Alertmanagers,
}
return response.JSON(http.StatusOK, resp)
}
func (srv AdminSrv) RoutePostNGalertConfig(c *models.ReqContext, body apimodels.PostableNGalertConfig) response.Response {
if c.OrgRole != models.ROLE_ADMIN {
return accessForbiddenResp()
}
cfg := &ngmodels.AdminConfiguration{
Alertmanagers: body.Alertmanagers,
OrgID: c.OrgId,
}
cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: cfg}
if err := srv.store.UpdateAdminConfiguration(cmd); err != nil {
msg := "failed to save the admin configuration to the database"
srv.log.Error(msg, "err", err)
return ErrResp(http.StatusBadRequest, err, msg)
}
return response.JSON(http.StatusCreated, "admin configuration updated")
}
func (srv AdminSrv) RouteDeleteNGalertConfig(c *models.ReqContext) response.Response {
if c.OrgRole != models.ROLE_ADMIN {
return accessForbiddenResp()
}
err := srv.store.DeleteAdminConfiguration(c.OrgId)
if err != nil {
srv.log.Error("unable to delete configuration", "err", err)
return ErrResp(http.StatusInternalServerError, err, "")
}
return response.JSON(http.StatusOK, "admin configuration deleted")
}


@ -4,7 +4,6 @@
*
*Do not manually edit these files, please find ngalert/api/swagger-codegen/ for commands on how to generate them.
*/
package api
import (


@ -0,0 +1,59 @@
/*Package api contains base API implementation of unified alerting
*
*Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
*
*Do not manually edit these files, please find ngalert/api/swagger-codegen/ for commands on how to generate them.
*/
package api
import (
"net/http"
"github.com/go-macaron/binding"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
)
type ConfigurationApiService interface {
RouteDeleteNGalertConfig(*models.ReqContext) response.Response
RouteGetNGalertConfig(*models.ReqContext) response.Response
RoutePostNGalertConfig(*models.ReqContext, apimodels.PostableNGalertConfig) response.Response
}
func (api *API) RegisterConfigurationApiEndpoints(srv ConfigurationApiService, m *metrics.Metrics) {
api.RouteRegister.Group("", func(group routing.RouteRegister) {
group.Delete(
toMacaronPath("/api/v1/ngalert/admin_config"),
metrics.Instrument(
http.MethodDelete,
"/api/v1/ngalert/admin_config",
srv.RouteDeleteNGalertConfig,
m,
),
)
group.Get(
toMacaronPath("/api/v1/ngalert/admin_config"),
metrics.Instrument(
http.MethodGet,
"/api/v1/ngalert/admin_config",
srv.RouteGetNGalertConfig,
m,
),
)
group.Post(
toMacaronPath("/api/v1/ngalert/admin_config"),
binding.Bind(apimodels.PostableNGalertConfig{}),
metrics.Instrument(
http.MethodPost,
"/api/v1/ngalert/admin_config",
srv.RoutePostNGalertConfig,
m,
),
)
}, middleware.ReqSignedIn)
}


@ -4,7 +4,6 @@
*
*Do not manually edit these files, please find ngalert/api/swagger-codegen/ for commands on how to generate them.
*/
package api
import (


@ -4,7 +4,6 @@
*
*Do not manually edit these files, please find ngalert/api/swagger-codegen/ for commands on how to generate them.
*/
package api
import (


@ -4,7 +4,6 @@
*
*Do not manually edit these files, please find ngalert/api/swagger-codegen/ for commands on how to generate them.
*/
package api
import (


@ -0,0 +1,51 @@
package definitions
// swagger:route GET /api/v1/ngalert/admin_config configuration RouteGetNGalertConfig
//
// Get the NGalert configuration of the user's organization, returns 404 if no configuration is present.
//
// Produces:
// - application/json
//
// Responses:
// 200: GettableNGalertConfig
// 404: Failure
// 500: Failure
// swagger:route POST /api/v1/ngalert/admin_config configuration RoutePostNGalertConfig
//
// Creates or updates the NGalert configuration of the user's organization.
//
// Consumes:
// - application/json
//
// Responses:
// 201: Ack
// 400: ValidationError
// swagger:route DELETE /api/v1/ngalert/admin_config configuration RouteDeleteNGalertConfig
//
// Deletes the NGalert configuration of the user's organization.
//
// Consumes:
// - application/json
//
// Responses:
// 200: Ack
// 500: Failure
// swagger:parameters RoutePostNGalertConfig
type NGalertConfig struct {
// in:body
Body PostableNGalertConfig
}
// swagger:model
type PostableNGalertConfig struct {
Alertmanagers []string `json:"alertmanagers"`
}
// swagger:model
type GettableNGalertConfig struct {
Alertmanagers []string `json:"alertmanagers"`
}
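
As a usage sketch of the new endpoint: only the path, the HTTP methods,
the payload shape and the status codes come from the definitions above;
the base URL and the basic-auth credentials below are assumptions for
illustration.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumed local Grafana instance and org-admin credentials.
	payload, err := json.Marshal(map[string][]string{
		"alertmanagers": {"http://alertmanager:9093"},
	})
	if err != nil {
		panic(err)
	}
	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:3000/api/v1/ngalert/admin_config", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.SetBasicAuth("admin", "admin")
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// The annotations above document 201 on success and 400 on validation errors.
	fmt.Println(resp.Status)
}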


@ -57,7 +57,9 @@
"type": "object",
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"AlertGroup": {},
"AlertGroup": {
"$ref": "#/definitions/alertGroup"
},
"AlertGroups": {
"$ref": "#/definitions/alertGroups"
},
@ -751,6 +753,19 @@
"type": "object",
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"GettableNGalertConfig": {
"properties": {
"alertmanagers": {
"items": {
"type": "string"
},
"type": "array",
"x-go-name": "Alertmanagers"
}
},
"type": "object",
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"GettableRuleGroupConfig": {
"properties": {
"interval": {
@ -771,12 +786,8 @@
"type": "object",
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"GettableSilence": {
"$ref": "#/definitions/gettableSilence"
},
"GettableSilences": {
"$ref": "#/definitions/gettableSilences"
},
"GettableSilence": {},
"GettableSilences": {},
"GettableStatus": {
"properties": {
"cluster": {
@ -1545,6 +1556,19 @@
"type": "object",
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"PostableNGalertConfig": {
"properties": {
"alertmanagers": {
"items": {
"type": "string"
},
"type": "array",
"x-go-name": "Alertmanagers"
}
},
"type": "object",
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"PostableRuleGroupConfig": {
"properties": {
"interval": {
@ -2425,7 +2449,7 @@
"alerts": {
"description": "alerts",
"items": {
"$ref": "#/definitions/GettableAlert"
"$ref": "#/definitions/gettableAlert"
},
"type": "array",
"x-go-name": "Alerts"
@ -2434,7 +2458,7 @@
"$ref": "#/definitions/labelSet"
},
"receiver": {
"$ref": "#/definitions/receiver"
"$ref": "#/definitions/Receiver"
}
},
"required": [
@ -2601,7 +2625,7 @@
"receivers": {
"description": "receivers",
"items": {
"$ref": "#/definitions/receiver"
"$ref": "#/definitions/Receiver"
},
"type": "array",
"x-go-name": "Receivers"
@ -2639,7 +2663,7 @@
"gettableAlerts": {
"description": "GettableAlerts gettable alerts",
"items": {
"$ref": "#/definitions/GettableAlert"
"$ref": "#/definitions/gettableAlert"
},
"type": "array",
"x-go-name": "GettableAlerts",
@ -2705,7 +2729,7 @@
"gettableSilences": {
"description": "GettableSilences gettable silences",
"items": {
"$ref": "#/definitions/gettableSilence"
"$ref": "#/definitions/GettableSilence"
},
"type": "array",
"x-go-name": "GettableSilences",
@ -3775,6 +3799,95 @@
]
}
},
"/api/v1/ngalert/admin_config": {
"delete": {
"consumes": [
"application/json"
],
"operationId": "RouteDeleteNGalertConfig",
"responses": {
"200": {
"description": "Ack",
"schema": {
"$ref": "#/definitions/Ack"
}
},
"500": {
"description": "Failure",
"schema": {
"$ref": "#/definitions/Failure"
}
}
},
"summary": "Deletes the NGalert configuration of the user's organization.",
"tags": [
"configuration"
]
},
"get": {
"operationId": "RouteGetNGalertConfig",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "GettableNGalertConfig",
"schema": {
"$ref": "#/definitions/GettableNGalertConfig"
}
},
"404": {
"description": "Failure",
"schema": {
"$ref": "#/definitions/Failure"
}
},
"500": {
"description": "Failure",
"schema": {
"$ref": "#/definitions/Failure"
}
}
},
"summary": "Get the NGalert configuration of the user's organization, returns 404 if no configuration is present.",
"tags": [
"configuration"
]
},
"post": {
"consumes": [
"application/json"
],
"operationId": "RoutePostNGalertConfig",
"parameters": [
{
"in": "body",
"name": "Body",
"schema": {
"$ref": "#/definitions/PostableNGalertConfig"
}
}
],
"responses": {
"201": {
"description": "Ack",
"schema": {
"$ref": "#/definitions/Ack"
}
},
"400": {
"description": "ValidationError",
"schema": {
"$ref": "#/definitions/ValidationError"
}
}
},
"summary": "Creates or updates the NGalert configuration of the user's organization.",
"tags": [
"configuration"
]
}
},
"/api/v1/receiver/test/{Recipient}": {
"post": {
"consumes": [


@ -781,6 +781,95 @@
}
}
},
"/api/v1/ngalert/admin_config": {
"get": {
"produces": [
"application/json"
],
"tags": [
"configuration"
],
"summary": "Get the NGalert configuration of the user's organization, returns 404 if no configuration is present.",
"operationId": "RouteGetNGalertConfig",
"responses": {
"200": {
"description": "GettableNGalertConfig",
"schema": {
"$ref": "#/definitions/GettableNGalertConfig"
}
},
"404": {
"description": "Failure",
"schema": {
"$ref": "#/definitions/Failure"
}
},
"500": {
"description": "Failure",
"schema": {
"$ref": "#/definitions/Failure"
}
}
}
},
"post": {
"consumes": [
"application/json"
],
"tags": [
"configuration"
],
"summary": "Creates or updates the NGalert configuration of the user's organization.",
"operationId": "RoutePostNGalertConfig",
"parameters": [
{
"name": "Body",
"in": "body",
"schema": {
"$ref": "#/definitions/PostableNGalertConfig"
}
}
],
"responses": {
"201": {
"description": "Ack",
"schema": {
"$ref": "#/definitions/Ack"
}
},
"400": {
"description": "ValidationError",
"schema": {
"$ref": "#/definitions/ValidationError"
}
}
}
},
"delete": {
"consumes": [
"application/json"
],
"tags": [
"configuration"
],
"summary": "Deletes the NGalert configuration of the user's organization.",
"operationId": "RouteDeleteNGalertConfig",
"responses": {
"200": {
"description": "Ack",
"schema": {
"$ref": "#/definitions/Ack"
}
},
"500": {
"description": "Failure",
"schema": {
"$ref": "#/definitions/Failure"
}
}
}
}
},
"/api/v1/receiver/test/{Recipient}": {
"post": {
"description": "Test receiver",
@ -927,7 +1016,7 @@
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"AlertGroup": {
"$ref": "#/definitions/AlertGroup"
"$ref": "#/definitions/alertGroup"
},
"AlertGroups": {
"$ref": "#/definitions/alertGroups"
@ -1625,6 +1714,19 @@
},
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"GettableNGalertConfig": {
"type": "object",
"properties": {
"alertmanagers": {
"type": "array",
"items": {
"type": "string"
},
"x-go-name": "Alertmanagers"
}
},
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"GettableRuleGroupConfig": {
"type": "object",
"properties": {
@ -1646,10 +1748,10 @@
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"GettableSilence": {
"$ref": "#/definitions/gettableSilence"
"$ref": "#/definitions/GettableSilence"
},
"GettableSilences": {
"$ref": "#/definitions/gettableSilences"
"$ref": "#/definitions/GettableSilences"
},
"GettableStatus": {
"type": "object",
@ -2420,6 +2522,19 @@
},
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"PostableNGalertConfig": {
"type": "object",
"properties": {
"alertmanagers": {
"type": "array",
"items": {
"type": "string"
},
"x-go-name": "Alertmanagers"
}
},
"x-go-package": "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
},
"PostableRuleGroupConfig": {
"type": "object",
"properties": {
@ -3310,7 +3425,7 @@
"description": "alerts",
"type": "array",
"items": {
"$ref": "#/definitions/GettableAlert"
"$ref": "#/definitions/gettableAlert"
},
"x-go-name": "Alerts"
},
@ -3318,7 +3433,7 @@
"$ref": "#/definitions/labelSet"
},
"receiver": {
"$ref": "#/definitions/receiver"
"$ref": "#/definitions/Receiver"
}
},
"x-go-name": "AlertGroup",
@ -3491,7 +3606,7 @@
"description": "receivers",
"type": "array",
"items": {
"$ref": "#/definitions/receiver"
"$ref": "#/definitions/Receiver"
},
"x-go-name": "Receivers"
},
@ -3518,7 +3633,7 @@
"description": "GettableAlerts gettable alerts",
"type": "array",
"items": {
"$ref": "#/definitions/GettableAlert"
"$ref": "#/definitions/gettableAlert"
},
"x-go-name": "GettableAlerts",
"x-go-package": "github.com/prometheus/alertmanager/api/v2/models"
@ -3584,7 +3699,7 @@
"description": "GettableSilences gettable silences",
"type": "array",
"items": {
"$ref": "#/definitions/gettableSilence"
"$ref": "#/definitions/GettableSilence"
},
"x-go-name": "GettableSilences",
"x-go-package": "github.com/prometheus/alertmanager/api/v2/models"


@ -257,3 +257,8 @@ func ErrResp(status int, err error, msg string, args ...interface{}) *response.N
}
return response.Error(status, err.Error(), nil)
}
// accessForbiddenResp creates a response of forbidden access.
func accessForbiddenResp() response.Response {
return ErrResp(http.StatusForbidden, errors.New("Permission denied"), "")
}


@ -0,0 +1,24 @@
package models
import (
"crypto/sha256"
"fmt"
)
// AdminConfiguration represents the ngalert administration configuration settings.
type AdminConfiguration struct {
ID int64 `xorm:"pk autoincr 'id'"`
OrgID int64 `xorm:"org_id"`
// List of Alertmanager(s) URL to push alerts to.
Alertmanagers []string
CreatedAt int64 `xorm:"created"`
UpdatedAt int64 `xorm:"updated"`
}
func (ac *AdminConfiguration) AsSHA256() string {
h := sha256.New()
_, _ = h.Write([]byte(fmt.Sprintf("%v", ac.Alertmanagers)))
return fmt.Sprintf("%x", h.Sum(nil))
}


@ -35,24 +35,27 @@ const (
// with intervals that are not exactly divided by this number
// not to be evaluated
baseIntervalSeconds = 10
// default alert definiiton interval
// default alert definition interval
defaultIntervalSeconds int64 = 6 * baseIntervalSeconds
)
// AlertNG is the service for evaluating the condition of an alert definition.
type AlertNG struct {
Cfg *setting.Cfg `inject:""`
Cfg *setting.Cfg `inject:""`
Log log.Logger
Metrics *metrics.Metrics `inject:""`
DatasourceCache datasources.CacheService `inject:""`
RouteRegister routing.RouteRegister `inject:""`
SQLStore *sqlstore.SQLStore `inject:""`
DataService *tsdb.Service `inject:""`
DataProxy *datasourceproxy.DatasourceProxyService `inject:""`
QuotaService *quota.QuotaService `inject:""`
Metrics *metrics.Metrics `inject:""`
Log log.Logger
schedule schedule.ScheduleService
stateManager *state.Manager
Alertmanager *notifier.Alertmanager
// Alerting notification services
Alertmanager *notifier.Alertmanager
}
func init() {
@ -78,32 +81,34 @@ func (ng *AlertNG) Init() error {
}
schedCfg := schedule.SchedulerCfg{
C: clock.New(),
BaseInterval: baseInterval,
Logger: ng.Log,
MaxAttempts: maxAttempts,
Evaluator: eval.Evaluator{Cfg: ng.Cfg, Log: ng.Log},
InstanceStore: store,
RuleStore: store,
Notifier: ng.Alertmanager,
Metrics: ng.Metrics,
C: clock.New(),
BaseInterval: baseInterval,
Logger: log.New("ngalert.scheduler"),
MaxAttempts: maxAttempts,
Evaluator: eval.Evaluator{Cfg: ng.Cfg, Log: ng.Log},
InstanceStore: store,
RuleStore: store,
AdminConfigStore: store,
Notifier: ng.Alertmanager,
Metrics: ng.Metrics,
}
ng.stateManager = state.NewManager(ng.Log, ng.Metrics, store, store)
ng.schedule = schedule.NewScheduler(schedCfg, ng.DataService, ng.Cfg.AppURL, ng.stateManager)
api := api.API{
Cfg: ng.Cfg,
DatasourceCache: ng.DatasourceCache,
RouteRegister: ng.RouteRegister,
DataService: ng.DataService,
Schedule: ng.schedule,
DataProxy: ng.DataProxy,
QuotaService: ng.QuotaService,
InstanceStore: store,
RuleStore: store,
AlertingStore: store,
Alertmanager: ng.Alertmanager,
StateManager: ng.stateManager,
Cfg: ng.Cfg,
DatasourceCache: ng.DatasourceCache,
RouteRegister: ng.RouteRegister,
DataService: ng.DataService,
Schedule: ng.schedule,
DataProxy: ng.DataProxy,
QuotaService: ng.QuotaService,
InstanceStore: store,
RuleStore: store,
AlertingStore: store,
AdminConfigStore: store,
Alertmanager: ng.Alertmanager,
StateManager: ng.stateManager,
}
api.RegisterAPIEndpoints(ng.Metrics)


@ -511,6 +511,7 @@ func (am *Alertmanager) PutAlerts(postableAlerts apimodels.PostableAlerts) error
},
UpdatedAt: now,
}
for k, v := range a.Labels {
if len(v) == 0 || k == ngmodels.NamespaceUIDLabel { // Skip empty and namespace UID labels.
continue


@ -3,26 +3,31 @@ package schedule
import (
"context"
"fmt"
"net/url"
"sync"
"time"
"github.com/benbjohnson/clock"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/benbjohnson/clock"
"golang.org/x/sync/errgroup"
)
// timeNow makes it possible to test usage of time
var timeNow = time.Now
// AdminConfigPollingInterval is how often we sync the admin configuration.
var AdminConfigPollingInterval = 1 * time.Minute
// ScheduleService handles scheduling
type ScheduleService interface {
Run(context.Context) error
@ -41,8 +46,6 @@ type Notifier interface {
}
type schedule struct {
wg sync.WaitGroup
// base tick rate (fastest possible configured check)
baseInterval time.Duration
@ -69,9 +72,10 @@ type schedule struct {
evaluator eval.Evaluator
ruleStore store.RuleStore
instanceStore store.InstanceStore
dataService *tsdb.Service
ruleStore store.RuleStore
instanceStore store.InstanceStore
adminConfigStore store.AdminConfigurationStore
dataService *tsdb.Service
stateManager *state.Manager
@ -79,43 +83,52 @@ type schedule struct {
notifier Notifier
metrics *metrics.Metrics
// Senders help us send alerts to external Alertmanagers.
sendersMtx sync.RWMutex
sendersCfgHash map[int64]string
senders map[int64]*sender.Sender
}
// SchedulerCfg is the scheduler configuration.
type SchedulerCfg struct {
C clock.Clock
BaseInterval time.Duration
Logger log.Logger
EvalAppliedFunc func(models.AlertRuleKey, time.Time)
MaxAttempts int64
StopAppliedFunc func(models.AlertRuleKey)
Evaluator eval.Evaluator
RuleStore store.RuleStore
InstanceStore store.InstanceStore
Notifier Notifier
Metrics *metrics.Metrics
C clock.Clock
BaseInterval time.Duration
Logger log.Logger
EvalAppliedFunc func(models.AlertRuleKey, time.Time)
MaxAttempts int64
StopAppliedFunc func(models.AlertRuleKey)
Evaluator eval.Evaluator
RuleStore store.RuleStore
InstanceStore store.InstanceStore
AdminConfigStore store.AdminConfigurationStore
Notifier Notifier
Metrics *metrics.Metrics
}
// NewScheduler returns a new schedule.
func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service, appURL string, stateManager *state.Manager) *schedule {
ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch := schedule{
registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)},
maxAttempts: cfg.MaxAttempts,
clock: cfg.C,
baseInterval: cfg.BaseInterval,
log: cfg.Logger,
heartbeat: ticker,
evalAppliedFunc: cfg.EvalAppliedFunc,
stopAppliedFunc: cfg.StopAppliedFunc,
evaluator: cfg.Evaluator,
ruleStore: cfg.RuleStore,
instanceStore: cfg.InstanceStore,
dataService: dataService,
notifier: cfg.Notifier,
metrics: cfg.Metrics,
appURL: appURL,
stateManager: stateManager,
registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)},
maxAttempts: cfg.MaxAttempts,
clock: cfg.C,
baseInterval: cfg.BaseInterval,
log: cfg.Logger,
heartbeat: ticker,
evalAppliedFunc: cfg.EvalAppliedFunc,
stopAppliedFunc: cfg.StopAppliedFunc,
evaluator: cfg.Evaluator,
ruleStore: cfg.RuleStore,
instanceStore: cfg.InstanceStore,
dataService: dataService,
adminConfigStore: cfg.AdminConfigStore,
notifier: cfg.Notifier,
metrics: cfg.Metrics,
appURL: appURL,
stateManager: stateManager,
senders: map[int64]*sender.Sender{},
sendersCfgHash: map[int64]string{},
}
return &sch
}
@ -139,21 +152,163 @@ func (sch *schedule) Unpause() error {
}
func (sch *schedule) Run(ctx context.Context) error {
sch.wg.Add(1)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
if err := sch.ruleEvaluationLoop(ctx); err != nil {
sch.log.Error("failure while running the rule evaluation loop", "err", err)
}
}()
sch.wg.Wait()
go func() {
defer wg.Done()
if err := sch.adminConfigSync(ctx); err != nil {
sch.log.Error("failure while running the admin configuration sync", "err", err)
}
}()
wg.Wait()
return nil
}
func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
defer sch.wg.Done()
// SyncAndApplyConfigFromDatabase looks for the admin configuration in the database and adjusts the sender(s) accordingly.
func (sch *schedule) SyncAndApplyConfigFromDatabase() error {
sch.log.Debug("start of admin configuration sync")
cfgs, err := sch.adminConfigStore.GetAdminConfigurations()
if err != nil {
return err
}
sch.log.Debug("found admin configurations", "count", len(cfgs))
orgsFound := make(map[int64]struct{}, len(cfgs))
sch.sendersMtx.Lock()
for _, cfg := range cfgs {
orgsFound[cfg.OrgID] = struct{}{} // keep track of which senders we need to keep.
existing, ok := sch.senders[cfg.OrgID]
// If the tenant has no Alertmanager(s) configured and no running sender no-op.
if !ok && len(cfg.Alertmanagers) == 0 {
sch.log.Debug("no external alertmanagers configured", "org", cfg.OrgID)
continue
}
// We have a running sender but no Alertmanager(s) configured, shut it down.
if ok && len(cfg.Alertmanagers) == 0 {
sch.log.Debug("no external alertmanager(s) configured, sender will be stopped", "org", cfg.OrgID)
delete(orgsFound, cfg.OrgID)
continue
}
// We have a running sender, check if we need to apply a new config.
if ok {
if sch.sendersCfgHash[cfg.OrgID] == cfg.AsSHA256() {
sch.log.Debug("sender configuration is the same as the one running, no-op", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
continue
}
sch.log.Debug("applying new configuration to sender", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
err := existing.ApplyConfig(cfg)
if err != nil {
sch.log.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
continue
}
sch.sendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
continue
}
// No sender and have Alertmanager(s) to send to - start a new one.
sch.log.Info("creating new sender for the external alertmanagers", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
s, err := sender.New(sch.metrics)
if err != nil {
sch.log.Error("unable to start the sender", "err", err, "org", cfg.OrgID)
continue
}
sch.senders[cfg.OrgID] = s
s.Run()
err = s.ApplyConfig(cfg)
if err != nil {
sch.log.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
continue
}
sch.sendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
}
sendersToStop := map[int64]*sender.Sender{}
for orgID, s := range sch.senders {
if _, exists := orgsFound[orgID]; !exists {
sendersToStop[orgID] = s
delete(sch.senders, orgID)
delete(sch.sendersCfgHash, orgID)
}
}
sch.sendersMtx.Unlock()
// We can now stop these senders w/o having to hold a lock.
for orgID, s := range sendersToStop {
sch.log.Info("stopping sender", "org", orgID)
s.Stop()
sch.log.Info("stopped sender", "org", orgID)
}
sch.log.Debug("finish of admin configuration sync")
return nil
}
// AlertmanagersFor returns all the discovered Alertmanager(s) for a particular organization.
func (sch *schedule) AlertmanagersFor(orgID int64) []*url.URL {
sch.sendersMtx.RLock()
defer sch.sendersMtx.RUnlock()
s, ok := sch.senders[orgID]
if !ok {
return []*url.URL{}
}
return s.Alertmanagers()
}
// DroppedAlertmanagersFor returns all the dropped Alertmanager(s) for a particular organization.
func (sch *schedule) DroppedAlertmanagersFor(orgID int64) []*url.URL {
sch.sendersMtx.RLock()
defer sch.sendersMtx.RUnlock()
s, ok := sch.senders[orgID]
if !ok {
return []*url.URL{}
}
return s.DroppedAlertmanagers()
}
func (sch *schedule) adminConfigSync(ctx context.Context) error {
for {
select {
case <-time.After(AdminConfigPollingInterval):
if err := sch.SyncAndApplyConfigFromDatabase(); err != nil {
sch.log.Error("unable to sync admin configuration", "err", err)
}
case <-ctx.Done():
// Stop sending alerts to all external Alertmanager(s).
sch.sendersMtx.Lock()
for orgID, s := range sch.senders {
delete(sch.senders, orgID) // delete before we stop to make sure we don't accept any more alerts.
s.Stop()
}
sch.sendersMtx.Unlock()
return nil
}
}
}
func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
dispatcherGroup, ctx := errgroup.WithContext(ctx)
for {
select {
@ -172,6 +327,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
key models.AlertRuleKey
ruleInfo alertRuleInfo
}
readyToRun := make([]readyToRunItem, 0)
for _, item := range alertRules {
key := item.GetKey()
@ -297,10 +453,19 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
sch.saveAlertStates(processedStates)
alerts := FromAlertStateToPostableAlerts(sch.log, processedStates, sch.stateManager, sch.appURL)
sch.log.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
err = sch.sendAlerts(alerts)
err = sch.notifier.PutAlerts(alerts)
if err != nil {
sch.log.Error("failed to put alerts in the notifier", "count", len(alerts.PostableAlerts), "err", err)
}
// Send alerts to external Alertmanager(s) if we have a sender for this organization.
sch.sendersMtx.RLock()
defer sch.sendersMtx.RUnlock()
s, ok := sch.senders[alertRule.OrgID]
if ok {
s.SendAlerts(alerts)
}
return nil
}
@ -329,10 +494,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
}
}
func (sch *schedule) sendAlerts(alerts apimodels.PostableAlerts) error {
return sch.notifier.PutAlerts(alerts)
}
func (sch *schedule) saveAlertStates(states []*state.State) {
sch.log.Debug("saving alert states", "count", len(states))
for _, s := range states {


@ -8,26 +8,22 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/tests"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/tests"
"github.com/benbjohnson/clock"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var nilMetrics = metrics.NewMetrics(nil)


@ -0,0 +1,310 @@
package schedule
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/setting"
"github.com/benbjohnson/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestSendingToExternalAlertmanager(t *testing.T) {
t.Cleanup(registry.ClearOverrides)
fakeAM := newFakeExternalAlertmanager(t)
defer fakeAM.Close()
fakeRuleStore := newFakeRuleStore(t)
fakeInstanceStore := &fakeInstanceStore{}
fakeAdminConfigStore := newFakeAdminConfigStore(t)
// create alert rule with one second interval
alertRule := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
// First, let's create an admin configuration that holds an alertmanager.
adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore)
// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
// when the first alert triggers.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.sendersMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.sendersMtx.Unlock()
// Then, ensure we've discovered the Alertmanager.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
}, 10*time.Second, 200*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
})
go func() {
AdminConfigPollingInterval = 10 * time.Minute // Do not poll in unit tests.
err := sched.Run(ctx)
require.NoError(t, err)
}()
// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
mockedClock.Add(2 * time.Second)
// Eventually, our Alertmanager should have received at least one alert.
require.Eventually(t, func() bool {
return fakeAM.AlertsCount() >= 1 && fakeAM.AlertNamesCompare([]string{alertRule.Title})
}, 10*time.Second, 200*time.Millisecond)
// Now, let's remove the Alertmanager from the admin configuration.
adminConfig.Alertmanagers = []string{}
cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Again, make sure we sync and verify the senders.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.sendersMtx.Lock()
require.Equal(t, 0, len(sched.senders))
require.Equal(t, 0, len(sched.sendersCfgHash))
sched.sendersMtx.Unlock()
// Then, ensure we've dropped the Alertmanager.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
}, 10*time.Second, 200*time.Millisecond)
}
func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
t.Cleanup(registry.ClearOverrides)
fakeAM := newFakeExternalAlertmanager(t)
defer fakeAM.Close()
fakeRuleStore := newFakeRuleStore(t)
fakeInstanceStore := &fakeInstanceStore{}
fakeAdminConfigStore := newFakeAdminConfigStore(t)
// Create two alert rules with one second interval.
alertRuleOrgOne := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
alertRuleOrgTwo := CreateTestAlertRule(t, fakeRuleStore, 1, 2)
// First, let's create an admin configuration that holds an alertmanager.
adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore)
// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
// when the first alert triggers.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.sendersMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.sendersMtx.Unlock()
// Then, ensure we've discovered the Alertmanager.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
}, 10*time.Second, 200*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
})
go func() {
AdminConfigPollingInterval = 10 * time.Minute // Do not poll in unit tests.
err := sched.Run(ctx)
require.NoError(t, err)
}()
// 1. Now, let's assume a new org comes along.
adminConfig2 := &models.AdminConfiguration{OrgID: 2, Alertmanagers: []string{fakeAM.server.URL}}
cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// If we sync again, new senders must have spawned.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.sendersMtx.Lock()
require.Equal(t, 2, len(sched.senders))
require.Equal(t, 2, len(sched.sendersCfgHash))
sched.sendersMtx.Unlock()
// Then, ensure we've discovered the Alertmanager for the new organization.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(2)) == 1 && len(sched.DroppedAlertmanagersFor(2)) == 0
}, 10*time.Second, 200*time.Millisecond)
// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
mockedClock.Add(2 * time.Second)
// Eventually, our Alertmanager should have received at least two alerts.
require.Eventually(t, func() bool {
return fakeAM.AlertsCount() == 2 && fakeAM.AlertNamesCompare([]string{alertRuleOrgOne.Title, alertRuleOrgTwo.Title})
}, 20*time.Second, 200*time.Millisecond)
// 2. Next, let's modify the configuration of an organization by adding an extra alertmanager.
fakeAM2 := newFakeExternalAlertmanager(t)
adminConfig2 = &models.AdminConfiguration{OrgID: 2, Alertmanagers: []string{fakeAM.server.URL, fakeAM2.server.URL}}
cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Before we sync, let's grab the existing hash of this particular org.
sched.sendersMtx.Lock()
currentHash := sched.sendersCfgHash[2]
sched.sendersMtx.Unlock()
// Now, sync again.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
// The hash for org two should not be the same and we should still have two senders.
sched.sendersMtx.Lock()
require.NotEqual(t, sched.sendersCfgHash[2], currentHash)
require.Equal(t, 2, len(sched.senders))
require.Equal(t, 2, len(sched.sendersCfgHash))
sched.sendersMtx.Unlock()
// Wait for the discovery of the new Alertmanager for orgID = 2.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(2)) == 2 && len(sched.DroppedAlertmanagersFor(2)) == 0
}, 10*time.Second, 200*time.Millisecond)
// 3. Now, let's provide a configuration that fails for OrgID = 1.
adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"123://invalid.org"}}
cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Before we sync, let's get the current config hash.
sched.sendersMtx.Lock()
currentHash = sched.sendersCfgHash[1]
sched.sendersMtx.Unlock()
// Now, sync again.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
// The old configuration should still be running.
sched.sendersMtx.Lock()
require.Equal(t, sched.sendersCfgHash[1], currentHash)
sched.sendersMtx.Unlock()
require.Equal(t, 1, len(sched.AlertmanagersFor(1)))
// If we fix it - it should be applied.
adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"notarealalertmanager:3030"}}
cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.sendersMtx.Lock()
require.NotEqual(t, sched.sendersCfgHash[1], currentHash)
sched.sendersMtx.Unlock()
// Finally, remove everything.
require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(1))
require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(2))
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.sendersMtx.Lock()
require.Equal(t, 0, len(sched.senders))
require.Equal(t, 0, len(sched.sendersCfgHash))
sched.sendersMtx.Unlock()
require.Eventually(t, func() bool {
NoAlertmanagerOrgOne := len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
NoAlertmanagerOrgTwo := len(sched.AlertmanagersFor(2)) == 0 && len(sched.DroppedAlertmanagersFor(2)) == 0
return NoAlertmanagerOrgOne && NoAlertmanagerOrgTwo
}, 10*time.Second, 200*time.Millisecond)
}
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore) (*schedule, *clock.Mock) {
t.Helper()
mockedClock := clock.NewMock()
logger := log.New("ngalert schedule test")
nilMetrics := metrics.NewMetrics(nil)
schedCfg := SchedulerCfg{
C: mockedClock,
BaseInterval: time.Second,
MaxAttempts: 1,
Evaluator: eval.Evaluator{Cfg: &setting.Cfg{ExpressionsEnabled: true}, Log: logger},
RuleStore: rs,
InstanceStore: is,
AdminConfigStore: acs,
Notifier: &fakeNotifier{},
Logger: logger,
Metrics: metrics.NewMetrics(prometheus.NewRegistry()),
}
st := state.NewManager(schedCfg.Logger, nilMetrics, rs, is)
return NewScheduler(schedCfg, nil, "http://localhost", st), mockedClock
}
// CreateTestAlertRule creates a dummy alert definition to be used by the tests.
func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds int64, orgID int64) *models.AlertRule {
t.Helper()
d := rand.Intn(1000)
ruleGroup := fmt.Sprintf("ruleGroup-%d", d)
err := dbstore.UpdateRuleGroup(store.UpdateRuleGroupCmd{
OrgID: orgID,
NamespaceUID: "namespace",
RuleGroupConfig: apimodels.PostableRuleGroupConfig{
Name: ruleGroup,
Interval: model.Duration(time.Duration(intervalSeconds) * time.Second),
Rules: []apimodels.PostableExtendedRuleNode{
{
ApiRuleNode: &apimodels.ApiRuleNode{
Annotations: map[string]string{"testAnnoKey": "testAnnoValue"},
},
GrafanaManagedAlert: &apimodels.PostableGrafanaRule{
Title: fmt.Sprintf("an alert definition %d", d),
Condition: "A",
Data: []models.AlertQuery{
{
DatasourceUID: "-100",
Model: json.RawMessage(`{
"datasourceUid": "-100",
"type":"math",
"expression":"2 + 2 > 1"
}`),
RelativeTimeRange: models.RelativeTimeRange{
From: models.Duration(5 * time.Hour),
To: models.Duration(3 * time.Hour),
},
RefID: "A",
},
},
},
},
},
},
})
require.NoError(t, err)
q := models.ListRuleGroupAlertRulesQuery{
OrgID: orgID,
NamespaceUID: "namespace",
RuleGroup: ruleGroup,
}
err = dbstore.GetRuleGroupAlertRules(&q)
require.NoError(t, err)
require.NotEmpty(t, q.Result)
rule := q.Result[0]
t.Logf("alert definition: %v with interval: %d created", rule.GetKey(), rule.IntervalSeconds)
return rule
}


@ -0,0 +1,307 @@
package schedule
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
models2 "github.com/grafana/grafana/pkg/models"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/util"
amv2 "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newFakeRuleStore(t *testing.T) *fakeRuleStore {
return &fakeRuleStore{t: t, rules: map[int64]map[string]map[string][]*models.AlertRule{}}
}
// FakeRuleStore mocks the RuleStore of the scheduler.
type fakeRuleStore struct {
t *testing.T
mtx sync.Mutex
rules map[int64]map[string]map[string][]*models.AlertRule
}
func (f *fakeRuleStore) DeleteAlertRuleByUID(_ int64, _ string) error { return nil }
func (f *fakeRuleStore) DeleteNamespaceAlertRules(_ int64, _ string) ([]string, error) {
return []string{}, nil
}
func (f *fakeRuleStore) DeleteRuleGroupAlertRules(_ int64, _ string, _ string) ([]string, error) {
return []string{}, nil
}
func (f *fakeRuleStore) DeleteAlertInstancesByRuleUID(_ int64, _ string) error { return nil }
func (f *fakeRuleStore) GetAlertRuleByUID(q *models.GetAlertRuleByUIDQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
rgs, ok := f.rules[q.OrgID]
if !ok {
return nil
}
for _, rg := range rgs {
for _, rules := range rg {
for _, r := range rules {
if r.UID == q.UID {
q.Result = r
break
}
}
}
}
return nil
}
// For now, we're not implementing namespace filtering.
func (f *fakeRuleStore) GetAlertRulesForScheduling(q *models.ListAlertRulesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
for _, rg := range f.rules {
for _, n := range rg {
for _, r := range n {
q.Result = append(q.Result, r...)
}
}
}
return nil
}
func (f *fakeRuleStore) GetOrgAlertRules(_ *models.ListAlertRulesQuery) error { return nil }
func (f *fakeRuleStore) GetNamespaceAlertRules(_ *models.ListNamespaceAlertRulesQuery) error {
return nil
}
func (f *fakeRuleStore) GetRuleGroupAlertRules(q *models.ListRuleGroupAlertRulesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
rgs, ok := f.rules[q.OrgID]
if !ok {
return nil
}
rg, ok := rgs[q.RuleGroup]
if !ok {
return nil
}
if q.NamespaceUID != "" {
r, ok := rg[q.NamespaceUID]
if !ok {
return nil
}
q.Result = r
return nil
}
for _, r := range rg {
q.Result = append(q.Result, r...)
}
return nil
}
func (f *fakeRuleStore) GetNamespaces(_ int64, _ *models2.SignedInUser) (map[string]*models2.Folder, error) {
return nil, nil
}
func (f *fakeRuleStore) GetNamespaceByTitle(_ string, _ int64, _ *models2.SignedInUser, _ bool) (*models2.Folder, error) {
return nil, nil
}
func (f *fakeRuleStore) GetOrgRuleGroups(_ *models.ListOrgRuleGroupsQuery) error { return nil }
func (f *fakeRuleStore) UpsertAlertRules(_ []store.UpsertRule) error { return nil }
func (f *fakeRuleStore) UpdateRuleGroup(cmd store.UpdateRuleGroupCmd) error {
f.mtx.Lock()
defer f.mtx.Unlock()
rgs, ok := f.rules[cmd.OrgID]
if !ok {
f.rules[cmd.OrgID] = map[string]map[string][]*models.AlertRule{}
}
rg, ok := rgs[cmd.RuleGroupConfig.Name]
if !ok {
f.rules[cmd.OrgID][cmd.RuleGroupConfig.Name] = map[string][]*models.AlertRule{}
}
_, ok = rg[cmd.NamespaceUID]
if !ok {
f.rules[cmd.OrgID][cmd.RuleGroupConfig.Name][cmd.NamespaceUID] = []*models.AlertRule{}
}
rules := []*models.AlertRule{}
for _, r := range cmd.RuleGroupConfig.Rules {
//TODO: Not sure why this is not being set properly, where is the code that sets this?
for i := range r.GrafanaManagedAlert.Data {
r.GrafanaManagedAlert.Data[i].DatasourceUID = "-100"
}
new := &models.AlertRule{
OrgID: cmd.OrgID,
Title: r.GrafanaManagedAlert.Title,
Condition: r.GrafanaManagedAlert.Condition,
Data: r.GrafanaManagedAlert.Data,
UID: util.GenerateShortUID(),
IntervalSeconds: int64(time.Duration(cmd.RuleGroupConfig.Interval).Seconds()),
NamespaceUID: cmd.NamespaceUID,
RuleGroup: cmd.RuleGroupConfig.Name,
NoDataState: models.NoDataState(r.GrafanaManagedAlert.NoDataState),
ExecErrState: models.ExecutionErrorState(r.GrafanaManagedAlert.ExecErrState),
Version: 1,
}
if r.ApiRuleNode != nil {
new.For = time.Duration(r.ApiRuleNode.For)
new.Annotations = r.ApiRuleNode.Annotations
new.Labels = r.ApiRuleNode.Labels
}
if new.NoDataState == "" {
new.NoDataState = models.NoData
}
if new.ExecErrState == "" {
new.ExecErrState = models.AlertingErrState
}
err := new.PreSave(time.Now)
require.NoError(f.t, err)
rules = append(rules, new)
}
f.rules[cmd.OrgID][cmd.RuleGroupConfig.Name][cmd.NamespaceUID] = rules
return nil
}
type fakeInstanceStore struct{}
func (f *fakeInstanceStore) GetAlertInstance(_ *models.GetAlertInstanceQuery) error { return nil }
func (f *fakeInstanceStore) ListAlertInstances(_ *models.ListAlertInstancesQuery) error { return nil }
func (f *fakeInstanceStore) SaveAlertInstance(_ *models.SaveAlertInstanceCommand) error { return nil }
func (f *fakeInstanceStore) FetchOrgIds() ([]int64, error) { return []int64{}, nil }
func (f *fakeInstanceStore) DeleteAlertInstance(_ int64, _, _ string) error { return nil }
func newFakeAdminConfigStore(t *testing.T) *fakeAdminConfigStore {
t.Helper()
return &fakeAdminConfigStore{configs: map[int64]*models.AdminConfiguration{}}
}
type fakeAdminConfigStore struct {
mtx sync.Mutex
configs map[int64]*models.AdminConfiguration
}
func (f *fakeAdminConfigStore) GetAdminConfiguration(orgID int64) (*models.AdminConfiguration, error) {
f.mtx.Lock()
defer f.mtx.Unlock()
return f.configs[orgID], nil
}
func (f *fakeAdminConfigStore) GetAdminConfigurations() ([]*models.AdminConfiguration, error) {
f.mtx.Lock()
defer f.mtx.Unlock()
acs := make([]*models.AdminConfiguration, 0, len(f.configs))
for _, ac := range f.configs {
acs = append(acs, ac)
}
return acs, nil
}
func (f *fakeAdminConfigStore) DeleteAdminConfiguration(orgID int64) error {
f.mtx.Lock()
defer f.mtx.Unlock()
delete(f.configs, orgID)
return nil
}
func (f *fakeAdminConfigStore) UpdateAdminConfiguration(cmd store.UpdateAdminConfigurationCmd) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.configs[cmd.AdminConfiguration.OrgID] = cmd.AdminConfiguration
return nil
}
// fakeNotifier represents a fake internal Alertmanager.
type fakeNotifier struct{}
func (n *fakeNotifier) PutAlerts(alerts apimodels.PostableAlerts) error {
return nil
}
type fakeExternalAlertmanager struct {
t *testing.T
mtx sync.Mutex
alerts amv2.PostableAlerts
server *httptest.Server
}
func newFakeExternalAlertmanager(t *testing.T) *fakeExternalAlertmanager {
t.Helper()
am := &fakeExternalAlertmanager{
t: t,
alerts: amv2.PostableAlerts{},
}
am.server = httptest.NewServer(http.HandlerFunc(am.Handler()))
return am
}
func (am *fakeExternalAlertmanager) AlertNamesCompare(expected []string) bool {
n := []string{}
alerts := am.Alerts()
if len(expected) != len(alerts) {
return false
}
for _, a := range am.Alerts() {
for k, v := range a.Alert.Labels {
if k == model.AlertNameLabel {
n = append(n, v)
}
}
}
return assert.ObjectsAreEqual(expected, n)
}
func (am *fakeExternalAlertmanager) AlertsCount() int {
am.mtx.Lock()
defer am.mtx.Unlock()
return len(am.alerts)
}
func (am *fakeExternalAlertmanager) Alerts() amv2.PostableAlerts {
am.mtx.Lock()
defer am.mtx.Unlock()
return am.alerts
}
func (am *fakeExternalAlertmanager) Handler() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
require.NoError(am.t, err)
a := amv2.PostableAlerts{}
require.NoError(am.t, json.Unmarshal(b, &a))
am.mtx.Lock()
am.alerts = append(am.alerts, a...)
am.mtx.Unlock()
}
}
func (am *fakeExternalAlertmanager) Close() {
am.server.Close()
}


@ -0,0 +1,202 @@
package sender
import (
"context"
"net/url"
"strings"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/logging"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
gokit_log "github.com/go-kit/kit/log"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
)
const (
defaultMaxQueueCapacity = 10000
defaultTimeout = 10 * time.Second
)
// Sender is responsible for dispatching alert notifications to an external Alertmanager service.
type Sender struct {
logger log.Logger
gokitLogger gokit_log.Logger
wg sync.WaitGroup
manager *notifier.Manager
sdCancel context.CancelFunc
sdManager *discovery.Manager
}
func New(metrics *metrics.Metrics) (*Sender, error) {
l := log.New("sender")
sdCtx, sdCancel := context.WithCancel(context.Background())
s := &Sender{
logger: l,
gokitLogger: gokit_log.NewLogfmtLogger(logging.NewWrapper(l)),
sdCancel: sdCancel,
}
s.manager = notifier.NewManager(
&notifier.Options{QueueCapacity: defaultMaxQueueCapacity, Registerer: prometheus.NewRegistry()},
s.gokitLogger,
)
s.sdManager = discovery.NewManager(sdCtx, s.gokitLogger)
return s, nil
}
// ApplyConfig syncs a configuration with the sender.
func (s *Sender) ApplyConfig(cfg *ngmodels.AdminConfiguration) error {
notifierCfg, err := buildNotifierConfig(cfg)
if err != nil {
return err
}
if err := s.manager.ApplyConfig(notifierCfg); err != nil {
return err
}
sdCfgs := make(map[string]discovery.Configs)
for k, v := range notifierCfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
sdCfgs[k] = v.ServiceDiscoveryConfigs
}
return s.sdManager.ApplyConfig(sdCfgs)
}
func (s *Sender) Run() {
s.wg.Add(2)
go func() {
if err := s.sdManager.Run(); err != nil {
s.logger.Error("failed to start the sender service discovery manager", "err", err)
}
s.wg.Done()
}()
go func() {
s.manager.Run(s.sdManager.SyncCh())
s.wg.Done()
}()
}
// SendAlerts sends a set of alerts to the configured Alertmanager(s).
func (s *Sender) SendAlerts(alerts apimodels.PostableAlerts) {
if len(alerts.PostableAlerts) == 0 {
s.logger.Debug("no alerts to send to external Alertmanager(s)")
return
}
as := make([]*notifier.Alert, 0, len(alerts.PostableAlerts))
for _, a := range alerts.PostableAlerts {
na := alertToNotifierAlert(a)
as = append(as, na)
}
s.logger.Debug("sending alerts to the external Alertmanager(s)", "am_count", len(s.manager.Alertmanagers()), "alert_count", len(as))
s.manager.Send(as...)
}
// Stop shuts down the sender.
func (s *Sender) Stop() {
s.sdCancel()
s.manager.Stop()
s.wg.Wait()
}
// Alertmanagers returns a list of the discovered Alertmanager(s).
func (s *Sender) Alertmanagers() []*url.URL {
return s.manager.Alertmanagers()
}
// DroppedAlertmanagers returns a list of Alertmanager(s) we no longer send alerts to.
func (s *Sender) DroppedAlertmanagers() []*url.URL {
return s.manager.DroppedAlertmanagers()
}
func buildNotifierConfig(cfg *ngmodels.AdminConfiguration) (*config.Config, error) {
amConfigs := make([]*config.AlertmanagerConfig, 0, len(cfg.Alertmanagers))
for _, amURL := range cfg.Alertmanagers {
u, err := url.Parse(amURL)
if err != nil {
return nil, err
}
sdConfig := discovery.Configs{
discovery.StaticConfig{
{
Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(u.Host)}},
},
},
}
amConfig := &config.AlertmanagerConfig{
APIVersion: config.AlertmanagerAPIVersionV2,
Scheme: u.Scheme,
PathPrefix: u.Path,
Timeout: model.Duration(defaultTimeout),
ServiceDiscoveryConfigs: sdConfig,
}
// Check the URL for basic authentication information first
if u.User != nil {
amConfig.HTTPClientConfig.BasicAuth = &common_config.BasicAuth{
Username: u.User.Username(),
}
if password, isSet := u.User.Password(); isSet {
amConfig.HTTPClientConfig.BasicAuth.Password = common_config.Secret(password)
}
}
amConfigs = append(amConfigs, amConfig)
}
notifierConfig := &config.Config{
AlertingConfig: config.AlertingConfig{
AlertmanagerConfigs: amConfigs,
},
}
return notifierConfig, nil
}
func alertToNotifierAlert(alert models.PostableAlert) *notifier.Alert {
ls := make(labels.Labels, 0, len(alert.Alert.Labels))
a := make(labels.Labels, 0, len(alert.Annotations))
// Prometheus does not allow spaces in labels or annotations while Grafana does, we need to make sure we
// remove them before sending the alerts.
for k, v := range alert.Alert.Labels {
ls = append(ls, labels.Label{Name: removeSpaces(k), Value: v})
}
for k, v := range alert.Annotations {
a = append(a, labels.Label{Name: removeSpaces(k), Value: v})
}
return &notifier.Alert{
Labels: ls,
Annotations: a,
StartsAt: time.Time(alert.StartsAt),
EndsAt: time.Time(alert.EndsAt),
GeneratorURL: alert.Alert.GeneratorURL.String(),
}
}
func removeSpaces(labelName string) string {
return strings.Join(strings.Fields(labelName), "")
}


@ -0,0 +1,92 @@
package store
import (
"context"
"fmt"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
var (
// ErrNoAdminConfiguration is an error for when no admin configuration is found.
ErrNoAdminConfiguration = fmt.Errorf("no admin configuration available")
)
type UpdateAdminConfigurationCmd struct {
AdminConfiguration *ngmodels.AdminConfiguration
}
type AdminConfigurationStore interface {
GetAdminConfiguration(orgID int64) (*ngmodels.AdminConfiguration, error)
GetAdminConfigurations() ([]*ngmodels.AdminConfiguration, error)
DeleteAdminConfiguration(orgID int64) error
UpdateAdminConfiguration(UpdateAdminConfigurationCmd) error
}
func (st *DBstore) GetAdminConfiguration(orgID int64) (*ngmodels.AdminConfiguration, error) {
cfg := &ngmodels.AdminConfiguration{}
err := st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
ok, err := sess.Table("ngalert_configuration").Where("org_id = ?", orgID).Get(cfg)
if err != nil {
return err
}
if !ok {
return ErrNoAdminConfiguration
}
return nil
})
if err != nil {
return nil, err
}
return cfg, nil
}
func (st DBstore) GetAdminConfigurations() ([]*ngmodels.AdminConfiguration, error) {
var cfg []*ngmodels.AdminConfiguration
err := st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
if err := sess.Table("ngalert_configuration").Find(&cfg); err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return cfg, nil
}
func (st DBstore) DeleteAdminConfiguration(orgID int64) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
_, err := sess.Exec("DELETE FROM ngalert_configuration WHERE org_id = ?", orgID)
if err != nil {
return err
}
return nil
})
}
func (st DBstore) UpdateAdminConfiguration(cmd UpdateAdminConfigurationCmd) error {
return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
has, err := sess.Table("ngalert_configuration").Where("org_id = ?", cmd.AdminConfiguration.OrgID).Exist()
if err != nil {
return err
}
if !has {
_, err := sess.Table("ngalert_configuration").Insert(cmd.AdminConfiguration)
return err
}
_, err = sess.Table("ngalert_configuration").AllCols().Update(cmd.AdminConfiguration)
return err
})
}


@ -19,6 +19,9 @@ func AddTablesMigrations(mg *migrator.Migrator) {
// Create Alertmanager configurations
AddAlertmanagerConfigMigrations(mg)
// Create Admin Configuration
AddAlertAdminConfigMigrations(mg)
}
// AddAlertDefinitionMigrations should not be modified.
@ -265,3 +268,23 @@ func AddAlertmanagerConfigMigrations(mg *migrator.Migrator) {
mg.AddMigration("alert alert_configuration alertmanager_configuration column from TEXT to MEDIUMTEXT if mysql", migrator.NewRawSQLMigration("").
Mysql("ALTER TABLE alert_configuration MODIFY alertmanager_configuration MEDIUMTEXT;"))
}
func AddAlertAdminConfigMigrations(mg *migrator.Migrator) {
adminConfiguration := migrator.Table{
Name: "ngalert_configuration",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "org_id", Type: migrator.DB_BigInt, Nullable: false},
{Name: "alertmanagers", Type: migrator.DB_Text, Nullable: true},
{Name: "created_at", Type: migrator.DB_Int, Nullable: false},
{Name: "updated_at", Type: migrator.DB_Int, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"org_id"}, Type: migrator.UniqueIndex},
},
}
mg.AddMigration("create_ngalert_configuration_table", migrator.NewAddTableMigration(adminConfiguration))
mg.AddMigration("add index in ngalert_configuration on org_id column", migrator.NewAddIndexMigration(adminConfiguration, adminConfiguration.Indices[0]))
}