How to use on_recv method in ATX

Best Python code snippet using ATX

api_connector.py

Source:api_connector.py Github

copy

Full Screen

...50 def _on_timeout():51 APIConnector.log_debug("%s: timed out", self.debug_str())52 self.has_errors = True53 self.timeout_callback = None54 stream.stop_on_recv()55 stream.close()56 self._release()57 if on_timeout is not None:58 on_timeout()59 def _on_recv(msg):60 APIConnector.log_debug("%s: message received", self.debug_str())61 if self.timeout_callback is not None:62 loop.remove_timeout(self.timeout_callback)63 self.timeout_callback = None64 stream.stop_on_recv()65 self._release()66 msg = json.loads(msg[0])67 if on_recv is not None:68 on_recv(msg)69 self.log_debug("%s: making call with timeout %r", self.debug_str(), timeout)70 self.timeout_callback = loop.add_timeout(timeout, _on_timeout)71 stream.on_recv(_on_recv)72 self.queue.incr_outstanding(1)73 self.sock.send(send_data)74 @staticmethod75 def send_recv(api_name, cmd, args=None, vargs=None, on_recv=None, on_timeout=None, on_overload=None, timeout=None):76 send_data = APIConnector.make_req(cmd, args=args, vargs=vargs)77 api = APIConnector._get_conn(api_name)78 if api.queue.mean_outstanding >= APIQueue.BUFFER_SZ:79 on_overload()80 return81 APIConnector.log_debug("%s: calling %s", api.debug_str(), cmd)82 api.conn_send_recv(send_data, on_recv, on_timeout, timeout)83 @staticmethod84 def send_terminate_msg(api_name, on_recv=None):85 APIConnector.send_recv(api_name, APIConnector.CMD_TERMINATE, on_recv=on_recv)...

Full Screen

Full Screen

test_zmqstream.py

Source:test_zmqstream.py Github

copy

Full Screen

...42 assert not timed_out43 def test_callable_check(self):44 """Ensure callable check works (py3k)."""45 self.stream.on_send(lambda *args: None)46 self.stream.on_recv(lambda *args: None)47 self.assertRaises(AssertionError, self.stream.on_recv, 1)48 self.assertRaises(AssertionError, self.stream.on_send, 1)49 self.assertRaises(AssertionError, self.stream.on_recv, zmq)50 def test_on_recv_basic(self):51 sent = [b'basic']52 def callback(msg):53 assert msg == sent54 self.loop.stop()55 self.loop.add_callback(lambda: self.push.send_multipart(sent))56 self.pull.on_recv(callback)57 self.run_until_timeout()58 def test_on_recv_wake(self):59 sent = [b'wake']60 def callback(msg):61 assert msg == sent62 self.loop.stop()63 self.pull.on_recv(callback)64 self.loop.call_later(1, lambda: self.push.send_multipart(sent))...

Full Screen

Full Screen

concurrentbase.py

Source:concurrentbase.py Github

copy

Full Screen

...17 self.recv_future.result() # here ?18 self.send_future.result()19 def recv_routine(self, on_recv):20 while self.is_active:21 on_recv()22 time.sleep(1)23 def send_routine(self, on_working):24 on_working()25 def cancel(self):26 self.is_active = False27if __name__ == "__main__":28 active = True29 def on_recv():30 print('on_recv')31 def on_working():32 # How to cancel? 33 while active:34 print('send_routine')35 time.sleep(1)36 concurrent = ConcurrentBase(on_recv, on_working)37 concurrent.run()38 # wait for KeyboardInterrupt39 try:40 while True:41 time.sleep(0.01)42 except KeyboardInterrupt:43 concurrent.cancel() # or shutdown?...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful