diff --git a/pkg/services/searchV2/index.go b/pkg/services/searchV2/index.go index 2610e649ee0..3ad9249a131 100644 --- a/pkg/services/searchV2/index.go +++ b/pkg/services/searchV2/index.go @@ -50,6 +50,13 @@ type dashboard struct { info *extract.DashboardInfo } +// buildSignal is sent when search index is accessed in organization for which +// we have not constructed an index yet. +type buildSignal struct { + orgID int64 + done chan error +} + type dashboardIndex struct { mu sync.RWMutex loader dashboardLoader @@ -57,7 +64,7 @@ type dashboardIndex struct { perOrgWriter map[int64]*bluge.Writer // orgId -> bluge writer eventStore eventStore logger log.Logger - buildSignals chan int64 + buildSignals chan buildSignal extender DocumentExtender folderIdLookup folderUIDLookup syncCh chan chan struct{} @@ -70,7 +77,7 @@ func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender perOrgReader: map[int64]*bluge.Reader{}, perOrgWriter: map[int64]*bluge.Writer{}, logger: log.New("dashboardIndex"), - buildSignals: make(chan int64), + buildSignals: make(chan buildSignal), extender: extender, folderIdLookup: folderIDs, syncCh: make(chan chan struct{}), @@ -92,7 +99,7 @@ func (i *dashboardIndex) sync(ctx context.Context) error { } } -func (i *dashboardIndex) run(ctx context.Context) error { +func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error { reIndexInterval := 5 * time.Minute fullReIndexTimer := time.NewTimer(reIndexInterval) defer fullReIndexTimer.Stop() @@ -110,9 +117,9 @@ func (i *dashboardIndex) run(ctx context.Context) error { lastEventID = lastEvent.Id } - err = i.buildInitialIndex(ctx) + err = i.buildInitialIndexes(ctx, orgIDs) if err != nil { - return fmt.Errorf("can't build initial dashboard search index: %w", err) + return err } // Channel to handle signals about asynchronous full re-indexing completion. 
@@ -126,16 +133,26 @@ func (i *dashboardIndex) run(ctx context.Context) error { case <-partialUpdateTimer.C: lastEventID = i.applyIndexUpdates(ctx, lastEventID) partialUpdateTimer.Reset(partialUpdateInterval) - case orgID := <-i.buildSignals: + case signal := <-i.buildSignals: i.mu.RLock() - _, ok := i.perOrgWriter[orgID] + _, ok := i.perOrgWriter[signal.orgID] if ok { // Index for org already exists, do nothing. i.mu.RUnlock() + close(signal.done) continue } i.mu.RUnlock() - _, _ = i.buildOrgIndex(ctx, orgID) + lastIndexedEventID := lastEventID + // Prevent full re-indexing while we are building index for new org. + // Full re-indexing will be later re-started in `case lastIndexedEventID := <-reIndexDoneCh` + // branch. + fullReIndexTimer.Stop() + go func() { + _, err = i.buildOrgIndex(ctx, signal.orgID) + signal.done <- err + reIndexDoneCh <- lastIndexedEventID + }() case <-fullReIndexTimer.C: lastIndexedEventID := lastEventID go func() { @@ -164,20 +181,32 @@ func (i *dashboardIndex) run(ctx context.Context) error { } } -func (i *dashboardIndex) buildInitialIndex(ctx context.Context) error { +func (i *dashboardIndex) buildInitialIndexes(ctx context.Context, orgIDs []int64) error { + started := time.Now() + i.logger.Info("Start building in-memory indexes") + for _, orgID := range orgIDs { + err := i.buildInitialIndex(ctx, orgID) + if err != nil { + return fmt.Errorf("can't build initial dashboard search index for org %d: %w", orgID, err) + } + } + i.logger.Info("Finish building in-memory indexes", "elapsed", time.Since(started)) + return nil +} + +func (i *dashboardIndex) buildInitialIndex(ctx context.Context, orgID int64) error { memCtx, memCancel := context.WithCancel(ctx) if os.Getenv("GF_SEARCH_DEBUG") != "" { go i.debugMemStats(memCtx, 200*time.Millisecond) } - // Build on start for orgID 1 but keep lazy for others. 
started := time.Now() - numDashboards, err := i.buildOrgIndex(ctx, 1) + numDashboards, err := i.buildOrgIndex(ctx, orgID) if err != nil { memCancel() - return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err) + return fmt.Errorf("can't build dashboard search index for org ID %d: %w", orgID, err) } - i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards) + i.logger.Info("Indexing for org finished", "orgIndexElapsed", time.Since(started), "orgId", orgID, "numDashboards", numDashboards) memCancel() if os.Getenv("GF_SEARCH_DEBUG") != "" { @@ -185,7 +214,7 @@ func (i *dashboardIndex) buildInitialIndex(ctx context.Context) error { // match to a memory consumption, but at least make give some relative difference understanding. // Moreover, changes in indexing can cause additional memory consumption upon initial index build // which is not reflected here. - i.reportSizeOfIndexDiskBackup(1) + i.reportSizeOfIndexDiskBackup(orgID) } return nil } @@ -303,6 +332,33 @@ func (i *dashboardIndex) getOrgReader(orgID int64) (*bluge.Reader, bool) { return r, ok } +func (i *dashboardIndex) getOrCreateReader(ctx context.Context, orgID int64) (*bluge.Reader, error) { + reader, ok := i.getOrgReader(orgID) + if !ok { + // For non-main organizations, indexes are built lazily. + // If we don't have an index then we are blocking here until an index for + // an organization is ready. This actually takes time only during the first + // access; all subsequent search requests do not fall into this branch. 
+ doneIndexing := make(chan error, 1) + signal := buildSignal{orgID: orgID, done: doneIndexing} + select { + case i.buildSignals <- signal: + case <-ctx.Done(): + return nil, ctx.Err() + } + select { + case err := <-doneIndexing: + if err != nil { + return nil, err + } + case <-ctx.Done(): + return nil, ctx.Err() + } + reader, _ = i.getOrgReader(orgID) + } + return reader, nil +} + func (i *dashboardIndex) getOrgWriter(orgID int64) (*bluge.Writer, bool) { i.mu.RLock() defer i.mu.RUnlock() diff --git a/pkg/services/searchV2/service.go b/pkg/services/searchV2/service.go index e329b2bfa8b..1e92c7d5619 100644 --- a/pkg/services/searchV2/service.go +++ b/pkg/services/searchV2/service.go @@ -3,6 +3,7 @@ package searchV2 import ( "context" "errors" + "fmt" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" @@ -59,7 +60,16 @@ func (s *StandardSearchService) IsDisabled() bool { } func (s *StandardSearchService) Run(ctx context.Context) error { - return s.dashboardIndex.run(ctx) + orgQuery := &models.SearchOrgsQuery{} + err := s.sql.SearchOrgs(ctx, orgQuery) + if err != nil { + return fmt.Errorf("can't get org list: %w", err) + } + orgIDs := make([]int64, 0, len(orgQuery.Result)) + for _, org := range orgQuery.Result { + orgIDs = append(orgIDs, org.Id) + } + return s.dashboardIndex.run(ctx, orgIDs) } func (s *StandardSearchService) RegisterDashboardIndexExtender(ext DashboardIndexExtender) { @@ -143,20 +153,17 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back return rsp } + reader, err := s.dashboardIndex.getOrCreateReader(ctx, orgID) + if err != nil { + rsp.Error = err + return rsp + } + err = s.dashboardIndex.sync(ctx) if err != nil { rsp.Error = err return rsp } - reader, ok := s.dashboardIndex.getOrgReader(orgID) - if !ok { - go func() { - s.dashboardIndex.buildSignals <- orgID - }() - rsp.Error = errors.New("search index is not ready, try again later") - return rsp - } - return doSearchQuery(ctx, 
s.logger, reader, filter, q, s.extender.GetQueryExtender(q), s.cfg.AppSubURL) }