Best Python code snippet using molotov_python
test_run.py
Source:test_run.py  
...152        self.assertTrue(wanted in stdout)153    @dedicatedloop154    def test_name(self):155        @scenario(weight=10)156        async def here_three(session):157            _RES.append(3)158        @scenario(weight=30, name="me")159        async def here_four(session):160            _RES.append(4)161        stdout, stderr, rc = self._test_molotov(162            "-cx", "--max-runs", "2", "-s", "me", "molotov.tests.test_run"163        )164        wanted = "SUCCESSES: 2"165        self.assertTrue(wanted in stdout)166        self.assertTrue(_RES, [4, 4])167    @dedicatedloop168    def test_single_mode(self):169        @scenario(weight=10)170        async def here_three(session):171            _RES.append(3)172        stdout, stderr, rc = self._test_molotov(173            "-cx", "--max-runs", "2", "-s", "here_three", "molotov.tests.test_run"174        )175        wanted = "SUCCESSES: 2"176        self.assertTrue(wanted in stdout)177    @dedicatedloop178    def test_fail_mode_pass(self):179        @scenario(weight=10)180        async def here_three(session):181            _RES.append(3)182        stdout, stderr, rc = self._test_molotov(183            "-cx",184            "--max-runs",185            "2",186            "--fail",187            "1",188            "-s",189            "here_three",190            "molotov.tests.test_run",191        )192        wanted = "SUCCESSES: 2"193        self.assertTrue(wanted in stdout)194        self.assertEqual(rc, 0)195    @dedicatedloop196    def test_fail_mode_fail(self):197        @scenario(weight=10)198        async def here_three(session):199            assert False200        stdout, stderr, rc = self._test_molotov(201            "-cx",202            "--max-runs",203            "2",204            "--fail",205            "1",206            "-s",207            "here_three",208            "molotov.tests.test_run",209        )210        self.assertEqual(rc, 1)211    @only_pypy212    @dedicatedloop213    def 
test_uvloop_pypy(self):214        @scenario(weight=10)215        async def here_three(session):216            _RES.append(3)217        orig_import = __import__218        def import_mock(name, *args):219            if name == "uvloop":220                raise ImportError()221            return orig_import(name, *args)222        with patch("builtins.__import__", side_effect=import_mock):223            stdout, stderr, rc = self._test_molotov(224                "-cx",225                "--max-runs",226                "2",227                "-s",228                "here_three",229                "--uvloop",230                "molotov.tests.test_run",231            )232        wanted = "You can't use uvloop"233        self.assertTrue(wanted in stdout)234    @skip_pypy235    @dedicatedloop236    def test_uvloop_import_error(self):237        @scenario(weight=10)238        async def here_three(session):239            _RES.append(3)240        orig_import = __import__241        def import_mock(name, *args):242            if name == "uvloop":243                raise ImportError()244            return orig_import(name, *args)245        with patch("builtins.__import__", side_effect=import_mock):246            stdout, stderr, rc = self._test_molotov(247                "-cx",248                "--max-runs",249                "2",250                "--console-update",251                "0",252                "-s",253                "here_three",254                "--uvloop",255                "molotov.tests.test_run",256            )257        wanted = "You need to install uvloop"258        self.assertTrue(wanted in stdout)259    @skip_pypy260    @dedicatedloop261    def test_uvloop(self):262        try:263            import uvloop  # noqa264        except ImportError:265            return266        @scenario(weight=10)267        async def here_three(session):268            _RES.append(3)269        stdout, stderr, rc = self._test_molotov(270            "-cx",271            
"--max-runs",272            "2",273            "-s",274            "here_three",275            "--uvloop",276            "molotov.tests.test_run",277        )278        wanted = "SUCCESSES: 2"279        self.assertTrue(wanted in stdout, stdout)280    @dedicatedloop281    def test_delay(self):282        with catch_sleep() as delay:283            @scenario(weight=10, delay=0.1)284            async def here_three(session):285                _RES.append(3)286            stdout, stderr, rc = self._test_molotov(287                "--delay",288                ".6",289                "--console-update",290                "0",291                "-cx",292                "--max-runs",293                "2",294                "-s",295                "here_three",296                "molotov.tests.test_run",297            )298            wanted = "SUCCESSES: 2"299            self.assertTrue(wanted in stdout, stdout)300            self.assertEqual(delay[:9], [1, 0.1, 1, 0.6, 1, 0.1, 1, 0.6, 1])301    @dedicatedloop302    def test_rampup(self):303        with catch_sleep() as delay:304            @scenario(weight=10)305            async def here_three(session):306                _RES.append(3)307            stdout, stderr, rc = self._test_molotov(308                "--ramp-up",309                "10",310                "--workers",311                "5",312                "--console-update",313                "0",314                "-cx",315                "--max-runs",316                "2",317                "-s",318                "here_three",319                "molotov.tests.test_run",320            )321            # workers should start every 2 seconds since322            # we have 5 workers and a ramp-up323            # the first one starts immediatly, then each worker324            # sleeps 2 seconds more.325            delay = [d for d in delay if d != 0]326            self.assertEqual(delay, [1, 2.0, 4.0, 6.0, 8.0, 1, 1])327            wanted = "SUCCESSES: 10"328         
   self.assertTrue(wanted in stdout, stdout)329    @dedicatedloop330    def test_sizing(self):331        _RES2["fail"] = 0332        _RES2["succ"] = 0333        with catch_sleep():334            @scenario()335            async def sizer(session):336                if random.randint(0, 20) == 1:337                    _RES2["fail"] += 1338                    raise AssertionError()339                else:340                    _RES2["succ"] += 1341            stdout, stderr, rc = self._test_molotov(342                "--sizing",343                "--console-update",344                "0",345                "--sizing-tolerance",346                "5",347                "-s",348                "sizer",349                "molotov.tests.test_run",350            )351        ratio = float(_RES2["fail"]) / float(_RES2["succ"]) * 100.0352        self.assertTrue(ratio < 14.75 and ratio >= 4.75, ratio)353        found = re.findall(r"obtained with (\d+) workers", stdout)354        assert int(found[0]) > 50355    @unittest.skipIf(os.name == "nt", "win32")356    @dedicatedloop357    def test_sizing_multiprocess(self):358        counters = SharedCounters("OK", "FAILED")359        with catch_sleep():360            @scenario()361            async def sizer(session):362                if random.randint(0, 10) == 1:363                    counters["FAILED"] += 1364                    raise AssertionError()365                else:366                    counters["OK"] += 1367            with set_args(368                "molotov",369                "--sizing",370                "-p",371                "2",372                "--sizing-tolerance",373                "5",374                "--console-update",375                "0",376                "-s",377                "sizer",378                "molotov.tests.test_run",379            ) as (stdout, stderr):380                try:381                    main()382                except SystemExit:383                    pass384            
stdout, stderr = stdout.read().strip(), stderr.read().strip()385            # stdout, stderr, rc = self._test_molotov()386            ratio = (387                float(counters["FAILED"].value) / float(counters["OK"].value) * 100.0388            )389            self.assertTrue(ratio >= 4.75, ratio)390    @unittest.skipIf(os.name == "nt", "win32")391    @dedicatedloop_noclose392    def test_statsd_multiprocess(self):393        test_loop = asyncio.get_event_loop()394        @scenario()395        async def staty(session):396            get_context(session).statsd.increment("yopla")397        server = UDPServer("127.0.0.1", 9999, loop=test_loop)398        _stop = asyncio.Future()399        async def stop():400            await _stop401            await server.stop()402        server_task = asyncio.ensure_future(server.run())403        stop_task = asyncio.ensure_future(stop())404        args = self._get_args()405        args.verbose = 2406        args.processes = 2407        args.max_runs = 5408        args.duration = 1000409        args.statsd = True410        args.statsd_address = "udp://127.0.0.1:9999"411        args.single_mode = "staty"412        args.scenario = "molotov.tests.test_run"413        stream = io.StringIO()414        run(args, stream=stream)415        _stop.set_result(True)416        test_loop.run_until_complete(asyncio.gather(server_task, stop_task))417        udp = server.flush()418        incrs = 0419        for line in udp:420            for el in line.split(b"\n"):421                if el.strip() == b"":422                    continue423                incrs += 1424        # two processes making 5 run each425        # we want at least 5  here426        self.assertTrue(incrs > 5)427        stream.seek(0)428        output = stream.read()429        self.assertTrue("Happy breaking!" 
in output, output)430    @dedicatedloop431    def test_timed_sizing(self):432        _RES2["fail"] = 0433        _RES2["succ"] = 0434        _RES2["messed"] = False435        with catch_sleep():436            @scenario()437            async def sizer(session):438                if get_context(session).worker_id == 200 and not _RES2["messed"]:439                    # worker 2 will mess with the timer440                    # since we're faking all timers, the current441                    # time in the test is always around 0442                    # so to have now() - get_timer() > 60443                    # we need to set a negative value here444                    # to trick it445                    set_timer(-61)446                    _RES2["messed"] = True447                    _RES2["fail"] = _RES2["succ"] = 0448                if get_context(session).worker_id > 100:449                    # starting to introduce errors passed the 100th450                    if random.randint(0, 10) == 1:451                        _RES2["fail"] += 1452                        raise AssertionError()453                    else:454                        _RES2["succ"] += 1455                # forces a switch456                await asyncio.sleep(0)457            stdout, stderr, rc = self._test_molotov(458                "--sizing",459                "--sizing-tolerance",460                "5",461                "--console-update",462                "0",463                "-cs",464                "sizer",465                "molotov.tests.test_run",466            )467        ratio = float(_RES2["fail"]) / float(_RES2["succ"]) * 100.0468        self.assertTrue(ratio < 20.0 and ratio > 4.75, ratio)469    @unittest.skipIf(os.name == "nt", "win32")470    @dedicatedloop471    def test_sizing_multiprocess_interrupted(self):472        counters = SharedCounters("OK", "FAILED")473        @scenario()474        async def sizer(session):475            if random.randint(0, 10) == 1:476             
   counters["FAILED"] += 1477                raise AssertionError()478            else:479                counters["OK"] += 1480        async def _stop():481            await asyncio.sleep(2.0)482            os.kill(os.getpid(), signal.SIGINT)483        asyncio.ensure_future(_stop())484        stdout, stderr, rc = self._test_molotov(485            "--sizing",486            "-p",487            "3",488            "--sizing-tolerance",489            "90",490            "--console-update",491            "0",492            "-s",493            "sizer",494            "molotov.tests.test_run",495        )496        self.assertTrue("Sizing was not finished" in stdout)497    @dedicatedloop498    def test_use_extension(self):499        ext = os.path.join(_HERE, "example5.py")500        @scenario(weight=10)501        async def simpletest(session):502            async with session.get("http://localhost:8888") as resp:503                assert resp.status == 200504        with coserver():505            stdout, stderr, rc = self._test_molotov(506                "-cx",507                "--max-runs",508                "1",509                "--use-extension=" + ext,510                "-s",511                "simpletest",512                "molotov.tests.test_run",513            )514        self.assertTrue("=>" in stdout)515        self.assertTrue("<=" in stdout)516    @dedicatedloop517    def test_use_extension_fail(self):518        ext = os.path.join(_HERE, "exampleIDONTEXIST.py")519        @scenario(weight=10)520        async def simpletest(session):521            async with session.get("http://localhost:8888") as resp:522                assert resp.status == 200523        with coserver():524            stdout, stderr, rc = self._test_molotov(525                "-cx",526                "--max-runs",527                "1",528                "--use-extension=" + ext,529                "-s",530                "simpletest",531                "molotov.tests.test_run",532            
)533        self.assertTrue("Cannot import" in stdout)534    @dedicatedloop535    def test_use_extension_module_name(self):536        ext = "molotov.tests.example5"537        @scenario(weight=10)538        async def simpletest(session):539            async with session.get("http://localhost:8888") as resp:540                assert resp.status == 200541        with coserver():542            stdout, stderr, rc = self._test_molotov(543                "-cx",544                "--max-runs",545                "1",546                "--use-extension=" + ext,547                "-s",548                "simpletest",549                "molotov.tests.test_run",550            )551        self.assertTrue("=>" in stdout)552        self.assertTrue("<=" in stdout)553    @dedicatedloop554    def test_use_extension_module_name_fail(self):555        ext = "IDONTEXTSIST"556        @scenario(weight=10)557        async def simpletest(session):558            async with session.get("http://localhost:8888") as resp:559                assert resp.status == 200560        with coserver():561            stdout, stderr, rc = self._test_molotov(562                "-cx",563                "--max-runs",564                "1",565                "--use-extension=" + ext,566                "-s",567                "simpletest",568                "molotov.tests.test_run",569            )570        self.assertTrue("Cannot import" in stdout)571    @dedicatedloop572    def test_quiet(self):573        @scenario(weight=10)574        async def here_three(session):575            _RES.append(3)576        stdout, stderr, rc = self._test_molotov(577            "-cx", "--max-runs", "1", "-q", "-s", "here_three", "molotov.tests.test_run"578        )579        self.assertEqual(stdout, "")580        self.assertEqual(stderr, "")581    @dedicatedloop_noclose582    def test_slow_server_force_shutdown(self):583        @scenario(weight=10)584        async def _one(session):585            async with 
session.get("http://localhost:8888/slow") as resp:586                assert resp.status == 200587                _RES.append(1)588        args = self._get_args()...__init__.py
Source:__init__.py  
"""
EXPERIMENTALLY: We render a pseudo-expandable-collapsible tree of *arbitrary*
depth for website navigation (a "nav tree").

Here's a whirlwind introduction to our "nav-tree theory", intermixed
with requirements and features:

First; a requirement, an anti-requirement, and a design decision:

1) The underlying data must itself be a tree; i.e., every node in your
   site graph data structure must have exactly one parent, except for your
   root node. Children have order.

2) There is no constraint on the max depth of your tree. This plugin will
   render well-formed and valid HTML for trees of any depth. (But you will
   hit some practical limits at some point, depending on the user agent etc.)

3) We chose not to give a visual representation of the root node of the site
   (the index/home page); because it effectively "wastes" one level of depth
   in a world of limited screen real-estate. As it works out, this is a
   non-issue because a link to the site itself is always rendered by a template
   elsewhere in our frontier theme (just because that's what the default theme
   does, is show the title of the site somewhere and makes it a link).

Two axioms of all nav trees, as we see it:

4) Every *branch* node (nodes with children) in your graph, when represented
   visually, must "splay" all its top-level children when it is "selected"
   (i.e., when it is the current page you are on) (otherwise how would
   you navigate to all the nodes of the site?).

5) There must be some discrete visual representation of which node you are
   "on" (if any) of the nodes displayed. (The only time you are not "on"
   one of the nodes visible in your nav is when you're at the root node,
   given (3).)

As a corollary of (3) and (4),

6) All level-1 nodes (nodes that are immediate children of the root node)
   are displayed at all times, no matter where you are on the site (that is,
   the root is always "splayed"). This follows from there being no visual
   representation of a root node to click to expand; but also it feels more
   natural, appearing and behaving like most other nav trees out in the wild
   in this regard.

This one is an interesting intersection of UX and practical vectors:

7) We do not accommodate "pure" branch nodes; that is, every branch node
   (node with children) will also have content of its own. This sort of stems
   from the fact that we represent our document trees using vendor frontmatter
   residing in individual articles -- parent-child relationships are
   represented through a `Children` component frontmatter, frontmatter which
   is de-facto already associated with an article (with content).
   But also this produces better UX because we never produce for a branch
   node just a blank page staring back at you with nothing but nav.
   This affects the nav tree because it will never have a branch node that is
   not also "clickable". Given this, this now leads to our intuitive-most
   solution for (5): The only node in the visible nav tree that isn't
   clickable is the page you are on.

Now, as a corollary of (2),

8) IFF necessary there's a "depth trail" of links showing the trail
   between the root and the node you are on. There is no depth trail when
   you're on the root node, a level-1 one, or a level-2 node (probably
   as a corollary of several of these points).

New in this edition (#history-B.4) we have the tree "unfurled" to *two*
levels of depth hard-codedly.

For a tree unfurled to only one level of depth, conceptually, here
is the data that goes in to the nav tree:

    - 0-N top-level nodes before
    - 0-N the depth-trail, each node clickable
    - one of:
      - IF you're ON a leaf node
        - IF depth trail is 0 length
          - the leaf you're on (not clickable)
        - OTHERWISE (the sibling splay):
          - 0-N nodes before
          - the leaf you're on (not clickable)
          - 0-N nodes after
      - OTHERWISE (and you're ON a branch node):
        - the branch you're on (not clickable)
        - splay its 1-N cx, each is clickable
    - (don't forget to close tags for each item in depth trail)
    - 0-N top-level nodes after

EXPERIMENTALLY We can distill it down to:

    - 0-N top-level nodes before
    - 0-N the depth trail
    - 0-N sibling nodes before you
    - 0 or 1 the node you're on
    - 0-N your children nodes
    - 0-N sibling nodes after you
    - 0-N top-level nodes after
"""

from dataclasses import dataclass as _dataclass

from pelican.contents import Article as pelican_Article


def _go_ham(generator):  # #testpoint
    """Index the generator's articles and give each a lazy nav-tree getter.

    Connected to the ``article_generator_finalized`` signal by `register`.
    The ``DO_SINGLE_DEPTH_NAV_TREE`` setting (default False) selects the
    legacy single-depth tree instead of the double-deep one.
    """
    do_single = generator.settings.get('DO_SINGLE_DEPTH_NAV_TREE', False)

    def build_getter(k):
        # For each article, we build the nav tree only lazily: If you have
        # 100 articles, that's 100 nav-trees you have to build, each
        # requiring various traversals and link constructions...
        def func():
            if func.x is None:
                func.x = build_nav_tree(k)
            return func.x
        func.x = None  # memo slot; filled on the first call
        return func

    big_index = _build_big_index(generator.articles, build_getter)

    # `build_nav_tree` is only bound here, after the index exists. The
    # getters above resolve the name when they are *called*, so this is safe.
    if do_single:
        build_nav_tree = _build_build_nav_tree_for_single_deep(big_index)
    else:
        build_nav_tree = _build_build_nav_tree_for_double_deep(big_index)


def _build_build_nav_tree_for_double_deep(big_index):
    """Return ``build_nav_tree(key)`` producing one of the four
    ``_DoubleDeepNavTree*`` structures, chosen by how deep `key` sits.

    "Trails" here are upward: element 0 is the parent of the selected node
    and the last element is the root.
    """

    def build_nav_tree(key):
        trail_upwards = big_index.build_trail_upwards(key)
        depth = len(trail_upwards)
        if depth:
            assert 'index' == trail_upwards[-1]
        if 2 < depth:
            return _DoubleDeepNavTreeDeeplySelected(
                **dict(case_deep(trail_upwards, key)))
        if 2 == depth:
            return _DoubleDeepNavTreeSecondLevelNodeSelected(
                **dict(case_level_two(trail_upwards, key)))
        if 1 == depth:
            return _DoubleDeepNavTreeTopNodeSelected(
                **dict(case_level_one(key)))
        assert 0 == depth
        return _DoubleDeepNavTreeNothingSelected(**dict(case_level_zero()))

    def case_deep(trail, key):
        kw = level_one_and_two_dict(trail)
        cx = kw.pop('second_level_children')
        # trail[-3] is the level-2 ancestor, trail[-2] the level-1 ancestor
        nb, here, na = nodes_before_here_nodes_after(*trail[-3:-1], cx)
        yield 'second_level_nodes_before', nb
        yield 'second_level_nodes_after', na
        title, url = here
        yield 'silo_second_level_node_title', title
        yield 'silo_second_level_node_url', url

        # You either do or don't have children
        ch_ks = children_of.get(key)

        # Skip the root, skip the first level, skip the 2nd. that's 3
        use_trail_ks = list(reversed(trail[:-3]))  # (zero length OK)

        yield from kw.items()  # common, shared boilerplate things

        # If you do have children,
        if ch_ks:
            yield 'depth_trail', title_and_urls(use_trail_ks)
            yield 'sibling_nodes_before', None  # explicit for now
            yield 'current_node_title', article_via_key[key].title
            # Splay all your children
            yield 'children_of_current_node', title_and_urls(ch_ks)
            yield 'sibling_nodes_after', None  # explicit for now
            return

        # Since you don't have children, splay your siblings
        level_N_key = trail[0]  # (trail is backwards, 0 is key of your parent)
        ks = children_of[level_N_key]
        ks_before, _, ks_after = split_into_three(ks.index(key), ks)
        yield 'depth_trail', title_and_urls(use_trail_ks)  # 0 length OK
        yield 'sibling_nodes_before', title_and_urls(ks_before)
        yield 'current_node_title', article_via_key[key].title
        yield 'children_of_current_node', None  # make it explicit for now
        yield 'sibling_nodes_after', title_and_urls(ks_after)

    def case_level_two(trail, k):
        kw = level_one_and_two_dict(trail)
        cx = kw.pop('second_level_children')
        nb, here, na = nodes_before_here_nodes_after(k, trail[0], cx)
        yield 'sibling_nodes_before', nb
        yield 'sibling_nodes_after', na
        title, _url = here
        yield 'current_node_title', title
        ch_ks = children_of.get(k)
        yield 'children_of_current_node', (
            title_and_urls(ch_ks) if ch_ks else None)
        yield from kw.items()

    def case_level_one(k):
        kw = sawtooth_dict(k)
        title, _url, cx = kw.pop('here_three')
        yield 'current_node_title', title
        yield 'children_of_current_node', cx
        yield from kw.items()

    def case_level_zero():
        yield 'double_deep_nodes', sawteeth

    # == Shared between level 2 and level 3

    def level_one_and_two_dict(trail):
        kw = sawtooth_dict(trail[-2])  # -1 is 'index', -2 is level_1_key
        title, url, cx = kw.pop('here_three')
        kw['silo_top_node_title'] = title
        kw['silo_top_node_url'] = url
        kw['second_level_children'] = cx
        return kw

    def sawtooth_dict(level_one_key):
        return dict(do_sawtooth_dict(level_one_key))

    def do_sawtooth_dict(level_one_key):
        i = offset_via_level_one_key[level_one_key]
        bef, mid, aft = split_into_three(i, sawteeth)
        yield 'double_deep_nodes_before', bef
        yield 'here_three', mid
        yield 'double_deep_nodes_after', aft

    # ==

    def nodes_before_here_nodes_after(level_Np1_key, level_N_key, cx):
        # `cx` runs parallel to children_of[level_N_key], so the child's
        # offset among its siblings is also its offset in `cx`.
        ch_ks = children_of[level_N_key]
        i = ch_ks.index(level_Np1_key)
        return split_into_three(i, cx)

    def split_into_three(i, tup):
        return tup[:i], tup[i], tup[(i+1):]

    def title_and_urls(keys):
        return tuple(title_and_url(k) for k in keys)

    def title_and_url(k):
        article = article_via_key[k]
        return article.title, article.url

    root_key, children_of, parent_of, article_via_key = big_index

    # A root without children (e.g. a single-article site) is allowed; use
    # .get so a non-empty `children_of` lacking the root can't raise KeyError.
    level_one_keys = children_of.get(root_key, ())

    offset_via_level_one_key = {k: i for i, k in enumerate(level_one_keys)}

    # One "tooth" per level-1 node: (title, url, ((title, url), ...))
    build = _make_sawtooth_builder(children_of, title_and_url)
    sawteeth = tuple(build(k) for k in level_one_keys)

    return build_nav_tree


def _make_sawtooth_builder(children_of, title_and_url):
    """Return a function mapping a key to ``(title, url, children)`` where
    `children` is a tuple of ``(title, url)`` pairs (empty for a leaf)."""

    def build_sawtooth_tooth(key):
        ch_ks = children_of.get(key, ())
        cx = tuple(title_and_url(k) for k in ch_ks)
        return (*title_and_url(key), cx)

    return build_sawtooth_tooth


def _build_build_nav_tree_for_single_deep(big_index):
    """Return ``build_nav_tree(k)`` producing a `_SingleDepthNavTree`."""

    def build_nav_tree(k):
        return _SingleDepthNavTree(**dict(do_build_nav_tree(k)))

    def do_build_nav_tree(k):
        # The legacy trail starts with `k` itself and ends with the root.
        trail_upwards = big_index.build_trail_upwards_legacy(k)

        # Edge case: root node (not in nav)
        depth = len(trail_upwards)
        if 1 == depth:
            yield 'top_level_nodes_before', of(top_level_keys)
            return

        top_level_k = trail_upwards[-2]  # -1 is always the root article
        i = top_level_keys.index(top_level_k)  # #here2
        yield 'top_level_nodes_before', of(top_level_keys[:i])
        yield 'top_level_nodes_after', of(top_level_keys[(i+1):])

        cx = children_of.get(k)

        # Are you only one level deep in the nav?
        if 2 == depth:
            yield 'current_node_title', ti(top_level_k)
            # A leaf has nothing more to show; a branch splays its children
            if cx is not None:
                yield 'children_nodes', of(cx)
            return

        # We're somewhere "deep" inside the tree, we have a depth trail
        # (the depth trail excludes self and excludes the root article)
        use_trail_ks = tuple(reversed(trail_upwards[1:-1]))
        assert use_trail_ks
        yield 'depth_trail', of(use_trail_ks)

        # If we are a leaf node and not a branch node, render our siblings
        if cx is None:
            # Siblings and self
            parent_k = trail_upwards[1]
            parent_cx = children_of[parent_k]
            i = parent_cx.index(k)
            yield 'sibling_nodes_before', of(parent_cx[:i])
            yield 'current_node_title', ti(k)
            yield 'sibling_nodes_after', of(parent_cx[(i+1):])
            return

        # Otherwise, we're a branch node, render our children
        yield 'current_node_title', ti(k)
        yield 'children_nodes', of(cx)

    def of(keys):
        return tuple(two_for(k) for k in keys)

    def two_for(k):
        article = article_via_key[k]
        return article.title, article.url

    def ti(k):
        return article_via_key[k].title

    root_key, children_of, parent_of, article_via_key = big_index

    # Allow single-article sites (root has no children), consistent with
    # the double-deep builder; a plain [] lookup raised KeyError here.
    top_level_keys = children_of.get(root_key, ())

    return build_nav_tree


# == Nav Tree Data Structures (ideally there would be just one)

@_dataclass
class _DoubleDeepNavTreeNothingSelected:
    """Nav tree when the selected article is the root."""
    double_deep_nodes: tuple
    my_silo_type = 'silo_type_nothing_selected'


@_dataclass
class _DoubleDeepNavTreeTopNodeSelected:
    """Nav tree when the selected article is a level-1 node."""
    double_deep_nodes_before: tuple
    current_node_title: str
    children_of_current_node: tuple
    double_deep_nodes_after: tuple
    my_silo_type = 'silo_type_top_node_selected'


@_dataclass
class _DoubleDeepNavTreeSecondLevelNodeSelected:
    """Nav tree when the selected article is a level-2 node."""
    double_deep_nodes_before: tuple
    silo_top_node_title: str
    silo_top_node_url: str
    sibling_nodes_before: tuple
    current_node_title: str
    children_of_current_node: tuple  # None when the node has no children
    sibling_nodes_after: tuple
    double_deep_nodes_after: tuple
    my_silo_type = 'silo_type_second_level_node_selected'


@_dataclass
class _DoubleDeepNavTreeDeeplySelected:
    """Nav tree when the selected article is deeper than level 2."""
    double_deep_nodes_before: tuple
    silo_top_node_title: str
    silo_top_node_url: str
    second_level_nodes_before: tuple
    silo_second_level_node_title: str
    silo_second_level_node_url: str
    depth_trail: tuple
    sibling_nodes_before: tuple  # None when the node has children
    current_node_title: str
    children_of_current_node: tuple  # None when the node is a leaf
    sibling_nodes_after: tuple  # None when the node has children
    second_level_nodes_after: tuple
    double_deep_nodes_after: tuple
    my_silo_type = 'silo_type_deeply_selected'


@_dataclass
class _SingleDepthNavTree:
    """Legacy nav tree; every component defaults to "absent"."""
    top_level_nodes_before: tuple = ()
    depth_trail: tuple = ()
    sibling_nodes_before: tuple = ()
    current_node_title: str = None  # stays None when the root is selected
    sibling_nodes_after: tuple = ()
    children_nodes: tuple = ()
    top_level_nodes_after: tuple = ()
    my_silo_type = 'legacy_single_depth_tree'
    # (to be correct there should be several silo types within but meh)


# == Big Index

@_dataclass
class _BigIndex:
    """The site graph: root key plus parent/children/article lookups.

    Iterating yields ``root_key, children_of, parent_of, article_via_key``
    so callers can unpack it like a 4-tuple.
    """
    root_key: str
    children_of: dict
    parent_of: dict
    article_via_key: dict

    def build_trail_upwards(self, k):
        """Return the ancestors of `k`, parent first and root last
        (`k` itself excluded; empty for the root)."""
        return tuple(self._do_build_trail_upwards(k))

    def build_trail_upwards_legacy(self, k):
        """Return `k` followed by its ancestors, root last."""
        return tuple(self._do_build_trail_upwards_legacy(k))

    def _do_build_trail_upwards(self, curr):
        curr = self.parent_of[curr]
        while curr is not None:  # #here1
            yield curr
            curr = self.parent_of[curr]

    def _do_build_trail_upwards_legacy(self, curr):
        while True:
            yield curr
            curr = self.parent_of[curr]
            if curr is None:  # #here1
                break

    def __iter__(self):
        return iter((self.root_key, self.children_of,
                     self.parent_of, self.article_via_key))


def _build_big_index(articles, build_getter):
    """Build the `_BigIndex` for `articles`.

    Tricky: traverses every article, *mutating* it by putting a nav-tree
    getter (``_do_retrieve_nav_tree``) on it, while also building the index.
    An article's key is its ``relative_source_path`` minus the ``.md``
    suffix; its ``children`` attribute, when present and non-empty, is a
    whitespace-separated list of child keys.

    Raises `_MyException` when a source path does not end in ``.md``, when
    two articles share a key, when a key is listed as a child twice, when a
    listed child does not exist, or when there is not exactly one root.
    """
    import re
    rx = re.compile(r'^(.+)\.md\Z')  # meh for now

    parent_of, children_of, article_via_key = {}, {}, {}
    forward_references = {}

    for article in articles:
        md = rx.match(article.relative_source_path)
        if md is None:
            # Previously an opaque "NoneType is not subscriptable" TypeError
            raise _MyException(f"Expected a '.md' source path, had: {article.relative_source_path!r}")  # noqa: E501
        k = md[1]
        article._do_retrieve_nav_tree = build_getter(k)  # #testpoint

        # Index the article by its key
        if k in article_via_key:
            raise _MyException(f"Multiple articles with same key: {k!r}")
        article_via_key[k] = article

        # Nothing more to do unless it has children
        s = getattr(article, 'children', None)
        if not s:
            continue

        # split() rather than split(' '): doubled or trailing spaces must
        # not produce empty-string child keys
        these = tuple(s.split())
        if not these:
            continue

        # Associate parent with children and child with parent
        children_of[k] = these
        for ch_k in these:
            if ch_k in parent_of:
                otr_k = parent_of[ch_k]
                raise _MyException(f"{ch_k!r} is child of both {otr_k!r} and {k!r}")  # noqa: E501
            parent_of[ch_k] = k
            if ch_k in article_via_key:
                continue
            forward_references[ch_k] = k

    # Did any keys appear in lists of children then ended up not existing?
    these = set(forward_references) - set(article_via_key)
    if these:
        these = tuple(sorted(these))  # normalize ordering for tests
        raise _MyException(f"unresolved fwd reference(s): {these!r}")

    # Now that everything is indexed, exactly one article may lack a parent
    these = tuple(k for k in article_via_key if k not in parent_of)
    if 1 < len(these):
        these = tuple(sorted(these))  # normalize order for tests
        raise _MyException(f"Only one can be root, the others need a parent: {these!r}")  # noqa: E501
    if not these:
        # No articles at all, or every article is somebody's child (a cycle).
        # Previously an opaque tuple-unpacking ValueError.
        raise _MyException("No root article: need exactly one article with no parent")  # noqa: E501
    root_key, = these
    parent_of[root_key] = None  # #here1

    return _BigIndex(root_key, children_of, parent_of, article_via_key)


def retrieve_nav_tree(self):
    """Property getter installed on pelican's Article as ``nav_tree``."""
    return self._do_retrieve_nav_tree()


assert not hasattr(pelican_Article, 'nav_tree')
pelican_Article.nav_tree = property(retrieve_nav_tree)


def register():
    """Pelican plugin entry point: hook `_go_ham` onto the
    ``article_generator_finalized`` signal."""
    from pelican import signals
    signals.article_generator_finalized.connect(_go_ham)


_MyException = RuntimeError


def xx(msg=None):
    """Always raise RuntimeError ``'ohai'`` or ``'ohai: <msg>'``."""
    raise RuntimeError(''.join(('ohai', *((': ', msg) if msg else ()))))

# #history-B.4 double-deep
# ...
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
