mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
IPA-EPN: Add tests for sending real mail with auth and templates
Send e-mail using postfix on localhost and read the contents to verify that the mail was delivered and that the template was applied correctly. Fixes: https://pagure.io/freeipa/issue/3687 Signed-off-by: Rob Crittenden <rcritten@redhat.com> Reviewed-By: Francois Cami <fcami@redhat.com>
This commit is contained in:
parent
c3cbaed9fd
commit
1760ad48ae
@ -17,10 +17,13 @@
|
|||||||
|
|
||||||
from __future__ import print_function, absolute_import
|
from __future__ import print_function, absolute_import
|
||||||
|
|
||||||
|
import base64
|
||||||
import datetime
|
import datetime
|
||||||
|
import email
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pytest
|
import pytest
|
||||||
|
import textwrap
|
||||||
|
|
||||||
from ipatests.test_integration.base import IntegrationTest
|
from ipatests.test_integration.base import IntegrationTest
|
||||||
from ipatests.pytest_ipa.integration import tasks
|
from ipatests.pytest_ipa.integration import tasks
|
||||||
@ -40,15 +43,77 @@ def datetime_to_generalized_time(dt):
|
|||||||
return generalized_time_str + "Z"
|
return generalized_time_str + "Z"
|
||||||
|
|
||||||
|
|
||||||
def configure_postfix(host):
|
def configure_postfix(host, realm):
|
||||||
"""Configure postfix to be the destination of the IPA domain.
|
"""Configure postfix for:
|
||||||
|
* SASL auth
|
||||||
|
* to be the destination of the IPA domain.
|
||||||
"""
|
"""
|
||||||
host.run_command(["systemctl", "start", "postfix"])
|
def postconf(host, option):
|
||||||
|
host.run_command(r"postconf -e '%s'" % option)
|
||||||
|
|
||||||
|
# Setup the keytab we need for SASL auth
|
||||||
|
host.run_command(r"ipa service-add smtp/%s --force" % host.hostname)
|
||||||
|
host.run_command(r"ipa-getkeytab -p smtp/%s -k /etc/postfix/smtp.keytab" %
|
||||||
|
host.hostname)
|
||||||
|
host.run_command(r"chown root:mail /etc/postfix/smtp.keytab")
|
||||||
|
host.run_command(r"chmod 640 /etc/postfix/smtp.keytab")
|
||||||
|
|
||||||
|
# Configure the SASL smtp service to use GSSAPI
|
||||||
|
host.run_command(
|
||||||
|
r"sed -i 's/plain login/GSSAPI plain login/' /etc/sasl2/smtpd.conf")
|
||||||
|
host.run_command(
|
||||||
|
r"sed -i 's/MECH=pam/MECH=kerberos5/' /etc/sysconfig/saslauthd")
|
||||||
|
postconf(host,
|
||||||
|
'import_environment = MAIL_CONFIG MAIL_DEBUG MAIL_LOGTAG TZ '
|
||||||
|
'XAUTHORITY DISPLAY LANG=C KRB5_KTNAME=/etc/postfix/smtp.keytab')
|
||||||
|
postconf(host,
|
||||||
|
'smtpd_client_restrictions = permit_sasl_authenticated, reject')
|
||||||
|
postconf(host,
|
||||||
|
'smtpd_recipient_restrictions = permit_sasl_authenticated, reject')
|
||||||
|
postconf(host,
|
||||||
|
'smtpd_sender_restrictions = permit_sasl_authenticated, reject')
|
||||||
|
postconf(host, 'smtpd_sasl_auth_enable = yes')
|
||||||
|
postconf(host, 'smtpd_sasl_security_options = noanonymous')
|
||||||
|
postconf(host,
|
||||||
|
'smtpd_sasl_tls_security_options = $smtpd_sasl_security_options')
|
||||||
|
postconf(host, 'broken_sasl_auth_clients = yes')
|
||||||
|
postconf(host, 'smtpd_sasl_authenticated_header = yes')
|
||||||
|
postconf(host, 'smtpd_sasl_local_domain = %s' % realm)
|
||||||
|
|
||||||
|
host.run_command(["systemctl", "restart", "saslauthd"])
|
||||||
|
|
||||||
result = host.run_command(["postconf", "mydestination"])
|
result = host.run_command(["postconf", "mydestination"])
|
||||||
mydestination = result.stdout_text.strip() + ", " + host.domain.name
|
mydestination = result.stdout_text.strip() + ", " + host.domain.name
|
||||||
cmd = ["postconf", "-e", mydestination]
|
postconf(host, mydestination)
|
||||||
print(cmd)
|
|
||||||
host.run_command(cmd)
|
host.run_command(["systemctl", "restart", "postfix"])
|
||||||
|
|
||||||
|
|
||||||
|
def decode_header(header):
|
||||||
|
"""Decode the header if needed and return the value"""
|
||||||
|
# Only support one value for now
|
||||||
|
(value, encoding) = email.header.decode_header(header)[0]
|
||||||
|
if encoding:
|
||||||
|
return value.decode(encoding)
|
||||||
|
else:
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def validate_mail(host, id, content):
|
||||||
|
"""Retrieve a remote e-mail and determine if it matches the current user"""
|
||||||
|
mail = host.get_file_contents('/var/mail/user%d' % id)
|
||||||
|
msg = email.message_from_bytes(mail)
|
||||||
|
assert decode_header(msg['To']) == 'user%d@%s' % (id, host.domain.name)
|
||||||
|
assert decode_header(msg['From']) == 'IPA-EPN <noreply@%s>' % \
|
||||||
|
host.domain.name
|
||||||
|
assert decode_header(msg['subject']) == 'Your password is expiring.'
|
||||||
|
|
||||||
|
for part in msg.walk():
|
||||||
|
if part.get_content_maintype() == 'multipart':
|
||||||
|
continue
|
||||||
|
body = part.get_payload()
|
||||||
|
decoded = base64.b64decode(body).decode('utf-8')
|
||||||
|
assert content in decoded
|
||||||
|
|
||||||
|
|
||||||
class TestEPN(IntegrationTest):
|
class TestEPN(IntegrationTest):
|
||||||
@ -56,6 +121,7 @@ class TestEPN(IntegrationTest):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
num_clients = 1
|
num_clients = 1
|
||||||
|
notify_ttls = (28, 14, 7, 3, 1)
|
||||||
|
|
||||||
def _check_epn_output(
|
def _check_epn_output(
|
||||||
self,
|
self,
|
||||||
@ -75,16 +141,23 @@ class TestEPN(IntegrationTest):
|
|||||||
def install(cls, mh):
|
def install(cls, mh):
|
||||||
tasks.install_packages(cls.master, ["postfix"])
|
tasks.install_packages(cls.master, ["postfix"])
|
||||||
tasks.install_packages(cls.clients[0], ["postfix"])
|
tasks.install_packages(cls.clients[0], ["postfix"])
|
||||||
|
for host in (cls.master, cls.clients[0]):
|
||||||
|
try:
|
||||||
|
tasks.install_packages(host, ["cyrus-sasl"])
|
||||||
|
except Exception:
|
||||||
|
# the package is likely already installed
|
||||||
|
pass
|
||||||
tasks.install_master(cls.master, setup_dns=True)
|
tasks.install_master(cls.master, setup_dns=True)
|
||||||
tasks.install_client(cls.master, cls.clients[0])
|
tasks.install_client(cls.master, cls.clients[0])
|
||||||
configure_postfix(cls.master)
|
configure_postfix(cls.master, cls.master.domain.realm)
|
||||||
configure_postfix(cls.clients[0])
|
configure_postfix(cls.clients[0], cls.master.domain.realm)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def uninstall(cls, mh):
|
def uninstall(cls, mh):
|
||||||
super(TestEPN, cls).uninstall(mh)
|
super(TestEPN, cls).uninstall(mh)
|
||||||
tasks.uninstall_packages(cls.master, ["postfix"])
|
tasks.uninstall_packages(cls.master, ["postfix"])
|
||||||
tasks.uninstall_packages(cls.clients[0], ["postfix"])
|
tasks.uninstall_packages(cls.clients[0], ["postfix"])
|
||||||
|
cls.master.run_command(r'rm -f /etc/postfix/smtp.keytab')
|
||||||
|
|
||||||
def test_EPN_smoketest_1(self):
|
def test_EPN_smoketest_1(self):
|
||||||
"""No users except admin. Check --dry-run output.
|
"""No users except admin. Check --dry-run output.
|
||||||
@ -117,6 +190,12 @@ class TestEPN(IntegrationTest):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def cleanupmail(self):
|
||||||
|
"""Cleanup any existing mail that has been sent."""
|
||||||
|
for i in self.notify_ttls:
|
||||||
|
self.master.run_command(["rm", "-f", "/var/mail/user%d" % i])
|
||||||
|
|
||||||
def test_EPN_smoketest_2(self, cleanupusers):
|
def test_EPN_smoketest_2(self, cleanupusers):
|
||||||
"""Add a user without password.
|
"""Add a user without password.
|
||||||
Add a user whose password expires within the default time range.
|
Add a user whose password expires within the default time range.
|
||||||
@ -137,11 +216,9 @@ class TestEPN(IntegrationTest):
|
|||||||
(stdout_text_client, unused) = self._check_epn_output(
|
(stdout_text_client, unused) = self._check_epn_output(
|
||||||
self.clients[0], dry_run=True
|
self.clients[0], dry_run=True
|
||||||
)
|
)
|
||||||
print(json.dumps(json.loads(stdout_text_client), ensure_ascii=False))
|
|
||||||
(stdout_text_master, unused) = self._check_epn_output(
|
(stdout_text_master, unused) = self._check_epn_output(
|
||||||
self.master, dry_run=True
|
self.master, dry_run=True
|
||||||
)
|
)
|
||||||
print(json.dumps(json.loads(stdout_text_master), ensure_ascii=False))
|
|
||||||
assert stdout_text_master == stdout_text_client
|
assert stdout_text_master == stdout_text_client
|
||||||
assert "testuser0" not in stdout_text_client
|
assert "testuser0" not in stdout_text_client
|
||||||
assert "testuser1" in stdout_text_client
|
assert "testuser1" in stdout_text_client
|
||||||
@ -178,11 +255,9 @@ class TestEPN(IntegrationTest):
|
|||||||
(stdout_text_client, unused) = self._check_epn_output(
|
(stdout_text_client, unused) = self._check_epn_output(
|
||||||
self.clients[0], dry_run=True
|
self.clients[0], dry_run=True
|
||||||
)
|
)
|
||||||
print(json.dumps(json.loads(stdout_text_client), ensure_ascii=False))
|
|
||||||
(stdout_text_master, unused) = self._check_epn_output(
|
(stdout_text_master, unused) = self._check_epn_output(
|
||||||
self.master, dry_run=True
|
self.master, dry_run=True
|
||||||
)
|
)
|
||||||
print(json.dumps(json.loads(stdout_text_master), ensure_ascii=False))
|
|
||||||
assert stdout_text_master == stdout_text_client
|
assert stdout_text_master == stdout_text_client
|
||||||
user_lst = []
|
user_lst = []
|
||||||
for user in json.loads(stdout_text_master):
|
for user in json.loads(stdout_text_master):
|
||||||
@ -198,7 +273,7 @@ class TestEPN(IntegrationTest):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# Compare the notify_ttls values
|
# Compare the notify_ttls values
|
||||||
for i in (28, 14, 7, 3, 1):
|
for i in self.notify_ttls:
|
||||||
user_list = []
|
user_list = []
|
||||||
(stdout_text_client, unused) = self._check_epn_output(
|
(stdout_text_client, unused) = self._check_epn_output(
|
||||||
self.clients[0], from_nbdays=i, to_nbdays=i + 1, dry_run=True)
|
self.clients[0], from_nbdays=i, to_nbdays=i + 1, dry_run=True)
|
||||||
@ -206,3 +281,43 @@ class TestEPN(IntegrationTest):
|
|||||||
user_list.append(user["uid"])
|
user_list.append(user["uid"])
|
||||||
assert len(user_list) == 1
|
assert len(user_list) == 1
|
||||||
assert user_list[0] == "user%d" % i
|
assert user_list[0] == "user%d" % i
|
||||||
|
|
||||||
|
def test_EPN_authenticated(self, cleanupmail):
|
||||||
|
"""Test the to/from nbdays options (implies --dry-run)
|
||||||
|
|
||||||
|
We have a set of users installed with varying expiration
|
||||||
|
dates. Confirm that to/from nbdays finds them.
|
||||||
|
"""
|
||||||
|
epn_conf = textwrap.dedent('''
|
||||||
|
[global]
|
||||||
|
smtp_user={user}
|
||||||
|
smtp_password={password}
|
||||||
|
'''.format(user=self.master.config.admin_name,
|
||||||
|
password=self.master.config.admin_password))
|
||||||
|
self.master.put_file_contents('/etc/ipa/epn.conf', epn_conf)
|
||||||
|
|
||||||
|
tasks.ipa_epn(self.master)
|
||||||
|
for i in self.notify_ttls:
|
||||||
|
validate_mail(self.master, i,
|
||||||
|
"Hi test user,\n\nYour password will expire")
|
||||||
|
|
||||||
|
def test_EPN_template(self, cleanupmail):
|
||||||
|
"""Test the to/from nbdays options (implies --dry-run)
|
||||||
|
|
||||||
|
We have a set of users installed with varying expiration
|
||||||
|
dates. Confirm that to/from nbdays finds them.
|
||||||
|
"""
|
||||||
|
exp_msg = textwrap.dedent('''
|
||||||
|
Hi {{ first }} {{last}},
|
||||||
|
Your login entry {{uid}} is going to expire on
|
||||||
|
{{ expiration }}. Please change it soon.
|
||||||
|
|
||||||
|
Your friendly neighborhood admins.
|
||||||
|
''')
|
||||||
|
self.master.put_file_contents('/etc/ipa/epn/expire_msg.template',
|
||||||
|
exp_msg)
|
||||||
|
|
||||||
|
tasks.ipa_epn(self.master)
|
||||||
|
for i in self.notify_ttls:
|
||||||
|
validate_mail(self.master, i,
|
||||||
|
"Hi user,\nYour login entry user%d is going" % i)
|
||||||
|
Loading…
Reference in New Issue
Block a user