Best Python code snippet using localstack_python
test_namedpipe.py
Source:test_namedpipe.py  
...38            self._mock_output_queue,39            self._mock_client_connected,40            self._FAKE_LOG_PATH)41        self._handler._ioutils = self._ioutils42    def _mock_setup_pipe_handler(self):43        self._handler._log_file_handle = mock.Mock()44        self._handler._pipe_handle = mock.sentinel.pipe_handle45        self._handler._workers = [mock.Mock(), mock.Mock()]46        self._handler._r_buffer = mock.Mock()47        self._handler._w_buffer = mock.Mock()48        self._handler._r_overlapped = mock.Mock()49        self._handler._w_overlapped = mock.Mock()50        self._handler._r_completion_routine = mock.Mock()51        self._handler._w_completion_routine = mock.Mock()52    @mock.patch.object(builtins, 'open')53    @mock.patch.object(namedpipe.NamedPipeHandler, '_open_pipe')54    def test_start_pipe_handler(self, mock_open_pipe, mock_open):55        self._handler.start()56        mock_open_pipe.assert_called_once_with()57        mock_open.assert_called_once_with(self._FAKE_LOG_PATH, 'ab', 1)58        self.assertEqual(mock_open.return_value,59                         self._handler._log_file_handle)60        thread = namedpipe.threading.Thread61        thread.assert_has_calls(62            [mock.call(target=self._handler._read_from_pipe),63             mock.call().setDaemon(True),64             mock.call().start(),65             mock.call(target=self._handler._write_to_pipe),66             mock.call().setDaemon(True),67             mock.call().start()])68    @mock.patch.object(namedpipe.NamedPipeHandler, 'stop')69    @mock.patch.object(namedpipe.NamedPipeHandler, '_open_pipe')70    def test_start_pipe_handler_exception(self, mock_open_pipe,71                                          mock_stop_handler):72        mock_open_pipe.side_effect = Exception73        self.assertRaises(exceptions.OSWinException,74                          self._handler.start)75        mock_stop_handler.assert_called_once_with()76    @mock.patch.object(namedpipe.NamedPipeHandler, '_close_pipe')77    def test_stop_pipe_handler(self, mock_close_pipe):78        self._mock_setup_pipe_handler()79        self._handler.stop()80        self._handler._stopped.set.assert_called_once_with()81        mock_close_pipe.assert_called_once_with()82        self._handler._log_file_handle.close.assert_called_once_with()83        self._ioutils.set_event.assert_has_calls(84            [mock.call(self._handler._r_overlapped.hEvent),85             mock.call(self._handler._w_overlapped.hEvent)])86        self._ioutils.cancel_io.assert_called_once_with(87            mock.sentinel.pipe_handle)88        for worker in self._handler._workers:89            worker.join.assert_called_once_with()90    def test_setup_io_structures(self):91        self._handler._setup_io_structures()92        self.assertEqual(self._ioutils.get_buffer.return_value,93                         self._handler._r_buffer)94        self.assertEqual(self._ioutils.get_buffer.return_value,95                         self._handler._w_buffer)96        self.assertEqual(97            self._ioutils.get_new_overlapped_structure.return_value,98            self._handler._r_overlapped)99        self.assertEqual(100            self._ioutils.get_new_overlapped_structure.return_value,101            self._handler._w_overlapped)102        self.assertEqual(103            self._ioutils.get_completion_routine.return_value,104            self._handler._r_completion_routine)105        self.assertEqual(106            self._ioutils.get_completion_routine.return_value,107            self._handler._w_completion_routine)108        self.assertIsNone(self._handler._log_file_handle)109        self._ioutils.get_buffer.assert_has_calls(110            [mock.call(constants.SERIAL_CONSOLE_BUFFER_SIZE)] * 2)111        self._ioutils.get_completion_routine.assert_has_calls(112            [mock.call(self._handler._read_callback),113             mock.call()])114    def test_open_pipe(self):115        self._handler._open_pipe()116        self._ioutils.wait_named_pipe.assert_called_once_with(117            mock.sentinel.pipe_name)118        self._ioutils.open.assert_called_once_with(119            mock.sentinel.pipe_name,120            desired_access=(ioutils.GENERIC_READ | ioutils.GENERIC_WRITE),121            share_mode=(ioutils.FILE_SHARE_READ | ioutils.FILE_SHARE_WRITE),122            creation_disposition=ioutils.OPEN_EXISTING,123            flags_and_attributes=ioutils.FILE_FLAG_OVERLAPPED)124        self.assertEqual(self._ioutils.open.return_value,125                         self._handler._pipe_handle)126    def test_close_pipe(self):127        self._mock_setup_pipe_handler()128        self._handler._close_pipe()129        self._ioutils.close_handle.assert_called_once_with(130            mock.sentinel.pipe_handle)131        self.assertIsNone(self._handler._pipe_handle)132    def test_cancel_io(self):133        self._mock_setup_pipe_handler()134        self._handler._cancel_io()135        self._ioutils.set_event.assert_has_calls(136            [mock.call(self._handler._r_overlapped.hEvent),137             mock.call(self._handler._w_overlapped.hEvent)])138        self._ioutils.cancel_io.assert_called_once_with(139            mock.sentinel.pipe_handle)140    @mock.patch.object(namedpipe.NamedPipeHandler, '_start_io_worker')141    def test_read_from_pipe(self, mock_start_worker):142        self._mock_setup_pipe_handler()143        self._handler._read_from_pipe()144        mock_start_worker.assert_called_once_with(145            self._ioutils.read,146            self._handler._r_buffer,147            self._handler._r_overlapped,148            self._handler._r_completion_routine)149    @mock.patch.object(namedpipe.NamedPipeHandler, '_start_io_worker')150    def test_write_to_pipe(self, mock_start_worker):151        self._mock_setup_pipe_handler()152        self._handler._write_to_pipe()153        mock_start_worker.assert_called_once_with(154            self._ioutils.write,155            self._handler._w_buffer,156            self._handler._w_overlapped,157            self._handler._w_completion_routine,158            self._handler._get_data_to_write)159    def _test_start_io_worker(self, buff_update_func=None, exception=None):160        self._handler._stopped.isSet.side_effect = [False, True]161        self._handler._pipe_handle = mock.sentinel.pipe_handle162        self._handler.stop = mock.Mock()163        io_func = mock.Mock(side_effect=exception)164        fake_buffer = 'fake_buffer'165        self._handler._start_io_worker(io_func, fake_buffer,166                                       mock.sentinel.overlapped_structure,167                                       mock.sentinel.completion_routine,168                                       buff_update_func)169        if buff_update_func:170            num_bytes = buff_update_func()171        else:172            num_bytes = len(fake_buffer)173        io_func.assert_called_once_with(mock.sentinel.pipe_handle,174                                        fake_buffer, num_bytes,175                                        mock.sentinel.overlapped_structure,176                                        mock.sentinel.completion_routine)177        if exception:178            self._handler._stopped.set.assert_called_once_with()179    def test_start_io_worker(self):180        self._test_start_io_worker()181    def test_start_io_worker_with_buffer_update_method(self):182        self._test_start_io_worker(buff_update_func=mock.Mock())183    def test_start_io_worker_exception(self):184        self._test_start_io_worker(exception=IOError)185    @mock.patch.object(namedpipe.NamedPipeHandler, '_write_to_log')186    def test_read_callback(self, mock_write_to_log):187        self._mock_setup_pipe_handler()188        fake_data = self._ioutils.get_buffer_data.return_value189        self._handler._read_callback(mock.sentinel.num_bytes)190        self._ioutils.get_buffer_data.assert_called_once_with(191            self._handler._r_buffer, mock.sentinel.num_bytes)192        self._mock_output_queue.put.assert_called_once_with(fake_data)193        mock_write_to_log.assert_called_once_with(fake_data)194    @mock.patch.object(namedpipe, 'time')195    def test_get_data_to_write(self, mock_time):196        self._mock_setup_pipe_handler()197        self._handler._stopped.isSet.side_effect = [False, False]198        self._mock_client_connected.isSet.side_effect = [False, True]199        fake_data = 'fake input data'200        self._mock_input_queue.get.return_value = fake_data201        num_bytes = self._handler._get_data_to_write()202        mock_time.sleep.assert_called_once_with(1)203        self._ioutils.write_buffer_data.assert_called_once_with(204            self._handler._w_buffer, fake_data)205        self.assertEqual(len(fake_data), num_bytes)206    @mock.patch.object(namedpipe.NamedPipeHandler, '_rotate_logs')207    def _test_write_to_log(self, mock_rotate_logs, size_exceeded=False):208        self._mock_setup_pipe_handler()209        self._handler._stopped.isSet.return_value = False210        fake_handle = self._handler._log_file_handle211        fake_handle.tell.return_value = (constants.MAX_CONSOLE_LOG_FILE_SIZE212                                         if size_exceeded else 0)213        fake_data = 'fake_data'214        self._handler._write_to_log(fake_data)215        if size_exceeded:216            mock_rotate_logs.assert_called_once_with()217        self._handler._log_file_handle.write.assert_called_once_with(218            fake_data)219    def test_write_to_log(self):220        self._test_write_to_log()221    def test_write_to_log_size_exceeded(self):222        self._test_write_to_log(size_exceeded=True)223    @mock.patch.object(namedpipe.NamedPipeHandler, '_retry_if_file_in_use')224    @mock.patch.object(builtins, 'open')225    @mock.patch.object(namedpipe, 'os')226    def test_rotate_logs(self, mock_os, mock_open, mock_exec_retry):227        fake_archived_log_path = self._FAKE_LOG_PATH + '.1'228        mock_os.path.exists.return_value = True229        self._mock_setup_pipe_handler()230        fake_handle = self._handler._log_file_handle231        self._handler._rotate_logs()232        fake_handle.flush.assert_called_once_with()233        fake_handle.close.assert_called_once_with()234        mock_os.path.exists.assert_called_once_with(235            fake_archived_log_path)236        mock_exec_retry.assert_has_calls([mock.call(mock_os.remove,237                                                    fake_archived_log_path),238                                          mock.call(mock_os.rename,239                                                    self._FAKE_LOG_PATH,240                                                    fake_archived_log_path)])241        mock_open.assert_called_once_with(self._FAKE_LOG_PATH, 'ab', 1)242        self.assertEqual(mock_open.return_value,243                         self._handler._log_file_handle)...motors_controller.py
Source:motors_controller.py  
1import mathutils2class MotorController:3    """Handles 4 motors with mecanum wheels motion."""4    def __init__(self, motors_handler, sensors_handler,5                 encoders_handler, brake_delay):6        self._handler = motors_handler7        self._sensors_input = True8        self._sensors_handler = sensors_handler9        self._encoders_handler = encoders_handler10        self._mathUtils = mathutils.MathUtils()11        self._brake_delay = brake_delay12    """Moves the robot forward"""13    def forward(self, speed: int):14        self._handler.set_motor('A', 'F', speed)15        self._handler.set_motor('B', 'F', speed)16        self._handler.set_motor('C', 'F', speed)17        self._handler.set_motor('D', 'F', speed)18        self._handler.write_motors()19    def forward(self, speed: int, rotations: float):20        if not self._sensors_input:21            return22        encoders_data = []23        while not encoders_data:24            encoders_data = self._encoders_handler.read()25        target_values = []26        for encoder in encoders_data:27            target_values.append(encoder + rotations)28        while True:29            encoders_data = []30            while not encoders_data:31                encoders_data = self._encoders_handler.read()32            diffs = []33            self.forward(speed)34            for i in range(len(encoders_data)):35                diffs = target_values[i] - encoders_data[i]36            if abs(diffs[0]) < 5:37                self.brake('A')38                return39            elif abs(diffs[1]) < 5:40                self.brake('B')41                return42            elif abs(diffs[2]) < 5:43                self.brake('C')44                return45            elif abs(diffs[3]) < 5:46                self.brake('D')47                return48    """Moves the robot backward."""49    def backward(self, speed: int):50        self._handler.set_motor('A', 'B', speed)51        self._handler.set_motor('B', 'B', speed)52        self._handler.set_motor('C', 'B', speed)53        self._handler.set_motor('D', 'B', speed)54        self._handler.write_motors()55    def backward(self, speed: int, rotations: float):56        if not self._sensors_input:57            return58        encoders_data = []59        while not encoders_data:60            encoders_data = self._encoders_handler.read()61        target_values = []62        for encoder in encoders_data:63            target_values.append(encoder - rotations)64        while True:65            encoders_data = []66            while not encoders_data:67                encoders_data = self._encoders_handler.read()68            diffs = []69            self.backward(speed)70            for i in range(len(encoders_data)):71                diffs = target_values[i] - encoders_data[i]72            if abs(diffs[0]) < 5:73                self.brake('A')74                return75            elif abs(diffs[1]) < 5:76                self.brake('B')77                return78            elif abs(diffs[2]) < 5:79                self.brake('C')80                return81            elif abs(diffs[3]) < 5:82                self.brake('D')83                return84    def left(self, speed: int):85        self._handler.set_motor('A', 'B', speed)86        self._handler.set_motor('B', 'F', speed)87        self._handler.set_motor('C', 'B', speed)88        self._handler.set_motor('D', 'F', speed)89        self._handler.write_motors()90    def right(self, speed: int):91        self._handler.set_motor('A', 'F', speed)92        self._handler.set_motor('B', 'B', speed)93        self._handler.set_motor('C', 'F', speed)94        self._handler.set_motor('D', 'B', speed)95        self._handler.write_motors()96    """Slides the robot to a given angle."""97    def slide(self, angle: float, speed: int):98        if not self._sensors_input:99            return100        if angle >= 0:101            if angle <= 45:102                bc_speed = int(self._mathUtils.valmap(angle, 45, 0, 0, speed))103                self._handler.set_motor('A', 'F', speed)104                self._handler.set_motor('B', 'F', bc_speed)105                self._handler.set_motor('C', 'F', bc_speed)106                self._handler.set_motor('D', 'F', speed)107            elif angle <= 90:108                bc_speed = int(self._mathUtils.valmap(angle, 45, 90, 0, speed))109                self._handler.set_motor('A', 'F', speed)110                self._handler.set_motor('B', 'B', bc_speed)111                self._handler.set_motor('C', 'B', bc_speed)112                self._handler.set_motor('D', 'F', speed)113            elif angle <= 135:114                ad_speed = int(115                    self._mathUtils.valmap(angle, 135, 90, 0, speed)116                )117                self._handler.set_motor('A', 'F', ad_speed)118                self._handler.set_motor('B', 'B', speed)119                self._handler.set_motor('C', 'B', speed)120                self._handler.set_motor('D', 'F', ad_speed)121            elif angle <= 180:122                bc_speed = int(123                    self._mathUtils.valmap(angle, 135, 180, 0, speed)124                )125                self._handler.set_motor('A', 'B', speed)126                self._handler.set_motor('B', 'B', bc_speed)127                self._handler.set_motor('C', 'B', bc_speed)128                self._handler.set_motor('D', 'B', speed)129        else:130            if angle >= -45:131                ad_speed = int(self._mathUtils.valmap(angle, -45, 0, 0, speed))132                self._handler.set_motor('A', 'F', ad_speed)133                self._handler.set_motor('B', 'F', speed)134                self._handler.set_motor('C', 'F', speed)135                self._handler.set_motor('D', 'F', ad_speed)136            elif angle >= -90:137                ad_speed = int(138                    self._mathUtils.valmap(angle, -45, -90, 0, speed)139                )140                self._handler.set_motor('A', 'B', ad_speed)141                self._handler.set_motor('B', 'F', speed)142                self._handler.set_motor('C', 'F', speed)143                self._handler.set_motor('D', 'B', ad_speed)144            elif angle >= -135:145                bc_speed = int(146                    self._mathUtils.valmap(angle, -135, -90, 0, speed)147                )148                self._handler.set_motor('A', 'B', speed)149                self._handler.set_motor('B', 'F', bc_speed)150                self._handler.set_motor('C', 'F', bc_speed)151                self._handler.set_motor('D', 'B', speed)152            elif angle >= -180:153                ad_speed = int(154                    self._mathUtils.valmap(angle, -135, -180, 0, speed)155                )156                self._handler.set_motor('A', 'B', ad_speed)157                self._handler.set_motor('B', 'B', speed)158                self._handler.set_motor('C', 'B', speed)159                self._handler.set_motor('D', 'B', ad_speed)160        self._handler.write_motors()161    """Turns the robot to a given angle"""162    def turn(self, angle: float, speed: int):163        if not self._sensors_input:164            return165        sensors_data = []166        while not sensors_data:167            sensors_data = self._sensors_handler.read()168        robot_angle = sensors_data[2]169        target_angle = robot_angle + angle170        target_angle = target_angle % 360171        if target_angle < 0:172            target_angle = target_angle + 360173        while True:174            sensors_data = []175            while not sensors_data:176                sensors_data = self._sensors_handler.read()177            robot_angle = sensors_data[2]178            diff = target_angle - robot_angle179            direction = 180 - (diff + 360) % 360180            if direction > 0:181                self.right(speed)182            else:183                self.left(speed)184            if abs(diff) < 5:185                self.brake()186                return187    def point_to(self, angle, speed):188        if not self._sensors_input:189            return190        sensors_data = []191        while not sensors_data:192            sensors_data = self._handler.get_sensors()193        robot_angle = sensors_data[2]194        diff = angle - robot_angle195        self.turn(diff, speed)196    def turn_manual(self, direction: str, speed: int):197        if direction == 'L':198            self.left(speed)199        elif direction == 'R':200            self.right(speed)201    """Stops all motors."""202    def brake(self):203        self._handler.set_motor('A', 'F', 0)204        self._handler.set_motor('B', 'F', 0)205        self._handler.set_motor('C', 'F', 0)206        self._handler.set_motor('D', 'F', 0)207        self._handler.write_motors()208    def brake(self, motor):209        self._handler.set_motor(motor, 'F', 0)..._invocation.py
Source:_invocation.py  
1# Copyright 2017 gRPC authors.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import logging15import threading16import grpc17_NOT_YET_OBSERVED = object()18logging.basicConfig()19_LOGGER = logging.getLogger(__name__)20def _cancel(handler):21    return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')22def _is_active(handler):23    return handler.is_active()24def _time_remaining(unused_handler):25    raise NotImplementedError()26def _add_callback(handler, callback):27    return handler.add_callback(callback)28def _initial_metadata(handler):29    return handler.initial_metadata()30def _trailing_metadata(handler):31    trailing_metadata, unused_code, unused_details = handler.termination()32    return trailing_metadata33def _code(handler):34    unused_trailing_metadata, code, unused_details = handler.termination()35    return code36def _details(handler):37    unused_trailing_metadata, unused_code, details = handler.termination()38    return details39class _Call(grpc.Call):40    def __init__(self, handler):41        self._handler = handler42    def cancel(self):43        _cancel(self._handler)44    def is_active(self):45        return _is_active(self._handler)46    def time_remaining(self):47        return _time_remaining(self._handler)48    def add_callback(self, callback):49        return _add_callback(self._handler, callback)50    def initial_metadata(self):51        return _initial_metadata(self._handler)52    def trailing_metadata(self):53        return _trailing_metadata(self._handler)54    def code(self):55        return _code(self._handler)56    def details(self):57        return _details(self._handler)58class _RpcErrorCall(grpc.RpcError, grpc.Call):59    def __init__(self, handler):60        self._handler = handler61    def cancel(self):62        _cancel(self._handler)63    def is_active(self):64        return _is_active(self._handler)65    def time_remaining(self):66        return _time_remaining(self._handler)67    def add_callback(self, callback):68        return _add_callback(self._handler, callback)69    def initial_metadata(self):70        return _initial_metadata(self._handler)71    def trailing_metadata(self):72        return _trailing_metadata(self._handler)73    def code(self):74        return _code(self._handler)75    def details(self):76        return _details(self._handler)77def _next(handler):78    read = handler.take_response()79    if read.code is None:80        return read.response81    elif read.code is grpc.StatusCode.OK:82        raise StopIteration()83    else:84        raise _RpcErrorCall(handler)85class _HandlerExtras(object):86    def __init__(self):87        self.condition = threading.Condition()88        self.unary_response = _NOT_YET_OBSERVED89        self.cancelled = False90def _with_extras_cancel(handler, extras):91    with extras.condition:92        if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):93            extras.cancelled = True94            return True95        else:96            return False97def _extras_without_cancelled(extras):98    with extras.condition:99        return extras.cancelled100def _running(handler):101    return handler.is_active()102def _done(handler):103    return not handler.is_active()104def _with_extras_unary_response(handler, extras):105    with extras.condition:106        if extras.unary_response is _NOT_YET_OBSERVED:107            read = handler.take_response()108            if read.code is None:109                extras.unary_response = read.response110                return read.response111            else:112                raise _RpcErrorCall(handler)113        else:114            return extras.unary_response115def _exception(unused_handler):116    raise NotImplementedError('TODO!')117def _traceback(unused_handler):118    raise NotImplementedError('TODO!')119def _add_done_callback(handler, callback, future):120    adapted_callback = lambda: callback(future)121    if not handler.add_callback(adapted_callback):122        callback(future)123class _FutureCall(grpc.Future, grpc.Call):124    def __init__(self, handler, extras):125        self._handler = handler126        self._extras = extras127    def cancel(self):128        return _with_extras_cancel(self._handler, self._extras)129    def cancelled(self):130        return _extras_without_cancelled(self._extras)131    def running(self):132        return _running(self._handler)133    def done(self):134        return _done(self._handler)135    def result(self):136        return _with_extras_unary_response(self._handler, self._extras)137    def exception(self):138        return _exception(self._handler)139    def traceback(self):140        return _traceback(self._handler)141    def add_done_callback(self, fn):142        _add_done_callback(self._handler, fn, self)143    def is_active(self):144        return _is_active(self._handler)145    def time_remaining(self):146        return _time_remaining(self._handler)147    def add_callback(self, callback):148        return _add_callback(self._handler, callback)149    def initial_metadata(self):150        return _initial_metadata(self._handler)151    def trailing_metadata(self):152        return _trailing_metadata(self._handler)153    def code(self):154        return _code(self._handler)155    def details(self):156        return _details(self._handler)157def consume_requests(request_iterator, handler):158    def _consume():159        while True:160            try:161                request = next(request_iterator)162                added = handler.add_request(request)163                if not added:164                    break165            except StopIteration:166                handler.close_requests()167                break168            except Exception:  # pylint: disable=broad-except169                details = 'Exception iterating requests!'170                _LOGGER.exception(details)171                handler.cancel(grpc.StatusCode.UNKNOWN, details)172    consumption = threading.Thread(target=_consume)173    consumption.start()174def blocking_unary_response(handler):175    read = handler.take_response()176    if read.code is None:177        unused_trailing_metadata, code, unused_details = handler.termination()178        if code is grpc.StatusCode.OK:179            return read.response180        else:181            raise _RpcErrorCall(handler)182    else:183        raise _RpcErrorCall(handler)184def blocking_unary_response_with_call(handler):185    read = handler.take_response()186    if read.code is None:187        unused_trailing_metadata, code, unused_details = handler.termination()188        if code is grpc.StatusCode.OK:189            return read.response, _Call(handler)190        else:191            raise _RpcErrorCall(handler)192    else:193        raise _RpcErrorCall(handler)194def future_call(handler):195    return _FutureCall(handler, _HandlerExtras())196class ResponseIteratorCall(grpc.Call):197    def __init__(self, handler):198        self._handler = handler199    def __iter__(self):200        return self201    def __next__(self):202        return _next(self._handler)203    def next(self):204        return _next(self._handler)205    def cancel(self):206        _cancel(self._handler)207    def is_active(self):208        return _is_active(self._handler)209    def time_remaining(self):210        return _time_remaining(self._handler)211    def add_callback(self, callback):212        return _add_callback(self._handler, callback)213    def initial_metadata(self):214        return _initial_metadata(self._handler)215    def trailing_metadata(self):216        return _trailing_metadata(self._handler)217    def code(self):218        return _code(self._handler)219    def details(self):..._server_rpc.py
Source:_server_rpc.py  
1# Copyright 2017 gRPC authors.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import grpc_testing15class UnaryUnaryServerRpc(grpc_testing.UnaryUnaryServerRpc):16    def __init__(self, handler):17        self._handler = handler18    def initial_metadata(self):19        return self._handler.initial_metadata()20    def cancel(self):21        self._handler.cancel()22    def termination(self):23        return self._handler.unary_response_termination()24class UnaryStreamServerRpc(grpc_testing.UnaryStreamServerRpc):25    def __init__(self, handler):26        self._handler = handler27    def initial_metadata(self):28        return self._handler.initial_metadata()29    def take_response(self):30        return self._handler.take_response()31    def cancel(self):32        self._handler.cancel()33    def termination(self):34        return self._handler.stream_response_termination()35class StreamUnaryServerRpc(grpc_testing.StreamUnaryServerRpc):36    def __init__(self, handler):37        self._handler = handler38    def initial_metadata(self):39        return self._handler.initial_metadata()40    def send_request(self, request):41        self._handler.add_request(request)42    def requests_closed(self):43        self._handler.requests_closed()44    def cancel(self):45        self._handler.cancel()46    def termination(self):47        return self._handler.unary_response_termination()48class StreamStreamServerRpc(grpc_testing.StreamStreamServerRpc):49    def __init__(self, handler):50        self._handler = handler51    def initial_metadata(self):52        return self._handler.initial_metadata()53    def send_request(self, request):54        self._handler.add_request(request)55    def requests_closed(self):56        self._handler.requests_closed()57    def take_response(self):58        return self._handler.take_response()59    def cancel(self):60        self._handler.cancel()61    def termination(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
