How to use call_on_drop method in fMBT

Best Python code snippet using fMBT_python

server.py

Source:server.py Github

copy

Full Screen

...130 if opt_debug:131 daemon_log("on disconnect rv: %s" % (rv,))132 if setter_conn_id == conn_id:133 self._on_disconnect.remove((conn_id, code))134 def call_on_drop(self):135 for code in self._on_drop:136 exec_msg = messages.Exec(self.ns, code, None)137 if opt_debug:138 daemon_log("on drop: %s" % (exec_msg,))139 rv = _local_execute(exec_msg)140 if opt_debug:141 daemon_log("on drop rv: %s" % (rv,))142 def read_rv(self, async_rv):143 """Return and remove asynchronous return value.144 """145 if self.ns != async_rv.ns:146 raise ValueError("Namespace mismatch")147 if (async_rv.ns in _g_async_rvs and148 async_rv.rvid in _g_async_rvs[async_rv.ns]):149 rv = _g_async_rvs[async_rv.ns][async_rv.rvid]150 if not isinstance(rv, pythonshare.InProgress):151 del _g_async_rvs[async_rv.ns][async_rv.rvid]152 return rv153 else:154 raise ValueError('Invalid return value id: "%s"'155 % (async_rv.rvid,))156 def poll_rvs(self):157 """Returns list of Async_rv instances that are ready for reading.158 """159 rv = []160 for rvid, value in _g_async_rvs[self.ns].iteritems():161 if not isinstance(value, pythonshare.InProgress):162 rv.append(messages.Async_rv(self.ns, rvid))163 return rv164class Pythonshare_rns(object):165 """Remote namespace"""166 def __init__(self, conn, to_remote, from_remote):167 self.conn = conn168 self.to_remote = to_remote169 self.from_remote = from_remote170 def __del__(self):171 pythonshare._close(self.conn, self.to_remote, self.from_remote)172_g_local_namespaces = {}173# client-id -> set of namespaces174_g_namespace_users = {}175_g_executing_pythonshare_conn_id = None176# _g_remote_namespaces: namespace -> Connection to origin177_g_remote_namespaces = {}178# _g_namespace_exports: namespace -> list of Connections to which the179# namespace (remote or local) has been exported. 
If the namespace is180# deleted (or connection to origin is lost), these Connection objects181# are to be notified.182_g_namespace_exports = {}183_g_local_namespace_locks = {}184_g_async_rvs = {}185_g_async_rv_counter = 0186_g_server_shutdown = False187def _init_local_namespace(ns, init_code=None, force=False):188 if not ns in _g_local_namespaces:189 if opt_allow_new_namespaces or force:190 daemon_log('added local namespace "%s"' % (ns,))191 _g_local_namespaces[ns] = {192 "pythonshare_ns": Pythonshare_ns(ns),193 "Async_rv": pythonshare.messages.Async_rv194 }195 _g_local_namespace_locks[ns] = thread.allocate_lock()196 _g_async_rvs[ns] = {}197 else:198 raise ValueError('Unknown namespace "%s"' % (ns,))199 if init_code != None:200 try:201 exec init_code in _g_local_namespaces[ns]202 except Exception, e:203 daemon_log('namespace "%s" init error in <string>:\n%s\n\n%s' % (204 ns, code2string(init_code), exception2string(sys.exc_info())))205def _drop_local_namespace(ns):206 daemon_log('drop local namespace "%s"' % (ns,))207 _g_local_namespaces[ns]["pythonshare_ns"].call_on_drop()208 del _g_local_namespaces[ns]209 del _g_local_namespace_locks[ns]210 del _g_async_rvs[ns]211 # send notification to all connections in _g_namespace_exports[ns]?212def _drop_remote_namespace(ns):213 daemon_log('drop remote namespace "%s"' % (ns,))214 try:215 rns = _g_remote_namespaces[ns]216 del _g_remote_namespaces[ns]217 rns.__del__()218 except KeyError:219 pass # already dropped220 # send notification to all connections in _g_namespace_exports[ns]?221def _init_remote_namespace(ns, conn, to_remote, from_remote):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful