Best Python code snippet using slash
jupyter_kernel.py
Source:jupyter_kernel.py  
...629                    #630                    # now a couple of things are connected, call the session_cleanup_callback631                    #632                    if self.task_cnt > 1 and self.session_cleanup_callback:633                        self.session_cleanup_callback()634                        self.session_cleanup_callback = None635                elif msg[0] == "unregister":636                    if msg[1] in self.tasks:637                        self.tasks[msg[1]].discard(msg[2])638                    self.task_cnt -= 1639                    #640                    # if there are no connection tasks left, then shutdown the kernel641                    #642                    if self.task_cnt == 0 and self.task_cnt_max >= 4:643                        asyncio.create_task(self.session_shutdown())644                        await asyncio.sleep(10000)645                elif msg[0] == "shutdown":646                    asyncio.create_task(self.session_shutdown())647                    await asyncio.sleep(10000)648            except asyncio.CancelledError:649                raise650            except Exception:651                _LOGGER.error("housekeep task exception: %s", traceback.format_exc(-1))652    async def startup_timeout(self):653        """Shut down the session if nothing connects after 30 seconds."""654        await self.housekeep_q.put(["register", "startup_timeout", asyncio.current_task()])655        await asyncio.sleep(self.no_connect_timeout)656        if self.task_cnt_max <= 1:657            #658            # nothing started other than us, so shut down the session659            #660            _LOGGER.error("No connections to session %s; shutting down", self.global_ctx_name)661            if self.session_cleanup_callback:662                self.session_cleanup_callback()663                self.session_cleanup_callback = None664            await self.housekeep_q.put(["shutdown"])665        await self.housekeep_q.put(["unregister", 
"startup_timeout", asyncio.current_task()])666    async def start_one_server(self, callback):667        """Start a server by finding an available port."""668        first_port = self.avail_port669        for _ in range(2048):670            try:671                server = await asyncio.start_server(callback, "0.0.0.0", self.avail_port)672                return server, self.avail_port673            except OSError:674                self.avail_port += 1675        _LOGGER.error(676            "unable to find an available port from %d to %d", first_port, self.avail_port - 1,677        )678        return None, None679    def get_ports(self):680        """Return a dict of the port numbers this kernel session is listening to."""681        return {682            "iopub_port": self.iopub_port,683            "hb_port": self.heartbeat_port,684            "control_port": self.control_port,685            "stdin_port": self.stdin_port,686            "shell_port": self.shell_port,687        }688    def set_session_cleanup_callback(self, callback):689        """Set a cleanup callback which is called right after the session has started."""690        self.session_cleanup_callback = callback691    async def session_start(self):692        """Start the kernel session."""693        self.ast_ctx.add_logger_handler(self.console)694        _LOGGER.info("Starting session %s", self.global_ctx_name)695        self.tasks["housekeep"] = {asyncio.create_task(self.housekeep_run())}696        self.tasks["startup_timeout"] = {asyncio.create_task(self.startup_timeout())}697        self.iopub_server, self.iopub_port = await self.start_one_server(self.iopub_listen)698        self.heartbeat_server, self.heartbeat_port = await self.start_one_server(self.heartbeat_listen)699        self.control_server, self.control_port = await self.start_one_server(self.control_listen)700        self.stdin_server, self.stdin_port = await self.start_one_server(self.stdin_listen)701        self.shell_server, 
self.shell_port = await self.start_one_server(self.shell_listen)702        #...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
