Extricates reusable utilities for different alerting proxy types (#32268)

* backendtype helper

* abstracts alertingproxy

* updates alerting api dep

* prom endpoints
This commit is contained in:
Owen Diehl 2021-03-24 07:43:25 -04:00 committed by GitHub
parent 376ed8a381
commit 2179a2658e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 239 additions and 112 deletions

2
go.mod
View File

@ -39,7 +39,7 @@ require (
github.com/google/go-cmp v0.5.5
github.com/google/uuid v1.2.0
github.com/gosimple/slug v1.9.0
github.com/grafana/alerting-api v0.0.0-20210323142651-d6515052e2f0
github.com/grafana/alerting-api v0.0.0-20210323194814-03a29a4c4c27
github.com/grafana/grafana-aws-sdk v0.2.0
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.89.0

8
go.sum
View File

@ -797,12 +797,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosimple/slug v1.9.0 h1:r5vDcYrFz9BmfIAMC829un9hq7hKM4cHUrsv36LbEqs=
github.com/gosimple/slug v1.9.0/go.mod h1:AMZ+sOVe65uByN3kgEyf9WEBKBCSS+dJjMX9x4vDJbg=
github.com/grafana/alerting-api v0.0.0-20210318231719-9499804fc548 h1:KjyaZJhPJ15Ul/+OQr8mbO7kDpU5i7G3r5FGVZKClTQ=
github.com/grafana/alerting-api v0.0.0-20210318231719-9499804fc548/go.mod h1:5IppnPguSHcCbVLGCVzVjBvuQZNbYgVJ4KyXXjhCyWY=
github.com/grafana/alerting-api v0.0.0-20210323141138-8873de5bf07a h1:OGKDRdmQSXKFJelrJUf9O8Xh0C8u+OQG1NSurcBYpOI=
github.com/grafana/alerting-api v0.0.0-20210323141138-8873de5bf07a/go.mod h1:5IppnPguSHcCbVLGCVzVjBvuQZNbYgVJ4KyXXjhCyWY=
github.com/grafana/alerting-api v0.0.0-20210323142651-d6515052e2f0 h1:bMYGd71RigZvkLmcdedGdMDJXKJ20luqEQLbqjgAAjI=
github.com/grafana/alerting-api v0.0.0-20210323142651-d6515052e2f0/go.mod h1:5IppnPguSHcCbVLGCVzVjBvuQZNbYgVJ4KyXXjhCyWY=
github.com/grafana/alerting-api v0.0.0-20210323194814-03a29a4c4c27 h1:DuyuEAHJeI+CMxIyzCVhmHcIeK+sjqberhDUfrgd3PY=
github.com/grafana/alerting-api v0.0.0-20210323194814-03a29a4c4c27/go.mod h1:5IppnPguSHcCbVLGCVzVjBvuQZNbYgVJ4KyXXjhCyWY=
github.com/grafana/grafana v1.9.2-0.20210308201921-4ce0a49eac03/go.mod h1:AHRRvd4utJGY25J5nW8aL7wZzn/LcJ0z2za9oOp14j4=
github.com/grafana/grafana-aws-sdk v0.1.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U=
github.com/grafana/grafana-aws-sdk v0.2.0 h1:UTBBYwye+ad5YUIlwN7TGxLdz1wXN3Ezhl0pseDGRVA=

View File

@ -45,11 +45,18 @@ type API struct {
// RegisterAPIEndpoints registers API handlers
func (api *API) RegisterAPIEndpoints() {
logger := log.New("ngalert.api")
proxy := &AlertingProxy{
DataProxy: api.DataProxy,
}
api.RegisterAlertmanagerApiEndpoints(AlertmanagerApiMock{log: logger})
api.RegisterPrometheusApiEndpoints(PrometheusApiMock{log: logger})
api.RegisterPrometheusApiEndpoints(NewForkedProm(
api.DatasourceCache,
NewLotexProm(proxy, logger),
PrometheusApiMock{log: logger},
))
api.RegisterRulerApiEndpoints(NewForkedRuler(
api.DatasourceCache,
&LotexRuler{DataProxy: api.DataProxy, log: logger},
NewLotexRuler(proxy, logger),
RulerApiMock{log: logger},
))
api.RegisterTestingApiEndpoints(TestingApiMock{log: logger})

View File

@ -2,7 +2,6 @@ package api
import (
"fmt"
"strconv"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/api/response"
@ -24,26 +23,8 @@ func NewForkedRuler(datasourceCache datasources.CacheService, lotex, grafana Rul
}
}
func (r *ForkedRuler) backendType(ctx *models.ReqContext) (apimodels.Backend, error) {
recipient := ctx.Params("Recipient")
if recipient == apimodels.GrafanaBackend.String() {
return apimodels.GrafanaBackend, nil
}
if datasourceID, err := strconv.ParseInt(recipient, 10, 64); err == nil {
if ds, err := r.DatasourceCache.GetDatasource(datasourceID, ctx.SignedInUser, ctx.SkipCache); err == nil {
switch ds.Type {
case "loki", "prometheus":
return apimodels.LoTexRulerBackend, nil
default:
return 0, fmt.Errorf("unexpected backend type (%v)", ds.Type)
}
}
}
return 0, fmt.Errorf("unexpected backend type (%v)", recipient)
}
func (r *ForkedRuler) RouteDeleteNamespaceRulesConfig(ctx *models.ReqContext) response.Response {
t, err := r.backendType(ctx)
t, err := backendType(ctx, r.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
@ -58,7 +39,7 @@ func (r *ForkedRuler) RouteDeleteNamespaceRulesConfig(ctx *models.ReqContext) re
}
func (r *ForkedRuler) RouteDeleteRuleGroupConfig(ctx *models.ReqContext) response.Response {
t, err := r.backendType(ctx)
t, err := backendType(ctx, r.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
@ -73,7 +54,7 @@ func (r *ForkedRuler) RouteDeleteRuleGroupConfig(ctx *models.ReqContext) respons
}
func (r *ForkedRuler) RouteGetNamespaceRulesConfig(ctx *models.ReqContext) response.Response {
t, err := r.backendType(ctx)
t, err := backendType(ctx, r.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
@ -88,7 +69,7 @@ func (r *ForkedRuler) RouteGetNamespaceRulesConfig(ctx *models.ReqContext) respo
}
func (r *ForkedRuler) RouteGetRulegGroupConfig(ctx *models.ReqContext) response.Response {
t, err := r.backendType(ctx)
t, err := backendType(ctx, r.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
@ -103,7 +84,7 @@ func (r *ForkedRuler) RouteGetRulegGroupConfig(ctx *models.ReqContext) response.
}
func (r *ForkedRuler) RouteGetRulesConfig(ctx *models.ReqContext) response.Response {
t, err := r.backendType(ctx)
t, err := backendType(ctx, r.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
@ -118,7 +99,7 @@ func (r *ForkedRuler) RouteGetRulesConfig(ctx *models.ReqContext) response.Respo
}
func (r *ForkedRuler) RoutePostNameRulesConfig(ctx *models.ReqContext, conf apimodels.RuleGroupConfig) response.Response {
backendType, err := r.backendType(ctx)
backendType, err := backendType(ctx, r.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}

View File

@ -0,0 +1,55 @@
package api
import (
"fmt"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/datasources"
)
type ForkedPromSvc struct {
ProxySvc, GrafanaSvc PrometheusApiService
DatasourceCache datasources.CacheService
}
func NewForkedProm(datasourceCache datasources.CacheService, proxy, grafana PrometheusApiService) *ForkedPromSvc {
return &ForkedPromSvc{
ProxySvc: proxy,
GrafanaSvc: grafana,
DatasourceCache: datasourceCache,
}
}
func (p *ForkedPromSvc) RouteGetAlertStatuses(ctx *models.ReqContext) response.Response {
t, err := backendType(ctx, p.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
switch t {
case apimodels.GrafanaBackend:
return p.GrafanaSvc.RouteGetAlertStatuses(ctx)
case apimodels.LoTexRulerBackend:
return p.ProxySvc.RouteGetAlertStatuses(ctx)
default:
return response.Error(400, fmt.Sprintf("unexpected backend type (%v)", t), nil)
}
}
func (p *ForkedPromSvc) RouteGetRuleStatuses(ctx *models.ReqContext) response.Response {
t, err := backendType(ctx, p.DatasourceCache)
if err != nil {
return response.Error(400, err.Error(), nil)
}
switch t {
case apimodels.GrafanaBackend:
return p.GrafanaSvc.RouteGetRuleStatuses(ctx)
case apimodels.LoTexRulerBackend:
return p.ProxySvc.RouteGetRuleStatuses(ctx)
default:
return response.Error(400, fmt.Sprintf("unexpected backend type (%v)", t), nil)
}
}

View File

@ -2,7 +2,6 @@ package api
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -10,96 +9,25 @@ import (
"net/url"
apimodels "github.com/grafana/alerting-api/pkg/api"
"gopkg.in/macaron.v1"
"gopkg.in/yaml.v3"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/datasourceproxy"
)
const legacyRulerPrefix = "/api/prom/rules"
type LotexRuler struct {
DataProxy *datasourceproxy.DatasourceProxyService
log log.Logger
log log.Logger
*AlertingProxy
}
// macaron unsafely asserts the http.ResponseWriter is an http.CloseNotifier, which will panic.
// Here we impl it, which will ensure this no longer happens, but neither will we take
// advantage cancelling upstream requests when the downstream has closed.
// NB: http.CloseNotifier is a deprecated ifc from before the context pkg.
type safeMacaronWrapper struct {
http.ResponseWriter
}
func (w *safeMacaronWrapper) CloseNotify() <-chan bool {
return make(chan bool)
}
// replacedResponseWriter overwrites the underlying responsewriter used by a *models.ReqContext.
// It's ugly because it needs to replace a value behind a few nested pointers.
func replacedResponseWriter(ctx *models.ReqContext) (*models.ReqContext, *response.NormalResponse) {
resp := response.CreateNormalResponse(make(http.Header), nil, 0)
cpy := *ctx
cpyMCtx := *cpy.Context
cpyMCtx.Resp = macaron.NewResponseWriter(ctx.Req.Method, &safeMacaronWrapper{resp})
cpy.Context = &cpyMCtx
return &cpy, resp
}
// withReq proxies a different request
func (r *LotexRuler) withReq(
ctx *models.ReqContext,
req *http.Request,
extractor func([]byte) (interface{}, error),
) response.Response {
newCtx, resp := replacedResponseWriter(ctx)
newCtx.Req.Request = req
r.DataProxy.ProxyDatasourceRequestWithID(newCtx, ctx.ParamsInt64("Recipient"))
status := resp.Status()
if status >= 400 {
return response.Error(status, string(resp.Body()), nil)
func NewLotexRuler(proxy *AlertingProxy, log log.Logger) *LotexRuler {
return &LotexRuler{
log: log,
AlertingProxy: proxy,
}
t, err := extractor(resp.Body())
if err != nil {
return response.Error(500, err.Error(), nil)
}
b, err := json.Marshal(t)
if err != nil {
return response.Error(500, err.Error(), nil)
}
return response.JSON(status, b)
}
func yamlExtractor(v interface{}) func([]byte) (interface{}, error) {
return func(b []byte) (interface{}, error) {
decoder := yaml.NewDecoder(bytes.NewReader(b))
decoder.KnownFields(true)
err := decoder.Decode(v)
return v, err
}
}
func jsonExtractor(v interface{}) func([]byte) (interface{}, error) {
if v == nil {
// json unmarshal expects a pointer
v = &map[string]interface{}{}
}
return func(b []byte) (interface{}, error) {
return v, json.Unmarshal(b, v)
}
}
func messageExtractor(b []byte) (interface{}, error) {
return map[string]string{"message": string(b)}, nil
}
func (r *LotexRuler) RouteDeleteNamespaceRulesConfig(ctx *models.ReqContext) response.Response {

View File

@ -0,0 +1,51 @@
package api
import (
"net/http"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
const (
promRulesPath = "/prometheus/api/v1/rules"
promAlertsPath = "/prometheus/api/v1/alerts"
)
type LotexProm struct {
log log.Logger
*AlertingProxy
}
func NewLotexProm(proxy *AlertingProxy, log log.Logger) *LotexProm {
return &LotexProm{
log: log,
AlertingProxy: proxy,
}
}
func (p *LotexProm) RouteGetAlertStatuses(ctx *models.ReqContext) response.Response {
return p.withReq(
ctx, &http.Request{
URL: withPath(
*ctx.Req.URL,
promAlertsPath,
),
},
jsonExtractor(&apimodels.AlertResponse{}),
)
}
func (p *LotexProm) RouteGetRuleStatuses(ctx *models.ReqContext) response.Response {
return p.withReq(
ctx, &http.Request{
URL: withPath(
*ctx.Req.URL,
promRulesPath,
),
},
jsonExtractor(&apimodels.RuleResponse{}),
)
}

View File

@ -1,10 +1,21 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"regexp"
"strconv"
"github.com/go-openapi/strfmt"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/datasourceproxy"
"github.com/grafana/grafana/pkg/services/datasources"
"gopkg.in/macaron.v1"
"gopkg.in/yaml.v3"
)
var searchRegex = regexp.MustCompile(`\{(\w+)\}`)
@ -27,3 +38,101 @@ func stringPtr(s string) *string {
func boolPtr(b bool) *bool {
return &b
}
func backendType(ctx *models.ReqContext, cache datasources.CacheService) (apimodels.Backend, error) {
recipient := ctx.Params("Recipient")
if recipient == apimodels.GrafanaBackend.String() {
return apimodels.GrafanaBackend, nil
}
if datasourceID, err := strconv.ParseInt(recipient, 10, 64); err == nil {
if ds, err := cache.GetDatasource(datasourceID, ctx.SignedInUser, ctx.SkipCache); err == nil {
switch ds.Type {
case "loki", "prometheus":
return apimodels.LoTexRulerBackend, nil
default:
return 0, fmt.Errorf("unexpected backend type (%v)", ds.Type)
}
}
}
return 0, fmt.Errorf("unexpected backend type (%v)", recipient)
}
// macaron unsafely asserts the http.ResponseWriter is an http.CloseNotifier, which will panic.
// Here we impl it, which will ensure this no longer happens, but neither will we take
// advantage cancelling upstream requests when the downstream has closed.
// NB: http.CloseNotifier is a deprecated ifc from before the context pkg.
type safeMacaronWrapper struct {
http.ResponseWriter
}
func (w *safeMacaronWrapper) CloseNotify() <-chan bool {
return make(chan bool)
}
// replacedResponseWriter overwrites the underlying responsewriter used by a *models.ReqContext.
// It's ugly because it needs to replace a value behind a few nested pointers.
func replacedResponseWriter(ctx *models.ReqContext) (*models.ReqContext, *response.NormalResponse) {
resp := response.CreateNormalResponse(make(http.Header), nil, 0)
cpy := *ctx
cpyMCtx := *cpy.Context
cpyMCtx.Resp = macaron.NewResponseWriter(ctx.Req.Method, &safeMacaronWrapper{resp})
cpy.Context = &cpyMCtx
return &cpy, resp
}
type AlertingProxy struct {
DataProxy *datasourceproxy.DatasourceProxyService
}
// withReq proxies a different request
func (p *AlertingProxy) withReq(
ctx *models.ReqContext,
req *http.Request,
extractor func([]byte) (interface{}, error),
) response.Response {
newCtx, resp := replacedResponseWriter(ctx)
newCtx.Req.Request = req
p.DataProxy.ProxyDatasourceRequestWithID(newCtx, ctx.ParamsInt64("Recipient"))
status := resp.Status()
if status >= 400 {
return response.Error(status, string(resp.Body()), nil)
}
t, err := extractor(resp.Body())
if err != nil {
return response.Error(500, err.Error(), nil)
}
b, err := json.Marshal(t)
if err != nil {
return response.Error(500, err.Error(), nil)
}
return response.JSON(status, b)
}
func yamlExtractor(v interface{}) func([]byte) (interface{}, error) {
return func(b []byte) (interface{}, error) {
decoder := yaml.NewDecoder(bytes.NewReader(b))
decoder.KnownFields(true)
err := decoder.Decode(v)
return v, err
}
}
func jsonExtractor(v interface{}) func([]byte) (interface{}, error) {
if v == nil {
// json unmarshal expects a pointer
v = &map[string]interface{}{}
}
return func(b []byte) (interface{}, error) {
return v, json.Unmarshal(b, v)
}
}
func messageExtractor(b []byte) (interface{}, error) {
return map[string]string{"message": string(b)}, nil
}