grafana/pkg/tsdb/tempo/search_stream.go
Andre Pereira c1709c9301
Tempo: TraceQL query response streaming (#69212)
* Refactor Tempo datasource backend to support multiple queryData types.
Added traceId query type that is set when performing the request but doesn't map to a tab.

* WIP data is reaching the frontend

* WIP

* Use channels and goroutines

* Some fixes

* Simplify backend code.
Return traces, metrics, state and error in a dataframe.
Shared state type between FE and BE.
Use getStream() instead of getQueryData()

* Handle errors in frontend

* Update Tempo and use same URL for RPC and HTTP

* Cleanup backend code

* Merge main

* Create grpc client only with host and authenticate

* Create grpc client only with host and authenticate

* Cleanup

* Add streaming to TraceQL Search tab

* Fix merge conflicts

* Added tests for processStream

* make gen-cue

* make gen-cue

* goimports

* lint

* Cleanup go.mod

* Comments

* Addressing PR comments

* Fix streaming for TraceQL search tab

* Added streaming kill switch as the disableTraceQLStreaming feature toggle

* Small comment

* Fix conflicts

* Correctly capture and send all errors as a DF to client

* Fix infinite error loop

* Fix merge conflicts

* Fix test

* Update deprecated import

* Fix feature toggles gen

* Fix merge conflicts
2023-07-14 15:10:46 +01:00

161 lines
4.2 KiB
Go

package tempo
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
"github.com/grafana/tempo/pkg/tempopb"
)
// SearchPathPrefix is the "search/" path prefix associated with streaming
// search requests.
// NOTE(review): presumably matched against the stream request path by the
// stream handler; it is not referenced in this file — confirm.
const SearchPathPrefix = "search/"
// ExtendedResponse pairs a Tempo search response with the streaming state
// that is reported alongside it. The embedded SearchResponse promotes its
// Traces and Metrics fields, which sendResponse serializes into a data frame.
type ExtendedResponse struct {
*tempopb.SearchResponse
// State is the streaming state sent together with the search results.
State dataquery.SearchStreamingState
}
// StreamSender is the set of sending methods the stream helpers in this file
// rely on. processStream, sendResponse and sendError accept this interface
// instead of a concrete sender; runSearchStream passes a
// *backend.StreamSender where it is expected.
// NOTE(review): presumably introduced so a substitute sender can be supplied
// in tests — confirm.
type StreamSender interface {
// SendFrame sends a data frame; include selects which parts of the frame are sent.
SendFrame(frame *data.Frame, include data.FrameInclude) error
// SendJSON sends an already-encoded JSON payload.
SendJSON(data []byte) error
// SendBytes sends a raw byte payload.
SendBytes(data []byte) error
}
// runSearchStream decodes the stream request payload into a Tempo search
// request, opens a streaming search on the datasource and relays the results
// to sender through processStream.
//
// req.Data is decoded twice: once as a backend.DataQuery, which supplies the
// time range, and once as a tempopb.SearchRequest, which supplies the query.
//
// It returns an error when the payload cannot be decoded (the decode error is
// wrapped with a message naming which model failed), when the query is empty,
// when the search stream cannot be opened, or when processStream fails.
func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *Datasource) error {
	var backendQuery *backend.DataQuery
	if err := json.Unmarshal(req.Data, &backendQuery); err != nil {
		// Return the contextual message instead of the bare decode error; %w
		// keeps the underlying error reachable through errors.Is / errors.As.
		return fmt.Errorf("error unmarshaling backend query model: %w", err)
	}

	var sr *tempopb.SearchRequest
	if err := json.Unmarshal(req.Data, &sr); err != nil {
		return fmt.Errorf("error unmarshaling Tempo query model: %w", err)
	}

	if sr.GetQuery() == "" {
		return fmt.Errorf("query is empty")
	}

	// The time range comes from the backend query and is narrowed to uint32
	// Unix seconds to fit the search request's Start and End fields.
	sr.Start = uint32(backendQuery.TimeRange.From.Unix())
	sr.End = uint32(backendQuery.TimeRange.To.Unix())

	stream, err := datasource.StreamingClient.Search(ctx, sr)
	if err != nil {
		s.logger.Error("Error Search()", "err", err)
		return err
	}

	return s.processStream(stream, sender)
}
// processStream drains stream and relays what it receives to sender.
//
// Each received message replaces the latest metrics and extends the
// accumulated trace list, which is de-duplicated by removeDuplicates; the
// whole accumulated result is then sent with SearchStreamingStateStreaming.
// When Recv reports io.EOF the accumulated result is sent one final time with
// SearchStreamingStateDone and the function returns.
//
// Any other receive error is logged and returned. An error from sending a
// response is returned as is.
func (s *Service) processStream(stream tempopb.StreamingQuerier_SearchClient, sender StreamSender) error {
	var (
		latestMetrics *tempopb.SearchMetrics
		collected     []*tempopb.TraceSearchMetadata
	)

	// push sends everything accumulated so far, tagged with the given state.
	push := func(state dataquery.SearchStreamingState) error {
		return sendResponse(&ExtendedResponse{
			SearchResponse: &tempopb.SearchResponse{
				Traces:  collected,
				Metrics: latestMetrics,
			},
			State: state,
		}, sender)
	}

	for {
		msg, recvErr := stream.Recv()
		switch {
		case errors.Is(recvErr, io.EOF):
			// End of stream: emit the final snapshot and stop.
			return push(dataquery.SearchStreamingStateDone)
		case recvErr != nil:
			s.logger.Error("Error receiving message", "err", recvErr)
			return recvErr
		}

		latestMetrics = msg.Metrics
		collected = removeDuplicates(append(collected, msg.Traces...))

		if sendErr := push(dataquery.SearchStreamingStateStreaming); sendErr != nil {
			return sendErr
		}
	}
}
// sendResponse packs response into a "response" data frame and sends it
// through sender with data.IncludeAll.
//
// For a non-nil response one row is written: the JSON-encoded traces, the
// JSON-encoded metrics, the state as a string, and an empty error string. A
// nil response results in a frame without rows being sent.
//
// It returns a JSON encoding error if either payload cannot be marshaled,
// otherwise the result of sender.SendFrame.
func sendResponse(response *ExtendedResponse, sender StreamSender) error {
	frame := createResponseDataFrame()
	if response == nil {
		return sender.SendFrame(frame, data.IncludeAll)
	}

	encodedTraces, err := json.Marshal(response.Traces)
	if err != nil {
		return err
	}
	encodedMetrics, err := json.Marshal(response.Metrics)
	if err != nil {
		return err
	}

	// Field order is fixed by createResponseDataFrame:
	// traces, metrics, state, error.
	frame.Fields[0].Append(json.RawMessage(encodedTraces))
	frame.Fields[1].Append(json.RawMessage(encodedMetrics))
	frame.Fields[2].Append(string(response.State))
	frame.Fields[3].Append("")

	return sender.SendFrame(frame, data.IncludeAll)
}
// sendError reports searchErr to the client as a "response" data frame sent
// through sender with data.IncludeAll.
//
// For a non-nil searchErr one row is written: empty traces and metrics
// payloads, the SearchStreamingStateError state, and the error's message. A
// nil searchErr results in a frame without rows being sent.
//
// It returns the result of sender.SendFrame.
func sendError(searchErr error, sender StreamSender) error {
	frame := createResponseDataFrame()
	if searchErr == nil {
		return sender.SendFrame(frame, data.IncludeAll)
	}

	// Field order is fixed by createResponseDataFrame:
	// traces, metrics, state, error.
	errorState := string(dataquery.SearchStreamingStateError)
	message := searchErr.Error()

	frame.Fields[0].Append(json.RawMessage{})
	frame.Fields[1].Append(json.RawMessage{})
	frame.Fields[2].Append(errorState)
	frame.Fields[3].Append(message)

	return sender.SendFrame(frame, data.IncludeAll)
}
// createResponseDataFrame builds the empty "response" frame shared by
// sendResponse and sendError. Its fields are, in order: "traces" and
// "metrics" (raw JSON), then "state" and "error" (strings). Callers append
// to the fields by index, so the order must not change.
func createResponseDataFrame() *data.Frame {
	return data.NewFrame(
		"response",
		data.NewField("traces", nil, []json.RawMessage{}),
		data.NewField("metrics", nil, []json.RawMessage{}),
		data.NewField("state", nil, []string{}),
		data.NewField("error", nil, []string{}),
	)
}
// removeDuplicates returns the entries of traceList with repeated trace IDs
// dropped. The first entry seen for each TraceID is kept and the original
// order is preserved. The result is nil when traceList has no entries.
func removeDuplicates(traceList []*tempopb.TraceSearchMetadata) []*tempopb.TraceSearchMetadata {
	seen := make(map[string]struct{}, len(traceList))

	// Left nil on purpose: an input without entries yields nil, not an
	// empty slice.
	var unique []*tempopb.TraceSearchMetadata
	for _, trace := range traceList {
		if _, dup := seen[trace.TraceID]; dup {
			continue
		}
		seen[trace.TraceID] = struct{}{}
		unique = append(unique, trace)
	}
	return unique
}