Best Python code snippet using localstack_python
package_handler.py
Source:package_handler.py  
import logging
import os
from email import message
from typing import Iterable, List, Tuple, Type

from handler.common.env import Env, get_env
from handler.common.exception import PermissionDenied
from handler.handler_context import HandlerContext
from handler.model.activity import Action
from handler.model.base import (Context, FieldMask, HandlerBase, ModelBase,
                                merge_resource)
from handler.model.base.base_db import ListOptions
from handler.model.model_activity import ModelActivity, search_activity
from handler.model.model_binary import ModelBinary
from handler.model.model_package import ModelPackage
from handler.model.model_subscription import ModelSubscription
from handler.model.model_thread import ModelThread
from handler.model.model_user import ModelUser, get_admins
from handler.util.state_util import on_approve
from handler.util.time_util import get_now

from .common.image import Image
from .protos import san11_platform_pb2 as pb
from .util.notifier import Notifier, notify, send_message

logger = logging.getLogger(os.path.basename(__file__))


class PackageHandler(HandlerBase):
    """Create, read, list, update and delete handler for `ModelPackage`."""

    def create(self, parent: str, package: ModelPackage, handler_context: HandlerContext) -> ModelPackage:
        """Create `package` under `parent` on behalf of the calling user.

        The caller becomes the author and the package starts in the
        `UNDER_REVIEW` state. In the PROD environment every admin is then
        notified by e-mail and by in-site message; a notification failure is
        logged and does not fail the creation.

        Returns:
            The created package.
        """
        package.author_id = handler_context.user.user_id
        package.state = pb.ResourceState.UNDER_REVIEW
        package.create(parent=parent, user_id=handler_context.user.user_id)
        # Post creation actions
        if get_env() == Env.PROD:
            try:
                notifier = Notifier()
                for admin in get_admins():
                    notifier.send_email(
                        admin.email,
                        '【新内容】待审核',
                        f'[{package.package_name}] 已被 {handler_context.user.username} 创建。请审核。')
                    send_message(package.author_id, admin.user_id,
                                 f'{handler_context.user.username} 创建了 {package.package_name}. 请审核.',
                                 package.name, '')
            except Exception as err:
                # Best effort: the package already exists at this point.
                logger.error(f'Failed to notify admin: {err}')
        return package

    def get(self, name: str, handler_context: HandlerContext) -> ModelPackage:
        """Return the package identified by resource `name`."""
        return ModelPackage.from_name(name)

    def list(self, list_options: ListOptions, handler_context: HandlerContext) -> Tuple[List[ModelPackage], str]:
        """List packages and hide the ones the caller is not allowed to see.

        Filtering happens after the query, so a returned page can hold fewer
        items than the query produced.

        Returns:
            A `(packages, next_page_token)` tuple.
        """
        # (TODO): BEGIN - Remove logic for model migration
        # if not ModelPackage.list(ListOptions(parent='categories/1'))[0]:
        #     for package in Package.list(page_size=1000, page_token=''):
        #         new_model = ModelPackage.from_v1(package)
        #         new_model.backfill()
        # - END
        # (TODO): Due to ListOptions.filter does not support `OR` operation, we
        # will do list without any filter and hide content later manually.
        # This should be replaced with different `filter` expression later when
        # possible.
        packages, next_page_token = ModelPackage.list(list_options)
        user = handler_context.user
        if user is None:
            # Most users (including anonymous users) should only see packages in
            # `NORMAL` state.
            packages = [p for p in packages
                        if p.state == pb.ResourceState.NORMAL]
        else:
            # Admin user can see all packages.
            # Users who also publish packages will also see packages they published
            # in `HIDDEN`, `UNDER_REVIEW` state.
            is_admin = user.type == pb.User.UserType.ADMIN
            own_extra_states = (pb.ResourceState.HIDDEN,
                                pb.ResourceState.UNDER_REVIEW)
            packages = [
                p for p in packages
                if is_admin
                or p.state == pb.ResourceState.NORMAL
                or (p.author_id == user.user_id and p.state in own_extra_states)
            ]
        return packages, next_page_token

    def update(self, update_package: ModelPackage, update_mask: FieldMask, handler_context: HandlerContext) -> ModelPackage:
        """Apply the fields named in `update_mask` from `update_package`.

        `like_count` / `dislike_count` in the mask are treated as a toggle of
        the caller's like / dislike rather than as values to copy. Images that
        are no longer referenced are deleted, and when the update approves the
        package the author's subscribers are notified.

        Raises:
            PermissionDenied: a non-admin caller tries to approve the package.

        Returns:
            The updated package.
        """
        def verify_permission_on_update(curr: ModelPackage, dest: ModelPackage, update_mask: FieldMask) -> None:
            if on_approve(curr.state, dest.state):
                # Approve new package
                if not handler_context.user.is_admin():
                    raise PermissionDenied(message='审核通过新工具需要管理员权限')

        def toggle_like_dislike(user_id: int, action: Action, package: ModelPackage):
            action2field = {
                Action.LIKE: 'like_count',
                Action.DISLIKE: 'dislike_count',
            }

            def bump(target_action: Action, delta: int) -> None:
                field = action2field[target_action]
                setattr(package, field, getattr(package, field) + delta)

            parent = f'users/{user_id}'
            existing = search_activity(parent, action, package.name)
            if existing:
                # Same action a second time: undo it.
                existing.delete()
                bump(action, -1)
                return
            ModelActivity('', get_now(), action.value, package.name).create(
                parent=parent)
            bump(action, 1)
            # A like replaces an earlier dislike by the same user and vice versa.
            reversed_action = Action.DISLIKE if action == Action.LIKE else Action.LIKE
            opposite = search_activity(parent, reversed_action, package.name)
            if opposite:
                opposite.delete()
                bump(reversed_action, -1)

        base_package = ModelPackage.from_name(update_package.name)
        if update_mask.has('state'):
            verify_permission_on_update(
                base_package, update_package, update_mask)
        # Remove `like_count`, `dislike_count` from update_mask here and handle the count update
        #   explicitly later.
        sanitized_update_mask = FieldMask(
            set(update_mask.paths) - {'like_count', 'dislike_count'})
        package = merge_resource(base_resource=base_package,
                                 update_request=update_package,
                                 field_mask=sanitized_update_mask)
        user_id = handler_context.user.user_id
        if update_mask.has('like_count'):
            toggle_like_dislike(user_id, Action.LIKE, package)
        if update_mask.has('dislike_count'):
            toggle_like_dislike(user_id, Action.DISLIKE, package)
        # Delete image resources
        if update_mask.has('image_urls'):
            for image_url_to_delete in set(base_package.image_urls) - set(update_package.image_urls):
                try:
                    image = Image.from_url(image_url_to_delete)
                    image.delete()
                except Exception as err:
                    logger.error(
                        f'Failed to delete {image_url_to_delete}: {err}')
        # An update that only touches the like/dislike counters comes from a
        # visitor and must not refresh the package's update time.
        is_visitor = set(update_mask.paths) <= {'like_count', 'dislike_count'}
        if is_visitor:
            package.update(update_update_time=False)
        else:
            package.update(user_id=user_id)
        # notify all subscribers
        # Only an update that actually carries `state` can approve the package;
        # without this guard a stale `state` in the request body could trigger
        # notifications although no state change was applied or authorized.
        if update_mask.has('state') and on_approve(base_package.state, update_package.state):
            author = ModelUser.from_name(f'users/{package.author_id}')
            for sub in ModelSubscription.list(ListOptions(parent=author.name))[0]:
                notify(
                    sender_id=author.user_id,
                    receiver_id=sub.subscriber_id,
                    content=f'【新内容】{author.username} 发布了 {package.package_name}',
                    link=package.name,
                    image_preview=package.image_urls[0] if package.image_urls else '',
                )
        return package

    def delete(self, name: str, handler_context: HandlerContext) -> ModelPackage:
        """Delete the package `name` together with its images, binaries and threads.

        Failures while deleting a dependent resource are logged and skipped so
        that the package itself is still deleted.

        Returns:
            The deleted package.
        """
        package = ModelPackage.from_name(name)
        user_id = handler_context.user.user_id
        for image_url in package.image_urls:
            try:
                Image.from_url(image_url).delete()
            except Exception as err:
                logger.error(
                    f'Failed to delete image: image_url={image_url} err={err}')
        for binary in ModelBinary.list(ListOptions(parent=package.name))[0]:
            try:
                binary.delete(user_id=user_id)
            except Exception as err:
                logger.error(
                    f'Failed to delete binary: binary={binary} err={err}')
        for thread in ModelThread.list(ListOptions(parent=package.name))[0]:
            try:
                thread.delete(user_id=user_id)
            except Exception as err:
                # Report the owning package, not the handler instance.
                logger.error(
                    f'Failed to delete {thread} under {package.name}: {err}')
        package.delete(user_id=user_id)
        return package

    def search_packages(self, request, context):
        # (TODO): Reimplement this with ModelPackage
        # NOTE(review): the listing is cut off here; the rest of this method is not visible.
        ...
update_bandwidth_controller.py
Source:update_bandwidth_controller.py  
1# -*- coding: utf-8 -*-2import traceback3from odoo import http4from .execute_query import update_connection5class FreeradiusUpdateBandwitdh(http.Controller):6    @http.route('/freeradius/update_bandwidth', auth='public')7    def index(self, **kw):8        """9        Creating user from odoo erp10        Parameters11        ----------12        username : str13            username for the client. This will be needed for PPoE access14        bandwidth: str15            assign bandwidth for the client. Format 10M/10M16        update_package: str17            name of the updated package18        Returns19        -------20        str21            returns 'success' if successful, otherwise returns debug data. see update_connection22        """23        result=None24        username = kw['username']25        bandwidth = kw['bandwidth']26        update_package = kw['update_package']27        current_package = kw['current_package']28        print(username,bandwidth,update_package,current_package)29        try:30            result=update_connection(username, bandwidth, update_package)31            print('result'+result)32            if result == 'success':33                http.request.env['dgcon_radius.logs'].sudo().create(34                    {35                        'username': username,36                        'bandwidth': bandwidth,37                        'message': result,38                        'ip_pool':current_package,39                        'status': True,40                        'type': 'Update',41                        'update_package':update_package,42                        'radius_error': False43                    }44                )45                return result46            else:47                http.request.env['dgcon_radius.logs'].sudo().create(48                    {49                        'username': username,50                        'bandwidth': bandwidth,51                        'ip_pool':current_package,52                        
'message': result,53                        'status': False,54                        'type': 'Update',55                        'update_package': update_package,56                        'radius_error': True57                    }58                )59                return result60        except:61            http.request.env['dgcon_radius.logs'].sudo().create(62                {63                    'username': username,64                    'bandwidth': bandwidth,65                    'message': traceback.format_exc(),66                    'status': False,67                    'type': 'Update',68                    'update_package': update_package,69                    'radius_error': False70                }71            )...6ea47fd76d53_.py
Source:6ea47fd76d53_.py  
1"""empty message2Revision ID: 6ea47fd76d533Revises: 4Create Date: 2017-12-06 19:05:09.9797765"""6from alembic import op7import sqlalchemy as sa8# revision identifiers, used by Alembic.9revision = '6ea47fd76d53'10down_revision = None11branch_labels = None12depends_on = None13def upgrade():14    # ### commands auto generated by Alembic - please adjust! ###15    op.add_column('update_package', sa.Column('version_from', sa.String(length=255), nullable=True))16    op.add_column('update_package', sa.Column('version_to', sa.String(length=255), nullable=True))17    op.drop_column('update_package', 'version')18    # ### end Alembic commands ###19def downgrade():20    # ### commands auto generated by Alembic - please adjust! ###21    op.add_column('update_package', sa.Column('version', sa.VARCHAR(length=255), nullable=True))22    op.drop_column('update_package', 'version_to')23    op.drop_column('update_package', 'version_from')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
