mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Loki: backend: use json-field (#48486)
* fixed strings * loki: backend: use json-field
This commit is contained in:
parent
a8354a0319
commit
7d369a1dea
@ -1,6 +1,7 @@
|
|||||||
package loki
|
package loki
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"sort"
|
"sort"
|
||||||
@ -88,12 +89,12 @@ func adjustLogsFrame(frame *data.Frame, query *lokiQuery) error {
|
|||||||
timeField := fields[1]
|
timeField := fields[1]
|
||||||
lineField := fields[2]
|
lineField := fields[2]
|
||||||
|
|
||||||
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeString) {
|
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) {
|
||||||
return fmt.Errorf("invalid fields in metric frame")
|
return fmt.Errorf("invalid fields in logs frame")
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) {
|
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) {
|
||||||
return fmt.Errorf("invalid fields in metric frame")
|
return fmt.Errorf("invalid fields in logs frame")
|
||||||
}
|
}
|
||||||
|
|
||||||
if frame.Meta == nil {
|
if frame.Meta == nil {
|
||||||
@ -127,8 +128,9 @@ func makeStringTimeField(timeField *data.Field) *data.Field {
|
|||||||
return data.NewField("tsNs", timeField.Labels.Copy(), stringTimestamps)
|
return data.NewField("tsNs", timeField.Labels.Copy(), stringTimestamps)
|
||||||
}
|
}
|
||||||
|
|
||||||
func calculateCheckSum(time string, line string, labels string) (string, error) {
|
func calculateCheckSum(time string, line string, labels []byte) (string, error) {
|
||||||
input := []byte(line + "_" + labels)
|
input := []byte(line + "_")
|
||||||
|
input = append(input, labels...)
|
||||||
hash := fnv.New32()
|
hash := fnv.New32()
|
||||||
_, err := hash.Write(input)
|
_, err := hash.Write(input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -147,7 +149,7 @@ func makeIdField(stringTimeField *data.Field, lineField *data.Field, labelsField
|
|||||||
for i := 0; i < length; i++ {
|
for i := 0; i < length; i++ {
|
||||||
time := stringTimeField.At(i).(string)
|
time := stringTimeField.At(i).(string)
|
||||||
line := lineField.At(i).(string)
|
line := lineField.At(i).(string)
|
||||||
labels := labelsField.At(i).(string)
|
labels := labelsField.At(i).(json.RawMessage)
|
||||||
|
|
||||||
sum, err := calculateCheckSum(time, line, labels)
|
sum, err := calculateCheckSum(time, line, labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package loki
|
package loki
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -39,11 +40,11 @@ func TestFormatName(t *testing.T) {
|
|||||||
func TestAdjustFrame(t *testing.T) {
|
func TestAdjustFrame(t *testing.T) {
|
||||||
t.Run("logs-frame metadata should be set correctly", func(t *testing.T) {
|
t.Run("logs-frame metadata should be set correctly", func(t *testing.T) {
|
||||||
frame := data.NewFrame("",
|
frame := data.NewFrame("",
|
||||||
data.NewField("labels", nil, []string{
|
data.NewField("labels", nil, []json.RawMessage{
|
||||||
`{"level":"info"}`,
|
json.RawMessage(`{"level":"info"}`),
|
||||||
`{"level":"error"}`,
|
json.RawMessage(`{"level":"error"}`),
|
||||||
`{"level":"error"}`,
|
json.RawMessage(`{"level":"error"}`),
|
||||||
`{"level":"info"}`,
|
json.RawMessage(`{"level":"info"}`),
|
||||||
}),
|
}),
|
||||||
data.NewField("time", nil, []time.Time{
|
data.NewField("time", nil, []time.Time{
|
||||||
time.Date(2022, 1, 2, 3, 4, 5, 6, time.UTC),
|
time.Date(2022, 1, 2, 3, 4, 5, 6, time.UTC),
|
||||||
@ -86,7 +87,7 @@ func TestAdjustFrame(t *testing.T) {
|
|||||||
require.Equal(t, "1641092765000000006_948c1a7d_A", idField.At(3))
|
require.Equal(t, "1641092765000000006_948c1a7d_A", idField.At(3))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("logs-frame id and string-time fields should be created", func(t *testing.T) {
|
t.Run("naming inside metric fields should be correct", func(t *testing.T) {
|
||||||
field1 := data.NewField("", nil, make([]time.Time, 0))
|
field1 := data.NewField("", nil, make([]time.Time, 0))
|
||||||
field2 := data.NewField("", nil, make([]float64, 0))
|
field2 := data.NewField("", nil, make([]float64, 0))
|
||||||
field2.Labels = data.Labels{"app": "Application", "tag2": "tag2"}
|
field2.Labels = data.Labels{"app": "Application", "tag2": "tag2"}
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package loki
|
package loki
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
@ -109,35 +109,23 @@ func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery, stats []dat
|
|||||||
}
|
}
|
||||||
|
|
||||||
// we serialize the labels as an ordered list of pairs
|
// we serialize the labels as an ordered list of pairs
|
||||||
func labelsToString(labels data.Labels) (string, error) {
|
func labelsToRawJson(labels data.Labels) (json.RawMessage, error) {
|
||||||
keys := make([]string, 0, len(labels))
|
// data.Labels when converted to JSON keep the fields sorted
|
||||||
for k := range labels {
|
bytes, err := jsoniter.Marshal(labels)
|
||||||
keys = append(keys, k)
|
|
||||||
}
|
|
||||||
sort.Strings(keys)
|
|
||||||
|
|
||||||
labelArray := make([][2]string, 0, len(labels))
|
|
||||||
|
|
||||||
for _, k := range keys {
|
|
||||||
pair := [2]string{k, labels[k]}
|
|
||||||
labelArray = append(labelArray, pair)
|
|
||||||
}
|
|
||||||
|
|
||||||
bytes, err := jsoniter.Marshal(labelArray)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return string(bytes), nil
|
return json.RawMessage(bytes), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery, stats []data.QueryStat) (data.Frames, error) {
|
func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery, stats []data.QueryStat) (data.Frames, error) {
|
||||||
var timeVector []time.Time
|
var timeVector []time.Time
|
||||||
var values []string
|
var values []string
|
||||||
var labelsVector []string
|
var labelsVector []json.RawMessage
|
||||||
|
|
||||||
for _, v := range streams {
|
for _, v := range streams {
|
||||||
labelsText, err := labelsToString(v.Labels.Map())
|
labelsJson, err := labelsToRawJson(v.Labels.Map())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -145,17 +133,13 @@ func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery, stats []
|
|||||||
for _, k := range v.Entries {
|
for _, k := range v.Entries {
|
||||||
timeVector = append(timeVector, k.Timestamp.UTC())
|
timeVector = append(timeVector, k.Timestamp.UTC())
|
||||||
values = append(values, k.Line)
|
values = append(values, k.Line)
|
||||||
labelsVector = append(labelsVector, labelsText)
|
labelsVector = append(labelsVector, labelsJson)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, timeVector)
|
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, timeVector)
|
||||||
valueField := data.NewField("Line", nil, values)
|
valueField := data.NewField("Line", nil, values)
|
||||||
labelsField := data.NewField("labels", nil, labelsVector)
|
labelsField := data.NewField("labels", nil, labelsVector)
|
||||||
labelsField.Config = &data.FieldConfig{
|
|
||||||
// we should have a native json-field-type
|
|
||||||
Custom: map[string]interface{}{"json": true},
|
|
||||||
}
|
|
||||||
|
|
||||||
frame := data.NewFrame("", labelsField, timeField, valueField)
|
frame := data.NewFrame("", labelsField, timeField, valueField)
|
||||||
frame.SetMeta(&data.FrameMeta{
|
frame.SetMeta(&data.FrameMeta{
|
||||||
|
22
pkg/tsdb/loki/testdata/streams_simple.golden.txt
vendored
22
pkg/tsdb/loki/testdata/streams_simple.golden.txt
vendored
File diff suppressed because one or more lines are too long
@ -15,13 +15,13 @@ const inputFrame: DataFrame = {
|
|||||||
name: 'time',
|
name: 'time',
|
||||||
type: FieldType.time,
|
type: FieldType.time,
|
||||||
config: {},
|
config: {},
|
||||||
values: new ArrayVector([1645030244810, 1645030247027, 1645030246277, 1645030245539, 1645030244091]),
|
values: new ArrayVector([1645030244810, 1645030247027]),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: 'value',
|
name: 'value',
|
||||||
type: FieldType.string,
|
type: FieldType.string,
|
||||||
config: {},
|
config: {},
|
||||||
values: new ArrayVector(['line1', 'line2', 'line3', 'line4', 'line5']),
|
values: new ArrayVector(['line1', 'line2']),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: 'labels',
|
name: 'labels',
|
||||||
@ -31,31 +31,19 @@ const inputFrame: DataFrame = {
|
|||||||
json: true,
|
json: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
values: new ArrayVector([
|
values: new ArrayVector([`[["level", "info"],["code", "41🌙"]]`, `[["level", "error"],["code", "41🌙"]]`]),
|
||||||
`[["level", "info"],["code", "41🌙"]]`,
|
|
||||||
`[["level", "error"],["code", "41🌙"]]`,
|
|
||||||
`[["level", "error"],["code", "43🌙"]]`,
|
|
||||||
`[["level", "error"],["code", "41🌙"]]`,
|
|
||||||
`[["level", "info"],["code", "41🌙"]]`,
|
|
||||||
]),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: 'tsNs',
|
name: 'tsNs',
|
||||||
type: FieldType.time,
|
type: FieldType.string,
|
||||||
config: {},
|
config: {},
|
||||||
values: new ArrayVector([
|
values: new ArrayVector(['1645030244810757120', '1645030247027735040']),
|
||||||
'1645030244810757120',
|
|
||||||
'1645030247027735040',
|
|
||||||
'1645030246277587968',
|
|
||||||
'1645030245539423744',
|
|
||||||
'1645030244091700992',
|
|
||||||
]),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: 'id',
|
name: 'id',
|
||||||
type: FieldType.string,
|
type: FieldType.string,
|
||||||
config: {},
|
config: {},
|
||||||
values: new ArrayVector(['id1', 'id2', 'id3', 'id4', 'id5']),
|
values: new ArrayVector(['id1', 'id2']),
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
length: 5,
|
length: 5,
|
||||||
@ -74,14 +62,7 @@ describe('loki backendResultTransformer', () => {
|
|||||||
lokiQueryStatKey: 'Summary: total bytes processed',
|
lokiQueryStatKey: 'Summary: total bytes processed',
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
expectedFrame.fields[2].type = FieldType.other;
|
expectedFrame.fields[3].type = FieldType.time;
|
||||||
expectedFrame.fields[2].values = new ArrayVector([
|
|
||||||
{ level: 'info', code: '41🌙' },
|
|
||||||
{ level: 'error', code: '41🌙' },
|
|
||||||
{ level: 'error', code: '43🌙' },
|
|
||||||
{ level: 'error', code: '41🌙' },
|
|
||||||
{ level: 'info', code: '41🌙' },
|
|
||||||
]);
|
|
||||||
|
|
||||||
const expected: DataQueryResponse = { data: [expectedFrame] };
|
const expected: DataQueryResponse = { data: [expectedFrame] };
|
||||||
|
|
||||||
|
@ -1,12 +1,4 @@
|
|||||||
import {
|
import { DataQueryResponse, DataFrame, isDataFrame, FieldType, QueryResultMeta } from '@grafana/data';
|
||||||
DataQueryResponse,
|
|
||||||
DataFrame,
|
|
||||||
isDataFrame,
|
|
||||||
FieldType,
|
|
||||||
QueryResultMeta,
|
|
||||||
ArrayVector,
|
|
||||||
Labels,
|
|
||||||
} from '@grafana/data';
|
|
||||||
|
|
||||||
import { makeTableFrames } from './makeTableFrames';
|
import { makeTableFrames } from './makeTableFrames';
|
||||||
import { formatQuery, getHighlighterExpressionsFromQuery } from './query_utils';
|
import { formatQuery, getHighlighterExpressionsFromQuery } from './query_utils';
|
||||||
@ -27,12 +19,6 @@ function setFrameMeta(frame: DataFrame, meta: QueryResultMeta): DataFrame {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function decodeLabelsInJson(text: string): Labels {
|
|
||||||
const array: Array<[string, string]> = JSON.parse(text);
|
|
||||||
// NOTE: maybe we should go with maps, those have guaranteed ordering
|
|
||||||
return Object.fromEntries(array);
|
|
||||||
}
|
|
||||||
|
|
||||||
function processStreamFrame(frame: DataFrame, query: LokiQuery | undefined): DataFrame {
|
function processStreamFrame(frame: DataFrame, query: LokiQuery | undefined): DataFrame {
|
||||||
const meta: QueryResultMeta = {
|
const meta: QueryResultMeta = {
|
||||||
preferredVisualisationType: 'logs',
|
preferredVisualisationType: 'logs',
|
||||||
@ -46,19 +32,6 @@ function processStreamFrame(frame: DataFrame, query: LokiQuery | undefined): Dat
|
|||||||
|
|
||||||
const newFields = newFrame.fields.map((field) => {
|
const newFields = newFrame.fields.map((field) => {
|
||||||
switch (field.name) {
|
switch (field.name) {
|
||||||
case 'labels': {
|
|
||||||
// the labels, when coming from the server, are json-encoded.
|
|
||||||
// here we decode them if needed.
|
|
||||||
return field.config.custom.json
|
|
||||||
? {
|
|
||||||
name: field.name,
|
|
||||||
type: FieldType.other,
|
|
||||||
config: field.config,
|
|
||||||
// we are parsing the labels the same way as streaming-dataframes do
|
|
||||||
values: new ArrayVector(field.values.toArray().map((text) => decodeLabelsInJson(text))),
|
|
||||||
}
|
|
||||||
: field;
|
|
||||||
}
|
|
||||||
case 'tsNs': {
|
case 'tsNs': {
|
||||||
// we need to switch the field-type to be `time`
|
// we need to switch the field-type to be `time`
|
||||||
return {
|
return {
|
||||||
|
Loading…
Reference in New Issue
Block a user