Best Python code snippet using autotest_python
migrate.py
Source:migrate.py  
1import sys2import time3from importlib import import_module4from django.apps import apps5from django.core.management.base import (6    BaseCommand, CommandError, no_translations,7)8from django.core.management.sql import (9    emit_post_migrate_signal, emit_pre_migrate_signal,10)11from django.db import DEFAULT_DB_ALIAS, connections, router12from django.db.migrations.autodetector import MigrationAutodetector13from django.db.migrations.executor import MigrationExecutor14from django.db.migrations.loader import AmbiguityError15from django.db.migrations.state import ModelState, ProjectState16from django.utils.module_loading import module_has_submodule17from django.utils.text import Truncator18class Command(BaseCommand):19    help = "Updates database schema. Manages both apps with migrations and those without."20    requires_system_checks = []21    def add_arguments(self, parser):22        parser.add_argument(23            '--skip-checks', action='store_true',24            help='Skip system checks.',25        )26        parser.add_argument(27            'app_label', nargs='?',28            help='App label of an application to synchronize the state.',29        )30        parser.add_argument(31            'migration_name', nargs='?',32            help='Database state will be brought to the state after that '33                 'migration. Use the name "zero" to unapply all migrations.',34        )35        parser.add_argument(36            '--noinput', '--no-input', action='store_false', dest='interactive',37            help='Tells Django to NOT prompt the user for input of any kind.',38        )39        parser.add_argument(40            '--database',41            default=DEFAULT_DB_ALIAS,42            help='Nominates a database to synchronize. Defaults to the "default" database.',43        )44        parser.add_argument(45            '--fake', action='store_true',46            help='Mark migrations as run without actually running them.',47        )48        parser.add_argument(49            '--fake-initial', action='store_true',50            help='Detect if tables already exist and fake-apply initial migrations if so. Make sure '51                 'that the current database schema matches your initial migration before using this '52                 'flag. Django will only check for an existing table name.',53        )54        parser.add_argument(55            '--plan', action='store_true',56            help='Shows a list of the migration actions that will be performed.',57        )58        parser.add_argument(59            '--run-syncdb', action='store_true',60            help='Creates tables for apps without migrations.',61        )62        parser.add_argument(63            '--check', action='store_true', dest='check_unapplied',64            help='Exits with a non-zero status if unapplied migrations exist.',65        )66    @no_translations67    def handle(self, *args, **options):68        database = options['database']69        if not options['skip_checks']:70            self.check(databases=[database])71        self.verbosity = options['verbosity']72        self.interactive = options['interactive']73        # Import the 'management' module within each installed app, to register74        # dispatcher events.75        for app_config in apps.get_app_configs():76            if module_has_submodule(app_config.module, "management"):77                import_module('.management', app_config.name)78        # Get the database we're operating from79        connection = connections[database]80        # Hook for backends needing any database preparation81        connection.prepare_database()82        # Work out which apps have migrations and which do not83        executor = MigrationExecutor(connection, self.migration_progress_callback)84        # Raise an error if any migrations are applied before their dependencies.85        executor.loader.check_consistent_history(connection)86        # Before anything else, see if there's conflicting apps and drop out87        # hard if there are any88        conflicts = executor.loader.detect_conflicts()89        if conflicts:90            name_str = "; ".join(91                "%s in %s" % (", ".join(names), app)92                for app, names in conflicts.items()93            )94            raise CommandError(95                "Conflicting migrations detected; multiple leaf nodes in the "96                "migration graph: (%s).\nTo fix them run "97                "'python manage.py makemigrations --merge'" % name_str98            )99        # If they supplied command line arguments, work out what they mean.100        run_syncdb = options['run_syncdb']101        target_app_labels_only = True102        if options['app_label']:103            # Validate app_label.104            app_label = options['app_label']105            try:106                apps.get_app_config(app_label)107            except LookupError as err:108                raise CommandError(str(err))109            if run_syncdb:110                if app_label in executor.loader.migrated_apps:111                    raise CommandError("Can't use run_syncdb with app '%s' as it has migrations." % app_label)112            elif app_label not in executor.loader.migrated_apps:113                raise CommandError("App '%s' does not have migrations." % app_label)114        if options['app_label'] and options['migration_name']:115            migration_name = options['migration_name']116            if migration_name == "zero":117                targets = [(app_label, None)]118            else:119                try:120                    migration = executor.loader.get_migration_by_prefix(app_label, migration_name)121                except AmbiguityError:122                    raise CommandError(123                        "More than one migration matches '%s' in app '%s'. "124                        "Please be more specific." %125                        (migration_name, app_label)126                    )127                except KeyError:128                    raise CommandError("Cannot find a migration matching '%s' from app '%s'." % (129                        migration_name, app_label))130                targets = [(app_label, migration.name)]131            target_app_labels_only = False132        elif options['app_label']:133            targets = [key for key in executor.loader.graph.leaf_nodes() if key[0] == app_label]134        else:135            targets = executor.loader.graph.leaf_nodes()136        plan = executor.migration_plan(targets)137        exit_dry = plan and options['check_unapplied']138        if options['plan']:139            self.stdout.write('Planned operations:', self.style.MIGRATE_LABEL)140            if not plan:141                self.stdout.write('  No planned migration operations.')142            for migration, backwards in plan:143                self.stdout.write(str(migration), self.style.MIGRATE_HEADING)144                for operation in migration.operations:145                    message, is_error = self.describe_operation(operation, backwards)146                    style = self.style.WARNING if is_error else None147                    self.stdout.write('    ' + message, style)148            if exit_dry:149                sys.exit(1)150            return151        if exit_dry:152            sys.exit(1)153        # At this point, ignore run_syncdb if there aren't any apps to sync.154        run_syncdb = options['run_syncdb'] and executor.loader.unmigrated_apps155        # Print some useful info156        if self.verbosity >= 1:157            self.stdout.write(self.style.MIGRATE_HEADING("Operations to perform:"))158            if run_syncdb:159                if options['app_label']:160                    self.stdout.write(161                        self.style.MIGRATE_LABEL("  Synchronize unmigrated app: %s" % app_label)162                    )163                else:164                    self.stdout.write(165                        self.style.MIGRATE_LABEL("  Synchronize unmigrated apps: ") +166                        (", ".join(sorted(executor.loader.unmigrated_apps)))167                    )168            if target_app_labels_only:169                self.stdout.write(170                    self.style.MIGRATE_LABEL("  Apply all migrations: ") +171                    (", ".join(sorted({a for a, n in targets})) or "(none)")172                )173            else:174                if targets[0][1] is None:175                    self.stdout.write(176                        self.style.MIGRATE_LABEL('  Unapply all migrations: ') +177                        str(targets[0][0])178                    )179                else:180                    self.stdout.write(self.style.MIGRATE_LABEL(181                        "  Target specific migration: ") + "%s, from %s"182                        % (targets[0][1], targets[0][0])183                    )184        pre_migrate_state = executor._create_project_state(with_applied_migrations=True)185        pre_migrate_apps = pre_migrate_state.apps186        emit_pre_migrate_signal(187            self.verbosity, self.interactive, connection.alias, apps=pre_migrate_apps, plan=plan,188        )189        # Run the syncdb phase.190        if run_syncdb:191            if self.verbosity >= 1:192                self.stdout.write(self.style.MIGRATE_HEADING("Synchronizing apps without migrations:"))193            if options['app_label']:194                self.sync_apps(connection, [app_label])195            else:196                self.sync_apps(connection, executor.loader.unmigrated_apps)197        # Migrate!198        if self.verbosity >= 1:199            self.stdout.write(self.style.MIGRATE_HEADING("Running migrations:"))200        if not plan:201            if self.verbosity >= 1:202                self.stdout.write("  No migrations to apply.")203                # If there's changes that aren't in migrations yet, tell them how to fix it.204                autodetector = MigrationAutodetector(205                    executor.loader.project_state(),206                    ProjectState.from_apps(apps),207                )208                changes = autodetector.changes(graph=executor.loader.graph)209                if changes:210                    self.stdout.write(self.style.NOTICE(211                        "  Your models in app(s): %s have changes that are not "212                        "yet reflected in a migration, and so won't be "213                        "applied." % ", ".join(repr(app) for app in sorted(changes))214                    ))215                    self.stdout.write(self.style.NOTICE(216                        "  Run 'manage.py makemigrations' to make new "217                        "migrations, and then re-run 'manage.py migrate' to "218                        "apply them."219                    ))220            fake = False221            fake_initial = False222        else:223            fake = options['fake']224            fake_initial = options['fake_initial']225        post_migrate_state = executor.migrate(226            targets, plan=plan, state=pre_migrate_state.clone(), fake=fake,227            fake_initial=fake_initial,228        )229        # post_migrate signals have access to all models. Ensure that all models230        # are reloaded in case any are delayed.231        post_migrate_state.clear_delayed_apps_cache()232        post_migrate_apps = post_migrate_state.apps233        # Re-render models of real apps to include relationships now that234        # we've got a final state. This wouldn't be necessary if real apps235        # models were rendered with relationships in the first place.236        with post_migrate_apps.bulk_update():237            model_keys = []238            for model_state in post_migrate_apps.real_models:239                model_key = model_state.app_label, model_state.name_lower240                model_keys.append(model_key)241                post_migrate_apps.unregister_model(*model_key)242        post_migrate_apps.render_multiple([243            ModelState.from_model(apps.get_model(*model)) for model in model_keys244        ])245        # Send the post_migrate signal, so individual apps can do whatever they need246        # to do at this point.247        emit_post_migrate_signal(248            self.verbosity, self.interactive, connection.alias, apps=post_migrate_apps, plan=plan,249        )250    def migration_progress_callback(self, action, migration=None, fake=False):251        if self.verbosity >= 1:252            compute_time = self.verbosity > 1253            if action == "apply_start":254                if compute_time:255                    self.start = time.monotonic()256                self.stdout.write("  Applying %s..." % migration, ending="")257                self.stdout.flush()258            elif action == "apply_success":259                elapsed = " (%.3fs)" % (time.monotonic() - self.start) if compute_time else ""260                if fake:261                    self.stdout.write(self.style.SUCCESS(" FAKED" + elapsed))262                else:263                    self.stdout.write(self.style.SUCCESS(" OK" + elapsed))264            elif action == "unapply_start":265                if compute_time:266                    self.start = time.monotonic()267                self.stdout.write("  Unapplying %s..." % migration, ending="")268                self.stdout.flush()269            elif action == "unapply_success":270                elapsed = " (%.3fs)" % (time.monotonic() - self.start) if compute_time else ""271                if fake:272                    self.stdout.write(self.style.SUCCESS(" FAKED" + elapsed))273                else:274                    self.stdout.write(self.style.SUCCESS(" OK" + elapsed))275            elif action == "render_start":276                if compute_time:277                    self.start = time.monotonic()278                self.stdout.write("  Rendering model states...", ending="")279                self.stdout.flush()280            elif action == "render_success":281                elapsed = " (%.3fs)" % (time.monotonic() - self.start) if compute_time else ""282                self.stdout.write(self.style.SUCCESS(" DONE" + elapsed))283    def sync_apps(self, connection, app_labels):284        """Run the old syncdb-style operation on a list of app_labels."""285        with connection.cursor() as cursor:286            tables = connection.introspection.table_names(cursor)287        # Build the manifest of apps and models that are to be synchronized.288        all_models = [289            (290                app_config.label,291                router.get_migratable_models(app_config, connection.alias, include_auto_created=False),292            )293            for app_config in apps.get_app_configs()294            if app_config.models_module is not None and app_config.label in app_labels295        ]296        def model_installed(model):297            opts = model._meta298            converter = connection.introspection.identifier_converter299            return not (300                (converter(opts.db_table) in tables) or301                (opts.auto_created and converter(opts.auto_created._meta.db_table) in tables)302            )303        manifest = {304            app_name: list(filter(model_installed, model_list))305            for app_name, model_list in all_models306        }307        # Create the tables for each model308        if self.verbosity >= 1:309            self.stdout.write('  Creating tables...')310        with connection.schema_editor() as editor:311            for app_name, model_list in manifest.items():312                for model in model_list:313                    # Never install unmanaged models, etc.314                    if not model._meta.can_migrate(connection):315                        continue316                    if self.verbosity >= 3:317                        self.stdout.write(318                            '    Processing %s.%s model' % (app_name, model._meta.object_name)319                        )320                    if self.verbosity >= 1:321                        self.stdout.write('    Creating table %s' % model._meta.db_table)322                    editor.create_model(model)323            # Deferred SQL is executed when exiting the editor's context.324            if self.verbosity >= 1:325                self.stdout.write('    Running deferred SQL...')326    @staticmethod327    def describe_operation(operation, backwards):328        """Return a string that describes a migration operation for --plan."""329        prefix = ''330        is_error = False331        if hasattr(operation, 'code'):332            code = operation.reverse_code if backwards else operation.code333            action = (code.__doc__ or '') if code else None334        elif hasattr(operation, 'sql'):335            action = operation.reverse_sql if backwards else operation.sql336        else:337            action = ''338            if backwards:339                prefix = 'Undo '340        if action is not None:341            action = str(action).replace('\n', '')342        elif backwards:343            action = 'IRREVERSIBLE'344            is_error = True345        if action:346            action = ' -> ' + action347        truncated = Truncator(action)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
