Best Python code snippet using refurb_python
company.py
Source:company.py  
...28@role_required(level=0) #owner only29def update_company(role, body):30    to_update, invalids = update_row_content(Company, body)31    if invalids:32        raise APIException.from_error(EM(invalids).bad_request)33    try:34        Company.query.filter(Company.id == role.company.id).update(to_update)35        db.session.commit()36    except SQLAlchemyError as e:37        handle_db_error(e)38    return JSONResponse(message=f'Company updated').to_json()39@company_bp.route('/users', methods=['GET'])40@json_required()41@role_required(level=1)#andmin user42def get_company_users(role):43    """44    optional query parameters:45        ?page="pagination-page:str"46        ?limit="pagination-limit:str"47        ?status="role-status:str" -> status_options: active, disabled, pending48    """49    qp = QueryParams(request.args)50    status = qp.get_first_value("status")51    page, limit = qp.get_pagination_params()52    main_q = db.session.query(Role).join(Role.user).filter(Role.company_id == role.company.id)53    54    if status:55        if status == "active":56            main_q = main_q.filter(Role._isActive == True)57        elif status == "disabled":58            main_q = main_q.filter(Role._isActive == False)59        elif status == "pending":60            main_q = main_q.filter(Role._inv_accepted == False)61    roles = main_q.order_by(func.lower(User.fname).asc()).paginate(page, limit)62    return JSONResponse(63        message=qp.get_warings(),64        payload={65            "users": list(map(lambda x: {**x.user.serialize(), "user_role": x.serialize()}, roles.items)),66            **qp.get_pagination_form(roles)67        }).to_json()68@company_bp.route('/users', methods=['POST'])69@json_required({"email": str, "role_id": int})70@role_required(level=1)71def invite_user(role, body):72    email = StringHelpers(body["email"])73    role_id = body["role_id"]74    invalids = Validations.validate_inputs({75        "role_id": IntegerHelpers.is_valid_id(role_id),76       
 "email": email.is_valid_email()77    })78    if invalids:79        raise APIException.from_error(EM(invalids).bad_request)80    new_role_function = RoleFunction.get_rolefunc_by_id(role_id)81    if not new_role_function:82        raise APIException.from_error(EM({"role_id": f"id-{role_id} not found"}).notFound)83    if role.role_function.level > new_role_function.level:84        raise APIException.from_error(EM({"role_level": "greater authorization level is required"}).unauthorized)85    user = User.get_user_by_email(email.email_normalized)86    # nuevo usuario...87    if not user:88        success, mail_msg = send_user_invitation(user_email=email.email_normalized, company_name=role.company.name)89        if not success:90            raise APIException.from_error(EM(mail_msg).service_unavailable)91        try:92            new_user = User(93                email=email.email_normalized,94                password = StringHelpers.random_password()95            )96            new_role = Role(97                user = new_user,98                company_id = role.company_id,99                role_function = new_role_function100            )101            db.session.add_all([new_user, new_role])102            db.session.commit()103        except SQLAlchemyError as e:104            handle_db_error(e)105        return JSONResponse("new user invited", status_code=201).to_json()106    #ususario existente...107    rel = db.session.query(User).join(User.roles).join(Role.company).\108        filter(User.id == user.id, Company.id == role.company_id).first()109    110    if rel:111        raise APIException.from_error(112            EM({"email": f"user <{email.value}> is already listed in current company'"}).conflict113        )114    115    sent, mail_msg = send_user_invitation(user_email=email.email_normalized, user_name=user.fname, company_name=role.company.name)116    if not sent:117        raise APIException.from_error(EM(mail_msg).service_unavailable)118        119    try:120 
       new_role = Role(121            company_id = role.company_id,122            user = user,123            role_function = new_role_function124        )125        db.session.add(new_role)126        db.session.commit()127    except SQLAlchemyError as e:128        handle_db_error(e)129    return JSONResponse('existing user invited').to_json()130@company_bp.route('/users/<int:user_id>', methods=['PUT'])131@json_required({'role_id':int, 'is_active':bool})132@role_required(level=1)133def update_user_company_relation(role, body, user_id):134    role_id = body["role_id"]135    new_status = body['is_active']136    invalids = Validations.validate_inputs({137        "role_id": IntegerHelpers.is_valid_id(role_id),138        "user_id": IntegerHelpers.is_valid_id(user_id)139    })140    if invalids:141        raise APIException.from_error(EM(invalids).bad_request)142    if role_id == role.user.id:143        raise APIException.from_error(EM({"role_id": "can't update self-user role"}).conflict)144    target_role = Role.get_relation_user_company(user_id, role.company.id)145    if not target_role:146        raise APIException.from_error(EM({"user_id": f"id-{user_id} not found"}).notFound)147    148    new_rolefunction = RoleFunction.get_rolefunc_by_id(role_id)149    if not new_rolefunction:150        raise APIException.from_error(EM({"role_id": "not found"}).notFound)151    if role.role_function.level > new_rolefunction.level:152        raise APIException.from_error(EM({"role_level": "greater authorization level required"}).unauthorized)153        154    try:155        target_role.role_function = new_rolefunction156        target_role._isActive = new_status157        db.session.commit()158    except SQLAlchemyError as e:159        handle_db_error(e)160    return JSONResponse("user role updated").to_json()161@company_bp.route('/users/<int:user_id>', methods=['DELETE'])162@json_required()163@role_required(level=1)164def delete_user_company_relation(role, user_id):165    valid, msg 
= IntegerHelpers.is_valid_id(user_id)166    if not valid:167        raise APIException.from_error(EM({"user_id": msg}).bad_request)168    if user_id == role.user.id:169        raise APIException.from_error(EM({"role_id": "can't delete self-user role"}).conflict)170    target_role = Role.get_relation_user_company(user_id, role.company.id)171    if not target_role:172        raise APIException.from_error(EM({"user_id": "not found"}).notFound)173    try:174        db.session.delete(target_role)175        db.session.commit()176    except IntegrityError as ie:177        raise APIException.from_error(EM({"delete": f"can't delete user-id: {user_id} relation - {ie}"}).conflict)178    except SQLAlchemyError as e:179        handle_db_error(e)180    return JSONResponse("user-relation was deleted of company").to_json()181@company_bp.route('/roles', methods=['GET'])182@json_required()183@role_required()#any user184def get_company_roles(role):185    return JSONResponse(payload={186        "roles": list(map(lambda x: x.serialize(), db.session.query(RoleFunction).all()))187    }).to_json()188@company_bp.route('/providers', methods=['GET'])189@json_required()190@role_required(level=1)191def get_company_providers(role):192    193    qp = QueryParams(request.args)194    provider_id = qp.get_first_value("provider_id", as_integer=True)195    if not provider_id:196        main_q = role.company.providers197        name_like = StringHelpers(qp.get_first_value("name_like"))198        if name_like:199            main_q = main_q.filter(Unaccent(func.lower(Provider.name)).like(f"%{name_like.unaccent.lower()}%"))200        page, limit = qp.get_pagination_params()201        providers = main_q.paginate(page, limit)202        return JSONResponse(203            message=qp.get_warings(),204            payload={205            'providers': list(map(lambda x: x.serialize(), providers.items)),206            **qp.get_pagination_form(providers)207        }).to_json()208    209    #provider_id in url 
parameters210    valid, msg = IntegerHelpers.is_valid_id(provider_id)211    if not valid:212        raise APIException.from_error(EM({"provider_id": msg}).bad_request)213    provider = role.company.get_provider(provider_id)214    if not provider:215        raise APIException.from_error(EM({"provider_id": f"id-{provider_id} not found"}).notFound)216    return JSONResponse(217        message=qp.get_warings(),218        payload={'provider': provider.serialize_all()}219    ).to_json()220@company_bp.route('/providers', methods=['POST'])221@json_required({'name': str})222@role_required(level=1)223def create_provider(role, body):224    to_add, invalids = update_row_content(Provider, body)225    if invalids:226        raise APIException.from_error(EM(invalids).bad_request)227    to_add.update({'company_id': role.company.id}) #se agrega company228    new_provider = Provider(**to_add)229    try:230        db.session.add(new_provider)231        db.session.commit()232    except SQLAlchemyError as e:233        handle_db_error(e)234    return JSONResponse(235        message='new provider was created', 236        payload={'provider': new_provider.serialize_all()},237        status_code=201238    ).to_json()239@company_bp.route('/providers/<int:provider_id>', methods=['PUT'])240@json_required()241@role_required(level=1)242def update_provider(role, body, provider_id):243    valid, msg = IntegerHelpers.is_valid_id(provider_id)244    if not valid:245        raise APIException.from_error(EM({"provider_id": msg}).bad_request)246    provider = role.company.get_provider(provider_id)247    if not provider:248        raise APIException.from_error(EM({"provider_id": f"id-{provider_id} not found"}).notFound)249    to_update, invalids = update_row_content(Provider, body)250    if invalids:251        raise APIException.from_error(EM(invalids).bad_request)252    try:253        db.session.query(Provider).filter(Provider.id == provider.id).update(to_update)254        db.session.commit()255    
except SQLAlchemyError as e:256        handle_db_error(e)257    return JSONResponse(message=f'provider-id: {provider_id} was updated').to_json()258@company_bp.route('/providers/<int:provider_id>', methods=['DELETE'])259@json_required()260@role_required(level=1)261def delete_provider(role, provider_id):262    valid, msg = IntegerHelpers.is_valid_id(provider_id)263    if not valid:264        raise APIException.from_error(EM({"provider_id": msg}).bad_request)265    266    target_provider = role.company.get_provider(provider_id)267    if not target_provider:268        raise APIException.from_error(EM({"provider_id": f"id-{provider_id} not found"}).notFound)269    try:270        db.session.delete(target_provider)271        db.session.commit()272    except IntegrityError as ie:273        raise APIException.from_error(EM({"provider_id": f"can't delete provider id-{provider_id}, {ie}"}).conflict)274    except SQLAlchemyError as e:275        handle_db_error(e)276    return JSONResponse(message=f'provider-id: {provider_id} has been deleted').to_json()277@company_bp.route('/categories', methods=['GET'])278@json_required()279@role_required()280def get_company_categories(role):281    qp = QueryParams(request.args)282    category_id = qp.get_first_value("category_id", as_integer=True)283    if not category_id:284        cat = role.company.categories.filter(Category.parent_id == None).order_by(Category.name.asc()).all() #root categories only285        286        return JSONResponse(287            message=qp.get_warings(),288            payload={289                "categories": list(map(lambda x: x.serialize_children(), cat))290            }291        ).to_json()292    #category-id is present in the url-parameters293    valid, msg = IntegerHelpers.is_valid_id(category_id)294    if not valid:295        raise APIException.from_error(EM({"category_id": msg}).bad_request)296    cat = role.company.get_category_by_id(category_id)297    if not cat:298        raise 
APIException.from_error(EM({"category_id": f"id-{category_id} not found"}).notFound)299    #return item300    return JSONResponse(301        payload={302            'category': cat.serialize_all()303        }304    ).to_json()305@company_bp.route('/categories', methods=['POST'])306@company_bp.route('/categories/<int:category_id>', methods=['PUT'])307@json_required({'name': str})308@role_required()309def create_or_update_category(role, body, category_id=None):310    parent_id = body.get('parent_id', None)311    new_name = StringHelpers(body["name"])312    if parent_id:313        valid, msg = IntegerHelpers.is_valid_id(parent_id)314        if not valid:315            raise APIException.from_error(EM({"parent_id": msg}).bad_request)316        parent = role.company.get_category_by_id(parent_id)317        if not parent:318            raise APIException.from_error(EM({"parent_id": f"id-{parent_id} not found"}).notFound)319    newRows, invalids = update_row_content(Category, body)320    if invalids:321        raise APIException.from_error(EM(invalids).bad_request)322    main_q = db.session.query(Category).select_from(Company).join(Company.categories).\323        filter(Company.id == role.company.id, Unaccent(func.lower(Category.name)) == new_name.unaccent.lower())324    #POST method325    if request.method == "POST":326        category_exists = main_q.first()327        if category_exists:328            raise APIException.from_error(EM({"name": f"category name [{new_name.value}] already exists"}).conflict)329        newRows.update({'company_id': role.company.id, "parent_id": parent_id})330        new_category = Category(**newRows)331        try:332            db.session.add(new_category)333            db.session.commit()334        except SQLAlchemyError as e:335            handle_db_error(e)336        return JSONResponse(337            payload={"category": new_category.serialize()},338            status_code=201339        ).to_json()340    #PUT method341    target_cat = 
role.company.get_category_by_id(category_id)342    if not target_cat:343        raise APIException.from_error(EM({"category_id": f"id-{category_id} not found"}).notFound)344    name_exists = main_q.filter(Category.id != category_id).first()345    if name_exists:346        raise APIException.from_error(EM({"name": f"category name [{new_name.value}] already exists"}).conflict)347    try:348        db.session.query(Category).filter(Category.id == target_cat.id).update(newRows)349        db.session.commit()350    except SQLAlchemyError as e:351        handle_db_error(e)352    return JSONResponse(f'category-id: {category_id} updated').to_json()353@company_bp.route("/categories/<int:cat_id>", methods=["DELETE"])354@json_required()355@role_required(level=1)356def delete_category(role, cat_id):357    valid, msg = IntegerHelpers.is_valid_id(cat_id)358    if not valid:359        raise APIException.from_error(EM({"cat_id": msg}).bad_request)360    target_cat = role.company.get_category_by_id(cat_id)361    if not target_cat:362        raise APIException.from_error(EM({"cat_id": f"id-{cat_id} not found"}).notFound)363    try:364        db.session.delete(target_cat)365        db.session.commit()366    367    except IntegrityError as ie:368        raise APIException.from_error(EM({"cat_id": f"can't delete category_id: {cat_id}, {ie}"}).conflict)369    except SQLAlchemyError as e:370        handle_db_error(e)371    return JSONResponse(372        message=f"category_id: {cat_id} has been deleted"373    ).to_json()374@company_bp.route('/categories/<int:cat_id>/attributes', methods=['GET'])375@json_required()376@role_required()377def get_category_attributes(role, cat_id):378    valid, msg = IntegerHelpers.is_valid_id(cat_id)379    if not valid:380        raise APIException.from_error(EM({"cat_id": msg}).bad_request)381    382    target_cat = role.company.get_category_by_id(cat_id)383    if not target_cat:384        raise APIException.from_error(EM({"cat_id": f"id-{cat_id} not 
found"}).notFound)385    all_attributes = target_cat.get_attributes()386    return JSONResponse(387        payload={'attributes': list(map(lambda x: x.serialize_all(), all_attributes))}388    ).to_json()389@company_bp.route('/categories/<int:category_id>/attributes', methods=['PUT'])390@json_required({'attributes': list})391@role_required(level=1)392def update_category_attributes(role, body, category_id):393    attributes = body.get('attributes')394    valid, msg = IntegerHelpers.is_valid_id(category_id)395    if not valid:396        raise APIException.from_error(EM({"category_id": msg}).bad_request)397    target_cat = role.company.get_category_by_id(category_id)398    if not target_cat:399        raise APIException.from_error(EM({"category_id": f"id-{category_id} not found"}).notFound)400    if not attributes: #empty list clear all attibutes401        try:402            target_cat.attributes = []403            db.session.commit()404        except SQLAlchemyError as e:405            handle_db_error(e)406        return JSONResponse(407            message=f'all attributes of category-id: {category_id} were removed',408        ).to_json()409    not_integer = [r for r in attributes if not isinstance(r, int)]410    if not_integer:411        raise APIException.from_error(EM(412            {"attributes": f"list of attributes must include integers values only.. 
<{not_integer}> were detected"}413        ).bad_request)414    new_attributes = role.company.attributes.filter(Attribute.id.in_(attributes)).all()415    if not new_attributes:416        raise APIException.from_error(EM({"attributes": "no attributes were found in the database"}).notFound)417    try:418        target_cat.attributes = new_attributes419        db.session.commit()420    except SQLAlchemyError as e:421        handle_db_error(e)422    return JSONResponse(423        message=f'category-id: {category_id} updated',424        payload={425            'category': target_cat.serialize()426        }427    ).to_json()428@company_bp.route("/item-attributes", methods=["GET"])429@json_required()430@role_required()431def get_company_attributes(role):432    qp = QueryParams(request.args)433    attribute_id = qp.get_first_value("attribute_id", as_integer=True)434    if not attribute_id:435        page, limit = qp.get_pagination_params()436        name_like = StringHelpers(qp.get_first_value("name_like"))437        main_q = role.company.attributes.order_by(Attribute.name.asc())438        if name_like:439            main_q = main_q.filter(Unaccent(func.lower(Attribute.name)).like(f"%{name_like.unaccent.lower()}%"))440        attributes = main_q.paginate(page, limit)441        return JSONResponse(442            message=qp.get_warings(),443            payload={444                'attributes': list(map(lambda x:x.serialize(), attributes.items)),445                **qp.get_pagination_form(attributes)446            }447        ).to_json()448    valid, msg = IntegerHelpers.is_valid_id(attribute_id)449    if not valid:450        raise APIException.from_error(EM({"attribute_id": msg}).bad_request)451    452    target_attr = role.company.get_attribute(attribute_id)453    if not target_attr:454        raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)455    return JSONResponse(456        message=qp.get_warings(),457        payload={458       
     'attribute': target_attr.serialize_all()459        }460    ).to_json()461@company_bp.route('/item-attributes', methods=["POST"])462@json_required({'name': str})463@role_required(level=1)    464def create_attribute(role, body):465    name = StringHelpers(body["name"])466    attribute_exists = db.session.query(Attribute).select_from(Company).join(Company.attributes).\467        filter(Unaccent(func.lower(Attribute.name)) == name.unaccent.lower()).first()468        469    if attribute_exists:470        raise APIException.from_error(EM({"name": f"attribute <{name.value}> already exists"}).conflict)471        472    to_add, invalids = update_row_content(Attribute, body)473    if invalids:474        raise APIException.from_error(EM(invalids).bad_request)475    476    to_add.update({'company_id': role.company.id})477    new_attribute = Attribute(**to_add)478    try:479        db.session.add(new_attribute)480        db.session.commit()481    except SQLAlchemyError as e:482        handle_db_error(e)483    return JSONResponse(484        payload={485            'attribute': new_attribute.serialize()486        },487        message='new attribute created',488        status_code=201489    ).to_json()490@company_bp.route("/item-attributes/<int:attribute_id>", methods=["PUT"])491@json_required({'name': str})492@role_required(level=1)493def update_attribute(role, body, attribute_id):494    name = StringHelpers(body["name"])495    invalids = Validations.validate_inputs({496        "attribute_id": IntegerHelpers.is_valid_id(attribute_id),497        "name": name.is_valid_string()498    })499    if invalids:500        raise APIException.from_error(EM(invalids).bad_request)501    target_attr = role.company.get_attribute(attribute_id)502    if not target_attr:503        raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)504    attribute_exists = db.session.query(Attribute).select_from(Company).join(Company.attributes).\505        
filter(Unaccent(func.lower(Attribute.name)) == name.unaccent.lower()).first()506        507    if attribute_exists:508        raise APIException.from_error(EM({"name": f"attribute <{name.value}> already exists"}).conflict)509    to_update, invalids = update_row_content(Attribute, body)510    if invalids:511        raise APIException.from_error(EM(invalids).bad_request)512    513    try:514        db.session.query(Attribute).filter(Attribute.id == target_attr.id).update(to_update)515        db.session.commit()516    except SQLAlchemyError as e:517        handle_db_error(e)518    return JSONResponse(519        message=f'attribute-id: {attribute_id} updated'520    ).to_json()521@company_bp.route("/item-attributes/<int:attribute_id>", methods=["DELETE"])522@json_required()523@role_required(level=1)524def delete_attribute(role, attribute_id):525    valid, msg = IntegerHelpers.is_valid_id(attribute_id)526    if not valid:527        raise APIException.from_error(EM({"attribute_id": msg}).bad_request)528    target_attr = role.company.get_attribute(attribute_id)529    if not target_attr:530        raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)531    try:532        db.session.delete(target_attr)533        db.session.commit()534    535    except IntegrityError as ie:536        raise APIException.from_error(EM({"attribute_id": f"can't delete attribute_id: {attribute_id} - {ie}"}).conflict)537    except SQLAlchemyError as e:538        handle_db_error(e)539    return JSONResponse(f'attribute_id: {attribute_id} deleted').to_json()540@company_bp.route('/item-attributes/<int:attribute_id>/values', methods=["GET"])541@json_required()542@role_required()543def get_attribute_values(role, attribute_id):544    545    qp = QueryParams(request.args)546    valid, msg = IntegerHelpers.is_valid_id(attribute_id)547    if not valid:548        raise APIException.from_error(EM({"attribute_id": msg}).bad_request)549    target_attr = 
role.company.get_attribute(attribute_id)550    if not target_attr:551        raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)552    main_q = target_attr.attribute_values.order_by(AttributeValue.value.asc())553    page, limit = qp.get_pagination_params()554    name_like = qp.get_first_value("name_like")555    if name_like:556        sh = StringHelpers(string=name_like)557        main_q = main_q.filter(Unaccent(func.lower(AttributeValue.value)).like(f'%{sh.unaccent.lower()}%'))558    main_q = main_q.paginate(page, limit)559    560    payload = {561        'attribute': target_attr.serialize(),562        'values': list(map(lambda x: x.serialize(), main_q.items)),563        **qp.get_pagination_form(main_q)564    }565    return JSONResponse(566        payload=payload,567        message=qp.get_warings()568    ).to_json()569@company_bp.route("/item-attributes/<int:attribute_id>/values", methods=["POST"])570@json_required({'attribute_value': str})571@role_required(level=1) 572def create_attribute_value(role, body, attribute_id):573    attr_value = StringHelpers(body["attribute_value"])574    invalids = Validations.validate_inputs({575        "attribute_value": attr_value.is_valid_string(),576        "attribute_id": IntegerHelpers.is_valid_id(attribute_id)577    })578    if invalids:579        raise APIException.from_error(EM(invalids).bad_request)580    target_attr = role.company.get_attribute(attribute_id)581    if not target_attr:582        raise APIException.from_error(EM({"attribute_id": f"id-{attribute_id} not found"}).notFound)583        584    value_exists = target_attr.attribute_values.\585        filter(Unaccent(func.lower(AttributeValue.value)) == attr_value.unaccent.lower()).first()586    if value_exists:587        raise APIException.from_error(EM({"attribute_value": f"attribute_value: {attr_value.value} already exists"}).conflict)588    new_attr_value = AttributeValue(589        value = attr_value.value,590        
attribute = target_attr591    )592    try:593        db.session.add(new_attr_value)594        db.session.commit()595    except SQLAlchemyError as e:596        handle_db_error(e)597    return JSONResponse(598        message= f'new value created for attribute_id: {attribute_id}',599        payload= {600            'attribute': target_attr.serialize(),601            'new_value': new_attr_value.serialize()602        },603        status_code=201604    ).to_json()605@company_bp.route("/item-attributes/values/<int:value_id>", methods=["PUT"])606@json_required({'attribute_value': str})607@role_required(level=1)608def update_attribute_value(role, body, value_id):609    attr_value = StringHelpers(body["attribute_value"])610    invalids = Validations.validate_inputs({611        "attribute_value": attr_value.is_valid_string(),612        "value_id": IntegerHelpers.is_valid_id(value_id)613    })614    if invalids:615        raise APIException.from_error(EM(invalids).bad_request)616    base_q = db.session.query(AttributeValue).select_from(Company).join(Company.attributes).\617        join(Attribute.attribute_values).filter(Company.id == role.company.id)618    value_exists = base_q.filter(Unaccent(func.lower(AttributeValue.value)) == attr_value.unaccent.lower()).first()619    if value_exists:620        raise APIException.from_error(EM({"attribute_value": f"attribute_value: {attr_value.value} already exists"}).conflict)621    target_value = base_q.filter(AttributeValue.id == value_id).first()622    if not target_value:623        raise APIException.from_error(EM({"target_value": f"id-{value_id}"}).notFound)624    625    try:626        target_value.value = attr_value.value627        db.session.commit()628    except SQLAlchemyError as e:629        handle_db_error(e)630    return JSONResponse(631        message=f'attribute_value_id: <{value_id} updated>'632    ).to_json()633@company_bp.route("/item-attributes/values/<int:value_id>", 
methods=["DELETE"])634@json_required()635@role_required(level=1)636def delete_attributeValue(role, value_id):637    valid, msg = IntegerHelpers.is_valid_id(value_id)638    if not valid:639        raise APIException.from_error(EM({"value_id": msg}).bad_request)640    target_value = db.session.query(AttributeValue).select_from(Company).join(Company.attributes).join(Attribute.attribute_values).\641        filter(Company.id == role.company.id, AttributeValue.id == value_id).first()642    if not target_value:643        raise APIException.from_error(EM({"value_id": f"id-{value_id} not found"}).notFound)644    try:645        db.session.delete(target_value)646        db.session.commit()647    except IntegrityError as ie:648        raise APIException.from_error(EM({"value_id": f"can't delete value_id:{value_id} - {ie}"}))649    except SQLAlchemyError as e:650        handle_db_error(e)651    return JSONResponse(652        message=f'attribute_value_id: {value_id} deleted'653    ).to_json()654@company_bp.route("/qrcodes", methods=["GET"])655@json_required()656@role_required()657def get_all_qrcodes(role):658    qp = QueryParams(request.args)659    page, limit = qp.get_pagination_params()660    q = db.session.query(QRCode).select_from(Company).join(Company.qr_codes)661    662    status = qp.get_first_value("status")663    if status:664        if status == "active":665            q = q.filter(QRCode.is_active == True)666        if status == "disabled":667            q = q.filter(QRCode.is_active == False)668        if status == "used":669            q = q.filter(QRCode.container != None)670        if status == "free":671            q = q.filter(QRCode.container == None)672    673    qr_codes = q.paginate(page, limit)674    return JSONResponse(675        message=qp.get_warings(),676        payload={677            "qr_codes": list(map(lambda x:x.serialize(), qr_codes.items)),678            **qp.get_pagination_form(qr_codes)679        }680    
).to_json()681@company_bp.route("/qrcodes", methods=["POST"])682@json_required({"count": int})683@role_required()684def create_qrcode(role, body):685    count = body["count"]686    if count <= 0:687        raise APIException.from_error(EM({"count": f"count can't be less than 0"}).bad_request)688    bulk_qrcode = []689    while len(bulk_qrcode) < count:690        new_qrcode = QRCode(company_id = role.company.id)691        try:692            db.session.add(new_qrcode)693            db.session.commit()694        except SQLAlchemyError as e:695            handle_db_error(e)696        bulk_qrcode.append(new_qrcode)697    return JSONResponse(698        message="ok",699        payload={"qrcodes": list(map(lambda x:x.serialize(), bulk_qrcode))}700    ).to_json()701@company_bp.route("/qrcodes", methods=["DELETE"])702@json_required()703@role_required()704def delete_qrcode(role):705    qp = QueryParams(request.args)706    qrcodeList = qp.get_all_integers("qrcode")707    if not qrcodeList:708        raise APIException.from_error(EM({"qrcode": qp.get_warings()}).bad_request)709    #query returns: [(1,), (2,), ... 
(n,)] list of tuples with matching ids710    target_qrcodes = db.session.query(QRCode.id).join(QRCode.company).\711        filter(Company.id == role.company.id, QRCode.id.in_(qrcodeList), QRCode.container == None).all()712    if not target_qrcodes:713        raise APIException.from_error(EM({"qrcode": f"passed ids: {qrcodeList} not found"}).notFound)714    qrcodeList = [r[0] for r in target_qrcodes] #get list of ids inside each tuple returned by the query715    try:716        db.session.query(QRCode).filter(QRCode.id.in_(qrcodeList)).delete()717        db.session.commit()718    except IntegrityError as ie:719        raise APIException.from_error(EM({"qrcode_ud": f"can't delete qrcode_id:{qrcodeList} - {ie}"}).conflict)720    except SQLAlchemyError as e:721        handle_db_error(e)722    return JSONResponse(723        message=f"qrcode_id: {qrcodeList} deleted"...mypy_plugin.py
Source:mypy_plugin.py  
"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

from mypy.plugin import Plugin, AnalyzeTypeContext
from mypy.types import RawExpressionType, AnyType, TypeOfAny
from mypy.typeanal import make_optional_type


class IdlMyPyPlugin(Plugin):
    """mypy plugin that installs analysis hooks for ``cyclonedds.idl`` names."""

    def get_type_analyze_hook(self, fullname: str):
        """Return the type-analysis callback for ``fullname``, or ``None``.

        Only the exact ``cyclonedds.idl.types.*`` names listed below have a
        callback; every other name yields ``None``.
        """
        hooks = {
            "cyclonedds.idl.types.array": strip_length_array_type,
            "cyclonedds.idl.types.sequence": strip_length_sequence_type,
            "cyclonedds.idl.types.typedef": resolve_type_def,
            "cyclonedds.idl.types.bounded_str": bounded_str_to_str,
            "cyclonedds.idl.types.default": union_default_type,
            "cyclonedds.idl.types.case": union_case_type,
        }
        return hooks.get(fullname)

    def get_class_decorator_hook(self, fullname: str):
        """Return mypy's dataclass callback for ``IdlStruct``/``IdlUnion``, else ``None``."""
        # Imported lazily, as in the original listing.
        from mypy.plugins import dataclasses

        if fullname in ("cyclonedds.idl.IdlStruct", "cyclonedds.idl.IdlUnion"):
            return dataclasses.dataclass_class_maker_callback
        return None


def plugin(version: str):
    """Entry point looked up by mypy; returns the plugin class."""
    return IdlMyPyPlugin


def _is_int_literal(arg) -> bool:
    """Return True when ``arg`` is a raw expression whose base type is ``builtins.int``."""
    return isinstance(arg, RawExpressionType) and arg.base_type_name == "builtins.int"


def strip_length_array_type(ctx: AnalyzeTypeContext):
    """Analyze ``array[T, N]`` as ``typing.Sequence[T]``.

    A bare ``array`` (no arguments) becomes a special-form ``Any``. Anything
    other than exactly two arguments with an integer second argument is
    reported through ``ctx.api.fail`` and becomes an error ``Any``.
    """
    args = ctx.type.args
    if not args:
        # ref of type itself
        return AnyType(TypeOfAny.special_form)
    if len(args) != 2 or not _is_int_literal(args[1]):
        ctx.api.fail("cyclonedds.idl.types.array requires two arguments, a subtype and a fixed size.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    return ctx.api.named_type('typing.Sequence', [ctx.api.analyze_type(args[0])])


def resolve_type_def(ctx: AnalyzeTypeContext):
    """Analyze ``typedef[name, T]`` as ``T`` (the second argument)."""
    args = ctx.type.args
    if not args:
        # ref of type itself
        return AnyType(TypeOfAny.special_form)
    if len(args) != 2:
        ctx.api.fail("cyclonedds.idl.types.typedef requires a name and one type argument.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    return ctx.api.analyze_type(args[1])


def strip_length_sequence_type(ctx: AnalyzeTypeContext):
    """Analyze ``sequence[T]`` or ``sequence[T, N]`` as ``typing.Sequence[T]``.

    The optional second argument must be an integer; otherwise the error is
    reported through ``ctx.api.fail`` and an error ``Any`` is returned.
    """
    args = ctx.type.args
    if not args:
        # ref of type itself
        return AnyType(TypeOfAny.special_form)
    if len(args) not in (1, 2):
        ctx.api.fail("cyclonedds.idl.types.sequence requires a subtype and an optional max size.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    # The original had two separate branches (not a raw expression / not an
    # int) that emitted the same message; they are merged here.
    if len(args) == 2 and not _is_int_literal(args[1]):
        ctx.api.fail("cyclonedds.idl.types.sequence max size should be an integer.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    return ctx.api.named_type('typing.Sequence', [ctx.api.analyze_type(args[0])])


def bounded_str_to_str(ctx: AnalyzeTypeContext):
    """Analyze ``bounded_str[N]`` as ``builtins.str``; ``N`` must be an integer."""
    args = ctx.type.args
    if not args:
        # ref of type itself
        return AnyType(TypeOfAny.special_form)
    if len(args) != 1 or not _is_int_literal(args[0]):
        # Message fixed to name the real type (was "bound_str").
        ctx.api.fail("cyclonedds.idl.types.bounded_str requires one argument, a fixed size.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    return ctx.api.named_type('builtins.str', [])


def union_default_type(ctx: AnalyzeTypeContext):
    """Analyze ``default[T]`` as an optional ``T``."""
    args = ctx.type.args
    if not args:
        # ref of type itself
        return AnyType(TypeOfAny.special_form)
    if len(args) != 1:
        ctx.api.fail("cyclonedds.idl.types.default requires one argument, a type.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    return make_optional_type(ctx.api.analyze_type(args[0]))


def union_case_type(ctx: AnalyzeTypeContext):
    """Analyze ``case[label, T]``; validates the argument count."""
    args = ctx.type.args
    if not args:
        # ref of type itself
        return AnyType(TypeOfAny.special_form)
    if len(args) != 2:
        ctx.api.fail("cyclonedds.idl.types.case requires two arguments, a discriminator label and a type.", ctx.context)
        return AnyType(TypeOfAny.from_error)
    # NOTE(review): the listing is truncated here ("..."); the remainder of
    # this function (its final return) is not shown in the source.


# __init__.py
Source:__init__.py  
# SPDX-License-Identifier: MIT

import typing as t

from mypy.plugin import AnalyzeTypeContext, Plugin
from mypy.types import AnyType, EllipsisType, RawExpressionType, Type, TypeOfAny


class DisnakePlugin(Plugin):
    """mypy plugin that installs analysis hooks for disnake's ``Range`` and ``String``."""

    def get_type_analyze_hook(
        self, fullname: str
    ) -> t.Optional[t.Callable[[AnalyzeTypeContext], Type]]:
        """Return the analysis callback for ``fullname``, or ``None`` if it is not handled."""
        if fullname == "disnake.ext.commands.params.Range":
            return range_type_analyze_callback
        if fullname == "disnake.ext.commands.params.String":
            return string_type_analyze_callback
        return None


def range_type_analyze_callback(ctx: AnalyzeTypeContext) -> Type:
    """Analyze ``Range[a, b]`` as ``float`` if either bound is a float, else ``int``.

    Each bound must be ``...`` or an int/float raw expression. Any violation is
    reported through ``ctx.api.fail`` and yields an error ``Any``.
    """
    args = ctx.type.args

    if len(args) != 2:
        ctx.api.fail(f'"Range" expected 2 parameters, got {len(args)}', ctx.context)
        return AnyType(TypeOfAny.from_error)

    # Validate BOTH bounds before choosing the result type. The original
    # returned `float` as soon as it saw a float bound, so an invalid second
    # bound after a float first bound was silently accepted.
    saw_float = False
    for arg in args:
        if isinstance(arg, EllipsisType):
            continue
        if not isinstance(arg, RawExpressionType):
            ctx.api.fail('invalid usage of "Range"', ctx.context)
            return AnyType(TypeOfAny.from_error)

        name = arg.simple_name()
        if name == "float":
            # if one is a float, `Range.underlying_type` returns `float`
            saw_float = True
        elif name != "int":
            # otherwise it should be an int; fail if it isn't
            ctx.api.fail(f'"Range" parameters must be int or float, not {name}', ctx.context)
            return AnyType(TypeOfAny.from_error)

    return ctx.api.named_type("builtins.float" if saw_float else "builtins.int", [])


def string_type_analyze_callback(ctx: AnalyzeTypeContext) -> Type:
    """Analyze ``String[a, b]`` as ``str``; each bound must be ``...`` or an int."""
    args = ctx.type.args

    if len(args) != 2:
        ctx.api.fail(f'"String" expected 2 parameters, got {len(args)}', ctx.context)
        return AnyType(TypeOfAny.from_error)

    for arg in args:
        if isinstance(arg, EllipsisType):
            continue
        if not isinstance(arg, RawExpressionType):
            ctx.api.fail('invalid usage of "String"', ctx.context)
            return AnyType(TypeOfAny.from_error)

        name = arg.simple_name()
        if name != "int":
            ctx.api.fail(f'"String" parameters must be int, not {name}', ctx.context)
            return AnyType(TypeOfAny.from_error)

    return ctx.api.named_type("builtins.str", [])


def plugin(version: str) -> t.Type[Plugin]:
    # NOTE(review): the listing is truncated here ("..."); the function body
    # is not shown in the source, so it is left as a stub.
    ...


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
