How to use set_redirect method in Playwright Python

Best Python code snippet using playwright-python

rewrite_url.py

Source:rewrite_url.py Github

copy

Full Screen

1# rewrite_url.py V5.0.12#3# Copyright (c) 2022 NetCon Unternehmensberatung GmbH, https://www.netcon-consulting.com4# Author: Marc Dierksen (m.dierksen@netcon-consulting.com)5import re6from urllib.request import urlopen, Request7from urllib.parse import quote8from socket import timeout9from bs4 import BeautifulSoup10ADDITIONAL_ARGUMENTS = ( )11OPTIONAL_ARGUMENTS = True12CONFIG_PARAMETERS = ( "exception_list", "redirect_list", "timeout", "check_redirect", "url_blacklist", "url_whitelist", "substitution_list", "token_list", "annotation_text", "annotation_html" )13USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.66 Safari/537.36"14PATTERN_STRIP = re.compile(r"^https?://(\S+)$", re.IGNORECASE)15TEMPLATE_TOKEN = "${}"16def resolve_redirect(url, request_timeout):17 """18 Resolve redirect URL.19 :type url: str20 :type request_timeout: int21 :rtype: str22 """23 try:24 return urlopen(Request(url, headers={ "User-Agent": USER_AGENT }), timeout=request_timeout).url25 except timeout:26 raise Exception("Timeout redirect '{}'".format(url))27 except Exception:28 raise Exception("Invalid redirect '{}'".format(url))29def check_blacklisted(url, set_whitelist, set_blacklist, name_blacklist):30 """31 Check if URL is blacklisted.32 :type url: str33 :type set_blacklist: set34 :type set_whitelist: set35 :type name_blacklist: str36 """37 result = url_blacklisted(url, set_whitelist, set_blacklist)38 if result is not None:39 if result:40 raise Exception("'{}' listed on '{}'".format(result[0], result[1]))41 raise Exception("'{}' listed on '{}'".format(url, name_blacklist))42def modify_url(url, set_redirect, request_timeout, set_whitelist, set_blacklist, set_checked, name_blacklist, dict_replace):43 """44 Modify URL.45 :type url: str46 :type dict_modified: dict47 :type set_redirect: set48 :type request_timeout: int49 :type set_whitelist: set50 :type set_blacklist: set51 :type set_checked: set52 :type name_blacklist: str53 :type dict_replace: dict54 :rtype: str55 """56 if set_redirect is not None:57 for pattern in set_redirect:58 if re.search(pattern, url) is not None:59 url_redirect = resolve_redirect(url, request_timeout)60 if url_redirect != url:61 if set_checked is not None and url_redirect not in set_checked:62 check_blacklisted(url_redirect, set_whitelist, set_blacklist, name_blacklist)63 set_checked.add(url_redirect)64 url = url_redirect65 break66 if dict_replace is not None:67 for (pattern, replace) in dict_replace.items():68 url_replace = re.sub(pattern, replace, url)69 if url_replace != url:70 url = url_replace71 break72 return url73def modify_text(content, set_exception, dict_modified, set_redirect, request_timeout, set_whitelist, set_blacklist, set_checked, name_blacklist, dict_replace):74 """75 Rewrite URLs in text body.76 :type content: str77 :type set_exception: set78 :type dict_modified: dict79 :type set_redirect: set80 :type request_timeout: int81 :type set_whitelist: set82 :type set_blacklist: set83 :type set_checked: set84 :type name_blacklist: str85 :type dict_replace: dict86 :rtype: str or None87 """88 dict_url = dict()89 for url in set(re.findall(PATTERN_URL, content)):90 for pattern in set_exception:91 if re.search(pattern, url) is not None:92 break93 else:94 if url in dict_modified:95 dict_url[url] = dict_modified[url]96 else:97 url_modified = modify_url(url, set_redirect, request_timeout, set_whitelist, set_blacklist, set_checked, name_blacklist, dict_replace)98 if url_modified != url:99 dict_modified[url] = url_modified100 dict_url[url] = url_modified101 if dict_url:102 index_shift = 0103 for match in re.finditer(PATTERN_URL, content):104 url = match.group()105 if url in dict_url:106 url_modified = dict_url[url]107 content = content[:match.start() + index_shift] + url_modified + content[match.end() + index_shift:]108 index_shift += len(url_modified) - len(url)109 return content110 return None111def modify_html(content, charset, set_exception, dict_modified, set_redirect, request_timeout, set_whitelist, set_blacklist, set_checked, name_blacklist, dict_replace):112 """113 Rewrite URLs in html body.114 :type content: str115 :type charset: str116 :type set_exception: set117 :type dict_modified: dict118 :type set_redirect: set119 :type request_timeout: int120 :type set_whitelist: set121 :type set_blacklist: set122 :type set_checked: set123 :type name_blacklist: str124 :type dict_replace: dict125 :rtype: str or None126 """127 soup = BeautifulSoup(content, features="html5lib")128 content_modified = False129 for url in { a["href"] for a in soup.findAll("a", href=re.compile(r"^(?!mailto:).+", re.IGNORECASE)) }:130 for pattern in set_exception:131 if re.search(pattern, url) is not None:132 break133 else:134 modified = False135 if url in dict_modified:136 url_modified = dict_modified[url]137 modified = True138 else:139 url_modified = modify_url(url, set_redirect, request_timeout, set_whitelist, set_blacklist, set_checked, name_blacklist, dict_replace)140 if url_modified != url:141 dict_modified[url] = url_modified142 modified = True143 if modified:144 match = re.search(PATTERN_STRIP, url)145 if match is None:146 url_strip = None147 else:148 url_strip = match.group(1)149 for a in soup.find_all("a", href=url):150 a["href"] = url_modified151 if a.text == url or a.text == url_strip:152 a.string.replace_with(url_modified)153 if a.has_attr("title"):154 title = a["title"]155 if title == url or title == url_strip:156 a["title"] = url_modified157 content_modified = True158 if content_modified:159 try:160 content = soup.encode(charset).decode(charset)161 except Exception:162 raise Exception("Error converting soup to string")163 dict_url = dict()164 for url in set(re.findall(PATTERN_URL, html2text(content))):165 for pattern in set_exception:166 if re.search(pattern, url) is not None:167 break168 else:169 if url in dict_modified:170 dict_url[url] = dict_modified[url]171 else:172 url_modified = modify_url(url, set_redirect, request_timeout, set_whitelist, set_blacklist, set_checked, name_blacklist, dict_replace)173 if url_modified != url:174 dict_modified[url] = url_modified175 dict_url[url] = url_modified176 if dict_url:177 for tag in soup.findAll(text=PATTERN_URL):178 tag_text = tag.text179 index_shift = 0180 for match in re.finditer(PATTERN_URL, tag_text):181 url = match.group()182 if url in dict_url:183 url_modified = dict_url[url]184 tag_text = tag_text[:match.start() + index_shift] + url_modified + tag_text[match.end() + index_shift:]185 index_shift += len(url_modified) - len(url)186 tag.string.replace_with(tag_text)187 try:188 content = soup.encode(charset).decode(charset)189 except Exception:190 raise Exception("Error converting soup to string")191 content_modified = True192 if content_modified:193 return content194 return None195def run_command(input, log, config, additional, optional):196 """197 Rewrite URLs in text and html body by resolving redirects (and optionally check if resolved URL is blacklisted) and replacing URL parts.198 :type input: str199 :type log: str200 :type config: TupleConfig201 :type additional: TupleAdditional202 :type optional: dict203 """204 if not (config.redirect_list or config.substitution_list):205 return ReturnCode.NONE206 try:207 email = read_email(input)208 except Exception as ex:209 write_log(log, ex)210 return ReturnCode.DETECTED211 if config.exception_list:212 try:213 set_exception = set(url_list(config.exception_list))214 except Exception as ex:215 write_log(log, ex)216 return ReturnCode.ERROR217 set_exception = { re.compile(url2regex(url), re.IGNORECASE) for url in set_exception }218 else:219 set_exception = set()220 dict_modified = dict()221 if config.redirect_list:222 try:223 set_redirect = set(url_list(config.redirect_list))224 except Exception as ex:225 write_log(log, ex)226 return ReturnCode.DETECTED227 if not set_redirect:228 write_log(log, "Redirect list is empty")229 return ReturnCode.DETECTED230 set_redirect = { re.compile(url2regex(url), re.IGNORECASE) for url in set_redirect }231 if config.check_redirect:232 if config.url_blacklist:233 try:234 set_blacklist = set(url_list(config.url_blacklist))235 except Exception as ex:236 write_log(log, ex)237 return ReturnCode.ERROR238 set_blacklist = { re.compile(url2regex(url), re.IGNORECASE) for url in set_blacklist }239 else:240 set_blacklist = None241 if config.url_whitelist:242 try:243 set_whitelist = set(url_list(config.url_whitelist))244 except Exception as ex:245 write_log(log, ex)246 return ReturnCode.ERROR247 set_whitelist = { re.compile(url2regex(url), re.IGNORECASE) for url in set_whitelist }248 else:249 set_whitelist = None250 set_checked = set()251 else:252 set_blacklist = None253 set_whitelist = None254 set_checked = None255 else:256 set_redirect = None257 set_blacklist = None258 set_whitelist = None259 set_checked = None260 if config.substitution_list:261 try:262 set_substitution = set(lexical_list(config.substitution_list))263 except Exception as ex:264 write_log(log, ex)265 return ReturnCode.DETECTED266 if not set_substitution:267 write_log(log, "Substitution list is empty")268 return ReturnCode.DETECTED269 try:270 dict_replace = { re.compile(substitution.split("\n")[0]): substitution.split("\n")[1] for substitution in set_substitution }271 except Exception:272 write_log(log, "Invalid substitution list")273 return ReturnCode.DETECTED274 if config.token_list:275 try:276 set_token = set(lexical_list(config.token_list))277 except Exception as ex:278 write_log(log, ex)279 return ReturnCode.DETECTED280 if not set_token:281 write_log(log, "Token list is empty")282 return ReturnCode.DETECTED283 tokens_missing = set_token - optional.keys()284 if tokens_missing:285 write_log(log, "Missing optional arguments for tokens {}".format(str(tokens_missing)[1:-1]))286 return ReturnCode.DETECTED287 dict_token = { token: optional[token] for token in set_token }288 for (pattern, replace) in dict_replace.items():289 modified = False290 for (token, value) in dict_token.items():291 token = TEMPLATE_TOKEN.format(token)292 if token in replace:293 replace = replace.replace(token, quote(value))294 modified = True295 if modified:296 dict_replace[pattern] = replace297 else:298 dict_replace = None299 email_modified = False300 try:301 part = extract_part(email, "text/plain")302 except Exception as ex:303 write_log(log, ex)304 return ReturnCode.DETECTED305 if part is not None:306 (part, charset, content) = part307 try:308 content = modify_text(content, set_exception, dict_modified, set_redirect, config.timeout, set_whitelist, set_blacklist, set_checked, config.url_blacklist, dict_replace)309 except Exception as ex:310 write_log(log, ex)311 return ReturnCode.DETECTED312 if content is not None:313 if config.annotation_text:314 try:315 annotation_content = annotation(config.annotation_text).text316 except Exception as ex:317 write_log(log, ex)318 return ReturnCode.DETECTED319 content = annotation_content + content320 if charset != CHARSET_UTF8 and not string_ascii(annotation_content):321 charset = CHARSET_UTF8322 if HEADER_CTE in part:323 del part[HEADER_CTE]324 part.set_payload(content, charset=charset)325 email_modified = True326 try:327 part = extract_part(email, "text/html")328 except Exception as ex:329 write_log(log, ex)330 return ReturnCode.DETECTED331 if part is not None:332 (part, charset, content) = part333 try:334 content = modify_html(content, charset, set_exception, dict_modified, set_redirect, config.timeout, set_whitelist, set_blacklist, set_checked, config.url_blacklist, dict_replace)335 except Exception as ex:336 write_log(log, ex)337 return ReturnCode.DETECTED338 if content is not None:339 if config.annotation_html:340 try:341 annotation_content = annotation(config.annotation_html).html342 except Exception as ex:343 write_log(log, ex)344 return ReturnCode.DETECTED345 content = annotate_html(content, annotation_content)346 if charset != CHARSET_UTF8 and not string_ascii(annotation_content):347 charset = CHARSET_UTF8348 if HEADER_CTE in part:349 del part[HEADER_CTE]350 part.set_payload(content, charset=charset)351 email_modified = True352 if email_modified:353 try:354 with open(input, "wb") as f:355 f.write(email.as_bytes())356 except Exception:357 write_log(log, "Error writing '{}'".format(input))358 return ReturnCode.DETECTED359 return ReturnCode.MODIFIED...

Full Screen

Full Screen

auth.py

Source:auth.py Github

copy

Full Screen

...21from pylons.controllers.util import redirect, abort22from pylons import url23import hashlib24log = logging.getLogger(__name__)25def set_redirect():26 # TODO: This function is called 30+ times per page when not logged in, more than seems needed27 if not session.get('redirect_to', None):28 session['redirect_to'] = request.path_info29 session.save()30def set_role(msg):31 if not session.get('role_error', None):32 session['role_error'] = msg33 session.save()34class ValidZookeeprUser(UserIn):35 """36 Checks that the signed in user is one of the users specified when setting up37 the user management API.38 """39 def __init__(self):40 pass41 def check(self, app, environ, start_response):42 if not environ.get('REMOTE_USER'):43 set_redirect()44 raise NotAuthenticatedError('Not Authenticated')45 person = Person.find_by_email(environ['REMOTE_USER'])46 if Person is None:47 environ['auth_failure'] = 'NO_USER'48 raise NotAuthorizedError(49 'You are not one of the users allowed to access this resource.'50 )51 return app(environ, start_response)52class HasZookeeprRole(HasAuthKitRole):53 def check(self, app, environ, start_response):54 """55 Should return True if the user has the role or56 False if the user doesn't exist or doesn't have the role.57 In this implementation role names are case insensitive.58 """59 if not environ.get('REMOTE_USER'):60 if self.error:61 raise self.error62 set_redirect()63 raise NotAuthenticatedError('Not authenticated')64 for role in self.roles:65 if not self.role_exists(role):66 raise NotAuthorizedError("No such role %r exists"%role)67 person = Person.find_by_email(environ['REMOTE_USER'])68 if person is None:69 raise users.AuthKitNoSuchUserError(70 "No such user %r" % environ['REMOTE_USER'])71 if not person.activated:72 #set_role('User account must be activated')73 raise NotAuthorizedError(74 "User account must be activated"75 )76 if self.all:77 for role in self.roles:78 if not self.user_has_role(person, role):79 if self.error:80 raise self.error81 else:82 set_role("User doesn't have the role %s"%role.lower())83 raise NotAuthorizedError(84 "User doesn't have the role %s"%role.lower()85 )86 return app(environ, start_response)87 else:88 for role in self.roles:89 if self.user_has_role(person, role):90 return app(environ, start_response)91 if self.error:92 raise self.error93 else:94 set_role("User doesn't have any of the specified roles")95 raise NotAuthorizedError(96 "User doesn't have any of the specified roles"97 )98 def role_exists(self, role):99 """100 Returns ``True`` if the role exists, ``False`` otherwise. Roles are101 case insensitive.102 """103 role = Role.find_by_name(role)104 if role is not None:105 return True106 return False107 def user_has_role(self, person, role):108 """109 Returns ``True`` if the user has the role specified, ``False``110 otherwise. Raises an exception if the user doesn't exist.111 """112 if not self.role_exists(role.lower()):113 raise users.AuthKitNoSuchRoleError("No such role %r"%role.lower())114 for role_ in person.roles:115 if role_.name == role.lower():116 return True117 return False118class IsSameZookeeprUser(UserIn):119 """120 Checks that the signed in user is one of the users specified when setting up121 the user management API.122 """123 def __init__(self, id):124 self.id = int(id)125 def check(self, app, environ, start_response):126 if not environ.get('REMOTE_USER'):127 set_redirect()128 raise NotAuthenticatedError('Not Authenticated')129 person = Person.find_by_email(environ['REMOTE_USER'])130 if person is None:131 environ['auth_failure'] = 'NO_USER'132 raise NotAuthorizedError(133 'You are not one of the users allowed to access this resource.'134 )135 if self.id != person.id:136 set_role("User doesn't have any of the specified role")137 raise NotAuthorizedError(138 "User doesn't have any of the specified roles"139 )140 return app(environ, start_response)141class IsActivatedZookeeprUser(UserIn):142 """143 Checks that the signed in user is activated144 """145 def __init__(self):146 pass147 def check(self, app, environ, start_response):148 if not environ.get('REMOTE_USER'):149 set_redirect()150 raise NotAuthenticatedError('Not Authenticated')151 person = Person.find_by_email(environ['REMOTE_USER'])152 if person is None:153 set_redirect()154 environ['auth_failure'] = 'NO_USER'155 raise NotAuthorizedError(156 'You are not one of the users allowed to access this resource.'157 )158 if not person.activated:159 set_redirect()160 if 'is_active' in dir(meta.Session):161 meta.Session.flush()162 meta.Session.close()163 redirect(url(controller="person", action="activate"))164 return app(environ, start_response)165class IsSameZookeeprSubmitter(UserIn):166 """167 Checks that the signed in user is one of the users specified when setting up168 the user management API.169 """170 def __init__(self, proposal_id):171 self.proposal_id = int(proposal_id)172 def check(self, app, environ, start_response):173 if not environ.get('REMOTE_USER'):174 set_redirect()175 raise NotAuthenticatedError('Not Authenticated')176 person = Person.find_by_email(environ['REMOTE_USER'])177 if person is None:178 environ['auth_failure'] = 'NO_USER'179 raise NotAuthorizedError(180 'You are not one of the users allowed to access this resource.'181 )182 proposal = Proposal.find_by_id(self.proposal_id)183 if proposal is None:184 raise NotAuthorizedError(185 "Proposal doesn't exist"186 )187 if person not in proposal.people:188 set_role("User doesn't have any of the specified roles")189 raise NotAuthorizedError(190 "User doesn't have any of the specified roles"191 )192 return app(environ, start_response)193class IsSameZookeeprFundingSubmitter(UserIn):194 """195 Checks that the signed in user is one of the users specified when setting up196 the user management API.197 """198 def __init__(self, funding_id):199 self.funding_id = int(funding_id)200 def check(self, app, environ, start_response):201 if not environ.get('REMOTE_USER'):202 raise NotAuthenticatedError('Not Authenticated')203 person = Person.find_by_email(environ['REMOTE_USER'])204 if person is None:205 environ['auth_failure'] = 'NO_USER'206 raise NotAuthorizedError(207 'You are not one of the users allowed to access this resource.'208 )209 funding = Funding.find_by_id(self.funding_id)210 if funding is None:211 raise NotAuthorizedError(212 "Funding Request doesn't exist"213 )214 if person != funding.person:215 set_role("User doesn't have any of the specified roles")216 raise NotAuthorizedError(217 "User doesn't have any of the specified roles"218 )219 return app(environ, start_response)220class IsSameZookeeprAttendee(UserIn):221 """222 Checks that the signed in user is the user for which the given invoice223 is for.224 """225 def __init__(self, invoice_id):226 self.invoice_id = int(invoice_id)227 def check(self, app, environ, start_response):228 if not environ.get('REMOTE_USER'):229 set_redirect()230 raise NotAuthenticatedError('Not Authenticated')231 person = Person.find_by_email(environ['REMOTE_USER'])232 if person is None:233 environ['auth_failure'] = 'NO_USER'234 raise NotAuthorizedError(235 'You are not one of the users allowed to access this resource.'236 )237 invoice = Invoice.find_by_id(self.invoice_id)238 if invoice is None:239 raise NotAuthorizedError(240 "Invoice doesn't exist"241 )242 if person.id <> invoice.person_id:243 set_role("Invoice is not for this user")244 raise NotAuthorizedError(245 "Invoice is not for this user"246 )247 return app(environ, start_response)248class HasUniqueKey(Permission):249 def check(self, app, environ, start_response):250 url = request.path251 fields = dict(request.GET)252 if fields.has_key('hash'):253 dburl = URLHash.find_by_hash(fields['hash']).url254 if dburl is not None:255 if url.startswith(dburl):256 return app(environ, start_response)257 raise NotAuthorizedError(258 "You are not authorised to view this page"259 )260class IsSameZookeeprRegistration(UserIn):261 """262 Checks that the signed in user is the user this registration belongs263 to.264 """265 def __init__(self, registration_id):266 self.registration_id = int(registration_id)267 def check(self, app, environ, start_response):268 if not environ.get('REMOTE_USER'):269 set_redirect()270 raise NotAuthenticatedError('Not Authenticated')271 person = Person.find_by_email(environ['REMOTE_USER'])272 if person is None:273 environ['auth_failure'] = 'NO_USER'274 raise NotAuthorizedError(275 'You are not one of the users allowed to access this resource.'276 )277 registration = Registration.find_by_id(self.registration_id)278 if registration is None:279 raise NotAuthorizedError(280 "Registration doesn't exist"281 )282 if person.id <> registration.person_id:283 set_role("Registration is not for this user");...

Full Screen

Full Screen

utils.py

Source:utils.py Github

copy

Full Screen

...24 if vbr1 != vbr2 :25 port_in = get_exit(vbr2)26 bridge = vbr2 # Set final bridge27 port = port_out28 set_redirect(cond_name, vbr2, port_in, port_out,index)29 logging.info("Redirect from source to First PoP completed")30 # Redirecting through the PoPs now31 logging.debug("Redirect traffic through PoPs")32 for i in range(1,len(ordered_pop)):33 port_1, vbr1 = get_switch(ordered_pop[i-1][0])34 logging.debug("port coming is: "+port_in+" with vbridge "+vbr1)35 port_2, vbr2 = get_switch(ordered_pop[i][0])36 if vbr1 == vbr2:37 logging.debug("port to redirect is: "+port_out+" with vbridge "+vbr2)38 set_redirect(cond_name, vbr1, port_1, port_2,index)39 else:40 logging.debug("redirecting through different bridges")41 port_ex = get_exit(vbr1)42 set_redirect(cond_name, vbr1, port_1, port_ex,index)43 port_in = get_exit(vbr2)44 set_redirect(cond_name, vbr2, port_in, port_2,index)45 bridge = vbr246 port = port_247 logging.debug(" Inter PoP redirections completed ")48 port_out, exitbridge = get_switch(out_seg)49 if exitbridge == 'notsure':50 port_out = get_exit(bridge)51 elif exitbridge != bridge :52 logging.debug("redirecting through different bridges")53 port_ex = get_exit(bridge)54 set_redirect(cond_name, bridge, port, port_ex,index)55 port = get_exit(exitbridge)56 #set_redirect(cond_name, exitbridge, port_in, port_out)57 bridge = exitbridge58 else:59 bridge = exitbridge60 set_redirect(cond_name, bridge, port, port_out,index)61 # Need to implement (or not) going from last PoP to Outer Segment -- leaving Wan 62 #Just add to the flow array 63 logging.info("Posting new flow completed")64 return (flag)65def get_switch(seg):66 logging.debug("Incoming request for segment: "+seg)67 conn = e.connect()68 segment = pyt.get(seg)69 logging.debug("Segment to look in the database is: "+segment)70 query = conn.execute('SELECT port_id, bridge_name FROM connectivity WHERE segment="%s";'%segment)71 dt = query.fetchone()72 #TODO implement try 73 port, switch = dt[0],dt[1]74 logging.info("get_switch method completed. Returning: "+port+" "+switch+". If segment is 0.0.0.0/0, then it may not be correct")75 if segment == '0.0.0.0/0':76 switch = 'notsure'77 return (port, switch)78def get_exit(vbr):79 logging.debug("Incoming request to find exit port of vbridge: "+vbr)80 conn = e.connect()81 query = conn.execute('SELECT port_id FROM connectivity WHERE segment="0.0.0.0/0" AND bridge_name="%s";'%vbr)82 dt = query.fetchone()83 port = dt[0]84 logging.info("get_exit method completed. Returning: "+port )85 return (port )86def set_condition(cond_name, source, dest,index):87 logging.debug("Incoming set_condition call")88 s_url = 'operations/vtn-flow-condition:set-flow-condition'89 username, password, host, url, headers = get_info()90 data = {'input': {'name': cond_name, 'vtn-flow-match': [91 {'index': index, 'vtn-inet-match': {'source-network': source, 'destination-network': dest}}]}}92 '''93 this curl --user "username":"pass" -H "Content-type: application/json" -X POST http://localhost:8181/restconf/operations/vtn-flow-condition:set-flow-condition94 # -d '{"input":{"name":"cond1", "vtn-flow-match":[{"index":"1",95 # "vtn-inet-match":{"source-network":"10.0.0.1/32",96 # "destination-network":"10.0.0.3/32"}}]}}'97 '''98 logging.debug("Sending request to VTN to implement condition "+cond_name)99 r = requests.post(url + s_url, headers=headers,100 auth=(username, password), json=data)101 logging.info("Got this as response: " +str(r) )102 if not r.status_code == 200:103 logging.error('FLOW COND ERROR ' + str(r.status_code))104 return (r.status_code)105def delete_condition(cond_name):106 s_url = 'operations/vtn-flow-condition:remove-flow-condition'107 username, password, host, url, headers = get_info()108 data = {'input': {'name': cond_name}}109 logging.debug("Sending request to delete condition "+cond_name)110 r = requests.post(url+s_url, headers=headers, auth=(username, password), json=data)111 logging.info("Got response:" +str(r))112 if not r.status_code == 200:113 logging.error("Condition removal ERROR " + str(r.status_code))114 return (r.status_code)115def set_redirect(cond_name, vbr, port_id_in, port_id_out,index):116 s_url = 'operations/vtn-flow-filter:set-flow-filter'117 logging.debug("Incoming set_redirect call")118 username, password, host, url, headers = get_info()119 vtn_name = get_vtn_name()120 data = {"input": {"output": "false", "tenant-name": vtn_name, "bridge-name": vbr, "interface-name": port_id_in, "vtn-flow-filter": [121 {"index": index, "condition": cond_name, "vtn-redirect-filter": {"redirect-destination": {"bridge-name": vbr, "interface-name": port_id_out}, "output": "true"}}]}}122 '''123 this: curl --user "username":"pass" -H "Content-type: application/json" -X POST http://localhost:8181/restconf/operations/vtn-flow-filter:set-flow-filter124 -d '{"input":{"output":"false","tenant-name":"vtn1", "bridge-name":"vbr", interface-name":"if5", "vtn-flow-filter":[{"condition":"cond_1","index":"1","vtn-redirect-filter":125 {"redirect-destination":{"bridge-name":"vbr1","interface-name":"if3"},"output":"true"}}]}}'126 '''127 logging.debug("Sending request to set condition: "+str(data))128 r = requests.post(url + s_url, headers=headers,129 auth=(username, password), json=data)...

Full Screen

Full Screen

tests_gsapi.py

Source:tests_gsapi.py Github

copy

Full Screen

...37 _e = l['engine_name']38 return self.app.post(f'/api/link/create/{_e}', headers=headers, data=json.dumps(l))39 def consume_link(self, e: str):40 return self.app.get(f'/api/link/consume/{e}')41 def set_redirect(self, id_: int, r: str):42 return self.app.get(f'/api/link/set_redirect/{id_}/{r}')43 def verify_redirect(self, id_: int):44 return self.app.get(f'/api/link/verify_redirect/{id_}')45 def set_not_working(self, id_: int):46 return self.app.get(f'/api/link/set_not_working/{id_}')47 ########################48 #### tests ####49 ########################50 # todo verify database counts and integrity51 def test_homepage(self):52 r = self.app.get("/")53 r_json = json.loads(r.data)54 self.assertEqual(r_json['success'], 1)55 self.assertEqual(r_json['results'], "Server works just fine :)")56 def test_consume_links_OK(self):57 for l in links_data:58 response = self.consume_link(l['engine_name'])59 r_json = json.loads(response.data)60 # asserts61 self.assertEqual(r_json['success'], 1)62 def test_consume_links_WARNING(self):63 # first consume64 for l in links_data:65 response = self.consume_link(l['engine_name'])66 r_json = json.loads(response.data)67 self.assertEqual(r_json['success'], 1)68 # second consume, warnings69 for l in links_data:70 response = self.consume_link(l['engine_name'])71 r_json = json.loads(response.data)72 self.assertEqual(r_json['success'], 0)73 def test_set_redirect_links_OK(self):74 for i, l in enumerate(links_data, 1):75 response = self.set_redirect(i, 'http://google.com')76 r_json = json.loads(response.data)77 self.assertEqual(r_json['success'], 1)78 def test_verify_redirects_OK(self):79 # set redirects80 for i, l in enumerate(links_data, 1):81 response = self.set_redirect(i, 'http://google.com')82 r_json = json.loads(response.data)83 self.assertEqual(r_json['success'], 1)84 # verify redirects85 for i, l in enumerate(links_data, 1):86 response = self.verify_redirect(i)87 assert response.headers['location'] == 'http://google.com'88 def test_set_not_working(self):89 for i, l in enumerate(links_data, 1):90 response = self.set_not_working(i)91 r_json = json.loads(response.data)92 self.assertEqual(r_json['success'], 1)93 def test_whole_set1(self):94 # first consume95 for l in links_data:96 response = self.consume_link(l['engine_name'])97 r_json = json.loads(response.data)98 self.assertEqual(r_json['success'], 1)99 # set redirects100 for i, l in enumerate(links_data, 1):101 response = self.set_redirect(i, 'http://google.com')102 r_json = json.loads(response.data)103 self.assertEqual(r_json['success'], 1)104 # second consume, warnings105 for l in links_data:106 response = self.consume_link(l['engine_name'])107 r_json = json.loads(response.data)108 self.assertEqual(r_json['success'], 0)109 # verify redirects110 for i, l in enumerate(links_data, 1):111 response = self.verify_redirect(i)112 assert response.headers['location'] == 'http://google.com'113 # verify redirects114 for i, l in enumerate(links_data, 1):115 response = self.verify_redirect(i)116 assert response.headers['location'] == 'http://google.com'117 def test_whole_set2(self):118 # first consume119 for l in links_data:120 response = self.consume_link(l['engine_name'])121 r_json = json.loads(response.data)122 self.assertEqual(r_json['success'], 1)123 # second consume, warnings124 for l in links_data:125 response = self.consume_link(l['engine_name'])126 r_json = json.loads(response.data)127 self.assertEqual(r_json['success'], 0)128 # set not working129 for i, l in enumerate(links_data, 1):130 response = self.set_not_working(i)131 r_json = json.loads(response.data)132 self.assertEqual(r_json['success'], 1)133 # first consume134 for l in links_data:135 response = self.consume_link(l['engine_name'])136 r_json = json.loads(response.data)137 self.assertEqual(r_json['success'], 1)138 # set redirects139 for i, l in enumerate(links_data, 1):140 response = self.set_redirect(i, 'http://google.com')141 r_json = json.loads(response.data)142 self.assertEqual(r_json['success'], 1)143 # verify redirects144 for i, l in enumerate(links_data, 1):145 response = self.verify_redirect(i)146 assert response.headers['location'] == 'http://google.com'147 # second consume, warnings148 for l in links_data:149 response = self.consume_link(l['engine_name'])150 r_json = json.loads(response.data)151 self.assertEqual(r_json['success'], 0)152 # verify redirects153 for i, l in enumerate(links_data, 1):154 response = self.verify_redirect(i)...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright gained huge popularity quickly because of some obliging features such as Playwright Test Generator and Inspector, Playwright Reporter, Playwright auto-waiting mechanism and etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector get your hands-on with handling alerts and dropdown in Playright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in Parallel with the LammbdaTest test cloud. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: Playwright automation framework support all major languages such as Python, JavaScript, TypeScript, .NET and etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful