Best Python code snippet using molotov_python
protocols_test.py
Source:protocols_test.py  
...17        request_packet_mock.is_wrq = lambda: True18        request_packet_mock.is_rrq = lambda: False19        klass = self.protocol.select_protocol(request_packet_mock)20        self.assertTrue(klass == WRQProtocol)21    def test_datagram_received(self):22        data = b'\x00\x01TEST\x00binary\x00'23        mock_loop = MagicMock()24        mock_loop.create_datagram_endpoint.return_value = ('endpoint',)25        mock_loop.create_task.return_value = True26        proto = TFTPServerProtocol('127.0.0.1', mock_loop, {})27        proto.datagram_received(data, ('127.0.0.1', 0,))28        self.assertTrue(mock_loop.create_datagram_endpoint.called)29        mock_loop.create_task.assert_called_with(('endpoint',))30class TestWRQProtocol(t.TestCase):31    @classmethod32    def setUpClass(cls):33        cls.packet_factory = TFTPPacketFactory()34    def setUp(self):35        self.addr = ('127.0.0.1', 9999,)36        self.wrq = WRQ + b'\x00filename1\x00octet\x00'37        self.proto = WRQProtocol(self.wrq, MagicMock, self.addr, {})38        self.proto.set_proto_attributes()39        self.proto.h_timeout = MagicMock()40        self.proto.file_handler = MagicMock()41        self.proto.counter = 1042        self.proto.transport = MagicMock()43    def test_reply_after_write(self):44        data = DAT + b'\x00\x0BAAAA'45        self.proto.datagram_received(data, self.addr)46        self.assertEqual(self.proto.counter, 11)47        self.proto.transport.sendto.assert_called_with(ACK + b'\x00\x0b',48                                                       self.addr)49    def test_close_transport_after_file_finished(self):50        data = DAT + b'\x00\x0BAAAA'51        self.proto.datagram_received(data, self.addr)52        self.assertTrue(self.proto.transport.close.called)53    def test_correct_packet_received_and_saved(self):54        data = DAT + b'\x00\x0BAAAA'55        self.proto.datagram_received(data, self.addr)56        self.proto.file_handler.write_chunk.assert_called_with(b'AAAA')57    def test_bad_tid(self):58        data = DAT + b'\x00\x0BAAAA'59        addr = ('127.0.0.1', 8888,)60        self.proto.datagram_received(data, addr)61        err_tid = self.packet_factory.err_unknown_tid()62        self.proto.transport.sendto.assert_called_with(err_tid.to_bytes(),63                                                       addr)64    def test_bad_packet(self):65        data = ACK + b'\x00\x0C'66        self.proto.datagram_received(data, self.addr)67        self.assertFalse(self.proto.transport.sendto.called)68    def test_bad_packet_sequence_is_ignored(self):69        data = DAT + b'\x00\x0CAAAA'70        self.proto.datagram_received(data, self.addr)71        self.assertFalse(self.proto.transport.sendto.called)72    def test_roll_over(self):73        self.proto.counter = 6553574        dat1 = DAT + b'\xff\xffAAAA'75        dat2 = DAT + b'\x00\x00AAAA'76        self.proto.datagram_received(dat1, self.addr)77        self.assertEqual(self.proto.counter, 65535)78        self.proto.datagram_received(dat2, self.addr)79        self.assertEqual(self.proto.counter, 0)80        self.proto.transport.sendto.assert_called_with(ACK + b'\x00\x00',81                                                       self.addr)82class TestRRQProtocol(t.TestCase):83    @classmethod84    def setUpClass(cls):85        cls.packet_factory = TFTPPacketFactory()86    def setUp(self):87        self.addr = ('127.0.0.1', 9999,)88        self.rrq = RRQ + b'\x00filename1\x00octet\x00'89        self.proto = RRQProtocol(self.rrq, MagicMock, self.addr, {})90        self.proto.set_proto_attributes()91        self.proto.h_timeout = MagicMock()92        self.proto.file_handler = MagicMock()93        self.proto.file_handler.read_chunk = MagicMock(return_value=b'AAAA')94        self.proto.handle_err_pkt = MagicMock()95        self.proto.counter = 1096        self.proto.transport = MagicMock()97    def test_get_next_chunk_of_data(self):98        rsp = self.proto.next_datagram()99        self.assertEqual(rsp.to_bytes(), DAT + b'\x00\x0aAAAA')100    def test_get_sequence_of_chunks(self):101        self.proto.file_handler.finished = False102        ack1 = ACK + b'\x00\x0a'103        ack2 = ACK + b'\x00\x0b'104        dat1 = DAT + b'\x00\x0bAAAA'105        dat2 = DAT + b'\x00\x0cAAAA'106        self.proto.datagram_received(ack1, self.addr)107        self.proto.datagram_received(ack2, self.addr)108        calls = [call(dat1, self.addr), call(dat2, self.addr)]109        self.proto.transport.sendto.assert_has_calls(calls)110    def test_get_next_window_of_data(self):111        self.proto.file_handler.finished = False112        self.proto.opts[b'windowsize'] = 2113        self.proto.packets = [None] * 2114        ack1 = ACK + b'\x00\x0a'115        dat1 = DAT + b'\x00\x0bAAAA'116        dat2 = DAT + b'\x00\x0cAAAA'117        self.proto.datagram_received(ack1, self.addr)118        calls = [call(dat1, self.addr), call(dat2, self.addr)]119        self.proto.transport.sendto.assert_has_calls(calls)120    def test_get_sequence_of_windows(self):121        self.proto.file_handler.finished = False122        self.proto.opts[b'windowsize'] = 2123        self.proto.packets = [None] * 2124        ack1 = ACK + b'\x00\x0a'125        ack2 = ACK + b'\x00\x0c'126        dat1 = DAT + b'\x00\x0bAAAA'127        dat2 = DAT + b'\x00\x0cAAAA'128        dat3 = DAT + b'\x00\x0dAAAA'129        dat4 = DAT + b'\x00\x0eAAAA'130        self.proto.datagram_received(ack1, self.addr)131        self.proto.datagram_received(ack2, self.addr)132        calls = [call(dat1, self.addr), call(dat2, self.addr),133                 call(dat3, self.addr), call(dat4, self.addr)]134        self.proto.transport.sendto.assert_has_calls(calls)135    def test_send_last_packet(self):136        self.proto.file_handler.read_chunk = MagicMock(return_value=b'AA')137        self.proto.file_handler.finished = False138        ack1 = ACK + b'\x00\x0a'139        ack2 = ACK + b'\x00\x0b'140        self.proto.datagram_received(ack1, self.addr)141        self.proto.transport.sendto.assert_called_with(DAT + b'\x00\x0bAA',142                                                       self.addr)143        self.proto.file_handler.finished = True144        self.proto.datagram_received(ack2, self.addr)145        self.assertTrue(self.proto.transport.close.called)146    def test_bad_packet(self):147        bad_msg = DAT + b'\x00\x0aAAAA'148        self.proto.datagram_received(bad_msg, self.addr)149        self.assertFalse(self.proto.transport.sendto.called)150    def test_bad_tid(self):151        # this should get moved to base tests, same for wrq152        addr = ('127.0.0.1', 8888,)153        ack1 = ACK + b'\x00\x0a'154        self.proto.datagram_received(ack1, addr)155        err_tid = self.packet_factory.err_unknown_tid()156        self.proto.transport.sendto.assert_called_with(err_tid.to_bytes(),157                                                       addr)158    def test_bad_packet_sequence_is_ignored(self):159        ack1 = ACK + b'\x00\x0b'160        self.proto.datagram_received(ack1, self.addr)161        self.assertFalse(self.proto.transport.sendto.called)162        self.assertFalse(self.proto.file_handler.send.called)163    def test_bad_packet_sequence_starts_new_window(self):164        self.proto.file_handler.finished = False165        self.proto.opts[b'windowsize'] = 2166        self.proto.packets = [None] * 2167        ack1 = ACK + b'\x00\x0a'168        ack2 = ACK + b'\x00\x0b'  # ACK of block_no within window169        dat1 = DAT + b'\x00\x0bAAAA'170        dat2 = DAT + b'\x00\x0cAAAA'171        dat3 = DAT + b'\x00\x0dAAAA'172        # After ACK of block_no \x0a, block_no \x0b and \x0c are sent173        self.proto.datagram_received(ack1, self.addr)174        # After ACK of block_no \x0b, block_no \x0c and \x0d are sent175        self.proto.datagram_received(ack2, self.addr)176        calls = [call(dat1, self.addr), call(dat2, self.addr),177                 call(dat2, self.addr), call(dat3, self.addr)]178        self.proto.transport.sendto.assert_has_calls(calls)179    def test_roll_over(self):180        self.proto.file_handler.finished = False181        self.proto.counter = (2 ** 16) - 1182        ack1 = ACK + b'\xff\xff'183        ack2 = ACK + b'\x00\x00'184        dat1 = DAT + b'\x00\x00AAAA'185        dat2 = DAT + b'\x00\x01AAAA'186        self.proto.datagram_received(ack1, self.addr)187        self.proto.datagram_received(ack2, self.addr)188        calls = [call(dat1, self.addr), call(dat2, self.addr)]189        self.proto.transport.sendto.assert_has_calls(calls)190    def test_err_received(self):191        err = ERR + b'\x00TFTP Aborted.\x00'192        self.proto.datagram_received(err, self.addr)193        self.assertTrue(self.proto.handle_err_pkt.called)194        self.assertFalse(self.proto.transport.sendto.called)195    def test_err_received_windowsize(self):196        err = ERR + b'\x00TFTP Aborted.\x00'197        self.proto.opts[b'windowsize'] = 2198        self.proto.datagram_received(err, self.addr)199        self.assertTrue(self.proto.handle_err_pkt.called)...test_bpup.py
Source:test_bpup.py  
...46    bpup_protocol.connection_made(transport)47    assert transport.sendto.mock_calls == [call(b"\n")]48    transport.sendto.reset_mock()49    assert bpup_subscriptions.alive is False50    bpup_protocol.datagram_received(51        b'{"B":"KNKSADE42149","d":0,"v":"v2.29.2-beta"}\n', MOCK_ADDR52    )53    assert bpup_subscriptions.alive is True54    mock_time_changed(loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=60))55    assert transport.sendto.mock_calls == [call(b"\n")]56    transport.sendto.reset_mock()57    mock_protocol_connection_lost(bpup_protocol, None)58    assert "BPUP connection lost" not in caplog.text59    assert bpup_subscriptions.alive is False60    mock_time_changed(61        loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=120)62    )63    assert transport.sendto.mock_calls == []64@pytest.mark.asyncio65async def test_protocol_keep_connection_lost_with_error(transport, caplog):66    bpup_subscriptions = BPUPSubscriptions()67    loop = asyncio.get_event_loop()68    bpup_protocol = BPUProtocol(bpup_subscriptions)69    bpup_protocol.connection_made(transport)70    assert transport.sendto.mock_calls == [call(b"\n")]71    transport.sendto.reset_mock()72    assert bpup_subscriptions.alive is False73    bpup_protocol.datagram_received(74        b'{"B":"KNKSADE42149","d":0,"v":"v2.29.2-beta"}\n', MOCK_ADDR75    )76    mock_time_changed(loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=60))77    assert transport.sendto.mock_calls == [call(b"\n")]78    transport.sendto.reset_mock()79    assert bpup_subscriptions.alive is True80    mock_protocol_connection_lost(bpup_protocol, OSError())81    assert "BPUP connection lost" in caplog.text82    assert bpup_subscriptions.alive is False83    assert transport.sendto.mock_calls == []84    mock_time_changed(85        loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=120)86    )87    assert transport.sendto.mock_calls == []88@pytest.mark.asyncio89async def test_protocol_subscriptions(transport, caplog):90    bpup_subscriptions = BPUPSubscriptions()91    bpup_protocol = BPUProtocol(bpup_subscriptions)92    last_msg = None93    def _on_new_message(msg):94        nonlocal last_msg95        last_msg = msg96    bpup_subscriptions.subscribe("1", _on_new_message)97    bpup_protocol.connection_made(transport)98    # Make sure we can do it again99    bpup_protocol.connection_made(transport)100    bpup_protocol.datagram_received(101        b'{"B":"KNKSADE42149","d":0,"v":"v2.29.2-beta"}\n', MOCK_ADDR102    )103    bpup_protocol.datagram_received(104        b'{"t":"devices/1/state","s":200,"b":{"power":1,"speed":1,"timer":0,"breeze":[0,50,50],"_":"690b6aff"}}\n',105        MOCK_ADDR,106    )107    assert last_msg == {108        "_": "690b6aff",109        "breeze": [0, 50, 50],110        "power": 1,111        "speed": 1,112        "timer": 0,113    }114    bpup_protocol.datagram_received(115        b'{"t":"devices/1/state","s":200,"b":{"power":1,"speed":1,"timer":0,"breeze":[0,50,50],"_":"690b6aff"}}\n',116        MOCK_ADDR,117    )118    # 500 error should not trigger a new message119    assert last_msg == {120        "_": "690b6aff",121        "breeze": [0, 50, 50],122        "power": 1,123        "speed": 1,124        "timer": 0,125    }126    last_msg = {}127    bpup_subscriptions.unsubscribe("1", _on_new_message)128    bpup_protocol.datagram_received(129        b'{"t":"devices/1/state","s":200,"b":{"power":1,"speed":1,"timer":0,"breeze":[0,50,50],"_":"690b6aff"}}\n',130        MOCK_ADDR,131    )132    assert last_msg == {}133    bpup_protocol.datagram_received(134        b'{"B":"KVPRBDGXXXXX","_error_id":633,"_error_msg":"BPUP client timeout"}',135        MOCK_ADDR,136    )137    assert last_msg == {}138    bpup_protocol.datagram_received(139        b"GIGO",140        MOCK_ADDR,141    )142    assert "Failed to process BPUP message" in caplog.text143    assert "GIGO" in caplog.text144@pytest.mark.asyncio145async def test_protocol_errors(transport, caplog):146    bpup_subscriptions = BPUPSubscriptions()147    bpup_protocol = BPUProtocol(bpup_subscriptions)148    bpup_protocol.connection_made(transport)149    bpup_protocol.error_received(OSError())150    assert "BPUP error" in caplog.text151@pytest.mark.asyncio152async def test_start_bpup(transport):...test_statsd.py
Source:test_statsd.py  
...46    def _test_gauge_or_ms(self, metric_type, utcnow):47        metric_name = "test_gauge_or_ms"48        metric_key = metric_name + "|" + metric_type49        utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36)50        self.server.datagram_received(51            ("%s:1|%s" % (metric_name, metric_type)).encode('ascii'),52            ("127.0.0.1", 12345))53        self.stats.flush()54        r = self.stats.indexer.get_resource('generic',55                                            self.conf.statsd.resource_id,56                                            with_metrics=True)57        metric = r.get_metric(metric_key)58        self.stats.storage.process_background_tasks(59            self.stats.indexer, sync=True)60        measures = self.stats.storage.get_measures(metric)61        self.assertEqual([62            (utils.datetime_utc(2015, 1, 7), 86400.0, 1.0),63            (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.0),64            (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)65        ], measures)66        utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37)67        # This one is going to be ignored68        self.server.datagram_received(69            ("%s:45|%s" % (metric_name, metric_type)).encode('ascii'),70            ("127.0.0.1", 12345))71        self.server.datagram_received(72            ("%s:2|%s" % (metric_name, metric_type)).encode('ascii'),73            ("127.0.0.1", 12345))74        self.stats.flush()75        self.stats.storage.process_background_tasks(76            self.stats.indexer, sync=True)77        measures = self.stats.storage.get_measures(metric)78        self.assertEqual([79            (utils.datetime_utc(2015, 1, 7), 86400.0, 1.5),80            (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.5),81            (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0),82            (utils.datetime_utc(2015, 1, 7, 13, 59), 60.0, 2.0)83        ], measures)84    def test_gauge(self):85        self._test_gauge_or_ms("g")86    def test_ms(self):87        self._test_gauge_or_ms("ms")88    @mock.patch.object(timeutils, 'utcnow')89    def test_counter(self, utcnow):90        metric_name = "test_counter"91        metric_key = metric_name + "|c"92        utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36)93        self.server.datagram_received(94            ("%s:1|c" % metric_name).encode('ascii'),95            ("127.0.0.1", 12345))96        self.stats.flush()97        r = self.stats.indexer.get_resource('generic',98                                            self.conf.statsd.resource_id,99                                            with_metrics=True)100        metric = r.get_metric(metric_key)101        self.assertIsNotNone(metric)102        self.stats.storage.process_background_tasks(103            self.stats.indexer, sync=True)104        measures = self.stats.storage.get_measures(metric)105        self.assertEqual([106            (utils.datetime_utc(2015, 1, 7), 86400.0, 1.0),107            (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.0),108            (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)], measures)109        utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37)110        self.server.datagram_received(111            ("%s:45|c" % metric_name).encode('ascii'),112            ("127.0.0.1", 12345))113        self.server.datagram_received(114            ("%s:2|c|@0.2" % metric_name).encode('ascii'),115            ("127.0.0.1", 12345))116        self.stats.flush()117        self.stats.storage.process_background_tasks(118            self.stats.indexer, sync=True)119        measures = self.stats.storage.get_measures(metric)120        self.assertEqual([121            (utils.datetime_utc(2015, 1, 7), 86400.0, 28),122            (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 28),123            (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0),124            (utils.datetime_utc(2015, 1, 7, 13, 59), 60.0, 55.0)], measures)125class TestStatsdArchivePolicyRule(TestStatsd):126    STATSD_ARCHIVE_POLICY_NAME = ""127    def setUp(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
