Best Python code snippet using keyboard
TServer.py
Source:TServer.py  
...132    def __init__(self, *args):133        TServer.__init__(self, *args)134        self.children = []135    def serve(self):136        def try_close(file):137            try:138                file.close()139            except IOError as e:140                logger.warning(e, exc_info=True)141        self.serverTransport.listen()142        while True:143            client = self.serverTransport.accept()144            if not client:145                continue146            try:147                pid = os.fork()148                if pid:149                    self.children.append(pid)150                    self.collect_children()151                    itrans = self.inputTransportFactory.getTransport(client)152                    otrans = self.outputTransportFactory.getTransport(client)153                    try_close(itrans)154                    try_close(otrans)155                else:156                    itrans = self.inputTransportFactory.getTransport(client)157                    otrans = self.outputTransportFactory.getTransport(client)158                    iprot = self.inputProtocolFactory.getProtocol(itrans)159                    oprot = self.outputProtocolFactory.getProtocol(otrans)160                    ecode = 0161                    try:162                        try:163                            while True:164                                self.processor.process(iprot, oprot)165                        except TTransport.TTransportException:166                            pass167                        except Exception as e:168                            logger.exception(e)169                            ecode = 1170                    finally:171                        try_close(itrans)172                        try_close(otrans)173                    os._exit(ecode)174            except TTransport.TTransportException:175                pass176            except Exception as x:177                logger.exception(x)178    def 
collect_children(self):179        while self.children:180            try:181                pid, status = os.waitpid(0, os.WNOHANG)182            except os.error:183                pid = None184            if pid:185                self.children.remove(pid)186            else:...wrap_squid_base.py
Source:wrap_squid_base.py  
# NOTE(review): lines 1-14 of wrap_squid_base.py (the imports and the opening
# of the class, including any __init__) are elided in this excerpt.  The class
# name `connection` is taken from its use in wrap_squid_base() below, and the
# class-level defaults mirror the checks made in try_close() and connect().
# Confirm both against the full file.
class connection:
    """Client connection to the wrap_squid server, usable as a context manager.

    The socket is opened lazily by send() and is always released through
    try_close(), which is also invoked on context exit and on finalisation.
    """

    # Defaults so that try_close()/connect() are safe on an instance that was
    # never configured (e.g. when __del__ runs early).
    s = None          # connected socket, or None while disconnected
    serverHost = ''   # value of the "ip=" row in wrap_squid.txt
    serverPort = 0    # value of the "port=" row in wrap_squid.txt

    def __enter__(self):
        """Return the connection itself for use in a ``with`` statement."""
        return self

    def __del__(self):
        """Release the socket when the object is finalised."""
        self.try_close()

    def __exit__(self, type, value, tb):
        """Release the socket on context exit; exceptions are not suppressed."""
        self.try_close()

    def init(self, part):
        """Store the helper name and load the server address.

        Reads ``wrap_squid.txt`` from the directory of this module.  Rows have
        the form ``key=value``; only ``ip`` and ``port`` are used and reading
        stops as soon as both have been seen.

        Args:
            part: helper name, sent to the server during the handshake.

        Returns:
            False if the file cannot be opened or a row cannot be parsed,
            True otherwise (even when a key is missing from the file).
        """
        self.part = part
        found = 0
        path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), 'wrap_squid.txt')
        try:
            # The context manager closes the file on every path, including
            # the early break and parse errors.
            with open(path, 'r') as source:
                for row in source:
                    row = row.strip()
                    if row:
                        t = row.split('=')
                        if t[0] == 'ip':
                            self.serverHost = t[1]
                            found += 1
                        elif t[0] == 'port':
                            self.serverPort = int(t[1])
                            found += 1
                    if found == 2:
                        break
        except (OSError, ValueError, IndexError):
            # OSError: file missing/unreadable; ValueError: non-numeric port
            # or undecodable text; IndexError: "ip"/"port" row without "=".
            print('ERR no open source')
            return False
        return True

    def try_close(self):
        """Shut down and close the socket, if any.  Safe to call repeatedly."""
        sock = self.s
        if sock is None:
            return
        # Forget the socket first so the object is consistent even if close()
        # fails.
        self.s = None
        try:
            sock.shutdown(SHUT_RDWR)
        except OSError:
            pass  # the peer may already be gone; closing is still required
        sock.close()

    def connect(self):
        """(Re)connect to the configured server and perform the handshake.

        Sends ``part + '\\n'`` and expects ``'ok-' + part + '\\n'`` in return.

        Returns:
            True when the handshake succeeded; False when no address is
            configured, the connection failed or the reply was unexpected.
            On failure the socket is closed again.
        """
        self.try_close()
        if not (self.serverPort and self.serverHost != ''):
            return False
        try:
            self.s = socket(AF_INET, SOCK_STREAM)
            self.s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            self.s.connect((self.serverHost, self.serverPort))
            # sendall: a plain send() may transmit only part of the greeting.
            self.s.sendall(bytes(self.part + '\n', 'utf-8'))
            response = self.s.recv(1024).decode("utf-8")
            if response == 'ok-' + self.part + '\n':
                return True
            self.try_close()
        except Exception as e:
            # Best effort: report the problem and leave the object closed.
            print("ERR " + str(e))
            self.try_close()
        return False

    def send(self, data):
        """Send all of *data* (bytes), connecting first when necessary.

        Returns:
            True if everything was sent, False otherwise.  The socket is
            closed after a failed transmission.
        """
        if self.s is None and not self.connect():
            print("Err no Conn\n")
            return False
        try:
            # sendall() returns None on success and raises on failure, so
            # there is no return value to inspect.
            self.s.sendall(data)
        except OSError:
            self.try_close()
            return False
        return True

    def recv(self):
        """Read up to 1024 bytes from the server.

        Returns:
            The received bytes, or None when there is no connection, the peer
            closed it, or reading failed (the socket is then closed).
        """
        if self.s is None:
            return None
        try:
            data = self.s.recv(1024)
        except OSError:
            data = b''
        if not data:
            self.try_close()
            return None
        return data


def conv_input(data):
    """Encode text for the wire: UTF-8, then base64, terminated by a newline."""
    return base64.b64encode(data.encode("utf-8")) + b'\n'


def conv_output(data):
    """Decode a server reply from UTF-8 and strip surrounding whitespace."""
    return data.decode("utf-8").strip()


def try_send(conn, line):
    """Send one encoded line and print the server's reply.

    Returns:
        True if a reply was received and printed; otherwise the connection is
        closed and False is returned.
    """
    if conn.send(line):
        data = conn.recv()
        if data is not None:
            # One reply is written per input line; flush so the reader of
            # stdout sees it before the next line is read from stdin.
            print(conv_output(data), flush=True)
            return True
    conn.try_close()
    return False


def perform_wrap_squid(conn):
    """Forward every non-empty stdin line to the server, retrying once.

    A line that cannot be delivered is retried after 0.2 s; if the retry also
    fails, "ERR no send" is printed and the connection is closed so that the
    next line triggers a reconnect.
    """
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        payload = conv_input(line)
        try:
            if not try_send(conn, payload):
                time.sleep(0.2)
                if not try_send(conn, payload):
                    print("ERR no send\n", flush=True)
                    conn.try_close()
        except Exception as e:
            # Keep serving later lines, but leave a trace on stderr instead
            # of swallowing the error (stdout carries the replies).
            print("ERR " + str(e), file=sys.stderr)


def wrap_squid_base(helper_text):
    """Entry point: configure a connection for *helper_text* and serve stdin."""
    # The context manager closes the connection on every exit path.
    with connection() as conn:
        if conn.init(helper_text):
            perform_wrap_squid(conn)

# ... (the wrap_squid_base.py excerpt ends here; the next listing is __init__.py)
Source:__init__.py  
2from . import exceptions3PROTO = 'CAP/0.1.0'4BUFSIZ = 10245local_urls = []6def try_close(func):7    """Decorator to wrap function in a try/except for OSError.""" 8    def tc(*args, **kwarg):9        try:10            return func(*args, **kwarg)11        except (OSError, EOFError):12            raise exceptions.SocketClosed()13    return tc14def local(func):15    """Decorator cancel network if local_urls != []""" 16    def nop():17        pass18    def ll(*args, **kwarg):19        if local_urls == []:20            return func(*args, **kwarg)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
