How to use the tag_delivery_stream method in LocalStack

Best Python code snippets using localstack_python

test_firehose_tags.py

Source:test_firehose_tags.py Github

copy

Full Screen

...60 assert len(result["Tags"]) == number_of_tags61 assert result["Tags"] == tags62 assert result["HasMoreTags"] is False63@mock_firehose64def test_tag_delivery_stream():65 """Test successful, failed invocations of tag_delivery_stream()."""66 client = boto3.client("firehose", region_name=TEST_REGION)67 # Create a delivery stream for testing purposes.68 stream_name = f"test_tags_{get_random_hex(6)}"69 client.create_delivery_stream(70 DeliveryStreamName=stream_name,71 ExtendedS3DestinationConfiguration=sample_s3_dest_config(),72 )73 # Unknown stream name.74 unknown_name = "foo"75 with pytest.raises(ClientError) as exc:76 client.tag_delivery_stream(77 DeliveryStreamName=unknown_name, Tags=[{"Key": "foo", "Value": "bar"}]78 )79 err = exc.value.response["Error"]80 assert err["Code"] == "ResourceNotFoundException"81 assert (82 f"Firehose {unknown_name} under account {ACCOUNT_ID} not found"83 in err["Message"]84 )85 # Too many tags.86 with pytest.raises(ClientError) as exc:87 client.tag_delivery_stream(88 DeliveryStreamName=stream_name,89 Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(51)],90 )91 err = exc.value.response["Error"]92 assert err["Code"] == "ValidationException"93 assert (94 f"failed to satisify contstraint: Member must have length "95 f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}"96 ) in err["Message"]97 # Bad tags.98 with pytest.raises(ClientError) as exc:99 client.tag_delivery_stream(100 DeliveryStreamName=stream_name, Tags=[{"Key": "foo!", "Value": "bar"}]101 )102 err = exc.value.response["Error"]103 assert err["Code"] == "ValidationException"104 assert (105 "1 validation error detected: Value 'foo!' 
at 'tags.1.member.key' "106 "failed to satisfy constraint: Member must satisfy regular "107 "expression pattern"108 ) in err["Message"]109 # Successful addition of tags.110 added_tags = [111 {"Key": f"{x}", "Value": f"{x}"} for x in range(MAX_TAGS_PER_DELIVERY_STREAM)112 ]113 client.tag_delivery_stream(DeliveryStreamName=stream_name, Tags=added_tags)114 results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)115 assert len(results["Tags"]) == MAX_TAGS_PER_DELIVERY_STREAM116 assert results["Tags"] == added_tags117@mock_firehose118def test_untag_delivery_stream():119 """Test successful, failed invocations of untag_delivery_stream()."""120 client = boto3.client("firehose", region_name=TEST_REGION)121 # Create a delivery stream for testing purposes.122 stream_name = f"test_untag_{get_random_hex(6)}"123 tag_list = [124 {"Key": "one", "Value": "1"},125 {"Key": "two", "Value": "2"},126 {"Key": "three", "Value": "3"},127 ]128 client.create_delivery_stream(129 DeliveryStreamName=stream_name,130 ExtendedS3DestinationConfiguration=sample_s3_dest_config(),131 Tags=tag_list,132 )133 # Untag all of the tags. Verify there are no more tags.134 tag_keys = [x["Key"] for x in tag_list]135 client.untag_delivery_stream(DeliveryStreamName=stream_name, TagKeys=tag_keys)136 results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)137 assert not results["Tags"]...

Full Screen

Full Screen

responses.py

Source:responses.py Github

copy

Full Screen

...66 result = self.firehose_backend.put_record_batch(67 self._get_param("DeliveryStreamName"), self._get_param("Records")68 )69 return json.dumps(result)70 def tag_delivery_stream(self):71 """Prepare arguments and respond to TagDeliveryStream request."""72 self.firehose_backend.tag_delivery_stream(73 self._get_param("DeliveryStreamName"), self._get_param("Tags"),74 )75 return json.dumps({})76 def untag_delivery_stream(self):77 """Prepare arguments and respond to UntagDeliveryStream()."""78 self.firehose_backend.untag_delivery_stream(79 self._get_param("DeliveryStreamName"), self._get_param("TagKeys"),80 )81 return json.dumps({})82 def update_destination(self):83 """Prepare arguments and respond to UpdateDestination()."""84 self.firehose_backend.update_destination(85 self._get_param("DeliveryStreamName"),86 self._get_param("CurrentDeliveryStreamVersionId"),87 self._get_param("DestinationId"),88 self._get_param("S3DestinationUpdate"),89 self._get_param("ExtendedS3DestinationUpdate"),90 self._get_param("S3BackupMode"),91 self._get_param("RedshiftDestinationUpdate"),92 self._get_param("ElasticsearchDestinationUpdate"),...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

...31 def start_delivery_stream_encryption(self, DeliveryStreamName: str) -> Dict:32 pass33 def stop_delivery_stream_encryption(self, DeliveryStreamName: str) -> Dict:34 pass35 def tag_delivery_stream(self, DeliveryStreamName: str, Tags: List) -> Dict:36 pass37 def untag_delivery_stream(self, DeliveryStreamName: str, TagKeys: List) -> Dict:38 pass39 def update_destination(self, DeliveryStreamName: str, CurrentDeliveryStreamVersionId: str, DestinationId: str, S3DestinationUpdate: Dict = None, ExtendedS3DestinationUpdate: Dict = None, RedshiftDestinationUpdate: Dict = None, ElasticsearchDestinationUpdate: Dict = None, SplunkDestinationUpdate: Dict = None) -> Dict:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful