How to use server method in Cypress

Best Python code snippets using server (the examples below are Python unit tests, not JavaScript/Cypress code)

wsgi_server_test.py

Source:wsgi_server_test.py Github

copy

Full Screen

#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for google.appengine.tools.devappserver2.wsgi_server."""

import errno
import json
import os
import select
import socket
import sys
import time
import unittest
import urllib2

import google
import mox

from cherrypy import wsgiserver

from google.appengine.tools.devappserver2 import wsgi_server


class TestError(Exception):
  """Sentinel exception raised by mocks so tests can assert propagation."""
  pass


class _SingleAddressWsgiServerTest(unittest.TestCase):
  """Tests a started _SingleAddressWsgiServer using real HTTP requests."""

  def setUp(self):
    super(_SingleAddressWsgiServerTest, self).setUp()
    # Port 0 is passed here; the tests read the port back from
    # self.server.port when building request URLs.
    self.server = wsgi_server._SingleAddressWsgiServer(('localhost', 0),
                                                       self.wsgi_application)
    self.server.start()

  def tearDown(self):
    super(_SingleAddressWsgiServerTest, self).tearDown()
    self.server.quit()

  def test_serve(self):
    """A request gets a 200 and the echoed environ has the path and query."""
    result = urllib2.urlopen('http://localhost:%d/foo?bar=baz' %
                             self.server.port)
    body = result.read()
    environ = json.loads(body)
    self.assertEqual(200, result.code)
    self.assertEqual('/foo', environ['PATH_INFO'])
    self.assertEqual('bar=baz', environ['QUERY_STRING'])

  def wsgi_application(self, environ, start_response):
    """WSGI app that returns the request environ serialized as JSON."""
    start_response('200 OK', [('Content-Type', 'application/json')])
    serializable_environ = environ.copy()
    # These two entries are removed before json.dumps is called.
    del serializable_environ['wsgi.input']
    del serializable_environ['wsgi.errors']
    return [json.dumps(serializable_environ)]

  def other_wsgi_application(self, environ, start_response):
    """WSGI app that returns a fixed plain-text body."""
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['Hello World']

  def test_set_app(self):
    """After set_app(), requests are answered by the replacement app."""
    self.server.set_app(self.other_wsgi_application)
    result = urllib2.urlopen('http://localhost:%d/foo?bar=baz' %
                             self.server.port)
    body = result.read()
    self.assertEqual(200, result.code)
    self.assertEqual('Hello World', body)

  def test_set_error(self):
    """After set_error(204), requests are answered with status 204."""
    self.server.set_error(204)
    result = urllib2.urlopen('http://localhost:%d/foo?bar=baz' %
                             self.server.port)
    self.assertEqual(204, result.code)


class SharedCherryPyThreadPoolTest(unittest.TestCase):
  """Tests _SharedCherryPyThreadPool with _THREAD_POOL.submit stubbed out."""

  def setUp(self):
    self.mox = mox.Mox()
    self.mox.StubOutWithMock(wsgi_server._THREAD_POOL, 'submit')
    self.thread_pool = wsgi_server._SharedCherryPyThreadPool()

  def tearDown(self):
    self.mox.UnsetStubs()

  def test_put(self):
    """put() submits _handle to the thread pool and tracks the connection."""
    connection = object()
    wsgi_server._THREAD_POOL.submit(self.thread_pool._handle, connection)
    self.mox.ReplayAll()
    self.thread_pool.put(connection)
    self.mox.VerifyAll()
    self.assertEqual(set([connection]), self.thread_pool._connections)

  def test_handle(self):
    """_handle() communicates, closes, notifies and forgets the connection."""
    connection = self.mox.CreateMock(wsgiserver.HTTPConnection)
    self.mox.StubOutWithMock(self.thread_pool._condition, 'notify')
    self.thread_pool._connections.add(connection)
    connection.communicate()
    connection.close()
    self.thread_pool._condition.notify()
    self.mox.ReplayAll()
    self.thread_pool._handle(connection)
    self.mox.VerifyAll()
    self.assertEqual(set(), self.thread_pool._connections)

  def test_handle_with_exception(self):
    """_handle() still closes, notifies and cleans up when communicate fails."""
    connection = self.mox.CreateMock(wsgiserver.HTTPConnection)
    self.mox.StubOutWithMock(self.thread_pool._condition, 'notify')
    self.thread_pool._connections.add(connection)
    connection.communicate().AndRaise(TestError)
    connection.close()
    self.thread_pool._condition.notify()
    self.mox.ReplayAll()
    self.assertRaises(TestError, self.thread_pool._handle, connection)
    self.mox.VerifyAll()
    self.assertEqual(set(), self.thread_pool._connections)

  def test_stop(self):
    """stop(3) with no tracked connections makes no mocked calls."""
    self.mox.ReplayAll()
    self.thread_pool.stop(3)
    self.mox.VerifyAll()

  def test_stop_no_connections(self):
    """stop(0.1) with no tracked connections makes no mocked calls."""
    self.mox.ReplayAll()
    self.thread_pool.stop(0.1)
    self.mox.VerifyAll()

  def test_stop_with_connections(self):
    """stop() calls _shutdown_connection for a still-tracked connection."""
    connection = self.mox.CreateMock(wsgiserver.HTTPConnection)
    self.thread_pool._connections.add(connection)
    self.mox.StubOutWithMock(self.thread_pool, '_shutdown_connection')
    self.thread_pool._shutdown_connection(connection)
    self.mox.ReplayAll()
    self.thread_pool.stop(1)
    self.mox.VerifyAll()

  def test_shutdown_connection(self):
    """An open rfile leads to socket.shutdown(SHUT_RD) on the connection."""

    class DummyObject(object):
      pass

    connection = DummyObject()
    connection.rfile = DummyObject()
    connection.rfile.closed = False
    connection.socket = self.mox.CreateMockAnything()
    connection.socket.shutdown(socket.SHUT_RD)
    self.mox.ReplayAll()
    self.thread_pool._shutdown_connection(connection)
    self.mox.VerifyAll()

  def test_shutdown_connection_rfile_already_close(self):
    """An already-closed rfile means the socket mock receives no calls."""

    class DummyObject(object):
      pass

    connection = DummyObject()
    connection.rfile = DummyObject()
    connection.rfile.closed = True
    # No expectations are recorded on the socket mock, so any call on it
    # makes the test fail.
    connection.socket = self.mox.CreateMockAnything()
    self.mox.ReplayAll()
    self.thread_pool._shutdown_connection(connection)
    self.mox.VerifyAll()


class SelectThreadTest(unittest.TestCase):
  """Tests SelectThread with select.select, select.poll and sleep mocked."""

  class _MockSocket(object):
    """Socket stand-in whose fileno() is its own id()."""

    def fileno(self):
      return id(self)

  def setUp(self):
    self.select_thread = wsgi_server.SelectThread()
    # Saved so tearDown can undo tests that force _HAS_POLL to False.
    self.original_has_poll = wsgi_server._HAS_POLL
    self.mox = mox.Mox()
    self.mox.StubOutWithMock(select, 'select')
    if hasattr(select, 'poll'):
      self.mox.StubOutWithMock(select, 'poll')
    self.mox.StubOutWithMock(time, 'sleep')

  def tearDown(self):
    self.mox.UnsetStubs()
    wsgi_server._HAS_POLL = self.original_has_poll

  def test_add_socket(self):
    """add_socket() registers the socket without mutating the old containers."""
    file_descriptors = self.select_thread._file_descriptors
    file_descriptor_to_callback = (
        self.select_thread._file_descriptor_to_callback)
    file_descriptors_copy = frozenset(self.select_thread._file_descriptors)
    file_descriptor_to_callback_copy = (
        self.select_thread._file_descriptor_to_callback.copy())
    s = self._MockSocket()
    callback = object()
    self.select_thread.add_socket(s, callback)
    # The objects referenced before the call must be unchanged...
    self.assertEqual(file_descriptors_copy, file_descriptors)
    self.assertEqual(file_descriptor_to_callback_copy,
                     file_descriptor_to_callback)
    # ...while the attributes now describe the added socket.
    self.assertEqual(frozenset([s.fileno()]),
                     self.select_thread._file_descriptors)
    self.assertEqual({s.fileno(): callback},
                     self.select_thread._file_descriptor_to_callback)

  def test_remove_socket(self):
    """remove_socket() drops the socket without mutating the old containers."""
    s1 = self._MockSocket()
    callback1 = object()
    s2 = self._MockSocket()
    callback2 = object()
    self.select_thread._file_descriptors = frozenset([s1.fileno(), s2.fileno()])
    self.select_thread._file_descriptor_to_callback = {
        s1.fileno(): callback1, s2.fileno(): callback2}
    file_descriptors = self.select_thread._file_descriptors
    file_descriptor_to_callback = (
        self.select_thread._file_descriptor_to_callback)
    file_descriptors_copy = frozenset(self.select_thread._file_descriptors)
    file_descriptor_to_callback_copy = (
        self.select_thread._file_descriptor_to_callback.copy())
    self.select_thread.remove_socket(s1)
    # The objects referenced before the call must be unchanged...
    self.assertEqual(file_descriptors_copy, file_descriptors)
    self.assertEqual(file_descriptor_to_callback_copy,
                     file_descriptor_to_callback)
    # ...while the attributes now only describe the remaining socket.
    self.assertEqual(frozenset([s2.fileno()]),
                     self.select_thread._file_descriptors)
    self.assertEqual({s2.fileno(): callback2},
                     self.select_thread._file_descriptor_to_callback)

  def test_select_no_sockets(self):
    """With no sockets registered, _select() calls time.sleep(1)."""
    time.sleep(1)
    self.mox.ReplayAll()
    self.select_thread._select()
    self.mox.VerifyAll()

  def test_select_no_poll(self):
    """Without poll, a descriptor returned by select.select runs its callback."""
    wsgi_server._HAS_POLL = False
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    select.select(frozenset([s.fileno()]), [], [], 1).AndReturn(
        ([s.fileno()], [], []))
    callback()
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()

  @unittest.skipUnless(wsgi_server._HAS_POLL, 'requires select.poll')
  def test_select_with_poll(self):
    """With poll, a descriptor returned by poll() runs its callback."""
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    poll = self.mox.CreateMockAnything()
    select.poll().AndReturn(poll)
    poll.register(s.fileno(), select.POLLIN)
    poll.poll(1000).AndReturn([(s.fileno(), select.POLLIN)])
    callback()
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()

  def test_select_not_ready_no_poll(self):
    """Without poll, an empty select.select result runs no callback."""
    wsgi_server._HAS_POLL = False
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    select.select(frozenset([s.fileno()]), [], [], 1).AndReturn(([], [], []))
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()

  @unittest.skipUnless(wsgi_server._HAS_POLL, 'requires select.poll')
  def test_select_not_ready_with_poll(self):
    """With poll, an empty poll() result runs no callback."""
    s = self._MockSocket()
    callback = self.mox.CreateMockAnything()
    poll = self.mox.CreateMockAnything()
    select.poll().AndReturn(poll)
    poll.register(s.fileno(), select.POLLIN)
    poll.poll(1000).AndReturn([])
    self.mox.ReplayAll()
    self.select_thread.add_socket(s, callback)
    self.select_thread._select()
    self.mox.VerifyAll()


class WsgiServerStartupTest(unittest.TestCase):
  """Tests WsgiServer start/quit on a fixed port with mocked sub-servers."""

  def setUp(self):
    self.mox = mox.Mox()
    self.server = wsgi_server.WsgiServer(('localhost', 123), None)

  def tearDown(self):
    self.mox.UnsetStubs()

  def test_start_some_fail_to_bind(self):
    """start() succeeds and keeps only the servers that managed to start."""
    failing_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    starting_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    another_starting_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,
                       0, socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('foo', 'bar', 'baz')),
                            (None, None, None, None, (1, 2, 3, 4, 5)),
                            (None, None, None, None, (3, 4))])
    # Only the first two items of each sockaddr are expected to be passed on.
    wsgi_server._SingleAddressWsgiServer(('foo', 'bar'), None).AndReturn(
        failing_server)
    wsgi_server._SingleAddressWsgiServer((1, 2), None).AndReturn(
        starting_server)
    wsgi_server._SingleAddressWsgiServer((3, 4), None).AndReturn(
        another_starting_server)
    starting_server.start()
    failing_server.start().AndRaise(wsgi_server.BindError)
    another_starting_server.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([starting_server, another_starting_server],
                          self.server._servers)

  def test_start_all_fail_to_bind(self):
    """start() raises BindError when the only address fails to bind."""
    failing_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,
                       0, socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('foo', 'bar', 'baz'))])
    wsgi_server._SingleAddressWsgiServer(('foo', 'bar'), None).AndReturn(
        failing_server)
    failing_server.start().AndRaise(wsgi_server.BindError)
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()

  def test_remove_duplicates(self):
    """A repeated address from getaddrinfo yields only one server."""
    foo_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    foo2_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    # 127.0.0.1 appears twice, but only two constructor calls are expected.
    socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,
                       0, socket.AI_PASSIVE).AndReturn(
                           [(0, 0, 0, '', ('127.0.0.1', 123)),
                            (0, 0, 0, '', ('::1', 123, 0, 0)),
                            (0, 0, 0, '', ('127.0.0.1', 123))])
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 123), None).AndReturn(
        foo_server)
    foo_server.start()
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        foo2_server)
    foo2_server.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()

  def test_quit(self):
    """quit() calls quit() on each server in _servers."""
    running_server = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.server._servers = [running_server]
    running_server.quit()
    self.mox.ReplayAll()
    self.server.quit()
    self.mox.VerifyAll()


class WsgiServerPort0StartupTest(unittest.TestCase):
  """Tests WsgiServer.start() when the requested port is 0."""

  def setUp(self):
    self.mox = mox.Mox()
    self.server = wsgi_server.WsgiServer(('localhost', 0), None)

  def tearDown(self):
    self.mox.UnsetStubs()

  def test_basic_behavior(self):
    """The port reported by the first server is reused for the second one."""
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server, inet6_server],
                          self.server._servers)

  def test_retry_eaddrinuse(self):
    """EADDRINUSE on the second address quits the first and retries both."""
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet4_server_retry = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    inet6_server_retry = self.mox.CreateMock(
        wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    # First try
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start().AndRaise(
        wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))
    inet4_server.quit()
    # Retry
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server_retry)
    inet4_server_retry.start()
    inet4_server_retry.port = 456
    wsgi_server._SingleAddressWsgiServer(('::1', 456), None).AndReturn(
        inet6_server_retry)
    inet6_server_retry.start()
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server_retry, inet6_server_retry],
                          self.server._servers)

  def test_retry_limited(self):
    """start() raises BindError after _PORT_0_RETRIES EADDRINUSE failures."""
    inet4_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
                     for _ in range(wsgi_server._PORT_0_RETRIES)]
    inet6_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
                     for _ in range(wsgi_server._PORT_0_RETRIES)]
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    # Every attempt follows the same script: the first server starts on a
    # fresh port, the second fails with EADDRINUSE, and the first is quit.
    for offset, (inet4_server, inet6_server) in enumerate(zip(
        inet4_servers, inet6_servers)):
      wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
          inet4_server)
      inet4_server.start()
      inet4_server.port = offset + 1
      wsgi_server._SingleAddressWsgiServer(('::1', offset + 1), None).AndReturn(
          inet6_server)
      inet6_server.start().AndRaise(
          wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))
      inet4_server.quit()
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()

  def test_ignore_other_errors(self):
    """A non-EADDRINUSE BindError causes no retry; the first server is kept."""
    inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)
    self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(None, None, None, None, ('127.0.0.1', 0, 'baz')),
                            (None, None, None, None, ('::1', 0, 'baz'))])
    wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(
        inet4_server)
    inet4_server.start()
    inet4_server.port = 123
    wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(
        inet6_server)
    inet6_server.start().AndRaise(
        wsgi_server.BindError('message', (errno.ENOPROTOOPT, 'no protocol')))
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()
    self.assertItemsEqual([inet4_server],
                          self.server._servers)


class _SingleAddressWsgiServerStartupTest(unittest.TestCase):
  """Tests _SingleAddressWsgiServer start/quit with collaborators mocked."""

  def setUp(self):
    self.mox = mox.Mox()
    self.server = wsgi_server._SingleAddressWsgiServer(('localhost', 0), None)

  def tearDown(self):
    self.mox.UnsetStubs()

  def test_start_port_in_use(self):
    """A socket.error from bind() surfaces from start() as BindError."""
    self.mox.StubOutWithMock(socket, 'getaddrinfo')
    self.mox.StubOutWithMock(self.server, 'bind')
    af = object()
    socktype = object()
    proto = object()
    socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE).AndReturn(
                           [(af, socktype, proto, None, None)])
    self.server.bind(af, socktype, proto).AndRaise(socket.error)
    self.mox.ReplayAll()
    self.assertRaises(wsgi_server.BindError, self.server.start)
    self.mox.VerifyAll()

  def test_start(self):
    """start() registers a socket and self.server.tick with _SELECT_THREAD."""
    # Ensure no CherryPy thread pools are started.
    self.mox.StubOutWithMock(wsgiserver.ThreadPool, 'start')
    self.mox.StubOutWithMock(wsgi_server._SELECT_THREAD, 'add_socket')
    wsgi_server._SELECT_THREAD.add_socket(mox.IsA(socket.socket),
                                          self.server.tick)
    self.mox.ReplayAll()
    self.server.start()
    self.mox.VerifyAll()

  def test_quit(self):
    """quit() unregisters the socket and stops the request pool."""
    self.mox.StubOutWithMock(wsgi_server._SELECT_THREAD, 'remove_socket')
    self.server.socket = object()
    self.server.requests = self.mox.CreateMock(
        wsgi_server._SharedCherryPyThreadPool)
    wsgi_server._SELECT_THREAD.remove_socket(self.server.socket)
    self.server.requests.stop(timeout=1)
    self.mox.ReplayAll()
    self.server.quit()
    self.mox.VerifyAll()


if __name__ == '__main__':
  unittest.main()

Full Screen

Full Screen

iperf_server_test.py

Source:iperf_server_test.py Github

copy

Full Screen

1#!/usr/bin/env python32#3#   Copyright 2019 - The Android Open Source Project4#5#   Licensed under the Apache License, Version 2.0 (the "License");6#   you may not use this file except in compliance with the License.7#   You may obtain a copy of the License at8#9#       http://www.apache.org/licenses/LICENSE-2.010#11#   Unless required by applicable law or agreed to in writing, software12#   distributed under the License is distributed on an "AS IS" BASIS,13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14#   See the License for the specific language governing permissions and15#   limitations under the License.16import logging17import unittest18import mock19import os20from acts.controllers import iperf_server21from acts.controllers.iperf_server import IPerfServer22from acts.controllers.iperf_server import IPerfServerOverAdb23from acts.controllers.iperf_server import IPerfServerOverSsh24# The position in the call tuple that represents the args array.25ARGS = 026# The position in the call tuple that represents the kwargs dict.27KWARGS = 128MOCK_LOGFILE_PATH = '/path/to/foo'29class IPerfServerModuleTest(unittest.TestCase):30    """Tests the acts.controllers.iperf_server module."""31    def test_create_creates_local_iperf_server_with_int(self):32        self.assertIsInstance(33            iperf_server.create([12345])[0],34            IPerfServer,35            'create() failed to create IPerfServer for integer input.'36        )37    def test_create_creates_local_iperf_server_with_str(self):38        self.assertIsInstance(39            iperf_server.create(['12345'])[0],40            IPerfServer,41            'create() failed to create IPerfServer for integer input.'42        )43    def test_create_cannot_create_local_iperf_server_with_bad_str(self):44        with self.assertRaises(ValueError):45            iperf_server.create(['12345BAD_STRING'])46    @mock.patch('acts.controllers.iperf_server.utils')47    def 
test_create_creates_server_over_ssh_with_ssh_config_and_port(self, _):48        self.assertIsInstance(49            iperf_server.create([{'ssh_config': {'user': '', 'host': ''},50                                  'port': ''}])[0],51            IPerfServerOverSsh,52            'create() failed to create IPerfServerOverSsh for a valid config.'53        )54    def test_create_creates_server_over_adb_with_proper_config(self):55        self.assertIsInstance(56            iperf_server.create([{'AndroidDevice': '53R147', 'port': 0}])[0],57            IPerfServerOverAdb,58            'create() failed to create IPerfServerOverAdb for a valid config.'59        )60    def test_create_raises_value_error_on_bad_config_dict(self):61        with self.assertRaises(ValueError):62            iperf_server.create([{'AndroidDevice': '53R147', 'ssh_config': {}}])63    def test_get_port_from_ss_output_returns_correct_port_ipv4(self):64        ss_output = ('tcp LISTEN  0 5 127.0.0.1:<PORT>  *:*'65                     ' users:(("cmd",pid=<PID>,fd=3))')66        self.assertEqual(67            iperf_server._get_port_from_ss_output(ss_output, '<PID>'), '<PORT>')68    def test_get_port_from_ss_output_returns_correct_port_ipv6(self):69        ss_output = ('tcp LISTEN  0 5 ff:ff:ff:ff:ff:ff:<PORT>  *:*'70                     ' users:(("cmd",pid=<PID>,fd=3))')71        self.assertEqual(72            iperf_server._get_port_from_ss_output(ss_output, '<PID>'), '<PORT>')73class IPerfServerBaseTest(unittest.TestCase):74    """Tests acts.controllers.iperf_server.IPerfServerBase."""75    @mock.patch('os.makedirs')76    def test_get_full_file_path_creates_parent_directory(self, mock_makedirs):77        # Will never actually be created/used.78        logging.log_path = '/tmp/unit_test_garbage'79        server = IPerfServer('port')80        full_file_path = server._get_full_file_path()81        self.assertTrue(82            mock_makedirs.called,83            'Did not attempt to create a directory.'84       
 )85        self.assertEqual(86            os.path.dirname(full_file_path),87            mock_makedirs.call_args[ARGS][0],88            'The parent directory of the full file path was not created.'89        )90class IPerfServerTest(unittest.TestCase):91    """Tests acts.controllers.iperf_server.IPerfServer."""92    PID = 12345693    def setUp(self):94        iperf_server._get_port_from_ss_output = lambda *_: IPerfServerTest.PID95    @mock.patch('builtins.open')96    @mock.patch('acts.controllers.iperf_server.subprocess')97    @mock.patch('acts.controllers.iperf_server.job')98    def test_start_makes_started_true(self, mock_job, __, ___):99        """Tests calling start() without calling stop() makes started True."""100        server = IPerfServer('port')101        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH102        server.start()103        self.assertTrue(server.started)104    @mock.patch('builtins.open')105    @mock.patch('acts.controllers.iperf_server.subprocess')106    @mock.patch('acts.controllers.iperf_server.job')107    def test_start_stop_makes_started_false(self, _, __, ___):108        """Tests calling start() without calling stop() makes started True."""109        server = IPerfServer('port')110        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH111        server.start()112        server.stop()113        self.assertFalse(server.started)114    @mock.patch('builtins.open')115    @mock.patch('acts.controllers.iperf_server.subprocess')116    @mock.patch('acts.controllers.iperf_server.job')117    def test_start_sets_current_log_file(self, _, __, ___):118        server = IPerfServer('port')119        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH120        server.start()121        self.assertEqual(122            server._current_log_file,123            MOCK_LOGFILE_PATH,124            'The _current_log_file was not received from _get_full_file_path.'125        )126    @mock.patch('builtins.open')127    
@mock.patch('acts.controllers.iperf_server.subprocess')128    def test_stop_returns_current_log_file(self, _, __):129        server = IPerfServer('port')130        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH131        server._current_log_file = MOCK_LOGFILE_PATH132        server._iperf_process = mock.Mock()133        log_file = server.stop()134        self.assertEqual(135            log_file,136            MOCK_LOGFILE_PATH,137            'The _current_log_file was not returned by stop().'138        )139    @mock.patch('builtins.open')140    @mock.patch('acts.controllers.iperf_server.subprocess')141    @mock.patch('acts.controllers.iperf_server.job')142    def test_start_does_not_run_two_concurrent_processes(self, start_proc, _, __):143        server = IPerfServer('port')144        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH145        server._iperf_process = mock.Mock()146        server.start()147        self.assertFalse(148            start_proc.called,149            'start() should not begin a second process if another is running.'150        )151    @mock.patch('acts.utils.stop_standing_subprocess')152    def test_stop_exits_early_if_no_process_has_started(self, stop_proc):153        server = IPerfServer('port')154        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH155        server._iperf_process = None156        server.stop()157        self.assertFalse(158            stop_proc.called,159            'stop() should not kill a process if no process is running.'160        )161class IPerfServerOverSshTest(unittest.TestCase):162    """Tests acts.controllers.iperf_server.IPerfServerOverSsh."""163    INIT_ARGS = [{'host': 'TEST_HOST', 'user': 'test'}, 'PORT']164    @mock.patch('acts.controllers.iperf_server.connection')165    def test_start_makes_started_true(self, _):166        """Tests calling start() without calling stop() makes started True."""167        server = IPerfServerOverSsh(*self.INIT_ARGS)168        server._ssh_session 
= mock.Mock()169        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH170        server.start()171        self.assertTrue(server.started)172    @mock.patch('builtins.open')173    @mock.patch('acts.controllers.iperf_server.connection')174    def test_start_stop_makes_started_false(self, _, __):175        """Tests calling start() without calling stop() makes started True."""176        server = IPerfServerOverSsh(*self.INIT_ARGS)177        server._ssh_session = mock.Mock()178        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH179        server.start()180        server.stop()181        self.assertFalse(server.started)182    @mock.patch('builtins.open')183    @mock.patch('acts.controllers.iperf_server.connection')184    def test_stop_returns_expected_log_file(self, _, __):185        server = IPerfServerOverSsh(*self.INIT_ARGS)186        server._ssh_session = mock.Mock()187        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH188        server._iperf_pid = mock.Mock()189        log_file = server.stop()190        self.assertEqual(191            log_file,192            MOCK_LOGFILE_PATH,193            'The expected log file was not returned by stop().'194        )195    @mock.patch('acts.controllers.iperf_server.connection')196    def test_start_does_not_run_two_concurrent_processes(self, _):197        server = IPerfServerOverSsh(*self.INIT_ARGS)198        server._ssh_session = mock.Mock()199        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH200        server._iperf_pid = mock.Mock()201        server.start()202        self.assertFalse(203            server._ssh_session.run_async.called,204            'start() should not begin a second process if another is running.'205        )206    @mock.patch('acts.utils.stop_standing_subprocess')207    @mock.patch('acts.controllers.iperf_server.connection')208    def test_stop_exits_early_if_no_process_has_started(self, _, __):209        server = IPerfServerOverSsh(*self.INIT_ARGS)210        
server._ssh_session = mock.Mock()211        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH212        server._iperf_pid = None213        server.stop()214        self.assertFalse(215            server._ssh_session.run_async.called,216            'stop() should not kill a process if no process is running.'217        )218class IPerfServerOverAdbTest(unittest.TestCase):219    """Tests acts.controllers.iperf_server.IPerfServerOverSsh."""220    ANDROID_DEVICE_PROP = ('acts.controllers.iperf_server.'221                           'IPerfServerOverAdb._android_device')222    @mock.patch(ANDROID_DEVICE_PROP)223    def test_start_makes_started_true(self, mock_ad):224        """Tests calling start() without calling stop() makes started True."""225        server = IPerfServerOverAdb('53R147', 'PORT')226        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH227        mock_ad.adb.shell.return_value = '<PID>'228        server.start()229        self.assertTrue(server.started)230    @mock.patch('acts.libs.proc.job.run')231    @mock.patch('builtins.open')232    @mock.patch(ANDROID_DEVICE_PROP)233    def test_start_stop_makes_started_false(self, mock_ad, _, __):234        """Tests calling start() without calling stop() makes started True."""235        server = IPerfServerOverAdb('53R147', 'PORT')236        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH237        mock_ad.adb.shell.side_effect = ['<PID>', '', '', '']238        server.start()239        server.stop()240        self.assertFalse(server.started)241    @mock.patch('acts.libs.proc.job.run')242    @mock.patch('builtins.open')243    @mock.patch(ANDROID_DEVICE_PROP)244    def test_stop_returns_expected_log_file(self, mock_ad, _, __):245        server = IPerfServerOverAdb('53R147', 'PORT')246        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH247        server._iperf_process = mock.Mock()248        server._iperf_process_adb_pid = '<PID>'249        mock_ad.adb.shell.side_effect = ['', '', 
'']250        log_file = server.stop()251        self.assertEqual(252            log_file,253            MOCK_LOGFILE_PATH,254            'The expected log file was not returned by stop().'255        )256    @mock.patch(ANDROID_DEVICE_PROP)257    def test_start_does_not_run_two_concurrent_processes(self, android_device):258        server = IPerfServerOverAdb('53R147', 'PORT')259        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH260        server._iperf_process = mock.Mock()261        server.start()262        self.assertFalse(263            android_device.adb.shell_nb.called,264            'start() should not begin a second process if another is running.'265        )266    @mock.patch('acts.libs.proc.job.run')267    @mock.patch('builtins.open')268    @mock.patch(ANDROID_DEVICE_PROP)269    def test_stop_exits_early_if_no_process_has_started(self, android_device, _,270                                                        __):271        server = IPerfServerOverAdb('53R147', 'PORT')272        server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH273        server._iperf_pid = None274        server.stop()275        self.assertFalse(276            android_device.adb.shell_nb.called,277            'stop() should not kill a process if no process is running.'278        )279if __name__ == '__main__':...

Full Screen

Full Screen

fetchmail.py

Source:fetchmail.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import poplib
from imaplib import IMAP4, IMAP4_SSL
from poplib import POP3, POP3_SSL

from odoo import api, fields, models, tools, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Upper bound on the number of messages retrieved per POP session.
MAX_POP_MESSAGES = 50
# Socket timeout (seconds) applied to an established mail connection.
MAIL_TIMEOUT = 60

# Workaround for Python 2.7.8 bug https://bugs.python.org/issue23906
poplib._MAXLINE = 65536


class FetchmailServer(models.Model):
    """Incoming POP/IMAP mail server account"""
    _name = 'fetchmail.server'
    _description = "POP/IMAP Server"
    _order = 'priority'

    name = fields.Char('Name', required=True)
    active = fields.Boolean('Active', default=True)
    state = fields.Selection([
        ('draft', 'Not Confirmed'),
        ('done', 'Confirmed'),
    ], string='Status', index=True, readonly=True, copy=False, default='draft')
    server = fields.Char(string='Server Name', readonly=True, help="Hostname or IP of the mail server", states={'draft': [('readonly', False)]})
    port = fields.Integer(readonly=True, states={'draft': [('readonly', False)]})
    type = fields.Selection([
        ('pop', 'POP Server'),
        ('imap', 'IMAP Server'),
        ('local', 'Local Server'),
    ], 'Server Type', index=True, required=True, default='pop')
    is_ssl = fields.Boolean('SSL/TLS', help="Connections are encrypted with SSL/TLS through a dedicated port (default: IMAPS=993, POP3S=995)")
    attach = fields.Boolean('Keep Attachments', help="Whether attachments should be downloaded. "
                                                     "If not enabled, incoming emails will be stripped of any attachments before being processed", default=True)
    original = fields.Boolean('Keep Original', help="Whether a full original copy of each email should be kept for reference "
                                                    "and attached to each processed message. This will usually double the size of your message database.")
    date = fields.Datetime(string='Last Fetch Date', readonly=True)
    user = fields.Char(string='Username', readonly=True, states={'draft': [('readonly', False)]})
    password = fields.Char(readonly=True, states={'draft': [('readonly', False)]})
    action_id = fields.Many2one('ir.actions.server', string='Server Action', help="Optional custom server action to trigger for each incoming mail, on the record that was created or updated by this mail")
    object_id = fields.Many2one('ir.model', string="Create a New Record", help="Process each incoming mail as part of a conversation "
                                                                                "corresponding to this document type. This will create "
                                                                                "new documents for new conversations, or attach follow-up "
                                                                                "emails to the existing conversations (documents).")
    priority = fields.Integer(string='Server Priority', readonly=True, states={'draft': [('readonly', False)]}, help="Defines the order of processing, lower values mean higher priority", default=5)
    message_ids = fields.One2many('mail.mail', 'fetchmail_server_id', string='Messages', readonly=True)
    configuration = fields.Text('Configuration', readonly=True)
    script = fields.Char(readonly=True, default='/mail/static/scripts/openerp_mailgate.py')

    @api.onchange('type', 'is_ssl', 'object_id')
    def onchange_server_type(self):
        """ Reset the port to the protocol default and rebuild the MTA help text.

        POP defaults to 995 (SSL) / 110, IMAP to 993 (SSL) / 143. For any
        other type the port is 0 and the server name is cleared.
        """
        self.port = 0
        if self.type == 'pop':
            self.port = 995 if self.is_ssl else 110
        elif self.type == 'imap':
            self.port = 993 if self.is_ssl else 143
        else:
            self.server = ''
        conf = {
            'dbname': self.env.cr.dbname,
            'uid': self.env.uid,
            'model': self.object_id.model if self.object_id else 'MODELNAME'
        }
        self.configuration = """
            Use the below script with the following command line options with your Mail Transport Agent (MTA)
            openerp_mailgate.py --host=HOSTNAME --port=PORT -u %(uid)d -p PASSWORD -d %(dbname)s
            Example configuration for the postfix mta running locally:
            /etc/postfix/virtual_aliases:
            @youdomain openerp_mailgate@localhost
            /etc/aliases:
            openerp_mailgate: "|/path/to/openerp-mailgate.py --host=localhost -u %(uid)d -p PASSWORD -d %(dbname)s"
        """ % conf

    @api.model
    def create(self, values):
        """ Create the server record, then refresh the fetchmail cron state. """
        res = super(FetchmailServer, self).create(values)
        self._update_cron()
        return res

    @api.multi
    def write(self, values):
        """ Update the records, then refresh the fetchmail cron state. """
        res = super(FetchmailServer, self).write(values)
        self._update_cron()
        return res

    @api.multi
    def unlink(self):
        """ Delete the records, then refresh the fetchmail cron state. """
        res = super(FetchmailServer, self).unlink()
        self._update_cron()
        return res

    @api.multi
    def set_draft(self):
        """ Put the servers back in the 'draft' state. Always returns True. """
        self.write({'state': 'draft'})
        return True

    @api.multi
    def connect(self):
        """ Open and authenticate a connection to this (single) mail server.

        :return: a logged-in ``imaplib`` or ``poplib`` connection object whose
                 socket timeout is set to ``MAIL_TIMEOUT``
        :raises UserError: if the server type is neither 'imap' nor 'pop'
        """
        self.ensure_one()
        if self.type == 'imap':
            if self.is_ssl:
                connection = IMAP4_SSL(self.server, int(self.port))
            else:
                connection = IMAP4(self.server, int(self.port))
            connection.login(self.user, self.password)
        elif self.type == 'pop':
            if self.is_ssl:
                connection = POP3_SSL(self.server, int(self.port))
            else:
                connection = POP3(self.server, int(self.port))
            #TODO: use this to remove only unread messages
            #connection.user("recent:"+server.user)
            connection.user(self.user)
            connection.pass_(self.password)
        else:
            # Without this branch `connection` would be unbound below and the
            # caller would get an opaque UnboundLocalError.
            raise UserError(_("Cannot open a connection for a server of type '%s'.") % self.type)
        # Add timeout on socket
        connection.sock.settimeout(MAIL_TIMEOUT)
        return connection

    @api.multi
    def button_confirm_login(self):
        """ Test the connection of each server and mark it 'done' on success.

        :raises UserError: if connecting or logging in fails
        :return: True
        """
        for server in self:
            # Bound up-front so the cleanup below never depends on a NameError
            # being swallowed when connect() itself fails.
            connection = None
            try:
                connection = server.connect()
                server.write({'state': 'done'})
            except Exception as err:
                _logger.info("Failed to connect to %s server %s.", server.type, server.name, exc_info=True)
                raise UserError(_("Connection test failed: %s") % tools.ustr(err))
            finally:
                try:
                    if connection:
                        if server.type == 'imap':
                            # No mailbox was selected, so CLOSE is not a legal
                            # command here; LOGOUT is what ends the session.
                            connection.logout()
                        elif server.type == 'pop':
                            connection.quit()
                except Exception:
                    # ignored, just a consequence of the previous exception
                    pass
        return True

    @api.model
    def _fetch_mails(self):
        """ Method called by cron to fetch mails from servers """
        return self.search([('state', '=', 'done'), ('type', 'in', ['pop', 'imap'])]).fetch_mail()

    @api.multi
    def fetch_mail(self):
        """ WARNING: meant for cron usage only - will commit() after each email! """
        additionnal_context = {
            'fetchmail_cron_running': True
        }
        MailThread = self.env['mail.thread']
        for server in self:
            _logger.info('start checking for new emails on %s server %s', server.type, server.name)
            additionnal_context['fetchmail_server_id'] = server.id
            additionnal_context['server_type'] = server.type
            mail_thread = MailThread.with_context(**additionnal_context)
            if server.type == 'imap':
                server._fetch_imap_mails(mail_thread)
            elif server.type == 'pop':
                server._fetch_pop_mails(mail_thread)
            server.write({'date': fields.Datetime.now()})
        return True

    @api.multi
    def _run_fetchmail_action(self, res_id):
        """ Run the optional server action on the record made from a mail. """
        self.action_id.with_context({
            'active_id': res_id,
            'active_ids': [res_id],
            'active_model': self.env.context.get("thread_model", self.object_id.model)
        }).run()

    @api.multi
    def _fetch_imap_mails(self, mail_thread):
        """ Process every UNSEEN message of this IMAP server, one commit each.

        Failures are logged and never propagate, so one broken server cannot
        stop the cron from handling the following ones.

        :param mail_thread: 'mail.thread' model carrying the fetchmail context
        """
        self.ensure_one()
        count, failed = 0, 0
        imap_server = None
        try:
            imap_server = self.connect()
            imap_server.select()
            result, data = imap_server.search(None, '(UNSEEN)')
            for num in data[0].split():
                res_id = None
                result, msg_data = imap_server.fetch(num, '(RFC822)')
                imap_server.store(num, '-FLAGS', '\\Seen')
                try:
                    res_id = mail_thread.message_process(self.object_id.model, msg_data[0][1], save_original=self.original, strip_attachments=(not self.attach))
                except Exception:
                    _logger.info('Failed to process mail from %s server %s.', self.type, self.name, exc_info=True)
                    failed += 1
                if res_id and self.action_id:
                    self._run_fetchmail_action(res_id)
                imap_server.store(num, '+FLAGS', '\\Seen')
                self.env.cr.commit()
                count += 1
            _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", count, self.type, self.name, (count - failed), failed)
        except Exception:
            _logger.info("General failure when trying to fetch mail from %s server %s.", self.type, self.name, exc_info=True)
        finally:
            if imap_server:
                # CLOSE fails when no mailbox ended up selected (e.g. select()
                # raised); that must neither skip LOGOUT nor escape this method.
                try:
                    imap_server.close()
                except (IMAP4.error, OSError):
                    _logger.debug("Could not close the mailbox on imap server %s.", self.name, exc_info=True)
                try:
                    imap_server.logout()
                except (IMAP4.error, OSError):
                    _logger.warning("Failed to properly log out from imap server %s.", self.name, exc_info=True)

    @api.multi
    def _fetch_pop_mails(self, mail_thread):
        """ Process the messages of this POP server in batches, one commit each.

        Each session handles at most ``MAX_POP_MESSAGES`` messages and deletes
        those processed successfully. Sessions repeat until the mailbox holds
        fewer messages than a full batch, or until a whole batch failed:
        failed messages stay on the server, so retrying them would loop
        forever.

        :param mail_thread: 'mail.thread' model carrying the fetchmail context
        """
        self.ensure_one()
        pop_server = None
        try:
            while True:
                failed_in_batch = 0
                pop_server = self.connect()
                (num_messages, total_size) = pop_server.stat()
                pop_server.list()
                batch_size = min(MAX_POP_MESSAGES, num_messages)
                for num in range(1, batch_size + 1):
                    (header, messages, octets) = pop_server.retr(num)
                    message = (b'\n').join(messages)
                    res_id = None
                    try:
                        res_id = mail_thread.message_process(self.object_id.model, message, save_original=self.original, strip_attachments=(not self.attach))
                        pop_server.dele(num)
                    except Exception:
                        _logger.info('Failed to process mail from %s server %s.', self.type, self.name, exc_info=True)
                        failed_in_batch += 1
                    if res_id and self.action_id:
                        self._run_fetchmail_action(res_id)
                    self.env.cr.commit()
                # Logged for every batch (including the last one) with the
                # number of messages actually handled in this session.
                _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", batch_size, self.type, self.name, (batch_size - failed_in_batch), failed_in_batch)
                if num_messages < MAX_POP_MESSAGES or failed_in_batch == batch_size:
                    break
                pop_server.quit()
                # Forget the closed session so a failing reconnect does not
                # make the cleanup below quit it a second time.
                pop_server = None
        except Exception:
            _logger.info("General failure when trying to fetch mail from %s server %s.", self.type, self.name, exc_info=True)
        finally:
            if pop_server:
                try:
                    pop_server.quit()
                except (poplib.error_proto, OSError):
                    _logger.warning("Failed to properly close the connection to pop server %s.", self.name, exc_info=True)

    @api.model
    def _update_cron(self):
        """ Toggle the mail gateway cron from the confirmed POP/IMAP servers.

        Does nothing while the fetchmail cron itself is running.
        """
        if self.env.context.get('fetchmail_cron_running'):
            return
        try:
            # Enabled/Disable cron based on the number of 'done' server of type pop or imap
            cron = self.env.ref('fetchmail.ir_cron_mail_gateway_action')
            cron.toggle(model=self._name, domain=[('state', '=', 'done'), ('type', 'in', ['pop', 'imap'])])
        except ValueError:
            # NOTE(review): the visible snippet ends here with a literal "...";
            # it is kept as-is rather than guessing the original handler body.
            ...

Full Screen

Full Screen

socket_test_utils.py

Source:socket_test_utils.py Github

copy

Full Screen

1#2#   Copyright 2018 - The Android Open Source Project3#4#   Licensed under the Apache License, Version 2.0 (the "License");5#   you may not use this file except in compliance with the License.6#   You may obtain a copy of the License at7#8#       http://www.apache.org/licenses/LICENSE-2.09#10#   Unless required by applicable law or agreed to in writing, software11#   distributed under the License is distributed on an "AS IS" BASIS,12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13#   See the License for the specific language governing permissions and14#   limitations under the License.15import queue16import re17import threading18import time19from acts.test_utils.net import connectivity_const as cconst20from acts import asserts21MSG = "Test message "22PKTS = 523""" Methods for android.system.Os based sockets """24def open_android_socket(ad, domain, sock_type, ip, port):25    """ Open TCP or UDP using android.system.Os class26    Args:27      1. ad - android device object28      2. domain - IPv4 or IPv6 type29      3. sock_type - UDP or TCP socket30      4. ip - IP addr on the device31      5. port - open socket on port32    Returns:33      File descriptor key34    """35    fd_key = ad.droid.openSocket(domain, sock_type, ip, port)36    ad.log.info("File descriptor: %s" % fd_key)37    asserts.assert_true(fd_key, "Failed to open socket")38    return fd_key39def close_android_socket(ad, fd_key):40    """ Close socket41    Args:42      1. ad - android device object43      2. fd_key - file descriptor key44    """45    status = ad.droid.closeSocket(fd_key)46    asserts.assert_true(status, "Failed to close socket")47def listen_accept_android_socket(client,48                                 server,49                                 client_fd,50                                 server_fd,51                                 server_ip,52                                 server_port):53    """ Listen, accept TCP sockets54    Args:55      1. 
client : ad object for client device56      2. server : ad object for server device57      3. client_fd : client's socket handle58      4. server_fd : server's socket handle59      5. server_ip : send data to this IP60      6. server_port : send data to this port61    """62    server.droid.listenSocket(server_fd)63    client.droid.connectSocket(client_fd, server_ip, server_port)64    sock = server.droid.acceptSocket(server_fd)65    asserts.assert_true(sock, "Failed to accept socket")66    return sock67def send_recv_data_android_sockets(client,68                                   server,69                                   client_fd,70                                   server_fd,71                                   server_ip,72                                   server_port):73    """ Send TCP or UDP data over android os sockets from client to server.74        Verify that server received the data.75    Args:76      1. client : ad object for client device77      2. server : ad object for server device78      3. client_fd : client's socket handle79      4. server_fd : server's socket handle80      5. server_ip : send data to this IP81      6. server_port : send data to this port82    """83    send_list = []84    recv_list = []85    for _ in range(1, PKTS+1):86        msg = MSG + " %s" % _87        send_list.append(msg)88        client.log.info("Sending message: %s" % msg)89        client.droid.sendDataOverSocket(server_ip, server_port, msg, client_fd)90        recv_msg = server.droid.recvDataOverSocket(server_fd)91        server.log.info("Received message: %s" % recv_msg)92        recv_list.append(recv_msg)93    recv_list = [x.rstrip('\x00') if x else x for x in recv_list]94    asserts.assert_true(send_list and recv_list and send_list == recv_list,95                        "Send and recv information is incorrect")96""" Methods for java.net.DatagramSocket based sockets """97def open_datagram_socket(ad, ip, port):98    """ Open datagram socket99    Args:100      1. 
ad : android device object101      2. ip : IP addr on the device102      3. port : socket port103    Returns:104      Hash key of the datagram socket105    """106    socket_key = ad.droid.openDatagramSocket(ip, port)107    ad.log.info("Datagram socket: %s" % socket_key)108    asserts.assert_true(socket_key, "Failed to open datagram socket")109    return socket_key110def close_datagram_socket(ad, socket_key):111    """ Close datagram socket112    Args:113      1. socket_key : hash key of datagram socket114    """115    status = ad.droid.closeDatagramSocket(socket_key)116    asserts.assert_true(status, "Failed to close datagram socket")117def send_recv_data_datagram_sockets(client,118                                    server,119                                    client_sock,120                                    server_sock,121                                    server_ip,122                                    server_port):123    """ Send data over datagram socket from dut_a to dut_b.124        Verify that dut_b received the data.125    Args:126      1. client : ad object for client device127      2. server : ad object for server device128      3. client_sock : client's socket handle129      4. server_sock : server's socket handle130      5. server_ip : send data to this IP131      6. 
server_port : send data to this port132    """133    send_list = []134    recv_list = []135    for _ in range(1, PKTS+1):136        msg = MSG + " %s" % _137        send_list.append(msg)138        client.log.info("Sending message: %s" % msg)139        client.droid.sendDataOverDatagramSocket(client_sock,140                                                msg,141                                                server_ip,142                                                server_port)143        recv_msg = server.droid.recvDataOverDatagramSocket(server_sock)144        server.log.info("Received message: %s" % recv_msg)145        recv_list.append(recv_msg)146    recv_list = [x.rstrip('\x00') if x else x for x in recv_list]147    asserts.assert_true(send_list and recv_list and send_list == recv_list,148                        "Send and recv information is incorrect")149""" Utils methods for java.net.Socket based sockets """150def _accept_socket(server, server_ip, server_port, server_sock, q):151    sock = server.droid.acceptTcpSocket(server_sock)152    server.log.info("Server socket: %s" % sock)153    q.put(sock)154def _client_socket(client, server_ip, server_port, client_ip, client_port, q):155    time.sleep(0.5)156    sock = client.droid.openTcpSocket(server_ip,157                                      server_port,158                                      client_ip,159                                      client_port)160    client.log.info("Client socket: %s" % sock)161    q.put(sock)162def open_connect_socket(client,163                        server,164                        client_ip,165                        server_ip,166                        client_port,167                        server_port,168                        server_sock):169    """ Open tcp socket and connect to server170    Args:171      1. client : ad object for client device172      2. server : ad object for server device173      3. client_ip : client's socket handle174      4. 
server_ip : send data to this IP175      5. client_port : port on client socket176      6. server_port : port on server socket177      7. server_sock : server socket178    Returns:179      client and server socket from successful connect180    """181    sq = queue.Queue()182    cq = queue.Queue()183    s = threading.Thread(target = _accept_socket,184                         args = (server, server_ip, server_port, server_sock,185                                 sq))186    c = threading.Thread(target = _client_socket,187                         args = (client, server_ip, server_port, client_ip,188                                 client_port, cq))189    s.start()190    c.start()191    c.join()192    s.join()193    client_sock = cq.get()194    server_sock = sq.get()195    asserts.assert_true(client_sock and server_sock, "Failed to open sockets")196    return client_sock, server_sock197def open_server_socket(server, server_ip, server_port):198    """ Open tcp server socket199    Args:200      1. server : ad object for server device201      2. server_ip : send data to this IP202      3. server_port : send data to this port203    """204    sock = server.droid.openTcpServerSocket(server_ip, server_port)205    server.log.info("Server Socket: %s" % sock)206    asserts.assert_true(sock, "Failed to open server socket")207    return sock208def close_socket(ad, socket):209    """ Close socket210    Args:211      1. ad - android device object212      2. socket - socket key213    """214    status = ad.droid.closeTcpSocket(socket)215    asserts.assert_true(status, "Failed to socket")216def close_server_socket(ad, socket):217    """ Close server socket218    Args:219      1. ad - android device object220      2. 
socket - server socket key221    """222    status = ad.droid.closeTcpServerSocket(socket)223    asserts.assert_true(status, "Failed to socket")224def send_recv_data_sockets(client, server, client_sock, server_sock):225    """ Send data over TCP socket from client to server.226        Verify that server received the data227    Args:228      1. client : ad object for client device229      2. server : ad object for server device230      3. client_sock : client's socket handle231      4. server_sock : server's socket handle232    """233    send_list = []234    recv_list = []235    for _ in range(1, PKTS+1):236        msg = MSG + " %s" % _237        send_list.append(msg)238        client.log.info("Sending message: %s" % msg)239        client.droid.sendDataOverTcpSocket(client_sock, msg)240        recv_msg = server.droid.recvDataOverTcpSocket(server_sock)241        server.log.info("Received message: %s" % recv_msg)242        recv_list.append(recv_msg)243    recv_list = [x.rstrip('\x00') if x else x for x in recv_list]244    asserts.assert_true(send_list and recv_list and send_list == recv_list,...

Full Screen

Full Screen

test_socketserver.py

Source:test_socketserver.py Github

copy

Full Screen

...87                else:88                    fn = fn.replace(os.altsep, os.sep)89            self.test_files.append(fn)90            return fn91    def make_server(self, addr, svrcls, hdlrbase):92        class MyServer(svrcls):93            def handle_error(self, request, client_address):94                self.close_request(request)95                self.server_close()96                raise97        class MyHandler(hdlrbase):98            def handle(self):99                line = self.rfile.readline()100                self.wfile.write(line)101        if verbose: print "creating server"102        server = MyServer(addr, MyHandler)103        self.assertEqual(server.server_address, server.socket.getsockname())104        return server105    @unittest.skipUnless(threading, 'Threading required for this test.')106    @reap_threads107    def run_server(self, svrcls, hdlrbase, testfunc):108        server = self.make_server(self.pickaddr(svrcls.address_family),109                                  svrcls, hdlrbase)110        # We had the OS pick a port, so pull the real address out of111        # the server.112        addr = server.server_address113        if verbose:114            print "server created"115            print "ADDR =", addr116            print "CLASS =", svrcls117        t = threading.Thread(118            name='%s serving' % svrcls,119            target=server.serve_forever,120            # Short poll interval to make the test finish quickly.121            # Time between requests is short enough that we won't wake122            # up spuriously too many times.123            kwargs={'poll_interval':0.01})124        t.daemon = True  # In case this function raises.125        t.start()126        if verbose: print "server running"127        for i in range(3):128            if verbose: print "test client", i129            testfunc(svrcls.address_family, addr)130        if verbose: print "waiting for server"131        server.shutdown()132        t.join()133       
 if verbose: print "done"134    def stream_examine(self, proto, addr):135        s = socket.socket(proto, socket.SOCK_STREAM)136        s.connect(addr)137        s.sendall(TEST_STR)138        buf = data = receive(s, 100)139        while data and '\n' not in buf:140            data = receive(s, 100)141            buf += data142        self.assertEqual(buf, TEST_STR)143        s.close()144    def dgram_examine(self, proto, addr):145        s = socket.socket(proto, socket.SOCK_DGRAM)146        s.sendto(TEST_STR, addr)147        buf = data = receive(s, 100)148        while data and '\n' not in buf:149            data = receive(s, 100)150            buf += data151        self.assertEqual(buf, TEST_STR)152        s.close()153    def test_TCPServer(self):154        self.run_server(SocketServer.TCPServer,155                        SocketServer.StreamRequestHandler,156                        self.stream_examine)157    def test_ThreadingTCPServer(self):158        self.run_server(SocketServer.ThreadingTCPServer,159                        SocketServer.StreamRequestHandler,160                        self.stream_examine)161    if HAVE_FORKING:162        def test_ForkingTCPServer(self):163            with simple_subprocess(self):164                self.run_server(SocketServer.ForkingTCPServer,165                                SocketServer.StreamRequestHandler,166                                self.stream_examine)167    if HAVE_UNIX_SOCKETS:168        def test_UnixStreamServer(self):169            self.run_server(SocketServer.UnixStreamServer,170                            SocketServer.StreamRequestHandler,171                            self.stream_examine)172        def test_ThreadingUnixStreamServer(self):173            self.run_server(SocketServer.ThreadingUnixStreamServer,174                            SocketServer.StreamRequestHandler,175                            self.stream_examine)176        if HAVE_FORKING:177            def test_ForkingUnixStreamServer(self):178       
         with simple_subprocess(self):179                    self.run_server(ForkingUnixStreamServer,180                                    SocketServer.StreamRequestHandler,181                                    self.stream_examine)182    def test_UDPServer(self):183        self.run_server(SocketServer.UDPServer,184                        SocketServer.DatagramRequestHandler,185                        self.dgram_examine)186    def test_ThreadingUDPServer(self):187        self.run_server(SocketServer.ThreadingUDPServer,188                        SocketServer.DatagramRequestHandler,189                        self.dgram_examine)190    if HAVE_FORKING:191        def test_ForkingUDPServer(self):192            with simple_subprocess(self):193                self.run_server(SocketServer.ForkingUDPServer,194                                SocketServer.DatagramRequestHandler,195                                self.dgram_examine)196    # Alas, on Linux (at least) recvfrom() doesn't return a meaningful197    # client address so this cannot work:198    # if HAVE_UNIX_SOCKETS:199    #     def test_UnixDatagramServer(self):200    #         self.run_server(SocketServer.UnixDatagramServer,201    #                         SocketServer.DatagramRequestHandler,202    #                         self.dgram_examine)203    #204    #     def test_ThreadingUnixDatagramServer(self):205    #         self.run_server(SocketServer.ThreadingUnixDatagramServer,206    #                         SocketServer.DatagramRequestHandler,207    #                         self.dgram_examine)208    #209    #     if HAVE_FORKING:210    #         def test_ForkingUnixDatagramServer(self):211    #             self.run_server(SocketServer.ForkingUnixDatagramServer,212    #                             SocketServer.DatagramRequestHandler,213    #                             self.dgram_examine)214    @reap_threads215    def test_shutdown(self):216        # Issue #2302: shutdown() should always succeed in making an217   
     # other thread leave serve_forever().218        class MyServer(SocketServer.TCPServer):219            pass220        class MyHandler(SocketServer.StreamRequestHandler):221            pass222        threads = []223        for i in range(20):224            s = MyServer((HOST, 0), MyHandler)225            t = threading.Thread(...

Full Screen

Full Screen

sensor.py

Source:sensor.py Github

copy

Full Screen

1"""The Minecraft Server sensor platform."""2from __future__ import annotations3from typing import Any4from homeassistant.components.sensor import SensorEntity5from homeassistant.config_entries import ConfigEntry6from homeassistant.const import TIME_MILLISECONDS7from homeassistant.core import HomeAssistant8from homeassistant.helpers.entity_platform import AddEntitiesCallback9from . import MinecraftServer, MinecraftServerEntity10from .const import (11    ATTR_PLAYERS_LIST,12    DOMAIN,13    ICON_LATENCY_TIME,14    ICON_PLAYERS_MAX,15    ICON_PLAYERS_ONLINE,16    ICON_PROTOCOL_VERSION,17    ICON_VERSION,18    NAME_LATENCY_TIME,19    NAME_PLAYERS_MAX,20    NAME_PLAYERS_ONLINE,21    NAME_PROTOCOL_VERSION,22    NAME_VERSION,23    UNIT_PLAYERS_MAX,24    UNIT_PLAYERS_ONLINE,25    UNIT_PROTOCOL_VERSION,26    UNIT_VERSION,27)28async def async_setup_entry(29    hass: HomeAssistant,30    config_entry: ConfigEntry,31    async_add_entities: AddEntitiesCallback,32) -> None:33    """Set up the Minecraft Server sensor platform."""34    server = hass.data[DOMAIN][config_entry.unique_id]35    # Create entities list.36    entities = [37        MinecraftServerVersionSensor(server),38        MinecraftServerProtocolVersionSensor(server),39        MinecraftServerLatencyTimeSensor(server),40        MinecraftServerPlayersOnlineSensor(server),41        MinecraftServerPlayersMaxSensor(server),42    ]43    # Add sensor entities.44    async_add_entities(entities, True)45class MinecraftServerSensorEntity(MinecraftServerEntity, SensorEntity):46    """Representation of a Minecraft Server sensor base entity."""47    def __init__(48        self,49        server: MinecraftServer,50        type_name: str,51        icon: str = None,52        unit: str = None,53        device_class: str = None,54    ) -> None:55        """Initialize sensor base entity."""56        super().__init__(server, type_name, icon, device_class)57        self._state = None58        self._unit = unit59    @property60    def 
available(self) -> bool:61        """Return sensor availability."""62        return self._server.online63    @property64    def native_value(self) -> Any:65        """Return sensor state."""66        return self._state67    @property68    def native_unit_of_measurement(self) -> str:69        """Return sensor measurement unit."""70        return self._unit71class MinecraftServerVersionSensor(MinecraftServerSensorEntity):72    """Representation of a Minecraft Server version sensor."""73    def __init__(self, server: MinecraftServer) -> None:74        """Initialize version sensor."""75        super().__init__(76            server=server, type_name=NAME_VERSION, icon=ICON_VERSION, unit=UNIT_VERSION77        )78    async def async_update(self) -> None:79        """Update version."""80        self._state = self._server.version81class MinecraftServerProtocolVersionSensor(MinecraftServerSensorEntity):82    """Representation of a Minecraft Server protocol version sensor."""83    def __init__(self, server: MinecraftServer) -> None:84        """Initialize protocol version sensor."""85        super().__init__(86            server=server,87            type_name=NAME_PROTOCOL_VERSION,88            icon=ICON_PROTOCOL_VERSION,89            unit=UNIT_PROTOCOL_VERSION,90        )91    async def async_update(self) -> None:92        """Update protocol version."""93        self._state = self._server.protocol_version94class MinecraftServerLatencyTimeSensor(MinecraftServerSensorEntity):95    """Representation of a Minecraft Server latency time sensor."""96    def __init__(self, server: MinecraftServer) -> None:97        """Initialize latency time sensor."""98        super().__init__(99            server=server,100            type_name=NAME_LATENCY_TIME,101            icon=ICON_LATENCY_TIME,102            unit=TIME_MILLISECONDS,103        )104    async def async_update(self) -> None:105        """Update latency time."""106        self._state = self._server.latency_time107class 
MinecraftServerPlayersOnlineSensor(MinecraftServerSensorEntity):108    """Representation of a Minecraft Server online players sensor."""109    def __init__(self, server: MinecraftServer) -> None:110        """Initialize online players sensor."""111        super().__init__(112            server=server,113            type_name=NAME_PLAYERS_ONLINE,114            icon=ICON_PLAYERS_ONLINE,115            unit=UNIT_PLAYERS_ONLINE,116        )117    async def async_update(self) -> None:118        """Update online players state and device state attributes."""119        self._state = self._server.players_online120        extra_state_attributes = None121        players_list = self._server.players_list122        if players_list is not None and len(players_list) != 0:123            extra_state_attributes = {ATTR_PLAYERS_LIST: self._server.players_list}124        self._extra_state_attributes = extra_state_attributes125    @property126    def extra_state_attributes(self) -> dict[str, Any]:127        """Return players list in device state attributes."""128        return self._extra_state_attributes129class MinecraftServerPlayersMaxSensor(MinecraftServerSensorEntity):130    """Representation of a Minecraft Server maximum number of players sensor."""131    def __init__(self, server: MinecraftServer) -> None:132        """Initialize maximum number of players sensor."""133        super().__init__(134            server=server,135            type_name=NAME_PLAYERS_MAX,136            icon=ICON_PLAYERS_MAX,137            unit=UNIT_PLAYERS_MAX,138        )139    async def async_update(self) -> None:140        """Update maximum number of players."""...

Full Screen

Full Screen

server_runner.py

Source:server_runner.py Github

copy

Full Screen

1#2# Licensed to the Apache Software Foundation (ASF) under one3# or more contributor license agreements.  See the NOTICE file4# distributed with this work for additional information5# regarding copyright ownership.  The ASF licenses this file6# to you under the Apache License, Version 2.0 (the7# "License"); you may not use this file except in compliance8# with the License.  You may obtain a copy of the License at9#10#   http://www.apache.org/licenses/LICENSE-2.011#12# Unless required by applicable law or agreed to in writing,13# software distributed under the License is distributed on an14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY15# KIND, either express or implied.  See the License for the16# specific language governing permissions and limitations17# under the License.18#19import argparse20import grpc21from typing import Text22from ai_flow.endpoint.server.server import AIFlowServer23from ai_flow.endpoint.server.server_config import AIFlowServerConfig24from ai_flow.client.ai_flow_client import get_ai_flow_client25from ai_flow.settings import AIFLOW_HOME26from ai_flow.util.net_utils import get_ip_addr27import logging28_SQLITE_DB_FILE = 'aiflow.db'29_SQLITE_DB_URI = '%s%s' % ('sqlite:///', _SQLITE_DB_FILE)30_MYSQL_DB_URI = 'mysql+pymysql://root:aliyunmysql@localhost:3306/aiflow'31_PORT = '50051'32GLOBAL_MASTER_CONFIG = {}33class AIFlowServerRunner(object):34    """35    AI flow server runner. This class is the runner class for the AIFlowServer. 
It parse the server configuration and36    manage the live cycle of the AIFlowServer.37    """38    def __init__(self, config_file: Text = None, enable_ha=False, server_uri: str = None, ttl_ms=10000) -> None:39        """40        Set the server attribute according to the server config file.41        :param config_file: server configuration file.42        """43        super().__init__()44        self.config_file = config_file45        self.server = None46        self.server_config = AIFlowServerConfig()47        self.enable_ha = enable_ha48        self.server_uri = server_uri49        self.ttl_ms = ttl_ms50    def start(self,51              is_block=False) -> None:52        """53        Start the AI flow runner.54        :param is_block: AI flow runner will run non-stop if True.55        """56        if self.config_file is not None:57            self.server_config.load_from_file(self.config_file)58        else:59            self.server_config.set_server_port(str(_PORT))60        global GLOBAL_MASTER_CONFIG61        GLOBAL_MASTER_CONFIG = self.server_config62        logging.info("AIFlow Master Config {}".format(GLOBAL_MASTER_CONFIG))63        self.server = AIFlowServer(64            store_uri=self.server_config.get_db_uri(),65            port=str(self.server_config.get_server_port()),66            notification_server_uri=self.server_config.get_notification_server_uri(),67            start_meta_service=self.server_config.start_meta_service(),68            start_model_center_service=self.server_config.start_model_center_service(),69            start_metric_service=self.server_config.start_metric_service(),70            start_scheduler_service=self.server_config.start_scheduler_service(),71            scheduler_service_config=self.server_config.get_scheduler_service_config(),72            enabled_ha=self.server_config.get_enable_ha(),73            ha_server_uri=get_ip_addr() + ":" + str(self.server_config.get_server_port()),74            
ttl_ms=self.server_config.get_ha_ttl_ms(),75            base_log_folder=self.server_config.get_base_log_folder()76            if self.server_config.get_base_log_folder()77            else AIFLOW_HOME)78        self.server.run(is_block=is_block)79        if not is_block:80            self._wait_for_server_available(timeout=self.server_config.get_wait_for_server_started_timeout())81    def stop(self, clear_sql_lite_db_file=True) -> None:82        """83        Stop the AI flow runner.84        :param clear_sql_lite_db_file: If True, the sqlite database files will be deleted When the server stops working.85        """86        self.server.stop(clear_sql_lite_db_file)87    def _clear_db(self):88        self.server._clear_db()89    def _wait_for_server_available(self, timeout):90        """91        Wait for server to be started and available.92        :param timeout: Float value. Seconds to wait for server available.93                        If None, wait forever until server started.94        """95        server_uri = 'localhost:{}'.format(self.server_config.get_server_port())96        try:97            channel = grpc.insecure_channel(server_uri)98            grpc.channel_ready_future(channel).result(timeout=timeout)99            logging.info("AIFlow Server started successfully.")100        except grpc.FutureTimeoutError as e:101            logging.error('AIFlow Server is not available after waiting for {} seconds.'.format(timeout))102            raise e103def set_master_config():104    code, config, message = get_ai_flow_client().get_master_config()105    for k, v in config.items():106        GLOBAL_MASTER_CONFIG[k] = v107if __name__ == '__main__':108    parser = argparse.ArgumentParser()109    parser.add_argument('--config', required=True, help='master config file')110    args = parser.parse_args()111    logging.info(args.config)112    config_file = args.config113    master = AIFlowServerRunner(114        config_file=config_file)...

Full Screen

Full Screen

gcdwebserver.gyp

Source:gcdwebserver.gyp Github

copy

Full Screen

1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4{5  'targets' : [6    {7      'target_name' : 'gcdwebserver',8      'type': 'static_library',9      'include_dirs': [10        'src/GCDWebServer/Core',11        'src/GCDWebServer/Requests',12        'src/GCDWebServer/Responses',13      ],14      'direct_dependent_settings': {15        'include_dirs': [16          'src/GCDWebServer/Core',17          'src/GCDWebServer/Requests',18          'src/GCDWebServer/Responses',19        ],20      },21      'xcode_settings': {22        'CLANG_ENABLE_OBJC_ARC': 'YES',23      },24      'sources': [25        'src/GCDWebServer/Core/GCDWebServer.h',26        'src/GCDWebServer/Core/GCDWebServer.m',27        'src/GCDWebServer/Core/GCDWebServerConnection.h',28        'src/GCDWebServer/Core/GCDWebServerConnection.m',29        'src/GCDWebServer/Core/GCDWebServerFunctions.h',30        'src/GCDWebServer/Core/GCDWebServerFunctions.m',31        'src/GCDWebServer/Core/GCDWebServerHTTPStatusCodes.h',32        'src/GCDWebServer/Core/GCDWebServerPrivate.h',33        'src/GCDWebServer/Core/GCDWebServerRequest.h',34        'src/GCDWebServer/Core/GCDWebServerRequest.m',35        'src/GCDWebServer/Core/GCDWebServerResponse.h',36        'src/GCDWebServer/Core/GCDWebServerResponse.m',37        'src/GCDWebServer/Requests/GCDWebServerDataRequest.h',38        'src/GCDWebServer/Requests/GCDWebServerDataRequest.m',39        'src/GCDWebServer/Requests/GCDWebServerFileRequest.h',40        'src/GCDWebServer/Requests/GCDWebServerFileRequest.m',41        'src/GCDWebServer/Requests/GCDWebServerMultiPartFormRequest.h',42        'src/GCDWebServer/Requests/GCDWebServerMultiPartFormRequest.m',43        'src/GCDWebServer/Requests/GCDWebServerURLEncodedFormRequest.h',44        'src/GCDWebServer/Requests/GCDWebServerURLEncodedFormRequest.m',45        'src/GCDWebServer/Responses/GCDWebServerDataResponse.h',46 
       'src/GCDWebServer/Responses/GCDWebServerDataResponse.m',47        'src/GCDWebServer/Responses/GCDWebServerErrorResponse.h',48        'src/GCDWebServer/Responses/GCDWebServerErrorResponse.m',49        'src/GCDWebServer/Responses/GCDWebServerFileResponse.h',50        'src/GCDWebServer/Responses/GCDWebServerFileResponse.m',51        'src/GCDWebServer/Responses/GCDWebServerStreamedResponse.h',52        'src/GCDWebServer/Responses/GCDWebServerStreamedResponse.m',53      ],54      'link_settings': {55        'libraries': [56          '$(SDKROOT)/System/Library/Frameworks/CFNetwork.framework',57          '$(SDKROOT)/System/Library/Frameworks/MobileCoreServices.framework',58        ],59        'xcode_settings': {60          'OTHER_LDFLAGS': [61            '-lz',62          ],63        },64      },65    },66  ],...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My First Test', function() {2  it('Does not do much!', function() {3    cy.contains('type').click()4    cy.url().should('include', '/commands/actions')5    cy.get('.action-email')6      .type('fake@email')7      .should('have.value', 'fake@email')8  })9})10describe('My First Test', function() {11  it('Does not do much!', function() {12    cy.contains('type').click()13    cy.url().should('include', '/commands/actions')14    cy.get('.action-email')15      .type('fake@email')16      .should('have.value', 'fake@email')17  })18})19describe('My First Test', function() {20  it('Does not do much!', function() {21    cy.contains('type').click()22    cy.url().should('include', '/commands/actions')23    cy.get('.action-email')24      .type('fake@email')25      .should('have.value', 'fake@email')26  })27})28describe('My First Test', function() {29  it('Does not do much!', function() {30    cy.contains('type').click()31    cy.url().should('include', '/commands/actions')32    cy.get('.action-email')33      .type('fake@email')34      .should('have.value', 'fake@email')35  })36})37describe('My First Test', function() {38  it('Does not do much!', function() {39    cy.contains('type').click()40    cy.url().should('include', '/commands/actions')41    cy.get('.action-email')42      .type('fake@email')43      .should('have.value', 'fake@email')44  })45})46describe('My First Test', function() {47  it('Does not do much!', function() {

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My First Test Suite', function() 2{3it('My FirstTest case',function() {4cy.get('select').select('option2').should('have.value','option2')5cy.get('tr td:nth-child(2)').each(($e1, index, $list) => {6const text=$e1.text()7if(text.includes("Python"))8{9cy.get('tr td:nth-child(2)').eq(index).next().then(function(price)10{11const priceText=price.text()12expect(priceText).to.equal('25')13})14}15})16})17})

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My First Test', function() {2  it('Does not do much!', function() {3    cy.server()4    cy.route('POST', '/api/login').as('login')5    cy.contains('login').click()6    cy.get('#username').type('testuser')7    cy.get('#password').type('testpassword')8    cy.get('#login-button').click()9    cy.wait('@login')10    cy.contains('testuser logged in')11  })12})

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My First Test', function() {2  it('Does not do much!', function() {3    expect(true).to.equal(true)4  })5})6describe('My First Test', function() {7  it('Does not do much!', function() {8    expect(true).to.equal(true)9  })10})

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My First Test', function() {2  it('Does not do much!', function() {3    cy.contains('h1', 'Hello World')4    cy.server()5    cy.route('POST', '/api/todos').as('create')6    cy.get('.new-todo').type('test{enter}')7    cy.wait('@create')8    cy.get('.todo-list li').should('have.length', 1)9  })10})

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('test', () => {2    it('test', () => {3        cy.server();4        cy.route({5            response: {6            }7        });8    });9});

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My Second Test Suite', function() 2{3  it('My FirstTest case',function() {4    cy.get('tr td:nth-child(2)').each(($el, index, $list) => {5      const text = $el.text()6      if(text.includes('Python')){7        cy.get('tr td:nth-child(2)').eq(index).next().then(function(price){8          const priceText = price.text()9          expect(priceText).to.equal('25')10        })11      }12    })13  })14})

Full Screen

Using AI Code Generation

copy

Full Screen

1describe('My First Test', function() {2  it('Does not do much!', function() {3    cy.get('[data-testid="add-button"]').click()4    cy.get('[data-testid="input"]').type('test')5    cy.get('[data-testid="add-button"]').click()6    cy.get('[data-testid="input"]').type('test1')7    cy.get('[data-testid="add-button"]').click()8    cy.get('[data-testid="input"]').type('test2')9    cy.get('[data-testid="add-button"]').click()10    cy.get('[data-testid="input"]').type('test3')11    cy.get('[data-testid="add-button"]').click()12    cy.get('[data-testid="input"]').type('test4')13    cy.get('[data-testid="add-button"]').click()14    cy.get('[data-testid="input"]').type('test5')15    cy.get('[data-testid="add-button"]').click()16    cy.get('[data-testid="input"]').type('test6')17    cy.get('[data-testid="add-button"]').click()18    cy.get('[data-testid="input"]').type('test7')19    cy.get('[data-testid="add-button"]').click()20    cy.get('[data-testid="input"]').type('test8')21    cy.get('[data-testid="add-button"]').click()22    cy.get('[data-testid="input"]').type('test9')23    cy.get('[data-testid="add-button"]').click()24    cy.get('[data-testid="input"]').type('test10')25    cy.get('[data-testid="add-button"]').click()26    cy.get('[data-testid="input"]').type('test11')27    cy.get('[data-testid="add-button"]').click()28    cy.get('[data-testid="input"]').type('test12')29    cy.get('[data-testid="add-button"]').click()30    cy.get('[data-testid="input"]').type('test13')31    cy.get('[data-testid="add-button"]').click()32    cy.get('[data-testid="input"]').type('test14')33    cy.get('[data-testid="add-button"]').click()34    cy.get('[data-testid="input"]').type('test15')35    cy.get('[data-testid="add-button"]').click()36    cy.get('[data-testid="input"]').type('test16')37    cy.get('[data-testid="add-button"]').click()38    cy.get('[data-testid="input"]').type('test17')

Full Screen

Cypress Tutorial

Cypress is a renowned JavaScript-based, open-source, easy-to-use end-to-end testing framework primarily used for testing web applications. Cypress is a relatively new player in the automation testing space and has been gaining much traction lately, as evidenced by the number of Forks (2.7K) and Stars (42.1K) for the project. LambdaTest’s Cypress Tutorial covers step-by-step guides that will take you from the basics through running automation tests on LambdaTest.

Chapters:

  1. What is Cypress? - Learn what Cypress is and what it is used for.
  2. Why Cypress? - Learn why Cypress might be a good choice for testing your web applications.
  3. Features of Cypress Testing - Learn about features that make Cypress a powerful and flexible tool for testing web applications.
  4. Cypress Drawbacks - Although Cypress has many strengths, it has a few limitations that you should be aware of.
  5. Cypress Architecture - Learn more about Cypress architecture and how it is designed to be run directly in the browser, i.e., it does not have any additional servers.
  6. Browsers Supported by Cypress - Cypress is built on top of the Electron browser and supports all modern web browsers. Learn which browsers Cypress supports.
  7. Selenium vs Cypress: A Detailed Comparison - Compare and explore some key differences in terms of their design and features.
  8. Cypress Learning: Best Practices - Take a deep dive into some of the best practices you should use to avoid anti-patterns in your automation tests.
  9. How To Run Cypress Tests on LambdaTest? - Set up a LambdaTest account, and now you are all set to learn how to run Cypress tests.

Certification

You can elevate your expertise with end-to-end testing using the Cypress automation framework and stay one step ahead in your career by earning a Cypress certification. Check out our Cypress 101 Certification.

YouTube

Watch this complete 3-hour tutorial to learn the basics of Cypress and various Cypress commands with Cypress testing at LambdaTest.

Run Cypress automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful