How to use _table_exists method in localstack

Best Python code snippet using localstack_python

migrations.py

Source:migrations.py Github

copy

Full Screen

...38 self._tables[key] = tables[key]39 self._connect()40 def create_roles_table(self):41 """Create Roles Table"""42 if self._table_exists(self._tables['prefix'] + self._tables['roles_table']):43 return None44 query="""CREATE TABLE IF NOT EXISTS `{prefix}{table}` (45 `id` int(10) unsigned NOT NULL AUTO_INCREMENT,46 `name` varchar(255) COLLATE utf8_unicode_ci NOT NULL,47 `display_name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,48 `description` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,49 `created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',50 `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',51 `enabled` tinyint(1) NOT NULL DEFAULT '0',52 PRIMARY KEY (`id`),53 UNIQUE KEY `roles_name_unique` (`name`)54 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['roles_table'])55 return self._query(query)56 def create_permissions_table(self):57 """Create Permissions Table"""58 if self._table_exists(self._tables['prefix'] + self._tables['permissions_table']):59 return None60 query="""CREATE TABLE IF NOT EXISTS `{prefix}{table}` (61 `id` int(10) unsigned NOT NULL AUTO_INCREMENT,62 `name` varchar(255) COLLATE utf8_unicode_ci NOT NULL,63 `display_name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,64 `description` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,65 `created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',66 `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',67 `enabled` tinyint(1) NOT NULL DEFAULT '0',68 PRIMARY KEY (`id`),69 UNIQUE KEY `permissions_name_unique` (`name`)70 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['permissions_table'])71 return self._query(query)72 def create_permission_role_table(self):73 """Create Permission Role Table"""74 if self._table_exists(self._tables['prefix'] + self._tables['permission_role_table']):75 return None76 query="""CREATE TABLE IF NOT EXISTS `{prefix}{table}` (77 `permission_id` int(10) unsigned NOT NULL,78 `role_id` int(10) unsigned NOT NULL,79 PRIMARY KEY (`permission_id`,`role_id`),80 KEY `permission_role_role_id_foreign` (`role_id`),81 CONSTRAINT `permission_role_permission_id_foreign` FOREIGN KEY (`permission_id`) REFERENCES `{permissions_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE,82 CONSTRAINT `permission_role_role_id_foreign` FOREIGN KEY (`role_id`) REFERENCES `{roles_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE83 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['permission_role_table'], roles_table=self._tables['prefix'] + self._tables['roles_table'], permissions_table=self._tables['prefix'] + self._tables['permissions_table'])84 return self._query(query)85 def create_role_user_table(self):86 """Create Role User Table"""87 if self._tables['users_table'] != False and self._tables['users_table_id'] != False and self._table_exists(self._tables['users_table']):88 users_table_constraint=""",89 CONSTRAINT `role_user_user_id_foreign` FOREIGN KEY (`user_id`) REFERENCES `{users_table}` (`{users_table_id}`) ON DELETE CASCADE ON UPDATE CASCADE90 """.format(users_table=self._tables['users_table'], users_table_id=self._tables['users_table_id'])91 else:92 users_table_constraint=""93 if self._table_exists(self._tables['prefix'] + self._tables['role_user_table']):94 return None95 query="""CREATE TABLE IF NOT EXISTS `{prefix}{table}` (96 `user_id` int(10) unsigned NOT NULL,97 `role_id` int(10) unsigned NOT NULL,98 PRIMARY KEY (`user_id`,`role_id`),99 KEY `role_user_role_id_foreign` (`role_id`),100 CONSTRAINT `role_user_role_id_foreign` FOREIGN KEY (`role_id`) REFERENCES `{roles_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE{users_table_constraint}101 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['role_user_table'], users_table_constraint=users_table_constraint, roles_table=self._tables['prefix'] + self._tables['roles_table'], permissions_table=self._tables['prefix'] + self._tables['permissions_table'])102 return self._query(query)103 def create_permission_user_table(self):104 """Create Permission User Table"""105 if self._tables['users_table'] != False and self._tables['users_table_id'] != False and self._table_exists(self._tables['users_table']):106 users_table_constraint=""",107 CONSTRAINT `permission_user_user_id_foreign` FOREIGN KEY (`user_id`) REFERENCES `{users_table}` (`{users_table_id}`) ON DELETE CASCADE ON UPDATE CASCADE108 """.format(users_table=self._tables['users_table'], users_table_id=self._tables['users_table_id'])109 else:110 users_table_constraint=""111 if self._table_exists(self._tables['prefix'] + self._tables['permission_user_table']):112 return None113 query="""CREATE TABLE IF NOT EXISTS `{prefix}{table}` (114 `permission_id` int(10) unsigned NOT NULL,115 `user_id` int(10) unsigned NOT NULL,116 PRIMARY KEY (`permission_id`,`user_id`),117 KEY `permission_user_user_id_foreign` (`user_id`),118 CONSTRAINT `permission_user_permission_id_foreign` FOREIGN KEY (`permission_id`) REFERENCES `{permissions_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE{users_table_constraint}119 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['permission_user_table'], users_table_constraint=users_table_constraint, roles_table=self._tables['prefix'] + self._tables['roles_table'], permissions_table=self._tables['prefix'] + self._tables['permissions_table'])120 return self._query(query)121 def drop_roles_table(self):122 """Drop Roles Table"""123 if not self._table_exists(self._tables['prefix'] + self._tables['roles_table']):124 return None125 query="DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['roles_table'])126 return self._query(query)127 def drop_permissions_table(self):128 """Drop Permissions Table"""129 if not self._table_exists(self._tables['prefix'] + self._tables['permissions_table']):130 return None131 query="DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['permissions_table'])132 return self._query(query)133 def drop_permission_role_table(self):134 """Drop Permission Role Table"""135 if not self._table_exists(self._tables['prefix'] + self._tables['permission_role_table']):136 return None137 query="DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['permission_role_table'])138 return self._query(query)139 def drop_role_user_table(self):140 """Drop Role User Table"""141 if not self._table_exists(self._tables['prefix'] + self._tables['role_user_table']):142 return None143 query="DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['role_user_table'])144 return self._query(query)145 def drop_permission_user_table(self):146 """Drop Permission User Table"""147 if not self._table_exists(self._tables['prefix'] + self._tables['permission_user_table']):148 return None149 query="DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['permission_user_table'])150 return self._query(query)151 def _table_exists(self, table_name):152 """Check if Tables Exist153 Args:154 table_name: a table name to check155 """156 with self._connection.cursor() as cursor:157 cursor.execute("SHOW TABLES LIKE '" + table_name +"';")158 self._connection.commit()159 for row in cursor:160 return table_name in row.values()161 def _connect(self):162 """Connect to Database"""163 try:164 self._connection = pymysql.connect(host=self._db['host'], user=self._db['username'], password=self._db['password'], db=self._db['database'], charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)165 except Exception as e:...

Full Screen

Full Screen

test_caribou.py

Source:test_caribou.py Github

copy

Full Screen

...47 def tearDown(self):48 if os.path.exists(self.db_url):49 os.remove(self.db_url)50 @staticmethod51 def _table_exists(conn, table_name):52 sql = """53 SELECT *54 FROM sqlite_master55 WHERE type = 'table'56 AND name = :157 """58 with caribou.execute(conn, sql, [table_name]) as cursor:59 return bool(cursor.fetchall())60 def test_invalid_migration_filenames(self):61 """62 assert we can't load migrations with invalid version names63 """64 # assert we test invalid version names65 invalid_names = os.path.join(get_this_dir(), INVALID_NAMES, '*.py')66 for filename in glob.glob(invalid_names):67 try:68 migration = caribou.Migration(filename)69 migration.get_version()70 except caribou.InvalidNameError:71 pass72 else:73 assert False, filename74 def test_valid_migration_filenames(self):75 """ assert we can parse the versions from migration files """76 # test some valid versions77 for version, suffix in [ ('20091112130101', '__migration_one.py')78 , ('20091112150200', '__migration_two.py')79 , ('20091112150205', '_migration_three.py')80 ]:81 path = os.path.join(self.migrations_path, version + suffix)82 migration = caribou.Migration(path)83 actual_version = migration.get_version()84 assert actual_version == version, '%s != %s' % (85 actual_version, version)86 def test_invalid_migraton_code(self):87 filenames = [ '20091112130101_syntax_error.py'88 , '20091112150200_missing_upgrade.py'89 , '20091112150205_missing_downgrade.py'90 ]91 code_dir = os.path.join(get_this_dir(), INVALID_CODE)92 # assert we can't load a directory containing invalid migrations93 try:94 caribou.load_migrations(code_dir)95 except caribou.InvalidMigrationError:96 pass97 else:98 assert False, 'loaded a dir with invalid migrations'99 # assert we can't load each invalid migration100 migrations = [os.path.join(code_dir, f) for f in filenames]101 for migration in migrations:102 try:103 caribou.Migration(migration)104 except caribou.InvalidMigrationError:105 pass106 else:107 assert False, 'loaded invalid migration [%s]' % migration108 def test_unknown_migration(self):109 """ assert we can't target an unknown migration or non existant dirs"""110 db_url = self.db_url111 migrations_path = self.migrations_path112 for v in ['asdf', '22341', 'asdfasdfasdf', '----']:113 for func in [caribou.upgrade, caribou.downgrade]:114 try:115 func(db_url, migrations_path, v)116 except caribou.Error:117 pass118 else:119 assert False, 'ran an unknown migration: %s' % v120 # assert we can't run non-existant migrations121 path = '/path/to/nowhereski/whoop'122 for func, args in [ (caribou.upgrade, (db_url, path, None))123 , (caribou.downgrade, (db_url, path, 0))124 ]:125 try:126 func(*args)127 except caribou.Error:128 pass129 else:130 assert False, '%s %s' % (func, str(args))131 def test_migration(self):132 # assert migrations haven't been run133 db_url = self.db_url134 conn = sqlite3.connect(db_url)135 assert not self._table_exists(conn, 'games')136 assert not self._table_exists(conn, 'players')137 assert caribou.get_version(db_url) == None138 # assert that the first migration has been run successfully139 # and that subsequent runs have no effect 140 v1 = '20091112130101' 141 v2 = '20091112150200'142 v3 = '20091112150205'143 for _ in range(3):144 caribou.upgrade(db_url, self.migrations_path, v1)145 assert self._table_exists(conn, 'games')146 assert self._table_exists(conn, 'players')147 actual_version = caribou.get_version(self.db_url)148 assert actual_version == v1, '%s != %s' % (actual_version, v1)149 # make sure none of the other migrations run150 assert not self._table_exists(conn, 'scores')151 # run the 2nd migration152 for _ in range(3):153 caribou.upgrade(db_url, self.migrations_path, v2)154 tables = ['games', 'players', 'scores']155 assert all((self._table_exists(conn, t) for t in tables))156 actual_version = caribou.get_version(db_url)157 assert actual_version == v2, '%s != %s' % (actual_version, v2)158 # downgrade the second migration159 for _ in range(3):160 caribou.downgrade(db_url, self.migrations_path, v1)161 assert self._table_exists(conn, 'games')162 assert self._table_exists(conn, 'players')163 actual_version = caribou.get_version(db_url)164 assert actual_version == v1, '%s != %s' % (actual_version, v1)165 # make sure none of the other migrations run166 assert not self._table_exists(conn, 'scores')167 # upgrade all the way 168 for _ in range(3):169 caribou.upgrade(db_url, self.migrations_path)170 tables = ['games', 'players', 'scores', 'jams']171 assert all((self._table_exists(conn, t) for t in tables))172 actual_version = caribou.get_version(db_url)173 assert actual_version == v3, '%s != %s' % (actual_version, v3)174 # downgrade all the way 175 for _ in range(3):176 caribou.downgrade(db_url, self.migrations_path, 0)177 tables = ['games', 'players', 'scores', 'jams']178 assert all((not self._table_exists(conn, t) for t in tables))179 actual_version = caribou.get_version(db_url)180 assert actual_version == '0'181 # upgrade all the way again182 for _ in range(3):183 caribou.upgrade(db_url, self.migrations_path)184 tables = ['games', 'players', 'scores', 'jams']185 assert all((self._table_exists(conn, t) for t in tables))186 actual_version = caribou.get_version(db_url)187 assert actual_version == v3, '%s != %s' % (actual_version, v3)188 def test_create_migration(self):189 """ assert we can create migration templates """190 for name, directory in [ ('tc_1', None), ('tc_2', 'test_create__')]:191 if directory and not os.path.exists(directory):192 os.makedirs(directory)193 path = caribou.create_migration(name, directory)194 try:195 assert os.path.exists(path)196 # assert it is a valid migration197 print caribou.Migration(path)198 finally:199 # remove compiled test migration as well...

Full Screen

Full Screen

dynamo_table.py

Source:dynamo_table.py Github

copy

Full Screen

...14def create():15 global _table_name16 global dynamodb17 global _continue_printing18 if _table_exists():19 print("Table already exists. Delete the table and try again")20 continue_prompt()21 return22 print(f"Creating {_table_name} table")23 # Create the DynamoDB table.24 table = dynamodb.create_table(25 TableName=_table_name,26 KeySchema=[27 {"AttributeName": "resource_id", "KeyType": "HASH"},28 {"AttributeName": "tag_name", "KeyType": "RANGE"},29 ],30 AttributeDefinitions=[31 {"AttributeName": "resource_id", "AttributeType": "S"},32 {"AttributeName": "tag_name", "AttributeType": "S"},33 ],34 ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},35 )36 start_print_progress()37 # Wait until the table exists.38 table.meta.client.get_waiter("table_exists").wait(TableName=_table_name)39 stop_print_progress()40 # Print out some data about the table.41 print("\nFinished creating table.")42 continue_prompt()43def delete():44 global dynamodb45 global _table_name46 if _table_exists():47 table = dynamodb.Table(_table_name)48 table.delete()49 print(f"Deleted {_table_name} Table")50 continue_prompt()51 else:52 print(f"Table {_table_name} not exists")53 continue_prompt()54def delete_all_records(prompt=True):55 global dynamodb56 global _table_name57 if _table_exists():58 table = dynamodb.Table(_table_name)59 scan = table.scan(60 ProjectionExpression="#k,resource_id",61 ExpressionAttributeNames={"#k": "tag_name"},62 )63 with table.batch_writer() as batch:64 for each in scan["Items"]:65 batch.delete_item(66 Key={67 "resource_id": each["resource_id"],68 "tag_name": each["tag_name"],69 }70 )71 print(f"Deleted all records from {_table_name} Table")72 if prompt == True:73 continue_prompt()74 else:75 print(f"Table {_table_name} not exists")76 if continue_prompt == True:77 continue_prompt()78def insert_records(data):79 global dynamodb80 global _table_name81 if _table_exists():82 table = dynamodb.Table(_table_name)83 with table.batch_writer(84 overwrite_by_pkeys=["resource_id", "tag_name"]85 ) as batch:86 for i in range(len(data)):87 batch.put_item(Item=data[i])88def export_csv(csv_file_path):89 global dynamodb90 global _table_name91 if _table_exists():92 table = dynamodb.Table(_table_name)93 response = table.scan()94 data = response["Items"]95 while "LastEvaluatedKey" in response:96 response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])97 data.extend(response["Items"])98 keys = data[0].keys()99 with open(csv_file_path, "w", newline="") as output_file:100 dict_writer = csv.DictWriter(101 output_file,102 ["resource_id", "type", "tag_name", "tag_value", "delete(y/n)"],103 )104 dict_writer.writeheader()105 dict_writer.writerows(data)106 print("Export completed.")107 continue_prompt()108def import_data_from_csv(csv_file_path, replace=True):109 global dynamodb110 global _table_name111 if _table_exists():112 table = dynamodb.Table(_table_name)113 if replace == True:114 delete_all_records(prompt=False)115 print("Importing data")116 start_print_progress()117 with open(csv_file_path) as csv_file:118 csv_reader = csv.DictReader(csv_file, delimiter=",")119 for row in csv_reader:120 table.put_item(Item=row)121 stop_print_progress()122 print("Import completed.")123 continue_prompt()124def get_all_items():125 global dynamodb126 global _table_name127 data = []128 if _table_exists():129 table = dynamodb.Table(_table_name)130 response = table.scan()131 data = response["Items"]132 while "LastEvaluatedKey" in response:133 response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])134 data.extend(response["Items"])135 return data136def _table_exists():137 global dynamodb138 global _table_name139 try:140 response = dynamodb.meta.client.describe_table(TableName=_table_name)141 return True142 except dynamodb.meta.client.exceptions.ResourceNotFoundException:143 return False144_continue_printing = False145def _print_status():146 global _continue_printing147 while _continue_printing:148 sys.stdout.write(".")149 time.sleep(1)150 sys.stdout.flush()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful