Search PoC: Adds traces to indexer (#95372)

* adds traces to indexer

* fix linting errors
This commit is contained in:
owensmallwood
2024-10-24 15:05:33 -06:00
committed by GitHub
parent ecebaf1bdf
commit 5e3e7cca0e
4 changed files with 55 additions and 16 deletions

View File

@@ -10,8 +10,13 @@ import (
"github.com/blevesearch/bleve/v2"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const tracingPrexfixIndex = "unified_storage.index."
type Shard struct {
index bleve.Index
path string
@@ -23,10 +28,11 @@ type Index struct {
opts Opts
s *server
log log.Logger
tracer tracing.Tracer
path string
}
func NewIndex(s *server, opts Opts, path string) *Index {
func NewIndex(s *server, opts Opts, path string, tracer tracing.Tracer) *Index {
if path == "" {
path = os.TempDir()
}
@@ -36,13 +42,15 @@ func NewIndex(s *server, opts Opts, path string) *Index {
opts: opts,
shards: make(map[string]Shard),
log: log.New("unifiedstorage.search.index"),
tracer: tracer,
path: path,
}
return idx
}
func (i *Index) IndexBatch(list *ListResponse, kind string) error {
func (i *Index) IndexBatch(ctx context.Context, list *ListResponse, kind string) error {
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"CreateIndexBatches")
for _, obj := range list.Items {
res, err := NewIndexedResource(obj.Value)
if err != nil {
@@ -66,7 +74,10 @@ func (i *Index) IndexBatch(list *ListResponse, kind string) error {
return err
}
}
span.End()
_, span = i.tracer.Start(ctx, tracingPrexfixIndex+"IndexBatches")
defer span.End()
for _, shard := range i.shards {
err := shard.index.Batch(shard.batch)
if err != nil {
@@ -79,9 +90,12 @@ func (i *Index) IndexBatch(list *ListResponse, kind string) error {
}
func (i *Index) Init(ctx context.Context) error {
start := time.Now().Unix()
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Init")
defer span.End()
start := time.Now().Unix()
resourceTypes := fetchResourceTypes()
totalObjectsFetched := 0
for _, rt := range resourceTypes {
i.log.Info("indexing resource", "kind", rt.Key.Resource)
r := &ListRequest{Options: rt, Limit: 100}
@@ -94,8 +108,10 @@ func (i *Index) Init(ctx context.Context) error {
return err
}
totalObjectsFetched += len(list.Items)
// Index current page
err = i.IndexBatch(list, rt.Key.Resource)
err = i.IndexBatch(ctx, list, rt.Key.Resource)
if err != nil {
return err
}
@@ -107,9 +123,9 @@ func (i *Index) Init(ctx context.Context) error {
r.NextPageToken = list.NextPageToken
}
}
span.AddEvent("indexing finished", trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))))
end := time.Now().Unix()
i.log.Debug("Initial indexing finished", "seconds", float64(end-start))
i.log.Info("Initial indexing finished", "seconds", float64(end-start))
if IndexServerMetrics != nil {
IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
}
@@ -118,6 +134,9 @@ func (i *Index) Init(ctx context.Context) error {
}
func (i *Index) Index(ctx context.Context, data *Data) error {
_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Index")
defer span.End()
// Transform the raw resource into a more generic indexable resource
res, err := NewIndexedResource(data.Value.Value)
if err != nil {
@@ -144,6 +163,9 @@ func (i *Index) Index(ctx context.Context, data *Data) error {
}
func (i *Index) Delete(ctx context.Context, uid string, key *ResourceKey) error {
_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Delete")
defer span.End()
shard, err := i.getShard(key.Namespace)
if err != nil {
return err
@@ -156,6 +178,9 @@ func (i *Index) Delete(ctx context.Context, uid string, key *ResourceKey) error
}
func (i *Index) Search(ctx context.Context, tenant string, query string, limit int, offset int) ([]IndexedResource, error) {
_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Search")
defer span.End()
if tenant == "" {
tenant = "default"
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"log/slog"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
@@ -13,14 +14,20 @@ import (
type IndexServer struct {
ResourceServer
s *server
index *Index
ws *indexWatchServer
log *slog.Logger
cfg *setting.Cfg
s *server
index *Index
ws *indexWatchServer
log *slog.Logger
cfg *setting.Cfg
tracer tracing.Tracer
}
const tracingPrefixIndexServer = "unified_storage.index_server."
func (is *IndexServer) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) {
ctx, span := is.tracer.Start(ctx, tracingPrefixIndexServer+"Search")
defer span.End()
results, err := is.index.Search(ctx, req.Tenant, req.Query, int(req.Limit), int(req.Offset))
if err != nil {
return nil, err
@@ -46,7 +53,10 @@ func (is *IndexServer) Origin(ctx context.Context, req *OriginRequest) (*OriginR
// Load the index
func (is *IndexServer) Load(ctx context.Context) error {
is.index = NewIndex(is.s, Opts{}, is.cfg.IndexPath)
ctx, span := is.tracer.Start(ctx, tracingPrefixIndexServer+"Load")
defer span.End()
is.index = NewIndex(is.s, Opts{}, is.cfg.IndexPath, is.tracer)
err := is.index.Init(ctx)
if err != nil {
return err
@@ -88,12 +98,13 @@ func (is *IndexServer) Init(ctx context.Context, rs *server) error {
return nil
}
func NewResourceIndexServer(cfg *setting.Cfg) ResourceIndexServer {
func NewResourceIndexServer(cfg *setting.Cfg, tracer tracing.Tracer) ResourceIndexServer {
logger := slog.Default().With("logger", "index-server")
indexServer := &IndexServer{
log: logger,
cfg: cfg,
log: logger,
cfg: cfg,
tracer: tracer,
}
err := prometheus.Register(NewIndexMetrics(cfg.IndexPath, indexServer))

View File

@@ -542,6 +542,9 @@ func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
}
func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
if err := s.Init(ctx); err != nil {
return nil, err
}

View File

@@ -50,7 +50,7 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea
opts.Lifecycle = store
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
opts.Index = resource.NewResourceIndexServer(cfg)
opts.Index = resource.NewResourceIndexServer(cfg, tracer)
server, err := resource.NewResourceServer(opts)
if err != nil {
return nil, err