How to use the navigation_timeout method in Playwright Python

Best Python code snippets using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

neyboy_env.py

Source: neyboy_env.py Github

copy
1import math
2import os
3
4import numpy as np
5
6import gym
7from gym import spaces, utils, logger
8from gym.utils import seeding
9
10from gym_neyboy.envs.neyboy import SyncGame, ACTION_NAMES, ACTION_LEFT, ACTION_RIGHT, GAME_OVER_SCREEN, \
11    DEFAULT_NAVIGATION_TIMEOUT, DEFAULT_GAME_URL
12
13
class NeyboyEnv(gym.Env, utils.EzPickle):
    """Gym environment that plays the Neyboy browser game through ``SyncGame``.

    Observations are the ``snapshot`` of the latest game state; actions are
    indices into ``ACTION_NAMES``.  Most settings can be overridden with
    ``GYM_NEYBOY_*`` environment variables, which take precedence over the
    constructor arguments.
    """

    metadata = {'render.modes': ['human', 'rgb_array']}

    def __init__(self, headless=None, score_threshold=0.975, death_reward=-1, stay_alive_reward=0.1, user_data_dir=None):
        """Create the game, fetch an initial state and define the gym spaces.

        :param headless: run the browser headless; when ``None`` the browser is
            headless unless ``GYM_NEYBOY_ENV_NON_HEADLESS`` is set
        :param score_threshold: cosine value above which the full cosine is
            rewarded (``cosine_thresh`` strategy)
        :param death_reward: reward returned on the game-over screen
        :param stay_alive_reward: factor applied to the cosine when it is not
            above ``score_threshold`` (``cosine_thresh`` strategy)
        :param user_data_dir: forwarded to ``SyncGame.create`` when a new
            browser is launched
        """
        # EzPickle re-creates the env from the recorded constructor arguments,
        # so every argument has to be recorded, not only the first three.
        utils.EzPickle.__init__(self, headless, score_threshold, death_reward, stay_alive_reward, user_data_dir)

        if headless is None:
            headless = os.environ.get('GYM_NEYBOY_ENV_NON_HEADLESS', None) is None

        self.headless = headless
        self.score_threshold = float(os.environ.get('GYM_NEYBOY_SCORE_THRESH', score_threshold))
        self.stay_alive_reward = float(os.environ.get('GYM_NEYBOY_STAY_ALIVE_REWARD', stay_alive_reward))
        self.death_reward = float(os.environ.get('GYM_NEYBOY_DEATH_REWARD', death_reward))

        self._state = None
        self.viewer = None

        self.reward_strategy = os.environ.get('GYM_NEYBOY_REWARD_STRATEGY', 'cosine_thresh')

        # When set, snapshots are requested from the game in 'bytes' form.
        self.obs_for_terminal = os.environ.get('GYM_NEYBOY_OBS_AS_BYTES', None) is not None

        navigation_timeout = int(os.environ.get('GYM_NEYBOY_ENV_TIMEOUT', DEFAULT_NAVIGATION_TIMEOUT))
        game_url = os.environ.get('GYM_NEYBOY_GAME_URL', DEFAULT_GAME_URL)
        browser_ws_endpoint = os.environ.get('GYM_NEYBOY_BROWSER_WS_ENDPOINT', None)

        self._create_game(browser_ws_endpoint, game_url, headless, navigation_timeout, user_data_dir)
        # A first state is needed to learn the snapshot shape for the space.
        self._update_state()

        self.observation_space = spaces.Box(low=0, high=255, shape=self.state.snapshot.shape, dtype=np.uint8)
        self.action_space = spaces.Discrete(len(ACTION_NAMES))

    def _create_game(self, browser_ws_endpoint, game_url, headless, navigation_timeout, user_data_dir):
        """Create ``self.game``, connecting to an existing browser if an endpoint is given."""
        if browser_ws_endpoint is not None:
            self.game = SyncGame.create(navigation_timeout=navigation_timeout, game_url=game_url,
                                        browser_ws_endpoint=browser_ws_endpoint)
        else:
            self.game = SyncGame.create(headless=headless, user_data_dir=user_data_dir,
                                        navigation_timeout=navigation_timeout, game_url=game_url)

    @property
    def state(self):
        """The most recently fetched game state (``None`` before the first fetch)."""
        return self._state

    def _update_state(self):
        """Fetch a fresh state from the game and cache it."""
        if self.obs_for_terminal:
            self._state = self.game.get_state(include_snapshot='bytes')
        else:
            self._state = self.game.get_state()

    def _alive_reward(self, angle):
        """Return the reward for a non-terminal step according to ``reward_strategy``.

        :raises ValueError: if ``reward_strategy`` is not a known strategy
        """
        cosine = math.cos(angle)
        if self.reward_strategy == 'cosine':
            return cosine
        if self.reward_strategy == 'one':
            return 1.0
        if self.reward_strategy == 'cosine_thresh':
            return cosine if cosine > self.score_threshold else cosine * self.stay_alive_reward
        raise ValueError('Invalid reward strategy: {}'.format(self.reward_strategy))

    def step(self, a):
        """Apply action ``a`` and return ``(observation, reward, done, info)``.

        The game is resumed only for the duration of the action and the state
        fetch, then paused again.
        """
        self.game.resume()
        if a == ACTION_LEFT:
            self.game.tap_left()
        elif a == ACTION_RIGHT:
            self.game.tap_right()
        self._update_state()
        self.game.pause()
        is_over = self.state.status == GAME_OVER_SCREEN

        if is_over:
            reward = self.death_reward
        else:
            reward = self._alive_reward(self.state.position['angle'])

        logger.debug('HiScore: {}, Score: {}, Action: {}, Reward: {}, GameOver: {}'.format(
            self.state.hiscore,
            self.state.score,
            ACTION_NAMES[a],
            reward,
            is_over))
        info = dict(score=self.state.score, hiscore=self.state.hiscore, position=self.state.position['angle'])
        return self._get_obs(), reward, is_over, info

    def _get_obs(self):
        """Return the observation derived from the cached state."""
        return self.state.snapshot

    def reset(self):
        """Restart the game, refresh the state, pause and return the first observation."""
        self.game.restart()
        self._update_state()
        self.game.pause()
        return self._get_obs()

    def render(self, mode='human', close=False):
        """Return the snapshot (``rgb_array``) or show it in a viewer (``human``)."""
        img = self.state.snapshot
        if mode == 'rgb_array':
            return img
        elif mode == 'human':
            # Imported lazily so that the viewer dependency is only needed when rendering.
            from gym.envs.classic_control import rendering
            if self.viewer is None:
                self.viewer = rendering.SimpleImageViewer()
            self.viewer.imshow(img)
            return self.viewer.isopen

    def close(self):
        """Close the viewer (if any) and stop the game."""
        if self.viewer is not None:
            self.viewer.close()
            self.viewer = None
        self.game.stop()
        super(NeyboyEnv, self).close()

    def get_action_meanings(self):
        """Return the action names, indexed by action id."""
        return ACTION_NAMES

    def seed(self, seed=None):
        """Seed ``self.np_random`` and return the list of seeds used."""
        self.np_random, seed1 = seeding.np_random(seed)
        return [seed1]
128
129
class NeyboyEnvAngle(NeyboyEnv):
    """``NeyboyEnv`` variant that observes ``state.position['angle']`` instead of a snapshot."""

    def __init__(self, headless=None, score_threshold=0.95, death_reward=-1, stay_alive_reward=0.1, user_data_dir=None):
        super().__init__(
            headless,
            score_threshold,
            death_reward,
            stay_alive_reward,
            user_data_dir,
        )
        # Replace the image space set up by the parent with a scalar one.
        angle_space = spaces.Box(low=-1, high=1, shape=(), dtype=np.float32)
        self.observation_space = angle_space

    def _update_state(self):
        """Fetch the game state without requesting a snapshot."""
        fresh_state = self.game.get_state(include_snapshot=None)
        self._state = fresh_state

    def _get_obs(self):
        """Return the angle stored in the cached state's position."""
        position = self.state.position
        return position['angle']
140
Full Screen

neyboy.py

Source: neyboy.py Github

copy
1import base64
2import datetime as dt
3import io
4
# Prefer gym's logger; fall back to the standard library when gym is not
# installed.  Only ImportError is caught so that KeyboardInterrupt/SystemExit
# and genuine errors raised while importing gym are not silently swallowed.
try:
    from gym import logger
except ImportError:
    import logging as logger
9
10import random
11import re
12import uuid
13from collections import namedtuple
14
15import pathlib
16import numpy as np
17from PIL import Image
18from pyppeteer import launch, connect
19from syncer import sync
20
# Action ids; ACTION_NAMES is indexed by action id.
ACTION_NAMES = ["NOOP", "LEFT", "RIGHT"]
ACTION_NONE = 0
ACTION_LEFT = 1
ACTION_RIGHT = 2

# Values of the game ``status`` field.
START_SCREEN = 0
GAME_SCREEN = 1
GAME_OVER_SCREEN = 2

TOAST_APPEARANCE_FREQUENCY = 10

# Immutable record built from the state reported by the page.
GameState = namedtuple(
    'GameState',
    [
        'game_id',
        'id',
        'score',
        'status',
        'hiscore',
        'snapshot',
        'timestamp',
        'dimensions',
        'position',
    ],
)

# Navigation timeout, in milliseconds (60 seconds).
DEFAULT_NAVIGATION_TIMEOUT = 60 * 1000
DEFAULT_GAME_URL = 'http://fabito.github.io/neyboy/'
DEFAULT_BROWSER_WS_ENDPOINT = 'ws://localhost:3000'
DEFAULT_CHROMIUM_LAUNCH_ARGS = ['--no-sandbox', '--window-size=80,315', '--disable-infobars']
40
41
class Game:
    """Asynchronous controller for a Neyboy game page driven through pyppeteer.

    All page interaction goes through the ``neyboyChallenge`` JavaScript object
    that ``env.js`` (located next to this module) injects into the page.
    """

    def __init__(self, headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,
                 game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None, initial_width=180, initial_height=320):
        """Store the configuration; no browser is started until ``initialize``.

        :param headless: passed to ``launch`` when a new browser is started
        :param user_data_dir: Chromium profile directory, or ``None``
        :param navigation_timeout: default navigation timeout set on the page
        :param game_url: base URL of the game
        :param browser_ws_endpoint: connect to this running browser instead of launching one
        :param initial_width: value of the ``w`` query parameter of the game URL
        :param initial_height: value of the ``h`` query parameter of the game URL
        """
        self.initial_width = initial_width
        self.initial_height = initial_height
        self.headless = headless
        self.user_data_dir = user_data_dir
        self.navigation_timeout = navigation_timeout
        self.is_running = False
        self.browser = None
        self.page = None
        self.state = None
        self._dims = None
        self.game_id = str(uuid.uuid4())
        self.state_id = 0
        self.game_url = game_url
        self.browser_ws_endpoint = browser_ws_endpoint

    async def initialize(self):
        """Connect to or launch the browser, open the game and inject ``env.js``."""
        if self.browser_ws_endpoint is not None:
            logger.info('Connecting to running instance at: %s', self.browser_ws_endpoint)
            self.browser = await connect(browserWSEndpoint=self.browser_ws_endpoint)
            self.page = await self.browser.newPage()
        else:
            logger.info('Launching new browser instance')
            if self.user_data_dir is not None:
                self.browser = await launch(headless=self.headless, userDataDir=self.user_data_dir,
                                            args=DEFAULT_CHROMIUM_LAUNCH_ARGS)
            else:
                # NOTE(review): DEFAULT_CHROMIUM_LAUNCH_ARGS are only applied when a
                # user data dir is given - confirm whether that is intentional.
                self.browser = await launch(headless=self.headless)

            # Reuse the tab the browser opened on start-up when there is one.
            pages = await self.browser.pages()
            if pages:
                self.page = pages[0]
            else:
                self.page = await self.browser.newPage()

        self.page.setDefaultNavigationTimeout(self.navigation_timeout)
        await self.page.setViewport(dict(width=117, height=156))
        await self.page.goto('{}?w={}&h={}'.format(self.game_url, self.initial_width, self.initial_height),
                             {'waitUntil': 'networkidle2'})
        envjs_path = pathlib.Path(__file__).resolve().parent.joinpath('env.js')
        await self.page.addScriptTag(dict(path=str(envjs_path)))
        await self.is_ready()

    @staticmethod
    async def create(headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,
                     game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None) -> 'Game':
        """Build a ``Game`` and initialize it."""
        o = Game(headless, user_data_dir, navigation_timeout, game_url, browser_ws_endpoint)
        await o.initialize()
        return o

    # The four properties below read the dimensions cached by the last
    # ``get_state`` call; they fail if no state has been fetched yet.

    @property
    def x(self):
        return int(self._dims['x'])

    @property
    def y(self):
        return int(self._dims['y'])

    @property
    def width(self):
        return int(self._dims['width'])

    @property
    def height(self):
        return int(self._dims['height'])

    async def dimensions(self):
        """Return the dimensions reported by ``neyboyChallenge.dimensions()``."""
        dimensions = await self.page.evaluate('''() => {
                return neyboyChallenge.dimensions();
            }''')
        return dimensions

    async def is_ready(self):
        """Wait until ``neyboyChallenge.isReady()`` is truthy in the page."""
        await self.page.waitForFunction('''()=>{
            return neyboyChallenge && neyboyChallenge.isReady();
        }''')

    async def start(self):
        """Tap a randomly chosen side, then shuffle the toasts."""
        if random.randint(0, 1):
            await self.tap_right()
        else:
            await self.tap_left()
        await self._shuffle_toasts()
        return self

    async def _shuffle_toasts(self):
        await self.page.evaluate('''() => {
            neyboyChallenge.shuffleToasts();
        }''')
        return self

    def is_over(self):
        """Return whether the cached state is the game-over screen.

        Returns ``False`` when no state has been fetched yet.
        """
        return self.state is not None and self.state.status == GAME_OVER_SCREEN

    async def _wait_until_replay_button_is_active(self):
        # The game has to be running for ``isOver()`` to become true.
        await self.resume()
        await self.page.waitForFunction('''() => {
            return neyboyChallenge.isOver();
        }''')

    async def is_loaded(self):
        """Alias of ``is_ready``."""
        return await self.is_ready()

    async def pause(self):
        """Call ``neyboyChallenge.pause()`` in the page."""
        await self.page.evaluate('''() => {
            neyboyChallenge.pause();
        }''')

    async def resume(self):
        """Call ``neyboyChallenge.resume()`` in the page."""
        await self.page.evaluate('''() => {
            neyboyChallenge.resume();
        }''')

    async def get_score(self):
        """Return the current score as an int."""
        score = await self.page.evaluate('''() => {
            return neyboyChallenge.getScore();
        }''')
        # NOTE(review): a falsy score (including 0) is reported as 1 - confirm intended.
        return int(score) if score else 1

    async def get_high_score(self):
        """Return the high score as an int."""
        hiscore = await self.page.evaluate('''() => {
                return neyboyChallenge.runtime !== undefined &&
                       neyboyChallenge.runtime.getEventVariableByName('hiscore').data;
                }''')
        # NOTE(review): a falsy high score (including 0) is reported as 1 - confirm intended.
        return int(hiscore) if hiscore else 1

    async def get_scores(self):
        """Return ``{'score': int, 'hiscore': int}`` read in a single page call."""
        scores = await self.page.evaluate('''() => {
                const score = neyboyChallenge.getScore();
                const hiscore = neyboyChallenge.runtime.getEventVariableByName('hiscore').data || 0;
                return {score, hiscore};
                }''')
        scores['score'] = int(scores['score'])
        scores['hiscore'] = int(scores['hiscore'])
        return scores

    async def tap_left(self, delay=0):
        """Click at a quarter of the game width from its left edge, a third of the way down."""
        x = self.x + self.width // 4
        y = self.y + self.height // 3
        await self.page.mouse.click(x, y, {'delay': delay})

    async def tap_right(self, delay=0):
        """Click at a quarter of the game width from its right edge, a third of the way down."""
        x = (self.x + self.width) - self.width // 4
        y = self.y + self.height // 3
        await self.page.mouse.click(x, y, {'delay': delay})

    async def stop(self):
        """Close the browser; a no-op when no browser was ever started."""
        if self.browser is not None:
            await self.browser.close()

    async def _hard_restart(self):
        await self.page.reload({'waitUntil': 'networkidle2'})
        await self.is_loaded()

    async def restart(self):
        """Start a new game, ending the current one first if necessary.

        :raises ValueError: if the game reports an unknown status
        """
        self.game_id = str(uuid.uuid4())
        self.state_id = 0

        if self.state is None:
            # Nothing cached yet: fetch a state so the status checks below
            # (and the tap coordinates) have something to work with.
            await self.get_state(include_snapshot=None)

        if self.state.status == GAME_SCREEN:
            # commit suicide

            while not self.is_over():
                logger.debug('suiciding')
                await self.tap_left()
                await self.tap_left()
                await self.tap_left()
                await self.get_state()

        if self.is_over():
            await self._wait_until_replay_button_is_active()
            # Click the horizontal centre, one seventh of the height above the bottom.
            x = self.x + self.width // 2
            y = self.y + self.height - self.height // 7
            await self.page.mouse.click(x, y)
        elif self.state.status == START_SCREEN:
            logger.debug('start screen')
        else:
            raise ValueError('Unknown state: {}'.format(self.state.status))

        await self.start()

    async def _snapshot_clip(self):
        """Return the clip rectangle used for screenshots.

        The clip starts at half of the reported height and stops 30 pixels
        above the bottom.
        """
        dims = await self.dimensions()
        dims['y'] = dims['height'] / 2
        dims['height'] = dims['height'] - dims['y'] - 30
        return dims

    @staticmethod
    def _screenshot_options(format, quality, clip):
        """Build the option dict for ``page.screenshot``."""
        options = {'type': format, 'clip': clip}
        if format == 'jpeg':
            # ``quality`` is only accepted for JPEG screenshots.
            options['quality'] = quality
        return options

    async def screenshot(self, format="jpeg", quality=30, encoding='binary'):
        """Take a clipped screenshot.

        :param format: screenshot type passed to the page
        :param quality: JPEG quality; ignored for other formats
        :param encoding: ``'binary'`` for raw bytes, anything else for a base64 ASCII string
        """
        clip = await self._snapshot_clip()
        snapshot = await self.page.screenshot(self._screenshot_options(format, quality, clip))
        if encoding == 'binary':
            return snapshot
        else:
            encoded_snapshot = base64.b64encode(snapshot)
            return encoded_snapshot.decode('ascii')

    async def get_state(self, include_snapshot='numpy'):
        """Fetch the game state from the page, cache it and return it.

        :param include_snapshot: numpy, pil, ascii, bytes, None
        :return: a GameState instance
        :raises ValueError: if ``include_snapshot`` is not one of the supported values
        """
        state = await self.page.evaluate('''(includeSnapshot, format, quality) => {
            return neyboyChallenge.state(includeSnapshot, format, quality);
        }''', include_snapshot, 'image/jpeg', 30)

        self.state_id += 1
        self._dims = state['dimensions']
        state['hiscore'] = int(state['hiscore'])
        state['score'] = int(state['score'])
        state['status'] = int(state['status'])
        state['id'] = self.state_id
        state['game_id'] = self.game_id
        state['timestamp'] = dt.datetime.today().timestamp()

        if include_snapshot is not None:
            # The page returns the image as a data URL; strip the prefix and decode.
            base64_string = state['snapshot']
            base64_string = re.sub('^data:image/.+;base64,', '', base64_string)
            imgdata = base64.b64decode(base64_string)
            bytes_io = io.BytesIO(imgdata)

            if include_snapshot == 'numpy':
                image = Image.open(bytes_io)
                state['snapshot'] = np.array(image)
            elif include_snapshot == 'pil':
                image = Image.open(bytes_io)
                state['snapshot'] = image
            elif include_snapshot == 'ascii':
                image = Image.open(bytes_io)
                state['snapshot'] = self.screenshot_to_ascii(image, 0.1, 3)
            elif include_snapshot == 'bytes':
                state['snapshot'] = bytes_io
            else:
                raise ValueError('Supported snapshot formats are: numpy, pil, ascii, bytes')

        self.state = GameState(**state)

        return self.state

    async def save_screenshot(self, path, format="jpeg", quality=30):
        """Take a clipped screenshot and write it to ``path``."""
        clip = await self._snapshot_clip()
        options = self._screenshot_options(format, quality, clip)
        options['path'] = path
        await self.page.screenshot(options)

    async def is_in_start_screen(self):
        """Fetch a fresh state (without snapshot) and tell whether it is the start screen."""
        state = await self.get_state(include_snapshot=None)
        return state.status == START_SCREEN

    @staticmethod
    def screenshot_to_ascii(img, scale, intensity_correction_factor, width_correction_factor=7 / 4):
        """Render a PIL image as ASCII art.

        https://gist.github.com/cdiener/10491632

        :param img: PIL image
        :param scale: scaling factor applied to both image dimensions
        :param intensity_correction_factor: exponent applied to the inverted, normalised intensity
        :param width_correction_factor: extra horizontal scaling
        :return: the rows of characters joined by newlines
        """
        chars = np.asarray(list(' .,:;irsXA253hMHGS#9B&@'))
        size = (round(img.size[0] * scale * width_correction_factor), round(img.size[1] * scale))
        pixels = np.asarray(img.resize(size), dtype=float)
        if pixels.ndim == 3:
            # Collapse the channels; single-channel images are already 2-D.
            pixels = pixels.sum(axis=2)
        pixels = pixels - pixels.min()
        peak = pixels.max()
        if peak > 0:
            # A uniform image has peak 0; skip the division instead of producing NaNs.
            pixels = pixels / peak
        levels = (1.0 - pixels) ** intensity_correction_factor * (chars.size - 1)
        return "\n".join("".join(row) for row in chars[levels.astype(int)])
313
314
class SyncGame:
    """Blocking facade over ``Game``: attribute access is forwarded and wrapped with ``sync``."""

    def __init__(self, game: Game):
        self.game = game

    def __getattr__(self, attr):
        """Forward ``attr`` to the wrapped game.

        Callables are wrapped with ``sync``; plain values (for example the
        ``x``/``y`` properties or ``state``) are returned as they are.
        """
        if attr == 'game':
            # ``game`` is missing only while the instance is not initialised yet
            # (e.g. during copy/unpickle); looking it up on ``self.game`` would
            # recurse without end.
            raise AttributeError(attr)
        target = getattr(self.game, attr)
        return sync(target) if callable(target) else target

    @staticmethod
    def create(headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,
               game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None) -> 'SyncGame':
        """Create and initialize a ``Game`` synchronously and wrap it."""
        o = sync(Game.create)(headless, user_data_dir, navigation_timeout, game_url, browser_ws_endpoint)
        return SyncGame(o)
328
Full Screen

_helper.py

Source: _helper.py Github

copy
1# Copyright (c) Microsoft Corporation.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import fnmatch
16import math
17import re
18import sys
19import time
20import traceback
21from types import TracebackType
22from typing import (
23    TYPE_CHECKING,
24    Any,
25    Callable,
26    Dict,
27    List,
28    Optional,
29    Pattern,
30    Union,
31    cast,
32)
33
34from playwright._impl._api_types import Error, TimeoutError
35
36if sys.version_info >= (3, 8):  # pragma: no cover
37    from typing import Literal, TypedDict
38else:  # pragma: no cover
39    from typing_extensions import Literal, TypedDict
40
41
42if TYPE_CHECKING:  # pragma: no cover
43    from playwright._impl._network import Request, Response, Route
44
45URLMatch = Union[str, Pattern, Callable[[str], bool]]
46URLMatchRequest = Union[str, Pattern, Callable[["Request"], bool]]
47URLMatchResponse = Union[str, Pattern, Callable[["Response"], bool]]
48RouteHandler = Union[Callable[["Route"], Any], Callable[["Route", "Request"], Any]]
49
50ColorScheme = Literal["dark", "light", "no-preference"]
51DocumentLoadState = Literal["domcontentloaded", "load", "networkidle"]
52KeyboardModifier = Literal["Alt", "Control", "Meta", "Shift"]
53MouseButton = Literal["left", "middle", "right"]
54
55
class ErrorPayload(TypedDict, total=False):
    """Serialized error; every key is optional."""

    message: str
    name: str
    stack: str
    value: Any
61
62
class Header(TypedDict):
    """A single header as a name/value pair."""

    name: str
    value: str
66
67
class ContinueParameters(TypedDict, total=False):
    """Optional overrides (url, method, headers, post data); every key is optional."""

    url: Optional[str]
    method: Optional[str]
    headers: Optional[List[Header]]
    postData: Optional[str]
73
74
class ParsedMessageParams(TypedDict):
    """The ``params`` part of a parsed message: type, guid and initializer."""

    type: str
    guid: str
    initializer: Dict
79
80
class ParsedMessagePayload(TypedDict, total=False):
    """A parsed message; every key is optional."""

    id: int
    guid: str
    method: str
    params: ParsedMessageParams
    result: Any
    error: ErrorPayload
88
89
class Document(TypedDict):
    """Document descriptor carrying an optional ``request``."""

    request: Optional[Any]
92
93
class FrameNavigatedEvent(TypedDict):
    """Payload of a frame-navigated event: url, name, optional new document and error."""

    url: str
    name: str
    newDocument: Optional[Document]
    error: Optional[str]
99
100
101Env = Dict[str, Union[str, float, bool]]
102
103
class URLMatcher:
    """Match URLs against a glob string, a compiled regex or a predicate."""

    def __init__(self, match: "URLMatch") -> None:
        """Prepare the matcher.

        :param match: a glob string (translated with ``fnmatch.translate``),
            a compiled pattern, or a callable taking the URL
        """
        self._callback: Optional[Callable[[str], bool]] = None
        self._regex_obj: Optional[Pattern] = None
        if isinstance(match, str):
            regex = fnmatch.translate(match)
            self._regex_obj = re.compile(regex)
        elif isinstance(match, Pattern):
            self._regex_obj = match
        else:
            self._callback = match
        # The original argument is kept so callers can compare matchers.
        self.match = match

    def matches(self, url: str) -> bool:
        """Return ``True`` if ``url`` satisfies the matcher, ``False`` otherwise."""
        if self._callback is not None:
            return bool(self._callback(url))
        if self._regex_obj is not None:
            # ``search`` returns a match object or None; convert to a real bool
            # (``typing.cast`` performs no conversion at runtime).
            return self._regex_obj.search(url) is not None
        return False
123
124
class TimeoutSettings:
    """Timeout values that fall back to a parent's settings when not set locally."""

    def __init__(self, parent: Optional["TimeoutSettings"]) -> None:
        """Create settings with no local values.

        :param parent: settings consulted when a value is not set here, or ``None``
        """
        self._parent = parent
        # ``None`` means "not set here".  Starting from a concrete number would
        # make the parent fallback in the getters unreachable.
        self._timeout: Optional[float] = None
        self._navigation_timeout: Optional[float] = None

    def set_timeout(self, timeout: float) -> None:
        """Set the local timeout."""
        self._timeout = timeout

    def timeout(self) -> float:
        """Return the local timeout, else the parent's, else 30000."""
        if self._timeout is not None:
            return self._timeout
        if self._parent:
            return self._parent.timeout()
        return 30000

    def set_navigation_timeout(self, navigation_timeout: float) -> None:
        """Set the local navigation timeout."""
        self._navigation_timeout = navigation_timeout

    def navigation_timeout(self) -> float:
        """Return the local navigation timeout, else the parent's, else 30000."""
        if self._navigation_timeout is not None:
            return self._navigation_timeout
        if self._parent:
            return self._parent.navigation_timeout()
        return 30000
150
151
def serialize_error(ex: Exception, tb: Optional[TracebackType]) -> ErrorPayload:
    """Build an error payload from an exception and its traceback.

    The payload name is always "Error"; the stack is the formatted traceback.
    """
    message = str(ex)
    stack = "".join(traceback.format_tb(tb))
    return {"message": message, "name": "Error", "stack": stack}
154
155
def parse_error(error: ErrorPayload) -> Error:
    """Turn an error payload into an ``Error`` (``TimeoutError`` when so named)."""
    error_class = TimeoutError if error.get("name") == "TimeoutError" else Error
    message = cast(str, patch_error_message(error.get("message")))
    return error_class(message, error["stack"])
163
164
def patch_error_message(message: Optional[str]) -> Optional[str]:
    """Rewrite an error message to use Python naming.

    A leading ``<name>: expected ...`` has its name converted to snake_case,
    and the ``acceptDownloads`` hint is rewritten to its Python spelling.

    :param message: the original message, possibly ``None``
    :return: the patched message, or ``None`` when ``message`` is empty or ``None``
    """
    if not message:
        return None

    # DOTALL lets the second group run to the end of the message; without it
    # every line after the first would be dropped from a multi-line message.
    match = re.match(r"(\w+)(: expected .*)", message, re.DOTALL)
    if match:
        message = to_snake_case(match.group(1)) + match.group(2)
    message = message.replace(
        "Pass { acceptDownloads: true }", "Pass { accept_downloads: True }"
    )
    return message
176
177
def locals_to_params(args: Dict) -> Dict:
    """Return a copy of ``args`` without the ``self`` key and without ``None`` values."""
    return {
        name: value
        for name, value in args.items()
        if name != "self" and value is not None
    }
186
187
def monotonic_time() -> int:
    """Return the monotonic clock reading in whole milliseconds (rounded down)."""
    milliseconds = time.monotonic() * 1000
    return math.floor(milliseconds)
190
191
class RouteHandlerEntry:
    """Pairs a URL matcher with the handler registered for it."""

    def __init__(self, matcher: URLMatcher, handler: RouteHandler):
        self.matcher, self.handler = matcher, handler
196
197
def is_safe_close_error(error: Exception) -> bool:
    """Tell whether the error text ends with one of the "has been closed" messages."""
    closed_suffixes = (
        "Browser has been closed",
        "Target page, context or browser has been closed",
    )
    return str(error).endswith(closed_suffixes)
203
204
def not_installed_error(message: str) -> Exception:
    """Wrap ``message`` in a banner that explains how to finish the Playwright installation."""
    rule = "================================================================================"
    banner_lines = [
        "",
        rule,
        f"{message}",
        "Please complete Playwright installation via running",
        "",
        '    "python -m playwright install"',
        "",
        rule,
        "",
    ]
    return Exception("\n".join(banner_lines))
217
218
# Matches an upper-case letter that follows a lower-case letter or digit, or a
# non-leading upper-case letter that is followed by a lower-case letter.
to_snake_case_regex = re.compile("((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))")


def to_snake_case(name: str) -> str:
    """Convert a camelCase or PascalCase identifier to snake_case."""
    underscored = to_snake_case_regex.sub(lambda found: "_" + found.group(1), name)
    return underscored.lower()
224
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTest

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)