Best Python code snippet using fMBT_python
__init__.py
Source:__init__.py  
...59            except (socket.error, IOError):60                pass61def _send(msg, destination, acquire_send_lock=True, pickle=True):62    if acquire_send_lock:63        _acquire_send_lock(destination)64    try:65        if pickle:66            cPickle.dump(msg, destination, 2)67        else:68            destination.write(msg)69        destination.flush()70    finally:71        if acquire_send_lock:72            _release_send_lock(destination)73_send.locks = {}74def _send_opt(msg, destination, recv_caps, acquire_send_lock=True):75    data = cPickle.dumps(msg, 2)76    data_length = len(data)77    if data_length < _SEND_OPT_MESSAGE_MIN:78        _send(data, destination, acquire_send_lock=acquire_send_lock, pickle=False)79        return None80    if recv_caps & messages.RECV_CAP_COMPRESSION:81        # Try if compressing makes sense. For instance, at least 20 %82        # compression could be required for the first block to compress83        # everything.84        compress_block_len = min(data_length, _SEND_OPT_COMPRESS_TRIAL)85        compressed_block = zlib.compress(data[:compress_block_len],86                                         _SEND_OPT_COMPRESSION_LEVEL)87        if len(compressed_block) < compress_block_len * _SEND_OPT_COMPRESS_MIN:88            _uncompressed_data_length = data_length89            if compress_block_len == data_length:90                # everything got compressed for trial91                data = compressed_block92            else:93                # only first bytes were compressed in trial, compress all now94                data = zlib.compress(data, _SEND_OPT_COMPRESSION_LEVEL)95            data_length = len(data)96            compression_info = "compressed(%s)" % (_uncompressed_data_length,)97        else:98            compression_info = "no_compression"99    else:100        compression_info = "no_compression"101    data_info = messages.Data_info(102        data_type="Exec_rv",103        data_length=data_length,104        data_format=compression_info + ",pickled,allinone")105    if acquire_send_lock:106        _acquire_send_lock(destination)107    try:108        _send(data_info, destination, acquire_send_lock=False)109        bytes_sent = 0110        while bytes_sent < data_length:111            block_len = min(_SEND_OPT_BLOCK_SIZE, data_length-bytes_sent)112            data_block = data[bytes_sent:bytes_sent+block_len]113            # this may raise socket.error, let it raise through114            destination.write(data_block)115            destination.flush()116            bytes_sent += block_len117    finally:118        _release_send_lock(destination)119    return data_info120def _recv(source, acquire_recv_lock=True):121    """returns the first message from source"""122    if acquire_recv_lock:123        _acquire_recv_lock(source)124    try:125        try:126            return cPickle.load(source)127        except (ValueError, cPickle.UnpicklingError), e:128            return messages.Unloadable(str(e))129        except EOFError:130            raise131        except socket.error, e:132            raise EOFError("socket.error: " + str(e))133        except AttributeError, e:134            # If another thread closes the connection between send/recv,135            # cPickle.load() may raise "'NoneType' has no attribute 'recv'".136            # Make this look like EOF (connection lost)137            raise EOFError(str(e))138        except Exception, e:139            return messages.Unloadable("load error %s: %s" % (type(e).__name__, e))140    finally:141        if acquire_recv_lock:142            _release_recv_lock(source)143_recv.locks = {}144def _recv_with_info(source, acquire_recv_lock=True):145    """returns the first payload message from source that may/may not be146    preceded by Data_info147    """148    if acquire_recv_lock:149        _acquire_recv_lock(source)150    try:151        msg = _recv(source, False)152        if not isinstance(msg, messages.Data_info):153            return msg154        data = source.read(msg.data_length)155        if len(data) != msg.data_length:156            raise EOFError()157        if "compressed(" in msg.data_format:158            data = zlib.decompress(data)159        try:160            return cPickle.loads(data)161        except (ValueError, cPickle.UnpicklingError), e:162            return messages.Unloadable(str(e))163        except Exception, e:164            return messages.Unloadable("load error %s: %s" % (type(e).__name__, e))165    finally:166        if acquire_recv_lock:167            _release_recv_lock(source)168def _forward(source, destination, data_length,169             acquire_send_lock=True,170             acquire_recv_lock=True):171    if acquire_recv_lock:172        _acquire_recv_lock(source)173    if acquire_send_lock:174        _acquire_send_lock(destination)175    destination_ok = True176    try:177        bytes_sent = 0178        bytes_read = 0179        forward_block_size = _SEND_OPT_BLOCK_SIZE180        while bytes_read < data_length:181            forward_block = source.read(182                min(forward_block_size, data_length - bytes_read))183            bytes_read += len(forward_block)184            if forward_block:185                if destination_ok:186                    try:187                        destination.write(forward_block)188                        destination.flush()189                        bytes_sent += len(forward_block)190                    except socket.error:191                        destination_ok = False192                        # must still keep reading everything from the source193            else:194                raise EOFError() # source run out of data195        return bytes_sent196    finally:197        if acquire_recv_lock:198            _release_recv_lock(source)199        if acquire_send_lock:200            _release_send_lock(destination)201def _acquire_recv_lock(source):202    if not source in _recv.locks:203        _recv.locks[source] = thread.allocate_lock()204    _recv.locks[source].acquire()205def _acquire_send_lock(destination):206    if not destination in _send.locks:207        _send.locks[destination] = thread.allocate_lock()208    _send.locks[destination].acquire()209def _release_recv_lock(source):210    try:211        _recv.locks[source].release()212    except thread.error:213        pass # already released214def _release_send_lock(destination):215    try:216        _send.locks[destination].release()217    except thread.error:218        pass # already released219_g_hooks = {}...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
