mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Search: Sync state on read for HA consistency (#50152)
This commit is contained in:
@@ -60,6 +60,7 @@ type dashboardIndex struct {
|
|||||||
buildSignals chan int64
|
buildSignals chan int64
|
||||||
extender DocumentExtender
|
extender DocumentExtender
|
||||||
folderIdLookup folderUIDLookup
|
folderIdLookup folderUIDLookup
|
||||||
|
syncCh chan chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *dashboardIndex {
|
func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *dashboardIndex {
|
||||||
@@ -72,15 +73,33 @@ func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender
|
|||||||
buildSignals: make(chan int64),
|
buildSignals: make(chan int64),
|
||||||
extender: extender,
|
extender: extender,
|
||||||
folderIdLookup: folderIDs,
|
folderIdLookup: folderIDs,
|
||||||
|
syncCh: make(chan chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *dashboardIndex) sync(ctx context.Context) error {
|
||||||
|
doneCh := make(chan struct{}, 1)
|
||||||
|
select {
|
||||||
|
case i.syncCh <- doneCh:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *dashboardIndex) run(ctx context.Context) error {
|
func (i *dashboardIndex) run(ctx context.Context) error {
|
||||||
fullReIndexTicker := time.NewTicker(5 * time.Minute)
|
reIndexInterval := 5 * time.Minute
|
||||||
defer fullReIndexTicker.Stop()
|
fullReIndexTimer := time.NewTimer(reIndexInterval)
|
||||||
|
defer fullReIndexTimer.Stop()
|
||||||
|
|
||||||
partialUpdateTicker := time.NewTicker(5 * time.Second)
|
partialUpdateInterval := 5 * time.Second
|
||||||
defer partialUpdateTicker.Stop()
|
partialUpdateTimer := time.NewTimer(partialUpdateInterval)
|
||||||
|
defer partialUpdateTimer.Stop()
|
||||||
|
|
||||||
var lastEventID int64
|
var lastEventID int64
|
||||||
lastEvent, err := i.eventStore.GetLastEvent(ctx)
|
lastEvent, err := i.eventStore.GetLastEvent(ctx)
|
||||||
@@ -96,12 +115,17 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
|||||||
return fmt.Errorf("can't build initial dashboard search index: %w", err)
|
return fmt.Errorf("can't build initial dashboard search index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
i.eventStore.OnEvent(i.applyEventOnIndex)
|
// Channel to handle signals about asynchronous full re-indexing completion.
|
||||||
|
reIndexDoneCh := make(chan int64, 1)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-partialUpdateTicker.C:
|
case doneCh := <-i.syncCh:
|
||||||
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
||||||
|
close(doneCh)
|
||||||
|
case <-partialUpdateTimer.C:
|
||||||
|
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
||||||
|
partialUpdateTimer.Reset(partialUpdateInterval)
|
||||||
case orgID := <-i.buildSignals:
|
case orgID := <-i.buildSignals:
|
||||||
i.mu.RLock()
|
i.mu.RLock()
|
||||||
_, ok := i.perOrgWriter[orgID]
|
_, ok := i.perOrgWriter[orgID]
|
||||||
@@ -112,10 +136,28 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
i.mu.RUnlock()
|
i.mu.RUnlock()
|
||||||
_, _ = i.buildOrgIndex(ctx, orgID)
|
_, _ = i.buildOrgIndex(ctx, orgID)
|
||||||
case <-fullReIndexTicker.C:
|
case <-fullReIndexTimer.C:
|
||||||
started := time.Now()
|
lastIndexedEventID := lastEventID
|
||||||
i.reIndexFromScratch(ctx)
|
go func() {
|
||||||
i.logger.Info("Full re-indexing finished", "fullReIndexElapsed", time.Since(started))
|
// Do full re-index asynchronously to avoid blocking index synchronization
|
||||||
|
// on read for a long time.
|
||||||
|
started := time.Now()
|
||||||
|
i.logger.Info("Start re-indexing")
|
||||||
|
i.reIndexFromScratch(ctx)
|
||||||
|
i.logger.Info("Full re-indexing finished", "fullReIndexElapsed", time.Since(started))
|
||||||
|
reIndexDoneCh <- lastIndexedEventID
|
||||||
|
}()
|
||||||
|
case lastIndexedEventID := <-reIndexDoneCh:
|
||||||
|
// Asynchronous re-indexing is finished. Set lastEventID to the value which
|
||||||
|
// was actual at the re-indexing start – so that we could re-apply all the
|
||||||
|
// events happened during async index build process and make sure it's consistent.
|
||||||
|
if lastEventID != lastIndexedEventID {
|
||||||
|
i.logger.Info("Re-apply event ID to last indexed", "currentEventID", lastEventID, "lastIndexedEventID", lastIndexedEventID)
|
||||||
|
lastEventID = lastIndexedEventID
|
||||||
|
// Apply events immediately.
|
||||||
|
partialUpdateTimer.Reset(0)
|
||||||
|
}
|
||||||
|
fullReIndexTimer.Reset(reIndexInterval)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -143,6 +143,12 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back
|
|||||||
return rsp
|
return rsp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = s.dashboardIndex.sync(ctx)
|
||||||
|
if err != nil {
|
||||||
|
rsp.Error = err
|
||||||
|
return rsp
|
||||||
|
}
|
||||||
|
|
||||||
reader, ok := s.dashboardIndex.getOrgReader(orgID)
|
reader, ok := s.dashboardIndex.getOrgReader(orgID)
|
||||||
if !ok {
|
if !ok {
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
Reference in New Issue
Block a user