implement sql queries for transactional alert reminders

This commit is contained in:
bergquist 2018-09-27 11:14:44 +02:00
parent ff79f80685
commit 3fab616239
4 changed files with 118 additions and 54 deletions

View File

@ -9,7 +9,16 @@ import (
var ( var (
ErrNotificationFrequencyNotFound = errors.New("Notification frequency not specified") ErrNotificationFrequencyNotFound = errors.New("Notification frequency not specified")
ErrJournalingNotFound = errors.New("alert notification journaling not found") ErrAlertNotificationStateNotFound = errors.New("alert notification state not found")
ErrAlertNotificationStateVersionConflict = errors.New("alert notification state update version conflict")
ErrAlertNotificationStateAllreadyExist = errors.New("alert notification state allready exists.")
)
type AlertNotificationStateType string
var (
AlertNotificationStatePending = AlertNotificationStateType("pending")
AlertNotificationStateCompleted = AlertNotificationStateType("completed")
) )
type AlertNotification struct { type AlertNotification struct {
@ -82,16 +91,15 @@ type AlertNotificationState struct {
AlertId int64 AlertId int64
NotifierId int64 NotifierId int64
SentAt int64 SentAt int64
State string State AlertNotificationStateType
Version int64 Version int64
} }
type UpdateAlertNotificationStateCommand struct { type UpdateAlertNotificationStateCommand struct {
OrgId int64 Id int64
AlertId int64
NotifierId int64
SentAt int64 SentAt int64
State bool State AlertNotificationStateType
Version int64
} }
type GetNotificationStateQuery struct { type GetNotificationStateQuery struct {
@ -107,5 +115,5 @@ type InsertAlertNotificationCommand struct {
AlertId int64 AlertId int64
NotifierId int64 NotifierId int64
SentAt int64 SentAt int64
State string State AlertNotificationStateType
} }

View File

@ -18,8 +18,8 @@ func init() {
bus.AddHandler("sql", DeleteAlertNotification) bus.AddHandler("sql", DeleteAlertNotification)
bus.AddHandler("sql", GetAlertNotificationsToSend) bus.AddHandler("sql", GetAlertNotificationsToSend)
bus.AddHandler("sql", GetAllAlertNotifications) bus.AddHandler("sql", GetAllAlertNotifications)
bus.AddHandlerCtx("sql", RecordNotificationJournal) bus.AddHandlerCtx("sql", InsertAlertNotificationState)
bus.AddHandlerCtx("sql", GetLatestNotification) bus.AddHandlerCtx("sql", GetAlertNotificationState)
} }
func DeleteAlertNotification(cmd *m.DeleteAlertNotificationCommand) error { func DeleteAlertNotification(cmd *m.DeleteAlertNotificationCommand) error {
@ -228,34 +228,73 @@ func UpdateAlertNotification(cmd *m.UpdateAlertNotificationCommand) error {
}) })
} }
func RecordNotificationJournal(ctx context.Context, cmd *m.UpdateAlertNotificationStateCommand) error { func InsertAlertNotificationState(ctx context.Context, cmd *m.InsertAlertNotificationCommand) error {
return withDbSession(ctx, func(sess *DBSession) error { return withDbSession(ctx, func(sess *DBSession) error {
journalEntry := &m.AlertNotificationState{ notificationState := &m.AlertNotificationState{
OrgId: cmd.OrgId, OrgId: cmd.OrgId,
AlertId: cmd.AlertId, AlertId: cmd.AlertId,
NotifierId: cmd.NotifierId, NotifierId: cmd.NotifierId,
SentAt: cmd.SentAt, SentAt: cmd.SentAt,
State: cmd.State,
}
_, err := sess.Insert(notificationState)
if err == nil {
return nil
}
if strings.HasPrefix(err.Error(), "UNIQUE constraint failed") {
return m.ErrAlertNotificationStateAllreadyExist
} }
_, err := sess.Insert(journalEntry)
return err return err
}) })
} }
func GetLatestNotification(ctx context.Context, cmd *m.GetNotificationStateQuery) error { func UpdateAlertNotificationState(ctx context.Context, cmd *m.UpdateAlertNotificationStateCommand) error {
return withDbSession(ctx, func(sess *DBSession) error {
sql := `UPDATE alert_notification_state SET
state= ?,
version = ?
WHERE
id = ? AND
version = ?
`
res, err := sess.Exec(sql, cmd.State, cmd.Version+1, cmd.Id, cmd.Version)
if err != nil {
return err
}
affected, _ := res.RowsAffected()
if affected == 0 {
return m.ErrAlertNotificationStateVersionConflict
}
return nil
})
}
func GetAlertNotificationState(ctx context.Context, cmd *m.GetNotificationStateQuery) error {
return withDbSession(ctx, func(sess *DBSession) error { return withDbSession(ctx, func(sess *DBSession) error {
nj := &m.AlertNotificationState{} nj := &m.AlertNotificationState{}
err := sess.Desc("alert_notification_journal.sent_at"). exist, err := sess.Desc("alert_notification_state.sent_at").
Where("alert_notification_journal.org_id = ?", cmd.OrgId). Where("alert_notification_state.org_id = ?", cmd.OrgId).
Where("alert_notification_journal.alert_id = ?", cmd.AlertId). Where("alert_notification_state.alert_id = ?", cmd.AlertId).
Where("alert_notification_journal.notifier_id = ?", cmd.NotifierId). Where("alert_notification_state.notifier_id = ?", cmd.NotifierId).
Find(&nj) Get(nj)
if err != nil { if err != nil {
return err return err
} }
if !exist {
return m.ErrAlertNotificationStateNotFound
}
cmd.Result = nj cmd.Result = nj
return nil return nil
}) })

View File

@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
@ -14,37 +14,53 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
Convey("Testing Alert notification sql access", t, func() { Convey("Testing Alert notification sql access", t, func() {
InitTestDB(t) InitTestDB(t)
Convey("Alert notification journal", func() { Convey("Alert notification state", func() {
var alertId int64 = 7 var alertId int64 = 7
var orgId int64 = 5 var orgId int64 = 5
var notifierId int64 = 10 var notifierId int64 = 10
Convey("Getting last journal should raise error if no one exists", func() { Convey("Getting no existant state returns error", func() {
query := &m.GetNotificationStateQuery{AlertId: alertId, OrgId: orgId, NotifierId: notifierId} query := &models.GetNotificationStateQuery{AlertId: alertId, OrgId: orgId, NotifierId: notifierId}
err := GetLatestNotification(context.Background(), query) err := GetAlertNotificationState(context.Background(), query)
So(err, ShouldNotBeNil) So(err, ShouldEqual, models.ErrAlertNotificationStateNotFound)
})
// recording an journal entry in another org to make sure org filter works as expected. Convey("Can insert new state for alert notifier", func() {
journalInOtherOrg := &m.UpdateAlertNotificationStateCommand{AlertId: alertId, NotifierId: notifierId, OrgId: 10, SentAt: 1} createCmd := &models.InsertAlertNotificationCommand{
err = RecordNotificationJournal(context.Background(), journalInOtherOrg) AlertId: alertId,
NotifierId: notifierId,
OrgId: orgId,
SentAt: 1,
State: models.AlertNotificationStateCompleted,
}
err := InsertAlertNotificationState(context.Background(), createCmd)
So(err, ShouldBeNil) So(err, ShouldBeNil)
Convey("should be able to record two journaling events", func() { err = InsertAlertNotificationState(context.Background(), createCmd)
createCmd := &m.UpdateAlertNotificationStateCommand{AlertId: alertId, NotifierId: notifierId, OrgId: orgId, SentAt: 1} So(err, ShouldEqual, models.ErrAlertNotificationStateAllreadyExist)
err := RecordNotificationJournal(context.Background(), createCmd) Convey("should be able to update alert notifier state", func() {
updateCmd := &models.UpdateAlertNotificationStateCommand{
Id: 1,
SentAt: 1,
State: models.AlertNotificationStatePending,
Version: 0,
}
err := UpdateAlertNotificationState(context.Background(), updateCmd)
So(err, ShouldBeNil) So(err, ShouldBeNil)
createCmd.SentAt += 1000 //increase epoch Convey("should not be able to update older versions", func() {
err = UpdateAlertNotificationState(context.Background(), updateCmd)
err = RecordNotificationJournal(context.Background(), createCmd) So(err, ShouldEqual, models.ErrAlertNotificationStateVersionConflict)
So(err, ShouldBeNil) })
}) })
}) })
}) })
Convey("Alert notifications should be empty", func() { Convey("Alert notifications should be empty", func() {
cmd := &m.GetAlertNotificationsQuery{ cmd := &models.GetAlertNotificationsQuery{
OrgId: 2, OrgId: 2,
Name: "email", Name: "email",
} }
@ -55,7 +71,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("Cannot save alert notifier with send reminder = true", func() { Convey("Cannot save alert notifier with send reminder = true", func() {
cmd := &m.CreateAlertNotificationCommand{ cmd := &models.CreateAlertNotificationCommand{
Name: "ops", Name: "ops",
Type: "email", Type: "email",
OrgId: 1, OrgId: 1,
@ -65,7 +81,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
Convey("and missing frequency", func() { Convey("and missing frequency", func() {
err := CreateAlertNotificationCommand(cmd) err := CreateAlertNotificationCommand(cmd)
So(err, ShouldEqual, m.ErrNotificationFrequencyNotFound) So(err, ShouldEqual, models.ErrNotificationFrequencyNotFound)
}) })
Convey("invalid frequency", func() { Convey("invalid frequency", func() {
@ -77,7 +93,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("Cannot update alert notifier with send reminder = false", func() { Convey("Cannot update alert notifier with send reminder = false", func() {
cmd := &m.CreateAlertNotificationCommand{ cmd := &models.CreateAlertNotificationCommand{
Name: "ops update", Name: "ops update",
Type: "email", Type: "email",
OrgId: 1, OrgId: 1,
@ -88,14 +104,14 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
err := CreateAlertNotificationCommand(cmd) err := CreateAlertNotificationCommand(cmd)
So(err, ShouldBeNil) So(err, ShouldBeNil)
updateCmd := &m.UpdateAlertNotificationCommand{ updateCmd := &models.UpdateAlertNotificationCommand{
Id: cmd.Result.Id, Id: cmd.Result.Id,
SendReminder: true, SendReminder: true,
} }
Convey("and missing frequency", func() { Convey("and missing frequency", func() {
err := UpdateAlertNotification(updateCmd) err := UpdateAlertNotification(updateCmd)
So(err, ShouldEqual, m.ErrNotificationFrequencyNotFound) So(err, ShouldEqual, models.ErrNotificationFrequencyNotFound)
}) })
Convey("invalid frequency", func() { Convey("invalid frequency", func() {
@ -108,7 +124,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("Can save Alert Notification", func() { Convey("Can save Alert Notification", func() {
cmd := &m.CreateAlertNotificationCommand{ cmd := &models.CreateAlertNotificationCommand{
Name: "ops", Name: "ops",
Type: "email", Type: "email",
OrgId: 1, OrgId: 1,
@ -130,7 +146,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("Can update alert notification", func() { Convey("Can update alert notification", func() {
newCmd := &m.UpdateAlertNotificationCommand{ newCmd := &models.UpdateAlertNotificationCommand{
Name: "NewName", Name: "NewName",
Type: "webhook", Type: "webhook",
OrgId: cmd.Result.OrgId, OrgId: cmd.Result.OrgId,
@ -146,7 +162,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("Can update alert notification to disable sending of reminders", func() { Convey("Can update alert notification to disable sending of reminders", func() {
newCmd := &m.UpdateAlertNotificationCommand{ newCmd := &models.UpdateAlertNotificationCommand{
Name: "NewName", Name: "NewName",
Type: "webhook", Type: "webhook",
OrgId: cmd.Result.OrgId, OrgId: cmd.Result.OrgId,
@ -161,12 +177,12 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("Can search using an array of ids", func() { Convey("Can search using an array of ids", func() {
cmd1 := m.CreateAlertNotificationCommand{Name: "nagios", Type: "webhook", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()} cmd1 := models.CreateAlertNotificationCommand{Name: "nagios", Type: "webhook", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()}
cmd2 := m.CreateAlertNotificationCommand{Name: "slack", Type: "webhook", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()} cmd2 := models.CreateAlertNotificationCommand{Name: "slack", Type: "webhook", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()}
cmd3 := m.CreateAlertNotificationCommand{Name: "ops2", Type: "email", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()} cmd3 := models.CreateAlertNotificationCommand{Name: "ops2", Type: "email", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()}
cmd4 := m.CreateAlertNotificationCommand{IsDefault: true, Name: "default", Type: "email", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()} cmd4 := models.CreateAlertNotificationCommand{IsDefault: true, Name: "default", Type: "email", OrgId: 1, SendReminder: true, Frequency: "10s", Settings: simplejson.New()}
otherOrg := m.CreateAlertNotificationCommand{Name: "default", Type: "email", OrgId: 2, SendReminder: true, Frequency: "10s", Settings: simplejson.New()} otherOrg := models.CreateAlertNotificationCommand{Name: "default", Type: "email", OrgId: 2, SendReminder: true, Frequency: "10s", Settings: simplejson.New()}
So(CreateAlertNotificationCommand(&cmd1), ShouldBeNil) So(CreateAlertNotificationCommand(&cmd1), ShouldBeNil)
So(CreateAlertNotificationCommand(&cmd2), ShouldBeNil) So(CreateAlertNotificationCommand(&cmd2), ShouldBeNil)
@ -175,7 +191,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
So(CreateAlertNotificationCommand(&otherOrg), ShouldBeNil) So(CreateAlertNotificationCommand(&otherOrg), ShouldBeNil)
Convey("search", func() { Convey("search", func() {
query := &m.GetAlertNotificationsToSendQuery{ query := &models.GetAlertNotificationsToSendQuery{
Ids: []int64{cmd1.Result.Id, cmd2.Result.Id, 112341231}, Ids: []int64{cmd1.Result.Id, cmd2.Result.Id, 112341231},
OrgId: 1, OrgId: 1,
} }
@ -186,7 +202,7 @@ func TestAlertNotificationSQLAccess(t *testing.T) {
}) })
Convey("all", func() { Convey("all", func() {
query := &m.GetAllAlertNotificationsQuery{ query := &models.GetAllAlertNotificationsQuery{
OrgId: 1, OrgId: 1,
} }

View File

@ -122,10 +122,11 @@ func addAlertMigrations(mg *Migrator) {
{Name: "version", Type: DB_BigInt, Nullable: false}, {Name: "version", Type: DB_BigInt, Nullable: false},
}, },
Indices: []*Index{ Indices: []*Index{
{Cols: []string{"org_id", "alert_id", "notifier_id"}, Type: IndexType}, {Cols: []string{"org_id", "alert_id", "notifier_id"}, Type: UniqueIndex},
}, },
} }
mg.AddMigration("create alert_notification_state table v1", NewAddTableMigration(alert_notification_state)) mg.AddMigration("create alert_notification_state table v1", NewAddTableMigration(alert_notification_state))
mg.AddMigration("add index alert_notification_state org_id & alert_id & notifier_id", NewAddIndexMigration(alert_notification_state, notification_journal.Indices[0])) mg.AddMigration("add index alert_notification_state org_id & alert_id & notifier_id",
NewAddIndexMigration(alert_notification_state, alert_notification_state.Indices[0]))
} }