Best Python code snippet using localstack_python
test_eventbridge.py
Source:test_eventbridge.py  
...56        client, stubber = self.create_stubbed_eventbridge_client(57            with_default_responses=True,58        )59        with stubber:60            client.put_events(**self._default_put_events_args())61        assert (62            stubber.requests[0].url63            == "https://events.us-east-1.amazonaws.com/"64        )65        assert b"EndpointId" not in stubber.requests[0].body66    def test_put_event_default_endpoint_explicit_configs(self):67        client, stubber = self.create_stubbed_eventbridge_client(68            with_default_responses=True,69            config=Config(70                use_dualstack_endpoint=False,71                use_fips_endpoint=False,72            ),73        )74        with stubber:75            client.put_events(**self._default_put_events_args())76        assert (77            stubber.requests[0].url78            == "https://events.us-east-1.amazonaws.com/"79        )80        assert b"EndpointId" not in stubber.requests[0].body81    @requires_crt()82    def test_put_event_endpoint_id(self):83        client, stubber = self.create_stubbed_eventbridge_client(84            with_default_responses=True,85        )86        default_args = self._default_put_events_args()87        endpoint_id = "abc123.456def"88        with stubber:89            client.put_events(EndpointId=endpoint_id, **default_args)90        self._assert_params_in_body(91            stubber.requests[0],92            [93                ("EndpointId", endpoint_id),94            ],95        )96        self._assert_multi_region_endpoint(stubber.requests[0], endpoint_id)97        self._assert_sigv4a_headers(stubber.requests[0])98    @requires_crt()99    def test_put_event_endpoint_id_explicit_config(self):100        client, stubber = self.create_stubbed_eventbridge_client(101            with_default_responses=True,102            config=Config(103                use_dualstack_endpoint=False,104                use_fips_endpoint=False,105            ),106        
)107        default_args = self._default_put_events_args()108        endpoint_id = "abc123.456def"109        with stubber:110            client.put_events(EndpointId=endpoint_id, **default_args)111        self._assert_params_in_body(112            stubber.requests[0],113            [114                ("EndpointId", endpoint_id),115            ],116        )117        self._assert_multi_region_endpoint(stubber.requests[0], endpoint_id)118        self._assert_sigv4a_headers(stubber.requests[0])119    @requires_crt()120    def test_put_event_bad_endpoint_id(self):121        client, stubber = self.create_stubbed_eventbridge_client(122            with_default_responses=True,123        )124        default_args = self._default_put_events_args()125        endpoint_id = "badactor.com?foo=bar"126        with pytest.raises(InvalidEndpointConfigurationError) as e:127            client.put_events(EndpointId=endpoint_id, **default_args)128        assert "EndpointId is not a valid hostname component" in str(e.value)129    @requires_crt()130    def test_put_event_bad_endpoint_id_explicit_config(self):131        client, stubber = self.create_stubbed_eventbridge_client(132            with_default_responses=True,133            config=Config(134                use_dualstack_endpoint=False,135                use_fips_endpoint=False,136            ),137        )138        default_args = self._default_put_events_args()139        endpoint_id = "badactor.com?foo=bar"140        with pytest.raises(InvalidEndpointConfigurationError) as e:141            client.put_events(EndpointId=endpoint_id, **default_args)142        assert "EndpointId is not a valid hostname component" in str(e.value)143    @requires_crt()144    def test_put_event_empty_endpoint_id(self):145        client, stubber = self.create_stubbed_eventbridge_client(146            with_default_responses=True,147        )148        default_args = self._default_put_events_args()149        endpoint_id = ""150        with 
pytest.raises(InvalidEndpointConfigurationError) as e:151            client.put_events(EndpointId=endpoint_id, **default_args)152        assert "EndpointId must not be a zero length string" in str(e.value)153    @requires_crt()154    def test_put_event_empty_endpoint_id_explicit_config(self):155        client, stubber = self.create_stubbed_eventbridge_client(156            with_default_responses=True,157            config=Config(158                use_dualstack_endpoint=False,159                use_fips_endpoint=False,160            ),161        )162        default_args = self._default_put_events_args()163        endpoint_id = ""164        with pytest.raises(InvalidEndpointConfigurationError) as e:165            client.put_events(EndpointId=endpoint_id, **default_args)166        assert "EndpointId must not be a zero length string" in str(e.value)167    def test_put_event_default_dualstack_endpoint(self):168        config = Config(use_dualstack_endpoint=True, use_fips_endpoint=False)169        client, stubber = self.create_stubbed_eventbridge_client(170            with_default_responses=True, config=config171        )172        default_args = self._default_put_events_args()173        with stubber:174            client.put_events(**default_args)175        assert stubber.requests[0].url == "https://events.us-east-1.api.aws/"176    @requires_crt()177    def test_put_events_endpoint_id_dualstack(self):178        config = Config(use_dualstack_endpoint=True, use_fips_endpoint=False)179        client, stubber = self.create_stubbed_eventbridge_client(180            with_default_responses=True, config=config181        )182        default_args = self._default_put_events_args()183        endpoint_id = "abc123.456def"184        with stubber:185            client.put_events(EndpointId=endpoint_id, **default_args)186        self._assert_params_in_body(187            stubber.requests[0],188            [189                ("EndpointId", endpoint_id),190            ],191        )192 
       self._assert_multi_region_endpoint(193            stubber.requests[0], endpoint_id, suffix="api.aws"194        )195        self._assert_sigv4a_headers(stubber.requests[0])196    def test_put_events_default_fips_endpoint(self):197        config = Config(use_dualstack_endpoint=False, use_fips_endpoint=True)198        client, stubber = self.create_stubbed_eventbridge_client(199            with_default_responses=True, config=config200        )201        default_args = self._default_put_events_args()202        with stubber:203            client.put_events(**default_args)204        assert (205            stubber.requests[0].url206            == "https://events-fips.us-east-1.amazonaws.com/"207        )208    @requires_crt()209    def test_put_events_endpoint_id_fips(self):210        config = Config(use_dualstack_endpoint=False, use_fips_endpoint=True)211        client, stubber = self.create_stubbed_eventbridge_client(212            with_default_responses=True, config=config213        )214        default_args = self._default_put_events_args()215        endpoint_id = "abc123.456def"216        with pytest.raises(InvalidEndpointConfigurationError) as e:217            client.put_events(EndpointId=endpoint_id, **default_args)218        assert (219            "FIPS is not supported with EventBridge multi-region endpoints"220            in str(e.value)221        )222    def test_put_events_default_dualstack_fips_endpoint(self):223        config = Config(use_dualstack_endpoint=True, use_fips_endpoint=True)224        client, stubber = self.create_stubbed_eventbridge_client(225            with_default_responses=True, config=config226        )227        default_args = self._default_put_events_args()228        with stubber:229            client.put_events(**default_args)230        assert (231            stubber.requests[0].url == "https://events-fips.us-east-1.api.aws/"232        )233    @requires_crt()234    def test_put_events_endpoint_id_dualstack_fips(self):235        
config = Config(use_dualstack_endpoint=True, use_fips_endpoint=True)236        client, stubber = self.create_stubbed_eventbridge_client(237            with_default_responses=True, config=config238        )239        default_args = self._default_put_events_args()240        endpoint_id = "abc123.456def"241        with pytest.raises(InvalidEndpointConfigurationError) as e:242            client.put_events(EndpointId=endpoint_id, **default_args)243        assert (244            "FIPS is not supported with EventBridge multi-region endpoints"245            in str(e.value)246        )247    def test_put_events_default_gov_endpoint(self):248        client, stubber = self.create_stubbed_eventbridge_client(249            with_default_responses=True,250            region="us-iso-east-1",251        )252        default_args = self._default_put_events_args()253        with stubber:254            client.put_events(**default_args)255        assert (256            stubber.requests[0].url257            == "https://events.us-iso-east-1.c2s.ic.gov/"258        )259    @requires_crt()260    def test_put_events_endpoint_id_gov(self):261        client, stubber = self.create_stubbed_eventbridge_client(262            with_default_responses=True,263            region="us-iso-east-1",264        )265        default_args = self._default_put_events_args()266        endpoint_id = "abc123.456def"267        with stubber:268            client.put_events(EndpointId=endpoint_id, **default_args)269        self._assert_params_in_body(270            stubber.requests[0],271            [272                ("EndpointId", endpoint_id),273            ],274        )275        self._assert_multi_region_endpoint(276            stubber.requests[0], endpoint_id, suffix="c2s.ic.gov"277        )278        self._assert_sigv4a_headers(stubber.requests[0])279    def test_put_events_default_custom_endpoint(self):280        client, stubber = self.create_stubbed_eventbridge_client(281            
with_default_responses=True, endpoint_url="https://example.org"282        )283        default_args = self._default_put_events_args()284        with stubber:285            client.put_events(**default_args)286        assert stubber.requests[0].url == "https://example.org/"287    @requires_crt()288    def test_put_events_endpoint_id_custom(self):289        client, stubber = self.create_stubbed_eventbridge_client(290            with_default_responses=True, endpoint_url="https://example.org"291        )292        default_args = self._default_put_events_args()293        endpoint_id = "abc123.456def"294        with stubber:295            client.put_events(EndpointId=endpoint_id, **default_args)296        self._assert_params_in_body(297            stubber.requests[0],298            [299                ("EndpointId", endpoint_id),300            ],301        )302        assert stubber.requests[0].url == "https://example.org"...test_buffer.py
Source:test_buffer.py  
...36    event = {"event": "data"}37    for i in range(MAX_SIZE * 2):38        buffer.put_event(event)39    assert buffer.size() == MAX_SIZE40def test_put_events(buffer):41    events = [{"event": "data"}] * 242    dropped = buffer.put_events(events)43    assert buffer.size() == 244    assert dropped == 045def test_put_events_max_size(buffer):46    events = [{"event": "data"}] * MAX_SIZE * 247    dropped = buffer.put_events(events)48    assert buffer.size() == MAX_SIZE49    assert dropped == MAX_SIZE50def test_buffer_drops_events_when_put_events(buffer):51    events = [{"event": "data"}] * MAX_SIZE52    buffer.put_events(events)53    dropped = buffer.put_events(events)54    assert dropped == MAX_SIZE55    assert buffer.size() == MAX_SIZE56def test_put_multi_key_events(patch_connection_pool):57    key_a, events_a = "buffer_a", [{"event": "data"}] * 258    key_b, events_b = "buffer_b", [{"event": "data"}] * MAX_SIZE59    key_c, events_c = "buffer_c", [{"event": "data"}] * MAX_SIZE * 260    buffer_a, buffer_b, buffer_c = (61        get_buffer(key_a),62        get_buffer(key_b),63        get_buffer(key_c),64    )65    dropped = buffer_a.put_multi_key_events(66        {key_a: events_a, key_b: events_b, key_c: events_c}67    )68    assert buffer_a.size() == 269    assert buffer_b.size() == MAX_SIZE70    assert buffer_c.size() == MAX_SIZE71    assert dropped == {key_a: 0, key_b: 0, key_c: MAX_SIZE}72def test_pop_event(buffer):73    event = "buffer", {"event": "data"}74    buffer.put_event(event)75    popped_event = buffer.pop_event()76    assert id(event) != id(popped_event)77    assert event == popped_event78    assert buffer.size() == 079    assert buffer.pop_event() is None80def test_pop_events(buffer):81    events = [{"event": f"data{i}"} for i in range(MAX_SIZE)]82    buffer.put_events(events)83    popped_events = buffer.pop_events()84    assert len(popped_events) == BATCH_SIZE85    assert popped_events == events[:BATCH_SIZE]86    assert buffer.size() == MAX_SIZE - 
BATCH_SIZE87def test_pop_events_get_size(buffer):88    events = [{"event": f"data{i}"} for i in range(MAX_SIZE)]89    buffer.put_events(events)90    popped_events, size = buffer.pop_events_get_size()91    assert len(popped_events) == BATCH_SIZE92    assert popped_events == events[:BATCH_SIZE]93    assert size == MAX_SIZE - BATCH_SIZE94    assert buffer.size() == size95def test_clear(buffer):96    events = [{"event": "data"}] * 297    buffer.put_events(events)98    assert buffer.size() == 299    assert buffer.clear() == 2...eventbridge.py
Source:eventbridge.py  
def put_event(self, source, event):
    """Publish one event to the configured EventBridge bus.

    Reads ``self.client`` (an object exposing ``put_events``), ``self.bus``
    (sent as ``EventBusName``) and ``self.endpoint`` (sent as ``EndpointId``
    only when it is not ``None``).

    Args:
        source: Value sent as the entry's ``Source``.
        event: Value sent as the entry's ``Detail``; passed through unchanged.

    Returns:
        Whatever ``self.client.put_events`` returns.
    """
    # Imported locally because only the ``datetime`` class is known to be in
    # scope in this module.
    from datetime import timezone

    entry = {
        # Timezone-aware on purpose: a naive local timestamp would be
        # interpreted as UTC when the request is serialized.
        "Time": datetime.now(timezone.utc),
        "Source": source,
        "Resources": [],
        "DetailType": "boto3.client.put_events()",
        "Detail": event,
        "EventBusName": self.bus,
    }

    # Build the request once; EndpointId is the only optional part.
    request = {"Entries": [entry]}
    if self.endpoint is not None:
        request["EndpointId"] = self.endpoint

    return self.client.put_events(**request)
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
