import uuid
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.http import HttpResponseRedirect, HttpResponseNotFound
from django.shortcuts import render
from django.urls import reverse_lazy
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from rest_framework import status
from rest_framework.generics import GenericAPIView
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle, UserRateThrottle
from rest_framework_simplejwt.exceptions import InvalidToken
from jwt_allauth.app_settings import PasswordResetSerializer
from jwt_allauth.constants import (
PASS_RESET, PASSWORD_RESET_REDIRECT, FOR_USER,
ONE_TIME_PERMISSION, PASS_SET_ACCESS, PASS_RESET_ACCESS, PASS_RESET_COOKIE,
SET_PASSWORD_COOKIE,
MFA_TOKEN_MAX_AGE_SECONDS,
MFA_TOTP_DISABLED,
MFA_TOTP_REQUIRED,
)
from jwt_allauth.password_reset.permissions import ResetPasswordPermission, SetPasswordPermission
from jwt_allauth.password_reset.serializers import SetPasswordSerializer
from jwt_allauth.tokens.app_settings import RefreshToken
from jwt_allauth.tokens.models import GenericTokenModel, RefreshTokenWhitelistModel
from jwt_allauth.tokens.serializers import GenericTokenModelSerializer
from jwt_allauth.tokens.tokens import GenericToken
from jwt_allauth.utils import get_user_agent, sensitive_post_parameters_m, build_token_response
[docs]
def get_mfa_totp_mode() -> str:
"""
Return the current MFA TOTP mode from settings.
This must be evaluated at call time (not import time) so that
Django's `override_settings` used in tests – and any runtime changes
– are respected.
"""
return getattr(settings, "JWT_ALLAUTH_MFA_TOTP_MODE", MFA_TOTP_DISABLED)
[docs]
class PasswordResetView(GenericAPIView):
"""
Calls Django Auth PasswordResetForm save method.
Accepts the following POST parameters: email
Returns the success/fail message.
"""
serializer_class = PasswordResetSerializer
permission_classes = (AllowAny,)
throttle_classes = [AnonRateThrottle]
@get_user_agent
def post(self, request):
# Create a serializer with request.data
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
# Return the success message with OK HTTP status
return Response(
{"detail": _("Password reset e-mail has been sent.")},
status=status.HTTP_200_OK
)
[docs]
class DefaultPasswordResetView(GenericAPIView):
"""
Default view for password reset form.
"""
permission_classes = (AllowAny,)
template_name = 'password/reset.html'
[docs]
def get(self, request):
return render(request, self.template_name, {
'validlink': PASS_RESET_COOKIE in request.COOKIES,
'form': None
})
[docs]
class DefaultSetPasswordView(GenericAPIView):
"""
Default view for admin-managed registration password set form.
This renders a minimal HTML UI that posts to the API-based SetPasswordView
(rest_set_password) and relies on the SET_PASSWORD_COOKIE for authorization.
"""
permission_classes = (AllowAny,)
template_name = 'password/set.html'
[docs]
def get(self, request):
return render(request, self.template_name, {
'validlink': SET_PASSWORD_COOKIE in request.COOKIES,
})
[docs]
class PasswordResetConfirmView(GenericAPIView):
form_url = getattr(settings, PASSWORD_RESET_REDIRECT, None)
@get_user_agent
def get(self, *_, **kwargs):
if "uidb64" not in kwargs or "token" not in kwargs:
raise ImproperlyConfigured(
"The URL path must contain 'uidb64' and 'token' parameters."
)
user = self.get_user(kwargs["uidb64"])
if user is not None:
if GenericToken(request=self.request, purpose=PASS_RESET).check_token(user, kwargs["token"]):
refresh_token = RefreshToken()
refresh_token[FOR_USER] = user.id
refresh_token[ONE_TIME_PERMISSION] = PASS_RESET_ACCESS
access_token = refresh_token.access_token
response = HttpResponseRedirect(
self.form_url if self.form_url else reverse_lazy('default_password_reset')
)
response.set_cookie(
key=PASS_RESET_COOKIE,
value=str(access_token),
httponly=getattr(settings, 'PASSWORD_RESET_COOKIE_HTTP_ONLY', True),
secure=getattr(settings, 'PASSWORD_RESET_COOKIE_SECURE', not settings.DEBUG),
samesite=getattr(settings, 'PASSWORD_RESET_COOKIE_SAME_SITE', 'Lax'),
max_age=getattr(settings, 'PASSWORD_RESET_COOKIE_MAX_AGE', 3600)
)
token_serializer = GenericTokenModelSerializer(data={
'token': access_token['jti'],
'user': user.id,
'purpose': PASS_RESET_ACCESS
})
token_serializer.is_valid(raise_exception=True)
token_serializer.save()
return response
return render(self.request, 'password/reset.html', {
'validlink': False,
'form': None
})
[docs]
@staticmethod
def get_user(uidb64):
try:
# urlsafe_base64_decode() decodes to bytestring
uid = urlsafe_base64_decode(uidb64).decode()
user = get_user_model()._default_manager.get(pk=uid)
except (
TypeError,
ValueError,
OverflowError,
get_user_model().DoesNotExist,
ValidationError,
):
user = None
return user
[docs]
class ResetPasswordView(GenericAPIView):
"""
Calls Django Auth SetPasswordForm save method.
Accepts the following POST parameters: new_password1, new_password2
Returns the success/fail message.
"""
serializer_class = SetPasswordSerializer
permission_classes = (ResetPasswordPermission,)
throttle_classes = [UserRateThrottle]
[docs]
@sensitive_post_parameters_m
def dispatch(self, *args, **kwargs):
return super(ResetPasswordView, self).dispatch(*args, **kwargs)
[docs]
def post(self, request):
# check the token has not been used
query_set = GenericTokenModel.objects.filter(token=request.auth['jti'], purpose=PASS_RESET_ACCESS)
if len(query_set) != 1:
raise InvalidToken()
query_set.delete() # single use
# Load the user in the request
request.user = get_user_model().objects.get(id=self.request.user.id)
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
# Revoke old sessions
if getattr(settings, 'LOGOUT_ON_PASSWORD_CHANGE', True):
RefreshTokenWhitelistModel.objects.filter(user=self.request.user.id).delete()
refresh_token = RefreshToken.for_user(request.user)
return build_token_response(
refresh_token,
extra_data={"detail": _("Password reset.")}
)
[docs]
class SetPasswordView(GenericAPIView):
"""
Set password for admin-managed registration.
Accepts: new_password1, new_password2
Returns: tokens and success message.
"""
serializer_class = SetPasswordSerializer
permission_classes = (SetPasswordPermission,)
throttle_classes = [UserRateThrottle]
[docs]
@sensitive_post_parameters_m
def dispatch(self, *args, **kwargs):
if not getattr(settings, 'JWT_ALLAUTH_ADMIN_MANAGED_REGISTRATION', False):
return HttpResponseNotFound()
return super(SetPasswordView, self).dispatch(*args, **kwargs)
[docs]
def post(self, request):
# check the token has not been used
query_set = GenericTokenModel.objects.filter(token=request.auth['jti'], purpose=PASS_SET_ACCESS)
if len(query_set) != 1:
raise InvalidToken()
query_set.delete() # single use
# Load the user in the request
try:
request.user = get_user_model().objects.get(id=self.request.user.id)
except get_user_model().DoesNotExist:
raise InvalidToken()
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
# Revoke old sessions
if getattr(settings, 'LOGOUT_ON_PASSWORD_CHANGE', True):
RefreshTokenWhitelistModel.objects.filter(user=self.request.user.id).delete()
# If MFA TOTP is REQUIRED, return setup challenge instead of tokens
if get_mfa_totp_mode() == MFA_TOTP_REQUIRED:
setup_challenge_id = str(uuid.uuid4())
cache.set(
f"mfa_setup_challenge:{setup_challenge_id}",
{"user_id": request.user.id},
timeout=MFA_TOKEN_MAX_AGE_SECONDS,
)
return Response(
{
"mfa_setup_required": True,
"setup_challenge_id": setup_challenge_id,
"detail": _("Password set. Please configure MFA to complete registration."),
},
status=status.HTTP_200_OK,
)
refresh_token = RefreshToken.for_user(request.user)
return build_token_response(
refresh_token,
extra_data={"detail": _("Password set.")}
)