How to use selectors method in autotest

Best Python code snippet using autotest_python

test_selectors.py

Source:test_selectors.py Github

copy

Full Screen

1import errno2import os3import random4import selectors5import signal6import socket7import sys8from test import support9from time import sleep10import unittest11import unittest.mock12import tempfile13from time import monotonic as time14try:15 import resource16except ImportError:17 resource = None18if hasattr(socket, 'socketpair'):19 socketpair = socket.socketpair20else:21 def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):22 with socket.socket(family, type, proto) as l:23 l.bind((support.HOST, 0))24 l.listen()25 c = socket.socket(family, type, proto)26 try:27 c.connect(l.getsockname())28 caddr = c.getsockname()29 while True:30 a, addr = l.accept()31 # check that we've got the correct client32 if addr == caddr:33 return c, a34 a.close()35 except OSError:36 c.close()37 raise38def find_ready_matching(ready, flag):39 match = []40 for key, events in ready:41 if events & flag:42 match.append(key.fileobj)43 return match44class BaseSelectorTestCase(unittest.TestCase):45 def make_socketpair(self):46 rd, wr = socketpair()47 self.addCleanup(rd.close)48 self.addCleanup(wr.close)49 return rd, wr50 def test_register(self):51 s = self.SELECTOR()52 self.addCleanup(s.close)53 rd, wr = self.make_socketpair()54 key = s.register(rd, selectors.EVENT_READ, "data")55 self.assertIsInstance(key, selectors.SelectorKey)56 self.assertEqual(key.fileobj, rd)57 self.assertEqual(key.fd, rd.fileno())58 self.assertEqual(key.events, selectors.EVENT_READ)59 self.assertEqual(key.data, "data")60 # register an unknown event61 self.assertRaises(ValueError, s.register, 0, 999999)62 # register an invalid FD63 self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)64 # register twice65 self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)66 # register the same FD, but with a different object67 self.assertRaises(KeyError, s.register, rd.fileno(),68 selectors.EVENT_READ)69 def test_unregister(self):70 s = self.SELECTOR()71 self.addCleanup(s.close)72 rd, wr = self.make_socketpair()73 s.register(rd, selectors.EVENT_READ)74 s.unregister(rd)75 # unregister an unknown file obj76 self.assertRaises(KeyError, s.unregister, 999999)77 # unregister twice78 self.assertRaises(KeyError, s.unregister, rd)79 def test_unregister_after_fd_close(self):80 s = self.SELECTOR()81 self.addCleanup(s.close)82 rd, wr = self.make_socketpair()83 r, w = rd.fileno(), wr.fileno()84 s.register(r, selectors.EVENT_READ)85 s.register(w, selectors.EVENT_WRITE)86 rd.close()87 wr.close()88 s.unregister(r)89 s.unregister(w)90 @unittest.skipUnless(os.name == 'posix', "requires posix")91 def test_unregister_after_fd_close_and_reuse(self):92 s = self.SELECTOR()93 self.addCleanup(s.close)94 rd, wr = self.make_socketpair()95 r, w = rd.fileno(), wr.fileno()96 s.register(r, selectors.EVENT_READ)97 s.register(w, selectors.EVENT_WRITE)98 rd2, wr2 = self.make_socketpair()99 rd.close()100 wr.close()101 os.dup2(rd2.fileno(), r)102 os.dup2(wr2.fileno(), w)103 self.addCleanup(os.close, r)104 self.addCleanup(os.close, w)105 s.unregister(r)106 s.unregister(w)107 def test_unregister_after_socket_close(self):108 s = self.SELECTOR()109 self.addCleanup(s.close)110 rd, wr = self.make_socketpair()111 s.register(rd, selectors.EVENT_READ)112 s.register(wr, selectors.EVENT_WRITE)113 rd.close()114 wr.close()115 s.unregister(rd)116 s.unregister(wr)117 def test_modify(self):118 s = self.SELECTOR()119 self.addCleanup(s.close)120 rd, wr = self.make_socketpair()121 key = s.register(rd, selectors.EVENT_READ)122 # modify events123 key2 = s.modify(rd, selectors.EVENT_WRITE)124 self.assertNotEqual(key.events, key2.events)125 self.assertEqual(key2, s.get_key(rd))126 s.unregister(rd)127 # modify data128 d1 = object()129 d2 = object()130 key = s.register(rd, selectors.EVENT_READ, d1)131 key2 = s.modify(rd, selectors.EVENT_READ, d2)132 self.assertEqual(key.events, key2.events)133 self.assertNotEqual(key.data, key2.data)134 self.assertEqual(key2, s.get_key(rd))135 self.assertEqual(key2.data, d2)136 # modify unknown file obj137 self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)138 # modify use a shortcut139 d3 = object()140 s.register = unittest.mock.Mock()141 s.unregister = unittest.mock.Mock()142 s.modify(rd, selectors.EVENT_READ, d3)143 self.assertFalse(s.register.called)144 self.assertFalse(s.unregister.called)145 def test_modify_unregister(self):146 # Make sure the fd is unregister()ed in case of error on147 # modify(): http://bugs.python.org/issue30014148 if self.SELECTOR.__name__ == 'EpollSelector':149 patch = unittest.mock.patch(150 'selectors.EpollSelector._selector_cls')151 elif self.SELECTOR.__name__ == 'PollSelector':152 patch = unittest.mock.patch(153 'selectors.PollSelector._selector_cls')154 elif self.SELECTOR.__name__ == 'DevpollSelector':155 patch = unittest.mock.patch(156 'selectors.DevpollSelector._selector_cls')157 else:158 raise self.skipTest("")159 with patch as m:160 m.return_value.modify = unittest.mock.Mock(161 side_effect=ZeroDivisionError)162 s = self.SELECTOR()163 self.addCleanup(s.close)164 rd, wr = self.make_socketpair()165 s.register(rd, selectors.EVENT_READ)166 self.assertEqual(len(s._map), 1)167 with self.assertRaises(ZeroDivisionError):168 s.modify(rd, selectors.EVENT_WRITE)169 self.assertEqual(len(s._map), 0)170 def test_close(self):171 s = self.SELECTOR()172 self.addCleanup(s.close)173 mapping = s.get_map()174 rd, wr = self.make_socketpair()175 s.register(rd, selectors.EVENT_READ)176 s.register(wr, selectors.EVENT_WRITE)177 s.close()178 self.assertRaises(RuntimeError, s.get_key, rd)179 self.assertRaises(RuntimeError, s.get_key, wr)180 self.assertRaises(KeyError, mapping.__getitem__, rd)181 self.assertRaises(KeyError, mapping.__getitem__, wr)182 def test_get_key(self):183 s = self.SELECTOR()184 self.addCleanup(s.close)185 rd, wr = self.make_socketpair()186 key = s.register(rd, selectors.EVENT_READ, "data")187 self.assertEqual(key, s.get_key(rd))188 # unknown file obj189 self.assertRaises(KeyError, s.get_key, 999999)190 def test_get_map(self):191 s = self.SELECTOR()192 self.addCleanup(s.close)193 rd, wr = self.make_socketpair()194 keys = s.get_map()195 self.assertFalse(keys)196 self.assertEqual(len(keys), 0)197 self.assertEqual(list(keys), [])198 key = s.register(rd, selectors.EVENT_READ, "data")199 self.assertIn(rd, keys)200 self.assertEqual(key, keys[rd])201 self.assertEqual(len(keys), 1)202 self.assertEqual(list(keys), [rd.fileno()])203 self.assertEqual(list(keys.values()), [key])204 # unknown file obj205 with self.assertRaises(KeyError):206 keys[999999]207 # Read-only mapping208 with self.assertRaises(TypeError):209 del keys[rd]210 def test_select(self):211 s = self.SELECTOR()212 self.addCleanup(s.close)213 rd, wr = self.make_socketpair()214 s.register(rd, selectors.EVENT_READ)215 wr_key = s.register(wr, selectors.EVENT_WRITE)216 result = s.select()217 for key, events in result:218 self.assertTrue(isinstance(key, selectors.SelectorKey))219 self.assertTrue(events)220 self.assertFalse(events & ~(selectors.EVENT_READ |221 selectors.EVENT_WRITE))222 self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)223 def test_context_manager(self):224 s = self.SELECTOR()225 self.addCleanup(s.close)226 rd, wr = self.make_socketpair()227 with s as sel:228 sel.register(rd, selectors.EVENT_READ)229 sel.register(wr, selectors.EVENT_WRITE)230 self.assertRaises(RuntimeError, s.get_key, rd)231 self.assertRaises(RuntimeError, s.get_key, wr)232 def test_fileno(self):233 s = self.SELECTOR()234 self.addCleanup(s.close)235 if hasattr(s, 'fileno'):236 fd = s.fileno()237 self.assertTrue(isinstance(fd, int))238 self.assertGreaterEqual(fd, 0)239 def test_selector(self):240 s = self.SELECTOR()241 self.addCleanup(s.close)242 NUM_SOCKETS = 12243 MSG = b" This is a test."244 MSG_LEN = len(MSG)245 readers = []246 writers = []247 r2w = {}248 w2r = {}249 for i in range(NUM_SOCKETS):250 rd, wr = self.make_socketpair()251 s.register(rd, selectors.EVENT_READ)252 s.register(wr, selectors.EVENT_WRITE)253 readers.append(rd)254 writers.append(wr)255 r2w[rd] = wr256 w2r[wr] = rd257 bufs = []258 while writers:259 ready = s.select()260 ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)261 if not ready_writers:262 self.fail("no sockets ready for writing")263 wr = random.choice(ready_writers)264 wr.send(MSG)265 for i in range(10):266 ready = s.select()267 ready_readers = find_ready_matching(ready,268 selectors.EVENT_READ)269 if ready_readers:270 break271 # there might be a delay between the write to the write end and272 # the read end is reported ready273 sleep(0.1)274 else:275 self.fail("no sockets ready for reading")276 self.assertEqual([w2r[wr]], ready_readers)277 rd = ready_readers[0]278 buf = rd.recv(MSG_LEN)279 self.assertEqual(len(buf), MSG_LEN)280 bufs.append(buf)281 s.unregister(r2w[rd])282 s.unregister(rd)283 writers.remove(r2w[rd])284 self.assertEqual(bufs, [MSG] * NUM_SOCKETS)285 @unittest.skipIf(sys.platform == 'win32',286 'select.select() cannot be used with empty fd sets')287 def test_empty_select(self):288 # Issue #23009: Make sure EpollSelector.select() works when no FD is289 # registered.290 s = self.SELECTOR()291 self.addCleanup(s.close)292 self.assertEqual(s.select(timeout=0), [])293 def test_timeout(self):294 s = self.SELECTOR()295 self.addCleanup(s.close)296 rd, wr = self.make_socketpair()297 s.register(wr, selectors.EVENT_WRITE)298 t = time()299 self.assertEqual(1, len(s.select(0)))300 self.assertEqual(1, len(s.select(-1)))301 self.assertLess(time() - t, 0.5)302 s.unregister(wr)303 s.register(rd, selectors.EVENT_READ)304 t = time()305 self.assertFalse(s.select(0))306 self.assertFalse(s.select(-1))307 self.assertLess(time() - t, 0.5)308 t0 = time()309 self.assertFalse(s.select(1))310 t1 = time()311 dt = t1 - t0312 # Tolerate 2.0 seconds for very slow buildbots313 self.assertTrue(0.8 <= dt <= 2.0, dt)314 @unittest.skipUnless(hasattr(signal, "alarm"),315 "signal.alarm() required for this test")316 def test_select_interrupt_exc(self):317 s = self.SELECTOR()318 self.addCleanup(s.close)319 rd, wr = self.make_socketpair()320 class InterruptSelect(Exception):321 pass322 def handler(*args):323 raise InterruptSelect324 orig_alrm_handler = signal.signal(signal.SIGALRM, handler)325 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)326 try:327 signal.alarm(1)328 s.register(rd, selectors.EVENT_READ)329 t = time()330 # select() is interrupted by a signal which raises an exception331 with self.assertRaises(InterruptSelect):332 s.select(30)333 # select() was interrupted before the timeout of 30 seconds334 self.assertLess(time() - t, 5.0)335 finally:336 signal.alarm(0)337 @unittest.skipUnless(hasattr(signal, "alarm"),338 "signal.alarm() required for this test")339 def test_select_interrupt_noraise(self):340 s = self.SELECTOR()341 self.addCleanup(s.close)342 rd, wr = self.make_socketpair()343 orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)344 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)345 try:346 signal.alarm(1)347 s.register(rd, selectors.EVENT_READ)348 t = time()349 # select() is interrupted by a signal, but the signal handler doesn't350 # raise an exception, so select() should by retries with a recomputed351 # timeout352 self.assertFalse(s.select(1.5))353 self.assertGreaterEqual(time() - t, 1.0)354 finally:355 signal.alarm(0)356class ScalableSelectorMixIn:357 # see issue #18963 for why it's skipped on older OS X versions358 @support.requires_mac_ver(10, 5)359 @unittest.skipUnless(resource, "Test needs resource module")360 def test_above_fd_setsize(self):361 # A scalable implementation should have no problem with more than362 # FD_SETSIZE file descriptors. Since we don't know the value, we just363 # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.364 soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)365 try:366 resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))367 self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,368 (soft, hard))369 NUM_FDS = min(hard, 2**16)370 except (OSError, ValueError):371 NUM_FDS = soft372 # guard for already allocated FDs (stdin, stdout...)373 NUM_FDS -= 32374 s = self.SELECTOR()375 self.addCleanup(s.close)376 for i in range(NUM_FDS // 2):377 try:378 rd, wr = self.make_socketpair()379 except OSError:380 # too many FDs, skip - note that we should only catch EMFILE381 # here, but apparently *BSD and Solaris can fail upon connect()382 # or bind() with EADDRNOTAVAIL, so let's be safe383 self.skipTest("FD limit reached")384 try:385 s.register(rd, selectors.EVENT_READ)386 s.register(wr, selectors.EVENT_WRITE)387 except OSError as e:388 if e.errno == errno.ENOSPC:389 # this can be raised by epoll if we go over390 # fs.epoll.max_user_watches sysctl391 self.skipTest("FD limit reached")392 raise393 try:394 fds = s.select()395 except OSError as e:396 if e.errno == errno.EINVAL and sys.platform == 'darwin':397 # unexplainable errors on macOS don't need to fail the test398 self.skipTest("Invalid argument error calling poll()")399 raise400 self.assertEqual(NUM_FDS // 2, len(fds))401class DefaultSelectorTestCase(BaseSelectorTestCase):402 SELECTOR = selectors.DefaultSelector403class SelectSelectorTestCase(BaseSelectorTestCase):404 SELECTOR = selectors.SelectSelector405@unittest.skipUnless(hasattr(selectors, 'PollSelector'),406 "Test needs selectors.PollSelector")407class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):408 SELECTOR = getattr(selectors, 'PollSelector', None)409@unittest.skipUnless(hasattr(selectors, 'EpollSelector'),410 "Test needs selectors.EpollSelector")411class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):412 SELECTOR = getattr(selectors, 'EpollSelector', None)413 def test_register_file(self):414 # epoll(7) returns EPERM when given a file to watch415 s = self.SELECTOR()416 with tempfile.NamedTemporaryFile() as f:417 with self.assertRaises(IOError):418 s.register(f, selectors.EVENT_READ)419 # the SelectorKey has been removed420 with self.assertRaises(KeyError):421 s.get_key(f)422@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),423 "Test needs selectors.KqueueSelector)")424class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):425 SELECTOR = getattr(selectors, 'KqueueSelector', None)426 def test_register_bad_fd(self):427 # a file descriptor that's been closed should raise an OSError428 # with EBADF429 s = self.SELECTOR()430 bad_f = support.make_bad_fd()431 with self.assertRaises(OSError) as cm:432 s.register(bad_f, selectors.EVENT_READ)433 self.assertEqual(cm.exception.errno, errno.EBADF)434 # the SelectorKey has been removed435 with self.assertRaises(KeyError):436 s.get_key(bad_f)437@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),438 "Test needs selectors.DevpollSelector")439class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):440 SELECTOR = getattr(selectors, 'DevpollSelector', None)441def test_main():442 tests = [DefaultSelectorTestCase, SelectSelectorTestCase,443 PollSelectorTestCase, EpollSelectorTestCase,444 KqueueSelectorTestCase, DevpollSelectorTestCase]445 support.run_unittest(*tests)446 support.reap_children()447if __name__ == "__main__":...

Full Screen

Full Screen

stores.py

Source:stores.py Github

copy

Full Screen

1from selenium import webdriver2from selenium.common.exceptions import TimeoutException3from selenium.webdriver.support import expected_conditions as EC4from selenium.webdriver.common.by import By5from selenium.webdriver.common.keys import Keys6from selenium.webdriver.support.ui import WebDriverWait7import loc_data8import aux_func9def academy(obj):10 price_selectors = {"input#dlItemPrice":"value",}11 sale_selectors = {"span#currentPrice":"innerHTML",}12 broken_link_selectors = {"p#search_results_total_count":"innerHTML"}13 try:14 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)15 except:16 obj.log("Failed to aquire pricing data")17 #No third party18 #check out of stock19 try:20 try:21 oos = obj._driver.find_element_by_css_selector("button#add2CartBtn").get_attribute("innerHTML")22 except:23 oos = "in stock"24 if "Out of Stock" in oos:25 obj.set_out_of_stock()26 except:27 obj.log("Out of stock check failed")28 finally:29 try:30 obj.kill_driver()31 except:32 obj.log("Error Deleting Driver")33 return34def acehardware(obj):35 price_selectors = {"div.productPrice span script":"innerHTML",}36 try:37 obj.pricing(price_selectors)38 except:39 obj.log("Failed to aquire pricing data")40 #No third party41 #No out of stock42 finally:43 try:44 obj.kill_driver()45 except:46 obj.log("Error Deleting Driver")47 return48def autozone(obj):49 price_selectors = {"table.part-pricing-info td.price.base-price" : "innerText",}50 sale_selectors = {"span.price.light-gray>strong":"innerHTML"}51 broken_link_selectors = {"":""}52 loc_ins="loc_data.autozone(self)"53 try:54 obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)55 except:56 obj.log("Failed to acquire pricing data")57 #No third party58 #out of stock check59 try:60 if obj._find_data("div.button-bar-msg-out-of-stock","innerText|||Not Available"):61 obj.set_out_of_stock()62 except:63 obj.log("out of stock check failed")64 finally:65 try:66 obj.kill_driver()67 except:68 obj.log("Error Deleting Driver")69 return70def basspro(obj):71 loc_ins = """72bpsku = loc_data.basspro(obj)73for p,value in price_dict.iteritems():74price_dict[p.format(bpsku)] = price_dict.pop(p)75for p,value in sale_dict.iteritems():76sale_dict[p.format(bpsku)] = sale_dict.pop(p)"""77 price_selectors = {"div.top.namePartPriceContainer span[itemprop=price]" : "innerHTML","span#listPrice_{}.old_price":"innerHTML",\78 "span#offerPrice_{} > span":"innerHTML","div[itemprop=offers]>span[itemprop=price]":"content"}79 sale_selectors = {"span#offerPrice_{}.price.sale > span":"innerHTML"}80 broken_link_selectors = {"":""}81 try:82 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)83 except:84 obj.log("Failed to acqure pricing data")85 finally:86 try:87 obj.kill_driver()88 except:89 obj.log("Error Deleting Driver")90 return91def blain(obj):92 price_selectors = {"meta[itemprop=lowprice]":"content",\93 "div.active-price>div.price>span":"innerHTML",\94 "div.original-price>span.price>span":"innerHTML"}95 sale_selectors = {"div.active-price.promo > div.price > span:not([class])":"innerHTML",}96 broken_link_selectors = {"div.list-header-text > span":"innerHTML"}97 try:98 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)99 except:100 obj.log("Failed to acquire pricing data")101 #No third party102 try:103 if not obj._find_data("span.stock-msg.in-stock"):104 obj.set_out_of_stock()105 except:106 obj.log("Out of stock check failed")107 finally:108 try:109 obj.kill_driver()110 except:111 obj.log("Error Deleting Driver")112 return113def bootbarn(obj):114 price_selectors = {"span.price-original.price-holder-alt":"innerHTML",\115 "h6.product-callout-title > strong":"innerHTML"}116 sale_selectors = {"h6.product-callout-title > strong":"innerHTML"}117 broken_link_selectors = {"":""}118 try:119 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)120 except:121 obj.log("Failed to aquire pricing data")122 #no third Party123 #no out of stock124 finally:125 try:126 obj.kill_driver()127 except:128 obj.log("Error Deleting Driver")129 return130def cabela(obj):131 price_selectors = {"dd.regularnprange":"innerHTML",\132 "div.price > dl > dd.nprange":"innerHTML",\133 "div.price > dl > dd.prange":"innerHTML"}134 sale_selectors = {"dd.saleprice":"innerHTML"}135 broken_link_selectors = {"":""}136 try:137 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)138 except:139 obj.log("Failed to acquire pricing data")140 finally:141 try:142 obj.kill_driver()143 except:144 obj.log("Error Deleting Driver")145 return146def Chewy(obj):147 price_selectors = {"span.ga-eec__price" : "innerHTML",}148 sale_selectors = {"p.autoship-pricing" : "innerHTML"}149 broken_link_selectors = {"":""}150 try:151 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)152 except:153 raise154 obj.log("Failed to acquire pricing data")155 #No third party156 try:157 if obj._find_data("div#availability span.unavailable"):158 obj.set_out_of_stock()159 except:160 obj.log("Out of Stock check failed")161 finally:162 try:163 obj.kill_driver()164 except:165 obj.log("Error Deleting Driver")166 return167def dickeybub(obj):168 price_selectors = {"p.price > del > span.woocommerce-Price-amount.amount" : "innerHTML",\169 "p.price > span.woocommerce-Price-amount.amount" : "innerHTML",}170 sale_selectors = {"p.price > ins > span.woocommerce-Price-amount.amount" : "innerHTML",}171 try:172 obj.pricing(price_selectors,sale_selectors,)173 except:174 obj.log("Failed to acquire pricing data")175 finally:176 try:177 obj.kill_driver()178 except:179 obj.log("Error Deleting Driver")180 return181def farm_and_home(obj):182 if obj.comp_id == 25:183 loc_ins = "loc_data.farm_and_home(self,10)"184 elif obj.comp_id == 4:185 loc_ins = "loc_data.farm_and_home(self,2)"186 price_selectors = {"div.product-main-info > div.price-box > span.regular-price > span.price" : "innerHTML", "div.product-info-price span.price" : "innerHTML"}187 sale_selectors = {}188 broken_link_selectors = {}189 try:190 obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)191 except:192 obj.log("Failed to acquire pricing data")193 finally:194 try:195 obj.kill_driver()196 except:197 obj.log("Error Deleting Driver")198 #obj.log("Competitor: %d not yet defined" %obj.comp_id)199 #obj.set_undefined()200 return201def home_depot(obj):202 if obj.comp_id == 23:203 loc_ins = "loc_data.home_depot(self,62226)"204 elif obj.comp_id == 5:205 loc_ins = "loc_data.home_depot(self,63028)"206 elif obj.comp_id == 17:207 loc_ins = "loc_data.home_depot(self,62650)"208 price_selectors = {"input#ciItemPrice":"value","span#ajaxPriceStrikeThru":"innerHTML","span#ajaxPriceAlt":"innerHTML",\209 "span#ajaxPrice":"content"}210 sale_selectors = {"span#ajaxPrice":"content"}211 broken_link_selectors = {"div.buybelt__flex-wrapper.buybelt__store-wrapper span.u__text--danger":"innerHTML|||Unavailable",\212 "div#productinfo_ctn > div.error >p":"innerHTML|||not currently available"}213 try:214 obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)215 except:216 obj.log("Failed to acquire pricing data")217 #Out of stock check218 try:219#Checking for hidden field that indicates availability220 try:221 if obj._find_data("input#availableInLocalStore","value|||false"):222 obj.set_out_of_stock()223 except:224 pass225#Check for if the quantity box under Pick Up In Store is zero226 try:227 WebDriverWait(obj._driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.buybelt__box")))228 if '0' == str(obj._retrieve_data("span.quantity","innerHTML")):229 obj.set_out_of_stock()230 except TimeoutException as error:231 pass232 except:233 obj.log("Out of stock check failed")234 finally:235 try:236 obj.kill_driver()237 except:238 obj.log("Error Deleting Driver")239 return240def lowes(obj):241 if obj.comp_id == 6:242 loc_ins = "loc_data.lowes(self,63028,'Festus')"243 elif obj.comp_id == 15:244 loc_ins = "loc_data.lowes(self,63701,'Cape Girardeau')"245 elif obj.comp_id == 16:246 loc_ins = "loc_data.lowes(self,62704,'Springfield')"247 elif obj.comp_id == 24:248 loc_ins = "loc_data.lowes(self,62221,'Belleville')"249 price_selectors = {"input[name=productId]":"data-productprice","span.secondary-text.small-type.art-pd-wasPriceLbl":"innerHTML",\250 "span[itemprop=price]":"content","span.primary-font.jumbo.strong.art-pd-price":"innerHTML"}251 sale_selectors = {"span.primary-font.jumbo.strong.art-pd-contractPricing":"innerHTML"}252 broken_link_selectors = {"div.alert.alert-warning i.icon-error-outline.red":"",\253 " div.pd-shipping-delivery.met-fulfillment-delivery.grid-50.tablet-grid-50 div.media-body > p":"innerHTML|||unavailable"}254 try:255 obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)256 except:257 raise258 obj.log("Failed to acquire pricing data")259 #Out of stock check260 try:261 if obj._find_data("div.fulfillment-method div.media-body >p","innerHTML|||Unavailable"):262 obj.set_out_of_stock()263 except:264 obj.log("Out of stock check failed")265 finally:266 try:267 obj.kill_driver()268 except:269 obj.log("Error Deleting Driver")270 return271def menards(obj):272 if obj.comp_id == 7:273 loc_ins = "loc_data.menards(self,'3286')"274 elif obj.comp_id == 26:275 loc_ins = "loc_data.menards(self,'3334')"276 elif obj.comp_id == 27:277 loc_ins = "loc_data.menards(self,'3293')"278 price_selectors = {"span.bargainStrike" : "innerHTML",\279 "span.EDLP.fontSize16.fontBold.alignRight" : "innerHTML",\280 "span#totalItemPriceFloater" : "innerHTML",281 "span.finalPriceSpan.leftFloat":"innerText" }282 sale_selectors = {"span.bargainPrice" : "innerHTML", \283 "span#totalItemPriceFloater" : "innerHTML",284 "span.finalPriceSpan.leftFloat":"innerText"}285 broken_link_selectors = {"h3.resettitle":"innerHTML"}286 try:287 obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)288 except:289 obj.log("Failed to acquire pricing data")290 finally:291 try:292 obj.kill_driver()293 except:294 obj.log("Error Deleting Driver")295 return296def orscheln(obj):297 price_selectors = {"span.product_unit_price" : "innerHTML","meta[itemprop=price]" : "content"}298 sale_selectors = {"":""}299 broken_link_selectors = {"":""}300 try:301 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)302 except:303 obj.log("Failed to acquire pricing data")304 #No third party305 #No out of stock306 try:307 if obj._find_data("div.product-info-stock-sku","innerHTML|||Out of Stock Online"):308 obj.set_out_of_stock()309 except:310 obj.log("Out of Stock check failed")311 finally:312 try:313 obj.kill_driver()314 except:315 obj.log("Error Deleting Driver")316 return317def petsense(obj):318 obj.log("Competitor: %d not yet defined" %obj.comp_id)319 obj.set_undefined()320 return321 price_selectors = {"div#product_price span.money" : "innerHTML",}322 sale_selectors = {"":""}323 broken_link_selectors = {"div.selector-wrapper>select.single-option-selector":"innerHTML"}324 try:325 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)326 except:327 obj.log("Failed to acquire pricing data")328 #No third party329 #No out of stock330 finally:331 try:332 obj.kill_driver()333 except:334 obj.log("Error Deleting Driver")335 return336def ruralking(obj):337 price_selectors = {"meta[itemprop=price]":"content","meta[property='product:price:amount']":"content","span.price" : "innerHTML"}338 #"span[itemprop=offers] > span[itemprop=price]":"innerHTML"}339 sale_selectors = {"":""}340 broken_link_selectors = {"div.bluefoot-row.bluefoot-structural.with-media-background.not-found-404-page span" : "innerHTML|||SORRY",\341 "div.page-head-alt >h2":"innerHTML|||Sorry","div.page-head-alt >h3":"innerHTML|||Sorry"}342 try:343 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)344 except:345 obj.log("Failed to acquire pricing data")346 try:347 if "OUT OF STOCK" in obj._retrieve_data("div.product-shop h1[style]","innerHTML"):348 obj.set_out_of_stock()349 elif obj._find_data("p.prod_availability >span.backorder"):350 obj.set_out_of_stock()351 except:352 obj.log("Out of stock check failed")353 finally:354 try:355 obj.kill_driver()356 except:357 obj.log("Error Deleting Driver")358 return359def sears(obj):360 price_selectors = {"span.price-wrapper":"innerHTML"}361 sale_selectors = {"h4.redSalePrice span.price-wrapper":"innerHTML"}362 broken_link_selectors = {"":""}363 try:364 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)365 except:366 obj.log("Failed to acquire pricing data")367 #No Third Party368 #No out of Stock369 finally:370 try:371 obj.kill_driver()372 except:373 obj.log("Error Deleting Driver")374 return375def shelper(obj):376 price_selectors = {"div.product-content-inner > div.product-price > span.price-original.price-holder-alt > strong" : "innerHTML",}377 sale_selectors = {"div.product-content-inner > div.product-callout > h6.product-callout-title > strong" : "innerHTML",}378 broken_link_selectors = {"":""}379 try:380 obj.pricing(price_selectors,sale_selectors)381 except:382 obj.log("Failed to acquire pricing data")383 #No third party384 #No out of stock385 finally:386 try:387 obj.kill_driver()388 except:389 obj.log("Error Deleting Driver")390 return391def tsc(obj):392 #view in cart item393 # https://www.tractorsupply.com/tsc/product/jonsered-502cc-gas-chainsaw-cs2250s?cm_vc=-10005394 if obj.comp_id == 73:395 loc_ins = "loc_data.tsc(self,'63049')"396 elif obj.comp_id == 74:397 loc_ins = "loc_data.tsc(self,'63701')"398 elif obj.comp_id == 8:399 loc_ins = "loc_data.tsc(self,'63640')"400 elif obj.comp_id == 124:401 loc_ins = "loc_data.tsc(self,'63801')"402 price_selectors = {"div.was_save_sku span.was_text":"innerHTML","span.dollar_price":"innerHTML"}403 sale_selectors = {"span.dollar_price":"innerHTML"}404 broken_link_selectors = {"div#WC_GenericError_6.info":"innerHTML","div#UnpubProductErrormsg":"innerHTML|||selection below"}405 try:406 obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)407 except:408 obj.log("Failed to acquire pricing data")409 #no third party410 #no out of stock411 finally:412 try:413 obj.kill_driver()414 except:415 obj.log("Error Deleting Driver")416 return417def valleyvet(obj):418 obj.log("Competitor: %d not yet defined" %obj.comp_id)419 obj.set_undefined()420 return421def walmart(obj):422 price_selectors = {"span.price-characteristic[itemprop=price]":"content","div.Price-old.display-inline-block.arrange-fill.font-normal.u-textNavyBlue.display-inline > span.Price-group" : "title",\423 "div.prod-BotRow.prod-showBottomBorder.prod-OfferSection.prod-OfferSection-twoPriceDisplay div.Grid-col:nth-child(4) span.Price-group" : "title",\424 "span.display-inline-block.arrange-fit.Price.Price-enhanced.u-textNavyBlue > span.Price-group" : "title",\425 "span.display-inline-block.arrange-fit.Price.Price-enhanced.u-textGray > span.Price-group" : "title",}426 broken_link_selectors = {"div.font-semibold.prod-Bot-partial-head" : "innerHTML",\427 "p.error-ErrorPage-copy":"innerHTML"}428 sale_selectors = {}429 try:430 obj.pricing(price_selectors,sale_selectors,broken_link_selectors)431 except:432 obj.log("Failed to aquire pricing data")433 #check for Third party434 try:435 if not obj._find_data("a[data-tl-id=ProductSellerInfo-SellerName]","innerHTML|||Walmart"):436 sellers = obj._driver.find_elements_by_css_selector("div.marketplace-body")437 for sell in sellers:438 if sell.find_element_by_css_selector("span.seller-shipping-msg.u-textBlue").get_attribute("innerHTML").encode('utf-8') == 'Walmart':439 obj.set_price(aux_func.clean(sell.find_element_by_css_selector("span.Price-group").get_attribute('title')))440 break441 else:442 obj.set_third_party()443 except:444 obj.log("Third party check failed")445 #check Out of stock446 try:447 try:448 oos = obj._driver.find_element_by_css_selector("div.prod-ProductOffer-oosMsg.prod-PaddingTop--xxs > span").get_attribute("innerHTML")449 except:450 oos = "in stock"451 if "Out of stock" in oos:452 obj.set_out_of_stock()453 except:454 obj.log("Out of stock check failed")455 finally:456 try:457 obj.kill_driver()458 except:459 obj.log("Error Deleting Driver")460 return461def _default(obj):462 obj.log("Unknown Competitor ID")463 obj.set_undefined()...

Full Screen

Full Screen

flowablebaseandselectors.py

Source:flowablebaseandselectors.py Github

copy

Full Screen

...79 # is untouched.80 if isinstance(result.right, IdentitySeqMapInfo):81 # extend right selector with the observable that maps the selector_base to base82 if isinstance(result.left, ObservableSeqMapInfo):83 selector_map = ObservableSeqMapInfo(merge_selectors(84 result.left.observable,85 selector_obs,86 subscribe_scheduler=subscriber.scheduler,87 stack=stack,88 ))89 if other.selectors is not None:90 def gen_new_selectors():91 for key, val in other.selectors.items():92 selector = merge_selectors(93 left=merge_selectors(94 left=val,95 right=result.left.observable,96 subscribe_scheduler=subscriber.scheduler,97 stack=stack,98 ),99 right=selector_obs,100 subscribe_scheduler=subscriber.scheduler,101 stack=stack,102 )103 yield key, selector104 selectors = dict(gen_new_selectors())105 else:106 selectors = None107 elif isinstance(result.left, IdentitySeqMapInfo):108 selector_map = ObservableSeqMapInfo(selector_obs)109 if other.selectors is not None:110 def gen_new_selectors():111 for key, val in other.selectors.items():112 selector = merge_selectors(113 left=val,114 right=selector_obs,115 subscribe_scheduler=subscriber.scheduler,116 stack=stack,117 )118 yield key, selector119 selectors = dict(gen_new_selectors())120 else:121 selectors = None122 else:123 raise Exception(f'illegal result "{result.left}"')124 if selectors is not None or self.selectors is not None:125 if selectors is not None:126 if self.selectors is not None:127 selectors = {**self.selectors, **selectors}128 else:129 selectors = selectors130 else:131 selectors = self.selectors132 return FlowableBaseAndSelectors(self.base, selectors), result.right, selector_map133 return None134 def match_with(135 self,136 other: 'FlowableBaseAndSelectors',137 subscriber: Subscriber,138 stack: List[FrameSummary],139 ) -> Optional[FlowableBaseAndSelectorsMatch]:140 # bases are of type Optional[Base], therefore check first if base is not None141 if self.base is not None and other.base is not None:142 result = self.base.match_with(other.base, subscriber=subscriber, stack=stack)143 # this BaseAndSelectors and the other BaseAndSelectors match directly with144 # their bases145 if isinstance(result, FlowableBaseMatch):146 # the selectors can be taken over to the new base selector tuple147 if self.selectors is not None and other.selectors is not None:148 if self.selectors is not None:149 if other.selectors is None:150 selectors = self.selectors151 else:152 selectors = {**self.selectors, **other.selectors}153 else:154 selectors = other.selectors155 else:156 selectors = {}157 return FlowableBaseAndSelectorsMatch(158 base_selectors=FlowableBaseAndSelectors(159 base=result.base,160 selectors=selectors,161 ),162 left=result.left,163 right=result.right,164 )165 result = self.get_selector_maps(166 other=other,167 subscriber=subscriber,168 stack=stack,169 )170 if result is not None:171 base_selectors, left_selector_map, right_selector_map = result172 return FlowableBaseAndSelectorsMatch(173 left=left_selector_map,174 right=right_selector_map,175 base_selectors=base_selectors,176 )177 result = other.get_selector_maps(178 other=self,179 subscriber=subscriber,180 stack=stack,181 )182 if result is not None:183 base_selectors, right_selector_map, left_selector_map = result184 return FlowableBaseAndSelectorsMatch(185 left=left_selector_map,186 right=right_selector_map,187 base_selectors=base_selectors,188 )189 # two bases B1, B2 defined in selectors match, the following map is created190 # - define a new anonymous base B_match191 # - define two selectors that map a sequence with base B1 and B2 to base B_match192 # in case B1 and B2 match with identity, only define no selector193 # - define two selectors that map the two sequences of the match operator to base B_match194 if self.selectors is not None and other.selectors is not None:195 # check if some tuple of bases matches196 for sel_base_1, sel_observable_1 in self.selectors.items():197 for sel_base_2, sel_observable_2 in other.selectors.items():198 result = sel_base_1.match_with(199 sel_base_2,200 subscriber=subscriber,201 stack=stack,202 )203 # if two bases match ...204 if isinstance(result, FlowableBaseMatch):205 # once right is completed, keep consuming left side until it is completed as well206 def request_left(left, right):207 return not (isinstance(left, SelectCompleted) and isinstance(right, SelectNext))208 def request_right(left, right):209 return not (isinstance(right, SelectCompleted) and isinstance(left, SelectNext))210 if isinstance(result.left, IdentitySeqMapInfo) and isinstance(result.right, IdentitySeqMapInfo):211 merge_sel = RefCountObservable(212 source=ControlledZipObservable(213 left=sel_observable_1, #init_debug_observable(sel_observable_1, name='d1', stack=stack, subscriber=subscriber),214 right=sel_observable_2, #init_debug_observable(sel_observable_2, name='d1', stack=stack, subscriber=subscriber),215 request_left=request_left,216 request_right=request_right,217 match_func=lambda _, __: True,218 scheduler=subscriber.scheduler,219 stack=stack,220 ), #name='d2', stack=stack, subscriber=subscriber),221 subject=PublishObservableSubject(),222 subscribe_scheduler=subscriber.subscribe_scheduler,223 stack=stack,224 )225 def left_selector(t):226 if isinstance(t[1], SelectNext):227 return [select_next, select_completed]228 else:229 return [select_completed]230 def right_selector(t):231 if isinstance(t[0], SelectNext):232 return [select_next, select_completed]233 else:234 return [select_completed]235 left_sel = RefCountObservable(236 source=MapToIteratorObservable(237 source=FilterObservable(238 source=merge_sel,239 predicate=lambda t: isinstance(t[0], SelectNext),240 ),241 func=left_selector,242 ),243 subject=PublishObservableSubject(),244 subscribe_scheduler=subscriber.subscribe_scheduler,245 stack=stack,246 )247 right_sel = RefCountObservable(248 source=MapToIteratorObservable(249 source=FilterObservable(250 source=merge_sel,251 predicate=lambda t: isinstance(t[1], SelectNext),252 ),253 func=right_selector,254 ),255 subject=PublishObservableSubject(),256 subscribe_scheduler=subscriber.scheduler,257 stack=stack,258 )259 right_selector_map = MapObservable(260 source=FilterObservable(261 source=merge_sel,262 predicate=lambda t: type(t[0]) == type(t[1]),263 ),264 func=lambda t: t[0],265 )266 return FlowableBaseAndSelectorsMatch(267 left=ObservableSeqMapInfo(left_sel),268 right=ObservableSeqMapInfo(right_sel),269 base_selectors=FlowableBaseAndSelectors(270 base=None,271 selectors={272 sel_base_1: right_selector_map,273 **{k: merge_selectors(v, left_sel, subscribe_scheduler=subscriber.scheduler, stack=stack) for k, v in274 self.selectors.items() if k != sel_base_1},275 **{k: merge_selectors(v, right_sel, subscribe_scheduler=subscriber.scheduler, stack=stack) for k, v in276 other.selectors.items() if k != sel_base_2}277 },278 ),279 )...

Full Screen

Full Screen

test_baseselectortuple.py

Source:test_baseselectortuple.py Github

copy

Full Screen

...93 # #94 # #95 # # t1 = BaseAndSelectors(base=TestBase(self.num_base_1), selectors={self.num_base_2: self.sel2})96 # # t2 = BaseAndSelectors(base=self.num_base_2, selectors={self.num_base_3: self.sel3})97 # # result: Optional[BaseSelectorAndSelectorMaps] = t1.get_selectors(t2, self.subscriber)98 # #99 # # self.assertIsInstance(result, BaseSelectorAndSelectorMaps)100 # # self.assertIsInstance(result.right, IdentitySelectorMap)101 # # self.assertIsInstance(result.left, IdentitySelectorMap)102 # # self.assertIsInstance(result.base_selectors.base, NumericalBase)103 # # self.assertEqual(1, result.base_selectors.base.num)104 # # self.assertEqual({2: None, 3: None}, result.base_selectors.selectors)105 # def test_not_matching(self):106 # t1 = BaseAndSelectors(base=NumericalBase(1))107 # t2 = BaseAndSelectors(base=NumericalBase(2))108 # result: Optional[BaseSelectorsAndSelectorMaps] = t1.match_with(t2, self.subscriber)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful