Best Python code snippet using localstack_python
test_s3_notifications_sqs.py
Source:test_s3_notifications_sqs.py  
# NOTE: this excerpt starts in the middle of a definition. The fragment below is the visible
# tail of the preceding factory helper; it is kept verbatim (as a comment) because its header
# is not part of the excerpt.
#
#     """
#     def factory(bucket_name: str, queue_url: str, events: List["EventType"]):
#         return create_sqs_bucket_notification(s3_client, sqs_client, bucket_name, queue_url, events)
#     return factory


def sqs_collect_s3_events(
    sqs_client: "SQSClient", queue_url: str, min_events: int, timeout: int = 10
) -> List[Dict]:
    """
    Polls the given queue for the given amount of time and extracts and flattens from the received messages all
    events (messages that have a "Records" field in their body, and where the records can be json-deserialized).

    Messages whose body contains ``s3:TestEvent`` are skipped. Received messages are not deleted from the queue.

    :param sqs_client: the boto3 client to use
    :param queue_url: the queue URL to listen from
    :param min_events: the minimum number of events to receive to wait for
    :param timeout: the number of seconds to wait before raising an assert error; the same value is also
        passed as ``WaitTimeSeconds`` to every ``receive_message`` call
    :return: a list with the deserialized records from the SQS messages
    :raises AssertionError: if a message body has no "Records", or ``min_events`` was not reached in time
    """
    events = []

    def collect_events() -> int:
        # each poll fetches at most one message and appends its records to the shared ``events`` list
        _response = sqs_client.receive_message(
            QueueUrl=queue_url, WaitTimeSeconds=timeout, MaxNumberOfMessages=1
        )
        messages = _response.get("Messages", [])
        if not messages:
            LOG.info("no messages received from %s after %d seconds", queue_url, timeout)

        for m in messages:
            body = m["Body"]
            # see https://www.mikulskibartosz.name/what-is-s3-test-event/
            if "s3:TestEvent" in body:
                continue

            assert "Records" in body, "Unexpected event received"

            doc = json.loads(body)
            events.extend(doc["Records"])

        return len(events)

    assert poll_condition(lambda: collect_events() >= min_events, timeout=timeout)

    return events


class TestS3NotificationsToSQS:
    """Tests that S3 bucket notifications configured with an SQS target arrive in the queue."""

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag"])
    def test_object_created_put(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """Two PutObject calls on a versioned bucket yield two ObjectCreated:Put events."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())

        # setup fixture
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:Put"])

        s3_client.put_bucket_versioning(
            Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
        )

        obj0 = s3_client.put_object(Bucket=bucket_name, Key="my_key_0", Body="something")
        obj1 = s3_client.put_object(Bucket=bucket_name, Key="my_key_1", Body="something else")

        # collect s3 events from SQS queue
        events = sqs_collect_s3_events(sqs_client, queue_url, min_events=2)

        assert len(events) == 2, f"unexpected number of events in {events}"
        # order seems not be guaranteed - sort so we can rely on the order
        # (the two bodies have different lengths, so size is a stable sort key)
        events.sort(key=lambda x: x["s3"]["object"]["size"])
        snapshot.match("receive_messages", {"messages": events})

        # assert
        assert events[0]["eventSource"] == "aws:s3"
        assert events[0]["eventName"] == "ObjectCreated:Put"
        assert events[0]["s3"]["bucket"]["name"] == bucket_name
        assert events[0]["s3"]["object"]["key"] == "my_key_0"
        assert events[0]["s3"]["object"]["size"] == 9
        assert events[0]["s3"]["object"]["versionId"]
        assert obj0["VersionId"] == events[0]["s3"]["object"]["versionId"]

        assert events[1]["eventSource"] == "aws:s3"
        # fixed: this assertion previously re-checked events[0] instead of events[1]
        assert events[1]["eventName"] == "ObjectCreated:Put"
        assert events[1]["s3"]["bucket"]["name"] == bucket_name
        assert events[1]["s3"]["object"]["key"] == "my_key_1"
        assert events[1]["s3"]["object"]["size"] == 14
        assert events[1]["s3"]["object"]["versionId"]
        assert obj1["VersionId"] == events[1]["s3"]["object"]["versionId"]

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])
    def test_object_created_copy(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """With only s3:ObjectCreated:Copy configured, PutObject is silent and CopyObject emits one event."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())
        snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))

        # setup fixture
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:Copy"])

        src_key = f"src-dest-{short_uid()}"
        dest_key = f"key-dest-{short_uid()}"

        s3_client.put_object(Bucket=bucket_name, Key=src_key, Body="something")

        # min_events=0 makes the helper poll once and return whatever (if anything) arrived
        assert not sqs_collect_s3_events(
            sqs_client, queue_url, 0, timeout=1
        ), "unexpected event triggered for put_object"

        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource={"Bucket": bucket_name, "Key": src_key},
            Key=dest_key,
        )

        events = sqs_collect_s3_events(sqs_client, queue_url, 1)
        assert len(events) == 1, f"unexpected number of events in {events}"
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventSource"] == "aws:s3"
        assert events[0]["eventName"] == "ObjectCreated:Copy"
        assert events[0]["s3"]["bucket"]["name"] == bucket_name
        assert events[0]["s3"]["object"]["key"] == dest_key

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(
        paths=["$..s3.object.eTag", "$..s3.object.versionId", "$..s3.object.size"]
    )
    def test_object_created_and_object_removed(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """Put, copy and delete on one bucket produce one created/created/removed event each."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())
        snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))

        # setup fixture
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        s3_create_sqs_bucket_notification(
            bucket_name, queue_url, ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
        )

        src_key = f"src-dest-{short_uid()}"
        dest_key = f"key-dest-{short_uid()}"

        # event0 = PutObject
        s3_client.put_object(Bucket=bucket_name, Key=src_key, Body="something")
        # event1 = CopyObject
        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource={"Bucket": bucket_name, "Key": src_key},
            Key=dest_key,
        )
        # event2 = DeleteObject
        s3_client.delete_object(Bucket=bucket_name, Key=src_key)

        # collect events
        events = sqs_collect_s3_events(sqs_client, queue_url, 3)
        assert len(events) == 3, f"unexpected number of events in {events}"

        # order seems not be guaranteed - sort so we can rely on the order
        # (alphabetical by event name: ObjectCreated:Copy, ObjectCreated:Put, ObjectRemoved:Delete)
        events.sort(key=lambda x: x["eventName"])
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventName"] == "ObjectCreated:Copy"
        assert events[0]["s3"]["bucket"]["name"] == bucket_name
        assert events[0]["s3"]["object"]["key"] == dest_key

        assert events[1]["eventName"] == "ObjectCreated:Put"
        assert events[1]["s3"]["bucket"]["name"] == bucket_name
        assert events[1]["s3"]["object"]["key"] == src_key

        assert events[2]["eventName"] == "ObjectRemoved:Delete"
        assert events[2]["s3"]["bucket"]["name"] == bucket_name
        assert events[2]["s3"]["object"]["key"] == src_key

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])
    def test_object_created_complete_multipart_upload(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        tmpdir,
        snapshot,
    ):
        """Uploading a file above the multipart threshold emits ObjectCreated:CompleteMultipartUpload."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())

        # setup fixture
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        key = "test-key"

        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])

        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#multipart-transfers
        config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)

        file = tmpdir / "test-file.bin"
        data = b"1" * (6 * KB)  # create 6 kilobytes of ones
        file.write(data=data, mode="w")
        s3_client.upload_file(
            Bucket=bucket_name, Key=key, Filename=str(file.realpath()), Config=config
        )

        events = sqs_collect_s3_events(sqs_client, queue_url, 1)
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventName"] == "ObjectCreated:CompleteMultipartUpload"
        assert events[0]["s3"]["bucket"]["name"] == bucket_name
        assert events[0]["s3"]["object"]["key"] == key
        assert events[0]["s3"]["object"]["size"] == file.size()

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])
    def test_key_encoding(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """The object key in the event is URL-encoded ("a@b" is reported as "a%40b")."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())

        # test for https://github.com/localstack/localstack/issues/2741
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])

        key = "a@b"
        key_encoded = "a%40b"
        s3_client.put_object(Bucket=bucket_name, Key=key, Body="something")

        events = sqs_collect_s3_events(sqs_client, queue_url, min_events=1)
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventName"] == "ObjectCreated:Put"
        assert events[0]["s3"]["object"]["key"] == key_encoded

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])
    def test_object_created_put_with_presigned_url_upload(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """An upload through a presigned put_object URL emits an ObjectCreated:Put event."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())

        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        key = "key-by-hostname"

        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])

        url = s3_client.generate_presigned_url(
            "put_object", Params={"Bucket": bucket_name, "Key": key}
        )
        requests.put(url, data="something", verify=False)

        events = sqs_collect_s3_events(sqs_client, queue_url, 1)
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventName"] == "ObjectCreated:Put"
        assert events[0]["s3"]["object"]["key"] == key

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(
        paths=[
            "$..s3.object.eTag",
            "$..s3.object.versionId",
            "$..s3.object.size",
            "$..s3.object.sequencer",
            "$..eventVersion",
        ]
    )
    def test_object_tagging_put_event(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """With only s3:ObjectTagging:Put configured, PutObject is silent and PutObjectTagging emits one event."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())
        snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))

        # setup fixture
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectTagging:Put"])

        dest_key = f"key-dest-{short_uid()}"

        s3_client.put_object(Bucket=bucket_name, Key=dest_key, Body="FooBarBlitz")

        # min_events=0 makes the helper poll once and return whatever (if anything) arrived
        assert not sqs_collect_s3_events(
            sqs_client, queue_url, 0, timeout=1
        ), "unexpected event triggered for put_object"

        s3_client.put_object_tagging(
            Bucket=bucket_name,
            Key=dest_key,
            Tagging={
                "TagSet": [
                    {"Key": "swallow_type", "Value": "african"},
                ]
            },
        )

        events = sqs_collect_s3_events(sqs_client, queue_url, 1)
        assert len(events) == 1, f"unexpected number of events in {events}"
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventSource"] == "aws:s3"
        assert events[0]["eventName"] == "ObjectTagging:Put"
        assert events[0]["s3"]["bucket"]["name"] == bucket_name
        assert events[0]["s3"]["object"]["key"] == dest_key

    @pytest.mark.aws_validated
    @pytest.mark.skip_snapshot_verify(
        paths=[
            "$..s3.object.eTag",
            "$..s3.object.versionId",
            "$..s3.object.size",
            "$..s3.object.sequencer",
            "$..eventVersion",
        ]
    )
    def test_object_tagging_delete_event(
        self,
        s3_client,
        sqs_client,
        s3_create_bucket,
        sqs_create_queue,
        s3_create_sqs_bucket_notification,
        snapshot,
    ):
        """With only s3:ObjectTagging:Delete configured, only DeleteObjectTagging emits an event."""
        snapshot.add_transformer(snapshot.transform.sqs_api())
        snapshot.add_transformer(snapshot.transform.s3_api())
        snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))

        # setup fixture
        bucket_name = s3_create_bucket()
        queue_url = sqs_create_queue()
        s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectTagging:Delete"])

        dest_key = f"key-dest-{short_uid()}"

        s3_client.put_object(Bucket=bucket_name, Key=dest_key, Body="FooBarBlitz")

        # min_events=0 makes the helper poll once and return whatever (if anything) arrived
        assert not sqs_collect_s3_events(
            sqs_client, queue_url, 0, timeout=1
        ), "unexpected event triggered for put_object"

        s3_client.put_object_tagging(
            Bucket=bucket_name,
            Key=dest_key,
            Tagging={
                "TagSet": [
                    {"Key": "swallow_type", "Value": "african"},
                ]
            },
        )

        s3_client.delete_object_tagging(
            Bucket=bucket_name,
            Key=dest_key,
        )

        events = sqs_collect_s3_events(sqs_client, queue_url, 1)
        assert len(events) == 1, f"unexpected number of events in {events}"
        snapshot.match("receive_messages", {"messages": events})

        assert events[0]["eventSource"] == "aws:s3"
        assert events[0]["eventName"] == "ObjectTagging:Delete"
        assert events[0]["s3"]["bucket"]["name"] == bucket_name
        assert events[0]["s3"]["object"]["key"] == dest_key

    # NOTE: the excerpt is truncated here. Only the beginning of the next test is visible; it is
    # kept verbatim (as a comment) because its body is not part of the excerpt.
    #
    # @pytest.mark.aws_validated
    # @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])
    # def test_xray_header(
    #     self,
    #     s3_client,
    #     sqs_client,
    #     s3_create_bucket,
    #     sqs_create_queue,
    #     ...


# --- Page text that followed the snippet on the source web page (not part of the test module) ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the
# prerequisites to run your first automation test, to following best practices and diving deeper into
# advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
