Live: remote write sampling (#40079)

This commit is contained in:
Alexander Emelin 2021-10-07 11:42:57 +03:00 committed by GitHub
parent 5c7e874008
commit 8180121495
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 311 additions and 5 deletions

View File

@ -107,6 +107,7 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
SampleMilliseconds: 1000,
}),
},
Subscribers: []Subscriber{

View File

@ -14,6 +14,8 @@ import (
"github.com/prometheus/prometheus/prompb"
)
// flushInterval is the period of the ticker that drives flushPeriodically.
const flushInterval = 15 * time.Second
type RemoteWriteConfig struct {
// Endpoint to send streaming frames to.
Endpoint string `json:"endpoint"`
@ -21,6 +23,14 @@ type RemoteWriteConfig struct {
User string `json:"user"`
// Password for remote write endpoint.
Password string `json:"password"`
// SampleMilliseconds allows defining an interval to sample points inside a channel
// when outputting to a remote write endpoint (on a __name__ label basis). For example,
// with a 20Hz stream and SampleMilliseconds set to 1000, only one point per second
// will be sent to the remote write endpoint. This reduces data resolution, of course.
// If not set, no down-sampling is performed. If SampleMilliseconds is greater than
// flushInterval, each flush will still include a point, since at the moment we only
// keep track of timestamps within each individual flush.
SampleMilliseconds int64 `json:"sampleMilliseconds"`
}
type RemoteWriteFrameOutput struct {
@ -48,7 +58,7 @@ func (out *RemoteWriteFrameOutput) Type() string {
}
func (out *RemoteWriteFrameOutput) flushPeriodically() {
for range time.NewTicker(15 * time.Second).C {
for range time.NewTicker(flushInterval).C {
out.mu.Lock()
if len(out.buffer) == 0 {
out.mu.Unlock()
@ -70,8 +80,66 @@ func (out *RemoteWriteFrameOutput) flushPeriodically() {
}
}
// sample down-samples timeSeries and merges the series that share the same
// metric name (the value of the "__name__" label; series without that label
// are grouped under the empty name).
//
// Within one call, a sample is kept when it is the first one seen for its
// name, or when its timestamp is strictly greater than the timestamp of the
// previously kept sample for that name plus out.config.SampleMilliseconds.
// Kept samples of all series with the same name are concatenated in input
// order, and the merged series carries the labels of the last input series
// with that name.
//
// The returned series are ordered by the first appearance of their name in
// timeSeries, so the output is deterministic for a given input. A nil slice
// is returned when timeSeries is empty.
//
// Note that filtering is done in place: the Samples slices of the input
// series are overwritten with the kept samples.
func (out *RemoteWriteFrameOutput) sample(timeSeries []prompb.TimeSeries) []prompb.TimeSeries {
	// positions maps a metric name to the index of its merged series in result.
	positions := make(map[string]int, len(timeSeries))
	// lastKept holds, per metric name, the timestamp of the latest kept sample.
	lastKept := make(map[string]int64, len(timeSeries))
	var result []prompb.TimeSeries

	for _, ts := range timeSeries {
		var name string
		for _, label := range ts.Labels {
			if label.Name == "__name__" {
				name = label.Value
				break
			}
		}

		pos, seen := positions[name]
		if !seen {
			pos = len(result)
			positions[name] = pos
			result = append(result, prompb.TimeSeries{})
		}

		lastTimestamp := lastKept[name]
		// In-place filtering, see https://github.com/golang/go/wiki/SliceTricks#filter-in-place.
		n := 0
		for _, s := range ts.Samples {
			// A zero lastTimestamp means nothing has been kept for this name yet.
			if lastTimestamp == 0 || s.Timestamp > lastTimestamp+out.config.SampleMilliseconds {
				ts.Samples[n] = s
				n++
				lastTimestamp = s.Timestamp
			}
		}
		lastKept[name] = lastTimestamp

		// Appending copies the kept samples, so the merged series does not
		// alias the backing array of any input series.
		merged := &result[pos]
		merged.Labels = ts.Labels
		merged.Samples = append(merged.Samples, ts.Samples[:n]...)
	}
	return result
}
func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error {
logger.Debug("Remote write flush", "num time series", len(timeSeries))
numSamples := 0
for _, ts := range timeSeries {
numSamples += len(ts.Samples)
}
logger.Debug("Remote write flush", "numTimeSeries", len(timeSeries), "numSamples", numSamples)
if out.config.SampleMilliseconds > 0 {
timeSeries = out.sample(timeSeries)
numSamples = 0
for _, ts := range timeSeries {
numSamples += len(ts.Samples)
}
logger.Debug("After down-sampling", "numTimeSeries", len(timeSeries), "numSamples", numSamples)
}
remoteWriteData, err := remotewrite.TimeSeriesToBytes(timeSeries)
if err != nil {
return fmt.Errorf("error converting time series to bytes: %v", err)

View File

@ -0,0 +1,150 @@
package pipeline
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/prompb"
)
// TestRemoteWriteFrameOutput_sample feeds two differently named series, each
// holding two points 100ms apart, through a 500ms sampling window and expects
// both series back with only their first point left.
func TestRemoteWriteFrameOutput_sample(t *testing.T) {
	now := time.Now().UnixNano() / int64(time.Millisecond)

	input := []prompb.TimeSeries{
		{
			Labels: []prompb.Label{{Name: "__name__", Value: "test1"}},
			Samples: []prompb.Sample{
				{Timestamp: now, Value: 1},
				{Timestamp: now + 100, Value: 2},
			},
		},
		{
			Labels: []prompb.Label{{Name: "__name__", Value: "test2"}},
			Samples: []prompb.Sample{
				{Timestamp: now, Value: 1},
				{Timestamp: now + 100, Value: 2},
			},
		},
	}

	output := NewRemoteWriteFrameOutput(RemoteWriteConfig{SampleMilliseconds: 500})
	sampled := output.sample(input)

	// The series count is unchanged, but the second point of each series is
	// closer than 500ms to the first one and is therefore dropped.
	require.Len(t, sampled, 2)
	firstPointOnly := []prompb.Sample{{Timestamp: now, Value: 1}}
	for _, series := range sampled[:2] {
		require.Equal(t, firstPointOnly, series.Samples)
	}
}
// TestRemoteWriteFrameOutput_sample_merge feeds three series, two of which
// share the same __name__, through a 50ms sampling window and expects two
// merged series back: one per distinct name.
func TestRemoteWriteFrameOutput_sample_merge(t *testing.T) {
	now := time.Now().UnixNano() / int64(time.Millisecond)

	timeSeries := []prompb.TimeSeries{
		{
			Labels: []prompb.Label{{Name: "__name__", Value: "test1"}},
			Samples: []prompb.Sample{
				{Timestamp: now, Value: 1},
				{Timestamp: now + 100, Value: 2},
				{Timestamp: now + 200, Value: 2},
			},
		},
		{
			Labels: []prompb.Label{{Name: "__name__", Value: "test2"}},
			Samples: []prompb.Sample{
				{Timestamp: now, Value: 1},
				{Timestamp: now + 100, Value: 2},
			},
		},
		{
			Labels: []prompb.Label{{Name: "__name__", Value: "test2"}},
			Samples: []prompb.Sample{
				{Timestamp: now, Value: 1},
				{Timestamp: now + 100, Value: 2},
			},
		},
	}

	out := NewRemoteWriteFrameOutput(RemoteWriteConfig{
		SampleMilliseconds: 50,
	})
	sampledTimeSeries := out.sample(timeSeries)
	require.Len(t, sampledTimeSeries, 2)

	// Expected values are spelled out instead of being read back from
	// timeSeries: sample filters its input in place, so the input slices are
	// not a trustworthy reference once it has run.
	expectedSamples := map[string][]prompb.Sample{
		// All three points are more than 50ms apart, so all are kept.
		"test1": {
			{Timestamp: now, Value: 1},
			{Timestamp: now + 100, Value: 2},
			{Timestamp: now + 200, Value: 2},
		},
		// The second "test2" series repeats timestamps already kept from the
		// first one, so it contributes nothing to the merged series.
		"test2": {
			{Timestamp: now, Value: 1},
			{Timestamp: now + 100, Value: 2},
		},
	}

	// The order of the returned series is not asserted; each name must show
	// up exactly once with its expected samples.
	for _, series := range sampledTimeSeries {
		name := series.Labels[0].Value
		expected, ok := expectedSamples[name]
		require.True(t, ok, "unexpected or duplicated series %q", name)
		require.Equal(t, expected, series.Samples)
		delete(expectedSamples, name)
	}
	require.Empty(t, expectedSamples)
}

View File

@ -0,0 +1,87 @@
package remotewrite
import (
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
// TestTsFromFrames converts a frame with one time field and one labelled
// float field and checks the single resulting series: two samples carrying
// the frame's timestamps, and two labels (the field label followed by
// __name__, whose expected value is "test_value").
func TestTsFromFrames(t *testing.T) {
	first := time.Now()
	second := time.Now().Add(time.Second)
	frame := data.NewFrame("test",
		data.NewField("time", map[string]string{"test": "yes"}, []time.Time{first, second}),
		data.NewField("value", map[string]string{"test": "yes"}, []float64{1.0, 2.0}),
	)

	series := TimeSeriesFromFrames(frame)
	require.Len(t, series, 1)

	require.Len(t, series[0].Samples, 2)
	for i, at := range []time.Time{first, second} {
		require.Equal(t, toSampleTime(at), series[0].Samples[i].Timestamp)
	}

	require.Len(t, series[0].Labels, 2)
	wantLabels := [][2]string{
		{"test", "yes"},
		{"__name__", "test_value"},
	}
	for i, want := range wantLabels {
		require.Equal(t, want[0], series[0].Labels[i].Name)
		require.Equal(t, want[1], series[0].Labels[i].Value)
	}
}
// TestTsFromFramesMultipleSeries converts a frame holding two float fields and
// one bool field. It expects exactly two series, carrying the values of the
// two float fields, each with the frame's two timestamps.
func TestTsFromFramesMultipleSeries(t *testing.T) {
	first := time.Now()
	second := time.Now().Add(time.Second)
	frame := data.NewFrame("test",
		data.NewField("time", nil, []time.Time{first, second}),
		data.NewField("value1", nil, []float64{1.0, 2.0}),
		data.NewField("value2", nil, []bool{true, false}),
		data.NewField("value3", nil, []float64{3.0, 4.0}),
	)

	series := TimeSeriesFromFrames(frame)
	require.Len(t, series, 2)

	wantTimes := []time.Time{first, second}
	wantValues := [][]float64{
		{1.0, 2.0},
		{3.0, 4.0},
	}
	for i, values := range wantValues {
		require.Len(t, series[i].Samples, 2)
		for j, at := range wantTimes {
			require.Equal(t, toSampleTime(at), series[i].Samples[j].Timestamp)
		}
		for j, value := range values {
			require.Equal(t, value, series[i].Samples[j].Value)
		}
	}
}
// TestTsFromFramesMultipleFrames converts two frames, each with its own time
// field and one float field, in a single call. It expects one series per
// frame, in argument order, with that frame's timestamps and values.
func TestTsFromFramesMultipleFrames(t *testing.T) {
	t1 := time.Now()
	t2 := time.Now().Add(time.Second)
	t3 := time.Now().Add(2 * time.Second)
	t4 := time.Now().Add(3 * time.Second)

	firstFrame := data.NewFrame("test",
		data.NewField("time", nil, []time.Time{t1, t2}),
		data.NewField("value1", nil, []float64{1.0, 2.0}),
	)
	secondFrame := data.NewFrame("test",
		data.NewField("time", nil, []time.Time{t3, t4}),
		data.NewField("value3", nil, []float64{3.0, 4.0}),
	)

	series := TimeSeriesFromFrames(firstFrame, secondFrame)
	require.Len(t, series, 2)

	expected := []struct {
		times  []time.Time
		values []float64
	}{
		{times: []time.Time{t1, t2}, values: []float64{1.0, 2.0}},
		{times: []time.Time{t3, t4}, values: []float64{3.0, 4.0}},
	}
	for i, want := range expected {
		require.Len(t, series[i].Samples, 2)
		for j, at := range want.times {
			require.Equal(t, toSampleTime(at), series[i].Samples[j].Timestamp)
		}
		for j, value := range want.values {
			require.Equal(t, value, series[i].Samples[j].Value)
		}
	}
}
// TestSerialize is a smoke test: serializing a small frame with a time field
// and a float field must not return an error. The produced bytes are not
// inspected.
func TestSerialize(t *testing.T) {
	timeField := data.NewField("time", nil, []time.Time{time.Now(), time.Now().Add(time.Second)})
	valueField := data.NewField("value", nil, []float64{1.0, 2.0})
	frame := data.NewFrame("test", timeField, valueField)

	_, serializeErr := Serialize(frame)
	require.NoError(t, serializeErr)
}