diff --git a/ietf/utils/mail.py b/ietf/utils/mail.py index 7fc7a521d..3299b8523 100644 --- a/ietf/utils/mail.py +++ b/ietf/utils/mail.py @@ -28,6 +28,8 @@ from contextlib import contextmanager test_mode = False outbox = [] +SMTP_ADDR = { 'ip4':settings.EMAIL_HOST, 'port':settings.EMAIL_PORT} + def empty_outbox(): outbox[:] = [] @@ -63,7 +65,7 @@ def send_smtp(msg, bcc=None): Message. The From address will be used if present or will default to the django setting DEFAULT_FROM_EMAIL - If someone has set test_mode=True, then just append the msg to + If someone has set test_mode=True, then append the msg to the outbox. ''' add_headers(msg) @@ -77,14 +79,13 @@ def send_smtp(msg, bcc=None): else: if test_mode: outbox.append(msg) - return server = None try: server = smtplib.SMTP() #log("SMTP server: %s" % repr(server)) #if settings.DEBUG: # server.set_debuglevel(1) - conn_code, conn_msg = server.connect(settings.EMAIL_HOST, settings.EMAIL_PORT) + conn_code, conn_msg = server.connect(SMTP_ADDR['ip4'], SMTP_ADDR['port']) #log("SMTP connect: code: %s; msg: %s" % (conn_code, conn_msg)) if settings.EMAIL_HOST_USER and settings.EMAIL_HOST_PASSWORD: server.ehlo() diff --git a/ietf/utils/test_runner.py b/ietf/utils/test_runner.py index 1b1ec22f1..882cf4012 100644 --- a/ietf/utils/test_runner.py +++ b/ietf/utils/test_runner.py @@ -41,6 +41,7 @@ from django.test.runner import DiscoverRunner from django.core.management import call_command import ietf.utils.mail +from ietf.utils.test_smtpserver import SMTPTestServerDriver loaded_templates = set() visited_urls = set() @@ -192,6 +193,8 @@ class IetfTestRunner(DiscoverRunner): if socket.gethostname().split('.')[0] in ['core3', 'ietfa', 'ietfb', 'ietfc', ]: raise EnvironmentError("Refusing to run tests on production server") ietf.utils.mail.test_mode = True + ietf.utils.mail.SMTP_ADDR['ip4'] = '127.0.0.1' + ietf.utils.mail.SMTP_ADDR['port'] = 2025 global old_destroy, old_create, test_database_name from django.db import connection @@ -219,7 +222,13 @@ class IetfTestRunner(DiscoverRunner): assert not settings.IDTRACKER_BASE_URL.endswith('/') - failures = super(IetfTestRunner, self).run_tests(test_labels, extra_tests=extra_tests, **kwargs) + smtpd_driver = SMTPTestServerDriver((ietf.utils.mail.SMTP_ADDR['ip4'],ietf.utils.mail.SMTP_ADDR['port']),None) + smtpd_driver.start() + + try: + failures = super(IetfTestRunner, self).run_tests(test_labels, extra_tests=extra_tests, **kwargs) + finally: + smtpd_driver.stop() if check_coverage and not failures: check_template_coverage(self.verbosity) diff --git a/ietf/utils/test_smtpserver.py b/ietf/utils/test_smtpserver.py new file mode 100644 index 000000000..1475c9637 --- /dev/null +++ b/ietf/utils/test_smtpserver.py @@ -0,0 +1,80 @@ +import smtpd +import threading +import asyncore + +class AsyncCoreLoopThread(object): + + def wrap_loop(self, exit_condition, timeout=1.0, use_poll=False, map=None): + if map is None: + map = asyncore.socket_map + while map and not exit_condition: + asyncore.loop(timeout=1.0, use_poll=False, map=map, count=1) + + def start(self): + """Start the listening service""" + self.exit_condition = [] + kwargs={'exit_condition':self.exit_condition,'timeout':1.0} + self.thread = threading.Thread(target=self.wrap_loop,kwargs=kwargs ) + self.thread.start() + + def stop(self): + """Stop the listening service""" + self.exit_condition.append(True) + self.thread.join() + + +class SMTPTestChannel(smtpd.SMTPChannel): + + def smtp_RCPT(self, arg): + if not self._SMTPChannel__mailfrom: + self.push('503 Error: need MAIL command') + return + address = self._SMTPChannel__getaddr('TO:', arg) if arg else None + if not address: + self.push('501 Syntax: RCPT TO:
') + return + if "poison" in address: + self.push('550 Error: Not touching that') + return + self._SMTPChannel__rcpttos.append(address) + self.push('250 Ok') + +class SMTPTestServer(smtpd.SMTPServer): + + def __init__(self,localaddr,remoteaddr,inbox): + if inbox is not None: + self.inbox=inbox + else: + self.inbox = [] + smtpd.SMTPServer.__init__(self,localaddr,remoteaddr) + + def handle_accept(self): + pair = self.accept() + if pair is not None: + conn, addr = pair + #channel = SMTPTestChannel(self, conn, addr) + SMTPTestChannel(self, conn, addr) + + def process_message(self, peer, mailfrom, rcpttos, data): + self.inbox.append(data) + + +class SMTPTestServerDriver(object): + def __init__(self, localaddr, remoteaddr, inbox=None): + self.localaddr=localaddr + self.remoteaddr=remoteaddr + if inbox is not None: + self.inbox = inbox + else: + self.inbox = [] + self.thread_driver = None + + def start(self): + self.smtpserver = SMTPTestServer(self.localaddr,self.remoteaddr,self.inbox) + self.thread_driver = AsyncCoreLoopThread() + self.thread_driver.start() + + def stop(self): + if self.thread_driver: + self.thread_driver.stop() + diff --git a/ietf/utils/tests.py b/ietf/utils/tests.py index c6e087128..02a0c8067 100644 --- a/ietf/utils/tests.py +++ b/ietf/utils/tests.py @@ -1,12 +1,11 @@ -from __future__ import print_function - import os.path +from smtplib import SMTPRecipientsRefused from django.conf import settings +from django.test import TestCase from ietf.utils.management.commands import pyflakes -from ietf.utils.test_utils import TestCase - +from ietf.utils.mail import send_mail_text, outbox, SMTPSomeRefusedRecipients, smtp_error_logging class PyFlakesTestCase(TestCase): @@ -15,3 +14,29 @@ class PyFlakesTestCase(TestCase): warnings = [] warnings = pyflakes.checkPaths([path], verbosity=0) self.assertEqual([str(w) for w in warnings], []) + +class TestSMTPServer(TestCase): + + def test_address_rejected(self): + + def send_mail(to): + send_mail_text(None, to=to, frm=None, subject="Test for rejection", txt="dummy body") + + with self.assertRaises(SMTPSomeRefusedRecipients): + send_mail('good@example.com,poison@example.com') + + with self.assertRaises(SMTPRecipientsRefused): + send_mail('poison@example.com') + + len_before = len(outbox) + with smtp_error_logging(send_mail) as send: + send('good@example.com,poison@example.com') + self.assertEqual(len(outbox),len_before+2) + self.assertTrue('Some recipients were refused' in outbox[-1]['Subject']) + + len_before = len(outbox) + with smtp_error_logging(send_mail) as send: + send('poison@example.com') + self.assertEqual(len(outbox),len_before+2) + self.assertTrue('error while sending email' in outbox[-1]['Subject']) + \ No newline at end of file