How to use get_header method in fMBT

Best Python code snippet using fMBT_python

test_worker.py

Source:test_worker.py Github

copy

Full Screen

...7import tornado.testing8logging.disable(logging.WARNING)9class RegisterGraderEndpointsTest(BaseTest):10 def test_register(self):11 self.assertIsNotNone(self.register_worker(self.get_header()))12 def test_duplicate_id(self):13 worker_id = "duplicate"14 self.register_worker(self.get_header(), worker_id=worker_id, expected_code=200)15 self.register_worker(self.get_header(), worker_id=worker_id, expected_code=400)16 def test_reregister_id(self):17 worker_id = self.register_worker(self.get_header(), expected_code=200)18 time.sleep(HEARTBEAT_INTERVAL * 2 + 1)19 self.register_worker(self.get_header(), worker_id=worker_id, expected_code=200)20 def test_unauthorized(self):21 self.register_worker(None, 401)22 def test_wrong_token(self):23 self.register_worker(self.get_header("invalid"), 401)24class PollGradingJobEndpointsTest(BaseTest):25 def test_unauthorized(self):26 worker_id = self.register_worker(self.get_header())27 self.assertEqual(self.poll_job(worker_id, None), 401)28 def test_wrong_token(self):29 worker_id = self.register_worker(self.get_header())30 self.assertEqual(self.poll_job(worker_id, self.get_header("invalid")), 401)31 def test_invalid_worker_id(self):32 self.assertEqual(self.poll_job("1234", self.get_header()), 401)33 def test_empty_poll(self):34 worker_id = self.register_worker(self.get_header())35 self.assertEqual(self.poll_job(worker_id, self.get_header()), 498)36class UpdateGradingJobEndpointsTest(BaseTest):37 def test_unauthorized(self):38 worker_id = self.register_worker(self.get_header())39 self.post_job_result(worker_id, None, "1234", True, 401)40 def test_wrong_token(self):41 worker_id = self.register_worker(self.get_header())42 self.post_job_result(worker_id, self.get_header("invalid"), "1234", True, 401)43 def test_invalid_worker_id(self):44 self.post_job_result("1234", self.get_header(), "1234", True, 401)45class HeartBeatEndpointsTest(BaseTest):46 def test_unauthorized(self):47 worker_id = self.register_worker(self.get_header())48 
self.send_heartbeat(worker_id, None, 401)49 def test_wrong_token(self):50 worker_id = self.register_worker(self.get_header())51 self.send_heartbeat(worker_id, self.get_header("fake"), 401)52 def test_invalid_worker_id(self):53 self.send_heartbeat("1234", self.get_header(), 401)54 def test_valid_heartbeat(self):55 worker_id = self.register_worker(self.get_header())56 self.send_heartbeat(worker_id, self.get_header())57class WorkerWSEndpointTest(BaseTest):58 @tornado.testing.gen_test59 async def test_decode_error(self):60 async with self.worker_ws_conn(61 worker_id="test_worker", headers=self.get_header()62 ) as conn:63 try:64 await conn.send("i'm not json")65 except Exception as e:66 self.assertEqual(e.code, 1011)67 # submit job result before registering68 @tornado.testing.gen_test69 async def test_bad_job_result(self):70 async with self.worker_ws_conn(71 worker_id="test_worker", headers=self.get_header()72 ) as conn:73 try:74 await conn.send(75 json.dumps(76 {77 "type": "job_result",78 "args": {79 "grading_job_id": "someid",80 "success": True,81 "results": [{"res": "spoof"}],82 "logs": {"stdout": "stdout", "stderr": "stderr"},83 },84 }85 )86 )87 await conn.recv()88 except Exception as e:89 self.assertEqual(e.code, 1002)90 @tornado.testing.gen_test91 async def test_register(self):92 async with self.worker_ws_conn(93 worker_id="test_worker", headers=self.get_header()94 ) as conn:95 await conn.send(96 json.dumps({"type": "register", "args": {"hostname": "eniac"}})97 )98 ack = json.loads(await conn.recv())99 self.assertTrue(ack["success"])100 @tornado.testing.gen_test101 async def test_pong(self):102 async with self.worker_ws_conn(103 worker_id="test_worker", headers=self.get_header()104 ) as conn:105 await conn.send(106 json.dumps({"type": "register", "args": {"hostname": "eniac"}})107 )108 ack = json.loads(await conn.recv())109 self.assertTrue(ack["success"])110 await conn.pong()111 @tornado.testing.gen_test112 async def test_no_token(self):113 async with 
self.worker_ws_conn(worker_id="test_worker", headers=None) as conn:114 try:115 await conn.send(116 json.dumps({"type": "register", "args": {"hostname": "eniac"}})117 )118 ack = json.loads(await conn.recv())119 self.assertFalse(ack["success"])120 except websockets.exceptions.ConnectionClosed as e:121 self.assertEqual(e.code, 1008)122 @tornado.testing.gen_test123 async def test_wrong_token(self):124 async with self.worker_ws_conn(125 worker_id="test_worker", headers=self.get_header("invalid")126 ) as conn:127 try:128 await conn.send(129 json.dumps({"type": "register", "args": {"hostname": "eniac"}})130 )131 ack = json.loads(await conn.recv())132 self.assertFalse(ack["success"])133 except websockets.exceptions.ConnectionClosed as e:134 self.assertEqual(e.code, 1008)135 @tornado.testing.gen_test136 async def test_duplicate_token(self):137 async with self.worker_ws_conn(138 worker_id="test_worker", headers=self.get_header()139 ) as conn1:140 await conn1.send(141 json.dumps({"type": "register", "args": {"hostname": "eniac"}})142 )143 # worker 1 should successfully register144 ack = json.loads(await conn1.recv())145 self.assertTrue(ack["success"])146 async with self.worker_ws_conn(147 worker_id="test_worker", headers=self.get_header()148 ) as conn2:149 try:150 await conn2.send(151 json.dumps({"type": "register", "args": {"hostname": "eniac"}})152 )153 # worker 2 should fail154 ack = json.loads(await conn2.recv())155 self.assertFalse(ack["success"])156 except websockets.exceptions.ConnectionClosed as e:157 self.assertEqual(e.code, 1002)158 @tornado.testing.gen_test159 async def test_reregister(self):160 async with self.worker_ws_conn(161 worker_id="test_worker", headers=self.get_header()162 ) as conn1:163 await conn1.send(164 json.dumps({"type": "register", "args": {"hostname": "eniac"}})165 )166 # worker 1 should succeed167 ack = json.loads(await conn1.recv())168 self.assertTrue(ack["success"])169 async with self.worker_ws_conn(170 worker_id="test_worker", 
headers=self.get_header()171 ) as conn2:172 await conn2.send(173 json.dumps({"type": "register", "args": {"hostname": "eniac"}})174 )175 # worker 2 should also succeed176 ack = json.loads(await conn2.recv())177 self.assertTrue(ack["success"])178 @tornado.testing.gen_test179 async def test_wrong_job_id(self):180 async with self.worker_ws_conn(181 worker_id="test_worker", headers=self.get_header()182 ) as conn:183 await conn.send(184 json.dumps({"type": "register", "args": {"hostname": "eniac"}})185 )186 ack = json.loads(await conn.recv())187 self.assertTrue(ack["success"])188 try:189 await conn.send(190 json.dumps(191 {192 "type": "job_result",193 "args": {194 "grading_job_id": "no_such_id",195 "success": True,...

Full Screen

Full Screen

handlers.py

Source:handlers.py Github

copy

Full Screen

...34 container["name"] = c35 i = i + 136 resp.append(container)37 self.send_json(resp)38 def get_header(self, header):39 if header in self.request.headers:40 return self.request.headers[header]41 else:42 return None43class ContainerHandler(ObjectStorageBaseHandler):44 def get(self, account, container):45 limit = self.get_argument("limit", None)46 marker = self.get_argument("marker", None)47 end_marker = self.get_argument("end_marker", None)48 prefix = self.get_argument("prefix", None)49 format = self.get_argument("format", None)50 delimiter = self.get_argument("delimiter", None)51 path = self.get_argument("path", None)52 objects = self.p.queryObjects(account, container, limit,53 marker, end_marker, prefix, format, delimiter, path)54 resp = [55 {56 "hash":o.key,57 "last_modified":"",58 "bytes":"",59 "name":o.key,60 "content_type":""61 }62 for o in objects63 ]64 self.send_json(resp)65 def put(self, account, container):66 x_container_read = self.get_header("X-Container-Read")67 x_container_write = self.get_header("X-Container-Write")68 x_container_sync_to = self.get_header("X-Container-Sync-To")69 x_container_sync_key = self.get_header("X-Container-Sync-Key")70 x_versions_location = self.get_header("X-Versions-Location")71 x_container_meta_name = self.get_header("X-Container-Meta-name")72 content_type = self.get_header("Content-Type")73 x_detect_content_type = self.get_header("X-Detect-Content-Type")74 x_container_meta_tempurl_key = self.get_header("X-Container-Meta-Tempurl-Key")75 x_container_meta_tempurl_key_2 = self.get_header("X-Container-Meta-Tempurl-Key-2")76 x_trans_id_extra = self.get_header("X-Trans-Id-Extra")77 self.p.createContainer(account, container,78 x_container_read,79 x_container_write,80 x_container_sync_to,81 x_container_sync_key,82 x_versions_location,83 x_container_meta_name,84 content_type,85 x_detect_content_type,86 x_container_meta_tempurl_key,87 x_container_meta_tempurl_key_2,88 x_trans_id_extra)89 def delete(self, account, container):90 
x_container_meta_tempurl_key = self.get_header("X-Container-Meta-Tempurl-Key")91 x_container_meta_tempurl_key_2 = self.get_header("X-Container-Meta-Tempurl-Key-2")92 x_trans_id_extra = self.get_header("X-Trans-Id-Extra")93 if self.p.deleteContainer(account, container,94 x_container_meta_tempurl_key,95 x_container_meta_tempurl_key_2,96 x_trans_id_extra):97 self.set_status(200)98 else:99 self.set_status(400)100class ObjectHandler(ObjectStorageBaseHandler):101 """102 def prepare(self):103 #if http method is COPY, call self.copy()104 pass105 """106 def put(self, account, container, object_):107 multipart_manifest = self.get_argument("multipart-manifest", None)108 temp_url_sig = self.get_argument("temp_url_sig", None)109 temp_url_expires = self.get_argument("temp_url_expires", None)110 filename = self.get_argument("filename", None)111 x_object_manifest = self.get_header("X-Object-Manifest")112 content_length = self.get_header("Content-Length")113 transfer_encoding = self.get_header("Transfer-Encoding")114 content_type = self.get_header("Content-Type")115 x_detect_content_type = self.get_header("X-Detect-Content-Type")116 x_copy_from = self.get_header("X-Copy-From")117 etag = self.get_header("ETag")118 content_disposition = self.get_header("Content-Disposition")119 content_encoding = self.get_header("Content-Encoding")120 x_delete_at = self.get_header("X-Delete-At")121 x_delete_after = self.get_header("X-Delete-After")122 x_object_meta_name = self.get_header("X-Object-Meta-name")123 if_none_match = self.get_header("If-None-Match")124 x_trans_id_extra = self.get_header("X-Trans-Id-Extra")125 self.p.createObject(account, container, object_,126 multipart_manifest,127 temp_url_sig,128 temp_url_expires,129 filename,130 x_object_manifest,131 content_length,132 transfer_encoding,133 content_type,134 x_detect_content_type,135 x_copy_from,136 etag,137 content_disposition,138 content_encoding,139 x_delete_at,140 x_delete_after,141 x_object_meta_name,142 if_none_match,143 
x_trans_id_extra)144 def copy(self, account, container, object_):145 destination = self.get_header("Destination")146 content_type = self.get_header("Destination")147 content_encoding = self.get_header("Destination")148 content_disposition = self.get_header("Destination")149 x_object_meta_name = self.get_header("Destination")150 x_fresh_metadata = self.get_header("Destination")151 x_trans_id_extra = self.get_header("X-Trans-Id-Extra")152 if self.copyObject(account, container, object_,153 destination,154 content_type,155 content_encoding,156 content_disposition,157 x_object_meta_name,158 x_fresh_metadata,159 x_trans_id_extra):160 self.set_status(201)161 else:162 self.set_status(400)163 return164 def delete(self, account, container, object_):165 multipart_manifest = self.get_argument("multipart-manifest", None)166 x_trans_id_extra = self.get_header("X-Trans-Id-Extra")...

Full Screen

Full Screen

02.py

Source:02.py Github

copy

Full Screen

# Reference documentation: https://www.cnblogs.com/crazyrunning/p/7095014.html
# https://www.cnblogs.com/miyauchi-renge/p/10922092.html
#
# Demonstrates cooperative multiple inheritance: every get_header() calls
# super().get_header(), so a single call on Final walks the whole MRO.


class Minix1:
    """Mixin that appends 'data1' to the end of the header list."""

    def get_header(self):
        """Return the next class's header list with 'data1' appended."""
        print('run Minix1.get_header')
        ctx = super().get_header()
        ctx.append('data1')
        return ctx


class Minix2:
    """Mixin that inserts 'data2' at the front of the header list."""

    def get_header(self):
        """Return the next class's header list with 'data2' prepended."""
        print('run Minix2.get_header')
        ctx = super().get_header()
        ctx.insert(0, 'data2')
        return ctx


class Header:
    """Base class that supplies the initial header list."""

    # Class-level default shared by every instance. get_header() hands out a
    # copy so the in-place edits made by the mixins never leak back into it.
    header = []

    def get_header(self):
        """Return a fresh list holding the configured header entries.

        A new list is returned on every call (empty when ``header`` is
        empty or None), because callers further up the MRO mutate the
        result in place.
        """
        print('run Headers.get_header')
        return list(self.header) if self.header else []


class Final(Minix1, Minix2, Header):
    """Concrete class combining both mixins on top of Header."""

    def get_header(self):
        """Return the header list after both mixins have decorated it."""
        return super().get_header()


print(Final.mro())
# [Final, Minix1, Minix2, Header, object]
header = Final().get_header()
# run Minix1.get_header
# run Minix2.get_header
# run Headers.get_header
print(header)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful