Search: Use new folder when creating a bleve index (#98260)

This commit is contained in:
Ryan McKinley 2024-12-19 19:40:04 +03:00 committed by GitHub
parent 805ac9cd40
commit 399cbf7c50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 81 additions and 25 deletions

View File

@ -4,9 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
"log/slog" "log/slog"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"time"
"github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/search" "github.com/blevesearch/bleve/v2/search"
@ -39,21 +41,32 @@ type bleveBackend struct {
tracer trace.Tracer tracer trace.Tracer
log *slog.Logger log *slog.Logger
opts BleveOptions opts BleveOptions
start time.Time
// cache info // cache info
cache map[resource.NamespacedResource]*bleveIndex cache map[resource.NamespacedResource]*bleveIndex
cacheMu sync.RWMutex cacheMu sync.RWMutex
} }
func NewBleveBackend(opts BleveOptions, tracer trace.Tracer) *bleveBackend { func NewBleveBackend(opts BleveOptions, tracer trace.Tracer) (*bleveBackend, error) {
b := &bleveBackend{ if opts.Root == "" {
return nil, fmt.Errorf("bleve backend missing root folder configuration")
}
root, err := os.Stat(opts.Root)
if err != nil {
return nil, fmt.Errorf("error opening bleve root folder %w", err)
}
if !root.IsDir() {
return nil, fmt.Errorf("bleve root is configured against a file (not folder)")
}
return &bleveBackend{
log: slog.Default().With("logger", "bleve-backend"), log: slog.Default().With("logger", "bleve-backend"),
tracer: tracer, tracer: tracer,
cache: make(map[resource.NamespacedResource]*bleveIndex), cache: make(map[resource.NamespacedResource]*bleveIndex),
opts: opts, opts: opts,
} start: time.Now(),
}, nil
return b
} }
// This will return nil if the key does not exist // This will return nil if the key does not exist
@ -91,13 +104,38 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
var err error var err error
var index bleve.Index var index bleve.Index
build := true
mapper := getBleveMappings(fields) mapper := getBleveMappings(fields)
if size > b.opts.FileThreshold { if size > b.opts.FileThreshold {
dir := filepath.Join(b.opts.Root, key.Namespace, fmt.Sprintf("%s.%s", key.Resource, key.Group)) fname := fmt.Sprintf("rv%d", resourceVersion)
index, err = bleve.New(dir, mapper) if resourceVersion == 0 {
fname = b.start.Format("tmp-20060102-150405")
}
dir := filepath.Join(b.opts.Root, key.Namespace,
fmt.Sprintf("%s.%s", key.Resource, key.Group),
fname,
)
if resourceVersion > 0 {
info, _ := os.Stat(dir)
if info != nil && info.IsDir() {
index, err = bleve.Open(dir) // NOTE, will use the same mappings!!!
if err == nil {
found, err := index.DocCount()
if err != nil || int64(found) != size {
b.log.Info("this size changed since the last time the index opened")
_ = index.Close()
index = nil
} else {
build = false // no need to build the index
}
}
}
}
// TODO, check last RV so we can see if the numbers have changed if index == nil {
index, err = bleve.New(dir, mapper)
}
resource.IndexMetrics.IndexTenants.WithLabelValues(key.Namespace, "file").Inc() resource.IndexMetrics.IndexTenants.WithLabelValues(key.Namespace, "file").Inc()
} else { } else {
@ -123,15 +161,17 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
return nil, err return nil, err
} }
_, err = builder(idx) if build {
if err != nil { _, err = builder(idx)
return nil, err if err != nil {
} return nil, err
}
// Flush the batch // Flush the batch
err = idx.Flush() err = idx.Flush()
if err != nil { if err != nil {
return nil, err return nil, err
}
} }
b.cacheMu.Lock() b.cacheMu.Lock()

View File

@ -27,13 +27,14 @@ func TestBleveBackend(t *testing.T) {
Group: "folder.grafana.app", Group: "folder.grafana.app",
Resource: "folders", Resource: "folders",
} }
tmpdir, err := os.CreateTemp("", "bleve-test") tmpdir, err := os.MkdirTemp("", "grafana-bleve-test")
require.NoError(t, err) require.NoError(t, err)
backend := NewBleveBackend(BleveOptions{ backend, err := NewBleveBackend(BleveOptions{
Root: tmpdir.Name(), Root: tmpdir,
FileThreshold: 5, // with more than 5 items we create a file on disk FileThreshold: 5, // with more than 5 items we create a file on disk
}, tracing.NewNoopTracerService()) }, tracing.NewNoopTracerService())
require.NoError(t, err)
// AVOID NPE in test // AVOID NPE in test
resource.NewIndexMetrics(backend.opts.Root, backend) resource.NewIndexMetrics(backend.opts.Root, backend)

View File

@ -4,11 +4,13 @@ import (
"context" "context"
"log/slog" "log/slog"
"os" "os"
"path/filepath"
"strings" "strings"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/storage/unified/search"
infraDB "github.com/grafana/grafana/pkg/infra/db" infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/authz" "github.com/grafana/grafana/pkg/services/authz"
@ -57,12 +59,25 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg,
// Setup the search server // Setup the search server
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) { if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
root := cfg.IndexPath
if root == "" {
root = filepath.Join(cfg.DataPath, "unified-search", "bleve")
}
err = os.MkdirAll(root, 0750)
if err != nil {
return nil, err
}
bleve, err := search.NewBleveBackend(search.BleveOptions{
Root: root,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
}, tracer)
if err != nil {
return nil, err
}
opts.Search = resource.SearchOptions{ opts.Search = resource.SearchOptions{
Backend: search.NewBleveBackend(search.BleveOptions{ Backend: bleve,
Root: cfg.IndexPath,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
}, tracer),
Resources: docs, Resources: docs,
WorkerThreads: cfg.IndexWorkers, WorkerThreads: cfg.IndexWorkers,
InitMinCount: cfg.IndexMinCount, InitMinCount: cfg.IndexMinCount,