mirror of
https://github.com/grafana/grafana.git
synced 2025-01-09 15:43:23 -06:00
196 lines
5.8 KiB
Go
196 lines
5.8 KiB
Go
package tempo
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
|
|
"github.com/grafana/tempo/pkg/tempopb"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
)
|
|
|
|
// SearchPathPrefix is the "search/" path prefix.
// NOTE(review): presumably it identifies stream paths that should be handled
// as Tempo searches; its use is outside this section of the file — confirm.
const SearchPathPrefix = "search/"
|
|
|
|
type ExtendedResponse struct {
|
|
*tempopb.SearchResponse
|
|
State dataquery.SearchStreamingState
|
|
}
|
|
|
|
type StreamSender interface {
|
|
SendFrame(frame *data.Frame, include data.FrameInclude) error
|
|
SendJSON(data []byte) error
|
|
SendBytes(data []byte) error
|
|
}
|
|
|
|
func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *Datasource) error {
|
|
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.runSearchStream")
|
|
defer span.End()
|
|
|
|
response := &backend.DataResponse{}
|
|
|
|
var backendQuery *backend.DataQuery
|
|
err := json.Unmarshal(req.Data, &backendQuery)
|
|
if err != nil {
|
|
response.Error = fmt.Errorf("error unmarshaling backend query model: %v", err)
|
|
span.RecordError(response.Error)
|
|
span.SetStatus(codes.Error, response.Error.Error())
|
|
return err
|
|
}
|
|
|
|
var sr *tempopb.SearchRequest
|
|
err = json.Unmarshal(req.Data, &sr)
|
|
if err != nil {
|
|
response.Error = fmt.Errorf("error unmarshaling Tempo query model: %v", err)
|
|
span.RecordError(response.Error)
|
|
span.SetStatus(codes.Error, response.Error.Error())
|
|
return err
|
|
}
|
|
|
|
if sr.GetQuery() == "" {
|
|
return fmt.Errorf("query is empty")
|
|
}
|
|
|
|
sr.Start = uint32(backendQuery.TimeRange.From.Unix())
|
|
sr.End = uint32(backendQuery.TimeRange.To.Unix())
|
|
|
|
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
|
|
// changes or updates, so we have to get it from context.
|
|
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
|
|
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
|
|
|
|
stream, err := datasource.StreamingClient.Search(ctx, sr)
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
s.logger.Error("Error Search()", "err", err)
|
|
return err
|
|
}
|
|
|
|
return s.processStream(ctx, stream, sender)
|
|
}
|
|
|
|
func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQuerier_SearchClient, sender StreamSender) error {
|
|
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.processStream")
|
|
defer span.End()
|
|
var traceList []*tempopb.TraceSearchMetadata
|
|
var metrics *tempopb.SearchMetrics
|
|
messageCount := 0
|
|
for {
|
|
msg, err := stream.Recv()
|
|
messageCount++
|
|
span.SetAttributes(attribute.Int("message_count", messageCount))
|
|
if errors.Is(err, io.EOF) {
|
|
if err := s.sendResponse(ctx, &ExtendedResponse{
|
|
State: dataquery.SearchStreamingStateDone,
|
|
SearchResponse: &tempopb.SearchResponse{
|
|
Metrics: metrics,
|
|
Traces: traceList,
|
|
},
|
|
}, sender); err != nil {
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
break
|
|
}
|
|
if err != nil {
|
|
s.logger.Error("Error receiving message", "err", err)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
|
|
metrics = msg.Metrics
|
|
traceList = append(traceList, msg.Traces...)
|
|
traceList = removeDuplicates(traceList)
|
|
span.SetAttributes(attribute.Int("traces_count", len(traceList)))
|
|
|
|
if err := s.sendResponse(ctx, &ExtendedResponse{
|
|
State: dataquery.SearchStreamingStateStreaming,
|
|
SearchResponse: &tempopb.SearchResponse{
|
|
Metrics: metrics,
|
|
Traces: traceList,
|
|
},
|
|
}, sender); err != nil {
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) sendResponse(ctx context.Context, response *ExtendedResponse, sender StreamSender) error {
|
|
_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendResponse")
|
|
defer span.End()
|
|
frame := createResponseDataFrame()
|
|
|
|
if response != nil {
|
|
span.SetAttributes(attribute.Int("trace_count", len(response.Traces)), attribute.String("state", string(response.State)))
|
|
|
|
tracesAsJson, err := json.Marshal(response.Traces)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tracesRawMessage := json.RawMessage(tracesAsJson)
|
|
frame.Fields[0].Append(tracesRawMessage)
|
|
|
|
metricsAsJson, err := json.Marshal(response.Metrics)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
metricsRawMessage := json.RawMessage(metricsAsJson)
|
|
frame.Fields[1].Append(metricsRawMessage)
|
|
frame.Fields[2].Append(string(response.State))
|
|
frame.Fields[3].Append("")
|
|
}
|
|
|
|
return sender.SendFrame(frame, data.IncludeAll)
|
|
}
|
|
|
|
func sendError(searchErr error, sender StreamSender) error {
|
|
frame := createResponseDataFrame()
|
|
|
|
if searchErr != nil {
|
|
frame.Fields[0].Append(json.RawMessage{})
|
|
frame.Fields[1].Append(json.RawMessage{})
|
|
frame.Fields[2].Append(string(dataquery.SearchStreamingStateError))
|
|
frame.Fields[3].Append(searchErr.Error())
|
|
}
|
|
|
|
return sender.SendFrame(frame, data.IncludeAll)
|
|
}
|
|
|
|
func createResponseDataFrame() *data.Frame {
|
|
frame := data.NewFrame("response")
|
|
frame.Fields = append(frame.Fields, data.NewField("traces", nil, []json.RawMessage{}))
|
|
frame.Fields = append(frame.Fields, data.NewField("metrics", nil, []json.RawMessage{}))
|
|
frame.Fields = append(frame.Fields, data.NewField("state", nil, []string{}))
|
|
frame.Fields = append(frame.Fields, data.NewField("error", nil, []string{}))
|
|
|
|
return frame
|
|
}
|
|
|
|
func removeDuplicates(traceList []*tempopb.TraceSearchMetadata) []*tempopb.TraceSearchMetadata {
|
|
keys := make(map[string]bool)
|
|
var list []*tempopb.TraceSearchMetadata
|
|
|
|
for _, entry := range traceList {
|
|
if _, value := keys[entry.TraceID]; !value {
|
|
keys[entry.TraceID] = true
|
|
list = append(list, entry)
|
|
}
|
|
}
|
|
return list
|
|
}
|