refactor: Implement require_api_key with functools.wraps
The @decorator mechanism does not seem to work with @method_decorator in Django 4.0, have not tracked down why.
This commit is contained in:
parent
223c679942
commit
8cf609bfa9
ietf/utils
|
@ -5,6 +5,7 @@
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from decorator import decorator, decorate
|
from decorator import decorator, decorate
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth import login
|
from django.contrib.auth import login
|
||||||
|
@ -39,52 +40,54 @@ def person_required(f, request, *args, **kwargs):
|
||||||
return render(request, 'registration/missing_person.html')
|
return render(request, 'registration/missing_person.html')
|
||||||
return f(request, *args, **kwargs)
|
return f(request, *args, **kwargs)
|
||||||
|
|
||||||
@decorator
|
|
||||||
def require_api_key(f, request, *args, **kwargs):
|
def require_api_key(f):
|
||||||
|
@wraps(f)
|
||||||
def err(code, text):
|
def _wrapper(request, *args, **kwargs):
|
||||||
return HttpResponse(text, status=code, content_type='text/plain')
|
def err(code, text):
|
||||||
# Check method and get hash
|
return HttpResponse(text, status=code, content_type='text/plain')
|
||||||
if request.method == 'POST':
|
# Check method and get hash
|
||||||
hash = request.POST.get('apikey')
|
if request.method == 'POST':
|
||||||
elif request.method == 'GET':
|
hash = request.POST.get('apikey')
|
||||||
hash = request.GET.get('apikey')
|
elif request.method == 'GET':
|
||||||
else:
|
hash = request.GET.get('apikey')
|
||||||
return err(405, "Method not allowed")
|
else:
|
||||||
if not hash:
|
return err(405, "Method not allowed")
|
||||||
return err(400, "Missing apikey parameter")
|
if not hash:
|
||||||
# Check hash
|
return err(400, "Missing apikey parameter")
|
||||||
key = PersonalApiKey.validate_key(force_bytes(hash))
|
# Check hash
|
||||||
if not key:
|
key = PersonalApiKey.validate_key(force_bytes(hash))
|
||||||
return err(403, "Invalid apikey")
|
if not key:
|
||||||
# Check endpoint
|
return err(403, "Invalid apikey")
|
||||||
urlpath = request.META.get('PATH_INFO')
|
# Check endpoint
|
||||||
if not (urlpath and urlpath == key.endpoint):
|
urlpath = request.META.get('PATH_INFO')
|
||||||
return err(400, "Apikey endpoint mismatch")
|
if not (urlpath and urlpath == key.endpoint):
|
||||||
# Check time since regular login
|
return err(400, "Apikey endpoint mismatch")
|
||||||
person = key.person
|
# Check time since regular login
|
||||||
last_login = person.user.last_login
|
person = key.person
|
||||||
if not person.user.is_staff:
|
last_login = person.user.last_login
|
||||||
time_limit = (timezone.now() - datetime.timedelta(days=settings.UTILS_APIKEY_GUI_LOGIN_LIMIT_DAYS))
|
if not person.user.is_staff:
|
||||||
if last_login == None or last_login < time_limit:
|
time_limit = (timezone.now() - datetime.timedelta(days=settings.UTILS_APIKEY_GUI_LOGIN_LIMIT_DAYS))
|
||||||
return err(400, "Too long since last regular login")
|
if last_login == None or last_login < time_limit:
|
||||||
# Log in
|
return err(400, "Too long since last regular login")
|
||||||
login(request, person.user)
|
# Log in
|
||||||
# restore the user.last_login field, so it reflects only gui logins
|
login(request, person.user)
|
||||||
person.user.last_login = last_login
|
# restore the user.last_login field, so it reflects only gui logins
|
||||||
person.user.save()
|
person.user.last_login = last_login
|
||||||
# Update stats
|
person.user.save()
|
||||||
key.count += 1
|
# Update stats
|
||||||
key.latest = timezone.now()
|
key.count += 1
|
||||||
key.save()
|
key.latest = timezone.now()
|
||||||
PersonApiKeyEvent.objects.create(person=person, type='apikey_login', key=key, desc="Logged in with key ID %s, endpoint %s" % (key.id, key.endpoint))
|
key.save()
|
||||||
# Execute decorated function
|
PersonApiKeyEvent.objects.create(person=person, type='apikey_login', key=key, desc="Logged in with key ID %s, endpoint %s" % (key.id, key.endpoint))
|
||||||
try:
|
# Execute decorated function
|
||||||
ret = f(request, *args, **kwargs)
|
try:
|
||||||
except AttributeError as e:
|
ret = f(request, *args, **kwargs)
|
||||||
log.log("Bad API call: args: %s, kwargs: %s, exception: %s" % (args, kwargs, e))
|
except AttributeError as e:
|
||||||
return err(400, "Bad or missing parameters")
|
log.log("Bad API call: args: %s, kwargs: %s, exception: %s" % (args, kwargs, e))
|
||||||
return ret
|
return err(400, "Bad or missing parameters")
|
||||||
|
return ret
|
||||||
|
return _wrapper
|
||||||
|
|
||||||
|
|
||||||
def _memoize(func, self, *args, **kwargs):
|
def _memoize(func, self, *args, **kwargs):
|
||||||
|
|
|
@ -11,7 +11,9 @@ from django.utils.encoding import force_str
|
||||||
from django.views.generic import View
|
from django.views.generic import View
|
||||||
|
|
||||||
def url(regex, view, kwargs=None, name=None):
|
def url(regex, view, kwargs=None, name=None):
|
||||||
if callable(view) and hasattr(view, '__name__'):
|
if hasattr(view, "view_class"):
|
||||||
|
view_name = "%s.%s" % (view.__module__, view.view_class.__name__)
|
||||||
|
elif callable(view) and hasattr(view, '__name__'):
|
||||||
view_name = "%s.%s" % (view.__module__, view.__name__)
|
view_name = "%s.%s" % (view.__module__, view.__name__)
|
||||||
else:
|
else:
|
||||||
view_name = regex
|
view_name = regex
|
||||||
|
|
Loading…
Reference in a new issue