Best Python code snippet using dbt-osmosis_python
wsgi_server_test.py
Source:wsgi_server_test.py  
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for google.appengine.tools.devappserver2.wsgi_server."""

import contextlib
import errno
import json
import os
import select
import socket
import sys
import time
import unittest
import urllib2

import google
import mox

from cherrypy import wsgiserver
from google.appengine.tools.devappserver2 import wsgi_server


class TestError(Exception):
  """Marker exception raised by mocks so tests can assert it propagates."""
  pass


class _SingleAddressWsgiServerTest(unittest.TestCase):
  """Issues real HTTP requests against a started _SingleAddressWsgiServer."""

  def setUp(self):
    super(_SingleAddressWsgiServerTest, self).setUp()
    # Port 0 is passed in; the bound port is read back via self.server.port.
    self.server = wsgi_server._SingleAddressWsgiServer(('localhost', 0),
                                                       self.wsgi_application)
    self.server.start()

  def tearDown(self):
    super(_SingleAddressWsgiServerTest, self).tearDown()
    self.server.quit()

  def _fetch(self):
    """Requests /foo?bar=baz from the server under test.

    The response is closed before returning so that the test does not leave
    an open client socket behind.

    Returns:
      A (status_code, body) tuple.
    """
    url = 'http://localhost:%d/foo?bar=baz' % self.server.port
    with contextlib.closing(urllib2.urlopen(url)) as response:
      return response.code, response.read()

  def test_serve(self):
    """The installed application receives the request path and query."""
    code, body = self._fetch()
    environ = json.loads(body)
    self.assertEqual(200, code)
    self.assertEqual('/foo', environ['PATH_INFO'])
    self.assertEqual('bar=baz', environ['QUERY_STRING'])

  def wsgi_application(self, environ, start_response):
    """WSGI app that echoes its environ back as a JSON document."""
    start_response('200 OK', [('Content-Type', 'application/json')])
    serializable_environ = environ.copy()
    # These two entries are dropped before json.dumps is applied.
    del serializable_environ['wsgi.input']
    del serializable_environ['wsgi.errors']
    return [json.dumps(serializable_environ)]

  def other_wsgi_application(self, environ, start_response):
    """WSGI app that always answers with the plain text 'Hello World'."""
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['Hello World']

  def test_set_app(self):
    """set_app() replaces the application that serves later requests."""
    self.server.set_app(self.other_wsgi_application)
    code, body = self._fetch()
    self.assertEqual(200, code)
    self.assertEqual('Hello World', body)

  def test_set_error(self):
    """set_error() makes the server answer with the given status code."""
    self.server.set_error(204)
    code, _ = self._fetch()
    self.assertEqual(204, code)


class SharedCherryPyThreadPoolTest(unittest.TestCase):
  """Tests _SharedCherryPyThreadPool with _THREAD_POOL.submit stubbed out."""

  def setUp(self):
    self.mox = mox.Mox()
    self.mox.StubOutWithMock(wsgi_server._THREAD_POOL, 'submit')
    self.thread_pool = wsgi_server._SharedCherryPyThreadPool()

  def tearDown(self):
    self.mox.UnsetStubs()

  def _make_connection(self, rfile_closed):
    """Builds a stand-in connection exposing only `rfile.closed` and `socket`.

    Args:
      rfile_closed: value assigned to connection.rfile.closed.

    Returns:
      A plain object whose `socket` attribute is a mox MockAnything.
    """
    class DummyObject(object):
      pass

    connection = DummyObject()
    connection.rfile = DummyObject()
    connection.rfile.closed = rfile_closed
    connection.socket = self.mox.CreateMockAnything()
    return connection

  def test_put(self):
    """put() submits _handle to the shared pool and tracks the connection."""
    connection = object()
    wsgi_server._THREAD_POOL.submit(self.thread_pool._handle, connection)
    self.mox.ReplayAll()
    self.thread_pool.put(connection)
    self.mox.VerifyAll()
    self.assertEqual(set([connection]), self.thread_pool._connections)

  def test_handle(self):
    """_handle() communicates, closes, notifies and forgets the connection."""
    connection = self.mox.CreateMock(wsgiserver.HTTPConnection)
    self.mox.StubOutWithMock(self.thread_pool._condition, 'notify')
    self.thread_pool._connections.add(connection)
    connection.communicate()
    connection.close()
    self.thread_pool._condition.notify()
    self.mox.ReplayAll()
    self.thread_pool._handle(connection)
    self.mox.VerifyAll()
    self.assertEqual(set(), self.thread_pool._connections)

  def test_handle_with_exception(self):
    """A communicate() failure propagates, but cleanup still takes place."""
    connection = self.mox.CreateMock(wsgiserver.HTTPConnection)
    self.mox.StubOutWithMock(self.thread_pool._condition, 'notify')
    self.thread_pool._connections.add(connection)
    connection.communicate().AndRaise(TestError)
    connection.close()
    self.thread_pool._condition.notify()
    self.mox.ReplayAll()
    self.assertRaises(TestError, self.thread_pool._handle, connection)
    self.mox.VerifyAll()
    self.assertEqual(set(), self.thread_pool._connections)

  def test_stop(self):
    """stop() with no tracked connections makes no mocked calls."""
    self.mox.ReplayAll()
    self.thread_pool.stop(3)
    self.mox.VerifyAll()

  def test_stop_no_connections(self):
    """Same as test_stop, with a fractional timeout."""
    self.mox.ReplayAll()
    self.thread_pool.stop(0.1)
    self.mox.VerifyAll()

  def test_stop_with_connections(self):
    """stop() calls _shutdown_connection for a connection still tracked."""
    connection = self.mox.CreateMock(wsgiserver.HTTPConnection)
    self.thread_pool._connections.add(connection)
    self.mox.StubOutWithMock(self.thread_pool, '_shutdown_connection')
    self.thread_pool._shutdown_connection(connection)
    self.mox.ReplayAll()
    self.thread_pool.stop(1)
    self.mox.VerifyAll()

  def test_shutdown_connection(self):
    """An open rfile leads to socket.shutdown(SHUT_RD) on the connection."""
    connection = self._make_connection(rfile_closed=False)
    connection.socket.shutdown(socket.SHUT_RD)
    self.mox.ReplayAll()
    self.thread_pool._shutdown_connection(connection)
    self.mox.VerifyAll()

  def test_shutdown_connection_rfile_already_close(self):
    """With rfile already closed, the socket mock must not be touched."""
    connection = self._make_connection(rfile_closed=True)
    self.mox.ReplayAll()
    self.thread_pool._shutdown_connection(connection)
    self.mox.VerifyAll()


class SelectThreadTest(unittest.TestCase):
  """Tests SelectThread with select.select/select.poll/time.sleep stubbed."""

  class _MockSocket(object):
    """Socket stand-in whose file descriptor is its own id()."""

    def fileno(self):
      return id(self)

  def setUp(self):
    self.select_thread = wsgi_server.SelectThread()
    # Some tests overwrite wsgi_server._HAS_POLL; tearDown restores it.
    self.original_has_poll = wsgi_server._HAS_POLL
    self.mox = mox.Mox()
    self.mox.StubOutWithMock(select, 'select')
    if hasattr(select, 'poll'):
      self.mox.StubOutWithMock(select, 'poll')
    self.mox.StubOutWithMock(time, 'sleep')

  def tearDown(self):
    self.mox.UnsetStubs()
    wsgi_server._HAS_POLL = self.original_has_poll

  def test_add_socket(self):
    """add_socket() rebinds the bookkeeping containers, not mutates them."""
    file_descriptors = self.select_thread._file_descriptors
    file_descriptor_to_callback = (
        self.select_thread._file_descriptor_to_callback)
    file_descriptors_copy = frozenset(self.select_thread._file_descriptors)
    file_descriptor_to_callback_copy = (
        self.select_thread._file_descriptor_to_callback.copy())
    s = self._MockSocket()
    callback = object()
    self.select_thread.add_socket(s, callback)
    # The objects captured before the call must be unchanged...
    self.assertEqual(file_descriptors_copy, file_descriptors)
    self.assertEqual(file_descriptor_to_callback_copy,
                     file_descriptor_to_callback)
    # ...while the attributes now hold the new socket.
    self.assertEqual(frozenset([s.fileno()]),
                     self.select_thread._file_descriptors)
    self.assertEqual({s.fileno(): callback},
                     self.select_thread._file_descriptor_to_callback)

  def test_remove_socket(self):
    """remove_socket() rebinds the bookkeeping containers, not mutates them."""
    s1 = self._MockSocket()
    callback1 = object()
    s2 = self._MockSocket()
    callback2 = object()
    self.select_thread._file_descriptors = frozenset([s1.fileno(), s2.fileno()])
    self.select_thread._file_descriptor_to_callback = {
        s1.fileno(): callback1, s2.fileno(): callback2}
    file_descriptors = self.select_thread._file_descriptors
    file_descriptor_to_callback = (
        self.select_thread._file_descriptor_to_callback)
    file_descriptors_copy = frozenset(self.select_thread._file_descriptors)
    file_descriptor_to_callback_copy = (
        self.select_thread._file_descriptor_to_callback.copy())
    self.select_thread.remove_socket(s1)
    # The objects captured before the call must be unchanged...
    self.assertEqual(file_descriptors_copy, file_descriptors)
    self.assertEqual(file_descriptor_to_callback_copy,
                     file_descriptor_to_callback)
    # ...while the attributes now hold only the remaining socket.
    self.assertEqual(frozenset([s2.fileno()]),
                     self.select_thread._file_descriptors)
    self.assertEqual({s2.fileno(): callback2},
                     self.select_thread._file_descriptor_to_callback)

  def test_select_no_sockets(self):
    """With nothing registered, _select() only calls time.sleep(1)."""
    time.sleep(1)
    self.mox.ReplayAll()
    self.select_thread._select()
    self.mox.VerifyAll()

  def test_select_no_poll(self):
    """Without poll, a descriptor reported by select() has its callback run."""
    wsgi_server._HAS_POLL = False
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    select.select(frozenset([s.fileno()]), [], [], 1).AndReturn(
        ([s.fileno()], [], []))
    callback()
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()

  @unittest.skipUnless(wsgi_server._HAS_POLL, 'requires select.poll')
  def test_select_with_poll(self):
    """With poll, a descriptor reported by poll() has its callback run."""
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    poll = self.mox.CreateMockAnything()
    select.poll().AndReturn(poll)
    poll.register(s.fileno(), select.POLLIN)
    poll.poll(1000).AndReturn([(s.fileno(), select.POLLIN)])
    callback()
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()

  def test_select_not_ready_no_poll(self):
    """Without poll, an empty select() result runs no callback."""
    wsgi_server._HAS_POLL = False
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    select.select(frozenset([s.fileno()]), [], [], 1).AndReturn(([], [], []))
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()

  @unittest.skipUnless(wsgi_server._HAS_POLL, 'requires select.poll')
  def test_select_not_ready_with_poll(self):
    """With poll, an empty poll() result runs no callback."""
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    poll = self.mox.CreateMockAnything()
    select.poll().AndReturn(poll)
    poll.register(s.fileno(), select.POLLIN)
    poll.poll(1000).AndReturn([])
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()


class WsgiServerStartupTest(unittest.TestCase):
  """Tests WsgiServer start/quit on a fixed port with per-address mocks."""

  def setUp(self):
    self.mox = mox.Mox()
    self.server = wsgi_server.WsgiServer(('localhost', 123), None)

  def tearDown(self):
    self.mox.UnsetStubs()

  def test_start_some_fail_to_bind(self):
    """start() succeeds and keeps only the servers that managed to bind."""
    # The mocks must be created before the class itself is stubbed out.
    failing_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    starting_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    another_starting_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,
                       0, socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('foo', 'bar', 'baz')),
                            (None, None, None, None, (1, 2, 3, 4, 5)),
                            (None, None, None, None, (3, 4))])
    # Only the first two items of each sockaddr are expected to be passed on.
    wsgi_server._SingleAddressWsgiServer(('foo', 'bar'), None).AndReturn(
        failing_server)
    wsgi_server._SingleAddressWsgiServer((1, 2), None).AndReturn(
        starting_server)
    wsgi_server._SingleAddressWsgiServer((3, 4), None).AndReturn(
        another_starting_server)
    starting_server.start()
    failing_server.start().AndRaise(wsgi_server.BindError)
    another_starting_server.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([starting_server, another_starting_server],
                          self.server._servers)

  def test_start_all_fail_to_bind(self):
    """start() raises BindError when no address can be bound."""
    failing_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,
                       0, socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('foo', 'bar', 'baz'))])
    wsgi_server._SingleAddressWsgiServer(('foo', 'bar'), None).AndReturn(
        failing_server)
    failing_server.start().AndRaise(wsgi_server.BindError)
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()

  def test_remove_duplicates(self):
    """An address that getaddrinfo reports twice gets only one server."""
    foo_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    foo2_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,
                       0, socket.AI_PASSIVE).AndReturn(
                           [(0, 0, 0, '', ('127.0.0.1', 123)),
                            (0, 0, 0, '', ('::1', 123, 0, 0)),
                            (0, 0, 0, '', ('127.0.0.1', 123))])
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 123), None).AndReturn(
        foo_server)
    foo_server.start()
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        foo2_server)
    foo2_server.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()

  def test_quit(self):
    """quit() is forwarded to every server held in _servers."""
    running_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.server._servers = [running_server]
    running_server.quit()
    self.mox.ReplayAll()
    self.server.quit()
    self.mox.VerifyAll()


class WsgiServerPort0StartupTest(unittest.TestCase):
  """Tests WsgiServer.start() when the requested port is 0."""

  def setUp(self):
    self.mox = mox.Mox()
    self.server = wsgi_server.WsgiServer(('localhost', 0), None)

  def tearDown(self):
    self.mox.UnsetStubs()

  def _expect_localhost_lookup(self):
    """Stubs the server class and getaddrinfo; records one v4+v6 lookup.

    Must be called only after every CreateMock(_SingleAddressWsgiServer) the
    test needs, because it replaces that class on the module with a mock.
    """
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])

  def test_basic_behavior(self):
    """The port picked by the first server is reused for the second one."""
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self._expect_localhost_lookup()
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server, inet6_server],
                          self.server._servers)

  def test_retry_eaddrinuse(self):
    """EADDRINUSE on the second bind quits the first server and retries."""
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet4_server_retry = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    inet6_server_retry = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self._expect_localhost_lookup()
    # First try
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start().AndRaise(
        wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))
    inet4_server.quit()
    # Retry
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server_retry)
    inet4_server_retry.start()
    inet4_server_retry.port = 456
    wsgi_server._SingleAddressWsgiServer(('::1', 456), None).AndReturn(
        inet6_server_retry)
    inet6_server_retry.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server_retry, inet6_server_retry],
                          self.server._servers)

  def test_retry_limited(self):
    """After _PORT_0_RETRIES EADDRINUSE failures start() raises BindError."""
    inet4_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
                     for _ in range(wsgi_server._PORT_0_RETRIES)]
    inet6_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
                     for _ in range(wsgi_server._PORT_0_RETRIES)]
    self._expect_localhost_lookup()
    for offset, (inet4_server, inet6_server) in enumerate(zip(
        inet4_servers, inet6_servers)):
      port = offset + 1
      wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
          inet4_server)
      inet4_server.start()
      inet4_server.port = port
      wsgi_server._SingleAddressWsgiServer(('::1', port), None).AndReturn(
          inet6_server)
      inet6_server.start().AndRaise(
          wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))
      inet4_server.quit()
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()

  def test_ignore_other_errors(self):
    """A non-EADDRINUSE BindError drops that address without a retry."""
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self._expect_localhost_lookup()
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start().AndRaise(
        wsgi_server.BindError('message', (errno.ENOPROTOOPT, 'no protocol')))
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server],
                          self.server._servers)


class _SingleAddressWsgiServerStartupTest(unittest.TestCase):
  """Tests _SingleAddressWsgiServer start/quit with collaborators mocked."""

  def setUp(self):
    self.mox = mox.Mox()
    self.server = wsgi_server._SingleAddressWsgiServer(('localhost', 0), None)

  def tearDown(self):
    self.mox.UnsetStubs()

  def test_start_port_in_use(self):
    """A socket.error raised by bind() surfaces from start() as BindError."""
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    self.mox.StubOutWithMock(self.server, 'bind')
    # Opaque sentinels: the test only checks they are handed on to bind().
    af = object()
    socktype = object()
    proto = object()
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(af, socktype, proto, None, None)])
    self.server.bind(af, socktype, proto).AndRaise(socket.error)
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()

  def test_start(self):
    """start() registers the listening socket and tick with _SELECT_THREAD."""
    # Ensure no CherryPy thread pools are started.
    self.mox.StubOutWithMock(wsgiserver.ThreadPool, 'start')
    self.mox.StubOutWithMock(wsgi_server._SELECT_THREAD, 'add_socket')
    wsgi_server._SELECT_THREAD.add_socket(mox.IsA(socket.socket),
                                          self.server.tick)
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()

  def test_quit(self):
    """quit() unregisters the socket and stops the request pool."""
    self.mox.StubOutWithMock(wsgi_server._SELECT_THREAD, 'remove_socket')
    self.server.socket = object()
    self.server.requests = self.mox.CreateMock(
        wsgi_server._SharedCherryPyThreadPool)
    wsgi_server._SELECT_THREAD.remove_socket(self.server.socket)
    self.server.requests.stop(timeout=1)
    self.mox.ReplayAll()
    self.server.quit()
    self.mox.VerifyAll()


if __name__ == '__main__':
  # NOTE(review): the listing elides this body ("..."); unittest.main() is
  # the conventional entry point and is assumed here -- confirm upstream.
  unittest.main()
iperf_server_test.py
Source:iperf_server_test.py  
1#!/usr/bin/env python32#3#   Copyright 2019 - The Android Open Source Project4#5#   Licensed under the Apache License, Version 2.0 (the "License");6#   you may not use this file except in compliance with the License.7#   You may obtain a copy of the License at8#9#       http://www.apache.org/licenses/LICENSE-2.010#11#   Unless required by applicable law or agreed to in writing, software12#   distributed under the License is distributed on an "AS IS" BASIS,13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14#   See the License for the specific language governing permissions and15#   limitations under the License.16import logging17import unittest18import mock19import os20from acts.controllers import iperf_server21from acts.controllers.iperf_server import IPerfServer22from acts.controllers.iperf_server import IPerfServerOverAdb23from acts.controllers.iperf_server import IPerfServerOverSsh24# The position in the call tuple that represents the args array.25ARGS = 026# The position in the call tuple that represents the kwargs dict.27KWARGS = 128MOCK_LOGFILE_PATH = '/path/to/foo'29class IPerfServerModuleTest(unittest.TestCase):30    """Tests the acts.controllers.iperf_server module."""31    def test_create_creates_local_iperf_server_with_int(self):32        self.assertIsInstance(33            iperf_server.create([12345])[0],34            IPerfServer,35            'create() failed to create IPerfServer for integer input.'36        )37    def test_create_creates_local_iperf_server_with_str(self):38        self.assertIsInstance(39            iperf_server.create(['12345'])[0],40            IPerfServer,41            'create() failed to create IPerfServer for integer input.'42        )43    def test_create_cannot_create_local_iperf_server_with_bad_str(self):44        with self.assertRaises(ValueError):45            iperf_server.create(['12345BAD_STRING'])46    @mock.patch('acts.controllers.iperf_server.utils')47    def 
test_create_creates_server_over_ssh_with_ssh_config_and_port(self, _):48        self.assertIsInstance(49            iperf_server.create([{'ssh_config': {'user': '', 'host': ''},50                                  'port': ''}])[0],51            IPerfServerOverSsh,52            'create() failed to create IPerfServerOverSsh for a valid config.'53        )54    def test_create_creates_server_over_adb_with_proper_config(self):55        self.assertIsInstance(56            iperf_server.create([{'AndroidDevice': '53R147', 'port': 0}])[0],57            IPerfServerOverAdb,58            'create() failed to create IPerfServerOverAdb for a valid config.'59        )60    def test_create_raises_value_error_on_bad_config_dict(self):61        with self.assertRaises(ValueError):62            iperf_server.create([{'AndroidDevice': '53R147', 'ssh_config': {}}])63    def test_get_port_from_ss_output_returns_correct_port_ipv4(self):64        ss_output = ('tcp LISTEN  0 5 127.0.0.1:<PORT>  *:*'65                     ' users:(("cmd",pid=<PID>,fd=3))')66        self.assertEqual(67            iperf_server._get_port_from_ss_output(ss_output, '<PID>'), '<PORT>')68    def test_get_port_from_ss_output_returns_correct_port_ipv6(self):69        ss_output = ('tcp LISTEN  0 5 ff:ff:ff:ff:ff:ff:<PORT>  *:*'70                     ' users:(("cmd",pid=<PID>,fd=3))')71        self.assertEqual(72            iperf_server._get_port_from_ss_output(ss_output, '<PID>'), '<PORT>')73class IPerfServerBaseTest(unittest.TestCase):74    """Tests acts.controllers.iperf_server.IPerfServerBase."""75    @mock.patch('os.makedirs')76    def test_get_full_file_path_creates_parent_directory(self, mock_makedirs):77        # Will never actually be created/used.78        logging.log_path = '/tmp/unit_test_garbage'79        server = IPerfServer('port')80        full_file_path = server._get_full_file_path()81        self.assertTrue(82            mock_makedirs.called,83            'Did not attempt to create a directory.'84       
 )85        self.assertEqual(86            os.path.dirname(full_file_path),87            mock_makedirs.call_args[ARGS][0],88            'The parent directory of the full file path was not created.'89        )90class IPerfServerTest(unittest.TestCase):91    """Tests acts.controllers.iperf_server.IPerfServer."""92    PID = 12345693    def setUp(self):94        iperf_server._get_port_from_ss_output = lambda *_: IPerfServerTest.PID95    @mock.patch('builtins.open')96    @mock.patch('acts.controllers.iperf_server.subprocess')97    @mock.patch('acts.controllers.iperf_server.job')98    def test_start_makes_started_true(self, mock_job, __, ___):99        """Tests calling start() without calling stop() makes started True."""100        server = IPerfServer('port')101        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH102        server.start()103        self.assertTrue(server.started)104    @mock.patch('builtins.open')105    @mock.patch('acts.controllers.iperf_server.subprocess')106    @mock.patch('acts.controllers.iperf_server.job')107    def test_start_stop_makes_started_false(self, _, __, ___):108        """Tests calling start() without calling stop() makes started True."""109        server = IPerfServer('port')110        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH111        server.start()112        server.stop()113        self.assertFalse(server.started)114    @mock.patch('builtins.open')115    @mock.patch('acts.controllers.iperf_server.subprocess')116    @mock.patch('acts.controllers.iperf_server.job')117    def test_start_sets_current_log_file(self, _, __, ___):118        server = IPerfServer('port')119        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH120        server.start()121        self.assertEqual(122            server._current_log_file,123            MOCK_LOGFILE_PATH,124            'The _current_log_file was not received from _get_full_file_path.'125        )126    @mock.patch('builtins.open')127    
@mock.patch('acts.controllers.iperf_server.subprocess')128    def test_stop_returns_current_log_file(self, _, __):129        server = IPerfServer('port')130        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH131        server._current_log_file = MOCK_LOGFILE_PATH132        server._iperf_process = mock.Mock()133        log_file = server.stop()134        self.assertEqual(135            log_file,136            MOCK_LOGFILE_PATH,137            'The _current_log_file was not returned by stop().'138        )139    @mock.patch('builtins.open')140    @mock.patch('acts.controllers.iperf_server.subprocess')141    @mock.patch('acts.controllers.iperf_server.job')142    def test_start_does_not_run_two_concurrent_processes(self, start_proc, _, __):143        server = IPerfServer('port')144        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH145        server._iperf_process = mock.Mock()146        server.start()147        self.assertFalse(148            start_proc.called,149            'start() should not begin a second process if another is running.'150        )151    @mock.patch('acts.utils.stop_standing_subprocess')152    def test_stop_exits_early_if_no_process_has_started(self, stop_proc):153        server = IPerfServer('port')154        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH155        server._iperf_process = None156        server.stop()157        self.assertFalse(158            stop_proc.called,159            'stop() should not kill a process if no process is running.'160        )161class IPerfServerOverSshTest(unittest.TestCase):162    """Tests acts.controllers.iperf_server.IPerfServerOverSsh."""163    INIT_ARGS = [{'host': 'TEST_HOST', 'user': 'test'}, 'PORT']164    @mock.patch('acts.controllers.iperf_server.connection')165    def test_start_makes_started_true(self, _):166        """Tests calling start() without calling stop() makes started True."""167        server = IPerfServerOverSsh(*self.INIT_ARGS)168        server._ssh_session 
= mock.Mock()169        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH170        server.start()171        self.assertTrue(server.started)172    @mock.patch('builtins.open')173    @mock.patch('acts.controllers.iperf_server.connection')174    def test_start_stop_makes_started_false(self, _, __):175        """Tests calling start() without calling stop() makes started True."""176        server = IPerfServerOverSsh(*self.INIT_ARGS)177        server._ssh_session = mock.Mock()178        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH179        server.start()180        server.stop()181        self.assertFalse(server.started)182    @mock.patch('builtins.open')183    @mock.patch('acts.controllers.iperf_server.connection')184    def test_stop_returns_expected_log_file(self, _, __):185        server = IPerfServerOverSsh(*self.INIT_ARGS)186        server._ssh_session = mock.Mock()187        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH188        server._iperf_pid = mock.Mock()189        log_file = server.stop()190        self.assertEqual(191            log_file,192            MOCK_LOGFILE_PATH,193            'The expected log file was not returned by stop().'194        )195    @mock.patch('acts.controllers.iperf_server.connection')196    def test_start_does_not_run_two_concurrent_processes(self, _):197        server = IPerfServerOverSsh(*self.INIT_ARGS)198        server._ssh_session = mock.Mock()199        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH200        server._iperf_pid = mock.Mock()201        server.start()202        self.assertFalse(203            server._ssh_session.run_async.called,204            'start() should not begin a second process if another is running.'205        )206    @mock.patch('acts.utils.stop_standing_subprocess')207    @mock.patch('acts.controllers.iperf_server.connection')208    def test_stop_exits_early_if_no_process_has_started(self, _, __):209        server = IPerfServerOverSsh(*self.INIT_ARGS)210        
server._ssh_session = mock.Mock()211        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH212        server._iperf_pid = None213        server.stop()214        self.assertFalse(215            server._ssh_session.run_async.called,216            'stop() should not kill a process if no process is running.'217        )218class IPerfServerOverAdbTest(unittest.TestCase):219    """Tests acts.controllers.iperf_server.IPerfServerOverSsh."""220    ANDROID_DEVICE_PROP = ('acts.controllers.iperf_server.'221                           'IPerfServerOverAdb._android_device')222    @mock.patch(ANDROID_DEVICE_PROP)223    def test_start_makes_started_true(self, mock_ad):224        """Tests calling start() without calling stop() makes started True."""225        server = IPerfServerOverAdb('53R147', 'PORT')226        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH227        mock_ad.adb.shell.return_value = '<PID>'228        server.start()229        self.assertTrue(server.started)230    @mock.patch('acts.libs.proc.job.run')231    @mock.patch('builtins.open')232    @mock.patch(ANDROID_DEVICE_PROP)233    def test_start_stop_makes_started_false(self, mock_ad, _, __):234        """Tests calling start() without calling stop() makes started True."""235        server = IPerfServerOverAdb('53R147', 'PORT')236        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH237        mock_ad.adb.shell.side_effect = ['<PID>', '', '', '']238        server.start()239        server.stop()240        self.assertFalse(server.started)241    @mock.patch('acts.libs.proc.job.run')242    @mock.patch('builtins.open')243    @mock.patch(ANDROID_DEVICE_PROP)244    def test_stop_returns_expected_log_file(self, mock_ad, _, __):245        server = IPerfServerOverAdb('53R147', 'PORT')246        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH247        server._iperf_process = mock.Mock()248        server._iperf_process_adb_pid = '<PID>'249        mock_ad.adb.shell.side_effect = ['', '', 
'']250        log_file = server.stop()251        self.assertEqual(252            log_file,253            MOCK_LOGFILE_PATH,254            'The expected log file was not returned by stop().'255        )256    @mock.patch(ANDROID_DEVICE_PROP)257    def test_start_does_not_run_two_concurrent_processes(self, android_device):258        server = IPerfServerOverAdb('53R147', 'PORT')259        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH260        server._iperf_process = mock.Mock()261        server.start()262        self.assertFalse(263            android_device.adb.shell_nb.called,264            'start() should not begin a second process if another is running.'265        )266    @mock.patch('acts.libs.proc.job.run')267    @mock.patch('builtins.open')268    @mock.patch(ANDROID_DEVICE_PROP)269    def test_stop_exits_early_if_no_process_has_started(self, android_device, _,270                                                        __):271        server = IPerfServerOverAdb('53R147', 'PORT')272        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH273        server._iperf_pid = None274        server.stop()275        self.assertFalse(276            android_device.adb.shell_nb.called,277            'stop() should not kill a process if no process is running.'278        )279if __name__ == '__main__':...fetchmail.py
Source:fetchmail.py  
1# -*- coding: utf-8 -*-2# Part of Odoo. See LICENSE file for full copyright and licensing details.3import logging4import poplib5from imaplib import IMAP4, IMAP4_SSL6from poplib import POP3, POP3_SSL7from odoo import api, fields, models, tools, _8from odoo.exceptions import UserError9_logger = logging.getLogger(__name__)10MAX_POP_MESSAGES = 5011MAIL_TIMEOUT = 6012# Workaround for Python 2.7.8 bug https://bugs.python.org/issue2390613poplib._MAXLINE = 6553614class FetchmailServer(models.Model):15    """Incoming POP/IMAP mail server account"""16    _name = 'fetchmail.server'17    _description = "POP/IMAP Server"18    _order = 'priority'19    name = fields.Char('Name', required=True)20    active = fields.Boolean('Active', default=True)21    state = fields.Selection([22        ('draft', 'Not Confirmed'),23        ('done', 'Confirmed'),24    ], string='Status', index=True, readonly=True, copy=False, default='draft')25    server = fields.Char(string='Server Name', readonly=True, help="Hostname or IP of the mail server", states={'draft': [('readonly', False)]})26    port = fields.Integer(readonly=True, states={'draft': [('readonly', False)]})27    type = fields.Selection([28        ('pop', 'POP Server'),29        ('imap', 'IMAP Server'),30        ('local', 'Local Server'),31    ], 'Server Type', index=True, required=True, default='pop')32    is_ssl = fields.Boolean('SSL/TLS', help="Connections are encrypted with SSL/TLS through a dedicated port (default: IMAPS=993, POP3S=995)")33    attach = fields.Boolean('Keep Attachments', help="Whether attachments should be downloaded. "34                                                     "If not enabled, incoming emails will be stripped of any attachments before being processed", default=True)35    original = fields.Boolean('Keep Original', help="Whether a full original copy of each email should be kept for reference "36                                                    "and attached to each processed message. 
This will usually double the size of your message database.")37    date = fields.Datetime(string='Last Fetch Date', readonly=True)38    user = fields.Char(string='Username', readonly=True, states={'draft': [('readonly', False)]})39    password = fields.Char(readonly=True, states={'draft': [('readonly', False)]})40    action_id = fields.Many2one('ir.actions.server', string='Server Action', help="Optional custom server action to trigger for each incoming mail, on the record that was created or updated by this mail")41    object_id = fields.Many2one('ir.model', string="Create a New Record", help="Process each incoming mail as part of a conversation "42                                                                                "corresponding to this document type. This will create "43                                                                                "new documents for new conversations, or attach follow-up "44                                                                                "emails to the existing conversations (documents).")45    priority = fields.Integer(string='Server Priority', readonly=True, states={'draft': [('readonly', False)]}, help="Defines the order of processing, lower values mean higher priority", default=5)46    message_ids = fields.One2many('mail.mail', 'fetchmail_server_id', string='Messages', readonly=True)47    configuration = fields.Text('Configuration', readonly=True)48    script = fields.Char(readonly=True, default='/mail/static/scripts/openerp_mailgate.py')49    @api.onchange('type', 'is_ssl', 'object_id')50    def onchange_server_type(self):51        self.port = 052        if self.type == 'pop':53            self.port = self.is_ssl and 995 or 11054        elif self.type == 'imap':55            self.port = self.is_ssl and 993 or 14356        else:57            self.server = ''58        conf = {59            'dbname': self.env.cr.dbname,60            'uid': self.env.uid,61            'model': self.object_id.model if 
self.object_id else 'MODELNAME'62        }63        self.configuration = """64            Use the below script with the following command line options with your Mail Transport Agent (MTA)65            openerp_mailgate.py --host=HOSTNAME --port=PORT -u %(uid)d -p PASSWORD -d %(dbname)s66            Example configuration for the postfix mta running locally:67            /etc/postfix/virtual_aliases:68            @youdomain openerp_mailgate@localhost69            /etc/aliases:70            openerp_mailgate: "|/path/to/openerp-mailgate.py --host=localhost -u %(uid)d -p PASSWORD -d %(dbname)s"71        """ % conf72    @api.model73    def create(self, values):74        res = super(FetchmailServer, self).create(values)75        self._update_cron()76        return res77    @api.multi78    def write(self, values):79        res = super(FetchmailServer, self).write(values)80        self._update_cron()81        return res82    @api.multi83    def unlink(self):84        res = super(FetchmailServer, self).unlink()85        self._update_cron()86        return res87    @api.multi88    def set_draft(self):89        self.write({'state': 'draft'})90        return True91    @api.multi92    def connect(self):93        self.ensure_one()94        if self.type == 'imap':95            if self.is_ssl:96                connection = IMAP4_SSL(self.server, int(self.port))97            else:98                connection = IMAP4(self.server, int(self.port))99            connection.login(self.user, self.password)100        elif self.type == 'pop':101            if self.is_ssl:102                connection = POP3_SSL(self.server, int(self.port))103            else:104                connection = POP3(self.server, int(self.port))105            #TODO: use this to remove only unread messages106            #connection.user("recent:"+server.user)107            connection.user(self.user)108            connection.pass_(self.password)109        # Add timeout on socket110        
connection.sock.settimeout(MAIL_TIMEOUT)111        return connection112    @api.multi113    def button_confirm_login(self):114        for server in self:115            try:116                connection = server.connect()117                server.write({'state': 'done'})118            except Exception as err:119                _logger.info("Failed to connect to %s server %s.", server.type, server.name, exc_info=True)120                raise UserError(_("Connection test failed: %s") % tools.ustr(err))121            finally:122                try:123                    if connection:124                        if server.type == 'imap':125                            connection.close()126                        elif server.type == 'pop':127                            connection.quit()128                except Exception:129                    # ignored, just a consequence of the previous exception130                    pass131        return True132    @api.model133    def _fetch_mails(self):134        """ Method called by cron to fetch mails from servers """135        return self.search([('state', '=', 'done'), ('type', 'in', ['pop', 'imap'])]).fetch_mail()136    @api.multi137    def fetch_mail(self):138        """ WARNING: meant for cron usage only - will commit() after each email! 
"""139        additionnal_context = {140            'fetchmail_cron_running': True141        }142        MailThread = self.env['mail.thread']143        for server in self:144            _logger.info('start checking for new emails on %s server %s', server.type, server.name)145            additionnal_context['fetchmail_server_id'] = server.id146            additionnal_context['server_type'] = server.type147            count, failed = 0, 0148            imap_server = None149            pop_server = None150            if server.type == 'imap':151                try:152                    imap_server = server.connect()153                    imap_server.select()154                    result, data = imap_server.search(None, '(UNSEEN)')155                    for num in data[0].split():156                        res_id = None157                        result, data = imap_server.fetch(num, '(RFC822)')158                        imap_server.store(num, '-FLAGS', '\\Seen')159                        try:160                            res_id = MailThread.with_context(**additionnal_context).message_process(server.object_id.model, data[0][1], save_original=server.original, strip_attachments=(not server.attach))161                        except Exception:162                            _logger.info('Failed to process mail from %s server %s.', server.type, server.name, exc_info=True)163                            failed += 1164                        if res_id and server.action_id:165                            server.action_id.with_context({166                                'active_id': res_id,167                                'active_ids': [res_id],168                                'active_model': self.env.context.get("thread_model", server.object_id.model)169                            }).run()170                        imap_server.store(num, '+FLAGS', '\\Seen')171                        self._cr.commit()172                        count += 1173                    
_logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", count, server.type, server.name, (count - failed), failed)174                except Exception:175                    _logger.info("General failure when trying to fetch mail from %s server %s.", server.type, server.name, exc_info=True)176                finally:177                    if imap_server:178                        imap_server.close()179                        imap_server.logout()180            elif server.type == 'pop':181                try:182                    while True:183                        pop_server = server.connect()184                        (num_messages, total_size) = pop_server.stat()185                        pop_server.list()186                        for num in range(1, min(MAX_POP_MESSAGES, num_messages) + 1):187                            (header, messages, octets) = pop_server.retr(num)188                            message = (b'\n').join(messages)189                            res_id = None190                            try:191                                res_id = MailThread.with_context(**additionnal_context).message_process(server.object_id.model, message, save_original=server.original, strip_attachments=(not server.attach))192                                pop_server.dele(num)193                            except Exception:194                                _logger.info('Failed to process mail from %s server %s.', server.type, server.name, exc_info=True)195                                failed += 1196                            if res_id and server.action_id:197                                server.action_id.with_context({198                                    'active_id': res_id,199                                    'active_ids': [res_id],200                                    'active_model': self.env.context.get("thread_model", server.object_id.model)201                                }).run()202                            
self.env.cr.commit()203                        if num_messages < MAX_POP_MESSAGES:204                            break205                        pop_server.quit()206                        _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", num_messages, server.type, server.name, (num_messages - failed), failed)207                except Exception:208                    _logger.info("General failure when trying to fetch mail from %s server %s.", server.type, server.name, exc_info=True)209                finally:210                    if pop_server:211                        pop_server.quit()212            server.write({'date': fields.Datetime.now()})213        return True214    @api.model215    def _update_cron(self):216        if self.env.context.get('fetchmail_cron_running'):217            return218        try:219            # Enabled/Disable cron based on the number of 'done' server of type pop or imap220            cron = self.env.ref('fetchmail.ir_cron_mail_gateway_action')221            cron.toggle(model=self._name, domain=[('state', '=', 'done'), ('type', 'in', ['pop', 'imap'])])222        except ValueError:...socket_test_utils.py
Source:socket_test_utils.py  
1#2#   Copyright 2018 - The Android Open Source Project3#4#   Licensed under the Apache License, Version 2.0 (the "License");5#   you may not use this file except in compliance with the License.6#   You may obtain a copy of the License at7#8#       http://www.apache.org/licenses/LICENSE-2.09#10#   Unless required by applicable law or agreed to in writing, software11#   distributed under the License is distributed on an "AS IS" BASIS,12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13#   See the License for the specific language governing permissions and14#   limitations under the License.15import queue16import re17import threading18import time19from acts.test_utils.net import connectivity_const as cconst20from acts import asserts21MSG = "Test message "22PKTS = 523""" Methods for android.system.Os based sockets """24def open_android_socket(ad, domain, sock_type, ip, port):25    """ Open TCP or UDP using android.system.Os class26    Args:27      1. ad - android device object28      2. domain - IPv4 or IPv6 type29      3. sock_type - UDP or TCP socket30      4. ip - IP addr on the device31      5. port - open socket on port32    Returns:33      File descriptor key34    """35    fd_key = ad.droid.openSocket(domain, sock_type, ip, port)36    ad.log.info("File descriptor: %s" % fd_key)37    asserts.assert_true(fd_key, "Failed to open socket")38    return fd_key39def close_android_socket(ad, fd_key):40    """ Close socket41    Args:42      1. ad - android device object43      2. fd_key - file descriptor key44    """45    status = ad.droid.closeSocket(fd_key)46    asserts.assert_true(status, "Failed to close socket")47def listen_accept_android_socket(client,48                                 server,49                                 client_fd,50                                 server_fd,51                                 server_ip,52                                 server_port):53    """ Listen, accept TCP sockets54    Args:55      1. 
client : ad object for client device56      2. server : ad object for server device57      3. client_fd : client's socket handle58      4. server_fd : server's socket handle59      5. server_ip : send data to this IP60      6. server_port : send data to this port61    """62    server.droid.listenSocket(server_fd)63    client.droid.connectSocket(client_fd, server_ip, server_port)64    sock = server.droid.acceptSocket(server_fd)65    asserts.assert_true(sock, "Failed to accept socket")66    return sock67def send_recv_data_android_sockets(client,68                                   server,69                                   client_fd,70                                   server_fd,71                                   server_ip,72                                   server_port):73    """ Send TCP or UDP data over android os sockets from client to server.74        Verify that server received the data.75    Args:76      1. client : ad object for client device77      2. server : ad object for server device78      3. client_fd : client's socket handle79      4. server_fd : server's socket handle80      5. server_ip : send data to this IP81      6. server_port : send data to this port82    """83    send_list = []84    recv_list = []85    for _ in range(1, PKTS+1):86        msg = MSG + " %s" % _87        send_list.append(msg)88        client.log.info("Sending message: %s" % msg)89        client.droid.sendDataOverSocket(server_ip, server_port, msg, client_fd)90        recv_msg = server.droid.recvDataOverSocket(server_fd)91        server.log.info("Received message: %s" % recv_msg)92        recv_list.append(recv_msg)93    recv_list = [x.rstrip('\x00') if x else x for x in recv_list]94    asserts.assert_true(send_list and recv_list and send_list == recv_list,95                        "Send and recv information is incorrect")96""" Methods for java.net.DatagramSocket based sockets """97def open_datagram_socket(ad, ip, port):98    """ Open datagram socket99    Args:100      1. 
ad : android device object101      2. ip : IP addr on the device102      3. port : socket port103    Returns:104      Hash key of the datagram socket105    """106    socket_key = ad.droid.openDatagramSocket(ip, port)107    ad.log.info("Datagram socket: %s" % socket_key)108    asserts.assert_true(socket_key, "Failed to open datagram socket")109    return socket_key110def close_datagram_socket(ad, socket_key):111    """ Close datagram socket112    Args:113      1. socket_key : hash key of datagram socket114    """115    status = ad.droid.closeDatagramSocket(socket_key)116    asserts.assert_true(status, "Failed to close datagram socket")117def send_recv_data_datagram_sockets(client,118                                    server,119                                    client_sock,120                                    server_sock,121                                    server_ip,122                                    server_port):123    """ Send data over datagram socket from dut_a to dut_b.124        Verify that dut_b received the data.125    Args:126      1. client : ad object for client device127      2. server : ad object for server device128      3. client_sock : client's socket handle129      4. server_sock : server's socket handle130      5. server_ip : send data to this IP131      6. 
server_port : send data to this port132    """133    send_list = []134    recv_list = []135    for _ in range(1, PKTS+1):136        msg = MSG + " %s" % _137        send_list.append(msg)138        client.log.info("Sending message: %s" % msg)139        client.droid.sendDataOverDatagramSocket(client_sock,140                                                msg,141                                                server_ip,142                                                server_port)143        recv_msg = server.droid.recvDataOverDatagramSocket(server_sock)144        server.log.info("Received message: %s" % recv_msg)145        recv_list.append(recv_msg)146    recv_list = [x.rstrip('\x00') if x else x for x in recv_list]147    asserts.assert_true(send_list and recv_list and send_list == recv_list,148                        "Send and recv information is incorrect")149""" Utils methods for java.net.Socket based sockets """150def _accept_socket(server, server_ip, server_port, server_sock, q):151    sock = server.droid.acceptTcpSocket(server_sock)152    server.log.info("Server socket: %s" % sock)153    q.put(sock)154def _client_socket(client, server_ip, server_port, client_ip, client_port, q):155    time.sleep(0.5)156    sock = client.droid.openTcpSocket(server_ip,157                                      server_port,158                                      client_ip,159                                      client_port)160    client.log.info("Client socket: %s" % sock)161    q.put(sock)162def open_connect_socket(client,163                        server,164                        client_ip,165                        server_ip,166                        client_port,167                        server_port,168                        server_sock):169    """ Open tcp socket and connect to server170    Args:171      1. client : ad object for client device172      2. server : ad object for server device173      3. client_ip : client's socket handle174      4. 
server_ip : send data to this IP175      5. client_port : port on client socket176      6. server_port : port on server socket177      7. server_sock : server socket178    Returns:179      client and server socket from successful connect180    """181    sq = queue.Queue()182    cq = queue.Queue()183    s = threading.Thread(target = _accept_socket,184                         args = (server, server_ip, server_port, server_sock,185                                 sq))186    c = threading.Thread(target = _client_socket,187                         args = (client, server_ip, server_port, client_ip,188                                 client_port, cq))189    s.start()190    c.start()191    c.join()192    s.join()193    client_sock = cq.get()194    server_sock = sq.get()195    asserts.assert_true(client_sock and server_sock, "Failed to open sockets")196    return client_sock, server_sock197def open_server_socket(server, server_ip, server_port):198    """ Open tcp server socket199    Args:200      1. server : ad object for server device201      2. server_ip : send data to this IP202      3. server_port : send data to this port203    """204    sock = server.droid.openTcpServerSocket(server_ip, server_port)205    server.log.info("Server Socket: %s" % sock)206    asserts.assert_true(sock, "Failed to open server socket")207    return sock208def close_socket(ad, socket):209    """ Close socket210    Args:211      1. ad - android device object212      2. socket - socket key213    """214    status = ad.droid.closeTcpSocket(socket)215    asserts.assert_true(status, "Failed to socket")216def close_server_socket(ad, socket):217    """ Close server socket218    Args:219      1. ad - android device object220      2. 
socket - server socket key221    """222    status = ad.droid.closeTcpServerSocket(socket)223    asserts.assert_true(status, "Failed to socket")224def send_recv_data_sockets(client, server, client_sock, server_sock):225    """ Send data over TCP socket from client to server.226        Verify that server received the data227    Args:228      1. client : ad object for client device229      2. server : ad object for server device230      3. client_sock : client's socket handle231      4. server_sock : server's socket handle232    """233    send_list = []234    recv_list = []235    for _ in range(1, PKTS+1):236        msg = MSG + " %s" % _237        send_list.append(msg)238        client.log.info("Sending message: %s" % msg)239        client.droid.sendDataOverTcpSocket(client_sock, msg)240        recv_msg = server.droid.recvDataOverTcpSocket(server_sock)241        server.log.info("Received message: %s" % recv_msg)242        recv_list.append(recv_msg)243    recv_list = [x.rstrip('\x00') if x else x for x in recv_list]244    asserts.assert_true(send_list and recv_list and send_list == recv_list,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
