How to use the put_records method in LocalStack

Best Python code snippet using localstack_python
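Before the full test sources below, here is a minimal, self-contained sketch of calling put_records against a Kinesis stream in LocalStack. It assumes LocalStack is listening on its default edge port 4566; the stream name and credentials are placeholders (LocalStack accepts any test credentials):

import boto3

# Point the Kinesis client at LocalStack instead of AWS.
client = boto3.client(
    'kinesis',
    endpoint_url='http://localhost:4566',
    region_name='us-east-1',
    aws_access_key_id='test',
    aws_secret_access_key='test',
)

client.create_stream(StreamName='example-stream', ShardCount=1)
client.get_waiter('stream_exists').wait(StreamName='example-stream')

# Each record needs Data (bytes) and a PartitionKey; a single call
# accepts at most 500 records.
response = client.put_records(
    StreamName='example-stream',
    Records=[{'Data': f'Message {i}'.encode(), 'PartitionKey': '111'}
             for i in range(10)],
)

# put_records can partially fail, so check FailedRecordCount rather
# than assuming success.
assert response['FailedRecordCount'] == 0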

test_aws_kinesis_origin.py

Source: test_aws_kinesis_origin.py (GitHub)



...
        aws.wait_for_stream_status(stream_name=stream_name, status='ACTIVE')
        expected_messages = set('Message {0}'.format(i) for i in range(10))
        # not using PartitionKey logic and hence assign some temp key
        put_records = [{'Data': exp_msg, 'PartitionKey': '111'} for exp_msg in expected_messages]
        client.put_records(Records=put_records, StreamName=stream_name)
        # messages are published, read through the pipeline and assert
        sdc_executor.start_pipeline(consumer_origin_pipeline).wait_for_pipeline_output_records_count(11)
        sdc_executor.stop_pipeline(consumer_origin_pipeline)
        output_records = [record.field['text'].value
                          for record in wiretap.output_records]
        assert set(output_records) == expected_messages
    finally:
        _ensure_pipeline_is_stopped(sdc_executor, consumer_origin_pipeline)
        logger.info('Deleting %s Kinesis stream on AWS ...', stream_name)
        client.delete_stream(StreamName=stream_name)  # Stream operations are done. Delete the stream.
        logger.info('Deleting %s DynamoDB table on AWS ...', application_name)
        aws.dynamodb.delete_table(TableName=application_name)


@aws('kinesis')
@sdc_min_version('3.19.0')
@pytest.mark.parametrize('additional_configurations', [
    [],
    [{'key': 'failoverTimeMillis', 'value': '10000'}],
    [{'key': 'taskBackoffTimeMillis', 'value': '500'}],
    [{'key': 'metricsBufferTimeMillis', 'value': '10000'}],
    [{'key': 'metricsMaxQueueSize', 'value': '10000'}],
    [{'key': 'validateSequenceNumberBeforeCheckpointing', 'value': 'true'}],
    [{'key': 'shutdownGraceMillis', 'value': '1'}],
    [{'key': 'billingMode', 'value': 'PROVISIONED'}],
    [{'key': 'timeoutInSeconds', 'value': '50'}],
    [{'key': 'retryGetRecordsInSeconds', 'value': '50'}],
    [{'key': 'maxGetRecordsThreadPool', 'value': '50'}],
    [{'key': 'maxLeaseRenewalThreads', 'value': '20'}],
    [{'key': 'logWarningForTaskAfterMillis', 'value': '50'}],
    [{'key': 'listShardsBackoffTimeInMillis', 'value': '1500'}],
    [{'key': 'maxListShardsRetryAttempts', 'value': '50'}],
    [{'key': 'userAgentPrefix', 'value': ''}],
    [{'key': 'userAgentSuffix', 'value': ''}],
    [{'key': 'maxConnections', 'value': '50'}],
    [{'key': 'requestTimeout', 'value': '0'}],
    [{'key': 'clientExecutionTimeout', 'value': '0'}],
    [{'key': 'throttleRetries', 'value': 'true'}],
    [{'key': 'connectionMaxIdleMillis', 'value': '60000'}],
    [{'key': 'validateAfterInactivityMillis', 'value': '5000'}],
    [{'key': 'useExpectContinue', 'value': 'true'}],
    [{'key': 'maxConsecutiveRetriesBeforeThrottling', 'value': '100'}],
    [{'key': 'retryMode', 'value': 'null'}],
    [{'key': 'cleanupLeasesUponShardCompletion', 'value': 'true'}],
    [{'key': 'a', 'value': '1'}]])
def test_kinesis_consumer_additional_properties(sdc_builder, sdc_executor, aws, additional_configurations):
    """Test for Kinesis consumer origin stage. We do so by publishing data to a test stream using Kinesis client and
    having a pipeline which reads that data using Kinesis consumer origin stage. Data is then asserted for what is
    published at Kinesis client and what we read in the pipeline. The pipeline looks like:
    Kinesis Consumer pipeline:
        kinesis_consumer >> wiretap
    """
    invalid_config = False
    # build consumer pipeline
    application_name = get_random_string(string.ascii_letters, 10)
    stream_name = '{}_{}'.format(aws.kinesis_stream_prefix, get_random_string(string.ascii_letters, 10))
    builder = sdc_builder.get_pipeline_builder()
    builder.add_error_stage('Discard')
    kinesis_consumer = builder.add_stage('Kinesis Consumer')
    kinesis_consumer.set_attributes(application_name=application_name, data_format='TEXT',
                                    initial_position='TRIM_HORIZON',
                                    stream_name=stream_name,
                                    kinesis_configuration=additional_configurations)
    wiretap = builder.add_wiretap()
    kinesis_consumer >> wiretap.destination
    consumer_origin_pipeline = builder.build(title='Kinesis Consumer pipeline').configure_for_environment(aws)
    sdc_executor.add_pipeline(consumer_origin_pipeline)
    client = aws.kinesis
    try:
        logger.info('Creating %s Kinesis stream on AWS ...', stream_name)
        client.create_stream(StreamName=stream_name, ShardCount=1)
        aws.wait_for_stream_status(stream_name=stream_name, status='ACTIVE')
        expected_messages = set('Message {0}'.format(i) for i in range(10))
        # not using PartitionKey logic and hence assign some temp key
        put_records = [{'Data': exp_msg, 'PartitionKey': '111'} for exp_msg in expected_messages]
        client.put_records(Records=put_records, StreamName=stream_name)
        # messages are published, read through the pipeline and assert
        sdc_executor.start_pipeline(consumer_origin_pipeline).wait_for_pipeline_output_records_count(11)
        sdc_executor.stop_pipeline(consumer_origin_pipeline)
        output_records = [record.field['text'].value
                          for record in wiretap.output_records]
        assert set(output_records) == expected_messages
    except Exception as error:
        if additional_configurations[0]['key'] == 'a':
            assert 'KINESIS_24 - Invalid setting for \'' + additional_configurations[0]['key'] + \
                   '\' property' in error.message
            invalid_config = True
        else:
            raise error
    finally:
        _ensure_pipeline_is_stopped(sdc_executor, consumer_origin_pipeline)
        logger.info('Deleting %s Kinesis stream on AWS ...', stream_name)
        client.delete_stream(StreamName=stream_name)  # Stream operations are done. Delete the stream.
        if not invalid_config:
            logger.info('Deleting %s DynamoDB table on AWS ...', application_name)
            aws.dynamodb.delete_table(TableName=application_name)


@aws('kinesis')
def test_kinesis_consumer_at_timestamp(sdc_builder, sdc_executor, aws):
    """Test for Kinesis consumer origin stage, with AT_TIMESTAMP option. We do so by:
    - 1. Publishing data to a test stream
    - 2. Wait some time and store current timestamp
    - 3. Publishing new data
    - 4. Using Kinesis client to attempt reading from stored timestamp, passing it to the AT_TIMESTAMP option
    - 5. Assert that only the newest data has been read
    The pipelines look like:
    Kinesis Consumer pipeline: kinesis_consumer >> wiretap
    """
    # build stream
    application_name = get_random_string()
    stream_name = f'{aws.kinesis_stream_prefix}_{get_random_string()}'
    client = aws.kinesis
    try:
        logger.info('Creating %s Kinesis stream on AWS ...', stream_name)
        client.create_stream(StreamName=stream_name, ShardCount=1)
        aws.wait_for_stream_status(stream_name=stream_name, status='ACTIVE')
        # 1. Publish data to the stream
        put_records = [{'Data': f'First Message {i}', 'PartitionKey': '111'} for i in range(10)]
        client.put_records(Records=put_records, StreamName=stream_name)
        # 2. Wait and store timestamp
        time.sleep(10)
        timestamp = int(time.time()) * 1000
        # 3. Publish new data
        expected_messages = set('Second Message {0}'.format(i) for i in range(10))
        put_records = [{'Data': f'Second Message {i}', 'PartitionKey': '111'} for i in range(10)]
        client.put_records(Records=put_records, StreamName=stream_name)
        # 4. Build consumer pipeline using timestamp
        builder = sdc_builder.get_pipeline_builder()
        builder.add_error_stage('Discard')
        kinesis_consumer = builder.add_stage('Kinesis Consumer')
        kinesis_consumer.set_attributes(application_name=application_name, data_format='TEXT',
                                        initial_position='AT_TIMESTAMP',
                                        initial_timestamp=timestamp,
                                        stream_name=stream_name)
        wiretap = builder.add_wiretap()
        kinesis_consumer >> wiretap.destination
        consumer_origin_pipeline = builder.build(title='Kinesis Consumer pipeline').configure_for_environment(aws)
        sdc_executor.add_pipeline(consumer_origin_pipeline)
        # 5. messages are published, read through the pipeline and assert
        sdc_executor.start_pipeline(consumer_origin_pipeline).wait_for_pipeline_output_records_count(11)
        sdc_executor.stop_pipeline(consumer_origin_pipeline)
        output_records = [record.field['text'].value
                          for record in wiretap.output_records]
        assert set(output_records) == expected_messages
    finally:
        _ensure_pipeline_is_stopped(sdc_executor, consumer_origin_pipeline)
        logger.info('Deleting %s Kinesis stream on AWS ...', stream_name)
        client.delete_stream(StreamName=stream_name)  # Stream operations are done. Delete the stream.
        logger.info('Deleting %s DynamoDB table on AWS ...', application_name)
        aws.dynamodb.delete_table(TableName=application_name)


@aws('kinesis')
@pytest.mark.parametrize('no_of_msg', [1, 5, 10, 20, 35])
def test_kinesis_consumer_stop_resume(sdc_builder, sdc_executor, aws, no_of_msg):
    """Test for Kinesis consumer origin stage. We do so by publishing data to a test stream using Kinesis client and
    having a pipeline which reads that data using Kinesis consumer origin stage. Data is then asserted for what is
    published at Kinesis client and what we read in the pipeline. The pipeline looks like:
    Kinesis Consumer pipeline:
        kinesis_consumer >> wiretap
    """
    # build consumer pipeline
    application_name = get_random_string(string.ascii_letters, 10)
    stream_name = '{}_{}'.format(aws.kinesis_stream_prefix, get_random_string(string.ascii_letters, 10))
    builder = sdc_builder.get_pipeline_builder()
    builder.add_error_stage('Discard')
    kinesis_consumer = builder.add_stage('Kinesis Consumer')
    kinesis_consumer.set_attributes(application_name=application_name, data_format='TEXT',
                                    initial_position='TRIM_HORIZON',
                                    stream_name=stream_name)
    wiretap = builder.add_wiretap()
    kinesis_consumer >> wiretap.destination
    consumer_origin_pipeline = builder.build(
        title=f'Kinesis Consumer Stop Resume: {no_of_msg}').configure_for_environment(aws)
    sdc_executor.add_pipeline(consumer_origin_pipeline)
    client = aws.kinesis
    try:
        logger.info('Creating %s Kinesis stream on AWS ...', stream_name)
        client.create_stream(StreamName=stream_name, ShardCount=1)
        aws.wait_for_stream_status(stream_name=stream_name, status='ACTIVE')
        expected_messages = set('Message {0}'.format(i) for i in range(no_of_msg))
        # not using PartitionKey logic and hence assign some temp key
        put_records = [{'Data': exp_msg, 'PartitionKey': '111'} for exp_msg in expected_messages]
        client.put_records(Records=put_records, StreamName=stream_name)
        # messages are published, read through the pipeline and assert
        # number of batches to be captured is the number of messages + 1
        sdc_executor.start_pipeline(consumer_origin_pipeline).wait_for_pipeline_output_records_count(no_of_msg + 1)
        sdc_executor.stop_pipeline(consumer_origin_pipeline)
        output_records = [record.field['text'].value
                          for record in wiretap.output_records]
        assert set(output_records) == expected_messages
        expected_messages = set('Message B {0}'.format(i) for i in range(no_of_msg))
        # not using PartitionKey logic and hence assign some temp key
        put_records = [{'Data': exp_msg, 'PartitionKey': '111'} for exp_msg in expected_messages]
        client.put_records(Records=put_records, StreamName=stream_name)
        # messages are published, read through the pipeline and assert
        # number of batches to be captured is the number of messages + 1
        wiretap.reset()
        sdc_executor.start_pipeline(consumer_origin_pipeline).wait_for_pipeline_output_records_count(no_of_msg + 1)
        sdc_executor.stop_pipeline(consumer_origin_pipeline)
        output_records = [record.field['text'].value
                          for record in wiretap.output_records]
        assert set(output_records) == expected_messages
    finally:
        _ensure_pipeline_is_stopped(sdc_executor, consumer_origin_pipeline)
        logger.info('Deleting %s Kinesis stream on AWS ...', stream_name)
        client.delete_stream(StreamName=stream_name)  # Stream operations are done. Delete the stream.
        logger.info('Deleting %s DynamoDB table on AWS ...', application_name)
        aws.dynamodb.delete_table(TableName=application_name)


@aws('kinesis')
def test_kinesis_consumer_other_region(sdc_builder, sdc_executor, aws):
    """Test for Kinesis consumer origin stage using other as region and service endpoint. We do so by publishing data
    to a test stream using Kinesis client and having a pipeline which reads that data using Kinesis consumer origin
    stage. The region is set to other, and the service endpoint for kinesis is used.
    Data is then asserted for what is published at Kinesis client and what we read in the pipeline.
    The pipeline looks like:
    Kinesis Consumer pipeline:
        kinesis_consumer >> wiretap
    """
    endpoint = SERVICE_ENDPOINT_FORMAT.format('kinesis', aws.region)
    # build consumer pipeline
    application_name = get_random_string(string.ascii_letters, 10)
    stream_name = '{}_{}'.format(aws.kinesis_stream_prefix, get_random_string(string.ascii_letters, 10))
    builder = sdc_builder.get_pipeline_builder()
    builder.add_error_stage('Discard')
    kinesis_consumer = builder.add_stage('Kinesis Consumer')
    kinesis_consumer.set_attributes(application_name=application_name, data_format='TEXT',
                                    initial_position='TRIM_HORIZON',
                                    stream_name=stream_name)
    wiretap = builder.add_wiretap()
    kinesis_consumer >> wiretap.destination
    consumer_origin_pipeline = builder.build().configure_for_environment(aws)
    kinesis_consumer.set_attributes(region='OTHER', endpoint=endpoint)
    sdc_executor.add_pipeline(consumer_origin_pipeline)
    client = aws.kinesis
    try:
        logger.info('Creating %s Kinesis stream on AWS ...', stream_name)
        client.create_stream(StreamName=stream_name, ShardCount=1)
        aws.wait_for_stream_status(stream_name=stream_name, status='ACTIVE')
        expected_messages = set('Message {0}'.format(i) for i in range(10))
        # not using PartitionKey logic and hence assign some temp key
        put_records = [{'Data': exp_msg, 'PartitionKey': '111'} for exp_msg in expected_messages]
        client.put_records(Records=put_records, StreamName=stream_name)
        # messages are published, read through the pipeline and assert
        sdc_executor.start_pipeline(consumer_origin_pipeline).wait_for_pipeline_output_records_count(11)
        sdc_executor.stop_pipeline(consumer_origin_pipeline)
        output_records = [record.field['text'].value
                          for record in wiretap.output_records]
        assert set(output_records) == expected_messages
    finally:
        _ensure_pipeline_is_stopped(sdc_executor, consumer_origin_pipeline)
        logger.info('Deleting %s Kinesis stream on AWS ...', stream_name)
        client.delete_stream(StreamName=stream_name)  # Stream operations are done. Delete the stream.
        logger.info('Deleting %s DynamoDB table on AWS ...', application_name)
        aws.dynamodb.delete_table(TableName=application_name)


def _ensure_pipeline_is_stopped(sdc_executor, pipeline):
    if sdc_executor.get_pipeline_status(pipeline).response.json().get('status') == 'RUNNING':
...
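The tests above rely on the StreamSets test environment's aws.wait_for_stream_status helper to block until the new stream is usable. Outside that framework, the same wait can be approximated with plain boto3; the polling interval and timeout below are arbitrary choices, not part of the original suite:

import time

def wait_for_stream_status(client, stream_name, status='ACTIVE', timeout=60):
    # Poll describe_stream until the stream reports the desired status.
    deadline = time.time() + timeout
    while time.time() < deadline:
        description = client.describe_stream(StreamName=stream_name)
        if description['StreamDescription']['StreamStatus'] == status:
            return
        time.sleep(1)
    raise TimeoutError(f'{stream_name} did not reach {status} within {timeout}s')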


sender_tests.py

Source: sender_tests.py (GitHub)



...
        }]
        send([{'Data': 'foo'}, {'Data': 'bar'}], "stream_name", mock_kinesis)
        # Records with "InternalFailure" should not be resent. In this case it
        # can happen that errors occurred but no records should be resent.
        # But put_records() must not be called with an empty list, it raises
        # an exception if that happens.
        self.assertEqual(mock_kinesis.put_records.call_count, 1)
        mock_kinesis.put_records.assert_any_call(
            StreamName="stream_name",
            Records=[{'Data': 'foo'}, {'Data': 'bar'}])


if __name__ == '__main__':
...
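The send helper exercised here is not part of the excerpt. Based on the test's comments, a sketch of the behavior being verified could look like the following; treating everything except InternalFailure as retryable is an assumption inferred from those comments:

def send(records, stream_name, kinesis_client):
    # put_records reports per-record outcomes, in request order; failed
    # entries carry an ErrorCode key.
    response = kinesis_client.put_records(Records=records, StreamName=stream_name)
    retryable = [
        record
        for record, result in zip(records, response['Records'])
        if result.get('ErrorCode') not in (None, 'InternalFailure')
    ]
    # put_records raises on an empty Records list, so guard the retry call.
    if retryable:
        kinesis_client.put_records(Records=retryable, StreamName=stream_name)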


test_deletion_check_initiator.py

Source: test_deletion_check_initiator.py (GitHub)



...
    table_name = table["TableDescription"]["TableName"]
    yield table_name
    mock_dynamo_client.delete_table(TableName=table_name)


@pytest.fixture(scope="function")
def put_records(mock_dynamo_client, test_dynamo_table):
    def _put_records(records):
        for record in records:
            mock_dynamo_client.put_item(
                TableName=test_dynamo_table, Item={"id": {"S": record["id"]}}
            )

    yield _put_records


@pytest.fixture(scope="function")
def deletion_check_initiator(
    mock_dynamo_client, mock_sns_client, test_topic_arn, test_dynamo_table
):
    yield DeletionCheckInitiator(
        dynamo_client=mock_dynamo_client,
        sns_client=mock_sns_client,
        reindexer_topic_arn=test_topic_arn,
        source_table_name=test_dynamo_table,
    )


def test_all_records(deletion_check_initiator, put_records, get_test_topic_messages):
    n_records = 2000
    lots_of_records = ({"id": f"{i:05d}"} for i in range(n_records))
    put_records(lots_of_records)
    deletion_check_initiator.all_records()
    messages = list(get_test_topic_messages())
    assert len(messages) >= (n_records / 1000)
    first_message = messages[0]
    assert first_message["jobConfigId"] == "calm--calm_deletion_checker"
    assert first_message["parameters"]["type"] == "CompleteReindexParameters"
    total_segments = first_message["parameters"]["totalSegments"]
    assert max([m["parameters"]["segment"] for m in messages]) == (total_segments - 1)


def test_specific_records(
    deletion_check_initiator, put_records, get_test_topic_messages
):
    records = [{"id": f"{i:05d}"} for i in range(10)]
    put_records(records)
    chosen_record_ids = [r["id"] for r in records[:3]]
    deletion_check_initiator.specific_records(chosen_record_ids)
    messages = list(get_test_topic_messages())
    assert len(messages) == 1
    assert messages[0]["parameters"]["type"] == "SpecificReindexParameters"
    assert messages[0]["parameters"]["ids"] == chosen_record_ids


def test_specific_records_existence_check(deletion_check_initiator, put_records):
    records = [{"id": f"{i:05d}"} for i in range(10)]
    put_records(records)
    with pytest.raises(Exception) as e:
        deletion_check_initiator.specific_records(["not-stored-record"])
...
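The mock_dynamo_client fixture used above is defined elsewhere in the suite. Pointed at LocalStack, it could be as simple as this sketch (the endpoint and credentials are placeholder assumptions; LocalStack accepts any local credentials):

import boto3
import pytest

@pytest.fixture(scope="function")
def mock_dynamo_client():
    # DynamoDB client aimed at LocalStack's default edge endpoint.
    yield boto3.client(
        "dynamodb",
        endpoint_url="http://localhost:4566",
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )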


