How to use serialize_error method in Playwright Python

Best Python code snippets using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Update.py

Source: Update.py Github

copy
1#!/usr/bin/env python3
2# API Python wrapper for The Vulnerability & Threat Intelligence Feed Service
3# Copyright (C) 2013 - 2020 vFeed, Inc. - https://vfeed.io
4
5import os
6import sys
7import shutil
8
9from common import config as cfg
10from common import utils as utility
11
12try:
13    import tarfile
14    from boto3.session import Session
15    from botocore.exceptions import ClientError
16except ImportError as e:
17    module = str(e).split("'")
18    response = utility.serialize_error(False, module[1], module[0])
19    sys.exit(response)
20
21
class Update(object):
    """Download and install the database for the licensed plan.

    The database location and the subscription credentials are read from
    ``cfg``. Files are fetched from the S3 bucket named after the plan.
    Unrecoverable failures are serialized with ``utility.serialize_error``
    and terminate the process through ``sys.exit``.
    """

    def __init__(self):

        # database init
        self.db = cfg.database["file"]
        self.path = cfg.database["path"]

        self.local_db = os.path.join(self.path, self.db)

        self.access_key = cfg.subscription["access_key"]
        self.secret_key = cfg.subscription["secret_key"]
        self.plan_license = cfg.subscription["plan"]

    def update(self):
        """ callable method - initiate the database update in accordance with plan validity """

        # init authorization
        files = self.authorization()

        try:
            # A bucket key containing "update" is the checksum/update marker;
            # any other key is treated as the database archive.
            for name in files:
                if "update" in name:
                    self.update_file = name
                else:
                    self.remote_db = name

            if not os.path.isfile(self.local_db):
                print("[+] Deploying new database ...")
                self.download(self.remote_db)
                self.unpack_database()
            else:
                print("[+] Checking update status ...")
                self.download(self.update_file)
                self.check_status(self.update_file)

        except Exception as e:
            response = utility.serialize_error(False, str(e), str(e))
            sys.exit(response)

    def check_status(self, file):
        """ check if new db is available

        :param file: name of the downloaded update file, relative to
            ``self.path``; its stripped content is compared with
            ``utility.checksum(self.local_db)``.
        """

        # set the target file
        checksum_file = os.path.join(self.path, file)

        try:
            # Read and close the file before acting on it: clean() deletes
            # this very file, which fails on platforms that refuse to remove
            # a file that is still open.
            with open(checksum_file, 'r') as f:
                checksum_remote = f.read().strip()

            print("\t[-] Checksum verification", checksum_remote)

            if checksum_remote == utility.checksum(self.local_db):
                print("\t[-] Already updated")
                self.clean()
            else:
                print("\t[-] Database update available")
                self.download(self.remote_db)
                self.unpack_database()

        except Exception as e:
            response = utility.serialize_error(False, str(e), str(e))
            sys.exit(response)

    def download(self, file):
        """ download files

        :param file: bucket key to fetch. It is stored under ``self.path``
            and the resulting local path is remembered in ``self.target``.

        Requires ``self.bucket``, which ``authorization()`` sets.
        """

        print("\t[-] Downloading", file)

        # set the target file
        self.target = os.path.join(self.path, file)

        try:
            self.bucket.download_file(file, self.target)

        except Exception as e:
            response = utility.serialize_error(False, str(e), str(e))
            sys.exit(response)

    def unpack_database(self):
        """ extract database

        Extracts the archive at ``self.target`` (set by ``download``) into
        the current working directory, moves ``self.db`` to
        ``self.local_db`` and then removes the temporary downloads.
        """

        print("\t[-] Unpacking", self.target)

        try:
            # The context manager closes the archive handle even when the
            # extraction fails.
            # NOTE(review): extractall() trusts the member paths stored in
            # the archive - only safe as long as the bucket content is
            # trusted.
            with tarfile.open(self.target, 'r:gz') as tar:
                tar.extractall('.')
        except Exception as e:
            response = utility.serialize_error(False, str(e), str(e))
            sys.exit(response)

        shutil.move(self.db, self.local_db)
        self.clean()

    def authorization(self):
        """ check authorization

        Opens the plan bucket with the configured credentials, stores it in
        ``self.bucket`` and returns the list of object keys it contains.
        Any failure is serialized and terminates the process.
        """

        try:
            session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)
            s3 = session.resource('s3')
            self.bucket = s3.Bucket(self.plan_license)

            files = [entry.key for entry in self.bucket.objects.all()]

        except Exception as e:
            reason = self._auth_failure_reason(e)
            response = utility.serialize_error(False, reason, str(e))
            sys.exit(response)

        return files

    def _auth_failure_reason(self, error):
        """ map an authorization failure to a short human readable reason

        Always returns a string: errors without a ``response`` payload and
        unknown error codes fall back to a generic reason instead of raising
        a second exception while the first one is being reported.
        """

        if "Could not connect" in str(error):
            return "Connectivity error"

        # Not every exception raised in authorization() carries a
        # ``response`` mapping, so read the error code defensively.
        try:
            code_error = error.response["Error"]["Code"]
        except (AttributeError, KeyError, TypeError):
            code_error = None

        reasons = {
            "403": "Access suspended to: %s" % self.plan_license,
            "AccessDenied": "Access denied to plan: %s" % self.plan_license,
            "InvalidAccessKeyId": "Error on access key: %s" % self.access_key,
            # The secret key is deliberately not echoed: this text ends up
            # on the console via sys.exit().
            "SignatureDoesNotMatch": "Error on secret key: <hidden>",
            "AuthorizationHeaderMalformed": "Empty access key",
            "NoSuchBucket": "Licensed plan not specified.",
        }

        return reasons.get(code_error, "Authorization failed: %s" % (code_error or error))

    def clean(self):
        """ clean directory

        Removes every entry of ``self.path`` whose name contains "tgz" or
        "update".
        """
        print("[+] Cleaning tmp downloads ...")

        try:
            for name in os.listdir(self.path):
                if "tgz" in name or "update" in name:
                    os.remove(os.path.join(self.path, name))

        except Exception as e:
            # Best effort: a failed cleanup must not abort the update, so the
            # serialized error is intentionally discarded.
            utility.serialize_error(False, "already cleaned", str(e))
176
Full Screen

ajaxmiddleware.py

Source: ajaxmiddleware.py Github

copy
1import json
2import sys, traceback
3
4from django.conf import settings
5from django.db.models.base import ObjectDoesNotExist
6from django.http import HttpResponse, Http404
7from django.utils.translation import ugettext as _
8
9from raven.contrib.django.models import client
10
11
class AjaxErrorMiddleware(object):
    '''Return AJAX errors to the browser in a sensible way.

    Exceptions raised while serving an AJAX request are turned into a JSON
    body of the form ``{"status": <code>, "message": <text>}`` sent with
    the matching HTTP status code. Non-AJAX requests are left untouched.

    Includes some code from http://www.djangosnippets.org/snippets/650/

    '''

    # Some useful errors that this middleware will catch.
    class InputError(Exception):
        '''Raised by views to report invalid input; answered with a 400.'''

        def __init__(self, message):
            # Initialise the base class too, so str(exc) and exc.args carry
            # the message instead of being empty.
            Exception.__init__(self, message)
            self.message = message

    def process_exception(self, request, exception):
        '''Dispatch *exception* to the handler matching its type.

        Returns None for non-AJAX requests, otherwise the JSON error
        response: 404 for ObjectDoesNotExist/Http404, 400 for InputError
        and 500 for everything else.
        '''
        if not request.is_ajax():
            return None

        if isinstance(exception, (ObjectDoesNotExist, Http404)):
            return self.not_found(request, exception)

        if isinstance(exception, AjaxErrorMiddleware.InputError):
            return self.bad_request(request, exception)

        return self.server_error(request, exception)

    def serialize_error(self, status, message):
        '''Build the JSON error response for *status* and *message*.'''
        payload = json.dumps({'status': status, 'message': message})
        # Label the body as JSON; without an explicit content type the
        # response is sent with Django's default (HTML) content type.
        return HttpResponse(payload, status=status,
                            content_type='application/json')

    def not_found(self, request, exception):
        '''Answer with a 404 carrying the exception text.'''
        return self.serialize_error(404, str(exception))

    def bad_request(self, request, exception):
        '''Answer with a 400 carrying the InputError message.'''
        return self.serialize_error(400, exception.message)

    def server_error(self, request, exception):
        '''Report the error through the raven client and answer with a 500.

        The exception details and traceback are only included in the
        response when settings.DEBUG is on.
        '''
        exc_type, exc_value, tb = sys.exc_info()
        client.captureException()

        if not settings.DEBUG:
            return self.serialize_error(500, _('Internal error'))

        if exc_type is None:
            # No exception is currently being handled; describe the one we
            # were given instead of failing on None.__name__.
            exc_type = type(exception)
            exc_value = exception
            tb = getattr(exception, '__traceback__', None)

        message = self._debug_message(exc_type, exc_value, tb)
        return self.serialize_error(500, message)

    @staticmethod
    def _debug_message(exc_type, exc_value, tb):
        '''Format the exception type, its value and the traceback as text.'''
        parts = [
            "%s\n" % exc_type.__name__,
            "%s\n\n" % exc_value,
            "TRACEBACK:\n",
        ]
        parts.extend("%s\n" % entry for entry in traceback.format_tb(tb))
        return "".join(parts)
65
Full Screen

newsletter.py

Source: newsletter.py Github

copy
1import mailchimp_marketing as MailchimpMarketing
2from mailchimp_marketing.api_client import ApiClientError
3from decouple import config
4import hashlib
5import json
6
7
class Newsletter:
    """
        newsletter class using mailchimp.

        Read mailchimp docs to know the required
        fields in member_info so as to know the
        keyword arguments to use.
    """

    @staticmethod
    def get_email_hash(email):
        """ returns MD5 hash of the lowercase version of the email address. """
        # Lowercase the text *before* encoding: bytes.lower() only folds
        # ASCII letters, so non-ASCII addresses would hash differently
        # depending on their case.
        return hashlib.md5(email.lower().encode('utf-8')).hexdigest()

    @staticmethod
    def serialize_error(error: "ApiClientError") -> dict:
        """
            convert apiclient error to dict
            type(error.text) -> str so it has to be converted to dict
            to access the key and values
        """
        return json.loads(error.text)

    def __init__(self, list_id: str):
        """ initialize connection to mailchimp """
        self.mailchimp = MailchimpMarketing.Client()
        self.mailchimp.set_config({
            "api_key": config('MAILCHIMP_KEY'),
            "server": config('MAILCHIMP_SERVER'),
        })
        self.list_id = list_id

    def __str__(self):
        """ string representation of the newsletter object """
        return f"Newsletter object for {self.list_id}"

    def __eq__(self, other_object):
        """ compares two newsletter objects by their list_id """
        if isinstance(other_object, Newsletter):
            return self.list_id == other_object.list_id
        return NotImplemented

    def __hash__(self):
        """ hash consistent with __eq__ so instances stay usable in sets and dicts """
        return hash(self.list_id)

    def add_member(self, is_exist_subscribe: bool = True, **member_info):
        """
            adds member to the audience list with the member_info.

            is_exist_subscribe determines whether a user whose email_address
            already exists in the list should be updated to change their
            status to subscribed. This is useful as users who have
            unsubscribed might want to subscribe back. Moreover, when a user
            unsubscribes, their email_address isn't removed from the list;
            only their status is changed.
            More info on mailchimp docs as regards status and removing members.

            returns 'added' on success, the result of subscribe() when an
            unsubscribed member was re-subscribed, otherwise the title of
            the API error.
        """
        try:
            self.mailchimp.lists.add_list_member(self.list_id, member_info)
            return 'added'
        except ApiClientError as error:
            details = Newsletter.serialize_error(error)

        title = details['title']
        email = member_info.get('email_address')

        # Look the member up only when it can change the outcome; this
        # avoids an extra API call and a KeyError for unrelated failures
        # such as a missing email_address.
        if title == 'Member Exists' and is_exist_subscribe and email:
            member = self.get_member_info(self.get_email_hash(email))
            if member.get('status') == 'unsubscribed':
                # NOTE(review): subscribe() is given contact_id here while
                # its parameter is named member_email_hash - confirm the
                # endpoint accepts both identifiers.
                return self.subscribe(member['contact_id'])
        return title

    def get_all_members(self):
        """ gets all the members in a list; returns the error dict on failure """
        try:
            return self.mailchimp.lists.get_list_members_info(self.list_id)
        except ApiClientError as error:
            return Newsletter.serialize_error(error)

    def get_member_info(self, member_email_hash):
        """ gets info of a specific member; returns the error dict on failure """
        try:
            return self.mailchimp.lists.get_list_member(self.list_id, member_email_hash)
        except ApiClientError as error:
            return Newsletter.serialize_error(error)

    def update_contact(self, member_email_hash, **member_info):
        """
            update the member info of the member that owns the email_hash.

            returns 'updated' on success, the error dict on failure.
        """
        try:
            self.mailchimp.lists.update_list_member(self.list_id, member_email_hash, member_info)
            return 'updated'
        except ApiClientError as error:
            return Newsletter.serialize_error(error)

    def subscribe(self, member_email_hash):
        """ change a specific member status to subscribed """
        return self.update_contact(member_email_hash, status='subscribed')

    def unsubscribe(self, member_email_hash):
        """ change a specific member status to unsubscribed """
        return self.update_contact(member_email_hash, status='unsubscribed')
108
# Module-level defaults: the list id used by this project and a Newsletter
# client bound to it, created as soon as the module is imported.
list_id = "a8eb3204d1"
newsletter = Newsletter(list_id=list_id)
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn more in our Cookies policy, Privacy & Terms of service.

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)