Best Python code snippet using lisa_python
planner.py
Source:planner.py  
"""Planner that completes a TOSCA topology with the nodes its requirements imply."""
import logging
import operator

import yaml
from toscaparser.nodetemplate import NodeTemplate
from toscaparser.tosca_template import ToscaTemplate
from toscaparser.topology_template import TopologyTemplate

from service.simple_spec_alayzer import SimpleAnalyzer
from util import tosca_helper

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def _is_property_schema(value):
    """Return True when *value* is a dict carrying both 'required' and 'type' keys."""
    return isinstance(value, dict) and 'required' in value and 'type' in value


def add_requirement(node, missing_requirement, capable_node_name):
    """Attach *missing_requirement* to *node*, pointing it at *capable_node_name*.

    The requirement dict is mutated in place: the 'node' entry under its first
    key is set to *capable_node_name*. The requirement is then appended to the
    node's requirement list unless an equal entry is already there.

    :param node: a ``NodeTemplate`` or a single-key ``{type_name: definition}``
        dict; any other type is returned untouched.
    :param missing_requirement: single-key requirement dict.
    :param capable_node_name: name of the node that satisfies the requirement.
    :return: the same *node* object.
    """
    first_key = next(iter(missing_requirement))
    missing_requirement[first_key]['node'] = capable_node_name
    if isinstance(node, NodeTemplate):
        if missing_requirement not in node.requirements:
            node.requirements.append(missing_requirement)
    elif isinstance(node, dict):
        type_name = next(iter(node))
        node_requirements = node[type_name].setdefault('requirements', [])
        if missing_requirement not in node_requirements:
            node_requirements.append(missing_requirement)
    return node


def get_default_value(node_property):
    """Return ``{name: value}`` for a required property, or ``None``.

    When the property's value is still a schema dict (has 'required' and
    'type' keys) the declared default is preferred. Otherwise, or when no
    default is declared, the current value is returned if it is truthy.

    :param node_property: object exposing ``name``, ``required``, ``value``,
        ``default`` and ``constraints``; may be ``None``.
    """
    if not node_property or not node_property.required:
        return None
    if _is_property_schema(node_property.value):
        if node_property.default:
            return {node_property.name: node_property.default}
        if node_property.constraints:
            # Constraints are only reported; they do not produce a value.
            # Logged instead of printed so library use does not write to stdout.
            for constraint in node_property.constraints:
                logger.debug('%s', constraint)
    if node_property.value:
        return {node_property.name: node_property.value}
    return None


class Planner:
    """Fill in defaults and resolve requirements of a parsed TOSCA template."""

    def __init__(self, tosca_path=None, yaml_dict_tpl=None, spec_service=None):
        """Parse the template from *tosca_path* or from *yaml_dict_tpl*.

        :param tosca_path: path handed to ``ToscaTemplate``; wins when both are given.
        :param yaml_dict_tpl: already-loaded template dict.
        :param spec_service: object providing ``get_property(key)``; only used
            by :meth:`get_node_template_property`.
        :raises ValueError: when neither template source is supplied.
        """
        if tosca_path:
            self.path = tosca_path
            self.tosca_template = ToscaTemplate(tosca_path)
        elif yaml_dict_tpl:
            self.yaml_dict_tpl = yaml_dict_tpl
            logger.info('yaml_dict_tpl:\n%s', yaml.dump(yaml_dict_tpl))
            self.tosca_template = ToscaTemplate(yaml_dict_tpl=yaml_dict_tpl)
        else:
            # Previously this fell through and failed below with an
            # AttributeError on self.tosca_template.
            raise ValueError('Either tosca_path or yaml_dict_tpl must be provided')

        # The type definitions are read off the first node template, so the
        # template must contain at least one node.
        first_node = self.tosca_template.nodetemplates[0]
        self.tosca_node_types = first_node.type_definition.TOSCA_DEF
        self.all_custom_def = first_node.custom_def
        self.all_node_types = {}
        self.all_node_types.update(self.tosca_node_types)
        if self.all_custom_def:
            self.all_node_types.update(self.all_custom_def)
        self.required_nodes = []
        self.spec_service = spec_service

    def add_required_nodes_to_template(self, required_nodes):
        """Convert each node type in *required_nodes* to a template and append it.

        :return: ``self.tosca_template``.
        """
        for req_node in required_nodes:
            node_template = tosca_helper.node_type_2_node_template(req_node, self.all_custom_def)
            self.tosca_template.nodetemplates.append(node_template)
        return self.tosca_template

    def set_default_node_properties(self, node):
        """Replace schema-valued properties of *node* with their defaults.

        Defaults come from the ancestors' property definitions and from
        required properties of the node's own template entry that declare a
        'default'. When any default is found, the template entry's
        'properties' mapping is replaced by the collected defaults.

        :return: the same *node* object.
        """
        logger.info('Setting properties for: %s', node.type)
        ancestors_properties = tosca_helper.get_all_ancestors_properties(
            node, self.all_node_types, self.all_custom_def)
        default_properties = {}
        if ancestors_properties:
            for ancestors_property in ancestors_properties:
                for node_prop in node.get_properties_objects():
                    if ancestors_property.name == node_prop.name and _is_property_schema(node_prop.value):
                        default_property = get_default_value(ancestors_property)
                        if default_property:
                            node_prop.value = default_property
                            first_key = next(iter(default_property))
                            default_properties[first_key] = default_property[first_key]
        if default_properties:
            for default_property in default_properties:
                for node_prop_obj in node.get_properties_objects():
                    if default_property == node_prop_obj.name and not node_prop_obj.value:
                        # NOTE(review): this appends the property *name* (a str) to the
                        # list of property objects; kept as in the original - confirm intent.
                        node.get_properties_objects().append(default_property)

            # Prefer this node's own entry. Taking the first key of
            # node.templates only hits the right entry when the mapping holds a
            # single template, so it is kept as a fallback only.
            node_name = getattr(node, 'name', None)
            if node_name not in node.templates:
                node_name = next(iter(node.templates))
            template = node.templates[node_name]
            if 'properties' in template:
                for prop_name, prop_def in template['properties'].items():
                    # Only dict definitions can carry 'required'/'default'. A plain
                    # string containing "required" used to raise TypeError here.
                    if isinstance(prop_def, dict) and prop_def.get('required') \
                            and 'default' in prop_def and prop_name not in default_properties:
                        default_properties[prop_name] = prop_def['default']
            logger.info('Adding to : %s properties: %s', template, default_properties)
            # NOTE(review): this overwrites every property in the template entry with
            # the defaults only; kept as in the original - confirm intent.
            template['properties'] = default_properties
        return node

    def set_node_templates_properties(self):
        """Apply default properties to every node, then merge analyzer results.

        Nodes returned by ``SimpleAnalyzer.set_relationship_occurrences`` replace
        the template with the same name, or are appended when no template has
        that name.

        :return: ``self.tosca_template``.
        """
        for node_template in self.tosca_template.nodetemplates:
            self.set_default_node_properties(node_template)
        specification_analyzer = SimpleAnalyzer(self.tosca_template)
        nodes_with_new_relationship_occurrences = specification_analyzer.set_relationship_occurrences()
        added_node_names = []
        for new_spec_occurrences in nodes_with_new_relationship_occurrences:
            for index, node_in_temple in enumerate(self.tosca_template.nodetemplates):
                if new_spec_occurrences.name == node_in_temple.name:
                    added_node_names.append(new_spec_occurrences.name)
                    self.tosca_template.nodetemplates[index] = new_spec_occurrences
                    break
        for new_spec_occurrences in nodes_with_new_relationship_occurrences:
            if new_spec_occurrences.name not in added_node_names:
                self.tosca_template.nodetemplates.append(new_spec_occurrences)
        return self.tosca_template

    def get_node_template_property(self, prop_key, node_prop_dict):
        """Resolve a value for *prop_key* from the spec service or the schema.

        :return: ``{prop_key: node_prop_dict}`` when the spec service knows the
            key; otherwise the schema default, the 'greater_or_equal'
            constraint value, or ``None``.
        """
        prop_value = self.spec_service.get_property(prop_key)
        if prop_value:
            # NOTE(review): the fetched prop_value is not returned, the schema dict
            # is. Kept as in the original - confirm whether prop_value was meant.
            return {prop_key: node_prop_dict}
        if 'required' in node_prop_dict and 'default' in node_prop_dict:
            return node_prop_dict['default']
        for constraint in node_prop_dict.get('constraints', ()):
            constraint_name = next(iter(constraint))
            if constraint_name == 'greater_or_equal':
                return constraint[constraint_name]
        return None

    def resolve_requirements(self):
        """Resolve requirements for every node in the template, recursively.

        e.g. docker -> k8s -> cluster -> vm. The collected required nodes are
        appended to the template, which is returned.
        """
        for node in self.tosca_template.nodetemplates:
            self.add_interfaces(node)
            self.add_required_nodes(node)
        return self.add_required_nodes_to_template(self.required_nodes)

    def add_required_nodes(self, node):
        """Collect into ``self.required_nodes`` the nodes that *node* requires.

        For every capability that has a matching node type, the requirement is
        written back onto *node*, the matching type is recorded unless that type
        is already planned or present, and the matching type's own requirements
        are resolved recursively.
        """
        if isinstance(node, NodeTemplate):
            logger.info('Resolving requirements for: %s', node.name)
        elif isinstance(node, dict):
            logger.info('Resolving requirements for: %s', next(iter(node)))
        # Get all requirements for node.
        all_requirements = self.get_all_requirements(node)
        if not all_requirements:
            logger.debug('Node: %s has no requirements', tosca_helper.get_node_type_name(node))
            return
        matching_nodes_dict = self.find_best_node_for_requirements(all_requirements)
        for capability, matching_node in matching_nodes_dict.items():
            matching_node_type_name = next(iter(matching_node))
            matching_node_template = tosca_helper.node_type_2_node_template(matching_node, self.all_custom_def)
            for req in all_requirements:
                req_name = next(iter(req))
                # .get(): requirements without a 'capability' are tolerated by
                # find_best_node_for_requirements, so they must not raise here.
                if capability == req[req_name].get('capability'):
                    # Add the requirement to the node we analyzed, e.g. docker
                    # needed a host and now gets the name of that host.
                    node = add_requirement(node, req, matching_node_template.name)
                    break
            # Only add a node type that is neither planned nor already in the template.
            if not tosca_helper.contains_node_type(self.required_nodes, matching_node_type_name) and \
                    not tosca_helper.contains_node_type(self.tosca_template.nodetemplates,
                                                        matching_node_type_name):
                logger.info('  Adding: %s', matching_node_template.name)
                self.required_nodes.append(matching_node)
            # Find matching nodes for the new node's requirements.
            self.add_required_nodes(matching_node)

    def get_all_requirements(self, node):
        """Return the requirements of *node*'s type plus those of its ancestors.

        An ancestor requirement is added only when no requirement already in the
        list shares its name or its capability.

        NOTE: the returned list is the type definition's own 'requirements' list
        when the definition has one, so additions are visible on the definition
        (as in the original).
        """
        node_type_name = tosca_helper.get_node_type_name(node)
        logger.info('      Looking for requirements for node: %s', node_type_name)
        # Requirements from the type definition, e.g. docker: hostedOn k8s.
        def_type = self.all_node_types[node_type_name]
        all_requirements = []
        if 'requirements' in def_type:
            all_requirements = def_type['requirements']
            logger.info('      Found requirements: %s for node: %s', all_requirements, node_type_name)

        parent_requirements = tosca_helper.get_ancestors_requirements(
            node, self.all_node_types, self.all_custom_def)
        parent_type = tosca_helper.get_node_type_name(node)
        if parent_type and parent_requirements:
            logger.info('       Adding to : %s  parent requirements from: %s', node_type_name, parent_type)
            if not all_requirements:
                all_requirements += parent_requirements
            else:
                # The original appended to all_requirements while iterating it,
                # which duplicated entries and could grow the list forever when
                # two or more distinct parent requirements were present.
                for parent_requirement in parent_requirements:
                    parent_key = next(iter(parent_requirement))
                    parent_capability = parent_requirement[parent_key]['capability']
                    already_covered = False
                    for requirement in all_requirements:
                        requirement_key = next(iter(requirement))
                        if requirement_key == parent_key or \
                                requirement[requirement_key]['capability'] == parent_capability:
                            already_covered = True
                            break
                    if not already_covered:
                        all_requirements.append(parent_requirement)
            logger.debug('      all_requirements: %s', all_requirements)
        return all_requirements

    def get_node_types_by_capability(self, cap):
        """Return the node types offering capability *cap* that have interfaces.

        Candidates are the 'tosca.nodes*' types declaring a capability of type
        *cap*, plus their direct children. Only those reported by
        ``tosca_helper.get_node_types_with_interface`` are returned; this is how
        'abstract' types are told apart from 'concrete' ones.
        """
        candidate_nodes = {}
        for type_name, type_def in self.all_node_types.items():
            if not (type_name.startswith('tosca.nodes') and 'capabilities' in type_def):
                continue
            logger.debug('      Node: %s', type_name)
            for capability_def in type_def['capabilities'].values():
                logger.debug('          %s == %s', capability_def['type'], cap)
                if capability_def['type'] == cap:
                    candidate_nodes[type_name] = type_def
                    logger.debug('          candidate_node: %s', type_name)
        candidate_child_nodes = {}
        for type_name in candidate_nodes:
            candidate_child_nodes.update(self.get_child_nodes(type_name))
        candidate_nodes.update(candidate_child_nodes)
        # Only return the nodes that have interfaces, i.e. are not "abstract".
        nodes_type_names_with_interface = tosca_helper.get_node_types_with_interface(candidate_nodes)
        return {type_name: candidate_nodes[type_name] for type_name in nodes_type_names_with_interface}

    def find_best_node_for_requirements(self, all_requirements):
        """Map each required capability to the node type chosen to provide it.

        With a single capability the whole candidate mapping is returned for it;
        otherwise the first candidate of each capability is selected. Callers
        read the first key of each value, so both shapes behave the same.

        NOTE(review): the original docstring promised the node covering the most
        requirements, but that ranking was commented out - confirm.

        :raises Exception: for an empty requirement definition or when no node
            type offers a required capability.
        """
        matching_nodes = {}
        # Loop requirements to find nodes per requirement.
        for req in all_requirements:
            key = next(iter(req))
            if not req[key]:
                raise Exception('Requirement: ' + str(req) + ' is not properly defined')
            if 'capability' in req[key]:
                capability = req[key]['capability']
                logger.info('  Looking for nodes in node types with capability: %s', capability)
                capable_nodes = self.get_node_types_by_capability(capability)
                if not capable_nodes:
                    logger.error('Did not find any node with required capability: %s', capability)
                    raise Exception('Did not find any node with required capability: ' + str(capability))
                matching_nodes[capability] = capable_nodes
        # If we only found one capability, return its candidates unchanged.
        if len(matching_nodes) == 1:
            return matching_nodes
        returning_nodes = {}
        for capability, nodes in matching_nodes.items():
            first_type = next(iter(nodes))
            returning_nodes[capability] = {first_type: nodes[first_type]}
        return returning_nodes

    def get_child_nodes(self, parent_node_type_name):
        """Return the 'tosca.nodes*' types directly derived from *parent_node_type_name*."""
        return {
            type_name: type_def
            for type_name, type_def in self.all_node_types.items()
            if type_name.startswith('tosca.nodes')
            and 'derived_from' in type_def
            and type_def['derived_from'] == parent_node_type_name
        }

    def add_interfaces(self, node):
        """Placeholder hook called for every node before its requirements are resolved."""
        # node_type_interfaces = tosca_helper.get_node_type_interfaces(node)
        # node_template_interfaces = tosca_helper.get_node_template_interfaces(node)
        # if not node_template_interfaces and node_type_interfaces:
        #     tosca_helper.add_interfaces(node,node_type_interfaces)
        # NOTE(review): the excerpt is cut off right after the commented-out code
        # above. The only visible caller ignores the result, so the node is
        # returned unchanged - confirm against the full file.
        return node


# --- page text that followed this excerpt on the same source line ---
# validate_relationships.py
Source:validate_relationships.py  
class ValidateRelationships:
    """Switch pipeline requirements between 'Local' and 'Remote' relationships.

    The ``validate_capability_and_relationship_of_*`` methods rewrite matching
    requirements in ``content`` and record each rename in ``old_relationships``
    as ``"<old>#<new>"``. ``validate_relationship_templates`` then applies the
    recorded renames to the relationship templates.
    """

    def __init__(self):
        # Per-instance state. These used to be class-level lists, which every
        # instance shared, so renames leaked from one run into the next.
        self.node_capability = []
        self.old_relationships = []  # "<old name>#<new name>" entries
        self.local_key_index = []    # numeric suffixes (as str) seen on 'Local' keys
        self.remote_key_index = []   # numeric suffixes (as str) seen on 'Remote' keys

    @staticmethod
    def _next_index(index_pool):
        """Return one more than the largest index in *index_pool* (0 when empty)."""
        # Compare as integers: max() over the strings is lexicographic and
        # ranks "9" above "10".
        return max(map(int, index_pool)) + 1 if index_pool else 0

    def get_relationship_indexs(self, content):
        """Record the numeric suffix of every 'Local' / 'Remote' template key.

        Keys are split on '_' and the third segment is taken as the index. Keys
        with fewer than three segments or a non-numeric third segment are
        skipped. Each index is stored once, so repeated calls do not grow the
        lists.
        """
        templates = content["topology_template"]["relationship_templates"]
        for key in templates:
            segments = key.split('_')
            if len(segments) < 3 or not segments[2].isdecimal():
                continue
            index = segments[2]
            if 'Local' in key and index not in self.local_key_index:
                self.local_key_index.append(index)
            if 'Remote' in key and index not in self.remote_key_index:
                self.remote_key_index.append(index)

    def _retarget_requirements(self, content, nodes_to_change, matches_key,
                               capability_swap, scope_swap, index_pool):
        """Rewrite capability and relationship of matching requirements in place.

        :param matches_key: predicate on the lower-cased requirement name.
        :param capability_swap: ``(old, new)`` substrings for the capability value.
        :param scope_swap: ``(old, new)`` substrings for the relationship's
            second '_'-separated segment.
        :param index_pool: the instance index list used to number new names.
        :return: the same *content* object.
        """
        node_templates = content["topology_template"]["node_templates"]
        for node in nodes_to_change:
            node_name = node.split('#')[0]
            requirements = node_templates[node_name]["requirements"]
            if requirements:
                # The templates do not change inside this loop; one scan per node suffices.
                self.get_relationship_indexs(content)
            for requirement in requirements:
                for key, val in requirement.items():
                    if not matches_key(key.lower()):
                        continue
                    # Snapshot the items because val is updated while walking it.
                    for key_0, val_0 in list(val.items()):
                        lowered = key_0.lower()
                        if "capability" in lowered:
                            if val_0.lower() != key.lower():
                                val[key_0] = val_0.replace(*capability_swap)
                        elif "relationship" in lowered:
                            segments = val_0.split('_')
                            index_num = self._next_index(index_pool)
                            updated_value = segments[0] + '_' + \
                                segments[1].replace(*scope_swap) + '_' + str(index_num)
                            val[key_0] = updated_value
                            # Reserve the index so the next rename gets a fresh one
                            # instead of colliding with this name.
                            index_pool.append(str(index_num))
                            self.old_relationships.append(val_0 + '#' + updated_value)
        return content

    def validate_capability_and_relationship_of_remote_node(self, content, nodes_to_change):
        """Point 'ConnectToPipelineRemote' requirements at new 'Remote' relationships.

        :param content: template dict with 'topology_template' holding
            'node_templates' and 'relationship_templates'.
        :param nodes_to_change: strings whose part before '#' is a node name.
        :return: the same *content* object, updated in place.
        """
        return self._retarget_requirements(
            content, nodes_to_change,
            matches_key=lambda name: 'connecttopipelineremote' in name,
            capability_swap=('Pipeline', 'PipelineRemote'),
            scope_swap=('Local', 'Remote'),
            index_pool=self.remote_key_index)

    def validate_capability_and_relationship_of_local_node(self, content, nodes_to_change):
        """Point 'ConnectToPipeline' requirements at new 'Local' relationships.

        Same contract as the remote variant, except that the requirement name
        must equal 'connecttopipeline' (case-insensitively).
        """
        return self._retarget_requirements(
            content, nodes_to_change,
            matches_key=lambda name: name == 'connecttopipeline',
            capability_swap=('PipelineRemote', 'Pipeline'),
            scope_swap=('Remote', 'Local'),
            index_pool=self.local_key_index)

    def validate_relationship_templates(self, content):
        """Apply the recorded renames to ``relationship_templates``.

        A template whose key equals the old name is renamed. Only when no such
        key exists are keys merely *containing* the old name renamed, which is
        the original matching rule. String values of a renamed template have
        'Local' and 'Remote' swapped to match the new scope.

        :return: the same *content* object, updated in place.
        """
        templates = content["topology_template"]["relationship_templates"]
        for relation in self.old_relationships:
            new_old_key = relation.split('#')
            old_name, new_name = new_old_key[0], new_old_key[1]
            # Decide on a snapshot of the keys: renaming while iterating the
            # live dict raises RuntimeError on Python 3.8+.
            if old_name in templates:
                matching_keys = [old_name]
            else:
                matching_keys = [key for key in templates if old_name in key]
            for key in matching_keys:
                lowered = key.lower()
                if 'local' in lowered:
                    source, target = 'Local', 'Remote'
                elif 'remote' in lowered:
                    source, target = 'Remote', 'Local'
                else:
                    continue
                values = templates.pop(key)
                templates[new_name] = values
                if isinstance(values, dict):
                    for key_0, val_0 in values.items():
                        # Non-string values (e.g. nested mappings) are left as they are.
                        if isinstance(val_0, str):
                            values[key_0] = val_0.replace(source, target)
        return content

    def delete_duplicate_node_relationship(self, content, duplicate_node_keys):
        """Remove the listed keys from ``relationship_templates``.

        Keys that are absent, or listed more than once, are ignored.
        """
        templates = content["topology_template"]["relationship_templates"]
        for relation_key in duplicate_node_keys:
            templates.pop(relation_key, None)


# --- page text that followed this excerpt on the same source line ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
