How to use stream_output method in avocado


test_abstract_source.py

Source: test_abstract_source.py (GitHub)
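In the tests below, stream_output is the list of record dictionaries each test registers with MockStream as the mocked return value of read_records; the same list is then reused to build the expected AirbyteMessages. The core wiring, excerpted from the file, looks like this:

    stream_output = [{"k1": "v1"}, {"k2": "v2"}]
    s1 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
    expected = _as_records("s1", stream_output)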


#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union

import pytest
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    Status,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

logger = logging.getLogger("airbyte")

class MockSource(AbstractSource):
    def __init__(self, check_lambda: Callable[[], Tuple[bool, Optional[Any]]] = None, streams: List[Stream] = None):
        self._streams = streams
        self.check_lambda = check_lambda

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        if self.check_lambda:
            return self.check_lambda()
        return (False, "Missing callable.")

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        if not self._streams:
            raise Exception("Stream is not set")
        return self._streams

def test_successful_check():
    """Tests that if a source returns TRUE for the connection check, the appropriate connectionStatus success message is returned"""
    expected = AirbyteConnectionStatus(status=Status.SUCCEEDED)
    assert expected == MockSource(check_lambda=lambda: (True, None)).check(logger, {})

def test_failed_check():
    """Tests that if a source returns FALSE for the connection check, the appropriate connectionStatus failure message is returned"""
    expected = AirbyteConnectionStatus(status=Status.FAILED, message="'womp womp'")
    assert expected == MockSource(check_lambda=lambda: (False, "womp womp")).check(logger, {})

def test_raising_check():
    """Tests that if a source raises an unexpected exception during the connection check, the appropriate connectionStatus failure message is returned"""
    expected = AirbyteConnectionStatus(status=Status.FAILED, message="Exception('this should fail')")
    assert expected == MockSource(check_lambda=lambda: exec('raise Exception("this should fail")')).check(logger, {})

class MockStream(Stream):
    def __init__(self, inputs_and_mocked_outputs: List[Tuple[Mapping[str, Any], Iterable[Mapping[str, Any]]]] = None, name: str = None):
        self._inputs_and_mocked_outputs = inputs_and_mocked_outputs
        self._name = name

    @property
    def name(self):
        return self._name

    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:  # type: ignore
        # Remove None values
        kwargs = {k: v for k, v in kwargs.items() if v is not None}
        if self._inputs_and_mocked_outputs:
            for _input, output in self._inputs_and_mocked_outputs:
                if kwargs == _input:
                    return output
        raise Exception(f"No mocked output supplied for input: {kwargs}. Mocked inputs/outputs: {self._inputs_and_mocked_outputs}")

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        return "pk"

def test_discover(mocker):
    """Tests that the appropriate AirbyteCatalog is returned from the discover method"""
    airbyte_stream1 = AirbyteStream(
        name="1",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        default_cursor_field=["cursor"],
        source_defined_cursor=True,
        source_defined_primary_key=[["pk"]],
    )
    airbyte_stream2 = AirbyteStream(name="2", json_schema={}, supported_sync_modes=[SyncMode.full_refresh])
    stream1 = MockStream()
    stream2 = MockStream()
    mocker.patch.object(stream1, "as_airbyte_stream", return_value=airbyte_stream1)
    mocker.patch.object(stream2, "as_airbyte_stream", return_value=airbyte_stream2)
    expected = AirbyteCatalog(streams=[airbyte_stream1, airbyte_stream2])
    src = MockSource(check_lambda=lambda: (True, None), streams=[stream1, stream2])
    assert expected == src.discover(logger, {})

def test_read_nonexistent_stream_raises_exception(mocker):
    """Tests that attempting to sync a stream which the source does not return from the `streams` method raises an exception"""
    s1 = MockStream(name="s1")
    s2 = MockStream(name="this_stream_doesnt_exist_in_the_source")
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    src = MockSource(streams=[s1])
    catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s2, SyncMode.full_refresh)])
    with pytest.raises(KeyError):
        list(src.read(logger, {}, catalog))

GLOBAL_EMITTED_AT = 1

def _as_record(stream: str, data: Dict[str, Any]) -> AirbyteMessage:
    return AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data=data, emitted_at=GLOBAL_EMITTED_AT))

def _as_records(stream: str, data: List[Dict[str, Any]]) -> List[AirbyteMessage]:
    return [_as_record(stream, datum) for datum in data]

def _configured_stream(stream: Stream, sync_mode: SyncMode):
    return ConfiguredAirbyteStream(
        stream=stream.as_airbyte_stream(), sync_mode=sync_mode, destination_sync_mode=DestinationSyncMode.overwrite
    )

def _fix_emitted_at(messages: List[AirbyteMessage]) -> List[AirbyteMessage]:
    for msg in messages:
        if msg.type == Type.RECORD and msg.record:
            msg.record.emitted_at = GLOBAL_EMITTED_AT
    return messages

def test_valid_full_refresh_read_no_slices(mocker):
    """Tests that running a full refresh sync on streams which don't specify slices produces the expected AirbyteMessages"""
    stream_output = [{"k1": "v1"}, {"k2": "v2"}]
    s1 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
    s2 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s2")
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(
        streams=[_configured_stream(s1, SyncMode.full_refresh), _configured_stream(s2, SyncMode.full_refresh)]
    )
    expected = _as_records("s1", stream_output) + _as_records("s2", stream_output)
    messages = _fix_emitted_at(list(src.read(logger, {}, catalog)))
    assert expected == messages

def test_valid_full_refresh_read_with_slices(mocker):
    """Tests that running a full refresh sync on streams which use slices produces the expected AirbyteMessages"""
    slices = [{"1": "1"}, {"2": "2"}]
    # When attempting to sync a slice, just output that slice as a record
    s1 = MockStream([({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], name="s1")
    s2 = MockStream([({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], name="s2")
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(
        streams=[_configured_stream(s1, SyncMode.full_refresh), _configured_stream(s2, SyncMode.full_refresh)]
    )
    expected = [*_as_records("s1", slices), *_as_records("s2", slices)]
    messages = _fix_emitted_at(list(src.read(logger, {}, catalog)))
    assert expected == messages

def _state(state_data: Dict[str, Any]):
    return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state_data))

def test_valid_incremental_read_with_checkpoint_interval(mocker):
    """Tests that an incremental read which specifies a checkpoint interval outputs a STATE message after reading N records within a stream"""
    stream_output = [{"k1": "v1"}, {"k2": "v2"}]
    s1 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s1")
    s2 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s2")
    state = {"cursor": "value"}
    mocker.patch.object(MockStream, "get_updated_state", return_value=state)
    mocker.patch.object(MockStream, "supports_incremental", return_value=True)
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    # Tell the source to output one state message per record
    mocker.patch.object(MockStream, "state_checkpoint_interval", new_callable=mocker.PropertyMock, return_value=1)
    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)])
    expected = [
        _as_record("s1", stream_output[0]),
        _state({"s1": state}),
        _as_record("s1", stream_output[1]),
        _state({"s1": state}),
        _state({"s1": state}),
        _as_record("s2", stream_output[0]),
        _state({"s1": state, "s2": state}),
        _as_record("s2", stream_output[1]),
        _state({"s1": state, "s2": state}),
        _state({"s1": state, "s2": state}),
    ]
    messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict))))
    assert expected == messages

def test_valid_incremental_read_with_no_interval(mocker):
    """Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message only after fully reading the stream and does
    not output any STATE messages during syncing the stream."""
    stream_output = [{"k1": "v1"}, {"k2": "v2"}]
    s1 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s1")
    s2 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s2")
    state = {"cursor": "value"}
    mocker.patch.object(MockStream, "get_updated_state", return_value=state)
    mocker.patch.object(MockStream, "supports_incremental", return_value=True)
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)])
    expected = [
        *_as_records("s1", stream_output),
        _state({"s1": state}),
        *_as_records("s2", stream_output),
        _state({"s1": state, "s2": state}),
    ]
    messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict))))
    assert expected == messages

def test_valid_incremental_read_with_slices(mocker):
    """Tests that an incremental read which uses slices outputs each record in the slice followed by a STATE message, for each slice"""
    slices = [{"1": "1"}, {"2": "2"}]
    stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}]
    s1 = MockStream(
        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s1"
    )
    s2 = MockStream(
        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s2"
    )
    state = {"cursor": "value"}
    mocker.patch.object(MockStream, "get_updated_state", return_value=state)
    mocker.patch.object(MockStream, "supports_incremental", return_value=True)
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)])
    expected = [
        # stream 1 slice 1
        *_as_records("s1", stream_output),
        _state({"s1": state}),
        # stream 1 slice 2
        *_as_records("s1", stream_output),
        _state({"s1": state}),
        # stream 2 slice 1
        *_as_records("s2", stream_output),
        _state({"s1": state, "s2": state}),
        # stream 2 slice 2
        *_as_records("s2", stream_output),
        _state({"s1": state, "s2": state}),
    ]
    messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict))))
    assert expected == messages

def test_valid_incremental_read_with_slices_and_interval(mocker):
    """
    Tests that an incremental read which uses slices and a checkpoint interval:
    1. outputs all records
    2. outputs a state message every N records (N=checkpoint_interval)
    3. outputs a state message after reading the entire slice
    """
    slices = [{"1": "1"}, {"2": "2"}]
    stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}]
    s1 = MockStream(
        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s1"
    )
    s2 = MockStream(
        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s2"
    )
    state = {"cursor": "value"}
    mocker.patch.object(MockStream, "get_updated_state", return_value=state)
    mocker.patch.object(MockStream, "supports_incremental", return_value=True)
    mocker.patch.object(MockStream, "get_json_schema", return_value={})
    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
    mocker.patch.object(MockStream, "state_checkpoint_interval", new_callable=mocker.PropertyMock, return_value=2)
    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)])
    expected = [
        # stream 1 slice 1
        _as_record("s1", stream_output[0]),
        _as_record("s1", stream_output[1]),
        _state({"s1": state}),
        _as_record("s1", stream_output[2]),
        _state({"s1": state}),
        # stream 1 slice 2
        _as_record("s1", stream_output[0]),
        _as_record("s1", stream_output[1]),
        _state({"s1": state}),
        _as_record("s1", stream_output[2]),
        _state({"s1": state}),
        # stream 2 slice 1
        _as_record("s2", stream_output[0]),
        _as_record("s2", stream_output[1]),
        _state({"s1": state, "s2": state}),
        _as_record("s2", stream_output[2]),
        _state({"s1": state, "s2": state}),
        # stream 2 slice 2
        _as_record("s2", stream_output[0]),
        _as_record("s2", stream_output[1]),
        _state({"s1": state, "s2": state}),
        _as_record("s2", stream_output[2]),
        _state({"s1": state, "s2": state}),
    ]
    messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict))))
    ...
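The same stream_output pattern extends to new tests: MockStream matches the exact keyword arguments passed to read_records against its registered inputs and returns the paired output list. The sketch below is illustrative only; the test name, the "s3" stream, and the fixture values are assumptions rather than part of the original file, and it presumes it is added to the same module so the helpers above (MockStream, MockSource, _configured_stream, _as_records, _fix_emitted_at, logger) are in scope.

    def test_full_refresh_read_with_custom_stream_output(mocker):
        # Illustrative fixture data: any list of JSON-serializable dicts works here.
        stream_output = [{"id": 1}, {"id": 2}]
        # MockStream returns stream_output when read_records is called with exactly these kwargs.
        s3 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s3")
        mocker.patch.object(MockStream, "get_json_schema", return_value={})
        src = MockSource(streams=[s3])
        catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s3, SyncMode.full_refresh)])
        # Expect one RECORD message per dict in stream_output, in the same order.
        expected = _as_records("s3", stream_output)
        messages = _fix_emitted_at(list(src.read(logger, {}, catalog)))
        assert expected == messages

Running pytest against the module should pick this test up alongside the originals; switching the sync mode to SyncMode.incremental (and adding "stream_state" to the expected kwargs) is how the incremental tests above reuse the same fixture.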


