How to use the set_test_class method in autotest

Best Python code snippet using autotest_python

concurrent_amqp_test_client.py

Source:concurrent_amqp_test_client.py Github

copy

Full Screen

...45 log("conne err: %s %s" % (args, kwargs))46 def run_after_connection(self):47 log.info("run after connect")48 self.test_class(self)49 def set_test_class(self, kls):50 log.info("test class setted %s" % kls)51 self.test_class = kls52class TestWSClient(object):53 def __init__(self, queue_manager, username, sess_id=None):54 self.message_callbacks = {}55 self.message_stack = {}56 self.user = UserModel.objects.get(username=username)57 self.request = type('MockWSRequestObject', (object,), {'remote_ip': '127.0.0.1'})58 self.queue_manager = queue_manager59 self.sess_id = sess_id or uuid.uuid4().hex60 sys.sessid_to_userid[self.sess_id] = self.user.key.lower()61 self.queue_manager.register_websocket(self.sess_id, self)62 self.login_user()63 # mimic tornado ws object64 # zengine.tornado_server.ws_to_queue.QueueManager will call write_message() method65 self.write_message = self.backend_to_client66 def login_user(self):67 session = Session(self.sess_id)68 current = Current(session=session, input={})69 current.auth.set_user(self.user)70 Login(current)._do_upgrade()71 def backend_to_client(self, body):72 """73 from backend to client74 """75 try:76 body = json_decode(body)77 if 'callbackID' in body:78 self.message_stack[body['callbackID']] = body79 self.message_callbacks[body['callbackID']](body)80 elif 'cmd' in body:81 self.message_callbacks[body['cmd']](body)82 except:83 import traceback84 print("\nException BODY: %s \n" % pformat(body))85 traceback.print_exc()86 log.info("WRITE MESSAGE TO CLIENT:\n%s" % (pformat(body),))87 def client_to_backend(self, message, callback, caller_fn_name):88 """89 from client to backend90 """91 cbid = uuid.uuid4().hex92 message = json_encode({"callbackID": cbid, "data": message})93 def cb(res):94 print("API Request: %s :: " % caller_fn_name, end='')95 result = callback(res, message)96 if ConcurrentTestCase.stc == callback and not result:97 FAIL = 'FAIL'98 else:99 FAIL = '--> %s' % callback.__name__100 print('PASS' if result else FAIL)101 # 
self.message_callbacks[cbid] = lambda res: callable(res, message)102 self.message_callbacks[cbid] = cb103 log.info("GOT MESSAGE FOR BACKEND %s: %s" % (self.sess_id, message))104 self.queue_manager.redirect_incoming_message(self.sess_id, message, self.request)105class ConcurrentTestCase(object):106 """107 Extend this class, define your test methods with "test_" prefix.108 """109 def __init__(self, queue_manager):110 log.info("ConcurrentTestCase class init with %s" % queue_manager)111 self.cmds = {}112 self.register_cmds()113 self.queue_manager = queue_manager114 self.clients = {}115 self.make_client('ulakbus')116 self.run_tests()117 def make_client(self, username):118 """119 Args:120 username: username for this client instance121 Returns:122 Logged in TestWSClient instance for given username123 """124 self.clients[username] = TestWSClient(self.queue_manager, username)125 def post(self, username, data, callback=None):126 if username not in self.clients:127 self.make_client(username)128 self.clients[username].message_callbacks.update(self.cmds)129 callback = callback or self.stc130 view_name = data['view'] if 'view' in data else sys._getframe(1).f_code.co_name131 self.clients[username].client_to_backend(data, callback, view_name)132 def register_cmds(self):133 for name in sorted(self.__class__.__dict__):134 if name.startswith("cmd_"):135 self.cmds[name[4:]] = getattr(self, name)136 def run_tests(self):137 for name in sorted(self.__class__.__dict__):138 if name.startswith("test_"):139 try:140 getattr(self, name)()141 except:142 import traceback143 traceback.print_exc()144 def process_error_reponse(self, resp):145 if 'error' in resp:146 print(resp['error'].replace('\\n','\n').replace('u\\', ''))147 return True148 def stc(self, response, request=None):149 """150 STC means Success Test Callback. 
Looks for 200 or 201 codes in response code.151 Args:152 response:153 request:154 """155 try:156 if not response['code'] in (200, 201):157 print("FAILED: Response not successful: \n")158 if not self.process_error_reponse(response):159 print("\nRESP:\n%s")160 print("\nREQ:\n %s" % (response, request))161 else:162 return True163 except Exception as e:164 log.exception("\n===========>\nFAILED API REQUEST\n<===========\n%s\n" % e)165 log.info("Response: \n%s\n\n" % response)166 def pstc(self, response, request=None):167 """168 Same as self.stc() (success request callback) but printing response/request169 for debugging purposes170 Args:171 response:172 request:173 """174 self.stc(response, request)175 print("\n\n=================\n\nRESPONSE: %s \n\nREQUEST: %s\n" % (response, request))176def main():177 from tornado import ioloop178 # initiate amqp manager179 ioloop = ioloop.IOLoop.instance()180 qm = TestQueueManager(io_loop=ioloop)181 # initiate test case182 qm.set_test_class(ConcurrentTestCase)183 qm.connect()184 ioloop.start()185if __name__ == '__main__':...

Full Screen

Full Screen

messaging_tests.py

Source:messaging_tests.py Github

copy

Full Screen

...42 # initiate amqp manager43 ioloop = ioloop.IOLoop.instance()44 qm = TestQueueManager(io_loop=ioloop)45 # initiate test case46 qm.set_test_class(TestCase)47 qm.connect()48 ioloop.start()49if __name__ == '__main__':...

Full Screen

Full Screen

base_splitter.py

Source:base_splitter.py Github

copy

Full Screen

...15 def set_working_dir(self, d):16 self.working_dir = d17 def set_train_class(self, tc):18 self.train_class = tc19 def set_test_class(self, tc):20 self.test_class = tc21 def set_split_size(self, split):22 self.split_size = split23 def set_other_config(self, config):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful