How to use _get_sqs_request_headers method in localstack

Best Python code snippet using localstack_python

test_skeleton.py

Source: test_skeleton.py on GitHub

copy

Full Screen

# NOTE: the excerpt opens mid-file with the tail of a definition whose
# beginning is not shown (original lines 96-98):
#         message_group_id: String = None,
#     ) -> SendMessageResult:
#         raise NotImplementedError


""" Test implementations """


# Form-encoded request bodies shared by the tests below.
_SEND_MESSAGE_BODY = "Action=SendMessage&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue&MessageBody=%7B%22foo%22%3A+%22bared%22%7D&DelaySeconds=2"
_DELETE_QUEUE_BODY = "Action=DeleteQueue&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue"


def _get_sqs_request_headers():
    """Return the fixed HTTP headers sent with every SQS test request.

    All values are hard-coded (AWS CLI user agent, an ``AWS4-HMAC-SHA256``
    Authorization header, ``localhost:4566`` as host). A new dict is built on
    each call, so a caller may modify its copy without affecting other tests.
    """
    return {
        "Remote-Addr": "127.0.0.1",
        "Host": "localhost:4566",
        "Accept-Encoding": "identity",
        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
        "User-Agent": "aws-cli/1.20.47 Python/3.8.10 Linux/5.4.0-88-generic botocore/1.21.47",
        "X-Amz-Date": "20211009T185815Z",
        "Authorization": "AWS4-HMAC-SHA256 Credential=test/20211009/us-east-1/sqs/aws4_request, SignedHeaders=content-type;host;x-amz-date, Signature=d9f93b13a07dda8cba650fba583fab92e0c72465e5e02fb56a3bb4994aefc339",
        "Content-Length": "169",
        "x-localstack-request-url": "http://localhost:4566/",
        "X-Forwarded-For": "127.0.0.1, localhost:4566",
    }


def _build_sqs_context(sqs_service, body):
    """Build the ``RequestContext`` for a ``POST /`` carrying *body*.

    :param sqs_service: the service model returned by ``load_service("sqs")``
    :param body: the form-encoded request body
    :return: a context with account, region, service and request populated
    """
    context = RequestContext()
    context.account = "test"
    context.region = "us-west-1"
    context.service = sqs_service
    context.request = {
        "method": "POST",
        "path": "/",
        "body": body,
        "headers": _get_sqs_request_headers(),
    }
    return context


def _parse_sqs_response(sqs_service, result, operation):
    """Parse a serialized skeleton response with the parser from botocore.

    :param sqs_service: the service model the request was dispatched against
    :param result: the value returned by ``Skeleton.invoke``
    :param operation: name of the operation whose output shape is used
    :return: the parsed response
    """
    response_parser = create_parser(sqs_service.protocol)
    # NOTE(review): for operations without a modeled output (e.g. DeleteQueue)
    # output_shape is expected to be None - confirm the parser accepts that.
    output_shape = sqs_service.operation_model(operation).output_shape
    return response_parser.parse(result, output_shape)


def test_skeleton_e2e_sqs_send_message():
    sqs_service = load_service("sqs")
    skeleton = Skeleton(sqs_service, TestSqsApi())
    context = _build_sqs_context(sqs_service, _SEND_MESSAGE_BODY)

    result = skeleton.invoke(context)
    parsed_response = _parse_sqs_response(sqs_service, result, "SendMessage")

    # Test the ResponseMetadata and delete it afterwards
    assert "ResponseMetadata" in parsed_response
    assert "RequestId" in parsed_response["ResponseMetadata"]
    assert len(parsed_response["ResponseMetadata"]["RequestId"]) == 52
    assert "HTTPStatusCode" in parsed_response["ResponseMetadata"]
    assert parsed_response["ResponseMetadata"]["HTTPStatusCode"] == 200
    del parsed_response["ResponseMetadata"]

    # Compare the (remaining) actual payload
    assert parsed_response == {
        "MD5OfMessageBody": "String",
        "MD5OfMessageAttributes": "String",
        "MD5OfMessageSystemAttributes": "String",
        "MessageId": "String",
        "SequenceNumber": "String",
    }


def test_skeleton_e2e_sqs_send_message_not_implemented():
    sqs_service = load_service("sqs")
    skeleton = Skeleton(sqs_service, TestSqsApiNotImplemented())
    context = _build_sqs_context(sqs_service, _SEND_MESSAGE_BODY)

    result = skeleton.invoke(context)
    parsed_response = _parse_sqs_response(sqs_service, result, "SendMessage")

    # Test the ResponseMetadata
    assert "ResponseMetadata" in parsed_response
    assert "RequestId" in parsed_response["ResponseMetadata"]
    assert len(parsed_response["ResponseMetadata"]["RequestId"]) == 52
    assert "HTTPStatusCode" in parsed_response["ResponseMetadata"]
    assert parsed_response["ResponseMetadata"]["HTTPStatusCode"] == 501

    # Compare the (remaining) actual error payload
    assert "Error" in parsed_response
    assert parsed_response["Error"] == {
        "Code": "InternalFailure",
        "Message": "API action 'SendMessage' for service 'sqs' not yet implemented",
    }


def test_dispatch_common_service_exception():
    def delete_queue(_context: RequestContext, _request: ServiceRequest):
        raise CommonServiceException("NonExistentQueue", "No such queue")

    table: DispatchTable = {"DeleteQueue": delete_queue}

    sqs_service = load_service("sqs")
    skeleton = Skeleton(sqs_service, table)
    context = _build_sqs_context(sqs_service, _DELETE_QUEUE_BODY)

    result = skeleton.invoke(context)
    # Parse with the shape of the operation that was actually invoked; the
    # request is a DeleteQueue call, not a SendMessage call.
    parsed_response = _parse_sqs_response(sqs_service, result, "DeleteQueue")

    assert "Error" in parsed_response
    assert parsed_response["Error"] == {
        "Code": "NonExistentQueue",
        "Message": "No such queue",
    }


def test_dispatch_missing_method_returns_internal_failure():
    table: DispatchTable = {}

    sqs_service = load_service("sqs")
    skeleton = Skeleton(sqs_service, table)
    context = _build_sqs_context(sqs_service, _DELETE_QUEUE_BODY)

    result = skeleton.invoke(context)
    # Parse with the shape of the operation that was actually invoked; the
    # request is a DeleteQueue call, not a SendMessage call.
    parsed_response = _parse_sqs_response(sqs_service, result, "DeleteQueue")

    assert "Error" in parsed_response
    assert parsed_response["Error"] == {
        "Code": "InternalFailure",
        "Message": "API action 'DeleteQueue' for service 'sqs' not yet implemented",
    }


class TestServiceRequestDispatcher:
    def test_default_dispatcher(self):
        ...  # the excerpt is cut off here; the method body is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful