diff --git a/go.mod b/go.mod index a950fbf387a..035d3c70e2d 100644 --- a/go.mod +++ b/go.mod @@ -242,7 +242,7 @@ require ( github.com/google/go-github/v45 v45.2.0 github.com/grafana/codejen v0.0.3 github.com/grafana/dskit v0.0.0-20230202092222-880a7f8141cc - github.com/grafana/phlare/api v0.1.3 + github.com/grafana/phlare/api v0.1.4-0.20230426005640-f90edba05413 github.com/huandu/xstrings v1.3.1 github.com/jmoiron/sqlx v1.3.5 github.com/matryer/is v1.4.0 diff --git a/go.sum b/go.sum index 9d33a7965f4..727940e1869 100644 --- a/go.sum +++ b/go.sum @@ -1268,10 +1268,6 @@ github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grafana/alerting v0.0.0-20230315185333-d1e3c68ac064 h1:MtsWzSTav7NGKolO+TaJQUcyR7VY0YpUROVsJX8ktIU= -github.com/grafana/alerting v0.0.0-20230315185333-d1e3c68ac064/go.mod h1:nHfrSTdV7/l74N5/ezqlQ+JwSvIChhN3G5+PjCfwG/E= -github.com/grafana/alerting v0.0.0-20230410151633-4a7ecc241d72 h1:WuQGIUeDIyPviylMaMD1d2nEYIiD/icHYO0rc/AH8kQ= -github.com/grafana/alerting v0.0.0-20230410151633-4a7ecc241d72/go.mod h1:nHfrSTdV7/l74N5/ezqlQ+JwSvIChhN3G5+PjCfwG/E= github.com/grafana/alerting v0.0.0-20230418161049-5f374e58cb32 h1:LdPoVBj+CA5oHLeUejDzqy8/c4Fa0UfTtCcOHka0Jws= github.com/grafana/alerting v0.0.0-20230418161049-5f374e58cb32/go.mod h1:nHfrSTdV7/l74N5/ezqlQ+JwSvIChhN3G5+PjCfwG/E= github.com/grafana/alerting v0.0.0-20230426173942-011a41e1fbe2 h1:teRmmE08bSnvyh3e+adfv/6RA1ZZdhTCmNL9Ckfm1Rk= @@ -1302,8 +1298,8 @@ github.com/grafana/grafana-plugin-sdk-go v0.159.0 h1:tjqzTe/wz+1zzaeHpOJvBvSvZFx github.com/grafana/grafana-plugin-sdk-go v0.159.0/go.mod h1:7/F3lL/w3MGuZTwu0jam5oL/4zNOQmhs2k8WZ9VlzhI= github.com/grafana/kindsys v0.0.0-20230414093523-5df3e256ebc0 h1:Or8DllkxKq3+kLXAS/tkr5K2Yv745c6K2rRvslGWGd4= github.com/grafana/kindsys v0.0.0-20230414093523-5df3e256ebc0/go.mod h1:GNcfpy5+SY6RVbNGQW264gC0r336Dm+0zgQ5vt6+M8Y= -github.com/grafana/phlare/api v0.1.3 h1:mYTaE9mLsAW/uzPXlW/PQSLsZ4ojBFA+oAMfR/PDdw8= -github.com/grafana/phlare/api v0.1.3/go.mod h1:29vcLwFDmZBDce2jwFIMtzvof7fzPadT8VMKw9ks7FU= +github.com/grafana/phlare/api v0.1.4-0.20230426005640-f90edba05413 h1:bBzCezZNRyYlJpXTkyZdY4fpPxHZUdyeyRWzhtw/P6I= +github.com/grafana/phlare/api v0.1.4-0.20230426005640-f90edba05413/go.mod h1:IvwuGG9xa/h96UH/exgvsfy3zE+ZpctkNT9o5aaGdrU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20230308154952-78fedf89728b h1:VQOGGGJ2lKcVPANyzIESKYhSeA0QIvUQwfA3CbrkDfA= github.com/grafana/prometheus-alertmanager v0.25.1-0.20230308154952-78fedf89728b/go.mod h1:MnBfDPXJqXmmfPwQlCLvVUdqfnvrAw+hSPtDeaaFwj4= github.com/grafana/saml v0.4.13-0.20230203140620-5f476db5c00a h1:aWSTt/pTOI4uGY9DhBMG1l0GOnGjIYtaqxzYR3/q82o= diff --git a/pkg/tsdb/phlare/client.go b/pkg/tsdb/phlare/client.go index 3414122b74b..0651791ae50 100644 --- a/pkg/tsdb/phlare/client.go +++ b/pkg/tsdb/phlare/client.go @@ -10,7 +10,7 @@ type ProfilingClient interface { LabelNames(ctx context.Context, query string, start int64, end int64) ([]string, error) LabelValues(ctx context.Context, query string, label string, start int64, end int64) ([]string, error) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, step float64) (*SeriesResponse, error) - GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64) (*ProfileResponse, error) + GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, maxNodes *int64) (*ProfileResponse, error) } type ProfileType struct { diff --git a/pkg/tsdb/phlare/kinds/dataquery/types_dataquery_gen.go b/pkg/tsdb/phlare/kinds/dataquery/types_dataquery_gen.go index 1b4ff0173ac..4f0d24785b2 100644 --- a/pkg/tsdb/phlare/kinds/dataquery/types_dataquery_gen.go +++ b/pkg/tsdb/phlare/kinds/dataquery/types_dataquery_gen.go @@ -35,6 +35,9 @@ type GrafanaPyroscopeDataQuery struct { // Specifies the query label selectors. LabelSelector string `json:"labelSelector"` + // Sets the maximum number of nodes in the flamegraph. + MaxNodes *int64 `json:"maxNodes,omitempty"` + // Specifies the type of profile to query. ProfileTypeId string `json:"profileTypeId"` diff --git a/pkg/tsdb/phlare/phlareClient.go b/pkg/tsdb/phlare/phlareClient.go index 39e0dd218f7..5ec0a620e67 100644 --- a/pkg/tsdb/phlare/phlareClient.go +++ b/pkg/tsdb/phlare/phlareClient.go @@ -90,13 +90,14 @@ func (c *PhlareClient) GetSeries(ctx context.Context, profileTypeID string, labe }, nil } -func (c *PhlareClient) GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64) (*ProfileResponse, error) { +func (c *PhlareClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) { req := &connect.Request[querierv1.SelectMergeStacktracesRequest]{ Msg: &querierv1.SelectMergeStacktracesRequest{ ProfileTypeID: profileTypeID, LabelSelector: labelSelector, Start: start, End: end, + MaxNodes: maxNodes, }, } diff --git a/pkg/tsdb/phlare/phlareClient_test.go b/pkg/tsdb/phlare/phlareClient_test.go index f594a253b32..4f84ad29b17 100644 --- a/pkg/tsdb/phlare/phlareClient_test.go +++ b/pkg/tsdb/phlare/phlareClient_test.go @@ -32,7 +32,8 @@ func Test_PhlareClient(t *testing.T) { }) t.Run("GetProfile", func(t *testing.T) { - resp, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100) + maxNodes := int64(-1) + resp, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, &maxNodes) require.Nil(t, err) series := &ProfileResponse{ diff --git a/pkg/tsdb/phlare/pyroscopeClient.go b/pkg/tsdb/phlare/pyroscopeClient.go index 00bab8acde2..60b69473cbf 100644 --- a/pkg/tsdb/phlare/pyroscopeClient.go +++ b/pkg/tsdb/phlare/pyroscopeClient.go @@ -83,11 +83,14 @@ type PyroFlamebearer struct { Names []string `json:"names"` } -func (c *PyroscopeClient) getProfileData(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string) (*PyroscopeProfileResponse, error) { +func (c *PyroscopeClient) getProfileData(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64, groupBy []string) (*PyroscopeProfileResponse, error) { params := url.Values{} params.Add("from", strconv.FormatInt(start, 10)) params.Add("until", strconv.FormatInt(end, 10)) params.Add("query", profileTypeID+labelSelector) + if maxNodes != nil { + params.Add("maxNodes", strconv.FormatInt(*maxNodes, 10)) + } params.Add("format", "json") if len(groupBy) > 0 { params.Add("groupBy", groupBy[0]) @@ -122,8 +125,8 @@ func (c *PyroscopeClient) getProfileData(ctx context.Context, profileTypeID stri return respData, nil } -func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64) (*ProfileResponse, error) { - respData, err := c.getProfileData(ctx, profileTypeID, labelSelector, start, end, nil) +func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) { + respData, err := c.getProfileData(ctx, profileTypeID, labelSelector, start, end, maxNodes, nil) if err != nil { return nil, err } @@ -154,11 +157,11 @@ func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID string, }, nil } -func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, step float64) (*SeriesResponse, error) { +func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start, end int64, groupBy []string, step float64) (*SeriesResponse, error) { // This is super ineffective at the moment. We need 2 different APIs one for profile one for series (timeline) data // but Pyro returns all in single response. This currently does the simplest thing and calls the same API 2 times // and gets the part of the response it needs. - respData, err := c.getProfileData(ctx, profileTypeID, labelSelector, start, end, groupBy) + respData, err := c.getProfileData(ctx, profileTypeID, labelSelector, start, end, nil, groupBy) if err != nil { return nil, err } diff --git a/pkg/tsdb/phlare/query.go b/pkg/tsdb/phlare/query.go index 3b926217110..10e1dffc0bc 100644 --- a/pkg/tsdb/phlare/query.go +++ b/pkg/tsdb/phlare/query.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math" + "sync" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -13,6 +14,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/live" "github.com/grafana/grafana/pkg/tsdb/phlare/kinds/dataquery" "github.com/xlab/treeprint" + "golang.org/x/sync/errgroup" ) type queryModel struct { @@ -41,63 +43,76 @@ func (d *PhlareDatasource) query(ctx context.Context, pCtx backend.PluginContext return response } + responseMutex := sync.Mutex{} + g, gCtx := errgroup.WithContext(ctx) if query.QueryType == queryTypeMetrics || query.QueryType == queryTypeBoth { - var dsJson dsJsonModel - err = json.Unmarshal(pCtx.DataSourceInstanceSettings.JSONData, &dsJson) - if err != nil { - response.Error = fmt.Errorf("error unmarshaling datasource json model: %v", err) - return response - } - - parsedInterval := time.Second * 15 - if dsJson.MinStep != "" { - parsedInterval, err = gtime.ParseDuration(dsJson.MinStep) + g.Go(func() error { + var dsJson dsJsonModel + err = json.Unmarshal(pCtx.DataSourceInstanceSettings.JSONData, &dsJson) if err != nil { - parsedInterval = time.Second * 15 - logger.Debug("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep) + return fmt.Errorf("error unmarshaling datasource json model: %v", err) } - } - logger.Debug("Sending SelectSeriesRequest", "queryModel", qm) - seriesResp, err := d.client.GetSeries( - ctx, - qm.ProfileTypeId, - qm.LabelSelector, - query.TimeRange.From.UnixMilli(), - query.TimeRange.To.UnixMilli(), - qm.GroupBy, - math.Max(query.Interval.Seconds(), parsedInterval.Seconds()), - ) - if err != nil { - logger.Error("Querying SelectSeries()", "err", err) - response.Error = err - return response - } - // add the frames to the response. - response.Frames = append(response.Frames, seriesToDataFrames(seriesResp)...) + + parsedInterval := time.Second * 15 + if dsJson.MinStep != "" { + parsedInterval, err = gtime.ParseDuration(dsJson.MinStep) + if err != nil { + parsedInterval = time.Second * 15 + logger.Debug("Failed to parse the MinStep using default", "MinStep", dsJson.MinStep) + } + } + logger.Debug("Sending SelectSeriesRequest", "queryModel", qm) + seriesResp, err := d.client.GetSeries( + gCtx, + qm.ProfileTypeId, + qm.LabelSelector, + query.TimeRange.From.UnixMilli(), + query.TimeRange.To.UnixMilli(), + qm.GroupBy, + math.Max(query.Interval.Seconds(), parsedInterval.Seconds()), + ) + if err != nil { + logger.Error("Querying SelectSeries()", "err", err) + return err + } + // add the frames to the response. + responseMutex.Lock() + response.Frames = append(response.Frames, seriesToDataFrames(seriesResp)...) + responseMutex.Unlock() + return nil + }) } if query.QueryType == queryTypeProfile || query.QueryType == queryTypeBoth { - logger.Debug("Calling GetProfile", "queryModel", qm) - prof, err := d.client.GetProfile(ctx, qm.ProfileTypeId, qm.LabelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli()) - if err != nil { - logger.Error("Error GetProfile()", "err", err) - response.Error = err - return response - } - frame := responseToDataFrames(prof) - response.Frames = append(response.Frames, frame) - - // If query called with streaming on then return a channel - // to subscribe on a client-side and consume updates from a plugin. - // Feel free to remove this if you don't need streaming for your datasource. - if qm.WithStreaming { - channel := live.Channel{ - Scope: live.ScopeDatasource, - Namespace: pCtx.DataSourceInstanceSettings.UID, - Path: "stream", + g.Go(func() error { + logger.Debug("Calling GetProfile", "queryModel", qm) + prof, err := d.client.GetProfile(gCtx, qm.ProfileTypeId, qm.LabelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes) + if err != nil { + logger.Error("Error GetProfile()", "err", err) + return err } - frame.SetMeta(&data.FrameMeta{Channel: channel.String()}) - } + frame := responseToDataFrames(prof) + responseMutex.Lock() + response.Frames = append(response.Frames, frame) + responseMutex.Unlock() + + // If query called with streaming on then return a channel + // to subscribe on a client-side and consume updates from a plugin. + // Feel free to remove this if you don't need streaming for your datasource. + if qm.WithStreaming { + channel := live.Channel{ + Scope: live.ScopeDatasource, + Namespace: pCtx.DataSourceInstanceSettings.UID, + Path: "stream", + } + frame.SetMeta(&data.FrameMeta{Channel: channel.String()}) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + response.Error = g.Wait() } return response diff --git a/pkg/tsdb/phlare/query_test.go b/pkg/tsdb/phlare/query_test.go index cc95e376ab7..19ef1b922d6 100644 --- a/pkg/tsdb/phlare/query_test.go +++ b/pkg/tsdb/phlare/query_test.go @@ -28,6 +28,12 @@ func Test_query(t *testing.T) { resp := ds.query(context.Background(), pCtx, *dataQuery) require.Nil(t, resp.Error) require.Equal(t, 2, len(resp.Frames)) + + // The order of the frames is not guaranteed, so we normalize it + if resp.Frames[0].Fields[0].Name == "level" { + resp.Frames[1], resp.Frames[0] = resp.Frames[0], resp.Frames[1] + } + require.Equal(t, "time", resp.Frames[0].Fields[0].Name) require.Equal(t, data.NewField("level", nil, []int64{0, 1, 2}), resp.Frames[1].Fields[0]) }) @@ -289,7 +295,7 @@ func (f *FakeClient) LabelNames(ctx context.Context, query string, start int64, panic("implement me") } -func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64) (*ProfileResponse, error) { +func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) { return &ProfileResponse{ Flamebearer: &Flamebearer{ Names: []string{"foo", "bar", "baz"}, @@ -305,7 +311,7 @@ func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID string, label }, nil } -func (f *FakeClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, step float64) (*SeriesResponse, error) { +func (f *FakeClient) GetSeries(ctx context.Context, profileTypeID, labelSelector string, start, end int64, groupBy []string, step float64) (*SeriesResponse, error) { f.Args = []interface{}{profileTypeID, labelSelector, start, end, groupBy, step} return &SeriesResponse{ Series: []*Series{ diff --git a/public/app/plugins/datasource/phlare/QueryEditor/QueryEditor.test.tsx b/public/app/plugins/datasource/phlare/QueryEditor/QueryEditor.test.tsx index d9d67521d8e..728d5060c20 100644 --- a/public/app/plugins/datasource/phlare/QueryEditor/QueryEditor.test.tsx +++ b/public/app/plugins/datasource/phlare/QueryEditor/QueryEditor.test.tsx @@ -92,6 +92,7 @@ function setup(options: { props: Partial } = { props: {} }) { labelSelector: '', profileTypeId: 'process_cpu:cpu', refId: 'A', + maxNodes: 1000, groupBy: [], }} datasource={ds} diff --git a/public/app/plugins/datasource/phlare/QueryEditor/QueryOptions.tsx b/public/app/plugins/datasource/phlare/QueryEditor/QueryOptions.tsx index 56cb059cba9..8723716b98a 100644 --- a/public/app/plugins/datasource/phlare/QueryEditor/QueryOptions.tsx +++ b/public/app/plugins/datasource/phlare/QueryEditor/QueryOptions.tsx @@ -3,7 +3,7 @@ import React from 'react'; import { useToggle } from 'react-use'; import { CoreApp, GrafanaTheme2, SelectableValue } from '@grafana/data'; -import { Icon, useStyles2, RadioButtonGroup, MultiSelect } from '@grafana/ui'; +import { Icon, useStyles2, RadioButtonGroup, MultiSelect, Input } from '@grafana/ui'; import { Query } from '../types'; @@ -91,6 +91,18 @@ export function QueryOptions({ query, onQueryChange, app, labels }: Props) { }} /> + Sets the maximum number of nodes to return in the flamegraph.}> + ) => { + let newValue = parseInt(event.currentTarget.value, 10); + newValue = isNaN(newValue) ? 0 : newValue; + onQueryChange({ ...query, maxNodes: newValue }); + }} + /> + )} diff --git a/public/app/plugins/datasource/phlare/dataquery.cue b/public/app/plugins/datasource/phlare/dataquery.cue index f77dd20a9b6..1a766f8d182 100644 --- a/public/app/plugins/datasource/phlare/dataquery.cue +++ b/public/app/plugins/datasource/phlare/dataquery.cue @@ -39,6 +39,8 @@ composableKinds: DataQuery: { profileTypeId: string // Allows to group the results. groupBy: [...string] + // Sets the maximum number of nodes in the flamegraph. + maxNodes?: int64 #PhlareQueryType: "metrics" | "profile" | *"both" @cuetsy(kind="type") }, ] diff --git a/public/app/plugins/datasource/phlare/dataquery.gen.ts b/public/app/plugins/datasource/phlare/dataquery.gen.ts index 0d4936d7867..7be23baa6a2 100644 --- a/public/app/plugins/datasource/phlare/dataquery.gen.ts +++ b/public/app/plugins/datasource/phlare/dataquery.gen.ts @@ -25,6 +25,10 @@ export interface GrafanaPyroscope extends common.DataQuery { * Specifies the query label selectors. */ labelSelector: string; + /** + * Sets the maximum number of nodes in the flamegraph. + */ + maxNodes?: number; /** * Specifies the type of profile to query. */ diff --git a/public/app/plugins/datasource/phlare/datasource.ts b/public/app/plugins/datasource/phlare/datasource.ts index 4cf80c31e71..21e4f31dfe9 100644 --- a/public/app/plugins/datasource/phlare/datasource.ts +++ b/public/app/plugins/datasource/phlare/datasource.ts @@ -82,6 +82,7 @@ export class PhlareDataSource extends DataSourceWithBackend