From 6aca027ecc5c1bbcd1a69deea10d2ff991c53f5e Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Wed, 28 Mar 2018 12:05:34 +0200 Subject: [PATCH] Fix installer CA port check for port 8080 The installer now checks that port 8080 is available and not in use by any other application. The port checker has been rewritten to use bind() rather than just checking if a server responds on localhost. It's much more reliable and detects more problems. Original patch by m3gat0nn4ge. Co-authored-by: Mega Tonnage Fixes: https://pagure.io/freeipa/issue/7415 Signed-off-by: Christian Heimes Reviewed-By: Rob Crittenden --- ipapython/ipautil.py | 49 +++++++++++++++++++++++++ ipaserver/install/ca.py | 7 +++- ipaserver/install/cainstance.py | 11 +++--- ipatests/test_ipapython/test_ipautil.py | 40 ++++++++++++++++++++ 4 files changed, 100 insertions(+), 7 deletions(-) diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py index 354361f6c..9e2bddf27 100644 --- a/ipapython/ipautil.py +++ b/ipapython/ipautil.py @@ -1037,6 +1037,55 @@ def host_port_open(host, port, socket_type=socket.SOCK_STREAM, return port_open +def check_port_bindable(port, socket_type=socket.SOCK_STREAM): + """Check if a port is free and not bound by any other application + + :param port: port number + :param socket_type: type (SOCK_STREAM for TCP, SOCK_DGRAM for UDP) + + Returns True if the port is free, False otherwise + """ + if socket_type == socket.SOCK_STREAM: + proto = 'TCP' + elif socket_type == socket.SOCK_DGRAM: + proto = 'UDP' + else: + raise ValueError(socket_type) + + # Detect dual stack or IPv4 single stack + try: + s = socket.socket(socket.AF_INET6, socket_type) + anyaddr = '::' + logger.debug( + "check_port_bindable: Checking IPv4/IPv6 dual stack and %s", + proto + ) + except socket.error: + s = socket.socket(socket.AF_INET, socket_type) + anyaddr = '' + logger.debug("check_port_bindable: Checking IPv4 only and %s", proto) + + # Attempt to bind + try: + if socket_type == socket.SOCK_STREAM: + # reuse TCP sockets in TIME_WAIT state + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + s.bind((anyaddr, port)) + except socket.error as e: + logger.debug( + "check_port_bindable: failed to bind to port %i/%s: %s", + port, proto, e + ) + return False + else: + logger.debug( + "check_port_bindable: bind success: %i/%s", port, proto + ) + return True + finally: + s.close() + + def reverse_record_exists(ip_address): """ Checks if IP address have some reverse record somewhere. diff --git a/ipaserver/install/ca.py b/ipaserver/install/ca.py index bef0af897..8780f6f9e 100644 --- a/ipaserver/install/ca.py +++ b/ipaserver/install/ca.py @@ -205,8 +205,11 @@ def install_check(standalone, replica_config, options): ) if not options.external_cert_files: - if not cainstance.check_port(): - print("IPA requires port 8443 for PKI but it is currently in use.") + if not cainstance.check_ports(): + print( + "IPA requires ports 8080 and 8443 for PKI, but one or more " + "are currently in use." + ) raise ScriptError("Aborting installation") if standalone: diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index cc86e18fc..bde5c77b6 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -105,13 +105,14 @@ class ExternalCAType(enum.Enum): MS_CS = 'ms-cs' -def check_port(): - """ - Check that dogtag port (8443) is available. +def check_ports(): + """Check that dogtag ports (8080, 8443) are available. - Returns True when the port is free, False if it's taken. + Returns True when ports are free, False if they are taken. """ - return not ipautil.host_port_open(None, 8443) + return all([ipautil.check_port_bindable(8443), + ipautil.check_port_bindable(8080)]) + def get_preop_pin(instance_root, instance_name): # Only used for Dogtag 9 diff --git a/ipatests/test_ipapython/test_ipautil.py b/ipatests/test_ipapython/test_ipautil.py index ef70f20fe..79283defd 100644 --- a/ipatests/test_ipapython/test_ipautil.py +++ b/ipatests/test_ipapython/test_ipautil.py @@ -21,6 +21,7 @@ """ Test the `ipapython/ipautil.py` module. """ +import socket import sys import tempfile @@ -508,3 +509,42 @@ def test_run_stderr(): assert "message" not in str(cm.value) assert "message" not in str(cm.value.output) assert "message" not in str(cm.value.stderr) + + +@pytest.fixture(scope='function') +def tcp_listen(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + # port 0 means the OS selects a random, unused port for the test. + s.bind(('', 0)) + s.listen(1) + yield s.getsockname()[-1], s + finally: + s.close() + + +@pytest.fixture(scope='function') +def udp_listen(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # port 0 means the OS selects a random, unused port for the test. + s.bind(('', 0)) + yield s.getsockname()[-1], s + finally: + s.close() + + +def test_check_port_bindable_tcp(tcp_listen): + port, sock = tcp_listen + assert not ipautil.check_port_bindable(port) + assert not ipautil.check_port_bindable(port, socket.SOCK_STREAM) + sock.close() + assert ipautil.check_port_bindable(port) + + +def test_check_port_bindable_udp(udp_listen): + port, sock = udp_listen + assert not ipautil.check_port_bindable(port, socket.SOCK_DGRAM) + sock.close() + assert ipautil.check_port_bindable(port, socket.SOCK_DGRAM)