How to use from_error method in refurb

Best Python code snippet using refurb_python

company.py

Source:company.py Github

copy

Full Screen

...28@role_required(level=0) #owner only29def update_company(role, body):30 to_update, invalids = update_row_content(Company, body)31 if invalids:32 raise APIException.from_error(EM(invalids).bad_request)33 try:34 Company.query.filter(Company.id == role.company.id).update(to_update)35 db.session.commit()36 except SQLAlchemyError as e:37 handle_db_error(e)38 return JSONResponse(message=f'Company updated').to_json()39@company_bp.route('/users', methods=['GET'])40@json_required()41@role_required(level=1)#andmin user42def get_company_users(role):43 """44 optional query parameters:45 ?page="pagination-page:str"46 ?limit="pagination-limit:str"47 ?status="role-status:str" -> status_options: active, disabled, pending48 """49 qp = QueryParams(request.args)50 status = qp.get_first_value("status")51 page, limit = qp.get_pagination_params()52 main_q = db.session.query(Role).join(Role.user).filter(Role.company_id == role.company.id)53 54 if status:55 if status == "active":56 main_q = main_q.filter(Role._isActive == True)57 elif status == "disabled":58 main_q = main_q.filter(Role._isActive == False)59 elif status == "pending":60 main_q = main_q.filter(Role._inv_accepted == False)61 roles = main_q.order_by(func.lower(User.fname).asc()).paginate(page, limit)62 return JSONResponse(63 message=qp.get_warings(),64 payload={65 "users": list(map(lambda x: {**x.user.serialize(), "user_role": x.serialize()}, roles.items)),66 **qp.get_pagination_form(roles)67 }).to_json()68@company_bp.route('/users', methods=['POST'])69@json_required({"email": str, "role_id": int})70@role_required(level=1)71def invite_user(role, body):72 email = StringHelpers(body["email"])73 role_id = body["role_id"]74 invalids = Validations.validate_inputs({75 "role_id": IntegerHelpers.is_valid_id(role_id),76 "email": email.is_valid_email()77 })78 if invalids:79 raise APIException.from_error(EM(invalids).bad_request)80 new_role_function = RoleFunction.get_rolefunc_by_id(role_id)81 if not new_role_function:82 raise APIException.from_error(EM({"role_id": f"id-{role_id} not found"}).notFound)83 if role.role_function.level > new_role_function.level:84 raise APIException.from_error(EM({"role_level": "greater authorization level is required"}).unauthorized)85 user = User.get_user_by_email(email.email_normalized)86 # nuevo usuario...87 if not user:88 success, mail_msg = send_user_invitation(user_email=email.email_normalized, company_name=role.company.name)89 if not success:90 raise APIException.from_error(EM(mail_msg).service_unavailable)91 try:92 new_user = User(93 email=email.email_normalized,94 password = StringHelpers.random_password()95 )96 new_role = Role(97 user = new_user,98 company_id = role.company_id,99 role_function = new_role_function100 )101 db.session.add_all([new_user, new_role])102 db.session.commit()103 except SQLAlchemyError as e:104 handle_db_error(e)105 return JSONResponse("new user invited", status_code=201).to_json()106 #ususario existente...107 rel = db.session.query(User).join(User.roles).join(Role.company).\108 filter(User.id == user.id, Company.id == role.company_id).first()109 110 if rel:111 raise APIException.from_error(112 EM({"email": f"user <{email.value}> is already listed in current company'"}).conflict113 )114 115 sent, mail_msg = send_user_invitation(user_email=email.email_normalized, user_name=user.fname, company_name=role.company.name)116 if not sent:117 raise APIException.from_error(EM(mail_msg).service_unavailable)118 119 try:120 new_role = Role(121 company_id = role.company_id,122 user = user,123 role_function = new_role_function124 )125 db.session.add(new_role)126 db.session.commit()127 except SQLAlchemyError as e:128 handle_db_error(e)129 return JSONResponse('existing user invited').to_json()130@company_bp.route('/users/<int:user_id>', methods=['PUT'])131@json_required({'role_id':int, 'is_active':bool})132@role_required(level=1)133def update_user_company_relation(role, body, user_id):134 role_id = body["role_id"]135 new_status = body['is_active']136 invalids = Validations.validate_inputs({137 "role_id": IntegerHelpers.is_valid_id(role_id),138 "user_id": IntegerHelpers.is_valid_id(user_id)139 })140 if invalids:141 raise APIException.from_error(EM(invalids).bad_request)142 if role_id == role.user.id:143 raise APIException.from_error(EM({"role_id": "can't update self-user role"}).conflict)144 target_role = Role.get_relation_user_company(user_id, role.company.id)145 if not target_role:146 raise APIException.from_error(EM({"user_id": f"id-{user_id} not found"}).notFound)147 148 new_rolefunction = RoleFunction.get_rolefunc_by_id(role_id)149 if not new_rolefunction:150 raise APIException.from_error(EM({"role_id": "not found"}).notFound)151 if role.role_function.level > new_rolefunction.level:152 raise APIException.from_error(EM({"role_level": "greater authorization level required"}).unauthorized)153 154 try:155 target_role.role_function = new_rolefunction156 target_role._isActive = new_status157 db.session.commit()158 except SQLAlchemyError as e:159 handle_db_error(e)160 return JSONResponse("user role updated").to_json()161@company_bp.route('/users/<int:user_id>', methods=['DELETE'])162@json_required()163@role_required(level=1)164def delete_user_company_relation(role, user_id):165 valid, msg = IntegerHelpers.is_valid_id(user_id)166 if not valid:167 raise APIException.from_error(EM({"user_id": msg}).bad_request)168 if user_id == role.user.id:169 raise APIException.from_error(EM({"role_id": "can't delete self-user role"}).conflict)170 target_role = Role.get_relation_user_company(user_id, role.company.id)171 if not target_role:172 raise APIException.from_error(EM({"user_id": "not found"}).notFound)173 try:174 db.session.delete(target_role)175 db.session.commit()176 except IntegrityError as ie:177 raise APIException.from_error(EM({"delete": f"can't delete user-id: {user_id} relation - {ie}"}).conflict)178 except SQLAlchemyError as e:179 handle_db_error(e)180 return JSONResponse("user-relation was deleted of company").to_json()181@company_bp.route('/roles', methods=['GET'])182@json_required()183@role_required()#any user184def get_company_roles(role):185 return JSONResponse(payload={186 "roles": list(map(lambda x: x.serialize(), db.session.query(RoleFunction).all()))187 }).to_json()188@company_bp.route('/providers', methods=['GET'])189@json_required()190@role_required(level=1)191def get_company_providers(role):192 193 qp = QueryParams(request.args)194 provider_id = qp.get_first_value("provider_id", as_integer=True)195 if not provider_id:196 main_q = role.company.providers197 name_like = StringHelpers(qp.get_first_value("name_like"))198 if name_like:199 main_q = main_q.filter(Unaccent(func.lower(Provider.name)).like(f"%{name_like.unaccent.lower()}%"))200 page, limit = qp.get_pagination_params()201 providers = main_q.paginate(page, limit)202 return JSONResponse(203 message=qp.get_warings(),204 payload={205 'providers': list(map(lambda x: x.serialize(), providers.items)),206 **qp.get_pagination_form(providers)207 }).to_json()208 209 #provider_id in url parameters210 valid, msg = IntegerHelpers.is_valid_id(provider_id)211 if not valid:212 raise APIException.from_error(EM({"provider_id": msg}).bad_request)213 provider = role.company.get_provider(provider_id)214 if not provider:215 raise APIException.from_error(EM({"provider_id": f"id-{provider_id} not found"}).notFound)216 return JSONResponse(217 message=qp.get_warings(),218 payload={'provider': provider.serialize_all()}219 ).to_json()220@company_bp.route('/providers', methods=['POST'])221@json_required({'name': str})222@role_required(level=1)223def create_provider(role, body):224 to_add, invalids = update_row_content(Provider, body)225 if invalids:226 raise APIException.from_error(EM(invalids).bad_request)227 to_add.update({'company_id': role.company.id}) #se agrega company228 new_provider = Provider(**to_add)229 try:230 db.session.add(new_provider)231 db.session.commit()232 except SQLAlchemyError as e:233 handle_db_error(e)234 return JSONResponse(235 message='new provider was created', 236 payload={'provider': new_provider.serialize_all()},237 status_code=201238 ).to_json()239@company_bp.route('/providers/<int:provider_id>', methods=['PUT'])240@json_required()241@role_required(level=1)242def update_provider(role, body, provider_id):243 valid, msg = IntegerHelpers.is_valid_id(provider_id)244 if not valid:245 raise APIException.from_error(EM({"provider_id": msg}).bad_request)246 provider = role.company.get_provider(provider_id)247 if not provider:248 raise APIException.from_error(EM({"provider_id": f"id-{provider_id} not found"}).notFound)249 to_update, invalids = update_row_content(Provider, body)250 if invalids:251 raise APIException.from_error(EM(invalids).bad_request)252 try:253 db.session.query(Provider).filter(Provider.id == provider.id).update(to_update)254 db.session.commit()255 except SQLAlchemyError as e:256 handle_db_error(e)257 return JSONResponse(message=f'provider-id: {provider_id} was updated').to_json()258@company_bp.route('/providers/<int:provider_id>', methods=['DELETE'])259@json_required()260@role_required(level=1)261def delete_provider(role, provider_id):262 valid, msg = IntegerHelpers.is_valid_id(provider_id)263 if not valid:264 raise APIException.from_error(EM({"provider_id": msg}).bad_request)265 266 target_provider = role.company.get_provider(provider_id)267 if not target_provider:268 raise APIException.from_error(EM({"provider_id": f"id-{provider_id} not found"}).notFound)269 try:270 db.session.delete(target_provider)271 db.session.commit()272 except IntegrityError as ie:273 raise APIException.from_error(EM({"provider_id": f"can't delete provider id-{provider_id}, {ie}"}).conflict)274 except SQLAlchemyError as e:275 handle_db_error(e)276 return JSONResponse(message=f'provider-id: {provider_id} has been deleted').to_json()277@company_bp.route('/categories', methods=['GET'])278@json_required()279@role_required()280def get_company_categories(role):281 qp = QueryParams(request.args)282 category_id = qp.get_first_value("category_id", as_integer=True)283 if not category_id:284 cat = role.company.categories.filter(Category.parent_id == None).order_by(Category.name.asc()).all() #root categories only285 286 return JSONResponse(287 message=qp.get_warings(),288 payload={289 "categories": list(map(lambda x: x.serialize_children(), cat))290 }291 ).to_json()292 #category-id is present in the url-parameters293 valid, msg = IntegerHelpers.is_valid_id(category_id)294 if not valid:295 raise APIException.from_error(EM({"category_id": msg}).bad_request)296 cat = role.company.get_category_by_id(category_id)297 if not cat:298 raise APIException.from_error(EM({"category_id": f"id-{category_id} not found"}).notFound)299 #return item300 return JSONResponse(301 payload={302 'category': cat.serialize_all()303 }304 ).to_json()305@company_bp.route('/categories', methods=['POST'])306@company_bp.route('/categories/<int:category_id>', methods=['PUT'])307@json_required({'name': str})308@role_required()309def create_or_update_category(role, body, category_id=None):310 parent_id = body.get('parent_id', None)311 new_name = StringHelpers(body["name"])312 if parent_id:313 valid, msg = IntegerHelpers.is_valid_id(parent_id)314 if not valid:315 raise APIException.from_error(EM({"parent_id": msg}).bad_request)316 parent = role.company.get_category_by_id(parent_id)317 if not parent:318 raise APIException.from_error(EM({"parent_id": f"id-{parent_id} not found"}).notFound)319 newRows, invalids = update_row_content(Category, body)320 if invalids:321 raise APIException.from_error(EM(invalids).bad_request)322 main_q = db.session.query(Category).select_from(Company).join(Company.categories).\323 filter(Company.id == role.company.id, Unaccent(func.lower(Category.name)) == new_name.unaccent.lower())324 #POST method325 if request.method == "POST":326 category_exists = main_q.first()327 if category_exists:328 raise APIException.from_error(EM({"name": f"category name [{new_name.value}] already exists"}).conflict)329 newRows.update({'company_id': role.company.id, "parent_id": parent_id})330 new_category = Category(**newRows)331 try:332 db.session.add(new_category)333 db.session.commit()334 except SQLAlchemyError as e:335 handle_db_error(e)336 return JSONResponse(337 payload={"category": new_category.serialize()},338 status_code=201339 ).to_json()340 #PUT method341 target_cat = role.company.get_category_by_id(category_id)342 if not target_cat:343 raise APIException.from_error(EM({"category_id": f"id-{category_id} not found"}).notFound)344 name_exists = main_q.filter(Category.id != category_id).first()345 if name_exists:346 raise APIException.from_error(EM({"name": f"category name [{new_name.value}] already exists"}).conflict)347 try:348 db.session.query(Category).filter(Category.id == target_cat.id).update(newRows)349 db.session.commit()350 except SQLAlchemyError as e:351 handle_db_error(e)352 return JSONResponse(f'category-id: {category_id} updated').to_json()353@company_bp.route("/categories/<int:cat_id>", methods=["DELETE"])354@json_required()355@role_required(level=1)356def delete_category(role, cat_id):357 valid, msg = IntegerHelpers.is_valid_id(cat_id)358 if not valid:359 raise APIException.from_error(EM({"cat_id": msg}).bad_request)360 target_cat = role.company.get_category_by_id(cat_id)361 if not target_cat:362 raise APIException.from_error(EM({"cat_id": f"id-{cat_id} not found"}).notFound)363 try:364 db.session.delete(target_cat)365 db.session.commit()366 367 except IntegrityError as ie:368 raise APIException.from_error(EM({"cat_id": f"can't delete category_id: {cat_id}, {ie}"}).conflict)369 except SQLAlchemyError as e:370 handle_db_error(e)371 return JSONResponse(372 message=f"category_id: {cat_id} has been deleted"373 ).to_json()374@company_bp.route('/categories/<int:cat_id>/attributes', methods=['GET'])375@json_required()376@role_required()377def get_category_attributes(role, cat_id):378 valid, msg = IntegerHelpers.is_valid_id(cat_id)379 if not valid:380 raise APIException.from_error(EM({"cat_id": msg}).bad_request)381 382 target_cat = role.company.get_category_by_id(cat_id)383 if not target_cat:384 raise APIException.from_error(EM({"cat_id": f"id-{cat_id} not found"}).notFound)385 all_attributes = target_cat.get_attributes()386 return JSONResponse(387 payload={'attributes': list(map(lambda x: x.serialize_all(), all_attributes))}388 ).to_json()389@company_bp.route('/categories/<int:category_id>/attributes', methods=['PUT'])390@json_required({'attributes': list})391@role_required(level=1)392def update_category_attributes(role, body, category_id):393 attributes = body.get('attributes')394 valid, msg = IntegerHelpers.is_valid_id(category_id)395 if not valid:396 raise APIException.from_error(EM({"category_id": msg}).bad_request)397 target_cat = role.company.get_category_by_id(category_id)398 if not target_cat:399 raise APIException.from_error(EM({"category_id": f"id-{category_id} not found"}).notFound)400 if not attributes: #empty list clear all attibutes401 try:402 target_cat.attributes = []403 db.session.commit()404 except SQLAlchemyError as e:405 handle_db_error(e)406 return JSONResponse(407 message=f'all attributes of category-id: {category_id} were removed',408 ).to_json()409 not_integer = [r for r in attributes if not isinstance(r, int)]410 if not_integer:411 raise APIException.from_error(EM(412 {"attributes": f"list of attributes must include integers values only.. <{not_integer}> were detected"}413 ).bad_request)414 new_attributes = role.company.attributes.filter(Attribute.id.in_(attributes)).all()415 if not new_attributes:416 raise APIException.from_error(EM({"attributes": "no attributes were found in the database"}).notFound)417 try:418 target_cat.attributes = new_attributes419 db.session.commit()420 except SQLAlchemyError as e:421 handle_db_error(e)422 return JSONResponse(423 message=f'category-id: {category_id} updated',424 payload={425 'category': target_cat.serialize()426 }427 ).to_json()428@company_bp.route("/item-attributes", methods=["GET"])429@json_required()430@role_required()431def get_company_attributes(role):432 qp = QueryParams(request.args)433 attribute_id = qp.get_first_value("attribute_id", as_integer=True)434 if not attribute_id:435 page, limit = qp.get_pagination_params()436 name_like = StringHelpers(qp.get_first_value("name_like"))437 main_q = role.company.attributes.order_by(Attribute.name.asc())438 if name_like:439 main_q = main_q.filter(Unaccent(func.lower(Attribute.name)).like(f"%{name_like.unaccent.lower()}%"))440 attributes = main_q.paginate(page, limit)441 return JSONResponse(442 message=qp.get_warings(),443 payload={444 'attributes': list(map(lambda x:x.serialize(), attributes.items)),445 **qp.get_pagination_form(attributes)446 }447 ).to_json()448 valid, msg = IntegerHelpers.is_valid_id(attribute_id)449 if not valid:450 raise APIException.from_error(EM({"attribute_id": msg}).bad_request)451 452 target_attr = role.company.get_attribute(attribute_id)453 if not target_attr:454 raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)455 return JSONResponse(456 message=qp.get_warings(),457 payload={458 'attribute': target_attr.serialize_all()459 }460 ).to_json()461@company_bp.route('/item-attributes', methods=["POST"])462@json_required({'name': str})463@role_required(level=1) 464def create_attribute(role, body):465 name = StringHelpers(body["name"])466 attribute_exists = db.session.query(Attribute).select_from(Company).join(Company.attributes).\467 filter(Unaccent(func.lower(Attribute.name)) == name.unaccent.lower()).first()468 469 if attribute_exists:470 raise APIException.from_error(EM({"name": f"attribute <{name.value}> already exists"}).conflict)471 472 to_add, invalids = update_row_content(Attribute, body)473 if invalids:474 raise APIException.from_error(EM(invalids).bad_request)475 476 to_add.update({'company_id': role.company.id})477 new_attribute = Attribute(**to_add)478 try:479 db.session.add(new_attribute)480 db.session.commit()481 except SQLAlchemyError as e:482 handle_db_error(e)483 return JSONResponse(484 payload={485 'attribute': new_attribute.serialize()486 },487 message='new attribute created',488 status_code=201489 ).to_json()490@company_bp.route("/item-attributes/<int:attribute_id>", methods=["PUT"])491@json_required({'name': str})492@role_required(level=1)493def update_attribute(role, body, attribute_id):494 name = StringHelpers(body["name"])495 invalids = Validations.validate_inputs({496 "attribute_id": IntegerHelpers.is_valid_id(attribute_id),497 "name": name.is_valid_string()498 })499 if invalids:500 raise APIException.from_error(EM(invalids).bad_request)501 target_attr = role.company.get_attribute(attribute_id)502 if not target_attr:503 raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)504 attribute_exists = db.session.query(Attribute).select_from(Company).join(Company.attributes).\505 filter(Unaccent(func.lower(Attribute.name)) == name.unaccent.lower()).first()506 507 if attribute_exists:508 raise APIException.from_error(EM({"name": f"attribute <{name.value}> already exists"}).conflict)509 to_update, invalids = update_row_content(Attribute, body)510 if invalids:511 raise APIException.from_error(EM(invalids).bad_request)512 513 try:514 db.session.query(Attribute).filter(Attribute.id == target_attr.id).update(to_update)515 db.session.commit()516 except SQLAlchemyError as e:517 handle_db_error(e)518 return JSONResponse(519 message=f'attribute-id: {attribute_id} updated'520 ).to_json()521@company_bp.route("/item-attributes/<int:attribute_id>", methods=["DELETE"])522@json_required()523@role_required(level=1)524def delete_attribute(role, attribute_id):525 valid, msg = IntegerHelpers.is_valid_id(attribute_id)526 if not valid:527 raise APIException.from_error(EM({"attribute_id": msg}).bad_request)528 target_attr = role.company.get_attribute(attribute_id)529 if not target_attr:530 raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)531 try:532 db.session.delete(target_attr)533 db.session.commit()534 535 except IntegrityError as ie:536 raise APIException.from_error(EM({"attribute_id": f"can't delete attribute_id: {attribute_id} - {ie}"}).conflict)537 except SQLAlchemyError as e:538 handle_db_error(e)539 return JSONResponse(f'attribute_id: {attribute_id} deleted').to_json()540@company_bp.route('/item-attributes/<int:attribute_id>/values', methods=["GET"])541@json_required()542@role_required()543def get_attribute_values(role, attribute_id):544 545 qp = QueryParams(request.args)546 valid, msg = IntegerHelpers.is_valid_id(attribute_id)547 if not valid:548 raise APIException.from_error(EM({"attribute_id": msg}).bad_request)549 target_attr = role.company.get_attribute(attribute_id)550 if not target_attr:551 raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)552 main_q = target_attr.attribute_values.order_by(AttributeValue.value.asc())553 page, limit = qp.get_pagination_params()554 name_like = qp.get_first_value("name_like")555 if name_like:556 sh = StringHelpers(string=name_like)557 main_q = main_q.filter(Unaccent(func.lower(AttributeValue.value)).like(f'%{sh.unaccent.lower()}%'))558 main_q = main_q.paginate(page, limit)559 560 payload = {561 'attribute': target_attr.serialize(),562 'values': list(map(lambda x: x.serialize(), main_q.items)),563 **qp.get_pagination_form(main_q)564 }565 return JSONResponse(566 payload=payload,567 message=qp.get_warings()568 ).to_json()569@company_bp.route("/item-attributes/<int:attribute_id>/values", methods=["POST"])570@json_required({'attribute_value': str})571@role_required(level=1) 572def create_attribute_value(role, body, attribute_id):573 attr_value = StringHelpers(body["attribute_value"])574 invalids = Validations.validate_inputs({575 "attribute_value": attr_value.is_valid_string(),576 "attribute_id": IntegerHelpers.is_valid_id(attribute_id)577 })578 if invalids:579 raise APIException.from_error(EM(invalids).bad_request)580 target_attr = role.company.get_attribute(attribute_id)581 if not target_attr:582 raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)583 584 value_exists = target_attr.attribute_values.\585 filter(Unaccent(func.lower(AttributeValue.value)) == attr_value.unaccent.lower()).first()586 if value_exists:587 raise APIException.from_error(EM({"attribute_value": f"attribute_value: {attr_value.value} already exists"}).conflict)588 new_attr_value = AttributeValue(589 value = attr_value.value,590 attribute = target_attr591 )592 try:593 db.session.add(new_attr_value)594 db.session.commit()595 except SQLAlchemyError as e:596 handle_db_error(e)597 return JSONResponse(598 message= f'new value created for attribute_id: {attribute_id}',599 payload= {600 'attribute': target_attr.serialize(),601 'new_value': new_attr_value.serialize()602 },603 status_code=201604 ).to_json()605@company_bp.route("/item-attributes/values/<int:value_id>", methods=["PUT"])606@json_required({'attribute_value': str})607@role_required(level=1)608def update_attribute_value(role, body, value_id):609 attr_value = StringHelpers(body["attribute_value"])610 invalids = Validations.validate_inputs({611 "attribute_value": attr_value.is_valid_string(),612 "value_id": IntegerHelpers.is_valid_id(value_id)613 })614 if invalids:615 raise APIException.from_error(EM(invalids).bad_request)616 base_q = db.session.query(AttributeValue).select_from(Company).join(Company.attributes).\617 join(Attribute.attribute_values).filter(Company.id == role.company.id)618 value_exists = base_q.filter(Unaccent(func.lower(AttributeValue.value)) == attr_value.unaccent.lower()).first()619 if value_exists:620 raise APIException.from_error(EM({"attribute_value": f"attribute_value: {attr_value.value} already exists"}).conflict)621 target_value = base_q.filter(AttributeValue.id == value_id).first()622 if not target_value:623 raise APIException.from_error(EM({"target_value": f"id-{value_id}"}).notFound)624 625 try:626 target_value.value = attr_value.value627 db.session.commit()628 except SQLAlchemyError as e:629 handle_db_error(e)630 return JSONResponse(631 message=f'attribute_value_id: <{value_id} updated>'632 ).to_json()633@company_bp.route("/item-attributes/values/<int:value_id>", methods=["DELETE"])634@json_required()635@role_required(level=1)636def delete_attributeValue(role, value_id):637 valid, msg = IntegerHelpers.is_valid_id(value_id)638 if not valid:639 raise APIException.from_error(EM({"value_id": msg}).bad_request)640 target_value = db.session.query(AttributeValue).select_from(Company).join(Company.attributes).join(Attribute.attribute_values).\641 filter(Company.id == role.company.id, AttributeValue.id == value_id).first()642 if not target_value:643 raise APIException.from_error(EM({"value_id": f"id-{value_id} not found"}).notFound)644 try:645 db.session.delete(target_value)646 db.session.commit()647 except IntegrityError as ie:648 raise APIException.from_error(EM({"value_id": f"can't delete value_id:{value_id} - {ie}"}))649 except SQLAlchemyError as e:650 handle_db_error(e)651 return JSONResponse(652 message=f'attribute_value_id: {value_id} deleted'653 ).to_json()654@company_bp.route("/qrcodes", methods=["GET"])655@json_required()656@role_required()657def get_all_qrcodes(role):658 qp = QueryParams(request.args)659 page, limit = qp.get_pagination_params()660 q = db.session.query(QRCode).select_from(Company).join(Company.qr_codes)661 662 status = qp.get_first_value("status")663 if status:664 if status == "active":665 q = q.filter(QRCode.is_active == True)666 if status == "disabled":667 q = q.filter(QRCode.is_active == False)668 if status == "used":669 q = q.filter(QRCode.container != None)670 if status == "free":671 q = q.filter(QRCode.container == None)672 673 qr_codes = q.paginate(page, limit)674 return JSONResponse(675 message=qp.get_warings(),676 payload={677 "qr_codes": list(map(lambda x:x.serialize(), qr_codes.items)),678 **qp.get_pagination_form(qr_codes)679 }680 ).to_json()681@company_bp.route("/qrcodes", methods=["POST"])682@json_required({"count": int})683@role_required()684def create_qrcode(role, body):685 count = body["count"]686 if count <= 0:687 raise APIException.from_error(EM({"count": f"count can't be less than 0"}).bad_request)688 bulk_qrcode = []689 while len(bulk_qrcode) < count:690 new_qrcode = QRCode(company_id = role.company.id)691 try:692 db.session.add(new_qrcode)693 db.session.commit()694 except SQLAlchemyError as e:695 handle_db_error(e)696 bulk_qrcode.append(new_qrcode)697 return JSONResponse(698 message="ok",699 payload={"qrcodes": list(map(lambda x:x.serialize(), bulk_qrcode))}700 ).to_json()701@company_bp.route("/qrcodes", methods=["DELETE"])702@json_required()703@role_required()704def delete_qrcode(role):705 qp = QueryParams(request.args)706 qrcodeList = qp.get_all_integers("qrcode")707 if not qrcodeList:708 raise APIException.from_error(EM({"qrcode": qp.get_warings()}).bad_request)709 #query returns: [(1,), (2,), ... (n,)] list of tuples with matching ids710 target_qrcodes = db.session.query(QRCode.id).join(QRCode.company).\711 filter(Company.id == role.company.id, QRCode.id.in_(qrcodeList), QRCode.container == None).all()712 if not target_qrcodes:713 raise APIException.from_error(EM({"qrcode": f"passed ids: {qrcodeList} not found"}).notFound)714 qrcodeList = [r[0] for r in target_qrcodes] #get list of ids inside each tuple returned by the query715 try:716 db.session.query(QRCode).filter(QRCode.id.in_(qrcodeList)).delete()717 db.session.commit()718 except IntegrityError as ie:719 raise APIException.from_error(EM({"qrcode_ud": f"can't delete qrcode_id:{qrcodeList} - {ie}"}).conflict)720 except SQLAlchemyError as e:721 handle_db_error(e)722 return JSONResponse(723 message=f"qrcode_id: {qrcodeList} deleted"...

Full Screen

Full Screen

mypy_plugin.py

Source:mypy_plugin.py Github

copy

Full Screen

1"""2 * Copyright(c) 2021 to 2022 ZettaScale Technology and others3 *4 * This program and the accompanying materials are made available under the5 * terms of the Eclipse Public License v. 2.0 which is available at6 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License7 * v. 1.0 which is available at8 * http://www.eclipse.org/org/documents/edl-v10.php.9 *10 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause11"""12from mypy.plugin import Plugin, AnalyzeTypeContext13from mypy.types import RawExpressionType, AnyType, TypeOfAny14from mypy.typeanal import make_optional_type15class IdlMyPyPlugin(Plugin):16 def get_type_analyze_hook(self, fullname: str):17 if fullname.startswith("cyclonedds.idl.types"):18 if fullname == "cyclonedds.idl.types.array":19 return strip_length_array_type20 elif fullname == "cyclonedds.idl.types.sequence":21 return strip_length_sequence_type22 elif fullname == "cyclonedds.idl.types.typedef":23 return resolve_type_def24 elif fullname == "cyclonedds.idl.types.bounded_str":25 return bounded_str_to_str26 elif fullname == "cyclonedds.idl.types.default":27 return union_default_type28 elif fullname == "cyclonedds.idl.types.case":29 return union_case_type30 return None31 def get_class_decorator_hook(self, fullname: str):32 from mypy.plugins import dataclasses33 if fullname in ["cyclonedds.idl.IdlStruct", "cyclonedds.idl.IdlUnion"]:34 return dataclasses.dataclass_class_maker_callback35def plugin(version: str):36 return IdlMyPyPlugin37def strip_length_array_type(ctx: AnalyzeTypeContext):38 if len(ctx.type.args) == 0:39 # ref of type itself40 return AnyType(TypeOfAny.special_form)41 if len(ctx.type.args) != 2 or not isinstance(ctx.type.args[1], RawExpressionType) or \42 ctx.type.args[1].base_type_name != "builtins.int":43 ctx.api.fail("cyclonedds.idl.types.array requires two arguments, a subtype and a fixed size.", ctx.context)44 return AnyType(TypeOfAny.from_error)45 return ctx.api.named_type('typing.Sequence', [ctx.api.analyze_type(ctx.type.args[0])])46def resolve_type_def(ctx: AnalyzeTypeContext):47 if len(ctx.type.args) == 0:48 # ref of type itself49 return AnyType(TypeOfAny.special_form)50 if len(ctx.type.args) != 2:51 ctx.api.fail("cyclonedds.idl.types.typedef requires a name and one type argument.", ctx.context)52 return AnyType(TypeOfAny.from_error)53 return ctx.api.analyze_type(ctx.type.args[1])54def strip_length_sequence_type(ctx: AnalyzeTypeContext):55 if len(ctx.type.args) == 0:56 # ref of type itself57 return AnyType(TypeOfAny.special_form)58 if len(ctx.type.args) not in [1, 2]:59 ctx.api.fail("cyclonedds.idl.types.sequence requires a subtype and an optional max size.", ctx.context)60 return AnyType(TypeOfAny.from_error)61 elif len(ctx.type.args) == 2 and not isinstance(ctx.type.args[1], RawExpressionType):62 ctx.api.fail("cyclonedds.idl.types.sequence max size should be an integer.", ctx.context)63 return AnyType(TypeOfAny.from_error)64 elif len(ctx.type.args) == 2 and ctx.type.args[1].base_type_name != "builtins.int":65 ctx.api.fail("cyclonedds.idl.types.sequence max size should be an integer.", ctx.context)66 return AnyType(TypeOfAny.from_error)67 return ctx.api.named_type('typing.Sequence', [ctx.api.analyze_type(ctx.type.args[0])])68def bounded_str_to_str(ctx: AnalyzeTypeContext):69 if len(ctx.type.args) == 0:70 # ref of type itself71 return AnyType(TypeOfAny.special_form)72 if len(ctx.type.args) != 1 or not isinstance(ctx.type.args[0], RawExpressionType) or \73 ctx.type.args[0].base_type_name != "builtins.int":74 ctx.api.fail("cyclonedds.idl.types.bound_str requires one argument, a fixed size.", ctx.context)75 return AnyType(TypeOfAny.from_error)76 return ctx.api.named_type('builtins.str', [])77def union_default_type(ctx: AnalyzeTypeContext):78 if len(ctx.type.args) == 0:79 # ref of type itself80 return AnyType(TypeOfAny.special_form)81 if len(ctx.type.args) != 1:82 ctx.api.fail("cyclonedds.idl.types.default requires one argument, a type.", ctx.context)83 return AnyType(TypeOfAny.from_error)84 return make_optional_type(ctx.api.analyze_type(ctx.type.args[0]))85def union_case_type(ctx: AnalyzeTypeContext):86 if len(ctx.type.args) == 0:87 # ref of type itself88 return AnyType(TypeOfAny.special_form)89 if len(ctx.type.args) != 2:90 ctx.api.fail("cyclonedds.idl.types.case requires two arguments, a discriminator label and a type.", ctx.context)91 return AnyType(TypeOfAny.from_error)...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1# SPDX-License-Identifier: MIT2import typing as t3from mypy.plugin import AnalyzeTypeContext, Plugin4from mypy.types import AnyType, EllipsisType, RawExpressionType, Type, TypeOfAny5class DisnakePlugin(Plugin):6 def get_type_analyze_hook(7 self, fullname: str8 ) -> t.Optional[t.Callable[[AnalyzeTypeContext], Type]]:9 if fullname == "disnake.ext.commands.params.Range":10 return range_type_analyze_callback11 if fullname == "disnake.ext.commands.params.String":12 return string_type_analyze_callback13 return None14def range_type_analyze_callback(ctx: AnalyzeTypeContext) -> Type:15 args = ctx.type.args16 if len(args) != 2:17 ctx.api.fail(f'"Range" expected 2 parameters, got {len(args)}', ctx.context)18 return AnyType(TypeOfAny.from_error)19 for arg in args:20 if isinstance(arg, EllipsisType):21 continue22 if not isinstance(arg, RawExpressionType):23 ctx.api.fail('invalid usage of "Range"', ctx.context)24 return AnyType(TypeOfAny.from_error)25 name = arg.simple_name()26 # if one is a float, `Range.underlying_type` returns `float`27 if name == "float":28 return ctx.api.named_type("builtins.float", [])29 # otherwise it should be an int; fail if it isn't30 elif name != "int":31 ctx.api.fail(f'"Range" parameters must be int or float, not {name}', ctx.context)32 return AnyType(TypeOfAny.from_error)33 return ctx.api.named_type("builtins.int", [])34def string_type_analyze_callback(ctx: AnalyzeTypeContext) -> Type:35 args = ctx.type.args36 if len(args) != 2:37 ctx.api.fail(f'"String" expected 2 parameters, got {len(args)}', ctx.context)38 return AnyType(TypeOfAny.from_error)39 for arg in args:40 if isinstance(arg, EllipsisType):41 continue42 if not isinstance(arg, RawExpressionType):43 ctx.api.fail('invalid usage of "String"', ctx.context)44 return AnyType(TypeOfAny.from_error)45 name = arg.simple_name()46 if name != "int":47 ctx.api.fail(f'"String" parameters must be int, not {name}', ctx.context)48 return AnyType(TypeOfAny.from_error)49 return ctx.api.named_type("builtins.str", [])50def plugin(version: str) -> t.Type[Plugin]:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run refurb automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful