diff --git a/pkg/services/provisioning/dashboards/dashboard.go b/pkg/services/provisioning/dashboards/dashboard.go index 10b691bfda8..9ede9c2808f 100644 --- a/pkg/services/provisioning/dashboards/dashboard.go +++ b/pkg/services/provisioning/dashboards/dashboard.go @@ -14,6 +14,7 @@ import ( // DashboardProvisioner is responsible for syncing dashboard from disk to // Grafana's database. type DashboardProvisioner interface { + HasDashboardSources() bool Provision(ctx context.Context) error PollChanges(ctx context.Context) GetProvisionerResolvedPath(name string) string @@ -33,6 +34,10 @@ type Provisioner struct { provisioner dashboards.DashboardProvisioningService } +func (provider *Provisioner) HasDashboardSources() bool { + return len(provider.fileReaders) > 0 +} + // New returns a new DashboardProvisioner func New(ctx context.Context, configDirectory string, provisioner dashboards.DashboardProvisioningService, orgStore utils.OrgStore, dashboardStore utils.DashboardStore) (DashboardProvisioner, error) { logger := log.New("provisioning.dashboard") diff --git a/pkg/services/provisioning/dashboards/dashboard_mock.go b/pkg/services/provisioning/dashboards/dashboard_mock.go index 87e859a3cfb..c8988b48010 100644 --- a/pkg/services/provisioning/dashboards/dashboard_mock.go +++ b/pkg/services/provisioning/dashboards/dashboard_mock.go @@ -26,6 +26,10 @@ func NewDashboardProvisionerMock() *ProvisionerMock { } } +func (dpm *ProvisionerMock) HasDashboardSources() bool { + return dpm.ProvisionFunc != nil +} + // Provision is a mock implementation of `Provisioner.Provision` func (dpm *ProvisionerMock) Provision(ctx context.Context) error { dpm.Calls.Provision = append(dpm.Calls.Provision, nil) diff --git a/pkg/services/provisioning/provisioning.go b/pkg/services/provisioning/provisioning.go index b6ac3c82a06..e4d7ec292a1 100644 --- a/pkg/services/provisioning/provisioning.go +++ b/pkg/services/provisioning/provisioning.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/grafana/pkg/services/provisioning/notifiers" "github.com/grafana/grafana/pkg/services/provisioning/plugins" "github.com/grafana/grafana/pkg/services/provisioning/utils" + "github.com/grafana/grafana/pkg/services/searchV2" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/setting" ) @@ -30,6 +31,7 @@ func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, pluginStore p datasourceService datasourceservice.DataSourceService, dashboardService dashboardservice.DashboardService, alertingService *alerting.AlertNotificationService, pluginSettings pluginsettings.Service, + searchService searchV2.SearchService, ) (*ProvisioningServiceImpl, error) { s := &ProvisioningServiceImpl{ Cfg: cfg, @@ -47,6 +49,7 @@ func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, pluginStore p datasourceService: datasourceService, alertingService: alertingService, pluginsSettings: pluginSettings, + searchService: searchService, } return s, nil } @@ -108,6 +111,7 @@ type ProvisioningServiceImpl struct { datasourceService datasourceservice.DataSourceService alertingService *alerting.AlertNotificationService pluginsSettings pluginsettings.Service + searchService searchV2.SearchService } func (ps *ProvisioningServiceImpl) RunInitProvisioners(ctx context.Context) error { @@ -135,6 +139,9 @@ func (ps *ProvisioningServiceImpl) Run(ctx context.Context) error { ps.log.Error("Failed to provision dashboard", "error", err) return err } + if ps.dashboardProvisioner.HasDashboardSources() { + ps.searchService.TriggerReIndex() + } for { // Wait for unlock. This is tied to new dashboardProvisioner to be instantiated before we start polling. diff --git a/pkg/services/searchV2/index.go b/pkg/services/searchV2/index.go index 8a790cefa49..44cf9b92c29 100644 --- a/pkg/services/searchV2/index.go +++ b/pkg/services/searchV2/index.go @@ -101,7 +101,7 @@ func (i *dashboardIndex) sync(ctx context.Context) error { } } -func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error { +func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh chan struct{}) error { reIndexInterval := 5 * time.Minute fullReIndexTimer := time.NewTimer(reIndexInterval) defer fullReIndexTimer.Stop() @@ -124,18 +124,28 @@ func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error { return err } + // This semaphore channel allows limiting concurrent async re-indexing routines to 1. + asyncReIndexSemaphore := make(chan struct{}, 1) + // Channel to handle signals about asynchronous full re-indexing completion. reIndexDoneCh := make(chan int64, 1) for { select { case doneCh := <-i.syncCh: + // Executed on search read requests to make sure index is consistent. lastEventID = i.applyIndexUpdates(ctx, lastEventID) close(doneCh) case <-partialUpdateTimer.C: + // Periodically apply updates collected in entity events table. lastEventID = i.applyIndexUpdates(ctx, lastEventID) partialUpdateTimer.Reset(partialUpdateInterval) + case <-reIndexSignalCh: + // External systems may trigger re-indexing, at this moment provisioning does this. + i.logger.Info("Full re-indexing due to external signal") + fullReIndexTimer.Reset(0) case signal := <-i.buildSignals: + // When search read request meets new not-indexed org we build index for it. i.mu.RLock() _, ok := i.perOrgWriter[signal.orgID] if ok { @@ -151,15 +161,28 @@ func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error { // branch. fullReIndexTimer.Stop() go func() { + // We need semaphore here since asynchronous re-indexing may be in progress already. + asyncReIndexSemaphore <- struct{}{} + defer func() { <-asyncReIndexSemaphore }() _, err = i.buildOrgIndex(ctx, signal.orgID) signal.done <- err reIndexDoneCh <- lastIndexedEventID }() case <-fullReIndexTimer.C: + // Periodically rebuild indexes since we could miss updates. At this moment we are issuing + // entity events non-atomically (outside of transaction) and do not cover all possible dashboard + // change places, so periodic re-indexing fixes possibly broken state. But ideally we should + // come to an approach which does not require periodic re-indexing at all. One possible way + // is to use DB triggers, see https://github.com/grafana/grafana/pull/47712. lastIndexedEventID := lastEventID go func() { // Do full re-index asynchronously to avoid blocking index synchronization // on read for a long time. + + // We need semaphore here since re-indexing due to build signal may be in progress already. + asyncReIndexSemaphore <- struct{}{} + defer func() { <-asyncReIndexSemaphore }() + started := time.Now() i.logger.Info("Start re-indexing") i.reIndexFromScratch(ctx) diff --git a/pkg/services/searchV2/service.go b/pkg/services/searchV2/service.go index 1e92c7d5619..bfcf080a044 100644 --- a/pkg/services/searchV2/service.go +++ b/pkg/services/searchV2/service.go @@ -28,6 +28,7 @@ type StandardSearchService struct { logger log.Logger dashboardIndex *dashboardIndex extender DashboardIndexExtender + reIndexCh chan struct{} } func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService, ac accesscontrol.AccessControl) SearchService { @@ -46,8 +47,9 @@ func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore s extender.GetDocumentExtender(), newFolderIDLookup(sql), ), - logger: log.New("searchV2"), - extender: extender, + logger: log.New("searchV2"), + extender: extender, + reIndexCh: make(chan struct{}, 1), } return s } @@ -69,7 +71,15 @@ func (s *StandardSearchService) Run(ctx context.Context) error { for _, org := range orgQuery.Result { orgIDs = append(orgIDs, org.Id) } - return s.dashboardIndex.run(ctx, orgIDs) + return s.dashboardIndex.run(ctx, orgIDs, s.reIndexCh) +} + +func (s *StandardSearchService) TriggerReIndex() { + select { + case s.reIndexCh <- struct{}{}: + default: + // channel is full => re-index will happen soon anyway. + } } func (s *StandardSearchService) RegisterDashboardIndexExtender(ext DashboardIndexExtender) { diff --git a/pkg/services/searchV2/stub.go b/pkg/services/searchV2/stub.go index 9d0dc6a105d..83f4bc8cb34 100644 --- a/pkg/services/searchV2/stub.go +++ b/pkg/services/searchV2/stub.go @@ -10,6 +10,10 @@ import ( type stubSearchService struct { } +func (s *stubSearchService) TriggerReIndex() { + // noop. +} + func NewStubSearchService() SearchService { return &stubSearchService{} } diff --git a/pkg/services/searchV2/types.go b/pkg/services/searchV2/types.go index 29153079e6c..9be6e5737a5 100644 --- a/pkg/services/searchV2/types.go +++ b/pkg/services/searchV2/types.go @@ -34,4 +34,5 @@ type SearchService interface { registry.BackgroundService DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse RegisterDashboardIndexExtender(ext DashboardIndexExtender) + TriggerReIndex() }