Testdata: add random labeled stream (#41932)

This commit is contained in:
Ryan McKinley 2021-11-19 09:54:05 -08:00 committed by GitHub
parent 82cf49143d
commit f661e20dd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 17 deletions

View File

@ -94,6 +94,10 @@ func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) {
Channel: "plugin/testdata/random-flakey-stream",
Data: frameJSON,
MinuteRate: 150,
}, &ManagedChannel{
Channel: "plugin/testdata/random-labeled-stream",
Data: frameJSON,
MinuteRate: 250,
}, &ManagedChannel{
Channel: "plugin/testdata/random-20Hz-stream",
Data: frameJSON,

View File

@ -55,7 +55,7 @@ func TestGetManagedStreams(t *testing.T) {
managedChannels, err := runner.GetManagedChannels(1)
require.NoError(t, err)
require.Len(t, managedChannels, 3) // 3 hardcoded testdata streams.
require.Len(t, managedChannels, 4) // 4 hardcoded testdata streams.
err = s1.Push("cpu1", data.NewFrame("cpu1"))
require.NoError(t, err)
@ -68,10 +68,10 @@ func TestGetManagedStreams(t *testing.T) {
managedChannels, err = runner.GetManagedChannels(1)
require.NoError(t, err)
require.Len(t, managedChannels, 6) // 3 hardcoded testdata streams + 3 test channels.
require.Equal(t, "stream/test1/cpu1", managedChannels[3].Channel)
require.Equal(t, "stream/test1/cpu2", managedChannels[4].Channel)
require.Equal(t, "stream/test2/cpu1", managedChannels[5].Channel)
require.Len(t, managedChannels, 7) // 4 hardcoded testdata streams + 3 test channels.
require.Equal(t, "stream/test1/cpu1", managedChannels[4].Channel)
require.Equal(t, "stream/test1/cpu2", managedChannels[5].Channel)
require.Equal(t, "stream/test2/cpu1", managedChannels[6].Channel)
// Different org.
s3, err := runner.GetOrCreateStream(2, "stream", "test1")
@ -80,5 +80,5 @@ func TestGetManagedStreams(t *testing.T) {
require.NoError(t, err)
managedChannels, err = runner.GetManagedChannels(1)
require.NoError(t, err)
require.Len(t, managedChannels, 6) // Not affected by other org.
require.Len(t, managedChannels, 7) // Not affected by other org.
}

View File

@ -27,6 +27,13 @@ func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStrea
}
}
if strings.Contains(req.Path, "-labeled") {
initialData, err = backend.NewInitialFrame(s.labelFrame, data.IncludeSchemaOnly)
if err != nil {
return nil, err
}
}
if s.cfg.FeatureToggles["live-pipeline"] {
// While developing Live pipeline avoid sending initial data.
initialData = nil
@ -58,6 +65,12 @@ func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamReque
Interval: 100 * time.Millisecond,
Drop: 0.75, // keep 25%
}
case "random-labeled-stream":
conf = testStreamConfig{
Interval: 200 * time.Millisecond,
Drop: 0.2, // keep 80%
Labeled: true,
}
case "random-20Hz-stream":
conf = testStreamConfig{
Interval: 50 * time.Millisecond,
@ -77,6 +90,7 @@ type testStreamConfig struct {
Interval time.Duration
Drop float64
Flight *flightConfig
Labeled bool
}
func (s *Service) runTestStream(ctx context.Context, path string, conf testStreamConfig, sender *backend.StreamSender) error {
@ -92,6 +106,12 @@ func (s *Service) runTestStream(ctx context.Context, path string, conf testStrea
flight.append(conf.Flight.getNextPoint(time.Now()))
}
labelFrame := data.NewFrame("labeled",
data.NewField("labels", nil, make([]string, 2)),
data.NewField("Time", nil, make([]time.Time, 2)),
data.NewField("Value", nil, make([]float64, 2)),
)
for {
select {
case <-ctx.Done():
@ -116,12 +136,28 @@ func (s *Service) runTestStream(ctx context.Context, path string, conf testStrea
delta := rand.Float64() - 0.5
walker += delta
s.frame.Fields[0].Set(0, t)
s.frame.Fields[1].Set(0, walker) // Value
s.frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min
s.frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max
if err := sender.SendFrame(s.frame, mode); err != nil {
return err
if conf.Labeled {
secA := t.Second() / 3
secB := t.Second() / 7
labelFrame.Fields[0].Set(0, fmt.Sprintf("s=A,s=p%d,x=X", secA))
labelFrame.Fields[1].Set(0, t)
labelFrame.Fields[2].Set(0, walker)
labelFrame.Fields[0].Set(1, fmt.Sprintf("s=B,s=p%d,x=X", secB))
labelFrame.Fields[1].Set(1, t)
labelFrame.Fields[2].Set(1, walker+10)
if err := sender.SendFrame(labelFrame, mode); err != nil {
return err
}
} else {
s.frame.Fields[0].Set(0, t)
s.frame.Fields[1].Set(0, walker) // Value
s.frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min
s.frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max
if err := sender.SendFrame(s.frame, mode); err != nil {
return err
}
}
}
}

View File

@ -24,6 +24,11 @@ func ProvideService(cfg *setting.Cfg, registrar plugins.CoreBackendRegistrar) (*
data.NewField("Min", nil, make([]float64, 1)),
data.NewField("Max", nil, make([]float64, 1)),
),
labelFrame: data.NewFrame("labeled",
data.NewField("labels", nil, make([]string, 1)),
data.NewField("Time", nil, make([]time.Time, 1)),
data.NewField("Value", nil, make([]float64, 1)),
),
logger: log.New("tsdb.testdata"),
cfg: cfg,
}
@ -47,9 +52,10 @@ func ProvideService(cfg *setting.Cfg, registrar plugins.CoreBackendRegistrar) (*
}
type Service struct {
cfg *setting.Cfg
logger log.Logger
scenarios map[string]*Scenario
frame *data.Frame
queryMux *datasource.QueryTypeMux
cfg *setting.Cfg
logger log.Logger
scenarios map[string]*Scenario
frame *data.Frame
labelFrame *data.Frame
queryMux *datasource.QueryTypeMux
}

View File

@ -14,6 +14,11 @@ const liveTestDataChannels = [
value: 'random-flakey-stream',
description: 'Stream that returns data in random intervals',
},
{
label: 'random-labeled-stream',
value: 'random-labeled-stream',
description: 'Value with moving labels',
},
{
label: 'random-20Hz-stream',
value: 'random-20Hz-stream',