How to use get_topic_attributes method in localstack

Best Python code snippet using localstack_python

test_sns.py

Source:test_sns.py Github

copy

Full Screen

...62 )63 resources = p.run()64 self.assertEqual([r["TopicArn"] for r in resources], [topic_arn])65 data = json.loads(66 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[67 "Attributes"68 ][69 "Policy"70 ]71 )72 self.assertEqual(73 [s["Sid"] for s in data.get("Statement", ())], ["SpecificAllow"]74 )75 @functional76 def test_sns_remove_named(self):77 session_factory = self.replay_flight_data("test_sns_remove_named")78 client = session_factory().client("sns")79 name = "test-sns-remove-named"80 topic_arn = client.create_topic(Name=name)["TopicArn"]81 self.addCleanup(client.delete_topic, TopicArn=topic_arn)82 client.set_topic_attributes(83 TopicArn=topic_arn,84 AttributeName="Policy",85 AttributeValue=json.dumps(86 {87 "Version": "2012-10-17",88 "Statement": [89 {90 "Sid": "SpecificAllow",91 "Effect": "Allow",92 "Principal": "*",93 "Action": ["SNS:Subscribe"],94 "Resource": topic_arn,95 },96 {97 "Sid": "RemoveMe",98 "Effect": "Allow",99 "Principal": "*",100 "Action": ["SNS:GetTopicAttributes"],101 "Resource": topic_arn,102 },103 ],104 }105 ),106 )107 p = self.load_policy(108 {109 "name": "sns-rm-named",110 "resource": "sns",111 "filters": [{"TopicArn": topic_arn}],112 "actions": [113 {"type": "remove-statements", "statement_ids": ["RemoveMe"]}114 ],115 },116 session_factory=session_factory,117 )118 resources = p.run()119 self.assertEqual(len(resources), 1)120 data = json.loads(121 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[122 "Attributes"123 ][124 "Policy"125 ]126 )127 self.assertTrue("RemoveMe" not in [s["Sid"] for s in data.get("Statement", ())])128 @functional129 def test_sns_modify_replace_policy(self):130 session_factory = self.replay_flight_data("test_sns_modify_replace_policy")131 client = session_factory().client("sns")132 name = "test_sns_modify_replace_policy"133 topic_arn = client.create_topic(Name=name)["TopicArn"]134 self.addCleanup(client.delete_topic, TopicArn=topic_arn)135 client.set_topic_attributes(136 
TopicArn=topic_arn,137 AttributeName="Policy",138 AttributeValue=json.dumps(139 {140 "Version": "2012-10-17",141 "Statement": [142 {143 "Sid": "SpecificAllow",144 "Effect": "Allow",145 "Principal": "*",146 "Action": ["SNS:Subscribe"],147 "Resource": topic_arn,148 }149 ],150 }151 ),152 )153 p = self.load_policy(154 {155 "name": "sns-modify-replace-policy",156 "resource": "sns",157 "filters": [{"TopicArn": topic_arn}],158 "actions": [159 {160 "type": "modify-policy",161 "add-statements": [162 {163 "Sid": "ReplaceWithMe",164 "Effect": "Allow",165 "Principal": "*",166 "Action": ["SNS:GetTopicAttributes"],167 "Resource": topic_arn,168 }169 ],170 "remove-statements": "*",171 }172 ],173 },174 session_factory=session_factory,175 )176 resources = p.run()177 self.assertEqual(len(resources), 1)178 data = json.loads(179 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[180 "Attributes"181 ][182 "Policy"183 ]184 )185 self.assertTrue(186 "ReplaceWithMe" in [s["Sid"] for s in data.get("Statement", ())]187 )188 @functional189 def test_sns_account_id_template(self):190 session_factory = self.replay_flight_data("test_sns_account_id_template")191 client = session_factory().client("sns")192 name = "test_sns_account_id_template"193 topic_arn = client.create_topic(Name=name)["TopicArn"]194 self.addCleanup(client.delete_topic, TopicArn=topic_arn)195 client.set_topic_attributes(196 TopicArn=topic_arn,197 AttributeName="Policy",198 AttributeValue=json.dumps(199 {200 "Version": "2012-10-17",201 "Statement": [202 {203 "Sid": "SpecificAllow",204 "Effect": "Allow",205 "Principal": "*",206 "Action": ["SNS:Subscribe"],207 "Resource": topic_arn,208 }209 ],210 }211 ),212 )213 p = self.load_policy(214 {215 "name": "sns-modify-replace-policy",216 "resource": "sns",217 "filters": [{"TopicArn": topic_arn}],218 "actions": [219 {220 "type": "modify-policy",221 "add-statements": [222 {223 "Sid": "__default_statement_ID_{account_id}",224 "Effect": "Allow",225 "Principal": {"Service": 
"s3.amazonaws.com"},226 "Action": "SNS:Publish",227 "Resource": topic_arn,228 "Condition": {229 "StringEquals": {230 "AWS:SourceAccount": "{account_id}"231 },232 "ArnLike": {"aws:SourceArn": "arn:aws:s3:*:*:*"},233 },234 }235 ],236 "remove-statements": "*",237 }238 ],239 },240 session_factory=session_factory,241 )242 resources = p.run()243 self.assertEqual(len(resources), 1)244 data = json.loads(245 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[246 "Attributes"247 ][248 "Policy"249 ]250 )251 self.assertTrue(252 "__default_statement_ID_" +253 self.account_id in [s["Sid"] for s in data.get("Statement", ())]254 )255 @functional256 def test_sns_modify_remove_policy(self):257 session_factory = self.replay_flight_data("test_sns_modify_remove_policy")258 client = session_factory().client("sns")259 name = "test_sns_modify_remove_policy"260 topic_arn = client.create_topic(Name=name)["TopicArn"]261 self.addCleanup(client.delete_topic, TopicArn=topic_arn)262 client.set_topic_attributes(263 TopicArn=topic_arn,264 AttributeName="Policy",265 AttributeValue=json.dumps(266 {267 "Version": "2012-10-17",268 "Statement": [269 {270 "Sid": "SpecificAllow",271 "Effect": "Allow",272 "Principal": "*",273 "Action": ["SNS:Subscribe"],274 "Resource": topic_arn,275 },276 {277 "Sid": "RemoveMe",278 "Effect": "Allow",279 "Principal": "*",280 "Action": ["SNS:GetTopicAttributes"],281 "Resource": topic_arn,282 },283 ],284 }285 ),286 )287 p = self.load_policy(288 {289 "name": "sns-modify-remove-policy",290 "resource": "sns",291 "filters": [{"TopicArn": topic_arn}],292 "actions": [293 {294 "type": "modify-policy",295 "add-statements": [],296 "remove-statements": ["RemoveMe"],297 }298 ],299 },300 session_factory=session_factory,301 )302 resources = p.run()303 self.assertEqual(len(resources), 1)304 data = json.loads(305 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[306 "Attributes"307 ][308 "Policy"309 ]310 )311 self.assertTrue("RemoveMe" not in [s["Sid"] for s in 
data.get("Statement", ())])312 @functional313 def test_sns_modify_add_policy(self):314 session_factory = self.replay_flight_data("test_sns_modify_add_policy")315 client = session_factory().client("sns")316 name = "test_sns_modify_add_policy"317 topic_arn = client.create_topic(Name=name)["TopicArn"]318 self.addCleanup(client.delete_topic, TopicArn=topic_arn)319 client.set_topic_attributes(320 TopicArn=topic_arn,321 AttributeName="Policy",322 AttributeValue=json.dumps(323 {324 "Version": "2012-10-17",325 "Statement": [326 {327 "Sid": "SpecificAllow",328 "Effect": "Allow",329 "Principal": "*",330 "Action": ["SNS:Subscribe"],331 "Resource": topic_arn,332 }333 ],334 }335 ),336 )337 p = self.load_policy(338 {339 "name": "sns-modify-add-policy",340 "resource": "sns",341 "filters": [{"TopicArn": topic_arn}],342 "actions": [343 {344 "type": "modify-policy",345 "add-statements": [346 {347 "Sid": "AddMe",348 "Effect": "Allow",349 "Principal": "*",350 "Action": ["SNS:GetTopicAttributes"],351 "Resource": topic_arn,352 }353 ],354 "remove-statements": [],355 }356 ],357 },358 session_factory=session_factory,359 )360 resources = p.run()361 self.assertEqual(len(resources), 1)362 data = json.loads(363 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[364 "Attributes"365 ][366 "Policy"367 ]368 )369 self.assertTrue("AddMe" in [s["Sid"] for s in data.get("Statement", ())])370 @functional371 def test_sns_modify_add_and_remove_policy(self):372 session_factory = self.replay_flight_data(373 "test_sns_modify_add_and_remove_policy"374 )375 client = session_factory().client("sns")376 name = "test_sns_modify_add_and_remove_policy"377 topic_arn = client.create_topic(Name=name)["TopicArn"]378 self.addCleanup(client.delete_topic, TopicArn=topic_arn)379 client.set_topic_attributes(380 TopicArn=topic_arn,381 AttributeName="Policy",382 AttributeValue=json.dumps(383 {384 "Version": "2012-10-17",385 "Statement": [386 {387 "Sid": "SpecificAllow",388 "Effect": "Allow",389 "Principal": 
"*",390 "Action": ["SNS:Subscribe"],391 "Resource": topic_arn,392 },393 {394 "Sid": "RemoveMe",395 "Effect": "Allow",396 "Principal": "*",397 "Action": ["SNS:GetTopicAttributes"],398 "Resource": topic_arn,399 },400 ],401 }402 ),403 )404 p = self.load_policy(405 {406 "name": "sns-modify-add-and-remove-policy",407 "resource": "sns",408 "filters": [{"TopicArn": topic_arn}],409 "actions": [410 {411 "type": "modify-policy",412 "add-statements": [413 {414 "Sid": "AddMe",415 "Effect": "Allow",416 "Principal": "*",417 "Action": ["SNS:GetTopicAttributes"],418 "Resource": topic_arn,419 }420 ],421 "remove-statements": ["RemoveMe"],422 }423 ],424 },425 session_factory=session_factory,426 )427 resources = p.run()428 self.assertEqual(len(resources), 1)429 data = json.loads(430 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[431 "Attributes"432 ][433 "Policy"434 ]435 )436 statement_ids = {s["Sid"] for s in data.get("Statement", ())}437 self.assertTrue("AddMe" in statement_ids)438 self.assertTrue("RemoveMe" not in statement_ids)439 self.assertTrue("SpecificAllow" in statement_ids)440 def test_sns_topic_encryption(self):441 session_factory = self.replay_flight_data('test_sns_kms_related_filter_test')442 kms = session_factory().client('kms')443 p = self.load_policy(444 {445 'name': 'test-sns-kms-related-filter',446 'resource': 'sns',447 'filters': [448 {449 'TopicArn': 'arn:aws:sns:us-east-1:644160558196:test'450 },451 {452 'type': 'kms-key',453 'key': 'c7n:AliasName',454 'value': 'alias/skunk/trails'455 }456 ]457 },458 session_factory=session_factory459 )460 resources = p.run()461 self.assertTrue(len(resources), 1)462 aliases = kms.list_aliases(KeyId=resources[0]['KmsMasterKeyId'])463 self.assertEqual(aliases['Aliases'][0]['AliasName'], 'alias/skunk/trails')464 def test_set_sns_topic_encryption(self):465 session_factory = self.replay_flight_data('test_sns_set_encryption')466 topic = 'arn:aws:sns:us-west-1:644160558196:test'467 p = self.load_policy(468 {469 'name': 
'test-sns-kms-related-filter',470 'resource': 'sns',471 'filters': [472 {473 'TopicArn': topic474 },475 {476 'KmsMasterKeyId': 'absent'477 }478 ],479 'actions': [480 {481 'type': 'set-encryption'482 }483 ]484 },485 session_factory=session_factory486 )487 resources = p.run()488 self.assertEqual(len(resources), 1)489 sns = session_factory().client('sns')490 attributes = sns.get_topic_attributes(TopicArn=topic)491 self.assertTrue(attributes['Attributes']['KmsMasterKeyId'], 'alias/aws/sns')492 def test_sns_disable_encryption(self):493 session_factory = self.replay_flight_data('test_sns_unset_encryption')494 topic = 'arn:aws:sns:us-west-1:644160558196:test'495 p = self.load_policy(496 {497 'name': 'test-sns-kms-related-filter',498 'resource': 'sns',499 'filters': [500 {501 'TopicArn': topic502 },503 {504 'KmsMasterKeyId': 'alias/aws/sns'505 }506 ],507 'actions': [508 {509 'type': 'set-encryption',510 'enabled': False511 }512 ]513 },514 session_factory=session_factory515 )516 resources = p.run()517 self.assertEqual(len(resources), 1)518 sns = session_factory().client('sns')519 attributes = sns.get_topic_attributes(TopicArn=topic)['Attributes']520 self.assertFalse(attributes.get('KmsMasterKeyId'))521 def test_sns_set_encryption_custom_key(self):522 session_factory = self.replay_flight_data('test_sns_set_encryption_custom_key')523 topic = 'arn:aws:sns:us-west-1:644160558196:test'524 key_alias = 'alias/alias/test/key'525 sns = session_factory().client('sns')526 p = self.load_policy(527 {528 'name': 'test-sns-kms-related-filter-alias',529 'resource': 'sns',530 'filters': [531 {532 'TopicArn': topic533 },534 {535 'KmsMasterKeyId': 'absent'536 }537 ],538 'actions': [539 {540 'type': 'set-encryption',541 'key': key_alias542 }543 ]544 },545 session_factory=session_factory546 )547 resources = p.run()548 self.assertEqual(len(resources), 1)549 attributes = sns.get_topic_attributes(TopicArn=topic)['Attributes']550 self.assertEqual(attributes.get('KmsMasterKeyId'), key_alias)551 def 
test_sns_delete(self):552 session_factory = self.replay_flight_data('test_sns_delete_topic')553 policy = """554 name: delete-sns555 resource: aws.sns556 filters:557 - TopicArn: arn:aws:sns:us-west-1:644160558196:test558 actions:559 - type: delete560 """561 p = self.load_policy(yaml_load(policy), session_factory=session_factory)562 resources = p.run()563 self.assertEqual(len(resources), 1)...

Full Screen

Full Screen

sns_barrel.py

Source:sns_barrel.py Github

copy

Full Screen

...23 'list_topics': ['Topics']24 }25 def __init__(self, oil, **kwargs):26 super().__init__(oil, **kwargs)27 def get_topic_attributes(self):28 """29 Example30 barrel = ELBBarrel()31 results = barrel.describe_load_balancer_attributes()32 Returns33 {34 'us-east-1': {35 'ALoadBalancer': {36 ... # Attributes like CrossZoneLoadBalancing, AccessLog37 }38 }39 }40 Depends on describe_load_balancers41 """42 items = {}43 for region, client in self.clients.items():44 items[region] = {}45 for topic in self.tap('list_topics')[region]:46 response = client.get_topic_attributes(47 TopicArn=topic['TopicArn']48 )49 items[region][topic['TopicArn']] = response['Attributes']...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful