Added an API endpoint to support automation of IESG ballot position posting, at /api/iesg/position. Added tests for the API endpoint, and updated the apikey validation decorator tests. Tweaked the decorator to handle a weakness found during testing.

- Legacy-Id: 14429
This commit is contained in:
Henrik Levkowetz 2017-12-17 21:55:27 +00:00
parent b0863c8963
commit a08c8dc76f
4 changed files with 166 additions and 76 deletions

View file

@ -12,7 +12,7 @@ from ietf.doc.factories import DocumentFactory
from ietf.group.models import Group, Role
from ietf.name.models import BallotPositionName
from ietf.iesg.models import TelechatDate
from ietf.person.models import Person
from ietf.person.models import Person, PersonalApiKey
from ietf.utils.test_utils import TestCase, unicontent
from ietf.utils.mail import outbox, empty_outbox
from ietf.utils.test_data import make_test_data
@ -82,6 +82,65 @@ class EditPositionTests(TestCase):
self.assertEqual(draft.docevent_set.count(), events_before + 2)
self.assertTrue("Ballot comment text updated" in pos.desc)
def test_api_set_position(self):
draft = make_test_data()
url = urlreverse('ietf.doc.views_ballot.api_set_position')
ad = Person.objects.get(name="Areað Irector")
ad.user.last_login = datetime.datetime.now()
ad.user.save()
apikey = PersonalApiKey.objects.create(endpoint=url, person=ad)
# vote
events_before = draft.docevent_set.count()
r = self.client.post(url, dict(
apikey=apikey.hash(),
doc=draft.name,
position="discuss",
discuss=" This is a discussion test. \n ",
comment=" This is a test. \n ")
)
self.assertEqual(r.status_code, 200)
pos = draft.latest_event(BallotPositionDocEvent, ad=ad)
self.assertEqual(pos.pos.slug, "discuss")
self.assertTrue(" This is a discussion test." in pos.discuss)
self.assertTrue(pos.discuss_time != None)
self.assertTrue(" This is a test." in pos.comment)
self.assertTrue(pos.comment_time != None)
self.assertTrue("New position" in pos.desc)
self.assertEqual(draft.docevent_set.count(), events_before + 3)
# recast vote
events_before = draft.docevent_set.count()
r = self.client.post(url, dict(apikey=apikey.hash(), doc=draft.name, position="noobj"))
self.assertEqual(r.status_code, 200)
pos = draft.latest_event(BallotPositionDocEvent, ad=ad)
self.assertEqual(pos.pos.slug, "noobj")
self.assertEqual(draft.docevent_set.count(), events_before + 1)
self.assertTrue("Position for" in pos.desc)
# clear vote
events_before = draft.docevent_set.count()
r = self.client.post(url, dict(apikey=apikey.hash(), doc=draft.name, position="norecord"))
self.assertEqual(r.status_code, 200)
pos = draft.latest_event(BallotPositionDocEvent, ad=ad)
self.assertEqual(pos.pos.slug, "norecord")
self.assertEqual(draft.docevent_set.count(), events_before + 1)
self.assertTrue("Position for" in pos.desc)
# change comment
events_before = draft.docevent_set.count()
r = self.client.post(url, dict(apikey=apikey.hash(), doc=draft.name, position="norecord", comment="New comment."))
self.assertEqual(r.status_code, 200)
pos = draft.latest_event(BallotPositionDocEvent, ad=ad)
self.assertEqual(pos.pos.slug, "norecord")
self.assertEqual(draft.docevent_set.count(), events_before + 2)
self.assertTrue("Ballot comment text updated" in pos.desc)
def test_edit_position_as_secretary(self):
draft = make_test_data()
url = urlreverse('ietf.doc.views_ballot.edit_position', kwargs=dict(name=draft.name,

View file

@ -7,6 +7,7 @@ from django import forms
from django.conf import settings
from django.http import HttpResponse, HttpResponseForbidden, HttpResponseRedirect, Http404
from django.shortcuts import render, get_object_or_404, redirect
from django.template.defaultfilters import striptags
from django.template.loader import render_to_string
from django.urls import reverse as urlreverse
from django.views.decorators.csrf import csrf_exempt
@ -108,6 +109,74 @@ class EditPositionForm(forms.Form):
raise forms.ValidationError("You must enter a non-empty discuss")
return entered_discuss
def save_position(form, doc, ballot, ad, login=None):
# save the vote
if login is None:
login = ad
clean = form.cleaned_data
old_pos = doc.latest_event(BallotPositionDocEvent, type="changed_ballot_position", ad=ad, ballot=ballot)
pos = BallotPositionDocEvent(doc=doc, rev=doc.rev, by=login)
pos.type = "changed_ballot_position"
pos.ballot = ballot
pos.ad = ad
pos.pos = clean["position"]
pos.comment = clean["comment"].rstrip()
pos.comment_time = old_pos.comment_time if old_pos else None
pos.discuss = clean["discuss"].rstrip()
if not pos.pos.blocking:
pos.discuss = ""
pos.discuss_time = old_pos.discuss_time if old_pos else None
changes = []
added_events = []
# possibly add discuss/comment comments to history trail
# so it's easy to see what's happened
old_comment = old_pos.comment if old_pos else ""
if pos.comment != old_comment:
pos.comment_time = pos.time
changes.append("comment")
if pos.comment:
e = DocEvent(doc=doc, rev=doc.rev)
e.by = ad # otherwise we can't see who's saying it
e.type = "added_comment"
e.desc = "[Ballot comment]\n" + pos.comment
added_events.append(e)
old_discuss = old_pos.discuss if old_pos else ""
if pos.discuss != old_discuss:
pos.discuss_time = pos.time
changes.append("discuss")
if pos.pos.blocking:
e = DocEvent(doc=doc, rev=doc.rev, by=login)
e.by = ad # otherwise we can't see who's saying it
e.type = "added_comment"
e.desc = "[Ballot %s]\n" % pos.pos.name.lower()
e.desc += pos.discuss
added_events.append(e)
# figure out a description
if not old_pos and pos.pos.slug != "norecord":
pos.desc = u"[Ballot Position Update] New position, %s, has been recorded for %s" % (pos.pos.name, pos.ad.plain_name())
elif old_pos and pos.pos != old_pos.pos:
pos.desc = "[Ballot Position Update] Position for %s has been changed to %s from %s" % (pos.ad.plain_name(), pos.pos.name, old_pos.pos.name)
if not pos.desc and changes:
pos.desc = u"Ballot %s text updated for %s" % (u" and ".join(changes), ad.plain_name())
# only add new event if we actually got a change
if pos.desc:
if login != ad:
pos.desc += u" by %s" % login.plain_name()
pos.save()
for e in added_events:
e.save() # save them after the position is saved to get later id for sorting order
@role_required('Area Director','Secretariat')
def edit_position(request, name, ballot_id):
"""Vote and edit discuss and comment on document as Area Director."""
@ -128,8 +197,6 @@ def edit_position(request, name, ballot_id):
raise Http404
ad = get_object_or_404(Person, pk=ad_id)
old_pos = doc.latest_event(BallotPositionDocEvent, type="changed_ballot_position", ad=ad, ballot=ballot)
if request.method == 'POST':
if not has_role(request.user, "Secretariat") and not ad.role_set.filter(name="ad", group__type="area", group__state="active"):
# prevent pre-ADs from voting
@ -137,68 +204,7 @@ def edit_position(request, name, ballot_id):
form = EditPositionForm(request.POST, ballot_type=ballot.ballot_type)
if form.is_valid():
# save the vote
clean = form.cleaned_data
pos = BallotPositionDocEvent(doc=doc, rev=doc.rev, by=login)
pos.type = "changed_ballot_position"
pos.ballot = ballot
pos.ad = ad
pos.pos = clean["position"]
pos.comment = clean["comment"].rstrip()
pos.comment_time = old_pos.comment_time if old_pos else None
pos.discuss = clean["discuss"].rstrip()
if not pos.pos.blocking:
pos.discuss = ""
pos.discuss_time = old_pos.discuss_time if old_pos else None
changes = []
added_events = []
# possibly add discuss/comment comments to history trail
# so it's easy to see what's happened
old_comment = old_pos.comment if old_pos else ""
if pos.comment != old_comment:
pos.comment_time = pos.time
changes.append("comment")
if pos.comment:
e = DocEvent(doc=doc, rev=doc.rev)
e.by = ad # otherwise we can't see who's saying it
e.type = "added_comment"
e.desc = "[Ballot comment]\n" + pos.comment
added_events.append(e)
old_discuss = old_pos.discuss if old_pos else ""
if pos.discuss != old_discuss:
pos.discuss_time = pos.time
changes.append("discuss")
if pos.pos.blocking:
e = DocEvent(doc=doc, rev=doc.rev, by=login)
e.by = ad # otherwise we can't see who's saying it
e.type = "added_comment"
e.desc = "[Ballot %s]\n" % pos.pos.name.lower()
e.desc += pos.discuss
added_events.append(e)
# figure out a description
if not old_pos and pos.pos.slug != "norecord":
pos.desc = u"[Ballot Position Update] New position, %s, has been recorded for %s" % (pos.pos.name, pos.ad.plain_name())
elif old_pos and pos.pos != old_pos.pos:
pos.desc = "[Ballot Position Update] Position for %s has been changed to %s from %s" % (pos.ad.plain_name(), pos.pos.name, old_pos.pos.name)
if not pos.desc and changes:
pos.desc = u"Ballot %s text updated for %s" % (u" and ".join(changes), ad.plain_name())
# only add new event if we actually got a change
if pos.desc:
if login != ad:
pos.desc += u" by %s" % login.plain_name()
pos.save()
for e in added_events:
e.save() # save them after the position is saved to get later id for sorting order
save_position(form, doc, ballot, ad, login)
if request.POST.get("send_mail"):
qstr=""
@ -213,6 +219,7 @@ def edit_position(request, name, ballot_id):
return HttpResponseRedirect(return_to_url)
else:
initial = {}
old_pos = doc.latest_event(BallotPositionDocEvent, type="changed_ballot_position", ad=ad, ballot=ballot)
if old_pos:
initial['position'] = old_pos.pos.slug
initial['discuss'] = old_pos.discuss
@ -240,10 +247,37 @@ def edit_position(request, name, ballot_id):
@role_required('Area Director', 'Secretariat')
@csrf_exempt
def api_set_position(request):
def err(code, text):
return HttpResponse(text, status=code, content_type='text/plain')
if request.method == 'POST':
pass
ad = request.user.person
name = request.POST.get('doc')
if not name:
return err(400, "Missing document name")
try:
doc = Document.objects.get(docalias__name=name)
except Document.DoesNotExist:
return err(404, "Document not found")
position_names = BallotPositionName.objects.values_list('slug', flat=True)
position = request.POST.get('position')
if not position:
return err(400, "Missing parameter: position, one of: %s " % ','.join(position_names))
if not position in position_names:
return err(400, "Bad position name, must be one of: %s " % ','.join(position_names))
ballot = doc.active_ballot()
if not ballot:
return err(404, "No open ballot found")
form = EditPositionForm(request.POST, ballot_type=ballot.ballot_type)
if form.is_valid():
save_position(form, doc, ballot, ad)
else:
debug.type('form.errors')
debug.show('form.errors')
errors = form.errors
summary = ','.join([ "%s: %s" % (f, striptags(errors[f])) for f in errors ])
return err(400, "Form not valid: %s" % summary)
else:
return HttpResponse("Method not allowed", status=405, content_type='text/plain')
return err(405, "Method not allowed")
return HttpResponse("Done", status=200, content_type='text/plain')

View file

@ -1,5 +1,6 @@
# Copyright The IETF Trust 2017, All Rights Reserved
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import os, shutil, time, datetime
from urlparse import urlsplit
@ -101,7 +102,7 @@ class IetfAuthTests(TestCase):
if l.startswith(username + ":"):
return True
with open(settings.HTPASSWD_FILE) as f:
print f.read()
print(f.read())
return False
@ -552,7 +553,7 @@ class IetfAuthTests(TestCase):
self.assertEqual(len(q('td code')), len(PERSON_API_KEY_ENDPOINTS)) # key hash
self.assertEqual(len(q('td a:contains("Disable")')), len(PERSON_API_KEY_ENDPOINTS)-1)
def test_apikey_usage(self):
def test_apikey_errors(self):
BAD_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
person = PersonFactory()
@ -571,10 +572,6 @@ class IetfAuthTests(TestCase):
for key in person.apikeys.all()[:3]:
url = key.endpoint
# successful access
r = self.client.post(url, {'apikey':key.hash(), 'dummy':'dummy',})
self.assertEqual(r.status_code, 200)
# bad method
r = self.client.put(url, {'apikey':key.hash()})
self.assertEqual(r.status_code, 405)
@ -638,7 +635,7 @@ class IetfAuthTests(TestCase):
self.assertEqual(len(outbox), len(PERSON_API_KEY_ENDPOINTS))
for mail in outbox:
body = mail.get_payload()
body = mail.get_payload(decode=True).decode('utf-8')
self.assertIn("API key usage", mail['subject'])
self.assertIn(" %s times" % count, body)
self.assertIn(date, body)

View file

@ -60,7 +60,7 @@ def require_api_key(f, request, *args, **kwargs):
person = key.person
last_login = person.user.last_login
time_limit = (datetime.datetime.now() - datetime.timedelta(days=settings.UTILS_APIKEY_GUI_LOGIN_LIMIT_DAYS))
if last_login < time_limit:
if last_login == None or last_login < time_limit:
return err(400, "Too long since last regular login")
# Log in
login(request, person.user)