Best Python code snippet using playwright-python
main.py
Source:main.py  
# --- Revised 3-Clause BSD License ---
# Copyright Semtech Corporation 2020. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright notice,
#       this list of conditions and the following disclaimer in the documentation
#       and/or other materials provided with the distribution.
#     * Neither the name of the Semtech corporation nor the names of its
#       contributors may be used to endorse or promote products derived from this
#       software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL SEMTECH CORPORATION. BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from typing import Any,Awaitable,Callable,Dict,List,Mapping,Optional
import sys
import os
import traceback
import asyncio
import websockets
import logging
import json
import argparse
from urllib.parse import urlparse
from websockets.server import WebSocketServerProtocol as WSSP
import router_config
from router import Router
from id6 import Id6

logger = logging.getLogger('ts2pktfwd')

# All track stations managed by this process. Every web socket connection
# on muxs is forwarded to the router registered here. This map is populated
# at startup time.
routerid2router = {}    # type:Dict[Id6,Router]


async def add_router(routerid:'Id6', rconfig:Mapping[str,Any], pkfwduri:str) -> Router:
    ''' Create a Router, start it and register it in `routerid2router`.

    The router id must not be registered yet. Returns the started router.
    '''
    assert routerid not in routerid2router
    r = Router(routerid, rconfig, pkfwduri)
    await r.start()
    routerid2router[routerid] = r
    return r


async def websocket_send_error(websocket: WSSP, router:Optional[str], message:str) -> None:
    ''' Send a JSON error object on `websocket`.

    The 'router' field falls back to '0' when no router id is known.
    '''
    await websocket.send(json.dumps({ 'router': router if router else '0', 'error': message }))


class Infos():
    ''' Simple info server to handle router info requests. '''

    def __init__(self, host:str, port:int, muxs_uri:str) -> None:
        self.host = host
        self.port = port
        self.muxs_uri = muxs_uri
        self.ws_server = None    # type: Optional[websockets.server.WebSocketServer]

    async def start(self):
        ''' Start the web socket server on `self.port`. '''
        # Only the port is handed to websockets.serve(); `self.host` is stored
        # but deliberately not used for binding (the host-bound variant was
        # disabled in the original code).
        self.ws_server = await websockets.serve(self.accept, port=self.port)

    def __str__(self):
        return 'Infos'

    async def accept(self, websocket:WSSP, path:str) -> None:
        ''' Answer one router info request.

        Reads a single JSON message. If it names a provisioned router, the
        reply carries the muxs URI for that router; otherwise an error
        reply is sent.
        '''
        logger.info('%s: accept: path %s', self, path)
        router = None  # type:Optional[str]
        errmsg = None  # type:Optional[str]
        try:
            s = json.loads(await websocket.recv())
            logger.info('%s: read: %s', self, s)
            if 'router' not in s:
                errmsg = 'Invalid request data'
            else:
                router = s['router']
                routerid = Id6(router, 'router')
                if routerid not in routerid2router:
                    errmsg = 'Router not provisioned'
                else:
                    uri = self.muxs_uri+'/'+str(routerid)
                    logger.info('%s: respond: %s', self, uri)
                    resp = { 'router': router, 'muxs': 'muxs-::0', 'uri': uri }
                    await websocket.send(json.dumps(resp))
                    return
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            errmsg = 'Could not handle request'
            logger.error('%s: server socket failed: %s', self, exc, exc_info=True)
        await websocket_send_error(websocket, router, errmsg)
        # NOTE(review): a second, router-less error object is sent right after
        # the one above. Kept as in the original; confirm whether clients
        # rely on receiving both frames.
        resp = { 'error': errmsg }
        await websocket.send(json.dumps(resp))

    async def shutdown(self) -> None:
        ''' Close the web socket server, if running, and wait until it is closed. '''
        ws_server = self.ws_server
        if ws_server:
            self.ws_server = None
            ws_server.close()
            await ws_server.wait_closed()


class Muxs():
    ''' Simple muxs server to accept router connects. '''

    def __init__(self, host:str, port:int) -> None:
        self.host = host
        self.port = port
        self.ws_server = None    # type: Optional[websockets.server.WebSocketServer]

    async def start(self):
        ''' Start the web socket server on `self.port`. '''
        # As in Infos.start(), only the port is passed; `self.host` is unused here.
        self.ws_server = await websockets.serve(self.accept, port=self.port)

    def __str__(self):
        return 'Muxs'

    async def accept(self, websocket:WSSP, path:str) -> None:
        ''' Hand an incoming connection to the router named by the URL path.

        The path without its leading character is parsed as the router id.
        Unknown routers get an error reply; any failure is logged and the
        handler returns.
        '''
        logger.info('%s: accept: %s', self, path)
        try:
            s = path[1:]
            routerid = Id6(s, 'router')
            if routerid not in routerid2router:
                await websocket_send_error(websocket, s, 'Router not provisioned')
                return
            r = routerid2router[routerid]
            await r.on_ws_connect(websocket)
        except Exception as exc:
            logger.error('%s: server socket failed: %s', self, exc, exc_info=True)
            return

    async def shutdown(self) -> None:
        ''' Close the web socket server, if running, and wait until it is closed. '''
        ws_server = self.ws_server
        if ws_server:
            self.ws_server = None
            ws_server.close()
            await ws_server.wait_closed()


infos  = None  # type:Optional[Infos]
muxs   = None  # type:Optional[Muxs]

LOG_LEVELS = [ 'ERROR', 'WARNING', 'INFO', 'DEBUG' ]

LOG_STR2LEVEL = {
    'ERROR':   logging.ERROR,
    'WARNING': logging.WARNING,
    'INFO':    logging.INFO,
    'DEBUG':   logging.DEBUG
}


def ap_routerid(s:str) -> Id6:
    ''' argparse type converter: parse a command line argument as a router id. '''
    return Id6(s,'router')


def handle_exc(exc, exit_code=2):
    ''' Report a fatal error on stdout and exit the process with `exit_code`.

    The full traceback is printed only when the environment variable
    'stacktrace' is set to a non-empty value; otherwise only the exception.
    '''
    if os.environ.get('stacktrace'):
        print('Execution failed:')
        print(traceback.format_exc())
    else:
        print('Execution failed: %s' % (exc))
    sys.exit(exit_code)


async def main(args):
    ''' Configure logging, start the Infos and Muxs servers and all routers.

    `args` is the namespace produced by the argument parser below.
    Raises ValueError when the muxs URI must be derived from an infos URI
    that carries no port.
    '''
    global infos, muxs

    root = logging.getLogger()
    ll = LOG_STR2LEVEL[args.loglevel] if args.loglevel else logging.INFO
    root.setLevel(ll)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    if args.logfile:
        handler = logging.FileHandler(args.logfile)   # type: logging.Handler
    else:
        handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    # Fix: the file handler previously never got the formatter, so log files
    # lacked timestamp, logger name and level.
    handler.setFormatter(formatter)
    root.addHandler(handler)

    infosuri = urlparse(args.infosuri)
    infosport = infosuri.port
    infoshost = infosuri.hostname
    if args.muxsuri:
        muxsuri = urlparse(args.muxsuri)
    else:
        # By default muxs listens two ports above infos, which needs an
        # explicit infos port to start from.
        if infosport is None:
            raise ValueError('--infosuri must contain a port when --muxsuri is not given')
        muxsuri = infosuri._replace(netloc="{}:{}".format(infoshost, infosport+2))
    muxsport = muxsuri.port
    muxshost = muxsuri.hostname
    pkfwduri = urlparse(args.pkfwduri)
    logger.info('Connection details: infosuri %s, muxsuri %s, pkfwduri %s',
                infosuri.geturl(), muxsuri.geturl(), pkfwduri.geturl())

    infos = Infos(infoshost, infosport, muxsuri.geturl())
    muxs = Muxs(muxshost, muxsport)
    await infos.start()
    logger.info('Infos started.')
    await muxs.start()
    logger.info('Muxs started.')

    router_config.ini([ args.confdir ])
    # Without router ids on the command line, every configured router is started.
    routers = args.routerids if args.routerids else router_config.routerid2config.keys()
    for s in routers:
        routerid = Id6(s, 'router')
        logger.info("Instantiating %s", routerid)
        rc = router_config.get_router_config(routerid)
        await add_router(routerid, rc, pkfwduri)


if __name__ == '__main__':  # pragma:nocover
    parser = argparse.ArgumentParser(description='''ts2pkdfwd.''')
    parser.add_argument("--infosuri", type=str, default="ws://localhost:6090", help="Info server base URI.")
    parser.add_argument("--muxsuri", type=str, default=None, help="Mux server base URI, by default Info server port plus 2.")
    parser.add_argument("--pkfwduri", type=str, help="Packet forwarder destination URI.", default="udp://localhost:1680")
    parser.add_argument("--confdir", type=str, help="Directory where to load region and router configuration.", default=".")
    parser.add_argument("--logfile", type=str, help="Log file, by default logged to stdout.", default=None)
    parser.add_argument("--loglevel", type=str, choices=LOG_LEVELS, help="Log level: %s" % LOG_LEVELS, default='INFO')
    parser.add_argument("routerids", type=ap_routerid, nargs='*', help='Router ids', default= None)
    try:
        args = parser.parse_args()
    except Exception as exc:
        handle_exc(exc, 1)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(args))
        asyncio.get_event_loop().run_forever()
    except Exception as ex:
        handle_exc(ex, 2)
    finally:
        ...  # the listing is truncated here in the source page; the cleanup code is not shown

# server.py
Source:server.py  
import asyncio
import base64
import json
import ssl
from os import environ as env

import aiohttp
import aiohttp_jinja2
import jinja2
from aiohttp import ClientSession, WSMsgType, client, web
from aiohttp_session import get_session, setup
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from dotenv import find_dotenv, load_dotenv
from oauthlib.oauth2 import WebApplicationClient

import if_debug
from code_server_manager import CodeServerManager

load_dotenv(find_dotenv())
if_debug.attach_debugger_if_dev()
oath_client = WebApplicationClient(env["GITHUB_CLIENT_ID"])


async def login(req: web.Request) -> web.Response:
    """Redirect to the authorization URI built from ``GITHUB_URL``.

    The redirect URI handed to the provider is ``<parent of request URL>/callback``.
    Always raises ``web.HTTPFound``.
    """
    request_uri = oath_client.prepare_request_uri(
        env["GITHUB_URL"],
        redirect_uri=str(req.url.parent) + "/callback",
        scope=["openid", "email", "profile"],
    )
    raise web.HTTPFound(request_uri)


@aiohttp_jinja2.template("home.html")
async def callback(req: web.Request) -> web.Response:
    """Finish the OAuth flow and remember the user's container name.

    Exchanges the ``code`` query parameter for an access token, fetches the
    user from ``https://api.github.com/user`` and stores
    ``<login>_<id>`` as ``container_name`` in the cookie session.

    Raises ``web.HTTPBadRequest`` when the ``code`` parameter is missing.
    """
    code = req.query.get("code")
    if not code:
        # Previously a missing parameter surfaced as a KeyError (HTTP 500).
        raise web.HTTPBadRequest(text="Missing 'code' query parameter")
    token_url, headers, body = oath_client.prepare_token_request(
        env["GITHUB_ACCESS_TOKEN"],
        code=code,
    )
    headers["Accept"] = "application/json"
    # Named http_session so it is not shadowed by the cookie session below.
    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            token_url,
            headers=headers,
            data=body,
            params=[
                ("client_id", env["GITHUB_CLIENT_ID"]),
                ("client_secret", env["GITHUB_CLIENT_SECRET"]),
                ("code", code),
            ],
        ) as token_response:
            data = await token_response.read()
            # The raw token response is intentionally no longer printed:
            # it contains the user's access token.
            tokens = oath_client.parse_request_body_response(data)
            new_headers = {"Authorization": "token " + tokens.get("access_token")}
            async with http_session.get(
                "https://api.github.com/user", headers=new_headers
            ) as user_response:
                user_data_str = await user_response.read()
                user_data = json.loads(user_data_str)
                print(user_data)
                cookie_session = await get_session(req)
                cookie_session["container_name"] = (
                    user_data["login"] + "_" + str(user_data["id"])
                )
                return {"is_logged_in": True}


@aiohttp_jinja2.template("home.html")
async def logout(req: web.Request) -> web.Response:
    """Invalidate the cookie session and render the home page as logged out."""
    session = await get_session(req)
    session.invalidate()
    return {"is_logged_in": False}


@aiohttp_jinja2.template("home.html")
async def does_work(req: web.Request) -> web.Response:
    """Render the home page; logged in means the session has ``container_name``."""
    sess = await get_session(req)
    return {"is_logged_in": "container_name" in sess}


async def _ws_forward(ws_from, ws_to):
    """Relay every message received on ``ws_from`` to ``ws_to`` until it ends.

    Raises ``ValueError`` for a message type that is not handled.
    """
    async for msg in ws_from:
        print(f">>> msg: {msg}")
        mt = msg.type
        md = msg.data
        if mt == WSMsgType.TEXT:
            await ws_to.send_str(md)
        elif mt == WSMsgType.BINARY:
            await ws_to.send_bytes(md)
        elif mt == WSMsgType.PING:
            await ws_to.ping()
        elif mt == WSMsgType.PONG:
            await ws_to.pong()
        elif ws_to.closed:
            await ws_to.close(code=ws_to.close_code, message=msg.extra)
        else:
            raise ValueError(f"unexpected message type: {msg}")


async def proxy_handler(req: web.Request) -> web.Response:
    """Proxy a request to the user's container at ``http://<container_name>:8080``.

    Users without ``container_name`` in their session are redirected to
    ``/login``. Websocket upgrade requests are bridged in both directions;
    everything else is forwarded as a plain HTTP request.
    """
    sess = await get_session(req)
    if "container_name" not in sess:
        raise web.HTTPFound("/login")
    container_name = sess["container_name"]
    code_server_manager = CodeServerManager(container_name)
    await code_server_manager.find_or_create_container()
    base_url = f"http://{container_name}:8080"

    # Use .get(): indexing raised KeyError when the Connection header was
    # absent. Compare case-insensitively and accept token lists such as
    # "keep-alive, Upgrade".
    is_websocket = (
        req.method == "GET"
        and "upgrade" in req.headers.get("connection", "").lower()
        and req.headers.get("upgrade", "").lower() == "websocket"
    )

    if is_websocket:
        ws_server = web.WebSocketResponse()
        await ws_server.prepare(req)
        print(f"##### WS_SERVER {ws_server}")
        path_qs_cleaned = req.path_qs.removeprefix("/devenv")
        # The client session is now closed when the bridge ends (it used to leak).
        async with ClientSession(cookies=req.cookies) as client_session:
            async with client_session.ws_connect(
                base_url + path_qs_cleaned
            ) as ws_client:
                print(f"##### WS_CLIENT {ws_client}")
                # asyncio.wait() no longer accepts bare coroutines
                # (Python 3.11+), so wrap them in tasks explicitly.
                forwarders = [
                    asyncio.ensure_future(_ws_forward(ws_server, ws_client)),
                    asyncio.ensure_future(_ws_forward(ws_client, ws_server)),
                ]
                try:
                    await asyncio.wait(
                        forwarders, return_when=asyncio.FIRST_COMPLETED
                    )
                finally:
                    # Stop the direction that is still running once one side is done.
                    for task in forwarders:
                        task.cancel()
        return ws_server

    # Do http proxy
    proxyPath = req.path_qs
    if proxyPath != "":
        proxyPath = (
            proxyPath.removeprefix("/devenv")
            .removeprefix("devenv")
            .removeprefix("/")
        )
        proxyPath = "/" + proxyPath
    async with client.request(
        req.method,
        base_url + proxyPath,
        allow_redirects=False,
        data=await req.read(),
    ) as res:
        headers = res.headers.copy()
        headers["service-worker-allowed"] = "/"
        body = await res.read()
        return web.Response(headers=headers, status=res.status, body=body)


app = web.Application()

# Set up sessions
secret_key = base64.urlsafe_b64decode(env["FERNET_KEY"])
setup(app, EncryptedCookieStorage(secret_key))

# Set up routes
app.add_routes([web.get("/", does_work)])
app.add_routes([web.get("/login", login)])
app.add_routes([web.get("/callback", callback)])
app.add_routes([web.get("/logout", logout)])
app.add_routes([web.get(r"/devenv", proxy_handler)])
app.add_routes([web.get(r"/{proxyPath:.*}", proxy_handler)])

aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader("./templates"))

if "SSL_CRT_FILE" in env and "SSL_KEY_FILE" in env:
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(env["SSL_CRT_FILE"], env["SSL_KEY_FILE"])
    web.run_app(app, port=5000, ssl_context=ssl_context)
# ... (the listing is truncated here in the source page)

# humming_ws_server.py
Source:humming_ws_server.py  
...12    # url_host_only is used for creating one HummingWSServer to handle all websockets requests and responses for13    # a given url host.14    url_host_only = False15    @staticmethod16    def get_ws_server(url):17        if HummingWsServerFactory.url_host_only:18            url = urlparse(url).netloc19        return HummingWsServerFactory._ws_servers.get(url)20    @staticmethod21    def start_new_server(url):22        port = get_open_port()23        ws_server = HummingWsServer(HummingWsServerFactory.host, port)24        if HummingWsServerFactory.url_host_only:25            url = urlparse(url).netloc26        HummingWsServerFactory._ws_servers[url] = ws_server27        ws_server.start()28        return ws_server29    @staticmethod30    def reroute_ws_connect(url, **kwargs):31        ws_server = HummingWsServerFactory.get_ws_server(url)32        if ws_server is None:33            return HummingWsServerFactory._orig_ws_connect(url, **kwargs)34        kwargs.clear()35        return HummingWsServerFactory._orig_ws_connect(f"ws://{ws_server.host}:{ws_server.port}", **kwargs)36    @staticmethod37    async def send_str(url, message, delay=0):38        if delay > 0:39            await asyncio.sleep(delay)40        ws_server = HummingWsServerFactory.get_ws_server(url)41        await ws_server.websocket.send(message)42    @staticmethod43    def send_str_threadsafe(url, msg, delay=0):44        ws_server = HummingWsServerFactory.get_ws_server(url)45        asyncio.run_coroutine_threadsafe(HummingWsServerFactory.send_str(url, msg, delay), ws_server.ev_loop)46    @staticmethod47    async def send_json(url, data, delay=0):48        if delay > 0:49            await asyncio.sleep(delay)50        ws_server = HummingWsServerFactory.get_ws_server(url)51        await ws_server.websocket.send(json.dumps(data))52    @staticmethod53    def send_json_threadsafe(url, data, delay=0):54        ws_server = HummingWsServerFactory.get_ws_server(url)55        
asyncio.run_coroutine_threadsafe(HummingWsServerFactory.send_json(url, data, delay), ws_server.ev_loop)56class HummingWsServer:57    def __init__(self, host, port):58        self.ev_loop: None59        self._started: bool = False60        self.host = host61        self.port = port62        self.websocket = None63        self.stock_responses = {}64    def add_stock_response(self, request, json_response):65        self.stock_responses[request] = json_response66    async def _handler(self, websocket, path):67        self.websocket = websocket68        async for msg in self.websocket:...chat.py
Source:chat.py  
import json
import threading


class Destination:
    """Address of a message: a nick and, optionally, one specific connection."""

    def __init__( self, nick, con=None ):
        self.nick = nick
        self.con = con


class ChatServer:
    """In-memory message queues, kept per nick and per connection.

    `dst_queues` maps nick -> connection -> list of pending messages. The
    connection key None is the anonymous queue of a nick. The class does no
    locking of its own.
    """

    def __init__( self ):
        self.dst_queues = {}
        self.dst_by_id = {}

    def register_destination( self, dst ):
        """Make sure a queue exists for `dst.nick` / `dst.con`."""
        queues = self.dst_queues.setdefault( dst.nick, {} )
        if dst.con in queues:
            return
        if dst.con is not None and None in queues:
            # take over the accumulated messages that were not bound to a connection
            queues[dst.con] = list( queues[None] )
            # the anonymous queue is cleared after the takeover
            queues[None] = []
        else:
            queues[dst.con] = []

    def remove_destination( self, dst ):
        """Drop the queue of a specific connection; drop the nick once it has no queues."""
        if dst.nick not in self.dst_queues:
            return
        queues = self.dst_queues[dst.nick]
        if dst.con is not None and dst.con in queues:
            del queues[dst.con]
        if not queues:
            del self.dst_queues[dst.nick]

    def get_connected_nicks( self ):
        """Return the nicks that have at least one connection-bound queue."""
        nicks = {
            nick
            for nick, queues in self.dst_queues.items()
            if any( con is not None for con in queues )
        }
        return list( nicks )

    def send_message( self, dst, msg, msg_type="text", src=None ):
        """Queue a message for `dst`, registering the destination if needed."""
        self.register_destination( dst )
        queues = self.dst_queues[dst.nick]
        if dst.con:
            # direct delivery to one specific connection
            queues[dst.con].insert( 0, {"src" : src, "msg_type" : msg_type, "msg" : msg} )
        else:
            # delivery to all connections registered for the nick
            for con in queues:
                queues[con].insert( 0, {"src" : src, "msg_type" : msg_type, "msg" : msg} )

    def broadcast_message( self, msg, msg_type="text", src=None ):
        """Queue a message for every known nick."""
        for nick in self.dst_queues:
            self.send_message( src=src, dst=Destination(nick), msg=msg, msg_type=msg_type )

    def recv_message( self, dst ):
        """Yield the pending messages for `dst`, oldest first, removing them."""
        self.register_destination( dst )
        queue = self.dst_queues[dst.nick][dst.con]
        while queue:
            yield queue.pop()


global_chat_server = ChatServer()
global_chat_server_semaphore = threading.Semaphore()

# The functions below hold the semaphore through a `with` block so that it is
# released even when an exception is raised while it is held; otherwise one
# failing call would block all other threads for good.


def initialize( ws_server ):
    """Register the connection of `ws_server` and broadcast a "join" message."""
    print( str(threading.current_thread().ident)+" initialize" )
    ws_server.user_status = ws_server.app.user.status()
    nick = ws_server.user_status["login"]["nick"]
    with global_chat_server_semaphore:
        global_chat_server.register_destination( dst=Destination(nick,ws_server.con) )
        global_chat_server.broadcast_message( msg={"nick":nick, "nick_list":global_chat_server.get_connected_nicks()}, msg_type="join" )
        print( str(global_chat_server.dst_queues) )


def process_message( ws_server, msg ):
    """Handle one JSON command from the client.

    A command with "msg" is sent to the nick in "dst" if present, otherwise
    it is broadcast. Commands without "msg" are ignored.
    """
    print( str(threading.current_thread().ident)+" process_message" )
    cmd = json.loads(msg)
    if "msg" in cmd:
        with global_chat_server_semaphore:
            if "dst" in cmd:
                global_chat_server.send_message( src=ws_server.user_status["login"]["nick"], dst=Destination(cmd["dst"]), msg=cmd["msg"] )
            else:
                global_chat_server.broadcast_message( src=ws_server.user_status["login"]["nick"], msg=cmd["msg"] )


def run( ws_server ):
    """Send all pending messages for this connection to the client as JSON."""
    with global_chat_server_semaphore:
        for msg in global_chat_server.recv_message( dst=Destination(ws_server.user_status["login"]["nick"],ws_server.con) ):
            ws_server.ws.send( json.dumps(msg) )


def sleep( ws_server ):
    """Wait up to 0.2 seconds for the client message event."""
    ws_server.ws.client_message_event.wait( timeout=0.2 )


def cleanup( ws_server ):
    """Remove the connection of `ws_server` and broadcast a "leave" message."""
    print( str(threading.current_thread().ident)+" cleanup" )
    nick = ws_server.user_status["login"]["nick"]
    with global_chat_server_semaphore:
        global_chat_server.remove_destination( dst=Destination(nick,ws_server.con) )
        global_chat_server.broadcast_message( msg={"nick":nick, "nick_list":global_chat_server.get_connected_nicks()}, msg_type="leave" )
        print( str(global_chat_server.dst_queues) )
# ... (the listing is truncated here in the source page)

# LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
