How to use convert_response method in localstack

Best Python code snippet using localstack_python

pm_affiliation.py

Source:pm_affiliation.py Github

copy

Full Screen

...21 inspect.currentframe())22 key = {"UserID": user_id, "OrganizationID": organization_id}23 result = DB_utils.query_key(user_id, Tables.PM_AFFILIATION, key)24 if result and convert_response:25 result = common_utils.convert_response(26 user_id, result, RESPONSE_AFFILIATION)27 return common_utils.response(result, pm_logger)28def create_affiliation(trace_id, email, user_id, organization_id, authority,29 invitation_status):30 common_utils.begin_logger(trace_id, __name__, inspect.currentframe())31 # 不要なクラムを作成することを避けるため32 date_now = common_utils.get_current_date()33 create_affiliation = {34 "MailAddress": email,35 "UserID": user_id,36 "OrganizationID": organization_id,37 "Authority": authority,38 "InvitationStatus": invitation_status,39 'CreatedAt': date_now,...

Full Screen

Full Screen

test_convert_response.py

Source:test_convert_response.py Github

copy

Full Screen

1""".convert_response() test suite."""2import pytest3import sphinxcontrib.openapi._lib2to3 as lib2to34@pytest.fixture(scope="function")5def convert_response(oas_fragment):6 def _wrapper(response, produces):7 oas2 = oas_fragment(8 """9 swagger: "2.0"10 info:11 title: An example spec12 version: "1.0"13 paths:14 /test:15 get:16 responses:17 '200':18 description: a response description19 """20 )21 oas2["paths"]["/test"]["get"]["responses"]["200"] = response22 oas2["paths"]["/test"]["get"]["produces"] = produces23 oas3 = lib2to3.convert(oas2)24 return oas3["paths"]["/test"]["get"]["responses"]["200"]25 return _wrapper26def test_minimal(convert_response, oas_fragment):27 converted = convert_response(28 oas_fragment(29 """30 description: a response description31 """32 ),33 produces=["application/json"],34 )35 assert converted == oas_fragment(36 """37 description: a response description38 """39 )40def test_schema(convert_response, oas_fragment):41 converted = convert_response(42 oas_fragment(43 """44 description: a response description45 schema:46 items:47 format: int3248 type: integer49 type: array50 """51 ),52 produces=["application/json"],53 )54 assert converted == oas_fragment(55 """56 content:57 application/json:58 schema:59 items:60 format: int3261 type: integer62 type: array63 description: a response description64 """65 )66def test_schema_mimetypes(convert_response, oas_fragment):67 converted = convert_response(68 oas_fragment(69 """70 description: a response description71 schema:72 items:73 format: int3274 type: integer75 type: array76 """77 ),78 produces=["application/json", "text/plain"],79 )80 assert converted == oas_fragment(81 """82 content:83 application/json:84 schema:85 items:86 format: int3287 type: integer88 type: array89 text/plain:90 schema:91 items:92 format: int3293 type: integer94 type: array95 description: a response description96 """97 )98def test_schema_no_mimetypes(convert_response, oas_fragment):99 converted = convert_response(100 oas_fragment(101 """102 description: a response description103 schema:104 items:105 format: int32106 type: integer107 type: array108 """109 ),110 produces=None,111 )112 assert converted == oas_fragment(113 """114 content:115 '*/*':116 schema:117 items:118 format: int32119 type: integer120 type: array121 description: a response description122 """123 )124def test_examples(convert_response, oas_fragment):125 converted = convert_response(126 oas_fragment(127 """128 description: a response description129 examples:130 application/json:131 something: important132 """133 ),134 produces=["application/json"],135 )136 assert converted == oas_fragment(137 """138 content:139 application/json:140 example:141 something: important142 description: a response description143 """144 )145def test_examples_any_type(convert_response, oas_fragment):146 converted = convert_response(147 oas_fragment(148 """149 description: a response description150 examples:151 application/json: '{"something": "important"}'152 """153 ),154 produces=["application/json"],155 )156 assert converted == oas_fragment(157 """158 content:159 application/json:160 example: '{"something": "important"}'161 description: a response description162 """163 )164def test_examples_mimetypes(convert_response, oas_fragment):165 converted = convert_response(166 oas_fragment(167 """168 description: a response description169 examples:170 application/json:171 something: important172 text/plain: something=imporant173 """174 ),175 produces=["application/json", "text/plain"],176 )177 assert converted == oas_fragment(178 """179 content:180 application/json:181 example:182 something: important183 text/plain:184 example: something=imporant185 description: a response description186 """187 )188def test_headers_schema_only(convert_response, oas_fragment):189 converted = convert_response(190 oas_fragment(191 """192 description: a response description193 headers:194 X-Test:195 type: string196 """197 ),198 produces=["application/json"],199 )200 assert converted == oas_fragment(201 """202 description: a response description203 headers:204 X-Test:205 schema:206 type: string207 """208 )209def test_headers_schema_extra(convert_response, oas_fragment):210 converted = convert_response(211 oas_fragment(212 """213 description: a response description214 headers:215 X-Test:216 description: Is it a test?217 type: string218 """219 ),220 produces=["application/json"],221 )222 assert converted == oas_fragment(223 """224 description: a response description225 headers:226 X-Test:227 description: Is it a test?228 schema:229 type: string230 """231 )232def test_headers_multiple(convert_response, oas_fragment):233 converted = convert_response(234 oas_fragment(235 """236 description: a response description237 headers:238 X-Bar:239 format: int32240 type: integer241 X-Foo:242 type: string243 """244 ),245 produces=["application/json"],246 )247 assert converted == oas_fragment(248 """249 description: a response description250 headers:251 X-Bar:252 schema:253 format: int32254 type: integer255 X-Foo:256 schema:257 type: string258 """259 )260def test_schema_examples_headers(convert_response, oas_fragment):261 converted = convert_response(262 oas_fragment(263 """264 description: a response description265 examples:266 application/json:267 something: important268 headers:269 X-Test:270 description: Is it a test?271 type: string272 schema:273 items:274 format: int32275 type: integer276 type: array277 """278 ),279 produces=["application/json"],280 )281 assert converted == oas_fragment(282 """283 description: a response description284 content:285 application/json:286 example:287 something: important288 schema:289 items:290 format: int32291 type: integer292 type: array293 headers:294 X-Test:295 description: Is it a test?296 schema:297 type: string298 """299 )300def test_complete(convert_response, oas_fragment):301 converted = convert_response(302 oas_fragment(303 """304 description: a response description305 examples:306 application/json:307 something: important308 headers:309 X-Test:310 description: Is it a test?311 type: string312 schema:313 items:314 format: int32315 type: integer316 type: array317 """318 ),319 produces=["application/json"],320 )321 assert converted == oas_fragment(322 """323 description: a response description324 content:325 application/json:326 example:327 something: important328 schema:329 items:330 format: int32331 type: integer332 type: array333 headers:334 X-Test:335 description: Is it a test?336 schema:337 type: string338 """339 )340def test_vendor_extensions(convert_response, oas_fragment):341 converted = convert_response(342 oas_fragment(343 """344 description: a response description345 examples:346 application/json:347 something: important348 headers:349 X-Test:350 description: Is it a test?351 type: string352 x-header-ext: header-ext353 schema:354 items:355 format: int32...

Full Screen

Full Screen

auth.py

Source:auth.py Github

copy

Full Screen

...24 res = await f(request)25 return res26 else:27 # the user is not authorized.28 return await convert_response({'status': 'not_authorized'}, ResponseCode.AUTH_ERROR)29 return decorated_function30async def check_request_for_authorization_status(request: 'Request') -> bool:31 await db_connect()32 session = SqliteDataBase.create_session()33 header = request.headers.get("Authorization")34 if header is None:35 return False36 tokens = header.split(' ')37 if len(tokens) != 2:38 return False39 t_type = tokens[0]40 if t_type != 'basic':41 return False42 token: str = tokens[1]43 try:44 payload: dict = await jwt_decode(token.encode())45 except InvalidSignatureError:46 return False47 try:48 session.query(Account).filter(Account.account_id == payload['id']).one()49 except NoResultFound:50 return False51 # TODO check valid type52 return True53class SignUp(Resource):54 async def post(self, request: 'Request') -> 'HTTPResponse':55 await db_connect()56 session = SqliteDataBase.create_session()57 res = await self._logic(session, request)58 await db_close(session)59 return res60 async def _logic(self, session: 'Session', request: 'Request') -> 'HTTPResponse':61 req_json = request.json62 account_id = req_json.get("id")63 if account_id is None:64 return await convert_response({"error": "account id is empty"}, ResponseCode.RESPONSE_ERROR)65 account_pwd = req_json.get("pw")66 if account_pwd is None:67 return await convert_response({"error": "account pw is empty"}, ResponseCode.RESPONSE_ERROR)68 user = session.query(Account).filter(Account.account_id == account_id).first()69 if user:70 return await convert_response({"msg": f"already exist account"}, ResponseCode.RESPONSE_ERROR)71 account = Account(account_id, account_pwd)72 session.add(account)73 session.commit()74 token: bytes = await jwt_encode(account_id, account.valid_token_time.timestamp())75 return await convert_response({"token": token.decode()})76class SignIn(Resource):77 async def put(self, request: 'Request') -> 'HTTPResponse':78 await db_connect()79 session = SqliteDataBase.create_session()80 res = await self._logic(session, request)81 await db_close(session)82 return res83 async def _logic(self, session: 'Session', request: 'Request') -> 'HTTPResponse':84 req_json = request.json85 account_id = req_json.get("id")86 if account_id is None:87 return await convert_response({"error": "account id is empty"}, ResponseCode.RESPONSE_ERROR)88 account_pwd = req_json.get("pw")89 if account_pwd is None:90 return await convert_response({"error": "account pw is empty"}, ResponseCode.RESPONSE_ERROR)91 try:92 user = session.query(Account).filter(Account.account_id == account_id).one()93 except NoResultFound:94 return await convert_response({"msg": f"account is empty"}, ResponseCode.RESPONSE_ERROR)95 if user.account_pwd != account_pwd:96 return await convert_response({"error": "invalid pw"}, ResponseCode.RESPONSE_ERROR)97 session.commit()98 token: bytes = await jwt_encode(account_id, user.valid_token_time.timestamp())99 return await convert_response({"token": token.decode()})100class RefreshToken(Resource):101 async def put(self, request: 'Request') -> 'HTTPResponse':102 await db_connect()103 session = SqliteDataBase.create_session()104 res = await self._logic(session, request)105 await db_close(session)106 return res107 async def _logic(self, session: 'Session', request: 'Request') -> 'HTTPResponse':108 ret: Tuple[bool, str] = await self._valid_header(request)109 ret_bool, ret_token = ret110 if not ret_bool:111 return await convert_response({"error": "invalid refresh token"}, ResponseCode.RESPONSE_ERROR)112 try:113 payload: dict = await jwt_decode(ret_token.encode())114 except InvalidSignatureError:115 return await convert_response({"error": "invalid refresh token"}, ResponseCode.RESPONSE_ERROR)116 try:117 user = session.query(Account).filter(Account.account_id == payload['id']).one()118 except NoResultFound:119 return await convert_response({"msg": f"account is empty"}, ResponseCode.RESPONSE_ERROR)120 user.valid_token_time = datetime.utcnow()121 session.commit()122 token: bytes = await jwt_encode(user.account_id, user.valid_token_time.timestamp())123 return await convert_response({"token": token.decode()})124 async def _valid_header(self, request: 'Request') -> tuple:125 header = request.headers.get("Authorization")126 if header is None:127 return False, ""128 tokens = header.split(' ')129 if len(tokens) != 2:130 return False, ""131 t_type = tokens[0]132 if t_type != 'bearer':133 return False, ""134 token: str = tokens[1]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful