mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Query history: Cleanup (#48303)
* Query history: Clean up stale history after 14 days * Add unstarring sleanup * Add wraapping * Update sql for mysql database * Update * Remove fmt.Print * Refactor and simplify solution * Update pkg/services/queryhistory/database.go Co-authored-by: Emil Tullstedt <emil.tullstedt@grafana.com> * Adjust SQL to limit number of deleted queries * Add limit enforcmenet to cleanup * Change limit * Update Co-authored-by: Emil Tullstedt <emil.tullstedt@grafana.com>
This commit is contained in:
parent
cd462c5b21
commit
4661c9ca47
@ -8,6 +8,7 @@ import (
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/queryhistory"
|
||||
"github.com/grafana/grafana/pkg/services/shorturls"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
|
||||
@ -19,11 +20,12 @@ import (
|
||||
)
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockService,
|
||||
shortURLService shorturls.Service, store sqlstore.Store) *CleanUpService {
|
||||
shortURLService shorturls.Service, store sqlstore.Store, queryHistoryService queryhistory.Service) *CleanUpService {
|
||||
s := &CleanUpService{
|
||||
Cfg: cfg,
|
||||
ServerLockService: serverLockService,
|
||||
ShortURLService: shortURLService,
|
||||
QueryHistoryService: queryHistoryService,
|
||||
store: store,
|
||||
log: log.New("cleanup"),
|
||||
}
|
||||
@ -36,6 +38,7 @@ type CleanUpService struct {
|
||||
Cfg *setting.Cfg
|
||||
ServerLockService *serverlock.ServerLockService
|
||||
ShortURLService shorturls.Service
|
||||
QueryHistoryService queryhistory.Service
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) Run(ctx context.Context) error {
|
||||
@ -54,6 +57,7 @@ func (srv *CleanUpService) Run(ctx context.Context) error {
|
||||
srv.cleanUpOldAnnotations(ctxWithTimeout)
|
||||
srv.expireOldUserInvites(ctx)
|
||||
srv.deleteStaleShortURLs(ctx)
|
||||
srv.deleteStaleQueryHistory(ctx)
|
||||
err := srv.ServerLockService.LockAndExecute(ctx, "delete old login attempts",
|
||||
time.Minute*10, func(context.Context) {
|
||||
srv.deleteOldLoginAttempts(ctx)
|
||||
@ -183,3 +187,33 @@ func (srv *CleanUpService) deleteStaleShortURLs(ctx context.Context) {
|
||||
srv.log.Debug("Deleted short urls", "rows affected", cmd.NumDeleted)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteStaleQueryHistory(ctx context.Context) {
|
||||
// Delete query history from 14+ days ago with exception of starred queries
|
||||
maxQueryHistoryLifetime := time.Hour * 24 * 14
|
||||
olderThan := time.Now().Add(-maxQueryHistoryLifetime).Unix()
|
||||
rowsCount, err := srv.QueryHistoryService.DeleteStaleQueriesInQueryHistory(ctx, olderThan)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem deleting stale query history", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted stale query history", "rows affected", rowsCount)
|
||||
}
|
||||
|
||||
// Enforce 200k limit for query_history table
|
||||
queryHistoryLimit := 200000
|
||||
rowsCount, err = srv.QueryHistoryService.EnforceRowLimitInQueryHistory(ctx, queryHistoryLimit, false)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem with enforcing row limit for query_history", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Enforced row limit for query_history", "rows affected", rowsCount)
|
||||
}
|
||||
|
||||
// Enforce 150k limit for query_history_star table
|
||||
queryHistoryStarLimit := 150000
|
||||
rowsCount, err = srv.QueryHistoryService.EnforceRowLimitInQueryHistory(ctx, queryHistoryStarLimit, true)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem with enforcing row limit for query_history_star", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Enforced row limit for query_history_star", "rows affected", rowsCount)
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package queryhistory
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
@ -323,3 +324,108 @@ func (s QueryHistoryService) migrateQueries(ctx context.Context, user *models.Si
|
||||
|
||||
return len(queryHistories), len(starredQueries), nil
|
||||
}
|
||||
|
||||
func (s QueryHistoryService) deleteStaleQueries(ctx context.Context, olderThan int64) (int, error) {
|
||||
var rowsCount int64
|
||||
|
||||
err := s.SQLStore.WithDbSession(ctx, func(session *sqlstore.DBSession) error {
|
||||
sql := `DELETE
|
||||
FROM query_history
|
||||
WHERE uid IN (
|
||||
SELECT uid FROM (
|
||||
SELECT uid FROM query_history
|
||||
LEFT JOIN query_history_star
|
||||
ON query_history_star.query_uid = query_history.uid
|
||||
WHERE query_history_star.query_uid IS NULL
|
||||
AND query_history.created_at <= ?
|
||||
ORDER BY query_history.id ASC
|
||||
LIMIT 10000
|
||||
) AS q
|
||||
)`
|
||||
|
||||
res, err := session.Exec(sql, strconv.FormatInt(olderThan, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsCount, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int(rowsCount), nil
|
||||
}
|
||||
|
||||
func (s QueryHistoryService) enforceQueryHistoryRowLimit(ctx context.Context, limit int, starredQueries bool) (int, error) {
|
||||
var deletedRowsCount int64
|
||||
|
||||
err := s.SQLStore.WithTransactionalDbSession(ctx, func(session *sqlstore.DBSession) error {
|
||||
var rowsCount int64
|
||||
var err error
|
||||
if starredQueries {
|
||||
rowsCount, err = session.Table("query_history_star").Count(QueryHistoryStar{})
|
||||
} else {
|
||||
rowsCount, err = session.Table("query_history").Count(QueryHistory{})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
countRowsToDelete := rowsCount - int64(limit)
|
||||
if countRowsToDelete > 0 {
|
||||
var sql string
|
||||
if starredQueries {
|
||||
sql = `DELETE FROM query_history_star
|
||||
WHERE id IN (
|
||||
SELECT id FROM (
|
||||
SELECT id FROM query_history_star
|
||||
ORDER BY id ASC
|
||||
LIMIT ?
|
||||
) AS q
|
||||
)`
|
||||
} else {
|
||||
sql = `DELETE
|
||||
FROM query_history
|
||||
WHERE uid IN (
|
||||
SELECT uid FROM (
|
||||
SELECT uid FROM query_history
|
||||
LEFT JOIN query_history_star
|
||||
ON query_history_star.query_uid = query_history.uid
|
||||
WHERE query_history_star.query_uid IS NULL
|
||||
ORDER BY query_history.id ASC
|
||||
LIMIT ?
|
||||
) AS q
|
||||
)`
|
||||
}
|
||||
|
||||
sqlLimit := countRowsToDelete
|
||||
if sqlLimit > 10000 {
|
||||
sqlLimit = 10000
|
||||
}
|
||||
|
||||
res, err := session.Exec(sql, strconv.FormatInt(sqlLimit, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deletedRowsCount, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int(deletedRowsCount), nil
|
||||
}
|
||||
|
@ -34,6 +34,8 @@ type Service interface {
|
||||
StarQueryInQueryHistory(ctx context.Context, user *models.SignedInUser, UID string) (QueryHistoryDTO, error)
|
||||
UnstarQueryInQueryHistory(ctx context.Context, user *models.SignedInUser, UID string) (QueryHistoryDTO, error)
|
||||
MigrateQueriesToQueryHistory(ctx context.Context, user *models.SignedInUser, cmd MigrateQueriesToQueryHistoryCommand) (int, int, error)
|
||||
DeleteStaleQueriesInQueryHistory(ctx context.Context, olderThan int64) (int, error)
|
||||
EnforceRowLimitInQueryHistory(ctx context.Context, limit int, starredQueries bool) (int, error)
|
||||
}
|
||||
|
||||
type QueryHistoryService struct {
|
||||
@ -70,3 +72,11 @@ func (s QueryHistoryService) UnstarQueryInQueryHistory(ctx context.Context, user
|
||||
func (s QueryHistoryService) MigrateQueriesToQueryHistory(ctx context.Context, user *models.SignedInUser, cmd MigrateQueriesToQueryHistoryCommand) (int, int, error) {
|
||||
return s.migrateQueries(ctx, user, cmd)
|
||||
}
|
||||
|
||||
func (s QueryHistoryService) DeleteStaleQueriesInQueryHistory(ctx context.Context, olderThan int64) (int, error) {
|
||||
return s.deleteStaleQueries(ctx, olderThan)
|
||||
}
|
||||
|
||||
func (s QueryHistoryService) EnforceRowLimitInQueryHistory(ctx context.Context, limit int, starredQueries bool) (int, error) {
|
||||
return s.enforceQueryHistoryRowLimit(ctx, limit, starredQueries)
|
||||
}
|
||||
|
52
pkg/services/queryhistory/queryhistory_delete_stale_test.go
Normal file
52
pkg/services/queryhistory/queryhistory_delete_stale_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package queryhistory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/web"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDeleteStaleQueryFromQueryHistory(t *testing.T) {
|
||||
testScenarioWithQueryInQueryHistory(t, "Stale query history can be deleted",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
olderThan := time.Now().Unix() + 60
|
||||
rowsDeleted, err := sc.service.DeleteStaleQueriesInQueryHistory(context.Background(), olderThan)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, rowsDeleted)
|
||||
})
|
||||
|
||||
testScenarioWithQueryInQueryHistory(t, "Stale single starred query history can not be deleted",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
sc.ctx.Req = web.SetURLParams(sc.ctx.Req, map[string]string{":uid": sc.initialResult.Result.UID})
|
||||
resp := sc.service.starHandler(sc.reqContext)
|
||||
require.Equal(t, 200, resp.Status())
|
||||
|
||||
olderThan := time.Now().Unix() + 60
|
||||
rowsDeleted, err := sc.service.DeleteStaleQueriesInQueryHistory(context.Background(), olderThan)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, rowsDeleted)
|
||||
})
|
||||
|
||||
testScenarioWithQueryInQueryHistory(t, "Not stale query history is not deleted",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
olderThan := time.Now().Unix() - 60
|
||||
rowsDeleted, err := sc.service.DeleteStaleQueriesInQueryHistory(context.Background(), olderThan)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, rowsDeleted)
|
||||
})
|
||||
|
||||
// In this scenario we have 2 starred queries and 1 not starred query
|
||||
testScenarioWithMultipleQueriesInQueryHistory(t, "Stale starred query history can not be deleted",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
olderThan := time.Now().Unix() + 60
|
||||
rowsDeleted, err := sc.service.DeleteStaleQueriesInQueryHistory(context.Background(), olderThan)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, rowsDeleted)
|
||||
})
|
||||
}
|
48
pkg/services/queryhistory/queryhistory_enforce_limit_test.go
Normal file
48
pkg/services/queryhistory/queryhistory_enforce_limit_test.go
Normal file
@ -0,0 +1,48 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package queryhistory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEnforceRowLimitInQueryHistory(t *testing.T) {
|
||||
testScenarioWithQueryInQueryHistory(t, "Enforce limit for query_history",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
limit := 0
|
||||
rowsDeleted, err := sc.service.EnforceRowLimitInQueryHistory(context.Background(), limit, false)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, rowsDeleted)
|
||||
})
|
||||
|
||||
// In this scenario we have 2 starred queries and 1 not starred query
|
||||
testScenarioWithMultipleQueriesInQueryHistory(t, "Enforce limit for unstarred queries in query_history",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
limit := 2
|
||||
rowsDeleted, err := sc.service.EnforceRowLimitInQueryHistory(context.Background(), limit, false)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, rowsDeleted)
|
||||
})
|
||||
|
||||
// In this scenario we have 2 starred queries and 1 not starred query
|
||||
testScenarioWithMultipleQueriesInQueryHistory(t, "Enforce limit for stars in query_history_star",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
limit := 1
|
||||
rowsDeleted, err := sc.service.EnforceRowLimitInQueryHistory(context.Background(), limit, true)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, rowsDeleted)
|
||||
})
|
||||
|
||||
// In this scenario we have 2 starred queries and 1 not starred query
|
||||
testScenarioWithMultipleQueriesInQueryHistory(t, "Enforce limit for stars in query_history_star",
|
||||
func(t *testing.T, sc scenarioContext) {
|
||||
limit := 0
|
||||
rowsDeleted, err := sc.service.EnforceRowLimitInQueryHistory(context.Background(), limit, true)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, rowsDeleted)
|
||||
})
|
||||
}
|
@ -20,4 +20,8 @@ func addQueryHistoryStarMigrations(mg *Migrator) {
|
||||
mg.AddMigration("create query_history_star table v1", NewAddTableMigration(queryHistoryStarV1))
|
||||
|
||||
mg.AddMigration("add index query_history.user_id-query_uid", NewAddIndexMigration(queryHistoryStarV1, queryHistoryStarV1.Indices[0]))
|
||||
|
||||
mg.AddMigration("add column org_id in query_history_star", NewAddColumnMigration(queryHistoryStarV1, &Column{
|
||||
Name: "org_id", Type: DB_BigInt, Nullable: false, Default: "1",
|
||||
}))
|
||||
}
|
||||
|
@ -251,6 +251,7 @@ func (ss *SQLStore) RemoveOrgUser(ctx context.Context, cmd *models.RemoveOrgUser
|
||||
"DELETE FROM org_user WHERE org_id=? and user_id=?",
|
||||
"DELETE FROM dashboard_acl WHERE org_id=? and user_id = ?",
|
||||
"DELETE FROM team_member WHERE org_id=? and user_id = ?",
|
||||
"DELETE FROM query_history_star WHERE org_id=? and user_id = ?",
|
||||
}
|
||||
|
||||
for _, sql := range deletes {
|
||||
|
Loading…
Reference in New Issue
Block a user