* chore: Remove unused "rendertest" stuff (#6015) * fix: restore ability to create status change documents (#5963) * fix: restore ability to create status change documents Fixes #5962 * chore: address review comment * fix: Provide human-friendly status in submission status API response (#6011) Co-authored-by: nectostr <bastinda96@gmail.com> * fix: Make name/email lookups case-insensitive (#5972) (#6007) * fix: Make name/email lookups case-insensitive (#5972) Use icontains so that looking up name or email is case insensitive Added a test Fixes: 5972 * fix: Use __iexact not __icontains * fix: Clarify no-action-needed (#5918) (#6020) When a draft is submitted for manual processing, clarify that no action is needed; the Secretariat has the next steps. Fixes: #5918 * fix: Fix menu hover issue (#6019) * fix: Fix menu hover issue Fixes #5702 * Fix leftmenu hover issue * fix: Server error from api_get_session_materials() (#6025) Fixes #5877 * fix: Clarify Questionnaire label (#4688) (#6017) When filtering nominees, `Questionnaire` implies `Accepted == yes` so fix the dropdown test tosay that. Fixes: #4688 * chore: Merge from @martinthomson's rfc-txt-html (#6023) * fix:no history entry when changing RFC Editor note for doc (#6021) * fix:no history entry when changing RFC Editor note for doc * fix:no history entry when changing RFC Editor note for doc --------- Co-authored-by: Priyanka Narkar <priyankanarkar@dhcp-91f8.meeting.ietf.org> * fix: avoid deprecation warning on view_list() for objs without CommunityList Fixes #5942 * fix: return 404 for non-existing revisions (#6014) * fix: return 404 for non-existing revisions Links to non-existing revisions to docs should return 404 * fix: change rfc/rev and search behaviour * refactor: fix tab level * fix: return 404 for rfc revision for bibtex * fix: provide date for revisions in bibtex output (#6029) * fix: provide date for revisions in bibtex output * refactor: change walrus to if's * fix: specify particular revision for events * fix: review refactoring issue fixes #5447 * fix: Remove automatically suggested document for document that is already has review request (fixes #3211) (#5425) * Added check that if there is already review request for the document in question, ignore the automatic suggestion for that document. Fixes #3211. * fix: dont block on open requests for a previous version. Add tests --------- Co-authored-by: Nicolas Giard <github@ngpixel.com> Co-authored-by: Robert Sparks <rjsparks@nostrum.com> * feat: IAB statements (#5940) * feat: support iab and iesg statements. Import iab statements. (#5895) * feat: infrastructure for statements doctype * chore: basic test framework * feat: basic statement document view * feat: show replaced statements * chore: black * fix: state help for statements * fix: cleanout non-relevant email expansions * feat: import iab statements, provide group statements tab * fix: guard against running import twice * feat: build redirect csv for iab statements * fix: set document state on import * feat: show published date on main doc view * feat: handle pdf statements * feat: create new and update statements * chore: copyright block updates * chore: remove flakes * chore: black * feat: add edit/new buttons for the secretariat * fix: address PR #5895 review comments * fix: pin pydantic until inflect catches up (#5901) (#5902) * chore: re-un-pin pydantic * feat: include submitter in email about submitted slides (#6033) * feat: include submitter in email about submitted slides fixes #6031 * chore: remove unintended whitespace change * chore(dev): update .vscode/settings.json with new taskExplorer settings * fix: Add editorial stream to proceedings (#6027) * fix: Add editorial stream to proceedings Fixes #5717 * fix: Move editorial stream after the irtf in proceedings * fix: Add editorial stream to meeting materials (#6047) Fixes #6042 * fix: Shows requested reviews for doc fixes (#6022) * Fix: Shows requested reviews for doc * Changed template includes to only give required variables to them. * feat: allow openId to choose an unactive email if there are none active (#6041) * feat: allow openId to choose an unactive email if there are no active ones * chore: correct typo * chore: rename unactive to inactive * fix: Make review table more responsive (#6053) * fix: Improve layout of review table * Progress * Progress * Final changes * Fix tests * Remove fluff * Undo commits * ci: add --validate-html-harder to tests * ci: add --validate-html-harder to build.yml workflow * fix: Set colspan to actual number of columns (#6069) * fix: Clean up view_feedback_pending (#6070) - Remove "Unclassified" column header, which caused misalignment in the table body. - Show the message author - previously displayed as `(None)`. * docs: Update LICENSE year * fix: Remove IESG state edit button when state is 'dead' (#6051) (#6065) * fix: Correctly order "last call requested" column in the IESG dashboard (#6079) * ci: update dev sandbox init script to start memcached * feat: Reclassify nomcom feedback (#6002) * fix: Clean up view_feedback_pending - Remove "Unclassified" column header, which caused misalignment in the table body. - Show the message author - previously displayed as `(None)`. * feat: Reclassify nomcom feedback (#4669) - There's a new `Chair/Advisor Tasks` menu item `Reclassify feedback`. - I overloaded `view_feedback*` URLs with a `?reclassify` parameter. - This adds a checkbox to each feedback message, and a `Reclassify` button at the bottom of each feedback page. - "Reclassifying" basically de-classifies the feedback, and punts it back to the "Pending emails" view for reclassification. - If a feedback has been applied to multiple nominees, declassifying it from one nominee removes it from all. * fix: Remove unused local variables * fix: Fix some missing and mis-nested html * test: Add tests for reclassifying feedback * refactor: Substantial redesign of feedback reclassification - Break out reclassify_feedback* as their own URLs and views, and revert changes to view_feedback*.html. - Replace checkboxes with a Reclassify button on each message. * fix: Remember to clear the feedback associations when reclassifying * feat: Add an 'Overcome by events' feedback type * refactor: When invoking reclassification from a view-feedback page, load the corresponding reclassify-feedback page * fix: De-conflict migration with 0004_statements Also change the coding style to match, and add a reverse migration. * fix: Fix a test case to account for new feedback type * fix: 842e730 broke the Back button * refactor: Reclassify feedback directly instead of putting it back in the work queue * fix: Adjust tests to new workflow * refactor: Further refine reclassification to avoid redirects * refactor: Impose a FeedbackTypeName ordering Also add FeedbackTypeName.legend field, rather than synthesizing it every time we classify or reclassify feedback. In the reclassification forms, only show the relevant feedback types. * refactor: Merge reclassify_feedback_* back into view_feedback_* This means the "Reclassify" button is always present, but eliminates some complexity. * refactor: Add filter(used=True) on FeedbackTypeName querysets * refactor: Add the new FeedbackTypeName to the reclassification success message * fix: Secure reclassification against rogue nomcom members * fix: Print decoded key and fully clean up test nomcom (#6094) * fix: Delete Person records when deleting a test nomcom * fix: Decode test nomcom private key before printing * test: Use correct time zone for test_statement_doc_view (#6064) * chore(deps): update all npm dependencies for playwright (#6061) Co-authored-by: depfu[bot] <23717796+depfu[bot]@users.noreply.github.com> * chore(deps): update all npm dependencies for dev/diff (#6062) Co-authored-by: depfu[bot] <23717796+depfu[bot]@users.noreply.github.com> * chore(deps): update all npm dependencies for dev/coverage-action (#6063) Co-authored-by: depfu[bot] <23717796+depfu[bot]@users.noreply.github.com> * fix: Hash cache key for default memcached cache (#6089) * feat: Show docs that an AD hasn't balloted on that need ballots to progress (#6075) * fix(doc): Unify help texts for document states (#6060) * Fix IESG State help text link (only) * Intermediate checkpoint * Correct URL filtering of state descriptions * Unify help texts for document states * Remove redundant load static from template --------- Co-authored-by: Robert Sparks <rjsparks@nostrum.com> * ci: fix sandbox start.sh memcached user * fix: refactor how settings handles cache definitions (#6099) * fix: refactor how settings handles cache definitions * chore: more english-speaker readable expression * fix: Cast cache key to str before calling encode (#6100) --------- Co-authored-by: Robert Sparks <rjsparks@nostrum.com> Co-authored-by: Liubov Kurafeeva <liubov.kurafeeva@gmail.com> Co-authored-by: nectostr <bastinda96@gmail.com> Co-authored-by: Rich Salz <rsalz@akamai.com> Co-authored-by: PriyankaN <priyanka@amsl.com> Co-authored-by: Priyanka Narkar <priyankanarkar@dhcp-91f8.meeting.ietf.org> Co-authored-by: Ali <alireza83@gmail.com> Co-authored-by: Roman Beltiukov <maybe.hello.world@gmail.com> Co-authored-by: Tero Kivinen <kivinen@iki.fi> Co-authored-by: Nicolas Giard <github@ngpixel.com> Co-authored-by: Kesara Rathnayake <kesara@fq.nz> Co-authored-by: Jennifer Richards <jennifer@staff.ietf.org> Co-authored-by: Paul Selkirk <paul@painless-security.com> Co-authored-by: depfu[bot] <23717796+depfu[bot]@users.noreply.github.com> Co-authored-by: Jim Fenton <fenton@bluepopcorn.net>
1082 lines
48 KiB
Python
1082 lines
48 KiB
Python
# Copyright The IETF Trust 2015-2020, All Rights Reserved
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import datetime
|
|
import json
|
|
import html
|
|
import os
|
|
import sys
|
|
|
|
from importlib import import_module
|
|
from pathlib import Path
|
|
|
|
from django.apps import apps
|
|
from django.conf import settings
|
|
from django.test import Client
|
|
from django.test.utils import override_settings
|
|
from django.urls import reverse as urlreverse
|
|
from django.utils import timezone
|
|
|
|
from tastypie.test import ResourceTestCaseMixin
|
|
|
|
import debug # pyflakes:ignore
|
|
|
|
import ietf
|
|
from ietf.doc.utils import get_unicode_document_content
|
|
from ietf.doc.models import RelatedDocument, State
|
|
from ietf.doc.factories import IndividualDraftFactory, WgDraftFactory
|
|
from ietf.group.factories import RoleFactory
|
|
from ietf.meeting.factories import MeetingFactory, SessionFactory
|
|
from ietf.meeting.models import Session
|
|
from ietf.person.factories import PersonFactory, random_faker
|
|
from ietf.person.models import User
|
|
from ietf.person.models import PersonalApiKey
|
|
from ietf.stats.models import MeetingRegistration
|
|
from ietf.utils.mail import outbox, get_payload_text
|
|
from ietf.utils.models import DumpInfo
|
|
from ietf.utils.test_utils import TestCase, login_testing_unauthorized, reload_db_objects
|
|
|
|
OMITTED_APPS = (
|
|
'ietf.secr.meetings',
|
|
'ietf.secr.proceedings',
|
|
'ietf.ipr',
|
|
)
|
|
|
|
class CustomApiTests(TestCase):
|
|
settings_temp_path_overrides = TestCase.settings_temp_path_overrides + ['AGENDA_PATH']
|
|
|
|
def test_api_help_page(self):
|
|
url = urlreverse('ietf.api.views.api_help')
|
|
r = self.client.get(url)
|
|
self.assertContains(r, 'The datatracker API', status_code=200)
|
|
|
|
def test_api_openid_issuer(self):
|
|
url = urlreverse('ietf.api.urls.oidc_issuer')
|
|
r = self.client.get(url)
|
|
self.assertContains(r, 'OpenID Connect Issuer', status_code=200)
|
|
|
|
def test_deprecated_api_set_session_video_url(self):
|
|
url = urlreverse('ietf.meeting.views.api_set_session_video_url')
|
|
recmanrole = RoleFactory(group__type_id='ietf', name_id='recman')
|
|
recman = recmanrole.person
|
|
meeting = MeetingFactory(type_id='ietf')
|
|
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
|
group = session.group
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
|
video = 'https://foo.example.com/bar/beer/'
|
|
|
|
# error cases
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
r = self.client.post(url, {'apikey': badapikey.hash()} )
|
|
self.assertContains(r, "Restricted to role: Recording Manager", status_code=403)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Too long since last regular login", status_code=400)
|
|
recman.user.last_login = timezone.now()
|
|
recman.user.save()
|
|
|
|
r = self.client.get(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Method not allowed", status_code=405)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'group': group.acronym} )
|
|
self.assertContains(r, "Missing meeting parameter", status_code=400)
|
|
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, } )
|
|
self.assertContains(r, "Missing group parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym} )
|
|
self.assertContains(r, "Missing item parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1'} )
|
|
self.assertContains(r, "Missing url parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': '1', 'group': group.acronym,
|
|
'item': '1', 'url': video, })
|
|
self.assertContains(r, "No sessions found for meeting", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': 'bogous',
|
|
'item': '1', 'url': video, })
|
|
self.assertContains(r, "No sessions found in meeting '%s' for group 'bogous'"%meeting.number, status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'url': "foobar", })
|
|
self.assertContains(r, "Invalid url value: 'foobar'", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '5', 'url': video, })
|
|
self.assertContains(r, "No item '5' found in list of sessions for group", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': 'foo', 'url': video, })
|
|
self.assertContains(r, "Expected a numeric value for 'item', found 'foo'", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'url': video+'/rum', })
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'url': video+'/rum', })
|
|
self.assertContains(r, "URL is the same", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'url': video, })
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
recordings = session.recordings()
|
|
self.assertEqual(len(recordings), 1)
|
|
doc = recordings[0]
|
|
self.assertEqual(doc.external_url, video)
|
|
event = doc.latest_event()
|
|
self.assertEqual(event.by, recman)
|
|
|
|
def test_api_set_session_video_url(self):
|
|
url = urlreverse("ietf.meeting.views.api_set_session_video_url")
|
|
recmanrole = RoleFactory(group__type_id="ietf", name_id="recman")
|
|
recman = recmanrole.person
|
|
meeting = MeetingFactory(type_id="ietf")
|
|
session = SessionFactory(group__type_id="wg", meeting=meeting)
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
|
video = "https://foo.example.com/bar/beer/"
|
|
|
|
# error cases
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
badrole = RoleFactory(group__type_id="ietf", name_id="ad")
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
r = self.client.post(url, {"apikey": badapikey.hash()})
|
|
self.assertContains(r, "Restricted to role: Recording Manager", status_code=403)
|
|
|
|
r = self.client.post(url, {"apikey": apikey.hash()})
|
|
self.assertContains(r, "Too long since last regular login", status_code=400)
|
|
recman.user.last_login = timezone.now()
|
|
recman.user.save()
|
|
|
|
r = self.client.get(url, {"apikey": apikey.hash()})
|
|
self.assertContains(r, "Method not allowed", status_code=405)
|
|
|
|
r = self.client.post(url, {"apikey": apikey.hash()})
|
|
self.assertContains(r, "Missing session_id parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {"apikey": apikey.hash(), "session_id": session.pk})
|
|
self.assertContains(r, "Missing url parameter", status_code=400)
|
|
|
|
bad_pk = int(Session.objects.order_by("-pk").first().pk) + 1
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"session_id": bad_pk,
|
|
"url": video,
|
|
},
|
|
)
|
|
self.assertContains(r, "Session not found", status_code=400)
|
|
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"session_id": "foo",
|
|
"url": video,
|
|
},
|
|
)
|
|
self.assertContains(r, "Invalid session_id", status_code=400)
|
|
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"session_id": session.pk,
|
|
"url": "foobar",
|
|
},
|
|
)
|
|
self.assertContains(r, "Invalid url value: 'foobar'", status_code=400)
|
|
|
|
r = self.client.post(
|
|
url, {"apikey": apikey.hash(), "session_id": session.pk, "url": video}
|
|
)
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
recordings = session.recordings()
|
|
self.assertEqual(len(recordings), 1)
|
|
doc = recordings[0]
|
|
self.assertEqual(doc.external_url, video)
|
|
event = doc.latest_event()
|
|
self.assertEqual(event.by, recman)
|
|
|
|
def test_api_add_session_attendees(self):
|
|
url = urlreverse('ietf.meeting.views.api_add_session_attendees')
|
|
otherperson = PersonFactory()
|
|
recmanrole = RoleFactory(group__type_id='ietf', name_id='recman')
|
|
recman = recmanrole.person
|
|
meeting = MeetingFactory(type_id='ietf')
|
|
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
|
|
|
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
|
|
# Improper credentials, or method
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': badapikey.hash()} )
|
|
self.assertContains(r, "Restricted to role: Recording Manager", status_code=403)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Too long since last regular login", status_code=400)
|
|
|
|
recman.user.last_login = timezone.now()-datetime.timedelta(days=365)
|
|
recman.user.save()
|
|
r = self.client.post(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Too long since last regular login", status_code=400)
|
|
|
|
recman.user.last_login = timezone.now()
|
|
recman.user.save()
|
|
r = self.client.get(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Method not allowed", status_code=405)
|
|
|
|
recman.user.last_login = timezone.now()
|
|
recman.user.save()
|
|
|
|
# Malformed requests
|
|
r = self.client.post(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Missing attended parameter", status_code=400)
|
|
|
|
for baddict in (
|
|
'{}',
|
|
'{"bogons;drop table":"bogons;drop table"}',
|
|
'{"session_id":"Not an integer;drop table"}',
|
|
f'{{"session_id":{session.pk},"attendees":"not a list;drop table"}}',
|
|
f'{{"session_id":{session.pk},"attendees":"not a list;drop table"}}',
|
|
f'{{"session_id":{session.pk},"attendees":[1,2,"not an int;drop table",4]}}',
|
|
):
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'attended': baddict})
|
|
self.assertContains(r, "Malformed post", status_code=400)
|
|
|
|
bad_session_id = Session.objects.order_by('-pk').first().pk + 1
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'attended': f'{{"session_id":{bad_session_id},"attendees":[]}}'})
|
|
self.assertContains(r, "Invalid session", status_code=400)
|
|
bad_user_id = User.objects.order_by('-pk').first().pk + 1
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'attended': f'{{"session_id":{session.pk},"attendees":[{bad_user_id}]}}'})
|
|
self.assertContains(r, "Invalid attendee", status_code=400)
|
|
|
|
# Reasonable request
|
|
r = self.client.post(url, {'apikey':apikey.hash(), 'attended': f'{{"session_id":{session.pk},"attendees":[{recman.user.pk}, {otherperson.user.pk}]}}'})
|
|
|
|
self.assertEqual(session.attended_set.count(),2)
|
|
self.assertTrue(session.attended_set.filter(person=recman).exists())
|
|
self.assertTrue(session.attended_set.filter(person=otherperson).exists())
|
|
|
|
def test_api_upload_polls_and_chatlog(self):
|
|
recmanrole = RoleFactory(group__type_id='ietf', name_id='recman')
|
|
recmanrole.person.user.last_login = timezone.now()
|
|
recmanrole.person.user.save()
|
|
|
|
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
|
|
meeting = MeetingFactory(type_id='ietf')
|
|
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
|
|
|
for type_id, content in (
|
|
(
|
|
"chatlog",
|
|
"""[
|
|
{
|
|
"author": "Raymond Lutz",
|
|
"text": "<p>Yes I like that comment just made</p>",
|
|
"time": "2022-07-28T19:26:16Z"
|
|
},
|
|
{
|
|
"author": "Carsten Bormann",
|
|
"text": "<p>But software is not a thing.</p>",
|
|
"time": "2022-07-28T19:26:45Z"
|
|
}
|
|
]"""
|
|
),
|
|
(
|
|
"polls",
|
|
"""[
|
|
{
|
|
"start_time": "2022-07-28T19:19:54Z",
|
|
"end_time": "2022-07-28T19:20:23Z",
|
|
"text": "Are you willing to review the documents?",
|
|
"raise_hand": 57,
|
|
"do_not_raise_hand": 11
|
|
},
|
|
{
|
|
"start_time": "2022-07-28T19:20:56Z",
|
|
"end_time": "2022-07-28T19:21:30Z",
|
|
"text": "Would you be willing to edit or coauthor a document?",
|
|
"raise_hand": 31,
|
|
"do_not_raise_hand": 31
|
|
}
|
|
]"""
|
|
),
|
|
):
|
|
url = urlreverse(f"ietf.meeting.views.api_upload_{type_id}")
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=recmanrole.person)
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': badapikey.hash()} )
|
|
self.assertContains(r, "Restricted to role: Recording Manager", status_code=403)
|
|
|
|
r = self.client.get(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Method not allowed", status_code=405)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash()} )
|
|
self.assertContains(r, "Missing apidata parameter", status_code=400)
|
|
|
|
for baddict in (
|
|
'{}',
|
|
'{"bogons;drop table":"bogons;drop table"}',
|
|
'{"session_id":"Not an integer;drop table"}',
|
|
f'{{"session_id":{session.pk},"{type_id}":"not a list;drop table"}}',
|
|
f'{{"session_id":{session.pk},"{type_id}":"not a list;drop table"}}',
|
|
f'{{"session_id":{session.pk},"{type_id}":[{{}}, {{}}, "not an int;drop table", {{}}]}}',
|
|
):
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'apidata': baddict})
|
|
self.assertContains(r, "Malformed post", status_code=400)
|
|
|
|
bad_session_id = Session.objects.order_by('-pk').first().pk + 1
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'apidata': f'{{"session_id":{bad_session_id},"{type_id}":[]}}'})
|
|
self.assertContains(r, "Invalid session", status_code=400)
|
|
|
|
# Valid POST
|
|
r = self.client.post(url,{'apikey':apikey.hash(),'apidata': f'{{"session_id":{session.pk}, "{type_id}":{content}}}'})
|
|
self.assertEqual(r.status_code, 200)
|
|
|
|
newdoc = session.sessionpresentation_set.get(document__type_id=type_id).document
|
|
newdoccontent = get_unicode_document_content(newdoc.name, Path(session.meeting.get_materials_path()) / type_id / newdoc.uploaded_filename)
|
|
self.assertEqual(json.loads(content), json.loads(newdoccontent))
|
|
|
|
def test_deprecated_api_upload_bluesheet(self):
|
|
url = urlreverse('ietf.meeting.views.api_upload_bluesheet')
|
|
recmanrole = RoleFactory(group__type_id='ietf', name_id='recman')
|
|
recman = recmanrole.person
|
|
meeting = MeetingFactory(type_id='ietf')
|
|
session = SessionFactory(group__type_id='wg', meeting=meeting)
|
|
group = session.group
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
|
|
|
people = [
|
|
{"name": "Andrea Andreotti", "affiliation": "Azienda"},
|
|
{"name": "Bosse Bernadotte", "affiliation": "Bolag"},
|
|
{"name": "Charles Charlemagne", "affiliation": "Compagnie"},
|
|
]
|
|
for i in range(3):
|
|
faker = random_faker()
|
|
people.append(dict(name=faker.name(), affiliation=faker.company()))
|
|
bluesheet = json.dumps(people)
|
|
|
|
# error cases
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
r = self.client.post(url, {'apikey': badapikey.hash()})
|
|
self.assertContains(r, "Restricted to roles: Recording Manager, Secretariat", status_code=403)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash()})
|
|
self.assertContains(r, "Too long since last regular login", status_code=400)
|
|
recman.user.last_login = timezone.now()
|
|
recman.user.save()
|
|
|
|
r = self.client.get(url, {'apikey': apikey.hash()})
|
|
self.assertContains(r, "Method not allowed", status_code=405)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'group': group.acronym})
|
|
self.assertContains(r, "Missing meeting parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, })
|
|
self.assertContains(r, "Missing group parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym})
|
|
self.assertContains(r, "Missing item parameter", status_code=400)
|
|
|
|
r = self.client.post(url,
|
|
{'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1'})
|
|
self.assertContains(r, "Missing bluesheet parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': '1', 'group': group.acronym,
|
|
'item': '1', 'bluesheet': bluesheet, })
|
|
self.assertContains(r, "No sessions found for meeting", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': 'bogous',
|
|
'item': '1', 'bluesheet': bluesheet, })
|
|
self.assertContains(r, "No sessions found in meeting '%s' for group 'bogous'" % meeting.number, status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'bluesheet': "foobar", })
|
|
self.assertContains(r, "Invalid json value: 'foobar'", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '5', 'bluesheet': bluesheet, })
|
|
self.assertContains(r, "No item '5' found in list of sessions for group", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': 'foo', 'bluesheet': bluesheet, })
|
|
self.assertContains(r, "Expected a numeric value for 'item', found 'foo'", status_code=400)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'bluesheet': bluesheet, })
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
# Submit again, with slightly different content, as an updated version
|
|
people[1]['affiliation'] = 'Bolaget AB'
|
|
bluesheet = json.dumps(people)
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym,
|
|
'item': '1', 'bluesheet': bluesheet, })
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
bluesheet = session.sessionpresentation_set.filter(document__type__slug='bluesheets').first().document
|
|
# We've submitted an update; check that the rev is right
|
|
self.assertEqual(bluesheet.rev, '01')
|
|
# Check the content
|
|
with open(bluesheet.get_file_name()) as file:
|
|
text = file.read()
|
|
for p in people:
|
|
self.assertIn(p['name'], html.unescape(text))
|
|
self.assertIn(p['affiliation'], html.unescape(text))
|
|
|
|
def test_api_upload_bluesheet(self):
|
|
url = urlreverse("ietf.meeting.views.api_upload_bluesheet")
|
|
recmanrole = RoleFactory(group__type_id="ietf", name_id="recman")
|
|
recman = recmanrole.person
|
|
meeting = MeetingFactory(type_id="ietf")
|
|
session = SessionFactory(group__type_id="wg", meeting=meeting)
|
|
group = session.group
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=recman)
|
|
|
|
people = [
|
|
{"name": "Andrea Andreotti", "affiliation": "Azienda"},
|
|
{"name": "Bosse Bernadotte", "affiliation": "Bolag"},
|
|
{"name": "Charles Charlemagne", "affiliation": "Compagnie"},
|
|
]
|
|
for i in range(3):
|
|
faker = random_faker()
|
|
people.append(dict(name=faker.name(), affiliation=faker.company()))
|
|
bluesheet = json.dumps(people)
|
|
|
|
# error cases
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
badrole = RoleFactory(group__type_id="ietf", name_id="ad")
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
r = self.client.post(url, {"apikey": badapikey.hash()})
|
|
self.assertContains(
|
|
r, "Restricted to roles: Recording Manager, Secretariat", status_code=403
|
|
)
|
|
|
|
r = self.client.post(url, {"apikey": apikey.hash()})
|
|
self.assertContains(r, "Too long since last regular login", status_code=400)
|
|
recman.user.last_login = timezone.now()
|
|
recman.user.save()
|
|
|
|
r = self.client.get(url, {"apikey": apikey.hash()})
|
|
self.assertContains(r, "Method not allowed", status_code=405)
|
|
|
|
r = self.client.post(url, {"apikey": apikey.hash()})
|
|
self.assertContains(r, "Missing session_id parameter", status_code=400)
|
|
|
|
r = self.client.post(url, {"apikey": apikey.hash(), "session_id": session.pk})
|
|
self.assertContains(r, "Missing bluesheet parameter", status_code=400)
|
|
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"meeting": meeting.number,
|
|
"group": group.acronym,
|
|
"item": "1",
|
|
"bluesheet": "foobar",
|
|
},
|
|
)
|
|
self.assertContains(r, "Invalid json value: 'foobar'", status_code=400)
|
|
|
|
bad_session_pk = int(Session.objects.order_by("-pk").first().pk) + 1
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"session_id": bad_session_pk,
|
|
"bluesheet": bluesheet,
|
|
},
|
|
)
|
|
self.assertContains(r, "Session not found", status_code=400)
|
|
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"session_id": "foo",
|
|
"bluesheet": bluesheet,
|
|
},
|
|
)
|
|
self.assertContains(r, "Invalid session_id", status_code=400)
|
|
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"session_id": session.pk,
|
|
"bluesheet": bluesheet,
|
|
},
|
|
)
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
# Submit again, with slightly different content, as an updated version
|
|
people[1]["affiliation"] = "Bolaget AB"
|
|
bluesheet = json.dumps(people)
|
|
r = self.client.post(
|
|
url,
|
|
{
|
|
"apikey": apikey.hash(),
|
|
"meeting": meeting.number,
|
|
"group": group.acronym,
|
|
"item": "1",
|
|
"bluesheet": bluesheet,
|
|
},
|
|
)
|
|
self.assertContains(r, "Done", status_code=200)
|
|
|
|
bluesheet = (
|
|
session.sessionpresentation_set.filter(document__type__slug="bluesheets")
|
|
.first()
|
|
.document
|
|
)
|
|
# We've submitted an update; check that the rev is right
|
|
self.assertEqual(bluesheet.rev, "01")
|
|
# Check the content
|
|
with open(bluesheet.get_file_name()) as file:
|
|
text = file.read()
|
|
for p in people:
|
|
self.assertIn(p["name"], html.unescape(text))
|
|
self.assertIn(p["affiliation"], html.unescape(text))
|
|
|
|
def test_person_export(self):
|
|
person = PersonFactory()
|
|
url = urlreverse('ietf.api.views.PersonalInformationExportView')
|
|
login_testing_unauthorized(self, person.user.username, url)
|
|
r = self.client.get(url)
|
|
jsondata = r.json()
|
|
data = jsondata['person.person'][str(person.id)]
|
|
self.assertEqual(data['name'], person.name)
|
|
self.assertEqual(data['ascii'], person.ascii)
|
|
self.assertEqual(data['user']['email'], person.user.email)
|
|
|
|
def test_api_v2_person_export_view(self):
|
|
url = urlreverse('ietf.api.views.ApiV2PersonExportView')
|
|
robot = PersonFactory(user__is_staff=True)
|
|
RoleFactory(name_id='robot', person=robot, email=robot.email(), group__acronym='secretariat')
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=robot)
|
|
|
|
# error cases
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, "Missing apikey parameter", status_code=400)
|
|
|
|
badrole = RoleFactory(group__type_id='ietf', name_id='ad')
|
|
badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person)
|
|
badrole.person.user.last_login = timezone.now()
|
|
badrole.person.user.save()
|
|
r = self.client.post(url, {'apikey': badapikey.hash()})
|
|
self.assertContains(r, "Restricted to role: Robot", status_code=403)
|
|
|
|
r = self.client.post(url, {'apikey': apikey.hash()})
|
|
self.assertContains(r, "No filters provided", status_code=400)
|
|
|
|
# working case
|
|
r = self.client.post(url, {'apikey': apikey.hash(), 'email': robot.email().address, '_expand': 'user'})
|
|
self.assertEqual(r.status_code, 200)
|
|
jsondata = r.json()
|
|
data = jsondata['person.person'][str(robot.id)]
|
|
self.assertEqual(data['name'], robot.name)
|
|
self.assertEqual(data['ascii'], robot.ascii)
|
|
self.assertEqual(data['user']['email'], robot.user.email)
|
|
|
|
def test_api_new_meeting_registration(self):
|
|
meeting = MeetingFactory(type_id='ietf')
|
|
reg = {
|
|
'apikey': 'invalid',
|
|
'affiliation': "Alguma Corporação",
|
|
'country_code': 'PT',
|
|
'email': 'foo@example.pt',
|
|
'first_name': 'Foo',
|
|
'last_name': 'Bar',
|
|
'meeting': meeting.number,
|
|
'reg_type': 'hackathon',
|
|
'ticket_type': '',
|
|
'checkedin': 'False',
|
|
}
|
|
url = urlreverse('ietf.api.views.api_new_meeting_registration')
|
|
r = self.client.post(url, reg)
|
|
self.assertContains(r, 'Invalid apikey', status_code=403)
|
|
oidcp = PersonFactory(user__is_staff=True)
|
|
# Make sure 'oidcp' has an acceptable role
|
|
RoleFactory(name_id='robot', person=oidcp, email=oidcp.email(), group__acronym='secretariat')
|
|
key = PersonalApiKey.objects.create(person=oidcp, endpoint=url)
|
|
reg['apikey'] = key.hash()
|
|
#
|
|
# Test valid POST
|
|
# FIXME: sometimes, there seems to be something in the outbox?
|
|
old_len = len(outbox)
|
|
r = self.client.post(url, reg)
|
|
self.assertContains(r, "Accepted, New registration, Email sent", status_code=202)
|
|
#
|
|
# Check outgoing mail
|
|
self.assertEqual(len(outbox), old_len + 1)
|
|
body = get_payload_text(outbox[-1])
|
|
self.assertIn(reg['email'], outbox[-1]['To'] )
|
|
self.assertIn(reg['email'], body)
|
|
self.assertIn('account creation request', body)
|
|
#
|
|
# Check record
|
|
obj = MeetingRegistration.objects.get(email=reg['email'], meeting__number=reg['meeting'])
|
|
for key in ['affiliation', 'country_code', 'first_name', 'last_name', 'person', 'reg_type', 'ticket_type', 'checkedin']:
|
|
self.assertEqual(getattr(obj, key), False if key=='checkedin' else reg.get(key) , "Bad data for field '%s'" % key)
|
|
#
|
|
# Test with existing user
|
|
person = PersonFactory()
|
|
reg['email'] = person.email().address
|
|
reg['first_name'] = person.first_name()
|
|
reg['last_name'] = person.last_name()
|
|
#
|
|
r = self.client.post(url, reg)
|
|
self.assertContains(r, "Accepted, New registration", status_code=202)
|
|
#
|
|
# There should be no new outgoing mail
|
|
self.assertEqual(len(outbox), old_len + 1)
|
|
#
|
|
# Test multiple reg types
|
|
reg['reg_type'] = 'remote'
|
|
reg['ticket_type'] = 'full_week_pass'
|
|
r = self.client.post(url, reg)
|
|
self.assertContains(r, "Accepted, New registration", status_code=202)
|
|
objs = MeetingRegistration.objects.filter(email=reg['email'], meeting__number=reg['meeting'])
|
|
self.assertEqual(len(objs), 2)
|
|
self.assertEqual(objs.filter(reg_type='hackathon').count(), 1)
|
|
self.assertEqual(objs.filter(reg_type='remote', ticket_type='full_week_pass').count(), 1)
|
|
self.assertEqual(len(outbox), old_len + 1)
|
|
#
|
|
# Test incomplete POST
|
|
drop_fields = ['affiliation', 'first_name', 'reg_type']
|
|
for field in drop_fields:
|
|
del reg[field]
|
|
r = self.client.post(url, reg)
|
|
self.assertContains(r, 'Missing parameters:', status_code=400)
|
|
err, fields = r.content.decode().split(':', 1)
|
|
missing_fields = [f.strip() for f in fields.split(',')]
|
|
self.assertEqual(set(missing_fields), set(drop_fields))
|
|
|
|
def test_api_version(self):
|
|
DumpInfo.objects.create(date=timezone.datetime(2022,8,31,7,10,1,tzinfo=datetime.timezone.utc), host='testapi.example.com',tz='UTC')
|
|
url = urlreverse('ietf.api.views.version')
|
|
r = self.client.get(url)
|
|
data = r.json()
|
|
self.assertEqual(data['version'], ietf.__version__+ietf.__patch__)
|
|
self.assertEqual(data['dumptime'], "2022-08-31 07:10:01 +0000")
|
|
DumpInfo.objects.update(tz='PST8PDT')
|
|
r = self.client.get(url)
|
|
data = r.json()
|
|
self.assertEqual(data['dumptime'], "2022-08-31 07:10:01 -0700")
|
|
|
|
|
|
def test_api_appauth(self):
|
|
url = urlreverse('ietf.api.views.app_auth')
|
|
person = PersonFactory()
|
|
apikey = PersonalApiKey.objects.create(endpoint=url, person=person)
|
|
|
|
self.client.login(username=person.user.username,password=f'{person.user.username}+password')
|
|
self.client.logout()
|
|
|
|
# error cases
|
|
# missing apikey
|
|
r = self.client.post(url, {})
|
|
self.assertContains(r, 'Missing apikey parameter', status_code=400)
|
|
|
|
# invalid apikey
|
|
r = self.client.post(url, {'apikey': 'foobar'})
|
|
self.assertContains(r, 'Invalid apikey', status_code=403)
|
|
|
|
# working case
|
|
r = self.client.post(url, {'apikey': apikey.hash()})
|
|
self.assertEqual(r.status_code, 200)
|
|
jsondata = r.json()
|
|
self.assertEqual(jsondata['success'], True)
|
|
|
|
def test_api_get_session_matherials_no_agenda_meeting_url(self):
|
|
meeting = MeetingFactory(type_id='ietf')
|
|
session = SessionFactory(meeting=meeting)
|
|
url = urlreverse('ietf.meeting.views.api_get_session_materials', kwargs={'session_id': session.pk})
|
|
r = self.client.get(url)
|
|
self.assertEqual(r.status_code, 200)
|
|
|
|
|
|
|
|
class DirectAuthApiTests(TestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.valid_token = "nSZJDerbau6WZwbEAYuQ"
|
|
self.invalid_token = self.valid_token
|
|
while self.invalid_token == self.valid_token:
|
|
self.invalid_token = User.objects.make_random_password(20)
|
|
self.url = urlreverse("ietf.api.views.directauth")
|
|
self.valid_person = PersonFactory()
|
|
self.valid_password = self.valid_person.user.username+"+password"
|
|
self.invalid_password = self.valid_password
|
|
while self.invalid_password == self.valid_password:
|
|
self.invalid_password = User.objects.make_random_password(20)
|
|
|
|
self.valid_body_with_good_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password)
|
|
self.valid_body_with_bad_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.invalid_password)
|
|
self.valid_body_with_unknown_user = self.post_dict(authtoken=self.valid_token, username="notauser@nowhere.nada", password=self.valid_password)
|
|
|
|
def post_dict(self, authtoken, username, password):
|
|
data = dict()
|
|
if authtoken is not None:
|
|
data["authtoken"] = authtoken
|
|
if username is not None:
|
|
data["username"] = username
|
|
if password is not None:
|
|
data["password"] = password
|
|
return dict(data = json.dumps(data))
|
|
|
|
def response_data(self, response):
|
|
try:
|
|
data = json.loads(response.content)
|
|
except json.decoder.JSONDecodeError:
|
|
data = None
|
|
self.assertIsNotNone(data)
|
|
return data
|
|
|
|
def test_bad_methods(self):
|
|
for method in (self.client.get, self.client.put, self.client.head, self.client.delete, self.client.patch):
|
|
r = method(self.url)
|
|
self.assertEqual(r.status_code, 405)
|
|
|
|
def test_bad_post(self):
|
|
for bad in [
|
|
self.post_dict(authtoken=None, username=self.valid_person.user.username, password=self.valid_password),
|
|
self.post_dict(authtoken=self.valid_token, username=None, password=self.valid_password),
|
|
self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=None),
|
|
self.post_dict(authtoken=None, username=None, password=self.valid_password),
|
|
self.post_dict(authtoken=self.valid_token, username=None, password=None),
|
|
self.post_dict(authtoken=None, username=self.valid_person.user.username, password=None),
|
|
self.post_dict(authtoken=None, username=None, password=None),
|
|
]:
|
|
r = self.client.post(self.url, bad)
|
|
self.assertEqual(r.status_code, 200)
|
|
data = self.response_data(r)
|
|
self.assertEqual(data["result"], "failure")
|
|
self.assertEqual(data["reason"], "invalid post")
|
|
|
|
bad = dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password)
|
|
r = self.client.post(self.url, bad)
|
|
self.assertEqual(r.status_code, 200)
|
|
data = self.response_data(r)
|
|
self.assertEqual(data["result"], "failure")
|
|
self.assertEqual(data["reason"], "invalid post")
|
|
|
|
def test_notokenstore(self):
|
|
self.assertFalse(hasattr(settings, "APP_API_TOKENS"))
|
|
r = self.client.post(self.url,self.valid_body_with_good_password)
|
|
self.assertEqual(r.status_code, 200)
|
|
data = self.response_data(r)
|
|
self.assertEqual(data["result"], "failure")
|
|
self.assertEqual(data["reason"], "invalid authtoken")
|
|
|
|
@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
|
|
def test_bad_username(self):
|
|
r = self.client.post(self.url, self.valid_body_with_unknown_user)
|
|
self.assertEqual(r.status_code, 200)
|
|
data = self.response_data(r)
|
|
self.assertEqual(data["result"], "failure")
|
|
self.assertEqual(data["reason"], "authentication failed")
|
|
|
|
@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
|
|
def test_bad_password(self):
|
|
r = self.client.post(self.url, self.valid_body_with_bad_password)
|
|
self.assertEqual(r.status_code, 200)
|
|
data = self.response_data(r)
|
|
self.assertEqual(data["result"], "failure")
|
|
self.assertEqual(data["reason"], "authentication failed")
|
|
|
|
@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
|
|
def test_good_password(self):
|
|
r = self.client.post(self.url, self.valid_body_with_good_password)
|
|
self.assertEqual(r.status_code, 200)
|
|
data = self.response_data(r)
|
|
self.assertEqual(data["result"], "success")
|
|
|
|
class TastypieApiTestCase(ResourceTestCaseMixin, TestCase):
|
|
def __init__(self, *args, **kwargs):
|
|
self.apps = {}
|
|
for app_name in settings.INSTALLED_APPS:
|
|
if app_name.startswith('ietf') and not app_name in OMITTED_APPS:
|
|
app = import_module(app_name)
|
|
name = app_name.split('.',1)[-1]
|
|
models_path = os.path.join(os.path.dirname(app.__file__), "models.py")
|
|
if os.path.exists(models_path):
|
|
self.apps[name] = app_name
|
|
super(TastypieApiTestCase, self).__init__(*args, **kwargs)
|
|
|
|
def test_api_top_level(self):
|
|
client = Client(Accept='application/json')
|
|
r = client.get("/api/v1/")
|
|
self.assertValidJSONResponse(r)
|
|
resource_list = r.json()
|
|
|
|
for name in self.apps:
|
|
if not name in self.apps:
|
|
sys.stderr.write("Expected a REST API resource for %s, but didn't find one\n" % name)
|
|
|
|
for name in self.apps:
|
|
self.assertIn(name, resource_list,
|
|
"Expected a REST API resource for %s, but didn't find one" % name)
|
|
|
|
def test_all_model_resources_exist(self):
|
|
client = Client(Accept='application/json')
|
|
r = client.get("/api/v1")
|
|
top = r.json()
|
|
for name in self.apps:
|
|
app_name = self.apps[name]
|
|
app = import_module(app_name)
|
|
self.assertEqual("/api/v1/%s/"%name, top[name]["list_endpoint"])
|
|
r = client.get(top[name]["list_endpoint"])
|
|
self.assertValidJSONResponse(r)
|
|
app_resources = r.json()
|
|
#
|
|
model_list = apps.get_app_config(name).get_models()
|
|
for model in model_list:
|
|
if not model._meta.model_name in list(app_resources.keys()):
|
|
#print("There doesn't seem to be any resource for model %s.models.%s"%(app.__name__,model.__name__,))
|
|
self.assertIn(model._meta.model_name, list(app_resources.keys()),
|
|
"There doesn't seem to be any API resource for model %s.models.%s"%(app.__name__,model.__name__,))
|
|
|
|
|
|
class RfcdiffSupportTests(TestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.target_view = 'ietf.api.views.rfcdiff_latest_json'
|
|
self._last_rfc_num = 8000
|
|
|
|
def getJson(self, view_args):
|
|
url = urlreverse(self.target_view, kwargs=view_args)
|
|
r = self.client.get(url)
|
|
self.assertEqual(r.status_code, 200)
|
|
return r.json()
|
|
|
|
def next_rfc_number(self):
|
|
self._last_rfc_num += 1
|
|
return self._last_rfc_num
|
|
|
|
def do_draft_test(self, name):
|
|
draft = IndividualDraftFactory(name=name, rev='00', create_revisions=range(0,13))
|
|
draft = reload_db_objects(draft)
|
|
prev_draft_rev = f'{(int(draft.rev)-1):02d}'
|
|
|
|
received = self.getJson(dict(name=draft.name))
|
|
self.assertEqual(
|
|
received,
|
|
dict(
|
|
name=draft.name,
|
|
rev=draft.rev,
|
|
content_url=draft.get_href(),
|
|
previous=f'{draft.name}-{prev_draft_rev}',
|
|
previous_url= draft.history_set.get(rev=prev_draft_rev).get_href(),
|
|
),
|
|
'Incorrect JSON when draft revision not specified',
|
|
)
|
|
|
|
received = self.getJson(dict(name=draft.name, rev=draft.rev))
|
|
self.assertEqual(
|
|
received,
|
|
dict(
|
|
name=draft.name,
|
|
rev=draft.rev,
|
|
content_url=draft.get_href(),
|
|
previous=f'{draft.name}-{prev_draft_rev}',
|
|
previous_url= draft.history_set.get(rev=prev_draft_rev).get_href(),
|
|
),
|
|
'Incorrect JSON when latest revision specified',
|
|
)
|
|
|
|
received = self.getJson(dict(name=draft.name, rev='10'))
|
|
prev_draft_rev = '09'
|
|
self.assertEqual(
|
|
received,
|
|
dict(
|
|
name=draft.name,
|
|
rev='10',
|
|
content_url=draft.history_set.get(rev='10').get_href(),
|
|
previous=f'{draft.name}-{prev_draft_rev}',
|
|
previous_url= draft.history_set.get(rev=prev_draft_rev).get_href(),
|
|
),
|
|
'Incorrect JSON when historical revision specified',
|
|
)
|
|
|
|
received = self.getJson(dict(name=draft.name, rev='00'))
|
|
self.assertNotIn('previous', received, 'Rev 00 has no previous name when not replacing a draft')
|
|
|
|
replaced = IndividualDraftFactory()
|
|
RelatedDocument.objects.create(relationship_id='replaces',source=draft,target=replaced.docalias.first())
|
|
received = self.getJson(dict(name=draft.name, rev='00'))
|
|
self.assertEqual(received['previous'], f'{replaced.name}-{replaced.rev}',
|
|
'Rev 00 has a previous name when replacing a draft')
|
|
|
|
def test_draft(self):
|
|
# test with typical, straightforward names
|
|
self.do_draft_test(name='draft-somebody-did-a-thing')
|
|
# try with different potentially problematic names
|
|
self.do_draft_test(name='draft-someone-did-something-01-02')
|
|
self.do_draft_test(name='draft-someone-did-something-else-02')
|
|
self.do_draft_test(name='draft-someone-did-something-02-weird-01')
|
|
|
|
def do_draft_with_broken_history_test(self, name):
|
|
draft = IndividualDraftFactory(name=name, rev='10')
|
|
received = self.getJson(dict(name=draft.name,rev='09'))
|
|
self.assertEqual(received['rev'],'09')
|
|
self.assertEqual(received['previous'], f'{draft.name}-08')
|
|
self.assertTrue('warning' in received)
|
|
|
|
def test_draft_with_broken_history(self):
|
|
# test with typical, straightforward names
|
|
self.do_draft_with_broken_history_test(name='draft-somebody-did-something')
|
|
# try with different potentially problematic names
|
|
self.do_draft_with_broken_history_test(name='draft-someone-did-something-01-02')
|
|
self.do_draft_with_broken_history_test(name='draft-someone-did-something-else-02')
|
|
self.do_draft_with_broken_history_test(name='draft-someone-did-something-02-weird-03')
|
|
|
|
def do_rfc_test(self, draft_name):
|
|
draft = WgDraftFactory(name=draft_name, create_revisions=range(0,2))
|
|
draft.docalias.create(name=f'rfc{self.next_rfc_number():04}')
|
|
draft.set_state(State.objects.get(type_id='draft',slug='rfc'))
|
|
draft.set_state(State.objects.get(type_id='draft-iesg', slug='pub'))
|
|
draft = reload_db_objects(draft)
|
|
rfc = draft
|
|
|
|
number = rfc.rfc_number()
|
|
received = self.getJson(dict(name=number))
|
|
self.assertEqual(
|
|
received,
|
|
dict(
|
|
content_url=rfc.get_href(),
|
|
name=rfc.canonical_name(),
|
|
previous=f'{draft.name}-{draft.rev}',
|
|
previous_url= draft.history_set.get(rev=draft.rev).get_href(),
|
|
),
|
|
'Can look up an RFC by number',
|
|
)
|
|
|
|
num_received = received
|
|
received = self.getJson(dict(name=rfc.canonical_name()))
|
|
self.assertEqual(num_received, received, 'RFC by canonical name gives same result as by number')
|
|
|
|
received = self.getJson(dict(name=f'RfC {number}'))
|
|
self.assertEqual(num_received, received, 'RFC with unusual spacing/caps gives same result as by number')
|
|
|
|
received = self.getJson(dict(name=draft.name))
|
|
self.assertEqual(num_received, received, 'RFC by draft name and no rev gives same result as by number')
|
|
|
|
received = self.getJson(dict(name=draft.name, rev='01'))
|
|
prev_draft_rev = '00'
|
|
self.assertEqual(
|
|
received,
|
|
dict(
|
|
content_url=draft.history_set.get(rev='01').get_href(),
|
|
name=draft.name,
|
|
rev='01',
|
|
previous=f'{draft.name}-{prev_draft_rev}',
|
|
previous_url= draft.history_set.get(rev=prev_draft_rev).get_href(),
|
|
),
|
|
'RFC by draft name with rev should give draft name, not canonical name'
|
|
)
|
|
|
|
def test_rfc(self):
|
|
# simple draft name
|
|
self.do_rfc_test(draft_name='draft-test-ar-ef-see')
|
|
# tricky draft names
|
|
self.do_rfc_test(draft_name='draft-whatever-02')
|
|
self.do_rfc_test(draft_name='draft-test-me-03-04')
|
|
|
|
def test_rfc_with_tombstone(self):
|
|
draft = WgDraftFactory(create_revisions=range(0,2))
|
|
draft.docalias.create(name='rfc3261') # See views_doc.HAS_TOMBSTONE
|
|
draft.set_state(State.objects.get(type_id='draft',slug='rfc'))
|
|
draft.set_state(State.objects.get(type_id='draft-iesg', slug='pub'))
|
|
draft = reload_db_objects(draft)
|
|
rfc = draft
|
|
|
|
# Some old rfcs had tombstones that shouldn't be used for comparisons
|
|
received = self.getJson(dict(name=rfc.canonical_name()))
|
|
self.assertTrue(received['previous'].endswith('00'))
|
|
|
|
def do_rfc_with_broken_history_test(self, draft_name):
|
|
draft = WgDraftFactory(rev='10', name=draft_name)
|
|
draft.docalias.create(name=f'rfc{self.next_rfc_number():04}')
|
|
draft.set_state(State.objects.get(type_id='draft',slug='rfc'))
|
|
draft.set_state(State.objects.get(type_id='draft-iesg', slug='pub'))
|
|
draft = reload_db_objects(draft)
|
|
rfc = draft
|
|
|
|
received = self.getJson(dict(name=draft.name))
|
|
self.assertEqual(
|
|
received,
|
|
dict(
|
|
content_url=rfc.get_href(),
|
|
name=rfc.canonical_name(),
|
|
previous=f'{draft.name}-10',
|
|
previous_url= f'{settings.IETF_ID_ARCHIVE_URL}{draft.name}-10.txt',
|
|
),
|
|
'RFC by draft name without rev should return canonical RFC name and no rev',
|
|
)
|
|
|
|
received = self.getJson(dict(name=draft.name, rev='10'))
|
|
self.assertEqual(received['name'], draft.name, 'RFC by draft name with rev should return draft name')
|
|
self.assertEqual(received['rev'], '10', 'Requested rev should be returned')
|
|
self.assertEqual(received['previous'], f'{draft.name}-09', 'Previous rev is one less than requested')
|
|
self.assertIn(f'{draft.name}-10', received['content_url'], 'Returned URL should include requested rev')
|
|
self.assertNotIn('warning', received, 'No warning when we have the rev requested')
|
|
|
|
received = self.getJson(dict(name=f'{draft.name}-09'))
|
|
self.assertEqual(received['name'], draft.name, 'RFC by draft name with rev should return draft name')
|
|
self.assertEqual(received['rev'], '09', 'Requested rev should be returned')
|
|
self.assertEqual(received['previous'], f'{draft.name}-08', 'Previous rev is one less than requested')
|
|
self.assertIn(f'{draft.name}-09', received['content_url'], 'Returned URL should include requested rev')
|
|
self.assertEqual(
|
|
received['warning'],
|
|
'History for this version not found - these results are speculation',
|
|
'Warning should be issued when requested rev is not found'
|
|
)
|
|
|
|
def test_rfc_with_broken_history(self):
|
|
# simple draft name
|
|
self.do_rfc_with_broken_history_test(draft_name='draft-some-draft')
|
|
# tricky draft names
|
|
self.do_rfc_with_broken_history_test(draft_name='draft-gizmo-01')
|
|
self.do_rfc_with_broken_history_test(draft_name='draft-oh-boy-what-a-draft-02-03')
|