How to use handler_foo method in localstack

Best Python code snippet using localstack_python

test_client_async_await.py

Source:test_client_async_await.py Github

copy

Full Screen

...34 errors = []35 async def error_handler(e):36 errors.append(e)37 await nc.connect(error_cb=error_handler)38 async def handler_foo(msg):39 msgs.append(msg)40 # Should not block other subscriptions from receiving messages.41 await asyncio.sleep(0.2)42 if msg.reply != "":43 await nc.publish(msg.reply, msg.data * 2)44 await nc.subscribe("foo", cb=handler_foo)45 async def handler_bar(msg):46 msgs.append(msg)47 if msg.reply != "":48 await nc.publish(msg.reply, b'')49 await nc.subscribe("bar", cb=handler_bar)50 await nc.publish("foo", b'1')51 await nc.publish("foo", b'2')52 await nc.publish("foo", b'3')53 # Will be processed before the others since no head of line54 # blocking among the subscriptions.55 await nc.publish("bar", b'4')56 response = await nc.request("foo", b'hello1', timeout=1)57 self.assertEqual(response.data, b'hello1hello1')58 with self.assertRaises(TimeoutError):59 await nc.request("foo", b'hello2', timeout=0.1)60 await nc.publish("bar", b'5')61 response = await nc.request("foo", b'hello2', timeout=1)62 self.assertEqual(response.data, b'hello2hello2')63 self.assertEqual(msgs[0].data, b'1')64 self.assertEqual(msgs[1].data, b'4')65 self.assertEqual(msgs[2].data, b'2')66 self.assertEqual(msgs[3].data, b'3')67 self.assertEqual(msgs[4].data, b'hello1')68 self.assertEqual(msgs[5].data, b'hello2')69 self.assertEqual(len(errors), 0)70 await nc.close()71 @async_test72 async def test_subscription_slow_consumer_pending_msg_limit(self):73 nc = NATS()74 msgs = []75 errors = []76 async def error_handler(e):77 if type(e) is SlowConsumerError:78 errors.append(e)79 await nc.connect(error_cb=error_handler)80 async def handler_foo(msg):81 await asyncio.sleep(0.2)82 msgs.append(msg)83 if msg.reply != "":84 await nc.publish(msg.reply, msg.data * 2)85 await nc.subscribe("foo", cb=handler_foo, pending_msgs_limit=5)86 async def handler_bar(msg):87 msgs.append(msg)88 if msg.reply != "":89 await nc.publish(msg.reply, msg.data * 3)90 await nc.subscribe("bar", cb=handler_bar)91 for i in range(10):92 await nc.publish("foo", f'{i}'.encode())93 # Will be processed before the others since no head of line94 # blocking among the subscriptions.95 await nc.publish("bar", b'14')96 response = await nc.request("bar", b'hi1', 2)97 self.assertEqual(response.data, b'hi1hi1hi1')98 self.assertEqual(len(msgs), 2)99 self.assertEqual(msgs[0].data, b'14')100 self.assertEqual(msgs[1].data, b'hi1')101 # Consumed messages but the rest were slow consumers.102 self.assertTrue(4 <= len(errors) <= 5)103 for e in errors:104 self.assertEqual(type(e), SlowConsumerError)105 self.assertIn("foo", str(e))106 self.assertNotIn("bar", str(e))107 self.assertEqual(errors[0].sid, 1)108 await nc.close()109 @async_test110 async def test_subscription_slow_consumer_pending_bytes_limit(self):111 nc = NATS()112 msgs = []113 errors = []114 async def error_handler(e):115 if type(e) is SlowConsumerError:116 errors.append(e)117 await nc.connect(error_cb=error_handler)118 async def handler_foo(msg):119 await asyncio.sleep(0.2)120 msgs.append(msg)121 if msg.reply != "":122 await nc.publish(msg.reply, msg.data * 2)123 await nc.subscribe("foo", cb=handler_foo, pending_bytes_limit=10)124 async def handler_bar(msg):125 msgs.append(msg)126 if msg.reply != "":127 await nc.publish(msg.reply, msg.data * 3)128 await nc.subscribe("bar", cb=handler_bar)129 for i in range(10):130 await nc.publish("foo", f"AAA{i}".encode())131 # Will be processed before the others since no head of line132 # blocking among the subscriptions....

Full Screen

Full Screen

test_tagging.py

Source:test_tagging.py Github

copy

Full Screen

1from collections import namedtuple2from unittest import mock3from typing import Dict, List, Any4import pytest5import fh_immuta_utils.tagging as tg6import fh_immuta_utils.data_source as ds7TAG_MAP = {"col_foo": ["foo", "foobar"], "col_bar": ["bar.baz"]}8DATA_SOURCE_TAGS = {9 ("handler_foo", "database_foo"): {10 "ath_match*": ["eeny", "meeny.miny"],11 "rs_fail*": ["moe"],12 }13}14@pytest.fixture15def tagger():16 with mock.patch("fh_immuta_utils.tagging.Tagger.read_configs", return_value=None):17 obj = tg.Tagger(config_root="")18 obj.tag_map_datadict = TAG_MAP19 obj.tag_map_datasource = DATA_SOURCE_TAGS20 return obj21@pytest.mark.parametrize("tag,is_root", [("foo", False), ("bar", True)])22def test_is_root_tag(tagger: tg.Tagger, tag: str, is_root: bool):23 assert tagger.is_root_tag(tag) == is_root24@pytest.mark.parametrize(25 "col,expected", [("col_foo", TAG_MAP["col_foo"]), ("bad_col", [])]26)27def test_get_tags_for_column(tagger: tg.Tagger, col: str, expected: List[str]):28 assert tagger.get_tags_for_column(column_name=col) == expected29def test_tags_to_make(tagger):30 assert list(tagger.tags_to_make()) == [31 ("foo", []),32 ("foobar", []),33 ("bar", ["bar.baz"]),34 ("eeny", []),35 ("meeny", ["meeny.miny"]),36 ("moe", []),37 ]38def test_get_tags_for_data_source(tagger):39 expected = [40 {"name": "eeny", "source": "curated"},41 {"name": "meeny.miny", "source": "curated"},42 ]43 assert (44 tagger.get_tags_for_data_source(45 name="ath_match_succeeds",46 handler_type="handler_foo",47 connection_string="fake.connection.string:5439/database_foo",48 )49 == expected50 )51 assert (52 tagger.get_tags_for_data_source(53 name="ath_fail_match",54 handler_type="handler_foo",55 connection_string="fake.connection.string:5439/database_foo",56 )57 == []58 )59 assert (60 tagger.get_tags_for_data_source(61 name="rs_fail_bar",62 handler_type="handler_fail_match",63 connection_string="fake.connection.string:5439/database_fail_match",64 )65 == []66 )67TagMsgBody = namedtuple("TagMsgBody", ["root_tag", "children", "expected"])68TAG_MSG_BODY = [69 TagMsgBody(root_tag="foo", children=[], expected={"tags": [{"name": "foo"}]}),70 TagMsgBody(71 root_tag="bar",72 children=["baz"],73 expected={74 "rootTag": {"name": "bar", "deleteHierarchy": False},75 "tags": [{"name": "baz"}],76 },77 ),78 TagMsgBody(79 root_tag="foobar",80 children=["foo", "bar"],81 expected={82 "rootTag": {"name": "foobar", "deleteHierarchy": False},83 "tags": [{"name": "foo"}, {"name": "bar"}],84 },85 ),86]87@pytest.mark.parametrize("root_tag,children,expected", TAG_MSG_BODY)88def test_create_message_body_for_tag_creation(89 tagger: tg.Tagger, root_tag: str, children: List[str], expected: Dict[str, Any]90):91 assert (92 tagger.create_message_body_for_tag_creation(93 root_tag=root_tag, children=children94 )95 == expected96 )97def test_enrich_columns_with_tagging(tagger: tg.Tagger):98 columns = [99 ds.DataSourceColumn(name="col_foo", dataType="", remoteType="", nullable=False),100 ds.DataSourceColumn(name="col_bar", dataType="", remoteType="", nullable=False),101 ds.DataSourceColumn(name="bad_col", dataType="", remoteType="", nullable=False),102 ]103 enriched_cols = tagger.enrich_columns_with_tagging(columns)104 assert len(enriched_cols) == len(columns)105 for col in enriched_cols:...

Full Screen

Full Screen

server.py

Source:server.py Github

copy

Full Screen

...7from http.server import HTTPServer8from socketserver import ThreadingMixIn9def handler_404(self):10 self.send_response(404)11def handler_foo(self):12 logging.info("Handling foo...")13 time.sleep(.075 + random.random() * .05)14 self.send_response(200)15 self.end_headers()16 self.wfile.write(b"Handled foo")17def handler_bar(self):18 logging.info("Handling bar...")19 time.sleep(.15 + random.random() * .1)20 self.send_response(200)21 self.end_headers()22 self.wfile.write(b"Handled bar")23ROUTES = {24 "/api/foo": handler_foo,25 "/api/bar": handler_bar,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful