From 1865205d6823a5b7b39ec576e0ec7b16efadda44 Mon Sep 17 00:00:00 2001 From: Ieva Date: Fri, 27 Jan 2023 15:42:08 +0000 Subject: [PATCH] Benchmarks for searchv2 (#60730) * bench-test * cleanup * more simplification * fix tests * correct wrong argument ordering & use constant * fix issues with tests setup * add benchmark results * reuse Gabriel's concurrent setup code * correct error logs for ac benchmarks --- .../acimpl/service_bench_test.go | 68 +----- pkg/services/accesscontrol/actest/common.go | 61 +++++ pkg/services/searchV2/service_bench_test.go | 211 ++++++++++++++++++ 3 files changed, 277 insertions(+), 63 deletions(-) create mode 100644 pkg/services/accesscontrol/actest/common.go create mode 100644 pkg/services/searchV2/service_bench_test.go diff --git a/pkg/services/accesscontrol/acimpl/service_bench_test.go b/pkg/services/accesscontrol/acimpl/service_bench_test.go index 3d864a3aa28..2a12199eda2 100644 --- a/pkg/services/accesscontrol/acimpl/service_bench_test.go +++ b/pkg/services/accesscontrol/acimpl/service_bench_test.go @@ -3,13 +3,13 @@ package acimpl import ( "context" "fmt" - "sync" "testing" "time" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/accesscontrol" + "github.com/grafana/grafana/pkg/services/accesscontrol/actest" "github.com/grafana/grafana/pkg/services/accesscontrol/database" "github.com/grafana/grafana/pkg/services/org" "github.com/grafana/grafana/pkg/services/user" @@ -17,64 +17,6 @@ import ( "github.com/stretchr/testify/require" ) -const concurrency = 10 -const batchSize = 1000 - -type bounds struct { - start, end int -} - -// concurrentBatch spawns the requested amount of workers then ask them to run eachFn on chunks of the requested size -func concurrentBatch(workers, count, size int, eachFn func(start, end int) error) error { - var wg sync.WaitGroup - alldone := make(chan bool) // Indicates that all workers have finished working - chunk := make(chan bounds) // Gives the workers the bounds they should work with - ret := make(chan error) // Allow workers to notify in case of errors - defer close(ret) - - // Launch all workers - for x := 0; x < workers; x++ { - wg.Add(1) - go func() { - defer wg.Done() - for ck := range chunk { - if err := eachFn(ck.start, ck.end); err != nil { - ret <- err - return - } - } - }() - } - - go func() { - // Tell the workers the chunks they have to work on - for i := 0; i < count; { - end := i + size - if end > count { - end = count - } - - chunk <- bounds{start: i, end: end} - - i = end - } - close(chunk) - - // Wait for the workers - wg.Wait() - close(alldone) - }() - - // wait for an error or for all workers to be done - select { - case err := <-ret: - return err - case <-alldone: - break - } - return nil -} - // setupBenchEnv will create userCount users, userCount managed roles with resourceCount managed permission each // Example: setupBenchEnv(b, 2, 3): // - will create 2 users and assign them 2 managed roles @@ -102,7 +44,7 @@ func setupBenchEnv(b *testing.B, usersCount, resourceCount int) (accesscontrol.S require.NoError(b, err) // Populate users, roles and assignments - if errInsert := concurrentBatch(concurrency, usersCount, batchSize, func(start, end int) error { + if errInsert := actest.ConcurrentBatch(actest.Concurrency, usersCount, actest.BatchSize, func(start, end int) error { n := end - start users := make([]user.User, 0, n) orgUsers := make([]org.OrgUser, 0, n) @@ -157,13 +99,13 @@ func setupBenchEnv(b *testing.B, usersCount, resourceCount int) (accesscontrol.S }) return err }); errInsert != nil { - require.NoError(b, err, "could not insert users and roles") + require.NoError(b, errInsert, "could not insert users and roles") return nil, nil } // Populate permissions action2 := "resources:action2" - if errInsert := concurrentBatch(concurrency, resourceCount*usersCount, batchSize, func(start, end int) error { + if errInsert := actest.ConcurrentBatch(actest.Concurrency, resourceCount*usersCount, actest.BatchSize, func(start, end int) error { permissions := make([]accesscontrol.Permission, 0, end-start) for i := start; i < end; i++ { permissions = append(permissions, accesscontrol.Permission{ @@ -180,7 +122,7 @@ func setupBenchEnv(b *testing.B, usersCount, resourceCount int) (accesscontrol.S return err }) }); errInsert != nil { - require.NoError(b, err, "could not insert permissions") + require.NoError(b, errInsert, "could not insert permissions") return nil, nil } diff --git a/pkg/services/accesscontrol/actest/common.go b/pkg/services/accesscontrol/actest/common.go new file mode 100644 index 00000000000..5aa71efe08c --- /dev/null +++ b/pkg/services/accesscontrol/actest/common.go @@ -0,0 +1,61 @@ +package actest + +import "sync" + +const Concurrency = 10 +const BatchSize = 1000 + +type bounds struct { + start, end int +} + +// ConcurrentBatch spawns the requested amount of workers then ask them to run eachFn on chunks of the requested size +func ConcurrentBatch(workers, count, size int, eachFn func(start, end int) error) error { + var wg sync.WaitGroup + alldone := make(chan bool) // Indicates that all workers have finished working + chunk := make(chan bounds) // Gives the workers the bounds they should work with + ret := make(chan error) // Allow workers to notify in case of errors + defer close(ret) + + // Launch all workers + for x := 0; x < workers; x++ { + wg.Add(1) + go func() { + defer wg.Done() + for ck := range chunk { + if err := eachFn(ck.start, ck.end); err != nil { + ret <- err + return + } + } + }() + } + + go func() { + // Tell the workers the chunks they have to work on + for i := 0; i < count; { + end := i + size + if end > count { + end = count + } + + chunk <- bounds{start: i, end: end} + + i = end + } + close(chunk) + + // Wait for the workers + wg.Wait() + close(alldone) + }() + + // wait for an error or for all workers to be done + select { + case err := <-ret: + return err + case <-alldone: + break + } + return nil +} diff --git a/pkg/services/searchV2/service_bench_test.go b/pkg/services/searchV2/service_bench_test.go new file mode 100644 index 00000000000..9fc9fbfa573 --- /dev/null +++ b/pkg/services/searchV2/service_bench_test.go @@ -0,0 +1,211 @@ +package searchV2 + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/accesscontrol/actest" + "github.com/grafana/grafana/pkg/services/dashboards" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/org" + "github.com/grafana/grafana/pkg/services/org/orgtest" + "github.com/grafana/grafana/pkg/services/querylibrary/querylibraryimpl" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/services/store" + "github.com/grafana/grafana/pkg/services/user" + "github.com/grafana/grafana/pkg/setting" + + "github.com/stretchr/testify/require" +) + +// setupBenchEnv will set up a database with folderCount folders and dashboardsPerFolder dashboards per folder +// It will also set up and run the search service +// and create a signed in user object with explicit permissions on each dashboard and folder. +func setupBenchEnv(b *testing.B, folderCount, dashboardsPerFolder int) (*StandardSearchService, *user.SignedInUser, error) { + sqlStore := db.InitTestDB(b) + err := populateDB(folderCount, dashboardsPerFolder, sqlStore) + require.NoError(b, err, "error when populating the database") + + // load all dashboards and folders + dbLoadingBatchSize := (dashboardsPerFolder + 1) * folderCount + cfg := &setting.Cfg{Search: setting.SearchSettings{DashboardLoadingBatchSize: dbLoadingBatchSize}} + features := featuremgmt.WithFeatures() + orgSvc := &orgtest.FakeOrgService{ + ExpectedOrgs: []*org.OrgDTO{{ID: 1}}, + } + querySvc := querylibraryimpl.ProvideService(cfg, features) + searchService, ok := ProvideService(cfg, sqlStore, store.NewDummyEntityEventsService(), actest.FakeService{}, + tracing.InitializeTracerForTest(), features, orgSvc, nil, querySvc).(*StandardSearchService) + require.True(b, ok) + + err = runSearchService(searchService) + require.NoError(b, err, "error when running search service") + + user := getSignedInUser(folderCount, dashboardsPerFolder) + + return searchService, user, nil +} + +// Returns a signed in user object with permissions on all dashboards and folders +func getSignedInUser(folderCount, dashboardsPerFolder int) *user.SignedInUser { + folderScopes := make([]string, folderCount) + for i := 1; i <= folderCount; i++ { + folderScopes[i-1] = dashboards.ScopeFoldersProvider.GetResourceScopeUID(fmt.Sprintf("folder%d", i)) + } + + dashScopes := make([]string, folderCount*dashboardsPerFolder) + for i := folderCount + 1; i <= (folderCount * (dashboardsPerFolder + 1)); i++ { + dashScopes[i-(folderCount+1)] = dashboards.ScopeDashboardsProvider.GetResourceScopeUID(fmt.Sprintf("dashboard%d", i)) + } + + user := &user.SignedInUser{ + UserID: 1, + OrgID: 1, + Permissions: map[int64]map[string][]string{ + 1: { + dashboards.ActionDashboardsRead: dashScopes, + dashboards.ActionFoldersRead: folderScopes, + }, + }, + } + + return user +} + +// Runs initial indexing of search service +func runSearchService(searchService *StandardSearchService) error { + if err := searchService.dashboardIndex.buildInitialIndexes(context.Background(), []int64{int64(1)}); err != nil { + return err + } + searchService.dashboardIndex.initialIndexingComplete = true + + // Required for sync that is called during dashboard search + go func() { + for { + doneCh := <-searchService.dashboardIndex.syncCh + close(doneCh) + } + }() + + return nil +} + +// Populates database with dashboards and folders +func populateDB(folderCount, dashboardsPerFolder int, sqlStore *sqlstore.SQLStore) error { + // Insert folders + offset := 1 + if errInsert := actest.ConcurrentBatch(actest.Concurrency, folderCount, actest.BatchSize, func(start, end int) error { + n := end - start + folders := make([]dashboards.Dashboard, 0, n) + now := time.Now() + + for u := start; u < end; u++ { + folderID := int64(u + offset) + folders = append(folders, dashboards.Dashboard{ + ID: folderID, + UID: fmt.Sprintf("folder%v", folderID), + Title: fmt.Sprintf("folder%v", folderID), + IsFolder: true, + OrgID: 1, + Created: now, + Updated: now, + }) + } + + err := sqlStore.WithDbSession(context.Background(), func(sess *db.Session) error { + if _, err := sess.Insert(folders); err != nil { + return err + } + return nil + }) + return err + }); errInsert != nil { + return errInsert + } + + // Insert dashboards + offset += folderCount + if errInsert := actest.ConcurrentBatch(actest.Concurrency, dashboardsPerFolder*folderCount, actest.BatchSize, func(start, end int) error { + n := end - start + dbs := make([]dashboards.Dashboard, 0, n) + now := time.Now() + + for u := start; u < end; u++ { + dashID := int64(u + offset) + folderID := int64((u+offset)%folderCount + 1) + dbs = append(dbs, dashboards.Dashboard{ + ID: dashID, + UID: fmt.Sprintf("dashboard%v", dashID), + Title: fmt.Sprintf("dashboard%v", dashID), + IsFolder: false, + FolderID: folderID, + OrgID: 1, + Created: now, + Updated: now, + }) + } + + err := sqlStore.WithDbSession(context.Background(), func(sess *db.Session) error { + if _, err := sess.Insert(dbs); err != nil { + return err + } + return nil + }) + return err + }); errInsert != nil { + return errInsert + } + + return nil +} + +func benchSearchV2(b *testing.B, folderCount, dashboardsPerFolder int) { + searchService, testUser, err := setupBenchEnv(b, folderCount, dashboardsPerFolder) + require.NoError(b, err) + + b.ResetTimer() + + expectedResultCount := (dashboardsPerFolder + 1) * folderCount + for n := 0; n < b.N; n++ { + result := searchService.doDashboardQuery(context.Background(), testUser, 1, DashboardQuery{Limit: expectedResultCount}) + require.NoError(b, result.Error) + require.NotZero(b, len(result.Frames)) + for _, field := range result.Frames[0].Fields { + if field.Name == "uid" { + require.Equal(b, expectedResultCount, field.Len()) + break + } + } + } +} + +// Test with some dashboards and some folders +func BenchmarkSearchV2_10_10(b *testing.B) { + benchSearchV2(b, 10, 10) +} // ~0.0002 s/op +func BenchmarkSearchV2_10_100(b *testing.B) { + benchSearchV2(b, 10, 100) +} // ~0.002 s/op + +// Test with many dashboards and only one folder +func BenchmarkSearchV2_1_1k(b *testing.B) { + benchSearchV2(b, 1, 1000) +} // ~0.002 s/op +func BenchmarkSearchV2_1_10k(b *testing.B) { + benchSearchV2(b, 1, 10000) +} // ~0.019 s/op + +// Test with a large number of dashboards and folders +func BenchmarkSearchV2_100_100(b *testing.B) { + benchSearchV2(b, 100, 100) +} // ~0.02 s/op +func BenchmarkSearchV2_100_1k(b *testing.B) { + benchSearchV2(b, 100, 1000) +} // ~0.22 s/op +func BenchmarkSearchV2_1k_100(b *testing.B) { + benchSearchV2(b, 1000, 100) +} // ~0.22 s/op