Alerting: Move alertRuleRegistry to its own file (#48890)

* move alertRuleRegistry to its own file
* move tests to separate file
This commit is contained in:
Yuriy Tseretyan
2022-05-11 10:04:50 -04:00
committed by GitHub
parent dc33e09b24
commit 99156b40bd
4 changed files with 231 additions and 210 deletions

View File

@@ -0,0 +1,113 @@
package schedule
import (
"context"
"fmt"
"sync"
"time"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
type alertRuleRegistry struct {
mu sync.Mutex
alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo
}
// getOrCreateInfo gets rule routine information from registry by the key. If it does not exist, it creates a new one.
// Returns a pointer to the rule routine information and a flag that indicates whether it is a new struct or not.
func (r *alertRuleRegistry) getOrCreateInfo(context context.Context, key models.AlertRuleKey) (*alertRuleInfo, bool) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
info = newAlertRuleInfo(context)
r.alertRuleInfo[key] = info
}
return info, !ok
}
// get returns the channel for the specific alert rule
// if the key does not exist returns an error
func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
return nil, fmt.Errorf("%v key not found", key)
}
return info, nil
}
func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, ok := r.alertRuleInfo[key]
return ok
}
// del removes pair that has specific key from alertRuleInfo.
// Returns 2-tuple where the first element is value of the removed pair
// and the second element indicates whether element with the specified key existed.
func (r *alertRuleRegistry) del(key models.AlertRuleKey) (*alertRuleInfo, bool) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if ok {
delete(r.alertRuleInfo, key)
}
return info, ok
}
func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
r.mu.Lock()
defer r.mu.Unlock()
definitionsIDs := make(map[models.AlertRuleKey]struct{}, len(r.alertRuleInfo))
for k := range r.alertRuleInfo {
definitionsIDs[k] = struct{}{}
}
return definitionsIDs
}
type alertRuleInfo struct {
evalCh chan *evaluation
updateCh chan struct{}
ctx context.Context
stop context.CancelFunc
}
func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
ctx, cancel := context.WithCancel(parent)
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
}
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped
func (a *alertRuleInfo) eval(t time.Time, version int64) bool {
select {
case a.evalCh <- &evaluation{
scheduledAt: t,
version: version,
}:
return true
case <-a.ctx.Done():
return false
}
}
// update signals the rule evaluation routine to update the internal state. Does nothing if the loop is stopped
func (a *alertRuleInfo) update() bool {
select {
case a.updateCh <- struct{}{}:
return true
case <-a.ctx.Done():
return false
}
}
type evaluation struct {
scheduledAt time.Time
version int64
}

View File

@@ -0,0 +1,118 @@
package schedule
import (
"context"
"math/rand"
"runtime"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("Update should send to updateCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.update()
}()
select {
case <-r.updateCh:
require.True(t, <-resultCh)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on update channel")
}
})
t.Run("eval should send to evalCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
expected := time.Now()
resultCh := make(chan bool)
version := rand.Int63()
go func() {
resultCh <- r.eval(expected, version)
}()
select {
case ctx := <-r.evalCh:
require.Equal(t, version, ctx.version)
require.Equal(t, expected, ctx.scheduledAt)
require.True(t, <-resultCh)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
t.Run("eval should exit when context is cancelled", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.eval(time.Now(), rand.Int63())
}()
runtime.Gosched()
r.stop()
select {
case result := <-resultCh:
require.False(t, result)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
})
t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.update())
})
t.Run("eval should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.eval(time.Now(), rand.Int63()))
})
t.Run("stop should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
r.stop()
})
})
t.Run("should be thread-safe", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
wg := sync.WaitGroup{}
go func() {
for {
select {
case <-r.evalCh:
time.Sleep(time.Microsecond)
case <-r.updateCh:
time.Sleep(time.Microsecond)
case <-r.ctx.Done():
return
}
}
}()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < 20; i++ {
max := 3
if i <= 10 {
max = 2
}
switch rand.Intn(max) + 1 {
case 1:
r.update()
case 2:
r.eval(time.Now(), rand.Int63())
case 3:
r.stop()
}
}
wg.Done()
}()
}
wg.Wait()
})
}

View File

@@ -647,109 +647,6 @@ func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State)
}
}
type alertRuleRegistry struct {
mu sync.Mutex
alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo
}
// getOrCreateInfo gets rule routine information from registry by the key. If it does not exist, it creates a new one.
// Returns a pointer to the rule routine information and a flag that indicates whether it is a new struct or not.
func (r *alertRuleRegistry) getOrCreateInfo(context context.Context, key models.AlertRuleKey) (*alertRuleInfo, bool) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
info = newAlertRuleInfo(context)
r.alertRuleInfo[key] = info
}
return info, !ok
}
// get returns the channel for the specific alert rule
// if the key does not exist returns an error
func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
return nil, fmt.Errorf("%v key not found", key)
}
return info, nil
}
func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, ok := r.alertRuleInfo[key]
return ok
}
// del removes pair that has specific key from alertRuleInfo.
// Returns 2-tuple where the first element is value of the removed pair
// and the second element indicates whether element with the specified key existed.
func (r *alertRuleRegistry) del(key models.AlertRuleKey) (*alertRuleInfo, bool) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if ok {
delete(r.alertRuleInfo, key)
}
return info, ok
}
func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
r.mu.Lock()
defer r.mu.Unlock()
definitionsIDs := make(map[models.AlertRuleKey]struct{}, len(r.alertRuleInfo))
for k := range r.alertRuleInfo {
definitionsIDs[k] = struct{}{}
}
return definitionsIDs
}
type alertRuleInfo struct {
evalCh chan *evaluation
updateCh chan struct{}
ctx context.Context
stop context.CancelFunc
}
func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
ctx, cancel := context.WithCancel(parent)
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
}
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped
func (a *alertRuleInfo) eval(t time.Time, version int64) bool {
select {
case a.evalCh <- &evaluation{
scheduledAt: t,
version: version,
}:
return true
case <-a.ctx.Done():
return false
}
}
// update signals the rule evaluation routine to update the internal state. Does nothing if the loop is stopped
func (a *alertRuleInfo) update() bool {
select {
case a.updateCh <- struct{}{}:
return true
case <-a.ctx.Done():
return false
}
}
type evaluation struct {
scheduledAt time.Time
version int64
}
// overrideCfg is only used on tests.
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
sch.clock = cfg.C

View File

@@ -8,7 +8,6 @@ import (
"fmt"
"math/rand"
"net/url"
"runtime"
"sync"
"testing"
"time"
@@ -817,112 +816,6 @@ func TestSchedule_ruleRoutine(t *testing.T) {
})
}
func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("Update should send to updateCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.update()
}()
select {
case <-r.updateCh:
require.True(t, <-resultCh)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on update channel")
}
})
t.Run("eval should send to evalCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
expected := time.Now()
resultCh := make(chan bool)
version := rand.Int63()
go func() {
resultCh <- r.eval(expected, version)
}()
select {
case ctx := <-r.evalCh:
require.Equal(t, version, ctx.version)
require.Equal(t, expected, ctx.scheduledAt)
require.True(t, <-resultCh)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
t.Run("eval should exit when context is cancelled", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.eval(time.Now(), rand.Int63())
}()
runtime.Gosched()
r.stop()
select {
case result := <-resultCh:
require.False(t, result)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
})
t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.update())
})
t.Run("eval should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.eval(time.Now(), rand.Int63()))
})
t.Run("stop should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
r.stop()
})
})
t.Run("should be thread-safe", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
wg := sync.WaitGroup{}
go func() {
for {
select {
case <-r.evalCh:
time.Sleep(time.Microsecond)
case <-r.updateCh:
time.Sleep(time.Microsecond)
case <-r.ctx.Done():
return
}
}
}()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < 20; i++ {
max := 3
if i <= 10 {
max = 2
}
switch rand.Intn(max) + 1 {
case 1:
r.update()
case 2:
r.eval(time.Now(), rand.Int63())
case 3:
r.stop()
}
}
wg.Done()
}()
}
wg.Wait()
})
}
func TestSchedule_UpdateAlertRule(t *testing.T) {
t.Run("when rule exists", func(t *testing.T) {
t.Run("it should call Update", func(t *testing.T) {