Search: Downgrade Bluge, refactor code (#51560)

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
Alexander Emelin
2022-07-04 11:33:07 +03:00
committed by GitHub
parent 16f9830fc3
commit f233a74b70
6 changed files with 287 additions and 235 deletions

View File

@@ -59,11 +59,32 @@ type buildSignal struct {
done chan error
}
type dashboardIndex struct {
// orgIndex groups the bluge writers that make up the search index of a single
// organization (searchIndex.perOrgIndex stores one orgIndex per org ID).
type orgIndex struct {
// writers maps an index type to the bluge writer backing that index.
writers map[indexType]*bluge.Writer
}
// indexType names one of the indexes kept inside an orgIndex; it is the key
// type of orgIndex.writers.
type indexType string
const (
// indexTypeDashboard selects the index that the dashboard, folder and panel
// documents in this file are written to.
indexTypeDashboard indexType = "dashboard"
)
// writerForIndex looks up the bluge writer stored for idxType.
// The result is nil when no writer is stored under that index type.
func (i *orgIndex) writerForIndex(idxType indexType) *bluge.Writer {
	writer := i.writers[idxType]
	return writer
}
// readerForIndex opens a bluge reader on the index of the given type.
//
// It returns the reader together with a release function that closes it; the
// caller must invoke the release function once it is done with the reader.
// An error is returned when the org index itself is nil, when no writer is
// registered for idxType, or when the writer cannot produce a reader. On
// error both the reader and the release function are nil.
func (i *orgIndex) readerForIndex(idxType indexType) (*bluge.Reader, func(), error) {
	// A caller may hold a nil *orgIndex (for example after a map lookup whose
	// ok flag was discarded); report that instead of panicking on i.writers.
	if i == nil {
		return nil, nil, errors.New("org index is not initialized")
	}

	// A missing map entry yields a nil writer, and calling Reader on it would
	// dereference nil.
	writer := i.writers[idxType]
	if writer == nil {
		return nil, nil, fmt.Errorf("no writer for index type %q", idxType)
	}

	reader, err := writer.Reader()
	if err != nil {
		return nil, nil, err
	}

	// The close error is dropped on purpose: the release function has no way
	// to report it and the reader is not used afterwards.
	return reader, func() { _ = reader.Close() }, nil
}
type searchIndex struct {
mu sync.RWMutex
loader dashboardLoader
perOrgReader map[int64]*bluge.Reader // orgId -> bluge reader
perOrgWriter map[int64]*bluge.Writer // orgId -> bluge writer
perOrgIndex map[int64]*orgIndex
eventStore eventStore
logger log.Logger
buildSignals chan buildSignal
@@ -72,13 +93,12 @@ type dashboardIndex struct {
syncCh chan chan struct{}
}
func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *dashboardIndex {
return &dashboardIndex{
func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *searchIndex {
return &searchIndex{
loader: dashLoader,
eventStore: evStore,
perOrgReader: map[int64]*bluge.Reader{},
perOrgWriter: map[int64]*bluge.Writer{},
logger: log.New("dashboardIndex"),
perOrgIndex: map[int64]*orgIndex{},
logger: log.New("searchIndex"),
buildSignals: make(chan buildSignal),
extender: extender,
folderIdLookup: folderIDs,
@@ -86,7 +106,7 @@ func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore, extender
}
}
func (i *dashboardIndex) sync(ctx context.Context) error {
func (i *searchIndex) sync(ctx context.Context) error {
doneCh := make(chan struct{}, 1)
select {
case i.syncCh <- doneCh:
@@ -101,7 +121,7 @@ func (i *dashboardIndex) sync(ctx context.Context) error {
}
}
func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh chan struct{}) error {
func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh chan struct{}) error {
reIndexInterval := 5 * time.Minute
fullReIndexTimer := time.NewTimer(reIndexInterval)
defer fullReIndexTimer.Stop()
@@ -147,7 +167,7 @@ func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalC
case signal := <-i.buildSignals:
// When search read request meets new not-indexed org we build index for it.
i.mu.RLock()
_, ok := i.perOrgWriter[signal.orgID]
_, ok := i.perOrgIndex[signal.orgID]
if ok {
// Index for org already exists, do nothing.
i.mu.RUnlock()
@@ -206,7 +226,7 @@ func (i *dashboardIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalC
}
}
func (i *dashboardIndex) buildInitialIndexes(ctx context.Context, orgIDs []int64) error {
func (i *searchIndex) buildInitialIndexes(ctx context.Context, orgIDs []int64) error {
started := time.Now()
i.logger.Info("Start building in-memory indexes")
for _, orgID := range orgIDs {
@@ -219,7 +239,7 @@ func (i *dashboardIndex) buildInitialIndexes(ctx context.Context, orgIDs []int64
return nil
}
func (i *dashboardIndex) buildInitialIndex(ctx context.Context, orgID int64) error {
func (i *searchIndex) buildInitialIndex(ctx context.Context, orgID int64) error {
debugCtx, debugCtxCancel := context.WithCancel(ctx)
if os.Getenv("GF_SEARCH_DEBUG") != "" {
go i.debugResourceUsage(debugCtx, 200*time.Millisecond)
@@ -284,7 +304,7 @@ func getProcessCPU(currentPid int) (float64, error) {
return 0, errors.New("process not found")
}
func (i *dashboardIndex) debugResourceUsage(ctx context.Context, frequency time.Duration) {
func (i *searchIndex) debugResourceUsage(ctx context.Context, frequency time.Duration) {
var maxHeapInuse uint64
var maxSys uint64
@@ -326,8 +346,14 @@ func (i *dashboardIndex) debugResourceUsage(ctx context.Context, frequency time.
}
}
func (i *dashboardIndex) reportSizeOfIndexDiskBackup(orgID int64) {
reader, _ := i.getOrgReader(orgID)
func (i *searchIndex) reportSizeOfIndexDiskBackup(orgID int64) {
index, _ := i.getOrgIndex(orgID)
reader, cancel, err := index.readerForIndex(indexTypeDashboard)
if err != nil {
i.logger.Warn("Error getting reader", "error", err)
return
}
defer cancel()
// create a temp directory to store the index
tmpDir, err := ioutil.TempDir("", "grafana.dashboard_index")
@@ -343,8 +369,8 @@ func (i *dashboardIndex) reportSizeOfIndexDiskBackup(orgID int64) {
}
}()
cancel := make(chan struct{})
err = reader.Backup(tmpDir, cancel)
cancelCh := make(chan struct{})
err = reader.Backup(tmpDir, cancelCh)
if err != nil {
i.logger.Error("can't create index disk backup", "error", err)
return
@@ -359,7 +385,7 @@ func (i *dashboardIndex) reportSizeOfIndexDiskBackup(orgID int64) {
i.logger.Warn("Size of index disk backup", "size", formatBytes(uint64(size)))
}
func (i *dashboardIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) {
func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) {
started := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
@@ -373,7 +399,7 @@ func (i *dashboardIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, e
i.logger.Info("Finish loading org dashboards", "elapsed", orgSearchIndexLoadTime, "orgId", orgID)
dashboardExtender := i.extender.GetDashboardExtender(orgID)
reader, writer, err := initIndex(dashboards, i.logger, dashboardExtender)
index, err := initOrgIndex(dashboards, i.logger, dashboardExtender)
if err != nil {
return 0, fmt.Errorf("error initializing index: %w", err)
}
@@ -388,31 +414,34 @@ func (i *dashboardIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, e
"orgSearchDashboardCount", len(dashboards))
i.mu.Lock()
if oldReader, ok := i.perOrgReader[orgID]; ok {
_ = oldReader.Close()
if oldIndex, ok := i.perOrgIndex[orgID]; ok {
for _, w := range oldIndex.writers {
_ = w.Close()
}
}
if oldWriter, ok := i.perOrgWriter[orgID]; ok {
_ = oldWriter.Close()
}
i.perOrgReader[orgID] = reader
i.perOrgWriter[orgID] = writer
i.perOrgIndex[orgID] = index
i.mu.Unlock()
if orgID == 1 {
go updateUsageStats(context.Background(), reader, i.logger)
go func() {
if reader, cancel, err := index.readerForIndex(indexTypeDashboard); err == nil {
defer cancel()
updateUsageStats(context.Background(), reader, i.logger)
}
}()
}
return len(dashboards), nil
}
func (i *dashboardIndex) getOrgReader(orgID int64) (*bluge.Reader, bool) {
// getOrgIndex returns the entry stored in perOrgIndex for orgID and a flag
// telling whether such an entry exists. The map is read while holding the
// read lock of i.mu.
func (i *searchIndex) getOrgIndex(orgID int64) (*orgIndex, bool) {
i.mu.RLock()
defer i.mu.RUnlock()
r, ok := i.perOrgReader[orgID]
r, ok := i.perOrgIndex[orgID]
return r, ok
}
func (i *dashboardIndex) getOrCreateReader(ctx context.Context, orgID int64) (*bluge.Reader, error) {
reader, ok := i.getOrgReader(orgID)
func (i *searchIndex) getOrCreateOrgIndex(ctx context.Context, orgID int64) (*orgIndex, error) {
index, ok := i.getOrgIndex(orgID)
if !ok {
// For non-main organization indexes are built lazily.
// If we don't have an index then we are blocking here until an index for
@@ -433,22 +462,15 @@ func (i *dashboardIndex) getOrCreateReader(ctx context.Context, orgID int64) (*b
case <-ctx.Done():
return nil, ctx.Err()
}
reader, _ = i.getOrgReader(orgID)
index, _ = i.getOrgIndex(orgID)
}
return reader, nil
return index, nil
}
func (i *dashboardIndex) getOrgWriter(orgID int64) (*bluge.Writer, bool) {
func (i *searchIndex) reIndexFromScratch(ctx context.Context) {
i.mu.RLock()
defer i.mu.RUnlock()
w, ok := i.perOrgWriter[orgID]
return w, ok
}
func (i *dashboardIndex) reIndexFromScratch(ctx context.Context) {
i.mu.RLock()
orgIDs := make([]int64, 0, len(i.perOrgWriter))
for orgID := range i.perOrgWriter {
orgIDs := make([]int64, 0, len(i.perOrgIndex))
for orgID := range i.perOrgIndex {
orgIDs = append(orgIDs, orgID)
}
i.mu.RUnlock()
@@ -462,7 +484,7 @@ func (i *dashboardIndex) reIndexFromScratch(ctx context.Context) {
}
}
func (i *dashboardIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) int64 {
func (i *searchIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) int64 {
events, err := i.eventStore.GetAllEventsAfter(context.Background(), lastEventID)
if err != nil {
i.logger.Error("can't load events", "error", err)
@@ -484,7 +506,7 @@ func (i *dashboardIndex) applyIndexUpdates(ctx context.Context, lastEventID int6
return lastEventID
}
func (i *dashboardIndex) applyEventOnIndex(ctx context.Context, e *store.EntityEvent) error {
func (i *searchIndex) applyEventOnIndex(ctx context.Context, e *store.EntityEvent) error {
i.logger.Debug("processing event", "event", e)
if !strings.HasPrefix(e.EntityId, "database/") {
@@ -508,9 +530,9 @@ func (i *dashboardIndex) applyEventOnIndex(ctx context.Context, e *store.EntityE
return i.applyEvent(ctx, orgID, kind, uid, e.EventType)
}
func (i *dashboardIndex) applyEvent(ctx context.Context, orgID int64, kind store.EntityType, uid string, _ store.EntityEventType) error {
func (i *searchIndex) applyEvent(ctx context.Context, orgID int64, kind store.EntityType, uid string, _ store.EntityEventType) error {
i.mu.Lock()
_, ok := i.perOrgWriter[orgID]
_, ok := i.perOrgIndex[orgID]
if !ok {
// Skip event for org not yet indexed.
i.mu.Unlock()
@@ -527,49 +549,39 @@ func (i *dashboardIndex) applyEvent(ctx context.Context, orgID int64, kind store
i.mu.Lock()
defer i.mu.Unlock()
writer, ok := i.perOrgWriter[orgID]
index, ok := i.perOrgIndex[orgID]
if !ok {
// Skip event for org not yet fully indexed.
return nil
}
// TODO: should we release index lock while performing removeDashboard/updateDashboard?
reader, ok := i.perOrgReader[orgID]
if !ok {
// Skip event for org not yet fully indexed.
return nil
}
var newReader *bluge.Reader
// In the future we can rely on operation types to reduce work here.
if len(dbDashboards) == 0 {
switch kind {
case store.EntityTypeDashboard:
newReader, err = i.removeDashboard(ctx, writer, reader, uid)
err = i.removeDashboard(ctx, index, uid)
case store.EntityTypeFolder:
newReader, err = i.removeFolder(ctx, writer, reader, uid)
err = i.removeFolder(ctx, index, uid)
default:
return nil
}
} else {
newReader, err = i.updateDashboard(ctx, orgID, writer, reader, dbDashboards[0])
err = i.updateDashboard(ctx, orgID, index, dbDashboards[0])
}
if err != nil {
return err
}
_ = reader.Close()
i.perOrgReader[orgID] = newReader
return nil
}
func (i *dashboardIndex) removeDashboard(_ context.Context, writer *bluge.Writer, reader *bluge.Reader, dashboardUID string) (*bluge.Reader, error) {
dashboardLocation, ok, err := getDashboardLocation(reader, dashboardUID)
func (i *searchIndex) removeDashboard(_ context.Context, index *orgIndex, dashboardUID string) error {
dashboardLocation, ok, err := getDashboardLocation(index, dashboardUID)
if err != nil {
return nil, err
return err
}
if !ok {
// No dashboard, nothing to remove.
return reader, nil
return nil
}
// Find all panel docs to remove with dashboard.
@@ -577,40 +589,35 @@ func (i *dashboardIndex) removeDashboard(_ context.Context, writer *bluge.Writer
if dashboardLocation != "" {
panelLocation = dashboardLocation + "/" + dashboardUID
}
panelIDs, err := getDocsIDsByLocationPrefix(reader, panelLocation)
panelIDs, err := getDocsIDsByLocationPrefix(index, panelLocation)
if err != nil {
return nil, err
return err
}
writer := index.writerForIndex(indexTypeDashboard)
batch := bluge.NewBatch()
batch.Delete(bluge.NewDocument(dashboardUID).ID())
for _, panelID := range panelIDs {
batch.Delete(bluge.NewDocument(panelID).ID())
}
err = writer.Batch(batch)
if err != nil {
return nil, err
}
return writer.Reader()
return writer.Batch(batch)
}
func (i *dashboardIndex) removeFolder(_ context.Context, writer *bluge.Writer, reader *bluge.Reader, folderUID string) (*bluge.Reader, error) {
ids, err := getDocsIDsByLocationPrefix(reader, folderUID)
// removeFolder deletes the document whose ID is folderUID, plus every document
// ID that getDocsIDsByLocationPrefix reports for folderUID, from the dashboard
// index of the given org index. All deletions are submitted as one batch; the
// context argument is unused.
func (i *searchIndex) removeFolder(_ context.Context, index *orgIndex, folderUID string) error {
ids, err := getDocsIDsByLocationPrefix(index, folderUID)
if err != nil {
return nil, err
return fmt.Errorf("error getting by location prefix: %w", err)
}
batch := bluge.NewBatch()
batch.Delete(bluge.NewDocument(folderUID).ID())
for _, id := range ids {
batch.Delete(bluge.NewDocument(id).ID())
}
err = writer.Batch(batch)
if err != nil {
return nil, err
}
return writer.Reader()
writer := index.writerForIndex(indexTypeDashboard)
return writer.Batch(batch)
}
func stringInSlice(str string, slice []string) bool {
@@ -622,67 +629,65 @@ func stringInSlice(str string, slice []string) bool {
return false
}
func (i *dashboardIndex) updateDashboard(ctx context.Context, orgID int64, writer *bluge.Writer, reader *bluge.Reader, dash dashboard) (*bluge.Reader, error) {
batch := bluge.NewBatch()
// updateDashboard writes the documents for dash into the dashboard index of
// the given org index.
//
// A folder is stored as a single document through writer.Update. Any other
// dashboard is written in one batch: the dashboard document, one document per
// panel returned by getDashboardPanelDocs, and a delete for every panel ID
// that getDashboardPanelIDs still reports for the dashboard's location but
// that is not among the freshly built panel documents.
func (i *searchIndex) updateDashboard(ctx context.Context, orgID int64, index *orgIndex, dash dashboard) error {
extendDoc := i.extender.GetDashboardExtender(orgID, dash.uid)
writer := index.writerForIndex(indexTypeDashboard)
var doc *bluge.Document
if dash.isFolder {
doc = getFolderDashboardDoc(dash)
if err := extendDoc(dash.uid, doc); err != nil {
return nil, err
return err
}
return writer.Update(doc.ID(), doc)
}
batch := bluge.NewBatch()
var folderUID string
if dash.folderID == 0 {
folderUID = "general"
} else {
var folderUID string
if dash.folderID == 0 {
folderUID = "general"
} else {
var err error
folderUID, err = i.folderIdLookup(ctx, dash.folderID)
if err != nil {
return nil, err
}
}
location := folderUID
doc = getNonFolderDashboardDoc(dash, location)
if err := extendDoc(dash.uid, doc); err != nil {
return nil, err
}
var actualPanelIDs []string
if location != "" {
location += "/"
}
location += dash.uid
panelDocs := getDashboardPanelDocs(dash, location)
for _, panelDoc := range panelDocs {
actualPanelIDs = append(actualPanelIDs, string(panelDoc.ID().Term()))
batch.Update(panelDoc.ID(), panelDoc)
}
indexedPanelIDs, err := getDashboardPanelIDs(reader, location)
var err error
folderUID, err = i.folderIdLookup(ctx, dash.folderID)
if err != nil {
return nil, err
return err
}
}
for _, panelID := range indexedPanelIDs {
if !stringInSlice(panelID, actualPanelIDs) {
batch.Delete(bluge.NewDocument(panelID).ID())
}
location := folderUID
doc = getNonFolderDashboardDoc(dash, location)
if err := extendDoc(dash.uid, doc); err != nil {
return err
}
var actualPanelIDs []string
if location != "" {
location += "/"
}
location += dash.uid
panelDocs := getDashboardPanelDocs(dash, location)
for _, panelDoc := range panelDocs {
actualPanelIDs = append(actualPanelIDs, string(panelDoc.ID().Term()))
batch.Update(panelDoc.ID(), panelDoc)
}
indexedPanelIDs, err := getDashboardPanelIDs(index, location)
if err != nil {
return err
}
for _, panelID := range indexedPanelIDs {
if !stringInSlice(panelID, actualPanelIDs) {
batch.Delete(bluge.NewDocument(panelID).ID())
}
}
batch.Update(doc.ID(), doc)
err := writer.Batch(batch)
if err != nil {
return nil, err
}
return writer.Reader()
return writer.Batch(batch)
}
type sqlDashboardLoader struct {