mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
loki: backend-mode: add stats support (#46526)
* loki: backend-mode: add stats support * refactor: rename variable
This commit is contained in:
parent
1a4b1184bd
commit
a246381879
@ -39,6 +39,9 @@ func TestSuccessResponse(t *testing.T) {
|
||||
{name: "parse a matrix response with Infinity", filepath: "matrix_inf", query: matrixQuery},
|
||||
{name: "parse a matrix response with very small step value", filepath: "matrix_small_step", query: matrixQuery},
|
||||
|
||||
// loki adds stats to matrix-responses too
|
||||
{name: "parse a matrix response with stats", filepath: "matrix_with_stats", query: matrixQuery},
|
||||
|
||||
{name: "parse a simple vector response", filepath: "vector_simple", query: vectorQuery},
|
||||
{name: "parse a vector response with special values", filepath: "vector_special_values", query: vectorQuery},
|
||||
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/loki/pkg/loghttp"
|
||||
"github.com/grafana/loki/pkg/logqlmodel/stats"
|
||||
)
|
||||
|
||||
func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames, error) {
|
||||
@ -23,19 +24,20 @@ func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames,
|
||||
}
|
||||
|
||||
func lokiResponseToDataFrames(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames, error) {
|
||||
stats := parseStats(value.Data.Statistics)
|
||||
switch res := value.Data.Result.(type) {
|
||||
case loghttp.Matrix:
|
||||
return lokiMatrixToDataFrames(res, query), nil
|
||||
return lokiMatrixToDataFrames(res, query, stats), nil
|
||||
case loghttp.Vector:
|
||||
return lokiVectorToDataFrames(res, query), nil
|
||||
return lokiVectorToDataFrames(res, query, stats), nil
|
||||
case loghttp.Streams:
|
||||
return lokiStreamsToDataFrames(res, query), nil
|
||||
return lokiStreamsToDataFrames(res, query, stats), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("resultType %T not supported{", res)
|
||||
}
|
||||
}
|
||||
|
||||
func lokiMatrixToDataFrames(matrix loghttp.Matrix, query *lokiQuery) data.Frames {
|
||||
func lokiMatrixToDataFrames(matrix loghttp.Matrix, query *lokiQuery, stats []data.QueryStat) data.Frames {
|
||||
frames := data.Frames{}
|
||||
|
||||
for _, v := range matrix {
|
||||
@ -56,6 +58,9 @@ func lokiMatrixToDataFrames(matrix loghttp.Matrix, query *lokiQuery) data.Frames
|
||||
valueField := data.NewField("", tags, values)
|
||||
|
||||
frame := data.NewFrame("", timeField, valueField)
|
||||
frame.SetMeta(&data.FrameMeta{
|
||||
Stats: stats,
|
||||
})
|
||||
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
@ -63,7 +68,7 @@ func lokiMatrixToDataFrames(matrix loghttp.Matrix, query *lokiQuery) data.Frames
|
||||
return frames
|
||||
}
|
||||
|
||||
func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery) data.Frames {
|
||||
func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery, stats []data.QueryStat) data.Frames {
|
||||
frames := data.Frames{}
|
||||
|
||||
for _, v := range vector {
|
||||
@ -78,6 +83,9 @@ func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery) data.Frames
|
||||
valueField := data.NewField("", tags, values)
|
||||
|
||||
frame := data.NewFrame("", timeField, valueField)
|
||||
frame.SetMeta(&data.FrameMeta{
|
||||
Stats: stats,
|
||||
})
|
||||
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
@ -85,7 +93,7 @@ func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery) data.Frames
|
||||
return frames
|
||||
}
|
||||
|
||||
func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery) data.Frames {
|
||||
func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery, stats []data.QueryStat) data.Frames {
|
||||
frames := data.Frames{}
|
||||
|
||||
for _, v := range streams {
|
||||
@ -106,9 +114,68 @@ func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery) data.Fra
|
||||
valueField := data.NewField("", tags, values)
|
||||
|
||||
frame := data.NewFrame("", timeField, valueField)
|
||||
frame.SetMeta(&data.FrameMeta{
|
||||
Stats: stats,
|
||||
})
|
||||
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
|
||||
return frames
|
||||
}
|
||||
|
||||
func parseStats(result stats.Result) []data.QueryStat {
|
||||
data := []data.QueryStat{
|
||||
makeStat("Summary: bytes processed per second", float64(result.Summary.BytesProcessedPerSecond), "Bps"),
|
||||
makeStat("Summary: lines processed per second", float64(result.Summary.LinesProcessedPerSecond), ""),
|
||||
makeStat("Summary: total bytes processed", float64(result.Summary.TotalBytesProcessed), "decbytes"),
|
||||
makeStat("Summary: total lines processed", float64(result.Summary.TotalLinesProcessed), ""),
|
||||
makeStat("Summary: exec time", result.Summary.ExecTime, "s"),
|
||||
makeStat("Store: total chunks ref", float64(result.Store.TotalChunksRef), ""),
|
||||
makeStat("Store: total chunks downloaded", float64(result.Store.TotalChunksDownloaded), ""),
|
||||
makeStat("Store: chunks download time", result.Store.ChunksDownloadTime, "s"),
|
||||
makeStat("Store: head chunk bytes", float64(result.Store.HeadChunkBytes), "decbytes"),
|
||||
makeStat("Store: head chunk lines", float64(result.Store.HeadChunkLines), ""),
|
||||
makeStat("Store: decompressed bytes", float64(result.Store.DecompressedBytes), "decbytes"),
|
||||
makeStat("Store: decompressed lines", float64(result.Store.DecompressedLines), ""),
|
||||
makeStat("Store: compressed bytes", float64(result.Store.CompressedBytes), "decbytes"),
|
||||
makeStat("Store: total duplicates", float64(result.Store.TotalDuplicates), ""),
|
||||
makeStat("Ingester: total reached", float64(result.Ingester.TotalReached), ""),
|
||||
makeStat("Ingester: total chunks matched", float64(result.Ingester.TotalChunksMatched), ""),
|
||||
makeStat("Ingester: total batches", float64(result.Ingester.TotalBatches), ""),
|
||||
makeStat("Ingester: total lines sent", float64(result.Ingester.TotalLinesSent), ""),
|
||||
makeStat("Ingester: head chunk bytes", float64(result.Ingester.HeadChunkBytes), "decbytes"),
|
||||
makeStat("Ingester: head chunk lines", float64(result.Ingester.HeadChunkLines), ""),
|
||||
makeStat("Ingester: decompressed bytes", float64(result.Ingester.DecompressedBytes), "decbytes"),
|
||||
makeStat("Ingester: decompressed lines", float64(result.Ingester.DecompressedLines), ""),
|
||||
makeStat("Ingester: compressed bytes", float64(result.Ingester.CompressedBytes), "decbytes"),
|
||||
makeStat("Ingester: total duplicates", float64(result.Ingester.TotalDuplicates), ""),
|
||||
}
|
||||
|
||||
// it is not possible to know whether the given statistics was missing, or
|
||||
// it's value was zero.
|
||||
// we do a heuristic here, if every stat-value is zero, we assume we got no stats-data
|
||||
allStatsZero := true
|
||||
for _, stat := range data {
|
||||
if stat.Value > 0 {
|
||||
allStatsZero = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if allStatsZero {
|
||||
return nil
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
func makeStat(name string, value float64, unit string) data.QueryStat {
|
||||
return data.QueryStat{
|
||||
FieldConfig: data.FieldConfig{
|
||||
DisplayName: name,
|
||||
Unit: unit,
|
||||
},
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/loki/pkg/loghttp"
|
||||
"github.com/grafana/loki/pkg/logqlmodel/stats"
|
||||
p "github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -105,4 +106,77 @@ func TestParseResponse(t *testing.T) {
|
||||
require.NotNil(t, timeFieldConfig)
|
||||
require.Equal(t, float64(42000), timeFieldConfig.Interval)
|
||||
})
|
||||
|
||||
t.Run("should parse response stats", func(t *testing.T) {
|
||||
stats := stats.Result{
|
||||
Summary: stats.Summary{
|
||||
BytesProcessedPerSecond: 1,
|
||||
LinesProcessedPerSecond: 2,
|
||||
TotalBytesProcessed: 3,
|
||||
TotalLinesProcessed: 4,
|
||||
ExecTime: 5.5,
|
||||
},
|
||||
Store: stats.Store{
|
||||
TotalChunksRef: 6,
|
||||
TotalChunksDownloaded: 7,
|
||||
ChunksDownloadTime: 8.8,
|
||||
HeadChunkBytes: 9,
|
||||
HeadChunkLines: 10,
|
||||
DecompressedBytes: 11,
|
||||
DecompressedLines: 12,
|
||||
CompressedBytes: 13,
|
||||
TotalDuplicates: 14,
|
||||
},
|
||||
Ingester: stats.Ingester{
|
||||
TotalReached: 15,
|
||||
TotalChunksMatched: 16,
|
||||
TotalBatches: 17,
|
||||
TotalLinesSent: 18,
|
||||
HeadChunkBytes: 19,
|
||||
HeadChunkLines: 20,
|
||||
DecompressedBytes: 21,
|
||||
DecompressedLines: 22,
|
||||
CompressedBytes: 23,
|
||||
TotalDuplicates: 24,
|
||||
},
|
||||
}
|
||||
|
||||
expected := []data.QueryStat{
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Summary: bytes processed per second", Unit: "Bps"}, Value: 1},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Summary: lines processed per second", Unit: ""}, Value: 2},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Summary: total bytes processed", Unit: "decbytes"}, Value: 3},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Summary: total lines processed", Unit: ""}, Value: 4},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Summary: exec time", Unit: "s"}, Value: 5.5},
|
||||
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: total chunks ref", Unit: ""}, Value: 6},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: total chunks downloaded", Unit: ""}, Value: 7},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: chunks download time", Unit: "s"}, Value: 8.8},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: head chunk bytes", Unit: "decbytes"}, Value: 9},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: head chunk lines", Unit: ""}, Value: 10},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: decompressed bytes", Unit: "decbytes"}, Value: 11},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: decompressed lines", Unit: ""}, Value: 12},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: compressed bytes", Unit: "decbytes"}, Value: 13},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Store: total duplicates", Unit: ""}, Value: 14},
|
||||
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: total reached", Unit: ""}, Value: 15},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: total chunks matched", Unit: ""}, Value: 16},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: total batches", Unit: ""}, Value: 17},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: total lines sent", Unit: ""}, Value: 18},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: head chunk bytes", Unit: "decbytes"}, Value: 19},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: head chunk lines", Unit: ""}, Value: 20},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: decompressed bytes", Unit: "decbytes"}, Value: 21},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: decompressed lines", Unit: ""}, Value: 22},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: compressed bytes", Unit: "decbytes"}, Value: 23},
|
||||
{FieldConfig: data.FieldConfig{DisplayName: "Ingester: total duplicates", Unit: ""}, Value: 24},
|
||||
}
|
||||
|
||||
result := parseStats((stats))
|
||||
|
||||
// NOTE: i compare it item-by-item otherwise the test-fail-error-message is very hard to read
|
||||
require.Len(t, result, len(expected))
|
||||
|
||||
for i := 0; i < len(result); i++ {
|
||||
require.Equal(t, expected[i], result[i])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
252
pkg/tsdb/loki/testdata/matrix_with_stats.golden.txt
vendored
Normal file
252
pkg/tsdb/loki/testdata/matrix_with_stats.golden.txt
vendored
Normal file
File diff suppressed because one or more lines are too long
60
pkg/tsdb/loki/testdata/matrix_with_stats.json
vendored
Normal file
60
pkg/tsdb/loki/testdata/matrix_with_stats.json
vendored
Normal file
@ -0,0 +1,60 @@
|
||||
{
|
||||
"status": "success",
|
||||
"data": {
|
||||
"resultType": "matrix",
|
||||
"result": [
|
||||
{
|
||||
"metric": {
|
||||
"level": "error",
|
||||
"location": "moon"
|
||||
},
|
||||
"values": [
|
||||
[1639125366.989, "0.4"],
|
||||
[1639125406.989, "0.2"]
|
||||
]
|
||||
},
|
||||
{
|
||||
"metric": {
|
||||
"level": "info",
|
||||
"location": "mars"
|
||||
},
|
||||
"values": [
|
||||
[1639125386.989, "0.6"],
|
||||
[1639125396.989, "0.8"]
|
||||
]
|
||||
}
|
||||
],
|
||||
"stats": {
|
||||
"summary": {
|
||||
"bytesProcessedPerSecond": 3507022,
|
||||
"linesProcessedPerSecond": 24818,
|
||||
"totalBytesProcessed": 7772,
|
||||
"totalLinesProcessed": 55,
|
||||
"execTime": 0.002216125
|
||||
},
|
||||
"store": {
|
||||
"totalChunksRef": 2,
|
||||
"totalChunksDownloaded": 3,
|
||||
"chunksDownloadTime": 0.000390958,
|
||||
"headChunkBytes": 4,
|
||||
"headChunkLines": 5,
|
||||
"decompressedBytes": 7772,
|
||||
"decompressedLines": 55,
|
||||
"compressedBytes": 31432,
|
||||
"totalDuplicates": 6
|
||||
},
|
||||
"ingester": {
|
||||
"totalReached": 7,
|
||||
"totalChunksMatched": 8,
|
||||
"totalBatches": 9,
|
||||
"totalLinesSent": 10,
|
||||
"headChunkBytes": 11,
|
||||
"headChunkLines": 12,
|
||||
"decompressedBytes": 13,
|
||||
"decompressedLines": 14,
|
||||
"compressedBytes": 15,
|
||||
"totalDuplicates": 16
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
220
pkg/tsdb/loki/testdata/streams_simple.golden.txt
vendored
220
pkg/tsdb/loki/testdata/streams_simple.golden.txt
vendored
File diff suppressed because one or more lines are too long
28
pkg/tsdb/loki/testdata/streams_simple.json
vendored
28
pkg/tsdb/loki/testdata/streams_simple.json
vendored
@ -50,26 +50,26 @@
|
||||
},
|
||||
"store": {
|
||||
"totalChunksRef": 2,
|
||||
"totalChunksDownloaded": 2,
|
||||
"totalChunksDownloaded": 3,
|
||||
"chunksDownloadTime": 0.000390958,
|
||||
"headChunkBytes": 0,
|
||||
"headChunkLines": 0,
|
||||
"headChunkBytes": 4,
|
||||
"headChunkLines": 5,
|
||||
"decompressedBytes": 7772,
|
||||
"decompressedLines": 55,
|
||||
"compressedBytes": 31432,
|
||||
"totalDuplicates": 0
|
||||
"totalDuplicates": 6
|
||||
},
|
||||
"ingester": {
|
||||
"totalReached": 0,
|
||||
"totalChunksMatched": 0,
|
||||
"totalBatches": 0,
|
||||
"totalLinesSent": 0,
|
||||
"headChunkBytes": 0,
|
||||
"headChunkLines": 0,
|
||||
"decompressedBytes": 0,
|
||||
"decompressedLines": 0,
|
||||
"compressedBytes": 0,
|
||||
"totalDuplicates": 0
|
||||
"totalReached": 7,
|
||||
"totalChunksMatched": 8,
|
||||
"totalBatches": 9,
|
||||
"totalLinesSent": 10,
|
||||
"headChunkBytes": 11,
|
||||
"headChunkLines": 12,
|
||||
"decompressedBytes": 13,
|
||||
"decompressedLines": 14,
|
||||
"compressedBytes": 15,
|
||||
"totalDuplicates": 16
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -76,6 +76,9 @@ describe('loki backendResultTransformer', () => {
|
||||
executedQueryString: 'something1',
|
||||
preferredVisualisationType: 'logs',
|
||||
searchWords: ['thing1'],
|
||||
custom: {
|
||||
lokiQueryStatKey: 'Summary: total bytes processed',
|
||||
},
|
||||
};
|
||||
expectedFrame.fields[2].type = FieldType.time;
|
||||
expectedFrame.fields.push({
|
||||
|
@ -23,6 +23,10 @@ function processStreamFrame(frame: DataFrame, query: LokiQuery | undefined): Dat
|
||||
const meta: QueryResultMeta = {
|
||||
preferredVisualisationType: 'logs',
|
||||
searchWords: query !== undefined ? getHighlighterExpressionsFromQuery(formatQuery(query.expr)) : undefined,
|
||||
custom: {
|
||||
// used by logs_model
|
||||
lokiQueryStatKey: 'Summary: total bytes processed',
|
||||
},
|
||||
};
|
||||
const newFrame = setFrameMeta(frame, meta);
|
||||
const newFields = frame.fields.map((field) => {
|
||||
|
Loading…
Reference in New Issue
Block a user