ipa-replica-conncheck: do not close listening ports until required

Previously, a separate thread would be created for each socket used
for conncheck. It would also time out after one second, after which it
would be closed and reopened again. This caused random failures of
conncheck.

Now all sockets are handled in a single thread and once the server
starts to listen on a port, it does not close that connection until the
script finishes.

Only IPv6 socket is used for simplicity, since it can handle both IPv6
and IPv4 connections. This requires IPv6 kernel support, which is
required by other parts of IPA anyway.

https://fedorahosted.org/freeipa/ticket/6487

Reviewed-By: Petr Spacek <pspacek@redhat.com>
This commit is contained in:
Tomas Krizek 2016-11-23 13:55:14 +01:00 committed by Martin Basti
parent 027fc32fe0
commit af0ba66188
2 changed files with 113 additions and 109 deletions

View File

@ -31,14 +31,16 @@ from ipaserver.install import installutils
from optparse import OptionGroup, OptionValueError from optparse import OptionGroup, OptionValueError
# pylint: enable=deprecated-module # pylint: enable=deprecated-module
from ipapython.ipa_log_manager import root_logger, standard_logging_setup from ipapython.ipa_log_manager import root_logger, standard_logging_setup
import copy
import sys import sys
import os import os
import signal import signal
import tempfile import tempfile
import select
import socket import socket
import time import time
import threading import threading
import errno import traceback
from socket import SOCK_STREAM, SOCK_DGRAM from socket import SOCK_STREAM, SOCK_DGRAM
import distutils.spawn import distutils.spawn
from ipaplatform.paths import paths from ipaplatform.paths import paths
@ -46,11 +48,12 @@ import gssapi
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
CONNECT_TIMEOUT = 5 CONNECT_TIMEOUT = 5
RESPONDERS = [ ] RESPONDER = None
QUIET = False QUIET = False
CCACHE_FILE = None CCACHE_FILE = None
KRB5_CONFIG = None KRB5_CONFIG = None
class SshExec(object): class SshExec(object):
def __init__(self, user, addr): def __init__(self, user, addr):
self.user = user self.user = user
@ -96,6 +99,7 @@ class CheckedPort(object):
self.port_type = port_type self.port_type = port_type
self.description = description self.description = description
BASE_PORTS = [ BASE_PORTS = [
CheckedPort(389, SOCK_STREAM, "Directory Service: Unsecure port"), CheckedPort(389, SOCK_STREAM, "Directory Service: Unsecure port"),
CheckedPort(636, SOCK_STREAM, "Directory Service: Secure port"), CheckedPort(636, SOCK_STREAM, "Directory Service: Secure port"),
@ -112,6 +116,7 @@ def print_info(msg):
if not QUIET: if not QUIET:
print(msg) print(msg)
def parse_options(): def parse_options():
def ca_cert_file_callback(option, opt, value, parser): def ca_cert_file_callback(option, opt, value, parser):
if not os.path.exists(value): if not os.path.exists(value):
@ -211,6 +216,7 @@ def parse_options():
return safe_options, options return safe_options, options
def logging_setup(options): def logging_setup(options):
log_file = None log_file = None
@ -219,16 +225,6 @@ def logging_setup(options):
standard_logging_setup(log_file, debug=options.debug) standard_logging_setup(log_file, debug=options.debug)
def clean_responders(responders):
if not responders:
return
for responder in responders:
responder.stop()
for responder in responders:
responder.join()
responders.remove(responder)
def sigterm_handler(signum, frame): def sigterm_handler(signum, frame):
# do what SIGINT does (raise a KeyboardInterrupt) # do what SIGINT does (raise a KeyboardInterrupt)
@ -236,6 +232,7 @@ def sigterm_handler(signum, frame):
if callable(sigint_handler): if callable(sigint_handler):
sigint_handler(signum, frame) sigint_handler(signum, frame)
def configure_krb5_conf(realm, kdc, filename): def configure_krb5_conf(realm, kdc, filename):
krbconf = ipaclient.install.ipachangeconf.IPAChangeConf("IPA Installer") krbconf = ipaclient.install.ipachangeconf.IPAChangeConf("IPA Installer")
@ -283,32 +280,107 @@ def configure_krb5_conf(realm, kdc, filename):
krbconf.newConf(filename, opts) krbconf.newConf(filename, opts)
class PortResponder(threading.Thread): class PortResponder(threading.Thread):
def __init__(self, port, port_type, socket_timeout=1): PROTO = {socket.SOCK_STREAM: 'tcp',
socket.SOCK_DGRAM: 'udp'}
def __init__(self, ports):
"""
ports: a list of CheckedPort
"""
super(PortResponder, self).__init__() super(PortResponder, self).__init__()
self.port = port # copy ports to avoid the need to synchronize it between threads
self.port_type = port_type self.ports = copy.deepcopy(ports)
self.socket_timeout = socket_timeout self._sockets = []
self._stop_request = False self._close = False
self._close_lock = threading.Lock()
self.responder_data = 'FreeIPA'
self.ports_open = threading.Condition()
def run(self): def run(self):
while not self._stop_request: root_logger.debug('Starting listening thread.')
for port in self.ports:
self._bind_to_port(port.port, port.port_type)
with self.ports_open:
root_logger.debug('Ports opened, notify original thread')
self.ports_open.notify()
while not self._is_closing():
ready_socks, _socks1, _socks2 = select.select(
self._sockets, [], [], 1)
if ready_socks:
ready_sock = ready_socks[0]
self._respond(ready_sock)
for sock in self._sockets:
port = sock.getsockname()[1]
proto = PortResponder.PROTO[sock.type]
sock.close()
root_logger.debug('%(port)d %(proto)s: Stopped listening' %
dict(port=port, proto=proto))
def _is_closing(self):
with self._close_lock:
return self._close
def _bind_to_port(self, port, socket_type):
# Use IPv6 socket as it is able to accept both IPv6 and IPv4
# connections. Since IPv6 kernel module is required by other
# parts of IPA, it should always be available.
family = socket.AF_INET6
host = '::' # all available interfaces
proto = PortResponder.PROTO[socket_type]
try:
sock = socket.socket(family, socket_type)
# Make sure IPv4 clients can connect to IPv6 socket
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
if socket_type == socket.SOCK_STREAM:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
if socket_type == socket.SOCK_STREAM:
# There might be a delay before accepting the connection,
# because a single thread is used to handle all the
# connections. Thus a backlog size of at least 1 is needed.
sock.listen(1)
root_logger.debug('%(port)d %(proto)s: Started listening' %
dict(port=port, proto=proto))
except socket.error as e:
root_logger.warning('%(port)d %(proto)s: Failed to bind' %
dict(port=port, proto=proto))
root_logger.debug(traceback.format_exc(e))
else:
self._sockets.append(sock)
def _respond(self, sock):
port = sock.getsockname()[1]
if sock.type == socket.SOCK_STREAM:
connection, addr = sock.accept()
try: try:
ipautil.bind_port_responder(self.port, connection.sendall(self.responder_data)
self.port_type, root_logger.debug('%(port)d tcp: Responded to %(addr)s' %
socket_timeout=self.socket_timeout, dict(port=port, addr=addr[0]))
responder_data="FreeIPA") finally:
except socket.timeout: connection.close()
pass elif sock.type == socket.SOCK_DGRAM:
except socket.error as e: _data, addr = sock.recvfrom(1)
if e.errno == errno.EADDRINUSE: sock.sendto(self.responder_data, addr)
time.sleep(1) root_logger.debug('%(port)d udp: Responded to %(addr)s' %
else: dict(port=port, addr=addr[0]))
raise
def stop(self): def stop(self):
self._stop_request = True root_logger.debug('Stopping listening thread.')
with self._close_lock:
self._close = True
def port_check(host, port_list): def port_check(host, port_list):
ports_failed = [] ports_failed = []
@ -344,7 +416,9 @@ def port_check(host, port_list):
raise RuntimeError("Port check failed! Inaccessible port(s): %s" \ raise RuntimeError("Port check failed! Inaccessible port(s): %s" \
% ", ".join(msg_ports)) % ", ".join(msg_ports))
def main(): def main():
global RESPONDER
safe_options, options = parse_options() safe_options, options = parse_options()
logging_setup(options) logging_setup(options)
@ -386,11 +460,11 @@ def main():
# create listeners # create listeners
print_info("Start listening on required ports for remote master check") print_info("Start listening on required ports for remote master check")
for port in required_ports: RESPONDER = PortResponder(required_ports)
root_logger.debug("Start listening on port %d (%s)" % (port.port, port.description)) RESPONDER.start()
responder = PortResponder(port.port, port.port_type) with RESPONDER.ports_open:
responder.start() RESPONDER.ports_open.wait()
RESPONDERS.append(responder) root_logger.debug('Original thread resumed')
remote_check_opts = ['--replica %s' % options.hostname] remote_check_opts = ['--replica %s' % options.hostname]
@ -550,18 +624,19 @@ def main():
time.sleep(3600) time.sleep(3600)
print_info("Connection check timeout: terminating listening program") print_info("Connection check timeout: terminating listening program")
if __name__ == "__main__": if __name__ == "__main__":
try: try:
sys.exit(main()) sys.exit(main())
except SystemExit as e:
sys.exit(e)
except KeyboardInterrupt: except KeyboardInterrupt:
print_info("\nCleaning up...") print_info("\nCleaning up...")
sys.exit(1) sys.exit(1)
except RuntimeError as e: except RuntimeError as e:
sys.exit(e) sys.exit(e)
finally: finally:
clean_responders(RESPONDERS) if RESPONDER is not None:
RESPONDER.stop()
RESPONDER.join()
for file_name in (CCACHE_FILE, KRB5_CONFIG): for file_name in (CCACHE_FILE, KRB5_CONFIG):
if file_name: if file_name:
try: try:

View File

@ -894,77 +894,6 @@ def host_port_open(host, port, socket_type=socket.SOCK_STREAM, socket_timeout=No
return False return False
def bind_port_responder(port, socket_type=socket.SOCK_STREAM, socket_timeout=None, responder_data=None):
host = None # all available interfaces
last_socket_error = None
# At first try to create IPv6 socket as it is able to accept both IPv6 and
# IPv4 connections (when not turned off)
families = (socket.AF_INET6, socket.AF_INET)
s = None
for family in families:
try:
addr_infos = socket.getaddrinfo(host, port, family, socket_type, 0,
socket.AI_PASSIVE)
except socket.error as e:
last_socket_error = e
continue
for res in addr_infos:
af, socktype, proto, _canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error as e:
last_socket_error = e
s = None
continue
if socket_timeout is not None:
s.settimeout(1)
if af == socket.AF_INET6:
try:
# Make sure IPv4 clients can connect to IPv6 socket
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
except socket.error:
pass
if socket_type == socket.SOCK_STREAM:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind(sa)
while True:
if socket_type == socket.SOCK_STREAM:
s.listen(1)
connection, _client_address = s.accept()
try:
if responder_data:
connection.sendall(responder_data)
finally:
connection.close()
elif socket_type == socket.SOCK_DGRAM:
_data, addr = s.recvfrom(1)
if responder_data:
s.sendto(responder_data, addr)
except socket.timeout:
# Timeout is expectable as it was requested by caller, raise
# the exception back to him
raise
except socket.error as e:
last_socket_error = e
s.close()
s = None
continue
finally:
if s:
s.close()
if s is None and last_socket_error is not None:
raise last_socket_error # pylint: disable=E0702
def reverse_record_exists(ip_address): def reverse_record_exists(ip_address):
""" """