mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Add context.Context to InstanceStore (#45049)
This commit is contained in:
parent
fc42dfe396
commit
67a3e1d6fd
@ -453,13 +453,13 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
|
||||
case <-ctx.Done():
|
||||
waitErr := dispatcherGroup.Wait()
|
||||
|
||||
orgIds, err := sch.instanceStore.FetchOrgIds()
|
||||
orgIds, err := sch.instanceStore.FetchOrgIds(ctx)
|
||||
if err != nil {
|
||||
sch.log.Error("unable to fetch orgIds", "msg", err.Error())
|
||||
}
|
||||
|
||||
for _, v := range orgIds {
|
||||
sch.saveAlertStates(sch.stateManager.GetAll(v))
|
||||
sch.saveAlertStates(ctx, sch.stateManager.GetAll(v))
|
||||
}
|
||||
|
||||
sch.stateManager.Close()
|
||||
@ -541,8 +541,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
return q.Result, nil
|
||||
}
|
||||
|
||||
evaluate := func(alertRule *models.AlertRule, attempt int64, ctx *evalContext) error {
|
||||
logger := logger.New("version", alertRule.Version, "attempt", attempt, "now", ctx.now)
|
||||
evaluate := func(ctx context.Context, alertRule *models.AlertRule, attempt int64, evalCtx *evalContext) error {
|
||||
logger := logger.New("version", alertRule.Version, "attempt", attempt, "now", evalCtx.now)
|
||||
start := sch.clock.Now()
|
||||
|
||||
condition := models.Condition{
|
||||
@ -550,7 +550,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
OrgID: alertRule.OrgID,
|
||||
Data: alertRule.Data,
|
||||
}
|
||||
results, err := sch.evaluator.ConditionEval(&condition, ctx.now, sch.expressionService)
|
||||
results, err := sch.evaluator.ConditionEval(&condition, evalCtx.now, sch.expressionService)
|
||||
dur := sch.clock.Now().Sub(start)
|
||||
evalTotal.Inc()
|
||||
evalDuration.Observe(dur.Seconds())
|
||||
@ -562,8 +562,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
}
|
||||
logger.Debug("alert rule evaluated", "results", results, "duration", dur)
|
||||
|
||||
processedStates := sch.stateManager.ProcessEvalResults(context.Background(), alertRule, results)
|
||||
sch.saveAlertStates(processedStates)
|
||||
processedStates := sch.stateManager.ProcessEvalResults(ctx, alertRule, results)
|
||||
sch.saveAlertStates(ctx, processedStates)
|
||||
alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
|
||||
|
||||
notify(alerts, logger)
|
||||
@ -629,7 +629,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
currentRule = newRule
|
||||
logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
|
||||
}
|
||||
return evaluate(currentRule, attempt, ctx)
|
||||
return evaluate(grafanaCtx, currentRule, attempt, ctx)
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("evaluation failed after all retries", "err", err)
|
||||
@ -643,7 +643,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
}
|
||||
}
|
||||
|
||||
func (sch *schedule) saveAlertStates(states []*state.State) {
|
||||
func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State) {
|
||||
sch.log.Debug("saving alert states", "count", len(states))
|
||||
for _, s := range states {
|
||||
cmd := models.SaveAlertInstanceCommand{
|
||||
@ -655,7 +655,7 @@ func (sch *schedule) saveAlertStates(states []*state.State) {
|
||||
CurrentStateSince: s.StartsAt,
|
||||
CurrentStateEnd: s.EndsAt,
|
||||
}
|
||||
err := sch.instanceStore.SaveAlertInstance(&cmd)
|
||||
err := sch.instanceStore.SaveAlertInstance(ctx, &cmd)
|
||||
if err != nil {
|
||||
sch.log.Error("failed to save alert state", "uid", s.AlertRuleUID, "orgId", s.OrgID, "labels", s.Labels.String(), "state", s.State.String(), "msg", err.Error())
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ func TestWarmStateCache(t *testing.T) {
|
||||
CurrentStateEnd: evaluationTime.Add(1 * time.Minute),
|
||||
}
|
||||
|
||||
_ = dbstore.SaveAlertInstance(saveCmd1)
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd1)
|
||||
|
||||
saveCmd2 := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: rule.OrgID,
|
||||
@ -93,7 +93,7 @@ func TestWarmStateCache(t *testing.T) {
|
||||
CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
|
||||
CurrentStateEnd: evaluationTime.Add(1 * time.Minute),
|
||||
}
|
||||
_ = dbstore.SaveAlertInstance(saveCmd2)
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd2)
|
||||
|
||||
schedCfg := schedule.SchedulerCfg{
|
||||
C: clock.NewMock(),
|
||||
|
@ -289,27 +289,29 @@ type FakeInstanceStore struct {
|
||||
recordedOps []interface{}
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) GetAlertInstance(q *models.GetAlertInstanceQuery) error {
|
||||
func (f *FakeInstanceStore) GetAlertInstance(_ context.Context, q *models.GetAlertInstanceQuery) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.recordedOps = append(f.recordedOps, *q)
|
||||
return nil
|
||||
}
|
||||
func (f *FakeInstanceStore) ListAlertInstances(q *models.ListAlertInstancesQuery) error {
|
||||
func (f *FakeInstanceStore) ListAlertInstances(_ context.Context, q *models.ListAlertInstancesQuery) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.recordedOps = append(f.recordedOps, *q)
|
||||
return nil
|
||||
}
|
||||
func (f *FakeInstanceStore) SaveAlertInstance(q *models.SaveAlertInstanceCommand) error {
|
||||
func (f *FakeInstanceStore) SaveAlertInstance(_ context.Context, q *models.SaveAlertInstanceCommand) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.recordedOps = append(f.recordedOps, *q)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) FetchOrgIds() ([]int64, error) { return []int64{}, nil }
|
||||
func (f *FakeInstanceStore) DeleteAlertInstance(_ int64, _, _ string) error { return nil }
|
||||
func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { return []int64{}, nil }
|
||||
func (f *FakeInstanceStore) DeleteAlertInstance(_ context.Context, _ int64, _, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newFakeAdminConfigStore(t *testing.T) *fakeAdminConfigStore {
|
||||
t.Helper()
|
||||
|
@ -58,7 +58,7 @@ func (st *Manager) Warm(ctx context.Context) {
|
||||
st.log.Info("warming cache for startup")
|
||||
st.ResetCache()
|
||||
|
||||
orgIds, err := st.instanceStore.FetchOrgIds()
|
||||
orgIds, err := st.instanceStore.FetchOrgIds(ctx)
|
||||
if err != nil {
|
||||
st.log.Error("unable to fetch orgIds", "msg", err.Error())
|
||||
}
|
||||
@ -82,7 +82,7 @@ func (st *Manager) Warm(ctx context.Context) {
|
||||
cmd := ngModels.ListAlertInstancesQuery{
|
||||
RuleOrgID: orgId,
|
||||
}
|
||||
if err := st.instanceStore.ListAlertInstances(&cmd); err != nil {
|
||||
if err := st.instanceStore.ListAlertInstances(ctx, &cmd); err != nil {
|
||||
st.log.Error("unable to fetch previous state", "msg", err.Error())
|
||||
}
|
||||
|
||||
@ -150,7 +150,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, alertRule *ngModels.A
|
||||
states = append(states, s)
|
||||
processedResults[s.CacheId] = s
|
||||
}
|
||||
st.staleResultsHandler(alertRule, processedResults)
|
||||
st.staleResultsHandler(ctx, alertRule, processedResults)
|
||||
return states
|
||||
}
|
||||
|
||||
@ -282,7 +282,7 @@ func (st *Manager) createAlertAnnotation(ctx context.Context, new eval.State, al
|
||||
}
|
||||
}
|
||||
|
||||
func (st *Manager) staleResultsHandler(alertRule *ngModels.AlertRule, states map[string]*State) {
|
||||
func (st *Manager) staleResultsHandler(ctx context.Context, alertRule *ngModels.AlertRule, states map[string]*State) {
|
||||
allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
|
||||
for _, s := range allStates {
|
||||
_, ok := states[s.CacheId]
|
||||
@ -295,7 +295,7 @@ func (st *Manager) staleResultsHandler(alertRule *ngModels.AlertRule, states map
|
||||
st.log.Error("unable to get labelsHash", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID)
|
||||
}
|
||||
|
||||
if err = st.instanceStore.DeleteAlertInstance(s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
|
||||
if err = st.instanceStore.DeleteAlertInstance(ctx, s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
|
||||
st.log.Error("unable to delete stale instance from database", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
|
||||
}
|
||||
}
|
||||
|
@ -1529,7 +1529,7 @@ func TestStaleResultsHandler(t *testing.T) {
|
||||
CurrentStateEnd: evaluationTime.Add(1 * time.Minute),
|
||||
}
|
||||
|
||||
_ = dbstore.SaveAlertInstance(saveCmd1)
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd1)
|
||||
|
||||
saveCmd2 := &models.SaveAlertInstanceCommand{
|
||||
RuleOrgID: rule.OrgID,
|
||||
@ -1540,7 +1540,7 @@ func TestStaleResultsHandler(t *testing.T) {
|
||||
CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
|
||||
CurrentStateEnd: evaluationTime.Add(1 * time.Minute),
|
||||
}
|
||||
_ = dbstore.SaveAlertInstance(saveCmd2)
|
||||
_ = dbstore.SaveAlertInstance(ctx, saveCmd2)
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
|
@ -10,17 +10,17 @@ import (
|
||||
)
|
||||
|
||||
type InstanceStore interface {
|
||||
GetAlertInstance(cmd *models.GetAlertInstanceQuery) error
|
||||
ListAlertInstances(cmd *models.ListAlertInstancesQuery) error
|
||||
SaveAlertInstance(cmd *models.SaveAlertInstanceCommand) error
|
||||
FetchOrgIds() ([]int64, error)
|
||||
DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error
|
||||
GetAlertInstance(ctx context.Context, cmd *models.GetAlertInstanceQuery) error
|
||||
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) error
|
||||
SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error
|
||||
FetchOrgIds(ctx context.Context) ([]int64, error)
|
||||
DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error
|
||||
}
|
||||
|
||||
// GetAlertInstance is a handler for retrieving an alert instance based on OrgId, AlertDefintionID, and
|
||||
// the hash of the labels.
|
||||
func (st DBstore) GetAlertInstance(cmd *models.GetAlertInstanceQuery) error {
|
||||
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
func (st DBstore) GetAlertInstance(ctx context.Context, cmd *models.GetAlertInstanceQuery) error {
|
||||
return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
instance := models.AlertInstance{}
|
||||
s := strings.Builder{}
|
||||
s.WriteString(`SELECT * FROM alert_instance
|
||||
@ -52,8 +52,8 @@ func (st DBstore) GetAlertInstance(cmd *models.GetAlertInstanceQuery) error {
|
||||
|
||||
// ListAlertInstances is a handler for retrieving alert instances within specific organisation
|
||||
// based on various filters.
|
||||
func (st DBstore) ListAlertInstances(cmd *models.ListAlertInstancesQuery) error {
|
||||
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) error {
|
||||
return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
alertInstances := make([]*models.ListAlertInstancesQueryResult, 0)
|
||||
|
||||
s := strings.Builder{}
|
||||
@ -84,8 +84,8 @@ func (st DBstore) ListAlertInstances(cmd *models.ListAlertInstancesQuery) error
|
||||
}
|
||||
|
||||
// SaveAlertInstance is a handler for saving a new alert instance.
|
||||
func (st DBstore) SaveAlertInstance(cmd *models.SaveAlertInstanceCommand) error {
|
||||
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
func (st DBstore) SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error {
|
||||
return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
labelTupleJSON, labelsHash, err := cmd.Labels.StringAndHash()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -121,10 +121,10 @@ func (st DBstore) SaveAlertInstance(cmd *models.SaveAlertInstanceCommand) error
|
||||
})
|
||||
}
|
||||
|
||||
func (st DBstore) FetchOrgIds() ([]int64, error) {
|
||||
func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) {
|
||||
orgIds := []int64{}
|
||||
|
||||
err := st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
err := st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
s := strings.Builder{}
|
||||
params := make([]interface{}, 0)
|
||||
|
||||
@ -144,8 +144,8 @@ func (st DBstore) FetchOrgIds() ([]int64, error) {
|
||||
return orgIds, err
|
||||
}
|
||||
|
||||
func (st DBstore) DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error {
|
||||
return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
func (st DBstore) DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error {
|
||||
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?", orgID, ruleUID, labelsHash)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -51,7 +51,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
State: models.InstanceStateFiring,
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
}
|
||||
err := dbstore.SaveAlertInstance(saveCmd)
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
getCmd := &models.GetAlertInstanceQuery{
|
||||
@ -60,7 +60,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
}
|
||||
|
||||
err = dbstore.GetAlertInstance(getCmd)
|
||||
err = dbstore.GetAlertInstance(ctx, getCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, saveCmd.Labels, getCmd.Result.Labels)
|
||||
@ -75,7 +75,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
State: models.InstanceStateNormal,
|
||||
Labels: models.InstanceLabels{},
|
||||
}
|
||||
err := dbstore.SaveAlertInstance(saveCmd)
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
getCmd := &models.GetAlertInstanceQuery{
|
||||
@ -83,7 +83,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
RuleUID: saveCmd.RuleUID,
|
||||
}
|
||||
|
||||
err = dbstore.GetAlertInstance(getCmd)
|
||||
err = dbstore.GetAlertInstance(ctx, getCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, alertRule2.OrgID, getCmd.Result.RuleOrgID)
|
||||
@ -99,7 +99,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
}
|
||||
|
||||
err := dbstore.SaveAlertInstance(saveCmdOne)
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmdOne)
|
||||
require.NoError(t, err)
|
||||
|
||||
saveCmdTwo := &models.SaveAlertInstanceCommand{
|
||||
@ -108,7 +108,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
State: models.InstanceStateFiring,
|
||||
Labels: models.InstanceLabels{"test": "meow"},
|
||||
}
|
||||
err = dbstore.SaveAlertInstance(saveCmdTwo)
|
||||
err = dbstore.SaveAlertInstance(ctx, saveCmdTwo)
|
||||
require.NoError(t, err)
|
||||
|
||||
listQuery := &models.ListAlertInstancesQuery{
|
||||
@ -116,7 +116,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
RuleUID: saveCmdOne.RuleUID,
|
||||
}
|
||||
|
||||
err = dbstore.ListAlertInstances(listQuery)
|
||||
err = dbstore.ListAlertInstances(ctx, listQuery)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, listQuery.Result, 2)
|
||||
@ -127,7 +127,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
RuleOrgID: orgID,
|
||||
}
|
||||
|
||||
err := dbstore.ListAlertInstances(listQuery)
|
||||
err := dbstore.ListAlertInstances(ctx, listQuery)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, listQuery.Result, 4)
|
||||
@ -139,7 +139,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
State: models.InstanceStateNormal,
|
||||
}
|
||||
|
||||
err := dbstore.ListAlertInstances(listQuery)
|
||||
err := dbstore.ListAlertInstances(ctx, listQuery)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, listQuery.Result, 1)
|
||||
@ -153,7 +153,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
}
|
||||
|
||||
err := dbstore.SaveAlertInstance(saveCmdOne)
|
||||
err := dbstore.SaveAlertInstance(ctx, saveCmdOne)
|
||||
require.NoError(t, err)
|
||||
|
||||
saveCmdTwo := &models.SaveAlertInstanceCommand{
|
||||
@ -162,7 +162,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
State: models.InstanceStateNormal,
|
||||
Labels: models.InstanceLabels{"test": "testValue"},
|
||||
}
|
||||
err = dbstore.SaveAlertInstance(saveCmdTwo)
|
||||
err = dbstore.SaveAlertInstance(ctx, saveCmdTwo)
|
||||
require.NoError(t, err)
|
||||
|
||||
listQuery := &models.ListAlertInstancesQuery{
|
||||
@ -170,7 +170,7 @@ func TestAlertInstanceOperations(t *testing.T) {
|
||||
RuleUID: alertRule4.UID,
|
||||
}
|
||||
|
||||
err = dbstore.ListAlertInstances(listQuery)
|
||||
err = dbstore.ListAlertInstances(ctx, listQuery)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, listQuery.Result, 1)
|
||||
|
Loading…
Reference in New Issue
Block a user