Revert "InfluxDB: Response parser improvements (#76852)" (#78224)

This reverts commit 046791e2be.

Is causing malformed frames, and resulting in errors about mismatched field lengths in the logs, No data in the UI, and will cause SSE to panic.

I think this is because of the global slice vars since it seems to take concurrency to replicate it.
This commit is contained in:
Kyle Brandt
2023-11-15 14:01:36 -05:00
committed by GitHub
parent b5429a456c
commit a94acf4b63
2 changed files with 54 additions and 56 deletions

View File

@@ -16,16 +16,7 @@ import (
)
var (
timeColumn = "time"
timeColumnName = "Time"
valueColumnName = "Value"
legendFormat = regexp.MustCompile(`\[\[([\@\/\w-]+)(\.[\@\/\w-]+)*\]\]*|\$([\@\w-]+?)*`)
timeArray []time.Time
floatArray []*float64
stringArray []*string
boolArray []*bool
)
const (
@@ -75,12 +66,10 @@ func parseJSON(buf io.Reader) (models.Response, error) {
}
func transformRows(rows []models.Row, query models.Query) data.Frames {
// Create a map for faster column name lookups
columnToLowerCase := make(map[string]string)
// pre-allocate frames - this can save many allocations
cols := 0
for _, row := range rows {
for _, column := range row.Columns {
columnToLowerCase[column] = strings.ToLower(column)
}
cols += len(row.Columns)
}
if len(rows) == 0 {
@@ -94,19 +83,24 @@ func transformRows(rows []models.Row, query models.Query) data.Frames {
// It's sized for a reasonably-large name, but will grow if needed.
frameName := make([]byte, 0, 128)
retentionPolicyQuery := isRetentionPolicyQuery(query)
tagValuesQuery := isTagValuesQuery(query)
for _, row := range rows {
var hasTimeCol = false
if _, ok := columnToLowerCase[timeColumn]; ok {
hasTimeCol = true
for _, column := range row.Columns {
if strings.ToLower(column) == "time" {
hasTimeCol = true
}
}
if !hasTimeCol {
newFrame := newFrameWithoutTimeField(row, query)
newFrame := newFrameWithoutTimeField(row, retentionPolicyQuery, tagValuesQuery)
frames = append(frames, newFrame)
} else {
for colIndex, column := range row.Columns {
if columnToLowerCase[column] == timeColumn {
if column == "time" {
continue
}
newFrame := newFrameWithTimeField(row, column, colIndex, query, frameName)
@@ -119,21 +113,20 @@ func transformRows(rows []models.Row, query models.Query) data.Frames {
}
func newFrameWithTimeField(row models.Row, column string, colIndex int, query models.Query, frameName []byte) *data.Frame {
timeArray = timeArray[:0]
floatArray = floatArray[:0]
stringArray = stringArray[:0]
boolArray = boolArray[:0]
var timeArray []time.Time
var floatArray []*float64
var stringArray []*string
var boolArray []*bool
valType := typeof(row.Values, colIndex)
for _, valuePair := range row.Values {
timestamp, timestampErr := parseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr != nil {
continue
}
timeArray = append(timeArray, timestamp)
switch valType {
case "string":
value, ok := valuePair[colIndex].(string)
@@ -157,19 +150,19 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo
}
}
timeField := data.NewField(timeColumnName, nil, timeArray)
timeField := data.NewField("Time", nil, timeArray)
var valueField *data.Field
switch valType {
case "string":
valueField = data.NewField(valueColumnName, row.Tags, stringArray)
valueField = data.NewField("Value", row.Tags, stringArray)
case "json.Number":
valueField = data.NewField(valueColumnName, row.Tags, floatArray)
valueField = data.NewField("Value", row.Tags, floatArray)
case "bool":
valueField = data.NewField(valueColumnName, row.Tags, boolArray)
valueField = data.NewField("Value", row.Tags, boolArray)
case "null":
valueField = data.NewField(valueColumnName, row.Tags, floatArray)
valueField = data.NewField("Value", row.Tags, floatArray)
}
name := string(formatFrameName(row, column, query, frameName[:]))
@@ -177,14 +170,35 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo
return newDataFrame(name, query.RawQuery, timeField, valueField, getVisType(query.ResultFormat))
}
func newFrameWithoutTimeField(row models.Row, query models.Query) *data.Frame {
func newFrameWithoutTimeField(row models.Row, retentionPolicyQuery bool, tagValuesQuery bool) *data.Frame {
var values []string
if retentionPolicyQuery {
values = make([]string, 1, len(row.Values))
} else {
values = make([]string, 0, len(row.Values))
}
for _, valuePair := range row.Values {
if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) {
if tagValuesQuery {
if len(valuePair) >= 2 {
values = append(values, valuePair[1].(string))
}
} else if retentionPolicyQuery {
// We want to know whether the given retention policy is the default one or not.
// If it is default policy then we should add it to the beginning.
// The index 4 gives us if that policy is default or not.
// https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies
// Only difference is v0.9. In that version we don't receive shardGroupDuration value.
// https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies
// Since it is always the last value we will check that last value always.
if len(valuePair) >= 1 {
if valuePair[len(row.Columns)-1].(bool) {
values[0] = valuePair[0].(string)
} else {
values = append(values, valuePair[0].(string))
}
}
} else {
if len(valuePair) >= 1 {
values = append(values, valuePair[0].(string))
@@ -335,3 +349,11 @@ func getVisType(resFormat string) data.VisType {
return graphVisType
}
}
func isTagValuesQuery(query models.Query) bool {
return strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES"))
}
func isRetentionPolicyQuery(query models.Query) bool {
return strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW RETENTION POLICIES"))
}

View File

@@ -798,7 +798,7 @@ func TestResponseParser_Parse_RetentionPolicy(t *testing.T) {
query := models.Query{RefID: "metricFindQuery", RawQuery: "SHOW RETENTION POLICIES"}
policyFrame := data.NewFrame("",
data.NewField("Value", nil, []string{
"autogen", "bar", "5m_avg", "1m_avg",
"bar", "autogen", "5m_avg", "1m_avg",
}),
)
@@ -930,27 +930,3 @@ func TestResponseParser_Parse(t *testing.T) {
})
}
}
func TestParseTimestamp(t *testing.T) {
validValue := json.Number("1609459200000") // Milliseconds since epoch (January 1, 2021)
invalidValue := "invalid"
t.Run("ValidTimestamp", func(t *testing.T) {
parsedTime, err := parseTimestamp(validValue)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
expectedTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
if !parsedTime.Equal(expectedTime) {
t.Errorf("Expected time: %v, got: %v", expectedTime, parsedTime)
}
})
t.Run("InvalidTimestamp", func(t *testing.T) {
_, err := parseTimestamp(invalidValue)
if err == nil {
t.Errorf("Expected an error, got nil")
}
})
}