How to use the_attribute method in Sure

Best Python code snippet using sure_python

networkio.py

Source:networkio.py Github

copy

Full Screen

import sys

try:
    import cPickle as pickle  # Python 2: C-accelerated implementation
except ImportError:
    import pickle  # Python 3: the accelerated version is used automatically

from ..core.Edge import Edge
from ..core.Node import Node
from ..core.NodeType import NodeType
from ..core.Network import Network
from ..core.shared_functions import test_kwarg
from ..core.parameters import saved_node_attribute_list
from ..core.parameters import saved_edge_attribute_list
from ..core.parameters import saved_network_attribute_list

# Python 2 needs the 'U' flag for universal-newline handling. Python 3 does
# this by default and no longer accepts the flag from 3.11 on.
_TEXT_READ_MODE = 'rU' if sys.version_info[0] < 3 else 'r'


def _read_lines(filename):
    """ Return the lines of a text file with trailing newlines removed.
    The file is always closed, also when reading fails.
    """
    with open(filename, _TEXT_READ_MODE) as fp:
        return [x.rstrip("\n") for x in fp.readlines()]


def _pickle_path(the_filename, kwargs):
    """ Build the full pickle file name: optional 'path' kwarg prefix plus
    the file name with a ".pickle" suffix added when it is missing.
    """
    if not the_filename.endswith(".pickle"):
        the_filename += ".pickle"
    # 'path' is prepended verbatim (plain string concatenation), so callers
    # must include the trailing separator themselves.
    return kwargs.get('path', "") + the_filename


def is_float_try(the_string):
    """ Simple test to check if a string can be represented as a
    floating point.
    """
    try:
        float(the_string)
        return True
    except ValueError:
        return False


def string_to_list(the_string, force_to_float=True, sep_char=", "):
    """ Convert strings read from file to Python lists
    Arguments:
     the_string: text such as "[1, 2, 'a']"; leading "[" and trailing "]"
      characters are removed before splitting.
     force_to_float: if True, items that parse as floats are converted.
     sep_char: separator between items.
    Returns:
     the_list: list of floats and/or strings
    """
    the_list = the_string.lstrip("[").rstrip("]").split(sep_char)
    first_item = the_list[0]
    # Whether to unquote is decided from the first item only.
    if first_item.startswith("'") and first_item.endswith("'"):
        the_list = [x.strip("'") for x in the_list]
    # NOTE(review): the original had a second branch testing "\'" which is
    # the very same string as "'", so it could never run and was dropped.
    # It was possibly meant to strip double quotes -- confirm before adding.
    convert = (force_to_float == True)
    return [float(x) if (convert and is_float_try(x)) else x
            for x in the_list]


def pickle_network(the_network, the_filename, **kwargs):
    """ Break apart and save the network
    Only networks with exactly one nodetype are saved; otherwise an error
    message is printed and nothing is written.
    Arguments:
     the_network: object to save
     the_filename: effectively a prefix, ".pickle" is added when missing
    kwargs:
     path: directory to save to, prepended verbatim to the file name
    """
    if len(the_network.nodetypes) != 1:
        print("Save error. Convert to monopartite network to save.")
        return

    # Everything is stored as plain lists / dicts rather than package
    # classes, so saved models stay loadable after base class updates and
    # the pickle is more portable across platforms.
    the_node_list_dict = []
    for the_node in the_network.nodetypes[0].nodes:
        the_dict = {the_node.id: {}}
        for the_attribute in saved_node_attribute_list:
            the_dict[the_node.id][the_attribute] = getattr(the_node, the_attribute)
        the_node_list_dict.append(the_dict)

    the_edge_list_dict = []
    for the_edge in the_network.edges:
        the_dict = {the_edge.id: {}}
        for the_attribute in saved_edge_attribute_list:
            the_dict[the_edge.id][the_attribute] = getattr(the_edge, the_attribute)
        # Edges reference their two end nodes by id.
        the_dict[the_edge.id]['nodes'] = [the_edge._nodes[0].id, the_edge._nodes[1].id]
        the_edge_list_dict.append(the_dict)

    the_network_dict = {}
    the_network_dict['nodes'] = the_node_list_dict
    the_network_dict['edges'] = the_edge_list_dict
    for the_attribute in saved_network_attribute_list:
        if the_attribute in dir(the_network):
            the_network_dict[the_attribute] = getattr(the_network, the_attribute)
    the_network_dict['id'] = the_network.id

    with open(_pickle_path(the_filename, kwargs), "wb") as fp:
        pickle.dump(the_network_dict, fp)


def load_pickled_network(the_filename, **kwargs):
    """ Load network object from file
    Arguments:
     the_filename: effectively a prefix, ".pickle" is added when missing
    kwargs:
     path: directory to load from, prepended verbatim to the file name
     verbose: [False (default), True] print progress messages
      (default presumably supplied by test_kwarg -- confirm)
    Returns:
     the_network
    """
    verbose = test_kwarg('verbose', kwargs, [False, True])
    full_filename = _pickle_path(the_filename, kwargs)

    if verbose:
        print('Loading network file %s ...' % (full_filename))

    # SECURITY: unpickling can execute arbitrary code. Only load files that
    # come from a trusted source.
    with open(full_filename, "rb") as fp:
        the_network_dict = pickle.load(fp)

    if verbose:
        print('... file loaded. Creating network nodes ...')

    the_network = Network(the_network_dict['id'])
    for the_attribute in saved_network_attribute_list:
        if the_attribute in dir(the_network):
            setattr(the_network, the_attribute, the_network_dict[the_attribute])

    the_node_list_dict = the_network_dict['nodes']
    the_edge_list_dict = the_network_dict['edges']

    # Each saved node is a single-key dict {node_id: {attribute: value}}.
    the_node_ids = [next(iter(x)) for x in the_node_list_dict]
    the_nodetype = the_network.nodetypes[0]
    the_nodetype.add_nodes(the_node_ids)
    # Assumes add_nodes keeps the order of the_node_ids, so that index i of
    # the_nodetype.nodes matches index i of the saved list.
    for the_index, the_node in enumerate(the_nodetype.nodes):
        for the_attribute in saved_node_attribute_list:
            if the_attribute in dir(the_node):
                setattr(the_node, the_attribute,
                        the_node_list_dict[the_index][the_node.id][the_attribute])

    if verbose:
        print('... nodes created. Linking nodes ...')

    the_nodes = the_network.nodetypes[0].nodes
    the_node_pair_list = []
    for the_edge_entry in the_edge_list_dict:
        # The saved edge id (the single key) is not reused: new ids are
        # assigned when the pairs are connected, which allows for updating
        # the edge id with the new default separation character.
        the_edge_dict = the_edge_entry[next(iter(the_edge_entry))]
        the_node_pair_list.append([the_nodes.get_by_id(the_edge_dict['nodes'][0]),
                                   the_nodes.get_by_id(the_edge_dict['nodes'][1])])
    the_edge_list = the_network.connect_node_pair_list(the_node_pair_list)

    # Assumes connect_node_pair_list returns edges in the order of the pairs.
    for the_index, the_edge in enumerate(the_edge_list):
        the_edge_entry = the_edge_list_dict[the_index]
        the_edge_dict = the_edge_entry[next(iter(the_edge_entry))]
        for the_attribute in saved_edge_attribute_list:
            if the_attribute in dir(the_edge):
                setattr(the_edge, the_attribute, the_edge_dict[the_attribute])

    the_network.update()

    return the_network


def create_network_model_from_textfile(network_id, network_file, **kwargs):
    """ Script to build a basic network model
    from a network file. Will reserve annotation,
    initialization for helper functions to call later
    Arguments:
     network_id: string, name of the model
     network_file: name of the network file. 2 columns tab delimited,
      with node1 node2. Blank lines are ignored.
    kwargs:
     verbose: [False (default), True] print progress messages
     all kwargs are also passed through to connect_node_pair_list
    Returns:
     network_model
    """
    verbose = test_kwarg('verbose', kwargs, [False, True])
    if verbose:
        print("Reading the network file...")

    # Blank lines (e.g. a trailing empty line) carry no node pair.
    the_list = [x for x in _read_lines(network_file) if x]

    if verbose:
        print(" ... read the file.")
        print("Creating the nodes...")

    the_nodes_1 = []
    the_nodes_2 = []
    for the_entry in the_list:
        the_entry = the_entry.split("\t")
        the_nodes_1.append(the_entry[0])
        the_nodes_2.append(the_entry[1])

    the_nodes = sorted(set(the_nodes_1 + the_nodes_2))
    the_network = Network(network_id)
    the_nodetype = the_network.nodetypes[0]
    the_nodetype.add_nodes(the_nodes)
    # we will define nodes as the default 'monopartite' nodetype
    for the_node in the_nodetype.nodes:
        the_node.set_nodetype(the_nodetype.id)

    if verbose:
        print(" ... the nodes are created.")
        print("Linking the nodes, this may take a while...")

    node_lookup = the_network.nodetypes[0].nodes
    the_nodes_1 = [node_lookup.get_by_id(the_id) for the_id in the_nodes_1]
    the_nodes_2 = [node_lookup.get_by_id(the_id) for the_id in the_nodes_2]
    # Pair the nodes up row by row. Duplicate pairs are NOT removed here;
    # only pairs where a node pairs with itself are dropped.
    the_node_pairs = [[node_1, node_2]
                      for node_1, node_2 in zip(the_nodes_1, the_nodes_2)
                      if node_1 != node_2]

    the_network.connect_node_pair_list(the_node_pairs, **kwargs)

    the_network.update()
    return the_network


def create_source_dict_from_textfile(source_file, **kwargs):
    """ Script to create a dict of source values.
    Arguments:
     source_file: name of the file that contains the sources.
      1 or 2 columns tab delimited, with node (tab) value_1
       node: id will be used for pairing to network model nodes
       value_1: optional, assumed to be 1.
      Blank lines are ignored.
    kwargs:
     none, just a pass through
    Returns:
     source_dict: {node_id: {'value': float}}; empty if the file
      cannot be read (a message is printed in that case).
    """
    try:
        the_list = _read_lines(source_file)
    except (IOError, OSError):
        print('Cannot load the source file. Exiting...')
        return {}

    source_dict = {}
    for the_entry in the_list:
        if not the_entry:
            # A blank line would otherwise create a bogus '' node entry.
            continue
        the_entry = the_entry.split("\t")
        if len(the_entry) == 1:
            the_value = 1.
        else:
            the_value = float(the_entry[1])
        source_dict[the_entry[0]] = {'value': the_value}
    return source_dict


def _convert_table_value(cur_value, force_to_float, interpret_lists):
    """ Convert one table cell (a string) to a float, a list or a cleaned
    string, following the rules of read_table_file_to_dict.
    """
    if (force_to_float == True) and is_float_try(cur_value):
        return float(cur_value)
    if cur_value.startswith('"') and cur_value.endswith('"'):
        # The double-quotes are sometimes inserted before
        # and after long string/lists and aren't needed
        cur_value = cur_value.lstrip('"').rstrip('"')
    if interpret_lists and cur_value.startswith('[') and cur_value.endswith(']'):
        cur_value = string_to_list(cur_value, force_to_float=force_to_float)
    return cur_value


def read_table_file_to_dict(filename, **kwargs):
    """simple function to extract a dict of dicts as
    {top_key:{key: value}} from a
    Unix / tab-delineated file. This is useful for getting
    node / edge attributes stored in text format. The first
    (non-comment) row is assumed to be header information.
    Arguments:
     filename
    kwargs:
     top_key: header name of the column that supplies the top level keys.
      If left blank, the first column is used.
     subfield_key_list: a list of fields to include. Leave blank if all
      fields are to be included.
     comment_char: Lines starting with this character/string
      will be ignored
     sep_char: Separation character, tab by default
     force_to_float: [True (default), False]
      convert values that parse as numbers to float
     interpret_lists: [True (default), False]
      convert values of the form "[a, b]" to lists
     columns_on_top: [False (default), True]
      if True, the columns will be used
      as the top level keys.
    Returns:
     organized_return_dict: empty if the file holds no header row
    """
    top_key = kwargs.get('top_key', '')
    subfield_key_list = kwargs.get('subfield_key_list', [])
    comment_char = kwargs.get('comment_char', "")
    sep_char = kwargs.get('sep_char', "\t")
    columns_on_top = test_kwarg('columns_on_top', kwargs, [False, True])
    force_to_float = test_kwarg('force_to_float', kwargs, [True, False])
    interpret_lists = test_kwarg('interpret_lists', kwargs, [True, False])

    the_list = _read_lines(filename)

    # Skip the leading comment lines to find the header row.
    header_index = 0
    if comment_char:
        while (header_index < len(the_list)
               and the_list[header_index].startswith(comment_char)):
            header_index += 1
    if header_index >= len(the_list):
        # Empty file, or nothing but comments: there is no table to read.
        return {}

    firstline = the_list[header_index].split(sep_char)
    if top_key == '':
        key_index = 0
    else:
        key_index = firstline.index(top_key)

    if subfield_key_list:
        valueindices = [firstline.index(subfield_key) for subfield_key in subfield_key_list]
    else:
        valueindices = [x for x in range(0, len(firstline)) if (x != key_index)]
        subfield_key_list = [firstline[x] for x in valueindices]

    return_dict = {}
    for the_line in the_list[header_index + 1:]:
        if not the_line:
            # Blank rows carry no key and no values.
            continue
        if comment_char and the_line.startswith(comment_char):
            continue
        the_line = the_line.split(sep_char)
        the_row = {}
        return_dict[the_line[key_index]] = the_row
        for index, subfield_key in enumerate(subfield_key_list):
            the_row[subfield_key] = _convert_table_value(
                the_line[valueindices[index]], force_to_float, interpret_lists)

    if not columns_on_top:
        return return_dict

    # Flip {row: {column: value}} into {column: {row: value}}.
    organized_return_dict = {}
    for the_column_header in subfield_key_list:
        organized_return_dict[the_column_header] = {}
        for the_row_name in return_dict:
            organized_return_dict[the_column_header][the_row_name] = \
                return_dict[the_row_name][the_column_header]
    return organized_return_dict


def write_dict_to_textfile(the_filename, the_output_dict, **kwargs):
    """ Write a dict of dicts to a tab-delimited textfile table.
    Missing values are written as empty cells.
    Arguments:
     the_filename: file to write to
     the_output_dict: the dict to write, {top_id: {subfield: value}}
    kwargs:
     top_key: string to describe the top key / id in the table,
      'name' by default. Written as the first header cell.
     subfields_on_top: [False (default), True]
      if True the subfields become the column headers in
      the table and the top level keys become the
      row names. If False it is the other way around.
    Returns:
     nothing, just writes to file
    """
    subfields_on_top = test_kwarg('subfields_on_top', kwargs, [False, True])
    top_key = kwargs.get('top_key', 'name')

    all_top_ids = sorted(the_output_dict.keys())
    all_subfield_ids = set()
    for the_key in all_top_ids:
        all_subfield_ids.update(the_output_dict[the_key].keys())
    all_subfield_ids = sorted(all_subfield_ids)

    if subfields_on_top:
        row_names = all_top_ids
        col_names = all_subfield_ids
    else:
        row_names = all_subfield_ids
        # Without any subfield there are no rows and, as before, no
        # columns either: only the top_key header cell is written.
        col_names = all_top_ids if all_subfield_ids else []

    with open(the_filename, 'w') as fp:
        for the_row_name in [None] + row_names:
            if the_row_name is None and fp.tell() == 0:
                # Header row.
                row = [top_key] + list(col_names)
            else:
                row = [the_row_name]
                for the_col_name in col_names:
                    if subfields_on_top:
                        the_top_id, the_subfield = the_row_name, the_col_name
                    else:
                        the_top_id, the_subfield = the_col_name, the_row_name
                    # Ragged input: entries lacking a subfield get ''.
                    row.append(the_output_dict[the_top_id].get(the_subfield, ''))
            last_index = len(row) - 1
            for colindex, entry in enumerate(row):
                # (entry,) keeps tuple values from being unpacked by %.
                fp.write("%s" % (entry,))
                fp.write("\n" if colindex == last_index else "\t")
    # NOTE: the listing is truncated ("...") right after the write loop, so
    # anything that followed it in the original file is not shown here.

Full Screen

Full Screen

CeleryTest.py

Source:CeleryTest.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2from __future__ import print_function, absolute_import, division3import os4import sys5import unittest6import pickle7sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))8from .TaskSpecTest import TaskSpecTest9from eufrates.specs import Celery, WorkflowSpec10from eufrates.operators import Attrib11from eufrates.serializer.dict import DictionarySerializer12from base64 import b64encode13class CeleryTest(TaskSpecTest):14 CORRELATE = Celery15 def create_instance(self):16 if 'testtask' in self.wf_spec.task_specs:17 del self.wf_spec.task_specs['testtask']18 return Celery(self.wf_spec,19 'testtask', 'call.name',20 call_args=[Attrib('the_attribute'), 1],21 description='foo',22 named_kw=[],23 dict_kw={}24 )25 def testTryFire(self):26 pass27 def testRetryFire(self):28 pass29 def testSerializationWithoutKwargs(self):30 new_wf_spec = WorkflowSpec()31 serializer = DictionarySerializer()32 nokw = Celery(self.wf_spec, 'testnokw', 'call.name',33 call_args=[Attrib('the_attribute'), 1])34 data = nokw.serialize(serializer)35 nokw2 = Celery.deserialize(serializer, new_wf_spec, data)36 self.assertDictEqual(nokw.kwargs, nokw2.kwargs)37 kw = Celery(self.wf_spec, 'testkw', 'call.name',38 call_args=[Attrib('the_attribute'), 1],39 some_arg={"key": "value"})40 data = kw.serialize(serializer)41 kw2 = Celery.deserialize(serializer, new_wf_spec, data)42 self.assertDictEqual(kw.kwargs, kw2.kwargs)43 # Has kwargs, but they belong to TaskSpec44 kw_defined = Celery(self.wf_spec, 'testkwdef', 'call.name',45 call_args=[Attrib('the_attribute'), 1],46 some_ref=Attrib('value'),47 defines={"key": "value"})48 data = kw_defined.serialize(serializer)49 kw_defined2 = Celery.deserialize(serializer, new_wf_spec, data)50 self.assertIsInstance(kw_defined2.kwargs['some_ref'], Attrib)51 args = [b64encode(pickle.dumps(v))52 for v in [Attrib('the_attribute'), 'ip', 'dc455016e2e04a469c01a866f11c0854']]53 data = {'R': b64encode(pickle.dumps('1'))}54 # Comes from live 
data. Bug not identified, but there we are...55 data = {'inputs': ['Wait:1'], 'lookahead': 2, 'description': '',56 'outputs': [], 'args': args,57 'manual': False,58 'data': data, 'locks': [], 'pre_assign': [],59 'call': 'call.x',60 'internal': False, 'post_assign': [], 'id': 8,61 'result_key': None, 'defines': data,62 'class': 'eufrates.specs.Celery.Celery',63 'name': 'RS1:1'}64 Celery.deserialize(serializer, new_wf_spec, data)65def suite():66 try:67 import celery68 except ImportError:69 print("WARNING: Celery not found, not all tests are running!")70 return lambda x: None71 else:72 return unittest.TestLoader().loadTestsFromTestCase(CeleryTest)73if __name__ == '__main__':...

Full Screen

Full Screen

orm.py

Source:orm.py Github

copy

Full Screen

import time
from datetime import datetime, date
from decimal import Decimal
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.types import String
from config import config

_db_url = 'postgresql+psycopg2://{user}:{password}@{host}/{db}'.format(**config.db_info)
engine = create_engine(_db_url, echo=config.db_echo)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)


class AlchemyMixin(object):
    """
    Mixin for declarative models: truncates strings assigned to String
    columns, looks objects up by id and converts objects to dicts.
    """

    def __setattr__(self, key, value):
        # When assigning to a String column, limit the length of the string:
        # e.g. a Column(String(20)) holds at most 20 characters.
        if isinstance(value, str):
            class_attribute = getattr(self.__class__, key, None)
            if isinstance(class_attribute, InstrumentedAttribute):
                # The attribute is a mapped column.
                column_type = class_attribute.property.columns[0].type
                if isinstance(column_type, String):
                    value = value[:column_type.length]
        super().__setattr__(key, value)

    @classmethod
    def get(cls, object_id, session):
        """
        Get object by object id, if not find, return None.
        """
        query = session.query(cls).filter(cls.id == object_id)
        return query.first()

    def to_dict(self, **kwargs):
        """
        Convert a SQLAlchemy object (row, declarative_base) to a dict.
        If kwargs has `columns`, those are all the items wanted; if it has
        `excluded`, those are the items to leave out of the table's columns.
        The two cannot be combined: `excluded` is only read when `columns`
        is missing or empty.
        datetime values become integer timestamps, date values ISO strings
        and Decimal values strings.
        """
        wanted = kwargs.get('columns')
        if wanted:
            names = [x.key for x in wanted]
        else:
            names = {x.name for x in self.__table__.columns}
            # Items that are not wanted.
            unwanted = kwargs.get('excluded')
            if unwanted:
                names -= set(x.key for x in unwanted)

        result = {}
        for name in names:
            value = getattr(self, name)
            # datetime must be tested before date: it is a subclass of date.
            if isinstance(value, datetime):
                value = int(time.mktime(value.timetuple()))
            elif isinstance(value, date):
                value = value.isoformat()
            elif isinstance(value, Decimal):
                value = str(value)
            result[name] = value
        return result


def print_create_table_sql(model):
    """
    Print the CREATE TABLE statement of the model's table, e.g. so the
    table can be created by hand.
    """
    # The implementation is currently disabled:
    # from sqlalchemy.schema import CreateTable
    # print('#### Create table SQL for model {}: ####'.format(model))
    # print(CreateTable(model.__table__).compile(engine))
    # print('#' * 20)


def create_table_if_not_exist(the_object):
    """
    Create the required table if it does not exist.
    """
    # NOTE: the rest of this function is cut off ("...") in the listing.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Sure automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful