diff --git a/pkg/services/live/pipeline/devdata.go b/pkg/services/live/pipeline/devdata.go index 1c1afdb67cf..12ce499ef8c 100644 --- a/pkg/services/live/pipeline/devdata.go +++ b/pkg/services/live/pipeline/devdata.go @@ -104,9 +104,10 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR FrameOutputters: []FrameOutputter{ NewManagedStreamFrameOutput(f.ManagedStream), NewRemoteWriteFrameOutput(RemoteWriteConfig{ - Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), - User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), - Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), + Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), + User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), + Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), + SampleMilliseconds: 1000, }), }, Subscribers: []Subscriber{ diff --git a/pkg/services/live/pipeline/frame_output_remote_write.go b/pkg/services/live/pipeline/frame_output_remote_write.go index 19fa3642041..f0c6a5e7e7b 100644 --- a/pkg/services/live/pipeline/frame_output_remote_write.go +++ b/pkg/services/live/pipeline/frame_output_remote_write.go @@ -14,6 +14,8 @@ import ( "github.com/prometheus/prometheus/prompb" ) +const flushInterval = 15 * time.Second + type RemoteWriteConfig struct { // Endpoint to send streaming frames to. Endpoint string `json:"endpoint"` @@ -21,6 +23,14 @@ type RemoteWriteConfig struct { User string `json:"user"` // Password for remote write endpoint. Password string `json:"password"` + // SampleMilliseconds allow defining an interval to sample points inside a channel + // when outputting to remote write endpoint (on __name__ label basis). For example + // when having a 20Hz stream and SampleMilliseconds 1000 then only one point in a + // second will be sent to remote write endpoint. This reduces data resolution of course. + // If not set - then no down-sampling will be performed. If SampleMilliseconds is + // greater than flushInterval then each flush will include a point as we only keeping + // track of timestamps in terms of each individual flush at the moment. + SampleMilliseconds int64 `json:"sampleMilliseconds"` } type RemoteWriteFrameOutput struct { @@ -48,7 +58,7 @@ func (out *RemoteWriteFrameOutput) Type() string { } func (out *RemoteWriteFrameOutput) flushPeriodically() { - for range time.NewTicker(15 * time.Second).C { + for range time.NewTicker(flushInterval).C { out.mu.Lock() if len(out.buffer) == 0 { out.mu.Unlock() @@ -70,8 +80,66 @@ func (out *RemoteWriteFrameOutput) flushPeriodically() { } } +func (out *RemoteWriteFrameOutput) sample(timeSeries []prompb.TimeSeries) []prompb.TimeSeries { + samples := map[string]prompb.TimeSeries{} + timestamps := map[string]int64{} + + for _, ts := range timeSeries { + var name string + + for _, label := range ts.Labels { + if label.Name == "__name__" { + name = label.Value + break + } + } + + sample, ok := samples[name] + if !ok { + sample = prompb.TimeSeries{} + } + + lastTimestamp := timestamps[name] + + // In-place filtering, see https://github.com/golang/go/wiki/SliceTricks#filter-in-place. + n := 0 + for _, s := range ts.Samples { + if lastTimestamp == 0 || s.Timestamp > lastTimestamp+out.config.SampleMilliseconds { + ts.Samples[n] = s + n++ + lastTimestamp = s.Timestamp + } + } + filteredSamples := ts.Samples[:n] + + timestamps[name] = lastTimestamp + + sample.Labels = ts.Labels + sample.Samples = append(sample.Samples, filteredSamples...) + samples[name] = sample + } + var toReturn []prompb.TimeSeries + for _, ts := range samples { + toReturn = append(toReturn, ts) + } + return toReturn +} + func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error { - logger.Debug("Remote write flush", "num time series", len(timeSeries)) + numSamples := 0 + for _, ts := range timeSeries { + numSamples += len(ts.Samples) + } + logger.Debug("Remote write flush", "numTimeSeries", len(timeSeries), "numSamples", numSamples) + + if out.config.SampleMilliseconds > 0 { + timeSeries = out.sample(timeSeries) + numSamples = 0 + for _, ts := range timeSeries { + numSamples += len(ts.Samples) + } + logger.Debug("After down-sampling", "numTimeSeries", len(timeSeries), "numSamples", numSamples) + } remoteWriteData, err := remotewrite.TimeSeriesToBytes(timeSeries) if err != nil { return fmt.Errorf("error converting time series to bytes: %v", err) diff --git a/pkg/services/live/pipeline/frame_output_remote_write_test.go b/pkg/services/live/pipeline/frame_output_remote_write_test.go new file mode 100644 index 00000000000..70d94159acc --- /dev/null +++ b/pkg/services/live/pipeline/frame_output_remote_write_test.go @@ -0,0 +1,150 @@ +package pipeline + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/prompb" +) + +func TestRemoteWriteFrameOutput_sample(t *testing.T) { + // Given 2 time series in a buffer, we output the same number + // of time series but with one sample removed from each. + now := time.Now().UnixNano() / int64(time.Millisecond) + timeSeries := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test1", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: now, + Value: 1, + }, + { + Timestamp: now + 100, + Value: 2, + }, + }, + }, + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test2", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: now, + Value: 1, + }, + { + Timestamp: now + 100, + Value: 2, + }, + }, + }, + } + out := NewRemoteWriteFrameOutput(RemoteWriteConfig{ + SampleMilliseconds: 500, + }) + sampledTimeSeries := out.sample(timeSeries) + require.Len(t, sampledTimeSeries, 2) + + require.Equal(t, []prompb.Sample{{Timestamp: now, Value: 1}}, sampledTimeSeries[0].Samples) + require.Equal(t, []prompb.Sample{{Timestamp: now, Value: 1}}, sampledTimeSeries[1].Samples) +} + +func TestRemoteWriteFrameOutput_sample_merge(t *testing.T) { + // Given 3 time series in a buffer, we output only + // 2 time series since we merge by __name__ label. + now := time.Now().UnixNano() / int64(time.Millisecond) + timeSeries := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test1", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: now, + Value: 1, + }, + { + Timestamp: now + 100, + Value: 2, + }, + { + Timestamp: now + 200, + Value: 2, + }, + }, + }, + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test2", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: now, + Value: 1, + }, + { + Timestamp: now + 100, + Value: 2, + }, + }, + }, + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test2", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: now, + Value: 1, + }, + { + Timestamp: now + 100, + Value: 2, + }, + }, + }, + } + out := NewRemoteWriteFrameOutput(RemoteWriteConfig{ + SampleMilliseconds: 50, + }) + sampledTimeSeries := out.sample(timeSeries) + require.Len(t, sampledTimeSeries, 2) + + expectedSamples := map[string][]prompb.Sample{ + "test1": timeSeries[0].Samples, + "test2": { + { + Timestamp: now, + Value: 1, + }, + { + Timestamp: now + 100, + Value: 2, + }, + }, + } + + require.Equal(t, expectedSamples[sampledTimeSeries[0].Labels[0].Value], sampledTimeSeries[0].Samples) + require.Equal(t, expectedSamples[sampledTimeSeries[1].Labels[0].Value], sampledTimeSeries[1].Samples) +} diff --git a/pkg/services/live/remotewrite/remotewrite_test.go b/pkg/services/live/remotewrite/remotewrite_test.go new file mode 100644 index 00000000000..2f4494a25b4 --- /dev/null +++ b/pkg/services/live/remotewrite/remotewrite_test.go @@ -0,0 +1,87 @@ +package remotewrite + +import ( + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/require" +) + +func TestTsFromFrames(t *testing.T) { + t1 := time.Now() + t2 := time.Now().Add(time.Second) + frame := data.NewFrame("test", + data.NewField("time", map[string]string{"test": "yes"}, []time.Time{t1, t2}), + data.NewField("value", map[string]string{"test": "yes"}, []float64{1.0, 2.0}), + ) + ts := TimeSeriesFromFrames(frame) + require.Len(t, ts, 1) + require.Len(t, ts[0].Samples, 2) + require.Equal(t, toSampleTime(t1), ts[0].Samples[0].Timestamp) + require.Equal(t, toSampleTime(t2), ts[0].Samples[1].Timestamp) + require.Len(t, ts[0].Labels, 2) + require.Equal(t, "test", ts[0].Labels[0].Name) + require.Equal(t, "yes", ts[0].Labels[0].Value) + require.Equal(t, "__name__", ts[0].Labels[1].Name) + require.Equal(t, "test_value", ts[0].Labels[1].Value) +} + +func TestTsFromFramesMultipleSeries(t *testing.T) { + t1 := time.Now() + t2 := time.Now().Add(time.Second) + frame := data.NewFrame("test", + data.NewField("time", nil, []time.Time{t1, t2}), + data.NewField("value1", nil, []float64{1.0, 2.0}), + data.NewField("value2", nil, []bool{true, false}), + data.NewField("value3", nil, []float64{3.0, 4.0}), + ) + ts := TimeSeriesFromFrames(frame) + require.Len(t, ts, 2) + require.Len(t, ts[0].Samples, 2) + require.Equal(t, toSampleTime(t1), ts[0].Samples[0].Timestamp) + require.Equal(t, toSampleTime(t2), ts[0].Samples[1].Timestamp) + require.Equal(t, 1.0, ts[0].Samples[0].Value) + require.Equal(t, 2.0, ts[0].Samples[1].Value) + require.Len(t, ts[1].Samples, 2) + require.Equal(t, toSampleTime(t1), ts[1].Samples[0].Timestamp) + require.Equal(t, toSampleTime(t2), ts[1].Samples[1].Timestamp) + require.Equal(t, 3.0, ts[1].Samples[0].Value) + require.Equal(t, 4.0, ts[1].Samples[1].Value) +} + +func TestTsFromFramesMultipleFrames(t *testing.T) { + t1 := time.Now() + t2 := time.Now().Add(time.Second) + t3 := time.Now().Add(2 * time.Second) + t4 := time.Now().Add(3 * time.Second) + frame1 := data.NewFrame("test", + data.NewField("time", nil, []time.Time{t1, t2}), + data.NewField("value1", nil, []float64{1.0, 2.0}), + ) + frame2 := data.NewFrame("test", + data.NewField("time", nil, []time.Time{t3, t4}), + data.NewField("value3", nil, []float64{3.0, 4.0}), + ) + ts := TimeSeriesFromFrames(frame1, frame2) + require.Len(t, ts, 2) + require.Len(t, ts[0].Samples, 2) + require.Equal(t, toSampleTime(t1), ts[0].Samples[0].Timestamp) + require.Equal(t, toSampleTime(t2), ts[0].Samples[1].Timestamp) + require.Equal(t, 1.0, ts[0].Samples[0].Value) + require.Equal(t, 2.0, ts[0].Samples[1].Value) + require.Len(t, ts[1].Samples, 2) + require.Equal(t, toSampleTime(t3), ts[1].Samples[0].Timestamp) + require.Equal(t, toSampleTime(t4), ts[1].Samples[1].Timestamp) + require.Equal(t, 3.0, ts[1].Samples[0].Value) + require.Equal(t, 4.0, ts[1].Samples[1].Value) +} + +func TestSerialize(t *testing.T) { + frame := data.NewFrame("test", + data.NewField("time", nil, []time.Time{time.Now(), time.Now().Add(time.Second)}), + data.NewField("value", nil, []float64{1.0, 2.0}), + ) + _, err := Serialize(frame) + require.NoError(t, err) +}