Best Python code snippet using playwright-python
action.py
Source:action.py  
1import json2import logging3import requests4from urllib.parse import urljoin5from tabulate import tabulate6from waiter import http_util, terminal7from waiter.format import format_last_request_time8from waiter.format import format_status9from waiter.querying import get_service, get_services_using_token10from waiter.querying import print_no_data, query_service, query_services, query_token11from waiter.util import is_service_current, str2bool, response_message, print_error, wait_until12def ping_on_cluster(cluster, timeout, wait_for_request, token_name, service_exists_fn):13    """Pings using the given token name (or ^SERVICE-ID#) in the given cluster."""14    cluster_name = cluster['name']15    def perform_ping():16        status = None17        try:18            default_queue_timeout_millis = 30000019            timeout_seconds = timeout if wait_for_request else 520            timeout_millis = timeout_seconds * 100021            headers = {22                'X-Waiter-Queue-Timeout': str(max(default_queue_timeout_millis, timeout_millis)),23                'X-Waiter-Token': token_name,24                'X-Waiter-Timeout': str(timeout_millis)25            }26            read_timeout = timeout_seconds if wait_for_request else (timeout_seconds + 5)27            resp = http_util.get(cluster, '/waiter-ping', headers=headers, read_timeout=read_timeout)28            logging.debug(f'Response status code: {resp.status_code}')29            resp_json = resp.json()30            if resp.status_code == 200:31                status = resp_json.get('service-state', {}).get('status')32                ping_response = resp_json['ping-response']33                ping_response_result = ping_response['result']34                if ping_response_result in ['descriptor-error', 'received-response']:35                    ping_response_status = ping_response['status']36                    if ping_response_status == 200:37                        print(terminal.success('Ping successful.'))38                        result = True39                    else:40                        print_error(f'Ping responded with non-200 status {ping_response_status}.')41                        try:42                            ping_response_waiter_error = json.loads(ping_response['body'])['waiter-error']['message']43                            print_error(ping_response_waiter_error)44                        except json.JSONDecodeError:45                            logging.debug('Ping response is not in json format, cannot display waiter-error message.')46                        except KeyError:47                            logging.debug('Ping response body does not contain waiter-error message.')48                        result = False49                elif ping_response_result == 'timed-out':50                    if wait_for_request:51                        print_error('Ping request timed out.')52                        result = False53                    else:54                        logging.debug('ignoring ping request timeout due to --no-wait')55                        result = True56                else:57                    print_error(f'Encountered unknown ping result: {ping_response_result}.')58                    result = False59            else:60                print_error(response_message(resp_json))61                result = False62        except Exception:63            result = False64            message = f'Encountered error while pinging in {cluster_name}.'65            logging.exception(message)66            if wait_for_request:67                print_error(message)68        return result, status69    def check_service_status():70        result = wait_until(service_exists_fn, timeout=timeout, interval=5)71        if result:72            print(f'Service is currently {format_status(result["status"])}.')73            return True74        else:75            print_error('Timeout while waiting for service to start.')76            return False77    succeeded, service_status = perform_ping()78    if succeeded:79        if service_status is None or service_status == 'Inactive':80            check_service_status()81        else:82            print(f'Service is currently {format_status(service_status)}.')83    return succeeded84def token_has_current_service(cluster, token_name, current_token_etag):85    """If the given token has a "current" service, returns that service else None"""86    services = get_services_using_token(cluster, token_name)87    if services is not None:88        services = [s for s in services if is_service_current(s, current_token_etag, token_name)]89        return services[0] if len(services) > 0 else False90    else:91        cluster_name = cluster['name']92        print_error(f'Unable to retrieve services using token {token_name} in {cluster_name}.')93        return None94def ping_token_on_cluster(cluster, token_name, timeout, wait_for_request,95                          current_token_etag):96    """Pings the token with the given token name in the given cluster."""97    cluster_name = cluster['name']98    print(f'Pinging token {terminal.bold(token_name)} '99          f'in {terminal.bold(cluster_name)}...')100    return ping_on_cluster(cluster, timeout, wait_for_request,101                           token_name, lambda: token_has_current_service(cluster, token_name, current_token_etag))102def service_is_active(cluster, service_id):103    """If the given service id is active, returns the service, else None"""104    service = get_service(cluster, service_id)105    return service if (service and service.get('status') != 'Inactive') else None106def ping_service_on_cluster(cluster, service_id, timeout, wait_for_request):107    """Pings the service with the given service id in the given cluster."""108    cluster_name = cluster['name']109    print(f'Pinging service {terminal.bold(service_id)} '110          f'in {terminal.bold(cluster_name)}...')111    return ping_on_cluster(cluster, timeout, wait_for_request,112                           f'^SERVICE-ID#{service_id}', lambda: service_is_active(cluster, service_id))113def kill_service_on_cluster(cluster, service_id, timeout_seconds):114    """Kills the service with the given service id in the given cluster."""115    cluster_name = cluster['name']116    http_util.set_retries(0)117    try:118        print(f'Killing service {terminal.bold(service_id)} in {terminal.bold(cluster_name)}...')119        params = {'timeout': timeout_seconds * 1000}120        resp = http_util.delete(cluster, f'/apps/{service_id}', params=params, read_timeout=timeout_seconds)121        logging.debug(f'Response status code: {resp.status_code}')122        if resp.status_code == 200:123            routers_agree = resp.json().get('routers-agree')124            if routers_agree:125                print(f'Successfully killed {service_id} in {cluster_name}.')126                return True127            else:128                print(f'Successfully killed {service_id} in {cluster_name}. '129                      f'Server-side timeout waiting for routers to update.')130                return False131        else:132            print_error(response_message(resp.json()))133            return False134    except requests.exceptions.ReadTimeout:135        message = f'Request timed out while killing {service_id} in {cluster_name}.'136        logging.exception(message)137        print_error(message)138    except Exception:139        message = f'Encountered error while killing {service_id} in {cluster_name}.'140        logging.exception(message)141        print_error(message)142def process_kill_request(clusters, token_name_or_service_id, is_service_id, force_flag, timeout_secs,143                         no_service_result=False):144    """Kills the service(s) using the given token name or service-id.145    Returns False if no services can be found or if there was a failure in deleting any service.146    Returns True if all services using the token were deleted successfully."""147    if is_service_id:148        query_result = query_service(clusters, token_name_or_service_id)149        num_services = query_result['count']150        if num_services == 0:151            print_no_data(clusters)152            return no_service_result153    else:154        query_result = query_services(clusters, token_name_or_service_id)155        num_services = query_result['count']156        if num_services == 0:157            clusters_text = ' / '.join([terminal.bold(c['name']) for c in clusters])158            print(f'There are no services using token {terminal.bold(token_name_or_service_id)} in {clusters_text}.')159            return no_service_result160        elif num_services > 1:161            print(f'There are {num_services} services using token {terminal.bold(token_name_or_service_id)}:')162    cluster_data_pairs = sorted(query_result['clusters'].items())163    clusters_by_name = {c['name']: c for c in clusters}164    overall_success = True165    for cluster_name, data in cluster_data_pairs:166        if is_service_id:167            service = data['service']168            service['service-id'] = token_name_or_service_id169            services = [service]170        else:171            services = sorted(data['services'], key=lambda s: s.get('last-request-time', None) or '', reverse=True)172        for service in services:173            service_id = service['service-id']174            cluster = clusters_by_name[cluster_name]175            status_string = service['status']176            status = format_status(status_string)177            inactive = status_string == 'Inactive'178            should_kill = False179            if force_flag or num_services == 1:180                should_kill = True181            else:182                url = urljoin(cluster['url'], f'apps/{service_id}')183                if is_service_id:184                    active_instances = service['instances']['active-instances']185                    healthy_count = len([i for i in active_instances if i['healthy?']])186                    unhealthy_count = len([i for i in active_instances if not i['healthy?']])187                else:188                    healthy_count = service['instance-counts']['healthy-instances']189                    unhealthy_count = service['instance-counts']['unhealthy-instances']190                last_request_time = format_last_request_time(service)191                run_as_user = service['service-description']['run-as-user']192                table = [['Status', status],193                         ['Healthy', healthy_count],194                         ['Unhealthy', unhealthy_count],195                         ['URL', url],196                         ['Run as user', run_as_user]]197                table_text = tabulate(table, tablefmt='plain')198                print(f'\n'199                      f'=== {terminal.bold(cluster_name)} / {terminal.bold(service_id)} ===\n'200                      f'\n'201                      f'Last Request: {last_request_time}\n'202                      f'\n'203                      f'{table_text}\n')204                if not inactive:205                    answer = input(f'Kill service in {terminal.bold(cluster_name)}? ')206                    should_kill = str2bool(answer)207            if inactive:208                print(f'Service {terminal.bold(service_id)} on {terminal.bold(cluster_name)} cannot be killed because '209                      f'it is already {status}.')210                should_kill = False211            if should_kill:212                success = kill_service_on_cluster(cluster, service_id, timeout_secs)213                overall_success = overall_success and success214    return overall_success215def token_explicitly_created_on_cluster(cluster, token_cluster_name):216    """Returns true if the given token cluster matches the configured cluster name of the given cluster"""217    cluster_settings, _ = http_util.make_data_request(cluster, lambda: http_util.get(cluster, '/settings'))218    cluster_config_name = cluster_settings['cluster-config']['name'].upper()219    created_on_this_cluster = token_cluster_name == cluster_config_name220    return created_on_this_cluster221def process_ping_request(clusters, token_name_or_service_id, is_service_id, timeout_secs, wait_for_request):222    """Pings the service using the given token name or service-id.223    Returns False if the ping request fails or times out.224    Returns True if the ping request completes successfully."""225    if is_service_id:226        query_result = query_service(clusters, token_name_or_service_id)227    else:228        query_result = query_token(clusters, token_name_or_service_id)229    if query_result['count'] == 0:230        print_no_data(clusters)231        return False232    http_util.set_retries(0)233    cluster_data_pairs = sorted(query_result['clusters'].items())234    clusters_by_name = {c['name']: c for c in clusters}235    overall_success = True236    for cluster_name, data in cluster_data_pairs:237        cluster = clusters_by_name[cluster_name]238        if is_service_id:239            success = ping_service_on_cluster(cluster, token_name_or_service_id, timeout_secs, wait_for_request)240        else:241            token_data = data['token']242            token_etag = data['etag']243            token_cluster_name = token_data['cluster'].upper()244            if len(clusters) == 1 or token_explicitly_created_on_cluster(cluster, token_cluster_name):245                success = ping_token_on_cluster(cluster, token_name_or_service_id, timeout_secs,246                                                wait_for_request, token_etag)247            else:248                print(f'Not pinging token {terminal.bold(token_name_or_service_id)} '249                      f'in {terminal.bold(cluster_name)} '250                      f'because it was created in {terminal.bold(token_cluster_name)}.')251                success = True252        overall_success = overall_success and success...test_django.py
Source:test_django.py  
...30    User.objects.all().delete()31def test_notify(bugsnag_server, django_client):32    response = django_client.get('/notes/handled-exception/?foo=strawberry')33    assert response.status_code == 20034    bugsnag_server.wait_for_request()35    payload = bugsnag_server.received[0]['json_body']36    event = payload['events'][0]37    exception = event['exceptions'][0]38    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'39    assert event['context'] == 'notes.views.handle_notify'40    assert event['severityReason'] == {'type': 'handledException'}41    assert event['device']['runtimeVersions']['django'] == django.__version__42    assert 'environment' not in payload['events'][0]['metaData']43    assert event['metaData']['request'] == {44        'method': 'GET',45        'url': 'http://testserver/notes/handled-exception/?foo=strawberry',46        'path': '/notes/handled-exception/',47        'POST': {},48        'encoding': None,49        'GET': {'foo': ['strawberry']}50    }51    assert event['user'] == {}52    assert exception['errorClass'] == 'KeyError'53    assert exception['message'] == "'nonexistent-item'"54    assert exception['stacktrace'][0] == {55        'code': {56             '39': 'def handle_notify(request):',57             '40': '    items = {}',58             '41': '    try:',59             '42': '        print("item: {}" % items["nonexistent-item"])',60             '43': '    except KeyError as e:',61             '44': "        bugsnag.notify(e, unhappy='nonexistent-file')",62             '45': '',63        },64        'file': 'notes/views.py',65        'inProject': True,66        'lineNumber': 42,67        'method': 'handle_notify',68    }69def test_enable_environment(bugsnag_server, django_client):70    bugsnag.configure(send_environment=True)71    response = django_client.get('/notes/handled-exception/?foo=strawberry')72    assert response.status_code == 20073    bugsnag_server.wait_for_request()74    payload = bugsnag_server.received[0]['json_body']75    event = payload['events'][0]76    assert event['metaData']['environment']['REQUEST_METHOD'] == 'GET'77def test_notify_custom_info(bugsnag_server, django_client):78    django_client.get('/notes/handled-exception-custom/')79    bugsnag_server.wait_for_request()80    payload = bugsnag_server.received[0]['json_body']81    event = payload['events'][0]82    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'83    assert event['context'] == 'custom_info'84    assert event['severityReason'] == {'type': 'userSpecifiedSeverity'}85    assert event['severity'] == 'info'86def test_notify_post_body(bugsnag_server, django_client):87    response = django_client.post('/notes/handled-exception/',88                                  '{"foo": "strawberry"}',89                                  content_type='application/json')90    assert response.status_code == 20091    bugsnag_server.wait_for_request()92    payload = bugsnag_server.received[0]['json_body']93    event = payload['events'][0]94    exception = event['exceptions'][0]95    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'96    assert event['context'] == 'notes.views.handle_notify'97    assert event['severityReason'] == {'type': 'handledException'}98    assert event['severity'] == 'warning'99    assert event['device']['runtimeVersions']['django'] == django.__version__100    assert event['metaData']['request'] == {101        'method': 'POST',102        'url': 'http://testserver/notes/handled-exception/',103        'path': '/notes/handled-exception/',104        'GET': {},105        'encoding': None,106        'POST': {'foo': 'strawberry'}107    }108    assert event['user'] == {}109    assert exception['errorClass'] == 'KeyError'110    assert exception['message'] == "'nonexistent-item'"111    assert exception['stacktrace'][0] == {112        'code': {113             '39': 'def handle_notify(request):',114             '40': '    items = {}',115             '41': '    try:',116             '42': '        print("item: {}" % items["nonexistent-item"])',117             '43': '    except KeyError as e:',118             '44': "        bugsnag.notify(e, unhappy='nonexistent-file')",119             '45': '',120        },121        'file': 'notes/views.py',122        'inProject': True,123        'lineNumber': 42,124        'method': 'handle_notify',125    }126def test_unhandled_exception(bugsnag_server, django_client):127    with pytest.raises(RuntimeError):128        django_client.get('/notes/unhandled-crash/')129    bugsnag_server.wait_for_request()130    payload = bugsnag_server.received[0]['json_body']131    event = payload['events'][0]132    exception = event['exceptions'][0]133    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'134    assert event['context'] == 'crash'135    assert event['severityReason'] == {136        'type': 'unhandledExceptionMiddleware',137        'attributes':  {'framework': 'Django'}138    }139    assert event['device']['runtimeVersions']['django'] == django.__version__140    assert event['metaData']['request'] == {141        'method': 'GET',142        'url': 'http://testserver/notes/unhandled-crash/',143        'path': '/notes/unhandled-crash/',144        'POST': {},145        'encoding': None,146        'GET': {}147    }148    assert event['user'] == {}149    assert exception['errorClass'] == 'RuntimeError'150    assert exception['message'] == 'failed to return in time'151    assert exception['stacktrace'][0] == {152        'method': 'unhandled_crash',153        'file': 'notes/views.py',154        'lineNumber': 32,155        'inProject': True,156        'code': {157            '29': '',158            '30': '',159            '31': 'def unhandled_crash(request):',160            '32': "    raise RuntimeError('failed to return in time')",161            '33': '',162            '34': '',163            '35': 'def unhandled_crash_in_template(request):',164        },165    }166    assert exception['stacktrace'][1]['inProject'] is False167def test_unhandled_exception_in_template(bugsnag_server, django_client):168    with pytest.raises(TemplateSyntaxError):169        django_client.get('/notes/unhandled-template-crash/')170    bugsnag_server.wait_for_request()171    payload = bugsnag_server.received[0]['json_body']172    event = payload['events'][0]173    exception = event['exceptions'][0]174    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'175    assert event['context'] == 'notes.views.unhandled_crash_in_template'176    assert event['severityReason'] == {177        'type': 'unhandledExceptionMiddleware',178        'attributes':  {'framework': 'Django'}179    }180    assert event['device']['runtimeVersions']['django'] == django.__version__181    assert event['metaData']['request'] == {182        'method': 'GET',183        'url': 'http://testserver/notes/unhandled-template-crash/',184        'path': '/notes/unhandled-template-crash/',185        'POST': {},186        'encoding': None,187        'GET': {}188    }189    assert event['user'] == {}190    assert exception['errorClass'] == template_error_class191    assert 'idunno()' in exception['message']192def test_ignores_http404(bugsnag_server, django_client):193    response = django_client.get('/notes/missing_route/')194    assert response.status_code == 404195    with pytest.raises(MissingRequestError):196        bugsnag_server.wait_for_request()197    assert len(bugsnag_server.received) == 0198def test_report_error_from_http404handler(bugsnag_server, django_client):199    with pytest.raises(Exception):200        django_client.get('/notes/poorly-handled-404')201    bugsnag_server.wait_for_request()202    payload = bugsnag_server.received[0]['json_body']203    event = payload['events'][0]204    exception = event['exceptions'][0]205    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'206    assert event['context'] == 'GET /notes/poorly-handled-404'207    assert event['severityReason'] == {208        'type': 'unhandledExceptionMiddleware',209        'attributes':  {'framework': 'Django'}210    }211    assert event['device']['runtimeVersions']['django'] == django.__version__212    assert event['metaData']['request'] == {213        'method': 'GET',214        'url': 'http://testserver/notes/poorly-handled-404',215        'path': '/notes/poorly-handled-404',216        'POST': {},217        'encoding': None,218        'GET': {}219    }220    assert exception['errorClass'] == 'Exception'221    assert exception['message'] == 'nah'222    assert exception['stacktrace'][0] == {223        'method': 'handler404',224        'file': 'todo/urls.py',225        'lineNumber': 11,226        'inProject': True,227        'code': {228            '8': '',229            '9': 'def handler404(request, *args, **kwargs):',230            '10': "    if 'poorly-handled-404' in request.path:",231            '11': "        raise Exception('nah')",232            '12': '',233            '13': "    response = HttpResponseNotFound('Terrible happenings!',",  # noqa: E501234            '14': "                                    content_type='text/plain')",  # noqa: E501235        },236    }237def test_notify_appends_user_data(bugsnag_server, django_client):238    django_client.login(username='test', password='hunter2')239    response = django_client.get('/notes/handled-exception/?foo=strawberry')240    assert response.status_code == 200241    bugsnag_server.wait_for_request()242    payload = bugsnag_server.received[0]['json_body']243    event = payload['events'][0]244    exception = event['exceptions'][0]245    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'246    assert event['context'] == 'notes.views.handle_notify'247    assert event['severityReason'] == {'type': 'handledException'}248    assert event['device']['runtimeVersions']['django'] == django.__version__249    assert event['metaData']['custom']['unhappy'] == 'nonexistent-file'250    assert event['metaData']['request'] == {251        'method': 'GET',252        'url': 'http://testserver/notes/handled-exception/?foo=strawberry',253        'path': '/notes/handled-exception/',254        'POST': {},255        'encoding': None,256        'GET': {'foo': ['strawberry']}257    }258    assert event['user'] == {'email': 'test@example.com', 'id': 'test'}259    assert exception['errorClass'] == 'KeyError'260    assert exception['message'] == "'nonexistent-item'"261    assert exception['stacktrace'][0] == {262        'code': {263             '39': 'def handle_notify(request):',264             '40': '    items = {}',265             '41': '    try:',266             '42': '        print("item: {}" % items["nonexistent-item"])',267             '43': '    except KeyError as e:',268             '44': "        bugsnag.notify(e, unhappy='nonexistent-file')",269             '45': '',270        },271        'file': 'notes/views.py',272        'inProject': True,273        'lineNumber': 42,274        'method': 'handle_notify',275    }276def test_crash_appends_user_data(bugsnag_server, django_client):277    django_client.login(username='test', password='hunter2')278    with pytest.raises(RuntimeError):279        django_client.get('/notes/unhandled-crash/')280    bugsnag_server.wait_for_request()281    payload = bugsnag_server.received[0]['json_body']282    event = payload['events'][0]283    exception = event['exceptions'][0]284    assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'285    assert event['context'] == 'crash'286    assert event['severityReason'] == {287        'type': 'unhandledExceptionMiddleware',288        'attributes':  {'framework': 'Django'}289    }290    assert event['device']['runtimeVersions']['django'] == django.__version__291    assert event['metaData']['request'] == {292        'method': 'GET',293        'url': 'http://testserver/notes/unhandled-crash/',294        'path': '/notes/unhandled-crash/',295        'POST': {},296        'encoding': None,297        'GET': {}298    }299    assert event['user'] == {'email': 'test@example.com', 'id': 'test'}300    assert exception['errorClass'] == 'RuntimeError'301    assert exception['message'] == 'failed to return in time'302    assert exception['stacktrace'][0] == {303        'method': 'unhandled_crash',304        'file': 'notes/views.py',305        'lineNumber': 32,306        'inProject': True,307        'code': {308            '29': '',309            '30': '',310            '31': 'def unhandled_crash(request):',311            '32': "    raise RuntimeError('failed to return in time')",312            '33': '',313            '34': '',314            '35': 'def unhandled_crash_in_template(request):',315        },316    }317    assert exception['stacktrace'][1]['inProject'] is False318def test_read_request_in_callback(bugsnag_server, django_client):319    with pytest.raises(RuntimeError):320        django_client.get('/notes/crash-with-callback/?user_id=foo')321    bugsnag_server.wait_for_request()322    payload = bugsnag_server.received[0]['json_body']323    event = payload['events'][0]...better_go_for_tea_sm.py
Source:better_go_for_tea_sm.py  
1#!/usr/bin/env python2import rospy3import sys4import smach5from std_msgs.msg import String6from smach_ros import IntrospectionServer, SimpleActionState, ServiceState7from move_base_msgs.msg import MoveBaseAction8from itr_tutorials.srv import Str2Coord, Str2CoordRequest9from smach import CBState10#WAITFORREQUEST STATE!!! (we got this from the slide) 11class WaitForRequest(smach.State):12    def __init__(self):13        smach.State.__init__(self, outcomes=['request_received','not_request_received','rest_needed'], #the three possible outcomes for the WAIT_FOR_REQUEST state 14                                output_keys = ['out_request_loc','out_resting'],15                                input_keys = ['in_resting']) #userdata allows states to communicate with each other16        17        self.request = []  #i think it makes it empty so far and changes in the execute function18        19        self.request_subs = rospy.Subscriber('/get_me_tea', String, self.request_cb) #subscribes to the topic of /get_me_tea, but where did this topic come from?20    21    def request_cb(self, msg):  #what is msg?22        self.request.append(msg.data)  #adds the msg to a list for a queue23    def execute(self, userdata):24        rospy.loginfo('Executing state WAIT_FOR_REQUEST with queue: ' + str(self.request))25        26        27        if self.request:28            userdata.out_request_loc = self.request.pop(0) #sets this information outside of just this state29            userdata.out_resting = False   #basically says if we have a request in the queue list, we are no longer resting30            return 'request_received'31        else:32            if userdata.in_resting:33                rospy.sleep(0.5)34                return 'not_request_received'35            else:36                return 'rest_needed'37class GoForTeaSMNode:38    def __init__(self):39        pass40    def create_sm(self):41        sm = smach.StateMachine(outcomes=['succeeded', 'aborted'])42        sm.userdata.resting = True43        with sm:44            smach.StateMachine.add('WAIT_FOR_REQUEST', WaitForRequest(),   #([name of state], [class name])45                                    transitions={'request_received': 'GO_TO_KITCHEN',         #if request_received is returned, transition to GO_TO_KITCHEN 46                                                'not_request_received': 'WAIT_FOR_REQUEST',47                                                'rest_needed':'GO_TO_REST'},48                                    remapping = {'out_request_loc':'request_loc',    #remapping format: ['inside state':'outside state'] 49                                                'in_resting':'resting',50                                                'out_resting':'resting'})  51            smach.StateMachine.add('GO_TO_KITCHEN', GetCoordsAndMoveSM('kitchen'),52                                    transitions = {'succeeded':'GO_TO_REQUEST',53                                                    'aborted':'WAIT_FOR_REQUEST'})54            smach.StateMachine.add('GO_TO_REQUEST', GetCoordsAndMoveSM(),55                                    transitions = {'succeeded':'WAIT_FOR_REQUEST',  #we make it go to request state to see if rest is necessary56                                                    'aborted':'WAIT_FOR_REQUEST'},57                                    remapping = {'in_request':'request_loc'})58            smach.StateMachine.add('GO_TO_REST', GetCoordsAndMoveSM('rest'),59                                    transitions = {'succeeded':'SET_RESTING',60                                                    'aborted':'aborted'})61            #uses a callback state instead of a normal state because its simple and only has one outcome. 62            smach.StateMachine.add('SET_RESTING', CBState(self.set_resting), transitions = {'succeeded':'WAIT_FOR_REQUEST'},63                                    remapping = {'out_resting':'resting'})64        return sm65    @smach.cb_interface(outcomes=['succeeded'], output_keys=['out_resting'])66    def set_resting(userdata):67        userdata.out_resting = True68        return 'succeeded'69    70    def execute_sm(self):71        #this is so you can visually see the state machine72        sm = self.create_sm()73        sis = IntrospectionServer('tea_Server', sm, 'SM_ROOT')   74        sis.start()75        outcome = sm.execute()76        sis.stop()77        return outcome78#this is to simplify the redundancies in our code79class GetCoordsAndMoveSM(smach.StateMachine):80    def __init__(self, fixed_place=None):81        smach.StateMachine.__init__(self, outcomes = ['succeeded', 'aborted'], input_keys = [] if fixed_place else ['in_request'])   #we know what fixed_place UNLESS from userdata82        with self:83            if fixed_place:84                #this first one is the servicestate for fixed places (rest, kitchen)85                smach.StateMachine.add('GET_COORDS', ServiceState('/str_to_coord', Str2Coord, request = Str2CoordRequest(fixed_place),  #??how does it know what the fixed place coords are???86                                                                        response_slots = ['coordinates']),  #must be the same thing in Str2Coord msg. the Str2Coord msg definition file has two parts: request and response. 'coordinates' is the response part87                                                                        transitions = {'succeeded':'MOVE_TO_PLACE',88                                                                                        'preempted':'aborted'}, 89                                                                        remapping = {'coordinates':'place_coords'})90            else:91                #this second one is the servicestate for places from userdata92                smach.StateMachine.add('GET_COORDS', ServiceState('/str_to_coord', Str2Coord, request_slots = ['place'],  #the Str2Coord service def file has two parts, and 'place' is the request part93                                                                        response_slots = ['coordinates']),  #must be the same thing in Str2Coord msg. the Str2Coord msg definition file has two parts: request and response. 'coordinates' is the response part94                                                                        transitions = {'succeeded':'MOVE_TO_PLACE',95                                                                                        'preempted':'aborted'}, 96                                                                        remapping = {'coordinates':'place_coords',97                                                                                    'place':'in_request'})98            smach.StateMachine.add('MOVE_TO_PLACE', SimpleActionState('/move_base', MoveBaseAction,99                                                                    goal_slots = ['target_pose']),100                                                                    transitions={'succeeded':'succeeded',101                                                                                    'preempted':'aborted',102                                                                                    'aborted':'aborted'},103                                                                    remapping = {'target_pose':'place_coords'})104    105if __name__=='__main__':106    rospy.init_node('go_for_tea_sm', sys.argv) #name it the same as python file name107    instance = GoForTeaSMNode()108    instance.execute_sm()...solution_better_go_for_tea_sm.py
Source:solution_better_go_for_tea_sm.py  
1#!/usr/bin/env python2import rospy3import sys4import smach5from smach_ros import IntrospectionServer, SimpleActionState, ServiceState6from std_msgs.msg import String7from move_base_msgs.msg import MoveBaseAction8from itr_tutorials.srv import Str2Coord, Str2CoordRequest9from smach import CBState10class WaitForRequest(smach.State):11    def __init__(self):12        smach.State.__init__(self, outcomes=['request_received', 'not_request_received', 'rest_needed'],13                             output_keys=['out_request_loc', 'out_resting'],14                             input_keys=['in_resting'])15        self.request = []16        self.request_subs = rospy.Subscriber('/get_me_tea', String, self.request_cb)17    def request_cb(self, msg):18        self.request.append(msg.data)19    def execute(self, userdata):20        rospy.loginfo('Executing state WAITFORREQUEST with queue: ' + str(self.request))21        if self.request:22            userdata.out_request_loc = self.request.pop(0)23            userdata.out_resting = False24            return 'request_received'25        else:26            if userdata.in_resting:27                rospy.sleep(0.5)28                return 'not_request_received'29            else:30                return 'rest_needed'31class GoForTeaSMNode:32    def __init__(self):33        pass34    def create_sm(self):35        sm = smach.StateMachine(outcomes=['succeeded', 'aborted'])36        sm.userdata.resting = True37        with sm:38            smach.StateMachine.add('WAIT_FOR_REQUEST', WaitForRequest(),39                                   transitions={'request_received': 'GO_TO_KITCHEN',40                                                'not_request_received': 'WAIT_FOR_REQUEST',41                                                'rest_needed': 'GO_TO_REST'},42                                   remapping={'out_request_loc': 'request_loc',43                                              'in_resting': 'resting',44                                              'out_resting': 'resting'})45            smach.StateMachine.add('GO_TO_KITCHEN', GetCoordsAndMoveSM('kitchen'),46                                   transitions={'succeeded': 'GO_TO_REQUEST', 'aborted': 'WAIT_FOR_REQUEST'})47            smach.StateMachine.add('GO_TO_REQUEST', GetCoordsAndMoveSM(),48                                   transitions={'succeeded': 'WAIT_FOR_REQUEST', 'aborted': 'WAIT_FOR_REQUEST'},49                                   remapping={'in_request': 'request_loc'})50            smach.StateMachine.add('GO_TO_REST', GetCoordsAndMoveSM('rest'),51                                   transitions={'succeeded': 'SET_RESTING', 'aborted': 'WAIT_FOR_REQUEST'})52            smach.StateMachine.add('SET_RESTING', CBState(self.set_resting), transitions={'succeeded': 'WAIT_FOR_REQUEST'},53                                   remapping={'out_resting': 'resting'})54        return sm55    @smach.cb_interface(outcomes=['succeeded'], output_keys=['out_resting'])56    def set_resting(userdata):57        userdata.out_resting = True58        return 'succeeded'59    def execute_sm(self):60        sm = self.create_sm()61        sis = IntrospectionServer('tea_Server', sm, 'SM_ROOT')62        sis.start()63        outcome = sm.execute()64        sis.stop()65        return outcome66class GetCoordsAndMoveSM(smach.StateMachine):67    def __init__(self, fixed_place=None):68        smach.StateMachine.__init__(self, outcomes=['succeeded', 'aborted'], input_keys=[] if fixed_place else ['in_request'])69        with self:70            if fixed_place:71                smach.StateMachine.add('GET_COORDS', ServiceState('/str_to_coord', Str2Coord,72                                                                  request=Str2CoordRequest(fixed_place),73                                                                  response_slots=['coordinates']),74                                       transitions={'succeeded': 'MOVE_TO_PLACE', 'preempted': 'aborted'},75                                       remapping={'coordinates': 'place_coords'})76            else:77                smach.StateMachine.add('GET_COORDS', ServiceState('/str_to_coord', Str2Coord,78                                                                  request_slots=['place'],79                                                                  response_slots=['coordinates']),80                                       transitions={'succeeded': 'MOVE_TO_PLACE', 'preempted': 'aborted'},81                                       remapping={'coordinates': 'place_coords',82                                                  'place': 'in_request'})83            smach.StateMachine.add("MOVE_TO_PLACE", SimpleActionState('/move_base', MoveBaseAction,84                                                                      goal_slots=['target_pose']),85                                   transitions={'succeeded': 'succeeded', 'aborted': 'aborted',86                                                'preempted': 'aborted'},87                                   remapping={'target_pose': 'place_coords'})88if __name__ == "__main__":89    rospy.init_node("go_for_tea_sm", sys.argv)90    get_Tea = GoForTeaSMNode()91    get_Tea.execute_sm()...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
