Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py  
...38                return 20039        elif action in ('%s.PutItem' % ACTION_PREFIX, '%s.UpdateItem' % ACTION_PREFIX, '%s.DeleteItem' % ACTION_PREFIX):40            # find an existing item and store it in a thread-local, so we can access it in return_response,41            # in order to determine whether an item already existed (MODIFY) or not (INSERT)42            ProxyListenerDynamoDB.thread_local.existing_item = find_existing_item(data)43        elif action == '%s.DescribeTable' % ACTION_PREFIX:44            # Check if table exists, to avoid error log output from DynamoDBLocal45            table_names = ddb_client.list_tables()['TableNames']46            if to_str(data['TableName']) not in table_names:47                response = error_response(message='Cannot do operations on a non-existent table',48                                          error_type='ResourceNotFoundException')49                fix_headers_for_updated_response(response)50                return response51        elif action == '%s.DeleteTable' % ACTION_PREFIX:52            # Check if table exists, to avoid error log output from DynamoDBLocal53            table_names = ddb_client.list_tables()['TableNames']54            if to_str(data['TableName']) not in table_names:55                response = error_response(message='Cannot do operations on a non-existent table',56                                          error_type='ResourceNotFoundException')57                fix_headers_for_updated_response(response)58                return response59        elif action == '%s.BatchWriteItem' % ACTION_PREFIX:60            existing_items = []61            for table_name in sorted(data['RequestItems'].keys()):62                for request in data['RequestItems'][table_name]:63                    for key in ['PutRequest', 'DeleteRequest']:64                        inner_request = request.get(key)65                        if inner_request:66                            
existing_items.append(find_existing_item(inner_request, table_name))67            ProxyListenerDynamoDB.thread_local.existing_items = existing_items68        elif action == '%s.TransactWriteItems' % ACTION_PREFIX:69            existing_items = []70            for item in data['TransactItems']:71                for key in ['Put', 'Update', 'Delete']:72                    inner_item = item.get(key)73                    if inner_item:74                        existing_items.append(find_existing_item(inner_item))75            ProxyListenerDynamoDB.thread_local.existing_items = existing_items76        elif action == '%s.UpdateTimeToLive' % ACTION_PREFIX:77            # TODO: TTL status is maintained/mocked but no real expiry is happening for items78            response = Response()79            response.status_code = 20080            self._table_ttl_map[data['TableName']] = {81                'AttributeName': data['TimeToLiveSpecification']['AttributeName'],82                'Status': data['TimeToLiveSpecification']['Enabled']83            }84            response._content = json.dumps({'TimeToLiveSpecification': data['TimeToLiveSpecification']})85            fix_headers_for_updated_response(response)86            return response87        elif action == '%s.DescribeTimeToLive' % ACTION_PREFIX:88            response = Response()89            response.status_code = 20090            if data['TableName'] in self._table_ttl_map:91                if self._table_ttl_map[data['TableName']]['Status']:92                    ttl_status = 'ENABLED'93                else:94                    ttl_status = 'DISABLED'95                response._content = json.dumps({96                    'TimeToLiveDescription': {97                        'AttributeName': self._table_ttl_map[data['TableName']]['AttributeName'],98                        'TimeToLiveStatus': ttl_status99                    }100                })101            else:  # TTL for dynamodb table not set102                
response._content = json.dumps({'TimeToLiveDescription': {'TimeToLiveStatus': 'DISABLED'}})103            fix_headers_for_updated_response(response)104            return response105        elif action == '%s.TagResource' % ACTION_PREFIX or action == '%s.UntagResource' % ACTION_PREFIX:106            response = Response()107            response.status_code = 200108            response._content = ''  # returns an empty body on success.109            fix_headers_for_updated_response(response)110            return response111        elif action == '%s.ListTagsOfResource' % ACTION_PREFIX:112            response = Response()113            response.status_code = 200114            response._content = json.dumps({'Tags': []})  # TODO: mocked and returns an empty list of tags for now.115            fix_headers_for_updated_response(response)116            return response117        return True118    def return_response(self, method, path, data, headers, response):119        if path.startswith('/shell'):120            return121        data = json.loads(to_str(data))122        # update table definitions123        if data and 'TableName' in data and 'KeySchema' in data:124            TABLE_DEFINITIONS[data['TableName']] = data125        if response._content:126            # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)127            content_replaced = re.sub(r'("TableArn"|"LatestStreamArn")\s*:\s*"arn:aws:dynamodb:ddblocal:([^"]+)"',128                r'\1: "arn:aws:dynamodb:%s:\2"' % aws_stack.get_local_region(), to_str(response._content))129            if content_replaced != response._content:130                response._content = content_replaced131                fix_headers_for_updated_response(response)132        action = headers.get('X-Amz-Target')133        if not action:134            return135        record = {136            'eventID': '1',137            'eventVersion': '1.0',138            'dynamodb': {139                
'StreamViewType': 'NEW_AND_OLD_IMAGES',140                'SizeBytes': -1141            },142            'awsRegion': DEFAULT_REGION,143            'eventSource': 'aws:dynamodb'144        }145        records = [record]146        if action == '%s.UpdateItem' % ACTION_PREFIX:147            if response.status_code == 200:148                updated_item = find_existing_item(data)149                if not updated_item:150                    return151                record['eventName'] = 'MODIFY'152                record['dynamodb']['Keys'] = data['Key']153                record['dynamodb']['OldImage'] = self._thread_local('existing_item')154                record['dynamodb']['NewImage'] = updated_item155                record['dynamodb']['SizeBytes'] = len(json.dumps(updated_item))156        elif action == '%s.BatchWriteItem' % ACTION_PREFIX:157            records = self.prepare_batch_write_item_records(record, data)158        elif action == '%s.TransactWriteItems' % ACTION_PREFIX:159            records = self.prepare_transact_write_item_records(record, data)160        elif action == '%s.PutItem' % ACTION_PREFIX:161            if response.status_code == 200:162                existing_item = self._thread_local('existing_item')163                record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'164                keys = dynamodb_extract_keys(item=data['Item'], table_name=data['TableName'])165                if isinstance(keys, Response):166                    return keys167                record['dynamodb']['Keys'] = keys168                record['dynamodb']['NewImage'] = data['Item']169                record['dynamodb']['SizeBytes'] = len(json.dumps(data['Item']))170                if existing_item:171                    record['dynamodb']['OldImage'] = existing_item172        elif action == '%s.GetItem' % ACTION_PREFIX:173            if response.status_code == 200:174                content = json.loads(to_str(response.content))175                # make 
sure we append 'ConsumedCapacity', which is properly176                # returned by dynalite, but not by AWS's DynamoDBLocal177                if 'ConsumedCapacity' not in content and data.get('ReturnConsumedCapacity') in ('TOTAL', 'INDEXES'):178                    content['ConsumedCapacity'] = {179                        'CapacityUnits': 0.5,  # TODO hardcoded180                        'TableName': data['TableName']181                    }182                    response._content = json.dumps(content)183                    fix_headers_for_updated_response(response)184        elif action == '%s.DeleteItem' % ACTION_PREFIX:185            if response.status_code == 200:186                old_item = self._thread_local('existing_item')187                record['eventName'] = 'REMOVE'188                record['dynamodb']['Keys'] = data['Key']189                record['dynamodb']['OldImage'] = old_item190        elif action == '%s.CreateTable' % ACTION_PREFIX:191            if 'StreamSpecification' in data:192                create_dynamodb_stream(data)193            event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,194                payload={'n': event_publisher.get_hash(data['TableName'])})195            return196        elif action == '%s.DeleteTable' % ACTION_PREFIX:197            event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_DELETE_TABLE,198                payload={'n': event_publisher.get_hash(data['TableName'])})199            return200        elif action == '%s.UpdateTable' % ACTION_PREFIX:201            if 'StreamSpecification' in data:202                create_dynamodb_stream(data)203            return204        else:205            # nothing to do206            return207        if len(records) > 0 and 'eventName' in records[0]:208            if 'TableName' in data:209                records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])210            forward_to_lambda(records)211            
forward_to_ddb_stream(records)212    def prepare_batch_write_item_records(self, record, data):213        records = []214        i = 0215        for table_name in sorted(data['RequestItems'].keys()):216            for request in data['RequestItems'][table_name]:217                put_request = request.get('PutRequest')218                if put_request:219                    existing_item = self._thread_local('existing_items')[i]220                    keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)221                    if isinstance(keys, Response):222                        return keys223                    new_record = clone(record)224                    new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'225                    new_record['dynamodb']['Keys'] = keys226                    new_record['dynamodb']['NewImage'] = put_request['Item']227                    if existing_item:228                        new_record['dynamodb']['OldImage'] = existing_item229                    new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)230                    records.append(new_record)231                delete_request = request.get('DeleteRequest')232                if delete_request:233                    keys = delete_request['Key']234                    if isinstance(keys, Response):235                        return keys236                    new_record = clone(record)237                    new_record['eventName'] = 'REMOVE'238                    new_record['dynamodb']['Keys'] = keys239                    new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]240                    new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)241                    records.append(new_record)242                i += 1243        return records244    def prepare_transact_write_item_records(self, record, data):245        records = []246        for i, request in 
enumerate(data['TransactItems']):247            put_request = request.get('Put')248            if put_request:249                existing_item = self._thread_local('existing_items')[i]250                table_name = put_request['TableName']251                keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)252                if isinstance(keys, Response):253                    return keys254                new_record = clone(record)255                new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'256                new_record['dynamodb']['Keys'] = keys257                new_record['dynamodb']['NewImage'] = put_request['Item']258                if existing_item:259                    new_record['dynamodb']['OldImage'] = existing_item260                new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)261                records.append(new_record)262            update_request = request.get('Update')263            if update_request:264                table_name = update_request['TableName']265                keys = update_request['Key']266                if isinstance(keys, Response):267                    return keys268                updated_item = find_existing_item(update_request, table_name)269                if not updated_item:270                    return271                new_record = clone(record)272                new_record['eventName'] = 'MODIFY'273                new_record['dynamodb']['Keys'] = keys274                new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]275                new_record['dynamodb']['NewImage'] = updated_item276                new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)277                records.append(new_record)278            delete_request = request.get('Delete')279            if delete_request:280                table_name = delete_request['TableName']281                keys = delete_request['Key']282   
             if isinstance(keys, Response):283                    return keys284                new_record = clone(record)285                new_record['eventName'] = 'REMOVE'286                new_record['dynamodb']['Keys'] = keys287                new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]288                new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)289                records.append(new_record)290        return records291    def _thread_local(self, name, default=None):292        try:293            return getattr(ProxyListenerDynamoDB.thread_local, name)294        except AttributeError:295            return default296# instantiate listener297UPDATE_DYNAMODB = ProxyListenerDynamoDB()298def find_existing_item(put_item, table_name=None):299    table_name = table_name or put_item['TableName']300    ddb_client = aws_stack.connect_to_service('dynamodb')301    search_key = {}302    if 'Key' in put_item:303        search_key = put_item['Key']304    else:305        schema = ddb_client.describe_table(TableName=table_name)306        schemas = [schema['Table']['KeySchema']]307        for index in schema['Table'].get('GlobalSecondaryIndexes', []):308            # schemas.append(index['KeySchema'])309            pass310        for schema in schemas:311            for key in schema:312                key_name = key['AttributeName']...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
