Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py  
...153                record['dynamodb']['OldImage'] = self._thread_local('existing_item')154                record['dynamodb']['NewImage'] = updated_item155                record['dynamodb']['SizeBytes'] = len(json.dumps(updated_item))156        elif action == '%s.BatchWriteItem' % ACTION_PREFIX:157            records = self.prepare_batch_write_item_records(record, data)158        elif action == '%s.TransactWriteItems' % ACTION_PREFIX:159            records = self.prepare_transact_write_item_records(record, data)160        elif action == '%s.PutItem' % ACTION_PREFIX:161            if response.status_code == 200:162                existing_item = self._thread_local('existing_item')163                record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'164                keys = dynamodb_extract_keys(item=data['Item'], table_name=data['TableName'])165                if isinstance(keys, Response):166                    return keys167                record['dynamodb']['Keys'] = keys168                record['dynamodb']['NewImage'] = data['Item']169                record['dynamodb']['SizeBytes'] = len(json.dumps(data['Item']))170                if existing_item:171                    record['dynamodb']['OldImage'] = existing_item172        elif action == '%s.GetItem' % ACTION_PREFIX:173            if response.status_code == 200:174                content = json.loads(to_str(response.content))175                # make sure we append 'ConsumedCapacity', which is properly176                # returned by dynalite, but not by AWS's DynamoDBLocal177                if 'ConsumedCapacity' not in content and data.get('ReturnConsumedCapacity') in ('TOTAL', 'INDEXES'):178                    content['ConsumedCapacity'] = {179                        'CapacityUnits': 0.5,  # TODO hardcoded180                        'TableName': data['TableName']181                    }182                    response._content = json.dumps(content)183                    
fix_headers_for_updated_response(response)184        elif action == '%s.DeleteItem' % ACTION_PREFIX:185            if response.status_code == 200:186                old_item = self._thread_local('existing_item')187                record['eventName'] = 'REMOVE'188                record['dynamodb']['Keys'] = data['Key']189                record['dynamodb']['OldImage'] = old_item190        elif action == '%s.CreateTable' % ACTION_PREFIX:191            if 'StreamSpecification' in data:192                create_dynamodb_stream(data)193            event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,194                payload={'n': event_publisher.get_hash(data['TableName'])})195            return196        elif action == '%s.DeleteTable' % ACTION_PREFIX:197            event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_DELETE_TABLE,198                payload={'n': event_publisher.get_hash(data['TableName'])})199            return200        elif action == '%s.UpdateTable' % ACTION_PREFIX:201            if 'StreamSpecification' in data:202                create_dynamodb_stream(data)203            return204        else:205            # nothing to do206            return207        if len(records) > 0 and 'eventName' in records[0]:208            if 'TableName' in data:209                records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])210            forward_to_lambda(records)211            forward_to_ddb_stream(records)212    def prepare_batch_write_item_records(self, record, data):213        records = []214        i = 0215        for table_name in sorted(data['RequestItems'].keys()):216            for request in data['RequestItems'][table_name]:217                put_request = request.get('PutRequest')218                if put_request:219                    existing_item = self._thread_local('existing_items')[i]220                    keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)221          
          if isinstance(keys, Response):222                        return keys223                    new_record = clone(record)224                    new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'225                    new_record['dynamodb']['Keys'] = keys226                    new_record['dynamodb']['NewImage'] = put_request['Item']...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
