Best Python code snippet using localstack_python
cloudformation_starter.py
Source:cloudformation_starter.py  
...685        return S3_Bucket_get_cfn_attribute_orig(self, attribute_name)686    S3_Bucket_get_cfn_attribute_orig = s3_models.FakeBucket.get_cfn_attribute687    s3_models.FakeBucket.get_cfn_attribute = S3_Bucket_get_cfn_attribute688    @classmethod689    def Bucket_update_from_cloudformation_json(cls,690            original_resource, new_resource_name, cloudformation_json, region_name):691        if cloudformation_json.get('BucketName') in [None, original_resource.name]:692            # TODO: apply other resource updates693            cloudformation_json['BucketName'] = original_resource.name694            return original_resource695        return Bucket_update_from_cf_json_orig(original_resource, new_resource_name, cloudformation_json, region_name)696    Bucket_update_from_cf_json_orig = s3_models.FakeBucket.update_from_cloudformation_json697    s3_models.FakeBucket.update_from_cloudformation_json = Bucket_update_from_cloudformation_json698    # Patch SQS physical_resource_id(..) method in moto699    @property700    def SQS_Queue_physical_resource_id(self):701        result = SQS_Queue_physical_resource_id_orig.fget(self)702        if '://' not in result:703            # convert ID to queue URL704            self._physical_resource_id = (getattr(self, '_physical_resource_id', None) or705                aws_stack.get_sqs_queue_url(result))706            return self._physical_resource_id707        return result708    SQS_Queue_physical_resource_id_orig = sqs_models.Queue.physical_resource_id709    sqs_models.Queue.physical_resource_id = SQS_Queue_physical_resource_id710    # Patch LogGroup get_cfn_attribute(..) method in moto711    def LogGroup_get_cfn_attribute(self, attribute_name):712        try:713            return LogGroup_get_cfn_attribute_orig(self, attribute_name)714        except Exception:715            if attribute_name == 'Arn':716                return aws_stack.log_group_arn(self.name)717            raise718    LogGroup_get_cfn_attribute_orig = getattr(cw_models.LogGroup, 'get_cfn_attribute', None)719    cw_models.LogGroup.get_cfn_attribute = LogGroup_get_cfn_attribute720    # Patch Lambda get_cfn_attribute(..) method in moto721    def Lambda_Function_get_cfn_attribute(self, attribute_name):722        try:723            if attribute_name == 'Arn':724                return self.function_arn725            return Lambda_Function_get_cfn_attribute_orig(self, attribute_name)726        except Exception:727            if attribute_name in ('Name', 'FunctionName'):728                return self.function_name729            raise730    Lambda_Function_get_cfn_attribute_orig = lambda_models.LambdaFunction.get_cfn_attribute731    lambda_models.LambdaFunction.get_cfn_attribute = Lambda_Function_get_cfn_attribute732    # Patch DynamoDB get_cfn_attribute(..) method in moto733    def DynamoDB_Table_get_cfn_attribute(self, attribute_name):734        try:735            if attribute_name == 'StreamArn':736                streams = aws_stack.connect_to_service('dynamodbstreams').list_streams(TableName=self.name)['Streams']737                return streams[0]['StreamArn'] if streams else None738            return DynamoDB_Table_get_cfn_attribute_orig(self, attribute_name)739        except Exception as e:740            LOG.warning('Unable to get attribute "%s" from resource %s: %s' % (attribute_name, type(self), e))741            raise742    DynamoDB_Table_get_cfn_attribute_orig = dynamodb_models.Table.get_cfn_attribute743    dynamodb_models.Table.get_cfn_attribute = DynamoDB_Table_get_cfn_attribute744    # Patch IAM get_cfn_attribute(..) method in moto745    def IAM_Role_get_cfn_attribute(self, attribute_name):746        try:747            return IAM_Role_get_cfn_attribute_orig(self, attribute_name)748        except Exception:749            if attribute_name == 'Arn':750                return aws_stack.role_arn(self.name)751            raise752    IAM_Role_get_cfn_attribute_orig = iam_models.Role.get_cfn_attribute753    iam_models.Role.get_cfn_attribute = IAM_Role_get_cfn_attribute754    # Patch IAM Role model755    # https://github.com/localstack/localstack/issues/925756    @property757    def IAM_Role_physical_resource_id(self):758        return self.name759    iam_models.Role.physical_resource_id = IAM_Role_physical_resource_id760    # Patch SNS Topic get_cfn_attribute(..) method in moto761    def SNS_Topic_get_cfn_attribute(self, attribute_name):762        result = SNS_Topic_get_cfn_attribute_orig(self, attribute_name)763        if attribute_name.lower() in ['arn', 'topicarn']:764            result = aws_stack.fix_account_id_in_arns(result)765        return result766    SNS_Topic_get_cfn_attribute_orig = sns_models.Topic.get_cfn_attribute767    sns_models.Topic.get_cfn_attribute = SNS_Topic_get_cfn_attribute768    # Patch create_from_cloudformation_json(..)769    # #2568 Cloudformation create-stack for SNS with yaml causes TypeError770    SNS_Topic_create_from_cloudformation_json_orig = sns_models.Topic.create_from_cloudformation_json771    def SNS_Topic_create_from_cloudformation_json(resource_name, cloudformation_json, region_name):772        properties = cloudformation_json['Properties']773        if properties.get('Subscription'):774            properties['Subscription'] = [subscription for subscription in properties['Subscription'] if subscription]775        result = SNS_Topic_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)776        # remove topic from backend, as it will be created by our mechanism with additional attributes (like tags)777        sns_models.sns_backends[region_name].topics.pop(result.arn)778        return result779    sns_models.Topic.create_from_cloudformation_json = SNS_Topic_create_from_cloudformation_json780    # Patch ES get_cfn_attribute(..) method781    def ES_get_cfn_attribute(self, attribute_name):782        if attribute_name in ['Arn', 'DomainArn']:783            return aws_stack.es_domain_arn(self.params.get('DomainName'))784        if attribute_name == 'DomainEndpoint':785            if not hasattr(self, '_domain_endpoint'):786                es_details = aws_stack.connect_to_service('es').describe_elasticsearch_domain(787                    DomainName=self.params.get('DomainName'))788                self._domain_endpoint = es_details['DomainStatus']['Endpoint']789            return self._domain_endpoint790        raise UnformattedGetAttTemplateException()791    service_models.ElasticsearchDomain.get_cfn_attribute = ES_get_cfn_attribute792    # Patch Firehose get_cfn_attribute(..) method793    def Firehose_get_cfn_attribute(self, attribute_name):794        if attribute_name == 'Arn':795            return aws_stack.firehose_stream_arn(self.params.get('DeliveryStreamName'))796        raise UnformattedGetAttTemplateException()797    service_models.FirehoseDeliveryStream.get_cfn_attribute = Firehose_get_cfn_attribute798    # Patch LambdaFunction create_from_cloudformation_json(..) method in moto799    @classmethod800    def Lambda_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):801        resource_name = cloudformation_json.get('Properties', {}).get('FunctionName') or resource_name802        def _from_env(*args, **kwargs):803            result = Mock()804            result.api = Mock()805            result.api.get_adapter = (lambda *args, **kwargs: None)806            return result807        # Temporarily set a mock client, to prevent moto from talking to the Docker daemon808        import docker809        _from_env_orig = docker.from_env810        docker.from_env = _from_env811        try:812            result = Lambda_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)813        finally:814            docker.from_env = _from_env_orig815        return result816    Lambda_create_from_cloudformation_json_orig = lambda_models.LambdaFunction.create_from_cloudformation_json817    lambda_models.LambdaFunction.create_from_cloudformation_json = Lambda_create_from_cloudformation_json818    # Patch EventSourceMapping create_from_cloudformation_json(..) method in moto819    @classmethod820    def Mapping_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):821        props = cloudformation_json.get('Properties', {})822        func_name = props.get('FunctionName') or ''823        if ':lambda:' in func_name:824            props['FunctionName'] = aws_stack.lambda_function_name(func_name)825        try:826            return Mapping_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)827        except Exception:828            LOG.info('Unable to add Lambda event mapping for source ARN "%s" in moto backend (ignoring)' %829                props.get('EventSourceArn'))830            # return an empty dummy instance, to avoid downstream None value issues831            return service_models.BaseModel()832    Mapping_create_from_cloudformation_json_orig = lambda_models.EventSourceMapping.create_from_cloudformation_json833    lambda_models.EventSourceMapping.create_from_cloudformation_json = Mapping_create_from_cloudformation_json834    # Patch LambdaFunction update_from_cloudformation_json(..) method in moto835    @classmethod836    def Lambda_update_from_cloudformation_json(cls,837            original_resource, new_resource_name, cloudformation_json, region_name):838        resource_name = cloudformation_json.get('Properties', {}).get('FunctionName') or new_resource_name839        return Lambda_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)840    if not hasattr(lambda_models.LambdaFunction, 'update_from_cloudformation_json'):841        lambda_models.LambdaFunction.update_from_cloudformation_json = Lambda_update_from_cloudformation_json842    # Patch Role update_from_cloudformation_json(..) method843    @classmethod844    def Role_update_from_cloudformation_json(cls,845            original_resource, new_resource_name, cloudformation_json, region_name):846        props = cloudformation_json.get('Properties', {})847        original_resource.name = props.get('RoleName') or original_resource.name848        original_resource.assume_role_policy_document = props.get('AssumeRolePolicyDocument')849        return original_resource850    if not hasattr(iam_models.Role, 'update_from_cloudformation_json'):851        iam_models.Role.update_from_cloudformation_json = Role_update_from_cloudformation_json852    # patch ApiGateway Deployment deletion853    @staticmethod854    def depl_delete_from_cloudformation_json(resource_name, resource_json, region_name):855        properties = resource_json['Properties']856        LOG.info('TODO: apigateway.Deployment.delete_from_cloudformation_json %s' % properties)857    if not hasattr(apigw_models.Deployment, 'delete_from_cloudformation_json'):858        apigw_models.Deployment.delete_from_cloudformation_json = depl_delete_from_cloudformation_json...service_models.py
Source:service_models.py  
...129    def resource_id(self):130        """Return the logical resource ID of this resource (i.e., the ref. name within the stack's resources)."""131        return self.resource_json["LogicalResourceId"]132    @classmethod133    def update_from_cloudformation_json(134        cls, original_resource, new_resource_name, cloudformation_json, region_name135    ):136        props = cloudformation_json.get("Properties", {})137        for key, val in props.items():138            snake_key = camel_to_snake_case(key)139            lower_key = key.lower()140            for candidate in [key, lower_key, snake_key]:141                if hasattr(original_resource, candidate) or candidate == snake_key:142                    setattr(original_resource, candidate, val)143                    break144        return original_resource145    @classmethod146    def create_from_cloudformation_json(cls, resource_name, resource_json, region_name):147        return cls(...models.py
Source:models.py  
...23        state_machine = StateMachine(arn, name, definition=definition, role_arn=role_arn)24        stepfunction_backends[region_name].state_machines.append(state_machine)25        return state_machine26    @classmethod27    def update_from_cloudformation_json(cls, original_resource, new_resource_name, cloudformation_json, region_name):28        props = cloudformation_json.get('Properties', {})29        definition = props.get('DefinitionString')30        role_arn = props.get('RoleArn')31        state_machine = stepfunction_backends[region_name].update_state_machine(32            original_resource.arn, definition=definition, role_arn=role_arn,33        )34        return state_machine35    def update(self, **kwargs):36        for key, value in kwargs.items():37            if value is not None:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
