# Best Python code snippet using pytest-asyncio_python
# test_protocol.py
# Source: test_protocol.py
"""
    Copyright 2018 Inmanta
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
        http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    Contact: code@inmanta.com
"""
import asyncio
import base64
import datetime
import json
import os
import random
import threading
import time
import urllib.parse
import uuid
from enum import Enum
from itertools import chain
from typing import Any, Dict, Iterator, List, Optional, Union

import pydantic
import pytest
import tornado
from pydantic.types import StrictBool
from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httputil import url_concat
from tornado.platform.asyncio import AnyThreadEventLoopPolicy

from inmanta import config, const, protocol
from inmanta.const import ClientType
from inmanta.data.model import BaseModel
from inmanta.protocol import VersionMatch, exceptions, json_encode
from inmanta.protocol.common import (
    HTML_CONTENT,
    HTML_CONTENT_WITH_UTF8_CHARSET,
    OCTET_STREAM_CONTENT,
    ZIP_CONTENT,
    ArgOption,
    InvalidMethodDefinition,
    InvalidPathException,
    MethodProperties,
    Result,
    ReturnValue,
)
from inmanta.protocol.methods import ENV_OPTS
from inmanta.protocol.rest import CallArguments
from inmanta.protocol.return_value_meta import ReturnValueWithMeta
from inmanta.server import config as opt
from inmanta.server.config import server_bind_port
from inmanta.server.protocol import Server, ServerSlice
from inmanta.types import Apireturn
from inmanta.util import hash_file
from utils import configure


def make_random_file(size=0):
    """
    Generate a random file.

    :param size: If size is > 0 content is generated that is equal or more than size.
    :return: A tuple (hash, content, body): the hash of the content as computed by hash_file, the raw content
        bytes and the base64 encoded content as an ascii string.
    """
    randomvalue = str(random.randint(0, 10000))
    if size > 0:
        # Keep doubling the random string until it is at least `size` characters long
        while len(randomvalue) < size:
            randomvalue += randomvalue
    content = ("Hello world %s\n" % (randomvalue)).encode()
    hash = hash_file(content)
    body = base64.b64encode(content).decode("ascii")
    return (hash, content, body)


async def test_client_files(client):
    """
    Upload a file with the async client and read it back
    """
    (hash, content, body) = make_random_file()
    # Check if the file exists
    result = await client.stat_file(id=hash)
    assert result.code == 404
    # Create the file
    result = await client.upload_file(id=hash, content=body)
    assert result.code == 200
    # Get the file
    result = await client.get_file(id=hash)
    assert result.code == 200
    assert "content" in result.result
    assert result.result["content"] == body


async def test_client_files_lost(client):
    """
    Getting a file that was never uploaded returns a 404
    """
    (hash, content, body) = make_random_file()
    # Get the file
    result = await client.get_file(id=hash)
    assert result.code == 404


async def test_sync_client_files(client):
    """
    Run the stat / upload / get round trip with the SyncClient from a separate thread
    """
    # work around for https://github.com/pytest-dev/pytest-asyncio/issues/168
    asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
    done = []
    limit = 100
    sleep = 0.01

    def do_test():
        sync_client = protocol.SyncClient("client")
        (hash, content, body) = make_random_file()
        # Check if the file exists
        result = sync_client.stat_file(id=hash)
        assert result.code == 404
        # Create the file
        result = sync_client.upload_file(id=hash, content=body)
        assert result.code == 200
        # Get the file
        result = sync_client.get_file(id=hash)
        assert result.code == 200
        assert "content" in result.result
        assert result.result["content"] == body
        # Only reached when every assertion above passed
        done.append(True)

    thread = threading.Thread(target=do_test)
    thread.start()
    # Poll (at most `limit` times) instead of joining right away, so this coroutine keeps yielding while the
    # thread is running
    while len(done) == 0 and limit > 0:
        await gen.sleep(sleep)
        limit -= 1
    thread.join()
    assert len(done) > 0


async def test_client_files_stat(client):
    """
    Upload ten distinct files and check the "files" entry returned by stat_files
    """
    file_names = []
    i = 0
    while i < 10:
        (hash, content, body) = make_random_file()
        # make_random_file can produce the same content twice: only count new hashes
        if hash not in file_names:
            file_names.append(hash)
            result = await client.upload_file(id=hash, content=body)
            assert result.code == 200
            i += 1
    result = await client.stat_files(files=file_names)
    assert len(result.result["files"]) == 0
    other_files = ["testtest"]
    result = await client.stat_files(files=file_names + other_files)
    assert len(result.result["files"]) == len(other_files)


async def test_diff(client):
    """
    Upload two files and diff them against each other and against the id 0
    """
    ca = "Hello world\n".encode()
    ha = hash_file(ca)
    result = await client.upload_file(id=ha, content=base64.b64encode(ca).decode("ascii"))
    assert result.code == 200
    cb = "Bye bye world\n".encode()
    hb = hash_file(cb)
    result = await client.upload_file(id=hb, content=base64.b64encode(cb).decode("ascii"))
    assert result.code == 200
    diff = await client.diff(ha, hb)
    assert diff.code == 200
    assert len(diff.result["diff"]) == 5
    diff = await client.diff(0, hb)
    assert diff.code == 200
    assert len(diff.result["diff"]) == 4
    diff = await client.diff(ha, 0)
    assert diff.code == 200
    assert len(diff.result["diff"]) == 4


async def test_client_files_bad(server, client):
    """
    Uploading content under an id that differs from its hash is rejected with a 400
    """
    (hash, content, body) = make_random_file()
    # Create the file
    result = await client.upload_file(id=hash + "a", content=body)
    assert result.code == 400


async def test_client_files_corrupt(client):
    """
    Overwrite an uploaded file on disk and check the get / upload results with the
    server_delete_currupt_files option set to false and to true
    """
    (hash, content, body) = make_random_file()
    # Create the file
    result = await client.upload_file(id=hash, content=body)
    assert result.code == 200
    state_dir = opt.state_dir.get()
    file_dir = os.path.join(state_dir, "server", "files")
    file_name = os.path.join(file_dir, hash)
    # Replace the stored content so it no longer matches the id it is stored under
    with open(file_name, "wb+") as fd:
        fd.write("Haha!".encode())
    opt.server_delete_currupt_files.set("false")
    result = await client.get_file(id=hash)
    assert result.code == 500
    result = await client.upload_file(id=hash, content=body)
    assert result.code == 500
    opt.server_delete_currupt_files.set("true")
    result = await client.get_file(id=hash)
    assert result.code == 500
    result = await client.upload_file(id=hash, content=body)
    assert result.code == 200


async def test_gzip_encoding(server):
    """
    Test if the server accepts gzipped encoding and returns gzipped encoding.
    """
    (hash, content, body) = make_random_file(size=1024)
    port = opt.get_bind_port()
    url = "http://localhost:%s/api/v1/file/%s" % (port, hash)
    zipped, body = protocol.gzipped_json({"content": body})
    assert zipped
    request = HTTPRequest(
        url=url,
        method="PUT",
        headers={"Accept-Encoding": "gzip", "Content-Encoding": "gzip"},
        body=body,
        decompress_response=True,
    )
    client = AsyncHTTPClient()
    response = await client.fetch(request)
    assert response.code == 200
    request = HTTPRequest(url=url, method="GET", headers={"Accept-Encoding": "gzip"}, decompress_response=True)
    client = AsyncHTTPClient()
    response = await client.fetch(request)
    assert response.code == 200
    assert response.headers["X-Consumed-Content-Encoding"] == "gzip"


class MainHandler(web.RequestHandler):
    """
    Request handler whose GET blocks for 1.1 seconds, used by test_timeout_error together with a request
    timeout of 1.
    """

    def get(self):
        time.sleep(1.1)


@pytest.fixture(scope="function")
async def app(unused_tcp_port):
    """
    Start a tornado HTTPServer that serves MainHandler on /api/v1/file/abc on an unused port and stop it again
    when the test is done.
    """
    http_app = web.Application([(r"/api/v1/file/abc", MainHandler)])
    server = tornado.httpserver.HTTPServer(http_app)
    server.bind(unused_tcp_port)
    server.start()
    yield server
    server.stop()
    await server.close_all_connections()


async def test_timeout_error(app):
    """
    This test verifies that the protocol client can handle requests that timeout. This means it receives a http error
    status that is not generated by the server but by the client.
    """
    from inmanta.config import Config

    Config.load_config()
    # Point the client at the port the `app` fixture server is listening on
    port = str(list(app._sockets.values())[0].getsockname()[1])
    Config.set("client_rest_transport", "port", port)
    Config.set("client_rest_transport", "request_timeout", "1")
    from inmanta import protocol

    client = protocol.Client("client")
    x = await client.get_file(id="abc")
    assert x.code == 599
    assert "message" in x.result


async def test_method_properties():
    """
    Test method properties decorator and helper functions
    """

    @protocol.method(path="/test", operation="PUT", client_types=["api"], api_prefix="x", api_version=2)
    def test_method(name):
        """
        Create a new project
        """

    props = protocol.common.MethodProperties.methods["test_method"][0]
    assert "Authorization" in props.get_call_headers()
    assert props.get_listen_url() == "/x/v2/test"
    assert props.get_call_url({}) == "/x/v2/test"


async def test_invalid_client_type():
    """
    Test invalid client type
    """
    with pytest.raises(InvalidMethodDefinition) as e:

        @protocol.method(path="/test", operation="PUT", client_types=["invalid"])
        def test_method(name):
            """
            Create a new project
            """

    # This check has to live outside the `with` block: the decorator above raises, so any statement placed after
    # it inside the block is never executed. The exception itself is available as `e.value`.
    assert "Invalid client type invalid specified for function" in str(e.value)


async def test_call_arguments_defaults():
    """
    Test processing RPC messages
    """

    @protocol.method(path="/test", operation="PUT", client_types=["api"])
    def test_method(name: str, value: int = 10):
        """
        Create a new project
        """

    call = CallArguments(protocol.common.MethodProperties.methods["test_method"][0], {"name": "test"}, {})
    await call.process()
    assert call.call_args["name"] == "test"
    # `value` was not part of the message: the default of the method definition is used
    assert call.call_args["value"] == 10


def test_create_client():
    """
    Creating a SyncClient or a Client with "120" as second positional argument raises an AssertionError
    """
    with pytest.raises(AssertionError):
        protocol.SyncClient("agent", "120")
    with pytest.raises(AssertionError):
        protocol.Client("agent", "120")


async def test_pydantic():
    """
    Test validating pydantic objects
    """

    class Project(BaseModel):
        id: uuid.UUID
        name: str

    @protocol.method(path="/test", operation="PUT", client_types=["api"])
    def test_method(project: Project):
        """
        Create a new project
        """

    id = uuid.uuid4()
    call = CallArguments(
        protocol.common.MethodProperties.methods["test_method"][0], {"project": {"name": "test", "id": str(id)}}, {}
    )
    await call.process()
    project = call.call_args["project"]
    assert project.name == "test"
    assert project.id == id
    # "abcd" is not a valid UUID
    with pytest.raises(exceptions.BadRequest):
        call = CallArguments(
            protocol.common.MethodProperties.methods["test_method"][0], {"project": {"name": "test", "id": "abcd"}}, {}
        )
        await call.process()


def test_pydantic_json():
    """
    Test running pydantic objects through the json encoder
    """

    class Options(str, Enum):
        yes = "yes"
        no = "no"

    class Project(BaseModel):
        id: uuid.UUID
        name: str
        opts: Options

    project = Project(id=uuid.uuid4(), name="test", opts="no")
    assert project.opts == Options.no
    json_string = json_encode(project)
    data = json.loads(json_string)
    assert "id" in data
    assert "name" in data
    assert data["id"] == str(project.id)
    assert data["name"] == "test"
    # Now create the project again
    new = Project(**data)
    assert project == new
    assert project is not new


async def test_pydantic_alias(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Round trip test on aliased object
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        source: str
        validate_: bool

        class Config:
            fields = {"validate_": {"alias": "validate"}}

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(project: Project) -> ReturnValue[Project]:  # NOQA
            """
            Create a new project
            """

        @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"])
        def test_method2(project: List[Project]) -> ReturnValue[List[Project]]:  # NOQA
            """
            Create a new project
            """

        @protocol.handle(test_method)
        async def test_methodi(self, project: Project) -> ReturnValue[Project]:
            new_project = project.copy()
            return ReturnValue(response=new_project)

        @protocol.handle(test_method2)
        async def test_method2i(self, project: List[Project]) -> ReturnValue[List[Project]]:
            return ReturnValue(response=project)

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    projectt = Project(id=uuid.uuid4(), source="test", validate=True)
    assert projectt.validate_ is True
    projectf = Project(id=uuid.uuid4(), source="test", validate=False)
    assert projectf.validate_ is False

    async def roundtrip(obj: Project) -> None:
        # The field travels over the wire under its alias "validate"
        data = await client.test_method(obj)
        assert obj.validate_ == data.result["data"]["validate"]
        data = await client.test_method2([obj])
        assert obj.validate_ == data.result["data"][0]["validate"]

    await roundtrip(projectf)
    await roundtrip(projectt)


async def test_return_non_warnings(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test return none but pushing warnings
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(name: str) -> ReturnValue[None]:  # NOQA
            """
            Create a new project
            """

        @protocol.handle(test_method)
        async def test_method_handler(self, name) -> ReturnValue[None]:
            rv = ReturnValue()
            rv.add_warnings(["error1", "error2"])
            return rv

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    response = await client.test_method("x")
    assert response.code == 200
    assert "data" in response.result
    assert response.result["data"] is None
    assert "metadata" in response.result
    assert "warnings" in response.result["metadata"]
    assert "error1" in response.result["metadata"]["warnings"]


async def test_invalid_handler():
    """
    Handlers should be async
    """
    with pytest.raises(ValueError):

        class ProjectServer(ServerSlice):
            @protocol.method(path="/test", operation="POST", client_types=["api"])
            def test_method(self):
                """
                Create a new project
                """

            @protocol.handle(test_method)
            def test_method(self):
                return


async def test_return_value(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test the use and validation of methods that use common.ReturnValue
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        id: uuid.UUID
        name: str

    class ProjectServer(ServerSlice):
        @protocol.method(path="/test", operation="POST", client_types=["api"])
        def test_method(project: Project) -> ReturnValue[Project]:  # NOQA
            """
            Create a new project
            """

        @protocol.handle(test_method)
        async def test_method(self, project: Project) -> ReturnValue[Project]:
            new_project = project.copy()
            return ReturnValue(response=new_project)

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    assert "id" in result.result
    assert "name" in result.result


async def test_return_model(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test the use and validation of methods whose handler returns a model or None directly, without wrapping it
    in common.ReturnValue
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        id: uuid.UUID
        name: str

    class ProjectServer(ServerSlice):
        @protocol.method(path="/test", operation="POST", client_types=["api"])
        def test_method(project: Project) -> Project:  # NOQA
            """
            Create a new project
            """

        @protocol.method(path="/test2", operation="POST", client_types=["api"])
        def test_method2(project: Project) -> None:  # NOQA
            pass

        @protocol.method(path="/test3", operation="POST", client_types=["api"])
        def test_method3(project: Project) -> None:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, project: Project) -> Project:
            new_project = project.copy()
            return new_project

        @protocol.handle(test_method2)
        async def test_method2(self, project: Project) -> None:
            pass

        @protocol.handle(test_method3)
        async def test_method3(self, project: Project) -> None:
            # Deliberately returns a value although the method is declared to return None: the call below
            # expects a 500
            return 1

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    assert "id" in result.result
    assert "name" in result.result
    result = await client.test_method2({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    result = await client.test_method3({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 500


async def test_data_envelope(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test under which key the response is wrapped for typedmethod and method definitions, with and without an
    explicit envelope_key
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        id: uuid.UUID
        name: str

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(project: Project) -> ReturnValue[Project]:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, project: Project) -> ReturnValue[Project]:
            new_project = project.copy()
            return ReturnValue(response=new_project)

        @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"], envelope_key="method")
        def test_method2(project: Project) -> ReturnValue[Project]:  # NOQA
            pass

        @protocol.handle(test_method2)
        async def test_method2(self, project: Project) -> ReturnValue[Project]:
            new_project = project.copy()
            return ReturnValue(response=new_project)

        @protocol.method(path="/test3", operation="POST", client_types=["api"], envelope=True)
        def test_method3(project: Project):  # NOQA
            pass

        @protocol.handle(test_method3)
        async def test_method3(self, project: dict) -> Apireturn:
            return 200, {"id": 1, "name": 2}

        @protocol.method(path="/test4", operation="POST", client_types=["api"], envelope=True, envelope_key="project")
        def test_method4(project: Project):  # NOQA
            pass

        @protocol.handle(test_method4)
        async def test_method4(self, project: dict) -> Apireturn:
            return 200, {"id": 1, "name": 2}

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    # 1
    result = await client.test_method({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    assert "data" in result.result
    assert "id" in result.result["data"]
    assert "name" in result.result["data"]
    # 2
    result = await client.test_method2({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    assert "method" in result.result
    assert "id" in result.result["method"]
    assert "name" in result.result["method"]
    # 3
    result = await client.test_method3({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    assert "data" in result.result
    assert "id" in result.result["data"]
    assert "name" in result.result["data"]
    # 4
    result = await client.test_method4({"name": "test", "id": str(uuid.uuid4())})
    assert result.code == 200
    assert "project" in result.result
    assert "id" in result.result["project"]
    assert "name" in result.result["project"]


async def test_invalid_paths():
    """
    Test path validation
    """
    with pytest.raises(InvalidPathException) as e:

        @protocol.method(path="test", operation="PUT", client_types=["api"], api_prefix="x", api_version=2)
        def test_method(name):
            pass

    assert "test should start with a /" == str(e.value)
    with pytest.raises(InvalidPathException) as e:

        @protocol.method(path="/test/<othername>", operation="PUT", client_types=["api"], api_prefix="x", api_version=2)
        def test_method2(name):
            pass

    assert str(e.value).startswith("Variable othername in path /test/<othername> is not defined in function")


async def test_nested_paths(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test overlapping path definition"""
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        name: str

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test/<data>", operation="GET", client_types=["api"])
        def test_method(data: str) -> Project:  # NOQA
            pass

        @protocol.typedmethod(path="/test/<data>/config", operation="GET", client_types=["api"])
        def test_method2(data: str) -> Project:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: str) -> Project:
            # verify that URL encoded data is properly decoded
            assert "%20" not in data
            return Project(name="test_method")

        @protocol.handle(test_method2)
        async def test_method2(self, data: str) -> Project:
            return Project(name="test_method2")

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method({"data": "test "})
    assert result.code == 200
    assert "test_method" == result.result["data"]["name"]
    client = protocol.Client("client")
    result = await client.test_method2({"data": "test"})
    assert result.code == 200
    assert "test_method2" == result.result["data"]["name"]


async def test_list_basemodel_argument(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test list of basemodel arguments and primitive types"""
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        name: str

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(data: List[Project], data2: List[int]) -> Project:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: List[Project], data2: List[int]) -> Project:
            assert len(data) == 1
            assert data[0].name == "test"
            assert len(data2) == 3
            return Project(name="test_method")

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method(data=[{"name": "test"}], data2=[1, 2, 3])
    assert result.code == 200
    assert "test_method" == result.result["data"]["name"]


async def test_dict_basemodel_argument(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test dict of basemodel arguments and primitive types"""
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        name: str

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(data: Dict[str, Project], data2: Dict[str, int]) -> Project:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: Dict[str, Project], data2: Dict[str, int]) -> Project:
            assert len(data) == 1
            assert data["projectA"].name == "test"
            assert len(data2) == 3
            return Project(name="test_method")

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method(data={"projectA": {"name": "test"}}, data2={"1": 1, "2": 2, "3": 3})
    assert result.code == 200
    assert "test_method" == result.result["data"]["name"]


async def test_dict_with_optional_values(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test dict which may have None as a value"""
    configure(unused_tcp_port, database_name, postgres_db.port)
    types = Union[pydantic.StrictInt, pydantic.StrictStr]

    # Local model: within this test it shadows the Result imported from inmanta.protocol.common
    class Result(BaseModel):
        val: Optional[types]

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(data: Dict[str, Optional[types]]) -> Result:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: Dict[str, Optional[types]]) -> Result:
            assert len(data) == 1
            assert "test" in data
            return Result(val=data["test"])

        @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])
        def test_method2(data: Optional[str] = None) -> None:  # NOQA
            pass

        @protocol.handle(test_method2)
        async def test_method2(self, data: Optional[str] = None) -> None:
            assert data is None

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method(data={"test": None})
    assert result.code == 200
    assert result.result["data"]["val"] is None
    result = await client.test_method(data={"test": 5})
    assert result.code == 200
    assert result.result["data"]["val"] == 5
    result = await client.test_method(data={"test": "test123"})
    assert result.code == 200
    assert result.result["data"]["val"] == "test123"
    result = await client.test_method2()
    assert result.code == 200
    result = await client.test_method2(data=None)
    assert result.code == 200


async def test_dict_and_list_return(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test methods that return a list of basemodels and a list of primitive types"""
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        name: str

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(data: Project) -> List[Project]:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: Project) -> List[Project]:  # NOQA
            return [Project(name="test_method")]

        @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"])
        def test_method2(data: Project) -> List[str]:  # NOQA
            pass

        @protocol.handle(test_method2)
        async def test_method2(self, data: Project) -> List[str]:  # NOQA
            return ["test_method"]

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_method(data={"name": "test"})
    assert result.code == 200
    assert len(result.result["data"]) == 1
    assert "test_method" == result.result["data"][0]["name"]
    result = await client.test_method2(data={"name": "test"})
    assert result.code == 200
    assert len(result.result["data"]) == 1
    assert "test_method" == result.result["data"][0]


async def test_method_definition():
    """
    Test typed methods with wrong annotations
    """
    with pytest.raises(InvalidMethodDefinition) as e:

        @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])
        def test_method1(name) -> None:
            """
            Create a new project
            """

    assert "has no type annotation." in str(e.value)
    with pytest.raises(InvalidMethodDefinition) as e:

        @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])
        def test_method2(name: Iterator[str]) -> None:
            """
            Create a new project
            """

    assert "Type typing.Iterator[str] of argument name can only be generic List or Dict" in str(e.value)
    with pytest.raises(InvalidMethodDefinition) as e:

        @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])
        def test_method3(name: List[object]) -> None:
            """
            Create a new project
            """

    assert (
        "Type object of argument name must be a either BaseModel, Enum, UUID, str, float, int, StrictNonIntBool, datetime, "
        "bytes or a List of these types or a Dict with str keys and values of these types."
    ) in str(e.value)
    with pytest.raises(InvalidMethodDefinition) as e:

        @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])
        def test_method4(name: Dict[int, str]) -> None:
            """
            Create a new project
            """

    assert "Type typing.Dict[int, str] of argument name must be a Dict with str keys and not int" in str(e.value)
    with pytest.raises(InvalidMethodDefinition) as e:

        @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])
        def test_method5(name: Dict[str, object]) -> None:
            """
            Create a new project
            """

    assert (
        "Type object of argument name must be a either BaseModel, Enum, UUID, str, float, int, StrictNonIntBool, datetime, "
        "bytes or a List of these types or a Dict with str keys and values of these types."
    ) in str(e.value)

    # A definition with only supported annotations: declaring it must not raise
    @protocol.typedmethod(path="/service_types/<service_type>", operation="DELETE", client_types=["api"])
    def lcm_service_type_delete(tid: uuid.UUID, service_type: str) -> None:
        """Delete an existing service type."""


def test_optional():
    """
    A typed method with an Optional argument that defaults to None can be declared without raising
    """

    @protocol.typedmethod(path="/service_types/<service_type>", operation="DELETE", client_types=["api"])
    def lcm_service_type_delete(tid: uuid.UUID, service_type: str, version: Optional[str] = None) -> None:
        """Delete an existing service type."""


async def test_union_types(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test use of union types"""
    configure(unused_tcp_port, database_name, postgres_db.port)
    SimpleTypes = Union[float, int, StrictBool, str]  # NOQA
    AttributeTypes = Union[SimpleTypes, List[SimpleTypes], Dict[str, SimpleTypes]]  # NOQA

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])
        def test_method(data: SimpleTypes, version: Optional[int] = None) -> List[SimpleTypes]:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: SimpleTypes, version: Optional[int] = None) -> List[SimpleTypes]:  # NOQA
            if isinstance(data, list):
                return data
            return [data]

        @protocol.typedmethod(path="/testp", operation="POST", client_types=["api"])
        def test_methodp(data: AttributeTypes, version: Optional[int] = None) -> List[SimpleTypes]:  # NOQA
            pass

        @protocol.handle(test_methodp)
        async def test_methodp(self, data: AttributeTypes, version: Optional[int] = None) -> List[SimpleTypes]:  # NOQA
            if isinstance(data, list):
                return data
            return [data]

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    result = await client.test_methodp(data=[5], version=7)
    assert result.code == 200
    assert len(result.result["data"]) == 1
    assert 5 == result.result["data"][0]
    result = await client.test_method(data=5, version=3)
    assert result.code == 200
    assert len(result.result["data"]) == 1
    assert 5 == result.result["data"][0]
    result = await client.test_method(data=5)
    assert result.code == 200
    assert len(result.result["data"]) == 1
    assert 5 == result.result["data"][0]
    result = await client.test_method(data=5, version=7)
    assert result.code == 200
    assert len(result.result["data"]) == 1
    assert 5 == result.result["data"][0]


async def test_basemodel_validation(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """Test validation of basemodel arguments and return, and how they are reported"""
    configure(unused_tcp_port, database_name, postgres_db.port)

    class Project(BaseModel):
        name: str
        value: str

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])
        def test_method(data: Project) -> Project:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, data: Project) -> Project:  # NOQA
            return Project()

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)
    client = protocol.Client("client")
    # Check validation of arguments
    result = await client.test_method(data={})
    assert result.code == 400
    assert "error_details" in result.result
    details = result.result["error_details"]["validation_errors"]
    assert len(details) == 2
    name = [d for d in details if d["loc"] == ["data", "name"]][0]
    value = [d for d in details if d["loc"] == ["data", "value"]][0]
    # NOTE(review): the visible part of this file stops here, in the middle of a statement that starts with the
    # keyword `assert`; the remainder of this test lies outside the reviewed chunk and is not reconstructed.
name["msg"] == "field required"846    assert value["msg"] == "field required"847    # Check the validation of the return value848    result = await client.test_method(data={"name": "X", "value": "Y"})849    assert result.code == 500850    assert "data validation error" in result.result["message"]851async def test_ACOA_header(server):852    """853    Test if the server accepts gzipped encoding and returns gzipped encoding.854    """855    port = opt.get_bind_port()856    url = f"http://localhost:{port}/api/v1/environment"857    request = HTTPRequest(url=url, method="GET")858    client = AsyncHTTPClient()859    response = await client.fetch(request)860    assert response.code == 200861    assert response.headers.get("Access-Control-Allow-Origin") is None862    config.Config.set("server", "access-control-allow-origin", "*")863    response = await client.fetch(request)864    assert response.code == 200865    assert response.headers.get("Access-Control-Allow-Origin") == "*"866async def test_multi_version_method(unused_tcp_port, postgres_db, database_name, async_finalizer):867    """Test multi version methods"""868    configure(unused_tcp_port, database_name, postgres_db.port)869    class Project(BaseModel):870        name: str871        value: str872    class ProjectServer(ServerSlice):873        @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"], api_version=3)874        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=2, envelope_key="data")875        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=1, envelope_key="project")876        def test_method(project: Project) -> Project:  # NOQA877            pass878        @protocol.handle(test_method)879        async def test_method(self, project: Project) -> Project:  # NOQA880            return project881    rs = Server()882    server = ProjectServer(name="projectserver")883    rs.add_slice(server)884    await 
rs.start()885    async_finalizer.add(server.stop)886    async_finalizer.add(rs.stop)887    # rest call888    port = opt.get_bind_port()889    request = HTTPRequest(890        url=f"http://localhost:{port}/api/v1/test", method="POST", body=json_encode({"project": {"name": "a", "value": "b"}})891    )892    client = AsyncHTTPClient()893    response = await client.fetch(request)894    assert response.code == 200895    body = json.loads(response.body)896    assert "project" in body897    request = HTTPRequest(898        url=f"http://localhost:{port}/api/v2/test", method="POST", body=json_encode({"project": {"name": "a", "value": "b"}})899    )900    client = AsyncHTTPClient()901    response = await client.fetch(request)902    assert response.code == 200903    body = json.loads(response.body)904    assert "data" in body905    request = HTTPRequest(906        url=f"http://localhost:{port}/api/v3/test2", method="POST", body=json_encode({"project": {"name": "a", "value": "b"}})907    )908    client = AsyncHTTPClient()909    response = await client.fetch(request)910    assert response.code == 200911    body = json.loads(response.body)912    assert "data" in body913    # client based calls914    client = protocol.Client("client")915    response = await client.test_method(project=Project(name="a", value="b"))916    assert response.code == 200917    assert "project" in response.result918    client = protocol.Client("client", version_match=VersionMatch.highest)919    response = await client.test_method(project=Project(name="a", value="b"))920    assert response.code == 200921    assert "data" in response.result922    client = protocol.Client("client", version_match=VersionMatch.exact, exact_version=1)923    response = await client.test_method(project=Project(name="a", value="b"))924    assert response.code == 200925    assert "project" in response.result926    client = protocol.Client("client", version_match=VersionMatch.exact, exact_version=2)927    response = await 
client.test_method(project=Project(name="a", value="b"))928    assert response.code == 200929    assert "data" in response.result930async def test_multi_version_handler(unused_tcp_port, postgres_db, database_name, async_finalizer):931    """Test multi version methods"""932    configure(unused_tcp_port, database_name, postgres_db.port)933    class Project(BaseModel):934        name: str935        value: str936    class ProjectServer(ServerSlice):937        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=2, envelope_key="data")938        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=1, envelope_key="project")939        def test_method(project: Project) -> Project:  # NOQA940            pass941        @protocol.handle(test_method, api_version=1)942        async def test_methodX(self, project: Project) -> Project:  # NOQA943            return Project(name="v1", value="1")944        @protocol.handle(test_method, api_version=2)945        async def test_methodY(self, project: Project) -> Project:  # NOQA946            return Project(name="v2", value="2")947    rs = Server()948    server = ProjectServer(name="projectserver")949    rs.add_slice(server)950    await rs.start()951    async_finalizer.add(server.stop)952    async_finalizer.add(rs.stop)953    # client based calls954    client = protocol.Client("client")955    response = await client.test_method(project=Project(name="a", value="b"))956    assert response.code == 200957    assert "project" in response.result958    assert response.result["project"]["name"] == "v1"959    client = protocol.Client("client", version_match=VersionMatch.highest)960    response = await client.test_method(project=Project(name="a", value="b"))961    assert response.code == 200962    assert "data" in response.result963    assert response.result["data"]["name"] == "v2"964async def test_simple_return_type(unused_tcp_port, postgres_db, database_name, 
async_finalizer):965    """Test methods with simple return types"""966    configure(unused_tcp_port, database_name, postgres_db.port)967    class ProjectServer(ServerSlice):968        @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])969        def test_method(project: str) -> str:  # NOQA970            pass971        @protocol.handle(test_method)972        async def test_methodY(self, project: str) -> str:  # NOQA973            return project974    rs = Server()975    server = ProjectServer(name="projectserver")976    rs.add_slice(server)977    await rs.start()978    async_finalizer.add(server.stop)979    async_finalizer.add(rs.stop)980    # client based calls981    client = protocol.Client("client")982    response = await client.test_method(project="x")983    assert response.code == 200984    assert response.result["data"] == "x"985async def test_html_content_type(unused_tcp_port, postgres_db, database_name, async_finalizer):986    """Test whether API endpoints with a text/html content-type work."""987    configure(unused_tcp_port, database_name, postgres_db.port)988    html_content = "<html><body>test</body></html>"989    @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])990    def test_method() -> ReturnValue[str]:  # NOQA991        pass992    class TestServer(ServerSlice):993        @protocol.handle(test_method)994        async def test_methodY(self) -> ReturnValue[str]:  # NOQA995            return ReturnValue(response=html_content, content_type=HTML_CONTENT)996    rs = Server()997    server = TestServer(name="testserver")998    rs.add_slice(server)999    await rs.start()1000    async_finalizer.add(server.stop)1001    async_finalizer.add(rs.stop)1002    # client based calls1003    client = protocol.Client("client")1004    response = await client.test_method()1005    assert response.code == 2001006    assert response.result == html_content1007async def test_html_content_type_with_utf8_encoding(unused_tcp_port, 
postgres_db, database_name, async_finalizer):1008    """Test whether API endpoints with a "text/html; charset=UTF-8" content-type work."""1009    configure(unused_tcp_port, database_name, postgres_db.port)1010    html_content = "<html><body>test</body></html>"1011    @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1012    def test_method() -> ReturnValue[str]:  # NOQA1013        pass1014    class TestServer(ServerSlice):1015        @protocol.handle(test_method)1016        async def test_methodY(self) -> ReturnValue[str]:  # NOQA1017            return ReturnValue(response=html_content, content_type=HTML_CONTENT_WITH_UTF8_CHARSET)1018    rs = Server()1019    server = TestServer(name="testserver")1020    rs.add_slice(server)1021    await rs.start()1022    async_finalizer.add(server.stop)1023    async_finalizer.add(rs.stop)1024    # client based calls1025    client = protocol.Client("client")1026    response = await client.test_method()1027    assert response.code == 2001028    assert response.result == html_content1029async def test_octet_stream_content_type(unused_tcp_port, postgres_db, database_name, async_finalizer):1030    """Test whether API endpoints with an application/octet-stream content-type work."""1031    configure(unused_tcp_port, database_name, postgres_db.port)1032    byte_stream = b"test123"1033    @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1034    def test_method() -> ReturnValue[bytes]:  # NOQA1035        pass1036    class TestServer(ServerSlice):1037        @protocol.handle(test_method)1038        async def test_methodY(self) -> ReturnValue[bytes]:  # NOQA1039            return ReturnValue(response=byte_stream, content_type=OCTET_STREAM_CONTENT)1040    rs = Server()1041    server = TestServer(name="testserver")1042    rs.add_slice(server)1043    await rs.start()1044    async_finalizer.add(server.stop)1045    async_finalizer.add(rs.stop)1046    # client based calls1047    client = 
protocol.Client("client")1048    response = await client.test_method()1049    assert response.code == 2001050    assert response.result == byte_stream1051async def test_zip_content_type(unused_tcp_port, postgres_db, database_name, async_finalizer):1052    """Test whether API endpoints with an application/zip content-type work."""1053    configure(unused_tcp_port, database_name, postgres_db.port)1054    zip_content = b"test123"1055    @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1056    def test_method() -> ReturnValue[bytes]:  # NOQA1057        pass1058    class TestServer(ServerSlice):1059        @protocol.handle(test_method)1060        async def test_methodY(self) -> ReturnValue[bytes]:  # NOQA1061            return ReturnValue(response=zip_content, content_type=ZIP_CONTENT)1062    rs = Server()1063    server = TestServer(name="testserver")1064    rs.add_slice(server)1065    await rs.start()1066    async_finalizer.add(server.stop)1067    async_finalizer.add(rs.stop)1068    # client based calls1069    client = protocol.Client("client")1070    response = await client.test_method()1071    assert response.code == 2001072    assert response.result == zip_content1073@pytest.fixture1074async def options_server():1075    @protocol.typedmethod(path="/test", operation="OPTIONS", client_types=["api"])1076    def test_method() -> ReturnValue[str]:  # NOQA1077        pass1078    class TestServer(ServerSlice):1079        @protocol.handle(test_method)1080        async def test_methodY(self) -> ReturnValue[str]:  # NOQA1081            return ReturnValue(response="content")1082    return TestServer(name="testserver")1083@pytest.fixture1084def options_request(unused_tcp_port):1085    return HTTPRequest(1086        url=f"http://localhost:{unused_tcp_port}/api/v1/test",1087        method="OPTIONS",1088        connect_timeout=1.0,1089        request_timeout=1.0,1090        decompress_response=True,1091    )1092@pytest.mark.parametrize("auth_enabled, 
auth_header_allowed", [(True, True), (False, False)])1093async def test_auth_enabled_options_method(1094    auth_enabled,1095    auth_header_allowed,1096    unused_tcp_port,1097    postgres_db,1098    database_name,1099    async_finalizer,1100    options_server,1101    options_request,1102):1103    configure(unused_tcp_port, database_name, postgres_db.port)1104    config.Config.set("server", "auth", str(auth_enabled))1105    rs = Server()1106    rs.add_slice(options_server)1107    await rs.start()1108    async_finalizer.add(options_server.stop)1109    async_finalizer.add(rs.stop)1110    client = AsyncHTTPClient()1111    response = await client.fetch(options_request)1112    assert response.code == 2001113    assert ("Authorization" in response.headers.get("Access-Control-Allow-Headers")) == auth_header_allowed1114async def test_required_header_not_present(server):1115    client = AsyncHTTPClient()1116    response = await client.fetch(f"http://localhost:{server_bind_port.get()}/api/v2/environment_settings", raise_error=False)1117    assert response.code == 4001118async def test_malformed_json(server):1119    """1120    Tests sending malformed json to the server1121    """1122    port = opt.get_bind_port()1123    url = f"http://localhost:{port}/api/v2/environment"1124    request = HTTPRequest(url=url, method="PUT", body='{"name": env}')1125    client = AsyncHTTPClient()1126    response = await client.fetch(request, raise_error=False)1127    assert response.code == 4001128    assert (1129        json.loads(response.body)["message"]1130        == "The request body couldn't be decoded as a JSON: Expecting value: line 1 column 10 (char 9)"1131    )1132async def test_tuple_index_out_of_range(unused_tcp_port, postgres_db, database_name, async_finalizer):1133    configure(unused_tcp_port, database_name, postgres_db.port)1134    class Project(BaseModel):1135        name: str1136        value: str1137    class ProjectServer(ServerSlice):1138        @protocol.typedmethod(1139   
         api_prefix="test", path="/project/<project>", operation="GET", arg_options=ENV_OPTS, client_types=["api"]1140        )1141        def test_method(1142            tid: uuid.UUID, project: str, include_deleted: bool = False1143        ) -> List[Union[uuid.UUID, Project, bool]]:  # NOQA1144            pass1145        @protocol.handle(test_method)1146        async def test_method(1147            tid: uuid.UUID, project: Project, include_deleted: bool = False1148        ) -> List[Union[uuid.UUID, Project, bool]]:  # NOQA1149            return [tid, project, include_deleted]1150    rs = Server()1151    server = ProjectServer(name="projectserver")1152    rs.add_slice(server)1153    await rs.start()1154    async_finalizer.add(server.stop)1155    async_finalizer.add(rs.stop)1156    port = opt.get_bind_port()1157    url = f"http://localhost:{port}/test/v1/project/afcb51dc-1043-42b6-bb99-b4fc88603126"1158    request = HTTPRequest(url=url, method="GET")1159    client = AsyncHTTPClient()1160    response = await client.fetch(request, raise_error=False)1161    assert response.code == 4001162    assert json.loads(response.body)["message"] == "Invalid request: Field 'tid' is required."1163async def test_multiple_path_params(unused_tcp_port, postgres_db, database_name, async_finalizer):1164    configure(unused_tcp_port, database_name, postgres_db.port)1165    class ProjectServer(ServerSlice):1166        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1167        def test_method(id: str, name: str, age: int) -> str:  # NOQA1168            pass1169        @protocol.handle(test_method)1170        async def test_methodY(self, id: str, name: str, age: int) -> str:  # NOQA1171            return name1172    rs = Server()1173    server = ProjectServer(name="projectserver")1174    rs.add_slice(server)1175    await rs.start()1176    async_finalizer.add(server.stop)1177    async_finalizer.add(rs.stop)1178    request = 
MethodProperties.methods["test_method"][0].build_call(args=[], kwargs={"id": "1", "name": "monty", "age": 42})1179    assert request.url == "/api/v1/test/1/monty?age=42"1180async def test_2151_method_header_parameter_in_body(async_finalizer) -> None:1181    async def _id(x: object, dct: Dict[str, str]) -> object:1182        return x1183    @protocol.method(1184        path="/testmethod",1185        operation="POST",1186        arg_options={"header_param": ArgOption(header="X-Inmanta-Header-Param", getter=_id)},1187        client_types=[const.ClientType.api],1188    )1189    def test_method(header_param: str, body_param: str) -> None:1190        """1191        A method used for testing.1192        """1193    class TestSlice(ServerSlice):1194        @protocol.handle(test_method)1195        async def test_method_implementation(self, header_param: str, body_param: str) -> None:1196            pass1197    server: Server = Server()1198    server_slice: ServerSlice = TestSlice("my_test_slice")1199    server.add_slice(server_slice)1200    await server.start()1201    async_finalizer.add(server_slice.stop)1202    async_finalizer.add(server.stop)1203    client = tornado.httpclient.AsyncHTTPClient()1204    # valid request should succeed1205    request = tornado.httpclient.HTTPRequest(1206        url=f"http://localhost:{opt.get_bind_port()}/api/v1/testmethod",1207        method="POST",1208        body=json_encode({"body_param": "body_param_value"}),1209        headers={"X-Inmanta-Header-Param": "header_param_value"},1210    )1211    response: tornado.httpclient.HTTPResponse = await client.fetch(request)1212    assert response.code == 2001213    # invalid request should fail1214    request = tornado.httpclient.HTTPRequest(1215        url=f"http://localhost:{opt.get_bind_port()}/api/v1/testmethod",1216        method="POST",1217        body=json_encode({"header_param": "header_param_value", "body_param": "body_param_value"}),1218    )1219    with 
pytest.raises(tornado.httpclient.HTTPClientError):1220        await client.fetch(request)1221@pytest.mark.parametrize("return_value,valid", [(1, True), (None, True), ("Hello World!", False)])1222async def test_2277_typedmethod_return_optional(async_finalizer, return_value: object, valid: bool) -> None:1223    @protocol.typedmethod(1224        path="/typedtestmethod",1225        operation="GET",1226        client_types=[const.ClientType.api],1227        api_version=1,1228    )1229    def test_method_typed() -> Optional[int]:1230        """1231        A typedmethod used for testing.1232        """1233    class TestSlice(ServerSlice):1234        @protocol.handle(test_method_typed)1235        async def test_method_typed_implementation(self) -> Optional[int]:1236            return return_value  # type: ignore1237    server: Server = Server()1238    server_slice: ServerSlice = TestSlice("my_test_slice")1239    server.add_slice(server_slice)1240    await server.start()1241    async_finalizer.add(server_slice.stop)1242    async_finalizer.add(server.stop)1243    client: protocol.Client = protocol.Client("client")1244    response: Result = await client.test_method_typed()1245    if valid:1246        assert response.code == 2001247        assert response.result == {"data": return_value}1248    else:1249        assert response.code == 4001250def test_method_strict_exception() -> None:1251    with pytest.raises(InvalidMethodDefinition, match="Invalid type for argument arg: Any type is not allowed in strict mode"):1252        @protocol.typedmethod(path="/testmethod", operation="POST", client_types=[const.ClientType.api])1253        def test_method(arg: Any) -> None:1254            pass1255async def test_method_nonstrict_allowed(async_finalizer) -> None:1256    @protocol.typedmethod(path="/zipsingle", operation="POST", client_types=[const.ClientType.api], strict_typing=False)1257    def merge_dicts(one: Dict[str, Any], other: Dict[str, int], any_arg: Any) -> Dict[str, Any]:1258   
     """1259        Merge two dicts.1260        """1261    class TestSlice(ServerSlice):1262        @protocol.handle(merge_dicts)1263        async def merge_dicts_impl(self, one: Dict[str, Any], other: Dict[str, int], any_arg: Any) -> Dict[str, Any]:1264            return {**one, **other}1265    server: Server = Server()1266    server_slice: ServerSlice = TestSlice("my_test_slice")1267    server.add_slice(server_slice)1268    await server.start()1269    async_finalizer.add(server_slice.stop)1270    async_finalizer.add(server.stop)1271    client: protocol.Client = protocol.Client("client")1272    one: Dict[str, Any] = {"my": {"nested": {"keys": 42}}}1273    other: Dict[str, int] = {"single_level": 42}1274    response: Result = await client.merge_dicts(one, other, None)1275    assert response.code == 2001276    assert response.result == {"data": {**one, **other}}1277@pytest.mark.parametrize(1278    "param_type,param_value,expected_url",1279    [1280        (1281            Dict[str, str],1282            {"a": "b", "c": "d", ",&?=%": ",&?=%."},1283            "/api/v1/test/1/monty?filter.a=b&filter.c=d&filter.%2C%26%3F%3D%25=%2C%26%3F%3D%25.",1284        ),1285        (1286            Dict[str, List[str]],1287            {"a": ["b"], "c": ["d", "e"], "g": ["h"]},1288            "/api/v1/test/1/monty?filter.a=b&filter.c=d&filter.c=e&filter.g=h",1289        ),1290        (1291            Dict[str, List[str]],1292            {"a": ["b"], "c": ["d", "e"], ",&?=%": [",&?=%", "f"], ".g.h": ["i"]},1293            "/api/v1/test/1/monty?filter.a=b&filter.c=d&filter.c=e"1294            "&filter.%2C%26%3F%3D%25=%2C%26%3F%3D%25&filter.%2C%26%3F%3D%25=f&filter..g.h=i",1295        ),1296        (1297            List[str],1298            [1299                "a ",1300                "b,",1301                "c",1302            ],1303            "/api/v1/test/1/monty?filter=a+&filter=b%2C&filter=c",1304        ),1305        (1306            List[str],1307            ["a", "b", 
",&?=%", "c", "."],1308            "/api/v1/test/1/monty?filter=a&filter=b&filter=%2C%26%3F%3D%25&filter=c&filter=.",1309        ),1310        (List[str], ["a ", "b", "c", ","], "/api/v1/test/1/monty?filter=a+&filter=b&filter=c&filter=%2C"),1311    ],1312)1313async def test_dict_list_get_roundtrip(1314    unused_tcp_port, postgres_db, database_name, async_finalizer, param_type, param_value, expected_url1315):1316    configure(unused_tcp_port, database_name, postgres_db.port)1317    class ProjectServer(ServerSlice):1318        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"], strict_typing=False)1319        def test_method(id: str, name: str, filter: param_type) -> Any:  # NOQA1320            pass1321        @protocol.handle(test_method)1322        async def test_method(self, id: str, name: str, filter: param_type) -> Any:  # NOQA1323            return filter1324    rs = Server()1325    server = ProjectServer(name="projectserver")1326    rs.add_slice(server)1327    await rs.start()1328    async_finalizer.add(server.stop)1329    async_finalizer.add(rs.stop)1330    request = MethodProperties.methods["test_method"][0].build_call(1331        args=[], kwargs={"id": "1", "name": "monty", "filter": param_value}1332    )1333    assert request.url == expected_url1334    client: protocol.Client = protocol.Client("client")1335    response: Result = await client.test_method(1, "monty", filter=param_value)1336    assert response.code == 2001337    assert response.result["data"] == param_value1338async def test_dict_get_optional(unused_tcp_port, postgres_db, database_name, async_finalizer):1339    configure(unused_tcp_port, database_name, postgres_db.port)1340    class ProjectServer(ServerSlice):1341        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1342        def test_method(id: str, name: str, filter: Optional[Dict[str, str]] = None) -> str:  # NOQA1343            pass1344        
@protocol.handle(test_method)1345        async def test_method(self, id: str, name: str, filter: Optional[Dict[str, str]] = None) -> str:  # NOQA1346            return ",".join(filter.keys()) if filter is not None else ""1347    rs = Server()1348    server = ProjectServer(name="projectserver")1349    rs.add_slice(server)1350    await rs.start()1351    async_finalizer.add(server.stop)1352    async_finalizer.add(rs.stop)1353    request = MethodProperties.methods["test_method"][0].build_call(1354        args=[], kwargs={"id": "1", "name": "monty", "filter": {"a": "b"}}1355    )1356    assert request.url == "/api/v1/test/1/monty?filter.a=b"1357    client: protocol.Client = protocol.Client("client")1358    response: Result = await client.test_method(1, "monty", filter={"a": "b", "c": "d"})1359    assert response.code == 2001360    assert response.result["data"] == "a,c"1361    response: Result = await client.test_method(1, "monty")1362    assert response.code == 2001363    assert response.result["data"] == ""1364async def test_dict_list_nested_get_optional(unused_tcp_port, postgres_db, database_name, async_finalizer):1365    configure(unused_tcp_port, database_name, postgres_db.port)1366    class ProjectServer(ServerSlice):1367        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1368        def test_method(id: str, name: str, filter: Optional[Dict[str, List[str]]] = None) -> str:  # NOQA1369            pass1370        @protocol.handle(test_method)1371        async def test_method(self, id: str, name: str, filter: Optional[Dict[str, List[str]]] = None) -> str:  # NOQA1372            return ",".join(filter.keys()) if filter is not None else ""1373    rs = Server()1374    server = ProjectServer(name="projectserver")1375    rs.add_slice(server)1376    await rs.start()1377    async_finalizer.add(server.stop)1378    async_finalizer.add(rs.stop)1379    request = MethodProperties.methods["test_method"][0].build_call(1380        args=[], 
kwargs={"id": "1", "name": "monty", "filter": {"a": ["b"]}}1381    )1382    assert request.url == "/api/v1/test/1/monty?filter.a=b"1383    client: protocol.Client = protocol.Client("client")1384    response: Result = await client.test_method(1, "monty", filter={"a": "b", "c": ["d", "e"]})1385    assert response.code == 2001386    assert response.result["data"] == "a,c"1387    response: Result = await client.test_method(1, "monty")1388    assert response.code == 2001389    assert response.result["data"] == ""1390@pytest.mark.parametrize(1391    "param_type,expected_error_message",1392    [1393        (1394            Dict[str, Dict[str, str]],1395            "nested dictionaries and union types for dictionary values are not supported for GET requests",1396        ),1397        (1398            Dict[str, Union[str, List[str]]],1399            "nested dictionaries and union types for dictionary values are not supported for GET requests",1400        ),1401        (List[Dict[str, str]], "lists of dictionaries and lists of lists are not supported for GET requests"),1402        (List[List[str]], "lists of dictionaries and lists of lists are not supported for GET requests"),1403    ],1404)1405async def test_dict_list_get_invalid(1406    unused_tcp_port, postgres_db, database_name, async_finalizer, param_type, expected_error_message1407):1408    configure(unused_tcp_port, database_name, postgres_db.port)1409    with pytest.raises(InvalidMethodDefinition) as e:1410        class ProjectServer(ServerSlice):1411            @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1412            def test_method(id: str, name: str, filter: param_type) -> str:  # NOQA1413                pass1414            @protocol.handle(test_method)1415            async def test_method(self, id: str, name: str, filter: param_type) -> str:  # NOQA1416                return ""1417        assert expected_error_message in str(e)1418async def 
test_list_get_optional(unused_tcp_port, postgres_db, database_name, async_finalizer):1419    configure(unused_tcp_port, database_name, postgres_db.port)1420    class ProjectServer(ServerSlice):1421        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1422        def test_method(id: str, name: str, sort: Optional[List[int]] = None) -> str:  # NOQA1423            pass1424        @protocol.typedmethod(path="/test_uuid/<id>", operation="GET", client_types=["api"])1425        def test_method_uuid(id: str, sort: Optional[List[uuid.UUID]] = None) -> str:  # NOQA1426            pass1427        @protocol.handle(test_method)1428        async def test_method(self, id: str, name: str, sort: Optional[List[int]] = None) -> str:  # NOQA1429            return str(sort) if sort else ""1430        @protocol.handle(test_method_uuid)1431        async def test_method_uuid(self, id: str, sort: Optional[List[uuid.UUID]] = None) -> str:  # NOQA1432            return str(sort) if sort else ""1433    rs = Server()1434    server = ProjectServer(name="projectserver")1435    rs.add_slice(server)1436    await rs.start()1437    async_finalizer.add(server.stop)1438    async_finalizer.add(rs.stop)1439    request = MethodProperties.methods["test_method"][0].build_call(1440        args=[], kwargs={"id": "1", "name": "monty", "sort": [1, 2]}1441    )1442    assert request.url == "/api/v1/test/1/monty?sort=1&sort=2"1443    client: protocol.Client = protocol.Client("client")1444    response: Result = await client.test_method(1, "monty", sort=[1, 2])1445    assert response.code == 2001446    assert response.result["data"] == "[1, 2]"1447    response: Result = await client.test_method(1, "monty")1448    assert response.code == 2001449    assert response.result["data"] == ""1450    uuids = [uuid.uuid4(), uuid.uuid4()]1451    request = MethodProperties.methods["test_method_uuid"][0].build_call(args=[], kwargs={"id": "1", "sort": uuids})1452    assert request.url == 
f"/api/v1/test_uuid/1?sort={uuids[0]}&sort={uuids[1]}"


async def test_dicts_multiple_get(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test a GET method that takes two dict parameters: one with list values and one with scalar values.

    Verifies both the query string produced by build_call (dotted keys, repeated for list values) and
    a full round trip through the Python client.
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])
        def test_method(id: str, name: str, filter: Dict[str, List[str]], another_filter: Dict[str, str]) -> str:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(
            self, id: str, name: str, filter: Dict[str, List[str]], another_filter: Dict[str, str]
        ) -> str:  # NOQA
            # Echo the keys of both dicts so the test can check that each one arrived intact.
            return ",".join(chain(filter.keys(), another_filter.keys()))

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)

    request = MethodProperties.methods["test_method"][0].build_call(
        args=[], kwargs={"id": "1", "name": "monty", "filter": {"a": ["b", "c"]}, "another_filter": {"d": "e"}}
    )
    assert request.url == "/api/v1/test/1/monty?filter.a=b&filter.a=c&another_filter.d=e"

    client: protocol.Client = protocol.Client("client")
    response: Result = await client.test_method(1, "monty", filter={"a": ["b"], "c": ["d", "e"]}, another_filter={"x": "y"})
    assert response.code == 200
    assert response.result["data"] == "a,c,x"


async def test_dict_list_get_by_url(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test how dict and list GET parameters are parsed when the query string is written by hand.

    The requests bypass the Python client and hit the REST endpoint directly, so that malformed
    query strings (which the client would never generate) can be checked for a 400 response.
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])
        def test_method(id: str, name: str, filter: Dict[str, str]) -> str:  # NOQA
            pass

        @protocol.typedmethod(path="/test_list/<id>", operation="GET", client_types=["api"])
        def test_method_list(id: str, filter: List[int]) -> str:  # NOQA
            pass

        @protocol.typedmethod(path="/test_dict_of_lists/<id>", operation="GET", client_types=["api"])
        def test_method_dict_of_lists(id: str, filter: Dict[str, List[str]]) -> str:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, id: str, name: str, filter: Dict[str, str]) -> str:  # NOQA
            return ",".join(filter.keys())

        @protocol.handle(test_method_list)
        async def test_method_list(self, id: str, filter: List[int]) -> str:  # NOQA
            return str(filter)

        @protocol.handle(test_method_dict_of_lists)
        async def test_method_dict_of_lists(self, id: str, filter: Dict[str, List[str]]) -> str:  # NOQA
            return ",".join(filter.keys())

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)

    client = AsyncHTTPClient()
    # All requests below target the same server; build the common prefix once.
    base_url = f"http://localhost:{server_bind_port.get()}/api/v1"

    # Dict[str, str] parameter: a key that is empty after the dot must be rejected.
    response = await client.fetch(f"{base_url}/test/1/monty?filter.=b", raise_error=False)
    assert response.code == 400

    # ... also when it appears next to a well-formed key.
    response = await client.fetch(f"{base_url}/test/1/monty?filter.a=b&filter.=c", raise_error=False)
    assert response.code == 400

    # Dict[str, str] parameter: the same key given twice must be rejected.
    response = await client.fetch(f"{base_url}/test/1/monty?filter.a=b&filter.a=c", raise_error=False)
    assert response.code == 400

    # List[int] parameter: dotted (dict-style) keys must be rejected.
    response = await client.fetch(f"{base_url}/test_list/1?filter.a=b&filter.=c", raise_error=False)
    assert response.code == 400

    # Integer should also work
    response = await client.fetch(f"{base_url}/test_list/1?filter=42&filter=45", raise_error=False)
    assert response.code == 200

    # list nested in dict
    response = await client.fetch(
        f"{base_url}/test_dict_of_lists/1?filter.a=42&filter.a=55&filter.b=e",
        raise_error=False,
    )
    assert response.code == 200

    # Values containing commas (and a literal "%2C") are accepted; url_concat takes care of the encoding.
    filter_with_comma = {"filter.a": "42,55,%2C70", "filter.b": "e"}
    url = url_concat(f"{base_url}/test_dict_of_lists/1", filter_with_comma)
    response = await client.fetch(url, raise_error=False)
    assert response.code == 200

    # A dangling "filter." without key or value must be rejected.
    response = await client.fetch(
        f"{base_url}/test_dict_of_lists/1?filter.a=42&filter.a=55&filter.&filter.c=a",
        raise_error=False,
    )
    assert response.code == 400

    # Reserved URL characters are accepted in both key and value once properly encoded.
    filter_with_special_chars = {"filter.a": "b", "filter.c": "e", "filter.,&?=%": ",&?=%"}
    url = url_concat(f"{base_url}/test/1/monty", filter_with_special_chars)
    response = await client.fetch(url, raise_error=False)
    assert response.code == 200


async def test_api_datetime_utc(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test API input and output conversion for timestamps. Objects should be either timezone-aware or implicit UTC.
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    # Use a fixed non-UTC offset so that aware, UTC and naive representations are all distinguishable.
    timezone: datetime.timezone = datetime.timezone(datetime.timedelta(hours=2))
    now: datetime.datetime = datetime.datetime.now().astimezone(timezone)
    naive_utc: datetime.datetime = now.astimezone(datetime.timezone.utc).replace(tzinfo=None)

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])
        def test_method(timestamp: datetime.datetime) -> List[datetime.datetime]:
            pass

        @protocol.handle(test_method)
        async def test_method(self, timestamp: datetime.datetime) -> List[datetime.datetime]:
            # The handler must always receive an aware timestamp denoting the same instant as `now`.
            assert timestamp.tzinfo is not None
            assert timestamp == now
            # Return the same instant in three representations: aware (+02:00), aware UTC and naive UTC.
            return [
                now,
                now.astimezone(datetime.timezone.utc),
                now.astimezone(datetime.timezone.utc).replace(tzinfo=None),
            ]

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)

    client: protocol.Client = protocol.Client("client")

    response: Result = await client.test_method(timestamp=now)
    assert response.code == 200
    # Whatever representation the handler returned, every value in the response parses to naive UTC.
    assert all(pydantic.parse_obj_as(datetime.datetime, timestamp) == naive_utc for timestamp in response.result["data"])

    response = await client.test_method(timestamp=now.astimezone(datetime.timezone.utc))
    assert response.code == 200

    response = await client.test_method(timestamp=now.astimezone(datetime.timezone.utc).replace(tzinfo=None))
    assert response.code == 200

    # Naive local (+02:00) wall-clock time is not the same instant as `now` when read as implicit UTC,
    # so the handler's own assertion is expected to fail and surface as a server error.
    response = await client.test_method(timestamp=now.replace(tzinfo=None))
    assert response.code == 500

    # Test REST API without going through Python client
    port = opt.get_bind_port()
    http_client = AsyncHTTPClient()

    async def request(timestamp: datetime.datetime) -> tornado.httpclient.HTTPResponse:
        """Send `timestamp`, ISO-formatted and URL-quoted, to the test endpoint as a raw GET request."""
        http_request = HTTPRequest(
            url=(
                f"http://localhost:{port}/api/v1/test?timestamp="
                f"{urllib.parse.quote(timestamp.isoformat(timespec='microseconds'))}"
            ),
            method="GET",
        )
        return await http_client.fetch(http_request)

    response = await request(now)
    assert response.code == 200

    response = await request(now.astimezone(datetime.timezone.utc).replace(tzinfo=None))
    assert response.code == 200

    # fetch() is called without raise_error=False here, so the error response raises instead of being returned.
    with pytest.raises(tornado.httpclient.HTTPClientError):
        response = await request(now.replace(tzinfo=None))


async def test_dict_of_list(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test that a handler returning a dict whose values are lists of pydantic models is serialized correctly.
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class APydanticType(BaseModel):
        attr: int

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="GET", client_types=[const.ClientType.api])
        def test_method(id: str) -> Dict[str, List[APydanticType]]:
            pass

        @protocol.handle(test_method)
        async def test_method(self, id: str) -> Dict[str, List[APydanticType]]:
            return {id: [APydanticType(attr=1), APydanticType(attr=5)]}

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)

    client: protocol.Client = protocol.Client("client")
    result = await client.test_method(id="test")
    assert result.code == 200, result.result["message"]
    # The nested models come back as plain dicts.
    assert result.result["data"] == {"test": [{"attr": 1}, {"attr": 5}]}


async def test_return_value_with_meta(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test that a handler returning ReturnValueWithMeta exposes its metadata in the response,
    and that warnings only show up in that metadata when the handler added some.
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])
        def test_method(with_warning: bool) -> ReturnValueWithMeta[str]:  # NOQA
            pass

        @protocol.handle(test_method)
        async def test_method(self, with_warning: bool) -> ReturnValueWithMeta:  # NOQA
            metadata = {"additionalInfo": f"Today's bitcoin exchange rate is: {(random.random() * 100000):.2f}$"}
            result = ReturnValueWithMeta(response="abcd", metadata=metadata)
            if with_warning:
                result.add_warnings(["Warning message"])
            return result

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)

    client: protocol.Client = protocol.Client("client")

    # Without warnings: metadata is present, but carries no "warnings" entry.
    response = await client.test_method(False)
    assert response.code == 200
    assert response.result["data"] == "abcd"
    assert response.result["metadata"].get("additionalInfo") is not None
    assert response.result["metadata"].get("warnings") is None

    # With warnings: the "warnings" entry appears next to the custom metadata.
    response = await client.test_method(True)
    assert response.code == 200
    assert response.result["data"] == "abcd"
    assert response.result["metadata"].get("additionalInfo") is not None
    assert response.result["metadata"].get("warnings") is not None


async def test_kwargs(unused_tcp_port, postgres_db, database_name, async_finalizer):
    """
    Test that a method declared with varkw=True passes additional keyword arguments on to its handler.
    """
    configure(unused_tcp_port, database_name, postgres_db.port)

    class ProjectServer(ServerSlice):
        @protocol.typedmethod(path="/test", operation="POST", client_types=[ClientType.api], varkw=True)
        def test_method(id: str, **kwargs: object) -> Dict[str, str]:  # NOQA
            """
            Create a new project
            """

        @protocol.handle(test_method)
        async def test_method(self, id: str, **kwargs: object) -> Dict[str, str]:
            # Echo the two extra arguments back as strings so the test can inspect what arrived.
            return {"name": str(kwargs["name"]), "value": str(kwargs["value"])}

    rs = Server()
    server = ProjectServer(name="projectserver")
    rs.add_slice(server)
    await rs.start()
    async_finalizer.add(server.stop)
    async_finalizer.add(rs.stop)

    client = protocol.Client("client")
    result = await client.test_method(id="test", name="test", value=True)
    assert result.code == 200
    assert result.result["data"]["name"] == "test"
...
plugin.py
Source:plugin.py  
...85                res = await gen_obj.__anext__()86                return res87            def finalizer():88                """Yield again, to finalize."""89                async def async_finalizer():90                    try:91                        await gen_obj.__anext__()92                    except StopAsyncIteration:93                        pass94                    else:95                        msg = "Async generator fixture didn't stop."96                        msg += "Yield only once."97                        raise ValueError(msg)98                loop.run_until_complete(async_finalizer())99            request.addfinalizer(finalizer)100            return loop.run_until_complete(setup())101        fixturedef.func = wrapper102    elif inspect.iscoroutinefunction(fixturedef.func):103        coro = fixturedef.func104        fixture_stripper = FixtureStripper(fixturedef)105        fixture_stripper.add(FixtureStripper.EVENT_LOOP)106        def wrapper(*args, **kwargs):107            loop = fixture_stripper.get_and_strip_from(FixtureStripper.EVENT_LOOP, kwargs)108            async def setup():109                res = await coro(*args, **kwargs)110                return res111            return loop.run_until_complete(setup())112        fixturedef.func = wrapper...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
