Best Python code snippet using playwright-python
playwrightmanager.py
Source:playwrightmanager.py  
...202            strict_selectors=strict_selectors,203            user_agent=user_agent,204            viewport=viewport205        )206        self._context.set_default_navigation_timeout(self.default_navigation_timeout)207        self._context.set_default_timeout(self.default_timeout)208    def close_context(self):209        """å
³éæµè§å¨ä¸ä¸æã210        å±äºæµè§å¨ä¸ä¸æçææé¡µé¢é½å°å
³éã211        æ æ³å
³éé»è®¤æµè§å¨ä¸ä¸æã212        """213        if self._context is not None:214            self._context.close()215            if self._browser.contexts:216                self._context = self._browser.contexts[-1]217                self._page = None218                self._interaction = None219    def new_page(220            self,221            accept_downloads: bool = None,222            base_url: str = None,223            bypass_csp: bool = None,224            extra_http_headers: typing.Optional[typing.Dict[str, str]] = None,225            http_credentials: HttpCredentials = None,226            ignore_https_errors: bool = None,227            java_script_enabled: bool = None,228            no_viewport: bool = None,229            proxy: ProxySettings = None,230            storage_state: typing.Union[StorageState, str, pathlib.Path] = None,231            strict_selectors: bool = None,232            user_agent: str = None,233            viewport: ViewportSize = None234    ):235        """卿°çæµè§å¨ä¸ä¸æä¸å建ä¸ä¸ªæ°é¡µé¢ã å
³éæ¤é¡µé¢ä¹å°å
³éä¸ä¸æãè¿æ¯ä¸ä¸ªæ¹ä¾¿çAPIï¼åºè¯¥åªç¨äºåé¡µåºæ¯åççæ®µã236        ç产代ç åºæ¾å¼å建æµè§å¨ä¸ä¸æï¼ç¶åå建页é¢ï¼ä»¥æ§å¶å
¶ç¡®åççå½å¨æã237        :param accept_downloads: æ¯å¦èªå¨ä¸è½½ææéä»¶ã é»è®¤ä¸º false ï¼å
¶ä¸ææä¸è½½é½è¢«åæ¶ã238        :param base_url: å½ä½¿ç¨ goto(url, **kwargs), route(url, handler), wait_for_url(url, **kwargs),239            expect_request(url_or_predicate, **kwargs), æ expect_response(url_or_predicate) , **kwargs) æ¶ï¼240            å®éè¿ä½¿ç¨ URL() æé å½æ°æ¥æå»ºç¸åºç URL ãä¾åï¼241            baseURL: http://localhost:3000 并导èªå° /bar.html => http://localhost:3000/bar.html242            baseURL: http://localhost:3000/foo/ 并导èªå° ./bar.html => http://localhost:3000/foo/bar.html243        :param bypass_csp: 忢ç»è¿é¡µé¢çå
容å®å
¨çç¥ã244        :param extra_http_headers: å
嫿¯ä¸ªè¯·æ±é½è¦åéçéå HTTP头çå¯¹è±¡ãæææ é¢å¼å¿
é¡»æ¯å符串ã245        :param http_credentials: HTTP 身份éªè¯çåæ®ã246        :param ignore_https_errors: æ¯å¦å¨å¯¼èªè¿ç¨ä¸å¿½ç¥ HTTPS é误ã é»è®¤ä¸º falseã247        :param java_script_enabled: æ¯å¦å¨ä¸ä¸æä¸å¯ç¨ JavaScriptã é»è®¤ä¸º trueã248        :param no_viewport: ä¸å¼ºå¶åºå®è§å£ï¼å
è®¸å¨æå¤´æ¨¡å¼ä¸è°æ´çªå£å¤§å°ã249        :param proxy: 䏿¤ä¸ä¸æä¸èµ·ä½¿ç¨çç½ç»ä»£ç设置ã250        :param storage_state: 使ç¨ç»å®çåå¨ç¶æå¡«å
ä¸ä¸æã251            æ¤é项å¯ç¨äºä½¿ç¨éè¿ storage_state(**kwargs) è·åçç»å½ä¿¡æ¯åå§åä¸ä¸æã252            å
·æä¿ååå¨çæä»¶çè·¯å¾ï¼æå
·æä»¥ä¸å段ç对象:253            cookies <List[Dict]> 为ä¸ä¸æè®¾ç½®çå¯é cookie254                name <str>255                value <str>256                url <str> urlãdomainåpathä¸éä¸ã257                domain <str> urlãdomainåpathä¸éä¸ã258                path <str> urlãdomainåpathä¸éä¸ã259                expires <float> å¯éç Unix æ¶é´ï¼ä»¥ç§ä¸ºåä½ï¼ã260                httpOnly <bool> å¯éç httpOnly æ å¿261                secure <bool> å¯éçsecureæ å¿262                sameSite <"Strict"|"Lax"|"None"> å¯éç sameSite æ å¿263            origins <List[Dict]> 为ä¸ä¸æè®¾ç½®çå¯é localStorage264                origin <str>265                localStorage <List[Dict]>266                name <str>267                value <str>268        :param strict_selectors: 宿å®ï¼ä¸ºæ¤ä¸ä¸æå¯ç¨ä¸¥æ ¼éæ©å¨æ¨¡å¼ã269            å¨ä¸¥æ ¼éæ©å¨æ¨¡å¼æ¨¡å¼ä¸ï¼å½å¤ä¸ªå
ç´ ä¸éæ©å¨å¹é
æ¶ï¼å°æåºå¯¹éæ©å¨çæææä½ï¼è¿äºæä½æå³çåä¸ªç®æ DOMå
ç´ ã270        :param user_agent: 卿¤ä¸ä¸æä¸ä½¿ç¨çç¹å®ç¨æ·ä»£çã271        :param viewport: 为æ¯ä¸ªé¡µé¢è®¾ç½®ä¸è´çè§çªãé»è®¤ä¸º 1280x720 è§çªã272            width <int> 以å素为åä½ç页é¢å®½åº¦ã273            height <int> 以å素为åä½ç页é¢é«åº¦ã274        """275        if self._context is not None:276            self._page = self._context.new_page()277        else:278            if no_viewport is None:279                no_viewport = True280            self._page = self._browser.new_page(281                accept_downloads=accept_downloads,282                base_url=base_url,283                bypass_csp=bypass_csp,284                extra_http_headers=extra_http_headers,285                http_credentials=http_credentials,286                ignore_https_errors=ignore_https_errors,287                java_script_enabled=java_script_enabled,288                no_viewport=no_viewport,289                proxy=proxy,290                storage_state=storage_state,291                strict_selectors=strict_selectors,292                user_agent=user_agent,293                viewport=viewport294            )295            self._page.context.set_default_timeout(self.default_timeout)296            self._page.context.set_default_navigation_timeout(self.default_navigation_timeout)297        self._interaction = self._page298    def close_page(self):299        """å
³éå½åç页é¢ãå¦æè¿æå
¶ä»æå¼ç页é¢ï¼å°åæ¢å°æè¿æå¼ç页é¢ã"""300        if self._page is not None:301            self._page.close()302            if self._context.pages:  # å¦æè¿ææå¼ç页é¢303                self._page = self._context.pages[-1]304                self._interaction = self._page305            else:306                self._interaction = None307    def switch_context(self, index: int):308        """æ `index` å°æ´»å¨æµè§å¨ä¸ä¸æåæ¢å°å¦ä¸ä¸ªæå¼çä¸ä¸æã309        :param index: è¦æ´æ¹ä¸ºçä¸ä¸æçç´¢å¼ãä»0å¼å§ã310        """..._frame.py
Source:_frame.py  
...117            Error("Navigating frame was detached!"),118            lambda frame: frame == self,119        )120        if timeout is None:121            timeout = self._page._timeout_settings.navigation_timeout()122        wait_helper.reject_on_timeout(timeout, f"Timeout {timeout}ms exceeded.")123        return wait_helper124    def expect_navigation(125        self,126        url: URLMatch = None,127        wait_until: DocumentLoadState = None,128        timeout: float = None,129    ) -> EventContextManagerImpl[Response]:130        if not wait_until:131            wait_until = "load"132        if timeout is None:133            timeout = self._page._timeout_settings.navigation_timeout()134        deadline = monotonic_time() + timeout135        wait_helper = self._setup_navigation_wait_helper(timeout)136        matcher = URLMatcher(url) if url else None137        def predicate(event: Any) -> bool:138            # Any failed navigation results in a rejection.139            if event.get("error"):140                return True141            return not matcher or matcher.matches(event["url"])142        wait_helper.wait_for_event(143            self._event_emitter,144            "navigated",145            predicate=predicate,146        )147        async def continuation() -> Optional[Response]:...asv_pilot.py
Source:asv_pilot.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Software License Agreement (BSD License)4#5#  Copyright (c) 2014, Ocean Systems Laboratory, Heriot-Watt University, UK.6#  All rights reserved.7#8#  Redistribution and use in source and binary forms, with or without9#  modification, are permitted provided that the following conditions10#  are met:11#12#   * Redistributions of source code must retain the above copyright13#     notice, this list of conditions and the following disclaimer.14#   * Redistributions in binary form must reproduce the above15#     copyright notice, this list of conditions and the following16#     disclaimer in the documentation and/or other materials provided17#     with the distribution.18#   * Neither the name of the Heriot-Watt University nor the names of19#     its contributors may be used to endorse or promote products20#     derived from this software without specific prior written21#     permission.22#23#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS24#  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT25#  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS26#  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE27#  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,28#  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,29#  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;30#  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER31#  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT32#  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN33#  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE34#  POSSIBILITY OF SUCH DAMAGE.35#36#  Original authors:37#   Valerio De Carolis, Marian Andrecki, Corina Barbalata, Gordon Frost38from __future__ import division39import roslib40roslib.load_manifest('asv_pilot')41import rospy42import numpy as np43np.set_printoptions(precision=2, suppress=True)44# import sys45import asv_controllers as ctrl46import frame_maths as fm47# Messages48from vehicle_interface.msg import PilotRequest, PilotStatus, ThrusterCommand49from auv_msgs.msg import NavSts50from vehicle_interface.srv import BooleanService, BooleanServiceResponse51# Constants52TOPIC_THROTTLE = '/motors/throttle'53TOPIC_POSITION_REQUEST = '/pilot/position_req'54TOPIC_BODY_REQUEST = '/pilot/body_req'55TOPIC_GEO_REQUEST = '/pilot/geo_req'56TOPIC_VELOCITY_REQUEST = '/pilot/velocity_req'57TOPIC_STATUS = '/pilot/status'58TOPIC_NAV = '/nav/nav_sts'59SRV_SWITCH = '/pilot/switch'60SRV_PID_CONFIG = '/pilot/pid_config'61NAVIGATION_TIMEOUT = 5  # seconds62LOOP_RATE = 5  # Hz63SIMULATION = False64VERBOSE = False65# Status66CTRL_DISABLED = 067CTRL_ENABLED = 168STATUS_CTRL = {69    CTRL_DISABLED: PilotStatus.PILOT_DISABLED,70    CTRL_ENABLED: PilotStatus.PILOT_ENABLED71}72STATUS_MODE = {73    ctrl.MODE_POSITION: PilotStatus.MODE_POSITION,74    ctrl.MODE_VELOCITY: PilotStatus.MODE_VELOCITY,75    ctrl.MODE_POINT_SHOOT: 'point_shoot'76}77SCALE_THROTTLE = 1.078class Pilot(object):79    """Node provides an interface between control logic and ROS. This node outputs throttle commands that can be80     consumed either by pololu_driver or thruster_sim. The controller will not run if fresh navigation81     information is not available. The controller can be enabled or disabled via service.82     Generally, the asv has two degrees of freedom: surge and yaw. They are coupled - that is, the boat cannot yaw83     without thrusting forward (note that this means limited steering when slowing down).84     Different control policies:85        - point and shoot - simple P controller on position (distance to goal and orientation towards the goal).86        - cascaded pid - PID position controller outputs a desired velocity, then PID velocity controller attempts to87            achieve this velocity88        - velocity control - in progress89    """90    def __init__(self, name, topic_throttle, topic_position_request, topic_body_request, topic_geo_request,91                 topic_velocity_request, topic_nav, topic_pilot_status, srv_switch, verbose, controller_config):92        self.name = name93        # latest throttle received94        self.pose = np.zeros(6)  # [x, y, z, roll, pitch, yaw]95        self.vel = np.zeros(6)96        self.origin = np.zeros(2)97        self.geo_radius = fm.compute_geocentric_radius(self.origin[0])98        self.last_nav_t = 099        self.nav_switch = False100        self.pilot_enable = CTRL_DISABLED101        self.verbose = verbose102        self.controller = ctrl.Controller(1/LOOP_RATE)103        self.controller.set_mode(ctrl.MODE_POSITION)104        self.controller.update_gains(controller_config)105        # Subscribers106        self.position_sub = rospy.Subscriber(topic_position_request, PilotRequest, self.handle_pose_req, tcp_nodelay=True, queue_size=1)107        self.body_sub = rospy.Subscriber(topic_body_request, PilotRequest, self.handle_body_req, tcp_nodelay=True, queue_size=1)108        self.geo_sub = rospy.Subscriber(topic_geo_request, PilotRequest, self.handle_geo_req, tcp_nodelay=True, queue_size=1)109        self.velocity_sub = rospy.Subscriber(topic_velocity_request, PilotRequest, self.handle_vel_req, tcp_nodelay=True, queue_size=1)110        self.nav_sub = rospy.Subscriber(topic_nav, NavSts, self.handle_nav, tcp_nodelay=True, queue_size=1)111        # Publishers112        self.throttle_pub = rospy.Publisher(topic_throttle, ThrusterCommand, tcp_nodelay=True, queue_size=1)113        self.status_pub = rospy.Publisher(topic_pilot_status, PilotStatus, tcp_nodelay=True, queue_size=1)114        # Services115        self.srv_switch = rospy.Service(srv_switch, BooleanService, self.handle_switch)116        self.srv_pid_config = rospy.Service(SRV_PID_CONFIG, BooleanService, self.handle_pid_config)117    def loop(self):118        # if nav message is old stop the controller119        if (rospy.Time.now().to_sec() - self.last_nav_t) > NAVIGATION_TIMEOUT and self.nav_switch:120            self.nav_switch = False121            rospy.logerr('Navigation outdated')122        if self.nav_switch and self.pilot_enable:123            throttle = self.controller.evaluate_control()124            if self.verbose:125                rospy.loginfo(str(self.controller))126            thr_msg = ThrusterCommand()127            thr_msg.header.stamp = rospy.Time.now()128            thr_msg.throttle = throttle129            self.throttle_pub.publish(thr_msg)130        self.send_status()131    def handle_nav(self, msg):132        try:133            pos = msg.position134            orient = msg.orientation135            vel = msg.body_velocity136            rot = msg.orientation_rate137            self.pose[0:3] = np.array([pos.north, pos.east, pos.depth])138            self.pose[3:6] = np.array([orient.roll, orient.pitch, orient.yaw])139            self.vel[0:3] = np.array([vel.x, vel.y, vel.z])140            self.vel[3:6] = np.array([rot.roll, rot.pitch, rot.yaw])141            tmp_origin = np.array([msg.origin.latitude, msg.origin.longitude])142            if np.all(self.origin != tmp_origin):143                self.origin = tmp_origin144                self.geo_radius = fm.compute_geocentric_radius(self.origin[0])145            # dt = msg.header.stamp.to_sec() - self.last_nav_t146            self.last_nav_t = msg.header.stamp.to_sec()147            self.nav_switch = True148            self.controller.update_nav(self.pose, velocity=self.vel)149        except Exception as e:150            rospy.logerr('%s', e)151            rospy.logerr('Bad navigation message format, skipping!')152    def handle_pose_req(self, msg):153        req_pose = np.array(msg.position)154        # ignore depth, pitch and roll155        if any(req_pose[2:5]):156            rospy.logwarn('Non-zero depth, pitch or roll requested. Setting those to zero.')157        req_pose[2:6] = 0158        self.controller.request_pose(req_pose)159    def handle_body_req(self, msg):160        req_pose = np.array(msg.position)161        # ignore depth, pitch and roll162        if any(req_pose[2:5]):163            rospy.logwarn('Non-zero depth, pitch or roll requested. Setting those to zero.')164        req_pose[2:6] = 0165        self.controller.request_body(req_pose)166    def handle_geo_req(self, msg):167        req_pose = np.array(msg.position)168        if any(req_pose[2:5]):169            rospy.logwarn('Non-zero depth, pitch or roll requested. Setting those to zero.')170        # ignore depth, pitch and roll171        req_pose[2:6] = 0172        req_pose[0:2] = fm.geo2ne(req_pose[0:2], self.origin, self.geo_radius)173        self.controller.request_pose(req_pose)174    def handle_vel_req(self, msg):175        req_vel = np.array(msg.velocity)176        if any(req_vel[1:5]):177            rospy.logwarn('Non-zero sway, heave, pitch or roll requested. Setting those to zero.')178        req_vel[1:5] = 0179        self.controller.request_vel(req_vel)180    def handle_switch(self, srv):181        self.pilot_enable = srv.request182        if not self.pilot_enable:183            thr_msg = ThrusterCommand()184            thr_msg.header.stamp = rospy.Time.now()185            thr_msg.throttle = np.zeros(6)186            self.throttle_pub.publish(thr_msg)187        return BooleanServiceResponse(self.pilot_enable)188    def handle_pid_config(self, srv):189        if srv.request is True:190            config = rospy.get_param('/controller', dict())191            self.controller.update_gains(config)192            rospy.logwarn("PID config reloaded")193            return BooleanServiceResponse(True)194        else:195            return BooleanServiceResponse(False)196    def send_status(self, event=None):197        ps = PilotStatus()198        ps.header.stamp = rospy.Time.now()199        ps.status = STATUS_CTRL[self.pilot_enable]200        ps.mode = STATUS_MODE[self.controller.mode]201        if self.controller.req_pose is not None:202            ps.des_pos = self.controller.req_pose.tolist()203        vel = np.zeros(6)204        vel[0:2] = self.controller.des_vel205        ps.des_vel = vel.tolist()206        self.status_pub.publish(ps)207if __name__ == '__main__':208    rospy.init_node('asv_pilot')209    name = rospy.get_name()210    topic_throttle = rospy.get_param('~topic_throttle', TOPIC_THROTTLE)211    topic_position_request = rospy.get_param('~topic_position_request', TOPIC_POSITION_REQUEST)212    topic_body_request = rospy.get_param('~topic_body_request', TOPIC_BODY_REQUEST)213    topic_geo_request = rospy.get_param('~topic_geo_request', TOPIC_GEO_REQUEST)214    topic_velocity_request = rospy.get_param('~topic_velocity_request', TOPIC_VELOCITY_REQUEST)215    topic_nav = rospy.get_param('~topic_nav', TOPIC_NAV)216    topic_pilot_status = rospy.get_param('~topic_pilot_status', TOPIC_STATUS)217    srv_switch = rospy.get_param('~srv_switch', SRV_SWITCH)218    # simulation = bool(int(rospy.get_param('~simulation', SIMULATION)))219    verbose = bool(int(rospy.get_param('~verbose', VERBOSE)))220    controller_config = rospy.get_param('~controller', dict())221    rospy.loginfo('%s: throttle topic: %s', name, topic_throttle)222    rospy.loginfo('%s: topic_position_request: %s', name, topic_position_request)223    rospy.loginfo('%s: topic_body_request: %s', name, topic_body_request)224    rospy.loginfo('%s: topic_geo_request: %s', name, topic_geo_request)225    rospy.loginfo('%s: topic_velocity_request: %s', name, topic_velocity_request)226    rospy.loginfo('%s: topic_nav: %s', name, topic_nav)227    rospy.loginfo('%s: topic_pilot_status: %s', name, topic_pilot_status)228    rospy.loginfo('%s: srv_switch: %s', name, srv_switch)229    rospy.loginfo('%s: verbose: %s', name, verbose)230    pilot = Pilot(name, topic_throttle, topic_position_request, topic_body_request, topic_geo_request,231                  topic_velocity_request, topic_nav, topic_pilot_status, srv_switch, verbose,232                  controller_config)233    loop_rate = rospy.Rate(LOOP_RATE)234    while not rospy.is_shutdown():235        try:236            pilot.loop()237            loop_rate.sleep()238        except rospy.ROSInterruptException:239            rospy.loginfo('%s caught ros interrupt!', name)240        # except Exception as e:241        #     rospy.logfatal('%s', e)242        #     rospy.logfatal('Caught exception and dying!')...neyboy.py
Source:neyboy.py  
1import base642import datetime as dt3import io4try:5    from gym import logger6except:7    import logging as logger8import random9import re10import uuid11from collections import namedtuple12import pathlib13import numpy as np14from PIL import Image15from pyppeteer import launch, connect16from syncer import sync17ACTION_NAMES = ["NOOP", "LEFT", "RIGHT"]18ACTION_NONE = 019ACTION_LEFT = 120ACTION_RIGHT = 221START_SCREEN = 022GAME_SCREEN = 123GAME_OVER_SCREEN = 224TOAST_APPEARANCE_FREQUENCY = 1025GameState = namedtuple('GameState',26                       ['game_id', 'id', 'score', 'status', 'hiscore', 'snapshot', 'timestamp', 'dimensions',27                        'position'])28DEFAULT_NAVIGATION_TIMEOUT = 60 * 100029DEFAULT_GAME_URL = 'http://fabito.github.io/neyboy/'30DEFAULT_BROWSER_WS_ENDPOINT = 'ws://localhost:3000'31DEFAULT_CHROMIUM_LAUNCH_ARGS = ['--no-sandbox', '--window-size=80,315', '--disable-infobars']32class Game:33    def __init__(self, headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,34                 game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None, initial_width=180, initial_height=320):35        self.initial_width = initial_width36        self.initial_height = initial_height37        self.headless = headless38        self.user_data_dir = user_data_dir39        self.navigation_timeout = navigation_timeout40        self.is_running = False41        self.browser = None42        self.page = None43        self.state = None44        self._dims = None45        self.game_id = str(uuid.uuid4())46        self.state_id = 047        self.game_url = game_url48        self.browser_ws_endpoint = browser_ws_endpoint49    async def initialize(self):50        if self.browser_ws_endpoint is not None:51            logger.info('Connecting to running instance at: %s', self.browser_ws_endpoint)52            self.browser = await connect(browserWSEndpoint=self.browser_ws_endpoint)53            self.page = await self.browser.newPage()54        else:55            logger.info('Launching new browser instance')56            if self.user_data_dir is not None:57                self.browser = await launch(headless=self.headless, userDataDir=self.user_data_dir,58                                            args=DEFAULT_CHROMIUM_LAUNCH_ARGS)59            else:60                self.browser = await launch(headless=self.headless)61            pages = await self.browser.pages()62            if len(pages) > 0:63                self.page = pages[0]64            else:65                self.page = await self.browser.newPage()66        self.page.setDefaultNavigationTimeout(self.navigation_timeout)67        await self.page.setViewport(dict(width=117, height=156))68        await self.page.goto('{}?w={}&h={}'.format(self.game_url, self.initial_width, self.initial_height),69                             {'waitUntil': 'networkidle2'})70        envjs_path = pathlib.Path(__file__).resolve().parent.joinpath('env.js')71        await self.page.addScriptTag(dict(path=str(envjs_path)))72        await self.is_ready()73    @staticmethod74    async def create(headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,75                     game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None) -> 'Game':76        o = Game(headless, user_data_dir, navigation_timeout, game_url, browser_ws_endpoint)77        await o.initialize()78        return o79    @property80    def x(self):81        return int(self._dims['x'])82    @property83    def y(self):84        return int(self._dims['y'])85    @property86    def width(self):87        return int(self._dims['width'])88    @property89    def height(self):90        return int(self._dims['height'])91    async def dimensions(self):92        dimensions = await self.page.evaluate('''() => {93                return neyboyChallenge.dimensions();94            }''')95        return dimensions96    async def is_ready(self):97        await self.page.waitForFunction('''()=>{98            return neyboyChallenge && neyboyChallenge.isReady();99        }''')100    async def start(self):101        if random.randint(0, 1):102            await self.tap_right()103        else:104            await self.tap_left()105        await self._shuffle_toasts()106        return self107    async def _shuffle_toasts(self):108        await self.page.evaluate('''() => {109            neyboyChallenge.shuffleToasts();110        }''')111        return self112    def is_over(self):113        return self.state.status == GAME_OVER_SCREEN114    async def _wait_until_replay_button_is_active(self):115        await self.resume()116        await self.page.waitForFunction('''() => {117            return neyboyChallenge.isOver();118        }''')119    async def is_loaded(self):120        return await self.is_ready()121    async def pause(self):122        await self.page.evaluate('''() => {123            neyboyChallenge.pause();124        }''')125    async def resume(self):126        await self.page.evaluate('''() => {127            neyboyChallenge.resume();128        }''')129    async def get_score(self):130        score = await self.page.evaluate('''() => {131            return neyboyChallenge.getScore();132        }''')133        return int(score) if score else 1134    async def get_high_score(self):135        hiscore = await self.page.evaluate('''() => {136                return neyboyChallenge.runtime !== undefined &&137                       neyboyChallenge.runtime.getEventVariableByName('hiscore').data;138                }''')139        return int(hiscore) if hiscore else 1140    async def get_scores(self):141        scores = await self.page.evaluate('''() => {142                const score = neyboyChallenge.getScore();143                const hiscore = neyboyChallenge.runtime.getEventVariableByName('hiscore').data || 0;144                return {score, hiscore};145                }''')146        scores['score'] = int(scores['score'])147        scores['hiscore'] = int(scores['hiscore'])148        return scores149    async def tap_left(self, delay=0):150        x = self.x + self.width // 4151        y = self.y + self.height // 3152        await self.page.mouse.click(x, y, {'delay': delay})153    async def tap_right(self, delay=0):154        x = (self.x + self.width) - self.width // 4155        y = self.y + self.height // 3156        await self.page.mouse.click(x, y, {'delay': delay})157    async def stop(self):158        await self.browser.close()159    async def _hard_restart(self):160        await self.page.reload({'waitUntil': 'networkidle2'})161        await self.is_loaded()162    async def restart(self):163        self.game_id = str(uuid.uuid4())164        self.state_id = 0165        if self.state.status == GAME_SCREEN:166            # commit suicide167            while not self.is_over():168                logger.debug('suiciding')169                await self.tap_left()170                await self.tap_left()171                await self.tap_left()172                await self.get_state()173        if self.is_over():174            await self._wait_until_replay_button_is_active()175            x = self.x + self.width // 2176            y = self.y + self.height - self.height // 7177            await self.page.mouse.click(x, y)178        elif self.state.status == START_SCREEN:179            logger.debug('start screen')180        else:181            raise ValueError('Unknown state: {}'.format(self.state.status))182        await self.start()183    async def screenshot(self, format="jpeg", quality=30, encoding='binary'):184        dims = await self.dimensions()185        dims['y'] = dims['height'] / 2186        dims['height'] = dims['height'] - dims['y'] - 30187        snapshot = await self.page.screenshot({188            'type': format,189            'quality': quality,190            'clip': dims191        })192        if encoding == 'binary':193            return snapshot194        else:195            encoded_snapshot = base64.b64encode(snapshot)196            return encoded_snapshot.decode('ascii')197    async def get_state(self, include_snapshot='numpy'):198        """199        :param include_snapshot: numpy, pil, ascii, bytes, None200        :param fmt:201        :param quality:202        :return: a GameState instance203        """204        state = await self.page.evaluate('''(includeSnapshot, format, quality) => {205            return neyboyChallenge.state(includeSnapshot, format, quality);206        }''', include_snapshot, 'image/jpeg', 30)207        self.state_id += 1208        self._dims = state['dimensions']209        state['hiscore'] = int(state['hiscore'])210        state['score'] = int(state['score'])211        state['status'] = int(state['status'])212        state['id'] = self.state_id213        state['game_id'] = self.game_id214        state['timestamp'] = dt.datetime.today().timestamp()215        if include_snapshot is not None:216            base64_string = state['snapshot']217            base64_string = re.sub('^data:image/.+;base64,', '', base64_string)218            imgdata = base64.b64decode(base64_string)219            bytes_io = io.BytesIO(imgdata)220            if include_snapshot == 'numpy':221                image = Image.open(bytes_io)222                state['snapshot'] = np.array(image)223            elif include_snapshot == 'pil':224                image = Image.open(bytes_io)225                state['snapshot'] = image226            elif include_snapshot == 'ascii':227                image = Image.open(bytes_io)228                state['snapshot'] = self.screenshot_to_ascii(image, 0.1, 3)229            elif include_snapshot == 'bytes':230                state['snapshot'] = bytes_io231            else:232                raise ValueError('Supported snapshot formats are: numpy, pil, ascii, bytes')233        self.state = GameState(**state)234        return self.state235    async def save_screenshot(self, path, format="jpeg", quality=30):236        dims = await self.dimensions()237        dims['y'] = dims['height'] / 2238        dims['height'] = dims['height'] - dims['y'] - 30239        await self.page.screenshot({240            'path': path,241            'type': format,242            'quality': quality,243            'clip': dims244        })245    async def is_in_start_screen(self):246        playing_status = await self._get_is_playing_status()247        return playing_status == START_SCREEN248    @staticmethod249    def screenshot_to_ascii(img, scale, intensity_correction_factor, width_correction_factor=7 / 4):250        """251        https://gist.github.com/cdiener/10491632252        :return:253        """254        chars = np.asarray(list(' .,:;irsXA253hMHGS#9B&@'))255        SC, GCF, WCF = scale, intensity_correction_factor, width_correction_factor256        S = (round(img.size[0] * SC * WCF), round(img.size[1] * SC))257        img = np.sum(np.asarray(img.resize(S)), axis=2)258        img -= img.min()259        img = (1.0 - img / img.max()) ** GCF * (chars.size - 1)260        return "\n".join(("".join(r) for r in chars[img.astype(int)]))261class SyncGame:262    def __init__(self, game: Game):263        self.game = game264    def __getattr__(self, attr):265        return sync(getattr(self.game, attr))266    @staticmethod267    def create(headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,268               game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None) -> 'SyncGame':269        o = sync(Game.create)(headless, user_data_dir, navigation_timeout, game_url, browser_ws_endpoint)...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
