diff --git a/pkg/registry/apis/dashboard/legacy/sql_dashboards.go b/pkg/registry/apis/dashboard/legacy/sql_dashboards.go index f44fd66cf64..817f0e49323 100644 --- a/pkg/registry/apis/dashboard/legacy/sql_dashboards.go +++ b/pkg/registry/apis/dashboard/legacy/sql_dashboards.go @@ -132,7 +132,7 @@ type rowsWrapper struct { err error } -func (a *dashboardSqlAccess) Namespaces(ctx context.Context) ([]string, error) { +func (a *dashboardSqlAccess) GetResourceStats(ctx context.Context, minCount int) ([]resource.ResourceStats, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/storage/unified/resource/cdk_backend.go b/pkg/storage/unified/resource/cdk_backend.go index d9aabec1005..34d49fd2fcc 100644 --- a/pkg/storage/unified/resource/cdk_backend.go +++ b/pkg/storage/unified/resource/cdk_backend.go @@ -108,7 +108,8 @@ func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string { return buffer.String() } -func (s *cdkBackend) Namespaces(ctx context.Context) ([]string, error) { +// GetResourceStats implements Backend. +func (s *cdkBackend) GetResourceStats(ctx context.Context, minCount int) ([]ResourceStats, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go index 56bfcacb3ce..c37f5124fd1 100644 --- a/pkg/storage/unified/resource/search.go +++ b/pkg/storage/unified/resource/search.go @@ -81,6 +81,7 @@ type searchSupport struct { access authz.AccessClient builders *builderCache initWorkers int + initMinSize int } var ( @@ -104,6 +105,7 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access authz.A search: opts.Backend, log: slog.Default().With("logger", "resource-search"), initWorkers: opts.WorkerThreads, + initMinSize: opts.InitMinCount, } info, err := opts.Resources.GetDocumentBuilders() @@ -164,40 +166,22 @@ func (s *searchSupport) init(ctx context.Context) error { _, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init") defer span.End() - // TODO, replace namespaces with a query that gets top values - namespaces, err := s.storage.Namespaces(ctx) - if err != nil { - return err - } - - // Hardcoded for now... should come from the query - kinds := []schema.GroupResource{ - {Group: "dashboard.grafana.app", Resource: "dashboards"}, - {Group: "playlist.grafana.app", Resource: "playlists"}, - } - totalBatchesIndexed := 0 group := errgroup.Group{} group.SetLimit(s.initWorkers) - // Prepare all the (large) indexes - // TODO, threading and query real information: - // SELECT namespace,"group",resource,COUNT(*),resource_version FROM resource - // GROUP BY "group", "resource", "namespace" - // ORDER BY resource_version desc; - for _, ns := range namespaces { - for _, gr := range kinds { - group.Go(func() error { - s.log.Debug("initializing search index", "namespace", ns, "gr", gr) - totalBatchesIndexed++ - _, _, err = s.build(ctx, NamespacedResource{ - Group: gr.Group, - Resource: gr.Resource, - Namespace: ns, - }, 10, 0) // TODO, approximate size - return err - }) - } + stats, err := s.storage.GetResourceStats(ctx, s.initMinSize) + if err != nil { + return err + } + + for _, info := range stats { + group.Go(func() error { + s.log.Debug("initializing search index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource) + totalBatchesIndexed++ + _, _, err = s.build(ctx, info.NamespacedResource, info.Count, info.ResourceVersion) + return err + }) } err = group.Wait() diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 30c6d0a6ade..614f8534ed7 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -95,7 +95,14 @@ type StorageBackend interface { // For HA setups, this will be more events than the local WriteEvent above! WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) - Namespaces(ctx context.Context) ([]string, error) + GetResourceStats(ctx context.Context, minCount int) ([]ResourceStats, error) +} + +type ResourceStats struct { + NamespacedResource + + Count int64 + ResourceVersion int64 } // This interface is not exposed to end users directly @@ -133,6 +140,9 @@ type SearchOptions struct { // How many threads should build indexes WorkerThreads int + + // Skip building index on startup for small indexes + InitMinCount int } type ResourceServerOptions struct { diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 0e8f45d173a..4be8773cf67 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -121,6 +121,38 @@ func (b *backend) Stop(_ context.Context) error { return nil } +// GetResourceStats implements Backend. +func (b *backend) GetResourceStats(ctx context.Context, minCount int) ([]resource.ResourceStats, error) { + _, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats") + defer span.End() + + req := &sqlStatsRequest{ + SQLTemplate: sqltemplate.New(b.dialect), + MinCount: minCount, // not used in query... yet? + } + + res := make([]resource.ResourceStats, 0, 100) + err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { + rows, err := dbutil.QueryRows(ctx, tx, sqlResourceStats, req) + if err != nil { + return err + } + for rows.Next() { + row := resource.ResourceStats{} + err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.Count, &row.ResourceVersion) + if err != nil { + return err + } + if row.Count > int64(minCount) { + res = append(res, row) + } + } + return err + }) + + return res, err +} + func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) { _, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent") defer span.End() @@ -137,35 +169,6 @@ func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (in } } -// Namespaces returns the list of unique namespaces in storage. -func (b *backend) Namespaces(ctx context.Context) ([]string, error) { - var namespaces []string - - err := b.db.WithTx(ctx, RepeatableRead, func(ctx context.Context, tx db.Tx) error { - rows, err := tx.QueryContext(ctx, "SELECT DISTINCT(namespace) FROM resource ORDER BY namespace;") - if err != nil { - return err - } - - defer func() { - _ = rows.Close() - }() - - for rows.Next() { - var ns string - err = rows.Scan(&ns) - if err != nil { - return err - } - namespaces = append(namespaces, ns) - } - - return nil - }) - - return namespaces, err -} - func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"Create") defer span.End() diff --git a/pkg/storage/unified/sql/data/resource_stats.sql b/pkg/storage/unified/sql/data/resource_stats.sql new file mode 100644 index 00000000000..c5d8cc69589 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_stats.sql @@ -0,0 +1,12 @@ +SELECT + {{ .Ident "namespace" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + COUNT(*), + MAX({{ .Ident "resource_version" }}) +FROM {{ .Ident "resource" }} +GROUP BY + {{ .Ident "namespace" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }} +; \ No newline at end of file diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 7ce060e07a7..27fc77ad55c 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -31,6 +31,7 @@ var ( sqlResourceInsert = mustTemplate("resource_insert.sql") sqlResourceUpdate = mustTemplate("resource_update.sql") sqlResourceRead = mustTemplate("resource_read.sql") + sqlResourceStats = mustTemplate("resource_stats.sql") sqlResourceList = mustTemplate("resource_list.sql") sqlResourceHistoryList = mustTemplate("resource_history_list.sql") sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql") @@ -71,6 +72,15 @@ func (r sqlResourceRequest) Validate() error { return nil // TODO } +type sqlStatsRequest struct { + sqltemplate.SQLTemplate + MinCount int +} + +func (r sqlStatsRequest) Validate() error { + return nil // TODO +} + type historyPollResponse struct { Key resource.ResourceKey ResourceVersion int64 diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index 788abe95dad..30fe25eed2f 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -219,5 +219,15 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, }, + + sqlResourceStats: { + { + Name: "query", + Data: &sqlStatsRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + MinCount: 10, // Not yet used in query (only response filter) + }, + }, + }, }}) } diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index d234cfaa5df..a0fe7e230ad 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -65,6 +65,7 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, }, tracer, reg), Resources: docs, WorkerThreads: 5, // from cfg? + InitMinCount: 1, } } diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index 6297c71fe78..2abe4aec365 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/authlib/claims" "github.com/grafana/dskit/services" - "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/apimachinery/utils" infraDB "github.com/grafana/grafana/pkg/infra/db" @@ -98,6 +97,12 @@ func TestIntegrationBackendHappyPath(t *testing.T) { rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) require.NoError(t, err) require.Greater(t, rv3, rv2) + + stats, err := backend.GetResourceStats(ctx, 0) + require.NoError(t, err) + require.Len(t, stats, 1) + require.Equal(t, int64(3), stats[0].Count) + require.Equal(t, rv3, stats[0].ResourceVersion) }) t.Run("Update item2", func(t *testing.T) { diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_stats-query.sql b/pkg/storage/unified/sql/testdata/mysql--resource_stats-query.sql new file mode 100755 index 00000000000..3818b621a11 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/mysql--resource_stats-query.sql @@ -0,0 +1,12 @@ +SELECT + `namespace`, + `group`, + `resource`, + COUNT(*), + MAX(`resource_version`) +FROM `resource` +GROUP BY + `namespace`, + `group`, + `resource` +; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_stats-query.sql b/pkg/storage/unified/sql/testdata/postgres--resource_stats-query.sql new file mode 100755 index 00000000000..095b0a8011b --- /dev/null +++ b/pkg/storage/unified/sql/testdata/postgres--resource_stats-query.sql @@ -0,0 +1,12 @@ +SELECT + "namespace", + "group", + "resource", + COUNT(*), + MAX("resource_version") +FROM "resource" +GROUP BY + "namespace", + "group", + "resource" +; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_stats-query.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_stats-query.sql new file mode 100755 index 00000000000..095b0a8011b --- /dev/null +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_stats-query.sql @@ -0,0 +1,12 @@ +SELECT + "namespace", + "group", + "resource", + COUNT(*), + MAX("resource_version") +FROM "resource" +GROUP BY + "namespace", + "group", + "resource" +;