How to use the _max_processes attribute in autotest

Best Python code snippets using autotest_python
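All three snippets below follow the same pattern: accept a worker count, store it under the private name _max_processes, and hand it to whatever pool actually does the work. Here is a minimal, hypothetical sketch of that pattern; the Downloader class and its names are illustrative, not part of autotest's API.

# Hypothetical sketch of the _max_processes pattern; Downloader is illustrative.
from multiprocessing import Pool

class Downloader:
    def __init__(self, worker_count=4):
        # The worker cap is fixed once at construction time.
        self._max_processes = worker_count

    def run(self, tasks):
        # Every pool this object creates honors the same cap.
        with Pool(self._max_processes) as p:
            return p.map(len, tasks)  # placeholder work: length of each item

if __name__ == "__main__":
    print(Downloader(worker_count=2).run(["a", "bb", "ccc"]))  # -> [1, 2, 3]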

taar_amodump.py

Source: taar_amodump.py (GitHub)


#!/bin/env python
import click
import json
import logging
import logging.config
import typing

from six.moves import urllib, queue
from six import text_type
from .taar_utils import store_json_to_s3

import requests
from requests_toolbelt.threaded import pool

AMO_DUMP_BUCKET = "telemetry-parquet"
AMO_DUMP_PREFIX = "telemetry-ml/addon_recommender/"
AMO_DUMP_FILENAME = "extended_addons_database"

DEFAULT_AMO_REQUEST_URI = "https://addons.mozilla.org/api/v3/addons/search/"
QUERY_PARAMS = "?app=firefox&sort=created&type=extension"

logger = logging.getLogger("amo_database")

"""
JSON from the addons.mozilla.org server is parsed using subclasses of
JSONSchema to declare the types that we want to extract.

The top level object that we are parsing is `AMOAddonInfo`.

All classes of type JSONSchema have a `meta` dictionary attribute
which defines the keys we are interested in extracting.

The key is the name of the attribute which we are interested in
retrieving.

The value defines how we want to coerce the inbound data. There are 3
general cases:
    1) subclasses of JSONSchema are nested objects which are
       represented as dictionaries
    2) List<T> or Dict<T,T> types where values are coerced
       recursively using the marshal function.
    3) Everything else. These are callable type definitions. Usually
       Python built-ins like str or bool. It is possible to define
       custom callables if you want to do custom data conversion.
"""

class JSONSchema:
    pass

class AMOAddonFile(JSONSchema):
    meta = {"id": int, "platform": str, "status": str, "is_webextension": bool}

class AMOAddonVersion(JSONSchema):
    meta = {"files": typing.List[AMOAddonFile]}

class AMOAddonInfo(JSONSchema):
    meta = {
        "guid": str,
        "categories": typing.Dict[str, typing.List[str]],
        "default_locale": str,
        "description": typing.Dict[str, str],
        "name": typing.Dict[str, str],
        "current_version": AMOAddonVersion,
        "ratings": typing.Dict[str, float],
        "summary": typing.Dict[str, str],
        "tags": typing.List[str],
        "weekly_downloads": int,
    }

class AMODatabase:
    def __init__(self, worker_count):
        """
        Just setup the page_count
        """
        self._max_processes = worker_count
        uri = DEFAULT_AMO_REQUEST_URI + QUERY_PARAMS
        response = requests.get(uri)
        jdata = json.loads(response.content.decode("utf8"))
        self._page_count = jdata["page_count"]

    def fetch_addons(self):
        addon_map = self._fetch_pages()
        self._fetch_versions(addon_map)
        final_result = {}
        for k, v in list(addon_map.items()):
            if "first_create_date" in v:
                final_result[k] = v
        logger.info("Final addon set includes %d addons." % len(final_result))
        return final_result

    def _fetch_pages(self):
        addon_map = {}
        urls = []
        for i in range(1, self._page_count + 1):
            url = "{0}{1}&page={2}".format(DEFAULT_AMO_REQUEST_URI, QUERY_PARAMS, i)
            urls.append(url)
        logger.info("Processing AMO urls")
        p = pool.Pool.from_urls(urls, num_processes=self._max_processes)
        p.join_all()
        self._handle_responses(p, addon_map)
        # Try failed requests
        exceptions = p.exceptions()
        p = pool.Pool.from_exceptions(exceptions, num_processes=self._max_processes)
        p.join_all()
        self._handle_responses(p, addon_map)
        return addon_map

    def _fetch_versions(self, addon_map):
        logger.info("Processing Version urls")
        q = queue.Queue()
        logger.info("Filling initial version page queue")

        def iterFactory(guid_map):
            for guid in list(guid_map.keys()):
                yield "https://addons.mozilla.org/api/v3/addons/addon/%s/versions/" % guid

        def chunker(seq, size):
            collector = []
            for term in seq:
                collector.append(term)
                if len(collector) == size:
                    yield collector
                    collector = []
            # Yield any dangling records we collected
            if len(collector) > 0:
                yield collector

        total_processed_addons = 0
        for chunk in chunker(iterFactory(addon_map), 500):
            for i, url in enumerate(chunk):
                q.put({"method": "GET", "url": url, "timeout": 2.0})
            logger.info("Queue setup - processing initial version page requests")
            logger.info("%d requests to process" % q.qsize())
            p = pool.Pool(q, num_processes=self._max_processes)
            p.join_all()
            logger.info("Pool completed - processing responses")
            last_page_urls = self._handle_version_responses(p)
            logger.info("Captured %d last page urls" % len(last_page_urls))
            total_processed_addons += len(last_page_urls)
            # Try processing the exceptions once
            p = pool.Pool.from_exceptions(
                p.exceptions(), num_processes=self._max_processes
            )
            p.join_all()
            last_page_urls.extend(self._handle_version_responses(p))
            # Now fetch the last version of each addon
            logger.info("Processing last page urls: %d" % len(last_page_urls))
            p = pool.Pool.from_urls(last_page_urls, num_processes=self._max_processes)
            p.join_all()
            self._handle_last_version_responses(p, addon_map)
            # Try processing exceptions once
            p = pool.Pool.from_exceptions(
                p.exceptions(), num_processes=self._max_processes
            )
            p.join_all()
            self._handle_last_version_responses(p, addon_map)
        logger.info(
            "Processed %d addons with version info" % total_processed_addons
        )

    def _handle_last_version_responses(self, p, addon_map):
        for i, resp in enumerate(p.responses()):
            try:
                if resp.status_code == 200:
                    jdata = json.loads(resp.content.decode("utf8"))
                    results = jdata["results"]
                    raw_guid = resp.url.split("addon/")[1].split("/versions")[0]
                    guid = urllib.parse.unquote(raw_guid)
                    create_date = results[-1]["files"][0]["created"]
                    record = addon_map.get(guid, None)
                    if record is not None:
                        record["first_create_date"] = create_date
            except Exception as e:
                # Skip this record
                logger.error(e)
        return addon_map

    def _handle_responses(self, p, addon_map):
        i = 0
        for resp in p.responses():
            try:
                if resp.status_code == 200:
                    jdata = json.loads(resp.content.decode("utf8"))
                    results = jdata["results"]
                    for record in results:
                        if i % 500 == 0:
                            logger.info("Still parsing addons...")
                        guid = record["guid"]
                        addon_map[guid] = record
                        i += 1
            except Exception as e:
                # Skip this record
                logger.error(e)

    def _handle_version_responses(self, p):
        page_urls = []
        for i, resp in enumerate(p.responses()):
            try:
                if resp.status_code == 200:
                    jdata = json.loads(resp.content.decode("utf8"))
                    page_count = int(jdata["page_count"])
                    if page_count > 1:
                        url = resp.url + "?page=%d" % page_count
                    else:
                        url = resp.url
                    page_urls.append(url)
            except Exception as e:
                # Skip this record
                logger.error(e)
        return page_urls

class Undefined:
    """
    This value is used to disambiguate None vs a non-existent value on
    dict.get() lookups
    """
    pass

def marshal(value, name, type_def):
    serializers = {
        typing.List: list,
        typing.Dict: dict,
        str: text_type,
        text_type: text_type,
        int: int,
        float: float,
        bool: bool,
    }
    if issubclass(type_def, JSONSchema):
        obj = {}
        for attr_name, attr_type_def in list(type_def.meta.items()):
            attr_value = value.get(attr_name, Undefined)
            if attr_value is not Undefined:
                # Try marshalling the value
                obj[attr_name] = marshal(attr_value, attr_name, attr_type_def)
        return obj
    elif issubclass(type_def, typing.Container) and type_def not in [str, bytes]:
        if issubclass(type_def, typing.List):
            item_type = type_def.__args__[0]
            return [marshal(j, name, item_type) for j in value]
        elif issubclass(type_def, typing.Dict):
            if value is None:
                return None
            k_cast, v_cast = type_def.__args__
            dict_vals = [
                (marshal(k, name, k_cast), marshal(v, name, v_cast))
                for k, v in list(value.items())
            ]
            return dict(dict_vals)
    else:
        return serializers[type_def](value)

@click.command()
@click.option("--date", required=True)
@click.option("--workers", default=100)
@click.option("--s3-prefix", default=AMO_DUMP_PREFIX)
@click.option("--s3-bucket", default=AMO_DUMP_BUCKET)
def main(date, workers, s3_prefix, s3_bucket):
    amodb = AMODatabase(int(workers))
    addon_map = amodb.fetch_addons()
    try:
        store_json_to_s3(
            json.dumps(addon_map), AMO_DUMP_FILENAME, date, s3_prefix, s3_bucket
        )
        logger.info(
            "Completed uploading s3://%s/%s%s.json"
            % (s3_bucket, s3_prefix, AMO_DUMP_FILENAME)
        )
    except Exception:
        logger.exception("Error uploading data to S3")

if __name__ == "__main__":
    main()
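In this script, the --workers CLI option becomes AMODatabase._max_processes, which in turn becomes the num_processes argument of every requests_toolbelt pool. The sketch below isolates that pattern; it assumes requests_toolbelt is installed, uses example URLs, mirrors the response-handling seen above, and hard-codes 4 in place of self._max_processes. Note that despite the argument name, the pool in requests_toolbelt's threaded module runs threads, not processes.

# Minimal sketch of the requests_toolbelt pool pattern used above.
from requests_toolbelt.threaded import pool

urls = [
    "https://addons.mozilla.org/api/v3/addons/search/?page=%d" % i
    for i in range(1, 4)
]
p = pool.Pool.from_urls(urls, num_processes=4)  # plays the role of _max_processes
p.join_all()  # block until all requests complete
for resp in p.responses():
    print(resp.status_code, resp.url)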


metadata_art_gen.py

Source: metadata_art_gen.py (GitHub)


#!/usr/bin/env python3
import json
import copy
from PIL import Image, ImageDraw, ImageFont
from multiprocessing import Pool, Lock
import os
import time

asset_path = "input/assets"
bg_path = ""  # path to the base/background image (left empty in the original)
fg_path = ""
img_out_path = f"./output/images"
data_out_path = f"./output/metadata"
start_id = 1
total_supply = 10000
lock = None

# Multiprocess implementation
def startCreatingMulti(_max_processes=None):
    if not _max_processes:
        _max_processes = os.cpu_count()
    max_processes = _max_processes
    global lock
    lock = Lock()
    print('##################')
    print('# Generative Art')
    print('# - Create your NFT collection')
    print('###########')
    print()
    print('start creating NFTs.')
    ids = range(start_id, total_supply + start_id)
    # Start processes
    with Pool(max_processes) as pool:
        pool.map(creator, ids)

def getMetadata(id):
    metadata_path = "input/metadata"
    with open(f"{metadata_path}/{id}.json") as f:
        data = json.load(f)
    return data

# Worker process function
def creator(id):
    print('-----------------')
    print(f'creating NFT {id}')
    bg_source = Image.open(bg_path)
    metadata = getMetadata(id)
    attributes = metadata['attributes']
    attributesMetadata = [a for a in attributes if 'Background' not in a['trait_type']]
    new_data = [{
        'new_data': f'new hello'
    }]
    tempMetadata = {
        'dna': metadata['dna'],
        'name': f"new {metadata['name']}",
        'description': f"new {metadata['name']}",
        'image': f"{metadata['image']}",
        'edition': f"{metadata['edition']}",
        'attributes': new_data + attributesMetadata
    }
    bg = copy.copy(bg_source)
    for data in attributes:
        trait = data['trait_type']
        value = ''.join(data['value'].split())
        # Skip any background traits
        if "Background" in trait:
            continue

        layer = Image.open(f"{asset_path}/{trait}/{value}.png")
        bg.paste(
            layer,
            (0, 0),
            layer
        )
    # fg = Image.open(f"{fg_path}")
    # bg.paste(
    #     fg,
    #     (0, 0),
    #     fg
    # )
    with lock:
        bg.save(f"{img_out_path}/{id}.png")
        with open(f"{data_out_path}/{id}.json", 'w') as f:
            f.write(json.dumps(tempMetadata))

def timer(f):
    start = time.time()
    f()
    end = time.time()
    return end - start

# Initiate code
# startCreating()
doneFlag = "done.flag"
if os.path.exists(doneFlag):
    os.remove(doneFlag)
delta = timer(startCreatingMulti)
with open(doneFlag, 'w') as f:
    f.write(f"Done in {delta} seconds")
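Here _max_processes is an optional keyword argument rather than an attribute: when the caller passes nothing, it falls back to os.cpu_count(), one worker per core. The runnable sketch below isolates just that defaulting idiom; demo_work is a placeholder task, not part of the script above.

# Sketch of the _max_processes defaulting idiom used by startCreatingMulti.
import os
from multiprocessing import Pool

def demo_work(n):
    return n * n

def run(_max_processes=None):
    if not _max_processes:
        _max_processes = os.cpu_count()  # default: one worker per CPU core
    with Pool(_max_processes) as p:
        return p.map(demo_work, range(8))

if __name__ == "__main__":
    print(run())                   # pool sized to os.cpu_count()
    print(run(_max_processes=2))   # explicit cap of two workers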


art_preview.py

Source: art_preview.py (GitHub)


#!/usr/bin/env python3
import copy
from PIL import Image, ImageDraw, ImageFont
from multiprocessing import Pool, Lock
import os
import time
import random

asset_path = "input/assets"
bg_path = "input/Base/Base.png"
fg_path = ""
img_out_path = f"./output/images"
start_id = 1
total_supply = 25
lock = None
assets = {}

# Multiprocess implementation
def startCreatingMulti(_max_processes=None):
    if not _max_processes:
        _max_processes = os.cpu_count()
    max_processes = _max_processes
    global lock
    lock = Lock()
    print('start creating NFTs.')
    for asset in os.listdir(asset_path):
        assets[asset] = [trait.split('.')[0] for trait in os.listdir(f"{asset_path}/{asset}")]
    # Start processes
    with Pool(max_processes) as pool:
        pool.map(creator, assets)

# Worker process function
def creator(testAsset):
    print('-----------------')
    print(f'creating NFT {testAsset}')
    try:
        os.mkdir(f"{img_out_path}/{testAsset}")
    except OSError as error:
        pass
    bg_source = Image.open(bg_path)
    for testValue in assets[testAsset]:
        bg = copy.copy(bg_source)
        for asset in assets:
            value = testValue
            if asset != testAsset:
                rand = len(assets[asset]) - 1
                value = assets[asset][random.randint(0, rand)]
            layer = Image.open(f"{asset_path}/{asset}/{value}.png")
            bg.paste(
                layer,
                (0, 0),
                layer
            )
        with lock:
            bg.save(f"{img_out_path}/{testAsset}/{testValue}.png")

def timer(f):
    start = time.time()
    f()
    end = time.time()
    return end - start

# Initiate code
doneFlag = "done.flag"
if os.path.exists(doneFlag):
    os.remove(doneFlag)
delta = timer(startCreatingMulti)
with open(doneFlag, 'w') as f:
    f.write(f"Done in {delta} seconds")
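One caveat on both art scripts: they share lock with the workers through a module-level global, which only works with the fork start method (the Linux default). Under spawn (Windows, and the macOS default since Python 3.8) the workers would see lock as None. A more portable variant, sketched below with hypothetical task names, passes the Lock through the Pool initializer instead.

# Hedged sketch: sharing a Lock via the Pool initializer (works under fork and spawn).
from multiprocessing import Pool, Lock

lock = None

def init_worker(shared_lock):
    global lock
    lock = shared_lock  # each worker process keeps its own reference

def task(i):
    with lock:  # serialize the critical section, e.g. file writes
        print(f"worker handling item {i}")

if __name__ == "__main__":
    with Pool(processes=2, initializer=init_worker, initargs=(Lock(),)) as p:
        p.map(task, range(4))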

