diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py index 354361f6c..9e2bddf27 100644 --- a/ipapython/ipautil.py +++ b/ipapython/ipautil.py @@ -1037,6 +1037,55 @@ def host_port_open(host, port, socket_type=socket.SOCK_STREAM, return port_open +def check_port_bindable(port, socket_type=socket.SOCK_STREAM): + """Check if a port is free and not bound by any other application + + :param port: port number + :param socket_type: type (SOCK_STREAM for TCP, SOCK_DGRAM for UDP) + + Returns True if the port is free, False otherwise + """ + if socket_type == socket.SOCK_STREAM: + proto = 'TCP' + elif socket_type == socket.SOCK_DGRAM: + proto = 'UDP' + else: + raise ValueError(socket_type) + + # Detect dual stack or IPv4 single stack + try: + s = socket.socket(socket.AF_INET6, socket_type) + anyaddr = '::' + logger.debug( + "check_port_bindable: Checking IPv4/IPv6 dual stack and %s", + proto + ) + except socket.error: + s = socket.socket(socket.AF_INET, socket_type) + anyaddr = '' + logger.debug("check_port_bindable: Checking IPv4 only and %s", proto) + + # Attempt to bind + try: + if socket_type == socket.SOCK_STREAM: + # reuse TCP sockets in TIME_WAIT state + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + s.bind((anyaddr, port)) + except socket.error as e: + logger.debug( + "check_port_bindable: failed to bind to port %i/%s: %s", + port, proto, e + ) + return False + else: + logger.debug( + "check_port_bindable: bind success: %i/%s", port, proto + ) + return True + finally: + s.close() + + def reverse_record_exists(ip_address): """ Checks if IP address have some reverse record somewhere. diff --git a/ipaserver/install/ca.py b/ipaserver/install/ca.py index bef0af897..8780f6f9e 100644 --- a/ipaserver/install/ca.py +++ b/ipaserver/install/ca.py @@ -205,8 +205,11 @@ def install_check(standalone, replica_config, options): ) if not options.external_cert_files: - if not cainstance.check_port(): - print("IPA requires port 8443 for PKI but it is currently in use.") + if not cainstance.check_ports(): + print( + "IPA requires ports 8080 and 8443 for PKI, but one or more " + "are currently in use." + ) raise ScriptError("Aborting installation") if standalone: diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index cc86e18fc..bde5c77b6 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -105,13 +105,14 @@ class ExternalCAType(enum.Enum): MS_CS = 'ms-cs' -def check_port(): - """ - Check that dogtag port (8443) is available. +def check_ports(): + """Check that dogtag ports (8080, 8443) are available. - Returns True when the port is free, False if it's taken. + Returns True when ports are free, False if they are taken. """ - return not ipautil.host_port_open(None, 8443) + return all([ipautil.check_port_bindable(8443), + ipautil.check_port_bindable(8080)]) + def get_preop_pin(instance_root, instance_name): # Only used for Dogtag 9 diff --git a/ipatests/test_ipapython/test_ipautil.py b/ipatests/test_ipapython/test_ipautil.py index ef70f20fe..79283defd 100644 --- a/ipatests/test_ipapython/test_ipautil.py +++ b/ipatests/test_ipapython/test_ipautil.py @@ -21,6 +21,7 @@ """ Test the `ipapython/ipautil.py` module. """ +import socket import sys import tempfile @@ -508,3 +509,42 @@ def test_run_stderr(): assert "message" not in str(cm.value) assert "message" not in str(cm.value.output) assert "message" not in str(cm.value.stderr) + + +@pytest.fixture(scope='function') +def tcp_listen(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + # port 0 means the OS selects a random, unused port for the test. + s.bind(('', 0)) + s.listen(1) + yield s.getsockname()[-1], s + finally: + s.close() + + +@pytest.fixture(scope='function') +def udp_listen(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # port 0 means the OS selects a random, unused port for the test. + s.bind(('', 0)) + yield s.getsockname()[-1], s + finally: + s.close() + + +def test_check_port_bindable_tcp(tcp_listen): + port, sock = tcp_listen + assert not ipautil.check_port_bindable(port) + assert not ipautil.check_port_bindable(port, socket.SOCK_STREAM) + sock.close() + assert ipautil.check_port_bindable(port) + + +def test_check_port_bindable_udp(udp_listen): + port, sock = udp_listen + assert not ipautil.check_port_bindable(port, socket.SOCK_DGRAM) + sock.close() + assert ipautil.check_port_bindable(port, socket.SOCK_DGRAM)