Extract NewSearchOptions from unified sql client setup. (#100719)

* Extract NewSearchOptions from unified sql client setup.

Co-authored-by: Georges Chaudy <chaudyg@gmail.com>
This commit is contained in:
Peter Štibraný 2025-02-14 16:39:48 +01:00 committed by GitHub
parent 861686adaa
commit d0394bfa7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 71 additions and 48 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/federated"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql"
)
@ -121,7 +122,11 @@ func newClient(opts options.StorageOptions,
// Use the local SQL
default:
server, err := sql.NewResourceServer(ctx, db, cfg, features, docs, tracer, reg, authzc)
searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, reg)
if err != nil {
return nil, err
}
server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,54 @@
package search
import (
"log/slog"
"os"
"path/filepath"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
func NewSearchOptions(features featuremgmt.FeatureToggles, cfg *setting.Cfg, tracer tracing.Tracer, docs resource.DocumentBuilderSupplier, reg prometheus.Registerer) (resource.SearchOptions, error) {
// Setup the search server
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
root := cfg.IndexPath
if root == "" {
root = filepath.Join(cfg.DataPath, "unified-search", "bleve")
}
err := os.MkdirAll(root, 0750)
if err != nil {
return resource.SearchOptions{}, err
}
bleve, err := NewBleveBackend(BleveOptions{
Root: root,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
}, tracer, features)
if err != nil {
return resource.SearchOptions{}, err
}
err = reg.Register(resource.NewIndexMetrics(cfg.IndexPath, bleve))
if err != nil {
slog.Warn("Failed to register indexer metrics", "error", err)
}
err = reg.Register(resource.NewSprinklesMetrics())
if err != nil {
slog.Warn("Failed to register sprinkles metrics", "error", err)
}
return resource.SearchOptions{
Backend: bleve,
Resources: docs,
WorkerThreads: cfg.IndexWorkers,
InitMinCount: cfg.IndexMinCount,
}, nil
}
return resource.SearchOptions{}, nil
}

View File

@ -1,28 +1,23 @@
package sql
import (
"context"
"log/slog"
"os"
"path/filepath"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/authlib/types"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
// Creates a new ResourceServer
func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg,
features featuremgmt.FeatureToggles, docs resource.DocumentBuilderSupplier,
tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient) (resource.ResourceServer, error) {
func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient, searchOptions resource.SearchOptions) (resource.ResourceServer, error) {
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
opts := resource.ResourceServerOptions{
Tracer: tracer,
@ -55,44 +50,7 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg,
opts.Backend = store
opts.Diagnostics = store
opts.Lifecycle = store
// Setup the search server
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
root := cfg.IndexPath
if root == "" {
root = filepath.Join(cfg.DataPath, "unified-search", "bleve")
}
err = os.MkdirAll(root, 0750)
if err != nil {
return nil, err
}
bleve, err := search.NewBleveBackend(search.BleveOptions{
Root: root,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
}, tracer, features)
if err != nil {
return nil, err
}
opts.Search = resource.SearchOptions{
Backend: bleve,
Resources: docs,
WorkerThreads: cfg.IndexWorkers,
InitMinCount: cfg.IndexMinCount,
}
// Register indexer metrics
err = reg.Register(resource.NewIndexMetrics(cfg.IndexPath, opts.Search.Backend))
if err != nil {
slog.Warn("Failed to register indexer metrics", "error", err)
}
err = reg.Register(resource.NewSprinklesMetrics())
if err != nil {
slog.Warn("Failed to register sprinkles metrics", "error", err)
}
}
opts.Search = searchOptions
rs, err := resource.NewResourceServer(opts)
if err != nil {

View File

@ -20,6 +20,7 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/search"
)
var (
@ -110,7 +111,12 @@ func (s *service) start(ctx context.Context) error {
return err
}
server, err := NewResourceServer(ctx, s.db, s.cfg, s.features, s.docBuilders, s.tracing, s.reg, authzClient)
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.tracing, s.docBuilders, s.reg)
if err != nil {
return err
}
server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions)
if err != nil {
return err
}