Best Python code snippet using sure_python
networkio.py
Source:networkio.py  
"""Network I/O helpers.

Save / load network models as pickles and read / write simple
tab-delimited text tables (edge lists, source lists, attribute tables).
"""
from .. core.Edge import Edge
from .. core.Node import Node
from .. core.NodeType import NodeType
from .. core.Network import Network
from ..core.shared_functions import test_kwarg
from ..core.parameters import saved_node_attribute_list
from ..core.parameters import saved_edge_attribute_list
from ..core.parameters import saved_network_attribute_list

try:
    import cPickle as pickle  # Python 2: C-accelerated implementation
except ImportError:
    import pickle  # Python 3


def _read_lines(filename):
    """ Return the lines of a text file without their line terminators.

    Replaces the former open(filename, 'rU') / readlines() idiom: the
    'U' mode flag no longer exists in Python 3.11+, so universal-newline
    handling is done explicitly here and works on Python 2 and 3 alike.
    """
    with open(filename, 'r') as fp:
        text = fp.read()
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    # split() leaves one empty trailing element when the file ends with a
    # newline (or is empty); readlines() never produced such an element.
    if lines and lines[-1] == "":
        lines.pop()
    return lines


def _only_value(single_entry_dict):
    """ Return the value of a one-entry dict such as {id: attributes}. """
    return next(iter(single_entry_dict.values()))


def is_float_try(the_string):
    """ Simple test to check if a string can be represented as a
    floating point.

    Arguments:
     the_string: value to test

    Returns:
     True if float(the_string) succeeds, otherwise False.  Values that
     float() rejects with a TypeError (e.g. None) also give False.
    """
    try:
        float(the_string)
        return True
    except (ValueError, TypeError):
        return False


def string_to_list(the_string, force_to_float = True, sep_char =  ", "):
    """ Convert strings read from file to Python lists

    Arguments:
     the_string: text such as "[1, 2, 3]" or "['a', 'b']"
     force_to_float: if True, entries that parse as floats are converted
     sep_char: separator between the entries

    Returns:
     a list of floats and / or strings; "[]" and "" give an empty list.
     Nested lists are not supported (all leading "[" and trailing "]"
     characters are stripped).
    """
    the_string = the_string.lstrip("[").rstrip("]")
    if not the_string:
        # Splitting an empty string would produce [''] rather than [].
        return []
    the_list = the_string.split(sep_char)
    # Whether the entries are quoted is decided from the first entry only.
    # (The original had a second, unreachable branch testing "\'", which
    # is the very same character as "'"; it has been dropped.)
    if the_list[0].startswith("'") and the_list[0].endswith("'"):
        the_list = [x.strip("'") for x in the_list]
    if force_to_float:
        the_list = [float(x) if is_float_try(x) else x for x in the_list]
    return the_list


def pickle_network(the_network, the_filename, **kwargs):
    """ Break apart and save the network

    Only networks with exactly one nodetype are saved; otherwise an
    error message is printed and nothing is written.

    Arguments:
     the_network: object to save
     the_filename: effectively a prefix, ".pickle" is appended if missing

    kwargs:
     path: directory to save to.  It is prepended to the file name
      as-is, so it must end with a path separator.

    """
    path = kwargs.get('path', "")

    if len(the_network.nodetypes) != 1:
        print("Save error. Convert to monopartite network to save.")
        return

    # We avoid dependence on package classes: only plain dicts / lists of
    # attribute values are pickled.  This will ensure models can be
    # fully ported even with base class updates.
    the_node_list_dict = []
    for the_node in the_network.nodetypes[0].nodes:
        node_record = {}
        for the_attribute in saved_node_attribute_list:
            node_record[the_attribute] = getattr(the_node, the_attribute)
        the_node_list_dict.append({the_node.id: node_record})

    the_edge_list_dict = []
    for the_edge in the_network.edges:
        edge_record = {}
        for the_attribute in saved_edge_attribute_list:
            edge_record[the_attribute] = getattr(the_edge, the_attribute)
        # Nodes are stored by id so they can be looked up again on load.
        edge_record['nodes'] = [the_edge._nodes[0].id, the_edge._nodes[1].id]
        the_edge_list_dict.append({the_edge.id: edge_record})

    the_network_dict = {}
    the_network_dict['nodes'] = the_node_list_dict
    the_network_dict['edges'] = the_edge_list_dict
    for the_attribute in saved_network_attribute_list:
        if the_attribute in dir(the_network):
            the_network_dict[the_attribute] = getattr(the_network, the_attribute)
    the_network_dict['id'] = the_network.id

    if not the_filename.endswith(".pickle"):
        the_filename += ".pickle"
    with open(path + the_filename, "wb") as fp:
        pickle.dump(the_network_dict, fp)


def load_pickled_network(the_filename, **kwargs):
    """ Load network object from file

    Arguments:
     the_filename: effectively a prefix, ".pickle" is appended if missing

    kwargs:
     path: directory to load from.  It is prepended to the file name
      as-is, so it must end with a path separator.
     verbose: [False (default), True]

    Returns:
     the_network

    """
    path = kwargs.get('path', "")
    verbose = test_kwarg('verbose', kwargs, [False, True])
    if not the_filename.endswith(".pickle"):
        the_filename += ".pickle"

    if verbose:
        print('Loading network file %s ...' % (path + the_filename))

    # NOTE(security): unpickling executes code embedded in the file.
    # Only load pickles that come from a trusted source.
    with open(path + the_filename, "rb") as fp:
        the_network_dict = pickle.load(fp)

    if verbose:
        print('... file loaded. Creating network nodes ...')

    the_network = Network(the_network_dict['id'])
    for the_attribute in saved_network_attribute_list:
        # pickle_network() skips attributes the saved network did not
        # have, so an attribute may legitimately be absent from the file.
        if (the_attribute in dir(the_network)) and (the_attribute in the_network_dict):
            setattr(the_network, the_attribute, the_network_dict[the_attribute])

    the_node_list_dict = the_network_dict['nodes']
    the_edge_list_dict = the_network_dict['edges']

    # Each saved record is a one-entry dict {id: {attribute: value}}.
    the_node_ids = [next(iter(x)) for x in the_node_list_dict]
    the_nodetype = the_network.nodetypes[0]
    the_nodetype.add_nodes(the_node_ids)
    # Relies on the nodetype keeping the nodes in the order they were added.
    for the_index, the_node in enumerate(the_nodetype.nodes):
        node_record = the_node_list_dict[the_index][the_node.id]
        for the_attribute in saved_node_attribute_list:
            if the_attribute in dir(the_node):
                setattr(the_node, the_attribute, node_record[the_attribute])

    if verbose:
        print('... nodes created. Linking nodes ...')

    # The saved edge ids are not reused: the edges are re-created from
    # their node pairs, which allows the edge id to be regenerated with
    # the current default separation character.
    get_node = the_nodetype.nodes.get_by_id
    edge_records = [_only_value(x) for x in the_edge_list_dict]
    the_node_pair_list = [[get_node(record['nodes'][0]), get_node(record['nodes'][1])]
                          for record in edge_records]
    the_edge_list = the_network.connect_node_pair_list(the_node_pair_list)

    # Assumes the edges come back in the same order as the node pairs.
    for the_edge, edge_record in zip(the_edge_list, edge_records):
        for the_attribute in saved_edge_attribute_list:
            if the_attribute in dir(the_edge):
                setattr(the_edge, the_attribute, edge_record[the_attribute])

    the_network.update()

    return the_network


def create_network_model_from_textfile(network_id, network_file, **kwargs):
    """ Script to build a basic network model
    from a network file. Will reserve annotation,
    initialization for helper functions to call later

    Arguments:
     network_id: string, name of the model
     network_file: name of the network file.  2 columns tab delimited,
      with node1 node2.  Empty lines are ignored.

    kwargs:
     verbose: [False (default), True]
     all kwargs are also passed through to connect_node_pair_list

    Returns:
     network_model

    """
    verbose = test_kwarg('verbose', kwargs, [False, True])
    if verbose:
        print("Reading the network file...")

    the_list = _read_lines(network_file)

    if verbose:
        print("     ... read the file.")
        print("Creating the nodes...")

    the_nodes_1 = []
    the_nodes_2 = []
    for the_entry in the_list:
        if not the_entry:
            # An empty line (e.g. at the end of the file) has no columns.
            continue
        the_entry = the_entry.split("\t")
        the_nodes_1.append(the_entry[0])
        the_nodes_2.append(the_entry[1])

    the_nodes = sorted(set(the_nodes_1 + the_nodes_2))
    the_network = Network(network_id)
    the_nodetype = the_network.nodetypes[0]
    the_nodetype.add_nodes(the_nodes)
    # we will define nodes as the default 'monopartite' nodetype
    for the_node in the_nodetype.nodes:
        the_node.set_nodetype(the_nodetype.id)

    if verbose:
        print("     ... the nodes are created.")
        print("Linking the nodes, this may take a while...")

    get_node = the_nodetype.nodes.get_by_id
    the_node_pairs = []
    for id_1, id_2 in zip(the_nodes_1, the_nodes_2):
        node_1 = get_node(id_1)
        node_2 = get_node(id_2)
        # Get rid of pairs where a node pairs with itself.  Repeated
        # pairs are NOT removed here; they are passed on unchanged.
        if node_1 != node_2:
            the_node_pairs.append([node_1, node_2])

    the_network.connect_node_pair_list(the_node_pairs, **kwargs)

    the_network.update()
    return the_network


def create_source_dict_from_textfile(source_file, **kwargs):
    """ Script to create a dict of source values.

    Arguments:
     source_file: name of the file that contains the sources.
      1 or 2 columns tab delimited, with node /t value_1
      node: id will be used for pairing to network model nodes
      value_1: optional, assumed to be 1.
      Empty lines are ignored.

    kwargs:
     none, just a pass through

    Returns:
     source_dict: {node_id: {'value': float}}.  If the file cannot be
      read a message is printed and an empty dict is returned.

    """
    source_dict = {}
    try:
        the_list = _read_lines(source_file)
    except (IOError, OSError):
        print('Cannot load the source file.  Exiting...')
        return source_dict

    for the_entry in the_list:
        if not the_entry:
            # Do not create an entry with an empty id for blank lines.
            continue
        the_entry = the_entry.split("\t")
        if len(the_entry) == 1:
            the_value = 1.
        else:
            the_value = float(the_entry[1])
        source_dict[the_entry[0]] = {'value': the_value}
    return source_dict


def _parse_table_value(cur_value, force_to_float, interpret_lists):
    """ Convert one table cell to a float, a list or a cleaned-up string. """
    if force_to_float and is_float_try(cur_value):
        return float(cur_value)
    if cur_value.startswith('"') and cur_value.endswith('"'):
        # The double-quotes are sometimes inserted before
        # and after long string/lists
        # and aren't needed
        cur_value = cur_value.lstrip('"').rstrip('"')
    if interpret_lists and cur_value.startswith('[') and cur_value.endswith(']'):
        cur_value = string_to_list(cur_value, force_to_float = force_to_float)
    return cur_value


def read_table_file_to_dict(filename, **kwargs):
    """simple function to extract a dict of dicts as
    {top_key:{key: value}} from an
    Unix / tab-delineated file.  This is useful for getting
    node / edge attributes stored in text format.  The first row
    (after any leading comment lines) is assumed to be header information.

    Arguments:
     filename

    kwargs:
     top_key: header of the column that supplies the top-level keys.
      If left blank, the first column is used.
     subfield_key_list: a list of fields to include.  Leave blank if all
                        fields are to be included.
     comment_char: Lines starting with this character/string
                  will be ignored
     sep_char: Separation character, tab by default
     force_to_float: [True, False], convert values that parse as floats
     interpret_lists: [True, False], convert "[...]" values to lists
     columns_on_top: [False (default), True]
      if True, the columns will be used
      as the top level keys.

    Returns:
     organized_return_dict; empty if the file has no header line.

    Raises:
     ValueError if top_key or a requested subfield is not in the header.

    """
    top_key = kwargs.get('top_key', '')
    subfield_key_list = kwargs.get('subfield_key_list', [])
    comment_char = kwargs.get('comment_char', "")
    sep_char = kwargs.get('sep_char', "\t")
    columns_on_top = test_kwarg('columns_on_top', kwargs, [False, True])
    force_to_float = test_kwarg('force_to_float', kwargs, [True, False])
    interpret_lists = test_kwarg('interpret_lists', kwargs, [True, False])

    the_list = _read_lines(filename)

    # Skip the comment lines that precede the header.
    header_index = 0
    if comment_char:
        while (header_index < len(the_list)) and the_list[header_index].startswith(comment_char):
            header_index += 1
    if header_index >= len(the_list):
        # Empty file, or nothing but comments: there is no table to read.
        return {}

    firstline = the_list[header_index].split(sep_char)
    if top_key == '':
        key_index = 0
    else:
        key_index = firstline.index(top_key)

    if subfield_key_list:
        valueindices = [firstline.index(subfield_key) for subfield_key in subfield_key_list]
    else:
        valueindices = [x for x in range(len(firstline)) if x != key_index]
        subfield_key_list = [firstline[x] for x in valueindices]

    return_dict = {}
    for the_line in the_list[header_index + 1:]:
        if not the_line:
            # Blank lines carry no columns to index into.
            continue
        if comment_char and the_line.startswith(comment_char):
            continue
        the_line = the_line.split(sep_char)
        the_row = {}
        for subfield_key, value_index in zip(subfield_key_list, valueindices):
            the_row[subfield_key] = _parse_table_value(the_line[value_index],
                                                       force_to_float,
                                                       interpret_lists)
        return_dict[the_line[key_index]] = the_row

    if not columns_on_top:
        return return_dict

    # Flip {row: {column: value}} into {column: {row: value}}.
    organized_return_dict = {}
    for the_column_header in subfield_key_list:
        organized_return_dict[the_column_header] = {}
        for the_row_name in return_dict:
            organized_return_dict[the_column_header][the_row_name] = return_dict[the_row_name][the_column_header]
    return organized_return_dict


def write_dict_to_textfile(the_filename, the_output_dict, **kwargs):
    """ Write a dict of dicts to a tab-delimited textfile table.

    Arguments:
     the_filename: file to write to
     the_output_dict: the dict to write, {top_id: {subfield: value}}.
      It is not modified.

    kwargs:
     top_key: string to describe the top key / id in the table,
      'name' by default.  It is written in the first header cell.
     subfields_on_top: [False (default), True]
      if True the subfields become the column headers in
      the table and the top levels keys become the
      row names.  Otherwise the subfields are the row names and the
      top level keys are the column headers.

    Returns:
     nothing, just writes to file.  Rows and columns are sorted;
     combinations missing from the dict are written as empty cells.

    """
    subfields_on_top = test_kwarg('subfields_on_top', kwargs, [False, True])
    top_key = kwargs.get('top_key', 'name')

    all_top_ids = sorted(the_output_dict.keys())
    all_subfield_ids = set()
    for the_key in all_top_ids:
        all_subfield_ids.update(the_output_dict[the_key].keys())
    all_subfield_ids = sorted(all_subfield_ids)

    # Rather than building a transposed copy of the dict, only the
    # lookup direction changes between the two layouts.
    if subfields_on_top:
        row_names, col_names = all_top_ids, all_subfield_ids

        def get_cell(row_name, col_name):
            return the_output_dict[row_name].get(col_name, '')
    else:
        row_names, col_names = all_subfield_ids, all_top_ids

        def get_cell(row_name, col_name):
            return the_output_dict[col_name].get(row_name, '')

    # First we convert to a table to align the columns
    the_table = [[top_key] + col_names]
    for row_name in row_names:
        the_table.append([row_name] + [get_cell(row_name, col_name) for col_name in col_names])

    # Now write the table
    with open(the_filename, 'w') as fp:
        for row in the_table:
            # (entry,) keeps tuple values from being unpacked by "%".
            fp.write("\t".join("%s" % (entry,) for entry in row) + "\n")

# ... (the source listing is truncated at this point; the next snippet on the page is CeleryTest.py)
Source:CeleryTest.py  
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import, division
import os
import sys
import unittest
import pickle
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
from .TaskSpecTest import TaskSpecTest
from eufrates.specs import Celery, WorkflowSpec
from eufrates.operators import Attrib
from eufrates.serializer.dict import DictionarySerializer
from base64 import b64encode


class CeleryTest(TaskSpecTest):
    """Tests for the Celery task spec, on top of the generic TaskSpec tests."""

    CORRELATE = Celery

    def create_instance(self):
        """Return a fresh 'testtask' Celery spec registered on self.wf_spec."""
        registered = self.wf_spec.task_specs
        # Drop a spec left under the same name before creating a new one.
        if 'testtask' in registered:
            del registered['testtask']
        return Celery(
            self.wf_spec,
            'testtask',
            'call.name',
            call_args=[Attrib('the_attribute'), 1],
            description='foo',
            named_kw=[],
            dict_kw={},
        )

    def testTryFire(self):
        """Intentionally empty."""
        pass

    def testRetryFire(self):
        """Intentionally empty."""
        pass

    def testSerializationWithoutKwargs(self):
        """Round-trip Celery specs through DictionarySerializer."""
        target_spec = WorkflowSpec()
        dict_serializer = DictionarySerializer()

        # No extra keyword arguments at all.
        plain = Celery(self.wf_spec, 'testnokw', 'call.name',
                       call_args=[Attrib('the_attribute'), 1])
        plain_copy = Celery.deserialize(
            dict_serializer, target_spec, plain.serialize(dict_serializer))
        self.assertDictEqual(plain.kwargs, plain_copy.kwargs)

        # One extra keyword argument.
        with_kw = Celery(self.wf_spec, 'testkw', 'call.name',
                         call_args=[Attrib('the_attribute'), 1],
                         some_arg={"key": "value"})
        with_kw_copy = Celery.deserialize(
            dict_serializer, target_spec, with_kw.serialize(dict_serializer))
        self.assertDictEqual(with_kw.kwargs, with_kw_copy.kwargs)

        # Has kwargs, but they belong to TaskSpec
        with_defines = Celery(self.wf_spec, 'testkwdef', 'call.name',
                              call_args=[Attrib('the_attribute'), 1],
                              some_ref=Attrib('value'),
                              defines={"key": "value"})
        with_defines_copy = Celery.deserialize(
            dict_serializer, target_spec, with_defines.serialize(dict_serializer))
        self.assertIsInstance(with_defines_copy.kwargs['some_ref'], Attrib)

        raw_args = [Attrib('the_attribute'), 'ip', 'dc455016e2e04a469c01a866f11c0854']
        encoded_args = [b64encode(pickle.dumps(item)) for item in raw_args]
        # The same dict object is used for both 'data' and 'defines' below.
        encoded_data = {'R': b64encode(pickle.dumps('1'))}
        # Comes from live data. Bug not identified, but there we are...
        live_state = {'inputs': ['Wait:1'], 'lookahead': 2, 'description': '',
                      'outputs': [], 'args': encoded_args,
                      'manual': False,
                      'data': encoded_data, 'locks': [], 'pre_assign': [],
                      'call': 'call.x',
                      'internal': False, 'post_assign': [], 'id': 8,
                      'result_key': None, 'defines': encoded_data,
                      'class': 'eufrates.specs.Celery.Celery',
                      'name': 'RS1:1'}
        # Only checks that deserializing this state does not raise.
        Celery.deserialize(dict_serializer, target_spec, live_state)


def suite():
    """Return the CeleryTest suite, or a no-op callable if celery is missing."""
    try:
        import celery
    except ImportError:
        print("WARNING: Celery not found, not all tests are running!")
        return lambda x: None
    return unittest.TestLoader().loadTestsFromTestCase(CeleryTest)

# if __name__ == '__main__':...
# (the body of the entry-point guard is elided in the source listing, which is
#  truncated at this point; the next snippet on the page is orm.py)
Source:orm.py  
import time
from datetime import datetime, date
from decimal import Decimal
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.types import String
from config import config

_db_url = 'postgresql+psycopg2://{user}:{password}@{host}/{db}'.format(**config.db_info)
engine = create_engine(_db_url, echo=config.db_echo)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)


class AlchemyMixin(object):
    """Convenience behaviour shared by the declarative models."""

    def __setattr__(self, key, value):
        # When a str is assigned to a String-typed column, cut it down to the
        # column's declared length, e.g. Column(String(20)) keeps at most 20
        # characters.  A String column without a length has length None, and
        # value[:None] leaves the string untouched.
        if isinstance(value, str):
            class_attribute = getattr(self.__class__, key, None)
            # Only mapped attributes (columns) are considered.
            if isinstance(class_attribute, InstrumentedAttribute):
                column_type = class_attribute.property.columns[0].type
                if isinstance(column_type, String):
                    value = value[:column_type.length]
        super().__setattr__(key, value)

    @classmethod
    def get(cls, object_id, session):
        """
        Return the object whose id equals object_id, or None if there is none.
        """
        return session.query(cls).filter(cls.id == object_id).first()

    def to_dict(self, **kwargs):
        """
        Convert a SQLAlchemy object (row, declarative_base) to a dict.

        If kwargs has `columns`, those are all the items that are wanted;
        otherwise, if it has `excluded`, those are the items to leave out of
        the table's columns.  The two cannot be combined: `excluded` is
        ignored when `columns` is given.

        datetime values become integer timestamps (time.mktime), date values
        ISO strings and Decimal values strings.
        """
        wanted = kwargs.get('columns')
        if wanted:
            names = [item.key for item in wanted]
        else:
            names = set(item.name for item in self.__table__.columns)
            # items that are not wanted
            unwanted = kwargs.get('excluded')
            if unwanted:
                names -= set(item.key for item in unwanted)

        result = {}
        for name in names:
            raw = getattr(self, name)
            # datetime is tested first because it is a subclass of date.
            if isinstance(raw, datetime):
                result[name] = int(time.mktime(raw.timetuple()))
            elif isinstance(raw, date):
                result[name] = raw.isoformat()
            elif isinstance(raw, Decimal):
                result[name] = str(raw)
            else:
                result[name] = raw
        return result


def print_create_table_sql(model):
    """
    Print the CREATE statement of the model's table, so that the table can
    be created by hand.  (Translated from a mis-encoded original; the body
    below is disabled, so the function currently does nothing.)
    """
    # from sqlalchemy.schema import CreateTable
    # print('#### Create table SQL for model {}: ####'.format(model))
    # print(CreateTable(model.__table__).compile(engine))
    # print('#' * 20)


def create_table_if_not_exist(the_object):
    """
    If the required table does not exist, create it.
    """
    # ... (the body of this function is elided in the source listing, which is
    #      truncated at this point)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
