Split Delete expired dashboard version store (#49610)

* Split Delete expired dashboard version store

* Add method to fakes

* Fix lint

* Fix lint 2

* Use split store method in cleanup

* Add tests

* Remove DeleteExpiredVersions from sqlstore

* Fix lint

* Fix integration tests
This commit is contained in:
idafurjes 2022-05-31 11:56:05 +02:00 committed by GitHub
parent 72367cf1ad
commit f69c9bd704
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 177 additions and 172 deletions

View File

@ -8,6 +8,7 @@ import (
"path"
"time"
dashver "github.com/grafana/grafana/pkg/services/dashboardversion"
"github.com/grafana/grafana/pkg/services/queryhistory"
"github.com/grafana/grafana/pkg/services/shorturls"
"github.com/grafana/grafana/pkg/services/sqlstore"
@ -20,25 +21,28 @@ import (
)
func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockService,
shortURLService shorturls.Service, store sqlstore.Store, queryHistoryService queryhistory.Service) *CleanUpService {
shortURLService shorturls.Service, store sqlstore.Store, queryHistoryService queryhistory.Service,
dashboardVersionService dashver.Service) *CleanUpService {
s := &CleanUpService{
Cfg: cfg,
ServerLockService: serverLockService,
ShortURLService: shortURLService,
QueryHistoryService: queryHistoryService,
store: store,
log: log.New("cleanup"),
Cfg: cfg,
ServerLockService: serverLockService,
ShortURLService: shortURLService,
QueryHistoryService: queryHistoryService,
store: store,
log: log.New("cleanup"),
dashboardVersionService: dashboardVersionService,
}
return s
}
type CleanUpService struct {
log log.Logger
store sqlstore.Store
Cfg *setting.Cfg
ServerLockService *serverlock.ServerLockService
ShortURLService shorturls.Service
QueryHistoryService queryhistory.Service
log log.Logger
store sqlstore.Store
Cfg *setting.Cfg
ServerLockService *serverlock.ServerLockService
ShortURLService shorturls.Service
QueryHistoryService queryhistory.Service
dashboardVersionService dashver.Service
}
func (srv *CleanUpService) Run(ctx context.Context) error {
@ -141,8 +145,8 @@ func (srv *CleanUpService) deleteExpiredSnapshots(ctx context.Context) {
}
func (srv *CleanUpService) deleteExpiredDashboardVersions(ctx context.Context) {
cmd := models.DeleteExpiredVersionsCommand{}
if err := srv.store.DeleteExpiredVersions(ctx, &cmd); err != nil {
cmd := dashver.DeleteExpiredVersionsCommand{}
if err := srv.dashboardVersionService.DeleteExpired(ctx, &cmd); err != nil {
srv.log.Error("Failed to delete expired dashboard versions", "error", err.Error())
} else {
srv.log.Debug("Deleted old/expired dashboard versions", "rows affected", cmd.DeletedRows)

View File

@ -6,4 +6,5 @@ import (
type Service interface {
Get(ctx context.Context, query *GetDashboardVersionQuery) (*DashboardVersion, error)
DeleteExpired(ctx context.Context, cmd *DeleteExpiredVersionsCommand) error
}

View File

@ -8,11 +8,16 @@ import (
"github.com/grafana/grafana/pkg/setting"
)
const (
maxVersionsToDeletePerBatch = 100
maxVersionDeletionBatches = 50
)
type Service struct {
store store
}
func ProvideService(db db.DB, cfg *setting.Cfg) dashver.Service {
func ProvideService(db db.DB) dashver.Service {
return &Service{
store: &sqlStore{
db: db,
@ -28,3 +33,33 @@ func (s *Service) Get(ctx context.Context, query *dashver.GetDashboardVersionQue
version.Data.Set("id", version.DashboardID)
return version, nil
}
func (s *Service) DeleteExpired(ctx context.Context, cmd *dashver.DeleteExpiredVersionsCommand) error {
versionsToKeep := setting.DashboardVersionsToKeep
if versionsToKeep < 1 {
versionsToKeep = 1
}
for batch := 0; batch < maxVersionDeletionBatches; batch++ {
versionIdsToDelete, batchErr := s.store.GetBatch(ctx, cmd, maxVersionsToDeletePerBatch, versionsToKeep)
if batchErr != nil {
return batchErr
}
if len(versionIdsToDelete) < 1 {
return nil
}
deleted, err := s.store.DeleteBatch(ctx, cmd, versionIdsToDelete)
if err != nil {
return err
}
cmd.DeletedRows += deleted
if deleted < int64(maxVersionsToDeletePerBatch) {
break
}
}
return nil
}

View File

@ -2,10 +2,12 @@ package dashverimpl
import (
"context"
"errors"
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
dashver "github.com/grafana/grafana/pkg/services/dashboardversion"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/require"
)
@ -25,15 +27,51 @@ func TestDashboardVersionService(t *testing.T) {
})
}
type FakeDashboardVersionStroe struct {
func TestDeleteExpiredVersions(t *testing.T) {
versionsToKeep := 5
setting.DashboardVersionsToKeep = versionsToKeep
dashboardVersionStore := newDashboardVersionStoreFake()
dashboardVersionService := Service{store: dashboardVersionStore}
t.Run("Don't delete anything if there are no expired versions", func(t *testing.T) {
err := dashboardVersionService.DeleteExpired(context.Background(), &dashver.DeleteExpiredVersionsCommand{DeletedRows: 4})
require.Nil(t, err)
})
t.Run("Clean up old dashboard versions successfully", func(t *testing.T) {
dashboardVersionStore.ExptectedDeletedVersions = 4
dashboardVersionStore.ExpectedVersions = []interface{}{1, 2, 3, 4}
err := dashboardVersionService.DeleteExpired(context.Background(), &dashver.DeleteExpiredVersionsCommand{DeletedRows: 4})
require.Nil(t, err)
})
t.Run("Clean up old dashboard versions with error", func(t *testing.T) {
dashboardVersionStore.ExpectedError = errors.New("some error")
err := dashboardVersionService.DeleteExpired(context.Background(), &dashver.DeleteExpiredVersionsCommand{DeletedRows: 4})
require.NotNil(t, err)
})
}
type FakeDashboardVersionStore struct {
ExpectedDashboardVersion *dashver.DashboardVersion
ExptectedDeletedVersions int64
ExpectedVersions []interface{}
ExpectedError error
}
func newDashboardVersionStoreFake() *FakeDashboardVersionStroe {
return &FakeDashboardVersionStroe{}
func newDashboardVersionStoreFake() *FakeDashboardVersionStore {
return &FakeDashboardVersionStore{}
}
func (f *FakeDashboardVersionStroe) Get(ctx context.Context, query *dashver.GetDashboardVersionQuery) (*dashver.DashboardVersion, error) {
func (f *FakeDashboardVersionStore) Get(ctx context.Context, query *dashver.GetDashboardVersionQuery) (*dashver.DashboardVersion, error) {
return f.ExpectedDashboardVersion, f.ExpectedError
}
func (f *FakeDashboardVersionStore) GetBatch(ctx context.Context, cmd *dashver.DeleteExpiredVersionsCommand, perBatch int, versionsToKeep int) ([]interface{}, error) {
return f.ExpectedVersions, f.ExpectedError
}
func (f *FakeDashboardVersionStore) DeleteBatch(ctx context.Context, cmd *dashver.DeleteExpiredVersionsCommand, versionIdsToDelete []interface{}) (int64, error) {
return f.ExptectedDeletedVersions, f.ExpectedError
}

View File

@ -2,24 +2,26 @@ package dashverimpl
import (
"context"
"strings"
"github.com/grafana/grafana/pkg/models"
dashver "github.com/grafana/grafana/pkg/services/dashboardversion"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/db"
)
type store interface {
Get(ctx context.Context, query *dashver.GetDashboardVersionQuery) (*dashver.DashboardVersion, error)
Get(context.Context, *dashver.GetDashboardVersionQuery) (*dashver.DashboardVersion, error)
GetBatch(context.Context, *dashver.DeleteExpiredVersionsCommand, int, int) ([]interface{}, error)
DeleteBatch(context.Context, *dashver.DeleteExpiredVersionsCommand, []interface{}) (int64, error)
}
type sqlStore struct {
db db.DB
}
func (s *sqlStore) Get(ctx context.Context, query *dashver.GetDashboardVersionQuery) (*dashver.DashboardVersion, error) {
func (ss *sqlStore) Get(ctx context.Context, query *dashver.GetDashboardVersionQuery) (*dashver.DashboardVersion, error) {
var version dashver.DashboardVersion
err := s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
has, err := sess.Where("dashboard_version.dashboard_id=? AND dashboard_version.version=? AND dashboard.org_id=?", query.DashboardID, query.Version, query.OrgID).
Join("LEFT", "dashboard", `dashboard.id = dashboard_version.dashboard_id`).
Get(&version)
@ -29,7 +31,7 @@ func (s *sqlStore) Get(ctx context.Context, query *dashver.GetDashboardVersionQu
}
if !has {
return models.ErrDashboardVersionNotFound
return dashver.ErrDashboardVersionNotFound
}
return nil
})
@ -38,3 +40,38 @@ func (s *sqlStore) Get(ctx context.Context, query *dashver.GetDashboardVersionQu
}
return &version, nil
}
func (ss *sqlStore) GetBatch(ctx context.Context, cmd *dashver.DeleteExpiredVersionsCommand, perBatch int, versionsToKeep int) ([]interface{}, error) {
var versionIds []interface{}
err := ss.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
versionIdsToDeleteQuery := `SELECT id
FROM dashboard_version, (
SELECT dashboard_id, count(version) as count, min(version) as min
FROM dashboard_version
GROUP BY dashboard_id
) AS vtd
WHERE dashboard_version.dashboard_id=vtd.dashboard_id
AND version < vtd.min + vtd.count - ?
LIMIT ?`
err := sess.SQL(versionIdsToDeleteQuery, versionsToKeep, perBatch).Find(&versionIds)
return err
})
return versionIds, err
}
func (ss *sqlStore) DeleteBatch(ctx context.Context, cmd *dashver.DeleteExpiredVersionsCommand, versionIdsToDelete []interface{}) (int64, error) {
var deleted int64
err := ss.db.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
deleteExpiredSQL := `DELETE FROM dashboard_version WHERE id IN (?` + strings.Repeat(",?", len(versionIdsToDelete)-1) + `)`
sqlOrArgs := append([]interface{}{deleteExpiredSQL}, versionIdsToDelete...)
expiredResponse, err := sess.Exec(sqlOrArgs...)
if err != nil {
return err
}
deleted, err = expiredResponse.RowsAffected()
return err
})
return deleted, err
}

View File

@ -14,10 +14,11 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIntegrationDashboardVersion(t *testing.T) {
func TestIntegrationGetDashboardVersion(t *testing.T) {
ss := sqlstore.InitTestDB(t)
dashVerStore := sqlStore{db: ss}
@ -60,6 +61,27 @@ func TestIntegrationDashboardVersion(t *testing.T) {
})
}
func TestIntegrationDeleteExpiredVersions(t *testing.T) {
versionsToWrite := 10
ss := sqlstore.InitTestDB(t)
dashVerStore := sqlStore{db: ss}
for i := 0; i < versionsToWrite-1; i++ {
insertTestDashboard(t, ss, "test dash 53", 1, int64(i), false, "diff-all")
}
t.Run("Clean up old dashboard versions", func(t *testing.T) {
versionIDsToDelete := []interface{}{1, 2, 3, 4}
res, err := dashVerStore.DeleteBatch(
context.Background(),
&dashver.DeleteExpiredVersionsCommand{DeletedRows: 4},
versionIDsToDelete,
)
require.Nil(t, err)
assert.EqualValues(t, 4, res)
})
}
func getDashboard(t *testing.T, sqlStore *sqlstore.SQLStore, dashboard *models.Dashboard) error {
t.Helper()
return sqlStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {

View File

@ -24,3 +24,7 @@ func (f *FakeDashboardVersionService) Get(ctx context.Context, query *dashver.Ge
f.counter++
return f.ExpectedDashboardVersions[f.counter-1], f.ExpectedError
}
func (f *FakeDashboardVersionService) DeleteExpired(ctx context.Context, cmd *dashver.DeleteExpiredVersionsCommand) error {
return f.ExpectedError
}

View File

@ -1,11 +1,17 @@
package dashver
import (
"errors"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
)
var (
ErrDashboardVersionNotFound = errors.New("dashboard version not found")
ErrNoVersionsForDashboardId = errors.New("no dashboard versions found for the given DashboardId")
)
type DashboardVersion struct {
ID int64 `json:"id"`
DashboardID int64 `json:"dashboardId"`
@ -25,3 +31,7 @@ type GetDashboardVersionQuery struct {
OrgID int64
Version int
}
type DeleteExpiredVersionsCommand struct {
DeletedRows int64
}

View File

@ -2,10 +2,8 @@ package sqlstore
import (
"context"
"strings"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
// GetDashboardVersions gets all dashboard versions for the given dashboard ID.
@ -42,69 +40,3 @@ func (ss *SQLStore) GetDashboardVersions(ctx context.Context, query *models.GetD
return nil
})
}
const MAX_VERSIONS_TO_DELETE_PER_BATCH = 100
const MAX_VERSION_DELETION_BATCHES = 50
func (ss *SQLStore) DeleteExpiredVersions(ctx context.Context, cmd *models.DeleteExpiredVersionsCommand) error {
return ss.deleteExpiredVersions(ctx, cmd, MAX_VERSIONS_TO_DELETE_PER_BATCH, MAX_VERSION_DELETION_BATCHES)
}
func (ss *SQLStore) deleteExpiredVersions(ctx context.Context, cmd *models.DeleteExpiredVersionsCommand, perBatch int, maxBatches int) error {
versionsToKeep := setting.DashboardVersionsToKeep
if versionsToKeep < 1 {
versionsToKeep = 1
}
for batch := 0; batch < maxBatches; batch++ {
deleted := int64(0)
batchErr := ss.WithTransactionalDbSession(ctx, func(sess *DBSession) error {
// Idea of this query is finding version IDs to delete based on formula:
// min_version_to_keep = min_version + (versions_count - versions_to_keep)
// where version stats is processed for each dashboard. This guarantees that we keep at least versions_to_keep
// versions, but in some cases (when versions are sparse) this number may be more.
versionIdsToDeleteQuery := `SELECT id
FROM dashboard_version, (
SELECT dashboard_id, count(version) as count, min(version) as min
FROM dashboard_version
GROUP BY dashboard_id
) AS vtd
WHERE dashboard_version.dashboard_id=vtd.dashboard_id
AND version < vtd.min + vtd.count - ?
LIMIT ?`
var versionIdsToDelete []interface{}
err := sess.SQL(versionIdsToDeleteQuery, versionsToKeep, perBatch).Find(&versionIdsToDelete)
if err != nil {
return err
}
if len(versionIdsToDelete) < 1 {
return nil
}
deleteExpiredSQL := `DELETE FROM dashboard_version WHERE id IN (?` + strings.Repeat(",?", len(versionIdsToDelete)-1) + `)`
sqlOrArgs := append([]interface{}{deleteExpiredSQL}, versionIdsToDelete...)
expiredResponse, err := sess.Exec(sqlOrArgs...)
if err != nil {
return err
}
deleted, err = expiredResponse.RowsAffected()
return err
})
if batchErr != nil {
return batchErr
}
cmd.DeletedRows += deleted
if deleted < int64(perBatch) {
break
}
}
return nil
}

View File

@ -12,7 +12,6 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
@ -110,78 +109,6 @@ func TestIntegrationGetDashboardVersions(t *testing.T) {
})
}
func TestIntegrationDeleteExpiredVersions(t *testing.T) {
versionsToKeep := 5
versionsToWrite := 10
setting.DashboardVersionsToKeep = versionsToKeep
var sqlStore *SQLStore
var savedDash *models.Dashboard
setup := func(t *testing.T) {
sqlStore = InitTestDB(t)
savedDash = insertTestDashboard(t, sqlStore, "test dash 53", 1, 0, false, "diff-all")
for i := 0; i < versionsToWrite-1; i++ {
updateTestDashboard(t, sqlStore, savedDash, map[string]interface{}{
"tags": "different-tag",
})
}
}
t.Run("Clean up old dashboard versions", func(t *testing.T) {
setup(t)
err := sqlStore.DeleteExpiredVersions(context.Background(), &models.DeleteExpiredVersionsCommand{})
require.Nil(t, err)
query := models.GetDashboardVersionsQuery{DashboardId: savedDash.Id, OrgId: 1}
err = sqlStore.GetDashboardVersions(context.Background(), &query)
require.Nil(t, err)
require.Equal(t, versionsToKeep, len(query.Result))
// Ensure latest versions were kept
require.Equal(t, versionsToWrite-versionsToKeep+1, query.Result[versionsToKeep-1].Version)
require.Equal(t, versionsToWrite, query.Result[0].Version)
})
t.Run("Don't delete anything if there are no expired versions", func(t *testing.T) {
setup(t)
setting.DashboardVersionsToKeep = versionsToWrite
err := sqlStore.DeleteExpiredVersions(context.Background(), &models.DeleteExpiredVersionsCommand{})
require.Nil(t, err)
query := models.GetDashboardVersionsQuery{DashboardId: savedDash.Id, OrgId: 1, Limit: versionsToWrite}
err = sqlStore.GetDashboardVersions(context.Background(), &query)
require.Nil(t, err)
require.Equal(t, versionsToWrite, len(query.Result))
})
t.Run("Don't delete more than MAX_VERSIONS_TO_DELETE_PER_BATCH * MAX_VERSION_DELETION_BATCHES per iteration", func(t *testing.T) {
setup(t)
perBatch := 10
maxBatches := 10
versionsToWriteBigNumber := perBatch*maxBatches + versionsToWrite
for i := 0; i < versionsToWriteBigNumber-versionsToWrite; i++ {
updateTestDashboard(t, sqlStore, savedDash, map[string]interface{}{
"tags": "different-tag",
})
}
err := sqlStore.deleteExpiredVersions(context.Background(), &models.DeleteExpiredVersionsCommand{}, perBatch, maxBatches)
require.Nil(t, err)
query := models.GetDashboardVersionsQuery{DashboardId: savedDash.Id, OrgId: 1, Limit: versionsToWriteBigNumber}
err = sqlStore.GetDashboardVersions(context.Background(), &query)
require.Nil(t, err)
// Ensure we have at least versionsToKeep versions
require.GreaterOrEqual(t, len(query.Result), versionsToKeep)
// Ensure we haven't deleted more than perBatch * maxBatches rows
require.LessOrEqual(t, versionsToWriteBigNumber-len(query.Result), perBatch*maxBatches)
})
}
func getDashboard(t *testing.T, sqlStore *SQLStore, dashboard *models.Dashboard) error {
t.Helper()
return sqlStore.WithDbSession(context.Background(), func(sess *DBSession) error {

View File

@ -348,10 +348,6 @@ func (m *SQLStoreMock) GetDashboardVersions(ctx context.Context, query *models.G
return m.ExpectedError
}
func (m *SQLStoreMock) DeleteExpiredVersions(ctx context.Context, cmd *models.DeleteExpiredVersionsCommand) error {
return m.ExpectedError
}
func (m SQLStoreMock) GetDashboardAclInfoList(ctx context.Context, query *models.GetDashboardAclInfoListQuery) error {
query.Result = m.ExpectedDashboardAclInfoList
return m.ExpectedError

View File

@ -75,7 +75,6 @@ type Store interface {
WithTransactionalDbSession(ctx context.Context, callback DBTransactionFunc) error
InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
GetDashboardVersions(ctx context.Context, query *models.GetDashboardVersionsQuery) error
DeleteExpiredVersions(ctx context.Context, cmd *models.DeleteExpiredVersionsCommand) error
GetDashboardAclInfoList(ctx context.Context, query *models.GetDashboardAclInfoListQuery) error
CreatePlaylist(ctx context.Context, cmd *models.CreatePlaylistCommand) error
UpdatePlaylist(ctx context.Context, cmd *models.UpdatePlaylistCommand) error