Best Python code snippet using tempest_python
test_ssl.py
Source:test_ssl.py  
...58            pass59    return False60class BaseSSLClientTestCase(unittest.TestCase):61    openssl_in_path = find_openssl()62    def start_server(self, args):63        if not self.openssl_in_path:64            raise Exception('openssl command not in PATH')65        66        pid = os.fork()67        if pid == 0:68            # openssl must be started in the tests directory for it69            # to find the .pem files70            os.chdir('tests')71            try:72                os.execvp('openssl', args)73            finally:74                os.chdir('..')75                76        else:77            time.sleep(sleepTime)78            return pid79    def stop_server(self, pid):80        os.kill(pid, 1)81        os.waitpid(pid, 0)82    def http_get(self, s):83        s.send('GET / HTTP/1.0\n\n') 84        resp = ''85        while 1:86            try:87                r = s.recv(4096)88                if not r:89                    break90            except SSL.SSLError: # s_server throws an 'unexpected eof'...91                break92            resp = resp + r 93        return resp94    def setUp(self):95        self.srv_host = srv_host96        self.srv_port = srv_port97        self.srv_addr = (srv_host, srv_port)98        self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)99        self.args = ['s_server', '-quiet', '-www',100                     #'-cert', 'server.pem', Implicitly using this101                     '-accept', str(self.srv_port)]102    def tearDown(self):103        global srv_port104        srv_port = srv_port - 1105class PassSSLClientTestCase(BaseSSLClientTestCase):106        107    def test_pass(self):108        pass109class HttpslibSSLClientTestCase(BaseSSLClientTestCase):110    def test_HTTPSConnection(self):111        pid = self.start_server(self.args)112        try:113            from M2Crypto import httpslib114            c = httpslib.HTTPSConnection(srv_host, srv_port)115            c.request('GET', '/')116            data = c.getresponse().read()117            c.close()118        finally:119            self.stop_server(pid)120        self.failIf(string.find(data, 's_server -quiet -www') == -1)121    def test_HTTPSConnection_resume_session(self):122        pid = self.start_server(self.args)123        try:124            from M2Crypto import httpslib125            ctx = SSL.Context()126            ctx.load_verify_locations(cafile='tests/ca.pem')127            ctx.load_cert('tests/x509.pem')128            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)129            ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)130            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)131            c.request('GET', '/')132            ses = c.get_session()133            t = ses.as_text()134            data = c.getresponse().read()135            # Appearently closing connection here screws session; Ali Polatel?136            # c.close()137            138            ctx2 = SSL.Context()139            ctx2.load_verify_locations(cafile='tests/ca.pem')140            ctx2.load_cert('tests/x509.pem')141            ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)142            ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)143            c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)144            c2.set_session(ses)145            c2.request('GET', '/')146            ses2 = c2.get_session()147            t2 = ses2.as_text()148            data = c2.getresponse().read()149            c.close()150            c2.close()151            assert t == t2, "Sessions did not match"152        finally:153            self.stop_server(pid)154        self.failIf(string.find(data, 's_server -quiet -www') == -1)155    def test_HTTPSConnection_secure_context(self):156        pid = self.start_server(self.args)157        try:158            from M2Crypto import httpslib159            ctx = SSL.Context()160            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)161            ctx.load_verify_locations('tests/ca.pem')162            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)163            c.request('GET', '/')164            data = c.getresponse().read()165            c.close()166        finally:167            self.stop_server(pid)168        self.failIf(string.find(data, 's_server -quiet -www') == -1)169    def test_HTTPSConnection_secure_context_fail(self):170        pid = self.start_server(self.args)171        try:172            from M2Crypto import httpslib173            ctx = SSL.Context()174            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)175            ctx.load_verify_locations('tests/server.pem')176            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)177            self.assertRaises(SSL.SSLError, c.request, 'GET', '/')178            c.close()179        finally:180            self.stop_server(pid)181    def test_HTTPS(self):182        pid = self.start_server(self.args)183        try:184            from M2Crypto import httpslib185            c = httpslib.HTTPS(srv_host, srv_port)186            c.putrequest('GET', '/')187            c.putheader('Accept', 'text/html')188            c.putheader('Accept', 'text/plain')189            c.endheaders()190            err, msg, headers = c.getreply()191            assert err == 200, err192            f = c.getfile()193            data = f.read()194            c.close()195        finally:196            self.stop_server(pid)197        self.failIf(string.find(data, 's_server -quiet -www') == -1)198    def test_HTTPS_secure_context(self):199        pid = self.start_server(self.args)200        try:201            from M2Crypto import httpslib202            ctx = SSL.Context()203            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)204            ctx.load_verify_locations('tests/ca.pem')205            c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)206            c.putrequest('GET', '/')207            c.putheader('Accept', 'text/html')208            c.putheader('Accept', 'text/plain')209            c.endheaders()210            err, msg, headers = c.getreply()211            assert err == 200, err212            f = c.getfile()213            data = f.read()214            c.close()215        finally:216            self.stop_server(pid)217        self.failIf(string.find(data, 's_server -quiet -www') == -1)218    def test_HTTPS_secure_context_fail(self):219        pid = self.start_server(self.args)220        try:221            from M2Crypto import httpslib222            ctx = SSL.Context()223            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)224            ctx.load_verify_locations('tests/server.pem')225            c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)226            c.putrequest('GET', '/')227            c.putheader('Accept', 'text/html')228            c.putheader('Accept', 'text/plain')229            self.assertRaises(SSL.SSLError, c.endheaders)230            c.close()231        finally:232            self.stop_server(pid)233            234    def test_HTTPSConnection_illegalkeywordarg(self):235        from M2Crypto import httpslib236        self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',237                          badKeyword=True)238class MiscSSLClientTestCase(BaseSSLClientTestCase):239    def test_no_connection(self):240        ctx = SSL.Context()241        s = SSL.Connection(ctx)242        243    def test_server_simple(self):244        pid = self.start_server(self.args)245        try:246            self.assertRaises(ValueError, SSL.Context, 'tlsv5')247            ctx = SSL.Context()248            s = SSL.Connection(ctx)249            s.connect(self.srv_addr)250            self.assertRaises(ValueError, s.read, 0)251            data = self.http_get(s)252            s.close()253        finally:254            self.stop_server(pid)255        self.failIf(string.find(data, 's_server -quiet -www') == -1)256    def test_server_simple_secure_context(self):257        pid = self.start_server(self.args)258        try:259            ctx = SSL.Context()260            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)261            ctx.load_verify_locations('tests/ca.pem')262            s = SSL.Connection(ctx)263            s.connect(self.srv_addr)264            data = self.http_get(s)265            s.close()266        finally:267            self.stop_server(pid)268        self.failIf(string.find(data, 's_server -quiet -www') == -1)269    def test_server_simple_secure_context_fail(self):270        pid = self.start_server(self.args)271        try:272            ctx = SSL.Context()273            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)274            ctx.load_verify_locations('tests/server.pem')275            s = SSL.Connection(ctx)276            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)277            s.close()278        finally:279            self.stop_server(pid)280    def test_server_simple_timeouts(self):281        pid = self.start_server(self.args)282        try:283            self.assertRaises(ValueError, SSL.Context, 'tlsv5')284            ctx = SSL.Context()285            s = SSL.Connection(ctx)286            287            r = s.get_socket_read_timeout()288            w = s.get_socket_write_timeout()289            assert r.sec == 0, r.sec290            assert r.microsec == 0, r.microsec291            assert w.sec == 0, w.sec292            assert w.microsec == 0, w.microsec293            s.set_socket_read_timeout(SSL.timeout())294            s.set_socket_write_timeout(SSL.timeout(909,9))295            r = s.get_socket_read_timeout()296            w = s.get_socket_write_timeout()297            assert r.sec == 600, r.sec298            assert r.microsec == 0, r.microsec299            assert w.sec == 909, w.sec300            #assert w.microsec == 9, w.microsec XXX 4000301            302            s.connect(self.srv_addr)303            data = self.http_get(s)304            s.close()305        finally:306            self.stop_server(pid)307        self.failIf(string.find(data, 's_server -quiet -www') == -1)308    def test_tls1_nok(self):309        if fips_mode: # TLS is required in FIPS mode310            return311        self.args.append('-no_tls1')312        pid = self.start_server(self.args)313        try:314            ctx = SSL.Context('tlsv1')315            s = SSL.Connection(ctx)316            try:317                s.connect(self.srv_addr)318            except SSL.SSLError, e:319                self.failUnlessEqual(e[0], 'wrong version number')320            s.close()321        finally:322            self.stop_server(pid)323    def test_tls1_ok(self):324        self.args.append('-tls1')325        pid = self.start_server(self.args)326        try:327            ctx = SSL.Context('tlsv1')328            s = SSL.Connection(ctx)329            s.connect(self.srv_addr)330            data = self.http_get(s)331            s.close()332        finally:333            self.stop_server(pid)334        self.failIf(string.find(data, 's_server -quiet -www') == -1)335    def test_sslv23_no_v2(self):336        if fips_mode: # TLS is required in FIPS mode337            return338        self.args.append('-no_tls1')339        pid = self.start_server(self.args)340        try:341            ctx = SSL.Context('sslv23')342            s = SSL.Connection(ctx)343            s.connect(self.srv_addr)344            self.failUnlessEqual(s.get_version(), 'SSLv3')345            s.close()346        finally:347            self.stop_server(pid)348    def test_sslv23_no_v2_no_service(self):349        if fips_mode: # TLS is required in FIPS mode350            return351        self.args = self.args + ['-no_tls1', '-no_ssl3']352        pid = self.start_server(self.args)353        try:354            ctx = SSL.Context('sslv23')355            s = SSL.Connection(ctx)356            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)357            s.close()358        finally:359            self.stop_server(pid)360    def test_sslv23_weak_crypto(self):361        if fips_mode: # TLS is required in FIPS mode362            return363        self.args = self.args + ['-no_tls1', '-no_ssl3']364        pid = self.start_server(self.args)365        try:366            ctx = SSL.Context('sslv23', weak_crypto=1)367            s = SSL.Connection(ctx)368            if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL369                s.connect(self.srv_addr)370                self.failUnlessEqual(s.get_version(), 'SSLv2')371            else:372                self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)373            s.close()374        finally:375            self.stop_server(pid)376    def test_cipher_mismatch(self):377        self.args = self.args + ['-cipher', 'AES256-SHA']378        pid = self.start_server(self.args)379        try:380            ctx = SSL.Context()381            s = SSL.Connection(ctx)382            s.set_cipher_list('AES128-SHA')383            try:384                s.connect(self.srv_addr)385            except SSL.SSLError, e:386                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')387            s.close()388        finally:389            self.stop_server(pid)390        391    def test_no_such_cipher(self):392        self.args = self.args + ['-cipher', 'AES128-SHA']393        pid = self.start_server(self.args)394        try:395            ctx = SSL.Context()396            s = SSL.Connection(ctx)397            s.set_cipher_list('EXP-RC2-MD5')398            try:399                s.connect(self.srv_addr)400            except SSL.SSLError, e:401                self.failUnlessEqual(e[0], 'no ciphers available')402            s.close()403        finally:404            self.stop_server(pid)405        406    def test_no_weak_cipher(self):407        if fips_mode: # Weak ciphers are prohibited408            return409        self.args = self.args + ['-cipher', 'EXP']410        pid = self.start_server(self.args)411        try:412            ctx = SSL.Context()413            s = SSL.Connection(ctx)414            try:415                s.connect(self.srv_addr)416            except SSL.SSLError, e:417                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')418            s.close()419        finally:420            self.stop_server(pid)421        422    def test_use_weak_cipher(self):423        if fips_mode: # Weak ciphers are prohibited424            return425        self.args = self.args + ['-cipher', 'EXP']426        pid = self.start_server(self.args)427        try:428            ctx = SSL.Context(weak_crypto=1)429            s = SSL.Connection(ctx)430            s.connect(self.srv_addr)431            data = self.http_get(s)432            s.close()433        finally:434            self.stop_server(pid)435        self.failIf(string.find(data, 's_server -quiet -www') == -1)436        437    def test_cipher_ok(self):438        self.args = self.args + ['-cipher', 'AES128-SHA']439        pid = self.start_server(self.args)440        try:441            ctx = SSL.Context()442            s = SSL.Connection(ctx)443            s.set_cipher_list('AES128-SHA')444            s.connect(self.srv_addr)445            data = self.http_get(s)446            447            assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()448            449            cipher_stack = s.get_ciphers()450            assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()451            self.assertRaises(IndexError, cipher_stack.__getitem__, 2)452            # For some reason there are 2 entries in the stack453            #assert len(cipher_stack) == 1, len(cipher_stack)454            assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()455            456            # Test Cipher_Stack iterator457            i = 0458            for cipher in cipher_stack:459                i += 1460                assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()461                self.assertEqual('AES128-SHA-128', str(cipher))462            # For some reason there are 2 entries in the stack463            #assert i == 1, i464            self.assertEqual(i, len(cipher_stack))465            466            s.close()467        finally:468            self.stop_server(pid)469        self.failIf(string.find(data, 's_server -quiet -www') == -1)470        471    def verify_cb_new(self, ok, store):472        return verify_cb_new_function(ok, store)473    def test_verify_cb_new(self):474        pid = self.start_server(self.args)475        try:476            ctx = SSL.Context()477            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,478                           self.verify_cb_new)479            s = SSL.Connection(ctx)480            try:481                s.connect(self.srv_addr)482            except SSL.SSLError, e:483                assert 0, e484            data = self.http_get(s)485            s.close()486        finally:487            self.stop_server(pid)488        self.failIf(string.find(data, 's_server -quiet -www') == -1)489    def test_verify_cb_new_class(self):490        pid = self.start_server(self.args)491        try:492            ctx = SSL.Context()493            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,494                           VerifyCB())495            s = SSL.Connection(ctx)496            try:497                s.connect(self.srv_addr)498            except SSL.SSLError, e:499                assert 0, e500            data = self.http_get(s)501            s.close()502        finally:503            self.stop_server(pid)504        self.failIf(string.find(data, 's_server -quiet -www') == -1)505    def test_verify_cb_new_function(self):506        pid = self.start_server(self.args)507        try:508            ctx = SSL.Context()509            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,510                           verify_cb_new_function)511            s = SSL.Connection(ctx)512            try:513                s.connect(self.srv_addr)514            except SSL.SSLError, e:515                assert 0, e516            data = self.http_get(s)517            s.close()518        finally:519            self.stop_server(pid)520        self.failIf(string.find(data, 's_server -quiet -www') == -1)521    def test_verify_cb_lambda(self):522        pid = self.start_server(self.args)523        try:524            ctx = SSL.Context()525            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,526                           lambda ok, store: 1)527            s = SSL.Connection(ctx)528            try:529                s.connect(self.srv_addr)530            except SSL.SSLError, e:531                assert 0, e532            data = self.http_get(s)533            s.close()534        finally:535            self.stop_server(pid)536        self.failIf(string.find(data, 's_server -quiet -www') == -1)537    def verify_cb_exception(self, ok, store):538        raise Exception, 'We should fail verification'539    def test_verify_cb_exception(self):540        pid = self.start_server(self.args)541        try:542            ctx = SSL.Context()543            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,544                           self.verify_cb_exception)545            s = SSL.Connection(ctx)546            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)547            s.close()548        finally:549            self.stop_server(pid)550    def test_verify_cb_not_callable(self):551        ctx = SSL.Context()552        self.assertRaises(TypeError,553                          ctx.set_verify,554                          SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,555                          9,556                          1)557    def test_verify_cb_wrong_callable(self):558        pid = self.start_server(self.args)559        try:560            ctx = SSL.Context()561            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,562                           lambda _: '')563            s = SSL.Connection(ctx)564            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)565            s.close()566        finally:567            self.stop_server(pid)568    def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):569        try:570            from M2Crypto import X509571            assert not ok572            assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \573                   err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \574                   err == m2.X509_V_ERR_CERT_UNTRUSTED or \575                   err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE576            assert m2.ssl_ctx_get_cert_store(ctx_ptr)577            assert X509.X509(x509_ptr).as_pem()578        except AssertionError:579            # If we let exceptions propagate from here the580            # caller may see strange errors. This is cleaner.581            return 0582        return 1583    def test_verify_cb_old(self):584        pid = self.start_server(self.args)585        try:586            ctx = SSL.Context()587            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,588                           self.verify_cb_old)589            s = SSL.Connection(ctx)590            try:591                s.connect(self.srv_addr)592            except SSL.SSLError, e:593                assert 0, e594            data = self.http_get(s)595            s.close()596        finally:597            self.stop_server(pid)598        self.failIf(string.find(data, 's_server -quiet -www') == -1)599    def test_verify_allow_unknown_old(self):600        pid = self.start_server(self.args)601        try:602            ctx = SSL.Context()603            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,604                           SSL.cb.ssl_verify_callback)605            ctx.set_allow_unknown_ca(1)606            s = SSL.Connection(ctx)607            try:608                s.connect(self.srv_addr)609            except SSL.SSLError, e:610                assert 0, e611            data = self.http_get(s)612            s.close()613        finally:614            self.stop_server(pid)615        self.failIf(string.find(data, 's_server -quiet -www') == -1)616    def test_verify_allow_unknown_new(self):617        pid = self.start_server(self.args)618        try:619            ctx = SSL.Context()620            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,621                           SSL.cb.ssl_verify_callback_allow_unknown_ca)622            s = SSL.Connection(ctx)623            try:624                s.connect(self.srv_addr)625            except SSL.SSLError, e:626                assert 0, e627            data = self.http_get(s)628            s.close()629        finally:630            self.stop_server(pid)631        self.failIf(string.find(data, 's_server -quiet -www') == -1)632    def test_verify_cert(self):633        pid = self.start_server(self.args)634        try:635            ctx = SSL.Context()636            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)637            ctx.load_verify_locations('tests/ca.pem')638            s = SSL.Connection(ctx)639            try:640                s.connect(self.srv_addr)641            except SSL.SSLError, e:642                assert 0, e643            data = self.http_get(s)644            s.close()645        finally:646            self.stop_server(pid)647        self.failIf(string.find(data, 's_server -quiet -www') == -1)648    def test_verify_cert_fail(self):649        pid = self.start_server(self.args)650        try:651            ctx = SSL.Context()652            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)653            ctx.load_verify_locations('tests/server.pem')654            s = SSL.Connection(ctx)655            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)656            s.close()657        finally:658            self.stop_server(pid)659    def test_verify_cert_mutual_auth(self):660        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        661        pid = self.start_server(self.args)662        try:663            ctx = SSL.Context()664            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)665            ctx.load_verify_locations('tests/ca.pem')666            ctx.load_cert('tests/x509.pem')667            s = SSL.Connection(ctx)668            try:669                s.connect(self.srv_addr)670            except SSL.SSLError, e:671                assert 0, e672            data = self.http_get(s)673            s.close()674        finally:675            self.stop_server(pid)676        self.failIf(string.find(data, 's_server -quiet -www') == -1)677    def test_verify_cert_mutual_auth_servernbio(self):678        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])679        pid = self.start_server(self.args)680        try:681            ctx = SSL.Context()682            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)683            ctx.load_verify_locations('tests/ca.pem')684            ctx.load_cert('tests/x509.pem')685            s = SSL.Connection(ctx)686            try:687                s.connect(self.srv_addr)688            except SSL.SSLError, e:689                assert 0, e690            data = self.http_get(s)691            s.close()692        finally:693            self.stop_server(pid)694        self.failIf(string.find(data, 's_server -quiet -www') == -1)695    def test_verify_cert_mutual_auth_fail(self):696        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        697        pid = self.start_server(self.args)698        try:699            ctx = SSL.Context()700            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)701            ctx.load_verify_locations('tests/ca.pem')702            s = SSL.Connection(ctx)703            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)704            s.close()705        finally:706            self.stop_server(pid)707    def test_verify_nocert_fail(self):708        self.args.extend(['-nocert'])        709        pid = self.start_server(self.args)710        try:711            ctx = SSL.Context()712            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)713            ctx.load_verify_locations('tests/ca.pem')714            s = SSL.Connection(ctx)715            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)716            s.close()717        finally:718            self.stop_server(pid)719    def test_blocking0(self):720        pid = self.start_server(self.args)721        try:722            ctx = SSL.Context()723            s = SSL.Connection(ctx)724            s.setblocking(0)725            self.assertRaises(Exception, s.connect, self.srv_addr)726            s.close()727        finally:728            self.stop_server(pid)729    def test_blocking1(self):730        pid = self.start_server(self.args)731        try:732            ctx = SSL.Context()733            s = SSL.Connection(ctx)734            s.setblocking(1)735            try:736                s.connect(self.srv_addr)737            except SSL.SSLError, e:738                assert 0, e739            data = self.http_get(s)740            s.close()741        finally:742            self.stop_server(pid)743        self.failIf(string.find(data, 's_server -quiet -www') == -1)744    def test_makefile(self):745        pid = self.start_server(self.args)746        try:747            ctx = SSL.Context()748            s = SSL.Connection(ctx)749            try:750                s.connect(self.srv_addr)751            except SSL.SSLError, e:752                assert 0, e753            bio = s.makefile('rw')754            #s.close()  # XXX bug 6628?755            bio.write('GET / HTTP/1.0\n\n')756            bio.flush()757            data = bio.read()758            bio.close()759            s.close()760        finally:761            self.stop_server(pid)762        self.failIf(string.find(data, 's_server -quiet -www') == -1)763    def test_makefile_err(self):764        pid = self.start_server(self.args)765        try:766            ctx = SSL.Context()767            s = SSL.Connection(ctx)768            try:769                s.connect(self.srv_addr)770            except SSL.SSLError, e:771                assert 0, e772            f = s.makefile()773            data = self.http_get(s)774            s.close()775            del f776            del s777            err_code = Err.peek_error_code()778            assert not err_code, 'Unexpected error: %s' % err_code779            err = Err.get_error()780            assert not err, 'Unexpected error: %s' % err781        finally:782            self.stop_server(pid)783        self.failIf(string.find(data, 's_server -quiet -www') == -1)784    def test_info_callback(self):785        pid = self.start_server(self.args)786        try:787            ctx = SSL.Context()788            ctx.set_info_callback()789            s = SSL.Connection(ctx)790            s.connect(self.srv_addr)791            data = self.http_get(s)792            s.close()793        finally:794            self.stop_server(pid)795        self.failIf(string.find(data, 's_server -quiet -www') == -1)796class UrllibSSLClientTestCase(BaseSSLClientTestCase):797    def test_urllib(self):798        pid = self.start_server(self.args)799        try:800            from M2Crypto import m2urllib801            url = m2urllib.FancyURLopener()802            url.addheader('Connection', 'close')803            u = url.open('https://%s:%s/' % (srv_host, srv_port))804            data = u.read()805            u.close()806        finally:807            self.stop_server(pid)808        self.failIf(string.find(data, 's_server -quiet -www') == -1)809    # XXX Don't actually know how to use m2urllib safely!810    #def test_urllib_secure_context(self):811    #def test_urllib_secure_context_fail(self):812    # XXX Don't actually know how to use m2urllib safely!813    #def test_urllib_safe_context(self):814    #def test_urllib_safe_context_fail(self):815class Urllib2SSLClientTestCase(BaseSSLClientTestCase):816    if sys.version_info >= (2,4):817        def test_urllib2(self):818            pid = self.start_server(self.args)819            try:820                from M2Crypto import m2urllib2821                opener = m2urllib2.build_opener()822                opener.addheaders = [('Connection', 'close')]823                u = opener.open('https://%s:%s/' % (srv_host, srv_port))824                data = u.read()825                u.close()826            finally:827                self.stop_server(pid)828            self.failIf(string.find(data, 's_server -quiet -www') == -1)829    830        def test_urllib2_secure_context(self):831            pid = self.start_server(self.args)832            try:833                ctx = SSL.Context()834                ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)835                ctx.load_verify_locations('tests/ca.pem')836                837                from M2Crypto import m2urllib2838                opener = m2urllib2.build_opener(ctx)839                opener.addheaders = [('Connection', 'close')]           840                u = opener.open('https://%s:%s/' % (srv_host, srv_port))841                data = u.read()842                u.close()843            finally:844                self.stop_server(pid)845            self.failIf(string.find(data, 's_server -quiet -www') == -1)846    847        def test_urllib2_secure_context_fail(self):848            pid = self.start_server(self.args)849            try:850                ctx = SSL.Context()851                ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)852                ctx.load_verify_locations('tests/server.pem')853                854                from M2Crypto import m2urllib2855                opener = m2urllib2.build_opener(ctx)856                opener.addheaders = [('Connection', 'close')]857                self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))858            finally:859                self.stop_server(pid)860        def test_z_urllib2_opener(self):861            pid = self.start_server(self.args)862            try:863                ctx = SSL.Context()864                from M2Crypto import m2urllib2865                opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())866                m2urllib2.install_opener(opener)867                req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))868                u = m2urllib2.urlopen(req)869                data = u.read()870                u.close()871            finally:872                self.stop_server(pid)873            self.failIf(string.find(data, 's_server -quiet -www') == -1)874        def test_urllib2_opener_handlers(self):875            ctx = SSL.Context()876            from M2Crypto import m2urllib2877            opener = m2urllib2.build_opener(ctx,878                                            m2urllib2.HTTPBasicAuthHandler())879        def test_urllib2_leak(self):880            pid = self.start_server(self.args)881            try:882                import gc883                from M2Crypto import m2urllib2884                o = m2urllib2.build_opener()885                r = o.open('https://%s:%s/' % (srv_host, srv_port))886                s = [r.fp._sock.fp]887                r.close()888                self.assertEqual(len(gc.get_referrers(s[0])), 1)889            finally:890                self.stop_server(pid)891class TwistedSSLClientTestCase(BaseSSLClientTestCase):892    def test_twisted_wrapper(self):893        # Test only when twisted and ZopeInterfaces are present894        try:895            from twisted.internet.protocol import ClientFactory896            from twisted.protocols.basic import LineReceiver897            from twisted.internet import reactor898            import M2Crypto.SSL.TwistedProtocolWrapper as wrapper899        except ImportError:900            import warnings901            warnings.warn('Skipping twisted wrapper test because twisted not found')902            return903        904        class EchoClient(LineReceiver):905            def connectionMade(self):906                self.sendLine('GET / HTTP/1.0\n\n')907            def lineReceived(self, line):908                global twisted_data909                twisted_data += line910        class EchoClientFactory(ClientFactory):911            protocol = EchoClient912        913            def clientConnectionFailed(self, connector, reason):914                reactor.stop()915                assert 0, reason916        917            def clientConnectionLost(self, connector, reason):918                reactor.stop()919                920        pid = self.start_server(self.args)921        class ContextFactory:922            def getContext(self):923                return SSL.Context()924        try:925            global twisted_data926            twisted_data = ''927            928            contextFactory = ContextFactory()929            factory = EchoClientFactory()930            wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)931            reactor.run() # This will block until reactor.stop() is called932        finally:933            self.stop_server(pid)934        self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)...test_ssl.py.svn-base
Source:test_ssl.py.svn-base  
...58            pass59    return False60class BaseSSLClientTestCase(unittest.TestCase):61    openssl_in_path = find_openssl()62    def start_server(self, args):63        if not self.openssl_in_path:64            raise Exception('openssl command not in PATH')65        66        pid = os.fork()67        if pid == 0:68            # openssl must be started in the tests directory for it69            # to find the .pem files70            os.chdir('tests')71            try:72                os.execvp('openssl', args)73            finally:74                os.chdir('..')75                76        else:77            time.sleep(sleepTime)78            return pid79    def stop_server(self, pid):80        os.kill(pid, 1)81        os.waitpid(pid, 0)82    def http_get(self, s):83        s.send('GET / HTTP/1.0\n\n') 84        resp = ''85        while 1:86            try:87                r = s.recv(4096)88                if not r:89                    break90            except SSL.SSLError: # s_server throws an 'unexpected eof'...91                break92            resp = resp + r 93        return resp94    def setUp(self):95        self.srv_host = srv_host96        self.srv_port = srv_port97        self.srv_addr = (srv_host, srv_port)98        self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)99        self.args = ['s_server', '-quiet', '-www',100                     #'-cert', 'server.pem', Implicitly using this101                     '-accept', str(self.srv_port)]102    def tearDown(self):103        global srv_port104        srv_port = srv_port - 1105class PassSSLClientTestCase(BaseSSLClientTestCase):106        107    def test_pass(self):108        pass109class HttpslibSSLClientTestCase(BaseSSLClientTestCase):110    def test_HTTPSConnection(self):111        pid = self.start_server(self.args)112        try:113            from M2Crypto import httpslib114            c = httpslib.HTTPSConnection(srv_host, srv_port)115            c.request('GET', '/')116            data = c.getresponse().read()117            c.close()118        finally:119            self.stop_server(pid)120        self.failIf(string.find(data, 's_server -quiet -www') == -1)121    def test_HTTPSConnection_resume_session(self):122        pid = self.start_server(self.args)123        try:124            from M2Crypto import httpslib125            ctx = SSL.Context()126            ctx.load_verify_locations(cafile='tests/ca.pem')127            ctx.load_cert('tests/x509.pem')128            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)129            ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)130            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)131            c.request('GET', '/')132            ses = c.get_session()133            t = ses.as_text()134            data = c.getresponse().read()135            # Appearently closing connection here screws session; Ali Polatel?136            # c.close()137            138            ctx2 = SSL.Context()139            ctx2.load_verify_locations(cafile='tests/ca.pem')140            ctx2.load_cert('tests/x509.pem')141            ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)142            ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)143            c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)144            c2.set_session(ses)145            c2.request('GET', '/')146            ses2 = c2.get_session()147            t2 = ses2.as_text()148            data = c2.getresponse().read()149            c.close()150            c2.close()151            assert t == t2, "Sessions did not match"152        finally:153            self.stop_server(pid)154        self.failIf(string.find(data, 's_server -quiet -www') == -1)155    def test_HTTPSConnection_secure_context(self):156        pid = self.start_server(self.args)157        try:158            from M2Crypto import httpslib159            ctx = SSL.Context()160            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)161            ctx.load_verify_locations('tests/ca.pem')162            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)163            c.request('GET', '/')164            data = c.getresponse().read()165            c.close()166        finally:167            self.stop_server(pid)168        self.failIf(string.find(data, 's_server -quiet -www') == -1)169    def test_HTTPSConnection_secure_context_fail(self):170        pid = self.start_server(self.args)171        try:172            from M2Crypto import httpslib173            ctx = SSL.Context()174            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)175            ctx.load_verify_locations('tests/server.pem')176            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)177            self.assertRaises(SSL.SSLError, c.request, 'GET', '/')178            c.close()179        finally:180            self.stop_server(pid)181    def test_HTTPS(self):182        pid = self.start_server(self.args)183        try:184            from M2Crypto import httpslib185            c = httpslib.HTTPS(srv_host, srv_port)186            c.putrequest('GET', '/')187            c.putheader('Accept', 'text/html')188            c.putheader('Accept', 'text/plain')189            c.endheaders()190            err, msg, headers = c.getreply()191            assert err == 200, err192            f = c.getfile()193            data = f.read()194            c.close()195        finally:196            self.stop_server(pid)197        self.failIf(string.find(data, 's_server -quiet -www') == -1)198    def test_HTTPS_secure_context(self):199        pid = self.start_server(self.args)200        try:201            from M2Crypto import httpslib202            ctx = SSL.Context()203            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)204            ctx.load_verify_locations('tests/ca.pem')205            c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)206            c.putrequest('GET', '/')207            c.putheader('Accept', 'text/html')208            c.putheader('Accept', 'text/plain')209            c.endheaders()210            err, msg, headers = c.getreply()211            assert err == 200, err212            f = c.getfile()213            data = f.read()214            c.close()215        finally:216            self.stop_server(pid)217        self.failIf(string.find(data, 's_server -quiet -www') == -1)218    def test_HTTPS_secure_context_fail(self):219        pid = self.start_server(self.args)220        try:221            from M2Crypto import httpslib222            ctx = SSL.Context()223            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)224            ctx.load_verify_locations('tests/server.pem')225            c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)226            c.putrequest('GET', '/')227            c.putheader('Accept', 'text/html')228            c.putheader('Accept', 'text/plain')229            self.assertRaises(SSL.SSLError, c.endheaders)230            c.close()231        finally:232            self.stop_server(pid)233            234    def test_HTTPSConnection_illegalkeywordarg(self):235        from M2Crypto import httpslib236        self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',237                          badKeyword=True)238class MiscSSLClientTestCase(BaseSSLClientTestCase):239    def test_no_connection(self):240        ctx = SSL.Context()241        s = SSL.Connection(ctx)242        243    def test_server_simple(self):244        pid = self.start_server(self.args)245        try:246            self.assertRaises(ValueError, SSL.Context, 'tlsv5')247            ctx = SSL.Context()248            s = SSL.Connection(ctx)249            s.connect(self.srv_addr)250            self.assertRaises(ValueError, s.read, 0)251            data = self.http_get(s)252            s.close()253        finally:254            self.stop_server(pid)255        self.failIf(string.find(data, 's_server -quiet -www') == -1)256    def test_server_simple_secure_context(self):257        pid = self.start_server(self.args)258        try:259            ctx = SSL.Context()260            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)261            ctx.load_verify_locations('tests/ca.pem')262            s = SSL.Connection(ctx)263            s.connect(self.srv_addr)264            data = self.http_get(s)265            s.close()266        finally:267            self.stop_server(pid)268        self.failIf(string.find(data, 's_server -quiet -www') == -1)269    def test_server_simple_secure_context_fail(self):270        pid = self.start_server(self.args)271        try:272            ctx = SSL.Context()273            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)274            ctx.load_verify_locations('tests/server.pem')275            s = SSL.Connection(ctx)276            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)277            s.close()278        finally:279            self.stop_server(pid)280    def test_server_simple_timeouts(self):281        pid = self.start_server(self.args)282        try:283            self.assertRaises(ValueError, SSL.Context, 'tlsv5')284            ctx = SSL.Context()285            s = SSL.Connection(ctx)286            287            r = s.get_socket_read_timeout()288            w = s.get_socket_write_timeout()289            assert r.sec == 0, r.sec290            assert r.microsec == 0, r.microsec291            assert w.sec == 0, w.sec292            assert w.microsec == 0, w.microsec293            s.set_socket_read_timeout(SSL.timeout())294            s.set_socket_write_timeout(SSL.timeout(909,9))295            r = s.get_socket_read_timeout()296            w = s.get_socket_write_timeout()297            assert r.sec == 600, r.sec298            assert r.microsec == 0, r.microsec299            assert w.sec == 909, w.sec300            #assert w.microsec == 9, w.microsec XXX 4000301            302            s.connect(self.srv_addr)303            data = self.http_get(s)304            s.close()305        finally:306            self.stop_server(pid)307        self.failIf(string.find(data, 's_server -quiet -www') == -1)308    def test_tls1_nok(self):309        if fips_mode: # TLS is required in FIPS mode310            return311        self.args.append('-no_tls1')312        pid = self.start_server(self.args)313        try:314            ctx = SSL.Context('tlsv1')315            s = SSL.Connection(ctx)316            try:317                s.connect(self.srv_addr)318            except SSL.SSLError, e:319                self.failUnlessEqual(e[0], 'wrong version number')320            s.close()321        finally:322            self.stop_server(pid)323    def test_tls1_ok(self):324        self.args.append('-tls1')325        pid = self.start_server(self.args)326        try:327            ctx = SSL.Context('tlsv1')328            s = SSL.Connection(ctx)329            s.connect(self.srv_addr)330            data = self.http_get(s)331            s.close()332        finally:333            self.stop_server(pid)334        self.failIf(string.find(data, 's_server -quiet -www') == -1)335    def test_sslv23_no_v2(self):336        if fips_mode: # TLS is required in FIPS mode337            return338        self.args.append('-no_tls1')339        pid = self.start_server(self.args)340        try:341            ctx = SSL.Context('sslv23')342            s = SSL.Connection(ctx)343            s.connect(self.srv_addr)344            self.failUnlessEqual(s.get_version(), 'SSLv3')345            s.close()346        finally:347            self.stop_server(pid)348    def test_sslv23_no_v2_no_service(self):349        if fips_mode: # TLS is required in FIPS mode350            return351        self.args = self.args + ['-no_tls1', '-no_ssl3']352        pid = self.start_server(self.args)353        try:354            ctx = SSL.Context('sslv23')355            s = SSL.Connection(ctx)356            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)357            s.close()358        finally:359            self.stop_server(pid)360    def test_sslv23_weak_crypto(self):361        if fips_mode: # TLS is required in FIPS mode362            return363        self.args = self.args + ['-no_tls1', '-no_ssl3']364        pid = self.start_server(self.args)365        try:366            ctx = SSL.Context('sslv23', weak_crypto=1)367            s = SSL.Connection(ctx)368            if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL369                s.connect(self.srv_addr)370                self.failUnlessEqual(s.get_version(), 'SSLv2')371            else:372                self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)373            s.close()374        finally:375            self.stop_server(pid)376    def test_cipher_mismatch(self):377        self.args = self.args + ['-cipher', 'AES256-SHA']378        pid = self.start_server(self.args)379        try:380            ctx = SSL.Context()381            s = SSL.Connection(ctx)382            s.set_cipher_list('AES128-SHA')383            try:384                s.connect(self.srv_addr)385            except SSL.SSLError, e:386                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')387            s.close()388        finally:389            self.stop_server(pid)390        391    def test_no_such_cipher(self):392        self.args = self.args + ['-cipher', 'AES128-SHA']393        pid = self.start_server(self.args)394        try:395            ctx = SSL.Context()396            s = SSL.Connection(ctx)397            s.set_cipher_list('EXP-RC2-MD5')398            try:399                s.connect(self.srv_addr)400            except SSL.SSLError, e:401                self.failUnlessEqual(e[0], 'no ciphers available')402            s.close()403        finally:404            self.stop_server(pid)405        406    def test_no_weak_cipher(self):407        if fips_mode: # Weak ciphers are prohibited408            return409        self.args = self.args + ['-cipher', 'EXP']410        pid = self.start_server(self.args)411        try:412            ctx = SSL.Context()413            s = SSL.Connection(ctx)414            try:415                s.connect(self.srv_addr)416            except SSL.SSLError, e:417                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')418            s.close()419        finally:420            self.stop_server(pid)421        422    def test_use_weak_cipher(self):423        if fips_mode: # Weak ciphers are prohibited424            return425        self.args = self.args + ['-cipher', 'EXP']426        pid = self.start_server(self.args)427        try:428            ctx = SSL.Context(weak_crypto=1)429            s = SSL.Connection(ctx)430            s.connect(self.srv_addr)431            data = self.http_get(s)432            s.close()433        finally:434            self.stop_server(pid)435        self.failIf(string.find(data, 's_server -quiet -www') == -1)436        437    def test_cipher_ok(self):438        self.args = self.args + ['-cipher', 'AES128-SHA']439        pid = self.start_server(self.args)440        try:441            ctx = SSL.Context()442            s = SSL.Connection(ctx)443            s.set_cipher_list('AES128-SHA')444            s.connect(self.srv_addr)445            data = self.http_get(s)446            447            assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()448            449            cipher_stack = s.get_ciphers()450            assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()451            self.assertRaises(IndexError, cipher_stack.__getitem__, 2)452            # For some reason there are 2 entries in the stack453            #assert len(cipher_stack) == 1, len(cipher_stack)454            assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()455            456            # Test Cipher_Stack iterator457            i = 0458            for cipher in cipher_stack:459                i += 1460                assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()461                self.assertEqual('AES128-SHA-128', str(cipher))462            # For some reason there are 2 entries in the stack463            #assert i == 1, i464            self.assertEqual(i, len(cipher_stack))465            466            s.close()467        finally:468            self.stop_server(pid)469        self.failIf(string.find(data, 's_server -quiet -www') == -1)470        471    def verify_cb_new(self, ok, store):472        return verify_cb_new_function(ok, store)473    def test_verify_cb_new(self):474        pid = self.start_server(self.args)475        try:476            ctx = SSL.Context()477            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,478                           self.verify_cb_new)479            s = SSL.Connection(ctx)480            try:481                s.connect(self.srv_addr)482            except SSL.SSLError, e:483                assert 0, e484            data = self.http_get(s)485            s.close()486        finally:487            self.stop_server(pid)488        self.failIf(string.find(data, 's_server -quiet -www') == -1)489    def test_verify_cb_new_class(self):490        pid = self.start_server(self.args)491        try:492            ctx = SSL.Context()493            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,494                           VerifyCB())495            s = SSL.Connection(ctx)496            try:497                s.connect(self.srv_addr)498            except SSL.SSLError, e:499                assert 0, e500            data = self.http_get(s)501            s.close()502        finally:503            self.stop_server(pid)504        self.failIf(string.find(data, 's_server -quiet -www') == -1)505    def test_verify_cb_new_function(self):506        pid = self.start_server(self.args)507        try:508            ctx = SSL.Context()509            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,510                           verify_cb_new_function)511            s = SSL.Connection(ctx)512            try:513                s.connect(self.srv_addr)514            except SSL.SSLError, e:515                assert 0, e516            data = self.http_get(s)517            s.close()518        finally:519            self.stop_server(pid)520        self.failIf(string.find(data, 's_server -quiet -www') == -1)521    def test_verify_cb_lambda(self):522        pid = self.start_server(self.args)523        try:524            ctx = SSL.Context()525            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,526                           lambda ok, store: 1)527            s = SSL.Connection(ctx)528            try:529                s.connect(self.srv_addr)530            except SSL.SSLError, e:531                assert 0, e532            data = self.http_get(s)533            s.close()534        finally:535            self.stop_server(pid)536        self.failIf(string.find(data, 's_server -quiet -www') == -1)537    def verify_cb_exception(self, ok, store):538        raise Exception, 'We should fail verification'539    def test_verify_cb_exception(self):540        pid = self.start_server(self.args)541        try:542            ctx = SSL.Context()543            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,544                           self.verify_cb_exception)545            s = SSL.Connection(ctx)546            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)547            s.close()548        finally:549            self.stop_server(pid)550    def test_verify_cb_not_callable(self):551        ctx = SSL.Context()552        self.assertRaises(TypeError,553                          ctx.set_verify,554                          SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,555                          9,556                          1)557    def test_verify_cb_wrong_callable(self):558        pid = self.start_server(self.args)559        try:560            ctx = SSL.Context()561            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,562                           lambda _: '')563            s = SSL.Connection(ctx)564            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)565            s.close()566        finally:567            self.stop_server(pid)568    def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):569        try:570            from M2Crypto import X509571            assert not ok572            assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \573                   err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \574                   err == m2.X509_V_ERR_CERT_UNTRUSTED or \575                   err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE576            assert m2.ssl_ctx_get_cert_store(ctx_ptr)577            assert X509.X509(x509_ptr).as_pem()578        except AssertionError:579            # If we let exceptions propagate from here the580            # caller may see strange errors. This is cleaner.581            return 0582        return 1583    def test_verify_cb_old(self):584        pid = self.start_server(self.args)585        try:586            ctx = SSL.Context()587            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,588                           self.verify_cb_old)589            s = SSL.Connection(ctx)590            try:591                s.connect(self.srv_addr)592            except SSL.SSLError, e:593                assert 0, e594            data = self.http_get(s)595            s.close()596        finally:597            self.stop_server(pid)598        self.failIf(string.find(data, 's_server -quiet -www') == -1)599    def test_verify_allow_unknown_old(self):600        pid = self.start_server(self.args)601        try:602            ctx = SSL.Context()603            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,604                           SSL.cb.ssl_verify_callback)605            ctx.set_allow_unknown_ca(1)606            s = SSL.Connection(ctx)607            try:608                s.connect(self.srv_addr)609            except SSL.SSLError, e:610                assert 0, e611            data = self.http_get(s)612            s.close()613        finally:614            self.stop_server(pid)615        self.failIf(string.find(data, 's_server -quiet -www') == -1)616    def test_verify_allow_unknown_new(self):617        pid = self.start_server(self.args)618        try:619            ctx = SSL.Context()620            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,621                           SSL.cb.ssl_verify_callback_allow_unknown_ca)622            s = SSL.Connection(ctx)623            try:624                s.connect(self.srv_addr)625            except SSL.SSLError, e:626                assert 0, e627            data = self.http_get(s)628            s.close()629        finally:630            self.stop_server(pid)631        self.failIf(string.find(data, 's_server -quiet -www') == -1)632    def test_verify_cert(self):633        pid = self.start_server(self.args)634        try:635            ctx = SSL.Context()636            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)637            ctx.load_verify_locations('tests/ca.pem')638            s = SSL.Connection(ctx)639            try:640                s.connect(self.srv_addr)641            except SSL.SSLError, e:642                assert 0, e643            data = self.http_get(s)644            s.close()645        finally:646            self.stop_server(pid)647        self.failIf(string.find(data, 's_server -quiet -www') == -1)648    def test_verify_cert_fail(self):649        pid = self.start_server(self.args)650        try:651            ctx = SSL.Context()652            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)653            ctx.load_verify_locations('tests/server.pem')654            s = SSL.Connection(ctx)655            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)656            s.close()657        finally:658            self.stop_server(pid)659    def test_verify_cert_mutual_auth(self):660        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        661        pid = self.start_server(self.args)662        try:663            ctx = SSL.Context()664            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)665            ctx.load_verify_locations('tests/ca.pem')666            ctx.load_cert('tests/x509.pem')667            s = SSL.Connection(ctx)668            try:669                s.connect(self.srv_addr)670            except SSL.SSLError, e:671                assert 0, e672            data = self.http_get(s)673            s.close()674        finally:675            self.stop_server(pid)676        self.failIf(string.find(data, 's_server -quiet -www') == -1)677    def test_verify_cert_mutual_auth_servernbio(self):678        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])679        pid = self.start_server(self.args)680        try:681            ctx = SSL.Context()682            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)683            ctx.load_verify_locations('tests/ca.pem')684            ctx.load_cert('tests/x509.pem')685            s = SSL.Connection(ctx)686            try:687                s.connect(self.srv_addr)688            except SSL.SSLError, e:689                assert 0, e690            data = self.http_get(s)691            s.close()692        finally:693            self.stop_server(pid)694        self.failIf(string.find(data, 's_server -quiet -www') == -1)695    def test_verify_cert_mutual_auth_fail(self):696        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        697        pid = self.start_server(self.args)698        try:699            ctx = SSL.Context()700            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)701            ctx.load_verify_locations('tests/ca.pem')702            s = SSL.Connection(ctx)703            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)704            s.close()705        finally:706            self.stop_server(pid)707    def test_verify_nocert_fail(self):708        self.args.extend(['-nocert'])        709        pid = self.start_server(self.args)710        try:711            ctx = SSL.Context()712            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)713            ctx.load_verify_locations('tests/ca.pem')714            s = SSL.Connection(ctx)715            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)716            s.close()717        finally:718            self.stop_server(pid)719    def test_blocking0(self):720        pid = self.start_server(self.args)721        try:722            ctx = SSL.Context()723            s = SSL.Connection(ctx)724            s.setblocking(0)725            self.assertRaises(Exception, s.connect, self.srv_addr)726            s.close()727        finally:728            self.stop_server(pid)729    def test_blocking1(self):730        pid = self.start_server(self.args)731        try:732            ctx = SSL.Context()733            s = SSL.Connection(ctx)734            s.setblocking(1)735            try:736                s.connect(self.srv_addr)737            except SSL.SSLError, e:738                assert 0, e739            data = self.http_get(s)740            s.close()741        finally:742            self.stop_server(pid)743        self.failIf(string.find(data, 's_server -quiet -www') == -1)744    def test_makefile(self):745        pid = self.start_server(self.args)746        try:747            ctx = SSL.Context()748            s = SSL.Connection(ctx)749            try:750                s.connect(self.srv_addr)751            except SSL.SSLError, e:752                assert 0, e753            bio = s.makefile('rw')754            #s.close()  # XXX bug 6628?755            bio.write('GET / HTTP/1.0\n\n')756            bio.flush()757            data = bio.read()758            bio.close()759            s.close()760        finally:761            self.stop_server(pid)762        self.failIf(string.find(data, 's_server -quiet -www') == -1)763    def test_makefile_err(self):764        pid = self.start_server(self.args)765        try:766            ctx = SSL.Context()767            s = SSL.Connection(ctx)768            try:769                s.connect(self.srv_addr)770            except SSL.SSLError, e:771                assert 0, e772            f = s.makefile()773            data = self.http_get(s)774            s.close()775            del f776            del s777            err_code = Err.peek_error_code()778            assert not err_code, 'Unexpected error: %s' % err_code779            err = Err.get_error()780            assert not err, 'Unexpected error: %s' % err781        finally:782            self.stop_server(pid)783        self.failIf(string.find(data, 's_server -quiet -www') == -1)784    def test_info_callback(self):785        pid = self.start_server(self.args)786        try:787            ctx = SSL.Context()788            ctx.set_info_callback()789            s = SSL.Connection(ctx)790            s.connect(self.srv_addr)791            data = self.http_get(s)792            s.close()793        finally:794            self.stop_server(pid)795        self.failIf(string.find(data, 's_server -quiet -www') == -1)796class UrllibSSLClientTestCase(BaseSSLClientTestCase):797    def test_urllib(self):798        pid = self.start_server(self.args)799        try:800            from M2Crypto import m2urllib801            url = m2urllib.FancyURLopener()802            url.addheader('Connection', 'close')803            u = url.open('https://%s:%s/' % (srv_host, srv_port))804            data = u.read()805            u.close()806        finally:807            self.stop_server(pid)808        self.failIf(string.find(data, 's_server -quiet -www') == -1)809    # XXX Don't actually know how to use m2urllib safely!810    #def test_urllib_secure_context(self):811    #def test_urllib_secure_context_fail(self):812    # XXX Don't actually know how to use m2urllib safely!813    #def test_urllib_safe_context(self):814    #def test_urllib_safe_context_fail(self):815class Urllib2SSLClientTestCase(BaseSSLClientTestCase):816    if sys.version_info >= (2,4):817        def test_urllib2(self):818            pid = self.start_server(self.args)819            try:820                from M2Crypto import m2urllib2821                opener = m2urllib2.build_opener()822                opener.addheaders = [('Connection', 'close')]823                u = opener.open('https://%s:%s/' % (srv_host, srv_port))824                data = u.read()825                u.close()826            finally:827                self.stop_server(pid)828            self.failIf(string.find(data, 's_server -quiet -www') == -1)829    830        def test_urllib2_secure_context(self):831            pid = self.start_server(self.args)832            try:833                ctx = SSL.Context()834                ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)835                ctx.load_verify_locations('tests/ca.pem')836                837                from M2Crypto import m2urllib2838                opener = m2urllib2.build_opener(ctx)839                opener.addheaders = [('Connection', 'close')]           840                u = opener.open('https://%s:%s/' % (srv_host, srv_port))841                data = u.read()842                u.close()843            finally:844                self.stop_server(pid)845            self.failIf(string.find(data, 's_server -quiet -www') == -1)846    847        def test_urllib2_secure_context_fail(self):848            pid = self.start_server(self.args)849            try:850                ctx = SSL.Context()851                ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)852                ctx.load_verify_locations('tests/server.pem')853                854                from M2Crypto import m2urllib2855                opener = m2urllib2.build_opener(ctx)856                opener.addheaders = [('Connection', 'close')]857                self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))858            finally:859                self.stop_server(pid)860        def test_z_urllib2_opener(self):861            pid = self.start_server(self.args)862            try:863                ctx = SSL.Context()864                from M2Crypto import m2urllib2865                opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())866                m2urllib2.install_opener(opener)867                req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))868                u = m2urllib2.urlopen(req)869                data = u.read()870                u.close()871            finally:872                self.stop_server(pid)873            self.failIf(string.find(data, 's_server -quiet -www') == -1)874        def test_urllib2_opener_handlers(self):875            ctx = SSL.Context()876            from M2Crypto import m2urllib2877            opener = m2urllib2.build_opener(ctx,878                                            m2urllib2.HTTPBasicAuthHandler())879        def test_urllib2_leak(self):880            pid = self.start_server(self.args)881            try:882                import gc883                from M2Crypto import m2urllib2884                o = m2urllib2.build_opener()885                r = o.open('https://%s:%s/' % (srv_host, srv_port))886                s = [r.fp._sock.fp]887                r.close()888                self.assertEqual(len(gc.get_referrers(s[0])), 1)889            finally:890                self.stop_server(pid)891class TwistedSSLClientTestCase(BaseSSLClientTestCase):892    def test_twisted_wrapper(self):893        # Test only when twisted and ZopeInterfaces are present894        try:895            from twisted.internet.protocol import ClientFactory896            from twisted.protocols.basic import LineReceiver897            from twisted.internet import reactor898            import M2Crypto.SSL.TwistedProtocolWrapper as wrapper899        except ImportError:900            import warnings901            warnings.warn('Skipping twisted wrapper test because twisted not found')902            return903        904        class EchoClient(LineReceiver):905            def connectionMade(self):906                self.sendLine('GET / HTTP/1.0\n\n')907            def lineReceived(self, line):908                global twisted_data909                twisted_data += line910        class EchoClientFactory(ClientFactory):911            protocol = EchoClient912        913            def clientConnectionFailed(self, connector, reason):914                reactor.stop()915                assert 0, reason916        917            def clientConnectionLost(self, connector, reason):918                reactor.stop()919                920        pid = self.start_server(self.args)921        class ContextFactory:922            def getContext(self):923                return SSL.Context()924        try:925            global twisted_data926            twisted_data = ''927            928            contextFactory = ContextFactory()929            factory = EchoClientFactory()930            wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)931            reactor.run() # This will block until reactor.stop() is called932        finally:933            self.stop_server(pid)934        self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)...test_debug_server.py
Source:test_debug_server.py  
...20    return killed21def setup_server(start_server, find_free_port, no_eval=False, stdout=False, env=None):22    port = find_free_port()23    args = ["+DEBUG_LOG", "+NO_EVAL"] if no_eval else ["+DEBUG_LOG"]24    s = start_server(port, "test_debug_server", args, stdout=stdout, env=env)25    assert s.poll() is None26    uri = "ws://localhost:{0}".format(port)27    return s, uri28def test_continue_stop(start_server, find_free_port):29    s, uri = setup_server(start_server, find_free_port, True)30    async def test_logic():31        client = hgdb.HGDBClient(uri, None)32        await client.connect()33        await client.continue_()34        await client.stop()35    asyncio.get_event_loop().run_until_complete(test_logic())36    # check if process exit37    killed = is_killed(s)38    if not killed:...test_client_server.py
Source:test_client_server.py  
...36        # Process callbacks scheduled with call_soon by appending a callback37        # to stop the event loop then running it until it hits that callback.38        self.loop.call_soon(self.loop.stop)39        self.loop.run_forever()40    def start_server(self, **kwds):41        server = serve(handler, 'localhost', 8642, **kwds)42        self.server = self.loop.run_until_complete(server)43    def start_client(self, path='', **kwds):44        client = connect('ws://localhost:8642/' + path, **kwds)45        self.client = self.loop.run_until_complete(client)46    def stop_client(self):47        self.loop.run_until_complete(self.client.worker)48    def stop_server(self):49        self.server.close()50        self.loop.run_until_complete(self.server.wait_closed())51    def test_basic(self):52        self.start_server()53        self.start_client()54        self.loop.run_until_complete(self.client.send("Hello!"))55        reply = self.loop.run_until_complete(self.client.recv())56        self.assertEqual(reply, "Hello!")57        self.stop_client()58        self.stop_server()59    def test_server_close_while_client_connected(self):60        self.start_server()61        self.start_client()62        self.stop_server()63    def test_explicit_event_loop(self):64        self.start_server(loop=self.loop)65        self.start_client(loop=self.loop)66        self.loop.run_until_complete(self.client.send("Hello!"))67        reply = self.loop.run_until_complete(self.client.recv())68        self.assertEqual(reply, "Hello!")69        self.stop_client()70        self.stop_server()71    def test_protocol_attributes(self):72        self.start_server()73        self.start_client('attributes')74        expected_attrs = ('localhost', 8642, self.secure)75        client_attrs = (self.client.host, self.client.port, self.client.secure)76        self.assertEqual(client_attrs, expected_attrs)77        server_attrs = self.loop.run_until_complete(self.client.recv())78        self.assertEqual(server_attrs, repr(expected_attrs))79        self.stop_client()80        self.stop_server()81    def test_protocol_headers(self):82        self.start_server()83        self.start_client('headers')84        client_req = self.client.request_headers85        client_resp = self.client.response_headers86        self.assertEqual(client_req['User-Agent'], USER_AGENT)87        self.assertEqual(client_resp['Server'], USER_AGENT)88        server_req = self.loop.run_until_complete(self.client.recv())89        server_resp = self.loop.run_until_complete(self.client.recv())90        self.assertEqual(server_req, str(client_req))91        self.assertEqual(server_resp, str(client_resp))92        self.stop_client()93        self.stop_server()94    def test_protocol_raw_headers(self):95        self.start_server()96        self.start_client('raw_headers')97        client_req = self.client.raw_request_headers98        client_resp = self.client.raw_response_headers99        self.assertEqual(dict(client_req)['User-Agent'], USER_AGENT)100        self.assertEqual(dict(client_resp)['Server'], USER_AGENT)101        server_req = self.loop.run_until_complete(self.client.recv())102        server_resp = self.loop.run_until_complete(self.client.recv())103        self.assertEqual(server_req, repr(client_req))104        self.assertEqual(server_resp, repr(client_resp))105        self.stop_client()106        self.stop_server()107    def test_protocol_custom_request_headers_dict(self):108        self.start_server()109        self.start_client('raw_headers', extra_headers={'X-Spam': 'Eggs'})110        req_headers = self.loop.run_until_complete(self.client.recv())111        self.loop.run_until_complete(self.client.recv())112        self.assertIn("('X-Spam', 'Eggs')", req_headers)113        self.stop_client()114        self.stop_server()115    def test_protocol_custom_request_headers_list(self):116        self.start_server()117        self.start_client('raw_headers', extra_headers=[('X-Spam', 'Eggs')])118        req_headers = self.loop.run_until_complete(self.client.recv())119        self.loop.run_until_complete(self.client.recv())120        self.assertIn("('X-Spam', 'Eggs')", req_headers)121        self.stop_client()122        self.stop_server()123    def test_protocol_custom_response_headers_callable_dict(self):124        self.start_server(extra_headers=lambda p, r: {'X-Spam': 'Eggs'})125        self.start_client('raw_headers')126        self.loop.run_until_complete(self.client.recv())127        resp_headers = self.loop.run_until_complete(self.client.recv())128        self.assertIn("('X-Spam', 'Eggs')", resp_headers)129        self.stop_client()130        self.stop_server()131    def test_protocol_custom_response_headers_callable_list(self):132        self.start_server(extra_headers=lambda p, r: [('X-Spam', 'Eggs')])133        self.start_client('raw_headers')134        self.loop.run_until_complete(self.client.recv())135        resp_headers = self.loop.run_until_complete(self.client.recv())136        self.assertIn("('X-Spam', 'Eggs')", resp_headers)137        self.stop_client()138        self.stop_server()139    def test_protocol_custom_response_headers_dict(self):140        self.start_server(extra_headers={'X-Spam': 'Eggs'})141        self.start_client('raw_headers')142        self.loop.run_until_complete(self.client.recv())143        resp_headers = self.loop.run_until_complete(self.client.recv())144        self.assertIn("('X-Spam', 'Eggs')", resp_headers)145        self.stop_client()146        self.stop_server()147    def test_protocol_custom_response_headers_list(self):148        self.start_server(extra_headers=[('X-Spam', 'Eggs')])149        self.start_client('raw_headers')150        self.loop.run_until_complete(self.client.recv())151        resp_headers = self.loop.run_until_complete(self.client.recv())152        self.assertIn("('X-Spam', 'Eggs')", resp_headers)153        self.stop_client()154        self.stop_server()155    def test_no_subprotocol(self):156        self.start_server()157        self.start_client('subprotocol')158        server_subprotocol = self.loop.run_until_complete(self.client.recv())159        self.assertEqual(server_subprotocol, repr(None))160        self.assertEqual(self.client.subprotocol, None)161        self.stop_client()162        self.stop_server()163    def test_subprotocol_found(self):164        self.start_server(subprotocols=['superchat', 'chat'])165        self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])166        server_subprotocol = self.loop.run_until_complete(self.client.recv())167        self.assertEqual(server_subprotocol, repr('chat'))168        self.assertEqual(self.client.subprotocol, 'chat')169        self.stop_client()170        self.stop_server()171    def test_subprotocol_not_found(self):172        self.start_server(subprotocols=['superchat'])173        self.start_client('subprotocol', subprotocols=['otherchat'])174        server_subprotocol = self.loop.run_until_complete(self.client.recv())175        self.assertEqual(server_subprotocol, repr(None))176        self.assertEqual(self.client.subprotocol, None)177        self.stop_client()178        self.stop_server()179    def test_subprotocol_not_offered(self):180        self.start_server()181        self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])182        server_subprotocol = self.loop.run_until_complete(self.client.recv())183        self.assertEqual(server_subprotocol, repr(None))184        self.assertEqual(self.client.subprotocol, None)185        self.stop_client()186        self.stop_server()187    def test_subprotocol_not_requested(self):188        self.start_server(subprotocols=['superchat', 'chat'])189        self.start_client('subprotocol')190        server_subprotocol = self.loop.run_until_complete(self.client.recv())191        self.assertEqual(server_subprotocol, repr(None))192        self.assertEqual(self.client.subprotocol, None)193        self.stop_client()194        self.stop_server()195    @unittest.mock.patch.object(196        WebSocketServerProtocol, 'select_subprotocol', autospec=True)197    def test_subprotocol_error(self, _select_subprotocol):198        _select_subprotocol.return_value = 'superchat'199        self.start_server(subprotocols=['superchat'])200        with self.assertRaises(InvalidHandshake):201            self.start_client('subprotocol', subprotocols=['otherchat'])202        self.run_loop_once()203        self.stop_server()204    @unittest.mock.patch('websockets.server.read_request')205    def test_server_receives_malformed_request(self, _read_request):206        _read_request.side_effect = ValueError("read_request failed")207        self.start_server()208        with self.assertRaises(InvalidHandshake):209            self.start_client()210        self.stop_server()211    @unittest.mock.patch('websockets.client.read_response')212    def test_client_receives_malformed_response(self, _read_response):213        _read_response.side_effect = ValueError("read_response failed")214        self.start_server()215        with self.assertRaises(InvalidHandshake):216            self.start_client()217        self.run_loop_once()218        self.stop_server()219    @unittest.mock.patch('websockets.client.build_request')220    def test_client_sends_invalid_handshake_request(self, _build_request):221        def wrong_build_request(set_header):222            return '42'223        _build_request.side_effect = wrong_build_request224        self.start_server()225        with self.assertRaises(InvalidHandshake):226            self.start_client()227        self.stop_server()228    @unittest.mock.patch('websockets.server.build_response')229    def test_server_sends_invalid_handshake_response(self, _build_response):230        def wrong_build_response(set_header, key):231            return build_response(set_header, '42')232        _build_response.side_effect = wrong_build_response233        self.start_server()234        with self.assertRaises(InvalidHandshake):235            self.start_client()236        self.stop_server()237    @unittest.mock.patch('websockets.client.read_response')238    def test_server_does_not_switch_protocols(self, _read_response):239        @asyncio.coroutine240        def wrong_read_response(stream):241            code, headers = yield from read_response(stream)242            return 400, headers243        _read_response.side_effect = wrong_read_response244        self.start_server()245        with self.assertRaises(InvalidHandshake):246            self.start_client()247        self.run_loop_once()248        self.stop_server()249    @unittest.mock.patch('websockets.server.WebSocketServerProtocol.send')250    def test_server_handler_crashes(self, send):251        send.side_effect = ValueError("send failed")252        self.start_server()253        self.start_client()254        self.loop.run_until_complete(self.client.send("Hello!"))255        with self.assertRaises(ConnectionClosed):256            self.loop.run_until_complete(self.client.recv())257        self.stop_client()258        self.stop_server()259        # Connection ends with an unexpected error.260        self.assertEqual(self.client.close_code, 1011)261    @unittest.mock.patch('websockets.server.WebSocketServerProtocol.close')262    def test_server_close_crashes(self, close):263        close.side_effect = ValueError("close failed")264        self.start_server()265        self.start_client()266        self.loop.run_until_complete(self.client.send("Hello!"))267        reply = self.loop.run_until_complete(self.client.recv())268        self.assertEqual(reply, "Hello!")269        self.stop_client()270        self.stop_server()271        # Connection ends with an abnormal closure.272        self.assertEqual(self.client.close_code, 1006)273@unittest.skipUnless(os.path.exists(testcert), "test certificate is missing")274class SSLClientServerTests(ClientServerTests):275    secure = True276    @property277    def server_context(self):278        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)279        ssl_context.load_cert_chain(testcert)280        return ssl_context281    @property282    def client_context(self):283        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)284        ssl_context.load_verify_locations(testcert)285        ssl_context.verify_mode = ssl.CERT_REQUIRED286        return ssl_context287    def start_server(self, *args, **kwds):288        kwds['ssl'] = self.server_context289        server = serve(handler, 'localhost', 8642, **kwds)290        self.server = self.loop.run_until_complete(server)291    def start_client(self, path='', **kwds):292        kwds['ssl'] = self.client_context293        client = connect('wss://localhost:8642/' + path, **kwds)294        self.client = self.loop.run_until_complete(client)295    def test_ws_uri_is_rejected(self):296        self.start_server()297        client = connect('ws://localhost:8642/', ssl=self.client_context)298        with self.assertRaises(ValueError):299            self.loop.run_until_complete(client)300        self.stop_server()301class ClientServerOriginTests(unittest.TestCase):302    def setUp(self):303        self.loop = asyncio.new_event_loop()304        asyncio.set_event_loop(self.loop)305    def tearDown(self):306        self.loop.close()307    def test_checking_origin_succeeds(self):308        server = self.loop.run_until_complete(309            serve(handler, 'localhost', 8642, origins=['http://localhost']))310        client = self.loop.run_until_complete(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
