Added two new management commands to make it easier to load back selected objects that have been removed by mistake (providing they are available in a full database dump or backup that can be loaded and worked with): dumprelated and loadrelated.
- Legacy-Id: 15790
This commit is contained in:
parent
d2b86bc225
commit
0e8f63951e
|
@ -1,174 +0,0 @@
|
|||
# Copyright The IETF Trust 2016, All Rights Reserved
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals, print_function
|
||||
|
||||
import datetime
|
||||
from tqdm import tqdm
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.admin.utils import NestedObjects
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db.models import F
|
||||
|
||||
import debug # pyflakes:ignore
|
||||
|
||||
from ietf.community.models import SearchRule
|
||||
from ietf.person.models import Person, Alias, PersonalApiKey, Email
|
||||
from ietf.person.name import unidecode_name
|
||||
from ietf.utils.log import log
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = (u"""
|
||||
|
||||
Delete data for which consent to store the data has not been given,
|
||||
where the data does not fall under the GDPR Legitimate Interest clause
|
||||
for the IETF. This includes full name, ascii name, bio, login,
|
||||
notification subscriptions and email addresses that are not derived from
|
||||
published drafts or ietf roles.
|
||||
|
||||
""")
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('-n', '--dry-run', action='store_true', default=False,
|
||||
help="Don't delete anything, just list what would be done.")
|
||||
# parser.add_argument('-d', '--date', help="Date of deletion (mentioned in message)")
|
||||
parser.add_argument('-m', '--minimum-response-time', metavar='TIME', type=int, default=7,
|
||||
help="Minimum response time, default: %(default)s days. Persons to whom a "
|
||||
"consent request email has been sent more recently than this will not "
|
||||
"be affected by the run.")
|
||||
# parser.add_argument('-r', '--rate', type=float, default=1.0,
|
||||
# help='Rate of sending mail, default: %(default)s/s')
|
||||
# parser.add_argument('user', nargs='*')
|
||||
|
||||
|
||||
def handle(self, *args, **options):
|
||||
dry_run = options['dry_run']
|
||||
verbosity = int(options['verbosity'])
|
||||
event_type = 'gdpr_notice_email'
|
||||
settings.DEBUG = False # don't log to console
|
||||
|
||||
# users
|
||||
users = User.objects.filter(person__isnull=True, username__contains='@')
|
||||
self.stdout.write("Found %d users without associated person records" % (users.count(), ))
|
||||
emails = Email.objects.filter(address__in=users.values_list('username', flat=True))
|
||||
# fix up users that don't have person records, but have a username matching a nown email record
|
||||
self.stdout.write("Checking usernames against email records ...")
|
||||
for email in tqdm(emails):
|
||||
user = users.get(username=email.address)
|
||||
if email.person.user_id:
|
||||
if dry_run:
|
||||
self.stdout.write("Would delete user #%-6s (%s) %s" % (user.id, user.last_login, user.username))
|
||||
else:
|
||||
log("Deleting user #%-6s (%s) %s: no person record, matching email has other user" % (user.id, user.last_login, user.username))
|
||||
user_id = user.id
|
||||
user.delete()
|
||||
Person.history.filter(user_id=user_id).delete()
|
||||
Email.history.filter(history_user=user_id).delete()
|
||||
else:
|
||||
if dry_run:
|
||||
self.stdout.write("Would connect user #%-6s %s to person #%-6s %s" % (user.id, user.username, email.person.id, email.person.ascii_name()))
|
||||
else:
|
||||
log("Connecting user #%-6s %s to person #%-6s %s" % (user.id, user.username, email.person.id, email.person.ascii_name()))
|
||||
email.person.user_id = user.id
|
||||
email.person.save()
|
||||
# delete users without person records
|
||||
users = users.exclude(username__in=emails.values_list('address', flat=True))
|
||||
if dry_run:
|
||||
self.stdout.write("Would delete %d users without associated person records" % (users.count(), ))
|
||||
else:
|
||||
if users.count():
|
||||
log("Deleting %d users without associated person records" % (users.count(), ))
|
||||
assert not users.filter(person__isnull=False).exists()
|
||||
user_ids = users.values_list('id', flat=True)
|
||||
users.delete()
|
||||
assert not Person.history.filter(user_id__in=user_ids).exists()
|
||||
|
||||
|
||||
# persons
|
||||
self.stdout.write('Querying the database for person records without given consent ...')
|
||||
notification_cutoff = datetime.datetime.now() - datetime.timedelta(days=options['minimum_response_time'])
|
||||
persons = Person.objects.exclude(consent=True)
|
||||
persons = persons.exclude(id=1) # make sure we don't delete System ;-)
|
||||
self.stdout.write("Found %d persons with information for which we don't have consent." % (persons.count(), ))
|
||||
|
||||
# Narrow to persons we don't have Legitimate Interest in, and delete those fully
|
||||
persons = persons.exclude(docevent__by=F('pk'))
|
||||
persons = persons.exclude(documentauthor__person=F('pk')).exclude(dochistoryauthor__person=F('pk'))
|
||||
persons = persons.exclude(email__liaisonstatement__from_contact__person=F('pk'))
|
||||
persons = persons.exclude(email__reviewrequest__reviewer__person=F('pk'))
|
||||
persons = persons.exclude(email__shepherd_dochistory_set__shepherd__person=F('pk'))
|
||||
persons = persons.exclude(email__shepherd_document_set__shepherd__person=F('pk'))
|
||||
persons = persons.exclude(iprevent__by=F('pk'))
|
||||
persons = persons.exclude(meetingregistration__person=F('pk'))
|
||||
persons = persons.exclude(message__by=F('pk'))
|
||||
persons = persons.exclude(name_from_draft='')
|
||||
persons = persons.exclude(personevent__time__gt=notification_cutoff, personevent__type=event_type)
|
||||
persons = persons.exclude(reviewrequest__requested_by=F('pk'))
|
||||
persons = persons.exclude(role__person=F('pk')).exclude(rolehistory__person=F('pk'))
|
||||
persons = persons.exclude(session__requested_by=F('pk'))
|
||||
persons = persons.exclude(submissionevent__by=F('pk'))
|
||||
self.stdout.write("Found %d persons with information for which we neither have consent nor legitimate interest." % (persons.count(), ))
|
||||
if persons.count() > 0:
|
||||
self.stdout.write("Deleting records for persons for which we have with neither consent nor legitimate interest ...")
|
||||
for person in (persons if dry_run else tqdm(persons)):
|
||||
if dry_run:
|
||||
self.stdout.write(("Would delete record #%-6d: (%s) %-32s %-48s" % (person.pk, person.time, person.ascii_name(), "<%s>"%person.email())).encode('utf8'))
|
||||
else:
|
||||
if verbosity > 1:
|
||||
# development aids
|
||||
collector = NestedObjects(using='default')
|
||||
collector.collect([person,])
|
||||
objects = collector.nested()
|
||||
related = [ o for o in objects[-1] if not isinstance(o, (Alias, Person, SearchRule, PersonalApiKey)) ]
|
||||
if len(related) > 0:
|
||||
self.stderr.write("Person record #%-6s %s has unexpected related records" % (person.pk, person.ascii_name()))
|
||||
|
||||
# Historical records using simple_history has on_delete=DO_NOTHING, so
|
||||
# we have to do explicit deletions:
|
||||
id = person.id
|
||||
person.delete()
|
||||
Person.history.filter(id=id).delete()
|
||||
Email.history.filter(person_id=id).delete()
|
||||
|
||||
# Deal with remaining persons (lacking consent, but with legitimate interest)
|
||||
persons = Person.objects.exclude(consent=True)
|
||||
persons = persons.exclude(id=1)
|
||||
self.stdout.write("Found %d remaining persons with information for which we don't have consent." % (persons.count(), ))
|
||||
if persons.count() > 0:
|
||||
self.stdout.write("Removing personal information requiring consent ...")
|
||||
for person in (persons if dry_run else tqdm(persons)):
|
||||
fields = ', '.join(person.needs_consent())
|
||||
if dry_run:
|
||||
self.stdout.write(("Would remove info for #%-6d: (%s) %-32s %-48s %s" % (person.pk, person.time, person.ascii_name(), "<%s>"%person.email(), fields)).encode('utf8'))
|
||||
else:
|
||||
if person.name_from_draft:
|
||||
log("Using name info from draft for #%-6d %s: no consent, no roles" % (person.pk, person))
|
||||
person.name = person.name_from_draft
|
||||
person.ascii = unidecode_name(person.name_from_draft)
|
||||
if person.biography:
|
||||
log("Deleting biography for #%-6d %s: no consent, no roles" % (person.pk, person))
|
||||
person.biography = ''
|
||||
person.save()
|
||||
if person.user_id:
|
||||
if User.objects.filter(id=person.user_id).exists():
|
||||
log("Deleting communitylist for #%-6d %s: no consent, no roles" % (person.pk, person))
|
||||
person.user.communitylist_set.all().delete()
|
||||
for email in person.email_set.all():
|
||||
if not email.origin.split(':')[0] in ['author', 'role', 'reviewer', 'liaison', 'shepherd', ]:
|
||||
log("Deleting email <%s> for #%-6d %s: no consent, no roles" % (email.address, person.pk, person))
|
||||
address = email.address
|
||||
email.delete()
|
||||
Email.history.filter(address=address).delete()
|
||||
|
||||
emails = Email.objects.filter(origin='', person__consent=False)
|
||||
self.stdout.write("Found %d emails without origin for which we lack consent." % (emails.count(), ))
|
||||
if dry_run:
|
||||
self.stdout.write("Would delete %d email records without origin and consent" % (emails.count(), ))
|
||||
else:
|
||||
if emails.count():
|
||||
log("Deleting %d email records without origin and consent" % (emails.count(), ))
|
||||
addresses = emails.values_list('address', flat=True)
|
||||
emails.delete()
|
||||
Email.history.filter(address__in=addresses).delete()
|
||||
|
204
ietf/utils/management/commands/dumprelated.py
Normal file
204
ietf/utils/management/commands/dumprelated.py
Normal file
|
@ -0,0 +1,204 @@
|
|||
import warnings
|
||||
from collections import OrderedDict
|
||||
|
||||
from django.apps import apps
|
||||
from django.contrib.admin.utils import NestedObjects
|
||||
from django.core import serializers
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.core.management.utils import parse_apps_and_model_labels
|
||||
from django.db import DEFAULT_DB_ALIAS, router
|
||||
|
||||
import debug # pyflakes:ignore
|
||||
debug.debug = True
|
||||
|
||||
class ProxyModelWarning(Warning):
|
||||
pass
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = (
|
||||
"Output a database object and its related objects as a fixture of the given format "
|
||||
)
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'args', metavar='app_label.ModelName', nargs=1,
|
||||
help='Specifies the app_label.ModelName for which to dump objects given by --pks',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--format', default='json', dest='format',
|
||||
help='Specifies the output serialization format for fixtures.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--indent', default=None, dest='indent', type=int,
|
||||
help='Specifies the indent level to use when pretty-printing output.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--database', action='store', dest='database',
|
||||
default=DEFAULT_DB_ALIAS,
|
||||
help='Nominates a specific database to dump fixtures from. '
|
||||
'Defaults to the "default" database.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'-e', '--exclude', dest='exclude', action='append', default=[],
|
||||
help='An app_label or app_label.ModelName to exclude '
|
||||
'(use multiple --exclude to exclude multiple apps/models).',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--natural-foreign', action='store_true', dest='use_natural_foreign_keys', default=False,
|
||||
help='Use natural foreign keys if they are available.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--natural-primary', action='store_true', dest='use_natural_primary_keys', default=False,
|
||||
help='Use natural primary keys if they are available.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'-o', '--output', default=None, dest='output',
|
||||
help='Specifies file to which the output is written.'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--pks', dest='primary_keys', required=True,
|
||||
help="Only dump objects with given primary keys. Accepts a comma-separated "
|
||||
"list of keys. This option only works when you specify one model.",
|
||||
)
|
||||
|
||||
def handle(self, *app_labels, **options):
|
||||
format = options['format']
|
||||
indent = options['indent']
|
||||
using = options['database']
|
||||
excludes = options['exclude']
|
||||
output = options['output']
|
||||
show_traceback = options['traceback']
|
||||
use_natural_foreign_keys = options['use_natural_foreign_keys']
|
||||
use_natural_primary_keys = options['use_natural_primary_keys']
|
||||
pks = options['primary_keys']
|
||||
|
||||
if pks:
|
||||
primary_keys = [pk.strip() for pk in pks.split(',')]
|
||||
else:
|
||||
primary_keys = []
|
||||
|
||||
excluded_models, excluded_apps = parse_apps_and_model_labels(excludes)
|
||||
|
||||
if len(app_labels) == 0:
|
||||
if primary_keys:
|
||||
raise CommandError("You can only use --pks option with one model")
|
||||
app_list = OrderedDict(
|
||||
(app_config, None) for app_config in apps.get_app_configs()
|
||||
if app_config.models_module is not None and app_config not in excluded_apps
|
||||
)
|
||||
else:
|
||||
if len(app_labels) > 1 and primary_keys:
|
||||
raise CommandError("You can only use --pks option with one model")
|
||||
app_list = OrderedDict()
|
||||
for label in app_labels:
|
||||
try:
|
||||
app_label, model_label = label.split('.')
|
||||
try:
|
||||
app_config = apps.get_app_config(app_label)
|
||||
except LookupError as e:
|
||||
raise CommandError(str(e))
|
||||
if app_config.models_module is None or app_config in excluded_apps:
|
||||
continue
|
||||
try:
|
||||
model = app_config.get_model(model_label)
|
||||
except LookupError:
|
||||
raise CommandError("Unknown model: %s.%s" % (app_label, model_label))
|
||||
|
||||
app_list_value = app_list.setdefault(app_config, [])
|
||||
|
||||
# We may have previously seen a "all-models" request for
|
||||
# this app (no model qualifier was given). In this case
|
||||
# there is no need adding specific models to the list.
|
||||
if app_list_value is not None:
|
||||
if model not in app_list_value:
|
||||
app_list_value.append(model)
|
||||
except ValueError:
|
||||
if primary_keys:
|
||||
raise CommandError("You can only use --pks option with one model")
|
||||
# This is just an app - no model qualifier
|
||||
app_label = label
|
||||
try:
|
||||
app_config = apps.get_app_config(app_label)
|
||||
except LookupError as e:
|
||||
raise CommandError(str(e))
|
||||
if app_config.models_module is None or app_config in excluded_apps:
|
||||
continue
|
||||
app_list[app_config] = None
|
||||
|
||||
# Check that the serialization format exists; this is a shortcut to
|
||||
# avoid collating all the objects and _then_ failing.
|
||||
if format not in serializers.get_public_serializer_formats():
|
||||
try:
|
||||
serializers.get_serializer(format)
|
||||
except serializers.SerializerDoesNotExist:
|
||||
pass
|
||||
|
||||
raise CommandError("Unknown serialization format: %s" % format)
|
||||
|
||||
def flatten(l):
|
||||
if isinstance(l, list):
|
||||
for el in l:
|
||||
if isinstance(el, list):
|
||||
for sub in flatten(el):
|
||||
yield sub
|
||||
else:
|
||||
yield el
|
||||
else:
|
||||
yield l
|
||||
|
||||
def get_objects(count_only=False):
|
||||
"""
|
||||
Collate the objects to be serialized. If count_only is True, just
|
||||
count the number of objects to be serialized.
|
||||
"""
|
||||
models = serializers.sort_dependencies(app_list.items())
|
||||
for model in models:
|
||||
if model in excluded_models:
|
||||
continue
|
||||
if model._meta.proxy and model._meta.proxy_for_model not in models:
|
||||
warnings.warn(
|
||||
"%s is a proxy model and won't be serialized." % model._meta.label,
|
||||
category=ProxyModelWarning,
|
||||
)
|
||||
if not model._meta.proxy and router.allow_migrate_model(using, model):
|
||||
objects = model._default_manager
|
||||
|
||||
queryset = objects.using(using).order_by(model._meta.pk.name)
|
||||
if primary_keys:
|
||||
queryset = queryset.filter(pk__in=primary_keys)
|
||||
if count_only:
|
||||
yield queryset.order_by().count()
|
||||
else:
|
||||
for obj in queryset.iterator():
|
||||
collector = NestedObjects(using=using)
|
||||
collector.collect([obj,])
|
||||
object_list = list(flatten(collector.nested()))
|
||||
object_list.reverse()
|
||||
for o in object_list:
|
||||
yield o
|
||||
|
||||
try:
|
||||
self.stdout.ending = None
|
||||
progress_output = None
|
||||
object_count = 0
|
||||
# If dumpdata is outputting to stdout, there is no way to display progress
|
||||
if (output and self.stdout.isatty() and options['verbosity'] > 0):
|
||||
progress_output = self.stdout
|
||||
object_count = sum(get_objects(count_only=True))
|
||||
stream = open(output, 'w') if output else None
|
||||
try:
|
||||
serializers.serialize(
|
||||
format, get_objects(), indent=indent,
|
||||
use_natural_foreign_keys=use_natural_foreign_keys,
|
||||
use_natural_primary_keys=use_natural_primary_keys,
|
||||
stream=stream or self.stdout, progress_output=progress_output,
|
||||
object_count=object_count,
|
||||
)
|
||||
finally:
|
||||
if stream:
|
||||
stream.close()
|
||||
except Exception as e:
|
||||
if show_traceback:
|
||||
raise
|
||||
raise CommandError("Unable to serialize database: %s" % e)
|
142
ietf/utils/management/commands/loadrelated.py
Normal file
142
ietf/utils/management/commands/loadrelated.py
Normal file
|
@ -0,0 +1,142 @@
|
|||
# Copyright The IETF Trust 2016, All Rights Reserved
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals, print_function
|
||||
|
||||
import datetime
|
||||
import gzip
|
||||
import io
|
||||
import os
|
||||
#import sys
|
||||
import tqdm
|
||||
import zipfile
|
||||
|
||||
try:
|
||||
import bz2
|
||||
has_bz2 = True
|
||||
except ImportError:
|
||||
has_bz2 = False
|
||||
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.conf import settings
|
||||
from django.contrib.admin.utils import NestedObjects
|
||||
from django.contrib.auth.models import User
|
||||
from django.core import serializers
|
||||
from django.db import DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connections
|
||||
from django.db.models import F
|
||||
from django.db.models.signals import pre_delete, post_save
|
||||
from django.dispatch import receiver
|
||||
from django.utils.encoding import force_text
|
||||
import django.core.management.commands.loaddata as loaddata
|
||||
|
||||
|
||||
import debug # pyflakes:ignore
|
||||
|
||||
from ietf.community.models import SearchRule, CommunityList, EmailSubscription, notify_events
|
||||
from ietf.doc.models import Document
|
||||
from ietf.person.models import Person, HistoricalPerson, PersonEvent, Alias, PersonalApiKey, Email, HistoricalEmail
|
||||
from ietf.person.name import unidecode_name
|
||||
from ietf.utils.log import log
|
||||
|
||||
class Command(loaddata.Command):
|
||||
help = (u"""
|
||||
|
||||
Load a fixture of related objects to the database. The fixture is expected
|
||||
to contain a set of related objects, created with the 'dumprelated' management
|
||||
command. It differs from the 'loaddata' command in that it silently ignores
|
||||
attempts to load duplicate entries, and continues loading subsequent entries.
|
||||
|
||||
""")
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('args', metavar='fixture', nargs='+', help='Fixture files.')
|
||||
parser.add_argument(
|
||||
'--database', action='store', dest='database', default=DEFAULT_DB_ALIAS,
|
||||
help='Nominates a specific database to load fixtures into. Defaults to the "default" database.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--ignorenonexistent', '-i', action='store_true', dest='ignore', default=False,
|
||||
help='Ignores entries in the serialized data for fields that do not '
|
||||
'currently exist on the model.',
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
self.ignore = options['ignore']
|
||||
self.using = options['database']
|
||||
self.verbosity = options['verbosity']
|
||||
#
|
||||
self.compression_formats = {
|
||||
None: (open, 'rb'),
|
||||
'gz': (gzip.GzipFile, 'rb'),
|
||||
'zip': (SingleZipReader, 'r'),
|
||||
}
|
||||
if has_bz2:
|
||||
self.compression_formats['bz2'] = (bz2.BZ2File, 'r')
|
||||
#
|
||||
self.serialization_formats = serializers.get_public_serializer_formats()
|
||||
#
|
||||
post_save.disconnect(notify_events)
|
||||
#
|
||||
show_progress = self.verbosity >= 3
|
||||
connection = connections[self.using]
|
||||
self.fixture_count = 0
|
||||
self.loaded_object_count = 0
|
||||
self.fixture_object_count = 0
|
||||
#
|
||||
for arg in args:
|
||||
fixture_file = arg
|
||||
self.stdout.write("Loading objects from %s" % fixture_file)
|
||||
_, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file))
|
||||
open_method, mode = self.compression_formats[cmp_fmt]
|
||||
fixture = open_method(fixture_file, mode)
|
||||
objects_in_fixture = 0
|
||||
self.stdout.write("Getting object count...\b\b\b", ending='')
|
||||
self.stdout.flush()
|
||||
for o in serializers.deserialize(ser_fmt, fixture, using=self.using, ignorenonexistent=self.ignore,):
|
||||
objects_in_fixture += 1
|
||||
self.stdout.write(" %d" % objects_in_fixture)
|
||||
#
|
||||
fixture = open_method(fixture_file, mode)
|
||||
self.fixture_count += 1
|
||||
objects = serializers.deserialize(ser_fmt, fixture, using=self.using, ignorenonexistent=self.ignore,)
|
||||
with connection.constraint_checks_disabled():
|
||||
for obj in tqdm.tqdm(objects, total=objects_in_fixture):
|
||||
try:
|
||||
obj.save(using=self.using)
|
||||
self.loaded_object_count += 1
|
||||
except (DatabaseError, IntegrityError, ObjectDoesNotExist, AttributeError) as e:
|
||||
error_msg = force_text(e)
|
||||
if "Duplicate entry" in error_msg:
|
||||
pass
|
||||
else:
|
||||
self.stderr.write("Could not load %(app_label)s.%(object_name)s(pk=%(pk)s): %(error_msg)s" % {
|
||||
'app_label': obj.object._meta.app_label,
|
||||
'object_name': obj.object._meta.object_name,
|
||||
'pk': obj.object.pk,
|
||||
'error_msg': error_msg,
|
||||
}, )
|
||||
self.fixture_object_count += objects_in_fixture
|
||||
|
||||
if self.verbosity >= 1:
|
||||
if self.fixture_object_count == self.loaded_object_count:
|
||||
self.stdout.write(
|
||||
"Installed %d object(s) from %d fixture(s)"
|
||||
% (self.loaded_object_count, self.fixture_count)
|
||||
)
|
||||
else:
|
||||
self.stdout.write(
|
||||
"Installed %d object(s) (of %d) from %d fixture(s)"
|
||||
% (self.loaded_object_count, self.fixture_object_count, self.fixture_count)
|
||||
)
|
||||
|
||||
|
||||
class SingleZipReader(zipfile.ZipFile):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
zipfile.ZipFile.__init__(self, *args, **kwargs)
|
||||
if len(self.namelist()) != 1:
|
||||
raise ValueError("Zip-compressed fixtures must contain one file.")
|
||||
|
||||
def read(self):
|
||||
return zipfile.ZipFile.read(self, self.namelist()[0])
|
||||
|
||||
|
Loading…
Reference in a new issue