Best Python code snippet using playwright-python
feed.py
Source:feed.py  
...206                break207        if not systempost:208            self.sys_posts_count = len(sys_posts_content)209        return systempost210    def _on_page(self, data, add_bottom=True, **kwargs):211        if not data:212            self.loading = False213            return214        # only insert when getting next page -- i.e. adding from the bottom215        systempost = self._on_page_add_system_post() if add_bottom else None216        removed = removed_ds.data()217        systempost_place = 0218        for pdata in data:219            systempost_place += 1220            if systempost and systempost_place == 4:221                self._add_system_post(222                    key='system_post_%s' % (self.sys_posts_count + 1),223                    index=0,224                    **systempost)225            post = pdata.get('post', None)226            hash = pdata.get('hash', None)227            if post:228                key = post.get('key', None)229                if key in removed:230                    continue231                d = convert_json_from_unicode(post)232                d['hash'] = hash233                if not key:234                    continue235                if self.stream_system and self.stream_system.key in d['channels']:236                    print 'Got system post'237                    self.stream_system.hash_bounds = hash238                    if len(d['content']) > 2 and d['content'][0] == '#':239                        print 'Systempost -- deep link'240                        key = d['content'][1:]241                        def on_get_deeplink(data, hash):242                            print 'Got deep links data'243                            if len(data) > 0:244                                wrapper = {'hash': hash, 'post': data[0]}245                                self._on_page([wrapper], add_bottom=False)246                        from api.items import get as getPostData247                        getPostData(keys=[key], on_items=on_get_deeplink)248                        continue249                    d['background'] = 'bkg.jpg'250                    d['when'] = 'once'251                    if d['role_text'].lower() == 'invite':252                        d['action'] = 'invite'253                        d['button'] = 'send invitation'254                    elif d['role_text'].lower() == 'linkedin':255                        d['action'] = 'linkedin'256                        d['button'] = 'Login with linkedin'257                    else:258                        d['action'] = 'close'259                    d['key'] = 'system_post_%s' % (self.sys_posts_count + 1)260                    d['index'] = -1261                    self._add_system_post(**d)262                    continue263                if add_bottom:264                    self.container.enqueue(key, d, index=0)265                else:266                    self.container.enqueue(key, d, index=-1)267        self.loading = False268    def _on_updates(self, data, hash=None):269        self.last_update_hash = hash270        removed = removed_ds.data()271        for pdata in data:272            post = pdata.get('obj', None)273            if post:274                key = post.get('key', None)275                if key and key not in removed:276                    self.container.enqueue(key, post)277    def _update(self, *largs):278        if not self.stream:279            return280        keys = self.container.get_widget_keys()281        self._get_next_page()282        args = {'hash': self.last_update_hash} if self.last_update_hash else {}283        get_updates(284            keys=keys,285            on_updates=self._on_updates,286            on_error=self.feed_network_error_message,287            **args)288    def _on_nextpage_getsystemposts(self, data, add_bottom=True):289        if self.stream_system and self.stream:290            if not self.stream_system.hash_bounds:291                self.stream_system.hash_bounds = self.stream.hash_bounds292            self.stream_system.get_next_page(293                on_page=partial(self._on_page, add_bottom=False),294                on_error=self.feed_network_error_message)295        self._on_page(data, add_bottom=add_bottom)296    def _get_page(self, *args):297        if self.stream:298            self.stream.get_current_page(299                on_page=self._on_nextpage_getsystemposts,300                on_error=self.feed_network_error_message)301    def _get_prev_page(self, *args):302        LogTestFairy('Feed prev page')303        if self.stream:304            self.stream.get_prev_page(305                on_page=self._on_page,306                on_error=self.feed_network_error_message)307    @staticmethod308    def feed_network_error_message(*args):309        return...data_manager.py
Source:data_manager.py  
1# This class will deal with the low-level data management, saving and loading to files.2import enum3import math4import os5import random6from typing import Any, Iterator, List, Tuple7import attr8import glob9import numpy as np10from go_space import exceptions11from . import datum_lib12Batch = Any  # List[np.ndarray, np.ndarray]13Data = List[datum_lib.Datum]14PAGE_SIZE = 20015PAGES_IN_MEMORY = 1016class TrainTest(enum.Enum):17    TRAIN = 118    TEST = 219@attr.s20class Page(object):21    page_num: int = attr.ib()22    content: Data = attr.ib()23    def __len__(self) -> int:24        return len(self.content)25# TODO: Clean up26class DataManager(object):27    def __init__(self, tgt_dir):28        # TODO: Rename cursors to be include "write".  These are a mess.29        self.page_cursor = -130        self.entry_cursor = 031        self.test_pages = set()32        self._page_cache = list()33        self.data_path = tgt_dir34        # Used in the course of generating batches35        self.reset()36        self._note_existing_pages()37    def _note_existing_pages(self) -> None:38        # Assumes files are written 1, 2, ..., n39        num_files = len(glob.glob(os.path.join(self.data_path, "*.txt")))40        self.page_cursor = num_files41        self.entry_cursor = 042        if os.path.exists(os.path.join(self.data_path, str(self.page_cursor) + ".txt")):43            self.entry_cursor = len(self._read_page(self.page_cursor))44    def _read_page(self, page_num: int) -> Page:45        if page_num > self.page_cursor:46            raise exceptions.DataException(f"Page {page_num} doesn't exist")47        # Check cache first48        for page in self._page_cache:49            if page.page_num == page_num:50                return page51        # Read with an LRU cache52        page_data = list()53        with open(os.path.join(self.data_path, str(page_num) + ".txt"), "r") as f:54            for line in f.readlines():55                page_data.append(datum_lib.Datum.from_json(line))56        page = Page(page_num=page_num, content=page_data)57        self._page_cache = [page] + self._page_cache58        self._page_cache = self._page_cache[:PAGES_IN_MEMORY]59        return page60    def _read_entry(self, page_num: int, entry_num: int) -> datum_lib.Datum:61        page = self._read_page(page_num)62        if entry_num >= len(page):63            raise exceptions.DataException(64                f"Trying to read entry {entry_num} off of page {page_num}, but entries only go to {len(page)-1}."65            )66        return page.content[entry_num]67    def _choose_next(self, data_split: TrainTest) -> Tuple[int, int]:68        # Pick a random page, then go through the data on that page in order.  Subject to change, I suppose.69        def choose_new_page() -> int:70            if data_split == TrainTest.TRAIN:71                if len(self._read_train_pages | self.test_pages) == self.page_cursor:72                    raise exceptions.DataException("Tried to read too many pages.")73            if data_split == TrainTest.TEST:74                if len(self._read_test_pages) == len(self.test_pages):75                    raise exceptions.DataException("Tried to read too many pages.")76            def already_read(try_page: Page) -> bool:77                nonlocal data_split78                if data_split == TrainTest.TRAIN:79                    return try_page in self._read_train_pages80                if data_split == TrainTest.TEST:81                    return try_page in self._read_test_pages82            def wrong_data(try_page: Page) -> bool:83                nonlocal data_split84                if data_split == TrainTest.TRAIN:85                    return try_page in self.test_pages86                if data_split == TrainTest.TEST:87                    return try_page not in self.test_pages88            try_page = random.randint(0, self.page_cursor - 1)89            while already_read(try_page) or wrong_data(try_page):90                try_page = random.randint(0, self.page_cursor - 1)91            # Mark as read92            if data_split == TrainTest.TRAIN:93                self._read_train_pages.add(try_page)94            if data_split == TrainTest.TEST:95                self._read_test_pages.add(try_page)96            return try_page97        if self._on_page == -1:98            self._on_page = choose_new_page()99            self._read_cursor = 0100        if self._read_cursor >= len(self._read_page(self._on_page)):101            self._on_page = choose_new_page()102            self._read_cursor = 0103        result = (self._on_page, self._read_cursor)104        self._read_cursor += 1105        return result106    def size(self) -> int:107        return (self.page_cursor - 1) * PAGE_SIZE + self.entry_cursor108    def save_datum(self, datum: datum_lib.Datum) -> None:109        if self.page_cursor == -1 or self.entry_cursor == PAGE_SIZE:110            self.page_cursor += 1111            self.entry_cursor = 0112        with open(113            os.path.join(self.data_path, str(self.page_cursor) + ".txt"), "a"114        ) as f:115            f.write(datum.to_json() + "\n")116        self.entry_cursor += 1117    def train_test_split(self, portion_test: float) -> None:118        # Will split on a page level119        if len(self.test_pages) > 0:120            raise exceptions.DataException("Ran train_test_split multiple times.")121        if self.page_cursor == -1:122            raise exceptions.DataException("No data saved.")123        num_test_pages = math.ceil(self.page_cursor * portion_test)124        for page in random.sample(range(self.page_cursor), num_test_pages):125            self.test_pages.add(page)126    def get_batch(127        self, batch_size: int, data_split: TrainTest, reset: bool = True128    ) -> Batch:129        # Should be semi-random.130        if self.page_cursor == -1:131            raise exceptions.DataException("No data saved.")132        if reset:133            self.reset()134        features, targets = list(), list()135        for _ in range(batch_size):136            next_datum = self._read_entry(*self._choose_next(data_split))137            features.append(next_datum.np_feature())138            targets.append(next_datum.np_target())139        return np.stack(features, axis=0), np.stack(targets, axis=0)140    def reset(self) -> None:141        """Needs to be called between looping batches"""142        print("RESET")143        self._read_train_pages = set()144        self._read_test_pages = set()145        self._on_page = -1146        self._read_cursor = 0147    def generate_batches(148        self, batch_size: int, data_split: TrainTest149    ) -> Iterator[Batch]:150        while True:...project_screen.py
Source:project_screen.py  
...100        self._previous.setDisabled(self._on_first_page())101        self._next.setDisabled(self._on_last_page())102        self._pages_navigation.setCurrentIndex(index)103    def _on_last_page(self):104        return self._on_page(self._page_count - 1)105    def _on_first_page(self):106        return self._on_page(0)107    def _on_page(self, index):108        return self.current_page == index109    def _insert_page(self, widget, position):110        self._pages.insertWidget(position, widget)111    def _remove_page(self, number):112        page = self._pages.widget(number)113        self._pages.removeWidget(page)114        page.setParent(None)115        page.close()116    @property117    def current_page(self):118        return self._pages.currentIndex()119    @property120    def _page_count(self):121        return self._pages.count()...dailypuzzle.py
Source:dailypuzzle.py  
...19    def run(self):20        d = self._account.get('community/index.phtml')21        d.addCallback(self._on_page)22        return d23    def _on_page(self, page):24        form = page.find('form', attrs=self._POLL_FORM_ATTRS)25        if not form:26            self._logger.info('Puzzle is not available')27            return28        d = self._outside_browser.get(29            'http://www.jellyneo.net/?go=dailypuzzle')30        d.addCallback(self._on_answers_page)31        return d32    def _on_answers_page(self, page):33        page = BeautifulSoup(page)34        daily_answer = page.find(text=self._ANSWER_DATE_RE)35        if not daily_answer:36            raise PageParseError(page)37        answer_date = datetime.datetime.strptime(...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
