Best Python code snippet using autotest_python
em_migration_controller.py
Source:em_migration_controller.py  
1import em_path_inject # noqa2import os3import sys4import traceback5from future.moves.urllib.parse import quote6import tarfile7import time8import em_common9import em_constants10import em_utils11from em_migration import migration_handlers_dict, BaseMigrationHandler12from em_migration.em_model_migration_metadata import MigrationMetadata13from em_migration.process_control import (14    NonMigrationDataInputsController,15    SavedsearchController,16    ProcessControlInternalException17)18from modinput_wrapper.base_modularinput import BaseModularInput19from rest_handler.session import session20from service_manager.splunkd.conf import ConfManager21from splunk import getDefault22from splunk.clilib.bundle_paths import make_splunkhome_path23from splunklib.client import Service24from logging_utils import log25APP_CONF_FILE = 'app'26BACKUP_DIR_NAME = 'migration_backup'27class KnownMigrationException(Exception):28    def __init__(self, msg):29        super(KnownMigrationException, self).__init__(msg)30class EMMigrationController(BaseModularInput):31    app = em_constants.APP_NAME32    name = 'sai_migration_controller'33    title = 'Splunk App for Infrastructure - Version Migration Controller'34    description = 'Modular input to handle SAI migrations from the previous ' + \35        'version to the current version'36    use_external_validation = False37    use_kvstore_checkpointer = False38    use_hec_event_writer = False39    use_single_instance = True40    def __init__(self):41        super(EMMigrationController, self).__init__()42    def do_additional_setup(self):43        log_level = self.inputs.get('job').get('log_level', 'INFO')44        self.logger = log.getLogger(logger_name=self.name,45                                    log_level=log_level)46        self.check_kvstore_readiness()47        self.session_key = session['authtoken']48        self.dry_run = self.inputs.get('job').get('dry_run', '').lower() in ('1', 'true')49        server_uri = em_common.get_server_uri()50        self.splunkd_service = Service(51            port=getDefault('port'),52            token=self.session_key,53            owner='nobody',54            app=em_constants.APP_NAME,55        )56        self.app_conf_manager = ConfManager(57            conf_file=APP_CONF_FILE,58            server_uri=server_uri,59            session_key=self.session_key,60            app=em_constants.APP_NAME61        )62        self.migration_metadata = MigrationMetadata.get()63        self.current_version = self.migration_metadata.latest_migrated_version64        self.new_version = self.app_conf_manager.get_stanza('launcher')['entry'][0]['content']['version']65        self.data_inputs_controller = NonMigrationDataInputsController()66        self.savedsearch_controller = SavedsearchController()67    @property68    def current_version(self):69        return self._current_version70    @current_version.setter71    def current_version(self, version):72        self.migration_metadata.latest_migrated_version = version73        self.migration_metadata.save()74        self._current_version = version75    def do_execute(self):76        migration_started = False77        try:78            # Only run on the search head captain in a search head cluster79            if not em_common.modular_input_should_run(self.session_key):80                self.logger.info('SAI migration skipped on non-search head captain in search head cluster')81                return82            # initialize latest migrated version83            if self.current_version == '?.0.0' and self.is_fresh_installation():84                self.current_version = self.new_version85            if self.current_version != self.new_version:86                migration_started = True87                self.set_migration_running_status(running=True)88                self.backup_app()89                self.disable_non_migration_processes()90                for version in migration_handlers_dict:91                    if self.current_version == '?.0.0' or em_utils.is_lower_version(self.current_version, version):92                        self.execute_migration(version)93                        self.current_version = version94                # ensure that the latest migrated version is the latest app version even95                # when there's no migration needed for new version96                self.current_version = self.new_version97                self.post_migration_success_message()98        except KnownMigrationException as e:99            self.logger.error(e)100            self.post_migration_failure_message(e)101        # Unknown migration errors102        except Exception as e:103            self.logger.error('Failed to migrate to version %s - Error: %s' % (self.new_version, e))104            self.logger.debug(traceback.format_exc())105            query = quote('index=_internal source="*%s.log*"' % self.name)106            search_link = 'Check [[/app/splunk_app_infrastructure/search?q=%s|SAI migration logs]] for details' % query107            self.post_migration_failure_message(search_link)108        finally:109            if migration_started:110                self.set_migration_running_status(running=False)111                self.enable_non_migration_processes()112    def check_kvstore_readiness(self):113        try:114            em_common.check_kvstore_readiness(session['authtoken'])115        except em_common.KVStoreNotReadyException as e:116            self.logger.error('Migration failed because KVStore is not ready - Error: %s' % e)117            query = quote('index=_internal source="*%s.log*"' % self.name)118            search_link = 'Check [[/app/splunk_app_infrastructure/search?q=%s|SAI migration logs]] for details' % query119            fail_message = 'SAI migration failed because KVStore is not ready. %s' % search_link120            self.post_message('error', fail_message)121            sys.exit(1)122    def is_fresh_installation(self):123        entity_store = self.splunkd_service.kvstore['em_entities'].data124        group_store = self.splunkd_service.kvstore[em_constants.STORE_GROUPS].data125        entities = entity_store.query(limit=1)126        groups = group_store.query(limit=1)127        de_entity_store = self.splunkd_service.kvstore[em_constants.STORE_ENTITY_CACHE].data128        de_entities = de_entity_store.query(limit=1)129        # if there's no entities and no groups, there should be no alerts130        return len(entities) == 0 and len(groups) == 0 and len(de_entities) == 0131    def set_migration_running_status(self, running):132        """133        Update migration status that indicates if migration is ongoing134        :param running - boolean indicating if migration is running135        :type boolean136        """137        if not isinstance(running, bool):138            raise KnownMigrationException('Invalid migration status set')139        self.migration_metadata.is_running = int(running)140        self.migration_metadata.save()141    def backup_app(self):142        """143        Create a backup tarball of the app's local configurations144        """145        self.logger.info('Start creating backup of existing app installation...')146        sai_app_path = make_splunkhome_path(['etc', 'apps', em_constants.APP_NAME])147        backup_folder = os.path.join(sai_app_path, BACKUP_DIR_NAME)148        backup_file_path = os.path.join(backup_folder, 'sai_app_backup_%d.tgz' % time.time())149        if not os.path.exists(backup_folder):150            os.makedirs(backup_folder)151        with tarfile.open(backup_file_path, 'w:gz') as tar:152            local_conf_dir = os.path.join(sai_app_path, 'local')153            local_meta = os.path.join(sai_app_path, 'metadata', 'local.meta')154            tar.add(local_conf_dir, recursive=True, arcname=os.path.join(em_constants.APP_NAME, 'local'))155            tar.add(local_meta, recursive=True, arcname=os.path.join(em_constants.APP_NAME, 'metadata', 'local.meta'))156        self.post_message(157            'info',158            'SAI is migrating to version %s. Saved backup of current version at %s.' % (159                self.new_version,160                os.path.join('$SPLUNK_HOME', 'etc', 'apps', em_constants.APP_NAME, BACKUP_DIR_NAME)161            )162        )163        self.logger.info('Created backup file of existing app installation at %s.' % backup_file_path)164    def disable_non_migration_processes(self):165        self.data_inputs_controller.disable()166        self.savedsearch_controller.disable()167    def enable_non_migration_processes(self):168        try:169            self.data_inputs_controller.enable()170            self.savedsearch_controller.enable()171        except ProcessControlInternalException as e:172            self.logger.error('Failed to re-enable non-migration processes - Error: %s' % e)173            self.post_message(174                'warn',175                'Failed to enable processes that were disabled during migration. Please re-enable them manually.'176            )177    def execute_migration(self, migrate_to_version):178        """179        Executes migration handlers for the input target version180        :param migrate_to_version - target version to migrate to181        :type string182        """183        migration_dict = migration_handlers_dict.get(migrate_to_version, {})184        # default minimal version to 1.0.0, versions before that are not supported185        minimal_version = migration_dict.get('minimal_version', '1.0.0')186        migration_handlers = migration_dict.get('handlers', [])187        if not self.should_execute(migrate_to_version, minimal_version, migration_handlers):188            self.logger.info('No migration needs to be applied from version %s to %s, skipping...' % (189                self.current_version,190                migrate_to_version191            ))192            return193        self.logger.info('Starting SAI migration from version %s to %s...' % (self.current_version, migrate_to_version))194        for handler_cls in migration_handlers:195            migration_handler_obj = handler_cls(self.logger, self.session_key)196            migration_handler_obj.execute(dry_run=self.dry_run)197        if self.dry_run:198            self.logger.info('SAI tested migration from version %s to %s. No changes have been applied.' % (199                self.current_version,200                migrate_to_version201            ))202        else:203            self.logger.info('SAI migration from version %s to %s succeeded' % (204                self.current_version,205                migrate_to_version206            ))207    def should_execute(self, migrate_to_version, minimal_version, migration_handlers):208        # Check if there was an error in loading versions209        if not self.current_version:210            raise KnownMigrationException('Migration unable to identify app version')211        # check if version satisfies minimal version212        if self.current_version != '?.0.0' and em_utils.is_lower_version(self.current_version, minimal_version):213            raise KnownMigrationException('Migration cannot migrate from this base version')214        if len(migration_handlers) == 0:215            return False216        # Check that each migration in the migration handler is supported by this version217        for handler_cls in migration_handlers:218            if not issubclass(handler_cls, BaseMigrationHandler):219                raise KnownMigrationException('Migration handler of invalid type found')220        return self.current_version != migrate_to_version221    def post_migration_success_message(self):222        success_msg = 'SAI successfully migrated to version %s' % self.new_version223        self.post_message('info', success_msg)224    def post_migration_failure_message(self, complementary_message):225        backup_dir_path = os.path.join('$SPLUNK_HOME', 'etc', 'apps', em_constants.APP_NAME, BACKUP_DIR_NAME)226        fail_message = 'SAI failed to migrate to version %s. %s. Saved backup of old version at %s.' % (227            self.new_version,228            complementary_message,229            backup_dir_path230        )231        self.post_message('error', fail_message)232    def post_message(self, severity, message):233        self.splunkd_service.messages.create(234            '[SAI migration]',235            severity=severity,236            value=message237        )238if __name__ == '__main__':239    instance = EMMigrationController()...compat.py
Source:compat.py  
1"""Provides a simple migration layer for pipeline jsons.2Given a pipeline version, a mapping will be used to establish if the3pipeline should be migrated, that is, modified in place, until it is4brought to the latest version used in Orchest.5If you are to implement a new migration add another _migrate_...6function following the given convention: 1.0.0 -> _migrate_1_0_0. Then7add a mapping to the "_version_to_migration_function" dictionary to8map the current version to the migration function and the version to9which it will migrate.10Note that the system only supports forward migrations.11"""12from _orchest.internals import utils as _utils13def _migrate_1_0_0(pipeline: dict) -> None:14    """Pre-k8s migrations"""15    # Take care of old pipelines with no defined params.16    if "parameters" not in pipeline:17        pipeline["parameters"] = {}18    # Pipelines had no constraints related to env var names.19    services = pipeline.get("services", {})20    for service in services.values():21        env_variables = service.get("env_variables", {})22        if not _utils.are_environment_variables_valid(env_variables):23            tmp = {}24            for key, value in env_variables.items():25                valid_key = _utils.make_env_var_name_valid(key)26                # Do not lose variables to collisions.27                i = 228                while valid_key in tmp:29                    valid_key = f"{valid_key}_{i}"30                    i += 131                tmp[valid_key] = value32            service["env_variables"] = tmp33        env_vars_inherit = service.get("env_variables_inherit", [])34        tmp = set()35        for var in env_vars_inherit:36            valid_var = _utils.make_env_var_name_valid(var)37            i = 238            while valid_var in tmp:39                valid_var = f"{valid_var}_{i}"40                i += 141            tmp.add(valid_var)42        service["env_variables_inherit"] = list(tmp)43    for step in pipeline["steps"].values():44        if (45            "kernel" in step46            and "name" in step["kernel"]47            and step["kernel"]["name"] == "ir"48        ):49            step["kernel"]["name"] = "r"50def _migrate_1_1_0(pipeline: dict) -> None:51    """Migrate from pre-k8s to k8s."""52    services = pipeline.get("services", {})53    for service in services.values():54        # Can't use a get() with a default value because it would imply55        # a different thing. Migrate from pre-k8s to k8s.56        if "entrypoint" in service:57            if "command" in service:58                service["args"] = service["command"]59                del service["command"]60            service["command"] = service["entrypoint"]61            del service["entrypoint"]62        if not service.get("ports", []):63            service["ports"] = [8080]64        service["exposed"] = service.get("exposed", False)65        service["requires_authentication"] = service.get(66            "requires_authentication", True67        )68# version: (migration function, version to which it's migrated)69_version_to_migration_function = {70    "1.0.0": (_migrate_1_0_0, "1.1.0"),71    "1.1.0": (_migrate_1_1_0, "1.2.0"),72}73# Make sure no forward version is repeated.74__to_versions = set([item[1] for item in _version_to_migration_function.items()])75assert len(_version_to_migration_function) == len(__to_versions)76# Make sure no migration function is repeated.77__migration_functions = set(78    [item[0] for item in _version_to_migration_function.items()]79)80assert len(_version_to_migration_function) == len(__migration_functions)81def migrate_pipeline(pipeline: dict):82    """Migrates a pipeline in place to the latest version."""83    if not pipeline.get("version", ""):84        pipeline["version"] = "1.0.0"85    while pipeline["version"] in _version_to_migration_function:86        migration_func, migrate_to_version = _version_to_migration_function[87            pipeline["version"]88        ]89        migration_func(pipeline)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
