loki: use single-dataframe format on the backend (#47069)

This commit is contained in:
Gábor Farkas
2022-04-12 11:58:48 +02:00
committed by GitHub
parent 201557c6fc
commit 68511e7712
10 changed files with 383 additions and 437 deletions

View File

@@ -2,6 +2,7 @@ package loki
import (
"fmt"
"hash/fnv"
"sort"
"strings"
"time"
@@ -11,14 +12,42 @@ import (
// we adjust the dataframes to be the way frontend & alerting
// wants them.
func adjustFrame(frame *data.Frame, query *lokiQuery) *data.Frame {
func adjustFrame(frame *data.Frame, query *lokiQuery) error {
fields := frame.Fields
if len(fields) < 2 {
return fmt.Errorf("missing fields in frame")
}
// metric-fields have "timefield, valuefield"
// logs-fields have "labelsfield, timefield, ..."
secondField := fields[1]
if secondField.Type() == data.FieldTypeFloat64 {
return adjustMetricFrame(frame, query)
} else {
return adjustLogsFrame(frame, query)
}
}
func adjustMetricFrame(frame *data.Frame, query *lokiQuery) error {
fields := frame.Fields
// we check if the fields are of correct type
if len(fields) != 2 {
return fmt.Errorf("invalid fields in metric frame")
}
timeField := fields[0]
valueField := fields[1]
if (timeField.Type() != data.FieldTypeTime) || (valueField.Type() != data.FieldTypeFloat64) {
return fmt.Errorf("invalid fields in metric frame")
}
labels := getFrameLabels(frame)
timeFields, nonTimeFields := partitionFields(frame)
isMetricFrame := nonTimeFields[0].Type() != data.FieldTypeString
isMetricRange := isMetricFrame && query.QueryType == QueryTypeRange
isMetricRange := query.QueryType == QueryTypeRange
name := formatName(labels, query)
frame.Name = name
@@ -33,48 +62,109 @@ func adjustFrame(frame *data.Frame, query *lokiQuery) *data.Frame {
frame.Meta.ExecutedQueryString = "Expr: " + query.Expr
}
for _, field := range timeFields {
field.Name = "time"
if isMetricRange {
if field.Config == nil {
field.Config = &data.FieldConfig{}
}
field.Config.Interval = float64(query.Step.Milliseconds())
if isMetricRange {
if timeField.Config == nil {
timeField.Config = &data.FieldConfig{}
}
timeField.Config.Interval = float64(query.Step.Milliseconds())
}
for _, field := range nonTimeFields {
field.Name = "value"
if field.Config == nil {
field.Config = &data.FieldConfig{}
}
field.Config.DisplayNameFromDS = name
if valueField.Config == nil {
valueField.Config = &data.FieldConfig{}
}
valueField.Config.DisplayNameFromDS = name
return nil
}
func adjustLogsFrame(frame *data.Frame, query *lokiQuery) error {
// we check if the fields are of correct type and length
fields := frame.Fields
if len(fields) != 3 {
return fmt.Errorf("invalid fields in logs frame")
}
// for streams-dataframes, we need to send to the browser the nanosecond-precision timestamp too.
labelsField := fields[0]
timeField := fields[1]
lineField := fields[2]
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeString) {
return fmt.Errorf("invalid fields in metric frame")
}
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) {
return fmt.Errorf("invalid fields in metric frame")
}
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.ExecutedQueryString = "Expr: " + query.Expr
// we need to send to the browser the nanosecond-precision timestamp too.
// usually timestamps become javascript-date-objects in the browser automatically, which only
// have millisecond-precision.
// so we send a separate timestamp-as-string field too.
if !isMetricFrame {
stringTimeField := makeStringTimeField(timeFields[0])
frame.Fields = append(frame.Fields, stringTimeField)
}
stringTimeField := makeStringTimeField(timeField)
return frame
idField, err := makeIdField(stringTimeField, lineField, labelsField, frame.RefID)
if err != nil {
return err
}
frame.Fields = append(frame.Fields, stringTimeField, idField)
return nil
}
func makeStringTimeField(field *data.Field) *data.Field {
length := field.Len()
func makeStringTimeField(timeField *data.Field) *data.Field {
length := timeField.Len()
stringTimestamps := make([]string, length)
for i := 0; i < length; i++ {
if v, ok := field.ConcreteAt(i); ok {
nsNumber := v.(time.Time).UnixNano()
stringTimestamps[i] = fmt.Sprintf("%d", nsNumber)
}
nsNumber := timeField.At(i).(time.Time).UnixNano()
stringTimestamps[i] = fmt.Sprintf("%d", nsNumber)
}
return data.NewField("tsNs", field.Labels.Copy(), stringTimestamps)
return data.NewField("tsNs", timeField.Labels.Copy(), stringTimestamps)
}
func calculateCheckSum(time string, line string, labels string) (string, error) {
input := []byte(line + "_" + labels)
hash := fnv.New32()
_, err := hash.Write(input)
if err != nil {
return "", err
}
return fmt.Sprintf("%s_%x", time, hash.Sum32()), nil
}
func makeIdField(stringTimeField *data.Field, lineField *data.Field, labelsField *data.Field, refId string) (*data.Field, error) {
length := stringTimeField.Len()
ids := make([]string, length)
checksums := make(map[string]int)
for i := 0; i < length; i++ {
time := stringTimeField.At(i).(string)
line := lineField.At(i).(string)
labels := labelsField.At(i).(string)
sum, err := calculateCheckSum(time, line, labels)
if err != nil {
return nil, err
}
sumCount := checksums[sum]
idSuffix := ""
if sumCount > 0 {
// we had this checksum already, we need to do something to make it unique
idSuffix = fmt.Sprintf("_%d", sumCount)
}
checksums[sum] = sumCount + 1
ids[i] = sum + idSuffix + "_" + refId
}
return data.NewField("id", nil, ids), nil
}
func formatNamePrometheusStyle(labels map[string]string) string {
@@ -119,18 +209,3 @@ func getFrameLabels(frame *data.Frame) map[string]string {
return labels
}
func partitionFields(frame *data.Frame) ([]*data.Field, []*data.Field) {
var timeFields []*data.Field
var nonTimeFields []*data.Field
for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime {
timeFields = append(timeFields, field)
} else {
nonTimeFields = append(nonTimeFields, field)
}
}
return timeFields, nonTimeFields
}

View File

@@ -37,7 +37,56 @@ func TestFormatName(t *testing.T) {
}
func TestAdjustFrame(t *testing.T) {
t.Run("response should be parsed normally", func(t *testing.T) {
t.Run("logs-frame metadata should be set correctly", func(t *testing.T) {
frame := data.NewFrame("",
data.NewField("labels", nil, []string{
`{"level":"info"}`,
`{"level":"error"}`,
`{"level":"error"}`,
`{"level":"info"}`,
}),
data.NewField("time", nil, []time.Time{
time.Date(2022, 1, 2, 3, 4, 5, 6, time.UTC),
time.Date(2022, 1, 2, 3, 5, 5, 6, time.UTC),
time.Date(2022, 1, 2, 3, 5, 5, 6, time.UTC),
time.Date(2022, 1, 2, 3, 6, 5, 6, time.UTC),
}),
data.NewField("line", nil, []string{"line1", "line2", "line2", "line3"}),
)
frame.RefID = "A"
query := &lokiQuery{
Expr: `{type="important"}`,
QueryType: QueryTypeRange,
}
err := adjustFrame(frame, query)
require.NoError(t, err)
fields := frame.Fields
require.Equal(t, 5, len(fields))
tsNsField := fields[3]
require.Equal(t, "tsNs", tsNsField.Name)
require.Equal(t, data.FieldTypeString, tsNsField.Type())
require.Equal(t, 4, tsNsField.Len())
require.Equal(t, "1641092645000000006", tsNsField.At(0))
require.Equal(t, "1641092705000000006", tsNsField.At(1))
require.Equal(t, "1641092705000000006", tsNsField.At(2))
require.Equal(t, "1641092765000000006", tsNsField.At(3))
idField := fields[4]
require.Equal(t, "id", idField.Name)
require.Equal(t, data.FieldTypeString, idField.Type())
require.Equal(t, 4, idField.Len())
require.Equal(t, "1641092645000000006_a36f4e1b_A", idField.At(0))
require.Equal(t, "1641092705000000006_1d77c9ca_A", idField.At(1))
require.Equal(t, "1641092705000000006_1d77c9ca_1_A", idField.At(2))
require.Equal(t, "1641092765000000006_948c1a7d_A", idField.At(3))
})
t.Run("logs-frame id and string-time fields should be created", func(t *testing.T) {
field1 := data.NewField("", nil, make([]time.Time, 0))
field2 := data.NewField("", nil, make([]float64, 0))
field2.Labels = data.Labels{"app": "Application", "tag2": "tag2"}
@@ -52,7 +101,8 @@ func TestAdjustFrame(t *testing.T) {
Step: time.Second * 42,
}
adjustFrame(frame, query)
err := adjustFrame(frame, query)
require.NoError(t, err)
require.Equal(t, frame.Name, "legend Application")
require.Equal(t, frame.Meta.ExecutedQueryString, "Expr: up(ALERTS)\nStep: 42s")
@@ -72,7 +122,8 @@ func TestAdjustFrame(t *testing.T) {
frame := data.NewFrame("test", field1, field2)
frame.SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMany})
adjustFrame(frame, query)
err := adjustFrame(frame, query)
require.NoError(t, err)
// to keep the test simple, we assume the
// first field is the time-field

View File

@@ -2,11 +2,13 @@ package loki
import (
"fmt"
"sort"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel/stats"
jsoniter "github.com/json-iterator/go"
)
func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames, error) {
@@ -17,7 +19,10 @@ func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames,
}
for _, frame := range frames {
adjustFrame(frame, query)
err = adjustFrame(frame, query)
if err != nil {
return nil, err
}
}
return frames, nil
@@ -31,7 +36,7 @@ func lokiResponseToDataFrames(value *loghttp.QueryResponse, query *lokiQuery) (d
case loghttp.Vector:
return lokiVectorToDataFrames(res, query, stats), nil
case loghttp.Streams:
return lokiStreamsToDataFrames(res, query, stats), nil
return lokiStreamsToDataFrames(res, query, stats)
default:
return nil, fmt.Errorf("resultType %T not supported{", res)
}
@@ -54,8 +59,8 @@ func lokiMatrixToDataFrames(matrix loghttp.Matrix, query *lokiQuery, stats []dat
values = append(values, float64(k.Value))
}
timeField := data.NewField("", nil, timeVector)
valueField := data.NewField("", tags, values)
timeField := data.NewField("time", nil, timeVector)
valueField := data.NewField("value", tags, values)
frame := data.NewFrame("", timeField, valueField)
frame.SetMeta(&data.FrameMeta{
@@ -79,8 +84,8 @@ func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery, stats []dat
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
timeField := data.NewField("", nil, timeVector)
valueField := data.NewField("", tags, values)
timeField := data.NewField("time", nil, timeVector)
valueField := data.NewField("value", tags, values)
frame := data.NewFrame("", timeField, valueField)
frame.SetMeta(&data.FrameMeta{
@@ -93,35 +98,61 @@ func lokiVectorToDataFrames(vector loghttp.Vector, query *lokiQuery, stats []dat
return frames
}
func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery, stats []data.QueryStat) data.Frames {
frames := data.Frames{}
// we serialize the labels as an ordered list of pairs
func labelsToString(labels data.Labels) (string, error) {
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
labelArray := make([][2]string, 0, len(labels))
for _, k := range keys {
pair := [2]string{k, labels[k]}
labelArray = append(labelArray, pair)
}
bytes, err := jsoniter.Marshal(labelArray)
if err != nil {
return "", err
}
return string(bytes), nil
}
func lokiStreamsToDataFrames(streams loghttp.Streams, query *lokiQuery, stats []data.QueryStat) (data.Frames, error) {
var timeVector []time.Time
var values []string
var labelsVector []string
for _, v := range streams {
tags := make(map[string]string, len(v.Labels))
timeVector := make([]time.Time, 0, len(v.Entries))
values := make([]string, 0, len(v.Entries))
for k, v := range v.Labels {
tags[k] = v
labelsText, err := labelsToString(v.Labels.Map())
if err != nil {
return nil, err
}
for _, k := range v.Entries {
timeVector = append(timeVector, k.Timestamp.UTC())
values = append(values, k.Line)
labelsVector = append(labelsVector, labelsText)
}
timeField := data.NewField("", nil, timeVector)
valueField := data.NewField("", tags, values)
frame := data.NewFrame("", timeField, valueField)
frame.SetMeta(&data.FrameMeta{
Stats: stats,
})
frames = append(frames, frame)
}
return frames
timeField := data.NewField("ts", nil, timeVector)
valueField := data.NewField("line", nil, values)
labelsField := data.NewField("labels", nil, labelsVector)
labelsField.Config = &data.FieldConfig{
// we should have a native json-field-type
Custom: map[string]interface{}{"json": true},
}
frame := data.NewFrame("", labelsField, timeField, valueField)
frame.SetMeta(&data.FrameMeta{
Stats: stats,
})
return data.Frames{frame}, nil
}
func parseStats(result stats.Result) []data.QueryStat {

File diff suppressed because one or more lines are too long

View File

@@ -5,8 +5,8 @@
"result": [
{
"stream": {
"level": "error",
"location": "moon"
"code": "one\",",
"location": "moon🌙"
},
"values": [
[
@@ -17,8 +17,8 @@
},
{
"stream": {
"level": "info",
"location": "moon"
"code": "\",two",
"location": "moon🌙"
},
"values": [
[
@@ -29,6 +29,10 @@
"1645030246277587968",
"log line info 2"
],
[
"1645030246277587968",
"log line info 2"
],
[
"1645030245539423744",
"log line info 3"