SearchV2: add tracing to background jobs (#55250)

* searchv2: add tracing to background jobs

* searchv2: lint

* searchv2: lint

* searchv2: fix context passing

* searchv2: add init org index span

* searchv2: add traceid to logs

* searchv2: add db count to logs
Artur Wierzbicki 2022-09-20 16:49:44 +02:00 committed by GitHub
parent a14bd4055f
commit 03af63d52e
5 changed files with 63 additions and 22 deletions
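
The pattern the diffs below apply: each branch of the searchV2 index run loop starts a span via Grafana's tracing.Tracer, passes the span's context into the work it triggers (so the trace ID and child spans propagate), and ends the span when the work completes; for work spawned in a goroutine, the span is ended with defer inside that goroutine. A minimal standalone sketch of the same shape; runLoop, doWork, and the span names here are illustrative, not from the commit:

package sketch

import (
	"context"
	"time"

	"github.com/grafana/grafana/pkg/infra/tracing"
)

// doWork stands in for applyIndexUpdates / buildOrgIndex / reIndexFromScratch.
func doWork(ctx context.Context) {}

func runLoop(ctx context.Context, tracer tracing.Tracer, timer *time.Timer, signalCh chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			// Synchronous job: the span covers exactly one tick,
			// and the job receives the span's context, not the root ctx.
			jobCtx, span := tracer.Start(ctx, "searchV2 example timer job")
			doWork(jobCtx)
			span.End()
			timer.Reset(time.Minute)
		case <-signalCh:
			// Asynchronous job: the span must outlive this select iteration,
			// so it is ended with defer inside the goroutine, as the commit does.
			jobCtx, span := tracer.Start(ctx, "searchV2 example signal job")
			go func() {
				defer span.End()
				doWork(jobCtx)
			}()
		}
	}
}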

View File

@@ -103,7 +103,7 @@ func TestIntegrationPluginManager_Run(t *testing.T) {
 pg := postgres.ProvideService(cfg)
 my := mysql.ProvideService(cfg, hcp)
 ms := mssql.ProvideService(cfg)
-sv2 := searchV2.ProvideService(cfg, sqlstore.InitTestDB(t), nil, nil, nil)
+sv2 := searchV2.ProvideService(cfg, sqlstore.InitTestDB(t), nil, nil, tracing.InitializeTracerForTest(), featuremgmt.WithFeatures(), nil)
 graf := grafanads.ProvideService(cfg, sv2, nil)
 coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf)

View File

@@ -9,10 +9,12 @@ import (
 "github.com/grafana/grafana-plugin-sdk-go/backend"
 "github.com/grafana/grafana-plugin-sdk-go/data"
 "github.com/grafana/grafana-plugin-sdk-go/experimental"
+"github.com/grafana/grafana/pkg/infra/tracing"
 ac "github.com/grafana/grafana/pkg/services/accesscontrol"
 accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
 "github.com/grafana/grafana/pkg/services/dashboards"
 "github.com/grafana/grafana/pkg/services/datasources"
+"github.com/grafana/grafana/pkg/services/featuremgmt"
 "github.com/grafana/grafana/pkg/services/user"
 "github.com/stretchr/testify/require"
 )
@@ -80,7 +82,7 @@ var (
 )

 func service(t *testing.T) *StandardSearchService {
-service, ok := ProvideService(nil, nil, nil, accesscontrolmock.New(), nil).(*StandardSearchService)
+service, ok := ProvideService(nil, nil, nil, accesscontrolmock.New(), tracing.InitializeTracerForTest(), featuremgmt.WithFeatures(), nil).(*StandardSearchService)
 require.True(t, ok)
 return service
 }

View File

@@ -14,10 +14,13 @@ import (
 "time"

 "github.com/grafana/grafana/pkg/infra/log"
+"github.com/grafana/grafana/pkg/infra/tracing"
+"github.com/grafana/grafana/pkg/services/featuremgmt"
 "github.com/grafana/grafana/pkg/services/searchV2/dslookup"
 "github.com/grafana/grafana/pkg/services/searchV2/extract"
 "github.com/grafana/grafana/pkg/services/sqlstore"
 "github.com/grafana/grafana/pkg/services/store"
+"go.opentelemetry.io/otel/attribute"

 "github.com/blugelabs/bluge"
 )
@@ -93,9 +96,11 @@ type searchIndex struct {
 extender DocumentExtender
 folderIdLookup folderUIDLookup
 syncCh chan chan struct{}
+tracer tracing.Tracer
+features featuremgmt.FeatureToggles
 }

-func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *searchIndex {
+func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup, tracer tracing.Tracer, features featuremgmt.FeatureToggles) *searchIndex {
 return &searchIndex{
 loader: dashLoader,
 eventStore: evStore,
@@ -106,6 +111,8 @@ func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender Doc
 extender: extender,
 folderIdLookup: folderIDs,
 syncCh: make(chan chan struct{}),
+tracer: tracer,
+features: features,
 }
 }
@@ -209,17 +216,22 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c
 close(doneCh)
 case <-partialUpdateTimer.C:
 // Periodically apply updates collected in entity events table.
-lastEventID = i.applyIndexUpdates(ctx, lastEventID)
+partialIndexUpdateCtx, span := i.tracer.Start(ctx, "searchV2 partial update timer")
+lastEventID = i.applyIndexUpdates(partialIndexUpdateCtx, lastEventID)
+span.End()
 partialUpdateTimer.Reset(partialUpdateInterval)
 case <-reIndexSignalCh:
 // External systems may trigger re-indexing, at this moment provisioning does this.
 i.logger.Info("Full re-indexing due to external signal")
 fullReIndexTimer.Reset(0)
 case signal := <-i.buildSignals:
+buildSignalCtx, span := i.tracer.Start(ctx, "searchV2 build signal")
 // When search read request meets new not-indexed org we build index for it.
 i.mu.RLock()
 _, ok := i.perOrgIndex[signal.orgID]
 if ok {
+span.End()
 // Index for org already exists, do nothing.
 i.mu.RUnlock()
 close(signal.done)
@@ -232,14 +244,17 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c
 // branch.
 fullReIndexTimer.Stop()
 go func() {
+defer span.End()
 // We need semaphore here since asynchronous re-indexing may be in progress already.
 asyncReIndexSemaphore <- struct{}{}
 defer func() { <-asyncReIndexSemaphore }()
-_, err = i.buildOrgIndex(ctx, signal.orgID)
+_, err = i.buildOrgIndex(buildSignalCtx, signal.orgID)
 signal.done <- err
 reIndexDoneCh <- lastIndexedEventID
 }()
 case <-fullReIndexTimer.C:
+fullReindexCtx, span := i.tracer.Start(ctx, "searchV2 full reindex timer")
 // Periodically rebuild indexes since we could miss updates. At this moment we are issuing
 // entity events non-atomically (outside of transaction) and do not cover all possible dashboard
 // change places, so periodic re-indexing fixes possibly broken state. But ideally we should
@@ -247,6 +262,7 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c
 // is to use DB triggers, see https://github.com/grafana/grafana/pull/47712.
 lastIndexedEventID := lastEventID
 go func() {
+defer span.End()
 // Do full re-index asynchronously to avoid blocking index synchronization
 // on read for a long time.
@@ -255,9 +271,9 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c
 asyncReIndexSemaphore <- struct{}{}
 defer func() { <-asyncReIndexSemaphore }()
 started := time.Now()
-i.logger.Info("Start re-indexing")
-i.reIndexFromScratch(ctx)
-i.logger.Info("Full re-indexing finished", "fullReIndexElapsed", time.Since(started))
+i.logger.Info("Start re-indexing", i.withCtxData(fullReindexCtx)...)
+i.reIndexFromScratch(fullReindexCtx)
+i.logger.Info("Full re-indexing finished", i.withCtxData(fullReindexCtx, "fullReIndexElapsed", time.Since(started))...)
 reIndexDoneCh <- lastIndexedEventID
 }()
 case lastIndexedEventID := <-reIndexDoneCh:
@@ -439,6 +455,8 @@ func (i *searchIndex) reportSizeOfIndexDiskBackup(orgID int64) {
 func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) {
 started := time.Now()
 ctx, cancel := context.WithTimeout(ctx, time.Minute)
+ctx = log.InitCounter(ctx)
+
 defer cancel()

 i.logger.Info("Start building org index", "orgId", orgID)
@@ -450,7 +468,14 @@ func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, erro
 i.logger.Info("Finish loading org dashboards", "elapsed", orgSearchIndexLoadTime, "orgId", orgID)

 dashboardExtender := i.extender.GetDashboardExtender(orgID)
+
+_, initOrgIndexSpan := i.tracer.Start(ctx, "searchV2 init org index")
+initOrgIndexSpan.SetAttributes("org_id", orgID, attribute.Key("org_id").Int64(orgID))
+
 index, err := initOrgIndex(dashboards, i.logger, dashboardExtender)
+
+initOrgIndexSpan.End()
+
 if err != nil {
 return 0, fmt.Errorf("error initializing index: %w", err)
 }
@@ -458,11 +483,11 @@ func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, erro
 orgSearchIndexBuildTime := orgSearchIndexTotalTime - orgSearchIndexLoadTime

 i.logger.Info("Re-indexed dashboards for organization",
-"orgId", orgID,
+i.withCtxData(ctx, "orgId", orgID,
 "orgSearchIndexLoadTime", orgSearchIndexLoadTime,
 "orgSearchIndexBuildTime", orgSearchIndexBuildTime,
 "orgSearchIndexTotalTime", orgSearchIndexTotalTime,
-"orgSearchDashboardCount", len(dashboards))
+"orgSearchDashboardCount", len(dashboards))...)

 i.mu.Lock()
 if oldIndex, ok := i.perOrgIndex[orgID]; ok {
@@ -539,8 +564,22 @@ func (i *searchIndex) reIndexFromScratch(ctx context.Context) {
 }
 }

+func (i *searchIndex) withCtxData(ctx context.Context, params ...interface{}) []interface{} {
+traceID := tracing.TraceIDFromContext(ctx, false)
+if traceID != "" {
+params = append(params, "traceID", traceID)
+}
+
+if i.features.IsEnabled(featuremgmt.FlagDatabaseMetrics) {
+params = append(params, "db_call_count", log.TotalDBCallCount(ctx))
+}
+
+return params
+}
+
 func (i *searchIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) int64 {
-events, err := i.eventStore.GetAllEventsAfter(context.Background(), lastEventID)
+ctx = log.InitCounter(ctx)
+events, err := i.eventStore.GetAllEventsAfter(ctx, lastEventID)
 if err != nil {
 i.logger.Error("can't load events", "error", err)
 return lastEventID
@@ -557,7 +596,7 @@ func (i *searchIndex) applyIndexUpdates(ctx context.Context, lastEventID int64)
 }
 lastEventID = e.Id
 }

-i.logger.Info("Index updates applied", "indexEventsAppliedElapsed", time.Since(started), "numEvents", len(events))
+i.logger.Info("Index updates applied", i.withCtxData(ctx, "indexEventsAppliedElapsed", time.Since(started), "numEvents", len(events))...)
 return lastEventID
 }
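
The withCtxData helper added above ties the last two commit-message bullets together: every index log line can now carry the current trace ID (via tracing.TraceIDFromContext) plus, when the database_metrics feature toggle is on, the number of DB calls made under that context. log.InitCounter and log.TotalDBCallCount are the real Grafana API used in the diff; the self-contained sketch below only illustrates the context-counter idea behind them:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

type dbCounterKey struct{}

// InitCounter stores a fresh counter in the context, as log.InitCounter
// is called at the start of each background job in the diff above.
func InitCounter(ctx context.Context) context.Context {
	return context.WithValue(ctx, dbCounterKey{}, new(int64))
}

// IncDBCallCount is what the database layer would invoke on every query.
func IncDBCallCount(ctx context.Context) {
	if c, ok := ctx.Value(dbCounterKey{}).(*int64); ok {
		atomic.AddInt64(c, 1)
	}
}

// TotalDBCallCount is read at log time, as withCtxData does.
func TotalDBCallCount(ctx context.Context) int64 {
	if c, ok := ctx.Value(dbCounterKey{}).(*int64); ok {
		return atomic.LoadInt64(c)
	}
	return 0
}

func main() {
	ctx := InitCounter(context.Background())
	IncDBCallCount(ctx) // would happen inside the SQL store on each call
	IncDBCallCount(ctx)
	fmt.Println("db_call_count:", TotalDBCallCount(ctx)) // prints: db_call_count: 2
}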

View File

@@ -8,6 +8,8 @@ import (

 "github.com/grafana/grafana-plugin-sdk-go/backend"
 "github.com/grafana/grafana-plugin-sdk-go/data"
+"github.com/grafana/grafana/pkg/infra/tracing"
+"github.com/grafana/grafana/pkg/services/featuremgmt"

 "github.com/grafana/grafana/pkg/infra/log"
 "github.com/grafana/grafana/pkg/services/searchV2/extract"
@@ -60,11 +62,7 @@ func initTestIndexFromDashesExtended(t *testing.T, dashboards []dashboard, exten
 dashboardLoader := &testDashboardLoader{
 dashboards: dashboards,
 }
-index := newSearchIndex(
-dashboardLoader,
-&store.MockEntityEventsService{},
-extender,
-func(ctx context.Context, folderId int64) (string, error) { return "x", nil })
+index := newSearchIndex(dashboardLoader, &store.MockEntityEventsService{}, extender, func(ctx context.Context, folderId int64) (string, error) { return "x", nil }, tracing.InitializeTracerForTest(), featuremgmt.WithFeatures())
 require.NotNil(t, index)
 numDashboards, err := index.buildOrgIndex(context.Background(), testOrgID)
 require.NoError(t, err)

View File

@@ -7,6 +7,7 @@ import (
 "time"

 "github.com/grafana/grafana/pkg/infra/log"
+"github.com/grafana/grafana/pkg/infra/tracing"
 "github.com/grafana/grafana/pkg/models"
 "github.com/grafana/grafana/pkg/registry"
 "github.com/grafana/grafana/pkg/services/accesscontrol"
@@ -69,8 +70,7 @@ func (s *StandardSearchService) IsReady(ctx context.Context, orgId int64) IsSear
 return s.dashboardIndex.isInitialized(ctx, orgId)
 }

-func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService,
-ac accesscontrol.Service, orgService org.Service) SearchService {
+func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService, ac accesscontrol.Service, tracer tracing.Tracer, features featuremgmt.FeatureToggles, orgService org.Service) SearchService {
 extender := &NoopExtender{}
 s := &StandardSearchService{
 cfg: cfg,
@@ -85,6 +85,8 @@ func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore s
 entityEventStore,
 extender.GetDocumentExtender(),
 newFolderIDLookup(sql),
+tracer,
+features,
 ),
 logger: log.New("searchV2"),
 extender: extender,
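
Because ProvideService and newSearchIndex each grew two parameters, every call site (including Grafana's Wire provider graph) now has to supply a tracer and feature toggles; the tests in the diffs above use tracing.InitializeTracerForTest() and featuremgmt.WithFeatures(). To actually see db_call_count in index logs, the toggle behind featuremgmt.FlagDatabaseMetrics has to be enabled. A hedged variant of the test helper (the helper name is made up, and passing the flag constant to WithFeatures is assumed to enable it):

package searchV2

import (
	"testing"

	"github.com/grafana/grafana/pkg/infra/tracing"
	accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/stretchr/testify/require"
)

// serviceWithDBMetrics mirrors service(t) from the test diff above, but
// enables the database_metrics toggle so withCtxData appends db_call_count
// to the index log lines.
func serviceWithDBMetrics(t *testing.T) *StandardSearchService {
	service, ok := ProvideService(nil, nil, nil, accesscontrolmock.New(),
		tracing.InitializeTracerForTest(),
		featuremgmt.WithFeatures(featuremgmt.FlagDatabaseMetrics),
		nil).(*StandardSearchService)
	require.True(t, ok)
	return service
}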