From 148fa2f9f62d04fd11d2bac74c5ec1727a13a91b Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Thu, 10 Oct 2024 14:23:25 -0300 Subject: [PATCH] fix: decode header fields for alert msg (#8034) * style: Black * chore: type hints * refactor: % to f-string * refactor: helper to decode header values * fix: decode header fields for pop-up msg * test: add tests * fix: use truthiness check We want to suppress empty strings, too... * refactor: use f-string for time formatting * test: clarify side_effect intention --- ietf/utils/mail.py | 68 ++++++++++++++++----- ietf/utils/tests.py | 143 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 195 insertions(+), 16 deletions(-) diff --git a/ietf/utils/mail.py b/ietf/utils/mail.py index 4585fdb84..541716145 100644 --- a/ietf/utils/mail.py +++ b/ietf/utils/mail.py @@ -19,11 +19,13 @@ from email.mime.multipart import MIMEMultipart from email.header import Header, decode_header from email import message_from_bytes, message_from_string from email import charset as Charset +from typing import Optional from django.conf import settings from django.contrib import messages from django.core.exceptions import ImproperlyConfigured, ValidationError from django.core.validators import validate_email +from django.http import HttpRequest from django.template.loader import render_to_string from django.template import Context,RequestContext from django.utils import timezone @@ -64,6 +66,18 @@ def add_headers(msg): msg['From'] = settings.DEFAULT_FROM_EMAIL return msg + +def decode_header_value(value: str) -> str: + """Decode a header value + + Easier-to-use wrapper around email.message.decode_header() + """ + return "".join( + part.decode(charset if charset else "utf-8") if isinstance(part, bytes) else part + for part, charset in decode_header(value) + ) + + class SMTPSomeRefusedRecipients(smtplib.SMTPException): def __init__(self, message, original_msg, refusals): @@ -251,8 +265,7 @@ def parseaddr(addr): """ - addr = ''.join( [ ( s.decode(m) if m else s.decode()) if isinstance(s, bytes) else s for (s,m) in decode_header(addr) ] ) - name, addr = simple_parseaddr(addr) + name, addr = simple_parseaddr(decode_header_value(addr)) return name, addr def excludeaddrs(addrlist, exlist): @@ -330,18 +343,45 @@ def condition_message(to, frm, subject, msg, cc, extra): msg['Message-ID'] = make_msgid() -def show_that_mail_was_sent(request,leadline,msg,bcc): - if request and request.user: - from ietf.ietfauth.utils import has_role - if has_role(request.user,['Area Director','Secretariat','IANA','RFC Editor','ISE','IAD','IRTF Chair','WG Chair','RG Chair','WG Secretary','RG Secretary']): - info = "%s at %s %s\n" % (leadline,timezone.now().strftime("%Y-%m-%d %H:%M:%S"),settings.TIME_ZONE) - info += "Subject: %s\n" % force_str(msg.get('Subject','[no subject]')) - info += "To: %s\n" % msg.get('To','[no to]') - if msg.get('Cc'): - info += "Cc: %s\n" % msg.get('Cc') - if bcc: - info += "Bcc: %s\n" % bcc - messages.info(request,info,extra_tags='preformatted',fail_silently=True) +def show_that_mail_was_sent(request: HttpRequest, leadline: str, msg: Message, bcc: Optional[str]): + if request and request.user: + from ietf.ietfauth.utils import has_role + + if has_role( + request.user, + [ + "Area Director", + "Secretariat", + "IANA", + "RFC Editor", + "ISE", + "IAD", + "IRTF Chair", + "WG Chair", + "RG Chair", + "WG Secretary", + "RG Secretary", + ], + ): + subject = decode_header_value(msg.get("Subject", "[no subject]")) + _to = decode_header_value(msg.get("To", "[no to]")) + info_lines = [ + f"{leadline} at {timezone.now():%Y-%m-%d %H:%M:%S %Z}", + f"Subject: {subject}", + f"To: {_to}", + ] + cc = msg.get("Cc", None) + if cc: + info_lines.append(f"Cc: {decode_header_value(cc)}") + if bcc: + info_lines.append(f"Bcc: {decode_header_value(bcc)}") + messages.info( + request, + "\n".join(info_lines), + extra_tags="preformatted", + fail_silently=True, + ) + def save_as_message(request, msg, bcc): by = ((request and request.user and not request.user.is_anonymous and request.user.person) diff --git a/ietf/utils/tests.py b/ietf/utils/tests.py index decdd778d..0a1986a60 100644 --- a/ietf/utils/tests.py +++ b/ietf/utils/tests.py @@ -11,10 +11,11 @@ import pytz import shutil import types -from mock import patch +from mock import call, patch from pyquery import PyQuery from typing import Dict, List # pyflakes:ignore +from email.message import Message from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -32,6 +33,7 @@ from django.template import Template # pyflakes:ignore from django.template.defaulttags import URLNode from django.template.loader import get_template, render_to_string from django.templatetags.static import StaticNode +from django.test import RequestFactory from django.urls import reverse as urlreverse import debug # pyflakes:ignore @@ -42,7 +44,15 @@ from ietf.submit.tests import submission_file from ietf.utils.draft import PlaintextDraft, getmeta from ietf.utils.fields import SearchableField from ietf.utils.log import unreachable, assertion -from ietf.utils.mail import send_mail_preformatted, send_mail_text, send_mail_mime, outbox, get_payload_text +from ietf.utils.mail import ( + send_mail_preformatted, + send_mail_text, + send_mail_mime, + outbox, + get_payload_text, + decode_header_value, + show_that_mail_was_sent, +) from ietf.utils.test_runner import get_template_paths, set_coverage_checking from ietf.utils.test_utils import TestCase, unicontent from ietf.utils.text import parse_unicode @@ -109,6 +119,135 @@ body recv = outbox[-1] self.assertEqual(recv['Fuzz'], 'bucket, monger') + +class MailUtilsTests(TestCase): + def test_decode_header_value(self): + self.assertEqual( + decode_header_value("cake"), + "cake", + "decodes simple string value", + ) + self.assertEqual( + decode_header_value("=?utf-8?b?8J+Ogg==?="), + "\U0001f382", + "decodes single utf-8-encoded part", + ) + self.assertEqual( + decode_header_value("=?utf-8?b?8J+Ogg==?= = =?macintosh?b?jYxrjg==?="), + "\U0001f382 = çåké", + "decodes a value with non-utf-8 encodings", + ) + + # Patch in a side_effect so we can distinguish values that came from decode_header_value. + @patch("ietf.utils.mail.decode_header_value", side_effect=lambda s: f"decoded-{s}") + @patch("ietf.utils.mail.messages") + def test_show_that_mail_was_sent(self, mock_messages, mock_decode_header_value): + request = RequestFactory().get("/some/path") + request.user = object() # just needs to exist + msg = Message() + msg["To"] = "to-value" + msg["Subject"] = "subject-value" + msg["Cc"] = "cc-value" + with patch("ietf.ietfauth.utils.has_role", return_value=True): + show_that_mail_was_sent(request, "mail was sent", msg, "bcc-value") + self.assertCountEqual( + mock_decode_header_value.call_args_list, + [call("to-value"), call("subject-value"), call("cc-value"), call("bcc-value")], + ) + self.assertEqual(mock_messages.info.call_args[0][0], request) + self.assertIn("mail was sent", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-cc-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-bcc-value", mock_messages.info.call_args[0][1]) + mock_messages.reset_mock() + mock_decode_header_value.reset_mock() + + # no bcc + with patch("ietf.ietfauth.utils.has_role", return_value=True): + show_that_mail_was_sent(request, "mail was sent", msg, None) + self.assertCountEqual( + mock_decode_header_value.call_args_list, + [call("to-value"), call("subject-value"), call("cc-value")], + ) + self.assertEqual(mock_messages.info.call_args[0][0], request) + self.assertIn("mail was sent", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-cc-value", mock_messages.info.call_args[0][1]) + # Note: here and below - when using assertNotIn(), leaving off the "decoded-" prefix + # proves that neither the original value nor the decoded value appear. + self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1]) + mock_messages.reset_mock() + mock_decode_header_value.reset_mock() + + # no cc + del msg["Cc"] + with patch("ietf.ietfauth.utils.has_role", return_value=True): + show_that_mail_was_sent(request, "mail was sent", msg, None) + self.assertCountEqual( + mock_decode_header_value.call_args_list, + [call("to-value"), call("subject-value")], + ) + self.assertEqual(mock_messages.info.call_args[0][0], request) + self.assertIn("mail was sent", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1]) + self.assertNotIn("cc-value", mock_messages.info.call_args[0][1]) + self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1]) + mock_messages.reset_mock() + mock_decode_header_value.reset_mock() + + # no to + del msg["To"] + with patch("ietf.ietfauth.utils.has_role", return_value=True): + show_that_mail_was_sent(request, "mail was sent", msg, None) + self.assertCountEqual( + mock_decode_header_value.call_args_list, + [call("[no to]"), call("subject-value")], + ) + self.assertEqual(mock_messages.info.call_args[0][0], request) + self.assertIn("mail was sent", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-[no to]", mock_messages.info.call_args[0][1]) + self.assertNotIn("to-value", mock_messages.info.call_args[0][1]) + self.assertNotIn("cc-value", mock_messages.info.call_args[0][1]) + self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1]) + mock_messages.reset_mock() + mock_decode_header_value.reset_mock() + + # no subject + del msg["Subject"] + with patch("ietf.ietfauth.utils.has_role", return_value=True): + show_that_mail_was_sent(request, "mail was sent", msg, None) + self.assertCountEqual( + mock_decode_header_value.call_args_list, + [call("[no to]"), call("[no subject]")], + ) + self.assertEqual(mock_messages.info.call_args[0][0], request) + self.assertIn("mail was sent", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-[no subject]", mock_messages.info.call_args[0][1]) + self.assertNotIn("subject-value", mock_messages.info.call_args[0][1]) + self.assertIn("decoded-[no to]", mock_messages.info.call_args[0][1]) + self.assertNotIn("to-value", mock_messages.info.call_args[0][1]) + self.assertNotIn("cc-value", mock_messages.info.call_args[0][1]) + self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1]) + mock_messages.reset_mock() + mock_decode_header_value.reset_mock() + + # user does not have role + with patch("ietf.ietfauth.utils.has_role", return_value=False): + show_that_mail_was_sent(request, "mail was sent", msg, None) + self.assertFalse(mock_messages.called) + + # no user + request.user = None + with patch("ietf.ietfauth.utils.has_role", return_value=True) as mock_has_role: + show_that_mail_was_sent(request, "mail was sent", msg, None) + self.assertFalse(mock_messages.called) + self.assertFalse(mock_has_role.called) + + class TestSMTPServer(TestCase): def test_address_rejected(self):