Best Python code snippet using localstack_python
test_s3_notifications_sqs.py
Source: test_s3_notifications_sqs.py
...69            ]70        ),71    )72@pytest.fixture73def s3_create_sqs_bucket_notification(s3_client, sqs_client) -> NotificationFactory:74    """75    A factory fixture for creating sqs bucket notifications.76    :param s3_client:77    :param sqs_client:78    :return:79    """80    def factory(bucket_name: str, queue_url: str, events: List["EventType"]):81        return create_sqs_bucket_notification(s3_client, sqs_client, bucket_name, queue_url, events)82    return factory83def sqs_collect_s3_events(84    sqs_client: "SQSClient", queue_url: str, min_events: int, timeout: int = 1085) -> List[Dict]:86    """87    Polls the given queue for the given amount of time and extracts and flattens from the received messages all88    events (messages that have a "Records" field in their body, and where the records can be json-deserialized).89    :param sqs_client: the boto3 client to use90    :param queue_url: the queue URL to listen from91    :param min_events: the minimum number of events to receive to wait for92    :param timeout: the number of seconds to wait before raising an assert error93    :return: a list with the deserialized records from the SQS messages94    """95    events = []96    def collect_events() -> int:97        _response = sqs_client.receive_message(98            QueueUrl=queue_url, WaitTimeSeconds=timeout, MaxNumberOfMessages=199        )100        messages = _response.get("Messages", [])101        if not messages:102            LOG.info("no messages received from %s after %d seconds", queue_url, timeout)103        for m in messages:104            body = m["Body"]105            # see https://www.mikulskibartosz.name/what-is-s3-test-event/106            if "s3:TestEvent" in body:107                continue108            assert "Records" in body, "Unexpected event received"109            doc = json.loads(body)110            events.extend(doc["Records"])111        return len(events)112    assert poll_condition(lambda: collect_events() >= min_events, 
timeout=timeout)113    return events114class TestS3NotificationsToSQS:115    @pytest.mark.aws_validated116    def test_object_created_put(117        self,118        s3_client,119        sqs_client,120        s3_create_bucket,121        sqs_create_queue,122        s3_create_sqs_bucket_notification,123    ):124        # setup fixture125        bucket_name = s3_create_bucket()126        queue_url = sqs_create_queue()127        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:Put"])128        s3_client.put_bucket_versioning(129            Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}130        )131        obj0 = s3_client.put_object(Bucket=bucket_name, Key="my_key_0", Body="something")132        obj1 = s3_client.put_object(Bucket=bucket_name, Key="my_key_1", Body="something else")133        # collect s3 events from SQS queue134        events = sqs_collect_s3_events(sqs_client, queue_url, min_events=2)135        assert len(events) == 2, f"unexpected number of events in {events}"136        # assert137        assert events[0]["eventSource"] == "aws:s3"138        assert events[0]["eventName"] == "ObjectCreated:Put"139        assert events[0]["s3"]["bucket"]["name"] == bucket_name140        assert events[0]["s3"]["object"]["key"] == "my_key_0"141        assert events[0]["s3"]["object"]["size"] == 9142        assert events[0]["s3"]["object"]["versionId"]143        assert obj0["VersionId"] == events[0]["s3"]["object"]["versionId"]144        assert events[1]["eventSource"] == "aws:s3"145        assert events[0]["eventName"] == "ObjectCreated:Put"146        assert events[1]["s3"]["bucket"]["name"] == bucket_name147        assert events[1]["s3"]["object"]["key"] == "my_key_1"148        assert events[1]["s3"]["object"]["size"] == 14149        assert events[1]["s3"]["object"]["versionId"]150        assert obj1["VersionId"] == events[1]["s3"]["object"]["versionId"]151    @pytest.mark.aws_validated152    def 
test_object_created_copy(153        self,154        s3_client,155        sqs_client,156        s3_create_bucket,157        sqs_create_queue,158        s3_create_sqs_bucket_notification,159    ):160        # setup fixture161        bucket_name = s3_create_bucket()162        queue_url = sqs_create_queue()163        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:Copy"])164        src_key = "src-dest-%s" % short_uid()165        dest_key = "key-dest-%s" % short_uid()166        s3_client.put_object(Bucket=bucket_name, Key=src_key, Body="something")167        assert not sqs_collect_s3_events(168            sqs_client, queue_url, 0, timeout=1169        ), "unexpected event triggered for put_object"170        s3_client.copy_object(171            Bucket=bucket_name,172            CopySource={"Bucket": bucket_name, "Key": src_key},173            Key=dest_key,174        )175        events = sqs_collect_s3_events(sqs_client, queue_url, 1)176        assert len(events) == 1, f"unexpected number of events in {events}"177        assert events[0]["eventSource"] == "aws:s3"178        assert events[0]["eventName"] == "ObjectCreated:Copy"179        assert events[0]["s3"]["bucket"]["name"] == bucket_name180        assert events[0]["s3"]["object"]["key"] == dest_key181    @pytest.mark.aws_validated182    def test_object_created_and_object_removed(183        self,184        s3_client,185        sqs_client,186        s3_create_bucket,187        sqs_create_queue,188        s3_create_sqs_bucket_notification,189    ):190        # setup fixture191        bucket_name = s3_create_bucket()192        queue_url = sqs_create_queue()193        s3_create_sqs_bucket_notification(194            bucket_name, queue_url, ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]195        )196        src_key = "src-dest-%s" % short_uid()197        dest_key = "key-dest-%s" % short_uid()198        # event0 = PutObject199        s3_client.put_object(Bucket=bucket_name, Key=src_key, 
Body="something")200        # event1 = CopyObject201        s3_client.copy_object(202            Bucket=bucket_name,203            CopySource={"Bucket": bucket_name, "Key": src_key},204            Key=dest_key,205        )206        # event3 = DeleteObject207        s3_client.delete_object(Bucket=bucket_name, Key=src_key)208        # collect events209        events = sqs_collect_s3_events(sqs_client, queue_url, 3)210        assert len(events) == 3, f"unexpected number of events in {events}"211        assert events[0]["eventName"] == "ObjectCreated:Put"212        assert events[0]["s3"]["bucket"]["name"] == bucket_name213        assert events[0]["s3"]["object"]["key"] == src_key214        assert events[1]["eventName"] == "ObjectCreated:Copy"215        assert events[1]["s3"]["bucket"]["name"] == bucket_name216        assert events[1]["s3"]["object"]["key"] == dest_key217        assert events[2]["eventName"] == "ObjectRemoved:Delete"218        assert events[2]["s3"]["bucket"]["name"] == bucket_name219        assert events[2]["s3"]["object"]["key"] == src_key220    @pytest.mark.aws_validated221    def test_object_created_complete_multipart_upload(222        self,223        s3_client,224        sqs_client,225        s3_create_bucket,226        sqs_create_queue,227        s3_create_sqs_bucket_notification,228        tmpdir,229    ):230        # setup fixture231        bucket_name = s3_create_bucket()232        queue_url = sqs_create_queue()233        key = "test-key"234        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])235        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#multipart-transfers236        config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)237        file = tmpdir / "test-file.bin"238        data = b"1" * (6 * KB)  # create 6 kilobytes of ones239        file.write(data=data, mode="w")240        s3_client.upload_file(241            Bucket=bucket_name, Key=key, 
Filename=str(file.realpath()), Config=config242        )243        events = sqs_collect_s3_events(sqs_client, queue_url, 1)244        assert events[0]["eventName"] == "ObjectCreated:CompleteMultipartUpload"245        assert events[0]["s3"]["bucket"]["name"] == bucket_name246        assert events[0]["s3"]["object"]["key"] == key247        assert events[0]["s3"]["object"]["size"] == file.size()248    @pytest.mark.aws_validated249    def test_key_encoding(250        self,251        s3_client,252        sqs_client,253        s3_create_bucket,254        sqs_create_queue,255        s3_create_sqs_bucket_notification,256    ):257        # test for https://github.com/localstack/localstack/issues/2741258        bucket_name = s3_create_bucket()259        queue_url = sqs_create_queue()260        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])261        key = "a@b"262        key_encoded = "a%40b"263        s3_client.put_object(Bucket=bucket_name, Key=key, Body="something")264        events = sqs_collect_s3_events(sqs_client, queue_url, min_events=1)265        assert events[0]["eventName"] == "ObjectCreated:Put"266        assert events[0]["s3"]["object"]["key"] == key_encoded267    @pytest.mark.aws_validated268    def test_object_created_put_with_presigned_url_upload(269        self,270        s3_client,271        sqs_client,272        s3_create_bucket,273        sqs_create_queue,274        s3_create_sqs_bucket_notification,275    ):276        bucket_name = s3_create_bucket()277        queue_url = sqs_create_queue()278        key = "key-by-hostname"279        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])280        url = s3_client.generate_presigned_url(281            "put_object", Params={"Bucket": bucket_name, "Key": key}282        )283        requests.put(url, data="something", verify=False)284        events = sqs_collect_s3_events(sqs_client, queue_url, 1)285        assert events[0]["eventName"] == 
"ObjectCreated:Put"286        assert events[0]["s3"]["object"]["key"] == key287    @pytest.mark.aws_validated288    def test_xray_header(289        self,290        s3_client,291        sqs_client,292        s3_create_bucket,293        sqs_create_queue,294        s3_create_sqs_bucket_notification,295        cleanups,296    ):297        # test for https://github.com/localstack/localstack/issues/3686298        # add boto hook299        def add_xray_header(request, **kwargs):300            request.headers[301                "X-Amzn-Trace-Id"302            ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"303        s3_client.meta.events.register("before-send.s3.*", add_xray_header)304        # make sure the hook gets cleaned up after the test305        cleanups.append(306            lambda: s3_client.meta.events.unregister("before-send.s3.*", add_xray_header)307        )308        key = "test-data"309        bucket_name = s3_create_bucket()310        queue_url = sqs_create_queue()311        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])312        # put an object where the bucket_name is in the path313        s3_client.put_object(Bucket=bucket_name, Key=key, Body="something")314        messages = []315        def get_messages():316            resp = sqs_client.receive_message(317                QueueUrl=queue_url,318                AttributeNames=["AWSTraceHeader"],319                MessageAttributeNames=["All"],320                VisibilityTimeout=0,321            )322            for m in resp["Messages"]:323                if "s3:TestEvent" in m["Body"]:324                    continue325                messages.append(m)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for FREE!
