Best Python code snippet using localstack_python
pm_affiliation.py
Source:pm_affiliation.py  
...21                                          inspect.currentframe())22    key = {"UserID": user_id, "OrganizationID": organization_id}23    result = DB_utils.query_key(user_id, Tables.PM_AFFILIATION, key)24    if result and convert_response:25        result = common_utils.convert_response(26            user_id, result, RESPONSE_AFFILIATION)27    return common_utils.response(result, pm_logger)28def create_affiliation(trace_id, email, user_id, organization_id, authority,29                       invitation_status):30    common_utils.begin_logger(trace_id, __name__, inspect.currentframe())31    # ä¸è¦ãªã¯ã©ã ã使ãããã¨ãé¿ãããã32    date_now = common_utils.get_current_date()33    create_affiliation = {34        "MailAddress": email,35        "UserID": user_id,36        "OrganizationID": organization_id,37        "Authority": authority,38        "InvitationStatus": invitation_status,39        'CreatedAt': date_now,...test_convert_response.py
Source:test_convert_response.py  
1""".convert_response() test suite."""2import pytest3import sphinxcontrib.openapi._lib2to3 as lib2to34@pytest.fixture(scope="function")5def convert_response(oas_fragment):6    def _wrapper(response, produces):7        oas2 = oas_fragment(8            """9            swagger: "2.0"10            info:11              title: An example spec12              version: "1.0"13            paths:14              /test:15                get:16                  responses:17                    '200':18                      description: a response description19            """20        )21        oas2["paths"]["/test"]["get"]["responses"]["200"] = response22        oas2["paths"]["/test"]["get"]["produces"] = produces23        oas3 = lib2to3.convert(oas2)24        return oas3["paths"]["/test"]["get"]["responses"]["200"]25    return _wrapper26def test_minimal(convert_response, oas_fragment):27    converted = convert_response(28        oas_fragment(29            """30            description: a response description31            """32        ),33        produces=["application/json"],34    )35    assert converted == oas_fragment(36        """37        description: a response description38        """39    )40def test_schema(convert_response, oas_fragment):41    converted = convert_response(42        oas_fragment(43            """44            description: a response description45            schema:46              items:47                format: int3248                type: integer49              type: array50            """51        ),52        produces=["application/json"],53    )54    assert converted == oas_fragment(55        """56        content:57          application/json:58            schema:59              items:60                format: int3261                type: integer62              type: array63        description: a response description64        """65    )66def test_schema_mimetypes(convert_response, oas_fragment):67    converted = convert_response(68        
oas_fragment(69            """70            description: a response description71            schema:72              items:73                format: int3274                type: integer75              type: array76            """77        ),78        produces=["application/json", "text/plain"],79    )80    assert converted == oas_fragment(81        """82        content:83          application/json:84            schema:85              items:86                format: int3287                type: integer88              type: array89          text/plain:90            schema:91              items:92                format: int3293                type: integer94              type: array95        description: a response description96        """97    )98def test_schema_no_mimetypes(convert_response, oas_fragment):99    converted = convert_response(100        oas_fragment(101            """102            description: a response description103            schema:104              items:105                format: int32106                type: integer107              type: array108            """109        ),110        produces=None,111    )112    assert converted == oas_fragment(113        """114        content:115          '*/*':116            schema:117              items:118                format: int32119                type: integer120              type: array121        description: a response description122        """123    )124def test_examples(convert_response, oas_fragment):125    converted = convert_response(126        oas_fragment(127            """128            description: a response description129            examples:130              application/json:131                something: important132            """133        ),134        produces=["application/json"],135    )136    assert converted == oas_fragment(137        """138        content:139          application/json:140            example:141              something: important142        description: a response 
description143        """144    )145def test_examples_any_type(convert_response, oas_fragment):146    converted = convert_response(147        oas_fragment(148            """149            description: a response description150            examples:151              application/json: '{"something": "important"}'152            """153        ),154        produces=["application/json"],155    )156    assert converted == oas_fragment(157        """158        content:159          application/json:160            example: '{"something": "important"}'161        description: a response description162        """163    )164def test_examples_mimetypes(convert_response, oas_fragment):165    converted = convert_response(166        oas_fragment(167            """168            description: a response description169            examples:170              application/json:171                something: important172              text/plain: something=imporant173            """174        ),175        produces=["application/json", "text/plain"],176    )177    assert converted == oas_fragment(178        """179        content:180          application/json:181            example:182              something: important183          text/plain:184            example: something=imporant185        description: a response description186        """187    )188def test_headers_schema_only(convert_response, oas_fragment):189    converted = convert_response(190        oas_fragment(191            """192            description: a response description193            headers:194              X-Test:195                type: string196            """197        ),198        produces=["application/json"],199    )200    assert converted == oas_fragment(201        """202        description: a response description203        headers:204          X-Test:205            schema:206              type: string207        """208    )209def test_headers_schema_extra(convert_response, oas_fragment):210    converted = 
convert_response(211        oas_fragment(212            """213            description: a response description214            headers:215              X-Test:216                description: Is it a test?217                type: string218            """219        ),220        produces=["application/json"],221    )222    assert converted == oas_fragment(223        """224        description: a response description225        headers:226          X-Test:227            description: Is it a test?228            schema:229              type: string230        """231    )232def test_headers_multiple(convert_response, oas_fragment):233    converted = convert_response(234        oas_fragment(235            """236            description: a response description237            headers:238              X-Bar:239                format: int32240                type: integer241              X-Foo:242                type: string243            """244        ),245        produces=["application/json"],246    )247    assert converted == oas_fragment(248        """249        description: a response description250        headers:251          X-Bar:252            schema:253              format: int32254              type: integer255          X-Foo:256            schema:257              type: string258        """259    )260def test_schema_examples_headers(convert_response, oas_fragment):261    converted = convert_response(262        oas_fragment(263            """264            description: a response description265            examples:266              application/json:267                something: important268            headers:269              X-Test:270                description: Is it a test?271                type: string272            schema:273              items:274                format: int32275                type: integer276              type: array277            """278        ),279        produces=["application/json"],280    )281    assert converted == oas_fragment(282        
"""283        description: a response description284        content:285          application/json:286            example:287              something: important288            schema:289              items:290                format: int32291                type: integer292              type: array293        headers:294          X-Test:295            description: Is it a test?296            schema:297              type: string298        """299    )300def test_complete(convert_response, oas_fragment):301    converted = convert_response(302        oas_fragment(303            """304            description: a response description305            examples:306              application/json:307                something: important308            headers:309              X-Test:310                description: Is it a test?311                type: string312            schema:313              items:314                format: int32315                type: integer316              type: array317            """318        ),319        produces=["application/json"],320    )321    assert converted == oas_fragment(322        """323        description: a response description324        content:325          application/json:326            example:327              something: important328            schema:329              items:330                format: int32331                type: integer332              type: array333        headers:334          X-Test:335            description: Is it a test?336            schema:337              type: string338        """339    )340def test_vendor_extensions(convert_response, oas_fragment):341    converted = convert_response(342        oas_fragment(343            """344            description: a response description345            examples:346              application/json:347                something: important348            headers:349              X-Test:350                description: Is it a test?351                type: string352                
x-header-ext: header-ext353            schema:354              items:355                format: int32...auth.py
Source:auth.py  
...24            res = await f(request)25            return res26        else:27            # the user is not authorized.28            return await convert_response({'status': 'not_authorized'}, ResponseCode.AUTH_ERROR)29    return decorated_function30async def check_request_for_authorization_status(request: 'Request') -> bool:31    await db_connect()32    session = SqliteDataBase.create_session()33    header = request.headers.get("Authorization")34    if header is None:35        return False36    tokens = header.split(' ')37    if len(tokens) != 2:38        return False39    t_type = tokens[0]40    if t_type != 'basic':41        return False42    token: str = tokens[1]43    try:44        payload: dict = await jwt_decode(token.encode())45    except InvalidSignatureError:46        return False47    try:48        session.query(Account).filter(Account.account_id == payload['id']).one()49    except NoResultFound:50        return False51    # TODO check valid type52    return True53class SignUp(Resource):54    async def post(self, request: 'Request') -> 'HTTPResponse':55        await db_connect()56        session = SqliteDataBase.create_session()57        res = await self._logic(session, request)58        await db_close(session)59        return res60    async def _logic(self, session: 'Session', request: 'Request') -> 'HTTPResponse':61        req_json = request.json62        account_id = req_json.get("id")63        if account_id is None:64            return await convert_response({"error": "account id is empty"}, ResponseCode.RESPONSE_ERROR)65        account_pwd = req_json.get("pw")66        if account_pwd is None:67            return await convert_response({"error": "account pw is empty"}, ResponseCode.RESPONSE_ERROR)68        user = session.query(Account).filter(Account.account_id == account_id).first()69        if user:70            return await convert_response({"msg": f"already exist account"}, ResponseCode.RESPONSE_ERROR)71        account = Account(account_id, 
account_pwd)72        session.add(account)73        session.commit()74        token: bytes = await jwt_encode(account_id, account.valid_token_time.timestamp())75        return await convert_response({"token": token.decode()})76class SignIn(Resource):77    async def put(self, request: 'Request') -> 'HTTPResponse':78        await db_connect()79        session = SqliteDataBase.create_session()80        res = await self._logic(session, request)81        await db_close(session)82        return res83    async def _logic(self, session: 'Session', request: 'Request') -> 'HTTPResponse':84        req_json = request.json85        account_id = req_json.get("id")86        if account_id is None:87            return await convert_response({"error": "account id is empty"}, ResponseCode.RESPONSE_ERROR)88        account_pwd = req_json.get("pw")89        if account_pwd is None:90            return await convert_response({"error": "account pw is empty"}, ResponseCode.RESPONSE_ERROR)91        try:92            user = session.query(Account).filter(Account.account_id == account_id).one()93        except NoResultFound:94            return await convert_response({"msg": f"account is empty"}, ResponseCode.RESPONSE_ERROR)95        if user.account_pwd != account_pwd:96            return await convert_response({"error": "invalid pw"}, ResponseCode.RESPONSE_ERROR)97        session.commit()98        token: bytes = await jwt_encode(account_id, user.valid_token_time.timestamp())99        return await convert_response({"token": token.decode()})100class RefreshToken(Resource):101    async def put(self, request: 'Request') -> 'HTTPResponse':102        await db_connect()103        session = SqliteDataBase.create_session()104        res = await self._logic(session, request)105        await db_close(session)106        return res107    async def _logic(self, session: 'Session', request: 'Request') -> 'HTTPResponse':108        ret: Tuple[bool, str] = await self._valid_header(request)109        ret_bool, 
ret_token = ret110        if not ret_bool:111            return await convert_response({"error": "invalid refresh token"}, ResponseCode.RESPONSE_ERROR)112        try:113            payload: dict = await jwt_decode(ret_token.encode())114        except InvalidSignatureError:115            return await convert_response({"error": "invalid refresh token"}, ResponseCode.RESPONSE_ERROR)116        try:117            user = session.query(Account).filter(Account.account_id == payload['id']).one()118        except NoResultFound:119            return await convert_response({"msg": f"account is empty"}, ResponseCode.RESPONSE_ERROR)120        user.valid_token_time = datetime.utcnow()121        session.commit()122        token: bytes = await jwt_encode(user.account_id, user.valid_token_time.timestamp())123        return await convert_response({"token": token.decode()})124    async def _valid_header(self, request: 'Request') -> tuple:125        header = request.headers.get("Authorization")126        if header is None:127            return False, ""128        tokens = header.split(' ')129        if len(tokens) != 2:130            return False, ""131        t_type = tokens[0]132        if t_type != 'bearer':133            return False, ""134        token: str = tokens[1]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
