From 122b724a836deb6849a83a0b4d13383d79d7510a Mon Sep 17 00:00:00 2001 From: Kristian Bendiksen Date: Wed, 10 Jul 2024 10:27:46 +0200 Subject: [PATCH] Osdu: Split blocking and non-blocking api, and fix race condition on parquet data. --- .../OsduImportCommands/RiaOsduConnector.cpp | 75 ++++++++++++++----- .../OsduImportCommands/RiaOsduConnector.h | 20 +++-- .../WellPath/RimWellPathCollection.cpp | 4 +- 3 files changed, 70 insertions(+), 29 deletions(-) diff --git a/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.cpp b/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.cpp index e8aa60997b..efec2e0a02 100644 --- a/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.cpp +++ b/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.cpp @@ -21,6 +21,8 @@ #include +#include "cafAssert.h" + //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- @@ -87,6 +89,11 @@ RiaOsduConnector::RiaOsduConnector( QObject* parent, SIGNAL( authorizationCallbackReceived( const QVariantMap& ) ), this, SLOT( authorizationCallbackReceived( const QVariantMap& ) ) ); + + connect( this, + SIGNAL( parquetDownloadFinished( const QByteArray&, const QString&, const QString& ) ), + this, + SLOT( parquetDownloadComplete( const QByteArray&, const QString&, const QString& ) ) ); } //-------------------------------------------------------------------------------------------------- @@ -155,6 +162,7 @@ void RiaOsduConnector::clearCachedData() m_wellboreTrajectories.clear(); m_wellLogs.clear(); m_parquetData.clear(); + m_parquetErrors.clear(); } //-------------------------------------------------------------------------------------------------- @@ -779,66 +787,92 @@ std::vector RiaOsduConnector::wellboreTrajectories( cons //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -std::pair RiaOsduConnector::requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId ) +void RiaOsduConnector::requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId ) { QString url = constructWellboreTrajectoriesDownloadUrl( m_server, wellboreTrajectoryId ); RiaLogging::debug( "Wellbore trajectory URL: " + url ); - return requestParquetDataByUrl( url ); + requestParquetDataByUrl( url, wellboreTrajectoryId ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -std::pair RiaOsduConnector::requestWellLogParquetDataById( const QString& wellLogId ) +void RiaOsduConnector::requestWellLogParquetDataById( const QString& wellLogId ) { QString url = constructWellLogDownloadUrl( m_server, wellLogId ); RiaLogging::debug( "Well log URL: " + url ); - return requestParquetDataByUrl( url ); + requestParquetDataByUrl( url, wellLogId ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -std::pair RiaOsduConnector::requestParquetDataByUrl( const QString& url ) +void RiaOsduConnector::requestParquetDataByUrl( const QString& url, const QString& id ) +{ + QString token = m_token; + requestParquetData( url, m_dataPartitionId, token, id ); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +std::pair RiaOsduConnector::requestWellboreTrajectoryParquetDataByIdBlocking( const QString& wellboreTrajectoryId ) +{ + QString url = constructWellboreTrajectoriesDownloadUrl( m_server, wellboreTrajectoryId ); + RiaLogging::debug( "Wellbore trajectory URL: " + url ); + return requestParquetDataByUrlBlocking( url, wellboreTrajectoryId ); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +std::pair RiaOsduConnector::requestWellLogParquetDataByIdBlocking( const QString& wellLogId ) +{ + QString url = constructWellLogDownloadUrl( m_server, wellLogId ); + RiaLogging::debug( "Well log URL: " + url ); + return requestParquetDataByUrlBlocking( url, wellLogId ); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +std::pair RiaOsduConnector::requestParquetDataByUrlBlocking( const QString& url, const QString& id ) { QString token = requestTokenBlocking(); QEventLoop loop; - connect( this, - SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ), - this, - SLOT( parquetDownloadComplete( const QByteArray&, const QString& ) ) ); - connect( this, SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ), &loop, SLOT( quit() ) ); - - requestParquetData( url, m_dataPartitionId, token ); + connect( this, SIGNAL( parquetDownloadFinished( const QByteArray&, const QString&, const QString& ) ), &loop, SLOT( quit() ) ); + requestParquetData( url, m_dataPartitionId, token, id ); loop.exec(); - return { m_parquetData, "" }; + QMutexLocker lock( &m_mutex ); + return { m_parquetData[id], m_parquetErrors[id] }; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -void RiaOsduConnector::requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token ) +void RiaOsduConnector::requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token, const QString& id ) { RiaLogging::info( "Requesting download of parquet from: " + url ); auto reply = makeDownloadRequest( url, dataPartitionId, token, RiaDefines::contentTypeParquet() ); + connect( reply, &QNetworkReply::finished, - [this, reply, url]() + [this, reply, url, id]() { if ( reply->error() == QNetworkReply::NoError ) { QByteArray contents = reply->readAll(); RiaLogging::info( QString( "Download succeeded: %1 bytes." ).arg( contents.length() ) ); - emit parquetDownloadFinished( contents, "" ); + emit parquetDownloadFinished( contents, "", id ); } else { QString errorMessage = "Request failed: " + url + " failed." + reply->errorString(); RiaLogging::error( errorMessage ); - emit parquetDownloadFinished( QByteArray(), errorMessage ); + emit parquetDownloadFinished( QByteArray(), errorMessage, id ); } } ); } @@ -846,9 +880,12 @@ void RiaOsduConnector::requestParquetData( const QString& url, const QString& da //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -void RiaOsduConnector::parquetDownloadComplete( const QByteArray& contents, const QString& url ) +void RiaOsduConnector::parquetDownloadComplete( const QByteArray& contents, const QString& errorMessage, const QString& id ) { - m_parquetData = contents; + CAF_ASSERT( !id.isEmpty() ); + QMutexLocker lock( &m_mutex ); + m_parquetData[id] = contents; + m_parquetErrors[id] = errorMessage; } //-------------------------------------------------------------------------------------------------- diff --git a/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.h b/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.h index 8198e24b6f..ce63f6d5fc 100644 --- a/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.h +++ b/ApplicationLibCode/Commands/OsduImportCommands/RiaOsduConnector.h @@ -67,8 +67,11 @@ public: void requestWellLogsByWellboreId( const QString& wellboreId ); std::vector requestWellLogsByWellboreIdBlocking( const QString& wellboreId ); - std::pair requestWellLogParquetDataById( const QString& wellLogId ); - std::pair requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId ); + void requestWellLogParquetDataById( const QString& wellLogId ); + void requestWellboreTrajectoryParquetDataById( const QString& wellboreTrajectoryId ); + + std::pair requestWellLogParquetDataByIdBlocking( const QString& wellLogId ); + std::pair requestWellboreTrajectoryParquetDataByIdBlocking( const QString& wellboreTrajectoryId ); QString wellIdForWellboreId( const QString& wellboreId ) const; @@ -92,10 +95,10 @@ public slots: void parseWellLogs( QNetworkReply* reply, const QString& wellboreId ); void accessGranted(); - void parquetDownloadComplete( const QByteArray&, const QString& url ); + void parquetDownloadComplete( const QByteArray&, const QString& url, const QString& id ); signals: - void parquetDownloadFinished( const QByteArray& contents, const QString& url ); + void parquetDownloadFinished( const QByteArray& contents, const QString& url, const QString& id ); void fieldsFinished(); void wellsFinished(); void wellboresFinished( const QString& wellId ); @@ -106,10 +109,9 @@ signals: private slots: void errorReceived( const QString& error, const QString& errorDescription, const QUrl& uri ); void authorizationCallbackReceived( const QVariantMap& data ); + void requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token, const QString& id ); private: - void requestParquetData( const QString& url, const QString& dataPartitionId, const QString& token ); - void addStandardHeader( QNetworkRequest& networkRequest, const QString& token, const QString& dataPartitionId, const QString& contentType ); QNetworkReply* makeSearchRequest( const std::map& parameters, @@ -136,7 +138,8 @@ private: static QString constructWellLogDownloadUrl( const QString& server, const QString& wellLogId ); static QString constructWellboreTrajectoriesDownloadUrl( const QString& server, const QString& wellboreTrajectoryId ); - std::pair requestParquetDataByUrl( const QString& url ); + std::pair requestParquetDataByUrlBlocking( const QString& url, const QString& id ); + void requestParquetDataByUrl( const QString& url, const QString& id ); QString requestTokenBlocking(); @@ -156,5 +159,6 @@ private: std::map> m_wellbores; std::map> m_wellboreTrajectories; std::map> m_wellLogs; - QByteArray m_parquetData; + std::map m_parquetData; + std::map m_parquetErrors; }; diff --git a/ApplicationLibCode/ProjectDataModel/WellPath/RimWellPathCollection.cpp b/ApplicationLibCode/ProjectDataModel/WellPath/RimWellPathCollection.cpp index 9d52f24cda..5572980258 100644 --- a/ApplicationLibCode/ProjectDataModel/WellPath/RimWellPathCollection.cpp +++ b/ApplicationLibCode/ProjectDataModel/WellPath/RimWellPathCollection.cpp @@ -1056,7 +1056,7 @@ std::pair, QString> RimWellPathCollection::loadWellPathGeo const QString& wellboreTrajectoryId, double datumElevation ) { - auto [fileContents, errorMessage] = osduConnector->requestWellboreTrajectoryParquetDataById( wellboreTrajectoryId ); + auto [fileContents, errorMessage] = osduConnector->requestWellboreTrajectoryParquetDataByIdBlocking( wellboreTrajectoryId ); if ( !errorMessage.isEmpty() ) { return { nullptr, errorMessage }; @@ -1071,7 +1071,7 @@ std::pair, QString> RimWellPathCollection::loadWellPathGeo std::pair, QString> RimWellPathCollection::loadWellLogFromOsdu( RiaOsduConnector* osduConnector, const QString& wellLogId ) { - auto [fileContents, errorMessage] = osduConnector->requestWellLogParquetDataById( wellLogId ); + auto [fileContents, errorMessage] = osduConnector->requestWellLogParquetDataByIdBlocking( wellLogId ); if ( !errorMessage.isEmpty() ) { return { nullptr, errorMessage };