Best Python code snippet using playwright-python
Trimmer.py
Source:Trimmer.py  
...20        for region in regions:21            view.erase(edit, region)22            has_matches = True23        if has_matches:24            sublime.set_timeout(lambda: sublime.status_message(25                'Trimmer: trailing whitespace removed.'), 0)26        else:27            sublime.set_timeout(lambda: sublime.status_message(28                'Trimmer: no trailing whitespace found.'), 0)29class DeleteEmptyLinesCommand(sublime_plugin.TextCommand):30    def run(self, edit):31        view = self.view32        has_matches = False33        reobj = re.compile('^[ \t]*$\r?\n', re.MULTILINE)34        for region in selections(view):35            str_buffer = view.substr(region)36            trimmed = reobj.sub('', str_buffer)37            if str_buffer != trimmed:38                view.replace(edit, region, trimmed)39                has_matches = True40        if has_matches is True:41            sublime.set_timeout(lambda: sublime.status_message(42                'Trimmer: empty lines deleted.'), 0)43        else:44            sublime.set_timeout(lambda: sublime.status_message(45                'Trimmer: no empty lines to delete.'), 0)46class RemoveBlankSpaces(sublime_plugin.TextCommand):47    def run(self, edit):48        view = self.view49        has_matches = False50        reobj = re.compile(r'[ \t\r\n\v\f]')51        for region in selections(view):52            str_buffer = view.substr(region)53            trimmed = reobj.sub('', str_buffer)54            if str_buffer != trimmed:55                view.replace(edit, region, trimmed)56                has_matches = True57        if has_matches is True:58            sublime.set_timeout(lambda: sublime.status_message(59                'Trimmer: blanks spaces removed.'), 0)60        else:61            sublime.set_timeout(lambda: sublime.status_message(62                'Trimmer: no blank spaces to remove.'), 0)63class TrimLeadingWhitespaceCommand(sublime_plugin.TextCommand):64    def run(self, edit):65        view = self.view66        has_matches = False67        reobj = re.compile('^[ \t]+', re.MULTILINE)68        for region in selections(view):69            str_buffer = view.substr(region)70            trimmed = reobj.sub('', str_buffer)71            if str_buffer != trimmed:72                view.replace(edit, region, trimmed)73                has_matches = True74        if has_matches is True:75            sublime.set_timeout(lambda: sublime.status_message(76                'Trimmer: leading whitespace removed.'), 0)77        else:78            sublime.set_timeout(lambda: sublime.status_message(79                'Trimmer: no leading whitespace to remove.'), 0)80class TrimLeadingTrailingWhitespace(sublime_plugin.TextCommand):81    def run(self, edit):82        view = self.view83        has_matches = False84        reobj = re.compile('^[ \t]+|[\t ]+$', re.MULTILINE)85        for region in selections(view):86            str_buffer = view.substr(region)87            trimmed = reobj.sub('', str_buffer)88            if str_buffer != trimmed:89                view.replace(edit, region, trimmed)90                has_matches = True91        if has_matches is True:92            sublime.set_timeout(lambda: sublime.status_message(93                'Trimmer: leading and trailing whitespace removed.'), 0)94        else:95            sublime.set_timeout(lambda: sublime.status_message(96                'Trimmer: no leading or trailing whitespace to remove.'), 0)97class CollapseLines(sublime_plugin.TextCommand):98    def run(self, edit):99        view = self.view100        has_matches = False101        reobj = re.compile(r'(?:\s*)(\r?\n)(?:\s*)(?:\r?\n+)')102        for region in selections(view):103            str_buffer = view.substr(region)104            trimmed = reobj.sub(r'\1\1', str_buffer)105            if str_buffer != trimmed:106                view.replace(edit, region, trimmed)107                has_matches = True108        if has_matches is True:109            sublime.set_timeout(lambda: sublime.status_message(110                'Trimmer: lines collapsed.'), 0)111        else:112            sublime.set_timeout(lambda: sublime.status_message(113                'Trimmer: no lines to collapse.'), 0)114class CollapseSpaces(sublime_plugin.TextCommand):115    def run(self, edit):116        view = self.view117        has_matches = False118        reobj = re.compile('([ ])[ ]+')119        for region in selections(view):120            str_buffer = view.substr(region)121            trimmed = reobj.sub(r'\1', str_buffer)122            if str_buffer != trimmed:123                view.replace(edit, region, trimmed)124                has_matches = True125        if has_matches is True:126            sublime.set_timeout(lambda: sublime.status_message(127                'Trimmer: spaces collapsed.'), 0)128        else:129            sublime.set_timeout(lambda: sublime.status_message(130                'Trimmer: no spaces to collapse.'), 0)131class NormalizeSpaces(sublime_plugin.TextCommand):132    def run(self, edit):133        view = self.view134        has_matches = False135        reobj = re.compile('(^\\s+)|\\s(?=\\s+)|(\\s+$)')136        for region in selections(view):137            str_buffer = view.substr(region)138            trimmed = reobj.sub('', str_buffer)139            if str_buffer != trimmed:140                view.replace(edit, region, trimmed)141                has_matches = True142        if has_matches is True:143            sublime.set_timeout(lambda: sublime.status_message(144                'Trimmer: spaces normalized.'), 0)145        else:146            sublime.set_timeout(lambda: sublime.status_message(147                'Trimmer: no spaces to normalize.'), 0)148class TokenizeString(sublime_plugin.TextCommand):149    def run(self, edit):150        view = self.view151        has_matches = False152        reobj = re.compile('^ |\t+|( ){2,}|\t| |\t+$', re.MULTILINE)153        for region in selections(view):154            str_buffer = view.substr(region)155            trimmed = reobj.sub('', str_buffer)156            if str_buffer != trimmed:157                view.replace(edit, region, trimmed)158                has_matches = True159        if has_matches is True:160            sublime.set_timeout(lambda: sublime.status_message(161                'Trimmer: string tokenized.'), 0)162        else:163            sublime.set_timeout(lambda: sublime.status_message(164                'Trimmer: nothing to tokenize.'), 0)165class TrimEdges(sublime_plugin.TextCommand):166    def run(self, edit):167        view = self.view168        has_matches = False169        reobj = re.compile(r'(\A\s+|\s+\Z)')170        if view.size() > 0:171            region = sublime.Region(0, view.size())172            str_buffer = view.substr(region)173            trimmed = reobj.sub('', str_buffer)174            if str_buffer != trimmed:175                view.replace(edit, region, trimmed)176                has_matches = True177        if has_matches is True:178            return sublime.set_timeout(lambda: sublime.status_message(179                'Trimmer: file edges trimmed.'), 0)180        else:181            return sublime.set_timeout(lambda: sublime.status_message(182                'Trimmer: no file edges to trim.'), 0)183class DeleteEmptyTags(sublime_plugin.TextCommand):184    def run(self, edit):185        view = self.view186        has_matches = False187        reobj = re.compile(r'<([A-Z][A-Z0-9]*)\b[^>]*>\s*</\1>', re.IGNORECASE)188        for region in selections(view):189            str_buffer = view.substr(region)190            trimmed = reobj.sub('', str_buffer)191            if str_buffer != trimmed:192                view.replace(edit, region, trimmed)193                has_matches = True194        if has_matches is True:195            sublime.set_timeout(lambda: sublime.status_message(196                'Trimmer: empty tags deleted.'), 0)197        else:198            sublime.set_timeout(lambda: sublime.status_message(199                'Trimmer: no empty tags to delete.'), 0)200class TrimSelections(sublime_plugin.TextCommand):201    def run(self, edit):202        """203        Trim leading and trailing whitespace from selections.204        Originally from the 'MultiâEditâUtils' Plug-in205        https://github.com/philippotto/Sublime-MultiEditUtils206        """207        view = self.view208        selection = view.sel()209        new_regions = []210        for current_region in selection:211            text = view.substr(current_region)212            l_stripped_text = text.lstrip()213            r_stripped_text = l_stripped_text.rstrip()214            l_stripped_count = len(text) - len(l_stripped_text)215            r_stripped_count = len(l_stripped_text) - len(r_stripped_text)216            a = current_region.begin() + l_stripped_count217            b = current_region.end() - r_stripped_count218            if a == b:219                # the region only contained whitespace220                # use the old selection end to avoid jumping of cursor221                a = b = current_region.b222            new_regions.append(sublime.Region(a, b))223        selection.clear()224        for region in new_regions:225            selection.add(region)226        sublime.set_timeout(lambda: sublime.status_message(227            'Trimmer: selections trimmed.'), 0)228class RemoveComments(sublime_plugin.TextCommand):229    def run(self, edit):230        view = self.view231        has_matches = False232        re_single_line_comment = re.compile(r"//.*$", re.MULTILINE)233        re_hash_comment = re.compile("#[^!].*$", re.MULTILINE)234        re_html_comment = re.compile("<!--.*?-->", re.DOTALL)235        re_block_comment = re.compile(r"/\*.*?\*/", re.DOTALL)236        re_ini_comment = re.compile(r"^(?:\s+)?;.*$", re.MULTILINE)237        for region in selections(view):238            str_buffer = view.substr(region)239            #240            # TODO: re-work this brute force approach and filter by syntax/scope241            trimmed = re_single_line_comment.sub('', str_buffer)242            trimmed = re_hash_comment.sub('', trimmed)243            trimmed = re_html_comment.sub('', trimmed)244            trimmed = re_block_comment.sub('', trimmed)245            trimmed = re_ini_comment.sub('', trimmed)246            if str_buffer != trimmed:247                view.replace(edit, region, trimmed)248                has_matches = True249        if has_matches is True:250            view.run_command('collapse_lines')251            sublime.set_timeout(lambda: sublime.status_message(252                'Trimmer: comments removed.'), 0)253        else:254            sublime.set_timeout(lambda: sublime.status_message(255                'Trimmer: no comments to remove.'), 0)256class ReplaceSmartCharactersCommand(sublime_plugin.TextCommand):257    def run(self, edit):258        view = self.view259        has_matches = False260        smart_replacements = [261            [u'[âââ]', u'\''],262            [u'[ââ]', u'"'],263            [u'[â]', u'"'],264            [u'[â¦]', u'...'],265            [u'[â]', u'---'],266            [u'[â]', u'--'],267            [u'[â¢]', u'*'],268            [u'[·]', u'-'],269            [u'[â]', u'   '],270            [u'[â]', u'  '],271            [u'[ ââ]', u' '],272            [u'[«]', u'<<'],273            [u'[»]', u'>>'],274            [u'[©]', u'(C)'],275            [u'[®]', u'(R)'],276            [u'[â¢]', u'(TM)']277        ]278        for replacement in smart_replacements:279            for region in selections(view):280                source_text = view.substr(region)281                if len(source_text) > 0:282                    replaced_text = re.sub(283                        replacement[0], replacement[1], source_text)284                    if source_text != replaced_text:285                        view.replace(edit, region, replaced_text)286                        has_matches = True287        if has_matches is True:288            sublime.set_timeout(lambda: sublime.status_message(289                'Trimmer: smart characters replaced.'), 0)290        else:291            sublime.set_timeout(lambda: sublime.status_message(...python_html.py
Source:python_html.py  
...145        desear()146def num_giros_dados():147    numero = ran.randint(5, 9)148    if numero == 5:149        timer.set_timeout(girar_dado1, 200)150        timer.set_timeout(girar_dado2, 200)151        timer.set_timeout(girar_dado1, 400)152        timer.set_timeout(girar_dado2, 400)153        timer.set_timeout(girar_dado1, 600)154        timer.set_timeout(girar_dado2, 600)155        timer.set_timeout(girar_dado1, 800)156        timer.set_timeout(girar_dado2, 800)157        timer.set_timeout(girar_dado1, 1000)158        timer.set_timeout(girar_dado2, 1000)159    elif numero == 6:160        timer.set_timeout(girar_dado1, 200)161        timer.set_timeout(girar_dado2, 200)162        timer.set_timeout(girar_dado1, 400)163        timer.set_timeout(girar_dado2, 400)164        timer.set_timeout(girar_dado1, 600)165        timer.set_timeout(girar_dado2, 600)166        timer.set_timeout(girar_dado1, 800)167        timer.set_timeout(girar_dado2, 800)168        timer.set_timeout(girar_dado1, 1000)169        timer.set_timeout(girar_dado2, 1000)170        timer.set_timeout(girar_dado1, 1200)171        timer.set_timeout(girar_dado2, 1200)172    elif numero == 7:173        timer.set_timeout(girar_dado1, 200)174        timer.set_timeout(girar_dado2, 200)175        timer.set_timeout(girar_dado1, 400)176        timer.set_timeout(girar_dado2, 400)177        timer.set_timeout(girar_dado1, 600)178        timer.set_timeout(girar_dado2, 600)179        timer.set_timeout(girar_dado1, 800)180        timer.set_timeout(girar_dado2, 800)181        timer.set_timeout(girar_dado1, 1000)182        timer.set_timeout(girar_dado2, 1000)183        timer.set_timeout(girar_dado1, 1200)184        timer.set_timeout(girar_dado2, 1200)185        timer.set_timeout(girar_dado1, 1400)186        timer.set_timeout(girar_dado2, 1400)187    elif numero == 8:188        timer.set_timeout(girar_dado1, 200)189        timer.set_timeout(girar_dado2, 200)190        timer.set_timeout(girar_dado1, 400)191        timer.set_timeout(girar_dado2, 400)192        timer.set_timeout(girar_dado1, 600)193        timer.set_timeout(girar_dado2, 600)194        timer.set_timeout(girar_dado1, 800)195        timer.set_timeout(girar_dado2, 800)196        timer.set_timeout(girar_dado1, 1000)197        timer.set_timeout(girar_dado2, 1000)198        timer.set_timeout(girar_dado1, 1200)199        timer.set_timeout(girar_dado2, 1200)200        timer.set_timeout(girar_dado1, 1400)201        timer.set_timeout(girar_dado2, 1400)202        timer.set_timeout(girar_dado1, 1600)203        timer.set_timeout(girar_dado2, 1600)204    elif numero == 9:205        timer.set_timeout(girar_dado1, 200)206        timer.set_timeout(girar_dado2, 200)207        timer.set_timeout(girar_dado1, 400)208        timer.set_timeout(girar_dado1, 400)209        timer.set_timeout(girar_dado2, 600)210        timer.set_timeout(girar_dado1, 600)211        timer.set_timeout(girar_dado2, 800)212        timer.set_timeout(girar_dado1, 800)213        timer.set_timeout(girar_dado2, 1000)214        timer.set_timeout(girar_dado1, 1000)215        timer.set_timeout(girar_dado2, 1200)216        timer.set_timeout(girar_dado2, 1200)217        timer.set_timeout(girar_dado1, 1400)218        timer.set_timeout(girar_dado2, 1400)219        timer.set_timeout(girar_dado1, 1600)220        timer.set_timeout(girar_dado2, 1600)221        timer.set_timeout(girar_dado1, 1800)222        timer.set_timeout(girar_dado2, 1800)223    timer.set_timeout(ejecutar, 1900)224def sorteo(ev):225    num_giros_dados()226def sorteo_par(ev):227    document['sel'].text = "Par"228    sorteo(ev)229def sorteo_impar(ev):230    document['sel'].text = "Impar"231    sorteo(ev)232def suma_apuesta(ev):233    captura = document['apuesta'].value234    if (int(captura) > billetera):235        document['apuesta'].style.color = 'red'236        document['par'].unbind('click')237        document['impar'].unbind('click')...test_server.py
Source:test_server.py  
...28    signal.signal(signal.SIGINT, old_intr)29    signal.signal(signal.SIGTERM, old_term)30    signal.signal(signal.SIGUSR1, old_term)31@pytest.fixture32def set_timeout():33    def make_timeout(sec, callback):34        def _callback(signum, frame):35            signal.alarm(0)36            callback()37        signal.signal(signal.SIGALRM, _callback)38        signal.setitimer(signal.ITIMER_REAL, sec)39    yield make_timeout40@pytest.fixture41def exec_recorder():42    f = tempfile.NamedTemporaryFile(43        mode='w', encoding='utf8',44        prefix='aiotools.tests.server.',45    )46    f.close()47    def write(msg: str) -> None:48        path = f"{f.name}.{os.getpid()}"49        with open(path, 'a', encoding='utf8') as writer:50            writer.write(msg + '\n')51    def read() -> Sequence[str]:52        lines: List[str] = []53        for path in glob.glob(f"{f.name}.*"):54            with open(path, 'r', encoding='utf8') as reader:55                lines.extend(line.strip() for line in reader.readlines())56        return lines57    yield write, read58    for path in glob.glob(f"{f.name}.*"):59        os.unlink(path)60def interrupt():61    os.kill(0, signal.SIGINT)62def interrupt_usr1():63    os.kill(os.getpid(), signal.SIGUSR1)64@aiotools.server   # type: ignore65async def myserver_simple(loop, proc_idx, args):66    write = args[0]67    await asyncio.sleep(0)68    write(f'started:{proc_idx}')69    yield70    await asyncio.sleep(0)71    write(f'terminated:{proc_idx}')72def test_server_singleproc(set_timeout, restore_signal, exec_recorder):73    write, read = exec_recorder74    set_timeout(0.2, interrupt)75    aiotools.start_server(76        myserver_simple,77        args=(write,),78    )79    lines = set(read())80    assert 'started:0' in lines81    assert 'terminated:0' in lines82def test_server_multiproc(set_timeout, restore_signal, exec_recorder):83    write, read = exec_recorder84    set_timeout(0.2, interrupt)85    aiotools.start_server(86        myserver_simple,87        num_workers=3,88        args=(write,),89    )90    lines = set(read())91    assert lines == {92        'started:0', 'started:1', 'started:2',93        'terminated:0', 'terminated:1', 'terminated:2',94    }95@aiotools.server  # type: ignore96async def myserver_signal(loop, proc_idx, args):97    write = args[0]98    await asyncio.sleep(0)99    write(f'started:{proc_idx}')100    received_signum = yield101    await asyncio.sleep(0)102    write(f'terminated:{proc_idx}:{received_signum}')103def test_server_multiproc_custom_stop_signals(104    set_timeout,105    restore_signal,106    exec_recorder,107):108    write, read = exec_recorder109    set_timeout(0.2, interrupt_usr1)110    aiotools.start_server(111        myserver_signal,112        num_workers=2,113        stop_signals={signal.SIGUSR1},114        args=(write,),115    )116    lines = set(read())117    assert {'started:0', 'started:1'} < lines118    assert {119        f'terminated:0:{int(signal.SIGUSR1)}',120        f'terminated:1:{int(signal.SIGUSR1)}',121    } < lines122@aiotools.server  # type: ignore123async def myserver_worker_init_error(loop, proc_idx, args):124    write = args[0]125    class _LogAdaptor:126        def __init__(self, writer):127            self.writer = writer128        def write(self, msg):129            msg = msg.strip().replace('\n', ' ')130            self.writer(f'log:{proc_idx}:{msg}')131    log_stream = _LogAdaptor(write)132    logging.config.dictConfig({133        'version': 1,134        'handlers': {135            'console': {136                'class': 'logging.StreamHandler',137                'stream': log_stream,138                'level': 'DEBUG',139            },140        },141        'loggers': {142            'aiotools': {143                'handlers': ['console'],144                'level': 'DEBUG',145            },146        },147    })148    log = logging.getLogger('aiotools')149    write(f'started:{proc_idx}')150    log.debug('hello')151    if proc_idx in (0, 2):152        # delay until other workers start normally.153        await asyncio.sleep(0.1 * proc_idx)154        raise ZeroDivisionError('oops')155    yield156    # should not be reached if errored.157    await asyncio.sleep(0)158    write(f'terminated:{proc_idx}')159def test_server_worker_init_error(restore_signal, exec_recorder):160    write, read = exec_recorder161    aiotools.start_server(162        myserver_worker_init_error,163        num_workers=4,164        args=(write,),165    )166    lines = set(read())167    assert sum(1 if line.startswith('started:') else 0 for line in lines) == 4168    # workers who did not raise errors have already started,169    # and they should have terminated normally170    # when the errorneous worker interrupted the main loop.171    assert sum(1 if line.startswith('terminated:') else 0 for line in lines) == 2172    assert sum(1 if 'hello' in line else 0 for line in lines) == 4173    assert sum(1 if 'ZeroDivisionError: oops' in line else 0 for line in lines) == 2174def test_server_user_main(set_timeout, restore_signal):175    main_enter = False176    main_exit = False177    @aiotools.main178    def mymain_user_main():179        nonlocal main_enter, main_exit180        main_enter = True181        yield 987182        main_exit = True183    @aiotools.server  # type: ignore184    async def myworker_user_main(loop, proc_idx, args):185        assert args[0] == 987  # first arg from user main186        assert args[1] == 123  # second arg from start_server args187        yield188    set_timeout(0.2, interrupt)189    aiotools.start_server(190        myworker_user_main,191        mymain_user_main,192        num_workers=3,193        args=(123,),194    )195    assert main_enter196    assert main_exit197def test_server_user_main_custom_stop_signals(set_timeout, restore_signal):198    main_enter = False199    main_exit = False200    main_signal = None201    worker_signals = mp.Array('i', 3)202    @aiotools.main203    def mymain():204        nonlocal main_enter, main_exit, main_signal205        main_enter = True206        main_signal = yield207        main_exit = True208    @aiotools.server209    async def myworker(loop, proc_idx, args):210        worker_signals = args[0]211        worker_signals[proc_idx] = yield212    def noop(signum, frame):213        pass214    set_timeout(0.2, interrupt_usr1)215    aiotools.start_server(216        myworker,217        mymain,218        num_workers=3,219        stop_signals={signal.SIGUSR1},220        args=(worker_signals,),221    )222    assert main_enter223    assert main_exit224    assert main_signal == signal.SIGUSR1225    assert list(worker_signals) == [signal.SIGUSR1] * 3226def test_server_user_main_tuple(set_timeout, restore_signal):227    main_enter = False228    main_exit = False229    @aiotools.main230    def mymain():231        nonlocal main_enter, main_exit232        main_enter = True233        yield 987, 654234        main_exit = True235    @aiotools.server236    async def myworker(loop, proc_idx, args):237        assert args[0] == 987  # first arg from user main238        assert args[1] == 654  # second arg from user main239        assert args[2] == 123  # third arg from start_server args240        yield241    set_timeout(0.2, interrupt)242    aiotools.start_server(243        myworker,244        mymain,245        num_workers=3,246        args=(123,),247    )248    assert main_enter249    assert main_exit250def test_server_extra_proc(set_timeout, restore_signal):251    extras = mp.Array('i', [0, 0])252    def extra_proc(key, _, pidx, args):253        assert _ is None254        extras[key] = 980 + key255        try:256            while True:257                time.sleep(0.1)258        except KeyboardInterrupt:259            print(f'extra[{key}] interrupted', file=sys.stderr)260        except Exception as e:261            print(f'extra[{key}] exception', e, file=sys.stderr)262        finally:263            print(f'extra[{key}] finish', file=sys.stderr)264            extras[key] = 990 + key265    @aiotools.server266    async def myworker(loop, pidx, args):267        yield268    set_timeout(0.2, interrupt)269    aiotools.start_server(myworker, extra_procs=[270                              functools.partial(extra_proc, 0),271                              functools.partial(extra_proc, 1)],272                          num_workers=3, args=(123, ))273    assert extras[0] == 990274    assert extras[1] == 991275def test_server_extra_proc_custom_stop_signal(set_timeout, restore_signal):276    received_signals = mp.Array('i', [0, 0])277    def extra_proc(key, _, pidx, args):278        received_signals = args[0]279        try:280            while True:281                time.sleep(0.1)282        except aiotools.InterruptedBySignal as e:283            received_signals[key] = e.args[0]284    @aiotools.server285    async def myworker(loop, pidx, args):286        yield287    set_timeout(0.3, interrupt_usr1)288    aiotools.start_server(myworker, extra_procs=[289                              functools.partial(extra_proc, 0),290                              functools.partial(extra_proc, 1)],291                          stop_signals={signal.SIGUSR1},292                          args=(received_signals, ),293                          num_workers=3)294    assert received_signals[0] == signal.SIGUSR1...test_lock.py
Source:test_lock.py  
...68            t.start()69        for t in threads:70            t.join()71        def test03_lock_timeout(self):72            self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)73            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)74            self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)75            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)76            self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)77            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)78            self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)79            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)80    def test04_lock_timeout2(self):81        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)82        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)83        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)84        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)85        def deadlock_detection() :86            while not deadlock_detection.end :87                deadlock_detection.count = \88                    self.env.lock_detect(db.DB_LOCK_EXPIRE)89                if deadlock_detection.count :90                    while not deadlock_detection.end :91                        pass92                    break93                time.sleep(0.01)94        deadlock_detection.end=False95        deadlock_detection.count=096        t=Thread(target=deadlock_detection)97        import sys98        if sys.version_info[0] < 3 :99            t.setDaemon(True)100        else :101            t.daemon = True102        t.start()103        self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)104        anID = self.env.lock_id()105        anID2 = self.env.lock_id()106        self.assertNotEqual(anID, anID2)107        lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)108        start_time=time.time()109        self.assertRaises(db.DBLockNotGrantedError,110                self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)111        end_time=time.time()112        deadlock_detection.end=True113        # Floating point rounding114        self.assertTrue((end_time-start_time) >= 0.0999)115        self.env.lock_put(lock)116        t.join()117        self.env.lock_id_free(anID)...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
