How to use the run_sync method in localstack

Best Python code snippet using localstack_python

_fileio.py

Source:_fileio.py Github

copy

Full Screen

...72 yield line73 else:74 break75 async def aclose(self) -> None:76 return await to_thread.run_sync(self._fp.close)77 async def read(self, size: int = -1) -> AnyStr:78 return await to_thread.run_sync(self._fp.read, size)79 async def read1(self: "AsyncFile[bytes]", size: int = -1) -> bytes:80 return await to_thread.run_sync(self._fp.read1, size)81 async def readline(self) -> AnyStr:82 return await to_thread.run_sync(self._fp.readline)83 async def readlines(self) -> List[AnyStr]:84 return await to_thread.run_sync(self._fp.readlines)85 async def readinto(self: "AsyncFile[bytes]", b: WriteableBuffer) -> bytes:86 return await to_thread.run_sync(self._fp.readinto, b)87 async def readinto1(self: "AsyncFile[bytes]", b: WriteableBuffer) -> bytes:88 return await to_thread.run_sync(self._fp.readinto1, b)89 @overload90 async def write(self: "AsyncFile[bytes]", b: ReadableBuffer) -> int:91 ...92 @overload93 async def write(self: "AsyncFile[str]", b: str) -> int:94 ...95 async def write(self, b: Union[ReadableBuffer, str]) -> int:96 return await to_thread.run_sync(self._fp.write, b)97 @overload98 async def writelines(99 self: "AsyncFile[bytes]", lines: Iterable[ReadableBuffer]100 ) -> None:101 ...102 @overload103 async def writelines(self: "AsyncFile[str]", lines: Iterable[str]) -> None:104 ...105 async def writelines(106 self, lines: Union[Iterable[ReadableBuffer], Iterable[str]]107 ) -> None:108 return await to_thread.run_sync(self._fp.writelines, lines)109 async def truncate(self, size: Optional[int] = None) -> int:110 return await to_thread.run_sync(self._fp.truncate, size)111 async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int:112 return await to_thread.run_sync(self._fp.seek, offset, whence)113 async def tell(self) -> int:114 return await to_thread.run_sync(self._fp.tell)115 async def flush(self) -> None:116 return await to_thread.run_sync(self._fp.flush)117@overload118async def open_file(119 file: Union[str, "PathLike[str]", int],120 mode: 
OpenBinaryMode,121 buffering: int = ...,122 encoding: Optional[str] = ...,123 errors: Optional[str] = ...,124 newline: Optional[str] = ...,125 closefd: bool = ...,126 opener: Optional[Callable[[str, int], int]] = ...,127) -> AsyncFile[bytes]:128 ...129@overload130async def open_file(131 file: Union[str, "PathLike[str]", int],132 mode: OpenTextMode = ...,133 buffering: int = ...,134 encoding: Optional[str] = ...,135 errors: Optional[str] = ...,136 newline: Optional[str] = ...,137 closefd: bool = ...,138 opener: Optional[Callable[[str, int], int]] = ...,139) -> AsyncFile[str]:140 ...141async def open_file(142 file: Union[str, "PathLike[str]", int],143 mode: str = "r",144 buffering: int = -1,145 encoding: Optional[str] = None,146 errors: Optional[str] = None,147 newline: Optional[str] = None,148 closefd: bool = True,149 opener: Optional[Callable[[str, int], int]] = None,150) -> AsyncFile[Any]:151 """152 Open a file asynchronously.153 The arguments are exactly the same as for the builtin :func:`open`.154 :return: an asynchronous file object155 """156 fp = await to_thread.run_sync(157 open, file, mode, buffering, encoding, errors, newline, closefd, opener158 )159 return AsyncFile(fp)160def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:161 """162 Wrap an existing file as an asynchronous file.163 :param file: an existing file-like object164 :return: an asynchronous file object165 """166 return AsyncFile(file)167@dataclass(eq=False)168class _PathIterator(AsyncIterator["Path"]):169 iterator: Iterator["PathLike[str]"]170 async def __anext__(self) -> "Path":171 nextval = await to_thread.run_sync(next, self.iterator, None, cancellable=True)172 if nextval is None:173 raise StopAsyncIteration from None174 return Path(cast("PathLike[str]", nextval))175class Path:176 """177 An asynchronous version of :class:`pathlib.Path`.178 This class cannot be substituted for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but179 it is compatible with the :class:`os.PathLike` 
interface.180 It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for the181 deprecated :meth:`~pathlib.Path.link_to` method.182 Any methods that do disk I/O need to be awaited on. These methods are:183 * :meth:`~pathlib.Path.absolute`184 * :meth:`~pathlib.Path.chmod`185 * :meth:`~pathlib.Path.cwd`186 * :meth:`~pathlib.Path.exists`187 * :meth:`~pathlib.Path.expanduser`188 * :meth:`~pathlib.Path.group`189 * :meth:`~pathlib.Path.hardlink_to`190 * :meth:`~pathlib.Path.home`191 * :meth:`~pathlib.Path.is_block_device`192 * :meth:`~pathlib.Path.is_char_device`193 * :meth:`~pathlib.Path.is_dir`194 * :meth:`~pathlib.Path.is_fifo`195 * :meth:`~pathlib.Path.is_file`196 * :meth:`~pathlib.Path.is_mount`197 * :meth:`~pathlib.Path.lchmod`198 * :meth:`~pathlib.Path.lstat`199 * :meth:`~pathlib.Path.mkdir`200 * :meth:`~pathlib.Path.open`201 * :meth:`~pathlib.Path.owner`202 * :meth:`~pathlib.Path.read_bytes`203 * :meth:`~pathlib.Path.read_text`204 * :meth:`~pathlib.Path.readlink`205 * :meth:`~pathlib.Path.rename`206 * :meth:`~pathlib.Path.replace`207 * :meth:`~pathlib.Path.rmdir`208 * :meth:`~pathlib.Path.samefile`209 * :meth:`~pathlib.Path.stat`210 * :meth:`~pathlib.Path.touch`211 * :meth:`~pathlib.Path.unlink`212 * :meth:`~pathlib.Path.write_bytes`213 * :meth:`~pathlib.Path.write_text`214 Additionally, the following methods return an async iterator yielding :class:`~.Path` objects:215 * :meth:`~pathlib.Path.glob`216 * :meth:`~pathlib.Path.iterdir`217 * :meth:`~pathlib.Path.rglob`218 """219 __slots__ = "_path", "__weakref__"220 __weakref__: Any221 def __init__(self, *args: Union[str, "PathLike[str]"]) -> None:222 self._path: Final[pathlib.Path] = pathlib.Path(*args)223 def __fspath__(self) -> str:224 return self._path.__fspath__()225 def __str__(self) -> str:226 return self._path.__str__()227 def __repr__(self) -> str:228 return f"{self.__class__.__name__}({self.as_posix()!r})"229 def __bytes__(self) -> bytes:230 return self._path.__bytes__()231 def 
__hash__(self) -> int:232 return self._path.__hash__()233 def __eq__(self, other: object) -> bool:234 target = other._path if isinstance(other, Path) else other235 return self._path.__eq__(target)236 def __lt__(self, other: "Path") -> bool:237 target = other._path if isinstance(other, Path) else other238 return self._path.__lt__(target)239 def __le__(self, other: "Path") -> bool:240 target = other._path if isinstance(other, Path) else other241 return self._path.__le__(target)242 def __gt__(self, other: "Path") -> bool:243 target = other._path if isinstance(other, Path) else other244 return self._path.__gt__(target)245 def __ge__(self, other: "Path") -> bool:246 target = other._path if isinstance(other, Path) else other247 return self._path.__ge__(target)248 def __truediv__(self, other: Any) -> "Path":249 return Path(self._path / other)250 def __rtruediv__(self, other: Any) -> "Path":251 return Path(other) / self252 @property253 def parts(self) -> Tuple[str, ...]:254 return self._path.parts255 @property256 def drive(self) -> str:257 return self._path.drive258 @property259 def root(self) -> str:260 return self._path.root261 @property262 def anchor(self) -> str:263 return self._path.anchor264 @property265 def parents(self) -> Sequence["Path"]:266 return tuple(Path(p) for p in self._path.parents)267 @property268 def parent(self) -> "Path":269 return Path(self._path.parent)270 @property271 def name(self) -> str:272 return self._path.name273 @property274 def suffix(self) -> str:275 return self._path.suffix276 @property277 def suffixes(self) -> List[str]:278 return self._path.suffixes279 @property280 def stem(self) -> str:281 return self._path.stem282 async def absolute(self) -> "Path":283 path = await to_thread.run_sync(self._path.absolute)284 return Path(path)285 def as_posix(self) -> str:286 return self._path.as_posix()287 def as_uri(self) -> str:288 return self._path.as_uri()289 def match(self, path_pattern: str) -> bool:290 return self._path.match(path_pattern)291 
def is_relative_to(self, *other: Union[str, "PathLike[str]"]) -> bool:292 try:293 self.relative_to(*other)294 return True295 except ValueError:296 return False297 async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:298 func = partial(os.chmod, follow_symlinks=follow_symlinks)299 return await to_thread.run_sync(func, self._path, mode)300 @classmethod301 async def cwd(cls) -> "Path":302 path = await to_thread.run_sync(pathlib.Path.cwd)303 return cls(path)304 async def exists(self) -> bool:305 return await to_thread.run_sync(self._path.exists, cancellable=True)306 async def expanduser(self) -> "Path":307 return Path(await to_thread.run_sync(self._path.expanduser, cancellable=True))308 def glob(self, pattern: str) -> AsyncIterator["Path"]:309 gen = self._path.glob(pattern)310 return _PathIterator(gen)311 async def group(self) -> str:312 return await to_thread.run_sync(self._path.group, cancellable=True)313 async def hardlink_to(self, target: Union[str, pathlib.Path, "Path"]) -> None:314 if isinstance(target, Path):315 target = target._path316 await to_thread.run_sync(os.link, target, self)317 @classmethod318 async def home(cls) -> "Path":319 home_path = await to_thread.run_sync(pathlib.Path.home)320 return cls(home_path)321 def is_absolute(self) -> bool:322 return self._path.is_absolute()323 async def is_block_device(self) -> bool:324 return await to_thread.run_sync(self._path.is_block_device, cancellable=True)325 async def is_char_device(self) -> bool:326 return await to_thread.run_sync(self._path.is_char_device, cancellable=True)327 async def is_dir(self) -> bool:328 return await to_thread.run_sync(self._path.is_dir, cancellable=True)329 async def is_fifo(self) -> bool:330 return await to_thread.run_sync(self._path.is_fifo, cancellable=True)331 async def is_file(self) -> bool:332 return await to_thread.run_sync(self._path.is_file, cancellable=True)333 async def is_mount(self) -> bool:334 return await to_thread.run_sync(os.path.ismount, 
self._path, cancellable=True)335 def is_reserved(self) -> bool:336 return self._path.is_reserved()337 async def is_socket(self) -> bool:338 return await to_thread.run_sync(self._path.is_socket, cancellable=True)339 async def is_symlink(self) -> bool:340 return await to_thread.run_sync(self._path.is_symlink, cancellable=True)341 def iterdir(self) -> AsyncIterator["Path"]:342 gen = self._path.iterdir()343 return _PathIterator(gen)344 def joinpath(self, *args: Union[str, "PathLike[str]"]) -> "Path":345 return Path(self._path.joinpath(*args))346 async def lchmod(self, mode: int) -> None:347 await to_thread.run_sync(self._path.lchmod, mode)348 async def lstat(self) -> os.stat_result:349 return await to_thread.run_sync(self._path.lstat, cancellable=True)350 async def mkdir(351 self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False352 ) -> None:353 await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)354 @overload355 async def open(356 self,357 mode: OpenBinaryMode,358 buffering: int = ...,359 encoding: Optional[str] = ...,360 errors: Optional[str] = ...,361 newline: Optional[str] = ...,362 ) -> AsyncFile[bytes]:363 ...364 @overload365 async def open(366 self,367 mode: OpenTextMode = ...,368 buffering: int = ...,369 encoding: Optional[str] = ...,370 errors: Optional[str] = ...,371 newline: Optional[str] = ...,372 ) -> AsyncFile[str]:373 ...374 async def open(375 self,376 mode: str = "r",377 buffering: int = -1,378 encoding: Optional[str] = None,379 errors: Optional[str] = None,380 newline: Optional[str] = None,381 ) -> AsyncFile[Any]:382 fp = await to_thread.run_sync(383 self._path.open, mode, buffering, encoding, errors, newline384 )385 return AsyncFile(fp)386 async def owner(self) -> str:387 return await to_thread.run_sync(self._path.owner, cancellable=True)388 async def read_bytes(self) -> bytes:389 return await to_thread.run_sync(self._path.read_bytes)390 async def read_text(391 self, encoding: Optional[str] = None, errors: Optional[str] 
= None392 ) -> str:393 return await to_thread.run_sync(self._path.read_text, encoding, errors)394 def relative_to(self, *other: Union[str, "PathLike[str]"]) -> "Path":395 return Path(self._path.relative_to(*other))396 async def readlink(self) -> "Path":397 target = await to_thread.run_sync(os.readlink, self._path)398 return Path(cast(str, target))399 async def rename(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path":400 if isinstance(target, Path):401 target = target._path402 await to_thread.run_sync(self._path.rename, target)403 return Path(target)404 async def replace(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path":405 if isinstance(target, Path):406 target = target._path407 await to_thread.run_sync(self._path.replace, target)408 return Path(target)409 async def resolve(self, strict: bool = False) -> "Path":410 func = partial(self._path.resolve, strict=strict)411 return Path(await to_thread.run_sync(func, cancellable=True))412 def rglob(self, pattern: str) -> AsyncIterator["Path"]:413 gen = self._path.rglob(pattern)414 return _PathIterator(gen)415 async def rmdir(self) -> None:416 await to_thread.run_sync(self._path.rmdir)417 async def samefile(418 self, other_path: Union[str, bytes, int, pathlib.Path, "Path"]419 ) -> bool:420 if isinstance(other_path, Path):421 other_path = other_path._path422 return await to_thread.run_sync(423 self._path.samefile, other_path, cancellable=True424 )425 async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:426 func = partial(os.stat, follow_symlinks=follow_symlinks)427 return await to_thread.run_sync(func, self._path, cancellable=True)428 async def symlink_to(429 self,430 target: Union[str, pathlib.Path, "Path"],431 target_is_directory: bool = False,432 ) -> None:433 if isinstance(target, Path):434 target = target._path435 await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)436 async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:437 await 
to_thread.run_sync(self._path.touch, mode, exist_ok)438 async def unlink(self, missing_ok: bool = False) -> None:439 try:440 await to_thread.run_sync(self._path.unlink)441 except FileNotFoundError:442 if not missing_ok:443 raise444 def with_name(self, name: str) -> "Path":445 return Path(self._path.with_name(name))446 def with_stem(self, stem: str) -> "Path":447 return Path(self._path.with_name(stem + self._path.suffix))448 def with_suffix(self, suffix: str) -> "Path":449 return Path(self._path.with_suffix(suffix))450 async def write_bytes(self, data: bytes) -> int:451 return await to_thread.run_sync(self._path.write_bytes, data)452 async def write_text(453 self,454 data: str,455 encoding: Optional[str] = None,456 errors: Optional[str] = None,457 newline: Optional[str] = None,458 ) -> int:459 # Path.write_text() does not support the "newline" parameter before Python 3.10460 def sync_write_text() -> int:461 with self._path.open(462 "w", encoding=encoding, errors=errors, newline=newline463 ) as fp:464 return fp.write(data)465 return await to_thread.run_sync(sync_write_text)...

Full Screen

Full Screen

test_spawner.py

Source:test_spawner.py Github

copy

Full Screen

...39 kwargs.setdefault('poll_interval', 1)40 return LocalProcessSpawner(db=db, **kwargs)41def test_spawner(db, io_loop):42 spawner = new_spawner(db)43 ip, port = io_loop.run_sync(spawner.start)44 assert ip == '127.0.0.1'45 assert isinstance(port, int)46 assert port > 047 spawner.user.server.ip = ip48 spawner.user.server.port = port49 db.commit()50 51 # wait for the process to get to the while True: loop52 time.sleep(1)53 status = io_loop.run_sync(spawner.poll)54 assert status is None55 io_loop.run_sync(spawner.stop)56 status = io_loop.run_sync(spawner.poll)57 assert status == 158def test_single_user_spawner(db, io_loop):59 spawner = new_spawner(db, cmd=['jupyterhub-singleuser'])60 spawner.api_token = 'secret'61 ip, port = io_loop.run_sync(spawner.start)62 assert ip == '127.0.0.1'63 assert isinstance(port, int)64 assert port > 065 spawner.user.server.ip = ip66 spawner.user.server.port = port67 db.commit()68 # wait for http server to come up,69 # checking for early termination every 1s70 def wait():71 return spawner.user.server.wait_up(timeout=1, http=True)72 for i in range(30):73 status = io_loop.run_sync(spawner.poll)74 assert status is None75 try:76 io_loop.run_sync(wait)77 except TimeoutError:78 continue79 else:80 break81 io_loop.run_sync(wait)82 status = io_loop.run_sync(spawner.poll)83 assert status == None84 io_loop.run_sync(spawner.stop)85 status = io_loop.run_sync(spawner.poll)86 assert status == 087def test_stop_spawner_sigint_fails(db, io_loop):88 spawner = new_spawner(db, cmd=[sys.executable, '-c', _uninterruptible])89 io_loop.run_sync(spawner.start)90 91 # wait for the process to get to the while True: loop92 time.sleep(1)93 94 status = io_loop.run_sync(spawner.poll)95 assert status is None96 97 io_loop.run_sync(spawner.stop)98 status = io_loop.run_sync(spawner.poll)99 assert status == -signal.SIGTERM100def test_stop_spawner_stop_now(db, io_loop):101 spawner = new_spawner(db)102 io_loop.run_sync(spawner.start)103 104 # wait for the process to get to the 
while True: loop105 time.sleep(1)106 107 status = io_loop.run_sync(spawner.poll)108 assert status is None109 110 io_loop.run_sync(lambda : spawner.stop(now=True))111 status = io_loop.run_sync(spawner.poll)112 assert status == -signal.SIGTERM113def test_spawner_poll(db, io_loop):114 first_spawner = new_spawner(db)115 user = first_spawner.user116 io_loop.run_sync(first_spawner.start)117 proc = first_spawner.proc118 status = io_loop.run_sync(first_spawner.poll)119 assert status is None120 user.state = first_spawner.get_state()121 assert 'pid' in user.state122 123 # create a new Spawner, loading from state of previous124 spawner = new_spawner(db, user=first_spawner.user)125 spawner.start_polling()126 127 # wait for the process to get to the while True: loop128 io_loop.run_sync(lambda : gen.sleep(1))129 status = io_loop.run_sync(spawner.poll)130 assert status is None131 132 # kill the process133 proc.terminate()134 for i in range(10):135 if proc.poll() is None:136 time.sleep(1)137 else:138 break139 assert proc.poll() is not None140 io_loop.run_sync(lambda : gen.sleep(2))141 status = io_loop.run_sync(spawner.poll)142 assert status is not None143def test_setcwd():144 cwd = os.getcwd()145 with tempfile.TemporaryDirectory() as td:146 td = os.path.realpath(os.path.abspath(td))147 spawnermod._try_setcwd(td)148 assert os.path.samefile(os.getcwd(), td)149 os.chdir(cwd)150 chdir = os.chdir151 temp_root = os.path.realpath(os.path.abspath(tempfile.gettempdir()))152 def raiser(path):153 path = os.path.realpath(os.path.abspath(path))154 if not path.startswith(temp_root):155 raise OSError(path)...

Full Screen

Full Screen

test_simpletest_sanity.py

Source:test_simpletest_sanity.py Github

copy

Full Screen

...3# You can obtain one at http://mozilla.org/MPL/2.0/.4from marionette_harness import MarionetteTestCase5class SimpletestSanityTest(MarionetteTestCase):6 callFinish = "return finish();"7 def run_sync(self, test):8 return self.marionette.execute_js_script(test, async=False)9 def run_async(self, test):10 return self.marionette.execute_js_script(test)11 def test_is(self):12 def runtests():13 sentFail1 = "is(true, false, 'isTest1', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish14 sentFail2 = "is(true, false, 'isTest2', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish15 sentPass1 = "is(true, true, 'isTest3');" + self.callFinish16 sentPass2 = "is(true, true, 'isTest4');" + self.callFinish17 self.assertEqual(1, len(self.run_sync(sentFail1)["failures"]))18 self.assertEqual(0, self.run_sync(sentFail2)["passed"])19 self.assertEqual(1, self.run_sync(sentPass1)["passed"])20 self.assertEqual(0, len(self.run_sync(sentPass2)["failures"]))21 self.marionette.timeout.script = 122 self.assertEqual(1, len(self.run_async(sentFail1)["failures"]))23 self.assertEqual(0, self.run_async(sentFail2)["passed"])24 self.assertEqual(1, self.run_async(sentPass1)["passed"])25 self.assertEqual(0, len(self.run_async(sentPass2)["failures"]))26 self.marionette.set_context("content")27 runtests()28 self.marionette.set_context("chrome")29 runtests()30 def test_isnot(self):31 def runtests():32 sentFail1 = "isnot(true, true, 'isnotTest3', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish33 sentFail2 = "isnot(true, true, 'isnotTest4', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish34 sentPass1 = "isnot(true, false, 'isnotTest1');" + self.callFinish35 sentPass2 = "isnot(true, false, 'isnotTest2');" + self.callFinish36 self.assertEqual(1, len(self.run_sync(sentFail1)["failures"]));37 self.assertEqual(0, self.run_sync(sentFail2)["passed"]);38 self.assertEqual(0, len(self.run_sync(sentPass1)["failures"]));39 self.assertEqual(1, self.run_sync(sentPass2)["passed"]);40 
self.marionette.timeout.script = 141 self.assertEqual(1, len(self.run_async(sentFail1)["failures"]));42 self.assertEqual(0, self.run_async(sentFail2)["passed"]);43 self.assertEqual(0, len(self.run_async(sentPass1)["failures"]));44 self.assertEqual(1, self.run_async(sentPass2)["passed"]);45 self.marionette.set_context("content")46 runtests()47 self.marionette.set_context("chrome")48 runtests()49 def test_ok(self):50 def runtests():51 sentFail1 = "ok(1==2, 'testOk1', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish52 sentFail2 = "ok(1==2, 'testOk2', TEST_UNEXPECTED_FAIL, TEST_PASS);" + self.callFinish53 sentPass1 = "ok(1==1, 'testOk3');" + self.callFinish54 sentPass2 = "ok(1==1, 'testOk4');" + self.callFinish55 self.assertEqual(1, len(self.run_sync(sentFail1)["failures"]));56 self.assertEqual(0, self.run_sync(sentFail2)["passed"]);57 self.assertEqual(0, len(self.run_sync(sentPass1)["failures"]));58 self.assertEqual(1, self.run_sync(sentPass2)["passed"]);59 self.marionette.timeout.script = 160 self.assertEqual(1, len(self.run_async(sentFail1)["failures"]));61 self.assertEqual(0, self.run_async(sentFail2)["passed"]);62 self.assertEqual(0, len(self.run_async(sentPass1)["failures"]));63 self.assertEqual(1, self.run_async(sentPass2)["passed"]);64 self.marionette.set_context("content")65 runtests()66 self.marionette.set_context("chrome")67 runtests()68 def test_todo(self):69 def runtests():70 sentFail1 = "todo(1==1, 'testTodo1', TEST_UNEXPECTED_PASS, TEST_KNOWN_FAIL);" + self.callFinish71 sentFail2 = "todo(1==1, 'testTodo2', TEST_UNEXPECTED_PASS, TEST_KNOWN_FAIL);" + self.callFinish72 sentPass1 = "todo(1==2, 'testTodo3');" + self.callFinish73 sentPass2 = "todo(1==2, 'testTodo4');" + self.callFinish74 self.assertEqual(1, len(self.run_sync(sentFail1)["unexpectedSuccesses"]));75 self.assertEqual(0, len(self.run_sync(sentFail2)["expectedFailures"]));76 self.assertEqual(0, len(self.run_sync(sentPass1)["unexpectedSuccesses"]));77 self.assertEqual(1, 
len(self.run_sync(sentPass2)["expectedFailures"]));78 self.marionette.timeout.script = 179 self.assertEqual(1, len(self.run_async(sentFail1)["unexpectedSuccesses"]));80 self.assertEqual(0, len(self.run_async(sentFail2)["expectedFailures"]));81 self.assertEqual(0, len(self.run_async(sentPass1)["unexpectedSuccesses"]));82 self.assertEqual(1, len(self.run_async(sentPass2)["expectedFailures"]));83 self.marionette.set_context("content")84 runtests()85 self.marionette.set_context("chrome")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful