mirror of
https://github.com/grafana/grafana.git
synced 2024-12-01 21:19:28 -06:00
81f8c022a0
Decouple backend from infra imports
332 lines
12 KiB
Go
332 lines
12 KiB
Go
package pyroscope
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"slices"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/promql/parser"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
var (
|
|
_ backend.QueryDataHandler = (*PyroscopeDatasource)(nil)
|
|
_ backend.CallResourceHandler = (*PyroscopeDatasource)(nil)
|
|
_ backend.CheckHealthHandler = (*PyroscopeDatasource)(nil)
|
|
_ backend.StreamHandler = (*PyroscopeDatasource)(nil)
|
|
)
|
|
|
|
type ProfilingClient interface {
|
|
ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, error)
|
|
LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, error)
|
|
LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, error)
|
|
GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, step float64) (*SeriesResponse, error)
|
|
GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, maxNodes *int64) (*ProfileResponse, error)
|
|
GetSpanProfile(ctx context.Context, profileTypeID string, labelSelector string, spanSelector []string, start int64, end int64, maxNodes *int64) (*ProfileResponse, error)
|
|
}
|
|
|
|
// PyroscopeDatasource is a datasource for querying application performance profiles.
|
|
type PyroscopeDatasource struct {
|
|
httpClient *http.Client
|
|
client ProfilingClient
|
|
settings backend.DataSourceInstanceSettings
|
|
}
|
|
|
|
// NewPyroscopeDatasource creates a new datasource instance.
|
|
func NewPyroscopeDatasource(ctx context.Context, httpClientProvider httpclient.Provider, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
opt, err := settings.HTTPClientOptions(ctx)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to get HTTP client options", "error", err, "function", logEntrypoint())
|
|
return nil, err
|
|
}
|
|
httpClient, err := httpClientProvider.New(opt)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to create HTTP client", "error", err, "function", logEntrypoint())
|
|
return nil, err
|
|
}
|
|
|
|
return &PyroscopeDatasource{
|
|
httpClient: httpClient,
|
|
client: NewPyroscopeClient(httpClient, settings.URL),
|
|
settings: settings,
|
|
}, nil
|
|
}
|
|
|
|
func (d *PyroscopeDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.CallResource", trace.WithAttributes(attribute.String("path", req.Path), attribute.String("method", req.Method)))
|
|
defer span.End()
|
|
ctxLogger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body, "function", logEntrypoint())
|
|
if req.Path == "profileTypes" {
|
|
return d.profileTypes(ctx, req, sender)
|
|
}
|
|
if req.Path == "labelNames" {
|
|
return d.labelNames(ctx, req, sender)
|
|
}
|
|
if req.Path == "labelValues" {
|
|
return d.labelValues(ctx, req, sender)
|
|
}
|
|
return sender.Send(&backend.CallResourceResponse{
|
|
Status: 404,
|
|
})
|
|
}
|
|
|
|
func (d *PyroscopeDatasource) profileTypes(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
|
|
u, err := url.Parse(req.URL)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
query := u.Query()
|
|
|
|
var start, end int64
|
|
if query.Has("start") && query.Has("end") {
|
|
start, err = strconv.ParseInt(query.Get("start"), 10, 64)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to parse start as int", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
|
|
end, err = strconv.ParseInt(query.Get("end"), 10, 64)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to parse end as int", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
}
|
|
|
|
types, err := d.client.ProfileTypes(ctx, start, end)
|
|
if err != nil {
|
|
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
bodyData, err := json.Marshal(types)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
err = sender.Send(&backend.CallResourceResponse{Body: bodyData, Headers: req.Headers, Status: 200})
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *PyroscopeDatasource) labelNames(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
|
|
u, err := url.Parse(req.URL)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
query := u.Query()
|
|
|
|
start, _ := strconv.ParseInt(query.Get("start"), 10, 64)
|
|
end, _ := strconv.ParseInt(query.Get("end"), 10, 64)
|
|
labelSelector := query.Get("query")
|
|
matchers, err := parser.ParseMetricSelector(labelSelector)
|
|
if err != nil {
|
|
ctxLogger.Error("Could not parse label selector", "error", err, "function", logEntrypoint())
|
|
return fmt.Errorf("failed parsing label selector: %v", err)
|
|
}
|
|
|
|
labelNames, err := d.client.LabelNames(ctx, labelSelector, start, end)
|
|
if err != nil {
|
|
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
|
|
return fmt.Errorf("error calling LabelNames: %v", err)
|
|
}
|
|
|
|
finalLabels := make([]string, 0)
|
|
for _, label := range labelNames {
|
|
if slices.ContainsFunc(matchers, func(m *labels.Matcher) bool {
|
|
return m.Name == label
|
|
}) {
|
|
continue
|
|
}
|
|
finalLabels = append(finalLabels, label)
|
|
}
|
|
|
|
jsonResponse, err := json.Marshal(finalLabels)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
err = sender.Send(&backend.CallResourceResponse{Body: jsonResponse, Headers: req.Headers, Status: 200})
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type LabelValuesPayload struct {
|
|
Query string
|
|
Label string
|
|
Start int64
|
|
End int64
|
|
}
|
|
|
|
func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
u, err := url.Parse(req.URL)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to parse URL", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
query := u.Query()
|
|
|
|
start, _ := strconv.ParseInt(query.Get("start"), 10, 64)
|
|
end, _ := strconv.ParseInt(query.Get("end"), 10, 64)
|
|
label := query.Get("label")
|
|
|
|
res, err := d.client.LabelValues(ctx, label, query.Get("query"), start, end)
|
|
if err != nil {
|
|
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
|
|
return fmt.Errorf("error calling LabelValues: %v", err)
|
|
}
|
|
|
|
data, err := json.Marshal(res)
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
|
|
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
|
|
if err != nil {
|
|
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// QueryData handles multiple queries and returns multiple responses.
|
|
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier).
|
|
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
|
|
// contains Frames ([]*Frame).
|
|
func (d *PyroscopeDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
ctxLogger.Debug("Processing queries", "queryLength", len(req.Queries), "function", logEntrypoint())
|
|
|
|
// create response struct
|
|
response := backend.NewQueryDataResponse()
|
|
|
|
// loop over queries and execute them individually.
|
|
for i, q := range req.Queries {
|
|
ctxLogger.Debug("Processing query", "counter", i, "function", logEntrypoint())
|
|
res := d.query(ctx, req.PluginContext, q)
|
|
|
|
// save the response in a hashmap
|
|
// based on with RefID as identifier
|
|
response.Responses[q.RefID] = res
|
|
}
|
|
|
|
ctxLogger.Debug("All queries processed", "function", logEntrypoint())
|
|
return response, nil
|
|
}
|
|
|
|
// CheckHealth handles health checks sent from Grafana to the plugin.
|
|
// The main use case for these health checks is the test button on the
|
|
// datasource configuration page which allows users to verify that
|
|
// a datasource is working as expected.
|
|
func (d *PyroscopeDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
|
logger.FromContext(ctx).Debug("CheckHealth called", "function", logEntrypoint())
|
|
|
|
status := backend.HealthStatusOk
|
|
message := "Data source is working"
|
|
|
|
// Since this is a health check mechanism and we only care about whether the
|
|
// request succeeded or failed, we set the window to be small.
|
|
start := time.Now().Add(-5 * time.Minute).UnixMilli()
|
|
end := time.Now().UnixMilli()
|
|
if _, err := d.client.ProfileTypes(ctx, start, end); err != nil {
|
|
status = backend.HealthStatusError
|
|
message = err.Error()
|
|
}
|
|
|
|
return &backend.CheckHealthResult{
|
|
Status: status,
|
|
Message: message,
|
|
}, nil
|
|
}
|
|
|
|
// SubscribeStream is called when a client wants to connect to a stream. This callback
|
|
// allows sending the first message.
|
|
func (d *PyroscopeDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
|
logger.Debug("Subscribing stream called", "function", logEntrypoint())
|
|
|
|
status := backend.SubscribeStreamStatusPermissionDenied
|
|
if req.Path == "stream" {
|
|
// Allow subscribing only on expected path.
|
|
status = backend.SubscribeStreamStatusOK
|
|
}
|
|
return &backend.SubscribeStreamResponse{
|
|
Status: status,
|
|
}, nil
|
|
}
|
|
|
|
// RunStream is called once for any open channel. Results are shared with everyone
|
|
// subscribed to the same channel.
|
|
func (d *PyroscopeDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
|
ctxLogger := logger.FromContext(ctx)
|
|
ctxLogger.Debug("Running stream", "path", req.Path, "function", logEntrypoint())
|
|
|
|
// Create the same data frame as for query data.
|
|
frame := data.NewFrame("response")
|
|
|
|
// Add fields (matching the same schema used in QueryData).
|
|
frame.Fields = append(frame.Fields,
|
|
data.NewField("time", nil, make([]time.Time, 1)),
|
|
data.NewField("values", nil, make([]int64, 1)),
|
|
)
|
|
|
|
counter := 0
|
|
|
|
// Stream data frames periodically till stream closed by Grafana.
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
ctxLogger.Info("Context done, finish streaming", "path", req.Path, "function", logEntrypoint())
|
|
return nil
|
|
case <-time.After(time.Second):
|
|
// Send new data periodically.
|
|
frame.Fields[0].Set(0, time.Now())
|
|
frame.Fields[1].Set(0, int64(10*(counter%2+1)))
|
|
|
|
counter++
|
|
|
|
err := sender.SendFrame(frame, data.IncludeAll)
|
|
if err != nil {
|
|
ctxLogger.Error("Error sending frame", "error", err, "function", logEntrypoint())
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// PublishStream is called when a client sends a message to the stream.
|
|
func (d *PyroscopeDatasource) PublishStream(ctx context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
|
logger.FromContext(ctx).Debug("Publishing stream", "function", logEntrypoint())
|
|
|
|
// Do not allow publishing at all.
|
|
return &backend.PublishStreamResponse{
|
|
Status: backend.PublishStreamStatusPermissionDenied,
|
|
}, nil
|
|
}
|