# Copyright The IETF Trust 2015-2020, All Rights Reserved # -*- coding: utf-8 -*- import datetime import json import html import os import sys from importlib import import_module from mock import patch from django.apps import apps from django.conf import settings from django.test import Client from django.urls import reverse as urlreverse from django.utils import timezone from tastypie.test import ResourceTestCaseMixin import debug # pyflakes:ignore import ietf from ietf.group.factories import RoleFactory from ietf.meeting.factories import MeetingFactory, SessionFactory from ietf.meeting.test_data import make_meeting_test_data from ietf.meeting.models import Session from ietf.person.factories import PersonFactory, random_faker from ietf.person.models import User from ietf.person.models import PersonalApiKey from ietf.stats.models import MeetingRegistration from ietf.utils.mail import outbox, get_payload_text from ietf.utils.test_utils import TestCase, login_testing_unauthorized OMITTED_APPS = ( 'ietf.secr.meetings', 'ietf.secr.proceedings', 'ietf.ipr', ) class CustomApiTests(TestCase): settings_temp_path_overrides = TestCase.settings_temp_path_overrides + ['AGENDA_PATH'] # Using mock to patch the import functions in ietf.meeting.views, where # api_import_recordings() are using them: @patch('ietf.meeting.views.import_audio_files') def test_notify_meeting_import_audio_files(self, mock_import_audio): meeting = make_meeting_test_data() client = Client(Accept='application/json') # try invalid method GET url = urlreverse('ietf.meeting.views.api_import_recordings', kwargs={'number':meeting.number}) r = client.get(url) self.assertEqual(r.status_code, 405) # try valid method POST r = client.post(url) self.assertEqual(r.status_code, 201) def test_api_help_page(self): url = urlreverse('ietf.api.views.api_help') r = self.client.get(url) self.assertContains(r, 'The datatracker API', status_code=200) def test_api_openid_issuer(self): url = urlreverse('ietf.api.urls.oidc_issuer') r = self.client.get(url) self.assertContains(r, 'OpenID Connect Issuer', status_code=200) def test_api_set_session_video_url(self): url = urlreverse('ietf.meeting.views.api_set_session_video_url') recmanrole = RoleFactory(group__type_id='ietf', name_id='recman') recman = recmanrole.person meeting = MeetingFactory(type_id='ietf') session = SessionFactory(group__type_id='wg', meeting=meeting) group = session.group apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) video = 'https://foo.example.com/bar/beer/' # error cases r = self.client.post(url, {}) self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id='ietf', name_id='ad') badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {'apikey': badapikey.hash()} ) self.assertContains(r, "Restricted to role: Recording Manager", status_code=403) r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Too long since last regular login", status_code=400) recman.user.last_login = timezone.now() recman.user.save() r = self.client.get(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Method not allowed", status_code=405) r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Missing meeting parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, } ) self.assertContains(r, "Missing group parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym} ) self.assertContains(r, "Missing item parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1'} ) self.assertContains(r, "Missing url parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': '1', 'group': group.acronym, 'item': '1', 'url': video, }) self.assertContains(r, "No sessions found for meeting", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': 'bogous', 'item': '1', 'url': video, }) self.assertContains(r, "No sessions found in meeting '%s' for group 'bogous'"%meeting.number, status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'url': "foobar", }) self.assertContains(r, "Invalid url value: 'foobar'", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '5', 'url': video, }) self.assertContains(r, "No item '5' found in list of sessions for group", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': 'foo', 'url': video, }) self.assertContains(r, "Expected a numeric value for 'item', found 'foo'", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'url': video+'/rum', }) self.assertContains(r, "Done", status_code=200) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'url': video+'/rum', }) self.assertContains(r, "URL is the same", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'url': video, }) self.assertContains(r, "Done", status_code=200) recordings = session.recordings() self.assertEqual(len(recordings), 1) doc = recordings[0] self.assertEqual(doc.external_url, video) event = doc.latest_event() self.assertEqual(event.by, recman) def test_api_add_session_attendees(self): url = urlreverse('ietf.meeting.views.api_add_session_attendees') otherperson = PersonFactory() recmanrole = RoleFactory(group__type_id='ietf', name_id='recman') recman = recmanrole.person meeting = MeetingFactory(type_id='ietf') session = SessionFactory(group__type_id='wg', meeting=meeting) apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) badrole = RoleFactory(group__type_id='ietf', name_id='ad') badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() # Improper credentials, or method r = self.client.post(url, {}) self.assertContains(r, "Missing apikey parameter", status_code=400) r = self.client.post(url, {'apikey': badapikey.hash()} ) self.assertContains(r, "Restricted to role: Recording Manager", status_code=403) r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Too long since last regular login", status_code=400) recman.user.last_login = timezone.now()-datetime.timedelta(days=365) recman.user.save() r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Too long since last regular login", status_code=400) recman.user.last_login = timezone.now() recman.user.save() r = self.client.get(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Method not allowed", status_code=405) recman.user.last_login = timezone.now() recman.user.save() # Malformed requests r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Missing attended parameter", status_code=400) for baddict in ( '{}', '{"bogons;drop table":"bogons;drop table"}', '{"session_id":"Not an integer;drop table"}', f'{{"session_id":{session.pk},"attendees":"not a list;drop table"}}', f'{{"session_id":{session.pk},"attendees":"not a list;drop table"}}', f'{{"session_id":{session.pk},"attendees":[1,2,"not an int;drop table",4]}}', ): r = self.client.post(url, {'apikey': apikey.hash(), 'attended': baddict}) self.assertContains(r, "Malformed post", status_code=400) bad_session_id = Session.objects.order_by('-pk').first().pk + 1 r = self.client.post(url, {'apikey': apikey.hash(), 'attended': f'{{"session_id":{bad_session_id},"attendees":[]}}'}) self.assertContains(r, "Invalid session", status_code=400) bad_user_id = User.objects.order_by('-pk').first().pk + 1 r = self.client.post(url, {'apikey': apikey.hash(), 'attended': f'{{"session_id":{session.pk},"attendees":[{bad_user_id}]}}'}) self.assertContains(r, "Invalid attendee", status_code=400) # Reasonable request r = self.client.post(url, {'apikey':apikey.hash(), 'attended': f'{{"session_id":{session.pk},"attendees":[{recman.user.pk}, {otherperson.user.pk}]}}'}) self.assertEqual(session.attended_set.count(),2) self.assertTrue(session.attended_set.filter(person=recman).exists()) self.assertTrue(session.attended_set.filter(person=otherperson).exists()) def test_api_upload_bluesheet(self): url = urlreverse('ietf.meeting.views.api_upload_bluesheet') recmanrole = RoleFactory(group__type_id='ietf', name_id='recman') recman = recmanrole.person meeting = MeetingFactory(type_id='ietf') session = SessionFactory(group__type_id='wg', meeting=meeting) group = session.group apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) people = [ {"name":"Andrea Andreotti", "affiliation": "Azienda"}, {"name":"Bosse Bernadotte", "affiliation": "Bolag"}, {"name":"Charles Charlemagne", "affiliation": "Compagnie"}, ] for i in range(3): faker = random_faker() people.append(dict(name=faker.name(), affiliation=faker.company())) bluesheet = json.dumps(people) # error cases r = self.client.post(url, {}) self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id='ietf', name_id='ad') badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {'apikey': badapikey.hash()} ) self.assertContains(r, "Restricted to roles: Recording Manager, Secretariat", status_code=403) r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Too long since last regular login", status_code=400) recman.user.last_login = timezone.now() recman.user.save() r = self.client.get(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Method not allowed", status_code=405) r = self.client.post(url, {'apikey': apikey.hash()} ) self.assertContains(r, "Missing meeting parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, } ) self.assertContains(r, "Missing group parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym} ) self.assertContains(r, "Missing item parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1'} ) self.assertContains(r, "Missing bluesheet parameter", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': '1', 'group': group.acronym, 'item': '1', 'bluesheet': bluesheet, }) self.assertContains(r, "No sessions found for meeting", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': 'bogous', 'item': '1', 'bluesheet': bluesheet, }) self.assertContains(r, "No sessions found in meeting '%s' for group 'bogous'"%meeting.number, status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'bluesheet': "foobar", }) self.assertContains(r, "Invalid json value: 'foobar'", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '5', 'bluesheet': bluesheet, }) self.assertContains(r, "No item '5' found in list of sessions for group", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': 'foo', 'bluesheet': bluesheet, }) self.assertContains(r, "Expected a numeric value for 'item', found 'foo'", status_code=400) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'bluesheet': bluesheet, }) self.assertContains(r, "Done", status_code=200) # Submit again, with slightly different content, as an updated version people[1]['affiliation'] = 'Bolaget AB' bluesheet = json.dumps(people) r = self.client.post(url, {'apikey': apikey.hash(), 'meeting': meeting.number, 'group': group.acronym, 'item': '1', 'bluesheet': bluesheet, }) self.assertContains(r, "Done", status_code=200) bluesheet = session.sessionpresentation_set.filter(document__type__slug='bluesheets').first().document # We've submitted an update; check that the rev is right self.assertEqual(bluesheet.rev, '01') # Check the content with open(bluesheet.get_file_name()) as file: text = file.read() for p in people: self.assertIn(p['name'], html.unescape(text)) self.assertIn(p['affiliation'], html.unescape(text)) def test_person_export(self): person = PersonFactory() url = urlreverse('ietf.api.views.PersonalInformationExportView') login_testing_unauthorized(self, person.user.username, url) r = self.client.get(url) jsondata = r.json() data = jsondata['person.person'][str(person.id)] self.assertEqual(data['name'], person.name) self.assertEqual(data['ascii'], person.ascii) self.assertEqual(data['user']['email'], person.user.email) def test_api_v2_person_export_view(self): url = urlreverse('ietf.api.views.ApiV2PersonExportView') robot = PersonFactory(user__is_staff=True) RoleFactory(name_id='robot', person=robot, email=robot.email(), group__acronym='secretariat') apikey = PersonalApiKey.objects.create(endpoint=url, person=robot) # error cases r = self.client.post(url, {}) self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id='ietf', name_id='ad') badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {'apikey': badapikey.hash()}) self.assertContains(r, "Restricted to role: Robot", status_code=403) r = self.client.post(url, {'apikey': apikey.hash()}) self.assertContains(r, "No filters provided", status_code=400) # working case r = self.client.post(url, {'apikey': apikey.hash(), 'email': robot.email().address, '_expand': 'user'}) self.assertEqual(r.status_code, 200) jsondata = r.json() data = jsondata['person.person'][str(robot.id)] self.assertEqual(data['name'], robot.name) self.assertEqual(data['ascii'], robot.ascii) self.assertEqual(data['user']['email'], robot.user.email) def test_api_new_meeting_registration(self): meeting = MeetingFactory(type_id='ietf') reg = { 'apikey': 'invalid', 'affiliation': "Alguma Corporação", 'country_code': 'PT', 'email': 'foo@example.pt', 'first_name': 'Foo', 'last_name': 'Bar', 'meeting': meeting.number, 'reg_type': 'hackathon', 'ticket_type': '', 'checkedin': 'False', } url = urlreverse('ietf.api.views.api_new_meeting_registration') r = self.client.post(url, reg) self.assertContains(r, 'Invalid apikey', status_code=403) oidcp = PersonFactory(user__is_staff=True) # Make sure 'oidcp' has an acceptable role RoleFactory(name_id='robot', person=oidcp, email=oidcp.email(), group__acronym='secretariat') key = PersonalApiKey.objects.create(person=oidcp, endpoint=url) reg['apikey'] = key.hash() # # Test valid POST # FIXME: sometimes, there seems to be something in the outbox? old_len = len(outbox) r = self.client.post(url, reg) self.assertContains(r, "Accepted, New registration, Email sent", status_code=202) # # Check outgoing mail self.assertEqual(len(outbox), old_len + 1) body = get_payload_text(outbox[-1]) self.assertIn(reg['email'], outbox[-1]['To'] ) self.assertIn(reg['email'], body) self.assertIn('account creation request', body) # # Check record obj = MeetingRegistration.objects.get(email=reg['email'], meeting__number=reg['meeting']) for key in ['affiliation', 'country_code', 'first_name', 'last_name', 'person', 'reg_type', 'ticket_type', 'checkedin']: self.assertEqual(getattr(obj, key), False if key=='checkedin' else reg.get(key) , "Bad data for field '%s'" % key) # # Test with existing user person = PersonFactory() reg['email'] = person.email().address reg['first_name'] = person.first_name() reg['last_name'] = person.last_name() # r = self.client.post(url, reg) self.assertContains(r, "Accepted, New registration", status_code=202) # # There should be no new outgoing mail self.assertEqual(len(outbox), old_len + 1) # # Test multiple reg types reg['reg_type'] = 'remote' reg['ticket_type'] = 'full_week_pass' r = self.client.post(url, reg) self.assertContains(r, "Accepted, New registration", status_code=202) objs = MeetingRegistration.objects.filter(email=reg['email'], meeting__number=reg['meeting']) self.assertEqual(len(objs), 2) self.assertEqual(objs.filter(reg_type='hackathon').count(), 1) self.assertEqual(objs.filter(reg_type='remote', ticket_type='full_week_pass').count(), 1) self.assertEqual(len(outbox), old_len + 1) # # Test incomplete POST drop_fields = ['affiliation', 'first_name', 'reg_type'] for field in drop_fields: del reg[field] r = self.client.post(url, reg) self.assertContains(r, 'Missing parameters:', status_code=400) err, fields = r.content.decode().split(':', 1) missing_fields = [f.strip() for f in fields.split(',')] self.assertEqual(set(missing_fields), set(drop_fields)) def test_api_version(self): url = urlreverse('ietf.api.views.version') r = self.client.get(url) data = r.json() self.assertEqual(data['version'], ietf.__version__+ietf.__patch__) def test_api_appauth(self): url = urlreverse('ietf.api.views.app_auth') person = PersonFactory() apikey = PersonalApiKey.objects.create(endpoint=url, person=person) self.client.login(username=person.user.username,password=f'{person.user.username}+password') self.client.logout() # error cases # missing apikey r = self.client.post(url, {}) self.assertContains(r, 'Missing apikey parameter', status_code=400) # invalid apikey r = self.client.post(url, {'apikey': 'foobar'}) self.assertContains(r, 'Invalid apikey', status_code=403) # working case r = self.client.post(url, {'apikey': apikey.hash()}) self.assertEqual(r.status_code, 200) jsondata = r.json() self.assertEqual(jsondata['success'], True) class TastypieApiTestCase(ResourceTestCaseMixin, TestCase): def __init__(self, *args, **kwargs): self.apps = {} for app_name in settings.INSTALLED_APPS: if app_name.startswith('ietf') and not app_name in OMITTED_APPS: app = import_module(app_name) name = app_name.split('.',1)[-1] models_path = os.path.join(os.path.dirname(app.__file__), "models.py") if os.path.exists(models_path): self.apps[name] = app_name super(TastypieApiTestCase, self).__init__(*args, **kwargs) def test_api_top_level(self): client = Client(Accept='application/json') r = client.get("/api/v1/") self.assertValidJSONResponse(r) resource_list = r.json() for name in self.apps: if not name in self.apps: sys.stderr.write("Expected a REST API resource for %s, but didn't find one\n" % name) for name in self.apps: self.assertIn(name, resource_list, "Expected a REST API resource for %s, but didn't find one" % name) def test_all_model_resources_exist(self): client = Client(Accept='application/json') r = client.get("/api/v1") top = r.json() for name in self.apps: app_name = self.apps[name] app = import_module(app_name) self.assertEqual("/api/v1/%s/"%name, top[name]["list_endpoint"]) r = client.get(top[name]["list_endpoint"]) self.assertValidJSONResponse(r) app_resources = r.json() # model_list = apps.get_app_config(name).get_models() for model in model_list: if not model._meta.model_name in list(app_resources.keys()): #print("There doesn't seem to be any resource for model %s.models.%s"%(app.__name__,model.__name__,)) self.assertIn(model._meta.model_name, list(app_resources.keys()), "There doesn't seem to be any API resource for model %s.models.%s"%(app.__name__,model.__name__,))