diff --git a/pkg/cmd/grafana-server/main.go b/pkg/cmd/grafana-server/main.go index 42c8dfedacf..877fc573169 100644 --- a/pkg/cmd/grafana-server/main.go +++ b/pkg/cmd/grafana-server/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "io/ioutil" @@ -12,18 +13,26 @@ import ( "syscall" "time" + "golang.org/x/sync/errgroup" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/login" "github.com/grafana/grafana/pkg/metrics" "github.com/grafana/grafana/pkg/plugins" - alertingInit "github.com/grafana/grafana/pkg/services/alerting/init" - "github.com/grafana/grafana/pkg/services/backgroundtasks" + "github.com/grafana/grafana/pkg/services/cleanup" "github.com/grafana/grafana/pkg/services/eventpublisher" "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/search" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/social" + + "github.com/grafana/grafana/pkg/services/alerting" + _ "github.com/grafana/grafana/pkg/services/alerting/conditions" + _ "github.com/grafana/grafana/pkg/services/alerting/notifiers" + _ "github.com/grafana/grafana/pkg/tsdb/graphite" + _ "github.com/grafana/grafana/pkg/tsdb/prometheus" + _ "github.com/grafana/grafana/pkg/tsdb/testdata" ) var version = "3.1.0" @@ -57,26 +66,41 @@ func main() { setting.BuildCommit = commit setting.BuildStamp = buildstampInt64 - go listenToSystemSignals() + appContext, shutdownFn := context.WithCancel(context.Background()) + grafanaGroup, appContext := errgroup.WithContext(appContext) + + go listenToSystemSignals(shutdownFn, grafanaGroup) flag.Parse() writePIDFile() + initRuntime() + initSql() metrics.Init() search.Init() login.Init() social.NewOAuthService() eventpublisher.Init() plugins.Init() - alertingInit.Init() - backgroundtasks.Init() + + // init alerting + if setting.AlertingEnabled { + engine := alerting.NewEngine() + grafanaGroup.Go(func() error { return engine.Run(appContext) }) + } + + // cleanup service + cleanUpService := cleanup.NewCleanUpService() + grafanaGroup.Go(func() error { return cleanUpService.Run(appContext) }) if err := notifications.Init(); err != nil { log.Fatal(3, "Notification service failed to initialize", err) } - StartServer() - exitChan <- 0 + exitCode := StartServer() + + grafanaGroup.Wait() + exitChan <- exitCode } func initRuntime() { @@ -94,7 +118,9 @@ func initRuntime() { logger.Info("Starting Grafana", "version", version, "commit", commit, "compiled", time.Unix(setting.BuildStamp, 0)) setting.LogConfigurationInfo() +} +func initSql() { sqlstore.NewEngine() sqlstore.EnsureAdminUser() } @@ -117,7 +143,7 @@ func writePIDFile() { } } -func listenToSystemSignals() { +func listenToSystemSignals(cancel context.CancelFunc, grafanaGroup *errgroup.Group) { signalChan := make(chan os.Signal, 1) code := 0 @@ -125,7 +151,7 @@ func listenToSystemSignals() { select { case sig := <-signalChan: - log.Info("Received signal %s. shutting down", sig) + log.Info2("Received system signal. Shutting down", "signal", sig) case code = <-exitChan: switch code { case 0: @@ -135,6 +161,8 @@ func listenToSystemSignals() { } } + cancel() + grafanaGroup.Wait() log.Close() os.Exit(code) } diff --git a/pkg/cmd/grafana-server/web.go b/pkg/cmd/grafana-server/web.go index 4c294a814ca..44177ccc5b4 100644 --- a/pkg/cmd/grafana-server/web.go +++ b/pkg/cmd/grafana-server/web.go @@ -6,7 +6,6 @@ package main import ( "fmt" "net/http" - "os" "path" "gopkg.in/macaron.v1" @@ -79,7 +78,7 @@ func mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) { )) } -func StartServer() { +func StartServer() int { logger = log.New("server") var err error @@ -95,11 +94,13 @@ func StartServer() { err = http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m) default: logger.Error("Invalid protocol", "protocol", setting.Protocol) - os.Exit(1) + return 1 } if err != nil { logger.Error("Fail to start server", "error", err) - os.Exit(1) + return 1 } + + return 0 } diff --git a/pkg/models/dashboard_snapshot.go b/pkg/models/dashboard_snapshot.go index 4a4d1290b6d..57c5524ace8 100644 --- a/pkg/models/dashboard_snapshot.go +++ b/pkg/models/dashboard_snapshot.go @@ -63,6 +63,9 @@ type DeleteDashboardSnapshotCommand struct { DeleteKey string `json:"-"` } +type DeleteExpiredSnapshotsCommand struct { +} + type GetDashboardSnapshotQuery struct { Key string diff --git a/pkg/models/timer.go b/pkg/models/timer.go deleted file mode 100644 index 6cbd7ed29d5..00000000000 --- a/pkg/models/timer.go +++ /dev/null @@ -1,7 +0,0 @@ -package models - -import "time" - -type HourCommand struct { - Time time.Time -} diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go index 19befe87ed8..10b8af64119 100644 --- a/pkg/services/alerting/engine.go +++ b/pkg/services/alerting/engine.go @@ -1,10 +1,12 @@ package alerting import ( + "context" "time" "github.com/benbjohnson/clock" "github.com/grafana/grafana/pkg/log" + "golang.org/x/sync/errgroup" ) type Engine struct { @@ -34,12 +36,19 @@ func NewEngine() *Engine { return e } -func (e *Engine) Start() { - e.log.Info("Starting Alerting Engine") +func (e *Engine) Run(ctx context.Context) error { + e.log.Info("Initializing Alerting") - go e.alertingTicker() - go e.execDispatcher() - go e.resultDispatcher() + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { return e.alertingTicker(ctx) }) + g.Go(func() error { return e.execDispatcher(ctx) }) + g.Go(func() error { return e.resultDispatcher(ctx) }) + + err := g.Wait() + + e.log.Info("Stopped Alerting", "reason", err) + return err } func (e *Engine) Stop() { @@ -47,7 +56,7 @@ func (e *Engine) Stop() { close(e.resultQueue) } -func (e *Engine) alertingTicker() { +func (e *Engine) alertingTicker(grafanaCtx context.Context) error { defer func() { if err := recover(); err != nil { e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1)) @@ -58,6 +67,8 @@ func (e *Engine) alertingTicker() { for { select { + case <-grafanaCtx.Done(): + return grafanaCtx.Err() case tick := <-e.ticker.C: // TEMP SOLUTION update rules ever tenth tick if tickIndex%10 == 0 { @@ -70,31 +81,59 @@ func (e *Engine) alertingTicker() { } } -func (e *Engine) execDispatcher() { - for job := range e.execQueue { - e.log.Debug("Starting executing alert rule", "alert id", job.Rule.Id) - go e.executeJob(job) +func (e *Engine) execDispatcher(grafanaCtx context.Context) error { + for { + select { + case <-grafanaCtx.Done(): + close(e.resultQueue) + return grafanaCtx.Err() + case job := <-e.execQueue: + go e.executeJob(grafanaCtx, job) + } } } -func (e *Engine) executeJob(job *Job) { +func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error { defer func() { if err := recover(); err != nil { e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1)) } }() - job.Running = true - context := NewEvalContext(job.Rule) - e.evalHandler.Eval(context) - job.Running = false + done := make(chan *EvalContext, 1) + go func() { + job.Running = true + context := NewEvalContext(job.Rule) + e.evalHandler.Eval(context) + job.Running = false + done <- context + close(done) + }() - e.resultQueue <- context + select { + + case <-grafanaCtx.Done(): + return grafanaCtx.Err() + case evalContext := <-done: + e.resultQueue <- evalContext + } + + return nil } -func (e *Engine) resultDispatcher() { - for result := range e.resultQueue { - go e.handleResponse(result) +func (e *Engine) resultDispatcher(grafanaCtx context.Context) error { + for { + select { + case <-grafanaCtx.Done(): + //handle all responses before shutting down. + for result := range e.resultQueue { + e.handleResponse(result) + } + + return grafanaCtx.Err() + case result := <-e.resultQueue: + e.handleResponse(result) + } } } diff --git a/pkg/services/alerting/init/init.go b/pkg/services/alerting/init/init.go index 94f97a41905..b0e247d5c0a 100644 --- a/pkg/services/alerting/init/init.go +++ b/pkg/services/alerting/init/init.go @@ -1,6 +1,8 @@ package init import ( + "context" + "github.com/grafana/grafana/pkg/services/alerting" _ "github.com/grafana/grafana/pkg/services/alerting/conditions" _ "github.com/grafana/grafana/pkg/services/alerting/notifiers" @@ -10,13 +12,11 @@ import ( _ "github.com/grafana/grafana/pkg/tsdb/testdata" ) -var engine *alerting.Engine - -func Init() { +func Init(ctx context.Context) error { if !setting.AlertingEnabled { - return + return nil } engine = alerting.NewEngine() - engine.Start() + return engine.Start(ctx) } diff --git a/pkg/services/backgroundtasks/background_tasks.go b/pkg/services/backgroundtasks/background_tasks.go deleted file mode 100644 index 5c4a7d197a8..00000000000 --- a/pkg/services/backgroundtasks/background_tasks.go +++ /dev/null @@ -1,39 +0,0 @@ -//"I want to be a cleaner, just like you," said Mathilda -//"Okay," replied Leon - -package backgroundtasks - -import ( - "time" - - "github.com/grafana/grafana/pkg/bus" - "github.com/grafana/grafana/pkg/log" - "github.com/grafana/grafana/pkg/models" -) - -var ( - tlog log.Logger = log.New("ticker") -) - -func Init() { - go start() -} - -func start() { - go cleanup(time.Now()) - - ticker := time.NewTicker(time.Hour * 1) - for { - select { - case tick := <-ticker.C: - go cleanup(tick) - } - } -} - -func cleanup(now time.Time) { - err := bus.Publish(&models.HourCommand{Time: now}) - if err != nil { - tlog.Error("Cleanup job failed", "error", err) - } -} diff --git a/pkg/services/backgroundtasks/remove_tmp_images.go b/pkg/services/backgroundtasks/remove_tmp_images.go deleted file mode 100644 index d6048f09523..00000000000 --- a/pkg/services/backgroundtasks/remove_tmp_images.go +++ /dev/null @@ -1,38 +0,0 @@ -package backgroundtasks - -import ( - "io/ioutil" - "os" - "path" - - "github.com/grafana/grafana/pkg/bus" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/setting" -) - -func init() { - bus.AddEventListener(CleanTmpFiles) -} - -func CleanTmpFiles(cmd *models.HourCommand) error { - files, err := ioutil.ReadDir(setting.ImagesDir) - - var toDelete []os.FileInfo - for _, file := range files { - if file.ModTime().AddDate(0, 0, setting.RenderedImageTTLDays).Before(cmd.Time) { - toDelete = append(toDelete, file) - } - } - - for _, file := range toDelete { - fullPath := path.Join(setting.ImagesDir, file.Name()) - err := os.Remove(fullPath) - if err != nil { - return err - } - } - - tlog.Debug("Found old rendered image to delete", "deleted", len(toDelete), "keept", len(files)) - - return err -} diff --git a/pkg/services/cleanup/cleanup.go b/pkg/services/cleanup/cleanup.go new file mode 100644 index 00000000000..fcc75762fd4 --- /dev/null +++ b/pkg/services/cleanup/cleanup.go @@ -0,0 +1,81 @@ +package cleanup + +import ( + "context" + "io/ioutil" + "os" + "path" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/grafana/grafana/pkg/bus" + "github.com/grafana/grafana/pkg/log" + m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/setting" +) + +type CleanUpService struct { + log log.Logger +} + +func NewCleanUpService() *CleanUpService { + return &CleanUpService{ + log: log.New("cleanup"), + } +} + +func (service *CleanUpService) Run(ctx context.Context) error { + service.log.Info("Initializing CleanUpService") + + g, _ := errgroup.WithContext(ctx) + g.Go(func() error { return service.start(ctx) }) + + err := g.Wait() + service.log.Info("Stopped CleanUpService", "reason", err) + return err +} + +func (service *CleanUpService) start(ctx context.Context) error { + service.cleanUpTmpFiles() + + ticker := time.NewTicker(time.Hour * 1) + for { + select { + case <-ticker.C: + service.cleanUpTmpFiles() + service.deleteExpiredSnapshots() + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (service *CleanUpService) cleanUpTmpFiles() { + files, err := ioutil.ReadDir(setting.ImagesDir) + if err != nil { + service.log.Error("Problem reading image dir", "error", err) + return + } + + var toDelete []os.FileInfo + for _, file := range files { + if file.ModTime().AddDate(0, 0, 1).Before(time.Now()) { + toDelete = append(toDelete, file) + } + } + + for _, file := range toDelete { + fullPath := path.Join(setting.ImagesDir, file.Name()) + err := os.Remove(fullPath) + if err != nil { + service.log.Error("Failed to delete temp file", "file", file.Name(), "error", err) + } + } + + service.log.Debug("Found old rendered image to delete", "deleted", len(toDelete), "keept", len(files)) +} + +func (service *CleanUpService) deleteExpiredSnapshots() { + bus.Dispatch(&m.DeleteExpiredSnapshotsCommand{}) +} diff --git a/pkg/services/sqlstore/dashboard_snapshot.go b/pkg/services/sqlstore/dashboard_snapshot.go index 50a7ece05f3..bb3a4f8f57e 100644 --- a/pkg/services/sqlstore/dashboard_snapshot.go +++ b/pkg/services/sqlstore/dashboard_snapshot.go @@ -5,7 +5,6 @@ import ( "github.com/go-xorm/xorm" "github.com/grafana/grafana/pkg/bus" - "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" ) @@ -15,30 +14,24 @@ func init() { bus.AddHandler("sql", GetDashboardSnapshot) bus.AddHandler("sql", DeleteDashboardSnapshot) bus.AddHandler("sql", SearchDashboardSnapshots) - bus.AddEventListener(DeleteExpiredSnapshots) + bus.AddHandler("sql", DeleteExpiredSnapshots) } -func DeleteExpiredSnapshots(cmd *m.HourCommand) error { +func DeleteExpiredSnapshots(cmd *m.DeleteExpiredSnapshotsCommand) error { return inTransaction(func(sess *xorm.Session) error { var expiredCount int64 = 0 - var oldCount int64 = 0 if setting.SnapShotRemoveExpired { deleteExpiredSql := "DELETE FROM dashboard_snapshot WHERE expires < ?" - expiredResponse, err := x.Exec(deleteExpiredSql, cmd.Time) + expiredResponse, err := x.Exec(deleteExpiredSql, time.Now) if err != nil { return err } expiredCount, _ = expiredResponse.RowsAffected() } - oldSnapshotsSql := "DELETE FROM dashboard_snapshot WHERE created < ?" - oldResponse, err := x.Exec(oldSnapshotsSql, cmd.Time.AddDate(0, 0, setting.SnapShotTTLDays*-1)) - oldCount, _ = oldResponse.RowsAffected() - - log.Debug2("Deleted old/expired snaphots", "to old", oldCount, "expired", expiredCount) - - return err + sqlog.Debug("Deleted old/expired snaphots", "expired", expiredCount) + return nil }) } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 806e94dee98..d8a867a12b8 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -120,9 +120,8 @@ var ( IsWindows bool // PhantomJs Rendering - ImagesDir string - PhantomDir string - RenderedImageTTLDays int + ImagesDir string + PhantomDir string // for logging purposes configFiles []string @@ -543,9 +542,6 @@ func NewConfigContext(args *CommandLineArgs) error { ImagesDir = filepath.Join(DataPath, "png") PhantomDir = filepath.Join(HomePath, "vendor/phantomjs") - tmpFilesSection := Cfg.Section("tmp.files") - RenderedImageTTLDays = tmpFilesSection.Key("rendered_image_ttl_days").MustInt(14) - analytics := Cfg.Section("analytics") ReportingEnabled = analytics.Key("reporting_enabled").MustBool(true) CheckForUpdates = analytics.Key("check_for_updates").MustBool(true) diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 00000000000..6a66aea5eaf --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 00000000000..733099041f8 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 00000000000..533438d91c1 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,67 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "sync" + + "golang.org/x/net/context" +) + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// +// The first call to return a non-nil error cancels the group; its error will be +// returned by Wait. +func (g *Group) Go(f func() error) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 05396911094..1c5a8962a49 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -57,6 +57,12 @@ "revisionTime": "2016-09-12T21:59:12Z" }, { + "checksumSHA1": "S0DP7Pn7sZUmXc55IzZnNvERu6s=", + "path": "golang.org/x/sync/errgroup", + "revision": "316e794f7b5e3df4e95175a45a5fb8b12f85cb4f", + "revisionTime": "2016-07-15T18:54:39Z" + }, + { "checksumSHA1": "PoHLopxwkiXxa3uVhezeq/qJ/Vo=", "path": "gopkg.in/guregu/null.v3", "revision": "41961cea0328defc5f95c1c473f89ebf0d1813f6",