How to use the process_raw_message method in Avocado

Best Python code snippet using avocado_python

test_status_repo.py

Source:test_status_repo.py Github

copy

Full Screen

...4 def setUp(self):5 self.status_repo = repo.StatusRepo()6 def test_process_raw_message_invalid(self):7 with self.assertRaises(utils.StatusMsgInvalidJSONError):8 self.status_repo.process_raw_message('+-+-InvalidJSON-AFAICT-+-+')9 def test_process_raw_message_no_id(self):10 msg = ('{"status": "finished", "time": 1597774000.5140226, '11 '"returncode": 0}')12 with self.assertRaises(repo.StatusMsgMissingDataError):13 self.status_repo.process_raw_message(msg)14 def test_set_task_data(self):15 self.status_repo._set_task_data({"id": "1-foo", "status": "started"})16 self.assertEqual(self.status_repo._all_data["1-foo"],17 [{"status": "started"}])18 def test_handle_task_started(self):19 msg = {"id": "1-foo", "status": "started", "output_dir": "/fake/path"}20 self.status_repo._handle_task_started(msg)21 self.assertEqual(self.status_repo.get_task_data("1-foo"),22 [{"status": "started", "output_dir": "/fake/path"}])23 def test_handle_task_started_no_output_dir(self):24 msg = {"id": "1-foo", "status": "started"}25 with self.assertRaises(repo.StatusMsgMissingDataError):26 self.status_repo._handle_task_started(msg)27 def test_handle_task_finished_no_result(self):28 msg = {"id": "1-foo", "status": "finished"}29 self.status_repo._handle_task_finished(msg)30 self.assertEqual(self.status_repo.get_task_data("1-foo"),31 [{"status": "finished"}])32 self.assertEqual(self.status_repo._by_result.get(None), ["1-foo"])33 def test_handle_task_finished_result(self):34 msg = {"id": "1-foo", "status": "finished", "result": "pass"}35 self.status_repo._handle_task_finished(msg)36 self.assertEqual(self.status_repo.get_task_data("1-foo"),37 [{"status": "finished", "result": "pass"}])38 self.assertEqual(self.status_repo._by_result.get("pass"), ["1-foo"])39 def test_process_message_running(self):40 msg = {"id": "1-foo", "status": "running"}41 self.status_repo.process_message(msg)42 self.assertEqual(self.status_repo.get_task_data("1-foo"),43 [{"status": "running"}])44 def 
test_process_raw_message_task_started(self):45 msg = '{"id": "1-foo", "status": "started", "output_dir": "/fake/path"}'46 self.status_repo.process_raw_message(msg)47 self.assertEqual(self.status_repo.get_task_data("1-foo"),48 [{"status": "started", "output_dir": "/fake/path"}])49 def test_process_raw_message_task_running(self):50 msg = '{"id": "1-foo", "status": "running"}'51 self.status_repo.process_raw_message(msg)52 self.assertEqual(self.status_repo.get_task_data("1-foo"),53 [{"status": "running"}])54 def test_process_messages_running(self):55 msg = {"id": "1-foo", "status": "running", "time": 1597894378.6080744}56 self.status_repo.process_message(msg)57 msg = {"id": "1-foo", "status": "running", "time": 1597894378.6103745}58 self.status_repo.process_message(msg)59 self.assertEqual(self.status_repo.get_latest_task_data("1-foo"),60 {"status": "running", "time": 1597894378.6103745})61 def test_task_status_time(self):62 msg = {"id": "1-foo", "status": "running", "time": 1597894378.0000002}63 self.status_repo.process_message(msg)64 self.assertEqual(self.status_repo._status["1-foo"],65 ("running", 1597894378.0000002))...

Full Screen

Full Screen

websockets.py

Source:websockets.py Github

copy

Full Screen

...16 # process message normally17 # print("message type: {}".format(msg['e']))18 pprint(msg)19 # do something20def process_raw_message(msg):21 pprint(f"A: {(msg['a'][0])}")22 pprint(f"B: {(msg['b'][0])}")23 # pprint(msg)24# trade data socket25# conn_key_trade = bm.start_trade_socket(SYMBOL, process_m_message)26# depth data socket27# conn_key_depth_partial = bm.start_depth_socket(SYMBOL, process_raw_message, depth=BinanceSocketManager.WEBSOCKET_DEPTH_5)28# conn_key_depth_diff = bm.start_depth_socket(SYMBOL, process_raw_message)29# 체잔 socket (for 현물, 마진)30# conn_key_spot_trade = bm.start_user_socket(process_message)31# conn_key_fut_cross_trade = bm.start_margin_socket(process_message)32# conn_key_fut_iso_trade = bm.start_isolated_margin_socket(SYMBOL, process_message)33# 체결 socket34# aggtrade : "Get compressed, aggregate trades. Trades that fill at the time, from the same order, with the same price will have the quantity aggregated."...

Full Screen

Full Screen

receiver.py

Source:receiver.py Github

copy

Full Screen

1import asyncio2import logging3from ..utils import _msg_backend_module, spec_import4logger = logging.getLogger(__name__)5def generic_receiver(opts):6 loop = asyncio.get_event_loop()7 receiver = _msg_backend_module(opts).Receiver()8 receiver.connect(opts['RECEIVE_SOCKET'])9 for processor_spec in opts['--processor']:10 handler_class = spec_import(processor_spec)11 try:12 handler = handler_class()13 process_message = getattr(handler, 'process_message', None)14 process_raw_message = getattr(handler, 'process_raw_message', None)15 assert process_message or process_raw_message16 assert not process_message or callable(process_message)17 assert not process_raw_message or callable(process_raw_message)18 except (TypeError, AssertionError):19 logger.error(f"Error with processor '{processor_spec}'")20 logger.error(21 "Processors must have one or both of 'process_message' and 'process_raw_message' methods"22 )23 return24 if process_message:25 receiver.add_listener(process_message)26 if process_raw_message:27 receiver.add_listener(process_raw_message)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful