Best Python code snippet using localstack_python
test_cdk_engine.py
Source:test_cdk_engine.py  
...50            OnFailure='ROLLBACK',51            Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']52        )53        return mocked_response54    def mock_describe_stack_events(self, stack_name: str, stack_template: dict):55        resource_id = list(stack_template["Resources"].keys())[0]56        events_finish = {57            "StackEvents": [58                {59                    "StackId": uuid4().__str__(),60                    "EventId": "1",61                    "StackName": stack_name,62                    "LogicalResourceId": resource_id,63                    "PhysicalResourceId": resource_id,64                    "ResourceType": ".",65                    "Timestamp": datetime(2021, 1, 1),66                    "ResourceStatus": "CREATE_COMPLETE",67                    "ResourceStatusReason": "...",68                    "ResourceProperties": ".",69                    "ClientRequestToken": "."70                },71            ],72            "NextToken": "string"73        }74        self.stubber.add_response("describe_stack_events", events_finish, {75            "StackName": stack_name})76        mocked_response = self.cf_client.describe_stack_events(77            StackName=stack_name78        )79        return mocked_response80    def mock_stack_template(self):81        return {82            "Resources": {83                "dp123456789123rawelectronics": {84                    "Type": "AWS::S3::Bucket",85                    "Properties": {86                        "BucketEncryption": {87                            "ServerSideEncryptionConfiguration": [88                                  {89                                      "ServerSideEncryptionByDefault": {90                                          "SSEAlgorithm": "AES256"91                                      }92                                  }93                            ]94                        },95                        "BucketName": "123456789123-raw-electronics",96               
         "LifecycleConfiguration": {97                            "Rules": [98                                {99                                      "Id": "default_trusted",100                                      "Status": "Enabled",101                                      "Transitions": [102                                          {103                                              "StorageClass": "STANDARD_IA",104                                              "TransitionInDays": 30105                                          },106                                          {107                                              "StorageClass": "GLACIER",108                                              "TransitionInDays": 60109                                          }110                                      ]111                                }112                            ]113                        },114                        "PublicAccessBlockConfiguration": {115                            "BlockPublicAcls": "true",116                            "BlockPublicPolicy": "true",117                            "IgnorePublicAcls": "true",118                            "RestrictPublicBuckets": "true"119                        }120                    },121                    "UpdateReplacePolicy": "Delete",122                    "DeletionPolicy": "Delete"123                }124            }125        }126    def get_exception_from_update_stack(self, exception: str, stack_name: str, stack_template: str):127        self.stubber.add_client_error("update_stack",128                                      service_error_code=exception,129                                      service_message=exception,130                                      http_status_code=404)131        try:132            self.cf_client.update_stack(133                StackName=stack_name,134                TemplateBody=json.dumps(stack_template),135                Capabilities=['CAPABILITY_IAM', 
'CAPABILITY_NAMED_IAM']136            )137        except Exception as error:138            return error139    def get_exception_from_create_stack(self, exception: str, stack_name: str, stack_template: str):140        self.stubber.add_client_error("create_stack",141                                      service_error_code=exception,142                                      service_message=exception,143                                      http_status_code=404)144        try:145            self.cf_client.create_stack(146                StackName=stack_name,147                TemplateBody=json.dumps(stack_template),148                OnFailure='ROLLBACK',149                Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']150            )151        except Exception as error:152            return error153    def get_exception_from_describe_stack_events(self, exception: str, stack_name: str):154        self.stubber.add_client_error("describe_stack_events",155                                      service_error_code=exception,156                                      service_message=exception,157                                      http_status_code=404)158        try:159            self.cf_client.describe_stack_events(160                StackName=stack_name161            )162        except Exception as error:163            return error164    def test_cdk_engine_app(self):165        cdk_engine = CDKEngine()166        assert isinstance(cdk_engine.app, cdk.App)167    def test_new_app(self):168        self.new_app()169    def test_update_stack(self):170        stack_template = self.mock_stack_template()171        my_stack = "my-stack"172        self.mock_cf.update_stack.side_effect = [173            self.mock_update_stack(174                stack_name=my_stack,175                stack_template=stack_template176            )177        ]178        self.mock_cf.describe_stack_events.side_effect = [179            self.mock_describe_stack_events(180                
stack_name=my_stack,181                stack_template=stack_template182            )183        ]184        self._CDKEngine__update(185            self.mock_cf, my_stack, stack_template)186    @mock.patch("plugin.infrastructure.resource.aws.cdk.engine.helpers.is_created_resource", autospec=True)187    @mock.patch("plugin.infrastructure.resource.aws.cdk.engine.helpers.boto3", autospec=True)188    def test_deploy_stack(self, mock_boto3, mock_created_resource):189        self.new_app()190        my_stack = "my-stack"191        stack = Stack(self.app, my_stack)192        kms = stack.create_kms("test-kms", "arn::kms")193        stack.create_bucket_assets(194                name="my-bucket-to-put-assets", kms=kms)195        self.mock_cf.create_stack.side_effect = [196            self.mock_create_stack(197                stack_name=my_stack,198                stack_template=self.mock_stack_template()199            )200        ]201        self.mock_cf.describe_stack_events.side_effect = [202            self.mock_describe_stack_events(203                stack_name=my_stack,204                stack_template=self.mock_stack_template()205            )206        ]207        mock_created_resource.side_effect = [False, True]208        self.deploy(stack_name=my_stack, region="us-east-1")209    def test_raises_error_when_updating_stack(self):210        stack_template = self.mock_stack_template()211        my_stack = "my-stack"212        for exception in [213            "InsufficientCapabilitiesException",214                "TokenAlreadyExistsException"]:215            self.mock_cf.update_stack.side_effect = [216                self.get_exception_from_update_stack(217                    exception=exception,218                    stack_name=my_stack,219                    stack_template=stack_template220                )221            ]222            self.mock_cf.describe_stack_events.side_effect = [223                self.mock_describe_stack_events(224                    
stack_name=my_stack,225                    stack_template=stack_template226                )227            ]228            with pytest.raises(ClientError):229                self._CDKEngine__update(self.mock_cf, my_stack, stack_template)230    @mock.patch("plugin.infrastructure.resource.aws.cdk.engine.helpers.boto3", autospec=True)231    def test_raises_already_exists_exception_when_deploy_stack(self, mock_boto3):232        mock_session = mock_boto3.Session.return_value233        mock_cf = mock_session.client.return_value234        self.new_app()235        my_stack = "my-stack"236        stack = Stack(self.app, my_stack)237        kms = stack.create_kms("test-kms", "arn::kms")238        stack.create_bucket_assets(239            name="my-bucket-to-put-assets", kms=kms)240        mock_cf.create_stack.side_effect = [241            self.get_exception_from_create_stack(242                exception="AlreadyExistsException",243                stack_name=my_stack,244                stack_template=self.mock_stack_template()245            )246        ]247        mock_cf.describe_stack_events.side_effect = [248            self.get_exception_from_describe_stack_events(249                exception="AlreadyExistsException",250                stack_name=my_stack251            )252        ]253        with pytest.raises(ClientError):254            self.deploy(stack_name=my_stack, region="us-east-1")255    @mock.patch("plugin.infrastructure.resource.aws.cdk.engine.helpers.boto3", autospec=True)256    def test_raises_unexpected_exception_when_deploy_stack(self, mock_boto3):257        mock_session = mock_boto3.Session.return_value258        mock_cf = mock_session.client.return_value259        self.new_app()260        my_stack = "my-stack"261        stack = Stack(self.app, my_stack)262        kms = stack.create_kms("test-kms", "arn::kms")263        stack.create_bucket_assets(264            name="my-bucket-to-put-assets", kms=kms)265        mock_cf.create_stack.side_effect = [266        
    self.get_exception_from_create_stack(267                exception="UnexpectedException",268                stack_name=my_stack,269                stack_template=self.mock_stack_template()270            )271        ]272        mock_cf.describe_stack_events.side_effect = [273            self.get_exception_from_describe_stack_events(274                exception="AlreadyExistsException",275                stack_name=my_stack276            )277        ]278        with pytest.raises(ClientError):...cfn.py
Source:cfn.py  
...57        :return: generator object yielding stack events58        :rtype: generator59        """60        if initial_entry is None:61            return self._tail_stack_events(name, len(self.describe_stack_events(name)))62        elif initial_entry < 0:63            return self._tail_stack_events(name, len(self.describe_stack_events(name)) + initial_entry)64        else:65            return self._tail_stack_events(name, initial_entry)66    def describe_stack_events(self, name):67        """68    Describe CFN stack events69    :param name: stack name70    :type name: str71    :return: stack events72    :rtype: list of boto.cloudformation.stack.StackEvent73    """74        return boto_all(self.connection.describe_stack_events,StackName=name)75    def _tail_stack_events(self, name, initial_entry):76        """77        See tail_stack_events()78        """79        previous_stack_events = initial_entry80        while True:81            stack = self.describe_stack(name)82            stack_events = self.describe_stack_events(name)83            if len(stack_events) > previous_stack_events:84                # iterate on all new events, at reversed order (the list is sorted from newest to oldest)85                for event in stack_events[:-previous_stack_events or None][::-1]:86                    yield {'resource_type': event.resource_type,87                           'logical_resource_id': event.logical_resource_id,88                           'physical_resource_id': event.physical_resource_id,89                           'resource_status': event.resource_status,90                           'resource_status_reason': event.resource_status_reason,91                           'timestamp': event.timestamp}92                previous_stack_events = len(stack_events)93            if stack.stack_status.endswith('_FAILED') or \94                    stack.stack_status in ('ROLLBACK_COMPLETE', 'UPDATE_ROLLBACK_COMPLETE'):95                yield 
StackFailStatus(stack.stack_status)96                break...Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
