Best JavaScript code snippet using playwright-internal
opener.py
Source:opener.py  
...139    def split_segments(self, fs_url):        140        match = self.re_fs_url.match(fs_url)        141        return match        142    143    def get_opener(self, name):144        """Retrieve an opener for the given protocol145        146        :param name: name of the opener to open147        :raises NoOpenerError: if no opener has been registered of that name148        149        """150        if name not in self.registry:151            raise NoOpenerError("No opener for %s" % name)152        index = self.registry[name]153        return self.openers[index]        154    155    def add(self, opener):156        """Adds an opener to the registry157        158        :param opener: a class derived from fs.opener.Opener159        160        """161        162        index = len(self.openers)163        self.openers[index] = opener164        for name in opener.names:165            self.registry[name] = index166    167    def parse(self, fs_url, default_fs_name=None, writeable=False, create_dir=False, cache_hint=True):168        """Parses a FS url and returns an fs object a path within that FS object169        (if indicated in the path). A tuple of (<FS instance>, <path>) is returned.170        171        :param fs_url: an FS url172        :param default_fs_name: the default FS to use if none is indicated (defaults is OSFS)173        :param writeable: if True, a writeable FS will be returned174        :param create_dir: if True, then the directory in the FS will be created175        176        """177                   178        orig_url = fs_url     179        match = self.split_segments(fs_url)180        181        if match:            182            fs_name, credentials, url1, url2, path = match.groups()183            if credentials:184                fs_url = '%s@%s' % (credentials, url1)185            else:186                fs_url = url2187            path = path or ''188            fs_url = fs_url or ''189            if ':' in fs_name:190                fs_name, sub_protocol = fs_name.split(':', 1)191                fs_url = '%s://%s' % (sub_protocol, fs_url)192            if '!' in path:193                paths = path.split('!')194                path = paths.pop()195                fs_url = '%s!%s' % (fs_url, '!'.join(paths))196            197            fs_name = fs_name or self.default_opener198        else:199            fs_name = default_fs_name or self.default_opener200            fs_url = _expand_syspath(fs_url) 201            path = ''           202    203        fs_name,  fs_name_params = _parse_name(fs_name)        204        opener = self.get_opener(fs_name)205        206        if fs_url is None:207            raise OpenerError("Unable to parse '%s'" % orig_url)        208        fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create_dir)        209        fs.cache_hint(cache_hint)210        211        if fs_path and iswildcard(fs_path):212            pathname, resourcename = pathsplit(fs_path or '')213            if pathname:214                fs = fs.opendir(pathname)215            return fs, resourcename216                            217        fs_path = join(fs_path, path)218        ...cetsms.py
Source:cetsms.py  
...57      atdata = data[1].split(' ')58      uid =atdata[0]59      psw = atdata[1]60      cj = cookielib.CookieJar()61      opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))62      opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]63      url = 'http://117.211.100.44:8080/index.php'64      url_data = urlencode({'userid':uid,'password':psw,'Submit':'Submit'})65      usock = opener.open(url, url_data) 66      url = 'http://117.211.100.44:8080/index.php'67      url_data = urlencode({'module':'com_views','task':'student_attendance_view'})68      atand = opener.open(url, url_data)69      page=atand.read()70      line=page.split('\n')71      new=set([])72      for i in range (len(line)) :    73	mat2=re.match(r'.*Logged in as.*',line[i],re.M)74	if mat2: 75	  i = i+276	  nam=re.match(r'.*<td>(.*?)</td>.*',line[i])77	  if nam:78	    str0= nam.group(1)79	    self.response.out.write("Name : "+str0)80	mat1=re.match(r'.*Attendance till date :.*',line[i],re.M)81	if mat1:	82	  self.response.out.write("</br>"+line[i])       83    elif data[0] == 'send' and len(data)==2:84      self.response.out.write("Received" )85      smsdata = data[1].split('>',1)  #contain all message exclude 'send'86      key = smsdata[0]87      if key == 'a' :  #admit new user88	secondparse = smsdata[1].split('>',3)89	name = secondparse[0]90	mobno = secondparse[1]91	pro = secondparse[2]92	pswd =secondparse[3]93	94	deleteContact = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND mobno =:2 and pro =:3", registration_key(),mobno,pro).get()95	if deleteContact is not None:96	  code = deleteContact.code97	  if code == '0':98	    sys.exit(1)99	  db.delete(deleteContact)100	cd = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key()).get() 101	if cd is not None:102	  tcode = cd.current103	  code = str(int(tcode)+1)104	else:105	  code = '1'106	deletecode = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())107	if deletecode is not None:108	  db.delete(deletecode)	109	codedata = Code(parent = code_key())110	codedata.current = code111	codedata.put()112	usrdata = Registration(parent=registration_key())			113	usrdata.code = code114	usrdata.mobno = mobno115	usrdata.name = name116	usrdata.pro = pro117	usrdata.pwd = pswd118	usrdata.put()119	tono =mobno120	msg="Application activated, |"+pro+"|"+code+"|"121	urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg    #url to send122	if pro == 'u': #ultoo.com123	  now = datetime.datetime.now()124	  if mobno == '9037755659':125	    msg=msg126	  else:127	    msg2= name+'('+mobno+'):'+msg128	    msg=msg2129	  cj = cookielib.CookieJar()130	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))131	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]132	  url = 'http://ultoo.in/login.php'133	  url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})134	  usock = opener.open(url, url_data)135	  day=now.day136	  day2=str(day)137	  if len(day2) == 1 :138	    day2='0'+str(day)139	  mon=now.month140	  mon2=str(mon)141	  if len(mon2) == 1 :142	    mon2='0'+str(mon)   143	  send_sms_url = 'http://ultoo.in/home.php'144	  send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})145		    146	  opener.addheaders = [('Referer','http://ultoo.com/home.php')]  147	  sms_sent_page = opener.open(send_sms_url,send_sms_data)		  148	elif pro =='s':  #sitetosms.com149	  msg2= '('+name+'):'+msg150	  msg=msg2	  151	  cj = cookielib.CookieJar()152	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))153	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]154	  url = 'http://www.site2sms.com/auth.asp'155	  url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'})	    	    156	  usock = opener.open(url, url_data)157	  send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'158	  send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)})      159	  opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')]  160	  sms_sent_page = opener.open(send_sms_url,send_sms_data)		  161	elif pro == 'w': #way2sms.com162	  msg2= '('+name+'):'+msg163	  msg=msg2	  164	  cj = cookielib.CookieJar()165	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))166	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]167	  url='http://site4.way2sms.com/Login1.action'168	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})169	  usock = opener.open(url, url_data)170	    ###############3171	  send_sms_url = 'http://site4.way2sms.com/quicksms.action'172	  send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})173	  opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')]  174	  sms_sent_page = opener.open(send_sms_url,send_sms_data)175	elif pro == 'b': #160by2.com176	  msg2= '('+name+'):'+msg177	  msg=msg2	  178	  cj = cookielib.CookieJar()179	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))180	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]181	  url='http://160by2.com/re-login'182	  opener.addheaders = [('Referer','Http://160by2.com/')]  183	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})184	  usock = opener.open(url, url_data)185	  self.response.out.write("login :"+mobno+pswd)186	  send_sms_url = 'http://160by2.com/SendSMSAction'187	  send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})188	  opener.addheaders = [('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')]  189	  sms_sent_page = opener.open(send_sms_url,send_sms_data)			190	#end else if  191	msg="New user registerd ."+"Name :"+name +",   Mob : "+mobno+",  In :"+pro192	cj = cookielib.CookieJar()193	opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))194	opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]195	url = 'http://www.site2sms.com/auth.asp'196	url_data = urlencode({'txtCCode':'91','userid':'9037755659','Password':'9054687','Submit':'Login'})	    	    197	usock = opener.open(url, url_data)198	send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'199	send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': '9037755659','txtUsed':len(msg)})      200	opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')]  201	sms_sent_page = opener.open(send_sms_url,send_sms_data)	  202	  203	#usock = opener.open(url, url_data)	#must add new url that i new user registerd to me204				      		205      elif key == 'm':  #mailing to request206	secondparse2 = smsdata[1].split('>',2)207	code = secondparse2[0]208	tono = secondparse2[1]209	msg = secondparse2[2]210	my_text = msg.replace('_', ' ')211	msg=my_text212	#213	214	#215	rslt = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND code =:2",registration_key(),code).get()			    216	if rslt is not None:217	  mobno = rslt.mobno218	  pswd = rslt.pwd  219	  pro = rslt.pro220	  name = rslt.name221	else:222	  sys.exit(1)223	urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg  224	if pro == 'u': #ultoo.com225	  226	  227	  #Logging into the SMS Site228	  cj = cookielib.CookieJar()229	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))230	  231	  # To fool the website as if a Web browser is visiting the site232	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]233	  234	  #	  235	  url = 'http://ultoo.in/login.php'236	  url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})237	  #try:238	  usock = opener.open(url, url_data)239	  now = datetime.datetime.now()240	  day=now.day241	  day2=str(day)242	  if len(day2) == 1 :243	    day2='0'+str(day)244	   245	  mon=now.month246	  mon2=str(mon)247	  if len(mon2) == 1 :248	    mon2='0'+str(mon)   249	  send_sms_url = 'http://ultoo.in/home.php'250	  send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})251	  opener.addheaders = [('Referer','http://ultoo.com/home.php')]  252	  sms_sent_page = opener.open(send_sms_url,send_sms_data)253	  254	  255	elif pro =='s':  #sitetosms.com256	  #self.response.out.write("Site 2 sms  > "+urltosend)257	  msg2= '('+name+'):'+msg258	  msg=msg2	  259	  cj = cookielib.CookieJar()260	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))261	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]262	  url = 'http://www.site2sms.com/auth.asp'263	  url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'})	    	    264	  usock = opener.open(url, url_data)265	  send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'266	  send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)})      267	  opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')]  268	  sms_sent_page = opener.open(send_sms_url,send_sms_data)		  269	elif pro == 'w': #way2sms.com270	  msg2= '('+name+'):'+msg271	  msg=msg2	  272	  cj = cookielib.CookieJar()273	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))274	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]275	  url='http://site4.way2sms.com/Login1.action'276	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})277	  usock = opener.open(url, url_data)278	  send_sms_url = 'http://site4.way2sms.com/quicksms.action'279	  send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})280	  opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')]  281	  sms_sent_page = opener.open(send_sms_url,send_sms_data)282	elif pro == 'b': #160by2.com283	  self.response.out.write("enter")284	  msg2= '('+name+'):'+msg285	  msg=msg2	  286	  cj = cookielib.CookieJar()287	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))288	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]289	  url='http://160by2.com/re-login'290	  opener.addheaders = [('Referer','Http://160by2.com/')]  291	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})292	  usock = opener.open(url, url_data)293	  self.response.out.write("login :"+mobno+pswd)294	  send_sms_url = 'http://160by2.com/SendSMSAction'295	  send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})296	  opener.addheaders = [('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')]  297	  sms_sent_page = opener.open(send_sms_url,send_sms_data)	  298	299	300      #if else end here301      elif key == 'x':302	self.response.out.write("<br>" )303	commands =smsdata[1].split('>')304	c1 = commands[0]305	c2 = commands[1]306	if c1 == "all" :307	  if c2 == "disp":308	    self.response.out.write("<br>" )309	    self.response.out.write("<Table>")310	    self.response.out.write("<tr>")311	    self.response.out.write("<td>No</td><td>Code</td><td>Name</td><td>Moblie Number</td><td>Password</td><td>Provider</td>")312	    self.response.out.write("</tr>")313	    members = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())314	    count = 1315	    for member in members:316	      self.response.out.write("<tr>")317	      self.response.out.write("<td>"+str(count)+"</td><td>"+member.code +"</td><td>"+member.name+"</td><td>"+member.mobno+"</td><td>"+member.pwd+"</td><td>"+member.pro  )318	      self.response.out.write("</tr>" )	319	      count =count +1320	    self.response.out.write("</Table>")321	  elif c2 == "del":322	    deleteallreg = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())323	    if deleteallreg is not None:324	      db.delete(deleteallreg)	    325	    deleteall = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())326	    if deleteall is not None:327	      db.delete(deleteall)328	    self.response.out.write("Database cleared successfully" )329	elif c1 == "block":330	  blkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and code=:2 ", registration_key(),c2).get()331	  if blkuser is not None:332	    name = blkuser.name333	    mobno =blkuser.mobno334	    pswd = blkuser.pwd335	    pro =blkuser.pro336	    db.delete(blkuser)337	  usrdata = Registration(parent=registration_key())			338	  usrdata.code = '0'339	  usrdata.mobno = mobno340	  usrdata.name = name341	  usrdata.pro = pro342	  usrdata.pwd = pswd343	  usrdata.put()	344	  self.response.out.write("User Blocked :"+name+"  "+mobno )345	elif c1=="unblock":346	  ublkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and mobno=:2 ", registration_key(),c2).get()347	  if ublkuser is not None:348	    self.response.out.write("User Unblocked :"+ublkuser.name+"  "+ublkuser.mobno )349	    db.delete(ublkuser)	  350	    351      352          353    else:354                  op = urllib2.build_opener()355		  url='http://thesaurus.com/browse/'		  356		  #self.response.out.write("<html><body>")357		  l=[]358		  l.append(url)359		  l.append(data[0])360		  url=''.join(l)361		  intake=op.open(url)362		  page=intake.read()363		  line=page.split('\n')364		  new=set([])365		  for i in range (len(line)) :366			mat1=re.match(r'.*Antonyms:.*',line[i],re.M)367			if mat1:368			      i=i+2
...test_url_policy_open.py
Source:test_url_policy_open.py  
...61            self.follow_reference_calls = []62        def follow_reference(self, url):63            self.follow_reference_calls.append(url)64            return self._reference_values[url]65    def make_branch_opener(self, should_follow_references, references,66                           unsafe_urls=None):67        policy = _BlacklistPolicy(should_follow_references, unsafe_urls)68        opener = self.StubbedBranchOpener(references, policy)69        return opener70    def test_check_initial_url(self):71        # check_and_follow_branch_reference rejects all URLs that are not72        # allowed.73        opener = self.make_branch_opener(None, [], set(['a']))74        self.assertRaises(75            BadUrl, opener.check_and_follow_branch_reference, 'a')76    def test_not_reference(self):77        # When branch references are forbidden, check_and_follow_branch_reference78        # does not raise on non-references.79        opener = self.make_branch_opener(False, ['a', None])80        self.assertEqual(81            'a', opener.check_and_follow_branch_reference('a'))82        self.assertEqual(['a'], opener.follow_reference_calls)83    def test_branch_reference_forbidden(self):84        # check_and_follow_branch_reference raises BranchReferenceForbidden if85        # branch references are forbidden and the source URL points to a86        # branch reference.87        opener = self.make_branch_opener(False, ['a', 'b'])88        self.assertRaises(89            BranchReferenceForbidden,90            opener.check_and_follow_branch_reference, 'a')91        self.assertEqual(['a'], opener.follow_reference_calls)92    def test_allowed_reference(self):93        # check_and_follow_branch_reference does not raise if following references94        # is allowed and the source URL points to a branch reference to a95        # permitted location.96        opener = self.make_branch_opener(True, ['a', 'b', None])97        self.assertEqual(98            'b', opener.check_and_follow_branch_reference('a'))99        self.assertEqual(['a', 'b'], opener.follow_reference_calls)100    def test_check_referenced_urls(self):101        # check_and_follow_branch_reference checks if the URL a reference points102        # to is safe.103        opener = self.make_branch_opener(104            True, ['a', 'b', None], unsafe_urls=set('b'))105        self.assertRaises(106            BadUrl, opener.check_and_follow_branch_reference, 'a')107        self.assertEqual(['a'], opener.follow_reference_calls)108    def test_self_referencing_branch(self):109        # check_and_follow_branch_reference raises BranchReferenceLoopError if110        # following references is allowed and the source url points to a111        # self-referencing branch reference.112        opener = self.make_branch_opener(True, ['a', 'a'])113        self.assertRaises(114            BranchLoopError, opener.check_and_follow_branch_reference, 'a')115        self.assertEqual(['a'], opener.follow_reference_calls)116    def test_branch_reference_loop(self):117        # check_and_follow_branch_reference raises BranchReferenceLoopError if118        # following references is allowed and the source url points to a loop119        # of branch references.120        references = ['a', 'b', 'a']121        opener = self.make_branch_opener(True, references)122        self.assertRaises(123            BranchLoopError, opener.check_and_follow_branch_reference, 'a')124        self.assertEqual(['a', 'b'], opener.follow_reference_calls)125class TrackingProber(BzrProber):126    """Subclass of BzrProber which tracks URLs it has been asked to open."""127    seen_urls = []128    @classmethod129    def probe_transport(klass, transport):130        klass.seen_urls.append(transport.base)131        return BzrProber.probe_transport(transport)132class TestBranchOpenerStacking(TestCaseWithTransport):133    def setUp(self):134        super(TestBranchOpenerStacking, self).setUp()135        BranchOpener.install_hook()136    def make_branch_opener(self, allowed_urls, probers=None):137        policy = WhitelistPolicy(True, allowed_urls, True)138        return BranchOpener(policy, probers)139    def test_probers(self):140        # Only the specified probers should be used141        b = self.make_branch('branch')142        opener = self.make_branch_opener([b.base], probers=[])143        self.assertRaises(NotBranchError, opener.open, b.base)144        opener = self.make_branch_opener([b.base], probers=[BzrProber])145        self.assertEqual(b.base, opener.open(b.base).base)146    def test_default_probers(self):147        # If no probers are specified to the constructor148        # of BranchOpener, then a safe set will be used,149        # rather than all probers registered in bzr.150        self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)151        ControlDirFormat.register_prober(TrackingProber)152        # Open a location without any branches, so that all probers are153        # tried.154        # First, check that the TrackingProber tracks correctly.155        TrackingProber.seen_urls = []156        opener = self.make_branch_opener(["."], probers=[TrackingProber])157        self.assertRaises(NotBranchError, opener.open, ".")158        self.assertEqual(1, len(TrackingProber.seen_urls))159        TrackingProber.seen_urls = []160        # And make sure it's registered in such a way that ControlDir.open would161        # use it.162        self.assertRaises(NotBranchError, ControlDir.open, ".")163        self.assertEqual(1, len(TrackingProber.seen_urls))164    def test_allowed_url(self):165        # the opener does not raise an exception for branches stacked on166        # branches with allowed URLs.167        stacked_on_branch = self.make_branch('base-branch', format='1.6')168        stacked_branch = self.make_branch('stacked-branch', format='1.6')169        stacked_branch.set_stacked_on_url(stacked_on_branch.base)170        opener = self.make_branch_opener(171            [stacked_branch.base, stacked_on_branch.base])172        # This doesn't raise an exception.173        opener.open(stacked_branch.base)174    def test_nstackable_repository(self):175        # treats branches with UnstackableRepositoryFormats as176        # being not stacked.177        branch = self.make_branch('unstacked', format='knit')178        opener = self.make_branch_opener([branch.base])179        # This doesn't raise an exception.180        opener.open(branch.base)181    def test_allowed_relative_url(self):182        # passes on absolute urls to check_one_url, even if the183        # value of stacked_on_location in the config is set to a relative URL.184        stacked_on_branch = self.make_branch('base-branch', format='1.6')185        stacked_branch = self.make_branch('stacked-branch', format='1.6')186        stacked_branch.set_stacked_on_url('../base-branch')187        opener = self.make_branch_opener(188            [stacked_branch.base, stacked_on_branch.base])189        # Note that stacked_on_branch.base is not '../base-branch', it's an190        # absolute URL.191        self.assertNotEqual('../base-branch', stacked_on_branch.base)192        # This doesn't raise an exception.193        opener.open(stacked_branch.base)194    def test_allowed_relative_nested(self):195        # Relative URLs are resolved relative to the stacked branch.196        self.get_transport().mkdir('subdir')197        a = self.make_branch('subdir/a', format='1.6')198        b = self.make_branch('b', format='1.6')199        b.set_stacked_on_url('../subdir/a')200        c = self.make_branch('subdir/c', format='1.6')201        c.set_stacked_on_url('../../b')202        opener = self.make_branch_opener([c.base, b.base, a.base])203        # This doesn't raise an exception.204        opener.open(c.base)205    def test_forbidden_url(self):206        # raises a BadUrl exception if a branch is stacked on a207        # branch with a forbidden URL.208        stacked_on_branch = self.make_branch('base-branch', format='1.6')209        stacked_branch = self.make_branch('stacked-branch', format='1.6')210        stacked_branch.set_stacked_on_url(stacked_on_branch.base)211        opener = self.make_branch_opener([stacked_branch.base])212        self.assertRaises(BadUrl, opener.open, stacked_branch.base)213    def test_forbidden_url_nested(self):214        # raises a BadUrl exception if a branch is stacked on a215        # branch that is in turn stacked on a branch with a forbidden URL.216        a = self.make_branch('a', format='1.6')217        b = self.make_branch('b', format='1.6')218        b.set_stacked_on_url(a.base)219        c = self.make_branch('c', format='1.6')220        c.set_stacked_on_url(b.base)221        opener = self.make_branch_opener([c.base, b.base])222        self.assertRaises(BadUrl, opener.open, c.base)223    def test_self_stacked_branch(self):224        # raises StackingLoopError if a branch is stacked on225        # itself. This avoids infinite recursion errors.226        a = self.make_branch('a', format='1.6')227        # Bazaar 1.17 and up make it harder to create branches like this.228        # It's still worth testing that we don't blow up in the face of them,229        # so we grovel around a bit to create one anyway.230        a.get_config().set_user_option('stacked_on_location', a.base)231        opener = self.make_branch_opener([a.base])232        self.assertRaises(BranchLoopError, opener.open, a.base)233    def test_loop_stacked_branch(self):234        # raises StackingLoopError if a branch is stacked in such235        # a way so that it is ultimately stacked on itself. e.g. a stacked on236        # b stacked on a.237        a = self.make_branch('a', format='1.6')238        b = self.make_branch('b', format='1.6')239        a.set_stacked_on_url(b.base)240        b.set_stacked_on_url(a.base)241        opener = self.make_branch_opener([a.base, b.base])242        self.assertRaises(BranchLoopError, opener.open, a.base)243        self.assertRaises(BranchLoopError, opener.open, b.base)244    def test_custom_opener(self):245        # A custom function for opening a control dir can be specified.246        a = self.make_branch('a', format='2a')247        b = self.make_branch('b', format='2a')248        b.set_stacked_on_url(a.base)249        TrackingProber.seen_urls = []250        opener = self.make_branch_opener(251            [a.base, b.base], probers=[TrackingProber])252        opener.open(b.base)253        self.assertEqual(254            set(TrackingProber.seen_urls), set([b.base, a.base]))255    def test_custom_opener_with_branch_reference(self):256        # A custom function for opening a control dir can be specified.257        a = self.make_branch('a', format='2a')258        b_dir = self.make_bzrdir('b')259        b = BranchReferenceFormat().initialize(b_dir, target_branch=a)260        TrackingProber.seen_urls = []261        opener = self.make_branch_opener(262            [a.base, b.base], probers=[TrackingProber])263        opener.open(b.base)264        self.assertEqual(265            set(TrackingProber.seen_urls), set([b.base, a.base]))266class TestOpenOnlyScheme(TestCaseWithTransport):267    """Tests for `open_only_scheme`."""268    def setUp(self):269        super(TestOpenOnlyScheme, self).setUp()270        BranchOpener.install_hook()271    def test_hook_does_not_interfere(self):272        # The transform_fallback_location hook does not interfere with regular273        # stacked branch access outside of open_only_scheme.274        self.make_branch('stacked')275        self.make_branch('stacked-on')...test_Request.py
Source:test_Request.py  
1# -*- coding: utf-8 -*-2# (c) 2018 Matt Martz <matt@sivel.net>3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)4from __future__ import absolute_import, division, print_function5__metaclass__ = type6import datetime7import os8from ansible.module_utils.urls import (Request, open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, RequestWithMethod,9                                       UnixHTTPHandler, UnixHTTPSConnection, httplib)10from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory11import pytest12from mock import call13if HAS_SSLCONTEXT:14    import ssl15@pytest.fixture16def urlopen_mock(mocker):17    return mocker.patch('ansible.module_utils.urls.urllib_request.urlopen')18@pytest.fixture19def install_opener_mock(mocker):20    return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')21def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):22    cookies = cookiejar.CookieJar()23    request = Request(24        headers={'foo': 'bar'},25        use_proxy=False,26        force=True,27        timeout=100,28        validate_certs=False,29        url_username='user',30        url_password='passwd',31        http_agent='ansible-tests',32        force_basic_auth=True,33        follow_redirects='all',34        client_cert='/tmp/client.pem',35        client_key='/tmp/client.key',36        cookies=cookies,37        unix_socket='/foo/bar/baz.sock',38        ca_path='/foo/bar/baz.pem',39    )40    fallback_mock = mocker.spy(request, '_fallback')41    r = request.open('GET', 'https://ansible.com')42    calls = [43        call(None, False),  # use_proxy44        call(None, True),  # force45        call(None, 100),  # timeout46        call(None, False),  # validate_certs47        call(None, 'user'),  # url_username48        call(None, 'passwd'),  # url_password49        call(None, 'ansible-tests'),  # http_agent50        call(None, True),  # force_basic_auth51        call(None, 'all'),  # follow_redirects52        call(None, '/tmp/client.pem'),  # client_cert53        call(None, '/tmp/client.key'),  # client_key54        call(None, cookies),  # cookies55        call(None, '/foo/bar/baz.sock'),  # unix_socket56        call(None, '/foo/bar/baz.pem'),  # ca_path57    ]58    fallback_mock.assert_has_calls(calls)59    assert fallback_mock.call_count == 14  # All but headers use fallback60    args = urlopen_mock.call_args[0]61    assert args[1] is None  # data, this is handled in the Request not urlopen62    assert args[2] == 100  # timeout63    req = args[0]64    assert req.headers == {65        'Authorization': b'Basic dXNlcjpwYXNzd2Q=',66        'Cache-control': 'no-cache',67        'Foo': 'bar',68        'User-agent': 'ansible-tests'69    }70    assert req.data is None71    assert req.get_method() == 'GET'72def test_Request_open(urlopen_mock, install_opener_mock):73    r = Request().open('GET', 'https://ansible.com/')74    args = urlopen_mock.call_args[0]75    assert args[1] is None  # data, this is handled in the Request not urlopen76    assert args[2] == 10  # timeout77    req = args[0]78    assert req.headers == {}79    assert req.data is None80    assert req.get_method() == 'GET'81    opener = install_opener_mock.call_args[0][0]82    handlers = opener.handlers83    if not HAS_SSLCONTEXT:84        expected_handlers = (85            SSLValidationHandler,86            RedirectHandlerFactory(),  # factory, get handler87        )88    else:89        expected_handlers = (90            RedirectHandlerFactory(),  # factory, get handler91        )92    found_handlers = []93    for handler in handlers:94        if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler':95            found_handlers.append(handler)96    assert len(found_handlers) == len(expected_handlers)97def test_Request_open_http(urlopen_mock, install_opener_mock):98    r = Request().open('GET', 'http://ansible.com/')99    args = urlopen_mock.call_args[0]100    opener = install_opener_mock.call_args[0][0]101    handlers = opener.handlers102    found_handlers = []103    for handler in handlers:104        if isinstance(handler, SSLValidationHandler):105            found_handlers.append(handler)106    assert len(found_handlers) == 0107def test_Request_open_unix_socket(urlopen_mock, install_opener_mock):108    r = Request().open('GET', 'http://ansible.com/', unix_socket='/foo/bar/baz.sock')109    args = urlopen_mock.call_args[0]110    opener = install_opener_mock.call_args[0][0]111    handlers = opener.handlers112    found_handlers = []113    for handler in handlers:114        if isinstance(handler, UnixHTTPHandler):115            found_handlers.append(handler)116    assert len(found_handlers) == 1117def test_Request_open_https_unix_socket(urlopen_mock, install_opener_mock):118    r = Request().open('GET', 'https://ansible.com/', unix_socket='/foo/bar/baz.sock')119    args = urlopen_mock.call_args[0]120    opener = install_opener_mock.call_args[0][0]121    handlers = opener.handlers122    found_handlers = []123    for handler in handlers:124        if isinstance(handler, HTTPSClientAuthHandler):125            found_handlers.append(handler)126    assert len(found_handlers) == 1127    inst = found_handlers[0]._build_https_connection('foo')128    assert isinstance(inst, UnixHTTPSConnection)129def test_Request_open_ftp(urlopen_mock, install_opener_mock, mocker):130    mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)131    # Using ftp scheme should prevent the AssertionError side effect to fire132    r = Request().open('GET', 'ftp://foo@ansible.com/')133def test_Request_open_headers(urlopen_mock, install_opener_mock):134    r = Request().open('GET', 'http://ansible.com/', headers={'Foo': 'bar'})135    args = urlopen_mock.call_args[0]136    req = args[0]137    assert req.headers == {'Foo': 'bar'}138def test_Request_open_username(urlopen_mock, install_opener_mock):139    r = Request().open('GET', 'http://ansible.com/', url_username='user')140    opener = install_opener_mock.call_args[0][0]141    handlers = opener.handlers142    expected_handlers = (143        urllib_request.HTTPBasicAuthHandler,144        urllib_request.HTTPDigestAuthHandler,145    )146    found_handlers = []147    for handler in handlers:148        if isinstance(handler, expected_handlers):149            found_handlers.append(handler)150    assert len(found_handlers) == 2151    assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}152def test_Request_open_username_in_url(urlopen_mock, install_opener_mock):153    r = Request().open('GET', 'http://user2@ansible.com/')154    opener = install_opener_mock.call_args[0][0]155    handlers = opener.handlers156    expected_handlers = (157        urllib_request.HTTPBasicAuthHandler,158        urllib_request.HTTPDigestAuthHandler,159    )160    found_handlers = []161    for handler in handlers:162        if isinstance(handler, expected_handlers):163            found_handlers.append(handler)164    assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}165def test_Request_open_username_force_basic(urlopen_mock, install_opener_mock):166    r = Request().open('GET', 'http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)167    opener = install_opener_mock.call_args[0][0]168    handlers = opener.handlers169    expected_handlers = (170        urllib_request.HTTPBasicAuthHandler,171        urllib_request.HTTPDigestAuthHandler,172    )173    found_handlers = []174    for handler in handlers:175        if isinstance(handler, expected_handlers):176            found_handlers.append(handler)177    assert len(found_handlers) == 0178    args = urlopen_mock.call_args[0]179    req = args[0]180    assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='181def test_Request_open_auth_in_netloc(urlopen_mock, install_opener_mock):182    r = Request().open('GET', 'http://user:passwd@ansible.com/')183    args = urlopen_mock.call_args[0]184    req = args[0]185    assert req.get_full_url() == 'http://ansible.com/'186    opener = install_opener_mock.call_args[0][0]187    handlers = opener.handlers188    expected_handlers = (189        urllib_request.HTTPBasicAuthHandler,190        urllib_request.HTTPDigestAuthHandler,191    )192    found_handlers = []193    for handler in handlers:194        if isinstance(handler, expected_handlers):195            found_handlers.append(handler)196    assert len(found_handlers) == 2197def test_Request_open_netrc(urlopen_mock, install_opener_mock, monkeypatch):198    here = os.path.dirname(__file__)199    monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc'))200    r = Request().open('GET', 'http://ansible.com/')201    args = urlopen_mock.call_args[0]202    req = args[0]203    assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='204    r = Request().open('GET', 'http://foo.ansible.com/')205    args = urlopen_mock.call_args[0]206    req = args[0]207    assert 'Authorization' not in req.headers208    monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))209    r = Request().open('GET', 'http://ansible.com/')210    args = urlopen_mock.call_args[0]211    req = args[0]212    assert 'Authorization' not in req.headers213def test_Request_open_no_proxy(urlopen_mock, install_opener_mock, mocker):214    build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')215    r = Request().open('GET', 'http://ansible.com/', use_proxy=False)216    handlers = build_opener_mock.call_args[0]217    found_handlers = []218    for handler in handlers:219        if isinstance(handler, urllib_request.ProxyHandler):220            found_handlers.append(handler)221    assert len(found_handlers) == 1222@pytest.mark.skipif(not HAS_SSLCONTEXT, reason="requires SSLContext")223def test_Request_open_no_validate_certs(urlopen_mock, install_opener_mock):224    r = Request().open('GET', 'https://ansible.com/', validate_certs=False)225    opener = install_opener_mock.call_args[0][0]226    handlers = opener.handlers227    ssl_handler = None228    for handler in handlers:229        if isinstance(handler, HTTPSClientAuthHandler):230            ssl_handler = handler231            break232    assert ssl_handler is not None233    inst = ssl_handler._build_https_connection('foo')234    assert isinstance(inst, httplib.HTTPSConnection)235    context = ssl_handler._context236    assert context.protocol == ssl.PROTOCOL_SSLv23237    if ssl.OP_NO_SSLv2:238        assert context.options & ssl.OP_NO_SSLv2239    assert context.options & ssl.OP_NO_SSLv3240    assert context.verify_mode == ssl.CERT_NONE241    assert context.check_hostname is False242def test_Request_open_client_cert(urlopen_mock, install_opener_mock):243    here = os.path.dirname(__file__)244    client_cert = os.path.join(here, 'fixtures/client.pem')245    client_key = os.path.join(here, 'fixtures/client.key')246    r = Request().open('GET', 'https://ansible.com/', client_cert=client_cert, client_key=client_key)247    opener = install_opener_mock.call_args[0][0]248    handlers = opener.handlers249    ssl_handler = None250    for handler in handlers:251        if isinstance(handler, HTTPSClientAuthHandler):252            ssl_handler = handler253            break254    assert ssl_handler is not None255    assert ssl_handler.client_cert == client_cert256    assert ssl_handler.client_key == client_key257    https_connection = ssl_handler._build_https_connection('ansible.com')258    assert https_connection.key_file == client_key259    assert https_connection.cert_file == client_cert260def test_Request_open_cookies(urlopen_mock, install_opener_mock):261    r = Request().open('GET', 'https://ansible.com/', cookies=cookiejar.CookieJar())262    opener = install_opener_mock.call_args[0][0]263    handlers = opener.handlers264    cookies_handler = None265    for handler in handlers:266        if isinstance(handler, urllib_request.HTTPCookieProcessor):267            cookies_handler = handler268            break269    assert cookies_handler is not None270def test_Request_open_invalid_method(urlopen_mock, install_opener_mock):271    r = Request().open('UNKNOWN', 'https://ansible.com/')272    args = urlopen_mock.call_args[0]273    req = args[0]274    assert req.data is None275    assert req.get_method() == 'UNKNOWN'276    # assert r.status == 504277def test_Request_open_custom_method(urlopen_mock, install_opener_mock):278    r = Request().open('DELETE', 'https://ansible.com/')279    args = urlopen_mock.call_args[0]280    req = args[0]281    assert isinstance(req, RequestWithMethod)282def test_Request_open_user_agent(urlopen_mock, install_opener_mock):283    r = Request().open('GET', 'https://ansible.com/', http_agent='ansible-tests')284    args = urlopen_mock.call_args[0]285    req = args[0]286    assert req.headers.get('User-agent') == 'ansible-tests'287def test_Request_open_force(urlopen_mock, install_opener_mock):288    r = Request().open('GET', 'https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())289    args = urlopen_mock.call_args[0]290    req = args[0]291    assert req.headers.get('Cache-control') == 'no-cache'292    assert 'If-modified-since' not in req.headers293def test_Request_open_last_mod(urlopen_mock, install_opener_mock):294    now = datetime.datetime.now()295    r = Request().open('GET', 'https://ansible.com/', last_mod_time=now)296    args = urlopen_mock.call_args[0]297    req = args[0]298    assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S -0000')299def test_Request_open_headers_not_dict(urlopen_mock, install_opener_mock):300    with pytest.raises(ValueError):301        Request().open('GET', 'https://ansible.com/', headers=['bob'])302def test_Request_init_headers_not_dict(urlopen_mock, install_opener_mock):303    with pytest.raises(ValueError):304        Request(headers=['bob'])305@pytest.mark.parametrize('method,kwargs', [306    ('get', {}),307    ('options', {}),308    ('head', {}),309    ('post', {'data': None}),310    ('put', {'data': None}),311    ('patch', {'data': None}),312    ('delete', {}),313])314def test_methods(method, kwargs, mocker):315    expected = method.upper()316    open_mock = mocker.patch('ansible.module_utils.urls.Request.open')317    request = Request()318    getattr(request, method)('https://ansible.com')319    open_mock.assert_called_once_with(expected, 'https://ansible.com', **kwargs)320def test_open_url(urlopen_mock, install_opener_mock, mocker):321    req_mock = mocker.patch('ansible.module_utils.urls.Request.open')322    open_url('https://ansible.com/')323    req_mock.assert_called_once_with('GET', 'https://ansible.com/', data=None, headers=None, use_proxy=True,324                                     force=False, last_mod_time=None, timeout=10, validate_certs=True,325                                     url_username=None, url_password=None, http_agent=None,326                                     force_basic_auth=False, follow_redirects='urllib2',327                                     client_cert=None, client_key=None, cookies=None, use_gssapi=False,..._opener.py
Source:_opener.py  
...312    handlers = []313    replacement_handlers = []314    def __init__(self, klass=OpenerDirector):315        self.klass = klass316    def build_opener(self, *handlers):317        """Create an opener object from a list of handlers and processors.318        The opener will use several default handlers and processors, including319        support for HTTP and FTP.320        If any of the handlers passed as arguments are subclasses of the321        default handlers, the default handlers will not be used.322        """323        opener = self.klass()324        default_classes = list(self.default_classes)325        skip = []326        for klass in default_classes:327            for check in handlers:328                if type(check) == types.ClassType:329                    if issubclass(check, klass):330                        skip.append(klass)331                elif type(check) == types.InstanceType:332                    if isinstance(check, klass):333                        skip.append(klass)334        for klass in skip:335            default_classes.remove(klass)336        for klass in default_classes:337            opener.add_handler(klass())338        for h in handlers:339            if type(h) == types.ClassType:340                h = h()341            opener.add_handler(h)342        return opener343build_opener = OpenerFactory().build_opener344_opener = None345urlopen_lock = _threading.Lock()346def urlopen(url, data=None, timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):347    global _opener348    if _opener is None:349        urlopen_lock.acquire()350        try:351            if _opener is None:352                _opener = build_opener()353        finally:354            urlopen_lock.release()355    return _opener.open(url, data, timeout)356def urlretrieve(url, filename=None, reporthook=None, data=None,357                timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):358    global _opener359    if _opener is None:360        urlopen_lock.acquire()361        try:362            if _opener is None:363                _opener = build_opener()364        finally:365            urlopen_lock.release()366    return _opener.retrieve(url, filename, reporthook, data, timeout)367def install_opener(opener):368    global _opener...test_locked_file.py
Source:test_locked_file.py  
1# Copyright 2016 Google Inc. All rights reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#      http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import errno15import os16import sys17import tempfile18import mock19import unittest220from oauth2client.contrib import locked_file21class TestOpener(unittest2.TestCase):22    def _make_one(self):23        _filehandle, filename = tempfile.mkstemp()24        os.close(_filehandle)25        return locked_file._Opener(filename, 'r+', 'r'), filename26    def test_ctor(self):27        instance, filename = self._make_one()28        self.assertFalse(instance._locked)29        self.assertEqual(instance._filename, filename)30        self.assertEqual(instance._mode, 'r+')31        self.assertEqual(instance._fallback_mode, 'r')32        self.assertIsNone(instance._fh)33        self.assertIsNone(instance._lock_fd)34    def test_is_locked(self):35        instance, _ = self._make_one()36        self.assertFalse(instance.is_locked())37        instance._locked = True38        self.assertTrue(instance.is_locked())39    def test_file_handle(self):40        instance, _ = self._make_one()41        self.assertIsNone(instance.file_handle())42        fh = mock.Mock()43        instance._fh = fh44        self.assertEqual(instance.file_handle(), fh)45    def test_filename(self):46        instance, filename = self._make_one()47        self.assertEqual(instance.filename(), filename)48    def test_open_and_lock(self):49        instance, _ = self._make_one()50        instance.open_and_lock(1, 1)51    def test_unlock_and_close(self):52        instance, _ = self._make_one()53        instance.unlock_and_close()54class TestPosixOpener(TestOpener):55    def _make_one(self):56        _filehandle, filename = tempfile.mkstemp()57        os.close(_filehandle)58        return locked_file._PosixOpener(filename, 'r+', 'r'), filename59    def test_relock_fail(self):60        instance, _ = self._make_one()61        instance.open_and_lock(1, 1)62        self.assertTrue(instance.is_locked())63        self.assertIsNotNone(instance.file_handle())64        self.assertRaises(65            locked_file.AlreadyLockedException, instance.open_and_lock, 1, 1)66    @mock.patch('oauth2client.contrib.locked_file.open', create=True)67    def test_lock_access_error_fallback_mode(self, mock_open):68        # NOTE: This is a bad case. The behavior here should be that the69        # error gets re-raised, but the module lets the if statement fall70        # through.71        instance, _ = self._make_one()72        mock_open.side_effect = [IOError(errno.ENOENT, '')]73        instance.open_and_lock(1, 1)74        self.assertIsNone(instance.file_handle())75        self.assertTrue(instance.is_locked())76    @mock.patch('oauth2client.contrib.locked_file.open', create=True)77    def test_lock_non_access_error(self, mock_open):78        instance, _ = self._make_one()79        fh_mock = mock.Mock()80        mock_open.side_effect = [IOError(errno.EACCES, ''), fh_mock]81        instance.open_and_lock(1, 1)82        self.assertEqual(instance.file_handle(), fh_mock)83        self.assertFalse(instance.is_locked())84    @mock.patch('oauth2client.contrib.locked_file.open', create=True)85    def test_lock_unexpected_error(self, mock_open):86        instance, _ = self._make_one()87        with mock.patch('os.open') as mock_os_open:88            mock_os_open.side_effect = [OSError(errno.EPERM, '')]89            self.assertRaises(OSError, instance.open_and_lock, 1, 1)90    @mock.patch('oauth2client.contrib.locked_file.open', create=True)91    @mock.patch('oauth2client.contrib.locked_file.logger')92    @mock.patch('time.time')93    def test_lock_timeout_error(self, mock_time, mock_logger, mock_open):94        instance, _ = self._make_one()95        # Make it seem like 10 seconds have passed between calls.96        mock_time.side_effect = [0, 10]97        with mock.patch('os.open') as mock_os_open:98            # Raising EEXIST should cause it to try to retry locking.99            mock_os_open.side_effect = [OSError(errno.EEXIST, '')]100            instance.open_and_lock(1, 1)101            self.assertFalse(instance.is_locked())102            self.assertTrue(mock_logger.warn.called)103    @mock.patch('oauth2client.contrib.locked_file.open', create=True)104    @mock.patch('oauth2client.contrib.locked_file.logger')105    @mock.patch('time.time')106    def test_lock_timeout_error_no_fh(self, mock_time, mock_logger, mock_open):107        instance, _ = self._make_one()108        # Make it seem like 10 seconds have passed between calls.109        mock_time.side_effect = [0, 10]110        # This will cause the retry loop to enter without a file handle.111        fh_mock = mock.Mock()112        mock_open.side_effect = [IOError(errno.ENOENT, ''), fh_mock]113        with mock.patch('os.open') as mock_os_open:114            # Raising EEXIST should cause it to try to retry locking.115            mock_os_open.side_effect = [OSError(errno.EEXIST, '')]116            instance.open_and_lock(1, 1)117            self.assertFalse(instance.is_locked())118            self.assertTrue(mock_logger.warn.called)119            self.assertEqual(instance.file_handle(), fh_mock)120    @mock.patch('oauth2client.contrib.locked_file.open', create=True)121    @mock.patch('time.time')122    @mock.patch('time.sleep')123    def test_lock_retry_success(self, mock_sleep, mock_time, mock_open):124        instance, _ = self._make_one()125        # Make it seem like 1 second has passed between calls. Extra values126        # are needed by the logging module.127        mock_time.side_effect = [0, 1]128        with mock.patch('os.open') as mock_os_open:129            # Raising EEXIST should cause it to try to retry locking.130            mock_os_open.side_effect = [131                OSError(errno.EEXIST, ''), mock.Mock()]132            instance.open_and_lock(10, 1)133            print(mock_os_open.call_args_list)134            self.assertTrue(instance.is_locked())135            mock_sleep.assert_called_with(1)136    @mock.patch('oauth2client.contrib.locked_file.os')137    def test_unlock(self, os_mock):138        instance, _ = self._make_one()139        instance._locked = True140        lock_fd_mock = instance._lock_fd = mock.Mock()141        instance._fh = mock.Mock()142        instance.unlock_and_close()143        self.assertFalse(instance.is_locked())144        os_mock.close.assert_called_once_with(lock_fd_mock)145        self.assertTrue(os_mock.unlink.called)146        self.assertTrue(instance._fh.close.called)147class TestLockedFile(unittest2.TestCase):148    @mock.patch('oauth2client.contrib.locked_file._PosixOpener')149    def _make_one(self, opener_ctor_mock):150        opener_mock = mock.Mock()151        opener_ctor_mock.return_value = opener_mock152        return locked_file.LockedFile(153            'a_file', 'r+', 'r', use_native_locking=False), opener_mock154    @mock.patch('oauth2client.contrib.locked_file._PosixOpener')155    def test_ctor_minimal(self, opener_mock):156        locked_file.LockedFile(157            'a_file', 'r+', 'r', use_native_locking=False)158        opener_mock.assert_called_with('a_file', 'r+', 'r')159    @mock.patch.dict('sys.modules', {160        'oauth2client.contrib._win32_opener': mock.Mock()})161    def test_ctor_native_win32(self):162        _win32_opener_mock = sys.modules['oauth2client.contrib._win32_opener']163        locked_file.LockedFile(164            'a_file', 'r+', 'r', use_native_locking=True)165        _win32_opener_mock._Win32Opener.assert_called_with('a_file', 'r+', 'r')166    @mock.patch.dict('sys.modules', {167        'oauth2client.contrib._win32_opener': None,168        'oauth2client.contrib._fcntl_opener': mock.Mock()})169    def test_ctor_native_fcntl(self):170        _fnctl_opener_mock = sys.modules['oauth2client.contrib._fcntl_opener']171        locked_file.LockedFile(172            'a_file', 'r+', 'r', use_native_locking=True)173        _fnctl_opener_mock._FcntlOpener.assert_called_with('a_file', 'r+', 'r')174    @mock.patch('oauth2client.contrib.locked_file._PosixOpener')175    @mock.patch.dict('sys.modules', {176        'oauth2client.contrib._win32_opener': None,177        'oauth2client.contrib._fcntl_opener': None})178    def test_ctor_native_posix_fallback(self, opener_mock):179        locked_file.LockedFile(180            'a_file', 'r+', 'r', use_native_locking=True)181        opener_mock.assert_called_with('a_file', 'r+', 'r')182    def test_filename(self):183        instance, opener = self._make_one()184        opener._filename = 'some file'185        self.assertEqual(instance.filename(), 'some file')186    def test_file_handle(self):187        instance, opener = self._make_one()188        self.assertEqual(instance.file_handle(), opener.file_handle())189        self.assertTrue(opener.file_handle.called)190    def test_is_locked(self):191        instance, opener = self._make_one()192        self.assertEqual(instance.is_locked(), opener.is_locked())193        self.assertTrue(opener.is_locked.called)194    def test_open_and_lock(self):195        instance, opener = self._make_one()196        instance.open_and_lock()197        opener.open_and_lock.assert_called_with(0, 0.05)198    def test_unlock_and_close(self):199        instance, opener = self._make_one()200        instance.unlock_and_close()201        opener.unlock_and_close.assert_called_with()202if __name__ == '__main__':  # pragma: NO COVER...template.py
Source:template.py  
1import statistics2import math3validation = False4def get_first_corrupted_chunk(line):5    openers = []6    for index, char in enumerate(line):7        if char == '}':8            last_opener = openers.pop()9            if last_opener != '{':10                return 119711        elif char == ']':12            last_opener = openers.pop()13            if last_opener != '[':14                return 5715        elif char == ')':16            last_opener = openers.pop()17            if last_opener != '(':18                return 319        elif char == '>':20            last_opener = openers.pop()21            if last_opener != '<':22                return 2513723        else:24            openers.append(char)25    return 026def part_1():27    part_1_input_file = "input.txt"28    if validation:29        part_1_input_file = "validation_part_1.txt"30    with open(part_1_input_file, "r") as file:31        # Solution here32        stack_dict = {'{': [], '}': [], '[': [], ']': [], '(': [], ')': [], '<': [], '>': []}33        total_score = 034        for line in file:35            total_score += get_first_corrupted_chunk(line.strip())36        return total_score37def get_all_unfinished_openers(line):38    openers = []39    for index, char in enumerate(line):40        if char == '}':41            last_opener = openers.pop()42            if last_opener != '{':43                return []44        elif char == ']':45            last_opener = openers.pop()46            if last_opener != '[':47                return []48        elif char == ')':49            last_opener = openers.pop()50            if last_opener != '(':51                return []52        elif char == '>':53            last_opener = openers.pop()54            if last_opener != '<':55                return []56        else:57            openers.append(char)58    return openers59def part_2():60    part_2_input_file = "input.txt"61    if validation:62        part_2_input_file = "validation_part_2.txt"63    with open(part_2_input_file, "r") as file:64        # Solution here65        scores = []66        for index, line in enumerate(file):67            score = 068            openers = get_all_unfinished_openers(line.strip())69            print(index, openers)70            if len(openers) > 0:71                openers.reverse()72                for opener in openers:73                    match opener:74                        case '(':75                            score *= 576                            score += 177                        case '[':78                            score *= 579                            score += 280                        case '{':81                            score *= 582                            score += 383                        case '<':84                            score *= 585                            score += 486                #print(openers, score)87                scores.append(score)88        scores.sort()89        print(scores[int(math.floor(len(scores) / 2))])90        return statistics.median(scores)91answer_part_1 = part_1()92answer_part_2 = part_2()93print(f'{answer_part_1=} {answer_part_2=}')...test_urllib2.py
Source:test_urllib2.py  
...15from ndg.httpsclient.urllib2_build_opener import build_opener16class Urllib2TestCase(unittest.TestCase):17    """Unit tests for urllib2 functionality"""18    19    def test01_urllib2_build_opener(self):     20        opener = build_opener()21        self.assert_(opener)22    def test02_open(self):23        opener = build_opener()24        res = opener.open(Constants.TEST_URI)25        self.assert_(res)26        print("res = %s" % res.read())27    def test03_open_fails_unknown_loc(self):28        opener = build_opener()29        self.failUnlessRaises(URLError, opener.open, Constants.TEST_URI2)30        31    def test04_open_peer_cert_verification_fails(self):32        # Explicitly set empty CA directory to make verification fail33        ctx = SSL.Context(SSL.SSLv3_METHOD)34        verify_callback = lambda conn, x509, errnum, errdepth, preverify_ok: \35            preverify_ok 36            37        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)38        ctx.load_verify_locations(None, './')39        opener = build_opener(ssl_context=ctx)40        self.failUnlessRaises(SSL.Error, opener.open, Constants.TEST_URI)41        42        43if __name__ == "__main__":...Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3  const browser = await chromium.launch();4  const context = await browser.newContext();5  const page = await context.newPage();6  await page.screenshot({ path: 'google.png' });7  await browser.close();8})();9const { chromium } = require('playwright');10(async () => {11  const browserServer = await chromium.launchServer();12  const wsEndpoint = browserServer.wsEndpoint();13  const browser = await chromium.connect({ wsEndpoint });14  const context = await browser.newContext();15  const page = await context.newPage();16  await page.screenshot({ path: 'google.png' });17  await browser.close();18  await browserServer.close();19})();20const { chromium } = require('playwright');21(async () => {22  const browser = await chromium.launch();23  const context = await browser.newContext({24  });25  const page = await context.newPage();Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3  const browser = await chromium.launch();4  const context = await browser.newContext();5  const page = await context.newPage();6  await page.opener();7  await page.close();8  await context.close();9  await browser.close();10})();11const { chromium } = require('playwright');12(async () => {13  const browser = await chromium.launch();14  const context = await browser.newContext();15  const page = await context.newPage();16  const newPage = await page.opener();17  await newPage.close();18  await page.close();19  await context.close();20  await browser.close();21})();22const { chromium } = require('playwright');23(async () => {24  const browser = await chromium.launch();25  const context = await browser.newContext();26  const page = await context.newPage();27  const newContext = await browser.newContext();28  const newPage = await page.opener({ context: newContext });29  await newPage.close();30  await context.close();31  await newContext.close();32  await browser.close();33})();34const { chromium } = require('playwright');35(async () => {36  const browser = await chromium.launch();37  const context = await browser.newContext();38  const page = await context.newPage();39  const newPage = await page.opener();40  await newPage.close();Using AI Code Generation
1const playwright = require('playwright');2(async () => {3  for (const browserType of ['chromium', 'webkit', 'firefox']) {4    const browser = await playwright[browserType].launch();5    const context = await browser.newContext();6    const page = await context.newPage();7    const opener = await page.opener();8    if (opener === null) {9      console.log('No opener');10    } else {11      console.log('The opener is:');12      console.log(opener);13    }14    await browser.close();15  }16})();17const playwright = require('playwright');18(async () => {19  for (const browserType of ['chromium', 'webkit', 'firefox']) {20    const browser = await playwright[browserType].launch();21    const context = await browser.newContext();22    const page = await context.newPage();23    await Promise.all([24      page.waitForEvent('popup'),25      page.click('a'),26    ]);27    const opener = await page.opener();28    if (opener === null) {29      console.log('No opener');30    } else {31      console.log('The opener is:');32      console.log(opener);33    }34    await browser.close();35  }36})();37const playwright = require('playwright');38(async () => {39  for (const browserType of ['chromium', 'webkit', 'firefox']) {40    const browser = await playwright[browserType].launch();41    const context = await browser.newContext();42    const page = await context.newPage();43    await Promise.all([44      page.waitForEvent('popup'),45      page.click('a'),46    ]);47    const opener = await page.opener();48    if (opener === null) {49      console.log('No opener');50    } else {51      console.log('The opener is:');52      console.log(opener);53    }54    await browser.close();55  }56})();57const playwright = require('playwright');58(async () => {59  for (constUsing AI Code Generation
1const { chromium } = require('playwright');2(async () => {3  const browser = await chromium.launch();4  const context = await browser.newContext();5  const page = await context.newPage();6  await page.opener();7  await browser.close();8})();9## 3.3.4. Page.opener()10const { chromium } = require('playwright');11(async () => {12  const browser = await chromium.launch();13  const context = await browser.newContext();14  const page = await context.newPage();15  console.log(await page.opener());16  await browser.close();17})();18## 3.3.5. Page.opener()19const { chromium } = require('playwright');20(async () => {21  const browser = await chromium.launch();22  const context = await browser.newContext();23  const page = await context.newPage();24  console.log(await page.opener());25  await browser.close();26})();27## 3.3.6. Page.opener()28const { chromium } = require('playwright');29(async () => {30  const browser = await chromium.launch();31  const context = await browser.newContext();32  const page = await context.newPage();33  console.log(await page.opener());34  await browser.close();35})();36## 3.3.7. Page.opener()37const { chromium } = require('playwright');38(async () => {39  const browser = await chromium.launch();40  const context = await browser.newContext();Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3    const browser = await chromium.launch({ headless: false, slowMo: 50 });4    const context = await browser.newContext();5    const page = await context.newPage();6    await page.click('text=Get started');7    const newPagePromise = new Promise(x => browser.once('targetcreated', target => x(target.page())));8    await page.click('text=Show documentation');9    const newPage = await newPagePromise;10    await newPage.waitForLoadState();11    console.log(newPage.url());12    await newPage.close();13    await browser.close();14})();Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3    const browser = await chromium.launch();4    const context = await browser.newContext();5    const page = await context.newPage();6    await page.click('text=Sign in');7    await page.waitForSelector('input[type="email"]');8    await page.fill('input[type="email"]', 'yourEmail');9    await page.click('text=Next');10    await page.waitForSelector('input[type="password"]');11    await page.fill('input[type="password"]', 'yourPassword');12    await page.click('text=Next');13    await page.waitForNavigation();14    await page.click('text=Create');15    await page.waitForSelector('input[name="q"]');16    await page.fill('input[name="q"]', 'yourSearch');17    await page.click('text=Google Search');18    await page.waitForNavigation();19    await page.click('text=Images');20    await page.waitForNavigation();21    await page.click('text=Videos');22    await page.waitForNavigation();23    await page.click('text=Maps');24    await page.waitForNavigation();25    await page.click('text=News');26    await page.waitForNavigation();27    await page.click('text=Gmail');28    await page.waitForNavigation();29    await page.click('text=Drive');30    await page.waitForNavigation();31    await page.click('text=Calendar');32    await page.waitForNavigation();33    await page.click('text=Translate');34    await page.waitForNavigation();35    await page.click('text=Photos');36    await page.waitForNavigation();37    await page.click('text=Shopping');38    await page.waitForNavigation();39    await page.click('text=More');40    await page.waitForNavigation();41    await page.click('text=Your data in Search');42    await page.waitForNavigation();43    await page.click('text=Web History');44    await page.waitForNavigation();45    await page.click('text=Privacy');46    await page.waitForNavigation();47    await page.click('text=Terms');48    await page.waitForNavigation();49    await page.click('text=Settings');50    await page.waitForNavigation();51    await page.click('text=Advertising');52    await page.waitForNavigation();53    await page.click('text=Business');54    await page.waitForNavigation();55    await page.click('text=About');56    await page.waitForNavigation();57    await page.click('text=How Search works');Using AI Code Generation
1const playwright = require("playwright");2(async () => {3  const browser = await playwright["chromium"].launch();4  const context = await browser.newContext();5  const page = await context.newPage();6  await page.opener();7  await page.screenshot({ path: "example.png" });8  await browser.close();9})();Using AI Code Generation
1const { openBrowser, goto, textBox, write, click, closeBrowser } = require('taiko');2(async () => {3    try {4        await openBrowser({ headless: false });5        await write("Taiko", into(textBox({ placeholder: "Search" })));6        await click("Google Search");7    } catch (e) {8        console.error(e);9    } finally {10        await closeBrowser();11    }12})();Using AI Code Generation
1const { chromium } = require("playwright");2const browser = await chromium.launch();3const context = await browser.newContext({4  opener: (url) => {5    console.log("Opening:", url);6    return true;7  },8});9const page = await context.newPage();10await page.screenshot({ path: "google.png" });11await browser.close();12const { chromium } = require("playwright");13const browser = await chromium.launch();14const context = await browser.newContext();15const page = await context.newPage();16await page.screenshot({ path: "google.png" });17await page.click("text=About");18await page.waitForNavigation();19await page.screenshot({ path: "about.png" });20await browser.close();21const { chromium } = require("playwright");22const browser = await chromium.launch();23const context = await browser.newContext();24const page = await context.newPage();25await page.screenshot({ path: "google.png" });26await page.click("text=About");27await page.waitForNavigation();28await page.screenshot({ path: "about.png" });29const [request] = await Promise.all([30  page.waitForRequest("**/collect"),31  page.click("text=Advertising"),32]);33console.log(request.url());34await browser.close();35const { chromium } = require("playwright");36const browser = await chromium.launch();37const context = await browser.newContext();38const page = await context.newPage();39await page.screenshot({ path: "google.png" });40await page.click("text=About");LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
