diff --git a/install/tools/ipa-replica-conncheck b/install/tools/ipa-replica-conncheck index 544116efb..2413754e5 100755 --- a/install/tools/ipa-replica-conncheck +++ b/install/tools/ipa-replica-conncheck @@ -31,14 +31,16 @@ from ipaserver.install import installutils from optparse import OptionGroup, OptionValueError # pylint: enable=deprecated-module from ipapython.ipa_log_manager import root_logger, standard_logging_setup +import copy import sys import os import signal import tempfile +import select import socket import time import threading -import errno +import traceback from socket import SOCK_STREAM, SOCK_DGRAM import distutils.spawn from ipaplatform.paths import paths @@ -46,11 +48,12 @@ import gssapi from cryptography.hazmat.primitives import serialization CONNECT_TIMEOUT = 5 -RESPONDERS = [ ] +RESPONDER = None QUIET = False CCACHE_FILE = None KRB5_CONFIG = None + class SshExec(object): def __init__(self, user, addr): self.user = user @@ -96,6 +99,7 @@ class CheckedPort(object): self.port_type = port_type self.description = description + BASE_PORTS = [ CheckedPort(389, SOCK_STREAM, "Directory Service: Unsecure port"), CheckedPort(636, SOCK_STREAM, "Directory Service: Secure port"), @@ -112,6 +116,7 @@ def print_info(msg): if not QUIET: print(msg) + def parse_options(): def ca_cert_file_callback(option, opt, value, parser): if not os.path.exists(value): @@ -211,6 +216,7 @@ def parse_options(): return safe_options, options + def logging_setup(options): log_file = None @@ -219,16 +225,6 @@ def logging_setup(options): standard_logging_setup(log_file, debug=options.debug) -def clean_responders(responders): - if not responders: - return - - for responder in responders: - responder.stop() - - for responder in responders: - responder.join() - responders.remove(responder) def sigterm_handler(signum, frame): # do what SIGINT does (raise a KeyboardInterrupt) @@ -236,6 +232,7 @@ def sigterm_handler(signum, frame): if callable(sigint_handler): sigint_handler(signum, frame) + def configure_krb5_conf(realm, kdc, filename): krbconf = ipaclient.install.ipachangeconf.IPAChangeConf("IPA Installer") @@ -283,32 +280,107 @@ def configure_krb5_conf(realm, kdc, filename): krbconf.newConf(filename, opts) + class PortResponder(threading.Thread): - def __init__(self, port, port_type, socket_timeout=1): + PROTO = {socket.SOCK_STREAM: 'tcp', + socket.SOCK_DGRAM: 'udp'} + + def __init__(self, ports): + """ + ports: a list of CheckedPort + """ super(PortResponder, self).__init__() - self.port = port - self.port_type = port_type - self.socket_timeout = socket_timeout - self._stop_request = False + # copy ports to avoid the need to synchronize it between threads + self.ports = copy.deepcopy(ports) + self._sockets = [] + self._close = False + self._close_lock = threading.Lock() + self.responder_data = 'FreeIPA' + self.ports_open = threading.Condition() def run(self): - while not self._stop_request: + root_logger.debug('Starting listening thread.') + + for port in self.ports: + self._bind_to_port(port.port, port.port_type) + with self.ports_open: + root_logger.debug('Ports opened, notify original thread') + self.ports_open.notify() + + while not self._is_closing(): + ready_socks, _socks1, _socks2 = select.select( + self._sockets, [], [], 1) + if ready_socks: + ready_sock = ready_socks[0] + self._respond(ready_sock) + + for sock in self._sockets: + port = sock.getsockname()[1] + proto = PortResponder.PROTO[sock.type] + sock.close() + root_logger.debug('%(port)d %(proto)s: Stopped listening' % + dict(port=port, proto=proto)) + + def _is_closing(self): + with self._close_lock: + return self._close + + def _bind_to_port(self, port, socket_type): + # Use IPv6 socket as it is able to accept both IPv6 and IPv4 + # connections. Since IPv6 kernel module is required by other + # parts of IPA, it should always be available. + family = socket.AF_INET6 + host = '::' # all available interfaces + proto = PortResponder.PROTO[socket_type] + + try: + sock = socket.socket(family, socket_type) + + # Make sure IPv4 clients can connect to IPv6 socket + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + + if socket_type == socket.SOCK_STREAM: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + sock.bind((host, port)) + if socket_type == socket.SOCK_STREAM: + # There might be a delay before accepting the connection, + # because a single thread is used to handle all the + # connections. Thus a backlog size of at least 1 is needed. + sock.listen(1) + + root_logger.debug('%(port)d %(proto)s: Started listening' % + dict(port=port, proto=proto)) + except socket.error as e: + root_logger.warning('%(port)d %(proto)s: Failed to bind' % + dict(port=port, proto=proto)) + root_logger.debug(traceback.format_exc(e)) + else: + self._sockets.append(sock) + + def _respond(self, sock): + port = sock.getsockname()[1] + if sock.type == socket.SOCK_STREAM: + connection, addr = sock.accept() try: - ipautil.bind_port_responder(self.port, - self.port_type, - socket_timeout=self.socket_timeout, - responder_data="FreeIPA") - except socket.timeout: - pass - except socket.error as e: - if e.errno == errno.EADDRINUSE: - time.sleep(1) - else: - raise + connection.sendall(self.responder_data) + root_logger.debug('%(port)d tcp: Responded to %(addr)s' % + dict(port=port, addr=addr[0])) + finally: + connection.close() + elif sock.type == socket.SOCK_DGRAM: + _data, addr = sock.recvfrom(1) + sock.sendto(self.responder_data, addr) + root_logger.debug('%(port)d udp: Responded to %(addr)s' % + dict(port=port, addr=addr[0])) def stop(self): - self._stop_request = True + root_logger.debug('Stopping listening thread.') + + with self._close_lock: + self._close = True + def port_check(host, port_list): ports_failed = [] @@ -344,7 +416,9 @@ def port_check(host, port_list): raise RuntimeError("Port check failed! Inaccessible port(s): %s" \ % ", ".join(msg_ports)) + def main(): + global RESPONDER safe_options, options = parse_options() logging_setup(options) @@ -386,11 +460,11 @@ def main(): # create listeners print_info("Start listening on required ports for remote master check") - for port in required_ports: - root_logger.debug("Start listening on port %d (%s)" % (port.port, port.description)) - responder = PortResponder(port.port, port.port_type) - responder.start() - RESPONDERS.append(responder) + RESPONDER = PortResponder(required_ports) + RESPONDER.start() + with RESPONDER.ports_open: + RESPONDER.ports_open.wait() + root_logger.debug('Original thread resumed') remote_check_opts = ['--replica %s' % options.hostname] @@ -550,18 +624,19 @@ def main(): time.sleep(3600) print_info("Connection check timeout: terminating listening program") + if __name__ == "__main__": try: sys.exit(main()) - except SystemExit as e: - sys.exit(e) except KeyboardInterrupt: print_info("\nCleaning up...") sys.exit(1) except RuntimeError as e: sys.exit(e) finally: - clean_responders(RESPONDERS) + if RESPONDER is not None: + RESPONDER.stop() + RESPONDER.join() for file_name in (CCACHE_FILE, KRB5_CONFIG): if file_name: try: diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py index 1c95a81f6..f85fa0d1d 100644 --- a/ipapython/ipautil.py +++ b/ipapython/ipautil.py @@ -894,77 +894,6 @@ def host_port_open(host, port, socket_type=socket.SOCK_STREAM, socket_timeout=No return False -def bind_port_responder(port, socket_type=socket.SOCK_STREAM, socket_timeout=None, responder_data=None): - host = None # all available interfaces - last_socket_error = None - - # At first try to create IPv6 socket as it is able to accept both IPv6 and - # IPv4 connections (when not turned off) - families = (socket.AF_INET6, socket.AF_INET) - s = None - - for family in families: - try: - addr_infos = socket.getaddrinfo(host, port, family, socket_type, 0, - socket.AI_PASSIVE) - except socket.error as e: - last_socket_error = e - continue - for res in addr_infos: - af, socktype, proto, _canonname, sa = res - try: - s = socket.socket(af, socktype, proto) - except socket.error as e: - last_socket_error = e - s = None - continue - - if socket_timeout is not None: - s.settimeout(1) - - if af == socket.AF_INET6: - try: - # Make sure IPv4 clients can connect to IPv6 socket - s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) - except socket.error: - pass - - if socket_type == socket.SOCK_STREAM: - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - s.bind(sa) - - while True: - if socket_type == socket.SOCK_STREAM: - s.listen(1) - connection, _client_address = s.accept() - try: - if responder_data: - connection.sendall(responder_data) - finally: - connection.close() - elif socket_type == socket.SOCK_DGRAM: - _data, addr = s.recvfrom(1) - - if responder_data: - s.sendto(responder_data, addr) - except socket.timeout: - # Timeout is expectable as it was requested by caller, raise - # the exception back to him - raise - except socket.error as e: - last_socket_error = e - s.close() - s = None - continue - finally: - if s: - s.close() - - if s is None and last_socket_error is not None: - raise last_socket_error # pylint: disable=E0702 - def reverse_record_exists(ip_address): """