Influxdb: Response parser performance improvement (#64776)

* Influxdb response parser allocation improvements

* More improvements

* more improvements

* embed file

* use json-iterator

* rename

* use encoding/json
This commit is contained in:
ismail simsek 2023-03-17 20:45:13 +01:00 committed by GitHub
parent 40c5713cbd
commit ae07bf7ce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 16 deletions

View File

@ -20,6 +20,12 @@ var (
)
func (rp *ResponseParser) Parse(buf io.ReadCloser, queries []Query) *backend.QueryDataResponse {
return rp.parse(buf, queries)
}
// parse is the same as Parse, but without the io.ReadCloser (we don't need to
// close the buffer)
func (*ResponseParser) parse(buf io.Reader, queries []Query) *backend.QueryDataResponse {
resp := backend.NewQueryDataResponse()
response, jsonErr := parseJSON(buf)
@ -45,7 +51,7 @@ func (rp *ResponseParser) Parse(buf io.ReadCloser, queries []Query) *backend.Que
return resp
}
func parseJSON(buf io.ReadCloser) (Response, error) {
func parseJSON(buf io.Reader) (Response, error) {
var response Response
dec := json.NewDecoder(buf)
@ -57,7 +63,17 @@ func parseJSON(buf io.ReadCloser) (Response, error) {
}
func transformRows(rows []Row, query Query) data.Frames {
frames := data.Frames{}
// pre-allocate frames - this can save many allocations
cols := 0
for _, row := range rows {
cols += len(row.Columns)
}
frames := make([]*data.Frame, 0, len(rows)+cols)
// frameName is pre-allocated so we can reuse it, saving memory.
// It's sized for a reasonably-large name, but will grow if needed.
frameName := make([]byte, 0, 128)
for _, row := range rows {
var hasTimeCol = false
@ -95,7 +111,6 @@ func transformRows(rows []Row, query Query) data.Frames {
var stringArray []*string
var boolArray []*bool
valType := typeof(row.Values, colIndex)
name := formatFrameName(row, column, query)
for _, valuePair := range row.Values {
timestamp, timestampErr := parseTimestamp(valuePair[0])
@ -128,6 +143,8 @@ func transformRows(rows []Row, query Query) data.Frames {
}
}
name := string(formatFrameName(row, column, query, frameName[:]))
timeField := data.NewField("time", nil, timeArray)
if valType == "string" {
valueField := data.NewField("value", row.Tags, stringArray)
@ -162,9 +179,9 @@ func newDataFrame(name string, queryString string, timeField *data.Field, valueF
return frame
}
func formatFrameName(row Row, column string, query Query) string {
func formatFrameName(row Row, column string, query Query, frameName []byte) []byte {
if query.Alias == "" {
return buildFrameNameFromQuery(row, column)
return buildFrameNameFromQuery(row, column, frameName)
}
nameSegment := strings.Split(row.Name, ".")
@ -199,21 +216,32 @@ func formatFrameName(row Row, column string, query Query) string {
return in
})
return string(result)
return result
}
func buildFrameNameFromQuery(row Row, column string) string {
tags := make([]string, 0, len(row.Tags))
for k, v := range row.Tags {
tags = append(tags, fmt.Sprintf("%s: %s", k, v))
func buildFrameNameFromQuery(row Row, column string, frameName []byte) []byte {
frameName = append(frameName, row.Name...)
frameName = append(frameName, '.')
frameName = append(frameName, column...)
if len(row.Tags) > 0 {
frameName = append(frameName, ' ', '{', ' ')
first := true
for k, v := range row.Tags {
if !first {
frameName = append(frameName, ' ')
} else {
first = false
}
frameName = append(frameName, k...)
frameName = append(frameName, ':', ' ')
frameName = append(frameName, v...)
}
frameName = append(frameName, ' ', '}')
}
tagText := ""
if len(tags) > 0 {
tagText = fmt.Sprintf(" { %s }", strings.Join(tags, " "))
}
return fmt.Sprintf("%s.%s%s", row.Name, column, tagText)
return frameName
}
func parseTimestamp(value interface{}) (time.Time, error) {

View File

@ -0,0 +1,29 @@
package influxdb
import (
_ "embed"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
//go:embed testdata/response.json
var testResponse string
// go test -benchmem -run=^$ -memprofile memprofile.out -count=10 -bench ^BenchmarkParseJson$ github.com/grafana/grafana/pkg/tsdb/influxdb
// go tool pprof -http=localhost:9999 memprofile.out
func BenchmarkParseJson(b *testing.B) {
parser := &ResponseParser{}
query := &Query{}
queries := addQueryToQueries(*query)
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf := strings.NewReader(testResponse)
result := parser.parse(buf, queries)
require.NotNil(b, result.Responses["A"].Frames)
require.NoError(b, result.Responses["A"].Error)
}
}

View File

@ -0,0 +1 @@
{"results":[{"statement_id":0,"series":[{"name":"series_name","tags":{"series_tag_1":"-248","series_tag_2":"3167640"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,"589051IR",1678734134503,"",16.726718841369227,null,null,53,null,48.72,null,null,null,null,"BYOD",null,null]]},{"name":"series_name","tags":{"series_tag_1":"-258","series_tag_2":"0"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,"tertggdfs",1678734188679,"",23.432988173518233,null,null,64,null,70.92,null,null,null,null,null,null,null]]},{"name":"series_name","tags":{"series_tag_1":"-28","series_tag_2":"0"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,null,null,"Unknown",null,null,null,null,null,null,null,null,null,null,null,null,null]]},{"name":"series_name","tags":{"series_tag_1":"-28","series_tag_2":"2249908"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,"ASDWQ234",1678734167565,"",10.772527844239201,null,null,25,null,58.33,null,null,null,null,null,null,null]]},{"name":"series_name","tags":{"series_tag_1":"-327","series_tag_2":"3415374"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,"123SDQWE",1678734222642,"ADSADS",40.61198862661182,null,null,33,null,51.41,null,null,null,null,"SCCM",null,null]]},{"name":"series_name","tags":{"series_tag_1":"-331","series_tag_2":"0"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,null,null,"Unknown",null,null,null,null,null,null,null,null,null,null,null,null,null]]},{"name":"series_name","tags":{"series_tag_1":"-331","series_tag_2":"2191438"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,"SDF32R",1678734212872,"",15.267734587695077,null,null,43,null,58.43,null,null,null,null,null,null,null]]},{"name":"series_name","tags":{"series_tag_1":"0","series_tag_2":"0"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,null,null,"Unknown",null,null,null,null,null,null,null,null,null,null,null,null,null]]},{"name":"series_name","tags":{"series_tag_1":"00-00-00-08-1E-0C","series_tag_2":"0"},"columns":["time","col_1_name","timestamp","c_name","mem_used","dbytes","ubytes","dused","jitter","free","late","core","ploss","idletime","sol","isp","vpn"],"values":[[1678723623474,null,null,"Unknown",null,null,null,null,null,null,null,null,null,null,null,null,null]]}]}]}