mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
python3: port certmonger requests script
This commit fixes requesting certificates via certmonger in Python 3. This includes dogtag-ipa-ca-renew-agent-submit script and scripts used during the scripts restarting. https://pagure.io/freeipa/issue/4985 Reviewed-By: Florence Blanc-Renaud <frenaud@redhat.com>
This commit is contained in:
parent
a3c11b01af
commit
7ef6de931b
@ -89,7 +89,7 @@ def get_nickname():
|
|||||||
# an OpenSSL certificate for which we have to reverse the order of its DN
|
# an OpenSSL certificate for which we have to reverse the order of its DN
|
||||||
# components thus changing the CERTMONGER_REQ_SUBJECT
|
# components thus changing the CERTMONGER_REQ_SUBJECT
|
||||||
# https://pagure.io/certmonger/issue/62
|
# https://pagure.io/certmonger/issue/62
|
||||||
csr = os.environ.get('CERTMONGER_CSR')
|
csr = os.environ.get('CERTMONGER_CSR').encode('ascii')
|
||||||
csr_obj = crypto_x509.load_pem_x509_csr(csr, default_backend())
|
csr_obj = crypto_x509.load_pem_x509_csr(csr, default_backend())
|
||||||
subject = csr_obj.subject
|
subject = csr_obj.subject
|
||||||
if not subject:
|
if not subject:
|
||||||
@ -166,7 +166,8 @@ def call_handler(_handler, *args, **kwargs):
|
|||||||
if profile is not None:
|
if profile is not None:
|
||||||
if not isinstance(profile, unicode):
|
if not isinstance(profile, unicode):
|
||||||
raise TypeError
|
raise TypeError
|
||||||
profile = profile.encode('raw_unicode_escape')
|
profile = (profile.encode('raw_unicode_escape')
|
||||||
|
.decode('ascii'))
|
||||||
except (TypeError, UnicodeEncodeError):
|
except (TypeError, UnicodeEncodeError):
|
||||||
return (UNCONFIGURED,
|
return (UNCONFIGURED,
|
||||||
"Invalid 'profile' in cookie: %r" % profile)
|
"Invalid 'profile' in cookie: %r" % profile)
|
||||||
@ -183,7 +184,7 @@ def call_handler(_handler, *args, **kwargs):
|
|||||||
try:
|
try:
|
||||||
if not isinstance(cookie, unicode):
|
if not isinstance(cookie, unicode):
|
||||||
raise TypeError
|
raise TypeError
|
||||||
cookie = cookie.encode('raw_unicode_escape')
|
cookie = cookie.encode('raw_unicode_escape').decode('ascii')
|
||||||
except (TypeError, UnicodeEncodeError):
|
except (TypeError, UnicodeEncodeError):
|
||||||
return (UNCONFIGURED,
|
return (UNCONFIGURED,
|
||||||
"Invalid 'cookie' in cookie: %r" % cookie)
|
"Invalid 'cookie' in cookie: %r" % cookie)
|
||||||
@ -194,11 +195,12 @@ def call_handler(_handler, *args, **kwargs):
|
|||||||
result = _handler(*args, **kwargs)
|
result = _handler(*args, **kwargs)
|
||||||
|
|
||||||
if result[0] in (WAIT, WAIT_WITH_DELAY):
|
if result[0] in (WAIT, WAIT_WITH_DELAY):
|
||||||
context['cookie'] = result[-1].decode('raw_unicode_escape')
|
context['cookie'] = (result[-1].encode('ascii')
|
||||||
|
.decode('raw_unicode_escape'))
|
||||||
|
|
||||||
profile = os.environ.get('CERTMONGER_CA_PROFILE')
|
profile = os.environ.get('CERTMONGER_CA_PROFILE')
|
||||||
if profile is not None:
|
if profile is not None:
|
||||||
profile = profile.decode('raw_unicode_escape')
|
profile = profile.encode('ascii').decode('raw_unicode_escape')
|
||||||
context['profile'] = profile
|
context['profile'] = profile
|
||||||
|
|
||||||
cookie = json.dumps(context)
|
cookie = json.dumps(context)
|
||||||
@ -232,7 +234,7 @@ def request_cert(reuse_existing, **kwargs):
|
|||||||
if os.environ.get('CERTMONGER_CA_PROFILE') == 'caCACert':
|
if os.environ.get('CERTMONGER_CA_PROFILE') == 'caCACert':
|
||||||
args += ['-N', '-O', 'bypassCAnotafter=true']
|
args += ['-N', '-O', 'bypassCAnotafter=true']
|
||||||
result = ipautil.run(args, raiseonerr=False, env=os.environ,
|
result = ipautil.run(args, raiseonerr=False, env=os.environ,
|
||||||
capture_output=True)
|
capture_output=True)
|
||||||
if six.PY2:
|
if six.PY2:
|
||||||
sys.stderr.write(result.raw_error_output)
|
sys.stderr.write(result.raw_error_output)
|
||||||
else:
|
else:
|
||||||
@ -395,7 +397,7 @@ def retrieve_or_reuse_cert(**kwargs):
|
|||||||
else:
|
else:
|
||||||
cert = entry.single_value['usercertificate']
|
cert = entry.single_value['usercertificate']
|
||||||
|
|
||||||
return (ISSUED, cert.public_bytes(x509.Encoding.PEM))
|
return (ISSUED, cert.public_bytes(x509.Encoding.PEM).decode('ascii'))
|
||||||
|
|
||||||
|
|
||||||
def retrieve_cert_continuous(reuse_existing, **kwargs):
|
def retrieve_cert_continuous(reuse_existing, **kwargs):
|
||||||
@ -405,7 +407,8 @@ def retrieve_cert_continuous(reuse_existing, **kwargs):
|
|||||||
"""
|
"""
|
||||||
old_cert = os.environ.get('CERTMONGER_CERTIFICATE')
|
old_cert = os.environ.get('CERTMONGER_CERTIFICATE')
|
||||||
if old_cert:
|
if old_cert:
|
||||||
old_cert = x509.load_pem_x509_certificate(fix_pem(old_cert))
|
old_cert = x509.load_pem_x509_certificate(
|
||||||
|
fix_pem(old_cert.encode('ascii')))
|
||||||
|
|
||||||
result = call_handler(retrieve_or_reuse_cert,
|
result = call_handler(retrieve_or_reuse_cert,
|
||||||
reuse_existing=reuse_existing,
|
reuse_existing=reuse_existing,
|
||||||
@ -413,7 +416,8 @@ def retrieve_cert_continuous(reuse_existing, **kwargs):
|
|||||||
if result[0] != ISSUED or reuse_existing:
|
if result[0] != ISSUED or reuse_existing:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
new_cert = x509.load_pem_x509_certificate(fix_pem(result[1]))
|
new_cert = x509.load_pem_x509_certificate(
|
||||||
|
fix_pem(result[1].encode('ascii')))
|
||||||
if new_cert == old_cert:
|
if new_cert == old_cert:
|
||||||
syslog.syslog(syslog.LOG_INFO, "Updated certificate not available")
|
syslog.syslog(syslog.LOG_INFO, "Updated certificate not available")
|
||||||
# No cert available yet, tell certmonger to wait another 8 hours
|
# No cert available yet, tell certmonger to wait another 8 hours
|
||||||
@ -437,14 +441,14 @@ def renew_ca_cert(reuse_existing, **kwargs):
|
|||||||
"""
|
"""
|
||||||
This is used for automatic CA certificate renewal.
|
This is used for automatic CA certificate renewal.
|
||||||
"""
|
"""
|
||||||
csr = os.environ.get('CERTMONGER_CSR')
|
csr = os.environ.get('CERTMONGER_CSR').encode('ascii')
|
||||||
if not csr:
|
if not csr:
|
||||||
return (UNCONFIGURED, "Certificate request not provided")
|
return (UNCONFIGURED, "Certificate request not provided")
|
||||||
|
|
||||||
cert = os.environ.get('CERTMONGER_CERTIFICATE')
|
cert = os.environ.get('CERTMONGER_CERTIFICATE')
|
||||||
if not cert:
|
if not cert:
|
||||||
return (REJECTED, "New certificate requests not supported")
|
return (REJECTED, "New certificate requests not supported")
|
||||||
cert = x509.load_pem_x509_certificate(fix_pem(cert))
|
cert = x509.load_pem_x509_certificate(fix_pem(cert.encode('ascii')))
|
||||||
is_self_signed = cert.is_self_signed()
|
is_self_signed = cert.is_self_signed()
|
||||||
|
|
||||||
operation = os.environ.get('CERTMONGER_OPERATION')
|
operation = os.environ.get('CERTMONGER_OPERATION')
|
||||||
|
@ -351,7 +351,8 @@ class DogtagInstance(service.Service):
|
|||||||
cs_cfg,
|
cs_cfg,
|
||||||
directive,
|
directive,
|
||||||
# the cert must be only the base64 string without headers
|
# the cert must be only the base64 string without headers
|
||||||
base64.b64encode(cert.public_bytes(x509.Encoding.DER)),
|
(base64.b64encode(cert.public_bytes(x509.Encoding.DER))
|
||||||
|
.decode('ascii')),
|
||||||
quotes=False,
|
quotes=False,
|
||||||
separator='=')
|
separator='=')
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user