How to use the stop_async method in Playwright Python

Best Python code snippet using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

__init__.py

Source: __init__.py Github

copy
1"""Support for Nest devices."""
2
3import logging
4
5from google_nest_sdm.event import EventMessage
6from google_nest_sdm.exceptions import (
7    AuthException,
8    ConfigurationException,
9    GoogleNestException,
10)
11from google_nest_sdm.google_nest_subscriber import GoogleNestSubscriber
12import voluptuous as vol
13
14from homeassistant.config_entries import ConfigEntry
15from homeassistant.const import (
16    CONF_BINARY_SENSORS,
17    CONF_CLIENT_ID,
18    CONF_CLIENT_SECRET,
19    CONF_MONITORED_CONDITIONS,
20    CONF_SENSORS,
21    CONF_STRUCTURE,
22)
23from homeassistant.core import HomeAssistant
24from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
25from homeassistant.helpers import (
26    aiohttp_client,
27    config_entry_oauth2_flow,
28    config_validation as cv,
29)
30from homeassistant.helpers.typing import ConfigType
31
32from . import api, config_flow
33from .const import DATA_SDM, DATA_SUBSCRIBER, DOMAIN, OAUTH2_AUTHORIZE, OAUTH2_TOKEN
34from .events import EVENT_NAME_MAP, NEST_EVENT
35from .legacy import async_setup_legacy, async_setup_legacy_entry
36
# Module-level logger shared by the setup and event-handling code in this file.
_LOGGER = logging.getLogger(__name__)

# YAML keys that select the SDM (new) API; without a project id the legacy
# setup path is used instead.
CONF_PROJECT_ID = "project_id"
CONF_SUBSCRIBER_ID = "subscriber_id"
# Key in hass.data[DOMAIN] holding the YAML config for later config-entry setup.
DATA_NEST_CONFIG = "nest_config"
# Key in hass.data[DOMAIN] set once a subscriber error has been logged, so the
# same failure is not logged again on every setup retry.
DATA_NEST_UNAVAILABLE = "nest_unavailable"

# NOTE(review): not referenced in the visible part of this file — presumably
# used by the legacy flow; confirm before removing.
NEST_SETUP_NOTIFICATION = "nest_setup"

# Schema shared by the `sensors` and `binary_sensors` sections.
SENSOR_SCHEMA = vol.Schema(
    {vol.Optional(CONF_MONITORED_CONDITIONS): vol.All(cv.ensure_list)}
)

CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_CLIENT_ID): cv.string,
                vol.Required(CONF_CLIENT_SECRET): cv.string,
                # Required to use the new API (optional for compatibility)
                vol.Optional(CONF_PROJECT_ID): cv.string,
                vol.Optional(CONF_SUBSCRIBER_ID): cv.string,
                # Config that only currently works on the old API
                vol.Optional(CONF_STRUCTURE): vol.All(cv.ensure_list, [cv.string]),
                vol.Optional(CONF_SENSORS): SENSOR_SCHEMA,
                vol.Optional(CONF_BINARY_SENSORS): SENSOR_SCHEMA,
            }
        )
    },
    # Keys belonging to other integrations are left untouched.
    extra=vol.ALLOW_EXTRA,
)

# Platforms for SDM API
PLATFORMS = ["sensor", "camera", "climate"]
71
72
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up Nest components with dispatch between old/new flows.

    Resets ``hass.data[DOMAIN]`` and then, depending on the YAML config:

    * no Nest section: nothing else to do;
    * no ``project_id``: hand over to the legacy setup;
    * ``project_id`` without ``subscriber_id``: log an error and fail;
    * otherwise: keep the config for ``async_setup_entry`` and register the
      SDM OAuth2 implementation with the config flow.

    Returns:
        True when setup succeeded (or there was nothing to set up), False when
        the SDM configuration is incomplete; the legacy path returns whatever
        ``async_setup_legacy`` returns.
    """
    hass.data[DOMAIN] = {}

    if DOMAIN not in config:
        return True

    nest_config = config[DOMAIN]

    if CONF_PROJECT_ID not in nest_config:
        return await async_setup_legacy(hass, config)

    if CONF_SUBSCRIBER_ID not in nest_config:
        # The original message was a plain string with an f-string style
        # placeholder, so the literal "{CONF_SUBSCRIBER_ID}" was logged.
        _LOGGER.error("Configuration option '%s' required", CONF_SUBSCRIBER_ID)
        return False

    # For setup of ConfigEntry below
    hass.data[DOMAIN][DATA_NEST_CONFIG] = nest_config
    project_id = nest_config[CONF_PROJECT_ID]
    config_flow.NestFlowHandler.register_sdm_api(hass)
    config_flow.NestFlowHandler.async_register_implementation(
        hass,
        config_entry_oauth2_flow.LocalOAuth2Implementation(
            hass,
            DOMAIN,
            nest_config[CONF_CLIENT_ID],
            nest_config[CONF_CLIENT_SECRET],
            # The authorize URL is a template parameterised by project id.
            OAUTH2_AUTHORIZE.format(project_id=project_id),
            OAUTH2_TOKEN,
        ),
    )

    return True
104
105
class SignalUpdateCallback:
    """Translate subscriber event messages into Home Assistant bus events."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Keep the Home Assistant instance used for lookups and event firing."""
        self._hass = hass

    async def async_handle_event(self, event_message: EventMessage) -> None:
        """Fire one NEST_EVENT per recognised event in ``event_message``.

        Messages without a resource name, without events, or for a device that
        is not in the device registry are ignored. Event names missing from
        ``EVENT_NAME_MAP`` are skipped.
        """
        device_id = event_message.resource_update_name
        if not device_id:
            return

        events = event_message.resource_update_events
        if not events:
            return

        _LOGGER.debug("Event Update %s", events.keys())

        registry = await self._hass.helpers.device_registry.async_get_registry()
        device_entry = registry.async_get_device({(DOMAIN, device_id)})
        if not device_entry:
            return

        for event_name in events:
            event_type = EVENT_NAME_MAP.get(event_name)
            if event_type:
                self._hass.bus.async_fire(
                    NEST_EVENT,
                    {
                        "device_id": device_entry.id,
                        "type": event_type,
                        "timestamp": event_message.timestamp,
                    },
                )
136
137
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Nest from a config entry with dispatch between old/new flows.

    Entries without SDM data are delegated to the legacy setup. Otherwise a
    subscriber is created from the stored YAML config, started, and its device
    manager fetched before the platforms are set up.

    Returns:
        True on success, False on a configuration error.

    Raises:
        ConfigEntryAuthFailed: the subscriber reported an authentication error.
        ConfigEntryNotReady: starting the subscriber or fetching the device
            manager failed with another ``GoogleNestException``.
    """
    if DATA_SDM not in entry.data:
        return await async_setup_legacy_entry(hass, entry)

    implementation = (
        await config_entry_oauth2_flow.async_get_config_entry_implementation(
            hass, entry
        )
    )
    config = hass.data[DOMAIN][DATA_NEST_CONFIG]
    oauth_session = config_entry_oauth2_flow.OAuth2Session(
        hass, entry, implementation
    )
    auth = api.AsyncConfigEntryAuth(
        aiohttp_client.async_get_clientsession(hass),
        oauth_session,
        config[CONF_CLIENT_ID],
        config[CONF_CLIENT_SECRET],
    )
    subscriber = GoogleNestSubscriber(
        auth, config[CONF_PROJECT_ID], config[CONF_SUBSCRIBER_ID]
    )
    subscriber.set_update_callback(SignalUpdateCallback(hass).async_handle_event)

    def _flag_unavailable_and_stop(log_format: str, err: Exception) -> None:
        """Log the failure once, mark Nest unavailable, and stop the subscriber."""
        if DATA_NEST_UNAVAILABLE not in hass.data[DOMAIN]:
            _LOGGER.error(log_format, err)
            hass.data[DOMAIN][DATA_NEST_UNAVAILABLE] = True
        subscriber.stop_async()

    try:
        await subscriber.start_async()
    except AuthException as err:
        _LOGGER.debug("Subscriber authentication error: %s", err)
        raise ConfigEntryAuthFailed from err
    except ConfigurationException as err:
        _LOGGER.error("Configuration error: %s", err)
        subscriber.stop_async()
        return False
    except GoogleNestException as err:
        # Must stay last: the more specific exceptions are handled above.
        _flag_unavailable_and_stop("Subscriber error: %s", err)
        raise ConfigEntryNotReady from err

    try:
        await subscriber.async_get_device_manager()
    except GoogleNestException as err:
        _flag_unavailable_and_stop("Device manager error: %s", err)
        raise ConfigEntryNotReady from err

    # Setup succeeded: clear the "already logged" marker so a later failure is
    # logged again.
    hass.data[DOMAIN].pop(DATA_NEST_UNAVAILABLE, None)
    hass.data[DOMAIN][DATA_SUBSCRIBER] = subscriber

    hass.config_entries.async_setup_platforms(entry, PLATFORMS)

    return True
196
197
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a config entry.

    Legacy entries need no teardown here. For SDM entries the subscriber is
    stopped first; the per-domain data is only dropped when all platforms
    unloaded successfully.
    """
    is_legacy = DATA_SDM not in entry.data
    if is_legacy:
        return True

    _LOGGER.debug("Stopping nest subscriber")
    hass.data[DOMAIN][DATA_SUBSCRIBER].stop_async()

    unloaded = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    if not unloaded:
        return unloaded

    domain_data = hass.data[DOMAIN]
    domain_data.pop(DATA_SUBSCRIBER)
    domain_data.pop(DATA_NEST_UNAVAILABLE, None)
    return unloaded
212
Full Screen

test_send_async.py

Source: test_send_async.py Github

copy
1# -- coding: utf-8 --
2#-------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for
5# license information.
6#--------------------------------------------------------------------------
7
8import os
9import asyncio
10import pytest
11import time
12import json
13
14from azure.eventhub import EventData, EventHubClientAsync
15
16
@pytest.mark.asyncio
async def test_send_with_partition_key_async(connstr_receivers):
    """Events sharing a partition key must all arrive on the same partition.

    Sends 50 events for each of six partition keys, then checks that every key
    is only ever seen on one receiver.
    """
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender()
    try:
        await client.run_async()
        data_val = 0
        for partition in [b"a", b"b", b"c", b"d", b"e", b"f"]:
            partition_key = b"test_partition_" + partition
            for _ in range(50):
                data = EventData(str(data_val))
                data.partition_key = partition_key
                data_val += 1
                await sender.send(data)
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    found_partition_keys = {}
    for index, partition in enumerate(receivers):
        for message in partition.receive(timeout=5):
            # The first sighting records the receiver index; later sightings
            # must match it.
            seen_on = found_partition_keys.setdefault(message.partition_key, index)
            assert seen_on == index
43
44
@pytest.mark.asyncio
async def test_send_and_receive_zero_length_body_async(connstr_receivers):
    """Send one event with an empty body and expect exactly one empty event back."""
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender()
    try:
        await client.run_async()
        await sender.send(EventData(""))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    received = []
    for r in receivers:
        received.extend(r.receive(timeout=1))

    assert len(received) == 1
    assert list(received[0].body)[0] == b""
64
65
@pytest.mark.asyncio
async def test_send_single_event_async(connstr_receivers):
    """Send a single bytes event and expect it on exactly one receiver."""
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender()
    try:
        await client.run_async()
        await sender.send(EventData(b"A single event"))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    received = []
    for r in receivers:
        received.extend(r.receive(timeout=1))

    assert len(received) == 1
    assert list(received[0].body)[0] == b"A single event"
85
86
@pytest.mark.asyncio
async def test_send_batch_async(connstr_receivers):
    """Send ten events as one batch and expect them back in order."""
    connection_str, receivers = connstr_receivers

    def batched():
        for i in range(10):
            yield "Event number {}".format(i)

    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender()
    try:
        await client.run_async()
        await sender.send(EventData(batch=batched()))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    # Pause before receiving; use the async sleep so the event loop is not
    # blocked inside a coroutine.
    await asyncio.sleep(1)
    received = []
    for r in receivers:
        received.extend(r.receive(timeout=3))

    assert len(received) == 10
    for index, message in enumerate(received):
        assert list(message.body)[0] == "Event number {}".format(index).encode('utf-8')
112
113
@pytest.mark.asyncio
async def test_send_partition_async(connstr_receivers):
    """An event sent to partition "1" must appear there and not on partition 0."""
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender(partition="1")
    try:
        await client.run_async()
        await sender.send(EventData(b"Data"))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    partition_0 = receivers[0].receive(timeout=2)
    assert len(partition_0) == 0
    partition_1 = receivers[1].receive(timeout=2)
    assert len(partition_1) == 1
131
132
@pytest.mark.asyncio
async def test_send_non_ascii_async(connstr_receivers):
    """Non-ASCII text and JSON bodies must round-trip unchanged via partition 0."""
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender(partition="0")
    try:
        await client.run_async()
        await sender.send(EventData("é,è,à,ù,â,ê,î,ô,û"))
        await sender.send(EventData(json.dumps({"foo": "漢字"})))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    partition_0 = receivers[0].receive(timeout=2)
    assert len(partition_0) == 2
    assert partition_0[0].body_as_str() == "é,è,à,ù,â,ê,î,ô,û"
    assert partition_0[1].body_as_json() == {"foo": "漢字"}
151
152
@pytest.mark.asyncio
async def test_send_partition_batch_async(connstr_receivers):
    """A ten-event batch sent to partition "1" must arrive only on that partition."""
    connection_str, receivers = connstr_receivers

    def batched():
        for i in range(10):
            yield "Event number {}".format(i)

    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender(partition="1")
    try:
        await client.run_async()
        await sender.send(EventData(batch=batched()))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    partition_0 = receivers[0].receive(timeout=2)
    assert len(partition_0) == 0
    partition_1 = receivers[1].receive(timeout=2)
    assert len(partition_1) == 10
174
175
@pytest.mark.asyncio
async def test_send_array_async(connstr_receivers):
    """A list body is sent as one event whose body yields the same items."""
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender = client.add_async_sender()
    try:
        await client.run_async()
        await sender.send(EventData([b"A", b"B", b"C"]))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    received = []
    for r in receivers:
        received.extend(r.receive(timeout=1))

    assert len(received) == 1
    assert list(received[0].body) == [b"A", b"B", b"C"]
195
196
@pytest.mark.asyncio
async def test_send_multiple_clients_async(connstr_receivers):
    """Two senders on one client, one per partition, each deliver one event."""
    connection_str, receivers = connstr_receivers
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    sender_0 = client.add_async_sender(partition="0")
    sender_1 = client.add_async_sender(partition="1")
    try:
        await client.run_async()
        await sender_0.send(EventData(b"Message 0"))
        await sender_1.send(EventData(b"Message 1"))
    finally:
        # Always shut the client down, even if connecting or sending failed.
        await client.stop_async()

    partition_0 = receivers[0].receive(timeout=2)
    assert len(partition_0) == 1
    partition_1 = receivers[1].receive(timeout=2)
    assert len(partition_1) == 1
Full Screen

test_receive_async.py

Source: test_receive_async.py Github

copy
1#-------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for
4# license information.
5#--------------------------------------------------------------------------
6
7import os
8import asyncio
9import pytest
10import time
11
12from azure import eventhub
13from azure.eventhub import EventData, Offset, EventHubError, EventHubClientAsync
14
15
@pytest.mark.asyncio
async def test_receive_end_of_stream_async(connstr_senders):
    """A receiver at '@latest' sees nothing until a new event is sent, then that event."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Receiving only a single event"))
        received = await receiver.receive(timeout=5)
        assert len(received) == 1

        assert list(received[-1].body)[0] == b"Receiving only a single event"
    finally:
        await client.stop_async()
34
35
@pytest.mark.asyncio
async def test_receive_with_offset_async(connstr_senders):
    """A receiver started at a received event's offset only sees later events."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Data"))
        # Non-blocking pause; time.sleep would stall the event loop.
        await asyncio.sleep(1)
        received = await receiver.receive(timeout=3)
        assert len(received) == 1
        offset = received[0].offset

        offset_receiver = client.add_async_receiver("$default", "0", offset=offset)
        # Run again so the newly added receiver is started.
        await client.run_async()
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Message after offset"))
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 1
    finally:
        await client.stop_async()
62
63
@pytest.mark.asyncio
async def test_receive_with_inclusive_offset_async(connstr_senders):
    """A receiver started at an inclusive offset re-reads the event at that offset."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Data"))
        # Non-blocking pause; time.sleep would stall the event loop.
        await asyncio.sleep(1)
        received = await receiver.receive(timeout=5)
        assert len(received) == 1
        offset = received[0].offset

        offset_receiver = client.add_async_receiver(
            "$default", "0", offset=Offset(offset.value, inclusive=True)
        )
        # Run again so the newly added receiver is started.
        await client.run_async()
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 1
    finally:
        await client.stop_async()
87
88
@pytest.mark.asyncio
async def test_receive_with_datetime_async(connstr_senders):
    """A receiver started at an event's enqueued time only sees later events."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Data"))
        received = await receiver.receive(timeout=5)
        assert len(received) == 1
        offset = received[0].enqueued_time

        offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset))
        # Run again so the newly added receiver is started.
        await client.run_async()
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Message after timestamp"))
        # Non-blocking pause; time.sleep would stall the event loop.
        await asyncio.sleep(1)
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 1
    finally:
        await client.stop_async()
115
116
@pytest.mark.asyncio
async def test_receive_with_sequence_no_async(connstr_senders):
    """A receiver started at an event's sequence number only sees later events."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Data"))
        received = await receiver.receive(timeout=5)
        assert len(received) == 1
        offset = received[0].sequence_number

        offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset))
        # Run again so the newly added receiver is started.
        await client.run_async()
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Message next in sequence"))
        # Non-blocking pause; time.sleep would stall the event loop.
        await asyncio.sleep(1)
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 1
    finally:
        await client.stop_async()
143
144
@pytest.mark.asyncio
async def test_receive_with_inclusive_sequence_no_async(connstr_senders):
    """A receiver started at an inclusive sequence number re-reads that event."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        senders[0].send(EventData(b"Data"))
        received = await receiver.receive(timeout=5)
        assert len(received) == 1
        offset = received[0].sequence_number

        offset_receiver = client.add_async_receiver(
            "$default", "0", offset=Offset(offset, inclusive=True)
        )
        # Run again so the newly added receiver is started.
        await client.run_async()
        received = await offset_receiver.receive(timeout=5)
        assert len(received) == 1
    finally:
        await client.stop_async()
167
168
@pytest.mark.asyncio
async def test_receive_batch_async(connstr_senders):
    """With ten events pending, ``max_batch_size=5`` returns exactly five."""
    connection_str, senders = connstr_senders
    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receiver = client.add_async_receiver("$default", "0", prefetch=500, offset=Offset('@latest'))
    try:
        # Started inside the try so the client is stopped even if startup fails.
        await client.run_async()
        received = await receiver.receive(timeout=5)
        assert len(received) == 0
        for _ in range(10):
            senders[0].send(EventData(b"Data"))
        received = await receiver.receive(max_batch_size=5, timeout=5)
        assert len(received) == 5
    finally:
        await client.stop_async()
186
187
async def pump(receiver, sleep=None):
    """Drain ``receiver`` and return how many messages were counted.

    Optionally waits ``sleep`` seconds first, then keeps calling
    ``receiver.receive(timeout=10)`` until a batch comes back empty or ten
    batches have been fetched. The tenth batch ends the loop without being
    counted, so at most nine batches contribute to the total.
    """
    if sleep:
        await asyncio.sleep(sleep)

    total = 0
    batches_seen = 0
    while True:
        batch = await receiver.receive(timeout=10)
        batches_seen += 1
        if not batch or batches_seen >= 10:
            return total
        total += len(batch)
201
202
@pytest.mark.asyncio
async def test_epoch_receiver_async(connstr_senders):
    """Of two epoch receivers on one partition, only the higher epoch receives.

    The epoch-10 receiver is expected to fail with ``EventHubError`` while the
    epoch-20 receiver gets the single event sent up front.
    """
    connection_str, senders = connstr_senders
    senders[0].send(EventData(b"Receiving only a single event"))

    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receivers = []
    for epoch in [10, 20]:
        receivers.append(client.add_async_epoch_receiver("$default", "0", epoch, prefetch=5))
    try:
        await client.run_async()
        # return_exceptions=True so the failing receiver's error is returned
        # as a result instead of aborting the gather.
        outputs = await asyncio.gather(
            pump(receivers[0]),
            pump(receivers[1]),
            return_exceptions=True)
        assert isinstance(outputs[0], EventHubError)
        assert outputs[1] == 1
    finally:
        await client.stop_async()
224
225
@pytest.mark.asyncio
async def test_multiple_receiver_async(connstr_senders):
    """Two plain receivers on the same partition both receive the single event.

    Also checks that the reported partition ids are the same before and after
    the client is started.
    """
    connection_str, senders = connstr_senders
    senders[0].send(EventData(b"Receiving only a single event"))

    client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
    partitions = await client.get_eventhub_info_async()
    assert partitions["partition_ids"] == ["0", "1"]
    receivers = [
        client.add_async_receiver("$default", "0", prefetch=10) for _ in range(2)
    ]
    try:
        await client.run_async()
        more_partitions = await client.get_eventhub_info_async()
        assert more_partitions["partition_ids"] == ["0", "1"]
        outputs = await asyncio.gather(
            pump(receivers[0]),
            pump(receivers[1]),
            return_exceptions=True)
        # The isinstance checks guard against an exception object being
        # returned by gather in place of a count.
        assert isinstance(outputs[0], int) and outputs[0] == 1
        assert isinstance(outputs[1], int) and outputs[1] == 1
    finally:
        await client.stop_async()
251
252
@pytest.mark.asyncio
async def test_epoch_receiver_after_non_epoch_receiver_async(connstr_senders):
    """An epoch receiver that starts later displaces a plain receiver.

    The plain receiver is expected to end with ``EventHubError``; the epoch
    receiver, delayed by five seconds, receives the single event.
    """
    connection_str, senders = connstr_senders
    senders[0].send(EventData(b"Receiving only a single event"))

    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receivers = [
        client.add_async_receiver("$default", "0", prefetch=10),
        client.add_async_epoch_receiver("$default", "0", 15, prefetch=10),
    ]
    try:
        await client.run_async()
        outputs = await asyncio.gather(
            pump(receivers[0]),
            pump(receivers[1], sleep=5),
            return_exceptions=True)
        assert isinstance(outputs[0], EventHubError)
        assert isinstance(outputs[1], int) and outputs[1] == 1
    finally:
        await client.stop_async()
274
275
@pytest.mark.asyncio
async def test_non_epoch_receiver_after_epoch_receiver_async(connstr_senders):
    """A plain receiver cannot read a partition held by an epoch receiver.

    The epoch receiver gets the single event; the plain receiver is expected
    to end with ``EventHubError``.
    """
    connection_str, senders = connstr_senders
    senders[0].send(EventData(b"Receiving only a single event"))

    client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
    receivers = [
        client.add_async_epoch_receiver("$default", "0", 15, prefetch=10),
        client.add_async_receiver("$default", "0", prefetch=10),
    ]
    try:
        await client.run_async()
        outputs = await asyncio.gather(
            pump(receivers[0]),
            pump(receivers[1]),
            return_exceptions=True)
        assert isinstance(outputs[1], EventHubError)
        assert isinstance(outputs[0], int) and outputs[0] == 1
    finally:
        await client.stop_async()
297
298
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn more in our Cookie Policy, Privacy Policy & Terms of Service.

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)