diff --git a/docs/sources/datasources/parca.md b/docs/sources/datasources/parca.md index afc90846aea..69aeda93851 100644 --- a/docs/sources/datasources/parca.md +++ b/docs/sources/datasources/parca.md @@ -37,6 +37,12 @@ refs: Grafana ships with built-in support for Parca, a continuous profiling OSS database for analysis of CPU and memory usage, down to the line number and throughout time. Add it as a data source, and you are ready to query your profiles in [Explore](ref:explore). +## Supported Parca versions + +This data source supports these versions of Parca: + +- v0.19+ + ## Configure the Parca data source To configure basic settings for the data source, complete the following steps: diff --git a/go.mod b/go.mod index 755e25db2e4..2325cd13dc8 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,8 @@ replace cuelang.org/go => github.com/grafana/cue v0.0.0-20230926092038-971951014 replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.52.0 require ( - buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.4.1-20221222094228-8b1d3d0f62e6.1 // @grafana/observability-traces-and-profiling - buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.33.0-20240414232344-9ca06271cb73.1 // @grafana/observability-traces-and-profiling + buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.10.0-20240523185345-933eab74d046.1 // @grafana/observability-traces-and-profiling + buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.1-20240523185345-933eab74d046.1 // @grafana/observability-traces-and-profiling cloud.google.com/go/kms v1.15.7 // @grafana/grafana-backend-group cloud.google.com/go/storage v1.38.0 // @grafana/grafana-backend-group cuelang.org/go v0.6.0-0.dev // @grafana/grafana-as-code diff --git a/go.sum b/go.sum index 1a4fb31516e..984aaf81fd4 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.4.1-20221222094228-8b1d3d0f62e6.1 h1:wQ75SnlaD0X30PnrmA+07A/5fnQWrAHy1mzv+CPB5Oo= -buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.4.1-20221222094228-8b1d3d0f62e6.1/go.mod h1:VYzBTKhjl92cl3sv+xznQcJHCezU7qnI0FhBAUb4n8c= -buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.33.0-20240414232344-9ca06271cb73.1 h1:arEscdzM2EZPZT8x7tSzuBVgyZrnsKuvoftapClxUgw= -buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.33.0-20240414232344-9ca06271cb73.1/go.mod h1:/vvnaG5MGgbuJxYTucpo0QOom9xbSb6Y43za3/as9qk= +buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.10.0-20240523185345-933eab74d046.1 h1:q6MBjQ5fj3ygW/ySxOck7iwLhPQWbzOKJRZYzXRBmBk= +buf.build/gen/go/parca-dev/parca/bufbuild/connect-go v1.10.0-20240523185345-933eab74d046.1/go.mod h1:DEC5vsD4oLn7c6QeBVfUS7WpW5iwmW5lXfhpsjVxVt0= +buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.1-20240523185345-933eab74d046.1 h1:Osqg+/+sFJKr5iyna6/AvyxXPY0jqaIGr3ltzzSDLRk= +buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.1-20240523185345-933eab74d046.1/go.mod h1:lbDqoSeErWK6pETEKo/LO+JmU2GbZqVE8ILESypLuZU= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw= @@ -4631,6 +4631,7 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/pkg/tsdb/parca/query.go b/pkg/tsdb/parca/query.go index 97e6a3e1ec5..9423ad6e62f 100644 --- a/pkg/tsdb/parca/query.go +++ b/pkg/tsdb/parca/query.go @@ -1,23 +1,29 @@ package parca import ( + "bytes" "context" "encoding/json" "fmt" + "strconv" "strings" "time" v1alpha1 "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/query/v1alpha1" + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/ipc" "github.com/bufbuild/connect-go" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/tracing" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" - "github.com/grafana/grafana/pkg/tsdb/parca/kinds/dataquery" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" + "github.com/grafana/grafana/pkg/tsdb/parca/kinds/dataquery" ) type queryModel struct { @@ -65,13 +71,25 @@ func (d *ParcaDatasource) query(ctx context.Context, pCtx backend.PluginContext, ctxLogger.Debug("Querying SelectMergeStacktraces()", "queryModel", qm, "function", logEntrypoint()) resp, err := d.client.Query(ctx, makeProfileRequest(qm, query)) if err != nil { - response.Error = err + if strings.Contains(err.Error(), "invalid report type") { + response.Error = fmt.Errorf("try updating Parca to v0.19+: %v", err) + } else { + response.Error = err + } + ctxLogger.Error("Failed to process query", "error", err, "queryType", query.QueryType, "function", logEntrypoint()) span.RecordError(response.Error) span.SetStatus(codes.Error, response.Error.Error()) return response } - frame := responseToDataFrames(resp) + frame, err := responseToDataFrames(resp) + if err != nil { + response.Error = err + ctxLogger.Error("Failed to convert the response to a data frame", "error", err, "queryType", query.QueryType) + span.RecordError(response.Error) + span.SetStatus(codes.Error, response.Error.Error()) + return response + } response.Frames = append(response.Frames, frame) } @@ -93,9 +111,8 @@ func makeProfileRequest(qm queryModel, query backend.DataQuery) *connect.Request }, }, }, - // We should change this to QueryRequest_REPORT_TYPE_FLAMEGRAPH_TABLE later on // nolint:staticcheck - ReportType: v1alpha1.QueryRequest_REPORT_TYPE_FLAMEGRAPH_UNSPECIFIED, + ReportType: v1alpha1.QueryRequest_REPORT_TYPE_FLAMEGRAPH_ARROW, }, } } @@ -122,120 +139,14 @@ type CustomMeta struct { // responseToDataFrames turns Parca response to data.Frame. We encode the data into a nested set format where we have // [level, value, label] columns and by ordering the items in a depth first traversal order we can recreate the whole // tree back. -func responseToDataFrames(resp *connect.Response[v1alpha1.QueryResponse]) *data.Frame { - if flameResponse, ok := resp.Msg.Report.(*v1alpha1.QueryResponse_Flamegraph); ok { - frame := treeToNestedSetDataFrame(flameResponse.Flamegraph) - frame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"} - return frame +func responseToDataFrames(resp *connect.Response[v1alpha1.QueryResponse]) (*data.Frame, error) { + if flameResponse, ok := resp.Msg.Report.(*v1alpha1.QueryResponse_FlamegraphArrow); ok { + return arrowToNestedSetDataFrame(flameResponse.FlamegraphArrow) } else { - panic("unknown report type returned from query") + return nil, fmt.Errorf("unknown report type returned from query. update parca") } } -// treeToNestedSetDataFrame walks the tree depth first and adds items into the dataframe. This is a nested set format -// where by ordering the items in depth first order and knowing the level/depth of each item we can recreate the -// parent - child relationship without explicitly needing parent/child column and we can later just iterate over the -// dataFrame to again basically walking depth first over the tree/profile. -func treeToNestedSetDataFrame(tree *v1alpha1.Flamegraph) *data.Frame { - frame := data.NewFrame("response") - - levelField := data.NewField("level", nil, []int64{}) - valueField := data.NewField("value", nil, []int64{}) - valueField.Config = &data.FieldConfig{Unit: normalizeUnit(tree.Unit)} - selfField := data.NewField("self", nil, []int64{}) - selfField.Config = &data.FieldConfig{Unit: normalizeUnit(tree.Unit)} - labelField := data.NewField("label", nil, []string{}) - frame.Fields = data.Fields{levelField, valueField, selfField, labelField} - - walkTree(tree.Root, func(level int64, value int64, name string, self int64) { - levelField.Append(level) - valueField.Append(value) - labelField.Append(name) - selfField.Append(self) - }) - return frame -} - -type Node struct { - Node *v1alpha1.FlamegraphNode - Level int64 -} - -func walkTree(tree *v1alpha1.FlamegraphRootNode, fn func(level int64, value int64, name string, self int64)) { - stack := make([]*Node, 0, len(tree.Children)) - var childrenValue int64 = 0 - - for _, child := range tree.Children { - childrenValue += child.Cumulative - stack = append(stack, &Node{Node: child, Level: 1}) - } - - fn(0, tree.Cumulative, "total", tree.Cumulative-childrenValue) - - for { - if len(stack) == 0 { - break - } - - // shift stack - node := stack[0] - stack = stack[1:] - childrenValue = 0 - - if node.Node.Children != nil { - var children []*Node - for _, child := range node.Node.Children { - childrenValue += child.Cumulative - children = append(children, &Node{Node: child, Level: node.Level + 1}) - } - // Put the children first so we do depth first traversal - stack = append(children, stack...) - } - fn(node.Level, node.Node.Cumulative, nodeName(node.Node), node.Node.Cumulative-childrenValue) - } -} - -func nodeName(node *v1alpha1.FlamegraphNode) string { - if node.Meta == nil { - return "" - } - - mapping := "" - if node.Meta.Mapping != nil && node.Meta.Mapping.File != "" { - mapping = "[" + getLastItem(node.Meta.Mapping.File) + "] " - } - - if node.Meta.Function != nil && node.Meta.Function.Name != "" { - return mapping + node.Meta.Function.Name - } - - address := "" - if node.Meta.Location != nil { - address = fmt.Sprintf("0x%x", node.Meta.Location.Address) - } - - if mapping == "" && address == "" { - return "" - } else { - return mapping + address - } -} - -func getLastItem(path string) string { - parts := strings.Split(path, "/") - return parts[len(parts)-1] -} - -func normalizeUnit(unit string) string { - if unit == "nanoseconds" { - return "ns" - } - if unit == "count" { - return "short" - } - return unit -} - func seriesToDataFrame(seriesResp *connect.Response[v1alpha1.QueryRangeResponse], profileTypeID string) []*data.Frame { frames := make([]*data.Frame, 0, len(seriesResp.Msg.Series)) @@ -266,3 +177,208 @@ func seriesToDataFrame(seriesResp *connect.Response[v1alpha1.QueryRangeResponse] return frames } + +func arrowToNestedSetDataFrame(flamegraph *v1alpha1.FlamegraphArrow) (*data.Frame, error) { + frame := data.NewFrame("response") + frame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"} + + levelField := data.NewField("level", nil, []int64{}) + valueField := data.NewField("value", nil, []int64{}) + valueField.Config = &data.FieldConfig{Unit: normalizeUnit(flamegraph.Unit)} + selfField := data.NewField("self", nil, []int64{}) + selfField.Config = &data.FieldConfig{Unit: normalizeUnit(flamegraph.Unit)} + labelField := data.NewField("label", nil, []string{}) + frame.Fields = data.Fields{levelField, valueField, selfField, labelField} + + arrowReader, err := ipc.NewReader(bytes.NewBuffer(flamegraph.GetRecord())) + if err != nil { + return nil, err + } + defer arrowReader.Release() + + arrowReader.Next() + rec := arrowReader.Record() + + fi, err := newFlamegraphIterator(rec) + if err != nil { + return nil, fmt.Errorf("failed to create flamegraph iterator: %w", err) + } + + fi.iterate(func(name string, level, value, self int64) { + labelField.Append(name) + levelField.Append(level) + valueField.Append(value) + selfField.Append(self) + }) + + return frame, nil +} + +const ( + FlamegraphFieldMappingFile = "mapping_file" + FlamegraphFieldLocationAddress = "location_address" + FlamegraphFieldFunctionName = "function_name" + FlamegraphFieldChildren = "children" + FlamegraphFieldCumulative = "cumulative" + FlamegraphFieldFlat = "flat" +) + +type flamegraphIterator struct { + columnChildren *array.List + columnChildrenValues *array.Uint32 + columnCumulative func(i int) int64 + columnMappingFile *array.Dictionary + columnMappingFileDict *array.Binary + columnFunctionName *array.Dictionary + columnFunctionNameDict *array.Binary + columnLocationAddress *array.Uint64 + + nameBuilder *bytes.Buffer + addressBuilder *bytes.Buffer +} + +func newFlamegraphIterator(rec arrow.Record) (*flamegraphIterator, error) { + schema := rec.Schema() + + columnChildren := rec.Column(schema.FieldIndices(FlamegraphFieldChildren)[0]).(*array.List) + columnChildrenValues := columnChildren.ListValues().(*array.Uint32) + columnCumulative := uintValue(rec.Column(schema.FieldIndices(FlamegraphFieldCumulative)[0])) + + columnMappingFile := rec.Column(schema.FieldIndices(FlamegraphFieldMappingFile)[0]).(*array.Dictionary) + columnMappingFileDict := columnMappingFile.Dictionary().(*array.Binary) + columnFunctionName := rec.Column(schema.FieldIndices(FlamegraphFieldFunctionName)[0]).(*array.Dictionary) + columnFunctionNameDict := columnFunctionName.Dictionary().(*array.Binary) + columnLocationAddress := rec.Column(schema.FieldIndices(FlamegraphFieldLocationAddress)[0]).(*array.Uint64) + + return &flamegraphIterator{ + columnChildren: columnChildren, + columnChildrenValues: columnChildrenValues, + columnCumulative: columnCumulative, + columnMappingFile: columnMappingFile, + columnMappingFileDict: columnMappingFileDict, + columnFunctionName: columnFunctionName, + columnFunctionNameDict: columnFunctionNameDict, + columnLocationAddress: columnLocationAddress, + + nameBuilder: &bytes.Buffer{}, + addressBuilder: &bytes.Buffer{}, + }, nil +} + +func (fi *flamegraphIterator) iterate(fn func(name string, level, value, self int64)) { + type rowNode struct { + row int + level int64 + } + childrenStart, childrenEnd := fi.columnChildren.ValueOffsets(0) + stack := make([]rowNode, 0, childrenEnd-childrenStart) + var childrenValue int64 = 0 + + for i := int(childrenStart); i < int(childrenEnd); i++ { + child := int(fi.columnChildrenValues.Value(i)) + childrenValue += fi.columnCumulative(child) + stack = append(stack, rowNode{row: child, level: 1}) + } + + cumulative := fi.columnCumulative(0) + fn("total", 0, cumulative, cumulative-childrenValue) + + for { + if len(stack) == 0 { + break + } + + // shift stack + node := stack[0] + stack = stack[1:] + childrenValue = 0 + + // Get the children for this node and add them to the stack if they exist. + start, end := fi.columnChildren.ValueOffsets(node.row) + children := make([]rowNode, 0, end-start) + for i := start; i < end; i++ { + child := fi.columnChildrenValues.Value(int(i)) + if fi.columnChildrenValues.IsValid(int(child)) { + childrenValue += fi.columnCumulative(int(child)) + children = append(children, rowNode{row: int(child), level: node.level + 1}) + } + } + // prepend the new children to the top of the stack + stack = append(children, stack...) + + cumulative := fi.columnCumulative(node.row) + name := fi.nodeName(node.row) + fn(name, node.level, cumulative, cumulative-childrenValue) + } +} + +func (fi *flamegraphIterator) nodeName(row int) string { + fi.nameBuilder.Reset() + fi.addressBuilder.Reset() + + if fi.columnMappingFile.IsValid(row) { + m := fi.columnMappingFileDict.ValueString(fi.columnMappingFile.GetValueIndex(row)) + fi.nameBuilder.WriteString("[") + fi.nameBuilder.WriteString(getLastItem(m)) + fi.nameBuilder.WriteString("]") + fi.nameBuilder.WriteString(" ") + } + if fi.columnFunctionName.IsValid(row) { + if f := fi.columnFunctionNameDict.ValueString(fi.columnFunctionName.GetValueIndex(row)); f != "" { + fi.nameBuilder.WriteString(f) + return fi.nameBuilder.String() + } + } + + if fi.columnLocationAddress.IsValid(row) { + a := fi.columnLocationAddress.Value(row) + fi.addressBuilder.WriteString("0x") + fi.addressBuilder.WriteString(strconv.FormatUint(a, 16)) + } + + if fi.nameBuilder.Len() == 0 && fi.addressBuilder.Len() == 0 { + return "" + } else { + return fi.nameBuilder.String() + fi.addressBuilder.String() + } +} + +// uintValue is a wrapper to read different uint sizes. +// Parca returns values encoded depending on the max value in an array. +func uintValue(arr arrow.Array) func(i int) int64 { + switch b := arr.(type) { + case *array.Uint64: + return func(i int) int64 { + return int64(b.Value(i)) + } + case *array.Uint32: + return func(i int) int64 { + return int64(b.Value(i)) + } + case *array.Uint16: + return func(i int) int64 { + return int64(b.Value(i)) + } + case *array.Uint8: + return func(i int) int64 { + return int64(b.Value(i)) + } + default: + panic(fmt.Errorf("unsupported type %T", b)) + } +} + +func getLastItem(path string) string { + parts := strings.Split(path, "/") + return parts[len(parts)-1] +} + +func normalizeUnit(unit string) string { + if unit == "nanoseconds" { + return "ns" + } + if unit == "count" { + return "short" + } + return unit +} diff --git a/pkg/tsdb/parca/query_test.go b/pkg/tsdb/parca/query_test.go index f4a035f0449..1f1c7a6638d 100644 --- a/pkg/tsdb/parca/query_test.go +++ b/pkg/tsdb/parca/query_test.go @@ -1,13 +1,17 @@ package parca import ( + "bytes" "context" "testing" "time" - v1alpha11 "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/metastore/v1alpha1" profilestore "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/profilestore/v1alpha1" v1alpha1 "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/query/v1alpha1" + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/ipc" + "github.com/apache/arrow/go/v15/arrow/memory" "github.com/bufbuild/connect-go" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" @@ -38,7 +42,7 @@ func Test_query(t *testing.T) { require.Nil(t, resp.Error) require.Equal(t, 2, len(resp.Frames)) require.Equal(t, "time", resp.Frames[0].Fields[0].Name) - require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3}), resp.Frames[1].Fields[0]) + require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3, 4, 4}), resp.Frames[1].Fields[0]) }) t.Run("query profile", func(t *testing.T) { @@ -46,7 +50,7 @@ func Test_query(t *testing.T) { resp := ds.query(context.Background(), backend.PluginContext{}, dataQuery) require.Nil(t, resp.Error) require.Equal(t, 1, len(resp.Frames)) - require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3}), resp.Frames[0].Fields[0]) + require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3, 4, 4}), resp.Frames[0].Fields[0]) }) t.Run("query metrics", func(t *testing.T) { @@ -60,22 +64,28 @@ func Test_query(t *testing.T) { // This is where the tests for the datasource backend live. func Test_profileToDataFrame(t *testing.T) { - frame := responseToDataFrames(flamegraphResponse) + frame, err := responseToDataFrames(flamegraphResponse()) + require.NoError(t, err) require.Equal(t, 4, len(frame.Fields)) - require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3}), frame.Fields[0]) - values := data.NewField("value", nil, []int64{100, 10, 9, 8}) - values.Config = &data.FieldConfig{ - Unit: "samples", - } + require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2, 3, 4, 4}), frame.Fields[0]) + values := data.NewField("value", nil, []int64{11, 11, 11, 8, 3, 5}) + values.Config = &data.FieldConfig{Unit: "ns"} require.Equal(t, values, frame.Fields[1]) - self := data.NewField("self", nil, []int64{90, 1, 1, 8}) - self.Config = &data.FieldConfig{ - Unit: "samples", - } + self := data.NewField("self", nil, []int64{0, 0, 3, 0, 3, 5}) + self.Config = &data.FieldConfig{Unit: "ns"} require.Equal(t, self, frame.Fields[2]) - require.Equal(t, data.NewField("label", nil, []string{"total", "foo", "bar", "baz"}), frame.Fields[3]) + require.Equal(t, data.NewField("label", nil, []string{"total", "[a] 1", "2", "[a] 3", "[a] 4", "[a] 5"}), frame.Fields[3]) +} + +func BenchmarkProfileToDataFrame(b *testing.B) { + response := flamegraphResponse() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, _ = responseToDataFrames(response) + } } func Test_seriesToDataFrame(t *testing.T) { @@ -121,53 +131,190 @@ var rangeResponse = &connect.Response[v1alpha1.QueryRangeResponse]{ }, } -var flamegraphResponse = &connect.Response[v1alpha1.QueryResponse]{ - Msg: &v1alpha1.QueryResponse{ - Report: &v1alpha1.QueryResponse_Flamegraph{ - Flamegraph: &v1alpha1.Flamegraph{ - Root: &v1alpha1.FlamegraphRootNode{ - Cumulative: 100, - Diff: 0, - Children: []*v1alpha1.FlamegraphNode{ - { - Meta: &v1alpha1.FlamegraphNodeMeta{ - Function: &v1alpha11.Function{ - Name: "foo", - }, - }, - Cumulative: 10, - Diff: 0, - Children: []*v1alpha1.FlamegraphNode{ - { - Meta: &v1alpha1.FlamegraphNodeMeta{ - Function: &v1alpha11.Function{ - Name: "bar", - }, - }, - Cumulative: 9, - Diff: 0, - Children: []*v1alpha1.FlamegraphNode{ - { - Meta: &v1alpha1.FlamegraphNodeMeta{ - Function: &v1alpha11.Function{ - Name: "baz", - }, - }, - Cumulative: 8, - Diff: 0, - }, - }, - }, - }, - }, - }, +// Copied from github.com/parca-dev/parca/pkg/query/flamegraph_arrow_test.go +type flamegraphRow struct { + LabelsOnly bool + MappingStart uint64 + MappingLimit uint64 + MappingOffset uint64 + MappingFile string + MappingBuildID string + LocationAddress uint64 + Inlined bool + LocationLine uint8 + FunctionStartLine uint8 + FunctionName string + FunctionSystemName string + FunctionFilename string + Labels map[string]string + Children []uint32 + Cumulative uint8 + CumulativePerSecond float64 + Flat uint8 + FlatPerSecond float64 + Diff int8 +} + +type flamegraphColumns struct { + labelsOnly []bool + mappingFiles []string + mappingBuildIDs []string + locationAddresses []uint64 + inlined []bool + locationLines []uint8 + functionStartLines []uint8 + functionNames []string + functionSystemNames []string + functionFileNames []string + labels []map[string]string + children [][]uint32 + cumulative []uint8 + cumulativePerSecond []float64 + flat []uint8 + flatPerSecond []float64 + diff []int8 +} + +func rowsToColumn(rows []flamegraphRow) flamegraphColumns { + columns := flamegraphColumns{} + for _, row := range rows { + columns.labelsOnly = append(columns.labelsOnly, row.LabelsOnly) + columns.mappingFiles = append(columns.mappingFiles, row.MappingFile) + columns.mappingBuildIDs = append(columns.mappingBuildIDs, row.MappingBuildID) + columns.locationAddresses = append(columns.locationAddresses, row.LocationAddress) + columns.locationLines = append(columns.locationLines, row.LocationLine) + columns.inlined = append(columns.inlined, row.Inlined) + columns.functionStartLines = append(columns.functionStartLines, row.FunctionStartLine) + columns.functionNames = append(columns.functionNames, row.FunctionName) + columns.functionSystemNames = append(columns.functionSystemNames, row.FunctionSystemName) + columns.functionFileNames = append(columns.functionFileNames, row.FunctionFilename) + columns.labels = append(columns.labels, row.Labels) + columns.children = append(columns.children, row.Children) + columns.cumulative = append(columns.cumulative, row.Cumulative) + columns.cumulativePerSecond = append(columns.cumulativePerSecond, row.CumulativePerSecond) + columns.flat = append(columns.flat, row.Flat) + columns.flatPerSecond = append(columns.flatPerSecond, row.FlatPerSecond) + columns.diff = append(columns.diff, row.Diff) + } + return columns +} + +func flamegraphResponse() *connect.Response[v1alpha1.QueryResponse] { + mem := memory.NewGoAllocator() + + rows := []flamegraphRow{ + {MappingStart: 0, MappingLimit: 0, MappingOffset: 0, MappingFile: array.NullValueStr, MappingBuildID: array.NullValueStr, LocationAddress: 0, LocationLine: 0, FunctionStartLine: 0, FunctionName: array.NullValueStr, FunctionSystemName: array.NullValueStr, FunctionFilename: array.NullValueStr, Cumulative: 11, CumulativePerSecond: 1.1, Flat: 0, FlatPerSecond: 0, Labels: nil, Children: []uint32{1}}, // 0 + {MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa1, LocationLine: 1, FunctionStartLine: 1, FunctionName: "1", FunctionSystemName: "1", FunctionFilename: "1", Cumulative: 11, CumulativePerSecond: 1.1, Flat: 0, FlatPerSecond: 0, Labels: nil, Children: []uint32{2}}, // 1 + {MappingStart: 0, MappingLimit: 0, MappingOffset: 0, MappingFile: array.NullValueStr, MappingBuildID: array.NullValueStr, LocationAddress: 0x0, LocationLine: 0, FunctionStartLine: 0, FunctionName: "2", FunctionSystemName: array.NullValueStr, FunctionFilename: array.NullValueStr, Cumulative: 11, CumulativePerSecond: 1.1, Flat: 3, FlatPerSecond: 0.3, Labels: nil, Children: []uint32{3}}, // 2 + {MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa3, LocationLine: 3, FunctionStartLine: 3, FunctionName: "3", FunctionSystemName: "3", FunctionFilename: "3", Cumulative: 8, CumulativePerSecond: 0.8, Flat: 0, FlatPerSecond: 0, Labels: nil, Children: []uint32{4, 5}}, // 3 + {MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa4, LocationLine: 4, FunctionStartLine: 4, FunctionName: "4", FunctionSystemName: "4", FunctionFilename: "4", Cumulative: 3, CumulativePerSecond: 0.3, Flat: 3, FlatPerSecond: 0.3, Labels: nil, Children: nil}, // 4 + {MappingStart: 1, MappingLimit: 1, MappingOffset: 0x1234, MappingFile: "a", MappingBuildID: "aID", LocationAddress: 0xa5, LocationLine: 5, FunctionStartLine: 5, FunctionName: "5", FunctionSystemName: "5", FunctionFilename: "5", Cumulative: 5, CumulativePerSecond: 0.5, Flat: 5, FlatPerSecond: 0.5, Labels: nil, Children: nil}, // 5 + } + columns := rowsToColumn(rows) + + // This is a copy from Parca. A lot of fields aren't used by the Grafana datasource but are kept to make future updates easier. + fields := []arrow.Field{ + // Location + //{Name: FlamegraphFieldLabelsOnly, Type: arrow.FixedWidthTypes.Boolean}, + {Name: FlamegraphFieldLocationAddress, Type: arrow.PrimitiveTypes.Uint64}, + {Name: FlamegraphFieldMappingFile, Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.Binary}}, + //{Name: FlamegraphFieldMappingBuildID, Type: fb.mappingBuildID.DataType()}, + // Function + //{Name: FlamegraphFieldLocationLine, Type: fb.trimmedLocationLine.Type()}, + //{Name: FlamegraphFieldInlined, Type: arrow.FixedWidthTypes.Boolean, Nullable: true}, + //{Name: FlamegraphFieldFunctionStartLine, Type: fb.trimmedFunctionStartLine.Type()}, + {Name: FlamegraphFieldFunctionName, Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.Binary}}, + //{Name: FlamegraphFieldFunctionSystemName, Type: fb.functionSystemName.DataType()}, + //{Name: FlamegraphFieldFunctionFileName, Type: fb.functionFilename.DataType()}, + // Values + {Name: FlamegraphFieldChildren, Type: arrow.ListOf(arrow.PrimitiveTypes.Uint32)}, + {Name: FlamegraphFieldCumulative, Type: arrow.PrimitiveTypes.Uint8}, + //{Name: FlamegraphFieldCumulativePerSecond, Type: arrow.PrimitiveTypes.Float64}, + {Name: FlamegraphFieldFlat, Type: arrow.PrimitiveTypes.Uint8}, + //{Name: FlamegraphFieldFlatPerSecond, Type: arrow.PrimitiveTypes.Float64}, + //{Name: FlamegraphFieldDiff, Type: fb.trimmedDiff.Type()}, + //{Name: FlamegraphFieldDiffPerSecond, Type: arrow.PrimitiveTypes.Float64}, + } + + builderLocationAddress := array.NewUint64Builder(mem) + builderMappingFile := array.NewDictionaryBuilder(mem, fields[1].Type.(*arrow.DictionaryType)).(*array.BinaryDictionaryBuilder) + builderFunctionName := array.NewDictionaryBuilder(mem, fields[2].Type.(*arrow.DictionaryType)).(*array.BinaryDictionaryBuilder) + builderChildren := array.NewListBuilder(mem, arrow.PrimitiveTypes.Uint32) + builderChildrenValues := builderChildren.ValueBuilder().(*array.Uint32Builder) + builderCumulative := array.NewUint8Builder(mem) + builderFlat := array.NewUint8Builder(mem) + + defer func() { + builderLocationAddress.Release() + builderMappingFile.Release() + builderFunctionName.Release() + builderChildren.Release() + builderCumulative.Release() + builderFlat.Release() + }() + + for i := range columns.cumulative { // iterate over all rows in one of the columns + if columns.mappingFiles[i] == array.NullValueStr { + builderMappingFile.AppendNull() + } else { + _ = builderMappingFile.AppendString(columns.mappingFiles[i]) + } + if columns.functionNames[i] == array.NullValueStr { + builderFunctionName.AppendNull() + } else { + _ = builderFunctionName.AppendString(columns.functionNames[i]) + } + if len(columns.children[i]) == 0 { + builderChildren.AppendNull() + } else { + builderChildren.Append(true) + for _, child := range columns.children[i] { + builderChildrenValues.Append(child) + } + } + + builderLocationAddress.Append(columns.locationAddresses[i]) + builderCumulative.Append(columns.cumulative[i]) + builderFlat.Append(columns.flat[i]) + } + + record := array.NewRecord( + arrow.NewSchema(fields, nil), + []arrow.Array{ + builderLocationAddress.NewArray(), + builderMappingFile.NewArray(), + builderFunctionName.NewArray(), + builderChildren.NewArray(), + builderCumulative.NewArray(), + builderFlat.NewArray(), + }, + int64(len(rows)), + ) + + var buf bytes.Buffer + w := ipc.NewWriter(&buf, + ipc.WithSchema(record.Schema()), + ) + defer func() { + _ = w.Close() + }() + + if err := w.Write(record); err != nil { + return nil + } + + return &connect.Response[v1alpha1.QueryResponse]{ + Msg: &v1alpha1.QueryResponse{ + Report: &v1alpha1.QueryResponse_FlamegraphArrow{ + FlamegraphArrow: &v1alpha1.FlamegraphArrow{ + Record: buf.Bytes(), + Unit: "nanoseconds", + Height: 2, + Trimmed: 0, }, - Total: 100, - Unit: "samples", - Height: 3, }, }, - }, + } } type FakeClient struct { @@ -180,16 +327,16 @@ func (f *FakeClient) QueryRange(ctx context.Context, c *connect.Request[v1alpha1 func (f *FakeClient) Query(ctx context.Context, c *connect.Request[v1alpha1.QueryRequest]) (*connect.Response[v1alpha1.QueryResponse], error) { f.Req = c - return flamegraphResponse, nil + return flamegraphResponse(), nil } func (f *FakeClient) Series(ctx context.Context, c *connect.Request[v1alpha1.SeriesRequest]) (*connect.Response[v1alpha1.SeriesResponse], error) { - //TODO implement me + // TODO implement me panic("implement me") } func (f *FakeClient) ProfileTypes(ctx context.Context, c *connect.Request[v1alpha1.ProfileTypesRequest]) (*connect.Response[v1alpha1.ProfileTypesResponse], error) { - //TODO implement me + // TODO implement me panic("implement me") } @@ -212,6 +359,6 @@ func (f *FakeClient) Values(ctx context.Context, c *connect.Request[v1alpha1.Val } func (f *FakeClient) ShareProfile(ctx context.Context, c *connect.Request[v1alpha1.ShareProfileRequest]) (*connect.Response[v1alpha1.ShareProfileResponse], error) { - //TODO implement me + // TODO implement me panic("implement me") }