fix: close open things (#5593)

* fix: close open things

* fix: clean up test created files

* fix: remove one close too many
This commit is contained in:
Robert Sparks 2023-05-10 11:19:34 -05:00 committed by GitHub
parent 9fa54270e6
commit f8113cb862
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 156 additions and 111 deletions

View file

@ -320,22 +320,26 @@ This test section has some text.
file = NamedTemporaryFile(delete=False,mode="w+",encoding='utf-8') file = NamedTemporaryFile(delete=False,mode="w+",encoding='utf-8')
file.write(f'# {username}') file.write(f'# {username}')
file.close() file.close()
for postdict in [ try:
{'bofreq_submission':'enter','bofreq_content':f'# {username}'}, with open(file.name, 'rb') as bofreq_fd:
{'bofreq_submission':'upload','bofreq_file':open(file.name,'rb')}, for postdict in [
]: {'bofreq_submission':'enter','bofreq_content':f'# {username}'},
docevent_count = doc.docevent_set.count() {'bofreq_submission':'upload','bofreq_file':bofreq_fd},
empty_outbox() ]:
r = self.client.post(url, postdict) docevent_count = doc.docevent_set.count()
self.assertEqual(r.status_code, 302) empty_outbox()
doc = reload_db_objects(doc) r = self.client.post(url, postdict)
self.assertEqual('%02d'%(int(rev)+1) ,doc.rev) self.assertEqual(r.status_code, 302)
self.assertEqual(f'# {username}', doc.text()) doc = reload_db_objects(doc)
self.assertEqual(docevent_count+1, doc.docevent_set.count()) self.assertEqual('%02d'%(int(rev)+1) ,doc.rev)
self.assertEqual(1, len(outbox)) self.assertEqual(f'# {username}', doc.text())
rev = doc.rev self.assertEqual(docevent_count+1, doc.docevent_set.count())
self.assertEqual(1, len(outbox))
rev = doc.rev
finally:
os.unlink(file.name)
self.client.logout() self.client.logout()
os.unlink(file.name)
def test_start_new_bofreq(self): def test_start_new_bofreq(self):
url = urlreverse('ietf.doc.views_bofreq.new_bof_request') url = urlreverse('ietf.doc.views_bofreq.new_bof_request')
@ -350,25 +354,28 @@ This test section has some text.
file = NamedTemporaryFile(delete=False,mode="w+",encoding='utf-8') file = NamedTemporaryFile(delete=False,mode="w+",encoding='utf-8')
file.write('some stuff') file.write('some stuff')
file.close() file.close()
for postdict in [ try:
dict(title='title one', bofreq_submission='enter', bofreq_content='some stuff'), with open(file.name,'rb') as bofreq_fd:
dict(title='title two', bofreq_submission='upload', bofreq_file=open(file.name,'rb')), for postdict in [
]: dict(title='title one', bofreq_submission='enter', bofreq_content='some stuff'),
empty_outbox() dict(title='title two', bofreq_submission='upload', bofreq_file=bofreq_fd),
r = self.client.post(url, postdict) ]:
self.assertEqual(r.status_code,302) empty_outbox()
name = f"bofreq-{xslugify(nobody.last_name())[:64]}-{postdict['title']}".replace(' ','-') r = self.client.post(url, postdict)
bofreq = Document.objects.filter(name=name,type_id='bofreq').first() self.assertEqual(r.status_code,302)
self.assertIsNotNone(bofreq) name = f"bofreq-{xslugify(nobody.last_name())[:64]}-{postdict['title']}".replace(' ','-')
self.assertIsNotNone(DocAlias.objects.filter(name=name).first()) bofreq = Document.objects.filter(name=name,type_id='bofreq').first()
self.assertEqual(bofreq.title, postdict['title']) self.assertIsNotNone(bofreq)
self.assertEqual(bofreq.rev, '00') self.assertIsNotNone(DocAlias.objects.filter(name=name).first())
self.assertEqual(bofreq.get_state_slug(), 'proposed') self.assertEqual(bofreq.title, postdict['title'])
self.assertEqual(list(bofreq_editors(bofreq)), [nobody]) self.assertEqual(bofreq.rev, '00')
self.assertEqual(bofreq.latest_event(NewRevisionDocEvent).rev, '00') self.assertEqual(bofreq.get_state_slug(), 'proposed')
self.assertEqual(bofreq.text_or_error(), 'some stuff') self.assertEqual(list(bofreq_editors(bofreq)), [nobody])
self.assertEqual(len(outbox),1) self.assertEqual(bofreq.latest_event(NewRevisionDocEvent).rev, '00')
os.unlink(file.name) self.assertEqual(bofreq.text_or_error(), 'some stuff')
self.assertEqual(len(outbox),1)
finally:
os.unlink(file.name)
existing_bofreq = BofreqFactory(requester_lastname=nobody.last_name()) existing_bofreq = BofreqFactory(requester_lastname=nobody.last_name())
for postdict in [ for postdict in [
dict(title='', bofreq_submission='enter', bofreq_content='some stuff'), dict(title='', bofreq_submission='enter', bofreq_content='some stuff'),

View file

@ -499,6 +499,8 @@ class ReviewTests(TestCase):
tar.add(os.path.relpath(tmp.name)) tar.add(os.path.relpath(tmp.name))
mbox.close()
return mbox_path return mbox_path
def test_search_mail_archive(self): def test_search_mail_archive(self):

View file

@ -23,8 +23,12 @@ class Command(EmailOnFailureCommand):
def handle(self, *args, **options): def handle(self, *args, **options):
email = options.get('email', None) email = options.get('email', None)
binary_input = io.open(email, 'rb') if email else sys.stdin.buffer if email:
self.msg_bytes = binary_input.read() binary_input = io.open(email, 'rb')
self.msg_bytes = binary_input.read()
binary_input.close()
else:
self.msg_bytes = sys.stdin.buffer.read()
try: try:
process_response_email(self.msg_bytes) process_response_email(self.msg_bytes)
except ValueError as e: except ValueError as e:
@ -44,4 +48,4 @@ class Command(EmailOnFailureCommand):
'application', 'octet-stream', # mime type 'application', 'octet-stream', # mime type
filename='original-message', filename='original-message',
) )
return msg return msg

View file

@ -894,23 +894,27 @@ class MeetingTests(BaseMeetingTestCase):
def test_session_draft_tarfile(self): def test_session_draft_tarfile(self):
session, filenames = self.build_session_setup() session, filenames = self.build_session_setup()
url = urlreverse('ietf.meeting.views.session_draft_tarfile', kwargs={'num':session.meeting.number,'acronym':session.group.acronym}) try:
response = self.client.get(url) url = urlreverse('ietf.meeting.views.session_draft_tarfile', kwargs={'num':session.meeting.number,'acronym':session.group.acronym})
self.assertEqual(response.status_code, 200) response = self.client.get(url)
self.assertEqual(response.get('Content-Type'), 'application/octet-stream') self.assertEqual(response.status_code, 200)
for filename in filenames: self.assertEqual(response.get('Content-Type'), 'application/octet-stream')
os.unlink(filename) finally:
for filename in filenames:
os.unlink(filename)
@skipIf(skip_pdf_tests, skip_message) @skipIf(skip_pdf_tests, skip_message)
@skip_coverage @skip_coverage
def test_session_draft_pdf(self): def test_session_draft_pdf(self):
session, filenames = self.build_session_setup() session, filenames = self.build_session_setup()
url = urlreverse('ietf.meeting.views.session_draft_pdf', kwargs={'num':session.meeting.number,'acronym':session.group.acronym}) try:
response = self.client.get(url) url = urlreverse('ietf.meeting.views.session_draft_pdf', kwargs={'num':session.meeting.number,'acronym':session.group.acronym})
self.assertEqual(response.status_code, 200) response = self.client.get(url)
self.assertEqual(response.get('Content-Type'), 'application/pdf') self.assertEqual(response.status_code, 200)
for filename in filenames: self.assertEqual(response.get('Content-Type'), 'application/pdf')
os.unlink(filename) finally:
for filename in filenames:
os.unlink(filename)
def test_current_materials(self): def test_current_materials(self):
url = urlreverse('ietf.meeting.views.current_materials') url = urlreverse('ietf.meeting.views.current_materials')
@ -6411,7 +6415,9 @@ class MaterialsTests(TestCase):
path = os.path.join(submission.session.meeting.get_materials_path(),'slides') path = os.path.join(submission.session.meeting.get_materials_path(),'slides')
filename = os.path.join(path,session.sessionpresentation_set.first().document.name+'-01.txt') filename = os.path.join(path,session.sessionpresentation_set.first().document.name+'-01.txt')
self.assertTrue(os.path.exists(filename)) self.assertTrue(os.path.exists(filename))
contents = io.open(filename,'r').read() fd = io.open(filename, 'r')
contents = fd.read()
fd.close()
self.assertIn('third version', contents) self.assertIn('third version', contents)
@ -7946,12 +7952,13 @@ class ProceedingsTests(BaseMeetingTestCase):
"""Upload proceedings materials document""" """Upload proceedings materials document"""
meeting = self._procmat_test_meeting() meeting = self._procmat_test_meeting()
for mat_type in ProceedingsMaterialTypeName.objects.filter(used=True): for mat_type in ProceedingsMaterialTypeName.objects.filter(used=True):
mat = self.upload_proceedings_material_test( with self._proceedings_file() as fd:
meeting, mat = self.upload_proceedings_material_test(
mat_type, meeting,
{'file': self._proceedings_file(), 'external_url': ''}, mat_type,
) {'file': fd, 'external_url': ''},
self.assertEqual(mat.get_href(), f'{mat.document.name}:00') )
self.assertEqual(mat.get_href(), f'{mat.document.name}:00')
def test_add_proceedings_material_doc_invalid_ext(self): def test_add_proceedings_material_doc_invalid_ext(self):
"""Upload proceedings materials document with disallowed extension""" """Upload proceedings materials document with disallowed extension"""
@ -8038,12 +8045,13 @@ class ProceedingsTests(BaseMeetingTestCase):
kwargs=dict(num=meeting.number, material_type=pm_doc.type.slug), kwargs=dict(num=meeting.number, material_type=pm_doc.type.slug),
) )
self.client.login(username='secretary', password='secretary+password') self.client.login(username='secretary', password='secretary+password')
r = self.client.post(pm_doc_url, {'file': self._proceedings_file(), 'external_url': ''}) with self._proceedings_file() as fd:
self.assertRedirects(r, success_url) r = self.client.post(pm_doc_url, {'file': fd, 'external_url': ''})
self.assertEqual(meeting.proceedings_materials.count(), 2) self.assertRedirects(r, success_url)
pm_doc = meeting.proceedings_materials.get(pk=pm_doc.pk) # refresh from DB self.assertEqual(meeting.proceedings_materials.count(), 2)
self.assertEqual(pm_doc.document.rev, '01') pm_doc = meeting.proceedings_materials.get(pk=pm_doc.pk) # refresh from DB
self.assertEqual(pm_doc.get_href(), f'{pm_doc.document.name}:01') self.assertEqual(pm_doc.document.rev, '01')
self.assertEqual(pm_doc.get_href(), f'{pm_doc.document.name}:01')
# Replace the uploaded document with a URL # Replace the uploaded document with a URL
r = self.client.post(pm_doc_url, {'use_url': 'on', 'external_url': 'https://example.com/second'}) r = self.client.post(pm_doc_url, {'use_url': 'on', 'external_url': 'https://example.com/second'})
@ -8066,12 +8074,13 @@ class ProceedingsTests(BaseMeetingTestCase):
self.assertEqual(pm_url.get_href(), 'https://example.com/third') self.assertEqual(pm_url.get_href(), 'https://example.com/third')
# Now replace the URL doc with an uploaded file # Now replace the URL doc with an uploaded file
r = self.client.post(pm_url_url, {'file': self._proceedings_file(), 'external_url': ''}) with self._proceedings_file() as fd:
self.assertRedirects(r, success_url) r = self.client.post(pm_url_url, {'file': fd, 'external_url': ''})
self.assertEqual(meeting.proceedings_materials.count(), 2) self.assertRedirects(r, success_url)
pm_url = meeting.proceedings_materials.get(pk=pm_url.pk) # refresh from DB self.assertEqual(meeting.proceedings_materials.count(), 2)
self.assertEqual(pm_url.document.rev, '02') pm_url = meeting.proceedings_materials.get(pk=pm_url.pk) # refresh from DB
self.assertEqual(pm_url.get_href(), f'{pm_url.document.name}:02') self.assertEqual(pm_url.document.rev, '02')
self.assertEqual(pm_url.get_href(), f'{pm_url.document.name}:02')
def test_remove_proceedings_material(self): def test_remove_proceedings_material(self):
"""Proceedings material can be removed""" """Proceedings material can be removed"""

View file

@ -42,8 +42,11 @@ class Command(EmailOnFailureCommand):
except NomCom.DoesNotExist: except NomCom.DoesNotExist:
raise CommandError("NomCom %s does not exist or it isn't active" % year) raise CommandError("NomCom %s does not exist or it isn't active" % year)
binary_input = io.open(email, 'rb') if email else sys.stdin.buffer if email:
self.msg = binary_input.read() with io.open(email, 'rb') as binary_input:
self.msg = binary_input.read()
else:
self.msg = sys.stdin.buffer.read()
try: try:
feedback = create_feedback_email(self.nomcom, self.msg) feedback = create_feedback_email(self.nomcom, self.msg)

View file

@ -94,7 +94,8 @@ def check_comments(encryped, plain, privatekey_file):
decrypted_file.close() decrypted_file.close()
encrypted_file.close() encrypted_file.close()
decrypted_comments = io.open(decrypted_file.name, 'rb').read().decode('utf-8') with io.open(decrypted_file.name, 'rb') as fd:
decrypted_comments = fd.read().decode('utf-8')
os.unlink(encrypted_file.name) os.unlink(encrypted_file.name)
os.unlink(decrypted_file.name) os.unlink(decrypted_file.name)
@ -116,7 +117,8 @@ def nomcom_test_data():
nomcom_test_cert_file, privatekey_file = generate_cert() nomcom_test_cert_file, privatekey_file = generate_cert()
nomcom.public_key.storage = FileSystemStorage(location=settings.NOMCOM_PUBLIC_KEYS_DIR) nomcom.public_key.storage = FileSystemStorage(location=settings.NOMCOM_PUBLIC_KEYS_DIR)
nomcom.public_key.save('cert', File(io.open(nomcom_test_cert_file.name, 'r'))) with io.open(nomcom_test_cert_file.name, 'r') as fd:
nomcom.public_key.save('cert', File(fd))
# chair and member # chair and member
create_person(group, "chair", username=CHAIR_USER, email_address='%s%s'%(CHAIR_USER,EMAIL_DOMAIN)) create_person(group, "chair", username=CHAIR_USER, email_address='%s%s'%(CHAIR_USER,EMAIL_DOMAIN))

View file

@ -715,7 +715,8 @@ class NomcomViewsTest(TestCase):
# save the cert file in tmp # save the cert file in tmp
#nomcom.public_key.storage.location = tempfile.gettempdir() #nomcom.public_key.storage.location = tempfile.gettempdir()
nomcom.public_key.save('cert', File(io.open(self.cert_file.name, 'r'))) with io.open(self.cert_file.name, 'r') as fd:
nomcom.public_key.save('cert', File(fd))
response = self.client.get(nominate_url) response = self.client.get(nominate_url)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@ -781,7 +782,8 @@ class NomcomViewsTest(TestCase):
# save the cert file in tmp # save the cert file in tmp
#nomcom.public_key.storage.location = tempfile.gettempdir() #nomcom.public_key.storage.location = tempfile.gettempdir()
nomcom.public_key.save('cert', File(io.open(self.cert_file.name, 'r'))) with io.open(self.cert_file.name, 'r') as fd:
nomcom.public_key.save('cert', File(fd))
response = self.client.get(nominate_url) response = self.client.get(nominate_url)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@ -863,7 +865,8 @@ class NomcomViewsTest(TestCase):
# save the cert file in tmp # save the cert file in tmp
#nomcom.public_key.storage.location = tempfile.gettempdir() #nomcom.public_key.storage.location = tempfile.gettempdir()
nomcom.public_key.save('cert', File(io.open(self.cert_file.name, 'r'))) with io.open(self.cert_file.name, 'r') as fd:
nomcom.public_key.save('cert', File(fd))
response = self.client.get(self.add_questionnaire_url) response = self.client.get(self.add_questionnaire_url)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@ -942,7 +945,8 @@ class NomcomViewsTest(TestCase):
# save the cert file in tmp # save the cert file in tmp
#nomcom.public_key.storage.location = tempfile.gettempdir() #nomcom.public_key.storage.location = tempfile.gettempdir()
nomcom.public_key.save('cert', File(io.open(self.cert_file.name, 'r'))) with io.open(self.cert_file.name, 'r') as fd:
nomcom.public_key.save('cert', File(fd))
response = self.client.get(feedback_url) response = self.client.get(feedback_url)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@ -1066,7 +1070,8 @@ class FeedbackTest(TestCase):
# save the cert file in tmp # save the cert file in tmp
#nomcom.public_key.storage.location = tempfile.gettempdir() #nomcom.public_key.storage.location = tempfile.gettempdir()
nomcom.public_key.save('cert', File(io.open(self.cert_file.name, 'r'))) with io.open(self.cert_file.name, 'r') as fd:
nomcom.public_key.save('cert', File(fd))
comment_text = 'Plain text. Comments with accents äöåÄÖÅ éáíóú âêîôû ü àèìòù.' comment_text = 'Plain text. Comments with accents äöåÄÖÅ éáíóú âêîôû ü àèìòù.'
comments = nomcom.encrypt(comment_text) comments = nomcom.encrypt(comment_text)
@ -1089,7 +1094,8 @@ class ReminderTest(TestCase):
self.nomcom = get_nomcom_by_year(NOMCOM_YEAR) self.nomcom = get_nomcom_by_year(NOMCOM_YEAR)
self.cert_file, self.privatekey_file = get_cert_files() self.cert_file, self.privatekey_file = get_cert_files()
#self.nomcom.public_key.storage.location = tempfile.gettempdir() #self.nomcom.public_key.storage.location = tempfile.gettempdir()
self.nomcom.public_key.save('cert', File(io.open(self.cert_file.name, 'r'))) with io.open(self.cert_file.name, 'r') as fd:
self.nomcom.public_key.save('cert', File(fd))
gen = Position.objects.get(nomcom=self.nomcom,name='GEN') gen = Position.objects.get(nomcom=self.nomcom,name='GEN')
rai = Position.objects.get(nomcom=self.nomcom,name='RAI') rai = Position.objects.get(nomcom=self.nomcom,name='RAI')

View file

@ -100,14 +100,14 @@ def photo(request, email_or_name):
if not size.isdigit(): if not size.isdigit():
return HttpResponse("Size must be integer", status=400) return HttpResponse("Size must be integer", status=400)
size = int(size) size = int(size)
img = Image.open(person.photo) with Image.open(person.photo) as img:
img = img.resize((size, img.height*size//img.width)) img = img.resize((size, img.height*size//img.width))
bytes = BytesIO() bytes = BytesIO()
try: try:
img.save(bytes, format='JPEG') img.save(bytes, format='JPEG')
return HttpResponse(bytes.getvalue(), content_type='image/jpg') return HttpResponse(bytes.getvalue(), content_type='image/jpg')
except OSError: except OSError:
raise Http404 raise Http404
@role_required("Secretariat") @role_required("Secretariat")

View file

@ -103,6 +103,7 @@ def retrieve_messages_from_mbox(mbox_fileobj):
"date": msg["Date"], "date": msg["Date"],
"utcdate": (utcdate.date().isoformat(), utcdate.time().isoformat()) if utcdate else ("", ""), "utcdate": (utcdate.date().isoformat(), utcdate.time().isoformat()) if utcdate else ("", ""),
}) })
mbox.close()
return res return res

View file

@ -1561,10 +1561,16 @@ class SubmitTests(BaseSubmitTestCase):
self.assertEqual(Submission.objects.filter(name=name).count(), 1) self.assertEqual(Submission.objects.filter(name=name).count(), 1)
self.assertTrue(os.path.exists(os.path.join(self.staging_dir, "%s-%s.txt" % (name, rev)))) self.assertTrue(os.path.exists(os.path.join(self.staging_dir, "%s-%s.txt" % (name, rev))))
self.assertTrue(name in io.open(os.path.join(self.staging_dir, "%s-%s.txt" % (name, rev))).read()) fd = io.open(os.path.join(self.staging_dir, "%s-%s.txt" % (name, rev)))
txt_contents = fd.read()
fd.close()
self.assertTrue(name in txt_contents)
self.assertTrue(os.path.exists(os.path.join(self.staging_dir, "%s-%s.xml" % (name, rev)))) self.assertTrue(os.path.exists(os.path.join(self.staging_dir, "%s-%s.xml" % (name, rev))))
self.assertTrue(name in io.open(os.path.join(self.staging_dir, "%s-%s.xml" % (name, rev))).read()) fd = io.open(os.path.join(self.staging_dir, "%s-%s.xml" % (name, rev)))
self.assertTrue('<?xml version="1.0" encoding="UTF-8"?>' in io.open(os.path.join(self.staging_dir, "%s-%s.xml" % (name, rev))).read()) xml_contents = fd.read()
fd.close()
self.assertTrue(name in xml_contents)
self.assertTrue('<?xml version="1.0" encoding="UTF-8"?>' in xml_contents)
def test_expire_submissions(self): def test_expire_submissions(self):
s = Submission.objects.create(name="draft-ietf-mars-foo", s = Submission.objects.create(name="draft-ietf-mars-foo",
@ -3402,8 +3408,10 @@ class AsyncSubmissionTests(BaseSubmitTestCase):
"test_submission.txt", "test_submission.txt",
title="Correct Draft Title", title="Correct Draft Title",
) )
txt_path.write_text(txt.read()) with txt_path.open('w') as fd:
with self.assertRaisesMessage(SubmissionError, "disagrees with submission filename"): fd.write(txt.read())
txt.close()
with self.assertRaisesMessage(SubmissionError, 'disagrees with submission filename'):
process_submission_text("draft-somebody-test", "00") process_submission_text("draft-somebody-test", "00")
# rev mismatch # rev mismatch
@ -3414,8 +3422,10 @@ class AsyncSubmissionTests(BaseSubmitTestCase):
"test_submission.txt", "test_submission.txt",
title="Correct Draft Title", title="Correct Draft Title",
) )
txt_path.write_text(txt.read()) with txt_path.open('w') as fd:
with self.assertRaisesMessage(SubmissionError, "disagrees with submission revision"): fd.write(txt.read())
txt.close()
with self.assertRaisesMessage(SubmissionError, 'disagrees with submission revision'):
process_submission_text("draft-somebody-test", "00") process_submission_text("draft-somebody-test", "00")
def test_process_and_validate_submission(self): def test_process_and_validate_submission(self):

View file

@ -73,6 +73,7 @@ class Command(BaseCommand):
data = json.load(file) data = json.load(file)
except ValueError as e: except ValueError as e:
raise CommandError("Failure to read json data from %s: %s" % (filename, e)) raise CommandError("Failure to read json data from %s: %s" % (filename, e))
file.close()
version = version or data["version"] version = version or data["version"]
if not version in data: if not version in data:
raise CommandError("There is no data for version %s available in %s" % (version, filename)) raise CommandError("There is no data for version %s available in %s" % (version, filename))

View file

@ -12,23 +12,23 @@ def pipe(cmd, str=None):
if str and len(str) > 4096: # XXX: Hardcoded Linux 2.4, 2.6 pipe buffer size if str and len(str) > 4096: # XXX: Hardcoded Linux 2.4, 2.6 pipe buffer size
bufsize = len(str) bufsize = len(str)
pipe = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=bufsize, shell=True) with Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=bufsize, shell=True) as pipe:
if not str is None: if not str is None:
pipe.stdin.write(str) pipe.stdin.write(str)
pipe.stdin.close() pipe.stdin.close()
out = b"" out = b""
err = b"" err = b""
while True: while True:
str = pipe.stdout.read() str = pipe.stdout.read()
if str: if str:
out += str out += str
code = pipe.poll() code = pipe.poll()
if code != None: if code != None:
err = pipe.stderr.read() err = pipe.stderr.read()
break break
if len(out) >= MAX: if len(out) >= MAX:
err = "Output exceeds %s bytes and has been truncated" % MAX err = "Output exceeds %s bytes and has been truncated" % MAX
break break
return (code, out, err) return (code, out, err)

View file

@ -73,7 +73,7 @@ def validate_file_size(file, missing_ok=False):
def validate_mime_type(file, valid, missing_ok=False): def validate_mime_type(file, valid, missing_ok=False):
try: try:
file.open() file.open() # Callers expect this to remain open. Consider refactoring.
except FileNotFoundError: except FileNotFoundError:
if missing_ok: if missing_ok:
return None, None return None, None