Best Python code snippet using autotest_python
zeus.py
Source:zeus.py  
...121    if var is True:122        return return_code, res123    else:124        return return_code125def _remote_scp(src, dst, way, out=None):126    if way == "get":127        cmd = [128            "scp",129            "-r",130            "{user}@{host}:{src}".format(user=username, host=hostname, src=src),131            "{dst}".format(dst=dst),132        ]133    elif way == "put":134        cmd = [135            "scp",136            "-r",137            "{src}".format(src=src),138            "{user}@{host}:{dst}".format(user=username, host=hostname, dst=dst),139        ]140    else:141        if out:142            out.append_stdout("Scp wrapper not used properly\n")143        else:144            print("Scp wrapper not used properly\n")145        return -1146    popen = subprocess.Popen(147        cmd,148        stdout=subprocess.PIPE,149        stderr=subprocess.PIPE,150        universal_newlines=True,151    )152    for stdout_line in iter(popen.stdout.readline, ""):153        if out:154            out.append_stdout(stdout_line.strip() + "\n")155        else:156            print(stdout_line.strip())157    popen.stdout.close()158    return_code = popen.wait()159    if return_code:160        if out:161            for stdout_line in iter(popen.stderr.readline, ""):162                out.append_stdout(stdout_line.strip() + "\n")163        else:164            for stdout_line in iter(popen.stderr.readline, ""):165                print(stdout_line.strip())166    return return_code167def _remote_bsub(script_path, out=None):168    cmd = [169        "ssh",170        "{user}@{host}".format(user=username, host=hostname),171        "bsub < {cmd}".format(cmd=script_path),172    ]173    popen = subprocess.Popen(174        cmd,175        stdout=subprocess.PIPE,176        stderr=subprocess.PIPE,177        universal_newlines=True,178    )179    jobid = -1180    for stdout_line in iter(popen.stdout.readline, ""):181        if "is submitted to queue" in stdout_line:182            res = re.match(r"^.*<([0-9]*)>.*$", stdout_line)183            if res:184                jobid = res.group(1)185        if out:186            out.append_stdout(stdout_line.strip() + "\n")187        else:188            print(stdout_line.strip() + "\n")189    popen.stdout.close()190    return_code = popen.wait()191    if return_code:192        for stdout_line in iter(popen.stderr.readline, ""):193            if out:194                out.append_stdout(stdout_line.strip() + "\n")195            else:196                print(stdout_line.strip() + "\n")197    return return_code, jobid198def _remote_bjobs(jobid, out):199    status_str = "bjobs -o stat {jobid} | tail -n 1".format(jobid=jobid)200    cmd = [201        "ssh",202        "{user}@{host}".format(user=username, host=hostname),203        "{cmd}".format(cmd=status_str),204    ]205    popen = subprocess.Popen(206        cmd,207        stdout=subprocess.PIPE,208        stderr=subprocess.PIPE,209        universal_newlines=True,210    )211    status = None212    for stdout_line in iter(popen.stdout.readline, ""):213        status = stdout_line214    popen.stdout.close()215    return_code = popen.wait()216    if return_code:217        for stdout_line in iter(popen.stderr.readline, ""):218            out.append_stdout(stdout_line.strip() + "\n")219    return return_code, status220def execute(cmd):221    """Execute a command remotely on Zeus.222    Parameters223    ----------224    cmd : str225         Command to be executed on Zeus.226    Returns227    -------228    None229    Examples230    --------231    >>> zeus.execute('ls ~/')232    """233    _remote_cmd(cmd)234    return None235def get(src, dst):236    """Download a file from Zeus.237    Copy remote file specified in `src` from Zeus to the local path238    specified in `dst`.239    Parameters240    ----------241    src : str242         Path of the remote file to be downloaded from Zeus.243    dst : str244         Local path/name where to copy the file from Zeus.245    Returns246    -------247    None248    Examples249    --------250    >>> zeus.get('/users_home/user/file.nc', '/home/user/file.nc')251    """252    _remote_scp(src, dst, "get")253    return None254def put(src, dst):255    """Upload a file to Zeus.256    Copy a local file specified in `src` to the remote path specified in257    `dst` on Zeus.258    Parameters259    ----------260    src : str261         Path of the local file to be uploaded on Zeus.262    dst : str263         Path/name where to copy the file on Zeus.264    Returns265    -------266    None267    Examples268    --------269    >>> zeus.put('/home/user/file.nc', '/users_home/user/file.nc')270    """271    _remote_scp(src, dst, "put")272    return None273def process(script_name, data, compress=False, frequency=10):274    """Submit a script on LSF and download the output data.275    This function uploads the local script specified in `script_name`,276    executes the script on LSF (i.e., bsub < script_name) and downloads the277    output specified in `data` once the job is completed.278    Output data can be optionally compressed as tar.gz before downloading it.279    Job status checking frequency can be adjusted based on the expected job280    duration.281    The function is non-blocking, which means that other cells in the notebook282    can be executed right after this function is started, without needing to283    wait for the job to end.284    Parameters285    ----------286    script_name : src287        Absolute (local) path of the script to be submitted on LSF.288    data : src289        Absolute (remote) path of data to be downloaded.290    compress : bool, optional291        If data downloaded has to be compressed or not (default is False).292    frequency : int, optional293        Interval for checking job status in seconds (default is 10s).294    Returns295    -------296    None297    Examples298    --------299    >>> zeus.process('/home/user/script.lsf', '/users_home/user/output.nc',300    compress=True, frequency=20)301    """302    # Perform initial checks303    if not os.path.isabs(script_name) or not os.path.isfile(script_name):304        print("Please specify the absolute path of the script file")305        return None306    if not os.path.isabs(data):307        print("Please specify the absolute path of the remote data file")308        return None309    def thread_func(script_name, data, compress, out):310        if _remote_scp(script_name, os.path.basename(script_name), "put", out):311            out.append_stdout(312                "Something went wrong while uploading the script on Zeus"313            )314            return None315        ret, jobid = _remote_bsub(os.path.basename(script_name), out)316        if ret or int(jobid) < 0:317            out.append_stdout(318                "Something went wrong while running the script on LSF"319            )320            return None321        old_status = ""322        while True:323            ret, status = _remote_bjobs(jobid, out)324            if ret or not status:325                out.append_stdout(326                    "Something went wrong while checking the job status on LSF"327                )328                return None329            if status and status != old_status:330                out.append_stdout("Job status is: " + status)331                old_status = status332                if "DONE" in status or "EXIT" in status:333                    break334            time.sleep(frequency)335        if "EXIT" in status:336            out.append_stdout("Job execution was unsuccessful")337            return None338        if _remote_cmd("ls " + data, out):339            out.append_stdout("Remote data path not found")340            return None341        if compress:342            if _remote_cmd("mkdir -p %s" % tmp_path, out):343                out.append_stdout(344                    "Something went wrong while compressing the file"345                )346                return None347            if _remote_cmd(348                "tar -zcf %s%s.tar.gz %s"349                % (tmp_path, os.path.basename(data), data),350                out,351            ):352                out.append_stdout(353                    "Something went wrong while compressing the file"354                )355                return None356            local_path = os.path.join(357                os.environ["PWD"], os.path.basename(data) + ".tar.gz"358            )359            remote_path = os.path.join(360                tmp_path, os.path.basename(data) + ".tar.gz "361            )362        else:363            local_path = os.path.join(os.environ["PWD"], os.path.basename(data))364            remote_path = data365        if _remote_scp(remote_path, local_path, "get", out):366            out.append_stdout(367                "Something went wrong while getting the output data from Zeus"368            )369            return None370        if compress:371            _remote_cmd(372                "rm %s%s.tar.gz" % (tmp_path, os.path.basename(data)), out373            )374        out.append_stdout("Data has been downloaded in: " + local_path)375        return None376    display("Starting Job execution")377    out = widgets.Output()378    display(out)379    thread = threading.Thread(380        target=thread_func, args=(script_name, data, compress, out)381    )382    thread.start()383    return None384def info(jobid=None):385    """Get status of LSF jobs on Zeus.386    This function returns the list of the jobs the user submitted on LSF. If387    `jobid` argument is provided then only the status of that particular job388    is shown, otherwise all recent jobs will be listed.389    Parameters390    ----------391    jobid : int, optional392         ID of LSF job.393    Returns394    -------395    None396    Examples397    --------398    >>> zeus.info()399    >>> zeus.info(1234)400    """401    if jobid is not None:402        cmd = "bjobs " + str(jobid)403    else:404        cmd = "bjobs -a"405    _remote_cmd(cmd)406    return None407def start_dask(408    project,409    cores,410    memory,411    name="Dask-Test",412    processes=None,413    queue="p_short",414    local_directory="~/dask-space",415    interface="ib0",416    walltime=None,417    job_extra=None,418    env_extra=None,419    log_directory="~/dask-space",420    death_timeout=60,421    n_workers=1,422):423    """Start a new Dask cluster on Zeus424    This function starts a new Dask scheduler on the cluster front-end node425    and a set of worker process on the cluster compute nodes. The function426    returns a ready-to-use Dask client object. The arguments defined in the427    interface are lent from the Dask Jobqueue interface.428    Parameters429    ----------430    project : str431        Accounting string associated with each worker job. Passed to432        `#BSUB -P` option.433    cores : int434        Number of cores for the worker nodes. Passed to `#BSUB -n` option.435    memory: str436        Total amount of memory per worker job. Passed to `#BSUB -M` option.437    name: str, optional438        Name of Dask workers. By default set to Dask-test.439    processes: int, optional440        Cut the job up into this many processes. Good for GIL workloads or for441        nodes with many cores. By default, process ~= sqrt(cores) so that the442        number of processes and the number of threads per process is roughly443        the same.444    queue: str, optional445        Destination queue for each worker job. Passed to #BSUB -q option. By446        default `p_short` queue is used.447    local_directory: str, optional448        Dask worker local directory for file spilling. By default the folder449        `dask-space` in the home directory is used.450    interface: str, optional451        Network interface like `eth0` or `ib0`. This will be used for the Dask452        workers interface. By default `ib0` is used.453    walltime: str, optional454        Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option. If455        not specified the default queue walltime is used.456    job_extra: list, optional457        List of optional LSF options, for example -x. Each option will be458        prepended with the #BSUB prefix.459    env_extra: list, optional460        Optional commands to add to script before launching worker.461    log_directory: str, optional462        Directory to use for job scheduler logs. By default the folder463        `dask-space` in the home directory is used.464    death_timeout: float, optional465        Seconds to wait for a scheduler before closing workers (default is 60).466    n_workers : int, optional467        Number of worker process to startup, i.e. jobs on LSF (default is 1).468    Returns469    -------470    dask.distributed.Client471        A ready-to-use Dask distributed client connected to the scheduler472    Examples473    --------474    >>> client = zeus.start_dask(475              project="R000",476              cores=36,477              memory="80 GB",478              name="Test",479              processes=12,480              local_directory="~/dask-space",481              interface="ib0",482              walltime="00:30",483              job_extra=["-x"],484              n_workers=1485             )486    Create a new cluster with a single worker on a whole Zeus node, using 12487    processes (3 threads/process), 80GB of RAM memory requested. Note that each488    process will get a maximum of 80/12GB of memory489    """490    # default values491    shebang = "#!/bin/bash"492    python = "python3"493    def lsf_format_bytes_ceil(n, lsf_units="mb"):494        """ Format bytes as text495        Convert bytes to megabytes which LSF requires.496        Parameters497        ----------498        n: int499            Bytes500        lsf_units: str501            Units for the memory in 2 character shorthand, kb through eb502        Examples503        --------504        >>> lsf_format_bytes_ceil(1234567890)505        '1235'506        """507        # Adapted from dask_jobqueue lsf.py508        units = {509            "B": 1,510            "KB": 10 ** 3,511            "MB": 10 ** 6,512            "GB": 10 ** 9,513            "TB": 10 ** 12,514        }515        number, unit = [string.strip() for string in n.split()]516        lsf_units = lsf_units.lower()[0]517        converter = {"k": 1, "m": 2, "g": 3, "t": 4, "p": 5, "e": 6, "z": 7}518        return "%d" % math.ceil(519            float(number) * units[unit] / (1000 ** converter[lsf_units])520        )521    def create_scheduler_script(522        shebang, python, name, log_directory, env_extra523    ):524        sched_script_lines = []525        sched_script_lines.append("%s" % shebang)526        """527        sched_script_lines.append("")528        sched_script_lines.append("#BSUB -J scheduler_%s" % name)529        sched_script_lines.append("#BSUB -e %s/scheduler_%s-%%J.err" % (log_directory, name))530        sched_script_lines.append("#BSUB -o %s/scheduler_%s-%%J.out" % (log_directory, name))531        sched_script_lines.append("#BSUB -q %s" % scheduler_queue)532        sched_script_lines.append("#BSUB -P %s" % project)533        memory_string = lsf_format_bytes_ceil(scheduler_memory)534        sched_script_lines.append("#BSUB -M %s" % memory_string)535        if scheduler_cores > 36:536            scheduler_cores = 36537            print("Worker cores specification for LSF higher than available, initializing it to %s" % scheduler_cores)538        sched_script_lines.append("#BSUB -n %s" % scheduler_cores)539        if scheduler_cores > 1:540            sched_script_lines.append('#BSUB -R "span[hosts=1]"')541        if walltime is not None:542            sched_script_lines.append("#BSUB -W %s" % walltime)543        if job_extra is not None:544            sched_script_lines.extend(["#BSUB %s" % arg for arg in job_extra])545        """546        # Zeus specific lines547        sched_script_lines.append("")548        sched_script_lines.append("module load anaconda/3.7")549        sched_script_lines.append("source activate %s" % conda_env)550        if env_extra is not None:551            sched_script_lines.extend(["%s" % arg for arg in env_extra])552        # Executable lines553        sched_exec = "%s -m distributed.cli.dask_scheduler" % python554        sched_exec += (555            " --port 0 --dashboard-address 0 --scheduler-file %s/connection"556            " --idle-timeout 3600"557            % local_directory558        )559        sched_exec += " --interface ens2f1"560        sched_exec += " >> %s/scheduler_%s.log 2>&1 &" % (log_directory, name)561        sched_script_lines.append(sched_exec)562        sched_script = "\n".join(sched_script_lines)563        return sched_script564    def create_worker_script(565        shebang,566        name,567        log_directory,568        project,569        worker_queue,570        worker_memory,571        worker_cores,572        walltime,573        job_extra,574        env_extra,575        interface,576        processes,577        death_timeout,578        sched_ip,579    ):580        if log_directory[0:1] == "~":581            log_directory = log_directory.replace("~", home)582        worker_script_lines = []583        worker_script_lines.append("%s" % shebang)584        worker_script_lines.append("")585        worker_script_lines.append("#BSUB -J dask_worker_%s" % name)586        worker_script_lines.append(587            "#BSUB -e %s/worker_%s-%%J.err" % (log_directory, name)588        )589        worker_script_lines.append(590            "#BSUB -o %s/worker_%s-%%J.out" % (log_directory, name)591        )592        worker_script_lines.append("#BSUB -q %s" % worker_queue)593        worker_script_lines.append("#BSUB -P %s" % project)594        memory_string = lsf_format_bytes_ceil(worker_memory)595        worker_script_lines.append("#BSUB -M %s" % memory_string)596        if worker_cores > 36:597            worker_cores = 36598            print(599                "Worker cores specification for LSF higher than available, "600                "initializing it to %s" % worker_cores601            )602        worker_script_lines.append("#BSUB -n %s" % worker_cores)603        if worker_cores > 1:604            worker_script_lines.append('#BSUB -R "span[hosts=1]"')605        if walltime is not None:606            worker_script_lines.append("#BSUB -W %s" % walltime)607        if job_extra is not None:608            worker_script_lines.extend(["#BSUB %s" % arg for arg in job_extra])609        # Python env specific lines610        worker_script_lines.append("")611        worker_script_lines.append("module load anaconda/3.7")612        worker_script_lines.append("source activate %s" % conda_env)613        if env_extra is not None:614            worker_script_lines.extend(["%s" % arg for arg in env_extra])615        # Executable lines616        worker_exec = "%s -m distributed.cli.dask_worker %s" % (617            python,618            sched_ip,619        )620        worker_exec += " --local-directory %s" % local_directory621        worker_exec += " --interface %s" % interface622        # Detect memory, processes and threads per each worker623        if processes is None:624            processes = max(math.floor(math.sqrt(worker_cores)), 1)625        threads = max(math.floor(float(worker_cores) / processes), 1)626        mem = float(memory_string) / processes627        worker_exec += (628            " --nthreads %i --nprocs %i --memory-limit %.2fMB --name 0 --nanny"629            " --death-timeout %i" % (threads, processes, mem, death_timeout)630        )631        worker_script_lines.append(worker_exec)632        worker_script = "\n".join(worker_script_lines)633        return worker_script634    def delete_tmp_files(local_path, remote_path):635        local_file = os.path.join(local_path, "scheduler.sh")636        if os.path.exists(local_file):637            os.remove(local_file)638        local_file = os.path.join(local_path, "worker.lsf")639        if os.path.exists(local_file):640            os.remove(local_file)641        local_file = os.path.join(local_path, "connection")642        if os.path.exists(local_file):643            os.remove(local_file)644        _remote_cmd("rm %s{%s,%s}" % (tmp_path, "scheduler.sh", "worker.lsf"))645        return None646    def run_scheduler(local_path, remote_path, local_directory, sched_script):647        timeout = 20648        local_file = os.path.join(local_path, "scheduler.sh")649        remote_file = os.path.join(remote_path, "scheduler.sh")650        with open(local_file, "w") as sched_file:651            sched_file.write(sched_script)652        if _remote_scp(local_file, remote_file, "put"):653            print("Error while copying scripts to Zeus")654            return None655        if _remote_cmd("/bin/bash %s" % remote_file):656            print("Something went wrong while executing Dask scheduler script")657            delete_tmp_files(local_path, remote_path)658            stop_dask()659            return None660        # Check connection file availablility661        i = 0662        ret = -1663        while i < timeout:664            time.sleep(1)665            ret = _remote_cmd("ls %s/connection" % local_directory)666            if ret == 0:667                break668            i += 1669        if ret != 0:670            print("Unable to retrieve Dask scheduler address")671            delete_tmp_files(local_path, remote_path)672            stop_dask()673            return None674        local_file = os.path.join(local_path, "connection")675        remote_file = "%s/connection" % local_directory676        if _remote_scp(remote_file, local_file, "get"):677            print("Error while copying files from Zeus")678            delete_tmp_files(local_path, remote_path)679            stop_dask()680            return None681        # Read connection info682        import json683        sched_address = None684        with open(local_file) as f:685            data = json.load(f)686            if "address" in data:687                sched_address = data["address"]688        if sched_address is None:689            print(690                "Something went wrong while retreiving Dask scheduler address"691            )692            delete_tmp_files(local_path, remote_path)693            stop_dask()694            return None695        return sched_address696    def run_workers(local_path, remote_path, n_workers, worker_script):697        local_file = os.path.join(local_path, "worker.lsf")698        remote_file = os.path.join(remote_path, "worker.lsf")699        with open(local_file, "w") as worker_file:700            worker_file.write(worker_script)701        if _remote_scp(local_file, remote_file, "put"):702            print("Error while copying scripts to Zeus")703            delete_tmp_files(local_path, remote_path)704            stop_dask()705            return None706        if n_workers < 1:707            n_workers = 1708        # Run worker scripts709        job_array = []710        job_num = 0711        for i in range(0, n_workers):712            if job_num > 0:713                if _remote_cmd(714                    "sed -i 's/--name %i/--name %i/g' %s"715                    % (job_num - 1, job_num, remote_file)...remote.py
Source:remote.py  
...194        time.sleep(2)195    # Timeout expired; try one more time but don't catch exceptions196    return remote_login(client, host, port, username, password, prompt,197                        linesep, log_filename, internal_timeout)198def _remote_scp(session, password_list, transfer_timeout=600, login_timeout=20):199    """200    Transfer file(s) to a remote host (guest) using SCP.  Wait for questions201    and provide answers.  If login_timeout expires while waiting for output202    from the child (e.g. a password prompt), fail.  If transfer_timeout expires203    while waiting for the transfer to complete, fail.204    @brief: Transfer files using SCP, given a command line.205    @param session: An Expect or ShellSession instance to operate on206    @param password_list: Password list to send in reply to the password prompt207    @param transfer_timeout: The time duration (in seconds) to wait for the208            transfer to complete.209    @param login_timeout: The maximal time duration (in seconds) to wait for210            each step of the login procedure (i.e. the "Are you sure" prompt or211            the password prompt)212    @raise SCPAuthenticationError: If authentication fails213    @raise SCPTransferTimeoutError: If the transfer fails to complete in time214    @raise SCPTransferFailedError: If the process terminates with a nonzero215            exit code216    @raise SCPError: If some other error occurs217    """218    password_prompt_count = 0219    timeout = login_timeout220    authentication_done = False221    scp_type = len(password_list)222    while True:223        try:224            match, text = session.read_until_last_line_matches(225                [r"[Aa]re you sure", r"[Pp]assword:\s*$", r"lost connection"],226                timeout=timeout, internal_timeout=0.5)227            if match == 0:  # "Are you sure you want to continue connecting"228                logging.debug("Got 'Are you sure...', sending 'yes'")229                session.sendline("yes")230                continue231            elif match == 1:  # "password:"232                if password_prompt_count == 0:233                    logging.debug("Got password prompt, sending '%s'" %234                                   password_list[password_prompt_count])235                    session.sendline(password_list[password_prompt_count])236                    password_prompt_count += 1237                    timeout = transfer_timeout238                    if scp_type == 1:239                        authentication_done = True240                    continue241                elif password_prompt_count == 1 and scp_type == 2:242                    logging.debug("Got password prompt, sending '%s'" %243                                   password_list[password_prompt_count])244                    session.sendline(password_list[password_prompt_count])245                    password_prompt_count += 1246                    timeout = transfer_timeout247                    authentication_done = True248                    continue249                else:250                    raise SCPAuthenticationError("Got password prompt twice",251                                                 text)252            elif match == 2:  # "lost connection"253                raise SCPError("SCP client said 'lost connection'", text)254        except aexpect.ExpectTimeoutError, e:255            if authentication_done:256                raise SCPTransferTimeoutError(e.output)257            else:258                raise SCPAuthenticationTimeoutError(e.output)259        except aexpect.ExpectProcessTerminatedError, e:260            if e.status == 0:261                logging.debug("SCP process terminated with status 0")262                break263            else:264                raise SCPTransferFailedError(e.status, e.output)265def remote_scp(command, password_list, log_filename=None, transfer_timeout=600,266               login_timeout=20):267    """268    Transfer file(s) to a remote host (guest) using SCP.269    @brief: Transfer files using SCP, given a command line.270    @param command: The command to execute271        (e.g. "scp -r foobar root@localhost:/tmp/").272    @param password_list: Password list to send in reply to a password prompt.273    @param log_filename: If specified, log all output to this file274    @param transfer_timeout: The time duration (in seconds) to wait for the275            transfer to complete.276    @param login_timeout: The maximal time duration (in seconds) to wait for277            each step of the login procedure (i.e. the "Are you sure" prompt278            or the password prompt)279    @raise: Whatever _remote_scp() raises280    """281    logging.debug("Trying to SCP with command '%s', timeout %ss",282                  command, transfer_timeout)283    if log_filename:284        output_func = utils_misc.log_line285        output_params = (log_filename,)286    else:287        output_func = None288        output_params = ()289    session = aexpect.Expect(command,290                                    output_func=output_func,291                                    output_params=output_params)292    try:293        _remote_scp(session, password_list, transfer_timeout, login_timeout)294    finally:295        session.close()296def scp_to_remote(host, port, username, password, local_path, remote_path,297                  limit="", log_filename=None, timeout=600):298    """299    Copy files to a remote host (guest) through scp.300    @param host: Hostname or IP address301    @param username: Username (if required)302    @param password: Password (if required)303    @param local_path: Path on the local machine where we are copying from304    @param remote_path: Path on the remote machine where we are copying to305    @param limit: Speed limit of file transfer.306    @param log_filename: If specified, log all output to this file307    @param timeout: The time duration (in seconds) to wait for the transfer...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
