Improve logs for backends owned by the Observability Traces and Profiling squad (#76109)

This commit is contained in:
Fabrizio
2023-10-31 11:57:17 +01:00
committed by GitHub
parent ba7a8fb75f
commit f8912517e6
12 changed files with 301 additions and 46 deletions

View File

@@ -43,12 +43,15 @@ type PyroscopeDatasource struct {
// NewPyroscopeDatasource creates a new datasource instance.
func NewPyroscopeDatasource(ctx context.Context, httpClientProvider httpclient.Provider, settings backend.DataSourceInstanceSettings, ac accesscontrol.AccessControl) (instancemgmt.Instance, error) {
ctxLogger := logger.FromContext(ctx)
opt, err := settings.HTTPClientOptions(ctx)
if err != nil {
ctxLogger.Error("Failed to get HTTP client options", "error", err, "function", logEntrypoint())
return nil, err
}
httpClient, err := httpClientProvider.New(opt)
if err != nil {
ctxLogger.Error("Failed to create HTTP client", "error", err, "function", logEntrypoint())
return nil, err
}
@@ -61,9 +64,10 @@ func NewPyroscopeDatasource(ctx context.Context, httpClientProvider httpclient.P
}
func (d *PyroscopeDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.CallResource", trace.WithAttributes(attribute.String("path", req.Path), attribute.String("method", req.Method)))
defer span.End()
logger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body)
ctxLogger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body, "function", logEntrypoint())
if req.Path == "profileTypes" {
return d.profileTypes(ctx, req, sender)
}
@@ -79,32 +83,40 @@ func (d *PyroscopeDatasource) CallResource(ctx context.Context, req *backend.Cal
}
func (d *PyroscopeDatasource) profileTypes(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
types, err := d.client.ProfileTypes(ctx)
if err != nil {
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
return err
}
bodyData, err := json.Marshal(types)
if err != nil {
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: bodyData, Headers: req.Headers, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
return err
}
return nil
}
func (d *PyroscopeDatasource) labelNames(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
res, err := d.client.LabelNames(ctx)
if err != nil {
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
return fmt.Errorf("error calling LabelNames: %v", err)
}
data, err := json.Marshal(res)
if err != nil {
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
return err
}
return nil
@@ -118,24 +130,32 @@ type LabelValuesPayload struct {
}
func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
u, err := url.Parse(req.URL)
if err != nil {
ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint())
return err
}
query := u.Query()
res, err := d.client.LabelValues(ctx, query["label"][0])
if err != nil {
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
return fmt.Errorf("error calling LabelValues: %v", err)
}
data, err := json.Marshal(res)
if err != nil {
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
return err
}
return nil
}
@@ -144,13 +164,15 @@ func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.Call
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
logger.Debug("QueryData called", "Queries", req.Queries)
ctxLogger := logger.FromContext(ctx)
ctxLogger.Debug("Processing queries", "queryLenght", len(req.Queries), "function", logEntrypoint())
// create response struct
response := backend.NewQueryDataResponse()
// loop over queries and execute them individually.
for _, q := range req.Queries {
for i, q := range req.Queries {
ctxLogger.Debug("Processing query", "counter", i, "function", logEntrypoint())
res := d.query(ctx, req.PluginContext, q)
// save the response in a hashmap
@@ -158,6 +180,7 @@ func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryD
response.Responses[q.RefID] = res
}
ctxLogger.Debug("All queries processed", "function", logEntrypoint())
return response, nil
}
@@ -166,7 +189,7 @@ func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryD
// datasource configuration page which allows users to verify that
// a datasource is working as expected.
func (d *PyroscopeDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
logger.Debug("CheckHealth called")
logger.FromContext(ctx).Debug("CheckHealth called", "function", logEntrypoint())
status := backend.HealthStatusOk
message := "Data source is working"
@@ -185,7 +208,7 @@ func (d *PyroscopeDatasource) CheckHealth(ctx context.Context, _ *backend.CheckH
// SubscribeStream is called when a client wants to connect to a stream. This callback
// allows sending the first message.
func (d *PyroscopeDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
logger.Debug("SubscribeStream called")
logger.Debug("Subscribing stream called", "function", logEntrypoint())
status := backend.SubscribeStreamStatusPermissionDenied
if req.Path == "stream" {
@@ -200,7 +223,8 @@ func (d *PyroscopeDatasource) SubscribeStream(_ context.Context, req *backend.Su
// RunStream is called once for any open channel. Results are shared with everyone
// subscribed to the same channel.
func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
logger.Debug("RunStream called")
ctxLogger := logger.FromContext(ctx)
ctxLogger.Debug("Running stream", "path", req.Path, "function", logEntrypoint())
// Create the same data frame as for query data.
frame := data.NewFrame("response")
@@ -217,7 +241,7 @@ func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStr
for {
select {
case <-ctx.Done():
logger.Info("Context done, finish streaming", "path", req.Path)
ctxLogger.Info("Context done, finish streaming", "path", req.Path, "function", logEntrypoint())
return nil
case <-time.After(time.Second):
// Send new data periodically.
@@ -228,7 +252,7 @@ func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStr
err := sender.SendFrame(frame, data.IncludeAll)
if err != nil {
logger.Error("Error sending frame", "error", err)
ctxLogger.Error("Error sending frame", "error", err, "function", logEntrypoint())
continue
}
}
@@ -236,8 +260,8 @@ func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStr
}
// PublishStream is called when a client sends a message to the stream.
func (d *PyroscopeDatasource) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
logger.Debug("PublishStream called")
func (d *PyroscopeDatasource) PublishStream(ctx context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
logger.FromContext(ctx).Debug("Publishing stream", "function", logEntrypoint())
// Do not allow publishing at all.
return &backend.PublishStreamResponse{

View File

@@ -75,6 +75,7 @@ func (c *PyroscopeClient) ProfileTypes(ctx context.Context) ([]*ProfileType, err
defer span.End()
res, err := c.connectClient.ProfileTypes(ctx, connect.NewRequest(&querierv1.ProfileTypesRequest{}))
if err != nil {
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
@@ -108,6 +109,7 @@ func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, l
resp, err := c.connectClient.SelectSeries(ctx, req)
if err != nil {
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
@@ -162,6 +164,7 @@ func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSe
resp, err := c.connectClient.SelectMergeStacktraces(ctx, req)
if err != nil {
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
@@ -207,9 +210,10 @@ func (c *PyroscopeClient) LabelNames(ctx context.Context) ([]string, error) {
defer span.End()
resp, err := c.connectClient.LabelNames(ctx, connect.NewRequest(&typesv1.LabelNamesRequest{}))
if err != nil {
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("error seding LabelNames request %v", err)
return nil, fmt.Errorf("error sending LabelNames request %v", err)
}
var filtered []string
@@ -227,6 +231,7 @@ func (c *PyroscopeClient) LabelValues(ctx context.Context, label string) ([]stri
defer span.End()
resp, err := c.connectClient.LabelValues(ctx, connect.NewRequest(&typesv1.LabelValuesRequest{Name: label}))
if err != nil {
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err

View File

@@ -69,10 +69,10 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont
parsedInterval, err = gtime.ParseDuration(dsJson.MinStep)
if err != nil {
parsedInterval = time.Second * 15
logger.Debug("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep)
logger.Error("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep, "function", logEntrypoint())
}
}
logger.Debug("Sending SelectSeriesRequest", "queryModel", qm)
logger.Debug("Sending SelectSeriesRequest", "queryModel", qm, "function", logEntrypoint())
seriesResp, err := d.client.GetSeries(
gCtx,
qm.ProfileTypeId,
@@ -85,7 +85,7 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
logger.Error("Querying SelectSeries()", "err", err)
logger.Error("Querying SelectSeries()", "err", err, "function", logEntrypoint())
return err
}
// add the frames to the response.
@@ -98,12 +98,12 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont
if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth {
g.Go(func() error {
logger.Debug("Calling GetProfile", "queryModel", qm)
logger.Debug("Calling GetProfile", "queryModel", qm, "function", logEntrypoint())
prof, err := d.client.GetProfile(gCtx, qm.ProfileTypeId, qm.LabelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
logger.Error("Error GetProfile()", "err", err)
logger.Error("Error GetProfile()", "err", err, "function", logEntrypoint())
return err
}
@@ -201,7 +201,7 @@ func levelsToTree(levels []*Level, names []string) *ProfileTree {
// If we still have levels to go, this should not happen. Something is probably wrong with the flamebearer data.
if len(parentsStack) == 0 {
logger.Error("ParentsStack is empty but we are not at the the last level", "currentLevel", currentLevel)
logger.Error("ParentsStack is empty but we are not at the the last level", "currentLevel", currentLevel, "function", logEntrypoint())
break
}
@@ -245,7 +245,7 @@ func levelsToTree(levels []*Level, names []string) *ProfileTree {
// We went out of parents bounds so lets move to next parent. We will evaluate the same item again, but
// we will check if it is a child of the next parent item in line.
if len(parentsStack) == 0 {
logger.Error("ParentsStack is empty but there are still items in current level", "currentLevel", currentLevel, "itemIndex", itemIndex)
logger.Error("ParentsStack is empty but there are still items in current level", "currentLevel", currentLevel, "itemIndex", itemIndex, "function", logEntrypoint())
break
}
currentParent = parentsStack[:1][0]

View File

@@ -2,6 +2,9 @@ package pyroscope
import (
"context"
"fmt"
"runtime"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@@ -27,15 +30,34 @@ var (
_ backend.StreamHandler = (*Service)(nil)
)
type Service struct {
im instancemgmt.InstanceManager
logger log.Logger
}
var logger = log.New("tsdb.pyroscope")
type Service struct {
im instancemgmt.InstanceManager
// Return the file, line, and (full-path) function name of the caller
func getRunContext() (string, int, string) {
pc := make([]uintptr, 10)
runtime.Callers(2, pc)
f := runtime.FuncForPC(pc[0])
file, line := f.FileLine(pc[0])
return file, line, f.Name()
}
// Return a formatted string representing the execution context for the logger
func logEntrypoint() string {
file, line, pathToFunction := getRunContext()
parts := strings.Split(pathToFunction, "/")
functionName := parts[len(parts)-1]
return fmt.Sprintf("%s:%d[%s]", file, line, functionName)
}
func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginContext) (*PyroscopeDatasource, error) {
i, err := s.im.Get(ctx, pluginCtx)
if err != nil {
s.logger.FromContext(ctx).Error("Failed to get instance", "error", err, "pluginID", pluginCtx.PluginID, "function", logEntrypoint())
return nil, err
}
in := i.(*PyroscopeDatasource)
@@ -44,7 +66,8 @@ func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginConte
func ProvideService(httpClientProvider httpclient.Provider, ac accesscontrol.AccessControl) *Service {
return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, ac)),
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, ac)),
logger: logger,
}
}
@@ -55,50 +78,110 @@ func newInstanceSettings(httpClientProvider httpclient.Provider, ac accesscontro
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
ctxLogger := s.logger.FromContext(ctx)
ctxLogger.Debug("Processing queries", "queriesLength", len(req.Queries), "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
return i.QueryData(ctx, req)
response, err := i.QueryData(ctx, req)
if err != nil {
ctxLogger.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint())
} else {
ctxLogger.Debug("All queries processed", "function", logEntrypoint())
}
return response, err
}
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
loggerWithContext := s.logger.FromContext(ctx)
loggerWithContext.Debug("Calling resource", "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return err
}
return i.CallResource(ctx, req, sender)
err = i.CallResource(ctx, req, sender)
if err != nil {
loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint())
} else {
loggerWithContext.Debug("Health check succeeded", "function", logEntrypoint())
}
return err
}
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
loggerWithContext := s.logger.FromContext(ctx)
loggerWithContext.Debug("Checking health", "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
return i.CheckHealth(ctx, req)
response, err := i.CheckHealth(ctx, req)
if err != nil {
loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint())
} else {
loggerWithContext.Debug("Health check succeeded", "function", logEntrypoint())
}
return response, err
}
func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
loggerWithContext := s.logger.FromContext(ctx)
loggerWithContext.Debug("Subscribing stream", "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
return i.SubscribeStream(ctx, req)
response, err := i.SubscribeStream(ctx, req)
if err != nil {
loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint())
} else {
loggerWithContext.Debug("Stream subscribed", "function", logEntrypoint())
}
return response, err
}
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
loggerWithContext := s.logger.FromContext(ctx)
loggerWithContext.Debug("Running stream", "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return err
}
return i.RunStream(ctx, req, sender)
err = i.RunStream(ctx, req, sender)
if err != nil {
loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint())
} else {
loggerWithContext.Debug("Stream run", "function", logEntrypoint())
}
return err
}
// PublishStream is called when a client sends a message to the stream.
func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
loggerWithContext := s.logger.FromContext(ctx)
loggerWithContext.Debug("Publishing stream", "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
return i.PublishStream(ctx, req)
response, err := i.PublishStream(ctx, req)
if err != nil {
loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint())
} else {
loggerWithContext.Debug("Stream published", "function", logEntrypoint())
}
return response, err
}

View File

@@ -36,12 +36,15 @@ type ParcaDatasource struct {
// NewParcaDatasource creates a new datasource instance.
func NewParcaDatasource(ctx context.Context, httpClientProvider httpclient.Provider, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
ctxLogger := logger.FromContext(ctx)
opt, err := settings.HTTPClientOptions(ctx)
if err != nil {
ctxLogger.Error("Failed to get HTTP options", "error", err, "function", logEntrypoint())
return nil, err
}
httpClient, err := httpClientProvider.New(opt)
if err != nil {
ctxLogger.Error("Failed to create HTTP client", "error", err, "function", logEntrypoint())
return nil, err
}
@@ -58,7 +61,8 @@ func (d *ParcaDatasource) Dispose() {
}
func (d *ParcaDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
logger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body)
ctxLogger := logger.FromContext(ctx)
ctxLogger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body, "function", logEntrypoint())
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.CallResource", trace.WithAttributes(attribute.String("path", req.Path), attribute.String("method", req.Method)))
defer span.End()
@@ -69,6 +73,7 @@ func (d *ParcaDatasource) CallResource(ctx context.Context, req *backend.CallRes
return d.callLabelNames(ctx, req, sender)
}
if req.Path == "labelValues" {
ctxLogger.Debug("CallResource completed", "function", logEntrypoint())
return d.callLabelValues(ctx, req, sender)
}
return sender.Send(&backend.CallResourceResponse{
@@ -81,8 +86,6 @@ func (d *ParcaDatasource) CallResource(ctx context.Context, req *backend.CallRes
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
func (d *ParcaDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
logger.Debug("QueryData called", "queries", req.Queries)
// create response struct
response := backend.NewQueryDataResponse()
@@ -103,8 +106,7 @@ func (d *ParcaDatasource) QueryData(ctx context.Context, req *backend.QueryDataR
// datasource configuration page which allows users to verify that
// a datasource is working as expected.
func (d *ParcaDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
logger.Debug("CheckHealth called")
ctxLogger := logger.FromContext(ctx)
status := backend.HealthStatusOk
message := "Data source is working"
@@ -113,6 +115,7 @@ func (d *ParcaDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealt
message = err.Error()
}
ctxLogger.Debug("CheckHealth completed", "function", logEntrypoint())
return &backend.CheckHealthResult{
Status: status,
Message: message,

View File

@@ -31,6 +31,7 @@ const (
// query processes single Parca query transforming the response to data.Frame packaged in DataResponse
func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
ctxLogger := logger.FromContext(ctx)
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.query", trace.WithAttributes(attribute.String("query_type", query.QueryType)))
defer span.End()
@@ -40,6 +41,7 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext,
err := json.Unmarshal(query.JSON, &qm)
if err != nil {
response.Error = err
ctxLogger.Error("Failed to unmarshall query", "error", err, "function", logEntrypoint())
span.RecordError(response.Error)
span.SetStatus(codes.Error, response.Error.Error())
return response
@@ -49,6 +51,7 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext,
seriesResp, err := d.client.QueryRange(ctx, makeMetricRequest(qm, query))
if err != nil {
response.Error = err
ctxLogger.Error("Failed to process query", "error", err, "queryType", query.QueryType, "function", logEntrypoint())
span.RecordError(response.Error)
span.SetStatus(codes.Error, response.Error.Error())
return response
@@ -57,10 +60,11 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext,
}
if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth {
logger.Debug("Querying SelectMergeStacktraces()", "queryModel", qm)
ctxLogger.Debug("Querying SelectMergeStacktraces()", "queryModel", qm, "function", logEntrypoint())
resp, err := d.client.Query(ctx, makeProfileRequest(qm, query))
if err != nil {
response.Error = err
ctxLogger.Error("Failed to process query", "error", err, "queryType", query.QueryType, "function", logEntrypoint())
span.RecordError(response.Error)
span.SetStatus(codes.Error, response.Error.Error())
return response

View File

@@ -25,10 +25,14 @@ type ProfileType struct {
}
func (d *ParcaDatasource) callProfileTypes(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
ctxLogger.Debug("Getting profile types", "function", logEntrypoint())
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.callProfileTypes")
defer span.End()
res, err := d.client.ProfileTypes(ctx, connect.NewRequest(&v1alpha1.ProfileTypesRequest{}))
if err != nil {
ctxLogger.Error("Failed to get profile types", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
@@ -56,24 +60,32 @@ func (d *ParcaDatasource) callProfileTypes(ctx context.Context, req *backend.Cal
data, err := json.Marshal(types)
if err != nil {
ctxLogger.Error("Failed to marshal profile types", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
ctxLogger.Debug("Successfully got profile types", "function", logEntrypoint())
return nil
}
func (d *ParcaDatasource) callLabelNames(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
ctxLogger.Debug("Getting label names", "function", logEntrypoint())
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.callLabelNames")
defer span.End()
res, err := d.client.Labels(ctx, connect.NewRequest(&v1alpha1.LabelsRequest{}))
if err != nil {
ctxLogger.Error("Failed to get label names", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
@@ -81,49 +93,63 @@ func (d *ParcaDatasource) callLabelNames(ctx context.Context, req *backend.CallR
data, err := json.Marshal(res.Msg.LabelNames)
if err != nil {
ctxLogger.Error("Failed to marshal label names", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
ctxLogger.Debug("Successfully got label names", "function", logEntrypoint())
return nil
}
func (d *ParcaDatasource) callLabelValues(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)
ctxLogger.Debug("Getting label values", "function", logEntrypoint())
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.callLabelValues")
defer span.End()
parsedUrl, err := url.Parse(req.URL)
if err != nil {
ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
label, ok := parsedUrl.Query()["label"]
if !ok {
ctxLogger.Error("Failed to get label from query", "error", err, "function", logEntrypoint())
label = []string{""}
}
res, err := d.client.Values(ctx, connect.NewRequest(&v1alpha1.ValuesRequest{LabelName: label[0]}))
if err != nil {
ctxLogger.Error("Failed to get values for given label", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
data, err := json.Marshal(res.Msg.LabelValues)
if err != nil {
ctxLogger.Error("Failed to marshal label values", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
ctxLogger.Debug("Successfully got label values", "function", logEntrypoint())
return nil
}

View File

@@ -2,6 +2,9 @@ package parca
import (
"context"
"fmt"
"runtime"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@@ -25,15 +28,35 @@ var (
_ backend.CheckHealthHandler = (*Service)(nil)
)
type Service struct {
im instancemgmt.InstanceManager
logger log.Logger
}
var logger = log.New("tsdb.parca")
type Service struct {
im instancemgmt.InstanceManager
// Return the file, line, and (full-path) function name of the caller
func getRunContext() (string, int, string) {
pc := make([]uintptr, 10)
runtime.Callers(2, pc)
f := runtime.FuncForPC(pc[0])
file, line := f.FileLine(pc[0])
return file, line, f.Name()
}
// Return a formatted string representing the execution context for the logger
func logEntrypoint() string {
file, line, pathToFunction := getRunContext()
parts := strings.Split(pathToFunction, "/")
functionName := parts[len(parts)-1]
return fmt.Sprintf("%s:%d[%s]", file, line, functionName)
}
func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginContext) (*ParcaDatasource, error) {
ctxLogger := s.logger.FromContext(ctx)
i, err := s.im.Get(ctx, pluginCtx)
if err != nil {
ctxLogger.Error("Failed to get instance", "error", err, "pluginID", pluginCtx.PluginID, "function", logEntrypoint())
return nil, err
}
in := i.(*ParcaDatasource)
@@ -42,7 +65,8 @@ func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginConte
func ProvideService(httpClientProvider httpclient.Provider) *Service {
return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
logger: logger,
}
}
@@ -53,25 +77,55 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
ctxLogger := s.logger.FromContext(ctx)
ctxLogger.Debug("Processing queries", "queryLength", len(req.Queries), "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
return i.QueryData(ctx, req)
data, err := i.QueryData(ctx, req)
if err != nil {
ctxLogger.Error("Received error from Parca", "error", err, "function", logEntrypoint())
} else {
ctxLogger.Debug("All queries processed", "function", logEntrypoint())
}
return data, err
}
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := s.logger.FromContext(ctx)
ctxLogger.Debug("Calling resource", "path", req.Path, "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return err
}
return i.CallResource(ctx, req, sender)
err = i.CallResource(ctx, req, sender)
if err != nil {
ctxLogger.Error("Failed to call resource", "error", err, "function", logEntrypoint())
} else {
ctxLogger.Debug("Resource called", "function", logEntrypoint())
}
return err
}
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
ctxLogger := s.logger.FromContext(ctx)
ctxLogger.Debug("Checking health", "function", logEntrypoint())
i, err := s.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
return i.CheckHealth(ctx, req)
check, err := i.CheckHealth(ctx, req)
if err != nil {
ctxLogger.Error("Health check failed", "error", err, "function", logEntrypoint())
} else {
ctxLogger.Debug("Health check succeeded", "function", logEntrypoint())
}
return check, err
}

View File

@@ -10,12 +10,15 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/tempo/pkg/tempopb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
var logger = log.New("tsdb.tempo")
// This function creates a new gRPC client to connect to a streaming query service.
// It starts by parsing the URL from the data source settings and extracting the host, since that's what the gRPC connection expects.
// If the URL does not contain a port number, it adds a default port based on the scheme (80 for HTTP and 443 for HTTPS).
@@ -24,6 +27,7 @@ import (
func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) {
parsedUrl, err := url.Parse(settings.URL)
if err != nil {
logger.Error("Error parsing URL for gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
return nil, err
}
@@ -48,8 +52,10 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.
clientConn, err := grpc.Dial(onlyHost, dialOps...)
if err != nil {
logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
return nil, err
}
return tempopb.NewStreamingQuerierClient(clientConn), nil
}

View File

@@ -18,7 +18,10 @@ import (
)
func TestProcessStream_ValidInput_ReturnsNoError(t *testing.T) {
service := &Service{}
logger := log.New("tsdb.tempo.test")
service := &Service{
logger: logger,
}
searchClient := &mockStreamer{}
streamSender := &mockSender{}
err := service.processStream(context.Background(), searchClient, streamSender)

View File

@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"net/http"
"runtime"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@@ -19,6 +21,23 @@ type Service struct {
logger log.Logger
}
// Return the file, line, and (full-path) function name of the caller
func getRunContext() (string, int, string) {
pc := make([]uintptr, 10)
runtime.Callers(2, pc)
f := runtime.FuncForPC(pc[0])
file, line := f.FileLine(pc[0])
return file, line, f.Name()
}
// Return a formatted string representing the execution context for the logger
func logEntrypoint() string {
file, line, pathToFunction := getRunContext()
parts := strings.Split(pathToFunction, "/")
functionName := parts[len(parts)-1]
return fmt.Sprintf("%s:%d[%s]", file, line, functionName)
}
func ProvideService(httpClientProvider httpclient.Provider) *Service {
return &Service{
logger: log.New("tsdb.tempo"),
@@ -34,18 +53,22 @@ type Datasource struct {
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
ctxLogger := log.New("tsdb.tempo").FromContext(ctx)
opts, err := settings.HTTPClientOptions(ctx)
if err != nil {
ctxLogger.Error("Failed to get HTTP client options", "error", err, "function", logEntrypoint())
return nil, err
}
client, err := httpClientProvider.New(opts)
if err != nil {
ctxLogger.Error("Failed to get HTTP client provider", "error", err, "function", logEntrypoint())
return nil, err
}
streamingClient, err := newGrpcClient(settings, opts)
if err != nil {
ctxLogger.Error("Failed to get gRPC client", "error", err, "function", logEntrypoint())
return nil, err
}
@@ -59,20 +82,29 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
ctxLogger := s.logger.FromContext(ctx)
ctxLogger.Debug("Processing queries", "queryLength", len(req.Queries), "function", logEntrypoint())
// create response struct
response := backend.NewQueryDataResponse()
// loop over queries and execute them individually.
for _, q := range req.Queries {
for i, q := range req.Queries {
ctxLogger.Debug("Processing query", "counter", i, "function", logEntrypoint())
if res, err := s.query(ctx, req.PluginContext, q); err != nil {
ctxLogger.Error("Error processing query", "error", err)
return response, err
} else {
if res != nil {
ctxLogger.Debug("Query processed", "counter", i, "function", logEntrypoint())
response.Responses[q.RefID] = *res
} else {
ctxLogger.Debug("Query resulted in empty response", "counter", i, "function", logEntrypoint())
}
}
}
ctxLogger.Debug("All queries processed", "function", logEntrypoint())
return response, nil
}
@@ -80,7 +112,6 @@ func (s *Service) query(ctx context.Context, pCtx backend.PluginContext, query b
if query.QueryType == string(dataquery.TempoQueryTypeTraceId) {
return s.getTrace(ctx, pCtx, query)
}
return nil, fmt.Errorf("unsupported query type: '%s' for query with refID '%s'", query.QueryType, query.RefID)
}

View File

@@ -18,6 +18,9 @@ import (
)
func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) (*backend.DataResponse, error) {
ctxLogger := s.logger.FromContext(ctx)
ctxLogger.Debug("Getting trace", "function", logEntrypoint())
result := &backend.DataResponse{}
refID := query.RefID
@@ -29,20 +32,25 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer
model := &dataquery.TempoQuery{}
err := json.Unmarshal(query.JSON, model)
if err != nil {
ctxLogger.Error("Failed to unmarshall Tempo query model", "error", err, "function", logEntrypoint())
return result, err
}
dsInfo, err := s.getDSInfo(ctx, pCtx)
if err != nil {
ctxLogger.Error("Failed to get datasource information", "error", err, "function", logEntrypoint())
return nil, err
}
if model.Query == nil || *model.Query == "" {
return result, fmt.Errorf("trace id is required")
err := fmt.Errorf("trace id is required")
ctxLogger.Error("Failed to validate model query", "error", err, "function", logEntrypoint())
return result, err
}
request, err := s.createRequest(ctx, dsInfo, *model.Query, query.TimeRange.From.Unix(), query.TimeRange.To.Unix())
if err != nil {
ctxLogger.Error("Failed to create request", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return result, err
@@ -50,6 +58,7 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer
resp, err := dsInfo.HTTPClient.Do(request)
if err != nil {
ctxLogger.Error("Failed to send request to Tempo", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return result, fmt.Errorf("failed get to tempo: %w", err)
@@ -57,16 +66,18 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer
defer func() {
if err := resp.Body.Close(); err != nil {
s.logger.FromContext(ctx).Warn("Failed to close response body", "err", err)
ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
}
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
ctxLogger.Error("Failed to read response body", "error", err, "function", logEntrypoint())
return &backend.DataResponse{}, err
}
if resp.StatusCode != http.StatusOK {
ctxLogger.Error("Failed to get trace", "error", err, "function", logEntrypoint())
result.Error = fmt.Errorf("failed to get trace with id: %v Status: %s Body: %s", model.Query, resp.Status, string(body))
span.RecordError(result.Error)
span.SetStatus(codes.Error, result.Error.Error())
@@ -77,6 +88,7 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer
otTrace, err := pbUnmarshaler.UnmarshalTraces(body)
if err != nil {
ctxLogger.Error("Failed to convert tempo response to Otlp", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return &backend.DataResponse{}, fmt.Errorf("failed to convert tempo response to Otlp: %w", err)
@@ -84,18 +96,23 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer
frame, err := TraceToFrame(otTrace)
if err != nil {
ctxLogger.Error("Failed to transform trace to data frame", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return &backend.DataResponse{}, fmt.Errorf("failed to transform trace %v to data frame: %w", model.Query, err)
}
frame.RefID = refID
frames := []*data.Frame{frame}
result.Frames = frames
ctxLogger.Debug("Successfully got trace", "function", logEntrypoint())
return result, nil
}
func (s *Service) createRequest(ctx context.Context, dsInfo *Datasource, traceID string, start int64, end int64) (*http.Request, error) {
ctxLogger := s.logger.FromContext(ctx)
var tempoQuery string
if start == 0 || end == 0 {
tempoQuery = fmt.Sprintf("%s/api/traces/%s", dsInfo.URL, traceID)
} else {
@@ -104,11 +121,10 @@ func (s *Service) createRequest(ctx context.Context, dsInfo *Datasource, traceID
req, err := http.NewRequestWithContext(ctx, "GET", tempoQuery, nil)
if err != nil {
ctxLogger.Error("Failed to create request", "error", err, "function", logEntrypoint())
return nil, err
}
req.Header.Set("Accept", "application/protobuf")
s.logger.FromContext(ctx).Debug("Tempo request", "url", req.URL.String(), "headers", req.Header)
return req, nil
}