diff --git a/pkg/tsdb/influxdb/fsql/arrow.go b/pkg/tsdb/influxdb/fsql/arrow.go index 965e17f8357..e8db01c730d 100644 --- a/pkg/tsdb/influxdb/fsql/arrow.go +++ b/pkg/tsdb/influxdb/fsql/arrow.go @@ -151,6 +151,9 @@ func newField(f arrow.Field) *data.Field { return newDataField[time.Time](f) case arrow.DURATION: return newDataField[int64](f) + case arrow.LIST: + nestedType := f.Type.(*arrow.ListType).ElemField() + return newField(nestedType) default: return newDataField[json.RawMessage](f) } @@ -166,6 +169,8 @@ func newDataField[T any](f arrow.Field) *data.Field { } // copyData copies the contents of an Arrow column into a Data Frame field. +// +//nolint:gocyclo func copyData(field *data.Field, col arrow.Array) error { defer func() { if r := recover(); r != nil { @@ -174,7 +179,6 @@ func copyData(field *data.Field, col arrow.Array) error { }() colData := col.Data() - switch col.DataType().ID() { case arrow.TIMESTAMP: v := array.NewTimestampData(colData) @@ -221,6 +225,19 @@ func copyData(field *data.Field, col arrow.Array) error { } field.Append(json.RawMessage(b)) } + case arrow.LIST: + v := array.NewListData(colData) + for i := 0; i < v.Len(); i++ { + sc, err := scalar.GetScalar(v, i) + if err != nil { + return err + } + + err = copyData(field, sc.(*scalar.List).Value) + if err != nil { + return err + } + } case arrow.STRING: copyBasic[string](field, array.NewStringData(colData)) case arrow.UINT8: diff --git a/pkg/tsdb/influxdb/fsql/arrow_test.go b/pkg/tsdb/influxdb/fsql/arrow_test.go index 32d260a1152..1bbdc51e919 100644 --- a/pkg/tsdb/influxdb/fsql/arrow_test.go +++ b/pkg/tsdb/influxdb/fsql/arrow_test.go @@ -37,6 +37,8 @@ func TestNewQueryDataResponse(t *testing.T) { {Name: "utf8", Type: &arrow.StringType{}}, {Name: "duration", Type: &arrow.DurationType{}}, {Name: "timestamp", Type: &arrow.TimestampType{}}, + + {Name: "item", Type: arrow.ListOf(&arrow.StringType{})}, }, nil, ) @@ -58,6 +60,8 @@ func TestNewQueryDataResponse(t *testing.T) { newJSONArray(`["foo", "bar", "baz"]`, &arrow.StringType{}), newJSONArray(`[0, 1, -2]`, &arrow.DurationType{}), newJSONArray(`[0, 1, 2]`, &arrow.TimestampType{}), + + newJSONArray(`[["test", "test1", "test2"],[],[]]`, arrow.ListOf(&arrow.StringType{})), } arr := make([]arrow.Array, 0, len(strValues)) @@ -82,7 +86,7 @@ func TestNewQueryDataResponse(t *testing.T) { resp := newQueryDataResponse(errReader{RecordReader: reader}, query, metadata.MD{}) assert.NoError(t, resp.Error) assert.Len(t, resp.Frames, 1) - assert.Len(t, resp.Frames[0].Fields, 13) + assert.Len(t, resp.Frames[0].Fields, 14) frame := resp.Frames[0] f0 := frame.Fields[0] @@ -156,6 +160,14 @@ func TestNewQueryDataResponse(t *testing.T) { }, extractFieldValues[time.Time](t, f12), ) + + s1 := "test" + s2 := "test1" + s3 := "test2" + f13 := frame.Fields[13] + assert.Equal(t, f13.Name, "item") + assert.Equal(t, f13.Type(), data.FieldTypeNullableString) + assert.Equal(t, []*string{&s1, &s2, &s3}, extractFieldValues[*string](t, f13)) } type jsonArray struct {