How to use _create_connection method in tempest

Best Python code snippet using tempest_python

db.py

Source:db.py Github

copy

Full Screen

class DbConnector(object):
    """Store and retrieve JSON-serialized entries in a SQL database.

    Each operation opens its own connection through ``_orm.connect`` and
    closes it before returning, including when the operation fails.

    Values (aliases, holders, keys, bodies) are always passed to the driver
    as bound parameters, never spliced into the SQL text, so they may
    contain quotes or other SQL metacharacters.

    NOTE(review): the ``?`` placeholder assumes ``_orm`` is sqlite3 (or
    another DB-API module with the "qmark" paramstyle); the existing use of
    ``executescript`` points that way -- confirm against the module imports.
    """

    def __init__(self, db):
        """Run the init script against *db* and keep *db* as the default."""
        self._run_sql_script(db, self._get_init_script())
        self.db = db

    def _create_connection(self, db=None):
        """Open a connection to *db*, falling back to ``self.db`` if falsy.

        Raises:
            WalletDbConectionError: if the driver reports a database error.
        """
        if not db:
            db = self.db
        try:
            return _orm.connect(db)
        except _orm.DatabaseError as err:
            raise WalletDbConectionError(err) from err

    @staticmethod
    def _get_init_script():
        """Return the path of ``init-db.sql``, two levels above this file."""
        from os.path import dirname
        import os
        rootdir = dirname(dirname(os.path.abspath(__file__)))
        return os.path.join(rootdir, 'init-db.sql')

    def _run_sql_script(self, db, script):
        """Execute the SQL file at path *script* against database *db*."""
        # TODO: The present module is agnostic to the ORM in use with the
        # exception of this function (executescript is sqlite specific).
        con = self._create_connection(db)
        try:
            cur = con.cursor()
            with open(script, 'r') as f:
                sql = f.read()
            cur.executescript(sql)
        finally:
            # Release the connection even if reading or running the script
            # fails.
            con.close()

    def _dump_dict(self, entry):
        """Serialize *entry* to a JSON string with sorted keys."""
        return json.dumps(entry, sort_keys=True)

    def _load_dict(self, body):
        """Deserialize the JSON string *body*."""
        return json.loads(body)

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a double-quoted SQL identifier.

        Table names cannot be bound as parameters, so embedded double quotes
        are doubled to keep the name from terminating the identifier.
        """
        escaped = str(name).replace('"', '""')
        return f'"{escaped}"'

    def _execute(self, query, params=(), fetch=None, commit=False):
        """Run one parameterized statement on a fresh connection.

        Args:
            query: SQL text using ``?`` placeholders.
            params: Sequence of values bound to the placeholders.
            fetch: ``'one'`` to return ``fetchone()``, ``'all'`` to return
                ``fetchall()``, anything else to return ``None``.
            commit: Whether to commit after the statement has run.

        Raises:
            DbQueryError: if executing the statement raises a database error.
        """
        con = self._create_connection()
        try:
            cur = con.cursor()
            try:
                cur.execute(query, params)
            except _orm.DatabaseError as err:
                raise DbQueryError(err) from err
            if fetch == 'one':
                result = cur.fetchone()
            elif fetch == 'all':
                result = cur.fetchall()
            else:
                result = None
            if commit:
                con.commit()
            return result
        finally:
            con.close()

    def fetch_entry(self, alias, table):
        """Return the decoded body stored under *alias* in *table*.

        Raises:
            DbQueryError: if the query fails.
            TypeError: if no row matches *alias* (subscripting the ``None``
                returned by ``fetchone``); kept as-is for compatibility.
        """
        query = (f'SELECT body FROM {self._quote_identifier(table)} '
                 'WHERE alias = ?')
        row = self._execute(query, (alias,), fetch='one')
        return self._load_dict(row[0])

    def fetch_nr(self, table):
        """Return the number of rows in *table*."""
        query = f'SELECT COUNT(*) FROM {self._quote_identifier(table)}'
        return self._execute(query, fetch='one')[0]

    def fetch_aliases(self, table):
        """Return the list of aliases stored in *table*."""
        query = f'SELECT alias FROM {self._quote_identifier(table)}'
        return [row[0] for row in self._execute(query, fetch='all')]

    def fetch_credentials_by_holder(self, alias):
        """Return the aliases of all ``vc`` rows whose holder is *alias*."""
        query = 'SELECT alias FROM vc WHERE holder = ?'
        rows = self._execute(query, (alias,), fetch='all')
        return [row[0] for row in rows]

    def store_key(self, alias, entry):
        """Insert *entry* into ``key`` under *alias*; return *alias*."""
        body = self._dump_dict(entry)
        query = 'INSERT INTO key(alias, body) VALUES (?, ?)'
        self._execute(query, (alias, body), commit=True)
        return alias

    def store_did(self, alias, key, entry):
        """Insert *entry* into ``did`` under *alias* and *key*; return *alias*."""
        body = self._dump_dict(entry)
        query = 'INSERT INTO did(key, alias, body) VALUES (?, ?, ?)'
        self._execute(query, (key, alias, body), commit=True)
        return alias

    def store_vc(self, alias, holder, entry):
        """Insert *entry* into ``vc`` for *holder* under *alias*; return *alias*."""
        body = self._dump_dict(entry)
        query = 'INSERT INTO vc(holder, alias, body) VALUES (?, ?, ?)'
        self._execute(query, (holder, alias, body), commit=True)
        return alias

    def store_vp(self, alias, holder, entry):
        """Insert *entry* into ``vp`` for *holder* under *alias*; return *alias*."""
        body = self._dump_dict(entry)
        query = 'INSERT INTO vp(holder, alias, body) VALUES (?, ?, ?)'
        self._execute(query, (holder, alias, body), commit=True)
        return alias

    def remove(self, alias, table):
        """Delete the rows of *table* whose alias equals *alias*."""
        query = (f'DELETE FROM {self._quote_identifier(table)} '
                 'WHERE alias = ?')
        self._execute(query, (alias,), commit=True)

    def clear(self, table):
        """Delete every row of *table*."""
        # NOTE(review): the visible excerpt of this method stops right after
        # the commit; the connection is closed here like in every sibling
        # method.
        query = f'DELETE FROM {self._quote_identifier(table)}'
        self._execute(query, commit=True)

Full Screen

Full Screen

test__backdoor.py

Source:test__backdoor.py Github

copy

Full Screen

...29 assert self._server is None30 self._server = backdoor.BackdoorServer(('127.0.0.1', 0), *args, **kwargs)31 self._close_on_teardown(self._server.stop)32 self._server.start()33 def _create_connection(self):34 conn = socket.socket()35 self._close_on_teardown(conn)36 conn.connect(('127.0.0.1', self._server.server_port))37 return conn38 def _close(self, conn, cmd=b'quit()\r\n)'):39 conn.sendall(cmd)40 line = readline(conn)41 self.assertEqual(line, '')42 conn.close()43 self.close_on_teardown.remove(conn)44 @greentest.skipOnLibuvOnTravisOnCPython27(45 "segfaults; "46 "See https://github.com/gevent/gevent/pull/1156")47 def test_multi(self):48 self._make_server()49 def connect():50 conn = self._create_connection()51 read_until(conn, b'>>> ')52 conn.sendall(b'2+2\r\n')53 line = readline(conn)54 self.assertEqual(line.strip(), '4', repr(line))55 self._close(conn)56 jobs = [gevent.spawn(connect) for _ in range(10)]57 done = gevent.joinall(jobs, raise_error=True)58 self.assertEqual(len(done), len(jobs), done)59 @greentest.skipOnAppVeyor("Times out")60 def test_quit(self):61 self._make_server()62 conn = self._create_connection()63 read_until(conn, b'>>> ')64 self._close(conn)65 @greentest.skipOnAppVeyor("Times out")66 def test_sys_exit(self):67 self._make_server()68 conn = self._create_connection()69 read_until(conn, b'>>> ')70 self._close(conn, b'import sys; sys.exit(0)\r\n')71 @greentest.skipOnAppVeyor("Times out")72 def test_banner(self):73 banner = "Welcome stranger!" 
# native string74 self._make_server(banner=banner)75 conn = self._create_connection()76 response = read_until(conn, b'>>> ')77 self.assertEqual(response[:len(banner)], banner, response)78 self._close(conn)79 @greentest.skipOnAppVeyor("Times out")80 def test_builtins(self):81 self._make_server()82 conn = self._create_connection()83 read_until(conn, b'>>> ')84 conn.sendall(b'locals()["__builtins__"]\r\n')85 response = read_until(conn, b'>>> ')86 self.assertTrue(len(response) < 300, msg="locals() unusable: %s..." % response)87 self._close(conn)88 def test_switch_exc(self):89 from gevent.queue import Queue, Empty90 def bad():91 q = Queue()92 print('switching out, then throwing in')93 try:94 q.get(block=True, timeout=0.1)95 except Empty:96 print("Got Empty")97 print('switching out')98 gevent.sleep(0.1)99 print('switched in')100 self._make_server(locals={'bad': bad})101 conn = self._create_connection()102 read_until(conn, b'>>> ')103 conn.sendall(b'bad()\r\n')104 response = read_until(conn, b'>>> ')105 response = response.replace('\r\n', '\n')106 self.assertEqual('switching out, then throwing in\nGot Empty\nswitching out\nswitched in\n>>> ', response)107 self._close(conn)108if __name__ == '__main__':...

Full Screen

Full Screen

redis_list_test.py

Source:redis_list_test.py Github

copy

Full Screen

import asyncio
from redisobjects import connect_fakeredis
from .async_test import async_test


class RedisListTest(unittest.TestCase):
    # Tests for the list object exposed by a redisobjects connection, run
    # against the connection returned by connect_fakeredis().

    def _create_connection(self):
        # Every test obtains its own connection through this factory.
        return connect_fakeredis()

    def _open_list(self, key):
        # Shorthand: fresh connection, then the list object stored at `key`.
        return self._create_connection().list(key)

    @async_test
    async def test_new_list_is_empty(self):
        items = self._open_list('test:list')
        size = await items.size()
        self.assertEqual(0, size)

    @async_test
    async def test_for_every_element_right_pushed_list_size_grows_by_one(self):
        items = self._open_list('test:list')
        count = 5
        for pushed in range(count):
            # The size before each push equals the number of pushes so far.
            self.assertEqual(pushed, await items.size())
            await items.push_right('dummy_value_%s' % (pushed,))
        self.assertEqual(count, await items.size())

    @async_test
    async def test_for_every_element_left_pushed_list_size_grows_by_one(self):
        items = self._open_list('test:list')
        count = 5
        for pushed in range(count):
            # The size before each push equals the number of pushes so far.
            self.assertEqual(pushed, await items.size())
            await items.push_left('dummy_value_%s' % (pushed,))
        self.assertEqual(count, await items.size())

    @async_test
    async def test_right_pushing_preserves_order(self):
        items = self._open_list('redis:list')
        await items.push_right(b'a', b'b', b'c')
        await items.push_right(b'd', b'e')
        expected = [b'a', b'b', b'c', b'd', b'e']
        self.assertEqual(expected, await items.list())

    @async_test
    async def test_left_pushing_preserves_inverse_order(self):
        items = self._open_list('redis:list')
        await items.push_left(b'a', b'b', b'c')
        await items.push_left(b'd', b'e')
        expected = [b'e', b'd', b'c', b'b', b'a']
        self.assertEqual(expected, await items.list())

    @async_test
    async def test_removed_elements_are_not_contained_in_list(self):
        items = self._open_list('redis:list')
        await items.push_right(b'a', b'b', b'c', b'd', b'e')
        for value in (b'b', b'c', b'e'):
            await items.remove(value)
        # NOTE(review): the excerpt is truncated here; the rest of this test
        # is not visible.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful