Fix installer CA port check for port 8080

The installer now checks that port 8080 is available and not in use by
any other application.

The port checker has been rewritten to use bind() rather than just
checking if a server responds on localhost. It's much more reliable and
detects more problems.

Original patch by m3gat0nn4ge.

Co-authored-by: Mega Tonnage <m3gat0nn4ge@gmail.com>
Fixes: https://pagure.io/freeipa/issue/7415
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Christian Heimes 2018-03-28 12:05:34 +02:00
parent a947695ab0
commit 6aca027ecc
4 changed files with 100 additions and 7 deletions

View File

@ -1037,6 +1037,55 @@ def host_port_open(host, port, socket_type=socket.SOCK_STREAM,
return port_open
def check_port_bindable(port, socket_type=socket.SOCK_STREAM):
"""Check if a port is free and not bound by any other application
:param port: port number
:param socket_type: type (SOCK_STREAM for TCP, SOCK_DGRAM for UDP)
Returns True if the port is free, False otherwise
"""
if socket_type == socket.SOCK_STREAM:
proto = 'TCP'
elif socket_type == socket.SOCK_DGRAM:
proto = 'UDP'
else:
raise ValueError(socket_type)
# Detect dual stack or IPv4 single stack
try:
s = socket.socket(socket.AF_INET6, socket_type)
anyaddr = '::'
logger.debug(
"check_port_bindable: Checking IPv4/IPv6 dual stack and %s",
proto
)
except socket.error:
s = socket.socket(socket.AF_INET, socket_type)
anyaddr = ''
logger.debug("check_port_bindable: Checking IPv4 only and %s", proto)
# Attempt to bind
try:
if socket_type == socket.SOCK_STREAM:
# reuse TCP sockets in TIME_WAIT state
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
s.bind((anyaddr, port))
except socket.error as e:
logger.debug(
"check_port_bindable: failed to bind to port %i/%s: %s",
port, proto, e
)
return False
else:
logger.debug(
"check_port_bindable: bind success: %i/%s", port, proto
)
return True
finally:
s.close()
def reverse_record_exists(ip_address):
"""
Checks if IP address have some reverse record somewhere.

View File

@ -205,8 +205,11 @@ def install_check(standalone, replica_config, options):
)
if not options.external_cert_files:
if not cainstance.check_port():
print("IPA requires port 8443 for PKI but it is currently in use.")
if not cainstance.check_ports():
print(
"IPA requires ports 8080 and 8443 for PKI, but one or more "
"are currently in use."
)
raise ScriptError("Aborting installation")
if standalone:

View File

@ -105,13 +105,14 @@ class ExternalCAType(enum.Enum):
MS_CS = 'ms-cs'
def check_port():
"""
Check that dogtag port (8443) is available.
def check_ports():
"""Check that dogtag ports (8080, 8443) are available.
Returns True when the port is free, False if it's taken.
Returns True when ports are free, False if they are taken.
"""
return not ipautil.host_port_open(None, 8443)
return all([ipautil.check_port_bindable(8443),
ipautil.check_port_bindable(8080)])
def get_preop_pin(instance_root, instance_name):
# Only used for Dogtag 9

View File

@ -21,6 +21,7 @@
"""
Test the `ipapython/ipautil.py` module.
"""
import socket
import sys
import tempfile
@ -508,3 +509,42 @@ def test_run_stderr():
assert "message" not in str(cm.value)
assert "message" not in str(cm.value.output)
assert "message" not in str(cm.value.stderr)
@pytest.fixture(scope='function')
def tcp_listen():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# port 0 means the OS selects a random, unused port for the test.
s.bind(('', 0))
s.listen(1)
yield s.getsockname()[-1], s
finally:
s.close()
@pytest.fixture(scope='function')
def udp_listen():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# port 0 means the OS selects a random, unused port for the test.
s.bind(('', 0))
yield s.getsockname()[-1], s
finally:
s.close()
def test_check_port_bindable_tcp(tcp_listen):
port, sock = tcp_listen
assert not ipautil.check_port_bindable(port)
assert not ipautil.check_port_bindable(port, socket.SOCK_STREAM)
sock.close()
assert ipautil.check_port_bindable(port)
def test_check_port_bindable_udp(udp_listen):
port, sock = udp_listen
assert not ipautil.check_port_bindable(port, socket.SOCK_DGRAM)
sock.close()
assert ipautil.check_port_bindable(port, socket.SOCK_DGRAM)