How to use the set_timeout method in yandex-tank

Best Python code snippet using yandex-tank

Trimmer.py

Source:Trimmer.py Github

copy

Full Screen

...20 for region in regions:21 view.erase(edit, region)22 has_matches = True23 if has_matches:24 sublime.set_timeout(lambda: sublime.status_message(25 'Trimmer: trailing whitespace removed.'), 0)26 else:27 sublime.set_timeout(lambda: sublime.status_message(28 'Trimmer: no trailing whitespace found.'), 0)29class DeleteEmptyLinesCommand(sublime_plugin.TextCommand):30 def run(self, edit):31 view = self.view32 has_matches = False33 reobj = re.compile('^[ \t]*$\r?\n', re.MULTILINE)34 for region in selections(view):35 str_buffer = view.substr(region)36 trimmed = reobj.sub('', str_buffer)37 if str_buffer != trimmed:38 view.replace(edit, region, trimmed)39 has_matches = True40 if has_matches is True:41 sublime.set_timeout(lambda: sublime.status_message(42 'Trimmer: empty lines deleted.'), 0)43 else:44 sublime.set_timeout(lambda: sublime.status_message(45 'Trimmer: no empty lines to delete.'), 0)46class RemoveBlankSpaces(sublime_plugin.TextCommand):47 def run(self, edit):48 view = self.view49 has_matches = False50 reobj = re.compile(r'[ \t\r\n\v\f]')51 for region in selections(view):52 str_buffer = view.substr(region)53 trimmed = reobj.sub('', str_buffer)54 if str_buffer != trimmed:55 view.replace(edit, region, trimmed)56 has_matches = True57 if has_matches is True:58 sublime.set_timeout(lambda: sublime.status_message(59 'Trimmer: blanks spaces removed.'), 0)60 else:61 sublime.set_timeout(lambda: sublime.status_message(62 'Trimmer: no blank spaces to remove.'), 0)63class TrimLeadingWhitespaceCommand(sublime_plugin.TextCommand):64 def run(self, edit):65 view = self.view66 has_matches = False67 reobj = re.compile('^[ \t]+', re.MULTILINE)68 for region in selections(view):69 str_buffer = view.substr(region)70 trimmed = reobj.sub('', str_buffer)71 if str_buffer != trimmed:72 view.replace(edit, region, trimmed)73 has_matches = True74 if has_matches is True:75 sublime.set_timeout(lambda: sublime.status_message(76 'Trimmer: leading whitespace removed.'), 0)77 else:78 
sublime.set_timeout(lambda: sublime.status_message(79 'Trimmer: no leading whitespace to remove.'), 0)80class TrimLeadingTrailingWhitespace(sublime_plugin.TextCommand):81 def run(self, edit):82 view = self.view83 has_matches = False84 reobj = re.compile('^[ \t]+|[\t ]+$', re.MULTILINE)85 for region in selections(view):86 str_buffer = view.substr(region)87 trimmed = reobj.sub('', str_buffer)88 if str_buffer != trimmed:89 view.replace(edit, region, trimmed)90 has_matches = True91 if has_matches is True:92 sublime.set_timeout(lambda: sublime.status_message(93 'Trimmer: leading and trailing whitespace removed.'), 0)94 else:95 sublime.set_timeout(lambda: sublime.status_message(96 'Trimmer: no leading or trailing whitespace to remove.'), 0)97class CollapseLines(sublime_plugin.TextCommand):98 def run(self, edit):99 view = self.view100 has_matches = False101 reobj = re.compile(r'(?:\s*)(\r?\n)(?:\s*)(?:\r?\n+)')102 for region in selections(view):103 str_buffer = view.substr(region)104 trimmed = reobj.sub(r'\1\1', str_buffer)105 if str_buffer != trimmed:106 view.replace(edit, region, trimmed)107 has_matches = True108 if has_matches is True:109 sublime.set_timeout(lambda: sublime.status_message(110 'Trimmer: lines collapsed.'), 0)111 else:112 sublime.set_timeout(lambda: sublime.status_message(113 'Trimmer: no lines to collapse.'), 0)114class CollapseSpaces(sublime_plugin.TextCommand):115 def run(self, edit):116 view = self.view117 has_matches = False118 reobj = re.compile('([ ])[ ]+')119 for region in selections(view):120 str_buffer = view.substr(region)121 trimmed = reobj.sub(r'\1', str_buffer)122 if str_buffer != trimmed:123 view.replace(edit, region, trimmed)124 has_matches = True125 if has_matches is True:126 sublime.set_timeout(lambda: sublime.status_message(127 'Trimmer: spaces collapsed.'), 0)128 else:129 sublime.set_timeout(lambda: sublime.status_message(130 'Trimmer: no spaces to collapse.'), 0)131class NormalizeSpaces(sublime_plugin.TextCommand):132 def run(self, 
edit):133 view = self.view134 has_matches = False135 reobj = re.compile('(^\\s+)|\\s(?=\\s+)|(\\s+$)')136 for region in selections(view):137 str_buffer = view.substr(region)138 trimmed = reobj.sub('', str_buffer)139 if str_buffer != trimmed:140 view.replace(edit, region, trimmed)141 has_matches = True142 if has_matches is True:143 sublime.set_timeout(lambda: sublime.status_message(144 'Trimmer: spaces normalized.'), 0)145 else:146 sublime.set_timeout(lambda: sublime.status_message(147 'Trimmer: no spaces to normalize.'), 0)148class TokenizeString(sublime_plugin.TextCommand):149 def run(self, edit):150 view = self.view151 has_matches = False152 reobj = re.compile('^ |\t+|( ){2,}|\t| |\t+$', re.MULTILINE)153 for region in selections(view):154 str_buffer = view.substr(region)155 trimmed = reobj.sub('', str_buffer)156 if str_buffer != trimmed:157 view.replace(edit, region, trimmed)158 has_matches = True159 if has_matches is True:160 sublime.set_timeout(lambda: sublime.status_message(161 'Trimmer: string tokenized.'), 0)162 else:163 sublime.set_timeout(lambda: sublime.status_message(164 'Trimmer: nothing to tokenize.'), 0)165class TrimEdges(sublime_plugin.TextCommand):166 def run(self, edit):167 view = self.view168 has_matches = False169 reobj = re.compile(r'(\A\s+|\s+\Z)')170 if view.size() > 0:171 region = sublime.Region(0, view.size())172 str_buffer = view.substr(region)173 trimmed = reobj.sub('', str_buffer)174 if str_buffer != trimmed:175 view.replace(edit, region, trimmed)176 has_matches = True177 if has_matches is True:178 return sublime.set_timeout(lambda: sublime.status_message(179 'Trimmer: file edges trimmed.'), 0)180 else:181 return sublime.set_timeout(lambda: sublime.status_message(182 'Trimmer: no file edges to trim.'), 0)183class DeleteEmptyTags(sublime_plugin.TextCommand):184 def run(self, edit):185 view = self.view186 has_matches = False187 reobj = re.compile(r'<([A-Z][A-Z0-9]*)\b[^>]*>\s*</\1>', re.IGNORECASE)188 for region in selections(view):189 
str_buffer = view.substr(region)190 trimmed = reobj.sub('', str_buffer)191 if str_buffer != trimmed:192 view.replace(edit, region, trimmed)193 has_matches = True194 if has_matches is True:195 sublime.set_timeout(lambda: sublime.status_message(196 'Trimmer: empty tags deleted.'), 0)197 else:198 sublime.set_timeout(lambda: sublime.status_message(199 'Trimmer: no empty tags to delete.'), 0)200class TrimSelections(sublime_plugin.TextCommand):201 def run(self, edit):202 """203 Trim leading and trailing whitespace from selections.204 Originally from the 'Multi​Edit​Utils' Plug-in205 https://github.com/philippotto/Sublime-MultiEditUtils206 """207 view = self.view208 selection = view.sel()209 new_regions = []210 for current_region in selection:211 text = view.substr(current_region)212 l_stripped_text = text.lstrip()213 r_stripped_text = l_stripped_text.rstrip()214 l_stripped_count = len(text) - len(l_stripped_text)215 r_stripped_count = len(l_stripped_text) - len(r_stripped_text)216 a = current_region.begin() + l_stripped_count217 b = current_region.end() - r_stripped_count218 if a == b:219 # the region only contained whitespace220 # use the old selection end to avoid jumping of cursor221 a = b = current_region.b222 new_regions.append(sublime.Region(a, b))223 selection.clear()224 for region in new_regions:225 selection.add(region)226 sublime.set_timeout(lambda: sublime.status_message(227 'Trimmer: selections trimmed.'), 0)228class RemoveComments(sublime_plugin.TextCommand):229 def run(self, edit):230 view = self.view231 has_matches = False232 re_single_line_comment = re.compile(r"//.*$", re.MULTILINE)233 re_hash_comment = re.compile("#[^!].*$", re.MULTILINE)234 re_html_comment = re.compile("<!--.*?-->", re.DOTALL)235 re_block_comment = re.compile(r"/\*.*?\*/", re.DOTALL)236 re_ini_comment = re.compile(r"^(?:\s+)?;.*$", re.MULTILINE)237 for region in selections(view):238 str_buffer = view.substr(region)239 #240 # TODO: re-work this brute force approach and filter by 
syntax/scope241 trimmed = re_single_line_comment.sub('', str_buffer)242 trimmed = re_hash_comment.sub('', trimmed)243 trimmed = re_html_comment.sub('', trimmed)244 trimmed = re_block_comment.sub('', trimmed)245 trimmed = re_ini_comment.sub('', trimmed)246 if str_buffer != trimmed:247 view.replace(edit, region, trimmed)248 has_matches = True249 if has_matches is True:250 view.run_command('collapse_lines')251 sublime.set_timeout(lambda: sublime.status_message(252 'Trimmer: comments removed.'), 0)253 else:254 sublime.set_timeout(lambda: sublime.status_message(255 'Trimmer: no comments to remove.'), 0)256class ReplaceSmartCharactersCommand(sublime_plugin.TextCommand):257 def run(self, edit):258 view = self.view259 has_matches = False260 smart_replacements = [261 [u'[’‘‚]', u'\''],262 [u'[“”]', u'"'],263 [u'[„]', u'"'],264 [u'[…]', u'...'],265 [u'[—]', u'---'],266 [u'[–]', u'--'],267 [u'[•]', u'*'],268 [u'[·]', u'-'],269 [u'[ ]', u' '],270 [u'[ ]', u' '],271 [u'[   ]', u' '],272 [u'[«]', u'<<'],273 [u'[»]', u'>>'],274 [u'[©]', u'(C)'],275 [u'[®]', u'(R)'],276 [u'[™]', u'(TM)']277 ]278 for replacement in smart_replacements:279 for region in selections(view):280 source_text = view.substr(region)281 if len(source_text) > 0:282 replaced_text = re.sub(283 replacement[0], replacement[1], source_text)284 if source_text != replaced_text:285 view.replace(edit, region, replaced_text)286 has_matches = True287 if has_matches is True:288 sublime.set_timeout(lambda: sublime.status_message(289 'Trimmer: smart characters replaced.'), 0)290 else:291 sublime.set_timeout(lambda: sublime.status_message(...

Full Screen

Full Screen

python_html.py

Source:python_html.py Github

copy

Full Screen

...145 desear()146def num_giros_dados():147 numero = ran.randint(5, 9)148 if numero == 5:149 timer.set_timeout(girar_dado1, 200)150 timer.set_timeout(girar_dado2, 200)151 timer.set_timeout(girar_dado1, 400)152 timer.set_timeout(girar_dado2, 400)153 timer.set_timeout(girar_dado1, 600)154 timer.set_timeout(girar_dado2, 600)155 timer.set_timeout(girar_dado1, 800)156 timer.set_timeout(girar_dado2, 800)157 timer.set_timeout(girar_dado1, 1000)158 timer.set_timeout(girar_dado2, 1000)159 elif numero == 6:160 timer.set_timeout(girar_dado1, 200)161 timer.set_timeout(girar_dado2, 200)162 timer.set_timeout(girar_dado1, 400)163 timer.set_timeout(girar_dado2, 400)164 timer.set_timeout(girar_dado1, 600)165 timer.set_timeout(girar_dado2, 600)166 timer.set_timeout(girar_dado1, 800)167 timer.set_timeout(girar_dado2, 800)168 timer.set_timeout(girar_dado1, 1000)169 timer.set_timeout(girar_dado2, 1000)170 timer.set_timeout(girar_dado1, 1200)171 timer.set_timeout(girar_dado2, 1200)172 elif numero == 7:173 timer.set_timeout(girar_dado1, 200)174 timer.set_timeout(girar_dado2, 200)175 timer.set_timeout(girar_dado1, 400)176 timer.set_timeout(girar_dado2, 400)177 timer.set_timeout(girar_dado1, 600)178 timer.set_timeout(girar_dado2, 600)179 timer.set_timeout(girar_dado1, 800)180 timer.set_timeout(girar_dado2, 800)181 timer.set_timeout(girar_dado1, 1000)182 timer.set_timeout(girar_dado2, 1000)183 timer.set_timeout(girar_dado1, 1200)184 timer.set_timeout(girar_dado2, 1200)185 timer.set_timeout(girar_dado1, 1400)186 timer.set_timeout(girar_dado2, 1400)187 elif numero == 8:188 timer.set_timeout(girar_dado1, 200)189 timer.set_timeout(girar_dado2, 200)190 timer.set_timeout(girar_dado1, 400)191 timer.set_timeout(girar_dado2, 400)192 timer.set_timeout(girar_dado1, 600)193 timer.set_timeout(girar_dado2, 600)194 timer.set_timeout(girar_dado1, 800)195 timer.set_timeout(girar_dado2, 800)196 timer.set_timeout(girar_dado1, 1000)197 timer.set_timeout(girar_dado2, 1000)198 timer.set_timeout(girar_dado1, 
1200)199 timer.set_timeout(girar_dado2, 1200)200 timer.set_timeout(girar_dado1, 1400)201 timer.set_timeout(girar_dado2, 1400)202 timer.set_timeout(girar_dado1, 1600)203 timer.set_timeout(girar_dado2, 1600)204 elif numero == 9:205 timer.set_timeout(girar_dado1, 200)206 timer.set_timeout(girar_dado2, 200)207 timer.set_timeout(girar_dado1, 400)208 timer.set_timeout(girar_dado1, 400)209 timer.set_timeout(girar_dado2, 600)210 timer.set_timeout(girar_dado1, 600)211 timer.set_timeout(girar_dado2, 800)212 timer.set_timeout(girar_dado1, 800)213 timer.set_timeout(girar_dado2, 1000)214 timer.set_timeout(girar_dado1, 1000)215 timer.set_timeout(girar_dado2, 1200)216 timer.set_timeout(girar_dado2, 1200)217 timer.set_timeout(girar_dado1, 1400)218 timer.set_timeout(girar_dado2, 1400)219 timer.set_timeout(girar_dado1, 1600)220 timer.set_timeout(girar_dado2, 1600)221 timer.set_timeout(girar_dado1, 1800)222 timer.set_timeout(girar_dado2, 1800)223 timer.set_timeout(ejecutar, 1900)224def sorteo(ev):225 num_giros_dados()226def sorteo_par(ev):227 document['sel'].text = "Par"228 sorteo(ev)229def sorteo_impar(ev):230 document['sel'].text = "Impar"231 sorteo(ev)232def suma_apuesta(ev):233 captura = document['apuesta'].value234 if (int(captura) > billetera):235 document['apuesta'].style.color = 'red'236 document['par'].unbind('click')237 document['impar'].unbind('click')...

Full Screen

Full Screen

test_server.py

Source:test_server.py Github

copy

Full Screen

...28 signal.signal(signal.SIGINT, old_intr)29 signal.signal(signal.SIGTERM, old_term)30 signal.signal(signal.SIGUSR1, old_term)31@pytest.fixture32def set_timeout():33 def make_timeout(sec, callback):34 def _callback(signum, frame):35 signal.alarm(0)36 callback()37 signal.signal(signal.SIGALRM, _callback)38 signal.setitimer(signal.ITIMER_REAL, sec)39 yield make_timeout40@pytest.fixture41def exec_recorder():42 f = tempfile.NamedTemporaryFile(43 mode='w', encoding='utf8',44 prefix='aiotools.tests.server.',45 )46 f.close()47 def write(msg: str) -> None:48 path = f"{f.name}.{os.getpid()}"49 with open(path, 'a', encoding='utf8') as writer:50 writer.write(msg + '\n')51 def read() -> Sequence[str]:52 lines: List[str] = []53 for path in glob.glob(f"{f.name}.*"):54 with open(path, 'r', encoding='utf8') as reader:55 lines.extend(line.strip() for line in reader.readlines())56 return lines57 yield write, read58 for path in glob.glob(f"{f.name}.*"):59 os.unlink(path)60def interrupt():61 os.kill(0, signal.SIGINT)62def interrupt_usr1():63 os.kill(os.getpid(), signal.SIGUSR1)64@aiotools.server # type: ignore65async def myserver_simple(loop, proc_idx, args):66 write = args[0]67 await asyncio.sleep(0)68 write(f'started:{proc_idx}')69 yield70 await asyncio.sleep(0)71 write(f'terminated:{proc_idx}')72def test_server_singleproc(set_timeout, restore_signal, exec_recorder):73 write, read = exec_recorder74 set_timeout(0.2, interrupt)75 aiotools.start_server(76 myserver_simple,77 args=(write,),78 )79 lines = set(read())80 assert 'started:0' in lines81 assert 'terminated:0' in lines82def test_server_multiproc(set_timeout, restore_signal, exec_recorder):83 write, read = exec_recorder84 set_timeout(0.2, interrupt)85 aiotools.start_server(86 myserver_simple,87 num_workers=3,88 args=(write,),89 )90 lines = set(read())91 assert lines == {92 'started:0', 'started:1', 'started:2',93 'terminated:0', 'terminated:1', 'terminated:2',94 }95@aiotools.server # type: ignore96async def 
myserver_signal(loop, proc_idx, args):97 write = args[0]98 await asyncio.sleep(0)99 write(f'started:{proc_idx}')100 received_signum = yield101 await asyncio.sleep(0)102 write(f'terminated:{proc_idx}:{received_signum}')103def test_server_multiproc_custom_stop_signals(104 set_timeout,105 restore_signal,106 exec_recorder,107):108 write, read = exec_recorder109 set_timeout(0.2, interrupt_usr1)110 aiotools.start_server(111 myserver_signal,112 num_workers=2,113 stop_signals={signal.SIGUSR1},114 args=(write,),115 )116 lines = set(read())117 assert {'started:0', 'started:1'} < lines118 assert {119 f'terminated:0:{int(signal.SIGUSR1)}',120 f'terminated:1:{int(signal.SIGUSR1)}',121 } < lines122@aiotools.server # type: ignore123async def myserver_worker_init_error(loop, proc_idx, args):124 write = args[0]125 class _LogAdaptor:126 def __init__(self, writer):127 self.writer = writer128 def write(self, msg):129 msg = msg.strip().replace('\n', ' ')130 self.writer(f'log:{proc_idx}:{msg}')131 log_stream = _LogAdaptor(write)132 logging.config.dictConfig({133 'version': 1,134 'handlers': {135 'console': {136 'class': 'logging.StreamHandler',137 'stream': log_stream,138 'level': 'DEBUG',139 },140 },141 'loggers': {142 'aiotools': {143 'handlers': ['console'],144 'level': 'DEBUG',145 },146 },147 })148 log = logging.getLogger('aiotools')149 write(f'started:{proc_idx}')150 log.debug('hello')151 if proc_idx in (0, 2):152 # delay until other workers start normally.153 await asyncio.sleep(0.1 * proc_idx)154 raise ZeroDivisionError('oops')155 yield156 # should not be reached if errored.157 await asyncio.sleep(0)158 write(f'terminated:{proc_idx}')159def test_server_worker_init_error(restore_signal, exec_recorder):160 write, read = exec_recorder161 aiotools.start_server(162 myserver_worker_init_error,163 num_workers=4,164 args=(write,),165 )166 lines = set(read())167 assert sum(1 if line.startswith('started:') else 0 for line in lines) == 4168 # workers who did not raise errors have already 
started,169 # and they should have terminated normally170 # when the errorneous worker interrupted the main loop.171 assert sum(1 if line.startswith('terminated:') else 0 for line in lines) == 2172 assert sum(1 if 'hello' in line else 0 for line in lines) == 4173 assert sum(1 if 'ZeroDivisionError: oops' in line else 0 for line in lines) == 2174def test_server_user_main(set_timeout, restore_signal):175 main_enter = False176 main_exit = False177 @aiotools.main178 def mymain_user_main():179 nonlocal main_enter, main_exit180 main_enter = True181 yield 987182 main_exit = True183 @aiotools.server # type: ignore184 async def myworker_user_main(loop, proc_idx, args):185 assert args[0] == 987 # first arg from user main186 assert args[1] == 123 # second arg from start_server args187 yield188 set_timeout(0.2, interrupt)189 aiotools.start_server(190 myworker_user_main,191 mymain_user_main,192 num_workers=3,193 args=(123,),194 )195 assert main_enter196 assert main_exit197def test_server_user_main_custom_stop_signals(set_timeout, restore_signal):198 main_enter = False199 main_exit = False200 main_signal = None201 worker_signals = mp.Array('i', 3)202 @aiotools.main203 def mymain():204 nonlocal main_enter, main_exit, main_signal205 main_enter = True206 main_signal = yield207 main_exit = True208 @aiotools.server209 async def myworker(loop, proc_idx, args):210 worker_signals = args[0]211 worker_signals[proc_idx] = yield212 def noop(signum, frame):213 pass214 set_timeout(0.2, interrupt_usr1)215 aiotools.start_server(216 myworker,217 mymain,218 num_workers=3,219 stop_signals={signal.SIGUSR1},220 args=(worker_signals,),221 )222 assert main_enter223 assert main_exit224 assert main_signal == signal.SIGUSR1225 assert list(worker_signals) == [signal.SIGUSR1] * 3226def test_server_user_main_tuple(set_timeout, restore_signal):227 main_enter = False228 main_exit = False229 @aiotools.main230 def mymain():231 nonlocal main_enter, main_exit232 main_enter = True233 yield 987, 654234 main_exit = 
True235 @aiotools.server236 async def myworker(loop, proc_idx, args):237 assert args[0] == 987 # first arg from user main238 assert args[1] == 654 # second arg from user main239 assert args[2] == 123 # third arg from start_server args240 yield241 set_timeout(0.2, interrupt)242 aiotools.start_server(243 myworker,244 mymain,245 num_workers=3,246 args=(123,),247 )248 assert main_enter249 assert main_exit250def test_server_extra_proc(set_timeout, restore_signal):251 extras = mp.Array('i', [0, 0])252 def extra_proc(key, _, pidx, args):253 assert _ is None254 extras[key] = 980 + key255 try:256 while True:257 time.sleep(0.1)258 except KeyboardInterrupt:259 print(f'extra[{key}] interrupted', file=sys.stderr)260 except Exception as e:261 print(f'extra[{key}] exception', e, file=sys.stderr)262 finally:263 print(f'extra[{key}] finish', file=sys.stderr)264 extras[key] = 990 + key265 @aiotools.server266 async def myworker(loop, pidx, args):267 yield268 set_timeout(0.2, interrupt)269 aiotools.start_server(myworker, extra_procs=[270 functools.partial(extra_proc, 0),271 functools.partial(extra_proc, 1)],272 num_workers=3, args=(123, ))273 assert extras[0] == 990274 assert extras[1] == 991275def test_server_extra_proc_custom_stop_signal(set_timeout, restore_signal):276 received_signals = mp.Array('i', [0, 0])277 def extra_proc(key, _, pidx, args):278 received_signals = args[0]279 try:280 while True:281 time.sleep(0.1)282 except aiotools.InterruptedBySignal as e:283 received_signals[key] = e.args[0]284 @aiotools.server285 async def myworker(loop, pidx, args):286 yield287 set_timeout(0.3, interrupt_usr1)288 aiotools.start_server(myworker, extra_procs=[289 functools.partial(extra_proc, 0),290 functools.partial(extra_proc, 1)],291 stop_signals={signal.SIGUSR1},292 args=(received_signals, ),293 num_workers=3)294 assert received_signals[0] == signal.SIGUSR1...

Full Screen

Full Screen

test_lock.py

Source:test_lock.py Github

copy

Full Screen

...68 t.start()69 for t in threads:70 t.join()71 def test03_lock_timeout(self):72 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)73 self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)74 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)75 self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)76 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)77 self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)78 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)79 self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)80 def test04_lock_timeout2(self):81 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)82 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)83 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)84 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)85 def deadlock_detection() :86 while not deadlock_detection.end :87 deadlock_detection.count = \88 self.env.lock_detect(db.DB_LOCK_EXPIRE)89 if deadlock_detection.count :90 while not deadlock_detection.end :91 pass92 break93 time.sleep(0.01)94 deadlock_detection.end=False95 deadlock_detection.count=096 t=Thread(target=deadlock_detection)97 import sys98 if sys.version_info[0] < 3 :99 t.setDaemon(True)100 else :101 t.daemon = True102 t.start()103 self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)104 anID = self.env.lock_id()105 anID2 = self.env.lock_id()106 self.assertNotEqual(anID, anID2)107 lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)108 start_time=time.time()109 self.assertRaises(db.DBLockNotGrantedError,110 self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)111 end_time=time.time()112 deadlock_detection.end=True113 # Floating point rounding114 self.assertTrue((end_time-start_time) >= 0.0999)115 self.env.lock_put(lock)116 t.join()117 self.env.lock_id_free(anID)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful