How to use get_migrations method in autotest

Best Python code snippet using autotest_python

groups.py

Source:groups.py Github

copy

Full Screen

...12 Provides the list of migrations associated with an group and a loader that returns13 the requested migration.14 """15 @abstractmethod16 def get_migrations(self) -> Sequence[str]:17 """18 Returns the list of migration IDs in the order they should be executed.19 """20 raise NotImplementedError21 @abstractmethod22 def load_migration(self, migration_id: str) -> Migration:23 raise NotImplementedError24class DirectoryLoader(GroupLoader, ABC):25 """26 Loads migrations that are defined as files of a directory. The file name27 represents the migration ID.28 """29 def __init__(self, module: str) -> None:30 self.__module = module31 @abstractmethod32 def get_migrations(self) -> Sequence[str]:33 raise NotImplementedError34 def load_migration(self, migration_id: str) -> Migration:35 try:36 module = import_module(f"{self.__module}.{migration_id}")37 return module.Migration() # type: ignore38 except ModuleNotFoundError:39 raise MigrationDoesNotExist("Invalid migration ID")40class ConfigurationLoader(DirectoryLoader):41 """42 Loads migration groups from YAML configuration files.43 """44 def __init__(45 self, path_to_migration_group_config: str, migration_path: str | None = None46 ) -> None:47 self.migration_group = load_migration_group(path_to_migration_group_config)48 self.migration_group_name = self.migration_group["name"]49 self.migration_names = [50 str(migration) for migration in self.migration_group["migrations"]51 ]52 self.optional = self.migration_group.get("optional", False)53 migration_path = migration_path or "snuba.snuba_migrations"54 super().__init__(f"{migration_path}.{self.migration_group_name}")55 def get_migrations(self) -> Sequence[str]:56 return self.migration_names57class SystemLoader(DirectoryLoader):58 def __init__(self) -> None:59 super().__init__("snuba.migrations.system_migrations")60 def get_migrations(self) -> Sequence[str]:61 return ["0001_migrations"]62class EventsLoader(DirectoryLoader):63 def __init__(self) -> None:64 
super().__init__("snuba.snuba_migrations.events")65 def get_migrations(self) -> Sequence[str]:66 return [67 "0001_events_initial",68 "0002_events_onpremise_compatibility",69 "0003_errors",70 "0004_errors_onpremise_compatibility",71 "0005_events_tags_hash_map",72 "0006_errors_tags_hash_map",73 "0007_groupedmessages",74 "0008_groupassignees",75 "0009_errors_add_http_fields",76 "0010_groupedmessages_onpremise_compatibility",77 "0011_rebuild_errors",78 "0012_errors_make_level_nullable",79 "0013_errors_add_hierarchical_hashes",80 "0014_backfill_errors",81 "0015_truncate_events",82 "0016_drop_legacy_events",83 ]84class TransactionsLoader(DirectoryLoader):85 def __init__(self) -> None:86 super().__init__("snuba.snuba_migrations.transactions")87 def get_migrations(self) -> Sequence[str]:88 return [89 "0001_transactions",90 "0002_transactions_onpremise_fix_orderby_and_partitionby",91 "0003_transactions_onpremise_fix_columns",92 "0004_transactions_add_tags_hash_map",93 "0005_transactions_add_measurements",94 "0006_transactions_add_http_fields",95 "0007_transactions_add_discover_cols",96 "0008_transactions_add_timestamp_index",97 "0009_transactions_fix_title_and_message",98 "0010_transactions_nullable_trace_id",99 "0011_transactions_add_span_op_breakdowns",100 "0012_transactions_add_spans",101 "0013_transactions_reduce_spans_exclusive_time",102 "0014_transactions_remove_flattened_columns",103 "0015_transactions_add_source_column",104 "0016_transactions_add_group_ids_column",105 "0017_transactions_add_app_start_type_column",106 ]107class DiscoverLoader(DirectoryLoader):108 """109 This migration group depends on events and transactions110 """111 def __init__(self) -> None:112 super().__init__("snuba.snuba_migrations.discover")113 def get_migrations(self) -> Sequence[str]:114 return [115 "0001_discover_merge_table",116 "0002_discover_add_deleted_tags_hash_map",117 "0003_discover_fix_user_column",118 "0004_discover_fix_title_and_message",119 
"0005_discover_fix_transaction_name",120 "0006_discover_add_trace_id",121 "0007_discover_add_span_id",122 ]123class OutcomesLoader(DirectoryLoader):124 def __init__(self) -> None:125 super().__init__("snuba.snuba_migrations.outcomes")126 def get_migrations(self) -> Sequence[str]:127 return [128 "0001_outcomes",129 "0002_outcomes_remove_size_and_bytes",130 "0003_outcomes_add_category_and_quantity",131 "0004_outcomes_matview_additions",132 ]133class ReplaysLoader(DirectoryLoader):134 def __init__(self) -> None:135 super().__init__("snuba.snuba_migrations.replays")136 def get_migrations(self) -> Sequence[str]:137 return [138 "0001_replays",139 "0002_add_url",140 "0003_alter_url_allow_null",141 "0004_add_error_ids_column",142 "0005_add_urls_user_agent_replay_start_timestamp",143 "0006_add_is_archived_column",144 ]145class MetricsLoader(DirectoryLoader):146 def __init__(self) -> None:147 super().__init__("snuba.snuba_migrations.metrics")148 def get_migrations(self) -> Sequence[str]:149 return [150 "0001_metrics_buckets",151 "0002_metrics_sets",152 "0003_counters_to_buckets",153 "0004_metrics_counters",154 "0005_metrics_distributions_buckets",155 "0006_metrics_distributions",156 "0007_metrics_sets_granularity_10",157 "0008_metrics_counters_granularity_10",158 "0009_metrics_distributions_granularity_10",159 "0010_metrics_sets_granularity_1h",160 "0011_metrics_counters_granularity_1h",161 "0012_metrics_distributions_granularity_1h",162 "0013_metrics_sets_granularity_1d",163 "0014_metrics_counters_granularity_1d",164 "0015_metrics_distributions_granularity_1d",165 "0016_metrics_sets_consolidated_granularity",166 "0017_metrics_counters_consolidated_granularity",167 "0018_metrics_distributions_consolidated_granularity",168 "0019_aggregate_tables_add_ttl",169 "0020_polymorphic_buckets_table",170 "0021_polymorphic_bucket_materialized_views",171 "0022_repartition_polymorphic_table",172 "0023_polymorphic_repartitioned_bucket_matview",173 
"0024_metrics_distributions_add_histogram",174 "0025_metrics_counters_aggregate_v2",175 "0026_metrics_counters_v2_writing_matview",176 "0027_fix_migration_0026",177 "0028_metrics_sets_aggregate_v2",178 "0029_metrics_distributions_aggregate_v2",179 "0030_metrics_distributions_v2_writing_mv",180 "0031_metrics_sets_v2_writing_mv",181 "0032_redo_0030_and_0031_without_timestamps",182 "0033_metrics_cleanup_old_views",183 "0034_metrics_cleanup_old_tables",184 ]185class SessionsLoader(DirectoryLoader):186 def __init__(self) -> None:187 super().__init__("snuba.snuba_migrations.sessions")188 def get_migrations(self) -> Sequence[str]:189 return ["0001_sessions", "0002_sessions_aggregates", "0003_sessions_matview"]190class QuerylogLoader(DirectoryLoader):191 def __init__(self) -> None:192 super().__init__("snuba.snuba_migrations.querylog")193 def get_migrations(self) -> Sequence[str]:194 return ["0001_querylog", "0002_status_type_change", "0003_add_profile_fields"]195class ProfilesLoader(DirectoryLoader):196 def __init__(self) -> None:197 super().__init__("snuba.snuba_migrations.profiles")198 def get_migrations(self) -> Sequence[str]:199 return [200 "0001_profiles",201 "0002_disable_vertical_merge_algorithm",202 "0003_add_device_architecture",203 "0004_drop_profile_column",204 ]205class FunctionsLoader(DirectoryLoader):206 def __init__(self) -> None:207 super().__init__("snuba.snuba_migrations.functions")208 def get_migrations(self) -> Sequence[str]:209 return ["0001_functions"]210MigrationGroup = str211CONFIG_BUILT_MIGRATIONS = {212 MigrationGroup(loader.migration_group_name): loader213 for loader in [214 ConfigurationLoader(config_file)215 for config_file in glob(settings.MIGRATION_CONFIG_FILES_GLOB, recursive=True)216 ]217}218# This is a list instead of a dictionary to preserve the order219# of existing migrations. 
For example, the discover migrations220# need to run after the events/transactions migrations.221REGISTERED_GROUPS: list[tuple[str, GroupLoader]] = [222 ("system", SystemLoader()),...

Full Screen

Full Screen

migrator-dbtool.py

Source:migrator-dbtool.py Github

copy

Full Screen

...42 S.put_migration_version( migr_old_dir, ver )43 else: # advanced mode44 print( "advanced version=%s" % (ver) )45 #46 files = S.get_migrations( migr_new_dir )47 sfiles = db.get_migrations()48 S.put_migrations( migr_old_dir, sfiles )49 S.put_migration_version( migr_old_dir, ver )50 ver2 = S.compare_migration_tree(files, sfiles)51 if ver2 is not None:52 ver2id = ver2['id']53 print( "down version=%s" % (ver2id) )54 S.put_migration_version( migr_old_dir, ver2id, '.version2' )55elif sys.argv[1]=='pre-force':56 if not is_ver: # empty table57 print( "empty database" )58 elif not is_files: # legacy mode!59 print( "legacy version=%s" % (ver) )60 S.put_migration_version( migr_old_dir, ver )61 else: # advanced mode62 print( "advanced version=%s" % (ver) )63 #64 files = S.get_migrations( migr_new_dir )65 sfiles = db.get_migrations()66 S.put_migrations( migr_old_dir, sfiles )67 S.put_migration_version( migr_old_dir, ver )68 ver2 = S.compare_migration_tree_force(files, sfiles)69 if ver2 is not None:70 ver2id = ver2['id']71 print( "rollback version=%s" % (ver2id) )72 S.put_migration_version( migr_old_dir, ver2id, '.version2' )73elif sys.argv[1]=='post' or sys.argv[1]=='post-force':74 if not is_ver: # empty table75 S.error( "empty database" )76 elif not is_files: # still legacy mode!77 files = S.get_migrations( migr_new_dir )78 db.create_migrations()79 db.put_migrations( files )80 else: # advanced mode81 files = S.get_migrations( os.environ['DATABASE_MIGRATIONS'] )82 db.put_migrations( files, True )83elif sys.argv[1]=='log':84 if not is_log:85 db.create_log()86 db.put_log( sys.stdin.read() )87elif sys.argv[1]=='get-log':88 if not is_log:89 S.warning( "No log" )90 else:91 for v in db.get_log():92 sys.stdout.write("%s\t%d\t%s\n" % (v[0].strftime("%Y-%m-%d %H:%M:%S"), v[1], v[2]))...

Full Screen

Full Screen

test_migrations.py

Source:test_migrations.py Github

copy

Full Screen

...33 _add('foobar', '1.0.1a1')34 _add('foobar', '1.0a9.dev0')35 app_settings['applications'] = ['foobar', 'foobarother']36 def test_get_migrations_for_application(self):37 self.assertEqual(len(migrate.get_migrations('foobar')), 8)38 def test_get_migrations_for_application_in_order(self):39 self.assertEqual(40 [m.to_version_raw for m in migrate.get_migrations('foobar')], [41 '0.2',42 '0.5',43 '1.0a1',44 '1.0a9.dev0',45 '1.0',46 '1.0.1a1',47 '1.5.0',48 '2.1a1'49 ])50 def test_get_migrations_from_version(self):51 self.assertEqual(52 [m.to_version_raw for m in migrate.get_migrations('foobar', '1.0')], [53 '1.0.1a1',54 '1.5.0',55 '2.1a1'56 ])57 def test_get_migrations_to_version(self):58 self.assertEqual(59 [m.to_version_raw for m in migrate.get_migrations('foobar', to_version='1.0')], [60 '0.2',61 '0.5',62 '1.0a1',63 '1.0a9.dev0',64 '1.0'65 ])66 def test_run_migrations(self):67 migrate.run_migrations(68 self.layer.app,69 migrate.get_migrations('foobar'))70 site = self.layer.app['plone']['plone']71 registry = site['_registry']72 self.assertEquals(registry[MIGRATION_DATA_REGISTRY_KEY]['foobar'], '2.1a1')73 def test_should_only_get_migrations_for_activated_applications(self):74 self.assertEqual(len(migrate.get_migrations('foobar')), 8)75 app_settings['applications'] = []76 self.assertEqual(len(migrate.get_migrations('foobarother')), 0)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful