Search PoC: Improves initial indexing speed. Makes params configurable. (#95439)

* Improves initial indexing speed. Makes params configurable.

* fix linter errors

* removes kind param

* updates index test

* remove println from test

* removes error check in test

* adds log for high index latency and updates max goroutine var with workers config var

* fix test timing out - set worker limit

* set the batch size

---------

Co-authored-by: Scott Lepper <scott.lepper@gmail.com>
owensmallwood 2024-10-29 12:24:31 -06:00 committed by GitHub
parent 189802d3c3
commit 995128d1db
GPG Key ID: B5690EEEBB952194
5 changed files with 137 additions and 54 deletions


@@ -534,6 +534,9 @@ type Cfg struct {
 	// Unified Storage
 	UnifiedStorage map[string]UnifiedStorageConfig
 	IndexPath      string
+	IndexWorkers      int
+	IndexMaxBatchSize int
+	IndexListLimit    int
 }

 type UnifiedStorageConfig struct {

@@ -1343,7 +1346,6 @@ func (cfg *Cfg) parseINIFile(iniFile *ini.File) error {
 	// unified storage config
 	cfg.setUnifiedStorageConfig()
-	cfg.setIndexPath()

 	return nil
 }


@@ -35,8 +35,11 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
 		}
 	}
 	cfg.UnifiedStorage = storageConfig
-}
-
-func (cfg *Cfg) setIndexPath() {
-	cfg.IndexPath = cfg.Raw.Section("unified_storage").Key("index_path").String()
+
+	// Set indexer config for unified storage
+	section := cfg.Raw.Section("unified_storage")
+	cfg.IndexPath = section.Key("index_path").String()
+	cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
+	cfg.IndexMaxBatchSize = section.Key("index_max_batch_size").MustInt(100)
+	cfg.IndexListLimit = section.Key("index_list_limit").MustInt(1000)
 }
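
For reference, the settings parsed above come from the [unified_storage] section of Grafana's ini config. A minimal sketch of that section, using the defaults from the code above (key names are taken from the diff; the values shown are just the fallbacks, and an empty index_path makes NewIndex fall back to os.TempDir()):

```ini
[unified_storage]
; directory where the per-tenant search indexes are written (empty = os.TempDir())
index_path =
; number of goroutines used to flush shard batches (default 10)
index_workers = 10
; flush a shard batch once it holds this many objects (default 100)
index_max_batch_size = 100
; page size for List requests during initial indexing (default 1000)
index_list_limit = 1000
```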


@@ -13,10 +13,10 @@ import (
 	"github.com/grafana/grafana/pkg/infra/tracing"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/sync/errgroup"
 )

 const tracingPrexfixIndex = "unified_storage.index."
-const pageSize = 10000

 type Shard struct {
 	index bleve.Index
@@ -30,12 +30,11 @@ type Index struct {
 	s      *server
 	log    log.Logger
 	tracer tracing.Tracer
-	path   string
 }

-func NewIndex(s *server, opts Opts, path string, tracer tracing.Tracer) *Index {
-	if path == "" {
-		path = os.TempDir()
+func NewIndex(s *server, opts Opts, tracer tracing.Tracer) *Index {
+	if opts.IndexDir == "" {
+		opts.IndexDir = os.TempDir()
 	}

 	idx := &Index{
@@ -44,46 +43,88 @@ func NewIndex(s *server, opts Opts, path string, tracer tracing.Tracer) *Index {
 		shards: make(map[string]Shard),
 		log:    log.New("unifiedstorage.search.index"),
 		tracer: tracer,
-		path:   path,
 	}

 	return idx
 }

-func (i *Index) IndexBatch(ctx context.Context, list *ListResponse) error {
-	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"CreateIndexBatches")
-	for _, obj := range list.Items {
-		indexableResource, err := NewIndexedResource(obj.Value)
-		if err != nil {
-			return err
-		}
-		shard, err := i.getShard(indexableResource.Namespace)
-		if err != nil {
-			return err
-		}
-		i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "tenant", indexableResource.Namespace)
-		err = shard.batch.Index(indexableResource.Uid, indexableResource)
-		if err != nil {
-			return err
-		}
-	}
-	span.End()
-
-	_, span = i.tracer.Start(ctx, tracingPrexfixIndex+"IndexBatches")
-	defer span.End()
-	for _, shard := range i.shards {
-		err := shard.index.Batch(shard.batch)
-		if err != nil {
-			return err
-		}
-		shard.batch.Reset()
-	}
-
-	return nil
-}
+// IndexBatches goes through all the shards and indexes their batches if they are large enough
+func (i *Index) IndexBatches(ctx context.Context, maxSize int, tenants []string) error {
+	_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"IndexBatches")
+	defer span.End()
+
+	group := errgroup.Group{}
+	group.SetLimit(i.opts.Workers)
+
+	totalBatchesIndexed := 0
+	for _, tenant := range tenants {
+		shard, err := i.getShard(tenant)
+		if err != nil {
+			return err
+		}
+		// Index the batch if it is large enough
+		if shard.batch.Size() >= maxSize {
+			totalBatchesIndexed++
+			group.Go(func() error {
+				i.log.Debug("indexing batch for shard", "tenant", tenant, "size", shard.batch.Size())
+				err = shard.index.Batch(shard.batch)
+				if err != nil {
+					return err
+				}
+				shard.batch.Reset()
+				return nil
+			})
+		}
+	}
+
+	err := group.Wait()
+	if err != nil {
+		return err
+	}
+
+	span.AddEvent("batches indexed", trace.WithAttributes(attribute.Int("batches_indexed", totalBatchesIndexed)))
+	return nil
+}
+
+// AddToBatches adds resources to their respective shard's batch
+// returns a list of tenants that have changes
+func (i *Index) AddToBatches(ctx context.Context, list *ListResponse) ([]string, error) {
+	_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"AddToBatches")
+	defer span.End()
+
+	tenantsWithChanges := make(map[string]bool)
+	for _, obj := range list.Items {
+		// Transform the raw resource into a more generic indexable resource
+		res, err := NewIndexedResource(obj.Value)
+		if err != nil {
+			return nil, err
+		}
+		shard, err := i.getShard(res.Namespace)
+		if err != nil {
+			return nil, err
+		}
+		i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "kind", res.Kind, "tenant", res.Namespace)
+		err = shard.batch.Index(res.Uid, res)
+		if err != nil {
+			return nil, err
+		}
+		if _, ok := tenantsWithChanges[res.Namespace]; !ok {
+			tenantsWithChanges[res.Namespace] = true
+		}
+	}
+
+	tenants := make([]string, 0, len(tenantsWithChanges))
+	for tenant, _ := range tenantsWithChanges {
+		tenants = append(tenants, tenant)
+	}
+	return tenants, nil
+}

 func (i *Index) Init(ctx context.Context) error {
 	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Init")
 	defer span.End()
@@ -92,12 +133,12 @@ func (i *Index) Init(ctx context.Context) error {
 	resourceTypes := fetchResourceTypes()
 	totalObjectsFetched := 0
 	for _, rt := range resourceTypes {
-		i.log.Info("indexing resource", "kind", rt.Key.Resource)
-		r := &ListRequest{Options: rt, Limit: pageSize}
+		i.log.Info("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers)
+		r := &ListRequest{Options: rt, Limit: int64(i.opts.ListLimit)}

 		// Paginate through the list of resources and index each page
 		for {
-			i.log.Debug("fetching resource list", "kind", rt.Key.Resource)
+			i.log.Info("fetching resource list", "kind", rt.Key.Resource)
 			list, err := i.s.List(ctx, r)
 			if err != nil {
 				return err
@@ -105,8 +146,15 @@ func (i *Index) Init(ctx context.Context) error {
 			totalObjectsFetched += len(list.Items)

-			// Index current page
-			err = i.IndexBatch(ctx, list)
+			i.log.Info("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items))
+			//add changes to batches for shards with changes in the List
+			tenants, err := i.AddToBatches(ctx, list)
+			if err != nil {
+				return err
+			}
+
+			// Index the batches for tenants with changes if the batch is large enough
+			err = i.IndexBatches(ctx, i.opts.BatchSize, tenants)
 			if err != nil {
 				return err
 			}
@@ -118,6 +166,14 @@ func (i *Index) Init(ctx context.Context) error {
 			r.NextPageToken = list.NextPageToken
 		}
 	}
+
+	//index all remaining batches
+	i.log.Info("indexing remaining batches", "shards", len(i.shards))
+	err := i.IndexBatches(ctx, 1, i.allTenants())
+	if err != nil {
+		return err
+	}
+
 	span.AddEvent("indexing finished", trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))))
 	end := time.Now().Unix()
 	i.log.Info("Initial indexing finished", "seconds", float64(end-start))
@@ -150,6 +206,9 @@ func (i *Index) Index(ctx context.Context, data *Data) error {
 	// record latency from when event was created to when it was indexed
 	latencySeconds := float64(time.Now().UnixMicro()-data.Value.ResourceVersion) / 1e6
+	if latencySeconds > 5 {
+		i.log.Warn("high index latency", "latency", latencySeconds)
+	}
 	if IndexServerMetrics != nil {
 		IndexServerMetrics.IndexLatency.WithLabelValues(data.Key.Resource).Observe(latencySeconds)
 	}
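
The warning added above assumes data.Value.ResourceVersion encodes the event creation time in microseconds, so the subtraction yields the indexing delay in seconds. A standalone sketch of that arithmetic with a made-up event age (not Grafana code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical event whose ResourceVersion was assigned 6 seconds ago,
	// encoded as a microsecond timestamp (the assumption the check relies on).
	resourceVersion := time.Now().Add(-6 * time.Second).UnixMicro()

	latencySeconds := float64(time.Now().UnixMicro()-resourceVersion) / 1e6
	fmt.Printf("latency=%.2fs high=%v\n", latencySeconds, latencySeconds > 5)
}
```
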
@@ -236,7 +295,8 @@ func (i *Index) Count() (uint64, error) {
 type Opts struct {
 	Workers   int // This controls how many goroutines are used to index objects
 	BatchSize int // This is the batch size for how many objects to add to the index at once
-	Concurrent bool
+	ListLimit int    // This is how big the List page size is. If the response size is too large, the number of items will be limited by the server.
+	IndexDir  string // The directory where the indexes for each tenant are stored
 }

 func createFileIndex(path string) (bleve.Index, string, error) {
@@ -248,12 +308,20 @@ func createFileIndex(path string) (bleve.Index, string, error) {
 	return index, indexPath, err
 }

+func (i *Index) allTenants() []string {
+	tenants := make([]string, 0, len(i.shards))
+	for tenant := range i.shards {
+		tenants = append(tenants, tenant)
+	}
+	return tenants
+}
+
 func (i *Index) getShard(tenant string) (Shard, error) {
 	shard, ok := i.shards[tenant]
 	if ok {
 		return shard, nil
 	}
-	index, path, err := createFileIndex(i.path)
+	index, path, err := createFileIndex(i.opts.IndexDir)
 	if err != nil {
 		return Shard{}, err
 	}
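
The IndexBatches rewrite above caps concurrent batch flushes with errgroup's SetLimit instead of spawning an unbounded goroutine per shard. A self-contained sketch of that pattern (tenant names and the sleep are placeholders standing in for shard.index.Batch; this is not Grafana code):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	workers := 10 // analogous to Opts.Workers / the index_workers setting

	group := errgroup.Group{}
	group.SetLimit(workers) // at most `workers` Go() calls run at once; extra calls block until a slot frees

	tenants := []string{"tenant-1", "tenant-2", "tenant-3"}
	for _, tenant := range tenants {
		tenant := tenant // capture loop variable (required before Go 1.22)
		group.Go(func() error {
			// stand-in for shard.index.Batch(shard.batch) followed by shard.batch.Reset()
			time.Sleep(10 * time.Millisecond)
			fmt.Println("flushed batch for", tenant)
			return nil
		})
	}

	// Wait blocks until every goroutine returns and yields the first non-nil error, if any.
	if err := group.Wait(); err != nil {
		fmt.Println("indexing failed:", err)
	}
}
```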


@@ -56,7 +56,13 @@ func (is *IndexServer) Load(ctx context.Context) error {
 	ctx, span := is.tracer.Start(ctx, tracingPrefixIndexServer+"Load")
 	defer span.End()

-	is.index = NewIndex(is.s, Opts{}, is.cfg.IndexPath, is.tracer)
+	opts := Opts{
+		Workers:   is.cfg.IndexWorkers,
+		BatchSize: is.cfg.IndexMaxBatchSize,
+		ListLimit: is.cfg.IndexListLimit,
+		IndexDir:  is.cfg.IndexPath,
+	}
+	is.index = NewIndex(is.s, opts, is.tracer)
 	err := is.index.Init(ctx)
 	if err != nil {
 		return err


@@ -20,24 +20,27 @@ func TestIndexBatch(t *testing.T) {
 	trace, err := tracing.ProvideService(tracingCfg)
 	if err != nil {
 		t.Fatal(err)
-		return
 	}

 	tmpdir := os.TempDir() + "testindexbatch"
 	defer func() {
-		err := os.RemoveAll(tmpdir)
+		err = os.RemoveAll(tmpdir)
 		if err != nil {
 			t.Fatal(err)
-			return
 		}
 	}()

 	index := &Index{
 		tracer: trace,
 		shards: make(map[string]Shard),
-		path:   tmpdir,
 		log:    log.New("unifiedstorage.search.index"),
+		opts: Opts{
+			IndexDir:  tmpdir,
+			ListLimit: 10000,
+			Workers:   10,
+			BatchSize: 10000,
+		},
 	}

 	ctx := context.Background()
@@ -47,15 +50,17 @@ func TestIndexBatch(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		list := &ListResponse{Items: loadTestItems(strconv.Itoa(i))}
 		start := time.Now()
-		err = index.IndexBatch(ctx, list)
+		_, err = index.AddToBatches(ctx, list)
 		if err != nil {
 			t.Fatal(err)
-			return
 		}
 		elapsed := time.Since(start)
 		fmt.Println("Time elapsed:", elapsed)
 	}

+	// index all batches for each shard/tenant
+	err = index.IndexBatches(ctx, 1, namespaces)
+
 	elapsed := time.Since(startAll)
 	fmt.Println("Total Time elapsed:", elapsed)
@@ -64,7 +69,6 @@ func TestIndexBatch(t *testing.T) {
 	total, err := index.Count()
 	if err != nil {
 		t.Fatal(err)
-		return
 	}

 	assert.Equal(t, uint64(100000), total)