Best Python code snippet using playwright-python
server.py
Source:server.py  
"""Sans-I/O ventilator backend server protocol.

This module wires the per-device receive/send filters (MCU, frontend, file,
rotary encoder) to the backend filter and to the connection timeout handler.
"""

from typing import Optional, Union, Tuple
import logging

import attr

from ventserver.protocols import events, exceptions
from ventserver.protocols.backend import backend, connections
from ventserver.protocols.devices import file, frontend, mcu, rotary_encoder
from ventserver.protocols.protobuf import mcu_pb
from ventserver.sansio import protocols


# Events


@attr.s
class GenericConnectionEvent(events.Event):
    """Generic connection status event."""

    connected: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return whether the event has data.

        A connection event always carries data: both ``connected=True`` and
        ``connected=False`` are meaningful.
        """
        return True


class FrontendConnectionEvent(GenericConnectionEvent):
    """Frontend connection status event."""


class MCUConnectionEvent(GenericConnectionEvent):
    """MCU connection status event."""


ConnectionEvent = Union[MCUConnectionEvent, FrontendConnectionEvent]


@attr.s
class ReceiveDataEvent(events.Event):
    """Server receive input event."""

    # This is the system's "wall clock" time, which may jump backwards or
    # forwards when the system time is adjusted. It should only be used for
    # recording timestamps intended to be read by people.
    wall_time: Optional[float] = attr.ib(default=None)
    # This is the system's monotonic time, which always increases at the same
    # rate regardless of adjustments to system time. Anything which needs to
    # compute the duration of elapsed time since some instant should use the
    # monotonic time.
    monotonic_time: Optional[float] = attr.ib(default=None)
    serial_receive: Optional[bytes] = attr.ib(default=None)
    websocket_receive: Optional[bytes] = attr.ib(default=None)
    # The default is None and has_data() tests for None, so the annotation
    # has to be Optional.
    rotary_encoder_receive: Optional[Tuple[int, bool]] = attr.ib(default=None)
    file_receive: Optional[file.LowerReceiveEvent] = attr.ib(default=None)

    def has_data(self) -> bool:
        """Return whether the event has data.

        Note that serial data is tested for truthiness (empty bytes do not
        count as data), while every other field only has to be non-None.
        """
        return (
            self.wall_time is not None
            or self.monotonic_time is not None
            or bool(self.serial_receive)
            or self.websocket_receive is not None
            or self.rotary_encoder_receive is not None
            or self.file_receive is not None
        )


@attr.s
class ReceiveOutputEvent(events.Event):
    """Server receive output/send event."""

    states_send: Optional[backend.SendEvent] = attr.ib(default=None)
    sysclock_setting: Optional[float] = attr.ib(default=None)
    kill_frontend: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        if (self.states_send is not None) and self.states_send.has_data():
            return True
        if (self.sysclock_setting is not None) or self.kill_frontend:
            return True
        return False


ReceiveEvent = Union[ReceiveDataEvent, ConnectionEvent]
SendEvent = backend.SendEvent


@attr.s
class SendOutputEvent(events.Event):
    """Server send output/send event."""

    serial_send: Optional[bytes] = attr.ib(default=None)
    websocket_send: Optional[bytes] = attr.ib(default=None)
    file_send: Optional[file.StateData] = attr.ib(default=None)

    def has_data(self) -> bool:
        """Return whether the event has data.

        As in ReceiveDataEvent, empty serial bytes do not count as data.
        """
        return (
            bool(self.serial_send) or
            self.websocket_send is not None or
            self.file_send is not None
        )


def make_serial_receive(
        serial_receive: bytes,
        wall_time: float, monotonic_time: Optional[float],
) -> ReceiveDataEvent:
    """Make a ReceiveDataEvent from serial receive data."""
    return ReceiveDataEvent(
        serial_receive=serial_receive,
        wall_time=wall_time, monotonic_time=monotonic_time
    )


def make_websocket_receive(
        ws_receive: bytes,
        wall_time: float, monotonic_time: Optional[float]
) -> ReceiveDataEvent:
    """Make a ReceiveDataEvent from websocket receive data."""
    return ReceiveDataEvent(
        websocket_receive=ws_receive,
        wall_time=wall_time, monotonic_time=monotonic_time
    )


def make_rotary_encoder_receive(
        re_receive: Tuple[int, bool],
        wall_time: float, monotonic_time: Optional[float]
) -> ReceiveDataEvent:
    """Make a ReceiveDataEvent from rotary encoder receive data."""
    return ReceiveDataEvent(
        rotary_encoder_receive=re_receive,
        wall_time=wall_time, monotonic_time=monotonic_time
    )


# Filters


@attr.s
class Receiver(protocols.Filter[ReceiveEvent, ReceiveOutputEvent]):
    """Filter which transforms receive bytes into high-level events.

    Because this filter immediately handles inputs instead of taking inputs
    through an internal buffer and then processing the buffer in the output()
    method, the input method is not thread-safe! It should only be used in
    sequential or async/await code.
    """

    _logger = logging.getLogger('.'.join((__name__, 'Receiver')))

    # Most recent clock values seen on an input event; they are stamped onto
    # every event forwarded to the backend and to the connection handler.
    wall_time: float = attr.ib(default=0)
    monotonic_time: float = attr.ib(default=0)
    _backend: backend.Receiver = attr.ib(factory=backend.Receiver)

    # Devices
    _mcu: mcu.Receiver = attr.ib(factory=mcu.Receiver)
    _frontend: frontend.Receiver = attr.ib(factory=frontend.Receiver)
    _file: file.Receiver = attr.ib(factory=file.Receiver)
    _rotary_encoder: rotary_encoder.Receiver = attr.ib(
        factory=rotary_encoder.Receiver
    )
    _connections: connections.TimeoutHandler = \
        attr.ib(factory=connections.TimeoutHandler)
    _connection_states: mcu_pb.BackendConnections = \
        attr.ib(factory=mcu_pb.BackendConnections)

    def input(self, event: Optional[ReceiveEvent]) -> None:
        """Handle input events.

        Connection events are forwarded to the connection timeout handler
        only. Data events update the stored clocks and are then fanned out to
        every device receiver.
        """
        if event is None or not event.has_data():
            return

        if isinstance(event, MCUConnectionEvent):
            update_type = (
                connections.Update.MCU_CONNECTED if event.connected
                else connections.Update.MCU_DISCONNECTED
            )
            self._connections.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time, type=update_type
            ))
            return

        if isinstance(event, FrontendConnectionEvent):
            update_type = (
                connections.Update.FRONTEND_CONNECTED if event.connected
                else connections.Update.FRONTEND_DISCONNECTED
            )
            self._connections.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time, type=update_type
            ))
            return

        if event.wall_time is not None:
            self.wall_time = event.wall_time
        if event.monotonic_time is not None:
            self.monotonic_time = event.monotonic_time
            # Clock-only update so that the timeout handler sees time advance.
            self._connections.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time
            ))
        # Every device receiver is fed unconditionally; fields which were not
        # set on the event are passed through as None.
        self._mcu.input(event.serial_receive)
        self._frontend.input(event.websocket_receive)
        self._rotary_encoder.input(rotary_encoder.ReceiveEvent(
            wall_time=self.wall_time, re_data=event.rotary_encoder_receive
        ))
        self._file.input(event.file_receive)

    def output(self) -> Optional[ReceiveOutputEvent]:
        """Emit the next output event.

        Returns None only when no device produced anything and the resulting
        output event carries no data.
        """
        devices_processed = self._process_devices()
        if not devices_processed:
            # No device data: still hand the backend an event carrying only
            # the current clock values.
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time
            ))

        # Process backend output until the backend has data to output or it
        # indicates that it has no more receive data to process.
        output_event = ReceiveOutputEvent()
        while True:
            backend_send = self._backend.output()
            if backend_send is None or backend_send.has_data():
                break

        # The statement order below is kept exactly as in the original:
        # _process_connection_statuses() feeds new events into the backend
        # between the two reads of backend_send.
        if backend_send is not None:
            output_event.states_send = backend_send.states_send
        output_event.kill_frontend = self._process_connection_statuses()
        if backend_send is not None:
            output_event.sysclock_setting = backend_send.sysclock_setting

        if not (devices_processed or output_event.has_data()):
            return None

        return output_event

    def _process_mcu(self) -> bool:
        """Process the next event from the mcu protocol.

        Returns True if the MCU receiver produced an output.
        """
        mcu_output = self._mcu.output()
        if mcu_output is None:
            return False

        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            mcu_receive=mcu_output, frontend_receive=None
        ))
        self._connections.input(connections.UpdateEvent(
            monotonic_time=self.monotonic_time,
            type=connections.Update.MCU_RECEIVED
        ))
        if not self._connection_states.has_mcu:
            # First data since the MCU was considered lost: deactivate the
            # "connection down" log event and publish the new state.
            # NOTE(review): unlike _process_frontend, no "connection up" log
            # event is emitted here - confirm that this is intentional.
            self._logger.info('Receiving data from the MCU')
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=False,
                code=mcu_pb.LogEventCode.backend_mcu_connection_down
            ))
            self._connection_states.has_mcu = True
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
        return True

    def _process_frontend(self) -> bool:
        """Process the next event from the frontend protocol.

        Returns True if the frontend receiver produced an output.
        """
        frontend_output = self._frontend.output()
        if frontend_output is None:
            return False

        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            frontend_receive=frontend_output
        ))
        self._connections.input(connections.UpdateEvent(
            monotonic_time=self.monotonic_time,
            type=connections.Update.FRONTEND_RECEIVED
        ))
        if not self._connection_states.has_frontend:
            # First data since the frontend was considered lost: deactivate
            # the "connection down" log event, log a "connection up" event and
            # publish the new state.
            self._logger.info('Receiving data from the frontend')
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=False,
                code=mcu_pb.LogEventCode.backend_frontend_connection_down
            ))
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                code=mcu_pb.LogEventCode.backend_frontend_connection_up
            ))
            self._connection_states.has_frontend = True
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
        return True

    def _process_file(self) -> bool:
        """Process the next event from the file.

        Returns True if the file receiver produced an output.

        Raises:
            exceptions.ProtocolDataError: propagated from the file receiver.
        """
        file_output = self._file.output()  # throws ProtocolDataError
        if file_output is None:
            return False

        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            file_receive=file_output
        ))
        return True

    def _process_rotary_encoder(self) -> bool:
        """Process the next event from the rotary encoder.

        Returns True if the rotary encoder receiver produced an output.
        """
        rotary_encoder_output = self._rotary_encoder.output()
        if rotary_encoder_output is None:
            return False

        # NOTE(review): the rotary encoder output is delivered through the
        # backend's frontend_receive field - presumably it carries the same
        # kind of message as the frontend does; confirm against the backend.
        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            frontend_receive=rotary_encoder_output
        ))
        return True

    def _process_devices(self) -> bool:
        """Process the next event from every device.

        Every device is polled exactly once, whether or not an earlier one
        produced output. Returns True if at least one device produced output.
        """
        any_updated = False
        any_updated = self._process_mcu() or any_updated
        any_updated = self._process_frontend() or any_updated
        # TODO: why do we need an exception handler here?
        try:
            any_updated = self._process_file() or any_updated
        except exceptions.ProtocolDataError as err:
            self._logger.error(err)
        any_updated = self._process_rotary_encoder() or any_updated
        return any_updated

    def _process_connection_statuses(self) -> bool:
        """Handle any connection timeouts.

        Returns the timeout handler's kill_frontend flag.
        """
        actions = self._connections.output()
        if actions.alarm_mcu:
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=True,
                code=mcu_pb.LogEventCode.backend_mcu_connection_down
            ))
            self._connection_states.has_mcu = False
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
            # The backend should temporarily override any active alarm mute by
            # cancelling it in case local alarm sounds need to be audible in
            # the backend, e.g. for loss of the MCU. It will also propagate
            # this override to the frontend, if the frontend is connected; the
            # frontend will also independently cancel the alarm mute, so when
            # the backend overrides the frontend (or vice versa) the alarm
            # mute is still cancelled. AlarmMute will be overridden again by
            # the MCU when it reconnects, but the MCU will also cancel any
            # active alarm mute in the MCU, so the alarm mute is still
            # cancelled.
            self._backend.input(backend.AlarmMuteCancellationEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                source=mcu_pb.AlarmMuteSource.backend_mcu_loss
            ))
        if actions.alarm_frontend:
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=True,
                code=mcu_pb.LogEventCode.backend_frontend_connection_down
            ))
            self._connection_states.has_frontend = False
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
            # We don't need to request an alarm mute cancellation from the
            # MCU, because the MCU already cancels the alarm mute when the
            # backend tells it that the frontend has been lost. If the MCU is
            # also lost, that will be handled in the previous `if` statement.
        return actions.kill_frontend

    def input_serial(self, serial_receive: bytes) -> None:
        """Input a ReceiveDataEvent corresponding to serial data.

        This is just a convenience function intended for writing unit tests
        more concisely.
        """
        self.input(make_serial_receive(
            serial_receive, self.wall_time, self.monotonic_time
        ))

    def input_websocket(self, websocket: bytes) -> None:
        """Input a ReceiveDataEvent corresponding to websocket data.

        This is just a convenience function intended for writing unit tests
        more concisely.
        """
        self.input(make_websocket_receive(
            websocket, self.wall_time, self.monotonic_time
        ))

    @property
    def backend(self) -> backend.Receiver:
        """Return the backend receiver."""
        return self._backend

    @property
    def file(self) -> file.Receiver:
        """Return the file receiver"""
        return self._file


@attr.s
class Sender(protocols.Filter[SendEvent, SendOutputEvent]):
    """Filter which transforms high-level events into send bytes."""

    # Devices
    _mcu: mcu.Sender = attr.ib(factory=mcu.Sender)
    _frontend: frontend.Sender = attr.ib(factory=frontend.Sender)
    _file: file.Sender = attr.ib(factory=file.Sender)

    def input(self, event: Optional[SendEvent]) -> None:
        """Handle input events.

        The event is split per destination by the backend helpers and each
        part is handed to the matching device sender.
        """
        self._mcu.input(backend.get_mcu_send(event))
        self._frontend.input(backend.get_frontend_send(event))
        self._file.input(backend.get_file_send(event))

    def output(self) -> Optional[SendOutputEvent]:
        """Emit the next output event.

        Returns None when none of the device senders produced data.
        """
        send_event = SendOutputEvent(
            serial_send=self._mcu.output(),
            websocket_send=self._frontend.output(),
            file_send=self._file.output()
        )
        if not send_event.has_data():
            return None

        return send_event

    @property
    def file(self) -> file.Sender:
        """Return file sendfilter"""
        return self._file


# Protocols


@attr.s
class Protocol(protocols.Protocol[
        ReceiveEvent, ReceiveOutputEvent, SendEvent, SendOutputEvent
]):
    """Backend communication protocol."""

    _receive: Receiver = attr.ib(factory=Receiver)
    _send: Sender = attr.ib(factory=Sender)

    @property
    def receive(self) -> Receiver:
        """Return a Filter interface for receive events."""
        return self._receive

    @property
    def send(self) -> Sender:
        """Return a Filter interface for send events."""
        # NOTE(review): the listing is truncated at this point; the rest of
        # this property is not visible. Presumably it returns self._send -
        # confirm against the upstream file.
        ...
mock_time.py
Source:mock_time.py  
#!/usr/bin/env python
#
# Copyright 2015 British Broadcasting Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

# NOTE(review): imported only for its side effects and kept ahead of the
# dvbcss import, as in the original - presumably it makes an uninstalled
# dvbcss checkout importable; confirm.
import _useDvbCssUninstalled

import dvbcss.monotonic_time as monotonic_time
import threading


class MockTime(object):
    """\
    Object that provides mocks for :func:`monotonic_time.time` and
    :func:`dvbcss.monotonic_time.sleep`

    Use :func:`install` and :func:`uninstall` to plug the mock in.

    It works by replacing the time() and sleep() function in the
    :mod:`dvbcss.monotonic_time` module (or alternative module that you specify
    when calling :func:`install`).

    It therefore will mock the monotonic_time module in any module that has
    already imported it, or that imports it in the future, with only one
    call to :func:`install`.

    The mocked sleep() is called from other threads while the test thread
    moves the clock, so all access to the clock value and to the list of
    pending sleeps is serialised with an internal lock.
    """

    def __init__(self):
        super(MockTime, self).__init__()
        self._timeNow = 0
        self.doctoredTimeModule = None
        self.oldTimeFunc = None
        self.oldSleepFunc = None
        # List of (wakeUpTime, threading.Event), one per blocked sleep() call.
        self.sleepEvents = []
        # Auto-increment state; a step of 0 means auto-increment is disabled.
        self._incrStep = 0
        self._incrInterval = 1
        self._incrCountdown = 1
        # Guards _timeNow, sleepEvents and the auto-increment state.
        self._lock = threading.Lock()

    def _readTime(self):
        """\
        Wake every sleeper that is due, apply auto-increment and return the
        time as it was before any increment.

        The caller must hold ``self._lock``.
        """
        # Wake any pending sleeps that are due and drop them from the list.
        # The list object itself is kept (slice assignment) in case anything
        # else holds a reference to it.
        stillPending = []
        for wakeUpTime, event in self.sleepEvents:
            if self._timeNow >= wakeUpTime:
                event.set()
            else:
                stillPending.append((wakeUpTime, event))
        self.sleepEvents[:] = stillPending

        t = self._timeNow

        # do auto increment (if enabled)
        if self._incrStep != 0:
            self._incrCountdown -= 1
            if self._incrCountdown == 0:
                self._incrCountdown = self._incrInterval
                self._timeNow += self._incrStep

        return t

    @property
    def timeNow(self):
        """\
        Get/set this property to set the value that the mock time() function will return.

        If setting this property mean that the time has passed the point at which
        a call to the mocked :func:`sleep` function is due to wake (unblock)
        and return then this will take place.

        Every read (and every assignment) counts as one "read" for the
        purposes of auto-increment.
        """
        with self._lock:
            return self._readTime()

    @timeNow.setter
    def timeNow(self, newValue):
        with self._lock:
            self._timeNow = newValue
            # trigger reading the time to cause any sleeps that need to be
            # woken to be processed
            self._readTime()

    def enableAutoIncrementBy(self, step, numReadsBetweenIncrements=1):
        """\
        Make the mock time advance by itself as it is read.

        :param step: Amount added to the time at each increment.
        :param numReadsBetweenIncrements: Number of reads of the time between one increment and the next.
        """
        with self._lock:
            self._incrStep = step
            self._incrInterval = numReadsBetweenIncrements
            self._incrCountdown = self._incrInterval

    def disableAutoIncrement(self):
        """\
        Stop the mock time from advancing by itself.
        """
        with self._lock:
            self._incrStep = 0

    def flush(self):
        """\
        Flushes any calls blocked on the mock :func:`sleep` function by causing
        them to all wake (unblock) and return.
        """
        with self._lock:
            for _, event in self.sleepEvents:
                event.set()
            del self.sleepEvents[:]

    def install(self, module=monotonic_time):
        """\
        Plugs the mock in.

        :param module: The "time" module to have its `time()` and `sleep()` functions replaced. Defaults to the dvbcss.monotonic_time module.

        :raises Exception: if this mock is already plugged into something.
        :raises AttributeError: if the module lacks `time` or `sleep`; in that case nothing is replaced.
        """
        if self.doctoredTimeModule is not None:
            raise Exception("MockTime is already plugged in. Cannot be plugged in in two places.")

        # Look both functions up before replacing either, so that a module
        # which lacks one of them is not left half patched.
        oldTimeFunc = getattr(module, "time")
        oldSleepFunc = getattr(module, "sleep")

        self.doctoredTimeModule = module
        self.oldTimeFunc = oldTimeFunc
        self.oldSleepFunc = oldSleepFunc

        setattr(module, "time", self.mockTimeFunc)
        setattr(module, "sleep", self.mockSleepFunc)

    def mockTimeFunc(self):
        """\
        Replacement for the module's time() function. Returns the mock time.
        """
        return self.timeNow

    def mockSleepFunc(self, durationSecs):
        """\
        Replacement for the module's sleep() function.

        Blocks the calling thread until the mock time has been moved to (or
        past) the wake up time and the time is then read or set, or until
        :func:`flush` is called.
        """
        event = threading.Event()
        # Compute the wake up time and register the sleeper in one locked
        # step, so that a change of the clock cannot slip in between the two.
        with self._lock:
            wakeUpTime = self._readTime() + durationSecs
            self.sleepEvents.append((wakeUpTime, event))
        # Wait outside the lock, otherwise nothing could ever wake us.
        event.wait()

    def uninstall(self):
        """\
        Unplugs the mock.

        Also causes a :func:`flush` to happen.

        :raises Exception: if this mock is not currently plugged in.
        """
        if self.doctoredTimeModule is None:
            raise Exception("MockTime is not already plugged in. Cannot be unplugged.")

        self.flush()

        setattr(self.doctoredTimeModule, "time", self.oldTimeFunc)
        self.oldTimeFunc = None

        setattr(self.doctoredTimeModule, "sleep", self.oldSleepFunc)
        self.oldSleepFunc = None

        self.doctoredTimeModule = None


class SleepThread(threading.Thread):
    """\
    Daemon thread that makes a single call to monotonic_time.sleep() and
    then ends.
    """

    def __init__(self, sleepArg):
        super(SleepThread, self).__init__()
        self.sleepArg = sleepArg
        self.daemon = True

    def run(self):
        monotonic_time.sleep(self.sleepArg)


class Test_mock_time(unittest.TestCase):
    """\
    Tests for the MockTime class.

    Turtles all the way down.
    """

    def testInstallUninstall(self):
        """\
        Check if the mock appears to replace (when installing) and
        restore (when uninstalling) the monotonic_module time() and sleep() functions.
        """
        import dvbcss.monotonic_time as monotonic_time

        oldTime = monotonic_time.time
        oldSleep = monotonic_time.sleep

        mockUnderTest = MockTime()
        self.assertEqual(oldTime, monotonic_time.time)
        self.assertEqual(oldSleep, monotonic_time.sleep)

        mockUnderTest.install(monotonic_time)
        try:
            self.assertNotEqual(oldTime, monotonic_time.time)
            self.assertNotEqual(oldSleep, monotonic_time.sleep)
        finally:
            mockUnderTest.uninstall()

        self.assertEqual(oldTime, monotonic_time.time)
        self.assertEqual(oldSleep, monotonic_time.sleep)

    def testTime(self):
        """\
        Check that the mocked time() returns whatever timeNow was set to.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)
        try:
            mockUnderTest.timeNow = 1234
            self.assertEqual(monotonic_time.time(), 1234)

            mockUnderTest.timeNow = -485.2701
            self.assertEqual(monotonic_time.time(), -485.2701)
        finally:
            mockUnderTest.uninstall()

    def testSleep(self):
        """\
        Check that sleeping threads wake only once the mock time reaches
        their wake up time.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            a = SleepThread(5.0)
            b = SleepThread(2.0)
            c = SleepThread(7.0)

            mockUnderTest.timeNow = 1000.0
            a.start()  # will happen at t > 1005

            self.assertTrue(a.is_alive())

            mockUnderTest.timeNow = 1001.0
            b.start()   # will happen at t >= 1003
            c.start()   # will happen at t >= 1008

            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1002.99
            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1003.1
            b.join(1.0)
            self.assertTrue(a.is_alive())
            self.assertFalse(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1004.99
            self.assertTrue(a.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1005.7
            a.join(1.0)
            self.assertFalse(a.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1007.8
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1008.0
            c.join(1.0)
            self.assertFalse(c.is_alive())
        finally:
            mockUnderTest.uninstall()

    def testSleepFlush(self):
        """\
        Check that sleeps that have not triggered are unblocked and return when
        the flush method is called.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            a = SleepThread(5.0)
            b = SleepThread(2.0)
            c = SleepThread(7.0)

            mockUnderTest.timeNow = 1000.0
            a.start()  # will happen at t > 1005

            self.assertTrue(a.is_alive())

            mockUnderTest.timeNow = 1001.0
            b.start()   # will happen at t >= 1003
            c.start()   # will happen at t >= 1008

            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.flush()
            a.join(1.0)
            b.join(1.0)
            c.join(1.0)

            self.assertFalse(a.is_alive())
            self.assertFalse(b.is_alive())
            self.assertFalse(c.is_alive())
        finally:
            mockUnderTest.uninstall()

    def testSleepUninstallFlush(self):
        """\
        Check that sleeps that have not triggered are unblocked and return when
        the mock is uninstalled.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            a = SleepThread(5.0)
            b = SleepThread(2.0)
            c = SleepThread(7.0)

            mockUnderTest.timeNow = 1000.0
            a.start()  # will happen at t > 1005

            self.assertTrue(a.is_alive())

            mockUnderTest.timeNow = 1001.0
            b.start()   # will happen at t >= 1003
            c.start()   # will happen at t >= 1008

            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

        finally:
            mockUnderTest.uninstall()

        a.join(1.0)
        b.join(1.0)
        c.join(1.0)

        self.assertFalse(a.is_alive())
        self.assertFalse(b.is_alive())
        self.assertFalse(c.is_alive())

    def testIncrInterval(self):
        """\
        Mock time can be auto incremented
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            mockUnderTest.timeNow = 5

            # no auto increment by Default
            for i in range(0, 10000):
                self.assertEqual(5, monotonic_time.time())

            mockUnderTest.enableAutoIncrementBy(0.1, numReadsBetweenIncrements=5)
            for i in range(0, 100):
                for j in range(0, 5):
                    self.assertAlmostEqual(5 + i*0.1, monotonic_time.time(), delta=0.0001)

        finally:
            mockUnderTest.uninstall()


if __name__ == "__main__":
    #import sys;sys.argv = ['', 'Test.testName']
    # NOTE(review): the listing is truncated at this point; presumably the
    # original goes on to call unittest.main() - confirm against the upstream
    # file.
    ...
connections.py
Source:connections.py  
"""Sans-I/O backend connection loss handling protocol."""

import enum
from typing import Optional, Tuple
import logging

import attr

from ventserver.protocols import events
from ventserver.protocols.application import connections
from ventserver.sansio import protocols


# Events


@enum.unique
class Update(enum.Enum):
    """Kinds of connection update which can be fed to the timeout handler."""

    MCU_CONNECTED = enum.auto()
    MCU_DISCONNECTED = enum.auto()
    MCU_RECEIVED = enum.auto()
    FRONTEND_CONNECTED = enum.auto()
    FRONTEND_DISCONNECTED = enum.auto()
    FRONTEND_RECEIVED = enum.auto()
    CLOCK = enum.auto()


@attr.s
class UpdateEvent(events.Event):
    """Timestamped connection update; a plain clock tick unless typed."""

    monotonic_time: float = attr.ib()
    type: Update = attr.ib(default=Update.CLOCK)

    def has_data(self) -> bool:
        """Return True: an update event always counts as having data."""
        return True


@attr.s
class ActionsEvent(events.Event):
    """Flags for the actions which should be executed, all off by default."""

    alarm_mcu: bool = attr.ib(default=False)
    alarm_frontend: bool = attr.ib(default=False)
    kill_frontend: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return True: an actions event always counts as having data."""
        return True


@attr.s
class TimeoutHandler(protocols.Filter[UpdateEvent, ActionsEvent]):
    """Filter deciding when to alarm on, or kill, unresponsive connections.

    Inputs are UpdateEvents describing MCU/frontend connection changes,
    received data, or clock ticks; each output is an ActionsEvent saying
    whether to raise the MCU alarm, raise the frontend alarm, and/or kill
    the frontend.
    """

    _logger = logging.getLogger('.'.join((__name__, 'TimeoutHandler')))

    monotonic_time: float = attr.ib(default=0)
    _mcu_status: connections.TimeoutDetector = attr.ib()
    _mcu_alarm_trigger: connections.ActionTrigger = \
        attr.ib(factory=connections.ActionTrigger)
    _frontend_status: connections.TimeoutDetector = attr.ib()
    _frontend_kill_trigger: connections.ActionTrigger = attr.ib()
    _frontend_alarm_trigger: connections.ActionTrigger = \
        attr.ib(factory=connections.ActionTrigger)

    @_mcu_status.default
    def init_mcu_status(  # pylint: disable=no-self-use
            self
    ) -> connections.TimeoutDetector:
        """Build the MCU status tracker, with an event timeout of 0.5."""
        return connections.TimeoutDetector(event_timeout=0.5)

    @_frontend_status.default
    def init_frontend_status(  # pylint: disable=no-self-use
            self
    ) -> connections.TimeoutDetector:
        """Build the frontend status tracker, with an event timeout of 2."""
        return connections.TimeoutDetector(event_timeout=2)

    @_frontend_kill_trigger.default
    def init_frontend_kill_trigger(  # pylint: disable=no-self-use
            self
    ) -> connections.ActionTrigger:
        """Build the frontend kill trigger, with a repeat interval of 5."""
        return connections.ActionTrigger(repeat_interval=5)

    def input(self, event: Optional[UpdateEvent]) -> None:
        """Record the event's time, tick every clock, then apply the update.

        A None event is ignored. A CLOCK update only advances the clocks.
        """
        if event is None:
            return

        self.monotonic_time = event.monotonic_time
        self._update_clocks()

        update = event.type
        if update == Update.MCU_CONNECTED:
            self._logger.info('Received connection from the MCU')
            self._handle_mcu_connection(True)
            return
        if update == Update.MCU_DISCONNECTED:
            self._logger.warning('Lost connection from the MCU!')
            self._handle_mcu_connection(False)
            return
        if update == Update.MCU_RECEIVED:
            mcu_received = connections.UpdateEvent(
                monotonic_time=self.monotonic_time, event_received=True
            )
            self._mcu_status.input(mcu_received)
            return
        if update == Update.FRONTEND_CONNECTED:
            self._logger.info('Received connection from the frontend')
            self._handle_frontend_connection(True)
            return
        if update == Update.FRONTEND_DISCONNECTED:
            self._logger.warning('Lost connection from the frontend!')
            self._handle_frontend_connection(False)
            return
        if update == Update.FRONTEND_RECEIVED:
            frontend_received = connections.UpdateEvent(
                monotonic_time=self.monotonic_time, event_received=True
            )
            self._frontend_status.input(frontend_received)

    def output(self) -> ActionsEvent:
        """Evaluate both connections and report the actions to execute."""
        # The MCU is evaluated before the frontend.
        alarm_mcu = self._process_mcu()
        alarm_frontend, kill_frontend = self._process_frontend()
        return ActionsEvent(
            alarm_mcu=alarm_mcu,
            alarm_frontend=alarm_frontend,
            kill_frontend=kill_frontend
        )

    def _update_clocks(self) -> None:
        """Pass the current monotonic time to every detector and trigger."""
        detector_tick = connections.UpdateEvent(
            monotonic_time=self.monotonic_time
        )
        for detector in (self._mcu_status, self._frontend_status):
            detector.input(detector_tick)

        trigger_tick = connections.ActionStatus(
            monotonic_time=self.monotonic_time
        )
        for trigger in (
                self._mcu_alarm_trigger,
                self._frontend_alarm_trigger,
                self._frontend_kill_trigger
        ):
            trigger.input(trigger_tick)

    def _process_mcu(self) -> bool:
        """Return whether the MCU alarm should be raised now."""
        status = self._mcu_status.output()
        if status is not None:
            # Arm or disarm the alarm trigger from the timeout status.
            self._mcu_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time, trigger=status.timed_out
            ))

        should_alarm = self._mcu_alarm_trigger.output()
        if not should_alarm:
            return should_alarm

        self._logger.warning('No longer receiving data from the MCU!')
        # Tell the trigger that its action is being executed.
        self._mcu_alarm_trigger.input(connections.ActionStatus(
            monotonic_time=self.monotonic_time, execute=True
        ))
        return should_alarm

    def _process_frontend(self) -> Tuple[bool, bool]:
        """Return (alarm_frontend, kill_frontend) for the frontend link."""
        status = self._frontend_status.output()
        if status is not None:
            now = self.monotonic_time
            self._frontend_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=now, trigger=status.timed_out
            ))
            # The kill trigger is armed only if the connection has timed
            # out and its reported uptime exceeds 2.
            should_kill = status.timed_out and status.uptime > 2
            self._frontend_kill_trigger.input(connections.ActionStatus(
                monotonic_time=now, trigger=should_kill
            ))

        raise_alarm = self._frontend_alarm_trigger.output()
        if raise_alarm:
            self._logger.warning('No longer receiving data from the frontend!')
            self._frontend_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time, execute=True
            ))

        do_kill = self._frontend_kill_trigger.output()
        if do_kill:
            self._frontend_kill_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time, execute=True
            ))

        return (raise_alarm, do_kill)

    def _handle_mcu_connection(self, connected: bool) -> None:
        """Forward an MCU connect/disconnect to the MCU status tracker."""
        change = connections.UpdateEvent(
            connected=connected, monotonic_time=self.monotonic_time
        )
        self._mcu_status.input(change)

    def _handle_frontend_connection(self, connected: bool) -> None:
        """Forward a frontend connect/disconnect to its status tracker."""
        if connected:
            # Disarm the kill trigger so that a freshly connected frontend
            # gets some time to start producing events before any further
            # kill attempts.
            self._frontend_kill_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time, trigger=False
            ))
        # NOTE(review): the listing this was recovered from is cut off inside
        # the call below; only its closing parentheses were restored.
        change = connections.UpdateEvent(
            connected=connected, monotonic_time=self.monotonic_time
        )
        self._frontend_status.input(change)
Source:alarm_mute.py  
1"""alarm muting request and response"""2import logging3import random4import typing5from typing import Mapping, Optional6import attr7import betterproto8from ventserver.protocols.backend import alarms, log, states9from ventserver.protocols.protobuf import mcu_pb10@attr.s11class Service:12    """Implement Alarm Mute Service"""13    MUTE_MAX_DURATION = 120000  # ms14    _logger = logging.getLogger('.'.join((__name__, 'Service')))15    active: bool = attr.ib(default=False)16    mute_start_time: Optional[float] = attr.ib(default=None)17    seq_num: int = attr.ib(default=random.getrandbits(32))18    source: mcu_pb.AlarmMuteSource = \19        attr.ib(default=mcu_pb.AlarmMuteSource.initialization)20    # TODO: also allow canceling alarm mute upon loss of connection, maybe with21    # an AlarmMute client (rather than a service)22    def transform(23            self, monotonic_time: float,24            store: Mapping[states.StateSegment, Optional[betterproto.Message]],25            events_log: alarms.Manager26    ) -> None:27        """Update the parameters for alarm mute service"""28        alarm_mute_request = typing.cast(29            mcu_pb.AlarmMuteRequest,30            store[states.StateSegment.ALARM_MUTE_REQUEST]31        )32        alarm_mute = typing.cast(33            mcu_pb.AlarmMute, store[states.StateSegment.ALARM_MUTE]34        )35        backend_connections = typing.cast(36            mcu_pb.BackendConnections,37            store[states.StateSegment.BACKEND_CONNECTIONS]38        )39        self.transform_mute(40            monotonic_time, alarm_mute_request, alarm_mute, events_log41        )42        if (43                backend_connections is not None and44                not backend_connections.has_frontend45        ):46            self.transform_mute_internal(47                monotonic_time, False,48                mcu_pb.AlarmMuteSource.backend_frontend_loss,49                alarm_mute, events_log50            )51    def transform_mute(52            
self, monotonic_time: float,53            request: mcu_pb.AlarmMuteRequest, response: mcu_pb.AlarmMute,54            events_log: alarms.Manager55    ) -> None:56        """Implement alarm muting."""57        if request.seq_num == self.seq_num + 1:58            self._update_internal_state(59                request.active, monotonic_time, request.source,60                events_log61            )62        self._update_response(monotonic_time, response, events_log)63    def transform_mute_internal(64            self, monotonic_time: float,65            mute: bool, source: mcu_pb.AlarmMuteSource,66            response: mcu_pb.AlarmMute, events_log: alarms.Manager67    ) -> None:68        """Implement alarm muting."""69        self._update_internal_state(70            mute, monotonic_time, source, events_log71        )72        self._update_response(monotonic_time, response, events_log)73    def _update_internal_state(74            self, active: bool, monotonic_time: float,75            source: mcu_pb.AlarmMuteSource, events_log: alarms.Manager76    ) -> None:77        """Update internal state."""78        if self.active == active:79            return80        self.seq_num += 181        self.active = active82        self.source = source83        if active:84            self.mute_start_time = monotonic_time * 100085            if source == mcu_pb.AlarmMuteSource.user_software:86                log_event_code = mcu_pb.LogEventCode.alarms_muted_user_software87            else:88                self._logger.error(89                    'Unexpected alarm mute activation source %s', source90                )91                log_event_code = mcu_pb.LogEventCode.alarms_muted_unknown92        else:93            self.mute_start_time = None94            if source == mcu_pb.AlarmMuteSource.initialization:95                log_event_code = \96                    mcu_pb.LogEventCode.alarms_unmuted_initialization97            elif source == mcu_pb.AlarmMuteSource.user_software:98      
          log_event_code = \99                    mcu_pb.LogEventCode.alarms_unmuted_user_software100            elif source == mcu_pb.AlarmMuteSource.timeout:101                log_event_code = \102                    mcu_pb.LogEventCode.alarms_unmuted_timeout103            elif source == mcu_pb.AlarmMuteSource.backend_frontend_loss:104                log_event_code = \105                    mcu_pb.LogEventCode.alarms_unmuted_backend_frontend_loss106            elif source == mcu_pb.AlarmMuteSource.frontend_backend_loss:107                log_event_code = \108                    mcu_pb.LogEventCode.alarms_unmuted_frontend_backend_loss109            else:110                self._logger.error(111                    'Unexpected alarm mute cancellation source %s', source112                )113                log_event_code = mcu_pb.LogEventCode.alarms_unmuted_unknown114        events_log.input(log.LocalLogInputEvent(new_event=mcu_pb.LogEvent(115            code=log_event_code, type=mcu_pb.LogEventType.system116        )))117    def _update_response(118            self, monotonic_time: float, response: mcu_pb.AlarmMute,119            events_log: alarms.Manager120    ) -> None:121        """Update response based on internal state."""122        response.active = self.active123        response.seq_num = self.seq_num124        response.source = self.source125        self._update_remaining(monotonic_time, response)126        if response.remaining > 0:127            return128        self._update_internal_state(129            False, monotonic_time, mcu_pb.AlarmMuteSource.timeout, events_log130        )131        response.active = self.active132        response.seq_num = self.seq_num133        response.source = self.source134        self._update_remaining(monotonic_time, response)135    def _update_remaining(136            self, monotonic_time: float, response: mcu_pb.AlarmMute137    ) -> None:138        """Update remaining field of response based on internal state."""139      
  if not self.active:140            response.remaining = self.MUTE_MAX_DURATION141            return142        if self.mute_start_time is None:143            self.mute_start_time = monotonic_time * 1000144        mute_duration = monotonic_time * 1000 - self.mute_start_time145        remaining = self.MUTE_MAX_DURATION - int(mute_duration)...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
