InfluxDB: Response parser improvements (#76852)

* remove retention policy lookup

* Back to one big function

* %10 less memory allocation

pkg: github.com/grafana/grafana/pkg/tsdb/influxdb/influxql
                │    1.txt    │                2.txt                │
                │   sec/op    │   sec/op     vs base                │
ParseBigJson-10   540.9m ± 3%   474.0m ± 2%  -12.37% (p=0.000 n=10)

                │    1.txt     │                2.txt                │
                │     B/op     │     B/op      vs base               │
ParseBigJson-10   580.6Mi ± 0%   573.2Mi ± 0%  -1.28% (p=0.000 n=10)

                │    1.txt     │                2.txt                │
                │  allocs/op   │  allocs/op   vs base                │
ParseBigJson-10   10.123M ± 0%   9.086M ± 0%  -10.25% (p=0.000 n=10)

* Slightly better results comparing with the previous commit
pkg: github.com/grafana/grafana/pkg/tsdb/influxdb/influxql
                │    2.txt    │               3.txt                │
                │   sec/op    │   sec/op     vs base               │
ParseBigJson-10   474.0m ± 1%   503.4m ± 3%  +6.21% (p=0.000 n=10)

                │    2.txt     │                3.txt                │
                │     B/op     │     B/op      vs base               │
ParseBigJson-10   573.2Mi ± 0%   564.0Mi ± 0%  -1.60% (p=0.000 n=10)

                │    2.txt    │               3.txt                │
                │  allocs/op  │  allocs/op   vs base               │
ParseBigJson-10   9.086M ± 0%   9.052M ± 0%  -0.37% (p=0.000 n=10)

* Split into smaller functions

* Unit test for parseTimestamp
This commit is contained in:
ismail simsek 2023-10-27 17:17:19 +02:00 committed by GitHub
parent ff67b03dc8
commit 046791e2be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 55 deletions

View File

@ -16,7 +16,16 @@ import (
)
var (
timeColumn = "time"
timeColumnName = "Time"
valueColumnName = "Value"
legendFormat = regexp.MustCompile(`\[\[([\@\/\w-]+)(\.[\@\/\w-]+)*\]\]*|\$([\@\w-]+?)*`)
timeArray []time.Time
floatArray []*float64
stringArray []*string
boolArray []*bool
)
const (
@ -66,35 +75,34 @@ func parseJSON(buf io.Reader) (models.Response, error) {
}
func transformRows(rows []models.Row, query models.Query) data.Frames {
// pre-allocate frames - this can save many allocations
cols := 0
// Create a map for faster column name lookups
columnToLowerCase := make(map[string]string)
for _, row := range rows {
cols += len(row.Columns)
for _, column := range row.Columns {
columnToLowerCase[column] = strings.ToLower(column)
}
}
frames := make([]*data.Frame, 0, len(rows)+cols)
// Preallocate for the worst-case scenario
frames := make([]*data.Frame, 0, len(rows)*len(rows[0].Columns))
// frameName is pre-allocated. So we can reuse it, saving memory.
// It's sized for a reasonably-large name, but will grow if needed.
frameName := make([]byte, 0, 128)
retentionPolicyQuery := isRetentionPolicyQuery(query)
tagValuesQuery := isTagValuesQuery(query)
for _, row := range rows {
var hasTimeCol = false
for _, column := range row.Columns {
if strings.ToLower(column) == "time" {
hasTimeCol = true
}
if _, ok := columnToLowerCase[timeColumn]; ok {
hasTimeCol = true
}
if !hasTimeCol {
newFrame := newFrameWithoutTimeField(row, retentionPolicyQuery, tagValuesQuery)
newFrame := newFrameWithoutTimeField(row, query)
frames = append(frames, newFrame)
} else {
for colIndex, column := range row.Columns {
if column == "time" {
if columnToLowerCase[column] == timeColumn {
continue
}
newFrame := newFrameWithTimeField(row, column, colIndex, query, frameName)
@ -107,20 +115,21 @@ func transformRows(rows []models.Row, query models.Query) data.Frames {
}
func newFrameWithTimeField(row models.Row, column string, colIndex int, query models.Query, frameName []byte) *data.Frame {
var timeArray []time.Time
var floatArray []*float64
var stringArray []*string
var boolArray []*bool
timeArray = timeArray[:0]
floatArray = floatArray[:0]
stringArray = stringArray[:0]
boolArray = boolArray[:0]
valType := typeof(row.Values, colIndex)
for _, valuePair := range row.Values {
timestamp, timestampErr := parseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr != nil {
continue
}
timeArray = append(timeArray, timestamp)
switch valType {
case "string":
value, ok := valuePair[colIndex].(string)
@ -144,19 +153,19 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo
}
}
timeField := data.NewField("Time", nil, timeArray)
timeField := data.NewField(timeColumnName, nil, timeArray)
var valueField *data.Field
switch valType {
case "string":
valueField = data.NewField("Value", row.Tags, stringArray)
valueField = data.NewField(valueColumnName, row.Tags, stringArray)
case "json.Number":
valueField = data.NewField("Value", row.Tags, floatArray)
valueField = data.NewField(valueColumnName, row.Tags, floatArray)
case "bool":
valueField = data.NewField("Value", row.Tags, boolArray)
valueField = data.NewField(valueColumnName, row.Tags, boolArray)
case "null":
valueField = data.NewField("Value", row.Tags, floatArray)
valueField = data.NewField(valueColumnName, row.Tags, floatArray)
}
name := string(formatFrameName(row, column, query, frameName[:]))
@ -164,35 +173,14 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo
return newDataFrame(name, query.RawQuery, timeField, valueField, getVisType(query.ResultFormat))
}
func newFrameWithoutTimeField(row models.Row, retentionPolicyQuery bool, tagValuesQuery bool) *data.Frame {
func newFrameWithoutTimeField(row models.Row, query models.Query) *data.Frame {
var values []string
if retentionPolicyQuery {
values = make([]string, 1, len(row.Values))
} else {
values = make([]string, 0, len(row.Values))
}
for _, valuePair := range row.Values {
if tagValuesQuery {
if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) {
if len(valuePair) >= 2 {
values = append(values, valuePair[1].(string))
}
} else if retentionPolicyQuery {
// We want to know whether the given retention policy is the default one or not.
// If it is default policy then we should add it to the beginning.
// The index 4 gives us if that policy is default or not.
// https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies
// Only difference is v0.9. In that version we don't receive shardGroupDuration value.
// https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies
// Since it is always the last value we will check that last value always.
if len(valuePair) >= 1 {
if valuePair[len(row.Columns)-1].(bool) {
values[0] = valuePair[0].(string)
} else {
values = append(values, valuePair[0].(string))
}
}
} else {
if len(valuePair) >= 1 {
values = append(values, valuePair[0].(string))
@ -342,11 +330,3 @@ func getVisType(resFormat string) data.VisType {
return graphVisType
}
}
func isTagValuesQuery(query models.Query) bool {
return strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES"))
}
func isRetentionPolicyQuery(query models.Query) bool {
return strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW RETENTION POLICIES"))
}

View File

@ -739,7 +739,7 @@ func TestResponseParser_Parse_RetentionPolicy(t *testing.T) {
query := models.Query{RefID: "metricFindQuery", RawQuery: "SHOW RETENTION POLICIES"}
policyFrame := data.NewFrame("",
data.NewField("Value", nil, []string{
"bar", "autogen", "5m_avg", "1m_avg",
"autogen", "bar", "5m_avg", "1m_avg",
}),
)
@ -871,3 +871,27 @@ func TestResponseParser_Parse(t *testing.T) {
})
}
}
func TestParseTimestamp(t *testing.T) {
validValue := json.Number("1609459200000") // Milliseconds since epoch (January 1, 2021)
invalidValue := "invalid"
t.Run("ValidTimestamp", func(t *testing.T) {
parsedTime, err := parseTimestamp(validValue)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
expectedTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
if !parsedTime.Equal(expectedTime) {
t.Errorf("Expected time: %v, got: %v", expectedTime, parsedTime)
}
})
t.Run("InvalidTimestamp", func(t *testing.T) {
_, err := parseTimestamp(invalidValue)
if err == nil {
t.Errorf("Expected an error, got nil")
}
})
}