How to use the set_topic_attributes method in localstack

Best Python code snippet using localstack_python

test_sns.py

Source:test_sns.py Github

copy

Full Screen

...22 client = session_factory().client("sns")23 name = "test-sns-remove-matched"24 topic_arn = client.create_topic(Name=name)["TopicArn"]25 self.addCleanup(client.delete_topic, TopicArn=topic_arn)26 client.set_topic_attributes(27 TopicArn=topic_arn,28 AttributeName="Policy",29 AttributeValue=json.dumps(30 {31 "Version": "2012-10-17",32 "Statement": [33 {34 "Sid": "SpecificAllow",35 "Effect": "Allow",36 "Principal": {"AWS": "arn:aws:iam::644160558196:root"},37 "Action": ["SNS:Subscribe"],38 "Resource": topic_arn,39 },40 {41 "Sid": "Public",42 "Effect": "Allow",43 "Principal": "*",44 "Action": ["SNS:GetTopicAttributes"],45 "Resource": topic_arn,46 },47 ],48 }49 ),50 )51 p = self.load_policy(52 {53 "name": "sns-rm-matched",54 "resource": "sns",55 "filters": [56 {"TopicArn": topic_arn},57 {"type": "cross-account", "whitelist": ["123456789012"]},58 ],59 "actions": [{"type": "remove-statements", "statement_ids": "matched"}],60 },61 session_factory=session_factory,62 )63 resources = p.run()64 self.assertEqual([r["TopicArn"] for r in resources], [topic_arn])65 data = json.loads(66 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[67 "Attributes"68 ][69 "Policy"70 ]71 )72 self.assertEqual(73 [s["Sid"] for s in data.get("Statement", ())], ["SpecificAllow"]74 )75 @functional76 def test_sns_remove_named(self):77 session_factory = self.replay_flight_data("test_sns_remove_named")78 client = session_factory().client("sns")79 name = "test-sns-remove-named"80 topic_arn = client.create_topic(Name=name)["TopicArn"]81 self.addCleanup(client.delete_topic, TopicArn=topic_arn)82 client.set_topic_attributes(83 TopicArn=topic_arn,84 AttributeName="Policy",85 AttributeValue=json.dumps(86 {87 "Version": "2012-10-17",88 "Statement": [89 {90 "Sid": "SpecificAllow",91 "Effect": "Allow",92 "Principal": "*",93 "Action": ["SNS:Subscribe"],94 "Resource": topic_arn,95 },96 {97 "Sid": "RemoveMe",98 "Effect": "Allow",99 "Principal": "*",100 "Action": ["SNS:GetTopicAttributes"],101 
"Resource": topic_arn,102 },103 ],104 }105 ),106 )107 p = self.load_policy(108 {109 "name": "sns-rm-named",110 "resource": "sns",111 "filters": [{"TopicArn": topic_arn}],112 "actions": [113 {"type": "remove-statements", "statement_ids": ["RemoveMe"]}114 ],115 },116 session_factory=session_factory,117 )118 resources = p.run()119 self.assertEqual(len(resources), 1)120 data = json.loads(121 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[122 "Attributes"123 ][124 "Policy"125 ]126 )127 self.assertTrue("RemoveMe" not in [s["Sid"] for s in data.get("Statement", ())])128 @functional129 def test_sns_modify_replace_policy(self):130 session_factory = self.replay_flight_data("test_sns_modify_replace_policy")131 client = session_factory().client("sns")132 name = "test_sns_modify_replace_policy"133 topic_arn = client.create_topic(Name=name)["TopicArn"]134 self.addCleanup(client.delete_topic, TopicArn=topic_arn)135 client.set_topic_attributes(136 TopicArn=topic_arn,137 AttributeName="Policy",138 AttributeValue=json.dumps(139 {140 "Version": "2012-10-17",141 "Statement": [142 {143 "Sid": "SpecificAllow",144 "Effect": "Allow",145 "Principal": "*",146 "Action": ["SNS:Subscribe"],147 "Resource": topic_arn,148 }149 ],150 }151 ),152 )153 p = self.load_policy(154 {155 "name": "sns-modify-replace-policy",156 "resource": "sns",157 "filters": [{"TopicArn": topic_arn}],158 "actions": [159 {160 "type": "modify-policy",161 "add-statements": [162 {163 "Sid": "ReplaceWithMe",164 "Effect": "Allow",165 "Principal": "*",166 "Action": ["SNS:GetTopicAttributes"],167 "Resource": topic_arn,168 }169 ],170 "remove-statements": "*",171 }172 ],173 },174 session_factory=session_factory,175 )176 resources = p.run()177 self.assertEqual(len(resources), 1)178 data = json.loads(179 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[180 "Attributes"181 ][182 "Policy"183 ]184 )185 self.assertTrue(186 "ReplaceWithMe" in [s["Sid"] for s in data.get("Statement", ())]187 )188 
@functional189 def test_sns_account_id_template(self):190 session_factory = self.replay_flight_data("test_sns_account_id_template")191 client = session_factory().client("sns")192 name = "test_sns_account_id_template"193 topic_arn = client.create_topic(Name=name)["TopicArn"]194 self.addCleanup(client.delete_topic, TopicArn=topic_arn)195 client.set_topic_attributes(196 TopicArn=topic_arn,197 AttributeName="Policy",198 AttributeValue=json.dumps(199 {200 "Version": "2012-10-17",201 "Statement": [202 {203 "Sid": "SpecificAllow",204 "Effect": "Allow",205 "Principal": "*",206 "Action": ["SNS:Subscribe"],207 "Resource": topic_arn,208 }209 ],210 }211 ),212 )213 p = self.load_policy(214 {215 "name": "sns-modify-replace-policy",216 "resource": "sns",217 "filters": [{"TopicArn": topic_arn}],218 "actions": [219 {220 "type": "modify-policy",221 "add-statements": [222 {223 "Sid": "__default_statement_ID_{account_id}",224 "Effect": "Allow",225 "Principal": {"Service": "s3.amazonaws.com"},226 "Action": "SNS:Publish",227 "Resource": topic_arn,228 "Condition": {229 "StringEquals": {230 "AWS:SourceAccount": "{account_id}"231 },232 "ArnLike": {"aws:SourceArn": "arn:aws:s3:*:*:*"},233 },234 }235 ],236 "remove-statements": "*",237 }238 ],239 },240 session_factory=session_factory,241 )242 resources = p.run()243 self.assertEqual(len(resources), 1)244 data = json.loads(245 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[246 "Attributes"247 ][248 "Policy"249 ]250 )251 self.assertTrue(252 "__default_statement_ID_" +253 self.account_id in [s["Sid"] for s in data.get("Statement", ())]254 )255 @functional256 def test_sns_modify_remove_policy(self):257 session_factory = self.replay_flight_data("test_sns_modify_remove_policy")258 client = session_factory().client("sns")259 name = "test_sns_modify_remove_policy"260 topic_arn = client.create_topic(Name=name)["TopicArn"]261 self.addCleanup(client.delete_topic, TopicArn=topic_arn)262 client.set_topic_attributes(263 
TopicArn=topic_arn,264 AttributeName="Policy",265 AttributeValue=json.dumps(266 {267 "Version": "2012-10-17",268 "Statement": [269 {270 "Sid": "SpecificAllow",271 "Effect": "Allow",272 "Principal": "*",273 "Action": ["SNS:Subscribe"],274 "Resource": topic_arn,275 },276 {277 "Sid": "RemoveMe",278 "Effect": "Allow",279 "Principal": "*",280 "Action": ["SNS:GetTopicAttributes"],281 "Resource": topic_arn,282 },283 ],284 }285 ),286 )287 p = self.load_policy(288 {289 "name": "sns-modify-remove-policy",290 "resource": "sns",291 "filters": [{"TopicArn": topic_arn}],292 "actions": [293 {294 "type": "modify-policy",295 "add-statements": [],296 "remove-statements": ["RemoveMe"],297 }298 ],299 },300 session_factory=session_factory,301 )302 resources = p.run()303 self.assertEqual(len(resources), 1)304 data = json.loads(305 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[306 "Attributes"307 ][308 "Policy"309 ]310 )311 self.assertTrue("RemoveMe" not in [s["Sid"] for s in data.get("Statement", ())])312 @functional313 def test_sns_modify_add_policy(self):314 session_factory = self.replay_flight_data("test_sns_modify_add_policy")315 client = session_factory().client("sns")316 name = "test_sns_modify_add_policy"317 topic_arn = client.create_topic(Name=name)["TopicArn"]318 self.addCleanup(client.delete_topic, TopicArn=topic_arn)319 client.set_topic_attributes(320 TopicArn=topic_arn,321 AttributeName="Policy",322 AttributeValue=json.dumps(323 {324 "Version": "2012-10-17",325 "Statement": [326 {327 "Sid": "SpecificAllow",328 "Effect": "Allow",329 "Principal": "*",330 "Action": ["SNS:Subscribe"],331 "Resource": topic_arn,332 }333 ],334 }335 ),336 )337 p = self.load_policy(338 {339 "name": "sns-modify-add-policy",340 "resource": "sns",341 "filters": [{"TopicArn": topic_arn}],342 "actions": [343 {344 "type": "modify-policy",345 "add-statements": [346 {347 "Sid": "AddMe",348 "Effect": "Allow",349 "Principal": "*",350 "Action": ["SNS:GetTopicAttributes"],351 "Resource": 
topic_arn,352 }353 ],354 "remove-statements": [],355 }356 ],357 },358 session_factory=session_factory,359 )360 resources = p.run()361 self.assertEqual(len(resources), 1)362 data = json.loads(363 client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[364 "Attributes"365 ][366 "Policy"367 ]368 )369 self.assertTrue("AddMe" in [s["Sid"] for s in data.get("Statement", ())])370 @functional371 def test_sns_modify_add_and_remove_policy(self):372 session_factory = self.replay_flight_data(373 "test_sns_modify_add_and_remove_policy"374 )375 client = session_factory().client("sns")376 name = "test_sns_modify_add_and_remove_policy"377 topic_arn = client.create_topic(Name=name)["TopicArn"]378 self.addCleanup(client.delete_topic, TopicArn=topic_arn)379 client.set_topic_attributes(380 TopicArn=topic_arn,381 AttributeName="Policy",382 AttributeValue=json.dumps(383 {384 "Version": "2012-10-17",385 "Statement": [386 {387 "Sid": "SpecificAllow",388 "Effect": "Allow",389 "Principal": "*",390 "Action": ["SNS:Subscribe"],391 "Resource": topic_arn,392 },393 {...

Full Screen

Full Screen

create_sns_topics.py

Source:create_sns_topics.py Github

copy

Full Screen

...37 scale_down_topic = sns_client.create_topic(38 Name=awsvars['scaleDownTopicName'],39 )40 print("Scale Down Topic Created is: ", scale_down_topic)41 set_topic_attributes(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic)42def set_topic_attributes(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic):43 scale_up_response = sns_client.set_topic_attributes(44 TopicArn=scale_up_topic['TopicArn'],45 AttributeName=awsvars['attributeName'],46 AttributeValue=awsvars['scaleUpNotificationName']47 )48 print("Scale Up notification created is: ", scale_up_response)49 scale_down_response = sns_client.set_topic_attributes(50 TopicArn=scale_down_topic['TopicArn'],51 AttributeName=awsvars['attributeName'],52 AttributeValue=awsvars['scaleDownNotificationName']53 )54 print("Scale Down notification created is: ", scale_down_response)55 subscribe_to_topics(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic)56def subscribe_to_topics(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic):57 sns_subscribe_up = sns_client.subscribe(58 TopicArn=scale_up_topic['TopicArn'],59 Protocol=awsvars['topicProtocol'],60 Endpoint=awsvars['emailAddress'],61 ReturnSubscriptionArn=True62 )63 print("Subscribing to Scale Up Events: ", sns_subscribe_up)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful