How to use ws_client_handler method in devtools-proxy

Best Python code snippet using devtools-proxy_python

sensor_manager.py

Source:sensor_manager.py Github

copy

Full Screen

1#!/usr/bin/env python32# -*- coding: utf-8 -*-3#4# Copyright 2011-2016 Lancaster University.5#6#7# This file is part of Yarely.8#9# Licensed under the Apache License, Version 2.0.10# For full licensing information see /LICENSE.11# Standard library imports12import copy13from importlib import import_module14import logging15import queue16import threading17import time18from xml.etree import ElementTree19# Third party imports20import zmq21# Local (Yarely) imports22from yarely.core.helpers.base_classes import ApplicationError, HandlerStub23from yarely.core.helpers.base_classes.manager import (24 check_handler_token, Manager25)26from yarely.core.helpers.execution import application_loop27from yarely.core.helpers.zmq import (28 ZMQ_ADDRESS_INPROC, ZMQ_ADDRESS_LOCALHOST, ZMQ_SOCKET_LINGER_MSEC,29 ZMQ_SOCKET_NO_LINGER, ZMQ_REQUEST_TIMEOUT_MSEC,30 ZMQ_SENSORMANAGER_REP_PORT, ZMQ_SENSORMANAGER_REQ_PORT31)32log = logging.getLogger(__name__)33_TERMINATION_MARKER = object()34QUEUE_TIMEOUT = 1 # Seconds35SUBPROCESS_CHECKIN_INTERVAL = 1 # Seconds36WARN_NO_REPLY = 'Expected reply from Scheduler not received, will retry.'37SOCKET_PORT = 978638WEBSOCKET_PORT = 978739class SensorMangerError(ApplicationError):40 """Base class for Sensor Manager errors"""41 pass42class SensorManager(Manager):43 """Manages sensors"""44 def __init__(self):45 """Default constructor - Creates a new SensorManager."""46 description = "Manage Yarely sensors"47 # The parent constructor provides config and logging and gets a48 # starting set of handlers using this classes _init_handlers() method.49 super().__init__(ZMQ_SENSORMANAGER_REP_PORT, description)50 self.registered = False51 # Setup for ZMQ Scheduler Messaging52 sensor_term_id = "sensormanager_term_{id}"53 self.zmq_sensormanager_term_identifier = sensor_term_id.format(54 id=id(self)55 )56 self.zmq_scheduler_req_addr = ZMQ_ADDRESS_LOCALHOST.format(57 port=ZMQ_SENSORMANAGER_REQ_PORT58 )59 self.zmq_scheduler_request_queue = queue.Queue() # Q of infinite size60 def _init_handlers(self):61 # The _registered_handlers dictionary is keyed by 'type',62 # current keys are:63 # 'socket' => Handles a socket (virtual) sensor.64 # 'websocket' => Handles a Websocket interface.65 python_launch_str = 'python3 -m {module}'66 # Socket handler67 # socket_handler = HandlerStub(68 # command_line_args=python_launch_str.format(69 # module='yarely.core.sensors.handlers.socket'70 # )71 # )72 # socket_handler.params_over_zmq = dict([('port', str(SOCKET_PORT))])73 # self.add_handler('socket', socket_handler)74 # Websocket handler (TURNED OFF FOR NOW.)75 # websocket_handler = HandlerStub(76 # command_line_args=python_launch_str.format(77 # module='yarely.core.sensors.handlers.websocket'78 # )79 # )80 # websocket_handler.params_over_zmq = dict(81 # [('port', str(WEBSOCKET_PORT))]82 # )83 # Websocket Client handler84 ws_client_handler = HandlerStub(85 command_line_args=python_launch_str.format(86 module='yarely.core.sensors.handlers.ws_client'87 )88 )89 # TODO: change this when we start using Python 3.490 try:91 import_module('tornado')92 except ImportError:93 log.warning(94 "Tornado not installed on this machine, not adding it to "95 "the list of handlers."96 )97 else:98 # self.add_handler('websocket', websocket_handler)99 self.add_handler('ws_client', ws_client_handler)100 def _handle_zmq_req_to_scheduler(self):101 """Executes in separate thread: _zmq_req_to_scheduler_thread."""102 # Provide some constants used as the return codes of nested103 # function _loop_over_sockets()104 NO_DATA = 0105 TERM = -1106 # Initialise ZMQ request socket107 zmq_request_socket = self.zmq_context.socket(zmq.REQ)108 zmq_request_socket.setsockopt(zmq.LINGER, ZMQ_SOCKET_LINGER_MSEC)109 zmq_request_socket.connect(self.zmq_scheduler_req_addr)110 # Initialise ZMQ socket to watch for termination before recvs111 # (we use the request queue to watch for termination before sends).112 zmq_termination_reply_socket = self.zmq_context.socket(zmq.REP)113 zmq_termination_reply_socket.bind(114 ZMQ_ADDRESS_INPROC.format(115 identifier=self.zmq_sensormanager_term_identifier116 )117 )118 # Initialise ZMQ Poller for recvs119 zmq_request_poller = zmq.Poller()120 zmq_request_poller.register(zmq_request_socket, zmq.POLLIN)121 zmq_request_poller.register(zmq_termination_reply_socket, zmq.POLLIN)122 # Provide a method to loop over sockets that have data123 def _loop_over_sockets():124 rtn = NO_DATA125 for sock in socks_with_data:126 if sock is zmq_termination_reply_socket:127 return TERM128 elif sock is zmq_request_socket:129 reply = sock.recv().decode()130 if not reply:131 continue132 log.debug(133 'Received reply from Scheduler: {msg}'.format(134 msg=reply135 )136 )137 self._handle_zmq_msg(reply)138 rtn = len(reply)139 else:140 log.info(141 'Unhandled socket data: {sock}'.format(sock=sock)142 )143 return rtn144 # Time the last request was sent (unix timestamp)145 last_request = 0146 while True:147 # Send a message from the message queue148 try:149 qitem = self.zmq_scheduler_request_queue.get(150 timeout=QUEUE_TIMEOUT151 )152 # First check for termnation153 if qitem is _TERMINATION_MARKER:154 break155 # We've not been asked to terminate, so send156 # the message over ZMQ.157 # Queue items are ElementTree Elements, so we encode them to a158 # byte representation.159 encoded_qitem = ElementTree.tostring(qitem, encoding="UTF-8")160 last_request = time.time()161 log.debug('Sending request to Scheduler: {msg}'.format(162 msg=encoded_qitem)163 )164 zmq_request_socket.send(encoded_qitem)165 # Every send should have an associated receive.166 expect_reply = True167 result = None168 while expect_reply:169 socks_with_data = dict(170 zmq_request_poller.poll(ZMQ_REQUEST_TIMEOUT_MSEC)171 )172 if socks_with_data:173 result = _loop_over_sockets()174 if result is TERM: # Terminate175 break176 elif result is NO_DATA: # Rebuild socket177 log.warning(WARN_NO_REPLY)178 zmq_request_socket.setsockopt(zmq.LINGER,179 ZMQ_SOCKET_NO_LINGER)180 zmq_request_socket.close()181 zmq_request_poller.unregister(zmq_request_socket)182 zmq_request_socket = self.zmq_context.socket(183 zmq.REQ)184 zmq_request_socket.setsockopt(185 zmq.LINGER, ZMQ_SOCKET_LINGER_MSEC186 )187 zmq_request_socket.connect(self.zmq_req_addr)188 zmq_request_poller.register(zmq_request_socket,189 zmq.POLLIN)190 zmq_request_socket.send(encoded_qitem)191 else:192 assert(result > 0)193 expect_reply = False # Success!194 if result is TERM:195 break196 except queue.Empty:197 pass198 # We do this last so we don't check in if we've just sent data199 # If we're not registered yet, we can't checkin200 next_checkin = last_request + SUBPROCESS_CHECKIN_INTERVAL201 if self.registered and next_checkin <= time.time():202 self.check_in()203 zmq_request_socket.close()204 def _handle_reply_pong(self, msg_root, msg_elem):205 self.registered = True206 @check_handler_token207 def _handle_request_sensor_update(self, msg_root, msg_elem):208 token = msg_root.attrib['token']209 with self._lock:210 handler = self._lookup_executing_handler_with_token(token)211 handler.last_checkin = time.time()212 # Forward the message213 self.zmq_scheduler_request_queue.put_nowait(214 self._encapsulate_request(msg_elem)215 )216 return self._encapsulate_reply(self._generate_pong())217 def check_in(self):218 """Provide an occasional check-in to the Scheduler via ZMQ."""219 etree = self._encapsulate_request(self._generate_ping())220 self.zmq_scheduler_request_queue.put_nowait(etree)221 def _start_handler_if_exists(self, handler):222 """ Only starts `handler` if it was registered before. """223 if handler not in self._registered_handlers:224 return225 handler_stub = copy.deepcopy(226 self.get_handler_stub(handler)227 )228 self.start_handler(handler_stub)229 def _load_config(self):230 """In order to do additional stuff for a sensor, create a method231 called _load_config_ with the sensor name as suffix.232 """233 for handler_name, handler in self._registered_handlers.items():234 method_name = '_load_config_{}'.format(handler_name)235 log.debug(method_name)236 try:237 method = getattr(self, method_name)238 except AttributeError:239 continue240 method(handler)241 def _load_config_ws_client(self, handler):242 # Getting URL, path, display id and beacon id from config.243 ws_server_host = self.config.get(244 'Personalisation', 'ws_server_host',245 fallback='wss://scc-ecampus-isp.lancs.ac.uk'246 )247 ws_server_path = self.config.get(248 'Personalisation', 'ws_server_path', fallback='/sign_connect'249 )250 display_id = self.config.get(251 'Personalisation', 'display_id', fallback=None252 )253 beacon_id = self.config.get(254 'Personalisation', 'beacon_id', fallback=None255 )256 handler.params_over_zmq = dict([257 ('ws_server_host', str(ws_server_host)),258 ('ws_server_path', str(ws_server_path)),259 ('display_id', str(display_id)),260 ('beacon_id', str(beacon_id)),261 ])262 def _start_handler_if_exists(self, handler):263 """Only starts `handler` if it was registered before."""264 if handler not in self._registered_handlers:265 return266 handler_stub = copy.deepcopy(267 self.get_handler_stub(handler)268 )269 self.start_handler(handler_stub)270 def check_in(self):271 """Provide an occasional check-in to the Scheduler via ZMQ."""272 etree = self._encapsulate_request(self._generate_ping())273 self.zmq_scheduler_request_queue.put_nowait(etree)274 def start(self):275 """The main execution method for this application"""276 super().start()277 # Loading config from file and adding to handler stup if applicable.278 # We can't load the config while initialising handlers as the config279 # module is not available yet at that point.280 self._load_config()281 # Start socket, websocket and ws_client handlers282 self._start_handler_if_exists('socket')283 self._start_handler_if_exists('websocket')284 self._start_handler_if_exists('ws_client')285 # Start a new thread to create a ZMQ socket and send messages to286 # the scheduler287 self._zmq_req_to_scheduler_thread = threading.Thread(288 target=self._handle_zmq_req_to_scheduler289 )290 t_name = 'ZMQ Request Messenger (-> Scheduler)'291 self._zmq_req_to_scheduler_thread.name = t_name292 self._zmq_req_to_scheduler_thread.daemon = True293 self._zmq_req_to_scheduler_thread.start()294 def stop(self):295 # Send a ZMQ request to the inproc socket to be picked up by the poller296 zmq_termination_request_socket = self.zmq_context.socket(zmq.REQ)297 zmq_termination_request_socket.setsockopt(298 zmq.LINGER, ZMQ_SOCKET_LINGER_MSEC299 )300 zmq_termination_request_socket.connect(301 ZMQ_ADDRESS_INPROC.format(302 identifier=self.zmq_sensormanager_term_identifier303 )304 )305 zmq_termination_request_socket.send_unicode('TERMINATE')306 zmq_termination_request_socket.close()307 # And then pop a message on the queue of messages to go out just in308 # case we'd otherwise be blocked at that line.309 self.zmq_scheduler_request_queue.put_nowait(_TERMINATION_MARKER)310 # Now join the ZMQ thread and then call the parent class's stop() for311 # final cleanup.312 self._zmq_req_to_scheduler_thread.join()313 super().stop()314if __name__ == "__main__":...

Full Screen

Full Screen

proxy.py

Source:proxy.py Github

copy

Full Screen

...34 app['tabs'][tab_id] = {}35 # https://aiohttp.readthedocs.io/en/v1.0.0/faq.html#how-to-receive-an-incoming-events-from-different-sources-in-parallel36 task = app.loop.create_task(ws_browser_handler(request))37 app['tasks'].append(task)38 return await ws_client_handler(request)39async def ws_client_handler(request):40 app = request.app41 path_qs = request.path_qs42 tab_id = path_qs.split('/')[-1]43 url = f'ws://{app["chrome_host"]}:{app["chrome_port"]}{path_qs}'44 encode_id = app['f']['encode_id']45 client_id = len(app['clients'])46 log_prefix = f'[CLIENT {client_id}]'47 log_msg = app['f']['print']48 ws_client = WebSocketResponse()49 await ws_client.prepare(request)50 if client_id >= app['max_clients']:51 log_msg(log_prefix, 'CONNECTION FAILED')52 return ws_client53 app['clients'][ws_client] = {...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

...325 if (job := await murdock.remove_job(uid)) is None:326 raise HTTPException(status_code=404, detail=f"No job with uid '{uid}' found")327 return job328@app.websocket("/ws/status")329async def ws_client_handler(websocket: WebSocket):330 LOGGER.debug("websocket opening")331 await websocket.accept()332 LOGGER.debug("websocket connection opened")333 murdock.add_ws_client(websocket)334 try:335 while True:336 _ = await websocket.receive_text()337 except WebSocketDisconnect:338 LOGGER.debug("websocket connection closed")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run devtools-proxy automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful