Pyroscope: Rename phlare to grafana-pyroscope-datasource (#68859)

This commit is contained in:
Ryan McKinley
2023-06-06 20:09:29 -07:00
committed by GitHub
parent cefcbfa5ed
commit e17ef5e504
42 changed files with 48 additions and 22 deletions

View File

@@ -0,0 +1,66 @@
package phlare
import (
"context"
"net/http"
)
// ProfilingClient abstracts the profiling backend (Phlare or Pyroscope) so the
// datasource can talk to either through one interface.
type ProfilingClient interface {
	// ProfileTypes returns the list of profile types the backend can serve.
	ProfileTypes(context.Context) ([]*ProfileType, error)
	// LabelNames returns label names for series matching query within [start, end].
	LabelNames(ctx context.Context, query string, start int64, end int64) ([]string, error)
	// LabelValues returns the values of label for series matching query within [start, end].
	LabelValues(ctx context.Context, query string, label string, start int64, end int64) ([]string, error)
	// GetSeries returns timeline (metric) data for the given profile type and selector.
	GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, step float64) (*SeriesResponse, error)
	// GetProfile returns flamegraph data for the given profile type and selector.
	GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, maxNodes *int64) (*ProfileResponse, error)
}
// ProfileType identifies one kind of profile (e.g. CPU, memory) offered by the backend.
type ProfileType struct {
	ID    string `json:"id"`
	Label string `json:"label"`
}
// getClient picks the client implementation matching the configured backend
// type. Anything other than "pyroscope" — including an unset value — is
// treated as Phlare.
func getClient(backendType string, httpClient *http.Client, url string) ProfilingClient {
	switch backendType {
	case "pyroscope":
		return NewPyroscopeClient(httpClient, url)
	default:
		return NewPhlareClient(httpClient, url)
	}
}
// Flamebearer is the flamegraph payload returned to the frontend.
type Flamebearer struct {
	Names   []string
	Levels  []*Level
	Total   int64
	MaxSelf int64
}
// Level holds the packed values for one depth level of the flamegraph.
type Level struct {
	Values []int64
}
// Series is a single labeled time series of profile metric values.
type Series struct {
	Labels []*LabelPair
	Points []*Point
}
// LabelPair is one label name/value attached to a series.
type LabelPair struct {
	Name  string
	Value string
}
// Point is a single sample in a series.
type Point struct {
	Value float64
	// Milliseconds unix timestamp
	Timestamp int64
}
// ProfileResponse wraps flamegraph data together with its value units.
type ProfileResponse struct {
	Flamebearer *Flamebearer
	Units       string
}
// SeriesResponse wraps timeline data together with its units and display label.
type SeriesResponse struct {
	Series []*Series
	Units  string
	Label  string
}

View File

@@ -0,0 +1,339 @@
package phlare
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/datasources"
)
// Compile-time assertions that PhlareDatasource implements the plugin SDK
// handler interfaces it is registered for.
var (
	_ backend.QueryDataHandler    = (*PhlareDatasource)(nil)
	_ backend.CallResourceHandler = (*PhlareDatasource)(nil)
	_ backend.CheckHealthHandler  = (*PhlareDatasource)(nil)
	_ backend.StreamHandler       = (*PhlareDatasource)(nil)
)
// PhlareDatasource is a datasource for querying application performance profiles.
type PhlareDatasource struct {
	httpClient *http.Client
	client     ProfilingClient
	settings   backend.DataSourceInstanceSettings
	ac         accesscontrol.AccessControl
}
// JsonData models the datasource's persisted JSON settings.
type JsonData struct {
	// BackendType selects the backend flavor; "pyroscope" or phlare (the default).
	BackendType string `json:"backendType"`
}
// NewPhlareDatasource creates a new datasource instance.
// NewPhlareDatasource creates a new datasource instance.
func NewPhlareDatasource(httpClientProvider httpclient.Provider, settings backend.DataSourceInstanceSettings, ac accesscontrol.AccessControl) (instancemgmt.Instance, error) {
	opt, err := settings.HTTPClientOptions()
	if err != nil {
		return nil, err
	}
	httpClient, err := httpClientProvider.New(opt)
	if err != nil {
		return nil, err
	}
	// Unmarshal into a value, not a pointer: JSONData of "null" would leave a
	// *JsonData nil and panic on the BackendType access below.
	var jsonData JsonData
	if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil {
		return nil, err
	}
	return &PhlareDatasource{
		httpClient: httpClient,
		client:     getClient(jsonData.BackendType, httpClient, settings.URL),
		settings:   settings,
		ac:         ac,
	}, nil
}
// CallResource routes ad-hoc resource calls from the frontend to the matching
// handler; unknown paths get a 404.
func (d *PhlareDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	logger.Debug("CallResource", "Path", req.Path, "Method", req.Method, "Body", req.Body)
	switch req.Path {
	case "profileTypes":
		return d.profileTypes(ctx, req, sender)
	case "labelNames":
		return d.labelNames(ctx, req, sender)
	case "labelValues":
		return d.labelValues(ctx, req, sender)
	case "backendType":
		return d.backendType(ctx, req, sender)
	default:
		return sender.Send(&backend.CallResourceResponse{
			Status: 404,
		})
	}
}
// profileTypes responds with the backend's profile types encoded as JSON.
func (d *PhlareDatasource) profileTypes(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	types, err := d.client.ProfileTypes(ctx)
	if err != nil {
		return err
	}
	body, err := json.Marshal(types)
	if err != nil {
		return err
	}
	return sender.Send(&backend.CallResourceResponse{Body: body, Headers: req.Headers, Status: 200})
}
// labelNames responds with label names for the query and time range given in
// the request's URL query parameters (query, start, end).
func (d *PhlareDatasource) labelNames(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	u, err := url.Parse(req.URL)
	if err != nil {
		return err
	}
	query := u.Query()
	// Use Get instead of indexing so a missing parameter produces a parse
	// error rather than an index-out-of-range panic.
	start, err := strconv.ParseInt(query.Get("start"), 10, 64)
	if err != nil {
		return fmt.Errorf("invalid start parameter: %w", err)
	}
	end, err := strconv.ParseInt(query.Get("end"), 10, 64)
	if err != nil {
		return fmt.Errorf("invalid end parameter: %w", err)
	}
	res, err := d.client.LabelNames(ctx, query.Get("query"), start, end)
	if err != nil {
		return fmt.Errorf("error calling LabelNames: %w", err)
	}
	data, err := json.Marshal(res)
	if err != nil {
		return err
	}
	return sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
}
// LabelValuesPayload describes the parameters of a labelValues resource call.
// NOTE(review): not referenced in the visible code — presumably part of the
// frontend contract; confirm before removing.
type LabelValuesPayload struct {
	Query string
	Label string
	Start int64
	End   int64
}
// labelValues responds with the values of one label for the query and time
// range given in the request's URL query parameters (query, label, start, end).
func (d *PhlareDatasource) labelValues(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	u, err := url.Parse(req.URL)
	if err != nil {
		return err
	}
	query := u.Query()
	// Use Get instead of indexing so a missing parameter produces a parse
	// error rather than an index-out-of-range panic.
	start, err := strconv.ParseInt(query.Get("start"), 10, 64)
	if err != nil {
		return fmt.Errorf("invalid start parameter: %w", err)
	}
	end, err := strconv.ParseInt(query.Get("end"), 10, 64)
	if err != nil {
		return fmt.Errorf("invalid end parameter: %w", err)
	}
	res, err := d.client.LabelValues(ctx, query.Get("query"), query.Get("label"), start, end)
	if err != nil {
		return fmt.Errorf("error calling LabelValues: %w", err)
	}
	data, err := json.Marshal(res)
	if err != nil {
		return err
	}
	return sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
}
// BackendTypeRespBody is the response body of the backendType resource call.
type BackendTypeRespBody struct {
	BackendType string `json:"backendType"` // "phlare" or "pyroscope"
}
// backendType is a simplistic test to figure out if we are speaking to phlare or pyroscope backend
func (d *PhlareDatasource) backendType(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	// To prevent any user sending arbitrary URL for us to test with we allow this only for users who can edit the datasource
	// as config page is where this is meant to be used.
	ok, err := d.isUserAllowedToEditDatasource(ctx)
	if err != nil {
		return err
	}
	if !ok {
		return sender.Send(&backend.CallResourceResponse{Headers: req.Headers, Status: 401})
	}
	u, err := url.Parse(req.URL)
	if err != nil {
		return err
	}
	query := u.Query()
	body := &BackendTypeRespBody{BackendType: "unknown"}
	// We take the url from the request query because the data source may not yet be saved in DB with the URL we want
	// to test with (like when filling in the config page for the first time).
	// Named testURL so it does not shadow the imported net/url package, and read
	// via Get so a missing parameter cannot panic.
	testURL := query.Get("url")
	// Probe as pyroscope first; on failure fall back to probing as phlare.
	pyroClient := getClient("pyroscope", d.httpClient, testURL)
	if _, err := pyroClient.ProfileTypes(ctx); err == nil {
		body.BackendType = "pyroscope"
	} else {
		phlareClient := getClient("phlare", d.httpClient, testURL)
		if _, err := phlareClient.ProfileTypes(ctx); err == nil {
			body.BackendType = "phlare"
		}
	}
	data, err := json.Marshal(body)
	if err != nil {
		return err
	}
	return sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
}
// isUserAllowedToEditDatasource reports whether the signed-in user has write
// permission on this datasource (required for the backendType probe).
func (d *PhlareDatasource) isUserAllowedToEditDatasource(ctx context.Context) (bool, error) {
	reqCtx := contexthandler.FromContext(ctx)
	if reqCtx == nil || reqCtx.SignedInUser == nil {
		return false, nil
	}
	uidScope := datasources.ScopeProvider.GetResourceScopeUID(accesscontrol.Parameter(":uid"))
	allowed, err := d.ac.Evaluate(ctx, reqCtx.SignedInUser, accesscontrol.EvalPermission(datasources.ActionWrite, uidScope))
	if err != nil {
		return false, err
	}
	return allowed, nil
}
// QueryData handles multiple queries and returns multiple responses.
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier).
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
func (d *PhlareDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	logger.Debug("QueryData called", "Queries", req.Queries)
	resp := backend.NewQueryDataResponse()
	// Execute each query individually and key its result by RefID.
	for _, query := range req.Queries {
		resp.Responses[query.RefID] = d.query(ctx, req.PluginContext, query)
	}
	return resp, nil
}
// CheckHealth handles health checks sent from Grafana to the plugin.
// The main use case for these health checks is the test button on the
// datasource configuration page which allows users to verify that
// a datasource is working as expected.
func (d *PhlareDatasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	logger.Debug("CheckHealth called")
	// Listing profile types doubles as a cheap connectivity probe.
	if _, err := d.client.ProfileTypes(ctx); err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: err.Error(),
		}, nil
	}
	return &backend.CheckHealthResult{
		Status:  backend.HealthStatusOk,
		Message: "Data source is working",
	}, nil
}
// SubscribeStream is called when a client wants to connect to a stream. This callback
// allows sending the first message.
func (d *PhlareDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
	logger.Debug("SubscribeStream called")
	// Only the expected path may be subscribed to; everything else is denied.
	if req.Path != "stream" {
		return &backend.SubscribeStreamResponse{Status: backend.SubscribeStreamStatusPermissionDenied}, nil
	}
	return &backend.SubscribeStreamResponse{Status: backend.SubscribeStreamStatusOK}, nil
}
// RunStream is called once for any open channel. Results are shared with everyone
// subscribed to the same channel.
func (d *PhlareDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
	logger.Debug("RunStream called")
	// Create the same data frame as for query data.
	frame := data.NewFrame("response")
	// Add fields (matching the same schema used in QueryData).
	frame.Fields = append(frame.Fields,
		data.NewField("time", nil, make([]time.Time, 1)),
		data.NewField("values", nil, make([]int64, 1)),
	)
	counter := 0
	// Stream data frames periodically till stream closed by Grafana.
	for {
		select {
		case <-ctx.Done():
			logger.Info("Context done, finish streaming", "path", req.Path)
			return nil
		case <-time.After(time.Second):
			// Send new data periodically.
			// The single row alternates between 10 and 20 on each tick.
			frame.Fields[0].Set(0, time.Now())
			frame.Fields[1].Set(0, int64(10*(counter%2+1)))
			counter++
			// Send errors are logged and the loop keeps going; only context
			// cancellation terminates the stream.
			err := sender.SendFrame(frame, data.IncludeAll)
			if err != nil {
				logger.Error("Error sending frame", "error", err)
				continue
			}
		}
	}
}
// PublishStream is called when a client sends a message to the stream.
// Publishing is never permitted for this datasource.
func (d *PhlareDatasource) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
	logger.Debug("PublishStream called")
	resp := &backend.PublishStreamResponse{Status: backend.PublishStreamStatusPermissionDenied}
	return resp, nil
}

View File

@@ -0,0 +1,64 @@
package phlare
import (
"context"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
)
// This is where the tests for the datasource backend live.
// Test_QueryData checks that QueryData returns one response per query.
func Test_QueryData(t *testing.T) {
	ds := PhlareDatasource{}
	resp, err := ds.QueryData(
		context.Background(),
		&backend.QueryDataRequest{
			Queries: []backend.DataQuery{
				{RefID: "A"},
			},
		},
	)
	// Fatal (not Error) so we do not dereference a possibly-nil resp below.
	if err != nil {
		t.Fatal(err)
	}
	if len(resp.Responses) != 1 {
		t.Fatal("QueryData must return a response")
	}
}
// Test_CallResource exercises the profileTypes resource against a fake client.
func Test_CallResource(t *testing.T) {
	ds := &PhlareDatasource{client: &FakeClient{}}
	t.Run("series resource", func(t *testing.T) {
		sender := &FakeSender{}
		req := &backend.CallResourceRequest{
			PluginContext: backend.PluginContext{},
			Path:          "profileTypes",
			Method:        "GET",
			URL:           "profileTypes",
		}
		err := ds.CallResource(context.Background(), req, sender)
		require.NoError(t, err)
		require.Equal(t, 200, sender.Resp.Status)
		require.Equal(t, `[{"id":"type:1","label":"cpu"},{"id":"type:2","label":"memory"}]`, string(sender.Resp.Body))
	})
}
// FakeSender records the last response passed to Send for test assertions.
type FakeSender struct {
	Resp *backend.CallResourceResponse
}
// Send stores resp so the test can inspect its status and body.
func (fs *FakeSender) Send(resp *backend.CallResourceResponse) error {
	fs.Resp = resp
	return nil
}

View File

@@ -0,0 +1,85 @@
// Code generated - EDITING IS FUTILE. DO NOT EDIT.
//
// Generated by:
// public/app/plugins/gen.go
// Using jennies:
// PluginGoTypesJenny
//
// Run 'make gen-cue' from repository root to regenerate.
package dataquery
// Defines values for PhlareQueryType.
// NOTE: this file is generated (see the header above); run 'make gen-cue' to
// regenerate rather than editing by hand.
const (
	PhlareQueryTypeBoth    PhlareQueryType = "both"
	PhlareQueryTypeMetrics PhlareQueryType = "metrics"
	PhlareQueryTypeProfile PhlareQueryType = "profile"
)
// These are the common properties available to all queries in all datasources.
// Specific implementations will *extend* this interface, adding the required
// properties for the given context.
type DataQuery struct {
	// For mixed data sources the selected datasource is on the query level.
	// For non mixed scenarios this is undefined.
	// TODO find a better way to do this ^ that's friendly to schema
	// TODO this shouldn't be unknown but DataSourceRef | null
	Datasource *interface{} `json:"datasource,omitempty"`
	// Hide true if query is disabled (ie should not be returned to the dashboard)
	// Note this does not always imply that the query should not be executed since
	// the results from a hidden query may be used as the input to other queries (SSE etc)
	Hide *bool `json:"hide,omitempty"`
	// Specify the query flavor
	// TODO make this required and give it a default
	QueryType *string `json:"queryType,omitempty"`
	// A unique identifier for the query within the list of targets.
	// In server side expressions, the refId is used as a variable name to identify results.
	// By default, the UI will assign A->Z; however setting meaningful names may be useful.
	RefId string `json:"refId"`
}
// GrafanaPyroscopeDataQuery defines model for GrafanaPyroscopeDataQuery.
type GrafanaPyroscopeDataQuery struct {
	// DataQuery These are the common properties available to all queries in all datasources.
	// Specific implementations will *extend* this interface, adding the required
	// properties for the given context.
	DataQuery
	// For mixed data sources the selected datasource is on the query level.
	// For non mixed scenarios this is undefined.
	// TODO find a better way to do this ^ that's friendly to schema
	// TODO this shouldn't be unknown but DataSourceRef | null
	Datasource *interface{} `json:"datasource,omitempty"`
	// Allows to group the results.
	GroupBy []string `json:"groupBy"`
	// Hide true if query is disabled (ie should not be returned to the dashboard)
	// Note this does not always imply that the query should not be executed since
	// the results from a hidden query may be used as the input to other queries (SSE etc)
	Hide *bool `json:"hide,omitempty"`
	// Specifies the query label selectors.
	LabelSelector string `json:"labelSelector"`
	// Sets the maximum number of nodes in the flamegraph.
	MaxNodes *int64 `json:"maxNodes,omitempty"`
	// Specifies the type of profile to query.
	ProfileTypeId string `json:"profileTypeId"`
	// Specify the query flavor
	// TODO make this required and give it a default
	QueryType *string `json:"queryType,omitempty"`
	// A unique identifier for the query within the list of targets.
	// In server side expressions, the refId is used as a variable name to identify results.
	// By default, the UI will assign A->Z; however setting meaningful names may be useful.
	RefId string `json:"refId"`
}
// PhlareQueryType defines model for PhlareQueryType.
type PhlareQueryType string

View File

@@ -0,0 +1,165 @@
package phlare
import (
"context"
"fmt"
"net/http"
"strings"
"github.com/bufbuild/connect-go"
querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/api/gen/proto/go/querier/v1/querierv1connect"
)
// PhlareClient talks to a Phlare backend over its connect (gRPC-over-HTTP) API.
type PhlareClient struct {
	connectClient querierv1connect.QuerierServiceClient
}
// NewPhlareClient wires a PhlareClient to the given HTTP client and base URL.
func NewPhlareClient(httpClient *http.Client, url string) *PhlareClient {
	return &PhlareClient{
		connectClient: querierv1connect.NewQuerierServiceClient(httpClient, url),
	}
}
// ProfileTypes returns the profile types the backend can serve, labeled as
// "<name> - <sample type>".
func (c *PhlareClient) ProfileTypes(ctx context.Context) ([]*ProfileType, error) {
	res, err := c.connectClient.ProfileTypes(ctx, connect.NewRequest(&querierv1.ProfileTypesRequest{}))
	if err != nil {
		return nil, err
	}
	// Let's make sure we send at least empty array if we don't have any types.
	if res.Msg.ProfileTypes == nil {
		return []*ProfileType{}, nil
	}
	pTypes := make([]*ProfileType, len(res.Msg.ProfileTypes))
	for i, pType := range res.Msg.ProfileTypes {
		pTypes[i] = &ProfileType{
			ID:    pType.ID,
			Label: pType.Name + " - " + pType.SampleType,
		}
	}
	return pTypes, nil
}
// GetSeries queries timeline (metric) data for the given profile type and
// label selector within [start, end] milliseconds, aggregated to step seconds.
func (c *PhlareClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, step float64) (*SeriesResponse, error) {
	req := connect.NewRequest(&querierv1.SelectSeriesRequest{
		ProfileTypeID: profileTypeID,
		LabelSelector: labelSelector,
		Start:         start,
		End:           end,
		Step:          step,
		GroupBy:       groupBy,
	})
	resp, err := c.connectClient.SelectSeries(ctx, req)
	if err != nil {
		return nil, err
	}
	series := make([]*Series, len(resp.Msg.Series))
	for i, s := range resp.Msg.Series {
		// Inner loops use j so they do not shadow the outer index.
		labels := make([]*LabelPair, len(s.Labels))
		for j, l := range s.Labels {
			labels[j] = &LabelPair{
				Name:  l.Name,
				Value: l.Value,
			}
		}
		points := make([]*Point, len(s.Points))
		for j, p := range s.Points {
			points[j] = &Point{
				Value:     p.Value,
				Timestamp: p.Timestamp,
			}
		}
		series[i] = &Series{
			Labels: labels,
			Points: points,
		}
	}
	// Profile type IDs look like "<name>:<sample type>:<unit>:...". Guard the
	// split so a malformed ID cannot panic with index out of range.
	label := ""
	if parts := strings.Split(profileTypeID, ":"); len(parts) > 1 {
		label = parts[1]
	}
	return &SeriesResponse{
		Series: series,
		Units:  getUnits(profileTypeID),
		Label:  label,
	}, nil
}
// GetProfile queries flamegraph data for the given profile type and label
// selector within [start, end] milliseconds. maxNodes, when non-nil, caps the
// number of nodes the backend returns.
func (c *PhlareClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
	req := &connect.Request[querierv1.SelectMergeStacktracesRequest]{
		Msg: &querierv1.SelectMergeStacktracesRequest{
			ProfileTypeID: profileTypeID,
			LabelSelector: labelSelector,
			Start:         start,
			End:           end,
			MaxNodes:      maxNodes,
		},
	}
	resp, err := c.connectClient.SelectMergeStacktraces(ctx, req)
	if err != nil {
		return nil, err
	}
	// NOTE(review): assumes resp.Msg.Flamegraph is non-nil on success — a nil
	// Flamegraph would panic here; confirm against the querier API contract.
	levels := make([]*Level, len(resp.Msg.Flamegraph.Levels))
	for i, level := range resp.Msg.Flamegraph.Levels {
		levels[i] = &Level{
			Values: level.Values,
		}
	}
	return &ProfileResponse{
		Flamebearer: &Flamebearer{
			Names:   resp.Msg.Flamegraph.Names,
			Levels:  levels,
			Total:   resp.Msg.Flamegraph.Total,
			MaxSelf: resp.Msg.Flamegraph.MaxSelf,
		},
		Units: getUnits(profileTypeID),
	}, nil
}
// getUnits maps the unit segment of a profile type ID
// ("<name>:<sample type>:<unit>:...") to a Grafana unit string. Unknown units
// pass through unchanged; a malformed ID with fewer than three segments yields
// "short" instead of panicking.
func getUnits(profileTypeID string) string {
	parts := strings.Split(profileTypeID, ":")
	if len(parts) < 3 {
		return "short"
	}
	switch unit := parts[2]; unit {
	case "nanoseconds":
		return "ns"
	case "count":
		return "short"
	default:
		return unit
	}
}
// LabelNames returns all label names known to the backend, with private
// ("__"-prefixed) labels filtered out.
// NOTE(review): the query/start/end parameters are currently ignored — the
// request goes out unfiltered; confirm whether the querier API supports them.
func (c *PhlareClient) LabelNames(ctx context.Context, query string, start int64, end int64) ([]string, error) {
	resp, err := c.connectClient.LabelNames(ctx, connect.NewRequest(&querierv1.LabelNamesRequest{}))
	if err != nil {
		return nil, fmt.Errorf("error sending LabelNames request: %w", err)
	}
	var filtered []string
	for _, label := range resp.Msg.Names {
		if !isPrivateLabel(label) {
			filtered = append(filtered, label)
		}
	}
	return filtered, nil
}
// LabelValues returns the values for one label name. The query/start/end
// arguments exist to satisfy the ProfilingClient interface.
func (c *PhlareClient) LabelValues(ctx context.Context, query string, label string, start int64, end int64) ([]string, error) {
	req := connect.NewRequest(&querierv1.LabelValuesRequest{Name: label})
	resp, err := c.connectClient.LabelValues(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.Msg.Names, nil
}
// isPrivateLabel reports whether label is internal ("__"-prefixed, like
// __name__) and should be hidden from users.
func isPrivateLabel(label string) bool {
	const privatePrefix = "__"
	return len(label) >= len(privatePrefix) && label[:len(privatePrefix)] == privatePrefix
}

View File

@@ -0,0 +1,110 @@
package phlare
import (
"context"
"testing"
"github.com/bufbuild/connect-go"
googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
"github.com/stretchr/testify/require"
)
// Test_PhlareClient checks the GetSeries and GetProfile response mapping
// against a fake connect client that returns fixed payloads.
func Test_PhlareClient(t *testing.T) {
	connectClient := &FakePhlareConnectClient{}
	client := &PhlareClient{
		connectClient: connectClient,
	}
	t.Run("GetSeries", func(t *testing.T) {
		resp, err := client.GetSeries(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, []string{}, 15)
		require.Nil(t, err)
		// Labels/points are copied through; units derive from the "count"
		// segment, the label from the second ID segment.
		series := &SeriesResponse{
			Series: []*Series{
				{Labels: []*LabelPair{{Name: "foo", Value: "bar"}}, Points: []*Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}}},
			},
			Units: "short",
			Label: "alloc_objects",
		}
		require.Equal(t, series, resp)
	})
	t.Run("GetProfile", func(t *testing.T) {
		maxNodes := int64(-1)
		resp, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, &maxNodes)
		require.Nil(t, err)
		// Flamegraph fields map 1:1 from the fake's canned response.
		series := &ProfileResponse{
			Flamebearer: &Flamebearer{
				Names: []string{"foo", "bar", "baz"},
				Levels: []*Level{
					{Values: []int64{0, 10, 0, 0}},
					{Values: []int64{0, 9, 0, 1}},
					{Values: []int64{0, 8, 8, 2}},
				},
				Total:   100,
				MaxSelf: 56,
			},
			Units: "short",
		}
		require.Equal(t, series, resp)
	})
}
// FakePhlareConnectClient implements the querier connect client for tests. It
// records the last request and returns canned responses for the two methods
// the tests exercise; everything else panics.
type FakePhlareConnectClient struct {
	// Req holds the most recent request passed to a stubbed method.
	Req interface{}
}
func (f *FakePhlareConnectClient) ProfileTypes(ctx context.Context, c *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) {
	panic("implement me")
}
func (f *FakePhlareConnectClient) LabelValues(ctx context.Context, c *connect.Request[querierv1.LabelValuesRequest]) (*connect.Response[querierv1.LabelValuesResponse], error) {
	panic("implement me")
}
func (f *FakePhlareConnectClient) LabelNames(context.Context, *connect.Request[querierv1.LabelNamesRequest]) (*connect.Response[querierv1.LabelNamesResponse], error) {
	panic("implement me")
}
func (f *FakePhlareConnectClient) Series(ctx context.Context, c *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
	panic("implement me")
}
// SelectMergeStacktraces returns a fixed three-level flamegraph.
func (f *FakePhlareConnectClient) SelectMergeStacktraces(ctx context.Context, c *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
	f.Req = c
	return &connect.Response[querierv1.SelectMergeStacktracesResponse]{
		Msg: &querierv1.SelectMergeStacktracesResponse{
			Flamegraph: &querierv1.FlameGraph{
				Names: []string{"foo", "bar", "baz"},
				Levels: []*querierv1.Level{
					{Values: []int64{0, 10, 0, 0}},
					{Values: []int64{0, 9, 0, 1}},
					{Values: []int64{0, 8, 8, 2}},
				},
				Total:   100,
				MaxSelf: 56,
			},
		},
	}, nil
}
// SelectSeries returns a single fixed series with two points.
func (f *FakePhlareConnectClient) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
	f.Req = req
	return &connect.Response[querierv1.SelectSeriesResponse]{
		Msg: &querierv1.SelectSeriesResponse{
			Series: []*typesv1.Series{
				{
					Labels: []*typesv1.LabelPair{{Name: "foo", Value: "bar"}},
					Points: []*typesv1.Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}},
				},
			},
		},
	}, nil
}
func (f *FakePhlareConnectClient) SelectMergeProfile(ctx context.Context, c *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[googlev1.Profile], error) {
	panic("implement me")
}

View File

@@ -0,0 +1,282 @@
package phlare
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
)
// PyroscopeClient talks to an original Pyroscope server over its HTTP API.
type PyroscopeClient struct {
	httpClient *http.Client
	URL        string
}
// App is one entry of the /api/apps response; apps double as profile types.
type App struct {
	Name string `json:"name"`
}
// NewPyroscopeClient wires a PyroscopeClient to the given HTTP client and base URL.
func NewPyroscopeClient(httpClient *http.Client, url string) *PyroscopeClient {
	return &PyroscopeClient{
		httpClient: httpClient,
		URL:        url,
	}
}
// ProfileTypes lists the apps known to the Pyroscope server; each app is
// exposed as one profile type with the app name as both ID and label.
func (c *PyroscopeClient) ProfileTypes(ctx context.Context) ([]*ProfileType, error) {
	// Build the request with the caller's context so cancellation and timeouts
	// propagate (plain httpClient.Get ignored ctx).
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.URL+"/api/apps", nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			logger.Error("failed to close response body", "err", err)
		}
	}()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	var apps []App
	if err := json.Unmarshal(body, &apps); err != nil {
		return nil, err
	}
	var profileTypes []*ProfileType
	for _, app := range apps {
		profileTypes = append(profileTypes, &ProfileType{
			ID:    app.Name,
			Label: app.Name,
		})
	}
	return profileTypes, nil
}
// PyroscopeProfileResponse is the combined flamegraph + timeline payload the
// /render endpoint returns.
type PyroscopeProfileResponse struct {
	Flamebearer *PyroFlamebearer  `json:"flamebearer"`
	Metadata    *Metadata         `json:"metadata"`
	Groups      map[string]*Group `json:"groups"`
}
// Metadata carries the units of the flamegraph values.
type Metadata struct {
	Units string `json:"units"`
}
// Group is the timeline data for one groupBy bucket ("*" is the catch-all group).
type Group struct {
	StartTime     int64   `json:"startTime"`
	Samples       []int64 `json:"samples"`
	DurationDelta int64   `json:"durationDelta"`
}
// PyroFlamebearer is Pyroscope's native flamegraph encoding.
type PyroFlamebearer struct {
	Levels   [][]int64 `json:"levels"`
	MaxSelf  int64     `json:"maxSelf"`
	NumTicks int64     `json:"numTicks"`
	Names    []string  `json:"names"`
}
// getProfileData calls Pyroscope's /render endpoint and decodes the combined
// flamegraph + timeline payload. start/end are in milliseconds; maxNodes and
// groupBy are optional (only the first groupBy entry is sent).
func (c *PyroscopeClient) getProfileData(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64, groupBy []string) (*PyroscopeProfileResponse, error) {
	params := url.Values{}
	params.Add("from", strconv.FormatInt(start, 10))
	params.Add("until", strconv.FormatInt(end, 10))
	params.Add("query", profileTypeID+labelSelector)
	if maxNodes != nil {
		params.Add("maxNodes", strconv.FormatInt(*maxNodes, 10))
	}
	params.Add("format", "json")
	if len(groupBy) > 0 {
		params.Add("groupBy", groupBy[0])
	}
	// Named renderURL so the local does not shadow the imported net/url package.
	renderURL := c.URL + "/render?" + params.Encode()
	logger.Debug("calling /render", "url", renderURL)
	// Build the request with the caller's context so cancellation and timeouts
	// propagate (plain httpClient.Get ignored ctx).
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, renderURL, nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error calling /render api: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			logger.Error("failed to close response body", "err", err)
		}
	}()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response body: %w", err)
	}
	var respData *PyroscopeProfileResponse
	if err := json.Unmarshal(body, &respData); err != nil {
		logger.Debug("flamegraph data", "body", string(body))
		return nil, fmt.Errorf("error decoding flamegraph data: %w", err)
	}
	return respData, nil
}
// GetProfile returns flamegraph data for one profile type and label selector.
// start/end are in milliseconds.
func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
	respData, err := c.getProfileData(ctx, profileTypeID, labelSelector, start, end, maxNodes, nil)
	if err != nil {
		return nil, err
	}
	// A "null" or partial JSON payload would otherwise panic below.
	if respData == nil || respData.Flamebearer == nil || respData.Metadata == nil {
		return nil, fmt.Errorf("unexpected empty response from /render")
	}
	mappedLevels := make([]*Level, len(respData.Flamebearer.Levels))
	for i, level := range respData.Flamebearer.Levels {
		mappedLevels[i] = &Level{
			Values: level,
		}
	}
	// Map Pyroscope units onto Grafana unit strings: bytes stay bytes, samples
	// are presented as ms, everything else defaults to "short".
	units := "short"
	if respData.Metadata.Units == "bytes" {
		units = "bytes"
	}
	if respData.Metadata.Units == "samples" {
		units = "ms"
	}
	return &ProfileResponse{
		Flamebearer: &Flamebearer{
			Names:   respData.Flamebearer.Names,
			Levels:  mappedLevels,
			Total:   respData.Flamebearer.NumTicks,
			MaxSelf: respData.Flamebearer.MaxSelf,
		},
		Units: units,
	}, nil
}
// GetSeries returns timeline data for one profile type and label selector.
// This is super ineffective at the moment. We need 2 different APIs one for profile one for series (timeline) data
// but Pyro returns all in single response. This currently does the simplest thing and calls the same API 2 times
// and gets the part of the response it needs.
func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start, end int64, groupBy []string, step float64) (*SeriesResponse, error) {
	respData, err := c.getProfileData(ctx, profileTypeID, labelSelector, start, end, nil, groupBy)
	if err != nil {
		return nil, err
	}
	// A "null" JSON payload would otherwise panic on the Groups access below.
	if respData == nil {
		return nil, fmt.Errorf("unexpected empty response from /render")
	}
	stepMillis := int64(step * 1000)
	var series []*Series
	if len(respData.Groups) == 1 {
		series = []*Series{processGroup(respData.Groups["*"], stepMillis, nil)}
	} else {
		for key, val := range respData.Groups {
			// If we have a group by, we don't want the * group.
			if key == "*" {
				continue
			}
			// Guard the groupBy access: multiple groups without a groupBy
			// argument would otherwise panic with index out of range.
			var label *LabelPair
			if len(groupBy) > 0 {
				label = &LabelPair{
					Name:  groupBy[0],
					Value: key,
				}
			}
			series = append(series, processGroup(val, stepMillis, label))
		}
	}
	return &SeriesResponse{Series: series}, nil
}
// processGroup turns group timeline data into the Series format. Pyro does not seem to have a way to define step, so we
// always get the data in specific step, and we have to aggregate a bit into s step size we need.
func processGroup(group *Group, step int64, label *LabelPair) *Series {
	series := &Series{}
	if label != nil {
		series.Labels = []*LabelPair{label}
	}
	// Convert the group's native resolution and start time to milliseconds.
	durationDeltaMillis := group.DurationDelta * 1000
	timestamp := group.StartTime * 1000
	// Running sum of samples for the bucket currently being filled.
	value := int64(0)
	for i, sample := range group.Samples {
		pointsLen := int64(len(series.Points))
		// Check if the timestamp of the sample is more than next timestamp in the series. If so we create a new point
		// with the value we have so far.
		// NOTE(review): the "+1" in this comparison looks like an off-by-one
		// guard — confirm the intended bucket boundary.
		if int64(i)*durationDeltaMillis > step*pointsLen+1 {
			series.Points = append(series.Points, &Point{
				Value:     float64(value),
				Timestamp: timestamp + step*pointsLen,
			})
			value = 0
		}
		value += sample
	}
	// NOTE(review): samples accumulated after the last emitted point are
	// dropped — the final partial bucket is never flushed; verify this is
	// intentional.
	return series
}
// LabelNames returns label names for series matching query in [start, end]
// milliseconds, with private ("__"-prefixed) labels filtered out.
func (c *PyroscopeClient) LabelNames(ctx context.Context, query string, start int64, end int64) ([]string, error) {
	params := url.Values{}
	// Seems like this should be seconds instead of millis for other endpoints
	params.Add("from", strconv.FormatInt(start/1000, 10))
	params.Add("until", strconv.FormatInt(end/1000, 10))
	params.Add("query", query)
	// Build the request with the caller's context so cancellation and timeouts
	// propagate (plain httpClient.Get ignored ctx).
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.URL+"/labels?"+params.Encode(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			logger.Error("failed to close response body", "err", err)
		}
	}()
	var names []string
	if err := json.NewDecoder(resp.Body).Decode(&names); err != nil {
		return nil, err
	}
	var filtered []string
	for _, label := range names {
		// Using the same func from Phlare client, works but should do separate one probably
		if !isPrivateLabel(label) {
			filtered = append(filtered, label)
		}
	}
	return filtered, nil
}
// LabelValues returns the values of one label for series matching query in
// [start, end] milliseconds.
func (c *PyroscopeClient) LabelValues(ctx context.Context, query string, label string, start int64, end int64) ([]string, error) {
	params := url.Values{}
	// Seems like this should be seconds instead of millis for other endpoints
	params.Add("from", strconv.FormatInt(start/1000, 10))
	params.Add("until", strconv.FormatInt(end/1000, 10))
	params.Add("label", label)
	params.Add("query", query)
	// Build the request with the caller's context so cancellation and timeouts
	// propagate (plain httpClient.Get ignored ctx).
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.URL+"/labels?"+params.Encode(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			logger.Error("failed to close response body", "err", err)
		}
	}()
	var values []string
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(body, &values); err != nil {
		logger.Debug("response", "body", string(body))
		return nil, fmt.Errorf("error unmarshaling response %w", err)
	}
	return values, nil
}

View File

@@ -0,0 +1,401 @@
package phlare
import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery"
"github.com/xlab/treeprint"
"golang.org/x/sync/errgroup"
)
// queryModel is the per-query JSON model sent by the frontend; it embeds the
// generated GrafanaPyroscopeDataQuery schema.
type queryModel struct {
	WithStreaming bool
	dataquery.GrafanaPyroscopeDataQuery
}
// dsJsonModel carries the datasource-level settings this file reads.
type dsJsonModel struct {
	// MinStep is a duration string (e.g. "15s") used as the lower bound for the series step.
	MinStep string `json:"minStep"`
}
// Query type constants mirrored from the generated schema.
const (
	queryTypeProfile = string(dataquery.PhlareQueryTypeProfile)
	queryTypeMetrics = string(dataquery.PhlareQueryTypeMetrics)
	queryTypeBoth    = string(dataquery.PhlareQueryTypeBoth)
)
// query processes single Phlare query transforming the response to data.Frame packaged in DataResponse
func (d *PhlareDatasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
	var qm queryModel
	response := backend.DataResponse{}
	err := json.Unmarshal(query.JSON, &qm)
	if err != nil {
		response.Error = fmt.Errorf("error unmarshaling query model: %v", err)
		return response
	}
	// Profile and metrics parts run concurrently; the mutex guards the shared
	// response.Frames slice.
	responseMutex := sync.Mutex{}
	g, gCtx := errgroup.WithContext(ctx)
	if query.QueryType == queryTypeMetrics || query.QueryType == queryTypeBoth {
		g.Go(func() error {
			var dsJson dsJsonModel
			// Use a goroutine-local err instead of writing the captured outer
			// err from inside the errgroup.
			if err := json.Unmarshal(pCtx.DataSourceInstanceSettings.JSONData, &dsJson); err != nil {
				return fmt.Errorf("error unmarshaling datasource json model: %v", err)
			}
			// Default/minimum step is 15s; fall back to it when MinStep is
			// unset or unparsable.
			parsedInterval := time.Second * 15
			if dsJson.MinStep != "" {
				var err error
				parsedInterval, err = gtime.ParseDuration(dsJson.MinStep)
				if err != nil {
					parsedInterval = time.Second * 15
					logger.Debug("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep)
				}
			}
			logger.Debug("Sending SelectSeriesRequest", "queryModel", qm)
			seriesResp, err := d.client.GetSeries(
				gCtx,
				qm.ProfileTypeId,
				qm.LabelSelector,
				query.TimeRange.From.UnixMilli(),
				query.TimeRange.To.UnixMilli(),
				qm.GroupBy,
				math.Max(query.Interval.Seconds(), parsedInterval.Seconds()),
			)
			if err != nil {
				logger.Error("Querying SelectSeries()", "err", err)
				return err
			}
			// add the frames to the response.
			responseMutex.Lock()
			response.Frames = append(response.Frames, seriesToDataFrames(seriesResp)...)
			responseMutex.Unlock()
			return nil
		})
	}
	if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth {
		g.Go(func() error {
			logger.Debug("Calling GetProfile", "queryModel", qm)
			prof, err := d.client.GetProfile(gCtx, qm.ProfileTypeId, qm.LabelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
			if err != nil {
				logger.Error("Error GetProfile()", "err", err)
				return err
			}
			frame := responseToDataFrames(prof)
			responseMutex.Lock()
			response.Frames = append(response.Frames, frame)
			responseMutex.Unlock()
			// If query called with streaming on then return a channel
			// to subscribe on a client-side and consume updates from a plugin.
			// Feel free to remove this if you don't need streaming for your datasource.
			if qm.WithStreaming {
				channel := live.Channel{
					Scope:     live.ScopeDatasource,
					Namespace: pCtx.DataSourceInstanceSettings.UID,
					Path:      "stream",
				}
				frame.SetMeta(&data.FrameMeta{Channel: channel.String()})
			}
			return nil
		})
	}
	// Reuse the error from the single Wait call instead of calling Wait twice.
	if err := g.Wait(); err != nil {
		response.Error = err
	}
	return response
}
// responseToDataFrames turns a Phlare response into a data.Frame. The data is
// encoded in a nested-set format with [level, value, label] columns; because
// the rows are emitted in depth-first traversal order, the whole tree can be
// recreated from the frame.
func responseToDataFrames(resp *ProfileResponse) *data.Frame {
	fb := resp.Flamebearer
	return treeToNestedSetDataFrame(levelsToTree(fb.Levels, fb.Names), resp.Units)
}
// Each bar of a flamebearer profile is represented by 4 numbers in a flat
// array; these constants are the offsets into each 4-number group.
const (
	// START_OFFSET is the offset of the bar relative to the previous sibling.
	START_OFFSET = 0
	// VALUE_OFFSET is the value, or width, of the bar.
	VALUE_OFFSET = 1
	// SELF_OFFSET is the self value of the bar.
	SELF_OFFSET = 2
	// NAME_OFFSET is the index into the names array.
	NAME_OFFSET = 3
	// ITEM_OFFSET is the stride from one bar to the next.
	ITEM_OFFSET = 4
)
// ProfileTree is a tree representation of a flamebearer profile. Values mirror
// the flamebearer bars with Start made absolute (cumulative) rather than
// relative to the previous sibling.
type ProfileTree struct {
	Start int64          // absolute start position of the bar
	Value int64          // total value (width) of the bar
	Self  int64          // self value of the bar
	Level int            // depth in the tree, 0 for the root
	Name  string         // resolved function name from the names table
	Nodes []*ProfileTree // children, in left-to-right order
}
// levelsToTree converts flamebearer format into a tree. This is needed to then
// convert it into nested set format dataframe. This should be temporary, and
// ideally we should get some sort of tree struct directly from Phlare API.
//
// Each level is a flat array of bars (4 numbers per bar, see the *_OFFSET
// constants). A bar belongs to the first parent on the previous level whose
// [Start, Start+Value] span contains it.
func levelsToTree(levels []*Level, names []string) *ProfileTree {
	if len(levels) == 0 {
		return nil
	}

	tree := &ProfileTree{
		Start: 0,
		Value: levels[0].Values[VALUE_OFFSET],
		Self:  levels[0].Values[SELF_OFFSET],
		Level: 0,
		Name:  names[levels[0].Values[0]],
	}

	parentsStack := []*ProfileTree{tree}
	currentLevel := 1

	// Cycle through each level
	for {
		if currentLevel >= len(levels) {
			break
		}

		// If we still have levels to go, this should not happen. Something is probably wrong with the flamebearer data.
		if len(parentsStack) == 0 {
			logger.Error("parentsStack is empty but we are not at the last level", "currentLevel", currentLevel)
			break
		}

		var nextParentsStack []*ProfileTree
		// IDIOM FIX: the original popped with parentsStack[:1][0], a roundabout
		// spelling of parentsStack[0].
		currentParent := parentsStack[0]
		parentsStack = parentsStack[1:]
		itemIndex := 0
		// cumulative offset as items in flamebearer format have just relative to prev item
		offset := int64(0)

		// Cycle through bar in a level
		for {
			if itemIndex >= len(levels[currentLevel].Values) {
				break
			}

			itemStart := levels[currentLevel].Values[itemIndex+START_OFFSET] + offset
			itemValue := levels[currentLevel].Values[itemIndex+VALUE_OFFSET]
			selfValue := levels[currentLevel].Values[itemIndex+SELF_OFFSET]
			itemEnd := itemStart + itemValue
			parentEnd := currentParent.Start + currentParent.Value

			if itemStart >= currentParent.Start && itemEnd <= parentEnd {
				// We have an item that is in the bounds of current parent item, so it should be its child
				treeItem := &ProfileTree{
					Start: itemStart,
					Value: itemValue,
					Self:  selfValue,
					Level: currentLevel,
					Name:  names[levels[currentLevel].Values[itemIndex+NAME_OFFSET]],
				}
				// Add to parent
				currentParent.Nodes = append(currentParent.Nodes, treeItem)
				// Add this item as parent for the next level
				nextParentsStack = append(nextParentsStack, treeItem)
				itemIndex += ITEM_OFFSET

				// Update offset for next item. This is changing relative offset to absolute one.
				offset = itemEnd
			} else {
				// We went out of parents bounds so lets move to next parent. We will evaluate the same item again, but
				// we will check if it is a child of the next parent item in line.
				if len(parentsStack) == 0 {
					logger.Error("parentsStack is empty but there are still items in current level", "currentLevel", currentLevel, "itemIndex", itemIndex)
					break
				}
				currentParent = parentsStack[0]
				parentsStack = parentsStack[1:]
				continue
			}
		}
		parentsStack = nextParentsStack
		currentLevel++
	}

	return tree
}
// Function identifies a source location of a profile node.
type Function struct {
	FunctionName string
	FileName     string // optional
	Line         int64  // optional
}

// String renders the function as "file:function:line".
func (f Function) String() string {
	return f.FileName + ":" + f.FunctionName + ":" + fmt.Sprintf("%d", f.Line)
}
// String renders the tree as an ASCII diagram (one line per node with its
// level, self and total values) using treeprint. Traversal is breadth-first
// over a queue of (nodes, branch) pairs; only nodes with children get a new
// branch, leaves are added as plain nodes. Intended for debugging.
func (pt *ProfileTree) String() string {
	// branch pairs a treeprint branch with the children still to be rendered
	// under it.
	type branch struct {
		nodes []*ProfileTree
		treeprint.Tree
	}
	tree := treeprint.New()
	for _, n := range []*ProfileTree{pt} {
		b := tree.AddBranch(fmt.Sprintf("%s: level %d self %d total %d", n.Name, n.Level, n.Self, n.Value))
		remaining := append([]*branch{}, &branch{nodes: n.Nodes, Tree: b})
		for len(remaining) > 0 {
			current := remaining[0]
			remaining = remaining[1:]
			for _, n := range current.nodes {
				if len(n.Nodes) > 0 {
					remaining = append(remaining,
						&branch{
							nodes: n.Nodes, Tree: current.Tree.AddBranch(fmt.Sprintf("%s: level %d self %d total %d", n.Name, n.Level, n.Self, n.Value)),
						},
					)
				} else {
					current.Tree.AddNode(fmt.Sprintf("%s: level %d self %d total %d", n.Name, n.Level, n.Self, n.Value))
				}
			}
		}
	}
	return tree.String()
}
// CustomMeta carries extra frame metadata; ProfileTypeID encodes the profile
// type of the data in the frame.
type CustomMeta struct {
	ProfileTypeID string
}
// treeToNestedSetDataFrame walks the tree depth first and adds items into the
// dataframe. This is a nested set format: because the rows are in depth-first
// order and each row knows its level/depth, the parent-child relationship can
// be recreated without explicit parent/child columns by simply iterating the
// frame in order.
func treeToNestedSetDataFrame(tree *ProfileTree, unit string) *data.Frame {
	frame := data.NewFrame("response")
	frame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"}

	levels := data.NewField("level", nil, []int64{})
	values := data.NewField("value", nil, []int64{})
	selves := data.NewField("self", nil, []int64{})
	// profileTypeID should encode the type of the profile with unit being the 3rd part
	values.Config = &data.FieldConfig{Unit: unit}
	selves.Config = &data.FieldConfig{Unit: unit}
	frame.Fields = data.Fields{levels, values, selves}

	names := NewEnumField("label", nil)
	// Tree can be nil if profile was empty, we can still send empty frame in that case
	if tree != nil {
		walkTree(tree, func(node *ProfileTree) {
			levels.Append(int64(node.Level))
			values.Append(node.Value)
			selves.Append(node.Self)
			names.Append(node.Name)
		})
	}

	// The label field is finalized last so the enum text table is complete.
	frame.Fields = append(frame.Fields, names.GetField())
	return frame
}
// EnumField builds an int64 data.Field whose values index into a deduplicated
// string table, later exposed as an enum via the field's TypeConfig.
type EnumField struct {
	field     *data.Field      // backing int64 field holding the enum indexes
	valuesMap map[string]int64 // string -> assigned index
	counter   int64            // next index to assign
}
// NewEnumField creates an empty EnumField with the given field name and labels.
func NewEnumField(name string, labels data.Labels) *EnumField {
	return &EnumField{
		field:     data.NewField(name, labels, []int64{}),
		valuesMap: make(map[string]int64),
	}
}
// Append appends value to the field, interning it: a previously seen string
// reuses its index, an unseen one is assigned the next index.
func (e *EnumField) Append(value string) {
	index, seen := e.valuesMap[value]
	if !seen {
		index = e.counter
		e.valuesMap[value] = index
		e.counter++
	}
	e.field.Append(index)
}
// GetField finalizes and returns the field, attaching the accumulated string
// table as enum config so index i renders as Text[i].
func (e *EnumField) GetField() *data.Field {
	text := make([]string, len(e.valuesMap))
	for value, index := range e.valuesMap {
		text[index] = value
	}

	cfg := &data.FieldConfig{
		TypeConfig: &data.FieldTypeConfig{
			Enum: &data.EnumFieldConfig{
				Text: text,
			},
		},
	}
	e.field.SetConfig(cfg)
	return e.field
}
// walkTree visits every node of the tree in depth-first (pre-order) order,
// calling fn exactly once per node.
func walkTree(tree *ProfileTree, fn func(tree *ProfileTree)) {
	fn(tree)
	stack := tree.Nodes
	for len(stack) > 0 {
		node, rest := stack[0], stack[1:]
		fn(node)
		if node.Nodes != nil {
			// Prepend the children so the whole subtree is visited before the
			// remaining siblings.
			stack = append(node.Nodes, rest...)
		} else {
			stack = rest
		}
	}
}
// seriesToDataFrames converts a series response into one data.Frame per
// series. Separate frames are created because the series may not have the
// same length.
func seriesToDataFrames(resp *SeriesResponse) []*data.Frame {
	frames := make([]*data.Frame, 0, len(resp.Series))

	for _, series := range resp.Series {
		timeField := data.NewField("time", nil, []time.Time{})

		labels := make(map[string]string)
		for _, pair := range series.Labels {
			labels[pair.Name] = pair.Value
		}

		valueField := data.NewField(resp.Label, labels, []float64{})
		valueField.Config = &data.FieldConfig{Unit: resp.Units}

		for _, point := range series.Points {
			timeField.Append(time.UnixMilli(point.Timestamp))
			valueField.Append(point.Value)
		}

		frame := data.NewFrame("series")
		frame.Meta = &data.FrameMeta{PreferredVisualization: "graph"}
		frame.Fields = data.Fields{timeField, valueField}
		frames = append(frames, frame)
	}

	return frames
}

View File

@@ -0,0 +1,326 @@
package phlare
import (
"context"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
// Test_query exercises PhlareDatasource.query for all three query types using
// a FakeClient, and verifies that the step and groupBy arguments passed to
// GetSeries respect the datasource's minStep setting.
func Test_query(t *testing.T) {
	client := &FakeClient{}
	ds := &PhlareDatasource{
		client: client,
	}
	// Plugin context with minStep configured to 30s.
	pCtx := backend.PluginContext{
		DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
			JSONData: []byte(`{"minStep":"30s"}`),
		},
	}

	t.Run("query both", func(t *testing.T) {
		dataQuery := makeDataQuery()
		resp := ds.query(context.Background(), pCtx, *dataQuery)
		require.Nil(t, resp.Error)
		require.Equal(t, 2, len(resp.Frames))

		// The order of the frames is not guaranteed, so we normalize it
		if resp.Frames[0].Fields[0].Name == "level" {
			resp.Frames[1], resp.Frames[0] = resp.Frames[0], resp.Frames[1]
		}

		require.Equal(t, "time", resp.Frames[0].Fields[0].Name)
		require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2}), resp.Frames[1].Fields[0])
	})

	t.Run("query profile", func(t *testing.T) {
		dataQuery := makeDataQuery()
		dataQuery.QueryType = queryTypeProfile
		resp := ds.query(context.Background(), pCtx, *dataQuery)
		require.Nil(t, resp.Error)
		require.Equal(t, 1, len(resp.Frames))
		require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2}), resp.Frames[0].Fields[0])
	})

	t.Run("query metrics", func(t *testing.T) {
		dataQuery := makeDataQuery()
		dataQuery.QueryType = queryTypeMetrics
		resp := ds.query(context.Background(), pCtx, *dataQuery)
		require.Nil(t, resp.Error)
		require.Equal(t, 1, len(resp.Frames))
		require.Equal(t, "time", resp.Frames[0].Fields[0].Name)
	})

	t.Run("query metrics uses min step", func(t *testing.T) {
		dataQuery := makeDataQuery()
		dataQuery.QueryType = queryTypeMetrics
		resp := ds.query(context.Background(), pCtx, *dataQuery)
		require.Nil(t, resp.Error)
		// Args[5] is the step passed to FakeClient.GetSeries; minStep (30s)
		// should win over the query interval.
		step, ok := client.Args[5].(float64)
		require.True(t, ok)
		require.Equal(t, float64(30), step)
	})

	t.Run("query metrics uses default min step", func(t *testing.T) {
		dataQuery := makeDataQuery()
		dataQuery.QueryType = queryTypeMetrics
		// No minStep in settings: the 15s default should be used.
		pCtxNoMinStep := backend.PluginContext{
			DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
				JSONData: []byte(`{}`),
			},
		}
		resp := ds.query(context.Background(), pCtxNoMinStep, *dataQuery)
		require.Nil(t, resp.Error)
		step, ok := client.Args[5].(float64)
		require.True(t, ok)
		require.Equal(t, float64(15), step)
	})

	t.Run("query metrics uses group by", func(t *testing.T) {
		dataQuery := makeDataQuery()
		dataQuery.QueryType = queryTypeMetrics
		dataQuery.JSON = []byte(`{"profileTypeId":"memory:alloc_objects:count:space:bytes","labelSelector":"{app=\\\"baz\\\"}","groupBy":["app","instance"]}`)
		resp := ds.query(context.Background(), pCtx, *dataQuery)
		require.Nil(t, resp.Error)
		// Args[4] is the groupBy slice forwarded to GetSeries.
		groupBy, ok := client.Args[4].([]string)
		require.True(t, ok)
		require.Equal(t, []string{"app", "instance"}, groupBy)
	})
}
// makeDataQuery returns a fresh baseline query (type "both", fixed 10s-20s
// time range) that individual subtests mutate as needed.
func makeDataQuery() *backend.DataQuery {
	return &backend.DataQuery{
		RefID:         "A",
		QueryType:     queryTypeBoth,
		MaxDataPoints: 0,
		Interval:      0,
		TimeRange: backend.TimeRange{
			From: time.UnixMilli(10000),
			To:   time.UnixMilli(20000),
		},
		JSON: []byte(`{"profileTypeId":"memory:alloc_objects:count:space:bytes","labelSelector":"{app=\\\"baz\\\"}"}`),
	}
}
// fieldValues extracts all values of a data.Field as a typed slice. It panics
// if the field's element type does not assert to T.
func fieldValues[T any](field *data.Field) []T {
	values := make([]T, field.Len())
	for i := 0; i < field.Len(); i++ {
		values[i] = field.At(i).(T)
	}
	return values
}
// Test_profileToDataFrame verifies that a flamebearer profile response is
// converted into the nested-set frame layout: level/value/self columns plus an
// enum-encoded label column.
func Test_profileToDataFrame(t *testing.T) {
	profile := &ProfileResponse{
		Flamebearer: &Flamebearer{
			Names: []string{"func1", "func2", "func3"},
			Levels: []*Level{
				{Values: []int64{0, 20, 1, 2}},
				{Values: []int64{0, 10, 3, 1, 4, 5, 5, 2}},
			},
			Total:   987,
			MaxSelf: 123,
		},
		Units: "short",
	}
	frame := responseToDataFrames(profile)
	require.Equal(t, 4, len(frame.Fields))
	require.Equal(t, data.NewField("level", nil, []int64{0, 1, 1}), frame.Fields[0])
	require.Equal(t, data.NewField("value", nil, []int64{20, 10, 5}).SetConfig(&data.FieldConfig{Unit: "short"}), frame.Fields[1])
	require.Equal(t, data.NewField("self", nil, []int64{1, 3, 5}).SetConfig(&data.FieldConfig{Unit: "short"}), frame.Fields[2])

	// Label column stores enum indexes; the text table maps them back to names.
	require.Equal(t, "label", frame.Fields[3].Name)
	require.Equal(t, []int64{0, 1, 2}, fieldValues[int64](frame.Fields[3]))
	require.Equal(t, []string{"func1", "func2", "func3"}, frame.Fields[3].Config.TypeConfig.Enum.Text)
}
// Test_levelsToTree verifies the flamebearer-to-tree conversion, including
// how relative sibling offsets become absolute Start positions.
func Test_levelsToTree(t *testing.T) {
	t.Run("simple", func(t *testing.T) {
		levels := []*Level{
			{Values: []int64{0, 100, 0, 0}},
			{Values: []int64{0, 40, 0, 1, 0, 30, 0, 2}},
			{Values: []int64{0, 15, 0, 3}},
		}

		tree := levelsToTree(levels, []string{"root", "func1", "func2", "func1:func3"})
		require.Equal(t, &ProfileTree{
			Start: 0, Value: 100, Level: 0, Name: "root", Nodes: []*ProfileTree{
				{
					Start: 0, Value: 40, Level: 1, Name: "func1", Nodes: []*ProfileTree{
						{Start: 0, Value: 15, Level: 2, Name: "func1:func3"},
					},
				},
				{Start: 40, Value: 30, Level: 1, Name: "func2"},
			},
		}, tree)
	})

	t.Run("medium", func(t *testing.T) {
		// Third bar on level 2 starts at relative offset 50 (absolute 70),
		// placing it under func3 rather than func1.
		levels := []*Level{
			{Values: []int64{0, 100, 0, 0}},
			{Values: []int64{0, 40, 0, 1, 0, 30, 0, 2, 0, 30, 0, 3}},
			{Values: []int64{0, 20, 0, 4, 50, 10, 0, 5}},
		}

		tree := levelsToTree(levels, []string{"root", "func1", "func2", "func3", "func1:func4", "func3:func5"})
		require.Equal(t, &ProfileTree{
			Start: 0, Value: 100, Level: 0, Name: "root", Nodes: []*ProfileTree{
				{
					Start: 0, Value: 40, Level: 1, Name: "func1", Nodes: []*ProfileTree{
						{Start: 0, Value: 20, Level: 2, Name: "func1:func4"},
					},
				},
				{Start: 40, Value: 30, Level: 1, Name: "func2"},
				{
					Start: 70, Value: 30, Level: 1, Name: "func3", Nodes: []*ProfileTree{
						{Start: 70, Value: 10, Level: 2, Name: "func3:func5"},
					},
				},
			},
		}, tree)
	})
}
// Test_treeToNestedDataFrame verifies the depth-first flattening of a
// ProfileTree into the nested-set frame, and that a nil tree still yields a
// well-formed empty frame.
func Test_treeToNestedDataFrame(t *testing.T) {
	t.Run("sample profile tree", func(t *testing.T) {
		tree := &ProfileTree{
			Value: 100, Level: 0, Self: 1, Name: "root", Nodes: []*ProfileTree{
				{
					Value: 40, Level: 1, Self: 2, Name: "func1",
				},
				{Value: 30, Level: 1, Self: 3, Name: "func2", Nodes: []*ProfileTree{
					{Value: 15, Level: 2, Self: 4, Name: "func1:func3"},
				}},
			},
		}

		frame := treeToNestedSetDataFrame(tree, "short")

		// Expected enum config on the label column: indexes map to these names.
		labelConfig := &data.FieldConfig{
			TypeConfig: &data.FieldTypeConfig{
				Enum: &data.EnumFieldConfig{
					Text: []string{"root", "func1", "func2", "func1:func3"},
				},
			},
		}

		require.Equal(t,
			[]*data.Field{
				data.NewField("level", nil, []int64{0, 1, 1, 2}),
				data.NewField("value", nil, []int64{100, 40, 30, 15}).SetConfig(&data.FieldConfig{Unit: "short"}),
				data.NewField("self", nil, []int64{1, 2, 3, 4}).SetConfig(&data.FieldConfig{Unit: "short"}),
				data.NewField("label", nil, []int64{0, 1, 2, 3}).SetConfig(labelConfig),
			}, frame.Fields)
	})

	t.Run("nil profile tree", func(t *testing.T) {
		frame := treeToNestedSetDataFrame(nil, "short")
		require.Equal(t, 4, len(frame.Fields))
		require.Equal(t, 0, frame.Fields[0].Len())
	})
}
// Test_seriesToDataFrame verifies conversion of a SeriesResponse into frames:
// label handling on the value field and one-frame-per-series splitting.
func Test_seriesToDataFrame(t *testing.T) {
	t.Run("single series", func(t *testing.T) {
		series := &SeriesResponse{
			Series: []*Series{
				{Labels: []*LabelPair{}, Points: []*Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}}},
			},
			Units: "short",
			Label: "samples",
		}
		frames := seriesToDataFrames(series)
		require.Equal(t, 2, len(frames[0].Fields))
		require.Equal(t, data.NewField("time", nil, []time.Time{time.UnixMilli(1000), time.UnixMilli(2000)}), frames[0].Fields[0])
		require.Equal(t, data.NewField("samples", map[string]string{}, []float64{30, 10}).SetConfig(&data.FieldConfig{Unit: "short"}), frames[0].Fields[1])

		// with a label pair, the value field should name itself with a label pair name and not the profile type
		series = &SeriesResponse{
			Series: []*Series{
				{Labels: []*LabelPair{{Name: "app", Value: "bar"}}, Points: []*Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}}},
			},
			Units: "short",
			Label: "samples",
		}
		frames = seriesToDataFrames(series)
		require.Equal(t, data.NewField("samples", map[string]string{"app": "bar"}, []float64{30, 10}).SetConfig(&data.FieldConfig{Unit: "short"}), frames[0].Fields[1])
	})

	// BUG FIX: this subtest was also named "single series", duplicating the
	// previous subtest's name even though it exercises multiple series.
	t.Run("multiple series", func(t *testing.T) {
		resp := &SeriesResponse{
			Series: []*Series{
				{Labels: []*LabelPair{{Name: "foo", Value: "bar"}}, Points: []*Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}}},
				{Labels: []*LabelPair{{Name: "foo", Value: "baz"}}, Points: []*Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}}},
			},
			Units: "short",
			Label: "samples",
		}
		frames := seriesToDataFrames(resp)
		require.Equal(t, 2, len(frames))
		require.Equal(t, 2, len(frames[0].Fields))
		require.Equal(t, 2, len(frames[1].Fields))
		require.Equal(t, data.NewField("samples", map[string]string{"foo": "bar"}, []float64{30, 10}).SetConfig(&data.FieldConfig{Unit: "short"}), frames[0].Fields[1])
		require.Equal(t, data.NewField("samples", map[string]string{"foo": "baz"}, []float64{30, 10}).SetConfig(&data.FieldConfig{Unit: "short"}), frames[1].Fields[1])
	})
}
// FakeClient is a test double for ProfilingClient. Args records the arguments
// of the most recent GetSeries call so tests can assert on them.
type FakeClient struct {
	Args []interface{}
}
// ProfileTypes returns a fixed pair of profile types (cpu, memory).
func (f *FakeClient) ProfileTypes(ctx context.Context) ([]*ProfileType, error) {
	return []*ProfileType{
		{
			ID:    "type:1",
			Label: "cpu",
		},
		{
			ID:    "type:2",
			Label: "memory",
		},
	}, nil
}
// LabelValues is unused by these tests and intentionally panics if called.
func (f *FakeClient) LabelValues(ctx context.Context, query string, label string, start int64, end int64) ([]string, error) {
	panic("implement me")
}
// LabelNames is unused by these tests and intentionally panics if called.
func (f *FakeClient) LabelNames(ctx context.Context, query string, start int64, end int64) ([]string, error) {
	panic("implement me")
}
// GetProfile returns a fixed 3-level flamebearer profile regardless of input.
func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
	return &ProfileResponse{
		Flamebearer: &Flamebearer{
			Names: []string{"foo", "bar", "baz"},
			Levels: []*Level{
				{Values: []int64{0, 10, 0, 0}},
				{Values: []int64{0, 9, 0, 1}},
				{Values: []int64{0, 8, 8, 2}},
			},
			Total:   100,
			MaxSelf: 56,
		},
		Units: "count",
	}, nil
}
// GetSeries records its arguments in f.Args (for later assertions) and returns
// a fixed one-series response.
func (f *FakeClient) GetSeries(ctx context.Context, profileTypeID, labelSelector string, start, end int64, groupBy []string, step float64) (*SeriesResponse, error) {
	f.Args = []interface{}{profileTypeID, labelSelector, start, end, groupBy, step}
	return &SeriesResponse{
		Series: []*Series{
			{
				Labels: []*LabelPair{{Name: "foo", Value: "bar"}},
				Points: []*Point{{Timestamp: int64(1000), Value: 30}, {Timestamp: int64(2000), Value: 10}},
			},
		},
		Units: "count",
		Label: "test",
	}, nil
}

View File

@@ -0,0 +1,104 @@
package phlare
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/accesscontrol"
)
// Compile-time checks that Service implements the required plugin-SDK
// interfaces. Without these, a missing method would only surface at runtime as
// a "not implemented" response. If a capability (e.g. streaming) is ever
// dropped, remove the corresponding assertion together with its methods.
var (
	_ backend.QueryDataHandler    = (*Service)(nil)
	_ backend.CallResourceHandler = (*Service)(nil)
	_ backend.CheckHealthHandler  = (*Service)(nil)
	_ backend.StreamHandler       = (*Service)(nil)
)

// logger is the package-wide logger for this datasource backend.
var logger = log.New("tsdb.phlare")
// Service is the plugin entry point; it delegates all handler calls to the
// per-datasource PhlareDatasource instance managed by im.
type Service struct {
	im instancemgmt.InstanceManager
}
// getInstance fetches (creating if needed) the PhlareDatasource instance for
// the given plugin context from the instance manager.
func (s *Service) getInstance(ctx context.Context, pluginCtx backend.PluginContext) (*PhlareDatasource, error) {
	i, err := s.im.Get(ctx, pluginCtx)
	if err != nil {
		return nil, err
	}
	// NOTE(review): unchecked type assertion — relies on the invariant that
	// newInstanceSettings only ever stores *PhlareDatasource in the manager.
	in := i.(*PhlareDatasource)
	return in, nil
}
// ProvideService wires up the Service for dependency injection, backing it
// with an instance manager that builds datasource instances on demand.
func ProvideService(httpClientProvider httpclient.Provider, ac accesscontrol.AccessControl) *Service {
	return &Service{
		im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, ac)),
	}
}
// newInstanceSettings returns the factory the instance manager uses to build a
// PhlareDatasource from datasource settings.
func newInstanceSettings(httpClientProvider httpclient.Provider, ac accesscontrol.AccessControl) datasource.InstanceFactoryFunc {
	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		return NewPhlareDatasource(httpClientProvider, settings, ac)
	}
}
// QueryData resolves the datasource instance for the request and delegates
// query execution to it.
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	i, err := s.getInstance(ctx, req.PluginContext)
	if err != nil {
		return nil, err
	}
	return i.QueryData(ctx, req)
}
// CallResource resolves the datasource instance for the request and delegates
// resource-call handling to it.
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	i, err := s.getInstance(ctx, req.PluginContext)
	if err != nil {
		return err
	}
	return i.CallResource(ctx, req, sender)
}
// CheckHealth resolves the datasource instance for the request and delegates
// the health check to it.
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	i, err := s.getInstance(ctx, req.PluginContext)
	if err != nil {
		return nil, err
	}
	return i.CheckHealth(ctx, req)
}
// SubscribeStream resolves the datasource instance for the request and
// delegates the stream-subscription decision to it.
func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
	i, err := s.getInstance(ctx, req.PluginContext)
	if err != nil {
		return nil, err
	}
	return i.SubscribeStream(ctx, req)
}
// RunStream resolves the datasource instance for the request and delegates
// running the stream to it.
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
	i, err := s.getInstance(ctx, req.PluginContext)
	if err != nil {
		return err
	}
	return i.RunStream(ctx, req, sender)
}
// PublishStream is called when a client sends a message to the stream; it
// resolves the datasource instance and delegates to it.
func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
	i, err := s.getInstance(ctx, req.PluginContext)
	if err != nil {
		return nil, err
	}
	return i.PublishStream(ctx, req)
}

File diff suppressed because one or more lines are too long