datatracker/ietf/group/migrations/0041_create_liaison_contact_roles.py

159 lines
6.6 KiB
Python

# Generated by Django 2.2.17 on 2020-12-09 06:59
from django.db import migrations
from ietf.person.name import plain_name
from ietf.utils.mail import parseaddr
def find_or_create_email(email_model, person_model, formatted_email, group):
"""Look up an email address or create if needed
Also creates a Person if the email does not have one. Created Email will have
the origin field set to the origin parameter to this method.
"""
name, address = parseaddr(formatted_email)
if not address:
raise ValueError('Could not parse email "%s"' % formatted_email)
email, _ = email_model.objects.get_or_create(
address=address,
defaults=dict(origin='liaison contact: ' + group.acronym)
)
if not email.person:
person = person_model.objects.create(name=name if name else address)
email.person = person
email.save()
# Display an alert if the formatted address sent from the Role will differ
# from what was in the original contacts list
if not email.person.plain and email.person.name == email.address:
recreated_contact_email = email.address
else:
person_plain = email.person.plain if email.person.plain else plain_name(email.person.name)
recreated_contact_email = "%s <%s>" % (person_plain, email.address)
if recreated_contact_email != formatted_email:
print('>> Note: address "%s" is now "%s" (%s)' % (
formatted_email,
recreated_contact_email,
group.acronym,
))
return email
def forward(apps, schema_editor):
"""Perform forward migration
Creates liaison_contact and liaison_cc_contact Roles corresponding to existing
LiaisonStatementGroupContact instances.
"""
Group = apps.get_model('group', 'Group')
Role = apps.get_model('group', 'Role')
Email = apps.get_model('person', 'Email')
Person = apps.get_model('person', 'Person')
RoleName = apps.get_model('name', 'RoleName')
contact_role_name = RoleName.objects.get(slug='liaison_contact')
cc_contact_role_name = RoleName.objects.get(slug='liaison_cc_contact')
print()
LiaisonStatementGroupContacts = apps.get_model('liaisons', 'LiaisonStatementGroupContacts')
for lsgc in LiaisonStatementGroupContacts.objects.all():
group = lsgc.group
for contact_email in lsgc.contacts.split(','):
if contact_email:
email = find_or_create_email(Email, Person,
contact_email.strip(),
group)
Role.objects.create(
group=group,
name=contact_role_name,
person=email.person,
email=email,
)
for contact_email in lsgc.cc_contacts.split(','):
if contact_email:
email = find_or_create_email(Email, Person,
contact_email.strip(),
group)
Role.objects.create(
group=group,
name=cc_contact_role_name,
person=email.person,
email=email,
)
# Now validate that we got them all. As much as possible, use independent code
# to avoid replicating any bugs from the original migration.
for group in Group.objects.all():
lsgc = LiaisonStatementGroupContacts.objects.filter(group_id=group.pk).first()
if not lsgc:
if group.role_set.filter(name__in=[contact_role_name, cc_contact_role_name]).exists():
raise ValueError('%s group has contact roles after migration but had no LiaisonStatementGroupContacts' % (
group.acronym,
))
else:
contacts = group.role_set.filter(name=contact_role_name)
num_lsgc_contacts = len(lsgc.contacts.split(',')) if lsgc.contacts else 0
if len(contacts) != num_lsgc_contacts:
raise ValueError(
'%s group has %d contact(s) but only %d address(es) in its LiaisonStatementGroupContacts (contact addresses = "%s", LSGC.contacts="%s")' % (
group.acronym, len(contacts), num_lsgc_contacts,
'","'.join([c.email.address for c in contacts]),
lsgc.contacts,
)
)
for contact in contacts:
email = contact.email.address
if email.lower() not in lsgc.contacts.lower():
raise ValueError(
'%s group has "%s" contact but not found in LiaisonStatementGroupContacts.contacts = "%s"' % (
group.acronym, email, lsgc.contacts,
)
)
cc_contacts = group.role_set.filter(name=cc_contact_role_name)
num_lsgc_cc_contacts = len(lsgc.cc_contacts.split(',')) if lsgc.cc_contacts else 0
if len(cc_contacts) != num_lsgc_cc_contacts:
raise ValueError(
'%s group has %d CC contact(s) but %d address(es) in its LiaisonStatementGroupContacts (cc_contact addresses = "%s", LSGC.cc_contacts="%s")' % (
group.acronym, len(cc_contacts), num_lsgc_cc_contacts,
'","'.join([c.email.address for c in cc_contacts]),
lsgc.cc_contacts,
)
)
for cc_contact in cc_contacts:
email = cc_contact.email.address
if email.lower() not in lsgc.cc_contacts.lower():
raise ValueError(
'%s group has "%s" CC contact but not found in LiaisonStatementGroupContacts.cc_contacts = "%s"' % (
group.acronym, email, lsgc.cc_contacts,
)
)
def reverse(apps, schema_editor):
"""Perform reverse migration
Removes liaison_contact and liaison_cc_contact Roles. The forward migration creates missing
Email and Person instances, but these are not removed because it's difficult to do this
safely and correctly.
"""
Role = apps.get_model('group', 'Role')
Role.objects.filter(
name_id__in=['liaison_contact', 'liaison_cc_contact']
).delete()
class Migration(migrations.Migration):
dependencies = [
('group', '0040_lengthen_used_roles_fields'),
('name', '0022_add_liaison_contact_rolenames'),
]
operations = [
migrations.RunPython(forward, reverse),
]