How to use update_package method in localstack

Best Python code snippet using localstack_python

package_handler.py

Source:package_handler.py Github

copy

Full Screen

import logging
import os
from email import message
from typing import Iterable, List, Tuple, Type

from handler.common.env import Env, get_env
from handler.common.exception import PermissionDenied
from handler.handler_context import HandlerContext
from handler.model.activity import Action
from handler.model.base import (Context, FieldMask, HandlerBase, ModelBase,
                                merge_resource)
from handler.model.base.base_db import ListOptions
from handler.model.model_activity import ModelActivity, search_activity
from handler.model.model_binary import ModelBinary
from handler.model.model_package import ModelPackage
from handler.model.model_subscription import ModelSubscription
from handler.model.model_thread import ModelThread
from handler.model.model_user import ModelUser, get_admins
from handler.util.state_util import on_approve
from handler.util.time_util import get_now

from .common.image import Image
from .protos import san11_platform_pb2 as pb
from .util.notifier import Notifier, notify, send_message

logger = logging.getLogger(os.path.basename(__file__))


class PackageHandler(HandlerBase):
    """Create, read, list, update and delete handlers for ``ModelPackage``."""

    def create(self, parent: str, package: ModelPackage, handler_context: HandlerContext) -> ModelPackage:
        """Create ``package`` under ``parent`` on behalf of the calling user.

        The package is attributed to the caller and put in ``UNDER_REVIEW``
        state. In the PROD environment every admin is then e-mailed and
        messaged; notification failures are logged and do not fail creation.

        Returns:
            The created package.
        """
        package.author_id = handler_context.user.user_id
        package.state = pb.ResourceState.UNDER_REVIEW
        package.create(parent=parent, user_id=handler_context.user.user_id)
        # Post creation actions
        if get_env() == Env.PROD:
            try:
                notifier = Notifier()
                for admin in get_admins():
                    notifier.send_email(
                        admin.email, '【新内容】待审核', f'[{package.package_name}] 已被 {handler_context.user.username} 创建。请审核。')
                    send_message(package.author_id, admin.user_id,
                                 f'{handler_context.user.username} 创建了 {package.package_name}. 请审核.', package.name, '')
            except Exception as err:
                # Best effort: the package already exists at this point.
                logger.error(f'Failed to notify admin: {err}')
        return package

    def get(self, name: str, handler_context: HandlerContext) -> ModelPackage:
        """Return the package identified by resource ``name``."""
        return ModelPackage.from_name(name)

    def list(self, list_options: ListOptions, handler_context: HandlerContext) -> Tuple[List[ModelPackage], str]:
        """List packages the caller is allowed to see.

        Returns:
            A ``(packages, next_page_token)`` tuple. Filtering happens after
            the page is fetched, so a page may hold fewer items than fetched.
        """
        # (TODO): BEGIN - Remove logic for model migration
        # if not ModelPackage.list(ListOptions(parent='categories/1'))[0]:
        #     for package in Package.list(page_size=1000, page_token=''):
        #         new_model = ModelPackage.from_v1(package)
        #         new_model.backfill()
        # - END
        # (TODO): Due to ListOptions.filter does not support `OR` operation, we
        # will do list without any filter and hide content later manually.
        # This should be replaced with different `filter` expression later when
        # possible.
        packages, next_page_token = ModelPackage.list(list_options)
        user = handler_context.user
        if user is None:
            # Most users (including anonymous users) should only see packages in
            # `NORMAL` state.
            packages = [p for p in packages
                        if p.state == pb.ResourceState.NORMAL]
        else:
            # Admin user can see all packages.
            # Users who also publish packages will also see packages they published
            # in `HIDDEN`, `UNDER_REVIEW` state.
            author_only_states = (pb.ResourceState.HIDDEN,
                                  pb.ResourceState.UNDER_REVIEW)
            packages = [
                p for p in packages
                if user.type == pb.User.UserType.ADMIN
                or p.state == pb.ResourceState.NORMAL
                or (p.author_id == user.user_id
                    and p.state in author_only_states)
            ]
        return packages, next_page_token

    def update(self, update_package: ModelPackage, update_mask: FieldMask, handler_context: HandlerContext) -> ModelPackage:
        """Apply the fields named in ``update_mask`` from ``update_package``.

        ``like_count`` / ``dislike_count`` in the mask are treated as a toggle
        of the caller's reaction instead of a value to copy. Images dropped
        from ``image_urls`` are deleted. When the update approves the package,
        the author's subscribers are notified.

        Returns:
            The merged, persisted package.

        Raises:
            PermissionDenied: a non-admin tries to approve the package.
        """
        def verify_permission_on_update(curr: ModelPackage, dest: ModelPackage, update_mask: FieldMask) -> None:
            if on_approve(curr.state, dest.state):
                # Approve new package
                if not handler_context.user.is_admin():
                    raise PermissionDenied(message='审核通过新工具需要管理员权限')

        def toggle_like_dislike(user_id: int, action: Action, package: ModelPackage):
            action2field = {
                Action.LIKE: 'like_count',
                Action.DISLIKE: 'dislike_count',
            }
            field = action2field[action]
            existing = search_activity(
                f'users/{user_id}', action, package.name)
            if existing:
                # Same reaction already recorded: undo it.
                existing.delete()
                setattr(package, field, getattr(package, field) - 1)
            else:
                ModelActivity('', get_now(), action.value, package.name).create(
                    parent=f'users/{user_id}')
                setattr(package, field, getattr(package, field) + 1)
                # NOTE(review): the listing this was restored from lost its
                # indentation; clearing the opposite reaction is assumed to
                # belong to the "newly added" branch - confirm.
                reversed_action = Action.DISLIKE if action == Action.LIKE else Action.LIKE
                opposite = search_activity(
                    f'users/{user_id}', reversed_action, package.name)
                if opposite:
                    opposite.delete()
                    reversed_field = action2field[reversed_action]
                    setattr(package, reversed_field,
                            getattr(package, reversed_field) - 1)

        base_package = ModelPackage.from_name(update_package.name)
        state_in_mask = update_mask.has('state')
        if state_in_mask:
            verify_permission_on_update(
                base_package, update_package, update_mask)
        # Remove `like_count`, `dislike_count` from update_mask here and handle the count update
        # explicitly later.
        sanitized_update_mask = FieldMask(
            set(update_mask.paths) - {'like_count', 'dislike_count'})
        package = merge_resource(base_resource=base_package,
                                 update_request=update_package,
                                 field_mask=sanitized_update_mask)
        user_id = handler_context.user.user_id
        if update_mask.has('like_count'):
            toggle_like_dislike(user_id, Action.LIKE, package)
        if update_mask.has('dislike_count'):
            toggle_like_dislike(user_id, Action.DISLIKE, package)
        # Delete image resources
        if update_mask.has('image_urls'):
            for image_url_to_delete in set(base_package.image_urls) - set(update_package.image_urls):
                try:
                    image = Image.from_url(image_url_to_delete)
                    image.delete()
                except Exception as err:
                    logger.error(
                        f'Failed to delete {image_url_to_delete}: {err}')
        # A request touching nothing but the reaction counters comes from a
        # visitor and must not bump the package's update time.
        is_visitor = set(update_mask.paths) <= {'like_count', 'dislike_count'}
        if is_visitor:
            package.update(update_update_time=False)
        else:
            package.update(user_id=user_id)
        # notify all subscribers
        # Only a request that actually updates `state` can approve the package;
        # without this gate a stray `state` value on a request whose mask omits
        # it would notify subscribers although nothing was approved.
        if state_in_mask and on_approve(base_package.state, update_package.state):
            author = ModelUser.from_name(f'users/{package.author_id}')
            for sub in ModelSubscription.list(ListOptions(parent=author.name))[0]:
                notify(
                    sender_id=author.user_id,
                    receiver_id=sub.subscriber_id,
                    content=f'【新内容】{author.username} 发布了 {package.package_name}',
                    link=package.name,
                    image_preview=package.image_urls[0] if package.image_urls else '',
                )
        return package

    def delete(self, name: str, handler_context: HandlerContext) -> ModelPackage:
        """Delete the package ``name`` with its images, binaries and threads.

        Failures while deleting an individual image, binary or thread are
        logged and skipped; the package itself is deleted last.

        Returns:
            The deleted package.
        """
        package = ModelPackage.from_name(name)
        for image_url in package.image_urls:
            try:
                Image.from_url(image_url).delete()
            except Exception as err:
                logger.error(
                    f'Failed to delete image: image_url={image_url} err={err}')
        for binary in ModelBinary.list(ListOptions(parent=package.name))[0]:
            try:
                binary.delete(user_id=handler_context.user.user_id)
            except Exception as err:
                logger.error(
                    f'Failed to delete binary: binary={binary} err={err}')
        for thread in ModelThread.list(ListOptions(parent=package.name))[0]:
            try:
                thread.delete(user_id=handler_context.user.user_id)
            except Exception as err:
                logger.error(f'Failed to delete {thread} under {self}: {err}')
        package.delete(user_id=handler_context.user.user_id)
        return package

    def search_packages(self, request, context):
        # (TODO): Reimplement this with ModelPackage
        # NOTE: the listing is truncated at this point; the method body is not
        # shown in the source page.
        ...

Full Screen

Full Screen

update_bandwidth_controller.py

Source:update_bandwidth_controller.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
import traceback

from odoo import http

from .execute_query import update_connection


class FreeradiusUpdateBandwitdh(http.Controller):
    """HTTP controller exposing the FreeRADIUS bandwidth-update endpoint."""

    @http.route('/freeradius/update_bandwidth', auth='public')
    def index(self, **kw):
        """
        Update a client's bandwidth/package from odoo erp and log the outcome.

        Parameters
        ----------
        username : str
            username for the client. This will be needed for PPoE access
        bandwidth: str
            assign bandwidth for the client. Format 10M/10M
        update_package: str
            name of the updated package
        current_package: str
            name of the package currently assigned; stored in the log
            record's ``ip_pool`` field

        Returns
        -------
        str
            returns 'success' if successful, otherwise returns debug data. see update_connection
        """
        result = None
        username = kw['username']
        bandwidth = kw['bandwidth']
        update_package = kw['update_package']
        current_package = kw['current_package']
        print(username, bandwidth, update_package, current_package)

        def write_log(values):
            # Every outcome is recorded in the same model with elevated rights.
            http.request.env['dgcon_radius.logs'].sudo().create(values)

        try:
            result = update_connection(username, bandwidth, update_package)
            # str() keeps the debug print from raising TypeError when the
            # returned debug data is not a string.
            print('result' + str(result))
            succeeded = result == 'success'
            write_log(
                {
                    'username': username,
                    'bandwidth': bandwidth,
                    'message': result,
                    'ip_pool': current_package,
                    'status': succeeded,
                    'type': 'Update',
                    'update_package': update_package,
                    # Any non-'success' answer is flagged as a radius error.
                    'radius_error': not succeeded,
                }
            )
            return result
        except Exception:
            write_log(
                {
                    'username': username,
                    'bandwidth': bandwidth,
                    'message': traceback.format_exc(),
                    'status': False,
                    'type': 'Update',
                    'update_package': update_package,
                    'radius_error': False,
                }
            )
            # NOTE(review): the listing this snippet comes from is truncated
            # here, so whatever the original returns on this path is not
            # visible.

Full Screen

Full Screen

6ea47fd76d53_.py

Source:6ea47fd76d53_.py Github

copy

Full Screen

"""empty message

Revision ID: 6ea47fd76d53
Revises:
Create Date: 2017-12-06 19:05:09.979776

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '6ea47fd76d53'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    """Replace ``update_package.version`` with ``version_from`` and ``version_to``.

    The old column is dropped without copying its values into the new ones.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('update_package', sa.Column('version_from', sa.String(length=255), nullable=True))
    op.add_column('update_package', sa.Column('version_to', sa.String(length=255), nullable=True))
    op.drop_column('update_package', 'version')
    # ### end Alembic commands ###


def downgrade():
    """Restore ``update_package.version`` and drop the two split columns.

    The re-added column is nullable and is not populated from the dropped ones.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('update_package', sa.Column('version', sa.VARCHAR(length=255), nullable=True))
    op.drop_column('update_package', 'version_to')
    op.drop_column('update_package', 'version_from')
    # NOTE: the listing is truncated after this statement in the source page.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful