Best Python code snippet using sure_python
test_worker.py
Source:test_worker.py  
# ... (the first lines of test_worker.py are elided in this excerpt)
import tornado.testing

logging.disable(logging.WARNING)


class RegisterGraderEndpointsTest(BaseTest):
    """Tests for registering a worker through the test helpers."""

    def test_register(self):
        """Registering with the default header yields a non-None result."""
        registered = self.register_worker(self.get_header())
        self.assertIsNotNone(registered)

    def test_duplicate_id(self):
        """The same worker id is expected to give 200 first and 400 second."""
        for expected in (200, 400):
            self.register_worker(
                self.get_header(), worker_id="duplicate", expected_code=expected
            )

    def test_reregister_id(self):
        """A returned worker id is registered again after a pause."""
        first_id = self.register_worker(self.get_header(), expected_code=200)
        # Sleep for two heartbeat intervals plus one before reusing the id.
        time.sleep(HEARTBEAT_INTERVAL * 2 + 1)
        self.register_worker(self.get_header(), worker_id=first_id, expected_code=200)

    def test_unauthorized(self):
        """Register with no header; 401 is passed as the second argument
        (presumably the expected status code)."""
        self.register_worker(None, 401)

    def test_wrong_token(self):
        """Register with the "invalid" header; 401 is passed as the second
        argument (presumably the expected status code)."""
        bad_header = self.get_header("invalid")
        self.register_worker(bad_header, 401)


class PollGradingJobEndpointsTest(BaseTest):
    """Tests for polling a grading job as a worker."""

    def test_unauthorized(self):
        """Polling without a header returns 401."""
        wid = self.register_worker(self.get_header())
        status = self.poll_job(wid, None)
        self.assertEqual(status, 401)

    def test_wrong_token(self):
        """Polling with the "invalid" header returns 401."""
        wid = self.register_worker(self.get_header())
        status = self.poll_job(wid, self.get_header("invalid"))
        self.assertEqual(status, 401)

    def test_invalid_worker_id(self):
        """Polling with an id that was never registered returns 401."""
        status = self.poll_job("1234", self.get_header())
        self.assertEqual(status, 401)

    def test_empty_poll(self):
        """Polling as a freshly registered worker returns 498."""
        wid = self.register_worker(self.get_header())
        status = self.poll_job(wid, self.get_header())
        self.assertEqual(status, 498)


class UpdateGradingJobEndpointsTest(BaseTest):
    """Tests for posting a grading job result as a worker."""

    def test_unauthorized(self):
        """Post a result without a header, passing 401 as the last argument."""
        wid = self.register_worker(self.get_header())
        self.post_job_result(wid, None, "1234", True, 401)

    def test_wrong_token(self):
        """Post a result with the "invalid" header, passing 401 as the last argument."""
        wid = self.register_worker(self.get_header())
        bad_header = self.get_header("invalid")
        self.post_job_result(wid, bad_header, "1234", True, 401)

    def test_invalid_worker_id(self):
        """Post a result with an unregistered id, passing 401 as the last argument."""
        self.post_job_result("1234", self.get_header(), "1234", True, 401)


class HeartBeatEndpointsTest(BaseTest):
    """Tests for sending worker heartbeats."""

    def test_unauthorized(self):
        """Send a heartbeat without a header, passing 401 as the last argument."""
        wid = self.register_worker(self.get_header())
        self.send_heartbeat(wid, None, 401)

    def test_wrong_token(self):
        """Send a heartbeat with the "fake" header, passing 401 as the last argument."""
        wid = self.register_worker(self.get_header())
        bad_header = self.get_header("fake")
        self.send_heartbeat(wid, bad_header, 401)

    def test_invalid_worker_id(self):
        """Send a heartbeat with an unregistered id, passing 401 as the last argument."""
        self.send_heartbeat("1234", self.get_header(), 401)

    def test_valid_heartbeat(self):
        """Send a heartbeat for a registered worker with the default header."""
        wid = self.register_worker(self.get_header())
        self.send_heartbeat(wid, self.get_header())


class WorkerWSEndpointTest(BaseTest):
    """Tests for the worker websocket connection (class continues past this span)."""

    @tornado.testing.gen_test
    async def test_decode_error(self):
        """Send a non-JSON frame; if sending raises, the error code must be 1011."""
        auth = self.get_header()
        async with self.worker_ws_conn(worker_id="test_worker", headers=auth) as conn:
            try:
                await conn.send("i'm not json")
            except Exception as e:
                self.assertEqual(e.code, 1011)

    # submit job result before registering
    @tornado.testing.gen_test
    async def test_bad_job_result(self):
        """Send a job result without registering; a raised error must carry code 1002."""
        auth = self.get_header()
        async with self.worker_ws_conn(worker_id="test_worker", headers=auth) as conn:
            try:
                job_args = {
                    "grading_job_id": "someid",
                    "success": True,
                    "results": [{"res": "spoof"}],
                    "logs": {"stdout": "stdout", "stderr": "stderr"},
                }
                payload = json.dumps({"type": "job_result", "args": job_args})
                await conn.send(payload)
                await conn.recv()
            except Exception as e:
                self.assertEqual(e.code, 1002)

    @tornado.testing.gen_test
    async def test_register(self):
        """A register message is acknowledged with a truthy "success"."""
        auth = self.get_header()
        async with self.worker_ws_conn(worker_id="test_worker", headers=auth) as conn:
            register_msg = json.dumps({"type": "register", "args": {"hostname": "eniac"}})
            await conn.send(register_msg)
            reply = json.loads(await conn.recv())
            self.assertTrue(reply["success"])

    @tornado.testing.gen_test
    async def test_pong(self):
        """After a successful registration, a pong is sent on the connection."""
        auth = self.get_header()
        async with self.worker_ws_conn(worker_id="test_worker", headers=auth) as conn:
            register_msg = json.dumps({"type": "register", "args": {"hostname": "eniac"}})
            await conn.send(register_msg)
            reply = json.loads(await conn.recv())
            self.assertTrue(reply["success"])
            await conn.pong()

    @tornado.testing.gen_test
    async def test_no_token(self):
        """Without headers, registration is refused or the socket closes with 1008."""
        async with self.worker_ws_conn(worker_id="test_worker", headers=None) as conn:
            try:
                register_msg = json.dumps(
                    {"type": "register", "args": {"hostname": "eniac"}}
                )
                await conn.send(register_msg)
                reply = json.loads(await conn.recv())
                self.assertFalse(reply["success"])
            except websockets.exceptions.ConnectionClosed as e:
                self.assertEqual(e.code, 1008)

    @tornado.testing.gen_test
    async def test_wrong_token(self):
        """With the "invalid" header, registration is refused or the socket closes with 1008."""
        bad_auth = self.get_header("invalid")
        async with self.worker_ws_conn(worker_id="test_worker", headers=bad_auth) as conn:
            try:
                register_msg = json.dumps(
                    {"type": "register", "args": {"hostname": "eniac"}}
                )
                await conn.send(register_msg)
                reply = json.loads(await conn.recv())
                self.assertFalse(reply["success"])
            except websockets.exceptions.ConnectionClosed as e:
                self.assertEqual(e.code, 1008)
@tornado.testing.gen_test136    async def test_duplicate_token(self):137        async with self.worker_ws_conn(138            worker_id="test_worker", headers=self.get_header()139        ) as conn1:140            await conn1.send(141                json.dumps({"type": "register", "args": {"hostname": "eniac"}})142            )143            # worker 1 should successfully register144            ack = json.loads(await conn1.recv())145            self.assertTrue(ack["success"])146            async with self.worker_ws_conn(147                worker_id="test_worker", headers=self.get_header()148            ) as conn2:149                try:150                    await conn2.send(151                        json.dumps({"type": "register", "args": {"hostname": "eniac"}})152                    )153                    # worker 2 should fail154                    ack = json.loads(await conn2.recv())155                    self.assertFalse(ack["success"])156                except websockets.exceptions.ConnectionClosed as e:157                    self.assertEqual(e.code, 1002)158    @tornado.testing.gen_test159    async def test_reregister(self):160        async with self.worker_ws_conn(161            worker_id="test_worker", headers=self.get_header()162        ) as conn1:163            await conn1.send(164                json.dumps({"type": "register", "args": {"hostname": "eniac"}})165            )166            # worker 1 should succeed167            ack = json.loads(await conn1.recv())168            self.assertTrue(ack["success"])169        async with self.worker_ws_conn(170            worker_id="test_worker", headers=self.get_header()171        ) as conn2:172            await conn2.send(173                json.dumps({"type": "register", "args": {"hostname": "eniac"}})174            )175            # worker 2 should also succeed176            ack = json.loads(await conn2.recv())177            self.assertTrue(ack["success"])178    @tornado.testing.gen_test179    async def 
test_wrong_job_id(self):180        async with self.worker_ws_conn(181            worker_id="test_worker", headers=self.get_header()182        ) as conn:183            await conn.send(184                json.dumps({"type": "register", "args": {"hostname": "eniac"}})185            )186            ack = json.loads(await conn.recv())187            self.assertTrue(ack["success"])188            try:189                await conn.send(190                    json.dumps(191                        {192                            "type": "job_result",193                            "args": {194                                "grading_job_id": "no_such_id",195                                "success": True,...handlers.py
Source:handlers.py  
...34            container["name"] = c35            i = i + 136            resp.append(container)37        self.send_json(resp)38    def get_header(self, header):39        if header in self.request.headers:40            return self.request.headers[header]41        else:42            return None43class ContainerHandler(ObjectStorageBaseHandler):44    def get(self, account, container):45        limit = self.get_argument("limit", None)46        marker = self.get_argument("marker", None)47        end_marker = self.get_argument("end_marker", None)48        prefix = self.get_argument("prefix", None)49        format = self.get_argument("format", None)50        delimiter = self.get_argument("delimiter", None)51        path = self.get_argument("path", None)52        objects = self.p.queryObjects(account, container, limit,53                marker, end_marker, prefix, format, delimiter, path)54        resp = [55            {56                "hash":o.key,57                "last_modified":"",58                "bytes":"",59                "name":o.key,60                "content_type":""61            }62            for o in objects63        ]64        self.send_json(resp)65    def put(self, account, container):66        x_container_read = self.get_header("X-Container-Read")67        x_container_write = self.get_header("X-Container-Write")68        x_container_sync_to = self.get_header("X-Container-Sync-To")69        x_container_sync_key = self.get_header("X-Container-Sync-Key")70        x_versions_location = self.get_header("X-Versions-Location")71        x_container_meta_name = self.get_header("X-Container-Meta-name")72        content_type = self.get_header("Content-Type")73        x_detect_content_type = self.get_header("X-Detect-Content-Type")74        x_container_meta_tempurl_key = self.get_header("X-Container-Meta-Tempurl-Key")75        x_container_meta_tempurl_key_2 = self.get_header("X-Container-Meta-Tempurl-Key-2")76        x_trans_id_extra = 
self.get_header("X-Trans-Id-Extra")77        self.p.createContainer(account, container,78                x_container_read,79                x_container_write,80                x_container_sync_to,81                x_container_sync_key,82                x_versions_location,83                x_container_meta_name,84                content_type,85                x_detect_content_type,86                x_container_meta_tempurl_key,87                x_container_meta_tempurl_key_2,88                x_trans_id_extra)89    def delete(self, account, container):90        x_container_meta_tempurl_key = self.get_header("X-Container-Meta-Tempurl-Key")91        x_container_meta_tempurl_key_2 = self.get_header("X-Container-Meta-Tempurl-Key-2")92        x_trans_id_extra = self.get_header("X-Trans-Id-Extra")93        if self.p.deleteContainer(account, container,94                x_container_meta_tempurl_key,95                x_container_meta_tempurl_key_2,96                x_trans_id_extra):97            self.set_status(200)98        else:99            self.set_status(400)100class ObjectHandler(ObjectStorageBaseHandler):101    """102    def prepare(self):103        #if http method is COPY, call self.copy()104        pass105    """106    def put(self, account, container, object_):107        multipart_manifest = self.get_argument("multipart-manifest", None)108        temp_url_sig = self.get_argument("temp_url_sig", None)109        temp_url_expires = self.get_argument("temp_url_expires", None)110        filename = self.get_argument("filename", None)111        x_object_manifest = self.get_header("X-Object-Manifest")112        content_length = self.get_header("Content-Length")113        transfer_encoding = self.get_header("Transfer-Encoding")114        content_type = self.get_header("Content-Type")115        x_detect_content_type = self.get_header("X-Detect-Content-Type")116        x_copy_from = self.get_header("X-Copy-From")117        etag = self.get_header("ETag")118        
# --- tail of ObjectHandler.put (the method begins on the preceding excerpt line) ---
content_disposition = self.get_header("Content-Disposition")
content_encoding = self.get_header("Content-Encoding")
x_delete_at = self.get_header("X-Delete-At")
x_delete_after = self.get_header("X-Delete-After")
x_object_meta_name = self.get_header("X-Object-Meta-name")
if_none_match = self.get_header("If-None-Match")
x_trans_id_extra = self.get_header("X-Trans-Id-Extra")
self.p.createObject(
    account, container, object_,
    multipart_manifest,
    temp_url_sig,
    temp_url_expires,
    filename,
    x_object_manifest,
    content_length,
    transfer_encoding,
    content_type,
    x_detect_content_type,
    x_copy_from,
    etag,
    content_disposition,
    content_encoding,
    x_delete_at,
    x_delete_after,
    x_object_meta_name,
    if_none_match,
    x_trans_id_extra,
)


# --- ObjectHandler.copy ---
def copy(self, account, container, object_):
    """Copy an object, forwarding the request's copy-related headers.

    Each value is read with ``self.get_header`` (None when the header is
    absent) and passed positionally to ``copyObject``. The response status
    is set to 201 when ``copyObject`` returns a truthy value, else 400.

    Args:
        account: Account segment of the request path.
        container: Container segment of the request path.
        object_: Object segment of the request path.
    """
    # Each value comes from its own header. Previously every one of these
    # was read from "Destination", so the destination path was forwarded as
    # the content type, encoding, disposition and metadata as well.
    destination = self.get_header("Destination")
    content_type = self.get_header("Content-Type")
    content_encoding = self.get_header("Content-Encoding")
    content_disposition = self.get_header("Content-Disposition")
    x_object_meta_name = self.get_header("X-Object-Meta-name")
    x_fresh_metadata = self.get_header("X-Fresh-Metadata")
    x_trans_id_extra = self.get_header("X-Trans-Id-Extra")
    # NOTE(review): the other handlers call the backend through ``self.p``
    # (e.g. ``self.p.createObject``); confirm ``copyObject`` really lives on
    # the handler rather than on ``self.p``.
    copied = self.copyObject(
        account, container, object_,
        destination,
        content_type,
        content_encoding,
        content_disposition,
        x_object_meta_name,
        x_fresh_metadata,
        x_trans_id_extra,
    )
    self.set_status(201 if copied else 400)
   def delete(self, account, container, object_):165        multipart_manifest = self.get_argument("multipart-manifest", None)166        x_trans_id_extra = self.get_header("X-Trans-Id-Extra")...02.py
Source:02.py  
# Reference: https://www.cnblogs.com/crazyrunning/p/7095014.html
# https://www.cnblogs.com/miyauchi-renge/p/10922092.html
class Minix1:
    """Mixin that appends 'data1' to the end of the header list."""

    def get_header(self):
        print('run Minix1.get_header')
        ctx = super().get_header()
        ctx.append('data1')
        return ctx


class Minix2:
    """Mixin that inserts 'data2' at the front of the header list."""

    def get_header(self):
        print('run Minix2.get_header')
        ctx = super().get_header()
        ctx.insert(0, 'data2')
        return ctx


class Header:
    """Base provider of the header list that the mixins extend."""

    # Class-level default shared by every instance; never handed out directly.
    header = []

    def get_header(self):
        """Return a fresh list holding the configured header entries.

        A copy is returned because the mixins mutate the result in place;
        returning ``self.header`` itself would let every call permanently
        grow the shared class-level list.
        """
        print('run Headers.get_header')
        return list(self.header) if self.header else []


class Final(Minix1, Minix2, Header):
    """Combines both mixins with Header; the MRO decides the call order."""

    def get_header(self):
        return super().get_header()


print(Final.mro())
#[Final, Minix1, Minix2, Header, object]
header = Final().get_header()
#run Minix1.get_header
#run Minix2.get_header
#run Headers.get_header
print(header)
# ... (the snippet is cut off here on the source page)

# Page footer text that followed the snippet on the same line:
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
