How to use auth_request method in tempest

Best Python code snippet using tempest_python

auth.py

Source:auth.py Github

copy

Full Screen

1"""Authentication module for MyGamePlan."""2import binascii3import datetime4import functools5import hashlib6import logging7import os8import re9import time10import typing11from typing import Any, Callable, NoReturn, Optional, Tuple, TypedDict, Union12from urllib import parse13import bson14from bson import objectid15import flask16from google.protobuf import json_format17from google.protobuf import timestamp_pb218import requests19from requests import exceptions as request_exceptions20import werkzeug21from werkzeug import exceptions22from werkzeug import http23from bob_emploi.common.python import now24from bob_emploi.frontend.server import auth_token as token25from bob_emploi.frontend.server import french26from bob_emploi.frontend.server import i18n27from bob_emploi.frontend.server import mongo28from bob_emploi.frontend.server import privacy29from bob_emploi.frontend.server import product30from bob_emploi.frontend.server import proto31from bob_emploi.frontend.server.mail import campaign32from bob_emploi.frontend.server.mail import mail_send33from bob_emploi.frontend.api import auth_pb234from bob_emploi.frontend.api import diagnostic_pb235from bob_emploi.frontend.api import features_pb236from bob_emploi.frontend.api import user_pb237from bob_emploi.frontend.api import user_profile_pb238if typing.TYPE_CHECKING:39 class _UpdateReturningUserFuncType(typing.Protocol):40 def __call__(41 self, user: user_pb2.User, /, force_update: bool = ...,42 ) -> timestamp_pb2.Timestamp:43 ...44_LinkedInEmailResponse = TypedDict(45 '_LinkedInEmailResponse', {46 'handle~': dict[str, str],47 }, total=False)48_EMPLOI_STORE_CLIENT_ID = os.getenv('EMPLOI_STORE_CLIENT_ID')49_EMPLOI_STORE_CLIENT_SECRET = os.getenv('EMPLOI_STORE_CLIENT_SECRET')50# https://www.linkedin.com/developer/apps/4800174/auth51_LINKED_IN_CLIENT_ID = os.getenv('LINKED_IN_CLIENT_ID', '86r4xh5py0mw9k')52_LINKED_IN_CLIENT_SECRET = os.getenv('LINKED_IN_CLIENT_SECRET', 'fake-key-fake-key')53_PE_CONNECT_GENDER = {54 'female': user_profile_pb2.FEMININE,55 'male': user_profile_pb2.MASCULINE,56}57_AUTH_FIELDS = {58 'googleId': 'Google',59 'facebookId': 'Facebook',60 'peConnectId': 'Pôle emploi',61 'linkedInId': 'LinkedIn',62}63_AUTH_TOKEN_COOKIE_NAME = 'auth_token'64_AUTH_TOKEN_PERSISTENCE_DURATION = datetime.timedelta(days=30)65http.HTTP_STATUS_CODES[498] = 'Authentication token expired'66_MA_VOIE_API_URL = os.getenv('MA_VOIE_API_URL')67_MA_VOIE_AUTH = tuple(os.getenv('MA_VOIE_AUTH', ':').split(':', 1))68# TODO(cyrille): Add the env variable to the cloudformation template.69_ENABLE_ACTION_PLAN = bool(os.getenv('ENABLE_ACTION_PLAN', os.getenv('BOB_DEPLOYMENT') == 'fr'))70def _register_to_ma_voie(ma_voie: user_pb2.MaVoieInfo) -> None:71 if not _MA_VOIE_API_URL or not _MA_VOIE_AUTH[0] or not _MA_VOIE_AUTH[1]:72 logging.warning(73 'Ma Voie registration is not well configured:\nURL "%s"\nusername "%s"\npassword %s',74 _MA_VOIE_API_URL, _MA_VOIE_AUTH[0], '"****"' if _MA_VOIE_AUTH[1] else 'not set')75 return76 response = requests.post(77 f'{_MA_VOIE_API_URL}/user/{ma_voie.ma_voie_id}/register',78 auth=_MA_VOIE_AUTH, json={'stepId': ma_voie.step_id})79 try:80 response.raise_for_status()81 except request_exceptions.HTTPError:82 logging.error("Couln't register user to Ma Voie.")83class ExpiredTokenException(exceptions.HTTPException):84 """Exception class for expired authentication tokens."""85 code = 49886 description = '<p>The authentication token has expired.</p>'87def require_admin(func: Callable[..., Any]) -> Callable[..., Any]:88 """Decorator for a function that requires admin authorization."""89 def _decorated_fun(*args: Any, **kwargs: Any) -> Any:90 request_token = flask.request.headers.get('Authorization', '')91 try:92 is_valid_token = token.check_admin_token(request_token)93 except ValueError:94 is_valid_token = False95 if not is_valid_token:96 # TODO(cyrille): Raise on empty ADMIN_AUTH_TOKEN and check this before.97 if not request_token:98 flask.abort(401)99 flask.abort(403)100 return func(*args, **kwargs)101 return functools.wraps(func)(_decorated_fun)102# TODO(cyrille): Use typing_extension to ensure get_user_id has the same param103# as the function that is returned.104def require_user(get_user_id: Callable[..., str], role: str = 'auth') \105 -> Callable[[Callable[..., Any]], Any]:106 """Check if authenticated user has a valid token in Authorization header."""107 def _decorator(func: Callable[..., Any]) -> Any:108 def _decorated_fun(*args: Any, **kwargs: Any) -> Any:109 auth_token = flask.request.headers.get('Authorization', '').replace('Bearer ', '')110 if not auth_token:111 auth_token = flask.request.args.get('token', '')112 if not auth_token:113 auth_token = flask.request.cookies.get(_AUTH_TOKEN_COOKIE_NAME, '')114 if not auth_token:115 flask.abort(401, i18n.flask_translate('Token manquant'))116 user_id = get_user_id(*args, **kwargs)117 try:118 token.check_token(user_id, auth_token, role=role)119 except ValueError:120 flask.abort(403, i18n.flask_translate('Unauthorized token'))121 return func(*args, **kwargs)122 return functools.wraps(func)(_decorated_fun)123 return _decorator124def _set_auth_cookie(125 response: flask.Response, *, value: str, expires_delta: Optional[float]) -> None:126 response.set_cookie(127 _AUTH_TOKEN_COOKIE_NAME, value=value, path='/api/', secure=True, httponly=True,128 expires=(int(time.time() - expires_delta)) if expires_delta else None)129_FlaskResponse = Union[str, werkzeug.Response, tuple[str, int]]130def clear_user_cookies(func: Callable[..., _FlaskResponse]) -> Callable[..., flask.Response]:131 """Decorator to clear a user auth cookies when returning."""132 def _decorated_fun(*args: Any, **kwargs: Any) -> flask.Response:133 response = func(*args, **kwargs)134 flask_response = flask.make_response(response)135 _set_auth_cookie(flask_response, value='', expires_delta=-3600)136 return flask_response137 return functools.wraps(func)(_decorated_fun)138def set_auth_cookie(139 response: flask.Response, auth_token: str, is_persistent: bool = False) -> None:140 """Sets the auth cookie for future re-authentication."""141 _set_auth_cookie(142 response, value=auth_token,143 expires_delta=_AUTH_TOKEN_PERSISTENCE_DURATION.total_seconds() if is_persistent else None)144def require_user_in_args(role: str = 'auth') -> Callable[[Callable[..., Any]], Callable[..., Any]]:145 """Check if authenticated user has a valid token in request GET args."""146 def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:147 def _decorated_fun(*args: Any, **kwargs: Any) -> Any:148 auth_token = flask.request.args.get('token')149 user_id = flask.request.args.get('user')150 if not user_id or not auth_token:151 flask.abort(422, i18n.flask_translate('Paramètres manquants.'))152 try:153 token.check_token(user_id, auth_token, role=role)154 except ValueError:155 flask.abort(403, i18n.flask_translate('Accès non autorisé.'))156 return func(*args, **dict(kwargs, user_id=user_id))157 return functools.wraps(func)(_decorated_fun)158 return _decorator159def require_google_user(160 emails_pattern: str = '@bayesimpact.org', email_kwarg: Optional[str] = None) \161 -> Callable[[Callable[..., Any]], Callable[..., Any]]:162 """Check if authenticated user has a valid google id token163 in Authorization header, and associated google account is from164 bayesimpact.org domain.165 Args:166 emails_pattern: the regex pattern that email should validate to be authorized.167 email_kwarg: if set, the decorated function will get an extra keyword arg with168 the actual email address. If an admin token is used, the forwarded email address is169 'admin@bayesimpact.org'.170 """171 emails_regexp = re.compile(emails_pattern)172 def _get_google_email(authorization: str) -> str:173 try:174 token.check_admin_token(authorization)175 return 'admin@bayesimpact.org'176 except ValueError:177 pass178 if authorization.startswith('Bearer '):179 authorization = authorization.removeprefix('Bearer ')180 else:181 authorization = flask.request.args.get('token', '')182 if not authorization:183 flask.abort(401, i18n.flask_translate('Token manquant'))184 try:185 return token.check_google_token(authorization, emails_regexp)186 except i18n.TranslatableException as error:187 flask.abort(401, error.flask_translate())188 def _decorator(wrapped: Callable[..., Any]) -> Callable[..., Any]:189 @functools.wraps(wrapped)190 def _wrapper(*args: Any, **kwargs: Any) -> Any:191 email = _get_google_email(flask.request.headers.get('Authorization', ''))192 if email_kwarg:193 kwargs = kwargs | {email_kwarg: email}194 return wrapped(*args, **kwargs)195 return _wrapper196 return _decorator197def hash_user_email(email: str) -> str:198 """Hash email for better obfuscation of personal data."""199 hashed_email = hashlib.sha1()200 hashed_email.update(b'bob-emploi')201 hashed_email.update(email.lower().encode('utf-8'))202 return hashed_email.hexdigest()203def delete_user(user_proto: user_pb2.User, user_database: mongo.UsersDatabase) -> bool:204 """Close a user's account.205 We assume the given user_proto is up-to-date, e.g. just being fetched from database.206 """207 try:208 user_id = objectid.ObjectId(user_proto.user_id)209 except bson.errors.InvalidId:210 logging.exception('Tried to delete a user with invalid ID "%s"', user_proto.user_id)211 return False212 filter_user = {'_id': user_id}213 # Remove authentication informations.214 user_database.user_auth.delete_one(filter_user)215 try:216 privacy.redact_proto(user_proto)217 except TypeError:218 logging.exception('Cannot delete account %s', str(user_id))219 return False220 user_proto.deleted_at.FromDatetime(now.get())221 user_proto.ClearField('user_id')222 user_database.user.replace_one(filter_user, json_format.MessageToDict(user_proto))223 return True224def _parse_user_from_mongo(user_dict: dict[str, Any], user: user_pb2.User) -> None:225 if not proto.parse_from_mongo(user_dict, user, 'user_id'):226 flask.abort(227 500,228 i18n.flask_translate(229 'Les données utilisateur sont corrompues dans la base de données.'))230def _abort_failed_login() -> None:231 flask.abort(232 403,233 i18n.flask_translate(234 "L'email et le mot de passe ne correspondent pas. " +235 "Si vous avez déjà créé un compte mais que vous n'avez pas créé votre mot de passe, " +236 'nous venons de vous envoyer un email pour vous connecter.'))237def _check_password(stored_hashed_password: str, salt: str, request_hashed_password: str) -> None:238 hashed_password = hashlib.sha1()239 hashed_password.update(salt.encode('ascii'))240 hashed_password.update(stored_hashed_password.encode('ascii'))241 request_hashed_password_bin = binascii.unhexlify(request_hashed_password)242 if request_hashed_password_bin != hashed_password.digest():243 _abort_failed_login()244def _get_auth_error_message() -> str:245 return i18n.flask_translate("Les informations d'authentification ne sont pas valides.")246def _abort_on_bad_auth() -> NoReturn:247 flask.abort(403, _get_auth_error_message())248class Authenticator:249 """An object to authenticate requests."""250 def __init__(251 self, user_db: mongo.UsersDatabase,252 save_new_user: Callable[[user_pb2.User], user_pb2.User],253 update_returning_user: '_UpdateReturningUserFuncType') -> None:254 self._user_db = user_db255 self._save_new_user = save_new_user256 self._update_returning_user = update_returning_user257 self._user_collection = self._user_db.user258 def save_new_user(259 self, user: user_pb2.User, user_data: auth_pb2.AuthUserData) -> user_pb2.User:260 """Save a user with additional data."""261 user.profile.locale = user_data.locale262 user.features_enabled.alpha = user_data.is_alpha263 user.features_enabled.exclude_from_analytics = user_data.is_alpha264 if _ENABLE_ACTION_PLAN or user_data.is_action_plan_enabled:265 user.features_enabled.action_plan = features_pb2.ACTIVE266 if user_data.HasField('ma_voie'):267 user.ma_voie.CopyFrom(user_data.ma_voie)268 _register_to_ma_voie(user_data.ma_voie)269 if user_data.HasField('original_self_diagnostic'):270 original_self_diagnostic = diagnostic_pb2.SelfDiagnostic()271 original_self_diagnostic.CopyFrom(user_data.original_self_diagnostic)272 user.projects.add(is_incomplete=True, original_self_diagnostic=original_self_diagnostic)273 return self._save_new_user(user)274 def authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:275 """Authenticate a user."""276 if auth_request.google_token_id:277 return self._google_authenticate(auth_request)278 if auth_request.facebook_access_token:279 return self._facebook_authenticate(auth_request)280 if auth_request.pe_connect_code:281 return self._pe_connect_authenticate(auth_request)282 if auth_request.linked_in_code:283 return self._linked_in_authenticate(auth_request)284 if auth_request.email:285 return self._email_authenticate(auth_request)286 if auth_request.user_id:287 return self._token_authenticate(auth_request)288 # Create a guest user.289 if auth_request.first_name:290 return self._create_guest_user(auth_request)291 logging.warning('No mean of authentication found:\n%s', auth_request)292 flask.abort(422, i18n.flask_translate("Aucun moyen d'authentification n'a été trouvé."))293 def change_email(self, user_proto: user_pb2.User, auth_request: auth_pb2.AuthRequest) \294 -> user_pb2.User:295 """Change user's email address."""296 new_hashed_email = hash_user_email(auth_request.email)297 if user_proto.hashed_email == new_hashed_email:298 # Trying to set the same email.299 return user_proto300 user_auth_dict = self._user_db.user_auth.find_one(301 {'_id': objectid.ObjectId(user_proto.user_id)})302 if user_auth_dict or auth_request.hashed_password:303 if not user_auth_dict:304 flask.abort(305 422, i18n.flask_translate("L'utilisateur n'a pas encore de mot de passe"))306 stored_hashed_password = user_auth_dict.get('hashedPassword', '')307 _check_password(308 stored_hashed_password, auth_request.hash_salt, auth_request.hashed_password)309 existing = self._user_collection.find_one({'hashedEmail': new_hashed_email}, {'_id': 1})310 if existing:311 flask.abort(312 403,313 i18n.flask_translate('L\'email "{email}" est déjà utilisé par un autre compte')314 .format(email=auth_request.email))315 user_proto.profile.email = auth_request.email316 user_proto.hashed_email = new_hashed_email317 if user_auth_dict:318 self._user_db.user_auth.replace_one(319 {'_id': objectid.ObjectId(user_proto.user_id)},320 {'hashedPassword': auth_request.new_hashed_password})321 self._update_returning_user(user_proto, force_update=True)322 return user_proto323 def _google_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:324 try:325 id_info = token.decode_google_id_token(auth_request.google_token_id)326 except i18n.TranslatableException as error:327 flask.abort(401, error.flask_translate())328 response = auth_pb2.AuthResponse()329 user_dict = self._user_collection.find_one({'googleId': id_info['sub']})330 if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):331 self._handle_returning_user(response)332 else:333 is_existing_user = self._load_user_from_token_or_email(334 auth_request, response.authenticated_user, id_info['email'])335 response.authenticated_user.profile.picture_url = id_info.get('picture', '')336 response.authenticated_user.google_id = id_info['sub']337 response.is_new_user = not response.authenticated_user.has_account338 response.authenticated_user.has_account = True339 if is_existing_user:340 self._handle_returning_user(response, force_update=True)341 else:342 self.save_new_user(response.authenticated_user, auth_request.user_data)343 response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')344 return response345 def _facebook_authenticate(346 self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:347 user_info_response = requests.get('https://graph.facebook.com/v4.0/me', params=dict(348 fields='id,first_name,email',349 access_token=auth_request.facebook_access_token,350 ))351 if user_info_response.status_code < 200 or user_info_response.status_code >= 300:352 flask.abort(353 user_info_response.status_code,354 user_info_response.json().get('error', {}).get('message', ''))355 user_info = typing.cast(dict[str, str], user_info_response.json())356 response = auth_pb2.AuthResponse()357 user_dict = self._user_collection.find_one({'facebookId': user_info['id']})358 if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):359 self._handle_returning_user(response)360 else:361 is_existing_user = self._load_user_from_token_or_email(362 auth_request, response.authenticated_user, user_info.get('email'))363 response.authenticated_user.facebook_id = user_info['id']364 response.is_new_user = not response.authenticated_user.has_account365 response.authenticated_user.has_account = True366 if is_existing_user:367 self._handle_returning_user(response, force_update=True)368 else:369 response.authenticated_user.profile.name = user_info.get('first_name', '')370 self.save_new_user(response.authenticated_user, auth_request.user_data)371 response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')372 return response373 def _pe_connect_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:374 token_data = _get_oauth2_access_token(375 'https://authentification-candidat.pole-emploi.fr/connexion/oauth2/access_token?'376 'realm=/individu',377 code=auth_request.pe_connect_code,378 client_id=_EMPLOI_STORE_CLIENT_ID or '',379 client_secret=_EMPLOI_STORE_CLIENT_SECRET or '',380 auth_name='PE Connect',381 )382 if token_data.get('nonce') != auth_request.pe_connect_nonce:383 flask.abort(403, i18n.flask_translate('Mauvais paramètre nonce'))384 bearer = token_data.get('token_type', 'Bearer')385 access_token = token_data.get('access_token', '')386 authorization_header = f'{bearer} {access_token}'387 user_info_response = requests.get(388 'https://api.emploi-store.fr/partenaire/peconnect-individu/v1/userinfo',389 headers={'Authorization': authorization_header})390 if user_info_response.status_code < 200 or user_info_response.status_code >= 400:391 logging.warning(392 'PE Connect fails (%d): "%s"', user_info_response.status_code,393 user_info_response.text)394 flask.abort(403, user_info_response.text)395 user_info = typing.cast(dict[str, str], user_info_response.json())396 response = auth_pb2.AuthResponse()397 user_dict = self._user_collection.find_one({'peConnectId': user_info['sub']})398 if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):399 self._handle_returning_user(response)400 else:401 user = response.authenticated_user402 is_existing_user = self._load_user_from_token_or_email(403 auth_request, user, user_info.get('email'))404 user.pe_connect_id = user_info['sub']405 response.is_new_user = force_update = not user.has_account406 user.has_account = True407 if is_existing_user:408 self._handle_returning_user(response, force_update=force_update)409 else:410 # TODO(pascal): Handle the case where one of the name is missing.411 user.profile.name = french.cleanup_firstname(user_info.get('given_name', ''))412 user.profile.last_name = french.cleanup_firstname(user_info.get('family_name', ''))413 user.profile.gender = _PE_CONNECT_GENDER.get(414 user_info.get('gender', ''), user_profile_pb2.UNKNOWN_GENDER)415 self.save_new_user(user, auth_request.user_data)416 response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')417 return response418 def _linked_in_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:419 token_data = _get_oauth2_access_token(420 'https://www.linkedin.com/oauth/v2/accessToken',421 code=auth_request.linked_in_code,422 client_id=_LINKED_IN_CLIENT_ID or '',423 client_secret=_LINKED_IN_CLIENT_SECRET or '',424 auth_name='LinkedIn Auth',425 )426 bearer = token_data.get('token_type', 'Bearer')427 access_token = token_data.get('access_token', '')428 authorization_header = f'{bearer} {access_token}'429 user_info_response = requests.get(430 'https://api.linkedin.com/v2/me',431 headers={'Authorization': authorization_header})432 if user_info_response.status_code < 200 or user_info_response.status_code >= 400:433 logging.warning(434 'LinkedIn Auth fails (%d): "%s"', user_info_response.status_code,435 user_info_response.text)436 flask.abort(403, user_info_response.text)437 user_info = typing.cast(dict[str, str], user_info_response.json())438 response = auth_pb2.AuthResponse()439 # TODO(cyrille): Factorize with other 3rd party auth.440 user_dict = self._user_collection.find_one({'linkedInId': user_info['id']})441 if proto.parse_from_mongo(user_dict, response.authenticated_user, 'user_id'):442 self._handle_returning_user(response)443 else:444 email_response = requests.get(445 'https://api.linkedin.com/v2/emailAddress?q=members&projection=(elements*(handle~))',446 headers={'Authorization': authorization_header})447 if email_response.status_code < 200 or email_response.status_code >= 400:448 logging.warning(449 'LinkedIn Auth fails (%d): "%s"', email_response.status_code,450 email_response.text)451 flask.abort(403, email_response.text)452 email_response_json = typing.cast(_LinkedInEmailResponse, email_response.json())453 email = email_response_json.get('handle~', {}).get('emailAddress')454 user = response.authenticated_user455 is_existing_user = self._load_user_from_token_or_email(auth_request, user, email)456 user.linked_in_id = user_info['id']457 response.is_new_user = not user.has_account458 user.has_account = True459 if is_existing_user:460 self._handle_returning_user(response, force_update=True)461 else:462 # TODO(pascal): Handle the case where one of the name is missing.463 user.profile.name = user_info.get('localizedFirstName', '')464 user.profile.last_name = user_info.get('localizedLastName', '')465 self.save_new_user(user, auth_request.user_data)466 response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')467 return response468 def _get_reset_password_link(self, user_dict: dict[str, Any]) -> str:469 auth_token, email = self._create_reset_token_from_user(user_dict)470 return parse.urljoin(flask.request.url_root, '?' + parse.urlencode({471 'email': email,472 'resetToken': auth_token}))473 def send_update_confirmation(self, user_dict: dict[str, Any]) -> None:474 """Sends an email to the user that confirms password change."""475 user_id = str(user_dict['_id'])476 if not user_id:477 return478 auth_link = token.create_logged_url(user_id)479 reset_link = self._get_reset_password_link(user_dict)480 if not reset_link or not auth_link:481 return482 user = proto.create_from_mongo(user_dict.copy(), user_pb2.User)483 template_vars = dict(484 campaign.get_default_coaching_email_vars(user),485 authLink=auth_link, resetPwdLink=reset_link)486 # TODO(cyrille): Create a static Campaign object and use it.487 result = mail_send.send_template(488 'send-pwd-update-confirmation', user.profile, template_vars)489 if result.status_code != 200:490 logging.error('Failed to send an email with MailJet:\n %s', result.text)491 flask.abort(result.status_code)492 def _email_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:493 instant = int(time.time())494 response = auth_pb2.AuthResponse()495 response.hash_salt = token.timestamped_hash(instant, auth_request.email)496 user_dict = self._user_collection.find_one(497 {'hashedEmail': hash_user_email(auth_request.email)})498 if not user_dict:499 return self._email_register(auth_request, response)500 user_object_id = user_dict['_id']501 user_id = str(user_object_id)502 user_auth_dict = self._user_db.user_auth.find_one({'_id': user_object_id})503 if not auth_request.hashed_password:504 # User exists but did not send a password: probably just getting some fresh salt.505 return response506 if not user_auth_dict:507 if not auth_request.auth_token:508 # User is trying to connect with a password, but never created one.509 self.send_auth_token(user_dict)510 _abort_failed_login()511 try:512 token.check_token(user_id, auth_request.auth_token, role='auth')513 except ValueError as error:514 logging.info('Invalid token:\n %s', error)515 _abort_on_bad_auth()516 # User that already uses an SSO account is now trying to add a password.517 _parse_user_from_mongo(user_dict, response.authenticated_user)518 self._user_db.user_auth.insert_one({519 '_id': user_object_id,520 'hashedPassword': auth_request.hashed_password,521 })522 response.auth_token = token.create_token(user_id, 'auth')523 response.authenticated_user.has_account = True524 response.authenticated_user.has_password = True525 response.is_password_updated = True526 self._handle_returning_user(response, force_update=True)527 return response528 if auth_request.user_id:529 # User is a guest using a pre-existing email account:530 # maybe they're using the same password.531 if auth_request.hashed_password != user_auth_dict.get('hashedPassword', ''):532 return response533 _parse_user_from_mongo(user_dict, response.authenticated_user)534 return response535 if auth_request.auth_token:536 self._reset_password(auth_request, user_id, user_auth_dict, user_dict)537 _parse_user_from_mongo(user_dict, response.authenticated_user)538 response.auth_token = token.create_token(user_id, 'auth')539 return response540 if not auth_request.hash_salt:541 # User exists but has not sent salt: probably just getting some fresh salt.542 return response543 # Check that salt is valid.544 salt = auth_request.hash_salt545 try:546 if not token.assert_valid_salt(salt, auth_request.email, instant):547 return response548 except ValueError as error:549 logging.info('Salt has not been generated by this server:\n %s', error)550 _abort_on_bad_auth()551 stored_hashed_password = user_auth_dict.get('hashedPassword', '')552 _check_password(stored_hashed_password, salt, auth_request.hashed_password)553 _parse_user_from_mongo(user_dict, response.authenticated_user)554 response.auth_token = token.create_token(user_id, 'auth')555 # Update the password.556 if auth_request.new_hashed_password:557 self._user_db.user_auth.replace_one(558 {'_id': user_object_id},559 {'hashedPassword': auth_request.new_hashed_password})560 response.is_password_updated = True561 user_dict['_id'] = user_id562 self.send_update_confirmation(user_dict)563 self._handle_returning_user(response)564 return response565 def _email_register(566 self, auth_request: auth_pb2.AuthRequest, response: auth_pb2.AuthResponse) \567 -> auth_pb2.AuthResponse:568 """Registers a new user using an email address."""569 is_existing_user = self._load_user_with_token(570 auth_request, response.authenticated_user, is_timestamp_required=False)571 force_update = False572 if not is_existing_user and auth_request.hashed_password:573 if not auth_request.first_name:574 flask.abort(422, i18n.flask_translate('Le champ first_name est nécessaire'))575 should_create_account = not is_existing_user and auth_request.hashed_password576 if is_existing_user or should_create_account:577 force_update |= response.authenticated_user.profile.email != auth_request.email578 response.authenticated_user.profile.email = auth_request.email579 response.authenticated_user.hashed_email = hash_user_email(auth_request.email)580 response.is_new_user = not response.authenticated_user.has_account581 force_update |= response.is_new_user582 response.authenticated_user.has_account = True583 force_update |= \584 response.authenticated_user.has_password != bool(auth_request.hashed_password)585 response.authenticated_user.has_password = bool(auth_request.hashed_password)586 if is_existing_user:587 self._handle_returning_user(response, force_update=force_update)588 elif should_create_account:589 response.authenticated_user.profile.name = auth_request.first_name590 response.authenticated_user.profile.last_name = auth_request.last_name591 self.save_new_user(response.authenticated_user, auth_request.user_data)592 if auth_request.hashed_password:593 object_id = objectid.ObjectId(response.authenticated_user.user_id)594 self._user_db.user_auth.replace_one(595 {'_id': object_id},596 {'hashedPassword': auth_request.hashed_password},597 upsert=True)598 response.is_password_updated = True599 response.is_new_user = not is_existing_user600 # TODO(cyrille): Consider dropping if there's no user_id.601 response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')602 return response603 def _reset_password(604 self, auth_request: auth_pb2.AuthRequest, user_id: str,605 user_auth_dict: dict[str, str], user_dict: dict[str, Any]) -> None:606 """Resets the user's password.607 The auth_request.auth_token here is the reset_token provided by create_reset_token.608 """609 try:610 is_token_valid = token.assert_valid_salt(611 auth_request.auth_token,612 auth_request.email + user_id + user_auth_dict.get('hashedPassword', ''),613 int(time.time()))614 if not is_token_valid:615 logging.info('Token in outdated.')616 raise ExpiredTokenException(_get_auth_error_message())617 except ValueError as error:618 logging.info('Token has not been generated by this server:\n %s', error)619 _abort_on_bad_auth()620 self._user_db.user_auth.replace_one(621 {'_id': objectid.ObjectId(user_id)},622 {'hashedPassword': auth_request.hashed_password})623 self.send_update_confirmation(user_dict)624 def _token_authenticate(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:625 instant = int(time.time())626 response = auth_pb2.AuthResponse()627 response.hash_salt = token.timestamped_hash(instant, auth_request.email)628 self._load_user_with_token(auth_request, response.authenticated_user)629 if response.authenticated_user.HasField('deleted_at'):630 flask.abort(404, i18n.flask_translate('Compte supprimé'))631 response.auth_token = token.create_token(auth_request.user_id, 'auth')632 self._handle_returning_user(response)633 return response634 def _load_user_with_token(635 self, auth_request: auth_pb2.AuthRequest, out_user: user_pb2.User,636 is_timestamp_required: bool = True) -> bool:637 if not auth_request.user_id:638 return False639 try:640 user_id = objectid.ObjectId(auth_request.user_id)641 except bson.errors.InvalidId:642 flask.abort(643 400,644 i18n.flask_translate(645 'L\'identifiant utilisateur "{user_id}" n\'a pas le bon format.',646 ).format(user_id=auth_request.user_id))647 try:648 if not token.assert_valid_salt(649 auth_request.auth_token, str(user_id), int(time.time()),650 validity_seconds=datetime.timedelta(days=5).total_seconds(),651 role='auth') \652 and is_timestamp_required:653 raise ExpiredTokenException(i18n.flask_translate("Token d'authentification périmé"))654 except ValueError as error:655 flask.abort(656 403,657 i18n.flask_translate("Le sel n'a pas été généré par ce serveur\xa0: {error}.")658 .format(error=error))659 user_dict = self._user_collection.find_one({'_id': user_id})660 if not user_dict:661 flask.abort(404, i18n.flask_translate('Utilisateur inconnu.'))662 _parse_user_from_mongo(user_dict, out_user)663 return True664 def _load_user_from_token_or_email(665 self, auth_request: auth_pb2.AuthRequest, user: user_pb2.User,666 email: Optional[str]) -> bool:667 is_existing_user = email and proto.parse_from_mongo(668 self._user_collection.find_one({'hashedEmail': hash_user_email(email)}),669 user, 'user_id') or \670 self._load_user_with_token(auth_request, user, is_timestamp_required=False)671 had_email = bool(user.profile.email)672 if not had_email and email:673 user.profile.email = email674 return is_existing_user675 def _create_guest_user(self, auth_request: auth_pb2.AuthRequest) -> auth_pb2.AuthResponse:676 response = auth_pb2.AuthResponse()677 response.authenticated_user.profile.name = auth_request.first_name678 self.save_new_user(response.authenticated_user, auth_request.user_data)679 response.auth_token = token.create_token(response.authenticated_user.user_id, 'auth')680 response.is_new_user = True681 return response682 def create_reset_token(self, user_id: objectid.ObjectId) -> Tuple[Optional[str], str]:683 """Create a token to reset the user's password."""684 user_dict = self._user_collection.find_one({'_id': user_id})685 if not user_dict:686 return None, ''687 return self._create_reset_token_from_user(user_dict)688 def _create_reset_token_from_user(self, user_dict: dict[str, Any]) -> Tuple[Optional[str], str]:689 """Returns the reset token, and the linked email address."""690 email = user_dict.get('profile', {}).get('email', '')691 user_auth_dict = self._user_db.user_auth.find_one({'_id': user_dict['_id']})692 if not user_auth_dict or not user_auth_dict.get('hashedPassword'):693 return None, email694 hashed_old_password = user_auth_dict.get('hashedPassword', '')695 reset_token = token.timestamped_hash(696 int(time.time()), email + str(user_dict['_id']) + hashed_old_password)697 return reset_token, email698 def send_reset_password_token(self, email: str) -> None:699 """Sends an email to user with a reset token so that they can reset their password."""700 user_dict = self._user_collection.find_one({'hashedEmail': hash_user_email(email)})701 if not user_dict:702 # No user with this email address, however we don't want to tell that to a potential703 # attacker.704 return705 reset_token, unused_email = self._create_reset_token_from_user(user_dict)706 # User is a guest and/or doesn't have a password so we send them a email to login.707 if not reset_token:708 self.send_auth_token(user_dict)709 return710 user_profile = proto.create_from_mongo(711 user_dict.get('profile'), user_profile_pb2.UserProfile)712 reset_link = self._get_reset_password_link(user_dict)713 if not reset_link:714 return715 template_vars = {716 'firstname': user_profile.name,717 'productName': product.bob.name,718 'productLogoUrl': product.bob.get_config('productLogoUrl', ''),719 'resetLink': reset_link,720 }721 # TODO(cyrille): Create a static Campaign object and use it.722 result = mail_send.send_template('reset-password', user_profile, template_vars)723 if result.status_code != 200:724 logging.error('Failed to send an email with MailJet:\n %s', result.text)725 flask.abort(result.status_code)726 def send_auth_token(self, user_dict: dict[str, Any]) -> None:727 """Sends an email to the user with an auth token so that they can log in."""728 user_profile = proto.create_from_mongo(729 user_dict.get('profile'), user_profile_pb2.UserProfile)730 user_id = str(user_dict['_id'])731 auth_link = token.create_logged_url(user_id)732 template_vars = {733 'authLink': auth_link,734 'firstname': user_profile.name,735 'productName': product.bob.name,736 'productLogoUrl': product.bob.get_config('productLogoUrl', ''),737 }738 # TODO(cyrille): Create a static Campaign object and use it.739 result = mail_send.send_template('send-auth-token', user_profile, template_vars)740 if result.status_code != 200:741 logging.error('Failed to send an email with MailJet:\n %s', result.text)742 flask.abort(result.status_code)743 def _handle_returning_user(744 self, response: auth_pb2.AuthResponse, force_update: bool = False) -> None:745 response.last_access_at.CopyFrom(self._update_returning_user(746 response.authenticated_user,747 force_update=force_update or response.is_new_user))748def _get_oauth2_access_token(749 endpoint: str, code: str, client_id: str, client_secret: str, auth_name: str = 'OAuth2') \750 -> dict[str, str]:751 token_response = requests.post(752 endpoint,753 data=dict(754 grant_type='authorization_code',755 code=code,756 client_id=client_id,757 client_secret=client_secret,758 redirect_uri=flask.request.url_root,759 ))760 if token_response.status_code < 200 or token_response.status_code >= 400:761 try:762 json_error = typing.cast(dict[str, str], token_response.json())763 except request_exceptions.JSONDecodeError:764 json_error = {}765 if json_error and json_error.keys() == {'error', 'error_description'}:766 error_description = json_error['error_description']767 if json_error['error'] in ('redirect_uri_mismatch', 'invalid_redirect_uri'):768 error_description += f' "{flask.request.url_root}"'769 logging.warning('%s fails (%s): %s', auth_name, json_error['error'], error_description)770 else:771 logging.warning(772 '%s fails (%d): "%s"',773 auth_name, token_response.status_code, token_response.text)774 flask.abort(403, token_response.text)...

Full Screen

Full Screen

test_auth.py

Source:test_auth.py Github

copy

Full Screen

...28@pytest.fixture29def user(username="testuser", **kw):30 return User.objects.get_or_create(username=username, **kw)[0]31@pytest.fixture32def auth_request():33 r = create_request("GET", "/")34 r.user = user()35 return r36@pytest.fixture37def super_request():38 r = create_request("GET", "/")39 r.user = superuser()40 return r41from wheelcms_axle.workflows.default import Workflow42@pytest.mark.usefixtures("localtyperegistry")43class TestAssignments(object):44 type = Type1Type45 def setup(self):46 ## Make sure there are no workflow permission updates...

Full Screen

Full Screen

aws_v4.py

Source:aws_v4.py Github

copy

Full Screen

1"""p2 s3 authentication mixin"""2import hashlib3import hmac4from typing import Any, List, Optional5from urllib.parse import quote6from django.contrib.auth.models import User7from django.http import HttpRequest, QueryDict8from structlog import get_logger9from p2.api.models import APIKey10from p2.lib.hash import chunked_hasher11from p2.s3.auth.base import BaseAuth12from p2.s3.errors import (AWSAccessDenied, AWSContentSignatureMismatch,13 AWSSignatureMismatch)14LOGGER = get_logger()15UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'16class SignatureMismatch(Exception):17 """Exception raised when given Hash does not match request body's hash"""18# pylint: disable=too-many-instance-attributes19class AWSv4AuthenticationRequest:20 """Holds all pieces of an AWSv4 Authenticated Request"""21 algorithm: str = ""22 signed_headers: str = ""23 signature: str = ""24 access_key: str = ""25 date: str = ""26 date_long: str = ""27 region: str = ""28 service: str = ""29 request: str = ""30 hash: str = ""31 def __init__(self):32 self.algorithm = self.date = self.signed_headers = self.signature = self.hash = ""33 self.access_key = self.date_long = self.region = self.service = self.request = ""34 @property35 def credentials(self) -> str:36 """Join properties together to re-construct credential string"""37 return "/".join([38 self.date,39 self.region,40 self.service,41 self.request42 ])43 @credentials.setter44 def credentials(self, value: str):45 # Further split credential value46 self.access_key, self.date, self.region, self.service, self.request = value.split('/')47 @staticmethod48 def from_querystring(get_dict: QueryDict) -> Optional['AWSv4AuthenticationRequest']:49 """Check if AWSv4 Authentication information was sent via Querystring,50 abd parse it into an AWSv4AuthenticationRequest object. If querystring doesn't51 contain necessary parameters, None is returned."""52 required_parameters = ['X-Amz-Date', 'X-Amz-Credential',53 'X-Amz-SignedHeaders', 'X-Amz-Signature']54 for required_parameter in required_parameters:55 if required_parameter not in get_dict:56 return None57 auth_request = AWSv4AuthenticationRequest()58 auth_request.credentials = get_dict.get('X-Amz-Credential')59 auth_request.signed_headers = get_dict.get('X-Amz-SignedHeaders')60 auth_request.date_long = get_dict.get('X-Amz-Date')61 auth_request.signature = get_dict.get('X-Amz-Signature')62 return auth_request63 @staticmethod64 def from_header(headers: dict) -> Optional['AWSv4AuthenticationRequest']:65 """Check if AWSv4 Authentication information was sent via headers,66 and parse it into an AWSv4AuthenticationRequest object. If headers don't67 contain necessary information, None is returned."""68 # Check if headers exist, otherwise return None69 if 'HTTP_AUTHORIZATION' not in headers:70 return None71 auth_request = AWSv4AuthenticationRequest()72 auth_request.algorithm, credential_container = \73 headers.get('HTTP_AUTHORIZATION').split(' ', 1)74 credential, signed_headers, signature = credential_container.split(',')75 # Remove "Credential=" from string76 _, auth_request.credentials = credential.split("=")77 _, auth_request.signed_headers = signed_headers.split("=")78 _, auth_request.signature = signature.split("=")79 auth_request.date_long = headers.get('HTTP_X_AMZ_DATE')80 if not auth_request.date_long:81 auth_request.date_long = auth_request.date82 return auth_request83class AWSV4Authentication(BaseAuth):84 """AWS v4 Signer"""85 # Key derivation functions. See:86 # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python87 def _sign(self, key: str, msg: str) -> hmac.HMAC:88 """Simple HMAC wrapper"""89 return hmac.new(key, msg.encode('utf-8'), hashlib.sha256)90 def _get_signature_key(self, key: str, auth_request: AWSv4AuthenticationRequest) -> str:91 """Create signature like92 https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html"""93 k_date = self._sign(('AWS4' + key).encode('utf-8'), auth_request.date).digest()94 k_region = self._sign(k_date, auth_request.region).digest()95 k_service = self._sign(k_region, auth_request.service).digest()96 k_signing = self._sign(k_service, auth_request.request).digest()97 return k_signing98 def _make_query_string(self) -> str:99 """Parse existing Querystring, URI-encode them and sort them and put them back together"""100 pairs = []101 if self.request.META['QUERY_STRING'] == '':102 return self.request.META['QUERY_STRING']103 for kv_pair in self.request.META['QUERY_STRING'].split('&'):104 if '=' not in kv_pair:105 kv_pair = kv_pair + '='106 pairs.append(kv_pair)107 pairs.sort()108 return '&'.join(pairs)109 def _get_canonical_headers(self, only: List[str]) -> str:110 """Fix header keys from HTTP_X to x"""111 canonical_headers = ""112 def sorter(item):113 """Remove HTTP_ prefix, replace underscores with hyphens114 and lowercase convert to lowercase for comparison"""115 return item[0].replace('HTTP_', '', 1).replace('_', '-').lower()116 for header_key, header_value in sorted(self.request.META.items(), key=sorter):117 fixed_key = header_key.replace('HTTP_', '', 1).replace('_', '-').lower()118 if fixed_key in only:119 canonical_headers += f"{fixed_key}:{header_value}\n"120 return canonical_headers121 def _get_sha256(self, data: Any) -> str:122 """Get body hash in sha256"""123 hasher = hashlib.sha256()124 hasher.update(data)125 return hasher.hexdigest()126 def _get_canonical_request(self, auth_request: AWSv4AuthenticationRequest) -> str:127 """Create canonical request in AWS format (128 https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html)"""129 signed_headers_keys = auth_request.signed_headers.split(';')130 canonical_request = [131 self.request.META.get('REQUEST_METHOD', ''),132 quote(self.request.META.get('PATH_INFO', '')),133 self._make_query_string(),134 self._get_canonical_headers(signed_headers_keys),135 auth_request.signed_headers,136 auth_request.hash,137 ]138 return '\n'.join(canonical_request)139 def _lookup_access_key(self, access_key: str) -> Optional[APIKey]:140 """Lookup access_key in database, return secret if found otherwise None"""141 keys = APIKey.objects.filter(access_key=access_key)142 if keys.exists():143 return keys.first()144 return None145 @staticmethod146 def can_handle(request: HttpRequest) -> bool:147 if 'HTTP_AUTHORIZATION' in request.META:148 return 'AWS4-HMAC-SHA256' in request.META['HTTP_AUTHORIZATION']149 if 'X-Amz-Signature' in request.GET:150 return True151 return False152 def verify_content_sha256(self, auth_request: AWSv4AuthenticationRequest):153 """Verify X-Amz-Content-Sha256 Header, if sent"""154 # Header not set -> Empty hash, no checking155 if not auth_request.hash:156 auth_request.hash = ''157 return158 # Client has not calculated SHA256 of payload, no checking159 if auth_request.hash == UNSIGNED_PAYLOAD:160 return161 # Read request body chunk by chunk, compute sha256 and compare.162 request_body_hash = chunked_hasher(hashlib.sha256(), self.request)163 if auth_request.hash != request_body_hash:164 LOGGER.warning("CONTENT_SHA256 Header/param incorrect",165 theirs=auth_request.hash,166 ours=request_body_hash)167 raise AWSContentSignatureMismatch168 def validate(self) -> Optional[User]:169 """Check Authorization Header in AWS Compatible format"""170 auth_request = AWSv4AuthenticationRequest.from_header(self.request.META)171 if not auth_request:172 auth_request = AWSv4AuthenticationRequest.from_querystring(self.request.GET)173 auth_request.hash = self.request.META.get('HTTP_X_AMZ_CONTENT_SHA256')174 # Verify given Hash with request body175 self.verify_content_sha256(auth_request)176 # Build our own signature to compare177 secret_key = self._lookup_access_key(auth_request.access_key)178 if not secret_key:179 LOGGER.warning("No secret key found for request", access_key=auth_request.access_key)180 raise AWSAccessDenied181 signing_key = self._get_signature_key(secret_key.secret_key, auth_request)182 canonical_request = self._get_canonical_request(auth_request)183 string_to_sign = '\n'.join([184 auth_request.algorithm,185 auth_request.date_long,186 auth_request.credentials,187 self._get_sha256(canonical_request.encode('utf-8')),188 ])189 our_signature = self._sign(signing_key, string_to_sign).hexdigest()190 if auth_request.signature != our_signature:191 LOGGER.warning("Canonical Request", canonical_request=canonical_request)192 LOGGER.warning("Signatures", theirs=auth_request.signature,193 ours=our_signature)194 raise AWSSignatureMismatch...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful