Merged in [7591] from rjsparks@nostrum.com:

Adds a real (if simple) SMTP server to the test framework and tests handling of exceptions and rejected addresses. Fixes ticket #1314.
 - Legacy-Id: 7613
Note: SVN reference [7591] has been migrated to Git commit 54919f01343995ab154d27dcfaf7a60b15dd5eee
This commit is contained in:
Henrik Levkowetz 2014-04-16 19:40:57 +00:00
parent fc0c605f5b
commit fe5d53fc40
4 changed files with 123 additions and 8 deletions

View file

@ -28,6 +28,8 @@ from contextlib import contextmanager
test_mode = False
outbox = []
SMTP_ADDR = { 'ip4':settings.EMAIL_HOST, 'port':settings.EMAIL_PORT}
def empty_outbox():
outbox[:] = []
@ -63,7 +65,7 @@ def send_smtp(msg, bcc=None):
Message. The From address will be used if present or will default
to the django setting DEFAULT_FROM_EMAIL
If someone has set test_mode=True, then just append the msg to
If someone has set test_mode=True, then append the msg to
the outbox.
'''
add_headers(msg)
@ -77,14 +79,13 @@ def send_smtp(msg, bcc=None):
else:
if test_mode:
outbox.append(msg)
return
server = None
try:
server = smtplib.SMTP()
#log("SMTP server: %s" % repr(server))
#if settings.DEBUG:
# server.set_debuglevel(1)
conn_code, conn_msg = server.connect(settings.EMAIL_HOST, settings.EMAIL_PORT)
conn_code, conn_msg = server.connect(SMTP_ADDR['ip4'], SMTP_ADDR['port'])
#log("SMTP connect: code: %s; msg: %s" % (conn_code, conn_msg))
if settings.EMAIL_HOST_USER and settings.EMAIL_HOST_PASSWORD:
server.ehlo()

View file

@ -41,6 +41,7 @@ from django.test.runner import DiscoverRunner
from django.core.management import call_command
import ietf.utils.mail
from ietf.utils.test_smtpserver import SMTPTestServerDriver
loaded_templates = set()
visited_urls = set()
@ -192,6 +193,8 @@ class IetfTestRunner(DiscoverRunner):
if socket.gethostname().split('.')[0] in ['core3', 'ietfa', 'ietfb', 'ietfc', ]:
raise EnvironmentError("Refusing to run tests on production server")
ietf.utils.mail.test_mode = True
ietf.utils.mail.SMTP_ADDR['ip4'] = '127.0.0.1'
ietf.utils.mail.SMTP_ADDR['port'] = 2025
global old_destroy, old_create, test_database_name
from django.db import connection
@ -219,7 +222,13 @@ class IetfTestRunner(DiscoverRunner):
assert not settings.IDTRACKER_BASE_URL.endswith('/')
failures = super(IetfTestRunner, self).run_tests(test_labels, extra_tests=extra_tests, **kwargs)
smtpd_driver = SMTPTestServerDriver((ietf.utils.mail.SMTP_ADDR['ip4'],ietf.utils.mail.SMTP_ADDR['port']),None)
smtpd_driver.start()
try:
failures = super(IetfTestRunner, self).run_tests(test_labels, extra_tests=extra_tests, **kwargs)
finally:
smtpd_driver.stop()
if check_coverage and not failures:
check_template_coverage(self.verbosity)

View file

@ -0,0 +1,80 @@
import smtpd
import threading
import asyncore
class AsyncCoreLoopThread(object):
def wrap_loop(self, exit_condition, timeout=1.0, use_poll=False, map=None):
if map is None:
map = asyncore.socket_map
while map and not exit_condition:
asyncore.loop(timeout=1.0, use_poll=False, map=map, count=1)
def start(self):
"""Start the listening service"""
self.exit_condition = []
kwargs={'exit_condition':self.exit_condition,'timeout':1.0}
self.thread = threading.Thread(target=self.wrap_loop,kwargs=kwargs )
self.thread.start()
def stop(self):
"""Stop the listening service"""
self.exit_condition.append(True)
self.thread.join()
class SMTPTestChannel(smtpd.SMTPChannel):
def smtp_RCPT(self, arg):
if not self._SMTPChannel__mailfrom:
self.push('503 Error: need MAIL command')
return
address = self._SMTPChannel__getaddr('TO:', arg) if arg else None
if not address:
self.push('501 Syntax: RCPT TO: <address>')
return
if "poison" in address:
self.push('550 Error: Not touching that')
return
self._SMTPChannel__rcpttos.append(address)
self.push('250 Ok')
class SMTPTestServer(smtpd.SMTPServer):
def __init__(self,localaddr,remoteaddr,inbox):
if inbox is not None:
self.inbox=inbox
else:
self.inbox = []
smtpd.SMTPServer.__init__(self,localaddr,remoteaddr)
def handle_accept(self):
pair = self.accept()
if pair is not None:
conn, addr = pair
#channel = SMTPTestChannel(self, conn, addr)
SMTPTestChannel(self, conn, addr)
def process_message(self, peer, mailfrom, rcpttos, data):
self.inbox.append(data)
class SMTPTestServerDriver(object):
def __init__(self, localaddr, remoteaddr, inbox=None):
self.localaddr=localaddr
self.remoteaddr=remoteaddr
if inbox is not None:
self.inbox = inbox
else:
self.inbox = []
self.thread_driver = None
def start(self):
self.smtpserver = SMTPTestServer(self.localaddr,self.remoteaddr,self.inbox)
self.thread_driver = AsyncCoreLoopThread()
self.thread_driver.start()
def stop(self):
if self.thread_driver:
self.thread_driver.stop()

View file

@ -1,12 +1,11 @@
from __future__ import print_function
import os.path
from smtplib import SMTPRecipientsRefused
from django.conf import settings
from django.test import TestCase
from ietf.utils.management.commands import pyflakes
from ietf.utils.test_utils import TestCase
from ietf.utils.mail import send_mail_text, outbox, SMTPSomeRefusedRecipients, smtp_error_logging
class PyFlakesTestCase(TestCase):
@ -15,3 +14,29 @@ class PyFlakesTestCase(TestCase):
warnings = []
warnings = pyflakes.checkPaths([path], verbosity=0)
self.assertEqual([str(w) for w in warnings], [])
class TestSMTPServer(TestCase):
def test_address_rejected(self):
def send_mail(to):
send_mail_text(None, to=to, frm=None, subject="Test for rejection", txt="dummy body")
with self.assertRaises(SMTPSomeRefusedRecipients):
send_mail('good@example.com,poison@example.com')
with self.assertRaises(SMTPRecipientsRefused):
send_mail('poison@example.com')
len_before = len(outbox)
with smtp_error_logging(send_mail) as send:
send('good@example.com,poison@example.com')
self.assertEqual(len(outbox),len_before+2)
self.assertTrue('Some recipients were refused' in outbox[-1]['Subject'])
len_before = len(outbox)
with smtp_error_logging(send_mail) as send:
send('poison@example.com')
self.assertEqual(len(outbox),len_before+2)
self.assertTrue('error while sending email' in outbox[-1]['Subject'])