package pyroscope

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
	"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana-plugin-sdk-go/live"
	"github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery"
	"github.com/xlab/treeprint"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"
)

type queryModel struct {
	WithStreaming bool
	dataquery.GrafanaPyroscopeDataQuery
}
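
// Illustrative example (assumed field names, defined by the generated dataquery.GrafanaPyroscopeDataQuery kind)
// of the query JSON that queryModel is unmarshaled from:
//
//	{"profileTypeId": "process_cpu:cpu:nanoseconds:cpu:nanoseconds", "labelSelector": "{service_name=\"api\"}", "groupBy": ["pod"], "maxNodes": 16384}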

type dsJsonModel struct {
	MinStep string `json:"minStep"`
}

const (
	queryTypeProfile = string(dataquery.PyroscopeQueryTypeProfile)
	queryTypeMetrics = string(dataquery.PyroscopeQueryTypeMetrics)
	queryTypeBoth    = string(dataquery.PyroscopeQueryTypeBoth)
)

// query processes a single Pyroscope query, transforming the response into data.Frames packaged in a backend.DataResponse.
func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
	ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.query", trace.WithAttributes(attribute.String("query_type", query.QueryType)))
	defer span.End()

	var qm queryModel
	response := backend.DataResponse{}

	err := json.Unmarshal(query.JSON, &qm)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		response.Error = fmt.Errorf("error unmarshaling query model: %v", err)
		return response
	}

	profileTypeId := depointerizer(qm.ProfileTypeId)
	labelSelector := depointerizer(qm.LabelSelector)

	responseMutex := sync.Mutex{}
	g, gCtx := errgroup.WithContext(ctx)
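
	// The metrics and profile branches below run concurrently in the errgroup. Both goroutines append to
	// response.Frames, so writes are guarded by responseMutex.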
	if query.QueryType == queryTypeMetrics || query.QueryType == queryTypeBoth {
		g.Go(func() error {
			var dsJson dsJsonModel
			err := json.Unmarshal(pCtx.DataSourceInstanceSettings.JSONData, &dsJson)
			if err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
				return fmt.Errorf("error unmarshaling datasource json model: %v", err)
			}

			parsedInterval := time.Second * 15
			if dsJson.MinStep != "" {
				parsedInterval, err = gtime.ParseDuration(dsJson.MinStep)
				if err != nil {
					parsedInterval = time.Second * 15
					logger.Error("Failed to parse the MinStep, using default", "MinStep", dsJson.MinStep, "function", logEntrypoint())
				}
			}

			logger.Debug("Sending SelectSeriesRequest", "queryModel", qm, "function", logEntrypoint())
			seriesResp, err := d.client.GetSeries(
				gCtx,
				profileTypeId,
				labelSelector,
				query.TimeRange.From.UnixMilli(),
				query.TimeRange.To.UnixMilli(),
				qm.GroupBy,
				math.Max(query.Interval.Seconds(), parsedInterval.Seconds()),
			)
			if err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
				logger.Error("Querying SelectSeries()", "err", err, "function", logEntrypoint())
				return err
			}

			// Add the frames to the response.
			responseMutex.Lock()
			response.Frames = append(response.Frames, seriesToDataFrames(seriesResp)...)
			responseMutex.Unlock()
			return nil
		})
	}

	if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth {
		g.Go(func() error {
			var profileResp *ProfileResponse
			if len(qm.SpanSelector) > 0 {
				logger.Debug("Calling GetSpanProfile", "queryModel", qm, "function", logEntrypoint())
				prof, err := d.client.GetSpanProfile(gCtx, profileTypeId, labelSelector, qm.SpanSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
				if err != nil {
					span.RecordError(err)
					span.SetStatus(codes.Error, err.Error())
					logger.Error("Error GetSpanProfile()", "err", err, "function", logEntrypoint())
					return err
				}
				profileResp = prof
			} else {
				logger.Debug("Calling GetProfile", "queryModel", qm, "function", logEntrypoint())
				prof, err := d.client.GetProfile(gCtx, profileTypeId, labelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
				if err != nil {
					span.RecordError(err)
					span.SetStatus(codes.Error, err.Error())
					logger.Error("Error GetProfile()", "err", err, "function", logEntrypoint())
					return err
				}
				profileResp = prof
			}

			var frame *data.Frame
			if profileResp != nil {
				frame = responseToDataFrames(profileResp)

				// If the query was called with streaming on, return a channel that the client
				// can subscribe to and consume updates from the plugin.
				// Feel free to remove this if you don't need streaming for your datasource.
				if qm.WithStreaming {
					channel := live.Channel{
						Scope:     live.ScopeDatasource,
						Namespace: pCtx.DataSourceInstanceSettings.UID,
						Path:      "stream",
					}
					frame.SetMeta(&data.FrameMeta{Channel: channel.String()})
				}
			} else {
				// We still send an empty data frame to signal that the query really ran, it just didn't return any data.
				frame = getEmptyDataFrame()
			}

			responseMutex.Lock()
			response.Frames = append(response.Frames, frame)
			responseMutex.Unlock()

			return nil
		})
	}

	if err := g.Wait(); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		response.Error = err
	}

	return response
}

// responseToDataFrames turns a Pyroscope response into a data.Frame. The data is encoded in a nested set format
// with [level, value, self, label] columns; because the items are ordered in depth-first traversal order, the
// whole tree can be recreated from the frame.
func responseToDataFrames(resp *ProfileResponse) *data.Frame {
	tree := levelsToTree(resp.Flamebearer.Levels, resp.Flamebearer.Names)
	return treeToNestedSetDataFrame(tree, resp.Units)
}

// START_OFFSET is the offset of the bar relative to the previous sibling.
const START_OFFSET = 0

// VALUE_OFFSET is the value, or width, of the bar.
const VALUE_OFFSET = 1

// SELF_OFFSET is the self value of the bar.
const SELF_OFFSET = 2

// NAME_OFFSET is the index into the names array.
const NAME_OFFSET = 3

// ITEM_OFFSET is the offset of the next bar. Each bar of the profile is represented by four numbers in a flat array.
const ITEM_OFFSET = 4
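
// Illustrative example (assumed values, for orientation only): a level with
// Values = [0, 60, 10, 1, 20, 15, 15, 2] describes two bars. The first starts at relative offset 0 with total 60
// and self 10, and its name is names[1]; the second starts 20 units after the first one ends (absolute offset 80)
// with total 15, self 15 and name names[2].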

type ProfileTree struct {
	Start int64
	Value int64
	Self  int64
	Level int
	Name  string
	Nodes []*ProfileTree
}

// levelsToTree converts the flamebearer format into a tree. This is needed so it can then be converted into the
// nested set format dataframe. This should be temporary; ideally we should get some sort of tree struct directly
// from the Pyroscope API.
func levelsToTree(levels []*Level, names []string) *ProfileTree {
	if len(levels) == 0 {
		return nil
	}

	tree := &ProfileTree{
		Start: 0,
		Value: levels[0].Values[VALUE_OFFSET],
		Self:  levels[0].Values[SELF_OFFSET],
		Level: 0,
		Name:  names[levels[0].Values[NAME_OFFSET]],
	}

	parentsStack := []*ProfileTree{tree}
	currentLevel := 1

	// Cycle through the levels.
	for currentLevel < len(levels) {
		// If we still have levels to go, this should not happen. Something is probably wrong with the flamebearer data.
		if len(parentsStack) == 0 {
			logger.Error("ParentsStack is empty but we are not at the last level", "currentLevel", currentLevel, "function", logEntrypoint())
			break
		}

		var nextParentsStack []*ProfileTree
		currentParent := parentsStack[0]
		parentsStack = parentsStack[1:]
		itemIndex := 0
		// Cumulative offset: items in the flamebearer format only store their start relative to the previous item.
		offset := int64(0)

		// Cycle through the bars in a level.
		for itemIndex < len(levels[currentLevel].Values) {
			itemStart := levels[currentLevel].Values[itemIndex+START_OFFSET] + offset
			itemValue := levels[currentLevel].Values[itemIndex+VALUE_OFFSET]
			selfValue := levels[currentLevel].Values[itemIndex+SELF_OFFSET]
			itemEnd := itemStart + itemValue
			parentEnd := currentParent.Start + currentParent.Value

			if itemStart >= currentParent.Start && itemEnd <= parentEnd {
				// The item is within the bounds of the current parent item, so it should be its child.
				treeItem := &ProfileTree{
					Start: itemStart,
					Value: itemValue,
					Self:  selfValue,
					Level: currentLevel,
					Name:  names[levels[currentLevel].Values[itemIndex+NAME_OFFSET]],
				}
				// Add to the parent.
				currentParent.Nodes = append(currentParent.Nodes, treeItem)
				// Add this item as a parent for the next level.
				nextParentsStack = append(nextParentsStack, treeItem)
				itemIndex += ITEM_OFFSET
				// Update the offset for the next item. This turns the relative offset into an absolute one.
				offset = itemEnd
			} else {
				// We went out of the parent's bounds, so move on to the next parent. We will evaluate the same item
				// again, but check whether it is a child of the next parent item in line.
				if len(parentsStack) == 0 {
					logger.Error("ParentsStack is empty but there are still items in the current level", "currentLevel", currentLevel, "itemIndex", itemIndex, "function", logEntrypoint())
					break
				}
				currentParent = parentsStack[0]
				parentsStack = parentsStack[1:]
				continue
			}
		}

		parentsStack = nextParentsStack
		currentLevel++
	}

	return tree
}
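
// Function describes a single function location (name, file and line) in a profile.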
type Function struct {
	FunctionName string
	FileName     string // optional
	Line         int64  // optional
}

func (f Function) String() string {
	return fmt.Sprintf("%s:%s:%d", f.FileName, f.FunctionName, f.Line)
}

func (pt *ProfileTree) String() string {
	type branch struct {
		nodes []*ProfileTree
		treeprint.Tree
	}

	tree := treeprint.New()

	for _, n := range []*ProfileTree{pt} {
		b := tree.AddBranch(fmt.Sprintf("%s: level %d self %d total %d", n.Name, n.Level, n.Self, n.Value))
		remaining := append([]*branch{}, &branch{nodes: n.Nodes, Tree: b})

		for len(remaining) > 0 {
			current := remaining[0]
			remaining = remaining[1:]

			for _, n := range current.nodes {
				if len(n.Nodes) > 0 {
					remaining = append(remaining,
						&branch{
							nodes: n.Nodes, Tree: current.Tree.AddBranch(fmt.Sprintf("%s: level %d self %d total %d", n.Name, n.Level, n.Self, n.Value)),
						},
					)
				} else {
					current.Tree.AddNode(fmt.Sprintf("%s: level %d self %d total %d", n.Name, n.Level, n.Self, n.Value))
				}
			}
		}
	}

	return tree.String()
}
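
// getEmptyDataFrame returns a flamegraph frame with the expected columns but no rows. It is sent when a profile
// query ran successfully but returned no data.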
func getEmptyDataFrame() *data.Frame {
	var emptyProfileDataFrame = data.NewFrame("response")
	emptyProfileDataFrame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"}
	emptyProfileDataFrame.Fields = data.Fields{
		data.NewField("level", nil, []int64{}),
		data.NewField("value", nil, []int64{}),
		data.NewField("self", nil, []int64{}),
		data.NewField("label", nil, []string{}),
	}
	return emptyProfileDataFrame
}

type CustomMeta struct {
	ProfileTypeID string
}

// treeToNestedSetDataFrame walks the tree depth first and adds the items to the dataframe. This is a nested set
// format: because the items are stored in depth-first order and the level/depth of each item is known, the
// parent-child relationships can be recreated without an explicit parent/child column, and consumers can later
// iterate over the dataFrame as if walking the tree/profile depth first again.
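//
// An illustrative example (assumed values): a root with total 100 and self 20 that has two leaf children
// (total/self 50 and 30) is encoded, in depth-first order, as the rows
//
//	level | value | self | label
//	  0   |  100  |  20  | total
//	  1   |   50  |  50  | a
//	  1   |   30  |  30  | b
//
// A row is a child of the nearest preceding row whose level is one less than its own.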
func treeToNestedSetDataFrame(tree *ProfileTree, unit string) *data.Frame {
	frame := data.NewFrame("response")
	frame.Meta = &data.FrameMeta{PreferredVisualization: "flamegraph"}

	levelField := data.NewField("level", nil, []int64{})
	valueField := data.NewField("value", nil, []int64{})
	selfField := data.NewField("self", nil, []int64{})

	// The profileTypeID should encode the type of the profile, with the unit being its 3rd part.
	valueField.Config = &data.FieldConfig{Unit: unit}
	selfField.Config = &data.FieldConfig{Unit: unit}
	frame.Fields = data.Fields{levelField, valueField, selfField}

	labelField := NewEnumField("label", nil)

	// The tree can be nil if the profile was empty; we still send an empty frame in that case.
	if tree != nil {
		walkTree(tree, func(tree *ProfileTree) {
			levelField.Append(int64(tree.Level))
			valueField.Append(tree.Value)
			selfField.Append(tree.Self)
			labelField.Append(tree.Name)
		})
	}

	frame.Fields = append(frame.Fields, labelField.GetField())
	return frame
}
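
// EnumField builds a data.Field of enum indices: each distinct string it sees is assigned the next index, Append
// stores the index, and GetField attaches the index-to-string table via the field's TypeConfig. For example
// (illustrative), Append("a"), Append("b"), Append("a") produces field values [0, 1, 0] with enum text ["a", "b"].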
type EnumField struct {
	field     *data.Field
	valuesMap map[string]data.EnumItemIndex
	counter   data.EnumItemIndex
}

func NewEnumField(name string, labels data.Labels) *EnumField {
	return &EnumField{
		field:     data.NewField(name, labels, []data.EnumItemIndex{}),
		valuesMap: make(map[string]data.EnumItemIndex),
	}
}

func (e *EnumField) Append(value string) {
	if valueIndex, ok := e.valuesMap[value]; ok {
		e.field.Append(valueIndex)
	} else {
		e.valuesMap[value] = e.counter
		e.field.Append(e.counter)
		e.counter++
	}
}

func (e *EnumField) GetField() *data.Field {
	s := make([]string, len(e.valuesMap))
	for k, v := range e.valuesMap {
		s[v] = k
	}

	e.field.SetConfig(&data.FieldConfig{
		TypeConfig: &data.FieldTypeConfig{
			Enum: &data.EnumFieldConfig{
				Text: s,
			},
		},
	})

	return e.field
}
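
// walkTree visits the tree in depth-first pre-order, calling fn on the root and then on every node, processing a
// node's children before its remaining siblings.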
func walkTree(tree *ProfileTree, fn func(tree *ProfileTree)) {
	fn(tree)
	stack := tree.Nodes

	for len(stack) > 0 {
		fn(stack[0])
		if stack[0].Nodes != nil {
			stack = append(stack[0].Nodes, stack[1:]...)
		} else {
			stack = stack[1:]
		}
	}
}
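
// seriesToDataFrames converts a SeriesResponse into one time/value data.Frame per series, attaching the series
// labels and the response unit to the value field.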
func seriesToDataFrames(resp *SeriesResponse) []*data.Frame {
	frames := make([]*data.Frame, 0, len(resp.Series))

	for _, series := range resp.Series {
		// We create separate data frames as the series may not have the same length.
		frame := data.NewFrame("series")
		frame.Meta = &data.FrameMeta{PreferredVisualization: "graph"}

		fields := make(data.Fields, 0, 2)
		timeField := data.NewField("time", nil, []time.Time{})
		fields = append(fields, timeField)

		labels := make(map[string]string)
		for _, label := range series.Labels {
			labels[label.Name] = label.Value
		}

		valueField := data.NewField(resp.Label, labels, []float64{})
		valueField.Config = &data.FieldConfig{Unit: resp.Units}

		for _, point := range series.Points {
			timeField.Append(time.UnixMilli(point.Timestamp))
			valueField.Append(point.Value)
		}

		fields = append(fields, valueField)
		frame.Fields = fields
		frames = append(frames, frame)
	}

	return frames
}
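
// depointerizer returns the value v points to, or the zero value of T when v is nil.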
func depointerizer[T any](v *T) T {
	var emptyValue T
	if v != nil {
		emptyValue = *v
	}
	return emptyValue
}