InfluxDB: Add support for LIST arrow type (#93227)

Add support for LIST arrow type
This commit is contained in:
Andreas Christou 2024-09-24 18:36:07 +01:00 committed by GitHub
parent 7e61d4f80d
commit 488e71226b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 31 additions and 2 deletions

View File

@ -151,6 +151,9 @@ func newField(f arrow.Field) *data.Field {
return newDataField[time.Time](f) return newDataField[time.Time](f)
case arrow.DURATION: case arrow.DURATION:
return newDataField[int64](f) return newDataField[int64](f)
case arrow.LIST:
nestedType := f.Type.(*arrow.ListType).ElemField()
return newField(nestedType)
default: default:
return newDataField[json.RawMessage](f) return newDataField[json.RawMessage](f)
} }
@ -166,6 +169,8 @@ func newDataField[T any](f arrow.Field) *data.Field {
} }
// copyData copies the contents of an Arrow column into a Data Frame field. // copyData copies the contents of an Arrow column into a Data Frame field.
//
//nolint:gocyclo
func copyData(field *data.Field, col arrow.Array) error { func copyData(field *data.Field, col arrow.Array) error {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -174,7 +179,6 @@ func copyData(field *data.Field, col arrow.Array) error {
}() }()
colData := col.Data() colData := col.Data()
switch col.DataType().ID() { switch col.DataType().ID() {
case arrow.TIMESTAMP: case arrow.TIMESTAMP:
v := array.NewTimestampData(colData) v := array.NewTimestampData(colData)
@ -221,6 +225,19 @@ func copyData(field *data.Field, col arrow.Array) error {
} }
field.Append(json.RawMessage(b)) field.Append(json.RawMessage(b))
} }
case arrow.LIST:
v := array.NewListData(colData)
for i := 0; i < v.Len(); i++ {
sc, err := scalar.GetScalar(v, i)
if err != nil {
return err
}
err = copyData(field, sc.(*scalar.List).Value)
if err != nil {
return err
}
}
case arrow.STRING: case arrow.STRING:
copyBasic[string](field, array.NewStringData(colData)) copyBasic[string](field, array.NewStringData(colData))
case arrow.UINT8: case arrow.UINT8:

View File

@ -37,6 +37,8 @@ func TestNewQueryDataResponse(t *testing.T) {
{Name: "utf8", Type: &arrow.StringType{}}, {Name: "utf8", Type: &arrow.StringType{}},
{Name: "duration", Type: &arrow.DurationType{}}, {Name: "duration", Type: &arrow.DurationType{}},
{Name: "timestamp", Type: &arrow.TimestampType{}}, {Name: "timestamp", Type: &arrow.TimestampType{}},
{Name: "item", Type: arrow.ListOf(&arrow.StringType{})},
}, },
nil, nil,
) )
@ -58,6 +60,8 @@ func TestNewQueryDataResponse(t *testing.T) {
newJSONArray(`["foo", "bar", "baz"]`, &arrow.StringType{}), newJSONArray(`["foo", "bar", "baz"]`, &arrow.StringType{}),
newJSONArray(`[0, 1, -2]`, &arrow.DurationType{}), newJSONArray(`[0, 1, -2]`, &arrow.DurationType{}),
newJSONArray(`[0, 1, 2]`, &arrow.TimestampType{}), newJSONArray(`[0, 1, 2]`, &arrow.TimestampType{}),
newJSONArray(`[["test", "test1", "test2"],[],[]]`, arrow.ListOf(&arrow.StringType{})),
} }
arr := make([]arrow.Array, 0, len(strValues)) arr := make([]arrow.Array, 0, len(strValues))
@ -82,7 +86,7 @@ func TestNewQueryDataResponse(t *testing.T) {
resp := newQueryDataResponse(errReader{RecordReader: reader}, query, metadata.MD{}) resp := newQueryDataResponse(errReader{RecordReader: reader}, query, metadata.MD{})
assert.NoError(t, resp.Error) assert.NoError(t, resp.Error)
assert.Len(t, resp.Frames, 1) assert.Len(t, resp.Frames, 1)
assert.Len(t, resp.Frames[0].Fields, 13) assert.Len(t, resp.Frames[0].Fields, 14)
frame := resp.Frames[0] frame := resp.Frames[0]
f0 := frame.Fields[0] f0 := frame.Fields[0]
@ -156,6 +160,14 @@ func TestNewQueryDataResponse(t *testing.T) {
}, },
extractFieldValues[time.Time](t, f12), extractFieldValues[time.Time](t, f12),
) )
s1 := "test"
s2 := "test1"
s3 := "test2"
f13 := frame.Fields[13]
assert.Equal(t, f13.Name, "item")
assert.Equal(t, f13.Type(), data.FieldTypeNullableString)
assert.Equal(t, []*string{&s1, &s2, &s3}, extractFieldValues[*string](t, f13))
} }
type jsonArray struct { type jsonArray struct {