InfluxDB: new row value types (#44789)

* Check type and act accordingly

* Add string type

* Add bool type

* Change method name in test

* Remove comment

* Changed test var names to represent the float case

* Added string test case

* Added bool test case

* Update response in test

* Change string val

* Fix frame meta missing in tests

* Fixed test parse query

* Fixed out of bounds test

* parseFloatSeries

* parseStringSeries and parseBoolSeries

* Formatting

* Use multi frames for now

* strings.ToLower for time col check

* Move timeField out of if checks
This commit is contained in:
Joey Tawadrous 2022-02-18 17:37:45 +00:00 committed by GitHub
parent c04b0179ff
commit 955b921c88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 40 deletions

View File

@ -59,7 +59,7 @@ func transformRows(rows []Row, query Query) data.Frames {
var hasTimeCol = false
for _, column := range row.Columns {
if column == "time" {
if strings.ToLower(column) == "time" {
hasTimeCol = true
}
}
@ -82,32 +82,50 @@ func transformRows(rows []Row, query Query) data.Frames {
field := data.NewField("value", nil, values)
frames = append(frames, data.NewFrame(row.Name, field))
} else {
for columnIndex, column := range row.Columns {
for colIndex, column := range row.Columns {
if column == "time" {
continue
}
var timeArray []time.Time
var valueArray []*float64
var floatArray []*float64
var stringArray []string
var boolArray []bool
valType := typeof(row.Values[0][colIndex])
name := formatFrameName(row, column, query)
for _, valuePair := range row.Values {
timestamp, timestampErr := parseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr == nil {
value := parseValue(valuePair[columnIndex])
timeArray = append(timeArray, timestamp)
valueArray = append(valueArray, value)
if valType == "string" {
value := valuePair[colIndex].(string)
stringArray = append(stringArray, value)
} else if valType == "json.Number" {
value := parseNumber(valuePair[colIndex])
floatArray = append(floatArray, value)
} else if valType == "bool" {
value := valuePair[colIndex].(bool)
boolArray = append(boolArray, value)
}
}
}
name := formatFrameName(row, column, query)
timeField := data.NewField("time", nil, timeArray)
valueField := data.NewField("value", row.Tags, valueArray)
// set a nice name on the value-field
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
if valType == "string" {
valueField := data.NewField("value", row.Tags, stringArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
} else if valType == "json.Number" {
valueField := data.NewField("value", row.Tags, floatArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
} else if valType == "bool" {
valueField := data.NewField("value", row.Tags, boolArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
}
}
}
}
@ -195,21 +213,15 @@ func parseTimestamp(value interface{}) (time.Time, error) {
return t, nil
}
func parseValue(value interface{}) *float64 {
func typeof(v interface{}) string {
return fmt.Sprintf("%T", v)
}
func parseNumber(value interface{}) *float64 {
// NOTE: we use pointers-to-float64 because we need
// to represent null-json-values. they come for example
// when we do a group-by with fill(null)
// FIXME: the value of an influxdb-query can be:
// - string
// - float
// - integer
// - boolean
//
// here we only handle numeric values. this is probably
// enough for alerting, but later if we want to support
// arbitrary queries, we will have to improve the code
if value == nil {
// this is what json-nulls become
return nil

View File

@ -51,12 +51,12 @@ func TestInfluxdbResponseParser(t *testing.T) {
"series": [
{
"name": "cpu",
"columns": ["time","mean","sum"],
"columns": ["time","mean","path","isActive"],
"tags": {"datacenter": "America"},
"values": [
[111,222,333],
[111,222,333],
[111,null,333]
[111,222,"/usr/path",true],
[111,222,"/usr/path",false],
[111,null,"/usr/path",true]
]
}
]
@ -68,25 +68,62 @@ func TestInfluxdbResponseParser(t *testing.T) {
query := &Query{}
labels, err := data.LabelsFromString("datacenter=America")
require.Nil(t, err)
newField := data.NewField("value", labels, []*float64{
floatField := data.NewField("value", labels, []*float64{
pointer.Float64(222), pointer.Float64(222), nil,
})
newField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.mean { datacenter: America }"}
testFrame := data.NewFrame("cpu.mean { datacenter: America }",
floatField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.mean { datacenter: America }"}
floatFrame := data.NewFrame("cpu.mean { datacenter: America }",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
}),
newField,
floatField,
)
testFrame.Meta = &data.FrameMeta{ExecutedQueryString: "Test raw query"}
floatFrame.Meta = &data.FrameMeta{ExecutedQueryString: "Test raw query"}
stringField := data.NewField("value", labels, []string{
"/usr/path", "/usr/path", "/usr/path",
})
stringField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.path { datacenter: America }"}
stringFrame := data.NewFrame("cpu.path { datacenter: America }",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
}),
stringField,
)
stringFrame.Meta = &data.FrameMeta{ExecutedQueryString: "Test raw query"}
boolField := data.NewField("value", labels, []bool{
true, false, true,
})
boolField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.isActive { datacenter: America }"}
boolFrame := data.NewFrame("cpu.isActive { datacenter: America }",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
}),
boolField,
)
boolFrame.Meta = &data.FrameMeta{ExecutedQueryString: "Test raw query"}
result := parser.Parse(prepare(response), addQueryToQueries(*query))
frame := result.Responses["A"]
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
if diff := cmp.Diff(floatFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
if diff := cmp.Diff(stringFrame, frame.Frames[1], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
if diff := cmp.Diff(boolFrame, frame.Frames[2], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
@ -654,18 +691,18 @@ func TestInfluxdbResponseParser(t *testing.T) {
require.EqualError(t, result.Responses["A"].Error, "error parsing query: found THING")
})
t.Run("Influxdb response parser parseValue nil", func(t *testing.T) {
value := parseValue(nil)
t.Run("Influxdb response parser parseNumber nil", func(t *testing.T) {
value := parseNumber(nil)
require.Nil(t, value)
})
t.Run("Influxdb response parser parseValue valid JSON.number", func(t *testing.T) {
value := parseValue(json.Number("95.4"))
t.Run("Influxdb response parser parseNumber valid JSON.number", func(t *testing.T) {
value := parseNumber(json.Number("95.4"))
require.Equal(t, *value, 95.4)
})
t.Run("Influxdb response parser parseValue invalid type", func(t *testing.T) {
value := parseValue("95.4")
t.Run("Influxdb response parser parseNumber invalid type", func(t *testing.T) {
value := parseNumber("95.4")
require.Nil(t, value)
})
@ -677,7 +714,7 @@ func TestInfluxdbResponseParser(t *testing.T) {
require.Equal(t, timestamp.Format(time.RFC3339), "2021-01-02T03:04:05Z")
})
t.Run("Influxdb response parser parseValue invalid type", func(t *testing.T) {
t.Run("Influxdb response parser parseNumber invalid type", func(t *testing.T) {
_, err := parseTimestamp("hello")
require.Error(t, err)
})