Alerting: Run state manager as regular sub-service (#58246)

Yuri Tseretyan 2022-11-04 17:06:47 -04:00 committed by GitHub
parent e6a9fa1cf9
commit 978f1119d7
3 changed files with 22 additions and 30 deletions


@@ -282,6 +282,10 @@ func (ng *AlertNG) Run(ctx context.Context) error {
 	children, subCtx := errgroup.WithContext(ctx)
+	children.Go(func() error {
+		return ng.stateManager.Run(subCtx)
+	})
 	children.Go(func() error {
 		return ng.MultiOrgAlertmanager.Run(subCtx)
 	})
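The hunk above registers the state manager with the same errgroup-based sub-service pattern already used for MultiOrgAlertmanager: each component exposes a blocking Run(ctx), the parent adds it with children.Go, and a failure in any one of them cancels subCtx for all the others. Below is a minimal, self-contained sketch of that pattern; the service names are hypothetical and this is not Grafana's actual wiring.

// Sketch of the errgroup sub-service pattern used in the hunk above.
// Names here are hypothetical; this is not Grafana code.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// subService stands in for a component such as the state manager.
type subService struct{ name string }

// Run blocks until the shared context is cancelled, then reports why.
func (s *subService) Run(ctx context.Context) error {
	<-ctx.Done()
	fmt.Println(s.name, "stopped:", ctx.Err())
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	children, subCtx := errgroup.WithContext(ctx)
	for _, name := range []string{"stateManager", "multiOrgAlertmanager"} {
		svc := &subService{name: name}
		children.Go(func() error { return svc.Run(subCtx) })
	}

	// Wait blocks until every sub-service returns and yields the first error.
	if err := children.Wait(); err != nil {
		fmt.Println("shutdown:", err)
	}
}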


@@ -298,8 +298,6 @@ func (sch *schedule) schedulePeriodic(ctx context.Context, t *ticker.T) error {
 		case <-ctx.Done():
 			// waiting for all rule evaluation routines to stop
 			waitErr := dispatcherGroup.Wait()
-			// close the state manager and flush the state
-			sch.stateManager.Close()
 			return waitErr
 		}
 	}


@@ -15,7 +15,10 @@ import (
 	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
 )
 
-var ResendDelay = 30 * time.Second
+var (
+	ResendDelay           = 30 * time.Second
+	MetricsScrapeInterval = 15 * time.Second // TODO: parameterize? // Setting to a reasonable default scrape interval for Prometheus.
+)
 
 // AlertInstanceManager defines the interface for querying the current alert instances.
 type AlertInstanceManager interface {
@@ -29,7 +32,6 @@ type Manager struct {
 	clock         clock.Clock
 	cache         *cache
-	quit          chan struct{}
 	ResendDelay   time.Duration
 	instanceStore InstanceStore
@@ -39,9 +41,8 @@ type Manager struct {
 }
 
 func NewManager(metrics *metrics.State, externalURL *url.URL, instanceStore InstanceStore, imageService image.ImageService, clock clock.Clock, historian Historian) *Manager {
-	manager := &Manager{
+	return &Manager{
 		cache:       newCache(),
-		quit:        make(chan struct{}),
 		ResendDelay: ResendDelay, // TODO: make this configurable
 		log:         log.New("ngalert.state.manager"),
 		metrics:     metrics,
@@ -51,14 +52,21 @@ func NewManager(metrics *metrics.State, externalURL *url.URL, instanceStore Inst
 		clock:       clock,
 		externalURL: externalURL,
 	}
-	if manager.metrics != nil {
-		go manager.recordMetrics()
-	}
-	return manager
 }
 
-func (st *Manager) Close() {
-	st.quit <- struct{}{}
+func (st *Manager) Run(ctx context.Context) error {
+	ticker := st.clock.Ticker(MetricsScrapeInterval)
+	for {
+		select {
+		case <-ticker.C:
+			st.log.Debug("Recording state cache metrics", "now", st.clock.Now())
+			st.cache.recordMetrics(st.metrics)
+		case <-ctx.Done():
+			st.log.Debug("Stopping")
+			ticker.Stop()
+			return ctx.Err()
+		}
+	}
 }
 
 func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
@@ -269,24 +277,6 @@ func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*Stat
 	return st.cache.getStatesForRuleUID(orgID, alertRuleUID)
 }
 
-func (st *Manager) recordMetrics() {
-	// TODO: parameterize?
-	// Setting to a reasonable default scrape interval for Prometheus.
-	dur := time.Duration(15) * time.Second
-	ticker := st.clock.Ticker(dur)
-	for {
-		select {
-		case <-ticker.C:
-			st.log.Debug("Recording state cache metrics", "now", st.clock.Now())
-			st.cache.recordMetrics(st.metrics)
-		case <-st.quit:
-			st.log.Debug("Stopping state cache metrics recording", "now", st.clock.Now())
-			ticker.Stop()
-			return
-		}
-	}
-}
-
 func (st *Manager) Put(states []*State) {
 	for _, s := range states {
 		st.cache.set(s)
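
Behaviourally, the removed recordMetrics goroutine and the new Run method do the same periodic work; what changes is the shutdown path (context cancellation and a returned ctx.Err() instead of a quit channel) and the interval, which now lives in the package-level MetricsScrapeInterval. Because the ticker comes from the injected clock, the loop can be driven deterministically in tests. The sketch below assumes the clock.Clock in the constructor signature is github.com/benbjohnson/clock (the import is not visible in these hunks); runMetricsLoop is a stand-in for Manager.Run, not Grafana's code.

// Sketch only: assumes github.com/benbjohnson/clock; runMetricsLoop mirrors
// the shape of the new Run method but is illustrative, not Grafana's Manager.Run.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

// runMetricsLoop ticks on the injected clock, records on each tick, and
// exits with ctx.Err() once the context is cancelled.
func runMetricsLoop(ctx context.Context, clk clock.Clock, interval time.Duration, record func()) error {
	ticker := clk.Ticker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			record()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	mock := clock.NewMock()
	ctx, cancel := context.WithCancel(context.Background())

	done := make(chan error, 1)
	go func() {
		done <- runMetricsLoop(ctx, mock, 15*time.Second, func() {
			fmt.Println("tick: metrics recorded")
		})
	}()

	// Give the goroutine a moment to create its ticker before moving mock time.
	time.Sleep(10 * time.Millisecond)
	mock.Add(15 * time.Second)        // advance one interval so the ticker fires without real waiting
	time.Sleep(10 * time.Millisecond) // let the loop consume the tick before cancelling
	cancel()                          // shutdown now flows through context cancellation
	fmt.Println("run returned:", <-done)
}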