refactor(backend): refactoring management of go routines in backend

This commit is contained in:
Torkel Ödegaard 2016-09-28 21:06:00 +02:00
parent bc634f20d5
commit 71e2c6f6ef
10 changed files with 119 additions and 123 deletions

View File

@ -19,14 +19,20 @@ import (
"github.com/grafana/grafana/pkg/login" "github.com/grafana/grafana/pkg/login"
"github.com/grafana/grafana/pkg/metrics" "github.com/grafana/grafana/pkg/metrics"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
alertingInit "github.com/grafana/grafana/pkg/services/alerting/init" "github.com/grafana/grafana/pkg/services/cleanup"
"github.com/grafana/grafana/pkg/services/backgroundtasks"
"github.com/grafana/grafana/pkg/services/eventpublisher" "github.com/grafana/grafana/pkg/services/eventpublisher"
"github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/notifications"
"github.com/grafana/grafana/pkg/services/search" "github.com/grafana/grafana/pkg/services/search"
"github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/social" "github.com/grafana/grafana/pkg/social"
"github.com/grafana/grafana/pkg/services/alerting"
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
_ "github.com/grafana/grafana/pkg/tsdb/graphite"
_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
_ "github.com/grafana/grafana/pkg/tsdb/testdata"
) )
var version = "3.1.0" var version = "3.1.0"
@ -67,6 +73,7 @@ func main() {
flag.Parse() flag.Parse()
writePIDFile() writePIDFile()
initRuntime() initRuntime()
initSql() initSql()
metrics.Init() metrics.Init()
@ -76,8 +83,15 @@ func main() {
eventpublisher.Init() eventpublisher.Init()
plugins.Init() plugins.Init()
grafanaGroup.Go(func() error { return alertingInit.Init(appContext) }) // init alerting
grafanaGroup.Go(func() error { return backgroundtasks.Init(appContext) }) if setting.AlertingEnabled {
engine := alerting.NewEngine()
grafanaGroup.Go(func() error { return engine.Run(appContext) })
}
// cleanup service
cleanUpService := cleanup.NewCleanUpService()
grafanaGroup.Go(func() error { return cleanUpService.Run(appContext) })
if err := notifications.Init(); err != nil { if err := notifications.Init(); err != nil {
log.Fatal(3, "Notification service failed to initialize", err) log.Fatal(3, "Notification service failed to initialize", err)

View File

@ -63,6 +63,9 @@ type DeleteDashboardSnapshotCommand struct {
DeleteKey string `json:"-"` DeleteKey string `json:"-"`
} }
type DeleteExpiredSnapshotsCommand struct {
}
type GetDashboardSnapshotQuery struct { type GetDashboardSnapshotQuery struct {
Key string Key string

View File

@ -1,7 +0,0 @@
package models
import "time"
type HourCommand struct {
Time time.Time
}

View File

@ -36,16 +36,19 @@ func NewEngine() *Engine {
return e return e
} }
func (e *Engine) Start(grafanaCtx context.Context) error { func (e *Engine) Run(ctx context.Context) error {
e.log.Info("Starting Alerting Engine") e.log.Info("Initializing Alerting")
g, grafanaCtx := errgroup.WithContext(grafanaCtx) g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return e.alertingTicker(grafanaCtx) }) g.Go(func() error { return e.alertingTicker(ctx) })
g.Go(func() error { return e.execDispatcher(grafanaCtx) }) g.Go(func() error { return e.execDispatcher(ctx) })
g.Go(func() error { return e.resultDispatcher(grafanaCtx) }) g.Go(func() error { return e.resultDispatcher(ctx) })
return g.Wait() err := g.Wait()
e.log.Info("Stopped Alerting", "reason", err)
return err
} }
func (e *Engine) Stop() { func (e *Engine) Stop() {

View File

@ -12,8 +12,6 @@ import (
_ "github.com/grafana/grafana/pkg/tsdb/testdata" _ "github.com/grafana/grafana/pkg/tsdb/testdata"
) )
var engine *alerting.Engine
func Init(ctx context.Context) error { func Init(ctx context.Context) error {
if !setting.AlertingEnabled { if !setting.AlertingEnabled {
return nil return nil

View File

@ -1,47 +0,0 @@
//"I want to be a cleaner, just like you," said Mathilda
//"Okay," replied Leon
package backgroundtasks
import (
"context"
"time"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
)
var (
tlog log.Logger = log.New("ticker")
)
func Init(ctx context.Context) error {
g, _ := errgroup.WithContext(ctx)
g.Go(func() error { return start(ctx) })
return g.Wait()
}
func start(ctx context.Context) error {
go cleanup(time.Now())
ticker := time.NewTicker(time.Hour * 1)
for {
select {
case tick := <-ticker.C:
go cleanup(tick)
case <-ctx.Done():
return ctx.Err()
}
}
}
func cleanup(now time.Time) {
err := bus.Publish(&models.HourCommand{Time: now})
if err != nil {
tlog.Error("Cleanup job failed", "error", err)
}
}

View File

@ -1,38 +0,0 @@
package backgroundtasks
import (
"io/ioutil"
"os"
"path"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
func init() {
bus.AddEventListener(CleanTmpFiles)
}
func CleanTmpFiles(cmd *models.HourCommand) error {
files, err := ioutil.ReadDir(setting.ImagesDir)
var toDelete []os.FileInfo
for _, file := range files {
if file.ModTime().AddDate(0, 0, setting.RenderedImageTTLDays).Before(cmd.Time) {
toDelete = append(toDelete, file)
}
}
for _, file := range toDelete {
fullPath := path.Join(setting.ImagesDir, file.Name())
err := os.Remove(fullPath)
if err != nil {
return err
}
}
tlog.Debug("Found old rendered image to delete", "deleted", len(toDelete), "keept", len(files))
return err
}

View File

@ -0,0 +1,81 @@
package cleanup
import (
"context"
"io/ioutil"
"os"
"path"
"time"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
type CleanUpService struct {
log log.Logger
}
func NewCleanUpService() *CleanUpService {
return &CleanUpService{
log: log.New("cleanup"),
}
}
func (service *CleanUpService) Run(ctx context.Context) error {
service.log.Info("Initializing CleanUpService")
g, _ := errgroup.WithContext(ctx)
g.Go(func() error { return service.start(ctx) })
err := g.Wait()
service.log.Info("Stopped CleanUpService", "reason", err)
return err
}
func (service *CleanUpService) start(ctx context.Context) error {
service.cleanUpTmpFiles()
ticker := time.NewTicker(time.Hour * 1)
for {
select {
case <-ticker.C:
service.cleanUpTmpFiles()
service.deleteExpiredSnapshots()
case <-ctx.Done():
return ctx.Err()
}
}
}
func (service *CleanUpService) cleanUpTmpFiles() {
files, err := ioutil.ReadDir(setting.ImagesDir)
if err != nil {
service.log.Error("Problem reading image dir", "error", err)
return
}
var toDelete []os.FileInfo
for _, file := range files {
if file.ModTime().AddDate(0, 0, 1).Before(time.Now()) {
toDelete = append(toDelete, file)
}
}
for _, file := range toDelete {
fullPath := path.Join(setting.ImagesDir, file.Name())
err := os.Remove(fullPath)
if err != nil {
service.log.Error("Failed to delete temp file", "file", file.Name(), "error", err)
}
}
service.log.Debug("Found old rendered image to delete", "deleted", len(toDelete), "keept", len(files))
}
func (service *CleanUpService) deleteExpiredSnapshots() {
bus.Dispatch(&m.DeleteExpiredSnapshotsCommand{})
}

View File

@ -5,7 +5,6 @@ import (
"github.com/go-xorm/xorm" "github.com/go-xorm/xorm"
"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
) )
@ -15,30 +14,24 @@ func init() {
bus.AddHandler("sql", GetDashboardSnapshot) bus.AddHandler("sql", GetDashboardSnapshot)
bus.AddHandler("sql", DeleteDashboardSnapshot) bus.AddHandler("sql", DeleteDashboardSnapshot)
bus.AddHandler("sql", SearchDashboardSnapshots) bus.AddHandler("sql", SearchDashboardSnapshots)
bus.AddEventListener(DeleteExpiredSnapshots) bus.AddHandler("sql", DeleteExpiredSnapshots)
} }
func DeleteExpiredSnapshots(cmd *m.HourCommand) error { func DeleteExpiredSnapshots(cmd *m.DeleteExpiredSnapshotsCommand) error {
return inTransaction(func(sess *xorm.Session) error { return inTransaction(func(sess *xorm.Session) error {
var expiredCount int64 = 0 var expiredCount int64 = 0
var oldCount int64 = 0
if setting.SnapShotRemoveExpired { if setting.SnapShotRemoveExpired {
deleteExpiredSql := "DELETE FROM dashboard_snapshot WHERE expires < ?" deleteExpiredSql := "DELETE FROM dashboard_snapshot WHERE expires < ?"
expiredResponse, err := x.Exec(deleteExpiredSql, cmd.Time) expiredResponse, err := x.Exec(deleteExpiredSql, time.Now)
if err != nil { if err != nil {
return err return err
} }
expiredCount, _ = expiredResponse.RowsAffected() expiredCount, _ = expiredResponse.RowsAffected()
} }
oldSnapshotsSql := "DELETE FROM dashboard_snapshot WHERE created < ?" sqlog.Debug("Deleted old/expired snaphots", "expired", expiredCount)
oldResponse, err := x.Exec(oldSnapshotsSql, cmd.Time.AddDate(0, 0, setting.SnapShotTTLDays*-1)) return nil
oldCount, _ = oldResponse.RowsAffected()
log.Debug2("Deleted old/expired snaphots", "to old", oldCount, "expired", expiredCount)
return err
}) })
} }

View File

@ -120,9 +120,8 @@ var (
IsWindows bool IsWindows bool
// PhantomJs Rendering // PhantomJs Rendering
ImagesDir string ImagesDir string
PhantomDir string PhantomDir string
RenderedImageTTLDays int
// for logging purposes // for logging purposes
configFiles []string configFiles []string
@ -540,9 +539,6 @@ func NewConfigContext(args *CommandLineArgs) error {
ImagesDir = filepath.Join(DataPath, "png") ImagesDir = filepath.Join(DataPath, "png")
PhantomDir = filepath.Join(HomePath, "vendor/phantomjs") PhantomDir = filepath.Join(HomePath, "vendor/phantomjs")
tmpFilesSection := Cfg.Section("tmp.files")
RenderedImageTTLDays = tmpFilesSection.Key("rendered_image_ttl_days").MustInt(14)
analytics := Cfg.Section("analytics") analytics := Cfg.Section("analytics")
ReportingEnabled = analytics.Key("reporting_enabled").MustBool(true) ReportingEnabled = analytics.Key("reporting_enabled").MustBool(true)
CheckForUpdates = analytics.Key("check_for_updates").MustBool(true) CheckForUpdates = analytics.Key("check_for_updates").MustBool(true)