Best Python code snippet using localstack_python
api.py
Source:api.py  
1from pyusermanager import *2import bottle3from http import HTTPStatus4from bottle import (5    route,6    run,7    post,8    get,9    static_file,10    request,11    redirect,12    HTTPResponse,13    response,14)15# from gevent import monkey16# monkey.patch_all()17from return_stuff import *18import filestuff19def get_default_response():20    default_response = HTTPResponse(21        status=HTTPStatus.IM_A_TEAPOT,22        body=json.dumps({"error": "this should never happen lol"}),23    )24    default_response.headers["Access-Control-Allow-Origin"] = "*"25    default_response.headers[26        "Access-Control-Allow-Methods"27    ] = "PUT, GET, POST, DELETE, OPTIONS"28    default_response.headers[29        "Access-Control-Allow-Headers"30    ] = "Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token"31    default_response.body = get_json_from_args(32        Alert("oh god something went terribly wrong", ALERT_TYPE.DANGER)33    )34    return default_response35###################################36#37# Fastapi stuff starts here!38#39###################################40app = bottle.app()41@app.get("/user/<username>")42def api_verify_token(username):43    ip = request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(44        "REMOTE_ADDR"45    )46    return_response = get_default_response()47    return_response.status = HTTPStatus.UNAUTHORIZED48    request_params = dict(request.query.decode())49    print(request_params)50    try:51        token = request_params["token"]52    except Exception:53        token = None54        return_response.status = HTTPStatus.BAD_REQUEST55        return_response.body = get_json_from_args(56            Alert("No Token supplied!", ALERT_TYPE.DANGER), Redirect("/")57        )58        return return_response59    try:60        success, perms, username = verify_token(token, ip)61    except TypeError:62        return_response.status = HTTPStatus.BAD_REQUEST63        return_response.body = get_json_from_args(64            Alert("Token Type is not valid!", ALERT_TYPE.DANGER), Redirect("/")65        )66        return return_response67@app.post("/login")68def login():69    return_response = get_default_response()70    print("reeee")71    json_text = request.body.read().decode("utf-8")72    print(json_text)73    try:74        json_obj = json.load(request.body)75    except Exception as err:76        return_response.status = HTTPStatus.BAD_REQUEST77        print(err)78        print("could not read json input!")79        return return_response80    print("successfully converted to json")81    print(json_obj)82    # perform login!83    try:84        password = json_obj["password"]85        username = json_obj["username"]86    except Exception:87        return_response.status = HTTPStatus.BAD_REQUEST88        return_response.body = get_json_from_args(89            Alert("required vars missing", ALERT_TYPE.WARNING)90        )91        return return_response92    try:93        remember_me = json_obj["remember_me"]94        if remember_me:95            valid_days = 36596        else:97            valid_days = 198    except Exception:99        valid_days = 1100    try:101        success, user = login_user(username, password)102    except MissingUserException:103        return_response.status = HTTPStatus.BAD_REQUEST104        return_response.body = get_json_from_args(105            Alert("User does not exist!", ALERT_TYPE.WARNING)106        )107        return return_response108    if success:109        ip = request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(110            "REMOTE_ADDR"111        )112        # create token113        try:114            token = create_token(user, ip, valid_days)115            return_response.status = HTTPStatus.OK116            return_response.body = get_json_from_args(117                {"Login": {"valid_day": valid_days, "token": token}},118                Alert("Successfull Login", ALERT_TYPE.INFO),119                Redirect(f"/user/{user}"),120            )121        except Exception:122            return_response.status = HTTPStatus.INTERNAL_SERVER_ERROR123            return_response.body = get_json_from_args(124                Alert("could not generate token!", ALERT_TYPE.DANGER)125            )126    else:127        return_response.status = HTTPStatus.UNAUTHORIZED128        return_response.body = get_json_from_args(129            Alert("password/User Combination is wrong!", ALERT_TYPE.WARNING)130        )131    return return_response132@app.get("/header")133def api_get_header():134    return_response = get_default_response()135    request_params = dict(request.query.decode())136    print(request_params)137    header = filestuff.get_template("header_logged_out.html")138    return_response.status = HTTPStatus.OK139    return_response.body = get_json_from_args({"pre_rendered": {"content": header}})140    try:141        token = request_params["token"]142    except Exception:143        token = None144    print(f"token: {token}")145    if token is not None:146        ip = request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(147            "REMOTE_ADDR"148        )149        # verify token150        success, perms, username = verify_token(token, ip)151        print(verify_token(token, ip))152        if success:153            user_dict, token_dict, perm_dict = get_extended_info(username)154            template_vars = {155                "username": username,156                "api_url": "http://127.0.0.1:1337",157                "user_avatar": user_dict["avatar"],158            }159            if LoginConfig.admin_group_name in perms:160                header = filestuff.get_template(161                    "header_logged_in_admin.html", **template_vars162                )163            else:164                header = filestuff.get_template(165                    "header_logged_in.html", **template_vars166                )167            return_response.body = get_json_from_args(168                {"pre_rendered": {"content": header}}169            )170    return return_response171@app.get("/users")172def api_get_users():173    ip = request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(174        "REMOTE_ADDR"175    )176    return_response = get_default_response()177    return_response.status = HTTPStatus.UNAUTHORIZED178    request_params = dict(request.query.decode())179    print(request_params)180    try:181        token = request_params["token"]182    except Exception:183        token = None184    print(token)185    if token is None or len(token) < 2:186        return_response.status = HTTPStatus.UNAUTHORIZED187        return_json = get_json_from_args(188            Alert("please log in", ALERT_TYPE.WARNING), Redirect("/login")189        )190        # return_json = get_json_from_args(Alert("Bitte einloggen!",ALERT_TYPE.WARNING),Modal("du bist nicht eingelogt",MODAL_TYPE.ERROR,headline="satanismus") ,Redirect("/"))191        return_response.body = return_json192    else:193        print(token)194        # verify token195        print(verify_token(str(token), ip))196        success, perms, username = verify_token(str(token), ip)197        if success:198            return_response.status = HTTPStatus.OK199            user_dict = get_users()200            return_response.body = get_json_from_args(user_dict)201            print(return_response.body)202        else:203            return_response.status = HTTPStatus.UNAUTHORIZED204            print(205                get_json_from_args(206                    Alert("please log in", ALERT_TYPE.WARNING), Redirect("/login")207                )208            )209            return_response.body = get_json_from_args(210                Alert("Bitte einloggen!", ALERT_TYPE.WARNING), Redirect("/login")211            )212    print(return_response)213    return return_response214@app.get("/user/<username>")215def api_get_user(username):216    ip = request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(217        "REMOTE_ADDR"218    )219    return_response = get_default_response()220    return_response.status = HTTPStatus.UNAUTHORIZED221    request_params = dict(request.query.decode())222    print(request_params)223    try:224        token = request_params["token"]225    except Exception:226        token = None227    print(token)228    if token is None or len(token) < 2:229        return_response.status = HTTPStatus.UNAUTHORIZED230        return_json = get_json_from_args(231            Alert("please log in", ALERT_TYPE.WARNING), Redirect("/login")232        )233        return_response.body = return_json234    else:235        print(token)236        # verify token237        print(verify_token(str(token), ip))238        success, perms, user = verify_token(str(token), ip)239        include_mail = None240        if user == username or LoginConfig.admin_group_name in perms:241            include_mail = True242        if success:243            return_response.status = HTTPStatus.OK244            try:245                user_dict, token_dict, perm_dict = get_extended_info(246                    username, include_mail247                )248            except MissingUserException:249                return_response.status = HTTPStatus.BAD_REQUEST250                return_response.body = get_json_from_args(251                    Alert("User does not Exist", ALERT_TYPE.DANGER), Redirect("/users")252                )253                return return_response254            if LoginConfig.admin_group_name in perms:255                return_response.body = get_json_from_args(256                    {"user": user_dict},257                    {"token": token_dict},258                    {"groups": perm_dict},259                    {"Admin": True},260                )261            elif username == user:262                return_response.body = get_json_from_args(263                    {"user": user_dict}, {"token": token_dict}, {"groups": perm_dict}264                )265            else:266                return_response.body = get_json_from_args(267                    {"user": user_dict}, {"groups": perm_dict}268                )269            print(return_response.body)270        else:271            return_response.status = HTTPStatus.UNAUTHORIZED272            print(273                get_json_from_args(274                    Alert("please log in", ALERT_TYPE.WARNING), Redirect("/login")275                )276            )277            return_response.body = get_json_from_args(278                Alert("Bitte einloggen!", ALERT_TYPE.WARNING), Redirect("/login")279            )280    print(return_response)281    return return_response282@app.post("/user/delete/<username>")283def api_delete_user(username):284    return_response = get_default_response()285    return return_response286@app.get("/logout")287def api_logout_user():288    ip = request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(289        "REMOTE_ADDR"290    )291    return_response = get_default_response()292    return_response.status = HTTPStatus.UNAUTHORIZED293    request_params = dict(request.query.decode())294    print(request_params)295    try:296        token = request_params["token"]297    except Exception:298        token = None299    print(token)300    if token is not None:301        try:302            logout = {"Logout": logout_user(token, ip)}303            return_response.body = get_json_from_args(304                logout, Alert("successfull Logout", ALERT_TYPE.INFO), Redirect("/")305            )306        except TokenMissingException:307            return_response.status = HTTPStatus.BAD_REQUEST308            return_response.body = get_json_from_args(309                logout, Alert("Token does not exist!", ALERT_TYPE.DANGER)310            )311        except ValueError:312            return_response.status = HTTPStatus.BAD_REQUEST313            return_response.body = get_json_from_args(314                logout,315                Alert("cant Log Out User from Different IP!", ALERT_TYPE.WARNING),316            )317    else:318        return_response.status = HTTPStatus.BAD_REQUEST319        return_response.body = get_json_from_args(320            Alert("no Token transmitted", ALERT_TYPE.DANGER)321        )322    return return_response323@app.get("/avatars/<filename>")324def static_files(filename):325    return static_file(str(filename), root="./avatars/")326if __name__ == "__main__":327    # init base ad config328    ad_config = AD_Config()329    # init db config330    db_config = DB_Config(331        provider="mysql",332        host="127.0.0.1",333        port=3306,334        user="test",335        pw="test123",336        db_name="users",337    )338    # init db config with db config and other parameters339    config = LoginConfig(340        db_config=db_config,341        debug=True,342        auto_activate_accounts=False,343        admin_group_name="administrator",344    )345    # generate our admin user if it does not exist yet346    try:347        create_user(uname="admin", pw="12345", email="test@local", auth=AUTH_TYPE.LOCAL)348        # if user is new we need to verify the token if auto activate is disabled349        if not LoginConfig.auto_activate_accounts:350            print("since new admin user is not activated we try to activate him")351            auth_token = get_token("admin", ActivationCode)352            feedback = list(verify_token(token=auth_token, token_type=ActivationCode))353            print(f"success: {feedback[0]}")354    except AlreadyExistsException:355        print("user already exists")356    except Exception:357        print("somethign bad happened o-o")358    # add default admin group and add admin to it359    print(f"creating admin group: {LoginConfig.admin_group_name}")360    create_perm(LoginConfig.admin_group_name)361    print(f"adding admin user to {LoginConfig.admin_group_name} group")362    assign_perm_to_user("admin", LoginConfig.admin_group_name)...lambda_authorizer_jwt.py
Source:lambda_authorizer_jwt.py  
...21    jwks_client = PyJWKClient(url)22except Exception as e:23    logging.error(e)24    raise ("Unable to download JWKS")25def return_response(isAuthorized, other_params={}):26    return {"isAuthorized": isAuthorized, "context": other_params}27def lambda_handler(event, context):28    try:29        # fetching access token from event30        token = event["headers"]["authorization"]31        # check token structure32        if len(token.split(".")) != 3:33            return return_response(isAuthorized=False, other_params={})34    except Exception as e:35        logging.error(e)36        return return_response(isAuthorized=False, other_params={})37    try:38        # get unverified headers39        headers = jwt.get_unverified_header(token)40        # get signing key41        signing_key = jwks_client.get_signing_key_from_jwt(token)42        # validating exp, iat, signature, iss43        data = jwt.decode(44            token,45            signing_key.key,46            algorithms=[headers.get("alg")],47            options={48                "verify_signature": True,49                "verify_exp": True,50                "verify_iat": True,51                "verify_iss": True,52                "verify_aud": False,53            },54        )55    except jwt.InvalidTokenError as e:56        logging.error(e)57        return return_response(isAuthorized=False, other_params={})58    except jwt.DecodeError as e:59        logging.error(e)60        return return_response(isAuthorized=False, other_params={})61    except jwt.InvalidSignatureError as e:62        logging.error(e)63        return return_response(isAuthorized=False, other_params={})64    except jwt.ExpiredSignatureError as e:65        logging.error(e)66        return return_response(isAuthorized=False, other_params={})67    except jwt.InvalidIssuerError as e:68        logging.error(e)69        return return_response(isAuthorized=False, other_params={})70    except jwt.InvalidIssuedAtError as e:71        logging.error(e)72        return return_response(isAuthorized=False, other_params={})73    except Exception as e:74        logging.error(e)75        return return_response(isAuthorized=False, other_params={})76    try:77        # verifying audience...use data['client_id'] if verifying an access token else data['aud']78        if app_client != data.get("client_id"):79            return return_response(isAuthorized=False, other_params={})80    except Exception as e:81        logging.error(e)82        return return_response(isAuthorized=False, other_params={})83    try:84        # token_use check85        if data.get("token_use") != "access":86            return return_response(isAuthorized=False, other_params={})87    except Exception as e:88        logging.error(e)89        return return_response(isAuthorized=False, other_params={})90    try:91        # scope check92        if "openid" not in data.get("scope").split(" "):93            return return_response(isAuthorized=False, other_params={})94    except Exception as e:95        logging.error(e)96        return return_response(isAuthorized=False, other_params={})...azure.py
Source:azure.py  
1import requests2import os3import json4pic_directory = "DB/pictures/"5def faceRec(file_name):6    return_response = {'status': None}7    image_path = pic_directory + file_name8    print(image_path)9    image_data = open(image_path, 'rb')10    SUBSCRIPTION_KEY = '8b8cfcd8d7d742b3ba6542827a90fb03'11    ENDPOINT = 'https://testfacial.cognitiveservices.azure.com/face/v1.0/'12    tempfaceid = detectImage(image_data, SUBSCRIPTION_KEY, ENDPOINT)13    if not tempfaceid:14        return_response['status'] = 'Fail'15    else:16        face_id = identifyImage(tempfaceid, SUBSCRIPTION_KEY, ENDPOINT)17        if not face_id:18            return_response['status'] = 'Fail'19        else:20            return_response['status'] = 'Ok'21            return_response['faceId'] = face_id22    return_response = json.dumps(return_response, indent=3)23#    print(return_response)24    return return_response25def detectImage(image_data, subscription_key, endpoint):26    headers = {'Content-Type': 'application/octet-stream',27               'Ocp-Apim-Subscription-Key': subscription_key}28    params = {29        'returnFaceId': 'true',30    }31    response = requests.post(endpoint + 'detect', params=params, headers=headers, data=image_data)32    response.raise_for_status()33    faces = response.json()34    if faces:35        return faces[0]['faceId']36    else:37        return None38def identifyImage(tempfaceid, subscription_key, endpoint):39    headers = {'Content-Type': 'application/json',40               'Ocp-Apim-Subscription-Key': subscription_key}41    params = {42        'personGroupId': 'godseye2020',43        'faceIds': [tempfaceid]44    }45    response = requests.post(endpoint + 'identify', json=params, headers=headers)46    face_id = response.json()47    if face_id:48        return face_id[0]['candidates'][0]['personId']49    else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
