Osdu: Split blocking and non-blocking api, and fix race condition on parquet data.

This commit is contained in:
Kristian Bendiksen
2024-07-10 10:27:46 +02:00
parent 56e139b9d6
commit 122b724a83
3 changed files with 70 additions and 29 deletions

View File

@@ -21,6 +21,8 @@
#include <limits>
#include "cafAssert.h"
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
@@ -87,6 +89,11 @@ RiaOsduConnector::RiaOsduConnector( QObject* parent,
SIGNAL( authorizationCallbackReceived( const QVariantMap& ) ),
this,
SLOT( authorizationCallbackReceived( const QVariantMap& ) ) );
connect( this,
SIGNAL( parquetDownloadFinished( const QByteArray&, const QString&, const QString& ) ),
this,
SLOT( parquetDownloadComplete( const QByteArray&, const QString&, const QString& ) ) );
}
//--------------------------------------------------------------------------------------------------
@@ -155,6 +162,7 @@ void RiaOsduConnector::clearCachedData()
m_wellboreTrajectories.clear();
m_wellLogs.clear();
m_parquetData.clear();
m_parquetErrors.clear();
}
//--------------------------------------------------------------------------------------------------
@@ -779,66 +787,92 @@ std::vector<OsduWellboreTrajectory> RiaOsduConnector::wellboreTrajectories( cons
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::pair<QByteArray, QString> RiaOsduConnector::requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId )
void RiaOsduConnector::requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId )
{
QString url = constructWellboreTrajectoriesDownloadUrl( m_server, wellboreTrajectoryId );
RiaLogging::debug( "Wellbore trajectory URL: " + url );
return requestParquetDataByUrl( url );
requestParquetDataByUrl( url, wellboreTrajectoryId );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::pair<QByteArray, QString> RiaOsduConnector::requestWellLogParquetDataById( const QString& wellLogId )
void RiaOsduConnector::requestWellLogParquetDataById( const QString& wellLogId )
{
QString url = constructWellLogDownloadUrl( m_server, wellLogId );
RiaLogging::debug( "Well log URL: " + url );
return requestParquetDataByUrl( url );
requestParquetDataByUrl( url, wellLogId );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::pair<QByteArray, QString> RiaOsduConnector::requestParquetDataByUrl( const QString& url )
void RiaOsduConnector::requestParquetDataByUrl( const QString& url, const QString& id )
{
QString token = m_token;
requestParquetData( url, m_dataPartitionId, token, id );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::pair<QByteArray, QString> RiaOsduConnector::requestWellboreTrajectoryParquetDataByIdBlocking( const QString& wellboreTrajectoryId )
{
QString url = constructWellboreTrajectoriesDownloadUrl( m_server, wellboreTrajectoryId );
RiaLogging::debug( "Wellbore trajectory URL: " + url );
return requestParquetDataByUrlBlocking( url, wellboreTrajectoryId );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::pair<QByteArray, QString> RiaOsduConnector::requestWellLogParquetDataByIdBlocking( const QString& wellLogId )
{
QString url = constructWellLogDownloadUrl( m_server, wellLogId );
RiaLogging::debug( "Well log URL: " + url );
return requestParquetDataByUrlBlocking( url, wellLogId );
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
std::pair<QByteArray, QString> RiaOsduConnector::requestParquetDataByUrlBlocking( const QString& url, const QString& id )
{
QString token = requestTokenBlocking();
QEventLoop loop;
connect( this,
SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ),
this,
SLOT( parquetDownloadComplete( const QByteArray&, const QString& ) ) );
connect( this, SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ), &loop, SLOT( quit() ) );
requestParquetData( url, m_dataPartitionId, token );
connect( this, SIGNAL( parquetDownloadFinished( const QByteArray&, const QString&, const QString& ) ), &loop, SLOT( quit() ) );
requestParquetData( url, m_dataPartitionId, token, id );
loop.exec();
return { m_parquetData, "" };
QMutexLocker lock( &m_mutex );
return { m_parquetData[id], m_parquetErrors[id] };
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaOsduConnector::requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token )
void RiaOsduConnector::requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token, const QString& id )
{
RiaLogging::info( "Requesting download of parquet from: " + url );
auto reply = makeDownloadRequest( url, dataPartitionId, token, RiaDefines::contentTypeParquet() );
connect( reply,
&QNetworkReply::finished,
[this, reply, url]()
[this, reply, url, id]()
{
if ( reply->error() == QNetworkReply::NoError )
{
QByteArray contents = reply->readAll();
RiaLogging::info( QString( "Download succeeded: %1 bytes." ).arg( contents.length() ) );
emit parquetDownloadFinished( contents, "" );
emit parquetDownloadFinished( contents, "", id );
}
else
{
QString errorMessage = "Request failed: " + url + " failed." + reply->errorString();
RiaLogging::error( errorMessage );
emit parquetDownloadFinished( QByteArray(), errorMessage );
emit parquetDownloadFinished( QByteArray(), errorMessage, id );
}
} );
}
@@ -846,9 +880,12 @@ void RiaOsduConnector::requestParquetData( const QString& url, const QString& da
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaOsduConnector::parquetDownloadComplete( const QByteArray& contents, const QString& url )
void RiaOsduConnector::parquetDownloadComplete( const QByteArray& contents, const QString& errorMessage, const QString& id )
{
m_parquetData = contents;
CAF_ASSERT( !id.isEmpty() );
QMutexLocker lock( &m_mutex );
m_parquetData[id] = contents;
m_parquetErrors[id] = errorMessage;
}
//--------------------------------------------------------------------------------------------------

View File

@@ -67,8 +67,11 @@ public:
void requestWellLogsByWellboreId( const QString& wellboreId );
std::vector<OsduWellLog> requestWellLogsByWellboreIdBlocking( const QString& wellboreId );
std::pair<QByteArray, QString> requestWellLogParquetDataById( const QString& wellLogId );
std::pair<QByteArray, QString> requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId );
void requestWellLogParquetDataById( const QString& wellLogId );
void requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId );
std::pair<QByteArray, QString> requestWellLogParquetDataByIdBlocking( const QString& wellLogId );
std::pair<QByteArray, QString> requestWellboreTrajectoryParquetDataByIdBlocking( const QString& wellboreTrajectoryId );
QString wellIdForWellboreId( const QString& wellboreId ) const;
@@ -92,10 +95,10 @@ public slots:
void parseWellLogs( QNetworkReply* reply, const QString& wellboreId );
void accessGranted();
void parquetDownloadComplete( const QByteArray&, const QString& url );
void parquetDownloadComplete( const QByteArray&, const QString& url, const QString& id );
signals:
void parquetDownloadFinished( const QByteArray& contents, const QString& url );
void parquetDownloadFinished( const QByteArray& contents, const QString& url, const QString& id );
void fieldsFinished();
void wellsFinished();
void wellboresFinished( const QString& wellId );
@@ -106,10 +109,9 @@ signals:
private slots:
void errorReceived( const QString& error, const QString& errorDescription, const QUrl& uri );
void authorizationCallbackReceived( const QVariantMap& data );
void requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token, const QString& id );
private:
void requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token );
void addStandardHeader( QNetworkRequest& networkRequest, const QString& token, const QString& dataPartitionId, const QString& contentType );
QNetworkReply* makeSearchRequest( const std::map<QString, QString>& parameters,
@@ -136,7 +138,8 @@ private:
static QString constructWellLogDownloadUrl( const QString& server, const QString& wellLogId );
static QString constructWellboreTrajectoriesDownloadUrl( const QString& server, const QString& wellboreTrajectoryId );
std::pair<QByteArray, QString> requestParquetDataByUrl( const QString& url );
std::pair<QByteArray, QString> requestParquetDataByUrlBlocking( const QString& url, const QString& id );
void requestParquetDataByUrl( const QString& url, const QString& id );
QString requestTokenBlocking();
@@ -156,5 +159,6 @@ private:
std::map<QString, std::vector<OsduWellbore>> m_wellbores;
std::map<QString, std::vector<OsduWellboreTrajectory>> m_wellboreTrajectories;
std::map<QString, std::vector<OsduWellLog>> m_wellLogs;
QByteArray m_parquetData;
std::map<QString, QByteArray> m_parquetData;
std::map<QString, QString> m_parquetErrors;
};

View File

@@ -1056,7 +1056,7 @@ std::pair<cvf::ref<RigWellPath>, QString> RimWellPathCollection::loadWellPathGeo
const QString& wellboreTrajectoryId,
double datumElevation )
{
auto [fileContents, errorMessage] = osduConnector->requestWellboreTrajectoryParquetDataById( wellboreTrajectoryId );
auto [fileContents, errorMessage] = osduConnector->requestWellboreTrajectoryParquetDataByIdBlocking( wellboreTrajectoryId );
if ( !errorMessage.isEmpty() )
{
return { nullptr, errorMessage };
@@ -1071,7 +1071,7 @@ std::pair<cvf::ref<RigWellPath>, QString> RimWellPathCollection::loadWellPathGeo
std::pair<cvf::ref<RigOsduWellLogData>, QString> RimWellPathCollection::loadWellLogFromOsdu( RiaOsduConnector* osduConnector,
const QString& wellLogId )
{
auto [fileContents, errorMessage] = osduConnector->requestWellLogParquetDataById( wellLogId );
auto [fileContents, errorMessage] = osduConnector->requestWellLogParquetDataByIdBlocking( wellLogId );
if ( !errorMessage.isEmpty() )
{
return { nullptr, errorMessage };