How to use the upload_pkg method in autotest

Best Python code snippet using autotest_python

aempkg

Source:aempkg Github

copy

Full Screen

...173 # state='present'174 # --------------------------------------------------------------------------------175 def present(self):176 if not self.pkg_uploaded or self.force == 'yes':177 self.upload_pkg()178 if not self.pkg_installed or self.force == 'yes':179 self.install_pkg()180 # --------------------------------------------------------------------------------181 # state='absent'182 # --------------------------------------------------------------------------------183 def absent(self):184 if self.pkg_installed or self.force == 'yes':185 self.uninstall_pkg()186 if self.pkg_uploaded or self.force == 'yes':187 self.delete_pkg()188 # --------------------------------------------------------------------------------189 # state='uploaded'190 # --------------------------------------------------------------------------------191 def uploaded(self):192 if not self.pkg_uploaded:193 self.upload_pkg()194 # --------------------------------------------------------------------------------195 # state='uninstalled'196 # --------------------------------------------------------------------------------197 def uninstalled(self):198 if self.pkg_installed:199 self.uninstall_pkg()200 # --------------------------------------------------------------------------------201 # Upload a package.202 #203 # NOTE: this is ugly and uses curl to upload the package until I can figure204 # out the Python code to do it.205 #206 # WARNING: This leaves the admin_username and admin_password exposed to a ps command.207 #208 # TODO: re-implement this (perhaps using code from209 # http://code.activestate.com/recipes/578846-composing-a-postable-http-request-with-multipartfo/)210 # --------------------------------------------------------------------------------211 def upload_pkg(self):212 if not self.path and not self.url:213 self.module.fail_json(msg='Missing required argument: path or url')214 start_time = time.time()215 while True:216 if self.url:217 file = '/tmp/' + os.path.basename(self.url)218 if not 
self.module.check_mode:219 cmd="%s -s -w '%%{http_code}' -o %s %s" % (self.curl_cmd, file, self.url)220 (rc, out, err) = self.module.run_command(cmd)221 if rc != 0 or out != '200':222 if os.path.isfile(file):223 os.remove(file)224 self.module.fail_json(msg='upload_pkg: failed to retrieve package: rc=%s stdout=%s stderr=%s' % (rc, out, err))225 self.msg.append('package retrieved')...

Full Screen

Full Screen

releases.py

Source:releases.py Github

copy

Full Screen

1import os2import subprocess3import tempfile4import sys5import click6import rapidjson as json7import github8import tenacity9from .shards import (10 make_repodata_shard,11 get_shard_path,12 shard_exists,13 push_shard,14)15from .metadata import UNDISTRIBUTABLE16from .utils import split_pkg, print_github_api_limits17@tenacity.retry(18 wait=tenacity.wait_random_exponential(multiplier=1, max=10),19 stop=tenacity.stop_after_attempt(5),20 reraise=True,21)22def get_or_make_release(repo, subdir, pkg, repo_pth=None, make_commit=True):23 tag = f"{subdir}/{pkg}"24 try:25 rel = repo.get_release(tag)26 except github.UnknownObjectException:27 repo_sha = make_or_get_commit(28 subdir,29 pkg,30 make_commit=make_commit,31 repo_pth=repo_pth,32 )33 rel = repo.create_git_tag_and_release(34 tag,35 "",36 tag,37 "",38 repo_sha,39 "commit",40 )41 curr_asts = [ast for ast in rel.get_assets()]42 return rel, curr_asts43@tenacity.retry(44 wait=tenacity.wait_random_exponential(multiplier=1, max=10),45 stop=tenacity.stop_after_attempt(5),46 reraise=True,47)48def upload_asset(rel, curr_asts, pth, content_type):49 name = os.path.basename(pth)50 ast = None51 for _ast in curr_asts:52 if _ast.name == name:53 ast = _ast54 break55 print("found asset %s for %s" % (ast, name), flush=True)56 if ast is None:57 ast = rel.upload_asset(pth, content_type=content_type)58 curr_asts.append(ast)59 return ast60@tenacity.retry(61 wait=tenacity.wait_random_exponential(multiplier=1, max=10),62 stop=tenacity.stop_after_attempt(5),63 reraise=True,64)65def make_or_get_commit(subdir, pkg, make_commit=False, repo_pth=None):66 if repo_pth is None:67 repo_pth = "."68 if make_commit:69 subprocess.run(70 f"cd {repo_pth} && git pull --no-edit",71 shell=True,72 check=True,73 )74 subprocess.run(75 f"cd {repo_pth} && git commit --allow-empty -m "76 f"'{subdir}/{pkg} [ci skip] [cf admin skip] ***NO_CI***'",77 shell=True,78 check=True,79 )80 repo_sha = subprocess.run(81 f"cd {repo_pth} && git rev-parse --verify HEAD",82 shell=True,83 
capture_output=True,84 ).stdout.decode("utf-8").strip()85 if make_commit:86 subprocess.run(87 f"cd {repo_pth} && git pull --no-edit",88 shell=True,89 check=True,90 )91 subprocess.run(92 f"cd {repo_pth} && git push",93 shell=True,94 check=True,95 )96 return repo_sha97@click.command()98def main():99 """Make a GitHub release of a package and upload the repodata shard.100 This command is meant to be run inside of GitHub actions, triggered on101 repo dispatch events.102 """103 # pull event data104 with open(os.environ["GITHUB_EVENT_PATH"], 'r') as fp:105 event_data = json.load(fp)106 assert event_data["action"] in ["release", "validate"]107 # package info108 subdir = event_data['client_payload']["subdir"]109 pkg = event_data['client_payload']["package"]110 url = event_data['client_payload']["url"]111 label = event_data['client_payload']["label"]112 feedstock = event_data['client_payload']["feedstock"]113 add_shard = event_data['client_payload'].get("add_shard", True)114 md5_val = event_data['client_payload']["md5"]115 print("subdir/package: %s/%s" % (subdir, pkg), flush=True)116 print("url:", url, flush=True)117 print("add shard:", add_shard, flush=True)118 shard_pth = get_shard_path(subdir, pkg)119 shard_pth_exists = shard_exists(shard_pth)120 print("shard exists:", shard_pth_exists, flush=True)121 print("shard path:", shard_pth, flush=True)122 if shard_pth_exists:123 print("shard already exists! 
not uploading new package!", flush=True)124 sys.exit(0)125 # we are not making github releases anymore126 # if split_pkg(os.path.join(subdir, pkg))[1] in UNDISTRIBUTABLE:127 # upload_pkg = False128 # else:129 # upload_pkg = True130 upload_pkg = False131 # repo info132 gh = github.Github(os.environ["GITHUB_TOKEN"])133 print_github_api_limits(gh)134 # make release and upload if shard does not exist135 with tempfile.TemporaryDirectory() as tmpdir:136 shard = make_repodata_shard(137 subdir,138 pkg,139 label,140 feedstock,141 url,142 tmpdir,143 md5_checksum=md5_val,144 )145 if upload_pkg:146 repo = gh.get_repo("conda-forge/releases")147 rel, curr_asts = get_or_make_release(148 repo,149 subdir,150 pkg,151 make_commit=False,152 )153 ast = upload_asset(154 rel,155 curr_asts,156 f"{tmpdir}/{subdir}/{pkg}",157 content_type="application/x-bzip2",158 )159 shard["url"] = ast.browser_download_url160 # we don't upload shards to releases anymore161 # with open(f"{tmpdir}/repodata_shard.json", "w") as fp:162 # json.dump(shard, fp, sort_keys=True, indent=2)163 # upload_asset(164 # rel,165 # curr_asts,166 # f"{tmpdir}/repodata_shard.json",167 # content_type="application/json",168 # )169 # push the repodata shard170 if add_shard and not shard_pth_exists:...

Full Screen

Full Screen

FlaskServer.py

Source:FlaskServer.py Github

copy

Full Screen

...56 return '{"error", "Could not process your request!"}'57 ret = {"id": uid, "response": resp}58 return json.dumps(ret) # '{"id":"%s", "result":"%s"}' % (uid, resp)59 @app.route('/upload_pkg', methods=["POST"])60 def upload_pkg():61 self.updater.loadNewGit(request.files["update"])62 return '{"message": "Upload Successful!"}'63 @app.route('/upload_vers', methods=["POST"])64 def upload_vers():65 self.updater.loadNewVersions(request.files["versions"])66 return '{"message": "Upload Successful!"}'67 @app.route('/upload_full', methods=["POST"])68 def upload_full():69 print request.files70 self.updater.loadNewVersions(request.files["versions"])71 self.updater.loadNewGit(request.files["update"])72 return '{"message": "Upload Successful!"}'73 return app74 def test(self, params=None):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful