How to use delete_all_event_source_mappings method in localstack

Best Python code snippet using localstack_python

dynamodb_listener.py

Source: dynamodb_listener.py on GitHub

copy

Full Screen

...286 event_publisher.fire_event(287 event_publisher.EVENT_DYNAMODB_DELETE_TABLE,288 payload={'n': event_publisher.get_hash(data['TableName'])}289 )290 self.delete_all_event_source_mappings(table_arn)291 TABLE_TAGS.pop(table_arn, None)292 return293 elif action == '%s.UpdateTable' % ACTION_PREFIX:294 if 'StreamSpecification' in data:295 if response.status_code == 200:296 content = json.loads(to_str(response._content))297 create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))298 return299 elif action == '%s.TagResource' % ACTION_PREFIX:300 table_arn = data['ResourceArn']301 if table_arn not in TABLE_TAGS:302 TABLE_TAGS[table_arn] = {}303 TABLE_TAGS[table_arn].update({tag['Key']: tag['Value'] for tag in data.get('Tags', [])})304 return305 elif action == '%s.UntagResource' % ACTION_PREFIX:306 table_arn = data['ResourceArn']307 for tag_key in data.get('TagKeys', []):308 TABLE_TAGS.get(table_arn, {}).pop(tag_key, None)309 return310 else:311 # nothing to do312 return313 if len(records) > 0 and 'eventName' in records[0]:314 if 'TableName' in data:315 records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])316 forward_to_lambda(records)317 forward_to_ddb_stream(records)318 # -------------319 # UTIL METHODS320 # -------------321 def prepare_batch_write_item_records(self, record, data):322 records = []323 i = 0324 for table_name in sorted(data['RequestItems'].keys()):325 for request in data['RequestItems'][table_name]:326 put_request = request.get('PutRequest')327 if put_request:328 existing_item = self._thread_local('existing_items')[i]329 keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)330 if isinstance(keys, Response):331 return keys332 new_record = clone(record)333 new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'334 new_record['dynamodb']['Keys'] = keys335 new_record['dynamodb']['NewImage'] = put_request['Item']336 if existing_item:337 new_record['dynamodb']['OldImage'] = 
existing_item338 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)339 records.append(new_record)340 delete_request = request.get('DeleteRequest')341 if delete_request:342 keys = delete_request['Key']343 if isinstance(keys, Response):344 return keys345 new_record = clone(record)346 new_record['eventName'] = 'REMOVE'347 new_record['dynamodb']['Keys'] = keys348 new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]349 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)350 records.append(new_record)351 i += 1352 return records353 def prepare_transact_write_item_records(self, record, data):354 records = []355 # Fix issue #2745: existing_items only contain the Put/Update/Delete records,356 # so we will increase the index based on these events357 i = 0358 for request in data['TransactItems']:359 put_request = request.get('Put')360 if put_request:361 existing_item = self._thread_local('existing_items')[i]362 table_name = put_request['TableName']363 keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)364 if isinstance(keys, Response):365 return keys366 new_record = clone(record)367 new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'368 new_record['dynamodb']['Keys'] = keys369 new_record['dynamodb']['NewImage'] = put_request['Item']370 if existing_item:371 new_record['dynamodb']['OldImage'] = existing_item372 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)373 records.append(new_record)374 i += 1375 update_request = request.get('Update')376 if update_request:377 table_name = update_request['TableName']378 keys = update_request['Key']379 if isinstance(keys, Response):380 return keys381 updated_item = find_existing_item(update_request, table_name)382 if not updated_item:383 return384 new_record = clone(record)385 new_record['eventName'] = 'MODIFY'386 new_record['dynamodb']['Keys'] = keys387 new_record['dynamodb']['OldImage'] = 
self._thread_local('existing_items')[i]388 new_record['dynamodb']['NewImage'] = updated_item389 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)390 records.append(new_record)391 i += 1392 delete_request = request.get('Delete')393 if delete_request:394 table_name = delete_request['TableName']395 keys = delete_request['Key']396 if isinstance(keys, Response):397 return keys398 new_record = clone(record)399 new_record['eventName'] = 'REMOVE'400 new_record['dynamodb']['Keys'] = keys401 new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]402 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)403 records.append(new_record)404 i += 1405 return records406 def delete_all_event_source_mappings(self, table_arn):407 if table_arn:408 # fix start dynamodb service without lambda409 if not is_api_enabled('lambda'):410 return411 lambda_client = aws_stack.connect_to_service('lambda')412 result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)413 for event in result['EventSourceMappings']:414 event_source_mapping_id = event['UUID']415 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)416 @staticmethod417 def _thread_local(name, default=None):418 try:419 return getattr(ProxyListenerDynamoDB.thread_local, name)420 except AttributeError:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful