Best Python code snippet using autotest_python
nodewatcher.py
Source:nodewatcher.py  
...99    """100    _jobs = scheduler_module.has_jobs(hostname)101    log.debug("jobs=%s" % _jobs)102    return _jobs103def _lock_host(scheduler_module, hostname, unlock=False):104    """105    Lock/Unlock the given host (e.g. before termination).106    :param scheduler_module: scheduler specific module to use107    :param hostname: host to lock108    :param unlock: False to lock the host, True to unlock109    """110    log.debug("%s %s" % (unlock and "unlocking" or "locking", hostname))111    scheduler_module.lock_host(hostname, unlock)112    time.sleep(15)  # allow for some settling113def _self_terminate(asg_client, instance_id, decrement_desired=True):114    """115    Terminate the given instance and decrease ASG desired capacity.116    :param asg_client: ASG boto3 client117    :param instance_id: the instance to terminate118    :param decrement_desired: if True decrements ASG desired by 1119    """120    try:121        log.info("Self terminating %s" % instance_id)122        asg_client.terminate_instance_in_auto_scaling_group(123            InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrement_desired124        )125        sys.exit(0)126    except ClientError as e:127        if e.response["Error"]["Code"] == "ValidationError":128            log.info("Min ASG size reached. Not terminating.")129        else:130            log.error("Failed when self terminating instance with error %s.", e.response)131    except Exception as e:132        log.error("Failed when self terminating instance with exception %s.", e)133def _maintain_size(asg_name, asg_client):134    """135    Verify if the desired capacity is lower than the configured min size.136    :param asg_name: the ASG to query for137    :param asg_client: ASG boto3 client138    :return: True if the desired capacity is lower than the configured min size.139    """140    try:141        asg = asg_client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name]).get("AutoScalingGroups")[0]142        _capacity = asg.get("DesiredCapacity")143        _min_size = asg.get("MinSize")144        log.info("DesiredCapacity is %d, MinSize is %d" % (_capacity, _min_size))145        if _capacity > _min_size:146            log.debug("Capacity greater than min size.")147            return False148        else:149            log.debug("Capacity less than or equal to min size.")150            return True151    except Exception as e:152        log.error(153            "Failed when checking min cluster size with exception %s. Assuming capacity is greater than min size.", e154        )155        return False156def _dump_logs(instance_id):157    """Dump gzipped /var/log dir to /home/logs/compute/$instance_id.tar.gz."""158    logs_dir = "/home/logs/compute"159    filename = "{0}/{1}.tar.gz".format(logs_dir, instance_id)160    try:161        try:162            os.makedirs(logs_dir)163        except OSError as e:164            if e.errno != errno.EEXIST:165                raise166        log.info("Dumping logs to %s", filename)167        with closing(tarfile.open(filename, "w|gz")) as archive:168            archive.add("/var/log", recursive=True)169    except Exception as e:170        log.warning("Failed while dumping logs to %s with exception %s.", filename, e)171def _terminate_if_down(scheduler_module, config, asg_name, instance_id, max_wait):172    """Check that node is correctly attached to scheduler otherwise terminate the instance."""173    asg_client = boto3.client("autoscaling", region_name=config.region, config=config.proxy_config)174    @retry(wait_fixed=seconds(10), retry_on_result=lambda result: result is True, stop_max_delay=max_wait)175    def _poll_wait_for_node_ready():176        is_down = scheduler_module.is_node_down()177        if is_down:178            log.warning("Node reported as down")179        return is_down180    try:181        _poll_wait_for_node_ready()182    except RetryError:183        log.error("Node is marked as down by scheduler or not attached correctly. Terminating...")184        _dump_logs(instance_id)185        # jobwatcher already has the logic to request a new host in case of down nodes,186        # which is done in order to speed up cluster recovery.187        _self_terminate(asg_client, instance_id, decrement_desired=not _maintain_size(asg_name, asg_client))188@retry(189    wait_exponential_multiplier=seconds(1),190    wait_exponential_max=seconds(10),191    retry_on_result=lambda result: result is False,192    stop_max_delay=minutes(10),193)194def _wait_for_stack_ready(stack_name, region, proxy_config):195    """196    Verify if the Stack is in one of the *_COMPLETE states.197    :param stack_name: Stack to query for198    :param region: AWS region199    :param proxy_config: Proxy configuration200    :return: true if the stack is in the *_COMPLETE status201    """202    log.info("Waiting for stack %s to be ready", stack_name)203    cfn_client = boto3.client("cloudformation", region_name=region, config=proxy_config)204    stacks = cfn_client.describe_stacks(StackName=stack_name)205    stack_status = stacks["Stacks"][0]["StackStatus"]206    log.info("Stack %s is in status: %s", stack_name, stack_status)207    return stack_status in [208        "CREATE_COMPLETE",209        "UPDATE_COMPLETE",210        "UPDATE_ROLLBACK_COMPLETE",211        "CREATE_FAILED",212        "UPDATE_FAILED",213    ]214def _init_data_dir():215    """Create folder to store nodewatcher data."""216    try:217        if not os.path.exists(DATA_DIR):218            os.makedirs(DATA_DIR)219    except Exception as e:220        log.warning(221            "Unable to create the folder '%s' to persist current idle time. Failed with exception: %s", DATA_DIR, e222        )223def _store_idletime(idletime):224    """225    Save idletime to file, in json format.226    :param idletime: the idletime value to store227    """228    data = {"current_idletime": idletime}229    try:230        with open(IDLETIME_FILE, "w") as outfile:231            json.dump(data, outfile)232    except Exception as e:233        log.warning(234            "Unable to store idletime '%s' in the file '%s'. Failed with exception: %s", idletime, IDLETIME_FILE, e235        )236def _init_idletime():237    """238    Initialize idletime value (from file if there).239    :return: the current idletime value (0 if the file doesn't exist)240    """241    idletime = 0242    _init_data_dir()243    if os.path.isfile(IDLETIME_FILE):244        try:245            with open(IDLETIME_FILE) as f:246                data = json.loads(f.read())247                idletime = data.get("current_idletime", 0)248        except Exception as e:249            log.warning("Unable to get idletime from the file '%s'. Failed with exception: %s", IDLETIME_FILE, e)250    return idletime251def _lock_and_terminate(region, proxy_config, scheduler_module, hostname, instance_id):252    _lock_host(scheduler_module, hostname)253    if _has_jobs(scheduler_module, hostname):254        log.info("Instance has active jobs.")255        _lock_host(scheduler_module, hostname, unlock=True)256        return257    asg_client = boto3.client("autoscaling", region_name=region, config=proxy_config)258    _self_terminate(asg_client, instance_id)259    # _self_terminate exits on success260    _lock_host(scheduler_module, hostname, unlock=True)261def _refresh_cluster_properties(region, proxy_config, asg_name):262    """263    Return dynamic cluster properties (at the moment only max cluster size).264    The properties are fetched every CLUSTER_PROPERTIES_REFRESH_INTERVAL otherwise a cached value is returned.265    """266    if not hasattr(_refresh_cluster_properties, "cluster_properties_refresh_timer"):267        _refresh_cluster_properties.cluster_properties_refresh_timer = 0268        _refresh_cluster_properties.cached_max_cluster_size = None269    _refresh_cluster_properties.cluster_properties_refresh_timer += LOOP_TIME270    if (271        not _refresh_cluster_properties.cached_max_cluster_size272        or _refresh_cluster_properties.cluster_properties_refresh_timer >= CLUSTER_PROPERTIES_REFRESH_INTERVAL273    ):274        _refresh_cluster_properties.cluster_properties_refresh_timer = 0...test_kill_kernel.py
Source:test_kill_kernel.py  
...91                                                  proc_name=param['proc_name'],92                                                  timeout=60)93                    self.proc_list.remove(param)94        super(KillMajorCriticalProcess, self).tearDown()95    def _lock_host(self, host_id, host_name):96        print('Lock Host')97        host_action(self, host_id, "lock")98        cli_helpers.wait_until_host_state_equals(self, host_name,99                                                 'administrative', 'locked')100        print('Verify the unit state changes to locked-disabled-online')101        cli_helpers.wait_until_host_state_equals(self, host_name,102                                                 'availability', 'online')103    def _unlock_host(self, host_id, host_name):104        print('Unlock Host')105        host_action(self, host_id, "unlock")106        cli_helpers.wait_until_host_state_equals(self, host_name,107                                                 'administrative', 'unlocked')108        cli_helpers.wait_until_host_state_equals(self, host_name,109                                                 'availability', 'intest')...crawler.py
Source:crawler.py  
...112        except Exception as e:113            logging.debug("Can't process url '%s' (%s)" % (url, e))114            return False115        116    def _lock_host(self, url):117        """Pass if host is to be locked for crawling purposes"""118        return (self.locked and self._same_host(url)) or not self.locked119    def crawl(self):120        """ Main function in the crawling process.  Core algorithm is:121        q <- starting page122        while q not empty:123           url <- q.get()124           if url is new and suitable:125              page <- fetch(url)   126              q.put(urls found in page)127           else:128              nothing129        new and suitable means that we don't re-visit URLs we've seen130        already fetched, and user-supplied criteria like maximum...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
