Best Python code snippet using yandex-tank
Trimmer.py
Source:Trimmer.py  
...20        for region in regions:21            view.erase(edit, region)22            has_matches = True23        if has_matches:24            sublime.set_timeout(lambda: sublime.status_message(25                'Trimmer: trailing whitespace removed.'), 0)26        else:27            sublime.set_timeout(lambda: sublime.status_message(28                'Trimmer: no trailing whitespace found.'), 0)29class DeleteEmptyLinesCommand(sublime_plugin.TextCommand):30    def run(self, edit):31        view = self.view32        has_matches = False33        reobj = re.compile('^[ \t]*$\r?\n', re.MULTILINE)34        for region in selections(view):35            str_buffer = view.substr(region)36            trimmed = reobj.sub('', str_buffer)37            if str_buffer != trimmed:38                view.replace(edit, region, trimmed)39                has_matches = True40        if has_matches is True:41            sublime.set_timeout(lambda: sublime.status_message(42                'Trimmer: empty lines deleted.'), 0)43        else:44            sublime.set_timeout(lambda: sublime.status_message(45                'Trimmer: no empty lines to delete.'), 0)46class RemoveBlankSpaces(sublime_plugin.TextCommand):47    def run(self, edit):48        view = self.view49        has_matches = False50        reobj = re.compile(r'[ \t\r\n\v\f]')51        for region in selections(view):52            str_buffer = view.substr(region)53            trimmed = reobj.sub('', str_buffer)54            if str_buffer != trimmed:55                view.replace(edit, region, trimmed)56                has_matches = True57        if has_matches is True:58            sublime.set_timeout(lambda: sublime.status_message(59                'Trimmer: blanks spaces removed.'), 0)60        else:61            sublime.set_timeout(lambda: sublime.status_message(62                'Trimmer: no blank spaces to remove.'), 0)63class TrimLeadingWhitespaceCommand(sublime_plugin.TextCommand):64    def run(self, edit):65        view 
= self.view66        has_matches = False67        reobj = re.compile('^[ \t]+', re.MULTILINE)68        for region in selections(view):69            str_buffer = view.substr(region)70            trimmed = reobj.sub('', str_buffer)71            if str_buffer != trimmed:72                view.replace(edit, region, trimmed)73                has_matches = True74        if has_matches is True:75            sublime.set_timeout(lambda: sublime.status_message(76                'Trimmer: leading whitespace removed.'), 0)77        else:78            sublime.set_timeout(lambda: sublime.status_message(79                'Trimmer: no leading whitespace to remove.'), 0)80class TrimLeadingTrailingWhitespace(sublime_plugin.TextCommand):81    def run(self, edit):82        view = self.view83        has_matches = False84        reobj = re.compile('^[ \t]+|[\t ]+$', re.MULTILINE)85        for region in selections(view):86            str_buffer = view.substr(region)87            trimmed = reobj.sub('', str_buffer)88            if str_buffer != trimmed:89                view.replace(edit, region, trimmed)90                has_matches = True91        if has_matches is True:92            sublime.set_timeout(lambda: sublime.status_message(93                'Trimmer: leading and trailing whitespace removed.'), 0)94        else:95            sublime.set_timeout(lambda: sublime.status_message(96                'Trimmer: no leading or trailing whitespace to remove.'), 0)97class CollapseLines(sublime_plugin.TextCommand):98    def run(self, edit):99        view = self.view100        has_matches = False101        reobj = re.compile(r'(?:\s*)(\r?\n)(?:\s*)(?:\r?\n+)')102        for region in selections(view):103            str_buffer = view.substr(region)104            trimmed = reobj.sub(r'\1\1', str_buffer)105            if str_buffer != trimmed:106                view.replace(edit, region, trimmed)107                has_matches = True108        if has_matches is True:109            
sublime.set_timeout(lambda: sublime.status_message(110                'Trimmer: lines collapsed.'), 0)111        else:112            sublime.set_timeout(lambda: sublime.status_message(113                'Trimmer: no lines to collapse.'), 0)114class CollapseSpaces(sublime_plugin.TextCommand):115    def run(self, edit):116        view = self.view117        has_matches = False118        reobj = re.compile('([ ])[ ]+')119        for region in selections(view):120            str_buffer = view.substr(region)121            trimmed = reobj.sub(r'\1', str_buffer)122            if str_buffer != trimmed:123                view.replace(edit, region, trimmed)124                has_matches = True125        if has_matches is True:126            sublime.set_timeout(lambda: sublime.status_message(127                'Trimmer: spaces collapsed.'), 0)128        else:129            sublime.set_timeout(lambda: sublime.status_message(130                'Trimmer: no spaces to collapse.'), 0)131class NormalizeSpaces(sublime_plugin.TextCommand):132    def run(self, edit):133        view = self.view134        has_matches = False135        reobj = re.compile('(^\\s+)|\\s(?=\\s+)|(\\s+$)')136        for region in selections(view):137            str_buffer = view.substr(region)138            trimmed = reobj.sub('', str_buffer)139            if str_buffer != trimmed:140                view.replace(edit, region, trimmed)141                has_matches = True142        if has_matches is True:143            sublime.set_timeout(lambda: sublime.status_message(144                'Trimmer: spaces normalized.'), 0)145        else:146            sublime.set_timeout(lambda: sublime.status_message(147                'Trimmer: no spaces to normalize.'), 0)148class TokenizeString(sublime_plugin.TextCommand):149    def run(self, edit):150        view = self.view151        has_matches = False152        reobj = re.compile('^ |\t+|( ){2,}|\t| |\t+$', re.MULTILINE)153        for region in selections(view):154      
      str_buffer = view.substr(region)155            trimmed = reobj.sub('', str_buffer)156            if str_buffer != trimmed:157                view.replace(edit, region, trimmed)158                has_matches = True159        if has_matches is True:160            sublime.set_timeout(lambda: sublime.status_message(161                'Trimmer: string tokenized.'), 0)162        else:163            sublime.set_timeout(lambda: sublime.status_message(164                'Trimmer: nothing to tokenize.'), 0)165class TrimEdges(sublime_plugin.TextCommand):166    def run(self, edit):167        view = self.view168        has_matches = False169        reobj = re.compile(r'(\A\s+|\s+\Z)')170        if view.size() > 0:171            region = sublime.Region(0, view.size())172            str_buffer = view.substr(region)173            trimmed = reobj.sub('', str_buffer)174            if str_buffer != trimmed:175                view.replace(edit, region, trimmed)176                has_matches = True177        if has_matches is True:178            return sublime.set_timeout(lambda: sublime.status_message(179                'Trimmer: file edges trimmed.'), 0)180        else:181            return sublime.set_timeout(lambda: sublime.status_message(182                'Trimmer: no file edges to trim.'), 0)183class DeleteEmptyTags(sublime_plugin.TextCommand):184    def run(self, edit):185        view = self.view186        has_matches = False187        reobj = re.compile(r'<([A-Z][A-Z0-9]*)\b[^>]*>\s*</\1>', re.IGNORECASE)188        for region in selections(view):189            str_buffer = view.substr(region)190            trimmed = reobj.sub('', str_buffer)191            if str_buffer != trimmed:192                view.replace(edit, region, trimmed)193                has_matches = True194        if has_matches is True:195            sublime.set_timeout(lambda: sublime.status_message(196                'Trimmer: empty tags deleted.'), 0)197        else:198            
sublime.set_timeout(lambda: sublime.status_message(199                'Trimmer: no empty tags to delete.'), 0)200class TrimSelections(sublime_plugin.TextCommand):201    def run(self, edit):202        """203        Trim leading and trailing whitespace from selections.204        Originally from the 'MultiâEditâUtils' Plug-in205        https://github.com/philippotto/Sublime-MultiEditUtils206        """207        view = self.view208        selection = view.sel()209        new_regions = []210        for current_region in selection:211            text = view.substr(current_region)212            l_stripped_text = text.lstrip()213            r_stripped_text = l_stripped_text.rstrip()214            l_stripped_count = len(text) - len(l_stripped_text)215            r_stripped_count = len(l_stripped_text) - len(r_stripped_text)216            a = current_region.begin() + l_stripped_count217            b = current_region.end() - r_stripped_count218            if a == b:219                # the region only contained whitespace220                # use the old selection end to avoid jumping of cursor221                a = b = current_region.b222            new_regions.append(sublime.Region(a, b))223        selection.clear()224        for region in new_regions:225            selection.add(region)226        sublime.set_timeout(lambda: sublime.status_message(227            'Trimmer: selections trimmed.'), 0)228class RemoveComments(sublime_plugin.TextCommand):229    def run(self, edit):230        view = self.view231        has_matches = False232        re_single_line_comment = re.compile(r"//.*$", re.MULTILINE)233        re_hash_comment = re.compile("#[^!].*$", re.MULTILINE)234        re_html_comment = re.compile("<!--.*?-->", re.DOTALL)235        re_block_comment = re.compile(r"/\*.*?\*/", re.DOTALL)236        re_ini_comment = re.compile(r"^(?:\s+)?;.*$", re.MULTILINE)237        for region in selections(view):238            str_buffer = view.substr(region)239            #240           
 # TODO: re-work this brute force approach and filter by syntax/scope241            trimmed = re_single_line_comment.sub('', str_buffer)242            trimmed = re_hash_comment.sub('', trimmed)243            trimmed = re_html_comment.sub('', trimmed)244            trimmed = re_block_comment.sub('', trimmed)245            trimmed = re_ini_comment.sub('', trimmed)246            if str_buffer != trimmed:247                view.replace(edit, region, trimmed)248                has_matches = True249        if has_matches is True:250            view.run_command('collapse_lines')251            sublime.set_timeout(lambda: sublime.status_message(252                'Trimmer: comments removed.'), 0)253        else:254            sublime.set_timeout(lambda: sublime.status_message(255                'Trimmer: no comments to remove.'), 0)256class ReplaceSmartCharactersCommand(sublime_plugin.TextCommand):257    def run(self, edit):258        view = self.view259        has_matches = False260        smart_replacements = [261            [u'[âââ]', u'\''],262            [u'[ââ]', u'"'],263            [u'[â]', u'"'],264            [u'[â¦]', u'...'],265            [u'[â]', u'---'],266            [u'[â]', u'--'],267            [u'[â¢]', u'*'],268            [u'[·]', u'-'],269            [u'[â]', u'   '],270            [u'[â]', u'  '],271            [u'[ ââ]', u' '],272            [u'[«]', u'<<'],273            [u'[»]', u'>>'],274            [u'[©]', u'(C)'],275            [u'[®]', u'(R)'],276            [u'[â¢]', u'(TM)']277        ]278        for replacement in smart_replacements:279            for region in selections(view):280                source_text = view.substr(region)281                if len(source_text) > 0:282                    replaced_text = re.sub(283                        replacement[0], replacement[1], source_text)284                    if source_text != replaced_text:285                        view.replace(edit, region, replaced_text)286                        
has_matches = True287        if has_matches is True:288            sublime.set_timeout(lambda: sublime.status_message(289                'Trimmer: smart characters replaced.'), 0)290        else:291            sublime.set_timeout(lambda: sublime.status_message(...python_html.py
Source:python_html.py  
...145        desear()146def num_giros_dados():147    numero = ran.randint(5, 9)148    if numero == 5:149        timer.set_timeout(girar_dado1, 200)150        timer.set_timeout(girar_dado2, 200)151        timer.set_timeout(girar_dado1, 400)152        timer.set_timeout(girar_dado2, 400)153        timer.set_timeout(girar_dado1, 600)154        timer.set_timeout(girar_dado2, 600)155        timer.set_timeout(girar_dado1, 800)156        timer.set_timeout(girar_dado2, 800)157        timer.set_timeout(girar_dado1, 1000)158        timer.set_timeout(girar_dado2, 1000)159    elif numero == 6:160        timer.set_timeout(girar_dado1, 200)161        timer.set_timeout(girar_dado2, 200)162        timer.set_timeout(girar_dado1, 400)163        timer.set_timeout(girar_dado2, 400)164        timer.set_timeout(girar_dado1, 600)165        timer.set_timeout(girar_dado2, 600)166        timer.set_timeout(girar_dado1, 800)167        timer.set_timeout(girar_dado2, 800)168        timer.set_timeout(girar_dado1, 1000)169        timer.set_timeout(girar_dado2, 1000)170        timer.set_timeout(girar_dado1, 1200)171        timer.set_timeout(girar_dado2, 1200)172    elif numero == 7:173        timer.set_timeout(girar_dado1, 200)174        timer.set_timeout(girar_dado2, 200)175        timer.set_timeout(girar_dado1, 400)176        timer.set_timeout(girar_dado2, 400)177        timer.set_timeout(girar_dado1, 600)178        timer.set_timeout(girar_dado2, 600)179        timer.set_timeout(girar_dado1, 800)180        timer.set_timeout(girar_dado2, 800)181        timer.set_timeout(girar_dado1, 1000)182        timer.set_timeout(girar_dado2, 1000)183        timer.set_timeout(girar_dado1, 1200)184        timer.set_timeout(girar_dado2, 1200)185        timer.set_timeout(girar_dado1, 1400)186        timer.set_timeout(girar_dado2, 1400)187    elif numero == 8:188        timer.set_timeout(girar_dado1, 200)189        timer.set_timeout(girar_dado2, 200)190        timer.set_timeout(girar_dado1, 400)191        
timer.set_timeout(girar_dado2, 400)192        timer.set_timeout(girar_dado1, 600)193        timer.set_timeout(girar_dado2, 600)194        timer.set_timeout(girar_dado1, 800)195        timer.set_timeout(girar_dado2, 800)196        timer.set_timeout(girar_dado1, 1000)197        timer.set_timeout(girar_dado2, 1000)198        timer.set_timeout(girar_dado1, 1200)199        timer.set_timeout(girar_dado2, 1200)200        timer.set_timeout(girar_dado1, 1400)201        timer.set_timeout(girar_dado2, 1400)202        timer.set_timeout(girar_dado1, 1600)203        timer.set_timeout(girar_dado2, 1600)204    elif numero == 9:205        timer.set_timeout(girar_dado1, 200)206        timer.set_timeout(girar_dado2, 200)207        timer.set_timeout(girar_dado1, 400)208        timer.set_timeout(girar_dado1, 400)209        timer.set_timeout(girar_dado2, 600)210        timer.set_timeout(girar_dado1, 600)211        timer.set_timeout(girar_dado2, 800)212        timer.set_timeout(girar_dado1, 800)213        timer.set_timeout(girar_dado2, 1000)214        timer.set_timeout(girar_dado1, 1000)215        timer.set_timeout(girar_dado2, 1200)216        timer.set_timeout(girar_dado2, 1200)217        timer.set_timeout(girar_dado1, 1400)218        timer.set_timeout(girar_dado2, 1400)219        timer.set_timeout(girar_dado1, 1600)220        timer.set_timeout(girar_dado2, 1600)221        timer.set_timeout(girar_dado1, 1800)222        timer.set_timeout(girar_dado2, 1800)223    timer.set_timeout(ejecutar, 1900)224def sorteo(ev):225    num_giros_dados()226def sorteo_par(ev):227    document['sel'].text = "Par"228    sorteo(ev)229def sorteo_impar(ev):230    document['sel'].text = "Impar"231    sorteo(ev)232def suma_apuesta(ev):233    captura = document['apuesta'].value234    if (int(captura) > billetera):235        document['apuesta'].style.color = 'red'236        document['par'].unbind('click')237        document['impar'].unbind('click')...test_server.py
Source:test_server.py  
...28    signal.signal(signal.SIGINT, old_intr)29    signal.signal(signal.SIGTERM, old_term)30    signal.signal(signal.SIGUSR1, old_term)31@pytest.fixture32def set_timeout():33    def make_timeout(sec, callback):34        def _callback(signum, frame):35            signal.alarm(0)36            callback()37        signal.signal(signal.SIGALRM, _callback)38        signal.setitimer(signal.ITIMER_REAL, sec)39    yield make_timeout40@pytest.fixture41def exec_recorder():42    f = tempfile.NamedTemporaryFile(43        mode='w', encoding='utf8',44        prefix='aiotools.tests.server.',45    )46    f.close()47    def write(msg: str) -> None:48        path = f"{f.name}.{os.getpid()}"49        with open(path, 'a', encoding='utf8') as writer:50            writer.write(msg + '\n')51    def read() -> Sequence[str]:52        lines: List[str] = []53        for path in glob.glob(f"{f.name}.*"):54            with open(path, 'r', encoding='utf8') as reader:55                lines.extend(line.strip() for line in reader.readlines())56        return lines57    yield write, read58    for path in glob.glob(f"{f.name}.*"):59        os.unlink(path)60def interrupt():61    os.kill(0, signal.SIGINT)62def interrupt_usr1():63    os.kill(os.getpid(), signal.SIGUSR1)64@aiotools.server   # type: ignore65async def myserver_simple(loop, proc_idx, args):66    write = args[0]67    await asyncio.sleep(0)68    write(f'started:{proc_idx}')69    yield70    await asyncio.sleep(0)71    write(f'terminated:{proc_idx}')72def test_server_singleproc(set_timeout, restore_signal, exec_recorder):73    write, read = exec_recorder74    set_timeout(0.2, interrupt)75    aiotools.start_server(76        myserver_simple,77        args=(write,),78    )79    lines = set(read())80    assert 'started:0' in lines81    assert 'terminated:0' in lines82def test_server_multiproc(set_timeout, restore_signal, exec_recorder):83    write, read = exec_recorder84    set_timeout(0.2, interrupt)85    aiotools.start_server(86        
myserver_simple,87        num_workers=3,88        args=(write,),89    )90    lines = set(read())91    assert lines == {92        'started:0', 'started:1', 'started:2',93        'terminated:0', 'terminated:1', 'terminated:2',94    }95@aiotools.server  # type: ignore96async def myserver_signal(loop, proc_idx, args):97    write = args[0]98    await asyncio.sleep(0)99    write(f'started:{proc_idx}')100    received_signum = yield101    await asyncio.sleep(0)102    write(f'terminated:{proc_idx}:{received_signum}')103def test_server_multiproc_custom_stop_signals(104    set_timeout,105    restore_signal,106    exec_recorder,107):108    write, read = exec_recorder109    set_timeout(0.2, interrupt_usr1)110    aiotools.start_server(111        myserver_signal,112        num_workers=2,113        stop_signals={signal.SIGUSR1},114        args=(write,),115    )116    lines = set(read())117    assert {'started:0', 'started:1'} < lines118    assert {119        f'terminated:0:{int(signal.SIGUSR1)}',120        f'terminated:1:{int(signal.SIGUSR1)}',121    } < lines122@aiotools.server  # type: ignore123async def myserver_worker_init_error(loop, proc_idx, args):124    write = args[0]125    class _LogAdaptor:126        def __init__(self, writer):127            self.writer = writer128        def write(self, msg):129            msg = msg.strip().replace('\n', ' ')130            self.writer(f'log:{proc_idx}:{msg}')131    log_stream = _LogAdaptor(write)132    logging.config.dictConfig({133        'version': 1,134        'handlers': {135            'console': {136                'class': 'logging.StreamHandler',137                'stream': log_stream,138                'level': 'DEBUG',139            },140        },141        'loggers': {142            'aiotools': {143                'handlers': ['console'],144                'level': 'DEBUG',145            },146        },147    })148    log = logging.getLogger('aiotools')149    write(f'started:{proc_idx}')150    log.debug('hello')151    if 
proc_idx in (0, 2):152        # delay until other workers start normally.153        await asyncio.sleep(0.1 * proc_idx)154        raise ZeroDivisionError('oops')155    yield156    # should not be reached if errored.157    await asyncio.sleep(0)158    write(f'terminated:{proc_idx}')159def test_server_worker_init_error(restore_signal, exec_recorder):160    write, read = exec_recorder161    aiotools.start_server(162        myserver_worker_init_error,163        num_workers=4,164        args=(write,),165    )166    lines = set(read())167    assert sum(1 if line.startswith('started:') else 0 for line in lines) == 4168    # workers who did not raise errors have already started,169    # and they should have terminated normally170    # when the errorneous worker interrupted the main loop.171    assert sum(1 if line.startswith('terminated:') else 0 for line in lines) == 2172    assert sum(1 if 'hello' in line else 0 for line in lines) == 4173    assert sum(1 if 'ZeroDivisionError: oops' in line else 0 for line in lines) == 2174def test_server_user_main(set_timeout, restore_signal):175    main_enter = False176    main_exit = False177    @aiotools.main178    def mymain_user_main():179        nonlocal main_enter, main_exit180        main_enter = True181        yield 987182        main_exit = True183    @aiotools.server  # type: ignore184    async def myworker_user_main(loop, proc_idx, args):185        assert args[0] == 987  # first arg from user main186        assert args[1] == 123  # second arg from start_server args187        yield188    set_timeout(0.2, interrupt)189    aiotools.start_server(190        myworker_user_main,191        mymain_user_main,192        num_workers=3,193        args=(123,),194    )195    assert main_enter196    assert main_exit197def test_server_user_main_custom_stop_signals(set_timeout, restore_signal):198    main_enter = False199    main_exit = False200    main_signal = None201    worker_signals = mp.Array('i', 3)202    @aiotools.main203    def 
mymain():204        nonlocal main_enter, main_exit, main_signal205        main_enter = True206        main_signal = yield207        main_exit = True208    @aiotools.server209    async def myworker(loop, proc_idx, args):210        worker_signals = args[0]211        worker_signals[proc_idx] = yield212    def noop(signum, frame):213        pass214    set_timeout(0.2, interrupt_usr1)215    aiotools.start_server(216        myworker,217        mymain,218        num_workers=3,219        stop_signals={signal.SIGUSR1},220        args=(worker_signals,),221    )222    assert main_enter223    assert main_exit224    assert main_signal == signal.SIGUSR1225    assert list(worker_signals) == [signal.SIGUSR1] * 3226def test_server_user_main_tuple(set_timeout, restore_signal):227    main_enter = False228    main_exit = False229    @aiotools.main230    def mymain():231        nonlocal main_enter, main_exit232        main_enter = True233        yield 987, 654234        main_exit = True235    @aiotools.server236    async def myworker(loop, proc_idx, args):237        assert args[0] == 987  # first arg from user main238        assert args[1] == 654  # second arg from user main239        assert args[2] == 123  # third arg from start_server args240        yield241    set_timeout(0.2, interrupt)242    aiotools.start_server(243        myworker,244        mymain,245        num_workers=3,246        args=(123,),247    )248    assert main_enter249    assert main_exit250def test_server_extra_proc(set_timeout, restore_signal):251    extras = mp.Array('i', [0, 0])252    def extra_proc(key, _, pidx, args):253        assert _ is None254        extras[key] = 980 + key255        try:256            while True:257                time.sleep(0.1)258        except KeyboardInterrupt:259            print(f'extra[{key}] interrupted', file=sys.stderr)260        except Exception as e:261            print(f'extra[{key}] exception', e, file=sys.stderr)262        finally:263            print(f'extra[{key}] finish', 
file=sys.stderr)264            extras[key] = 990 + key265    @aiotools.server266    async def myworker(loop, pidx, args):267        yield268    set_timeout(0.2, interrupt)269    aiotools.start_server(myworker, extra_procs=[270                              functools.partial(extra_proc, 0),271                              functools.partial(extra_proc, 1)],272                          num_workers=3, args=(123, ))273    assert extras[0] == 990274    assert extras[1] == 991275def test_server_extra_proc_custom_stop_signal(set_timeout, restore_signal):276    received_signals = mp.Array('i', [0, 0])277    def extra_proc(key, _, pidx, args):278        received_signals = args[0]279        try:280            while True:281                time.sleep(0.1)282        except aiotools.InterruptedBySignal as e:283            received_signals[key] = e.args[0]284    @aiotools.server285    async def myworker(loop, pidx, args):286        yield287    set_timeout(0.3, interrupt_usr1)288    aiotools.start_server(myworker, extra_procs=[289                              functools.partial(extra_proc, 0),290                              functools.partial(extra_proc, 1)],291                          stop_signals={signal.SIGUSR1},292                          args=(received_signals, ),293                          num_workers=3)294    assert received_signals[0] == signal.SIGUSR1...test_lock.py
Source:test_lock.py  
...68            t.start()69        for t in threads:70            t.join()71        def test03_lock_timeout(self):72            self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)73            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)74            self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)75            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)76            self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)77            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)78            self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)79            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)80    def test04_lock_timeout2(self):81        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)82        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)83        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)84        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)85        def deadlock_detection() :86            while not deadlock_detection.end :87                deadlock_detection.count = \88                    self.env.lock_detect(db.DB_LOCK_EXPIRE)89                if deadlock_detection.count :90                    while not deadlock_detection.end :91                        pass92                    break93                time.sleep(0.01)94        deadlock_detection.end=False95        deadlock_detection.count=096        t=Thread(target=deadlock_detection)97        import sys98        if sys.version_info[0] < 3 :99            t.setDaemon(True)100        else :101            t.daemon = True102        t.start()103        self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)104        anID = self.env.lock_id()105        anID2 = self.env.lock_id()106        self.assertNotEqual(anID, anID2)107        lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)108        start_time=time.time()109        self.assertRaises(db.DBLockNotGrantedError,110                
self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)111        end_time=time.time()112        deadlock_detection.end=True113        # Floating point rounding114        self.assertTrue((end_time-start_time) >= 0.0999)115        self.env.lock_put(lock)116        t.join()117        self.env.lock_id_free(anID)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
