Best Python code snippet using localstack_python
test_sns.py
Source:test_sns.py  
...22        client = session_factory().client("sns")23        name = "test-sns-remove-matched"24        topic_arn = client.create_topic(Name=name)["TopicArn"]25        self.addCleanup(client.delete_topic, TopicArn=topic_arn)26        client.set_topic_attributes(27            TopicArn=topic_arn,28            AttributeName="Policy",29            AttributeValue=json.dumps(30                {31                    "Version": "2012-10-17",32                    "Statement": [33                        {34                            "Sid": "SpecificAllow",35                            "Effect": "Allow",36                            "Principal": {"AWS": "arn:aws:iam::644160558196:root"},37                            "Action": ["SNS:Subscribe"],38                            "Resource": topic_arn,39                        },40                        {41                            "Sid": "Public",42                            "Effect": "Allow",43                            "Principal": "*",44                            "Action": ["SNS:GetTopicAttributes"],45                            "Resource": topic_arn,46                        },47                    ],48                }49            ),50        )51        p = self.load_policy(52            {53                "name": "sns-rm-matched",54                "resource": "sns",55                "filters": [56                    {"TopicArn": topic_arn},57                    {"type": "cross-account", "whitelist": ["123456789012"]},58                ],59                "actions": [{"type": "remove-statements", "statement_ids": "matched"}],60            },61            session_factory=session_factory,62        )63        resources = p.run()64        self.assertEqual([r["TopicArn"] for r in resources], [topic_arn])65        data = json.loads(66            client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[67                "Attributes"68            ][69                "Policy"70            ]71        )72        
self.assertEqual(73            [s["Sid"] for s in data.get("Statement", ())], ["SpecificAllow"]74        )75    @functional76    def test_sns_remove_named(self):77        session_factory = self.replay_flight_data("test_sns_remove_named")78        client = session_factory().client("sns")79        name = "test-sns-remove-named"80        topic_arn = client.create_topic(Name=name)["TopicArn"]81        self.addCleanup(client.delete_topic, TopicArn=topic_arn)82        client.set_topic_attributes(83            TopicArn=topic_arn,84            AttributeName="Policy",85            AttributeValue=json.dumps(86                {87                    "Version": "2012-10-17",88                    "Statement": [89                        {90                            "Sid": "SpecificAllow",91                            "Effect": "Allow",92                            "Principal": "*",93                            "Action": ["SNS:Subscribe"],94                            "Resource": topic_arn,95                        },96                        {97                            "Sid": "RemoveMe",98                            "Effect": "Allow",99                            "Principal": "*",100                            "Action": ["SNS:GetTopicAttributes"],101                            "Resource": topic_arn,102                        },103                    ],104                }105            ),106        )107        p = self.load_policy(108            {109                "name": "sns-rm-named",110                "resource": "sns",111                "filters": [{"TopicArn": topic_arn}],112                "actions": [113                    {"type": "remove-statements", "statement_ids": ["RemoveMe"]}114                ],115            },116            session_factory=session_factory,117        )118        resources = p.run()119        self.assertEqual(len(resources), 1)120        data = json.loads(121            client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[122       
         "Attributes"123            ][124                "Policy"125            ]126        )127        self.assertTrue("RemoveMe" not in [s["Sid"] for s in data.get("Statement", ())])128    @functional129    def test_sns_modify_replace_policy(self):130        session_factory = self.replay_flight_data("test_sns_modify_replace_policy")131        client = session_factory().client("sns")132        name = "test_sns_modify_replace_policy"133        topic_arn = client.create_topic(Name=name)["TopicArn"]134        self.addCleanup(client.delete_topic, TopicArn=topic_arn)135        client.set_topic_attributes(136            TopicArn=topic_arn,137            AttributeName="Policy",138            AttributeValue=json.dumps(139                {140                    "Version": "2012-10-17",141                    "Statement": [142                        {143                            "Sid": "SpecificAllow",144                            "Effect": "Allow",145                            "Principal": "*",146                            "Action": ["SNS:Subscribe"],147                            "Resource": topic_arn,148                        }149                    ],150                }151            ),152        )153        p = self.load_policy(154            {155                "name": "sns-modify-replace-policy",156                "resource": "sns",157                "filters": [{"TopicArn": topic_arn}],158                "actions": [159                    {160                        "type": "modify-policy",161                        "add-statements": [162                            {163                                "Sid": "ReplaceWithMe",164                                "Effect": "Allow",165                                "Principal": "*",166                                "Action": ["SNS:GetTopicAttributes"],167                                "Resource": topic_arn,168                            }169                        ],170                        "remove-statements": 
"*",171                    }172                ],173            },174            session_factory=session_factory,175        )176        resources = p.run()177        self.assertEqual(len(resources), 1)178        data = json.loads(179            client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[180                "Attributes"181            ][182                "Policy"183            ]184        )185        self.assertTrue(186            "ReplaceWithMe" in [s["Sid"] for s in data.get("Statement", ())]187        )188    @functional189    def test_sns_account_id_template(self):190        session_factory = self.replay_flight_data("test_sns_account_id_template")191        client = session_factory().client("sns")192        name = "test_sns_account_id_template"193        topic_arn = client.create_topic(Name=name)["TopicArn"]194        self.addCleanup(client.delete_topic, TopicArn=topic_arn)195        client.set_topic_attributes(196            TopicArn=topic_arn,197            AttributeName="Policy",198            AttributeValue=json.dumps(199                {200                    "Version": "2012-10-17",201                    "Statement": [202                        {203                            "Sid": "SpecificAllow",204                            "Effect": "Allow",205                            "Principal": "*",206                            "Action": ["SNS:Subscribe"],207                            "Resource": topic_arn,208                        }209                    ],210                }211            ),212        )213        p = self.load_policy(214            {215                "name": "sns-modify-replace-policy",216                "resource": "sns",217                "filters": [{"TopicArn": topic_arn}],218                "actions": [219                    {220                        "type": "modify-policy",221                        "add-statements": [222                            {223                                "Sid": 
"__default_statement_ID_{account_id}",224                                "Effect": "Allow",225                                "Principal": {"Service": "s3.amazonaws.com"},226                                "Action": "SNS:Publish",227                                "Resource": topic_arn,228                                "Condition": {229                                    "StringEquals": {230                                        "AWS:SourceAccount": "{account_id}"231                                    },232                                    "ArnLike": {"aws:SourceArn": "arn:aws:s3:*:*:*"},233                                },234                            }235                        ],236                        "remove-statements": "*",237                    }238                ],239            },240            session_factory=session_factory,241        )242        resources = p.run()243        self.assertEqual(len(resources), 1)244        data = json.loads(245            client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[246                "Attributes"247            ][248                "Policy"249            ]250        )251        self.assertTrue(252            "__default_statement_ID_" +253            self.account_id in [s["Sid"] for s in data.get("Statement", ())]254        )255    @functional256    def test_sns_modify_remove_policy(self):257        session_factory = self.replay_flight_data("test_sns_modify_remove_policy")258        client = session_factory().client("sns")259        name = "test_sns_modify_remove_policy"260        topic_arn = client.create_topic(Name=name)["TopicArn"]261        self.addCleanup(client.delete_topic, TopicArn=topic_arn)262        client.set_topic_attributes(263            TopicArn=topic_arn,264            AttributeName="Policy",265            AttributeValue=json.dumps(266                {267                    "Version": "2012-10-17",268                    "Statement": [269                        {270               
             "Sid": "SpecificAllow",271                            "Effect": "Allow",272                            "Principal": "*",273                            "Action": ["SNS:Subscribe"],274                            "Resource": topic_arn,275                        },276                        {277                            "Sid": "RemoveMe",278                            "Effect": "Allow",279                            "Principal": "*",280                            "Action": ["SNS:GetTopicAttributes"],281                            "Resource": topic_arn,282                        },283                    ],284                }285            ),286        )287        p = self.load_policy(288            {289                "name": "sns-modify-remove-policy",290                "resource": "sns",291                "filters": [{"TopicArn": topic_arn}],292                "actions": [293                    {294                        "type": "modify-policy",295                        "add-statements": [],296                        "remove-statements": ["RemoveMe"],297                    }298                ],299            },300            session_factory=session_factory,301        )302        resources = p.run()303        self.assertEqual(len(resources), 1)304        data = json.loads(305            client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[306                "Attributes"307            ][308                "Policy"309            ]310        )311        self.assertTrue("RemoveMe" not in [s["Sid"] for s in data.get("Statement", ())])312    @functional313    def test_sns_modify_add_policy(self):314        session_factory = self.replay_flight_data("test_sns_modify_add_policy")315        client = session_factory().client("sns")316        name = "test_sns_modify_add_policy"317        topic_arn = client.create_topic(Name=name)["TopicArn"]318        self.addCleanup(client.delete_topic, TopicArn=topic_arn)319        client.set_topic_attributes(320     
       TopicArn=topic_arn,321            AttributeName="Policy",322            AttributeValue=json.dumps(323                {324                    "Version": "2012-10-17",325                    "Statement": [326                        {327                            "Sid": "SpecificAllow",328                            "Effect": "Allow",329                            "Principal": "*",330                            "Action": ["SNS:Subscribe"],331                            "Resource": topic_arn,332                        }333                    ],334                }335            ),336        )337        p = self.load_policy(338            {339                "name": "sns-modify-add-policy",340                "resource": "sns",341                "filters": [{"TopicArn": topic_arn}],342                "actions": [343                    {344                        "type": "modify-policy",345                        "add-statements": [346                            {347                                "Sid": "AddMe",348                                "Effect": "Allow",349                                "Principal": "*",350                                "Action": ["SNS:GetTopicAttributes"],351                                "Resource": topic_arn,352                            }353                        ],354                        "remove-statements": [],355                    }356                ],357            },358            session_factory=session_factory,359        )360        resources = p.run()361        self.assertEqual(len(resources), 1)362        data = json.loads(363            client.get_topic_attributes(TopicArn=resources[0]["TopicArn"])[364                "Attributes"365            ][366                "Policy"367            ]368        )369        self.assertTrue("AddMe" in [s["Sid"] for s in data.get("Statement", ())])370    @functional371    def test_sns_modify_add_and_remove_policy(self):372        session_factory = self.replay_flight_data(373      
      "test_sns_modify_add_and_remove_policy"374        )375        client = session_factory().client("sns")376        name = "test_sns_modify_add_and_remove_policy"377        topic_arn = client.create_topic(Name=name)["TopicArn"]378        self.addCleanup(client.delete_topic, TopicArn=topic_arn)379        client.set_topic_attributes(380            TopicArn=topic_arn,381            AttributeName="Policy",382            AttributeValue=json.dumps(383                {384                    "Version": "2012-10-17",385                    "Statement": [386                        {387                            "Sid": "SpecificAllow",388                            "Effect": "Allow",389                            "Principal": "*",390                            "Action": ["SNS:Subscribe"],391                            "Resource": topic_arn,392                        },393                        {...create_sns_topics.py
Source:create_sns_topics.py  
...37    scale_down_topic = sns_client.create_topic(38        Name=awsvars['scaleDownTopicName'],39    )40    print("Scale Down Topic Created is: ", scale_down_topic)41    set_topic_attributes(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic)42def set_topic_attributes(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic):43    scale_up_response = sns_client.set_topic_attributes(44        TopicArn=scale_up_topic['TopicArn'],45        AttributeName=awsvars['attributeName'],46        AttributeValue=awsvars['scaleUpNotificationName']47    )48    print("Scale Up notification created is: ", scale_up_response)49    scale_down_response = sns_client.set_topic_attributes(50        TopicArn=scale_down_topic['TopicArn'],51        AttributeName=awsvars['attributeName'],52        AttributeValue=awsvars['scaleDownNotificationName']53    )54    print("Scale Down notification created is: ", scale_down_response)55    subscribe_to_topics(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic)56def subscribe_to_topics(awsvars, sns_client, asg_client, scale_up_topic, scale_down_topic):57    sns_subscribe_up = sns_client.subscribe(58        TopicArn=scale_up_topic['TopicArn'],59        Protocol=awsvars['topicProtocol'],60        Endpoint=awsvars['emailAddress'],61        ReturnSubscriptionArn=True62    )63    print("Subscribing to Scale Up Events: ", sns_subscribe_up)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
