How to use write_line method in pytest-benchmark

Best Python code snippet using pytest-benchmark

test_smtpd.py

Source: test_smtpd.py on GitHub

copy

Full Screen

...36 server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),37 decode_data=True)38 conn, addr = server.accept()39 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)40 def write_line(line):41 channel.socket.queue_recv(line)42 channel.handle_read()43 write_line(b'HELO example')44 write_line(b'MAIL From:eggs@example')45 write_line(b'RCPT To:spam@example')46 write_line(b'DATA')47 self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')48 def test_decode_data_and_enable_SMTPUTF8_raises(self):49 self.assertRaises(50 ValueError,51 smtpd.SMTPServer,52 (socket_helper.HOST, 0),53 ('b', 0),54 enable_SMTPUTF8=True,55 decode_data=True)56 def tearDown(self):57 asyncore.close_all()58 asyncore.socket = smtpd.socket = socket59class DebuggingServerTest(unittest.TestCase):60 def setUp(self):61 smtpd.socket = asyncore.socket = mock_socket62 def send_data(self, channel, data, enable_SMTPUTF8=False):63 def write_line(line):64 channel.socket.queue_recv(line)65 channel.handle_read()66 write_line(b'EHLO example')67 if enable_SMTPUTF8:68 write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')69 else:70 write_line(b'MAIL From:eggs@example')71 write_line(b'RCPT To:spam@example')72 write_line(b'DATA')73 write_line(data)74 write_line(b'.')75 def test_process_message_with_decode_data_true(self):76 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),77 decode_data=True)78 conn, addr = server.accept()79 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)80 with support.captured_stdout() as s:81 self.send_data(channel, b'From: test\n\nhello\n')82 stdout = s.getvalue()83 self.assertEqual(stdout, textwrap.dedent("""\84 ---------- MESSAGE FOLLOWS ----------85 From: test86 X-Peer: peer-address87 hello88 ------------ END MESSAGE ------------89 """))90 def test_process_message_with_decode_data_false(self):91 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0))92 conn, addr = server.accept()93 channel = smtpd.SMTPChannel(server, 
conn, addr)94 with support.captured_stdout() as s:95 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')96 stdout = s.getvalue()97 self.assertEqual(stdout, textwrap.dedent("""\98 ---------- MESSAGE FOLLOWS ----------99 b'From: test'100 b'X-Peer: peer-address'101 b''102 b'h\\xc3\\xa9llo\\xff'103 ------------ END MESSAGE ------------104 """))105 def test_process_message_with_enable_SMTPUTF8_true(self):106 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),107 enable_SMTPUTF8=True)108 conn, addr = server.accept()109 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)110 with support.captured_stdout() as s:111 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')112 stdout = s.getvalue()113 self.assertEqual(stdout, textwrap.dedent("""\114 ---------- MESSAGE FOLLOWS ----------115 b'From: test'116 b'X-Peer: peer-address'117 b''118 b'h\\xc3\\xa9llo\\xff'119 ------------ END MESSAGE ------------120 """))121 def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):122 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),123 enable_SMTPUTF8=True)124 conn, addr = server.accept()125 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)126 with support.captured_stdout() as s:127 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',128 enable_SMTPUTF8=True)129 stdout = s.getvalue()130 self.assertEqual(stdout, textwrap.dedent("""\131 ---------- MESSAGE FOLLOWS ----------132 mail options: ['BODY=8BITMIME', 'SMTPUTF8']133 b'From: test'134 b'X-Peer: peer-address'135 b''136 b'h\\xc3\\xa9llo\\xff'137 ------------ END MESSAGE ------------138 """))139 def tearDown(self):140 asyncore.close_all()141 asyncore.socket = smtpd.socket = socket142class TestFamilyDetection(unittest.TestCase):143 def setUp(self):144 smtpd.socket = asyncore.socket = mock_socket145 def tearDown(self):146 asyncore.close_all()147 asyncore.socket = smtpd.socket = socket148 @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 
not enabled")149 def test_socket_uses_IPv6(self):150 server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0))151 self.assertEqual(server.socket.family, socket.AF_INET6)152 def test_socket_uses_IPv4(self):153 server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0))154 self.assertEqual(server.socket.family, socket.AF_INET)155class TestRcptOptionParsing(unittest.TestCase):156 error_response = (b'555 RCPT TO parameters not recognized or not '157 b'implemented\r\n')158 def setUp(self):159 smtpd.socket = asyncore.socket = mock_socket160 self.old_debugstream = smtpd.DEBUGSTREAM161 self.debug = smtpd.DEBUGSTREAM = io.StringIO()162 def tearDown(self):163 asyncore.close_all()164 asyncore.socket = smtpd.socket = socket165 smtpd.DEBUGSTREAM = self.old_debugstream166 def write_line(self, channel, line):167 channel.socket.queue_recv(line)168 channel.handle_read()169 def test_params_rejected(self):170 server = DummyServer((socket_helper.HOST, 0), ('b', 0))171 conn, addr = server.accept()172 channel = smtpd.SMTPChannel(server, conn, addr)173 self.write_line(channel, b'EHLO example')174 self.write_line(channel, b'MAIL from: <foo@example.com> size=20')175 self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')176 self.assertEqual(channel.socket.last, self.error_response)177 def test_nothing_accepted(self):178 server = DummyServer((socket_helper.HOST, 0), ('b', 0))179 conn, addr = server.accept()180 channel = smtpd.SMTPChannel(server, conn, addr)181 self.write_line(channel, b'EHLO example')182 self.write_line(channel, b'MAIL from: <foo@example.com> size=20')183 self.write_line(channel, b'RCPT to: <foo@example.com>')184 self.assertEqual(channel.socket.last, b'250 OK\r\n')185class TestMailOptionParsing(unittest.TestCase):186 error_response = (b'555 MAIL FROM parameters not recognized or not '187 b'implemented\r\n')188 def setUp(self):189 smtpd.socket = asyncore.socket = mock_socket190 self.old_debugstream = smtpd.DEBUGSTREAM191 
self.debug = smtpd.DEBUGSTREAM = io.StringIO()192 def tearDown(self):193 asyncore.close_all()194 asyncore.socket = smtpd.socket = socket195 smtpd.DEBUGSTREAM = self.old_debugstream196 def write_line(self, channel, line):197 channel.socket.queue_recv(line)198 channel.handle_read()199 def test_with_decode_data_true(self):200 server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True)201 conn, addr = server.accept()202 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)203 self.write_line(channel, b'EHLO example')204 for line in [205 b'MAIL from: <foo@example.com> size=20 SMTPUTF8',206 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',207 b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',208 b'MAIL from: <foo@example.com> size=20 body=8bitmime',209 ]:210 self.write_line(channel, line)211 self.assertEqual(channel.socket.last, self.error_response)212 self.write_line(channel, b'MAIL from: <foo@example.com> size=20')213 self.assertEqual(channel.socket.last, b'250 OK\r\n')214 def test_with_decode_data_false(self):215 server = DummyServer((socket_helper.HOST, 0), ('b', 0))216 conn, addr = server.accept()217 channel = smtpd.SMTPChannel(server, conn, addr)218 self.write_line(channel, b'EHLO example')219 for line in [220 b'MAIL from: <foo@example.com> size=20 SMTPUTF8',221 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',222 ]:223 self.write_line(channel, line)224 self.assertEqual(channel.socket.last, self.error_response)225 self.write_line(226 channel,227 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')228 self.assertEqual(229 channel.socket.last,230 b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')231 self.write_line(232 channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')233 self.assertEqual(channel.socket.last, b'250 OK\r\n')234 def test_with_enable_smtputf8_true(self):235 server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True)236 conn, addr = server.accept()237 
channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)238 self.write_line(channel, b'EHLO example')239 self.write_line(240 channel,241 b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')242 self.assertEqual(channel.socket.last, b'250 OK\r\n')243class SMTPDChannelTest(unittest.TestCase):244 def setUp(self):245 smtpd.socket = asyncore.socket = mock_socket246 self.old_debugstream = smtpd.DEBUGSTREAM247 self.debug = smtpd.DEBUGSTREAM = io.StringIO()248 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),249 decode_data=True)250 conn, addr = self.server.accept()251 self.channel = smtpd.SMTPChannel(self.server, conn, addr,252 decode_data=True)253 def tearDown(self):254 asyncore.close_all()255 asyncore.socket = smtpd.socket = socket256 smtpd.DEBUGSTREAM = self.old_debugstream257 def write_line(self, line):258 self.channel.socket.queue_recv(line)259 self.channel.handle_read()260 def test_broken_connect(self):261 self.assertRaises(262 DummyDispatcherBroken, BrokenDummyServer,263 (socket_helper.HOST, 0), ('b', 0), decode_data=True)264 def test_decode_data_and_enable_SMTPUTF8_raises(self):265 self.assertRaises(266 ValueError, smtpd.SMTPChannel,267 self.server, self.channel.conn, self.channel.addr,268 enable_SMTPUTF8=True, decode_data=True)269 def test_server_accept(self):270 self.server.handle_accept()271 def test_missing_data(self):272 self.write_line(b'')273 self.assertEqual(self.channel.socket.last,274 b'500 Error: bad syntax\r\n')275 def test_EHLO(self):276 self.write_line(b'EHLO example')277 self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')278 def test_EHLO_bad_syntax(self):279 self.write_line(b'EHLO')280 self.assertEqual(self.channel.socket.last,281 b'501 Syntax: EHLO hostname\r\n')282 def test_EHLO_duplicate(self):283 self.write_line(b'EHLO example')284 self.write_line(b'EHLO example')285 self.assertEqual(self.channel.socket.last,286 b'503 Duplicate HELO/EHLO\r\n')287 def test_EHLO_HELO_duplicate(self):288 
self.write_line(b'EHLO example')289 self.write_line(b'HELO example')290 self.assertEqual(self.channel.socket.last,291 b'503 Duplicate HELO/EHLO\r\n')292 def test_HELO(self):293 name = smtpd.socket.getfqdn()294 self.write_line(b'HELO example')295 self.assertEqual(self.channel.socket.last,296 '250 {}\r\n'.format(name).encode('ascii'))297 def test_HELO_EHLO_duplicate(self):298 self.write_line(b'HELO example')299 self.write_line(b'EHLO example')300 self.assertEqual(self.channel.socket.last,301 b'503 Duplicate HELO/EHLO\r\n')302 def test_HELP(self):303 self.write_line(b'HELP')304 self.assertEqual(self.channel.socket.last,305 b'250 Supported commands: EHLO HELO MAIL RCPT ' + \306 b'DATA RSET NOOP QUIT VRFY\r\n')307 def test_HELP_command(self):308 self.write_line(b'HELP MAIL')309 self.assertEqual(self.channel.socket.last,310 b'250 Syntax: MAIL FROM: <address>\r\n')311 def test_HELP_command_unknown(self):312 self.write_line(b'HELP SPAM')313 self.assertEqual(self.channel.socket.last,314 b'501 Supported commands: EHLO HELO MAIL RCPT ' + \315 b'DATA RSET NOOP QUIT VRFY\r\n')316 def test_HELO_bad_syntax(self):317 self.write_line(b'HELO')318 self.assertEqual(self.channel.socket.last,319 b'501 Syntax: HELO hostname\r\n')320 def test_HELO_duplicate(self):321 self.write_line(b'HELO example')322 self.write_line(b'HELO example')323 self.assertEqual(self.channel.socket.last,324 b'503 Duplicate HELO/EHLO\r\n')325 def test_HELO_parameter_rejected_when_extensions_not_enabled(self):326 self.extended_smtp = False327 self.write_line(b'HELO example')328 self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')329 self.assertEqual(self.channel.socket.last,330 b'501 Syntax: MAIL FROM: <address>\r\n')331 def test_MAIL_allows_space_after_colon(self):332 self.write_line(b'HELO example')333 self.write_line(b'MAIL from: <foo@example.com>')334 self.assertEqual(self.channel.socket.last,335 b'250 OK\r\n')336 def test_extended_MAIL_allows_space_after_colon(self):337 self.write_line(b'EHLO example')338 
self.write_line(b'MAIL from: <foo@example.com> size=20')339 self.assertEqual(self.channel.socket.last,340 b'250 OK\r\n')341 def test_NOOP(self):342 self.write_line(b'NOOP')343 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')344 def test_HELO_NOOP(self):345 self.write_line(b'HELO example')346 self.write_line(b'NOOP')347 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')348 def test_NOOP_bad_syntax(self):349 self.write_line(b'NOOP hi')350 self.assertEqual(self.channel.socket.last,351 b'501 Syntax: NOOP\r\n')352 def test_QUIT(self):353 self.write_line(b'QUIT')354 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')355 def test_HELO_QUIT(self):356 self.write_line(b'HELO example')357 self.write_line(b'QUIT')358 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')359 def test_QUIT_arg_ignored(self):360 self.write_line(b'QUIT bye bye')361 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')362 def test_bad_state(self):363 self.channel.smtp_state = 'BAD STATE'364 self.write_line(b'HELO example')365 self.assertEqual(self.channel.socket.last,366 b'451 Internal confusion\r\n')367 def test_command_too_long(self):368 self.write_line(b'HELO example')369 self.write_line(b'MAIL from: ' +370 b'a' * self.channel.command_size_limit +371 b'@example')372 self.assertEqual(self.channel.socket.last,373 b'500 Error: line too long\r\n')374 def test_MAIL_command_limit_extended_with_SIZE(self):375 self.write_line(b'EHLO example')376 fill_len = self.channel.command_size_limit - len('MAIL from:<@example>')377 self.write_line(b'MAIL from:<' +378 b'a' * fill_len +379 b'@example> SIZE=1234')380 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')381 self.write_line(b'MAIL from:<' +382 b'a' * (fill_len + 26) +383 b'@example> SIZE=1234')384 self.assertEqual(self.channel.socket.last,385 b'500 Error: line too long\r\n')386 def test_MAIL_command_rejects_SMTPUTF8_by_default(self):387 self.write_line(b'EHLO example')388 self.write_line(389 b'MAIL from: 
<naive@example.com> BODY=8BITMIME SMTPUTF8')390 self.assertEqual(self.channel.socket.last[0:1], b'5')391 def test_data_longer_than_default_data_size_limit(self):392 # Hack the default so we don't have to generate so much data.393 self.channel.data_size_limit = 1048394 self.write_line(b'HELO example')395 self.write_line(b'MAIL From:eggs@example')396 self.write_line(b'RCPT To:spam@example')397 self.write_line(b'DATA')398 self.write_line(b'A' * self.channel.data_size_limit +399 b'A\r\n.')400 self.assertEqual(self.channel.socket.last,401 b'552 Error: Too much mail data\r\n')402 def test_MAIL_size_parameter(self):403 self.write_line(b'EHLO example')404 self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')405 self.assertEqual(self.channel.socket.last,406 b'250 OK\r\n')407 def test_MAIL_invalid_size_parameter(self):408 self.write_line(b'EHLO example')409 self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')410 self.assertEqual(self.channel.socket.last,411 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')412 def test_MAIL_RCPT_unknown_parameters(self):413 self.write_line(b'EHLO example')414 self.write_line(b'MAIL FROM:<eggs@example> ham=green')415 self.assertEqual(self.channel.socket.last,416 b'555 MAIL FROM parameters not recognized or not implemented\r\n')417 self.write_line(b'MAIL FROM:<eggs@example>')418 self.write_line(b'RCPT TO:<eggs@example> ham=green')419 self.assertEqual(self.channel.socket.last,420 b'555 RCPT TO parameters not recognized or not implemented\r\n')421 def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):422 self.channel.data_size_limit = 1048423 self.write_line(b'EHLO example')424 self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')425 self.assertEqual(self.channel.socket.last,426 b'552 Error: message size exceeds fixed maximum message size\r\n')427 def test_need_MAIL(self):428 self.write_line(b'HELO example')429 self.write_line(b'RCPT to:spam@example')430 self.assertEqual(self.channel.socket.last,431 b'503 
Error: need MAIL command\r\n')432 def test_MAIL_syntax_HELO(self):433 self.write_line(b'HELO example')434 self.write_line(b'MAIL from eggs@example')435 self.assertEqual(self.channel.socket.last,436 b'501 Syntax: MAIL FROM: <address>\r\n')437 def test_MAIL_syntax_EHLO(self):438 self.write_line(b'EHLO example')439 self.write_line(b'MAIL from eggs@example')440 self.assertEqual(self.channel.socket.last,441 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')442 def test_MAIL_missing_address(self):443 self.write_line(b'HELO example')444 self.write_line(b'MAIL from:')445 self.assertEqual(self.channel.socket.last,446 b'501 Syntax: MAIL FROM: <address>\r\n')447 def test_MAIL_chevrons(self):448 self.write_line(b'HELO example')449 self.write_line(b'MAIL from:<eggs@example>')450 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')451 def test_MAIL_empty_chevrons(self):452 self.write_line(b'EHLO example')453 self.write_line(b'MAIL from:<>')454 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')455 def test_MAIL_quoted_localpart(self):456 self.write_line(b'EHLO example')457 self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')458 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')459 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')460 def test_MAIL_quoted_localpart_no_angles(self):461 self.write_line(b'EHLO example')462 self.write_line(b'MAIL from: "Fred Blogs"@example.com')463 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')464 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')465 def test_MAIL_quoted_localpart_with_size(self):466 self.write_line(b'EHLO example')467 self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')468 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')469 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')470 def test_MAIL_quoted_localpart_with_size_no_angles(self):471 self.write_line(b'EHLO example')472 self.write_line(b'MAIL from: "Fred 
Blogs"@example.com SIZE=1000')473 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')474 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')475 def test_nested_MAIL(self):476 self.write_line(b'HELO example')477 self.write_line(b'MAIL from:eggs@example')478 self.write_line(b'MAIL from:spam@example')479 self.assertEqual(self.channel.socket.last,480 b'503 Error: nested MAIL command\r\n')481 def test_VRFY(self):482 self.write_line(b'VRFY eggs@example')483 self.assertEqual(self.channel.socket.last,484 b'252 Cannot VRFY user, but will accept message and attempt ' + \485 b'delivery\r\n')486 def test_VRFY_syntax(self):487 self.write_line(b'VRFY')488 self.assertEqual(self.channel.socket.last,489 b'501 Syntax: VRFY <address>\r\n')490 def test_EXPN_not_implemented(self):491 self.write_line(b'EXPN')492 self.assertEqual(self.channel.socket.last,493 b'502 EXPN not implemented\r\n')494 def test_no_HELO_MAIL(self):495 self.write_line(b'MAIL from:<foo@example.com>')496 self.assertEqual(self.channel.socket.last,497 b'503 Error: send HELO first\r\n')498 def test_need_RCPT(self):499 self.write_line(b'HELO example')500 self.write_line(b'MAIL From:eggs@example')501 self.write_line(b'DATA')502 self.assertEqual(self.channel.socket.last,503 b'503 Error: need RCPT command\r\n')504 def test_RCPT_syntax_HELO(self):505 self.write_line(b'HELO example')506 self.write_line(b'MAIL From: eggs@example')507 self.write_line(b'RCPT to eggs@example')508 self.assertEqual(self.channel.socket.last,509 b'501 Syntax: RCPT TO: <address>\r\n')510 def test_RCPT_syntax_EHLO(self):511 self.write_line(b'EHLO example')512 self.write_line(b'MAIL From: eggs@example')513 self.write_line(b'RCPT to eggs@example')514 self.assertEqual(self.channel.socket.last,515 b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')516 def test_RCPT_lowercase_to_OK(self):517 self.write_line(b'HELO example')518 self.write_line(b'MAIL From: eggs@example')519 self.write_line(b'RCPT to: <eggs@example>')520 
self.assertEqual(self.channel.socket.last, b'250 OK\r\n')521 def test_no_HELO_RCPT(self):522 self.write_line(b'RCPT to eggs@example')523 self.assertEqual(self.channel.socket.last,524 b'503 Error: send HELO first\r\n')525 def test_data_dialog(self):526 self.write_line(b'HELO example')527 self.write_line(b'MAIL From:eggs@example')528 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')529 self.write_line(b'RCPT To:spam@example')530 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')531 self.write_line(b'DATA')532 self.assertEqual(self.channel.socket.last,533 b'354 End data with <CR><LF>.<CR><LF>\r\n')534 self.write_line(b'data\r\nmore\r\n.')535 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')536 self.assertEqual(self.server.messages,537 [(('peer-address', 'peer-port'),538 'eggs@example',539 ['spam@example'],540 'data\nmore')])541 def test_DATA_syntax(self):542 self.write_line(b'HELO example')543 self.write_line(b'MAIL From:eggs@example')544 self.write_line(b'RCPT To:spam@example')545 self.write_line(b'DATA spam')546 self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')547 def test_no_HELO_DATA(self):548 self.write_line(b'DATA spam')549 self.assertEqual(self.channel.socket.last,550 b'503 Error: send HELO first\r\n')551 def test_data_transparency_section_4_5_2(self):552 self.write_line(b'HELO example')553 self.write_line(b'MAIL From:eggs@example')554 self.write_line(b'RCPT To:spam@example')555 self.write_line(b'DATA')556 self.write_line(b'..\r\n.\r\n')557 self.assertEqual(self.channel.received_data, '.')558 def test_multiple_RCPT(self):559 self.write_line(b'HELO example')560 self.write_line(b'MAIL From:eggs@example')561 self.write_line(b'RCPT To:spam@example')562 self.write_line(b'RCPT To:ham@example')563 self.write_line(b'DATA')564 self.write_line(b'data\r\n.')565 self.assertEqual(self.server.messages,566 [(('peer-address', 'peer-port'),567 'eggs@example',568 ['spam@example','ham@example'],569 'data')])570 def 
test_manual_status(self):571 # checks that the Channel is able to return a custom status message572 self.write_line(b'HELO example')573 self.write_line(b'MAIL From:eggs@example')574 self.write_line(b'RCPT To:spam@example')575 self.write_line(b'DATA')576 self.write_line(b'return status\r\n.')577 self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')578 def test_RSET(self):579 self.write_line(b'HELO example')580 self.write_line(b'MAIL From:eggs@example')581 self.write_line(b'RCPT To:spam@example')582 self.write_line(b'RSET')583 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')584 self.write_line(b'MAIL From:foo@example')585 self.write_line(b'RCPT To:eggs@example')586 self.write_line(b'DATA')587 self.write_line(b'data\r\n.')588 self.assertEqual(self.server.messages,589 [(('peer-address', 'peer-port'),590 'foo@example',591 ['eggs@example'],592 'data')])593 def test_HELO_RSET(self):594 self.write_line(b'HELO example')595 self.write_line(b'RSET')596 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')597 def test_RSET_syntax(self):598 self.write_line(b'RSET hi')599 self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')600 def test_unknown_command(self):601 self.write_line(b'UNKNOWN_CMD')602 self.assertEqual(self.channel.socket.last,603 b'500 Error: command "UNKNOWN_CMD" not ' + \604 b'recognized\r\n')605 def test_attribute_deprecations(self):606 with warnings_helper.check_warnings(('', DeprecationWarning)):607 spam = self.channel._SMTPChannel__server608 with warnings_helper.check_warnings(('', DeprecationWarning)):609 self.channel._SMTPChannel__server = 'spam'610 with warnings_helper.check_warnings(('', DeprecationWarning)):611 spam = self.channel._SMTPChannel__line612 with warnings_helper.check_warnings(('', DeprecationWarning)):613 self.channel._SMTPChannel__line = 'spam'614 with warnings_helper.check_warnings(('', DeprecationWarning)):615 spam = self.channel._SMTPChannel__state616 with warnings_helper.check_warnings(('', 
DeprecationWarning)):617 self.channel._SMTPChannel__state = 'spam'618 with warnings_helper.check_warnings(('', DeprecationWarning)):619 spam = self.channel._SMTPChannel__greeting620 with warnings_helper.check_warnings(('', DeprecationWarning)):621 self.channel._SMTPChannel__greeting = 'spam'622 with warnings_helper.check_warnings(('', DeprecationWarning)):623 spam = self.channel._SMTPChannel__mailfrom624 with warnings_helper.check_warnings(('', DeprecationWarning)):625 self.channel._SMTPChannel__mailfrom = 'spam'626 with warnings_helper.check_warnings(('', DeprecationWarning)):627 spam = self.channel._SMTPChannel__rcpttos628 with warnings_helper.check_warnings(('', DeprecationWarning)):629 self.channel._SMTPChannel__rcpttos = 'spam'630 with warnings_helper.check_warnings(('', DeprecationWarning)):631 spam = self.channel._SMTPChannel__data632 with warnings_helper.check_warnings(('', DeprecationWarning)):633 self.channel._SMTPChannel__data = 'spam'634 with warnings_helper.check_warnings(('', DeprecationWarning)):635 spam = self.channel._SMTPChannel__fqdn636 with warnings_helper.check_warnings(('', DeprecationWarning)):637 self.channel._SMTPChannel__fqdn = 'spam'638 with warnings_helper.check_warnings(('', DeprecationWarning)):639 spam = self.channel._SMTPChannel__peer640 with warnings_helper.check_warnings(('', DeprecationWarning)):641 self.channel._SMTPChannel__peer = 'spam'642 with warnings_helper.check_warnings(('', DeprecationWarning)):643 spam = self.channel._SMTPChannel__conn644 with warnings_helper.check_warnings(('', DeprecationWarning)):645 self.channel._SMTPChannel__conn = 'spam'646 with warnings_helper.check_warnings(('', DeprecationWarning)):647 spam = self.channel._SMTPChannel__addr648 with warnings_helper.check_warnings(('', DeprecationWarning)):649 self.channel._SMTPChannel__addr = 'spam'650@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")651class SMTPDChannelIPv6Test(SMTPDChannelTest):652 def setUp(self):653 smtpd.socket = 
asyncore.socket = mock_socket654 self.old_debugstream = smtpd.DEBUGSTREAM655 self.debug = smtpd.DEBUGSTREAM = io.StringIO()656 self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0),657 decode_data=True)658 conn, addr = self.server.accept()659 self.channel = smtpd.SMTPChannel(self.server, conn, addr,660 decode_data=True)661class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):662 def setUp(self):663 smtpd.socket = asyncore.socket = mock_socket664 self.old_debugstream = smtpd.DEBUGSTREAM665 self.debug = smtpd.DEBUGSTREAM = io.StringIO()666 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),667 decode_data=True)668 conn, addr = self.server.accept()669 # Set DATA size limit to 32 bytes for easy testing670 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,671 decode_data=True)672 def tearDown(self):673 asyncore.close_all()674 asyncore.socket = smtpd.socket = socket675 smtpd.DEBUGSTREAM = self.old_debugstream676 def write_line(self, line):677 self.channel.socket.queue_recv(line)678 self.channel.handle_read()679 def test_data_limit_dialog(self):680 self.write_line(b'HELO example')681 self.write_line(b'MAIL From:eggs@example')682 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')683 self.write_line(b'RCPT To:spam@example')684 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')685 self.write_line(b'DATA')686 self.assertEqual(self.channel.socket.last,687 b'354 End data with <CR><LF>.<CR><LF>\r\n')688 self.write_line(b'data\r\nmore\r\n.')689 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')690 self.assertEqual(self.server.messages,691 [(('peer-address', 'peer-port'),692 'eggs@example',693 ['spam@example'],694 'data\nmore')])695 def test_data_limit_dialog_too_much_data(self):696 self.write_line(b'HELO example')697 self.write_line(b'MAIL From:eggs@example')698 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')699 self.write_line(b'RCPT To:spam@example')700 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')701 
self.write_line(b'DATA')702 self.assertEqual(self.channel.socket.last,703 b'354 End data with <CR><LF>.<CR><LF>\r\n')704 self.write_line(b'This message is longer than 32 bytes\r\n.')705 self.assertEqual(self.channel.socket.last,706 b'552 Error: Too much mail data\r\n')707class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):708 def setUp(self):709 smtpd.socket = asyncore.socket = mock_socket710 self.old_debugstream = smtpd.DEBUGSTREAM711 self.debug = smtpd.DEBUGSTREAM = io.StringIO()712 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))713 conn, addr = self.server.accept()714 self.channel = smtpd.SMTPChannel(self.server, conn, addr)715 def tearDown(self):716 asyncore.close_all()717 asyncore.socket = smtpd.socket = socket718 smtpd.DEBUGSTREAM = self.old_debugstream719 def write_line(self, line):720 self.channel.socket.queue_recv(line)721 self.channel.handle_read()722 def test_ascii_data(self):723 self.write_line(b'HELO example')724 self.write_line(b'MAIL From:eggs@example')725 self.write_line(b'RCPT To:spam@example')726 self.write_line(b'DATA')727 self.write_line(b'plain ascii text')728 self.write_line(b'.')729 self.assertEqual(self.channel.received_data, b'plain ascii text')730 def test_utf8_data(self):731 self.write_line(b'HELO example')732 self.write_line(b'MAIL From:eggs@example')733 self.write_line(b'RCPT To:spam@example')734 self.write_line(b'DATA')735 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')736 self.write_line(b'and some plain ascii')737 self.write_line(b'.')738 self.assertEqual(739 self.channel.received_data,740 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'741 b'and some plain ascii')742class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):743 def setUp(self):744 smtpd.socket = asyncore.socket = mock_socket745 self.old_debugstream = smtpd.DEBUGSTREAM746 self.debug = smtpd.DEBUGSTREAM = io.StringIO()747 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),748 decode_data=True)749 conn, addr = 
self.server.accept()750 # Set decode_data to True751 self.channel = smtpd.SMTPChannel(self.server, conn, addr,752 decode_data=True)753 def tearDown(self):754 asyncore.close_all()755 asyncore.socket = smtpd.socket = socket756 smtpd.DEBUGSTREAM = self.old_debugstream757 def write_line(self, line):758 self.channel.socket.queue_recv(line)759 self.channel.handle_read()760 def test_ascii_data(self):761 self.write_line(b'HELO example')762 self.write_line(b'MAIL From:eggs@example')763 self.write_line(b'RCPT To:spam@example')764 self.write_line(b'DATA')765 self.write_line(b'plain ascii text')766 self.write_line(b'.')767 self.assertEqual(self.channel.received_data, 'plain ascii text')768 def test_utf8_data(self):769 self.write_line(b'HELO example')770 self.write_line(b'MAIL From:eggs@example')771 self.write_line(b'RCPT To:spam@example')772 self.write_line(b'DATA')773 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')774 self.write_line(b'and some plain ascii')775 self.write_line(b'.')776 self.assertEqual(777 self.channel.received_data,778 'utf8 enriched text: żźć\nand some plain ascii')779class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):780 def setUp(self):781 smtpd.socket = asyncore.socket = mock_socket782 self.old_debugstream = smtpd.DEBUGSTREAM783 self.debug = smtpd.DEBUGSTREAM = io.StringIO()784 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),785 enable_SMTPUTF8=True)786 conn, addr = self.server.accept()787 self.channel = smtpd.SMTPChannel(self.server, conn, addr,788 enable_SMTPUTF8=True)789 def tearDown(self):790 asyncore.close_all()791 asyncore.socket = smtpd.socket = socket792 smtpd.DEBUGSTREAM = self.old_debugstream793 def write_line(self, line):794 self.channel.socket.queue_recv(line)795 self.channel.handle_read()796 def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):797 self.write_line(b'EHLO example')798 self.write_line(799 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(800 'utf-8')801 )802 
self.assertEqual(self.channel.socket.last, b'250 OK\r\n')803 def test_process_smtputf8_message(self):804 self.write_line(b'EHLO example')805 for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:806 self.write_line(b'MAIL from: <a@example> ' + mail_parameters)807 self.assertEqual(self.channel.socket.last[0:3], b'250')808 self.write_line(b'rcpt to:<b@example.com>')809 self.assertEqual(self.channel.socket.last[0:3], b'250')810 self.write_line(b'data')811 self.assertEqual(self.channel.socket.last[0:3], b'354')812 self.write_line(b'c\r\n.')813 if mail_parameters == b'':814 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')815 else:816 self.assertEqual(self.channel.socket.last,817 b'250 SMTPUTF8 message okish\r\n')818 def test_utf8_data(self):819 self.write_line(b'EHLO example')820 self.write_line(821 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))822 self.assertEqual(self.channel.socket.last[0:3], b'250')823 self.write_line('RCPT To:späm@examplé'.encode('utf-8'))824 self.assertEqual(self.channel.socket.last[0:3], b'250')825 self.write_line(b'DATA')826 self.assertEqual(self.channel.socket.last[0:3], b'354')827 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')828 self.write_line(b'.')829 self.assertEqual(830 self.channel.received_data,831 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')832 def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):833 self.write_line(b'ehlo example')834 fill_len = (512 + 26 + 10) - len('mail from:<@example>')835 self.write_line(b'MAIL from:<' +836 b'a' * (fill_len + 1) +837 b'@example>')838 self.assertEqual(self.channel.socket.last,839 b'500 Error: line too long\r\n')840 self.write_line(b'MAIL from:<' +841 b'a' * fill_len +842 b'@example>')843 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')844 def test_multiple_emails_with_extended_command_length(self):845 self.write_line(b'ehlo example')846 fill_len = (512 + 26 + 10) - len('mail from:<@example>')847 for char in [b'a', b'b', 
b'c']:848 self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')849 self.assertEqual(self.channel.socket.last[0:3], b'500')850 self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')851 self.assertEqual(self.channel.socket.last[0:3], b'250')852 self.write_line(b'rcpt to:<hans@example.com>')853 self.assertEqual(self.channel.socket.last[0:3], b'250')854 self.write_line(b'data')855 self.assertEqual(self.channel.socket.last[0:3], b'354')856 self.write_line(b'test\r\n.')857 self.assertEqual(self.channel.socket.last[0:3], b'250')858class MiscTestCase(unittest.TestCase):859 def test__all__(self):860 not_exported = {861 "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE",862 "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs",863 }864 support.check__all__(self, smtpd, not_exported=not_exported)865if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run pytest-benchmark automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful