How to use do_execute method in localstack

Best Python code snippet using localstack_python

stock_actions.py

Source:stock_actions.py Github

copy

Full Screen

...23 def test_pickle(self):24 self.assertIsInstance(pickle.dumps(CreateDirectory("xxx")), bytes)25 def test_success(self):26 self.makedirs.expect("xxx")27 CreateDirectory("xxx").do_execute({})28 self.makedirs.assert_called_once_with("xxx")29 self.isdir.assert_not_called()30 def test_directory_exists(self):31 self.makedirs.side_effect = OSError(errno.EEXIST, "File exists")32 self.isdir.return_value = True33 CreateDirectory("xxx").do_execute({})34 self.makedirs.assert_called_once_with("xxx")35 self.isdir.assert_called_once_with("xxx")36 def test_file_exists(self):37 self.makedirs.side_effect = OSError(errno.EEXIST, "File exists")38 self.isdir.return_value = False39 with self.assertRaises(OSError):40 CreateDirectory("xxx").do_execute({})41 self.makedirs.assert_called_once_with("xxx")42 self.isdir.assert_called_once_with("xxx")43 def test_other_failure(self):44 self.makedirs.side_effect = OSError(-1, "Foobar")45 with self.assertRaises(OSError):46 CreateDirectory("xxx").do_execute({})47 self.makedirs.assert_called_once_with("xxx")48 self.isdir.assert_not_called()49class CallSubprocessTestCase(PatchingTestCase):50 def setUp(self):51 self.check_call = self.patch("subprocess.check_call")52 def test_default_label(self):53 self.assertEqual(CallSubprocess(["xxx", "yyy"]).label, "xxx yyy")54 def test_label(self):55 self.assertEqual(CallSubprocess(["xxx", "yyy"], label="foo").label, "foo")56 def test_accept_failed_dependencies(self):57 self.assertTrue(CallSubprocess(["xxx", "yyy"], accept_failed_dependencies=True).accept_failed_dependencies)58 def test_pickle(self):59 self.assertIsInstance(pickle.dumps(CallSubprocess(["xxx", "yyy"])), bytes)60 def test_simple_call(self):61 CallSubprocess(["xxx"]).do_execute({})62 self.check_call.assert_called_once_with(["xxx"])63 def test_call_with_several_args(self):64 self.check_call.expect(["xxx", "yyy"])65 CallSubprocess(["xxx", "yyy"]).do_execute({})66 self.check_call.assert_called_once_with(["xxx", "yyy"])67 def test_call_with_kwds(self):68 CallSubprocess(["xxx", "yyy"], kwargs=dict(foo="bar")).do_execute({})69 self.check_call.assert_called_once_with(["xxx", "yyy"], foo="bar")70 def test_called_process_error(self):71 self.check_call.side_effect = subprocess.CalledProcessError(1, ["false"], None)72 with self.assertRaises(CalledProcessError) as catcher:73 CallSubprocess(["false"]).do_execute({})74 self.assertEqual(catcher.exception.args, (1, ["false"], None))75class CallSubprocessForRealTestCase(unittest.TestCase):76 def test_called_process_error(self):77 with self.assertRaises(CompoundException) as catcher:78 execute(CallSubprocess(["false"]))79 self.assertEqual(catcher.exception.exceptions[0].args, (1, ["false"], None))80class DeleteFileTestCase(PatchingTestCase):81 def setUp(self):82 self.unlink = self.patch("os.unlink")83 def test_label(self):84 self.assertEqual(DeleteFile("xxx").label, "rm xxx")85 self.unlink.assert_not_called()86 def test_pickle(self):87 self.assertIsInstance(pickle.dumps(DeleteFile("xxx")), bytes)88 def test_success(self):89 DeleteFile("xxx").do_execute({})90 self.unlink.assert_called_once_with("xxx")91 def test_file_does_not_exist(self):92 self.unlink.side_effect = OSError(errno.ENOENT, "No such file or directory")93 DeleteFile("xxx").do_execute({})94 self.unlink.assert_called_once_with("xxx")95 def test_other_failure(self):96 self.unlink.side_effect = OSError(-1, "Foobar")97 with self.assertRaises(OSError):98 DeleteFile("xxx").do_execute({})99 self.unlink.assert_called_once_with("xxx")100class DeleteDirectoryTestCase(PatchingTestCase):101 def setUp(self):102 self.rmtree = self.patch("shutil.rmtree")103 def test_label(self):104 self.assertEqual(DeleteDirectory("xxx").label, "rm -r xxx")105 self.rmtree.assert_not_called()106 def test_pickle(self):107 self.assertIsInstance(pickle.dumps(DeleteDirectory("xxx")), bytes)108 def test_success(self):109 DeleteDirectory("xxx").do_execute({})110 self.rmtree.assert_called_once_with("xxx")111 def test_directory_does_not_exist(self):112 self.rmtree.side_effect = OSError(errno.ENOENT, "No such file or directory")113 DeleteDirectory("xxx").do_execute({})114 self.rmtree.assert_called_once_with("xxx")115 def test_other_failure(self):116 self.rmtree.side_effect = OSError(-1, "Foobar")117 with self.assertRaises(OSError):118 DeleteDirectory("xxx").do_execute({})119 self.rmtree.assert_called_once_with("xxx")120class CopyFileTestCase(PatchingTestCase):121 def setUp(self):122 self.copy = self.patch("shutil.copy")123 def test_label(self):124 self.assertEqual(CopyFile("from", "to").label, "cp from to")125 self.copy.assert_not_called()126 def test_pickle(self):127 self.assertIsInstance(pickle.dumps(CopyFile("from", "to")), bytes)128 def test_success(self):129 CopyFile("from", "to").do_execute({})130 self.copy.assert_called_once_with("from", "to")131 def test_failure(self):132 self.copy.side_effect = OSError(-1, "Foobar")133 with self.assertRaises(OSError):134 CopyFile("from", "to").do_execute({})135 self.copy.assert_called_once_with("from", "to")136class TouchFileTestCase(PatchingTestCase):137 def setUp(self):138 self.open = self.patch("ActionTree.stock.open", new=unittest.mock.mock_open(), create=True)139 self.utime = self.patch("os.utime")140 def test_label(self):141 self.assertEqual(TouchFile("xxx").label, "touch xxx")142 self.open.assert_not_called()143 self.utime.assert_not_called()144 def test_pickle(self):145 self.assertIsInstance(pickle.dumps(TouchFile("xxx")), bytes)146 def test_success(self):147 TouchFile("xxx").do_execute({})148 self.open.assert_called_once_with("xxx", "ab")149 self.open().close.assert_called_once_with()150 self.utime.assert_called_once_with("xxx", None)151class NullActionTestCase(unittest.TestCase):152 def test_label(self):153 self.assertIsNone(NullAction().label)154 def test_pickle(self):155 self.assertIsInstance(pickle.dumps(NullAction()), bytes)156 def test(self):157 NullAction().do_execute({})158class SleepTestCase(PatchingTestCase):159 def setUp(self):160 self.sleep = self.patch("time.sleep")161 def test_label(self):162 self.assertEqual(Sleep(1).label, "sleep 1")163 self.sleep.assert_not_called()164 def test_pickle(self):165 self.assertIsInstance(pickle.dumps(Sleep(1)), bytes)166 def test(self):167 Sleep(1).do_execute({})...

Full Screen

Full Screen

sql_handler.py

Source:sql_handler.py Github

copy

Full Screen

...65 return None66 @current_connection.setter67 def current_connection(self, conn: pymysql.Connection):68 self.conn_locals.conn = conn69 def do_execute(self, cursor, query, args):70 beg = time.time()71 try:72 cursor.execute(query, args)73 finally:74 cost = time.time() - beg75 self.sql_logger.info(f'{self.__class__.__name__}: cost={cost:.3f}s, query={query}, args={args}')76 def do_insert(self, query, args, return_id=False):77 def do_execute():78 with conn.cursor() as cursor:79 self.do_execute(cursor, query, args)80 if return_id:81 # get must before commit82 insert_id = conn.insert_id()83 if return_id:84 return insert_id85 conn = self.current_connection86 if conn is None:87 with self.factory.do_connect() as conn:88 return do_execute()89 else:90 return do_execute()91 def do_select(self, query, args=None, size: typing.Union[int, None] = 0, cursorclass=None):92 """93 :param query:94 :param args:95 :param size: None: 一个;0:全部;>=1 size个96 :param cursorclass:97 :return:98 """99 def do_execute():100 with conn.cursor(cursor=cursorclass) as cursor:101 self.do_execute(cursor, query, args)102 if size is None:103 # 单独获取一个104 return cursor.fetchone()105 elif size == 0:106 # 获取全部107 return cursor.fetchall()108 else:109 assert size >= 1110 return cursor.fetchmany(size)111 conn = self.current_connection112 if conn is None:113 with self.factory.do_connect() as conn:114 return do_execute()115 else:116 return do_execute()117 def do_update(self, query, args=None):118 def do_execute():119 with conn.cursor() as cursor:120 self.do_execute(cursor, query, args)121 conn = self.current_connection122 if conn is None:123 with self.factory.do_connect() as conn:124 return do_execute()125 else:126 return do_execute()127 def do_delete(self, query, args=None):128 def do_execute():129 with conn.cursor() as cursor:130 self.do_execute(cursor, query, args)131 conn = self.current_connection132 if conn is None:133 with self.factory.do_connect() as conn:134 return do_execute()135 else:136 return do_execute()137 def transaction(self, conn: pymysql.Connection = None) -> Transaction:138 if conn is None or not conn.open:139 conn = self.factory.do_connect(autocommit=False)140 self.current_connection = conn...

Full Screen

Full Screen

test_checks.py

Source:test_checks.py Github

copy

Full Screen

...7 content_to_verify = '{"abc123": true}'.encode('utf-8')8 mock_transaction = Mock()9 mock_transaction.op_return = 'e323ace018d459e737988f8c02944224f9d02d2ee58c60eaaf134dd2a36e7d32'10 checker = BinaryFileIntegrityChecker(content_to_verify, mock_transaction)11 self.assertTrue(checker.do_execute())12 def test_compare_hashes_v1_1_fail(self):13 content_to_verify = '{"abc123": true}'.encode('utf-8')14 mock_transaction = Mock()15 mock_transaction.op_return = 'e323ace7777459e737988f8c02944224f9d02d2ee58c60eaaf134dd2a36e7d32'16 checker = BinaryFileIntegrityChecker(content_to_verify, mock_transaction)17 self.assertEquals(False, checker.do_execute())18 def test_expired(self):19 expires = pytz.UTC.localize(datetime(2007, 12, 5))20 checker = ExpiredChecker(expires)21 self.assertEquals(False, checker.do_execute())22 def test_not_expired(self):23 now = datetime.utcnow()24 next_year = now.replace(year=now.year + 1)25 expires = pytz.UTC.localize(next_year)26 checker = ExpiredChecker(expires)27 self.assertTrue(checker.do_execute())28 def test_no_expiration(self):29 expires = None30 checker = ExpiredChecker(expires)31 self.assertTrue(checker.do_execute())32 def test_check_not_revoked_fail(self):33 revoked_addresses = set()34 revoked_addresses.add('123456')35 revoked_addresses.add('444444')36 addresses_to_check = ['123456']37 revoked_checker = RevocationChecker(addresses_to_check, revoked_addresses)38 result = revoked_checker.do_execute()39 self.assertFalse(result)40 def test_check_not_revoked_pass(self):41 revoked_addresses = set()42 revoked_addresses.add('343434')43 revoked_addresses.add('444444')44 addresses_to_check = ['123456']45 revoked_checker = RevocationChecker(addresses_to_check, revoked_addresses)46 result = revoked_checker.do_execute()47 self.assertTrue(result)48 def test_verify_expired(self):49 with open('data/1.2/expired.json') as cert_file:50 cert_json = json.load(cert_file)51 certificate_model = model.to_certificate_model(certificate_json=cert_json)52 not_expired_checker = ExpiredChecker(certificate_model.expires)53 self.assertFalse(not_expired_checker.do_execute())54 def test_not_yet_expired(self):55 with open('data/1.2/not_yet_expired.json') as cert_file:56 cert_json = json.load(cert_file)57 certificate_model = model.to_certificate_model(certificate_json=cert_json)58 not_expired_checker = ExpiredChecker(certificate_model.expires)59 self.assertTrue(not_expired_checker.do_execute())60if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful