How to use the interrupted method in Slash

Best Python code snippet using slash

main_sm.py

Source:main_sm.py Github

copy

Full Screen

1#!/usr/bin/env python2import smach3from states import *4from assembly_kitting_states import *5import pick_and_place6from Sensors import Sensors_functions7import path_planning8import process_management9import nist_assembly10import smach_ros11def child_term_cb_track(outcome_map):12 if outcome_map['TRAY'] == 'done':13 return True14def out_cb_track(outcome_map):15 if outcome_map['TRAY'] == 'done':16 return 'trayon'17# gets called when ANY child state terminates18def child_term_cb_kitting(outcome_map):19 if outcome_map['ORDERS'] == 'highPriorityOrder':20 return True21 if outcome_map['KITTING']:22 return True23 # in all other case, just keep running, don't terminate anything24 return False25# gets called when ALL child states are terminated26def out_cb_kitting(outcome_map):27 if outcome_map['ORDERS'] == 'highPriorityOrder':28 return 'interrupted'29 elif outcome_map['KITTING'] == 'interrupted':30 return 'interrupted'31 else:32 return 'complete'33# gets called when ANY child state terminates34def child_term_cb_assembly(outcome_map):35 if outcome_map['ORDERS'] == 'highPriorityOrder':36 return True37 if outcome_map['ASSEMBLY']:38 return True39 # in all other case, just keep running, don't terminate anything40 return False41# gets called when ALL child states are terminated42def out_cb_assembly(outcome_map):43 if outcome_map['ORDERS'] == 'highPriorityOrder':44 return 'interrupted'45 elif outcome_map['ASSEMBLY'] == 'interrupted':46 return 'interrupted'47 else:48 return 'complete'49if __name__ == '__main__':50 rospy.init_node('main_node', anonymous=True)51 rm = pick_and_place.RobotMover()52 act = rm.inverse_kin53 sen = Sensors_functions()54 # sens = Sensors_subscribers()55 gp = path_planning.GantryPlanner()56 node = process_management.process_management()57 ass = nist_assembly.AssemblyCommander()58 asm = smach.StateMachine(outcomes=['finished', 'interrupted'], input_keys=['task', 'kittingtask'])59 with asm:60 smach.StateMachine.add('CHECKGRIPPER', CheckGripper(act),61 
transitions={'next': 'SENDGANTRY', 'changegripper': 'CHANGEGRIP',62 'preempted': 'interrupted'}, remapping={'gripper': 'gripper'})63 smach.StateMachine.add('CHANGEGRIP', GetGripper(gp, rm),64 transitions={'gripperon': 'SENDGANTRY', 'preempted': 'interrupted'},65 remapping={'gripper': 'gripper'})66 smach.StateMachine.add('SENDGANTRY', SendGantry(gp, node, ass, sen),67 transitions={'arrived': 'CHECKPART', 'preempted': 'interrupted'},68 remapping={'task': 'task'})69 smach.StateMachine.add('CHECKPART', CheckPart(node),70 transitions={'noParts': 'SUBMITASSEMBLY', 'newPart': 'FINDPART', 'skip': 'CHECKPART',71 'preempted': 'interrupted'}, remapping={'task': 'task', 'part': 'part'})72 smach.StateMachine.add('SUBMITASSEMBLY', SubmitAssemblyShipment(node),73 transitions={'success': 'finished', 'preempted': 'interrupted'},74 remapping={'task': 'task'})75 smach.StateMachine.add('FINDPART', FindPartOnTray(act, node, sen),76 transitions={'found': 'GANTRYMOVEPART', 'noFound': 'CHECKPART',77 'preempted': 'interrupted'},78 remapping={'kittingtask': 'kittingtask', 'partcurrentpose': 'partcurrentpose',79 'part': 'part'})80 smach.StateMachine.add('GANTRYMOVEPART', GantryMovePart(rm, sen, node, ass, gp, act),81 transitions={'moved': 'CHECKPART', 'preempted': 'interrupted'},82 remapping={'partcurrentpose': 'partcurrentpose', 'part': 'part', 'task': 'task'})83 ksm = smach.StateMachine(outcomes=['finished', 'interrupted'], input_keys=['kittingtask', 'faultybinpose'])84 with ksm:85 get_gripper_tray_sm = smach.StateMachine(outcomes=['done', 'stop'], input_keys=['kittingtask'],86 output_keys=['gripper'])87 with get_gripper_tray_sm:88 smach.StateMachine.add('CHECKTRAY', CheckMoveableTray(act, rm),89 transitions={'changegripper': 'GETGRIPPER', 'changetray': 'GETTRAY',90 'preempted': 'stop'},91 remapping={'task': 'kittingtask', 'gripper': 'gripper'})92 smach.StateMachine.add('GETGRIPPER', GetGripper(gp, rm),93 transitions={'gripperon': 'GETTRAY', 'preempted': 'stop'},94 remapping={'gripper': 
'gripper'})95 smach.StateMachine.add('GETTRAY', GantryGetTray(node, gp, rm, sen),96 transitions={'trayon': 'done', 'preempted': 'stop'},97 remapping={'task': 'kittingtask'})98 pick_from_track = smach.StateMachine(outcomes=['finish'], input_keys=['kittingtask'])99 with pick_from_track:100 smach.StateMachine.add('CHECKBELT', WaitConveyorBelt(),101 transitions={'ontrack': 'PICKFROMBELT', 'preempted': 'finish'})102 smach.StateMachine.add('PICKFROMBELT', PickFromConveyor(rm, act),103 transitions={'next': 'CHECKBELT', 'preempted': 'finish'},104 remapping={'task': 'kittingtask'})105 concurrent_tray_track_sm = smach.Concurrence(outcomes=['trayon'], default_outcome='trayon',106 input_keys=['kittingtask'], output_keys=['gripper'],107 child_termination_cb=child_term_cb_track, outcome_cb=out_cb_track)108 with concurrent_tray_track_sm:109 smach.Concurrence.add('TRACK', pick_from_track, remapping={'task': 'kittingtask'})110 smach.Concurrence.add('TRAY', get_gripper_tray_sm,111 remapping={'kittingtask': 'kittingtask', 'gripper': 'gripper'})112 smach.StateMachine.add('CHECKAGV', CheckAGV(node),113 transitions={'agvatks': 'TRACKANDTRAY', 'agvnotatks': 'MOVEAGVTOKS',114 'preempted': 'interrupted'}, remapping={'task': 'kittingtask'})115 smach.StateMachine.add('MOVEAGVTOKS', SendAGV(node),116 transitions={'agvatks': 'TRACKANDTRAY', 'preempted': 'interrupted'},117 remapping={'task': 'kittingtask'})118 # smach.StateMachine.add('CHECKTRAY', CheckMoveableTray(act, rm), transitions={'changegripper':'GETGRIPPER', 'changetray':'GETTRAY', 'preempted':'interrupted'}, remapping={'task':'kittingtask', 'gripper':'gripper'})119 # smach.StateMachine.add('GETGRIPPER', GetGripper(gp, rm), transitions={'gripperon':'GETTRAY', 'preempted':'interrupted'}, remapping={'gripper':'gripper'})120 # smach.StateMachine.add('GETTRAY', GantryGetTray(node, gp, rm, sen), transitions={'trayon':'CHECKTOREMOVE', 'preempted':'interrupted'}, remapping={'task':'kittingtask',})121 smach.StateMachine.add('TRACKANDTRAY', 
concurrent_tray_track_sm, transitions={'trayon': 'CHECKTOREMOVE'},122 remapping={'kittingtask': 'kittingtask', 'gripper': 'gripper', 'task': 'task'})123 smach.StateMachine.add('CHECKTOREMOVE', CheckToRemove(node),124 transitions={'remove': 'REMOVE', 'continue': 'CHECKKITTINGPART',125 'preempted': 'interrupted'},126 remapping={'positiontoremove': 'positiontoremove'})127 smach.StateMachine.add('REMOVE', RemovePart(node, rm, sen),128 transitions={'removed': 'CHECKTOREMOVE', 'preempted': 'interrupted',129 'broken': 'SUBMITKITTING'},130 remapping={'positiontoremove': 'positiontoremove'})131 smach.StateMachine.add('CHECKKITTINGPART', CheckPart(node),132 transitions={'noParts': 'SUBMITKITTING', 'newPart': 'FINDPARTINENV',133 'skip': 'CHECKKITTINGPART', 'preempted': 'interrupted'},134 remapping={'task': 'kittingtask', 'part': 'part'})135 smach.StateMachine.add('FINDPARTINENV', FindPartInEnvironment(node, sen),136 transitions={'found': 'KITTINGPICKANDPLACE', 'none': 'CHECKKITTINGPART',137 'preempted': 'interrupted'},138 remapping={'task': 'kittingtask', 'part': 'part', 'partcurrentpose': 'partcurrentpose',139 'partpose': 'partpose'})140 smach.StateMachine.add('KITTINGPICKANDPLACE', KittingRobotPickAndPlace(node, rm, sen),141 transitions={'success': 'CHECKFAULTY', 'lost': 'FINDPARTINENV',142 'preempted': 'interrupted', 'broken': 'SUBMITKITTING'},143 remapping={'task': 'kittingtask', 'partpose': 'partpose',144 'partcurrentpose': 'partcurrentpose', 'part': 'part'})145 smach.StateMachine.add('CHECKFAULTY', CheckFaulty(rm),146 transitions={'faulty': 'FAULTYPICKANDPLACE', 'notfaulty': 'CHECKKITTINGPART',147 'preempted': 'interrupted'}, remapping={'part': 'part'})148 smach.StateMachine.add('FAULTYPICKANDPLACE', FaultyPickAndPlace(node, rm, sen),149 transitions={'success': 'FINDPARTINENV', 'lost': 'FINDPARTINENV',150 'preempted': 'interrupted', 'broken': 'SUBMITKITTING'},151 remapping={'task': 'kittingtask', 'partpose': 'faultybinpose',152 'partcurrentpose': 'partpose', 'part': 
'part'})153 smach.StateMachine.add('SUBMITKITTING', SubmitKittingShipment(node),154 transitions={'success': 'finished', 'preempted': 'interrupted'},155 remapping={'task': 'kittingtask'})156 hksm = smach.StateMachine(outcomes=['finished', 'interrupted'], input_keys=['kittingtask', 'faultybinpose'])157 with hksm:158 smach.StateMachine.add('CHECKAGV', CheckAGV(node),159 transitions={'agvatks': 'CHECKTRAY', 'agvnotatks': 'MOVEAGVTOKS',160 'preempted': 'interrupted'}, remapping={'task': 'kittingtask'})161 smach.StateMachine.add('MOVEAGVTOKS', SendAGV(node),162 transitions={'agvatks': 'CHECKTRAY', 'preempted': 'interrupted'},163 remapping={'task': 'kittingtask'})164 smach.StateMachine.add('CHECKTRAY', CheckMoveableTray(act, rm),165 transitions={'changegripper': 'GETGRIPPER', 'changetray': 'GETTRAY',166 'preempted': 'interrupted'},167 remapping={'task': 'kittingtask', 'gripper': 'gripper'})168 smach.StateMachine.add('GETGRIPPER', GetGripper(gp, rm),169 transitions={'gripperon': 'GETTRAY', 'preempted': 'interrupted'},170 remapping={'gripper': 'gripper'})171 smach.StateMachine.add('GETTRAY', GantryGetTray(node, gp, rm, sen),172 transitions={'trayon': 'CHECKTOREMOVE', 'preempted': 'interrupted'},173 remapping={'task': 'kittingtask'})174 # smach.StateMachine.add('TRACKANDTRAY', concurrent_tray_track_sm, transitions={'done':'CHECKKITTINGPART'}, remapping={'kittingtask':'kittingtask', 'gripper':'gripper', 'task':'task', 'traydone':'traydone'})175 smach.StateMachine.add('CHECKTOREMOVE', CheckToRemove(node),176 transitions={'remove': 'REMOVE', 'continue': 'HCHECKKITTINGPART',177 'preempted': 'interrupted'},178 remapping={'positiontoremove': 'positiontoremove'})179 smach.StateMachine.add('REMOVE', RemovePart(node, rm, sen),180 transitions={'removed': 'CHECKTOREMOVE', 'preempted': 'interrupted',181 'broken': 'SUBMITKITTING'},182 remapping={'positiontoremove': 'positiontoremove'})183 smach.StateMachine.add('HCHECKKITTINGPART', CheckPart(node),184 transitions={'noParts': 
'SUBMITKITTING', 'newPart': 'FINDPARTINENV',185 'skip': 'HCHECKKITTINGPART', 'preempted': 'interrupted'},186 remapping={'task': 'kittingtask', 'part': 'part'})187 smach.StateMachine.add('FINDPARTINENV', FindPartInEnvironment(node, sen),188 transitions={'found': 'KITTINGPICKANDPLACE', 'none': 'HCHECKKITTINGPART',189 'preempted': 'interrupted'},190 remapping={'task': 'kittingtask', 'part': 'part', 'partcurrentpose': 'partcurrentpose',191 'partpose': 'partpose'})192 smach.StateMachine.add('KITTINGPICKANDPLACE', KittingRobotPickAndPlace(node, rm, sen),193 transitions={'success': 'CHECKFAULTY', 'lost': 'FINDPARTINENV',194 'preempted': 'interrupted', 'broken': 'SUBMITKITTING'},195 remapping={'task': 'kittingtask', 'partpose': 'partpose',196 'partcurrentpose': 'partcurrentpose', 'part': 'part'})197 smach.StateMachine.add('CHECKFAULTY', CheckFaulty(rm),198 transitions={'faulty': 'FAULTYPICKANDPLACE', 'notfaulty': 'HCHECKKITTINGPART',199 'preempted': 'interrupted'}, remapping={'part': 'part'})200 smach.StateMachine.add('FAULTYPICKANDPLACE', FaultyPickAndPlace(node, rm, sen),201 transitions={'success': 'FINDPARTINENV', 'lost': 'FINDPARTINENV',202 'preempted': 'interrupted', 'broken': 'SUBMITKITTING'},203 remapping={'task': 'kittingtask', 'partpose': 'faultybinpose',204 'partcurrentpose': 'partpose', 'part': 'part'})205 smach.StateMachine.add('SUBMITKITTING', SubmitKittingShipment(node),206 transitions={'success': 'finished', 'preempted': 'interrupted'},207 remapping={'task': 'kittingtask'})208 simple_kitting = smach.StateMachine(outcomes=['finished', 'interrupted'],209 input_keys=['kittingtask', 'faultybinpose'])210 with simple_kitting:211 # smach.StateMachine.add('TRACKANDTRAY', concurrent_tray_track_sm, transitions={'done':'CHECKKITTINGPART'}, remapping={'kittingtask':'kittingtask', 'gripper':'gripper', 'task':'task', 'traydone':'traydone'})212 smach.StateMachine.add('CHECKTOREMOVE', CheckToRemove(node),213 transitions={'remove': 'REMOVE', 'continue': 
'CHECKKITTINGPART',214 'preempted': 'interrupted'},215 remapping={'positiontoremove': 'positiontoremove'})216 smach.StateMachine.add('REMOVE', RemovePart(node, rm, sen),217 transitions={'removed': 'CHECKTOREMOVE', 'preempted': 'interrupted',218 'broken': 'SUBMITKITTING'},219 remapping={'positiontoremove': 'positiontoremove'})220 smach.StateMachine.add('CHECKKITTINGPART', CheckPart(node),221 transitions={'noParts': 'SUBMITKITTING', 'newPart': 'FINDPARTINENV',222 'skip': 'CHECKKITTINGPART', 'preempted': 'interrupted'},223 remapping={'task': 'kittingtask', 'part': 'part'})224 smach.StateMachine.add('FINDPARTINENV', FindPartInEnvironment(node, sen),225 transitions={'found': 'KITTINGPICKANDPLACE', 'none': 'CHECKKITTINGPART',226 'preempted': 'interrupted'},227 remapping={'task': 'kittingtask', 'part': 'part', 'partcurrentpose': 'partcurrentpose',228 'partpose': 'partpose'})229 smach.StateMachine.add('KITTINGPICKANDPLACE', KittingRobotPickAndPlace(node, rm, sen),230 transitions={'success': 'CHECKFAULTY', 'lost': 'FINDPARTINENV',231 'preempted': 'interrupted', 'broken': 'SUBMITKITTING'},232 remapping={'task': 'kittingtask', 'partpose': 'partpose',233 'partcurrentpose': 'partcurrentpose', 'part': 'part'})234 smach.StateMachine.add('CHECKFAULTY', CheckFaulty(rm),235 transitions={'faulty': 'FAULTYPICKANDPLACE', 'notfaulty': 'CHECKKITTINGPART',236 'preempted': 'interrupted'}, remapping={'part': 'part'})237 smach.StateMachine.add('FAULTYPICKANDPLACE', FaultyPickAndPlace(node, rm, sen),238 transitions={'success': 'FINDPARTINENV', 'lost': 'FINDPARTINENV',239 'preempted': 'interrupted', 'broken': 'SUBMITKITTING'},240 remapping={'task': 'kittingtask', 'partpose': 'faultybinpose',241 'partcurrentpose': 'partpose', 'part': 'part'})242 smach.StateMachine.add('SUBMITKITTING', SubmitKittingShipment(node),243 transitions={'success': 'finished', 'preempted': 'interrupted'},244 remapping={'task': 'kittingtask'})245 sm = smach.StateMachine(outcomes=['end'], input_keys=['faultybinpose'])246 
with sm:247 smach.StateMachine.add('START', StartCompetition(node), transitions={'success': 'CHECKORDERS'},248 remapping={'interrupted': 'interrupted'})249 smach.StateMachine.add('CHECKORDERS', CheckOrders(node),250 transitions={'complete': 'ENDCOMP', 'nextOrder': 'CHECKTASKS',251 'highPriorityOrder': 'HPCHECKTASKS'}, remapping={'nextOrder': 'order'})252 smach.StateMachine.add('CHECKTASKS', CheckTasks(node),253 transitions={'kitting': 'CKITTING', 'assembly': 'CASSEMBLY', 'complete': 'CHECKORDERS',254 'kittingnotray': 'SIMPLEKITTING'},255 remapping={'order': 'order', 'task': 'task'})256 smach.StateMachine.add('ENDCOMP', EndCompetition(node), transitions={'ended': 'end'})257 # waitorder = smach.StateMachine(outcomes=['highPriorityOrder'], output_keys=['nextOrder'])258 # with waitorder:259 # smach.StateMachine.add('WAITFORORDER', CheckOrders(node), transitions={'complete':'WAITFORORDER', 'highPriorityOrder':'highPriorityOrder', 'nextOrder':'WAITFORORDER'}, remapping={'nextOrder':'nextOrder'})260 concurrent_assembly = smach.Concurrence(outcomes=['interrupted', 'complete'], default_outcome='complete',261 input_keys=['task', 'kittingtask', 'interrupted'],262 output_keys=['nextOrder'], child_termination_cb=child_term_cb_assembly,263 outcome_cb=out_cb_assembly)264 with concurrent_assembly:265 smach.Concurrence.add('ASSEMBLY', asm, remapping={'task': 'task', 'kittingtask': 'kittingtask'})266 smach.Concurrence.add('ORDERS', WaitOrder(node), remapping={'nextOrder': 'nextOrder'})267 smach.StateMachine.add('CASSEMBLY', concurrent_assembly,268 transitions={'complete': 'CHECKORDERS', 'interrupted': 'STOREASSEMBLYTASK'},269 remapping={'task': 'task', 'kittingtask': 'kittingtask', 'nextOrder': 'nextOrder',270 'interrupted': 'interrupted'})271 smach.StateMachine.add('STOREASSEMBLYTASK', StoreTask(node), transitions={'stored': 'HPCHECKTASKS'},272 remapping={'task': 'task'})273 # concurrent_kitting = smach.Concurrence(outcomes=['interrupted', 'complete'], default_outcome='complete', 
input_keys=['kittingtask', 'interrupted', 'faultybinpose'], output_keys=['nextOrder'], outcome_map={'interrupted':{'ORDERS':'highPriorityOrder'}, 'complete':{'KITTING':'finished'}})274 concurrent_kitting = smach.Concurrence(outcomes=['interrupted', 'complete'], default_outcome='complete',275 input_keys=['kittingtask', 'interrupted', 'faultybinpose'],276 output_keys=['nextOrder'], child_termination_cb=child_term_cb_kitting,277 outcome_cb=out_cb_kitting)278 with concurrent_kitting:279 smach.Concurrence.add('KITTING', ksm,280 remapping={'kittingtask': 'kittingtask', 'faultybinpose': 'faultybinpose'})281 smach.Concurrence.add('ORDERS', WaitOrder(node), remapping={'nextOrder': 'nextOrder'})282 smach.StateMachine.add('SIMPLEKITTING', simple_kitting,283 transitions={'finished': 'CHECKTASKS', 'interrupted': 'CHECKTASKS'})284 smach.StateMachine.add('CKITTING', concurrent_kitting,285 transitions={'complete': 'CHECKTASKS', 'interrupted': 'STOREKITTINGTASK'},286 remapping={'kittigtask': 'kittingtask', 'nextOrder': 'nextOrder',287 'interrupted': 'interrupted', 'faultybinpose': 'faultybinpose'})288 smach.StateMachine.add('STOREKITTINGTASK', StoreTask(node), transitions={'stored': 'HPCHECKTASKS'},289 remapping={'task': 'kittingtask'})290 smach.StateMachine.add('HPCHECKTASKS', HPCheckTasks(node),291 transitions={'kitting': 'HKITTING', 'assembly': 'HASSEMBLY',292 'complete': 'CONTINUEINTERRUPTED'},293 remapping={'order': 'nextOrder', 'HPtask': 'HPtask',294 'HPkittingtask': 'HPkittingtask'}) # define continue interrupted, save interrupted order295 smach.StateMachine.add('HASSEMBLY', asm, transitions={'finished': 'CONTINUEINTERRUPTED', 'interrupted': 'end'},296 remapping={'task': 'HPtask', 'kittingtask': 'HPkittingtask'})297 smach.StateMachine.add('HKITTING', hksm, transitions={'finished': 'HPCHECKTASKS', 'interrupted': 'end'},298 remapping={'kittingtask': 'HPkittingtask', 'faultybinpose': 'faultybinpose'})299 smach.StateMachine.add('CONTINUEINTERRUPTED', ContinueInterrupted(), 
transitions={'continue': 'CHECKORDERS'},300 remapping={'interrupted': 'interrupted'})301 faultybin = Pose()302 faultybin.position.x = -2.186829303 faultybin.position.y = 0.1304 faultybin.position.z = 0.8305 sm.userdata.faultybinpose = faultybin306 sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')307 sis.start()...

Full Screen

Full Screen

test_deinterrupt.py

Source:test_deinterrupt.py Github

copy

Full Screen

1# Copyright (C) 2014 Stefan C. Mueller2import unittest3from pydron.translation import utils4from pydron.translation.deinterrupt import DeInterrupt5class TestDeInterrupt(unittest.TestCase):6 def test_add_return(self):7 src = """8 def test():9 pass10 """11 expected = """12 def test():13 pass14 return None15 """16 utils.compare(src, expected, DeInterrupt)17 def test_return(self):18 src = """19 def test():20 return "Hello"21 """22 expected = """23 def test():24 returnvalue__U0 = 'Hello'25 interrupted__U0 = '3_return'26 return returnvalue__U027 """28 utils.compare(src, expected, DeInterrupt)29 30 def test_return_inside_if(self):31 src = """32 def test():33 if True:34 return "Hello"35 """36 expected = """37 def test():38 returnvalue__U0 = None39 interrupted__U0 = "0_none"40 if True:41 returnvalue__U0 = 'Hello'42 interrupted__U0 = "3_return"43 return returnvalue__U044 """45 utils.compare(src, expected, DeInterrupt)46 def test_return_skip(self):47 src = """48 def test():49 if True:50 return "Hello"51 print "not always executed"52 """53 expected = """54 def test():55 returnvalue__U0 = None56 interrupted__U0 = "0_none"57 if True:58 returnvalue__U0 = 'Hello'59 interrupted__U0 = '3_return'60 if interrupted__U0 == "0_none":61 print "not always executed"62 return returnvalue__U063 """64 utils.compare(src, expected, DeInterrupt)65 66 def test_return_skip_nested(self):67 src = """68 def test():69 if a:70 if b:71 return "Hello"72 print "a"73 print "b"74 """75 expected = """76 def test():77 returnvalue__U0 = None78 interrupted__U0 = "0_none"79 if a:80 interrupted__U0 = "0_none"81 if b:82 returnvalue__U0 = 'Hello'83 interrupted__U0 = '3_return'84 if interrupted__U0 == "0_none":85 print "a"86 if interrupted__U0 == "0_none":87 print "b"88 return returnvalue__U089 """90 utils.compare(src, expected, DeInterrupt)91 92 def test_for(self):93 src = """94 def test():95 for a in b:96 pass97 """98 expected = """99 def test():100 for a in b:101 pass102 return None103 """104 utils.compare(src, 
expected, DeInterrupt)105 106 def test_for_continue(self):107 src = """108 def test():109 for a in b:110 continue111 """112 expected = """113 def test():114 for a in b:115 interrupted__U0 = "1_continue"116 interrupted__U0 = "0_none"117 return None118 """119 utils.compare(src, expected, DeInterrupt)120 121 def test_for_continue_jump(self):122 src = """123 def test():124 for a in b:125 continue126 print "hi"127 """128 expected = """129 def test():130 for a in b:131 interrupted__U0 = "1_continue"132 interrupted__U0 = "0_none"133 return None134 """135 utils.compare(src, expected, DeInterrupt)136 137 def test_for_continue_if_jump(self):138 src = """139 def test():140 for a in b:141 if a:142 continue143 print "hi"144 """145 expected = """146 def test():147 for a in b:148 interrupted__U0 = "0_none"149 if a:150 interrupted__U0 = "1_continue"151 if interrupted__U0 == "0_none":152 print "hi"153 if interrupted__U0 == "1_continue":154 interrupted__U0 = "0_none"155 return None156 """157 utils.compare(src, expected, DeInterrupt)158 159 def test_for_break(self):160 src = """161 def test():162 for a in b:163 break164 print "hi"165 """166 expected = """167 def test():168 interrupted__U0 = "0_none"169 for a in b:170 interrupted__U0 = "2_break"171 if ((interrupted__U0 == '2_break') or (interrupted__U0 == '3_return')):172 break173 if interrupted__U0 == "2_break":174 interrupted__U0 = "0_none"175 return None176 """177 utils.compare(src, expected, DeInterrupt)178 179 def test_for_break_continue(self):180 src = """181 def test():182 for a in b:183 if a:184 break185 if b:186 continue187 print "hi"188 """189 expected = """190 def test():191 interrupted__U0 = "0_none"192 for a in b:193 interrupted__U0 = "0_none"194 if a:195 interrupted__U0 = "2_break"196 if interrupted__U0 == "0_none":197 interrupted__U0 = "0_none"198 if b:199 interrupted__U0 = "1_continue"200 if interrupted__U0 == "0_none":201 print "hi"202 if interrupted__U0 == "1_continue":203 interrupted__U0 = "0_none"204 if 
((interrupted__U0 == '2_break') or (interrupted__U0 == '3_return')):205 break206 if interrupted__U0 == "2_break":207 interrupted__U0 = "0_none"208 return None209 """210 utils.compare(src, expected, DeInterrupt)211 212 def test_for_break_if(self):213 src = """214 def test():215 for a in b:216 if a:217 break218 print "hi"219 """220 expected = """221 def test():222 interrupted__U0 = "0_none"223 for a in b:224 interrupted__U0 = "0_none"225 if a:226 interrupted__U0 = "2_break"227 if interrupted__U0 == "0_none":228 print "hi"229 if ((interrupted__U0 == '2_break') or (interrupted__U0 == '3_return')):230 break231 if interrupted__U0 == "2_break":232 interrupted__U0 = "0_none"233 return None234 """235 utils.compare(src, expected, DeInterrupt)236 237 def test_for_break_else(self):238 src = """239 def test():240 for a in b:241 break242 else:243 print "hi"244 """245 expected = """246 def test():247 interrupted__U0 = "0_none"248 for a in b:249 interrupted__U0 = "2_break"250 if ((interrupted__U0 == '2_break') or (interrupted__U0 == '3_return')):251 break252 if interrupted__U0 == "0_none":253 print "hi"254 elif interrupted__U0 == "2_break":255 interrupted__U0 = "0_none"256 return None257 """258 utils.compare(src, expected, DeInterrupt)259 260 def test_for_else(self):261 src = """262 def test():263 for a in b:264 print "hi"265 else:266 print "HI"267 """268 expected = """269 def test():270 for a in b:271 print "hi"272 print "HI"273 return None274 """275 utils.compare(src, expected, DeInterrupt)276 def test_for_return(self):277 src = """278 def test():279 for a in b:280 return281 print "hi"282 """283 expected = """284 def test():285 returnvalue__U0 = None286 interrupted__U0 = "0_none"287 for a in b:288 returnvalue__U0 = None289 interrupted__U0 = "3_return"290 if ((interrupted__U0 == '2_break') or (interrupted__U0 == '3_return')):291 break292 if interrupted__U0 == "0_none":293 print "hi"294 return returnvalue__U0295 """296 utils.compare(src, expected, DeInterrupt)297 def 
test_for_return_else(self):298 src = """299 def test():300 for a in b:301 return302 else: 303 print "hi"304 """305 expected = """306 def test():307 returnvalue__U0 = None308 interrupted__U0 = "0_none"309 for a in b:310 returnvalue__U0 = None311 interrupted__U0 = "3_return"312 if ((interrupted__U0 == '2_break') or (interrupted__U0 == '3_return')):313 break314 if interrupted__U0 == "0_none":315 print "hi"316 return returnvalue__U0317 """318 utils.compare(src, expected, DeInterrupt)319 320 def test_while(self):321 src = """322 def test():323 while a:324 print "hi"325 """326 expected = """327 def test():328 while a:329 print "hi"330 return None331 """332 utils.compare(src, expected, DeInterrupt)333 334 def test_while_continue(self):335 src = """336 def test():337 while a:338 if b:339 continue340 print "hi"341 """342 expected = """343 def test():344 while a:345 interrupted__U0 = "0_none"346 if b:347 interrupted__U0 = "1_continue"348 if interrupted__U0 == "0_none":349 print "hi"350 if interrupted__U0 == "1_continue":351 interrupted__U0 = "0_none"352 return None353 """354 utils.compare(src, expected, DeInterrupt)355 def test_while_break(self):356 src = """357 def test():358 while a:359 if b:360 break361 print "hi"362 """363 expected = """364 def test():365 interrupted__U0 = "0_none"366 while interrupted__U0 == "0_none" and a:367 interrupted__U0 = "0_none"368 if b:369 interrupted__U0 = "2_break"370 if interrupted__U0 == "0_none":371 print "hi"372 if interrupted__U0 == "2_break":373 interrupted__U0 = "0_none"374 return None375 """376 utils.compare(src, expected, DeInterrupt)377 378 def test_With(self):379 src = """380 def test():381 with a:382 return x383 print "hi"384 """385 expected = """386 def test():387 with a:388 returnvalue__U0 = x389 interrupted__U0 = "3_return"390 return returnvalue__U0391 """392 utils.compare(src, expected, DeInterrupt)393 def test_tryexcept(self):394 src = """395 def test():396 try:397 return "hi"398 except:399 print "hello"400 print "world"401 
"""402 expected = """403 def test():404 returnvalue__U0 = None405 interrupted__U0 = "0_none"406 try:407 returnvalue__U0 = "hi"408 interrupted__U0 = "3_return"409 except:410 print "hello"411 if interrupted__U0 == "0_none":412 print "world"413 return returnvalue__U0414 """415 utils.compare(src, expected, DeInterrupt)416 417 def test_tryexcept_inexcept(self):418 src = """419 def test():420 try:421 print "hi"422 except:423 return "hello"424 print "world"425 """426 expected = """427 def test():428 returnvalue__U0 = None429 interrupted__U0 = "0_none"430 try:431 print "hi"432 except:433 returnvalue__U0 = "hello"434 interrupted__U0 = "3_return"435 if interrupted__U0 == "0_none":436 print "world"437 return returnvalue__U0438 """439 utils.compare(src, expected, DeInterrupt)440 def test_tryfinally(self):441 src = """442 def test():443 try:444 return "hi"445 finally:446 return "hello"447 print "world"448 """449 expected = """450 def test():451 returnvalue__U0 = None452 interrupted__U0 = "0_none"453 try:454 returnvalue__U0 = "hi"455 interrupted__U0 = "3_return"456 finally:457 interrupted__U1 = interrupted__U0458 returnvalue__U0 = "hello"459 interrupted__U0 = "3_return"460 interrupted__U0 = __pydron_max__(interrupted__U0, interrupted__U1)461 if interrupted__U0 == "0_none":462 print "world"463 return returnvalue__U0464 """...

Full Screen

Full Screen

Interrupt.py

Source:Interrupt.py Github

copy

Full Screen

1# Copyright (c) Meta Platforms, Inc. and affiliates.2#3# This source code is licensed under the MIT license found in the4# LICENSE file in the root directory of this source tree.5interrupted = False6def wasInterrupted() -> bool:7 global interrupted8 return interrupted9def setInterrupted() -> None:10 global interrupted11 interrupted = True12def checkInterrupt() -> None:13 """14 If an interrupt was detected, raise it now.15 We use this to defer interrupt processing until we're16 in the right place to handle it.17 """18 if wasInterrupted():...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful