Source code for jwt_allauth.login.serializers

from typing import Dict, Any
import uuid

from django.conf import settings
from django.contrib.auth.models import update_last_login
from django.core.cache import cache
from django.db import transaction
from rest_framework import exceptions
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from rest_framework_simplejwt.settings import api_settings

from jwt_allauth.constants import (
    MFA_TOKEN_MAX_AGE_SECONDS,
    MFA_TOTP_DISABLED,
    MFA_TOTP_REQUIRED,
)
from jwt_allauth.tokens.app_settings import RefreshToken
from jwt_allauth.utils import allauth_authenticate


[docs] def get_mfa_totp_mode() -> str: """ Return the current MFA TOTP mode from settings. This must be evaluated at call time (not import time) so that Django's `override_settings` used in tests – and any runtime changes – are respected. """ return getattr(settings, "JWT_ALLAUTH_MFA_TOTP_MODE", MFA_TOTP_DISABLED)
try: from allauth.mfa.models import Authenticator # type: ignore except Exception: # pragma: no cover - optional dependency guard Authenticator = None # type: ignore if get_mfa_totp_mode() != MFA_TOTP_DISABLED: raise Exception( "MFA TOTP is not available. Please ensure 'django-jwt-allauth[mfa]' " "is installed and 'allauth.mfa' is added to INSTALLED_APPS." )
[docs] class LoginSerializer(TokenObtainPairSerializer): token_class = RefreshToken username_field = getattr(settings, 'ACCOUNT_AUTHENTICATION_METHOD', 'email') user = None
[docs] @classmethod def get_token(cls, user) -> RefreshToken: """ Instantiates a new TokenObtainPairSerializer object, sets a token for the given user and returns the token. """ cls.token = cls.token_class.for_user(user) return cls.token # type: ignore
[docs] @transaction.atomic def validate(self, attrs: Dict[str, Any]) -> Dict[Any, Any]: # Get the email and password information authenticate_kwargs = { self.username_field: attrs[self.username_field], "password": attrs["password"], } try: authenticate_kwargs["request"] = self.context["request"] except KeyError: pass # User authentication (allauth) self.user = allauth_authenticate(**authenticate_kwargs) # Active account check if not api_settings.USER_AUTHENTICATION_RULE(self.user): raise exceptions.AuthenticationFailed( self.error_messages["no_active_account"], "no_active_account", ) # MFA TOTP check mfa_mode = get_mfa_totp_mode() if mfa_mode != MFA_TOTP_DISABLED and Authenticator is not None: has_mfa = Authenticator.objects.filter( user=self.user, type=getattr(Authenticator, "Type").TOTP if hasattr(Authenticator, "Type") else "totp", ).exists() # If MFA is REQUIRED, user must have MFA enabled # Instead of raising 403, return setup challenge for bootstrap if mfa_mode == MFA_TOTP_REQUIRED and not has_mfa: setup_challenge_id = str(uuid.uuid4()) cache.set( f"mfa_setup_challenge:{setup_challenge_id}", {"user_id": self.user.id}, timeout=MFA_TOKEN_MAX_AGE_SECONDS, ) return { "mfa_setup_required": True, "setup_challenge_id": setup_challenge_id, } # If user has MFA enabled (OPTIONAL or REQUIRED mode), request MFA verification if has_mfa: # Store MFA challenge server-side in cache with TTL challenge_id = str(uuid.uuid4()) challenge_data = {"user_id": self.user.id, "timestamp": None} cache.set(f"mfa_challenge:{challenge_id}", challenge_data, timeout=MFA_TOKEN_MAX_AGE_SECONDS) return {"mfa_required": True, "challenge_id": challenge_id} validated_data = super().validate(attrs) # Set the refresh token refresh = self.get_token(self.user) validated_data["refresh"] = str(refresh) validated_data["access"] = str(refresh.access_token) if api_settings.UPDATE_LAST_LOGIN: update_last_login(None, self.user) return validated_data