Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py  
...155                record['dynamodb']['SizeBytes'] = len(json.dumps(updated_item))156        elif action == '%s.BatchWriteItem' % ACTION_PREFIX:157            records = self.prepare_batch_write_item_records(record, data)158        elif action == '%s.TransactWriteItems' % ACTION_PREFIX:159            records = self.prepare_transact_write_item_records(record, data)160        elif action == '%s.PutItem' % ACTION_PREFIX:161            if response.status_code == 200:162                existing_item = self._thread_local('existing_item')163                record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'164                keys = dynamodb_extract_keys(item=data['Item'], table_name=data['TableName'])165                if isinstance(keys, Response):166                    return keys167                record['dynamodb']['Keys'] = keys168                record['dynamodb']['NewImage'] = data['Item']169                record['dynamodb']['SizeBytes'] = len(json.dumps(data['Item']))170                if existing_item:171                    record['dynamodb']['OldImage'] = existing_item172        elif action == '%s.GetItem' % ACTION_PREFIX:173            if response.status_code == 200:174                content = json.loads(to_str(response.content))175                # make sure we append 'ConsumedCapacity', which is properly176                # returned by dynalite, but not by AWS's DynamoDBLocal177                if 'ConsumedCapacity' not in content and data.get('ReturnConsumedCapacity') in ('TOTAL', 'INDEXES'):178                    content['ConsumedCapacity'] = {179                        'CapacityUnits': 0.5,  # TODO hardcoded180                        'TableName': data['TableName']181                    }182                    response._content = json.dumps(content)183                    fix_headers_for_updated_response(response)184        elif action == '%s.DeleteItem' % ACTION_PREFIX:185            if response.status_code == 200:186                
old_item = self._thread_local('existing_item')187                record['eventName'] = 'REMOVE'188                record['dynamodb']['Keys'] = data['Key']189                record['dynamodb']['OldImage'] = old_item190        elif action == '%s.CreateTable' % ACTION_PREFIX:191            if 'StreamSpecification' in data:192                create_dynamodb_stream(data)193            event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,194                payload={'n': event_publisher.get_hash(data['TableName'])})195            return196        elif action == '%s.DeleteTable' % ACTION_PREFIX:197            event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_DELETE_TABLE,198                payload={'n': event_publisher.get_hash(data['TableName'])})199            return200        elif action == '%s.UpdateTable' % ACTION_PREFIX:201            if 'StreamSpecification' in data:202                create_dynamodb_stream(data)203            return204        else:205            # nothing to do206            return207        if len(records) > 0 and 'eventName' in records[0]:208            if 'TableName' in data:209                records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])210            forward_to_lambda(records)211            forward_to_ddb_stream(records)212    def prepare_batch_write_item_records(self, record, data):213        records = []214        i = 0215        for table_name in sorted(data['RequestItems'].keys()):216            for request in data['RequestItems'][table_name]:217                put_request = request.get('PutRequest')218                if put_request:219                    existing_item = self._thread_local('existing_items')[i]220                    keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)221                    if isinstance(keys, Response):222                        return keys223                    new_record = clone(record)224                    
new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'225                    new_record['dynamodb']['Keys'] = keys226                    new_record['dynamodb']['NewImage'] = put_request['Item']227                    if existing_item:228                        new_record['dynamodb']['OldImage'] = existing_item229                    new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)230                    records.append(new_record)231                delete_request = request.get('DeleteRequest')232                if delete_request:233                    keys = delete_request['Key']234                    if isinstance(keys, Response):235                        return keys236                    new_record = clone(record)237                    new_record['eventName'] = 'REMOVE'238                    new_record['dynamodb']['Keys'] = keys239                    new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]240                    new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)241                    records.append(new_record)242                i += 1243        return records244    def prepare_transact_write_item_records(self, record, data):245        records = []246        for i, request in enumerate(data['TransactItems']):247            put_request = request.get('Put')248            if put_request:249                existing_item = self._thread_local('existing_items')[i]250                table_name = put_request['TableName']251                keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)252                if isinstance(keys, Response):253                    return keys254                new_record = clone(record)255                new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'256                new_record['dynamodb']['Keys'] = keys257                new_record['dynamodb']['NewImage'] = put_request['Item']258                if 
existing_item:...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
