mirror of
https://github.com/grafana/grafana.git
synced 2024-11-28 11:44:26 -06:00
767 lines
18 KiB
Go
767 lines
18 KiB
Go
package converter
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
"github.com/grafana/grafana-plugin-sdk-go/experimental"
|
|
jsoniter "github.com/json-iterator/go"
|
|
)
|
|
|
|
// helpful while debugging all the options that may appear
|
|
func logf(format string, a ...interface{}) {
|
|
//fmt.Printf(format, a...)
|
|
}
|
|
|
|
type Options struct {
|
|
MatrixWideSeries bool
|
|
VectorWideSeries bool
|
|
}
|
|
|
|
// ReadPrometheusStyleResult will read results from a prometheus or loki server and return data frames
|
|
func ReadPrometheusStyleResult(iter *jsoniter.Iterator, opt Options) *backend.DataResponse {
|
|
var rsp *backend.DataResponse
|
|
status := "unknown"
|
|
errorType := ""
|
|
err := ""
|
|
warnings := []data.Notice{}
|
|
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "status":
|
|
status = iter.ReadString()
|
|
|
|
case "data":
|
|
rsp = readPrometheusData(iter, opt)
|
|
|
|
case "error":
|
|
err = iter.ReadString()
|
|
|
|
case "errorType":
|
|
errorType = iter.ReadString()
|
|
|
|
case "warnings":
|
|
warnings = readWarnings(iter)
|
|
|
|
default:
|
|
v := iter.Read()
|
|
logf("[ROOT] TODO, support key: %s / %v\n", l1Field, v)
|
|
}
|
|
}
|
|
|
|
if status == "error" {
|
|
return &backend.DataResponse{
|
|
Error: fmt.Errorf("%s: %s", errorType, err),
|
|
}
|
|
}
|
|
|
|
if len(warnings) > 0 {
|
|
for _, frame := range rsp.Frames {
|
|
if frame.Meta == nil {
|
|
frame.Meta = &data.FrameMeta{}
|
|
}
|
|
frame.Meta.Notices = warnings
|
|
}
|
|
}
|
|
|
|
return rsp
|
|
}
|
|
|
|
func readWarnings(iter *jsoniter.Iterator) []data.Notice {
|
|
warnings := []data.Notice{}
|
|
if iter.WhatIsNext() != jsoniter.ArrayValue {
|
|
return warnings
|
|
}
|
|
|
|
for iter.ReadArray() {
|
|
if iter.WhatIsNext() == jsoniter.StringValue {
|
|
notice := data.Notice{
|
|
Severity: data.NoticeSeverityWarning,
|
|
Text: iter.ReadString(),
|
|
}
|
|
warnings = append(warnings, notice)
|
|
}
|
|
}
|
|
|
|
return warnings
|
|
}
|
|
|
|
func readPrometheusData(iter *jsoniter.Iterator, opt Options) *backend.DataResponse {
|
|
t := iter.WhatIsNext()
|
|
if t == jsoniter.ArrayValue {
|
|
return readArrayData(iter)
|
|
}
|
|
|
|
if t != jsoniter.ObjectValue {
|
|
return &backend.DataResponse{
|
|
Error: fmt.Errorf("expected object type"),
|
|
}
|
|
}
|
|
|
|
resultType := ""
|
|
var rsp *backend.DataResponse
|
|
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "resultType":
|
|
resultType = iter.ReadString()
|
|
|
|
case "result":
|
|
switch resultType {
|
|
case "matrix":
|
|
if opt.MatrixWideSeries {
|
|
rsp = readMatrixOrVectorWide(iter, resultType)
|
|
} else {
|
|
rsp = readMatrixOrVectorMulti(iter, resultType)
|
|
}
|
|
case "vector":
|
|
if opt.VectorWideSeries {
|
|
rsp = readMatrixOrVectorWide(iter, resultType)
|
|
} else {
|
|
rsp = readMatrixOrVectorMulti(iter, resultType)
|
|
}
|
|
case "streams":
|
|
rsp = readStream(iter)
|
|
case "string":
|
|
rsp = readString(iter)
|
|
case "scalar":
|
|
rsp = readScalar(iter)
|
|
default:
|
|
iter.Skip()
|
|
rsp = &backend.DataResponse{
|
|
Error: fmt.Errorf("unknown result type: %s", resultType),
|
|
}
|
|
}
|
|
|
|
case "stats":
|
|
v := iter.Read()
|
|
if len(rsp.Frames) > 0 {
|
|
meta := rsp.Frames[0].Meta
|
|
if meta == nil {
|
|
meta = &data.FrameMeta{}
|
|
rsp.Frames[0].Meta = meta
|
|
}
|
|
meta.Custom = map[string]interface{}{
|
|
"stats": v,
|
|
}
|
|
}
|
|
|
|
default:
|
|
v := iter.Read()
|
|
logf("[data] TODO, support key: %s / %v\n", l1Field, v)
|
|
}
|
|
}
|
|
|
|
return rsp
|
|
}
|
|
|
|
// will return strings or exemplars
|
|
func readArrayData(iter *jsoniter.Iterator) *backend.DataResponse {
|
|
lookup := make(map[string]*data.Field)
|
|
|
|
var labelFrame *data.Frame
|
|
rsp := &backend.DataResponse{}
|
|
stringField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
|
stringField.Name = "Value"
|
|
for iter.ReadArray() {
|
|
switch iter.WhatIsNext() {
|
|
case jsoniter.StringValue:
|
|
stringField.Append(iter.ReadString())
|
|
|
|
// Either label or exemplars
|
|
case jsoniter.ObjectValue:
|
|
exemplar, labelPairs := readLabelsOrExemplars(iter)
|
|
if exemplar != nil {
|
|
rsp.Frames = append(rsp.Frames, exemplar)
|
|
} else if labelPairs != nil {
|
|
max := 0
|
|
for _, pair := range labelPairs {
|
|
k := pair[0]
|
|
v := pair[1]
|
|
f, ok := lookup[k]
|
|
if !ok {
|
|
f = data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
|
f.Name = k
|
|
lookup[k] = f
|
|
|
|
if labelFrame == nil {
|
|
labelFrame = data.NewFrame("")
|
|
rsp.Frames = append(rsp.Frames, labelFrame)
|
|
}
|
|
labelFrame.Fields = append(labelFrame.Fields, f)
|
|
}
|
|
f.Append(fmt.Sprintf("%v", v))
|
|
if f.Len() > max {
|
|
max = f.Len()
|
|
}
|
|
}
|
|
|
|
// Make sure all fields have equal length
|
|
for _, f := range lookup {
|
|
diff := max - f.Len()
|
|
if diff > 0 {
|
|
f.Extend(diff)
|
|
}
|
|
}
|
|
}
|
|
|
|
default:
|
|
{
|
|
ext := iter.ReadAny()
|
|
v := fmt.Sprintf("%v", ext)
|
|
stringField.Append(v)
|
|
}
|
|
}
|
|
}
|
|
|
|
if stringField.Len() > 0 {
|
|
rsp.Frames = append(rsp.Frames, data.NewFrame("", stringField))
|
|
}
|
|
|
|
return rsp
|
|
}
|
|
|
|
// For consistent ordering read values to an array not a map
|
|
func readLabelsAsPairs(iter *jsoniter.Iterator) [][2]string {
|
|
pairs := make([][2]string, 0, 10)
|
|
for k := iter.ReadObject(); k != ""; k = iter.ReadObject() {
|
|
pairs = append(pairs, [2]string{k, iter.ReadString()})
|
|
}
|
|
return pairs
|
|
}
|
|
|
|
func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) {
|
|
pairs := make([][2]string, 0, 10)
|
|
labels := data.Labels{}
|
|
var frame *data.Frame
|
|
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "seriesLabels":
|
|
iter.ReadVal(&labels)
|
|
case "exemplars":
|
|
lookup := make(map[string]*data.Field)
|
|
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
|
|
timeField.Name = data.TimeSeriesTimeFieldName
|
|
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
|
|
valueField.Name = data.TimeSeriesValueFieldName
|
|
valueField.Labels = labels
|
|
frame = data.NewFrame("", timeField, valueField)
|
|
frame.Meta = &data.FrameMeta{
|
|
Custom: resultTypeToCustomMeta("exemplar"),
|
|
}
|
|
for iter.ReadArray() {
|
|
for l2Field := iter.ReadObject(); l2Field != ""; l2Field = iter.ReadObject() {
|
|
switch l2Field {
|
|
// nolint:goconst
|
|
case "value":
|
|
v, _ := strconv.ParseFloat(iter.ReadString(), 64)
|
|
valueField.Append(v)
|
|
|
|
case "timestamp":
|
|
ts := timeFromFloat(iter.ReadFloat64())
|
|
timeField.Append(ts)
|
|
|
|
case "labels":
|
|
max := 0
|
|
for _, pair := range readLabelsAsPairs(iter) {
|
|
k := pair[0]
|
|
v := pair[1]
|
|
f, ok := lookup[k]
|
|
if !ok {
|
|
f = data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
|
f.Name = k
|
|
lookup[k] = f
|
|
frame.Fields = append(frame.Fields, f)
|
|
}
|
|
f.Append(v)
|
|
if f.Len() > max {
|
|
max = f.Len()
|
|
}
|
|
}
|
|
|
|
// Make sure all fields have equal length
|
|
for _, f := range lookup {
|
|
diff := max - f.Len()
|
|
if diff > 0 {
|
|
f.Extend(diff)
|
|
}
|
|
}
|
|
|
|
default:
|
|
iter.Skip()
|
|
frame.AppendNotices(data.Notice{
|
|
Severity: data.NoticeSeverityError,
|
|
Text: fmt.Sprintf("unable to parse key: %s in response body", l2Field),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
default:
|
|
v := fmt.Sprintf("%v", iter.Read())
|
|
pairs = append(pairs, [2]string{l1Field, v})
|
|
}
|
|
}
|
|
|
|
return frame, pairs
|
|
}
|
|
|
|
func readString(iter *jsoniter.Iterator) *backend.DataResponse {
|
|
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
|
|
timeField.Name = data.TimeSeriesTimeFieldName
|
|
valueField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
|
valueField.Name = data.TimeSeriesValueFieldName
|
|
valueField.Labels = data.Labels{}
|
|
|
|
iter.ReadArray()
|
|
t := iter.ReadFloat64()
|
|
iter.ReadArray()
|
|
v := iter.ReadString()
|
|
iter.ReadArray()
|
|
|
|
tt := timeFromFloat(t)
|
|
timeField.Append(tt)
|
|
valueField.Append(v)
|
|
|
|
frame := data.NewFrame("", timeField, valueField)
|
|
frame.Meta = &data.FrameMeta{
|
|
Type: data.FrameTypeTimeSeriesMulti,
|
|
Custom: resultTypeToCustomMeta("string"),
|
|
}
|
|
|
|
return &backend.DataResponse{
|
|
Frames: []*data.Frame{frame},
|
|
}
|
|
}
|
|
|
|
func readScalar(iter *jsoniter.Iterator) *backend.DataResponse {
|
|
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
|
|
timeField.Name = data.TimeSeriesTimeFieldName
|
|
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
|
|
valueField.Name = data.TimeSeriesValueFieldName
|
|
valueField.Labels = data.Labels{}
|
|
|
|
t, v, err := readTimeValuePair(iter)
|
|
if err == nil {
|
|
timeField.Append(t)
|
|
valueField.Append(v)
|
|
}
|
|
|
|
frame := data.NewFrame("", timeField, valueField)
|
|
frame.Meta = &data.FrameMeta{
|
|
Type: data.FrameTypeTimeSeriesMulti,
|
|
Custom: resultTypeToCustomMeta("scalar"),
|
|
}
|
|
|
|
return &backend.DataResponse{
|
|
Frames: []*data.Frame{frame},
|
|
}
|
|
}
|
|
|
|
func readMatrixOrVectorWide(iter *jsoniter.Iterator, resultType string) *backend.DataResponse {
|
|
rowIdx := 0
|
|
timeMap := map[int64]int{}
|
|
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
|
|
timeField.Name = data.TimeSeriesTimeFieldName
|
|
frame := data.NewFrame("", timeField)
|
|
frame.Meta = &data.FrameMeta{
|
|
Type: data.FrameTypeTimeSeriesWide,
|
|
Custom: resultTypeToCustomMeta(resultType),
|
|
}
|
|
rsp := &backend.DataResponse{
|
|
Frames: []*data.Frame{},
|
|
}
|
|
|
|
for iter.ReadArray() {
|
|
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, frame.Rows())
|
|
valueField.Name = data.TimeSeriesValueFieldName
|
|
valueField.Labels = data.Labels{}
|
|
frame.Fields = append(frame.Fields, valueField)
|
|
|
|
var histogram *histogramInfo
|
|
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "metric":
|
|
iter.ReadVal(&valueField.Labels)
|
|
|
|
case "value":
|
|
timeMap, rowIdx = addValuePairToFrame(frame, timeMap, rowIdx, iter)
|
|
|
|
// nolint:goconst
|
|
case "values":
|
|
for iter.ReadArray() {
|
|
timeMap, rowIdx = addValuePairToFrame(frame, timeMap, rowIdx, iter)
|
|
}
|
|
|
|
case "histogram":
|
|
if histogram == nil {
|
|
histogram = newHistogramInfo()
|
|
}
|
|
err := readHistogram(iter, histogram)
|
|
if err != nil {
|
|
rsp.Error = err
|
|
}
|
|
|
|
case "histograms":
|
|
if histogram == nil {
|
|
histogram = newHistogramInfo()
|
|
}
|
|
for iter.ReadArray() {
|
|
err := readHistogram(iter, histogram)
|
|
if err != nil {
|
|
rsp.Error = err
|
|
}
|
|
}
|
|
|
|
default:
|
|
iter.Skip()
|
|
logf("readMatrixOrVector: %s\n", l1Field)
|
|
}
|
|
}
|
|
|
|
if histogram != nil {
|
|
histogram.yMin.Labels = valueField.Labels
|
|
frame := data.NewFrame(valueField.Name, histogram.time, histogram.yMin, histogram.yMax, histogram.count, histogram.yLayout)
|
|
frame.Meta = &data.FrameMeta{
|
|
Type: "heatmap-cells",
|
|
}
|
|
if frame.Name == data.TimeSeriesValueFieldName {
|
|
frame.Name = "" // only set the name if useful
|
|
}
|
|
rsp.Frames = append(rsp.Frames, frame)
|
|
}
|
|
}
|
|
|
|
if len(rsp.Frames) == 0 {
|
|
sorter := experimental.NewFrameSorter(frame, frame.Fields[0])
|
|
sort.Sort(sorter)
|
|
rsp.Frames = append(rsp.Frames, frame)
|
|
}
|
|
|
|
return rsp
|
|
}
|
|
|
|
func addValuePairToFrame(frame *data.Frame, timeMap map[int64]int, rowIdx int, iter *jsoniter.Iterator) (map[int64]int, int) {
|
|
timeField := frame.Fields[0]
|
|
valueField := frame.Fields[len(frame.Fields)-1]
|
|
|
|
t, v, err := readTimeValuePair(iter)
|
|
if err != nil {
|
|
return timeMap, rowIdx
|
|
}
|
|
|
|
ns := t.UnixNano()
|
|
i, ok := timeMap[ns]
|
|
if !ok {
|
|
timeMap[ns] = rowIdx
|
|
i = rowIdx
|
|
expandFrame(frame, i)
|
|
rowIdx++
|
|
}
|
|
|
|
timeField.Set(i, t)
|
|
valueField.Set(i, &v)
|
|
|
|
return timeMap, rowIdx
|
|
}
|
|
|
|
func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string) *backend.DataResponse {
|
|
rsp := &backend.DataResponse{}
|
|
|
|
for iter.ReadArray() {
|
|
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
|
|
timeField.Name = data.TimeSeriesTimeFieldName
|
|
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
|
|
valueField.Name = data.TimeSeriesValueFieldName
|
|
valueField.Labels = data.Labels{}
|
|
|
|
var histogram *histogramInfo
|
|
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "metric":
|
|
iter.ReadVal(&valueField.Labels)
|
|
|
|
case "value":
|
|
t, v, err := readTimeValuePair(iter)
|
|
if err == nil {
|
|
timeField.Append(t)
|
|
valueField.Append(v)
|
|
}
|
|
|
|
// nolint:goconst
|
|
case "values":
|
|
for iter.ReadArray() {
|
|
t, v, err := readTimeValuePair(iter)
|
|
if err == nil {
|
|
timeField.Append(t)
|
|
valueField.Append(v)
|
|
}
|
|
}
|
|
|
|
case "histogram":
|
|
if histogram == nil {
|
|
histogram = newHistogramInfo()
|
|
}
|
|
err := readHistogram(iter, histogram)
|
|
if err != nil {
|
|
rsp.Error = err
|
|
}
|
|
|
|
case "histograms":
|
|
if histogram == nil {
|
|
histogram = newHistogramInfo()
|
|
}
|
|
for iter.ReadArray() {
|
|
err := readHistogram(iter, histogram)
|
|
if err != nil {
|
|
rsp.Error = err
|
|
}
|
|
}
|
|
|
|
default:
|
|
iter.Skip()
|
|
logf("readMatrixOrVector: %s\n", l1Field)
|
|
}
|
|
}
|
|
|
|
if histogram != nil {
|
|
histogram.yMin.Labels = valueField.Labels
|
|
frame := data.NewFrame(valueField.Name, histogram.time, histogram.yMin, histogram.yMax, histogram.count, histogram.yLayout)
|
|
frame.Meta = &data.FrameMeta{
|
|
Type: "heatmap-cells",
|
|
}
|
|
if frame.Name == data.TimeSeriesValueFieldName {
|
|
frame.Name = "" // only set the name if useful
|
|
}
|
|
rsp.Frames = append(rsp.Frames, frame)
|
|
} else {
|
|
frame := data.NewFrame("", timeField, valueField)
|
|
frame.Meta = &data.FrameMeta{
|
|
Type: data.FrameTypeTimeSeriesMulti,
|
|
Custom: resultTypeToCustomMeta(resultType),
|
|
}
|
|
rsp.Frames = append(rsp.Frames, frame)
|
|
}
|
|
}
|
|
|
|
return rsp
|
|
}
|
|
|
|
func readTimeValuePair(iter *jsoniter.Iterator) (time.Time, float64, error) {
|
|
iter.ReadArray()
|
|
t := iter.ReadFloat64()
|
|
iter.ReadArray()
|
|
v := iter.ReadString()
|
|
iter.ReadArray()
|
|
|
|
tt := timeFromFloat(t)
|
|
fv, err := strconv.ParseFloat(v, 64)
|
|
return tt, fv, err
|
|
}
|
|
|
|
func expandFrame(frame *data.Frame, idx int) {
|
|
for _, f := range frame.Fields {
|
|
if idx+1 > f.Len() {
|
|
f.Extend(idx + 1 - f.Len())
|
|
}
|
|
}
|
|
}
|
|
|
|
type histogramInfo struct {
|
|
//XMax (time) YMin Ymax Count YLayout
|
|
time *data.Field
|
|
yMin *data.Field // will have labels?
|
|
yMax *data.Field
|
|
count *data.Field
|
|
yLayout *data.Field
|
|
}
|
|
|
|
func newHistogramInfo() *histogramInfo {
|
|
hist := &histogramInfo{
|
|
time: data.NewFieldFromFieldType(data.FieldTypeTime, 0),
|
|
yMin: data.NewFieldFromFieldType(data.FieldTypeFloat64, 0),
|
|
yMax: data.NewFieldFromFieldType(data.FieldTypeFloat64, 0),
|
|
count: data.NewFieldFromFieldType(data.FieldTypeFloat64, 0),
|
|
yLayout: data.NewFieldFromFieldType(data.FieldTypeInt8, 0),
|
|
}
|
|
hist.time.Name = "xMax"
|
|
hist.yMin.Name = "yMin"
|
|
hist.yMax.Name = "yMax"
|
|
hist.count.Name = "count"
|
|
hist.yLayout.Name = "yLayout"
|
|
return hist
|
|
}
|
|
|
|
// This will read a single sparse histogram
|
|
// [ time, { count, sum, buckets: [...] }]
|
|
func readHistogram(iter *jsoniter.Iterator, hist *histogramInfo) error {
|
|
// first element
|
|
iter.ReadArray()
|
|
t := timeFromFloat(iter.ReadFloat64())
|
|
|
|
var err error
|
|
|
|
// next object element
|
|
iter.ReadArray()
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "count":
|
|
iter.Skip()
|
|
case "sum":
|
|
iter.Skip()
|
|
|
|
case "buckets":
|
|
for iter.ReadArray() {
|
|
hist.time.Append(t)
|
|
|
|
iter.ReadArray()
|
|
hist.yLayout.Append(iter.ReadInt8())
|
|
|
|
iter.ReadArray()
|
|
err = appendValueFromString(iter, hist.yMin)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
iter.ReadArray()
|
|
err = appendValueFromString(iter, hist.yMax)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
iter.ReadArray()
|
|
err = appendValueFromString(iter, hist.count)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if iter.ReadArray() {
|
|
return fmt.Errorf("expected close array")
|
|
}
|
|
}
|
|
|
|
default:
|
|
iter.Skip()
|
|
logf("[SKIP]readHistogram: %s\n", l1Field)
|
|
}
|
|
}
|
|
|
|
if iter.ReadArray() {
|
|
return fmt.Errorf("expected to be done")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func appendValueFromString(iter *jsoniter.Iterator, field *data.Field) error {
|
|
v, err := strconv.ParseFloat(iter.ReadString(), 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
field.Append(v)
|
|
return nil
|
|
}
|
|
|
|
func readStream(iter *jsoniter.Iterator) *backend.DataResponse {
|
|
rsp := &backend.DataResponse{}
|
|
|
|
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
|
|
labelsField.Name = "__labels" // avoid automatically spreading this by labels
|
|
|
|
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
|
|
timeField.Name = "Time"
|
|
|
|
lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
|
lineField.Name = "Line"
|
|
|
|
// Nanoseconds time field
|
|
tsField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
|
tsField.Name = "TS"
|
|
|
|
labels := data.Labels{}
|
|
labelJson, err := labelsToRawJson(labels)
|
|
if err != nil {
|
|
return &backend.DataResponse{Error: err}
|
|
}
|
|
|
|
for iter.ReadArray() {
|
|
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
|
|
switch l1Field {
|
|
case "stream":
|
|
// we need to clear `labels`, because `iter.ReadVal`
|
|
// only appends to it
|
|
labels := data.Labels{}
|
|
iter.ReadVal(&labels)
|
|
labelJson, err = labelsToRawJson(labels)
|
|
if err != nil {
|
|
return &backend.DataResponse{Error: err}
|
|
}
|
|
|
|
case "values":
|
|
for iter.ReadArray() {
|
|
iter.ReadArray()
|
|
ts := iter.ReadString()
|
|
iter.ReadArray()
|
|
line := iter.ReadString()
|
|
iter.ReadArray()
|
|
|
|
t := timeFromLokiString(ts)
|
|
|
|
labelsField.Append(labelJson)
|
|
timeField.Append(t)
|
|
lineField.Append(line)
|
|
tsField.Append(ts)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
frame := data.NewFrame("", labelsField, timeField, lineField, tsField)
|
|
frame.Meta = &data.FrameMeta{}
|
|
rsp.Frames = append(rsp.Frames, frame)
|
|
|
|
return rsp
|
|
}
|
|
|
|
func resultTypeToCustomMeta(resultType string) map[string]string {
|
|
return map[string]string{"resultType": resultType}
|
|
}
|
|
|
|
func timeFromFloat(fv float64) time.Time {
|
|
return time.UnixMilli(int64(fv * 1000.0)).UTC()
|
|
}
|
|
|
|
func timeFromLokiString(str string) time.Time {
|
|
// normal time values look like: 1645030246277587968
|
|
// and are less than: math.MaxInt65=9223372036854775807
|
|
// This will do a fast path for any date before 2033
|
|
s := len(str)
|
|
if s < 19 || (s == 19 && str[0] == '1') {
|
|
ns, err := strconv.ParseInt(str, 10, 64)
|
|
if err == nil {
|
|
return time.Unix(0, ns).UTC()
|
|
}
|
|
}
|
|
|
|
ss, _ := strconv.ParseInt(str[0:10], 10, 64)
|
|
ns, _ := strconv.ParseInt(str[10:], 10, 64)
|
|
return time.Unix(ss, ns).UTC()
|
|
}
|
|
|
|
func labelsToRawJson(labels data.Labels) (json.RawMessage, error) {
|
|
// data.Labels when converted to JSON keep the fields sorted
|
|
bytes, err := jsoniter.Marshal(labels)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return json.RawMessage(bytes), nil
|
|
}
|