Search: limit max size of batches during indexing (#49187)

Alexander Emelin 2022-05-19 18:57:26 +03:00 committed by GitHub
parent 8919c8b014
commit 444c585c99
3 changed files with 165 additions and 7 deletions

View File

@@ -42,11 +42,31 @@ func initIndex(dashboards []dashboard, logger log.Logger) (*bluge.Reader, *bluge
 	}
 	// Not closing Writer here since we use it later while processing dashboard change events.
-	batch := bluge.NewBatch()
 	start := time.Now()
 	label := start
+	batch := bluge.NewBatch()
+
+	// In order to reduce memory usage during initial indexing we limit
+	// the batch size here.
+	docsInBatch := 0
+	maxBatchSize := 100
+
+	flushIfRequired := func(force bool) error {
+		docsInBatch++
+		needFlush := force || (maxBatchSize > 0 && docsInBatch >= maxBatchSize)
+		if !needFlush {
+			return nil
+		}
+		err := writer.Batch(batch)
+		if err != nil {
+			return err
+		}
+		docsInBatch = 0
+		batch.Reset()
+		return nil
+	}
+
 	// First index the folders to construct folderIdLookup.
 	folderIdLookup := make(map[int64]string, 50)
 	for _, dash := range dashboards {
@@ -55,6 +75,9 @@ func initIndex(dashboards []dashboard, logger log.Logger) (*bluge.Reader, *bluge
 		}
 		doc := getFolderDashboardDoc(dash)
 		batch.Insert(doc)
+		if err := flushIfRequired(false); err != nil {
+			return nil, nil, err
+		}
 		uid := dash.uid
 		if uid == "" {
 			uid = "general"
@@ -71,14 +94,23 @@ func initIndex(dashboards []dashboard, logger log.Logger) (*bluge.Reader, *bluge
 		location := folderUID
 		doc := getNonFolderDashboardDoc(dash, location)
 		batch.Insert(doc)
+		if err := flushIfRequired(false); err != nil {
+			return nil, nil, err
+		}
 		// Index each panel in dashboard.
 		location += "/" + dash.uid
 		docs := getDashboardPanelDocs(dash, location)
 		for _, panelDoc := range docs {
 			batch.Insert(panelDoc)
+			if err := flushIfRequired(false); err != nil {
+				return nil, nil, err
+			}
 		}
 	}
+	if err := flushIfRequired(true); err != nil {
+		return nil, nil, err
+	}
 	logger.Info("Finish inserting docs into batch", "elapsed", time.Since(label))
 	label = time.Now()
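The pattern above is worth seeing in isolation: insert documents into one reusable batch, flush whenever the batch reaches the cap, and force a last flush after the loop so the final partial batch is committed. Here is a minimal standalone sketch of that capped-batch approach against the bluge API; indexDocs, the 250-document corpus, and the "kind" field are illustrative and not part of the Grafana code.

package main

import (
	"fmt"

	"github.com/blugelabs/bluge"
)

func indexDocs(writer *bluge.Writer, ids []string) error {
	const maxBatchSize = 100 // same cap the commit picks
	batch := bluge.NewBatch()
	docsInBatch := 0

	flush := func(force bool) error {
		if !force && docsInBatch < maxBatchSize {
			return nil
		}
		if err := writer.Batch(batch); err != nil {
			return err
		}
		docsInBatch = 0
		batch.Reset() // reuse the same batch instead of allocating a new one
		return nil
	}

	for _, id := range ids {
		doc := bluge.NewDocument(id).AddField(bluge.NewKeywordField("kind", "dashboard"))
		batch.Insert(doc)
		docsInBatch++
		if err := flush(false); err != nil {
			return err
		}
	}
	// Force a final flush for the last, possibly partial, batch.
	return flush(true)
}

func main() {
	writer, err := bluge.OpenWriter(bluge.InMemoryOnlyConfig())
	if err != nil {
		panic(err)
	}
	defer writer.Close()

	ids := make([]string, 250)
	for i := range ids {
		ids[i] = fmt.Sprintf("doc-%d", i)
	}
	if err := indexDocs(writer, ids); err != nil {
		panic(err)
	}
}

Since writer.Batch commits each chunk and batch.Reset reuses the same buffers, peak memory during the initial build tracks maxBatchSize rather than the total number of documents, which is the point of the commit.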

View File

@@ -4,6 +4,9 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io/ioutil"
+	"os"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -79,13 +82,10 @@ func (i *dashboardIndex) run(ctx context.Context) error {
 		lastEventID = lastEvent.Id
 	}
-	// Build on start for orgID 1 but keep lazy for others.
-	started := time.Now()
-	numDashboards, err := i.buildOrgIndex(ctx, 1)
+	err = i.buildInitialIndex(ctx)
 	if err != nil {
-		return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err)
+		return fmt.Errorf("can't build initial dashboard search index: %w", err)
 	}
-	i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards)
 	for {
 		select {
@@ -111,6 +111,93 @@ func (i *dashboardIndex) run(ctx context.Context) error {
 	}
 }
+
+func (i *dashboardIndex) buildInitialIndex(ctx context.Context) error {
+	memCtx, memCancel := context.WithCancel(ctx)
+	if os.Getenv("GF_SEARCH_DEBUG") != "" {
+		go i.debugMemStats(memCtx, 200*time.Millisecond)
+	}
+
+	// Build on start for orgID 1 but keep lazy for others.
+	started := time.Now()
+	numDashboards, err := i.buildOrgIndex(ctx, 1)
+	if err != nil {
+		memCancel()
+		return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err)
+	}
+	i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards)
+	memCancel()
+
+	if os.Getenv("GF_SEARCH_DEBUG") != "" {
+		// May help to estimate the size of the index when introducing changes. This is not a
+		// direct measure of memory consumption, but it at least gives an idea of relative
+		// differences. Note that changes in indexing can cause additional memory consumption
+		// during the initial index build which is not reflected here.
+		i.reportSizeOfIndexDiskBackup(1)
+	}
+	return nil
+}
+
+func (i *dashboardIndex) debugMemStats(ctx context.Context, frequency time.Duration) {
+	var maxHeapInuse uint64
+	var maxSys uint64
+
+	captureMemStats := func() {
+		var m runtime.MemStats
+		runtime.ReadMemStats(&m)
+		if m.HeapInuse > maxHeapInuse {
+			maxHeapInuse = m.HeapInuse
+		}
+		if m.Sys > maxSys {
+			maxSys = m.Sys
+		}
+	}
+
+	captureMemStats()
+
+	for {
+		select {
+		case <-ctx.Done():
+			i.logger.Warn("Memory stats during indexing", "maxHeapInUse", formatBytes(maxHeapInuse), "maxSys", formatBytes(maxSys))
+			return
+		case <-time.After(frequency):
+			captureMemStats()
+		}
+	}
+}
+
+func (i *dashboardIndex) reportSizeOfIndexDiskBackup(orgID int64) {
+	reader, _ := i.getOrgReader(orgID)
+
+	// Create a temp directory to store the index backup.
+	tmpDir, err := ioutil.TempDir("", "grafana.dashboard_index")
+	if err != nil {
+		i.logger.Error("can't create temp dir", "error", err)
+		return
+	}
+	defer func() {
+		err := os.RemoveAll(tmpDir)
+		if err != nil {
+			i.logger.Error("can't remove temp dir", "error", err, "tmpDir", tmpDir)
+			return
+		}
+	}()
+
+	cancel := make(chan struct{})
+	err = reader.Backup(tmpDir, cancel)
+	if err != nil {
+		i.logger.Error("can't create index disk backup", "error", err)
+		return
+	}
+
+	size, err := dirSize(tmpDir)
+	if err != nil {
+		i.logger.Error("can't calculate dir size", "error", err)
+		return
+	}
+
+	i.logger.Warn("Size of index disk backup", "size", formatBytes(uint64(size)))
+}
+
 func (i *dashboardIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) {
 	started := time.Now()
 	ctx, cancel := context.WithTimeout(ctx, time.Minute)
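The sampler added above is gated by the GF_SEARCH_DEBUG environment variable (any non-empty value enables both it and the disk-backup size report) and simply keeps running maxima of runtime.MemStats fields until its context is cancelled. Here is a standalone sketch of the same idea; trackPeakMemory and the allocation loop standing in for indexing are hypothetical names, not Grafana code.

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

// trackPeakMemory samples runtime.MemStats until ctx is cancelled,
// then reports the observed maxima.
func trackPeakMemory(ctx context.Context, frequency time.Duration, done chan<- struct{}) {
	var maxHeapInuse, maxSys uint64
	capture := func() {
		var m runtime.MemStats
		runtime.ReadMemStats(&m) // briefly stops the world; keep frequency coarse
		if m.HeapInuse > maxHeapInuse {
			maxHeapInuse = m.HeapInuse
		}
		if m.Sys > maxSys {
			maxSys = m.Sys
		}
	}
	capture()
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("max HeapInuse: %d B, max Sys: %d B\n", maxHeapInuse, maxSys)
			close(done)
			return
		case <-time.After(frequency):
			capture()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go trackPeakMemory(ctx, 200*time.Millisecond, done)

	// Stand-in for the indexing work being measured.
	buf := make([][]byte, 0, 64)
	for i := 0; i < 64; i++ {
		buf = append(buf, make([]byte, 1<<20)) // allocate 1 MiB per step
		time.Sleep(10 * time.Millisecond)
	}
	_ = buf

	cancel()
	<-done // wait for the sampler to print its report
}

HeapInuse approximates the live heap plus fragmentation, while Sys is the total memory obtained from the OS, so the two maxima bracket what the process actually held during the measured work.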

View File

@@ -0,0 +1,39 @@
+package searchV2
+
+import (
+	"fmt"
+	"math"
+	"os"
+	"path/filepath"
+)
+
+func dirSize(path string) (int64, error) {
+	var size int64
+	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if !info.IsDir() {
+			size += info.Size()
+		}
+		return err
+	})
+	return size, err
+}
+
+func logN(n, b float64) float64 {
+	return math.Log(n) / math.Log(b)
+}
+
+// Slightly modified function from https://github.com/dustin/go-humanize (MIT).
+func formatBytes(numBytes uint64) string {
+	base := 1024.0
+	sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
+	if numBytes < 10 {
+		return fmt.Sprintf("%d B", numBytes)
+	}
+	e := math.Floor(logN(float64(numBytes), base))
+	suffix := sizes[int(e)]
+	val := math.Floor(float64(numBytes)/math.Pow(base, e)*10+0.5) / 10
+	return fmt.Sprintf("%.1f %s", val, suffix)
+}
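A hypothetical usage sketch for these helpers, assuming dirSize and formatBytes above are compiled into the same package: write a few files of known size into a temp directory and report the total.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	tmpDir, err := os.MkdirTemp("", "dirsize-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(tmpDir)

	// Three files of 1 KiB each: dirSize should report 3072 bytes.
	for i := 0; i < 3; i++ {
		name := filepath.Join(tmpDir, fmt.Sprintf("f%d.bin", i))
		if err := os.WriteFile(name, make([]byte, 1024), 0o644); err != nil {
			panic(err)
		}
	}

	size, err := dirSize(tmpDir)
	if err != nil {
		panic(err)
	}
	fmt.Println(formatBytes(uint64(size))) // prints "3.0 KiB"
}

For 3 x 1024 bytes, e = floor(log_1024(3072)) = 1, so the size is scaled by 1024 and rounded to one decimal, printing "3.0 KiB".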