From 50a60695328cff0fbe25336c97bd41236e1dc8a5 Mon Sep 17 00:00:00 2001 From: owensmallwood Date: Tue, 29 Oct 2024 15:40:25 -0600 Subject: [PATCH] Unified Storage Indexer: Uses in-memory index (#95576) uses in-memory index --- pkg/storage/unified/resource/index.go | 51 ++++++++++++++-------- pkg/storage/unified/resource/index_test.go | 46 +++++++++---------- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/pkg/storage/unified/resource/index.go b/pkg/storage/unified/resource/index.go index 18466097cda..60196ebbbf7 100644 --- a/pkg/storage/unified/resource/index.go +++ b/pkg/storage/unified/resource/index.go @@ -24,6 +24,13 @@ type Shard struct { batch *bleve.Batch } +type Opts struct { + Workers int // This controls how many goroutines are used to index objects + BatchSize int // This is the batch size for how many objects to add to the index at once + ListLimit int // This is how big the List page size is. If the response size is too large, the number of items will be limited by the server. + IndexDir string // The directory where the indexes for each tenant are stored +} + type Index struct { shards map[string]Shard opts Opts @@ -292,22 +299,6 @@ func (i *Index) Count() (uint64, error) { return total, nil } -type Opts struct { - Workers int // This controls how many goroutines are used to index objects - BatchSize int // This is the batch size for how many objects to add to the index at once - ListLimit int // This is how big the List page size is. If the response size is too large, the number of items will be limited by the server. - IndexDir string // The directory where the indexes for each tenant are stored -} - -func createFileIndex(path string) (bleve.Index, string, error) { - indexPath := filepath.Join(path, uuid.New().String()) - index, err := bleve.New(indexPath, createIndexMappings()) - if err != nil { - golog.Fatalf("Failed to create index: %v", err) - } - return index, indexPath, err -} - func (i *Index) allTenants() []string { tenants := make([]string, 0, len(i.shards)) for tenant := range i.shards { @@ -321,7 +312,8 @@ func (i *Index) getShard(tenant string) (Shard, error) { if ok { return shard, nil } - index, path, err := createFileIndex(i.opts.IndexDir) + + index, path, err := i.createIndex() if err != nil { return Shard{}, err } @@ -336,6 +328,31 @@ func (i *Index) getShard(tenant string) (Shard, error) { return shard, nil } +func (i *Index) createIndex() (bleve.Index, string, error) { + if i.opts.IndexDir == "" { + return createInMemoryIndex() + } + return createFileIndex(i.opts.IndexDir) +} + +var mappings = createIndexMappings() + +// less memory intensive alternative for larger indexes with less tenants (on-prem) +func createFileIndex(path string) (bleve.Index, string, error) { + indexPath := filepath.Join(path, uuid.New().String()) + index, err := bleve.New(indexPath, mappings) + if err != nil { + golog.Fatalf("Failed to create index: %v", err) + } + return index, indexPath, err +} + +// faster indexing when there are many tenants with smaller batches (cloud) +func createInMemoryIndex() (bleve.Index, string, error) { + index, err := bleve.NewMemOnly(mappings) + return index, "", err +} + // TODO - fetch from api func fetchResourceTypes() []*ListOptions { items := []*ListOptions{} diff --git a/pkg/storage/unified/resource/index_test.go b/pkg/storage/unified/resource/index_test.go index 379f066c547..7bc8898b4f3 100644 --- a/pkg/storage/unified/resource/index_test.go +++ b/pkg/storage/unified/resource/index_test.go @@ -3,7 +3,6 @@ package resource import ( "context" "fmt" - "os" "strconv" "strings" "testing" @@ -22,33 +21,24 @@ func TestIndexBatch(t *testing.T) { t.Fatal(err) } - tmpdir := os.TempDir() + "testindexbatch" - - defer func() { - err = os.RemoveAll(tmpdir) - if err != nil { - t.Fatal(err) - } - }() - index := &Index{ tracer: trace, shards: make(map[string]Shard), log: log.New("unifiedstorage.search.index"), opts: Opts{ - IndexDir: tmpdir, - ListLimit: 10000, + ListLimit: 5000, Workers: 10, - BatchSize: 10000, + BatchSize: 1000, }, } ctx := context.Background() startAll := time.Now() + ns := namespaces() // simulate 10 List calls for i := 0; i < 10; i++ { - list := &ListResponse{Items: loadTestItems(strconv.Itoa(i))} + list := &ListResponse{Items: loadTestItems(strconv.Itoa(i), ns)} start := time.Now() _, err = index.AddToBatches(ctx, list) if err != nil { @@ -59,12 +49,15 @@ func TestIndexBatch(t *testing.T) { } // index all batches for each shard/tenant - err = index.IndexBatches(ctx, 1, namespaces) + err = index.IndexBatches(ctx, 1, ns) + if err != nil { + t.Fatal(err) + } elapsed := time.Since(startAll) fmt.Println("Total Time elapsed:", elapsed) - assert.Equal(t, 3, len(index.shards)) + assert.Equal(t, len(ns), len(index.shards)) total, err := index.Count() if err != nil { @@ -74,7 +67,7 @@ func TestIndexBatch(t *testing.T) { assert.Equal(t, uint64(100000), total) } -func loadTestItems(uid string) []*ResourceWrapper { +func loadTestItems(uid string, tenants []string) []*ResourceWrapper { resource := `{ "kind": "", "title": "test", @@ -94,23 +87,26 @@ func loadTestItems(uid string) []*ResourceWrapper { for i := 0; i < 10000; i++ { res := strings.Replace(resource, "", strconv.Itoa(i)+uid, 1) // shuffle kinds - kind := namespaces[rand.Intn(len(kinds))] + kind := kinds[rand.Intn(len(kinds))] res = strings.Replace(res, "", kind, 1) // shuffle namespaces - ns := namespaces[rand.Intn(len(namespaces))] + ns := tenants[rand.Intn(len(tenants))] res = strings.Replace(res, "", ns, 1) items = append(items, &ResourceWrapper{Value: []byte(res)}) } return items } -var namespaces = []string{ - "tenant1", - "tenant2", - "tenant3", -} - var kinds = []string{ "playlist", "folder", } + +// simulate many tenants ( cloud ) +func namespaces() []string { + ns := []string{} + for i := 0; i < 1000; i++ { + ns = append(ns, "tenant"+strconv.Itoa(i)) + } + return ns +}