Best Python code snippet using lisa_python
test_type_schema_loader.py
Source:test_type_schema_loader.py  
...43def test_load_type_schema_from_json(loader):44    with patch.object(45        loader, "load_type_schema_from_json", wraps=loader.load_type_schema_from_json46    ) as mock_load_json:47        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_JSON)48    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)49    mock_load_json.assert_called_with(TEST_TARGET_SCHEMA_JSON, None)50def test_load_type_schema_from_invalid_json(loader):51    with patch.object(52        loader, "load_type_schema_from_json", wraps=loader.load_type_schema_from_json53    ) as mock_load_json:54        type_schema = loader.load_type_schema(55            '{"Credentials" :{"ApiKey": "123", xxxx}}'56        )57    assert not type_schema58    mock_load_json.assert_called_with('{"Credentials" :{"ApiKey": "123", xxxx}}', None)59def test_load_type_schema_from_invalid_json_fallback_to_default(loader):60    with patch.object(61        loader, "load_type_schema_from_json", wraps=loader.load_type_schema_from_json62    ) as mock_load_json:63        type_schema = loader.load_type_schema(64            '{"Credentials" :{"ApiKey": "123", xxxx}}',65            OTHER_TEST_TARGET_FALLBACK_SCHEMA,66        )67    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)68    mock_load_json.assert_called_with(69        '{"Credentials" :{"ApiKey": "123", xxxx}}', OTHER_TEST_TARGET_FALLBACK_SCHEMA70    )71def test_load_type_schema_from_json_array(loader):72    with patch.object(73        loader, "load_type_schema_from_json", wraps=loader.load_type_schema_from_json74    ) as mock_load_json:75        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_JSON_ARRAY)76    assert [TEST_TARGET_SCHEMA] == type_schema77    mock_load_json.assert_called_with(TEST_TARGET_SCHEMA_JSON_ARRAY, None)78def test_load_type_schema_from_invalid_json_array(loader):79    with patch.object(80        loader, "load_type_schema_from_json", wraps=loader.load_type_schema_from_json81    ) as mock_load_json:82        type_schema = loader.load_type_schema('[{"Credentials" :{"ApiKey": "123"}}]]')83    assert not type_schema84    mock_load_json.assert_called_with('[{"Credentials" :{"ApiKey": "123"}}]]', None)85def test_load_type_schema_from_invalid_json_array_fallback_to_default(loader):86    with patch.object(87        loader, "load_type_schema_from_json", wraps=loader.load_type_schema_from_json88    ) as mock_load_json:89        type_schema = loader.load_type_schema(90            '[{"Credentials" :{"ApiKey": "123"}}]]',91            OTHER_TEST_TARGET_FALLBACK_SCHEMA,92        )93    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)94    mock_load_json.assert_called_with(95        '[{"Credentials" :{"ApiKey": "123"}}]]', OTHER_TEST_TARGET_FALLBACK_SCHEMA96    )97def test_load_type_schema_from_file(loader):98    patch_file = patch("builtins.open", mock_open(read_data=TEST_TARGET_SCHEMA_JSON))99    patch_path_is_file = patch(100        "rpdk.core.type_schema_loader.os.path.isfile", return_value=True101    )102    patch_load_file = patch.object(103        loader, "load_type_schema_from_file", wraps=loader.load_type_schema_from_file104    )105    with patch_file as mock_file, patch_path_is_file as mock_path_is_file, patch_load_file as mock_load_file:106        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_PATH)107    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)108    mock_path_is_file.assert_any_call(TEST_TARGET_SCHEMA_FILE_PATH)109    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)110    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")111def test_load_type_schema_from_file_file_not_found(loader):112    patch_file = patch("builtins.open", mock_open())113    patch_path_is_file = patch(114        "rpdk.core.type_schema_loader.os.path.isfile", return_value=True115    )116    patch_load_file = patch.object(117        loader, "load_type_schema_from_file", wraps=loader.load_type_schema_from_file118    )119    with patch_file as mock_file, patch_path_is_file as mock_path_is_file, patch_load_file as mock_load_file:120        mock_file.side_effect = FileNotFoundError()121        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_PATH)122    assert not type_schema123    mock_path_is_file.assert_any_call(TEST_TARGET_SCHEMA_FILE_PATH)124    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)125    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")126def test_load_type_schema_from_file_error_fallback_to_default(loader):127    patch_file = patch("builtins.open", mock_open())128    patch_path_is_file = patch(129        "rpdk.core.type_schema_loader.os.path.isfile", return_value=True130    )131    patch_load_file = patch.object(132        loader, "load_type_schema_from_file", wraps=loader.load_type_schema_from_file133    )134    with patch_file as mock_file, patch_path_is_file as mock_path_is_file, patch_load_file as mock_load_file:135        mock_file.side_effect = FileNotFoundError()136        type_schema = loader.load_type_schema(137            TEST_TARGET_SCHEMA_FILE_PATH, OTHER_TEST_TARGET_FALLBACK_SCHEMA138        )139    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)140    mock_path_is_file.assert_any_call(TEST_TARGET_SCHEMA_FILE_PATH)141    mock_load_file.assert_called_with(142        TEST_TARGET_SCHEMA_FILE_PATH, OTHER_TEST_TARGET_FALLBACK_SCHEMA143    )144    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")145def test_load_type_schema_from_file_uri(loader):146    patch_file = patch("builtins.open", mock_open(read_data=TEST_TARGET_SCHEMA_JSON))147    patch_load_from_uri = patch.object(148        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri149    )150    patch_load_file = patch.object(151        loader, "load_type_schema_from_file", wraps=loader.load_type_schema_from_file152    )153    with patch_file as mock_file, patch_load_from_uri as mock_load_from_uri, patch_load_file as mock_load_file:154        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_URI)155    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)156    mock_load_from_uri.assert_called_with(TEST_TARGET_SCHEMA_FILE_URI, None)157    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)158    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")159def test_load_type_schema_from_file_uri_file_not_found(loader):160    patch_file = patch("builtins.open", mock_open())161    patch_load_from_uri = patch.object(162        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri163    )164    patch_load_file = patch.object(165        loader, "load_type_schema_from_file", wraps=loader.load_type_schema_from_file166    )167    with patch_file as mock_file, patch_load_from_uri as mock_load_from_uri, patch_load_file as mock_load_file:168        mock_file.side_effect = FileNotFoundError()169        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_URI)170    assert not type_schema171    mock_load_from_uri.assert_called_with(TEST_TARGET_SCHEMA_FILE_URI, None)172    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)173    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")174def test_load_type_schema_from_file_uri_error_fallback_to_default(loader):175    patch_file = patch("builtins.open", mock_open())176    patch_load_from_uri = patch.object(177        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri178    )179    patch_load_file = patch.object(180        loader, "load_type_schema_from_file", wraps=loader.load_type_schema_from_file181    )182    with patch_file as mock_file, patch_load_from_uri as mock_load_from_uri, patch_load_file as mock_load_file:183        mock_file.side_effect = FileNotFoundError()184        type_schema = loader.load_type_schema(185            TEST_TARGET_SCHEMA_FILE_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA186        )187    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)188    mock_load_from_uri.assert_called_with(189        TEST_TARGET_SCHEMA_FILE_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA190    )191    mock_load_file.assert_called_with(192        TEST_TARGET_SCHEMA_FILE_PATH, OTHER_TEST_TARGET_FALLBACK_SCHEMA193    )194    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")195def test_load_type_schema_from_https_url(loader):196    mock_request = Mock()197    mock_request.status_code = 200198    mock_request.content = TEST_TARGET_SCHEMA_JSON.encode("utf-8")199    patch_get_request = patch(200        "rpdk.core.type_schema_loader.requests.get", return_value=mock_request201    )202    patch_load_from_uri = patch.object(203        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri204    )205    patch_get_from_url = patch.object(206        loader, "_get_type_schema_from_url", wraps=loader._get_type_schema_from_url207    )208    with patch_get_request as mock_get_request, patch_load_from_uri as mock_load_from_uri, patch_get_from_url as mock_get_from_url:209        type_schema = loader.load_type_schema(TEST_HTTPS_TARGET_SCHEMA_URI)210    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)211    mock_load_from_uri.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)212    mock_get_from_url.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)213    mock_get_request.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, timeout=60)214def test_load_type_schema_from_https_url_unsuccessful(loader):215    mock_request = Mock()216    mock_request.status_code = 404217    patch_get_request = patch(218        "rpdk.core.type_schema_loader.requests.get", return_value=mock_request219    )220    patch_load_from_uri = patch.object(221        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri222    )223    patch_get_from_url = patch.object(224        loader, "_get_type_schema_from_url", wraps=loader._get_type_schema_from_url225    )226    with patch_get_request as mock_get_request, patch_load_from_uri as mock_load_from_uri, patch_get_from_url as mock_get_from_url:227        type_schema = loader.load_type_schema(TEST_HTTPS_TARGET_SCHEMA_URI)228    assert not type_schema229    mock_load_from_uri.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)230    mock_get_from_url.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)231    mock_get_request.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, timeout=60)232def test_load_type_schema_from_https_url_unsuccessful_fallback_to_default(loader):233    mock_request = Mock()234    mock_request.status_code = 404235    patch_get_request = patch(236        "rpdk.core.type_schema_loader.requests.get", return_value=mock_request237    )238    patch_load_from_uri = patch.object(239        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri240    )241    patch_get_from_url = patch.object(242        loader, "_get_type_schema_from_url", wraps=loader._get_type_schema_from_url243    )244    with patch_get_request as mock_get_request, patch_load_from_uri as mock_load_from_uri, patch_get_from_url as mock_get_from_url:245        type_schema = loader.load_type_schema(246            TEST_HTTPS_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA247        )248    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)249    mock_load_from_uri.assert_called_with(250        TEST_HTTPS_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA251    )252    mock_get_from_url.assert_called_with(253        TEST_HTTPS_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA254    )255    mock_get_request.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, timeout=60)256def test_load_type_schema_from_s3(loader):257    loader.s3_client.get_object.return_value = {258        "Body": BytesIO(TEST_TARGET_SCHEMA_JSON.encode("utf-8"))259    }260    patch_load_from_uri = patch.object(261        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri262    )263    patch_get_from_s3 = patch.object(264        loader, "_get_type_schema_from_s3", wraps=loader._get_type_schema_from_s3265    )266    with patch_load_from_uri as mock_load_from_uri, patch_get_from_s3 as mock_get_from_s3:267        type_schema = loader.load_type_schema(TEST_S3_TARGET_SCHEMA_URI)268    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)269    mock_load_from_uri.assert_called_with(TEST_S3_TARGET_SCHEMA_URI, None)270    mock_get_from_s3.assert_called_with(271        TEST_TARGET_SCHEMA_BUCKET, TEST_TARGET_SCHEMA_KEY, None272    )273    loader.s3_client.get_object.assert_called_once_with(274        Bucket=TEST_TARGET_SCHEMA_BUCKET, Key=TEST_TARGET_SCHEMA_KEY275    )276def test_load_type_schema_from_s3_client_error(loader):277    loader.s3_client.get_object.side_effect = ClientError(278        {"Error": {"Code": "", "Message": "Bucket does not exist"}},279        "get_object",280    )281    patch_load_from_uri = patch.object(282        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri283    )284    patch_get_from_s3 = patch.object(285        loader, "_get_type_schema_from_s3", wraps=loader._get_type_schema_from_s3286    )287    with patch_load_from_uri as mock_load_from_uri, patch_get_from_s3 as mock_get_from_s3:288        type_schema = loader.load_type_schema(TEST_S3_TARGET_SCHEMA_URI)289    assert not type_schema290    mock_load_from_uri.assert_called_with(TEST_S3_TARGET_SCHEMA_URI, None)291    mock_get_from_s3.assert_called_with(292        TEST_TARGET_SCHEMA_BUCKET, TEST_TARGET_SCHEMA_KEY, None293    )294    loader.s3_client.get_object.assert_called_once_with(295        Bucket=TEST_TARGET_SCHEMA_BUCKET, Key=TEST_TARGET_SCHEMA_KEY296    )297def test_load_type_schema_from_s3_error_fallback_to_default(loader):298    loader.s3_client.get_object.side_effect = ClientError(299        {"Error": {"Code": "", "Message": "Bucket does not exist"}},300        "get_object",301    )302    patch_load_from_uri = patch.object(303        loader, "load_type_schema_from_uri", wraps=loader.load_type_schema_from_uri304    )305    patch_get_from_s3 = patch.object(306        loader, "_get_type_schema_from_s3", wraps=loader._get_type_schema_from_s3307    )308    with patch_load_from_uri as mock_load_from_uri, patch_get_from_s3 as mock_get_from_s3:309        type_schema = loader.load_type_schema(310            TEST_S3_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA311        )312    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)313    mock_load_from_uri.assert_called_with(314        TEST_S3_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA315    )316    mock_get_from_s3.assert_called_with(317        TEST_TARGET_SCHEMA_BUCKET,318        TEST_TARGET_SCHEMA_KEY,319        OTHER_TEST_TARGET_FALLBACK_SCHEMA,320    )321    loader.s3_client.get_object.assert_called_once_with(322        Bucket=TEST_TARGET_SCHEMA_BUCKET, Key=TEST_TARGET_SCHEMA_KEY323    )324def test_load_type_schema_from_cfn_registry(loader):325    loader.cfn_client.describe_type.return_value = {326        "Schema": TEST_TARGET_SCHEMA_JSON,327        "Type": "RESOURCE",328        "ProvisioningType": "FULLY_MUTABLE",329    }330    type_schema, target_type, provisioning_type = loader.load_schema_from_cfn_registry(331        TEST_TARGET_TYPE_NAME, "RESOURCE"332    )333    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)334    assert target_type == "RESOURCE"335    assert provisioning_type == "FULLY_MUTABLE"336    loader.cfn_client.describe_type.assert_called_once_with(337        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME338    )339def test_load_type_schema_from_cfn_registry_client_error(loader):340    loader.cfn_client.describe_type.side_effect = ClientError(341        {"Error": {"Code": "", "Message": "Type does not exist"}},342        "get_object",343    )344    type_schema, target_type, provisioning_type = loader.load_schema_from_cfn_registry(345        TEST_TARGET_TYPE_NAME, "RESOURCE"346    )347    assert not type_schema348    assert not target_type349    assert not provisioning_type350    loader.cfn_client.describe_type.assert_called_once_with(351        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME352    )353def test_load_type_schema_from_cfn_registry_error_fallback_to_default(loader):354    loader.cfn_client.describe_type.side_effect = ClientError(355        {"Error": {"Code": "", "Message": "Type does not exist"}},356        "get_object",357    )358    type_schema, target_type, provisioning_type = loader.load_schema_from_cfn_registry(359        TEST_TARGET_TYPE_NAME, "RESOURCE", OTHER_TEST_TARGET_FALLBACK_SCHEMA360    )361    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)362    assert not target_type363    assert not provisioning_type364    loader.cfn_client.describe_type.assert_called_once_with(365        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME366    )367def test_get_provision_type(loader):368    loader.cfn_client.describe_type.return_value = {369        "Schema": TEST_TARGET_SCHEMA_JSON,370        "Type": "RESOURCE",371        "ProvisioningType": "IMMUTABLE",372    }373    provisioning_type = loader.get_provision_type(TEST_TARGET_TYPE_NAME, "RESOURCE")374    assert provisioning_type == "IMMUTABLE"375    loader.cfn_client.describe_type.assert_called_once_with(376        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME377    )378def test_get_provision_type_client_error(loader):379    loader.cfn_client.describe_type.side_effect = ClientError(380        {"Error": {"Code": "", "Message": "Type does not exist"}},381        "get_object",382    )383    provisioning_type = loader.get_provision_type(TEST_TARGET_TYPE_NAME, "RESOURCE")384    assert not provisioning_type385    loader.cfn_client.describe_type.assert_called_once_with(386        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME387    )388def test_load_type_schema_null_input(loader):389    type_schema = loader.load_type_schema(None, OTHER_TEST_TARGET_FALLBACK_SCHEMA)390    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)391    type_schema = loader.load_type_schema_from_json(392        None, OTHER_TEST_TARGET_FALLBACK_SCHEMA393    )394    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)395    type_schema = loader.load_type_schema_from_uri(396        None, OTHER_TEST_TARGET_FALLBACK_SCHEMA397    )398    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)399    type_schema = loader.load_type_schema_from_file(400        None, OTHER_TEST_TARGET_FALLBACK_SCHEMA401    )402    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)403def test_load_type_schema_invalid_input(loader):404    type_schema = loader.load_type_schema(405        "This is invalid input", OTHER_TEST_TARGET_FALLBACK_SCHEMA406    )407    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)408    with patch(409        "rpdk.core.type_schema_loader.is_valid_type_schema_uri", return_value=True410    ):411        type_schema = loader.load_type_schema_from_uri(412            "ftp://unsupportedurlschema.com/test-schema.json",413            OTHER_TEST_TARGET_FALLBACK_SCHEMA,414        )415    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)416@pytest.mark.parametrize(417    "uri",418    [...action.py
Source:action.py  
1# Copyright 2019 Ben Kehoe2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14name_pattern = r"^\w+$"15action_response_schema = {16    "type": "object",17    "properties": {18        "json": {19            "type": "object"20        },21        "html": {22            "type": "string"23        },24        "text": {25            "type": "string"26        },27        "redirect": {28            "type": "string",29            "format": "uri"30        }31    }32}33def _get_action_schema(34        type_schema,35        properties={},36        required=[],37        schema={}):38    ts = {39        "type": "string"40    }41    ts.update(type_schema)42    props = {43        "name": {44            "type": "string",45            "pattern": name_pattern46        },47        "type": ts,48        "response": action_response_schema49    }50    props.update(properties)51    req = ["name", "type"]52    req.extend(required)53    s = {54        "type": "object",55        "properties": props,56        "required": req57    }58    s.update(schema)59    return s60def _get_post_outcome_schema(61        type_schema,62        properties={},63        required=[],64        schema={}):65    p = {66        "schema": {67            "type": "object"68        }69    }70    p.update(properties)71    req = ["schema"]72    req.extend(required)73    return _get_action_schema(74        type_schema,75        properties=p,76        required=req,77        schema=schema78    )79post_outcome_success_schema = _get_post_outcome_schema(80    type_schema={"const": "success"},81    schema={82        "oneOf": [83            {84                "properties": {85                    "output_body": {86                        "const": True,87                    }88                },89                "required": ["output_body"]90            },91            {92                "properties": {93                    "output_path": {94                        "type": "string"95                    }96                },97                "required": ["output_path"]98            },99            {100                "properties": {101                    "output": {}102                },103                "required": ["output"]104            }105        ]106    }107)108post_outcome_failure_schema = _get_post_outcome_schema(109    type_schema={"const": "failure"},110    properties={111        "error": {112            "type": "string"113        },114        "error_path": {115            "type": "string"116        },117        "cause": {118            "type": "string"119        },120        "cause_path": {121            "type": "string"122        }123    },124    schema={125        "allOf": [126            {127                "not": {128                    "required": ["error", "error_path"]129                }130            },131            {132                "not": {133                    "required": ["cause", "cause_path"]134                }135            }136        ]137    }138)139post_outcome_heartbeat_schema = _get_post_outcome_schema(140    type_schema={"const": "heartbeat"}141)142post_outcome_schema = {143    "oneOf": [144        post_outcome_success_schema,145        post_outcome_failure_schema,146        post_outcome_heartbeat_schema147    ]148}149post_action_schema = _get_action_schema(150    type_schema={"const": "post"},151    properties={152        "outcomes": {153            "type": "array",154            "items": post_outcome_schema,155            "minItems": 1156        }157    }158)159success_action_schema = _get_action_schema(160    type_schema={"const": "success"},161    properties={162        "output": {}163    },164    required=["output"]165)166failure_action_schema = _get_action_schema(167    type_schema={"const": "failure"},168    properties={169        "error": {170            "type": "string"171        },172        "cause": {173            "type": "string"174        }175    },176)177heartbeat_action_schema = _get_action_schema(178    type_schema={"const": "heartbeat"}179)180action_schema = {181    "oneOf": [182        success_action_schema,183        failure_action_schema,184        heartbeat_action_schema,185        post_action_schema186    ]...transform.py
Source:transform.py  
1class InvalidData(Exception):2    """Raise when data doesn't validate the schema"""3def transform_row(row, schema):4    return _transform_field(row, schema)5def _transform_datetime(value):6    return value + "Z"7def _any_of(data, schema_list):8    for schema in schema_list:9        try:10            return _transform_field(data, schema)11        except Exception:12            pass13    raise InvalidData("{} doesn't match any of {}".format(data, schema_list))14def _array(data, items_schema):15    return [_transform_field(value, items_schema) for value in data]16def _object(data, properties_schema):17    return {field: _transform_field(data[field], field_schema)18            for field, field_schema in properties_schema.items()19            if field in data}20def _type_transform(value, type_schema):21    if isinstance(type_schema, list):22        for typ in type_schema:23            try:24                return _type_transform(value, typ)25            except Exception:26                pass27        raise InvalidData("{} doesn't match any of {}".format(value, type_schema))28    if value is None:29        if type_schema != "null":30            raise InvalidData("Null is not allowed")31        else:32            return None33    if type_schema == "string":34        return str(value)35    if type_schema == "integer":36        return int(value)37    if type_schema == "number":38        return float(value)39    if type_schema == "boolean":40        return bool(value)41    raise InvalidData("Unknown type {}".format(type_schema))42def _format_transform(value, format_schema):43    if format_schema == "date-time":44        return _transform_datetime(value)45    raise InvalidData("Unknown format {}".format(format_schema))46def _transform_field(value, field_schema):47    if "anyOf" in field_schema:48        return _any_of(value, field_schema["anyOf"])49    if field_schema["type"] == "array":50        return _array(value, field_schema["items"])51    if field_schema["type"] == "object":52        return _object(value, field_schema["properties"])53    value = _type_transform(value, field_schema["type"])54    if "format" in field_schema:55        value = _format_transform(value, field_schema["format"])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
