diff --git a/pkg/tsdb/influxdb/influxql/response_parser.go b/pkg/tsdb/influxdb/influxql/response_parser.go index 22972d209ca..bd09676cb8f 100644 --- a/pkg/tsdb/influxdb/influxql/response_parser.go +++ b/pkg/tsdb/influxdb/influxql/response_parser.go @@ -16,7 +16,16 @@ import ( ) var ( + timeColumn = "time" + timeColumnName = "Time" + valueColumnName = "Value" + legendFormat = regexp.MustCompile(`\[\[([\@\/\w-]+)(\.[\@\/\w-]+)*\]\]*|\$([\@\w-]+?)*`) + + timeArray []time.Time + floatArray []*float64 + stringArray []*string + boolArray []*bool ) const ( @@ -66,35 +75,34 @@ func parseJSON(buf io.Reader) (models.Response, error) { } func transformRows(rows []models.Row, query models.Query) data.Frames { - // pre-allocate frames - this can save many allocations - cols := 0 + // Create a map for faster column name lookups + columnToLowerCase := make(map[string]string) for _, row := range rows { - cols += len(row.Columns) + for _, column := range row.Columns { + columnToLowerCase[column] = strings.ToLower(column) + } } - frames := make([]*data.Frame, 0, len(rows)+cols) + + // Preallocate for the worst-case scenario + frames := make([]*data.Frame, 0, len(rows)*len(rows[0].Columns)) // frameName is pre-allocated. So we can reuse it, saving memory. // It's sized for a reasonably-large name, but will grow if needed. frameName := make([]byte, 0, 128) - retentionPolicyQuery := isRetentionPolicyQuery(query) - tagValuesQuery := isTagValuesQuery(query) - for _, row := range rows { var hasTimeCol = false - for _, column := range row.Columns { - if strings.ToLower(column) == "time" { - hasTimeCol = true - } + if _, ok := columnToLowerCase[timeColumn]; ok { + hasTimeCol = true } if !hasTimeCol { - newFrame := newFrameWithoutTimeField(row, retentionPolicyQuery, tagValuesQuery) + newFrame := newFrameWithoutTimeField(row, query) frames = append(frames, newFrame) } else { for colIndex, column := range row.Columns { - if column == "time" { + if columnToLowerCase[column] == timeColumn { continue } newFrame := newFrameWithTimeField(row, column, colIndex, query, frameName) @@ -107,20 +115,21 @@ func transformRows(rows []models.Row, query models.Query) data.Frames { } func newFrameWithTimeField(row models.Row, column string, colIndex int, query models.Query, frameName []byte) *data.Frame { - var timeArray []time.Time - var floatArray []*float64 - var stringArray []*string - var boolArray []*bool + timeArray = timeArray[:0] + floatArray = floatArray[:0] + stringArray = stringArray[:0] + boolArray = boolArray[:0] + valType := typeof(row.Values, colIndex) for _, valuePair := range row.Values { timestamp, timestampErr := parseTimestamp(valuePair[0]) - // we only add this row if the timestamp is valid if timestampErr != nil { continue } timeArray = append(timeArray, timestamp) + switch valType { case "string": value, ok := valuePair[colIndex].(string) @@ -144,19 +153,19 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo } } - timeField := data.NewField("Time", nil, timeArray) + timeField := data.NewField(timeColumnName, nil, timeArray) var valueField *data.Field switch valType { case "string": - valueField = data.NewField("Value", row.Tags, stringArray) + valueField = data.NewField(valueColumnName, row.Tags, stringArray) case "json.Number": - valueField = data.NewField("Value", row.Tags, floatArray) + valueField = data.NewField(valueColumnName, row.Tags, floatArray) case "bool": - valueField = data.NewField("Value", row.Tags, boolArray) + valueField = data.NewField(valueColumnName, row.Tags, boolArray) case "null": - valueField = data.NewField("Value", row.Tags, floatArray) + valueField = data.NewField(valueColumnName, row.Tags, floatArray) } name := string(formatFrameName(row, column, query, frameName[:])) @@ -164,35 +173,14 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo return newDataFrame(name, query.RawQuery, timeField, valueField, getVisType(query.ResultFormat)) } -func newFrameWithoutTimeField(row models.Row, retentionPolicyQuery bool, tagValuesQuery bool) *data.Frame { +func newFrameWithoutTimeField(row models.Row, query models.Query) *data.Frame { var values []string - if retentionPolicyQuery { - values = make([]string, 1, len(row.Values)) - } else { - values = make([]string, 0, len(row.Values)) - } - for _, valuePair := range row.Values { - if tagValuesQuery { + if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) { if len(valuePair) >= 2 { values = append(values, valuePair[1].(string)) } - } else if retentionPolicyQuery { - // We want to know whether the given retention policy is the default one or not. - // If it is default policy then we should add it to the beginning. - // The index 4 gives us if that policy is default or not. - // https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies - // Only difference is v0.9. In that version we don't receive shardGroupDuration value. - // https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies - // Since it is always the last value we will check that last value always. - if len(valuePair) >= 1 { - if valuePair[len(row.Columns)-1].(bool) { - values[0] = valuePair[0].(string) - } else { - values = append(values, valuePair[0].(string)) - } - } } else { if len(valuePair) >= 1 { values = append(values, valuePair[0].(string)) @@ -342,11 +330,3 @@ func getVisType(resFormat string) data.VisType { return graphVisType } } - -func isTagValuesQuery(query models.Query) bool { - return strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) -} - -func isRetentionPolicyQuery(query models.Query) bool { - return strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW RETENTION POLICIES")) -} diff --git a/pkg/tsdb/influxdb/influxql/response_parser_test.go b/pkg/tsdb/influxdb/influxql/response_parser_test.go index b089542f8eb..b84eeb1860f 100644 --- a/pkg/tsdb/influxdb/influxql/response_parser_test.go +++ b/pkg/tsdb/influxdb/influxql/response_parser_test.go @@ -739,7 +739,7 @@ func TestResponseParser_Parse_RetentionPolicy(t *testing.T) { query := models.Query{RefID: "metricFindQuery", RawQuery: "SHOW RETENTION POLICIES"} policyFrame := data.NewFrame("", data.NewField("Value", nil, []string{ - "bar", "autogen", "5m_avg", "1m_avg", + "autogen", "bar", "5m_avg", "1m_avg", }), ) @@ -871,3 +871,27 @@ func TestResponseParser_Parse(t *testing.T) { }) } } + +func TestParseTimestamp(t *testing.T) { + validValue := json.Number("1609459200000") // Milliseconds since epoch (January 1, 2021) + invalidValue := "invalid" + + t.Run("ValidTimestamp", func(t *testing.T) { + parsedTime, err := parseTimestamp(validValue) + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } + + expectedTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + if !parsedTime.Equal(expectedTime) { + t.Errorf("Expected time: %v, got: %v", expectedTime, parsedTime) + } + }) + + t.Run("InvalidTimestamp", func(t *testing.T) { + _, err := parseTimestamp(invalidValue) + if err == nil { + t.Errorf("Expected an error, got nil") + } + }) +}