diff --git a/docs/sources/http_api/admin.md b/docs/sources/http_api/admin.md index 1cb20a7f7e9..f96a69331fc 100644 --- a/docs/sources/http_api/admin.md +++ b/docs/sources/http_api/admin.md @@ -447,3 +447,36 @@ Content-Type: application/json "message": "User auth token revoked" } ``` + +## Reload provisioning configurations + +`POST /api/admin/provisioning/dashboards/reload` + +`POST /api/admin/provisioning/datasources/reload` + +`POST /api/admin/provisioning/notifications/reload` + +Reloads the provisioning config files for specified type and provision entities again. It won't return +until the new provisioned entities are already stored in the database. In case of dashboards, it will stop +polling for changes in dashboard files and then restart it with new configs after returning. + +Only works with Basic Authentication (username and password). See [introduction](http://docs.grafana.org/http_api/admin/#admin-api) for an explanation. + +**Example Request**: + +```http +POST /api/admin/provisioning/dashboards/reload HTTP/1.1 +Accept: application/json +Content-Type: application/json +``` + +**Example Response**: + +```http +HTTP/1.1 200 +Content-Type: application/json + +{ + "message": "Dashboards config reloaded" +} +``` diff --git a/pkg/api/admin_provisioning.go b/pkg/api/admin_provisioning.go new file mode 100644 index 00000000000..cdbdeda31b0 --- /dev/null +++ b/pkg/api/admin_provisioning.go @@ -0,0 +1,30 @@ +package api + +import ( + "context" + "github.com/grafana/grafana/pkg/models" +) + +func (server *HTTPServer) AdminProvisioningReloadDasboards(c *models.ReqContext) Response { + err := server.ProvisioningService.ProvisionDashboards() + if err != nil && err != context.Canceled { + return Error(500, "", err) + } + return Success("Dashboards config reloaded") +} + +func (server *HTTPServer) AdminProvisioningReloadDatasources(c *models.ReqContext) Response { + err := server.ProvisioningService.ProvisionDatasources() + if err != nil { + return Error(500, "", err) + } + return Success("Datasources config reloaded") +} + +func (server *HTTPServer) AdminProvisioningReloadNotifications(c *models.ReqContext) Response { + err := server.ProvisioningService.ProvisionNotifications() + if err != nil { + return Error(500, "", err) + } + return Success("Notifications config reloaded") +} diff --git a/pkg/api/api.go b/pkg/api/api.go index 82f75014ca4..a727ed15e96 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -387,6 +387,10 @@ func (hs *HTTPServer) registerRoutes() { adminRoute.Post("/users/:id/logout", Wrap(hs.AdminLogoutUser)) adminRoute.Get("/users/:id/auth-tokens", Wrap(hs.AdminGetUserAuthTokens)) adminRoute.Post("/users/:id/revoke-auth-token", bind(m.RevokeAuthTokenCmd{}), Wrap(hs.AdminRevokeUserAuthToken)) + + adminRoute.Post("/provisioning/dashboards/reload", Wrap(hs.AdminProvisioningReloadDasboards)) + adminRoute.Post("/provisioning/datasources/reload", Wrap(hs.AdminProvisioningReloadDatasources)) + adminRoute.Post("/provisioning/notifications/reload", Wrap(hs.AdminProvisioningReloadNotifications)) }, reqGrafanaAdmin) // rendering diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index 2efa22c34fb..a8e92014a8f 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/grafana/pkg/services/cache" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/hooks" + "github.com/grafana/grafana/pkg/services/provisioning" "github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/services/rendering" "github.com/grafana/grafana/pkg/setting" @@ -48,16 +49,17 @@ type HTTPServer struct { streamManager *live.StreamManager httpSrv *http.Server - RouteRegister routing.RouteRegister `inject:""` - Bus bus.Bus `inject:""` - RenderService rendering.Service `inject:""` - Cfg *setting.Cfg `inject:""` - HooksService *hooks.HooksService `inject:""` - CacheService *cache.CacheService `inject:""` - DatasourceCache datasources.CacheService `inject:""` - AuthTokenService models.UserTokenService `inject:""` - QuotaService *quota.QuotaService `inject:""` - RemoteCacheService *remotecache.RemoteCache `inject:""` + RouteRegister routing.RouteRegister `inject:""` + Bus bus.Bus `inject:""` + RenderService rendering.Service `inject:""` + Cfg *setting.Cfg `inject:""` + HooksService *hooks.HooksService `inject:""` + CacheService *cache.CacheService `inject:""` + DatasourceCache datasources.CacheService `inject:""` + AuthTokenService models.UserTokenService `inject:""` + QuotaService *quota.QuotaService `inject:""` + RemoteCacheService *remotecache.RemoteCache `inject:""` + ProvisioningService provisioning.ProvisioningService `inject:""` } func (hs *HTTPServer) Init() error { diff --git a/pkg/services/provisioning/dashboards/dashboard.go b/pkg/services/provisioning/dashboards/dashboard.go index a856565bf01..b7e5539c3b0 100644 --- a/pkg/services/provisioning/dashboards/dashboard.go +++ b/pkg/services/provisioning/dashboards/dashboard.go @@ -3,44 +3,79 @@ package dashboards import ( "context" "fmt" - "github.com/grafana/grafana/pkg/log" + "github.com/pkg/errors" ) -type DashboardProvisioner struct { - cfgReader *configReader - log log.Logger +type DashboardProvisioner interface { + Provision() error + PollChanges(ctx context.Context) } -func NewDashboardProvisioner(configDirectory string) *DashboardProvisioner { - log := log.New("provisioning.dashboard") - d := &DashboardProvisioner{ - cfgReader: &configReader{path: configDirectory, log: log}, - log: log, - } - - return d +type DashboardProvisionerImpl struct { + log log.Logger + fileReaders []*fileReader } -func (provider *DashboardProvisioner) Provision(ctx context.Context) error { - cfgs, err := provider.cfgReader.readConfig() +type DashboardProvisionerFactory func(string) (DashboardProvisioner, error) + +func NewDashboardProvisionerImpl(configDirectory string) (*DashboardProvisionerImpl, error) { + logger := log.New("provisioning.dashboard") + cfgReader := &configReader{path: configDirectory, log: logger} + configs, err := cfgReader.readConfig() + if err != nil { - return err + return nil, errors.Wrap(err, "Failed to read dashboards config") } - for _, cfg := range cfgs { - switch cfg.Type { - case "file": - fileReader, err := NewDashboardFileReader(cfg, provider.log.New("type", cfg.Type, "name", cfg.Name)) - if err != nil { - return err - } + fileReaders, err := getFileReaders(configs, logger) - go fileReader.ReadAndListen(ctx) - default: - return fmt.Errorf("type %s is not supported", cfg.Type) + if err != nil { + return nil, errors.Wrap(err, "Failed to initialize file readers") + } + + d := &DashboardProvisionerImpl{ + log: logger, + fileReaders: fileReaders, + } + + return d, nil +} + +func (provider *DashboardProvisionerImpl) Provision() error { + for _, reader := range provider.fileReaders { + err := reader.startWalkingDisk() + if err != nil { + return errors.Wrapf(err, "Failed to provision config %v", reader.Cfg.Name) } } return nil } + +// PollChanges starts polling for changes in dashboard definition files. It creates goroutine for each provider +// defined in the config. +func (provider *DashboardProvisionerImpl) PollChanges(ctx context.Context) { + for _, reader := range provider.fileReaders { + go reader.pollChanges(ctx) + } +} + +func getFileReaders(configs []*DashboardsAsConfig, logger log.Logger) ([]*fileReader, error) { + var readers []*fileReader + + for _, config := range configs { + switch config.Type { + case "file": + fileReader, err := NewDashboardFileReader(config, logger.New("type", config.Type, "name", config.Name)) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create file reader for config %v", config.Name) + } + readers = append(readers, fileReader) + default: + return nil, fmt.Errorf("type %s is not supported", config.Type) + } + } + + return readers, nil +} diff --git a/pkg/services/provisioning/dashboards/dashboard_mock.go b/pkg/services/provisioning/dashboards/dashboard_mock.go new file mode 100644 index 00000000000..5cdaab9be70 --- /dev/null +++ b/pkg/services/provisioning/dashboards/dashboard_mock.go @@ -0,0 +1,36 @@ +package dashboards + +import "context" + +type Calls struct { + Provision []interface{} + PollChanges []interface{} +} + +type DashboardProvisionerMock struct { + Calls *Calls + ProvisionFunc func() error + PollChangesFunc func(ctx context.Context) +} + +func NewDashboardProvisionerMock() *DashboardProvisionerMock { + return &DashboardProvisionerMock{ + Calls: &Calls{}, + } +} + +func (dpm *DashboardProvisionerMock) Provision() error { + dpm.Calls.Provision = append(dpm.Calls.Provision, nil) + if dpm.ProvisionFunc != nil { + return dpm.ProvisionFunc() + } else { + return nil + } +} + +func (dpm *DashboardProvisionerMock) PollChanges(ctx context.Context) { + dpm.Calls.PollChanges = append(dpm.Calls.PollChanges, ctx) + if dpm.PollChangesFunc != nil { + dpm.PollChangesFunc(ctx) + } +} diff --git a/pkg/services/provisioning/dashboards/file_reader.go b/pkg/services/provisioning/dashboards/file_reader.go index b12057c963b..62709ce6ba9 100644 --- a/pkg/services/provisioning/dashboards/file_reader.go +++ b/pkg/services/provisioning/dashboards/file_reader.go @@ -51,35 +51,25 @@ func NewDashboardFileReader(cfg *DashboardsAsConfig, log log.Logger) (*fileReade }, nil } -func (fr *fileReader) ReadAndListen(ctx context.Context) error { - if err := fr.startWalkingDisk(); err != nil { - fr.log.Error("failed to search for dashboards", "error", err) - } - - ticker := time.NewTicker(time.Duration(int64(time.Second) * fr.Cfg.UpdateIntervalSeconds)) - - running := false - +// pollChanges periodically runs startWalkingDisk based on interval specified in the config. +func (fr *fileReader) pollChanges(ctx context.Context) { + ticker := time.Tick(time.Duration(int64(time.Second) * fr.Cfg.UpdateIntervalSeconds)) for { select { - case <-ticker.C: - if !running { // avoid walking the filesystem in parallel. in-case fs is very slow. - running = true - go func() { - if err := fr.startWalkingDisk(); err != nil { - fr.log.Error("failed to search for dashboards", "error", err) - } - running = false - }() + case <-ticker: + if err := fr.startWalkingDisk(); err != nil { + fr.log.Error("failed to search for dashboards", "error", err) } case <-ctx.Done(): - return nil + return } } } -// startWalkingDisk finds and saves dashboards on disk. +// startWalkingDisk traverses the file system for defined path, reads dashboard definition files and applies any change +// to the database. func (fr *fileReader) startWalkingDisk() error { + fr.log.Debug("Start walking disk", "path", fr.Path) resolvedPath := fr.resolvePath(fr.Path) if _, err := os.Stat(resolvedPath); err != nil { if os.IsNotExist(err) { diff --git a/pkg/services/provisioning/provisioning.go b/pkg/services/provisioning/provisioning.go index 45f0972b885..21f61977620 100644 --- a/pkg/services/provisioning/provisioning.go +++ b/pkg/services/provisioning/provisioning.go @@ -2,8 +2,10 @@ package provisioning import ( "context" - "fmt" + "github.com/grafana/grafana/pkg/log" + "github.com/pkg/errors" "path" + "sync" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/provisioning/dashboards" @@ -13,35 +15,122 @@ import ( ) func init() { - registry.RegisterService(&ProvisioningService{}) + registry.RegisterService(NewProvisioningServiceImpl( + func(path string) (dashboards.DashboardProvisioner, error) { + return dashboards.NewDashboardProvisionerImpl(path) + }, + notifiers.Provision, + datasources.Provision, + )) } -type ProvisioningService struct { - Cfg *setting.Cfg `inject:""` +type ProvisioningService interface { + ProvisionDatasources() error + ProvisionNotifications() error + ProvisionDashboards() error } -func (ps *ProvisioningService) Init() error { - datasourcePath := path.Join(ps.Cfg.ProvisioningPath, "datasources") - if err := datasources.Provision(datasourcePath); err != nil { - return fmt.Errorf("Datasource provisioning error: %v", err) +func NewProvisioningServiceImpl( + newDashboardProvisioner dashboards.DashboardProvisionerFactory, + provisionNotifiers func(string) error, + provisionDatasources func(string) error, +) *provisioningServiceImpl { + return &provisioningServiceImpl{ + log: log.New("provisioning"), + newDashboardProvisioner: newDashboardProvisioner, + provisionNotifiers: provisionNotifiers, + provisionDatasources: provisionDatasources, + } +} + +type provisioningServiceImpl struct { + Cfg *setting.Cfg `inject:""` + log log.Logger + pollingCtxCancel context.CancelFunc + newDashboardProvisioner dashboards.DashboardProvisionerFactory + dashboardProvisioner dashboards.DashboardProvisioner + provisionNotifiers func(string) error + provisionDatasources func(string) error + mutex sync.Mutex +} + +func (ps *provisioningServiceImpl) Init() error { + err := ps.ProvisionDatasources() + if err != nil { + return err } - alertNotificationsPath := path.Join(ps.Cfg.ProvisioningPath, "notifiers") - if err := notifiers.Provision(alertNotificationsPath); err != nil { - return fmt.Errorf("Alert notification provisioning error: %v", err) + err = ps.ProvisionNotifications() + if err != nil { + return err + } + + err = ps.ProvisionDashboards() + if err != nil { + return err } return nil } -func (ps *ProvisioningService) Run(ctx context.Context) error { - dashboardPath := path.Join(ps.Cfg.ProvisioningPath, "dashboards") - dashProvisioner := dashboards.NewDashboardProvisioner(dashboardPath) +func (ps *provisioningServiceImpl) Run(ctx context.Context) error { + for { - if err := dashProvisioner.Provision(ctx); err != nil { - return err + // Wait for unlock. This is tied to new dashboardProvisioner to be instantiated before we start polling. + ps.mutex.Lock() + pollingContext, cancelFun := context.WithCancel(ctx) + ps.pollingCtxCancel = cancelFun + ps.dashboardProvisioner.PollChanges(pollingContext) + ps.mutex.Unlock() + + select { + case <-pollingContext.Done(): + // Polling was canceled. + continue + case <-ctx.Done(): + // Root server context was cancelled so just leave. + return ctx.Err() + } + } +} + +func (ps *provisioningServiceImpl) ProvisionDatasources() error { + datasourcePath := path.Join(ps.Cfg.ProvisioningPath, "datasources") + err := ps.provisionDatasources(datasourcePath) + return errors.Wrap(err, "Datasource provisioning error") +} + +func (ps *provisioningServiceImpl) ProvisionNotifications() error { + alertNotificationsPath := path.Join(ps.Cfg.ProvisioningPath, "notifiers") + err := ps.provisionNotifiers(alertNotificationsPath) + return errors.Wrap(err, "Alert notification provisioning error") +} + +func (ps *provisioningServiceImpl) ProvisionDashboards() error { + dashboardPath := path.Join(ps.Cfg.ProvisioningPath, "dashboards") + dashProvisioner, err := ps.newDashboardProvisioner(dashboardPath) + if err != nil { + return errors.Wrap(err, "Failed to create provisioner") } - <-ctx.Done() - return ctx.Err() + ps.mutex.Lock() + defer ps.mutex.Unlock() + + ps.cancelPolling() + + if err := dashProvisioner.Provision(); err != nil { + // If we fail to provision with the new provisioner, mutex will unlock and the polling we restart with the + // old provisioner as we did not switch them yet. + return errors.Wrap(err, "Failed to provision dashboards") + } + ps.dashboardProvisioner = dashProvisioner + return nil +} + +func (ps *provisioningServiceImpl) cancelPolling() { + if ps.pollingCtxCancel != nil { + ps.log.Debug("Stop polling for dashboard changes") + ps.pollingCtxCancel() + } + ps.pollingCtxCancel = nil } diff --git a/pkg/services/provisioning/provisioning_test.go b/pkg/services/provisioning/provisioning_test.go new file mode 100644 index 00000000000..73f207122f7 --- /dev/null +++ b/pkg/services/provisioning/provisioning_test.go @@ -0,0 +1,90 @@ +package provisioning + +import ( + "context" + "errors" + "github.com/grafana/grafana/pkg/services/provisioning/dashboards" + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestProvisioningServiceImpl(t *testing.T) { + t.Run("Restart dashboard provisioning and stop service", func(t *testing.T) { + service, mock := setup() + ctx, cancel := context.WithCancel(context.Background()) + var serviceRunning bool + var serviceError error + + err := service.ProvisionDashboards() + assert.Nil(t, err) + go func() { + serviceRunning = true + serviceError = service.Run(ctx) + serviceRunning = false + }() + time.Sleep(time.Millisecond) + assert.Equal(t, 1, len(mock.Calls.PollChanges), "PollChanges should have been called") + + err = service.ProvisionDashboards() + assert.Nil(t, err) + time.Sleep(time.Millisecond) + assert.Equal(t, 2, len(mock.Calls.PollChanges), "PollChanges should have been called 2 times") + + pollingCtx := mock.Calls.PollChanges[0].(context.Context) + assert.Equal(t, context.Canceled, pollingCtx.Err(), "Polling context from first call should have been cancelled") + assert.True(t, serviceRunning, "Service should be still running") + + // Cancelling the root context and stopping the service + cancel() + time.Sleep(time.Millisecond) + + assert.False(t, serviceRunning, "Service should not be running") + assert.Equal(t, context.Canceled, serviceError, "Service should have returned canceled error") + + }) + + t.Run("Failed reloading does not stop polling with old provisioned", func(t *testing.T) { + service, mock := setup() + ctx, cancel := context.WithCancel(context.Background()) + var serviceRunning bool + + err := service.ProvisionDashboards() + assert.Nil(t, err) + go func() { + serviceRunning = true + _ = service.Run(ctx) + serviceRunning = false + }() + time.Sleep(time.Millisecond) + assert.Equal(t, 1, len(mock.Calls.PollChanges), "PollChanges should have been called") + + mock.ProvisionFunc = func() error { + return errors.New("Test error") + } + err = service.ProvisionDashboards() + assert.NotNil(t, err) + time.Sleep(time.Millisecond) + // This should have been called with the old provisioner, after the last one failed. + assert.Equal(t, 2, len(mock.Calls.PollChanges), "PollChanges should have been called 2 times") + assert.True(t, serviceRunning, "Service should be still running") + + // Cancelling the root context and stopping the service + cancel() + + }) +} + +func setup() (*provisioningServiceImpl, *dashboards.DashboardProvisionerMock) { + dashMock := dashboards.NewDashboardProvisionerMock() + service := NewProvisioningServiceImpl( + func(path string) (dashboards.DashboardProvisioner, error) { + return dashMock, nil + }, + nil, + nil, + ) + service.Cfg = setting.NewCfg() + return service, dashMock +}