Use template to convert arrow::ChunkedArray

This commit is contained in:
Magne Sjaastad 2024-07-01 14:00:41 +02:00
parent 58b3f0b928
commit f2d79da8be
5 changed files with 173 additions and 33 deletions

View File

@ -18,40 +18,86 @@
#include "RifArrowTools.h" #include "RifArrowTools.h"
#include "cafAssert.h" #include "RifByteArrayArrowRandomAccessFile.h"
#include "RifCsvDataTableFormatter.h"
#include <vector> #include <vector>
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
/// ///
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
std::vector<double> RifArrowTools::convertChunkedArrayToStdVector( const std::shared_ptr<arrow::ChunkedArray>& column ) QString RifArrowTools::readFirstRowsOfTable( const QByteArray& contents )
{ {
auto convertChunkToVector = []( const std::shared_ptr<arrow::Array>& array ) -> std::vector<double> arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input = std::make_shared<RifByteArrayArrowRandomAccessFile>( contents );
// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
if ( !parquet::arrow::OpenFile( input, pool, &arrow_reader ).ok() )
{ {
std::vector<double> result; return {};
auto double_array = std::static_pointer_cast<arrow::DoubleArray>( array );
result.resize( double_array->length() );
for ( int64_t i = 0; i < double_array->length(); ++i )
{
result[i] = double_array->Value( i );
}
return result;
};
CAF_ASSERT( column->type()->id() == arrow::Type::DOUBLE );
std::vector<double> result;
// Iterate over each chunk in the column
for ( int i = 0; i < column->num_chunks(); ++i )
{
std::shared_ptr<arrow::Array> chunk = column->chunk( i );
std::vector<double> chunk_vector = convertChunkToVector( chunk );
result.insert( result.end(), chunk_vector.begin(), chunk_vector.end() );
} }
return result; // Read entire file as a single Arrow table
}; std::shared_ptr<arrow::Table> table;
if ( !arrow_reader->ReadTable( &table ).ok() )
{
return {};
}
QString tableText;
QTextStream stream( &tableText );
RifCsvDataTableFormatter formatter( stream, ";" );
std::vector<RifTextDataTableColumn> header;
for ( std::string columnName : table->ColumnNames() )
{
header.push_back( RifTextDataTableColumn( QString::fromStdString( columnName ) ) );
}
formatter.header( header );
std::vector<std::vector<double>> columnVectors;
for ( std::string columnName : table->ColumnNames() )
{
std::shared_ptr<arrow::ChunkedArray> column = table->GetColumnByName( columnName );
auto columnType = column->type()->id();
if ( columnType == arrow::Type::DOUBLE )
{
std::vector<double> columnVector = RifArrowTools::chunkedArrayToVector<arrow::DoubleArray, double>( column );
columnVectors.push_back( columnVector );
}
else if ( columnType == arrow::Type::FLOAT )
{
auto columnVector = RifArrowTools::chunkedArrayToVector<arrow::FloatArray, double>( column );
columnVectors.push_back( columnVector );
}
else if ( columnType == arrow::Type::TIMESTAMP )
{
auto columnVector = RifArrowTools::chunkedArrayToVector<arrow::Int64Array, double>( column );
columnVectors.push_back( columnVector );
}
}
if ( columnVectors.empty() )
{
return {};
}
for ( int i = 0; i < std::min( 20, int( columnVectors[0].size() ) ); i++ )
{
for ( int j = 0; j < int( columnVectors.size() ); j++ )
{
formatter.add( columnVectors[j][i] );
}
formatter.rowCompleted();
}
formatter.tableCompleted();
return tableText;
}

View File

@ -22,15 +22,50 @@
#include <arrow/array/array_primitive.h> #include <arrow/array/array_primitive.h>
#define signals Q_SIGNALS #define signals Q_SIGNALS
#include <limits>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <QByteArray>
#include <QString>
//================================================================================================== //==================================================================================================
// //
// //
//================================================================================================== //==================================================================================================
class RifArrowTools namespace RifArrowTools
{ {
public:
static std::vector<double> convertChunkedArrayToStdVector( const std::shared_ptr<arrow::ChunkedArray>& column ); // Template class used to handle most of the basic types. Conversiont to std::string requuires a specialization using chunk->GetString(j).
}; template <typename ArrowArrayType, typename CType>
std::vector<CType> chunkedArrayToVector( const std::shared_ptr<arrow::ChunkedArray>& chunkedArray )
{
static_assert( std::is_base_of<arrow::Array, ArrowArrayType>::value, "ArrowArrayType must be derived from arrow::Array" );
std::vector<CType> result;
for ( int i = 0; i < chunkedArray->num_chunks(); ++i )
{
auto chunk = std::static_pointer_cast<ArrowArrayType>( chunkedArray->chunk( i ) );
// Use auto here instead of CType to allow conversion between different types
// Use raw_values() to get the raw data pointer for best performance
const auto* data = chunk->raw_values();
for ( int j = 0; j < chunk->length(); ++j )
{
if ( !chunk->IsNull( j ) )
{
result.push_back( data[j] );
}
else
{
result.push_back( std::numeric_limits<CType>::quiet_NaN() );
}
}
}
return result;
}
QString readFirstRowsOfTable( const QByteArray& contents );
}; // namespace RifArrowTools

View File

@ -65,7 +65,7 @@ std::pair<cvf::ref<RigOsduWellLogData>, QString> RifOsduWellLogReader::readWellL
if ( column->type()->id() == arrow::Type::DOUBLE ) if ( column->type()->id() == arrow::Type::DOUBLE )
{ {
std::vector<double> columnVector = RifArrowTools::convertChunkedArrayToStdVector( column ); std::vector<double> columnVector = RifArrowTools::chunkedArrayToVector<arrow::DoubleArray, double>( column );
logData->setValues( QString::fromStdString( columnName ), columnVector ); logData->setValues( QString::fromStdString( columnName ), columnVector );
} }
} }

View File

@ -143,7 +143,7 @@ std::pair<cvf::ref<RigWellPath>, QString> RifOsduWellPathReader::readWellPathDat
if ( column->type()->id() == arrow::Type::DOUBLE ) if ( column->type()->id() == arrow::Type::DOUBLE )
{ {
std::vector<double> columnVector = RifArrowTools::convertChunkedArrayToStdVector( column ); std::vector<double> columnVector = RifArrowTools::chunkedArrayToVector<arrow::DoubleArray, double>( column );
RiaLogging::debug( QString( "Column name: %1. Size: %2" ).arg( QString::fromStdString( columnName ) ).arg( columnVector.size() ) ); RiaLogging::debug( QString( "Column name: %1. Size: %2" ).arg( QString::fromStdString( columnName ) ).arg( columnVector.size() ) );
readValues[columnName] = columnVector; readValues[columnName] = columnVector;
} }

View File

@ -2,7 +2,10 @@
#include "RiaTestDataDirectory.h" #include "RiaTestDataDirectory.h"
#include "RifArrowTools.h"
#undef signals #undef signals
#include <arrow/array/builder_primitive.h>
#include <arrow/csv/api.h> #include <arrow/csv/api.h>
#include <arrow/io/api.h> #include <arrow/io/api.h>
#include <arrow/scalar.h> #include <arrow/scalar.h>
@ -51,3 +54,59 @@ TEST( RifParquetReaderTest, ReadValidFile )
EXPECT_TRUE( scalar->Equals( arrow::Int64Scalar( i ) ) ); EXPECT_TRUE( scalar->Equals( arrow::Int64Scalar( i ) ) );
} }
} }
TEST( RifParquetReaderTest, ConvertIntChunkedArrays )
{
arrow::Status status;
arrow::Int32Builder int_builder;
status = int_builder.Append( 1 );
status = int_builder.Append( 2 );
status = int_builder.Append( 3 );
std::shared_ptr<arrow::Array> int_array;
status = int_builder.Finish( &int_array );
auto int_chunked_array = std::make_shared<arrow::ChunkedArray>( int_array );
{
auto columnVector = RifArrowTools::chunkedArrayToVector<arrow::FloatArray, double>( int_chunked_array );
EXPECT_EQ( columnVector.size(), 3 );
}
{
auto columnVector = RifArrowTools::chunkedArrayToVector<arrow::Int32Array, int>( int_chunked_array );
EXPECT_EQ( columnVector.size(), 3 );
}
}
TEST( RifParquetReaderTest, ConvertFloatChunkedArrays )
{
arrow::Status status;
// Create an Arrow double array
std::vector<double> values = { 1.0, 2.0, 3.0, 4.0 };
std::shared_ptr<arrow::Array> array;
arrow::DoubleBuilder builder;
status = builder.AppendValues( values );
status = builder.Finish( &array );
// Create a chunked array from the Arrow array
std::shared_ptr<arrow::ChunkedArray> chunkedArray = std::make_shared<arrow::ChunkedArray>( array );
// Call the function under test
auto resultVector = RifArrowTools::chunkedArrayToVector<arrow::DoubleArray, double>( chunkedArray );
// Assert that the returned vector contains the expected values
ASSERT_EQ( resultVector.size(), values.size() );
for ( size_t i = 0; i < values.size(); ++i )
{
EXPECT_DOUBLE_EQ( resultVector[i], values[i] );
}
auto floatVector = RifArrowTools::chunkedArrayToVector<arrow::DoubleArray, float>( chunkedArray );
ASSERT_EQ( floatVector.size(), values.size() );
for ( size_t i = 0; i < values.size(); ++i )
{
EXPECT_DOUBLE_EQ( floatVector[i], values[i] );
}
}