#9307 Python: avoid assigning same port number to multiple grpc sessions

* Python: avoid assigning same port number to multiple grpc sessions
* Add retry count when checking for port number file
* Use grpc to find and use port number
* Add test used to start several instances of resinsight at the same time
Testing up to 50 instances works well

* Python: allow launch_port == 0 to assign port by GRPC in Instance.launch().
Also allow longer wait before failing the port number file reading:
it can take some time to launch when launching lots of instances at
the same time.
This commit is contained in:
Kristian Bendiksen 2022-09-26 14:19:21 +02:00 committed by GitHub
parent 587f700ae4
commit c2b5ab8d2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 165 additions and 36 deletions

View File

@ -33,6 +33,8 @@
#include "cvfProgramOptions.h"
#include "cvfqtUtils.h"
#include <QFile>
#ifndef WIN32
#include <sys/types.h>
#include <unistd.h>
@ -145,6 +147,28 @@ int main( int argc, char* argv[] )
if ( grpcInterface && grpcInterface->initializeGrpcServer( progOpt ) )
{
grpcInterface->launchGrpcServer();
if ( cvf::Option o = progOpt.option( "portnumberfile" ) )
{
if ( o.valueCount() == 1 )
{
int portNumber = grpcInterface->portNumber();
QString fileName = QString::fromStdString( o.value( 0 ).toStdString() );
// Write port number to the file given file.
// Temp file is used to avoid incomplete reads.
QString tempFilePath = fileName + ".tmp";
QFile file( tempFilePath );
if ( file.open( QIODevice::WriteOnly | QIODevice::Text ) )
{
QTextStream out( &file );
out << portNumber << endl;
}
file.close();
QFile::rename( tempFilePath, fileName );
}
}
}
#endif
exitCode = QCoreApplication::instance()->exec();

View File

@ -76,6 +76,11 @@ bool RiaArgumentParser::parseArguments( cvf::ProgramOptions* progOpt )
"[<portnumber>]",
"Launch as a GRPC server. Default port is 50051",
cvf::ProgramOptions::SINGLE_VALUE );
progOpt->registerOption( "portnumberfile",
"[<filename>]",
"Write the port number to this file.",
cvf::ProgramOptions::SINGLE_VALUE );
progOpt->registerOption( "startdir", "<folder>", "Set startup directory.\n", cvf::ProgramOptions::SINGLE_VALUE );
progOpt->registerOption( "summaryplot",

View File

@ -9,6 +9,7 @@ import os
import socket
import logging
import time
import tempfile
import grpc
@ -56,6 +57,22 @@ class Instance:
return False
return True
@staticmethod
def __read_port_number_from_file(file_path):
retry_count = 0
while not os.path.exists(file_path) and retry_count < 30:
time.sleep(1)
retry_count = retry_count + 1
print("Portnumber file retry count : ", retry_count)
if os.path.isfile(file_path):
with open(file_path) as f:
value = f.readline()
return int(value)
else:
return -1
@staticmethod
def launch(
resinsight_executable="",
@ -73,18 +90,19 @@ class Instance:
environment variable.
console (bool): If True, launch as console application, without GUI.
launch_port(int): If -1 will use the default port 50051 or RESINSIGHT_GRPC_PORT
if anything else, ResInsight will try to launch with this port
if anything else, ResInsight will try to launch with this port.
If 0 a random port will be used.
command_line_parameters(list): Additional parameters as string entries in the list.
Returns:
Instance: an instance object if it worked. None if not.
"""
port = 50051
requested_port = 50051
port_env = os.environ.get("RESINSIGHT_GRPC_PORT")
if port_env:
port = int(port_env)
requested_port = int(port_env)
if launch_port != -1:
port = launch_port
requested_port = launch_port
if not resinsight_executable:
resinsight_executable = os.environ.get("RESINSIGHT_EXECUTABLE")
@ -95,12 +113,6 @@ class Instance:
)
return None
print("Trying port " + str(port))
while Instance.__is_port_in_use(port):
port += 1
print("Trying port " + str(port))
print("Port " + str(port))
print("Trying to launch", resinsight_executable)
if command_line_parameters is None:
@ -108,19 +120,33 @@ class Instance:
elif isinstance(command_line_parameters, str):
command_line_parameters = [str]
parameters = ["ResInsight", "--server", str(port)] + command_line_parameters
if console:
print("Launching as console app")
parameters.append("--console")
with tempfile.TemporaryDirectory() as tmp_dir_path:
port_number_file = tmp_dir_path + "/portnumber.txt"
parameters = [
"ResInsight",
"--server",
requested_port,
"--portnumberfile",
port_number_file,
] + command_line_parameters
if console:
print("Launching as console app")
parameters.append("--console")
# Stringify all parameters
for i in range(0, len(parameters)):
parameters[i] = str(parameters[i])
# Stringify all parameters
for i in range(0, len(parameters)):
parameters[i] = str(parameters[i])
pid = os.spawnv(os.P_NOWAIT, resinsight_executable, parameters)
if pid:
instance = Instance(port=port, launched=True)
return instance
pid = os.spawnv(os.P_NOWAIT, resinsight_executable, parameters)
if pid:
port = Instance.__read_port_number_from_file(port_number_file)
if port == -1:
print("Unable to read port number. Launch failed.")
# Need to kill the process using PID since there is no GRPC connection to use.
os.kill(pid)
else:
instance = Instance(port=port, launched=True)
return instance
return None
@staticmethod
@ -178,10 +204,10 @@ class Instance:
port(int): port number
"""
logging.basicConfig()
location = "localhost:" + str(port)
self.location = "localhost:" + str(port)
self.channel = grpc.insecure_channel(
location, options=[("grpc.enable_http_proxy", False)]
self.location, options=[("grpc.enable_http_proxy", False)]
)
self.launched = launched
self.commands = Commands_pb2_grpc.CommandsStub(self.channel)
@ -189,7 +215,7 @@ class Instance:
# Main version check package
self.app = App_pb2_grpc.AppStub(self.channel)
self._check_connection_and_version(self.channel, launched, location)
self._check_connection_and_version(self.channel, launched, self.location)
# Intercept UNAVAILABLE errors and retry on failures
interceptors = (

View File

@ -0,0 +1,51 @@
import sys
import os
import math
import pytest
import grpc
import tempfile
import time
import multiprocessing
sys.path.insert(1, os.path.join(sys.path[0], "../../"))
import rips
import dataroot
def launch_resinsight(sec=1):
instance = rips.Instance.launch(console=True, launch_port=0)
print(instance.location)
print(f"Sleeping for {sec} second(s): ", instance.location)
time.sleep(sec)
print(f"Done sleeping", instance.location)
instance.exit()
def test_launch_sequential(rips_instance, initialize_test):
instance_list = []
for i in range(4):
rips_instance = rips.Instance.launch(console=True)
instance_list.append(rips_instance)
for instance in instance_list:
print(instance)
instance.exit()
def test_launch_parallell(rips_instance, initialize_test):
process_list = []
instance_count = 10
for i in range(instance_count):
process = multiprocessing.Process(target=launch_resinsight)
process_list.append(process)
for process in process_list:
process.start()
# completing process
for p in process_list:
p.join()

View File

@ -30,18 +30,22 @@ bool RiaGrpcApplicationInterface::initializeGrpcServer( const cvf::ProgramOption
{
if ( !RiaPreferences::current()->enableGrpcServer() ) return false;
int defaultPortNumber = RiaPreferences::current()->defaultGrpcPortNumber();
bool fixedPort = false;
int defaultPortNumber = RiaPreferences::current()->defaultGrpcPortNumber();
bool useGrpcGeneratedPort = false;
if ( cvf::Option o = progOpt.option( "server" ) )
{
if ( o.valueCount() == 1 )
{
defaultPortNumber = o.value( 0 ).toInt( defaultPortNumber );
fixedPort = true;
// If the port number is 0 it will be selected randomly by grpc.
if ( defaultPortNumber == 0 ) useGrpcGeneratedPort = true;
}
}
// Try to find an available port number starting from the default port
int portNumber = defaultPortNumber;
if ( !fixedPort )
if ( !useGrpcGeneratedPort )
{
portNumber = RiaGrpcServer::findAvailablePortNumber( defaultPortNumber );
}
@ -113,3 +117,13 @@ int RiaGrpcApplicationInterface::processRequests()
return 0;
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
int RiaGrpcApplicationInterface::portNumber() const
{
if ( m_grpcServer ) return m_grpcServer->portNumber();
return -1;
}

View File

@ -38,6 +38,7 @@ public:
QProcessEnvironment grpcProcessEnvironment() const;
int processRequests();
int portNumber() const;
protected:
std::unique_ptr<RiaGrpcServer> m_grpcServer;

View File

@ -137,12 +137,14 @@ void RiaGrpcServerImpl::runInThread()
//--------------------------------------------------------------------------------------------------
void RiaGrpcServerImpl::initialize()
{
CAF_ASSERT( m_portNumber > 0 && m_portNumber <= (int)std::numeric_limits<quint16>::max() );
QString serverAddress = QString( "localhost:%1" ).arg( m_portNumber );
CAF_ASSERT( m_portNumber >= 0 && m_portNumber <= (int)std::numeric_limits<quint16>::max() );
ServerBuilder builder;
builder.AddListeningPort( serverAddress.toStdString(), grpc::InsecureServerCredentials() );
// When setting port number to 0, grpc will find and use a valid port number
// The port number is assigned to the m_portNumber variable after calling builder.BuildAndStart()
QString requestedServerAddress = QString( "localhost:%1" ).arg( m_portNumber );
builder.AddListeningPort( requestedServerAddress.toStdString(), grpc::InsecureServerCredentials(), &m_portNumber );
for ( auto key : RiaGrpcServiceFactory::instance()->allKeys() )
{
@ -154,7 +156,14 @@ void RiaGrpcServerImpl::initialize()
m_completionQueue = builder.AddCompletionQueue();
m_server = builder.BuildAndStart();
CVF_ASSERT( m_server );
QString serverAddress = QString( "localhost:%1" ).arg( m_portNumber );
if ( !m_server )
{
RiaLogging::error( QString( "Failed to start server on %1" ).arg( serverAddress ) );
return;
}
RiaLogging::info( QString( "Server listening on %1" ).arg( serverAddress ) );
// Spawn new CallData instances to serve new clients.
@ -383,7 +392,7 @@ int RiaGrpcServer::findAvailablePortNumber( int defaultPortNumber )
{
int startPort = 50051;
if ( defaultPortNumber > 0 && defaultPortNumber < (int)std::numeric_limits<quint16>::max() )
if ( defaultPortNumber >= 0 && defaultPortNumber < (int)std::numeric_limits<quint16>::max() )
{
startPort = defaultPortNumber;
}
@ -391,8 +400,7 @@ int RiaGrpcServer::findAvailablePortNumber( int defaultPortNumber )
int endPort = std::min( startPort + 100, (int)std::numeric_limits<quint16>::max() );
QTcpServer serverTest;
quint16 port = static_cast<quint16>( startPort );
for ( ; port <= static_cast<quint16>( endPort ); ++port )
for ( quint16 port = static_cast<quint16>( startPort ); port <= static_cast<quint16>( endPort ); ++port )
{
if ( serverTest.listen( QHostAddress::LocalHost, port ) )
{