How to use test_wait_for_request method in Playwright Python

Best Python code snippet using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

test_slimta_relay_http.py

Source: test_slimta_relay_http.py on GitHub

copy
1import unittest2 as unittest
2from mox3.mox import MoxTestBase, IsA
3from gevent.event import AsyncResult
4from gevent import Timeout
5
6from slimta.envelope import Envelope
7from slimta.util.deque import BlockingDeque
8from slimta.util.pycompat import urlparse
9from slimta.smtp.reply import Reply
10from slimta.relay import PermanentRelayError, TransientRelayError
11from slimta.relay.http import HttpRelay, HttpRelayClient
12
13
class TestHttpRelay(unittest.TestCase, MoxTestBase):
    """Tests for the HttpRelay class."""

    def test_add_client(self):
        # add_client() on a relay built from a URL must hand back an
        # HttpRelayClient instance.
        relay = HttpRelay('http://testurl')
        self.assertIsInstance(relay.add_client(), HttpRelayClient)
20
21
class TestHttpRelayClient(unittest.TestCase, MoxTestBase):
    """Tests for HttpRelayClient using a mocked queue, result and connection.

    Each test records the calls it expects on the mocks, switches them to
    replay mode with ``ReplayAll()`` and then invokes the client method
    under test.
    """

    def setUp(self):
        super(TestHttpRelayClient, self).setUp()
        self.queue = self.mox.CreateMock(BlockingDeque)

        # Stand-in relay object handed to HttpRelayClient in place of a
        # real HttpRelay.
        class FakeRelay(object):
            queue = self.queue
            url = urlparse.urlsplit('http://testurl:8025/path/info')
            tls = None
            http_verb = 'POST'
            sender_header = 'X-Envelope-Sender'
            recipient_header = 'X-Envelope-Recipient'
            ehlo_header = 'X-Ehlo'
            ehlo_as = 'test'
            timeout = 10.0
            idle_timeout = 10.0

        self.client = HttpRelayClient(FakeRelay())
        self.result = self.mox.CreateMock(AsyncResult)
        # These addresses must match the base64-encoded X-Envelope-Sender and
        # X-Envelope-Recipient values expected in test_handle_request.
        self.env = Envelope('sender@example.com',
                            ['rcpt1@example.com', 'rcpt2@example.com'])
        self.env.parse(b'Header: value\r\n\r\ntest message\r\n')

    def test_wait_for_request(self):
        # A (5, 10) pair popped from the queue is forwarded to
        # _handle_request(5, 10).
        self.mox.StubOutWithMock(self.client, '_handle_request')
        self.queue.popleft().AndReturn((5, 10))
        self.client._handle_request(5, 10)
        self.mox.ReplayAll()
        self.client._wait_for_request()

    def test_wait_for_request_timeout(self):
        # When the queue yields (None, None) the connection is closed and no
        # _handle_request call is expected (none is recorded on the stub).
        self.mox.StubOutWithMock(self.client, '_handle_request')
        self.client.conn = self.mox.CreateMockAnything()
        self.queue.popleft().AndReturn((None, None))
        self.client.conn.close()
        self.mox.ReplayAll()
        self.client._wait_for_request()

    def test_handle_request(self):
        # Expected sequence on the connection: request line, headers (with
        # base64-encoded sender/recipients), header part of the message via
        # endheaders(), message body via send(), then the response is passed
        # to _process_response together with the result object (21 here).
        self.mox.StubOutWithMock(self.client, '_process_response')
        conn = self.client.conn = self.mox.CreateMockAnything()
        self.client.ehlo_as = 'test'
        conn.putrequest('POST', '/path/info')
        # 31 == len(b'Header: value\r\n\r\n') + len(b'test message\r\n')
        conn.putheader(b'Content-Length', b'31')
        conn.putheader(b'Content-Type', b'message/rfc822')
        conn.putheader(b'X-Ehlo', b'test')
        conn.putheader(b'X-Envelope-Sender', b'c2VuZGVyQGV4YW1wbGUuY29t')
        conn.putheader(b'X-Envelope-Recipient', b'cmNwdDFAZXhhbXBsZS5jb20=')
        conn.putheader(b'X-Envelope-Recipient', b'cmNwdDJAZXhhbXBsZS5jb20=')
        conn.endheaders(b'Header: value\r\n\r\n')
        conn.send(b'test message\r\n')
        conn.getresponse().AndReturn(13)
        self.client._process_response(13, 21)
        self.mox.ReplayAll()
        self.client._handle_request(21, self.env)

    def test_parse_smtp_reply_header(self):
        # Three successive X-Smtp-Reply values: a plain reply, a reply with a
        # command attribute, and an unparseable value that must yield None.
        http_res = self.mox.CreateMockAnything()
        http_res.getheader('X-Smtp-Reply', '').AndReturn('250; message="2.0.0 Ok"')
        http_res.getheader('X-Smtp-Reply', '').AndReturn('550; message="5.0.0 Nope" command="smtpcmd"')
        http_res.getheader('X-Smtp-Reply', '').AndReturn('asdf')
        self.mox.ReplayAll()
        reply1 = self.client._parse_smtp_reply_header(http_res)
        self.assertEqual('250', reply1.code)
        self.assertEqual('2.0.0 Ok', reply1.message)
        self.assertEqual(None, reply1.command)
        reply2 = self.client._parse_smtp_reply_header(http_res)
        self.assertEqual('550', reply2.code)
        self.assertEqual('5.0.0 Nope', reply2.message)
        self.assertEqual('smtpcmd', reply2.command)
        reply3 = self.client._parse_smtp_reply_header(http_res)
        self.assertEqual(None, reply3)

    def test_process_response_200(self):
        # A 200 response carrying a 250 X-Smtp-Reply sets that reply on the
        # result.
        http_res = self.mox.CreateMockAnything()
        http_res.status = '200'
        http_res.reason = 'OK'
        http_res.getheader('X-Smtp-Reply', '').AndReturn('250; message="2.0.0 Ok"')
        http_res.getheaders()
        self.result.set(Reply('250', '2.0.0 Ok'))
        self.mox.ReplayAll()
        self.client._process_response(http_res, self.result)

    def test_process_response_400_with_smtp_reply(self):
        # A 400 response with a 550 X-Smtp-Reply sets a PermanentRelayError
        # on the result.
        http_res = self.mox.CreateMockAnything()
        http_res.status = '400'
        http_res.reason = 'Bad Request'
        http_res.getheader('X-Smtp-Reply', '').AndReturn('550; message="5.0.0 Nope"')
        http_res.getheaders()
        self.result.set_exception(IsA(PermanentRelayError))
        self.mox.ReplayAll()
        self.client._process_response(http_res, self.result)

    def test_process_response_400(self):
        # A 400 response with an empty X-Smtp-Reply still sets a
        # PermanentRelayError on the result.
        http_res = self.mox.CreateMockAnything()
        http_res.status = '400'
        http_res.reason = 'Bad Request'
        http_res.getheader('X-Smtp-Reply', '').AndReturn('')
        http_res.getheaders()
        self.result.set_exception(IsA(PermanentRelayError))
        self.mox.ReplayAll()
        self.client._process_response(http_res, self.result)

    def test_process_response_500(self):
        # A 500 response with an empty X-Smtp-Reply sets a
        # TransientRelayError on the result.
        http_res = self.mox.CreateMockAnything()
        http_res.status = '500'
        http_res.reason = 'Internal Server Error'
        http_res.getheader('X-Smtp-Reply', '').AndReturn('')
        http_res.getheaders()
        self.result.set_exception(IsA(TransientRelayError))
        self.mox.ReplayAll()
        self.client._process_response(http_res, self.result)

    def test_run(self):
        # _run() keeps calling _wait_for_request(); the first call returns
        # normally, the second raises Timeout, after which the connection is
        # expected to be closed.
        self.mox.StubOutWithMock(self.client, '_wait_for_request')
        self.client.conn = self.mox.CreateMockAnything()
        self.client._wait_for_request()
        self.client._wait_for_request().AndRaise(Timeout)
        self.client.conn.close()
        self.mox.ReplayAll()
        self.client._run()
142
143
144# vim:et:fdm=marker:sts=4:sw=4:ts=4
145
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTest

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)