Source code for jwt_allauth.tokens.tokens
import hashlib
from uuid import uuid4
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from rest_framework.exceptions import ValidationError
from rest_framework_simplejwt.exceptions import InvalidToken
from rest_framework_simplejwt.tokens import RefreshToken as DefaultRefreshToken
from jwt_allauth.roles import STAFF_CODE, SUPER_USER_CODE
from jwt_allauth.tokens.models import GenericTokenModel
from jwt_allauth.tokens.serializers import RefreshTokenWhitelistSerializer, GenericTokenModelSerializer
from jwt_allauth.utils import user_agent_dict
[docs]
class RefreshToken(DefaultRefreshToken):
[docs]
def set_session(self, id_=None):
"""
Unique identifier of the session associated to the refresh token.
"""
if id_ is None:
id_ = uuid4().hex
self.payload['session'] = id_
[docs]
def set_user_role(self, user):
self.payload['role'] = user.role
[docs]
@classmethod
def for_user(cls, user, request=None, enabled=True):
"""
Return
------
RefreshToken
"""
token = super().for_user(user)
token.set_session() # type: ignore
token.set_user_role(user) # type: ignore
# Store the token in the database
refresh_serializer = RefreshTokenWhitelistSerializer(data={
'jti': token.payload['jti'],
'user': user.id,
'enabled': enabled,
'session': token.payload['session'],
**user_agent_dict(request)
})
try:
refresh_serializer.is_valid(raise_exception=True)
refresh_serializer.save()
except ValidationError as e:
raise InvalidToken(e.args[0])
return token
[docs]
class GenericToken(PasswordResetTokenGenerator):
def __init__(self, purpose, request=None):
super().__init__()
self.request = request
self.purpose = purpose
[docs]
def make_token(self, user):
token = super().make_token(user)
hashed_token = hashlib.sha256(str(token).encode()).hexdigest()
token_serializer = GenericTokenModelSerializer(data={
'token': hashed_token,
'user': user.id,
'purpose': self.purpose,
**user_agent_dict(self.request)
})
try:
token_serializer.is_valid(raise_exception=True)
token_serializer.save()
# remove existing tokens for the same purpose
GenericTokenModel.objects.filter(user=user, purpose=self.purpose).exclude(token=hashed_token).delete()
except ValidationError as e:
raise InvalidToken(e.args[0])
return token
[docs]
def check_token(self, user, token):
result = super().check_token(user, token)
if result:
hashed_token = hashlib.sha256(str(token).encode()).hexdigest()
if GenericTokenModel.objects.filter(token=hashed_token, purpose=self.purpose).count() == 0:
return False
GenericTokenModel.objects.filter(token=hashed_token, purpose=self.purpose).delete()
return result