Best Python code snippet using autotest_python
test_selectors.py
Source:test_selectors.py  
1import errno2import os3import random4import selectors5import signal6import socket7import sys8from test import support9from time import sleep10import unittest11import unittest.mock12import tempfile13from time import monotonic as time14try:15    import resource16except ImportError:17    resource = None18if hasattr(socket, 'socketpair'):19    socketpair = socket.socketpair20else:21    def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):22        with socket.socket(family, type, proto) as l:23            l.bind((support.HOST, 0))24            l.listen()25            c = socket.socket(family, type, proto)26            try:27                c.connect(l.getsockname())28                caddr = c.getsockname()29                while True:30                    a, addr = l.accept()31                    # check that we've got the correct client32                    if addr == caddr:33                        return c, a34                    a.close()35            except OSError:36                c.close()37                raise38def find_ready_matching(ready, flag):39    match = []40    for key, events in ready:41        if events & flag:42            match.append(key.fileobj)43    return match44class BaseSelectorTestCase(unittest.TestCase):45    def make_socketpair(self):46        rd, wr = socketpair()47        self.addCleanup(rd.close)48        self.addCleanup(wr.close)49        return rd, wr50    def test_register(self):51        s = self.SELECTOR()52        self.addCleanup(s.close)53        rd, wr = self.make_socketpair()54        key = s.register(rd, selectors.EVENT_READ, "data")55        self.assertIsInstance(key, selectors.SelectorKey)56        self.assertEqual(key.fileobj, rd)57        self.assertEqual(key.fd, rd.fileno())58        self.assertEqual(key.events, selectors.EVENT_READ)59        self.assertEqual(key.data, "data")60        # register an unknown event61        self.assertRaises(ValueError, s.register, 0, 999999)62        # register an invalid FD63        self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)64        # register twice65        self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)66        # register the same FD, but with a different object67        self.assertRaises(KeyError, s.register, rd.fileno(),68                          selectors.EVENT_READ)69    def test_unregister(self):70        s = self.SELECTOR()71        self.addCleanup(s.close)72        rd, wr = self.make_socketpair()73        s.register(rd, selectors.EVENT_READ)74        s.unregister(rd)75        # unregister an unknown file obj76        self.assertRaises(KeyError, s.unregister, 999999)77        # unregister twice78        self.assertRaises(KeyError, s.unregister, rd)79    def test_unregister_after_fd_close(self):80        s = self.SELECTOR()81        self.addCleanup(s.close)82        rd, wr = self.make_socketpair()83        r, w = rd.fileno(), wr.fileno()84        s.register(r, selectors.EVENT_READ)85        s.register(w, selectors.EVENT_WRITE)86        rd.close()87        wr.close()88        s.unregister(r)89        s.unregister(w)90    @unittest.skipUnless(os.name == 'posix', "requires posix")91    def test_unregister_after_fd_close_and_reuse(self):92        s = self.SELECTOR()93        self.addCleanup(s.close)94        rd, wr = self.make_socketpair()95        r, w = rd.fileno(), wr.fileno()96        s.register(r, selectors.EVENT_READ)97        s.register(w, selectors.EVENT_WRITE)98        rd2, wr2 = self.make_socketpair()99        rd.close()100        wr.close()101        os.dup2(rd2.fileno(), r)102        os.dup2(wr2.fileno(), w)103        self.addCleanup(os.close, r)104        self.addCleanup(os.close, w)105        s.unregister(r)106        s.unregister(w)107    def test_unregister_after_socket_close(self):108        s = self.SELECTOR()109        self.addCleanup(s.close)110        rd, wr = self.make_socketpair()111        s.register(rd, selectors.EVENT_READ)112        s.register(wr, selectors.EVENT_WRITE)113        rd.close()114        wr.close()115        s.unregister(rd)116        s.unregister(wr)117    def test_modify(self):118        s = self.SELECTOR()119        self.addCleanup(s.close)120        rd, wr = self.make_socketpair()121        key = s.register(rd, selectors.EVENT_READ)122        # modify events123        key2 = s.modify(rd, selectors.EVENT_WRITE)124        self.assertNotEqual(key.events, key2.events)125        self.assertEqual(key2, s.get_key(rd))126        s.unregister(rd)127        # modify data128        d1 = object()129        d2 = object()130        key = s.register(rd, selectors.EVENT_READ, d1)131        key2 = s.modify(rd, selectors.EVENT_READ, d2)132        self.assertEqual(key.events, key2.events)133        self.assertNotEqual(key.data, key2.data)134        self.assertEqual(key2, s.get_key(rd))135        self.assertEqual(key2.data, d2)136        # modify unknown file obj137        self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)138        # modify use a shortcut139        d3 = object()140        s.register = unittest.mock.Mock()141        s.unregister = unittest.mock.Mock()142        s.modify(rd, selectors.EVENT_READ, d3)143        self.assertFalse(s.register.called)144        self.assertFalse(s.unregister.called)145    def test_modify_unregister(self):146        # Make sure the fd is unregister()ed in case of error on147        # modify(): http://bugs.python.org/issue30014148        if self.SELECTOR.__name__ == 'EpollSelector':149            patch = unittest.mock.patch(150                'selectors.EpollSelector._selector_cls')151        elif self.SELECTOR.__name__ == 'PollSelector':152            patch = unittest.mock.patch(153                'selectors.PollSelector._selector_cls')154        elif self.SELECTOR.__name__ == 'DevpollSelector':155            patch = unittest.mock.patch(156                'selectors.DevpollSelector._selector_cls')157        else:158            raise self.skipTest("")159        with patch as m:160            m.return_value.modify = unittest.mock.Mock(161                side_effect=ZeroDivisionError)162            s = self.SELECTOR()163            self.addCleanup(s.close)164            rd, wr = self.make_socketpair()165            s.register(rd, selectors.EVENT_READ)166            self.assertEqual(len(s._map), 1)167            with self.assertRaises(ZeroDivisionError):168                s.modify(rd, selectors.EVENT_WRITE)169            self.assertEqual(len(s._map), 0)170    def test_close(self):171        s = self.SELECTOR()172        self.addCleanup(s.close)173        mapping = s.get_map()174        rd, wr = self.make_socketpair()175        s.register(rd, selectors.EVENT_READ)176        s.register(wr, selectors.EVENT_WRITE)177        s.close()178        self.assertRaises(RuntimeError, s.get_key, rd)179        self.assertRaises(RuntimeError, s.get_key, wr)180        self.assertRaises(KeyError, mapping.__getitem__, rd)181        self.assertRaises(KeyError, mapping.__getitem__, wr)182    def test_get_key(self):183        s = self.SELECTOR()184        self.addCleanup(s.close)185        rd, wr = self.make_socketpair()186        key = s.register(rd, selectors.EVENT_READ, "data")187        self.assertEqual(key, s.get_key(rd))188        # unknown file obj189        self.assertRaises(KeyError, s.get_key, 999999)190    def test_get_map(self):191        s = self.SELECTOR()192        self.addCleanup(s.close)193        rd, wr = self.make_socketpair()194        keys = s.get_map()195        self.assertFalse(keys)196        self.assertEqual(len(keys), 0)197        self.assertEqual(list(keys), [])198        key = s.register(rd, selectors.EVENT_READ, "data")199        self.assertIn(rd, keys)200        self.assertEqual(key, keys[rd])201        self.assertEqual(len(keys), 1)202        self.assertEqual(list(keys), [rd.fileno()])203        self.assertEqual(list(keys.values()), [key])204        # unknown file obj205        with self.assertRaises(KeyError):206            keys[999999]207        # Read-only mapping208        with self.assertRaises(TypeError):209            del keys[rd]210    def test_select(self):211        s = self.SELECTOR()212        self.addCleanup(s.close)213        rd, wr = self.make_socketpair()214        s.register(rd, selectors.EVENT_READ)215        wr_key = s.register(wr, selectors.EVENT_WRITE)216        result = s.select()217        for key, events in result:218            self.assertTrue(isinstance(key, selectors.SelectorKey))219            self.assertTrue(events)220            self.assertFalse(events & ~(selectors.EVENT_READ |221                                        selectors.EVENT_WRITE))222        self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)223    def test_context_manager(self):224        s = self.SELECTOR()225        self.addCleanup(s.close)226        rd, wr = self.make_socketpair()227        with s as sel:228            sel.register(rd, selectors.EVENT_READ)229            sel.register(wr, selectors.EVENT_WRITE)230        self.assertRaises(RuntimeError, s.get_key, rd)231        self.assertRaises(RuntimeError, s.get_key, wr)232    def test_fileno(self):233        s = self.SELECTOR()234        self.addCleanup(s.close)235        if hasattr(s, 'fileno'):236            fd = s.fileno()237            self.assertTrue(isinstance(fd, int))238            self.assertGreaterEqual(fd, 0)239    def test_selector(self):240        s = self.SELECTOR()241        self.addCleanup(s.close)242        NUM_SOCKETS = 12243        MSG = b" This is a test."244        MSG_LEN = len(MSG)245        readers = []246        writers = []247        r2w = {}248        w2r = {}249        for i in range(NUM_SOCKETS):250            rd, wr = self.make_socketpair()251            s.register(rd, selectors.EVENT_READ)252            s.register(wr, selectors.EVENT_WRITE)253            readers.append(rd)254            writers.append(wr)255            r2w[rd] = wr256            w2r[wr] = rd257        bufs = []258        while writers:259            ready = s.select()260            ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)261            if not ready_writers:262                self.fail("no sockets ready for writing")263            wr = random.choice(ready_writers)264            wr.send(MSG)265            for i in range(10):266                ready = s.select()267                ready_readers = find_ready_matching(ready,268                                                    selectors.EVENT_READ)269                if ready_readers:270                    break271                # there might be a delay between the write to the write end and272                # the read end is reported ready273                sleep(0.1)274            else:275                self.fail("no sockets ready for reading")276            self.assertEqual([w2r[wr]], ready_readers)277            rd = ready_readers[0]278            buf = rd.recv(MSG_LEN)279            self.assertEqual(len(buf), MSG_LEN)280            bufs.append(buf)281            s.unregister(r2w[rd])282            s.unregister(rd)283            writers.remove(r2w[rd])284        self.assertEqual(bufs, [MSG] * NUM_SOCKETS)285    @unittest.skipIf(sys.platform == 'win32',286                     'select.select() cannot be used with empty fd sets')287    def test_empty_select(self):288        # Issue #23009: Make sure EpollSelector.select() works when no FD is289        # registered.290        s = self.SELECTOR()291        self.addCleanup(s.close)292        self.assertEqual(s.select(timeout=0), [])293    def test_timeout(self):294        s = self.SELECTOR()295        self.addCleanup(s.close)296        rd, wr = self.make_socketpair()297        s.register(wr, selectors.EVENT_WRITE)298        t = time()299        self.assertEqual(1, len(s.select(0)))300        self.assertEqual(1, len(s.select(-1)))301        self.assertLess(time() - t, 0.5)302        s.unregister(wr)303        s.register(rd, selectors.EVENT_READ)304        t = time()305        self.assertFalse(s.select(0))306        self.assertFalse(s.select(-1))307        self.assertLess(time() - t, 0.5)308        t0 = time()309        self.assertFalse(s.select(1))310        t1 = time()311        dt = t1 - t0312        # Tolerate 2.0 seconds for very slow buildbots313        self.assertTrue(0.8 <= dt <= 2.0, dt)314    @unittest.skipUnless(hasattr(signal, "alarm"),315                         "signal.alarm() required for this test")316    def test_select_interrupt_exc(self):317        s = self.SELECTOR()318        self.addCleanup(s.close)319        rd, wr = self.make_socketpair()320        class InterruptSelect(Exception):321            pass322        def handler(*args):323            raise InterruptSelect324        orig_alrm_handler = signal.signal(signal.SIGALRM, handler)325        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)326        try:327            signal.alarm(1)328            s.register(rd, selectors.EVENT_READ)329            t = time()330            # select() is interrupted by a signal which raises an exception331            with self.assertRaises(InterruptSelect):332                s.select(30)333            # select() was interrupted before the timeout of 30 seconds334            self.assertLess(time() - t, 5.0)335        finally:336            signal.alarm(0)337    @unittest.skipUnless(hasattr(signal, "alarm"),338                         "signal.alarm() required for this test")339    def test_select_interrupt_noraise(self):340        s = self.SELECTOR()341        self.addCleanup(s.close)342        rd, wr = self.make_socketpair()343        orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)344        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)345        try:346            signal.alarm(1)347            s.register(rd, selectors.EVENT_READ)348            t = time()349            # select() is interrupted by a signal, but the signal handler doesn't350            # raise an exception, so select() should by retries with a recomputed351            # timeout352            self.assertFalse(s.select(1.5))353            self.assertGreaterEqual(time() - t, 1.0)354        finally:355            signal.alarm(0)356class ScalableSelectorMixIn:357    # see issue #18963 for why it's skipped on older OS X versions358    @support.requires_mac_ver(10, 5)359    @unittest.skipUnless(resource, "Test needs resource module")360    def test_above_fd_setsize(self):361        # A scalable implementation should have no problem with more than362        # FD_SETSIZE file descriptors. Since we don't know the value, we just363        # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.364        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)365        try:366            resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))367            self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,368                            (soft, hard))369            NUM_FDS = min(hard, 2**16)370        except (OSError, ValueError):371            NUM_FDS = soft372        # guard for already allocated FDs (stdin, stdout...)373        NUM_FDS -= 32374        s = self.SELECTOR()375        self.addCleanup(s.close)376        for i in range(NUM_FDS // 2):377            try:378                rd, wr = self.make_socketpair()379            except OSError:380                # too many FDs, skip - note that we should only catch EMFILE381                # here, but apparently *BSD and Solaris can fail upon connect()382                # or bind() with EADDRNOTAVAIL, so let's be safe383                self.skipTest("FD limit reached")384            try:385                s.register(rd, selectors.EVENT_READ)386                s.register(wr, selectors.EVENT_WRITE)387            except OSError as e:388                if e.errno == errno.ENOSPC:389                    # this can be raised by epoll if we go over390                    # fs.epoll.max_user_watches sysctl391                    self.skipTest("FD limit reached")392                raise393        try:394            fds = s.select()395        except OSError as e:396            if e.errno == errno.EINVAL and sys.platform == 'darwin':397                # unexplainable errors on macOS don't need to fail the test398                self.skipTest("Invalid argument error calling poll()")399            raise400        self.assertEqual(NUM_FDS // 2, len(fds))401class DefaultSelectorTestCase(BaseSelectorTestCase):402    SELECTOR = selectors.DefaultSelector403class SelectSelectorTestCase(BaseSelectorTestCase):404    SELECTOR = selectors.SelectSelector405@unittest.skipUnless(hasattr(selectors, 'PollSelector'),406                     "Test needs selectors.PollSelector")407class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):408    SELECTOR = getattr(selectors, 'PollSelector', None)409@unittest.skipUnless(hasattr(selectors, 'EpollSelector'),410                     "Test needs selectors.EpollSelector")411class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):412    SELECTOR = getattr(selectors, 'EpollSelector', None)413    def test_register_file(self):414        # epoll(7) returns EPERM when given a file to watch415        s = self.SELECTOR()416        with tempfile.NamedTemporaryFile() as f:417            with self.assertRaises(IOError):418                s.register(f, selectors.EVENT_READ)419            # the SelectorKey has been removed420            with self.assertRaises(KeyError):421                s.get_key(f)422@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),423                     "Test needs selectors.KqueueSelector)")424class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):425    SELECTOR = getattr(selectors, 'KqueueSelector', None)426    def test_register_bad_fd(self):427        # a file descriptor that's been closed should raise an OSError428        # with EBADF429        s = self.SELECTOR()430        bad_f = support.make_bad_fd()431        with self.assertRaises(OSError) as cm:432            s.register(bad_f, selectors.EVENT_READ)433        self.assertEqual(cm.exception.errno, errno.EBADF)434        # the SelectorKey has been removed435        with self.assertRaises(KeyError):436            s.get_key(bad_f)437@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),438                     "Test needs selectors.DevpollSelector")439class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):440    SELECTOR = getattr(selectors, 'DevpollSelector', None)441def test_main():442    tests = [DefaultSelectorTestCase, SelectSelectorTestCase,443             PollSelectorTestCase, EpollSelectorTestCase,444             KqueueSelectorTestCase, DevpollSelectorTestCase]445    support.run_unittest(*tests)446    support.reap_children()447if __name__ == "__main__":...stores.py
Source:stores.py  
1from selenium import webdriver2from selenium.common.exceptions import TimeoutException3from selenium.webdriver.support import expected_conditions as EC4from selenium.webdriver.common.by import By5from selenium.webdriver.common.keys import Keys6from selenium.webdriver.support.ui import WebDriverWait7import loc_data8import aux_func9def academy(obj):10    price_selectors = {"input#dlItemPrice":"value",}11    sale_selectors = {"span#currentPrice":"innerHTML",}12    broken_link_selectors = {"p#search_results_total_count":"innerHTML"}13    try:14        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)15    except:16        obj.log("Failed to aquire pricing data")17    #No third party18    #check out of stock19    try:20        try:21            oos = obj._driver.find_element_by_css_selector("button#add2CartBtn").get_attribute("innerHTML")22        except:23            oos = "in stock"24        if "Out of Stock" in oos:25            obj.set_out_of_stock()26    except:27        obj.log("Out of stock check failed")28    finally:29        try:30            obj.kill_driver()31        except:32            obj.log("Error Deleting Driver")33    return34def acehardware(obj):35    price_selectors = {"div.productPrice span script":"innerHTML",}36    try:37        obj.pricing(price_selectors)38    except:39        obj.log("Failed to aquire pricing data")40    #No third party41    #No out of stock42    finally:43        try:44            obj.kill_driver()45        except:46            obj.log("Error Deleting Driver")47    return48def autozone(obj):49    price_selectors = {"table.part-pricing-info td.price.base-price" : "innerText",}50    sale_selectors = {"span.price.light-gray>strong":"innerHTML"}51    broken_link_selectors = {"":""}52    loc_ins="loc_data.autozone(self)"53    try:54        obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)55    except:56        obj.log("Failed to acquire pricing data")57    #No third party58    #out of stock check59    try:60        if obj._find_data("div.button-bar-msg-out-of-stock","innerText|||Not Available"):61            obj.set_out_of_stock()62    except:63        obj.log("out of stock check failed")64    finally:65        try:66            obj.kill_driver()67        except:68            obj.log("Error Deleting Driver")69    return70def basspro(obj):71    loc_ins = """72bpsku = loc_data.basspro(obj)73for p,value in price_dict.iteritems():74price_dict[p.format(bpsku)] = price_dict.pop(p)75for p,value in sale_dict.iteritems():76sale_dict[p.format(bpsku)] = sale_dict.pop(p)"""77    price_selectors = {"div.top.namePartPriceContainer span[itemprop=price]" : "innerHTML","span#listPrice_{}.old_price":"innerHTML",\78    "span#offerPrice_{} > span":"innerHTML","div[itemprop=offers]>span[itemprop=price]":"content"}79    sale_selectors = {"span#offerPrice_{}.price.sale > span":"innerHTML"}80    broken_link_selectors = {"":""}81    try:82        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)83    except:84        obj.log("Failed to acqure pricing data")85    finally:86        try:87            obj.kill_driver()88        except:89            obj.log("Error Deleting Driver")90    return91def blain(obj):92    price_selectors = {"meta[itemprop=lowprice]":"content",\93    "div.active-price>div.price>span":"innerHTML",\94    "div.original-price>span.price>span":"innerHTML"}95    sale_selectors = {"div.active-price.promo > div.price > span:not([class])":"innerHTML",}96    broken_link_selectors = {"div.list-header-text > span":"innerHTML"}97    try:98        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)99    except:100        obj.log("Failed to acquire pricing data")101    #No third party102    try:103        if not obj._find_data("span.stock-msg.in-stock"):104            obj.set_out_of_stock()105    except:106        obj.log("Out of stock check failed")107    finally:108        try:109            obj.kill_driver()110        except:111            obj.log("Error Deleting Driver")112    return113def bootbarn(obj):114    price_selectors = {"span.price-original.price-holder-alt":"innerHTML",\115    "h6.product-callout-title > strong":"innerHTML"}116    sale_selectors = {"h6.product-callout-title > strong":"innerHTML"}117    broken_link_selectors = {"":""}118    try:119        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)120    except:121        obj.log("Failed to aquire pricing data")122    #no third Party123    #no out of stock124    finally:125        try:126            obj.kill_driver()127        except:128            obj.log("Error Deleting Driver")129    return130def cabela(obj):131    price_selectors = {"dd.regularnprange":"innerHTML",\132    "div.price > dl > dd.nprange":"innerHTML",\133    "div.price > dl > dd.prange":"innerHTML"}134    sale_selectors = {"dd.saleprice":"innerHTML"}135    broken_link_selectors = {"":""}136    try:137        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)138    except:139        obj.log("Failed to acquire pricing data")140    finally:141        try:142            obj.kill_driver()143        except:144            obj.log("Error Deleting Driver")145    return146def Chewy(obj):147    price_selectors = {"span.ga-eec__price" : "innerHTML",}148    sale_selectors = {"p.autoship-pricing" : "innerHTML"}149    broken_link_selectors = {"":""}150    try:151        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)152    except:153        raise154        obj.log("Failed to acquire pricing data")155    #No third party156    try:157        if obj._find_data("div#availability span.unavailable"):158            obj.set_out_of_stock()159    except:160        obj.log("Out of Stock check failed")161    finally:162        try:163            obj.kill_driver()164        except:165            obj.log("Error Deleting Driver")166    return167def dickeybub(obj):168    price_selectors = {"p.price > del > span.woocommerce-Price-amount.amount" : "innerHTML",\169    "p.price > span.woocommerce-Price-amount.amount" : "innerHTML",}170    sale_selectors = {"p.price > ins > span.woocommerce-Price-amount.amount" : "innerHTML",}171    try:172        obj.pricing(price_selectors,sale_selectors,)173    except:174        obj.log("Failed to acquire pricing data")175    finally:176        try:177            obj.kill_driver()178        except:179            obj.log("Error Deleting Driver")180    return181def farm_and_home(obj):182    if obj.comp_id == 25:183        loc_ins = "loc_data.farm_and_home(self,10)"184    elif obj.comp_id == 4:185        loc_ins = "loc_data.farm_and_home(self,2)"186    price_selectors = {"div.product-main-info > div.price-box > span.regular-price > span.price" : "innerHTML", "div.product-info-price span.price" : "innerHTML"}187    sale_selectors = {}188    broken_link_selectors = {}189    try:190        obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)191    except:192        obj.log("Failed to acquire pricing data")193    finally:194        try:195            obj.kill_driver()196        except:197            obj.log("Error Deleting Driver")198    #obj.log("Competitor: %d not yet defined" %obj.comp_id)199    #obj.set_undefined()200    return201def home_depot(obj):202    if obj.comp_id == 23:203        loc_ins = "loc_data.home_depot(self,62226)"204    elif obj.comp_id == 5:205        loc_ins = "loc_data.home_depot(self,63028)"206    elif obj.comp_id == 17:207        loc_ins = "loc_data.home_depot(self,62650)"208    price_selectors = {"input#ciItemPrice":"value","span#ajaxPriceStrikeThru":"innerHTML","span#ajaxPriceAlt":"innerHTML",\209    "span#ajaxPrice":"content"}210    sale_selectors = {"span#ajaxPrice":"content"}211    broken_link_selectors = {"div.buybelt__flex-wrapper.buybelt__store-wrapper span.u__text--danger":"innerHTML|||Unavailable",\212    "div#productinfo_ctn > div.error >p":"innerHTML|||not currently available"}213    try:214        obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)215    except:216        obj.log("Failed to acquire pricing data")217        #Out of stock check218    try:219#Checking for hidden field that indicates availability220        try:221            if obj._find_data("input#availableInLocalStore","value|||false"):222                obj.set_out_of_stock()223        except:224            pass225#Check for if the quantity box under Pick Up In Store is zero226        try:227            WebDriverWait(obj._driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.buybelt__box")))228            if '0' == str(obj._retrieve_data("span.quantity","innerHTML")):229                obj.set_out_of_stock()230        except TimeoutException as error:231            pass232    except:233        obj.log("Out of stock check failed")234    finally:235        try:236            obj.kill_driver()237        except:238            obj.log("Error Deleting Driver")239    return240def lowes(obj):241    if obj.comp_id == 6:242        loc_ins = "loc_data.lowes(self,63028,'Festus')"243    elif obj.comp_id == 15:244        loc_ins = "loc_data.lowes(self,63701,'Cape Girardeau')"245    elif obj.comp_id == 16:246        loc_ins = "loc_data.lowes(self,62704,'Springfield')"247    elif obj.comp_id == 24:248        loc_ins = "loc_data.lowes(self,62221,'Belleville')"249    price_selectors = {"input[name=productId]":"data-productprice","span.secondary-text.small-type.art-pd-wasPriceLbl":"innerHTML",\250    "span[itemprop=price]":"content","span.primary-font.jumbo.strong.art-pd-price":"innerHTML"}251    sale_selectors = {"span.primary-font.jumbo.strong.art-pd-contractPricing":"innerHTML"}252    broken_link_selectors = {"div.alert.alert-warning i.icon-error-outline.red":"",\253    " div.pd-shipping-delivery.met-fulfillment-delivery.grid-50.tablet-grid-50 div.media-body > p":"innerHTML|||unavailable"}254    try:255        obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)256    except:257        raise258        obj.log("Failed to acquire pricing data")259    #Out of stock check260    try:261        if obj._find_data("div.fulfillment-method div.media-body >p","innerHTML|||Unavailable"):262            obj.set_out_of_stock()263    except:264        obj.log("Out of stock check failed")265    finally:266        try:267            obj.kill_driver()268        except:269            obj.log("Error Deleting Driver")270    return271def menards(obj):272    if obj.comp_id == 7:273        loc_ins = "loc_data.menards(self,'3286')"274    elif obj.comp_id == 26:275        loc_ins = "loc_data.menards(self,'3334')"276    elif obj.comp_id == 27:277        loc_ins = "loc_data.menards(self,'3293')"278    price_selectors = {"span.bargainStrike" : "innerHTML",\279    "span.EDLP.fontSize16.fontBold.alignRight" : "innerHTML",\280    "span#totalItemPriceFloater" : "innerHTML",281    "span.finalPriceSpan.leftFloat":"innerText" }282    sale_selectors = {"span.bargainPrice" : "innerHTML", \283    "span#totalItemPriceFloater" : "innerHTML",284    "span.finalPriceSpan.leftFloat":"innerText"}285    broken_link_selectors = {"h3.resettitle":"innerHTML"}286    try:287        obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)288    except:289        obj.log("Failed to acquire pricing data")290    finally:291        try:292            obj.kill_driver()293        except:294            obj.log("Error Deleting Driver")295    return296def orscheln(obj):297    price_selectors = {"span.product_unit_price" : "innerHTML","meta[itemprop=price]" : "content"}298    sale_selectors = {"":""}299    broken_link_selectors = {"":""}300    try:301        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)302    except:303        obj.log("Failed to acquire pricing data")304    #No third party305    #No out of stock306    try:307        if obj._find_data("div.product-info-stock-sku","innerHTML|||Out of Stock Online"):308            obj.set_out_of_stock()309    except:310        obj.log("Out of Stock check failed")311    finally:312        try:313            obj.kill_driver()314        except:315            obj.log("Error Deleting Driver")316    return317def petsense(obj):318    obj.log("Competitor: %d not yet defined" %obj.comp_id)319    obj.set_undefined()320    return321    price_selectors = {"div#product_price span.money" : "innerHTML",}322    sale_selectors = {"":""}323    broken_link_selectors = {"div.selector-wrapper>select.single-option-selector":"innerHTML"}324    try:325        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)326    except:327        obj.log("Failed to acquire pricing data")328    #No third party329    #No out of stock330    finally:331        try:332            obj.kill_driver()333        except:334            obj.log("Error Deleting Driver")335    return336def ruralking(obj):337    price_selectors = {"meta[itemprop=price]":"content","meta[property='product:price:amount']":"content","span.price" : "innerHTML"}338    #"span[itemprop=offers] > span[itemprop=price]":"innerHTML"}339    sale_selectors = {"":""}340    broken_link_selectors = {"div.bluefoot-row.bluefoot-structural.with-media-background.not-found-404-page span" : "innerHTML|||SORRY",\341    "div.page-head-alt >h2":"innerHTML|||Sorry","div.page-head-alt >h3":"innerHTML|||Sorry"}342    try:343        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)344    except:345        obj.log("Failed to acquire pricing data")346    try:347        if "OUT OF STOCK" in obj._retrieve_data("div.product-shop h1[style]","innerHTML"):348            obj.set_out_of_stock()349        elif obj._find_data("p.prod_availability >span.backorder"):350            obj.set_out_of_stock()351    except:352        obj.log("Out of stock check failed")353    finally:354        try:355            obj.kill_driver()356        except:357            obj.log("Error Deleting Driver")358    return359def sears(obj):360    price_selectors = {"span.price-wrapper":"innerHTML"}361    sale_selectors = {"h4.redSalePrice span.price-wrapper":"innerHTML"}362    broken_link_selectors = {"":""}363    try:364        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)365    except:366        obj.log("Failed to acquire pricing data")367    #No Third Party368    #No out of Stock369    finally:370        try:371            obj.kill_driver()372        except:373            obj.log("Error Deleting Driver")374    return375def shelper(obj):376    price_selectors = {"div.product-content-inner > div.product-price > span.price-original.price-holder-alt > strong" : "innerHTML",}377    sale_selectors = {"div.product-content-inner > div.product-callout > h6.product-callout-title > strong" : "innerHTML",}378    broken_link_selectors = {"":""}379    try:380        obj.pricing(price_selectors,sale_selectors)381    except:382        obj.log("Failed to acquire pricing data")383        #No third party384        #No out of stock385    finally:386        try:387            obj.kill_driver()388        except:389            obj.log("Error Deleting Driver")390    return391def tsc(obj):392    #view in cart item393    # https://www.tractorsupply.com/tsc/product/jonsered-502cc-gas-chainsaw-cs2250s?cm_vc=-10005394    if obj.comp_id == 73:395        loc_ins = "loc_data.tsc(self,'63049')"396    elif obj.comp_id == 74:397        loc_ins = "loc_data.tsc(self,'63701')"398    elif obj.comp_id == 8:399        loc_ins = "loc_data.tsc(self,'63640')"400    elif obj.comp_id == 124:401        loc_ins = "loc_data.tsc(self,'63801')"402    price_selectors = {"div.was_save_sku span.was_text":"innerHTML","span.dollar_price":"innerHTML"}403    sale_selectors = {"span.dollar_price":"innerHTML"}404    broken_link_selectors = {"div#WC_GenericError_6.info":"innerHTML","div#UnpubProductErrormsg":"innerHTML|||selection below"}405    try:406        obj.pricing(price_selectors,sale_selectors,broken_link_selectors,loc_ins)407    except:408        obj.log("Failed to acquire pricing data")409        #no third party410        #no out of stock411    finally:412        try:413            obj.kill_driver()414        except:415            obj.log("Error Deleting Driver")416    return417def valleyvet(obj):418    obj.log("Competitor: %d not yet defined" %obj.comp_id)419    obj.set_undefined()420    return421def walmart(obj):422    price_selectors = {"span.price-characteristic[itemprop=price]":"content","div.Price-old.display-inline-block.arrange-fill.font-normal.u-textNavyBlue.display-inline > span.Price-group" : "title",\423    "div.prod-BotRow.prod-showBottomBorder.prod-OfferSection.prod-OfferSection-twoPriceDisplay div.Grid-col:nth-child(4) span.Price-group" : "title",\424    "span.display-inline-block.arrange-fit.Price.Price-enhanced.u-textNavyBlue > span.Price-group" : "title",\425    "span.display-inline-block.arrange-fit.Price.Price-enhanced.u-textGray > span.Price-group" : "title",}426    broken_link_selectors = {"div.font-semibold.prod-Bot-partial-head" : "innerHTML",\427    "p.error-ErrorPage-copy":"innerHTML"}428    sale_selectors = {}429    try:430        obj.pricing(price_selectors,sale_selectors,broken_link_selectors)431    except:432        obj.log("Failed to aquire pricing data")433    #check for Third party434    try:435        if not obj._find_data("a[data-tl-id=ProductSellerInfo-SellerName]","innerHTML|||Walmart"):436            sellers = obj._driver.find_elements_by_css_selector("div.marketplace-body")437            for sell in sellers:438                if sell.find_element_by_css_selector("span.seller-shipping-msg.u-textBlue").get_attribute("innerHTML").encode('utf-8') == 'Walmart':439                    obj.set_price(aux_func.clean(sell.find_element_by_css_selector("span.Price-group").get_attribute('title')))440                    break441            else:442                obj.set_third_party()443    except:444        obj.log("Third party check failed")445    #check Out of stock446    try:447        try:448            oos = obj._driver.find_element_by_css_selector("div.prod-ProductOffer-oosMsg.prod-PaddingTop--xxs > span").get_attribute("innerHTML")449        except:450            oos = "in stock"451        if "Out of stock" in oos:452            obj.set_out_of_stock()453    except:454        obj.log("Out of stock check failed")455    finally:456        try:457            obj.kill_driver()458        except:459            obj.log("Error Deleting Driver")460    return461def _default(obj):462    obj.log("Unknown Competitor ID")463    obj.set_undefined()...flowablebaseandselectors.py
Source:flowablebaseandselectors.py  
...79                # is untouched.80                if isinstance(result.right, IdentitySeqMapInfo):81                    # extend right selector with the observable that maps the selector_base to base82                    if isinstance(result.left, ObservableSeqMapInfo):83                        selector_map = ObservableSeqMapInfo(merge_selectors(84                            result.left.observable,85                            selector_obs,86                            subscribe_scheduler=subscriber.scheduler,87                            stack=stack,88                        ))89                        if other.selectors is not None:90                            def gen_new_selectors():91                                for key, val in other.selectors.items():92                                    selector = merge_selectors(93                                        left=merge_selectors(94                                            left=val,95                                            right=result.left.observable,96                                            subscribe_scheduler=subscriber.scheduler,97                                            stack=stack,98                                        ),99                                        right=selector_obs,100                                        subscribe_scheduler=subscriber.scheduler,101                                        stack=stack,102                                    )103                                    yield key, selector104                            selectors = dict(gen_new_selectors())105                        else:106                            selectors = None107                    elif isinstance(result.left, IdentitySeqMapInfo):108                        selector_map = ObservableSeqMapInfo(selector_obs)109                        if other.selectors is not None:110                            def gen_new_selectors():111                                for key, val in other.selectors.items():112                                    selector = merge_selectors(113                                        left=val,114                                        right=selector_obs,115                                        subscribe_scheduler=subscriber.scheduler,116                                        stack=stack,117                                    )118                                    yield key, selector119                            selectors = dict(gen_new_selectors())120                        else:121                            selectors = None122                    else:123                        raise Exception(f'illegal result "{result.left}"')124                    if selectors is not None or self.selectors is not None:125                        if selectors is not None:126                            if self.selectors is not None:127                                selectors = {**self.selectors, **selectors}128                            else:129                                selectors = selectors130                        else:131                            selectors = self.selectors132                    return FlowableBaseAndSelectors(self.base, selectors), result.right, selector_map133        return None134    def match_with(135            self,136            other: 'FlowableBaseAndSelectors',137            subscriber: Subscriber,138            stack: List[FrameSummary],139    ) -> Optional[FlowableBaseAndSelectorsMatch]:140        # bases are of type Optional[Base], therefore check first if base is not None141        if self.base is not None and other.base is not None:142            result = self.base.match_with(other.base, subscriber=subscriber, stack=stack)143            # this BaseAndSelectors and the other BaseAndSelectors match directly with144            # their bases145            if isinstance(result, FlowableBaseMatch):146                # the selectors can be taken over to the new base selector tuple147                if self.selectors is not None and other.selectors is not None:148                    if self.selectors is not None:149                        if other.selectors is None:150                            selectors = self.selectors151                        else:152                            selectors = {**self.selectors, **other.selectors}153                    else:154                        selectors = other.selectors155                else:156                    selectors = {}157                return FlowableBaseAndSelectorsMatch(158                    base_selectors=FlowableBaseAndSelectors(159                        base=result.base,160                        selectors=selectors,161                    ),162                    left=result.left,163                    right=result.right,164                )165        result = self.get_selector_maps(166            other=other,167            subscriber=subscriber,168            stack=stack,169        )170        if result is not None:171            base_selectors, left_selector_map, right_selector_map = result172            return FlowableBaseAndSelectorsMatch(173                left=left_selector_map,174                right=right_selector_map,175                base_selectors=base_selectors,176            )177        result = other.get_selector_maps(178            other=self,179            subscriber=subscriber,180            stack=stack,181        )182        if result is not None:183            base_selectors, right_selector_map, left_selector_map = result184            return FlowableBaseAndSelectorsMatch(185                left=left_selector_map,186                right=right_selector_map,187                base_selectors=base_selectors,188            )189        # two bases B1, B2 defined in selectors match, the following map is created190        # - define a new anonymous base B_match191        # - define two selectors that map a sequence with base B1 and B2 to base B_match192        #   in case B1 and B2 match with identity, only define no selector193        # - define two selectors that map the two sequences of the match operator to base B_match194        if self.selectors is not None and other.selectors is not None:195            # check if some tuple of bases matches196            for sel_base_1, sel_observable_1 in self.selectors.items():197                for sel_base_2, sel_observable_2 in other.selectors.items():198                    result = sel_base_1.match_with(199                        sel_base_2,200                        subscriber=subscriber,201                        stack=stack,202                    )203                    # if two bases match ...204                    if isinstance(result, FlowableBaseMatch):205                        # once right is completed, keep consuming left side until it is completed as well206                        def request_left(left, right):207                            return not (isinstance(left, SelectCompleted) and isinstance(right, SelectNext))208                        def request_right(left, right):209                            return not (isinstance(right, SelectCompleted) and isinstance(left, SelectNext))210                        if isinstance(result.left, IdentitySeqMapInfo) and isinstance(result.right, IdentitySeqMapInfo):211                            merge_sel = RefCountObservable(212                                source=ControlledZipObservable(213                                    left=sel_observable_1, #init_debug_observable(sel_observable_1, name='d1', stack=stack, subscriber=subscriber),214                                    right=sel_observable_2, #init_debug_observable(sel_observable_2, name='d1', stack=stack, subscriber=subscriber),215                                    request_left=request_left,216                                    request_right=request_right,217                                    match_func=lambda _, __: True,218                                    scheduler=subscriber.scheduler,219                                    stack=stack,220                                ), #name='d2', stack=stack, subscriber=subscriber),221                                subject=PublishObservableSubject(),222                                subscribe_scheduler=subscriber.subscribe_scheduler,223                                stack=stack,224                            )225                            def left_selector(t):226                                if isinstance(t[1], SelectNext):227                                    return [select_next, select_completed]228                                else:229                                    return [select_completed]230                            def right_selector(t):231                                if isinstance(t[0], SelectNext):232                                    return [select_next, select_completed]233                                else:234                                    return [select_completed]235                            left_sel = RefCountObservable(236                                source=MapToIteratorObservable(237                                    source=FilterObservable(238                                        source=merge_sel,239                                        predicate=lambda t: isinstance(t[0], SelectNext),240                                    ),241                                    func=left_selector,242                                ),243                                subject=PublishObservableSubject(),244                                subscribe_scheduler=subscriber.subscribe_scheduler,245                                stack=stack,246                            )247                            right_sel = RefCountObservable(248                                source=MapToIteratorObservable(249                                    source=FilterObservable(250                                        source=merge_sel,251                                        predicate=lambda t: isinstance(t[1], SelectNext),252                                    ),253                                    func=right_selector,254                                ),255                                subject=PublishObservableSubject(),256                                subscribe_scheduler=subscriber.scheduler,257                                stack=stack,258                            )259                            right_selector_map = MapObservable(260                                source=FilterObservable(261                                    source=merge_sel,262                                    predicate=lambda t: type(t[0]) == type(t[1]),263                                ),264                                func=lambda t: t[0],265                            )266                            return FlowableBaseAndSelectorsMatch(267                                left=ObservableSeqMapInfo(left_sel),268                                right=ObservableSeqMapInfo(right_sel),269                                base_selectors=FlowableBaseAndSelectors(270                                    base=None,271                                    selectors={272                                        sel_base_1: right_selector_map,273                                        **{k: merge_selectors(v, left_sel, subscribe_scheduler=subscriber.scheduler, stack=stack) for k, v in274                                           self.selectors.items() if k != sel_base_1},275                                        **{k: merge_selectors(v, right_sel, subscribe_scheduler=subscriber.scheduler, stack=stack) for k, v in276                                           other.selectors.items() if k != sel_base_2}277                                    },278                                ),279                            )...test_baseselectortuple.py
Source:test_baseselectortuple.py  
...93    # #94    # #95    # #     t1 = BaseAndSelectors(base=TestBase(self.num_base_1), selectors={self.num_base_2: self.sel2})96    # #     t2 = BaseAndSelectors(base=self.num_base_2, selectors={self.num_base_3: self.sel3})97    # #     result: Optional[BaseSelectorAndSelectorMaps] = t1.get_selectors(t2, self.subscriber)98    # #99    # #     self.assertIsInstance(result, BaseSelectorAndSelectorMaps)100    # #     self.assertIsInstance(result.right, IdentitySelectorMap)101    # #     self.assertIsInstance(result.left, IdentitySelectorMap)102    # #     self.assertIsInstance(result.base_selectors.base, NumericalBase)103    # #     self.assertEqual(1, result.base_selectors.base.num)104    # #     self.assertEqual({2: None, 3: None}, result.base_selectors.selectors)105    # def test_not_matching(self):106    #     t1 = BaseAndSelectors(base=NumericalBase(1))107    #     t2 = BaseAndSelectors(base=NumericalBase(2))108    #     result: Optional[BaseSelectorsAndSelectorMaps] = t1.match_with(t2, self.subscriber)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
