mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Parca: Update supported version to v0.19+ (#89826)
* pkg/tsdb/parca: Upgrade to using the flamegraph arrow * pkg/tsdb/parca: Delete code for old flamegraphs * pkg/tsdb/parca: Fix golangci-lint error in test * pkg/tsdb/parca: Handle errors nicely * docs/sources/datasource: Add Parca v0.19+ support note * pkg/tsdb/parca: Don't use arrow table reader As pointed out during reviews, it's not really needed and we can read the record directly.
This commit is contained in:
parent
530355d5db
commit
fbc195549f
@ -37,6 +37,12 @@ refs:
|
||||
|
||||
Grafana ships with built-in support for Parca, a continuous profiling OSS database for analysis of CPU and memory usage, down to the line number and throughout time. Add it as a data source, and you are ready to query your profiles in [Explore](ref:explore).
|
||||
|
||||
## Supported Parca versions
|
||||
|
||||
This data source supports these versions of Parca:
|
||||
|
||||
- v0.19+
|
||||
|
||||
## Configure the Parca data source
|
||||
|
||||
To configure basic settings for the data source, complete the following steps:
|
||||
|
4
go.mod
4
go.mod
@ -11,8 +11,8 @@ replace cuelang.org/go => github.com/grafana/cue v0.0.0-20230926092038-971951014
|
||||
replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.52.0
|
||||
|
||||
require (
|
||||
buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.4.1-20221222094228-8b1d3d0f62e6.1 // @grafana/observability-traces-and-profiling
|
||||
buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.33.0-20240414232344-9ca06271cb73.1 // @grafana/observability-traces-and-profiling
|
||||
buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.10.0-20240523185345-933eab74d046.1 // @grafana/observability-traces-and-profiling
|
||||
buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.1-20240523185345-933eab74d046.1 // @grafana/observability-traces-and-profiling
|
||||
cloud.google.com/go/kms v1.15.7 // @grafana/grafana-backend-group
|
||||
cloud.google.com/go/storage v1.38.0 // @grafana/grafana-backend-group
|
||||
cuelang.org/go v0.6.0-0.dev // @grafana/grafana-as-code
|
||||
|
9
go.sum
9
go.sum
@ -1,7 +1,7 @@
|
||||
buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.4.1-20221222094228-8b1d3d0f62e6.1 h1:wQ75SnlaD0X30PnrmA+07A/5fnQWrAHy1mzv+CPB5Oo=
|
||||
buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.4.1-20221222094228-8b1d3d0f62e6.1/go.mod h1:VYzBTKhjl92cl3sv+xznQcJHCezU7qnI0FhBAUb4n8c=
|
||||
buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.33.0-20240414232344-9ca06271cb73.1 h1:arEscdzM2EZPZT8x7tSzuBVgyZrnsKuvoftapClxUgw=
|
||||
buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.33.0-20240414232344-9ca06271cb73.1/go.mod h1:/vvnaG5MGgbuJxYTucpo0QOom9xbSb6Y43za3/as9qk=
|
||||
buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.10.0-20240523185345-933eab74d046.1 h1:q6MBjQ5fj3ygW/ySxOck7iwLhPQWbzOKJRZYzXRBmBk=
|
||||
buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.10.0-20240523185345-933eab74d046.1/go.mod h1:DEC5vsD4oLn7c6QeBVfUS7WpW5iwmW5lXfhpsjVxVt0=
|
||||
buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.1-20240523185345-933eab74d046.1 h1:Osqg+/+sFJKr5iyna6/AvyxXPY0jqaIGr3ltzzSDLRk=
|
||||
buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.1-20240523185345-933eab74d046.1/go.mod h1:lbDqoSeErWK6pETEKo/LO+JmU2GbZqVE8ILESypLuZU=
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw=
|
||||
@ -4631,6 +4631,7 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
|
||||
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
|
@ -1,23 +1,29 @@
|
||||
package parca
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v1alpha1 "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/query/v1alpha1"
|
||||
"github.com/apache/arrow/go/v15/arrow"
|
||||
"github.com/apache/arrow/go/v15/arrow/array"
|
||||
"github.com/apache/arrow/go/v15/arrow/ipc"
|
||||
"github.com/bufbuild/connect-go"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||
"github.com/grafana/grafana/pkg/tsdb/parca/kinds/dataquery"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||
"github.com/grafana/grafana/pkg/tsdb/parca/kinds/dataquery"
|
||||
)
|
||||
|
||||
type queryModel struct {
|
||||
@ -65,13 +71,25 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext,
|
||||
ctxLogger.Debug("Querying SelectMergeStacktraces()", "queryModel", qm, "function", logEntrypoint())
|
||||
resp, err := d.client.Query(ctx, makeProfileRequest(qm, query))
|
||||
if err != nil {
|
||||
response.Error = err
|
||||
if strings.Contains(err.Error(), "invalid report type") {
|
||||
response.Error = fmt.Errorf("try updating Parca to v0.19+: %v", err)
|
||||
} else {
|
||||
response.Error = err
|
||||
}
|
||||
|
||||
ctxLogger.Error("Failed to process query", "error", err, "queryType", query.QueryType, "function", logEntrypoint())
|
||||
span.RecordError(response.Error)
|
||||
span.SetStatus(codes.Error, response.Error.Error())
|
||||
return response
|
||||
}
|
||||
frame := responseToDataFrames(resp)
|
||||
frame, err := responseToDataFrames(resp)
|
||||
if err != nil {
|
||||
response.Error = err
|
||||
ctxLogger.Error("Failed to convert the response to a data frame", "error", err, "queryType", query.QueryType)
|
||||
span.RecordError(response.Error)
|
||||
span.SetStatus(codes.Error, response.Error.Error())
|
||||
return response
|
||||
}
|
||||
response.Frames = append(response.Frames, frame)
|
||||
}
|
||||
|
||||
@ -93,9 +111,8 @@ func makeProfileRequest(qm queryModel, query backend.DataQuery) *connect.Request
|
||||
},
|
||||
},
|
||||
},
|
||||
// We should change this to QueryRequest_REPORT_TYPE_FLAMEGRAPH_TABLE later on
|
||||
// nolint:staticcheck
|
||||
ReportType: v1alpha1.QueryRequest_REPORT_TYPE_FLAMEGRAPH_UNSPECIFIED,
|
||||
ReportType: v1alpha1.QueryRequest_REPORT_TYPE_FLAMEGRAPH_ARROW,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -122,120 +139,14 @@ type CustomMeta struct {
|
||||
// responseToDataFrames turns Parca response to data.Frame. We encode the data into a nested set format where we have
|
||||
// [level, value, label] columns and by ordering the items in a depth first traversal order we can recreate the whole
|
||||
// tree back.
|
||||
func responseToDataFrames(resp *connect.Response[v1alpha1.QueryResponse]) *data.Frame {
|
||||
if flameResponse, ok := resp.Msg.Report.(*v1alpha1.QueryResponse_Flamegraph); ok {
|
||||
frame := treeToNestedSetDataFrame(flameResponse.Flamegraph)
|
||||
frame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"}
|
||||
return frame
|
||||
func responseToDataFrames(resp *connect.Response[v1alpha1.QueryResponse]) (*data.Frame, error) {
|
||||
if flameResponse, ok := resp.Msg.Report.(*v1alpha1.QueryResponse_FlamegraphArrow); ok {
|
||||
return arrowToNestedSetDataFrame(flameResponse.FlamegraphArrow)
|
||||
} else {
|
||||
panic("unknown report type returned from query")
|
||||
return nil, fmt.Errorf("unknown report type returned from query. update parca")
|
||||
}
|
||||
}
|
||||
|
||||
// treeToNestedSetDataFrame walks the tree depth first and adds items into the dataframe. This is a nested set format
|
||||
// where by ordering the items in depth first order and knowing the level/depth of each item we can recreate the
|
||||
// parent - child relationship without explicitly needing parent/child column and we can later just iterate over the
|
||||
// dataFrame to again basically walking depth first over the tree/profile.
|
||||
func treeToNestedSetDataFrame(tree *v1alpha1.Flamegraph) *data.Frame {
|
||||
frame := data.NewFrame("response")
|
||||
|
||||
levelField := data.NewField("level", nil, []int64{})
|
||||
valueField := data.NewField("value", nil, []int64{})
|
||||
valueField.Config = &data.FieldConfig{Unit: normalizeUnit(tree.Unit)}
|
||||
selfField := data.NewField("self", nil, []int64{})
|
||||
selfField.Config = &data.FieldConfig{Unit: normalizeUnit(tree.Unit)}
|
||||
labelField := data.NewField("label", nil, []string{})
|
||||
frame.Fields = data.Fields{levelField, valueField, selfField, labelField}
|
||||
|
||||
walkTree(tree.Root, func(level int64, value int64, name string, self int64) {
|
||||
levelField.Append(level)
|
||||
valueField.Append(value)
|
||||
labelField.Append(name)
|
||||
selfField.Append(self)
|
||||
})
|
||||
return frame
|
||||
}
|
||||
|
||||
type Node struct {
|
||||
Node *v1alpha1.FlamegraphNode
|
||||
Level int64
|
||||
}
|
||||
|
||||
func walkTree(tree *v1alpha1.FlamegraphRootNode, fn func(level int64, value int64, name string, self int64)) {
|
||||
stack := make([]*Node, 0, len(tree.Children))
|
||||
var childrenValue int64 = 0
|
||||
|
||||
for _, child := range tree.Children {
|
||||
childrenValue += child.Cumulative
|
||||
stack = append(stack, &Node{Node: child, Level: 1})
|
||||
}
|
||||
|
||||
fn(0, tree.Cumulative, "total", tree.Cumulative-childrenValue)
|
||||
|
||||
for {
|
||||
if len(stack) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// shift stack
|
||||
node := stack[0]
|
||||
stack = stack[1:]
|
||||
childrenValue = 0
|
||||
|
||||
if node.Node.Children != nil {
|
||||
var children []*Node
|
||||
for _, child := range node.Node.Children {
|
||||
childrenValue += child.Cumulative
|
||||
children = append(children, &Node{Node: child, Level: node.Level + 1})
|
||||
}
|
||||
// Put the children first so we do depth first traversal
|
||||
stack = append(children, stack...)
|
||||
}
|
||||
fn(node.Level, node.Node.Cumulative, nodeName(node.Node), node.Node.Cumulative-childrenValue)
|
||||
}
|
||||
}
|
||||
|
||||
func nodeName(node *v1alpha1.FlamegraphNode) string {
|
||||
if node.Meta == nil {
|
||||
return "<unknown>"
|
||||
}
|
||||
|
||||
mapping := ""
|
||||
if node.Meta.Mapping != nil && node.Meta.Mapping.File != "" {
|
||||
mapping = "[" + getLastItem(node.Meta.Mapping.File) + "] "
|
||||
}
|
||||
|
||||
if node.Meta.Function != nil && node.Meta.Function.Name != "" {
|
||||
return mapping + node.Meta.Function.Name
|
||||
}
|
||||
|
||||
address := ""
|
||||
if node.Meta.Location != nil {
|
||||
address = fmt.Sprintf("0x%x", node.Meta.Location.Address)
|
||||
}
|
||||
|
||||
if mapping == "" && address == "" {
|
||||
return "<unknown>"
|
||||
} else {
|
||||
return mapping + address
|
||||
}
|
||||
}
|
||||
|
||||
func getLastItem(path string) string {
|
||||
parts := strings.Split(path, "/")
|
||||
return parts[len(parts)-1]
|
||||
}
|
||||
|
||||
func normalizeUnit(unit string) string {
|
||||
if unit == "nanoseconds" {
|
||||
return "ns"
|
||||
}
|
||||
if unit == "count" {
|
||||
return "short"
|
||||
}
|
||||
return unit
|
||||
}
|
||||
|
||||
func seriesToDataFrame(seriesResp *connect.Response[v1alpha1.QueryRangeResponse], profileTypeID string) []*data.Frame {
|
||||
frames := make([]*data.Frame, 0, len(seriesResp.Msg.Series))
|
||||
|
||||
@ -266,3 +177,208 @@ func seriesToDataFrame(seriesResp *connect.Response[v1alpha1.QueryRangeResponse]
|
||||
|
||||
return frames
|
||||
}
|
||||
|
||||
func arrowToNestedSetDataFrame(flamegraph *v1alpha1.FlamegraphArrow) (*data.Frame, error) {
|
||||
frame := data.NewFrame("response")
|
||||
frame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"}
|
||||
|
||||
levelField := data.NewField("level", nil, []int64{})
|
||||
valueField := data.NewField("value", nil, []int64{})
|
||||
valueField.Config = &data.FieldConfig{Unit: normalizeUnit(flamegraph.Unit)}
|
||||
selfField := data.NewField("self", nil, []int64{})
|
||||
selfField.Config = &data.FieldConfig{Unit: normalizeUnit(flamegraph.Unit)}
|
||||
labelField := data.NewField("label", nil, []string{})
|
||||
frame.Fields = data.Fields{levelField, valueField, selfField, labelField}
|
||||
|
||||
arrowReader, err := ipc.NewReader(bytes.NewBuffer(flamegraph.GetRecord()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer arrowReader.Release()
|
||||
|
||||
arrowReader.Next()
|
||||
rec := arrowReader.Record()
|
||||
|
||||
fi, err := newFlamegraphIterator(rec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create flamegraph iterator: %w", err)
|
||||
}
|
||||
|
||||
fi.iterate(func(name string, level, value, self int64) {
|
||||
labelField.Append(name)
|
||||
levelField.Append(level)
|
||||
valueField.Append(value)
|
||||
selfField.Append(self)
|
||||
})
|
||||
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
const (
|
||||
FlamegraphFieldMappingFile = "mapping_file"
|
||||
FlamegraphFieldLocationAddress = "location_address"
|
||||
FlamegraphFieldFunctionName = "function_name"
|
||||
FlamegraphFieldChildren = "children"
|
||||
FlamegraphFieldCumulative = "cumulative"
|
||||
FlamegraphFieldFlat = "flat"
|
||||
)
|
||||
|
||||
type flamegraphIterator struct {
|
||||
columnChildren *array.List
|
||||
columnChildrenValues *array.Uint32
|
||||
columnCumulative func(i int) int64
|
||||
columnMappingFile *array.Dictionary
|
||||
columnMappingFileDict *array.Binary
|
||||
columnFunctionName *array.Dictionary
|
||||
columnFunctionNameDict *array.Binary
|
||||
columnLocationAddress *array.Uint64
|
||||
|
||||
nameBuilder *bytes.Buffer
|
||||
addressBuilder *bytes.Buffer
|
||||
}
|
||||
|
||||
func newFlamegraphIterator(rec arrow.Record) (*flamegraphIterator, error) {
|
||||
schema := rec.Schema()
|
||||
|
||||
columnChildren := rec.Column(schema.FieldIndices(FlamegraphFieldChildren)[0]).(*array.List)
|
||||
columnChildrenValues := columnChildren.ListValues().(*array.Uint32)
|
||||
columnCumulative := uintValue(rec.Column(schema.FieldIndices(FlamegraphFieldCumulative)[0]))
|
||||
|
||||
columnMappingFile := rec.Column(schema.FieldIndices(FlamegraphFieldMappingFile)[0]).(*array.Dictionary)
|
||||
columnMappingFileDict := columnMappingFile.Dictionary().(*array.Binary)
|
||||
columnFunctionName := rec.Column(schema.FieldIndices(FlamegraphFieldFunctionName)[0]).(*array.Dictionary)
|
||||
columnFunctionNameDict := columnFunctionName.Dictionary().(*array.Binary)
|
||||
columnLocationAddress := rec.Column(schema.FieldIndices(FlamegraphFieldLocationAddress)[0]).(*array.Uint64)
|
||||
|
||||
return &flamegraphIterator{
|
||||
columnChildren: columnChildren,
|
||||
columnChildrenValues: columnChildrenValues,
|
||||
columnCumulative: columnCumulative,
|
||||
columnMappingFile: columnMappingFile,
|
||||
columnMappingFileDict: columnMappingFileDict,
|
||||
columnFunctionName: columnFunctionName,
|
||||
columnFunctionNameDict: columnFunctionNameDict,
|
||||
columnLocationAddress: columnLocationAddress,
|
||||
|
||||
nameBuilder: &bytes.Buffer{},
|
||||
addressBuilder: &bytes.Buffer{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fi *flamegraphIterator) iterate(fn func(name string, level, value, self int64)) {
|
||||
type rowNode struct {
|
||||
row int
|
||||
level int64
|
||||
}
|
||||
childrenStart, childrenEnd := fi.columnChildren.ValueOffsets(0)
|
||||
stack := make([]rowNode, 0, childrenEnd-childrenStart)
|
||||
var childrenValue int64 = 0
|
||||
|
||||
for i := int(childrenStart); i < int(childrenEnd); i++ {
|
||||
child := int(fi.columnChildrenValues.Value(i))
|
||||
childrenValue += fi.columnCumulative(child)
|
||||
stack = append(stack, rowNode{row: child, level: 1})
|
||||
}
|
||||
|
||||
cumulative := fi.columnCumulative(0)
|
||||
fn("total", 0, cumulative, cumulative-childrenValue)
|
||||
|
||||
for {
|
||||
if len(stack) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// shift stack
|
||||
node := stack[0]
|
||||
stack = stack[1:]
|
||||
childrenValue = 0
|
||||
|
||||
// Get the children for this node and add them to the stack if they exist.
|
||||
start, end := fi.columnChildren.ValueOffsets(node.row)
|
||||
children := make([]rowNode, 0, end-start)
|
||||
for i := start; i < end; i++ {
|
||||
child := fi.columnChildrenValues.Value(int(i))
|
||||
if fi.columnChildrenValues.IsValid(int(child)) {
|
||||
childrenValue += fi.columnCumulative(int(child))
|
||||
children = append(children, rowNode{row: int(child), level: node.level + 1})
|
||||
}
|
||||
}
|
||||
// prepend the new children to the top of the stack
|
||||
stack = append(children, stack...)
|
||||
|
||||
cumulative := fi.columnCumulative(node.row)
|
||||
name := fi.nodeName(node.row)
|
||||
fn(name, node.level, cumulative, cumulative-childrenValue)
|
||||
}
|
||||
}
|
||||
|
||||
func (fi *flamegraphIterator) nodeName(row int) string {
|
||||
fi.nameBuilder.Reset()
|
||||
fi.addressBuilder.Reset()
|
||||
|
||||
if fi.columnMappingFile.IsValid(row) {
|
||||
m := fi.columnMappingFileDict.ValueString(fi.columnMappingFile.GetValueIndex(row))
|
||||
fi.nameBuilder.WriteString("[")
|
||||
fi.nameBuilder.WriteString(getLastItem(m))
|
||||
fi.nameBuilder.WriteString("]")
|
||||
fi.nameBuilder.WriteString(" ")
|
||||
}
|
||||
if fi.columnFunctionName.IsValid(row) {
|
||||
if f := fi.columnFunctionNameDict.ValueString(fi.columnFunctionName.GetValueIndex(row)); f != "" {
|
||||
fi.nameBuilder.WriteString(f)
|
||||
return fi.nameBuilder.String()
|
||||
}
|
||||
}
|
||||
|
||||
if fi.columnLocationAddress.IsValid(row) {
|
||||
a := fi.columnLocationAddress.Value(row)
|
||||
fi.addressBuilder.WriteString("0x")
|
||||
fi.addressBuilder.WriteString(strconv.FormatUint(a, 16))
|
||||
}
|
||||
|
||||
if fi.nameBuilder.Len() == 0 && fi.addressBuilder.Len() == 0 {
|
||||
return "<unknown>"
|
||||
} else {
|
||||
return fi.nameBuilder.String() + fi.addressBuilder.String()
|
||||
}
|
||||
}
|
||||
|
||||
// uintValue is a wrapper to read different uint sizes.
|
||||
// Parca returns values encoded depending on the max value in an array.
|
||||
func uintValue(arr arrow.Array) func(i int) int64 {
|
||||
switch b := arr.(type) {
|
||||
case *array.Uint64:
|
||||
return func(i int) int64 {
|
||||
return int64(b.Value(i))
|
||||
}
|
||||
case *array.Uint32:
|
||||
return func(i int) int64 {
|
||||
return int64(b.Value(i))
|
||||
}
|
||||
case *array.Uint16:
|
||||
return func(i int) int64 {
|
||||
return int64(b.Value(i))
|
||||
}
|
||||
case *array.Uint8:
|
||||
return func(i int) int64 {
|
||||
return int64(b.Value(i))
|
||||
}
|
||||
default:
|
||||
panic(fmt.Errorf("unsupported type %T", b))
|
||||
}
|
||||
}
|
||||
|
||||
func getLastItem(path string) string {
|
||||
parts := strings.Split(path, "/")
|
||||
return parts[len(parts)-1]
|
||||
}
|
||||
|
||||
func normalizeUnit(unit string) string {
|
||||
if unit == "nanoseconds" {
|
||||
return "ns"
|
||||
}
|
||||
if unit == "count" {
|
||||
return "short"
|
||||
}
|
||||
return unit
|
||||
}
|
||||
|
@ -1,13 +1,17 @@
|
||||
package parca
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1alpha11 "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/metastore/v1alpha1"
|
||||
profilestore "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/profilestore/v1alpha1"
|
||||
v1alpha1 "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/query/v1alpha1"
|
||||
"github.com/apache/arrow/go/v15/arrow"
|
||||
"github.com/apache/arrow/go/v15/arrow/array"
|
||||
"github.com/apache/arrow/go/v15/arrow/ipc"
|
||||
"github.com/apache/arrow/go/v15/arrow/memory"
|
||||
"github.com/bufbuild/connect-go"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
@ -38,7 +42,7 @@ func Test_query(t *testing.T) {
|
||||
require.Nil(t, resp.Error)
|
||||
require.Equal(t, 2, len(resp.Frames))
|
||||
require.Equal(t, "time", resp.Frames[0].Fields[0].Name)
|
||||
require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3}), resp.Frames[1].Fields[0])
|
||||
require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3, 4, 4}), resp.Frames[1].Fields[0])
|
||||
})
|
||||
|
||||
t.Run("query profile", func(t *testing.T) {
|
||||
@ -46,7 +50,7 @@ func Test_query(t *testing.T) {
|
||||
resp := ds.query(context.Background(), backend.PluginContext{}, dataQuery)
|
||||
require.Nil(t, resp.Error)
|
||||
require.Equal(t, 1, len(resp.Frames))
|
||||
require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3}), resp.Frames[0].Fields[0])
|
||||
require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3, 4, 4}), resp.Frames[0].Fields[0])
|
||||
})
|
||||
|
||||
t.Run("query metrics", func(t *testing.T) {
|
||||
@ -60,22 +64,28 @@ func Test_query(t *testing.T) {
|
||||
|
||||
// This is where the tests for the datasource backend live.
|
||||
func Test_profileToDataFrame(t *testing.T) {
|
||||
frame := responseToDataFrames(flamegraphResponse)
|
||||
frame, err := responseToDataFrames(flamegraphResponse())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 4, len(frame.Fields))
|
||||
require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3}), frame.Fields[0])
|
||||
values := data.NewField("value", nil, []int64{100, 10, 9, 8})
|
||||
values.Config = &data.FieldConfig{
|
||||
Unit: "samples",
|
||||
}
|
||||
require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3, 4, 4}), frame.Fields[0])
|
||||
values := data.NewField("value", nil, []int64{11, 11, 11, 8, 3, 5})
|
||||
values.Config = &data.FieldConfig{Unit: "ns"}
|
||||
require.Equal(t, values, frame.Fields[1])
|
||||
|
||||
self := data.NewField("self", nil, []int64{90, 1, 1, 8})
|
||||
self.Config = &data.FieldConfig{
|
||||
Unit: "samples",
|
||||
}
|
||||
self := data.NewField("self", nil, []int64{0, 0, 3, 0, 3, 5})
|
||||
self.Config = &data.FieldConfig{Unit: "ns"}
|
||||
require.Equal(t, self, frame.Fields[2])
|
||||
|
||||
require.Equal(t, data.NewField("label", nil, []string{"total", "foo", "bar", "baz"}), frame.Fields[3])
|
||||
require.Equal(t, data.NewField("label", nil, []string{"total", "[a] 1", "2", "[a] 3", "[a] 4", "[a] 5"}), frame.Fields[3])
|
||||
}
|
||||
|
||||
func BenchmarkProfileToDataFrame(b *testing.B) {
|
||||
response := flamegraphResponse()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = responseToDataFrames(response)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_seriesToDataFrame(t *testing.T) {
|
||||
@ -121,53 +131,190 @@ var rangeResponse = &connect.Response[v1alpha1.QueryRangeResponse]{
|
||||
},
|
||||
}
|
||||
|
||||
var flamegraphResponse = &connect.Response[v1alpha1.QueryResponse]{
|
||||
Msg: &v1alpha1.QueryResponse{
|
||||
Report: &v1alpha1.QueryResponse_Flamegraph{
|
||||
Flamegraph: &v1alpha1.Flamegraph{
|
||||
Root: &v1alpha1.FlamegraphRootNode{
|
||||
Cumulative: 100,
|
||||
Diff: 0,
|
||||
Children: []*v1alpha1.FlamegraphNode{
|
||||
{
|
||||
Meta: &v1alpha1.FlamegraphNodeMeta{
|
||||
Function: &v1alpha11.Function{
|
||||
Name: "foo",
|
||||
},
|
||||
},
|
||||
Cumulative: 10,
|
||||
Diff: 0,
|
||||
Children: []*v1alpha1.FlamegraphNode{
|
||||
{
|
||||
Meta: &v1alpha1.FlamegraphNodeMeta{
|
||||
Function: &v1alpha11.Function{
|
||||
Name: "bar",
|
||||
},
|
||||
},
|
||||
Cumulative: 9,
|
||||
Diff: 0,
|
||||
Children: []*v1alpha1.FlamegraphNode{
|
||||
{
|
||||
Meta: &v1alpha1.FlamegraphNodeMeta{
|
||||
Function: &v1alpha11.Function{
|
||||
Name: "baz",
|
||||
},
|
||||
},
|
||||
Cumulative: 8,
|
||||
Diff: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Copied from github.com/parca-dev/parca/pkg/query/flamegraph_arrow_test.go
|
||||
type flamegraphRow struct {
|
||||
LabelsOnly bool
|
||||
MappingStart uint64
|
||||
MappingLimit uint64
|
||||
MappingOffset uint64
|
||||
MappingFile string
|
||||
MappingBuildID string
|
||||
LocationAddress uint64
|
||||
Inlined bool
|
||||
LocationLine uint8
|
||||
FunctionStartLine uint8
|
||||
FunctionName string
|
||||
FunctionSystemName string
|
||||
FunctionFilename string
|
||||
Labels map[string]string
|
||||
Children []uint32
|
||||
Cumulative uint8
|
||||
CumulativePerSecond float64
|
||||
Flat uint8
|
||||
FlatPerSecond float64
|
||||
Diff int8
|
||||
}
|
||||
|
||||
type flamegraphColumns struct {
|
||||
labelsOnly []bool
|
||||
mappingFiles []string
|
||||
mappingBuildIDs []string
|
||||
locationAddresses []uint64
|
||||
inlined []bool
|
||||
locationLines []uint8
|
||||
functionStartLines []uint8
|
||||
functionNames []string
|
||||
functionSystemNames []string
|
||||
functionFileNames []string
|
||||
labels []map[string]string
|
||||
children [][]uint32
|
||||
cumulative []uint8
|
||||
cumulativePerSecond []float64
|
||||
flat []uint8
|
||||
flatPerSecond []float64
|
||||
diff []int8
|
||||
}
|
||||
|
||||
func rowsToColumn(rows []flamegraphRow) flamegraphColumns {
|
||||
columns := flamegraphColumns{}
|
||||
for _, row := range rows {
|
||||
columns.labelsOnly = append(columns.labelsOnly, row.LabelsOnly)
|
||||
columns.mappingFiles = append(columns.mappingFiles, row.MappingFile)
|
||||
columns.mappingBuildIDs = append(columns.mappingBuildIDs, row.MappingBuildID)
|
||||
columns.locationAddresses = append(columns.locationAddresses, row.LocationAddress)
|
||||
columns.locationLines = append(columns.locationLines, row.LocationLine)
|
||||
columns.inlined = append(columns.inlined, row.Inlined)
|
||||
columns.functionStartLines = append(columns.functionStartLines, row.FunctionStartLine)
|
||||
columns.functionNames = append(columns.functionNames, row.FunctionName)
|
||||
columns.functionSystemNames = append(columns.functionSystemNames, row.FunctionSystemName)
|
||||
columns.functionFileNames = append(columns.functionFileNames, row.FunctionFilename)
|
||||
columns.labels = append(columns.labels, row.Labels)
|
||||
columns.children = append(columns.children, row.Children)
|
||||
columns.cumulative = append(columns.cumulative, row.Cumulative)
|
||||
columns.cumulativePerSecond = append(columns.cumulativePerSecond, row.CumulativePerSecond)
|
||||
columns.flat = append(columns.flat, row.Flat)
|
||||
columns.flatPerSecond = append(columns.flatPerSecond, row.FlatPerSecond)
|
||||
columns.diff = append(columns.diff, row.Diff)
|
||||
}
|
||||
return columns
|
||||
}
|
||||
|
||||
func flamegraphResponse() *connect.Response[v1alpha1.QueryResponse] {
|
||||
mem := memory.NewGoAllocator()
|
||||
|
||||
rows := []flamegraphRow{
|
||||
{MappingStart: 0, MappingLimit: 0, MappingOffset: 0, MappingFile: array.NullValueStr, MappingBuildID: array.NullValueStr, LocationAddress: 0, LocationLine: 0, FunctionStartLine: 0, FunctionName: array.NullValueStr, FunctionSystemName: array.NullValueStr, FunctionFilename: array.NullValueStr, Cumulative: 11, CumulativePerSecond: 1.1, Flat: 0, FlatPerSecond: 0, Labels: nil, Children: []uint32{1}}, // 0
|
||||
{MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa1, LocationLine: 1, FunctionStartLine: 1, FunctionName: "1", FunctionSystemName: "1", FunctionFilename: "1", Cumulative: 11, CumulativePerSecond: 1.1, Flat: 0, FlatPerSecond: 0, Labels: nil, Children: []uint32{2}}, // 1
|
||||
{MappingStart: 0, MappingLimit: 0, MappingOffset: 0, MappingFile: array.NullValueStr, MappingBuildID: array.NullValueStr, LocationAddress: 0x0, LocationLine: 0, FunctionStartLine: 0, FunctionName: "2", FunctionSystemName: array.NullValueStr, FunctionFilename: array.NullValueStr, Cumulative: 11, CumulativePerSecond: 1.1, Flat: 3, FlatPerSecond: 0.3, Labels: nil, Children: []uint32{3}}, // 2
|
||||
{MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa3, LocationLine: 3, FunctionStartLine: 3, FunctionName: "3", FunctionSystemName: "3", FunctionFilename: "3", Cumulative: 8, CumulativePerSecond: 0.8, Flat: 0, FlatPerSecond: 0, Labels: nil, Children: []uint32{4, 5}}, // 3
|
||||
{MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa4, LocationLine: 4, FunctionStartLine: 4, FunctionName: "4", FunctionSystemName: "4", FunctionFilename: "4", Cumulative: 3, CumulativePerSecond: 0.3, Flat: 3, FlatPerSecond: 0.3, Labels: nil, Children: nil}, // 4
|
||||
{MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa5, LocationLine: 5, FunctionStartLine: 5, FunctionName: "5", FunctionSystemName: "5", FunctionFilename: "5", Cumulative: 5, CumulativePerSecond: 0.5, Flat: 5, FlatPerSecond: 0.5, Labels: nil, Children: nil}, // 5
|
||||
}
|
||||
columns := rowsToColumn(rows)
|
||||
|
||||
// This is a copy from Parca. A lot of fields aren't used by the Grafana datasource but are kept to make future updates easier.
|
||||
fields := []arrow.Field{
|
||||
// Location
|
||||
//{Name: FlamegraphFieldLabelsOnly, Type: arrow.FixedWidthTypes.Boolean},
|
||||
{Name: FlamegraphFieldLocationAddress, Type: arrow.PrimitiveTypes.Uint64},
|
||||
{Name: FlamegraphFieldMappingFile, Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.Binary}},
|
||||
//{Name: FlamegraphFieldMappingBuildID, Type: fb.mappingBuildID.DataType()},
|
||||
// Function
|
||||
//{Name: FlamegraphFieldLocationLine, Type: fb.trimmedLocationLine.Type()},
|
||||
//{Name: FlamegraphFieldInlined, Type: arrow.FixedWidthTypes.Boolean, Nullable: true},
|
||||
//{Name: FlamegraphFieldFunctionStartLine, Type: fb.trimmedFunctionStartLine.Type()},
|
||||
{Name: FlamegraphFieldFunctionName, Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.Binary}},
|
||||
//{Name: FlamegraphFieldFunctionSystemName, Type: fb.functionSystemName.DataType()},
|
||||
//{Name: FlamegraphFieldFunctionFileName, Type: fb.functionFilename.DataType()},
|
||||
// Values
|
||||
{Name: FlamegraphFieldChildren, Type: arrow.ListOf(arrow.PrimitiveTypes.Uint32)},
|
||||
{Name: FlamegraphFieldCumulative, Type: arrow.PrimitiveTypes.Uint8},
|
||||
//{Name: FlamegraphFieldCumulativePerSecond, Type: arrow.PrimitiveTypes.Float64},
|
||||
{Name: FlamegraphFieldFlat, Type: arrow.PrimitiveTypes.Uint8},
|
||||
//{Name: FlamegraphFieldFlatPerSecond, Type: arrow.PrimitiveTypes.Float64},
|
||||
//{Name: FlamegraphFieldDiff, Type: fb.trimmedDiff.Type()},
|
||||
//{Name: FlamegraphFieldDiffPerSecond, Type: arrow.PrimitiveTypes.Float64},
|
||||
}
|
||||
|
||||
builderLocationAddress := array.NewUint64Builder(mem)
|
||||
builderMappingFile := array.NewDictionaryBuilder(mem, fields[1].Type.(*arrow.DictionaryType)).(*array.BinaryDictionaryBuilder)
|
||||
builderFunctionName := array.NewDictionaryBuilder(mem, fields[2].Type.(*arrow.DictionaryType)).(*array.BinaryDictionaryBuilder)
|
||||
builderChildren := array.NewListBuilder(mem, arrow.PrimitiveTypes.Uint32)
|
||||
builderChildrenValues := builderChildren.ValueBuilder().(*array.Uint32Builder)
|
||||
builderCumulative := array.NewUint8Builder(mem)
|
||||
builderFlat := array.NewUint8Builder(mem)
|
||||
|
||||
defer func() {
|
||||
builderLocationAddress.Release()
|
||||
builderMappingFile.Release()
|
||||
builderFunctionName.Release()
|
||||
builderChildren.Release()
|
||||
builderCumulative.Release()
|
||||
builderFlat.Release()
|
||||
}()
|
||||
|
||||
for i := range columns.cumulative { // iterate over all rows in one of the columns
|
||||
if columns.mappingFiles[i] == array.NullValueStr {
|
||||
builderMappingFile.AppendNull()
|
||||
} else {
|
||||
_ = builderMappingFile.AppendString(columns.mappingFiles[i])
|
||||
}
|
||||
if columns.functionNames[i] == array.NullValueStr {
|
||||
builderFunctionName.AppendNull()
|
||||
} else {
|
||||
_ = builderFunctionName.AppendString(columns.functionNames[i])
|
||||
}
|
||||
if len(columns.children[i]) == 0 {
|
||||
builderChildren.AppendNull()
|
||||
} else {
|
||||
builderChildren.Append(true)
|
||||
for _, child := range columns.children[i] {
|
||||
builderChildrenValues.Append(child)
|
||||
}
|
||||
}
|
||||
|
||||
builderLocationAddress.Append(columns.locationAddresses[i])
|
||||
builderCumulative.Append(columns.cumulative[i])
|
||||
builderFlat.Append(columns.flat[i])
|
||||
}
|
||||
|
||||
record := array.NewRecord(
|
||||
arrow.NewSchema(fields, nil),
|
||||
[]arrow.Array{
|
||||
builderLocationAddress.NewArray(),
|
||||
builderMappingFile.NewArray(),
|
||||
builderFunctionName.NewArray(),
|
||||
builderChildren.NewArray(),
|
||||
builderCumulative.NewArray(),
|
||||
builderFlat.NewArray(),
|
||||
},
|
||||
int64(len(rows)),
|
||||
)
|
||||
|
||||
var buf bytes.Buffer
|
||||
w := ipc.NewWriter(&buf,
|
||||
ipc.WithSchema(record.Schema()),
|
||||
)
|
||||
defer func() {
|
||||
_ = w.Close()
|
||||
}()
|
||||
|
||||
if err := w.Write(record); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &connect.Response[v1alpha1.QueryResponse]{
|
||||
Msg: &v1alpha1.QueryResponse{
|
||||
Report: &v1alpha1.QueryResponse_FlamegraphArrow{
|
||||
FlamegraphArrow: &v1alpha1.FlamegraphArrow{
|
||||
Record: buf.Bytes(),
|
||||
Unit: "nanoseconds",
|
||||
Height: 2,
|
||||
Trimmed: 0,
|
||||
},
|
||||
Total: 100,
|
||||
Unit: "samples",
|
||||
Height: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type FakeClient struct {
|
||||
@ -180,16 +327,16 @@ func (f *FakeClient) QueryRange(ctx context.Context, c *connect.Request[v1alpha1
|
||||
|
||||
func (f *FakeClient) Query(ctx context.Context, c *connect.Request[v1alpha1.QueryRequest]) (*connect.Response[v1alpha1.QueryResponse], error) {
|
||||
f.Req = c
|
||||
return flamegraphResponse, nil
|
||||
return flamegraphResponse(), nil
|
||||
}
|
||||
|
||||
func (f *FakeClient) Series(ctx context.Context, c *connect.Request[v1alpha1.SeriesRequest]) (*connect.Response[v1alpha1.SeriesResponse], error) {
|
||||
//TODO implement me
|
||||
// TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (f *FakeClient) ProfileTypes(ctx context.Context, c *connect.Request[v1alpha1.ProfileTypesRequest]) (*connect.Response[v1alpha1.ProfileTypesResponse], error) {
|
||||
//TODO implement me
|
||||
// TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@ -212,6 +359,6 @@ func (f *FakeClient) Values(ctx context.Context, c *connect.Request[v1alpha1.Val
|
||||
}
|
||||
|
||||
func (f *FakeClient) ShareProfile(ctx context.Context, c *connect.Request[v1alpha1.ShareProfileRequest]) (*connect.Response[v1alpha1.ShareProfileResponse], error) {
|
||||
//TODO implement me
|
||||
// TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user