Unified Storage Indexer: Build tenant indexes concurrently (#95795)

* WIP - build tenant indexes concurrently

* adds local dev seeders and readme for indexer

* update logging and adds locking in getShard()

* update logs

* Adds Namespaces func. Initializes index after ResourceServer is created.

* fixes Count() and updates test lint issues

* Initialize index separately. Don't do it when creating the resource server. Makes testing really awkward.

* fix lint error

* handles error when getting namespaces

* updates test and index helper funcs
This commit is contained in:
owensmallwood 2024-11-06 12:58:07 -06:00 committed by GitHub
parent 3877537dc0
commit b6596db75e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 373 additions and 70 deletions

View File

@ -131,6 +131,10 @@ type rowsWrapper struct {
err error
}
// Namespaces is a placeholder that satisfies the storage backend interface.
// This legacy dashboard accessor cannot enumerate namespaces, so it always
// returns a nil slice together with a "not implemented" error.
func (a *dashboardSqlAccess) Namespaces(ctx context.Context) ([]string, error) {
	notImplemented := fmt.Errorf("not implemented")
	return nil, notImplemented
}
func (r *rowsWrapper) Close() error {
if r.rows == nil {
return nil

View File

@ -108,6 +108,10 @@ func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string {
return buffer.String()
}
// Namespaces is a stub: the CDK blob backend has no namespace listing yet.
// Callers always receive a nil slice and a "not implemented" error.
func (s *cdkBackend) Namespaces(ctx context.Context) ([]string, error) {
	var none []string
	return none, fmt.Errorf("not implemented")
}
func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) {
// Scope the lock
{

View File

@ -4,6 +4,7 @@ import (
"context"
golog "log"
"path/filepath"
"sync"
"time"
"github.com/blevesearch/bleve/v2"
@ -31,20 +32,22 @@ type Opts struct {
}
type Index struct {
shards map[string]Shard
opts Opts
s *server
log log.Logger
tracer tracing.Tracer
shardMutex sync.RWMutex
shards map[string]*Shard
opts Opts
s *server
log log.Logger
tracer tracing.Tracer
}
func NewIndex(s *server, opts Opts, tracer tracing.Tracer) *Index {
idx := &Index{
s: s,
opts: opts,
shards: make(map[string]Shard),
log: log.New("unifiedstorage.search.index"),
tracer: tracer,
shardMutex: sync.RWMutex{},
s: s,
opts: opts,
shards: make(map[string]*Shard),
log: log.New("unifiedstorage.search.index"),
tracer: tracer,
}
return idx
@ -128,32 +131,86 @@ func (i *Index) AddToBatches(ctx context.Context, list *ListResponse) ([]string,
}
// Init builds the search index for every tenant (namespace) the storage
// backend currently reports.
//
// Tenants are indexed concurrently through InitForTenant, bounded by
// opts.Workers. Once every tenant has been processed, the batches that are
// still pending are flushed with IndexBatches. The elapsed time is logged and,
// when IndexServerMetrics is set, observed on IndexCreationTime.
//
// The first error returned by Namespaces, by any InitForTenant call, or by
// IndexBatches is returned to the caller.
func (i *Index) Init(ctx context.Context) error {
	logger := i.log.FromContext(ctx)
	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Init")
	defer span.End()

	// Keep the full time.Time so the reported duration has sub-second
	// resolution instead of being truncated to whole seconds.
	start := time.Now()

	// errgroup.SetLimit(0) prevents any goroutine from being added, which
	// would make every group.Go call below block forever. Fall back to a
	// single worker when no worker count was configured. Negative values keep
	// their errgroup meaning of "no limit".
	workers := i.opts.Workers
	if workers == 0 {
		workers = 1
	}
	group := errgroup.Group{}
	group.SetLimit(workers)

	// totalObjects is updated from several goroutines at once, so it must be
	// guarded; an unsynchronised += is a data race.
	var (
		totalMu      sync.Mutex
		totalObjects int
	)

	// Get all tenants currently in Unified Storage
	tenants, err := i.s.backend.Namespaces(ctx)
	if err != nil {
		return err
	}

	for _, tenant := range tenants {
		group.Go(func() error {
			logger.Info("initializing index for tenant", "tenant", tenant)
			objs, err := i.InitForTenant(ctx, tenant)
			if err != nil {
				return err
			}
			totalMu.Lock()
			totalObjects += objs
			totalMu.Unlock()
			return nil
		})
	}

	// Wait also orders all goroutine writes before the reads below, so
	// totalObjects can be read without the mutex from here on.
	if err := group.Wait(); err != nil {
		return err
	}

	//index all remaining batches for all tenants
	logger.Info("indexing remaining batches", "shards", len(i.shards))
	if err := i.IndexBatches(ctx, 1, i.allTenants()); err != nil {
		return err
	}

	elapsed := time.Since(start).Seconds()
	totalDocCount := getTotalDocCount(i)
	logger.Info("Initial indexing finished", "seconds", elapsed, "objs_fetched", totalObjects, "objs_indexed", totalDocCount)
	span.AddEvent(
		"indexing finished",
		trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalDocCount))),
		trace.WithAttributes(attribute.Int64("objects_fetched", int64(totalObjects))),
	)
	if IndexServerMetrics != nil {
		IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(elapsed)
	}
	return nil
}
func (i *Index) InitForTenant(ctx context.Context, namespace string) (int, error) {
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"InitForTenant")
defer span.End()
logger := i.log.FromContext(ctx)
start := time.Now().Unix()
resourceTypes := fetchResourceTypes()
totalObjectsFetched := 0
for _, rt := range resourceTypes {
logger.Info("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers)
logger.Debug("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers, "namespace", namespace)
r := &ListRequest{Options: rt, Limit: int64(i.opts.ListLimit)}
r.Options.Key.Namespace = namespace // scope the list to a tenant or this will take forever when US has 1M+ resources
// Paginate through the list of resources and index each page
for {
logger.Info("fetching resource list", "kind", rt.Key.Resource)
logger.Debug("fetching resource list", "kind", rt.Key.Resource, "namespace", namespace)
list, err := i.s.List(ctx, r)
if err != nil {
return err
return totalObjectsFetched, err
}
totalObjectsFetched += len(list.Items)
logger.Info("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items))
logger.Debug("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items), "namespace", namespace)
//add changes to batches for shards with changes in the List
err = i.writeBatch(ctx, list)
if err != nil {
return err
return totalObjectsFetched, err
}
if list.NextPageToken == "" {
@ -164,21 +221,13 @@ func (i *Index) Init(ctx context.Context) error {
}
}
//index all remaining batches
logger.Info("indexing remaining batches", "shards", len(i.shards))
err := i.IndexBatches(ctx, 1, i.allTenants())
if err != nil {
return err
}
span.AddEvent(
"indexing finished for tenant",
trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))),
trace.WithAttributes(attribute.String("tenant", namespace)),
)
span.AddEvent("indexing finished", trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))))
end := time.Now().Unix()
logger.Info("Initial indexing finished", "seconds", float64(end-start))
if IndexServerMetrics != nil {
IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
}
return nil
return totalObjectsFetched, nil
}
func (i *Index) writeBatch(ctx context.Context, list *ListResponse) error {
@ -207,6 +256,17 @@ func (i *Index) Index(ctx context.Context, data *Data) error {
}
tenant := res.Namespace
logger.Debug("indexing resource for tenant", "res", string(data.Value.Value), "tenant", tenant)
// if tenant doesn't exist, they may have been created during initial indexing
_, ok := i.shards[tenant]
if !ok {
i.log.Info("tenant not found, initializing their index", "tenant", tenant)
_, err = i.InitForTenant(ctx, tenant)
if err != nil {
return err
}
}
shard, err := i.getShard(tenant)
if err != nil {
return err
@ -320,18 +380,20 @@ func (i *Index) Search(ctx context.Context, request *SearchRequest) (*IndexResul
return &IndexResults{Values: results, Groups: groups}, nil
}
func (i *Index) Count() (uint64, error) {
var total uint64
// Count returns the total doc count
func (i *Index) Count() (int, error) {
total := 0
for _, shard := range i.shards {
count, err := shard.index.DocCount()
if err != nil {
i.log.Error("failed to get doc count", "error", err)
}
total += count
total += int(count)
}
return total, nil
}
// allTenants returns a list of all tenants in the index
func (i *Index) allTenants() []string {
tenants := make([]string, 0, len(i.shards))
for tenant := range i.shards {
@ -340,7 +402,10 @@ func (i *Index) allTenants() []string {
return tenants
}
func (i *Index) getShard(tenant string) (Shard, error) {
func (i *Index) getShard(tenant string) (*Shard, error) {
i.shardMutex.Lock()
defer i.shardMutex.Unlock()
shard, ok := i.shards[tenant]
if ok {
return shard, nil
@ -348,16 +413,16 @@ func (i *Index) getShard(tenant string) (Shard, error) {
index, path, err := i.createIndex()
if err != nil {
return Shard{}, err
return &Shard{}, err
}
shard = Shard{
shard = &Shard{
index: index,
path: path,
batch: index.NewBatch(),
}
// TODO: do we need to lock this?
i.shards[tenant] = shard
return shard, nil
}
@ -389,17 +454,19 @@ func createInMemoryIndex() (bleve.Index, string, error) {
// TODO - fetch from api
func fetchResourceTypes() []*ListOptions {
items := []*ListOptions{}
items = append(items, &ListOptions{
Key: &ResourceKey{
Group: "playlist.grafana.app",
Resource: "playlists",
items = append(items,
&ListOptions{
Key: &ResourceKey{
Group: "playlist.grafana.app",
Resource: "playlists",
},
},
}, &ListOptions{
Key: &ResourceKey{
Group: "folder.grafana.app",
Resource: "folders",
&ListOptions{
Key: &ResourceKey{
Group: "folder.grafana.app",
Resource: "folders",
},
},
},
&ListOptions{
Key: &ResourceKey{
Group: "dashboard.grafana.app",

View File

@ -87,23 +87,23 @@ func (s *IndexMetrics) Describe(ch chan<- *prometheus.Desc) {
s.IndexLatency.Describe(ch)
}
// getTotalDocCount returns the total number of documents in the index
func getTotalDocCount(index *Index) float64 {
var totalCount float64
totalCount = 0
if index == nil {
return totalCount
}
for _, shard := range index.shards {
docCount, err := shard.index.DocCount()
if err != nil {
continue
}
totalCount += float64(docCount)
count, _ := shard.index.DocCount()
totalCount += float64(count)
}
return totalCount
}
// getTotalIndexSize returns the total size of the index directory when using a file-based index
func getTotalIndexSize(dir string) (int64, error) {
var totalSize int64

View File

@ -29,6 +29,7 @@ func TestIndexDashboard(t *testing.T) {
require.NoError(t, err)
assertCountEquals(t, index, 1)
require.Equal(t, 1, len(index.allTenants()))
assertSearchCountEquals(t, index, "*", nil, 1)
}
@ -83,7 +84,7 @@ func TestLookupNames(t *testing.T) {
err := index.writeBatch(testContext, list)
require.NoError(t, err)
assertCountEquals(t, index, uint64(records))
assertCountEquals(t, index, records)
query := ""
chunk := ids[:100] // query for n folders by id
for _, id := range chunk {
@ -185,7 +186,7 @@ func newTestIndex(t *testing.T, batchSize int) *Index {
return &Index{
tracer: trace,
shards: make(map[string]Shard),
shards: make(map[string]*Shard),
log: log.New("unifiedstorage.search.index"),
opts: Opts{
ListLimit: 5000,
@ -195,7 +196,7 @@ func newTestIndex(t *testing.T, batchSize int) *Index {
}
}
func assertCountEquals(t *testing.T, index *Index, expected uint64) {
func assertCountEquals(t *testing.T, index *Index, expected int) {
total, err := index.Count()
require.NoError(t, err)
assert.Equal(t, expected, total)

View File

@ -75,6 +75,8 @@ type StorageBackend interface {
// Get all events from the store
// For HA setups, this will be more events than the local WriteEvent above!
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
Namespaces(ctx context.Context) ([]string, error)
}
// This interface is not exposed to end users directly
@ -182,7 +184,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
UserID: 1,
IsGrafanaAdmin: true,
}))
return &server{
s := &server{
tracer: opts.Tracer,
log: logger,
backend: opts.Backend,
@ -194,7 +196,9 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
now: opts.Now,
ctx: ctx,
cancel: cancel,
}, nil
}
return s, nil
}
var _ ResourceServer = &server{}
@ -765,6 +769,10 @@ func (s *server) Origin(ctx context.Context, req *OriginRequest) (*OriginRespons
// Index returns the search index. If the index is not initialized, it will be initialized.
func (s *server) Index(ctx context.Context) (*Index, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
index := s.index.(*IndexServer)
if index.index == nil {
err := index.Init(ctx, s)

View File

@ -137,6 +137,31 @@ func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (in
}
}
// Namespaces returns the list of unique namespaces in storage, sorted by
// namespace name.
//
// The query runs inside a RepeatableRead transaction. If the query, a row
// scan, the row iteration, or closing the result set fails, the error is
// returned and the namespace list is nil, so callers never see a partial
// result.
func (b *backend) Namespaces(ctx context.Context) ([]string, error) {
	var namespaces []string

	err := b.db.WithTx(ctx, RepeatableRead, func(ctx context.Context, tx db.Tx) error {
		rows, err := tx.QueryContext(ctx, "SELECT DISTINCT(namespace) FROM resource ORDER BY namespace;")
		if err != nil {
			return err
		}
		// Guarantee the result set is released on every early return (for
		// example a Scan failure). The explicit Close at the end reports the
		// close error on the success path; this deferred second call then
		// only acts as a safety net.
		defer func() { _ = rows.Close() }()

		for rows.Next() {
			var ns string
			if err := rows.Scan(&ns); err != nil {
				return err
			}
			namespaces = append(namespaces, ns)
		}
		// Next returns false both at end-of-data and on failure; Err tells
		// the two apart so a truncated list is not mistaken for success.
		if err := rows.Err(); err != nil {
			return err
		}
		return rows.Close()
	})
	if err != nil {
		return nil, err
	}
	return namespaces, nil
}
func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Create")
defer span.End()

View File

@ -0,0 +1,5 @@
<h2>Indexer Seeders</h2>
These seeders are used to performance-test the indexer in your local dev environment. They are not used in production.
By default, each seeder creates 1 million resources. You can adjust this number in each script if needed. Creating 1M objects takes about 5-10 minutes.

View File

@ -0,0 +1,81 @@
DROP PROCEDURE IF EXISTS InsertMillionDashboards;
DELIMITER //
CREATE PROCEDURE InsertMillionDashboards()
BEGIN
DECLARE i INT DEFAULT 0;
DECLARE new_guid CHAR(36);
DECLARE unique_name VARCHAR(20);
DECLARE batch_size INT DEFAULT 10;
DECLARE stmt_resource TEXT DEFAULT '';
DECLARE stmt_resource_history TEXT DEFAULT '';
DECLARE random_number INT;
WHILE i < 1000000 DO
-- Generate a unique GUID and unique name
SET new_guid = UUID();
SET unique_name = CONCAT('ad5wkqk', i);
SET @new_uid = CONCAT('dashboard', i);
-- Generate a random number between 1 and 1000
SET random_number = FLOOR(1 + (RAND() * 1000));
SET @stack_namespace = CONCAT('stacks-', random_number); -- Store stack namespace in a variable
-- Append the value part of the SQL insert statement to both resource and history inserts
SET stmt_resource = CONCAT(stmt_resource,
'(', QUOTE(new_guid), ', ', QUOTE('1730396628210501'), ', ', QUOTE('dashboard.grafana.app'), ', ', QUOTE('dashboards'), ', ',
QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ', QUOTE(CONCAT('{\"kind\":\"Dashboard\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"metadata\":{\"name\":\"ad5wkqk\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid, '\",\"creationTimestamp\":\"2024-10-31T17:43:48Z\",\"annotations\":{\"grafana.app/createdBy\":\"user:u000000001\",\"grafana.app/originHash\":\"Grafana v11.4.0-pre (d2d7ae2e86)\",\"grafana.app/originName\":\"UI\",\"grafana.app/originPath\":\"/dashboard/new\"},\"managedFields\":[{\"manager\":\"Mozilla\",\"operation\":\"Update\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"time\":\"2024-10-31T17:43:48Z\",\"fieldsType\":\"FieldsV1\",\"fieldsV1\":{\"f:metadata\":{\"f:annotations\":{\".\":{},\"f:grafana.app/originHash\":{},\"f:grafana.app/originName\":{},\"f:grafana.app/originPath\":{}},\"f:generateName\":{}},\"f:spec\":{\"f:annotations\":{\".\":{},\"f:list\":{}},\"f:description\":{},\"f:editable\":{},\"f:fiscalYearStartMonth\":{},\"f:graphTooltip\":{},\"f:id\":{},\"f:links\":{},\"f:panels\":{},\"f:preload\":{},\"f:schemaVersion\":{},\"f:tags\":{},\"f:templating\":{\".\":{},\"f:list\":{}},\"f:timepicker\":{},\"f:timezone\":{},\"f:title\":{},\"f:uid\":{},\"f:version\":{},\"f:weekStart\":{}}}}]},\"spec\":{\"annotations\":{\"list\":[{\"builtIn\":1,\"datasource\":{\"type\":\"grafana\",\"uid\":\"-- Grafana --\"},\"enable\":true,\"hide\":true,\"iconColor\":\"rgba(0, 211, 255, 1)\",\"name\":\"Annotations \\u0026 
Alerts\",\"type\":\"dashboard\"}]},\"description\":\"\",\"editable\":true,\"fiscalYearStartMonth\":0,\"graphTooltip\":0,\"id\":null,\"links\":[],\"panels\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"fieldConfig\":{\"defaults\":{\"color\":{\"mode\":\"palette-classic\"},\"custom\":{\"axisBorderShow\":false,\"axisCenteredZero\":false,\"axisColorMode\":\"text\",\"axisLabel\":\"\",\"axisPlacement\":\"auto\",\"barAlignment\":0,\"barWidthFactor\":0.6,\"drawStyle\":\"line\",\"fillOpacity\":0,\"gradientMode\":\"none\",\"hideFrom\":{\"legend\":false,\"tooltip\":false,\"viz\":false},\"insertNulls\":false,\"lineInterpolation\":\"linear\",\"lineWidth\":1,\"pointSize\":5,\"scaleDistribution\":{\"type\":\"linear\"},\"showPoints\":\"auto\",\"spanNulls\":false,\"stacking\":{\"group\":\"A\",\"mode\":\"none\"},\"thresholdsStyle\":{\"mode\":\"off\"}},\"mappings\":[],\"thresholds\":{\"mode\":\"absolute\",\"steps\":[{\"color\":\"green\",\"value\":null},{\"color\":\"red\",\"value\":80}]}},\"overrides\":[]},\"gridPos\":{\"h\":8,\"w\":12,\"x\":0,\"y\":0},\"id\":1,\"options\":{\"legend\":{\"calcs\":[],\"displayMode\":\"list\",\"placement\":\"bottom\",\"showLegend\":true},\"tooltip\":{\"mode\":\"single\",\"sort\":\"none\"}},\"pluginVersion\":\"11.4.0-pre\",\"targets\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"refId\":\"A\"}],\"title\":\"Panel Title\",\"type\":\"timeseries\"}],\"preload\":false,\"schemaVersion\":40,\"tags\":[],\"templating\":{\"list\":[]},\"timepicker\":{},\"timezone\":\"browser\",\"title\":\"dashboard1\",\"uid\":\"\",\"version\":0,\"weekStart\":\"\"}}\n')), ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), ');
SET stmt_resource_history = CONCAT(stmt_resource_history,
'(', QUOTE(new_guid), ', ', QUOTE('1730396628210501'), ', ', QUOTE('dashboard.grafana.app'), ', ', QUOTE('dashboards'), ', ',
QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ', QUOTE(CONCAT('{\"kind\":\"Dashboard\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"metadata\":{\"name\":\"ad5wkqk\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid ,'\",\"creationTimestamp\":\"2024-10-31T17:43:48Z\",\"annotations\":{\"grafana.app/createdBy\":\"user:u000000001\",\"grafana.app/originHash\":\"Grafana v11.4.0-pre (d2d7ae2e86)\",\"grafana.app/originName\":\"UI\",\"grafana.app/originPath\":\"/dashboard/new\"},\"managedFields\":[{\"manager\":\"Mozilla\",\"operation\":\"Update\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"time\":\"2024-10-31T17:43:48Z\",\"fieldsType\":\"FieldsV1\",\"fieldsV1\":{\"f:metadata\":{\"f:annotations\":{\".\":{},\"f:grafana.app/originHash\":{},\"f:grafana.app/originName\":{},\"f:grafana.app/originPath\":{}},\"f:generateName\":{}},\"f:spec\":{\"f:annotations\":{\".\":{},\"f:list\":{}},\"f:description\":{},\"f:editable\":{},\"f:fiscalYearStartMonth\":{},\"f:graphTooltip\":{},\"f:id\":{},\"f:links\":{},\"f:panels\":{},\"f:preload\":{},\"f:schemaVersion\":{},\"f:tags\":{},\"f:templating\":{\".\":{},\"f:list\":{}},\"f:timepicker\":{},\"f:timezone\":{},\"f:title\":{},\"f:uid\":{},\"f:version\":{},\"f:weekStart\":{}}}}]},\"spec\":{\"annotations\":{\"list\":[{\"builtIn\":1,\"datasource\":{\"type\":\"grafana\",\"uid\":\"-- Grafana --\"},\"enable\":true,\"hide\":true,\"iconColor\":\"rgba(0, 211, 255, 1)\",\"name\":\"Annotations \\u0026 
Alerts\",\"type\":\"dashboard\"}]},\"description\":\"\",\"editable\":true,\"fiscalYearStartMonth\":0,\"graphTooltip\":0,\"id\":null,\"links\":[],\"panels\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"fieldConfig\":{\"defaults\":{\"color\":{\"mode\":\"palette-classic\"},\"custom\":{\"axisBorderShow\":false,\"axisCenteredZero\":false,\"axisColorMode\":\"text\",\"axisLabel\":\"\",\"axisPlacement\":\"auto\",\"barAlignment\":0,\"barWidthFactor\":0.6,\"drawStyle\":\"line\",\"fillOpacity\":0,\"gradientMode\":\"none\",\"hideFrom\":{\"legend\":false,\"tooltip\":false,\"viz\":false},\"insertNulls\":false,\"lineInterpolation\":\"linear\",\"lineWidth\":1,\"pointSize\":5,\"scaleDistribution\":{\"type\":\"linear\"},\"showPoints\":\"auto\",\"spanNulls\":false,\"stacking\":{\"group\":\"A\",\"mode\":\"none\"},\"thresholdsStyle\":{\"mode\":\"off\"}},\"mappings\":[],\"thresholds\":{\"mode\":\"absolute\",\"steps\":[{\"color\":\"green\",\"value\":null},{\"color\":\"red\",\"value\":80}]}},\"overrides\":[]},\"gridPos\":{\"h\":8,\"w\":12,\"x\":0,\"y\":0},\"id\":1,\"options\":{\"legend\":{\"calcs\":[],\"displayMode\":\"list\",\"placement\":\"bottom\",\"showLegend\":true},\"tooltip\":{\"mode\":\"single\",\"sort\":\"none\"}},\"pluginVersion\":\"11.4.0-pre\",\"targets\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"refId\":\"A\"}],\"title\":\"Panel Title\",\"type\":\"timeseries\"}],\"preload\":false,\"schemaVersion\":40,\"tags\":[],\"templating\":{\"list\":[]},\"timepicker\":{},\"timezone\":\"browser\",\"title\":\"dashboard1\",\"uid\":\"\",\"version\":0,\"weekStart\":\"\"}}\n')), ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), ');
SET i = i + 1;
-- Execute statements in batches to avoid reaching the TEXT limit
IF i % batch_size = 0 THEN
-- Remove the last comma and space
SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2);
SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2);
-- Insert current batch into `resource`
SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource);
PREPARE stmt_resource_stmt FROM @stmt_resource;
EXECUTE stmt_resource_stmt;
DEALLOCATE PREPARE stmt_resource_stmt;
-- Insert current batch into `resource_history`
SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history);
PREPARE stmt_resource_history_stmt FROM @stmt_resource_history;
EXECUTE stmt_resource_history_stmt;
DEALLOCATE PREPARE stmt_resource_history_stmt;
-- Reset the batch for the next iteration
SET stmt_resource = '';
SET stmt_resource_history = '';
END IF;
END WHILE;
-- Insert any remaining records if they don't fill a full batch
IF stmt_resource != '' THEN
SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2);
SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2);
SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource);
PREPARE stmt_resource_stmt FROM @stmt_resource;
EXECUTE stmt_resource_stmt;
DEALLOCATE PREPARE stmt_resource_stmt;
SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history);
PREPARE stmt_resource_history_stmt FROM @stmt_resource_history;
EXECUTE stmt_resource_history_stmt;
DEALLOCATE PREPARE stmt_resource_history_stmt;
END IF;
END //
DELIMITER ;
call InsertMillionDashboards();
insert into resource_version values ('dashboard.grafana.app', 'dashboards', 1730396628210501) ON DUPLICATE KEY UPDATE resource_version = 1730396628210501;

View File

@ -0,0 +1,104 @@
DROP PROCEDURE IF EXISTS InsertMillionPlaylists;
DELIMITER //
CREATE PROCEDURE InsertMillionPlaylists()
BEGIN
DECLARE i INT DEFAULT 0;
DECLARE new_guid CHAR(36);
DECLARE unique_name VARCHAR(20);
DECLARE batch_size INT DEFAULT 25;
DECLARE stmt_resource TEXT DEFAULT '';
DECLARE stmt_resource_history TEXT DEFAULT '';
DECLARE random_number INT;
WHILE i < 1000000 DO
-- Generate a unique GUID and unique name
SET new_guid = UUID();
SET unique_name = CONCAT('playlist', i);
SET @new_uid = CONCAT('playlist', i);
-- Generate a random number between 1 and 1000
SET random_number = FLOOR(1 + (RAND() * 1000));
SET @stack_namespace = CONCAT('stacks-', random_number); -- Store stack namespace in a variable
-- Append the value part of the SQL insert statement to both resource and history inserts
SET stmt_resource = CONCAT(stmt_resource,
'(', QUOTE(new_guid), ', ', QUOTE('1729715497301945'), ', ', QUOTE('playlist.grafana.app'), ', ', QUOTE('playlists'), ', ',
QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ',
QUOTE(CONCAT(
'{\"kind\":\"Playlist\",\"apiVersion\":\"playlist.grafana.app/v0alpha1\",\"metadata\":{',
'\"name\":\"', unique_name, '\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid, '\",',
'\"resourceVersion\":\"1729715497301945\",\"creationTimestamp\":\"2024-10-05T02:17:49Z\",',
'\"annotations\":{\"grafana.app/createdBy\":\"user:u000000002\",\"grafana.app/originName\":\"SQL\",',
'\"grafana.app/originPath\":\"10182\",\"grafana.app/originTimestamp\":\"2024-10-05T02:17:49Z\",',
'\"grafana.app/updatedBy\":\"service-account:\",\"grafana.app/updatedTimestamp\":\"2024-10-23T21:00:21Z\"}},',
'\"spec\":{\"interval\":\"5m\",\"items\":[{\"type\":\"dashboard_by_uid\",\"value\":\"a6232629-98b3-42fa-91a4-579a43fbcda0\"},',
'{\"type\":\"dashboard_by_tag\",\"value\":\"tag1\"},{\"type\":\"dashboard_by_tag\",\"value\":\"tag2\"}],',
'\"title\":\"k6 test playlist create cp3f14j11tthck1\"},\"status\":{}}'
)),
', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), ');
SET stmt_resource_history = CONCAT(stmt_resource_history,
'(', QUOTE(new_guid), ', ', QUOTE('1729715497301945'), ', ', QUOTE('playlist.grafana.app'), ', ', QUOTE('playlists'), ', ',
QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ',
QUOTE(CONCAT(
'{\"kind\":\"Playlist\",\"apiVersion\":\"playlist.grafana.app/v0alpha1\",\"metadata\":{',
'\"name\":\"', unique_name, '\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid, '\",',
'\"resourceVersion\":\"1729715497301945\",\"creationTimestamp\":\"2024-10-05T02:17:49Z\",',
'\"annotations\":{\"grafana.app/createdBy\":\"user:u000000002\",\"grafana.app/originName\":\"SQL\",',
'\"grafana.app/originPath\":\"10182\",\"grafana.app/originTimestamp\":\"2024-10-05T02:17:49Z\",',
'\"grafana.app/updatedBy\":\"service-account:\",\"grafana.app/updatedTimestamp\":\"2024-10-23T21:00:21Z\"}},',
'\"spec\":{\"interval\":\"5m\",\"items\":[{\"type\":\"dashboard_by_uid\",\"value\":\"a6232629-98b3-42fa-91a4-579a43fbcda0\"},',
'{\"type\":\"dashboard_by_tag\",\"value\":\"tag1\"},{\"type\":\"dashboard_by_tag\",\"value\":\"tag2\"}],',
'\"title\":\"k6 test playlist create cp3f14j11tthck1\"},\"status\":{}}'
)), ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), ');
SET i = i + 1;
-- Execute statements in batches to avoid reaching the TEXT limit
IF i % batch_size = 0 THEN
-- Remove the last comma and space
SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2);
SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2);
-- Insert current batch into `resource`
SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource);
PREPARE stmt_resource_stmt FROM @stmt_resource;
EXECUTE stmt_resource_stmt;
DEALLOCATE PREPARE stmt_resource_stmt;
-- Insert current batch into `resource_history`
SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history);
PREPARE stmt_resource_history_stmt FROM @stmt_resource_history;
EXECUTE stmt_resource_history_stmt;
DEALLOCATE PREPARE stmt_resource_history_stmt;
-- Reset the batch for the next iteration
SET stmt_resource = '';
SET stmt_resource_history = '';
END IF;
END WHILE;
-- Insert any remaining records if they don't fill a full batch
IF stmt_resource != '' THEN
SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2);
SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2);
SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource);
PREPARE stmt_resource_stmt FROM @stmt_resource;
EXECUTE stmt_resource_stmt;
DEALLOCATE PREPARE stmt_resource_stmt;
SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history);
PREPARE stmt_resource_history_stmt FROM @stmt_resource_history;
EXECUTE stmt_resource_history_stmt;
DEALLOCATE PREPARE stmt_resource_history_stmt;
END IF;
END //
DELIMITER ;
call InsertMillionPlaylists();
insert into resource_version values ('playlist.grafana.app', 'playlists', 1729715497301945) ON DUPLICATE KEY UPDATE resource_version = 1729715497301945;

View File

@ -55,6 +55,9 @@ var (
Isolation: sql.LevelReadCommitted,
ReadOnly: true,
}
RepeatableRead = &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
}
)
type sqlResourceRequest struct {

View File

@ -2,7 +2,6 @@ package sql
import (
"context"
"errors"
"os"
"strings"
@ -51,17 +50,6 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
opts.Index = resource.NewResourceIndexServer(cfg, tracer)
server, err := resource.NewResourceServer(opts)
if err != nil {
return nil, err
}
// initialze the search index
indexer, ok := server.(resource.ResourceIndexer)
if !ok {
return nil, errors.New("index server does not implement ResourceIndexer")
}
_, err = indexer.Index(ctx)
return server, err
}
if features.IsEnabledGlobally(featuremgmt.FlagKubernetesFolders) {
@ -75,5 +63,18 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea
}
}
return resource.NewResourceServer(opts)
rs, err := resource.NewResourceServer(opts)
if err != nil {
return nil, err
}
// Initialize the indexer if one is configured
if opts.Index != nil {
_, err = rs.(resource.ResourceIndexer).Index(ctx)
if err != nil {
return nil, err
}
}
return rs, nil
}

View File

@ -77,7 +77,7 @@ func TestIntegrationIndexerSearch(t *testing.T) {
addResource(t, ctx, backend, "playlists", playlist1)
addResource(t, ctx, backend, "playlists", playlist2)
// initialze and build the search index
// initialize and build the search index
indexer, ok := server.(resource.ResourceIndexer)
if !ok {
t.Fatal("server does not implement ResourceIndexer")