Best Python code snippet using localstack_python
opener.py
Source:opener.py  
# ... excerpt from opener.py.  The functions below are methods (they take
# ``self`` and use ``self.registry`` / ``self.openers``) of a class whose
# header lies above the excerpt; they are shown here without the page's
# line-number gutter.


def split_segments(self, fs_url):
    """Match *fs_url* against the compiled pattern ``self.re_fs_url``.

    :param fs_url: an FS url
    :returns: the regex match object, or None when the url does not match

    """
    return self.re_fs_url.match(fs_url)


def get_opener(self, name):
    """Retrieve an opener for the given protocol

    :param name: name of the opener to open
    :raises NoOpenerError: if no opener has been registered of that name

    """
    if name not in self.registry:
        raise NoOpenerError("No opener for %s" % name)
    # The registry maps a protocol name to an index into self.openers.
    index = self.registry[name]
    return self.openers[index]


def add(self, opener):
    """Adds an opener to the registry

    :param opener: a class derived from fs.opener.Opener

    """
    # Every name the opener answers to shares one slot in self.openers.
    index = len(self.openers)
    self.openers[index] = opener
    for name in opener.names:
        self.registry[name] = index


def parse(self, fs_url, default_fs_name=None, writeable=False, create_dir=False, cache_hint=True):
    """Parses a FS url and returns an fs object a path within that FS object
    (if indicated in the path). A tuple of (<FS instance>, <path>) is returned.

    :param fs_url: an FS url
    :param default_fs_name: the default FS to use if none is indicated (defaults is OSFS)
    :param writeable: if True, a writeable FS will be returned
    :param create_dir: if True, then the directory in the FS will be created
    :param cache_hint: value handed to the opened FS object's ``cache_hint()``
    :raises OpenerError: if no url could be derived from *fs_url*

    """
    orig_url = fs_url
    match = self.split_segments(fs_url)

    if match:
        fs_name, credentials, url1, url2, path = match.groups()
        if credentials:
            fs_url = '%s@%s' % (credentials, url1)
        else:
            fs_url = url2
        path = path or ''
        fs_url = fs_url or ''
        # Guard against a missing scheme group: the fallback to
        # self.default_opener below shows fs_name may be empty here, and
        # ``':' in None`` would raise TypeError.
        if fs_name and ':' in fs_name:
            fs_name, sub_protocol = fs_name.split(':', 1)
            fs_url = '%s://%s' % (sub_protocol, fs_url)
        if '!' in path:
            # Only the last '!'-separated segment stays as the path; the
            # earlier segments are folded back into the url.
            paths = path.split('!')
            path = paths.pop()
            fs_url = '%s!%s' % (fs_url, '!'.join(paths))

        fs_name = fs_name or self.default_opener
    else:
        fs_name = default_fs_name or self.default_opener
        fs_url = _expand_syspath(fs_url)
        path = ''

    fs_name, fs_name_params = _parse_name(fs_name)
    opener = self.get_opener(fs_name)

    if fs_url is None:
        raise OpenerError("Unable to parse '%s'" % orig_url)
    fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create_dir)
    fs.cache_hint(cache_hint)

    if fs_path and iswildcard(fs_path):
        # Wildcard: open the directory part and hand back the pattern itself.
        pathname, resourcename = pathsplit(fs_path or '')
        if pathname:
            fs = fs.opendir(pathname)
        return fs, resourcename

    fs_path = join(fs_path, path)
    # ... (the page truncates the excerpt here; the rest of parse() is not shown)


# cetsms.py
Source:cetsms.py  
...57      atdata = data[1].split(' ')58      uid =atdata[0]59      psw = atdata[1]60      cj = cookielib.CookieJar()61      opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))62      opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]63      url = 'http://117.211.100.44:8080/index.php'64      url_data = urlencode({'userid':uid,'password':psw,'Submit':'Submit'})65      usock = opener.open(url, url_data) 66      url = 'http://117.211.100.44:8080/index.php'67      url_data = urlencode({'module':'com_views','task':'student_attendance_view'})68      atand = opener.open(url, url_data)69      page=atand.read()70      line=page.split('\n')71      new=set([])72      for i in range (len(line)) :    73	mat2=re.match(r'.*Logged in as.*',line[i],re.M)74	if mat2: 75	  i = i+276	  nam=re.match(r'.*<td>(.*?)</td>.*',line[i])77	  if nam:78	    str0= nam.group(1)79	    self.response.out.write("Name : "+str0)80	mat1=re.match(r'.*Attendance till date :.*',line[i],re.M)81	if mat1:	82	  self.response.out.write("</br>"+line[i])       83    elif data[0] == 'send' and len(data)==2:84      self.response.out.write("Received" )85      smsdata = data[1].split('>',1)  #contain all message exclude 'send'86      key = smsdata[0]87      if key == 'a' :  #admit new user88	secondparse = smsdata[1].split('>',3)89	name = secondparse[0]90	mobno = secondparse[1]91	pro = secondparse[2]92	pswd =secondparse[3]93	94	deleteContact = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND mobno =:2 and pro =:3", registration_key(),mobno,pro).get()95	if deleteContact is not None:96	  code = deleteContact.code97	  if code == '0':98	    sys.exit(1)99	  db.delete(deleteContact)100	cd = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key()).get() 101	if cd is not None:102	  tcode = cd.current103	  code = str(int(tcode)+1)104	else:105	  code = '1'106	deletecode = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", 
code_key())107	if deletecode is not None:108	  db.delete(deletecode)	109	codedata = Code(parent = code_key())110	codedata.current = code111	codedata.put()112	usrdata = Registration(parent=registration_key())			113	usrdata.code = code114	usrdata.mobno = mobno115	usrdata.name = name116	usrdata.pro = pro117	usrdata.pwd = pswd118	usrdata.put()119	tono =mobno120	msg="Application activated, |"+pro+"|"+code+"|"121	urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg    #url to send122	if pro == 'u': #ultoo.com123	  now = datetime.datetime.now()124	  if mobno == '9037755659':125	    msg=msg126	  else:127	    msg2= name+'('+mobno+'):'+msg128	    msg=msg2129	  cj = cookielib.CookieJar()130	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))131	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]132	  url = 'http://ultoo.in/login.php'133	  url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})134	  usock = opener.open(url, url_data)135	  day=now.day136	  day2=str(day)137	  if len(day2) == 1 :138	    day2='0'+str(day)139	  mon=now.month140	  mon2=str(mon)141	  if len(mon2) == 1 :142	    mon2='0'+str(mon)   143	  send_sms_url = 'http://ultoo.in/home.php'144	  send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})145		    146	  opener.addheaders = [('Referer','http://ultoo.com/home.php')]  147	  sms_sent_page = opener.open(send_sms_url,send_sms_data)		  148	elif pro =='s':  #sitetosms.com149	  msg2= '('+name+'):'+msg150	  msg=msg2	  151	  cj = cookielib.CookieJar()152	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))153	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]154	  url = 
'http://www.site2sms.com/auth.asp'155	  url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'})	    	    156	  usock = opener.open(url, url_data)157	  send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'158	  send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)})      159	  opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')]  160	  sms_sent_page = opener.open(send_sms_url,send_sms_data)		  161	elif pro == 'w': #way2sms.com162	  msg2= '('+name+'):'+msg163	  msg=msg2	  164	  cj = cookielib.CookieJar()165	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))166	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]167	  url='http://site4.way2sms.com/Login1.action'168	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})169	  usock = opener.open(url, url_data)170	    ###############3171	  send_sms_url = 'http://site4.way2sms.com/quicksms.action'172	  send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})173	  opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')]  174	  sms_sent_page = opener.open(send_sms_url,send_sms_data)175	elif pro == 'b': #160by2.com176	  msg2= '('+name+'):'+msg177	  msg=msg2	  178	  cj = cookielib.CookieJar()179	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))180	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]181	  url='http://160by2.com/re-login'182	  opener.addheaders = [('Referer','Http://160by2.com/')]  183	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})184	  usock = opener.open(url, url_data)185	  self.response.out.write("login :"+mobno+pswd)186	  send_sms_url 
= 'http://160by2.com/SendSMSAction'187	  send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})188	  opener.addheaders = [('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')]  189	  sms_sent_page = opener.open(send_sms_url,send_sms_data)			190	#end else if  191	msg="New user registerd ."+"Name :"+name +",   Mob : "+mobno+",  In :"+pro192	cj = cookielib.CookieJar()193	opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))194	opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]195	url = 'http://www.site2sms.com/auth.asp'196	url_data = urlencode({'txtCCode':'91','userid':'9037755659','Password':'9054687','Submit':'Login'})	    	    197	usock = opener.open(url, url_data)198	send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'199	send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': '9037755659','txtUsed':len(msg)})      200	opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')]  201	sms_sent_page = opener.open(send_sms_url,send_sms_data)	  202	  203	#usock = opener.open(url, url_data)	#must add new url that i new user registerd to me204				      		205      elif key == 'm':  #mailing to request206	secondparse2 = smsdata[1].split('>',2)207	code = secondparse2[0]208	tono = secondparse2[1]209	msg = secondparse2[2]210	my_text = msg.replace('_', ' ')211	msg=my_text212	#213	214	#215	rslt = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND code =:2",registration_key(),code).get()			    216	if rslt is not None:217	  mobno = rslt.mobno218	  pswd = rslt.pwd  219	  pro = rslt.pro220	  name = rslt.name221	else:222	  sys.exit(1)223	urltosend = 
name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg  224	if pro == 'u': #ultoo.com225	  226	  227	  #Logging into the SMS Site228	  cj = cookielib.CookieJar()229	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))230	  231	  # To fool the website as if a Web browser is visiting the site232	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]233	  234	  #	  235	  url = 'http://ultoo.in/login.php'236	  url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})237	  #try:238	  usock = opener.open(url, url_data)239	  now = datetime.datetime.now()240	  day=now.day241	  day2=str(day)242	  if len(day2) == 1 :243	    day2='0'+str(day)244	   245	  mon=now.month246	  mon2=str(mon)247	  if len(mon2) == 1 :248	    mon2='0'+str(mon)   249	  send_sms_url = 'http://ultoo.in/home.php'250	  send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})251	  opener.addheaders = [('Referer','http://ultoo.com/home.php')]  252	  sms_sent_page = opener.open(send_sms_url,send_sms_data)253	  254	  255	elif pro =='s':  #sitetosms.com256	  #self.response.out.write("Site 2 sms  > "+urltosend)257	  msg2= '('+name+'):'+msg258	  msg=msg2	  259	  cj = cookielib.CookieJar()260	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))261	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]262	  url = 'http://www.site2sms.com/auth.asp'263	  url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'})	    	    264	  usock = opener.open(url, url_data)265	  send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'266	  send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 
'txtMobileNo': tono,'txtUsed':len(msg)})      267	  opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')]  268	  sms_sent_page = opener.open(send_sms_url,send_sms_data)		  269	elif pro == 'w': #way2sms.com270	  msg2= '('+name+'):'+msg271	  msg=msg2	  272	  cj = cookielib.CookieJar()273	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))274	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]275	  url='http://site4.way2sms.com/Login1.action'276	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})277	  usock = opener.open(url, url_data)278	  send_sms_url = 'http://site4.way2sms.com/quicksms.action'279	  send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})280	  opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')]  281	  sms_sent_page = opener.open(send_sms_url,send_sms_data)282	elif pro == 'b': #160by2.com283	  self.response.out.write("enter")284	  msg2= '('+name+'):'+msg285	  msg=msg2	  286	  cj = cookielib.CookieJar()287	  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))288	  opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]289	  url='http://160by2.com/re-login'290	  opener.addheaders = [('Referer','Http://160by2.com/')]  291	  url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})292	  usock = opener.open(url, url_data)293	  self.response.out.write("login :"+mobno+pswd)294	  send_sms_url = 'http://160by2.com/SendSMSAction'295	  send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})296	  opener.addheaders = 
[('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')]  297	  sms_sent_page = opener.open(send_sms_url,send_sms_data)	  298	299	300      #if else end here301      elif key == 'x':302	self.response.out.write("<br>" )303	commands =smsdata[1].split('>')304	c1 = commands[0]305	c2 = commands[1]306	if c1 == "all" :307	  if c2 == "disp":308	    self.response.out.write("<br>" )309	    self.response.out.write("<Table>")310	    self.response.out.write("<tr>")311	    self.response.out.write("<td>No</td><td>Code</td><td>Name</td><td>Moblie Number</td><td>Password</td><td>Provider</td>")312	    self.response.out.write("</tr>")313	    members = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())314	    count = 1315	    for member in members:316	      self.response.out.write("<tr>")317	      self.response.out.write("<td>"+str(count)+"</td><td>"+member.code +"</td><td>"+member.name+"</td><td>"+member.mobno+"</td><td>"+member.pwd+"</td><td>"+member.pro  )318	      self.response.out.write("</tr>" )	319	      count =count +1320	    self.response.out.write("</Table>")321	  elif c2 == "del":322	    deleteallreg = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())323	    if deleteallreg is not None:324	      db.delete(deleteallreg)	    325	    deleteall = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())326	    if deleteall is not None:327	      db.delete(deleteall)328	    self.response.out.write("Database cleared successfully" )329	elif c1 == "block":330	  blkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and code=:2 ", registration_key(),c2).get()331	  if blkuser is not None:332	    name = blkuser.name333	    mobno =blkuser.mobno334	    pswd = blkuser.pwd335	    pro =blkuser.pro336	    db.delete(blkuser)337	  usrdata = Registration(parent=registration_key())			338	  usrdata.code = '0'339	  usrdata.mobno = mobno340	  usrdata.name = name341	  usrdata.pro = 
pro342	  usrdata.pwd = pswd343	  usrdata.put()	344	  self.response.out.write("User Blocked :"+name+"  "+mobno )345	elif c1=="unblock":346	  ublkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and mobno=:2 ", registration_key(),c2).get()347	  if ublkuser is not None:348	    self.response.out.write("User Unblocked :"+ublkuser.name+"  "+ublkuser.mobno )349	    db.delete(ublkuser)	  350	    351      352          353    else:354                  op = urllib2.build_opener()355		  url='http://thesaurus.com/browse/'		  356		  #self.response.out.write("<html><body>")357		  l=[]358		  l.append(url)359		  l.append(data[0])360		  url=''.join(l)361		  intake=op.open(url)362		  page=intake.read()363		  line=page.split('\n')364		  new=set([])365		  for i in range (len(line)) :366			mat1=re.match(r'.*Antonyms:.*',line[i],re.M)367			if mat1:368			      i=i+2
...test_url_policy_open.py
Source:test_url_policy_open.py  
...61            self.follow_reference_calls = []62        def follow_reference(self, url):63            self.follow_reference_calls.append(url)64            return self._reference_values[url]65    def make_branch_opener(self, should_follow_references, references,66                           unsafe_urls=None):67        policy = _BlacklistPolicy(should_follow_references, unsafe_urls)68        opener = self.StubbedBranchOpener(references, policy)69        return opener70    def test_check_initial_url(self):71        # check_and_follow_branch_reference rejects all URLs that are not72        # allowed.73        opener = self.make_branch_opener(None, [], set(['a']))74        self.assertRaises(75            BadUrl, opener.check_and_follow_branch_reference, 'a')76    def test_not_reference(self):77        # When branch references are forbidden, check_and_follow_branch_reference78        # does not raise on non-references.79        opener = self.make_branch_opener(False, ['a', None])80        self.assertEqual(81            'a', opener.check_and_follow_branch_reference('a'))82        self.assertEqual(['a'], opener.follow_reference_calls)83    def test_branch_reference_forbidden(self):84        # check_and_follow_branch_reference raises BranchReferenceForbidden if85        # branch references are forbidden and the source URL points to a86        # branch reference.87        opener = self.make_branch_opener(False, ['a', 'b'])88        self.assertRaises(89            BranchReferenceForbidden,90            opener.check_and_follow_branch_reference, 'a')91        self.assertEqual(['a'], opener.follow_reference_calls)92    def test_allowed_reference(self):93        # check_and_follow_branch_reference does not raise if following references94        # is allowed and the source URL points to a branch reference to a95        # permitted location.96        opener = self.make_branch_opener(True, ['a', 'b', None])97        self.assertEqual(98            'b', 
opener.check_and_follow_branch_reference('a'))99        self.assertEqual(['a', 'b'], opener.follow_reference_calls)100    def test_check_referenced_urls(self):101        # check_and_follow_branch_reference checks if the URL a reference points102        # to is safe.103        opener = self.make_branch_opener(104            True, ['a', 'b', None], unsafe_urls=set('b'))105        self.assertRaises(106            BadUrl, opener.check_and_follow_branch_reference, 'a')107        self.assertEqual(['a'], opener.follow_reference_calls)108    def test_self_referencing_branch(self):109        # check_and_follow_branch_reference raises BranchReferenceLoopError if110        # following references is allowed and the source url points to a111        # self-referencing branch reference.112        opener = self.make_branch_opener(True, ['a', 'a'])113        self.assertRaises(114            BranchLoopError, opener.check_and_follow_branch_reference, 'a')115        self.assertEqual(['a'], opener.follow_reference_calls)116    def test_branch_reference_loop(self):117        # check_and_follow_branch_reference raises BranchReferenceLoopError if118        # following references is allowed and the source url points to a loop119        # of branch references.120        references = ['a', 'b', 'a']121        opener = self.make_branch_opener(True, references)122        self.assertRaises(123            BranchLoopError, opener.check_and_follow_branch_reference, 'a')124        self.assertEqual(['a', 'b'], opener.follow_reference_calls)125class TrackingProber(BzrProber):126    """Subclass of BzrProber which tracks URLs it has been asked to open."""127    seen_urls = []128    @classmethod129    def probe_transport(klass, transport):130        klass.seen_urls.append(transport.base)131        return BzrProber.probe_transport(transport)132class TestBranchOpenerStacking(TestCaseWithTransport):133    def setUp(self):134        super(TestBranchOpenerStacking, self).setUp()135        
BranchOpener.install_hook()136    def make_branch_opener(self, allowed_urls, probers=None):137        policy = WhitelistPolicy(True, allowed_urls, True)138        return BranchOpener(policy, probers)139    def test_probers(self):140        # Only the specified probers should be used141        b = self.make_branch('branch')142        opener = self.make_branch_opener([b.base], probers=[])143        self.assertRaises(NotBranchError, opener.open, b.base)144        opener = self.make_branch_opener([b.base], probers=[BzrProber])145        self.assertEqual(b.base, opener.open(b.base).base)146    def test_default_probers(self):147        # If no probers are specified to the constructor148        # of BranchOpener, then a safe set will be used,149        # rather than all probers registered in bzr.150        self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)151        ControlDirFormat.register_prober(TrackingProber)152        # Open a location without any branches, so that all probers are153        # tried.154        # First, check that the TrackingProber tracks correctly.155        TrackingProber.seen_urls = []156        opener = self.make_branch_opener(["."], probers=[TrackingProber])157        self.assertRaises(NotBranchError, opener.open, ".")158        self.assertEqual(1, len(TrackingProber.seen_urls))159        TrackingProber.seen_urls = []160        # And make sure it's registered in such a way that ControlDir.open would161        # use it.162        self.assertRaises(NotBranchError, ControlDir.open, ".")163        self.assertEqual(1, len(TrackingProber.seen_urls))164    def test_allowed_url(self):165        # the opener does not raise an exception for branches stacked on166        # branches with allowed URLs.167        stacked_on_branch = self.make_branch('base-branch', format='1.6')168        stacked_branch = self.make_branch('stacked-branch', format='1.6')169        stacked_branch.set_stacked_on_url(stacked_on_branch.base)170        opener = 
self.make_branch_opener(171            [stacked_branch.base, stacked_on_branch.base])172        # This doesn't raise an exception.173        opener.open(stacked_branch.base)174    def test_nstackable_repository(self):175        # treats branches with UnstackableRepositoryFormats as176        # being not stacked.177        branch = self.make_branch('unstacked', format='knit')178        opener = self.make_branch_opener([branch.base])179        # This doesn't raise an exception.180        opener.open(branch.base)181    def test_allowed_relative_url(self):182        # passes on absolute urls to check_one_url, even if the183        # value of stacked_on_location in the config is set to a relative URL.184        stacked_on_branch = self.make_branch('base-branch', format='1.6')185        stacked_branch = self.make_branch('stacked-branch', format='1.6')186        stacked_branch.set_stacked_on_url('../base-branch')187        opener = self.make_branch_opener(188            [stacked_branch.base, stacked_on_branch.base])189        # Note that stacked_on_branch.base is not '../base-branch', it's an190        # absolute URL.191        self.assertNotEqual('../base-branch', stacked_on_branch.base)192        # This doesn't raise an exception.193        opener.open(stacked_branch.base)194    def test_allowed_relative_nested(self):195        # Relative URLs are resolved relative to the stacked branch.196        self.get_transport().mkdir('subdir')197        a = self.make_branch('subdir/a', format='1.6')198        b = self.make_branch('b', format='1.6')199        b.set_stacked_on_url('../subdir/a')200        c = self.make_branch('subdir/c', format='1.6')201        c.set_stacked_on_url('../../b')202        opener = self.make_branch_opener([c.base, b.base, a.base])203        # This doesn't raise an exception.204        opener.open(c.base)205    def test_forbidden_url(self):206        # raises a BadUrl exception if a branch is stacked on a207        # branch with a forbidden URL.208      
  stacked_on_branch = self.make_branch('base-branch', format='1.6')209        stacked_branch = self.make_branch('stacked-branch', format='1.6')210        stacked_branch.set_stacked_on_url(stacked_on_branch.base)211        opener = self.make_branch_opener([stacked_branch.base])212        self.assertRaises(BadUrl, opener.open, stacked_branch.base)213    def test_forbidden_url_nested(self):214        # raises a BadUrl exception if a branch is stacked on a215        # branch that is in turn stacked on a branch with a forbidden URL.216        a = self.make_branch('a', format='1.6')217        b = self.make_branch('b', format='1.6')218        b.set_stacked_on_url(a.base)219        c = self.make_branch('c', format='1.6')220        c.set_stacked_on_url(b.base)221        opener = self.make_branch_opener([c.base, b.base])222        self.assertRaises(BadUrl, opener.open, c.base)223    def test_self_stacked_branch(self):224        # raises StackingLoopError if a branch is stacked on225        # itself. This avoids infinite recursion errors.226        a = self.make_branch('a', format='1.6')227        # Bazaar 1.17 and up make it harder to create branches like this.228        # It's still worth testing that we don't blow up in the face of them,229        # so we grovel around a bit to create one anyway.230        a.get_config().set_user_option('stacked_on_location', a.base)231        opener = self.make_branch_opener([a.base])232        self.assertRaises(BranchLoopError, opener.open, a.base)233    def test_loop_stacked_branch(self):234        # raises StackingLoopError if a branch is stacked in such235        # a way so that it is ultimately stacked on itself. e.g. 
a stacked on236        # b stacked on a.237        a = self.make_branch('a', format='1.6')238        b = self.make_branch('b', format='1.6')239        a.set_stacked_on_url(b.base)240        b.set_stacked_on_url(a.base)241        opener = self.make_branch_opener([a.base, b.base])242        self.assertRaises(BranchLoopError, opener.open, a.base)243        self.assertRaises(BranchLoopError, opener.open, b.base)244    def test_custom_opener(self):245        # A custom function for opening a control dir can be specified.246        a = self.make_branch('a', format='2a')247        b = self.make_branch('b', format='2a')248        b.set_stacked_on_url(a.base)249        TrackingProber.seen_urls = []250        opener = self.make_branch_opener(251            [a.base, b.base], probers=[TrackingProber])252        opener.open(b.base)253        self.assertEqual(254            set(TrackingProber.seen_urls), set([b.base, a.base]))255    def test_custom_opener_with_branch_reference(self):256        # A custom function for opening a control dir can be specified.257        a = self.make_branch('a', format='2a')258        b_dir = self.make_bzrdir('b')259        b = BranchReferenceFormat().initialize(b_dir, target_branch=a)260        TrackingProber.seen_urls = []261        opener = self.make_branch_opener(262            [a.base, b.base], probers=[TrackingProber])263        opener.open(b.base)264        self.assertEqual(265            set(TrackingProber.seen_urls), set([b.base, a.base]))266class TestOpenOnlyScheme(TestCaseWithTransport):267    """Tests for `open_only_scheme`."""268    def setUp(self):269        super(TestOpenOnlyScheme, self).setUp()270        BranchOpener.install_hook()271    def test_hook_does_not_interfere(self):272        # The transform_fallback_location hook does not interfere with regular273        # stacked branch access outside of open_only_scheme.274        self.make_branch('stacked')275        self.make_branch('stacked-on')...test_Request.py
Source:test_Request.py  
1# -*- coding: utf-8 -*-2# (c) 2018 Matt Martz <matt@sivel.net>3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)4from __future__ import absolute_import, division, print_function5__metaclass__ = type6import datetime7import os8from ansible.module_utils.urls import (Request, open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, RequestWithMethod,9                                       UnixHTTPHandler, UnixHTTPSConnection, httplib)10from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory11import pytest12from mock import call13if HAS_SSLCONTEXT:14    import ssl15@pytest.fixture16def urlopen_mock(mocker):17    return mocker.patch('ansible.module_utils.urls.urllib_request.urlopen')18@pytest.fixture19def install_opener_mock(mocker):20    return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')21def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):22    cookies = cookiejar.CookieJar()23    request = Request(24        headers={'foo': 'bar'},25        use_proxy=False,26        force=True,27        timeout=100,28        validate_certs=False,29        url_username='user',30        url_password='passwd',31        http_agent='ansible-tests',32        force_basic_auth=True,33        follow_redirects='all',34        client_cert='/tmp/client.pem',35        client_key='/tmp/client.key',36        cookies=cookies,37        unix_socket='/foo/bar/baz.sock',38        ca_path='/foo/bar/baz.pem',39    )40    fallback_mock = mocker.spy(request, '_fallback')41    r = request.open('GET', 'https://ansible.com')42    calls = [43        call(None, False),  # use_proxy44        call(None, True),  # force45        call(None, 100),  # timeout46        call(None, False),  # validate_certs47        call(None, 'user'),  # url_username48        call(None, 'passwd'),  # url_password49        call(None, 'ansible-tests'),  # http_agent50        call(None, True),  # 
force_basic_auth51        call(None, 'all'),  # follow_redirects52        call(None, '/tmp/client.pem'),  # client_cert53        call(None, '/tmp/client.key'),  # client_key54        call(None, cookies),  # cookies55        call(None, '/foo/bar/baz.sock'),  # unix_socket56        call(None, '/foo/bar/baz.pem'),  # ca_path57    ]58    fallback_mock.assert_has_calls(calls)59    assert fallback_mock.call_count == 14  # All but headers use fallback60    args = urlopen_mock.call_args[0]61    assert args[1] is None  # data, this is handled in the Request not urlopen62    assert args[2] == 100  # timeout63    req = args[0]64    assert req.headers == {65        'Authorization': b'Basic dXNlcjpwYXNzd2Q=',66        'Cache-control': 'no-cache',67        'Foo': 'bar',68        'User-agent': 'ansible-tests'69    }70    assert req.data is None71    assert req.get_method() == 'GET'72def test_Request_open(urlopen_mock, install_opener_mock):73    r = Request().open('GET', 'https://ansible.com/')74    args = urlopen_mock.call_args[0]75    assert args[1] is None  # data, this is handled in the Request not urlopen76    assert args[2] == 10  # timeout77    req = args[0]78    assert req.headers == {}79    assert req.data is None80    assert req.get_method() == 'GET'81    opener = install_opener_mock.call_args[0][0]82    handlers = opener.handlers83    if not HAS_SSLCONTEXT:84        expected_handlers = (85            SSLValidationHandler,86            RedirectHandlerFactory(),  # factory, get handler87        )88    else:89        expected_handlers = (90            RedirectHandlerFactory(),  # factory, get handler91        )92    found_handlers = []93    for handler in handlers:94        if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler':95            found_handlers.append(handler)96    assert len(found_handlers) == len(expected_handlers)97def test_Request_open_http(urlopen_mock, install_opener_mock):98    r = Request().open('GET', 
'http://ansible.com/')99    args = urlopen_mock.call_args[0]100    opener = install_opener_mock.call_args[0][0]101    handlers = opener.handlers102    found_handlers = []103    for handler in handlers:104        if isinstance(handler, SSLValidationHandler):105            found_handlers.append(handler)106    assert len(found_handlers) == 0107def test_Request_open_unix_socket(urlopen_mock, install_opener_mock):108    r = Request().open('GET', 'http://ansible.com/', unix_socket='/foo/bar/baz.sock')109    args = urlopen_mock.call_args[0]110    opener = install_opener_mock.call_args[0][0]111    handlers = opener.handlers112    found_handlers = []113    for handler in handlers:114        if isinstance(handler, UnixHTTPHandler):115            found_handlers.append(handler)116    assert len(found_handlers) == 1117def test_Request_open_https_unix_socket(urlopen_mock, install_opener_mock):118    r = Request().open('GET', 'https://ansible.com/', unix_socket='/foo/bar/baz.sock')119    args = urlopen_mock.call_args[0]120    opener = install_opener_mock.call_args[0][0]121    handlers = opener.handlers122    found_handlers = []123    for handler in handlers:124        if isinstance(handler, HTTPSClientAuthHandler):125            found_handlers.append(handler)126    assert len(found_handlers) == 1127    inst = found_handlers[0]._build_https_connection('foo')128    assert isinstance(inst, UnixHTTPSConnection)129def test_Request_open_ftp(urlopen_mock, install_opener_mock, mocker):130    mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)131    # Using ftp scheme should prevent the AssertionError side effect to fire132    r = Request().open('GET', 'ftp://foo@ansible.com/')133def test_Request_open_headers(urlopen_mock, install_opener_mock):134    r = Request().open('GET', 'http://ansible.com/', headers={'Foo': 'bar'})135    args = urlopen_mock.call_args[0]136    req = args[0]137    assert req.headers == {'Foo': 'bar'}138def 
test_Request_open_username(urlopen_mock, install_opener_mock):139    r = Request().open('GET', 'http://ansible.com/', url_username='user')140    opener = install_opener_mock.call_args[0][0]141    handlers = opener.handlers142    expected_handlers = (143        urllib_request.HTTPBasicAuthHandler,144        urllib_request.HTTPDigestAuthHandler,145    )146    found_handlers = []147    for handler in handlers:148        if isinstance(handler, expected_handlers):149            found_handlers.append(handler)150    assert len(found_handlers) == 2151    assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}152def test_Request_open_username_in_url(urlopen_mock, install_opener_mock):153    r = Request().open('GET', 'http://user2@ansible.com/')154    opener = install_opener_mock.call_args[0][0]155    handlers = opener.handlers156    expected_handlers = (157        urllib_request.HTTPBasicAuthHandler,158        urllib_request.HTTPDigestAuthHandler,159    )160    found_handlers = []161    for handler in handlers:162        if isinstance(handler, expected_handlers):163            found_handlers.append(handler)164    assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}165def test_Request_open_username_force_basic(urlopen_mock, install_opener_mock):166    r = Request().open('GET', 'http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)167    opener = install_opener_mock.call_args[0][0]168    handlers = opener.handlers169    expected_handlers = (170        urllib_request.HTTPBasicAuthHandler,171        urllib_request.HTTPDigestAuthHandler,172    )173    found_handlers = []174    for handler in handlers:175        if isinstance(handler, expected_handlers):176            found_handlers.append(handler)177    assert len(found_handlers) == 0178    args = urlopen_mock.call_args[0]179    req = args[0]180    assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='181def 
test_Request_open_auth_in_netloc(urlopen_mock, install_opener_mock):182    r = Request().open('GET', 'http://user:passwd@ansible.com/')183    args = urlopen_mock.call_args[0]184    req = args[0]185    assert req.get_full_url() == 'http://ansible.com/'186    opener = install_opener_mock.call_args[0][0]187    handlers = opener.handlers188    expected_handlers = (189        urllib_request.HTTPBasicAuthHandler,190        urllib_request.HTTPDigestAuthHandler,191    )192    found_handlers = []193    for handler in handlers:194        if isinstance(handler, expected_handlers):195            found_handlers.append(handler)196    assert len(found_handlers) == 2197def test_Request_open_netrc(urlopen_mock, install_opener_mock, monkeypatch):198    here = os.path.dirname(__file__)199    monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc'))200    r = Request().open('GET', 'http://ansible.com/')201    args = urlopen_mock.call_args[0]202    req = args[0]203    assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='204    r = Request().open('GET', 'http://foo.ansible.com/')205    args = urlopen_mock.call_args[0]206    req = args[0]207    assert 'Authorization' not in req.headers208    monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))209    r = Request().open('GET', 'http://ansible.com/')210    args = urlopen_mock.call_args[0]211    req = args[0]212    assert 'Authorization' not in req.headers213def test_Request_open_no_proxy(urlopen_mock, install_opener_mock, mocker):214    build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')215    r = Request().open('GET', 'http://ansible.com/', use_proxy=False)216    handlers = build_opener_mock.call_args[0]217    found_handlers = []218    for handler in handlers:219        if isinstance(handler, urllib_request.ProxyHandler):220            found_handlers.append(handler)221    assert len(found_handlers) == 1222@pytest.mark.skipif(not HAS_SSLCONTEXT, 
reason="requires SSLContext")223def test_Request_open_no_validate_certs(urlopen_mock, install_opener_mock):224    r = Request().open('GET', 'https://ansible.com/', validate_certs=False)225    opener = install_opener_mock.call_args[0][0]226    handlers = opener.handlers227    ssl_handler = None228    for handler in handlers:229        if isinstance(handler, HTTPSClientAuthHandler):230            ssl_handler = handler231            break232    assert ssl_handler is not None233    inst = ssl_handler._build_https_connection('foo')234    assert isinstance(inst, httplib.HTTPSConnection)235    context = ssl_handler._context236    assert context.protocol == ssl.PROTOCOL_SSLv23237    if ssl.OP_NO_SSLv2:238        assert context.options & ssl.OP_NO_SSLv2239    assert context.options & ssl.OP_NO_SSLv3240    assert context.verify_mode == ssl.CERT_NONE241    assert context.check_hostname is False242def test_Request_open_client_cert(urlopen_mock, install_opener_mock):243    here = os.path.dirname(__file__)244    client_cert = os.path.join(here, 'fixtures/client.pem')245    client_key = os.path.join(here, 'fixtures/client.key')246    r = Request().open('GET', 'https://ansible.com/', client_cert=client_cert, client_key=client_key)247    opener = install_opener_mock.call_args[0][0]248    handlers = opener.handlers249    ssl_handler = None250    for handler in handlers:251        if isinstance(handler, HTTPSClientAuthHandler):252            ssl_handler = handler253            break254    assert ssl_handler is not None255    assert ssl_handler.client_cert == client_cert256    assert ssl_handler.client_key == client_key257    https_connection = ssl_handler._build_https_connection('ansible.com')258    assert https_connection.key_file == client_key259    assert https_connection.cert_file == client_cert260def test_Request_open_cookies(urlopen_mock, install_opener_mock):261    r = Request().open('GET', 'https://ansible.com/', cookies=cookiejar.CookieJar())262    opener = 
install_opener_mock.call_args[0][0]263    handlers = opener.handlers264    cookies_handler = None265    for handler in handlers:266        if isinstance(handler, urllib_request.HTTPCookieProcessor):267            cookies_handler = handler268            break269    assert cookies_handler is not None270def test_Request_open_invalid_method(urlopen_mock, install_opener_mock):271    r = Request().open('UNKNOWN', 'https://ansible.com/')272    args = urlopen_mock.call_args[0]273    req = args[0]274    assert req.data is None275    assert req.get_method() == 'UNKNOWN'276    # assert r.status == 504277def test_Request_open_custom_method(urlopen_mock, install_opener_mock):278    r = Request().open('DELETE', 'https://ansible.com/')279    args = urlopen_mock.call_args[0]280    req = args[0]281    assert isinstance(req, RequestWithMethod)282def test_Request_open_user_agent(urlopen_mock, install_opener_mock):283    r = Request().open('GET', 'https://ansible.com/', http_agent='ansible-tests')284    args = urlopen_mock.call_args[0]285    req = args[0]286    assert req.headers.get('User-agent') == 'ansible-tests'287def test_Request_open_force(urlopen_mock, install_opener_mock):288    r = Request().open('GET', 'https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())289    args = urlopen_mock.call_args[0]290    req = args[0]291    assert req.headers.get('Cache-control') == 'no-cache'292    assert 'If-modified-since' not in req.headers293def test_Request_open_last_mod(urlopen_mock, install_opener_mock):294    now = datetime.datetime.now()295    r = Request().open('GET', 'https://ansible.com/', last_mod_time=now)296    args = urlopen_mock.call_args[0]297    req = args[0]298    assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S -0000')299def test_Request_open_headers_not_dict(urlopen_mock, install_opener_mock):300    with pytest.raises(ValueError):301        Request().open('GET', 'https://ansible.com/', headers=['bob'])302def 
test_Request_init_headers_not_dict(urlopen_mock, install_opener_mock):303    with pytest.raises(ValueError):304        Request(headers=['bob'])305@pytest.mark.parametrize('method,kwargs', [306    ('get', {}),307    ('options', {}),308    ('head', {}),309    ('post', {'data': None}),310    ('put', {'data': None}),311    ('patch', {'data': None}),312    ('delete', {}),313])314def test_methods(method, kwargs, mocker):315    expected = method.upper()316    open_mock = mocker.patch('ansible.module_utils.urls.Request.open')317    request = Request()318    getattr(request, method)('https://ansible.com')319    open_mock.assert_called_once_with(expected, 'https://ansible.com', **kwargs)320def test_open_url(urlopen_mock, install_opener_mock, mocker):321    req_mock = mocker.patch('ansible.module_utils.urls.Request.open')322    open_url('https://ansible.com/')323    req_mock.assert_called_once_with('GET', 'https://ansible.com/', data=None, headers=None, use_proxy=True,324                                     force=False, last_mod_time=None, timeout=10, validate_certs=True,325                                     url_username=None, url_password=None, http_agent=None,326                                     force_basic_auth=False, follow_redirects='urllib2',327                                     client_cert=None, client_key=None, cookies=None, use_gssapi=False,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
