Best Python code snippet using autotest_python
aempkg
Source:aempkg  
...173    # state='present'174    # --------------------------------------------------------------------------------175    def present(self):176        if not self.pkg_uploaded or self.force == 'yes':177            self.upload_pkg()178        if not self.pkg_installed or self.force == 'yes':179            self.install_pkg()180    # --------------------------------------------------------------------------------181    # state='absent'182    # --------------------------------------------------------------------------------183    def absent(self):184        if self.pkg_installed or self.force == 'yes':185            self.uninstall_pkg()186        if self.pkg_uploaded or self.force == 'yes':187            self.delete_pkg()188    # --------------------------------------------------------------------------------189    # state='uploaded'190    # --------------------------------------------------------------------------------191    def uploaded(self):192        if not self.pkg_uploaded:193            self.upload_pkg()194    # --------------------------------------------------------------------------------195    # state='uninstalled'196    # --------------------------------------------------------------------------------197    def uninstalled(self):198        if self.pkg_installed:199            self.uninstall_pkg()200    # --------------------------------------------------------------------------------201    # Upload a package.202    #203    # NOTE: this is ugly and uses curl to upload the package until I can figure204    #       out the Python code to do it.205    #206    # WARNING: This leaves the admin_username and admin_password exposed to a ps command.207    #208    # TODO: re-implement this (perhaps using code from209    #       http://code.activestate.com/recipes/578846-composing-a-postable-http-request-with-multipartfo/)210    # --------------------------------------------------------------------------------211    def upload_pkg(self):212        if not self.path and not self.url:213            self.module.fail_json(msg='Missing required argument: path or url')214        start_time = time.time()215        while True:216            if self.url:217                file = '/tmp/' + os.path.basename(self.url)218                if not self.module.check_mode:219                    cmd="%s -s -w '%%{http_code}' -o %s %s" % (self.curl_cmd, file, self.url)220                    (rc, out, err) = self.module.run_command(cmd)221                    if rc != 0 or out != '200':222                        if os.path.isfile(file):223                            os.remove(file)224                        self.module.fail_json(msg='upload_pkg: failed to retrieve package: rc=%s stdout=%s stderr=%s' % (rc, out, err))225                    self.msg.append('package retrieved')...releases.py
Source:releases.py  
1import os2import subprocess3import tempfile4import sys5import click6import rapidjson as json7import github8import tenacity9from .shards import (10    make_repodata_shard,11    get_shard_path,12    shard_exists,13    push_shard,14)15from .metadata import UNDISTRIBUTABLE16from .utils import split_pkg, print_github_api_limits17@tenacity.retry(18    wait=tenacity.wait_random_exponential(multiplier=1, max=10),19    stop=tenacity.stop_after_attempt(5),20    reraise=True,21)22def get_or_make_release(repo, subdir, pkg, repo_pth=None, make_commit=True):23    tag = f"{subdir}/{pkg}"24    try:25        rel = repo.get_release(tag)26    except github.UnknownObjectException:27        repo_sha = make_or_get_commit(28            subdir,29            pkg,30            make_commit=make_commit,31            repo_pth=repo_pth,32        )33        rel = repo.create_git_tag_and_release(34            tag,35            "",36            tag,37            "",38            repo_sha,39            "commit",40        )41    curr_asts = [ast for ast in rel.get_assets()]42    return rel, curr_asts43@tenacity.retry(44    wait=tenacity.wait_random_exponential(multiplier=1, max=10),45    stop=tenacity.stop_after_attempt(5),46    reraise=True,47)48def upload_asset(rel, curr_asts, pth, content_type):49    name = os.path.basename(pth)50    ast = None51    for _ast in curr_asts:52        if _ast.name == name:53            ast = _ast54            break55    print("found asset %s for %s" % (ast, name), flush=True)56    if ast is None:57        ast = rel.upload_asset(pth, content_type=content_type)58        curr_asts.append(ast)59    return ast60@tenacity.retry(61    wait=tenacity.wait_random_exponential(multiplier=1, max=10),62    stop=tenacity.stop_after_attempt(5),63    reraise=True,64)65def make_or_get_commit(subdir, pkg, make_commit=False, repo_pth=None):66    if repo_pth is None:67        repo_pth = "."68    if make_commit:69        subprocess.run(70            f"cd {repo_pth} && git pull --no-edit",71            shell=True,72            check=True,73        )74        subprocess.run(75            f"cd {repo_pth} && git commit --allow-empty -m "76            f"'{subdir}/{pkg} [ci skip] [cf admin skip] ***NO_CI***'",77            shell=True,78            check=True,79        )80    repo_sha = subprocess.run(81        f"cd {repo_pth} && git rev-parse --verify HEAD",82        shell=True,83        capture_output=True,84    ).stdout.decode("utf-8").strip()85    if make_commit:86        subprocess.run(87            f"cd {repo_pth} && git pull --no-edit",88            shell=True,89            check=True,90        )91        subprocess.run(92            f"cd {repo_pth} && git push",93            shell=True,94            check=True,95        )96    return repo_sha97@click.command()98def main():99    """Make a GitHub release of a package and upload the repodata shard.100    This command is meant to be run inside of GitHub actions, triggered on101    repo dispatch events.102    """103    # pull event data104    with open(os.environ["GITHUB_EVENT_PATH"], 'r') as fp:105        event_data = json.load(fp)106    assert event_data["action"] in ["release", "validate"]107    # package info108    subdir = event_data['client_payload']["subdir"]109    pkg = event_data['client_payload']["package"]110    url = event_data['client_payload']["url"]111    label = event_data['client_payload']["label"]112    feedstock = event_data['client_payload']["feedstock"]113    add_shard = event_data['client_payload'].get("add_shard", True)114    md5_val = event_data['client_payload']["md5"]115    print("subdir/package: %s/%s" % (subdir, pkg), flush=True)116    print("url:", url, flush=True)117    print("add shard:", add_shard, flush=True)118    shard_pth = get_shard_path(subdir, pkg)119    shard_pth_exists = shard_exists(shard_pth)120    print("shard exists:", shard_pth_exists, flush=True)121    print("shard path:", shard_pth, flush=True)122    if shard_pth_exists:123        print("shard already exists! not uploading new package!", flush=True)124        sys.exit(0)125    # we are not making github releases anymore126    # if split_pkg(os.path.join(subdir, pkg))[1] in UNDISTRIBUTABLE:127    #     upload_pkg = False128    # else:129    #     upload_pkg = True130    upload_pkg = False131    # repo info132    gh = github.Github(os.environ["GITHUB_TOKEN"])133    print_github_api_limits(gh)134    # make release and upload if shard does not exist135    with tempfile.TemporaryDirectory() as tmpdir:136        shard = make_repodata_shard(137            subdir,138            pkg,139            label,140            feedstock,141            url,142            tmpdir,143            md5_checksum=md5_val,144        )145        if upload_pkg:146            repo = gh.get_repo("conda-forge/releases")147            rel, curr_asts = get_or_make_release(148                repo,149                subdir,150                pkg,151                make_commit=False,152            )153            ast = upload_asset(154                rel,155                curr_asts,156                f"{tmpdir}/{subdir}/{pkg}",157                content_type="application/x-bzip2",158            )159            shard["url"] = ast.browser_download_url160            # we don't upload shards to releases anymore161            # with open(f"{tmpdir}/repodata_shard.json", "w") as fp:162            #     json.dump(shard, fp, sort_keys=True, indent=2)163            # upload_asset(164            #     rel,165            #     curr_asts,166            #     f"{tmpdir}/repodata_shard.json",167            #     content_type="application/json",168            # )169    # push the repodata shard170    if add_shard and not shard_pth_exists:...FlaskServer.py
Source:FlaskServer.py  
...56                return '{"error", "Could not process your request!"}'57            ret = {"id": uid, "response": resp}58            return json.dumps(ret) # '{"id":"%s", "result":"%s"}' % (uid, resp)59        @app.route('/upload_pkg', methods=["POST"])60        def upload_pkg():61            self.updater.loadNewGit(request.files["update"])62            return '{"message": "Upload Successful!"}'63        @app.route('/upload_vers', methods=["POST"])64        def upload_vers():65            self.updater.loadNewVersions(request.files["versions"])66            return '{"message": "Upload Successful!"}'67        @app.route('/upload_full', methods=["POST"])68        def upload_full():69            print request.files70            self.updater.loadNewVersions(request.files["versions"])71            self.updater.loadNewGit(request.files["update"])72            return '{"message": "Upload Successful!"}'73        return app74    def test(self, params=None):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
