Best Python code snippet using playwright-python
generate_api.py
Source:generate_api.py  
...
    if str(get_origin(value)) == "<class 'list'>":
        return ["mapping.from_impl_list(", ")"]
    if str(get_origin(value)) == "<class 'dict'>":
        return ["mapping.from_impl_dict(", ")"]
    return ["mapping.from_impl(", ")"]

header = """
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...
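The generator above picks a wrapper expression based on the declared return annotation: typing.get_origin() reports the bare container type for generic aliases, which is what the str(...) comparison matches. Below is a minimal standalone sketch of that behaviour; it is not taken from the playwright-python sources, and the "Frame" forward reference is only an illustrative placeholder.

# Illustrative only: what typing.get_origin returns for the kinds of
# annotations the generator branches on; "Frame" is a placeholder.
from typing import Dict, List, get_origin

print(str(get_origin(List["Frame"])))       # <class 'list'>  -> mapping.from_impl_list
print(str(get_origin(Dict[str, "Frame"])))  # <class 'dict'>  -> mapping.from_impl_dict
print(get_origin("Frame"))                  # None            -> mapping.from_impl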
test_converters.py
Source:test_converters.py
from glom import Assign, glom, Iter, T
import numpy as np
import pandas as pd
import pytest
from friendly_data.converters import _source_type
from friendly_data.converters import from_df
from friendly_data.converters import from_dst
from friendly_data.converters import resolve_aliases
from friendly_data.converters import to_da
from friendly_data.converters import to_df
from friendly_data.converters import to_dst
from friendly_data.converters import to_mfdst
from friendly_data.converters import xr_metadata
from friendly_data.converters import xr_da
from friendly_data.dpkg import pkg_from_index, res_from_entry
from friendly_data.io import dwim_file
from .conftest import expected_schema, to_df_noalias


def test_source_type_heuristics():
    with pytest.raises(ValueError):
        _source_type("/path/to/non-existent-file.ext")


@pytest.mark.skip(reason="not sure how to test schema parsing")
def test_schema_parsing():
    pass


def test_pkg_to_df(rnd_pkg):
    for resource in rnd_pkg.resources:
        df = to_df(resource)  # test target, don't touch this
        from_impl = expected_schema(df, type_map={})
        # read from file; strings are read as `object`, remap to `string`
        raw = expected_schema(resource, type_map={"object": "string", "int64": "Int64"})
        # impl marks columns as timestamps based on the schema.  so remap
        # timestamp columns as datetime64[ns] as per the schema
        ts_cols = [
            field.name for field in resource.schema.fields if "datetime" in field.type
        ]
        raw.update((col, "datetime64[ns]") for col in ts_cols)
        assert from_impl == raw
        if not ts_cols:  # no timestamps, skip
            continue
        # resource w/ timestamps
        dtype_cmp = df[ts_cols].dtypes == np.dtype("datetime64[ns]")
        assert dtype_cmp.all(axis=None)
    # resource w/ a index
    resource = rnd_pkg.resources[0]
    field_names = [field.name for field in resource.schema.fields]
    glom(resource, Assign("schema.primaryKey", field_names[0]))
    df = to_df(resource)
    # compare columns
    assert list(df.columns) == field_names[1:]
    # check if the right column has been set as index
    assert df.index.name == resource.schema.fields[0].name
    # resource w/ a MultiIndex
    glom(resource, Assign("schema.primaryKey", field_names[:2]))
    df = to_df(resource)
    # compare columns
    assert list(df.columns) == field_names[2:]
    # check if the right column has been set as index
    assert df.index.names == field_names[:2]
    # resource w/ NA
    resource = rnd_pkg.resources[1]
    # set new NA value: "sit" from "Lorem ipsum dolor sit amet consectetur
    # adipiscing", TRE - 2nd column
    glom(resource, Assign("schema.missingValues", ["", "sit"]))
    df = to_df(resource)
    assert df.isna().any(axis=None)
    # unsupported resource type
    resource = rnd_pkg.resources[0]
    update = {
        "path": resource["path"].replace("csv", "txt"),
        "mediatype": resource["mediatype"].replace("csv", "plain"),
    }
    resource.update(update)
    with pytest.raises(ValueError, match="unsupported source.+"):  # default behaviour
        df = to_df(resource)
    assert to_df(resource, noexcept=True).empty  # suppress exceptions


def test_pkg_to_df_skip_rows(pkg_meta):
    _, pkg, __ = pkg_from_index(pkg_meta, "testing/files/skip_test/index.yaml")
    df = to_df(pkg["resources"][0])
    expected = ["UK", "Ireland", "France"]
    np.testing.assert_array_equal(df.columns, expected)
    assert isinstance(df.index, pd.DatetimeIndex)


def test_pkg_to_df_aliased_cols(pkg_w_alias):
    df = to_df(pkg_w_alias["resources"][1])
    assert "region" in df.index.names
    assert "flow_in" in df.columns


def test_resolve_aliases(pkg_w_alias):
    for res in pkg_w_alias.resources:
        _df, entry = to_df_noalias(res)
        df = resolve_aliases(_df, entry["alias"])
        assert "region" in df.index.names
        if "flow_in" in entry["path"]:
            assert "flow_in" in df.columns


def test_df_to_resource(tmp_path, pkg_w_alias):
    df = to_df(pkg_w_alias["resources"][1])
    res = from_df(df, basepath=tmp_path)
    fpath = f"{'_'.join(df.columns)}.csv"
    assert (tmp_path / fpath).exists()
    assert fpath == res["path"]
    df.columns = ["energy_in"]
    df.index.names = ["technology", "node", "unit"]
    alias = {"node": "region", "energy_in": "flow_in"}
    for r in (True, False):
        res = from_df(df, basepath=tmp_path, alias=alias, rename=r)
        res_alias = glom(
            res,
            (
                "schema.fields",
                Iter()
                .filter(lambda i: "alias" in i)
                .map(({1: "name", 2: "alias"}, T.values()))
                .all(),
                dict,
            ),
        )
        if r:
            assert not res_alias
        else:
            assert res_alias == alias


def test_xr_metadata(pkg_w_alias):
    # 1: alias, unit, 2: alias
    df1, df2 = [to_df(res) for res in pkg_w_alias.resources]
    df1_res, coords1, attrs1 = xr_metadata(df1)
    assert df1.index.names == df1_res.index.names
    assert set(df1.index.names) == set(coords1)
    assert attrs1 == {}
    df2_res, coords2, attrs2 = xr_metadata(df2)
    assert set(df2.index.names) - set(df2_res.index.names) == {"unit"}
    assert set(df2.index.names) - set(coords2) == {"unit"}
    assert set(attrs2) == {"unit"}


def test_xr_da(pkg_w_alias):
    # 1: alias, 2: alias, unit
    df1, df2 = [to_df(res) for res in pkg_w_alias.resources]
    df_aligned, coords, attrs = xr_metadata(df1)
    arr1 = xr_da(df_aligned, 0, coords=coords, attrs=attrs)
    arr2 = xr_da(df_aligned, df1.columns[0], coords=coords, attrs=attrs)
    assert arr1.equals(arr2)  # column specification
    assert list(arr1.coords) == df1.index.names
    arr = xr_da(df_aligned, 0, coords=coords, attrs=attrs)
    assert arr.attrs == {}
    df_aligned, coords, attrs = xr_metadata(df2)
    arr = xr_da(df_aligned, 0, coords=coords, attrs=attrs)
    assert set(df2.index.names) - set(arr.coords) == {"unit"}
    assert "unit" in arr.attrs
    df3 = df2.assign(foo=3)
    df_aligned, coords, attrs = xr_metadata(df3)
    arr1 = xr_da(df_aligned, 1, coords=coords, attrs=attrs)
    arr2 = xr_da(df_aligned, "foo", coords=coords, attrs=attrs)
    # assert arr1.name == arr2.name == "foo"
    arr1.data = arr2.data = np.where(np.isnan(arr1.data), 3, arr1.data)
    expected = np.full_like(arr1.data, 3)
    assert (arr1.data == expected).all() and (arr2.data == expected).all()


def test_to_da(pkg_w_alias):
    # alias, unit
    res = pkg_w_alias.resources[1]  # "unit" is excluded from dims
    assert len(to_da(res).dims) == glom(res, ("schema.primaryKey", len)) - 1
    res["path"] = res["path"] + ".bad"
    # wrapped in an array, cannot do `is None`
    assert to_da(res, noexcept=True).data == None  # noqa: E711
    # multicol
    entry = dwim_file("testing/files/xr/index.yaml")[0]
    res = res_from_entry(entry, "testing/files/xr")
    with pytest.raises(ValueError, match="only 1 column supported"):
        to_da(res)


def test_to_dst(pkg_w_alias):
    # alias, unit
    res = pkg_w_alias.resources[1]  # "unit" is excluded from dims
    assert len(to_dst(res).dims) == glom(res, ("schema.primaryKey", len)) - 1
    res["path"] = res["path"] + ".bad"
    assert not to_dst(res, noexcept=True).data_vars
    # multicol
    entry = dwim_file("testing/files/xr/index.yaml")[0]
    res = res_from_entry(entry, "testing/files/xr")
    assert len(to_dst(res).data_vars) == 2


def test_to_mfdst(pkg_w_alias):
    dst = to_mfdst(pkg_w_alias.resources)
    assert len(dst.data_vars) == 2


def test_dst_to_pkg(tmp_path, pkg_w_alias):
    dst = to_mfdst(pkg_w_alias.resources)
    resources = from_dst(dst, basepath=tmp_path)
...
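In test_pkg_to_df above, the check compares two per-column dtype maps: one built from the DataFrame returned by to_df(), one built from the raw resource, with remaps for the ways a CSV round trip changes dtypes (strings load as object, schema-declared timestamps become datetime64[ns]). The following is a pandas-only sketch of that remap-and-compare pattern; the frame, type_map, and ts_cols are invented for illustration and it does not use friendly_data.

# Pandas-only illustration of the dtype remap-and-compare pattern; the data,
# type_map, and ts_cols below are invented for this example.
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "region": ["UK", "FR"],
        "timesteps": pd.to_datetime(["2020-01-01", "2020-01-02"]),
    }
)

# dtypes as seen on the in-memory frame
from_impl = {col: str(dtype) for col, dtype in df.dtypes.items()}

# dtypes as they would come back from CSV: string-like columns load as `object`
raw = {"region": "object", "timesteps": "object"}
type_map = {"object": "string"}
raw = {col: type_map.get(dtype, dtype) for col, dtype in raw.items()}

# columns the schema marks as timestamps are remapped explicitly
ts_cols = ["timesteps"]
raw.update((col, "datetime64[ns]") for col in ts_cols)

assert from_impl["timesteps"] == raw["timesteps"] == "datetime64[ns]"
assert (df[ts_cols].dtypes == np.dtype("datetime64[ns]")).all()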
cb.py
Source:cb.py
...
        for cb in cbs.callbacks:
            for key in _get_events(cb):
                cb_impl = getattr(cb, key)
                cb_spec = CallbackSpec.from_impl(cb_impl)

                self._cb_map.setdefault(key, []).append(cb_spec)

        self._required_args = {}
        for key, specs in self._cb_map.items():
            self._required_args[key] = {arg for spec in specs for arg in spec.required}

    @property
    def events(self):
        return set(self._cb_map)

    def dispatch(self, key, kwargs):
        missing = self._required_args.get(key, set()) - set(kwargs)
        if missing:
            raise RuntimeError(f"Missing arguments for event {key}: {missing}")

        for cb in self._cb_map.get(key, []):
            cb.call(kwargs)


class ConsistencyChecker:
    def __init__(self, cbs):
        self.events = {key for cb in cbs.callbacks for key in _get_events(cb)}
        self.optional_events = set(cbs.optional_events)

        self._ev_types = {}
        self._stack = []
        self._called = set()

        self._outer_ev = None

        for cb in cbs.callbacks:
            for key in _get_events(cb):
                # pre-cache all known events
                self._get_ev_type(key)

    def on_event(self, key):
        ev_type, ev = self._get_ev_type(key)
        exit_ev = self._modify_state(key, ev_type, ev)

        try:
            self._check_first_event(key, ev_type, ev, exit_ev)
            self._check_reentrant(key, ev_type, ev, exit_ev)
            self._check_balancing(key, ev_type, ev, exit_ev)
            self._check_missing(key, ev_type, ev, exit_ev)

        finally:
            self._reset_after_last(key, ev_type, ev, exit_ev)

    def _get_ev_type(self, key):
        if key not in self._ev_types:
            self._ev_types[key] = self._extract_ev_type(key)

        return self._ev_types[key]

    @staticmethod
    def _extract_ev_type(key):
        assert key.startswith("on_")

        if key.endswith("_start"):
            return EventType.start, key[len("on_") : -len("_start")]

        elif key.endswith("_end"):
            return EventType.end, key[len("on_") : -len("_end")]

        else:
            return EventType.event, key[len("on_") :]

    def _modify_state(self, key, ev_type, ev):
        self._called.add(key)

        if self._outer_ev is None and ev_type == EventType.start:
            self._outer_ev = ev

        if ev_type is EventType.start:
            self._stack.append(ev)
            return None

        elif ev_type is EventType.end:
            try:
                return self._stack.pop()

            except IndexError:
                return None

        else:
            return None

    def _is_last_event(self, ev_type, ev):
        return ev_type is EventType.end and ev == self._outer_ev

    def _reset_after_last(self, key, ev_type, ev, exit_ev):
        if not self._is_last_event(ev_type, ev):
            return

        self._outer_ev = None
        self._called = set()

    def _check_first_event(self, key, ev_type, ev, exit_ev):
        if self._outer_ev is None:
            raise RuntimeError(
                f"The first callback must be a start event 'on_*_start', got: {key}"
            )

    def _check_reentrant(self, key, ev_type, ev, exit_ev):
        count_ev = sum(ev == e for e in self._stack)
        if self._stack.count(ev) >= 2:
            raise RuntimeError(f"Reentrant event {key}")

    def _check_balancing(self, key, ev_type, ev, exit_ev):
        if ev_type is EventType.end and exit_ev != ev:
            raise RuntimeError(
                f"Unbalanced callback stack. Expected {exit_ev}, found {ev}"
            )

    def _check_missing(self, key, ev_type, ev, exit_ev):
        if not self._is_last_event(ev_type, ev):
            return

        missing = self.events - self._called - self.optional_events
        if missing:
            raise RuntimeError(f"Events not called: {missing}")


def _get_events(cb):
    events = []

    for key in vars(type(cb)):
        if not key.startswith("on_"):
            continue

        events += [key]

    return events


class EventType(int, enum.Enum):
    start = enum.auto()
    end = enum.auto()
    event = enum.auto()


class CallbackSpec:
    @classmethod
    def from_impl(cls, cb_impl):
        cb_kwargs, cb_required = cls._get_cb_args(cb_impl)
        return CallbackSpec(cb_impl, cb_kwargs, cb_required)

    def __init__(self, cb, kwargs, required):
        self.cb = cb
        self.kwargs = bool(kwargs)
        self.required = set(required)

    def call(self, kwargs):
        missing = self.required - set(kwargs)
        if missing:
            raise RuntimeError(f"Missing callback arguments for {self.cb}: {missing}")

        if self.kwargs:
...
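CallbackSpec.from_impl builds a spec from a bound callback method so that dispatch() can validate required keyword arguments before calling it. The helper _get_cb_args is not part of the snippet above, so the following is a hypothetical sketch of that kind of signature introspection using inspect; the callback class and event name are invented.

# Hypothetical sketch of the introspection a from_impl-style factory might rely on;
# the real _get_cb_args is not shown in the snippet, so this is an assumption.
import inspect


def get_cb_args(cb_impl):
    """Return (accepts_var_kwargs, required_argument_names) for a bound callback."""
    accepts_var_kwargs = False
    required = []
    for name, param in inspect.signature(cb_impl).parameters.items():
        if param.kind is inspect.Parameter.VAR_KEYWORD:
            accepts_var_kwargs = True
        elif param.kind is inspect.Parameter.VAR_POSITIONAL:
            continue  # *args can never be a required keyword argument
        elif param.default is inspect.Parameter.empty:
            required.append(name)
    return accepts_var_kwargs, required


class MyCallback:
    def on_epoch_start(self, epoch, logs=None):
        print(f"epoch {epoch} started")


print(get_cb_args(MyCallback().on_epoch_start))  # (False, ['epoch'])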
wordle.py
Source:wordle.py
...
            WordleAction.DELETE_LETTER: self._handle_delete,
            WordleAction.SUBMIT_GUESS: self._handle_submit,
        }

    def on_player_added(self, player: wtypes.Player):
        self._players.append(models.Player.from_impl(player))
        self._emit(player.id, WordleEvent.LETTER_ADDED, self._current_guess)
        self._emit_all(WordleEvent.PLAYER_CHANGED, self._players)

    def on_player_removed(self, removed_player_id: wtypes.PlayerId):
        self._players = [
            player for player in self._players if player.id != removed_player_id
        ]
        self._emit_all(WordleEvent.PLAYER_CHANGED, self._players)

    def set_parameters(self, game_parameters: wtypes.GameParameters):
        self.params = game_parameters
        self.chosen_word = self._dictionary.generate(self.params.word_length)

    def process_action(
        self, player: wtypes.PlayerId, player_action: wtypes.PlayerAction
    ):
        wordle_action = WordleAction[player_action.action]
...

LambdaTest's Playwright tutorial gives a broader overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers end-to-end guidance, from installing the Playwright framework to best practices and advanced concepts.
