mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Search: use only bluge-based search (#48968)
This commit is contained in:
@@ -31,15 +31,6 @@ type eventStore interface {
|
||||
GetAllEventsAfter(ctx context.Context, id int64) ([]*store.EntityEvent, error)
|
||||
}
|
||||
|
||||
type dashboardIndex struct {
|
||||
mu sync.RWMutex
|
||||
loader dashboardLoader
|
||||
dashboards map[int64][]dashboard // orgId -> []dashboards
|
||||
reader map[int64]*bluge.Reader // orgId -> bluge index
|
||||
eventStore eventStore
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
type dashboard struct {
|
||||
id int64
|
||||
uid string
|
||||
@@ -51,13 +42,24 @@ type dashboard struct {
|
||||
info *extract.DashboardInfo
|
||||
}
|
||||
|
||||
type dashboardIndex struct {
|
||||
mu sync.RWMutex
|
||||
loader dashboardLoader
|
||||
perOrgReader map[int64]*bluge.Reader // orgId -> bluge reader
|
||||
perOrgWriter map[int64]*bluge.Writer // orgId -> bluge writer
|
||||
eventStore eventStore
|
||||
logger log.Logger
|
||||
buildSignals chan int64
|
||||
}
|
||||
|
||||
func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore) *dashboardIndex {
|
||||
return &dashboardIndex{
|
||||
loader: dashLoader,
|
||||
eventStore: evStore,
|
||||
dashboards: map[int64][]dashboard{},
|
||||
reader: map[int64]*bluge.Reader{},
|
||||
logger: log.New("dashboardIndex"),
|
||||
loader: dashLoader,
|
||||
eventStore: evStore,
|
||||
perOrgReader: map[int64]*bluge.Reader{},
|
||||
perOrgWriter: map[int64]*bluge.Writer{},
|
||||
logger: log.New("dashboardIndex"),
|
||||
buildSignals: make(chan int64),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,19 +81,26 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
||||
|
||||
// Build on start for orgID 1 but keep lazy for others.
|
||||
started := time.Now()
|
||||
dashboards, err := i.getDashboards(ctx, 1)
|
||||
numDashboards, err := i.buildOrgIndex(ctx, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err)
|
||||
}
|
||||
i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", len(dashboards))
|
||||
|
||||
// build bluge index on startup (will catch panics)
|
||||
go i.reIndexFromScratchBluge(ctx)
|
||||
i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-partialUpdateTicker.C:
|
||||
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
||||
case orgID := <-i.buildSignals:
|
||||
i.mu.RLock()
|
||||
_, ok := i.perOrgWriter[orgID]
|
||||
if ok {
|
||||
// Index for org already exists, do nothing.
|
||||
i.mu.RUnlock()
|
||||
continue
|
||||
}
|
||||
i.mu.RUnlock()
|
||||
_, _ = i.buildOrgIndex(ctx, orgID)
|
||||
case <-fullReIndexTicker.C:
|
||||
started := time.Now()
|
||||
i.reIndexFromScratch(ctx)
|
||||
@@ -102,82 +111,67 @@ func (i *dashboardIndex) run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) reIndexFromScratch(ctx context.Context) {
|
||||
i.mu.RLock()
|
||||
orgIDs := make([]int64, 0, len(i.dashboards))
|
||||
for orgID := range i.dashboards {
|
||||
orgIDs = append(orgIDs, orgID)
|
||||
}
|
||||
i.mu.RUnlock()
|
||||
func (i *dashboardIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) {
|
||||
started := time.Now()
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
for _, orgID := range orgIDs {
|
||||
started := time.Now()
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
dashboards, err := i.loader.LoadDashboards(ctx, orgID, "")
|
||||
if err != nil {
|
||||
cancel()
|
||||
i.logger.Error("Error re-indexing dashboards for organization", "orgId", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
i.logger.Info("Re-indexed dashboards for organization", "orgId", orgID, "orgReIndexElapsed", time.Since(started))
|
||||
i.mu.Lock()
|
||||
i.dashboards[orgID] = dashboards
|
||||
i.mu.Unlock()
|
||||
i.logger.Info("Start building org index", "orgId", orgID)
|
||||
dashboards, err := i.loader.LoadDashboards(ctx, orgID, "")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error loading dashboards: %w", err)
|
||||
}
|
||||
orgSearchIndexLoadTime := time.Since(started)
|
||||
i.logger.Info("Finish loading org dashboards", "elapsed", orgSearchIndexLoadTime, "orgId", orgID)
|
||||
|
||||
reader, writer, err := initIndex(dashboards, i.logger)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error initializing index: %w", err)
|
||||
}
|
||||
orgSearchIndexTotalTime := time.Since(started)
|
||||
orgSearchIndexBuildTime := orgSearchIndexTotalTime - orgSearchIndexLoadTime
|
||||
|
||||
i.logger.Info("Re-indexed dashboards for organization",
|
||||
"orgId", orgID,
|
||||
"orgSearchIndexLoadTime", orgSearchIndexLoadTime,
|
||||
"orgSearchIndexBuildTime", orgSearchIndexBuildTime,
|
||||
"orgSearchIndexTotalTime", orgSearchIndexTotalTime)
|
||||
|
||||
i.mu.Lock()
|
||||
i.perOrgReader[orgID] = reader
|
||||
i.perOrgWriter[orgID] = writer
|
||||
i.mu.Unlock()
|
||||
return len(dashboards), nil
|
||||
}
|
||||
|
||||
// Variation of the above function that builds bluge index from scratch
|
||||
// Once the frontend is wired up, we should switch to this one
|
||||
func (i *dashboardIndex) reIndexFromScratchBluge(ctx context.Context) {
|
||||
// Catch Panic (just in case)
|
||||
defer func() {
|
||||
recv := recover()
|
||||
if recv != nil {
|
||||
i.logger.Error("panic in search runner", "recv", recv) // REMVOE after we are sure it works!
|
||||
}
|
||||
}()
|
||||
|
||||
func (i *dashboardIndex) getOrgReader(orgID int64) (*bluge.Reader, bool) {
|
||||
i.mu.RLock()
|
||||
orgIDs := make([]int64, 0, len(i.dashboards))
|
||||
for orgID := range i.dashboards {
|
||||
defer i.mu.RUnlock()
|
||||
r, ok := i.perOrgReader[orgID]
|
||||
return r, ok
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) getOrgWriter(orgID int64) (*bluge.Writer, bool) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
w, ok := i.perOrgWriter[orgID]
|
||||
return w, ok
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) reIndexFromScratch(ctx context.Context) {
|
||||
i.mu.RLock()
|
||||
orgIDs := make([]int64, 0, len(i.perOrgWriter))
|
||||
for orgID := range i.perOrgWriter {
|
||||
orgIDs = append(orgIDs, orgID)
|
||||
}
|
||||
i.mu.RUnlock()
|
||||
if len(orgIDs) < 1 {
|
||||
orgIDs = append(orgIDs, int64(1)) // make sure we index
|
||||
}
|
||||
|
||||
for _, orgID := range orgIDs {
|
||||
started := time.Now()
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
|
||||
dashboards, err := i.loader.LoadDashboards(ctx, orgID, "")
|
||||
_, err := i.buildOrgIndex(ctx, orgID)
|
||||
if err != nil {
|
||||
cancel()
|
||||
i.logger.Error("Error re-indexing dashboards for organization", "orgId", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
orgSearchIndexLoadTime := time.Since(started)
|
||||
|
||||
reader, err := initBlugeIndex(dashboards, i.logger)
|
||||
if err != nil {
|
||||
cancel()
|
||||
i.logger.Error("Error re-indexing dashboards for organization", "orgId", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
orgSearchIndexTotalTime := time.Since(started)
|
||||
orgSearchIndexBuildTime := orgSearchIndexTotalTime - orgSearchIndexLoadTime
|
||||
|
||||
cancel()
|
||||
i.logger.Info("Re-indexed dashboards for organization (bluge)",
|
||||
"orgId", orgID,
|
||||
"orgSearchIndexLoadTime", orgSearchIndexLoadTime,
|
||||
"orgSearchIndexBuildTime", orgSearchIndexBuildTime,
|
||||
"orgSearchIndexTotalTime", orgSearchIndexTotalTime)
|
||||
i.mu.Lock()
|
||||
i.reader[orgID] = reader
|
||||
i.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,7 +225,7 @@ func (i *dashboardIndex) applyEventOnIndex(ctx context.Context, e *store.EntityE
|
||||
|
||||
func (i *dashboardIndex) applyDashboardEvent(ctx context.Context, orgID int64, dashboardUID string, _ store.EntityEventType) error {
|
||||
i.mu.Lock()
|
||||
_, ok := i.dashboards[orgID]
|
||||
_, ok := i.perOrgWriter[orgID]
|
||||
if !ok {
|
||||
// Skip event for org not yet indexed.
|
||||
i.mu.Unlock()
|
||||
@@ -247,64 +241,112 @@ func (i *dashboardIndex) applyDashboardEvent(ctx context.Context, orgID int64, d
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
dashboards, ok := i.dashboards[orgID]
|
||||
writer, ok := i.perOrgWriter[orgID]
|
||||
if !ok {
|
||||
// Skip event for org not yet fully indexed.
|
||||
return nil
|
||||
}
|
||||
reader, ok := i.perOrgReader[orgID]
|
||||
if !ok {
|
||||
// Skip event for org not yet fully indexed.
|
||||
return nil
|
||||
}
|
||||
|
||||
var newReader *bluge.Reader
|
||||
|
||||
// In the future we can rely on operation types to reduce work here.
|
||||
if len(dbDashboards) == 0 {
|
||||
// Delete.
|
||||
i.dashboards[orgID] = removeDashboard(dashboards, dashboardUID)
|
||||
newReader, err = i.removeDashboard(writer, reader, dashboardUID)
|
||||
} else {
|
||||
updated := false
|
||||
for i, d := range dashboards {
|
||||
if d.uid == dashboardUID {
|
||||
// Update.
|
||||
dashboards[i] = dbDashboards[0]
|
||||
updated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !updated {
|
||||
// Create.
|
||||
dashboards = append(dashboards, dbDashboards...)
|
||||
}
|
||||
i.dashboards[orgID] = dashboards
|
||||
newReader, err = i.updateDashboard(writer, reader, dbDashboards[0])
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i.perOrgReader[orgID] = newReader
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeDashboard(dashboards []dashboard, dashboardUID string) []dashboard {
|
||||
k := 0
|
||||
for _, d := range dashboards {
|
||||
if d.uid != dashboardUID {
|
||||
dashboards[k] = d
|
||||
k++
|
||||
}
|
||||
func (i *dashboardIndex) removeDashboard(writer *bluge.Writer, reader *bluge.Reader, dashboardUID string) (*bluge.Reader, error) {
|
||||
// Find all panel docs to remove with dashboard.
|
||||
panelIDs, err := getDashboardPanelIDs(reader, dashboardUID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dashboards[:k]
|
||||
|
||||
batch := bluge.NewBatch()
|
||||
batch.Delete(bluge.NewDocument(dashboardUID).ID())
|
||||
for _, panelID := range panelIDs {
|
||||
batch.Delete(bluge.NewDocument(panelID).ID())
|
||||
}
|
||||
|
||||
err = writer.Batch(batch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return writer.Reader()
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) getDashboards(ctx context.Context, orgId int64) ([]dashboard, error) {
|
||||
var dashboards []dashboard
|
||||
func stringInSlice(str string, slice []string) bool {
|
||||
for _, s := range slice {
|
||||
if s == str {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
func (i *dashboardIndex) updateDashboard(writer *bluge.Writer, reader *bluge.Reader, dash dashboard) (*bluge.Reader, error) {
|
||||
batch := bluge.NewBatch()
|
||||
|
||||
if cachedDashboards, ok := i.dashboards[orgId]; ok {
|
||||
dashboards = cachedDashboards
|
||||
var doc *bluge.Document
|
||||
if dash.isFolder {
|
||||
doc = getFolderDashboardDoc(dash)
|
||||
} else {
|
||||
// Load and parse all dashboards for given orgId.
|
||||
var err error
|
||||
dashboards, err = i.loader.LoadDashboards(ctx, orgId, "")
|
||||
var folderUID string
|
||||
if dash.folderID == 0 {
|
||||
folderUID = "general"
|
||||
} else {
|
||||
var err error
|
||||
folderUID, err = getDashboardFolderUID(reader, dash.folderID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
location := folderUID
|
||||
doc = getNonFolderDashboardDoc(dash, location)
|
||||
|
||||
var actualPanelIDs []string
|
||||
|
||||
location += "/" + dash.uid
|
||||
panelDocs := getDashboardPanelDocs(dash, location)
|
||||
for _, panelDoc := range panelDocs {
|
||||
actualPanelIDs = append(actualPanelIDs, string(panelDoc.ID().Term()))
|
||||
batch.Update(panelDoc.ID(), panelDoc)
|
||||
}
|
||||
|
||||
indexedPanelIDs, err := getDashboardPanelIDs(reader, dash.uid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i.dashboards[orgId] = dashboards
|
||||
|
||||
for _, panelID := range indexedPanelIDs {
|
||||
if !stringInSlice(panelID, actualPanelIDs) {
|
||||
batch.Delete(bluge.NewDocument(panelID).ID())
|
||||
}
|
||||
}
|
||||
}
|
||||
return dashboards, nil
|
||||
|
||||
batch.Update(doc.ID(), doc)
|
||||
|
||||
err := writer.Batch(batch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return writer.Reader()
|
||||
}
|
||||
|
||||
type sqlDashboardLoader struct {
|
||||
|
||||
Reference in New Issue
Block a user