mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Move annotations cleanup to the annotations service (#55618)
This commit is contained in:
parent
4739ff06a5
commit
2f14575dd3
@ -213,6 +213,8 @@ var wireBasicSet = wire.NewSet(
|
|||||||
httpclientprovider.New,
|
httpclientprovider.New,
|
||||||
wire.Bind(new(httpclient.Provider), new(*sdkhttpclient.Provider)),
|
wire.Bind(new(httpclient.Provider), new(*sdkhttpclient.Provider)),
|
||||||
serverlock.ProvideService,
|
serverlock.ProvideService,
|
||||||
|
annotationsimpl.ProvideCleanupService,
|
||||||
|
wire.Bind(new(annotations.Cleaner), new(*annotationsimpl.CleanupServiceImpl)),
|
||||||
cleanup.ProvideService,
|
cleanup.ProvideService,
|
||||||
shorturls.ProvideService,
|
shorturls.ProvideService,
|
||||||
wire.Bind(new(shorturls.Service), new(*shorturls.ShortURLService)),
|
wire.Bind(new(shorturls.Service), new(*shorturls.ShortURLService)),
|
||||||
|
@ -19,18 +19,7 @@ type Repository interface {
|
|||||||
FindTags(ctx context.Context, query *TagsQuery) (FindTagsResult, error)
|
FindTags(ctx context.Context, query *TagsQuery) (FindTagsResult, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AnnotationCleaner is responsible for cleaning up old annotations
|
// Cleaner is responsible for cleaning up old annotations
|
||||||
type AnnotationCleaner interface {
|
type Cleaner interface {
|
||||||
CleanAnnotations(ctx context.Context, cfg *setting.Cfg) (int64, int64, error)
|
Run(ctx context.Context, cfg *setting.Cfg) (int64, int64, error)
|
||||||
}
|
|
||||||
|
|
||||||
// var repositoryInstance Repository
|
|
||||||
var cleanerInstance AnnotationCleaner
|
|
||||||
|
|
||||||
func GetAnnotationCleaner() AnnotationCleaner {
|
|
||||||
return cleanerInstance
|
|
||||||
}
|
|
||||||
|
|
||||||
func SetAnnotationCleaner(rep AnnotationCleaner) {
|
|
||||||
cleanerInstance = rep
|
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ type RepositoryImpl struct {
|
|||||||
|
|
||||||
func ProvideService(db db.DB, cfg *setting.Cfg, tagService tag.Service) *RepositoryImpl {
|
func ProvideService(db db.DB, cfg *setting.Cfg, tagService tag.Service) *RepositoryImpl {
|
||||||
return &RepositoryImpl{
|
return &RepositoryImpl{
|
||||||
store: &SQLAnnotationRepo{
|
store: &xormRepositoryImpl{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
db: db,
|
db: db,
|
||||||
log: log.New("annotations"),
|
log: log.New("annotations"),
|
||||||
|
62
pkg/services/annotations/annotationsimpl/cleanup.go
Normal file
62
pkg/services/annotations/annotationsimpl/cleanup.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
package annotationsimpl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore/db"
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CleanupServiceImpl is responsible for cleaning old annotations.
|
||||||
|
type CleanupServiceImpl struct {
|
||||||
|
store store
|
||||||
|
}
|
||||||
|
|
||||||
|
func ProvideCleanupService(db db.DB, cfg *setting.Cfg) *CleanupServiceImpl {
|
||||||
|
return &CleanupServiceImpl{
|
||||||
|
store: &xormRepositoryImpl{
|
||||||
|
cfg: cfg,
|
||||||
|
db: db,
|
||||||
|
log: log.New("annotations"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
alertAnnotationType = "alert_id <> 0"
|
||||||
|
dashboardAnnotationType = "dashboard_id <> 0 AND alert_id = 0"
|
||||||
|
apiAnnotationType = "alert_id = 0 AND dashboard_id = 0"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Run deletes old annotations created by alert rules, API
|
||||||
|
// requests and human made in the UI. It subsequently deletes orphaned rows
|
||||||
|
// from the annotation_tag table. Cleanup actions are performed in batches
|
||||||
|
// so that no query takes too long to complete.
|
||||||
|
//
|
||||||
|
// Returns the number of annotation and annotation_tag rows deleted. If an
|
||||||
|
// error occurs, it returns the number of rows affected so far.
|
||||||
|
func (cs *CleanupServiceImpl) Run(ctx context.Context, cfg *setting.Cfg) (int64, int64, error) {
|
||||||
|
var totalCleanedAnnotations int64
|
||||||
|
affected, err := cs.store.CleanAnnotations(ctx, cfg.AlertingAnnotationCleanupSetting, alertAnnotationType)
|
||||||
|
totalCleanedAnnotations += affected
|
||||||
|
if err != nil {
|
||||||
|
return totalCleanedAnnotations, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
affected, err = cs.store.CleanAnnotations(ctx, cfg.APIAnnotationCleanupSettings, apiAnnotationType)
|
||||||
|
totalCleanedAnnotations += affected
|
||||||
|
if err != nil {
|
||||||
|
return totalCleanedAnnotations, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
affected, err = cs.store.CleanAnnotations(ctx, cfg.DashboardAnnotationCleanupSettings, dashboardAnnotationType)
|
||||||
|
totalCleanedAnnotations += affected
|
||||||
|
if err != nil {
|
||||||
|
return totalCleanedAnnotations, 0, err
|
||||||
|
}
|
||||||
|
if totalCleanedAnnotations > 0 {
|
||||||
|
affected, err = cs.store.CleanOrphanedAnnotationTags(ctx)
|
||||||
|
}
|
||||||
|
return totalCleanedAnnotations, affected, err
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package sqlstore
|
package annotationsimpl
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -7,16 +7,17 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/services/annotations"
|
"github.com/grafana/grafana/pkg/services/annotations"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAnnotationCleanUp(t *testing.T) {
|
func TestAnnotationCleanUp(t *testing.T) {
|
||||||
fakeSQL := InitTestDB(t)
|
fakeSQL := sqlstore.InitTestDB(t)
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
err := fakeSQL.WithDbSession(context.Background(), func(session *DBSession) error {
|
err := fakeSQL.WithDbSession(context.Background(), func(session *sqlstore.DBSession) error {
|
||||||
_, err := session.Exec("DELETE FROM annotation")
|
_, err := session.Exec("DELETE FROM annotation")
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -87,8 +88,10 @@ func TestAnnotationCleanUp(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
cleaner := &AnnotationCleanupService{batchSize: 1, log: log.New("test-logger"), sqlstore: fakeSQL}
|
cfg := setting.NewCfg()
|
||||||
affectedAnnotations, affectedAnnotationTags, err := cleaner.CleanAnnotations(context.Background(), test.cfg)
|
cfg.AnnotationCleanupJobBatchSize = 1
|
||||||
|
cleaner := ProvideCleanupService(fakeSQL, cfg)
|
||||||
|
affectedAnnotations, affectedAnnotationTags, err := cleaner.Run(context.Background(), test.cfg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, test.affectedAnnotations, affectedAnnotations)
|
assert.Equal(t, test.affectedAnnotations, affectedAnnotations)
|
||||||
@ -108,10 +111,10 @@ func TestAnnotationCleanUp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
|
func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
|
||||||
fakeSQL := InitTestDB(t)
|
fakeSQL := sqlstore.InitTestDB(t)
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
err := fakeSQL.WithDbSession(context.Background(), func(session *DBSession) error {
|
err := fakeSQL.WithDbSession(context.Background(), func(session *sqlstore.DBSession) error {
|
||||||
_, err := session.Exec("DELETE FROM annotation")
|
_, err := session.Exec("DELETE FROM annotation")
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -142,8 +145,10 @@ func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
|
|||||||
require.NoError(t, err, "cannot insert annotation")
|
require.NoError(t, err, "cannot insert annotation")
|
||||||
|
|
||||||
// run the clean up task to keep one annotation.
|
// run the clean up task to keep one annotation.
|
||||||
cleaner := &AnnotationCleanupService{batchSize: 1, log: log.New("test-logger"), sqlstore: fakeSQL}
|
cfg := setting.NewCfg()
|
||||||
_, err = cleaner.cleanAnnotations(context.Background(), setting.AnnotationCleanupSettings{MaxCount: 1}, alertAnnotationType)
|
cfg.AnnotationCleanupJobBatchSize = 1
|
||||||
|
cleaner := &xormRepositoryImpl{cfg: cfg, log: log.New("test-logger"), db: fakeSQL}
|
||||||
|
_, err = cleaner.CleanAnnotations(context.Background(), setting.AnnotationCleanupSettings{MaxCount: 1}, alertAnnotationType)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// assert that the last annotations were kept
|
// assert that the last annotations were kept
|
||||||
@ -156,7 +161,7 @@ func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
|
|||||||
require.Equal(t, int64(0), countOld, "the two first annotations should have been deleted")
|
require.Equal(t, int64(0), countOld, "the two first annotations should have been deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertAnnotationCount(t *testing.T, fakeSQL *SQLStore, sql string, expectedCount int64) {
|
func assertAnnotationCount(t *testing.T, fakeSQL *sqlstore.SQLStore, sql string, expectedCount int64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
session := fakeSQL.NewSession(context.Background())
|
session := fakeSQL.NewSession(context.Background())
|
||||||
@ -166,7 +171,7 @@ func assertAnnotationCount(t *testing.T, fakeSQL *SQLStore, sql string, expected
|
|||||||
require.Equal(t, expectedCount, count)
|
require.Equal(t, expectedCount, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertAnnotationTagCount(t *testing.T, fakeSQL *SQLStore, expectedCount int64) {
|
func assertAnnotationTagCount(t *testing.T, fakeSQL *sqlstore.SQLStore, expectedCount int64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
session := fakeSQL.NewSession(context.Background())
|
session := fakeSQL.NewSession(context.Background())
|
||||||
@ -177,7 +182,7 @@ func assertAnnotationTagCount(t *testing.T, fakeSQL *SQLStore, expectedCount int
|
|||||||
require.Equal(t, expectedCount, count)
|
require.Equal(t, expectedCount, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTestAnnotations(t *testing.T, sqlstore *SQLStore, expectedCount int, oldAnnotations int) {
|
func createTestAnnotations(t *testing.T, sqlstore *sqlstore.SQLStore, expectedCount int, oldAnnotations int) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
cutoffDate := time.Now()
|
cutoffDate := time.Now()
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/services/annotations"
|
"github.com/grafana/grafana/pkg/services/annotations"
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
)
|
)
|
||||||
|
|
||||||
type store interface {
|
type store interface {
|
||||||
@ -12,4 +13,6 @@ type store interface {
|
|||||||
Get(ctx context.Context, query *annotations.ItemQuery) ([]*annotations.ItemDTO, error)
|
Get(ctx context.Context, query *annotations.ItemQuery) ([]*annotations.ItemDTO, error)
|
||||||
Delete(ctx context.Context, params *annotations.DeleteParams) error
|
Delete(ctx context.Context, params *annotations.DeleteParams) error
|
||||||
GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error)
|
GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error)
|
||||||
|
CleanAnnotations(ctx context.Context, cfg setting.AnnotationCleanupSettings, annotationType string) (int64, error)
|
||||||
|
CleanOrphanedAnnotationTags(ctx context.Context) (int64, error)
|
||||||
}
|
}
|
||||||
|
@ -40,14 +40,14 @@ func validateTimeRange(item *annotations.Item) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type SQLAnnotationRepo struct {
|
type xormRepositoryImpl struct {
|
||||||
cfg *setting.Cfg
|
cfg *setting.Cfg
|
||||||
db db.DB
|
db db.DB
|
||||||
log log.Logger
|
log log.Logger
|
||||||
tagService tag.Service
|
tagService tag.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SQLAnnotationRepo) Add(ctx context.Context, item *annotations.Item) error {
|
func (r *xormRepositoryImpl) Add(ctx context.Context, item *annotations.Item) error {
|
||||||
tags := tag.ParseTagPairs(item.Tags)
|
tags := tag.ParseTagPairs(item.Tags)
|
||||||
item.Tags = tag.JoinTagPairs(tags)
|
item.Tags = tag.JoinTagPairs(tags)
|
||||||
item.Created = timeNow().UnixNano() / int64(time.Millisecond)
|
item.Created = timeNow().UnixNano() / int64(time.Millisecond)
|
||||||
@ -79,7 +79,7 @@ func (r *SQLAnnotationRepo) Add(ctx context.Context, item *annotations.Item) err
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SQLAnnotationRepo) Update(ctx context.Context, item *annotations.Item) error {
|
func (r *xormRepositoryImpl) Update(ctx context.Context, item *annotations.Item) error {
|
||||||
return r.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
return r.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||||
var (
|
var (
|
||||||
isExist bool
|
isExist bool
|
||||||
@ -132,7 +132,7 @@ func (r *SQLAnnotationRepo) Update(ctx context.Context, item *annotations.Item)
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SQLAnnotationRepo) Get(ctx context.Context, query *annotations.ItemQuery) ([]*annotations.ItemDTO, error) {
|
func (r *xormRepositoryImpl) Get(ctx context.Context, query *annotations.ItemQuery) ([]*annotations.ItemDTO, error) {
|
||||||
var sql bytes.Buffer
|
var sql bytes.Buffer
|
||||||
params := make([]interface{}, 0)
|
params := make([]interface{}, 0)
|
||||||
items := make([]*annotations.ItemDTO, 0)
|
items := make([]*annotations.ItemDTO, 0)
|
||||||
@ -291,7 +291,7 @@ func getAccessControlFilter(user *user.SignedInUser) (string, []interface{}, err
|
|||||||
return strings.Join(filters, " OR "), params, nil
|
return strings.Join(filters, " OR "), params, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SQLAnnotationRepo) Delete(ctx context.Context, params *annotations.DeleteParams) error {
|
func (r *xormRepositoryImpl) Delete(ctx context.Context, params *annotations.DeleteParams) error {
|
||||||
return r.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
return r.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||||
var (
|
var (
|
||||||
sql string
|
sql string
|
||||||
@ -327,7 +327,7 @@ func (r *SQLAnnotationRepo) Delete(ctx context.Context, params *annotations.Dele
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SQLAnnotationRepo) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
|
func (r *xormRepositoryImpl) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
|
||||||
var items []*annotations.Tag
|
var items []*annotations.Tag
|
||||||
err := r.db.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
err := r.db.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||||
if query.Limit == 0 {
|
if query.Limit == 0 {
|
||||||
@ -378,3 +378,64 @@ func (r *SQLAnnotationRepo) GetTags(ctx context.Context, query *annotations.Tags
|
|||||||
|
|
||||||
return annotations.FindTagsResult{Tags: tags}, nil
|
return annotations.FindTagsResult{Tags: tags}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.AnnotationCleanupSettings, annotationType string) (int64, error) {
|
||||||
|
var totalAffected int64
|
||||||
|
if cfg.MaxAge > 0 {
|
||||||
|
cutoffDate := time.Now().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
|
||||||
|
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)`
|
||||||
|
sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
|
||||||
|
|
||||||
|
affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
|
||||||
|
totalAffected += affected
|
||||||
|
if err != nil {
|
||||||
|
return totalAffected, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxCount > 0 {
|
||||||
|
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)`
|
||||||
|
sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
|
||||||
|
affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
|
||||||
|
totalAffected += affected
|
||||||
|
return totalAffected, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalAffected, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) {
|
||||||
|
deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)`
|
||||||
|
sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
|
||||||
|
return r.executeUntilDoneOrCancelled(ctx, sql)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) {
|
||||||
|
var totalAffected int64
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return totalAffected, ctx.Err()
|
||||||
|
default:
|
||||||
|
var affected int64
|
||||||
|
err := r.db.WithDbSession(ctx, func(session *sqlstore.DBSession) error {
|
||||||
|
res, err := session.Exec(sql)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
affected, err = res.RowsAffected()
|
||||||
|
totalAffected += affected
|
||||||
|
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return totalAffected, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if affected == 0 {
|
||||||
|
return totalAffected, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -28,7 +28,7 @@ func TestIntegrationAnnotations(t *testing.T) {
|
|||||||
t.Skip("skipping integration test")
|
t.Skip("skipping integration test")
|
||||||
}
|
}
|
||||||
sql := sqlstore.InitTestDB(t)
|
sql := sqlstore.InitTestDB(t)
|
||||||
repo := SQLAnnotationRepo{db: sql, cfg: setting.NewCfg(), log: log.New("annotation.test"), tagService: tagimpl.ProvideService(sql)}
|
repo := xormRepositoryImpl{db: sql, cfg: setting.NewCfg(), log: log.New("annotation.test"), tagService: tagimpl.ProvideService(sql)}
|
||||||
|
|
||||||
testUser := &user.SignedInUser{
|
testUser := &user.SignedInUser{
|
||||||
OrgID: 1,
|
OrgID: 1,
|
||||||
@ -394,7 +394,7 @@ func TestIntegrationAnnotationListingWithRBAC(t *testing.T) {
|
|||||||
t.Skip("skipping integration test")
|
t.Skip("skipping integration test")
|
||||||
}
|
}
|
||||||
sql := sqlstore.InitTestDB(t, sqlstore.InitTestDBOpt{})
|
sql := sqlstore.InitTestDB(t, sqlstore.InitTestDBOpt{})
|
||||||
repo := SQLAnnotationRepo{db: sql, cfg: setting.NewCfg(), log: log.New("annotation.test"), tagService: tagimpl.ProvideService(sql)}
|
repo := xormRepositoryImpl{db: sql, cfg: setting.NewCfg(), log: log.New("annotation.test"), tagService: tagimpl.ProvideService(sql)}
|
||||||
dashboardStore := dashboardstore.ProvideDashboardStore(sql, featuremgmt.WithFeatures(), tagimpl.ProvideService(sql))
|
dashboardStore := dashboardstore.ProvideDashboardStore(sql, featuremgmt.WithFeatures(), tagimpl.ProvideService(sql))
|
||||||
|
|
||||||
testDashboard1 := models.SaveDashboardCommand{
|
testDashboard1 := models.SaveDashboardCommand{
|
||||||
|
18
pkg/services/annotations/annotationstest/fake_cleanup.go
Normal file
18
pkg/services/annotations/annotationstest/fake_cleanup.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package annotationstest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeCleaner struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFakeCleaner() *fakeCleaner {
|
||||||
|
return &fakeCleaner{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeCleaner) Run(ctx context.Context, cfg *setting.Cfg) (int64, int64, error) {
|
||||||
|
return 0, 0, nil
|
||||||
|
}
|
@ -31,7 +31,7 @@ import (
|
|||||||
func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockService,
|
func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockService,
|
||||||
shortURLService shorturls.Service, sqlstore *sqlstore.SQLStore, queryHistoryService queryhistory.Service,
|
shortURLService shorturls.Service, sqlstore *sqlstore.SQLStore, queryHistoryService queryhistory.Service,
|
||||||
dashboardVersionService dashver.Service, dashSnapSvc dashboardsnapshots.Service, deleteExpiredImageService *image.DeleteExpiredService,
|
dashboardVersionService dashver.Service, dashSnapSvc dashboardsnapshots.Service, deleteExpiredImageService *image.DeleteExpiredService,
|
||||||
loginAttemptService loginattempt.Service, tempUserService tempuser.Service, tracer tracing.Tracer) *CleanUpService {
|
loginAttemptService loginattempt.Service, tempUserService tempuser.Service, tracer tracing.Tracer, annotationCleaner annotations.Cleaner) *CleanUpService {
|
||||||
s := &CleanUpService{
|
s := &CleanUpService{
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
ServerLockService: serverLockService,
|
ServerLockService: serverLockService,
|
||||||
@ -45,6 +45,7 @@ func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockSe
|
|||||||
loginAttemptService: loginAttemptService,
|
loginAttemptService: loginAttemptService,
|
||||||
tempUserService: tempUserService,
|
tempUserService: tempUserService,
|
||||||
tracer: tracer,
|
tracer: tracer,
|
||||||
|
annotationCleaner: annotationCleaner,
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
@ -62,6 +63,7 @@ type CleanUpService struct {
|
|||||||
deleteExpiredImageService *image.DeleteExpiredService
|
deleteExpiredImageService *image.DeleteExpiredService
|
||||||
loginAttemptService loginattempt.Service
|
loginAttemptService loginattempt.Service
|
||||||
tempUserService tempuser.Service
|
tempUserService tempuser.Service
|
||||||
|
annotationCleaner annotations.Cleaner
|
||||||
}
|
}
|
||||||
|
|
||||||
type cleanUpJob struct {
|
type cleanUpJob struct {
|
||||||
@ -125,8 +127,7 @@ func (srv *CleanUpService) clean(ctx context.Context) {
|
|||||||
|
|
||||||
func (srv *CleanUpService) cleanUpOldAnnotations(ctx context.Context) {
|
func (srv *CleanUpService) cleanUpOldAnnotations(ctx context.Context) {
|
||||||
logger := srv.log.FromContext(ctx)
|
logger := srv.log.FromContext(ctx)
|
||||||
cleaner := annotations.GetAnnotationCleaner()
|
affected, affectedTags, err := srv.annotationCleaner.Run(ctx, srv.Cfg)
|
||||||
affected, affectedTags, err := cleaner.CleanAnnotations(ctx, srv.Cfg)
|
|
||||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||||
logger.Error("failed to clean up old annotations", "error", err)
|
logger.Error("failed to clean up old annotations", "error", err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -1,116 +0,0 @@
|
|||||||
package sqlstore
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AnnotationCleanupService is responsible for cleaning old annotations.
|
|
||||||
type AnnotationCleanupService struct {
|
|
||||||
batchSize int64
|
|
||||||
log log.Logger
|
|
||||||
sqlstore *SQLStore
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
alertAnnotationType = "alert_id <> 0"
|
|
||||||
dashboardAnnotationType = "dashboard_id <> 0 AND alert_id = 0"
|
|
||||||
apiAnnotationType = "alert_id = 0 AND dashboard_id = 0"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CleanAnnotations deletes old annotations created by alert rules, API
|
|
||||||
// requests and human made in the UI. It subsequently deletes orphaned rows
|
|
||||||
// from the annotation_tag table. Cleanup actions are performed in batches
|
|
||||||
// so that no query takes too long to complete.
|
|
||||||
//
|
|
||||||
// Returns the number of annotation and annotation_tag rows deleted. If an
|
|
||||||
// error occurs, it returns the number of rows affected so far.
|
|
||||||
func (acs *AnnotationCleanupService) CleanAnnotations(ctx context.Context, cfg *setting.Cfg) (int64, int64, error) {
|
|
||||||
var totalCleanedAnnotations int64
|
|
||||||
affected, err := acs.cleanAnnotations(ctx, cfg.AlertingAnnotationCleanupSetting, alertAnnotationType)
|
|
||||||
totalCleanedAnnotations += affected
|
|
||||||
if err != nil {
|
|
||||||
return totalCleanedAnnotations, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
affected, err = acs.cleanAnnotations(ctx, cfg.APIAnnotationCleanupSettings, apiAnnotationType)
|
|
||||||
totalCleanedAnnotations += affected
|
|
||||||
if err != nil {
|
|
||||||
return totalCleanedAnnotations, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
affected, err = acs.cleanAnnotations(ctx, cfg.DashboardAnnotationCleanupSettings, dashboardAnnotationType)
|
|
||||||
totalCleanedAnnotations += affected
|
|
||||||
if err != nil {
|
|
||||||
return totalCleanedAnnotations, 0, err
|
|
||||||
}
|
|
||||||
if totalCleanedAnnotations > 0 {
|
|
||||||
affected, err = acs.cleanOrphanedAnnotationTags(ctx)
|
|
||||||
}
|
|
||||||
return totalCleanedAnnotations, affected, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (acs *AnnotationCleanupService) cleanAnnotations(ctx context.Context, cfg setting.AnnotationCleanupSettings, annotationType string) (int64, error) {
|
|
||||||
var totalAffected int64
|
|
||||||
if cfg.MaxAge > 0 {
|
|
||||||
cutoffDate := time.Now().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
|
|
||||||
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)`
|
|
||||||
sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, dialect.Limit(acs.batchSize))
|
|
||||||
|
|
||||||
affected, err := acs.executeUntilDoneOrCancelled(ctx, sql)
|
|
||||||
totalAffected += affected
|
|
||||||
if err != nil {
|
|
||||||
return totalAffected, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.MaxCount > 0 {
|
|
||||||
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)`
|
|
||||||
sql := fmt.Sprintf(deleteQuery, annotationType, dialect.LimitOffset(acs.batchSize, cfg.MaxCount))
|
|
||||||
affected, err := acs.executeUntilDoneOrCancelled(ctx, sql)
|
|
||||||
totalAffected += affected
|
|
||||||
return totalAffected, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return totalAffected, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (acs *AnnotationCleanupService) cleanOrphanedAnnotationTags(ctx context.Context) (int64, error) {
|
|
||||||
deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)`
|
|
||||||
sql := fmt.Sprintf(deleteQuery, dialect.Limit(acs.batchSize))
|
|
||||||
return acs.executeUntilDoneOrCancelled(ctx, sql)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (acs *AnnotationCleanupService) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) {
|
|
||||||
var totalAffected int64
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return totalAffected, ctx.Err()
|
|
||||||
default:
|
|
||||||
var affected int64
|
|
||||||
err := withDbSession(ctx, acs.sqlstore.engine, func(session *DBSession) error {
|
|
||||||
res, err := session.Exec(sql)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
affected, err = res.RowsAffected()
|
|
||||||
totalAffected += affected
|
|
||||||
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return totalAffected, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if affected == 0 {
|
|
||||||
return totalAffected, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -25,7 +25,6 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
"github.com/grafana/grafana/pkg/models"
|
"github.com/grafana/grafana/pkg/models"
|
||||||
"github.com/grafana/grafana/pkg/registry"
|
"github.com/grafana/grafana/pkg/registry"
|
||||||
"github.com/grafana/grafana/pkg/services/annotations"
|
|
||||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
@ -119,9 +118,6 @@ func newSQLStore(cfg *setting.Cfg, cacheService *localcache.CacheService, engine
|
|||||||
|
|
||||||
dialect = ss.Dialect
|
dialect = ss.Dialect
|
||||||
|
|
||||||
// Init repo instances
|
|
||||||
annotations.SetAnnotationCleaner(&AnnotationCleanupService{batchSize: ss.Cfg.AnnotationCleanupJobBatchSize, log: log.New("annotationcleaner"), sqlstore: ss})
|
|
||||||
|
|
||||||
// if err := ss.Reset(); err != nil {
|
// if err := ss.Reset(); err != nil {
|
||||||
// return nil, err
|
// return nil, err
|
||||||
// }
|
// }
|
||||||
|
Loading…
Reference in New Issue
Block a user