diff --git a/pkg/plugins/manager/manager_integration_test.go b/pkg/plugins/manager/manager_integration_test.go index f2d0a7915f2..c14ae9036f5 100644 --- a/pkg/plugins/manager/manager_integration_test.go +++ b/pkg/plugins/manager/manager_integration_test.go @@ -103,7 +103,7 @@ func TestIntegrationPluginManager_Run(t *testing.T) { pg := postgres.ProvideService(cfg) my := mysql.ProvideService(cfg, hcp) ms := mssql.ProvideService(cfg) - sv2 := searchV2.ProvideService(cfg, sqlstore.InitTestDB(t), nil, nil, nil) + sv2 := searchV2.ProvideService(cfg, sqlstore.InitTestDB(t), nil, nil, tracing.InitializeTracerForTest(), featuremgmt.WithFeatures(), nil) graf := grafanads.ProvideService(cfg, sv2, nil) coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf) diff --git a/pkg/services/searchV2/allowed_actions_test.go b/pkg/services/searchV2/allowed_actions_test.go index aff80b31ac6..b65b83f8d76 100644 --- a/pkg/services/searchV2/allowed_actions_test.go +++ b/pkg/services/searchV2/allowed_actions_test.go @@ -9,10 +9,12 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/experimental" + "github.com/grafana/grafana/pkg/infra/tracing" ac "github.com/grafana/grafana/pkg/services/accesscontrol" accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/user" "github.com/stretchr/testify/require" ) @@ -80,7 +82,7 @@ var ( ) func service(t *testing.T) *StandardSearchService { - service, ok := ProvideService(nil, nil, nil, accesscontrolmock.New(), nil).(*StandardSearchService) + service, ok := ProvideService(nil, nil, nil, accesscontrolmock.New(), tracing.InitializeTracerForTest(), featuremgmt.WithFeatures(), nil).(*StandardSearchService) require.True(t, ok) return service } diff --git a/pkg/services/searchV2/index.go b/pkg/services/searchV2/index.go index 4126956d9e1..fa91635b0c2 100644 --- a/pkg/services/searchV2/index.go +++ b/pkg/services/searchV2/index.go @@ -14,10 +14,13 @@ import ( "time" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/searchV2/dslookup" "github.com/grafana/grafana/pkg/services/searchV2/extract" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/store" + "go.opentelemetry.io/otel/attribute" "github.com/blugelabs/bluge" ) @@ -93,9 +96,11 @@ type searchIndex struct { extender DocumentExtender folderIdLookup folderUIDLookup syncCh chan chan struct{} + tracer tracing.Tracer + features featuremgmt.FeatureToggles } -func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *searchIndex { +func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup, tracer tracing.Tracer, features featuremgmt.FeatureToggles) *searchIndex { return &searchIndex{ loader: dashLoader, eventStore: evStore, @@ -106,6 +111,8 @@ func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender Doc extender: extender, folderIdLookup: folderIDs, syncCh: make(chan chan struct{}), + tracer: tracer, + features: features, } } @@ -209,17 +216,22 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c close(doneCh) case <-partialUpdateTimer.C: // Periodically apply updates collected in entity events table. - lastEventID = i.applyIndexUpdates(ctx, lastEventID) + partialIndexUpdateCtx, span := i.tracer.Start(ctx, "searchV2 partial update timer") + lastEventID = i.applyIndexUpdates(partialIndexUpdateCtx, lastEventID) + span.End() partialUpdateTimer.Reset(partialUpdateInterval) case <-reIndexSignalCh: // External systems may trigger re-indexing, at this moment provisioning does this. i.logger.Info("Full re-indexing due to external signal") fullReIndexTimer.Reset(0) case signal := <-i.buildSignals: + buildSignalCtx, span := i.tracer.Start(ctx, "searchV2 build signal") + // When search read request meets new not-indexed org we build index for it. i.mu.RLock() _, ok := i.perOrgIndex[signal.orgID] if ok { + span.End() // Index for org already exists, do nothing. i.mu.RUnlock() close(signal.done) @@ -232,14 +244,17 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c // branch. fullReIndexTimer.Stop() go func() { + defer span.End() // We need semaphore here since asynchronous re-indexing may be in progress already. asyncReIndexSemaphore <- struct{}{} defer func() { <-asyncReIndexSemaphore }() - _, err = i.buildOrgIndex(ctx, signal.orgID) + _, err = i.buildOrgIndex(buildSignalCtx, signal.orgID) signal.done <- err reIndexDoneCh <- lastIndexedEventID }() case <-fullReIndexTimer.C: + fullReindexCtx, span := i.tracer.Start(ctx, "searchV2 full reindex timer") + // Periodically rebuild indexes since we could miss updates. At this moment we are issuing // entity events non-atomically (outside of transaction) and do not cover all possible dashboard // change places, so periodic re-indexing fixes possibly broken state. But ideally we should @@ -247,6 +262,7 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c // is to use DB triggers, see https://github.com/grafana/grafana/pull/47712. lastIndexedEventID := lastEventID go func() { + defer span.End() // Do full re-index asynchronously to avoid blocking index synchronization // on read for a long time. @@ -255,9 +271,9 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c defer func() { <-asyncReIndexSemaphore }() started := time.Now() - i.logger.Info("Start re-indexing") - i.reIndexFromScratch(ctx) - i.logger.Info("Full re-indexing finished", "fullReIndexElapsed", time.Since(started)) + i.logger.Info("Start re-indexing", i.withCtxData(fullReindexCtx)...) + i.reIndexFromScratch(fullReindexCtx) + i.logger.Info("Full re-indexing finished", i.withCtxData(fullReindexCtx, "fullReIndexElapsed", time.Since(started))...) reIndexDoneCh <- lastIndexedEventID }() case lastIndexedEventID := <-reIndexDoneCh: @@ -439,6 +455,8 @@ func (i *searchIndex) reportSizeOfIndexDiskBackup(orgID int64) { func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) { started := time.Now() ctx, cancel := context.WithTimeout(ctx, time.Minute) + ctx = log.InitCounter(ctx) + defer cancel() i.logger.Info("Start building org index", "orgId", orgID) @@ -450,7 +468,14 @@ func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, erro i.logger.Info("Finish loading org dashboards", "elapsed", orgSearchIndexLoadTime, "orgId", orgID) dashboardExtender := i.extender.GetDashboardExtender(orgID) + + _, initOrgIndexSpan := i.tracer.Start(ctx, "searchV2 init org index") + initOrgIndexSpan.SetAttributes("org_id", orgID, attribute.Key("org_id").Int64(orgID)) + index, err := initOrgIndex(dashboards, i.logger, dashboardExtender) + + initOrgIndexSpan.End() + if err != nil { return 0, fmt.Errorf("error initializing index: %w", err) } @@ -458,11 +483,11 @@ func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, erro orgSearchIndexBuildTime := orgSearchIndexTotalTime - orgSearchIndexLoadTime i.logger.Info("Re-indexed dashboards for organization", - "orgId", orgID, - "orgSearchIndexLoadTime", orgSearchIndexLoadTime, - "orgSearchIndexBuildTime", orgSearchIndexBuildTime, - "orgSearchIndexTotalTime", orgSearchIndexTotalTime, - "orgSearchDashboardCount", len(dashboards)) + i.withCtxData(ctx, "orgId", orgID, + "orgSearchIndexLoadTime", orgSearchIndexLoadTime, + "orgSearchIndexBuildTime", orgSearchIndexBuildTime, + "orgSearchIndexTotalTime", orgSearchIndexTotalTime, + "orgSearchDashboardCount", len(dashboards))...) i.mu.Lock() if oldIndex, ok := i.perOrgIndex[orgID]; ok { @@ -539,8 +564,22 @@ func (i *searchIndex) reIndexFromScratch(ctx context.Context) { } } +func (i *searchIndex) withCtxData(ctx context.Context, params ...interface{}) []interface{} { + traceID := tracing.TraceIDFromContext(ctx, false) + if traceID != "" { + params = append(params, "traceID", traceID) + } + + if i.features.IsEnabled(featuremgmt.FlagDatabaseMetrics) { + params = append(params, "db_call_count", log.TotalDBCallCount(ctx)) + } + + return params +} + func (i *searchIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) int64 { - events, err := i.eventStore.GetAllEventsAfter(context.Background(), lastEventID) + ctx = log.InitCounter(ctx) + events, err := i.eventStore.GetAllEventsAfter(ctx, lastEventID) if err != nil { i.logger.Error("can't load events", "error", err) return lastEventID @@ -557,7 +596,7 @@ func (i *searchIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) } lastEventID = e.Id } - i.logger.Info("Index updates applied", "indexEventsAppliedElapsed", time.Since(started), "numEvents", len(events)) + i.logger.Info("Index updates applied", i.withCtxData(ctx, "indexEventsAppliedElapsed", time.Since(started), "numEvents", len(events))...) return lastEventID } diff --git a/pkg/services/searchV2/index_test.go b/pkg/services/searchV2/index_test.go index 07960520962..70f5c45d060 100644 --- a/pkg/services/searchV2/index_test.go +++ b/pkg/services/searchV2/index_test.go @@ -8,6 +8,8 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/searchV2/extract" @@ -60,11 +62,7 @@ func initTestIndexFromDashesExtended(t *testing.T, dashboards []dashboard, exten dashboardLoader := &testDashboardLoader{ dashboards: dashboards, } - index := newSearchIndex( - dashboardLoader, - &store.MockEntityEventsService{}, - extender, - func(ctx context.Context, folderId int64) (string, error) { return "x", nil }) + index := newSearchIndex(dashboardLoader, &store.MockEntityEventsService{}, extender, func(ctx context.Context, folderId int64) (string, error) { return "x", nil }, tracing.InitializeTracerForTest(), featuremgmt.WithFeatures()) require.NotNil(t, index) numDashboards, err := index.buildOrgIndex(context.Background(), testOrgID) require.NoError(t, err) diff --git a/pkg/services/searchV2/service.go b/pkg/services/searchV2/service.go index 457e0921dd5..08f8a2ef03f 100644 --- a/pkg/services/searchV2/service.go +++ b/pkg/services/searchV2/service.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/accesscontrol" @@ -69,8 +70,7 @@ func (s *StandardSearchService) IsReady(ctx context.Context, orgId int64) IsSear return s.dashboardIndex.isInitialized(ctx, orgId) } -func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService, - ac accesscontrol.Service, orgService org.Service) SearchService { +func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService, ac accesscontrol.Service, tracer tracing.Tracer, features featuremgmt.FeatureToggles, orgService org.Service) SearchService { extender := &NoopExtender{} s := &StandardSearchService{ cfg: cfg, @@ -85,6 +85,8 @@ func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore s entityEventStore, extender.GetDocumentExtender(), newFolderIDLookup(sql), + tracer, + features, ), logger: log.New("searchV2"), extender: extender,