mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Unified Storage Indexer: Dont create temp dir for indexDir (#95626)
* dont create temp dir for indexDir * uses logger from trace ctx so traceID is on logs
This commit is contained in:
parent
3c8bfb539d
commit
2b0a439ad3
@ -3,7 +3,6 @@ package resource
|
||||
import (
|
||||
"context"
|
||||
golog "log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
@ -40,10 +39,6 @@ type Index struct {
|
||||
}
|
||||
|
||||
func NewIndex(s *server, opts Opts, tracer tracing.Tracer) *Index {
|
||||
if opts.IndexDir == "" {
|
||||
opts.IndexDir = os.TempDir()
|
||||
}
|
||||
|
||||
idx := &Index{
|
||||
s: s,
|
||||
opts: opts,
|
||||
@ -135,17 +130,18 @@ func (i *Index) AddToBatches(ctx context.Context, list *ListResponse) ([]string,
|
||||
func (i *Index) Init(ctx context.Context) error {
|
||||
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Init")
|
||||
defer span.End()
|
||||
logger := i.log.FromContext(ctx)
|
||||
|
||||
start := time.Now().Unix()
|
||||
resourceTypes := fetchResourceTypes()
|
||||
totalObjectsFetched := 0
|
||||
for _, rt := range resourceTypes {
|
||||
i.log.Info("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers)
|
||||
logger.Info("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers)
|
||||
r := &ListRequest{Options: rt, Limit: int64(i.opts.ListLimit)}
|
||||
|
||||
// Paginate through the list of resources and index each page
|
||||
for {
|
||||
i.log.Info("fetching resource list", "kind", rt.Key.Resource)
|
||||
logger.Info("fetching resource list", "kind", rt.Key.Resource)
|
||||
list, err := i.s.List(ctx, r)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -153,7 +149,7 @@ func (i *Index) Init(ctx context.Context) error {
|
||||
|
||||
totalObjectsFetched += len(list.Items)
|
||||
|
||||
i.log.Info("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items))
|
||||
logger.Info("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items))
|
||||
//add changes to batches for shards with changes in the List
|
||||
tenants, err := i.AddToBatches(ctx, list)
|
||||
if err != nil {
|
||||
@ -175,7 +171,7 @@ func (i *Index) Init(ctx context.Context) error {
|
||||
}
|
||||
|
||||
//index all remaining batches
|
||||
i.log.Info("indexing remaining batches", "shards", len(i.shards))
|
||||
logger.Info("indexing remaining batches", "shards", len(i.shards))
|
||||
err := i.IndexBatches(ctx, 1, i.allTenants())
|
||||
if err != nil {
|
||||
return err
|
||||
@ -183,7 +179,7 @@ func (i *Index) Init(ctx context.Context) error {
|
||||
|
||||
span.AddEvent("indexing finished", trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))))
|
||||
end := time.Now().Unix()
|
||||
i.log.Info("Initial indexing finished", "seconds", float64(end-start))
|
||||
logger.Info("Initial indexing finished", "seconds", float64(end-start))
|
||||
if IndexServerMetrics != nil {
|
||||
IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
|
||||
}
|
||||
@ -192,8 +188,9 @@ func (i *Index) Init(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (i *Index) Index(ctx context.Context, data *Data) error {
|
||||
_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Index")
|
||||
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Index")
|
||||
defer span.End()
|
||||
logger := i.log.FromContext(ctx)
|
||||
|
||||
// Transform the raw resource into a more generic indexable resource
|
||||
res, err := NewIndexedResource(data.Value.Value)
|
||||
@ -201,7 +198,7 @@ func (i *Index) Index(ctx context.Context, data *Data) error {
|
||||
return err
|
||||
}
|
||||
tenant := res.Namespace
|
||||
i.log.Debug("indexing resource for tenant", "res", string(data.Value.Value), "tenant", tenant)
|
||||
logger.Debug("indexing resource for tenant", "res", string(data.Value.Value), "tenant", tenant)
|
||||
shard, err := i.getShard(tenant)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -214,7 +211,7 @@ func (i *Index) Index(ctx context.Context, data *Data) error {
|
||||
// record latency from when event was created to when it was indexed
|
||||
latencySeconds := float64(time.Now().UnixMicro()-data.Value.ResourceVersion) / 1e6
|
||||
if latencySeconds > 5 {
|
||||
i.log.Warn("high index latency", "latency", latencySeconds)
|
||||
logger.Warn("high index latency", "latency", latencySeconds)
|
||||
}
|
||||
if IndexServerMetrics != nil {
|
||||
IndexServerMetrics.IndexLatency.WithLabelValues(data.Key.Resource).Observe(latencySeconds)
|
||||
@ -239,8 +236,9 @@ func (i *Index) Delete(ctx context.Context, uid string, key *ResourceKey) error
|
||||
}
|
||||
|
||||
func (i *Index) Search(ctx context.Context, tenant string, query string, limit int, offset int) ([]IndexedResource, error) {
|
||||
_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Search")
|
||||
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Search")
|
||||
defer span.End()
|
||||
logger := i.log.FromContext(ctx)
|
||||
|
||||
if tenant == "" {
|
||||
tenant = "default"
|
||||
@ -253,10 +251,10 @@ func (i *Index) Search(ctx context.Context, tenant string, query string, limit i
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i.log.Info("got index for tenant", "tenant", tenant, "docCount", docCount)
|
||||
logger.Info("got index for tenant", "tenant", tenant, "docCount", docCount)
|
||||
|
||||
fields, _ := shard.index.Fields()
|
||||
i.log.Debug("indexed fields", "fields", fields)
|
||||
logger.Debug("indexed fields", "fields", fields)
|
||||
|
||||
// use 10 as a default limit for now
|
||||
if limit <= 0 {
|
||||
@ -269,14 +267,14 @@ func (i *Index) Search(ctx context.Context, tenant string, query string, limit i
|
||||
|
||||
req.Fields = []string{"*"} // return all indexed fields in search results
|
||||
|
||||
i.log.Info("searching index", "query", query, "tenant", tenant)
|
||||
logger.Info("searching index", "query", query, "tenant", tenant)
|
||||
res, err := shard.index.Search(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hits := res.Hits
|
||||
|
||||
i.log.Info("got search results", "hits", hits)
|
||||
logger.Info("got search results", "hits", hits)
|
||||
|
||||
results := make([]IndexedResource, len(hits))
|
||||
for resKey, hit := range hits {
|
||||
|
Loading…
Reference in New Issue
Block a user