Best Python code snippet using localstack_python
test_dynamodb.py
Source:test_dynamodb.py  
# ... (earlier lines of test_dynamodb.py, including imports and the other
# TEST_DDB_TABLE_NAME* constants, are elided in this excerpt)
TEST_DDB_TABLE_NAME_4 = 'test-ddb-table-4'
PARTITION_KEY = 'id'


class DynamoDBIntegrationTest(unittest.TestCase):
    """Integration tests run against the DynamoDB API via aws_stack/testutil."""

    def _assert_ttl_status(self, table_name, expected_status):
        """Describe TTL for ``table_name`` and assert the reported status.

        :param table_name: name of the table to describe
        :param expected_status: expected 'TimeToLiveStatus' value
        """
        response = testutil.send_describe_dynamodb_ttl_request(table_name)
        assert response.status_code == 200
        description = json.loads(response._content)['TimeToLiveDescription']
        assert description['TimeToLiveStatus'] == expected_status

    def _set_ttl(self, table_name, enabled):
        """Send a TTL update for ``table_name`` and assert it is echoed back.

        :param table_name: name of the table to update
        :param enabled: boolean passed to the update request; the response's
            'Enabled' flag must be this exact object (True or False)
        """
        response = testutil.send_update_dynamodb_ttl_request(table_name, enabled)
        assert response.status_code == 200
        specification = json.loads(response._content)['TimeToLiveSpecification']
        assert specification['Enabled'] is enabled

    def test_non_ascii_chars(self):
        """Items containing non-ASCII characters survive a put/get round trip."""
        dynamodb = aws_stack.connect_to_resource('dynamodb')
        testutil.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)

        # write some items containing non-ASCII characters
        # NOTE(review): the 'â' below looks like a mis-decoded character in
        # this excerpt -- confirm against the upstream file.
        items = {
            'id1': {PARTITION_KEY: 'id1', 'data': 'foobar123 â'},
            'id2': {PARTITION_KEY: 'id2', 'data': 'foobar123 £'},
            'id3': {PARTITION_KEY: 'id3', 'data': 'foobar123 ¢'}
        }
        for item in items.values():
            table.put_item(Item=item)

        for item_id, expected in items.items():
            item = table.get_item(Key={PARTITION_KEY: item_id})['Item']
            # need to fix up the JSON and convert str to unicode for Python 2
            item1 = json_safe(item)
            item2 = json_safe(expected)
            assert item1 == item2

    def test_large_data_download(self):
        """A scan returns every item of a table holding large attribute values."""
        dynamodb = aws_stack.connect_to_resource('dynamodb')
        dynamodb_client = aws_stack.connect_to_service('dynamodb')
        testutil.create_dynamodb_table(TEST_DDB_TABLE_NAME_2, partition_key=PARTITION_KEY)
        try:
            table = dynamodb.Table(TEST_DDB_TABLE_NAME_2)

            # Create a large amount of items
            num_items = 20
            for i in range(num_items):
                item = {PARTITION_KEY: 'id%s' % i, 'data1': 'foobar123 ' * 1000}
                table.put_item(Item=item)

            # Retrieve the items. The data will be transmitted to the client
            # with chunked transfer encoding
            result = table.scan(TableName=TEST_DDB_TABLE_NAME_2)
            assert len(result['Items']) == num_items
        finally:
            # Clean up even when an assertion above fails, so the table does
            # not leak into later tests.
            dynamodb_client.delete_table(TableName=TEST_DDB_TABLE_NAME_2)

    def test_time_to_live(self):
        """TTL is DISABLED initially and follows enable/disable/enable updates."""
        dynamodb = aws_stack.connect_to_resource('dynamodb')
        dynamodb_client = aws_stack.connect_to_service('dynamodb')
        testutil.create_dynamodb_table(TEST_DDB_TABLE_NAME_3, partition_key=PARTITION_KEY)
        try:
            table = dynamodb.Table(TEST_DDB_TABLE_NAME_3)

            # Insert some items to the table
            items = {
                'id1': {PARTITION_KEY: 'id1', 'data': 'IT IS'},
                'id2': {PARTITION_KEY: 'id2', 'data': 'TIME'},
                'id3': {PARTITION_KEY: 'id3', 'data': 'TO LIVE!'}
            }
            for item in items.values():
                table.put_item(Item=item)

            # Describe TTL when still unset.
            self._assert_ttl_status(TEST_DDB_TABLE_NAME_3, 'DISABLED')

            # Enable, disable, then enable again; describe after each update.
            transitions = (
                (True, 'ENABLED'),
                (False, 'DISABLED'),
                (True, 'ENABLED'),
            )
            for enabled, expected_status in transitions:
                self._set_ttl(TEST_DDB_TABLE_NAME_3, enabled)
                self._assert_ttl_status(TEST_DDB_TABLE_NAME_3, expected_status)
        finally:
            # Clean up table even when an assertion above fails.
            dynamodb_client.delete_table(TableName=TEST_DDB_TABLE_NAME_3)

    def test_tag_resource(self):
        """TagResource answers 200 with an empty body."""
        response = testutil.send_dynamodb_request('', action='TagResource', request_body=json.dumps({
            'ResourceArn': testutil.get_sample_arn('dynamodb', 'table'),
            'Tags': [{'tagkey1': 'tagvalue1'}, {'tagkey2': 'tagvalue2'}, {'tagkey3': 'tagvalue3'}]
        }))
        assert response.status_code == 200
        assert not response._content  # Empty string if tagging succeeded (mocked for now)

    def test_untag_resource(self):
        """UntagResource answers 200 with an empty body."""
        response = testutil.send_dynamodb_request('', action='UntagResource', request_body=json.dumps({
            'ResourceArn': testutil.get_sample_arn('dynamodb', 'table'),
            'TagKeys': ['tagkey1', 'tagkey2']  # Keys to untag
        }))
        assert response.status_code == 200
        assert not response._content  # Empty string if untagging succeeded (mocked for now)

    def test_list_tags_of_resource(self):
        """ListTagsOfResource answers 200 with an empty 'Tags' list."""
        response = testutil.send_dynamodb_request('', action='ListTagsOfResource', request_body=json.dumps({
            'ResourceArn': testutil.get_sample_arn('dynamodb', 'table')
        }))
        assert response.status_code == 200
        assert json.loads(response._content)['Tags'] == []  # Empty list returned

    def test_region_replacement(self):
        """The table ARN starts with the DynamoDB ARN prefix for the local region."""
        dynamodb = aws_stack.connect_to_resource('dynamodb')
        testutil.create_dynamodb_table(
            TEST_DDB_TABLE_NAME_4,
            partition_key=PARTITION_KEY,
            stream_view_type='NEW_AND_OLD_IMAGES'
        )
        table = dynamodb.Table(TEST_DDB_TABLE_NAME_4)
        expected_arn_prefix = 'arn:aws:dynamodb:' + aws_stack.get_local_region()
        assert table.table_arn.startswith(expected_arn_prefix)

# ... (the rest of test_dynamodb.py is elided in this excerpt)
client.py
Source:client.py  
...85            ssl_args = commands.get('ssl_args', {})86            extra = commands.get('extra')87            connect_timeout = commands.get('connect_timeout')88            inactivity_timeout = commands.get('inactivity_timeout')89            self.connect_to_resource(remote, is_ssl=is_ssl, connect_timeout=connect_timeout,90                    inactivity_timeout=inactivity_timeout, extra=extra,91                    **ssl_args)92        elif 'close' in commands:93            if isinstance(commands['close'], basestring): 94                self.send_data(self.sock, commands['close'])95            raise StopIteration()96        elif 'file' in commands:97            # command to send a file98            if isinstance(commands['file'], basestring):99                fdin = os.open(commands['file'], os.O_RDONLY)100            else:101                fdin = commands['file']102            offset = commands.get('offset', 0)103            nbytes = commands.get('nbytes', os.fstat(fdin).st_size)104        105            # send a reply if needed, useful in HTTP response.106            if 'reply' in commands:107                self.send_data(self.sock, commands['reply'])108            109            # use sendfile if possible to send the file content110            async_sendfile(self.sock.fileno(), fdin, offset, nbytes)111            raise StopIteration()112        else:113            raise StopIteration()114    def send_data(self, sock, data):115        if hasattr(data, 'read'):116            try:117                data.seek(0)118            except (ValueError, IOError):119                pass120            121            while True:122                chunk = data.readline()123                if not chunk:124                    break125                sock.sendall(chunk)    126        elif isinstance(data, basestring):127           sock.sendall(data)128        else:129            for chunk in data:130                sock.sendall(chunk)131    def connect_to_resource(self, addr, 
is_ssl=False, connect_timeout=None,132            inactivity_timeout=None, extra=None, **ssl_args):133        with gevent.Timeout(connect_timeout, ConnectionError):134            try:135                if is_ipv6(addr[0]):136                    sock = socket.socket(socket.AF_INET6, 137                            socket.SOCK_STREAM)138                else:139                    sock = socket.socket(socket.AF_INET, 140                            socket.SOCK_STREAM)141                if is_ssl:142                    sock = ssl.wrap_socket(sock, **ssl_args)143                sock.connect(addr)144            except socket.error, e:145                raise ConnectionError(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
