Best Python code snippet using autotest_python Github


...23 raise NotImplementedError24class GILTests(test_rthread.AbstractGCTestClass):25 use_threads = True26 bigtest = False27 def test_one_thread(self, skew=+1):28 from rpython.rlib.debug import debug_print29 if self.bigtest:30 N = 10000031 skew *= 2500032 else:33 N = 10034 skew *= 2535 space = FakeSpace()36 class State:37 pass38 state = State()39 def runme(main=False):40 j = 041 for i in range(N + [-skew, skew][main]):42 state.datalen1 += 1 # try to crash if the GIL is not43 state.datalen2 += 1 # correctly acquired44, i))45 state.datalen3 += 146 state.datalen4 += 147 assert state.datalen1 == len( assert state.datalen2 == len( assert state.datalen3 == len( assert state.datalen4 == len( debug_print(main, i, state.datalen4)52 gil.do_yield_thread()53 assert i == j54 j += 155 def bootstrap():56 try:57 runme()58 except Exception, e:59 assert 060 thread.gc_thread_die()61 def f():62 = []63 state.datalen1 = 064 state.datalen2 = 065 state.datalen3 = 066 state.datalen4 = 067 state.threadlocals = gil.GILThreadLocals()68 state.threadlocals.setup_threads(space)69 thread.gc_thread_prepare()70 subident = thread.start_new_thread(bootstrap, ())71 mainident = thread.get_ident()72 runme(True)73 still_waiting = 300074 while len( < 2*N:75 debug_print(len( if not still_waiting:77 raise ValueError("time out")78 still_waiting -= 179 if not we_are_translated(): gil.before_external_call()80 time.sleep(0.01)81 if not we_are_translated(): gil.after_external_call()82 debug_print("leaving!")83 i1 = i2 = 084 for tid, i in if tid == mainident:86 assert i == i1; i1 += 187 elif tid == subident:88 assert i == i2; i2 += 189 else:90 assert 091 assert i1 == N + skew92 assert i2 == N - skew93 return len( fn = self.getcompiled(f, [])95 res = fn()96 assert res == 2*N97 def test_one_thread_rev(self):98 self.test_one_thread(skew=-1)99class TestRunDirectly(GILTests):100 def getcompiled(self, f, argtypes):101 return f102class TestUsingFramework(GILTests):103 gcpolicy = 'generation'...

