How to use read_pipe method in autotest

"""Connection-level tests for the kazoo client (excerpt).

NOTE(review): this listing was reconstructed from an excerpt in which the
original line numbers were fused into the text and a few tokens were missing.
The restored tokens are ``nose.tools`` (two imports), ``handler.select``
(monkey-patch targets), ``self.fail(`` and ``client.handler.select(``; each
was inferred from the surrounding statements -- confirm against upstream.
"""
from collections import namedtuple
import os
import errno
import threading
import time
import uuid
import struct

from nose import SkipTest
from nose.tools import eq_
from nose.tools import raises
import mock

from kazoo.exceptions import ConnectionLoss
from kazoo.protocol.serialization import (
    Connect,
    int_struct,
    write_string,
)
from kazoo.protocol.states import KazooState
from kazoo.protocol.connection import _CONNECTION_DROP
from kazoo.testing import KazooTestCase
from kazoo.tests.util import wait


class Delete(namedtuple('Delete', 'path version')):
    """Request whose response can never be deserialized.

    ``serialize`` emits the path followed by the packed version, while
    ``deserialize`` always raises ``ValueError`` so tests can exercise the
    client's handling of a failing deserializer.
    """

    type = 2

    def serialize(self):
        """Return the request payload: the path string, then the version."""
        b = bytearray()
        b.extend(write_string(self.path))
        b.extend(int_struct.pack(self.version))
        return b

    @classmethod
    def deserialize(cls, bytes, offset):
        """Always fail; the arguments are ignored."""
        raise ValueError("oh my")


class TestConnectionHandler(KazooTestCase):
    """Tests that poke at the client's connection handler internals."""

    def test_bad_deserialization(self):
        """A ValueError raised while deserializing reaches the async result."""
        async_object = self.client.handler.async_result()
        self.client._queue.append(
            (Delete(self.client.chroot, -1), async_object))
        # presumably wakes the connection loop so it picks up the queued
        # request -- TODO confirm
        os.write(self.client._connection._write_pipe, b'\0')

        @raises(ValueError)
        def testit():
            async_object.get()
        testit()

    def test_with_bad_sessionid(self):
        """A client given a made-up session id still reaches CONNECTED."""
        ev = threading.Event()

        def expired(state):
            if state == KazooState.CONNECTED:
                ev.set()

        password = os.urandom(16)
        client = self._get_client(client_id=(82838284824, password))
        client.add_listener(expired)
        client.start()
        try:
            ev.wait(15)
            eq_(ev.is_set(), True)
        finally:
            client.stop()

    def test_connection_read_timeout(self):
        """A simulated socket read timeout raises ConnectionLoss, then the
        client reconnects and can read the node again."""
        client = self.client
        ev = threading.Event()
        path = "/" + uuid.uuid4().hex
        handler = client.handler
        _select = handler.select
        _socket = client._connection._socket

        def delayed_select(*args, **kwargs):
            result = _select(*args, **kwargs)
            if len(args[0]) == 1 and _socket in args[0]:
                # for any socket read, simulate a timeout
                return [], [], []
            return result

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        client.add_listener(back)
        client.create(path, b"1")
        try:
            handler.select = delayed_select
            self.assertRaises(ConnectionLoss, client.get, path)
        finally:
            # always restore the real select, even if the assertion fails
            handler.select = _select
        # the client reconnects automatically
        ev.wait(5)
        eq_(ev.is_set(), True)
        eq_(client.get(path)[0], b"1")

    def test_connection_write_timeout(self):
        """A simulated socket write timeout raises ConnectionLoss, then the
        client reconnects and the node was never created."""
        client = self.client
        ev = threading.Event()
        path = "/" + uuid.uuid4().hex
        handler = client.handler
        _select = handler.select
        _socket = client._connection._socket

        def delayed_select(*args, **kwargs):
            result = _select(*args, **kwargs)
            if _socket in args[1]:
                # for any socket write, simulate a timeout
                return [], [], []
            return result

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        client.add_listener(back)
        try:
            handler.select = delayed_select
            self.assertRaises(ConnectionLoss, client.create, path)
        finally:
            # always restore the real select, even if the assertion fails
            handler.select = _select
        # the client reconnects automatically
        ev.wait(5)
        eq_(ev.is_set(), True)
        eq_(client.exists(path), None)

    def test_connection_deserialize_fail(self):
        """The client keeps retrying when the connect response cannot be
        deserialized on reconnect."""
        client = self.client
        ev = threading.Event()
        path = "/" + uuid.uuid4().hex
        handler = client.handler
        _select = handler.select
        _socket = client._connection._socket

        def delayed_select(*args, **kwargs):
            result = _select(*args, **kwargs)
            if _socket in args[1]:
                # for any socket write, simulate a timeout
                return [], [], []
            return result

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        client.add_listener(back)

        deserialize_ev = threading.Event()

        def bad_deserialize(bytes, offset):
            deserialize_ev.set()
            raise struct.error()

        # force the connection to die but, on reconnect, cause the
        # server response to be non-deserializable. ensure that the client
        # continues to retry. This partially reproduces a rare bug seen
        # in production.
        with mock.patch.object(Connect, 'deserialize') as mock_deserialize:
            mock_deserialize.side_effect = bad_deserialize
            try:
                handler.select = delayed_select
                self.assertRaises(ConnectionLoss, client.create, path)
            finally:
                handler.select = _select
            # the client reconnects automatically but the first attempt will
            # hit a deserialize failure. wait for that.
            deserialize_ev.wait(5)
            eq_(deserialize_ev.is_set(), True)

        # this time should succeed
        ev.wait(5)
        eq_(ev.is_set(), True)
        eq_(client.exists(path), None)

    def test_connection_close(self):
        """close() raises while the client is running, works after stop(),
        and the client can be started again afterwards."""
        self.assertRaises(Exception, self.client.close)
        self.client.stop()
        self.client.close()
        # should be able to restart
        self.client.start()

    def test_connection_pipe(self):
        """The wake-up pipe survives stop(), is closed by close(), and is
        recreated by the next start()."""
        client = self.client
        read_pipe = client._connection._read_pipe
        write_pipe = client._connection._write_pipe
        assert read_pipe is not None
        assert write_pipe is not None

        # stop client and pipe should not yet be closed
        client.stop()
        assert read_pipe is not None
        assert write_pipe is not None
        os.fstat(read_pipe)
        os.fstat(write_pipe)

        # close client, and pipes should be
        client.close()

        try:
            os.fstat(read_pipe)
        except OSError as e:
            # EBADF is the expected outcome for a closed descriptor
            if e.errno != errno.EBADF:
                raise
        else:
            self.fail("Expected read_pipe to be closed")

        try:
            os.fstat(write_pipe)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
        else:
            self.fail("Expected write_pipe to be closed")

        # start client back up. should get a new, valid pipe
        client.start()
        read_pipe = client._connection._read_pipe
        write_pipe = client._connection._write_pipe
        assert read_pipe is not None
        assert write_pipe is not None
        os.fstat(read_pipe)
        os.fstat(write_pipe)

    def test_dirty_pipe(self):
        """A stray byte in the wake-up pipe is eventually drained."""
        client = self.client
        read_pipe = client._connection._read_pipe
        write_pipe = client._connection._write_pipe

        # add a stray byte to the pipe and ensure that doesn't
        # blow up client. simulates case where some error leaves
        # a byte in the pipe which doesn't correspond to the
        # request queue.
        os.write(write_pipe, b'\0')

        # eventually this byte should disappear from pipe
        wait(lambda: client.handler.select([read_pipe], [], [], 0)[0] == [])


class TestConnectionDrop(KazooTestCase):
    """Tests for an explicitly dropped connection."""

    def test_connection_dropped(self):
        """A pending large write fails with ConnectionLoss when the
        connection is dropped, and the client then reconnects."""
        ev = threading.Event()

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        # create a node with a large value and stop the ZK node
        path = "/" + uuid.uuid4().hex
        self.client.create(path)
        self.client.add_listener(back)
        result = self.client.set_async(path, b'a' * 1000 * 1024)
        self.client._call(_CONNECTION_DROP, None)

        self.assertRaises(ConnectionLoss, result.get)
        # we have a working connection to a new node
        ev.wait(30)
        eq_(ev.is_set(), True)


class TestReadOnlyMode(KazooTestCase):
    """Tests for read-only mode; skipped when the server minor version
    is below 4."""

    def setUp(self):
        self.setup_zookeeper(read_only=True)
        ver = self.client.server_version()
        if ver[1] < 4:
            raise SkipTest("Must use zookeeper 3.4 or above")

    def tearDown(self):
        self.client.stop()

    def test_read_only(self):
        """With two cluster members stopped, the client enters CONNECTED_RO,
        can still read, and rejects writes with NotReadOnlyCallError."""
        from kazoo.exceptions import NotReadOnlyCallError
        from kazoo.protocol.states import KeeperState

        client = self.client
        states = []
        ev = threading.Event()

        @client.add_listener
        def listen(state):
            states.append(state)
            if client.client_state == KeeperState.CONNECTED_RO:
                ev.set()

        try:
            self.cluster[1].stop()
            self.cluster[2].stop()
            ev.wait(6)
            eq_(ev.is_set(), True)
            eq_(client.client_state, KeeperState.CONNECTED_RO)

            # Test read only command
            eq_(client.get_children('/'), [])

            # Test error with write command
            @raises(NotReadOnlyCallError)
            def testit():
                client.create('/fred')
            testit()

            # Wait for a ping
            time.sleep(15)
        finally:
            client.remove_listener(listen)
            self.cluster[1].run()
            # NOTE(review): the excerpt is truncated at this point;
            # cluster[2] is stopped above but its restart is not visible
            # here -- confirm against the full source.

...104 write_pipe.readln()105 assert_that(str(context.exception), contains_string("Attempted to read from '/home/michael/dev/MAMEToolkit/test/emulator/mame/pipes/write-env1.pipe' in 'w' mode"))106 finally:107 close_pipes(write_pipe, lua_read_pipe)108 def test_write_to_read_pipe(self):109 read_pipe, lua_write_pipe = [None, None]110 try:111 read_pipe = Pipe("env1", "read", 'r', "mame/pipes")112 lua_write_pipe = setup_pipe(read_pipe)113 with self.assertRaises(IOError) as context:114 read_pipe.writeln("TEST")115 assert_that(str(context.exception), contains_string("Attempted to write to '/home/michael/dev/MAMEToolkit/test/emulator/mame/pipes/read-env1.pipe' in 'r' mode"))116 finally:117 close_pipes(read_pipe, lua_write_pipe)118 def test_lua_string(self):119 write_pipe, lua_read_pipe, read_pipe, lua_write_pipe = [None, None, None, None]120 try:121 write_pipe, lua_read_pipe, read_pipe, lua_write_pipe = setup_all_pipes()122 assert_that(read_pipe.get_lua_string(args=["test"]), equal_to('readPipe:write(test.."\\n"); readPipe:flush(); '))...

