mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Search: Build index from resource stats (#97320)
This commit is contained in:
parent
64e7dbcadb
commit
9d89d8757f
@ -132,7 +132,7 @@ type rowsWrapper struct {
|
|||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) Namespaces(ctx context.Context) ([]string, error) {
|
func (a *dashboardSqlAccess) GetResourceStats(ctx context.Context, minCount int) ([]resource.ResourceStats, error) {
|
||||||
return nil, fmt.Errorf("not implemented")
|
return nil, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +108,8 @@ func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string {
|
|||||||
return buffer.String()
|
return buffer.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *cdkBackend) Namespaces(ctx context.Context) ([]string, error) {
|
// GetResourceStats implements Backend.
|
||||||
|
func (s *cdkBackend) GetResourceStats(ctx context.Context, minCount int) ([]ResourceStats, error) {
|
||||||
return nil, fmt.Errorf("not implemented")
|
return nil, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +81,7 @@ type searchSupport struct {
|
|||||||
access authz.AccessClient
|
access authz.AccessClient
|
||||||
builders *builderCache
|
builders *builderCache
|
||||||
initWorkers int
|
initWorkers int
|
||||||
|
initMinSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -104,6 +105,7 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access authz.A
|
|||||||
search: opts.Backend,
|
search: opts.Backend,
|
||||||
log: slog.Default().With("logger", "resource-search"),
|
log: slog.Default().With("logger", "resource-search"),
|
||||||
initWorkers: opts.WorkerThreads,
|
initWorkers: opts.WorkerThreads,
|
||||||
|
initMinSize: opts.InitMinCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := opts.Resources.GetDocumentBuilders()
|
info, err := opts.Resources.GetDocumentBuilders()
|
||||||
@ -164,40 +166,22 @@ func (s *searchSupport) init(ctx context.Context) error {
|
|||||||
_, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
|
_, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
// TODO, replace namespaces with a query that gets top values
|
|
||||||
namespaces, err := s.storage.Namespaces(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hardcoded for now... should come from the query
|
|
||||||
kinds := []schema.GroupResource{
|
|
||||||
{Group: "dashboard.grafana.app", Resource: "dashboards"},
|
|
||||||
{Group: "playlist.grafana.app", Resource: "playlists"},
|
|
||||||
}
|
|
||||||
|
|
||||||
totalBatchesIndexed := 0
|
totalBatchesIndexed := 0
|
||||||
group := errgroup.Group{}
|
group := errgroup.Group{}
|
||||||
group.SetLimit(s.initWorkers)
|
group.SetLimit(s.initWorkers)
|
||||||
|
|
||||||
// Prepare all the (large) indexes
|
stats, err := s.storage.GetResourceStats(ctx, s.initMinSize)
|
||||||
// TODO, threading and query real information:
|
if err != nil {
|
||||||
// SELECT namespace,"group",resource,COUNT(*),resource_version FROM resource
|
return err
|
||||||
// GROUP BY "group", "resource", "namespace"
|
}
|
||||||
// ORDER BY resource_version desc;
|
|
||||||
for _, ns := range namespaces {
|
for _, info := range stats {
|
||||||
for _, gr := range kinds {
|
group.Go(func() error {
|
||||||
group.Go(func() error {
|
s.log.Debug("initializing search index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource)
|
||||||
s.log.Debug("initializing search index", "namespace", ns, "gr", gr)
|
totalBatchesIndexed++
|
||||||
totalBatchesIndexed++
|
_, _, err = s.build(ctx, info.NamespacedResource, info.Count, info.ResourceVersion)
|
||||||
_, _, err = s.build(ctx, NamespacedResource{
|
return err
|
||||||
Group: gr.Group,
|
})
|
||||||
Resource: gr.Resource,
|
|
||||||
Namespace: ns,
|
|
||||||
}, 10, 0) // TODO, approximate size
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = group.Wait()
|
err = group.Wait()
|
||||||
|
@ -95,7 +95,14 @@ type StorageBackend interface {
|
|||||||
// For HA setups, this will be more events than the local WriteEvent above!
|
// For HA setups, this will be more events than the local WriteEvent above!
|
||||||
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
|
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
|
||||||
|
|
||||||
Namespaces(ctx context.Context) ([]string, error)
|
GetResourceStats(ctx context.Context, minCount int) ([]ResourceStats, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ResourceStats struct {
|
||||||
|
NamespacedResource
|
||||||
|
|
||||||
|
Count int64
|
||||||
|
ResourceVersion int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// This interface is not exposed to end users directly
|
// This interface is not exposed to end users directly
|
||||||
@ -133,6 +140,9 @@ type SearchOptions struct {
|
|||||||
|
|
||||||
// How many threads should build indexes
|
// How many threads should build indexes
|
||||||
WorkerThreads int
|
WorkerThreads int
|
||||||
|
|
||||||
|
// Skip building index on startup for small indexes
|
||||||
|
InitMinCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
type ResourceServerOptions struct {
|
type ResourceServerOptions struct {
|
||||||
|
@ -121,6 +121,38 @@ func (b *backend) Stop(_ context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetResourceStats implements Backend.
|
||||||
|
func (b *backend) GetResourceStats(ctx context.Context, minCount int) ([]resource.ResourceStats, error) {
|
||||||
|
_, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
req := &sqlStatsRequest{
|
||||||
|
SQLTemplate: sqltemplate.New(b.dialect),
|
||||||
|
MinCount: minCount, // not used in query... yet?
|
||||||
|
}
|
||||||
|
|
||||||
|
res := make([]resource.ResourceStats, 0, 100)
|
||||||
|
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
|
||||||
|
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceStats, req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
row := resource.ResourceStats{}
|
||||||
|
err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.Count, &row.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if row.Count > int64(minCount) {
|
||||||
|
res = append(res, row)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
|
func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
|
||||||
_, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent")
|
_, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
@ -137,35 +169,6 @@ func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (in
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Namespaces returns the list of unique namespaces in storage.
|
|
||||||
func (b *backend) Namespaces(ctx context.Context) ([]string, error) {
|
|
||||||
var namespaces []string
|
|
||||||
|
|
||||||
err := b.db.WithTx(ctx, RepeatableRead, func(ctx context.Context, tx db.Tx) error {
|
|
||||||
rows, err := tx.QueryContext(ctx, "SELECT DISTINCT(namespace) FROM resource ORDER BY namespace;")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
_ = rows.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
var ns string
|
|
||||||
err = rows.Scan(&ns)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
namespaces = append(namespaces, ns)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return namespaces, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
|
func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
|
||||||
ctx, span := b.tracer.Start(ctx, tracePrefix+"Create")
|
ctx, span := b.tracer.Start(ctx, tracePrefix+"Create")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
12
pkg/storage/unified/sql/data/resource_stats.sql
Normal file
12
pkg/storage/unified/sql/data/resource_stats.sql
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
SELECT
|
||||||
|
{{ .Ident "namespace" }},
|
||||||
|
{{ .Ident "group" }},
|
||||||
|
{{ .Ident "resource" }},
|
||||||
|
COUNT(*),
|
||||||
|
MAX({{ .Ident "resource_version" }})
|
||||||
|
FROM {{ .Ident "resource" }}
|
||||||
|
GROUP BY
|
||||||
|
{{ .Ident "namespace" }},
|
||||||
|
{{ .Ident "group" }},
|
||||||
|
{{ .Ident "resource" }}
|
||||||
|
;
|
@ -31,6 +31,7 @@ var (
|
|||||||
sqlResourceInsert = mustTemplate("resource_insert.sql")
|
sqlResourceInsert = mustTemplate("resource_insert.sql")
|
||||||
sqlResourceUpdate = mustTemplate("resource_update.sql")
|
sqlResourceUpdate = mustTemplate("resource_update.sql")
|
||||||
sqlResourceRead = mustTemplate("resource_read.sql")
|
sqlResourceRead = mustTemplate("resource_read.sql")
|
||||||
|
sqlResourceStats = mustTemplate("resource_stats.sql")
|
||||||
sqlResourceList = mustTemplate("resource_list.sql")
|
sqlResourceList = mustTemplate("resource_list.sql")
|
||||||
sqlResourceHistoryList = mustTemplate("resource_history_list.sql")
|
sqlResourceHistoryList = mustTemplate("resource_history_list.sql")
|
||||||
sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql")
|
sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql")
|
||||||
@ -71,6 +72,15 @@ func (r sqlResourceRequest) Validate() error {
|
|||||||
return nil // TODO
|
return nil // TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sqlStatsRequest struct {
|
||||||
|
sqltemplate.SQLTemplate
|
||||||
|
MinCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r sqlStatsRequest) Validate() error {
|
||||||
|
return nil // TODO
|
||||||
|
}
|
||||||
|
|
||||||
type historyPollResponse struct {
|
type historyPollResponse struct {
|
||||||
Key resource.ResourceKey
|
Key resource.ResourceKey
|
||||||
ResourceVersion int64
|
ResourceVersion int64
|
||||||
|
@ -219,5 +219,15 @@ func TestUnifiedStorageQueries(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
sqlResourceStats: {
|
||||||
|
{
|
||||||
|
Name: "query",
|
||||||
|
Data: &sqlStatsRequest{
|
||||||
|
SQLTemplate: mocks.NewTestingSQLTemplate(),
|
||||||
|
MinCount: 10, // Not yet used in query (only response filter)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}})
|
}})
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,7 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg,
|
|||||||
}, tracer, reg),
|
}, tracer, reg),
|
||||||
Resources: docs,
|
Resources: docs,
|
||||||
WorkerThreads: 5, // from cfg?
|
WorkerThreads: 5, // from cfg?
|
||||||
|
InitMinCount: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,7 +13,6 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/authlib/claims"
|
"github.com/grafana/authlib/claims"
|
||||||
"github.com/grafana/dskit/services"
|
"github.com/grafana/dskit/services"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||||
@ -98,6 +97,12 @@ func TestIntegrationBackendHappyPath(t *testing.T) {
|
|||||||
rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
|
rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Greater(t, rv3, rv2)
|
require.Greater(t, rv3, rv2)
|
||||||
|
|
||||||
|
stats, err := backend.GetResourceStats(ctx, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, stats, 1)
|
||||||
|
require.Equal(t, int64(3), stats[0].Count)
|
||||||
|
require.Equal(t, rv3, stats[0].ResourceVersion)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Update item2", func(t *testing.T) {
|
t.Run("Update item2", func(t *testing.T) {
|
||||||
|
12
pkg/storage/unified/sql/testdata/mysql--resource_stats-query.sql
vendored
Executable file
12
pkg/storage/unified/sql/testdata/mysql--resource_stats-query.sql
vendored
Executable file
@ -0,0 +1,12 @@
|
|||||||
|
SELECT
|
||||||
|
`namespace`,
|
||||||
|
`group`,
|
||||||
|
`resource`,
|
||||||
|
COUNT(*),
|
||||||
|
MAX(`resource_version`)
|
||||||
|
FROM `resource`
|
||||||
|
GROUP BY
|
||||||
|
`namespace`,
|
||||||
|
`group`,
|
||||||
|
`resource`
|
||||||
|
;
|
12
pkg/storage/unified/sql/testdata/postgres--resource_stats-query.sql
vendored
Executable file
12
pkg/storage/unified/sql/testdata/postgres--resource_stats-query.sql
vendored
Executable file
@ -0,0 +1,12 @@
|
|||||||
|
SELECT
|
||||||
|
"namespace",
|
||||||
|
"group",
|
||||||
|
"resource",
|
||||||
|
COUNT(*),
|
||||||
|
MAX("resource_version")
|
||||||
|
FROM "resource"
|
||||||
|
GROUP BY
|
||||||
|
"namespace",
|
||||||
|
"group",
|
||||||
|
"resource"
|
||||||
|
;
|
12
pkg/storage/unified/sql/testdata/sqlite--resource_stats-query.sql
vendored
Executable file
12
pkg/storage/unified/sql/testdata/sqlite--resource_stats-query.sql
vendored
Executable file
@ -0,0 +1,12 @@
|
|||||||
|
SELECT
|
||||||
|
"namespace",
|
||||||
|
"group",
|
||||||
|
"resource",
|
||||||
|
COUNT(*),
|
||||||
|
MAX("resource_version")
|
||||||
|
FROM "resource"
|
||||||
|
GROUP BY
|
||||||
|
"namespace",
|
||||||
|
"group",
|
||||||
|
"resource"
|
||||||
|
;
|
Loading…
Reference in New Issue
Block a user