Best Python code snippet using fMBT_python
sessionManager.py
Source:sessionManager.py  
...94        except pexpect.EOF:95            if self.spawnProc.isalive():96                raise SessionManagerException("error starting %s" % command)97            else:98                self._postConnect()99        except Exception, e:100            msg = "session error: %s" % e101            self.disconnect()102            raise e103	    104	    105    def issueCommand(self,command, timeout=3, message=None):106        """107        Send a command to the process spawned by pexpect and and do108        not wait for anything.109        This command should not be used unless really necessary110        """111        p = self.spawnProc112        p.sendline(command)113        #self._checkCommandStatus()       114    115    # Renamed assertCommand from Command116    def assertCommand(self,command, expected=None, timeout=30, message=None):117        assertTimeout = int (globalVar.assertTimeout)118        if (timeout != 0) :119            assertTimeout = timeout120        ssh = self.spawnProc121        eatPrompt = True122        if not expected:123            eatPrompt = False124            expected = self.prompt125        126        if not message:127            message = 'command "%s" timed out waiting for %s' % \128                       (command, expected)129        130        expList.append(expected)    131        ssh.sendline(command)132        res = ssh.expect(expList, assertTimeout)133        134        if eatPrompt: 135            ssh.expect(self.prompt, assertTimeout)136            if (res != expList.index(expected)):137                self._postCheck(res, message)138        #try :    139        #    if eatPrompt: 140        #        ssh.expect(self.prompt, timeout)141        #except pexpect.TIMEOUT:142        #    raise SessionManagerException(message)143        return ssh.before144    def assertOutput(self, expected=None, timeout=5, message=None):145       """146       This functionsWait for the expected output from the spawned pexpect process.147       148       This method sends nothing to the process but expects output;149       useful in cases where some event other than direct command input150       is causing the process to react.151       If expect times out or end of file is received an AssertionError will be raised152       """153       assertTimeout = int (globalVar.assertTimeout)154       if (timeout != 0) :155           assertTimeout = timeout156       p = self.spawnProc157      158       #If any expected output is specified, append it to the List    159       if not expected:160           expected = self.prompt    161       expList.append(expected) 162       163       if not message :164           message = "Expected output %s not received" %expected165       166       # Wait for the output 167       result = p.expect(expList, assertTimeout)168       # If expected is true and the output is not expected, Call the _postCheck function169       if (result != expList.index(expected)):170          self._postCheck(result, message)171       expList.remove(expected)172       173    def filter(self, subcommand, pattern=None, delim=None):174        """175        Send a command and return filtered output.176        """177        #print "filter command:%s" %subcommand178        # Clear the buffer so that the output of the previous command(s) is179        # eaten up180        clear = False181        count = 0182        # Changed the code to support delimeter other than '#'183        if (not delim):184            prompt = self.prompt185        else:186            prompt = delim187        while(clear == False):188            res = self.spawnProc.expect([prompt,pexpect.TIMEOUT],.5)189            if (res == 1):190                clear = True191        output = self._sendAndTrim(subcommand,delim)192        #self._checkCommandStatus()193        return output194	        195    def disconnect(self):196        """197        Disconnect from the session.  If we are a subsession, close198        the spawned pexpect process.  This method assumes the199        subsession has sent whatever commands necessary to end itself,200        so we expect an EOF here before the close.201        Expect the process to be closing or already closed, which202        generates an EOF.  Look # for a 2-second timeout also.203        If no spawnProc is defined, return quietly so that callers don't204        have problems when calling disconnect() more than once.205        """206        if not self.spawnProc:207            return208        209        ssh = self.spawnProc210        try:211            ssh.expect([pexpect.EOF], self.sshTimeout)212        except OSError,e:213            ssh.kill(signal.SIGKILL)214            self.cleanUp()215        except pexpect.TIMEOUT:216            ssh.kill(signal.SIGKILL)217            self.cleanUp()218        except Exception, exc: 219            ssh.kill(signal.SIGKILL)220            self.cleanUp()221        self.cleanUp()222        223    def cleanUp(self):224        """Clean up all the resource created during the session creation"""225        self.isConnected=False226        self.spawnProc=None227           228    ############################################################229    # Internal methods below - may be overridden by descendents230    ############################################################231    def _postCheck (self, result, message=None, promptCheck=False):232        """ 233        This function does the error handling for the functions:234        assertOutput, assertCommand, filter.235	236        If error is end of file or timeout then AssertionError is raised237        with the message.238	   239        Arguments:240        result : The result of the p.expect command241        message : Message to be printed if the command failed242        """243        if not message:244            message = "Execution of command failed"245        if promptCheck :246            if (result == expList.index(self.prompt)):247                # got a prompt, want to save the prompt chunk so we can use248                # it later to trim command output.  do this by sending a249                # \r and cultivating the bare prompt.250                self.spawnProc.sendline("")251                self.spawnProc.expect(self.prompt)252                self._extractPChunk(self.spawnProc.before)253                expList.remove(self.prompt)254	    255    	  # If timeout occured, raise Assertion error256        if (result == expList.index(pexpect.TIMEOUT)):257            raise AssertionError('TIME OUT : %s '%message)258    	  # If End of file received, raise Assertion error259        elif (result == expList.index(pexpect.EOF)):260            raise AssertionError('End Of file received: %s '%message)261	262    def _checkCommandStatus(self, lastCommand=False):263        """Get the status of the last command.264        """265        p = self.spawnProc266        p.sendline('echo $?')267        regex = re.compile('^[0-9]+',re.M)268        p.expect(regex, 2)269        msg = '_checkCommandStatus : Execution of command FAILED'270       	if lastCommand:271    	    msg = '_checkCommandStatus :Execution of command : "%s" FAILED' %lastCommand272        if p.after != '0' and p.after != '99':273            raise AssertionError(msg)274        275    def _connect(self):276        """277        Run the command or spawn the process that is the basis for the278        session.  If we are a parent session, then create the new279        spawned process.  If we are a child, send the command to the280        existing subprocess.281        """282        if not self.isChild:283            msg = "SessionManager._connect: failed to spawn %s, timeout is : %s" % (self.command, self.sshTimeout)284            try:285                self.spawnProc = pexpect.spawn(self.command,286                             self.args, self.sshTimeout)287                if not self.spawnProc:288                    raise SessionManagerException(msg)289                self._postConnect()290                self.isConnected = True291            except pexpect.TIMEOUT:292                raise SessionManagerException("Timeout while " + msg)293            except pexpect.EOF:294                raise SessionManagerException("SessionManager._connect :End of File condition while " + msg)295            except Exception, exc:296                raise SessionManagerException('SessionManager._connect: caught %s' % exc)297        else:298            cmdline = self.command + ' ' + string.join(self.args,' ')299            self.spawnProc.sendline(cmdline)300            self.isConnected = True301	    302	    303    def _extractPChunk(self, line):304        """ Extract the prompt from the program output.  This is for305        use with (expect) functions that determine end-of-output by306        waiting for the command prompt.  The problem is that the307        prompt (or a piece of it) is left in the output.  The308        extracted prompt chunk is used later in the trim functions.309        310        """311        chunk = string.split(line,'\n')[1]312        self.promptChunk = chunk313    def _postConnect(self):314        """ Do whatever expect operations necessary to establish315        positive contact with the command after connecting.  If316        overriding in a descendent, this method must set the317        promptChunk variable if using the default _sendAndTrim().318        """319        p = self.spawnProc320        msg = "SessionManager._postConnect: failed to get prompt"321        expList.append(self.prompt)322        match = p.expect(expList, self.sshTimeout)323        self._postCheck(match,msg,True)324    325    def _sendAndTrim(self, command, delim=None):326        """327        General-purpose method that will send the command and trim328        the prompt from the output. 329        """330        assertTimeout = int (globalVar.assertTimeout)331        p = self.spawnProc332        p.sendline(command)333        a=p.readline()334        #print a335	if (not delim):336	    prompt = self.prompt337	else:338	    prompt = delim339        expList.append(prompt) 340        341        result = p.expect(expList,assertTimeout)342        if (result != 2) :343            self._postCheck(result)344        # at this point, we have the output but also the command and345        # part of the prompt.  get rid of the prompt chunk.346        if (not delim):347	    promptChunk = self.promptChunk348	else:349	    promptChunk = delim350        output = re.sub(promptChunk, '', p.before)351        output = re.sub(command+'\r\n', '', output)352        return output353    354class SSH(SessionManager):355    """Set up an SSH session.356    Note that brackets ("[", "]") are NOT ALLOWED in the prompt string357    on the target host.  Brackets will screw up the trim functions358    because they are list operators in Python.  Beware of any other359    characters in the prompt that might confuse this class.360    The first argument should be in the format login@ip.361    The prompt MUST end with '$' or '#', followed by a space.  This is a362    typical default for most shells, except maybe the C-shell varieties,363    which are not endorsed by The Creator."""364    365    def __init__(self, args, parent=None,ctxt=None):366        if not args: args = []367        if type(args) is types.StringType:368	    # Code added for checking if the login 369	    # arguments are of the type "user@<remoteip>"370	    # if not then by default the user is taken as "root"371	    # and the login arguments become "root@<ip>372            #Added by ND373            # Commented by Akanksha374            # While loggin in to the local shell this isnt provided375            # Remove this or add better handling376            377	    #logindetails = args.split("@")378            #self.ipaddr = logindetails[1]379            args = [args]380            if (args[0].find('@') == -1):381                user = "root"382                login = user + '@' + args[0]383                args[0] = login384        self.longComand = None385        super(SSH, self).__init__("ssh", args, "[#$] ", parent,context=ctxt)386        387    def disconnect(self):388        """Send an exit to the remote shell and give it a chance to389        finish up before calling the parent disconnect, which closes390        the pexpect subprocess."""391        p = self.spawnProc392        p.sendline("exit")393        super(SSH, self).disconnect()394    def runLong(self, command):395        """Run a command that is expected to run for a long time,396        like 'tail -f'."""397        self.longCommand = command398        self.spawnProc.sendline(command)399    def stopLong(self, reject=False):400        """Stop a command started by runLongCmd.401        Returns any output generated by the command or a timeout402        string.  If reject is true, bail out after the command is403        stopped. 404        TODO  The '^C' string does not occur in terminal output on405        TODO  Linux, therefore the regex substitution will fail to find406        TODO  a match and you'll get the prompt in the output.  This can407        TODO  be fixed for Linux by doing a uname check and modifying408        TODO  the trailingJunk string accordingly.409        TODO  The output is not completely clean.  Tests have shown a410        TODO  leading space and a trailing newline.  Callers can411        TODO  strip() the output, so not a high priority.  Caveat412        TODO  emptor!413        """414        if self.longCommand:415            p = self.spawnProc416            #print 'stopLong: sending ctrl-c'417            p.send(CTRL_C)418            match = p.expect([self.prompt,419                              pexpect.TIMEOUT], 2)420            if match == 0:421		if reject: return422                trailingJunk = '\^C' + '\r\n' + self.promptChunk423                output = re.sub(self.longCommand+'\r\n', '', p.before)424                output = re.sub(trailingJunk, '', output)425                return output426            else:427                return "timed out"428    def _postConnect(self):429        """430        This function performs the error checking for the ssh specific 431        connections. 432	433        The matching of the prompt received when ssh command is executed is done against434        different pexpect error conditions.435        Exceptions are raised based on the error condition of SSH connection scenarios.436	437        """438        #timeout = 5439        p = self.spawnProc440        list = [self.prompt,"ssh:", "[Pp]assword: ", "\? ", 441	        "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@", 442        pexpect.EOF,pexpect.TIMEOUT]...AbstractConnection.py
Source:AbstractConnection.py  
...54        MUST be overloaded, but may be just 'return true'.55        """56        raise NotImplementedError("_initConnection() not implemented in " +57                                  " abstract class AbstractConnection")58    def _postConnect(self):59        """60        MAY call _connectionEstablished(), but for some heavier protocols61        like Mumble, it may be too early for that.62        called before entering the listening loop.63        returning False or raising an error will immediately close the64        connection, causing it to _not_ enter the listening loop.65        MAY be overloaded.66        """67        self._connectionEstablished()68        return True69    def _closeConnection(self):70        """71        SHOULD close the socket/file.72        returns True on success, and if the connection was open beforehand.73        MUST be overloaded.74        """75        raise NotImplementedError("_closeConnection() not implemented in " +76                                  "abstract class AbstractConnection")77    def _listen(self):78        """79        Called from the main listening loop.80        SHOULD read data from socket/file, and send responses.81        return False or raise an error if listening fails.82        the loop will be cleanly terminated in that case.83        MUST be overloaded.84        """85        raise NotImplementedError("_listen() not implemented in " +86                                  "abstract class AbstractConnection")87    def _sendMessageUnsafe(self, message):88        """89        SHOULD send the message via the connection's socket/file.90        return False or raise an error if the sending fails.91        MUST be overloaded.92        """93        raise NotImplementedError("_sendMessageUnsafe() not implemented in " +94                                  "abstract class AbstractConnection")95    def _sendTextMessageUnsafe(self, message):96        """97        Sends a text message.98        return False or raise an error if the sending has failed.99        SHOULD add the neccesary 'text message' headers to the message,100        and call _sendMessage().101        MUST be overloaded.102        """103        raise NotImplementedError("sendTextMessage() not implemented in " +104                                  "abstract class AbstractConnection")105    # the following methods SHOULD NOT be overloaded.106    def registerTextCallback(self, function):107        self._textCallback.append(function)108    def registerConnectionEstablishedCallback(self, function):109        self._connectionEstablishedCallback.append(function)110    def registerConnectionLostCallback(self, function):111        self._connectionLostCallback.append(function)112    def registerConnectionFailedCallback(self, function):113        self._connectionFailedCallback.append(function)114    def _invokeTextCallback(self, sender, message):115        for f in self._textCallback:116            f(sender, message)117    def _invokeConnectionEstablishedCallback(self):118        for f in self._connectionEstablishedCallback:119            f()120    def _invokeConnectionLostCallback(self):121        for f in self._connectionLostCallback:122            f()123    def _invokeConnectionFailedCallback(self):124        for f in self._connectionFailedCallback:125            f()126    def start(self):127        """128        call this to start the connection, as a thread.129        """130        thread.start_new_thread(self.run, ())131    def stop(self):132        """133        call this to terminate the connection.134        """135        self._connected = False136        self._established = False137    def _connectionEstablished(self):138        """139        MUST be called manually, as soon as the connection is ready to140        transmit text messages.141        """142        if not self._connected:143            raise Exception("connection can't be established, since it's " +144                            "not even connected")145        self._established = True146        self._invokeConnectionEstablishedCallback()147    def run(self):148        """149        opens and initializes the connection, contains the listening loop,150        and closes the connection.151        """152        try:153            if not self._openConnection():154                raise Exception("unknown error")155        except:156            self._log("connection could not be opened:\n" +157                      str(sys.exc_info()[0]), 0)158            self._log(traceback.format_exc(), 1)159            self._invokeConnectionFailedCallback()160            return161        else:162            self._log("connection successfully opened", 2)163        try:164            if not self._initConnection():165                raise Exception("unknown error")166        except:167            self._logException("initial packages could not be sent", 1)168            try:169                self._closeConnection()170            except:171                pass172            self._invokeConnectionFailedCallback()173            return174        else:175            self._log("initial packages successfully sent", 2)176        # we can now consider ourselves connected.177        # please note that the connection does not count as 'established' yet,178        # as authorization may still be required.179        # call _connectionEstablished() yourself.180        self._connected = True181        try:182            # ... for example from _postConnect()!183            if not self._postConnect():184                raise Exception("postConnect error")185            # you may even call it from inside _listen() once that auth186            # confirm arrives.187            while self._connected:188                if not self._listen():189                    raise Exception("listening error")190        except:191            self._logException("connection terminated with error", 0)192        else:193            self._log("connection terminated without error", 1)194        self._established = False195        self._connected = False196        # try to close the file/socket, in case it's still open.197        try:...dbpool.py
Source:dbpool.py  
1# $Id$2import weakref as _weakref3import Queue as _Queue4import thread as _thread5import time as _time6import atexit as _atexit7_log_level = 08_log_name = "/tmp/dbpool.log"9_log_file = None10_log_lock = _thread.allocate_lock()11apilevel = "2.0"12threadsafety = 213_dbmod = None14_lock = _thread.allocate_lock()15_refs = {}16_COPY_ATTRS = ("paramstyle", "Warning", "Error", "InterfaceError",17  "DatabaseError", "DataError", "OperationalError", "IntegrityError",18  "InternalError", "ProgrammingError", "NotSupportedError")19def _log(level, message, *args, **kwargs):20  global _log_file21  if _log_level >= level:22    if args or kwargs:23      argslist = [repr(arg) for arg in args]24      argslist.extend("%s=%r" % item for item in kwargs.items())25      message += "(" + ", ".join(argslist) + ")"26    _log_lock.acquire()27    try:28      if not _log_file:29        _log_file = open(_log_name, "a", 1)30      _log_file.write("%s %s\n" % (_time.strftime("%b %d %H:%M:%S"), message))31    finally:32      _log_lock.release()33def set_database(dbmod, minconns, timeout=0, postconnect=None):34  if minconns < 1:35    raise ValueError("minconns must be greater than or equal to 1")36  if _dbmod is not None:37    if _dbmod is dbmod:38      return39    raise Exception("dbpool module is already in use")40  if len(dbmod.apilevel) != 3 or dbmod.apilevel[:2] != "2." or \41    not dbmod.apilevel[2].isdigit():42    raise ValueError("specified database module is not DB API 2.0 compliant")43  if dbmod.threadsafety < 1:44    raise ValueError("specified database module must have threadsafety level"45      " of at least 1")46  _log(1, "set_database", dbmod.__name__, minconns, timeout)47  g = globals()48  g["_dbmod"] = dbmod49  g["_available"] = {}50  g["_minconns"] = minconns51  g["_timeout"] = timeout52  g["_postconnect"] = postconnect53  for v in _COPY_ATTRS:54    g[v] = getattr(dbmod, v)55def connect(*args, **kwargs):56  if _dbmod is None:57    raise Exception("No database module has been specified")58  key = repr(args) + "\0" + repr(kwargs)59  _log(1, "connect", *args, **kwargs)60  try:61    while True:62      conn = _available[key].get(0)63      if _timeout == 0 or _time.time() - conn._lastuse < _timeout:64        _log(2, "connect: returning connection %r from _available" % conn)65        return conn66      else:67        conn._inner._connection = None68        _log(2, "connect: discarded connection %r from _available due to age" %69          conn)70  except (KeyError, _Queue.Empty):71    conn = _Connection(None, None, *args, **kwargs)72    _log(2, "connect: created new connection %r" % conn)73    return conn74def _make_available(conn):75  key = repr(conn._args) + "\0" + repr(conn._kwargs)76  _log(2, "_make_available", conn)77  _lock.acquire()78  try:79    try:80      _available[key].put(conn, 0)81      _log(3, "_make_available: put into existing _available slot")82    except KeyError:83      _log(3, "_make_available: created new _available slot")84      q = _Queue.Queue(_minconns)85      q.put(conn, 0)86      _available[key] = q87    except _Queue.Full:88      conn._inner._connection = None89      _log(3, "_make_available: discarded, _available slot full")90  finally:91    _lock.release()92def _connection_notinuse(ref):93  # if the Python interpreter is exiting, the globals might already have94  # been deleted, so check for them explicitly95  if _refs is None:96    return97  inner = _refs[ref]98  del _refs[ref]99  inner._cursorref = None100  if inner._connection is not None:101    if _make_available is not None and _Connection is not None:102      _make_available(_Connection(inner))103class _Connection(object):104  def __init__(self, inner, *args, **kwargs):105    self._inner = None106    _log(4, "_Connection", self, inner, *args, **kwargs)107    if inner is None:108      self._inner = _InnerConnection(*args, **kwargs)109      _log(5, "_Connection: new inner=%r" % self._inner)110    else:111      self._inner = inner112    self._inner._outerref = _weakref.ref(self)113    ref = _weakref.ref(self, _connection_notinuse)114    _log(5, "_Connection: ref=%r" % ref)115    _refs[ref] = self._inner116  def __repr__(self):117    return "<dbpool._Connection(%r) at %x>" % (self._inner, id(self))118  def cursor(self, *args, **kwargs):119    # this method would not be necessary (i.e. the __getattr__ would take120    # care of it) but if someone does dbpool.connect().cursor() all in one121    # expression, the outer _Connection class was getting garbage-collected122    # (and hence the actual database connection being put back in the pool)123    # *in the middle of the expression*, i.e. after connect() was called but124    # before cursor() was called. So you could end up with 2 cursors on the125    # same database connection.126    return self._inner.cursor(*args, **kwargs)127  def __getattr__(self, attr):128    return getattr(self._inner, attr)129class _InnerConnection(object):130  def __init__(self, connection, *args, **kwargs):131    self._connection = None132    _log(4, "_InnerConnection", self, connection, *args, **kwargs)133    self._args = args134    self._kwargs = kwargs135    if connection is None:136      _log(2, "_InnerConnection: Calling actual connect", *args, **kwargs)137      self._connection = _dbmod.connect(*args, **kwargs)138      if _postconnect:139        _postconnect(self._connection, *args, **kwargs)140    else:141      _log(5, "_InnerConnection: Re-using connection %r" % connection)142      self._connection = connection143    self._cursorref = None144    self._outerref = None145    self._lock = _thread.allocate_lock()146    self._lastuse = _time.time()147  def __repr__(self):148    return "<dbpool._InnerConnection(%r) at %x>" % (self._connection, id(self))149  def close(self):150    _log(3, "_Connection.close", self)151    if self._cursorref is not None:152      c = self._cursorref()153      if c is not None:154        _log(4, "_Connection.close: closing cursor %r" % c)155        c.close()156    self._cursorref = None157    self._outerref = None158    conn = self._connection159    if conn:160      self._connection = None161      if _make_available is not None:162        _make_available(_Connection(None, conn, *self._args, **self._kwargs))163  def __getattr__(self, attr):164    return getattr(self._connection, attr)165  def cursor(self, *args, **kwargs):166    _log(3, "cursor", self, *args, **kwargs)167    if _timeout == 0 or _time.time() - self._lastuse < _timeout:168      self._lock.acquire()169      try:170        if self._cursorref is None or self._cursorref() is None:171          c = _Cursor(self, *args, **kwargs)172          self._cursorref = _weakref.ref(c)173          self._lastuse = _time.time()174          return c175      finally:176        self._lock.release()177    _log(3, "cursor: creating new connection")178    return connect(*self._args, **self._kwargs).cursor(*args, **kwargs)179class _Cursor(object):180  def __init__(self, connection, *args, **kwargs):181    self._cursor = None182    _log(4, "_Cursor", connection, *args, **kwargs)183    self._connection = connection184    self._outer = connection._outerref()185    self._cursor = connection._connection.cursor(*args, **kwargs)186  def __repr__(self):187    return "<dbpool._Cursor(%r) at %x>" % (self._cursor, id(self))188  def close(self):189    _log(4, "_Cursor.close", self)190    self._connection._cursorref = None191    self._connection = None192    self._cursor.close()193    self._outer = None194  def __getattr__(self, attr):195    return getattr(self._cursor, attr)196def _exiting():197  global _make_available198  _make_available = None...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
