How to use wait_for_request method in Playwright Python

Best Python code snippet using playwright-python

action.py

Source:action.py Github

copy

Full Screen

import json
import logging
import requests
from urllib.parse import urljoin
from tabulate import tabulate
from waiter import http_util, terminal
from waiter.format import format_last_request_time
from waiter.format import format_status
from waiter.querying import get_service, get_services_using_token
from waiter.querying import print_no_data, query_service, query_services, query_token
from waiter.util import is_service_current, str2bool, response_message, print_error, wait_until


def ping_on_cluster(cluster, timeout, wait_for_request, token_name, service_exists_fn):
    """Pings using the given token name (or ^SERVICE-ID#) in the given cluster.

    Returns the success flag of the ping itself. When the ping succeeded but
    the response carried no service status (or reported 'Inactive'),
    service_exists_fn is polled so the service status can be printed; the
    outcome of that polling does not change the returned value.
    """
    cluster_name = cluster['name']

    def perform_ping():
        """Sends the /waiter-ping request and returns a (succeeded, service status) pair."""
        status = None
        try:
            default_queue_timeout_millis = 300000
            # Without wait_for_request only a short, fixed 5 second timeout is used.
            timeout_seconds = timeout if wait_for_request else 5
            timeout_millis = timeout_seconds * 1000
            headers = {
                'X-Waiter-Queue-Timeout': str(max(default_queue_timeout_millis, timeout_millis)),
                'X-Waiter-Token': token_name,
                'X-Waiter-Timeout': str(timeout_millis)
            }
            read_timeout = timeout_seconds if wait_for_request else (timeout_seconds + 5)
            resp = http_util.get(cluster, '/waiter-ping', headers=headers, read_timeout=read_timeout)
            logging.debug(f'Response status code: {resp.status_code}')
            resp_json = resp.json()
            if resp.status_code == 200:
                status = resp_json.get('service-state', {}).get('status')
                ping_response = resp_json['ping-response']
                ping_response_result = ping_response['result']
                if ping_response_result in ['descriptor-error', 'received-response']:
                    ping_response_status = ping_response['status']
                    if ping_response_status == 200:
                        print(terminal.success('Ping successful.'))
                        result = True
                    else:
                        print_error(f'Ping responded with non-200 status {ping_response_status}.')
                        # Best effort: surface the waiter-error message when the body has one.
                        try:
                            ping_response_waiter_error = json.loads(ping_response['body'])['waiter-error']['message']
                            print_error(ping_response_waiter_error)
                        except json.JSONDecodeError:
                            logging.debug('Ping response is not in json format, cannot display waiter-error message.')
                        except KeyError:
                            logging.debug('Ping response body does not contain waiter-error message.')
                        result = False
                elif ping_response_result == 'timed-out':
                    if wait_for_request:
                        print_error('Ping request timed out.')
                        result = False
                    else:
                        logging.debug('ignoring ping request timeout due to --no-wait')
                        result = True
                else:
                    print_error(f'Encountered unknown ping result: {ping_response_result}.')
                    result = False
            else:
                print_error(response_message(resp_json))
                result = False
        except Exception:
            result = False
            message = f'Encountered error while pinging in {cluster_name}.'
            logging.exception(message)
            if wait_for_request:
                print_error(message)
        return result, status

    def check_service_status():
        """Polls service_exists_fn until it yields a service or the timeout elapses."""
        result = wait_until(service_exists_fn, timeout=timeout, interval=5)
        if result:
            print(f'Service is currently {format_status(result["status"])}.')
            return True
        else:
            print_error('Timeout while waiting for service to start.')
            return False

    succeeded, service_status = perform_ping()
    if succeeded:
        if service_status is None or service_status == 'Inactive':
            check_service_status()
        else:
            print(f'Service is currently {format_status(service_status)}.')
    return succeeded


def token_has_current_service(cluster, token_name, current_token_etag):
    """Returns the first "current" service that uses the given token.

    Returns False when no service using the token is current, and None (after
    printing an error) when the services could not be retrieved.
    """
    services = get_services_using_token(cluster, token_name)
    if services is not None:
        services = [s for s in services if is_service_current(s, current_token_etag, token_name)]
        return services[0] if len(services) > 0 else False
    else:
        cluster_name = cluster['name']
        print_error(f'Unable to retrieve services using token {token_name} in {cluster_name}.')
        return None


def ping_token_on_cluster(cluster, token_name, timeout, wait_for_request,
                          current_token_etag):
    """Pings the token with the given token name in the given cluster."""
    cluster_name = cluster['name']
    print(f'Pinging token {terminal.bold(token_name)} '
          f'in {terminal.bold(cluster_name)}...')
    return ping_on_cluster(cluster, timeout, wait_for_request,
                           token_name, lambda: token_has_current_service(cluster, token_name, current_token_etag))


def service_is_active(cluster, service_id):
    """If the given service id is active, returns the service, else None"""
    service = get_service(cluster, service_id)
    return service if (service and service.get('status') != 'Inactive') else None


def ping_service_on_cluster(cluster, service_id, timeout, wait_for_request):
    """Pings the service with the given service id in the given cluster."""
    cluster_name = cluster['name']
    print(f'Pinging service {terminal.bold(service_id)} '
          f'in {terminal.bold(cluster_name)}...')
    return ping_on_cluster(cluster, timeout, wait_for_request,
                           f'^SERVICE-ID#{service_id}', lambda: service_is_active(cluster, service_id))


def kill_service_on_cluster(cluster, service_id, timeout_seconds):
    """Kills the service with the given service id in the given cluster.

    Returns True only when the delete request answers 200 and the response
    reports 'routers-agree'. Returns False in every other case, including a
    read timeout or any other error raised while issuing the request.
    """
    cluster_name = cluster['name']
    http_util.set_retries(0)
    try:
        print(f'Killing service {terminal.bold(service_id)} in {terminal.bold(cluster_name)}...')
        params = {'timeout': timeout_seconds * 1000}
        resp = http_util.delete(cluster, f'/apps/{service_id}', params=params, read_timeout=timeout_seconds)
        logging.debug(f'Response status code: {resp.status_code}')
        if resp.status_code == 200:
            routers_agree = resp.json().get('routers-agree')
            if routers_agree:
                print(f'Successfully killed {service_id} in {cluster_name}.')
                return True
            else:
                print(f'Successfully killed {service_id} in {cluster_name}. '
                      f'Server-side timeout waiting for routers to update.')
                return False
        else:
            print_error(response_message(resp.json()))
            return False
    except requests.exceptions.ReadTimeout:
        message = f'Request timed out while killing {service_id} in {cluster_name}.'
        logging.exception(message)
        print_error(message)
    except Exception:
        message = f'Encountered error while killing {service_id} in {cluster_name}.'
        logging.exception(message)
        print_error(message)
    # Only the exception handlers reach this point. Return an explicit False so
    # callers that combine results with `and` get a boolean rather than None.
    return False


def process_kill_request(clusters, token_name_or_service_id, is_service_id, force_flag, timeout_secs,
                         no_service_result=False):
    """Kills the service(s) using the given token name or service-id.

    Returns no_service_result if no services can be found.
    Returns False if there was a failure in deleting any service.
    Returns True if all services that were attempted were deleted successfully.
    """
    if is_service_id:
        query_result = query_service(clusters, token_name_or_service_id)
        num_services = query_result['count']
        if num_services == 0:
            print_no_data(clusters)
            return no_service_result
    else:
        query_result = query_services(clusters, token_name_or_service_id)
        num_services = query_result['count']
        if num_services == 0:
            clusters_text = ' / '.join([terminal.bold(c['name']) for c in clusters])
            print(f'There are no services using token {terminal.bold(token_name_or_service_id)} in {clusters_text}.')
            return no_service_result
        elif num_services > 1:
            print(f'There are {num_services} services using token {terminal.bold(token_name_or_service_id)}:')

    cluster_data_pairs = sorted(query_result['clusters'].items())
    clusters_by_name = {c['name']: c for c in clusters}
    overall_success = True
    for cluster_name, data in cluster_data_pairs:
        if is_service_id:
            service = data['service']
            service['service-id'] = token_name_or_service_id
            services = [service]
        else:
            # Most recently requested services first; a missing time sorts as ''.
            services = sorted(data['services'], key=lambda s: s.get('last-request-time', None) or '', reverse=True)
        for service in services:
            service_id = service['service-id']
            cluster = clusters_by_name[cluster_name]
            status_string = service['status']
            status = format_status(status_string)
            inactive = status_string == 'Inactive'
            should_kill = False
            if force_flag or num_services == 1:
                should_kill = True
            else:
                # Several candidates and no force flag: show details and ask the user.
                url = urljoin(cluster['url'], f'apps/{service_id}')
                if is_service_id:
                    active_instances = service['instances']['active-instances']
                    healthy_count = sum(1 for i in active_instances if i['healthy?'])
                    unhealthy_count = sum(1 for i in active_instances if not i['healthy?'])
                else:
                    healthy_count = service['instance-counts']['healthy-instances']
                    unhealthy_count = service['instance-counts']['unhealthy-instances']
                last_request_time = format_last_request_time(service)
                run_as_user = service['service-description']['run-as-user']
                table = [['Status', status],
                         ['Healthy', healthy_count],
                         ['Unhealthy', unhealthy_count],
                         ['URL', url],
                         ['Run as user', run_as_user]]
                table_text = tabulate(table, tablefmt='plain')
                print(f'\n'
                      f'=== {terminal.bold(cluster_name)} / {terminal.bold(service_id)} ===\n'
                      f'\n'
                      f'Last Request: {last_request_time}\n'
                      f'\n'
                      f'{table_text}\n')
                if not inactive:
                    answer = input(f'Kill service in {terminal.bold(cluster_name)}? ')
                    should_kill = str2bool(answer)
            if inactive:
                print(f'Service {terminal.bold(service_id)} on {terminal.bold(cluster_name)} cannot be killed because '
                      f'it is already {status}.')
                should_kill = False
            if should_kill:
                success = kill_service_on_cluster(cluster, service_id, timeout_secs)
                overall_success = overall_success and success
    return overall_success


def token_explicitly_created_on_cluster(cluster, token_cluster_name):
    """Returns true if the given token cluster matches the configured cluster name of the given cluster"""
    cluster_settings, _ = http_util.make_data_request(cluster, lambda: http_util.get(cluster, '/settings'))
    cluster_config_name = cluster_settings['cluster-config']['name'].upper()
    created_on_this_cluster = token_cluster_name == cluster_config_name
    return created_on_this_cluster


def process_ping_request(clusters, token_name_or_service_id, is_service_id, timeout_secs, wait_for_request):
    """Pings the service using the given token name or service-id.

    Returns False if the ping request fails or times out.
    Returns True if the ping request completes successfully.
    """
    if is_service_id:
        query_result = query_service(clusters, token_name_or_service_id)
    else:
        query_result = query_token(clusters, token_name_or_service_id)
    if query_result['count'] == 0:
        print_no_data(clusters)
        return False
    http_util.set_retries(0)
    cluster_data_pairs = sorted(query_result['clusters'].items())
    clusters_by_name = {c['name']: c for c in clusters}
    overall_success = True
    for cluster_name, data in cluster_data_pairs:
        cluster = clusters_by_name[cluster_name]
        if is_service_id:
            success = ping_service_on_cluster(cluster, token_name_or_service_id, timeout_secs, wait_for_request)
        else:
            token_data = data['token']
            token_etag = data['etag']
            token_cluster_name = token_data['cluster'].upper()
            # With several clusters, only ping the token where it was created.
            if len(clusters) == 1 or token_explicitly_created_on_cluster(cluster, token_cluster_name):
                success = ping_token_on_cluster(cluster, token_name_or_service_id, timeout_secs,
                                                wait_for_request, token_etag)
            else:
                print(f'Not pinging token {terminal.bold(token_name_or_service_id)} '
                      f'in {terminal.bold(cluster_name)} '
                      f'because it was created in {terminal.bold(token_cluster_name)}.')
                success = True
        overall_success = overall_success and success
    # NOTE: the snippet is truncated at this point on the source page ("...");
    # the remainder of this function is not shown there.

Full Screen

Full Screen

test_django.py

Source:test_django.py Github

copy

Full Screen

...30 User.objects.all().delete()31def test_notify(bugsnag_server, django_client):32 response = django_client.get('/notes/handled-exception/?foo=strawberry')33 assert response.status_code == 20034 bugsnag_server.wait_for_request()35 payload = bugsnag_server.received[0]['json_body']36 event = payload['events'][0]37 exception = event['exceptions'][0]38 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'39 assert event['context'] == 'notes.views.handle_notify'40 assert event['severityReason'] == {'type': 'handledException'}41 assert event['device']['runtimeVersions']['django'] == django.__version__42 assert 'environment' not in payload['events'][0]['metaData']43 assert event['metaData']['request'] == {44 'method': 'GET',45 'url': 'http://testserver/notes/handled-exception/?foo=strawberry',46 'path': '/notes/handled-exception/',47 'POST': {},48 'encoding': None,49 'GET': {'foo': ['strawberry']}50 }51 assert event['user'] == {}52 assert exception['errorClass'] == 'KeyError'53 assert exception['message'] == "'nonexistent-item'"54 assert exception['stacktrace'][0] == {55 'code': {56 '39': 'def handle_notify(request):',57 '40': ' items = {}',58 '41': ' try:',59 '42': ' print("item: {}" % items["nonexistent-item"])',60 '43': ' except KeyError as e:',61 '44': " bugsnag.notify(e, unhappy='nonexistent-file')",62 '45': '',63 },64 'file': 'notes/views.py',65 'inProject': True,66 'lineNumber': 42,67 'method': 'handle_notify',68 }69def test_enable_environment(bugsnag_server, django_client):70 bugsnag.configure(send_environment=True)71 response = django_client.get('/notes/handled-exception/?foo=strawberry')72 assert response.status_code == 20073 bugsnag_server.wait_for_request()74 payload = bugsnag_server.received[0]['json_body']75 event = payload['events'][0]76 assert event['metaData']['environment']['REQUEST_METHOD'] == 'GET'77def test_notify_custom_info(bugsnag_server, django_client):78 django_client.get('/notes/handled-exception-custom/')79 
bugsnag_server.wait_for_request()80 payload = bugsnag_server.received[0]['json_body']81 event = payload['events'][0]82 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'83 assert event['context'] == 'custom_info'84 assert event['severityReason'] == {'type': 'userSpecifiedSeverity'}85 assert event['severity'] == 'info'86def test_notify_post_body(bugsnag_server, django_client):87 response = django_client.post('/notes/handled-exception/',88 '{"foo": "strawberry"}',89 content_type='application/json')90 assert response.status_code == 20091 bugsnag_server.wait_for_request()92 payload = bugsnag_server.received[0]['json_body']93 event = payload['events'][0]94 exception = event['exceptions'][0]95 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'96 assert event['context'] == 'notes.views.handle_notify'97 assert event['severityReason'] == {'type': 'handledException'}98 assert event['severity'] == 'warning'99 assert event['device']['runtimeVersions']['django'] == django.__version__100 assert event['metaData']['request'] == {101 'method': 'POST',102 'url': 'http://testserver/notes/handled-exception/',103 'path': '/notes/handled-exception/',104 'GET': {},105 'encoding': None,106 'POST': {'foo': 'strawberry'}107 }108 assert event['user'] == {}109 assert exception['errorClass'] == 'KeyError'110 assert exception['message'] == "'nonexistent-item'"111 assert exception['stacktrace'][0] == {112 'code': {113 '39': 'def handle_notify(request):',114 '40': ' items = {}',115 '41': ' try:',116 '42': ' print("item: {}" % items["nonexistent-item"])',117 '43': ' except KeyError as e:',118 '44': " bugsnag.notify(e, unhappy='nonexistent-file')",119 '45': '',120 },121 'file': 'notes/views.py',122 'inProject': True,123 'lineNumber': 42,124 'method': 'handle_notify',125 }126def test_unhandled_exception(bugsnag_server, django_client):127 with pytest.raises(RuntimeError):128 django_client.get('/notes/unhandled-crash/')129 bugsnag_server.wait_for_request()130 payload = 
bugsnag_server.received[0]['json_body']131 event = payload['events'][0]132 exception = event['exceptions'][0]133 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'134 assert event['context'] == 'crash'135 assert event['severityReason'] == {136 'type': 'unhandledExceptionMiddleware',137 'attributes': {'framework': 'Django'}138 }139 assert event['device']['runtimeVersions']['django'] == django.__version__140 assert event['metaData']['request'] == {141 'method': 'GET',142 'url': 'http://testserver/notes/unhandled-crash/',143 'path': '/notes/unhandled-crash/',144 'POST': {},145 'encoding': None,146 'GET': {}147 }148 assert event['user'] == {}149 assert exception['errorClass'] == 'RuntimeError'150 assert exception['message'] == 'failed to return in time'151 assert exception['stacktrace'][0] == {152 'method': 'unhandled_crash',153 'file': 'notes/views.py',154 'lineNumber': 32,155 'inProject': True,156 'code': {157 '29': '',158 '30': '',159 '31': 'def unhandled_crash(request):',160 '32': " raise RuntimeError('failed to return in time')",161 '33': '',162 '34': '',163 '35': 'def unhandled_crash_in_template(request):',164 },165 }166 assert exception['stacktrace'][1]['inProject'] is False167def test_unhandled_exception_in_template(bugsnag_server, django_client):168 with pytest.raises(TemplateSyntaxError):169 django_client.get('/notes/unhandled-template-crash/')170 bugsnag_server.wait_for_request()171 payload = bugsnag_server.received[0]['json_body']172 event = payload['events'][0]173 exception = event['exceptions'][0]174 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'175 assert event['context'] == 'notes.views.unhandled_crash_in_template'176 assert event['severityReason'] == {177 'type': 'unhandledExceptionMiddleware',178 'attributes': {'framework': 'Django'}179 }180 assert event['device']['runtimeVersions']['django'] == django.__version__181 assert event['metaData']['request'] == {182 'method': 'GET',183 'url': 
'http://testserver/notes/unhandled-template-crash/',184 'path': '/notes/unhandled-template-crash/',185 'POST': {},186 'encoding': None,187 'GET': {}188 }189 assert event['user'] == {}190 assert exception['errorClass'] == template_error_class191 assert 'idunno()' in exception['message']192def test_ignores_http404(bugsnag_server, django_client):193 response = django_client.get('/notes/missing_route/')194 assert response.status_code == 404195 with pytest.raises(MissingRequestError):196 bugsnag_server.wait_for_request()197 assert len(bugsnag_server.received) == 0198def test_report_error_from_http404handler(bugsnag_server, django_client):199 with pytest.raises(Exception):200 django_client.get('/notes/poorly-handled-404')201 bugsnag_server.wait_for_request()202 payload = bugsnag_server.received[0]['json_body']203 event = payload['events'][0]204 exception = event['exceptions'][0]205 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'206 assert event['context'] == 'GET /notes/poorly-handled-404'207 assert event['severityReason'] == {208 'type': 'unhandledExceptionMiddleware',209 'attributes': {'framework': 'Django'}210 }211 assert event['device']['runtimeVersions']['django'] == django.__version__212 assert event['metaData']['request'] == {213 'method': 'GET',214 'url': 'http://testserver/notes/poorly-handled-404',215 'path': '/notes/poorly-handled-404',216 'POST': {},217 'encoding': None,218 'GET': {}219 }220 assert exception['errorClass'] == 'Exception'221 assert exception['message'] == 'nah'222 assert exception['stacktrace'][0] == {223 'method': 'handler404',224 'file': 'todo/urls.py',225 'lineNumber': 11,226 'inProject': True,227 'code': {228 '8': '',229 '9': 'def handler404(request, *args, **kwargs):',230 '10': " if 'poorly-handled-404' in request.path:",231 '11': " raise Exception('nah')",232 '12': '',233 '13': " response = HttpResponseNotFound('Terrible happenings!',", # noqa: E501234 '14': " content_type='text/plain')", # noqa: E501235 },236 }237def 
test_notify_appends_user_data(bugsnag_server, django_client):238 django_client.login(username='test', password='hunter2')239 response = django_client.get('/notes/handled-exception/?foo=strawberry')240 assert response.status_code == 200241 bugsnag_server.wait_for_request()242 payload = bugsnag_server.received[0]['json_body']243 event = payload['events'][0]244 exception = event['exceptions'][0]245 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'246 assert event['context'] == 'notes.views.handle_notify'247 assert event['severityReason'] == {'type': 'handledException'}248 assert event['device']['runtimeVersions']['django'] == django.__version__249 assert event['metaData']['custom']['unhappy'] == 'nonexistent-file'250 assert event['metaData']['request'] == {251 'method': 'GET',252 'url': 'http://testserver/notes/handled-exception/?foo=strawberry',253 'path': '/notes/handled-exception/',254 'POST': {},255 'encoding': None,256 'GET': {'foo': ['strawberry']}257 }258 assert event['user'] == {'email': 'test@example.com', 'id': 'test'}259 assert exception['errorClass'] == 'KeyError'260 assert exception['message'] == "'nonexistent-item'"261 assert exception['stacktrace'][0] == {262 'code': {263 '39': 'def handle_notify(request):',264 '40': ' items = {}',265 '41': ' try:',266 '42': ' print("item: {}" % items["nonexistent-item"])',267 '43': ' except KeyError as e:',268 '44': " bugsnag.notify(e, unhappy='nonexistent-file')",269 '45': '',270 },271 'file': 'notes/views.py',272 'inProject': True,273 'lineNumber': 42,274 'method': 'handle_notify',275 }276def test_crash_appends_user_data(bugsnag_server, django_client):277 django_client.login(username='test', password='hunter2')278 with pytest.raises(RuntimeError):279 django_client.get('/notes/unhandled-crash/')280 bugsnag_server.wait_for_request()281 payload = bugsnag_server.received[0]['json_body']282 event = payload['events'][0]283 exception = event['exceptions'][0]284 assert payload['apiKey'] == 
'a05afff2bd2ffaf0ab0f52715bbdcffd'285 assert event['context'] == 'crash'286 assert event['severityReason'] == {287 'type': 'unhandledExceptionMiddleware',288 'attributes': {'framework': 'Django'}289 }290 assert event['device']['runtimeVersions']['django'] == django.__version__291 assert event['metaData']['request'] == {292 'method': 'GET',293 'url': 'http://testserver/notes/unhandled-crash/',294 'path': '/notes/unhandled-crash/',295 'POST': {},296 'encoding': None,297 'GET': {}298 }299 assert event['user'] == {'email': 'test@example.com', 'id': 'test'}300 assert exception['errorClass'] == 'RuntimeError'301 assert exception['message'] == 'failed to return in time'302 assert exception['stacktrace'][0] == {303 'method': 'unhandled_crash',304 'file': 'notes/views.py',305 'lineNumber': 32,306 'inProject': True,307 'code': {308 '29': '',309 '30': '',310 '31': 'def unhandled_crash(request):',311 '32': " raise RuntimeError('failed to return in time')",312 '33': '',313 '34': '',314 '35': 'def unhandled_crash_in_template(request):',315 },316 }317 assert exception['stacktrace'][1]['inProject'] is False318def test_read_request_in_callback(bugsnag_server, django_client):319 with pytest.raises(RuntimeError):320 django_client.get('/notes/crash-with-callback/?user_id=foo')321 bugsnag_server.wait_for_request()322 payload = bugsnag_server.received[0]['json_body']323 event = payload['events'][0]...

Full Screen

Full Screen

better_go_for_tea_sm.py

Source:better_go_for_tea_sm.py Github

copy

Full Screen

#!/usr/bin/env python
import rospy
import sys
import smach
from std_msgs.msg import String
from smach_ros import IntrospectionServer, SimpleActionState, ServiceState
from move_base_msgs.msg import MoveBaseAction
from itr_tutorials.srv import Str2Coord, Str2CoordRequest
from smach import CBState


class WaitForRequest(smach.State):
    """State that hands out queued tea requests, or decides whether to rest."""

    def __init__(self):
        # Three possible outcomes; userdata keys let this state exchange data
        # with the other states of the machine.
        smach.State.__init__(self, outcomes=['request_received', 'not_request_received', 'rest_needed'],
                             output_keys=['out_request_loc', 'out_resting'],
                             input_keys=['in_resting'])

        # Pending requests, oldest first; filled by request_cb and drained by execute.
        self.request = []

        # Requests arrive as std_msgs/String messages on /get_me_tea
        # (the publisher is not part of this file).
        self.request_subs = rospy.Subscriber('/get_me_tea', String, self.request_cb)

    def request_cb(self, msg):
        """Subscriber callback: queue the string payload of the received message."""
        self.request.append(msg.data)

    def execute(self, userdata):
        """Pop the oldest request if there is one; otherwise idle or ask to rest."""
        rospy.loginfo('Executing state WAIT_FOR_REQUEST with queue: ' + str(self.request))

        if self.request:
            # Publish the requested location to the rest of the state machine.
            userdata.out_request_loc = self.request.pop(0)
            # A request is being served, so the robot is no longer resting.
            userdata.out_resting = False
            return 'request_received'
        else:
            if userdata.in_resting:
                # Already resting: wait briefly before this state is re-entered.
                rospy.sleep(0.5)
                return 'not_request_received'
            else:
                return 'rest_needed'


class GoForTeaSMNode:
    """Builds and runs the top-level "go for tea" state machine."""

    def __init__(self):
        pass

    def create_sm(self):
        """Create the state machine and wire up its states and transitions."""
        sm = smach.StateMachine(outcomes=['succeeded', 'aborted'])
        sm.userdata.resting = True
        with sm:
            # add(<state label>, <state instance>, ...)
            smach.StateMachine.add('WAIT_FOR_REQUEST', WaitForRequest(),
                                   transitions={'request_received': 'GO_TO_KITCHEN',
                                                'not_request_received': 'WAIT_FOR_REQUEST',
                                                'rest_needed': 'GO_TO_REST'},
                                   # remapping format: {'key inside state': 'key in container userdata'}
                                   remapping={'out_request_loc': 'request_loc',
                                              'in_resting': 'resting',
                                              'out_resting': 'resting'})
            smach.StateMachine.add('GO_TO_KITCHEN', GetCoordsAndMoveSM('kitchen'),
                                   transitions={'succeeded': 'GO_TO_REQUEST',
                                                'aborted': 'WAIT_FOR_REQUEST'})
            # After delivering, go back to WAIT_FOR_REQUEST, which decides
            # whether a rest is necessary.
            smach.StateMachine.add('GO_TO_REQUEST', GetCoordsAndMoveSM(),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST',
                                                'aborted': 'WAIT_FOR_REQUEST'},
                                   remapping={'in_request': 'request_loc'})
            smach.StateMachine.add('GO_TO_REST', GetCoordsAndMoveSM('rest'),
                                   transitions={'succeeded': 'SET_RESTING',
                                                'aborted': 'aborted'})
            # A callback state is enough here: it has a single outcome.
            smach.StateMachine.add('SET_RESTING', CBState(self.set_resting),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST'},
                                   remapping={'out_resting': 'resting'})
        return sm

    # NOTE(review): defined without `self` yet referenced as self.set_resting;
    # this relies on how smach.cb_interface wraps the function - confirm.
    @smach.cb_interface(outcomes=['succeeded'], output_keys=['out_resting'])
    def set_resting(userdata):
        """Mark the robot as resting."""
        userdata.out_resting = True
        return 'succeeded'

    def execute_sm(self):
        """Run the state machine with an introspection server attached; return its outcome."""
        sm = self.create_sm()
        # The introspection server lets the state machine be inspected visually.
        sis = IntrospectionServer('tea_Server', sm, 'SM_ROOT')
        sis.start()
        try:
            outcome = sm.execute()
        finally:
            # Stop the server even when execution raises.
            sis.stop()
        return outcome


class GetCoordsAndMoveSM(smach.StateMachine):
    """Nested machine: look up coordinates for a place name, then move there.

    Factors out the lookup-then-move sequence shared by several states.
    """

    def __init__(self, fixed_place=None):
        # 'in_request' is only needed as an input when no fixed place is given.
        smach.StateMachine.__init__(self, outcomes=['succeeded', 'aborted'],
                                    input_keys=[] if fixed_place else ['in_request'])
        with self:
            if fixed_place:
                # Fixed places (e.g. 'rest', 'kitchen'): the place name is baked
                # into the service request. How the name maps to coordinates is
                # up to the /str_to_coord service, which is not shown here.
                smach.StateMachine.add('GET_COORDS',
                                       ServiceState('/str_to_coord', Str2Coord,
                                                    request=Str2CoordRequest(fixed_place),
                                                    response_slots=['coordinates']),
                                       transitions={'succeeded': 'MOVE_TO_PLACE',
                                                    'preempted': 'aborted'},
                                       remapping={'coordinates': 'place_coords'})
            else:
                # Place taken from userdata: the 'place' request slot is
                # remapped to this machine's 'in_request' input key.
                smach.StateMachine.add('GET_COORDS',
                                       ServiceState('/str_to_coord', Str2Coord,
                                                    request_slots=['place'],
                                                    response_slots=['coordinates']),
                                       transitions={'succeeded': 'MOVE_TO_PLACE',
                                                    'preempted': 'aborted'},
                                       remapping={'coordinates': 'place_coords',
                                                  'place': 'in_request'})
            smach.StateMachine.add('MOVE_TO_PLACE',
                                   SimpleActionState('/move_base', MoveBaseAction,
                                                     goal_slots=['target_pose']),
                                   transitions={'succeeded': 'succeeded',
                                                'preempted': 'aborted',
                                                'aborted': 'aborted'},
                                   remapping={'target_pose': 'place_coords'})


if __name__ == '__main__':
    rospy.init_node('go_for_tea_sm', sys.argv)
    instance = GoForTeaSMNode()
    instance.execute_sm()

Full Screen

Full Screen

solution_better_go_for_tea_sm.py

Source:solution_better_go_for_tea_sm.py Github

copy

Full Screen

#!/usr/bin/env python
import rospy
import sys
import smach
from smach_ros import IntrospectionServer, SimpleActionState, ServiceState
from std_msgs.msg import String
from move_base_msgs.msg import MoveBaseAction
from itr_tutorials.srv import Str2Coord, Str2CoordRequest
from smach import CBState


class WaitForRequest(smach.State):
    """State that hands out queued tea requests, or decides whether to rest."""

    def __init__(self):
        smach.State.__init__(self, outcomes=['request_received', 'not_request_received', 'rest_needed'],
                             output_keys=['out_request_loc', 'out_resting'],
                             input_keys=['in_resting'])
        # Pending requests, oldest first; filled by request_cb and drained by execute.
        self.request = []
        self.request_subs = rospy.Subscriber('/get_me_tea', String, self.request_cb)

    def request_cb(self, msg):
        """Subscriber callback: queue the string payload of the received message."""
        self.request.append(msg.data)

    def execute(self, userdata):
        """Pop the oldest request if there is one; otherwise idle or ask to rest."""
        rospy.loginfo('Executing state WAITFORREQUEST with queue: ' + str(self.request))
        if self.request:
            userdata.out_request_loc = self.request.pop(0)
            # A request is being served, so the robot is no longer resting.
            userdata.out_resting = False
            return 'request_received'
        else:
            if userdata.in_resting:
                # Already resting: wait briefly before this state is re-entered.
                rospy.sleep(0.5)
                return 'not_request_received'
            else:
                return 'rest_needed'


class GoForTeaSMNode:
    """Builds and runs the top-level "go for tea" state machine."""

    def __init__(self):
        pass

    def create_sm(self):
        """Create the state machine and wire up its states and transitions."""
        sm = smach.StateMachine(outcomes=['succeeded', 'aborted'])
        sm.userdata.resting = True
        with sm:
            smach.StateMachine.add('WAIT_FOR_REQUEST', WaitForRequest(),
                                   transitions={'request_received': 'GO_TO_KITCHEN',
                                                'not_request_received': 'WAIT_FOR_REQUEST',
                                                'rest_needed': 'GO_TO_REST'},
                                   # remapping format: {'key inside state': 'key in container userdata'}
                                   remapping={'out_request_loc': 'request_loc',
                                              'in_resting': 'resting',
                                              'out_resting': 'resting'})
            smach.StateMachine.add('GO_TO_KITCHEN', GetCoordsAndMoveSM('kitchen'),
                                   transitions={'succeeded': 'GO_TO_REQUEST', 'aborted': 'WAIT_FOR_REQUEST'})
            smach.StateMachine.add('GO_TO_REQUEST', GetCoordsAndMoveSM(),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST', 'aborted': 'WAIT_FOR_REQUEST'},
                                   remapping={'in_request': 'request_loc'})
            smach.StateMachine.add('GO_TO_REST', GetCoordsAndMoveSM('rest'),
                                   transitions={'succeeded': 'SET_RESTING', 'aborted': 'WAIT_FOR_REQUEST'})
            smach.StateMachine.add('SET_RESTING', CBState(self.set_resting),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST'},
                                   remapping={'out_resting': 'resting'})
        return sm

    # NOTE(review): defined without `self` yet referenced as self.set_resting;
    # this relies on how smach.cb_interface wraps the function - confirm.
    @smach.cb_interface(outcomes=['succeeded'], output_keys=['out_resting'])
    def set_resting(userdata):
        """Mark the robot as resting."""
        userdata.out_resting = True
        return 'succeeded'

    def execute_sm(self):
        """Run the state machine with an introspection server attached; return its outcome."""
        sm = self.create_sm()
        sis = IntrospectionServer('tea_Server', sm, 'SM_ROOT')
        sis.start()
        try:
            outcome = sm.execute()
        finally:
            # Stop the server even when execution raises.
            sis.stop()
        return outcome


class GetCoordsAndMoveSM(smach.StateMachine):
    """Nested machine: look up coordinates for a place name, then move there."""

    def __init__(self, fixed_place=None):
        # 'in_request' is only needed as an input when no fixed place is given.
        smach.StateMachine.__init__(self, outcomes=['succeeded', 'aborted'],
                                    input_keys=[] if fixed_place else ['in_request'])
        with self:
            if fixed_place:
                # Fixed place: the name is baked into the service request.
                smach.StateMachine.add('GET_COORDS', ServiceState('/str_to_coord', Str2Coord,
                                                                  request=Str2CoordRequest(fixed_place),
                                                                  response_slots=['coordinates']),
                                       transitions={'succeeded': 'MOVE_TO_PLACE', 'preempted': 'aborted'},
                                       remapping={'coordinates': 'place_coords'})
            else:
                # Place from userdata: the 'place' request slot is remapped to 'in_request'.
                smach.StateMachine.add('GET_COORDS', ServiceState('/str_to_coord', Str2Coord,
                                                                  request_slots=['place'],
                                                                  response_slots=['coordinates']),
                                       transitions={'succeeded': 'MOVE_TO_PLACE', 'preempted': 'aborted'},
                                       remapping={'coordinates': 'place_coords',
                                                  'place': 'in_request'})
            smach.StateMachine.add("MOVE_TO_PLACE", SimpleActionState('/move_base', MoveBaseAction,
                                                                      goal_slots=['target_pose']),
                                   transitions={'succeeded': 'succeeded', 'aborted': 'aborted',
                                                'preempted': 'aborted'},
                                   remapping={'target_pose': 'place_coords'})


if __name__ == "__main__":
    rospy.init_node("go_for_tea_sm", sys.argv)
    get_Tea = GoForTeaSMNode()
    get_Tea.execute_sm()

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to deepen your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has quickly gained popularity. Get to know some of the history of Playwright, along with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail what basic configuration and dependencies are required to install Playwright and run a test. Get step-by-step directions for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of helpful features such as the Playwright Test Generator and Inspector, the Playwright Reporter, the Playwright auto-waiting mechanism, etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating it with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interacts with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know the differences between the two testing frameworks: how they differ from one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in parallel with the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, .NET, etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get hands-on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a step-by-step, in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful