Best Python code snippet using localstack_python
_fileio.py
Source:_fileio.py  
...72                yield line73            else:74                break75    async def aclose(self) -> None:76        return await to_thread.run_sync(self._fp.close)77    async def read(self, size: int = -1) -> AnyStr:78        return await to_thread.run_sync(self._fp.read, size)79    async def read1(self: "AsyncFile[bytes]", size: int = -1) -> bytes:80        return await to_thread.run_sync(self._fp.read1, size)81    async def readline(self) -> AnyStr:82        return await to_thread.run_sync(self._fp.readline)83    async def readlines(self) -> List[AnyStr]:84        return await to_thread.run_sync(self._fp.readlines)85    async def readinto(self: "AsyncFile[bytes]", b: WriteableBuffer) -> bytes:86        return await to_thread.run_sync(self._fp.readinto, b)87    async def readinto1(self: "AsyncFile[bytes]", b: WriteableBuffer) -> bytes:88        return await to_thread.run_sync(self._fp.readinto1, b)89    @overload90    async def write(self: "AsyncFile[bytes]", b: ReadableBuffer) -> int:91        ...92    @overload93    async def write(self: "AsyncFile[str]", b: str) -> int:94        ...95    async def write(self, b: Union[ReadableBuffer, str]) -> int:96        return await to_thread.run_sync(self._fp.write, b)97    @overload98    async def writelines(99        self: "AsyncFile[bytes]", lines: Iterable[ReadableBuffer]100    ) -> None:101        ...102    @overload103    async def writelines(self: "AsyncFile[str]", lines: Iterable[str]) -> None:104        ...105    async def writelines(106        self, lines: Union[Iterable[ReadableBuffer], Iterable[str]]107    ) -> None:108        return await to_thread.run_sync(self._fp.writelines, lines)109    async def truncate(self, size: Optional[int] = None) -> int:110        return await to_thread.run_sync(self._fp.truncate, size)111    async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int:112        return await to_thread.run_sync(self._fp.seek, offset, whence)113    async def tell(self) -> int:114        return await to_thread.run_sync(self._fp.tell)115    async def flush(self) -> None:116        return await to_thread.run_sync(self._fp.flush)117@overload118async def open_file(119    file: Union[str, "PathLike[str]", int],120    mode: OpenBinaryMode,121    buffering: int = ...,122    encoding: Optional[str] = ...,123    errors: Optional[str] = ...,124    newline: Optional[str] = ...,125    closefd: bool = ...,126    opener: Optional[Callable[[str, int], int]] = ...,127) -> AsyncFile[bytes]:128    ...129@overload130async def open_file(131    file: Union[str, "PathLike[str]", int],132    mode: OpenTextMode = ...,133    buffering: int = ...,134    encoding: Optional[str] = ...,135    errors: Optional[str] = ...,136    newline: Optional[str] = ...,137    closefd: bool = ...,138    opener: Optional[Callable[[str, int], int]] = ...,139) -> AsyncFile[str]:140    ...141async def open_file(142    file: Union[str, "PathLike[str]", int],143    mode: str = "r",144    buffering: int = -1,145    encoding: Optional[str] = None,146    errors: Optional[str] = None,147    newline: Optional[str] = None,148    closefd: bool = True,149    opener: Optional[Callable[[str, int], int]] = None,150) -> AsyncFile[Any]:151    """152    Open a file asynchronously.153    The arguments are exactly the same as for the builtin :func:`open`.154    :return: an asynchronous file object155    """156    fp = await to_thread.run_sync(157        open, file, mode, buffering, encoding, errors, newline, closefd, opener158    )159    return AsyncFile(fp)160def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:161    """162    Wrap an existing file as an asynchronous file.163    :param file: an existing file-like object164    :return: an asynchronous file object165    """166    return AsyncFile(file)167@dataclass(eq=False)168class _PathIterator(AsyncIterator["Path"]):169    iterator: Iterator["PathLike[str]"]170    async def __anext__(self) -> "Path":171        nextval = await to_thread.run_sync(next, self.iterator, None, cancellable=True)172        if nextval is None:173            raise StopAsyncIteration from None174        return Path(cast("PathLike[str]", nextval))175class Path:176    """177    An asynchronous version of :class:`pathlib.Path`.178    This class cannot be substituted for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but179    it is compatible with the :class:`os.PathLike` interface.180    It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for the181    deprecated :meth:`~pathlib.Path.link_to` method.182    Any methods that do disk I/O need to be awaited on. These methods are:183    * :meth:`~pathlib.Path.absolute`184    * :meth:`~pathlib.Path.chmod`185    * :meth:`~pathlib.Path.cwd`186    * :meth:`~pathlib.Path.exists`187    * :meth:`~pathlib.Path.expanduser`188    * :meth:`~pathlib.Path.group`189    * :meth:`~pathlib.Path.hardlink_to`190    * :meth:`~pathlib.Path.home`191    * :meth:`~pathlib.Path.is_block_device`192    * :meth:`~pathlib.Path.is_char_device`193    * :meth:`~pathlib.Path.is_dir`194    * :meth:`~pathlib.Path.is_fifo`195    * :meth:`~pathlib.Path.is_file`196    * :meth:`~pathlib.Path.is_mount`197    * :meth:`~pathlib.Path.lchmod`198    * :meth:`~pathlib.Path.lstat`199    * :meth:`~pathlib.Path.mkdir`200    * :meth:`~pathlib.Path.open`201    * :meth:`~pathlib.Path.owner`202    * :meth:`~pathlib.Path.read_bytes`203    * :meth:`~pathlib.Path.read_text`204    * :meth:`~pathlib.Path.readlink`205    * :meth:`~pathlib.Path.rename`206    * :meth:`~pathlib.Path.replace`207    * :meth:`~pathlib.Path.rmdir`208    * :meth:`~pathlib.Path.samefile`209    * :meth:`~pathlib.Path.stat`210    * :meth:`~pathlib.Path.touch`211    * :meth:`~pathlib.Path.unlink`212    * :meth:`~pathlib.Path.write_bytes`213    * :meth:`~pathlib.Path.write_text`214    Additionally, the following methods return an async iterator yielding :class:`~.Path` objects:215    * :meth:`~pathlib.Path.glob`216    * :meth:`~pathlib.Path.iterdir`217    * :meth:`~pathlib.Path.rglob`218    """219    __slots__ = "_path", "__weakref__"220    __weakref__: Any221    def __init__(self, *args: Union[str, "PathLike[str]"]) -> None:222        self._path: Final[pathlib.Path] = pathlib.Path(*args)223    def __fspath__(self) -> str:224        return self._path.__fspath__()225    def __str__(self) -> str:226        return self._path.__str__()227    def __repr__(self) -> str:228        return f"{self.__class__.__name__}({self.as_posix()!r})"229    def __bytes__(self) -> bytes:230        return self._path.__bytes__()231    def __hash__(self) -> int:232        return self._path.__hash__()233    def __eq__(self, other: object) -> bool:234        target = other._path if isinstance(other, Path) else other235        return self._path.__eq__(target)236    def __lt__(self, other: "Path") -> bool:237        target = other._path if isinstance(other, Path) else other238        return self._path.__lt__(target)239    def __le__(self, other: "Path") -> bool:240        target = other._path if isinstance(other, Path) else other241        return self._path.__le__(target)242    def __gt__(self, other: "Path") -> bool:243        target = other._path if isinstance(other, Path) else other244        return self._path.__gt__(target)245    def __ge__(self, other: "Path") -> bool:246        target = other._path if isinstance(other, Path) else other247        return self._path.__ge__(target)248    def __truediv__(self, other: Any) -> "Path":249        return Path(self._path / other)250    def __rtruediv__(self, other: Any) -> "Path":251        return Path(other) / self252    @property253    def parts(self) -> Tuple[str, ...]:254        return self._path.parts255    @property256    def drive(self) -> str:257        return self._path.drive258    @property259    def root(self) -> str:260        return self._path.root261    @property262    def anchor(self) -> str:263        return self._path.anchor264    @property265    def parents(self) -> Sequence["Path"]:266        return tuple(Path(p) for p in self._path.parents)267    @property268    def parent(self) -> "Path":269        return Path(self._path.parent)270    @property271    def name(self) -> str:272        return self._path.name273    @property274    def suffix(self) -> str:275        return self._path.suffix276    @property277    def suffixes(self) -> List[str]:278        return self._path.suffixes279    @property280    def stem(self) -> str:281        return self._path.stem282    async def absolute(self) -> "Path":283        path = await to_thread.run_sync(self._path.absolute)284        return Path(path)285    def as_posix(self) -> str:286        return self._path.as_posix()287    def as_uri(self) -> str:288        return self._path.as_uri()289    def match(self, path_pattern: str) -> bool:290        return self._path.match(path_pattern)291    def is_relative_to(self, *other: Union[str, "PathLike[str]"]) -> bool:292        try:293            self.relative_to(*other)294            return True295        except ValueError:296            return False297    async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:298        func = partial(os.chmod, follow_symlinks=follow_symlinks)299        return await to_thread.run_sync(func, self._path, mode)300    @classmethod301    async def cwd(cls) -> "Path":302        path = await to_thread.run_sync(pathlib.Path.cwd)303        return cls(path)304    async def exists(self) -> bool:305        return await to_thread.run_sync(self._path.exists, cancellable=True)306    async def expanduser(self) -> "Path":307        return Path(await to_thread.run_sync(self._path.expanduser, cancellable=True))308    def glob(self, pattern: str) -> AsyncIterator["Path"]:309        gen = self._path.glob(pattern)310        return _PathIterator(gen)311    async def group(self) -> str:312        return await to_thread.run_sync(self._path.group, cancellable=True)313    async def hardlink_to(self, target: Union[str, pathlib.Path, "Path"]) -> None:314        if isinstance(target, Path):315            target = target._path316        await to_thread.run_sync(os.link, target, self)317    @classmethod318    async def home(cls) -> "Path":319        home_path = await to_thread.run_sync(pathlib.Path.home)320        return cls(home_path)321    def is_absolute(self) -> bool:322        return self._path.is_absolute()323    async def is_block_device(self) -> bool:324        return await to_thread.run_sync(self._path.is_block_device, cancellable=True)325    async def is_char_device(self) -> bool:326        return await to_thread.run_sync(self._path.is_char_device, cancellable=True)327    async def is_dir(self) -> bool:328        return await to_thread.run_sync(self._path.is_dir, cancellable=True)329    async def is_fifo(self) -> bool:330        return await to_thread.run_sync(self._path.is_fifo, cancellable=True)331    async def is_file(self) -> bool:332        return await to_thread.run_sync(self._path.is_file, cancellable=True)333    async def is_mount(self) -> bool:334        return await to_thread.run_sync(os.path.ismount, self._path, cancellable=True)335    def is_reserved(self) -> bool:336        return self._path.is_reserved()337    async def is_socket(self) -> bool:338        return await to_thread.run_sync(self._path.is_socket, cancellable=True)339    async def is_symlink(self) -> bool:340        return await to_thread.run_sync(self._path.is_symlink, cancellable=True)341    def iterdir(self) -> AsyncIterator["Path"]:342        gen = self._path.iterdir()343        return _PathIterator(gen)344    def joinpath(self, *args: Union[str, "PathLike[str]"]) -> "Path":345        return Path(self._path.joinpath(*args))346    async def lchmod(self, mode: int) -> None:347        await to_thread.run_sync(self._path.lchmod, mode)348    async def lstat(self) -> os.stat_result:349        return await to_thread.run_sync(self._path.lstat, cancellable=True)350    async def mkdir(351        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False352    ) -> None:353        await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)354    @overload355    async def open(356        self,357        mode: OpenBinaryMode,358        buffering: int = ...,359        encoding: Optional[str] = ...,360        errors: Optional[str] = ...,361        newline: Optional[str] = ...,362    ) -> AsyncFile[bytes]:363        ...364    @overload365    async def open(366        self,367        mode: OpenTextMode = ...,368        buffering: int = ...,369        encoding: Optional[str] = ...,370        errors: Optional[str] = ...,371        newline: Optional[str] = ...,372    ) -> AsyncFile[str]:373        ...374    async def open(375        self,376        mode: str = "r",377        buffering: int = -1,378        encoding: Optional[str] = None,379        errors: Optional[str] = None,380        newline: Optional[str] = None,381    ) -> AsyncFile[Any]:382        fp = await to_thread.run_sync(383            self._path.open, mode, buffering, encoding, errors, newline384        )385        return AsyncFile(fp)386    async def owner(self) -> str:387        return await to_thread.run_sync(self._path.owner, cancellable=True)388    async def read_bytes(self) -> bytes:389        return await to_thread.run_sync(self._path.read_bytes)390    async def read_text(391        self, encoding: Optional[str] = None, errors: Optional[str] = None392    ) -> str:393        return await to_thread.run_sync(self._path.read_text, encoding, errors)394    def relative_to(self, *other: Union[str, "PathLike[str]"]) -> "Path":395        return Path(self._path.relative_to(*other))396    async def readlink(self) -> "Path":397        target = await to_thread.run_sync(os.readlink, self._path)398        return Path(cast(str, target))399    async def rename(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path":400        if isinstance(target, Path):401            target = target._path402        await to_thread.run_sync(self._path.rename, target)403        return Path(target)404    async def replace(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path":405        if isinstance(target, Path):406            target = target._path407        await to_thread.run_sync(self._path.replace, target)408        return Path(target)409    async def resolve(self, strict: bool = False) -> "Path":410        func = partial(self._path.resolve, strict=strict)411        return Path(await to_thread.run_sync(func, cancellable=True))412    def rglob(self, pattern: str) -> AsyncIterator["Path"]:413        gen = self._path.rglob(pattern)414        return _PathIterator(gen)415    async def rmdir(self) -> None:416        await to_thread.run_sync(self._path.rmdir)417    async def samefile(418        self, other_path: Union[str, bytes, int, pathlib.Path, "Path"]419    ) -> bool:420        if isinstance(other_path, Path):421            other_path = other_path._path422        return await to_thread.run_sync(423            self._path.samefile, other_path, cancellable=True424        )425    async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:426        func = partial(os.stat, follow_symlinks=follow_symlinks)427        return await to_thread.run_sync(func, self._path, cancellable=True)428    async def symlink_to(429        self,430        target: Union[str, pathlib.Path, "Path"],431        target_is_directory: bool = False,432    ) -> None:433        if isinstance(target, Path):434            target = target._path435        await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)436    async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:437        await to_thread.run_sync(self._path.touch, mode, exist_ok)438    async def unlink(self, missing_ok: bool = False) -> None:439        try:440            await to_thread.run_sync(self._path.unlink)441        except FileNotFoundError:442            if not missing_ok:443                raise444    def with_name(self, name: str) -> "Path":445        return Path(self._path.with_name(name))446    def with_stem(self, stem: str) -> "Path":447        return Path(self._path.with_name(stem + self._path.suffix))448    def with_suffix(self, suffix: str) -> "Path":449        return Path(self._path.with_suffix(suffix))450    async def write_bytes(self, data: bytes) -> int:451        return await to_thread.run_sync(self._path.write_bytes, data)452    async def write_text(453        self,454        data: str,455        encoding: Optional[str] = None,456        errors: Optional[str] = None,457        newline: Optional[str] = None,458    ) -> int:459        # Path.write_text() does not support the "newline" parameter before Python 3.10460        def sync_write_text() -> int:461            with self._path.open(462                "w", encoding=encoding, errors=errors, newline=newline463            ) as fp:464                return fp.write(data)465        return await to_thread.run_sync(sync_write_text)...test_spawner.py
Source:test_spawner.py  
...39    kwargs.setdefault('poll_interval', 1)40    return LocalProcessSpawner(db=db, **kwargs)41def test_spawner(db, io_loop):42    spawner = new_spawner(db)43    ip, port = io_loop.run_sync(spawner.start)44    assert ip == '127.0.0.1'45    assert isinstance(port, int)46    assert port > 047    spawner.user.server.ip = ip48    spawner.user.server.port = port49    db.commit()50    51    # wait for the process to get to the while True: loop52    time.sleep(1)53    status = io_loop.run_sync(spawner.poll)54    assert status is None55    io_loop.run_sync(spawner.stop)56    status = io_loop.run_sync(spawner.poll)57    assert status == 158def test_single_user_spawner(db, io_loop):59    spawner = new_spawner(db, cmd=['jupyterhub-singleuser'])60    spawner.api_token = 'secret'61    ip, port = io_loop.run_sync(spawner.start)62    assert ip == '127.0.0.1'63    assert isinstance(port, int)64    assert port > 065    spawner.user.server.ip = ip66    spawner.user.server.port = port67    db.commit()68    # wait for http server to come up,69    # checking for early termination every 1s70    def wait():71        return spawner.user.server.wait_up(timeout=1, http=True)72    for i in range(30):73        status = io_loop.run_sync(spawner.poll)74        assert status is None75        try:76            io_loop.run_sync(wait)77        except TimeoutError:78            continue79        else:80            break81    io_loop.run_sync(wait)82    status = io_loop.run_sync(spawner.poll)83    assert status == None84    io_loop.run_sync(spawner.stop)85    status = io_loop.run_sync(spawner.poll)86    assert status == 087def test_stop_spawner_sigint_fails(db, io_loop):88    spawner = new_spawner(db, cmd=[sys.executable, '-c', _uninterruptible])89    io_loop.run_sync(spawner.start)90    91    # wait for the process to get to the while True: loop92    time.sleep(1)93    94    status = io_loop.run_sync(spawner.poll)95    assert status is None96    97    io_loop.run_sync(spawner.stop)98    status = io_loop.run_sync(spawner.poll)99    assert status == -signal.SIGTERM100def test_stop_spawner_stop_now(db, io_loop):101    spawner = new_spawner(db)102    io_loop.run_sync(spawner.start)103    104    # wait for the process to get to the while True: loop105    time.sleep(1)106    107    status = io_loop.run_sync(spawner.poll)108    assert status is None109    110    io_loop.run_sync(lambda : spawner.stop(now=True))111    status = io_loop.run_sync(spawner.poll)112    assert status == -signal.SIGTERM113def test_spawner_poll(db, io_loop):114    first_spawner = new_spawner(db)115    user = first_spawner.user116    io_loop.run_sync(first_spawner.start)117    proc = first_spawner.proc118    status = io_loop.run_sync(first_spawner.poll)119    assert status is None120    user.state = first_spawner.get_state()121    assert 'pid' in user.state122    123    # create a new Spawner, loading from state of previous124    spawner = new_spawner(db, user=first_spawner.user)125    spawner.start_polling()126    127    # wait for the process to get to the while True: loop128    io_loop.run_sync(lambda : gen.sleep(1))129    status = io_loop.run_sync(spawner.poll)130    assert status is None131    132    # kill the process133    proc.terminate()134    for i in range(10):135        if proc.poll() is None:136            time.sleep(1)137        else:138            break139    assert proc.poll() is not None140    io_loop.run_sync(lambda : gen.sleep(2))141    status = io_loop.run_sync(spawner.poll)142    assert status is not None143def test_setcwd():144    cwd = os.getcwd()145    with tempfile.TemporaryDirectory() as td:146        td = os.path.realpath(os.path.abspath(td))147        spawnermod._try_setcwd(td)148        assert os.path.samefile(os.getcwd(), td)149    os.chdir(cwd)150    chdir = os.chdir151    temp_root = os.path.realpath(os.path.abspath(tempfile.gettempdir()))152    def raiser(path):153        path = os.path.realpath(os.path.abspath(path))154        if not path.startswith(temp_root):155            raise OSError(path)...test_simpletest_sanity.py
Source:test_simpletest_sanity.py  
...3# You can obtain one at http://mozilla.org/MPL/2.0/.4from marionette_harness import MarionetteTestCase5class SimpletestSanityTest(MarionetteTestCase):6    callFinish = "return finish();"7    def run_sync(self, test):8        return self.marionette.execute_js_script(test, async=False)9    def run_async(self, test):10        return self.marionette.execute_js_script(test)11    def test_is(self):12        def runtests():13            sentFail1 = "is(true, false, 'isTest1', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish14            sentFail2 = "is(true, false, 'isTest2', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish15            sentPass1 = "is(true, true, 'isTest3');" + self.callFinish16            sentPass2 = "is(true, true, 'isTest4');" + self.callFinish17            self.assertEqual(1, len(self.run_sync(sentFail1)["failures"]))18            self.assertEqual(0, self.run_sync(sentFail2)["passed"])19            self.assertEqual(1, self.run_sync(sentPass1)["passed"])20            self.assertEqual(0, len(self.run_sync(sentPass2)["failures"]))21            self.marionette.timeout.script = 122            self.assertEqual(1, len(self.run_async(sentFail1)["failures"]))23            self.assertEqual(0, self.run_async(sentFail2)["passed"])24            self.assertEqual(1, self.run_async(sentPass1)["passed"])25            self.assertEqual(0, len(self.run_async(sentPass2)["failures"]))26        self.marionette.set_context("content")27        runtests()28        self.marionette.set_context("chrome")29        runtests()30    def test_isnot(self):31        def runtests():32           sentFail1 = "isnot(true, true, 'isnotTest3', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish33           sentFail2 = "isnot(true, true, 'isnotTest4', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish34           sentPass1 = "isnot(true, false, 'isnotTest1');" + self.callFinish35           sentPass2 = "isnot(true, false, 'isnotTest2');" + self.callFinish36           self.assertEqual(1, len(self.run_sync(sentFail1)["failures"]));37           self.assertEqual(0, self.run_sync(sentFail2)["passed"]);38           self.assertEqual(0, len(self.run_sync(sentPass1)["failures"]));39           self.assertEqual(1, self.run_sync(sentPass2)["passed"]);40           self.marionette.timeout.script = 141           self.assertEqual(1, len(self.run_async(sentFail1)["failures"]));42           self.assertEqual(0, self.run_async(sentFail2)["passed"]);43           self.assertEqual(0, len(self.run_async(sentPass1)["failures"]));44           self.assertEqual(1, self.run_async(sentPass2)["passed"]);45        self.marionette.set_context("content")46        runtests()47        self.marionette.set_context("chrome")48        runtests()49    def test_ok(self):50        def runtests():51            sentFail1 = "ok(1==2, 'testOk1', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish52            sentFail2 = "ok(1==2, 'testOk2', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish53            sentPass1 = "ok(1==1, 'testOk3');" + self.callFinish54            sentPass2 = "ok(1==1, 'testOk4');" + self.callFinish55            self.assertEqual(1, len(self.run_sync(sentFail1)["failures"]));56            self.assertEqual(0, self.run_sync(sentFail2)["passed"]);57            self.assertEqual(0, len(self.run_sync(sentPass1)["failures"]));58            self.assertEqual(1, self.run_sync(sentPass2)["passed"]);59            self.marionette.timeout.script = 160            self.assertEqual(1, len(self.run_async(sentFail1)["failures"]));61            self.assertEqual(0, self.run_async(sentFail2)["passed"]);62            self.assertEqual(0, len(self.run_async(sentPass1)["failures"]));63            self.assertEqual(1, self.run_async(sentPass2)["passed"]);64        self.marionette.set_context("content")65        runtests()66        self.marionette.set_context("chrome")67        runtests()68    def test_todo(self):69        def runtests():70            sentFail1 = "todo(1==1, 'testTodo1', TEST_UNEXPECTED_PASS, TEST_KNOWN_FAIL);" + self.callFinish71            sentFail2 = "todo(1==1, 'testTodo2', TEST_UNEXPECTED_PASS, TEST_KNOWN_FAIL);" + self.callFinish72            sentPass1 = "todo(1==2, 'testTodo3');" + self.callFinish73            sentPass2 = "todo(1==2, 'testTodo4');" + self.callFinish74            self.assertEqual(1, len(self.run_sync(sentFail1)["unexpectedSuccesses"]));75            self.assertEqual(0, len(self.run_sync(sentFail2)["expectedFailures"]));76            self.assertEqual(0, len(self.run_sync(sentPass1)["unexpectedSuccesses"]));77            self.assertEqual(1, len(self.run_sync(sentPass2)["expectedFailures"]));78            self.marionette.timeout.script = 179            self.assertEqual(1, len(self.run_async(sentFail1)["unexpectedSuccesses"]));80            self.assertEqual(0, len(self.run_async(sentFail2)["expectedFailures"]));81            self.assertEqual(0, len(self.run_async(sentPass1)["unexpectedSuccesses"]));82            self.assertEqual(1, len(self.run_async(sentPass2)["expectedFailures"]));83        self.marionette.set_context("content")84        runtests()85        self.marionette.set_context("chrome")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
