Best Python code snippet using tempest_python
test_ssl.py
Source: test_ssl.py
...90                               stdout=subprocess.PIPE,91                               stderr=subprocess.PIPE)92        time.sleep(sleepTime)93        return pid94    def stop_server(self, pid):95        pid.terminate()96        out, err = pid.communicate()97        return six.ensure_text(out), six.ensure_text(err)98    def http_get(self, s):99        s.send(b'GET / HTTP/1.0\n\n')100        resp = b''101        while 1:102            try:103                r = s.recv(4096)104                if not r:105                    break106            except SSL.SSLError:  # s_server throws an 'unexpected eof'...107                break108            resp = resp + r109        return six.ensure_text(resp)110    def setUp(self):111        self.srv_host = srv_host112        self.srv_port = allocate_srv_port()113        self.srv_addr = (srv_host, self.srv_port)114        self.srv_url = 'https://%s:%s/' % (srv_host, self.srv_port)115        self.args = ['s_server', '-quiet', '-www',116                     # '-cert', 'server.pem', Implicitly using this117                     '-accept', str(self.srv_port)]118class PassSSLClientTestCase(BaseSSLClientTestCase):119    def test_pass(self):120        pass121class HttpslibSSLClientTestCase(BaseSSLClientTestCase):122    def setUp(self):123        super(HttpslibSSLClientTestCase, self).setUp()124        self.ctx = SSL.Context()125    def tearDown(self):126        self.ctx.close()127    def test_HTTPSConnection(self):128        pid = self.start_server(self.args)129        try:130            c = httpslib.HTTPSConnection(srv_host, self.srv_port)131            c.request('GET', '/')132            data = c.getresponse().read()133            c.close()134        finally:135            self.stop_server(pid)136        self.assertIn('s_server -quiet -www', six.ensure_text(data))137    def test_HTTPSConnection_resume_session(self):138        pid = self.start_server(self.args)139        try:140            
self.ctx.load_verify_locations(cafile='tests/ca.pem')141            self.ctx.load_cert('tests/x509.pem')142            self.ctx.set_verify(143                SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)144            self.ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)145            c = httpslib.HTTPSConnection(srv_host, self.srv_port,146                                         ssl_context=self.ctx)147            c.request('GET', '/')148            ses = c.get_session()149            t = ses.as_text()150            data = c.getresponse().read()151            # Appearently closing connection here screws session; Ali Polatel?152            # c.close()153            ctx2 = SSL.Context()154            ctx2.load_verify_locations(cafile='tests/ca.pem')155            ctx2.load_cert('tests/x509.pem')156            ctx2.set_verify(157                SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)158            ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)159            c2 = httpslib.HTTPSConnection(srv_host, self.srv_port,160                                          ssl_context=ctx2)161            c2.set_session(ses)162            c2.request('GET', '/')163            ses2 = c2.get_session()164            t2 = ses2.as_text()165            data = six.ensure_text(c2.getresponse().read())166            c.close()167            c2.close()168            self.assertEqual(t, t2, "Sessions did not match")169        finally:170            self.stop_server(pid)171        self.assertIn('s_server -quiet -www', data)172    def test_HTTPSConnection_secure_context(self):173        pid = self.start_server(self.args)174        try:175            self.ctx.set_verify(176                SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)177            self.ctx.load_verify_locations('tests/ca.pem')178            c = httpslib.HTTPSConnection(srv_host, self.srv_port,179                                         ssl_context=self.ctx)180            c.request('GET', '/')181 
           data = six.ensure_text(c.getresponse().read())182            c.close()183        finally:184            self.stop_server(pid)185        self.assertIn('s_server -quiet -www', data)186    def test_HTTPSConnection_secure_context_fail(self):187        pid = self.start_server(self.args)188        try:189            self.ctx.set_verify(190                SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)191            self.ctx.load_verify_locations('tests/server.pem')192            c = httpslib.HTTPSConnection(srv_host, self.srv_port,193                                         ssl_context=self.ctx)194            with self.assertRaises(SSL.SSLError):195                c.request('GET', '/')196            c.close()197        finally:198            self.stop_server(pid)199    def test_HTTPSConnection_illegalkeywordarg(self):200        with self.assertRaises(ValueError):201            httpslib.HTTPSConnection('example.org', badKeyword=True)202class HttpslibSSLSNIClientTestCase(BaseSSLClientTestCase):203    def setUp(self):204        super(HttpslibSSLSNIClientTestCase, self).setUp()205        self.args = ['s_server', '-servername', srv_host, '-debug', '-www', '-msg',206                     '-cert', 'server.pem', '-key', 'server_key.pem',207                     '-cert2', 'server.pem', '-key2', 'server_key.pem',208                     '-accept', str(self.srv_port)]209        self.ctx = SSL.Context()210    def tearDown(self):211        self.ctx.close()212    def test_SNI_support(self):213        pid = self.start_server(self.args)214        try:215            c = httpslib.HTTPSConnection(self.srv_host, self.srv_port,216                                         ssl_context=self.ctx)217            c.request('GET', '/')218            c.close()219        finally:220            # (openssl s_server) buffers its log output, and ends the TLS session221            # with the client (allowing the client to terminate) before flushing222            # the log; so, the client may 
get here and terminate the server223            # before it manages to log the output.224            # So, give the server hopefully enough time to flush the logs.225            time.sleep(sleepTime)226            out, _ = self.stop_server(pid)227        self.assertIn('Hostname in TLS extension: "%s"' % srv_host, out)228    def test_IP_call(self):229        no_exception = True230        runs_counter = 0231        pid = self.start_server(self.args)232        for entry in socket.getaddrinfo(self.srv_host, self.srv_port,233                                        socket.AF_INET,234                                        socket.SOCK_STREAM,235                                        socket.IPPROTO_TCP):236            ipfamily, socktype, _, _, sockaddr = entry237            ip = sockaddr[0]238            sock = socket.socket(ipfamily, socktype)239            conn = SSL.Connection(self.ctx, sock=sock)240            conn.set_tlsext_host_name(self.srv_host)241            conn.set1_host(self.srv_host)242            runs_counter += 1243            try:244                conn.connect((ip, self.srv_port))245            except (SSL.SSLError, socket.error):246                log.exception("Failed to connect to %s:%s", ip, self.srv_port)247                no_exception = False248            finally:249                conn.close()250        out, _ = self.stop_server(pid)251        self.assertEqual(252            out.count('Hostname in TLS extension: "%s"' % self.srv_host),253            runs_counter)254        self.assertTrue(no_exception)255class MiscSSLClientTestCase(BaseSSLClientTestCase):256    def test_no_connection(self):257        ctx = SSL.Context()258        SSL.Connection(ctx)259    def test_server_simple(self):260        pid = self.start_server(self.args)261        try:262            with self.assertRaises(ValueError):263                SSL.Context('tlsv5')264            ctx = SSL.Context()265            s = SSL.Connection(ctx)266            s.connect(self.srv_addr)267     
       with self.assertRaises(ValueError):268                s.read(0)269            data = self.http_get(s)270            s.close()271        finally:272            self.stop_server(pid)273        self.assertIn('s_server -quiet -www', data)274    def test_server_simple_secure_context(self):275        pid = self.start_server(self.args)276        try:277            ctx = SSL.Context()278            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,279                           9)280            ctx.load_verify_locations('tests/ca.pem')281            s = SSL.Connection(ctx)282            s.connect(self.srv_addr)283            data = self.http_get(s)284            s.close()285        finally:286            self.stop_server(pid)287        self.assertIn('s_server -quiet -www', data)288    def test_server_simple_secure_context_fail(self):289        pid = self.start_server(self.args)290        try:291            ctx = SSL.Context()292            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,293                           9)294            ctx.load_verify_locations('tests/server.pem')295            s = SSL.Connection(ctx)296            with self.assertRaises(SSL.SSLError):297                s.connect(self.srv_addr)298            s.close()299        finally:300            self.stop_server(pid)301    def test_server_simple_timeouts(self):302        pid = self.start_server(self.args)303        try:304            with self.assertRaises(ValueError):305                SSL.Context('tlsv5')306            ctx = SSL.Context()307            s = SSL.Connection(ctx)308            r = s.get_socket_read_timeout()309            w = s.get_socket_write_timeout()310            self.assertEqual(r.sec, 0, r.sec)311            self.assertEqual(r.microsec, 0, r.microsec)312            self.assertEqual(w.sec, 0, w.sec)313            self.assertEqual(w.microsec, 0, w.microsec)314            s.set_socket_read_timeout(SSL.timeout())315            
s.set_socket_write_timeout(SSL.timeout(909, 9))316            r = s.get_socket_read_timeout()317            w = s.get_socket_write_timeout()318            self.assertEqual(r.sec, 600, r.sec)319            self.assertEqual(r.microsec, 0, r.microsec)320            self.assertEqual(w.sec, 909, w.sec)321            # self.assertEqual(w.microsec, 9, w.microsec) XXX 4000322            s.connect(self.srv_addr)323            data = self.http_get(s)324            s.close()325        finally:326            self.stop_server(pid)327        self.assertIn('s_server -quiet -www', data)328    # TLS is required in FIPS mode329    @unittest.skipIf(fips_mode, "Can't be run in FIPS mode")330    def test_tls1_nok(self):331        self.args.append('-no_tls1')332        pid = self.start_server(self.args)333        try:334            with warnings.catch_warnings():335                warnings.simplefilter('ignore', DeprecationWarning)336                ctx = SSL.Context('tlsv1')337            s = SSL.Connection(ctx)338            with six.assertRaisesRegex(self, SSL.SSLError,339                                       r'version|unexpected eof'):340                s.connect(self.srv_addr)341            s.close()342        finally:343            self.stop_server(pid)344    def test_tls1_ok(self):345        self.args.append('-tls1')346        pid = self.start_server(self.args)347        try:348            with warnings.catch_warnings():349                warnings.simplefilter('ignore', DeprecationWarning)350                ctx = SSL.Context('tlsv1')351            s = SSL.Connection(ctx)352            s.connect(self.srv_addr)353            data = self.http_get(s)354            s.close()355        finally:356            self.stop_server(pid)357        self.assertIn('s_server -quiet -www', data)358    def test_cipher_mismatch(self):359        self.args = self.args + ['-cipher', 'AES256-SHA']360        pid = self.start_server(self.args)361        try:362            ctx = SSL.Context()363            
s = SSL.Connection(ctx)364            s.set_cipher_list('AES128-SHA')365            with six.assertRaisesRegex(self, SSL.SSLError,366                                       'sslv3 alert handshake failure'):367                s.connect(self.srv_addr)368            s.close()369        finally:370            self.stop_server(pid)371    def test_no_such_cipher(self):372        self.args = self.args + ['-cipher', 'AES128-SHA']373        pid = self.start_server(self.args)374        try:375            ctx = SSL.Context()376            s = SSL.Connection(ctx)377            s.set_cipher_list('EXP-RC2-MD5')378            with six.assertRaisesRegex(self, SSL.SSLError,379                                       'no ciphers available'):380                s.connect(self.srv_addr)381            s.close()382        finally:383            self.stop_server(pid)384    def test_cipher_ok(self):385        self.args = self.args + ['-cipher', 'AES128-SHA']386        pid = self.start_server(self.args)387        try:388            ctx = SSL.Context()389            s = SSL.Connection(ctx)390            s.set_cipher_list('AES128-SHA')391            s.connect(self.srv_addr)392            data = self.http_get(s)393            self.assertEqual(s.get_cipher().name(), 'AES128-SHA',394                             s.get_cipher().name())395            cipher_stack = s.get_ciphers()396            self.assertEqual(cipher_stack[0].name(), 'AES128-SHA',397                             cipher_stack[0].name())398            with self.assertRaises(IndexError):399                cipher_stack.__getitem__(2)400            # For some reason there are 2 entries in the stack401            # self.assertEqual(len(cipher_stack), 1, len(cipher_stack))402            self.assertEqual(s.get_cipher_list(), 'AES128-SHA',403                             s.get_cipher_list())404            # Test Cipher_Stack iterator405            i = 0406            for cipher in cipher_stack:407                i += 1408                
self.assertEqual(cipher.name(), 'AES128-SHA',409                                 '"%s"' % cipher.name())410                self.assertEqual('AES128-SHA-128', str(cipher))411            # For some reason there are 2 entries in the stack412            # self.assertEqual(i, 1, i)413            self.assertEqual(i, len(cipher_stack))414            s.close()415        finally:416            self.stop_server(pid)417        self.assertIn('s_server -quiet -www', data)418    def verify_cb_new(self, ok, store):419        return verify_cb_new_function(ok, store)420    def test_verify_cb_new(self):421        pid = self.start_server(self.args)422        try:423            ctx = SSL.Context()424            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,425                           9, self.verify_cb_new)426            s = SSL.Connection(ctx)427            try:428                s.connect(self.srv_addr)429            except SSL.SSLError as e:430                self.fail(e)431            data = self.http_get(s)432            s.close()433        finally:434            self.stop_server(pid)435        self.assertIn('s_server -quiet -www', data)436    def test_verify_cb_new_class(self):437        pid = self.start_server(self.args)438        try:439            ctx = SSL.Context()440            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,441                           9, VerifyCB())442            s = SSL.Connection(ctx)443            try:444                s.connect(self.srv_addr)445            except SSL.SSLError as e:446                log.exception(e)447                self.fail(e)448            data = self.http_get(s)449            s.close()450        finally:451            self.stop_server(pid)452        self.assertIn('s_server -quiet -www', data)453    def test_verify_cb_new_function(self):454        pid = self.start_server(self.args)455        try:456            ctx = SSL.Context()457            ctx.set_verify(SSL.verify_peer | 
SSL.verify_fail_if_no_peer_cert,458                           9, verify_cb_new_function)459            s = SSL.Connection(ctx)460            try:461                s.connect(self.srv_addr)462            except SSL.SSLError as e:463                self.fail(e)464            data = self.http_get(s)465            s.close()466        finally:467            self.stop_server(pid)468        self.assertIn('s_server -quiet -www', data)469    def test_verify_cb_lambda(self):470        pid = self.start_server(self.args)471        try:472            ctx = SSL.Context()473            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,474                           9, lambda ok, store: 1)475            s = SSL.Connection(ctx)476            try:477                s.connect(self.srv_addr)478            except SSL.SSLError as e:479                self.fail(e)480            data = self.http_get(s)481            s.close()482        finally:483            self.stop_server(pid)484        self.assertIn('s_server -quiet -www', data)485    def verify_cb_exception(self, ok, store):486        self.fail('We should fail verification')487    def test_verify_cb_exception(self):488        pid = self.start_server(self.args)489        try:490            ctx = SSL.Context()491            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,492                           9, self.verify_cb_exception)493            s = SSL.Connection(ctx)494            with self.assertRaises(SSL.SSLError):495                s.connect(self.srv_addr)496            s.close()497        finally:498            self.stop_server(pid)499    def test_verify_cb_not_callable(self):500        ctx = SSL.Context()501        with self.assertRaises(TypeError):502            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,503                           9, 1)504    def test_verify_cb_wrong_callable(self):505        pid = self.start_server(self.args)506        try:507            ctx = 
SSL.Context()508            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,509                           9, lambda _: '')510            s = SSL.Connection(ctx)511            with self.assertRaises(SSL.SSLError):512                with warnings.catch_warnings():513                    warnings.simplefilter('ignore', DeprecationWarning)514                    s.connect(self.srv_addr)515            s.close()516        finally:517            self.stop_server(pid)518    def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):519        try:520            self.assertFalse(ok)521            self.assertIn(err,522                          [m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,523                           m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,524                           m2.X509_V_ERR_CERT_UNTRUSTED,525                           m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE])526            self.assertTrue(m2.ssl_ctx_get_cert_store(ctx_ptr))527            self.assertTrue(X509.X509(x509_ptr).as_pem())528        except AssertionError:529            # If we let exceptions propagate from here the530            # caller may see strange errors. 
This is cleaner.531            return 0532        return 1533    def test_verify_cb_old(self):534        with warnings.catch_warnings():535            warnings.simplefilter("ignore", DeprecationWarning)536            pid = self.start_server(self.args)537            try:538                ctx = SSL.Context()539                ctx.set_verify(540                    SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,541                    9, self.verify_cb_old)542                s = SSL.Connection(ctx)543                try:544                    s.connect(self.srv_addr)545                except SSL.SSLError as e:546                    self.fail(e)547                data = self.http_get(s)548                s.close()549            finally:550                self.stop_server(pid)551            self.assertIn('s_server -quiet -www', data)552    def test_verify_allow_unknown_old(self):553        pid = self.start_server(self.args)554        try:555            ctx = SSL.Context()556            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,557                           9, SSL.cb.ssl_verify_callback_allow_unknown_ca)558            ctx.set_allow_unknown_ca(1)559            s = SSL.Connection(ctx)560            try:561                s.connect(self.srv_addr)562            except SSL.SSLError:563                log.error('Failed to connect to %s', self.srv_addr)564                raise565            data = self.http_get(s)566            s.close()567        finally:568            self.stop_server(pid)569        self.assertIn('s_server -quiet -www', data)570    def test_verify_allow_unknown_new(self):571        pid = self.start_server(self.args)572        try:573            ctx = SSL.Context()574            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,575                           9, SSL.cb.ssl_verify_callback_allow_unknown_ca)576            s = SSL.Connection(ctx)577            try:578                s.connect(self.srv_addr)579            
except SSL.SSLError as e:580                self.fail(e)581            data = self.http_get(s)582            s.close()583        finally:584            self.stop_server(pid)585        self.assertIn('s_server -quiet -www', data)586    def test_verify_cert(self):587        pid = self.start_server(self.args)588        try:589            ctx = SSL.Context()590            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,591                           9)592            ctx.load_verify_locations('tests/ca.pem')593            s = SSL.Connection(ctx)594            try:595                s.connect(self.srv_addr)596            except SSL.SSLError as e:597                self.fail(e)598            data = self.http_get(s)599            s.close()600        finally:601            self.stop_server(pid)602        self.assertIn('s_server -quiet -www', data)603    def test_verify_cert_fail(self):604        pid = self.start_server(self.args)605        try:606            ctx = SSL.Context()607            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,608                           9)609            ctx.load_verify_locations('tests/server.pem')610            s = SSL.Connection(ctx)611            with self.assertRaises(SSL.SSLError):612                s.connect(self.srv_addr)613            s.close()614        finally:615            self.stop_server(pid)616    def test_verify_cert_mutual_auth(self):617        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])618        pid = self.start_server(self.args)619        try:620            ctx = SSL.Context()621            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,622                           9)623            ctx.load_verify_locations('tests/ca.pem')624            ctx.load_cert('tests/x509.pem')625            s = SSL.Connection(ctx)626            try:627                s.connect(self.srv_addr)628            except SSL.SSLError as e:629                self.fail(e)630            data = 
self.http_get(s)631            s.close()632        finally:633            self.stop_server(pid)634        self.assertIn('s_server -quiet -www', data)635    def test_verify_cert_mutual_auth_servernbio(self):636        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])637        pid = self.start_server(self.args)638        try:639            ctx = SSL.Context()640            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,641                           9)642            ctx.load_verify_locations('tests/ca.pem')643            ctx.load_cert('tests/x509.pem')644            s = SSL.Connection(ctx)645            try:646                s.connect(self.srv_addr)647            except SSL.SSLError as e:648                self.fail(e)649            data = self.http_get(s)650            s.close()651        finally:652            self.stop_server(pid)653        self.assertIn('s_server -quiet -www', data)654    def test_verify_cert_mutual_auth_fail(self):655        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])656        pid = self.start_server(self.args)657        try:658            ctx = SSL.Context()659            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,660                           9)661            ctx.load_verify_locations('tests/ca.pem')662            s = SSL.Connection(ctx)663            with self.assertRaises(SSL.SSLError):664                s.connect(self.srv_addr)665            s.close()666        finally:667            self.stop_server(pid)668    def test_verify_nocert_fail(self):669        self.args.extend(['-nocert'])670        pid = self.start_server(self.args)671        try:672            ctx = SSL.Context()673            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,674                           9)675            ctx.load_verify_locations('tests/ca.pem')676            s = SSL.Connection(ctx)677            with self.assertRaises(SSL.SSLError):678                
s.connect(self.srv_addr)679            s.close()680        finally:681            self.stop_server(pid)682    def test_blocking0(self):683        pid = self.start_server(self.args)684        try:685            ctx = SSL.Context()686            s = SSL.Connection(ctx)687            s.setblocking(0)688            with self.assertRaises(Exception):689                s.connect(self.srv_addr)690            s.close()691        finally:692            self.stop_server(pid)693    def test_blocking1(self):694        pid = self.start_server(self.args)695        try:696            ctx = SSL.Context()697            s = SSL.Connection(ctx)698            s.setblocking(1)699            try:700                s.connect(self.srv_addr)701            except SSL.SSLError as e:702                self.fail(e)703            data = self.http_get(s)704            s.close()705        finally:706            self.stop_server(pid)707        self.assertIn('s_server -quiet -www', data)708    def test_makefile(self):709        pid = self.start_server(self.args)710        try:711            ctx = SSL.Context()712            s = SSL.Connection(ctx)713            try:714                s.connect(self.srv_addr)715            except SSL.SSLError as e:716                self.fail(e)717            bio = s.makefile('rwb')718            # s.close()  # XXX bug 6628?719            bio.write(b'GET / HTTP/1.0\n\n')720            bio.flush()721            data = bio.read()722            bio.close()723            s.close()724        finally:725            self.stop_server(pid)726        self.assertIn(b's_server -quiet -www', data)727    def test_makefile_err(self):728        pid = self.start_server(self.args)729        try:730            ctx = SSL.Context()731            s = SSL.Connection(ctx)732            try:733                s.connect(self.srv_addr)734            except SSL.SSLError as e:735                self.fail(e)736            f = s.makefile()737            data = self.http_get(s)738            
s.close()739            del f740            del s741            err_code = Err.peek_error_code()742            self.assertEqual(err_code, 0,743                             'Unexpected error: %s' % err_code)744            err = Err.get_error()745            self.assertIsNone(err, 'Unexpected error: %s' % err)746        finally:747            self.stop_server(pid)748        self.assertIn('s_server -quiet -www', data)749    def test_info_callback(self):750        pid = self.start_server(self.args)751        try:752            ctx = SSL.Context()753            ctx.set_info_callback()754            s = SSL.Connection(ctx)755            s.connect(self.srv_addr)756            data = self.http_get(s)757            s.close()758        finally:759            self.stop_server(pid)760        self.assertIn('s_server -quiet -www', data)761    def test_ssl_connection_free(self):762        pid = self.start_server(self.args)763        orig_m2_ssl_free = SSL.Connection.m2_ssl_free764        def _m2_ssl_free(ssl):765            orig_m2_ssl_free(ssl)766            _m2_ssl_free.called = True767        try:768            ctx = SSL.Context()769            s = SSL.Connection(ctx)770            s.m2_ssl_free = _m2_ssl_free771            s.connect(self.srv_addr)772            data = self.http_get(s)773            s.close()774            self.assertFalse(hasattr(_m2_ssl_free, 'called'))775            # keep fingers crossed that SSL.Connection.__del__ is called776            # by the python interpreter777            del s778        finally:779            self.stop_server(pid)780        self.assertIn('s_server -quiet -www', data)781        self.assertTrue(getattr(_m2_ssl_free, 'called', False))782    def test_ssl_connection_no_free(self):783        pid = self.start_server(self.args)784        orig_m2_ssl_free = SSL.Connection.m2_ssl_free785        def _m2_ssl_free(ssl):786            _m2_ssl_free.called = True787            orig_m2_ssl_free(ssl)788        try:789            ctx = 
SSL.Context()790            s = SSL.Connection(ctx)791            s.m2_ssl_free = _m2_ssl_free792            s.set_ssl_close_flag(m2.bio_close)793            s.connect(self.srv_addr)794            data = self.http_get(s)795            s.close()796            self.assertFalse(hasattr(_m2_ssl_free, 'called'))797            # keep fingers crossed that SSL.Connection.__del__ is called798            # by the python interpreter799            del s800        finally:801            self.stop_server(pid)802        self.assertIn('s_server -quiet -www', data)803        self.assertFalse(hasattr(_m2_ssl_free, 'called'))804class UrllibSSLClientTestCase(BaseSSLClientTestCase):805    @unittest.skipIf(six.PY3, "urllib.URLOpener is deprecated in py3k")806    def test_urllib(self):807        pid = self.start_server(self.args)808        try:809            url = m2urllib.FancyURLopener()810            url.addheader('Connection', 'close')811            u = url.open('https://%s:%s/' % (srv_host, self.srv_port))812            data = u.read()813            u.close()814        finally:815            self.stop_server(pid)816        self.assertIn('s_server -quiet -www', data)817    # XXX Don't actually know how to use m2urllib safely!818    # def test_urllib_safe_context(self):819    # def test_urllib_safe_context_fail(self):820class Urllib2SSLClientTestCase(BaseSSLClientTestCase):821    def test_urllib2(self):822        pid = self.start_server(self.args)823        try:824            opener = m2urllib2.build_opener()825            opener.addheaders = [('Connection', 'close')]826            u = opener.open('https://%s:%s/' % (srv_host, self.srv_port))827            data = u.read()828            u.close()829        finally:830            self.stop_server(pid)831        self.assertIn(b's_server -quiet -www', data)832    def test_urllib2_secure_context(self):833        pid = self.start_server(self.args)834        try:835            ctx = SSL.Context()836            ctx.set_verify(837              
  SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)838            ctx.load_verify_locations('tests/ca.pem')839            opener = m2urllib2.build_opener(ctx)840            opener.addheaders = [('Connection', 'close')]841            u = opener.open('https://%s:%s/' % (srv_host, self.srv_port))842            data = u.read()843            u.close()844        finally:845            self.stop_server(pid)846        self.assertIn(b's_server -quiet -www', data)847    def test_urllib2_secure_context_fail(self):848        pid = self.start_server(self.args)849        try:850            ctx = SSL.Context()851            ctx.set_verify(852                SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)853            ctx.load_verify_locations('tests/server.pem')854            opener = m2urllib2.build_opener(ctx)855            opener.addheaders = [('Connection', 'close')]856            with self.assertRaises(SSL.SSLError):857                opener.open('https://%s:%s/' % (srv_host, self.srv_port))858        finally:859            self.stop_server(pid)860    def test_z_urllib2_opener(self):861        pid = self.start_server(self.args)862        try:863            ctx = SSL.Context()864            opener = m2urllib2.build_opener(865                ctx, m2urllib2.HTTPBasicAuthHandler())866            m2urllib2.install_opener(opener)867            req = m2urllib2.Request('https://%s:%s/' %868                                    (srv_host, self.srv_port))869            u = m2urllib2.urlopen(req)870            data = u.read()871            u.close()872        finally:873            self.stop_server(pid)874        self.assertIn(b's_server -quiet -www', data)875    def test_urllib2_opener_handlers(self):876        ctx = SSL.Context()877        m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())878    def test_urllib2_leak(self):879        pid = self.start_server(self.args)880        try:881            o = m2urllib2.build_opener()882            r = 
o.open('https://%s:%s/' % (srv_host, self.srv_port))883            s = [r.fp._sock.fp]884            r.close()885            # TODO This should be assertEqual 1, but we leak sock886            # somehwere. Not sure how to fix it.887            log.debug('get_referrers = %d', len(gc.get_referrers(s[0])))888            self.assertLessEqual(len(gc.get_referrers(s[0])), 2)889        finally:890            self.stop_server(pid)891class Urllib2TEChunkedSSLClientTestCase(BaseSSLClientTestCase):892    """Test a response with "Transfer-Encoding: chunked"."""893    def setUp(self):894        super(Urllib2TEChunkedSSLClientTestCase, self).setUp()895        self.args = ['s_server', '-quiet', '-HTTP',896                     '-accept', str(self.srv_port)]897    def test_transfer_encoding_chunked(self):898        pid = self.start_server(self.args)899        try:900            url = 'https://%s:%s/te_chunked_response.txt' % (srv_host,901                                                             self.srv_port)902            o = m2urllib2.build_opener()903            u = o.open(url)904            data = u.read()905            self.assertEqual(b'foo\nfoobar\n', data)906        finally:907            self.stop_server(pid)908@unittest.skipUnless(py27plus,909                     "Twisted doesn't test well with Python 2.6")910class TwistedSSLClientTestCase(BaseSSLClientTestCase):911    def test_timeout(self):912        pid = self.start_server(self.args)913        try:914            ctx = SSL.Context()915            s = SSL.Connection(ctx)916            # Just a really small number so we can timeout917            s.settimeout(0.000000000000000000000000000001)918            with self.assertRaises(SSL.SSLTimeoutError):919                s.connect(self.srv_addr)920            s.close()921        finally:922            self.stop_server(pid)923    def test_makefile_timeout(self):924        # httpslib uses makefile to read the response925        pid = self.start_server(self.args)926        
try:927            c = httpslib.HTTPSConnection(srv_host, self.srv_port)928            c.putrequest('GET', '/')929            c.putheader('Accept', 'text/html')930            c.putheader('Accept', 'text/plain')931            c.endheaders()932            c.sock.settimeout(100)933            resp = c.getresponse()934            self.assertEqual(resp.status, 200, resp.reason)935            data = resp.read()936            c.close()937        finally:938            self.stop_server(pid)939        self.assertIn(b's_server -quiet -www', data)940    def test_makefile_timeout_fires(self):941        # This is convoluted because (openssl s_server -www) starts942        # writing the response as soon as it receives the first line of943        # the request, so it's possible for it to send the response944        # before the request is sent and there would be no timeout.  So,945        # let the server spend time reading from an empty pipe946        FIFO_NAME = 'test_makefile_timeout_fires_fifo'  # noqa947        os.mkfifo('tests/' + FIFO_NAME)948        pipe_pid = os.fork()949        try:950            if pipe_pid == 0:951                try:952                    with open('tests/' + FIFO_NAME, 'w') as f:953                        time.sleep(sleepTime + 1)954                        f.write('Content\n')955                finally:956                    os._exit(0)957            self.args[self.args.index('-www')] = '-WWW'958            pid = self.start_server(self.args)959            try:960                c = httpslib.HTTPSConnection(srv_host, self.srv_port)961                c.putrequest('GET', '/' + FIFO_NAME)962                c.putheader('Accept', 'text/html')963                c.putheader('Accept', 'text/plain')964                c.endheaders()965                c.sock.settimeout(0.0000000001)966                with self.assertRaises(socket.timeout):967                    c.getresponse()968                c.close()969            finally:970                
self.stop_server(pid)971        finally:972            os.kill(pipe_pid, signal.SIGTERM)973            os.waitpid(pipe_pid, 0)974            os.unlink('tests/' + FIFO_NAME)975    def test_twisted_wrapper(self):976        # Test only when twisted and ZopeInterfaces are present977        try:978            from twisted.internet.protocol import ClientFactory979            from twisted.protocols.basic import LineReceiver980            from twisted.internet import reactor981            import M2Crypto.SSL.TwistedProtocolWrapper as wrapper982        except ImportError:983            warnings.warn(984                'Skipping twisted wrapper test because twisted not found')985            return986        # TODO Class must implement all abstract methods987        class EchoClient(LineReceiver):988            def connectionMade(self):989                self.sendLine(b'GET / HTTP/1.0\n\n')990            def lineReceived(self, line):991                global twisted_data992                twisted_data += line993        class EchoClientFactory(ClientFactory):994            protocol = EchoClient995            def clientConnectionFailed(self, connector, reason):996                reactor.stop()997                self.fail(reason)998            def clientConnectionLost(self, connector, reason):999                reactor.stop()1000        pid = self.start_server(self.args)1001        class ContextFactory(object):1002            def getContext(self):1003                return SSL.Context()1004        try:1005            global twisted_data1006            twisted_data = b''1007            context_factory = ContextFactory()1008            factory = EchoClientFactory()1009            wrapper.connectSSL(srv_host, self.srv_port, factory,1010                               context_factory)1011            # This will block until reactor.stop() is called1012            reactor.run()1013        finally:1014            self.stop_server(pid)1015        self.assertIn(b's_server -quiet -www', 
twisted_data)1016twisted_data = ''1017class XmlRpcLibTestCase(unittest.TestCase):1018    def test_lib(self):1019        m2xmlrpclib.SSL_Transport()1020        # XXX need server to test against1021class FtpsLibTestCase(unittest.TestCase):1022    def test_lib(self):1023        ftpslib.FTP_TLS()1024        # XXX need server to test against1025class SessionTestCase(unittest.TestCase):1026    def test_session_load_bad(self):1027        with self.assertRaises(SSL.SSLError):1028            SSL.Session.load_session('tests/signer.pem')...test_client_server.py
Source:test_client_server.py  
...48            self.loop.run_until_complete(49                asyncio.wait_for(self.client.worker_task, timeout=1))50        except asyncio.TimeoutError:                # pragma: no cover51            self.fail("Client failed to stop")52    def stop_server(self):53        self.server.close()54        try:55            self.loop.run_until_complete(56                asyncio.wait_for(self.server.wait_closed(), timeout=1))57        except asyncio.TimeoutError:                # pragma: no cover58            self.fail("Server failed to stop")59    def test_basic(self):60        self.start_server()61        self.start_client()62        self.loop.run_until_complete(self.client.send("Hello!"))63        reply = self.loop.run_until_complete(self.client.recv())64        self.assertEqual(reply, "Hello!")65        self.stop_client()66        self.stop_server()67    def test_server_close_while_client_connected(self):68        self.start_server()69        self.start_client()70        self.stop_server()71    def test_explicit_event_loop(self):72        self.start_server(loop=self.loop)73        self.start_client(loop=self.loop)74        self.loop.run_until_complete(self.client.send("Hello!"))75        reply = self.loop.run_until_complete(self.client.recv())76        self.assertEqual(reply, "Hello!")77        self.stop_client()78        self.stop_server()79    def test_protocol_attributes(self):80        self.start_server()81        self.start_client('attributes')82        expected_attrs = ('localhost', 8642, self.secure)83        client_attrs = (self.client.host, self.client.port, self.client.secure)84        self.assertEqual(client_attrs, expected_attrs)85        server_attrs = self.loop.run_until_complete(self.client.recv())86        self.assertEqual(server_attrs, repr(expected_attrs))87        self.stop_client()88        self.stop_server()89    def test_protocol_headers(self):90        self.start_server()91        self.start_client('headers')92        client_req = 
self.client.request_headers93        client_resp = self.client.response_headers94        self.assertEqual(client_req['User-Agent'], USER_AGENT)95        self.assertEqual(client_resp['Server'], USER_AGENT)96        server_req = self.loop.run_until_complete(self.client.recv())97        server_resp = self.loop.run_until_complete(self.client.recv())98        self.assertEqual(server_req, str(client_req))99        self.assertEqual(server_resp, str(client_resp))100        self.stop_client()101        self.stop_server()102    def test_protocol_raw_headers(self):103        self.start_server()104        self.start_client('raw_headers')105        client_req = self.client.raw_request_headers106        client_resp = self.client.raw_response_headers107        self.assertEqual(dict(client_req)['User-Agent'], USER_AGENT)108        self.assertEqual(dict(client_resp)['Server'], USER_AGENT)109        server_req = self.loop.run_until_complete(self.client.recv())110        server_resp = self.loop.run_until_complete(self.client.recv())111        self.assertEqual(server_req, repr(client_req))112        self.assertEqual(server_resp, repr(client_resp))113        self.stop_client()114        self.stop_server()115    def test_protocol_custom_request_headers_dict(self):116        self.start_server()117        self.start_client('raw_headers', extra_headers={'X-Spam': 'Eggs'})118        req_headers = self.loop.run_until_complete(self.client.recv())119        self.loop.run_until_complete(self.client.recv())120        self.assertIn("('X-Spam', 'Eggs')", req_headers)121        self.stop_client()122        self.stop_server()123    def test_protocol_custom_request_headers_list(self):124        self.start_server()125        self.start_client('raw_headers', extra_headers=[('X-Spam', 'Eggs')])126        req_headers = self.loop.run_until_complete(self.client.recv())127        self.loop.run_until_complete(self.client.recv())128        self.assertIn("('X-Spam', 'Eggs')", req_headers)129        
self.stop_client()130        self.stop_server()131    def test_protocol_custom_response_headers_callable_dict(self):132        self.start_server(extra_headers=lambda p, r: {'X-Spam': 'Eggs'})133        self.start_client('raw_headers')134        self.loop.run_until_complete(self.client.recv())135        resp_headers = self.loop.run_until_complete(self.client.recv())136        self.assertIn("('X-Spam', 'Eggs')", resp_headers)137        self.stop_client()138        self.stop_server()139    def test_protocol_custom_response_headers_callable_list(self):140        self.start_server(extra_headers=lambda p, r: [('X-Spam', 'Eggs')])141        self.start_client('raw_headers')142        self.loop.run_until_complete(self.client.recv())143        resp_headers = self.loop.run_until_complete(self.client.recv())144        self.assertIn("('X-Spam', 'Eggs')", resp_headers)145        self.stop_client()146        self.stop_server()147    def test_protocol_custom_response_headers_dict(self):148        self.start_server(extra_headers={'X-Spam': 'Eggs'})149        self.start_client('raw_headers')150        self.loop.run_until_complete(self.client.recv())151        resp_headers = self.loop.run_until_complete(self.client.recv())152        self.assertIn("('X-Spam', 'Eggs')", resp_headers)153        self.stop_client()154        self.stop_server()155    def test_protocol_custom_response_headers_list(self):156        self.start_server(extra_headers=[('X-Spam', 'Eggs')])157        self.start_client('raw_headers')158        self.loop.run_until_complete(self.client.recv())159        resp_headers = self.loop.run_until_complete(self.client.recv())160        self.assertIn("('X-Spam', 'Eggs')", resp_headers)161        self.stop_client()162        self.stop_server()163    def test_no_subprotocol(self):164        self.start_server()165        self.start_client('subprotocol')166        server_subprotocol = self.loop.run_until_complete(self.client.recv())167        self.assertEqual(server_subprotocol, 
repr(None))168        self.assertEqual(self.client.subprotocol, None)169        self.stop_client()170        self.stop_server()171    def test_subprotocol_found(self):172        self.start_server(subprotocols=['superchat', 'chat'])173        self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])174        server_subprotocol = self.loop.run_until_complete(self.client.recv())175        self.assertEqual(server_subprotocol, repr('chat'))176        self.assertEqual(self.client.subprotocol, 'chat')177        self.stop_client()178        self.stop_server()179    def test_subprotocol_not_found(self):180        self.start_server(subprotocols=['superchat'])181        self.start_client('subprotocol', subprotocols=['otherchat'])182        server_subprotocol = self.loop.run_until_complete(self.client.recv())183        self.assertEqual(server_subprotocol, repr(None))184        self.assertEqual(self.client.subprotocol, None)185        self.stop_client()186        self.stop_server()187    def test_subprotocol_not_offered(self):188        self.start_server()189        self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])190        server_subprotocol = self.loop.run_until_complete(self.client.recv())191        self.assertEqual(server_subprotocol, repr(None))192        self.assertEqual(self.client.subprotocol, None)193        self.stop_client()194        self.stop_server()195    def test_subprotocol_not_requested(self):196        self.start_server(subprotocols=['superchat', 'chat'])197        self.start_client('subprotocol')198        server_subprotocol = self.loop.run_until_complete(self.client.recv())199        self.assertEqual(server_subprotocol, repr(None))200        self.assertEqual(self.client.subprotocol, None)201        self.stop_client()202        self.stop_server()203    @unittest.mock.patch.object(WebSocketServerProtocol, 'select_subprotocol')204    def test_subprotocol_error(self, _select_subprotocol):205        
_select_subprotocol.return_value = 'superchat'206        self.start_server(subprotocols=['superchat'])207        with self.assertRaises(InvalidHandshake):208            self.start_client('subprotocol', subprotocols=['otherchat'])209        self.run_loop_once()210        self.stop_server()211    @unittest.mock.patch('websockets.server.read_request')212    def test_server_receives_malformed_request(self, _read_request):213        _read_request.side_effect = ValueError("read_request failed")214        self.start_server()215        with self.assertRaises(InvalidHandshake):216            self.start_client()217        self.stop_server()218    @unittest.mock.patch('websockets.client.read_response')219    def test_client_receives_malformed_response(self, _read_response):220        _read_response.side_effect = ValueError("read_response failed")221        self.start_server()222        with self.assertRaises(InvalidHandshake):223            self.start_client()224        self.run_loop_once()225        self.stop_server()226    @unittest.mock.patch('websockets.client.build_request')227    def test_client_sends_invalid_handshake_request(self, _build_request):228        def wrong_build_request(set_header):229            return '42'230        _build_request.side_effect = wrong_build_request231        self.start_server()232        with self.assertRaises(InvalidHandshake):233            self.start_client()234        self.stop_server()235    @unittest.mock.patch('websockets.server.build_response')236    def test_server_sends_invalid_handshake_response(self, _build_response):237        def wrong_build_response(set_header, key):238            return build_response(set_header, '42')239        _build_response.side_effect = wrong_build_response240        self.start_server()241        with self.assertRaises(InvalidHandshake):242            self.start_client()243        self.stop_server()244    @unittest.mock.patch('websockets.client.read_response')245    def 
test_server_does_not_switch_protocols(self, _read_response):246        @asyncio.coroutine247        def wrong_read_response(stream):248            code, headers = yield from read_response(stream)249            return 400, headers250        _read_response.side_effect = wrong_read_response251        self.start_server()252        with self.assertRaises(InvalidHandshake):253            self.start_client()254        self.run_loop_once()255        self.stop_server()256    @unittest.mock.patch('websockets.server.WebSocketServerProtocol.send')257    def test_server_handler_crashes(self, send):258        send.side_effect = ValueError("send failed")259        self.start_server()260        self.start_client()261        self.loop.run_until_complete(self.client.send("Hello!"))262        with self.assertRaises(ConnectionClosed):263            self.loop.run_until_complete(self.client.recv())264        self.stop_client()265        self.stop_server()266        # Connection ends with an unexpected error.267        self.assertEqual(self.client.close_code, 1011)268    @unittest.mock.patch('websockets.server.WebSocketServerProtocol.close')269    def test_server_close_crashes(self, close):270        close.side_effect = ValueError("close failed")271        self.start_server()272        self.start_client()273        self.loop.run_until_complete(self.client.send("Hello!"))274        reply = self.loop.run_until_complete(self.client.recv())275        self.assertEqual(reply, "Hello!")276        self.stop_client()277        self.stop_server()278        # Connection ends with an abnormal closure.279        self.assertEqual(self.client.close_code, 1006)280    @unittest.mock.patch.object(WebSocketClientProtocol, 'handshake')281    def test_client_closes_connection_before_handshake(self, handshake):282        self.start_server()283        self.start_client()284        # We have mocked the handshake() method to prevent the client from285        # performing the opening handshake. 
Force it to close the connection.286        self.loop.run_until_complete(self.client.close_connection(force=True))287        self.stop_client()288        # The server should stop properly anyway. It used to hang because the289        # worker handling the connection was waiting for the opening handshake.290        self.stop_server()291    @unittest.mock.patch('websockets.server.read_request')292    def test_server_shuts_down_during_opening_handshake(self, _read_request):293        _read_request.side_effect = asyncio.CancelledError294        self.start_server()295        self.server.closing = True296        with self.assertRaises(InvalidHandshake) as raised:297            self.start_client()298        self.stop_server()299        # Opening handshake fails with 503 Service Unavailable300        self.assertEqual(str(raised.exception), "Bad status code: 503")301    def test_server_shuts_down_during_connection_handling(self):302        self.start_server()303        self.start_client()304        self.server.close()305        with self.assertRaises(ConnectionClosed):306            self.loop.run_until_complete(self.client.recv())307        self.stop_client()308        self.stop_server()309        # Websocket connection terminates with 1001 Going Away.310        self.assertEqual(self.client.close_code, 1001)311@unittest.skipUnless(os.path.exists(testcert), "test certificate is missing")312class SSLClientServerTests(ClientServerTests):313    secure = True314    @property315    def server_context(self):316        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)317        ssl_context.load_cert_chain(testcert)318        return ssl_context319    @property320    def client_context(self):321        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)322        ssl_context.load_verify_locations(testcert)323        ssl_context.verify_mode = ssl.CERT_REQUIRED324        return ssl_context325    def start_server(self, *args, **kwds):326        kwds['ssl'] = self.server_context327        
server = serve(handler, 'localhost', 8642, **kwds)328        self.server = self.loop.run_until_complete(server)329    def start_client(self, path='', **kwds):330        kwds['ssl'] = self.client_context331        client = connect('wss://localhost:8642/' + path, **kwds)332        self.client = self.loop.run_until_complete(client)333    def test_ws_uri_is_rejected(self):334        self.start_server()335        client = connect('ws://localhost:8642/', ssl=self.client_context)336        with self.assertRaises(ValueError):337            self.loop.run_until_complete(client)338        self.stop_server()339class ClientServerOriginTests(unittest.TestCase):340    def setUp(self):341        self.loop = asyncio.new_event_loop()342        asyncio.set_event_loop(self.loop)343    def tearDown(self):344        self.loop.close()345    def test_checking_origin_succeeds(self):346        server = self.loop.run_until_complete(347            serve(handler, 'localhost', 8642, origins=['http://localhost']))348        client = self.loop.run_until_complete(349            connect('ws://localhost:8642/', origin='http://localhost'))350        self.loop.run_until_complete(client.send("Hello!"))351        self.assertEqual(self.loop.run_until_complete(client.recv()), "Hello!")352        self.loop.run_until_complete(client.close())...service_test_case.py
Source:service_test_case.py  
import contextlib
import io
import logging
import unittest
import socket
from threading import Thread, Event
from wsgiref.headers import Headers
from wsgiref.simple_server import make_server
from wsgiref.util import request_uri
from urllib.parse import urlparse
from typing import Any, Callable, cast, Dict, Generator, Iterable, List
from typing import Optional, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    from wsgiref.types import StartResponse
    Environ = Dict[str, Any]
    Handler = Callable[
        [
            Environ,
            StartResponse
        ],
        Iterable[bytes]
    ]
    HandlerIdentifier = Tuple[str, str]


def get_port() -> int:
    """Return a TCP port number that was free on localhost a moment ago.

    The port is obtained by binding to port 0 and reading back the number
    the OS assigned.  The probing socket is closed before returning (also
    when ``bind`` fails), so another process may grab the port in between.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("localhost", 0))
        return cast(int, s.getsockname()[1])


class ServiceRequest(object):
    """Snapshot of one request received by a :class:`Service`."""

    # eventually this can contain headers, body, etc. as necessary for comparison
    def __init__(
            self, headers: Dict[str, str], method: str, path: str, body: Optional[bytes]) -> None:
        # Header names arrive as WSGI environ keys; underscores are turned
        # into dashes (e.g. "HTTP_X_FOO" is stored as "HTTP-X-FOO").
        self.headers = Headers([(key.replace("_", "-"), value) for key, value in headers.items()])
        self.method = method
        self.path = path
        self.body = body

    def assert_header_equals(self, header_key: str, header_value: str) -> None:
        """Assert that ``header_key`` was sent and carries ``header_value``."""
        assert header_key in self.headers, \
            f"Expected header {header_key} not in request headers."
        actual_value = self.headers[header_key]
        assert actual_value == header_value, \
            f"Request header {header_key} unexpectedly set to " \
            f"`{actual_value}` instead of `{header_value}`."

    def assert_body_equals(self, body: bytes) -> None:
        """Assert that the recorded request body equals ``body``."""
        assert body == self.body, f"Body unexpectedly equals {self.body} instead of {body}"


class Service(object):
    """Small WSGI server running in a background thread that records requests.

    Handlers are registered per ``(method, path)``; every request that hits a
    registered handler is stored in ``fetches`` so tests can assert on it.
    """

    # Recorded requests, keyed by (method, path), in arrival order.
    fetches: Dict[Tuple[str, str], List[ServiceRequest]]
    # Set by the server thread once startup has finished (or failed).
    server_started: Optional[Event]
    # Set by stop() to ask the server thread to leave its loop.
    stop_server: Optional[Event]
    thread: Optional[Thread]
    # Exception raised while creating the server in the thread, if any.
    startup_error: Optional[BaseException]

    def __init__(self) -> None:
        self.port: int = get_port()
        self.handlers: "Dict[HandlerIdentifier, Handler]" = {}
        self.thread = None
        self.fetches = {}
        self.server_started = None
        self.stop_server = None
        self.startup_error = None

    def url(self, path: str) -> str:
        """Return the absolute http URL of ``path`` on this service."""
        return f"http://localhost:{self.port}{path}"

    def add_handler(self, method: str, path: str, callback: "Handler") -> None:
        """Register ``callback`` as the WSGI app for ``method`` + ``path``."""
        identifier: "HandlerIdentifier" = (method, path)
        self.handlers[identifier] = callback

    def handler(self, environ: "Environ", start_response: "StartResponse") -> Iterable[bytes]:
        """WSGI entry point: record the request and dispatch to its handler.

        Requests without a registered handler get a 404 response and are
        not recorded in ``fetches``.
        """
        uri = request_uri(environ, include_query=True)
        path = urlparse(uri).path
        method = environ["REQUEST_METHOD"]

        body: Optional[bytes] = None
        if method in ("POST", "PUT"):
            content_length = int(environ.get("CONTENT_LENGTH", 0) or "0")
            if content_length > 0:
                body = environ["wsgi.input"].read(content_length)
                if body is None:
                    body = b""
                # Put the bytes back so the registered handler can read them again.
                environ["wsgi.input"] = io.BytesIO(body)
            else:
                logging.warning(f"Unable to determine content length for request {method} {path}")

        headers = {
            key: value for key, value in environ.items()
            if key.startswith("HTTP_") or key == "CONTENT_TYPE"
        }
        request = ServiceRequest(headers=headers, method=method, path=path, body=body)
        logging.info(f"Received {method} request for localhost:{self.port}{path}.")

        identifier = (method, path)
        if identifier not in self.handlers:
            logging.warning(
                f"No handler registered for {method} "
                f"localhost:{self.port}{path}")
            start_response("404 Not Found", [("Content-type", "text/plain")])
            return [f"No handler registered for {identifier}".encode("utf8")]

        environ["REQUEST_PATH"] = path
        self.fetches.setdefault(identifier, []).append(request)
        return self.handlers[identifier](environ, start_response)

    def start(self) -> None:
        """Start the server thread and block until it accepts requests.

        Raises:
            Exception: if the service is already started, or the exception
                raised while creating the server (e.g. the port is taken).
        """
        if self.server_started is not None or self.stop_server is not None:
            raise Exception(f"Service already started on port {self.port}")
        self.startup_error = None
        self.server_started = Event()
        self.stop_server = Event()
        # work around mypy failing to infer that these variables can't be None
        server_started = self.server_started
        stop_server = self.stop_server
        self.thread = Thread(target=lambda: self.loop(server_started, stop_server))
        self.thread.start()
        logging.info(f"Starting server on port {self.port}...")
        server_started.wait()
        if self.startup_error is not None:
            # The thread could not create the server; surface the failure to
            # the caller instead of pretending the service is up.
            error = self.startup_error
            self.stop()
            raise error
        logging.info(f"Server on port {self.port} ready for requests.")

    def stop(self) -> None:
        """Stop the server thread (if running) and reset the start state."""
        if self.server_started is not None and self.stop_server is not None \
                and self.thread is not None:
            self.stop_server.set()
            self.thread.join()
        self.server_started = None
        self.stop_server = None
        self.thread = None

    def loop(self, server_started: Event, stop_server: Event) -> None:
        """Thread body: serve requests until ``stop_server`` is set.

        ``server_started`` is always set, even when the server cannot be
        created; in that case the exception is stored in ``startup_error``
        so that start() does not wait forever.
        """
        try:
            httpd = make_server("localhost", self.port, self.handler)
        except Exception as exc:
            self.startup_error = exc
            server_started.set()
            return
        with httpd:
            # Short timeout so the stop flag is polled between requests.
            httpd.timeout = 0.01
            server_started.set()
            while not stop_server.is_set():
                httpd.handle_request()

    def assert_requested(
            self, method: str, path: str,
            headers: Optional[Dict[str, str]] = None) -> ServiceRequest:
        """Assert a request was recorded and return the first one.

        When ``headers`` is given, each entry must match the first request.
        """
        identifier = (method, path)
        assert identifier in self.fetches, f"Could not find request matching {method} {path}"
        request = self.fetches[identifier][0]
        if headers is not None:
            for expected_header, expected_value in headers.items():
                request.assert_header_equals(expected_header, expected_value)
        return request

    def get_all_requests(self, method: str, path: str) -> List[ServiceRequest]:
        """Return every recorded request for ``method`` + ``path``."""
        identifier = (method, path)
        return self.fetches.get(identifier, [])

    def assert_not_requested(self, method: str, path: str) -> None:
        """Assert that no request was recorded for ``method`` + ``path``."""
        identifier = (method, path)
        assert identifier not in self.fetches, f"Unexpected request found for {method} {path}"

    def assert_requested_n_times(
            self, method: str, path: str, n: int) -> List[ServiceRequest]:
        """Assert exactly ``n`` requests were recorded and return them."""
        requests = self.get_all_requests(method, path)
        assert len(requests) == n, \
            f"Expected request count for {method} {path} ({n}) did not match " \
            f"actual count: {len(requests)}"
        return requests


class ServiceTestCase(unittest.TestCase):
    """TestCase base class that owns a list of :class:`Service` objects."""

    def setUp(self) -> None:
        super().setUp()
        self.services: List[Service] = []

    def tearDown(self) -> None:
        super().tearDown()
        self.stop_services()

    def add_service(self) -> Service:
        """Create a new service, remember it for teardown and return it."""
        service = Service()
        self.services.append(service)
        return service

    def start_services(self) -> None:
        for service in self.services:
            service.start()

    def stop_services(self) -> None:
        for service in self.services:
            service.stop()

    @contextlib.contextmanager
    def run_services(self) -> Generator[None, None, None]:
        self.start_services()
        try:
            yield
        finally:
            # The source page cuts the snippet off right here ("finally:...").
            # NOTE(review): presumably the full file calls
            # self.stop_services() at this point -- confirm before relying on it.
            ...

# (snippet truncated on the source page; the next snippet is run_tests.py)
Source:run_tests.py  
...16        print("RESPONSE: {}".format(response))17        if response == '{"id": "8db4206f-8878-174d-7a23-dd2c4f4ef5a0", "prediction": 0.1495}':18            print("\nPredict endpoint: PASS")19            print("###")20            stop_server(server)21            exit(0)22        else:23            print("\nPredict endpoint: FAIL")24            print("###")            25            stop_server(server)26            exit(1)27    except:28        stop_server(server)29def test_train_model():30    print("\n###")31    print("Testing train model")32    server = start_server()33    try:34        config = load_config(config_file)35        n_models = len(os.listdir(config["model_dir"]))36        data = {"dataset_path": "training_set.parquet"}37        requests.post("http://localhost:8080/train", json=data)38        if n_models + 1 == len(os.listdir(config["model_dir"])):39            print("\nTrain endpoint: PASS")40            print("###")41            stop_server(server)42            exit(0)43        else:44            print("\nTrain endpoint: FAIL")45            print("###")            46            stop_server(server)47            exit(1)48    except:49        stop_server(server)50def test_update_model():51    print("\n###")52    print("Testing update model")53    server = start_server()54    old_model = load_config(config_file)["current_model"]55    # Using the same file56    data = {"dataset_path": "training_set.parquet"}57    response = requests.post("http://localhost:8080/train", json=data)58    # For some reason the response is coming with single quotes and that is not a proper JSON59    # So I need this ugly implementation to work around this...60    model_name = json.loads(response.text.replace("'", "\""))["model_name"]61    data = {"model_name": "{}".format(model_name)}62    response = requests.post("http://localhost:8080/update_model", json=data)63    model_name = json.loads(response.text.replace("'", "\""))["current_model"]64    if old_model != model_name:65 
       print("\nUpdate model endpoint: PASS")66        print("###")67        stop_server(server)68        exit(0)69    else:70        print("\nUpdate model endpoint: FAIL")71        print("###")        72        stop_server(server)73        exit(1)74    try:75        pass76    except:77        stop_server(server)78def test_config():79    print("\n###")80    print("Testing config")81    server = start_server()82    try:83        config = load_config(config_file)84        response = requests.get("http://localhost:8080/config")85        if json.dumps(config) == response.text:86            print("\nConfig endpoint: PASS")87            print("###")88            stop_server(server)89            exit(0)90        else:91            print("\nConfig endpoint: FAIL")92            print("###")            93            stop_server(server)94            exit(1)95    except:96        stop_server(server)97def start_server():98    server = sp.Popen(['python', 'server_run.py'])99    # Giving a few seconds to have the server start100    time.sleep(3)101    return server102def stop_server(server_process):103    sp.Popen.terminate(server_process)104def load_config(config_path):105    # Loading config file from disk106    with open(config_path, "r") as json_data:107        config = json.loads(json_data.read())108        return config109if __name__ == "__main__":110    config_file = "config.json"111    # Tests to run112    test_config()113    test_predict()114    test_train_model()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
