Best Python code snippet using localstack_python
test_skeleton.py
Source:test_skeleton.py  
...96        message_group_id: String = None,97    ) -> SendMessageResult:98        raise NotImplementedError99""" Test implementations """100def _get_sqs_request_headers():101    return {102        "Remote-Addr": "127.0.0.1",103        "Host": "localhost:4566",104        "Accept-Encoding": "identity",105        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",106        "User-Agent": "aws-cli/1.20.47 Python/3.8.10 Linux/5.4.0-88-generic botocore/1.21.47",107        "X-Amz-Date": "20211009T185815Z",108        "Authorization": "AWS4-HMAC-SHA256 Credential=test/20211009/us-east-1/sqs/aws4_request, SignedHeaders=content-type;host;x-amz-date, Signature=d9f93b13a07dda8cba650fba583fab92e0c72465e5e02fb56a3bb4994aefc339",109        "Content-Length": "169",110        "x-localstack-request-url": "http://localhost:4566/",111        "X-Forwarded-For": "127.0.0.1, localhost:4566",112    }113def test_skeleton_e2e_sqs_send_message():114    sqs_service = load_service("sqs")115    skeleton = Skeleton(sqs_service, TestSqsApi())116    context = RequestContext()117    context.account = "test"118    context.region = "us-west-1"119    context.service = sqs_service120    context.request = {121        "method": "POST",122        "path": "/",123        "body": "Action=SendMessage&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue&MessageBody=%7B%22foo%22%3A+%22bared%22%7D&DelaySeconds=2",124        "headers": _get_sqs_request_headers(),125    }126    result = skeleton.invoke(context)127    # Use the parser from botocore to parse the serialized response128    response_parser = create_parser(sqs_service.protocol)129    parsed_response = response_parser.parse(130        result, sqs_service.operation_model("SendMessage").output_shape131    )132    # Test the ResponseMetadata and delete it afterwards133    assert "ResponseMetadata" in parsed_response134    assert "RequestId" in parsed_response["ResponseMetadata"]135    assert len(parsed_response["ResponseMetadata"]["RequestId"]) == 52136    assert "HTTPStatusCode" in parsed_response["ResponseMetadata"]137    assert parsed_response["ResponseMetadata"]["HTTPStatusCode"] == 200138    del parsed_response["ResponseMetadata"]139    # Compare the (remaining) actual payload140    assert parsed_response == {141        "MD5OfMessageBody": "String",142        "MD5OfMessageAttributes": "String",143        "MD5OfMessageSystemAttributes": "String",144        "MessageId": "String",145        "SequenceNumber": "String",146    }147def test_skeleton_e2e_sqs_send_message_not_implemented():148    sqs_service = load_service("sqs")149    skeleton = Skeleton(sqs_service, TestSqsApiNotImplemented())150    context = RequestContext()151    context.account = "test"152    context.region = "us-west-1"153    context.service = sqs_service154    context.request = {155        "method": "POST",156        "path": "/",157        "body": "Action=SendMessage&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue&MessageBody=%7B%22foo%22%3A+%22bared%22%7D&DelaySeconds=2",158        "headers": _get_sqs_request_headers(),159    }160    result = skeleton.invoke(context)161    # Use the parser from botocore to parse the serialized response162    response_parser = create_parser(sqs_service.protocol)163    parsed_response = response_parser.parse(164        result, sqs_service.operation_model("SendMessage").output_shape165    )166    # Test the ResponseMetadata167    assert "ResponseMetadata" in parsed_response168    assert "RequestId" in parsed_response["ResponseMetadata"]169    assert len(parsed_response["ResponseMetadata"]["RequestId"]) == 52170    assert "HTTPStatusCode" in parsed_response["ResponseMetadata"]171    assert parsed_response["ResponseMetadata"]["HTTPStatusCode"] == 501172    # Compare the (remaining) actual eror payload173    assert "Error" in parsed_response174    assert parsed_response["Error"] == {175        "Code": "InternalFailure",176        "Message": "API action 'SendMessage' for service 'sqs' not yet implemented",177    }178def test_dispatch_common_service_exception():179    def delete_queue(_context: RequestContext, _request: ServiceRequest):180        raise CommonServiceException("NonExistentQueue", "No such queue")181    table: DispatchTable = dict()182    table["DeleteQueue"] = delete_queue183    sqs_service = load_service("sqs")184    skeleton = Skeleton(sqs_service, table)185    context = RequestContext()186    context.account = "test"187    context.region = "us-west-1"188    context.service = sqs_service189    context.request = {190        "method": "POST",191        "path": "/",192        "body": "Action=DeleteQueue&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue",193        "headers": _get_sqs_request_headers(),194    }195    result = skeleton.invoke(context)196    # Use the parser from botocore to parse the serialized response197    response_parser = create_parser(sqs_service.protocol)198    parsed_response = response_parser.parse(199        result, sqs_service.operation_model("SendMessage").output_shape200    )201    assert "Error" in parsed_response202    assert parsed_response["Error"] == {203        "Code": "NonExistentQueue",204        "Message": "No such queue",205    }206def test_dispatch_missing_method_returns_internal_failure():207    table: DispatchTable = dict()208    sqs_service = load_service("sqs")209    skeleton = Skeleton(sqs_service, table)210    context = RequestContext()211    context.account = "test"212    context.region = "us-west-1"213    context.service = sqs_service214    context.request = {215        "method": "POST",216        "path": "/",217        "body": "Action=DeleteQueue&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue",218        "headers": _get_sqs_request_headers(),219    }220    result = skeleton.invoke(context)221    # Use the parser from botocore to parse the serialized response222    response_parser = create_parser(sqs_service.protocol)223    parsed_response = response_parser.parse(224        result, sqs_service.operation_model("SendMessage").output_shape225    )226    assert "Error" in parsed_response227    assert parsed_response["Error"] == {228        "Code": "InternalFailure",229        "Message": "API action 'DeleteQueue' for service 'sqs' not yet implemented",230    }231class TestServiceRequestDispatcher:232    def test_default_dispatcher(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
