diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go
index 0f8e24bcef5..22cbe2456b7 100644
--- a/pkg/services/alerting/engine.go
+++ b/pkg/services/alerting/engine.go
@@ -105,8 +105,9 @@ func (e *AlertingService) runJobDispatcher(grafanaCtx context.Context) error {
 var (
 	unfinishedWorkTimeout = time.Second * 5
 	// TODO: Make alertTimeout and alertMaxAttempts configurable in the config file.
-	alertTimeout     = time.Second * 30
-	alertMaxAttempts = 3
+	alertTimeout        = time.Second * 30
+	resultHandleTimeout = time.Second * 30
+	alertMaxAttempts    = 3
 )
 
 func (e *AlertingService) processJobWithRetry(grafanaCtx context.Context, job *Job) error {
@@ -116,7 +117,7 @@ func (e *AlertingService) processJobWithRetry(grafanaCtx context.Context, job *J
 		}
 	}()
 
-	cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+	cancelChan := make(chan context.CancelFunc, alertMaxAttempts*2)
 	attemptChan := make(chan int, 1)
 
 	// Initialize with first attemptID=1
@@ -204,6 +205,15 @@ func (e *AlertingService) processJob(attemptID int, attemptChan chan int, cancel
 			}
 		}
 
+		// Create a new context with its own timeout for notifications.
+		resultHandleCtx, resultHandleCancelFn := context.WithTimeout(context.Background(), resultHandleTimeout)
+		cancelChan <- resultHandleCancelFn
+
+		// Override the context used for evaluation with a new context for notifications.
+		// This makes it possible for notifiers to execute when datasources
+		// don't respond within the timeout limit. We should rewrite this so notifications
+		// don't reuse the evalContext and get their own context.
+		evalContext.Ctx = resultHandleCtx
 		evalContext.Rule.State = evalContext.GetNewState()
 		e.resultHandler.Handle(evalContext)
 		span.Finish()
diff --git a/pkg/services/alerting/engine_integration_test.go b/pkg/services/alerting/engine_integration_test.go
new file mode 100644
index 00000000000..aa518baae24
--- /dev/null
+++ b/pkg/services/alerting/engine_integration_test.go
@@ -0,0 +1,174 @@
+// +build integration
+
+package alerting
+
+import (
+	"context"
+	"errors"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+// TestEngineTimeouts runs a job through processJobWithRetry with fake
+// handlers that talk to a slow HTTP server, and checks that both the
+// evaluation and the result handling completed their requests.
+func TestEngineTimeouts(t *testing.T) {
+	Convey("Alerting engine timeout tests", t, func() {
+		engine := NewEngine()
+		engine.resultHandler = &FakeResultHandler{}
+		job := &Job{Running: true, Rule: &Rule{}}
+
+		Convey("Should trigger as many retries as needed", func() {
+			Convey("pended alert for datasource -> result handler should be worked", func() {
+				// Reduce the alert timeout so the test runs quickly, and restore
+				// the shared state on every exit path, including a failed assertion.
+				originAlertTimeout := alertTimeout
+				alertTimeout = 2 * time.Second
+				defer func() {
+					alertTimeout = originAlertTimeout
+					engine.resultHandler = &FakeResultHandler{}
+				}()
+
+				transportTimeoutInterval := 2 * time.Second
+				serverBusySleepDuration := 1 * time.Second
+
+				evalHandler := NewFakeCommonTimeoutHandler(transportTimeoutInterval, serverBusySleepDuration)
+				resultHandler := NewFakeCommonTimeoutHandler(transportTimeoutInterval, serverBusySleepDuration)
+				engine.evalHandler = evalHandler
+				engine.resultHandler = resultHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+
+				So(evalHandler.EvalSucceed, ShouldBeTrue)
+				So(resultHandler.ResultHandleSucceed, ShouldBeTrue)
+			})
+		})
+	})
+}
+
+// FakeCommonTimeoutHandler is a test double whose Eval and Handle methods
+// each send one HTTP request to a deliberately slow local server and record
+// whether a 200 response came back.
+type FakeCommonTimeoutHandler struct {
+	TransportTimeoutDuration time.Duration // dial timeout and keep-alive passed to sendRequest
+	ServerBusySleepDuration  time.Duration // how long the fake server sleeps before replying
+	EvalSucceed              bool          // set by Eval after a 200 response
+	ResultHandleSucceed      bool          // set by Handle after a 200 response
+}
+
+// NewFakeCommonTimeoutHandler returns a handler with the given durations and
+// both success flags cleared.
+func NewFakeCommonTimeoutHandler(transportTimeoutDuration time.Duration, serverBusySleepDuration time.Duration) *FakeCommonTimeoutHandler {
+	return &FakeCommonTimeoutHandler{
+		TransportTimeoutDuration: transportTimeoutDuration,
+		ServerBusySleepDuration:  serverBusySleepDuration,
+		EvalSucceed:              false,
+		ResultHandleSucceed:      false,
+	}
+}
+
+// Eval requests /evaltimeout from a fresh slow server using evalContext.Ctx
+// and sets EvalSucceed when the response status is 200.
+//
+// NOTE(review): evalContext.Error is set on every path, including after a
+// 200 response. Presumably this is deliberate so the engine keeps retrying;
+// confirm before changing it.
+func (handler *FakeCommonTimeoutHandler) Eval(evalContext *EvalContext) {
+	// 1. prepare mock server
+	path := "/evaltimeout"
+	srv := runBusyServer(path, handler.ServerBusySleepDuration)
+	defer srv.Close()
+
+	// 2. send requests
+	url := srv.URL + path
+	res, err := sendRequest(evalContext.Ctx, url, handler.TransportTimeoutDuration)
+	if res != nil {
+		defer res.Body.Close()
+	}
+
+	if err != nil {
+		evalContext.Error = errors.New("Fake evaluation timeout test failure")
+		return
+	}
+
+	if res.StatusCode == 200 {
+		handler.EvalSucceed = true
+	}
+
+	evalContext.Error = errors.New("Fake evaluation timeout test failure; wrong response")
+}
+
+// Handle requests /resulthandle from a fresh slow server using
+// evalContext.Ctx. It sets ResultHandleSucceed and returns nil on a 200
+// response; otherwise it stores an error in evalContext.Error and returns it.
+func (handler *FakeCommonTimeoutHandler) Handle(evalContext *EvalContext) error {
+	// 1. prepare mock server
+	path := "/resulthandle"
+	srv := runBusyServer(path, handler.ServerBusySleepDuration)
+	defer srv.Close()
+
+	// 2. send requests
+	url := srv.URL + path
+	res, err := sendRequest(evalContext.Ctx, url, handler.TransportTimeoutDuration)
+	if res != nil {
+		defer res.Body.Close()
+	}
+
+	if err != nil {
+		evalContext.Error = errors.New("Fake result handle timeout test failure")
+		return evalContext.Error
+	}
+
+	if res.StatusCode == 200 {
+		handler.ResultHandleSucceed = true
+		return nil
+	}
+
+	evalContext.Error = errors.New("Fake result handle timeout test failure; wrong response")
+
+	return evalContext.Error
+}
+
+// runBusyServer starts an httptest server whose handler for path sleeps for
+// serverBusySleepDuration before returning. The caller must Close it.
+func runBusyServer(path string, serverBusySleepDuration time.Duration) *httptest.Server {
+	mux := http.NewServeMux()
+	server := httptest.NewServer(mux)
+
+	mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+		time.Sleep(serverBusySleepDuration)
+	})
+
+	return server
+}
+
+// sendRequest issues a GET to url bound to ctx, using a dedicated transport
+// whose dialer applies transportTimeoutInterval as both connect timeout and
+// keep-alive period. The caller closes the response body.
+func sendRequest(ctx context.Context, url string, transportTimeoutInterval time.Duration) (resp *http.Response, err error) {
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	req = req.WithContext(ctx)
+
+	// Transport.Dial is deprecated; DialContext lets the transport cancel
+	// the dial once the request context is done.
+	transport := http.Transport{
+		DialContext: (&net.Dialer{
+			Timeout:   transportTimeoutInterval,
+			KeepAlive: transportTimeoutInterval,
+		}).DialContext,
+	}
+	client := http.Client{
+		Transport: &transport,
+	}
+
+	return client.Do(req)
+}