Sumo: load parameter sensitivities (i.e. "parameters.txt") for ensembles.

This commit is contained in:
Kristian Bendiksen
2024-08-26 08:37:26 +02:00
parent 41256e0b75
commit 44076f4911
5 changed files with 272 additions and 40 deletions

View File

@@ -356,6 +356,99 @@ void RiaSumoConnector::requestRealizationIdsForEnsembleBlocking( const SumoCaseI
wrapAndCallNetworkRequest( requestCallable, signalMethod );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
QByteArray RiaSumoConnector::requestParametersParquetDataBlocking( const SumoCaseId& caseId, const QString& ensembleName )
{
requestParametersBlobIdForEnsembleBlocking( caseId, ensembleName );
if ( m_blobUrl.empty() ) return {};
auto blobId = m_blobUrl.back();
QEventLoop eventLoop;
QTimer timer;
timer.setSingleShot( true );
QObject::connect( &timer, SIGNAL( timeout() ), &eventLoop, SLOT( quit() ) );
QObject::connect( this, SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ), &eventLoop, SLOT( quit() ) );
requestBlobDownload( blobId );
timer.start( RiaSumoDefines::requestTimeoutMillis() );
eventLoop.exec( QEventLoop::ProcessEventsFlag::ExcludeUserInputEvents );
for ( const auto& blobData : m_redirectInfo )
{
if ( blobData.objectId == blobId )
{
return blobData.contents;
}
}
return {};
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaSumoConnector::requestParametersBlobIdForEnsembleBlocking( const SumoCaseId& caseId, const QString& ensembleName )
{
auto requestCallable = [this, caseId, ensembleName] { requestParametersBlobIdForEnsemble( caseId, ensembleName ); };
QMetaMethod signalMethod = QMetaMethod::fromSignal( &RiaSumoConnector::blobIdFinished );
wrapAndCallNetworkRequest( requestCallable, signalMethod );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaSumoConnector::requestParametersBlobIdForEnsemble( const SumoCaseId& caseId, const QString& ensembleName )
{
requestTokenBlocking();
QString payloadTemplate = R"(
{
"track_total_hits": true,
"query": { "bool": {
"must": [
{"term": {"class": "table"}},
{"term": {"_sumo.parent_object.keyword": "%1"}},
{"term": {"data.tagname.keyword": "all"}},
{"term": {"fmu.iteration.name.keyword": "%2"}},
{"term": {"fmu.aggregation.operation.keyword": "collection"}}
]}
},
"fields": [
"data.name",
"_sumo.blob_name"
],
"size": 1
}
)";
QNetworkRequest m_networkRequest;
m_networkRequest.setUrl( QUrl( m_server + "/api/v1/search" ) );
addStandardHeader( m_networkRequest, token(), RiaCloudDefines::contentTypeJson() );
auto payload = payloadTemplate.arg( caseId.get() ).arg( ensembleName );
auto reply = m_networkAccessManager->post( m_networkRequest, payload.toUtf8() );
connect( reply,
&QNetworkReply::finished,
[this, reply, ensembleName, caseId, payload]()
{
if ( reply->error() == QNetworkReply::NoError )
{
parseBlobIds( reply, caseId, ensembleName, "" );
}
else
{
RiaLogging::error( QString( "Request parameters failed: : '%s'" ).arg( reply->errorString() ) );
}
} );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------

View File

@@ -93,6 +93,10 @@ public:
void requestRealizationIdsForEnsemble( const SumoCaseId& caseId, const QString& ensembleName );
void requestRealizationIdsForEnsembleBlocking( const SumoCaseId& caseId, const QString& ensembleName );
void requestParametersBlobIdForEnsemble( const SumoCaseId& caseId, const QString& ensembleName );
void requestParametersBlobIdForEnsembleBlocking( const SumoCaseId& caseId, const QString& ensembleName );
QByteArray requestParametersParquetDataBlocking( const SumoCaseId& caseId, const QString& ensembleName );
void requestBlobIdForEnsemble( const SumoCaseId& caseId, const QString& ensembleName, const QString& vectorName );
void requestBlobIdForEnsembleBlocking( const SumoCaseId& caseId, const QString& ensembleName, const QString& vectorName );

View File

@@ -27,6 +27,7 @@
#include "RifEclipseSummaryTools.h"
#include "RifOpmCommonSummary.h"
#include "RifSummaryCaseRestartSelector.h"
#include "Sumo/RimSummaryCaseSumo.h"
#ifdef USE_HDF5
#include "RifHdf5SummaryExporter.h"
@@ -80,11 +81,14 @@ void addCaseRealizationParametersIfFound( RimSummaryCase& sumCase, const QString
parameters = std::shared_ptr<RigCaseRealizationParameters>( new RigCaseRealizationParameters() );
}
int realizationNumber = RifCaseRealizationParametersFileLocator::realizationNumber( modelFolderOrFile );
parameters->setRealizationNumber( realizationNumber );
parameters->addParameter( RiaDefines::summaryRealizationNumber(), realizationNumber );
if ( dynamic_cast<RimSummaryCaseSumo*>( &sumCase ) == nullptr )
{
int realizationNumber = RifCaseRealizationParametersFileLocator::realizationNumber( modelFolderOrFile );
parameters->setRealizationNumber( realizationNumber );
parameters->addParameter( RiaDefines::summaryRealizationNumber(), realizationNumber );
sumCase.setCaseRealizationParameters( parameters );
sumCase.setCaseRealizationParameters( parameters );
}
}
//--------------------------------------------------------------------------------------------------

View File

@@ -119,36 +119,25 @@ void RimSummaryEnsembleSumo::loadSummaryData( const RifEclipseSummaryAddress& re
if ( m_parquetTable.find( key ) == m_parquetTable.end() )
{
auto contents = loadParquetData( key );
RiaLogging::debug( QString( "Load Summary Data. Contents size: %1" ).arg( contents.size() ) );
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input = std::make_shared<RifByteArrayArrowRandomAccessFile>( contents );
std::shared_ptr<arrow::Table> table;
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
if ( auto openResult = parquet::arrow::OpenFile( input, pool, &arrow_reader ); openResult.ok() )
{
if ( auto readResult = arrow_reader->ReadTable( &table ); readResult.ok() )
{
RiaLogging::info( QString( "Parquet: Read table successfully for %1" ).arg( QString::fromStdString( resultAddress.uiText() ) ) );
}
else
{
RiaLogging::warning( QString( "Parquet: Error detected during parsing of table. Message: %1" )
.arg( QString::fromStdString( readResult.ToString() ) ) );
}
}
else
{
RiaLogging::warning(
QString( "Parquet: Not able to open data stream. Message: %1" ).arg( QString::fromStdString( openResult.ToString() ) ) );
auto contents = loadParquetData( key );
RiaLogging::debug( QString( "Load Summary Data. Contents size: %1" ).arg( contents.size() ) );
std::shared_ptr<arrow::Table> table = readParquetTable( contents, QString::fromStdString( resultAddress.uiText() ) );
m_parquetTable[key] = table;
distributeDataToRealizations( resultAddress, table );
}
m_parquetTable[key] = table;
{
auto contents = m_sumoConnector->requestParametersParquetDataBlocking( sumoCaseId, sumoEnsembleName );
RiaLogging::debug( QString( "Load ensemble parameter sensitivities. Contents size: %1" ).arg( contents.size() ) );
distributeDataToRealizations( resultAddress, table );
std::shared_ptr<arrow::Table> table =
readParquetTable( contents, QString::fromStdString( resultAddress.uiText() + " parameter sensitivities" ) );
distributeParametersDataToRealizations( table );
}
}
}
@@ -162,6 +151,38 @@ QByteArray RimSummaryEnsembleSumo::loadParquetData( const ParquetKey& parquetKey
return m_sumoConnector->requestParquetDataBlocking( SumoCaseId( parquetKey.caseId ), parquetKey.ensembleId, parquetKey.vectorName );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::shared_ptr<arrow::Table> RimSummaryEnsembleSumo::readParquetTable( const QByteArray& contents, const QString& messageTag )
{
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input = std::make_shared<RifByteArrayArrowRandomAccessFile>( contents );
std::shared_ptr<arrow::Table> table;
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
if ( auto openResult = parquet::arrow::OpenFile( input, pool, &arrow_reader ); openResult.ok() )
{
if ( auto readResult = arrow_reader->ReadTable( &table ); readResult.ok() )
{
RiaLogging::info( QString( "Parquet: Read table successfully for %1" ).arg( messageTag ) );
}
else
{
RiaLogging::warning(
QString( "Parquet: Error detected during parsing of table. Message: %1" ).arg( QString::fromStdString( readResult.ToString() ) ) );
}
}
else
{
RiaLogging::warning(
QString( "Parquet: Not able to open data stream. Message: %1" ).arg( QString::fromStdString( openResult.ToString() ) ) );
}
return table;
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
@@ -287,6 +308,122 @@ void RimSummaryEnsembleSumo::distributeDataToRealizations( const RifEclipseSumma
}
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RimSummaryEnsembleSumo::distributeParametersDataToRealizations( std::shared_ptr<arrow::Table> table )
{
if ( !table )
{
RiaLogging::warning( "Failed to load table" );
return;
}
{
// print header information
QString txt = "Column Names: ";
for ( std::string columnName : table->ColumnNames() )
{
txt += QString::fromStdString( columnName ) + " (" +
QString::fromStdString( table->GetColumnByName( columnName )->type()->ToString() + ") " );
}
RiaLogging::debug( txt );
}
std::vector<int64_t> realizations;
{
const std::string columnName = "REAL";
std::shared_ptr<arrow::ChunkedArray> column = table->GetColumnByName( columnName );
if ( column && column->type()->id() == arrow::Type::INT64 )
{
realizations = RifArrowTools::chunkedArrayToVector<arrow::Int64Array, int64_t>( column );
}
else
{
RiaLogging::warning( "Failed to find realization column for parameter sensitivities." );
return;
}
}
std::map<std::string, std::vector<double>> doubleValuesForRealizations;
std::map<std::string, std::vector<int64_t>> intValuesForRealizations;
for ( std::string columnName : table->ColumnNames() )
{
if ( columnName != "REAL" )
{
std::shared_ptr<arrow::ChunkedArray> column = table->GetColumnByName( columnName );
if ( column )
{
if ( column->type()->id() == arrow::Type::DOUBLE )
{
std::vector<double> values = RifArrowTools::chunkedArrayToVector<arrow::DoubleArray, double>( column );
doubleValuesForRealizations[columnName] = values;
}
else if ( column->type()->id() == arrow::Type::INT64 )
{
std::vector<int64_t> values = RifArrowTools::chunkedArrayToVector<arrow::Int64Array, int64_t>( column );
intValuesForRealizations[columnName] = values;
}
}
else
{
RiaLogging::warning( QString( "Failed to find values column for %1" ).arg( QString::fromStdString( columnName ) ) );
return;
}
}
}
// find unique realizations
std::set<int16_t> uniqueRealizations;
for ( auto realizationNumber : realizations )
{
uniqueRealizations.insert( realizationNumber );
}
auto findSummaryCase = [this]( int16_t realizationNumber ) -> RimSummaryCaseSumo*
{
for ( auto sumCase : allSummaryCases() )
{
auto sumCaseSumo = dynamic_cast<RimSummaryCaseSumo*>( sumCase );
if ( sumCaseSumo->realizationNumber() == realizationNumber ) return sumCaseSumo;
}
return nullptr;
};
for ( auto realizationNumber : uniqueRealizations )
{
if ( auto summaryCase = findSummaryCase( realizationNumber ) )
{
auto parameters = std::make_shared<RigCaseRealizationParameters>();
parameters->setRealizationNumber( realizationNumber );
parameters->addParameter( RiaDefines::summaryRealizationNumber(), realizationNumber );
for ( std::string columnName : table->ColumnNames() )
{
if ( columnName != "REAL" )
{
if ( auto it = doubleValuesForRealizations.find( columnName ); it != doubleValuesForRealizations.end() )
{
double value = it->second[realizationNumber];
parameters->addParameter( QString::fromStdString( columnName ), value );
}
else if ( auto it = intValuesForRealizations.find( columnName ); it != intValuesForRealizations.end() )
{
double value = it->second[realizationNumber];
parameters->addParameter( QString::fromStdString( columnName ), value );
}
}
}
summaryCase->setCaseRealizationParameters( parameters );
}
}
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
@@ -399,16 +536,6 @@ void RimSummaryEnsembleSumo::onLoadDataAndUpdate()
realization->setShowVectorItemsInProjectTree( m_cases.empty() );
// Create realization parameters, required to make derived ensemble cases work
// See RimDerivedEnsembleCaseCollection::createDerivedEnsembleCases()
auto parameters = std::shared_ptr<RigCaseRealizationParameters>( new RigCaseRealizationParameters() );
int realizationNumber = realId.toInt();
parameters->setRealizationNumber( realizationNumber );
parameters->addParameter( RiaDefines::summaryRealizationNumber(), realizationNumber );
realization->setCaseRealizationParameters( parameters );
m_cases.push_back( realization );
}
}

View File

@@ -82,6 +82,10 @@ private:
void distributeDataToRealizations( const RifEclipseSummaryAddress& resultAddress, std::shared_ptr<arrow::Table> table );
void buildMetaData();
void distributeParametersDataToRealizations( std::shared_ptr<arrow::Table> table );
static std::shared_ptr<arrow::Table> readParquetTable( const QByteArray& contents, const QString& messageTag );
private:
caf::PdmPtrField<RimSummarySumoDataSource*> m_sumoDataSource;