diff --git a/pkg/tsdb/grafana-pyroscope-datasource/instance.go b/pkg/tsdb/grafana-pyroscope-datasource/instance.go index 58b97e19d08..bf1dd60558a 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/instance.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/instance.go @@ -43,12 +43,15 @@ type PyroscopeDatasource struct { // NewPyroscopeDatasource creates a new datasource instance. func NewPyroscopeDatasource(ctx context.Context, httpClientProvider httpclient.Provider, settings backend.DataSourceInstanceSettings, ac accesscontrol.AccessControl) (instancemgmt.Instance, error) { + ctxLogger := logger.FromContext(ctx) opt, err := settings.HTTPClientOptions(ctx) if err != nil { + ctxLogger.Error("Failed to get HTTP client options", "error", err, "function", logEntrypoint()) return nil, err } httpClient, err := httpClientProvider.New(opt) if err != nil { + ctxLogger.Error("Failed to create HTTP client", "error", err, "function", logEntrypoint()) return nil, err } @@ -61,9 +64,10 @@ func NewPyroscopeDatasource(ctx context.Context, httpClientProvider httpclient.P } func (d *PyroscopeDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.CallResource", trace.WithAttributes(attribute.String("path", req.Path), attribute.String("method", req.Method))) defer span.End() - logger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body) + ctxLogger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body, "function", logEntrypoint()) if req.Path == "profileTypes" { return d.profileTypes(ctx, req, sender) } @@ -79,32 +83,40 @@ func (d *PyroscopeDatasource) CallResource(ctx context.Context, req *backend.Cal } func (d *PyroscopeDatasource) profileTypes(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) types, err := d.client.ProfileTypes(ctx) if err != nil { + ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint()) return err } bodyData, err := json.Marshal(types) if err != nil { + ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint()) return err } err = sender.Send(&backend.CallResourceResponse{Body: bodyData, Headers: req.Headers, Status: 200}) if err != nil { + ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint()) return err } return nil } func (d *PyroscopeDatasource) labelNames(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) res, err := d.client.LabelNames(ctx) if err != nil { + ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint()) return fmt.Errorf("error calling LabelNames: %v", err) } data, err := json.Marshal(res) if err != nil { + ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint()) return err } err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200}) if err != nil { + ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint()) return err } return nil @@ -118,24 +130,32 @@ type LabelValuesPayload struct { } func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) u, err := url.Parse(req.URL) if err != nil { + ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint()) return err } query := u.Query() res, err := d.client.LabelValues(ctx, query["label"][0]) if err != nil { + ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint()) return fmt.Errorf("error calling LabelValues: %v", err) } + data, err := json.Marshal(res) if err != nil { + ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint()) return err } + err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200}) if err != nil { + ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint()) return err } + return nil } @@ -144,13 +164,15 @@ func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.Call // The QueryDataResponse contains a map of RefID to the response for each query, and each response // contains Frames ([]*Frame). func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - logger.Debug("QueryData called", "Queries", req.Queries) + ctxLogger := logger.FromContext(ctx) + ctxLogger.Debug("Processing queries", "queryLenght", len(req.Queries), "function", logEntrypoint()) // create response struct response := backend.NewQueryDataResponse() // loop over queries and execute them individually. - for _, q := range req.Queries { + for i, q := range req.Queries { + ctxLogger.Debug("Processing query", "counter", i, "function", logEntrypoint()) res := d.query(ctx, req.PluginContext, q) // save the response in a hashmap @@ -158,6 +180,7 @@ func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryD response.Responses[q.RefID] = res } + ctxLogger.Debug("All queries processed", "function", logEntrypoint()) return response, nil } @@ -166,7 +189,7 @@ func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryD // datasource configuration page which allows users to verify that // a datasource is working as expected. func (d *PyroscopeDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - logger.Debug("CheckHealth called") + logger.FromContext(ctx).Debug("CheckHealth called", "function", logEntrypoint()) status := backend.HealthStatusOk message := "Data source is working" @@ -185,7 +208,7 @@ func (d *PyroscopeDatasource) CheckHealth(ctx context.Context, _ *backend.CheckH // SubscribeStream is called when a client wants to connect to a stream. This callback // allows sending the first message. func (d *PyroscopeDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { - logger.Debug("SubscribeStream called") + logger.Debug("Subscribing stream called", "function", logEntrypoint()) status := backend.SubscribeStreamStatusPermissionDenied if req.Path == "stream" { @@ -200,7 +223,8 @@ func (d *PyroscopeDatasource) SubscribeStream(_ context.Context, req *backend.Su // RunStream is called once for any open channel. Results are shared with everyone // subscribed to the same channel. func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { - logger.Debug("RunStream called") + ctxLogger := logger.FromContext(ctx) + ctxLogger.Debug("Running stream", "path", req.Path, "function", logEntrypoint()) // Create the same data frame as for query data. frame := data.NewFrame("response") @@ -217,7 +241,7 @@ func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStr for { select { case <-ctx.Done(): - logger.Info("Context done, finish streaming", "path", req.Path) + ctxLogger.Info("Context done, finish streaming", "path", req.Path, "function", logEntrypoint()) return nil case <-time.After(time.Second): // Send new data periodically. @@ -228,7 +252,7 @@ func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStr err := sender.SendFrame(frame, data.IncludeAll) if err != nil { - logger.Error("Error sending frame", "error", err) + ctxLogger.Error("Error sending frame", "error", err, "function", logEntrypoint()) continue } } @@ -236,8 +260,8 @@ func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStr } // PublishStream is called when a client sends a message to the stream. -func (d *PyroscopeDatasource) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { - logger.Debug("PublishStream called") +func (d *PyroscopeDatasource) PublishStream(ctx context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + logger.FromContext(ctx).Debug("Publishing stream", "function", logEntrypoint()) // Do not allow publishing at all. return &backend.PublishStreamResponse{ diff --git a/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go b/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go index 007ee6d0605..32e26af37a6 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go @@ -75,6 +75,7 @@ func (c *PyroscopeClient) ProfileTypes(ctx context.Context) ([]*ProfileType, err defer span.End() res, err := c.connectClient.ProfileTypes(ctx, connect.NewRequest(&querierv1.ProfileTypesRequest{})) if err != nil { + logger.Error("Received error from client", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err @@ -108,6 +109,7 @@ func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, l resp, err := c.connectClient.SelectSeries(ctx, req) if err != nil { + logger.Error("Received error from client", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err @@ -162,6 +164,7 @@ func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSe resp, err := c.connectClient.SelectMergeStacktraces(ctx, req) if err != nil { + logger.Error("Received error from client", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err @@ -207,9 +210,10 @@ func (c *PyroscopeClient) LabelNames(ctx context.Context) ([]string, error) { defer span.End() resp, err := c.connectClient.LabelNames(ctx, connect.NewRequest(&typesv1.LabelNamesRequest{})) if err != nil { + logger.Error("Received error from client", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return nil, fmt.Errorf("error seding LabelNames request %v", err) + return nil, fmt.Errorf("error sending LabelNames request %v", err) } var filtered []string @@ -227,6 +231,7 @@ func (c *PyroscopeClient) LabelValues(ctx context.Context, label string) ([]stri defer span.End() resp, err := c.connectClient.LabelValues(ctx, connect.NewRequest(&typesv1.LabelValuesRequest{Name: label})) if err != nil { + logger.Error("Received error from client", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err diff --git a/pkg/tsdb/grafana-pyroscope-datasource/query.go b/pkg/tsdb/grafana-pyroscope-datasource/query.go index 3f21c0cf8c8..0501ef19985 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/query.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/query.go @@ -69,10 +69,10 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont parsedInterval, err = gtime.ParseDuration(dsJson.MinStep) if err != nil { parsedInterval = time.Second * 15 - logger.Debug("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep) + logger.Error("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep, "function", logEntrypoint()) } } - logger.Debug("Sending SelectSeriesRequest", "queryModel", qm) + logger.Debug("Sending SelectSeriesRequest", "queryModel", qm, "function", logEntrypoint()) seriesResp, err := d.client.GetSeries( gCtx, qm.ProfileTypeId, @@ -85,7 +85,7 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - logger.Error("Querying SelectSeries()", "err", err) + logger.Error("Querying SelectSeries()", "err", err, "function", logEntrypoint()) return err } // add the frames to the response. @@ -98,12 +98,12 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth { g.Go(func() error { - logger.Debug("Calling GetProfile", "queryModel", qm) + logger.Debug("Calling GetProfile", "queryModel", qm, "function", logEntrypoint()) prof, err := d.client.GetProfile(gCtx, qm.ProfileTypeId, qm.LabelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - logger.Error("Error GetProfile()", "err", err) + logger.Error("Error GetProfile()", "err", err, "function", logEntrypoint()) return err } @@ -201,7 +201,7 @@ func levelsToTree(levels []*Level, names []string) *ProfileTree { // If we still have levels to go, this should not happen. Something is probably wrong with the flamebearer data. if len(parentsStack) == 0 { - logger.Error("ParentsStack is empty but we are not at the the last level", "currentLevel", currentLevel) + logger.Error("ParentsStack is empty but we are not at the the last level", "currentLevel", currentLevel, "function", logEntrypoint()) break } @@ -245,7 +245,7 @@ func levelsToTree(levels []*Level, names []string) *ProfileTree { // We went out of parents bounds so lets move to next parent. We will evaluate the same item again, but // we will check if it is a child of the next parent item in line. if len(parentsStack) == 0 { - logger.Error("ParentsStack is empty but there are still items in current level", "currentLevel", currentLevel, "itemIndex", itemIndex) + logger.Error("ParentsStack is empty but there are still items in current level", "currentLevel", currentLevel, "itemIndex", itemIndex, "function", logEntrypoint()) break } currentParent = parentsStack[:1][0] diff --git a/pkg/tsdb/grafana-pyroscope-datasource/service.go b/pkg/tsdb/grafana-pyroscope-datasource/service.go index 23f3ace482a..f80f18c8053 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/service.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/service.go @@ -2,6 +2,9 @@ package pyroscope import ( "context" + "fmt" + "runtime" + "strings" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" @@ -27,15 +30,34 @@ var ( _ backend.StreamHandler = (*Service)(nil) ) +type Service struct { + im instancemgmt.InstanceManager + logger log.Logger +} + var logger = log.New("tsdb.pyroscope") -type Service struct { - im instancemgmt.InstanceManager +// Return the file, line, and (full-path) function name of the caller +func getRunContext() (string, int, string) { + pc := make([]uintptr, 10) + runtime.Callers(2, pc) + f := runtime.FuncForPC(pc[0]) + file, line := f.FileLine(pc[0]) + return file, line, f.Name() +} + +// Return a formatted string representing the execution context for the logger +func logEntrypoint() string { + file, line, pathToFunction := getRunContext() + parts := strings.Split(pathToFunction, "/") + functionName := parts[len(parts)-1] + return fmt.Sprintf("%s:%d[%s]", file, line, functionName) } func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginContext) (*PyroscopeDatasource, error) { i, err := s.im.Get(ctx, pluginCtx) if err != nil { + s.logger.FromContext(ctx).Error("Failed to get instance", "error", err, "pluginID", pluginCtx.PluginID, "function", logEntrypoint()) return nil, err } in := i.(*PyroscopeDatasource) @@ -44,7 +66,8 @@ func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginConte func ProvideService(httpClientProvider httpclient.Provider, ac accesscontrol.AccessControl) *Service { return &Service{ - im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, ac)), + im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, ac)), + logger: logger, } } @@ -55,50 +78,110 @@ func newInstanceSettings(httpClientProvider httpclient.Provider, ac accesscontro } func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + ctxLogger := s.logger.FromContext(ctx) + ctxLogger.Debug("Processing queries", "queriesLength", len(req.Queries), "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return nil, err } - return i.QueryData(ctx, req) + + response, err := i.QueryData(ctx, req) + if err != nil { + ctxLogger.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint()) + } else { + ctxLogger.Debug("All queries processed", "function", logEntrypoint()) + } + return response, err } func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + loggerWithContext := s.logger.FromContext(ctx) + loggerWithContext.Debug("Calling resource", "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return err } - return i.CallResource(ctx, req, sender) + + err = i.CallResource(ctx, req, sender) + if err != nil { + loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint()) + } else { + loggerWithContext.Debug("Health check succeeded", "function", logEntrypoint()) + } + return err } func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + loggerWithContext := s.logger.FromContext(ctx) + loggerWithContext.Debug("Checking health", "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return nil, err } - return i.CheckHealth(ctx, req) + + response, err := i.CheckHealth(ctx, req) + if err != nil { + loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint()) + } else { + loggerWithContext.Debug("Health check succeeded", "function", logEntrypoint()) + } + return response, err } func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + loggerWithContext := s.logger.FromContext(ctx) + loggerWithContext.Debug("Subscribing stream", "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return nil, err } - return i.SubscribeStream(ctx, req) + + response, err := i.SubscribeStream(ctx, req) + if err != nil { + loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint()) + } else { + loggerWithContext.Debug("Stream subscribed", "function", logEntrypoint()) + } + return response, err } func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + loggerWithContext := s.logger.FromContext(ctx) + loggerWithContext.Debug("Running stream", "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return err } - return i.RunStream(ctx, req, sender) + + err = i.RunStream(ctx, req, sender) + if err != nil { + loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint()) + } else { + loggerWithContext.Debug("Stream run", "function", logEntrypoint()) + } + return err } // PublishStream is called when a client sends a message to the stream. func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + loggerWithContext := s.logger.FromContext(ctx) + loggerWithContext.Debug("Publishing stream", "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return nil, err } - return i.PublishStream(ctx, req) + + response, err := i.PublishStream(ctx, req) + if err != nil { + loggerWithContext.Error("Received error from Pyroscope", "error", err, "function", logEntrypoint()) + } else { + loggerWithContext.Debug("Stream published", "function", logEntrypoint()) + } + return response, err } diff --git a/pkg/tsdb/parca/plugin.go b/pkg/tsdb/parca/plugin.go index 50bf3a233a9..0d841bf5fb8 100644 --- a/pkg/tsdb/parca/plugin.go +++ b/pkg/tsdb/parca/plugin.go @@ -36,12 +36,15 @@ type ParcaDatasource struct { // NewParcaDatasource creates a new datasource instance. func NewParcaDatasource(ctx context.Context, httpClientProvider httpclient.Provider, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + ctxLogger := logger.FromContext(ctx) opt, err := settings.HTTPClientOptions(ctx) if err != nil { + ctxLogger.Error("Failed to get HTTP options", "error", err, "function", logEntrypoint()) return nil, err } httpClient, err := httpClientProvider.New(opt) if err != nil { + ctxLogger.Error("Failed to create HTTP client", "error", err, "function", logEntrypoint()) return nil, err } @@ -58,7 +61,8 @@ func (d *ParcaDatasource) Dispose() { } func (d *ParcaDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - logger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body) + ctxLogger := logger.FromContext(ctx) + ctxLogger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body, "function", logEntrypoint()) ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.CallResource", trace.WithAttributes(attribute.String("path", req.Path), attribute.String("method", req.Method))) defer span.End() @@ -69,6 +73,7 @@ func (d *ParcaDatasource) CallResource(ctx context.Context, req *backend.CallRes return d.callLabelNames(ctx, req, sender) } if req.Path == "labelValues" { + ctxLogger.Debug("CallResource completed", "function", logEntrypoint()) return d.callLabelValues(ctx, req, sender) } return sender.Send(&backend.CallResourceResponse{ @@ -81,8 +86,6 @@ func (d *ParcaDatasource) CallResource(ctx context.Context, req *backend.CallRes // The QueryDataResponse contains a map of RefID to the response for each query, and each response // contains Frames ([]*Frame). func (d *ParcaDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - logger.Debug("QueryData called", "queries", req.Queries) - // create response struct response := backend.NewQueryDataResponse() @@ -103,8 +106,7 @@ func (d *ParcaDatasource) QueryData(ctx context.Context, req *backend.QueryDataR // datasource configuration page which allows users to verify that // a datasource is working as expected. func (d *ParcaDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - logger.Debug("CheckHealth called") - + ctxLogger := logger.FromContext(ctx) status := backend.HealthStatusOk message := "Data source is working" @@ -113,6 +115,7 @@ func (d *ParcaDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealt message = err.Error() } + ctxLogger.Debug("CheckHealth completed", "function", logEntrypoint()) return &backend.CheckHealthResult{ Status: status, Message: message, diff --git a/pkg/tsdb/parca/query.go b/pkg/tsdb/parca/query.go index 83ee24b6769..a01346bcde8 100644 --- a/pkg/tsdb/parca/query.go +++ b/pkg/tsdb/parca/query.go @@ -31,6 +31,7 @@ const ( // query processes single Parca query transforming the response to data.Frame packaged in DataResponse func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse { + ctxLogger := logger.FromContext(ctx) ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.query", trace.WithAttributes(attribute.String("query_type", query.QueryType))) defer span.End() @@ -40,6 +41,7 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext, err := json.Unmarshal(query.JSON, &qm) if err != nil { response.Error = err + ctxLogger.Error("Failed to unmarshall query", "error", err, "function", logEntrypoint()) span.RecordError(response.Error) span.SetStatus(codes.Error, response.Error.Error()) return response @@ -49,6 +51,7 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext, seriesResp, err := d.client.QueryRange(ctx, makeMetricRequest(qm, query)) if err != nil { response.Error = err + ctxLogger.Error("Failed to process query", "error", err, "queryType", query.QueryType, "function", logEntrypoint()) span.RecordError(response.Error) span.SetStatus(codes.Error, response.Error.Error()) return response @@ -57,10 +60,11 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext, } if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth { - logger.Debug("Querying SelectMergeStacktraces()", "queryModel", qm) + ctxLogger.Debug("Querying SelectMergeStacktraces()", "queryModel", qm, "function", logEntrypoint()) resp, err := d.client.Query(ctx, makeProfileRequest(qm, query)) if err != nil { response.Error = err + ctxLogger.Error("Failed to process query", "error", err, "queryType", query.QueryType, "function", logEntrypoint()) span.RecordError(response.Error) span.SetStatus(codes.Error, response.Error.Error()) return response diff --git a/pkg/tsdb/parca/resources.go b/pkg/tsdb/parca/resources.go index db27a9bfad4..f625137d505 100644 --- a/pkg/tsdb/parca/resources.go +++ b/pkg/tsdb/parca/resources.go @@ -25,10 +25,14 @@ type ProfileType struct { } func (d *ParcaDatasource) callProfileTypes(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) + ctxLogger.Debug("Getting profile types", "function", logEntrypoint()) + ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.callProfileTypes") defer span.End() res, err := d.client.ProfileTypes(ctx, connect.NewRequest(&v1alpha1.ProfileTypesRequest{})) if err != nil { + ctxLogger.Error("Failed to get profile types", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err @@ -56,24 +60,32 @@ func (d *ParcaDatasource) callProfileTypes(ctx context.Context, req *backend.Cal data, err := json.Marshal(types) if err != nil { + ctxLogger.Error("Failed to marshal profile types", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200}) if err != nil { + ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } + + ctxLogger.Debug("Successfully got profile types", "function", logEntrypoint()) return nil } func (d *ParcaDatasource) callLabelNames(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) + ctxLogger.Debug("Getting label names", "function", logEntrypoint()) + ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.callLabelNames") defer span.End() res, err := d.client.Labels(ctx, connect.NewRequest(&v1alpha1.LabelsRequest{})) if err != nil { + ctxLogger.Error("Failed to get label names", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err @@ -81,49 +93,63 @@ func (d *ParcaDatasource) callLabelNames(ctx context.Context, req *backend.CallR data, err := json.Marshal(res.Msg.LabelNames) if err != nil { + ctxLogger.Error("Failed to marshal label names", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200}) if err != nil { + ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } + + ctxLogger.Debug("Successfully got label names", "function", logEntrypoint()) return nil } func (d *ParcaDatasource) callLabelValues(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := logger.FromContext(ctx) + ctxLogger.Debug("Getting label values", "function", logEntrypoint()) + ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.parca.callLabelValues") defer span.End() parsedUrl, err := url.Parse(req.URL) if err != nil { + ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } label, ok := parsedUrl.Query()["label"] if !ok { + ctxLogger.Error("Failed to get label from query", "error", err, "function", logEntrypoint()) label = []string{""} } res, err := d.client.Values(ctx, connect.NewRequest(&v1alpha1.ValuesRequest{LabelName: label[0]})) if err != nil { + ctxLogger.Error("Failed to get values for given label", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } data, err := json.Marshal(res.Msg.LabelValues) if err != nil { + ctxLogger.Error("Failed to marshal label values", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200}) if err != nil { + ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } + + ctxLogger.Debug("Successfully got label values", "function", logEntrypoint()) return nil } diff --git a/pkg/tsdb/parca/service.go b/pkg/tsdb/parca/service.go index b47b2ad7abf..6ecca95cd70 100644 --- a/pkg/tsdb/parca/service.go +++ b/pkg/tsdb/parca/service.go @@ -2,6 +2,9 @@ package parca import ( "context" + "fmt" + "runtime" + "strings" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" @@ -25,15 +28,35 @@ var ( _ backend.CheckHealthHandler = (*Service)(nil) ) +type Service struct { + im instancemgmt.InstanceManager + logger log.Logger +} + var logger = log.New("tsdb.parca") -type Service struct { - im instancemgmt.InstanceManager +// Return the file, line, and (full-path) function name of the caller +func getRunContext() (string, int, string) { + pc := make([]uintptr, 10) + runtime.Callers(2, pc) + f := runtime.FuncForPC(pc[0]) + file, line := f.FileLine(pc[0]) + return file, line, f.Name() +} + +// Return a formatted string representing the execution context for the logger +func logEntrypoint() string { + file, line, pathToFunction := getRunContext() + parts := strings.Split(pathToFunction, "/") + functionName := parts[len(parts)-1] + return fmt.Sprintf("%s:%d[%s]", file, line, functionName) } func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginContext) (*ParcaDatasource, error) { + ctxLogger := s.logger.FromContext(ctx) i, err := s.im.Get(ctx, pluginCtx) if err != nil { + ctxLogger.Error("Failed to get instance", "error", err, "pluginID", pluginCtx.PluginID, "function", logEntrypoint()) return nil, err } in := i.(*ParcaDatasource) @@ -42,7 +65,8 @@ func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginConte func ProvideService(httpClientProvider httpclient.Provider) *Service { return &Service{ - im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), + im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), + logger: logger, } } @@ -53,25 +77,55 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst } func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + ctxLogger := s.logger.FromContext(ctx) + ctxLogger.Debug("Processing queries", "queryLength", len(req.Queries), "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return nil, err } - return i.QueryData(ctx, req) + + data, err := i.QueryData(ctx, req) + if err != nil { + ctxLogger.Error("Received error from Parca", "error", err, "function", logEntrypoint()) + } else { + ctxLogger.Debug("All queries processed", "function", logEntrypoint()) + } + return data, err } func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + ctxLogger := s.logger.FromContext(ctx) + ctxLogger.Debug("Calling resource", "path", req.Path, "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return err } - return i.CallResource(ctx, req, sender) + + err = i.CallResource(ctx, req, sender) + if err != nil { + ctxLogger.Error("Failed to call resource", "error", err, "function", logEntrypoint()) + } else { + ctxLogger.Debug("Resource called", "function", logEntrypoint()) + } + return err } func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + ctxLogger := s.logger.FromContext(ctx) + ctxLogger.Debug("Checking health", "function", logEntrypoint()) + i, err := s.getInstance(ctx, req.PluginContext) if err != nil { return nil, err } - return i.CheckHealth(ctx, req) + + check, err := i.CheckHealth(ctx, req) + if err != nil { + ctxLogger.Error("Health check failed", "error", err, "function", logEntrypoint()) + } else { + ctxLogger.Debug("Health check succeeded", "function", logEntrypoint()) + } + return check, err } diff --git a/pkg/tsdb/tempo/grpc.go b/pkg/tsdb/tempo/grpc.go index 39a90631f62..95f3fdefab2 100644 --- a/pkg/tsdb/tempo/grpc.go +++ b/pkg/tsdb/tempo/grpc.go @@ -10,12 +10,15 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/tempo/pkg/tempopb" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) +var logger = log.New("tsdb.tempo") + // This function creates a new gRPC client to connect to a streaming query service. // It starts by parsing the URL from the data source settings and extracting the host, since that's what the gRPC connection expects. // If the URL does not contain a port number, it adds a default port based on the scheme (80 for HTTP and 443 for HTTPS). @@ -24,6 +27,7 @@ import ( func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) { parsedUrl, err := url.Parse(settings.URL) if err != nil { + logger.Error("Error parsing URL for gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint()) return nil, err } @@ -48,8 +52,10 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient. clientConn, err := grpc.Dial(onlyHost, dialOps...) if err != nil { + logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint()) return nil, err } + return tempopb.NewStreamingQuerierClient(clientConn), nil } diff --git a/pkg/tsdb/tempo/search_stream_test.go b/pkg/tsdb/tempo/search_stream_test.go index dddda21d201..5cf6c09b62e 100644 --- a/pkg/tsdb/tempo/search_stream_test.go +++ b/pkg/tsdb/tempo/search_stream_test.go @@ -18,7 +18,10 @@ import ( ) func TestProcessStream_ValidInput_ReturnsNoError(t *testing.T) { - service := &Service{} + logger := log.New("tsdb.tempo.test") + service := &Service{ + logger: logger, + } searchClient := &mockStreamer{} streamSender := &mockSender{} err := service.processStream(context.Background(), searchClient, streamSender) diff --git a/pkg/tsdb/tempo/tempo.go b/pkg/tsdb/tempo/tempo.go index b4c924de285..df697ebd6c8 100644 --- a/pkg/tsdb/tempo/tempo.go +++ b/pkg/tsdb/tempo/tempo.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "net/http" + "runtime" + "strings" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" @@ -19,6 +21,23 @@ type Service struct { logger log.Logger } +// Return the file, line, and (full-path) function name of the caller +func getRunContext() (string, int, string) { + pc := make([]uintptr, 10) + runtime.Callers(2, pc) + f := runtime.FuncForPC(pc[0]) + file, line := f.FileLine(pc[0]) + return file, line, f.Name() +} + +// Return a formatted string representing the execution context for the logger +func logEntrypoint() string { + file, line, pathToFunction := getRunContext() + parts := strings.Split(pathToFunction, "/") + functionName := parts[len(parts)-1] + return fmt.Sprintf("%s:%d[%s]", file, line, functionName) +} + func ProvideService(httpClientProvider httpclient.Provider) *Service { return &Service{ logger: log.New("tsdb.tempo"), @@ -34,18 +53,22 @@ type Datasource struct { func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + ctxLogger := log.New("tsdb.tempo").FromContext(ctx) opts, err := settings.HTTPClientOptions(ctx) if err != nil { + ctxLogger.Error("Failed to get HTTP client options", "error", err, "function", logEntrypoint()) return nil, err } client, err := httpClientProvider.New(opts) if err != nil { + ctxLogger.Error("Failed to get HTTP client provider", "error", err, "function", logEntrypoint()) return nil, err } streamingClient, err := newGrpcClient(settings, opts) if err != nil { + ctxLogger.Error("Failed to get gRPC client", "error", err, "function", logEntrypoint()) return nil, err } @@ -59,20 +82,29 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst } func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + ctxLogger := s.logger.FromContext(ctx) + ctxLogger.Debug("Processing queries", "queryLength", len(req.Queries), "function", logEntrypoint()) + // create response struct response := backend.NewQueryDataResponse() // loop over queries and execute them individually. - for _, q := range req.Queries { + for i, q := range req.Queries { + ctxLogger.Debug("Processing query", "counter", i, "function", logEntrypoint()) if res, err := s.query(ctx, req.PluginContext, q); err != nil { + ctxLogger.Error("Error processing query", "error", err) return response, err } else { if res != nil { + ctxLogger.Debug("Query processed", "counter", i, "function", logEntrypoint()) response.Responses[q.RefID] = *res + } else { + ctxLogger.Debug("Query resulted in empty response", "counter", i, "function", logEntrypoint()) } } } + ctxLogger.Debug("All queries processed", "function", logEntrypoint()) return response, nil } @@ -80,7 +112,6 @@ func (s *Service) query(ctx context.Context, pCtx backend.PluginContext, query b if query.QueryType == string(dataquery.TempoQueryTypeTraceId) { return s.getTrace(ctx, pCtx, query) } - return nil, fmt.Errorf("unsupported query type: '%s' for query with refID '%s'", query.QueryType, query.RefID) } diff --git a/pkg/tsdb/tempo/trace.go b/pkg/tsdb/tempo/trace.go index 8b3ac71aef7..02ed21b4245 100644 --- a/pkg/tsdb/tempo/trace.go +++ b/pkg/tsdb/tempo/trace.go @@ -18,6 +18,9 @@ import ( ) func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) (*backend.DataResponse, error) { + ctxLogger := s.logger.FromContext(ctx) + ctxLogger.Debug("Getting trace", "function", logEntrypoint()) + result := &backend.DataResponse{} refID := query.RefID @@ -29,20 +32,25 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer model := &dataquery.TempoQuery{} err := json.Unmarshal(query.JSON, model) if err != nil { + ctxLogger.Error("Failed to unmarshall Tempo query model", "error", err, "function", logEntrypoint()) return result, err } dsInfo, err := s.getDSInfo(ctx, pCtx) if err != nil { + ctxLogger.Error("Failed to get datasource information", "error", err, "function", logEntrypoint()) return nil, err } if model.Query == nil || *model.Query == "" { - return result, fmt.Errorf("trace id is required") + err := fmt.Errorf("trace id is required") + ctxLogger.Error("Failed to validate model query", "error", err, "function", logEntrypoint()) + return result, err } request, err := s.createRequest(ctx, dsInfo, *model.Query, query.TimeRange.From.Unix(), query.TimeRange.To.Unix()) if err != nil { + ctxLogger.Error("Failed to create request", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return result, err @@ -50,6 +58,7 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer resp, err := dsInfo.HTTPClient.Do(request) if err != nil { + ctxLogger.Error("Failed to send request to Tempo", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return result, fmt.Errorf("failed get to tempo: %w", err) @@ -57,16 +66,18 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer defer func() { if err := resp.Body.Close(); err != nil { - s.logger.FromContext(ctx).Warn("Failed to close response body", "err", err) + ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint()) } }() body, err := io.ReadAll(resp.Body) if err != nil { + ctxLogger.Error("Failed to read response body", "error", err, "function", logEntrypoint()) return &backend.DataResponse{}, err } if resp.StatusCode != http.StatusOK { + ctxLogger.Error("Failed to get trace", "error", err, "function", logEntrypoint()) result.Error = fmt.Errorf("failed to get trace with id: %v Status: %s Body: %s", model.Query, resp.Status, string(body)) span.RecordError(result.Error) span.SetStatus(codes.Error, result.Error.Error()) @@ -77,6 +88,7 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer otTrace, err := pbUnmarshaler.UnmarshalTraces(body) if err != nil { + ctxLogger.Error("Failed to convert tempo response to Otlp", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return &backend.DataResponse{}, fmt.Errorf("failed to convert tempo response to Otlp: %w", err) @@ -84,18 +96,23 @@ func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, quer frame, err := TraceToFrame(otTrace) if err != nil { + ctxLogger.Error("Failed to transform trace to data frame", "error", err, "function", logEntrypoint()) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return &backend.DataResponse{}, fmt.Errorf("failed to transform trace %v to data frame: %w", model.Query, err) } + frame.RefID = refID frames := []*data.Frame{frame} result.Frames = frames + ctxLogger.Debug("Successfully got trace", "function", logEntrypoint()) return result, nil } func (s *Service) createRequest(ctx context.Context, dsInfo *Datasource, traceID string, start int64, end int64) (*http.Request, error) { + ctxLogger := s.logger.FromContext(ctx) var tempoQuery string + if start == 0 || end == 0 { tempoQuery = fmt.Sprintf("%s/api/traces/%s", dsInfo.URL, traceID) } else { @@ -104,11 +121,10 @@ func (s *Service) createRequest(ctx context.Context, dsInfo *Datasource, traceID req, err := http.NewRequestWithContext(ctx, "GET", tempoQuery, nil) if err != nil { + ctxLogger.Error("Failed to create request", "error", err, "function", logEntrypoint()) return nil, err } req.Header.Set("Accept", "application/protobuf") - - s.logger.FromContext(ctx).Debug("Tempo request", "url", req.URL.String(), "headers", req.Header) return req, nil }