mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Search: Build indexes on start for all orgs (#50762)
This commit is contained in:
parent
6e44b36a30
commit
54e9408cfd
@ -50,6 +50,13 @@ type dashboard struct {
|
|||||||
info *extract.DashboardInfo
|
info *extract.DashboardInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildSignal is sent when search index is accessed in organization for which
|
||||||
|
// we have not constructed an index yet.
|
||||||
|
type buildSignal struct {
|
||||||
|
orgID int64
|
||||||
|
done chan error
|
||||||
|
}
|
||||||
|
|
||||||
type dashboardIndex struct {
|
type dashboardIndex struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
loader dashboardLoader
|
loader dashboardLoader
|
||||||
@ -57,7 +64,7 @@ type dashboardIndex struct {
|
|||||||
perOrgWriter map[int64]*bluge.Writer // orgId -> bluge writer
|
perOrgWriter map[int64]*bluge.Writer // orgId -> bluge writer
|
||||||
eventStore eventStore
|
eventStore eventStore
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
buildSignals chan int64
|
buildSignals chan buildSignal
|
||||||
extender DocumentExtender
|
extender DocumentExtender
|
||||||
folderIdLookup folderUIDLookup
|
folderIdLookup folderUIDLookup
|
||||||
syncCh chan chan struct{}
|
syncCh chan chan struct{}
|
||||||
@ -70,7 +77,7 @@ func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender
|
|||||||
perOrgReader: map[int64]*bluge.Reader{},
|
perOrgReader: map[int64]*bluge.Reader{},
|
||||||
perOrgWriter: map[int64]*bluge.Writer{},
|
perOrgWriter: map[int64]*bluge.Writer{},
|
||||||
logger: log.New("dashboardIndex"),
|
logger: log.New("dashboardIndex"),
|
||||||
buildSignals: make(chan int64),
|
buildSignals: make(chan buildSignal),
|
||||||
extender: extender,
|
extender: extender,
|
||||||
folderIdLookup: folderIDs,
|
folderIdLookup: folderIDs,
|
||||||
syncCh: make(chan chan struct{}),
|
syncCh: make(chan chan struct{}),
|
||||||
@ -92,7 +99,7 @@ func (i *dashboardIndex) sync(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *dashboardIndex) run(ctx context.Context) error {
|
func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64) error {
|
||||||
reIndexInterval := 5 * time.Minute
|
reIndexInterval := 5 * time.Minute
|
||||||
fullReIndexTimer := time.NewTimer(reIndexInterval)
|
fullReIndexTimer := time.NewTimer(reIndexInterval)
|
||||||
defer fullReIndexTimer.Stop()
|
defer fullReIndexTimer.Stop()
|
||||||
@ -110,9 +117,9 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
|||||||
lastEventID = lastEvent.Id
|
lastEventID = lastEvent.Id
|
||||||
}
|
}
|
||||||
|
|
||||||
err = i.buildInitialIndex(ctx)
|
err = i.buildInitialIndexes(ctx, orgIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't build initial dashboard search index: %w", err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Channel to handle signals about asynchronous full re-indexing completion.
|
// Channel to handle signals about asynchronous full re-indexing completion.
|
||||||
@ -126,16 +133,26 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
|||||||
case <-partialUpdateTimer.C:
|
case <-partialUpdateTimer.C:
|
||||||
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
||||||
partialUpdateTimer.Reset(partialUpdateInterval)
|
partialUpdateTimer.Reset(partialUpdateInterval)
|
||||||
case orgID := <-i.buildSignals:
|
case signal := <-i.buildSignals:
|
||||||
i.mu.RLock()
|
i.mu.RLock()
|
||||||
_, ok := i.perOrgWriter[orgID]
|
_, ok := i.perOrgWriter[signal.orgID]
|
||||||
if ok {
|
if ok {
|
||||||
// Index for org already exists, do nothing.
|
// Index for org already exists, do nothing.
|
||||||
i.mu.RUnlock()
|
i.mu.RUnlock()
|
||||||
|
close(signal.done)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
i.mu.RUnlock()
|
i.mu.RUnlock()
|
||||||
_, _ = i.buildOrgIndex(ctx, orgID)
|
lastIndexedEventID := lastEventID
|
||||||
|
// Prevent full re-indexing while we are building index for new org.
|
||||||
|
// Full re-indexing will be later re-started in `case lastIndexedEventID := <-reIndexDoneCh`
|
||||||
|
// branch.
|
||||||
|
fullReIndexTimer.Stop()
|
||||||
|
go func() {
|
||||||
|
_, err = i.buildOrgIndex(ctx, signal.orgID)
|
||||||
|
signal.done <- err
|
||||||
|
reIndexDoneCh <- lastIndexedEventID
|
||||||
|
}()
|
||||||
case <-fullReIndexTimer.C:
|
case <-fullReIndexTimer.C:
|
||||||
lastIndexedEventID := lastEventID
|
lastIndexedEventID := lastEventID
|
||||||
go func() {
|
go func() {
|
||||||
@ -164,20 +181,32 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *dashboardIndex) buildInitialIndex(ctx context.Context) error {
|
func (i *dashboardIndex) buildInitialIndexes(ctx context.Context, orgIDs []int64) error {
|
||||||
|
started := time.Now()
|
||||||
|
i.logger.Info("Start building in-memory indexes")
|
||||||
|
for _, orgID := range orgIDs {
|
||||||
|
err := i.buildInitialIndex(ctx, orgID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't build initial dashboard search index for org %d: %w", orgID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i.logger.Info("Finish building in-memory indexes", "elapsed", time.Since(started))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *dashboardIndex) buildInitialIndex(ctx context.Context, orgID int64) error {
|
||||||
memCtx, memCancel := context.WithCancel(ctx)
|
memCtx, memCancel := context.WithCancel(ctx)
|
||||||
if os.Getenv("GF_SEARCH_DEBUG") != "" {
|
if os.Getenv("GF_SEARCH_DEBUG") != "" {
|
||||||
go i.debugMemStats(memCtx, 200*time.Millisecond)
|
go i.debugMemStats(memCtx, 200*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build on start for orgID 1 but keep lazy for others.
|
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
numDashboards, err := i.buildOrgIndex(ctx, 1)
|
numDashboards, err := i.buildOrgIndex(ctx, orgID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
memCancel()
|
memCancel()
|
||||||
return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err)
|
return fmt.Errorf("can't build dashboard search index for org ID %d: %w", orgID, err)
|
||||||
}
|
}
|
||||||
i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards)
|
i.logger.Info("Indexing for org finished", "orgIndexElapsed", time.Since(started), "orgId", orgID, "numDashboards", numDashboards)
|
||||||
memCancel()
|
memCancel()
|
||||||
|
|
||||||
if os.Getenv("GF_SEARCH_DEBUG") != "" {
|
if os.Getenv("GF_SEARCH_DEBUG") != "" {
|
||||||
@ -185,7 +214,7 @@ func (i *dashboardIndex) buildInitialIndex(ctx context.Context) error {
|
|||||||
// match to a memory consumption, but at least make give some relative difference understanding.
|
// match to a memory consumption, but at least make give some relative difference understanding.
|
||||||
// Moreover, changes in indexing can cause additional memory consumption upon initial index build
|
// Moreover, changes in indexing can cause additional memory consumption upon initial index build
|
||||||
// which is not reflected here.
|
// which is not reflected here.
|
||||||
i.reportSizeOfIndexDiskBackup(1)
|
i.reportSizeOfIndexDiskBackup(orgID)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -303,6 +332,33 @@ func (i *dashboardIndex) getOrgReader(orgID int64) (*bluge.Reader, bool) {
|
|||||||
return r, ok
|
return r, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *dashboardIndex) getOrCreateReader(ctx context.Context, orgID int64) (*bluge.Reader, error) {
|
||||||
|
reader, ok := i.getOrgReader(orgID)
|
||||||
|
if !ok {
|
||||||
|
// For non-main organization indexes are built lazily.
|
||||||
|
// If we don't have an index then we are blocking here until an index for
|
||||||
|
// an organization is ready. This actually takes time only during the first
|
||||||
|
// access, all the consequent search requests do not fall into this branch.
|
||||||
|
doneIndexing := make(chan error, 1)
|
||||||
|
signal := buildSignal{orgID: orgID, done: doneIndexing}
|
||||||
|
select {
|
||||||
|
case i.buildSignals <- signal:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case err := <-doneIndexing:
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
reader, _ = i.getOrgReader(orgID)
|
||||||
|
}
|
||||||
|
return reader, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (i *dashboardIndex) getOrgWriter(orgID int64) (*bluge.Writer, bool) {
|
func (i *dashboardIndex) getOrgWriter(orgID int64) (*bluge.Writer, bool) {
|
||||||
i.mu.RLock()
|
i.mu.RLock()
|
||||||
defer i.mu.RUnlock()
|
defer i.mu.RUnlock()
|
||||||
|
@ -3,6 +3,7 @@ package searchV2
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/models"
|
"github.com/grafana/grafana/pkg/models"
|
||||||
@ -59,7 +60,16 @@ func (s *StandardSearchService) IsDisabled() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StandardSearchService) Run(ctx context.Context) error {
|
func (s *StandardSearchService) Run(ctx context.Context) error {
|
||||||
return s.dashboardIndex.run(ctx)
|
orgQuery := &models.SearchOrgsQuery{}
|
||||||
|
err := s.sql.SearchOrgs(ctx, orgQuery)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't get org list: %w", err)
|
||||||
|
}
|
||||||
|
orgIDs := make([]int64, 0, len(orgQuery.Result))
|
||||||
|
for _, org := range orgQuery.Result {
|
||||||
|
orgIDs = append(orgIDs, org.Id)
|
||||||
|
}
|
||||||
|
return s.dashboardIndex.run(ctx, orgIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StandardSearchService) RegisterDashboardIndexExtender(ext DashboardIndexExtender) {
|
func (s *StandardSearchService) RegisterDashboardIndexExtender(ext DashboardIndexExtender) {
|
||||||
@ -143,20 +153,17 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back
|
|||||||
return rsp
|
return rsp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reader, err := s.dashboardIndex.getOrCreateReader(ctx, orgID)
|
||||||
|
if err != nil {
|
||||||
|
rsp.Error = err
|
||||||
|
return rsp
|
||||||
|
}
|
||||||
|
|
||||||
err = s.dashboardIndex.sync(ctx)
|
err = s.dashboardIndex.sync(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rsp.Error = err
|
rsp.Error = err
|
||||||
return rsp
|
return rsp
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, ok := s.dashboardIndex.getOrgReader(orgID)
|
|
||||||
if !ok {
|
|
||||||
go func() {
|
|
||||||
s.dashboardIndex.buildSignals <- orgID
|
|
||||||
}()
|
|
||||||
rsp.Error = errors.New("search index is not ready, try again later")
|
|
||||||
return rsp
|
|
||||||
}
|
|
||||||
|
|
||||||
return doSearchQuery(ctx, s.logger, reader, filter, q, s.extender.GetQueryExtender(q), s.cfg.AppSubURL)
|
return doSearchQuery(ctx, s.logger, reader, filter, q, s.extender.GetQueryExtender(q), s.cfg.AppSubURL)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user