import re
from allauth.account import app_settings as allauth_settings
from allauth.account.adapter import get_adapter
from allauth.account.admin import EmailAddress
from allauth.account.models import get_emailconfirmation_model
from allauth.account.utils import setup_user_email
# from allauth.socialaccount.helpers import complete_social_login
# from allauth.socialaccount.models import SocialAccount
# from allauth.socialaccount.providers.base import AuthProcess
from allauth.utils import get_username_max_length
from django.conf import settings as django_settings
# from django.contrib.auth import get_user_model
from django.db import transaction
# from django.http import HttpRequest
from django.utils.crypto import constant_time_compare
from django.utils.translation import gettext_lazy as _
# from requests.exceptions import HTTPError
from rest_framework import serializers
[docs]
class RegisterSerializer(serializers.Serializer):
username = serializers.CharField(
max_length=get_username_max_length(),
min_length=allauth_settings.USERNAME_MIN_LENGTH,
required=False
)
email = serializers.EmailField(required=True, max_length=100)
password1 = serializers.CharField(write_only=True)
password2 = serializers.CharField(write_only=True)
first_name = serializers.CharField(required=True, write_only=True, max_length=100)
last_name = serializers.CharField(required=True, write_only=True, max_length=100)
_has_phone_field = False
[docs]
def validate_username(self, username):
username = get_adapter().clean_username(username)
return username
[docs]
def validate_email(self, email):
adapter = get_adapter()
email = adapter.clean_email(email)
if allauth_settings.UNIQUE_EMAIL:
if EmailAddress.objects.filter(email__iexact=email, verified=True).exists():
raise serializers.ValidationError(
_("A user is already registered with this e-mail address."))
# delete previous non-verified registration attempts
EmailAddress.objects.filter(email__iexact=email, verified=False).delete()
return email
[docs]
def validate_password1(self, password):
return get_adapter().clean_password(password)
[docs]
def validate_first_name(self, first_name):
pattern = r'^[A-Za-zÀ-ÖØ-öø-ÿ ]+$'
if not re.match(pattern, first_name):
raise serializers.ValidationError('Incorrect format')
first_name = re.sub(' +', ' ', first_name)
return " ".join([txt.capitalize() for txt in first_name.split(" ")])
[docs]
def validate_last_name(self, last_name):
pattern = r'^[A-Za-zÀ-ÖØ-öø-ÿ ]+$'
if not re.match(pattern, last_name):
raise serializers.ValidationError('Incorrect format')
last_name = re.sub(' +', ' ', last_name)
return " ".join([txt.capitalize() for txt in last_name.split(" ")])
[docs]
def validate(self, data):
# Only validate passwords if they exist (not required for admin-managed registration)
if 'password1' in data and 'password2' in data:
if not constant_time_compare(data['password1'], data['password2']):
raise serializers.ValidationError(_("The two password fields didn't match."))
return data
[docs]
def custom_signup(self, request, user):
pass
[docs]
def get_cleaned_data(self):
return {
'username': self.validated_data.get('username', ''),
'password1': self.validated_data.get('password1', ''),
'email': self.validated_data.get('email', ''),
'first_name': self.validated_data.get('first_name', ''),
'last_name': self.validated_data.get('last_name', ''),
}
[docs]
@transaction.atomic
def save(self, request):
adapter = get_adapter()
user = adapter.new_user(request)
self.cleaned_data = self.get_cleaned_data()
adapter.save_user(request, user, self, commit=False)
self.custom_signup(request, user)
user.save()
setup_user_email(request, user, [])
if not bool(django_settings.EMAIL_VERIFICATION):
email = EmailAddress.objects.filter(user=user.id).first()
if email is not None:
adapter.confirm_email(request, email)
return user
[docs]
class UserRegisterSerializer(RegisterSerializer):
"""
Registration serializer for admin-managed user creation.
- Requires email and role.
- Does not accept passwords; user sets password after email verification.
- first_name/last_name optional.
"""
# Remove password fields
password1 = None # type: ignore
password2 = None # type: ignore
# Override optionality of names
first_name = serializers.CharField(required=False, write_only=True, max_length=100)
last_name = serializers.CharField(required=False, write_only=True, max_length=100)
# Require explicit role
role = serializers.IntegerField(required=True, write_only=True)
[docs]
def validate(self, data):
if 'role' not in data:
raise serializers.ValidationError({"role": _("Role is required")})
return super().validate(data)
[docs]
def get_cleaned_data(self):
base = super().get_cleaned_data()
base.update({
'role': self.validated_data.get('role'),
})
return base
[docs]
def custom_signup(self, request, user):
"""
Apply role and ensure no password is set at creation time.
"""
cleaned = getattr(self, 'cleaned_data', {}) or {}
role = cleaned.get('role')
if role is not None:
try:
user.role = int(role)
except (TypeError, ValueError):
pass
# Prevent login until password is set
user.set_unusable_password()
[docs]
@transaction.atomic
def save(self, request):
"""
Override to ignore EMAIL_VERIFICATION auto-confirm logic and always keep email unverified
until the set-password step in admin-managed registration.
"""
adapter = get_adapter()
user = adapter.new_user(request)
self.cleaned_data = self.get_cleaned_data()
adapter.save_user(request, user, self, commit=False)
self.custom_signup(request, user)
user.save()
setup_user_email(request, user, [])
# Create an EmailConfirmation instance for the user's primary email
email_address = EmailAddress.objects.get_primary(user)
if email_address is not None:
confirmation_model = get_emailconfirmation_model()
emailconfirmation = confirmation_model.create(email_address)
adapter.send_confirmation_mail(request, emailconfirmation, signup=True)
return user
#
# class SocialAccountSerializer(serializers.ModelSerializer):
# """
# serialize allauth SocialAccounts for use with a REST API
# """
# class Meta:
# model = SocialAccount
# fields = (
# 'id',
# 'provider',
# 'uid',
# 'last_login',
# 'date_joined',
# )
#
#
# class SocialLoginSerializer(serializers.Serializer):
# access_token = serializers.CharField(required=False, allow_blank=True)
# code = serializers.CharField(required=False, allow_blank=True)
#
# def _get_request(self):
# request = self.context.get('request')
# if not isinstance(request, HttpRequest):
# request = request._request
# return request
#
# def get_social_login(self, adapter, app, token, response):
# """
# :param adapter: allauth.socialaccount Adapter subclass.
# Usually OAuthAdapter or Auth2Adapter
# :param app: `allauth.socialaccount.SocialApp` instance
# :param token: `allauth.socialaccount.SocialToken` instance
# :param response: Provider's response for OAuth1. Not used in the
# :returns: A populated instance of the
# `allauth.socialaccount.SocialLoginView` instance
# """
# request = self._get_request()
# social_login = adapter.complete_login(request, app, token, response=response)
# social_login.token = token
# return social_login
#
# def validate(self, attrs):
# view = self.context.get('view')
# request = self._get_request()
#
# if not view:
# raise serializers.ValidationError(
# _("View is not defined, pass it as a context variable")
# )
#
# adapter_class = getattr(view, 'adapter_class', None)
# if not adapter_class:
# raise serializers.ValidationError(_("Define adapter_class in view"))
#
# adapter = adapter_class(request)
# app = adapter.get_provider().get_app(request)
#
# # More info on code vs access_token
# # http://stackoverflow.com/questions/8666316/facebook-oauth-2-0-code-and-token
#
# # Case 1: We received the access_token
# if attrs.get('access_token'):
# access_token = attrs.get('access_token')
#
# # Case 2: We received the authorization code
# elif attrs.get('code'):
# self.callback_url = getattr(view, 'callback_url', None)
# self.client_class = getattr(view, 'client_class', None)
#
# if not self.callback_url:
# raise serializers.ValidationError(
# _("Define callback_url in view")
# )
# if not self.client_class:
# raise serializers.ValidationError(
# _("Define client_class in view")
# )
#
# code = attrs.get('code')
#
# provider = adapter.get_provider()
# scope = provider.get_scope(request)
# client = self.client_class(
# request,
# app.client_id,
# app.secret,
# adapter.access_token_method,
# adapter.access_token_url,
# self.callback_url,
# scope
# )
# token = client.get_access_token(code)
# access_token = token['access_token']
#
# else:
# raise serializers.ValidationError(
# _("Incorrect input. access_token or code is required."))
#
# social_token = adapter.parse_token({'access_token': access_token})
# social_token.app = app
#
# try:
# login = self.get_social_login(adapter, app, social_token, access_token)
# complete_social_login(request, login)
# except HTTPError:
# raise serializers.ValidationError(_("Incorrect value"))
#
# if not login.is_existing:
# # We have an account already signed up in a different flow
# # with the same email address: raise an exception.
# # This needs to be handled in the frontend. We can not just
# # link up the accounts due to security constraints
# if allauth_settings.UNIQUE_EMAIL:
# # Do we have an account already with this email address?
# account_exists = get_user_model().objects.filter(
# email=login.user.email,
# ).exists()
# if account_exists:
# raise serializers.ValidationError(
# _("User is already registered with this e-mail address.")
# )
#
# login.lookup()
# login.save(request, connect=True)
#
# attrs['user'] = login.account.user
#
# return attrs
#
#
# class SocialConnectMixin(object):
# def get_social_login(self, *args, **kwargs):
# """
# Set the social login process state to connect rather than login
# Refer to the implementation of get_social_login in base class and to the
# allauth.socialaccount.helpers module complete_social_login function.
# """
# social_login = super(SocialConnectMixin, self).get_social_login(*args, **kwargs)
# social_login.state['process'] = AuthProcess.CONNECT
# return social_login
#
#
# class SocialConnectSerializer(SocialConnectMixin, SocialLoginSerializer):
# pass