Best Python code snippet using Airtest
servers_test.py
Source:servers_test.py  
...22LARGE = '{"kind": "large machine"}'23PONG = '{"kind": "pong.exe"}'24TETRIS = '{"kind": "tetris.exe"}'25PACMAN = '{"kind": "pacman.exe"}'26def setup_server(app, srv_id, data=None):27    if data:28        return app.post(29            '/v1/servers/{}'.format(srv_id),30            data=data,31            mimetype='application/json')32    else:33        return app.post(34            '/v1/servers/{}'.format(srv_id))35def setup_process(app, srv_id, prc_id, data=None):36    if data:37        return app.post(38            '/v1/servers/{}/processes/{}'.format(srv_id, prc_id),39            data=data,40            mimetype='application/json')41    else:42        return app.post(43            '/v1/servers/{}/processes/{}'.format(srv_id, prc_id))44class ServersTestCase(unittest.TestCase):45    def setUp(self):46        servers.app.testing = True47        self.app = servers.app.test_client()48    def tearDown(self):49        servers.reset()50    def test_empty_servers(self):51        rv = self.app.get('/v1/servers')52        self.assertTrue(rv.status_code == 200)53        self.assertTrue(b'{}' in rv.data)54    def test_empty_filter(self):55        rv = self.app.get('/v1/servers?filter=max_load')56        self.assertTrue(rv.status_code == 404)57    def test_create_server(self):58        rv = setup_server(self.app, 'srv001', SMALL)59        self.assertTrue(rv.status_code == 201)60        self.assertTrue(b'started Ok' in rv.data)61    def test_create_server_2ndp_conflict(self):62        setup_server(self.app, 'srv001', SMALL)63        rv = setup_server(self.app, 'srv001', LARGE)64        self.assertTrue(rv.status_code == 409)65    def test_create_server_2ndp_bad_request(self):66        setup_server(self.app, 'srv001', SMALL)67        rv = setup_server(self.app, 'srv002')68        self.assertTrue(rv.status_code == 400)69    def test_read_server(self):70        setup_server(self.app, 'srv001', SMALL)71        rv = self.app.get('/v1/servers/srv001')72        self.assertTrue(rv.status_code == 200)73        self.assertTrue(b'srv001' in rv.data)74    def test_read_server_2ndp_not_found(self):75        setup_server(self.app, 'srv001', SMALL)76        rv = self.app.get('/v1/servers/srv002')77        self.assertTrue(rv.status_code == 404)78    def test_read_servers(self):79        setup_server(self.app, 'srv001', SMALL)80        setup_server(self.app, 'srv007', LARGE)81        rv = self.app.get('/v1/servers')82        self.assertTrue(rv.status_code == 200)83        self.assertTrue(b'srv001' in rv.data)84        self.assertTrue(b'srv007' in rv.data)85    def test_update_server(self):86        setup_server(self.app, 'srv001', SMALL)87        rv = self.app.put('/v1/servers/srv001',88                          data=MEDIUM,89                          mimetype='application/json')90        self.assertTrue(rv.status_code == 200)91        self.assertTrue(b'reconfigured Ok' in rv.data)92        rv = self.app.get('/v1/servers/srv001')93        self.assertTrue(rv.status_code == 200)94        self.assertFalse(b'small' in rv.data)95        self.assertTrue(b'medium' in rv.data)96    def test_update_server_2ndp_not_found(self):97        setup_server(self.app, 'srv001', SMALL)98        rv = self.app.put('/v1/servers/srv002',99                          data='{"kind": "medium machine"}',100                          mimetype='application/json')101        self.assertTrue(rv.status_code == 404)102    def test_update_server_2ndp_bad_request(self):103        setup_server(self.app, 'srv001', SMALL)104        rv = self.app.put('/v1/servers/srv001')105        self.assertTrue(rv.status_code == 400)106    def test_delete_server(self):107        setup_server(self.app, 'srv001', SMALL)108        setup_server(self.app, 'srv007', SMALL)109        rv = self.app.delete('/v1/servers/srv001')110        self.assertTrue(rv.status_code == 200)111        self.assertTrue(b'stopped Ok' in rv.data)112        rv = self.app.get('/v1/servers')113        self.assertTrue(rv.status_code == 200)114        self.assertFalse(b'srv001' in rv.data)115        self.assertTrue(b'srv007' in rv.data)116    def test_delete_server_2ndp_not_found(self):117        setup_server(self.app, 'srv001', SMALL)118        setup_server(self.app, 'srv007', SMALL)119        rv = self.app.delete('/v1/servers/srv002')120        self.assertTrue(rv.status_code == 404)121class ProcessesTestCase(unittest.TestCase):122    def setUp(self):123        servers.app.testing = True124        self.app = servers.app.test_client()125    def tearDown(self):126        servers.reset()127    def test_empty_processes(self):128        setup_server(self.app, 'srv001', SMALL)129        rv = self.app.get('/v1/servers/srv001/processes')130        self.assertTrue(rv.status_code == 200)131        self.assertTrue(b'{}' in rv.data)132    def test_empty_processes_2ndp_not_found(self):133        setup_server(self.app, 'srv001', SMALL)134        rv = self.app.get('/v1/servers/srv002/processes')135        self.assertTrue(rv.status_code == 404)136    def test_create_process(self):137        setup_server(self.app, 'srv001', SMALL)138        rv = setup_process(self.app, 'srv001', 'proc001', TETRIS)139        self.assertTrue(rv.status_code == 201)140        self.assertTrue(b'launched Ok' in rv.data)141    def test_create_process_2ndp_conflict(self):142        setup_server(self.app, 'srv001', SMALL)143        setup_process(self.app, 'srv001', 'proc001', TETRIS)144        rv = setup_process(self.app, 'srv001', 'proc001', PONG)145        self.assertTrue(rv.status_code == 409)146    def test_create_process_2ndp_bad_request(self):147        setup_server(self.app, 'srv001', SMALL)148        setup_process(self.app, 'srv001', 'proc001', TETRIS)149        rv = setup_process(self.app, 'srv001', 'proc002')150        self.assertTrue(rv.status_code == 400)151    def test_read_process(self):152        setup_server(self.app, 'srv001', SMALL)153        setup_process(self.app, 'srv001', 'proc001', TETRIS)154        rv = self.app.get('/v1/servers/srv001/processes/proc001')155        self.assertTrue(rv.status_code == 200)156        self.assertTrue(b'tetris' in rv.data)157    def test_read_process_2ndp_not_found(self):158        setup_server(self.app, 'srv001', SMALL)159        setup_process(self.app, 'srv001', 'proc001', TETRIS)160        rv = self.app.get('/v1/servers/srv001/processes/proc002')161        self.assertTrue(rv.status_code == 404)162    def test_update_process(self):163        setup_server(self.app, 'srv001', SMALL)164        setup_process(self.app, 'srv001', 'proc001', TETRIS)165        rv = self.app.put('/v1/servers/srv001/processes/proc001',166                          data='{"kind": "pacman.exe"}',167                          mimetype='application/json')168        self.assertTrue(rv.status_code == 200)169        self.assertTrue(b'modified Ok' in rv.data)170        rv = self.app.get('/v1/servers/srv001')171        self.assertTrue(rv.status_code == 200)172        self.assertFalse(b'tetris' in rv.data)173        self.assertTrue(b'pacman' in rv.data)174    def test_update_process_2ndp_process_not_found(self):175        setup_server(self.app, 'srv001', SMALL)176        setup_process(self.app, 'srv001', 'proc001', TETRIS)177        rv = self.app.put('/v1/servers/srv001/processes/proc002',178                          data='{"kind": "pacman.exe"}',179                          mimetype='application/json')180        self.assertTrue(rv.status_code == 404)181    def test_update_process_2ndp_server_not_found(self):182        setup_server(self.app, 'srv001', SMALL)183        setup_process(self.app, 'srv001', 'proc001', TETRIS)184        rv = self.app.put('/v1/servers/srv002/processes/proc001',185                          data='{"kind": "pacman.exe"}',186                          mimetype='application/json')187        self.assertTrue(rv.status_code == 404)188    def test_update_process_2ndp_bad_request(self):189        setup_server(self.app, 'srv001', SMALL)190        setup_process(self.app, 'srv001', 'proc001', TETRIS)191        rv = self.app.put('/v1/servers/srv001/processes/proc001')192        self.assertTrue(rv.status_code == 400)193    def test_read_processes(self):194        setup_server(self.app, 'srv001', SMALL)195        setup_process(self.app, 'srv001', 'proc001', TETRIS)196        setup_process(self.app, 'srv001', 'proc002', PONG)197        rv = self.app.get('/v1/servers/srv001/processes')198        self.assertTrue(rv.status_code == 200)199        self.assertTrue(b'tetris' in rv.data)200        self.assertTrue(b'pong' in rv.data)201    def test_read_processes_2ndp_not_found(self):202        setup_server(self.app, 'srv001', SMALL)203        setup_process(self.app, 'srv001', 'proc001', TETRIS)204        setup_process(self.app, 'srv001', 'proc002', PONG)205        rv = self.app.get('/v1/servers/srv002/processes')206        self.assertTrue(rv.status_code == 404)207    def test_delete_process(self):208        setup_server(self.app, 'srv001', SMALL)209        setup_process(self.app, 'srv001', 'proc001', TETRIS)210        setup_process(self.app, 'srv001', 'proc002', PONG)211        rv = self.app.delete('/v1/servers/srv001/processes/proc001')212        self.assertTrue(rv.status_code == 200)213        self.assertTrue(b'halted Ok' in rv.data)214        rv = self.app.get('/v1/servers/srv001/processes')215        self.assertTrue(rv.status_code == 200)216        self.assertFalse(b'tetris' in rv.data)217        self.assertTrue(b'pong' in rv.data)218    def test_delete_process_2ndp_process_not_found(self):219        setup_server(self.app, 'srv001', SMALL)220        setup_process(self.app, 'srv001', 'proc001', TETRIS)221        rv = self.app.delete('/v1/servers/srv001/processes/proc002')222        self.assertTrue(rv.status_code == 404)223    def test_delete_process_2ndp_server_not_found(self):224        setup_server(self.app, 'srv001', SMALL)225        setup_process(self.app, 'srv001', 'proc001', TETRIS)226        rv = self.app.delete('/v1/servers/srv002/processes/proc001')227        self.assertTrue(rv.status_code == 404)228class ProcessesFiltersTestCase(unittest.TestCase):229    def setUp(self):230        servers.app.testing = True231        self.app = servers.app.test_client()232    def tearDown(self):233        servers.reset()234    def test_filter_max_load(self):235        setup_server(self.app, 'srv001', SMALL)236        setup_process(self.app, 'srv001', 'proc001', TETRIS)237        setup_process(self.app, 'srv001', 'proc002', PONG)238        setup_server(self.app, 'srv007', LARGE)239        setup_process(self.app, 'srv007', 'proc001', PACMAN)240        rv = self.app.get('/v1/servers?filter=max_load')241        self.assertTrue(rv.status_code == 200)242        self.assertTrue(b'srv001' in rv.data)243    def test_filter_max_load_2nd_graceful_unknown_server(self):244        setup_server(self.app, 'srv001', SMALL)245        setup_process(self.app, 'srv001', 'proc001', TETRIS)246        setup_process(self.app, 'srv001', 'proc002', PONG)247        setup_server(self.app, 'srv007', LARGE)248        setup_process(self.app, 'srv007', 'proc001', PACMAN)249        rv = self.app.get('/v1/servers?filter=unknown_filter')250        self.assertTrue(rv.status_code == 200)251        self.assertTrue(b'srv001' in rv.data)252        self.assertTrue(b'srv007' in rv.data)253    def test_filter_max_load_2nd_graceful_unknown_filter(self):254        setup_server(self.app, 'srv001', SMALL)255        setup_process(self.app, 'srv001', 'proc001', TETRIS)256        setup_process(self.app, 'srv001', 'proc002', PONG)257        setup_server(self.app, 'srv007', LARGE)258        setup_process(self.app, 'srv007', 'proc001', PACMAN)259        rv = self.app.get('/v1/servers?unknown_query=unknown_filter')260        self.assertTrue(rv.status_code == 200)261        self.assertTrue(b'srv001' in rv.data)262        self.assertTrue(b'srv007' in rv.data)263if __name__ == '__main__':...test_utils.py
Source:test_utils.py  
...16    ("ssl_certs", {"key": "value"}),17    ("latency", {"key": "value"}),18]19@pytest.fixture20def setup_server():21    server = Server(22        name="test_server",23        ip_address="1.1.1.1",24        domain_name="one.one.one.one",25        description="test_description",26        developer="test_developer",27        sysadmin="test_sysadmin",28        date_added=datetime.now(timezone("Europe/Dublin")),29        date_last_checked=datetime.now(timezone("Europe/Dublin")),30    )31    server.save()32    server.serverprofile = ServerProfile(33        server=server,34        is_up=True,...test_server.py
Source:test_server.py  
1from server import Server, User, Connection2from wireprotocol import WireProtocol, CMD3def setup_server(server):4    server.users = [User('u1'), User('u2'), User('u3')]5    conn1 = Connection('uuid1', None)6    conn2 = Connection('uuid2', None)7    conn3 = Connection('uuid3', None)8    server.users[0].login(conn1)9    server.users[1].login(conn2)10    server.users[2].login(conn3)11    server.connections = [conn1, conn2, conn3]12def test_conn2user():13    server = Server(host=None)14    setup_server(server)15    16    conn = server.connections[1]17    user = server.conn2user(conn)18    assert user.username == 'u2'19def test_conn2user_empty():20    server = Server(host=None)21    conn = Connection('uuid1', None)22    user = server.conn2user(conn)23    assert user is None24def test_conn2user_notpresent():25    server = Server(host=None)26    setup_server(server)27    conn = Connection('asdfasdf', None)28    user = server.conn2user(conn)29    assert user is None30def test_create():31    server = Server(host=None)32    setup_server(server)33    conn = Connection('newconn', None)34    server._process_create(conn, 'newuser')35    assert len(server.users) == 436    assert server.users[-1].username == 'newuser'37def test_create_failure():38    server = Server(host=None)39    setup_server(server)40    conn = Connection('newconn', None)41    server._process_create(conn, 'u2')42    assert len(server.users) == 343    assert conn.send_buffer[-len('username already exists'):] == b'username already exists'44def test_list():45    server = Server(host=None)46    setup_server(server)47    conn = Connection('newconn', None)48    server._process_list(conn, None)49    assert conn.send_buffer[-12:] == b'u1|||u2|||u3'50def test_list_wildcard():51    server = Server(host=None)52    setup_server(server)53    conn = Connection('newconn', None)54    server._process_list(conn, '*1')55    assert conn.send_buffer[-2:] == b'u1'56def test_send_immediately():57    server = Server(host=None)58    setup_server(server)59    conn = server.connections[1]60    server._process_send(conn, [server.users[1].username, server.users[2].username, 'test message'])61    assert conn.send_buffer[-22:] == b'u2|||u3|||test message'62    assert server.users[2].connection.send_buffer[-22:] == b'u2|||u3|||test message'63def test_send_notlogged():64    server = Server(host=None)65    setup_server(server)66    conn = Connection('newconn', None)67    server._process_send(conn, [server.users[1].username, server.users[2].username, 'test message'])68    wp = WireProtocol()69    wp.parse_incoming_bytes(conn.send_buffer)70    assert wp.command == CMD.RESPONSE # indicates error71def test_send_imposter():72    server = Server(host=None)73    setup_server(server)74    conn = server.connections[0]75    server._process_send(conn, [server.users[1].username, server.users[2].username, 'test message'])76    wp = WireProtocol()77    wp.parse_incoming_bytes(conn.send_buffer)78    assert wp.command == CMD.RESPONSE # indicates error79def test_send_notexists():80    server = Server(host=None)81    setup_server(server)82    conn = server.connections[1]83    server._process_send(conn, [server.users[1].username, 'asdfasdfadf', 'test message'])84    wp = WireProtocol()85    wp.parse_incoming_bytes(conn.send_buffer)86    assert wp.command == CMD.RESPONSE # indicates error87def test_send_later():88    server = Server(host=None)89    setup_server(server)90    server.users[2].logout()91    conn = server.connections[1]92    server._process_send(conn, [server.users[1].username, server.users[2].username, 'test message'])93    assert conn.send_buffer[-22:] == b'u2|||u3|||test message' # should still send to the sender94    assert len(server.users[2].undelivered_messages) == 195    assert server.users[2].undelivered_messages[0] == [server.users[1].username, server.users[2].username, 'test message']96def test_deliver_order():97    server = Server(host=None)98    setup_server(server)99    server.users[2].logout()100    # first send from u2 to u3101    conn = server.connections[1]102    server._process_send(conn, [server.users[1].username, server.users[2].username, 'test message!'])103    server._process_send(conn, [server.users[1].username, server.users[2].username, 'another test message'])104    # then login as u3 and get all the messages105    conn = server.connections[2]106    server.users[2].login(conn)107    assert len(server.users[2].undelivered_messages) == 2108    server._process_deliver(conn, None)109    wp = WireProtocol()110    wp.parse_incoming_bytes(conn.send_buffer)111    assert wp.command == CMD.SEND112    assert wp.data_buffer == b'u2|||u3|||test message!'113    114    remaining_bytes = wp.tmp_buffer115    wp.reset_buffers()116    wp.parse_incoming_bytes(remaining_bytes)117    assert wp.command == CMD.SEND118    assert wp.data_buffer == b'u2|||u3|||another test message'119def test_login():120    server = Server(host=None)121    setup_server(server)122    server.users[2].logout()123    conn = Connection('newconn', None)124    server._process_login(conn, server.users[2].username)125    wp = WireProtocol()126    wp.parse_incoming_bytes(conn.send_buffer)127    assert wp.command == CMD.RESPONSE128    assert wp.data_len == 0129def test_login_noaccount():130    server = Server(host=None)131    setup_server(server)132    server.users[2].logout()133    conn = Connection('newconn', None)134    server._process_login(conn, server.users[2].username+'asdfasdf')135    wp = WireProtocol()136    wp.parse_incoming_bytes(conn.send_buffer)137    assert wp.command == CMD.RESPONSE138    assert wp.data_len == len('username does not exist')139    assert wp.data_buffer == b'username does not exist'140def test_login_alreadylogged():141    server = Server(host=None)142    setup_server(server)143    conn = Connection('newconn', None)144    server._process_login(conn, server.users[2].username)145    wp = WireProtocol()146    wp.parse_incoming_bytes(conn.send_buffer)147    assert wp.command == CMD.RESPONSE148    assert wp.data_len == len('user is already logged in')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
