Best Python code snippet using localstack_python
nicolive.py
Source:nicolive.py  
import json
import logging
import re
from streamlink.utils import websocket
import threading
import time

from streamlink.plugin import Plugin, PluginArguments, PluginArgument
from streamlink.plugin.api import useragents
from streamlink.stream import HLSStream
from streamlink.compat import urlparse, unquote_plus

_log = logging.getLogger(__name__)

# Matches watch-page URLs such as https://live.nicovideo.jp/watch/lv123.
_url_re = re.compile(
    r"^https?://(?P<domain>live[0-9]*\.nicovideo\.jp)/watch/lv[0-9]*")

_login_url = "https://account.nicovideo.jp/login/redirector"
_login_url_params = {
    "show_button_twitter": 1,
    "show_button_facebook": 1,
    "next_url": "/"}


class NicoLive(Plugin):
    """Streamlink plugin that resolves HLS streams from Niconico Live pages.

    The plugin reads a websocket API URL out of the watch page, connects to
    it on a background thread, and waits for a message carrying the HLS
    playlist URI before building the stream objects.
    """

    arguments = PluginArguments(
        PluginArgument(
            "email",
            argument_name="niconico-email",
            sensitive=True,
            metavar="EMAIL",
            help="The email or phone number associated with your "
                 "Niconico account"),
        PluginArgument(
            "password",
            argument_name="niconico-password",
            sensitive=True,
            metavar="PASSWORD",
            help="The password of your Niconico account"),
        PluginArgument(
            "user-session",
            argument_name="niconico-user-session",
            sensitive=True,
            metavar="VALUE",
            help="Value of the user-session token \n(can be used in "
                 "case you do not want to put your password here)"))

    # Set to True by handle_api_message() once a stream URI was received.
    is_stream_ready = False
    # Set to True by handle_api_message() on a "disconnect" command.
    is_stream_ended = False
    # Seconds between "watching" keep-alive commands (server may override).
    watching_interval = 30
    watching_interval_worker_thread = None
    # Reader of the currently opened NicoHLSStream, if any.
    stream_reader = None
    _ws = None
    frontend_id = None

    @classmethod
    def can_handle_url(cls, url):
        """Return True when *url* matches the Niconico Live watch-page pattern."""
        return _url_re.match(url) is not None

    def _get_streams(self):
        """Resolve the available streams for ``self.url``.

        Returns a dict mapping quality names to NicoHLSStream objects, or
        None when the websocket URL cannot be obtained, the stream ends, or
        no stream URI arrives within the polling window (600 polls of 0.1 s).
        """
        self.url = self.url.split("?")[0]
        self.session.http.headers.update({
            "User-Agent": useragents.CHROME,
        })

        if not self.get_wss_api_url():
            _log.debug("Coundn't extract wss_api_url. Attempting login...")
            if not self.niconico_web_login():
                return None
            if not self.get_wss_api_url():
                _log.error("Failed to get wss_api_url.")
                _log.error(
                    "Please check if the URL is correct, "
                    "and make sure your account has access to the video.")
                return None

        self.api_connect(self.wss_api_url)

        # Poll until the websocket thread reports a stream URI.
        i = 0
        while not self.is_stream_ready:
            if i % 10 == 0:
                _log.debug("Waiting for permit...")
            if i == 600:
                _log.error("Waiting for permit timed out.")
                return None
            if self.is_stream_ended:
                return None
            time.sleep(0.1)
            i += 1

        streams = HLSStream.parse_variant_playlist(
            self.session, self.hls_stream_url)
        return {
            name: NicoHLSStream(stream, self)
            for name, stream in streams.items()
        }

    def get_wss_api_url(self):
        """Fetch the watch page and extract the websocket API URL from it.

        Stores the result in ``self.wss_api_url`` (with the frontend id
        appended as a query parameter) and returns True only when that URL
        starts with ``wss://``.
        """
        _log.debug("Getting video page: {0}".format(self.url))
        resp = self.session.http.get(self.url)

        # NOTE(review): the delimiters are written in their HTML-escaped
        # (&quot;) form; the scraped copy of this snippet had them decoded to
        # bare quotes, which is not valid Python. Presumably the page embeds
        # its JSON properties HTML-escaped -- confirm against a live page.
        try:
            self.wss_api_url = extract_text(
                resp.text, "&quot;webSocketUrl&quot;:&quot;", "&quot;")
            if not self.wss_api_url:
                return False
        except Exception as e:
            _log.debug(e)
            _log.debug("Failed to extract wss api url")
            return False

        # A missing frontend id is not fatal: it is only logged.
        try:
            self.frontend_id = extract_text(
                resp.text, "&quot;frontendId&quot;:", ",&quot;")
        except Exception as e:
            _log.debug(e)
            _log.warning("Failed to extract frontend id")

        self.wss_api_url = "{0}&frontend_id={1}".format(
            self.wss_api_url, self.frontend_id)

        _log.debug("Video page response code: {0}".format(resp.status_code))
        _log.trace(u"Video page response body: {0}".format(resp.text))
        _log.debug("Got wss_api_url: {0}".format(self.wss_api_url))
        _log.debug("Got frontend_id: {0}".format(self.frontend_id))

        return self.wss_api_url.startswith("wss://")

    def api_on_open(self):
        """Websocket on-open callback: send the initial handshake messages."""
        self.send_playerversion()
        require_new_stream = not self.is_stream_ready
        self.send_getpermit(require_new_stream=require_new_stream)

    def api_on_error(self, ws, error=None):
        """Websocket on-error callback: log, wait 5 seconds, then reconnect."""
        if error:
            _log.warning(error)
        _log.warning("wss api disconnected.")
        _log.warning("Attempting to reconnect in 5 secs...")
        time.sleep(5)
        self.api_connect(self.wss_api_url)

    def api_connect(self, url):
        """Open the websocket to *url* and run it on a daemon thread."""
        # Proxy support adapted from the UStreamTV plugin (ustreamtv.py)
        proxy_url = self.session.get_option("https-proxy")
        if proxy_url is None:
            proxy_url = self.session.get_option("http-proxy")
        proxy_options = parse_proxy_url(proxy_url)
        if proxy_options.get('http_proxy_host'):
            _log.debug("Using proxy ({0}://{1}:{2})".format(
                proxy_options.get('proxy_type') or "http",
                proxy_options.get('http_proxy_host'),
                proxy_options.get('http_proxy_port') or 80))

        _log.debug("Connecting: {0}".format(url))
        self._ws = websocket.WebSocketApp(
            url,
            header=["User-Agent: {0}".format(useragents.CHROME)],
            on_open=self.api_on_open,
            on_message=self.handle_api_message,
            on_error=self.api_on_error)
        # The proxy settings are keyword options for run_forever(). Thread's
        # ``args`` expects a sequence of positional arguments, so passing the
        # dict there would hand run_forever() the option *names* positionally.
        self.ws_worker_thread = threading.Thread(
            target=self._ws.run_forever,
            kwargs=proxy_options)
        self.ws_worker_thread.daemon = True
        self.ws_worker_thread.start()

    def _send_json(self, msg):
        """Serialize *msg* to JSON and send it if the websocket is connected."""
        msg_json = json.dumps(msg)
        _log.debug(u"Sending: {0}".format(msg_json))
        # ``sock`` may still be unset before the connection is established.
        sock = getattr(self._ws, "sock", None) if self._ws else None
        if sock and sock.connected:
            self._ws.send(msg_json)
        else:
            _log.warning("wss api is not connected.")

    def send_message(self, type_, body):
        """Send ``{"type": type_, "body": body}`` over the websocket."""
        self._send_json({"type": type_, "body": body})

    def send_no_body_message(self, type_):
        """Send a message that consists of a ``type`` field only."""
        self._send_json({"type": type_})

    def send_custom_message(self, msg):
        """Send an arbitrary JSON-serializable *msg* over the websocket."""
        self._send_json(msg)

    def send_playerversion(self):
        """Send the ``startWatching`` request asking for an HLS stream."""
        body = {
            "type": "startWatching",
            "data": {
                "stream": {
                    "quality": "abr",
                    "protocol": "hls",
                    "latency": "high",
                    "chasePlay": False
                },
                "room": {
                    "protocol": "webSocket",
                    "commentable": True
                },
                "reconnect": False
            }
        }
        self.send_custom_message(body)

    def send_getpermit(self, require_new_stream=True):
        """Send the ``getAkashic`` request.

        *require_new_stream* is accepted for caller compatibility but is not
        used by the message that is sent.
        """
        body = {
            "type": "getAkashic",
            "data": {
                "chasePlay": False
            }
        }
        self.send_custom_message(body)

    def send_watching(self):
        """Send the periodic ``watching`` command."""
        # NOTE(review): ``self.broadcast_id`` is not assigned anywhere in the
        # visible code, so this raises AttributeError unless it is set
        # elsewhere -- confirm before relying on the "watch" message path.
        body = {
            "command": "watching",
            "params": [self.broadcast_id, "-1", "0"]
        }
        self.send_message("watch", body)

    def send_pong(self):
        """Answer a ping with ``pong`` followed by ``keepSeat``."""
        self.send_no_body_message("pong")
        self.send_no_body_message("keepSeat")

    def handle_api_message(self, message):
        """Websocket on-message callback: dispatch on the message ``type``.

        ``stream`` messages and ``watch``/``currentstream`` commands provide
        the HLS URI and mark the stream ready; ``watchinginterval`` starts the
        keep-alive thread once; ``disconnect`` marks the stream ended and
        closes the reader; ``ping`` is answered via send_pong().
        """
        _log.debug(u"Received: {0}".format(message))
        message_parsed = json.loads(message)
        message_type = message_parsed["type"]

        if message_type == "stream":
            data = message_parsed["data"]
            self.hls_stream_url = data["uri"]
            self.is_stream_ready = True
        elif message_type == "watch":
            body = message_parsed["body"]
            command = body["command"]

            if command == "currentstream":
                current_stream = body["currentStream"]
                self.hls_stream_url = current_stream["uri"]
                self.is_stream_ready = True
            elif command == "watchinginterval":
                self.watching_interval = int(body["params"][0])
                _log.debug("Got watching_interval: {0}".format(
                    self.watching_interval))

                if self.watching_interval_worker_thread is None:
                    _log.debug("send_watching_scheduler starting.")
                    self.watching_interval_worker_thread = threading.Thread(
                        target=self.send_watching_scheduler)
                    self.watching_interval_worker_thread.daemon = True
                    self.watching_interval_worker_thread.start()
                else:
                    _log.debug("send_watching_scheduler already running.")
            elif command == "disconnect":
                _log.info("Websocket API closed.")
                _log.info("Stream ended.")
                self.is_stream_ended = True

                if self.stream_reader is not None:
                    self.stream_reader.close()
                    _log.info("Stream reader closed.")
        elif message_type == "ping":
            self.send_pong()

    def send_watching_scheduler(self):
        """
        Periodically send "watching" command to the API.
        This is necessary to keep the session alive.
        """
        while not self.is_stream_ended:
            self.send_watching()
            time.sleep(self.watching_interval)

    def niconico_web_login(self):
        """Authenticate using the plugin options.

        A provided user-session token is installed as the ``user_session``
        cookie without contacting the server. Otherwise email and password
        are posted to the login URL and success is judged by whether a
        ``user_session`` cookie is present afterwards.

        Returns True when a session cookie is available, False otherwise.
        """
        user_session = self.get_option("user-session")
        email = self.get_option("email")
        password = self.get_option("password")

        if user_session is not None:
            _log.info("User session cookie is provided. Using it.")
            self.session.http.cookies.set(
                "user_session",
                user_session,
                path="/",
                domain="nicovideo.jp")
            self.save_cookies()
            return True

        if email is not None and password is not None:
            _log.info("Email and password are provided. Attemping login.")

            payload = {"mail_tel": email, "password": password}
            resp = self.session.http.post(_login_url, data=payload,
                                          params=_login_url_params)

            _log.debug("Login response code: {0}".format(resp.status_code))
            _log.trace(u"Login response body: {0}".format(resp.text))
            _log.debug("Cookies: {0}".format(
                self.session.http.cookies.get_dict()))

            if self.session.http.cookies.get("user_session") is None:
                # Try to surface the notice text from the login page.
                try:
                    msg = extract_text(
                        resp.text, '<p class="notice__text">', "</p>")
                except Exception as e:
                    _log.debug(e)
                    msg = "unknown reason"
                # Logger.warn is a deprecated alias of Logger.warning.
                _log.warning("Login failed. {0}".format(msg))
                return False

            _log.info("Logged in.")
            self.save_cookies()
            return True

        _log.warning(
            "Neither a email and password combination nor a user session "
            "token is provided. Cannot attempt login.")
        return False


class NicoHLSStream(HLSStream):
    """HLSStream copy that reports its reader back to the NicoLive plugin."""

    def __init__(self, hls_stream, nicolive_plugin):
        super(NicoHLSStream, self).__init__(
            hls_stream.session,
            force_restart=hls_stream.force_restart,
            start_offset=hls_stream.start_offset,
            duration=hls_stream.duration,
            **hls_stream.args)
        # url is already in hls_stream.args

        self.nicolive_plugin = nicolive_plugin

    def open(self):
        """Open the stream and register the reader on the plugin.

        The plugin closes this reader when it receives a "disconnect"
        command.
        """
        reader = super(NicoHLSStream, self).open()
        self.nicolive_plugin.stream_reader = reader
        return reader


def extract_text(text, left, right):
    """Extract text from HTML

    Returns the single shortest span found between *left* and *right* in
    *text*. Both delimiters are inserted into a regular expression as-is.
    Raises Exception unless exactly one match is found.
    """
    result = re.findall("{0}(.*?){1}".format(left, right), text)
    if len(result) != 1:
        raise Exception("Failed to extract string. "
                        "Expected 1, found {0}".format(len(result)))
    return result[0]


def parse_proxy_url(purl):
    """Adapted from UStreamTV plugin (ustreamtv.py)

    Translates a proxy URL into a dict with the keys ``proxy_type``,
    ``http_proxy_host`` and, when present in the URL, ``http_proxy_port``
    and ``http_proxy_auth``. Returns an empty dict for a falsy *purl*.
    """
    proxy_options = {}
    if purl:
        p = urlparse(purl)
        proxy_options['proxy_type'] = p.scheme
        proxy_options['http_proxy_host'] = p.hostname
        if p.port:
            proxy_options['http_proxy_port'] = p.port
        if p.username:
            proxy_options['http_proxy_auth'] = \
                (unquote_plus(p.username), unquote_plus(p.password or ""))
    return proxy_options

# [scraped page text kept from this position: "...client.py"]
Source:client.py  
import grpc
# classes generated by grpc
from .generated import eval_server_pb2
from .generated import eval_server_pb2_grpc
# other imports
import numpy as np
import time
from time import perf_counter
from datetime import datetime
# multiprocessing
import multiprocessing as mp
from multiprocessing import shared_memory, resource_tracker
from threading import Thread
# suppress shared memory warnings
from .utils import remove_shm_from_resource_tracker
remove_shm_from_resource_tracker()


# receive input fidx streamed by server, store them in a list
def receive_stream(seq, latest_fidx, fid_ptr_dict, is_stream_ready, stream_start_time, config, verbose=False):
    """Consume the image stream for sequence *seq* and publish frame indices.

    For every streamed response the latest frame id is written to
    *latest_fidx*, its (start_ptr, end_ptr) pair is stored in *fid_ptr_dict*,
    and *is_stream_ready* is set when the frame id is non-negative.
    *stream_start_time* receives ``perf_counter()`` when the first response
    arrives. An end marker sets *latest_fidx* to -1 and stops the loop.
    """
    if verbose:
        print("EvalClient (", datetime.now(), "): ", "Requesting stream for sequence ", seq)
    channel = grpc.insecure_channel(config['loopback_ip'] + ":" + str(config['image_service_port']))
    try:
        stub = eval_server_pb2_grpc.ImageServiceStub(channel)
        stream_request = eval_server_pb2.String(value=seq)

        # receive input stream
        for i, response in enumerate(stub.GetImageStream(stream_request)):
            if i == 0:
                stream_start_time.value = perf_counter()
                if verbose:
                    print("EvalClient (", datetime.now(), "): ", "Receiving stream for sequence ", seq)
            if response.end_marker:
                latest_fidx.value = -1
                # Wake up consumers blocked on the event even when no frame
                # was ever delivered; they will observe the -1 sentinel.
                is_stream_ready.set()
                break
            is_stream_ready.clear()
            latest_fidx.value = response.fid
            fid_ptr_dict[response.fid] = (response.start_ptr, response.end_ptr)
            if response.fid >= 0:
                is_stream_ready.set()
    finally:
        # Close the channel even if the stream raises part-way through.
        channel.close()


class EvalClient:
    """Client that reads frames from shared memory and reports detections.

    Frames are announced by a background process running receive_stream();
    results are written to a second shared-memory buffer and signalled to
    the result service over gRPC.
    """

    def __init__(self, config, state=None, verbose=False):
        """Attach to the server's shared-memory buffers.

        *config* must provide ``loopback_ip``, ``image_service_port`` and
        ``result_service_port``. *state* may be the tuple returned by
        get_state() to reuse existing synchronization objects; a fourth
        element, if present, is used as the stream start time value.
        """
        self.img_width, self.img_height = 1920, 1200
        if state is None:
            # set_start_method() raises RuntimeError when a start method was
            # already chosen, e.g. by an earlier EvalClient in this process.
            if mp.get_start_method(allow_none=True) is None:
                mp.set_start_method('spawn')
            self.latest_fidx = mp.Value('i', -1, lock=True)
            self.is_stream_ready = mp.Event()
            self.fid_ptr_dict = mp.Manager().dict()
            self.stream_start_time = mp.Value('d', 0.0, lock=True)
        else:
            self.latest_fidx = state[0]
            self.is_stream_ready = state[1]
            self.fid_ptr_dict = state[2]
            # request_stream() needs this attribute; get_state() only returns
            # three items, so fall back to a fresh value when it is absent.
            if len(state) > 3:
                self.stream_start_time = state[3]
            else:
                self.stream_start_time = mp.Value('d', 0.0, lock=True)

        self.verbose = verbose
        # create image receiver stub
        self.channel = grpc.insecure_channel(config['loopback_ip'] + ":" + str(config['image_service_port']))
        self.config = config
        self.stub = eval_server_pb2_grpc.ImageServiceStub(self.channel)
        response = self.stub.GetShm(eval_server_pb2.Empty())
        self.existing_shm = shared_memory.SharedMemory(name=response.value)
        self.channel.close()
        # create result sender stub
        self.result_channel = grpc.insecure_channel(config['loopback_ip'] + ":" + str(config['result_service_port']))
        self.result_stub = eval_server_pb2_grpc.ResultServiceStub(self.result_channel)
        response = self.result_stub.GetShm(eval_server_pb2.Empty())
        self.results_shm = shared_memory.SharedMemory(name=response.value)
        # View of the result buffer: up to 100 rows of
        # (4 bbox values, score, label) as float32.
        self.results_np = np.ndarray((100, 6), dtype=np.float32, buffer=self.results_shm.buf)
        self.is_stream_ready.clear()
        self.stream_process = None
        self.result_thread = None

    def get_state(self):
        """Return ``(latest_fidx, is_stream_ready, fid_ptr_dict)``."""
        return (self.latest_fidx, self.is_stream_ready, self.fid_ptr_dict)

    def close(self, results_file='results.json'):
        """Ask the server to generate *results_file* and release resources."""
        # Let a pending result finish before the channel it uses is closed.
        if self.result_thread:
            self.result_thread.join()
            self.result_thread = None
        self.result_stub.GenResults(eval_server_pb2.String(value=results_file))
        self.result_channel.close()
        self.existing_shm.close()
        self.results_shm.close()

    def stop_stream(self):
        """Wait for the receiver process and tell the server the sequence ended."""
        # Tolerate being called when no stream was requested.
        if self.stream_process is not None:
            self.stream_process.join()
        self.result_stub.FinishSequence(eval_server_pb2.Empty())
        self.is_stream_ready.clear()
        self.stream_process = None

    def request_stream(self, seq):
        """Start receive_stream() for sequence *seq* in a separate process."""
        # receiver as separate processs
        self.stream_process = mp.Process(target=receive_stream, args=(seq, self.latest_fidx, self.fid_ptr_dict, self.is_stream_ready, self.stream_start_time, self.config, self.verbose))
        self.stream_process.start()

    def get_latest_fidx(self):
        """Block until the stream is ready, then return the latest frame id."""
        self.is_stream_ready.wait()
        return self.latest_fidx.value

    def get_frame(self, fid=None, ptr=False):
        """Return ``(fid, frame)`` for *fid*, or for the latest frame if None.

        With ``ptr=True`` the second item is the frame's slot index in the
        shared buffer instead of the image array. Returns ``(None, None)``
        when the latest frame id is -1.

        Raises TypeError for a negative *fid* and KeyError when the
        requested frame has not been received yet.
        """
        if fid is not None and fid < 0:
            raise TypeError("fid must be non-negative")
        if fid is None:
            fid = self.get_latest_fidx()
            if fid == -1:
                return None, None
        elif fid not in self.fid_ptr_dict:
            raise KeyError("frame not available yet")
        start_ptr, end_ptr = self.fid_ptr_dict[fid]
        if ptr:
            return fid, int(start_ptr / (self.img_height * self.img_width * 3))
        return fid, np.ndarray((self.img_height, self.img_width, 3), dtype=np.uint8, buffer=self.existing_shm.buf[start_ptr:end_ptr])

    def send_result_shm(self, bboxes, bbox_scores, labels, timestamp):
        """Copy up to 100 detections into shared memory and signal the server."""
        num_detections = min(len(bboxes), 100)
        # An empty plain list cannot be broadcast into the (0, 4) slice, so
        # skip the copies when there is nothing to write.
        if num_detections:
            self.results_np[:num_detections, :4] = bboxes[:num_detections]
            self.results_np[:num_detections, 4] = bbox_scores[:num_detections]
            self.results_np[:num_detections, 5] = labels[:num_detections]
        self.result_stub.SignalResultsReady(eval_server_pb2.Result(timestamp=timestamp, num_bboxes=num_detections))

    def send_result_to_server(self, bboxes, bbox_scores, labels):
        """Send a result on a background thread, timestamped at call time.

        Waits for the previous result thread first, so at most one sender
        touches the shared result buffer at a time.
        """
        timestamp = perf_counter()
        if self.result_thread:
            self.result_thread.join()
        self.result_thread = Thread(target=self.send_result_shm, args=(bboxes, bbox_scores, labels, timestamp))
        self.result_thread.start()

    def get_frame_buf(self):
        """Return the shared-memory object that holds the image frames."""
        return self.existing_shm

    def get_stream_start_time(self):
        self.is_stream_ready.wait()
        ...  # NOTE(review): the rest of this method is truncated in the scraped snippet

# [scraped page text kept verbatim from this position]
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
