How to use create_event_bus method in localstack

Best Python code snippet using localstack_python

test_cloudwatch_events.py

Source:test_cloudwatch_events.py Github

copy

Full Screen

...17 event_bus_name = "my-event-bus"18 def test__success(self):19 """success"""20 cwe = CloudWatchEvents()21 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)22 cwe.put_permission(self.correct_principal, self.event_bus_name)23 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up24 def test__fail__resource_not_found(self):25 """fail with ResourceNotFoundException"""26 cwe = CloudWatchEvents()27 with pytest.raises(cwe.cwe_client.exceptions.ResourceNotFoundException):28 cwe.put_permission(self.correct_principal, self.event_bus_name)29 def test__fail__client_error(self):30 """fail with InvalidParameterValue"""31 cwe = CloudWatchEvents()32 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)33 with pytest.raises(ClientError) as err:34 cwe.put_permission(self.wrong_principal, self.event_bus_name)35 assert err.value.response["Error"]["Code"] == "InvalidParameterValue"36 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up37@pytest.mark.TDD38@mock_events39class TestClassRemovePermission:40 """TDD test class for CloudWatch Events remove permission calls"""41 correct_principal = "123456789012"42 statement_id = "my-statement-01"43 event_bus_name = "my-event-bus"44 def test__success(self):45 """success"""46 cwe = CloudWatchEvents()47 cwe.cwe_client.create_event_bus(Name=self.event_bus_name) # mock setup48 cwe.put_permission(self.correct_principal, self.event_bus_name)49 cwe.remove_permission(self.statement_id, self.event_bus_name)50 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up51 def test__fail__resource_not_found_eventbus(self):52 """fail with ResourceNotFoundException"""53 cwe = CloudWatchEvents()54 assert (55 cwe.remove_permission(self.statement_id, self.event_bus_name)56 is None57 )58 def test__fail__resource_not_found_policy(self):59 """fail with ResourceNotFoundException"""60 cwe = CloudWatchEvents()61 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)62 assert (63 cwe.remove_permission("my-statement-02", self.event_bus_name)64 is None65 )66 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up67 def test__fail__client_error(self):68 """fail with ClientError"""69 cwe = CloudWatchEvents()70 stubber = Stubber(cwe.cwe_client)71 stubber.add_client_error(72 "remove_permission",73 service_error_code="InternalException",74 service_message="this is test error",75 )76 stubber.activate()77 with pytest.raises(ClientError):78 cwe.remove_permission(self.statement_id, self.event_bus_name)79 stubber.deactivate()80@pytest.mark.TDD81@mock_events82class TestClassDescribe:83 """TDD test class for CloudWatch Events describe calls"""84 event_bus_name = "my-event-bus"85 def test__success(self):86 """success"""87 cwe = CloudWatchEvents()88 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)89 resp = cwe.describe_event_bus(event_bus_name=self.event_bus_name)90 assert resp["Name"] == self.event_bus_name91 _region = os.environ.get("AWS_DEFAULT_REGION")92 assert re.match(93 fr"(arn:aws:events:{_region}:)\d{{12}}:(event-bus/{self.event_bus_name})",94 resp["Arn"],95 )96 assert resp["ResponseMetadata"]["HTTPStatusCode"] == 20097 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up98 def test__fail__resource_not_found(self):99 """fail with ResourceNotFoundException"""100 cwe = CloudWatchEvents()101 with pytest.raises(cwe.cwe_client.exceptions.ResourceNotFoundException):102 cwe.describe_event_bus(event_bus_name=self.event_bus_name)

Full Screen

Full Screen

project_diagrams.py

Source:project_diagrams.py Github

copy

Full Screen

...44 if cache:45 service_cache = ElasticacheCacheNode(f"{service.value}Cache")46 node - service_cache47 return node48def create_event_bus(event_bus: EventBus) -> Node:49 return Eventbridge(event_bus.value)50def draw_gateway_diagram(**kwargs):51 with Diagram("Services diagram", **kwargs):52 with Cluster("Gateway"):53 gateway = create_service(Services.GATEWAY, cache=True)54 with Cluster("Auth"):55 auth = create_service(Services.AUTH, storage=True)56 with Cluster("PrivateNetwork"):57 with Cluster("Analytics"):58 analytics = create_service(Services.ANALYTICS, storage=True)59 with Cluster("Accounting"):60 accounting = create_service(Services.ACCOUNTING, storage=True)61 worker = Cronjob("BillingSendWorker")62 accounting - worker63 with Cluster("TaskKeeper"):64 task_keeper = create_service(65 Services.TASK_KEEPER, storage=True66 )67 worker = Cronjob("ReassignWorker")68 task_keeper - worker69 client = create_service(Services.WEB_CLIENT)70 client >> Edge(label="https") >> auth71 client >> Edge(label="https") >> gateway72 auth >> Edge(label="https") >> gateway73 gateway >> Edge(label="http") >> analytics74 gateway >> Edge(label="http") >> accounting75 gateway >> Edge(label="http") >> task_keeper76def draw_gateway_events_diagram(**kwargs):77 with Diagram("Events diagram", **kwargs):78 auth = create_service(Services.AUTH)79 analytics = create_service(Services.ANALYTICS)80 accounting = create_service(Services.ACCOUNTING)81 task_keeper = create_service(Services.TASK_KEEPER)82 account_bus = create_event_bus(EventBus.ACCOUNT_EVENTS)83 task_bus = create_event_bus(EventBus.TASK_EVENTS)84 transaction_bus = create_event_bus(EventBus.TRANSACTION_EVENTS)85 billing_cycle_bus = create_event_bus(EventBus.BILLING_CYCLE_EVENTS)86 task_cost_bus = create_event_bus(EventBus.TASK_COST_EVENTS)87 auth >> account_bus88 account_bus >> accounting89 account_bus >> task_keeper90 account_bus >> analytics91 task_keeper >> task_bus92 accounting >> task_cost_bus93 task_bus >> accounting94 task_bus >> analytics95 task_cost_bus >> analytics96 accounting >> transaction_bus >> analytics97 accounting >> billing_cycle_bus >> analytics98if __name__ == "__main__":99 draw_gateway_diagram(show=False, direction="TB")100 draw_gateway_events_diagram(show=False)

Full Screen

Full Screen

without_gateway.py

Source:without_gateway.py Github

copy

Full Screen

...41 if cache:42 service_cache = ElasticacheCacheNode(f"{service.value}Cache")43 node - service_cache44 return node45def create_event_bus(event_bus: EventBus) -> Node:46 return Eventbridge(event_bus.value)47def create_external_services(48 exclude: tuple[Services] = (),49) -> dict[Services, Node]:50 return {51 service_type: create_service(service_type)52 for service_type in Services53 if service_type not in exclude54 }55def create_event_buses() -> dict[EventBus, Node]:56 return {event_bus: create_event_bus(event_bus) for event_bus in EventBus}57def create_internal_services(58 exclude: tuple[Services] = (), **kwargs59) -> dict[Services, Node]:60 creator = partial(create_service, **kwargs)61 return {62 service: creator(63 service=service,64 )65 for service in Services66 if service not in exclude67 }68def draw_gateway_diagram(**kwargs):69 with Diagram("Services without gateway diagram", **kwargs):70 auth = create_service(Services.AUTH)71 analytics = create_service(Services.ANALYTICS)72 accounting = create_service(Services.ACCOUNTING)73 task_keeper = create_service(Services.TASK_KEEPER)74 client = create_service(Services.WEB_CLIENT)75 client >> auth76 client >> analytics77 client >> accounting78 client >> task_keeper79def draw_gateway_events_diagram(**kwargs):80 with Diagram("Events without gateway diagram", **kwargs):81 auth = create_service(Services.AUTH)82 analytics = create_service(Services.ANALYTICS)83 accounting = create_service(Services.ACCOUNTING)84 task_keeper = create_service(Services.TASK_KEEPER)85 account_bus = create_event_bus(EventBus.ACCOUNT_EVENTS)86 task_bus = create_event_bus(EventBus.TASK_EVENTS)87 billing_bus = create_event_bus(EventBus.BillingEvents)88 auth >> account_bus89 account_bus >> accounting90 account_bus >> task_keeper91 account_bus >> analytics92 task_keeper >> task_bus93 task_bus >> accounting94 task_bus >> analytics95 accounting >> billing_bus96 billing_bus >> analytics97if __name__ == "__main__":98 draw_gateway_diagram(show=False, direction="TB")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful