mirror of https://github.com/grafana/grafana.git

Search: Fix indexing - re-index after initial provisioning (#50959)

commit 120c92b950 (parent ee3f4f1709)
@@ -14,6 +14,7 @@ import (
 // DashboardProvisioner is responsible for syncing dashboard from disk to
 // Grafana's database.
 type DashboardProvisioner interface {
+    HasDashboardSources() bool
     Provision(ctx context.Context) error
     PollChanges(ctx context.Context)
     GetProvisionerResolvedPath(name string) string
@@ -33,6 +34,10 @@ type Provisioner struct {
     provisioner dashboards.DashboardProvisioningService
 }
 
+func (provider *Provisioner) HasDashboardSources() bool {
+    return len(provider.fileReaders) > 0
+}
+
 // New returns a new DashboardProvisioner
 func New(ctx context.Context, configDirectory string, provisioner dashboards.DashboardProvisioningService, orgStore utils.OrgStore, dashboardStore utils.DashboardStore) (DashboardProvisioner, error) {
     logger := log.New("provisioning.dashboard")
@@ -26,6 +26,10 @@ func NewDashboardProvisionerMock() *ProvisionerMock {
     }
 }
 
+func (dpm *ProvisionerMock) HasDashboardSources() bool {
+    return dpm.ProvisionFunc != nil
+}
+
 // Provision is a mock implementation of `Provisioner.Provision`
 func (dpm *ProvisionerMock) Provision(ctx context.Context) error {
     dpm.Calls.Provision = append(dpm.Calls.Provision, nil)
@@ -20,6 +20,7 @@ import (
     "github.com/grafana/grafana/pkg/services/provisioning/notifiers"
     "github.com/grafana/grafana/pkg/services/provisioning/plugins"
     "github.com/grafana/grafana/pkg/services/provisioning/utils"
+    "github.com/grafana/grafana/pkg/services/searchV2"
     "github.com/grafana/grafana/pkg/services/sqlstore"
     "github.com/grafana/grafana/pkg/setting"
 )
@@ -30,6 +31,7 @@ func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, pluginStore p
     datasourceService datasourceservice.DataSourceService,
     dashboardService dashboardservice.DashboardService,
     alertingService *alerting.AlertNotificationService, pluginSettings pluginsettings.Service,
+    searchService searchV2.SearchService,
 ) (*ProvisioningServiceImpl, error) {
     s := &ProvisioningServiceImpl{
         Cfg: cfg,
@@ -47,6 +49,7 @@ func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, pluginStore p
         datasourceService: datasourceService,
         alertingService: alertingService,
         pluginsSettings: pluginSettings,
+        searchService: searchService,
     }
     return s, nil
 }
@@ -108,6 +111,7 @@ type ProvisioningServiceImpl struct {
     datasourceService datasourceservice.DataSourceService
     alertingService *alerting.AlertNotificationService
     pluginsSettings pluginsettings.Service
+    searchService searchV2.SearchService
 }
 
 func (ps *ProvisioningServiceImpl) RunInitProvisioners(ctx context.Context) error {
@@ -135,6 +139,9 @@ func (ps *ProvisioningServiceImpl) Run(ctx context.Context) error {
         ps.log.Error("Failed to provision dashboard", "error", err)
         return err
     }
+    if ps.dashboardProvisioner.HasDashboardSources() {
+        ps.searchService.TriggerReIndex()
+    }
 
     for {
         // Wait for unlock. This is tied to new dashboardProvisioner to be instantiated before we start polling.
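Note: the three added lines above are the heart of this commit: once the initial provisioning pass has finished, the provisioning service checks whether any file-based dashboard sources exist and, only then, asks the search service for a full re-index. A minimal, self-contained sketch of that hand-off follows; the reIndexer interface, searchStub, and provisioner types are illustrative stand-ins for the real Grafana services, not code from this change.

    package main

    import "fmt"

    // reIndexer is a stand-in for the searchV2.SearchService dependency added in this commit.
    type reIndexer interface {
        TriggerReIndex()
    }

    type searchStub struct{}

    func (searchStub) TriggerReIndex() { fmt.Println("re-index triggered") }

    // provisioner mimics the DashboardProvisioner: it only reports sources after provisioning ran.
    type provisioner struct{ sources int }

    func (p *provisioner) Provision() error          { p.sources = 2; return nil }
    func (p *provisioner) HasDashboardSources() bool { return p.sources > 0 }

    func run(p *provisioner, search reIndexer) error {
        if err := p.Provision(); err != nil {
            return err
        }
        // Only re-index when provisioning actually produced dashboards to pick up.
        if p.HasDashboardSources() {
            search.TriggerReIndex()
        }
        return nil
    }

    func main() {
        if err := run(&provisioner{}, searchStub{}); err != nil {
            fmt.Println("error:", err)
        }
    }

Presumably the HasDashboardSources() gate exists so that installations without provisioned dashboards skip the extra full re-index at startup.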
@@ -101,7 +101,7 @@ func (i *dashboardIndex) sync(ctx context.Context) error {
     }
 }
 
-func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error {
+func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh chan struct{}) error {
     reIndexInterval := 5 * time.Minute
     fullReIndexTimer := time.NewTimer(reIndexInterval)
     defer fullReIndexTimer.Stop()
@@ -124,18 +124,28 @@ func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error {
         return err
     }
 
+    // This semaphore channel allows limiting concurrent async re-indexing routines to 1.
+    asyncReIndexSemaphore := make(chan struct{}, 1)
+
     // Channel to handle signals about asynchronous full re-indexing completion.
     reIndexDoneCh := make(chan int64, 1)
 
     for {
         select {
         case doneCh := <-i.syncCh:
+            // Executed on search read requests to make sure index is consistent.
             lastEventID = i.applyIndexUpdates(ctx, lastEventID)
             close(doneCh)
         case <-partialUpdateTimer.C:
+            // Periodically apply updates collected in entity events table.
             lastEventID = i.applyIndexUpdates(ctx, lastEventID)
             partialUpdateTimer.Reset(partialUpdateInterval)
+        case <-reIndexSignalCh:
+            // External systems may trigger re-indexing, at this moment provisioning does this.
+            i.logger.Info("Full re-indexing due to external signal")
+            fullReIndexTimer.Reset(0)
        case signal := <-i.buildSignals:
+            // When search read request meets new not-indexed org we build index for it.
             i.mu.RLock()
             _, ok := i.perOrgWriter[signal.orgID]
             if ok {
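The new reIndexSignalCh case above does not rebuild the index inline; it only resets fullReIndexTimer to zero so that the existing periodic re-index branch fires on the next loop iteration and all rebuild work stays in one place. A small standalone sketch of that pattern, with illustrative names and a made-up stop channel:

    package main

    import (
        "fmt"
        "time"
    )

    // runLoop waits for either a periodic tick or an external signal; the signal
    // simply forces the timer to fire immediately so the rebuild stays in one branch.
    func runLoop(signalCh <-chan struct{}, stop <-chan struct{}) {
        const interval = 5 * time.Minute
        timer := time.NewTimer(interval)
        defer timer.Stop()

        for {
            select {
            case <-signalCh:
                // External trigger: make the periodic branch run right away.
                timer.Reset(0)
            case <-timer.C:
                fmt.Println("full rebuild") // placeholder for the real re-index work
                timer.Reset(interval)
            case <-stop:
                return
            }
        }
    }

    func main() {
        signalCh := make(chan struct{}, 1)
        stop := make(chan struct{})
        go runLoop(signalCh, stop)

        signalCh <- struct{}{} // simulate an external re-index request
        time.Sleep(100 * time.Millisecond)
        close(stop)
    }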
@@ -151,15 +161,28 @@ func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error {
             // branch.
             fullReIndexTimer.Stop()
             go func() {
+                // We need semaphore here since asynchronous re-indexing may be in progress already.
+                asyncReIndexSemaphore <- struct{}{}
+                defer func() { <-asyncReIndexSemaphore }()
                 _, err = i.buildOrgIndex(ctx, signal.orgID)
                 signal.done <- err
                 reIndexDoneCh <- lastIndexedEventID
             }()
         case <-fullReIndexTimer.C:
+            // Periodically rebuild indexes since we could miss updates. At this moment we are issuing
+            // entity events non-atomically (outside of transaction) and do not cover all possible dashboard
+            // change places, so periodic re-indexing fixes possibly broken state. But ideally we should
+            // come to an approach which does not require periodic re-indexing at all. One possible way
+            // is to use DB triggers, see https://github.com/grafana/grafana/pull/47712.
             lastIndexedEventID := lastEventID
             go func() {
                 // Do full re-index asynchronously to avoid blocking index synchronization
                 // on read for a long time.
+
+                // We need semaphore here since re-indexing due to build signal may be in progress already.
+                asyncReIndexSemaphore <- struct{}{}
+                defer func() { <-asyncReIndexSemaphore }()
+
                 started := time.Now()
                 i.logger.Info("Start re-indexing")
                 i.reIndexFromScratch(ctx)
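asyncReIndexSemaphore is a plain buffered channel of capacity 1 used as a semaphore: each re-indexing goroutine sends a token before doing work and receives it back when done, so at most one asynchronous re-index runs at a time and later requests queue behind it. A rough illustration of the idiom outside of Grafana's code; the sleep stands in for the actual rebuild:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        // Buffered channel with capacity 1: a "token" sitting in the channel
        // means a re-index is in flight; any other goroutine blocks on send.
        semaphore := make(chan struct{}, 1)

        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()

                semaphore <- struct{}{}        // acquire
                defer func() { <-semaphore }() // release

                fmt.Printf("re-index %d started\n", id)
                time.Sleep(50 * time.Millisecond) // stand-in for the actual rebuild
                fmt.Printf("re-index %d finished\n", id)
            }(i)
        }
        wg.Wait()
    }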
@@ -28,6 +28,7 @@ type StandardSearchService struct {
     logger log.Logger
     dashboardIndex *dashboardIndex
     extender DashboardIndexExtender
+    reIndexCh chan struct{}
 }
 
 func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService, ac accesscontrol.AccessControl) SearchService {
@@ -46,8 +47,9 @@ func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore s
             extender.GetDocumentExtender(),
             newFolderIDLookup(sql),
         ),
         logger: log.New("searchV2"),
         extender: extender,
+        reIndexCh: make(chan struct{}, 1),
     }
     return s
 }
@@ -69,7 +71,15 @@ func (s *StandardSearchService) Run(ctx context.Context) error {
     for _, org := range orgQuery.Result {
         orgIDs = append(orgIDs, org.Id)
     }
-    return s.dashboardIndex.run(ctx, orgIDs)
+    return s.dashboardIndex.run(ctx, orgIDs, s.reIndexCh)
+}
+
+func (s *StandardSearchService) TriggerReIndex() {
+    select {
+    case s.reIndexCh <- struct{}{}:
+    default:
+        // channel is full => re-index will happen soon anyway.
+    }
 }
 
 func (s *StandardSearchService) RegisterDashboardIndexExtender(ext DashboardIndexExtender) {
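TriggerReIndex performs a non-blocking send into the 1-slot reIndexCh buffer: if a request is already pending, the default branch drops the new one, because the queued signal will lead to a full re-index soon anyway. A compact sketch of this coalescing trick with hypothetical names:

    package main

    import "fmt"

    // trigger requests work without blocking the caller; repeated calls made
    // before the worker drains the channel collapse into a single pending request.
    func trigger(ch chan struct{}) bool {
        select {
        case ch <- struct{}{}:
            return true // request queued
        default:
            return false // already pending, nothing to do
        }
    }

    func main() {
        reIndexCh := make(chan struct{}, 1)

        fmt.Println(trigger(reIndexCh)) // true: queued
        fmt.Println(trigger(reIndexCh)) // false: coalesced with the pending one

        <-reIndexCh                     // worker consumes the single pending request
        fmt.Println(trigger(reIndexCh)) // true again
    }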
@@ -10,6 +10,10 @@ import (
 type stubSearchService struct {
 }
 
+func (s *stubSearchService) TriggerReIndex() {
+    // noop.
+}
+
 func NewStubSearchService() SearchService {
     return &stubSearchService{}
 }
@@ -34,4 +34,5 @@ type SearchService interface {
     registry.BackgroundService
     DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse
     RegisterDashboardIndexExtender(ext DashboardIndexExtender)
+    TriggerReIndex()
 }