ci: merge main to release (#8091)
This commit is contained in:
commit
9e46a0581d
|
@ -33,9 +33,8 @@ from ietf.meeting.factories import MeetingFactory, SessionFactory
|
|||
from ietf.meeting.models import Session
|
||||
from ietf.nomcom.models import Volunteer
|
||||
from ietf.nomcom.factories import NomComFactory, nomcom_kwargs_for_year
|
||||
from ietf.person.factories import PersonFactory, random_faker, EmailFactory
|
||||
from ietf.person.factories import PersonFactory, random_faker, EmailFactory, PersonalApiKeyFactory
|
||||
from ietf.person.models import Email, User
|
||||
from ietf.person.models import PersonalApiKey
|
||||
from ietf.stats.models import MeetingRegistration
|
||||
from ietf.utils.mail import empty_outbox, outbox, get_payload_text
|
||||
from ietf.utils.models import DumpInfo
|
||||
|
@ -71,7 +70,7 @@ class CustomApiTests(TestCase):
|
|||
meeting = MeetingFactory(type_id='ietf')
|
||||
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
||||
group = session.group
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
video = 'https://foo.example.com/bar/beer/'
|
||||
|
||||
# error cases
|
||||
|
@ -79,7 +78,7 @@ class CustomApiTests(TestCase):
|
|||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
||||
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
r = self.client.post(url, {'apikey': badapikey.hash()} )
|
||||
|
@ -151,7 +150,7 @@ class CustomApiTests(TestCase):
|
|||
recman = recmanrole.person
|
||||
meeting = MeetingFactory(type_id="ietf")
|
||||
session = SessionFactory(group__type_id="wg", meeting=meeting)
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
video = "https://foo.example.com/bar/beer/"
|
||||
|
||||
# error cases
|
||||
|
@ -159,7 +158,7 @@ class CustomApiTests(TestCase):
|
|||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
||||
badrole = RoleFactory(group__type_id="ietf", name_id="ad")
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
r = self.client.post(url, {"apikey": badapikey.hash()})
|
||||
|
@ -228,7 +227,7 @@ class CustomApiTests(TestCase):
|
|||
recman = recmanrole.person
|
||||
meeting = MeetingFactory(type_id="ietf")
|
||||
session = SessionFactory(group__type_id="wg", meeting=meeting)
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
name = "testname"
|
||||
|
||||
# error cases
|
||||
|
@ -236,7 +235,7 @@ class CustomApiTests(TestCase):
|
|||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
||||
badrole = RoleFactory(group__type_id="ietf", name_id="ad")
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
r = self.client.post(url, {"apikey": badapikey.hash()})
|
||||
|
@ -295,10 +294,10 @@ class CustomApiTests(TestCase):
|
|||
recman = recmanrole.person
|
||||
meeting = MeetingFactory(type_id='ietf')
|
||||
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
|
||||
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
|
||||
|
@ -361,10 +360,10 @@ class CustomApiTests(TestCase):
|
|||
recman = recmanrole.person
|
||||
meeting = MeetingFactory(type_id="ietf")
|
||||
session = SessionFactory(group__type_id="wg", meeting=meeting)
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
|
||||
badrole = RoleFactory(group__type_id="ietf", name_id="ad")
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
|
||||
|
@ -517,8 +516,8 @@ class CustomApiTests(TestCase):
|
|||
),
|
||||
):
|
||||
url = urlreverse(f"ietf.meeting.views.api_upload_{type_id}")
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recmanrole.person)
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recmanrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
|
||||
r = self.client.post(url, {})
|
||||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
@ -562,7 +561,7 @@ class CustomApiTests(TestCase):
|
|||
meeting = MeetingFactory(type_id='ietf')
|
||||
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
||||
group = session.group
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
|
||||
people = [
|
||||
{"name": "Andrea Andreotti", "affiliation": "Azienda"},
|
||||
|
@ -579,7 +578,7 @@ class CustomApiTests(TestCase):
|
|||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
||||
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
r = self.client.post(url, {'apikey': badapikey.hash()})
|
||||
|
@ -654,7 +653,7 @@ class CustomApiTests(TestCase):
|
|||
meeting = MeetingFactory(type_id="ietf")
|
||||
session = SessionFactory(group__type_id="wg", meeting=meeting)
|
||||
group = session.group
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=recman)
|
||||
|
||||
people = [
|
||||
{"name": "Andrea Andreotti", "affiliation": "Azienda"},
|
||||
|
@ -671,7 +670,7 @@ class CustomApiTests(TestCase):
|
|||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
||||
badrole = RoleFactory(group__type_id="ietf", name_id="ad")
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
r = self.client.post(url, {"apikey": badapikey.hash()})
|
||||
|
@ -781,14 +780,14 @@ class CustomApiTests(TestCase):
|
|||
url = urlreverse('ietf.api.views.ApiV2PersonExportView')
|
||||
robot = PersonFactory(user__is_staff=True)
|
||||
RoleFactory(name_id='robot', person=robot, email=robot.email(), group__acronym='secretariat')
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=robot)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=robot)
|
||||
|
||||
# error cases
|
||||
r = self.client.post(url, {})
|
||||
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
||||
|
||||
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
||||
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
||||
badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person)
|
||||
badrole.person.user.last_login = timezone.now()
|
||||
badrole.person.user.save()
|
||||
r = self.client.post(url, {'apikey': badapikey.hash()})
|
||||
|
@ -827,7 +826,7 @@ class CustomApiTests(TestCase):
|
|||
oidcp = PersonFactory(user__is_staff=True)
|
||||
# Make sure 'oidcp' has an acceptable role
|
||||
RoleFactory(name_id='robot', person=oidcp, email=oidcp.email(), group__acronym='secretariat')
|
||||
key = PersonalApiKey.objects.create(person=oidcp, endpoint=url)
|
||||
key = PersonalApiKeyFactory(person=oidcp, endpoint=url)
|
||||
reg['apikey'] = key.hash()
|
||||
#
|
||||
# Test valid POST
|
||||
|
@ -911,7 +910,7 @@ class CustomApiTests(TestCase):
|
|||
oidcp = PersonFactory(user__is_staff=True)
|
||||
# Make sure 'oidcp' has an acceptable role
|
||||
RoleFactory(name_id='robot', person=oidcp, email=oidcp.email(), group__acronym='secretariat')
|
||||
key = PersonalApiKey.objects.create(person=oidcp, endpoint=url)
|
||||
key = PersonalApiKeyFactory(person=oidcp, endpoint=url)
|
||||
reg['apikey'] = key.hash()
|
||||
|
||||
# first test is_nomcom_volunteer False
|
||||
|
@ -945,28 +944,30 @@ class CustomApiTests(TestCase):
|
|||
|
||||
|
||||
def test_api_appauth(self):
|
||||
url = urlreverse('ietf.api.views.app_auth')
|
||||
person = PersonFactory()
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=person)
|
||||
|
||||
self.client.login(username=person.user.username,password=f'{person.user.username}+password')
|
||||
self.client.logout()
|
||||
|
||||
# error cases
|
||||
# missing apikey
|
||||
r = self.client.post(url, {})
|
||||
self.assertContains(r, 'Missing apikey parameter', status_code=400)
|
||||
|
||||
# invalid apikey
|
||||
r = self.client.post(url, {'apikey': 'foobar'})
|
||||
self.assertContains(r, 'Invalid apikey', status_code=403)
|
||||
|
||||
# working case
|
||||
r = self.client.post(url, {'apikey': apikey.hash()})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
jsondata = r.json()
|
||||
self.assertEqual(jsondata['success'], True)
|
||||
for app in ["authortools", "bibxml"]:
|
||||
url = urlreverse('ietf.api.views.app_auth', kwargs={"app": app})
|
||||
person = PersonFactory()
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=person)
|
||||
|
||||
self.client.login(username=person.user.username,password=f'{person.user.username}+password')
|
||||
self.client.logout()
|
||||
|
||||
# error cases
|
||||
# missing apikey
|
||||
r = self.client.post(url, {})
|
||||
self.assertContains(r, 'Missing apikey parameter', status_code=400)
|
||||
|
||||
# invalid apikey
|
||||
r = self.client.post(url, {'apikey': 'foobar'})
|
||||
self.assertContains(r, 'Invalid apikey', status_code=403)
|
||||
|
||||
# working case
|
||||
r = self.client.post(url, {'apikey': apikey.hash()})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
jsondata = r.json()
|
||||
self.assertEqual(jsondata['success'], True)
|
||||
self.client.logout()
|
||||
|
||||
def test_api_get_session_matherials_no_agenda_meeting_url(self):
|
||||
meeting = MeetingFactory(type_id='ietf')
|
||||
session = SessionFactory(meeting=meeting)
|
||||
|
|
|
@ -69,7 +69,7 @@ urlpatterns = [
|
|||
# Datatracker version
|
||||
url(r'^version/?$', api_views.version),
|
||||
# Application authentication API key
|
||||
url(r'^appauth/[authortools|bibxml]', api_views.app_auth),
|
||||
url(r'^appauth/(?P<app>authortools|bibxml)$', api_views.app_auth),
|
||||
# latest versions
|
||||
url(r'^rfcdiff-latest-json/%(name)s(?:-%(rev)s)?(\.txt|\.html)?/?$' % settings.URL_REGEXPS, api_views.rfcdiff_latest_json),
|
||||
url(r'^rfcdiff-latest-json/(?P<name>[Rr][Ff][Cc] [0-9]+?)(\.txt|\.html)?/?$', api_views.rfcdiff_latest_json),
|
||||
|
|
|
@ -30,7 +30,7 @@ from tastypie.utils import is_valid_jsonp_callback_value
|
|||
from tastypie.utils.mime import determine_format, build_content_type
|
||||
from textwrap import dedent
|
||||
from traceback import format_exception, extract_tb
|
||||
from typing import Iterable, Optional
|
||||
from typing import Iterable, Optional, Literal
|
||||
|
||||
import ietf
|
||||
from ietf.api import _api_list
|
||||
|
@ -251,7 +251,7 @@ def version(request):
|
|||
|
||||
@require_api_key
|
||||
@csrf_exempt
|
||||
def app_auth(request):
|
||||
def app_auth(request, app: Literal["authortools", "bibxml"]):
|
||||
return HttpResponse(
|
||||
json.dumps({'success': True}),
|
||||
content_type='application/json')
|
||||
|
|
|
@ -27,8 +27,8 @@ from ietf.group.factories import GroupFactory, RoleFactory, ReviewTeamFactory
|
|||
from ietf.ipr.factories import HolderIprDisclosureFactory
|
||||
from ietf.name.models import BallotPositionName
|
||||
from ietf.iesg.models import TelechatDate
|
||||
from ietf.person.models import Person, PersonalApiKey
|
||||
from ietf.person.factories import PersonFactory
|
||||
from ietf.person.models import Person
|
||||
from ietf.person.factories import PersonFactory, PersonalApiKeyFactory
|
||||
from ietf.person.utils import get_active_ads
|
||||
from ietf.utils.test_utils import TestCase, login_testing_unauthorized
|
||||
from ietf.utils.mail import outbox, empty_outbox, get_payload_text
|
||||
|
@ -111,7 +111,7 @@ class EditPositionTests(TestCase):
|
|||
create_ballot_if_not_open(None, draft, ad, 'approve')
|
||||
ad.user.last_login = timezone.now()
|
||||
ad.user.save()
|
||||
apikey = PersonalApiKey.objects.create(endpoint=url, person=ad)
|
||||
apikey = PersonalApiKeyFactory(endpoint=url, person=ad)
|
||||
|
||||
# vote
|
||||
events_before = draft.docevent_set.count()
|
||||
|
|
|
@ -35,7 +35,7 @@ from ietf.ietfauth.utils import has_role
|
|||
from ietf.meeting.factories import MeetingFactory
|
||||
from ietf.nomcom.factories import NomComFactory
|
||||
from ietf.person.factories import PersonFactory, EmailFactory, UserFactory, PersonalApiKeyFactory
|
||||
from ietf.person.models import Person, Email, PersonalApiKey
|
||||
from ietf.person.models import Person, Email
|
||||
from ietf.person.tasks import send_apikey_usage_emails_task
|
||||
from ietf.review.factories import ReviewRequestFactory, ReviewAssignmentFactory
|
||||
from ietf.review.models import ReviewWish, UnavailablePeriod
|
||||
|
@ -788,9 +788,8 @@ class IetfAuthTests(TestCase):
|
|||
self.assertContains(r, 'Invalid apikey', status_code=403)
|
||||
|
||||
# invalid apikey (invalidated api key)
|
||||
unauthorized_url = urlreverse('ietf.api.views.app_auth')
|
||||
invalidated_apikey = PersonalApiKey.objects.create(
|
||||
endpoint=unauthorized_url, person=person, valid=False)
|
||||
unauthorized_url = urlreverse('ietf.api.views.app_auth', kwargs={'app': 'authortools'})
|
||||
invalidated_apikey = PersonalApiKeyFactory(endpoint=unauthorized_url, person=person, valid=False)
|
||||
r = self.client.post(unauthorized_url, {'apikey': invalidated_apikey.hash()})
|
||||
self.assertContains(r, 'Invalid apikey', status_code=403)
|
||||
|
||||
|
@ -803,7 +802,11 @@ class IetfAuthTests(TestCase):
|
|||
person.user.save()
|
||||
|
||||
# endpoint mismatch
|
||||
key2 = PersonalApiKey.objects.create(person=person, endpoint='/')
|
||||
key2 = PersonalApiKeyFactory(
|
||||
person=person,
|
||||
endpoint='/',
|
||||
validate_model=False, # allow invalid endpoint
|
||||
)
|
||||
r = self.client.post(key.endpoint, {'apikey':key2.hash(), 'dummy':'dummy',})
|
||||
self.assertContains(r, 'Apikey endpoint mismatch', status_code=400)
|
||||
key2.delete()
|
||||
|
|
|
@ -38,7 +38,7 @@ import debug # pyflakes:ignore
|
|||
from ietf.doc.models import Document, NewRevisionDocEvent
|
||||
from ietf.group.models import Group, Role, GroupFeatures
|
||||
from ietf.group.utils import can_manage_group
|
||||
from ietf.person.models import Person, PersonalApiKey
|
||||
from ietf.person.models import Person
|
||||
from ietf.meeting.helpers import can_approve_interim_request, can_request_interim_meeting, can_view_interim_request, preprocess_assignments_for_agenda
|
||||
from ietf.meeting.helpers import send_interim_approval_request, AgendaKeywordTagger
|
||||
from ietf.meeting.helpers import send_interim_meeting_cancellation_notice, send_interim_session_cancellation_notice
|
||||
|
@ -56,7 +56,7 @@ from ietf.utils.mail import outbox, empty_outbox, get_payload_text
|
|||
from ietf.utils.test_utils import TestCase, login_testing_unauthorized, unicontent
|
||||
from ietf.utils.timezone import date_today, time_now
|
||||
|
||||
from ietf.person.factories import PersonFactory
|
||||
from ietf.person.factories import PersonFactory, PersonalApiKeyFactory
|
||||
from ietf.group.factories import GroupFactory, GroupEventFactory, RoleFactory
|
||||
from ietf.meeting.factories import (SessionFactory, ScheduleFactory,
|
||||
SessionPresentationFactory, MeetingFactory, FloorPlanFactory,
|
||||
|
@ -8743,7 +8743,7 @@ class ProceedingsTests(BaseMeetingTestCase):
|
|||
add_attendees_url = urlreverse('ietf.meeting.views.api_add_session_attendees')
|
||||
recmanrole = RoleFactory(group__type_id='ietf', name_id='recman', person__user__last_login=timezone.now())
|
||||
recman = recmanrole.person
|
||||
apikey = PersonalApiKey.objects.create(endpoint=add_attendees_url, person=recman)
|
||||
apikey = PersonalApiKeyFactory(endpoint=add_attendees_url, person=recman)
|
||||
attendees = [person.user.pk for person in persons]
|
||||
self.client.login(username='recman', password='recman+password')
|
||||
r = self.client.post(add_attendees_url, {'apikey':apikey.hash(), 'attended':f'{{"session_id":{session.pk},"attendees":{attendees}}}'})
|
||||
|
|
|
@ -158,10 +158,22 @@ class EmailFactory(factory.django.DjangoModelFactory):
|
|||
|
||||
class PersonalApiKeyFactory(factory.django.DjangoModelFactory):
|
||||
person = factory.SubFactory(PersonFactory)
|
||||
endpoint = FuzzyChoice(PERSON_API_KEY_ENDPOINTS)
|
||||
|
||||
endpoint = FuzzyChoice(v for v, n in PERSON_API_KEY_ENDPOINTS)
|
||||
|
||||
class Meta:
|
||||
model = PersonalApiKey
|
||||
skip_postgeneration_save = True
|
||||
|
||||
@factory.post_generation
|
||||
def validate_model(obj, create, extracted, **kwargs):
|
||||
"""Validate the model after creation
|
||||
|
||||
Passing validate_model=False will disable the validation.
|
||||
"""
|
||||
do_clean = True if extracted is None else extracted
|
||||
if do_clean:
|
||||
obj.full_clean()
|
||||
|
||||
|
||||
class PersonApiKeyEventFactory(factory.django.DjangoModelFactory):
|
||||
key = factory.SubFactory(PersonalApiKeyFactory)
|
||||
|
|
42
ietf/person/migrations/0003_alter_personalapikey_endpoint.py
Normal file
42
ietf/person/migrations/0003_alter_personalapikey_endpoint.py
Normal file
|
@ -0,0 +1,42 @@
|
|||
# Generated by Django 4.2.16 on 2024-10-24 21:39
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("person", "0002_alter_historicalperson_ascii_and_more"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="personalapikey",
|
||||
name="endpoint",
|
||||
field=models.CharField(
|
||||
choices=[
|
||||
("/api/appauth/authortools", "/api/appauth/authortools"),
|
||||
("/api/appauth/bibxml", "/api/appauth/bibxml"),
|
||||
("/api/iesg/position", "/api/iesg/position"),
|
||||
(
|
||||
"/api/meeting/session/recording-name",
|
||||
"/api/meeting/session/recording-name",
|
||||
),
|
||||
(
|
||||
"/api/meeting/session/video/url",
|
||||
"/api/meeting/session/video/url",
|
||||
),
|
||||
("/api/notify/meeting/bluesheet", "/api/notify/meeting/bluesheet"),
|
||||
(
|
||||
"/api/notify/meeting/registration",
|
||||
"/api/notify/meeting/registration",
|
||||
),
|
||||
("/api/notify/session/attendees", "/api/notify/session/attendees"),
|
||||
("/api/notify/session/chatlog", "/api/notify/session/chatlog"),
|
||||
("/api/notify/session/polls", "/api/notify/session/polls"),
|
||||
("/api/v2/person/person", "/api/v2/person/person"),
|
||||
],
|
||||
max_length=128,
|
||||
),
|
||||
),
|
||||
]
|
|
@ -376,6 +376,7 @@ PERSON_API_KEY_VALUES = [
|
|||
("/api/iesg/position", "/api/iesg/position", "Area Director"),
|
||||
("/api/v2/person/person", "/api/v2/person/person", "Robot"),
|
||||
("/api/meeting/session/video/url", "/api/meeting/session/video/url", "Recording Manager"),
|
||||
("/api/meeting/session/recording-name", "/api/meeting/session/recording-name", "Recording Manager"),
|
||||
("/api/notify/meeting/registration", "/api/notify/meeting/registration", "Robot"),
|
||||
("/api/notify/meeting/bluesheet", "/api/notify/meeting/bluesheet", "Recording Manager"),
|
||||
("/api/notify/session/attendees", "/api/notify/session/attendees", "Recording Manager"),
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
#
|
||||
# pyzmail/__init__.py
|
||||
# (c) Alain Spineux <alain.spineux@gmail.com>
|
||||
# http://www.magiksys.net/pyzmail
|
||||
# Released under LGPL
|
||||
|
||||
from . import utils
|
||||
from .generate import compose_mail, send_mail, send_mail2
|
||||
from .parse import email_address_re, PyzMessage, PzMessage, decode_text
|
||||
from .parse import message_from_string, message_from_file
|
||||
from .parse import message_from_bytes, message_from_binary_file # python >= 3.2
|
||||
from .version import __version__
|
||||
|
||||
# to help epydoc to display functions available from top of the package
|
||||
__all__= [ 'compose_mail', 'send_mail', 'send_mail2', 'email_address_re', \
|
||||
'PyzMessage', 'PzMessage', 'decode_text', '__version__',
|
||||
'utils', 'generate', 'parse', 'version',
|
||||
'message_from_string','message_from_file',
|
||||
'message_from_binary_file', 'message_from_bytes', # python >= 3.2
|
||||
]
|
||||
|
|
@ -1,529 +0,0 @@
|
|||
#
|
||||
# pyzmail/generate.py
|
||||
# (c) Alain Spineux <alain.spineux@gmail.com>
|
||||
# http://www.magiksys.net/pyzmail
|
||||
# Released under LGPL
|
||||
|
||||
"""
|
||||
Useful functions to compose and send emails.
|
||||
|
||||
For short:
|
||||
|
||||
>>> payload, mail_from, rcpt_to, msg_id=compose_mail((u'Me', 'me@foo.com'),
|
||||
... [(u'Him', 'him@bar.com')], u'the subject', 'iso-8859-1', ('Hello world', 'us-ascii'),
|
||||
... attachments=[('attached', 'text', 'plain', 'text.txt', 'us-ascii')])
|
||||
... #doctest: +SKIP
|
||||
>>> error=send_mail(payload, mail_from, rcpt_to, 'localhost', smtp_port=25)
|
||||
... #doctest: +SKIP
|
||||
"""
|
||||
|
||||
import os, sys
|
||||
import time
|
||||
import base64
|
||||
import smtplib, socket
|
||||
import email
|
||||
import email.encoders
|
||||
import email.header
|
||||
import email.utils
|
||||
import email.mime
|
||||
import email.mime.base
|
||||
import email.mime.text
|
||||
import email.mime.image
|
||||
import email.mime.multipart
|
||||
|
||||
from . import utils
|
||||
|
||||
def format_addresses(addresses, header_name=None, charset=None):
|
||||
"""
|
||||
Convert a list of addresses into a MIME-compliant header for a From, To, Cc,
|
||||
or any other I{address} related field.
|
||||
This mixes the use of email.utils.formataddr() and email.header.Header().
|
||||
|
||||
@type addresses: list
|
||||
@param addresses: list of addresses, can be a mix of string a tuple of the form
|
||||
C{[ 'address@domain', (u'Name', 'name@domain'), ...]}.
|
||||
If C{u'Name'} contains non us-ascii characters, it must be a
|
||||
unicode string or encoded using the I{charset} argument.
|
||||
@type header_name: string or None
|
||||
@keyword header_name: the name of the header. Its length is used to limit
|
||||
the length of the first line of the header according the RFC's
|
||||
requirements. (not very important, but it's better to match the
|
||||
requirements when possible)
|
||||
@type charset: str
|
||||
@keyword charset: the encoding charset for non unicode I{name} and a B{hint}
|
||||
for encoding of unicode string. In other words,
|
||||
if the I{name} of an address in a byte string containing non
|
||||
I{us-ascii} characters, then C{name.decode(charset)}
|
||||
must generate the expected result. If a unicode string
|
||||
is used instead, charset will be tried to encode the
|
||||
string, if it fail, I{utf-8} will be used.
|
||||
With B{Python 3.x} I{charset} is no more a hint and an exception will
|
||||
be raised instead of using I{utf-8} has a fall back.
|
||||
@rtype: str
|
||||
@return: the encoded list of formated addresses separated by commas,
|
||||
ready to use as I{Header} value.
|
||||
|
||||
>>> print format_addresses([('John', 'john@foo.com') ], 'From', 'us-ascii').encode()
|
||||
John <john@foo.com>
|
||||
>>> print format_addresses([(u'l\\xe9o', 'leo@foo.com') ], 'To', 'iso-8859-1').encode()
|
||||
=?iso-8859-1?q?l=E9o?= <leo@foo.com>
|
||||
>>> print format_addresses([(u'l\\xe9o', 'leo@foo.com') ], 'To', 'us-ascii').encode()
|
||||
... # don't work in 3.X because charset is more than a hint
|
||||
... #doctest: +SKIP
|
||||
=?utf-8?q?l=C3=A9o?= <leo@foo.com>
|
||||
>>> # because u'l\xe9o' cannot be encoded into us-ascii, utf8 is used instead
|
||||
>>> print format_addresses([('No\\xe9', 'noe@f.com'), (u'M\\u0101ori', 'maori@b.com') ], 'Cc', 'iso-8859-1').encode()
|
||||
... # don't work in 3.X because charset is more than a hint
|
||||
... #doctest: +SKIP
|
||||
=?iso-8859-1?q?No=E9?= <noe@f.com> , =?utf-8?b?TcSBb3Jp?= <maori@b.com>
|
||||
>>> # 'No\xe9' is already encoded into iso-8859-1, but u'M\\u0101ori' cannot be encoded into iso-8859-1
|
||||
>>> # then utf8 is used here
|
||||
>>> print format_addresses(['a@bar.com', ('John', 'john@foo.com') ], 'From', 'us-ascii').encode()
|
||||
a@bar.com , John <john@foo.com>
|
||||
"""
|
||||
header=email.header.Header(charset=charset, header_name=header_name)
|
||||
for i, address in enumerate(addresses):
|
||||
if i!=0:
|
||||
# add separator between addresses
|
||||
header.append(',', charset='us-ascii')
|
||||
|
||||
try:
|
||||
name, addr=address
|
||||
except ValueError:
|
||||
# address is not a tuple, their is no name, only email address
|
||||
header.append(address, charset='us-ascii')
|
||||
else:
|
||||
# check if address name is a unicode or byte string in "pure" us-ascii
|
||||
if utils.is_usascii(name):
|
||||
# name is a us-ascii byte string, i can use formataddr
|
||||
formated_addr=email.utils.formataddr((name, addr))
|
||||
# us-ascii must be used and not default 'charset'
|
||||
header.append(formated_addr, charset='us-ascii')
|
||||
else:
|
||||
# this is not as "pure" us-ascii string
|
||||
# Header will use "RFC2047" to encode the address name
|
||||
# if name is byte string, charset will be used to decode it first
|
||||
header.append(name)
|
||||
# here us-ascii must be used and not default 'charset'
|
||||
header.append('<%s>' % (addr,), charset='us-ascii')
|
||||
|
||||
return header
|
||||
|
||||
|
||||
def build_mail(text, html=None, attachments=None, embeddeds=None):
|
||||
"""
|
||||
Generate the core of the email message regarding the parameters.
|
||||
The structure of the MIME email may vary, but the general one is as follow::
|
||||
|
||||
multipart/mixed (only if attachments are included)
|
||||
|
|
||||
+-- multipart/related (only if embedded contents are included)
|
||||
| |
|
||||
| +-- multipart/alternative (only if text AND html are available)
|
||||
| | |
|
||||
| | +-- text/plain (text version of the message)
|
||||
| | +-- text/html (html version of the message)
|
||||
| |
|
||||
| +-- image/gif (where to include embedded contents)
|
||||
|
|
||||
+-- application/msword (where to add attachments)
|
||||
|
||||
@param text: the text version of the message, under the form of a tuple:
|
||||
C{(encoded_content, encoding)} where I{encoded_content} is a byte string
|
||||
encoded using I{encoding}.
|
||||
I{text} can be None if the message has no text version.
|
||||
@type text: tuple or None
|
||||
@keyword html: the HTML version of the message, under the form of a tuple:
|
||||
C{(encoded_content, encoding)} where I{encoded_content} is a byte string
|
||||
encoded using I{encoding}
|
||||
I{html} can be None if the message has no HTML version.
|
||||
@type html: tuple or None
|
||||
@keyword attachments: the list of attachments to include into the mail, in the
|
||||
form [(data, maintype, subtype, filename, charset), ..] where :
|
||||
- I{data} : is the raw data, or a I{charset} encoded string for 'text'
|
||||
content.
|
||||
- I{maintype} : is a MIME main type like : 'text', 'image', 'application' ....
|
||||
- I{subtype} : is a MIME sub type of the above I{maintype} for example :
|
||||
'plain', 'png', 'msword' for respectively 'text/plain', 'image/png',
|
||||
'application/msword'.
|
||||
- I{filename} this is the filename of the attachment, it must be a
|
||||
'us-ascii' string or a tuple of the form
|
||||
C{(encoding, language, encoded_filename)}
|
||||
following the RFC2231 requirement, for example
|
||||
C{('iso-8859-1', 'fr', u'r\\xe9pertoir.png'.encode('iso-8859-1'))}
|
||||
- I{charset} : if I{maintype} is 'text', then I{data} must be encoded
|
||||
using this I{charset}. It can be None for non 'text' content.
|
||||
@type attachments: list
|
||||
@keyword embeddeds: is a list of documents embedded inside the HTML or text
|
||||
version of the message. It is similar to the I{attachments} list,
|
||||
but I{filename} is replaced by I{content_id} that is related to
|
||||
the B{cid} reference into the HTML or text version of the message.
|
||||
@type embeddeds: list
|
||||
@rtype: inherit from email.Message
|
||||
@return: the message in a MIME object
|
||||
|
||||
>>> mail=build_mail(('Hello world', 'us-ascii'), attachments=[('attached', 'text', 'plain', 'text.txt', 'us-ascii')])
|
||||
>>> mail.set_boundary('===limit1==')
|
||||
>>> print mail.as_string(unixfrom=False)
|
||||
Content-Type: multipart/mixed; boundary="===limit1=="
|
||||
MIME-Version: 1.0
|
||||
<BLANKLINE>
|
||||
--===limit1==
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
<BLANKLINE>
|
||||
Hello world
|
||||
--===limit1==
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
Content-Disposition: attachment; filename="text.txt"
|
||||
<BLANKLINE>
|
||||
attached
|
||||
--===limit1==--
|
||||
"""
|
||||
|
||||
if attachments is None:
|
||||
attachments = []
|
||||
if embeddeds is None:
|
||||
embeddeds = []
|
||||
|
||||
main=text_part=html_part=None
|
||||
if text:
|
||||
content, charset=text
|
||||
main=text_part=email.mime.text.MIMEText(content, 'plain', charset)
|
||||
|
||||
if html:
|
||||
content, charset=html
|
||||
main=html_part=email.mime.text.MIMEText(content, 'html', charset)
|
||||
|
||||
if not text_part and not html_part:
|
||||
main=text_part=email.mime.text.MIMEText('', 'plain', 'us-ascii')
|
||||
elif text_part and html_part:
|
||||
# need to create a multipart/alternative to include text and html version
|
||||
main=email.mime.multipart.MIMEMultipart('alternative', None, [text_part, html_part])
|
||||
|
||||
if embeddeds:
|
||||
related=email.mime.multipart.MIMEMultipart('related')
|
||||
related.attach(main)
|
||||
for part in embeddeds:
|
||||
if not isinstance(part, email.mime.base.MIMEBase):
|
||||
data, maintype, subtype, content_id, charset=part
|
||||
if (maintype=='text'):
|
||||
part=email.mime.text.MIMEText(data, subtype, charset)
|
||||
else:
|
||||
part=email.mime.base.MIMEBase(maintype, subtype)
|
||||
part.set_payload(data)
|
||||
email.encoders.encode_base64(part)
|
||||
part.add_header('Content-ID', '<'+content_id+'>')
|
||||
part.add_header('Content-Disposition', 'inline')
|
||||
related.attach(part)
|
||||
main=related
|
||||
|
||||
if attachments:
|
||||
mixed=email.mime.multipart.MIMEMultipart('mixed')
|
||||
mixed.attach(main)
|
||||
for part in attachments:
|
||||
if not isinstance(part, email.mime.base.MIMEBase):
|
||||
data, maintype, subtype, filename, charset=part
|
||||
if (maintype=='text'):
|
||||
part=email.mime.text.MIMEText(data, subtype, charset)
|
||||
else:
|
||||
part=email.mime.base.MIMEBase(maintype, subtype)
|
||||
part.set_payload(data)
|
||||
email.encoders.encode_base64(part)
|
||||
part.add_header('Content-Disposition', 'attachment', filename=filename)
|
||||
mixed.attach(part)
|
||||
main=mixed
|
||||
|
||||
return main
|
||||
|
||||
def complete_mail(message, sender, recipients, subject, default_charset, cc=None, bcc=None, message_id_string=None, date=None, headers=None):
|
||||
"""
|
||||
Fill in the From, To, Cc, Subject, Date and Message-Id I{headers} of
|
||||
one existing message regarding the parameters.
|
||||
|
||||
@type message:email.Message
|
||||
@param message: the message to fill in
|
||||
@type sender: tuple
|
||||
@param sender: a tuple of the form (u'Sender Name', 'sender.address@domain.com')
|
||||
@type recipients: list
|
||||
@param recipients: a list of addresses. Address can be tuple or string like
|
||||
expected by L{format_addresses()}, for example: C{[ 'address@dmain.com',
|
||||
(u'Recipient Name', 'recipient.address@domain.com'), ... ]}
|
||||
@type subject: str
|
||||
@param subject: The subject of the message, can be a unicode string or a
|
||||
string encoded using I{default_charset} encoding. Prefert unicode to
|
||||
byte string here.
|
||||
@type default_charset: str
|
||||
@param default_charset: The default charset for this email. Arguments
|
||||
that are non unicode string are supposed to be encoded using this charset.
|
||||
This I{charset} will be used has an hint when encoding mail content.
|
||||
@type cc: list
|
||||
@keyword cc: The I{carbone copy} addresses. Same format as the I{recipients}
|
||||
argument.
|
||||
@type bcc: list
|
||||
@keyword bcc: The I{blind carbone copy} addresses. Same format as the I{recipients}
|
||||
argument.
|
||||
@type message_id_string: str or None
|
||||
@keyword message_id_string: if None, don't append any I{Message-ID} to the
|
||||
mail, let the SMTP do the job, else use the string to generate a unique
|
||||
I{ID} using C{email.utils.make_msgid()}. The generated value is
|
||||
returned as last argument. For example use the name of your application.
|
||||
@type date: int or None
|
||||
@keyword date: utc time in second from the epoch or None. If None then
|
||||
use curent time C{time.time()} instead.
|
||||
@type headers: list of tuple
|
||||
@keyword headers: a list of C{(field, value)} tuples to fill in the mail
|
||||
header fields. Values are encoded using I{default_charset}.
|
||||
@rtype: tuple
|
||||
@return: B{(payload, mail_from, rcpt_to, msg_id)}
|
||||
- I{payload} (str) is the content of the email, generated from the message
|
||||
- I{mail_from} (str) is the address of the sender to pass to the SMTP host
|
||||
- I{rcpt_to} (list) is a list of the recipients addresses to pass to the SMTP host
|
||||
of the form C{[ 'a@b.com', c@d.com', ]}. This combine all recipients,
|
||||
I{carbone copy} addresses and I{blind carbone copy} addresses.
|
||||
- I{msg_id} (None or str) None if message_id_string==None else the generated value for
|
||||
the message-id. If not None, this I{Message-ID} is already written
|
||||
into the payload.
|
||||
|
||||
>>> import email.mime.text
|
||||
>>> msg=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii')
|
||||
>>> # I could use build_mail() instead
|
||||
>>> payload, mail_from, rcpt_to, msg_id=complete_mail(msg, ('Me', 'me@foo.com'),
|
||||
... [ ('Him', 'him@bar.com'), ], 'Non unicode subject', 'iso-8859-1',
|
||||
... cc=['her@bar.com',], date=1313558269, headers=[('User-Agent', u'pyzmail'), ])
|
||||
>>> print payload
|
||||
... # 3.X encode User-Agent: using 'iso-8859-1' even if it contains only us-asccii
|
||||
... # doctest: +ELLIPSIS
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
From: Me <me@foo.com>
|
||||
To: Him <him@bar.com>
|
||||
Cc: her@bar.com
|
||||
Subject: =?iso-8859-1?q?Non_unicode_subject?=
|
||||
Date: ...
|
||||
User-Agent: ...pyzmail...
|
||||
<BLANKLINE>
|
||||
The text.
|
||||
>>> print 'mail_from=%r rcpt_to=%r' % (mail_from, rcpt_to)
|
||||
mail_from='me@foo.com' rcpt_to=['him@bar.com', 'her@bar.com']
|
||||
"""
|
||||
def getaddr(address):
|
||||
if isinstance(address, tuple):
|
||||
return address[1]
|
||||
else:
|
||||
return address
|
||||
|
||||
if cc is None:
|
||||
cc=[]
|
||||
if bcc is None:
|
||||
bcc=[]
|
||||
if headers is None:
|
||||
headers=[]
|
||||
|
||||
mail_from=getaddr(sender[1])
|
||||
rcpt_to=list(map(getaddr, recipients))
|
||||
rcpt_to.extend(list(map(getaddr, cc)))
|
||||
rcpt_to.extend(list(map(getaddr, bcc)))
|
||||
|
||||
message['From'] = format_addresses([ sender, ], header_name='from', charset=default_charset)
|
||||
if recipients:
|
||||
message['To'] = format_addresses(recipients, header_name='to', charset=default_charset)
|
||||
if cc:
|
||||
message['Cc'] = format_addresses(cc, header_name='cc', charset=default_charset)
|
||||
message['Subject'] = email.header.Header(subject, default_charset)
|
||||
if date:
|
||||
utc_from_epoch=date
|
||||
else:
|
||||
utc_from_epoch=time.time()
|
||||
message['Date'] = email.utils.formatdate(utc_from_epoch, localtime=True)
|
||||
|
||||
if message_id_string:
|
||||
msg_id=message['Message-Id']=email.utils.make_msgid(message_id_string)
|
||||
else:
|
||||
msg_id=None
|
||||
|
||||
for field, value in headers:
|
||||
message[field]=email.header.Header(value, default_charset)
|
||||
|
||||
payload=message.as_string()
|
||||
|
||||
return payload, mail_from, rcpt_to, msg_id
|
||||
|
||||
def compose_mail(sender, recipients, subject, default_charset, text, html=None, attachments=None, embeddeds=None, cc=None, bcc=None, message_id_string=None, date=None, headers=None):
|
||||
"""
|
||||
Compose an email regarding the arguments. Call L{build_mail()} and
|
||||
L{complete_mail()} at once.
|
||||
|
||||
Read the B{parameters} descriptions of both functions L{build_mail()} and L{complete_mail()}.
|
||||
|
||||
Returned value is the same as for L{build_mail()} and L{complete_mail()}.
|
||||
You can pass the returned values to L{send_mail()} or L{send_mail2()}.
|
||||
|
||||
@rtype: tuple
|
||||
@return: B{(payload, mail_from, rcpt_to, msg_id)}
|
||||
|
||||
>>> payload, mail_from, rcpt_to, msg_id=compose_mail((u'Me', 'me@foo.com'), [(u'Him', 'him@bar.com')], u'the subject', 'iso-8859-1', ('Hello world', 'us-ascii'), attachments=[('attached', 'text', 'plain', 'text.txt', 'us-ascii')])
|
||||
"""
|
||||
if attachments is None:
|
||||
attachments=[]
|
||||
if embeddeds is None:
|
||||
embeddeds=[]
|
||||
if cc is None:
|
||||
cc=[]
|
||||
if bcc is None:
|
||||
bcc = []
|
||||
if headers is None:
|
||||
headers=[]
|
||||
|
||||
message=build_mail(text, html, attachments, embeddeds)
|
||||
return complete_mail(message, sender, recipients, subject, default_charset, cc, bcc, message_id_string, date, headers)
|
||||
|
||||
|
||||
def send_mail2(payload, mail_from, rcpt_to, smtp_host, smtp_port=25, smtp_mode='normal', smtp_login=None, smtp_password=None):
|
||||
"""
|
||||
Send the message to a SMTP host. Look at the L{send_mail()} documentation.
|
||||
L{send_mail()} call this function and catch all exceptions to convert them
|
||||
into a user friendly error message. The returned value
|
||||
is always a dictionary. It can be empty if all recipients have been
|
||||
accepted.
|
||||
|
||||
@rtype: dict
|
||||
@return: This function return the value returnd by C{smtplib.SMTP.sendmail()}
|
||||
or raise the same exceptions.
|
||||
|
||||
This method will return normally if the mail is accepted for at least one
|
||||
recipient. Otherwise it will raise an exception. That is, if this
|
||||
method does not raise an exception, then someone should get your mail.
|
||||
If this method does not raise an exception, it returns a dictionary,
|
||||
with one entry for each recipient that was refused. Each entry contains a
|
||||
tuple of the SMTP error code and the accompanying error message sent by the server.
|
||||
|
||||
@raise smtplib.SMTPException: Look at the standard C{smtplib.SMTP.sendmail()} documentation.
|
||||
|
||||
"""
|
||||
if smtp_mode=='ssl':
|
||||
smtp=smtplib.SMTP_SSL(smtp_host, smtp_port)
|
||||
else:
|
||||
smtp=smtplib.SMTP(smtp_host, smtp_port)
|
||||
if smtp_mode=='tls':
|
||||
smtp.starttls()
|
||||
|
||||
if smtp_login and smtp_password:
|
||||
if sys.version_info<(3, 0):
|
||||
# python 2.x
|
||||
# login and password must be encoded
|
||||
# because HMAC used in CRAM_MD5 require non unicode string
|
||||
smtp.login(smtp_login.encode('utf-8'), smtp_password.encode('utf-8'))
|
||||
else:
|
||||
#python 3.x
|
||||
smtp.login(smtp_login, smtp_password)
|
||||
try:
|
||||
ret=smtp.sendmail(mail_from, rcpt_to, payload)
|
||||
finally:
|
||||
try:
|
||||
smtp.quit()
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
return ret
|
||||
|
||||
def send_mail(payload, mail_from, rcpt_to, smtp_host, smtp_port=25, smtp_mode='normal', smtp_login=None, smtp_password=None):
|
||||
"""
|
||||
Send the message to a SMTP host. Handle SSL, TLS and authentication.
|
||||
I{payload}, I{mail_from} and I{rcpt_to} can come from values returned by
|
||||
L{complete_mail()}. This function call L{send_mail2()} but catch all
|
||||
exceptions and return friendly error message instead.
|
||||
|
||||
@type payload: str
|
||||
@param payload: the mail content.
|
||||
@type mail_from: str
|
||||
@param mail_from: the sender address, for example: C{'me@domain.com'}.
|
||||
@type rcpt_to: list
|
||||
@param rcpt_to: The list of the recipient addresses in the form
|
||||
C{[ 'a@b.com', c@d.com', ]}. No names here, only email addresses.
|
||||
@type smtp_host: str
|
||||
@param smtp_host: the IP address or the name of the SMTP host.
|
||||
@type smtp_port: int
|
||||
@keyword smtp_port: the port to connect to on the SMTP host. Default is C{25}.
|
||||
@type smtp_mode: str
|
||||
@keyword smtp_mode: the way to connect to the SMTP host, can be:
|
||||
C{'normal'}, C{'ssl'} or C{'tls'}. default is C{'normal'}
|
||||
@type smtp_login: str or None
|
||||
@keyword smtp_login: If authentication is required, this is the login.
|
||||
Be carefull to I{UTF8} encode your login if it contains
|
||||
non I{us-ascii} characters.
|
||||
@type smtp_password: str or None
|
||||
@keyword smtp_password: If authentication is required, this is the password.
|
||||
Be carefull to I{UTF8} encode your password if it
|
||||
contains non I{us-ascii} characters.
|
||||
|
||||
@rtype: dict or str
|
||||
@return: This function return a dictionary of failed recipients
|
||||
or a string with an error message.
|
||||
|
||||
If all recipients have been accepted the dictionary is empty. If the
|
||||
returned value is a string, none of the recipients will get the message.
|
||||
|
||||
The dictionary is exactly of the same sort as
|
||||
smtplib.SMTP.sendmail() returns with one entry for each recipient that
|
||||
was refused. Each entry contains a tuple of the SMTP error code and
|
||||
the accompanying error message sent by the server.
|
||||
|
||||
Example:
|
||||
|
||||
>>> send_mail('Subject: hello\\n\\nmessage', 'a@foo.com', [ 'b@bar.com', ], 'localhost') #doctest: +SKIP
|
||||
{}
|
||||
|
||||
Here is how to use the returned value::
|
||||
if isinstance(ret, dict):
|
||||
if ret:
|
||||
print 'failed' recipients:
|
||||
for recipient, (code, msg) in ret.iteritems():
|
||||
print 'code=%d recipient=%s\terror=%s' % (code, recipient, msg)
|
||||
else:
|
||||
print 'success'
|
||||
else:
|
||||
print 'Error:', ret
|
||||
|
||||
To use your GMail account to send your mail::
|
||||
smtp_host='smtp.gmail.com'
|
||||
smtp_port=587
|
||||
smtp_mode='tls'
|
||||
smtp_login='your.gmail.addresse@gmail.com'
|
||||
smtp_password='your.gmail.password'
|
||||
|
||||
Use your GMail address for the sender !
|
||||
|
||||
"""
|
||||
|
||||
error=dict()
|
||||
try:
|
||||
ret=send_mail2(payload, mail_from, rcpt_to, smtp_host, smtp_port, smtp_mode, smtp_login, smtp_password)
|
||||
except (socket.error, ) as e:
|
||||
error='server %s:%s not responding: %s' % (smtp_host, smtp_port, e)
|
||||
except smtplib.SMTPAuthenticationError as e:
|
||||
error='authentication error: %s' % (e, )
|
||||
except smtplib.SMTPRecipientsRefused as e:
|
||||
# code, error=e.recipients[recipient_addr]
|
||||
error='all recipients refused: '+', '.join(list(e.recipients.keys()))
|
||||
except smtplib.SMTPSenderRefused as e:
|
||||
# e.sender, e.smtp_code, e.smtp_error
|
||||
error='sender refused: %s' % (e.sender, )
|
||||
except smtplib.SMTPDataError as e:
|
||||
error='SMTP protocol mismatch: %s' % (e, )
|
||||
except smtplib.SMTPHeloError as e:
|
||||
error="server didn't reply properly to the HELO greeting: %s" % (e, )
|
||||
except smtplib.SMTPException as e:
|
||||
error='SMTP error: %s' % (e, )
|
||||
# except Exception, e:
|
||||
# raise # unknown error
|
||||
else:
|
||||
# failed addresses and error messages
|
||||
error=ret
|
||||
|
||||
return error
|
||||
|
817
pyzmail/parse.py
817
pyzmail/parse.py
|
@ -1,817 +0,0 @@
|
|||
#
|
||||
# pyzmail/parse.py
|
||||
# (c) Alain Spineux <alain.spineux@gmail.com>
|
||||
# http://www.magiksys.net/pyzmail
|
||||
# Released under LGPL
|
||||
|
||||
"""
|
||||
Useful functions to parse emails
|
||||
|
||||
@var email_address_re: a regex that match well formed email address (from perlfaq9)
|
||||
@undocumented: atom_rfc2822
|
||||
@undocumented: atom_posfix_restricted
|
||||
@undocumented: atom
|
||||
@undocumented: dot_atom
|
||||
@undocumented: local
|
||||
@undocumented: domain_lit
|
||||
@undocumented: domain
|
||||
@undocumented: addr_spec
|
||||
"""
|
||||
|
||||
import re
|
||||
import io
|
||||
import email
|
||||
import email.errors
|
||||
import email.header
|
||||
import email.message
|
||||
import mimetypes
|
||||
|
||||
from .utils import *
|
||||
|
||||
# email address REGEX matching the RFC 2822 spec from perlfaq9
|
||||
# my $atom = qr{[a-zA-Z0-9_!#\$\%&'*+/=?\^`{}~|\-]+};
|
||||
# my $dot_atom = qr{$atom(?:\.$atom)*};
|
||||
# my $quoted = qr{"(?:\\[^\r\n]|[^\\"])*"};
|
||||
# my $local = qr{(?:$dot_atom|$quoted)};
|
||||
# my $domain_lit = qr{\[(?:\\\S|[\x21-\x5a\x5e-\x7e])*\]};
|
||||
# my $domain = qr{(?:$dot_atom|$domain_lit)};
|
||||
# my $addr_spec = qr{$local\@$domain};
|
||||
#
|
||||
# Python's translation
|
||||
atom_rfc2822=r"[a-zA-Z0-9_!#\$\%&'*+/=?\^`{}~|\-]+"
|
||||
atom_posfix_restricted=r"[a-zA-Z0-9_#\$&'*+/=?\^`{}~|\-]+" # without '!' and '%'
|
||||
atom=atom_rfc2822
|
||||
dot_atom=atom + r"(?:\." + atom + ")*"
|
||||
quoted=r'"(?:\\[^\r\n]|[^\\"])*"'
|
||||
local="(?:" + dot_atom + "|" + quoted + ")"
|
||||
domain_lit=r"\[(?:\\\S|[\x21-\x5a\x5e-\x7e])*\]"
|
||||
domain="(?:" + dot_atom + "|" + domain_lit + ")"
|
||||
addr_spec=local + "@" + domain
|
||||
# and the result
|
||||
email_address_re=re.compile('^'+addr_spec+'$')
|
||||
|
||||
class MailPart:
|
||||
"""
|
||||
Data related to a mail part (aka message content, attachment or
|
||||
embedded content in an email)
|
||||
|
||||
@type charset: str or None
|
||||
@ivar charset: the encoding of the I{get_payload()} content if I{type} is 'text/*'
|
||||
and charset has been specified in the message
|
||||
@type content_id: str or None
|
||||
@ivar content_id: the MIME Content-ID if specified in the message.
|
||||
@type description: str or None
|
||||
@ivar description: the MIME Content-Description if specified in the message.
|
||||
@type disposition: str or None
|
||||
@ivar disposition: C{None}, C{'inline'} or C{'attachment'} depending
|
||||
the MIME Content-Disposition value
|
||||
@type filename: unicode or None
|
||||
@ivar filename: the name of the file, if specified in the message.
|
||||
@type part: inherit from email.mime.base.MIMEBase
|
||||
@ivar part: the related part inside the message.
|
||||
@type is_body: str or None
|
||||
@ivar is_body: None if this part is not the mail content itself (an
|
||||
attachment or embedded content), C{'text/plain'} if this part is the
|
||||
text content or C{'text/html'} if this part is the HTML version.
|
||||
@type sanitized_filename: str or None
|
||||
@ivar sanitized_filename: This field is filled by L{PyzMessage} to store
|
||||
a valid unique filename related or not with the original filename.
|
||||
@type type: str
|
||||
@ivar type: the MIME type, like 'text/plain', 'image/png', 'application/msword' ...
|
||||
"""
|
||||
|
||||
def __init__(self, part, filename=None, type=None, charset=None, content_id=None, description=None, disposition=None, sanitized_filename=None, is_body=None):
|
||||
"""
|
||||
Create an mail part and initialize all attributes
|
||||
"""
|
||||
self.part=part # original python part
|
||||
self.filename=filename # filename in unicode (if any)
|
||||
self.type=type # the mime-type
|
||||
self.charset=charset # the charset (if any)
|
||||
self.description=description # if any
|
||||
self.disposition=disposition # 'inline', 'attachment' or None
|
||||
self.sanitized_filename=sanitized_filename # cleanup your filename here (TODO)
|
||||
self.is_body=is_body # usually in (None, 'text/plain' or 'text/html')
|
||||
self.content_id=content_id # if any
|
||||
if self.content_id:
|
||||
# strip '<>' to ease search and replace in "root" content (TODO)
|
||||
if self.content_id.startswith('<') and self.content_id.endswith('>'):
|
||||
self.content_id=self.content_id[1:-1]
|
||||
|
||||
def get_payload(self):
|
||||
"""
|
||||
decode and return part payload. if I{type} is 'text/*' and I{charset}
|
||||
not C{None}, be careful to take care of the text encoding. Use
|
||||
something like C{part.get_payload().decode(part.charset)}
|
||||
"""
|
||||
|
||||
payload=None
|
||||
if self.type.startswith('message/'):
|
||||
# I don't use msg.as_string() because I want to use mangle_from_=False
|
||||
if sys.version_info<(3, 0):
|
||||
# python 2.x
|
||||
from email.generator import Generator
|
||||
fp = io.StringIO()
|
||||
g = Generator(fp, mangle_from_=False)
|
||||
g.flatten(self.part, unixfrom=False)
|
||||
payload=fp.getvalue()
|
||||
else:
|
||||
# support only for python >= 3.2
|
||||
from email.generator import BytesGenerator
|
||||
import io
|
||||
fp = io.BytesIO()
|
||||
g = BytesGenerator(fp, mangle_from_=False)
|
||||
g.flatten(self.part, unixfrom=False)
|
||||
payload=fp.getvalue()
|
||||
|
||||
else:
|
||||
payload=self.part.get_payload(decode=True)
|
||||
return payload
|
||||
|
||||
def __repr__(self):
|
||||
st='MailPart<'
|
||||
if self.is_body:
|
||||
st+='*'
|
||||
st+=self.type
|
||||
if self.charset:
|
||||
st+=' charset='+self.charset
|
||||
if self.filename:
|
||||
st+=' filename='+self.filename
|
||||
if self.content_id:
|
||||
st+=' content_id='+self.content_id
|
||||
st+=' len=%d' % (len(self.get_payload()), )
|
||||
st+='>'
|
||||
return st
|
||||
|
||||
|
||||
|
||||
_line_end_re=re.compile('\r\n|\n\r|\n|\r')
|
||||
|
||||
def _friendly_header(header):
|
||||
"""
|
||||
Convert header returned by C{email.message.Message.get()} into a
|
||||
user friendly string.
|
||||
|
||||
Py3k C{email.message.Message.get()} return C{header.Header()} with charset
|
||||
set to C{charset.UNKNOWN8BIT} when the header contains invalid characters,
|
||||
else it return I{str} as Python 2.X does
|
||||
|
||||
@type header: str or email.header.Header
|
||||
@param header: the header to convert into a user friendly string
|
||||
|
||||
@rtype: str
|
||||
@returns: the converter header
|
||||
"""
|
||||
|
||||
save=header
|
||||
if isinstance(header, email.header.Header):
|
||||
header=str(header)
|
||||
|
||||
return re.sub(_line_end_re, ' ', header)
|
||||
|
||||
def decode_mail_header(value, default_charset='us-ascii'):
|
||||
"""
|
||||
Decode a header value into a unicode string.
|
||||
Works like a more smarter python
|
||||
C{u"".join(email.header.decode_header()} function
|
||||
|
||||
@type value: str
|
||||
@param value: the value of the header.
|
||||
@type default_charset: str
|
||||
@keyword default_charset: if one charset used in the header (multiple charset
|
||||
can be mixed) is unknown, then use this charset instead.
|
||||
|
||||
>>> decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_en_Fran=E7ais?=')
|
||||
u'Courrier \\xe8lectronique en Fran\\xe7ais'
|
||||
"""
|
||||
|
||||
# value=_friendly_header(value)
|
||||
try:
|
||||
headers=email.header.decode_header(value)
|
||||
except email.errors.HeaderParseError:
|
||||
# this can append in email.base64mime.decode(), for example for this value:
|
||||
# '=?UTF-8?B?15HXmdeh15jXqNeVINeY15DXpteUINeTJ9eV16jXlSDXkdeg15XXldeUINem15PXpywg15TXptei16bXldei15nXnSDXqdecINek15zXmdeZ?==?UTF-8?B?157XldeR15nXnCwg157Xldek16Ig157Xl9eV15wg15HXodeV15bXnyDXk9ec15DXnCDXldeh15gg157Xl9eR16rXldeqINep15wg15HXmdeQ?==?UTF-8?B?15zXmNeZ?='
|
||||
# then return a sanitized ascii string
|
||||
# TODO: some improvements are possible here, but a failure here is
|
||||
# unlikely
|
||||
return value.encode('us-ascii', 'replace').decode('us-ascii')
|
||||
else:
|
||||
for i, (text, charset) in enumerate(headers):
|
||||
# python 3.x
|
||||
# email.header.decode_header('a') -> [('a', None)]
|
||||
# email.header.decode_header('a =?ISO-8859-1?Q?foo?= b')
|
||||
# --> [(b'a', None), (b'foo', 'iso-8859-1'), (b'b', None)]
|
||||
# in Py3 text is sometime str and sometime byte :-(
|
||||
# python 2.x
|
||||
# email.header.decode_header('a') -> [('a', None)]
|
||||
# email.header.decode_header('a =?ISO-8859-1?Q?foo?= b')
|
||||
# --> [('a', None), ('foo', 'iso-8859-1'), ('b', None)]
|
||||
if (charset is None and sys.version_info>=(3, 0)):
|
||||
# Py3
|
||||
if isinstance(text, str):
|
||||
# convert Py3 string into bytes string to be sure their is no
|
||||
# non us-ascii chars and because next line expect byte string
|
||||
text=text.encode('us-ascii', 'replace')
|
||||
try:
|
||||
headers[i]=text.decode(charset or 'us-ascii', 'replace')
|
||||
except LookupError:
|
||||
# if the charset is unknown, force default
|
||||
headers[i]=text.decode(default_charset, 'replace')
|
||||
|
||||
return "".join(headers)
|
||||
|
||||
def get_mail_addresses(message, header_name):
|
||||
"""
|
||||
retrieve all email addresses from one message header
|
||||
|
||||
@type message: email.message.Message
|
||||
@param message: the email message
|
||||
@type header_name: str
|
||||
@param header_name: the name of the header, can be 'from', 'to', 'cc' or
|
||||
any other header containing one or more email addresses
|
||||
@rtype: list
|
||||
@returns: a list of the addresses in the form of tuples
|
||||
C{[(u'Name', 'addresse@domain.com'), ...]}
|
||||
|
||||
>>> import email
|
||||
>>> import email.mime.text
|
||||
>>> msg=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii')
|
||||
>>> msg['From']=email.email.utils.formataddr(('Me', 'me@foo.com'))
|
||||
>>> msg['To']=email.email.utils.formataddr(('A', 'a@foo.com'))+', '+email.email.utils.formataddr(('B', 'b@foo.com'))
|
||||
>>> print msg.as_string(unixfrom=False)
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
From: Me <me@foo.com>
|
||||
To: A <a@foo.com>, B <b@foo.com>
|
||||
<BLANKLINE>
|
||||
The text.
|
||||
>>> get_mail_addresses(msg, 'from')
|
||||
[(u'Me', 'me@foo.com')]
|
||||
>>> get_mail_addresses(msg, 'to')
|
||||
[(u'A', 'a@foo.com'), (u'B', 'b@foo.com')]
|
||||
"""
|
||||
addrs=email.utils.getaddresses([ _friendly_header(h) for h in message.get_all(header_name, [])])
|
||||
for i, (addr_name, addr) in enumerate(addrs):
|
||||
if not addr_name and addr:
|
||||
# only one string! Is it the address or the address name ?
|
||||
# use the same for both and see later
|
||||
addr_name=addr
|
||||
|
||||
if is_usascii(addr):
|
||||
# address must be ascii only and must match address regex
|
||||
if not email_address_re.match(addr):
|
||||
addr=''
|
||||
else:
|
||||
addr=''
|
||||
addrs[i]=(decode_mail_header(addr_name), addr)
|
||||
return addrs
|
||||
|
||||
def get_filename(part):
|
||||
"""
|
||||
Find the filename of a mail part. Many MUA send attachments with the
|
||||
filename in the I{name} parameter of the I{Content-type} header instead
|
||||
of in the I{filename} parameter of the I{Content-Disposition} header.
|
||||
|
||||
@type part: inherit from email.mime.base.MIMEBase
|
||||
@param part: the mail part
|
||||
@rtype: None or unicode
|
||||
@returns: the filename or None if not found
|
||||
|
||||
>>> import email.mime.image
|
||||
>>> attach=email.mime.image.MIMEImage('data', 'png')
|
||||
>>> attach.add_header('Content-Disposition', 'attachment', filename='image.png')
|
||||
>>> get_filename(attach)
|
||||
u'image.png'
|
||||
>>> print attach.as_string(unixfrom=False)
|
||||
Content-Type: image/png
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: base64
|
||||
Content-Disposition: attachment; filename="image.png"
|
||||
<BLANKLINE>
|
||||
ZGF0YQ==
|
||||
>>> import email.mime.text
|
||||
>>> attach=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii')
|
||||
>>> attach.add_header('Content-Disposition', 'attachment', filename=('iso-8859-1', 'fr', u'Fran\\xe7ais.txt'.encode('iso-8859-1')))
|
||||
>>> get_filename(attach)
|
||||
u'Fran\\xe7ais.txt'
|
||||
>>> print attach.as_string(unixfrom=False)
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
Content-Disposition: attachment; filename*="iso-8859-1'fr'Fran%E7ais.txt"
|
||||
<BLANKLINE>
|
||||
The text.
|
||||
"""
|
||||
filename=part.get_param('filename', None, 'content-disposition')
|
||||
if not filename:
|
||||
filename=part.get_param('name', None) # default is 'content-type'
|
||||
|
||||
if filename:
|
||||
if isinstance(filename, tuple):
|
||||
# RFC 2231 must be used to encode parameters inside MIME header
|
||||
filename=email.utils.collapse_rfc2231_value(filename).strip()
|
||||
else:
|
||||
# But a lot of MUA erroneously use RFC 2047 instead of RFC 2231
|
||||
# in fact anybody missuse RFC2047 here !!!
|
||||
filename=decode_mail_header(filename)
|
||||
|
||||
return filename
|
||||
|
||||
def _search_message_content(contents, part):
|
||||
"""
|
||||
recursive search of message content (text or HTML) inside
|
||||
the structure of the email. Used by L{search_message_content()}
|
||||
|
||||
@type contents: dict
|
||||
@param contents: contents already found in parents or brothers I{parts}.
|
||||
The dictionary will be completed as and when. key is the MIME type of the part.
|
||||
@type part: inherit email.mime.base.MIMEBase
|
||||
@param part: the part of the mail to look inside recursively.
|
||||
"""
|
||||
type=part.get_content_type()
|
||||
if part.is_multipart(): # type.startswith('multipart/'):
|
||||
# explore only True 'multipart/*'
|
||||
# because 'messages/rfc822' are 'multipart/*' too but
|
||||
# must not be explored here
|
||||
if type=='multipart/related':
|
||||
# the first part or the one pointed by start
|
||||
start=part.get_param('start', None)
|
||||
related_type=part.get_param('type', None)
|
||||
for i, subpart in enumerate(part.get_payload()):
|
||||
if (not start and i==0) or (start and start==subpart.get('Content-Id')):
|
||||
_search_message_content(contents, subpart)
|
||||
return
|
||||
elif type=='multipart/alternative':
|
||||
# all parts are candidates and latest is the best
|
||||
for subpart in part.get_payload():
|
||||
_search_message_content(contents, subpart)
|
||||
elif type in ('multipart/report', 'multipart/signed'):
|
||||
# only the first part is candidate
|
||||
try:
|
||||
subpart=part.get_payload()[0]
|
||||
except IndexError:
|
||||
return
|
||||
else:
|
||||
_search_message_content(contents, subpart)
|
||||
return
|
||||
|
||||
elif type=='multipart/encrypted':
|
||||
# the second part is the good one, but we need to de-crypt it
|
||||
# using the first part. Do nothing
|
||||
return
|
||||
|
||||
else:
|
||||
# unknown types must be handled as 'multipart/mixed'
|
||||
# This is the peace of code that could probably be improved,
|
||||
# I use a heuristic : if not already found, use first valid non
|
||||
# 'attachment' parts found
|
||||
for subpart in part.get_payload():
|
||||
tmp_contents=dict()
|
||||
_search_message_content(tmp_contents, subpart)
|
||||
for k, v in tmp_contents.items():
|
||||
if not subpart.get_param('attachment', None, 'content-disposition')=='':
|
||||
# if not an attachment, initiate value if not already found
|
||||
contents.setdefault(k, v)
|
||||
return
|
||||
else:
|
||||
contents[part.get_content_type().lower()]=part
|
||||
return
|
||||
|
||||
return
|
||||
|
||||
def search_message_content(mail):
|
||||
"""
|
||||
search of message content (text or HTML) inside
|
||||
the structure of the mail. This function is used by L{get_mail_parts()}
|
||||
to set the C{is_body} part of the L{MailPart}s
|
||||
|
||||
@type mail: inherit from email.message.Message
|
||||
@param mail: the message to search in.
|
||||
@rtype: dict
|
||||
@returns: a dictionary of the form C{{'text/plain': text_part, 'text/html': html_part}}
|
||||
where text_part and html_part inherite from C{email.mime.text.MIMEText}
|
||||
and are respectively the I{text} and I{HTML} version of the message content.
|
||||
One part can be missing. The dictionay can aven be empty if none of the
|
||||
parts math the requirements to be considered as the content.
|
||||
"""
|
||||
contents=dict()
|
||||
_search_message_content(contents, mail)
|
||||
return contents
|
||||
|
||||
def get_mail_parts(msg):
|
||||
"""
|
||||
return a list of all parts of the message as a list of L{MailPart}.
|
||||
Retrieve parts attributes to fill in L{MailPart} object.
|
||||
|
||||
@type msg: inherit email.message.Message
|
||||
@param msg: the message
|
||||
@rtype: list
|
||||
@returns: list of mail parts
|
||||
|
||||
>>> import email.mime.multipart
|
||||
>>> msg=email.mime.multipart.MIMEMultipart(boundary='===limit1==')
|
||||
>>> import email.mime.text
|
||||
>>> txt=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii')
|
||||
>>> msg.attach(txt)
|
||||
>>> import email.mime.image
|
||||
>>> image=email.mime.image.MIMEImage('data', 'png')
|
||||
>>> image.add_header('Content-Disposition', 'attachment', filename='image.png')
|
||||
>>> msg.attach(image)
|
||||
>>> print msg.as_string(unixfrom=False)
|
||||
Content-Type: multipart/mixed; boundary="===limit1=="
|
||||
MIME-Version: 1.0
|
||||
<BLANKLINE>
|
||||
--===limit1==
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
<BLANKLINE>
|
||||
The text.
|
||||
--===limit1==
|
||||
Content-Type: image/png
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: base64
|
||||
Content-Disposition: attachment; filename="image.png"
|
||||
<BLANKLINE>
|
||||
ZGF0YQ==
|
||||
--===limit1==--
|
||||
>>> parts=get_mail_parts(msg)
|
||||
>>> parts
|
||||
[MailPart<*text/plain charset=us-ascii len=9>, MailPart<image/png filename=image.png len=4>]
|
||||
>>> # the star "*" means this is the mail content, not an attachment
|
||||
>>> parts[0].get_payload().decode(parts[0].charset)
|
||||
u'The text.'
|
||||
>>> parts[1].filename, len(parts[1].get_payload())
|
||||
(u'image.png', 4)
|
||||
|
||||
"""
|
||||
mailparts=[]
|
||||
|
||||
# retrieve messages of the email
|
||||
contents=search_message_content(msg)
|
||||
# reverse contents dict
|
||||
parts=dict((v,k) for k, v in contents.items())
|
||||
|
||||
# organize the stack to handle deep first search
|
||||
stack=[ msg, ]
|
||||
while stack:
|
||||
part=stack.pop(0)
|
||||
type=part.get_content_type()
|
||||
if type.startswith('message/'):
|
||||
# ('message/delivery-status', 'message/rfc822', 'message/disposition-notification'):
|
||||
# I don't want to explore the tree deeper her and just save source using msg.as_string()
|
||||
# but I don't use msg.as_string() because I want to use mangle_from_=False
|
||||
filename='message.eml'
|
||||
mailparts.append(MailPart(part, filename=filename, type=type, charset=part.get_param('charset'), description=part.get('Content-Description')))
|
||||
elif part.is_multipart():
|
||||
# insert new parts at the beginning of the stack (deep first search)
|
||||
stack[:0]=part.get_payload()
|
||||
else:
|
||||
charset=part.get_param('charset')
|
||||
filename=get_filename(part)
|
||||
|
||||
disposition=None
|
||||
if part.get_param('inline', None, 'content-disposition')=='':
|
||||
disposition='inline'
|
||||
elif part.get_param('attachment', None, 'content-disposition')=='':
|
||||
disposition='attachment'
|
||||
|
||||
mailparts.append(MailPart(part, filename=filename, type=type, charset=charset, content_id=part.get('Content-Id'), description=part.get('Content-Description'), disposition=disposition, is_body=parts.get(part, False)))
|
||||
|
||||
return mailparts
|
||||
|
||||
|
||||
def decode_text(payload, charset, default_charset):
|
||||
"""
|
||||
Try to decode text content by trying multiple charset until success.
|
||||
First try I{charset}, else try I{default_charset} finally
|
||||
try popular charsets in order : ascii, utf-8, utf-16, windows-1252, cp850
|
||||
If all fail then use I{default_charset} and replace wrong characters
|
||||
|
||||
@type payload: str
|
||||
@param payload: the content to decode
|
||||
@type charset: str or None
|
||||
@param charset: the first charset to try if != C{None}
|
||||
@type default_charset: str or None
|
||||
@param default_charset: the second charset to try if != C{None}
|
||||
|
||||
@rtype: tuple
|
||||
@returns: a tuple of the form C{(payload, charset)}
|
||||
- I{payload}: this is the decoded payload if charset is not None and
|
||||
payload is a unicode string
|
||||
- I{charset}: the charset that was used to decode I{payload} If charset is
|
||||
C{None} then something goes wrong: if I{payload} is unicode then
|
||||
invalid characters have been replaced and the used charset is I{default_charset}
|
||||
else, if I{payload} is still byte string then nothing has been done.
|
||||
|
||||
|
||||
"""
|
||||
for chset in [ charset, default_charset, 'ascii', 'utf-8', 'utf-16', 'windows-1252', 'cp850' ]:
|
||||
if chset:
|
||||
try:
|
||||
return payload.decode(chset), chset
|
||||
except UnicodeError:
|
||||
pass
|
||||
|
||||
if default_charset:
|
||||
return payload.decode(chset, 'replace'), None
|
||||
|
||||
return payload, None
|
||||
|
||||
class PyzMessage(email.message.Message):
|
||||
"""
|
||||
Inherit from email.message.Message. Combine L{get_mail_parts()},
|
||||
L{get_mail_addresses()} and L{decode_mail_header()} into a
|
||||
B{convenient} object to access mail contents and attributes.
|
||||
This class also B{sanitize} part filenames.
|
||||
|
||||
@type mailparts: list of L{MailPart}
|
||||
@ivar mailparts: list of L{MailPart} objects composing the email, I{text_part}
|
||||
and I{html_part} are part of this list as are other attachements and embedded
|
||||
contents.
|
||||
@type text_part: L{MailPart} or None
|
||||
@ivar text_part: the L{MailPart} object that contains the I{text}
|
||||
version of the message, None if the mail has not I{text} content.
|
||||
@type html_part: L{MailPart} or None
|
||||
@ivar html_part: the L{MailPart} object that contains the I{HTML}
|
||||
version of the message, None if the mail has not I{HTML} content.
|
||||
|
||||
@note: Sample:
|
||||
|
||||
>>> raw='''Content-Type: text/plain; charset="us-ascii"
|
||||
... MIME-Version: 1.0
|
||||
... Content-Transfer-Encoding: 7bit
|
||||
... Subject: The subject
|
||||
... From: Me <me@foo.com>
|
||||
... To: A <a@foo.com>, B <b@foo.com>
|
||||
...
|
||||
... The text.
|
||||
... '''
|
||||
>>> msg=PyzMessage.factory(raw)
|
||||
>>> print 'Subject: %r' % (msg.get_subject(), )
|
||||
Subject: u'The subject'
|
||||
>>> print 'From: %r' % (msg.get_address('from'), )
|
||||
From: (u'Me', 'me@foo.com')
|
||||
>>> print 'To: %r' % (msg.get_addresses('to'), )
|
||||
To: [(u'A', 'a@foo.com'), (u'B', 'b@foo.com')]
|
||||
>>> print 'Cc: %r' % (msg.get_addresses('cc'), )
|
||||
Cc: []
|
||||
>>> for mailpart in msg.mailparts:
|
||||
... print ' %sfilename=%r sanitized_filename=%r type=%s charset=%s desc=%s size=%d' % ('*'if mailpart.is_body else ' ', mailpart.filename, mailpart.sanitized_filename, mailpart.type, mailpart.charset, mailpart.part.get('Content-Description'), 0 if mailpart.get_payload()==None else len(mailpart.get_payload()))
|
||||
... if mailpart.is_body=='text/plain':
|
||||
... payload, used_charset=decode_text(mailpart.get_payload(), mailpart.charset, None)
|
||||
... print ' >', payload.split('\\n')[0]
|
||||
...
|
||||
*filename=None sanitized_filename='text.txt' type=text/plain charset=us-ascii desc=None size=10
|
||||
> The text.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def smart_parser(input):
|
||||
"""
|
||||
Use the appropriate parser and return a email.message.Message object
|
||||
(this is not a L{PyzMessage} object)
|
||||
|
||||
@type input: string, file, bytes, binary_file or email.message.Message
|
||||
@param input: the source of the message
|
||||
@rtype: email.message.Message
|
||||
@returns: the message
|
||||
"""
|
||||
if isinstance(input, email.message.Message):
|
||||
return input
|
||||
|
||||
if sys.version_info<(3, 0):
|
||||
# python 2.x
|
||||
if isinstance(input, str):
|
||||
return email.message_from_string(input)
|
||||
elif hasattr(input, 'read') and hasattr(input, 'readline'):
|
||||
return email.message_from_file(input)
|
||||
else:
|
||||
raise ValueError('input must be a string, a file or a Message')
|
||||
else:
|
||||
# python 3.x
|
||||
if isinstance(input, str):
|
||||
return email.message_from_string(input)
|
||||
elif isinstance(input, bytes):
|
||||
# python >= 3.2 only
|
||||
return email.message_from_bytes(input)
|
||||
elif hasattr(input, 'read') and hasattr(input, 'readline'):
|
||||
if hasattr(input, 'encoding'):
|
||||
# python >= 3.2 only
|
||||
return email.message_from_file(input)
|
||||
else:
|
||||
return email.message_from_binary_file(input)
|
||||
else:
|
||||
raise ValueError('input must be a string a bytes, a file or a Message')
|
||||
|
||||
@staticmethod
|
||||
def factory(input):
|
||||
"""
|
||||
Use the appropriate parser and return a L{PyzMessage} object
|
||||
see L{smart_parser}
|
||||
@type input: string, file, bytes, binary_file or email.message.Message
|
||||
@param input: the source of the message
|
||||
@rtype: L{PyzMessage}
|
||||
@returns: the L{PyzMessage} message
|
||||
"""
|
||||
return PyzMessage(PyzMessage.smart_parser(input))
|
||||
|
||||
|
||||
def __init__(self, message):
|
||||
"""
|
||||
Initialize the object with data coming from I{message}.
|
||||
|
||||
@type message: inherit email.message.Message
|
||||
@param message: The message
|
||||
"""
|
||||
if not isinstance(message, email.message.Message):
|
||||
raise ValueError("message must inherit from email.message.Message use PyzMessage.factory() instead")
|
||||
self.__dict__.update(message.__dict__)
|
||||
|
||||
self.mailparts=get_mail_parts(self)
|
||||
self.text_part=None
|
||||
self.html_part=None
|
||||
|
||||
filenames=[]
|
||||
for part in self.mailparts:
|
||||
ext=mimetypes.guess_extension(part.type)
|
||||
if not ext:
|
||||
# default to .bin
|
||||
ext='.bin'
|
||||
elif ext=='.ksh':
|
||||
# guess_extension() is not very accurate, .txt is more
|
||||
# appropriate than .ksh
|
||||
ext='.txt'
|
||||
|
||||
sanitized_filename=sanitize_filename(part.filename, part.type.split('/', 1)[0], ext)
|
||||
sanitized_filename=handle_filename_collision(sanitized_filename, filenames)
|
||||
filenames.append(sanitized_filename.lower())
|
||||
part.sanitized_filename=sanitized_filename
|
||||
|
||||
if part.is_body=='text/plain':
|
||||
self.text_part=part
|
||||
|
||||
if part.is_body=='text/html':
|
||||
self.html_part=part
|
||||
|
||||
def get_addresses(self, name):
|
||||
"""
|
||||
return the I{name} header value as an list of addresses tuple as
|
||||
returned by L{get_mail_addresses()}
|
||||
|
||||
@type name: str
|
||||
@param name: the name of the header to read value from: 'to', 'cc' are
|
||||
valid I{name} here.
|
||||
@rtype: tuple
|
||||
@returns: a tuple of the form C{('Sender Name', 'sender.address@domain.com')}
|
||||
or C{('', '')} if no header match that I{name}.
|
||||
"""
|
||||
return get_mail_addresses(self, name)
|
||||
|
||||
def get_address(self, name):
|
||||
"""
|
||||
return the I{name} header value as an address tuple as returned by
|
||||
L{get_mail_addresses()}
|
||||
|
||||
@type name: str
|
||||
@param name: the name of the header to read value from: : C{'from'} can
|
||||
be used to return the sender address.
|
||||
@rtype: list of tuple
|
||||
@returns: a list of tuple of the form C{[('Recipient Name', 'recipient.address@domain.com'), ...]}
|
||||
or an empty list if no header match that I{name}.
|
||||
"""
|
||||
value=get_mail_addresses(self, name)
|
||||
if value:
|
||||
return value[0]
|
||||
else:
|
||||
return ('', '')
|
||||
|
||||
def get_subject(self, default=''):
|
||||
"""
|
||||
return the RFC2047 decoded subject.
|
||||
|
||||
@type default: any
|
||||
@param default: The value to return if the message has no I{Subject}
|
||||
@rtype: unicode
|
||||
@returns: the subject or C{default}
|
||||
"""
|
||||
return self.get_decoded_header('subject', default)
|
||||
|
||||
def get_decoded_header(self, name, default=''):
|
||||
"""
|
||||
return decoded header I{name} using RFC2047. Always use this function
|
||||
to access header, because any header can contain invalid characters
|
||||
and this function sanitize the string and avoid unicode exception later
|
||||
in your program.
|
||||
EVEN for date, I already saw a "Center box bar horizontal" instead
|
||||
of a minus character.
|
||||
|
||||
@type name: str
|
||||
@param name: the name of the header to read value from.
|
||||
@type default: any
|
||||
@param default: The value to return if the I{name} field don't exist
|
||||
in this message.
|
||||
@rtype: unicode
|
||||
@returns: the value of the header having that I{name} or C{default} if no
|
||||
header have that name.
|
||||
"""
|
||||
value=self.get(name)
|
||||
if value==None:
|
||||
value=default
|
||||
else:
|
||||
value=decode_mail_header(value)
|
||||
return value
|
||||
|
||||
class PzMessage(PyzMessage):
|
||||
"""
|
||||
Old name and interface for PyzMessage.
|
||||
B{Deprecated}
|
||||
"""
|
||||
|
||||
def __init__(self, input):
|
||||
"""
|
||||
Initialize the object with data coming from I{input}.
|
||||
|
||||
@type input: str or file or email.message.Message
|
||||
@param input: used as the raw content for the email, can be a string,
|
||||
a file object or an email.message.Message object.
|
||||
"""
|
||||
PyzMessage.__init__(self, self.smart_parser(input))
|
||||
|
||||
|
||||
def message_from_string(s, *args, **kws):
|
||||
"""
|
||||
Parse a string into a L{PyzMessage} object model.
|
||||
@type s: str
|
||||
@param s: the input string
|
||||
@rtype: L{PyzMessage}
|
||||
@return: the L{PyzMessage} object
|
||||
"""
|
||||
return PyzMessage(email.message_from_string(s, *args, **kws))
|
||||
|
||||
def message_from_file(fp, *args, **kws):
|
||||
"""
|
||||
Read a file and parse its contents into a L{PyzMessage} object model.
|
||||
@type fp: text_file
|
||||
@param fp: the input file (must be open in text mode if Python >= 3.0)
|
||||
@rtype: L{PyzMessage}
|
||||
@return: the L{PyzMessage} object
|
||||
"""
|
||||
return PyzMessage(email.message_from_file(fp, *args, **kws))
|
||||
|
||||
def message_from_bytes(s, *args, **kws):
|
||||
"""
|
||||
Parse a bytes string into a L{PyzMessage} object model.
|
||||
B{(Python >= 3.2)}
|
||||
@type s: bytes
|
||||
@param s: the input bytes string
|
||||
@rtype: L{PyzMessage}
|
||||
@return: the L{PyzMessage} object
|
||||
"""
|
||||
return PyzMessage(email.message_from_bytes(s, *args, **kws))
|
||||
|
||||
def message_from_binary_file(fp, *args, **kws):
|
||||
"""
|
||||
Read a binary file and parse its contents into a L{PyzMessage} object model.
|
||||
B{(Python >= 3.2)}
|
||||
@type fp: binary_file
|
||||
@param fp: the input file, must be open in binary mode
|
||||
@rtype: L{PyzMessage}
|
||||
@return: the L{PyzMessage} object
|
||||
"""
|
||||
return PyzMessage(email.message_from_binary_file(fp, *args, **kws))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
if len(sys.argv)<=1:
|
||||
print('usage : %s filename' % sys.argv[0])
|
||||
print('read an email from file and display a resume of its content')
|
||||
sys.exit(1)
|
||||
|
||||
msg=PyzMessage.factory(open(sys.argv[1], 'rb'))
|
||||
|
||||
print('Subject: %r' % (msg.get_subject(), ))
|
||||
print('From: %r' % (msg.get_address('from'), ))
|
||||
print('To: %r' % (msg.get_addresses('to'), ))
|
||||
print('Cc: %r' % (msg.get_addresses('cc'), ))
|
||||
print('Date: %r' % (msg.get_decoded_header('date', ''), ))
|
||||
print('Message-Id: %r' % (msg.get_decoded_header('message-id', ''), ))
|
||||
|
||||
for mailpart in msg.mailparts:
|
||||
# dont forget to be careful to sanitize 'filename' and be carefull
|
||||
# for filename collision, to before to save :
|
||||
print(' %sfilename=%r type=%s charset=%s desc=%s size=%d' % ('*'if mailpart.is_body else ' ', mailpart.filename, mailpart.type, mailpart.charset, mailpart.part.get('Content-Description'), 0 if mailpart.get_payload()==None else len(mailpart.get_payload())))
|
||||
|
||||
if mailpart.is_body=='text/plain':
|
||||
# print first 3 lines
|
||||
payload, used_charset=decode_text(mailpart.get_payload(), mailpart.charset, None)
|
||||
for line in payload.split('\n')[:3]:
|
||||
# be careful console can be unable to display unicode characters
|
||||
if line:
|
||||
print(' >', line)
|
||||
|
||||
|
||||
|
|
@ -1,99 +0,0 @@
|
|||
import unittest
|
||||
import pyzmail
|
||||
from pyzmail.generate import *
|
||||
from pyzmail.parse import *
|
||||
|
||||
class TestBoth(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def test_compose_and_parse(self):
|
||||
"""test generate and parse"""
|
||||
|
||||
sender=('Me', 'me@foo.com')
|
||||
recipients=[('Him', 'him@bar.com'), 'just@me.com']
|
||||
subject='Le sujet en Fran\xe7ais'
|
||||
text_content='Bonjour aux Fran\xe7ais'
|
||||
prefered_encoding='iso-8859-1'
|
||||
text_encoding='iso-8859-1'
|
||||
attachments=[('attached content', 'text', 'plain', 'textfile1.txt', 'us-ascii'),
|
||||
('Fran\xe7ais', 'text', 'plain', 'textfile2.txt', 'iso-8859-1'),
|
||||
('Fran\xe7ais', 'text', 'plain', 'textfile3.txt', 'iso-8859-1'),
|
||||
(b'image', 'image', 'jpg', 'imagefile.jpg', None),
|
||||
]
|
||||
embeddeds=[('embedded content', 'text', 'plain', 'embedded', 'us-ascii'),
|
||||
(b'picture', 'image', 'png', 'picture', None),
|
||||
]
|
||||
headers=[ ('X-extra', 'extra value'), ('X-extra2', "Seconde ent\xe8te"), ('X-extra3', 'last extra'),]
|
||||
|
||||
message_id_string='pyzmail'
|
||||
date=1313558269
|
||||
|
||||
payload, mail_from, rcpt_to, msg_id=pyzmail.compose_mail(\
|
||||
sender, \
|
||||
recipients, \
|
||||
subject, \
|
||||
prefered_encoding, \
|
||||
(text_content, text_encoding), \
|
||||
html=None, \
|
||||
attachments=attachments, \
|
||||
embeddeds=embeddeds, \
|
||||
headers=headers, \
|
||||
message_id_string=message_id_string, \
|
||||
date=date\
|
||||
)
|
||||
|
||||
msg=PyzMessage.factory(payload)
|
||||
|
||||
self.assertEqual(sender, msg.get_address('from'))
|
||||
self.assertEqual(recipients[0], msg.get_addresses('to')[0])
|
||||
self.assertEqual(recipients[1], msg.get_addresses('to')[1][1])
|
||||
self.assertEqual(subject, msg.get_subject())
|
||||
self.assertEqual(subject, msg.get_decoded_header('subject'))
|
||||
|
||||
# try to handle different timezone carefully
|
||||
mail_date=list(email.utils.parsedate(msg.get_decoded_header('date')))
|
||||
self.assertEqual(mail_date[:6], list(time.localtime(date))[:6])
|
||||
|
||||
self.assertNotEqual(msg.get('message-id').find(message_id_string), -1)
|
||||
for name, value in headers:
|
||||
self.assertEqual(value, msg.get_decoded_header(name))
|
||||
|
||||
for mailpart in msg.mailparts:
|
||||
if mailpart.is_body:
|
||||
self.assertEqual(mailpart.content_id, None)
|
||||
self.assertEqual(mailpart.filename, None)
|
||||
self.assertEqual(type(mailpart.sanitized_filename), str)
|
||||
if mailpart.type=='text/plain':
|
||||
self.assertEqual(mailpart.get_payload(), text_content.encode(text_encoding))
|
||||
else:
|
||||
self.fail('found unknown body part')
|
||||
else:
|
||||
if mailpart.filename:
|
||||
lst=attachments
|
||||
self.assertEqual(mailpart.filename, mailpart.sanitized_filename)
|
||||
self.assertEqual(mailpart.content_id, None)
|
||||
elif mailpart.content_id:
|
||||
lst=embeddeds
|
||||
self.assertEqual(mailpart.filename, None)
|
||||
else:
|
||||
self.fail('found unknown part')
|
||||
|
||||
found=False
|
||||
for attach in lst:
|
||||
found=(mailpart.filename and attach[3]==mailpart.filename) \
|
||||
or (mailpart.content_id and attach[3]==mailpart.content_id)
|
||||
if found:
|
||||
break
|
||||
|
||||
if found:
|
||||
self.assertEqual(mailpart.type, attach[1]+'/'+attach[2])
|
||||
payload=mailpart.get_payload()
|
||||
if attach[1]=='text' and attach[4] and isinstance(attach[0], str):
|
||||
payload=payload.decode(attach[4])
|
||||
self.assertEqual(payload, attach[0])
|
||||
else:
|
||||
self.fail('found unknown attachment')
|
||||
|
||||
|
|
@ -1,30 +0,0 @@
|
|||
import unittest, doctest
|
||||
import pyzmail
|
||||
from pyzmail.generate import *
|
||||
|
||||
class TestGenerate(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def test_format_addresses(self):
|
||||
"""test format_addresse"""
|
||||
self.assertEqual('foo@example.com', str(format_addresses([ 'foo@example.com', ])))
|
||||
self.assertEqual('Foo <foo@example.com>', str(format_addresses([ ('Foo', 'foo@example.com'), ])))
|
||||
# notice the space around the comma
|
||||
self.assertEqual('foo@example.com , bar@example.com', str(format_addresses([ 'foo@example.com', 'bar@example.com'])))
|
||||
# notice the space around the comma
|
||||
self.assertEqual('Foo <foo@example.com> , Bar <bar@example.com>', str(format_addresses([ ('Foo', 'foo@example.com'), ( 'Bar', 'bar@example.com')])))
|
||||
|
||||
# Add doctest
|
||||
def load_tests(loader, tests, ignore):
|
||||
# this works with python 2.7 and 3.x
|
||||
tests.addTests(doctest.DocTestSuite(pyzmail.generate))
|
||||
return tests
|
||||
|
||||
def additional_tests():
|
||||
# Add doctest for python 2.6 and below
|
||||
if sys.version_info<(2, 7):
|
||||
return doctest.DocTestSuite(pyzmail.generate)
|
||||
else:
|
||||
return unittest.TestSuite()
|
|
@ -1,290 +0,0 @@
|
|||
import unittest, doctest
|
||||
import pyzmail
|
||||
from pyzmail.parse import *
|
||||
|
||||
|
||||
class Msg:
|
||||
"""mimic a email.Message"""
|
||||
def __init__(self, value):
|
||||
self.value=value
|
||||
|
||||
def get_all(self, header_name, default):
|
||||
if self.value:
|
||||
return [self.value, ]
|
||||
else:
|
||||
return []
|
||||
|
||||
class TestParse(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def test_decode_mail_header(self):
|
||||
"""test decode_mail_header()"""
|
||||
self.assertEqual(decode_mail_header(''), '')
|
||||
self.assertEqual(decode_mail_header('hello'), 'hello')
|
||||
self.assertEqual(decode_mail_header('hello '), 'hello ')
|
||||
self.assertEqual(decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_Fran=E7ais?='), 'Courrier \xe8lectronique Fran\xe7ais')
|
||||
self.assertEqual(decode_mail_header('=?utf8?q?Courrier_=C3=A8lectronique_Fran=C3=A7ais?='), 'Courrier \xe8lectronique Fran\xe7ais')
|
||||
self.assertEqual(decode_mail_header('=?utf-8?b?RnJhbsOnYWlz?='), 'Fran\xe7ais')
|
||||
self.assertEqual(decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_?= =?utf8?q?Fran=C3=A7ais?='), 'Courrier \xe8lectronique Fran\xe7ais')
|
||||
self.assertEqual(decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_?= =?utf-8?b?RnJhbsOnYWlz?='), 'Courrier \xe8lectronique Fran\xe7ais')
|
||||
self.assertEqual(decode_mail_header('h_subject_q_iso_8858_1 : =?ISO-8859-1?Q?Fran=E7ais=E20accentu=E9?= !'), 'h_subject_q_iso_8858_1 :Fran\xe7ais\xe20accentu\xe9!')
|
||||
|
||||
def test_get_mail_addresses(self):
|
||||
"""test get_mail_addresses()"""
|
||||
self.assertEqual([ ('foo@example.com', 'foo@example.com') ], get_mail_addresses(Msg('foo@example.com'), 'to'))
|
||||
self.assertEqual([ ('Foo', 'foo@example.com'), ], get_mail_addresses(Msg('Foo <foo@example.com>'), 'to'))
|
||||
# notice the space around the comma
|
||||
self.assertEqual([ ('foo@example.com', 'foo@example.com'), ('bar@example.com', 'bar@example.com')], get_mail_addresses(Msg('foo@example.com , bar@example.com'), 'to'))
|
||||
self.assertEqual([ ('Foo', 'foo@example.com'), ( 'Bar', 'bar@example.com')], get_mail_addresses(Msg('Foo <foo@example.com> , Bar <bar@example.com>'), 'to'))
|
||||
self.assertEqual([ ('Foo', 'foo@example.com'), ('bar@example.com', 'bar@example.com')], get_mail_addresses(Msg('Foo <foo@example.com> , bar@example.com'), 'to'))
|
||||
self.assertEqual([ ('Mr Foo', 'foo@example.com'), ('bar@example.com', 'bar@example.com')], get_mail_addresses(Msg('Mr\nFoo <foo@example.com> , bar@example.com'), 'to'))
|
||||
|
||||
self.assertEqual([ ('Beno\xeet', 'benoit@example.com')], get_mail_addresses(Msg('=?utf-8?q?Beno=C3=AEt?= <benoit@example.com>'), 'to'))
|
||||
|
||||
# address already encoded into utf8 (bad)
|
||||
address='Ant\xf3nio Foo <a.foo@example.com>'.encode('utf8')
|
||||
if sys.version_info<(3, 0):
|
||||
self.assertEqual([('Ant\ufffd\ufffdnio Foo', 'a.foo@example.com')], get_mail_addresses(Msg(address), 'to'))
|
||||
else:
|
||||
# Python 3.2 return header when surrogate characters are used in header
|
||||
self.assertEqual([('Ant??nio Foo', 'a.foo@example.com'), ], get_mail_addresses(Msg(email.header.Header(address, charset=email.charset.UNKNOWN8BIT, header_name='to')), 'to'))
|
||||
|
||||
def test_get_filename(self):
|
||||
"""test get_filename()"""
|
||||
import email.mime.image
|
||||
|
||||
filename='Fran\xe7ais.png'
|
||||
if sys.version_info<(3, 0):
|
||||
encoded_filename=filename.encode('iso-8859-1')
|
||||
else:
|
||||
encoded_filename=filename
|
||||
|
||||
payload=b'data'
|
||||
attach=email.mime.image.MIMEImage(payload, 'png')
|
||||
attach.add_header('Content-Disposition', 'attachment', filename='image.png')
|
||||
self.assertEqual('image.png', get_filename(attach))
|
||||
|
||||
attach=email.mime.image.MIMEImage(payload, 'png')
|
||||
attach.add_header('Content-Disposition', 'attachment', filename=('iso-8859-1', 'fr', encoded_filename))
|
||||
self.assertEqual('Fran\xe7ais.png', get_filename(attach))
|
||||
|
||||
attach=email.mime.image.MIMEImage(payload, 'png')
|
||||
attach.set_param('name', 'image.png')
|
||||
self.assertEqual('image.png', get_filename(attach))
|
||||
|
||||
attach=email.mime.image.MIMEImage(payload, 'png')
|
||||
attach.set_param('name', ('iso-8859-1', 'fr', encoded_filename))
|
||||
self.assertEqual('Fran\xe7ais.png', get_filename(attach))
|
||||
|
||||
attach=email.mime.image.MIMEImage(payload, 'png')
|
||||
attach.add_header('Content-Disposition', 'attachment', filename='image.png')
|
||||
attach.set_param('name', 'image_wrong.png')
|
||||
self.assertEqual('image.png', get_filename(attach))
|
||||
|
||||
def test_get_mailparts(self):
|
||||
"""test get_mailparts()"""
|
||||
import email.mime.multipart
|
||||
import email.mime.text
|
||||
import email.mime.image
|
||||
msg=email.mime.multipart.MIMEMultipart(boundary='===limit1==')
|
||||
txt=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii')
|
||||
msg.attach(txt)
|
||||
image=email.mime.image.MIMEImage(b'data', 'png')
|
||||
image.add_header('Content-Disposition', 'attachment', filename='image.png')
|
||||
image.add_header('Content-Description', 'the description')
|
||||
image.add_header('Content-ID', '<this.is.the.normaly.unique.contentid>')
|
||||
msg.attach(image)
|
||||
|
||||
raw=msg.as_string(unixfrom=False)
|
||||
expected_raw="""Content-Type: multipart/mixed; boundary="===limit1=="
|
||||
MIME-Version: 1.0
|
||||
|
||||
--===limit1==
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
The text.
|
||||
--===limit1==
|
||||
Content-Type: image/png
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: base64
|
||||
Content-Disposition: attachment; filename="image.png"
|
||||
Content-Description: the description
|
||||
Content-ID: <this.is.the.normaly.unique.contentid>
|
||||
|
||||
ZGF0YQ==<HERE1>
|
||||
--===limit1==--"""
|
||||
|
||||
if sys.version_info<(3, 0):
|
||||
expected_raw=expected_raw.replace('<HERE1>','')
|
||||
else:
|
||||
expected_raw=expected_raw.replace('<HERE1>','\n')
|
||||
|
||||
self.assertEqual(raw, expected_raw)
|
||||
|
||||
parts=get_mail_parts(msg)
|
||||
# [MailPart<*text/plain charset=us-ascii len=9>, MailPart<image/png filename=image.png len=4>]
|
||||
|
||||
self.assertEqual(len(parts), 2)
|
||||
|
||||
self.assertEqual(parts[0].type, 'text/plain')
|
||||
self.assertEqual(parts[0].is_body, 'text/plain') # not a error, is_body must be type
|
||||
self.assertEqual(parts[0].charset, 'us-ascii')
|
||||
self.assertEqual(parts[0].get_payload().decode(parts[0].charset), 'The text.')
|
||||
|
||||
self.assertEqual(parts[1].type, 'image/png')
|
||||
self.assertEqual(parts[1].is_body, False)
|
||||
self.assertEqual(parts[1].charset, None)
|
||||
self.assertEqual(parts[1].filename, 'image.png')
|
||||
self.assertEqual(parts[1].description, 'the description')
|
||||
self.assertEqual(parts[1].content_id, 'this.is.the.normaly.unique.contentid')
|
||||
self.assertEqual(parts[1].get_payload(), b'data')
|
||||
|
||||
|
||||
raw_1='''Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
Subject: simple test
|
||||
From: Me <me@foo.com>
|
||||
To: A <a@foo.com>, B <b@foo.com>
|
||||
Cc: C <c@foo.com>, d@foo.com
|
||||
User-Agent: pyzmail
|
||||
|
||||
The text.
|
||||
'''
|
||||
|
||||
def check_message_1(self, msg):
|
||||
self.assertEqual(msg.get_subject(), 'simple test')
|
||||
self.assertEqual(msg.get_decoded_header('subject'), 'simple test')
|
||||
self.assertEqual(msg.get_decoded_header('User-Agent'), 'pyzmail')
|
||||
self.assertEqual(msg.get('User-Agent'), 'pyzmail')
|
||||
self.assertEqual(msg.get_address('from'), ('Me', 'me@foo.com'))
|
||||
self.assertEqual(msg.get_addresses('to'), [('A', 'a@foo.com'), ('B', 'b@foo.com')])
|
||||
self.assertEqual(msg.get_addresses('cc'), [('C', 'c@foo.com'), ('d@foo.com', 'd@foo.com')])
|
||||
self.assertEqual(len(msg.mailparts), 1)
|
||||
self.assertEqual(msg.text_part, msg.mailparts[0])
|
||||
self.assertEqual(msg.html_part, None)
|
||||
|
||||
# use 8bits encoding and 2 different charsets ! python 3.0 & 3.1 are not eable to parse this sample
|
||||
raw_2=b"""From: sender@domain.com
|
||||
To: recipient@domain.com
|
||||
Date: Tue, 7 Jun 2011 16:32:17 +0200
|
||||
Subject: contains 8bits attachments using different encoding
|
||||
Content-Type: multipart/mixed; boundary=mixed
|
||||
|
||||
--mixed
|
||||
Content-Type: text/plain; charset="us-ascii"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
body
|
||||
--mixed
|
||||
Content-Type: text/plain; charset="windows-1252"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 8bit
|
||||
Content-Disposition: attachment; filename="file1.txt"
|
||||
|
||||
bo\xeete mail = mailbox
|
||||
--mixed
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
MIME-Version: 1.0
|
||||
Content-Transfer-Encoding: 8bit
|
||||
Content-Disposition: attachment; filename="file2.txt"
|
||||
|
||||
bo\xc3\xaete mail = mailbox
|
||||
--mixed--
|
||||
"""
|
||||
|
||||
def check_message_2(self, msg):
|
||||
self.assertEqual(msg.get_subject(), 'contains 8bits attachments using different encoding')
|
||||
|
||||
body, file1, file2=msg.mailparts
|
||||
|
||||
self.assertEqual('file1.txt', file1.filename)
|
||||
self.assertEqual('file2.txt', file2.filename)
|
||||
self.assertEqual('windows-1252', file1.charset)
|
||||
self.assertEqual('utf-8', file2.charset)
|
||||
content=b'bo\xeete mail = mailbox'.decode("windows-1252")
|
||||
content1=file1.get_payload().decode(file1.charset)
|
||||
content2=file2.get_payload().decode(file2.charset)
|
||||
self.assertEqual(content, content1)
|
||||
self.assertEqual(content, content2)
|
||||
|
||||
# this one contain non us-ascii chars in the header
|
||||
# py 2x and py3k return different value here
|
||||
raw_3=b'Content-Type: text/plain; charset="us-ascii"\n' \
|
||||
b'MIME-Version: 1.0\n' \
|
||||
b'Content-Transfer-Encoding: 7bit\n' \
|
||||
+ 'Subject: Beno\xeet & Ant\xf3nio\n'.encode('utf8') +\
|
||||
b'From: =?utf-8?q?Beno=C3=AEt?= <benoit@example.com>\n' \
|
||||
+ 'To: Ant\xf3nio Foo <a.foo@example.com>\n'.encode('utf8') \
|
||||
+ 'Cc: Beno\xeet <benoit@foo.com>, d@foo.com\n'.encode('utf8') +\
|
||||
b'User-Agent: pyzmail\n' \
|
||||
b'\n' \
|
||||
b'The text.\n'
|
||||
|
||||
def check_message_3(self, msg):
|
||||
subject='Beno\ufffd\ufffdt & Ant\ufffd\ufffdnio' # if sys.version_info<(3, 0) else u'Beno??t & Ant??nio'
|
||||
self.assertEqual(msg.get_subject(), subject)
|
||||
self.assertEqual(msg.get_decoded_header('subject'), subject)
|
||||
self.assertEqual(msg.get_decoded_header('User-Agent'), 'pyzmail')
|
||||
self.assertEqual(msg.get('User-Agent'), 'pyzmail')
|
||||
self.assertEqual(msg.get_address('from'), ('Beno\xeet', 'benoit@example.com'))
|
||||
|
||||
to=msg.get_addresses('to')
|
||||
self.assertEqual(to[0][1], 'a.foo@example.com')
|
||||
self.assertEqual(to[0][0], 'Ant\ufffd\ufffdnio Foo' if sys.version_info<(3, 0) else 'Ant??nio Foo')
|
||||
|
||||
cc=msg.get_addresses('cc')
|
||||
self.assertEqual(cc[0][1], 'benoit@foo.com')
|
||||
self.assertEqual(cc[0][0], 'Beno\ufffd\ufffdt' if sys.version_info<(3, 0) else 'Beno??t')
|
||||
self.assertEqual(cc[1], ('d@foo.com', 'd@foo.com'))
|
||||
|
||||
self.assertEqual(len(msg.mailparts), 1)
|
||||
self.assertEqual(msg.text_part, msg.mailparts[0])
|
||||
self.assertEqual(msg.html_part, None)
|
||||
|
||||
|
||||
def check_pyzmessage_factories(self, input, check):
|
||||
"""test PyzMessage from different sources"""
|
||||
if isinstance(input, bytes) and sys.version_info>=(3, 2):
|
||||
check(PyzMessage.factory(input))
|
||||
check(message_from_bytes(input))
|
||||
|
||||
import io
|
||||
check(PyzMessage.factory(io.BytesIO(input)))
|
||||
check(message_from_binary_file(io.BytesIO(input)))
|
||||
|
||||
if isinstance(input, str):
|
||||
|
||||
check(PyzMessage.factory(input))
|
||||
check(message_from_string(input))
|
||||
|
||||
import io
|
||||
check(PyzMessage.factory(io.StringIO(input)))
|
||||
check(message_from_file(io.StringIO(input)))
|
||||
|
||||
def test_pyzmessage_factories(self):
|
||||
"""test PyzMessage class different sources"""
|
||||
self.check_pyzmessage_factories(self.raw_1, self.check_message_1)
|
||||
self.check_pyzmessage_factories(self.raw_2, self.check_message_2)
|
||||
self.check_pyzmessage_factories(self.raw_3, self.check_message_3)
|
||||
|
||||
|
||||
# Add doctest
|
||||
def load_tests(loader, tests, ignore):
|
||||
# this works with python 2.7 and 3.x
|
||||
if sys.version_info<(3, 0):
|
||||
tests.addTests(doctest.DocTestSuite(pyzmail.parse))
|
||||
return tests
|
||||
|
||||
def additional_tests():
|
||||
# Add doctest for python 2.6 and below
|
||||
if sys.version_info<(2, 7):
|
||||
return doctest.DocTestSuite(pyzmail.parse)
|
||||
else:
|
||||
return unittest.TestSuite()
|
||||
|
|
@ -1,77 +0,0 @@
|
|||
import threading, smtpd, asyncore, socket, smtplib, time
|
||||
import unittest
|
||||
import pyzmail
|
||||
from pyzmail.generate import *
|
||||
|
||||
|
||||
smtpd_addr='127.0.0.1'
|
||||
smtpd_port=32525
|
||||
smtp_bad_port=smtpd_port-1
|
||||
|
||||
smtp_mode='normal'
|
||||
smtp_login=None
|
||||
smtp_password=None
|
||||
|
||||
|
||||
class SMTPServer(smtpd.SMTPServer):
|
||||
def __init__(self, localaddr, remoteaddr, received):
|
||||
smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
|
||||
self.set_reuse_addr()
|
||||
# put the received mail into received list
|
||||
self.received=received
|
||||
|
||||
def process_message(self, peer, mail_from, rcpt_to, data):
|
||||
ret=None
|
||||
if mail_from.startswith('data_error'):
|
||||
ret='552 Requested mail action aborted: exceeded storage allocation'
|
||||
self.received.append((ret, peer, mail_from, rcpt_to, data))
|
||||
return ret
|
||||
|
||||
class TestSend(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.received=[]
|
||||
self.smtp_server=SMTPServer((smtpd_addr, smtpd_port), None, self.received)
|
||||
|
||||
def asyncloop():
|
||||
# check every sec if all channel are close
|
||||
asyncore.loop(1)
|
||||
|
||||
|
||||
self.payload, self.mail_from, self.rcpt_to, self.msg_id=compose_mail(('Me', 'me@foo.com'), [('Him', 'him@bar.com')], 'the subject', 'iso-8859-1', ('Hello world', 'us-ascii'))
|
||||
|
||||
# start the server after having built the payload, to handle failure in
|
||||
# the code above
|
||||
self.smtpd_thread=threading.Thread(target=asyncloop)
|
||||
self.smtpd_thread.daemon=True
|
||||
self.smtpd_thread.start()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
self.smtp_server.close()
|
||||
self.smtpd_thread.join()
|
||||
|
||||
def test_simple_send(self):
|
||||
"""simple send"""
|
||||
ret=send_mail(self.payload, self.mail_from, self.rcpt_to, smtpd_addr, smtpd_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password)
|
||||
self.assertEqual(ret, dict())
|
||||
(ret, peer, mail_from, rcpt_to, payload)=self.received[0]
|
||||
self.assertEqual(self.payload, payload)
|
||||
self.assertEqual(self.mail_from, mail_from)
|
||||
self.assertEqual(self.rcpt_to, rcpt_to)
|
||||
self.assertEqual('127.0.0.1', peer[0])
|
||||
|
||||
def test_send_to_a_wrong_port(self):
|
||||
"""send to a wrong port"""
|
||||
self.smtp_server.close()
|
||||
ret=send_mail(self.payload, self.mail_from, self.rcpt_to, smtpd_addr, smtpd_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password)
|
||||
self.assertEqual(type(ret), str)
|
||||
|
||||
def test_send_data_error(self):
|
||||
"""smtp server return error code"""
|
||||
ret=send_mail(self.payload, 'data_error@foo.com', self.rcpt_to, smtpd_addr, smtp_bad_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password)
|
||||
self.assertEqual(type(ret), str)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
import unittest, doctest
|
||||
import pyzmail
|
||||
from pyzmail.utils import *
|
||||
|
||||
class TestUtils(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def test_nothing(self):
|
||||
pass
|
||||
|
||||
# Add doctest
|
||||
def load_tests(loader, tests, ignore):
|
||||
# this works with python 2.7 and 3.x
|
||||
tests.addTests(doctest.DocTestSuite(pyzmail.utils))
|
||||
return tests
|
||||
|
||||
def additional_tests():
|
||||
# Add doctest for python 2.6 and below
|
||||
if sys.version_info<(2, 7):
|
||||
return doctest.DocTestSuite(pyzmail.utils)
|
||||
else:
|
||||
return unittest.TestSuite()
|
155
pyzmail/utils.py
155
pyzmail/utils.py
|
@ -1,155 +0,0 @@
|
|||
#
|
||||
# pyzmail/utils.py
|
||||
# (c) Alain Spineux <alain.spineux@gmail.com>
|
||||
# http://www.magiksys.net/pyzmail
|
||||
# Released under LGPL
|
||||
|
||||
"""
|
||||
Various functions used by other modules
|
||||
@var invalid_chars_in_filename: a mix of characters not permitted in most used filesystems
|
||||
@var invalid_windows_name: a list of unauthorized filenames under Windows
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
invalid_chars_in_filename=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f' \
|
||||
b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
|
||||
b'<>:"/\\|?*%\''
|
||||
|
||||
invalid_windows_name=[b'CON', b'PRN', b'AUX', b'NUL', b'COM1', b'COM2', b'COM3',
|
||||
b'COM4', b'COM5', b'COM6', b'COM7', b'COM8', b'COM9',
|
||||
b'LPT1', b'LPT2', b'LPT3', b'LPT4', b'LPT5', b'LPT6', b'LPT7',
|
||||
b'LPT8', b'LPT9' ]
|
||||
|
||||
def sanitize_filename(filename, alt_name, alt_ext):
|
||||
"""
|
||||
Convert the given filename into a name that should work on all
|
||||
platform. Remove non us-ascii characters, and drop invalid filename.
|
||||
Use the I{alternative} filename if needed.
|
||||
|
||||
@type filename: unicode or None
|
||||
@param filename: the originale filename or None. Can be unicode.
|
||||
@type alt_name: str
|
||||
@param alt_name: the alternative filename if filename is None or useless
|
||||
@type alt_ext: str
|
||||
@param alt_ext: the alternative filename extension (including the '.')
|
||||
|
||||
@rtype: str
|
||||
@returns: a valid filename.
|
||||
|
||||
>>> sanitize_filename('document.txt', 'file', '.txt')
|
||||
'document.txt'
|
||||
>>> sanitize_filename('number1.txt', 'file', '.txt')
|
||||
'number1.txt'
|
||||
>>> sanitize_filename(None, 'file', '.txt')
|
||||
'file.txt'
|
||||
>>> sanitize_filename(u'R\\xe9pertoir.txt', 'file', '.txt')
|
||||
'Rpertoir.txt'
|
||||
>>> # the '\\xe9' has been removed
|
||||
>>> sanitize_filename(u'\\xe9\\xe6.html', 'file', '.txt')
|
||||
'file.html'
|
||||
>>> # all non us-ascii characters have been removed, the alternative name
|
||||
>>> # has been used the replace empty string. The originale extention
|
||||
>>> # is still valid
|
||||
>>> sanitize_filename(u'COM1.txt', 'file', '.txt')
|
||||
'COM1A.txt'
|
||||
>>> # if name match an invalid name or assimilated then a A is added
|
||||
"""
|
||||
|
||||
if not filename:
|
||||
return alt_name+alt_ext
|
||||
|
||||
if ((sys.version_info<(3, 0) and isinstance(filename, str)) or \
|
||||
(sys.version_info>=(3, 0) and isinstance(filename, str))):
|
||||
filename=filename.encode('ascii', 'ignore')
|
||||
|
||||
filename=filename.translate(None, invalid_chars_in_filename)
|
||||
filename=filename.strip()
|
||||
|
||||
upper=filename.upper()
|
||||
for name in invalid_windows_name:
|
||||
if upper==name:
|
||||
filename=filename+b'A'
|
||||
break
|
||||
if upper.startswith(name+b'.'):
|
||||
filename=filename[:len(name)]+b'A'+filename[len(name):]
|
||||
break
|
||||
|
||||
if sys.version_info>=(3, 0):
|
||||
# back to string
|
||||
filename=filename.decode('us-ascii')
|
||||
|
||||
if filename.rfind('.')==0:
|
||||
filename=alt_name+filename
|
||||
|
||||
return filename
|
||||
|
||||
def handle_filename_collision(filename, filenames):
|
||||
"""
|
||||
Avoid filename collision, add a sequence number to the name when required.
|
||||
'file.txt' will be renamed into 'file-01.txt' then 'file-02.txt' ...
|
||||
until their is no more collision. The file is not added to the list.
|
||||
|
||||
Windows don't make the difference between lower and upper case. To avoid
|
||||
"case" collision, the function compare C{filename.lower()} to the list.
|
||||
If you provide a list in lower case only, then any collisions will be avoided.
|
||||
|
||||
@type filename: str
|
||||
@param filename: the filename
|
||||
@type filenames: list or set
|
||||
@param filenames: a list of filenames.
|
||||
|
||||
@rtype: str
|
||||
@returns: the I{filename} or the appropriately I{indexed} I{filename}
|
||||
|
||||
>>> handle_filename_collision('file.txt', [ ])
|
||||
'file.txt'
|
||||
>>> handle_filename_collision('file.txt', [ 'file.txt' ])
|
||||
'file-01.txt'
|
||||
>>> handle_filename_collision('file.txt', [ 'file.txt', 'file-01.txt',])
|
||||
'file-02.txt'
|
||||
>>> handle_filename_collision('foo', [ 'foo',])
|
||||
'foo-01'
|
||||
>>> handle_filename_collision('foo', [ 'foo', 'foo-01',])
|
||||
'foo-02'
|
||||
>>> handle_filename_collision('FOO', [ 'foo', 'foo-01',])
|
||||
'FOO-02'
|
||||
"""
|
||||
if filename.lower() in filenames:
|
||||
try:
|
||||
basename, ext=filename.rsplit('.', 1)
|
||||
ext='.'+ext
|
||||
except ValueError:
|
||||
basename, ext=filename, ''
|
||||
|
||||
i=1
|
||||
while True:
|
||||
filename='%s-%02d%s' % (basename, i, ext)
|
||||
if filename.lower() not in filenames:
|
||||
break
|
||||
i+=1
|
||||
|
||||
return filename
|
||||
|
||||
def is_usascii(value):
|
||||
""""
|
||||
test if string contains us-ascii characters only
|
||||
|
||||
>>> is_usascii('foo')
|
||||
True
|
||||
>>> is_usascii(u'foo')
|
||||
True
|
||||
>>> is_usascii(u'Fran\xe7ais')
|
||||
False
|
||||
>>> is_usascii('bad\x81')
|
||||
False
|
||||
"""
|
||||
try:
|
||||
# if value is byte string, it will be decoded first using us-ascii
|
||||
# and will generate UnicodeEncodeError, this is fine too
|
||||
value.encode('us-ascii')
|
||||
except UnicodeError:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
|
@ -1 +0,0 @@
|
|||
__version__='1.0.3'
|
Loading…
Reference in a new issue