Best Python code snippet using tempest_python
auth.py
Source:auth.py  
1"""Authentication module for MyGamePlan."""2import binascii3import datetime4import functools5import hashlib6import logging7import os8import re9import time10import typing11from typing import Any, Callable, NoReturn, Optional, Tuple, TypedDict, Union12from urllib import parse13import bson14from bson import objectid15import flask16from google.protobuf import json_format17from google.protobuf import timestamp_pb218import requests19from requests import exceptions as request_exceptions20import werkzeug21from werkzeug import exceptions22from werkzeug import http23from bob_emploi.common.python import now24from bob_emploi.frontend.server import auth_token as token25from bob_emploi.frontend.server import french26from bob_emploi.frontend.server import i18n27from bob_emploi.frontend.server import mongo28from bob_emploi.frontend.server import privacy29from bob_emploi.frontend.server import product30from bob_emploi.frontend.server import proto31from bob_emploi.frontend.server.mail import campaign32from bob_emploi.frontend.server.mail import mail_send33from bob_emploi.frontend.api import auth_pb234from bob_emploi.frontend.api import diagnostic_pb235from bob_emploi.frontend.api import features_pb236from bob_emploi.frontend.api import user_pb237from bob_emploi.frontend.api import user_profile_pb238if typing.TYPE_CHECKING:39    class _UpdateReturningUserFuncType(typing.Protocol):40        def __call__(41            self, user: user_pb2.User, /, force_update: bool = ...,42        ) -> timestamp_pb2.Timestamp:43            ...44_LinkedInEmailResponse = TypedDict(45    '_LinkedInEmailResponse', {46        'handle~': dict[str, str],47    }, total=False)48_EMPLOI_STORE_CLIENT_ID = os.getenv('EMPLOI_STORE_CLIENT_ID')49_EMPLOI_STORE_CLIENT_SECRET = os.getenv('EMPLOI_STORE_CLIENT_SECRET')50# https://www.linkedin.com/developer/apps/4800174/auth51_LINKED_IN_CLIENT_ID = os.getenv('LINKED_IN_CLIENT_ID', '86r4xh5py0mw9k')52_LINKED_IN_CLIENT_SECRET = os.getenv('LINKED_IN_CLIENT_SECRET', 'fake-key-fake-key')53_PE_CONNECT_GENDER = {54    'female': user_profile_pb2.FEMININE,55    'male': user_profile_pb2.MASCULINE,56}57_AUTH_FIELDS = {58    'googleId': 'Google',59    'facebookId': 'Facebook',60    'peConnectId': 'Pôle emploi',61    'linkedInId': 'LinkedIn',62}63_AUTH_TOKEN_COOKIE_NAME = 'auth_token'64_AUTH_TOKEN_PERSISTENCE_DURATION = datetime.timedelta(days=30)65http.HTTP_STATUS_CODES[498] = 'Authentication token expired'66_MA_VOIE_API_URL = os.getenv('MA_VOIE_API_URL')67_MA_VOIE_AUTH = tuple(os.getenv('MA_VOIE_AUTH', ':').split(':', 1))68# TODO(cyrille): Add the env variable to the cloudformation template.69_ENABLE_ACTION_PLAN = bool(os.getenv('ENABLE_ACTION_PLAN', os.getenv('BOB_DEPLOYMENT') == 'fr'))70def _register_to_ma_voie(ma_voie: user_pb2.MaVoieInfo) -> None:71    if not _MA_VOIE_API_URL or not _MA_VOIE_AUTH[0] or not _MA_VOIE_AUTH[1]:72        logging.warning(73            'Ma Voie registration is not well configured:\nURL "%s"\nusername "%s"\npassword %s',74            _MA_VOIE_API_URL, _MA_VOIE_AUTH[0], '"****"' if _MA_VOIE_AUTH[1] else 'not set')75        return76    response = requests.post(77        f'{_MA_VOIE_API_URL}/user/{ma_voie.ma_voie_id}/register',78        auth=_MA_VOIE_AUTH, json={'stepId': ma_voie.step_id})79    try:80        response.raise_for_status()81    except request_exceptions.HTTPError:82        logging.error("Couln't register user to Ma Voie.")83class ExpiredTokenException(exceptions.HTTPException):84    """Exception class for expired authentication tokens."""85    code = 49886    description = '<p>The authentication token has expired.</p>'87def require_admin(func: Callable[..., Any]) -> Callable[..., Any]:88    """Decorator for a function that requires admin authorization."""89    def _decorated_fun(*args: Any, **kwargs: Any) -> Any:90        request_token = flask.request.headers.get('Authorization', '')91        try:92            is_valid_token = token.check_admin_token(request_token)93        except ValueError:94            is_valid_token = False95        if not is_valid_token:96            # TODO(cyrille): Raise on empty ADMIN_AUTH_TOKEN and check this before.97            if not request_token:98                flask.abort(401)99            flask.abort(403)100        return func(*args, **kwargs)101    return functools.wraps(func)(_decorated_fun)102# TODO(cyrille): Use typing_extension to ensure get_user_id has the same param103# as the function that is returned.104def require_user(get_user_id: Callable[..., str], role: str = 'auth') \105        -> Callable[[Callable[..., Any]], Any]:106    """Check if authenticated user has a valid token in Authorization header."""107    def _decorator(func: Callable[..., Any]) -> Any:108        def _decorated_fun(*args: Any, **kwargs: Any) -> Any:109            auth_token = flask.request.headers.get('Authorization', '').replace('Bearer ', '')110            if not auth_token:111                auth_token = flask.request.args.get('token', '')112                if not auth_token:113                    auth_token = flask.request.cookies.get(_AUTH_TOKEN_COOKIE_NAME, '')114                    if not auth_token:115                        flask.abort(401, i18n.flask_translate('Token manquant'))116            user_id = get_user_id(*args, **kwargs)117            try:118                token.check_token(user_id, auth_token, role=role)119            except ValueError:120                flask.abort(403, i18n.flask_translate('Unauthorized token'))121            return func(*args, **kwargs)122        return functools.wraps(func)(_decorated_fun)123    return _decorator124def _set_auth_cookie(125        response: flask.Response, *, value: str, expires_delta: Optional[float]) -> None:126    response.set_cookie(127        _AUTH_TOKEN_COOKIE_NAME, value=value, path='/api/', secure=True, httponly=True,128        expires=(int(time.time() - expires_delta)) if expires_delta else None)129_FlaskResponse = Union[str, werkzeug.Response, tuple[str, int]]130def clear_user_cookies(func: Callable[..., _FlaskResponse]) -> Callable[..., flask.Response]:131    """Decorator to clear a user auth cookies when returning."""132    def _decorated_fun(*args: Any, **kwargs: Any) -> flask.Response:133        response = func(*args, **kwargs)134        flask_response = flask.make_response(response)135        _set_auth_cookie(flask_response, value='', expires_delta=-3600)136        return flask_response137    return functools.wraps(func)(_decorated_fun)138def set_auth_cookie(139        response: flask.Response, auth_token: str, is_persistent: bool = False) -> None:140    """Sets the auth cookie for future re-authentication."""141    _set_auth_cookie(142        response, value=auth_token,143        expires_delta=_AUTH_TOKEN_PERSISTENCE_DURATION.total_seconds() if is_persistent else None)144def require_user_in_args(role: str = 'auth') -> Callable[[Callable[..., Any]], Callable[..., Any]]:145    """Check if authenticated user has a valid token in request GET args."""146    def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:147        def _decorated_fun(*args: Any, **kwargs: Any) -> Any:148            auth_token = flask.request.args.get('token')149            user_id = flask.request.args.get('user')150            if not user_id or not auth_token:151                flask.abort(422, i18n.flask_translate('Paramètres manquants.'))152            try:153                token.check_token(user_id, auth_token, role=role)154            except ValueError:155                flask.abort(403, i18n.flask_translate('Accès non autorisé.'))156            return func(*args, **dict(kwargs, user_id=user_id))157        return functools.wraps(func)(_decorated_fun)158    return _decorator159def require_google_user(160        emails_pattern: str = '@bayesimpact.org', email_kwarg: Optional[str] = None) \161        -> Callable[[Callable[..., Any]], Callable[..., Any]]:162    """Check if authenticated user has a valid google id token163    in Authorization header, and associated google account is from164    bayesimpact.org domain.165    Args:166        emails_pattern: the regex pattern that email should validate to be authorized.167        email_kwarg: if set, the decorated function will get an extra keyword arg with168            the actual email address. If an admin token is used, the forwarded email address is169            'admin@bayesimpact.org'.170    """171    emails_regexp = re.compile(emails_pattern)172    def _get_google_email(authorization: str) -> str:173        try:174            token.check_admin_token(authorization)175            return 'admin@bayesimpact.org'176        except ValueError:177            pass178        if authorization.startswith('Bearer '):179            authorization = authorization.removeprefix('Bearer ')180        else:181            authorization = flask.request.args.get('token', '')182            if not authorization:183                flask.abort(401, i18n.flask_translate('Token manquant'))184        try:185            return token.check_google_token(authorization, emails_regexp)186        except i18n.TranslatableException as error:187            flask.abort(401, error.flask_translate())188    def _decorator(wrapped: Callable[..., Any]) -> Callable[..., Any]:189        @functools.wraps(wrapped)190        def _wrapper(*args: Any, **kwargs: Any) -> Any:191            email = _get_google_email(flask.request.headers.get('Authorization', ''))192            if email_kwarg:193                kwargs = kwargs | {email_kwarg: email}194            return wrapped(*args, **kwargs)195        return _wrapper196    return _decorator197def hash_user_email(email: str) -> str:198    """Hash email for better obfuscation of personal data."""199    hashed_email = hashlib.sha1()200    hashed_email.update(b'bob-emploi')201    hashed_email.update(email.lower().encode('utf-8'))202    return hashed_email.hexdigest()203def delete_user(user_proto: user_pb2.User, user_database: mongo.UsersDatabase) -> bool:204    """Close a user's account.205    We assume the given user_proto is up-to-date, e.g. just being fetched from database.206    """207    try:208        user_id = objectid.ObjectId(user_proto.user_id)209    except bson.errors.InvalidId:210        logging.exception('Tried to delete a user with invalid ID "%s"', user_proto.user_id)211        return False212    filter_user = {'_id': user_id}213    # Remove authentication informations.214    user_database.user_auth.delete_one(filter_user)215    try:216        privacy.redact_proto(user_proto)217    except TypeError:218        logging.exception('Cannot delete account %s', str(user_id))219        return False220    user_proto.deleted_at.FromDatetime(now.get())221    user_proto.ClearField('user_id')222    user_database.user.replace_one(filter_user, json_format.MessageToDict(user_proto))223    return True224def _parse_user_from_mongo(user_dict: dict[str, Any], user: user_pb2.User) -> None:225    if not proto.parse_from_mongo(user_dict, user, 'user_id'):226        flask.abort(227            500,228            i18n.flask_translate(229                'Les données utilisateur sont corrompues dans la base de données.'))230def _abort_failed_login() -> None:231    flask.abort(232        403,233        i18n.flask_translate(234            "L'email et le mot de passe ne correspondent pas. " +235            "Si vous avez déjà créé un compte mais que vous n'avez pas créé votre mot de passe, " +236            'nous venons de vous envoyer un email pour vous connecter.'))237def _check_password(stored_hashed_password: str, salt: str, request_hashed_password: str) -> None:238    hashed_password = hashlib.sha1()239    hashed_password.update(salt.encode('ascii'))240    hashed_password.update(stored_hashed_password.encode('ascii'))241    request_hashed_password_bin = binascii.unhexlify(request_hashed_password)242    if request_hashed_password_bin != hashed_password.digest():243        _abort_failed_login()244def _get_auth_error_message() -> str:245    return i18n.flask_translate("Les informations d'authentification ne sont pas valides.")246def _abort_on_bad_auth() -> NoReturn:247    flask.abort(403, _get_auth_error_message())248class Authenticator:249    """An object to authenticate requests."""250    def __init__(251            self, user_db: mongo.UsersDatabase,252            save_new_user: Callable[[user_pb2.User], user_pb2.User],253            update_returning_user: '_UpdateReturningUserFuncType') -> None:254        self._user_db = user_db255        self._save_new_user = save_new_user256        self._update_returning_user = update_returning_user257        self._user_collection = self._user_db.user258    def save_new_user(259            self, user: user_pb2.User, user_data: auth_pb2.AuthUserData) -> user_pb2.User:260        """Save a user with additional data."""261        user.profile.locale = user_data.locale262        user.features_enabled.alpha = user_data.is_alpha263        user.features_enabled.exclude_from_analytics = user_data.is_alpha264        if _ENABLE_ACTION_PLAN or user_data.is_action_plan_enabled:265            user.features_enabled.action_plan = features_pb2.ACTIVE266        if user_data.HasField('ma_voie'):267            user.ma_voie.CopyFrom(user_data.ma_voie)268            _register_to_ma_voie(user_data.ma_voie)269        if user_data.HasField('original_self_diagnostic'):270            original_self_diagnostic = diagnostic_pb2.SelfDiagnostic()271            original_self_diagnostic.CopyFrom(user_data.original_self_diagnostic)272            user.projects.add(is_incomplete=True, original_self_diagnostic=original_self_diagnostic)273        return self._save_new_user(user)274    def authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:275        """Authenticate a user."""276        if auth_request.google_token_id:277            return self._google_authenticate(auth_request)278        if auth_request.facebook_access_token:279            return self._facebook_authenticate(auth_request)280        if auth_request.pe_connect_code:281            return self._pe_connect_authenticate(auth_request)282        if auth_request.linked_in_code:283            return self._linked_in_authenticate(auth_request)284        if auth_request.email:285            return self._email_authenticate(auth_request)286        if auth_request.user_id:287            return self._token_authenticate(auth_request)288        # Create a guest user.289        if auth_request.first_name:290            return self._create_guest_user(auth_request)291        logging.warning('No mean of authentication found:\n%s', auth_request)292        flask.abort(422, i18n.flask_translate("Aucun moyen d'authentification n'a été trouvé."))293    def change_email(self, user_proto: user_pb2.User, auth_request: auth_pb2.AuthRequest) \294            -> user_pb2.User:295        """Change user's email address."""296        new_hashed_email = hash_user_email(auth_request.email)297        if user_proto.hashed_email == new_hashed_email:298            # Trying to set the same email.299            return user_proto300        user_auth_dict = self._user_db.user_auth.find_one(301            {'_id': objectid.ObjectId(user_proto.user_id)})302        if user_auth_dict or auth_request.hashed_password:303            if not user_auth_dict:304                flask.abort(305                    422, i18n.flask_translate("L'utilisateur n'a pas encore de mot de passe"))306            stored_hashed_password = user_auth_dict.get('hashedPassword', '')307            _check_password(308                stored_hashed_password, auth_request.hash_salt, auth_request.hashed_password)309        existing = self._user_collection.find_one({'hashedEmail': new_hashed_email}, {'_id': 1})310        if existing:311            flask.abort(312                403,313                i18n.flask_translate('L\'email "{email}" est déjà utilisé par un autre compte')314                .format(email=auth_request.email))315        user_proto.profile.email = auth_request.email316        user_proto.hashed_email = new_hashed_email317        if user_auth_dict:318            self._user_db.user_auth.replace_one(319                {'_id': objectid.ObjectId(user_proto.user_id)},320                {'hashedPassword': auth_request.new_hashed_password})321        self._update_returning_user(user_proto, force_update=True)322        return user_proto323    def _google_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:324        try:325            id_info = token.decode_google_id_token(auth_request.google_token_id)326        except i18n.TranslatableException as error:327            flask.abort(401, error.flask_translate())328        response = auth_pb2.AuthResponse()329        user_dict = self._user_collection.find_one({'googleId': id_info['sub']})330        if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):331            self._handle_returning_user(response)332        else:333            is_existing_user = self._load_user_from_token_or_email(334                auth_request, response.authenticated_user, id_info['email'])335            response.authenticated_user.profile.picture_url = id_info.get('picture', '')336            response.authenticated_user.google_id = id_info['sub']337            response.is_new_user = not response.authenticated_user.has_account338            response.authenticated_user.has_account = True339            if is_existing_user:340                self._handle_returning_user(response, force_update=True)341            else:342                self.save_new_user(response.authenticated_user, auth_request.user_data)343        response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')344        return response345    def _facebook_authenticate(346            self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:347        user_info_response = requests.get('https://graph.facebook.com/v4.0/me', params=dict(348            fields='id,first_name,email',349            access_token=auth_request.facebook_access_token,350        ))351        if user_info_response.status_code < 200 or user_info_response.status_code >= 300:352            flask.abort(353                user_info_response.status_code,354                user_info_response.json().get('error', {}).get('message', ''))355        user_info = typing.cast(dict[str, str], user_info_response.json())356        response = auth_pb2.AuthResponse()357        user_dict = self._user_collection.find_one({'facebookId': user_info['id']})358        if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):359            self._handle_returning_user(response)360        else:361            is_existing_user = self._load_user_from_token_or_email(362                auth_request, response.authenticated_user, user_info.get('email'))363            response.authenticated_user.facebook_id = user_info['id']364            response.is_new_user = not response.authenticated_user.has_account365            response.authenticated_user.has_account = True366            if is_existing_user:367                self._handle_returning_user(response, force_update=True)368            else:369                response.authenticated_user.profile.name = user_info.get('first_name', '')370                self.save_new_user(response.authenticated_user, auth_request.user_data)371        response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')372        return response373    def _pe_connect_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:374        token_data = _get_oauth2_access_token(375            'https://authentification-candidat.pole-emploi.fr/connexion/oauth2/access_token?'376            'realm=/individu',377            code=auth_request.pe_connect_code,378            client_id=_EMPLOI_STORE_CLIENT_ID or '',379            client_secret=_EMPLOI_STORE_CLIENT_SECRET or '',380            auth_name='PE Connect',381        )382        if token_data.get('nonce') != auth_request.pe_connect_nonce:383            flask.abort(403, i18n.flask_translate('Mauvais paramètre nonce'))384        bearer = token_data.get('token_type', 'Bearer')385        access_token = token_data.get('access_token', '')386        authorization_header = f'{bearer} {access_token}'387        user_info_response = requests.get(388            'https://api.emploi-store.fr/partenaire/peconnect-individu/v1/userinfo',389            headers={'Authorization': authorization_header})390        if user_info_response.status_code < 200 or user_info_response.status_code >= 400:391            logging.warning(392                'PE Connect fails (%d): "%s"', user_info_response.status_code,393                user_info_response.text)394            flask.abort(403, user_info_response.text)395        user_info = typing.cast(dict[str, str], user_info_response.json())396        response = auth_pb2.AuthResponse()397        user_dict = self._user_collection.find_one({'peConnectId': user_info['sub']})398        if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):399            self._handle_returning_user(response)400        else:401            user = response.authenticated_user402            is_existing_user = self._load_user_from_token_or_email(403                auth_request, user, user_info.get('email'))404            user.pe_connect_id = user_info['sub']405            response.is_new_user = force_update = not user.has_account406            user.has_account = True407            if is_existing_user:408                self._handle_returning_user(response, force_update=force_update)409            else:410                # TODO(pascal): Handle the case where one of the name is missing.411                user.profile.name = french.cleanup_firstname(user_info.get('given_name', ''))412                user.profile.last_name = french.cleanup_firstname(user_info.get('family_name', ''))413                user.profile.gender = _PE_CONNECT_GENDER.get(414                    user_info.get('gender', ''), user_profile_pb2.UNKNOWN_GENDER)415                self.save_new_user(user, auth_request.user_data)416        response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')417        return response418    def _linked_in_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:419        token_data = _get_oauth2_access_token(420            'https://www.linkedin.com/oauth/v2/accessToken',421            code=auth_request.linked_in_code,422            client_id=_LINKED_IN_CLIENT_ID or '',423            client_secret=_LINKED_IN_CLIENT_SECRET or '',424            auth_name='LinkedIn Auth',425        )426        bearer = token_data.get('token_type', 'Bearer')427        access_token = token_data.get('access_token', '')428        authorization_header = f'{bearer} {access_token}'429        user_info_response = requests.get(430            'https://api.linkedin.com/v2/me',431            headers={'Authorization': authorization_header})432        if user_info_response.status_code < 200 or user_info_response.status_code >= 400:433            logging.warning(434                'LinkedIn Auth fails (%d): "%s"', user_info_response.status_code,435                user_info_response.text)436            flask.abort(403, user_info_response.text)437        user_info = typing.cast(dict[str, str], user_info_response.json())438        response = auth_pb2.AuthResponse()439        # TODO(cyrille): Factorize with other 3rd party auth.440        user_dict = self._user_collection.find_one({'linkedInId': user_info['id']})441        if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):442            self._handle_returning_user(response)443        else:444            email_response = requests.get(445                'https://api.linkedin.com/v2/emailAddress?q=members&projection=(elements*(handle~))',446                headers={'Authorization': authorization_header})447            if email_response.status_code < 200 or email_response.status_code >= 400:448                logging.warning(449                    'LinkedIn Auth fails (%d): "%s"', email_response.status_code,450                    email_response.text)451                flask.abort(403, email_response.text)452            email_response_json = typing.cast(_LinkedInEmailResponse, email_response.json())453            email = email_response_json.get('handle~', {}).get('emailAddress')454            user = response.authenticated_user455            is_existing_user = self._load_user_from_token_or_email(auth_request, user, email)456            user.linked_in_id = user_info['id']457            response.is_new_user = not user.has_account458            user.has_account = True459            if is_existing_user:460                self._handle_returning_user(response, force_update=True)461            else:462                # TODO(pascal): Handle the case where one of the name is missing.463                user.profile.name = user_info.get('localizedFirstName', '')464                user.profile.last_name = user_info.get('localizedLastName', '')465                self.save_new_user(user, auth_request.user_data)466        response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')467        return response468    def _get_reset_password_link(self, user_dict: dict[str, Any]) -> str:469        auth_token, email = self._create_reset_token_from_user(user_dict)470        return parse.urljoin(flask.request.url_root, '?' + parse.urlencode({471            'email': email,472            'resetToken': auth_token}))473    def send_update_confirmation(self, user_dict: dict[str, Any]) -> None:474        """Sends an email to the user that confirms password change."""475        user_id = str(user_dict['_id'])476        if not user_id:477            return478        auth_link = token.create_logged_url(user_id)479        reset_link = self._get_reset_password_link(user_dict)480        if not reset_link or not auth_link:481            return482        user = proto.create_from_mongo(user_dict.copy(), user_pb2.User)483        template_vars = dict(484            campaign.get_default_coaching_email_vars(user),485            authLink=auth_link, resetPwdLink=reset_link)486        # TODO(cyrille): Create a static Campaign object and use it.487        result = mail_send.send_template(488            'send-pwd-update-confirmation', user.profile, template_vars)489        if result.status_code != 200:490            logging.error('Failed to send an email with MailJet:\n %s', result.text)491            flask.abort(result.status_code)492    def _email_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:493        instant = int(time.time())494        response = auth_pb2.AuthResponse()495        response.hash_salt = token.timestamped_hash(instant, auth_request.email)496        user_dict = self._user_collection.find_one(497            {'hashedEmail': hash_user_email(auth_request.email)})498        if not user_dict:499            return self._email_register(auth_request, response)500        user_object_id = user_dict['_id']501        user_id = str(user_object_id)502        user_auth_dict = self._user_db.user_auth.find_one({'_id': user_object_id})503        if not auth_request.hashed_password:504            # User exists but did not send a password: probably just getting some fresh salt.505            return response506        if not user_auth_dict:507            if not auth_request.auth_token:508                # User is trying to connect with a password, but never created one.509                self.send_auth_token(user_dict)510                _abort_failed_login()511            try:512                token.check_token(user_id, auth_request.auth_token, role='auth')513            except ValueError as error:514                logging.info('Invalid token:\n %s', error)515                _abort_on_bad_auth()516            # User that already uses an SSO account is now trying to add a password.517            _parse_user_from_mongo(user_dict, response.authenticated_user)518            self._user_db.user_auth.insert_one({519                '_id': user_object_id,520                'hashedPassword': auth_request.hashed_password,521            })522            response.auth_token = token.create_token(user_id, 'auth')523            response.authenticated_user.has_account = True524            response.authenticated_user.has_password = True525            response.is_password_updated = True526            self._handle_returning_user(response, force_update=True)527            return response528        if auth_request.user_id:529            # User is a guest using a pre-existing email account:530            # maybe they're using the same password.531            if auth_request.hashed_password != user_auth_dict.get('hashedPassword', ''):532                return response533            _parse_user_from_mongo(user_dict, response.authenticated_user)534            return response535        if auth_request.auth_token:536            self._reset_password(auth_request, user_id, user_auth_dict, user_dict)537            _parse_user_from_mongo(user_dict, response.authenticated_user)538            response.auth_token = token.create_token(user_id, 'auth')539            return response540        if not auth_request.hash_salt:541            # User exists but has not sent salt: probably just getting some fresh salt.542            return response543        # Check that salt is valid.544        salt = auth_request.hash_salt545        try:546            if not token.assert_valid_salt(salt, auth_request.email, instant):547                return response548        except ValueError as error:549            logging.info('Salt has not been generated by this server:\n %s', error)550            _abort_on_bad_auth()551        stored_hashed_password = user_auth_dict.get('hashedPassword', '')552        _check_password(stored_hashed_password, salt, auth_request.hashed_password)553        _parse_user_from_mongo(user_dict, response.authenticated_user)554        response.auth_token = token.create_token(user_id, 'auth')555        # Update the password.556        if auth_request.new_hashed_password:557            self._user_db.user_auth.replace_one(558                {'_id': user_object_id},559                {'hashedPassword': auth_request.new_hashed_password})560            response.is_password_updated = True561            user_dict['_id'] = user_id562            self.send_update_confirmation(user_dict)563        self._handle_returning_user(response)564        return response565    def _email_register(566            self, auth_request: auth_pb2.AuthRequest, response: auth_pb2.AuthResponse) \567            -> auth_pb2.AuthResponse:568        """Registers a new user using an email address."""569        is_existing_user = self._load_user_with_token(570            auth_request, response.authenticated_user, is_timestamp_required=False)571        force_update = False572        if not is_existing_user and auth_request.hashed_password:573            if not auth_request.first_name:574                flask.abort(422, i18n.flask_translate('Le champ first_name est nécessaire'))575        should_create_account = not is_existing_user and auth_request.hashed_password576        if is_existing_user or should_create_account:577            force_update |= response.authenticated_user.profile.email != auth_request.email578            response.authenticated_user.profile.email = auth_request.email579            response.authenticated_user.hashed_email = hash_user_email(auth_request.email)580            response.is_new_user = not response.authenticated_user.has_account581            force_update |= response.is_new_user582            response.authenticated_user.has_account = True583            force_update |= \584                response.authenticated_user.has_password != bool(auth_request.hashed_password)585            response.authenticated_user.has_password = bool(auth_request.hashed_password)586        if is_existing_user:587            self._handle_returning_user(response, force_update=force_update)588        elif should_create_account:589            response.authenticated_user.profile.name = auth_request.first_name590            response.authenticated_user.profile.last_name = auth_request.last_name591            self.save_new_user(response.authenticated_user, auth_request.user_data)592        if auth_request.hashed_password:593            object_id = objectid.ObjectId(response.authenticated_user.user_id)594            self._user_db.user_auth.replace_one(595                {'_id': object_id},596                {'hashedPassword': auth_request.hashed_password},597                upsert=True)598            response.is_password_updated = True599        response.is_new_user = not is_existing_user600        # TODO(cyrille): Consider dropping if there's no user_id.601        response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')602        return response603    def _reset_password(604            self, auth_request: auth_pb2.AuthRequest, user_id: str,605            user_auth_dict: dict[str, str], user_dict: dict[str, Any]) -> None:606        """Resets the user's password.607        The auth_request.auth_token here is the reset_token provided by create_reset_token.608        """609        try:610            is_token_valid = token.assert_valid_salt(611                auth_request.auth_token,612                auth_request.email + user_id + user_auth_dict.get('hashedPassword', ''),613                int(time.time()))614            if not is_token_valid:615                logging.info('Token in outdated.')616                raise ExpiredTokenException(_get_auth_error_message())617        except ValueError as error:618            logging.info('Token has not been generated by this server:\n %s', error)619            _abort_on_bad_auth()620        self._user_db.user_auth.replace_one(621            {'_id': objectid.ObjectId(user_id)},622            {'hashedPassword': auth_request.hashed_password})623        self.send_update_confirmation(user_dict)624    def _token_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:625        instant = int(time.time())626        response = auth_pb2.AuthResponse()627        response.hash_salt = token.timestamped_hash(instant, auth_request.email)628        self._load_user_with_token(auth_request, response.authenticated_user)629        if response.authenticated_user.HasField('deleted_at'):630            flask.abort(404, i18n.flask_translate('Compte supprimé'))631        response.auth_token = token.create_token(auth_request.user_id, 'auth')632        self._handle_returning_user(response)633        return response634    def _load_user_with_token(635            self, auth_request: auth_pb2.AuthRequest, out_user: user_pb2.User,636            is_timestamp_required: bool = True) -> bool:637        if not auth_request.user_id:638            return False639        try:640            user_id = objectid.ObjectId(auth_request.user_id)641        except bson.errors.InvalidId:642            flask.abort(643                400,644                i18n.flask_translate(645                    'L\'identifiant utilisateur "{user_id}" n\'a pas le bon format.',646                ).format(user_id=auth_request.user_id))647        try:648            if not token.assert_valid_salt(649                    auth_request.auth_token, str(user_id), int(time.time()),650                    validity_seconds=datetime.timedelta(days=5).total_seconds(),651                    role='auth') \652                    and is_timestamp_required:653                raise ExpiredTokenException(i18n.flask_translate("Token d'authentification périmé"))654        except ValueError as error:655            flask.abort(656                403,657                i18n.flask_translate("Le sel n'a pas été généré par ce serveur\xa0: {error}.")658                .format(error=error))659        user_dict = self._user_collection.find_one({'_id': user_id})660        if not user_dict:661            flask.abort(404, i18n.flask_translate('Utilisateur inconnu.'))662        _parse_user_from_mongo(user_dict, out_user)663        return True664    def _load_user_from_token_or_email(665            self, auth_request: auth_pb2.AuthRequest, user: user_pb2.User,666            email: Optional[str]) -> bool:667        is_existing_user = email and proto.parse_from_mongo(668            self._user_collection.find_one({'hashedEmail': hash_user_email(email)}),669            user, 'user_id') or \670            self._load_user_with_token(auth_request, user, is_timestamp_required=False)671        had_email = bool(user.profile.email)672        if not had_email and email:673            user.profile.email = email674        return is_existing_user675    def _create_guest_user(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:676        response = auth_pb2.AuthResponse()677        response.authenticated_user.profile.name = auth_request.first_name678        self.save_new_user(response.authenticated_user, auth_request.user_data)679        response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')680        response.is_new_user = True681        return response682    def create_reset_token(self, user_id: objectid.ObjectId) -> Tuple[Optional[str], str]:683        """Create a token to reset the user's password."""684        user_dict = self._user_collection.find_one({'_id': user_id})685        if not user_dict:686            return None, ''687        return self._create_reset_token_from_user(user_dict)688    def _create_reset_token_from_user(self, user_dict: dict[str, Any]) -> Tuple[Optional[str], str]:689        """Returns the reset token, and the linked email address."""690        email = user_dict.get('profile', {}).get('email', '')691        user_auth_dict = self._user_db.user_auth.find_one({'_id': user_dict['_id']})692        if not user_auth_dict or not user_auth_dict.get('hashedPassword'):693            return None, email694        hashed_old_password = user_auth_dict.get('hashedPassword', '')695        reset_token = token.timestamped_hash(696            int(time.time()), email + str(user_dict['_id']) + hashed_old_password)697        return reset_token, email698    def send_reset_password_token(self, email: str) -> None:699        """Sends an email to user with a reset token so that they can reset their password."""700        user_dict = self._user_collection.find_one({'hashedEmail': hash_user_email(email)})701        if not user_dict:702            # No user with this email address, however we don't want to tell that to a potential703            # attacker.704            return705        reset_token, unused_email = self._create_reset_token_from_user(user_dict)706        # User is a guest and/or doesn't have a password so we send them a email to login.707        if not reset_token:708            self.send_auth_token(user_dict)709            return710        user_profile = proto.create_from_mongo(711            user_dict.get('profile'), user_profile_pb2.UserProfile)712        reset_link = self._get_reset_password_link(user_dict)713        if not reset_link:714            return715        template_vars = {716            'firstname': user_profile.name,717            'productName': product.bob.name,718            'productLogoUrl': product.bob.get_config('productLogoUrl', ''),719            'resetLink': reset_link,720        }721        # TODO(cyrille): Create a static Campaign object and use it.722        result = mail_send.send_template('reset-password', user_profile, template_vars)723        if result.status_code != 200:724            logging.error('Failed to send an email with MailJet:\n %s', result.text)725            flask.abort(result.status_code)726    def send_auth_token(self, user_dict: dict[str, Any]) -> None:727        """Sends an email to the user with an auth token so that they can log in."""728        user_profile = proto.create_from_mongo(729            user_dict.get('profile'), user_profile_pb2.UserProfile)730        user_id = str(user_dict['_id'])731        auth_link = token.create_logged_url(user_id)732        template_vars = {733            'authLink': auth_link,734            'firstname': user_profile.name,735            'productName': product.bob.name,736            'productLogoUrl': product.bob.get_config('productLogoUrl', ''),737        }738        # TODO(cyrille): Create a static Campaign object and use it.739        result = mail_send.send_template('send-auth-token', user_profile, template_vars)740        if result.status_code != 200:741            logging.error('Failed to send an email with MailJet:\n %s', result.text)742            flask.abort(result.status_code)743    def _handle_returning_user(744            self, response: auth_pb2.AuthResponse, force_update: bool = False) -> None:745        response.last_access_at.CopyFrom(self._update_returning_user(746            response.authenticated_user,747            force_update=force_update or response.is_new_user))748def _get_oauth2_access_token(749        endpoint: str, code: str, client_id: str, client_secret: str, auth_name: str = 'OAuth2') \750        -> dict[str, str]:751    token_response = requests.post(752        endpoint,753        data=dict(754            grant_type='authorization_code',755            code=code,756            client_id=client_id,757            client_secret=client_secret,758            redirect_uri=flask.request.url_root,759        ))760    if token_response.status_code < 200 or token_response.status_code >= 400:761        try:762            json_error = typing.cast(dict[str, str], token_response.json())763        except request_exceptions.JSONDecodeError:764            json_error = {}765        if json_error and json_error.keys() == {'error', 'error_description'}:766            error_description = json_error['error_description']767            if json_error['error'] in ('redirect_uri_mismatch', 'invalid_redirect_uri'):768                error_description += f' "{flask.request.url_root}"'769            logging.warning('%s fails (%s): %s', auth_name, json_error['error'], error_description)770        else:771            logging.warning(772                '%s fails (%d): "%s"',773                auth_name, token_response.status_code, token_response.text)774        flask.abort(403, token_response.text)...test_auth.py
Source:test_auth.py  
...28@pytest.fixture29def user(username="testuser", **kw):30    return User.objects.get_or_create(username=username, **kw)[0]31@pytest.fixture32def auth_request():33    r = create_request("GET", "/")34    r.user = user()35    return r36@pytest.fixture37def super_request():38    r = create_request("GET", "/")39    r.user = superuser()40    return r41from wheelcms_axle.workflows.default import Workflow42@pytest.mark.usefixtures("localtyperegistry")43class TestAssignments(object):44    type = Type1Type45    def setup(self):46        ## Make sure there are no workflow permission updates...aws_v4.py
Source:aws_v4.py  
1"""p2 s3 authentication mixin"""2import hashlib3import hmac4from typing import Any, List, Optional5from urllib.parse import quote6from django.contrib.auth.models import User7from django.http import HttpRequest, QueryDict8from structlog import get_logger9from p2.api.models import APIKey10from p2.lib.hash import chunked_hasher11from p2.s3.auth.base import BaseAuth12from p2.s3.errors import (AWSAccessDenied, AWSContentSignatureMismatch,13                          AWSSignatureMismatch)14LOGGER = get_logger()15UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'16class SignatureMismatch(Exception):17    """Exception raised when given Hash does not match request body's hash"""18# pylint: disable=too-many-instance-attributes19class AWSv4AuthenticationRequest:20    """Holds all pieces of an AWSv4 Authenticated Request"""21    algorithm: str = ""22    signed_headers: str = ""23    signature: str = ""24    access_key: str = ""25    date: str = ""26    date_long: str = ""27    region: str = ""28    service: str = ""29    request: str = ""30    hash: str = ""31    def __init__(self):32        self.algorithm = self.date = self.signed_headers = self.signature = self.hash = ""33        self.access_key = self.date_long = self.region = self.service = self.request = ""34    @property35    def credentials(self) -> str:36        """Join properties together to re-construct credential string"""37        return "/".join([38            self.date,39            self.region,40            self.service,41            self.request42        ])43    @credentials.setter44    def credentials(self, value: str):45        # Further split credential value46        self.access_key, self.date, self.region, self.service, self.request = value.split('/')47    @staticmethod48    def from_querystring(get_dict: QueryDict) -> Optional['AWSv4AuthenticationRequest']:49        """Check if AWSv4 Authentication information was sent via Querystring,50        abd parse it into an AWSv4AuthenticationRequest object. If querystring doesn't51        contain necessary parameters, None is returned."""52        required_parameters = ['X-Amz-Date', 'X-Amz-Credential',53                               'X-Amz-SignedHeaders', 'X-Amz-Signature']54        for required_parameter in required_parameters:55            if required_parameter not in get_dict:56                return None57        auth_request = AWSv4AuthenticationRequest()58        auth_request.credentials = get_dict.get('X-Amz-Credential')59        auth_request.signed_headers = get_dict.get('X-Amz-SignedHeaders')60        auth_request.date_long = get_dict.get('X-Amz-Date')61        auth_request.signature = get_dict.get('X-Amz-Signature')62        return auth_request63    @staticmethod64    def from_header(headers: dict) -> Optional['AWSv4AuthenticationRequest']:65        """Check if AWSv4 Authentication information was sent via headers,66        and parse it into an AWSv4AuthenticationRequest object. If headers don't67        contain necessary information, None is returned."""68        # Check if headers exist, otherwise return None69        if 'HTTP_AUTHORIZATION' not in headers:70            return None71        auth_request = AWSv4AuthenticationRequest()72        auth_request.algorithm, credential_container = \73            headers.get('HTTP_AUTHORIZATION').split(' ', 1)74        credential, signed_headers, signature = credential_container.split(',')75        # Remove "Credential=" from string76        _, auth_request.credentials = credential.split("=")77        _, auth_request.signed_headers = signed_headers.split("=")78        _, auth_request.signature = signature.split("=")79        auth_request.date_long = headers.get('HTTP_X_AMZ_DATE')80        if not auth_request.date_long:81            auth_request.date_long = auth_request.date82        return auth_request83class AWSV4Authentication(BaseAuth):84    """AWS v4 Signer"""85    # Key derivation functions. See:86    # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python87    def _sign(self, key: str, msg: str) -> hmac.HMAC:88        """Simple HMAC wrapper"""89        return hmac.new(key, msg.encode('utf-8'), hashlib.sha256)90    def _get_signature_key(self, key: str, auth_request: AWSv4AuthenticationRequest) -> str:91        """Create signature like92        https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html"""93        k_date = self._sign(('AWS4' + key).encode('utf-8'), auth_request.date).digest()94        k_region = self._sign(k_date, auth_request.region).digest()95        k_service = self._sign(k_region, auth_request.service).digest()96        k_signing = self._sign(k_service, auth_request.request).digest()97        return k_signing98    def _make_query_string(self) -> str:99        """Parse existing Querystring, URI-encode them and sort them and put them back together"""100        pairs = []101        if self.request.META['QUERY_STRING'] == '':102            return self.request.META['QUERY_STRING']103        for kv_pair in self.request.META['QUERY_STRING'].split('&'):104            if '=' not in kv_pair:105                kv_pair = kv_pair + '='106            pairs.append(kv_pair)107        pairs.sort()108        return '&'.join(pairs)109    def _get_canonical_headers(self, only: List[str]) -> str:110        """Fix header keys from HTTP_X to x"""111        canonical_headers = ""112        def sorter(item):113            """Remove HTTP_ prefix, replace underscores with hyphens114            and lowercase convert to lowercase for comparison"""115            return item[0].replace('HTTP_', '', 1).replace('_', '-').lower()116        for header_key, header_value in sorted(self.request.META.items(), key=sorter):117            fixed_key = header_key.replace('HTTP_', '', 1).replace('_', '-').lower()118            if fixed_key in only:119                canonical_headers += f"{fixed_key}:{header_value}\n"120        return canonical_headers121    def _get_sha256(self, data: Any) -> str:122        """Get body hash in sha256"""123        hasher = hashlib.sha256()124        hasher.update(data)125        return hasher.hexdigest()126    def _get_canonical_request(self, auth_request: AWSv4AuthenticationRequest) -> str:127        """Create canonical request in AWS format (128        https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html)"""129        signed_headers_keys = auth_request.signed_headers.split(';')130        canonical_request = [131            self.request.META.get('REQUEST_METHOD', ''),132            quote(self.request.META.get('PATH_INFO', '')),133            self._make_query_string(),134            self._get_canonical_headers(signed_headers_keys),135            auth_request.signed_headers,136            auth_request.hash,137        ]138        return '\n'.join(canonical_request)139    def _lookup_access_key(self, access_key: str) -> Optional[APIKey]:140        """Lookup access_key in database, return secret if found otherwise None"""141        keys = APIKey.objects.filter(access_key=access_key)142        if keys.exists():143            return keys.first()144        return None145    @staticmethod146    def can_handle(request: HttpRequest) -> bool:147        if 'HTTP_AUTHORIZATION' in request.META:148            return 'AWS4-HMAC-SHA256' in request.META['HTTP_AUTHORIZATION']149        if 'X-Amz-Signature' in request.GET:150            return True151        return False152    def verify_content_sha256(self, auth_request: AWSv4AuthenticationRequest):153        """Verify X-Amz-Content-Sha256 Header, if sent"""154        # Header not set -> Empty hash, no checking155        if not auth_request.hash:156            auth_request.hash = ''157            return158        # Client has not calculated SHA256 of payload, no checking159        if auth_request.hash == UNSIGNED_PAYLOAD:160            return161        # Read request body chunk by chunk, compute sha256 and compare.162        request_body_hash = chunked_hasher(hashlib.sha256(), self.request)163        if auth_request.hash != request_body_hash:164            LOGGER.warning("CONTENT_SHA256 Header/param incorrect",165                           theirs=auth_request.hash,166                           ours=request_body_hash)167            raise AWSContentSignatureMismatch168    def validate(self) -> Optional[User]:169        """Check Authorization Header in AWS Compatible format"""170        auth_request = AWSv4AuthenticationRequest.from_header(self.request.META)171        if not auth_request:172            auth_request = AWSv4AuthenticationRequest.from_querystring(self.request.GET)173        auth_request.hash = self.request.META.get('HTTP_X_AMZ_CONTENT_SHA256')174        # Verify given Hash with request body175        self.verify_content_sha256(auth_request)176        # Build our own signature to compare177        secret_key = self._lookup_access_key(auth_request.access_key)178        if not secret_key:179            LOGGER.warning("No secret key found for request", access_key=auth_request.access_key)180            raise AWSAccessDenied181        signing_key = self._get_signature_key(secret_key.secret_key, auth_request)182        canonical_request = self._get_canonical_request(auth_request)183        string_to_sign = '\n'.join([184            auth_request.algorithm,185            auth_request.date_long,186            auth_request.credentials,187            self._get_sha256(canonical_request.encode('utf-8')),188        ])189        our_signature = self._sign(signing_key, string_to_sign).hexdigest()190        if auth_request.signature != our_signature:191            LOGGER.warning("Canonical Request", canonical_request=canonical_request)192            LOGGER.warning("Signatures", theirs=auth_request.signature,193                           ours=our_signature)194            raise AWSSignatureMismatch...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
