Best Python code snippet using autotest_python
usdlParser.py
Source:usdlParser.py  
...63            offerings_number = offerings_number + 164        if offerings_number != 1:65            msg = _('No service offering has been defined')66            raise Exception(msg)67    def _get_field(self, namespace, element, predicate, id_=False):68        result = []69        for e in self._graph.objects(element, namespace[predicate]):70            if not id_:71                result.append(unicode(e))72            else:73                #If id = True means that the uri will be used so it is necesary to return the class74                result.append(e)75        if len(result) == 0:76            result.append('')77        return result78    def _parse_basic_info(self, service_uri):79        count = 080        result = {}81        for t in self._graph.predicate_objects(service_uri):82            count = count + 183        if count < 3:84            result['part_ref'] = True85            return86        result['part_ref'] = False87        vendor = self._get_field(USDL, service_uri, 'hasProvider', id_=True)[0]88        result['vendor'] = self._get_field(FOAF, vendor, 'name')[0]89        # provider vCard90        vcard = self._get_field(VCARD, vendor, 'adr', id_=True)[0]91        if vcard != '':92            result['vcard'] = {93                'BEGIN': [{94                    'properties': {},95                    'value':'VCARD',96                }],97                'FN': [{98                    'properties': {},99                    'value': result['vendor']100                }],101                'ADR': [{102                    'properties': {},103                    'value': self._get_field(VCARD, vcard, 'street-address')[0] + ';' + self._get_field(VCARD, vcard, 'postal-code')[0] + ';' + self._get_field(VCARD, vcard, 'locality')[0] + ';' + self._get_field(VCARD, vcard, 'country-name')[0]104                }],105                'TEL': [{106                    'properties': {},107                    'value': self._get_field(VCARD, vcard, 'tel')[0]108       
         }],109                'EMAIL': [{110                    'properties': {},111                    'value': self._get_field(VCARD, vcard, 'email')[0]112                }],113                'END': [{114                    'properties': {},115                    'value': "VCARD"116                }]117            }118        result['name'] = self._get_field(DCTERMS, service_uri, 'title')[0]119        version = self._get_field(USDL, service_uri, 'versionInfo')[0]120        service_parts = self._get_field(USDL, service_uri, 'hasPartMandatory', id_=True)121        if len(service_parts) > 0:122            result['parts'] = []123            for part in service_parts:124                part_info = {}125                part_info['name'] = self._get_field(DCTERMS, part, 'title')[0]126                part_info['uri'] = unicode(part)127                result['parts'].append(part_info)128        result['short_description'] = self._get_field(DCTERMS, service_uri, 'abstract')[0]129        result['long_description'] = self._get_field(DCTERMS, service_uri, 'description')[0]130        result['created'] = self._get_field(DCTERMS, service_uri, 'created')[0]131        result['modified'] = self._get_field(DCTERMS, service_uri, 'modified')[0]132        result['uriImage'] = self._get_field(FOAF, service_uri, 'depiction')[0]133        result['version'] = version134        result['page'] = self._get_field(FOAF, service_uri, 'page')[0]135        return result136    def _parse_legal_info(self, service_uri):137        result = []138        legal_conditions = self._get_field(USDL, service_uri, 'hasLegalCondition', id_=True)139        # If legal does not exist the method does nothing140        if len(legal_conditions) == 1 and legal_conditions[0] == '':141            return []142        for legal in legal_conditions:143            legal_condition = {144                'type': self._get_field(RDF, legal, 'type')[0],145                'label': self._get_field(DCTERMS, legal, 'title')[0],146 
               'description': self._get_field(DCTERMS, legal, 'description')[0],147                'clauses': [],148            }149            clauses = self._get_field(LEGAL, legal, 'hasClause', id_=True)150            for c in clauses:151                clause = {}152                clause['name'] = self._get_field(LEGAL, c, 'name')[0]153                clause['text'] = self._get_field(LEGAL, c, 'text')[0]154                legal_condition['clauses'].append(clause)155            result.append(legal_condition)156        return result157    def _parse_sla_info(self, service_uri):158        result = []159        service_level_profile = self._get_field(USDL, service_uri, 'hasServiceLevelProfile', id_=True)[0]160        #If sla does not exist the method does nothing161        if service_level_profile != '':162            service_levels = self._get_field(SLA, service_level_profile, 'hasServiceLevel', id_=True)163            for sla in service_levels:164                service_level = {165                    'type': self._get_field(RDF, sla, 'type')[0],166                    'name': self._get_field(DCTERMS, sla, 'title')[0],167                    'description': self._get_field(DCTERMS, sla, 'description')[0],168                    'obligatedParty': self._get_field(SLA, sla, 'obligatedParty')[0],169                    'slaExpresions': [],170                }171                sla_expresions = self._get_field(SLA, sla, 'serviceLevelExpression', id_=True)172                for exp in sla_expresions:173                    expresion = {174                        'name': self._get_field(DCTERMS, exp, 'title')[0],175                        'description': self._get_field(DCTERMS, exp, 'description')[0],176                        'variables': [],177                    }178                    variables = self._get_field(SLA, exp, 'hasVariable', id_=True)179                    for var in variables:180                        # The sla variables may defines service availibility or 
location, defined181                        # by a point in utm coodinates plus a radius, in this case this property is182                        # set in the sla expresion.183                        radius = False184                        default_value = self._get_field(SLA, var, 'hasDefault', id_=True)[0]185                        if default_value == '':186                            default_value = self._get_field(SLA, var, 'hasServiceRadius', id_=True)[0]187                            if default_value != '':188                                radius = True189                                if 'location' not in expresion:190                                    expresion['location'] = {}191                        type_ = self._get_field(RDF, default_value, 'type', id_=True)[0]192                        # The sla variable may contains a location, i.e service availability193                        if type_ == GR['QualitativeValue']:194                            location = self._get_field(GEO, default_value, 'location', id_=True)[0]195                            if location != '':196                                if 'location' not in expresion:197                                    expresion['location'] = {}198                                expresion['location']['coordinates'] = {199                                    'lat': self._get_field(GEO, location, 'lat')[0],200                                    'long': self._get_field(GEO, location, 'long')[0],201                                }202                        else:203                            variable_info = {204                                'label': self._get_field(RDFS, var, 'label')[0],205                                'type': self._get_field(RDF, default_value, 'type')[0],206                                'value': self._get_field(GR, default_value, 'hasValue')[0],207                                'unit': self._get_field(GR, default_value, 'hasUnitOfMeasurement')[0],208                            
}209                            expresion['variables'].append(variable_info)210                            if radius:211                                expresion['location']['radius'] = variable_info212                    service_level['slaExpresions'].append(expresion)213                result.append(service_level)214        return result215    def _parse_interaction_protocols(self, service_uri):216        result = []217        # Get the interaction protocols that define the service functionality218        interaction_protocols = self._get_field(USDL, service_uri, 'hasInteractionProtocol', id_=True)219        if len(interaction_protocols) == 1 and interaction_protocols[0] == '':220            return []221        for itp in interaction_protocols:222            protocol_info = {223                'title': self._get_field(DCTERMS, itp, 'title')[0],224                'description': self._get_field(DCTERMS, itp, 'description')[0],225                'technical_interface': self._get_field(USDL, itp, 'hasTechnicalInterface')[0],226                'interactions': []227            }228            interactions = self._get_field(USDL, itp, 'hasInteraction', id_=True)229            for inte in interactions:230                interaction_info = {231                    'title': self._get_field(DCTERMS, inte, 'title')[0],232                    'description': self._get_field(DCTERMS, inte, 'description')[0],233                    'interface_operation': self._get_field(USDL, inte, 'hasInterfaceOperation')[0],234                    'inputs': [],235                    'outputs': []236                }237                for input_ in self._get_field(USDL, inte, 'hasInput', id_=True):238                    input_info = {239                        'label': self._get_field(RDFS, input_, 'label')[0],240                        'description': self._get_field(DCTERMS, input_, 'description')[0],241                        'interface_element': self._get_field(USDL, input_, 
'hasInterfaceElement')[0]242                    }243                    interaction_info['inputs'].append(input_info)244                for output in self._get_field(USDL, inte, 'hasOutput', id_=True):245                    output_info = {246                        'label': self._get_field(RDFS, output, 'label')[0],247                        'description': self._get_field(DCTERMS, output, 'description')[0],248                        'interface_element': self._get_field(USDL, output, 'hasInterfaceElement')[0]249                    }250                    interaction_info['outputs'].append(output_info)251                protocol_info['interactions'].append(interaction_info)252            result.append(protocol_info)253        return result254    def _parse_bind_expression(self, expression):255        result = {}256        operations = {257            unicode(SP['Sum']): '+',258            unicode(SP['Minus']): '-',259            unicode(SP['Mul']): '*',260            unicode(SP['Div']): '/'261        }262        # Check the operation263        exp_type = self._get_field(RDF, expression, 'type', id_=True)264        if exp_type[0] == '':265            raise Exception('Invalid price function: An expression must contain an operation per level')266        try:267            op = operations[unicode(exp_type[0])]268        except:269            raise Exception('Invalid price function: Invalid operation')270        # Get arguments value271        arg1 = self._get_field(SP, expression, 'arg1', id_=True)[0]272        arg2 = self._get_field(SP, expression, 'arg2', id_=True)[0]273        if self._get_field(RDF, arg1, 'type', id_=True)[0] == '':274            # Get variable name275            result['arg1'] = self._get_field(SP, arg1, 'varName')[0]276        else:277            # arg1 is an expression278            result['arg1'] = self._parse_bind_expression(arg1)279        if self._get_field(RDF, arg2, 'type', id_=True)[0] == '':280            # Get variable name281            
result['arg2'] = self._get_field(SP, arg2, 'varName')[0]282        else:283            # arg2 is an expression284            result['arg2'] = self._parse_bind_expression(arg2)285        result['operation'] = op286        return result287    def _parse_function(self, price_function):288        result = {289            'label': self._get_field(RDFS, price_function, 'label')[0],290            'variables': {},291            'function': {}292        }293        variables = []294        # Pre-process price variables295        for variable in self._get_field(PRICE, price_function, 'hasVariable', id_=True):296            # Check variable type to know if it is a constant or an usage297            # variable298            var_label = self._get_field(RDFS, variable, 'label')[0]299            var_type = self._get_field(RDF, variable, 'type', id_=True)[0]300            if var_type == PRICE['Constant']:301                val_part = self._get_field(PRICE, variable, 'hasValue', id_=True)[0]302                const_value = self._get_field(GR, val_part, 'hasValueFloat')303                # Check that the value is valid304                if len(const_value) == 1 and const_value[0] == '':305                    const_value = self._get_field(GR, val_part, 'hasValueInteger')306                    if len(const_value) != 1 or const_value[0] == '':307                        raise Exception('Invalid price function: Only a value is allowed for constants')308                variables.append({309                    'type': 'constant',310                    'id': variable,311                    'label': var_label,312                    'value': const_value[0]313                })314            elif var_type == PRICE['Usage']:315                variables.append({316                    'type': 'usage',317                    'id': variable,318                    'label': var_label319                })320            else:321                raise Exception('Invalid price function: Invalid variable 
type')322        # Get body expression323        function_body = self._get_field(SPIN, price_function, 'body', id_=True)[0]324        body_type = self._get_field(RDF, function_body, 'type', id_=True)[0]325        if body_type != SP['Select']:326            raise Exception('Invalid price function: Invalid SPARQL method')327        # Check where clause328        bind_expression = None329        matching_exp = {}330        # Iterate over expressions list331        exp_list = self._get_field(SP, function_body, 'where', id_=True)[0]332        # First element333        first = self._get_field(RDF, exp_list, 'first', id_=True)[0]334        # Rest of the list335        rest = self._get_field(RDF, exp_list, 'rest', id_=True)[0]336        nil = False337        while not nil:338            # Get expression node339            exp = first340            # Check bind expression341            exp_type = self._get_field(RDF, exp, 'type', id_=True)342            if len(exp_type) > 0 and exp_type[0] == SP['Bind']:343                if bind_expression == None:344                    bind_expression = exp345                else:346                    raise Exception('Invalid price function: Only a bind expression is allowed')347            else:348                # Get expression tuple349                subject = self._get_field(SP, exp, 'subject', id_=True)[0]350                predicate = self._get_field(SP, exp, 'predicate', id_=True)[0]351                object_ = self._get_field(SP, exp, 'object', id_=True)[0]352                # Check if the expression defines an aux name353                if predicate == PRICE['hasValue']:354                    variable_info = None355                    found = False356                    i = 0357                    while not found and i < len(variables):358                        if variables[i]['id'] == subject:359                            found = True360                            variable_info = variables[i]361                            
variable_info['type'] = variables[i]['type']362                        i += 1363                    if not found:364                        raise Exception('Invalid price function: Variable not declared')365                    # Get the aux name366                    aux_name = self._get_field(SP, object_, 'varName')[0]367                    if aux_name in matching_exp:368                        # Check that is a variable name and not a369                        # duplicity in the expression370                        if 'variable_info' in matching_exp[aux_name]:371                            raise Exception('Invalid price function: Duplicated expression')372                        # The variable name used in the bind expression has373                        # already been processed374                        final_name = matching_exp[aux_name]['final_name']375                        # The variable and its name are bound so it is possible to store376                        # the variable info in the result structure377                        result['variables'][final_name] = {378                            'label': variable_info['label'],379                            'type': variable_info['type']380                        }381                        if variable_info['type'] == 'constant':382                            result['variables'][final_name]['value'] = variable_info['value']383                        # Store variable info in matching expression in order to avoid duplicity384                        matching_exp[aux_name]['variable_info'] = variable_info385                    else:386                        matching_exp[aux_name] = {387                            'variable_info': variable_info388                        }389                # Check if the expression defines a final name390                elif predicate == GR['hasValueFloat'] or predicate == GR['hasValueInteger']:391                    aux_name = self._get_field(SP, subject, 'varName')[0]392      
              final_name = self._get_field(SP, object_, 'varName')[0]393                    if aux_name in matching_exp:394                        # Check that it contains variable info and395                        # not a duplicated final name396                        if 'final_name' in matching_exp[aux_name]:397                            raise Exception('Invalid price function: Duplicated expression')398                        # The variable has already been processed so it is possible399                        # to store the variable info in the result structure400                        result['variables'][final_name] = {401                            'label': matching_exp[aux_name]['variable_info']['label'],402                            'type': matching_exp[aux_name]['variable_info']['type']403                        }404                        if matching_exp[aux_name]['variable_info']['type'] == 'constant':405                            result['variables'][final_name]['value'] = matching_exp[aux_name]['variable_info']['value']406                        # Store the final name in matching expression to avoid duplicity407                        matching_exp[aux_name]['final_name'] = final_name408                    else:409                        matching_exp[aux_name] = {410                            'final_name': final_name411                        }412                else:413                    raise Exception('Invalid price function: Invalid predicate')414            # Check list pointer415            if rest == RDF['nil']:416                # There is not more elements417                nil = True418            else:419                # Update list pointer420                first = self._get_field(RDF, rest, 'first', id_=True)[0]421                rest = self._get_field(RDF, rest, 'rest', id_=True)[0]422        # Parse bind expression423        expression = self._get_field(SP, bind_expression, 'expression', id_=True)[0]424        # Parse the concrete 
function used to calculate the price425        result['function'] = self._parse_bind_expression(expression)426        # Build price function427        return result428    def _parse_pricing_info(self):429        result = {}430        # Parse offering info431        result['title'] = self._get_field(DCTERMS, self._offering_uri, 'title')[0]432        result['description'] = self._get_field(DCTERMS, self._offering_uri, 'description')[0]433        result['valid_from'] = self._get_field(USDL, self._offering_uri, 'validFrom')[0]434        result['valid_through'] = self._get_field(USDL, self._offering_uri, 'validThrough')[0]435        # Parse price plans info436        price_plans = self._get_field(USDL, self._offering_uri, 'hasPricePlan', id_=True)437        result['price_plans'] = []438        # Parse price plans439        for price in price_plans:440            price_plan = {}441            price_plan['title'] = self._get_field(DCTERMS, price, 'title')[0]442            price_plan['description'] = self._get_field(DCTERMS, price, 'description')[0]443            # Get the label of the price plan444            label = self._get_field(RDFS, price, 'label')445            if len(label) == 1:446                price_plan['label'] = label[0]447            elif len(label) > 1:448                raise Exception('Only a label is supported for the price plan')449            # Get price components450            price_components = self._get_field(PRICE, price, 'hasPriceComponent', id_=True)451            if len(price_components) > 1 or price_components[0] != '':452                # Initialize components and deductions453                price_plan['price_components'] = []454                price_plan['deductions'] = []455                for pc in price_components:456                    # Get initial information457                    price_component = {458                        'title': self._get_field(DCTERMS, pc, 'title')[0],459                        'description': 
self._get_field(DCTERMS, pc, 'description')[0]460                    }461                    # Check if it includes a price specification462                    price_specification = self._get_field(PRICE, pc, 'hasPrice', id_=True)[0]463                    function_found = False464                    if price_specification != '':465                        price_container = price_specification466                    else:467                        # Check if a price function exists468                        price_function = self._get_field(PRICE, pc, 'hasPriceFunction', id_=True)[0]469                        text_function = self._get_field(PRICE, pc, 'hasTextFunction')[0]470                        if price_function != '' and text_function != '':471                            function_found = True472                        else:473                            price_container = pc474                    if function_found:475                        # Save price function serialization476                        price_component['text_function'] = text_function477                        price_component['price_function'] = self._parse_function(price_function)478                    else:479                        # If not price function defined, get the price480                        price_component['currency'] = self._get_field(GR, price_container, 'hasCurrency')[0]481                        value = self._get_field(GR, price_container, 'hasCurrencyValue')[0]482                        if not value:483                            price_component['value'] = self._get_field(GR, price_container, 'hasValueFloat')[0]484                        else:485                            price_component['value'] = value486                        price_component['unit'] = self._get_field(GR, price_container, 'hasUnitOfMeasurement')[0]487                    # Check if it is a deduction getting component type488                    component_type = self._get_field(RDF, pc, 'type', id_=True)[0]489   
                 if component_type == PRICE['PriceComponent']:490                        price_plan['price_components'].append(price_component)491                    elif component_type == PRICE['Deduction']:492                        price_plan['deductions'].append(price_component)493            taxes = self._get_field(PRICE, price, 'hasTax', id_=True)494            if len(taxes) > 1 or taxes[0] != '':495                price_plan['taxes'] = []496                for pc in taxes:497                    tax = {498                        'title': self._get_field(DCTERMS, pc, 'title')[0],499                        'description': self._get_field(DCTERMS, pc, 'description')[0],500                        'currency': self._get_field(GR, pc, 'hasCurrency')[0],501                    }502                    value = self._get_field(GR, pc, 'hasCurrencyValue')503                    if not value:504                        tax['value'] = value[0]505                    else:506                        tax['value'] = self._get_field(GR, pc, 'hasValueFloat')[0]507                    tax['unit'] = self._get_field(GR, pc, 'hasUnitOfMeasurement')[0]508                    price_plan['taxes'].append(tax)509            result['price_plans'].append(price_plan)510        return result511    def parse(self):512        result = {}513        result['pricing'] = self._parse_pricing_info()514        result['services_included'] = []515        for service_uri in self._get_field(USDL, self._offering_uri, 'includes', id_=True):516            if service_uri != '':517                basic_info = self._parse_basic_info(service_uri)518                if not basic_info['part_ref']:519                    basic_info['legal'] = self._parse_legal_info(service_uri)520                    basic_info['sla'] = self._parse_sla_info(service_uri)521                    basic_info['interactions'] = self._parse_interaction_protocols(service_uri)522                    del(basic_info['part_ref'])523                
result['services_included'].append(basic_info)524        if not len(result['services_included']) > 0:525            raise Exception('No services included')526        return result527def validate_usdl(usdl, mimetype, offering_data):528    valid = True529    reason = ''...exceptions.py
Source:exceptions.py  
...57    pass58class Diagnostics(object):59    def __init__(self, exc):60        self._exc = exc61    def _get_field(self, field):62        from uxdbcffi._impl.adapters import bytes_to_ascii63        if self._exc and self._exc._pgres:64            b = libpq.PQresultErrorField(self._exc._pgres, field)65            if b:66                b = ffi.string(b)67                if six.PY3:  # py2 tests insist on str here68                    b = bytes_to_ascii(b)69                return b70    @property71    def severity(self):72        return self._get_field(libpq.LIBPQ_DIAG_SEVERITY)73    @property74    def sqlstate(self):75        return self._get_field(libpq.LIBPQ_DIAG_SQLSTATE)76    @property77    def message_primary(self):78        return self._get_field(libpq.LIBPQ_DIAG_MESSAGE_PRIMARY)79    @property80    def message_detail(self):81        return self._get_field(libpq.LIBPQ_DIAG_MESSAGE_DETAIL)82    @property83    def message_hint(self):84        return self._get_field(libpq.LIBPQ_DIAG_MESSAGE_HINT)85    @property86    def statement_position(self):87        return self._get_field(libpq.LIBPQ_DIAG_STATEMENT_POSITION)88    @property89    def internal_position(self):90        return self._get_field(libpq.LIBPQ_DIAG_INTERNAL_POSITION)91    @property92    def internal_query(self):93        return self._get_field(libpq.LIBPQ_DIAG_INTERNAL_QUERY)94    @property95    def context(self):96        return self._get_field(libpq.LIBPQ_DIAG_CONTEXT)97    @property98    def schema_name(self):99        return self._get_field(libpq.LIBPQ_DIAG_SCHEMA_NAME)100    @property101    def table_name(self):102        return self._get_field(libpq.LIBPQ_DIAG_TABLE_NAME)103    @property104    def column_name(self):105        return self._get_field(libpq.LIBPQ_DIAG_COLUMN_NAME)106    @property107    def datatype_name(self):108        return self._get_field(libpq.LIBPQ_DIAG_DATATYPE_NAME)109    @property110    def constraint_name(self):111        return 
self._get_field(libpq.LIBPQ_DIAG_CONSTRAINT_NAME)112    @property113    def source_file(self):114        return self._get_field(libpq.LIBPQ_DIAG_SOURCE_FILE)115    @property116    def source_line(self):117        return self._get_field(libpq.LIBPQ_DIAG_SOURCE_LINE)118    @property119    def source_function(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
