How to use serialize_error method in Playwright Python

Best Python code snippet using playwright-python

main.py

Source:main.py Github

copy

Full Screen

...10ALLOWED_EXTENSIONS = set(['csv', 'xlsx'])11GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"12def serialize_response(data):13 return json.dumps({'rows': data}, indent=2, ensure_ascii=False)14def serialize_error(message):15 return json.dumps({'error': message})16def read_functions(extension):17 dic = {18 'csv': pd.read_csv,19 'xlsx': pd.read_excel20 }21 return dic[extension]22def get_google_results(address):23 """24 Get geocode results from Google Maps Geocoding API.25 26 Note, that in the case of multiple google geocode reuslts, this function returns details of the FIRST result.27 28 @param address: String address as accurate as possible. For Example "18 Grafton Street, Dublin, Ireland"29 @param api_key: String API key if present from google. 30 If supplied, requests will use your allowance from the Google API. If not, you31 will be limited to the free usage of 2500 requests per day.32 @param return_full_response: Boolean to indicate if you'd like to return the full response from google. This33 is useful if you'd like additional location details for storage or parsing later.34 """35 # Set up your Geocoding url36 logging.info("[GOOGLE URL]: init")37 params = {38 "address":address,39 "key":GEOPY.get('AQUEDUCT_GOOGLE_PLACES_PRIVATE_KEY')40 }41 42 # Ping google for the reuslts:43 try:44 with requests.Session() as s:45 s.mount('https://',HTTPAdapter(max_retries=Retry(2, backoff_factor=0.001)))46 r = s.get(url=GEOCODE_URL, params=params, timeout=5)47 48 if r.status_code == requests.codes.ok:49 # Results will be in JSON format - convert to dict using requests functionality50 results = r.json()51 # if there's no results or an error, return empty results.52 if len(results['results']) == 0:53 output = {54 "matched_address" : None,55 "lat": None,56 "lon": None,57 "match": False58 }59 else: 60 answer = results['results'][0]61 output = {62 "matched_address" : answer.get('formatted_address'),63 "lat": answer.get('geometry').get('location').get('lat'),64 "lon": answer.get('geometry').get('location').get('lng'),65 "match":True66 }67 else:68 logging.error(f"[GEOCODER: Get google place]: {r.text}")69 logging.error(f"[GEOCODER- GOOGLE URL]: {r.status_code}")70 output = {71 "matched_address" : None,72 "lat": None,73 "lon": None,74 "match": False75 }76 77 # Append some other details: 78 output['address'] = address79 output['number_of_results'] = len(results['results'])80 output['status'] = results.get('status')81 82 return output83 except Exception as e:84 raise e85def get_latlonrow(x):86 index, row = x87 logging.info(f"{index}")88 if pd.notna(row['address']) or (row['address'] in ('', ' ')):89 address = get_google_results(row['address'])90 return {**address, **row}91 else:92 return {93 **row,94 "matched_address" : None,95 "lat": None,96 "lon": None,97 "match": False,98 "number_of_results": None,99 "status":"Address value not available"100 }101def geocoding(data):102 try:103 logging.info(f'[GeoCode Service] geocoding init:')104 105 with Pool(processes=16) as p:106 output = p.map_async(get_latlonrow, data.iterrows())107 output.wait()108 109 logging.info('[GeoCode Service] geocoding end')110 return output.get() 111 except Exception as e:112 raise e113def geocoder(request):114 # For more information about CORS and CORS preflight requests, see115 # https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request116 # for more information.117 # Set CORS headers for the main request118 request.get_json()119 120 headers = {121 'Access-Control-Allow-Origin': '*',122 'Content-Type': 'application/json'123 }124 # Set CORS headers for the preflight request125 if request.method == 'OPTIONS':126 # Allows GET requests from any origin with the Content-Type127 # header and caches preflight response for an 3600s128 headers.update({129 'Access-Control-Allow-Origin': '*',130 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',131 'Access-Control-Allow-Headers': 'Content-Type',132 'Access-Control-Max-Age': '3600'133 })134 return ('', 204, headers)135 try:136 logging.info(f'[GeoCode Service] init')137 if request.method == 'POST':138 if 'file' not in request.files:139 return (serialize_error(f'No file provided'), 500, headers)140 141 file = request.files['file']142 extension = file.filename.rsplit('.')[-1].lower()143 if extension in ALLOWED_EXTENSIONS:144 data = read_functions(extension)(request.files.get('file'))145 data.columns = data.columns.str.lower()146 147 logging.info(f'[GeoCode Service] Data loaded: {data.columns}')148 data.rename(columns={'Unnamed: 0': 'row'}, inplace=True)149 150 if 'row' not in data.columns:151 data.insert(loc=0, column='row', value=range(1, 1 + len(data)))152 153 154 data.dropna(axis=1, how='all', inplace=True)155 156 #data.fillna(value=None, method=None, inplace=True)157 logging.info(f'[GeoCode Service] Data loaded; columns cleaned: {data.columns}')158 if 'location_name' not in data.columns:159 data.insert(loc=0, column='location_name', value=range(1, 1 + len(data)))160 logging.info(f'[GeoCode Service] Data loaded; columns cleaned: {data.columns}')161 if 'address' not in data.columns:162 return (serialize_error(f'Address column missing'), 500, headers)163 if len(data) == 0:164 return (serialize_error(f'The file is empty'), 500, headers)165 if len(data) > 500:166 dataSize = len(data)167 return (serialize_error(f'Row number should be less or equal to 500, provided {dataSize} rows'), 500, headers)168 169 return (serialize_response(geocoding(data)), 200, headers)170 else:171 return (serialize_error(f'{extension} is not an allowed file extension'), 500, headers)172 else:173 return (serialize_error(f'request method ({request.method}) not allowed'), 500, headers) 174 except Exception as e:...

Full Screen

Full Screen

Update.py

Source:Update.py Github

copy

Full Screen

...11 from boto3.session import Session12 from botocore.exceptions import ClientError13except ImportError as e:14 module = str(e).split("'")15 response = utility.serialize_error(False, module[1], module[0])16 sys.exit(response)17class Update(object):18 def __init__(self):19 # database init20 self.db = cfg.database["file"]21 self.path = cfg.database["path"]22 self.local_db = os.path.join(self.path, self.db)23 self.access_key = cfg.subscription["access_key"]24 self.secret_key = cfg.subscription["secret_key"]25 self.plan_license = cfg.subscription["plan"]26 def update(self):27 """ callable method - initiate the database update in accordance with plan validity """28 # init authorization29 files = self.authorization()30 try:31 for self.file in files:32 if "update" in self.file:33 self.update_file = self.file34 else:35 self.remote_db = self.file36 if not os.path.isfile(self.local_db):37 print("[+] Deploying new database ...")38 self.download(self.remote_db)39 self.unpack_database()40 else:41 print("[+] Checking update status ...")42 self.download(self.update_file)43 self.check_status(self.update_file)44 except Exception as e:45 response = utility.serialize_error(False, str(e), str(e))46 sys.exit(response)47 def check_status(self, file):48 """ check if new db is available"""49 # set the target file50 file = os.path.join(self.path, file)51 try:52 with open(file, 'r') as f:53 checksum_remote = f.read().strip()54 print("\t[-] Checksum verification", checksum_remote)55 if checksum_remote == utility.checksum(self.local_db):56 print("\t[-] Already updated")57 self.clean()58 else:59 print("\t[-] Database update available")60 self.download(self.remote_db)61 self.unpack_database()62 except Exception as e:63 response = utility.serialize_error(False, str(e), str(e))64 sys.exit(response)65 def download(self, file):66 """ download files"""67 print("\t[-] Downloading", file)68 # set the target file69 self.target = os.path.join(self.path, file)70 try:71 self.bucket.download_file(file, self.target)72 except Exception as e:73 response = utility.serialize_error(False, str(e), str(e))74 sys.exit(response)75 def unpack_database(self):76 """ extract database """77 print("\t[-] Unpacking", self.target)78 try:79 tar = tarfile.open(self.target, 'r:gz')80 tar.extractall('.')81 except Exception as e:82 response = utility.serialize_error(False, str(e), str(e))83 sys.exit(response)84 shutil.move(self.db, self.local_db)85 self.clean()86 def authorization(self):87 """ check authorization """88 # init files dict89 files = []90 try:91 session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)92 s3 = session.resource('s3')93 self.bucket = s3.Bucket(self.plan_license)94 for file in self.bucket.objects.all():95 files.append(file.key)96 except Exception as e:97 if "Could not connect" in str(e):98 reason = "Connectivity error"99 else:100 code_error = e.response["Error"]["Code"]101 if code_error == "403":102 reason = "Access suspended to: %s" % self.plan_license103 if code_error == "AccessDenied":104 reason = "Access denied to plan: %s" % self.plan_license105 if code_error == "InvalidAccessKeyId":106 reason = "Error on access key: %s" % self.access_key107 if code_error == "SignatureDoesNotMatch":108 reason = "Error on secret key: %s" % self.secret_key109 if code_error == "AuthorizationHeaderMalformed":110 reason = "Empty access key"111 if code_error == "NoSuchBucket":112 reason = "Licensed plan not specified."113 response = utility.serialize_error(False, reason, str(e))114 sys.exit(response)115 return files116 def clean(self):117 """ clean directory"""118 print("[+] Cleaning tmp downloads ...")119 try:120 for file in os.listdir(self.path):121 if "tgz" in file or "update" in file:122 os.remove(os.path.join(self.path, file))123 else:124 pass125 except Exception as e:...

Full Screen

Full Screen

newsletter.py

Source:newsletter.py Github

copy

Full Screen

...15 """ returns MD5 hash of the lowercase version of the email address. """16 email_hash = hashlib.md5(email.encode('utf-8').lower()).hexdigest()17 return email_hash18 @staticmethod19 def serialize_error(error:ApiClientError) -> dict:20 """ 21 convert apiclient error to dict 22 type(error.text) -> str so it has to be converted to dict23 to access the key and values24 """25 return json.loads(error.text)26 def __init__(self, list_id:str):27 """ initialize connection to mailchimp """28 self.mailchimp = MailchimpMarketing.Client()29 self.mailchimp.set_config({30 "api_key": config('MAILCHIMP_KEY'),31 "server": config('MAILCHIMP_SERVER')32 })33 self.list_id = list_id34 def __str__(self):35 """ string representation of the newsletter object """36 return f"Newsletter object for {self.list_id}"37 38 def __eq__(self, other_object):39 """ compares two newsletter objects """40 if isinstance(other_object, Newsletter):41 return self.list_id == other_object.list_id42 return False43 def add_member(self, is_exist_subscribe: bool=True, **member_info):44 """ 45 adds member to the audience list with the member_info.46 is_exist_subscribe determines whether user whose email_address47 already exist in the list should be updated to change their status to48 subscribed. This is useful as users who have unsubscribed might want49 to subscribe back. Moreso, when a user unsubsribed their email_address50 isn't removed from the list only their status is changed. 51 More info on mailchimp docs as regards status and removing members.52 """53 try:54 response = self.mailchimp.lists.add_list_member(self.list_id, member_info)55 return 'added'56 except ApiClientError as error:57 response = self.get_member_info(self.get_email_hash(member_info['email_address']))58 error = Newsletter.serialize_error(error)59 if error['title'] == 'Member Exists' and response['status'] == 'unsubscribed':60 if is_exist_subscribe:61 return self.subscribe(response['contact_id'])62 return error['title']63 def get_all_members(self):64 """ gets all the members in a list """65 try:66 response = self.mailchimp.lists.get_list_members_info(self.list_id)67 return response68 except ApiClientError as error:69 error = Newsletter.serialize_error(error)70 return error71 def get_member_info(self, member_email_hash):72 """ gets info of a specific member """73 try:74 response = self.mailchimp.lists.get_list_member(self.list_id, member_email_hash)75 return response76 except ApiClientError as error:77 error = Newsletter.serialize_error(error)78 return error79 def update_contact(self, member_email_hash, **member_info):80 """ update the member info of the member that owns the email_hash """81 try:82 response = self.mailchimp.lists.update_list_member(self.list_id, member_email_hash, member_info)83 return 'updated'84 except ApiClientError as error:85 error = Newsletter.serialize_error(error)86 return error87 88 def subscribe(self, member_email_hash):89 """ change a specific member status to subscribed """90 return self.update_contact(member_email_hash, status='subscribed')91 def unsubscribe(self, member_email_hash):92 """ change a specific member status to unsubscribed """93 return self.update_contact(member_email_hash, status='unsubscribed')94list_id = 'a8eb3204d1'...

Full Screen

Full Screen

Search.py

Source:Search.py Github

copy

Full Screen

...15 # set the CVE as uppercase16 self.id = self.id.upper()17 # check if valid CVE18 if not self.id.startswith('CVE-'):19 response = utility.serialize_error(False, self.id, "Not a valid CVE identifier")20 return response21 # load CVE data22 response = json.loads(Information(self.id).get_info())23 exploits = json.loads(Exploitation(self.id).get_exploits())24 if response['description'] != []:25 # new tag added to search whenever an exploits is available26 if exploits['exploitation'] != []:27 response.update(exploits)28 return utility.serialize_data(response)29 def search_cwe(self):30 """ basic search CWE identifiers """31 # set the CWE as uppercase32 self.cwe = self.id.upper()33 # check if valid CWE34 if not self.cwe.startswith('CWE-'):35 response = utility.serialize_error(False, self.id, "Not a valid CWE identifier")36 return response37 # query the database38 self.cur.execute("SELECT title,class,link FROM cwe_db WHERE cwe_id=? ", (self.cwe,))39 cwe_data = self.cur.fetchone()40 if cwe_data:41 # set the CWE data42 title = cwe_data[0]43 cwe_class = cwe_data[1]44 url = cwe_data[2]45 # query the database46 self.cur.execute("SELECT cve_id from map_cwe_cve where cwe_id=? ORDER BY cve_id DESC", (self.cwe,))47 data = self.cur.fetchall()48 if data:49 # init dict50 cve = []51 for cve_id in data:52 cve.append(cve_id[0])53 # set the response54 response = {"id": self.cwe, "parameters": {"title": title, "class": cwe_class, "url": url},55 "vulnerability": cve}56 return utility.serialize_data(response)57 def search_cpe(self):58 """ basic search for CPE identifiers """59 if not self.id.startswith("cpe:/") and not self.id.startswith("cpe:2.3:"):60 response = utility.serialize_error(False, self.id, "Not a valid CPE identifier")61 return response62 # check whether is CPE 2.2 or CPE 2.363 if "cpe:2.3" in self.id:64 col = "cpe23_id"65 if "cpe:/" in self.id:66 col = "cpe_id"67 # query the database68 self.cur.execute("SELECT cve_id FROM map_cpe_cve WHERE {tn} = ? ORDER BY cve_id DESC".format(tn=col),69 self.query)70 data = self.cur.fetchall()71 if data:72 # init dict73 cve = []74 for cve_id in data:...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright gained huge popularity quickly because of some obliging features such as Playwright Test Generator and Inspector, Playwright Reporter, Playwright auto-waiting mechanism and etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector get your hands-on with handling alerts and dropdown in Playright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in Parallel with the LammbdaTest test cloud. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: Playwright automation framework support all major languages such as Python, JavaScript, TypeScript, .NET and etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful