How to use the wait_for_table_created method in localstack

Best Python code snippet using localstack_python

fixtures.py

Source:fixtures.py Github

copy

Full Screen

...176 # cleanup177 for table in tables:178 try:179 # table has to be in ACTIVE state before deletion180 def wait_for_table_created():181 return (182 dynamodb_client.describe_table(TableName=table)["Table"]["TableStatus"]183 == "ACTIVE"184 )185 poll_condition(wait_for_table_created, timeout=30)186 dynamodb_client.delete_table(TableName=table)187 except Exception as e:188 LOG.debug("error cleaning up table %s: %s", table, e)189@pytest.fixture190def s3_create_bucket(s3_client):191 buckets = []192 def factory(**kwargs) -> str:193 if "Bucket" not in kwargs:194 kwargs["Bucket"] = "test-bucket-%s" % short_uid()...

Full Screen

Full Screen

test_lambda_integration.py

Source:test_lambda_integration.py Github

copy

Full Screen

...127 StartingPosition="TRIM_HORIZON",128 MaximumBatchingWindowInSeconds=1,129 )130 uuid = rs["UUID"]131 def wait_for_table_created():132 return (133 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134 == "ACTIVE"135 )136 assert poll_condition(wait_for_table_created, timeout=30)137 def wait_for_stream_created():138 return (139 dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140 "StreamDescription"141 ]["StreamStatus"]142 == "ENABLED"143 )144 assert poll_condition(wait_for_stream_created, timeout=30)145 table = dynamodb_resource.Table(ddb_table)146 items = [147 {"id": short_uid(), "data": "data1"},148 {"id": short_uid(), "data": "data2"},149 ]150 table.put_item(Item=items[0])151 def assert_events():152 events = get_lambda_log_events(function_name, logs_client=logs_client)153 # lambda was invoked 1 time154 assert 1 == len(events[0]["Records"])155 # might take some time against AWS156 retry(assert_events, sleep=3, retries=10)157 # disable event source mapping158 lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159 table.put_item(Item=items[1])160 events = get_lambda_log_events(function_name, logs_client=logs_client)161 # lambda no longer invoked, still have 1 event162 assert 1 == len(events[0]["Records"])163 # TODO invalid test against AWS, this behavior just is not correct164 def test_deletion_event_source_mapping_with_dynamodb(165 self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166 ):167 function_name = f"lambda_func-{short_uid()}"168 ddb_table = f"ddb_table-{short_uid()}"169 create_lambda_function(170 func_name=function_name,171 handler_file=TEST_LAMBDA_PYTHON_ECHO,172 runtime=LAMBDA_RUNTIME_PYTHON36,173 role=lambda_su_role,174 )175 latest_stream_arn = aws_stack.create_dynamodb_table(176 table_name=ddb_table,177 partition_key="id",178 client=dynamodb_client,179 stream_view_type="NEW_IMAGE",180 )["TableDescription"]["LatestStreamArn"]181 
lambda_client.create_event_source_mapping(182 FunctionName=function_name,183 EventSourceArn=latest_stream_arn,184 StartingPosition="TRIM_HORIZON",185 )186 def wait_for_table_created():187 return (188 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189 == "ACTIVE"190 )191 assert poll_condition(wait_for_table_created, timeout=30)192 dynamodb_client.delete_table(TableName=ddb_table)193 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194 assert 1 == len(result["EventSourceMappings"])195 def test_event_source_mapping_with_sqs(196 self,197 create_lambda_function,198 lambda_client,199 sqs_client,200 sqs_create_queue,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful