Best Python code snippet using localstack_python
Source:migrations.py  
...
            self._tables[key] = tables[key]
        self._connect()

    def create_roles_table(self):
        """Create Roles Table"""
        if self._table_exists(self._tables['prefix'] + self._tables['roles_table']):
            return None
        query = """CREATE TABLE IF NOT EXISTS `{prefix}{table}` (
              `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
              `name` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
              `display_name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
              `description` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
              `created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
              `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
              `enabled` tinyint(1) NOT NULL DEFAULT '0',
              PRIMARY KEY (`id`),
              UNIQUE KEY `roles_name_unique` (`name`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['roles_table'])
        return self._query(query)

    def create_permissions_table(self):
        """Create Permissions Table"""
        if self._table_exists(self._tables['prefix'] + self._tables['permissions_table']):
            return None
        query = """CREATE TABLE IF NOT EXISTS `{prefix}{table}` (
              `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
              `name` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
              `display_name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
              `description` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
              `created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
              `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
              `enabled` tinyint(1) NOT NULL DEFAULT '0',
              PRIMARY KEY (`id`),
              UNIQUE KEY `permissions_name_unique` (`name`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['permissions_table'])
        return self._query(query)

    def create_permission_role_table(self):
        """Create Permission Role Table"""
        if self._table_exists(self._tables['prefix'] + self._tables['permission_role_table']):
            return None
        query = """CREATE TABLE IF NOT EXISTS `{prefix}{table}` (
              `permission_id` int(10) unsigned NOT NULL,
              `role_id` int(10) unsigned NOT NULL,
              PRIMARY KEY (`permission_id`,`role_id`),
              KEY `permission_role_role_id_foreign` (`role_id`),
              CONSTRAINT `permission_role_permission_id_foreign` FOREIGN KEY (`permission_id`) REFERENCES `{permissions_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE,
              CONSTRAINT `permission_role_role_id_foreign` FOREIGN KEY (`role_id`) REFERENCES `{roles_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['permission_role_table'], roles_table=self._tables['prefix'] + self._tables['roles_table'], permissions_table=self._tables['prefix'] + self._tables['permissions_table'])
        return self._query(query)

    def create_role_user_table(self):
        """Create Role User Table"""
        if self._tables['users_table'] != False and self._tables['users_table_id'] != False and self._table_exists(self._tables['users_table']):
            users_table_constraint = """,
            CONSTRAINT `role_user_user_id_foreign` FOREIGN KEY (`user_id`) REFERENCES `{users_table}` (`{users_table_id}`) ON DELETE CASCADE ON UPDATE CASCADE
            """.format(users_table=self._tables['users_table'], users_table_id=self._tables['users_table_id'])
        else:
            users_table_constraint = ""
        if self._table_exists(self._tables['prefix'] + self._tables['role_user_table']):
            return None
        query = """CREATE TABLE IF NOT EXISTS `{prefix}{table}` (
              `user_id` int(10) unsigned NOT NULL,
              `role_id` int(10) unsigned NOT NULL,
              PRIMARY KEY (`user_id`,`role_id`),
              KEY `role_user_role_id_foreign` (`role_id`),
              CONSTRAINT `role_user_role_id_foreign` FOREIGN KEY (`role_id`) REFERENCES `{roles_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE{users_table_constraint}
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['role_user_table'], users_table_constraint=users_table_constraint, roles_table=self._tables['prefix'] + self._tables['roles_table'], permissions_table=self._tables['prefix'] + self._tables['permissions_table'])
        return self._query(query)

    def create_permission_user_table(self):
        """Create Permission User Table"""
        if self._tables['users_table'] != False and self._tables['users_table_id'] != False and self._table_exists(self._tables['users_table']):
            users_table_constraint = """,
            CONSTRAINT `permission_user_user_id_foreign` FOREIGN KEY (`user_id`) REFERENCES `{users_table}` (`{users_table_id}`) ON DELETE CASCADE ON UPDATE CASCADE
            """.format(users_table=self._tables['users_table'], users_table_id=self._tables['users_table_id'])
        else:
            users_table_constraint = ""
        if self._table_exists(self._tables['prefix'] + self._tables['permission_user_table']):
            return None
        query = """CREATE TABLE IF NOT EXISTS `{prefix}{table}` (
              `permission_id` int(10) unsigned NOT NULL,
              `user_id` int(10) unsigned NOT NULL,
              PRIMARY KEY (`permission_id`,`user_id`),
              KEY `permission_user_user_id_foreign` (`user_id`),
              CONSTRAINT `permission_user_permission_id_foreign` FOREIGN KEY (`permission_id`) REFERENCES `{permissions_table}` (`id`) ON DELETE CASCADE ON UPDATE CASCADE{users_table_constraint}
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;""".format(prefix=self._tables['prefix'], table=self._tables['permission_user_table'], users_table_constraint=users_table_constraint, roles_table=self._tables['prefix'] + self._tables['roles_table'], permissions_table=self._tables['prefix'] + self._tables['permissions_table'])
        return self._query(query)

    def drop_roles_table(self):
        """Drop Roles Table"""
        if not self._table_exists(self._tables['prefix'] + self._tables['roles_table']):
            return None
        query = "DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['roles_table'])
        return self._query(query)

    def drop_permissions_table(self):
        """Drop Permissions Table"""
        if not self._table_exists(self._tables['prefix'] + self._tables['permissions_table']):
            return None
        query = "DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['permissions_table'])
        return self._query(query)

    def drop_permission_role_table(self):
        """Drop Permission Role Table"""
        if not self._table_exists(self._tables['prefix'] + self._tables['permission_role_table']):
            return None
        query = "DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['permission_role_table'])
        return self._query(query)

    def drop_role_user_table(self):
        """Drop Role User Table"""
        if not self._table_exists(self._tables['prefix'] + self._tables['role_user_table']):
            return None
        query = "DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['role_user_table'])
        return self._query(query)

    def drop_permission_user_table(self):
        """Drop Permission User Table"""
        if not self._table_exists(self._tables['prefix'] + self._tables['permission_user_table']):
            return None
        query = "DROP TABLE IF EXISTS `{prefix}{table}`;".format(prefix=self._tables['prefix'], table=self._tables['permission_user_table'])
        return self._query(query)

    def _table_exists(self, table_name):
        """Check if Tables Exist
            Args:
                table_name: a table name to check
        """
        with self._connection.cursor() as cursor:
            cursor.execute("SHOW TABLES LIKE '" + table_name + "';")
        self._connection.commit()
        for row in cursor:
            return table_name in row.values()

    def _connect(self):
        """Connect to Database"""
        try:
            self._connection = pymysql.connect(host=self._db['host'], user=self._db['username'], password=self._db['password'], db=self._db['database'], charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
        except Exception as e:
...
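The excerpt returns self._query(query) for every CREATE/DROP statement, but the _query helper and the surrounding class setup are not included. As a minimal standalone sketch, the snippet below reproduces the same table-existence check with the pymysql connection settings used in _connect; the connection values are placeholders, and the parameterised SHOW TABLES LIKE %s is an alternative to the string concatenation in _table_exists, not the author's code.

import pymysql

# Placeholder connection settings; adjust to your local MySQL instance.
DB = {"host": "127.0.0.1", "username": "root", "password": "secret", "database": "acl"}

def table_exists(connection, table_name):
    """Same check as the excerpt's _table_exists, but with a parameterised LIKE."""
    with connection.cursor() as cursor:
        cursor.execute("SHOW TABLES LIKE %s;", (table_name,))
        return cursor.fetchone() is not None

connection = pymysql.connect(
    host=DB["host"],
    user=DB["username"],
    password=DB["password"],
    db=DB["database"],
    charset="utf8mb4",
    cursorclass=pymysql.cursors.DictCursor,
)
print(table_exists(connection, "roles"))  # True once create_roles_table() has run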
Source:test_caribou.py  
...
    def tearDown(self):
        if os.path.exists(self.db_url):
            os.remove(self.db_url)

    @staticmethod
    def _table_exists(conn, table_name):
        sql = """
            SELECT *
              FROM sqlite_master
             WHERE type = 'table'
               AND name = :1
               """
        with caribou.execute(conn, sql, [table_name]) as cursor:
            return bool(cursor.fetchall())

    def test_invalid_migration_filenames(self):
        """
        assert we can't load migrations with invalid version names
        """
        # assert we test invalid version names
        invalid_names = os.path.join(get_this_dir(), INVALID_NAMES, '*.py')
        for filename in glob.glob(invalid_names):
            try:
                migration = caribou.Migration(filename)
                migration.get_version()
            except caribou.InvalidNameError:
                pass
            else:
                assert False, filename

    def test_valid_migration_filenames(self):
        """ assert we can parse the versions from migration files """
        # test some valid versions
        for version, suffix in [ ('20091112130101', '__migration_one.py')
                               , ('20091112150200', '__migration_two.py')
                               , ('20091112150205', '_migration_three.py')
                               ]:
            path = os.path.join(self.migrations_path, version + suffix)
            migration = caribou.Migration(path)
            actual_version = migration.get_version()
            assert actual_version == version, '%s != %s' % (
                            actual_version, version)

    def test_invalid_migraton_code(self):
        filenames = [ '20091112130101_syntax_error.py'
                    , '20091112150200_missing_upgrade.py'
                    , '20091112150205_missing_downgrade.py'
                    ]
        code_dir = os.path.join(get_this_dir(), INVALID_CODE)
        # assert we can't load a directory containing invalid migrations
        try:
            caribou.load_migrations(code_dir)
        except caribou.InvalidMigrationError:
            pass
        else:
            assert False, 'loaded a dir with invalid migrations'
        # assert we can't load each invalid migration
        migrations = [os.path.join(code_dir, f) for f in filenames]
        for migration in migrations:
            try:
                caribou.Migration(migration)
            except caribou.InvalidMigrationError:
                pass
            else:
                assert False, 'loaded invalid migration [%s]' % migration

    def test_unknown_migration(self):
        """ assert we can't target an unknown migration or non existant dirs"""
        db_url = self.db_url
        migrations_path = self.migrations_path
        for v in ['asdf', '22341', 'asdfasdfasdf', '----']:
            for func in [caribou.upgrade, caribou.downgrade]:
                try:
                    func(db_url, migrations_path, v)
                except caribou.Error:
                    pass
                else:
                    assert False, 'ran an unknown migration: %s' % v
        # assert we can't run non-existant migrations
        path = '/path/to/nowhereski/whoop'
        for func, args in [ (caribou.upgrade, (db_url, path, None))
                          , (caribou.downgrade, (db_url, path, 0))
                          ]:
            try:
                func(*args)
            except caribou.Error:
                pass
            else:
                assert False, '%s %s' % (func, str(args))

    def test_migration(self):
        # assert migrations haven't been run
        db_url = self.db_url
        conn = sqlite3.connect(db_url)
        assert not self._table_exists(conn, 'games')
        assert not self._table_exists(conn, 'players')
        assert caribou.get_version(db_url) == None
        # assert that the first migration has been run successfully
        # and that subsequent runs have no effect
        v1 = '20091112130101'
        v2 = '20091112150200'
        v3 = '20091112150205'
        for _ in range(3):
            caribou.upgrade(db_url, self.migrations_path, v1)
            assert self._table_exists(conn, 'games')
            assert self._table_exists(conn, 'players')
            actual_version = caribou.get_version(self.db_url)
            assert actual_version == v1, '%s != %s' % (actual_version, v1)
            # make sure none of the other migrations run
            assert not self._table_exists(conn, 'scores')
        # run the 2nd migration
        for _ in range(3):
            caribou.upgrade(db_url, self.migrations_path, v2)
            tables = ['games', 'players', 'scores']
            assert all((self._table_exists(conn, t) for t in tables))
            actual_version = caribou.get_version(db_url)
            assert actual_version == v2, '%s != %s' % (actual_version, v2)
        # downgrade the second migration
        for _ in range(3):
            caribou.downgrade(db_url, self.migrations_path, v1)
            assert self._table_exists(conn, 'games')
            assert self._table_exists(conn, 'players')
            actual_version = caribou.get_version(db_url)
            assert actual_version == v1, '%s != %s' % (actual_version, v1)
            # make sure none of the other migrations run
            assert not self._table_exists(conn, 'scores')
        # upgrade all the way
        for _ in range(3):
            caribou.upgrade(db_url, self.migrations_path)
            tables = ['games', 'players', 'scores', 'jams']
            assert all((self._table_exists(conn, t) for t in tables))
            actual_version = caribou.get_version(db_url)
            assert actual_version == v3, '%s != %s' % (actual_version, v3)
        # downgrade all the way
        for _ in range(3):
            caribou.downgrade(db_url, self.migrations_path, 0)
            tables = ['games', 'players', 'scores', 'jams']
            assert all((not self._table_exists(conn, t) for t in tables))
            actual_version = caribou.get_version(db_url)
            assert actual_version == '0'
        # upgrade all the way again
        for _ in range(3):
            caribou.upgrade(db_url, self.migrations_path)
            tables = ['games', 'players', 'scores', 'jams']
            assert all((self._table_exists(conn, t) for t in tables))
            actual_version = caribou.get_version(db_url)
            assert actual_version == v3, '%s != %s' % (actual_version, v3)

    def test_create_migration(self):
        """ assert we can create migration templates """
        for name, directory in [ ('tc_1', None), ('tc_2', 'test_create__')]:
            if directory and not os.path.exists(directory):
                os.makedirs(directory)
            path = caribou.create_migration(name, directory)
            try:
                assert os.path.exists(path)
                # assert it is a valid migration
                print(caribou.Migration(path))
            finally:
                # remove compiled test migration as well
...
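The test drives caribou against a directory of migration modules whose filenames begin with a timestamp version (e.g. 20091112130101__migration_one.py). Below is a minimal sketch of what one such migration file could contain, assuming caribou's upgrade(connection)/downgrade(connection) hooks (the test's "missing_upgrade"/"missing_downgrade" invalid-code cases suggest both are required); the games/players schema is invented for illustration and is not the project's actual fixture.

# 20091112130101__migration_one.py -- hypothetical contents for the first test migration

def upgrade(connection):
    # caribou hands each hook an open sqlite3 connection
    connection.execute("CREATE TABLE games (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
    connection.execute("CREATE TABLE players (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

def downgrade(connection):
    connection.execute("DROP TABLE players")
    connection.execute("DROP TABLE games")

Applying it is then a call to caribou.upgrade(db_url, migrations_path) or to a specific target version, exactly as the test above exercises.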
Source:dynamo_table.py  
...
def create():
    global _table_name
    global dynamodb
    global _continue_printing
    if _table_exists():
        print("Table already exists. Delete the table and try again")
        continue_prompt()
        return
    print(f"Creating {_table_name} table")
    # Create the DynamoDB table.
    table = dynamodb.create_table(
        TableName=_table_name,
        KeySchema=[
            {"AttributeName": "resource_id", "KeyType": "HASH"},
            {"AttributeName": "tag_name", "KeyType": "RANGE"},
        ],
        AttributeDefinitions=[
            {"AttributeName": "resource_id", "AttributeType": "S"},
            {"AttributeName": "tag_name", "AttributeType": "S"},
        ],
        ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
    )
    start_print_progress()
    # Wait until the table exists.
    table.meta.client.get_waiter("table_exists").wait(TableName=_table_name)
    stop_print_progress()
    # Print out some data about the table.
    print("\nFinished creating table.")
    continue_prompt()


def delete():
    global dynamodb
    global _table_name
    if _table_exists():
        table = dynamodb.Table(_table_name)
        table.delete()
        print(f"Deleted {_table_name} Table")
        continue_prompt()
    else:
        print(f"Table {_table_name} does not exist")
        continue_prompt()


def delete_all_records(prompt=True):
    global dynamodb
    global _table_name
    if _table_exists():
        table = dynamodb.Table(_table_name)
        scan = table.scan(
            ProjectionExpression="#k,resource_id",
            ExpressionAttributeNames={"#k": "tag_name"},
        )
        with table.batch_writer() as batch:
            for each in scan["Items"]:
                batch.delete_item(
                    Key={
                        "resource_id": each["resource_id"],
                        "tag_name": each["tag_name"],
                    }
                )
        print(f"Deleted all records from {_table_name} Table")
        if prompt == True:
            continue_prompt()
    else:
        print(f"Table {_table_name} does not exist")
        if prompt == True:
            continue_prompt()


def insert_records(data):
    global dynamodb
    global _table_name
    if _table_exists():
        table = dynamodb.Table(_table_name)
        with table.batch_writer(
            overwrite_by_pkeys=["resource_id", "tag_name"]
        ) as batch:
            for i in range(len(data)):
                batch.put_item(Item=data[i])


def export_csv(csv_file_path):
    global dynamodb
    global _table_name
    if _table_exists():
        table = dynamodb.Table(_table_name)
        response = table.scan()
        data = response["Items"]
        while "LastEvaluatedKey" in response:
            response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
            data.extend(response["Items"])
        keys = data[0].keys()
        with open(csv_file_path, "w", newline="") as output_file:
            dict_writer = csv.DictWriter(
                output_file,
                ["resource_id", "type", "tag_name", "tag_value", "delete(y/n)"],
            )
            dict_writer.writeheader()
            dict_writer.writerows(data)
        print("Export completed.")
        continue_prompt()


def import_data_from_csv(csv_file_path, replace=True):
    global dynamodb
    global _table_name
    if _table_exists():
        table = dynamodb.Table(_table_name)
        if replace == True:
            delete_all_records(prompt=False)
        print("Importing data")
        start_print_progress()
        with open(csv_file_path) as csv_file:
            csv_reader = csv.DictReader(csv_file, delimiter=",")
            for row in csv_reader:
                table.put_item(Item=row)
        stop_print_progress()
        print("Import completed.")
        continue_prompt()


def get_all_items():
    global dynamodb
    global _table_name
    data = []
    if _table_exists():
        table = dynamodb.Table(_table_name)
        response = table.scan()
        data = response["Items"]
        while "LastEvaluatedKey" in response:
            response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
            data.extend(response["Items"])
    return data


def _table_exists():
    global dynamodb
    global _table_name
    try:
        dynamodb.meta.client.describe_table(TableName=_table_name)
        return True
    except dynamodb.meta.client.exceptions.ResourceNotFoundException:
        return False


_continue_printing = False


def _print_status():
    global _continue_printing
    while _continue_printing:
        sys.stdout.write(".")
        time.sleep(1)
        sys.stdout.flush()
...
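dynamo_table.py relies on module-level dynamodb and _table_name globals that are defined outside the excerpt. Since this page is about localstack_python, a plausible setup is to point boto3 at a local LocalStack endpoint instead of real AWS; the table name, endpoint URL, and dummy credentials below are assumptions about a default LocalStack install, not part of the original module.

import boto3

_table_name = "resource_tags"  # hypothetical; the excerpt never shows the real table name
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url="http://localhost:4566",  # LocalStack's default edge port
    region_name="us-east-1",
    aws_access_key_id="test",              # LocalStack accepts dummy credentials
    aws_secret_access_key="test",
)

# With these globals in place, the functions above operate on the local table, e.g.:
# create()
# insert_records([{"resource_id": "i-0abc", "tag_name": "env", "tag_value": "dev"}])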