feat: endpoint for imapd to authenticate against (#5295)
* feat: endpoint for imapd to authenticate against * chore: remove unintended whitespace * fix: be stricter in matching User
This commit is contained in:
parent
f810eda851
commit
2fe4647832
15
ietf/api/ietf_utils.py
Normal file
15
ietf/api/ietf_utils.py
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
# Copyright The IETF Trust 2023, All Rights Reserved
|
||||||
|
|
||||||
|
# This is not utils.py because Tastypie implicitly consumes ietf.api.utils.
|
||||||
|
# See ietf.api.__init__.py for details.
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
|
def is_valid_token(endpoint, token):
|
||||||
|
# This is where we would consider integration with vault
|
||||||
|
# Settings implementation for now.
|
||||||
|
if hasattr(settings, "APP_API_TOKENS"):
|
||||||
|
token_store = settings.APP_API_TOKENS
|
||||||
|
if endpoint in token_store and token in token_store[endpoint]:
|
||||||
|
return True
|
||||||
|
return False
|
|
@ -13,6 +13,7 @@ from pathlib import Path
|
||||||
from django.apps import apps
|
from django.apps import apps
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.test import Client
|
from django.test import Client
|
||||||
|
from django.test.utils import override_settings
|
||||||
from django.urls import reverse as urlreverse
|
from django.urls import reverse as urlreverse
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
|
@ -530,6 +531,101 @@ class CustomApiTests(TestCase):
|
||||||
jsondata = r.json()
|
jsondata = r.json()
|
||||||
self.assertEqual(jsondata['success'], True)
|
self.assertEqual(jsondata['success'], True)
|
||||||
|
|
||||||
|
class DirectAuthApiTests(TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.valid_token = "nSZJDerbau6WZwbEAYuQ"
|
||||||
|
self.invalid_token = self.valid_token
|
||||||
|
while self.invalid_token == self.valid_token:
|
||||||
|
self.invalid_token = User.objects.make_random_password(20)
|
||||||
|
self.url = urlreverse("ietf.api.views.directauth")
|
||||||
|
self.valid_person = PersonFactory()
|
||||||
|
self.valid_password = self.valid_person.user.username+"+password"
|
||||||
|
self.invalid_password = self.valid_password
|
||||||
|
while self.invalid_password == self.valid_password:
|
||||||
|
self.invalid_password = User.objects.make_random_password(20)
|
||||||
|
|
||||||
|
self.valid_body_with_good_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password)
|
||||||
|
self.valid_body_with_bad_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.invalid_password)
|
||||||
|
self.valid_body_with_unknown_user = self.post_dict(authtoken=self.valid_token, username="notauser@nowhere.nada", password=self.valid_password)
|
||||||
|
|
||||||
|
def post_dict(self, authtoken, username, password):
|
||||||
|
data = dict()
|
||||||
|
if authtoken is not None:
|
||||||
|
data["authtoken"] = authtoken
|
||||||
|
if username is not None:
|
||||||
|
data["username"] = username
|
||||||
|
if password is not None:
|
||||||
|
data["password"] = password
|
||||||
|
return dict(data = json.dumps(data))
|
||||||
|
|
||||||
|
def response_data(self, response):
|
||||||
|
try:
|
||||||
|
data = json.loads(response.content)
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
data = None
|
||||||
|
self.assertIsNotNone(data)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def test_bad_methods(self):
|
||||||
|
for method in (self.client.get, self.client.put, self.client.head, self.client.delete, self.client.patch):
|
||||||
|
r = method(self.url)
|
||||||
|
self.assertEqual(r.status_code, 405)
|
||||||
|
|
||||||
|
def test_bad_post(self):
|
||||||
|
for bad in [
|
||||||
|
self.post_dict(authtoken=None, username=self.valid_person.user.username, password=self.valid_password),
|
||||||
|
self.post_dict(authtoken=self.valid_token, username=None, password=self.valid_password),
|
||||||
|
self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=None),
|
||||||
|
self.post_dict(authtoken=None, username=None, password=self.valid_password),
|
||||||
|
self.post_dict(authtoken=self.valid_token, username=None, password=None),
|
||||||
|
self.post_dict(authtoken=None, username=self.valid_person.user.username, password=None),
|
||||||
|
self.post_dict(authtoken=None, username=None, password=None),
|
||||||
|
]:
|
||||||
|
r = self.client.post(self.url, bad)
|
||||||
|
self.assertEqual(r.status_code, 200)
|
||||||
|
data = self.response_data(r)
|
||||||
|
self.assertEqual(data["result"], "failure")
|
||||||
|
self.assertEqual(data["reason"], "invalid post")
|
||||||
|
|
||||||
|
bad = dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password)
|
||||||
|
r = self.client.post(self.url, bad)
|
||||||
|
self.assertEqual(r.status_code, 200)
|
||||||
|
data = self.response_data(r)
|
||||||
|
self.assertEqual(data["result"], "failure")
|
||||||
|
self.assertEqual(data["reason"], "invalid post")
|
||||||
|
|
||||||
|
def test_notokenstore(self):
|
||||||
|
self.assertFalse(hasattr(settings, "APP_API_TOKENS"))
|
||||||
|
r = self.client.post(self.url,self.valid_body_with_good_password)
|
||||||
|
self.assertEqual(r.status_code, 200)
|
||||||
|
data = self.response_data(r)
|
||||||
|
self.assertEqual(data["result"], "failure")
|
||||||
|
self.assertEqual(data["reason"], "invalid authtoken")
|
||||||
|
|
||||||
|
@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
|
||||||
|
def test_bad_username(self):
|
||||||
|
r = self.client.post(self.url, self.valid_body_with_unknown_user)
|
||||||
|
self.assertEqual(r.status_code, 200)
|
||||||
|
data = self.response_data(r)
|
||||||
|
self.assertEqual(data["result"], "failure")
|
||||||
|
self.assertEqual(data["reason"], "authentication failed")
|
||||||
|
|
||||||
|
@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
|
||||||
|
def test_bad_password(self):
|
||||||
|
r = self.client.post(self.url, self.valid_body_with_bad_password)
|
||||||
|
self.assertEqual(r.status_code, 200)
|
||||||
|
data = self.response_data(r)
|
||||||
|
self.assertEqual(data["result"], "failure")
|
||||||
|
self.assertEqual(data["reason"], "authentication failed")
|
||||||
|
|
||||||
|
@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
|
||||||
|
def test_good_password(self):
|
||||||
|
r = self.client.post(self.url, self.valid_body_with_good_password)
|
||||||
|
self.assertEqual(r.status_code, 200)
|
||||||
|
data = self.response_data(r)
|
||||||
|
self.assertEqual(data["result"], "success")
|
||||||
|
|
||||||
class TastypieApiTestCase(ResourceTestCaseMixin, TestCase):
|
class TastypieApiTestCase(ResourceTestCaseMixin, TestCase):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
|
|
@ -58,6 +58,8 @@ urlpatterns = [
|
||||||
# latest versions
|
# latest versions
|
||||||
url(r'^rfcdiff-latest-json/%(name)s(?:-%(rev)s)?(\.txt|\.html)?/?$' % settings.URL_REGEXPS, api_views.rfcdiff_latest_json),
|
url(r'^rfcdiff-latest-json/%(name)s(?:-%(rev)s)?(\.txt|\.html)?/?$' % settings.URL_REGEXPS, api_views.rfcdiff_latest_json),
|
||||||
url(r'^rfcdiff-latest-json/(?P<name>[Rr][Ff][Cc] [0-9]+?)(\.txt|\.html)?/?$', api_views.rfcdiff_latest_json),
|
url(r'^rfcdiff-latest-json/(?P<name>[Rr][Ff][Cc] [0-9]+?)(\.txt|\.html)?/?$', api_views.rfcdiff_latest_json),
|
||||||
|
# direct authentication
|
||||||
|
url(r'^directauth/?$', api_views.directauth),
|
||||||
]
|
]
|
||||||
|
|
||||||
# Additional (standard) Tastypie endpoints
|
# Additional (standard) Tastypie endpoints
|
||||||
|
|
|
@ -9,6 +9,7 @@ import re
|
||||||
from jwcrypto.jwk import JWK
|
from jwcrypto.jwk import JWK
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.contrib.auth import authenticate
|
||||||
from django.contrib.auth.decorators import login_required
|
from django.contrib.auth.decorators import login_required
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
|
@ -32,6 +33,7 @@ import ietf
|
||||||
from ietf.person.models import Person, Email
|
from ietf.person.models import Person, Email
|
||||||
from ietf.api import _api_list
|
from ietf.api import _api_list
|
||||||
from ietf.api.serializer import JsonExportMixin
|
from ietf.api.serializer import JsonExportMixin
|
||||||
|
from ietf.api.ietf_utils import is_valid_token
|
||||||
from ietf.doc.utils import fuzzy_find_documents
|
from ietf.doc.utils import fuzzy_find_documents
|
||||||
from ietf.ietfauth.views import send_account_creation_email
|
from ietf.ietfauth.views import send_account_creation_email
|
||||||
from ietf.ietfauth.utils import role_required
|
from ietf.ietfauth.utils import role_required
|
||||||
|
@ -388,3 +390,42 @@ def rfcdiff_latest_json(request, name, rev=None):
|
||||||
if not response:
|
if not response:
|
||||||
raise Http404
|
raise Http404
|
||||||
return HttpResponse(json.dumps(response), content_type='application/json')
|
return HttpResponse(json.dumps(response), content_type='application/json')
|
||||||
|
|
||||||
|
@csrf_exempt
|
||||||
|
def directauth(request):
|
||||||
|
if request.method == "POST":
|
||||||
|
raw_data = request.POST.get("data", None)
|
||||||
|
if raw_data:
|
||||||
|
try:
|
||||||
|
data = json.loads(raw_data)
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
data = None
|
||||||
|
|
||||||
|
if raw_data is None or data is None:
|
||||||
|
return HttpResponse(json.dumps(dict(result="failure",reason="invalid post")), content_type='application/json')
|
||||||
|
|
||||||
|
authtoken = data.get('authtoken', None)
|
||||||
|
username = data.get('username', None)
|
||||||
|
password = data.get('password', None)
|
||||||
|
|
||||||
|
if any([item is None for item in (authtoken, username, password)]):
|
||||||
|
return HttpResponse(json.dumps(dict(result="failure",reason="invalid post")), content_type='application/json')
|
||||||
|
|
||||||
|
if not is_valid_token("ietf.api.views.directauth", authtoken):
|
||||||
|
return HttpResponse(json.dumps(dict(result="failure",reason="invalid authtoken")), content_type='application/json')
|
||||||
|
|
||||||
|
user_query = User.objects.filter(username__iexact=username)
|
||||||
|
|
||||||
|
# Matching email would be consistent with auth everywhere else in the app, but until we can map users well
|
||||||
|
# in the imap server, people's annotations are associated with a very specific login.
|
||||||
|
# If we get a second user of this API, add an "allow_any_email" argument.
|
||||||
|
|
||||||
|
|
||||||
|
# Note well that we are using user.username, not what was passed to the API.
|
||||||
|
if user_query.count() == 1 and authenticate(username = user_query.first().username, password = password):
|
||||||
|
return HttpResponse(json.dumps(dict(result="success")), content_type='application/json')
|
||||||
|
|
||||||
|
return HttpResponse(json.dumps(dict(result="failure", reason="authentication failed")), content_type='application/json')
|
||||||
|
|
||||||
|
else:
|
||||||
|
return HttpResponse(status=405)
|
||||||
|
|
Loading…
Reference in a new issue