How to use _await_response method in tavern

Best Python code snippet using tavern

MTPClient.py

Source:MTPClient.py Github

copy

Full Screen

...242 )243 self.stack.append(s)244 break245246 def _await_response(247 self,248 expected: str,249 exception: str = "Server responded inadequately",250 optional: bool = False,251 ) -> Union[str, List[str]]:252 """253 Looks up the stack for expected response the protocol, returns data254 if they are expected (\"<>\" notation in expected str)255 """256 if self.stack == []:257 self._recv_into_stack()258 response = self.stack.pop(0)259 data = []260261 """262 if the response should contain any kind of non-static information,263 treat the expected string as a regex and save it to264 """265 if "<>" in expected:266 # check if the response is valid in the first place267 pattern = re.compile("^" + expected.replace("<>", ".+") + "$")268 try:269 assert re.match(pattern, response), exception270 except AssertionError as e:271 if not optional:272 raise AssertionError(exception) from e273 else:274 self.stack.insert(0, response)275 return276277 # find the indexes of placeholders in the expected response278 indexes = [279 substr.start() for substr in re.finditer("<>", expected)280 ]281282 """283 go to placeholder's starting index and check what character should284 be right after the placeholder, extract that index and use it285 as the ending index of the data in response286287 (this may be overkill, but would solve the problem,288 if the protocol expanded and would pass, for example,289 multiple tokens via one server response - not each one290 in a separate netstring)291292 fails if the placeholder is followed by a character that293 also occurs inside the data - however, this would be294 an inconsistency on the side of the protocol, not295 the algorithm296 """297 for i in indexes:298 if len(expected) <= i + 2:299 data.append(response[i:])300 else:301 stop_char = expected[i + 2]302 stop_index = response.index(stop_char, i)303 data.append(response[i:stop_index])304 return data[0] if len(indexes) == 1 else data305 else:306 try:307 assert response == expected, exception308 except AssertionError as e:309 if not optional:310 raise AssertionError(exception) from e311 else:312 self.stack.insert(0, response)313314315class MTPClient:316 """317 Client for the usage of MTP (MTP V:1.0)318 ---------------------------319 GUI (standalone usage) -> main()\n320 NO GUI (third-party usage) -> send_memes(*args)321 """322323 def _build_gui(self) -> None:324 """Builds the application GUI"""325 self.gui = MemeGui(self)326 self.gui._build()327328 def send_meme(329 self,330 ip: str,331 port: int,332 name: str,333 password: str,334 is_nsfw: bool,335 description: str,336 meme: str,337 ) -> None:338 """Sends memes to the target location via MTP"""339 self.ip = ip340 self.port = port341 self.name = name342 self.password = password343 self.is_nsfw = is_nsfw344 self.description = description345 self.meme = meme346347 # handle any exception during runtime + case of tcp connection failure348 try:349 # connect via tcp to the server350 main_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)351 main_socket.connect((self.ip, self.port))352353 # initialize main socket handler - custom interface for socket communication354 main_channel = SocketHandler(main_socket)355356 # 3 main stages of the protocol - init, transport, end357 token_a, data_port = self._initialize_connection(main_channel)358 token_b, data_len = self._transport_data(token_a, data_port)359 self._end_connection(main_channel, token_b, data_len)360 messagebox.showinfo(361 "Success",362 "You've successfully sent a meme to the server at {}:{}!".format(363 self.ip, self.port364 ),365 )366 return367 except socket.error as e:368 print(str(traceback.format_exc()))369 e_msg = "Could not set up a connection / Connection disrupted"370 print(e_msg)371 self.gui.error(e_msg)372 except Exception as e:373 print(str(traceback.format_exc()))374 main_channel._send(f"E {str(e)}")375 self.gui.error(str(e))376 finally:377 main_socket.close()378 self.gui.error(379 "Your meme wasn't successfully delivered, check whether the host is up and try again later"380 )381382 def _initialize_connection(383 self, channel: SocketHandler384 ) -> Tuple[str, int]:385 """Represents the 1st stage of MTP"""386 # verify if both nodes use the same version of MTP387 channel._send("C MTP V:1.0")388 channel._await_response(389 "S MTP V:1.0",390 "Server did not respond correctly to the version challenge (init stage)",391 )392393 # pass a nickname to send memes under394 channel._send(f"C {self.name}")395396 # recieve token and data channel port397 token = channel._await_response(398 "S <>", "Token was not recieved successfully (init stage)"399 )400 data_port = channel._await_response(401 "S <>", "Data port was not recieved successfully (init stage)"402 )403404 return (token, int(data_port))405406 def _transport_data(self, token: str, data_port: int) -> str:407 """Represents the 2nd stage of MTP"""408 # connect to the data channel409 data_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)410 data_socket.connect((self.ip, data_port))411 data_channel = SocketHandler(data_socket)412413 # greet the server with our nick :)414 data_channel._send(f"C {self.name}")415416 # recieve confirmation token and data type, verify token match417 recv_token = data_channel._await_response(418 "S <>",419 "Couldn't start the data transport due to server's unexpected behaviour (transport stage)",420 )421 assert (422 recv_token == token423 ), "Pre-data transport token mismatch (transport stage)"424425 # TRANSPORT LOOP426 # capture total length of data sent to server427 data_len_sum = 0428 while True:429 current_data_len = 0430431 # identify target data and check if it can be accessed432 requested_data_type = data_channel._await_response(433 "S REQ:<>",434 "Expected data type was not recieved successfully (transport stage)",435 )436 assert requested_data_type in [437 "meme",438 "description",439 "isNSFW",440 "password",441 ], f"Unknown data type requested: {requested_data_type} - use meme, description, isNSFW or password (transport stage)"442443 # send requested data444 if requested_data_type == "meme":445 data_channel._send(f"C {self.meme}")446 current_data_len = len(self.meme)447 elif requested_data_type == "description":448 data_channel._send(f"C {self.description}")449 current_data_len = len(self.description)450 elif requested_data_type == "isNSFW":451 val = "true" if self.is_nsfw else "false"452 data_channel._send(f"C {val}")453 current_data_len = len(val)454 else:455 data_channel._send(f"C {self.password}")456 current_data_len = len(self.password)457458 # suppose that data corruption doesn't occur and so if the lengths match - data was transported successfully459 recv_data_len = data_channel._await_response(460 "S ACK:<>",461 "Data length of the sent data was not recieved successfully (transport stage)",462 )463 assert (464 int(recv_data_len) == current_data_len465 ), "Recieved data length doesn't match the sent data length (transport stage)"466 data_len_sum += current_data_len467468 # attempt to recieve a token and end communication on this channel469 token = data_channel._await_response(470 "S END:<>",471 exception="Token was not recieved successfully (transport stage)",472 optional=True,473 )474 if token:475 data_socket.close()476 return (token, data_len_sum)477478 def _end_connection(479 self, main_channel: SocketHandler, token: str, data_len: int480 ) -> None:481 """Represents the 3rd stage of MTP"""482 # check if total length of data recieved by server matches the sent483 recv_data_len = main_channel._await_response(484 "S <>",485 "Total length of data was not recieved successfully (end stage)",486 )487 assert (488 int(recv_data_len) == data_len489 ), "Total length of data recieved by server doesn't match the total length of data sent by client (end stage)"490491 # identify with the token recieved at the end of the data transport492 main_channel._send(f"C {token}")493494 # check for ack of the end of the communication495 main_channel._await_response(496 "S ACK", "Server did not acknowledge the end of communication"497 )498499 def main(self) -> None:500 """Entry point of the MTP client"""501 self._build_gui()502503504if __name__ == "__main__":505 client = MTPClient() ...

Full Screen

Full Screen

test_client.py

Source:test_client.py Github

copy

Full Screen

...25 (b'raw request', b'raw request', b'', b''),26 ('request data', b'request data', b'Test. \xd0\xa2\xd0\xb5\xd1\x81\xd1\x82', 'Test. Тест'),27 (b'raw request', b'raw request', b'Raw message \xff', b'Raw message \xff'),28 ])29 async def test_await_response(self, connection, data, sent_payload, received_payload, expected_result):30 """Check that data returned by await_response is parsed and returned correctly."""31 rpc = cabbage.AsyncAmqpRpc(connection=connection)32 await rpc.connect()33 with patch('cabbage.amqp.AsyncAmqpRpc._await_response', return_value=received_payload, autospec=True), \34 patch('cabbage.amqp.uuid.uuid4', return_value=RESPONSE_CORR_ID):35 result = await rpc.send_rpc(destination=TEST_DESTINATION, data=data, await_response=True)36 assert result == expected_result37 rpc.channel.basic_publish.assert_called_once_with(38 exchange_name='', routing_key=TEST_DESTINATION, payload=sent_payload,39 properties={'reply_to': RANDOM_QUEUE, 'correlation_id': RESPONSE_CORR_ID})40 @pytest.mark.parametrize('correlation_id, expected', [41 ('my-correlation-id', 'my-correlation-id'),42 (None, RESPONSE_CORR_ID), # generated by the library43 ])44 async def test_correlation_id(self, connection, correlation_id, expected):45 """Check that correlation id either matches the custom id or is generated from uuid."""46 rpc = cabbage.AsyncAmqpRpc(connection=connection)47 await rpc.connect()48 with patch('cabbage.amqp.AsyncAmqpRpc._await_response', return_value=b'response', autospec=True), \49 patch('cabbage.amqp.uuid.uuid4', return_value=RESPONSE_CORR_ID):50 await rpc.send_rpc(destination=TEST_DESTINATION, data='payload', await_response=True,51 correlation_id=correlation_id)52 rpc.channel.basic_publish.assert_called_once_with(53 exchange_name='', routing_key=TEST_DESTINATION, payload=b'payload',54 properties={'reply_to': RANDOM_QUEUE, 'correlation_id': expected})55class TestAwaitResponse:56 """AsyncAmqpRpc._await_response"""57 async def test_ok(self, rpc):58 # schedule awaiting response in another Task59 task = asyncio.ensure_future(rpc._await_response(correlation_id=RESPONSE_CORR_ID, timeout=10.0))60 # but it's not executing yet61 assert set(rpc._responses.keys()) == set()62 # let it run for a bit63 await asyncio.sleep(0)64 # check that it created a Future65 assert set(rpc._responses.keys()) == {RESPONSE_CORR_ID}66 # set Future result67 rpc._responses[RESPONSE_CORR_ID].set_result('task result')68 # let the Task run to completion69 await task70 assert task.done()71 assert task.result() == 'task result'72 # check that it cleaned up rpc._responses73 assert set(rpc._responses.keys()) == set()74 async def test_timeout(self, rpc):75 with pytest.raises(cabbage.ServiceUnavailableError):76 await rpc._await_response(correlation_id=RESPONSE_CORR_ID, timeout=0)77 assert set(rpc._responses.keys()) == set()78class TestOnResponse:79 """AsyncAmqpRpc._on_response: aioamqp handler for messages in callback queue."""80 @pytest.mark.parametrize('body', [b'', b'Test message body. \xd0\xa2\xd0\xb5\xd1\x81\xd1\x82'])81 async def test_ok(self, rpc, body):82 rpc._responses[RESPONSE_CORR_ID] = asyncio.Future()83 await rpc._on_response(channel=rpc.channel, body=body, envelope=MockEnvelope(), properties=MockProperties())84 assert rpc._responses[RESPONSE_CORR_ID].done()85 assert rpc._responses[RESPONSE_CORR_ID].result() == body86 rpc.channel.basic_client_ack.assert_called_once_with(delivery_tag=DELIVERY_TAG)87 rpc.channel.basic_client_nack.assert_not_called()88 async def test_unexpected_tag(self, rpc):89 await rpc._on_response(channel=rpc.channel, body=b'resp', envelope=MockEnvelope(), properties=MockProperties())90 rpc.channel.basic_client_ack.assert_called_once_with(delivery_tag=DELIVERY_TAG)

Full Screen

Full Screen

tests.py

Source:tests.py Github

copy

Full Screen

...73 Base.metadata.drop_all(self.engine)74 # Utilities ------------------------------------------------------------------------75 def _send_message(self, message):76 self.app.send_message(self.bot, message)77 def _await_response(self):78 while self.response == Empty or self.response == self.last_response:79 pass80 self.last_response = self.response81 def _assertResponseContains(self, *items):82 return all(item in self.response.text for item in items)83 def _assertResponseEquals(self, item):84 return self.response.text == item85 def _assertKeyboardContains(self, *messages):86 return all(87 [message] in self.response.reply_markup["keyboard"] for message in messages88 )89 # Tests ----------------------------------------------------------------------------90 def test_welcome(self):91 self._send_message("/start")92 self._await_response()93 self.assertEqual(self.response.text, CONFIG["MESSAGES"]["WELCOME"])94 def test_address(self, address="New York"):95 self.address = address96 self._send_message(address)97 self._await_response()98 self._assertResponseContains("\u00B0C", self.address)99 self._assertKeyboardContains(100 CONFIG["KEYBOARD"]["SET_DEFAULT"], CONFIG["KEYBOARD"]["SET_DEFAULT"]101 )102 def test_address_wrong(self):103 with self.subTest("narrow"):104 self._send_message("Times Square")105 self._await_response()106 self._assertResponseContains("too general or too narrow")107 with self.subTest("wide"):108 self._send_message("USA")109 self._await_response()110 self._assertResponseContains("too general or too narrow")111 def test_repeat(self):112 self.test_address()113 self._send_message(CONFIG["KEYBOARD"]["REPEAT"])114 self._await_response()115 self._assertResponseContains("\u00B0C", self.address)116 def test_set_default(self, address="Moscow"):117 self.default_address = address118 self.test_address(self.default_address)119 self._send_message(CONFIG["KEYBOARD"]["SET_DEFAULT"])120 self._await_response()121 self._assertResponseEquals(CONFIG["MESSAGES"]["SET_DEFAULT_CONF"])122 def test_get_default(self):123 self.test_set_default()124 self.test_address()125 self._send_message(CONFIG["KEYBOARD"]["GET_DEFAULT"])126 self._await_response()127 self._assertResponseContains("\u00B0C", self.default_address)128if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tavern automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful