How to use timed_out method in localstack

Best Python code snippet using localstack_python

policy_executor_physical.py

Source:policy_executor_physical.py Github

copy

Full Screen

#!/usr/bin/env python
# Policy executor for the physical onion-sorting setup.
#
# Each public function below builds one small SMACH state machine out of the
# states provided by ``pnp_physical_vision`` (imported as ``ppv``), runs it and
# returns its outcome string.  ``main`` loads a policy table (state -> action
# index) and keeps executing the action chosen for ``ppv.current_state`` until
# a machine reports 'SORT COMPLETE' or ROS shuts down.
from __future__ import print_function

import pnp_physical_vision as ppv
import rospkg

# Index of the ClaimNewOnion action in the action table built by main().
_CLAIM_ACTION = 4


def _remap(*keys):
    """Return a remapping dict sending each key to its ``sm_``-prefixed slot.

    A fresh dict is built on every call so no two states share one object.
    """
    return {key: 'sm_' + key for key in keys}


def _run_machine(outcomes, states):
    """Build a state machine, execute it and return its outcome.

    Args:
        outcomes: list of outcome labels the machine may terminate with.
        states: iterable of ``(label, state_class, transitions, remapping)``
            tuples, added to the machine in order.  ``state_class`` is
            instantiated with no arguments when it is added.

    Returns:
        The outcome returned by ``execute()``, or ``None`` when a ROS
        interrupt or a keyboard interrupt is raised while building or
        running the machine.
    """
    machine = ppv.StateMachine(outcomes=outcomes)
    # Every machine starts from the same empty userdata.
    machine.userdata.sm_x = []
    machine.userdata.sm_y = []
    machine.userdata.sm_z = []
    machine.userdata.sm_color = []
    machine.userdata.sm_counter = 0
    try:
        # Open the container and add the states to it.
        with machine:
            for label, state_class, transitions, remapping in states:
                ppv.StateMachine.add(label, state_class(),
                                     transitions=transitions,
                                     remapping=remapping)
        # Execute SMACH plan.
        # NOTE(review): the listing lost its indentation; execute() is placed
        # after the ``with`` block as in the usual SMACH pattern - confirm.
        return machine.execute()
    except (ppv.rospy.ROSInterruptException, KeyboardInterrupt):
        return None


def ClaimNewOnion():
    """Run the GETINFO -> CLAIM machine.

    Returns:
        'TIMED_OUT', 'SUCCEEDED' or 'SORT COMPLETE', or ``None`` on interrupt.
    """
    return _run_machine(
        ['TIMED_OUT', 'SUCCEEDED', 'SORT COMPLETE'],
        [
            ('GETINFO', ppv.Get_info_w_check,
             {'updated': 'CLAIM',
              'not_updated': 'GETINFO',
              'timed_out': 'TIMED_OUT',
              'completed': 'SORT COMPLETE'},
             _remap('x', 'y', 'z', 'color', 'counter')),
            ('CLAIM', ppv.Claim,
             {'updated': 'SUCCEEDED',
              'not_updated': 'CLAIM',
              'timed_out': 'TIMED_OUT',
              'not_found': 'GETINFO',
              'move': 'GETINFO',
              'completed': 'SORT COMPLETE'},
             _remap('x', 'y', 'z', 'color', 'counter')),
        ])


def Pick():
    """Run the APPROACH -> PICK -> GRASP -> LIFTUP machine.

    Returns:
        'TIMED_OUT', 'SUCCEEDED' or 'FAILED' (LIFTUP reported 'no_grasp'),
        or ``None`` on interrupt.
    """
    return _run_machine(
        ['TIMED_OUT', 'SUCCEEDED', 'FAILED'],
        [
            ('APPROACH', ppv.Approach,
             {'success': 'PICK',
              'failed': 'APPROACH',
              'timed_out': 'TIMED_OUT'},
             _remap('x', 'y', 'z', 'color', 'counter')),
            ('PICK', ppv.Dipdown,
             {'success': 'GRASP',
              'failed': 'PICK',
              'timed_out': 'TIMED_OUT'},
             _remap('x', 'y', 'z', 'color', 'counter')),
            ('GRASP', ppv.Grasp_object,
             {'success': 'LIFTUP',
              'failed': 'GRASP',
              'timed_out': 'TIMED_OUT'},
             _remap('x', 'y', 'z', 'color', 'counter')),
            ('LIFTUP', ppv.Liftup,
             {'success': 'SUCCEEDED',
              'failed': 'LIFTUP',
              'no_grasp': 'FAILED',
              'timed_out': 'TIMED_OUT'},
             _remap('counter')),
        ])


def InspectAfterPicking():
    """Run the single-state VIEW machine.

    Returns:
        'TIMED_OUT' or 'SUCCEEDED', or ``None`` on interrupt.
    """
    return _run_machine(
        ['TIMED_OUT', 'SUCCEEDED'],
        [
            ('VIEW', ppv.View,
             {'success': 'SUCCEEDED',
              'failed': 'VIEW',
              'timed_out': 'TIMED_OUT'},
             _remap('color', 'counter')),
        ])


def PlaceInBin():
    """Run the PLACEINBIN -> DETACH machine.

    Returns:
        'TIMED_OUT' or 'SUCCEEDED', or ``None`` on interrupt.
    """
    return _run_machine(
        ['TIMED_OUT', 'SUCCEEDED'],
        [
            ('PLACEINBIN', ppv.Placeinbin,
             {'success': 'DETACH',
              'failed': 'PLACEINBIN',
              'timed_out': 'TIMED_OUT'},
             _remap('color', 'counter')),
            ('DETACH', ppv.Detach_object,
             {'success': 'SUCCEEDED',
              'failed': 'DETACH',
              'timed_out': 'TIMED_OUT'},
             _remap('counter')),
        ])


def PlaceOnConveyor():
    """Run the PLACEONCONVEYOR -> DETACH machine.

    Returns:
        'TIMED_OUT' or 'SUCCEEDED', or ``None`` on interrupt.
    """
    return _run_machine(
        ['TIMED_OUT', 'SUCCEEDED'],
        [
            ('PLACEONCONVEYOR', ppv.Placeonconveyor,
             {'success': 'DETACH',
              'failed': 'PLACEONCONVEYOR',
              'timed_out': 'TIMED_OUT'},
             _remap('color', 'counter')),
            ('DETACH', ppv.Detach_object,
             {'success': 'SUCCEEDED',
              'failed': 'DETACH',
              'timed_out': 'TIMED_OUT'},
             _remap('counter')),
            # Disabled in the original script:
            # ('LIFTUP', ppv.Liftup,
            #  {'success': 'SUCCEEDED',
            #   'failed': 'LIFTUP',
            #   'timed_out': 'TIMED_OUT'},
            #  _remap('counter')),
        ])


def main():
    """Load the policy file and execute actions until sorting completes.

    The policy file name is taken from the first command-line argument and
    defaults to ``expert.csv``; it is read from the ``scripts`` directory of
    the ``sawyer_irl_project`` package.
    """
    rospack = rospkg.RosPack()  # RosPack instance with the default search paths
    path = rospack.get_path('sawyer_irl_project')  # package root directory
    # sys.argv always contains the script name, so "no argument given" means
    # fewer than two entries (the original ``< 1`` check could never be true
    # and argv[1] raised IndexError when no policy file was passed).
    if len(ppv.sys.argv) < 2:
        print("Default policy - expert chosen")
        policyfile = "expert.csv"
    else:
        policyfile = ppv.sys.argv[1]
        print("\n{} chosen".format(policyfile))

    policy = ppv.np.genfromtxt(path + '/scripts/' + policyfile, delimiter=' ')
    actList = {0: InspectAfterPicking, 1: PlaceOnConveyor, 2: PlaceInBin,
               3: Pick, _CLAIM_ACTION: ClaimNewOnion}
    # print "\nI'm in main now!"
    ppv.rospy.init_node('policy_exec_phys', anonymous=True, disable_signals=False)
    # Camera setup, disabled in the original script:
    # rgbtopic = '/kinect2/hd/image_color_rect'
    # depthtopic = '/kinect2/hd/image_depth_rect'
    # camerainfo = '/kinect2/hd/camera_info'
    # choice = 'real'
    # camera = ppv.Camera('kinectv2', rgbtopic, depthtopic, camerainfo, choice)
    # ppv.getCameraInstance(camera)
    # ppv.pnp.goto_home(tolerance=0.1, goal_tol=0.1, orientation_tol=0.1)
    outcome = actList[_CLAIM_ACTION]()
    while not ppv.rospy.is_shutdown() and outcome != 'SORT COMPLETE':
        print('\n OUTCOME: ', outcome)
        # ppv.rospy.sleep(10)
        try:
            print('\nCurrent state is: ', ppv.current_state)
            action = policy[ppv.current_state]
            print('\nExecuting action: ', action)
            # genfromtxt yields floats; make the table lookup an explicit int.
            outcome = actList[int(action)]()
            if outcome in ('TIMED_OUT', 'FAILED'):
                print("\nTimed out/Failed, so going back to claim again!")
                outcome = actList[_CLAIM_ACTION]()
        except ppv.rospy.ROSInterruptException:
            ppv.rospy.signal_shutdown("Shutting down node, ROS interrupt received!")
        except KeyboardInterrupt:
            ppv.rospy.signal_shutdown("Shutting down node, Keyboard interrupt received!")
    # ppv.rospy.spin()


if __name__ == '__main__':
    # print("\nCalling Main!")
    try:
        main()
    except ppv.rospy.ROSInterruptException:
        # NOTE(review): the source listing is truncated at this point ("...");
        # the original handler body is not visible, so nothing is done here.
        pass

Full Screen

Full Screen

test_entry_notifier.py

Source:test_entry_notifier.py Github

copy

Full Screen

from __future__ import print_function
'''----------------------------------------------------------------------------'''
''' Copyright (c) FIRST 2017. All Rights Reserved. '''
''' Open Source Software - may be modified and shared by FRC teams. The code '''
''' must be accompanied by the FIRST BSD license file in the root directory of '''
''' the project. '''
'''----------------------------------------------------------------------------'''
#
# These tests are adapted from ntcore's test suite
#
import pytest

from ntcore.constants import (
    NT_NOTIFY_IMMEDIATE,
    NT_NOTIFY_LOCAL,
    NT_NOTIFY_NEW,
    NT_NOTIFY_DELETE,
    NT_NOTIFY_UPDATE,
    NT_NOTIFY_FLAGS,
)
from ntcore.entry_notifier import EntryNotifier
from ntcore.value import Value


@pytest.fixture
def notifier():
    """Yield a started, verbose EntryNotifier and stop it after the test."""
    n = EntryNotifier(verbose=True)
    n.start()
    yield n
    n.stop()


def generateNotifications(notifier):
    """Send every flag combination below for each of three keys.

    The keys get consecutive indexes starting at 5, so "/baz" is entry 6.
    """
    # All flags combos that can be generated by Storage
    flag_combos = [
        # "normal" notifications
        NT_NOTIFY_NEW,
        NT_NOTIFY_DELETE,
        NT_NOTIFY_UPDATE,
        NT_NOTIFY_FLAGS,
        NT_NOTIFY_UPDATE | NT_NOTIFY_FLAGS,
        # immediate notifications are always "new"
        NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW,
        # local notifications can be of any flag combo
        NT_NOTIFY_LOCAL | NT_NOTIFY_NEW,
        NT_NOTIFY_LOCAL | NT_NOTIFY_DELETE,
        NT_NOTIFY_LOCAL | NT_NOTIFY_UPDATE,
        NT_NOTIFY_LOCAL | NT_NOTIFY_FLAGS,
        NT_NOTIFY_LOCAL | NT_NOTIFY_UPDATE | NT_NOTIFY_FLAGS,
    ]

    # Generate across keys, with a unique key index for each key
    val = Value.makeDouble(1)
    for keyindex, key in enumerate(["/foo/bar", "/baz", "/boo"], 5):
        for flag in flag_combos:
            notifier.notifyEntry(keyindex, key, val, flag)


def _generate_and_wait(notifier):
    # Fire the notifications and require the queue to drain within a second.
    generateNotifications(notifier)
    assert notifier.waitForQueue(1.0)


def _poll_ok(notifier, poller):
    # Poll without waiting; the poll must not report a timeout.
    results, timed_out = notifier.poll(poller, 0)
    assert not timed_out
    return results


def _match_listener(handle, result, expected, counts):
    # Bump the counter of the first listener whose handle matches and check
    # that the result carries that listener's flag.
    for slot, (listener, flag) in enumerate(expected):
        if handle == listener:
            counts[slot] += 1
            assert (result.flags & flag) != 0
            return
    assert False, "unknown listener index"


def _assert_counts(counts):
    # Expected hits for the NEW, DELETE, UPDATE and FLAGS listeners.
    new_hits, delete_hits, update_hits, flags_hits = counts
    assert new_hits == 2
    assert delete_hits == 1  # NT_NOTIFY_DELETE
    assert update_hits == 2
    assert flags_hits == 2


def test_PollEntryMultiple(notifier):
    pollers = [notifier.createPoller() for _ in range(3)]
    masks = (NT_NOTIFY_NEW, NT_NOTIFY_NEW, NT_NOTIFY_UPDATE)
    handles = [notifier.addPolledById(poller, 6, mask)
               for poller, mask in zip(pollers, masks)]
    assert not notifier.m_local_notifiers
    _generate_and_wait(notifier)

    polled = [_poll_ok(notifier, poller) for poller in pollers]
    for results, expected_handle in zip(polled, handles):
        assert len(results) == 2
        for h, result in results:
            print(result)
            assert h == expected_handle


def test_PollEntryBasic(notifier):
    poller = notifier.createPoller()
    masks = (NT_NOTIFY_NEW, NT_NOTIFY_DELETE, NT_NOTIFY_UPDATE, NT_NOTIFY_FLAGS)
    expected = [(notifier.addPolledById(poller, 6, mask), mask) for mask in masks]
    assert not notifier.m_local_notifiers
    _generate_and_wait(notifier)

    results = _poll_ok(notifier, poller)
    counts = [0, 0, 0, 0]
    for h, result in results:
        print(h, result)
        assert result.name == "/baz"
        assert result.value == Value.makeDouble(1)
        assert result.local_id == 6
        _match_listener(h, result, expected, counts)

    _assert_counts(counts)


def test_PollEntryImmediate(notifier):
    poller = notifier.createPoller()
    for mask in (NT_NOTIFY_NEW | NT_NOTIFY_IMMEDIATE, NT_NOTIFY_NEW):
        notifier.addPolledById(poller, 6, mask)
    assert not notifier.m_local_notifiers
    _generate_and_wait(notifier)

    results = _poll_ok(notifier, poller)
    print(results)
    assert len(results) == 4


def test_PollEntryLocal(notifier):
    poller = notifier.createPoller()
    for mask in (NT_NOTIFY_NEW | NT_NOTIFY_LOCAL, NT_NOTIFY_NEW):
        notifier.addPolledById(poller, 6, mask)
    assert notifier.m_local_notifiers
    _generate_and_wait(notifier)

    results = _poll_ok(notifier, poller)
    print(results)
    assert len(results) == 6


def test_PollPrefixMultiple(notifier):
    pollers = [notifier.createPoller() for _ in range(3)]
    masks = (NT_NOTIFY_NEW, NT_NOTIFY_NEW, NT_NOTIFY_UPDATE)
    handles = [notifier.addPolled(poller, "/foo", mask)
               for poller, mask in zip(pollers, masks)]
    assert not notifier.m_local_notifiers
    _generate_and_wait(notifier)

    polled = [_poll_ok(notifier, poller) for poller in pollers]
    for results, expected_handle in zip(polled, handles):
        assert len(results) == 2
        for h, result in results:
            print(result)
            assert h == expected_handle


def test_PollPrefixBasic(notifier):
    poller = notifier.createPoller()
    masks = (NT_NOTIFY_NEW, NT_NOTIFY_DELETE, NT_NOTIFY_UPDATE, NT_NOTIFY_FLAGS)
    expected = [(notifier.addPolled(poller, "/foo", mask), mask) for mask in masks]
    # Listeners on a prefix no key starts with; their handles are not kept.
    for mask in masks:
        notifier.addPolled(poller, "/bar", mask)
    assert not notifier.m_local_notifiers
    _generate_and_wait(notifier)

    results = _poll_ok(notifier, poller)
    counts = [0, 0, 0, 0]
    for h, result in results:
        print(result)
        assert result.name.startswith("/foo")
        assert result.value == Value.makeDouble(1)
        _match_listener(h, result, expected, counts)

    _assert_counts(counts)


def test_PollPrefixImmediate(notifier):
    poller = notifier.createPoller()
    for mask in (NT_NOTIFY_NEW | NT_NOTIFY_IMMEDIATE, NT_NOTIFY_NEW):
        notifier.addPolled(poller, "/foo", mask)
    assert not notifier.m_local_notifiers
    _generate_and_wait(notifier)

    results = _poll_ok(notifier, poller)
    print(results)
    assert len(results) == 4


def test_PollPrefixLocal(notifier):
    poller = notifier.createPoller()
    for mask in (NT_NOTIFY_NEW | NT_NOTIFY_LOCAL, NT_NOTIFY_NEW):
        notifier.addPolled(poller, "/foo", mask)
    assert notifier.m_local_notifiers
    _generate_and_wait(notifier)

    results = _poll_ok(notifier, poller)
    print(results)
    # ... (the source listing is truncated here)

Full Screen

Full Screen

nested-textmenus.py

Source:nested-textmenus.py Github

copy

Full Screen

from discord.ext import commands
from cogs.utils import textmenus

description = "Nested textmenus example"
bot = commands.Bot(command_prefix='?', description=description)


# Every menu below records the ``timed_out`` value handed to ``finalize`` so
# that callers can inspect it once the menu has closed.

# Character sheet: greeting with a stat summary, plus two submenus.
class CharacterSheetMenu(textmenus.Menu):
    timed_out = False

    def get_initial_message(self):
        sheet_lines = [
            "**Name:** Guin",
            "**Class:**",
            "**Stats:**",
            "**Feats:**",
            "**Professions:**",
        ]
        details = "\n".join(sheet_lines)
        return f"Welcome to your character sheet.\n{details}"

    def finalize(self, timed_out):
        self.timed_out = timed_out

    @textmenus.submenu('View your inventory')
    async def view_inventory(self, payload):
        await self.ctx.channel.send("Your inventory would be listed here.")

    @textmenus.submenu('View your spells')
    async def view_spells(self, payload):
        await self.ctx.channel.send("Your spells would be listed here.")


# Workshop: two crafting submenus.
class WorkshopMenu(textmenus.Menu):
    timed_out = False

    def get_initial_message(self):
        return "Welcome to the workshop."

    def finalize(self, timed_out):
        self.timed_out = timed_out

    @textmenus.submenu('Create a mundane item')
    async def craft_mundane(self, payload):
        await self.ctx.channel.send("Your mundane recipes would be listed here.")

    @textmenus.submenu('Create a consumable item')
    async def craft_consumable(self, payload):
        await self.ctx.channel.send("Your consumable recipes would be listed here.")


# Market: buy and sell submenus.
class MarketMenu(textmenus.Menu):
    timed_out = False

    def get_initial_message(self):
        return "Welcome to the market."

    def finalize(self, timed_out):
        self.timed_out = timed_out

    @textmenus.submenu('Buy items from the market')
    async def buy_item_menu(self, payload):
        await self.ctx.channel.send("This would list the menu for buying items from the market")

    @textmenus.submenu('Sell items on the market')
    async def sell_item_menu(self, payload):
        await self.ctx.channel.send("This would list the menu for selling items on the market")


class MainMenu(textmenus.Menu):
    """
    Definition of main menu object
    """
    timed_out = False

    def get_initial_message(self):
        return "Welcome to the main menu."

    def finalize(self, timed_out):
        self.timed_out = timed_out

    # Each submenu starts a fresh nested menu in the same context.
    @textmenus.submenu('Character Sheet')
    async def character_sheet(self, payload):
        await CharacterSheetMenu().start(self.ctx)

    @textmenus.submenu('Workshop')
    async def workshop(self, payload):
        await WorkshopMenu().start(self.ctx)

    @textmenus.submenu('Market')
    async def market(self, payload):
        await MarketMenu().start(self.ctx)


# Start the main menu once.
@bot.command()
async def menu_example(ctx):
    main_menu = MainMenu()
    await main_menu.start(ctx)


# Start the main menu, then report why it closed.
@bot.command()
async def looping_menu_example(ctx):
    def get_exit_reason(menu):
        # First matching reason wins; empty string when none applies.
        if isinstance(menu.exception, textmenus.ExitException):
            reason = "'exit' received."
        elif isinstance(menu.exception, textmenus.StopException):
            reason = "'stop' received."
        elif menu.timed_out:
            reason = "Menu timed out."
        else:
            reason = ""
        return reason

    main_menu = MainMenu()
    await main_menu.start(ctx)
    exit_reason = get_exit_reason(main_menu)
    await ctx.channel.send(f"Closing menu. {exit_reason}")

# bot.run('token')
import secrets
# ... (the source listing is truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful