Best Python code snippet using autotest_python
proxy.py
Source:proxy.py  
...156        """157        logger.debug("HFP handshake is starting...")158        try:159            # features160            self._ag_features = yield self._send_and_wait(161                "AT+BRSF={}".format(HF_BRSF_FEATURES),162                "BRSF")163            logger.debug("AG feature set = {}".format(self._ag_features))164            # we could negotiate to mSBC if we support it later, but for now165            # only support CVSD/PCM166            self._codec = "CVSD"167            if self._ag_features["CODEC_NEG"]:168                yield self._send_and_wait("AT+BAC=1", "OK")169                # try:170                #     yield self._send_and_wait("AT+BAC=2", "OK")171                #     self._codec = "mSBC"172                # except TimeoutError:173                #     logger.debug("Failed to negotiate to mSBC codec.")174            # indicators175            yield self._send_and_wait("AT+CIND=?", "CIND")176            yield self._send_and_wait("AT+CIND?", "CIND")177            yield self._send_and_wait("AT+CMER=3,0,0,1", "OK")178            # specific to call wait/3-way179            if self._ag_features["3WAY"]:180                self._ag_multicall = yield self._send_and_wait(181                    "AT+CHLD=?", "CHLD")182                # call waiting alerts183                yield self._send_and_wait("AT+CCWA=1", "OK")184                185            # extended error handling186            yield self._send_and_wait("AT+CMEE=1", "OK")187            # CLI188            yield self._send_and_wait("AT+CLIP=1", "OK")189            # network operator format190            yield self._send_and_wait("AT+COPS=3,0", "OK")191            self._connection.send_message("AT+COPS?")192        except TimeoutError as e:193            logger.exception("HFP handshake error.")194            if self._connection:195                self._connection.close()196            return197        # handshake is complete198        self._handshake_completed = True199        logger.debug("HFP handshake is complete.")200        if self.on_connected_changed:201            self.on_connected_changed(connected=True)202    @coroutine203    def _send_and_wait(self, command, reply_code):204        """DRY helper function to send a command, wait for reply and log it.205        """206        response = yield self._connection.send_message(207            message=command,208            async_reply_code=reply_code)209        logger.debug("{} response = {}".format(command, response))210        return response211    def _raise_event(self, name, **kwargs):212        """Helper to raise an event and capture any exceptions that may bubble213        from the event processor.214        """215        if self.on_event:216            try:217                self.on_event(name=name, **kwargs)...distributed_utils.py
Source:distributed_utils.py  
...23    world_size = torch.distributed.get_world_size()24    rank = torch.distributed.get_rank()25    if world_size == 1:26        return27    def _send_and_wait(r):28        if rank == r:29            tensor = torch.tensor(0, device="cuda")30        else:31            tensor = torch.tensor(1, device="cuda")32        torch.distributed.broadcast(tensor, r)33        while tensor.item() == 1:34            time.sleep(1)35    _send_and_wait(0)36    # now sync on the main process37    _send_and_wait(1)38def reduce_loss_dict(loss_dict):39    """40    Reduce the loss dictionary from all processes so that process with rank41    0 has the averaged results. Returns a dict with the same fields as42    loss_dict, after reduction.43    """44    world_size = get_world_size()45    if world_size < 2:46        return loss_dict47    with torch.no_grad():48        loss_names = []49        all_losses = []50        for k in sorted(loss_dict.keys()):51            loss_names.append(k)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
