mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Refactor tempo using SDK contracts (#37440)
* Refactor tempo using SDK contracts * Remove commented code * Removing simplejson use * Unmarshal JSON
This commit is contained in:
@@ -17,7 +17,6 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/tsdb/mysql"
|
"github.com/grafana/grafana/pkg/tsdb/mysql"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/postgres"
|
"github.com/grafana/grafana/pkg/tsdb/postgres"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/prometheus"
|
"github.com/grafana/grafana/pkg/tsdb/prometheus"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/tempo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewService returns a new Service.
|
// NewService returns a new Service.
|
||||||
@@ -58,7 +57,6 @@ func (s *Service) Init() error {
|
|||||||
s.registry["postgres"] = s.PostgresService.NewExecutor
|
s.registry["postgres"] = s.PostgresService.NewExecutor
|
||||||
s.registry["mysql"] = mysql.New(s.HTTPClientProvider)
|
s.registry["mysql"] = mysql.New(s.HTTPClientProvider)
|
||||||
s.registry["stackdriver"] = s.CloudMonitoringService.NewExecutor
|
s.registry["stackdriver"] = s.CloudMonitoringService.NewExecutor
|
||||||
s.registry["tempo"] = tempo.New(s.HTTPClientProvider)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,58 +2,110 @@ package tempo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/models"
|
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||||
"github.com/grafana/grafana/pkg/plugins"
|
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
|
||||||
|
"github.com/grafana/grafana/pkg/registry"
|
||||||
|
|
||||||
otlp "go.opentelemetry.io/collector/model/otlp"
|
otlp "go.opentelemetry.io/collector/model/otlp"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tempoExecutor struct {
|
type Service struct {
|
||||||
httpClient *http.Client
|
HTTPClientProvider httpclient.Provider `inject:""`
|
||||||
|
BackendPluginManager backendplugin.Manager `inject:""`
|
||||||
|
|
||||||
|
im instancemgmt.InstanceManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutor returns a tempoExecutor.
|
type datasourceInfo struct {
|
||||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
HTTPClient *http.Client
|
||||||
func New(httpClientProvider httpclient.Provider) func(*models.DataSource) (plugins.DataPlugin, error) {
|
URL string
|
||||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
}
|
||||||
return func(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
|
|
||||||
httpClient, err := dsInfo.GetHTTPClient(httpClientProvider)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &tempoExecutor{
|
type QueryModel struct {
|
||||||
httpClient: httpClient,
|
TraceID string `json:"query"`
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
tlog = log.New("tsdb.tempo")
|
tlog = log.New("tsdb.tempo")
|
||||||
)
|
)
|
||||||
|
|
||||||
//nolint: staticcheck // plugins.DataQuery deprecated
|
func init() {
|
||||||
func (e *tempoExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource,
|
registry.Register(®istry.Descriptor{
|
||||||
queryContext plugins.DataQuery) (plugins.DataResponse, error) {
|
Name: "TempoService",
|
||||||
refID := queryContext.Queries[0].RefID
|
InitPriority: registry.Low,
|
||||||
queryResult := plugins.DataQueryResult{}
|
Instance: &Service{},
|
||||||
traceID := queryContext.Queries[0].Model.Get("query").MustString("")
|
})
|
||||||
|
}
|
||||||
|
|
||||||
req, err := e.createRequest(ctx, dsInfo, traceID)
|
func (s *Service) Init() error {
|
||||||
if err != nil {
|
s.im = datasource.NewInstanceManager(newInstanceSettings(s.HTTPClientProvider))
|
||||||
return plugins.DataResponse{}, err
|
|
||||||
|
factory := coreplugin.New(backend.ServeOpts{
|
||||||
|
QueryDataHandler: s,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := s.BackendPluginManager.RegisterAndStart(context.Background(), "tempo", factory); err != nil {
|
||||||
|
tlog.Error("Failed to register plugin", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := e.httpClient.Do(req)
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
|
||||||
|
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||||
|
opts, err := settings.HTTPClientOptions()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := httpClientProvider.New(opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
model := &datasourceInfo{
|
||||||
|
HTTPClient: client,
|
||||||
|
URL: settings.URL,
|
||||||
|
}
|
||||||
|
return model, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||||
|
result := backend.NewQueryDataResponse()
|
||||||
|
queryRes := backend.DataResponse{}
|
||||||
|
refID := req.Queries[0].RefID
|
||||||
|
|
||||||
|
model := &QueryModel{}
|
||||||
|
err := json.Unmarshal(req.Queries[0].JSON, model)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return plugins.DataResponse{}, fmt.Errorf("failed get to tempo: %w", err)
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dsInfo, err := s.getDSInfo(req.PluginContext)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
request, err := s.createRequest(ctx, dsInfo, model.TraceID)
|
||||||
|
if err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := dsInfo.HTTPClient.Do(request)
|
||||||
|
if err != nil {
|
||||||
|
return result, fmt.Errorf("failed get to tempo: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -64,51 +116,54 @@ func (e *tempoExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource
|
|||||||
|
|
||||||
body, err := ioutil.ReadAll(resp.Body)
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return plugins.DataResponse{}, err
|
return &backend.QueryDataResponse{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
queryResult.Error = fmt.Errorf("failed to get trace with id: %s Status: %s Body: %s", traceID, resp.Status, string(body))
|
queryRes.Error = fmt.Errorf("failed to get trace with id: %s Status: %s Body: %s", model.TraceID, resp.Status, string(body))
|
||||||
return plugins.DataResponse{
|
result.Responses[refID] = queryRes
|
||||||
Results: map[string]plugins.DataQueryResult{
|
return result, nil
|
||||||
refID: queryResult,
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
otTrace, err := otlp.NewProtobufTracesUnmarshaler().UnmarshalTraces(body)
|
otTrace, err := otlp.NewProtobufTracesUnmarshaler().UnmarshalTraces(body)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return plugins.DataResponse{}, fmt.Errorf("failed to convert tempo response to Otlp: %w", err)
|
return &backend.QueryDataResponse{}, fmt.Errorf("failed to convert tempo response to Otlp: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
frame, err := TraceToFrame(otTrace)
|
frame, err := TraceToFrame(otTrace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return plugins.DataResponse{}, fmt.Errorf("failed to transform trace %v to data frame: %w", traceID, err)
|
return &backend.QueryDataResponse{}, fmt.Errorf("failed to transform trace %v to data frame: %w", model.TraceID, err)
|
||||||
}
|
}
|
||||||
frame.RefID = refID
|
frame.RefID = refID
|
||||||
frames := []*data.Frame{frame}
|
frames := []*data.Frame{frame}
|
||||||
queryResult.Dataframes = plugins.NewDecodedDataFrames(frames)
|
queryRes.Frames = frames
|
||||||
|
result.Responses[refID] = queryRes
|
||||||
return plugins.DataResponse{
|
return result, nil
|
||||||
Results: map[string]plugins.DataQueryResult{
|
|
||||||
refID: queryResult,
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *tempoExecutor) createRequest(ctx context.Context, dsInfo *models.DataSource, traceID string) (*http.Request, error) {
|
func (s *Service) createRequest(ctx context.Context, dsInfo *datasourceInfo, traceID string) (*http.Request, error) {
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", dsInfo.Url+"/api/traces/"+traceID, nil)
|
req, err := http.NewRequestWithContext(ctx, "GET", dsInfo.URL+"/api/traces/"+traceID, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if dsInfo.BasicAuth {
|
|
||||||
req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.DecryptedBasicAuthPassword())
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Header.Set("Accept", "application/protobuf")
|
req.Header.Set("Accept", "application/protobuf")
|
||||||
|
|
||||||
tlog.Debug("Tempo request", "url", req.URL.String(), "headers", req.Header)
|
tlog.Debug("Tempo request", "url", req.URL.String(), "headers", req.Header)
|
||||||
return req, nil
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) {
|
||||||
|
i, err := s.im.Get(pluginCtx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
instance, ok := i.(*datasourceInfo)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("failed to cast datsource info")
|
||||||
|
}
|
||||||
|
|
||||||
|
return instance, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,21 +4,15 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
|
||||||
"github.com/grafana/grafana/pkg/models"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTempo(t *testing.T) {
|
func TestTempo(t *testing.T) {
|
||||||
plug, err := New(httpclient.NewProvider())(&models.DataSource{})
|
t.Run("createRequest - success", func(t *testing.T) {
|
||||||
executor := plug.(*tempoExecutor)
|
service := &Service{}
|
||||||
require.NoError(t, err)
|
req, err := service.createRequest(context.Background(), &datasourceInfo{}, "traceID")
|
||||||
|
|
||||||
t.Run("createRequest should set Auth header when basic auth is true ", func(t *testing.T) {
|
|
||||||
req, err := executor.createRequest(context.Background(), &models.DataSource{BasicAuth: true, BasicAuthUser: "john", BasicAuthPassword: "pass"}, "traceID")
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, 2, len(req.Header))
|
assert.Equal(t, 1, len(req.Header))
|
||||||
assert.NotEqual(t, req.Header.Get("Authorization"), "")
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user