Best Python code snippet using localstack_python
views.py
Source:views.py  
...53    signals.user_confirmed_email.send(current_app._get_current_object(), user=user)54    # Prepare one-time system message55    flash(_('Your email has been confirmed.'), 'success')56    # Auto-login after confirm or redirect to login page57    next = request.args.get('next', _endpoint_url(user_manager.after_confirm_endpoint))58    if user_manager.auto_login_after_confirm:59        return _do_login_user(user, next)                       # auto-login60    else:61        return redirect(url_for('user.login')+'?next='+next)    # redirect to login page62@login_required63def change_password():64    """ Prompt for old password and new password and change the user's password."""65    user_manager =  current_app.user_manager66    db_adapter = user_manager.db_adapter67    # Initialize form68    form = user_manager.change_password_form(request.form)69    form.next.data = request.args.get('next', _endpoint_url(user_manager.after_change_password_endpoint))  # Place ?next query param in next form field70    # Process valid POST71    if request.method=='POST' and form.validate():72        # Hash password73        hashed_password = user_manager.hash_password(form.new_password.data)74        # Change password75        user_manager.update_password(current_user, hashed_password)76        # Send 'password_changed' email77        if user_manager.enable_email and user_manager.send_password_changed_email:78            emails.send_password_changed_email(current_user)79        # Send password_changed signal80        signals.user_changed_password.send(current_app._get_current_object(), user=current_user)81        # Prepare one-time system message82        flash(_('Your password has been changed successfully.'), 'success')83        # Redirect to 'next' URL84        return redirect(form.next.data)85    # Process GET or invalid POST86    return render_template(user_manager.change_password_template, form=form)87@login_required88def change_username():89    """ Prompt for new username and 
old password and change the user's username."""90    user_manager =  current_app.user_manager91    db_adapter = user_manager.db_adapter92    # Initialize form93    form = user_manager.change_username_form(request.form)94    form.next.data = request.args.get('next', _endpoint_url(user_manager.after_change_username_endpoint))  # Place ?next query param in next form field95    # Process valid POST96    if request.method=='POST' and form.validate():97        new_username = form.new_username.data98        # Change username99        user_auth = current_user.user_auth if db_adapter.UserAuthClass and hasattr(current_user, 'user_auth') else current_user100        db_adapter.update_object(user_auth, username=new_username)101        db_adapter.commit()102        # Send 'username_changed' email103        if user_manager.enable_email and user_manager.send_username_changed_email:104            emails.send_username_changed_email(current_user)105        # Send username_changed signal106        signals.user_changed_username.send(current_app._get_current_object(), user=current_user)107        # Prepare one-time system message108        flash(_("Your username has been changed to '%(username)s'.", username=new_username), 'success')109        # Redirect to 'next' URL110        return redirect(form.next.data)111    # Process GET or invalid POST112    return render_template(user_manager.change_username_template, form=form)113@login_required114def email_action(id, action):115    """ Perform action 'action' on UserEmail object 'id'116    """117    user_manager =  current_app.user_manager118    db_adapter = user_manager.db_adapter119    # Retrieve UserEmail by id120    user_email = db_adapter.find_first_object(db_adapter.UserEmailClass, id=id)121    # Users may only change their own UserEmails122    if not user_email or user_email.user_id != int(current_user.get_id()):123        return unauthorized()124    if action=='delete':125        # Primary UserEmail can not be deleted126        if 
user_email.is_primary:127            return unauthorized()128        # Delete UserEmail129        db_adapter.delete_object(user_email)130        db_adapter.commit()131    elif action=='make-primary':132        # Disable previously primary emails133        user_emails = db_adapter.find_all_objects(db_adapter.UserEmailClass, user_id=int(current_user.get_id()))134        for ue in user_emails:135            if ue.is_primary:136                ue.is_primary = False137        # Enable current primary email138        user_email.is_primary = True139        # Commit140        db_adapter.commit()141    elif action=='confirm':142        _send_confirm_email(user_email.user, user_email)143    else:144        return unauthorized()145    return redirect(url_for('user.manage_emails'))146def forgot_password():147    """Prompt for email and send reset password email."""148    user_manager =  current_app.user_manager149    db_adapter = user_manager.db_adapter150    # Initialize form151    form = user_manager.forgot_password_form(request.form)152    # Process valid POST153    if request.method=='POST' and form.validate():154        email = form.email.data155        # Find user by email156        user, user_email = user_manager.find_user_by_email(email)157        if user:158            # Generate reset password link159            token = user_manager.generate_token(int(user.get_id()))160            reset_password_link = url_for('user.reset_password', token=token, _external=True)161            # Send forgot password email162            emails.send_forgot_password_email(user, user_email, reset_password_link)163            # Store token164            if hasattr(user, 'reset_password_token'):165                db_adapter.update_object(user, reset_password_token=token)166                db_adapter.commit()167            # Send forgot_password signal168            signals.user_forgot_password.send(current_app._get_current_object(), user=user)169        # Prepare one-time system message170   
     flash(_("A reset password email has been sent to '%(email)s'. Open that email and follow the instructions to reset your password.", email=email), 'success')171        # Redirect to the login page172        return redirect(_endpoint_url(user_manager.after_forgot_password_endpoint))173    # Process GET or invalid POST174    return render_template(user_manager.forgot_password_template, form=form)175@app.route('/user/profile', methods=['GET', 'POST'])176@login_required177def user_profile_page():178    # Initialize form179    form = UserProfileForm(request.form, current_user)180    # Process valid POST181    if request.method=='POST' and form.validate():182        # Copy form fields to user_profile fields183        form.populate_obj(current_user)184        # Save user_profile185        db.session.commit()186        # Redirect to home page187        return redirect(url_for('home_page'))188    # Process GET or invalid POST189    return render_template('users/user_profile_page.html',form=form)190@app.route('/user/sign-in', methods=['GET', 'POST'])191def login():192    """ Prompt for username/email and password and sign the user in."""193    user_manager =  current_app.user_manager194    db_adapter = user_manager.db_adapter195    next = request.args.get('next', _endpoint_url(user_manager.after_login_endpoint))196    reg_next = request.args.get('reg_next', _endpoint_url(user_manager.after_register_endpoint))197    # Immediately redirect already logged in users198    if current_user.is_authenticated and user_manager.auto_login_at_login:199        return redirect(next)200    # Initialize form201    login_form = user_manager.login_form(request.form)          # for login.html202    register_form = user_manager.register_form()                # for login_or_register.html203    if request.method!='POST':204        login_form.next.data     = register_form.next.data = next205        login_form.reg_next.data = register_form.reg_next.data = reg_next206    # Process valid POST207   
 if request.method=='POST' and login_form.validate():208        # Retrieve User209        user = None210        user_email = None211        if user_manager.enable_username:212            # Find user record by username213            user = user_manager.find_user_by_username(login_form.username.data)214            user_email = None215            # Find primary user_email record216            if user and db_adapter.UserEmailClass:217                user_email = db_adapter.find_first_object(db_adapter.UserEmailClass,218                        user_id=int(user.get_id()),219                        is_primary=True,220                        )221            # Find user record by email (with form.username)222            if not user and user_manager.enable_email:223                user, user_email = user_manager.find_user_by_email(login_form.username.data)224        else:225            # Find user by email (with form.email)226            user, user_email = user_manager.find_user_by_email(login_form.email.data)227        if user:228            # Log user in229            return _do_login_user(user, login_form.next.data, login_form.remember_me.data)230    # Process GET or invalid POST231    return render_template(user_manager.login_template,232            form=login_form,233            login_form=login_form,234            register_form=register_form)235@app.route('/user/sign-out', methods=['GET', 'POST'])236def logout():237    """ Sign the user out."""238    user_manager =  current_app.user_manager239    # Send user_logged_out signal240    signals.user_logged_out.send(current_app._get_current_object(), user=current_user)241    # Use Flask-Login to sign out user242    logout_user()243    # Prepare one-time system message244    flash(_('You have signed out successfully.'), 'success')245    # Redirect to logout_next endpoint or '/'246    next = request.args.get('next', _endpoint_url(user_manager.after_logout_endpoint))  # Get 'next' query param247    return redirect(next)248def 
_do_login_user(user, next, remember_me=False):249    # User must have been authenticated250    if not user: return unauthenticated()251    # Check if user account has been disabled252    if not user.is_active():253        flash(_('Your account has not been activated. Please check your email.'), 'error')254        return redirect(url_for('user.login'))255    # Check if user has a confirmed email address256    user_manager = current_app.user_manager257    if user_manager.enable_email and user_manager.enable_confirm_email \258            and not current_app.user_manager.enable_login_without_confirm_email \259            and not user.has_confirmed_email():260        url = url_for('user.resend_confirm_email')261        flash(_('Your email address has not yet been confirmed. <a href="%(url)s">Re-send confirmation email</a>.', url=url), 'error')262        return redirect(url_for('user.login'))263    # Use Flask-Login to sign in user264    #print('login_user: remember_me=', remember_me)265    login_user(user, remember=remember_me)266    # Send user_logged_in signal267    signals.user_logged_in.send(current_app._get_current_object(), user=user)268    # Prepare one-time system message269    #flash(_('You have signed in successfully.'), 'success')270    if current_user.user_auth.credentials == 1:271        stripe.api_key = decode(current_user.user_auth.api_key)272        session['api_key'] = stripe.api_key273        # Redirect to 'next' URL274        return redirect(next)275    else:276        flash(_('Please enter your Stripe API key'), 'error')277        return redirect(url_for('getstarted'))278def register():279    """ Display registration form and create new User."""280    user_manager =  current_app.user_manager281    db_adapter = user_manager.db_adapter282    next = request.args.get('next', _endpoint_url(user_manager.after_login_endpoint))283    reg_next = request.args.get('reg_next', _endpoint_url(user_manager.after_register_endpoint))284    # Initialize form285    
login_form = user_manager.login_form()                      # for login_or_register.html286    register_form = user_manager.register_form(request.form)    # for register.html287    if request.method!='POST':288        login_form.next.data     = register_form.next.data     = next289        login_form.reg_next.data = register_form.reg_next.data = reg_next290    # Process valid POST291    if request.method=='POST' and register_form.validate():292        # Create a User object using Form fields that have a corresponding User field293        User = db_adapter.UserClass294        user_class_fields = User.__dict__295        user_fields = {}296        user_auth_fi297        # Create a UserEmail object using Form fields that have a corresponding UserEmail field298        if db_adapter.UserEmailClass:299            UserEmail = db_adapter.UserEmailClass300            user_email_class_fields = UserEmail.__dict__301            user_email_fields = {}302        # Create a UserAuth object using Form fields that have a corresponding UserAuth field303        if db_adapter.UserAuthClass:304            UserAuth = db_adapter.UserAuthClass305            user_auth_class_fields = UserAuth.__dict__306            user_auth_fields = {}307        # Enable user account308        if db_adapter.UserProfileClass:309            if hasattr(db_adapter.UserProfileClass, 'active'):310                user_auth_fields['active'] = True311            elif hasattr(db_adapter.UserProfileClass, 'is_enabled'):312                user_auth_fields['is_enabled'] = True313            else:314                user_auth_fields['is_active'] = True315        else:316            if hasattr(db_adapter.UserClass, 'active'):317                user_fields['active'] = True318            elif hasattr(db_adapter.UserClass, 'is_enabled'):319                user_fields['is_enabled'] = True320            else:321                user_fields['is_active'] = True322        # For all form fields323        for field_name, field_value 
in register_form.data.items():324            # Hash password field325            if field_name=='password':326                hashed_password = user_manager.hash_password(field_value)327                if db_adapter.UserAuthClass:328                    user_auth_fields['password'] = hashed_password329                else:330                    user_fields['password'] = hashed_password331            # Store corresponding Form fields into the User object and/or UserProfile object332            else:333                if field_name in user_class_fields:334                    user_fields[field_name] = field_value335                if db_adapter.UserEmailClass:336                    if field_name in user_email_class_fields:337                        user_email_fields[field_name] = field_value338                if db_adapter.UserAuthClass:339                    if field_name in user_auth_class_fields:340                        user_auth_fields[field_name] = field_value341        # Add User record using named arguments 'user_fields'342        user = db_adapter.add_object(User, **user_fields)343        if db_adapter.UserProfileClass:344            user_profile = user345        # Add UserEmail record using named arguments 'user_email_fields'346        if db_adapter.UserEmailClass:347            user_email = db_adapter.add_object(UserEmail,348                    user=user,349                    is_primary=True,350                    **user_email_fields)351        else:352            user_email = None353        # Add UserAuth record using named arguments 'user_auth_fields'354        if db_adapter.UserAuthClass:355            user_auth = db_adapter.add_object(UserAuth, **user_auth_fields)356            if db_adapter.UserProfileClass:357                user = user_auth358            else:359                user.user_auth = user_auth360        db_adapter.commit()361        # Send 'registered' email and delete new User object if send fails362        if 
user_manager.send_registered_email:363            try:364                # Send 'registered' email365                _send_registered_email(user, user_email)366            except Exception as e:367                # delete new User object if send  fails368                db_adapter.delete_object(user)369                db_adapter.commit()370                raise e371        # Send user_registered signal372        signals.user_registered.send(current_app._get_current_object(), user=user)373        # Redirect if USER_ENABLE_CONFIRM_EMAIL is set374        if user_manager.enable_confirm_email:375            next = request.args.get('next', _endpoint_url(user_manager.after_register_endpoint))376            return redirect(next)377        # Auto-login after register or redirect to login page378        next = request.args.get('next', _endpoint_url(user_manager.after_confirm_endpoint))379        if user_manager.auto_login_after_register:380            return _do_login_user(user, reg_next)                     # auto-login381        else:382            return redirect(url_for('user.login')+'?next='+reg_next)  # redirect to login page383    # Process GET or invalid POST384    return render_template(user_manager.register_template,385            form=register_form,386            login_form=login_form,387            register_form=register_form)388def resend_confirm_email():389    """Prompt for email and re-send email conformation email."""390    user_manager =  current_app.user_manager391    db_adapter = user_manager.db_adapter392    # Initialize form393    form = user_manager.resend_confirm_email_form(request.form)394    # Process valid POST395    if request.method=='POST' and form.validate():396        email = form.email.data397        # Find user by email398        user, user_email = user_manager.find_user_by_email(email)399        if user:400            _send_confirm_email(user, user_email)401        # Redirect to the login page402        return 
redirect(_endpoint_url(user_manager.after_resend_confirm_email_endpoint))403    # Process GET or invalid POST404    return render_template(user_manager.resend_confirm_email_template, form=form)405def reset_password(token):406    """ Verify the password reset token, Prompt for new password, and set the user's password."""407    # Verify token408    user_manager = current_app.user_manager409    db_adapter = user_manager.db_adapter410    is_valid, has_expired, user_id = user_manager.verify_token(411            token,412            user_manager.reset_password_expiration)413    if has_expired:414        flash(_('Your reset password token has expired.'), 'error')415        return redirect(url_for('user.login'))416    if not is_valid:417        flash(_('Your reset password token is invalid.'), 'error')418        return redirect(url_for('user.login'))419    user = user_manager.get_user_by_id(user_id)420    if user:421        # Avoid re-using old tokens422        if hasattr(user, 'reset_password_token'):423            verified = user.reset_password_token == token424        else:425            verified = True426    if not user or not verified:427        flash(_('Your reset password token is invalid.'), 'error')428        return redirect(_endpoint_url(user_manager.login_endpoint))429    # Initialize form430    form = user_manager.reset_password_form(request.form)431    # Process valid POST432    if request.method=='POST' and form.validate():433        # Invalidate the token by clearing the stored token434        if hasattr(user, 'reset_password_token'):435            db_adapter.update_object(user, reset_password_token='')436        # Change password437        hashed_password = user_manager.hash_password(form.new_password.data)438        user_auth = user.user_auth if db_adapter.UserAuthClass and hasattr(user, 'user_auth') else user439        db_adapter.update_object(user_auth, password=hashed_password)440        db_adapter.commit()441        # Send 'password_changed' email442  
      if user_manager.enable_email and user_manager.send_password_changed_email:443            emails.send_password_changed_email(user)444        # Prepare one-time system message445        flash(_("Your password has been reset successfully. Please sign in with your new password"), 'success')446        # Auto-login after reset password or redirect to login page447        next = request.args.get('next', _endpoint_url(user_manager.after_reset_password_endpoint))448        if user_manager.auto_login_after_reset_password:449            return _do_login_user(user, next)                       # auto-login450        else:451            return redirect(url_for('user.login')+'?next='+next)    # redirect to login page452    # Process GET or invalid POST453    return render_template(user_manager.reset_password_template, form=form)454def unconfirmed():455    """ Prepare a Flash message and redirect to USER_UNCONFIRMED_URL"""456    # Prepare Flash message457    url = request.script_root + request.path458    flash(_("You must confirm your email to access '%(url)s'.", url=url), 'error')459    # Redirect to USER_UNCONFIRMED_EMAIL_ENDPOINT460    user_manager = current_app.user_manager461    return redirect(_endpoint_url(user_manager.unconfirmed_email_endpoint))462def unauthenticated():463    """ Prepare a Flash message and redirect to USER_UNAUTHENTICATED_ENDPOINT"""464    # Prepare Flash message465    url = request.url466    flash(_("You must be signed in to access '%(url)s'.", url=url), 'error')467    # quote the fully qualified url468    quoted_url = quote(url)469    # Redirect to USER_UNAUTHENTICATED_ENDPOINT470    user_manager = current_app.user_manager471    return redirect(_endpoint_url(user_manager.unauthenticated_endpoint)+'?next='+ quoted_url)472def unauthorized():473    """ Prepare a Flash message and redirect to USER_UNAUTHORIZED_ENDPOINT"""474    # Prepare Flash message475    url = request.script_root + request.path476    flash(_("You do not have permission to access 
'%(url)s'.", url=url), 'error')477    # Redirect to USER_UNAUTHORIZED_ENDPOINT478    user_manager = current_app.user_manager479    return redirect(_endpoint_url(user_manager.unauthorized_endpoint))480def _endpoint_url(endpoint):481    url = '/'482    if endpoint:483        url = url_for(endpoint)...bases.py
Source:bases.py  
...18            self._response = requests.get(self._endpoint_url)19            return self._response20        else:21            self._response = requests.get(self._endpoint_url, headers=self._auth_header)22            self._refreshing_endpoint_url()23            return self._response24    def _send_post_requests(self, apiClient=False):25        if apiClient == False:26            self._response = requests.post(27                url=self._endpoint_url,28                json=self._request_body,29                headers=self._auth_header30            )31            try:32                self._orderId = self._response.json()["orderId"]33                open("endpoints/orderId.txt", "w").write(copy.deepcopy(self._orderId))34            except KeyError:35                self._error_msg = self._response.json()["error"]36            return self._response37        else:38            self._response = requests.post(39                url=self._endpoint_url,40                json=self._request_body41            ) 42            return self._response43    def _send_patch_requests(self):44        self._response = requests.patch(45            url=self._endpoint_url,46            json=self._customer_name,47            headers=self._auth_header48        )49        self._refreshing_endpoint_url()50        return self._response51    def _send_delete_requests(self):52        self._response = requests.delete(53            url=self._endpoint_url,54            headers=self._auth_header55        )56        self._refreshing_endpoint_url()57        return self._response58    def _refreshing_endpoint_url(self):59        self._endpoint_url = self._endpoint_url_copy60        61    def checking_status_code(self):...flightweather.py
Source:flightweather.py  
1import requests2import json3import logging4import pandas as pd5class RestAPI:6    def __init__():7        pass8    def call(self, *args):9        response = requests.get(url=self._endpoint_url, params=self.params)10        self.data = response.json()11class WeatherAPI(RestAPI):12    """13    Wrapper for OpenWeather API14    """15    def __init__(self, api_key, lat, lon):16        self._endpoint_url="https://api.openweathermap.org/data/2.5/weather?"17        self.params = {18            "appid": api_key,19            "lat": lat,20            "lon": lon,21            "units": "metric"22        }23    def get(self, metric="main"):24        self.call()25        data = self.data[metric]26        logging.info(data)27        df = pd.DataFrame([data])28        return df29class FlightAPI(RestAPI):30    """31    Wrapper for AirLabs API32    """33    def __init__(self, api_key, endpoint="flights"):34        self._endpoint_url=f"https://airlabs.co/api/v9/{endpoint}?"35        self.params = {36            "api_key": api_key37        }38    def get(self):39        self.call()40        logging.debug(self.data)41        data = self.data["response"]42        logging.debug(data)43        if type(data) != list:44            data = [data]45        df = pd.DataFrame(data)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
