From f661e20dd757c76fb050306374601241f6b4097e Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Fri, 19 Nov 2021 09:54:05 -0800 Subject: [PATCH] Testdata: add random labeled stream (#41932) --- pkg/services/live/managedstream/runner.go | 4 ++ .../live/managedstream/runner_test.go | 12 ++--- pkg/tsdb/testdatasource/stream_handler.go | 48 ++++++++++++++++--- pkg/tsdb/testdatasource/testdata.go | 16 +++++-- .../testdata/components/GrafanaLiveEditor.tsx | 5 ++ 5 files changed, 68 insertions(+), 17 deletions(-) diff --git a/pkg/services/live/managedstream/runner.go b/pkg/services/live/managedstream/runner.go index 609da93396b..f622c95e91d 100644 --- a/pkg/services/live/managedstream/runner.go +++ b/pkg/services/live/managedstream/runner.go @@ -94,6 +94,10 @@ func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) { Channel: "plugin/testdata/random-flakey-stream", Data: frameJSON, MinuteRate: 150, + }, &ManagedChannel{ + Channel: "plugin/testdata/random-labeled-stream", + Data: frameJSON, + MinuteRate: 250, }, &ManagedChannel{ Channel: "plugin/testdata/random-20Hz-stream", Data: frameJSON, diff --git a/pkg/services/live/managedstream/runner_test.go b/pkg/services/live/managedstream/runner_test.go index 66f0347cca6..cde92eb489e 100644 --- a/pkg/services/live/managedstream/runner_test.go +++ b/pkg/services/live/managedstream/runner_test.go @@ -55,7 +55,7 @@ func TestGetManagedStreams(t *testing.T) { managedChannels, err := runner.GetManagedChannels(1) require.NoError(t, err) - require.Len(t, managedChannels, 3) // 3 hardcoded testdata streams. + require.Len(t, managedChannels, 4) // 4 hardcoded testdata streams. err = s1.Push("cpu1", data.NewFrame("cpu1")) require.NoError(t, err) @@ -68,10 +68,10 @@ func TestGetManagedStreams(t *testing.T) { managedChannels, err = runner.GetManagedChannels(1) require.NoError(t, err) - require.Len(t, managedChannels, 6) // 3 hardcoded testdata streams + 3 test channels. - require.Equal(t, "stream/test1/cpu1", managedChannels[3].Channel) - require.Equal(t, "stream/test1/cpu2", managedChannels[4].Channel) - require.Equal(t, "stream/test2/cpu1", managedChannels[5].Channel) + require.Len(t, managedChannels, 7) // 4 hardcoded testdata streams + 3 test channels. + require.Equal(t, "stream/test1/cpu1", managedChannels[4].Channel) + require.Equal(t, "stream/test1/cpu2", managedChannels[5].Channel) + require.Equal(t, "stream/test2/cpu1", managedChannels[6].Channel) // Different org. s3, err := runner.GetOrCreateStream(2, "stream", "test1") @@ -80,5 +80,5 @@ func TestGetManagedStreams(t *testing.T) { require.NoError(t, err) managedChannels, err = runner.GetManagedChannels(1) require.NoError(t, err) - require.Len(t, managedChannels, 6) // Not affected by other org. + require.Len(t, managedChannels, 7) // Not affected by other org. } diff --git a/pkg/tsdb/testdatasource/stream_handler.go b/pkg/tsdb/testdatasource/stream_handler.go index 2a5d4331872..6f049e19511 100644 --- a/pkg/tsdb/testdatasource/stream_handler.go +++ b/pkg/tsdb/testdatasource/stream_handler.go @@ -27,6 +27,13 @@ func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStrea } } + if strings.Contains(req.Path, "-labeled") { + initialData, err = backend.NewInitialFrame(s.labelFrame, data.IncludeSchemaOnly) + if err != nil { + return nil, err + } + } + if s.cfg.FeatureToggles["live-pipeline"] { // While developing Live pipeline avoid sending initial data. initialData = nil @@ -58,6 +65,12 @@ func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamReque Interval: 100 * time.Millisecond, Drop: 0.75, // keep 25% } + case "random-labeled-stream": + conf = testStreamConfig{ + Interval: 200 * time.Millisecond, + Drop: 0.2, // keep 80% + Labeled: true, + } case "random-20Hz-stream": conf = testStreamConfig{ Interval: 50 * time.Millisecond, @@ -77,6 +90,7 @@ type testStreamConfig struct { Interval time.Duration Drop float64 Flight *flightConfig + Labeled bool } func (s *Service) runTestStream(ctx context.Context, path string, conf testStreamConfig, sender *backend.StreamSender) error { @@ -92,6 +106,12 @@ func (s *Service) runTestStream(ctx context.Context, path string, conf testStrea flight.append(conf.Flight.getNextPoint(time.Now())) } + labelFrame := data.NewFrame("labeled", + data.NewField("labels", nil, make([]string, 2)), + data.NewField("Time", nil, make([]time.Time, 2)), + data.NewField("Value", nil, make([]float64, 2)), + ) + for { select { case <-ctx.Done(): @@ -116,12 +136,28 @@ func (s *Service) runTestStream(ctx context.Context, path string, conf testStrea delta := rand.Float64() - 0.5 walker += delta - s.frame.Fields[0].Set(0, t) - s.frame.Fields[1].Set(0, walker) // Value - s.frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min - s.frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max - if err := sender.SendFrame(s.frame, mode); err != nil { - return err + if conf.Labeled { + secA := t.Second() / 3 + secB := t.Second() / 7 + + labelFrame.Fields[0].Set(0, fmt.Sprintf("s=A,s=p%d,x=X", secA)) + labelFrame.Fields[1].Set(0, t) + labelFrame.Fields[2].Set(0, walker) + + labelFrame.Fields[0].Set(1, fmt.Sprintf("s=B,s=p%d,x=X", secB)) + labelFrame.Fields[1].Set(1, t) + labelFrame.Fields[2].Set(1, walker+10) + if err := sender.SendFrame(labelFrame, mode); err != nil { + return err + } + } else { + s.frame.Fields[0].Set(0, t) + s.frame.Fields[1].Set(0, walker) // Value + s.frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min + s.frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max + if err := sender.SendFrame(s.frame, mode); err != nil { + return err + } } } } diff --git a/pkg/tsdb/testdatasource/testdata.go b/pkg/tsdb/testdatasource/testdata.go index 6a90d2cf0b3..4bb78afc114 100644 --- a/pkg/tsdb/testdatasource/testdata.go +++ b/pkg/tsdb/testdatasource/testdata.go @@ -24,6 +24,11 @@ func ProvideService(cfg *setting.Cfg, registrar plugins.CoreBackendRegistrar) (* data.NewField("Min", nil, make([]float64, 1)), data.NewField("Max", nil, make([]float64, 1)), ), + labelFrame: data.NewFrame("labeled", + data.NewField("labels", nil, make([]string, 1)), + data.NewField("Time", nil, make([]time.Time, 1)), + data.NewField("Value", nil, make([]float64, 1)), + ), logger: log.New("tsdb.testdata"), cfg: cfg, } @@ -47,9 +52,10 @@ func ProvideService(cfg *setting.Cfg, registrar plugins.CoreBackendRegistrar) (* } type Service struct { - cfg *setting.Cfg - logger log.Logger - scenarios map[string]*Scenario - frame *data.Frame - queryMux *datasource.QueryTypeMux + cfg *setting.Cfg + logger log.Logger + scenarios map[string]*Scenario + frame *data.Frame + labelFrame *data.Frame + queryMux *datasource.QueryTypeMux } diff --git a/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx b/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx index a37252f6baa..3eba6dd409a 100644 --- a/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx +++ b/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx @@ -14,6 +14,11 @@ const liveTestDataChannels = [ value: 'random-flakey-stream', description: 'Stream that returns data in random intervals', }, + { + label: 'random-labeled-stream', + value: 'random-labeled-stream', + description: 'Value with moving labels', + }, { label: 'random-20Hz-stream', value: 'random-20Hz-stream',