Best Python code snippet using hypothesis
test_worker.py
Source:test_worker.py  
1# -*- encoding: utf-8 -*-2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may4# not use this file except in compliance with the License. You may obtain5# a copy of the License at6#7#      http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations13# under the License.14import asyncio15import sys16import time17from unittest import mock18from freezegun import freeze_time19import httpx20import pytest21from mergify_engine import exceptions22from mergify_engine import logs23from mergify_engine import utils24from mergify_engine import worker25from mergify_engine.clients import http26if sys.version_info < (3, 8):27    # https://github.com/pytest-dev/pytest-asyncio/issues/6928    pytest.skip(29        "mock + pytest-asyncio requires python3.8 or higher",30        allow_module_level=True,31    )32@pytest.fixture()33async def redis():34    r = await utils.create_aredis_for_stream()35    await r.flushdb()36    try:37        yield r38    finally:39        await r.flushdb()40        r.connection_pool.max_idle_time = 041        r.connection_pool.disconnect()42        await utils.stop_pending_aredis_tasks()43async def run_worker():44    w = worker.Worker()45    w.start()46    timeout = 1047    started_at = time.monotonic()48    while (49        w._redis is None or (await w._redis.zcard("streams")) > 050    ) and time.monotonic() - started_at < timeout:51        await asyncio.sleep(0.5)52    w.stop()53    await w.wait_shutdown_complete()54@pytest.mark.asyncio55@mock.patch("mergify_engine.worker.run_engine")56async def test_worker_with_waiting_tasks(run_engine, redis, logger_checker):57    stream_names = []58    for installation_id in range(8):59        for pull_number in range(2):60            for data in range(3):61                owner = f"owner-{installation_id}"62                repo = f"repo-{installation_id}"63                stream_names.append(f"stream~owner-{installation_id}")64                await worker.push(65                    redis,66                    owner,67                    repo,68                    pull_number,69                    "pull_request",70                    {"payload": data},71                )72    # Check everything we push are in redis73    assert 8 == (await redis.zcard("streams"))74    assert 8 == len(await redis.keys("stream~*"))75    for stream_name in stream_names:76        assert 6 == (await redis.xlen(stream_name))77    await run_worker()78    # Check redis is empty79    assert 0 == (await redis.zcard("streams"))80    assert 0 == len(await redis.keys("stream~*"))81    assert 0 == len(await redis.hgetall("attempts"))82    # Check engine have been run with expect data83    assert 16 == len(run_engine.mock_calls)84    assert (85        mock.call(86            "owner-0",87            "repo-0",88            0,89            [90                {91                    "event_type": "pull_request",92                    "data": {"payload": 0},93                    "timestamp": mock.ANY,94                },95                {96                    "event_type": "pull_request",97                    "data": {"payload": 1},98                    "timestamp": mock.ANY,99                },100                {101                    "event_type": "pull_request",102                    "data": {"payload": 2},103                    "timestamp": mock.ANY,104                },105            ],106        )107        in run_engine.mock_calls108    )109@pytest.mark.asyncio110@mock.patch("mergify_engine.worker.run_engine")111@mock.patch("mergify_engine.clients.github.aget_client")112@mock.patch("mergify_engine.github_events.extract_pull_numbers_from_event")113async def test_worker_expanded_events(114    extract_pull_numbers_from_event,115    aget_client,116    run_engine,117    redis,118    logger_checker,119):120    client = mock.Mock(121        name="foo",122        owner="owner",123        repo="repo",124        auth=mock.Mock(installation={"id": 12345}, owner="owner", repo="repo"),125    )126    client.__aenter__ = mock.AsyncMock(return_value=client)127    client.__aexit__ = mock.AsyncMock()128    client.items.return_value = mock.AsyncMock()129    aget_client.return_value = client130    extract_pull_numbers_from_event.return_value = [123, 456, 789]131    await worker.push(132        redis,133        "owner",134        "repo",135        123,136        "pull_request",137        {"payload": "whatever"},138    )139    await worker.push(140        redis,141        "owner",142        "repo",143        None,144        "comment",145        {"payload": "foobar"},146    )147    assert 1 == (await redis.zcard("streams"))148    assert 1 == len(await redis.keys("stream~*"))149    assert 2 == (await redis.xlen("stream~owner"))150    await run_worker()151    # Check redis is empty152    assert 0 == (await redis.zcard("streams"))153    assert 0 == len(await redis.keys("stream~*"))154    assert 0 == len(await redis.hgetall("attempts"))155    # Check engine have been run with expect data156    assert 3 == len(run_engine.mock_calls)157    assert run_engine.mock_calls[0] == mock.call(158        "owner",159        "repo",160        123,161        [162            {163                "event_type": "pull_request",164                "data": {"payload": "whatever"},165                "timestamp": mock.ANY,166            },167            {168                "event_type": "comment",169                "data": {"payload": "foobar"},170                "timestamp": mock.ANY,171            },172        ],173    )174    assert run_engine.mock_calls[1] == mock.call(175        "owner",176        "repo",177        456,178        [179            {180                "event_type": "comment",181                "data": {"payload": "foobar"},182                "timestamp": mock.ANY,183            },184        ],185    )186    assert run_engine.mock_calls[2] == mock.call(187        "owner",188        "repo",189        789,190        [191            {192                "event_type": "comment",193                "data": {"payload": "foobar"},194                "timestamp": mock.ANY,195            },196        ],197    )198@pytest.mark.asyncio199@mock.patch("mergify_engine.worker.run_engine")200async def test_worker_with_one_task(run_engine, redis, logger_checker):201    await worker.push(202        redis,203        "owner",204        "repo",205        123,206        "pull_request",207        {"payload": "whatever"},208    )209    await worker.push(210        redis,211        "owner",212        "repo",213        123,214        "comment",215        {"payload": "foobar"},216    )217    assert 1 == (await redis.zcard("streams"))218    assert 1 == len(await redis.keys("stream~*"))219    assert 2 == (await redis.xlen("stream~owner"))220    await run_worker()221    # Check redis is empty222    assert 0 == (await redis.zcard("streams"))223    assert 0 == len(await redis.keys("stream~*"))224    assert 0 == len(await redis.hgetall("attempts"))225    # Check engine have been run with expect data226    assert 1 == len(run_engine.mock_calls)227    assert run_engine.mock_calls[0] == mock.call(228        "owner",229        "repo",230        123,231        [232            {233                "event_type": "pull_request",234                "data": {"payload": "whatever"},235                "timestamp": mock.ANY,236            },237            {238                "event_type": "comment",239                "data": {"payload": "foobar"},240                "timestamp": mock.ANY,241            },242        ],243    )244@pytest.mark.asyncio245@mock.patch("mergify_engine.worker.run_engine")246async def test_consume_unexisting_stream(run_engine, redis, logger_checker):247    p = worker.StreamProcessor(redis)248    await p.consume("stream~notexists")249    assert len(run_engine.mock_calls) == 0250@pytest.mark.asyncio251@mock.patch("mergify_engine.worker.run_engine")252async def test_consume_good_stream(run_engine, redis, logger_checker):253    await worker.push(254        redis,255        "owner",256        "repo",257        123,258        "pull_request",259        {"payload": "whatever"},260    )261    await worker.push(262        redis,263        "owner",264        "repo",265        123,266        "comment",267        {"payload": "foobar"},268    )269    assert 1 == (await redis.zcard("streams"))270    assert 1 == len(await redis.keys("stream~*"))271    assert 2 == await redis.xlen("stream~owner")272    assert 0 == len(await redis.hgetall("attempts"))273    p = worker.StreamProcessor(redis)274    await p.consume("stream~owner")275    assert len(run_engine.mock_calls) == 1276    assert run_engine.mock_calls[0] == mock.call(277        "owner",278        "repo",279        123,280        [281            {282                "event_type": "pull_request",283                "data": {"payload": "whatever"},284                "timestamp": mock.ANY,285            },286            {287                "event_type": "comment",288                "data": {"payload": "foobar"},289                "timestamp": mock.ANY,290            },291        ],292    )293    # Check redis is empty294    assert 0 == (await redis.zcard("streams"))295    assert 0 == len(await redis.keys("stream~*"))296    assert 0 == len(await redis.hgetall("attempts"))297@pytest.mark.asyncio298@mock.patch("mergify_engine.worker.daiquiri.getLogger")299@mock.patch("mergify_engine.worker.run_engine")300async def test_stream_processor_retrying_pull(run_engine, logger_class, redis):301    logs.setup_logging()302    logger = logger_class.return_value303    # One retries once, the other reaches max_retry304    run_engine.side_effect = [305        exceptions.MergeableStateUnknown(mock.Mock()),306        exceptions.MergeableStateUnknown(mock.Mock()),307        mock.Mock(),308        exceptions.MergeableStateUnknown(mock.Mock()),309        exceptions.MergeableStateUnknown(mock.Mock()),310    ]311    await worker.push(312        redis,313        "owner",314        "repo",315        123,316        "pull_request",317        {"payload": "whatever"},318    )319    await worker.push(320        redis,321        "owner",322        "repo",323        42,324        "comment",325        {"payload": "foobar"},326    )327    assert 1 == (await redis.zcard("streams"))328    assert 1 == len(await redis.keys("stream~*"))329    assert 2 == await redis.xlen("stream~owner")330    assert 0 == len(await redis.hgetall("attempts"))331    p = worker.StreamProcessor(redis)332    await p.consume("stream~owner")333    assert len(run_engine.mock_calls) == 2334    assert run_engine.mock_calls == [335        mock.call(336            "owner",337            "repo",338            123,339            [340                {341                    "event_type": "pull_request",342                    "data": {"payload": "whatever"},343                    "timestamp": mock.ANY,344                },345            ],346        ),347        mock.call(348            "owner",349            "repo",350            42,351            [352                {353                    "event_type": "comment",354                    "data": {"payload": "foobar"},355                    "timestamp": mock.ANY,356                },357            ],358        ),359    ]360    # Check stream still there and attempts recorded361    assert 1 == (await redis.zcard("streams"))362    assert 1 == len(await redis.keys("stream~*"))363    assert {364        b"pull~owner~repo~42": b"1",365        b"pull~owner~repo~123": b"1",366    } == await redis.hgetall("attempts")367    await p.consume("stream~owner")368    assert 1 == (await redis.zcard("streams"))369    assert 1 == len(await redis.keys("stream~*"))370    assert 1 == len(await redis.hgetall("attempts"))371    assert len(run_engine.mock_calls) == 4372    assert {b"pull~owner~repo~42": b"2"} == await redis.hgetall("attempts")373    await p.consume("stream~owner")374    assert len(run_engine.mock_calls) == 5375    # Too many retries, everything is gone376    assert 3 == len(logger.info.mock_calls)377    assert 1 == len(logger.error.mock_calls)378    assert logger.info.mock_calls[0].args == (379        "failed to process pull request, retrying",380    )381    assert logger.info.mock_calls[1].args == (382        "failed to process pull request, retrying",383    )384    assert logger.error.mock_calls[0].args == (385        "failed to process pull request, abandoning",386    )387    assert 0 == (await redis.zcard("streams"))388    assert 0 == len(await redis.keys("stream~*"))389    assert 0 == len(await redis.hgetall("attempts"))390@pytest.mark.asyncio391@mock.patch.object(worker, "LOG")392@mock.patch("mergify_engine.worker.run_engine")393async def test_stream_processor_retrying_stream_recovered(run_engine, logger, redis):394    logs.setup_logging()395    response = mock.Mock()396    response.json.return_value = {"message": "boom"}397    response.status_code = 401398    run_engine.side_effect = http.HTTPClientSideError(399        message="foobar", request=response.request, response=response400    )401    await worker.push(402        redis,403        "owner",404        "repo",405        123,406        "pull_request",407        {"payload": "whatever"},408    )409    await worker.push(410        redis,411        "owner",412        "repo",413        123,414        "comment",415        {"payload": "foobar"},416    )417    assert 1 == (await redis.zcard("streams"))418    assert 1 == len(await redis.keys("stream~*"))419    assert 2 == await redis.xlen("stream~owner")420    assert 0 == len(await redis.hgetall("attempts"))421    p = worker.StreamProcessor(redis)422    await p.consume("stream~owner")423    assert len(run_engine.mock_calls) == 1424    assert run_engine.mock_calls[0] == mock.call(425        "owner",426        "repo",427        123,428        [429            {430                "event_type": "pull_request",431                "data": {"payload": "whatever"},432                "timestamp": mock.ANY,433            },434            {435                "event_type": "comment",436                "data": {"payload": "foobar"},437                "timestamp": mock.ANY,438            },439        ],440    )441    # Check stream still there and attempts recorded442    assert 1 == (await redis.zcard("streams"))443    assert 1 == len(await redis.keys("stream~*"))444    assert 1 == len(await redis.hgetall("attempts"))445    assert {b"stream~owner": b"1"} == await redis.hgetall("attempts")446    run_engine.side_effect = None447    await p.consume("stream~owner")448    assert len(run_engine.mock_calls) == 2449    assert 0 == (await redis.zcard("streams"))450    assert 0 == len(await redis.keys("stream~*"))451    assert 0 == len(await redis.hgetall("attempts"))452    assert 1 == len(logger.info.mock_calls)453    assert 0 == len(logger.error.mock_calls)454    assert logger.info.mock_calls[0].args == ("failed to process stream, retrying",)455@pytest.mark.asyncio456@mock.patch.object(worker, "LOG")457@mock.patch("mergify_engine.worker.run_engine")458async def test_stream_processor_retrying_stream_failure(run_engine, logger, redis):459    logs.setup_logging()460    response = mock.Mock()461    response.json.return_value = {"message": "boom"}462    response.status_code = 401463    run_engine.side_effect = http.HTTPClientSideError(464        message="foobar", request=response.request, response=response465    )466    await worker.push(467        redis,468        "owner",469        "repo",470        123,471        "pull_request",472        {"payload": "whatever"},473    )474    await worker.push(475        redis,476        "owner",477        "repo",478        123,479        "comment",480        {"payload": "foobar"},481    )482    assert 1 == (await redis.zcard("streams"))483    assert 1 == len(await redis.keys("stream~*"))484    assert 2 == await redis.xlen("stream~owner")485    assert 0 == len(await redis.hgetall("attempts"))486    p = worker.StreamProcessor(redis)487    await p.consume("stream~owner")488    assert len(run_engine.mock_calls) == 1489    assert run_engine.mock_calls[0] == mock.call(490        "owner",491        "repo",492        123,493        [494            {495                "event_type": "pull_request",496                "data": {"payload": "whatever"},497                "timestamp": mock.ANY,498            },499            {500                "event_type": "comment",501                "data": {"payload": "foobar"},502                "timestamp": mock.ANY,503            },504        ],505    )506    # Check stream still there and attempts recorded507    assert 1 == (await redis.zcard("streams"))508    assert 1 == len(await redis.keys("stream~*"))509    assert 1 == len(await redis.hgetall("attempts"))510    assert {b"stream~owner": b"1"} == await redis.hgetall("attempts")511    await p.consume("stream~owner")512    assert len(run_engine.mock_calls) == 2513    assert {b"stream~owner": b"2"} == await redis.hgetall("attempts")514    await p.consume("stream~owner")515    assert len(run_engine.mock_calls) == 3516    # Still there517    assert 3 == len(logger.info.mock_calls)518    assert 0 == len(logger.error.mock_calls)519    assert logger.info.mock_calls[0].args == ("failed to process stream, retrying",)520    assert logger.info.mock_calls[1].args == ("failed to process stream, retrying",)521    assert logger.info.mock_calls[2].args == ("failed to process stream, retrying",)522    assert 1 == (await redis.zcard("streams"))523    assert 1 == len(await redis.keys("stream~*"))524    assert 1 == len(await redis.hgetall("attempts"))525@pytest.mark.asyncio526@mock.patch("mergify_engine.worker.daiquiri.getLogger")527@mock.patch("mergify_engine.worker.run_engine")528async def test_stream_processor_pull_unexpected_error(run_engine, logger_class, redis):529    logs.setup_logging()530    logger = logger_class.return_value531    run_engine.side_effect = Exception532    await worker.push(533        redis,534        "owner",535        "repo",536        123,537        "pull_request",538        {"payload": "whatever"},539    )540    p = worker.StreamProcessor(redis)541    await p.consume("stream~owner")542    await p.consume("stream~owner")543    # Exception have been logged, redis must be clean544    assert len(run_engine.mock_calls) == 2545    assert len(logger.error.mock_calls) == 2546    assert logger.error.mock_calls[0].args == ("failed to process pull request",)547    assert logger.error.mock_calls[1].args == ("failed to process pull request",)548    assert 1 == (await redis.zcard("streams"))549    assert 1 == len(await redis.keys("stream~*"))550    assert 0 == len(await redis.hgetall("attempts"))551@pytest.mark.asyncio552@mock.patch("mergify_engine.worker.run_engine")553async def test_stream_processor_date_scheduling(run_engine, redis, logger_checker):554    # Don't process it before 2040555    with freeze_time("2040-01-01"):556        await worker.push(557            redis,558            "owner1",559            "repo",560            123,561            "pull_request",562            {"payload": "whatever"},563        )564        unwanted_owner_id = "owner1"565    with freeze_time("2020-01-01"):566        await worker.push(567            redis,568            "owner2",569            "repo",570            321,571            "pull_request",572            {"payload": "foobar"},573        )574        wanted_owner_id = "owner2"575    assert 2 == (await redis.zcard("streams"))576    assert 2 == len(await redis.keys("stream~*"))577    assert 0 == len(await redis.hgetall("attempts"))578    s = worker.StreamSelector(1, redis)579    p = worker.StreamProcessor(redis)580    received = []581    def fake_engine(owner, repo, pull_number, sources):582        received.append(owner)583    run_engine.side_effect = fake_engine584    with freeze_time("2020-01-14"):585        async with s.next_stream() as stream_name:586            assert stream_name is not None587            await p.consume(stream_name)588    assert 1 == (await redis.zcard("streams"))589    assert 1 == len(await redis.keys("stream~*"))590    assert 0 == len(await redis.hgetall("attempts"))591    assert received == [wanted_owner_id]592    with freeze_time("2030-01-14"):593        async with s.next_stream() as stream_name:594            assert stream_name is None595    assert 1 == (await redis.zcard("streams"))596    assert 1 == len(await redis.keys("stream~*"))597    assert 0 == len(await redis.hgetall("attempts"))598    assert received == [wanted_owner_id]599    # We are in 2041, we have something todo :)600    with freeze_time("2041-01-14"):601        async with s.next_stream() as stream_name:602            assert stream_name is not None603            await p.consume(stream_name)604    assert 0 == (await redis.zcard("streams"))605    assert 0 == len(await redis.keys("stream~*"))606    assert 0 == len(await redis.hgetall("attempts"))607    assert received == [wanted_owner_id, unwanted_owner_id]608@pytest.mark.asyncio609async def test_worker_debug_report(redis, logger_checker):610    stream_names = []611    for installation_id in range(8):612        for pull_number in range(2):613            for data in range(3):614                owner = f"owner-{installation_id}"615                repo = f"repo-{installation_id}"616                stream_names.append(f"stream~owner-{installation_id}")617                await worker.push(618                    redis,619                    owner,620                    repo,621                    pull_number,622                    "pull_request",623                    {"payload": data},624                )625    await worker.async_status()626@pytest.mark.asyncio627@mock.patch("mergify_engine.worker.run_engine")628async def test_stream_processor_retrying_after_read_error(run_engine, redis):629    response = mock.Mock()630    response.json.return_value = {"message": "boom"}631    response.status_code = 503632    run_engine.side_effect = httpx.ReadError(633        "Server disconnected while attempting read",634        request=mock.Mock(),635    )636    p = worker.StreamProcessor(redis)637    with pytest.raises(worker.StreamRetry):638        await p._run_engine_and_translate_exception_to_retries(639            "stream-owner", "owner", "repo", 1234, []...test_merge.py
Source:test_merge.py  
...41        self.git("fetch", "--all")42        p_ready, _ = self.create_pr(base_repo="main")43        self.add_label(p_need_rebase, "ready")44        self.add_label(p_ready, "ready")45        self.run_engine()46        return p_need_rebase, p_ready47    def test_merge_smart_ordered(self):48        p_need_rebase, p_ready = self._do_test_smart_order("smart+ordered")49        ctxt = context.Context(self.cli_integration, p_need_rebase.raw_data, {})50        q = queue.Queue.from_context(ctxt)51        pulls_in_queue = q.get_pulls()52        assert pulls_in_queue == [p_need_rebase.number, p_ready.number]53    def test_merge_smart_unordered(self):54        p_need_rebase, p_ready = self._do_test_smart_order("smart+fastpath")55        ctxt = context.Context(self.cli_integration, p_need_rebase.raw_data, {})56        q = queue.Queue.from_context(ctxt)57        pulls_in_queue = q.get_pulls()58        assert pulls_in_queue == [p_need_rebase.number]59        p_ready.update()60        assert p_ready.merged61    def test_merge_smart_legacy(self):62        p_need_rebase, p_ready = self._do_test_smart_order("smart")63        ctxt = context.Context(self.cli_integration, p_need_rebase.raw_data, {})64        q = queue.Queue.from_context(ctxt)65        pulls_in_queue = q.get_pulls()66        assert pulls_in_queue == [p_need_rebase.number, p_ready.number]67    def test_merge_priority(self):68        rules = {69            "pull_request_rules": [70                {71                    "name": "Merge priority high",72                    "conditions": [73                        f"base={self.master_branch_name}",74                        "label=high",75                        "status-success=continuous-integration/fake-ci",76                    ],77                    "actions": {78                        "merge": {"strict": "smart+ordered", "priority": "high"}79                    },80                },81                {82                    "name": "Merge priority default",83                    "conditions": [84                        f"base={self.master_branch_name}",85                        "label=medium",86                        "status-success=continuous-integration/fake-ci",87                    ],88                    "actions": {"merge": {"strict": "smart+ordered"}},89                },90                {91                    "name": "Merge priority low",92                    "conditions": [93                        f"base={self.master_branch_name}",94                        "label=low",95                        "status-success=continuous-integration/fake-ci",96                    ],97                    "actions": {"merge": {"strict": "smart+ordered", "priority": 1}},98                },99            ]100        }101        self.setup_repo(yaml.dump(rules))102        p_high, _ = self.create_pr()103        p_medium, _ = self.create_pr()104        p_low, _ = self.create_pr()105        # To force others to be rebased106        p, _ = self.create_pr()107        p.merge()108        self.wait_for("pull_request", {"action": "closed"}),109        self.run_engine()110        # Merge them in reverse priority to ensure there are reordered111        self.add_label(p_low, "low")112        self.create_status(p_low)113        self.add_label(p_medium, "medium")114        self.create_status(p_medium)115        self.add_label(p_high, "high")116        self.create_status(p_high)117        self.run_engine()118        ctxt = context.Context(self.cli_integration, p.raw_data, {})119        q = queue.Queue.from_context(ctxt)120        pulls_in_queue = q.get_pulls()121        assert pulls_in_queue == [p_high.number, p_medium.number, p_low.number]122        # Each PR can rebased, because we insert them in reserve order, but they are still123        # all in queue124        self.wait_for("pull_request", {"action": "synchronize"})125        self.wait_for("pull_request", {"action": "synchronize"})126        self.wait_for("pull_request", {"action": "synchronize"})127        self.run_engine()128        p_high.update()129        self.create_status(p_high)130        self.run_engine()  # PR merged, refresh emitted on next PR131        self.wait_for("pull_request", {"action": "closed"})132        self.run_engine()  # exec the refresh133        self.wait_for("pull_request", {"action": "synchronize"})134        self.run_engine()135        p_medium.update()136        self.create_status(p_medium)137        self.run_engine()  # PR merged, refresh emitted on next PR138        self.wait_for("pull_request", {"action": "closed"})139        self.run_engine()  # exec the refresh140        self.wait_for("pull_request", {"action": "synchronize"})141        self.run_engine()142        p_low.update()143        self.create_status(p_low)144        self.run_engine()  # PR merged, refresh emitted on next PR145        self.wait_for("pull_request", {"action": "closed"})146        p_low = p_low.base.repo.get_pull(p_low.number)147        p_medium = p_medium.base.repo.get_pull(p_medium.number)148        p_high = p_high.base.repo.get_pull(p_high.number)149        self.assertEqual(True, p_low.merged)150        self.assertEqual(True, p_medium.merged)151        self.assertEqual(True, p_high.merged)152        assert p_low.merged_at > p_medium.merged_at > p_high.merged_at153    def test_merge_rule_switch(self):154        rules = {155            "pull_request_rules": [156                {157                    "name": "Merge priority high",158                    "conditions": [159                        f"base={self.master_branch_name}",160                        "label=high",161                        "status-success=continuous-integration/fake-ci",162                    ],163                    "actions": {164                        "merge": {"strict": "smart+ordered", "priority": "high"}165                    },166                },167                {168                    "name": "Merge priority medium",169                    "conditions": [170                        f"base={self.master_branch_name}",171                        "label=medium",172                        "status-success=continuous-integration/fake-ci",173                    ],174                    "actions": {"merge": {"strict": "smart+ordered"}},175                },176                {177                    "name": "Merge priority low",178                    "conditions": [179                        f"base={self.master_branch_name}",180                        "label=low",181                        "status-success=continuous-integration/fake-ci",182                    ],183                    "actions": {"merge": {"strict": "smart+ordered", "priority": 1}},184                },185            ]186        }187        self.setup_repo(yaml.dump(rules))188        p1, _ = self.create_pr()189        p2, _ = self.create_pr()190        # To force others to be rebased191        p, _ = self.create_pr()192        p.merge()193        self.wait_for("pull_request", {"action": "closed"}),194        # Merge them in reverse priority to ensure there are reordered195        self.add_label(p1, "medium")196        self.add_label(p2, "low")197        self.create_status(p1)198        self.create_status(p2)199        self.run_engine()200        ctxt = context.Context(self.cli_integration, p.raw_data, {})201        q = queue.Queue.from_context(ctxt)202        pulls_in_queue = q.get_pulls()203        assert pulls_in_queue == [p1.number, p2.number]204        p2.remove_from_labels("low")205        self.add_label(p2, "high")206        self.run_engine()207        pulls_in_queue = q.get_pulls()208        assert pulls_in_queue == [p2.number, p1.number]209    def test_merge_github_workflow(self):210        rules = {211            "pull_request_rules": [212                {213                    "name": "Merge",214                    "conditions": [f"base={self.master_branch_name}"],215                    "actions": {"merge": {"strict": "smart+ordered"}},216                },217            ]218        }219        self.setup_repo(yaml.dump(rules))220        p, _ = self.create_pr(files={".github/workflows/foo.yml": "whatever"})221        self.run_engine()222        ctxt = context.Context(self.cli_integration, p.raw_data, {})223        checks = ctxt.pull_engine_check_runs224        assert len(checks) == 2225        check = checks[1]226        assert check["conclusion"] == "action_required"227        assert check["output"]["title"] == "Pull request must be merged manually."228        assert (229            check["output"]["summary"]230            == "GitHub App like Mergify are not allowed to merge pull request where `.github/workflows` is changed.\n<br />\nThis pull request must be merged manually."231        )232    def test_merge_with_installation_token(self):233        rules = {234            "pull_request_rules": [235                {236                    "name": "merge on master",237                    "conditions": [f"base={self.master_branch_name}"],238                    "actions": {"merge": {}},239                },240            ]241        }242        self.setup_repo(yaml.dump(rules))243        p, _ = self.create_pr()244        self.run_engine()245        self.wait_for("pull_request", {"action": "closed"})246        p.update()247        self.assertEqual(True, p.merged)248        self.assertEqual("mergify-test[bot]", p.merged_by.login)249    def test_merge_with_oauth_token(self):250        rules = {251            "pull_request_rules": [252                {253                    "name": "merge on master",254                    "conditions": [f"base={self.master_branch_name}"],255                    "actions": {"merge": {"merge_bot_account": "mergify-test1"}},256                },257            ]258        }259        self.setup_repo(yaml.dump(rules))260        p, _ = self.create_pr()261        self.run_engine()262        self.wait_for("pull_request", {"action": "closed"})263        p.update()264        self.assertEqual(True, p.merged)265        self.assertEqual("mergify-test1", p.merged_by.login)266class TestMergeNoSubAction(base.FunctionalTestBase):267    SUBSCRIPTION_ACTIVE = False268    def test_merge_priority(self):269        rules = {270            "pull_request_rules": [271                {272                    "name": "Merge priority high",273                    "conditions": [274                        f"base={self.master_branch_name}",275                        "label=high",276                        "status-success=continuous-integration/fake-ci",277                    ],278                    "actions": {279                        "merge": {"strict": "smart+ordered", "priority": "high"}280                    },281                },282                {283                    "name": "Merge priority default",284                    "conditions": [285                        f"base={self.master_branch_name}",286                        "label=medium",287                        "status-success=continuous-integration/fake-ci",288                    ],289                    "actions": {"merge": {"strict": "smart+ordered"}},290                },291                {292                    "name": "Merge priority low",293                    "conditions": [294                        f"base={self.master_branch_name}",295                        "label=low",296                        "status-success=continuous-integration/fake-ci",297                    ],298                    "actions": {"merge": {"strict": "smart+ordered", "priority": 1}},299                },300            ]301        }302        self.setup_repo(yaml.dump(rules))303        p_high, _ = self.create_pr()304        p_medium, _ = self.create_pr()305        p_low, _ = self.create_pr()306        # To force others to be rebased307        p, _ = self.create_pr()308        p.merge()309        self.wait_for("pull_request", {"action": "closed"}),310        self.run_engine()311        # Merge them in reverse priority to ensure there are reordered312        self.add_label(p_low, "low")313        self.create_status(p_low)314        self.add_label(p_medium, "medium")315        self.create_status(p_medium)316        self.add_label(p_high, "high")317        self.create_status(p_high)318        self.run_engine()319        ctxt = context.Context(self.cli_integration, p.raw_data, {})320        q = queue.Queue.from_context(ctxt)321        pulls_in_queue = q.get_pulls()322        assert pulls_in_queue == [p_low.number, p_medium.number, p_high.number]323        p_low.update()324        self.create_status(p_low)325        self.run_engine()326        self.wait_for("pull_request", {"action": "synchronize"})327        self.run_engine()328        p_medium.update()329        self.create_status(p_medium)330        self.run_engine()331        self.wait_for("pull_request", {"action": "synchronize"})332        self.run_engine()333        p_high.update()334        self.create_status(p_high)335        self.run_engine()336        self.wait_for("pull_request", {"action": "closed"})337        p_low.update()338        p_medium.update()339        p_high.update()340        self.assertEqual(True, p_low.merged)341        self.assertEqual(True, p_medium.merged)342        self.assertEqual(True, p_high.merged)...test_payment_engine.py
Source:test_payment_engine.py  
...9         {'type': 'deposit', 'client': '1', 'tx': '2', 'amount': '1.0'},10         {'type': 'deposit', 'client': '2', 'tx': '3', 'amount': '234.0442'},11         {'type': 'deposit', 'client': '2', 'tx': '4', 'amount': '254.044772'}])12    pe = PaymentEngine(prepared_data)13    pe.run_engine()14    pe.print_results()15    assert captured_output.getvalue() == "client,available,held,total,locked\n" \16                                         "1,2.0,0.0,2.0,false\n2,488.089,0.0,488.089,false\n"17def test_deposit_negative():18    captured_output = capture_stdout()19    prepared_data = generate_test_data(20        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1.0'},21         {'type': 'deposit', 'client': '1', 'tx': '2', 'amount': '1.0'},22         {'type': 'deposit', 'client': '1', 'tx': '3', 'amount': '-23'},23         {'type': 'deposit', 'client': '1', 'tx': '4', 'amount': '0'}])24    pe = PaymentEngine(prepared_data)25    pe.run_engine()26    pe.print_results()27    assert captured_output.getvalue() == "client,available,held,total,locked\n1,2.0,0.0,2.0,false\n"28def test_withdraw_positive():29    captured_output = capture_stdout()30    prepared_data = generate_test_data(31        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},32         {'type': 'deposit', 'client': '2', 'tx': '2', 'amount': '1.0'},33         {'type': 'withdrawal', 'client': '1', 'tx': '3', 'amount': '1420.0'},34         {'type': 'withdrawal', 'client': '2', 'tx': '4', 'amount': '1.0'}])35    pe = PaymentEngine(prepared_data)36    pe.run_engine()37    pe.print_results()38    assert captured_output.getvalue() == "client,available,held,total,locked\n" \39                                         "1,1.0,0.0,1.0,false\n2,1.0,0.0,1.0,false\n"40def test_withdraw_negative():41    captured_output = capture_stdout()42    prepared_data = generate_test_data(43        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '12.2'},44         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '13.0'},45         {'type': 'withdrawal', 'client': '1', 'tx': '3', 'amount': '12.1'},46         {'type': 'deposit', 'client': '2', 'tx': '4', 'amount': '1'},47         {'type': 'withdrawal', 'client': '2', 'tx': '5', 'amount': '0.9999'}])48    pe = PaymentEngine(prepared_data)49    pe.run_engine()50    pe.print_results()51    assert captured_output.getvalue() == "client,available,held,total,locked\n" \52                                         "1,0.1,0.0,0.1,false\n2,0.0001,0.0,0.0001,false\n"53@pytest.mark.parametrize(('tx', 'expected_outcome'), [54    (1, "TransactionHistory(type='deposit', client=1, amount=1421.0)"),55    (2, "TransactionHistory(type='deposit', client=2, amount=1.0)")])56def test_transaction_history(tx, expected_outcome):57    prepared_data = generate_test_data(58        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},59         {'type': 'deposit', 'client': '2', 'tx': '2', 'amount': '1.0'}])60    pe = PaymentEngine(prepared_data)61    pe.run_engine()62    assert str(pe.get_transaction_by_tx(tx)) == expected_outcome63def test_dispute_after_deposit():64    captured_output = capture_stdout()65    prepared_data = generate_test_data(66        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},67         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},68         {'type': 'dispute', 'client': '1', 'tx': '2'}])69    pe = PaymentEngine(prepared_data)70    pe.run_engine()71    pe.print_results()72    assert captured_output.getvalue() == "client,available,held,total,locked\n1,1419.0,1.0,1420.0,false\n"73def test_dispute_after_withdrawal():74    captured_output = capture_stdout()75    prepared_data = generate_test_data(76        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},77         {'type': 'deposit', 'client': '1', 'tx': '2', 'amount': '1.0'},78         {'type': 'dispute', 'client': '1', 'tx': '2'}])79    pe = PaymentEngine(prepared_data)80    pe.run_engine()81    pe.print_results()82    assert captured_output.getvalue() == "client,available,held,total,locked\n1,1421.0,1.0,1422.0,false\n"83def test_dispute_by_another_client():84    captured_output = capture_stdout()85    prepared_data = generate_test_data(86        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},87         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},88         {'type': 'dispute', 'client': '2', 'tx': '2'}])89    pe = PaymentEngine(prepared_data)90    pe.run_engine()91    pe.print_results()92    assert captured_output.getvalue() == "client,available,held,total,locked\n" \93                                         "1,1420.0,0.0,1420.0,false\n2,0.0,0.0,0.0,false\n"94def test_dispute_available_below_zero():95    captured_output = capture_stdout()96    prepared_data = generate_test_data(97        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},98         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1000.0324'},99         {'type': 'dispute', 'client': '1', 'tx': '2'}])100    pe = PaymentEngine(prepared_data)101    pe.run_engine()102    pe.print_results()103    assert captured_output.getvalue() == "client,available,held,total,locked\n" \104                                         "1,-579.0648,1000.0324,420.9676,false\n"105def test_resolve_dispute_positive():106    captured_output = capture_stdout()107    prepared_data = generate_test_data(108        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},109         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},110         {'type': 'dispute', 'client': '1', 'tx': '2'},111         {'type': 'resolve', 'client': '1', 'tx': '2'}])112    pe = PaymentEngine(prepared_data)113    pe.run_engine()114    pe.print_results()115    assert captured_output.getvalue() == "client,available,held,total,locked\n1,1420.0,0.0,1420.0,false\n"116def test_resolve_dispute_by_another_client():117    captured_output = capture_stdout()118    prepared_data = generate_test_data(119        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},120         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},121         {'type': 'dispute', 'client': '1', 'tx': '2'},122         {'type': 'resolve', 'client': '2', 'tx': '2'}])123    pe = PaymentEngine(prepared_data)124    pe.run_engine()125    pe.print_results()126    assert captured_output.getvalue() == "client,available,held,total,locked\n" \127                                         "1,1419.0,1.0,1420.0,false\n2,0.0,0.0,0.0,false\n"128def test_resolve_dispute_with_wrong_tx():129    captured_output = capture_stdout()130    prepared_data = generate_test_data(131        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},132         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},133         {'type': 'dispute', 'client': '1', 'tx': '2'},134         {'type': 'resolve', 'client': '1', 'tx': '1'}])135    pe = PaymentEngine(prepared_data)136    pe.run_engine()137    pe.print_results()138    assert captured_output.getvalue() == "client,available,held,total,locked\n1,1419.0,1.0,1420.0,false\n"139def test_chargeback_dispute_by_another_client():140    captured_output = capture_stdout()141    prepared_data = generate_test_data(142        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},143         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},144         {'type': 'dispute', 'client': '1', 'tx': '2'},145         {'type': 'chargeback', 'client': '2', 'tx': '2'}])146    pe = PaymentEngine(prepared_data)147    pe.run_engine()148    pe.print_results()149    assert captured_output.getvalue() == "client,available,held,total,locked\n" \150                                         "1,1419.0,1.0,1420.0,false\n2,0.0,0.0,0.0,false\n"151def test_chargeback_dispute_with_wrong_tx():152    captured_output = capture_stdout()153    prepared_data = generate_test_data(154        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},155         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},156         {'type': 'dispute', 'client': '1', 'tx': '2'},157         {'type': 'chargeback', 'client': '1', 'tx': '1'}])158    pe = PaymentEngine(prepared_data)159    pe.run_engine()160    pe.print_results()161    assert captured_output.getvalue() == "client,available,held,total,locked\n1,1419.0,1.0,1420.0,false\n"162def test_chargeback_dispute_client_cant_operate_as_locked():163    captured_output = capture_stdout()164    prepared_data = generate_test_data(165        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},166         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},167         {'type': 'dispute', 'client': '1', 'tx': '2'},168         {'type': 'chargeback', 'client': '1', 'tx': '2'},169         {'type': 'deposit', 'client': '1', 'tx': '3', 'amount': '22.0'},170         {'type': 'withdrawal', 'client': '1', 'tx': '4', 'amount': '3.0'}])171    pe = PaymentEngine(prepared_data)172    pe.run_engine()173    pe.print_results()174    assert captured_output.getvalue() == "client,available,held,total,locked\n1,1419.0,0.0,1419.0,true\n"175def test_transaction_id_not_unique():176    captured_output = capture_stdout()177    prepared_data = generate_test_data(178        [{'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1421.0'},179         {'type': 'withdrawal', 'client': '1', 'tx': '2', 'amount': '1.0'},180         {'type': 'deposit', 'client': '1', 'tx': '1', 'amount': '1.0'},181         ])182    pe = PaymentEngine(prepared_data)183    pe.run_engine()184    pe.print_results()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
