fix: decode header fields for alert msg (#8034)

* style: Black

* chore: type hints

* refactor: % to f-string

* refactor: helper to decode header values

* fix: decode header fields for pop-up msg

* test: add tests

* fix: use truthiness check

We want to suppress empty strings, too...

* refactor: use f-string for time formatting

* test: clarify side_effect intention
This commit is contained in:
Jennifer Richards 2024-10-10 14:23:25 -03:00 committed by GitHub
parent f7e0a67095
commit 148fa2f9f6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 195 additions and 16 deletions

View file

@ -19,11 +19,13 @@ from email.mime.multipart import MIMEMultipart
from email.header import Header, decode_header from email.header import Header, decode_header
from email import message_from_bytes, message_from_string from email import message_from_bytes, message_from_string
from email import charset as Charset from email import charset as Charset
from typing import Optional
from django.conf import settings from django.conf import settings
from django.contrib import messages from django.contrib import messages
from django.core.exceptions import ImproperlyConfigured, ValidationError from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.core.validators import validate_email from django.core.validators import validate_email
from django.http import HttpRequest
from django.template.loader import render_to_string from django.template.loader import render_to_string
from django.template import Context,RequestContext from django.template import Context,RequestContext
from django.utils import timezone from django.utils import timezone
@ -64,6 +66,18 @@ def add_headers(msg):
msg['From'] = settings.DEFAULT_FROM_EMAIL msg['From'] = settings.DEFAULT_FROM_EMAIL
return msg return msg
def decode_header_value(value: str) -> str:
"""Decode a header value
Easier-to-use wrapper around email.message.decode_header()
"""
return "".join(
part.decode(charset if charset else "utf-8") if isinstance(part, bytes) else part
for part, charset in decode_header(value)
)
class SMTPSomeRefusedRecipients(smtplib.SMTPException): class SMTPSomeRefusedRecipients(smtplib.SMTPException):
def __init__(self, message, original_msg, refusals): def __init__(self, message, original_msg, refusals):
@ -251,8 +265,7 @@ def parseaddr(addr):
""" """
addr = ''.join( [ ( s.decode(m) if m else s.decode()) if isinstance(s, bytes) else s for (s,m) in decode_header(addr) ] ) name, addr = simple_parseaddr(decode_header_value(addr))
name, addr = simple_parseaddr(addr)
return name, addr return name, addr
def excludeaddrs(addrlist, exlist): def excludeaddrs(addrlist, exlist):
@ -330,18 +343,45 @@ def condition_message(to, frm, subject, msg, cc, extra):
msg['Message-ID'] = make_msgid() msg['Message-ID'] = make_msgid()
def show_that_mail_was_sent(request,leadline,msg,bcc): def show_that_mail_was_sent(request: HttpRequest, leadline: str, msg: Message, bcc: Optional[str]):
if request and request.user: if request and request.user:
from ietf.ietfauth.utils import has_role from ietf.ietfauth.utils import has_role
if has_role(request.user,['Area Director','Secretariat','IANA','RFC Editor','ISE','IAD','IRTF Chair','WG Chair','RG Chair','WG Secretary','RG Secretary']):
info = "%s at %s %s\n" % (leadline,timezone.now().strftime("%Y-%m-%d %H:%M:%S"),settings.TIME_ZONE) if has_role(
info += "Subject: %s\n" % force_str(msg.get('Subject','[no subject]')) request.user,
info += "To: %s\n" % msg.get('To','[no to]') [
if msg.get('Cc'): "Area Director",
info += "Cc: %s\n" % msg.get('Cc') "Secretariat",
if bcc: "IANA",
info += "Bcc: %s\n" % bcc "RFC Editor",
messages.info(request,info,extra_tags='preformatted',fail_silently=True) "ISE",
"IAD",
"IRTF Chair",
"WG Chair",
"RG Chair",
"WG Secretary",
"RG Secretary",
],
):
subject = decode_header_value(msg.get("Subject", "[no subject]"))
_to = decode_header_value(msg.get("To", "[no to]"))
info_lines = [
f"{leadline} at {timezone.now():%Y-%m-%d %H:%M:%S %Z}",
f"Subject: {subject}",
f"To: {_to}",
]
cc = msg.get("Cc", None)
if cc:
info_lines.append(f"Cc: {decode_header_value(cc)}")
if bcc:
info_lines.append(f"Bcc: {decode_header_value(bcc)}")
messages.info(
request,
"\n".join(info_lines),
extra_tags="preformatted",
fail_silently=True,
)
def save_as_message(request, msg, bcc): def save_as_message(request, msg, bcc):
by = ((request and request.user and not request.user.is_anonymous and request.user.person) by = ((request and request.user and not request.user.is_anonymous and request.user.person)

View file

@ -11,10 +11,11 @@ import pytz
import shutil import shutil
import types import types
from mock import patch from mock import call, patch
from pyquery import PyQuery from pyquery import PyQuery
from typing import Dict, List # pyflakes:ignore from typing import Dict, List # pyflakes:ignore
from email.message import Message
from email.mime.image import MIMEImage from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
@ -32,6 +33,7 @@ from django.template import Template # pyflakes:ignore
from django.template.defaulttags import URLNode from django.template.defaulttags import URLNode
from django.template.loader import get_template, render_to_string from django.template.loader import get_template, render_to_string
from django.templatetags.static import StaticNode from django.templatetags.static import StaticNode
from django.test import RequestFactory
from django.urls import reverse as urlreverse from django.urls import reverse as urlreverse
import debug # pyflakes:ignore import debug # pyflakes:ignore
@ -42,7 +44,15 @@ from ietf.submit.tests import submission_file
from ietf.utils.draft import PlaintextDraft, getmeta from ietf.utils.draft import PlaintextDraft, getmeta
from ietf.utils.fields import SearchableField from ietf.utils.fields import SearchableField
from ietf.utils.log import unreachable, assertion from ietf.utils.log import unreachable, assertion
from ietf.utils.mail import send_mail_preformatted, send_mail_text, send_mail_mime, outbox, get_payload_text from ietf.utils.mail import (
send_mail_preformatted,
send_mail_text,
send_mail_mime,
outbox,
get_payload_text,
decode_header_value,
show_that_mail_was_sent,
)
from ietf.utils.test_runner import get_template_paths, set_coverage_checking from ietf.utils.test_runner import get_template_paths, set_coverage_checking
from ietf.utils.test_utils import TestCase, unicontent from ietf.utils.test_utils import TestCase, unicontent
from ietf.utils.text import parse_unicode from ietf.utils.text import parse_unicode
@ -109,6 +119,135 @@ body
recv = outbox[-1] recv = outbox[-1]
self.assertEqual(recv['Fuzz'], 'bucket, monger') self.assertEqual(recv['Fuzz'], 'bucket, monger')
class MailUtilsTests(TestCase):
def test_decode_header_value(self):
self.assertEqual(
decode_header_value("cake"),
"cake",
"decodes simple string value",
)
self.assertEqual(
decode_header_value("=?utf-8?b?8J+Ogg==?="),
"\U0001f382",
"decodes single utf-8-encoded part",
)
self.assertEqual(
decode_header_value("=?utf-8?b?8J+Ogg==?= = =?macintosh?b?jYxrjg==?="),
"\U0001f382 = çåké",
"decodes a value with non-utf-8 encodings",
)
# Patch in a side_effect so we can distinguish values that came from decode_header_value.
@patch("ietf.utils.mail.decode_header_value", side_effect=lambda s: f"decoded-{s}")
@patch("ietf.utils.mail.messages")
def test_show_that_mail_was_sent(self, mock_messages, mock_decode_header_value):
request = RequestFactory().get("/some/path")
request.user = object() # just needs to exist
msg = Message()
msg["To"] = "to-value"
msg["Subject"] = "subject-value"
msg["Cc"] = "cc-value"
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, "bcc-value")
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("to-value"), call("subject-value"), call("cc-value"), call("bcc-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-cc-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()
# no bcc
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("to-value"), call("subject-value"), call("cc-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-cc-value", mock_messages.info.call_args[0][1])
# Note: here and below - when using assertNotIn(), leaving off the "decoded-" prefix
# proves that neither the original value nor the decoded value appear.
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()
# no cc
del msg["Cc"]
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("to-value"), call("subject-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1])
self.assertNotIn("cc-value", mock_messages.info.call_args[0][1])
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()
# no to
del msg["To"]
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("[no to]"), call("subject-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-[no to]", mock_messages.info.call_args[0][1])
self.assertNotIn("to-value", mock_messages.info.call_args[0][1])
self.assertNotIn("cc-value", mock_messages.info.call_args[0][1])
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()
# no subject
del msg["Subject"]
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("[no to]"), call("[no subject]")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-[no subject]", mock_messages.info.call_args[0][1])
self.assertNotIn("subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-[no to]", mock_messages.info.call_args[0][1])
self.assertNotIn("to-value", mock_messages.info.call_args[0][1])
self.assertNotIn("cc-value", mock_messages.info.call_args[0][1])
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()
# user does not have role
with patch("ietf.ietfauth.utils.has_role", return_value=False):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertFalse(mock_messages.called)
# no user
request.user = None
with patch("ietf.ietfauth.utils.has_role", return_value=True) as mock_has_role:
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertFalse(mock_messages.called)
self.assertFalse(mock_has_role.called)
class TestSMTPServer(TestCase): class TestSMTPServer(TestCase):
def test_address_rejected(self): def test_address_rejected(self):