Best Python code snippet using localstack_python
common_test_functions.py
Source:common_test_functions.py  
import random
import unittest
from unittest.mock import patch

import requests
from parameterized import parameterized

import common_functions

# Minimal event passed to the handler under test. The tests below assert that
# the 'outputRoute' / 'outputToken' values are forwarded as the RequestRoute /
# RequestToken keyword arguments of write_get_object_response.
EVENT = {
    'getObjectContext': {
        'outputRoute': 'route',
        'outputToken': 'token',
        'inputS3Url': 'http://s3.amazonaws.com/'
    }
}


class BaseTest(unittest.TestCase):
    """Shared test cases for a decompression handler.

    The class itself is skipped (see ``setUpClass``); it is meant to be
    subclassed.  Both hook methods below return ``None`` here, so a subclass
    presumably overrides them with a concrete compression format and handler
    -- confirm against the subclasses, which are not part of this listing.

    In every test ``requests.get`` and ``common_functions._get_s3_client`` are
    patched.  The assertions on the keyword arguments run inside the
    ``side_effect`` of the mocked ``write_get_object_response``.
    """

    def get_compression(self, content):
        """Hook: return the object that the tests install as ``response.raw``.

        The tests call this with the plain payload bytes and later expect
        ``Body.read(1024)`` to yield those same bytes.
        """
        pass

    def get_decompression_handler(self, event, context):
        """Hook: invoke the handler under test with ``event`` and ``context``.

        The tests expect the call to return ``None``.
        """
        pass

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when it is run directly rather than subclassed."""
        if cls is BaseTest:
            raise unittest.SkipTest("Skip BaseTest tests, it's a base class")
        super().setUpClass()

    @parameterized.expand([
        [500], [401], [400], [411], [304], [404], [409],
        [403], [206], [405], [416], [412], [503], [599],
    ])
    @patch("common_functions._get_s3_client")
    @patch("requests.get")
    def test_error_from_s3(self, status_code, mock_get, mock_s3):
        """A non-200 status from the mocked GET is passed through as StatusCode."""
        mock_get.return_value.status_code = status_code

        def assert_get_decompression_handler(**kwargs):
            self.assertEqual(kwargs.get("RequestRoute"), "route")
            self.assertEqual(kwargs.get("RequestToken"), "token")
            self.assertEqual(kwargs.get("StatusCode"), status_code)

        write_response = mock_s3.return_value.write_get_object_response
        write_response.side_effect = assert_get_decompression_handler

        result = self.get_decompression_handler(EVENT, None)

        self.assertIsNone(result)
        write_response.assert_called_once()

    @patch("common_functions._get_s3_client")
    @patch("requests.get")
    def test_with_dummy_text(self, mock_get, mock_s3):
        """A 200 response without headers yields the payload and empty Metadata."""
        dummy_string = "This is some dummy text"
        dummy_content = dummy_string.encode("utf-8")
        data = self.get_compression(dummy_content)
        mock_get.return_value.status_code = 200
        mock_get.return_value.headers = {}
        mock_get.return_value.raw = data

        def assert_get_decompression_handler(**kwargs):
            self.assertEqual(kwargs.get("Body").read(1024), dummy_content)
            self.assertEqual(kwargs.get("RequestRoute"), "route")
            self.assertEqual(kwargs.get("RequestToken"), "token")
            self.assertEqual(kwargs.get("Metadata"), {})

        write_response = mock_s3.return_value.write_get_object_response
        write_response.side_effect = assert_get_decompression_handler

        result = self.get_decompression_handler(EVENT, None)

        self.assertIsNone(result)
        write_response.assert_called_once()

    # Each row: (expected keyword argument, expected value,
    #            response header name, response header value).
    @parameterized.expand([
        ['ErrorMessage', 'some_value', 'x-amz-fwd-error-message', 'some_value'],
        ['ContentLanguage', 'some_value', 'Content-Language', 'some_value'],
        ['MissingMeta', 'some_value', 'x-amz-missing-meta', 'some_value'],
        ['ObjectLockLegalHoldStatus', 'some_value', 'x-amz-object-lock-legal-hold', 'some_value'],
        ['ReplicationStatus', 'some_value', 'x-amz-replication-status', 'some_value'],
        ['RequestCharged', 'some_value', 'x-fwd-header-x-amz-request-charged', 'some_value'],
        ['Restore', 'some_value', 'x-amz-restore', 'some_value'],
        ['ServerSideEncryption', 'some_value', 'x-amz-server-side-encryption', 'some_value'],
        ['SSECustomerAlgorithm', 'some_value', 'x-amz-server-side-encryption-customer-algorithm', 'some_value'],
        ['SSEKMSKeyId', 'some_value', 'x-amz-server-side-encryption-aws-kms-key-id', 'some_value'],
        ['SSECustomerKeyMD5', 'some_value', 'x-amz-server-side-encryption-customer-key-MD5', 'some_value'],
        ['StorageClass', 'some_value', 'x-amz-storage-class', 'some_value'],
        ['TagCount', 5, 'x-amz-tagging-count', '5'],
        ['VersionId', 'some_value', 'x-amz-version-id', 'some_value'],
        ['Metadata', {'test-a': 'some_value'}, 'x-amz-meta-test-a', 'some_value']
    ])
    @patch("common_functions._get_s3_client")
    @patch("requests.get")
    def test_return_with_headers(self, param_name, param_value, header, header_value, mock_get, mock_s3):
        """A single response header is mapped to the expected keyword argument."""
        dummy_string = "This is some dummy text"
        dummy_content = dummy_string.encode("utf-8")
        data = self.get_compression(dummy_content)
        mock_get.return_value.status_code = 200
        mock_get.return_value.headers = {header: header_value}
        mock_get.return_value.raw = data

        def assert_get_decompression_handler(**kwargs):
            self.assertEqual(kwargs.get("Body").read(1024), dummy_content)
            self.assertEqual(kwargs.get("RequestRoute"), "route")
            self.assertEqual(kwargs.get("RequestToken"), "token")
            self.assertEqual(kwargs.get(param_name), param_value)

        write_response = mock_s3.return_value.write_get_object_response
        write_response.side_effect = assert_get_decompression_handler

        result = self.get_decompression_handler(EVENT, None)

        self.assertIsNone(result)
        write_response.assert_called_once()

    @patch("common_functions._get_s3_client")
    @patch("requests.get")
    def test_error_retrieving_data(self, mock_get, mock_s3):
        """A RequestException raised by the GET is reported with StatusCode 500."""
        mock_get.side_effect = requests.exceptions.RequestException("Some error")

        def assert_get_decompression_handler(**kwargs):
            self.assertEqual(kwargs.get("StatusCode"), 500)
            self.assertEqual(kwargs.get("RequestRoute"), "route")
            self.assertEqual(kwargs.get("RequestToken"), "token")

        write_response = mock_s3.return_value.write_get_object_response
        write_response.side_effect = assert_get_decompression_handler

        result = self.get_decompression_handler(EVENT, None)

        self.assertIsNone(result)
        # NOTE(review): the listing is truncated ("...") at this point.  The
        # sibling tests finish with write_response.assert_called_once();
        # confirm against the full file whether this test does too.

# ... (remainder of common_test_functions.py is not shown in this listing)
# Next snippet in this listing: index.py
Source:index.py  
...38        response = client.get_object(39            Bucket=BUCKET_NAME,40            Key=key,41        )42        client.write_get_object_response(43            Body=response['Body'].read().decode('utf8'),44            RequestRoute=request_route,45            RequestToken=request_token)46        47        return {'status_code': 200}48    # arg is like49    # 202001: tagged version50    # latest: latest tagged version51    # earliest: oldest tagged version52    first = True53    response = {}54    55    kargs = {56        'Bucket': BUCKET_NAME,57        'Prefix': key,58        'MaxKeys': 1000,59    }60    61    versions = []62    tags = {}63    64    while True:65        if first:66            first = False67        elif response['IsTruncated']:68            kargs.update({69                'KeyMarker': response['NextKeyMarker'],70                'VersionIdMarker': response['NextVersionIdMarker'],71            })72        else:73            break74        75        response = client.list_object_versions(**kargs)76        77        if 'Versions' in response:78            for version in response['Versions']:79                versions.append({80                    'VersionId': version['VersionId'],81                    'LastModified': version['LastModified'],82                    'IsLatest': version['IsLatest'],83                })84    85    for version in versions:86        response = client.get_object_tagging(87            Bucket=BUCKET_NAME,88            Key=key,89            VersionId=version['VersionId'],90        )91        92        print(response)93    94        for tag in response['TagSet']:95            if tag['Key'] == TAG_NAME:96                if not tag['Value'] in tags:97                    tags[tag['Value']] = []98                tags[tag['Value']].append({99                    'VersionId': version['VersionId'],100                    'LastModified': version['LastModified']101                })102    103    item = None104    105    sorted_versions = 
sorted(tags.items())106    107    if arg == 'log':108        client.write_get_object_response(109            Body=json.dumps(sorted_versions, default=json_default_encode),110            RequestRoute=request_route,111            RequestToken=request_token112        )113        return {'status_code': 200}114    # if same tag is found, selet latest.115    if arg == 'latest':116        item = sorted(sorted_versions[-1][1], key=lambda x:x['LastModified'])[-1]117    elif arg == 'earliest':118        item = sorted(sorted_versions[0][1], key=lambda x:x['LastModified'])[-1]119    elif arg in tags:120        item = sorted(tags[arg], key=lambda x:x['LastModified'])[-1]121    122    if not item:123        client.write_get_object_response(124            Body='no data',125            RequestRoute=request_route,126            RequestToken=request_token127        )128        return {'status_code': 200}129    130    response = client.get_object(131        Bucket=BUCKET_NAME,132        Key=key,133        VersionId=item['VersionId'],134    )135    136    client.write_get_object_response(137        Body=response['Body'].read().decode('utf8'),138        RequestRoute=request_route,139        RequestToken=request_token140    )...lambda.py
Source:lambda.py  
...24        print(f'vir_name: {vir_name}')25        s3 = boto3.client('s3') 26        if infected:27            print('file was infected')28            s3.write_get_object_response(29                Body='',30                RequestRoute=request_route,31                RequestToken=request_token,32                StatusCode=403,33                ErrorCode='Forbidden',34                ErrorMessage='Forbidden')35            36        else:37            print('file was not infected')38            # Write object back to S3 Object Lambda39            s3 = boto3.client('s3')40            s3.write_get_object_response(41                Body=scanfile,42                RequestRoute=request_route,43                RequestToken=request_token)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
